Unverified commit 6683406d, authored by Connor, committed by GitHub

Fix: GC may delete an already deleted blob file (#168)

* Add a test case for DeleteFilesInRange during GC
Signed-off-by: Connor1996 <zbk602423539@gmail.com>

* Check whether an input file is obsolete before applying the edit
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
parent c480fea7
...@@ -234,17 +234,16 @@ class BlobFileMeta { ...@@ -234,17 +234,16 @@ class BlobFileMeta {
uint64_t file_number() const { return file_number_; } uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; } uint64_t file_size() const { return file_size_; }
uint64_t live_data_size() const { return live_data_size_; } uint64_t live_data_size() const { return live_data_size_; }
void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
uint32_t file_level() const { return file_level_; } uint32_t file_level() const { return file_level_; }
const std::string& smallest_key() const { return smallest_key_; } const std::string& smallest_key() const { return smallest_key_; }
const std::string& largest_key() const { return largest_key_; } const std::string& largest_key() const { return largest_key_; }
void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
FileState file_state() const { return state_; } FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; } bool is_obsolete() const { return state_ == FileState::kObsolete; }
void FileStateTransit(const FileEvent& event); void FileStateTransit(const FileEvent& event);
bool UpdateLiveDataSize(int64_t delta) { bool UpdateLiveDataSize(int64_t delta) {
int64_t result = static_cast<int64_t>(live_data_size_) + delta; int64_t result = static_cast<int64_t>(live_data_size_) + delta;
if (result < 0) { if (result < 0) {
...@@ -267,10 +266,9 @@ class BlobFileMeta { ...@@ -267,10 +266,9 @@ class BlobFileMeta {
private: private:
// Persistent field // Persistent field
uint64_t file_number_{0}; uint64_t file_number_{0};
uint64_t file_size_{0}; uint64_t file_size_{0};
// Size of data with reference from SST files.
uint64_t live_data_size_{0};
uint64_t file_entries_; uint64_t file_entries_;
// Target level of compaction/flush which generates this blob file // Target level of compaction/flush which generates this blob file
uint32_t file_level_; uint32_t file_level_;
...@@ -280,7 +278,10 @@ class BlobFileMeta { ...@@ -280,7 +278,10 @@ class BlobFileMeta {
std::string largest_key_; std::string largest_key_;
// Not persistent field // Not persistent field
FileState state_{FileState::kInit};
// Size of data with reference from SST files.
std::atomic<uint64_t> live_data_size_{0};
std::atomic<FileState> state_{FileState::kInit};
}; };
// Format of blob file header for version 1 (8 bytes): // Format of blob file header for version 1 (8 bytes):
......
...@@ -3,9 +3,9 @@ ...@@ -3,9 +3,9 @@
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files, BlobGC::BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next) TitanCFOptions&& _titan_cf_options, bool need_trigger_next)
: inputs_(std::move(blob_files)), : inputs_(blob_files),
titan_cf_options_(std::move(_titan_cf_options)), titan_cf_options_(std::move(_titan_cf_options)),
trigger_next_(need_trigger_next) { trigger_next_(need_trigger_next) {
MarkFilesBeingGC(); MarkFilesBeingGC();
......
...@@ -12,7 +12,7 @@ namespace titandb { ...@@ -12,7 +12,7 @@ namespace titandb {
// A BlobGC encapsulates information about a blob gc. // A BlobGC encapsulates information about a blob gc.
class BlobGC { class BlobGC {
public: public:
BlobGC(std::vector<BlobFileMeta*>&& blob_files, BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next); TitanCFOptions&& _titan_cf_options, bool need_trigger_next);
// No copying allowed // No copying allowed
...@@ -21,7 +21,7 @@ class BlobGC { ...@@ -21,7 +21,7 @@ class BlobGC {
~BlobGC(); ~BlobGC();
const std::vector<BlobFileMeta*>& inputs() { return inputs_; } const std::vector<std::shared_ptr<BlobFileMeta>>& inputs() { return inputs_; }
const TitanCFOptions& titan_cf_options() { return titan_cf_options_; } const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }
...@@ -40,7 +40,7 @@ class BlobGC { ...@@ -40,7 +40,7 @@ class BlobGC {
bool trigger_next() { return trigger_next_; } bool trigger_next() { return trigger_next_; }
private: private:
std::vector<BlobFileMeta*> inputs_; std::vector<std::shared_ptr<BlobFileMeta>> inputs_;
std::vector<BlobFileMeta*> outputs_; std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_; TitanCFOptions titan_cf_options_;
ColumnFamilyHandle* cfh_{nullptr}; ColumnFamilyHandle* cfh_{nullptr};
......
...@@ -347,6 +347,7 @@ Status BlobGCJob::Finish() { ...@@ -347,6 +347,7 @@ Status BlobGCJob::Finish() {
mutex_->Unlock(); mutex_->Unlock();
s = InstallOutputBlobFiles(); s = InstallOutputBlobFiles();
if (s.ok()) { if (s.ok()) {
TEST_SYNC_POINT("BlobGCJob::Finish::BeforeRewriteValidKeyToLSM");
s = RewriteValidKeyToLSM(); s = RewriteValidKeyToLSM();
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log, ROCKS_LOG_ERROR(db_options_.info_log,
...@@ -363,11 +364,10 @@ Status BlobGCJob::Finish() { ...@@ -363,11 +364,10 @@ Status BlobGCJob::Finish() {
mutex_->Lock(); mutex_->Lock();
} }
// TODO(@DorianZheng) cal discardable size for new blob file
if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) { if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = DeleteInputBlobFiles(); s = DeleteInputBlobFiles();
} }
TEST_SYNC_POINT("BlobGCJob::Finish::AfterRewriteValidKeyToLSM");
if (s.ok()) { if (s.ok()) {
UpdateInternalOpStats(); UpdateInternalOpStats();
...@@ -531,11 +531,14 @@ Status BlobGCJob::DeleteInputBlobFiles() { ...@@ -531,11 +531,14 @@ Status BlobGCJob::DeleteInputBlobFiles() {
metrics_.gc_num_files++; metrics_.gc_num_files++;
RecordInHistogram(statistics(stats_), TITAN_GC_INPUT_FILE_SIZE, RecordInHistogram(statistics(stats_), TITAN_GC_INPUT_FILE_SIZE,
file->file_size()); file->file_size());
if (file->is_obsolete()) {
// There may be a concurrent DeleteBlobFilesInRanges or GC,
// so the input file is already deleted.
continue;
}
edit.DeleteBlobFile(file->file_number(), obsolete_sequence); edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
} }
s = blob_file_set_->LogAndApply(edit); s = blob_file_set_->LogAndApply(edit);
// TODO(@DorianZheng) Purge pending outputs
// base_db_->pending_outputs_.erase(handle->GetNumber());
return s; return s;
} }
......
...@@ -213,7 +213,7 @@ class BlobGCJobTest : public testing::TestWithParam<bool /*gc_merge_mode*/> { ...@@ -213,7 +213,7 @@ class BlobGCJobTest : public testing::TestWithParam<bool /*gc_merge_mode*/> {
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res)); ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res));
auto rewrite_status = base_db_->Write(WriteOptions(), &wb); auto rewrite_status = base_db_->Write(WriteOptions(), &wb);
std::vector<BlobFileMeta*> tmp; std::vector<std::shared_ptr<BlobFileMeta>> tmp;
BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/); BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh); blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
......
...@@ -19,7 +19,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {} ...@@ -19,7 +19,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {}
std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC( std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
BlobStorage* blob_storage) { BlobStorage* blob_storage) {
Status s; Status s;
std::vector<BlobFileMeta*> blob_files; std::vector<std::shared_ptr<BlobFileMeta>> blob_files;
uint64_t batch_size = 0; uint64_t batch_size = 0;
uint64_t estimate_output_size = 0; uint64_t estimate_output_size = 0;
...@@ -39,7 +39,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC( ...@@ -39,7 +39,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
continue; continue;
} }
if (!stop_picking) { if (!stop_picking) {
blob_files.push_back(blob_file.get()); blob_files.emplace_back(blob_file);
batch_size += blob_file->file_size(); batch_size += blob_file->file_size();
estimate_output_size += blob_file->live_data_size(); estimate_output_size += blob_file->live_data_size();
if (batch_size >= cf_options_.max_gc_batch_size || if (batch_size >= cf_options_.max_gc_batch_size ||
......
...@@ -211,6 +211,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, ...@@ -211,6 +211,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
s = blob_gc_job.Prepare(); s = blob_gc_job.Prepare();
if (s.ok()) { if (s.ok()) {
mutex_.Unlock(); mutex_.Unlock();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob");
s = blob_gc_job.Run(); s = blob_gc_job.Run();
mutex_.Lock(); mutex_.Lock();
} }
......
...@@ -155,6 +155,17 @@ class TitanDBTest : public testing::Test { ...@@ -155,6 +155,17 @@ class TitanDBTest : public testing::Test {
return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID()); return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID());
} }
void CheckBlobFileCount(int count, ColumnFamilyHandle* cf_handle = nullptr) {
db_impl_->TEST_WaitForBackgroundGC();
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
std::shared_ptr<BlobStorage> blob_storage =
GetBlobStorage(cf_handle).lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(count, blob_files.size());
}
ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t cf_id) { ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t cf_id) {
return db_impl_->db_impl_->GetColumnFamilyHandleUnlocked(cf_id).release(); return db_impl_->db_impl_->GetColumnFamilyHandleUnlocked(cf_id).release();
} }
...@@ -220,13 +231,17 @@ class TitanDBTest : public testing::Test { ...@@ -220,13 +231,17 @@ class TitanDBTest : public testing::Test {
} }
} }
void CompactAll() { void CompactAll(ColumnFamilyHandle* cf_handle = nullptr) {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
auto opts = db_->GetOptions(); auto opts = db_->GetOptions();
auto compact_opts = CompactRangeOptions(); auto compact_opts = CompactRangeOptions();
compact_opts.change_level = true; compact_opts.change_level = true;
compact_opts.target_level = opts.num_levels - 1; compact_opts.target_level = opts.num_levels - 1;
compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kSkip; compact_opts.bottommost_level_compaction =
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr)); BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(compact_opts, cf_handle, nullptr, nullptr));
} }
void DeleteFilesInRange(const Slice* begin, const Slice* end) { void DeleteFilesInRange(const Slice* begin, const Slice* end) {
...@@ -1386,6 +1401,54 @@ TEST_F(TitanDBTest, GCAfterReopen) { ...@@ -1386,6 +1401,54 @@ TEST_F(TitanDBTest, GCAfterReopen) {
ASSERT_GT(file2->file_number(), file_number1); ASSERT_GT(file2->file_number(), file_number1);
} }
// Regression test for running DeleteFilesInRange while a background GC job is
// in flight. Sync points hold the GC job before it rewrites valid keys to the
// LSM, DeleteFilesInRange runs in that window, and the test then checks that
// the key is gone and that a subsequent flush succeeds.
TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) {
// Enable background GC (max_background_gc = 1) with a very low
// blob_file_discardable_ratio of 0.01.
options_.max_background_gc = 1;
options_.disable_background_gc = false;
options_.blob_file_discardable_ratio = 0.01;
Open();
// Write two versions of "k1" (10 KiB, then 100 KiB). A snapshot is held
// across the second write and the flush, and released afterwards.
// NOTE(review): presumably the snapshot keeps the older value alive through
// the flush so the blob file contains discardable data -- confirm.
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v')));
auto snap = db_->GetSnapshot();
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(100 * 1024, 'v')));
Flush();
db_->ReleaseSnapshot(snap);
// Each pair is {predecessor, successor}; per RocksDB's SyncPoint, the
// successor point waits until the predecessor point has been reached:
//   1. the test's WaitGCStart waits for the GC job to reach BeforeRunGCJob;
//   2. the GC job's BeforeRewriteValidKeyToLSM waits for the test's
//      ContinueGC;
//   3. the test's WaitGCFinish waits for the GC job's
//      AfterRewriteValidKeyToLSM.
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::BackgroundGC::BeforeRunGCJob",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"},
{"TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC",
"BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"},
{"BlobGCJob::Finish::AfterRewriteValidKeyToLSM",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"}});
SyncPoint::GetInstance()->EnableProcessing();
// Exactly one blob file is expected after the flush.
CheckBlobFileCount(1);
CompactAll();
// Still exactly one blob file after the first full compaction.
std::shared_ptr<BlobStorage> blob_storage = GetBlobStorage().lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(blob_files.size(), 1);
// Trigger GC with a second full compaction.
CompactAll();
// Wait until the GC job is about to run, then delete files while the job is
// held before RewriteValidKeyToLSM.
// NOTE(review): nullptr bounds presumably mean the whole key range -- confirm
// against the DeleteFilesInRange helper.
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart");
DeleteFilesInRange(nullptr, nullptr);
// Release the GC job and wait for it to get past the input-file deletion.
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC");
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish");
// The key must be gone after DeleteFilesInRange.
std::string value;
Status s = db_->Get(ReadOptions(), "k1", &value);
ASSERT_TRUE(s.IsNotFound());
// There shouldn't be any background error, so a flush must still succeed.
ASSERT_OK(db_->Flush(FlushOptions()));
SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace titandb } // namespace titandb
} // namespace rocksdb } // namespace rocksdb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment