Unverified Commit 54a20a56 authored by Yongsheng Xu, committed by GitHub

Add dictionary compression support for blob file (#189)

Fixes #118

Briefly, I've done the following things:
- add compression options to TitanCFOptions
- set a flag in the blob file header when there is a compression dictionary
- add some members (state, sample data, etc.) to BlobFileBuilder for the dictionary compression logic
- write the dictionary into the meta blocks when dictionary compression is enabled
- refactor BlobEncoder to support dictionary updates

Currently, for decoding:
- update BlobFileReader and BlobFileIterator to return Status::NotSupported if a blob file comes with dictionary compression
- add a test case for BlobFileReader
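
For illustration, here is a minimal sketch of how a user could enable blob file dictionary compression with these options (the helper function name is hypothetical; only the `TitanCFOptions` fields come from this patch):

    #include "titan/options.h"

    // Sketch: `max_dict_bytes > 0` is what switches BlobFileBuilder into the
    // buffered (sampling) state; `zstd_max_train_bytes` bounds how much sample
    // data is buffered before the dictionary is trained. Note that
    // `blob_file_compression_options.enabled` is ignored; only
    // `blob_file_compression` decides whether blob files are compressed.
    rocksdb::titandb::TitanCFOptions DictCompressionOptions() {
      rocksdb::titandb::TitanCFOptions cf_options;
      cf_options.blob_file_compression = rocksdb::kZSTD;
      cf_options.blob_file_compression_options.max_dict_bytes = 4 << 10;        // 4KB dictionary
      cf_options.blob_file_compression_options.zstd_max_train_bytes = 1 << 20;  // 1MB of samples
      cf_options.blob_file_compression_options.level = 3;                       // ZSTD level
      return cf_options;
    }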

Issue Number: https://github.com/tikv/tikv/issues/8635

Signed-off-by: hexyoungs <chuxdesign@hotmail.com>
parent 80928f8e
@@ -85,6 +85,12 @@ struct TitanCFOptions : public ColumnFamilyOptions {
   // Default: kNoCompression
   CompressionType blob_file_compression{kNoCompression};
 
+  // The compression options. The `blob_file_compression_options.enabled`
+  // option is ignored; we only use `blob_file_compression` above to determine
+  // whether the blob file is compressed. We use this option mainly to
+  // configure the compression dictionary.
+  CompressionOptions blob_file_compression_options;
+
   // The desirable blob file size. This is not a hard limit but a wish.
   //
   // Default: 256MB
...
@@ -6,19 +6,120 @@ namespace titandb {
 BlobFileBuilder::BlobFileBuilder(const TitanDBOptions& db_options,
                                  const TitanCFOptions& cf_options,
                                  WritableFileWriter* file)
-    : cf_options_(cf_options),
+    : builder_state_(cf_options.blob_file_compression_options.max_dict_bytes > 0
+                         ? BuilderState::kBuffered
+                         : BuilderState::kUnbuffered),
+      cf_options_(cf_options),
       file_(file),
-      encoder_(cf_options_.blob_file_compression) {
+      encoder_(cf_options.blob_file_compression,
+               cf_options.blob_file_compression_options) {
+#if ZSTD_VERSION_NUMBER < 10103
+  if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) {
+    status_ = Status::NotSupported("ZSTD version too old.");
+    return;
+  }
+#endif
+  WriteHeader();
+}
+
+void BlobFileBuilder::WriteHeader() {
   BlobFileHeader header;
+  if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) {
+    header.flags |= BlobFileHeader::kHasUncompressionDictionary;
+  }
   std::string buffer;
   header.EncodeTo(&buffer);
   status_ = file_->Append(buffer);
 }
-void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) {
+void BlobFileBuilder::Add(const BlobRecord& record,
+                          std::unique_ptr<BlobRecordContext> ctx,
+                          OutContexts* out_ctx) {
   if (!ok()) return;
+  std::string key = record.key.ToString();
+  if (builder_state_ == BuilderState::kBuffered) {
+    std::string record_str;
+    // Encode to take ownership of the underlying string.
+    record.EncodeTo(&record_str);
+    sample_records_.emplace_back(record_str);
+    sample_str_len_ += record_str.size();
+    cached_contexts_.emplace_back(std::move(ctx));
+    if (cf_options_.blob_file_compression_options.zstd_max_train_bytes > 0 &&
+        sample_str_len_ >=
+            cf_options_.blob_file_compression_options.zstd_max_train_bytes) {
+      EnterUnbuffered(out_ctx);
+    }
+  } else {
+    encoder_.EncodeRecord(record);
+    WriteEncoderData(&ctx->new_blob_index.blob_handle);
+    out_ctx->emplace_back(std::move(ctx));
+  }
+
+  // The keys added to blob files are in order.
+  // We do key range checks in both states.
+  if (smallest_key_.empty()) {
+    smallest_key_.assign(record.key.data(), record.key.size());
+  }
+  assert(cf_options_.comparator->Compare(record.key, Slice(smallest_key_)) >=
+         0);
+  assert(cf_options_.comparator->Compare(record.key, Slice(largest_key_)) >= 0);
+  largest_key_.assign(record.key.data(), record.key.size());
+}
+void BlobFileBuilder::AddSmall(std::unique_ptr<BlobRecordContext> ctx) {
+  cached_contexts_.emplace_back(std::move(ctx));
+}
+
+void BlobFileBuilder::EnterUnbuffered(OutContexts* out_ctx) {
+  // Use the collected samples to train the compression dictionary, then
+  // replay the buffered records and encode them into the blob file. Once
+  // that is done, transition the builder state to unbuffered.
+  std::string samples;
+  samples.reserve(sample_str_len_);
+  std::vector<size_t> sample_lens;
+  for (const auto& record_str : sample_records_) {
+    samples.append(record_str, 0, record_str.size());
+    sample_lens.emplace_back(record_str.size());
+  }
+  std::string dict;
+  dict = ZSTD_TrainDictionary(
+      samples, sample_lens,
+      cf_options_.blob_file_compression_options.max_dict_bytes);
+
+  compression_dict_.reset(
+      new CompressionDict(dict, cf_options_.blob_file_compression,
+                          cf_options_.blob_file_compression_options.level));
+  encoder_.SetCompressionDict(compression_dict_.get());
+
+  FlushSampleRecords(out_ctx);
+  builder_state_ = BuilderState::kUnbuffered;
+}
+
+void BlobFileBuilder::FlushSampleRecords(OutContexts* out_ctx) {
+  assert(cached_contexts_.size() >= sample_records_.size());
+  size_t sample_idx = 0, ctx_idx = 0;
+  for (; sample_idx < sample_records_.size(); sample_idx++, ctx_idx++) {
+    const std::string& record_str = sample_records_[sample_idx];
+    for (; ctx_idx < cached_contexts_.size() &&
+           cached_contexts_[ctx_idx]->has_value;
+         ctx_idx++) {
+      out_ctx->emplace_back(std::move(cached_contexts_[ctx_idx]));
+    }
+    const std::unique_ptr<BlobRecordContext>& ctx = cached_contexts_[ctx_idx];
+    encoder_.EncodeSlice(record_str);
+    WriteEncoderData(&ctx->new_blob_index.blob_handle);
+    out_ctx->emplace_back(std::move(cached_contexts_[ctx_idx]));
+  }
+  assert(sample_idx == sample_records_.size());
+  assert(ctx_idx == cached_contexts_.size());
+  sample_records_.clear();
+  sample_str_len_ = 0;
+  cached_contexts_.clear();
+}
-  encoder_.EncodeRecord(record);
+void BlobFileBuilder::WriteEncoderData(BlobHandle* handle) {
   handle->offset = file_->GetFileSize();
   handle->size = encoder_.GetEncodedSize();
   live_data_size_ += handle->size;
@@ -27,23 +128,56 @@ void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) {
   if (ok()) {
     status_ = file_->Append(encoder_.GetRecord());
     num_entries_++;
-    // The keys added into blob files are in order.
-    if (smallest_key_.empty()) {
-      smallest_key_.assign(record.key.data(), record.key.size());
-    }
-    assert(cf_options_.comparator->Compare(record.key, Slice(smallest_key_)) >=
-           0);
-    assert(cf_options_.comparator->Compare(record.key, Slice(largest_key_)) >=
-           0);
-    largest_key_.assign(record.key.data(), record.key.size());
   }
 }
-Status BlobFileBuilder::Finish() {
+void BlobFileBuilder::WriteRawBlock(const Slice& block, BlockHandle* handle) {
+  handle->set_offset(file_->GetFileSize());
+  handle->set_size(block.size());
+  status_ = file_->Append(block);
+  if (ok()) {
+    // Follow RocksDB's block-based table format.
+    char trailer[kBlockTrailerSize];
+    // Only the compression dictionary and the meta index block are written
+    // by this method, so we use `kNoCompression` as a placeholder.
+    trailer[0] = kNoCompression;
+    char* trailer_without_type = trailer + 1;
+    // crc32 checksum
+    auto crc = crc32c::Value(block.data(), block.size());
+    crc = crc32c::Extend(crc, trailer, 1);  // Extend to cover compression type
+    EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
+    status_ = file_->Append(Slice(trailer, kBlockTrailerSize));
+  }
+}
+
+void BlobFileBuilder::WriteCompressionDictBlock(
+    MetaIndexBuilder* meta_index_builder) {
+  BlockHandle handle;
+  WriteRawBlock(compression_dict_->GetRawDict(), &handle);
+  if (ok()) {
+    meta_index_builder->Add(kCompressionDictBlock, handle);
+  }
+}
+
+Status BlobFileBuilder::Finish(OutContexts* out_ctx) {
   if (!ok()) return status();
-  std::string buffer;
+  if (builder_state_ == BuilderState::kBuffered) {
+    EnterUnbuffered(out_ctx);
+  }
+
   BlobFileFooter footer;
+  // If there is a compression dictionary, encode it into the meta blocks.
+  if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) {
+    BlockHandle meta_index_handle;
+    MetaIndexBuilder meta_index_builder;
+    WriteCompressionDictBlock(&meta_index_builder);
+    WriteRawBlock(meta_index_builder.Finish(), &meta_index_handle);
+    footer.meta_index_handle = meta_index_handle;
+  }
+
+  std::string buffer;
   footer.EncodeTo(&buffer);
   status_ = file_->Append(buffer);
...
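As context for `EnterUnbuffered` above: RocksDB's `ZSTD_TrainDictionary` is essentially a wrapper around zstd's ZDICT training API. A rough, self-contained sketch of what such a helper does with the concatenated samples (this is an illustration, not the actual RocksDB implementation):

    #include <string>
    #include <vector>
    #include <zdict.h>  // zstd's dictionary-training API

    // Train a dictionary of at most `max_dict_bytes` from `samples`, which is
    // the concatenation of the buffered records; `sample_lens` gives the
    // length of each record inside `samples`.
    std::string TrainDict(const std::string& samples,
                          const std::vector<size_t>& sample_lens,
                          size_t max_dict_bytes) {
      std::string dict(max_dict_bytes, '\0');
      size_t dict_len = ZDICT_trainFromBuffer(
          &dict[0], max_dict_bytes, samples.data(), sample_lens.data(),
          static_cast<unsigned>(sample_lens.size()));
      if (ZDICT_isError(dict_len)) return "";  // fall back to no dictionary
      dict.resize(dict_len);
      return dict;
    }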
#pragma once

 #include "blob_format.h"
+#include "table/meta_blocks.h"
 #include "titan/options.h"
+#include "util/autovector.h"
+#include "util/compression.h"
 #include "util/file_reader_writer.h"

namespace rocksdb {
@@ -30,24 +33,65 @@ namespace titandb {
 // meta index block with block handles pointed to the meta blocks. The
 // meta block and the meta index block are formatted the same as the
 // BlockBasedTable.
 class BlobFileBuilder {
  public:
+  // States of the builder.
+  //
+  // - `kBuffered`: This is the initial state where zero or more records are
+  //   buffered uncompressed in memory. From this state, call
+  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
+  //   compress/write out any buffered records, and proceed to the
+  //   `kUnbuffered` state.
+  //
+  // - `kUnbuffered`: This is the state when the compression dictionary has
+  //   been finalized, either because it wasn't enabled in the first place or
+  //   because it has been created from sampling previously buffered data. In
+  //   this state, records are simply compressed/written out as they arrive.
+  //   From this state, call `Finish()` to complete the file (write meta
+  //   blocks, etc.), or `Abandon()` to delete the partially created file.
+  enum class BuilderState {
+    kBuffered,
+    kUnbuffered,
+  };
+
+  struct BlobRecordContext {
+    std::string key;  // original internal key
+    BlobIndex original_blob_index;
+    BlobIndex new_blob_index;
+    bool has_value = false;
+    std::string value;
+  };
+  typedef autovector<std::unique_ptr<BlobRecordContext>> OutContexts;
+
   // Constructs a builder that will store the contents of the file it
   // is building in "*file". Does not close the file. It is up to the
   // caller to sync and close the file after calling Finish().
   BlobFileBuilder(const TitanDBOptions& db_options,
                   const TitanCFOptions& cf_options, WritableFileWriter* file);
 
-  // Adds the record to the file and points the handle to it.
-  void Add(const BlobRecord& record, BlobHandle* handle);
+  // Tries to add the record to the file.
+  // Note:
+  // 1. `out_ctx` may be empty when the builder is in the `kBuffered` state.
+  // 2. The caller should set `ctx.new_blob_index.file_number` before passing
+  //    it in; the builder will only change its `blob_handle`.
+  void Add(const BlobRecord& record, std::unique_ptr<BlobRecordContext> ctx,
+           OutContexts* out_ctx);
+
+  // AddSmall is used to prevent ordering issues; small KV pairs and blob
+  // index entries may be passed in here.
+  void AddSmall(std::unique_ptr<BlobRecordContext> ctx);
+
+  // Returns the builder state.
+  BuilderState GetBuilderState() { return builder_state_; }
 
   // Returns non-ok iff some error has been detected.
   Status status() const { return status_; }
 
-  // Finishes building the table.
+  // Finishes building the table and returns the status.
+  // This method may modify the output contexts when called in the
+  // `kBuffered` state.
   // REQUIRES: Finish(), Abandon() have not been called.
-  Status Finish();
+  Status Finish(OutContexts* out_ctx);
 
   // Abandons building the table. If the caller is not going to call
   // Finish(), it must call Abandon() before destroying this builder.
@@ -63,7 +107,18 @@ class BlobFileBuilder {
   uint64_t live_data_size() const { return live_data_size_; }
 
  private:
+  BuilderState builder_state_;
+
   bool ok() const { return status().ok(); }
+
+  // Enters the unbuffered state. Should only be called after enough samples
+  // have been collected for the compression dictionary. It will fill
+  // `out_ctx` with the contexts of the buffered records.
+  void EnterUnbuffered(OutContexts* out_ctx);
+
+  void WriteHeader();
+  void WriteRawBlock(const Slice& block, BlockHandle* handle);
+  void WriteCompressionDictBlock(MetaIndexBuilder* meta_index_builder);
+  void FlushSampleRecords(OutContexts* out_ctx);
+  void WriteEncoderData(BlobHandle* handle);
 
   TitanCFOptions cf_options_;
   WritableFileWriter* file_;
@@ -71,6 +126,13 @@ class BlobFileBuilder {
   Status status_;
   BlobEncoder encoder_;
 
+  // The following members may be refactored into a Rep struct.
+  std::vector<std::string> sample_records_;
+  uint64_t sample_str_len_ = 0;
+  std::unique_ptr<CompressionDict> compression_dict_;
+  OutContexts cached_contexts_;
+
   uint64_t num_entries_ = 0;
   std::string smallest_key_;
   std::string largest_key_;
...
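To make the contract of the header above concrete, here is a hedged sketch of how a caller is expected to drive the two-state API (it mirrors the test helpers later in this patch; `DriveBuilder` and its arguments are illustrative only):

    #include <cstdint>
    #include <memory>
    #include <vector>

    #include "blob_file_builder.h"

    using rocksdb::titandb::BlobFileBuilder;
    using rocksdb::titandb::BlobRecord;

    void DriveBuilder(BlobFileBuilder* builder, uint64_t file_number,
                      const std::vector<BlobRecord>& records) {
      BlobFileBuilder::OutContexts finished;
      for (const auto& record : records) {
        std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx(
            new BlobFileBuilder::BlobRecordContext);
        ctx->key = record.key.ToString();  // must be an internal key in practice
        // The caller sets the file number; the builder fills in blob_handle.
        ctx->new_blob_index.file_number = file_number;
        BlobFileBuilder::OutContexts out;
        builder->Add(record, std::move(ctx), &out);
        // In the kBuffered state `out` may stay empty; the contexts are
        // returned later, once the dictionary has been trained.
        for (auto& c : out) finished.emplace_back(std::move(c));
      }
      // Finish() flushes any still-buffered records and returns their contexts;
      // finished[i]->new_blob_index.blob_handle then points at each record.
      BlobFileBuilder::OutContexts out;
      builder->Finish(&out);
      for (auto& c : out) finished.emplace_back(std::move(c));
    }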
@@ -28,6 +28,10 @@ bool BlobFileIterator::Init() {
   }
   BlobFileHeader blob_file_header;
   status_ = blob_file_header.DecodeFrom(&slice);
+  if (blob_file_header.flags & BlobFileHeader::kHasUncompressionDictionary) {
+    status_ = Status::NotSupported(
+        "blob file with dictionary compression is not supported yet");
+  }
   if (!status_.ok()) {
     return false;
   }
...
@@ -2,13 +2,12 @@
 
 #include <cinttypes>
 
-#include "file/filename.h"
-#include "test_util/testharness.h"
-#include "util/random.h"
-
 #include "blob_file_builder.h"
 #include "blob_file_cache.h"
 #include "blob_file_reader.h"
+#include "file/filename.h"
+#include "test_util/testharness.h"
+#include "util/random.h"
 
 namespace rocksdb {
 namespace titandb {
@@ -67,17 +66,33 @@ class BlobFileIteratorTest : public testing::Test {
   }
 
   void AddKeyValue(const std::string& key, const std::string& value,
-                   BlobHandle* blob_handle) {
+                   BlobFileBuilder::OutContexts& contexts) {
     BlobRecord record;
     record.key = key;
     record.value = value;
-    builder_->Add(record, blob_handle);
+
+    std::unique_ptr<BlobFileBuilder::BlobRecordContext> c(
+        new BlobFileBuilder::BlobRecordContext);
+    InternalKey ikey(key, 1, kTypeValue);
+    c->key = ikey.Encode().ToString();
+
+    BlobFileBuilder::OutContexts cur_contexts;
+    builder_->Add(record, std::move(c), &cur_contexts);
     ASSERT_OK(builder_->status());
+    for (size_t i = 0; i < cur_contexts.size(); i++) {
+      contexts.emplace_back(std::move(cur_contexts[i]));
+    }
   }
 
-  void FinishBuilder() {
-    ASSERT_OK(builder_->Finish());
+  void FinishBuilder(BlobFileBuilder::OutContexts& contexts) {
+    BlobFileBuilder::OutContexts cur_contexts;
+    Status s = builder_->Finish(&cur_contexts);
+    ASSERT_OK(s);
     ASSERT_OK(builder_->status());
+    for (size_t i = 0; i < cur_contexts.size(); i++) {
+      contexts.emplace_back(std::move(cur_contexts[i]));
+    }
   }
 
   void NewBlobFileIterator() {
@@ -93,23 +108,25 @@ class BlobFileIteratorTest : public testing::Test {
     NewBuilder();
     const int n = 1000;
-    std::vector<BlobHandle> handles(n);
+    BlobFileBuilder::OutContexts contexts;
     for (int i = 0; i < n; i++) {
-      AddKeyValue(GenKey(i), GenValue(i), &handles[i]);
+      AddKeyValue(GenKey(i), GenValue(i), contexts);
     }
-    FinishBuilder();
+    FinishBuilder(contexts);
 
     NewBlobFileIterator();
 
     blob_file_iterator_->SeekToFirst();
+    ASSERT_EQ(contexts.size(), n);
     for (int i = 0; i < n; blob_file_iterator_->Next(), i++) {
       ASSERT_OK(blob_file_iterator_->status());
       ASSERT_EQ(blob_file_iterator_->Valid(), true);
       ASSERT_EQ(GenKey(i), blob_file_iterator_->key());
       ASSERT_EQ(GenValue(i), blob_file_iterator_->value());
       BlobIndex blob_index = blob_file_iterator_->GetBlobIndex();
-      ASSERT_EQ(handles[i], blob_index.blob_handle);
+      ASSERT_EQ(contexts[i]->new_blob_index.blob_handle,
+                blob_index.blob_handle);
     }
   }
 };
@@ -122,68 +139,77 @@ TEST_F(BlobFileIteratorTest, Basic) {
 TEST_F(BlobFileIteratorTest, IterateForPrev) {
   NewBuilder();
   const int n = 1000;
-  std::vector<BlobHandle> handles(n);
+  BlobFileBuilder::OutContexts contexts;
   for (int i = 0; i < n; i++) {
-    AddKeyValue(GenKey(i), GenValue(i), &handles[i]);
+    AddKeyValue(GenKey(i), GenValue(i), contexts);
   }
-  FinishBuilder();
+  FinishBuilder(contexts);
 
   NewBlobFileIterator();
 
   int i = n / 2;
-  blob_file_iterator_->IterateForPrev(handles[i].offset);
+  ASSERT_EQ(contexts.size(), n);
+  BlobHandle blob_handle = contexts[i]->new_blob_index.blob_handle;
+  blob_file_iterator_->IterateForPrev(blob_handle.offset);
   ASSERT_OK(blob_file_iterator_->status());
   for (blob_file_iterator_->Next(); i < n; i++, blob_file_iterator_->Next()) {
     ASSERT_OK(blob_file_iterator_->status());
     ASSERT_EQ(blob_file_iterator_->Valid(), true);
     BlobIndex blob_index;
     blob_index = blob_file_iterator_->GetBlobIndex();
-    ASSERT_EQ(handles[i], blob_index.blob_handle);
+    blob_handle = contexts[i]->new_blob_index.blob_handle;
+    ASSERT_EQ(blob_handle, blob_index.blob_handle);
     ASSERT_EQ(GenKey(i), blob_file_iterator_->key());
     ASSERT_EQ(GenValue(i), blob_file_iterator_->value());
   }
 
   auto idx = Random::GetTLSInstance()->Uniform(n);
-  blob_file_iterator_->IterateForPrev(handles[idx].offset);
+  blob_handle = contexts[idx]->new_blob_index.blob_handle;
+  blob_file_iterator_->IterateForPrev(blob_handle.offset);
   ASSERT_OK(blob_file_iterator_->status());
   blob_file_iterator_->Next();
   ASSERT_OK(blob_file_iterator_->status());
   ASSERT_TRUE(blob_file_iterator_->Valid());
   BlobIndex blob_index;
   blob_index = blob_file_iterator_->GetBlobIndex();
-  ASSERT_EQ(handles[idx], blob_index.blob_handle);
+  ASSERT_EQ(blob_handle, blob_index.blob_handle);
 
   while ((idx = Random::GetTLSInstance()->Uniform(n)) == 0)
     ;
-  blob_file_iterator_->IterateForPrev(handles[idx].offset - kRecordHeaderSize -
-                                      1);
+  blob_handle = contexts[idx]->new_blob_index.blob_handle;
+  blob_file_iterator_->IterateForPrev(blob_handle.offset - kRecordHeaderSize -
+                                      1);
   ASSERT_OK(blob_file_iterator_->status());
   blob_file_iterator_->Next();
   ASSERT_OK(blob_file_iterator_->status());
   ASSERT_TRUE(blob_file_iterator_->Valid());
   blob_index = blob_file_iterator_->GetBlobIndex();
-  ASSERT_EQ(handles[idx - 1], blob_index.blob_handle);
+  blob_handle = contexts[idx - 1]->new_blob_index.blob_handle;
+  ASSERT_EQ(blob_handle, blob_index.blob_handle);
 
   idx = Random::GetTLSInstance()->Uniform(n);
-  blob_file_iterator_->IterateForPrev(handles[idx].offset + 1);
+  blob_handle = contexts[idx]->new_blob_index.blob_handle;
+  blob_file_iterator_->IterateForPrev(blob_handle.offset + 1);
   ASSERT_OK(blob_file_iterator_->status());
   blob_file_iterator_->Next();
   ASSERT_OK(blob_file_iterator_->status());
   ASSERT_TRUE(blob_file_iterator_->Valid());
   blob_index = blob_file_iterator_->GetBlobIndex();
-  ASSERT_EQ(handles[idx], blob_index.blob_handle);
+  ASSERT_EQ(blob_handle, blob_index.blob_handle);
 }
 TEST_F(BlobFileIteratorTest, MergeIterator) {
   const int kMaxKeyNum = 1000;
-  std::vector<BlobHandle> handles(kMaxKeyNum);
+  BlobFileBuilder::OutContexts contexts;
   std::vector<std::unique_ptr<BlobFileIterator>> iters;
   NewBuilder();
   for (int i = 1; i < kMaxKeyNum; i++) {
-    AddKeyValue(GenKey(i), GenValue(i), &handles[i]);
+    AddKeyValue(GenKey(i), GenValue(i), contexts);
     if (i % 100 == 0) {
-      FinishBuilder();
+      FinishBuilder(contexts);
       uint64_t file_size = 0;
       ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
       NewBlobFileReader(file_number_, 0, titan_options_, env_options_, env_,
@@ -197,7 +223,7 @@ TEST_F(BlobFileIteratorTest, MergeIterator) {
     }
   }
 
-  FinishBuilder();
+  FinishBuilder(contexts);
   uint64_t file_size = 0;
   ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
   NewBlobFileReader(file_number_, 0, titan_options_, env_options_, env_,
@@ -213,7 +239,8 @@ TEST_F(BlobFileIteratorTest, MergeIterator) {
     ASSERT_TRUE(iter.Valid());
     ASSERT_EQ(iter.key(), GenKey(i));
     ASSERT_EQ(iter.value(), GenValue(i));
-    ASSERT_EQ(iter.GetBlobIndex().blob_handle, handles[i]);
+    ASSERT_EQ(iter.GetBlobIndex().blob_handle,
+              contexts[i - 1]->new_blob_index.blob_handle);
   }
   ASSERT_EQ(i, kMaxKeyNum);
 }
...
@@ -8,11 +8,10 @@
 
 #include "file/filename.h"
 #include "test_util/sync_point.h"
+#include "titan_stats.h"
 #include "util/crc32c.h"
 #include "util/string_util.h"
 
-#include "titan_stats.h"
-
 namespace rocksdb {
 namespace titandb {
@@ -64,9 +63,19 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
     return Status::Corruption("file is too short to be a blob file");
   }
 
+  BlobFileHeader header;
+  Status s = ReadHeader(file, &header);
+  if (!s.ok()) {
+    return s;
+  }
+  if (header.flags & BlobFileHeader::kHasUncompressionDictionary) {
+    return Status::NotSupported(
+        "blob file with dictionary compression is not supported yet");
+  }
+
   FixedSlice<BlobFileFooter::kEncodedLength> buffer;
-  Status s = file->Read(file_size - BlobFileFooter::kEncodedLength,
-                        BlobFileFooter::kEncodedLength, &buffer, buffer.get());
+  s = file->Read(file_size - BlobFileFooter::kEncodedLength,
+                 BlobFileFooter::kEncodedLength, &buffer, buffer.get());
   if (!s.ok()) {
     return s;
   }
@@ -83,6 +92,18 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
   return Status::OK();
 }
 
+Status BlobFileReader::ReadHeader(std::unique_ptr<RandomAccessFileReader>& file,
+                                  BlobFileHeader* header) {
+  FixedSlice<BlobFileHeader::kMaxEncodedLength> buffer;
+  Status s =
+      file->Read(0, BlobFileHeader::kMaxEncodedLength, &buffer, buffer.get());
+  if (!s.ok()) return s;
+
+  s = DecodeInto(buffer, header);
+  return s;
+}
+
 BlobFileReader::BlobFileReader(const TitanCFOptions& options,
                                std::unique_ptr<RandomAccessFileReader> file,
                                TitanStats* stats)
...
@@ -38,6 +38,8 @@ class BlobFileReader {
   Status ReadRecord(const BlobHandle& handle, BlobRecord* record,
                     OwnedSlice* buffer);
 
+  static Status ReadHeader(std::unique_ptr<RandomAccessFileReader>& file,
+                           BlobFileHeader* header);
+
   TitanCFOptions options_;
   std::unique_ptr<RandomAccessFileReader> file_;
...
#include "file/filename.h" #include <cinttypes>
#include "test_util/testharness.h"
#include "blob_file_builder.h" #include "blob_file_builder.h"
#include "blob_file_cache.h" #include "blob_file_cache.h"
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "file/filename.h"
#include <cinttypes> #include "test_util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
@@ -29,6 +28,28 @@ class BlobFileTest : public testing::Test {
 
   std::string GenValue(uint64_t i) { return std::string(1024, i); }
 
+  void AddRecord(BlobFileBuilder* builder, BlobRecord& record,
+                 BlobFileBuilder::OutContexts& contexts) {
+    std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx(
+        new BlobFileBuilder::BlobRecordContext);
+    ctx->key = record.key.ToString();
+
+    BlobFileBuilder::OutContexts cur_contexts;
+    builder->Add(record, std::move(ctx), &cur_contexts);
+    for (size_t i = 0; i < cur_contexts.size(); i++) {
+      contexts.emplace_back(std::move(cur_contexts[i]));
+    }
+  }
+
+  Status Finish(BlobFileBuilder* builder,
+                BlobFileBuilder::OutContexts& contexts) {
+    BlobFileBuilder::OutContexts cur_contexts;
+    Status s = builder->Finish(&cur_contexts);
+    for (size_t i = 0; i < cur_contexts.size(); i++) {
+      contexts.emplace_back(std::move(cur_contexts[i]));
+    }
+    return s;
+  }
+
   void TestBlobFilePrefetcher(TitanOptions options) {
     options.dirname = dirname_;
     TitanDBOptions db_options(options);
@@ -36,7 +57,7 @@ class BlobFileTest : public testing::Test {
     BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}, nullptr);
 
     const int n = 100;
-    std::vector<BlobHandle> handles(n);
+    BlobFileBuilder::OutContexts contexts;
 
     std::unique_ptr<WritableFileWriter> file;
     {
@@ -54,10 +75,12 @@ class BlobFileTest : public testing::Test {
       BlobRecord record;
      record.key = key;
       record.value = value;
-      builder->Add(record, &handles[i]);
+      AddRecord(builder.get(), record, contexts);
       ASSERT_OK(builder->status());
     }
-    ASSERT_OK(builder->Finish());
+    ASSERT_OK(Finish(builder.get(), contexts));
     ASSERT_OK(builder->status());
 
     uint64_t file_size = 0;
@@ -66,6 +89,7 @@ class BlobFileTest : public testing::Test {
     ReadOptions ro;
     std::unique_ptr<BlobFilePrefetcher> prefetcher;
     ASSERT_OK(cache.NewPrefetcher(file_number_, file_size, &prefetcher));
+    ASSERT_EQ(contexts.size(), n);
     for (int i = 0; i < n; i++) {
       auto key = GenKey(i);
       auto value = GenValue(i);
@@ -74,18 +98,19 @@ class BlobFileTest : public testing::Test {
       expect.value = value;
       BlobRecord record;
       PinnableSlice buffer;
-      ASSERT_OK(
-          cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer));
+      BlobHandle blob_handle = contexts[i]->new_blob_index.blob_handle;
+      ASSERT_OK(cache.Get(ro, file_number_, file_size, blob_handle, &record,
+                          &buffer));
       ASSERT_EQ(record, expect);
       buffer.Reset();
-      ASSERT_OK(
-          cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer));
+      ASSERT_OK(cache.Get(ro, file_number_, file_size, blob_handle, &record,
+                          &buffer));
       ASSERT_EQ(record, expect);
       buffer.Reset();
-      ASSERT_OK(prefetcher->Get(ro, handles[i], &record, &buffer));
+      ASSERT_OK(prefetcher->Get(ro, blob_handle, &record, &buffer));
      ASSERT_EQ(record, expect);
       buffer.Reset();
-      ASSERT_OK(prefetcher->Get(ro, handles[i], &record, &buffer));
+      ASSERT_OK(prefetcher->Get(ro, blob_handle, &record, &buffer));
       ASSERT_EQ(record, expect);
     }
   }
@@ -97,7 +122,7 @@ class BlobFileTest : public testing::Test {
     BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}, nullptr);
 
     const int n = 100;
-    std::vector<BlobHandle> handles(n);
+    BlobFileBuilder::OutContexts contexts;
 
     std::unique_ptr<WritableFileWriter> file;
     {
@@ -115,10 +140,13 @@ class BlobFileTest : public testing::Test {
       BlobRecord record;
       record.key = key;
       record.value = value;
-      builder->Add(record, &handles[i]);
+      AddRecord(builder.get(), record, contexts);
       ASSERT_OK(builder->status());
     }
-    ASSERT_OK(builder->Finish());
+    ASSERT_OK(Finish(builder.get(), contexts));
     ASSERT_OK(builder->status());
 
     uint64_t file_size = 0;
@@ -132,6 +160,8 @@ class BlobFileTest : public testing::Test {
     ASSERT_OK(BlobFileReader::Open(cf_options,
                                    std::move(random_access_file_reader),
                                    file_size, &blob_file_reader, nullptr));
+
+    ASSERT_EQ(contexts.size(), n);
     for (int i = 0; i < n; i++) {
       auto key = GenKey(i);
       auto value = GenValue(i);
@@ -140,18 +170,19 @@ class BlobFileTest : public testing::Test {
       expect.value = value;
       BlobRecord record;
       PinnableSlice buffer;
-      ASSERT_OK(
-          cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer));
+      BlobHandle blob_handle = contexts[i]->new_blob_index.blob_handle;
+      ASSERT_OK(cache.Get(ro, file_number_, file_size, blob_handle, &record,
+                          &buffer));
       ASSERT_EQ(record, expect);
       buffer.Reset();
-      ASSERT_OK(
-          cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer));
+      ASSERT_OK(cache.Get(ro, file_number_, file_size, blob_handle, &record,
+                          &buffer));
       ASSERT_EQ(record, expect);
       buffer.Reset();
-      ASSERT_OK(blob_file_reader->Get(ro, handles[i], &record, &buffer));
+      ASSERT_OK(blob_file_reader->Get(ro, blob_handle, &record, &buffer));
       ASSERT_EQ(record, expect);
       buffer.Reset();
-      ASSERT_OK(blob_file_reader->Get(ro, handles[i], &record, &buffer));
+      ASSERT_OK(blob_file_reader->Get(ro, blob_handle, &record, &buffer));
       ASSERT_EQ(record, expect);
     }
   }
...
@@ -36,12 +36,15 @@ bool operator==(const BlobRecord& lhs, const BlobRecord& rhs) {
 
 void BlobEncoder::EncodeRecord(const BlobRecord& record) {
   record_buffer_.clear();
-  compressed_buffer_.clear();
-
-  CompressionType compression;
   record.EncodeTo(&record_buffer_);
-  record_ = Compress(compression_info_, record_buffer_, &compressed_buffer_,
-                     &compression);
+  EncodeSlice(record_buffer_);
+}
+
+void BlobEncoder::EncodeSlice(const Slice& record) {
+  compressed_buffer_.clear();
+  CompressionType compression;
+  record_ =
+      Compress(*compression_info_, record, &compressed_buffer_, &compression);
 
   assert(record_.size() < std::numeric_limits<uint32_t>::max());
   EncodeFixed32(header_ + 4, static_cast<uint32_t>(record_.size()));
...
@@ -61,15 +61,32 @@ struct BlobRecord {
 
 class BlobEncoder {
  public:
-  BlobEncoder(CompressionType compression,
-              const CompressionDict& compression_dict)
-      : compression_ctx_(compression),
-        compression_info_(compression_opt_, compression_ctx_, compression_dict,
-                          compression, 0 /*sample_for_compression*/) {}
+  BlobEncoder(CompressionType compression, CompressionOptions compression_opt,
+              const CompressionDict* compression_dict)
+      : compression_opt_(compression_opt),
+        compression_ctx_(compression),
+        compression_dict_(compression_dict),
+        compression_info_(new CompressionInfo(
+            compression_opt_, compression_ctx_, *compression_dict_,
+            compression, 0 /*sample_for_compression*/)) {}
+
   BlobEncoder(CompressionType compression)
-      : BlobEncoder(compression, CompressionDict::GetEmptyDict()) {}
+      : BlobEncoder(compression, CompressionOptions(),
+                    &CompressionDict::GetEmptyDict()) {}
+
+  BlobEncoder(CompressionType compression,
+              const CompressionDict* compression_dict)
+      : BlobEncoder(compression, CompressionOptions(), compression_dict) {}
+
+  BlobEncoder(CompressionType compression, CompressionOptions compression_opt)
+      : BlobEncoder(compression, compression_opt,
+                    &CompressionDict::GetEmptyDict()) {}
 
   void EncodeRecord(const BlobRecord& record);
+  void EncodeSlice(const Slice& record);
+  void SetCompressionDict(const CompressionDict* compression_dict) {
+    compression_dict_ = compression_dict;
+    compression_info_.reset(new CompressionInfo(
+        compression_opt_, compression_ctx_, *compression_dict_,
+        compression_info_->type(), compression_info_->SampleForCompression()));
+  }
+
   Slice GetHeader() const { return Slice(header_, sizeof(header_)); }
   Slice GetRecord() const { return record_; }
 
@@ -83,7 +100,8 @@ class BlobEncoder {
   std::string compressed_buffer_;
   CompressionOptions compression_opt_;
   CompressionContext compression_ctx_;
-  CompressionInfo compression_info_;
+  const CompressionDict* compression_dict_;
+  std::unique_ptr<CompressionInfo> compression_info_;
 };
 
 class BlobDecoder {
@@ -353,10 +371,6 @@ struct BlobFileFooter {
   BlockHandle meta_index_handle{BlockHandle::NullBlockHandle()};
 
-  // Points to a uncompression dictionary (which is also pointed to by the meta
-  // index) when `kHasUncompressionDictionary` is set in the header.
-  BlockHandle uncompression_dict_handle{BlockHandle::NullBlockHandle()};
-
   void EncodeTo(std::string* dst) const;
   Status DecodeFrom(Slice* src);
...
#include "test_util/testharness.h"
#include "blob_format.h" #include "blob_format.h"
#include "test_util/testharness.h"
#include "testutil.h" #include "testutil.h"
#include "util.h" #include "util.h"
...@@ -118,7 +118,7 @@ TEST(BlobFormatTest, BlobCompressionZSTD) { ...@@ -118,7 +118,7 @@ TEST(BlobFormatTest, BlobCompressionZSTD) {
CompressionDict compression_dict(dict, kZSTD, 10); CompressionDict compression_dict(dict, kZSTD, 10);
UncompressionDict uncompression_dict(dict, true); UncompressionDict uncompression_dict(dict, true);
BlobEncoder encoder(kZSTD, compression_dict); BlobEncoder encoder(kZSTD, &compression_dict);
BlobDecoder decoder(uncompression_dict); BlobDecoder decoder(uncompression_dict);
BlobRecord record; BlobRecord record;
......
 #ifndef __STDC_FORMAT_MACROS
 #define __STDC_FORMAT_MACROS
 #endif
 
+#include "blob_gc_job.h"
+
 #include <inttypes.h>
 #include <memory>
 
-#include "blob_gc_job.h"
-
 #include "blob_file_size_collector.h"
 
 namespace rocksdb {
@@ -158,8 +158,6 @@ Status BlobGCJob::DoRunGC() {
   std::unique_ptr<BlobFileHandle> blob_file_handle;
   std::unique_ptr<BlobFileBuilder> blob_file_builder;
 
-  auto* cfh = blob_gc_->column_family_handle();
-
   // uint64_t drop_entry_num = 0;
   // uint64_t drop_entry_size = 0;
   // uint64_t total_entry_num = 0;
@@ -234,32 +232,20 @@ Status BlobGCJob::DoRunGC() {
     // blob index's size is counted in `RewriteValidKeyToLSM`
     metrics_.gc_bytes_written += blob_record.size();
 
-    MergeBlobIndex new_blob_index;
-    new_blob_index.file_number = blob_file_handle->GetNumber();
-    new_blob_index.source_file_number = blob_index.file_number;
-    new_blob_index.source_file_offset = blob_index.blob_handle.offset;
-    blob_file_builder->Add(blob_record, &new_blob_index.blob_handle);
-    std::string index_entry;
+    // BlobRecordContext requires the key to be an internal key, so we encode
+    // the key as an internal key even though we only need the user key.
+    std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx(
+        new BlobFileBuilder::BlobRecordContext);
+    InternalKey ikey(blob_record.key, 1, kTypeValue);
+    ctx->key = ikey.Encode().ToString();
+    ctx->original_blob_index = blob_index;
+    ctx->new_blob_index.file_number = blob_file_handle->GetNumber();
 
-    if (!gc_merge_rewrite_) {
-      new_blob_index.EncodeToBase(&index_entry);
-      // Store WriteBatch for rewriting new Key-Index pairs to LSM
-      GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(),
-                                              std::move(blob_index));
-      callback.value = index_entry;
-      rewrite_batches_.emplace_back(
-          std::make_pair(WriteBatch(), std::move(callback)));
-      auto& wb = rewrite_batches_.back().first;
-      s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), blob_record.key,
-                                           index_entry);
-    } else {
-      new_blob_index.EncodeTo(&index_entry);
-      rewrite_batches_without_callback_.emplace_back(
-          std::make_pair(WriteBatch(), blob_index.blob_handle.size));
-      auto& wb = rewrite_batches_without_callback_.back().first;
-      s = WriteBatchInternal::Merge(&wb, cfh->GetID(), blob_record.key,
-                                    index_entry);
-    }
+    BlobFileBuilder::OutContexts contexts;
+    blob_file_builder->Add(blob_record, std::move(ctx), &contexts);
+
+    BatchWriteNewIndices(contexts, &s);
+
     if (!s.ok()) {
       break;
     }
@@ -281,6 +267,48 @@ Status BlobGCJob::DoRunGC() {
   return s;
 }
 
+void BlobGCJob::BatchWriteNewIndices(BlobFileBuilder::OutContexts& contexts,
+                                     Status* s) {
+  auto* cfh = blob_gc_->column_family_handle();
+  for (const std::unique_ptr<BlobFileBuilder::BlobRecordContext>& ctx :
+       contexts) {
+    MergeBlobIndex merge_blob_index;
+    merge_blob_index.file_number = ctx->new_blob_index.file_number;
+    merge_blob_index.source_file_number = ctx->original_blob_index.file_number;
+    merge_blob_index.source_file_offset =
+        ctx->original_blob_index.blob_handle.offset;
+    merge_blob_index.blob_handle = ctx->new_blob_index.blob_handle;
+
+    std::string index_entry;
+    BlobIndex original_index = ctx->original_blob_index;
+    ParsedInternalKey ikey;
+    if (!ParseInternalKey(ctx->key, &ikey)) {
+      *s = Status::Corruption(Slice());
+      return;
+    }
+    if (!gc_merge_rewrite_) {
+      merge_blob_index.EncodeToBase(&index_entry);
+      // Store WriteBatch for rewriting new Key-Index pairs to LSM
+      GarbageCollectionWriteCallback callback(cfh, ikey.user_key.ToString(),
+                                              std::move(original_index));
+      callback.value = index_entry;
+      rewrite_batches_.emplace_back(
+          std::make_pair(WriteBatch(), std::move(callback)));
+      auto& wb = rewrite_batches_.back().first;
+      *s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), ikey.user_key,
+                                            index_entry);
+    } else {
+      merge_blob_index.EncodeTo(&index_entry);
+      rewrite_batches_without_callback_.emplace_back(
+          std::make_pair(WriteBatch(), original_index.blob_handle.size));
+      auto& wb = rewrite_batches_without_callback_.back().first;
+      *s = WriteBatchInternal::Merge(&wb, cfh->GetID(), ikey.user_key,
+                                     index_entry);
+    }
+    if (!s->ok()) break;
+  }
+}
+
 Status BlobGCJob::BuildIterator(
     std::unique_ptr<BlobFileMergeIterator>* result) {
   Status s;
@@ -378,33 +406,33 @@ Status BlobGCJob::Finish() {
 Status BlobGCJob::InstallOutputBlobFiles() {
   Status s;
-  std::vector<
-      std::pair<std::shared_ptr<BlobFileMeta>, std::unique_ptr<BlobFileHandle>>>
-      files;
-  std::string tmp;
   for (auto& builder : blob_file_builders_) {
-    s = builder.second->Finish();
+    BlobFileBuilder::OutContexts contexts;
+    s = builder.second->Finish(&contexts);
+    BatchWriteNewIndices(contexts, &s);
     if (!s.ok()) {
       break;
     }
     metrics_.gc_num_new_files++;
-
-    auto file = std::make_shared<BlobFileMeta>(
-        builder.first->GetNumber(), builder.first->GetFile()->GetFileSize(), 0,
-        0, builder.second->GetSmallestKey(), builder.second->GetLargestKey());
-    file->set_live_data_size(builder.second->live_data_size());
-    file->FileStateTransit(BlobFileMeta::FileEvent::kGCOutput);
-    RecordInHistogram(statistics(stats_), TITAN_GC_OUTPUT_FILE_SIZE,
-                      file->file_size());
-    if (!tmp.empty()) {
-      tmp.append(" ");
-    }
-    tmp.append(std::to_string(file->file_number()));
-    files.emplace_back(std::make_pair(file, std::move(builder.first)));
   }
   if (s.ok()) {
+    std::vector<std::pair<std::shared_ptr<BlobFileMeta>,
+                          std::unique_ptr<BlobFileHandle>>>
+        files;
+    std::string tmp;
+    for (auto& builder : blob_file_builders_) {
+      auto file = std::make_shared<BlobFileMeta>(
+          builder.first->GetNumber(), builder.first->GetFile()->GetFileSize(),
+          0, 0, builder.second->GetSmallestKey(),
+          builder.second->GetLargestKey());
+      file->set_live_data_size(builder.second->live_data_size());
+      file->FileStateTransit(BlobFileMeta::FileEvent::kGCOutput);
+      RecordInHistogram(statistics(stats_), TITAN_GC_OUTPUT_FILE_SIZE,
+                        file->file_size());
+      if (!tmp.empty()) {
+        tmp.append(" ");
+      }
+      tmp.append(std::to_string(file->file_number()));
+      files.emplace_back(std::make_pair(file, std::move(builder.first)));
+    }
     ROCKS_LOG_BUFFER(log_buffer_, "[%s] output[%s]",
                      blob_gc_->column_family_handle()->GetName().c_str(),
                      tmp.c_str());
@@ -430,9 +458,9 @@ Status BlobGCJob::InstallOutputBlobFiles() {
                      "files: %s",
                      blob_gc_->column_family_handle()->GetName().c_str(),
                      to_delete_files.c_str());
-    // Do not set status `s` here, cause it may override the non-okay-status of
-    // `s` so that in the outer funcation it will rewrite blob indexes to LSM by
-    // mistake.
+    // Do not set status `s` here, because it may override the non-OK status
+    // of `s`, in which case the outer function would rewrite blob indexes to
+    // the LSM by mistake.
     Status status = blob_file_manager_->BatchDeleteFiles(handles);
     if (!status.ok()) {
      ROCKS_LOG_WARN(db_options_.info_log,
@@ -440,6 +468,7 @@ Status BlobGCJob::InstallOutputBlobFiles() {
                     to_delete_files.c_str(), status.ToString().c_str());
     }
   }
+
   return s;
 }
...
@@ -90,9 +90,10 @@ class BlobGCJob {
   uint64_t io_bytes_written_ = 0;
 
   Status DoRunGC();
-  Status BuildIterator(std::unique_ptr<BlobFileMergeIterator>* result);
-  Status DiscardEntry(const Slice& key, const BlobIndex& blob_index,
-                      bool* discardable);
+  void BatchWriteNewIndices(BlobFileBuilder::OutContexts &contexts, Status *s);
+  Status BuildIterator(std::unique_ptr<BlobFileMergeIterator> *result);
+  Status DiscardEntry(const Slice &key, const BlobIndex &blob_index,
+                      bool *discardable);
   Status InstallOutputBlobFiles();
   Status RewriteValidKeyToLSM();
   Status DeleteInputBlobFiles();
...
#include "rocksdb/convenience.h"
#include "test_util/testharness.h"
#include "blob_gc_job.h" #include "blob_gc_job.h"
#include "blob_gc_picker.h" #include "blob_gc_picker.h"
#include "db_impl.h" #include "db_impl.h"
#include "rocksdb/convenience.h"
#include "test_util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
@@ -5,11 +5,23 @@
 #endif
 
 #include <inttypes.h>
 
 #include "monitoring/statistics.h"
 
 namespace rocksdb {
 namespace titandb {
 
+std::unique_ptr<BlobFileBuilder::BlobRecordContext>
+TitanTableBuilder::NewCachedRecordContext(const ParsedInternalKey& ikey,
+                                          const Slice& value) {
+  std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx(
+      new BlobFileBuilder::BlobRecordContext);
+  AppendInternalKey(&ctx->key, ikey);
+  ctx->has_value = true;
+  ctx->value = value.ToString();
+  return ctx;
+}
+
 void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
   if (!ok()) return;
@@ -42,6 +54,7 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
       ikey.type = kTypeValue;
       std::string index_key;
       AppendInternalKey(&index_key, ikey);
+      assert(blob_builder_ == nullptr);
      base_builder_->Add(index_key, record.value);
       bytes_read_ += record.size();
     } else {
@@ -49,21 +62,27 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
       // deleted. In this case we write the blob index as is to compaction
       // output.
       // TODO: return error if it is indeed an error.
+      assert(blob_builder_ == nullptr);
       base_builder_->Add(key, value);
     }
   } else if (ikey.type == kTypeValue &&
-             value.size() >= cf_options_.min_blob_size &&
             cf_options_.blob_run_mode == TitanBlobRunMode::kNormal) {
-    // we write to blob file and insert index
-    std::string index_value;
-    AddBlob(ikey.user_key, value, &index_value);
-    UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
-                  &io_bytes_written_);
-    if (ok()) {
-      ikey.type = kTypeBlobIndex;
-      std::string index_key;
-      AppendInternalKey(&index_key, ikey);
-      base_builder_->Add(index_key, index_value);
+    bool is_small_kv = value.size() < cf_options_.min_blob_size;
+    if (is_small_kv) {
+      if (builder_unbuffered()) {
+        // We can append this to the SST safely, without ordering issues.
+        base_builder_->Add(key, value);
+      } else {
+        // We have to let the builder cache this KV pair; it will be returned
+        // when the builder state changes.
+        std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx =
+            NewCachedRecordContext(ikey, value);
+        blob_builder_->AddSmall(std::move(ctx));
+      }
+      return;
+    } else {
+      // We write to the blob file and insert the index.
+      AddBlob(ikey, value);
     }
   } else if (ikey.type == kTypeBlobIndex && cf_options_.level_merge &&
             target_level_ >= merge_level_ &&
@@ -86,17 +105,8 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
       // If not ok, write original blob index as compaction output without
       // doing level merge.
       if (get_status.ok()) {
-        std::string index_value;
-        AddBlob(ikey.user_key, record.value, &index_value);
-        UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
-                      &io_bytes_written_);
-        if (ok()) {
-          std::string index_key;
-          ikey.type = kTypeBlobIndex;
-          AppendInternalKey(&index_key, ikey);
-          base_builder_->Add(index_key, index_value);
-          return;
-        }
+        AddBlob(ikey, record.value);
+        if (ok()) return;
       } else {
         ++error_read_cnt_;
         ROCKS_LOG_DEBUG(db_options_.info_log,
@@ -104,18 +114,37 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
                         index.file_number, get_status.ToString().c_str());
       }
     }
-    base_builder_->Add(key, value);
+    if (builder_unbuffered()) {
+      base_builder_->Add(key, value);
+    } else {
+      std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx =
+          NewCachedRecordContext(ikey, value);
+      blob_builder_->AddSmall(std::move(ctx));
+    }
   } else {
+    assert(builder_unbuffered());
     base_builder_->Add(key, value);
   }
 }
-void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
-                                std::string* index_value) {
+void TitanTableBuilder::AddBlob(const ParsedInternalKey& ikey,
+                                const Slice& value) {
   if (!ok()) return;
 
+  BlobRecord record;
+  record.key = ikey.user_key;
+  record.value = value;
+
+  uint64_t prev_bytes_read = 0;
+  uint64_t prev_bytes_written = 0;
+  SavePrevIOBytes(&prev_bytes_read, &prev_bytes_written);
+
+  BlobFileBuilder::OutContexts contexts;
+
   StopWatch write_sw(db_options_.env, statistics(stats_),
                      TITAN_BLOB_FILE_WRITE_MICROS);
+
+  // Initialize blob_builder_ first.
   if (!blob_builder_) {
     status_ = blob_manager_->NewFile(&blob_handle_);
     if (!ok()) return;
@@ -127,25 +156,58 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
   }
 
   RecordTick(statistics(stats_), TITAN_BLOB_FILE_NUM_KEYS_WRITTEN);
-  RecordInHistogram(statistics(stats_), TITAN_KEY_SIZE, key.size());
-  RecordInHistogram(statistics(stats_), TITAN_VALUE_SIZE, value.size());
-  AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_SIZE, value.size());
-  bytes_written_ += key.size() + value.size();
+  RecordInHistogram(statistics(stats_), TITAN_KEY_SIZE, record.key.size());
+  RecordInHistogram(statistics(stats_), TITAN_VALUE_SIZE, record.value.size());
+  AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_SIZE,
+           record.value.size());
+  bytes_written_ += record.key.size() + record.value.size();
 
-  BlobIndex index;
-  BlobRecord record;
-  record.key = key;
-  record.value = value;
-  index.file_number = blob_handle_->GetNumber();
-  blob_builder_->Add(record, &index.blob_handle);
-  RecordTick(statistics(stats_), TITAN_BLOB_FILE_BYTES_WRITTEN,
-             index.blob_handle.size);
-  bytes_written_ += record.size();
-  if (ok()) {
-    index.EncodeTo(index_value);
-    if (blob_handle_->GetFile()->GetFileSize() >=
-        cf_options_.blob_file_target_size) {
-      FinishBlobFile();
-    }
+  std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx(
+      new BlobFileBuilder::BlobRecordContext);
+  AppendInternalKey(&ctx->key, ikey);
+  ctx->new_blob_index.file_number = blob_handle_->GetNumber();
+  blob_builder_->Add(record, std::move(ctx), &contexts);
+
+  UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
+                &io_bytes_written_);
+
+  if (blob_handle_->GetFile()->GetFileSize() >=
+      cf_options_.blob_file_target_size) {
+    // If the blob file hits the size limit, we have to finish it. In this
+    // case, when `BlobFileBuilder::Finish` is called, the builder will be in
+    // the unbuffered state, so it will not trigger another `AddToBaseTable`
+    // call.
+    FinishBlobFile();
+  }
+
+  AddToBaseTable(contexts);
+}
+
+void TitanTableBuilder::AddToBaseTable(
+    const BlobFileBuilder::OutContexts& contexts) {
+  if (contexts.empty()) return;
+  for (const std::unique_ptr<BlobFileBuilder::BlobRecordContext>& ctx :
+       contexts) {
+    ParsedInternalKey ikey;
+    if (!ParseInternalKey(ctx->key, &ikey)) {
+      status_ = Status::Corruption(Slice());
+      return;
+    }
+    if (ctx->has_value) {
+      // Write directly to the base table.
+      base_builder_->Add(ctx->key, ctx->value);
+    } else {
+      RecordTick(statistics(stats_), TITAN_BLOB_FILE_BYTES_WRITTEN,
+                 ctx->new_blob_index.blob_handle.size);
+      bytes_written_ += ctx->new_blob_index.blob_handle.size;
+      if (ok()) {
+        std::string index_value;
+        ctx->new_blob_index.EncodeTo(&index_value);
+        ikey.type = kTypeBlobIndex;
+        std::string index_key;
+        AppendInternalKey(&index_key, ikey);
+        base_builder_->Add(index_key, index_value);
+      }
+    }
   }
 }
@@ -155,11 +217,14 @@ void TitanTableBuilder::FinishBlobFile() {
     uint64_t prev_bytes_read = 0;
     uint64_t prev_bytes_written = 0;
     SavePrevIOBytes(&prev_bytes_read, &prev_bytes_written);
-    blob_builder_->Finish();
+    Status s;
+    BlobFileBuilder::OutContexts contexts;
+    s = blob_builder_->Finish(&contexts);
     UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
                   &io_bytes_written_);
-    if (ok()) {
+    AddToBaseTable(contexts);
+
+    if (s.ok() && ok()) {
       ROCKS_LOG_INFO(db_options_.info_log,
                      "Titan table builder finish output file %" PRIu64 ".",
                      blob_handle_->GetNumber());
@@ -192,8 +257,11 @@ Status TitanTableBuilder::status() const {
 }
 
 Status TitanTableBuilder::Finish() {
-  base_builder_->Finish();
   FinishBlobFile();
+  // `FinishBlobFile()` may change the builder state from `kBuffered` to
+  // `kUnbuffered`; in that case the related blob handles are updated, so
+  // `base_builder_->Finish()` has to be called after `FinishBlobFile()`.
+  base_builder_->Finish();
   status_ = blob_manager_->BatchFinishFiles(cf_id_, finished_blobs_);
   if (!status_.ok()) {
     ROCKS_LOG_ERROR(db_options_.info_log,
...
#pragma once

#include "blob_file_builder.h"
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "rocksdb/types.h"
#include "table/table_builder.h"
#include "titan/options.h"
#include "titan_stats.h"
@@ -51,7 +50,17 @@ class TitanTableBuilder : public TableBuilder {
  bool ok() const { return status().ok(); }

  bool builder_unbuffered() const {
    return !blob_builder_ || blob_builder_->GetBuilderState() ==
                                 BlobFileBuilder::BuilderState::kUnbuffered;
  }

  std::unique_ptr<BlobFileBuilder::BlobRecordContext> NewCachedRecordContext(
      const ParsedInternalKey& ikey, const Slice& value);

  void AddBlob(const ParsedInternalKey& ikey, const Slice& value);

  void AddToBaseTable(const BlobFileBuilder::OutContexts& contexts);

  bool ShouldMerge(const std::shared_ptr<BlobFileMeta>& file);
......
#include "table/table_builder.h" #include "table/table_builder.h"
#include "file/filename.h"
#include "table/table_reader.h"
#include "test_util/testharness.h"
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "blob_file_set.h" #include "blob_file_set.h"
#include "db_impl.h" #include "db_impl.h"
#include "file/filename.h"
#include "table/table_reader.h"
#include "table_builder.h" #include "table_builder.h"
#include "table_factory.h" #include "table_factory.h"
#include "test_util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -245,6 +245,13 @@ class TableBuilderTest : public testing::Test { ...@@ -245,6 +245,13 @@ class TableBuilderTest : public testing::Test {
std::unique_ptr<TableBuilder>* result, std::unique_ptr<TableBuilder>* result,
int target_level = 0) { int target_level = 0) {
CompressionOptions compression_opts; CompressionOptions compression_opts;
NewTableBuilder(file, result, compression_opts, target_level);
}
void NewTableBuilder(WritableFileWriter* file,
std::unique_ptr<TableBuilder>* result,
CompressionOptions compression_opts,
int target_level = 0) {
TableBuilderOptions options(cf_ioptions_, cf_moptions_, TableBuilderOptions options(cf_ioptions_, cf_moptions_,
cf_ioptions_.internal_comparator, &collectors_, cf_ioptions_.internal_comparator, &collectors_,
kNoCompression, 0 /*sample_for_compression*/, kNoCompression, 0 /*sample_for_compression*/,
...@@ -359,6 +366,111 @@ TEST_F(TableBuilderTest, Basic) { ...@@ -359,6 +366,111 @@ TEST_F(TableBuilderTest, Basic) {
} }
} }
TEST_F(TableBuilderTest, DictCompress) {
#if ZSTD_VERSION_NUMBER >= 10103
  CompressionOptions compression_opts;
  compression_opts.enabled = true;
  compression_opts.max_dict_bytes = 4000;
  cf_options_.blob_file_compression_options = compression_opts;
  cf_options_.compression = kZSTD;
  table_factory_.reset(new TitanTableFactory(
      db_options_, cf_options_, db_impl_.get(), blob_manager_, &mutex_,
      blob_file_set_.get(), nullptr));

  std::unique_ptr<WritableFileWriter> base_file;
  NewBaseFileWriter(&base_file);
  std::unique_ptr<TableBuilder> table_builder;
  NewTableBuilder(base_file.get(), &table_builder);

  // Build a base table and a blob file.
  const int n = 100;
  for (char i = 0; i < n; i++) {
    std::string key(1, i);
    InternalKey ikey(key, 1, kTypeValue);
    std::string value(kMinBlobSize, i);
    table_builder->Add(ikey.Encode(), value);
  }
  ASSERT_OK(table_builder->Finish());
  ASSERT_OK(base_file->Sync(true));
  ASSERT_OK(base_file->Close());

  BlobFileExists(true);

  std::unique_ptr<TableReader> base_reader;
  NewTableReader(base_name_, &base_reader);

  std::unique_ptr<BlobFileReader> blob_reader;
  std::unique_ptr<RandomAccessFileReader> file;
  NewFileReader(blob_name_, &file);
  uint64_t file_size = 0;
  ASSERT_OK(env_->GetFileSize(blob_name_, &file_size));
  Status stat = BlobFileReader::Open(cf_options_, std::move(file), file_size,
                                     &blob_reader, nullptr);
  // Use a gtest assertion rather than assert(), which is compiled out in
  // release (NDEBUG) builds and would let a regression pass silently.
  ASSERT_EQ(stat.code(), Status::kNotSupported);
#endif
}
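For context, the rejection asserted above comes from a header-flag check on open. A condensed sketch of that gate (not the literal Titan implementation; the status message wording is illustrative):

// Sketch: BlobFileReader::Open decodes the header and bails out when the
// dictionary flag is set, since blob decompression with a dictionary is not
// implemented yet.
BlobFileHeader header;
// ... decode header from the first bytes of the file ...
if (header.flags & BlobFileHeader::kHasUncompressionDictionary) {
  return Status::NotSupported(
      "blob file with dictionary compression is not readable yet");
}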
TEST_F(TableBuilderTest, DictCompressDisorder) {
#if ZSTD_VERSION_NUMBER >= 10103
  CompressionOptions compression_opts;
  compression_opts.enabled = true;
  compression_opts.max_dict_bytes = 4000;
  cf_options_.blob_file_compression_options = compression_opts;
  cf_options_.compression = kZSTD;
  table_factory_.reset(new TitanTableFactory(
      db_options_, cf_options_, db_impl_.get(), blob_manager_, &mutex_,
      blob_file_set_.get(), nullptr));

  std::unique_ptr<WritableFileWriter> base_file;
  NewBaseFileWriter(&base_file);
  std::unique_ptr<TableBuilder> table_builder;
  NewTableBuilder(base_file.get(), &table_builder);

  // Build a base table and a blob file. Small values stay inline while large
  // ones are buffered for dictionary sampling, so this exercises key order.
  const int n = 100;
  for (char i = 0; i < n; i++) {
    std::string key(1, i);
    InternalKey ikey(key, 1, kTypeValue);
    std::string value;
    if (i % 2 == 0) {
      value = std::string(1, i);
    } else {
      value = std::string(kMinBlobSize, i);
    }
    table_builder->Add(ikey.Encode(), value);
  }
  ASSERT_OK(table_builder->Finish());
  ASSERT_OK(base_file->Sync(true));
  ASSERT_OK(base_file->Close());

  std::unique_ptr<TableReader> base_reader;
  NewTableReader(base_name_, &base_reader);

  ReadOptions ro;
  std::unique_ptr<InternalIterator> iter;
  iter.reset(base_reader->NewIterator(ro, nullptr /*prefix_extractor*/,
                                      nullptr /*arena*/,
                                      false /*skip_filters*/,
                                      TableReaderCaller::kUncategorized));
  iter->SeekToFirst();
  for (char i = 0; i < n; i++) {
    ASSERT_TRUE(iter->Valid());
    std::string key(1, i);
    ParsedInternalKey ikey;
    ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
    // check order
    ASSERT_EQ(ikey.user_key, key);
    if (i % 2 == 0) {
      ASSERT_EQ(ikey.type, kTypeValue);
      ASSERT_EQ(iter->value(), std::string(1, i));
    } else {
      ASSERT_EQ(ikey.type, kTypeBlobIndex);
      // TODO: reading is not implemented yet
    }
    iter->Next();
  }
#endif
}
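Both tests opt into dictionary mode via `max_dict_bytes`. Per the commit description, the builder buffers sample records and trains a zstd dictionary once enough bytes accumulate. A standalone sketch of that training step using the raw zdict API (RocksDB wraps this differently; the helper name `TrainDictionary` and the fallback policy are ours):

#include <zdict.h>

#include <string>
#include <vector>

// Train a dictionary of at most max_dict_bytes from sampled records.
// Returns an empty string on failure, letting the caller fall back to
// plain, dictionary-less compression.
std::string TrainDictionary(const std::vector<std::string>& samples,
                            size_t max_dict_bytes) {
  std::string sample_data;           // all samples, concatenated
  std::vector<size_t> sample_sizes;  // length of each sample
  for (const auto& s : samples) {
    sample_data.append(s);
    sample_sizes.push_back(s.size());
  }
  std::string dict(max_dict_bytes, '\0');
  size_t dict_len = ZDICT_trainFromBuffer(
      &dict[0], dict.size(), sample_data.data(), sample_sizes.data(),
      static_cast<unsigned>(sample_sizes.size()));
  if (ZDICT_isError(dict_len)) return std::string();
  dict.resize(dict_len);
  return dict;
}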
TEST_F(TableBuilderTest, NoBlob) {
  std::unique_ptr<WritableFileWriter> base_file;
  NewBaseFileWriter(&base_file);
@@ -568,6 +680,103 @@ TEST_F(TableBuilderTest, LevelMerge) {
  env_->DeleteFile(second_base_name);
}
// Write blob indexes to verify that key order stays correct with dictionary
// compression enabled.
TEST_F(TableBuilderTest, LevelMergeWithDictCompressDisorder) {
#if ZSTD_VERSION_NUMBER >= 10103
  cf_options_.level_merge = true;
  table_factory_.reset(new TitanTableFactory(
      db_options_, cf_options_, db_impl_.get(), blob_manager_, &mutex_,
      blob_file_set_.get(), nullptr));
  std::unique_ptr<WritableFileWriter> base_file;
  NewBaseFileWriter(&base_file);
  std::unique_ptr<TableBuilder> table_builder;
  NewTableBuilder(base_file.get(), &table_builder);

  // Generate a level-0 SST with a blob file.
  int n = 100;
  for (unsigned char i = 0; i < n; i++) {
    std::string key(1, i);
    InternalKey ikey(key, 1, kTypeValue);
    if (i % 2 == 0) {
      // keys: 0, 2, 4, ..., 98
      std::string value(kMinBlobSize, i);
      table_builder->Add(ikey.Encode(), value);
    } else {
      // keys: 1, 3, 5, ..., 99
      std::string value(1, i);
      table_builder->Add(ikey.Encode(), value);
    }
  }
  ASSERT_OK(table_builder->Finish());
  ASSERT_OK(base_file->Sync(true));
  ASSERT_OK(base_file->Close());

  std::unique_ptr<TableReader> base_reader;
  NewTableReader(base_name_, &base_reader);
  ReadOptions ro;
  std::unique_ptr<InternalIterator> first_iter;
  first_iter.reset(base_reader->NewIterator(
      ro, nullptr /*prefix_extractor*/, nullptr /*arena*/,
      false /*skip_filters*/, TableReaderCaller::kUncategorized));

  // Base file of the last-level SST.
  std::string second_base_name = base_name_ + "second";
  NewFileWriter(second_base_name, &base_file);
  CompressionOptions compression_opts;
  compression_opts.enabled = true;
  compression_opts.max_dict_bytes = 4000;
  cf_options_.blob_file_compression_options = compression_opts;
  cf_options_.compression = kZSTD;
  NewTableBuilder(base_file.get(), &table_builder, compression_opts,
                  cf_options_.num_levels - 1);

  first_iter->SeekToFirst();
  // Compact data from level 0 to the last level.
  for (unsigned char i = 0; i < n; i++) {
    ASSERT_TRUE(first_iter->Valid());
    table_builder->Add(first_iter->key(), first_iter->value());
    first_iter->Next();
  }
  ASSERT_OK(table_builder->Finish());
  ASSERT_OK(base_file->Sync(true));
  ASSERT_OK(base_file->Close());

  std::unique_ptr<TableReader> second_base_reader;
  NewTableReader(second_base_name, &second_base_reader);
  std::unique_ptr<InternalIterator> second_iter;
  second_iter.reset(second_base_reader->NewIterator(
      ro, nullptr /*prefix_extractor*/, nullptr /*arena*/,
      false /*skip_filters*/, TableReaderCaller::kUncategorized));

  first_iter->SeekToFirst();
  second_iter->SeekToFirst();
  // Check that both files yield the same keys in the same order.
  for (unsigned char i = 0; i < n; i++) {
    ASSERT_TRUE(second_iter->Valid());
    ASSERT_TRUE(first_iter->Valid());
    // Compare SST keys. The second key must come from second_iter; parsing
    // first_iter->key() twice would make the comparison vacuous.
    ParsedInternalKey first_ikey, second_ikey;
    ASSERT_TRUE(ParseInternalKey(first_iter->key(), &first_ikey));
    ASSERT_TRUE(ParseInternalKey(second_iter->key(), &second_ikey));
    ASSERT_EQ(first_ikey.type, second_ikey.type);
    ASSERT_EQ(first_ikey.user_key, second_ikey.user_key);
    // TODO: compare blob records; decompression must be implemented first.
    first_iter->Next();
    second_iter->Next();
  }

  env_->DeleteFile(second_base_name);
#endif
}
}  // namespace titandb
}  // namespace rocksdb
......
@@ -2953,7 +2953,8 @@ class StressTest {
            1024 * 1024 * 1024;
      }
      fprintf(stdout,
              "Create Titan column family %s with min_blob_size %" PRIu64
              "\n",
              cfd.name.c_str(),
              titan_cf_descriptors.back().options.min_blob_size);
    }
@@ -3236,9 +3237,11 @@ class NonBatchedOpsStressTest : public StressTest {
      if (new_column_family_name_ % 2 == 0) {
        tmp.back().options.min_blob_size = 1024 * 1024 * 1024;
      }
      fprintf(
          stdout,
          "recreate Titan column family %s with min_blob_size %" PRIu64 "\n",
          new_name.c_str(), tmp.back().options.min_blob_size);
    }
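Both fprintf changes replace "%lu", which is only correct where unsigned long is 64 bits, with the portable PRIu64 macro for uint64_t. A minimal standalone illustration:

#include <cinttypes>  // PRIu64
#include <cstdint>
#include <cstdio>

int main() {
  uint64_t min_blob_size = 1024ull * 1024 * 1024;
  // "%lu" breaks on platforms where unsigned long is 32-bit (e.g. 64-bit
  // Windows); PRIu64 expands to the right conversion specifier everywhere.
  std::printf("min_blob_size %" PRIu64 "\n", min_blob_size);
  return 0;
}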
    std::vector<ColumnFamilyHandle*> result;
    s = tdb->CreateColumnFamilies(tmp, &result);
......