Commit e5731f44 authored by Phoebe Bell, committed by yiwu-arbug

Update blob file format for ZSTD dictionary compression (#109)

Adding a new version to the blob file format. v2 blob files come with 4 more bytes in the header to hold future flags. In particular, if `kHasUncompressionDictionary` is set, the whole file is compressed using dictionary compression and the per-blob compression type should be ignored.

Also update `BlobEncoder` and `BlobDecoder` to allow passing in a dictionary and use it to compress/decompress. Add a unit test for it.
parent 38e409cd
......@@ -18,11 +18,11 @@ BlobFileIterator::~BlobFileIterator() {}
bool BlobFileIterator::Init() {
Slice slice;
char header_buf[BlobFileHeader::kEncodedLength];
char header_buf[BlobFileHeader::kMaxEncodedLength];
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(0, BlobFileHeader::kEncodedLength, &slice, header_buf,
true /*for_compaction*/);
status_ = file_->Read(0, BlobFileHeader::kMaxEncodedLength, &slice,
header_buf, true /*for_compaction*/);
if (!status_.ok()) {
return false;
}
......@@ -31,6 +31,9 @@ bool BlobFileIterator::Init() {
if (!status_.ok()) {
return false;
}
header_size_ = blob_file_header.size();
char footer_buf[BlobFileFooter::kEncodedLength];
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
......@@ -42,7 +45,7 @@ bool BlobFileIterator::Init() {
status_ = blob_file_footer.DecodeFrom(&slice);
end_of_blob_record_ = file_size_ - BlobFileFooter::kEncodedLength -
blob_file_footer.meta_index_handle.size();
assert(end_of_blob_record_ > BlobFileHeader::kEncodedLength);
assert(end_of_blob_record_ > BlobFileHeader::kMinEncodedLength);
init_ = true;
return true;
}
......@@ -50,7 +53,7 @@ bool BlobFileIterator::Init() {
// Positions the iterator at the first blob record, which starts immediately
// after the file header. Lazily runs Init() on first use; on Init() failure
// status_ is left holding the error and the seek is abandoned.
void BlobFileIterator::SeekToFirst() {
  if (!init_ && !Init()) return;
  status_ = Status::OK();
  // Use the decoded header's actual size: v1 and v2 headers differ in length,
  // so a fixed BlobFileHeader::kEncodedLength constant can no longer be used.
  // (The stale pre-v2 assignment that was left above this line was a dead
  // store and has been removed.)
  iterate_offset_ = header_size_;
  PrefetchAndGet();
}
......@@ -78,7 +81,7 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
uint64_t total_length = 0;
FixedSlice<kRecordHeaderSize> header_buffer;
iterate_offset_ = BlobFileHeader::kEncodedLength;
iterate_offset_ = header_size_;
for (; iterate_offset_ < offset; iterate_offset_ += total_length) {
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
......
......@@ -32,6 +32,7 @@ class BlobFileIterator {
Slice key() const;
Slice value() const;
Status status() const { return status_; }
uint64_t header_size() const { return header_size_; }
void IterateForPrev(uint64_t);
......@@ -64,6 +65,7 @@ class BlobFileIterator {
BlobRecord cur_blob_record_;
uint64_t cur_record_offset_;
uint64_t cur_record_size_;
uint64_t header_size_;
uint64_t readahead_begin_offset_{0};
uint64_t readahead_end_offset_{0};
......
......@@ -82,7 +82,7 @@ Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
return DecodeInto(input, record);
}
UncompressionContext ctx(compression_);
UncompressionInfo info(ctx, UncompressionDict::GetEmptyDict(), compression_);
UncompressionInfo info(ctx, uncompression_dict_, compression_);
Status s = Uncompress(info, input, buffer);
if (!s.ok()) {
return s;
......@@ -268,6 +268,10 @@ double BlobFileMeta::GetDiscardableRatio() const {
// Serializes the header into `dst`: magic number, version, and — for
// version 2 headers only — the 32-bit flags word.
void BlobFileHeader::EncodeTo(std::string* dst) const {
  PutFixed32(dst, kHeaderMagicNumber);
  PutFixed32(dst, version);
  // Version 1 headers stop after the version field.
  if (version != BlobFileHeader::kVersion2) {
    return;
  }
  PutFixed32(dst, flags);
}
Status BlobFileHeader::DecodeFrom(Slice* src) {
......@@ -276,9 +280,16 @@ Status BlobFileHeader::DecodeFrom(Slice* src) {
return Status::Corruption(
"Blob file header magic number missing or mismatched.");
}
if (!GetFixed32(src, &version) || version != kVersion1) {
if (!GetFixed32(src, &version) ||
(version != kVersion1 && version != kVersion2)) {
return Status::Corruption("Blob file header version missing or invalid.");
}
if (version == BlobFileHeader::kVersion2) {
// Check that no other flags are set
if (!GetFixed32(src, &flags) || flags & ~kHasUncompressionDictionary) {
return Status::Corruption("Blob file header flags missing or invalid.");
}
}
return Status::OK();
}
......
......@@ -16,7 +16,15 @@ namespace titandb {
// [record head + record 2]
// ...
// [record head + record N]
// [blob file meta block 1]
// [blob file meta block 2]
// ...
// [blob file meta block M]
// [blob file meta index]
// [blob file footer]
//
// For now, the only kind of meta block is an optional uncompression dictionary
// indicated by a flag in the file header.
// Format of blob head (9 bytes):
//
......@@ -26,7 +34,7 @@ namespace titandb {
// | Fixed32 | Fixed32 | char |
// +---------+---------+-------------+
//
// Size of a version-1 blob file header: magic number (4) + version (4).
// NOTE(review): appears superseded by kBlobMaxHeaderSize below — confirm no
// remaining callers before removing.
const uint64_t kBlobHeaderSize = 8;
// Largest possible blob file header: magic number (4) + version (4) +
// version-2 flags (4).
const uint64_t kBlobMaxHeaderSize = 12;
// Per-record header: key length (Fixed32) + value length (Fixed32) +
// compression type (1 byte) — see the format diagram above.
const uint64_t kRecordHeaderSize = 9;
// Footer: max-encoded meta index handle plus 12 trailing bytes
// (presumably a Fixed64 magic number and a Fixed32 checksum — verify).
const uint64_t kBlobFooterSize = BlockHandle::kMaxEncodedLength + 8 + 4;
......@@ -52,11 +60,13 @@ struct BlobRecord {
class BlobEncoder {
public:
BlobEncoder(CompressionType compression)
BlobEncoder(CompressionType compression,
const CompressionDict& compression_dict)
: compression_ctx_(compression),
compression_info_(compression_opt_, compression_ctx_,
CompressionDict::GetEmptyDict(), compression,
0 /*sample_for_compression*/) {}
compression_info_(compression_opt_, compression_ctx_, compression_dict,
compression, 0 /*sample_for_compression*/) {}
BlobEncoder(CompressionType compression)
: BlobEncoder(compression, CompressionDict::GetEmptyDict()) {}
void EncodeRecord(const BlobRecord& record);
......@@ -77,6 +87,10 @@ class BlobEncoder {
class BlobDecoder {
public:
BlobDecoder(const UncompressionDict& uncompression_dict)
: uncompression_dict_(uncompression_dict) {}
BlobDecoder() : BlobDecoder(UncompressionDict::GetEmptyDict()) {}
Status DecodeHeader(Slice* src);
Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer);
......@@ -87,6 +101,7 @@ class BlobDecoder {
uint32_t header_crc_{0};
uint32_t record_size_{0};
CompressionType compression_{kNoCompression};
const UncompressionDict& uncompression_dict_;
};
// Format of blob handle (not fixed size):
......@@ -218,7 +233,8 @@ class BlobFileMeta {
void AddDiscardableSize(uint64_t _discardable_size);
double GetDiscardableRatio() const;
// True when every payload byte in the file has been marked discardable,
// i.e. only the header and footer remain live. Uses kBlobMaxHeaderSize, so
// this may be conservative for v1 files whose header is only 8 bytes —
// TODO confirm acceptable for v1.
// (The merged diff had left the stale pre-v2 return statement in front of
// the updated one, making the new computation unreachable; the stale
// return has been removed.)
bool NoLiveData() {
  return discardable_size_ ==
         file_size_ - kBlobMaxHeaderSize - kBlobFooterSize;
}
TitanInternalStats::StatsType GetDiscardableRatioLevel() const;
......@@ -243,7 +259,7 @@ class BlobFileMeta {
bool gc_mark_{false};
};
// Format of blob file header for version 1 (8 bytes):
//
// +--------------+---------+
// | magic number | version |
......@@ -251,15 +267,36 @@ class BlobFileMeta {
// | Fixed32 | Fixed32 |
// +--------------+---------+
//
// For version 2, there are another 4 bytes for flags:
//
// +--------------+---------+---------+
// | magic number | version | flags |
// +--------------+---------+---------+
// | Fixed32 | Fixed32 | Fixed32 |
// +--------------+---------+---------+
//
// The header is meant to be compatible with the header of BlobDB blob files,
// except that we use a different magic number.
struct BlobFileHeader {
// The first 32bits from $(echo titandb/blob | sha1sum).
static const uint32_t kHeaderMagicNumber = 0x2be0a614ul;
static const uint32_t kVersion1 = 1;
static const uint64_t kEncodedLength = 4 + 4;
static const uint32_t kVersion2 = 2;
static const uint64_t kMinEncodedLength = 4 + 4;
static const uint64_t kMaxEncodedLength = 4 + 4 + 4;
// Flags:
static const uint32_t kHasUncompressionDictionary = 1 << 0;
uint32_t version = kVersion1;
uint32_t version = kVersion2;
uint32_t flags = 0;
uint64_t size() const {
return version == BlobFileHeader::kVersion1
? BlobFileHeader::kMinEncodedLength
: BlobFileHeader::kMaxEncodedLength;
}
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
......@@ -282,6 +319,10 @@ struct BlobFileFooter {
BlockHandle meta_index_handle{BlockHandle::NullBlockHandle()};
// Points to a uncompression dictionary (which is also pointed to by the meta
// index) when `kHasUncompressionDictionary` is set in the header.
BlockHandle uncompression_dict_handle{BlockHandle::NullBlockHandle()};
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
......
......@@ -67,6 +67,77 @@ TEST(BlobFormatTest, BlobFileStateTransit) {
ASSERT_EQ(compaction_output.file_state(), BlobFileMeta::FileState::kNormal);
}
// Round-trips a single record through LZ4 compression with no dictionary and
// verifies both decode statuses and the decoded payload.
TEST(BlobFormatTest, BlobCompressionLZ4) {
  BlobEncoder encoder(kLZ4Compression);
  BlobDecoder decoder;

  BlobRecord record;
  record.key = "key1";
  record.value = "value1";

  encoder.EncodeRecord(record);
  Slice encoded_record = encoder.GetRecord();
  Slice encoded_header = encoder.GetHeader();

  // Previously the Status results were silently dropped; a failed decode
  // could leave decoded_record stale and still reach the final comparison.
  ASSERT_TRUE(decoder.DecodeHeader(&encoded_header).ok());

  BlobRecord decoded_record;
  OwnedSlice blob;
  ASSERT_TRUE(
      decoder.DecodeRecord(&encoded_record, &decoded_record, &blob).ok());
  ASSERT_EQ(record, decoded_record);
}
#if defined(ZSTD)
// Builds a ZSTD dictionary by encoding `sample_count` synthetic records and
// feeding the concatenated encodings (with their lengths) to
// ZSTD_TrainDictionary. Returns the trained dictionary bytes.
std::string CreateDict() {
  const int sample_count = 1000;
  std::string samples;
  std::vector<size_t> sample_lens;
  BlobEncoder encoder(kZSTD);
  BlobRecord record;
  for (int i = 0; i < sample_count; ++i) {
    record.key = "key" + std::to_string(i);
    record.value = "value" + std::to_string(i);
    encoder.EncodeRecord(record);
    const std::string encoded = encoder.GetRecord().ToString();
    sample_lens.push_back(encoded.size());
    samples.append(encoded);
  }
  return ZSTD_TrainDictionary(samples, sample_lens, 4000);
}
// Round-trips a record through ZSTD dictionary compression: trains a
// dictionary, wires it into both encoder and decoder, and verifies decode
// statuses plus the decoded payload.
TEST(BlobFormatTest, BlobCompressionZSTD) {
  auto dict = CreateDict();
  CompressionDict compression_dict(dict, kZSTD, 10);
  UncompressionDict uncompression_dict(dict, true);

  BlobEncoder encoder(kZSTD, compression_dict);
  BlobDecoder decoder(uncompression_dict);

  BlobRecord record;
  record.key = "key1";
  record.value = "value1";

  encoder.EncodeRecord(record);
  Slice encoded_record = encoder.GetRecord();
  Slice encoded_header = encoder.GetHeader();

  // Previously the Status results were silently dropped; a failed decode
  // could leave decoded_record stale and still reach the final comparison.
  ASSERT_TRUE(decoder.DecodeHeader(&encoded_header).ok());

  BlobRecord decoded_record;
  OwnedSlice blob;
  ASSERT_TRUE(
      decoder.DecodeRecord(&encoded_record, &decoded_record, &blob).ok());
  ASSERT_EQ(record, decoded_record);
}
#endif // ZSTD
} // namespace titandb
} // namespace rocksdb
......
......@@ -184,12 +184,14 @@ Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) {
if (*selected) return Status::OK();
// TODO: add do sample count metrics
auto records_size = file->file_size() - BlobFileHeader::kEncodedLength -
// `records_size` won't be accurate if the file is version 1, but this method
// is planned to be removed soon.
auto records_size = file->file_size() - BlobFileHeader::kMaxEncodedLength -
BlobFileFooter::kEncodedLength;
Status s;
uint64_t sample_size_window = static_cast<uint64_t>(
records_size * blob_gc_->titan_cf_options().sample_file_size_ratio);
uint64_t sample_begin_offset = BlobFileHeader::kEncodedLength;
uint64_t sample_begin_offset = BlobFileHeader::kMaxEncodedLength;
if (records_size != sample_size_window) {
Random64 random64(records_size);
sample_begin_offset += random64.Uniform(records_size - sample_size_window);
......@@ -207,7 +209,7 @@ Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) {
// TODO(@DorianZheng) sample_begin_offset maybe out of data block size, need
// more elegant solution
if (iter.status().IsInvalidArgument()) {
iter.IterateForPrev(BlobFileHeader::kEncodedLength);
iter.IterateForPrev(BlobFileHeader::kMaxEncodedLength);
}
if (!iter.status().ok()) {
s = iter.status();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment