Commit 8d90ddae authored by Glitter's avatar Glitter Committed by Connor

Trigger next gc if needed after a gc (#36)

* trigger next gc after gc if needed
parent a4e585a7
...@@ -137,6 +137,9 @@ class BlobFileMeta { ...@@ -137,6 +137,9 @@ class BlobFileMeta {
bool is_obsolete() const { return state_ == FileState::kObsolete; } bool is_obsolete() const { return state_ == FileState::kObsolete; }
uint64_t discardable_size() const { return discardable_size_; } uint64_t discardable_size() const { return discardable_size_; }
bool gc_mark() const { return gc_mark_; }
void set_gc_mark(bool mark) { gc_mark_ = mark; }
void FileStateTransit(const FileEvent& event); void FileStateTransit(const FileEvent& event);
void AddDiscardableSize(uint64_t _discardable_size); void AddDiscardableSize(uint64_t _discardable_size);
...@@ -151,7 +154,9 @@ class BlobFileMeta { ...@@ -151,7 +154,9 @@ class BlobFileMeta {
FileState state_{FileState::kInit}; FileState state_{FileState::kInit};
uint64_t discardable_size_{0}; uint64_t discardable_size_{0};
// bool marked_for_gc_{false}; // gc_mark is set to true when this file is recovered from re-opening the DB
// that means this file needs to be checked for GC
bool gc_mark_{false};
}; };
// Blob file header format. // Blob file header format.
......
...@@ -4,9 +4,10 @@ namespace rocksdb { ...@@ -4,9 +4,10 @@ namespace rocksdb {
namespace titandb { namespace titandb {
BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files, BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files,
TitanCFOptions&& _titan_cf_options) TitanCFOptions&& _titan_cf_options, bool need_trigger_next)
: inputs_(std::move(blob_files)), : inputs_(std::move(blob_files)),
titan_cf_options_(std::move(_titan_cf_options)) { titan_cf_options_(std::move(_titan_cf_options)),
trigger_next_(need_trigger_next) {
MarkFilesBeingGC(); MarkFilesBeingGC();
} }
......
...@@ -13,7 +13,7 @@ namespace titandb { ...@@ -13,7 +13,7 @@ namespace titandb {
class BlobGC { class BlobGC {
public: public:
BlobGC(std::vector<BlobFileMeta*>&& blob_files, BlobGC(std::vector<BlobFileMeta*>&& blob_files,
TitanCFOptions&& _titan_cf_options); TitanCFOptions&& _titan_cf_options, bool need_trigger_next);
// No copying allowed // No copying allowed
BlobGC(const BlobGC&) = delete; BlobGC(const BlobGC&) = delete;
...@@ -43,12 +43,16 @@ class BlobGC { ...@@ -43,12 +43,16 @@ class BlobGC {
void ReleaseGcFiles(); void ReleaseGcFiles();
bool trigger_next() { return trigger_next_; }
private: private:
std::vector<BlobFileMeta*> inputs_; std::vector<BlobFileMeta*> inputs_;
std::vector<BlobFileMeta*> sampled_inputs_; std::vector<BlobFileMeta*> sampled_inputs_;
std::vector<BlobFileMeta*> outputs_; std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_; TitanCFOptions titan_cf_options_;
ColumnFamilyHandle* cfh_{nullptr}; ColumnFamilyHandle* cfh_{nullptr};
// Whether need to trigger gc after this gc or not
const bool trigger_next_;
}; };
struct GCScore { struct GCScore {
......
#include "blob_gc_job.h"
#ifndef __STDC_FORMAT_MACROS #ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS
#endif #endif
#include <inttypes.h> #include <inttypes.h>
#include "blob_gc_job.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -166,8 +165,10 @@ Status BlobGCJob::SampleCandidateFiles() { ...@@ -166,8 +165,10 @@ Status BlobGCJob::SampleCandidateFiles() {
Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) { Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) {
assert(selected != nullptr); assert(selected != nullptr);
if (file->GetDiscardableRatio() >= if (file->file_size() <=
blob_gc_->titan_cf_options().blob_file_discardable_ratio) { blob_gc_->titan_cf_options().merge_small_file_threshold ||
file->GetDiscardableRatio() >=
blob_gc_->titan_cf_options().blob_file_discardable_ratio) {
*selected = true; *selected = true;
return Status::OK(); return Status::OK();
} }
......
...@@ -185,7 +185,7 @@ class BlobGCJobTest : public testing::Test { ...@@ -185,7 +185,7 @@ class BlobGCJobTest : public testing::Test {
auto rewrite_status = base_db_->Write(WriteOptions(), &wb); auto rewrite_status = base_db_->Write(WriteOptions(), &wb);
std::vector<BlobFileMeta*> tmp; std::vector<BlobFileMeta*> tmp;
BlobGC blob_gc(std::move(tmp), TitanCFOptions()); BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh); blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Env::Default(), EnvOptions(), nullptr, version_set_, Env::Default(), EnvOptions(), nullptr, version_set_,
...@@ -294,19 +294,19 @@ TEST_F(BlobGCJobTest, PurgeBlobs) { ...@@ -294,19 +294,19 @@ TEST_F(BlobGCJobTest, PurgeBlobs) {
db_->ReleaseSnapshot(snap2); db_->ReleaseSnapshot(snap2);
RunGC(); RunGC();
CheckBlobNumber(2); CheckBlobNumber(3);
db_->ReleaseSnapshot(snap3); db_->ReleaseSnapshot(snap3);
RunGC(); RunGC();
CheckBlobNumber(2); CheckBlobNumber(3);
db_->ReleaseSnapshot(snap1); db_->ReleaseSnapshot(snap1);
RunGC(); RunGC();
CheckBlobNumber(2); CheckBlobNumber(3);
db_->ReleaseSnapshot(snap4); db_->ReleaseSnapshot(snap4);
RunGC(); RunGC();
CheckBlobNumber(1); CheckBlobNumber(2);
db_->ReleaseSnapshot(snap5); db_->ReleaseSnapshot(snap5);
RunGC(); RunGC();
......
...@@ -15,12 +15,21 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC( ...@@ -15,12 +15,21 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
std::vector<BlobFileMeta*> blob_files; std::vector<BlobFileMeta*> blob_files;
uint64_t batch_size = 0; uint64_t batch_size = 0;
uint64_t estimate_output_size = 0;
// ROCKS_LOG_INFO(db_options_.info_log, "blob file num:%lu gc score:%lu", // ROCKS_LOG_INFO(db_options_.info_log, "blob file num:%lu gc score:%lu",
// blob_storage->NumBlobFiles(), // blob_storage->NumBlobFiles(),
// blob_storage->gc_score().size()); // blob_storage->gc_score().size());
bool stop_picking = false;
bool maybe_continue_next_time = false;
uint64_t next_gc_size = 0;
for (auto& gc_score : blob_storage->gc_score()) { for (auto& gc_score : blob_storage->gc_score()) {
auto blob_file = blob_storage->FindFile(gc_score.file_number).lock(); auto blob_file = blob_storage->FindFile(gc_score.file_number).lock();
assert(blob_file); if (!blob_file ||
blob_file->file_state() == BlobFileMeta::FileState::kBeingGC) {
// Skip this file id this file is being GCed
// or this file had been GCed
continue;
}
// ROCKS_LOG_INFO(db_options_.info_log, // ROCKS_LOG_INFO(db_options_.info_log,
// "file number:%lu score:%f being_gc:%d pending:%d, " // "file number:%lu score:%f being_gc:%d pending:%d, "
...@@ -37,17 +46,46 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC( ...@@ -37,17 +46,46 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
blob_file->file_number()); blob_file->file_number());
continue; continue;
} }
blob_files.push_back(blob_file.get());
batch_size += blob_file->file_size(); if (!stop_picking) {
if (batch_size >= cf_options_.max_gc_batch_size) break; blob_files.push_back(blob_file.get());
batch_size += blob_file->file_size();
estimate_output_size +=
(blob_file->file_size() - blob_file->discardable_size());
if (batch_size >= cf_options_.max_gc_batch_size ||
estimate_output_size >= cf_options_.blob_file_target_size) {
// Stop pick file for this gc, but still check file for whether need
// trigger gc after this
stop_picking = true;
}
} else {
if (blob_file->file_size() <= cf_options_.merge_small_file_threshold ||
blob_file->gc_mark() ||
blob_file->GetDiscardableRatio() >=
cf_options_.blob_file_discardable_ratio) {
next_gc_size += blob_file->file_size();
if (next_gc_size > cf_options_.min_gc_batch_size) {
maybe_continue_next_time = true;
ROCKS_LOG_INFO(db_options_.info_log,
"remain more than %" PRIu64
" bytes to be gc and trigger after this gc",
next_gc_size);
break;
}
} else {
break;
}
}
} }
ROCKS_LOG_DEBUG(db_options_.info_log,
"got batch size %" PRIu64 ", estimate output %" PRIu64
" bytes",
batch_size, estimate_output_size);
if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size) if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size)
return nullptr; return nullptr;
return std::unique_ptr<BlobGC>( return std::unique_ptr<BlobGC>(new BlobGC(
new BlobGC(std::move(blob_files), std::move(cf_options_))); std::move(blob_files), std::move(cf_options_), maybe_continue_next_time));
} }
bool BasicBlobGCPicker::CheckBlobFile(BlobFileMeta* blob_file) const { bool BasicBlobGCPicker::CheckBlobFile(BlobFileMeta* blob_file) const {
......
...@@ -39,6 +39,11 @@ class BlobGCPickerTest : public testing::Test { ...@@ -39,6 +39,11 @@ class BlobGCPickerTest : public testing::Test {
blob_storage_->files_[file_number] = f; blob_storage_->files_[file_number] = f;
} }
void RemoveBlobFile(uint64_t file_number) {
ASSERT_TRUE(blob_storage_->files_[file_number] != nullptr);
blob_storage_->files_.erase(file_number);
}
void UpdateBlobStorage() { blob_storage_->ComputeGCScore(); } void UpdateBlobStorage() { blob_storage_->ComputeGCScore(); }
}; };
...@@ -73,6 +78,88 @@ TEST_F(BlobGCPickerTest, BeingGC) { ...@@ -73,6 +78,88 @@ TEST_F(BlobGCPickerTest, BeingGC) {
ASSERT_EQ(blob_gc->inputs()[0]->file_number(), 2U); ASSERT_EQ(blob_gc->inputs()[0]->file_number(), 2U);
} }
TEST_F(BlobGCPickerTest, TriggerNext) {
TitanDBOptions titan_db_options;
TitanCFOptions titan_cf_options;
titan_cf_options.max_gc_batch_size = 1 << 30;
titan_cf_options.blob_file_target_size = 256 << 20;
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U << 30, 1000U << 20); // valid_size = 24MB
AddBlobFile(2U, 1U << 30, 512U << 20); // valid_size = 512MB
AddBlobFile(3U, 1U << 30, 512U << 20); // valid_size = 512MB
AddBlobFile(4U, 1U << 30, 512U << 20); // valid_size = 512MB
UpdateBlobStorage();
auto blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->trigger_next(), true);
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(2U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(3U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(4U, 1U << 30, 0U); // valid_size = 1GB
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->trigger_next(), false);
}
TEST_F(BlobGCPickerTest, PickFileAndTriggerNext) {
TitanDBOptions titan_db_options;
TitanCFOptions titan_cf_options;
titan_cf_options.max_gc_batch_size = 1 << 30;
titan_cf_options.blob_file_target_size = 256 << 20;
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
for (size_t i = 1; i < 41; i++) {
// add 70 files with 10MB valid data each file
AddBlobFile(i, titan_cf_options.blob_file_target_size, 246 << 20);
}
UpdateBlobStorage();
int gc_times = 0;
auto blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
while (blob_gc != nullptr && blob_gc->trigger_next()) {
gc_times++;
ASSERT_EQ(blob_gc->trigger_next(), true);
ASSERT_EQ(blob_gc->inputs().size(), 4);
for (auto file : blob_gc->inputs()) {
RemoveBlobFile(file->file_number());
}
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
}
ASSERT_EQ(gc_times, 9);
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->inputs().size(), 4);
}
TEST_F(BlobGCPickerTest, ParallelPickGC) {
TitanDBOptions titan_db_options;
TitanCFOptions titan_cf_options;
titan_cf_options.max_gc_batch_size = 1 << 30;
titan_cf_options.blob_file_target_size = 256 << 20;
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
for (size_t i = 1; i < 9; i++) {
// add 70 files with 10MB valid data each file
AddBlobFile(i, titan_cf_options.blob_file_target_size, 246 << 20);
}
UpdateBlobStorage();
auto blob_gc1 = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc1 != nullptr);
ASSERT_EQ(blob_gc1->trigger_next(), true);
ASSERT_EQ(blob_gc1->inputs().size(), 4);
auto blob_gc2 = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc2 != nullptr);
ASSERT_EQ(blob_gc2->trigger_next(), false);
ASSERT_EQ(blob_gc2->inputs().size(), 4);
for (auto file : blob_gc1->inputs()) {
RemoveBlobFile(file->file_number());
}
for (auto file : blob_gc2->inputs()) {
RemoveBlobFile(file->file_number());
}
UpdateBlobStorage();
}
} // namespace titandb } // namespace titandb
} // namespace rocksdb } // namespace rocksdb
......
...@@ -121,8 +121,12 @@ void BlobStorage::ComputeGCScore() { ...@@ -121,8 +121,12 @@ void BlobStorage::ComputeGCScore() {
gc_score_.push_back({}); gc_score_.push_back({});
auto& gcs = gc_score_.back(); auto& gcs = gc_score_.back();
gcs.file_number = file.first; gcs.file_number = file.first;
if (file.second->file_size() < cf_options_.merge_small_file_threshold) { if (file.second->file_size() < cf_options_.merge_small_file_threshold ||
gcs.score = 1; file.second->gc_mark()) {
// for the small file or file with gc mark (usually the file that just
// recovered) we want gc these file but more hope to gc other file with
// more invalid data
gcs.score = cf_options_.blob_file_discardable_ratio;
} else { } else {
gcs.score = file.second->GetDiscardableRatio(); gcs.score = file.second->GetDiscardableRatio();
} }
......
#pragma once #pragma once
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h> #include <inttypes.h>
#include "blob_file_cache.h" #include "blob_file_cache.h"
#include "blob_format.h" #include "blob_format.h"
#include "blob_gc.h" #include "blob_gc.h"
...@@ -65,6 +66,7 @@ class BlobStorage { ...@@ -65,6 +66,7 @@ class BlobStorage {
void MarkAllFilesForGC() { void MarkAllFilesForGC() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
for (auto& file : files_) { for (auto& file : files_) {
file.second->set_gc_mark(true);
file.second->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart); file.second->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
} }
} }
......
...@@ -149,6 +149,8 @@ class TitanDBImpl : public TitanDB { ...@@ -149,6 +149,8 @@ class TitanDBImpl : public TitanDB {
// REQUIRE: mutex_ held // REQUIRE: mutex_ held
void AddToGCQueue(uint32_t column_family_id) { void AddToGCQueue(uint32_t column_family_id) {
mutex_.AssertHeld();
unscheduled_gc_++;
gc_queue_.push_back(column_family_id); gc_queue_.push_back(column_family_id);
} }
...@@ -245,6 +247,8 @@ class TitanDBImpl : public TitanDB { ...@@ -245,6 +247,8 @@ class TitanDBImpl : public TitanDB {
// Guarded by mutex_. // Guarded by mutex_.
int bg_gc_scheduled_{0}; int bg_gc_scheduled_{0};
// REQUIRE: mutex_ held
int unscheduled_gc_{0};
std::atomic_bool shuting_down_{false}; std::atomic_bool shuting_down_{false};
}; };
......
...@@ -14,11 +14,12 @@ void TitanDBImpl::MaybeScheduleGC() { ...@@ -14,11 +14,12 @@ void TitanDBImpl::MaybeScheduleGC() {
if (shuting_down_.load(std::memory_order_acquire)) return; if (shuting_down_.load(std::memory_order_acquire)) return;
if (bg_gc_scheduled_ >= db_options_.max_background_gc) return; while (unscheduled_gc_ > 0 &&
bg_gc_scheduled_ < db_options_.max_background_gc) {
bg_gc_scheduled_++; unscheduled_gc_--;
bg_gc_scheduled_++;
env_->Schedule(&TitanDBImpl::BGWorkGC, this, Env::Priority::BOTTOM, this); env_->Schedule(&TitanDBImpl::BGWorkGC, this, Env::Priority::BOTTOM, this);
}
} }
void TitanDBImpl::BGWorkGC(void* db) { void TitanDBImpl::BGWorkGC(void* db) {
...@@ -42,6 +43,7 @@ void TitanDBImpl::BackgroundCallGC() { ...@@ -42,6 +43,7 @@ void TitanDBImpl::BackgroundCallGC() {
} }
bg_gc_scheduled_--; bg_gc_scheduled_--;
MaybeScheduleGC();
if (bg_gc_scheduled_ == 0) { if (bg_gc_scheduled_ == 0) {
// signal if // signal if
// * bg_gc_scheduled_ == 0 -- need to wakeup ~TitanDBImpl // * bg_gc_scheduled_ == 0 -- need to wakeup ~TitanDBImpl
...@@ -100,6 +102,15 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { ...@@ -100,6 +102,15 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
s = blob_gc_job.Finish(); s = blob_gc_job.Finish();
} }
blob_gc->ReleaseGcFiles(); blob_gc->ReleaseGcFiles();
if (blob_gc->trigger_next() &&
(bg_gc_scheduled_ - 1 + gc_queue_.size() <
2 * static_cast<uint32_t>(db_options_.max_background_gc))) {
// There is still data remained to be GCed
// and the queue is not overwhelmed
// then put this cf to GC queue for next GC
AddToGCQueue(blob_gc->column_family_handle()->GetID());
}
} }
if (s.ok()) { if (s.ok()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment