Unverified commit becc3ecb, authored by Connor, committed by GitHub

Fix GC may delete an already deleted blob file (#168) (#170)

* Fix GC may delete an already deleted blob file (#168)
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
parent b623e6be
...@@ -209,6 +209,7 @@ class BlobFileMeta { ...@@ -209,6 +209,7 @@ class BlobFileMeta {
private: private:
// Persistent field // Persistent field
uint64_t file_number_{0}; uint64_t file_number_{0};
uint64_t file_size_{0}; uint64_t file_size_{0};
uint64_t file_entries_; uint64_t file_entries_;
...@@ -220,9 +221,9 @@ class BlobFileMeta { ...@@ -220,9 +221,9 @@ class BlobFileMeta {
std::string largest_key_; std::string largest_key_;
// Not persistent field // Not persistent field
FileState state_{FileState::kInit}; std::atomic<FileState> state_{FileState::kInit};
uint64_t discardable_size_{0}; std::atomic<uint64_t> discardable_size_{0};
// gc_mark is set to true when this file is recovered from re-opening the DB // gc_mark is set to true when this file is recovered from re-opening the DB
// that means this file needs to be checked for GC // that means this file needs to be checked for GC
bool gc_mark_{false}; bool gc_mark_{false};
......
...@@ -3,9 +3,9 @@ ...@@ -3,9 +3,9 @@
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files, BlobGC::BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next) TitanCFOptions&& _titan_cf_options, bool need_trigger_next)
: inputs_(std::move(blob_files)), : inputs_(blob_files),
titan_cf_options_(std::move(_titan_cf_options)), titan_cf_options_(std::move(_titan_cf_options)),
trigger_next_(need_trigger_next) { trigger_next_(need_trigger_next) {
MarkFilesBeingGC(); MarkFilesBeingGC();
......
...@@ -12,7 +12,7 @@ namespace titandb { ...@@ -12,7 +12,7 @@ namespace titandb {
// A BlobGC encapsulates information about a blob gc. // A BlobGC encapsulates information about a blob gc.
class BlobGC { class BlobGC {
public: public:
BlobGC(std::vector<BlobFileMeta*>&& blob_files, BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next); TitanCFOptions&& _titan_cf_options, bool need_trigger_next);
// No copying allowed // No copying allowed
...@@ -21,13 +21,15 @@ class BlobGC { ...@@ -21,13 +21,15 @@ class BlobGC {
~BlobGC(); ~BlobGC();
const std::vector<BlobFileMeta*>& inputs() { return inputs_; } const std::vector<std::shared_ptr<BlobFileMeta>>& inputs() { return inputs_; }
void set_sampled_inputs(std::vector<BlobFileMeta*>&& files) { void set_sampled_inputs(std::vector<std::shared_ptr<BlobFileMeta>>&& files) {
sampled_inputs_ = std::move(files); sampled_inputs_ = std::move(files);
} }
const std::vector<BlobFileMeta*>& sampled_inputs() { return sampled_inputs_; } const std::vector<std::shared_ptr<BlobFileMeta>>& sampled_inputs() {
return sampled_inputs_;
}
const TitanCFOptions& titan_cf_options() { return titan_cf_options_; } const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }
...@@ -46,8 +48,8 @@ class BlobGC { ...@@ -46,8 +48,8 @@ class BlobGC {
bool trigger_next() { return trigger_next_; } bool trigger_next() { return trigger_next_; }
private: private:
std::vector<BlobFileMeta*> inputs_; std::vector<std::shared_ptr<BlobFileMeta>> inputs_;
std::vector<BlobFileMeta*> sampled_inputs_; std::vector<std::shared_ptr<BlobFileMeta>> sampled_inputs_;
std::vector<BlobFileMeta*> outputs_; std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_; TitanCFOptions titan_cf_options_;
ColumnFamilyHandle* cfh_{nullptr}; ColumnFamilyHandle* cfh_{nullptr};
......
...@@ -151,10 +151,10 @@ Status BlobGCJob::Run() { ...@@ -151,10 +151,10 @@ Status BlobGCJob::Run() {
Status BlobGCJob::SampleCandidateFiles() { Status BlobGCJob::SampleCandidateFiles() {
TitanStopWatch sw(env_, metrics_.gc_sampling_micros); TitanStopWatch sw(env_, metrics_.gc_sampling_micros);
std::vector<BlobFileMeta*> result; std::vector<std::shared_ptr<BlobFileMeta>> result;
for (const auto& file : blob_gc_->inputs()) { for (const auto& file : blob_gc_->inputs()) {
bool selected = false; bool selected = false;
Status s = DoSample(file, &selected); Status s = DoSample(file.get(), &selected);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
...@@ -441,6 +441,7 @@ Status BlobGCJob::Finish() { ...@@ -441,6 +441,7 @@ Status BlobGCJob::Finish() {
mutex_->Unlock(); mutex_->Unlock();
s = InstallOutputBlobFiles(); s = InstallOutputBlobFiles();
if (s.ok()) { if (s.ok()) {
TEST_SYNC_POINT("BlobGCJob::Finish::BeforeRewriteValidKeyToLSM");
s = RewriteValidKeyToLSM(); s = RewriteValidKeyToLSM();
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log, ROCKS_LOG_ERROR(db_options_.info_log,
...@@ -462,6 +463,7 @@ Status BlobGCJob::Finish() { ...@@ -462,6 +463,7 @@ Status BlobGCJob::Finish() {
if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) { if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = DeleteInputBlobFiles(); s = DeleteInputBlobFiles();
} }
TEST_SYNC_POINT("BlobGCJob::Finish::AfterRewriteValidKeyToLSM");
if (s.ok()) { if (s.ok()) {
UpdateInternalOpStats(); UpdateInternalOpStats();
...@@ -593,6 +595,11 @@ Status BlobGCJob::DeleteInputBlobFiles() { ...@@ -593,6 +595,11 @@ Status BlobGCJob::DeleteInputBlobFiles() {
file->file_number()); file->file_number());
metrics_.gc_num_files++; metrics_.gc_num_files++;
MeasureTime(stats_, TitanStats::GC_INPUT_FILE_SIZE, file->file_size()); MeasureTime(stats_, TitanStats::GC_INPUT_FILE_SIZE, file->file_size());
if (file->is_obsolete()) {
// There may be a concurrent DeleteBlobFilesInRanges or GC,
// so the input file is already deleted.
continue;
}
edit.DeleteBlobFile(file->file_number(), obsolete_sequence); edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
} }
s = blob_file_set_->LogAndApply(edit); s = blob_file_set_->LogAndApply(edit);
......
...@@ -203,7 +203,7 @@ class BlobGCJobTest : public testing::Test { ...@@ -203,7 +203,7 @@ class BlobGCJobTest : public testing::Test {
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res)); ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res));
auto rewrite_status = base_db_->Write(WriteOptions(), &wb); auto rewrite_status = base_db_->Write(WriteOptions(), &wb);
std::vector<BlobFileMeta*> tmp; std::vector<std::shared_ptr<BlobFileMeta>> tmp;
BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/); BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh); blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
......
...@@ -13,7 +13,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {} ...@@ -13,7 +13,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {}
std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC( std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
BlobStorage* blob_storage) { BlobStorage* blob_storage) {
Status s; Status s;
std::vector<BlobFileMeta*> blob_files; std::vector<std::shared_ptr<BlobFileMeta>> blob_files;
uint64_t batch_size = 0; uint64_t batch_size = 0;
uint64_t estimate_output_size = 0; uint64_t estimate_output_size = 0;
...@@ -38,7 +38,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC( ...@@ -38,7 +38,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
} }
if (!stop_picking) { if (!stop_picking) {
blob_files.push_back(blob_file.get()); blob_files.emplace_back(blob_file);
batch_size += blob_file->file_size(); batch_size += blob_file->file_size();
estimate_output_size += estimate_output_size +=
(blob_file->file_size() - blob_file->discardable_size()); (blob_file->file_size() - blob_file->discardable_size());
...@@ -61,9 +61,8 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC( ...@@ -61,9 +61,8 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
} }
} }
} }
ROCKS_LOG_DEBUG(db_options_.info_log, ROCKS_LOG_DEBUG(db_options_.info_log, "got batch size %" PRIu64
"got batch size %" PRIu64 ", estimate output %" PRIu64 ", estimate output %" PRIu64 " bytes",
" bytes",
batch_size, estimate_output_size); batch_size, estimate_output_size);
if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size) { if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size) {
return nullptr; return nullptr;
......
...@@ -114,6 +114,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, ...@@ -114,6 +114,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
s = blob_gc_job.Prepare(); s = blob_gc_job.Prepare();
if (s.ok()) { if (s.ok()) {
mutex_.Unlock(); mutex_.Unlock();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob");
s = blob_gc_job.Run(); s = blob_gc_job.Run();
mutex_.Lock(); mutex_.Lock();
} }
......
...@@ -219,13 +219,17 @@ class TitanDBTest : public testing::Test { ...@@ -219,13 +219,17 @@ class TitanDBTest : public testing::Test {
} }
} }
void CompactAll() { void CompactAll(ColumnFamilyHandle* cf_handle = nullptr) {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
auto opts = db_->GetOptions(); auto opts = db_->GetOptions();
auto compact_opts = CompactRangeOptions(); auto compact_opts = CompactRangeOptions();
compact_opts.change_level = true; compact_opts.change_level = true;
compact_opts.target_level = opts.num_levels - 1; compact_opts.target_level = opts.num_levels - 1;
compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kSkip; compact_opts.bottommost_level_compaction =
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr)); BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(compact_opts, cf_handle, nullptr, nullptr));
} }
void DeleteFilesInRange(const Slice* begin, const Slice* end) { void DeleteFilesInRange(const Slice* begin, const Slice* end) {
...@@ -1176,6 +1180,52 @@ TEST_F(TitanDBTest, GCBeforeFlushCommit) { ...@@ -1176,6 +1180,52 @@ TEST_F(TitanDBTest, GCBeforeFlushCommit) {
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
} }
// Regression test for a GC job racing with DeleteFilesInRange: the test holds
// the background GC job at a sync point, calls DeleteFilesInRange while GC is
// held, lets GC continue, and then checks that no background error is left
// behind.
TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) {
// Enable background GC with a single GC thread and a very low discardable
// ratio threshold.
options_.max_background_gc = 1;
options_.disable_background_gc = false;
options_.blob_file_discardable_ratio = 0.01;
Open();
// Write k1 twice with a snapshot taken in between, flush, then release the
// snapshot.
// NOTE(review): presumably the snapshot keeps the first value alive through
// the flush so the blob file ends up holding a stale entry that GC can
// discard -- confirm.
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v')));
auto snap = db_->GetSnapshot();
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(100 * 1024, 'v')));
Flush();
db_->ReleaseSnapshot(snap);
// Exactly one blob file is expected to exist at this point.
std::shared_ptr<BlobStorage> blob_storage = GetBlobStorage().lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(blob_files.size(), 1);
// Sync point ordering; for each pair {A, B}, B waits until A has been reached:
//   1. the test waits at WaitGCStart until the GC job is about to run;
//   2. GC waits before RewriteValidKeyToLSM until the test reaches ContinueGC;
//   3. the test waits at WaitGCFinish until GC passes
//      AfterRewriteValidKeyToLSM.
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::BackgroundGC::BeforeRunGCJob",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"},
{"TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC",
"BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"},
{"BlobGCJob::Finish::AfterRewriteValidKeyToLSM",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"}});
SyncPoint::GetInstance()->EnableProcessing();
CompactAll();
// trigger GC
CompactAll();
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart");
// Runs while the GC job is held at BeforeRewriteValidKeyToLSM.
// NOTE(review): null bounds presumably cover the whole key range -- confirm
// against the DeleteFilesInRange helper.
DeleteFilesInRange(nullptr, nullptr);
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC");
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish");
// k1 is expected to be gone after the range deletion.
std::string value;
Status s = db_->Get(ReadOptions(), "k1", &value);
ASSERT_TRUE(s.IsNotFound());
// there shouldn't be any background error
ASSERT_OK(db_->Flush(FlushOptions()));
SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace titandb } // namespace titandb
} // namespace rocksdb } // namespace rocksdb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment