Commit e8bc9254 authored by Wu Jiayu, committed by yiwu-arbug

Implement basic range merge to maintain last level sorted run number (#92)

With level merge enabled, we expect no more than 10 sorted runs of blob files in each of the last two levels. But:

- Since we force level_compaction_dynamic_level_bytes, last level blob files won't be merged again, so the number of sorted runs in the last level grows without bound.
- Some hot ranges may accumulate more sorted runs than others.

This patch implements a basic range merge:

- Check the number of sorted runs in the compaction range after each last level compaction.
- Mark blob files to be merged in the next compaction if the sorted run count exceeds max_sorted_runs, as sketched below.
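For illustration, here is a minimal standalone sketch of the endpoint-sweep idea behind the sorted run check. `FileRange` and `MaxSortedRuns` are hypothetical names, the sketch computes the peak number of overlapping file ranges, and the actual per-file marking logic is `MarkFileIfNeedMerge` in the diff below.

```cpp
#include <algorithm>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// A blob file reduced to its key range, for illustration only.
struct FileRange {
  std::string smallest;
  std::string largest;
};

// Count the maximum number of file ranges overlapping any point of the key
// space, i.e. the number of sorted runs at the hottest spot.
int MaxSortedRuns(const std::vector<FileRange>& files) {
  // Collect both endpoints of every range, then sweep them in key order.
  std::vector<std::pair<std::string, bool /* is opening end? */>> ends;
  for (const auto& f : files) {
    ends.emplace_back(f.smallest, true);
    ends.emplace_back(f.largest, false);
  }
  // At equal keys, process closing ends first, mirroring the patch's
  // comparator, so ranges that merely touch do not count as overlapping.
  std::sort(ends.begin(), ends.end(),
            [](const auto& a, const auto& b) {
              int cmp = a.first.compare(b.first);
              return cmp == 0 ? (!a.second && b.second) : cmp < 0;
            });
  int cur = 0;
  int max_runs = 0;
  for (const auto& e : ends) {
    cur += e.second ? 1 : -1;
    max_runs = std::max(max_runs, cur);
  }
  return max_runs;
}

int main() {
  // [e, f] and [g, h] each overlap [e, h], giving two sorted runs there.
  std::vector<FileRange> files = {
      {"a", "b"}, {"e", "f"}, {"g", "h"}, {"e", "h"}};
  std::cout << MaxSortedRuns(files) << std::endl;  // prints 2
  return 0;
}
```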
parent 44668f8c
......@@ -137,6 +137,24 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// Default: false
bool level_merge{false};
// With level merge enabled, we expect no more than 10 sorted runs of blob
// files in each of the last two levels. But since last level blob files
// won't be merged again, the number of sorted runs in the last level can
// grow without bound.
//
// With this feature enabled, Titan will check the sorted runs of the
// compaction range after each last level compaction, and mark the related
// blob files if there are too many. The marked blob files will be merged
// into a new sorted run in the next compaction.
//
// Default: false
bool range_merge{false};
// Max number of sorted runs to trigger range merge. Decreasing this value
// increases write amplification but improves short range scan performance.
//
// Default: 20
int max_sorted_runs{20};
TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options)
: ColumnFamilyOptions(options) {}
......
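As a usage sketch, assuming the TitanDB::Open API from include/titan/db.h and that TitanOptions exposes the TitanCFOptions fields above (the path and the values are illustrative, not recommendations):

```cpp
#include "titan/db.h"

using rocksdb::Status;
using rocksdb::titandb::TitanDB;
using rocksdb::titandb::TitanOptions;

int main() {
  TitanOptions options;
  options.create_if_missing = true;
  // Range merge only applies on top of level merge, which in turn forces
  // level_compaction_dynamic_level_bytes.
  options.level_merge = true;
  options.range_merge = true;
  // Mark blob files once more than 20 sorted runs cover a compacted range.
  options.max_sorted_runs = 20;

  TitanDB* db = nullptr;
  Status s = TitanDB::Open(options, "/tmp/titandb_range_merge", &db);
  if (!s.ok()) return 1;
  // ... use db ...
  delete db;
  return 0;
}
```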
......@@ -225,6 +225,9 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) {
assert(state_ == FileState::kNormal);
state_ = FileState::kToMerge;
break;
case FileEvent::kReset:
state_ = FileState::kNormal;
break;
default:
assert(false);
}
......
......@@ -167,6 +167,7 @@ class BlobFileMeta {
kDbRestart,
kDelete,
kNeedMerge,
kReset,  // reset file to normal, used only in tests
};
enum class FileState {
......@@ -202,8 +203,8 @@ class BlobFileMeta {
uint64_t file_size() const { return file_size_; }
uint64_t file_entries() const { return file_entries_; }
uint32_t file_level() const { return file_level_; }
Slice smallest_key() const { return smallest_key_; }
Slice largest_key() const { return largest_key_; }
const std::string& smallest_key() const { return smallest_key_; }
const std::string& largest_key() const { return largest_key_; }
FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }
......
......@@ -586,9 +586,8 @@ Status BlobGCJob::DeleteInputBlobFiles() {
for (const auto& file : blob_gc_->sampled_inputs()) {
ROCKS_LOG_INFO(db_options_.info_log,
"Titan add obsolete file [%" PRIu64 "] range [%s, %s]",
file->file_number(),
file->smallest_key().ToString(true).c_str(),
file->largest_key().ToString(true).c_str());
file->file_number(), file->smallest_key().c_str(),
file->largest_key().c_str());
metrics_.gc_num_files++;
RecordInHistogram(stats_, TitanStats::GC_INPUT_FILE_SIZE,
file->file_size());
......
......@@ -91,6 +91,14 @@ class BlobGCJobTest : public testing::Test {
Open();
}
void ScheduleRangeMerge(
const std::vector<std::shared_ptr<BlobFileMeta>>& files,
int max_sorted_runs) {
tdb_->mutex_.Lock();
tdb_->MarkFileIfNeedMerge(files, max_sorted_runs);
tdb_->mutex_.Unlock();
}
void Flush() {
FlushOptions fopts;
fopts.wait = true;
......@@ -583,7 +591,7 @@ TEST_F(BlobGCJobTest, LevelMergeGC) {
CheckBlobNumber(4);
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
// blob file number is start from 2, they are: old level0 blob file, old
// blob file numbers start from 2; they are: old level0 blob file, old
// last level blob file, new level0 blob file, new last level blob file
// respectively.
ASSERT_EQ(b->FindFile(2).lock()->file_state(),
......@@ -598,7 +606,130 @@ TEST_F(BlobGCJobTest, LevelMergeGC) {
DestroyDB();
}
TEST_F(BlobGCJobTest, RangeMergeScheduler) {
NewDB();
int max_sorted_run = 1;
std::vector<std::shared_ptr<BlobFileMeta>> files;
auto add_file = [&](int file_num, const std::string& smallest,
const std::string& largest) {
auto file =
std::make_shared<BlobFileMeta>(file_num, 0, 0, 0, smallest, largest);
file->FileStateTransit(BlobFileMeta::FileEvent::kReset);
files.emplace_back(file);
};
// one sorted run, no file will be marked
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
for (size_t i = 0; i <= 5; i++) {
add_file(i, std::string(1, 'a' + i * 2), std::string(1, 'a' + i * 2 + 1));
}
ScheduleRangeMerge(files, max_sorted_run);
for (const auto& file : files) {
ASSERT_EQ(file->file_state(), BlobFileMeta::FileState::kNormal);
}
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [e, f] [g, h]
// files overlapping [e, h] will be marked
add_file(6, "e", "f");
add_file(7, "g", "h");
ScheduleRangeMerge(files, max_sorted_run);
for (size_t i = 0; i < files.size(); i++) {
if (i == 2 || i == 3 || i == 6 || i == 7) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kToMerge);
files[i]->FileStateTransit(BlobFileMeta::FileEvent::kReset);
} else {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kNormal);
}
}
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [a, b] [e, f] [g, h] [l, m]
// files overlapping [a, b] and [e, h] will be marked
add_file(8, "a", "b");
add_file(9, "l", "m");
ScheduleRangeMerge(files, max_sorted_run);
for (size_t i = 0; i < files.size(); i++) {
if (i == 1 || i == 4 || i == 5 || i == 9) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kNormal);
} else {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kToMerge);
files[i]->FileStateTransit(BlobFileMeta::FileEvent::kReset);
}
}
max_sorted_run = 2;
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [a, b] [e, f] [g, h] [l, m]
// run 3: [a, l1]
// files overlapping [a, b] and [e, h] will be marked.
add_file(10, "a", "l1");
ScheduleRangeMerge(files, max_sorted_run);
for (size_t i = 0; i < files.size(); i++) {
if (i == 1 || i == 4 || i == 5 || i == 9) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kNormal);
} else {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kToMerge);
files[i]->FileStateTransit(BlobFileMeta::FileEvent::kReset);
}
}
DestroyDB();
}
TEST_F(BlobGCJobTest, RangeMerge) {
options_.level_merge = true;
options_.level_compaction_dynamic_level_bytes = true;
options_.blob_file_discardable_ratio = 0.5;
options_.range_merge = true;
options_.max_sorted_runs = 4;
options_.purge_obsolete_files_period_sec = 0;
NewDB();
ColumnFamilyMetaData cf_meta;
std::vector<std::string> to_compact(1);
auto opts = db_->GetOptions();
// compact 5 sorted runs covering key range [1, 50] to the last level
for (int i = 1; i <= 5; i++) {
for (int j = 0; j < 10; j++) {
db_->Put(WriteOptions(), GenKey(5 * j + i), GenValue(5 * j + i));
}
Flush();
db_->GetColumnFamilyMetaData(base_db_->DefaultColumnFamily(), &cf_meta);
to_compact[0] = cf_meta.levels[0].files[0].name;
db_->CompactFiles(CompactionOptions(), base_db_->DefaultColumnFamily(),
to_compact, opts.num_levels - 1);
CheckBlobNumber(2 * i);
}
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
// blob file numbers start from 2. Even-numbered blob files belong to
// level0, odd-numbered blob files belong to the last level.
for (int i = 2; i < 12; i++) {
auto blob = b->FindFile(i).lock();
if (i % 2 == 0) {
ASSERT_EQ(blob->file_state(), BlobFileMeta::FileState::kObsolete);
} else {
ASSERT_EQ(blob->file_state(), BlobFileMeta::FileState::kToMerge);
}
}
db_->GetColumnFamilyMetaData(base_db_->DefaultColumnFamily(), &cf_meta);
to_compact[0] = cf_meta.levels[opts.num_levels - 1].files[0].name;
db_->CompactFiles(CompactionOptions(), base_db_->DefaultColumnFamily(),
to_compact, opts.num_levels - 1);
// after the last level compaction, marked blob files are merged into new
// blob files and become obsolete.
for (int i = 2; i < 12; i++) {
auto blob = b->FindFile(i).lock();
ASSERT_EQ(blob->file_state(), BlobFileMeta::FileState::kObsolete);
}
DestroyDB();
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
......
......@@ -88,7 +88,7 @@ void BlobStorage::ExportBlobFiles(
void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
MutexLock l(&mutex_);
files_.emplace(std::make_pair(file->file_number(), file));
blob_ranges_.emplace(std::make_pair(file->smallest_key(), file));
blob_ranges_.emplace(std::make_pair(Slice(file->smallest_key()), file));
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_FILE_SIZE,
file->file_size());
AddStats(stats_, cf_id_, TitanInternalStats::NUM_LIVE_BLOB_FILE, 1);
......
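A side note on why the accessors above now return const std::string& instead of Slice: blob_ranges_ is keyed by non-owning Slice views, so each key must point into storage that outlives the map entry, and the string reference also gives the range merge comparator compare() and the logging calls c_str(). A minimal sketch of the lifetime point, assuming only rocksdb::Slice semantics (the Meta struct is hypothetical):

```cpp
#include <memory>
#include <string>
#include "rocksdb/slice.h"

// Stand-in for BlobFileMeta: it owns its boundary keys as std::string.
struct Meta {
  std::string smallest;
  const std::string& smallest_key() const { return smallest; }
};

int main() {
  auto meta = std::make_shared<Meta>(Meta{"key1"});
  // The Slice is a view into storage owned by `meta`, so it stays valid
  // exactly as long as the file meta is kept alive (as files_ does above).
  rocksdb::Slice key(meta->smallest_key());
  return key.size() == 4 ? 0 : 1;
}
```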
......@@ -70,8 +70,8 @@ class TitanDBImpl::FileManager : public BlobFileManager {
ROCKS_LOG_INFO(db_->db_options_.info_log,
"Titan adding blob file [%" PRIu64 "] range [%s, %s]",
file.first->file_number(),
file.first->smallest_key().ToString(true).c_str(),
file.first->largest_key().ToString(true).c_str());
file.first->smallest_key().c_str(),
file.first->largest_key().c_str());
edit.AddBlobFile(file.first);
}
......@@ -854,6 +854,48 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
return s;
}
void TitanDBImpl::MarkFileIfNeedMerge(
const std::vector<std::shared_ptr<BlobFileMeta>>& files,
int max_sorted_runs) {
mutex_.AssertHeld();
if (files.empty()) return;
// store and sort both ends of blob files to count sorted runs
std::vector<std::pair<BlobFileMeta*, bool /* is smallest end? */>> blob_ends;
for (const auto& file : files) {
blob_ends.emplace_back(std::make_pair(file.get(), true));
blob_ends.emplace_back(std::make_pair(file.get(), false));
}
auto blob_ends_cmp = [](const std::pair<BlobFileMeta*, bool>& end1,
const std::pair<BlobFileMeta*, bool>& end2) {
const std::string& key1 =
end1.second ? end1.first->smallest_key() : end1.first->largest_key();
const std::string& key2 =
end2.second ? end2.first->smallest_key() : end2.first->largest_key();
int cmp = key1.compare(key2);
// when keys are equal, order largest_key before smallest_key, so that
// ranges that merely touch are not counted as overlapping
return (cmp == 0) ? (!end1.second && end2.second) : (cmp < 0);
};
std::sort(blob_ends.begin(), blob_ends.end(), blob_ends_cmp);
int cur_add = 0;
int cur_remove = 0;
int size = blob_ends.size();
std::unordered_map<BlobFileMeta*, int> tmp;
for (int i = 0; i < size; i++) {
if (blob_ends[i].second) {
++cur_add;
tmp[blob_ends[i].first] = cur_remove;
} else {
++cur_remove;
auto record = tmp.find(blob_ends[i].first);
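// cur_add - record->second counts files opened up to this file's largest
// end minus files closed before its smallest end, i.e. the number of blob
// files overlapping this file's range, which serves as its sorted run count.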
if (cur_add - record->second > max_sorted_runs) {
record->first->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
}
}
}
}
Options TitanDBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
assert(column_family != nullptr);
Options options = db_->GetOptions(column_family);
......@@ -1122,9 +1164,19 @@ void TitanDBImpl::OnCompactionCompleted(
uint64_t delta = 0;
VersionEdit edit;
auto cf_options = bs->cf_options();
std::vector<std::shared_ptr<BlobFileMeta>> files;
bool count_sorted_run =
cf_options.level_merge && cf_options.range_merge &&
cf_options.num_levels - 1 == compaction_job_info.output_level;
for (const auto& bfs : blob_files_size_diff) {
// a negative blob file size diff means the file's discardable size grew
if (bfs.second >= 0) {
if (count_sorted_run) {
auto file = bs->FindFile(bfs.first).lock();
if (file != nullptr) {
files.emplace_back(std::move(file));
}
}
continue;
}
auto file = bs->FindFile(bfs.first).lock();
......@@ -1150,12 +1202,16 @@ void TitanDBImpl::OnCompactionCompleted(
if (file->NoLiveData()) {
edit.DeleteBlobFile(file->file_number(),
db_impl_->GetLatestSequenceNumber());
continue;
} else if (static_cast<int>(file->file_level()) >=
cf_options.num_levels - 2 &&
file->GetDiscardableRatio() >
cf_options.blob_file_discardable_ratio) {
file->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
}
if (count_sorted_run) {
files.emplace_back(std::move(file));
}
}
}
SubStats(stats_.get(), compaction_job_info.cf_id,
......@@ -1164,6 +1220,7 @@ void TitanDBImpl::OnCompactionCompleted(
// data based GC, so we don't need to trigger regular GC anymore
if (cf_options.level_merge) {
blob_file_set_->LogAndApply(edit);
MarkFileIfNeedMerge(files, cf_options.max_sorted_runs);
} else {
bs->ComputeGCScore();
......
......@@ -216,6 +216,10 @@ class TitanDBImpl : public TitanDB {
return bg_error_;
}
void MarkFileIfNeedMerge(
const std::vector<std::shared_ptr<BlobFileMeta>>& files,
int max_sorted_runs);
bool HasBGError() { return has_bg_error_.load(); }
void DumpStats();
......
......@@ -221,6 +221,10 @@ TableProperties TitanTableBuilder::GetTableProperties() const {
bool TitanTableBuilder::ShouldMerge(
const std::shared_ptr<rocksdb::titandb::BlobFileMeta>& file) {
assert(cf_options_.level_merge);
// Values in the blob file should be merged if
// 1. Corresponding keys are being compacted to the last two levels from a
//    lower level, or
// 2. The blob file is marked by GC or range merge.
return file != nullptr &&
(static_cast<int>(file->file_level()) < target_level_ ||
file->file_state() == BlobFileMeta::FileState::kToMerge);
......