Unverified Commit ffaa9d1a authored by yiwu-arbug, committed by GitHub

Reconstruct GC stats on reopen (#130)

Summary:
* On DB reopen, iterate all SST files' properties and sum up the live data size of each blob file to use as GC stats (see the sketch below).
* Remove previous logic to set GC mark on blob files on reopen.
* Refactor `OnFlushCompleted` and `OnCompactionCompleted`. Check that the file state is `kPendingLSM` before transiting the file state.
* Refactor `BlobFileMeta`.
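
For illustration, a minimal standalone sketch of the reconstruction idea: each SST file carries a user-collected property (from `BlobFileSizeCollector`) mapping blob file number to the bytes of blob data that SST references, and summing these maps over all live SSTs yields each blob file's live data size. The helper names and sample data below are hypothetical, not the actual Titan API:

// Sketch of reconstructing per-blob-file live data size on DB reopen.
// Each SST carries a property mapping blob file number -> bytes of blob
// data referenced by that SST; summing across all live SSTs gives each
// blob file's live data size. Standalone example, hypothetical names.
#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

using BlobFileSizes = std::map<uint64_t, uint64_t>;  // blob file# -> bytes

BlobFileSizes ReconstructGCStats(const std::vector<BlobFileSizes>& sst_props) {
  BlobFileSizes live_data_size;
  for (const auto& prop : sst_props) {
    for (const auto& [blob_file_number, bytes] : prop) {
      live_data_size[blob_file_number] += bytes;
    }
  }
  return live_data_size;
}

int main() {
  // Two SSTs referencing blobs in blob files 5 and 7.
  std::vector<BlobFileSizes> sst_props = {{{5, 4096}, {7, 1024}},
                                          {{5, 2048}}};
  for (const auto& [file_number, size] : ReconstructGCStats(sst_props)) {
    std::cout << "blob file " << file_number << ": live " << size
              << " bytes\n";
  }
  return 0;
}

The GC picker's discardable ratio then falls out as 1 - live_data_size / file_size, matching `GetDiscardableRatio()` in the diff below.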

Test Plan:
Updated unit tests
Signed-off-by: Yi Wu <yiwu@pingcap.com>
parent 3b267dc5
@@ -21,6 +21,7 @@ void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) {
   encoder_.EncodeRecord(record);
   handle->offset = file_->GetFileSize();
   handle->size = encoder_.GetEncodedSize();
+  live_data_size_ += handle->size;
   status_ = file_->Append(encoder_.GetHeader());
   if (ok()) {
......
@@ -60,6 +60,8 @@ class BlobFileBuilder {
   const std::string& GetSmallestKey() { return smallest_key_; }
   const std::string& GetLargestKey() { return largest_key_; }
+
+  uint64_t live_data_size() const { return live_data_size_; }
  private:
   bool ok() const { return status().ok(); }
@@ -69,9 +71,10 @@ class BlobFileBuilder {
   Status status_;
   BlobEncoder encoder_;
-  uint64_t num_entries_{0};
+  uint64_t num_entries_ = 0;
   std::string smallest_key_;
   std::string largest_key_;
+  uint64_t live_data_size_ = 0;
 };
 }  // namespace titandb
......
@@ -94,12 +94,6 @@ Status BlobFileSet::Recover() {
                    "Next blob file number is %" PRIu64 ".", next_file_number);
   }
-  // Make sure perform gc on all files at the beginning
-  MarkAllFilesForGC();
-  for (auto& cf : column_families_) {
-    cf.second->ComputeGCScore();
-  }
   auto new_manifest_file_number = NewFileNumber();
   s = OpenManifest(new_manifest_file_number);
   if (!s.ok()) return s;
......
@@ -75,13 +75,6 @@ class BlobFileSet {
   void GetObsoleteFiles(std::vector<std::string>* obsolete_files,
                         SequenceNumber oldest_sequence);
-  // REQUIRES: mutex is held
-  void MarkAllFilesForGC() {
-    for (auto& cf : column_families_) {
-      cf.second->MarkAllFilesForGC();
-    }
-  }
   // REQUIRES: mutex is held
   bool IsColumnFamilyObsolete(uint32_t cf_id) {
     return obsolete_columns_.count(cf_id) > 0;
......
@@ -233,12 +233,6 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) {
   }
 }
-void BlobFileMeta::AddDiscardableSize(uint64_t _discardable_size) {
-  assert(_discardable_size < file_size_);
-  discardable_size_ += _discardable_size;
-  assert(discardable_size_ < file_size_);
-}
 TitanInternalStats::StatsType BlobFileMeta::GetDiscardableRatioLevel() const {
   auto ratio = GetDiscardableRatio();
   TitanInternalStats::StatsType type;
@@ -260,11 +254,6 @@ TitanInternalStats::StatsType BlobFileMeta::GetDiscardableRatioLevel() const {
   return type;
 }
-double BlobFileMeta::GetDiscardableRatio() const {
-  return static_cast<double>(discardable_size_) /
-         static_cast<double>(file_size_);
-}
 void BlobFileHeader::EncodeTo(std::string* dst) const {
   PutFixed32(dst, kHeaderMagicNumber);
   PutFixed32(dst, version);
......
@@ -171,7 +171,7 @@ struct BlobIndex {
 //
 class BlobFileMeta {
  public:
-  enum class FileEvent {
+  enum class FileEvent : int {
     kInit,
     kFlushCompleted,
     kCompactionCompleted,
@@ -185,7 +185,7 @@ class BlobFileMeta {
     kReset,  // reset file to normal for test
   };
-  enum class FileState {
+  enum class FileState : int {
     kInit,  // file never at this state
     kNormal,
     kPendingLSM,  // waiting keys adding to LSM
@@ -216,6 +216,8 @@ class BlobFileMeta {
   uint64_t file_number() const { return file_number_; }
   uint64_t file_size() const { return file_size_; }
+  uint64_t live_data_size() const { return live_data_size_; }
+  void set_live_data_size(uint64_t size) { live_data_size_ = size; }
   uint64_t file_entries() const { return file_entries_; }
   uint32_t file_level() const { return file_level_; }
   const std::string& smallest_key() const { return smallest_key_; }
@@ -223,18 +225,22 @@ class BlobFileMeta {
   FileState file_state() const { return state_; }
   bool is_obsolete() const { return state_ == FileState::kObsolete; }
-  uint64_t discardable_size() const { return discardable_size_; }
-  bool gc_mark() const { return gc_mark_; }
-  void set_gc_mark(bool mark) { gc_mark_ = mark; }
   void FileStateTransit(const FileEvent& event);
-  void AddDiscardableSize(uint64_t _discardable_size);
-  double GetDiscardableRatio() const;
-  bool NoLiveData() {
-    return discardable_size_ ==
-           file_size_ - kBlobMaxHeaderSize - kBlobFooterSize;
-  }
+  bool UpdateLiveDataSize(int64_t delta) {
+    int64_t result = static_cast<int64_t>(live_data_size_) + delta;
+    if (result < 0) {
+      live_data_size_ = 0;
+      return false;
+    }
+    live_data_size_ = static_cast<uint64_t>(result);
+    return true;
+  }
+  bool NoLiveData() { return live_data_size_ == 0; }
+  double GetDiscardableRatio() const {
+    // TODO: Exclude metadata size from file size.
+    return 1 - (static_cast<double>(live_data_size_) / file_size_);
+  }
   TitanInternalStats::StatsType GetDiscardableRatioLevel() const;
@@ -242,6 +248,8 @@ class BlobFileMeta {
   // Persistent field
   uint64_t file_number_{0};
   uint64_t file_size_{0};
+  // Size of data with reference from SST files.
+  uint64_t live_data_size_{0};
   uint64_t file_entries_;
   // Target level of compaction/flush which generates this blob file
   uint32_t file_level_;
@@ -252,11 +260,6 @@ class BlobFileMeta {
   // Not persistent field
   FileState state_{FileState::kInit};
-  uint64_t discardable_size_{0};
-  // gc_mark is set to true when this file is recovered from re-opening the DB
-  // that means this file needs to be checked for GC
-  bool gc_mark_{false};
 };
 // Format of blob file header for version 1 (8 bytes):
......
@@ -32,7 +32,6 @@ void BlobGC::MarkFilesBeingGC() {
 void BlobGC::ReleaseGcFiles() {
   for (auto& f : inputs_) {
-    f->set_gc_mark(false);
     f->FileStateTransit(BlobFileMeta::FileEvent::kGCCompleted);
   }
......
@@ -23,12 +23,6 @@ class BlobGC {
   const std::vector<BlobFileMeta*>& inputs() { return inputs_; }
-  void set_sampled_inputs(std::vector<BlobFileMeta*>&& files) {
-    sampled_inputs_ = std::move(files);
-  }
-  const std::vector<BlobFileMeta*>& sampled_inputs() { return sampled_inputs_; }
   const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }
   void SetColumnFamily(ColumnFamilyHandle* cfh);
@@ -47,7 +41,6 @@ class BlobGC {
  private:
   std::vector<BlobFileMeta*> inputs_;
-  std::vector<BlobFileMeta*> sampled_inputs_;
   std::vector<BlobFileMeta*> outputs_;
   TitanCFOptions titan_cf_options_;
   ColumnFamilyHandle* cfh_{nullptr};
......
@@ -119,11 +119,6 @@ Status BlobGCJob::Prepare() {
 }
 Status BlobGCJob::Run() {
-  Status s = SampleCandidateFiles();
-  if (!s.ok()) {
-    return s;
-  }
   std::string tmp;
   for (const auto& f : blob_gc_->inputs()) {
     if (!tmp.empty()) {
@@ -131,123 +126,12 @@ Status BlobGCJob::Run() {
     }
     tmp.append(std::to_string(f->file_number()));
   }
-  std::string tmp2;
-  for (const auto& f : blob_gc_->sampled_inputs()) {
-    if (!tmp2.empty()) {
-      tmp2.append(" ");
-    }
-    tmp2.append(std::to_string(f->file_number()));
-  }
-  ROCKS_LOG_BUFFER(log_buffer_, "[%s] Titan GC candidates[%s] selected[%s]",
+  ROCKS_LOG_BUFFER(log_buffer_, "[%s] Titan GC candidates[%s]",
                    blob_gc_->column_family_handle()->GetName().c_str(),
-                   tmp.c_str(), tmp2.c_str());
-  if (blob_gc_->sampled_inputs().empty()) {
-    return Status::OK();
-  }
+                   tmp.c_str());
   return DoRunGC();
 }
-Status BlobGCJob::SampleCandidateFiles() {
-  TitanStopWatch sw(env_, metrics_.gc_sampling_micros);
-  std::vector<BlobFileMeta*> result;
-  for (const auto& file : blob_gc_->inputs()) {
-    bool selected = false;
-    Status s = DoSample(file, &selected);
-    if (!s.ok()) {
-      return s;
-    }
-    if (selected) {
-      result.push_back(file);
-    }
-  }
-  if (!result.empty()) {
-    blob_gc_->set_sampled_inputs(std::move(result));
-  }
-  return Status::OK();
-}
-Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) {
-  assert(selected != nullptr);
-  if (file->file_size() <=
-      blob_gc_->titan_cf_options().merge_small_file_threshold) {
-    metrics_.gc_small_file += 1;
-    *selected = true;
-  } else if (file->GetDiscardableRatio() >=
-             blob_gc_->titan_cf_options().blob_file_discardable_ratio) {
-    metrics_.gc_discardable += 1;
-    *selected = true;
-  }
-  if (*selected) return Status::OK();
-  // TODO: add do sample count metrics
-  // `records_size` won't be accurate if the file is version 1, but this method
-  // is planned to be removed soon.
-  auto records_size = file->file_size() - BlobFileHeader::kMaxEncodedLength -
-                      BlobFileFooter::kEncodedLength;
-  Status s;
-  uint64_t sample_size_window = static_cast<uint64_t>(
-      records_size * blob_gc_->titan_cf_options().sample_file_size_ratio);
-  uint64_t sample_begin_offset = BlobFileHeader::kMaxEncodedLength;
-  if (records_size != sample_size_window) {
-    Random64 random64(records_size);
-    sample_begin_offset += random64.Uniform(records_size - sample_size_window);
-  }
-  std::unique_ptr<RandomAccessFileReader> file_reader;
-  const int readahead = 256 << 10;
-  s = NewBlobFileReader(file->file_number(), readahead, db_options_,
-                        env_options_, env_, &file_reader);
-  if (!s.ok()) {
-    return s;
-  }
-  BlobFileIterator iter(std::move(file_reader), file->file_number(),
-                        file->file_size(), blob_gc_->titan_cf_options());
-  iter.IterateForPrev(sample_begin_offset);
-  // TODO(@DorianZheng) sample_begin_offset maybe out of data block size, need
-  // more elegant solution
-  if (iter.status().IsInvalidArgument()) {
-    iter.IterateForPrev(BlobFileHeader::kMaxEncodedLength);
-  }
-  if (!iter.status().ok()) {
-    s = iter.status();
-    ROCKS_LOG_ERROR(db_options_.info_log,
-                    "IterateForPrev failed, file number[%" PRIu64
-                    "] size[%" PRIu64 "] status[%s]",
-                    file->file_number(), file->file_size(),
-                    s.ToString().c_str());
-    return s;
-  }
-  uint64_t iterated_size{0};
-  uint64_t discardable_size{0};
-  for (iter.Next();
-       iterated_size < sample_size_window && iter.status().ok() && iter.Valid();
-       iter.Next()) {
-    BlobIndex blob_index = iter.GetBlobIndex();
-    uint64_t total_length = blob_index.blob_handle.size;
-    iterated_size += total_length;
-    bool discardable = false;
-    s = DiscardEntry(iter.key(), blob_index, &discardable);
-    if (!s.ok()) {
-      return s;
-    }
-    if (discardable) {
-      discardable_size += total_length;
-    }
-  }
-  metrics_.bytes_read += iterated_size;
-  assert(iter.status().ok());
-  *selected =
-      discardable_size >=
-      std::ceil(sample_size_window *
-                blob_gc_->titan_cf_options().blob_file_discardable_ratio);
-  return s;
-}
 Status BlobGCJob::DoRunGC() {
   Status s;
@@ -382,7 +266,7 @@ Status BlobGCJob::DoRunGC() {
 Status BlobGCJob::BuildIterator(
     std::unique_ptr<BlobFileMergeIterator>* result) {
   Status s;
-  const auto& inputs = blob_gc_->sampled_inputs();
+  const auto& inputs = blob_gc_->inputs();
   assert(!inputs.empty());
   std::vector<std::unique_ptr<BlobFileIterator>> list;
   for (std::size_t i = 0; i < inputs.size(); ++i) {
@@ -493,6 +377,7 @@ Status BlobGCJob::InstallOutputBlobFiles() {
           builder.first->GetNumber(), builder.first->GetFile()->GetFileSize(),
           0, 0, builder.second->GetSmallestKey(),
           builder.second->GetLargestKey());
+      file->set_live_data_size(builder.second->live_data_size());
       file->FileStateTransit(BlobFileMeta::FileEvent::kGCOutput);
       RecordInHistogram(stats_, TitanStats::GC_OUTPUT_FILE_SIZE,
                         file->file_size());
@@ -593,7 +478,7 @@ Status BlobGCJob::DeleteInputBlobFiles() {
   Status s;
   VersionEdit edit;
   edit.SetColumnFamilyID(blob_gc_->column_family_handle()->GetID());
-  for (const auto& file : blob_gc_->sampled_inputs()) {
+  for (const auto& file : blob_gc_->inputs()) {
     ROCKS_LOG_INFO(db_options_.info_log,
                    "Titan add obsolete file [%" PRIu64 "] range [%s, %s]",
                    file->file_number(),
......
@@ -85,8 +85,6 @@ class BlobGCJob {
   uint64_t io_bytes_read_ = 0;
   uint64_t io_bytes_written_ = 0;
-  Status SampleCandidateFiles();
-  Status DoSample(const BlobFileMeta* file, bool* selected);
   Status DoRunGC();
   Status BuildIterator(std::unique_ptr<BlobFileMergeIterator>* result);
   Status DiscardEntry(const Slice& key, const BlobIndex& blob_index,
......
@@ -126,7 +126,8 @@ class BlobGCJobTest : public testing::Test {
     db_ = nullptr;
   }
-  void RunGC(bool expected, bool disable_merge_small = false) {
+  // TODO: unifiy this and TitanDBImpl::TEST_StartGC
+  void RunGC(bool expect_gc, bool disable_merge_small = false) {
     MutexLock l(mutex_);
     Status s;
     auto* cfh = base_db_->DefaultColumnFamily();
@@ -150,7 +151,7 @@ class BlobGCJobTest : public testing::Test {
           blob_file_set_->GetBlobStorage(cfh->GetID()).lock().get());
     }
-    ASSERT_TRUE((blob_gc != nullptr) == expected);
+    ASSERT_TRUE((blob_gc != nullptr) == expect_gc);
     if (blob_gc) {
       blob_gc->SetColumnFamily(cfh);
@@ -166,7 +167,7 @@ class BlobGCJobTest : public testing::Test {
     {
       mutex_->Unlock();
       s = blob_gc_job.Run();
-      if (expected) {
+      if (expect_gc) {
        ASSERT_OK(s);
      }
      mutex_->Lock();
@@ -378,22 +379,25 @@ TEST_F(BlobGCJobTest, GCLimiter) {
 TEST_F(BlobGCJobTest, Reopen) {
   DisableMergeSmall();
   NewDB();
   for (int i = 0; i < 10; i++) {
-    db_->Put(WriteOptions(), GenKey(i), GenValue(i));
+    ASSERT_OK(db_->Put(WriteOptions(), GenKey(i), GenValue(i)));
   }
   Flush();
   CheckBlobNumber(1);
   Reopen();
-  RunGC(true, true);
+  RunGC(false /*expect_gc*/, true /*disable_merge_small*/);
   CheckBlobNumber(1);
-  // trigger compute gc score
-  ReComputeGCScore();
-  RunGC(false, true);
+  for (int i = 0; i < 5; i++) {
+    ASSERT_OK(db_->Delete(WriteOptions(), GenKey(i)));
+  }
+  Flush();
+  CompactAll();
+  CheckBlobNumber(1);
+  // Should recover GC stats after reopen.
+  Reopen();
+  RunGC(true /*expect_gc*/, true /*dissable_merge_small*/);
   CheckBlobNumber(1);
 }
@@ -812,8 +816,9 @@ TEST_F(BlobGCJobTest, RangeMerge) {
   // after last level compaction, marked blob files are merged to new blob
   // files and obsoleted.
   for (int i = 2; i < 12; i++) {
-    auto blob = b->FindFile(i).lock();
-    ASSERT_EQ(blob->file_state(), BlobFileMeta::FileState::kObsolete);
+    auto file = b->FindFile(i).lock();
+    ASSERT_TRUE(file->NoLiveData());
+    ASSERT_EQ(file->file_state(), BlobFileMeta::FileState::kObsolete);
   }
 }
 }  // namespace titandb
......
@@ -42,8 +42,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
     if (!stop_picking) {
       blob_files.push_back(blob_file.get());
       batch_size += blob_file->file_size();
-      estimate_output_size +=
-          (blob_file->file_size() - blob_file->discardable_size());
+      estimate_output_size += blob_file->live_data_size();
       if (batch_size >= cf_options_.max_gc_batch_size ||
           estimate_output_size >= cf_options_.blob_file_target_size) {
         // Stop pick file for this gc, but still check file for whether need
@@ -73,7 +72,6 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
   // if there is only one small file to merge, no need to perform
   if (blob_files.size() == 1 &&
       blob_files[0]->file_size() <= cf_options_.merge_small_file_threshold &&
-      blob_files[0]->gc_mark() == false &&
       blob_files[0]->GetDiscardableRatio() <
           cf_options_.blob_file_discardable_ratio) {
     return nullptr;
......
@@ -33,7 +33,7 @@ class BlobGCPickerTest : public testing::Test {
                    uint64_t discardable_size, bool being_gc = false) {
     auto f =
         std::make_shared<BlobFileMeta>(file_number, file_size, 0, 0, "", "");
-    f->AddDiscardableSize(discardable_size);
+    f->set_live_data_size(file_size - discardable_size);
     f->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
     if (being_gc) {
       f->FileStateTransit(BlobFileMeta::FileEvent::kGCBegin);
......
@@ -114,7 +114,7 @@ void BlobStorage::MarkFileObsoleteLocked(std::shared_ptr<BlobFileMeta> file,
       std::make_pair(file->file_number(), obsolete_sequence));
   file->FileStateTransit(BlobFileMeta::FileEvent::kDelete);
   SubStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_SIZE,
-           file->file_size() - file->discardable_size());
+           file->live_data_size());
   SubStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_FILE_SIZE,
            file->file_size());
   SubStats(stats_, cf_id_, TitanInternalStats::NUM_LIVE_BLOB_FILE, 1);
@@ -188,8 +188,7 @@ void BlobStorage::ComputeGCScore() {
     gc_score_.push_back({});
     auto& gcs = gc_score_.back();
     gcs.file_number = file.first;
-    if (file.second->file_size() < cf_options_.merge_small_file_threshold ||
-        file.second->gc_mark()) {
+    if (file.second->file_size() < cf_options_.merge_small_file_threshold) {
       // for the small file or file with gc mark (usually the file that just
      // recovered) we want gc these file but more hope to gc other file with
      // more invalid data
......
@@ -69,15 +69,14 @@ class BlobStorage {
   // corruption if the file doesn't exist.
   std::weak_ptr<BlobFileMeta> FindFile(uint64_t file_number) const;
-  // Marks all the blob files so that they can be picked by GC job.
-  void MarkAllFilesForGC() {
-    MutexLock l(&mutex_);
+  // Must call before TitanDBImpl initialized.
+  void InitializeAllFiles() {
     for (auto& file : files_) {
-      file.second->set_gc_mark(true);
       file.second->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
       auto level = file.second->GetDiscardableRatioLevel();
       AddStats(stats_, cf_id_, level, 1);
     }
+    ComputeGCScore();
   }
   // The corresponding column family is dropped, so mark destroyed and we can
......
@@ -307,7 +307,8 @@ Status TitanDBImpl::OpenImpl(const std::vector<TitanCFDescriptor>& descs,
   if (!s.ok()) {
     return s;
   }
-  TEST_SYNC_POINT_CALLBACK("TitanDBImpl::OpenImpl:BeforeInitialized", db_);
+  s = InitializeGC(*handles);
+  TEST_SYNC_POINT_CALLBACK("TitanDBImpl::OpenImpl:BeforeInitialized", this);
   // Initialization done.
   initialized_ = true;
   // Enable compaction and background tasks after initilization.
@@ -806,30 +807,16 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
   }
   auto cf_id = column_family->GetID();
-  std::map<uint64_t, uint64_t> blob_files_discardable_size;
-  for (auto& collection : props) {
-    auto& prop = collection.second;
-    auto ucp_iter = prop->user_collected_properties.find(
-        BlobFileSizeCollector::kPropertiesName);
-    // this sst file doesn't contain any blob index
-    if (ucp_iter == prop->user_collected_properties.end()) {
-      continue;
-    }
-    std::map<uint64_t, uint64_t> sst_blob_files_size;
-    std::string str = ucp_iter->second;
-    Slice slice{str};
-    if (!BlobFileSizeCollector::Decode(&slice, &sst_blob_files_size)) {
+  std::map<uint64_t, int64_t> blob_file_size_diff;
+  for (auto& prop : props) {
+    Status gc_stats_status = ExtractGCStatsFromTableProperty(
+        prop.second, false /*to_add*/, &blob_file_size_diff);
+    if (!gc_stats_status.ok()) {
       // TODO: Should treat it as background error and make DB read-only.
       ROCKS_LOG_ERROR(db_options_.info_log,
-                      "failed to decode table property, "
-                      "deleted file: %s, property size: %" ROCKSDB_PRIszt ".",
-                      collection.first.c_str(), str.size());
+                      "failed to extract GC stats, file: %s, error: %s",
+                      prop.first.c_str(), gc_stats_status.ToString().c_str());
       assert(false);
-      continue;
-    }
-    for (auto& it : sst_blob_files_size) {
-      blob_files_discardable_size[it.first] += it.second;
     }
   }
@@ -857,24 +844,31 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
                            " not Found.");
   }
-  uint64_t delta = 0;
   VersionEdit edit;
   auto cf_options = bs->cf_options();
-  for (const auto& bfs : blob_files_discardable_size) {
-    auto file = bs->FindFile(bfs.first).lock();
+  for (const auto& file_size : blob_file_size_diff) {
+    uint64_t file_number = file_size.first;
+    int64_t delta = file_size.second;
+    auto file = bs->FindFile(file_number).lock();
     if (!file) {
       // file has been gc out
       continue;
     }
     if (!file->is_obsolete()) {
-      delta += bfs.second;
-    }
-    auto before = file->GetDiscardableRatioLevel();
-    file->AddDiscardableSize(static_cast<uint64_t>(bfs.second));
-    auto after = file->GetDiscardableRatioLevel();
-    if (before != after) {
-      AddStats(stats_.get(), cf_id, after, 1);
-      SubStats(stats_.get(), cf_id, before, 1);
+      auto before = file->GetDiscardableRatioLevel();
+      bool ok = file->UpdateLiveDataSize(delta);
+      if (!ok) {
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "During DeleteFilesInRanges: blob file %" PRIu64
+                       " live size below zero.",
+                       file_number);
+        assert(false);
+      }
+      auto after = file->GetDiscardableRatioLevel();
+      if (before != after) {
+        AddStats(stats_.get(), cf_id, after, 1);
+        SubStats(stats_.get(), cf_id, before, 1);
+      }
     }
     if (cf_options.level_merge) {
       if (file->NoLiveData()) {
@@ -885,8 +879,8 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
         file->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
       }
     }
+    SubStats(stats_.get(), cf_id, TitanInternalStats::LIVE_BLOB_SIZE, -delta);
   }
-  SubStats(stats_.get(), cf_id, TitanInternalStats::LIVE_BLOB_SIZE, delta);
   if (cf_options.level_merge) {
     blob_file_set_->LogAndApply(edit);
   } else {
@@ -1065,28 +1059,16 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
     assert(false);
     return;
   }
-  const auto& tps = flush_job_info.table_properties;
-  auto ucp_iter = tps.user_collected_properties.find(
-      BlobFileSizeCollector::kPropertiesName);
-  // sst file doesn't contain any blob index
-  if (ucp_iter == tps.user_collected_properties.end()) {
-    return;
-  }
-  std::map<uint64_t, uint64_t> blob_files_size;
-  Slice src{ucp_iter->second};
-  if (!BlobFileSizeCollector::Decode(&src, &blob_files_size)) {
+  std::map<uint64_t, int64_t> blob_file_size_diff;
+  Status s = ExtractGCStatsFromTableProperty(
+      flush_job_info.table_properties, true /*to_add*/, &blob_file_size_diff);
+  if (!s.ok()) {
     // TODO: Should treat it as background error and make DB read-only.
     ROCKS_LOG_ERROR(db_options_.info_log,
-                    "OnFlushCompleted[%d]: failed to decode table property, "
-                    "property size: %" ROCKSDB_PRIszt ".",
-                    flush_job_info.job_id, ucp_iter->second.size());
+                    "OnFlushCompleted[%d]: failed to extract GC stats: %s",
+                    flush_job_info.job_id, s.ToString().c_str());
     assert(false);
   }
-  assert(!blob_files_size.empty());
-  std::set<uint64_t> outputs;
-  for (const auto f : blob_files_size) {
-    outputs.insert(f.first);
-  }
   {
     MutexLock l(&mutex_);
@@ -1101,16 +1083,39 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
       assert(false);
       return;
     }
-    for (const auto& file_number : outputs) {
+    for (const auto& file_diff : blob_file_size_diff) {
+      uint64_t file_number = file_diff.first;
+      int64_t delta = file_diff.second;
       auto file = blob_storage->FindFile(file_number).lock();
-      // This file maybe output of a gc job, and it's been GCed out.
-      if (!file) {
+      // This file may be output of a GC job, and it's been GCed out.
+      if (file == nullptr) {
         continue;
       }
-      ROCKS_LOG_INFO(db_options_.info_log,
-                     "OnFlushCompleted[%d]: output blob file %" PRIu64 ".",
-                     flush_job_info.job_id, file->file_number());
+      if (file->file_state() != BlobFileMeta::FileState::kPendingLSM) {
+        // This file may be output of a GC job.
+        ROCKS_LOG_INFO(db_options_.info_log,
+                       "OnFlushCompleted[%d]: ignore GC output file %" PRIu64
+                       ".",
+                       flush_job_info.job_id, file->file_number());
+        continue;
+      }
+      if (delta < 0) {
+        // Cannot happen..
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "OnFlushCompleted[%d]: New blob file %" PRIu64
+                       " live size being negative",
+                       flush_job_info.job_id, file_number);
+        assert(false);
+        delta = 0;
+      }
+      file->set_live_data_size(static_cast<uint64_t>(delta));
       file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted);
+      ROCKS_LOG_INFO(db_options_.info_log,
+                     "OnFlushCompleted[%d]: output blob file %" PRIu64
+                     ","
+                     " live data size %" PRIu64 ".",
+                     flush_job_info.job_id, file->file_number(),
+                     file->live_data_size());
     }
   }
   TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Finished");
@@ -1127,62 +1132,35 @@ void TitanDBImpl::OnCompactionCompleted(
     // TODO: Clean up blob file generated by the failed compaction.
     return;
   }
-  std::map<uint64_t, int64_t> blob_files_size_diff;
-  std::set<uint64_t> outputs;
-  std::set<uint64_t> inputs;
-  auto calc_bfs = [&](const std::vector<std::string>& files, int coefficient,
-                      bool output) {
-    for (const auto& file : files) {
-      auto tp_iter = compaction_job_info.table_properties.find(file);
-      if (tp_iter == compaction_job_info.table_properties.end()) {
-        if (output) {
-          ROCKS_LOG_WARN(
-              db_options_.info_log,
-              "OnCompactionCompleted[%d]: No table properties for file %s.",
-              compaction_job_info.job_id, file.c_str());
-        }
-        continue;
-      }
-      auto ucp_iter = tp_iter->second->user_collected_properties.find(
-          BlobFileSizeCollector::kPropertiesName);
-      // this sst file doesn't contain any blob index
-      if (ucp_iter == tp_iter->second->user_collected_properties.end()) {
+  std::map<uint64_t, int64_t> blob_file_size_diff;
+  const TablePropertiesCollection& prop_collection =
+      compaction_job_info.table_properties;
+  auto update_diff = [&](const std::vector<std::string>& files, bool to_add) {
+    for (const auto& file_name : files) {
+      auto prop_iter = prop_collection.find(file_name);
+      if (prop_iter == prop_collection.end()) {
+        ROCKS_LOG_WARN(
+            db_options_.info_log,
+            "OnCompactionCompleted[%d]: No table properties for file %s.",
+            compaction_job_info.job_id, file_name.c_str());
         continue;
       }
-      std::map<uint64_t, uint64_t> input_blob_files_size;
-      std::string s = ucp_iter->second;
-      Slice slice{s};
-      if (!BlobFileSizeCollector::Decode(&slice, &input_blob_files_size)) {
+      Status gc_stats_status = ExtractGCStatsFromTableProperty(
+          prop_iter->second, to_add, &blob_file_size_diff);
+      if (!gc_stats_status.ok()) {
         // TODO: Should treat it as background error and make DB read-only.
         ROCKS_LOG_ERROR(
             db_options_.info_log,
-            "OnCompactionCompleted[%d]: failed to decode table property, "
-            "compaction file: %s, property size: %" ROCKSDB_PRIszt ".",
-            compaction_job_info.job_id, file.c_str(), s.size());
+            "OnCompactionCompleted[%d]: failed to extract GC stats from table "
+            "property: compaction file: %s, error: %s",
+            compaction_job_info.job_id, file_name.c_str(),
+            gc_stats_status.ToString().c_str());
         assert(false);
-        continue;
-      }
-      for (const auto& input_bfs : input_blob_files_size) {
-        if (output) {
-          if (inputs.find(input_bfs.first) == inputs.end()) {
-            outputs.insert(input_bfs.first);
-          }
-        } else {
-          inputs.insert(input_bfs.first);
-        }
-        auto bfs_iter = blob_files_size_diff.find(input_bfs.first);
-        if (bfs_iter == blob_files_size_diff.end()) {
-          blob_files_size_diff[input_bfs.first] =
-              coefficient * input_bfs.second;
-        } else {
-          bfs_iter->second += coefficient * input_bfs.second;
-        }
       }
     }
   };
-  calc_bfs(compaction_job_info.input_files, -1, false);
-  calc_bfs(compaction_job_info.output_files, 1, true);
+  update_diff(compaction_job_info.input_files, false /*to_add*/);
+  update_diff(compaction_job_info.output_files, true /*to_add*/);
   {
     MutexLock l(&mutex_);
@@ -1195,87 +1173,93 @@ void TitanDBImpl::OnCompactionCompleted(
                       compaction_job_info.job_id, compaction_job_info.cf_id);
       return;
     }
-    for (const auto& file_number : outputs) {
-      auto file = bs->FindFile(file_number).lock();
-      if (!file) {
-        // TODO: Should treat it as background error and make DB read-only.
-        ROCKS_LOG_ERROR(
-            db_options_.info_log,
-            "OnCompactionCompleted[%d]: Failed to get file %" PRIu64,
-            compaction_job_info.job_id, file_number);
-        assert(false);
-        return;
-      }
-      ROCKS_LOG_INFO(
-          db_options_.info_log,
-          "OnCompactionCompleted[%d]: compaction output blob file %" PRIu64 ".",
-          compaction_job_info.job_id, file->file_number());
-      file->FileStateTransit(BlobFileMeta::FileEvent::kCompactionCompleted);
-    }
-    uint64_t delta = 0;
     VersionEdit edit;
     auto cf_options = bs->cf_options();
-    std::vector<std::shared_ptr<BlobFileMeta>> files;
+    std::vector<std::shared_ptr<BlobFileMeta>> to_merge_candidates;
     bool count_sorted_run =
         cf_options.level_merge && cf_options.range_merge &&
         cf_options.num_levels - 1 == compaction_job_info.output_level;
-    for (const auto& bfs : blob_files_size_diff) {
-      // blob file size < 0 means discardable size > 0
-      if (bfs.second >= 0) {
-        if (count_sorted_run) {
-          auto file = bs->FindFile(bfs.first).lock();
-          if (file != nullptr) {
-            files.emplace_back(std::move(file));
-          }
-        }
-        continue;
-      }
-      auto file = bs->FindFile(bfs.first).lock();
-      if (!file) {
-        // file has been gc out
+    for (const auto& file_diff : blob_file_size_diff) {
+      uint64_t file_number = file_diff.first;
+      int64_t delta = file_diff.second;
+      std::shared_ptr<BlobFileMeta> file = bs->FindFile(file_number).lock();
+      if (file == nullptr || file->is_obsolete()) {
+        // File has been GC out.
         continue;
       }
-      if (!file->is_obsolete()) {
-        delta += -bfs.second;
-      }
-      auto before = file->GetDiscardableRatioLevel();
-      file->AddDiscardableSize(static_cast<uint64_t>(-bfs.second));
-      auto after = file->GetDiscardableRatioLevel();
-      if (before != after) {
-        AddStats(stats_.get(), compaction_job_info.cf_id, after, 1);
+      if (file->file_state() == BlobFileMeta::FileState::kPendingLSM) {
+        if (delta < 0) {
+          // Cannot happen..
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "OnCompactionCompleted[%d]: New blob file %" PRIu64
+                         " live size being negative",
+                         compaction_job_info.job_id, file_number);
+          assert(false);
+          delta = 0;
+        }
+        file->set_live_data_size(static_cast<uint64_t>(delta));
+        file->FileStateTransit(BlobFileMeta::FileEvent::kCompactionCompleted);
+        to_merge_candidates.push_back(file);
+        ROCKS_LOG_INFO(
+            db_options_.info_log,
+            "OnCompactionCompleted[%d]: compaction output blob file %" PRIu64
+            ", live data size %" PRIu64 ".",
+            compaction_job_info.job_id, file->file_number(),
+            file->live_data_size());
+      } else if (file->file_state() == BlobFileMeta::FileState::kNormal ||
+                 file->file_state() == BlobFileMeta::FileState::kToMerge) {
+        if (delta > 0) {
+          assert(false);
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "OnCompactionCompleted[%d]: Blob file %" PRIu64
+                         " live size increase after compaction.",
+                         compaction_job_info.job_id, file_number);
+        }
+        auto before = file->GetDiscardableRatioLevel();
         SubStats(stats_.get(), compaction_job_info.cf_id, before, 1);
-      }
-      if (cf_options.level_merge) {
-        // After level merge, most entries of merged blob files are written to
-        // new blob files. Delete blob files which have no live data.
-        // Mark last two level blob files to merge in next compaction if
-        // discardable size reached GC threshold
-        if (file->NoLiveData()) {
-          edit.DeleteBlobFile(file->file_number(),
-                              db_impl_->GetLatestSequenceNumber());
-          continue;
-        } else if (static_cast<int>(file->file_level()) >=
-                       cf_options.num_levels - 2 &&
-                   file->GetDiscardableRatio() >
-                       cf_options.blob_file_discardable_ratio) {
-          file->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
+        bool ok = file->UpdateLiveDataSize(delta);
+        if (!ok) {
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "OnCompactionCompleted[%d]: Blob file %" PRIu64
+                         " live size below zero.",
+                         compaction_job_info.job_id, file_number);
+          assert(false);
         }
-        if (count_sorted_run) {
-          files.emplace_back(std::move(file));
+        SubStats(stats_.get(), compaction_job_info.cf_id,
+                 TitanInternalStats::LIVE_BLOB_SIZE, delta);
+        if (cf_options.level_merge) {
+          // After level merge, most entries of merged blob files are written to
+          // new blob files. Delete blob files which have no live data.
+          // Mark last two level blob files to merge in next compaction if
+          // discardable size reached GC threshold
+          if (file->NoLiveData()) {
+            edit.DeleteBlobFile(file->file_number(),
+                                db_impl_->GetLatestSequenceNumber());
+          } else if (static_cast<int>(file->file_level()) >=
+                         cf_options.num_levels - 2 &&
+                     file->GetDiscardableRatio() >
+                         cf_options.blob_file_discardable_ratio) {
+            file->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
+          } else {
+            if (count_sorted_run) {
+              to_merge_candidates.push_back(file);
+            }
+          }
         }
       }
+      if (file->file_state() == BlobFileMeta::FileState::kNormal ||
+          file->file_state() == BlobFileMeta::FileState::kToMerge) {
+        auto after = file->GetDiscardableRatioLevel();
+        AddStats(stats_.get(), compaction_job_info.cf_id, after, 1);
+      }
     }
-    SubStats(stats_.get(), compaction_job_info.cf_id,
-             TitanInternalStats::LIVE_BLOB_SIZE, delta);
     // If level merge is enabled, blob files will be deleted by live
     // data based GC, so we don't need to trigger regular GC anymore
     if (cf_options.level_merge) {
       blob_file_set_->LogAndApply(edit);
-      MarkFileIfNeedMerge(files, cf_options.max_sorted_runs);
+      MarkFileIfNeedMerge(to_merge_candidates, cf_options.max_sorted_runs);
     } else {
       bs->ComputeGCScore();
       AddToGCQueue(compaction_job_info.cf_id);
       MaybeScheduleGC();
     }
......
@@ -140,6 +140,8 @@ class TitanDBImpl : public TitanDB {
   void TEST_set_initialized(bool _initialized) { initialized_ = _initialized; }
   Status TEST_StartGC(uint32_t column_family_id);
+  void TEST_WaitForBackgroundGC();
+
   Status TEST_PurgeObsoleteFiles();
   int TEST_bg_gc_running() {
@@ -180,6 +182,16 @@ class TitanDBImpl : public TitanDB {
                                  ColumnFamilyHandle* handle,
                                  std::shared_ptr<ManagedSnapshot> snapshot);
+  Status InitializeGC(const std::vector<ColumnFamilyHandle*>& cf_handles);
+
+  Status ExtractGCStatsFromTableProperty(
+      const std::shared_ptr<const TableProperties>& table_properties,
+      bool to_add, std::map<uint64_t, int64_t>* blob_file_size_diff);
+
+  Status ExtractGCStatsFromTableProperty(
+      const TableProperties& table_properties, bool to_add,
+      std::map<uint64_t, int64_t>* blob_file_size_diff);
+
   // REQUIRE: mutex_ held
   void AddToGCQueue(uint32_t column_family_id) {
     mutex_.AssertHeld();
......
@@ -3,12 +3,101 @@
 #include "test_util/sync_point.h"
 #include "blob_file_iterator.h"
+#include "blob_file_size_collector.h"
 #include "blob_gc_job.h"
 #include "blob_gc_picker.h"
+#include "util.h"
 namespace rocksdb {
 namespace titandb {
+Status TitanDBImpl::ExtractGCStatsFromTableProperty(
+    const std::shared_ptr<const TableProperties>& table_properties, bool to_add,
+    std::map<uint64_t, int64_t>* blob_file_size_diff) {
+  assert(blob_file_size_diff != nullptr);
+  if (table_properties == nullptr) {
+    // No table property found. File may not contain blob indices.
+    return Status::OK();
+  }
+  return ExtractGCStatsFromTableProperty(*table_properties.get(), to_add,
+                                         blob_file_size_diff);
+}
+
+Status TitanDBImpl::ExtractGCStatsFromTableProperty(
+    const TableProperties& table_properties, bool to_add,
+    std::map<uint64_t, int64_t>* blob_file_size_diff) {
+  assert(blob_file_size_diff != nullptr);
+  auto& prop = table_properties.user_collected_properties;
+  auto prop_iter = prop.find(BlobFileSizeCollector::kPropertiesName);
+  if (prop_iter == prop.end()) {
+    // No table property found. File may not contain blob indices.
+    return Status::OK();
+  }
+  Slice prop_slice(prop_iter->second);
+  std::map<uint64_t, uint64_t> blob_file_sizes;
+  if (!BlobFileSizeCollector::Decode(&prop_slice, &blob_file_sizes)) {
+    return Status::Corruption("Failed to decode blob file size property.");
+  }
+  for (const auto& blob_file_size : blob_file_sizes) {
+    uint64_t file_number = blob_file_size.first;
+    int64_t diff = static_cast<int64_t>(blob_file_size.second);
+    if (!to_add) {
+      diff = -diff;
+    }
+    (*blob_file_size_diff)[file_number] += diff;
+  }
+  return Status::OK();
+}
+
+Status TitanDBImpl::InitializeGC(
+    const std::vector<ColumnFamilyHandle*>& cf_handles) {
+  assert(!initialized());
+  Status s;
+  FlushOptions flush_opts;
+  flush_opts.wait = true;
+  for (ColumnFamilyHandle* cf_handle : cf_handles) {
+    // Flush memtable to make sure keys written by GC are all in SSTs.
+    s = Flush(flush_opts, cf_handle);
+    if (!s.ok()) {
+      return s;
+    }
+    TablePropertiesCollection collection;
+    s = GetPropertiesOfAllTables(cf_handle, &collection);
+    if (!s.ok()) {
+      return s;
+    }
+    std::map<uint64_t, int64_t> blob_file_size_diff;
+    for (auto& file : collection) {
+      s = ExtractGCStatsFromTableProperty(file.second, true /*to_add*/,
+                                          &blob_file_size_diff);
+      if (!s.ok()) {
+        return s;
+      }
+    }
+    std::shared_ptr<BlobStorage> blob_storage =
+        blob_file_set_->GetBlobStorage(cf_handle->GetID()).lock();
+    assert(blob_storage != nullptr);
+    for (auto& file_size : blob_file_size_diff) {
+      assert(file_size.second >= 0);
+      std::shared_ptr<BlobFileMeta> file =
+          blob_storage->FindFile(file_size.first).lock();
+      if (file != nullptr) {
+        assert(file->live_data_size() == 0);
+        file->set_live_data_size(static_cast<uint64_t>(file_size.second));
+      }
+    }
+    blob_storage->InitializeAllFiles();
+  }
+  {
+    MutexLock l(&mutex_);
+    for (ColumnFamilyHandle* cf_handle : cf_handles) {
+      AddToGCQueue(cf_handle->GetID());
+    }
+    MaybeScheduleGC();
+  }
+  return s;
+}
+
 void TitanDBImpl::MaybeScheduleGC() {
   mutex_.AssertHeld();
@@ -178,5 +267,12 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
   return s;
 }
+
+void TitanDBImpl::TEST_WaitForBackgroundGC() {
+  MutexLock l(&mutex_);
+  while (bg_gc_scheduled_ > 0) {
+    bg_cv_.Wait();
+  }
+}
+
 }  // namespace titandb
 }  // namespace rocksdb
...@@ -140,6 +140,194 @@ class TitanGCStatsTest : public testing::Test { ...@@ -140,6 +140,194 @@ class TitanGCStatsTest : public testing::Test {
TitanDBImpl* db_impl_ = nullptr; TitanDBImpl* db_impl_ = nullptr;
}; };
TEST_F(TitanGCStatsTest, Flush) {
constexpr size_t kValueSize = 123;
constexpr size_t kNumKeys = 456;
std::string value(kValueSize, 'v');
uint64_t blob_size = get_blob_size(value);
ASSERT_OK(Open());
for (uint32_t k = 0; k < kNumKeys; k++) {
ASSERT_OK(Put(k, value));
}
ASSERT_OK(Flush());
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
GetBlobFiles(&blob_files);
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFileMeta> blob_file = blob_files.begin()->second.lock();
ASSERT_TRUE(blob_file != nullptr);
ASSERT_EQ(blob_size * kNumKeys, blob_file->live_data_size());
}
TEST_F(TitanGCStatsTest, Compaction) {
constexpr size_t kValueSize = 123;
constexpr size_t kNumKeys = 456;
std::string value(kValueSize, 'v');
uint64_t blob_size = get_blob_size(value);
// Insert some data without generating a blob file.
options_.min_blob_size = 1000;
ASSERT_OK(Open());
for (uint32_t k = 0; k < kNumKeys; k++) {
ASSERT_OK(Put(k, value));
}
ASSERT_OK(Flush());
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
GetBlobFiles(&blob_files);
ASSERT_EQ(0, blob_files.size());
// Generate two blob files from flush.
options_.min_blob_size = 0;
ASSERT_OK(Reopen());
for (uint32_t k = 0; k < kNumKeys; k++) {
if (k % 2 == 0) {
ASSERT_OK(Put(k, value));
}
}
ASSERT_OK(Flush());
for (uint32_t k = 0; k < kNumKeys; k++) {
if (k % 3 == 0) {
ASSERT_OK(Put(k, value));
}
}
ASSERT_OK(Flush());
GetBlobFiles(&blob_files);
ASSERT_EQ(2, blob_files.size());
std::shared_ptr<BlobFileMeta> file1 = blob_files.begin()->second.lock();
std::shared_ptr<BlobFileMeta> file2 = blob_files.rbegin()->second.lock();
ASSERT_EQ(blob_size * ((kNumKeys + 1) / 2), file1->live_data_size());
ASSERT_EQ(blob_size * ((kNumKeys + 2) / 3), file2->live_data_size());
// Compact 3 SST files which should generate a new blob file, and update
// live data size for the other two blob files.
ASSERT_OK(CompactAll());
GetBlobFiles(&blob_files);
ASSERT_EQ(3, blob_files.size());
std::shared_ptr<BlobFileMeta> file3 = blob_files.rbegin()->second.lock();
uint64_t live_keys1 = (kNumKeys + 1) / 2 - (kNumKeys + 5) / 6;
uint64_t live_keys2 = (kNumKeys + 2) / 3;
uint64_t live_keys3 = kNumKeys - live_keys1 - live_keys2;
ASSERT_EQ(blob_size * live_keys1, file1->live_data_size());
ASSERT_EQ(blob_size * live_keys2, file2->live_data_size());
ASSERT_EQ(blob_size * live_keys3, file3->live_data_size());
}
TEST_F(TitanGCStatsTest, GCOutput) {
constexpr size_t kValueSize = 123;
constexpr size_t kNumKeys = 10;
std::string value(kValueSize, 'v');
uint64_t blob_size = get_blob_size(value);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
// Generate one blob file.
options_.blob_file_discardable_ratio = 0.01;
ASSERT_OK(Open());
for (uint32_t k = 0; k < kNumKeys; k++) {
ASSERT_OK(Put(k, value));
}
ASSERT_OK(Flush());
GetBlobFiles(&blob_files);
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFileMeta> file1 = blob_files.begin()->second.lock();
ASSERT_TRUE(file1 != nullptr);
ASSERT_EQ(blob_size * kNumKeys, file1->live_data_size());
// Delete some keys and run GC.
size_t num_remaining_keys = kNumKeys;
for (uint32_t k = 0; k < kNumKeys; k++) {
if (k % 7 == 0) {
ASSERT_OK(Delete(k));
num_remaining_keys--;
}
}
ASSERT_OK(Flush());
ASSERT_OK(CompactAll());
// Check file1 live data size updated after compaction.
GetBlobFiles(&blob_files);
ASSERT_EQ(1, blob_files.size());
ASSERT_EQ(file1.get(), blob_files.begin()->second.lock().get());
ASSERT_EQ(blob_size * num_remaining_keys, file1->live_data_size());
ASSERT_EQ(file1.get(), blob_files.begin()->second.lock().get());
ASSERT_OK(db_impl_->TEST_StartGC(db_->DefaultColumnFamily()->GetID()));
// Check file2 live data size is set after GC.
GetBlobFiles(&blob_files);
ASSERT_EQ(2, blob_files.size());
ASSERT_EQ(file1.get(), blob_files.begin()->second.lock().get());
ASSERT_TRUE(file1->is_obsolete());
std::shared_ptr<BlobFileMeta> file2 = blob_files.rbegin()->second.lock();
ASSERT_TRUE(file2 != nullptr);
ASSERT_EQ(blob_size * num_remaining_keys, file2->live_data_size());
}
TEST_F(TitanGCStatsTest, Reopen) {
constexpr size_t kValueSize = 123;
constexpr size_t kNumKeysPerFile = 456;
constexpr size_t kNumFiles = 5;
constexpr size_t kDelKeys = 789;
std::string value(kValueSize, 'v');
uint64_t blob_size = get_blob_size(value);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
uint64_t expected_size[kNumFiles];
// Generate blob files.
ASSERT_OK(Open());
for (size_t idx = 0; idx < kNumFiles; idx++) {
for (uint32_t k = 0; k < kNumKeysPerFile; k++) {
uint32_t key = static_cast<uint32_t>(idx * kNumKeysPerFile + k);
ASSERT_OK(Put(key, value));
}
ASSERT_OK(Flush());
}
GetBlobFiles(&blob_files);
ASSERT_EQ(kNumFiles, blob_files.size());
auto iter = blob_files.begin();
for (size_t idx = 0; idx < kNumFiles; idx++) {
ASSERT_TRUE(iter != blob_files.end());
std::shared_ptr<BlobFileMeta> file = iter->second.lock();
ASSERT_TRUE(file != nullptr);
expected_size[idx] = blob_size * kNumKeysPerFile;
ASSERT_EQ(expected_size[idx], file->live_data_size());
iter++;
}
// Delete some keys.
Random rand(666);
for (size_t d = 0; d < kDelKeys; d++) {
uint32_t key = rand.Next() % (kNumKeysPerFile * kNumFiles);
bool key_exists = false;
ASSERT_OK(KeyExists(key, &key_exists));
if (key_exists) {
ASSERT_OK(Delete(key));
expected_size[key / kNumKeysPerFile] -= blob_size;
}
}
ASSERT_OK(Flush());
ASSERT_OK(CompactAll());
GetBlobFiles(&blob_files);
ASSERT_EQ(kNumFiles, blob_files.size());
iter = blob_files.begin();
for (size_t idx = 0; idx < kNumFiles; idx++) {
ASSERT_TRUE(iter != blob_files.end());
std::shared_ptr<BlobFileMeta> file = iter->second.lock();
ASSERT_TRUE(file != nullptr);
ASSERT_EQ(expected_size[idx], file->live_data_size());
iter++;
}
// Check live data size after reopen.
ASSERT_OK(Reopen());
GetBlobFiles(&blob_files);
ASSERT_EQ(kNumFiles, blob_files.size());
iter = blob_files.begin();
for (size_t idx = 0; idx < kNumFiles; idx++) {
ASSERT_TRUE(iter != blob_files.end());
std::shared_ptr<BlobFileMeta> file = iter->second.lock();
ASSERT_TRUE(file != nullptr);
ASSERT_EQ(expected_size[idx], file->live_data_size());
iter++;
}
}
TEST_F(TitanGCStatsTest, DeleteFilesInRange) { TEST_F(TitanGCStatsTest, DeleteFilesInRange) {
constexpr size_t kValueSize = 123; constexpr size_t kValueSize = 123;
constexpr size_t kNumKeys = 10; constexpr size_t kNumKeys = 10;
...@@ -159,7 +347,7 @@ TEST_F(TitanGCStatsTest, DeleteFilesInRange) { ...@@ -159,7 +347,7 @@ TEST_F(TitanGCStatsTest, DeleteFilesInRange) {
GetBlobFiles(&blob_files); GetBlobFiles(&blob_files);
ASSERT_EQ(1, blob_files.size()); ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFileMeta> blob_file = blob_files.begin()->second.lock(); std::shared_ptr<BlobFileMeta> blob_file = blob_files.begin()->second.lock();
ASSERT_EQ(0, blob_file->discardable_size()); ASSERT_EQ(blob_size * kNumKeys, blob_file->live_data_size());
// Force to split SST into smaller ones. With the current rocksdb // Force to split SST into smaller ones. With the current rocksdb
// implementation it split the file into every two keys per SST. // implementation it split the file into every two keys per SST.
...@@ -175,7 +363,7 @@ TEST_F(TitanGCStatsTest, DeleteFilesInRange) { ...@@ -175,7 +363,7 @@ TEST_F(TitanGCStatsTest, DeleteFilesInRange) {
ASSERT_EQ(1, blob_files.size()); ASSERT_EQ(1, blob_files.size());
blob_file = blob_files.begin()->second.lock(); blob_file = blob_files.begin()->second.lock();
ASSERT_TRUE(blob_file != nullptr); ASSERT_TRUE(blob_file != nullptr);
ASSERT_EQ(0, blob_file->discardable_size()); ASSERT_EQ(blob_size * kNumKeys, blob_file->live_data_size());
// Add a overlapping SST to disable trivial move. // Add a overlapping SST to disable trivial move.
ASSERT_OK(Put(0, value)); ASSERT_OK(Put(0, value));
...@@ -190,7 +378,7 @@ TEST_F(TitanGCStatsTest, DeleteFilesInRange) { ...@@ -190,7 +378,7 @@ TEST_F(TitanGCStatsTest, DeleteFilesInRange) {
ASSERT_EQ(1, blob_files.size()); ASSERT_EQ(1, blob_files.size());
blob_file = blob_files.begin()->second.lock(); blob_file = blob_files.begin()->second.lock();
ASSERT_TRUE(blob_file != nullptr); ASSERT_TRUE(blob_file != nullptr);
ASSERT_EQ(0, blob_file->discardable_size()); ASSERT_EQ(blob_size * kNumKeys, blob_file->live_data_size());
// Check live data size updated after DeleteFilesInRange. // Check live data size updated after DeleteFilesInRange.
std::string key4 = gen_key(4); std::string key4 = gen_key(4);
...@@ -202,7 +390,136 @@ TEST_F(TitanGCStatsTest, DeleteFilesInRange) { ...@@ -202,7 +390,136 @@ TEST_F(TitanGCStatsTest, DeleteFilesInRange) {
ASSERT_EQ(1, blob_files.size()); ASSERT_EQ(1, blob_files.size());
blob_file = blob_files.begin()->second.lock(); blob_file = blob_files.begin()->second.lock();
ASSERT_TRUE(blob_file != nullptr); ASSERT_TRUE(blob_file != nullptr);
ASSERT_EQ(blob_size * 4, blob_file->discardable_size()); ASSERT_EQ(blob_size * (kNumKeys - 4), blob_file->live_data_size());
}
TEST_F(TitanGCStatsTest, LevelMerge) {
constexpr size_t kValueSize = 123;
constexpr size_t kNumKeys = 456;
constexpr size_t kNumFiles = 3;
std::string value(kValueSize, 'v');
uint64_t blob_size = get_blob_size(value);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
Random rand(666);
uint64_t expected_size[kNumFiles];
// Enable level merge and load 3 files at L0.
options_.level_merge = true;
ASSERT_OK(Open());
for (size_t idx = 0; idx < kNumFiles; idx++) {
expected_size[idx] = 0;
for (uint32_t k = 0; k < kNumKeys; k++) {
if (k % kNumFiles == idx) {
ASSERT_OK(Put(k, value));
expected_size[idx] += blob_size;
}
}
ASSERT_OK(Flush());
}
GetBlobFiles(&blob_files);
ASSERT_EQ(kNumFiles, blob_files.size());
auto iter = blob_files.begin();
for (size_t idx = 0; idx < kNumFiles; idx++) {
ASSERT_TRUE(iter != blob_files.end());
std::shared_ptr<BlobFileMeta> file = iter->second.lock();
ASSERT_TRUE(file != nullptr);
ASSERT_EQ(expected_size[idx], file->live_data_size());
iter++;
}
// Compact to trigger level merge. New blob file should be generated, and
// existing ones will be obsolete.
ASSERT_OK(CompactAll());
GetBlobFiles(&blob_files);
ASSERT_EQ(kNumFiles + 1, blob_files.size());
iter = blob_files.begin();
for (size_t idx = 0; idx < kNumFiles; idx++) {
ASSERT_TRUE(iter != blob_files.end());
std::shared_ptr<BlobFileMeta> file = iter->second.lock();
ASSERT_TRUE(file != nullptr);
ASSERT_TRUE(file->is_obsolete());
iter++;
}
ASSERT_TRUE(iter != blob_files.end());
std::shared_ptr<BlobFileMeta> new_file = blob_files.rbegin()->second.lock();
ASSERT_TRUE(new_file != nullptr);
ASSERT_EQ(blob_size * kNumKeys, new_file->live_data_size());
ASSERT_EQ(options_.num_levels - 1, new_file->file_level());
}
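The level-merge assertions above depend on the refactored file state machine: per the summary, OnFlushCompleted and OnCompactionCompleted now check that a file is still kPendingLSM before transiting it, so a file that already became obsolete is not resurrected. A minimal model of that guard (a sketch under assumed names; the real code lives in BlobFileMeta::FileStateTransit and its callers):

// Minimal model of the kPendingLSM guard; enum values mirror BlobFileMeta,
// but this is an illustration, not the actual implementation.
enum class FileState { kPendingLSM, kNormal, kObsolete };

void OnCompactionCompletedSketch(FileState* state) {
  // Transit only files still waiting for LSM feedback; leave files that an
  // earlier event already moved to kNormal or kObsolete untouched.
  if (*state == FileState::kPendingLSM) {
    *state = FileState::kNormal;
  }
}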
TEST_F(TitanGCStatsTest, RangeMerge) {
constexpr size_t kLargeValueSize = 123;
constexpr size_t kNumKeys = 456;
constexpr size_t kNumFiles = 3;
std::string small_value = "v";
std::string large_value(kLargeValueSize, 'v');
uint64_t blob_size = get_blob_size(large_value);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
Random rand(666);
uint64_t expected_size[kNumFiles];
// Enable level merge and range merge. Generate some files and compact them
// to bottom level.
options_.min_blob_size = 10;
options_.level_merge = true;
options_.range_merge = true;
options_.max_sorted_runs = static_cast<int>(kNumFiles - 1);
ASSERT_OK(Open());
for (size_t idx = 0; idx < kNumFiles; idx++) {
expected_size[idx] = 0;
for (size_t k = 1; k <= kNumKeys; k++) {
if (k % kNumFiles == idx) {
ASSERT_OK(Put(k, large_value));
expected_size[idx] += blob_size;
}
}
ASSERT_OK(Flush());
// Generate extra overlapping files to disable trivial move.
ASSERT_OK(Put(0, small_value));
ASSERT_OK(Put(kNumKeys + 1, small_value));
ASSERT_OK(Flush());
ASSERT_OK(CompactAll());
}
// Verify the number of live files at the bottom level equals kNumFiles.
size_t num_live_files = 0;
GetBlobFiles(&blob_files);
ASSERT_EQ(kNumFiles * 2, blob_files.size());
size_t ptr = 0;
for (auto iter = blob_files.begin(); iter != blob_files.end(); iter++) {
std::shared_ptr<BlobFileMeta> file = iter->second.lock();
ASSERT_TRUE(file != nullptr);
if (!file->is_obsolete()) {
ASSERT_EQ(options_.num_levels - 1, file->file_level());
ASSERT_EQ(BlobFileMeta::FileState::kToMerge, file->file_state());
ASSERT_EQ(expected_size[ptr], file->live_data_size());
ASSERT_LT(num_live_files, kNumFiles);
num_live_files++;
ptr++;
}
}
ASSERT_EQ(kNumFiles, num_live_files);
// Generate extra overlapping files to disable trivial move.
ASSERT_OK(Put(0, small_value));
ASSERT_OK(Put(kNumKeys + 1, small_value));
ASSERT_OK(Flush());
// Trigger a compaction to run range merge.
ASSERT_OK(CompactAll());
GetBlobFiles(&blob_files);
ASSERT_EQ(kNumFiles * 2 + 1, blob_files.size());
for (auto iter = blob_files.begin(); iter != blob_files.end();) {
std::shared_ptr<BlobFileMeta> file = iter->second.lock();
iter++;
// Check all files except the last one.
if (iter != blob_files.end()) {
ASSERT_TRUE(file != nullptr);
ASSERT_TRUE(file->is_obsolete());
}
}
std::shared_ptr<BlobFileMeta> new_file = blob_files.rbegin()->second.lock();
ASSERT_TRUE(new_file != nullptr);
ASSERT_EQ(blob_size * kNumKeys, new_file->live_data_size());
}
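For the kToMerge checks above, a plausible shape of the range-merge trigger is: once the bottom level accumulates more sorted runs than max_sorted_runs, every live bottom-level blob file is marked for merging so the next compaction rewrites its live blobs into one run. A hedged sketch (the function name and logic shape are assumptions, not Titan's exact API):

#include <vector>

enum class FileState { kNormal, kToMerge };

// Hypothetical range-merge trigger: mark all live bottom-level files once
// the sorted-run count exceeds the configured maximum.
void MaybeMarkToMerge(std::vector<FileState>* bottom_level_files,
                      int sorted_runs, int max_sorted_runs) {
  if (sorted_runs <= max_sorted_runs) return;
  for (FileState& state : *bottom_level_files) {
    state = FileState::kToMerge;
  }
}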
} // namespace titandb
...
...
@@ -1158,14 +1158,15 @@ TEST_F(TitanDBTest, GCAfterDropCF) {
}
TEST_F(TitanDBTest, GCBeforeFlushCommit) {
port::Mutex mu;
port::CondVar cv(&mu);
std::atomic<bool> is_first_flush{true};
std::atomic<int> flush_completed{0};
DBImpl* db_impl = nullptr;
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBTest::GCBeforeFlushCommit:PauseInstall",
-  "TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"},
- {"TitanDBImpl::OnFlushCompleted:Finished",
-  "TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush"}});
+  "TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"}});
SyncPoint::GetInstance()->SetCallBack("FlushJob::InstallResults", [&](void*) {
if (is_first_flush) {
is_first_flush = false;
...
@@ -1179,6 +1180,19 @@ TEST_F(TitanDBTest, GCBeforeFlushCommit) {
Env::Default()->SleepForMicroseconds(1000 * 1000); // 1s
db_mutex->Lock();
});
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::OnFlushCompleted:Finished", [&](void*) {
MutexLock l(&mu);
flush_completed++;
cv.SignalAll();
});
SyncPoint::GetInstance()->SetCallBack(
"TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush", [&](void*) {
MutexLock l(&mu);
while (flush_completed < 2) {
cv.Wait();
}
});
options_.create_if_missing = true;
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
...
@@ -1207,13 +1221,14 @@ TEST_F(TitanDBTest, GCBeforeFlushCommit) {
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush");
- // Set GC mark to force GC select the file.
+ // Set live data size to force GC select the file.
auto blob_storage = GetBlobStorage().lock();
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(2, blob_files.size());
// Set live data size to 0 to force GC.
auto second_file = blob_files.rbegin()->second.lock();
- second_file->set_gc_mark(true);
+ second_file->set_live_data_size(0);
ASSERT_OK(db_impl_->TEST_StartGC(cf_id));
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
t1.join();
...
@@ -1227,6 +1242,72 @@ TEST_F(TitanDBTest, GCBeforeFlushCommit) {
SyncPoint::GetInstance()->ClearAllCallBacks();
}
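The rewrite above replaces a one-shot sync-point dependency with a counter guarded by a mutex and condition variable: the old edge from TitanDBImpl::OnFlushCompleted:Finished unblocked after the first flush, while the test actually needs to wait until both flushes have completed. A self-contained equivalent of that wait-for-N pattern, using standard primitives instead of rocksdb's port::Mutex/port::CondVar (a sketch, not the test's code):

#include <condition_variable>
#include <mutex>

// Latch that blocks until it has been signaled at least n times; mirrors the
// flush_completed counter plus condition variable the test adds.
class CountLatch {
 public:
  void Signal() {
    std::lock_guard<std::mutex> lock(mu_);
    ++count_;
    cv_.notify_all();
  }
  void WaitFor(int n) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [&] { return count_ >= n; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int count_ = 0;
};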
// Test GC stats are recovered on DB reopen and GC resumes after reopen.
TEST_F(TitanDBTest, GCAfterReopen) {
options_.min_blob_size = 0;
options_.blob_file_discardable_ratio = 0.01;
options_.disable_background_gc = true;
options_.blob_file_compression = CompressionType::kNoCompression;
// Generate a blob file and delete half of keys in it.
Open();
for (int i = 0; i < 100; i++) {
std::string key = GenKey(i);
ASSERT_OK(db_->Put(WriteOptions(), key, "v"));
}
Flush();
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
Delete(i);
}
}
Flush();
CompactAll();
std::shared_ptr<BlobStorage> blob_storage = GetBlobStorage().lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFileMeta> file1 = blob_files.begin()->second.lock();
ASSERT_TRUE(file1 != nullptr);
ASSERT_TRUE(std::abs(file1->GetDiscardableRatio() - 0.5) < 0.01);
uint64_t file_number1 = file1->file_number();
file1.reset();
blob_files.clear();
blob_storage.reset();
// Sync point to verify GC stats are recovered after reopen.
std::atomic<int> num_gc_job{0};
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::OpenImpl:BeforeInitialized", [&](void* arg) {
TitanDBImpl* db_impl = reinterpret_cast<TitanDBImpl*>(arg);
blob_storage =
db_impl->TEST_GetBlobStorage(db_impl->DefaultColumnFamily());
ASSERT_TRUE(blob_storage != nullptr);
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFileMeta> file = blob_files.begin()->second.lock();
ASSERT_TRUE(file != nullptr);
ASSERT_TRUE(std::abs(file->GetDiscardableRatio() - 0.5) < 0.01);
});
SyncPoint::GetInstance()->SetCallBack("TitanDBImpl::BackgroundGC:Finish",
[&](void*) { num_gc_job++; });
SyncPoint::GetInstance()->EnableProcessing();
// Re-enable background GC and reopen. See if GC resumes.
options_.disable_background_gc = false;
Reopen();
db_impl_->TEST_WaitForBackgroundGC();
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
ASSERT_EQ(1, num_gc_job);
blob_storage = GetBlobStorage().lock();
ASSERT_TRUE(blob_storage != nullptr);
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFileMeta> file2 = blob_files.begin()->second.lock();
ASSERT_GT(file2->file_number(), file_number1);
}
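This test exercises the reopen path described in the summary: instead of marking every file for GC, recovery walks each SST's table properties and sums the live blob bytes it references, per blob file. A sketch of that reconstruction (the property layout and function names here are assumptions for illustration):

#include <cstdint>
#include <map>
#include <vector>

// Sketch: rebuild per-blob-file live data size from the blob-size table
// property of every live SST. Each property is assumed to map
// blob file number -> live bytes referenced by that SST.
std::map<uint64_t, uint64_t> SumLiveDataSize(
    const std::vector<std::map<uint64_t, uint64_t>>& sst_blob_properties) {
  std::map<uint64_t, uint64_t> live;
  for (const auto& prop : sst_blob_properties) {
    for (const auto& entry : prop) {
      live[entry.first] += entry.second;
    }
  }
  return live;  // feed into BlobFileMeta::set_live_data_size during recovery
}

With these sums restored, a file whose discardable ratio already exceeds blob_file_discardable_ratio becomes a GC candidate immediately, which is why exactly one background GC job runs right after Reopen().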
} // namespace titandb
} // namespace rocksdb
...