Unverified Commit 47ff7da9 authored by Connor's avatar Connor Committed by GitHub

Fix wrong live data size when encounter rewrite failure (#161)

* fix wrong live data size when encounter rewrite failure
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent 6683406d
...@@ -451,6 +451,9 @@ Status BlobGCJob::RewriteValidKeyToLSM() { ...@@ -451,6 +451,9 @@ Status BlobGCJob::RewriteValidKeyToLSM() {
WriteOptions wo; WriteOptions wo;
wo.low_pri = true; wo.low_pri = true;
wo.ignore_missing_column_families = true; wo.ignore_missing_column_families = true;
std::unordered_map<uint64_t, uint64_t>
dropped; // blob_file_number -> dropped_size
if (!gc_merge_rewrite_) { if (!gc_merge_rewrite_) {
for (auto& write_batch : rewrite_batches_) { for (auto& write_batch : rewrite_batches_) {
if (blob_gc_->GetColumnFamilyData()->IsDropped()) { if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
...@@ -473,6 +476,13 @@ Status BlobGCJob::RewriteValidKeyToLSM() { ...@@ -473,6 +476,13 @@ Status BlobGCJob::RewriteValidKeyToLSM() {
metrics_.gc_num_keys_overwritten++; metrics_.gc_num_keys_overwritten++;
metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size(); metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size();
// The key is overwritten in the meanwhile. Drop the blob record. // The key is overwritten in the meanwhile. Drop the blob record.
// Though record is dropped, the diff won't counted in discardable
// ratio,
// so we should update the live_data_size here.
BlobIndex blob_index;
Slice str(write_batch.second.value);
blob_index.DecodeFrom(&str);
dropped[blob_index.file_number] += blob_index.blob_handle.size;
} else { } else {
// We hit an error. // We hit an error.
break; break;
...@@ -508,6 +518,30 @@ Status BlobGCJob::RewriteValidKeyToLSM() { ...@@ -508,6 +518,30 @@ Status BlobGCJob::RewriteValidKeyToLSM() {
s = Status::OK(); s = Status::OK();
} }
mutex_->Lock();
auto cf_id = blob_gc_->column_family_handle()->GetID();
for (auto blob_file : dropped) {
auto blob_storage = blob_file_set_->GetBlobStorage(cf_id).lock();
if (blob_storage) {
auto file = blob_storage->FindFile(blob_file.first).lock();
if (!file) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Blob File %" PRIu64 " not found when GC.",
blob_file.first);
continue;
}
SubStats(stats_, cf_id, file->GetDiscardableRatioLevel(), 1);
file->UpdateLiveDataSize(-blob_file.second);
AddStats(stats_, cf_id, file->GetDiscardableRatioLevel(), 1);
blob_storage->ComputeGCScore();
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
"Column family id:%" PRIu32 " not Found when GC.", cf_id);
}
}
mutex_->Unlock();
if (s.ok()) { if (s.ok()) {
// Flush and sync WAL. // Flush and sync WAL.
s = db_impl->FlushWAL(true /*sync*/); s = db_impl->FlushWAL(true /*sync*/);
......
...@@ -213,6 +213,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, ...@@ -213,6 +213,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
mutex_.Unlock(); mutex_.Unlock();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob"); TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob");
s = blob_gc_job.Run(); s = blob_gc_job.Run();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::AfterRunGCJob");
mutex_.Lock(); mutex_.Lock();
} }
if (s.ok()) { if (s.ok()) {
......
...@@ -1401,6 +1401,66 @@ TEST_F(TitanDBTest, GCAfterReopen) { ...@@ -1401,6 +1401,66 @@ TEST_F(TitanDBTest, GCAfterReopen) {
ASSERT_GT(file2->file_number(), file_number1); ASSERT_GT(file2->file_number(), file_number1);
} }
TEST_F(TitanDBTest, CompactionDuringGC) {
options_.max_background_gc = 1;
options_.disable_background_gc = false;
options_.blob_file_discardable_ratio = 0.01;
Open();
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v')));
auto snap = db_->GetSnapshot();
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(100 * 1024, 'v')));
Flush();
db_->ReleaseSnapshot(snap);
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::BackgroundGC::AfterRunGCJob",
"TitanDBTest::CompactionDuringGC::WaitGCStart"},
{"TitanDBTest::CompactionDuringGC::ContinueGC",
"BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"},
{"BlobGCJob::Finish::AfterRewriteValidKeyToLSM",
"TitanDBTest::CompactionDuringGC::WaitGCFinish"}});
SyncPoint::GetInstance()->EnableProcessing();
CheckBlobFileCount(1);
CompactAll();
std::shared_ptr<BlobStorage> blob_storage = GetBlobStorage().lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(blob_files.size(), 1);
// trigger GC
CompactAll();
TEST_SYNC_POINT("TitanDBTest::CompactionDuringGC::WaitGCStart");
ASSERT_OK(db_->Delete(WriteOptions(), "k1"));
TEST_SYNC_POINT("TitanDBTest::CompactionDuringGC::ContinueGC");
TEST_SYNC_POINT("TitanDBTest::CompactionDuringGC::WaitGCFinish");
blob_storage->ExportBlobFiles(blob_files);
// rewriting index to LSM failed, but the output blob file is already
// generated
ASSERT_EQ(blob_files.size(), 2);
std::string value;
Status status = db_->Get(ReadOptions(), "k1", &value);
ASSERT_EQ(status, Status::NotFound());
SyncPoint::GetInstance()->DisableProcessing();
CheckBlobFileCount(1);
Flush();
CompactAll();
CompactAll();
db_impl_->TEST_StartGC(db_impl_->DefaultColumnFamily()->GetID());
CheckBlobFileCount(0);
}
TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) { TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) {
options_.max_background_gc = 1; options_.max_background_gc = 1;
options_.disable_background_gc = false; options_.disable_background_gc = false;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment