Commit ff30897d authored by Connor, committed by yiwu-arbug

reset gc mark after sampling (#88)

Reset the GC mark after sampling. Fixes #87.
parent cb67cd56
......@@ -94,6 +94,12 @@ Status BlobFileSet::Recover() {
"Next blob file number is %" PRIu64 ".", next_file_number);
}
// Make sure GC is performed on all files at the beginning.
MarkAllFilesForGC();
for (auto& cf : column_families_) {
cf.second->ComputeGCScore();
}
auto new_manifest_file_number = NewFileNumber();
s = OpenManifest(new_manifest_file_number);
if (!s.ok()) return s;
......@@ -137,9 +143,6 @@ Status BlobFileSet::Recover() {
env_->DeleteFile(dirname_ + "/" + f);
}
// Make sure perform gc on all files at the beginning
MarkAllFilesForGC();
return Status::OK();
}
......
......@@ -32,6 +32,7 @@ void BlobGC::MarkFilesBeingGC() {
void BlobGC::ReleaseGcFiles() {
for (auto& f : inputs_) {
f->set_gc_mark(false);
f->FileStateTransit(BlobFileMeta::FileEvent::kGCCompleted);
}
......
......@@ -43,6 +43,8 @@ class BlobGCJobTest : public testing::Test {
}
~BlobGCJobTest() {}
// Zeroes options_.merge_small_file_threshold. Open() passes options_ to
// TitanDB::Open, so call this before opening the DB for it to take effect.
void DisableMergeSmall() {
  options_.merge_small_file_threshold = 0;
}
std::weak_ptr<BlobStorage> GetBlobStorage(uint32_t cf_id) {
MutexLock l(mutex_);
return blob_file_set_->GetBlobStorage(cf_id);
......@@ -73,6 +75,10 @@ class BlobGCJobTest : public testing::Test {
// Creates a database for a test: runs ClearDir() first and then Open(), so the
// DB is opened only after the directory cleanup has run.
// NOTE(review): ClearDir() is defined outside this view; presumably it removes
// leftover files from the test directory — confirm.
void NewDB() {
ClearDir();
Open();
}
void Open() {
ASSERT_OK(TitanDB::Open(options_, dbname_, &db_));
tdb_ = reinterpret_cast<TitanDBImpl*>(db_);
blob_file_set_ = tdb_->blob_file_set_.get();
......@@ -80,6 +86,11 @@ class BlobGCJobTest : public testing::Test {
base_db_ = reinterpret_cast<DBImpl*>(tdb_->GetRootDB());
}
// Shuts the current DB down via DestroyDB() and opens it again via Open(),
// which reuses the same options_ and dbname_.
// NOTE(review): the visible part of DestroyDB() closes the DB and clears db_;
// it is assumed not to delete on-disk data, otherwise this would not be a
// reopen — confirm.
void Reopen() {
DestroyDB();
Open();
}
void Flush() {
FlushOptions fopts;
fopts.wait = true;
......@@ -95,6 +106,11 @@ class BlobGCJobTest : public testing::Test {
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
}
void ReComputeGCScore() {
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
b->ComputeGCScore();
}
void DestroyDB() {
Status s __attribute__((__unused__)) = db_->Close();
assert(s.ok());
......@@ -102,7 +118,7 @@ class BlobGCJobTest : public testing::Test {
db_ = nullptr;
}
void RunGC(bool expected = false) {
void RunGC(bool expected, bool disable_merge_small = false) {
MutexLock l(mutex_);
Status s;
auto* cfh = base_db_->DefaultColumnFamily();
......@@ -112,6 +128,9 @@ class BlobGCJobTest : public testing::Test {
TitanCFOptions cf_options;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options.info_log.get());
cf_options.min_gc_batch_size = 0;
if (disable_merge_small) {
cf_options.merge_small_file_threshold = 0;
}
cf_options.blob_file_discardable_ratio = 0.4;
cf_options.sample_file_size_ratio = 1;
......@@ -123,9 +142,7 @@ class BlobGCJobTest : public testing::Test {
blob_file_set_->GetBlobStorage(cfh->GetID()).lock().get());
}
if (expected) {
ASSERT_TRUE(blob_gc != nullptr);
}
ASSERT_TRUE((blob_gc != nullptr) == expected);
if (blob_gc) {
blob_gc->SetColumnFamily(cfh);
......@@ -151,6 +168,7 @@ class BlobGCJobTest : public testing::Test {
s = blob_gc_job.Finish();
ASSERT_OK(s);
}
blob_gc->ReleaseGcFiles();
}
mutex_->Unlock();
......@@ -205,16 +223,14 @@ class BlobGCJobTest : public testing::Test {
Flush();
std::string result;
for (int i = 0; i < MAX_KEY_NUM; i++) {
if (i % 2 != 0) continue;
if (i % 3 == 0) continue;
db_->Delete(WriteOptions(), GenKey(i));
}
Flush();
CompactAll();
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(b->files_.size(), 1);
auto old = b->files_.begin()->first;
// for (auto& f : b->files_) {
// f.second->marked_for_sample = false;
// }
std::unique_ptr<BlobFileIterator> iter;
ASSERT_OK(NewIterator(b->files_.begin()->second->file_number(),
b->files_.begin()->second->file_size(), &iter));
......@@ -224,7 +240,7 @@ class BlobGCJobTest : public testing::Test {
ASSERT_TRUE(iter->Valid());
ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0);
}
RunGC();
RunGC(true);
b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(b->files_.size(), 1);
auto new1 = b->files_.begin()->first;
......@@ -235,7 +251,7 @@ class BlobGCJobTest : public testing::Test {
auto* db_iter = db_->NewIterator(ReadOptions(), db_->DefaultColumnFamily());
db_iter->SeekToFirst();
for (int i = 0; i < MAX_KEY_NUM; i++) {
if (i % 2 == 0) continue;
if (i % 3 != 0) continue;
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0);
......@@ -327,7 +343,7 @@ TEST_F(BlobGCJobTest, GCLimiter) {
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
RunGC(true);
ASSERT_TRUE(test_limiter->WriteRequested());
ASSERT_FALSE(test_limiter->ReadRequested());
DestroyDB();
......@@ -337,7 +353,7 @@ TEST_F(BlobGCJobTest, GCLimiter) {
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
RunGC(true);
ASSERT_FALSE(test_limiter->WriteRequested());
ASSERT_TRUE(test_limiter->ReadRequested());
DestroyDB();
......@@ -347,12 +363,36 @@ TEST_F(BlobGCJobTest, GCLimiter) {
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
RunGC(true);
ASSERT_TRUE(test_limiter->WriteRequested());
ASSERT_TRUE(test_limiter->ReadRequested());
DestroyDB();
}
// Checks GC picking across a DB reopen. RunGC(expected, disable_merge_small)
// asserts that a GC job was picked exactly when `expected` is true, so this
// test requires that:
//   1. the first GC after reopening picks a job (BlobFileSet::Recover() marks
//      all files for GC), and
//   2. once that GC has released its files and the scores are recomputed, no
//      further job is picked.
TEST_F(BlobGCJobTest, Reopen) {
  DisableMergeSmall();
  NewDB();
  for (int i = 0; i < 10; i++) {
    // Check the write status so a failed Put fails the test right here rather
    // than surfacing later as a confusing blob-count mismatch.
    ASSERT_OK(db_->Put(WriteOptions(), GenKey(i), GenValue(i)));
  }
  Flush();
  CheckBlobNumber(1);

  Reopen();
  // A GC job must be picked right after the reopen.
  RunGC(true, true);
  CheckBlobNumber(1);
  // trigger compute gc score
  ReComputeGCScore();
  // With fresh scores, nothing should be picked any more.
  RunGC(false, true);
  CheckBlobNumber(1);
  DestroyDB();
}
// Tests blob file will be kept after GC, if it is still visible by active
// snapshots.
TEST_F(BlobGCJobTest, PurgeBlobs) {
......@@ -375,34 +415,38 @@ TEST_F(BlobGCJobTest, PurgeBlobs) {
CheckBlobNumber(1);
auto snap4 = db_->GetSnapshot();
RunGC();
CheckBlobNumber(1);
for (int i = 10; i < 20; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
auto snap5 = db_->GetSnapshot();
CheckBlobNumber(2);
// merge two blob files into one
CompactAll();
RunGC(true);
CheckBlobNumber(3);
auto snap5 = db_->GetSnapshot();
db_->ReleaseSnapshot(snap2);
RunGC();
RunGC(false);
CheckBlobNumber(3);
db_->ReleaseSnapshot(snap3);
RunGC();
RunGC(false);
CheckBlobNumber(3);
db_->ReleaseSnapshot(snap1);
RunGC();
RunGC(false);
CheckBlobNumber(3);
db_->ReleaseSnapshot(snap4);
RunGC();
CheckBlobNumber(2);
RunGC(false);
CheckBlobNumber(3);
db_->ReleaseSnapshot(snap5);
RunGC();
RunGC(false);
CheckBlobNumber(1);
DestroyDB();
......@@ -413,6 +457,11 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
ASSERT_OK(db_->Put(WriteOptions(), GenKey(2), GenValue(21)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(4), GenValue(4)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(5), GenValue(5)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(6), GenValue(5)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(7), GenValue(5)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(8), GenValue(5)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(9), GenValue(5)));
Flush();
CompactAll();
std::string value;
......@@ -421,6 +470,18 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(5)));
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(6)));
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(7)));
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(8)));
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(9)));
CompactAll();
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &value));
ASSERT_EQ(value, "0");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
SstFileWriter sst_file_writer(EnvOptions(), options_);
std::string sst_file = options_.dirname + "/for_ingest.sst";
ASSERT_OK(sst_file_writer.Open(sst_file));
......@@ -435,6 +496,8 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
CheckBlobNumber(1);
RunGC(true);
std::string key0 = GenKey(0);
......
......@@ -27,6 +27,9 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
bool maybe_continue_next_time = false;
uint64_t next_gc_size = 0;
for (auto& gc_score : blob_storage->gc_score()) {
if (gc_score.score < cf_options_.blob_file_discardable_ratio) {
break;
}
auto blob_file = blob_storage->FindFile(gc_score.file_number).lock();
if (!CheckBlobFile(blob_file.get())) {
RecordTick(stats_, TitanStats::GC_NO_NEED, 1);
......@@ -48,21 +51,14 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
stop_picking = true;
}
} else {
if (blob_file->file_size() <= cf_options_.merge_small_file_threshold ||
blob_file->gc_mark() ||
blob_file->GetDiscardableRatio() >=
cf_options_.blob_file_discardable_ratio) {
next_gc_size += blob_file->file_size();
if (next_gc_size > cf_options_.min_gc_batch_size) {
maybe_continue_next_time = true;
RecordTick(stats_, TitanStats::GC_REMAIN, 1);
ROCKS_LOG_INFO(db_options_.info_log,
"remain more than %" PRIu64
" bytes to be gc and trigger after this gc",
next_gc_size);
break;
}
} else {
next_gc_size += blob_file->file_size();
if (next_gc_size > cf_options_.min_gc_batch_size) {
maybe_continue_next_time = true;
RecordTick(stats_, TitanStats::GC_REMAIN, 1);
ROCKS_LOG_INFO(db_options_.info_log,
"remain more than %" PRIu64
" bytes to be gc and trigger after this gc",
next_gc_size);
break;
}
}
......@@ -71,8 +67,17 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
"got batch size %" PRIu64 ", estimate output %" PRIu64
" bytes",
batch_size, estimate_output_size);
if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size)
if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size) {
return nullptr;
}
// If there is only one small file to merge, there is no need to perform GC.
if (blob_files.size() == 1 &&
blob_files[0]->file_size() <= cf_options_.merge_small_file_threshold &&
blob_files[0]->gc_mark() == false &&
blob_files[0]->GetDiscardableRatio() <
cf_options_.blob_file_discardable_ratio) {
return nullptr;
}
return std::unique_ptr<BlobGC>(new BlobGC(
std::move(blob_files), std::move(cf_options_), maybe_continue_next_time));
......
......@@ -57,9 +57,13 @@ TEST_F(BlobGCPickerTest, Basic) {
AddBlobFile(1U, 1U, 0U);
UpdateBlobStorage();
auto blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc == nullptr);
AddBlobFile(2U, 1U, 0U);
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->inputs().size(), 1);
ASSERT_EQ(blob_gc->inputs()[0]->file_number(), 1U);
ASSERT_EQ(blob_gc->inputs().size(), 2);
}
TEST_F(BlobGCPickerTest, BeingGC) {
......@@ -74,10 +78,12 @@ TEST_F(BlobGCPickerTest, BeingGC) {
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U, 0U, true);
AddBlobFile(2U, 1U, 0U);
AddBlobFile(3U, 1U, 0U);
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_EQ(blob_gc->inputs().size(), 1);
ASSERT_EQ(blob_gc->inputs()[0]->file_number(), 2U);
ASSERT_EQ(blob_gc->inputs().size(), 2);
ASSERT_NE(blob_gc->inputs()[0]->file_number(), 1U);
ASSERT_NE(blob_gc->inputs()[1]->file_number(), 1U);
}
TEST_F(BlobGCPickerTest, TriggerNext) {
......@@ -94,15 +100,6 @@ TEST_F(BlobGCPickerTest, TriggerNext) {
auto blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->trigger_next(), true);
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(2U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(3U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(4U, 1U << 30, 0U); // valid_size = 1GB
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->trigger_next(), false);
}
TEST_F(BlobGCPickerTest, PickFileAndTriggerNext) {
......
......@@ -123,6 +123,15 @@ class TitanDBTest : public testing::Test {
}
}
void Delete(uint64_t k) {
WriteOptions wopts;
std::string key = GenKey(k);
ASSERT_OK(db_->Delete(wopts, key));
for (auto& handle : cf_handles_) {
ASSERT_OK(db_->Delete(wopts, handle, key));
}
}
void Flush() {
FlushOptions fopts;
ASSERT_OK(db_->Flush(fopts));
......@@ -918,6 +927,7 @@ TEST_F(TitanDBTest, FallbackModeEncounterMissingBlobFile) {
ASSERT_EQ(1, GetBlobStorage().lock()->NumBlobFiles());
ASSERT_OK(db_->Delete(WriteOptions(), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
uint32_t default_cf_id = db_->DefaultColumnFamily()->GetID();
// GC the first blob file.
ASSERT_OK(db_impl_->TEST_StartGC(default_cf_id));
......@@ -964,6 +974,9 @@ TEST_F(TitanDBTest, BackgroundErrorTrigger) {
Put(i, &data);
}
Flush();
for (uint64_t i = 1; i <= kNumEntries; i++) {
Delete(i);
}
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
SyncPoint::GetInstance()->SetCallBack("BlobFileSet::LogAndApply", [&](void*) {
mock_env->SetFilesystemActive(false, Status::IOError("Injected error"));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment