Unverified Commit ab0a8b90 authored by Yongsheng Xu, committed by GitHub

implement dictionary compression decompression (#192)

This is a subtask of issue: https://github.com/tikv/tikv/issues/5743

> Read blob from file with dictionary compression

Mainly made the following changes:

- Refactored `BlobDecoder` and added the decompression-related components to it (see the sketch after this list).
- Refactored `BlobFileReader` and `BlobFileIterator` so they initialize the decoder with the corresponding data from the blob file.
- Added test cases.
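
A minimal sketch of the resulting read path, assuming the caller already has a `RandomAccessFileReader` for the blob file and a decoded `BlobHandle` (the helper name `ReadOneBlob` is made up for illustration, not part of this PR):

```cpp
#include "blob_file_reader.h"

// Open() now loads the uncompression dictionary itself when the header has
// BlobFileHeader::kHasUncompressionDictionary set, so callers no longer get
// Status::NotSupported for dictionary-compressed blob files.
Status ReadOneBlob(const TitanCFOptions& options,
                   std::unique_ptr<RandomAccessFileReader> file,
                   uint64_t file_size, const BlobHandle& handle,
                   BlobRecord* record, PinnableSlice* buffer) {
  std::unique_ptr<BlobFileReader> reader;
  Status s = BlobFileReader::Open(options, std::move(file), file_size,
                                  &reader, nullptr /*stats*/);
  if (!s.ok()) return s;
  // Get() builds a BlobDecoder with the file's dictionary (or the empty
  // dict) and decompresses the record transparently.
  return reader->Get(ReadOptions(), handle, record, buffer);
}
```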

Issue Number: https://github.com/tikv/tikv/issues/8635
parent 54a20a56
#include "blob_file_iterator.h"
#include "blob_file_reader.h"
#include "util.h"
#include "util/crc32c.h"
@@ -28,10 +29,6 @@ bool BlobFileIterator::Init() {
}
BlobFileHeader blob_file_header;
status_ = blob_file_header.DecodeFrom(&slice);
if (blob_file_header.flags & BlobFileHeader::kHasUncompressionDictionary) {
status_ = Status::NotSupported(
"blob file with dictionary compression is not supported yet");
}
if (!status_.ok()) {
return false;
}
@@ -47,8 +44,29 @@ bool BlobFileIterator::Init() {
if (!status_.ok()) return false;
BlobFileFooter blob_file_footer;
status_ = blob_file_footer.DecodeFrom(&slice);
end_of_blob_record_ = file_size_ - BlobFileFooter::kEncodedLength -
blob_file_footer.meta_index_handle.size();
end_of_blob_record_ = file_size_ - BlobFileFooter::kEncodedLength;
if (!blob_file_footer.meta_index_handle.IsNull()) {
end_of_blob_record_ -=
(blob_file_footer.meta_index_handle.size() + kBlockTrailerSize);
}
if (blob_file_header.flags & BlobFileHeader::kHasUncompressionDictionary) {
status_ = InitUncompressionDict(blob_file_footer, file_.get(),
&uncompression_dict_);
if (!status_.ok()) {
return false;
}
decoder_.SetUncompressionDict(uncompression_dict_.get());
// The layout of a blob file is:
// | .... |
// | records |
// | compression dict + kBlockTrailerSize(5) |
// | metaindex block(40) + kBlockTrailerSize(5) |
// | footer(kEncodedLength: 32) |
end_of_blob_record_ -=
(uncompression_dict_->GetRawDict().size() + kBlockTrailerSize);
}
assert(end_of_blob_record_ > BlobFileHeader::kMinEncodedLength);
init_ = true;
return true;
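
To make the offset arithmetic above concrete, here is a worked example with assumed sizes (a 1000-byte blob file whose raw dictionary is 100 bytes; the trailer, metaindex, and footer sizes follow the layout comment):

```cpp
#include <cassert>
#include <cstdint>

int main() {
  const uint64_t file_size = 1000;               // assumed
  uint64_t end_of_blob_record = file_size - 32;  // strip footer        -> 968
  end_of_blob_record -= 40 + 5;                  // metaindex + trailer -> 923
  end_of_blob_record -= 100 + 5;                 // dict + trailer      -> 818
  assert(end_of_blob_record == 818);  // records occupy [header_size, 818)
}
```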
@@ -87,8 +105,9 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
FixedSlice<kRecordHeaderSize> header_buffer;
iterate_offset_ = header_size_;
for (; iterate_offset_ < offset; iterate_offset_ += total_length) {
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
// With for_compaction=true, rate_limiter is enabled. Since
// BlobFileIterator is only used for GC, we always set for_compaction to
// true.
status_ = file_->Read(iterate_offset_, kRecordHeaderSize, &header_buffer,
header_buffer.get(), true /*for_compaction*/);
if (!status_.ok()) return;
......
@@ -58,7 +58,9 @@ class BlobFileIterator {
Status status_;
bool valid_{false};
std::unique_ptr<UncompressionDict> uncompression_dict_;
BlobDecoder decoder_;
uint64_t iterate_offset_{0};
std::vector<char> buffer_;
OwnedSlice uncompressed_;
......
@@ -136,6 +136,18 @@ TEST_F(BlobFileIteratorTest, Basic) {
TestBlobFileIterator();
}
TEST_F(BlobFileIteratorTest, DictCompress) {
#if ZSTD_VERSION_NUMBER >= 10103
CompressionOptions compression_opts;
compression_opts.enabled = true;
compression_opts.max_dict_bytes = 4000;
titan_options_.blob_file_compression = kZSTD;
titan_options_.blob_file_compression_options = compression_opts;
TestBlobFileIterator();
#endif
}
TEST_F(BlobFileIteratorTest, IterateForPrev) {
NewBuilder();
const int n = 1000;
......
@@ -7,6 +7,8 @@
#include <inttypes.h>
#include "file/filename.h"
#include "table/block_based/block.h"
#include "table/internal_iterator.h"
#include "test_util/sync_point.h"
#include "titan_stats.h"
#include "util/crc32c.h"
@@ -52,6 +54,27 @@ void EncodeBlobCache(std::string* dst, const Slice& prefix, uint64_t offset) {
PutVarint64(dst, offset);
}
// Seek to the specified meta block.
// Sets *is_found to true if it successfully seeks to that block.
Status SeekToMetaBlock(InternalIterator* meta_iter,
const std::string& block_name, bool* is_found,
BlockHandle* block_handle = nullptr) {
if (block_handle != nullptr) {
*block_handle = BlockHandle::NullBlockHandle();
}
*is_found = false;
meta_iter->Seek(block_name);
if (meta_iter->status().ok() && meta_iter->Valid() &&
meta_iter->key() == block_name) {
*is_found = true;
if (block_handle) {
Slice v = meta_iter->value();
return block_handle->DecodeFrom(&v);
}
}
return meta_iter->status();
}
} // namespace
Status BlobFileReader::Open(const TitanCFOptions& options,
@@ -68,10 +91,6 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
if (!s.ok()) {
return s;
}
if (header.flags & BlobFileHeader::kHasUncompressionDictionary) {
return Status::NotSupported(
"blob file with dictionary compression is not supported yet");
}
FixedSlice<BlobFileFooter::kEncodedLength> buffer;
s = file->Read(file_size - BlobFileFooter::kEncodedLength,
@@ -88,6 +107,13 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
auto reader = new BlobFileReader(options, std::move(file), stats);
reader->footer_ = footer;
if (header.flags & BlobFileHeader::kHasUncompressionDictionary) {
s = InitUncompressionDict(footer, reader->file_.get(),
&reader->uncompression_dict_);
if (!s.ok()) {
return s;
}
}
result->reset(reader);
return Status::OK();
}
@@ -169,7 +195,9 @@ Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,
" not equal to blob size " + ToString(handle.size));
}
BlobDecoder decoder;
BlobDecoder decoder(uncompression_dict_ == nullptr
? &UncompressionDict::GetEmptyDict()
: uncompression_dict_.get());
s = decoder.DecodeHeader(&blob);
if (!s.ok()) {
return s;
@@ -199,5 +227,58 @@ Status BlobFilePrefetcher::Get(const ReadOptions& options,
return reader_->Get(options, handle, record, buffer);
}
Status InitUncompressionDict(
const BlobFileFooter& footer, RandomAccessFileReader* file,
std::unique_ptr<UncompressionDict>* uncompression_dict) {
// TODO: Cache the compression dictionary in either block cache or blob cache.
#if ZSTD_VERSION_NUMBER < 10103
return Status::NotSupported("the version of libzstd is too low");
#endif
// 1. read meta index block
// 2. read dictionary
// 3. reset the dictionary
assert(footer.meta_index_handle.size() > 0);
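// Step 1: read the meta index block located by the footer's handle.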
BlockHandle meta_index_handle = footer.meta_index_handle;
Slice blob;
CacheAllocationPtr ubuf(new char[meta_index_handle.size()]);
Status s = file->Read(meta_index_handle.offset(), meta_index_handle.size(),
&blob, ubuf.get());
if (!s.ok()) {
return s;
}
BlockContents meta_block_content(std::move(ubuf), meta_index_handle.size());
std::unique_ptr<Block> meta(
new Block(std::move(meta_block_content), kDisableGlobalSequenceNumber));
std::unique_ptr<InternalIterator> meta_iter(
meta->NewDataIterator(BytewiseComparator(), BytewiseComparator()));
bool dict_is_found = false;
BlockHandle dict_block;
s = SeekToMetaBlock(meta_iter.get(), kCompressionDictBlock, &dict_is_found,
&dict_block);
if (!s.ok()) {
return s;
}
if (!dict_is_found) {
return Status::NotFound("uncompression dict");
}
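// Step 2: read the raw dictionary bytes using the handle found above.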
Slice dict_slice;
CacheAllocationPtr dict_buf(new char[dict_block.size()]);
s = file->Read(dict_block.offset(), dict_block.size(), &dict_slice,
dict_buf.get());
if (!s.ok()) {
return s;
}
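// Step 3: copy the dictionary bytes and build the UncompressionDict
// (the second argument marks it as a zstd dictionary).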
std::string dict_str(dict_buf.get(), dict_buf.get() + dict_block.size());
uncompression_dict->reset(new UncompressionDict(dict_str, true));
return s;
}
} // namespace titandb
} // namespace rocksdb
@@ -50,6 +50,8 @@ class BlobFileReader {
// Information read from the file.
BlobFileFooter footer_;
std::unique_ptr<UncompressionDict> uncompression_dict_ = nullptr;
TitanStats* stats_;
};
@@ -70,5 +72,12 @@ class BlobFilePrefetcher : public Cleanable {
uint64_t readahead_limit_{0};
};
// Initialize the uncompression dictionary.
// Called by BlobFileReader and BlobFileIterator when the blob file has an
// uncompression dictionary.
Status InitUncompressionDict(
const BlobFileFooter& footer, RandomAccessFileReader* file,
std::unique_ptr<UncompressionDict>* uncompression_dict);
} // namespace titandb
} // namespace rocksdb
@@ -65,8 +65,8 @@ Status BlobDecoder::DecodeHeader(Slice* src) {
if (!GetFixed32(src, &record_size_) || !GetChar(src, &compression)) {
return Status::Corruption("BlobHeader");
}
compression_ = static_cast<CompressionType>(compression);
compression_ = static_cast<CompressionType>(compression);
return Status::OK();
}
@@ -85,7 +85,7 @@ Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
return DecodeInto(input, record);
}
UncompressionContext ctx(compression_);
UncompressionInfo info(ctx, uncompression_dict_, compression_);
UncompressionInfo info(ctx, *uncompression_dict_, compression_);
Status s = Uncompress(info, input, buffer);
if (!s.ok()) {
return s;
......
@@ -106,13 +106,20 @@ class BlobEncoder {
class BlobDecoder {
public:
BlobDecoder(const UncompressionDict& uncompression_dict)
: uncompression_dict_(uncompression_dict) {}
BlobDecoder() : BlobDecoder(UncompressionDict::GetEmptyDict()) {}
BlobDecoder(const UncompressionDict* uncompression_dict,
CompressionType compression = kNoCompression)
: compression_(compression), uncompression_dict_(uncompression_dict) {}
BlobDecoder()
: BlobDecoder(&UncompressionDict::GetEmptyDict(), kNoCompression) {}
Status DecodeHeader(Slice* src);
Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer);
void SetUncompressionDict(const UncompressionDict* uncompression_dict) {
uncompression_dict_ = uncompression_dict;
}
size_t GetRecordSize() const { return record_size_; }
private:
@@ -120,7 +127,7 @@ class BlobDecoder {
uint32_t header_crc_{0};
uint32_t record_size_{0};
CompressionType compression_{kNoCompression};
const UncompressionDict& uncompression_dict_;
const UncompressionDict* uncompression_dict_;
};
// Format of blob handle (not fixed size):
......
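
A small sketch of the two ways a dictionary can now reach the pointer-based `BlobDecoder` (the dictionary bytes below are a placeholder, not a real trained dictionary; assumes the Titan headers are on the include path):

```cpp
#include <string>
#include "blob_format.h"

void DecoderDictSketch() {
  std::string dict_bytes = "placeholder zstd dictionary bytes";
  UncompressionDict dict(dict_bytes, true /*using_zstd*/);

  // 1. Supply the dictionary at construction, as BlobFileReader::ReadRecord
  //    now does:
  BlobDecoder eager(&dict, kZSTD);

  // 2. Re-point an existing decoder, as BlobFileIterator::Init() does once
  //    the dictionary block has been read from the file:
  BlobDecoder lazy;  // starts with UncompressionDict::GetEmptyDict()
  lazy.SetUncompressionDict(&dict);
}
```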
@@ -119,7 +119,7 @@ TEST(BlobFormatTest, BlobCompressionZSTD) {
UncompressionDict uncompression_dict(dict, true);
BlobEncoder encoder(kZSTD, &compression_dict);
BlobDecoder decoder(uncompression_dict);
BlobDecoder decoder(&uncompression_dict, kZSTD);
BlobRecord record;
record.key = "key1";
......
@@ -372,7 +372,7 @@ TEST_F(TableBuilderTest, DictCompress) {
compression_opts.enabled = true;
compression_opts.max_dict_bytes = 4000;
cf_options_.blob_file_compression_options = compression_opts;
cf_options_.compression = kZSTD;
cf_options_.blob_file_compression = kZSTD;
table_factory_.reset(new TitanTableFactory(
db_options_, cf_options_, db_impl_.get(), blob_manager_, &mutex_,
@@ -400,13 +400,32 @@ TEST_F(TableBuilderTest, DictCompress) {
std::unique_ptr<TableReader> base_reader;
NewTableReader(base_name_, &base_reader);
std::unique_ptr<BlobFileReader> blob_reader;
std::unique_ptr<RandomAccessFileReader> file;
NewFileReader(blob_name_, &file);
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(blob_name_, &file_size));
Status stat = BlobFileReader::Open(cf_options_, std::move(file), file_size,
&blob_reader, nullptr);
assert(stat.code() == Status::kNotSupported);
NewBlobFileReader(&blob_reader);
ReadOptions ro;
std::unique_ptr<InternalIterator> iter;
iter.reset(base_reader->NewIterator(ro, nullptr /*prefix_extractor*/,
nullptr /*arena*/, false /*skip_filters*/,
TableReaderCaller::kUncategorized));
iter->SeekToFirst();
for (char i = 0; i < n; i++) {
ASSERT_TRUE(iter->Valid());
std::string key(1, i);
ParsedInternalKey ikey;
ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
ASSERT_EQ(ikey.user_key, key);
ASSERT_EQ(ikey.type, kTypeBlobIndex);
BlobIndex index;
ASSERT_OK(DecodeInto(iter->value(), &index));
ASSERT_EQ(index.file_number, kTestFileNumber);
BlobRecord record;
PinnableSlice buffer;
ASSERT_OK(blob_reader->Get(ro, index.blob_handle, &record, &buffer));
ASSERT_EQ(record.key, key);
ASSERT_EQ(record.value, std::string(kMinBlobSize, i));
iter->Next();
}
ASSERT_TRUE(!iter->Valid());
#endif
}
@@ -416,7 +435,7 @@ TEST_F(TableBuilderTest, DictCompressDisorder) {
compression_opts.enabled = true;
compression_opts.max_dict_bytes = 4000;
cf_options_.blob_file_compression_options = compression_opts;
cf_options_.compression = kZSTD;
cf_options_.blob_file_compression = kZSTD;
table_factory_.reset(new TitanTableFactory(
db_options_, cf_options_, db_impl_.get(), blob_manager_, &mutex_,
@@ -445,6 +464,8 @@ TEST_F(TableBuilderTest, DictCompressDisorder) {
ASSERT_OK(base_file->Close());
std::unique_ptr<TableReader> base_reader;
NewTableReader(base_name_, &base_reader);
std::unique_ptr<BlobFileReader> blob_reader;
NewBlobFileReader(&blob_reader);
ReadOptions ro;
std::unique_ptr<InternalIterator> iter;
@@ -464,7 +485,14 @@ TEST_F(TableBuilderTest, DictCompressDisorder) {
ASSERT_EQ(iter->value(), std::string(1, i));
} else {
ASSERT_EQ(ikey.type, kTypeBlobIndex);
// TODO: reading is not implemented yet
BlobIndex index;
ASSERT_OK(DecodeInto(iter->value(), &index));
ASSERT_EQ(index.file_number, kTestFileNumber);
BlobRecord record;
PinnableSlice buffer;
ASSERT_OK(blob_reader->Get(ro, index.blob_handle, &record, &buffer));
ASSERT_EQ(record.key, key);
ASSERT_EQ(record.value, std::string(kMinBlobSize, i));
}
iter->Next();
}
@@ -730,7 +758,7 @@ TEST_F(TableBuilderTest, LevelMergeWithDictCompressDisorder) {
compression_opts.enabled = true;
compression_opts.max_dict_bytes = 4000;
cf_options_.blob_file_compression_options = compression_opts;
cf_options_.compression = kZSTD;
cf_options_.blob_file_compression = kZSTD;
NewTableBuilder(base_file.get(), &table_builder, compression_opts,
cf_options_.num_levels - 1);
@@ -753,6 +781,9 @@ TEST_F(TableBuilderTest, LevelMergeWithDictCompressDisorder) {
ro, nullptr /*prefix_extractor*/, nullptr /*arena*/,
false /*skip_filters*/, TableReaderCaller::kUncategorized));
std::unique_ptr<BlobFileReader> blob_reader;
NewBlobFileReader(&blob_reader);
first_iter->SeekToFirst();
second_iter->SeekToFirst();
// check orders of keys
@@ -767,7 +798,27 @@
ASSERT_TRUE(ParseInternalKey(second_iter->key(), &second_ikey));
ASSERT_EQ(first_ikey.type, second_ikey.type);
ASSERT_EQ(first_ikey.user_key, second_ikey.user_key);
// TODO: Compare blob records, need to implement decompression first
if (i % 2 == 0) {
// key: 0, 2, 4, 6 ..98
ASSERT_EQ(second_ikey.type, kTypeBlobIndex);
std::string key(1, i);
BlobIndex index;
ASSERT_OK(DecodeInto(second_iter->value(), &index));
BlobRecord record;
PinnableSlice buffer;
ASSERT_OK(blob_reader->Get(ro, index.blob_handle, &record, &buffer));
ASSERT_EQ(record.key, key);
ASSERT_EQ(record.value, std::string(kMinBlobSize, i));
} else {
// key: 1, 3, 5, ..99
std::string key(1, i);
std::string value = std::string(1, i);
ASSERT_EQ(second_ikey.type, kTypeValue);
ASSERT_EQ(second_iter->value(), value);
}
first_iter->Next();
second_iter->Next();
......