Unverified Commit 73ed3a9f authored by Wu Jiayu, committed by GitHub

Implement basic level merge for blob files (#66)

* In the table builder, if the target level is one of the last two levels, merge values from lower-level blob files into new blob files.
Signed-off-by: wujy-cs <wujy.cs@gmail.com>
parent 144e20f0
......@@ -124,6 +124,18 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// Default: kNormal
TitanBlobRunMode blob_run_mode{TitanBlobRunMode::kNormal};
// If set to true, values in blob files will be merged into a new blob file
// while their corresponding keys are compacted to the last two levels of the
// LSM tree.
//
// With this feature enabled, Titan can achieve better scan performance and
// better write performance during GC, but will incur about 1.1 space
// amplification and around 3 extra write amplification when no GC is needed
// (e.g. uniformly distributed keys) under default RocksDB settings.
//
// Requirement: level_compaction_dynamic_level_bytes = true
// Default: false
bool level_merge{false};
TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options)
: ColumnFamilyOptions(options) {}
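For reference, a minimal usage sketch of the new option (assuming Titan's public TitanDB::Open API; the header path and database path here are illustrative, not part of this change):

#include "titan/db.h"  // assumed public header for rocksdb::titandb::TitanDB

int main() {
  // TitanOptions inherits both TitanDBOptions and TitanCFOptions.
  rocksdb::titandb::TitanOptions options;
  options.create_if_missing = true;
  // level_merge requires dynamic level sizing; Open() validates this.
  options.level_compaction_dynamic_level_bytes = true;
  options.level_merge = true;

  rocksdb::titandb::TitanDB* db = nullptr;
  rocksdb::Status s =
      rocksdb::titandb::TitanDB::Open(options, "/tmp/titan_example", &db);
  if (s.ok()) delete db;
  return 0;
}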
......@@ -151,7 +163,8 @@ struct ImmutableTitanCFOptions {
min_gc_batch_size(opts.min_gc_batch_size),
blob_file_discardable_ratio(opts.blob_file_discardable_ratio),
sample_file_size_ratio(opts.sample_file_size_ratio),
merge_small_file_threshold(opts.merge_small_file_threshold) {}
merge_small_file_threshold(opts.merge_small_file_threshold),
level_merge(opts.level_merge) {}
uint64_t min_blob_size;
......@@ -170,6 +183,8 @@ struct ImmutableTitanCFOptions {
double sample_file_size_ratio;
uint64_t merge_small_file_threshold;
bool level_merge;
};
struct MutableTitanCFOptions {
......
......@@ -25,6 +25,7 @@ void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) {
status_ = file_->Append(encoder_.GetHeader());
if (ok()) {
status_ = file_->Append(encoder_.GetRecord());
num_entries_++;
// The keys added into blob files are in order.
if (smallest_key_.empty()) {
smallest_key_.assign(record.key.data(), record.key.size());
......@@ -54,5 +55,7 @@ Status BlobFileBuilder::Finish() {
void BlobFileBuilder::Abandon() {}
uint64_t BlobFileBuilder::NumEntries() { return num_entries_; }
} // namespace titandb
} // namespace rocksdb
......@@ -54,6 +54,9 @@ class BlobFileBuilder {
// REQUIRES: Finish(), Abandon() have not been called.
void Abandon();
// Number of calls to Add() so far.
uint64_t NumEntries();
const std::string& GetSmallestKey() { return smallest_key_; }
const std::string& GetLargestKey() { return largest_key_; }
......@@ -66,6 +69,7 @@ class BlobFileBuilder {
Status status_;
BlobEncoder encoder_;
uint64_t num_entries_{0};
std::string smallest_key_;
std::string largest_key_;
};
......
......@@ -169,7 +169,9 @@ Status BlobFileMeta::DecodeFrom(Slice* src) {
bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs) {
return (lhs.file_number_ == rhs.file_number_ &&
lhs.file_size_ == rhs.file_size_);
lhs.file_size_ == rhs.file_size_ &&
lhs.file_entries_ == rhs.file_entries_ &&
lhs.file_level_ == rhs.file_level_);
}
void BlobFileMeta::FileStateTransit(const FileEvent& event) {
......
......@@ -181,6 +181,7 @@ class BlobFileMeta {
};
BlobFileMeta() = default;
BlobFileMeta(uint64_t _file_number, uint64_t _file_size,
uint64_t _file_entries, uint32_t _file_level,
const std::string& _smallest_key,
......
......@@ -156,17 +156,27 @@ void TitanDBImpl::StartBackgroundTasks() {
}
}
Status TitanDBImpl::ValidateOptions() const {
if (db_options_.purge_obsolete_files_period_sec == 0) {
Status TitanDBImpl::ValidateOptions(
const TitanDBOptions& options,
const std::vector<TitanCFDescriptor>& column_families) const {
if (options.purge_obsolete_files_period_sec == 0) {
return Status::InvalidArgument(
"Require non-zero purge_obsolete_files_period_sec");
}
for (const auto& cf : column_families) {
if (cf.options.level_merge &&
!cf.options.level_compaction_dynamic_level_bytes) {
return Status::InvalidArgument(
"Require enabling level_compaction_dynamic_level_bytes for "
"level_merge");
}
}
return Status::OK();
}
Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles) {
Status s = ValidateOptions();
Status s = ValidateOptions(db_options_, descs);
if (!s.ok()) {
return s;
}
......
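A hedged sketch of the behavior this validation enforces (assuming the descriptor-based TitanDB::Open overload; names and paths are illustrative):

#include <cassert>
#include <vector>

#include "titan/db.h"  // assumed public Titan header; path may differ

int main() {
  rocksdb::titandb::TitanCFOptions cf_opts;
  cf_opts.level_merge = true;
  cf_opts.level_compaction_dynamic_level_bytes = false;  // violates requirement

  std::vector<rocksdb::titandb::TitanCFDescriptor> descs{{"default", cf_opts}};
  std::vector<rocksdb::ColumnFamilyHandle*> handles;
  rocksdb::titandb::TitanDB* db = nullptr;
  rocksdb::Status s = rocksdb::titandb::TitanDB::Open(
      rocksdb::titandb::TitanDBOptions(), "/tmp/titan_example", descs,
      &handles, &db);
  assert(s.IsInvalidArgument());  // rejected by ValidateOptions above
  return 0;
}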
......@@ -145,7 +145,9 @@ class TitanDBImpl : public TitanDB {
friend class TitanDBTest;
friend class TitanThreadSafetyTest;
Status ValidateOptions() const;
Status ValidateOptions(
const TitanDBOptions& options,
const std::vector<TitanCFDescriptor>& column_families) const;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* handle,
const Slice& key, PinnableSlice* value);
......
......@@ -68,6 +68,36 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
AppendInternalKey(&index_key, ikey);
base_builder_->Add(index_key, index_value);
}
} else if (ikey.type == kTypeBlobIndex && cf_options_.level_merge &&
target_level_ >= merge_level_ &&
cf_options_.blob_run_mode == TitanBlobRunMode::kNormal) {
// Merge the value into a new blob file at the target level.
BlobIndex index;
Slice copy = value;
status_ = index.DecodeFrom(&copy);
if (!ok()) {
return;
}
auto storage = blob_storage_.lock();
assert(storage != nullptr);
auto blob_file = storage->FindFile(index.file_number).lock();
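// Rewrite the record only when its blob file sits at a lower level than
// the level this SST is being built for (see ShouldMerge).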
if (ShouldMerge(blob_file)) {
BlobRecord record;
PinnableSlice buffer;
Status s = storage->Get(ReadOptions(), index, &record, &buffer);
if (s.ok()) {
std::string index_value;
AddBlob(ikey.user_key, record.value, &index_value);
if (ok()) {
std::string index_key;
ikey.type = kTypeBlobIndex;
AppendInternalKey(&index_key, ikey);
base_builder_->Add(index_key, index_value);
return;
}
}
}
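// Fall through: keep the original blob index when the file needs no merge
// or reading the blob failed.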
base_builder_->Add(key, value);
} else {
base_builder_->Add(key, value);
}
......@@ -105,22 +135,14 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
bytes_written_ += record.size();
if (ok()) {
index.EncodeTo(index_value);
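// Roll over to a new blob file once the current one reaches the target
// size; the finished file is queued in finished_blobs_ until Finish().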
if (blob_handle_->GetFile()->GetFileSize() >=
cf_options_.blob_file_target_size) {
FinishBlob();
}
}
}
Status TitanTableBuilder::status() const {
Status s = status_;
if (s.ok()) {
s = base_builder_->status();
}
if (s.ok() && blob_builder_) {
s = blob_builder_->status();
}
return s;
}
Status TitanTableBuilder::Finish() {
base_builder_->Finish();
void TitanTableBuilder::FinishBlob() {
if (blob_builder_) {
blob_builder_->Finish();
if (ok()) {
......@@ -128,11 +150,12 @@ Status TitanTableBuilder::Finish() {
"Titan table builder finish output file %" PRIu64 ".",
blob_handle_->GetNumber());
std::shared_ptr<BlobFileMeta> file = std::make_shared<BlobFileMeta>(
blob_handle_->GetNumber(), blob_handle_->GetFile()->GetFileSize(), 0,
0, blob_builder_->GetSmallestKey(), blob_builder_->GetLargestKey());
blob_handle_->GetNumber(), blob_handle_->GetFile()->GetFileSize(),
blob_builder_->NumEntries(), target_level_,
blob_builder_->GetSmallestKey(), blob_builder_->GetLargestKey());
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushOrCompactionOutput);
status_ =
blob_manager_->FinishFile(cf_id_, file, std::move(blob_handle_));
finished_blobs_.push_back({file, std::move(blob_handle_)});
blob_builder_.reset();
} else {
ROCKS_LOG_WARN(
db_options_.info_log,
......@@ -141,6 +164,23 @@ Status TitanTableBuilder::Finish() {
status_ = blob_manager_->DeleteFile(std::move(blob_handle_));
}
}
}
Status TitanTableBuilder::status() const {
Status s = status_;
if (s.ok()) {
s = base_builder_->status();
}
if (s.ok() && blob_builder_) {
s = blob_builder_->status();
}
return s;
}
Status TitanTableBuilder::Finish() {
base_builder_->Finish();
FinishBlob();
status_ = blob_manager_->BatchFinishFiles(cf_id_, finished_blobs_);
if (!status_.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Titan table builder failed on finish: %s",
......@@ -178,6 +218,11 @@ TableProperties TitanTableBuilder::GetTableProperties() const {
return base_builder_->GetTableProperties();
}
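// A blob file qualifies for merging only if it still exists and its recorded
// level is below the level of the SST being built.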
bool TitanTableBuilder::ShouldMerge(
const std::shared_ptr<rocksdb::titandb::BlobFileMeta>& file) {
return file != nullptr &&
static_cast<int>(file->file_level()) < target_level_;
}
void TitanTableBuilder::UpdateInternalOpStats() {
if (stats_ == nullptr) {
return;
......@@ -187,7 +232,7 @@ void TitanTableBuilder::UpdateInternalOpStats() {
return;
}
InternalOpType op_type = InternalOpType::COMPACTION;
if (level_ == 0) {
if (target_level_ == 0) {
op_type = InternalOpType::FLUSH;
}
InternalOpStats* internal_op_stats =
......
......@@ -16,16 +16,17 @@ class TitanTableBuilder : public TableBuilder {
const TitanCFOptions& cf_options,
std::unique_ptr<TableBuilder> base_builder,
std::shared_ptr<BlobFileManager> blob_manager,
std::weak_ptr<BlobStorage> blob_storage, int level,
TitanStats* stats)
std::weak_ptr<BlobStorage> blob_storage, TitanStats* stats,
int merge_level, int target_level)
: cf_id_(cf_id),
db_options_(db_options),
cf_options_(cf_options),
base_builder_(std::move(base_builder)),
blob_manager_(blob_manager),
blob_storage_(blob_storage),
level_(level),
stats_(stats) {}
stats_(stats),
target_level_(target_level),
merge_level_(merge_level) {}
void Add(const Slice& key, const Slice& value) override;
......@@ -44,10 +45,16 @@ class TitanTableBuilder : public TableBuilder {
TableProperties GetTableProperties() const override;
private:
friend class TableBuilderTest;
bool ok() const { return status().ok(); }
void AddBlob(const Slice& key, const Slice& value, std::string* index_value);
bool ShouldMerge(const std::shared_ptr<BlobFileMeta>& file);
void FinishBlob();
void UpdateInternalOpStats();
Status status_;
......@@ -59,9 +66,18 @@ class TitanTableBuilder : public TableBuilder {
std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<BlobFileBuilder> blob_builder_;
std::weak_ptr<BlobStorage> blob_storage_;
int level_;
std::vector<
std::pair<std::shared_ptr<BlobFileMeta>, std::unique_ptr<BlobFileHandle>>>
finished_blobs_;
TitanStats* stats_;
// Target level in the LSM tree for the generated SSTs and blob files
int target_level_;
// When cf_options_.level_merge == true and target_level_ is greater than or
// equal to merge_level_, values belonging to blob files at a lower level
// than target_level_ will be merged into a new blob file
int merge_level_;
// counters
uint64_t bytes_read_ = 0;
uint64_t bytes_written_ = 0;
......
......@@ -5,6 +5,7 @@
#include "blob_file_manager.h"
#include "blob_file_reader.h"
#include "table_builder.h"
#include "table_factory.h"
#include "version_set.h"
......@@ -13,13 +14,15 @@ namespace titandb {
const uint64_t kMinBlobSize = 128;
const uint64_t kTestFileNumber = 123;
const uint64_t kTargetBlobFileSize = 4096;
class FileManager : public BlobFileManager {
public:
FileManager(const TitanDBOptions& db_options) : db_options_(db_options) {}
FileManager(const TitanDBOptions& db_options, VersionSet* vset)
: db_options_(db_options), number_(kTestFileNumber), vset_(vset) {}
Status NewFile(std::unique_ptr<BlobFileHandle>* handle) override {
auto number = kTestFileNumber;
auto number = number_.fetch_add(1);
auto name = BlobFileName(db_options_.dirname, number);
std::unique_ptr<WritableFileWriter> file;
{
......@@ -32,11 +35,29 @@ class FileManager : public BlobFileManager {
return Status::OK();
}
Status FinishFile(uint32_t /*cf_id*/, std::shared_ptr<BlobFileMeta> /*file*/,
Status FinishFile(uint32_t /*cf_id*/,
std::shared_ptr<BlobFileMeta> file,
std::unique_ptr<BlobFileHandle>&& handle) override {
Status s = handle->GetFile()->Sync(true);
if (s.ok()) {
s = handle->GetFile()->Close();
auto storage = vset_->GetBlobStorage(0).lock();
storage->AddBlobFile(file);
}
return s;
}
Status BatchFinishFiles(
uint32_t cf_id,
const std::vector<std::pair<std::shared_ptr<BlobFileMeta>,
std::unique_ptr<BlobFileHandle>>>& files)
override {
Status s;
for (auto& file : files) {
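// files is a const reference, so cast away constness to move each handle
// out; acceptable here because this test harness owns the handles.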
s = FinishFile(cf_id, file.first,
const_cast<std::unique_ptr<BlobFileHandle>&&>(
std::move(file.second)));
if (!s.ok()) return s;
}
return s;
}
......@@ -45,6 +66,8 @@ class FileManager : public BlobFileManager {
return env_->DeleteFile(handle->GetName());
}
uint64_t LastBlobNumber() { return number_.load() - 1; }
private:
class FileHandle : public BlobFileHandle {
public:
......@@ -69,6 +92,8 @@ class FileManager : public BlobFileManager {
Env* env_{Env::Default()};
EnvOptions env_options_;
TitanDBOptions db_options_;
std::atomic<uint64_t> number_{0};
VersionSet* vset_;
};
class TableBuilderTest : public testing::Test {
......@@ -82,15 +107,21 @@ class TableBuilderTest : public testing::Test {
db_options_.dirname = tmpdir_;
cf_options_.min_blob_size = kMinBlobSize;
vset_.reset(new VersionSet(db_options_, nullptr));
blob_manager_.reset(new FileManager(db_options_));
std::map<uint32_t, TitanCFOptions> cfs{{0, cf_options_}};
vset_->AddColumnFamilies(cfs);
blob_manager_.reset(new FileManager(db_options_, vset_.get()));
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_,
blob_manager_, &mutex_,
vset_.get(), nullptr));
}
~TableBuilderTest() {
uint64_t last_blob_number =
reinterpret_cast<FileManager*>(blob_manager_.get())->LastBlobNumber();
for (uint64_t i = kTestFileNumber; i <= last_blob_number; i++) {
env_->DeleteFile(BlobFileName(tmpdir_, i));
}
env_->DeleteFile(base_name_);
env_->DeleteFile(blob_name_);
env_->DeleteDir(tmpdir_);
}
......@@ -134,9 +165,10 @@ class TableBuilderTest : public testing::Test {
result, nullptr));
}
void NewTableReader(std::unique_ptr<TableReader>* result) {
void NewTableReader(const std::string& fname,
std::unique_ptr<TableReader>* result) {
std::unique_ptr<RandomAccessFileReader> file;
NewBaseFileReader(&file);
NewFileReader(fname, &file);
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size));
TableReaderOptions options(cf_ioptions_, nullptr, env_options_,
......@@ -146,13 +178,14 @@ class TableBuilderTest : public testing::Test {
}
void NewTableBuilder(WritableFileWriter* file,
std::unique_ptr<TableBuilder>* result) {
std::unique_ptr<TableBuilder>* result,
int target_level = 0) {
CompressionOptions compression_opts;
TableBuilderOptions options(cf_ioptions_, cf_moptions_,
cf_ioptions_.internal_comparator, &collectors_,
kNoCompression, 0 /*sample_for_compression*/,
compression_opts, false /*skip_filters*/,
kDefaultColumnFamilyName, 0 /*level*/);
kDefaultColumnFamilyName, target_level);
result->reset(table_factory_->NewTableBuilder(options, 0, file));
}
......@@ -199,7 +232,7 @@ TEST_F(TableBuilderTest, Basic) {
ASSERT_OK(base_file->Close());
std::unique_ptr<TableReader> base_reader;
NewTableReader(&base_reader);
NewTableReader(base_name_, &base_reader);
std::unique_ptr<BlobFileReader> blob_reader;
NewBlobFileReader(&blob_reader);
......@@ -252,7 +285,7 @@ TEST_F(TableBuilderTest, NoBlob) {
BlobFileExists(false);
std::unique_ptr<TableReader> base_reader;
NewTableReader(&base_reader);
NewTableReader(base_name_, &base_reader);
ReadOptions ro;
std::unique_ptr<InternalIterator> iter;
......@@ -318,6 +351,128 @@ TEST_F(TableBuilderTest, NumEntries) {
ASSERT_OK(table_builder->Finish());
}
// Test that the size of each blob file is around blob_file_target_size after building
TEST_F(TableBuilderTest, TargetSize) {
cf_options_.blob_file_target_size = kTargetBlobFileSize;
table_factory_.reset(new TitanTableFactory(
db_options_, cf_options_, blob_manager_, &mutex_, vset_.get(), nullptr));
std::unique_ptr<WritableFileWriter> base_file;
NewBaseFileWriter(&base_file);
std::unique_ptr<TableBuilder> table_builder;
NewTableBuilder(base_file.get(), &table_builder);
const int n = 255;
for (unsigned char i = 0; i < n; i++) {
std::string key(1, i);
InternalKey ikey(key, 1, kTypeValue);
std::string value(kMinBlobSize, i);
table_builder->Add(ikey.Encode(), value);
}
ASSERT_OK(table_builder->Finish());
uint64_t last_file_number =
reinterpret_cast<FileManager*>(blob_manager_.get())->LastBlobNumber();
for (uint64_t i = kTestFileNumber; i <= last_file_number; i++) {
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(BlobFileName(tmpdir_, i), &file_size));
ASSERT_TRUE(file_size < kTargetBlobFileSize + 1 + kMinBlobSize);
if (i < last_file_number) {
ASSERT_TRUE(file_size > kTargetBlobFileSize - 1 - kMinBlobSize);
}
}
}
// Compact a level-0 file to the last level to verify that level merge is
// functional and correct
TEST_F(TableBuilderTest, LevelMerge) {
cf_options_.level_merge = true;
table_factory_.reset(new TitanTableFactory(
db_options_, cf_options_, blob_manager_, &mutex_, vset_.get(), nullptr));
std::unique_ptr<WritableFileWriter> base_file;
NewBaseFileWriter(&base_file);
std::unique_ptr<TableBuilder> table_builder;
NewTableBuilder(base_file.get(), &table_builder, 0 /* target_level */);
// Generate a level-0 SST with a blob file
const int n = 255;
for (unsigned char i = 0; i < n; i++) {
std::string key(1, i);
InternalKey ikey(key, 1, kTypeValue);
std::string value(kMinBlobSize, i);
table_builder->Add(ikey.Encode(), value);
}
ASSERT_OK(table_builder->Finish());
ASSERT_OK(base_file->Sync(true));
ASSERT_OK(base_file->Close());
std::unique_ptr<TableReader> base_reader;
NewTableReader(base_name_, &base_reader);
ReadOptions ro;
std::unique_ptr<InternalIterator> first_iter;
first_iter.reset(base_reader->NewIterator(
ro, nullptr /*prefix_extractor*/, nullptr /*arena*/,
false /*skip_filters*/, TableReaderCaller::kUncategorized));
// Base file of the last-level SST
std::string second_base_name = base_name_ + "second";
NewFileWriter(second_base_name, &base_file);
NewTableBuilder(base_file.get(), &table_builder, cf_options_.num_levels - 1);
first_iter->SeekToFirst();
// Compact the level-0 SST to the last level; values will be merged into
// another blob file
for (unsigned char i = 0; i < n; i++) {
ASSERT_TRUE(first_iter->Valid());
table_builder->Add(first_iter->key(), first_iter->value());
first_iter->Next();
}
ASSERT_OK(table_builder->Finish());
ASSERT_OK(base_file->Sync(true));
ASSERT_OK(base_file->Close());
std::unique_ptr<TableReader> second_base_reader;
NewTableReader(second_base_name, &second_base_reader);
std::unique_ptr<InternalIterator> second_iter;
second_iter.reset(second_base_reader->NewIterator(
ro, nullptr /*prefix_extractor*/, nullptr /*arena*/,
false /*skip_filters*/, TableReaderCaller::kUncategorized));
// Compare keys, blob indexes and blob records after level merge
first_iter->SeekToFirst();
second_iter->SeekToFirst();
auto storage = vset_->GetBlobStorage(0).lock();
for (unsigned char i = 0; i < n; i++) {
ASSERT_TRUE(first_iter->Valid());
ASSERT_TRUE(second_iter->Valid());
// Compare SST keys
ParsedInternalKey first_ikey, second_ikey;
ASSERT_TRUE(ParseInternalKey(first_iter->key(), &first_ikey));
ASSERT_TRUE(ParseInternalKey(second_iter->key(), &second_ikey));
ASSERT_EQ(first_ikey.type, kTypeBlobIndex);
ASSERT_EQ(second_ikey.type, kTypeBlobIndex);
ASSERT_EQ(first_ikey.user_key, second_ikey.user_key);
// Compare blob records
Slice first_value = first_iter->value();
Slice second_value = second_iter->value();
BlobIndex first_index, second_index;
BlobRecord first_record, second_record;
PinnableSlice first_buffer, second_buffer;
ASSERT_OK(first_index.DecodeFrom(&first_value));
ASSERT_OK(second_index.DecodeFrom(&second_value));
ASSERT_FALSE(first_index == second_index);
ASSERT_OK(
storage->Get(ReadOptions(), first_index, &first_record, &first_buffer));
ASSERT_OK(storage->Get(ReadOptions(), second_index, &second_record,
&second_buffer));
ASSERT_EQ(first_record.key, second_record.key);
ASSERT_EQ(first_record.value, second_record.value);
first_iter->Next();
second_iter->Next();
}
env_->DeleteFile(second_base_name);
}
} // namespace titandb
} // namespace rocksdb
......
......@@ -23,13 +23,20 @@ TableBuilder* TitanTableFactory::NewTableBuilder(
TitanCFOptions cf_options = cf_options_;
cf_options.blob_run_mode = blob_run_mode_.load();
std::weak_ptr<BlobStorage> blob_storage;
// Since we force level_compaction_dynamic_level_bytes = true when
// level_merge is enabled, the last level of a cf is always
// cf_options.num_levels - 1.
int num_levels = cf_options.num_levels;
{
MutexLock l(db_mutex_);
blob_storage = vset_->GetBlobStorage(column_family_id);
}
return new TitanTableBuilder(column_family_id, db_options_, cf_options,
std::move(base_builder), blob_manager_,
blob_storage, options.level, stats_);
return new TitanTableBuilder(
column_family_id, db_options_, cf_options, std::move(base_builder),
blob_manager_, blob_storage, stats_,
std::max(1, num_levels - 2) /* merge level */, options.level);
}
std::string TitanTableFactory::GetPrintableTableOptions() const {
......
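As a quick worked example of the merge-level arithmetic (a sketch assuming RocksDB's default num_levels = 7; not part of this change):

#include <algorithm>
#include <cassert>

int main() {
  int num_levels = 7;                             // RocksDB default
  int merge_level = std::max(1, num_levels - 2);  // == 5
  // Table builds targeting levels 5 and 6 (i.e. >= merge_level) rewrite
  // qualifying blob records; builds for levels 0-4 pass blob indexes through.
  assert(merge_level == 5);
  return 0;
}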