Unverified commit 8376c989 authored by Connor, committed by GitHub

Fix algorithm for counting runs (#104)

* fix algorithm for counting runs
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
parent c6b93a63
......@@ -586,8 +586,9 @@ Status BlobGCJob::DeleteInputBlobFiles() {
for (const auto& file : blob_gc_->sampled_inputs()) {
ROCKS_LOG_INFO(db_options_.info_log,
"Titan add obsolete file [%" PRIu64 "] range [%s, %s]",
file->file_number(), file->smallest_key().c_str(),
file->largest_key().c_str());
file->file_number(),
Slice(file->smallest_key()).ToString(true).c_str(),
Slice(file->largest_key()).ToString(true).c_str());
metrics_.gc_num_files++;
RecordInHistogram(stats_, TitanStats::GC_INPUT_FILE_SIZE,
file->file_size());
......
......@@ -608,32 +608,50 @@ TEST_F(BlobGCJobTest, LevelMergeGC) {
TEST_F(BlobGCJobTest, RangeMergeScheduler) {
NewDB();
int max_sorted_run = 1;
std::vector<std::shared_ptr<BlobFileMeta>> files;
auto add_file = [&](int file_num, const std::string& smallest,
const std::string& largest) {
auto file =
std::make_shared<BlobFileMeta>(file_num, 0, 0, 0, smallest, largest);
file->FileStateTransit(BlobFileMeta::FileEvent::kReset);
files.emplace_back(file);
};
auto init_files =
[&](std::vector<std::vector<std::pair<std::string, std::string>>>
file_runs) {
std::vector<std::shared_ptr<BlobFileMeta>> files;
int file_num = 0;
for (auto& run : file_runs) {
for (auto& range : run) {
auto file = std::make_shared<BlobFileMeta>(
file_num++, 0, 0, 0, range.first, range.second);
file->FileStateTransit(BlobFileMeta::FileEvent::kReset);
files.emplace_back(file);
}
}
return files;
};
// one sorted run, no file will be marked
// max_sorted_run = 1
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
for (size_t i = 0; i <= 5; i++) {
add_file(i, std::string(1, 'a' + i * 2), std::string(1, 'a' + i * 2 + 1));
}
ScheduleRangeMerge(files, max_sorted_run);
// no file will be marked
auto file_runs =
std::vector<std::vector<std::pair<std::string, std::string>>>{
{{"a", "b"},
{"c", "d"},
{"e", "f"},
{"g", "h"},
{"i", "j"},
{"k", "l"}},
};
auto files = init_files(file_runs);
ScheduleRangeMerge(files, 1);
for (const auto& file : files) {
ASSERT_EQ(file->file_state(), BlobFileMeta::FileState::kNormal);
}
// max_sorted_run = 1
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [e, f] [g, h]
// files overlaped with [e, h] will be marked
add_file(6, "e", "f");
add_file(7, "g", "h");
ScheduleRangeMerge(files, max_sorted_run);
// files overlapped with [e, h] will be marked
file_runs = std::vector<std::vector<std::pair<std::string, std::string>>>{
{{"a", "b"}, {"c", "d"}, {"e", "f"}, {"g", "h"}, {"i", "j"}, {"k", "l"}},
{{"e", "f"}, {"g", "h"}},
};
files = init_files(file_runs);
ScheduleRangeMerge(files, 1);
for (size_t i = 0; i < files.size(); i++) {
if (i == 2 || i == 3 || i == 6 || i == 7) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kToMerge);
......@@ -643,12 +661,16 @@ TEST_F(BlobGCJobTest, RangeMergeScheduler) {
}
}
// max_sorted_run = 1
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [a, b] [e, f] [g, h] [l, m]
// files overlaped with [a, b] and [e, h] will be marked
add_file(8, "a", "b");
add_file(9, "l", "m");
ScheduleRangeMerge(files, max_sorted_run);
// files overlapped with [a, b] and [e, h] will be marked
file_runs = std::vector<std::vector<std::pair<std::string, std::string>>>{
{{"a", "b"}, {"c", "d"}, {"e", "f"}, {"g", "h"}, {"i", "j"}, {"k", "l"}},
{{"a", "b"}, {"e", "f"}, {"g", "h"}, {"l", "m"}},
};
files = init_files(file_runs);
ScheduleRangeMerge(files, 1);
for (size_t i = 0; i < files.size(); i++) {
if (i == 1 || i == 4 || i == 5 || i == 9) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kNormal);
......@@ -658,13 +680,93 @@ TEST_F(BlobGCJobTest, RangeMergeScheduler) {
}
}
max_sorted_run = 2;
// max_sorted_run = 2
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [c, l]
// no file will be marked
file_runs = std::vector<std::vector<std::pair<std::string, std::string>>>{
{{"a", "b"}, {"c", "d"}, {"e", "f"}, {"g", "h"}, {"i", "j"}, {"k", "l"}},
{{"c", "l"}},
};
files = init_files(file_runs);
ScheduleRangeMerge(files, 2);
for (const auto& file : files) {
ASSERT_EQ(file->file_state(), BlobFileMeta::FileState::kNormal);
}
// max_sorted_run = 2
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [c, d]
// run 3: [c, d]
// files overlapped with [c, d] will be marked.
file_runs = std::vector<std::vector<std::pair<std::string, std::string>>>{
{{"a", "b"}, {"c", "d"}, {"e", "f"}, {"g", "h"}, {"i", "j"}, {"k", "l"}},
{{"c", "d"}},
{{"c", "d"}},
};
files = init_files(file_runs);
ScheduleRangeMerge(files, 2);
for (size_t i = 0; i < files.size(); i++) {
if (i == 1 || i == 6 || i == 7) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kToMerge);
files[i]->FileStateTransit(BlobFileMeta::FileEvent::kReset);
} else {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kNormal);
}
}
// max_sorted_run = 2
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [b1, d]
// run 3: [a, d]
// files overlapped with [c, d] will be marked.
file_runs = std::vector<std::vector<std::pair<std::string, std::string>>>{
{{"a", "b"}, {"c", "d"}, {"e", "f"}, {"g", "h"}, {"i", "j"}, {"k", "l"}},
{{"b1", "d"}},
{{"a", "d"}},
};
files = init_files(file_runs);
ScheduleRangeMerge(files, 2);
for (size_t i = 0; i < files.size(); i++) {
if (i == 1 || i == 6 || i == 7) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kToMerge);
files[i]->FileStateTransit(BlobFileMeta::FileEvent::kReset);
} else {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kNormal);
}
}
// max_sorted_run = 2;
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [a, b] [e, f] [g, h] [l, m]
// run 3: [e, g1]
// files overlapped with [e, g] will be marked.
file_runs = std::vector<std::vector<std::pair<std::string, std::string>>>{
{{"a", "b"}, {"c", "d"}, {"e", "f"}, {"g", "h"}, {"i", "j"}, {"k", "l"}},
{{"a", "b"}, {"e", "f"}, {"g", "h"}, {"l", "m"}},
{{"e", "g1"}}};
files = init_files(file_runs);
ScheduleRangeMerge(files, 2);
for (size_t i = 0; i < files.size(); i++) {
if (i == 2 || i == 3 || i == 7 || i == 8 || i == 10) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kToMerge);
files[i]->FileStateTransit(BlobFileMeta::FileEvent::kReset);
} else {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kNormal);
}
}
// max_sorted_run = 2;
// run 1: [a, b] [c, d] [e, f] [g, h] [i, j] [k, l]
// run 2: [a, b] [e, f] [g, h] [l, m]
// run 3: [a, l1]
// files overlaped with [a, b] and [e, h] will be marked.
add_file(10, "a", "l1");
ScheduleRangeMerge(files, max_sorted_run);
// files overlapped with [a, b] and [e, h] will be marked.
file_runs = std::vector<std::vector<std::pair<std::string, std::string>>>{
{{"a", "b"}, {"c", "d"}, {"e", "f"}, {"g", "h"}, {"i", "j"}, {"k", "l"}},
{{"a", "b"}, {"e", "f"}, {"g", "h"}, {"l", "m"}},
{{"a", "l1"}}};
files = init_files(file_runs);
ScheduleRangeMerge(files, 2);
for (size_t i = 0; i < files.size(); i++) {
if (i == 1 || i == 4 || i == 5 || i == 9) {
ASSERT_EQ(files[i]->file_state(), BlobFileMeta::FileState::kNormal);
......
......@@ -70,8 +70,8 @@ class TitanDBImpl::FileManager : public BlobFileManager {
ROCKS_LOG_INFO(db_->db_options_.info_log,
"Titan adding blob file [%" PRIu64 "] range [%s, %s]",
file.first->file_number(),
file.first->smallest_key().c_str(),
file.first->largest_key().c_str());
Slice(file.first->smallest_key()).ToString(true).c_str(),
Slice(file.first->largest_key()).ToString(true).c_str());
edit.AddBlobFile(file.first);
}
......@@ -878,20 +878,17 @@ void TitanDBImpl::MarkFileIfNeedMerge(
};
std::sort(blob_ends.begin(), blob_ends.end(), blob_ends_cmp);
int cur_add = 0;
int cur_remove = 0;
int size = blob_ends.size();
std::unordered_map<BlobFileMeta*, int> tmp;
for (int i = 0; i < size; i++) {
if (blob_ends[i].second) {
++cur_add;
tmp[blob_ends[i].first] = cur_remove;
} else {
++cur_remove;
auto record = tmp.find(blob_ends[i].first);
if (cur_add - record->second > max_sorted_runs) {
record->first->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
std::unordered_set<BlobFileMeta*> set;
for (auto& end : blob_ends) {
if (end.second) {
set.insert(end.first);
if (set.size() > static_cast<size_t>(max_sorted_runs)) {
for (auto file : set) {
file->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
}
}
} else {
set.erase(end.first);
}
}
}
......
......@@ -760,6 +760,10 @@ DEFINE_uint64(blob_db_min_blob_size, 0,
// Titan Options
DEFINE_bool(use_titan, true, "Open a Titan instance.");
DEFINE_bool(titan_level_merge, false, "Enable Titan level merge.");
DEFINE_bool(titan_range_merge, false, "Enable Titan range merge.");
DEFINE_uint64(titan_min_blob_size, 0,
"Smallest blob to store in a file. Blobs smaller than this "
"will be inlined with the key in the LSM tree.");
......@@ -3825,6 +3829,8 @@ class Benchmark {
options.listeners.emplace_back(listener_);
opts->min_blob_size = FLAGS_titan_min_blob_size;
opts->level_merge = FLAGS_titan_level_merge;
opts->range_merge = FLAGS_titan_range_merge;
opts->disable_background_gc = FLAGS_titan_disable_background_gc;
opts->max_background_gc = FLAGS_titan_max_background_gc;
opts->min_gc_batch_size = 128 << 20;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment