Unverified Commit 81814ece authored by Connor's avatar Connor Committed by GitHub

Fix: GC may delete an already-deleted blob file (#168) (#169)

parent 80657c08
......@@ -234,17 +234,16 @@ class BlobFileMeta {
uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; }
uint64_t live_data_size() const { return live_data_size_; }
void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
uint32_t file_level() const { return file_level_; }
const std::string& smallest_key() const { return smallest_key_; }
const std::string& largest_key() const { return largest_key_; }
void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }
void FileStateTransit(const FileEvent& event);
bool UpdateLiveDataSize(int64_t delta) {
int64_t result = static_cast<int64_t>(live_data_size_) + delta;
if (result < 0) {
......@@ -267,6 +266,7 @@ class BlobFileMeta {
private:
// Persistent field
uint64_t file_number_{0};
uint64_t file_size_{0};
uint64_t file_entries_;
......
......@@ -3,9 +3,9 @@
namespace rocksdb {
namespace titandb {
BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files,
BlobGC::BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next)
: inputs_(std::move(blob_files)),
: inputs_(blob_files),
titan_cf_options_(std::move(_titan_cf_options)),
trigger_next_(need_trigger_next) {
MarkFilesBeingGC();
......
......@@ -12,7 +12,7 @@ namespace titandb {
// A BlobGC encapsulates information about a blob gc.
class BlobGC {
public:
BlobGC(std::vector<BlobFileMeta*>&& blob_files,
BlobGC(std::vector<std::shared_ptr<BlobFileMeta>>&& blob_files,
TitanCFOptions&& _titan_cf_options, bool need_trigger_next);
// No copying allowed
......@@ -21,7 +21,7 @@ class BlobGC {
~BlobGC();
const std::vector<BlobFileMeta*>& inputs() { return inputs_; }
const std::vector<std::shared_ptr<BlobFileMeta>>& inputs() { return inputs_; }
const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }
......@@ -40,7 +40,7 @@ class BlobGC {
bool trigger_next() { return trigger_next_; }
private:
std::vector<BlobFileMeta*> inputs_;
std::vector<std::shared_ptr<BlobFileMeta>> inputs_;
std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_;
ColumnFamilyHandle* cfh_{nullptr};
......
......@@ -364,8 +364,6 @@ Status BlobGCJob::Finish() {
mutex_->Lock();
}
// TODO(@DorianZheng) cal discardable size for new blob file
if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) {
s = DeleteInputBlobFiles();
}
......@@ -567,11 +565,14 @@ Status BlobGCJob::DeleteInputBlobFiles() {
metrics_.gc_num_files++;
RecordInHistogram(statistics(stats_), TITAN_GC_INPUT_FILE_SIZE,
file->file_size());
if (file->is_obsolete()) {
// There may be a concurrent DeleteBlobFilesInRanges or GC,
// so the input file is already deleted.
continue;
}
edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
}
s = blob_file_set_->LogAndApply(edit);
// TODO(@DorianZheng) Purge pending outputs
// base_db_->pending_outputs_.erase(handle->GetNumber());
return s;
}
......
......@@ -213,7 +213,7 @@ class BlobGCJobTest : public testing::TestWithParam<bool /*gc_merge_mode*/> {
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res));
auto rewrite_status = base_db_->Write(WriteOptions(), &wb);
std::vector<BlobFileMeta*> tmp;
std::vector<std::shared_ptr<BlobFileMeta>> tmp;
BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
......
......@@ -19,7 +19,7 @@ BasicBlobGCPicker::~BasicBlobGCPicker() {}
std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
BlobStorage* blob_storage) {
Status s;
std::vector<BlobFileMeta*> blob_files;
std::vector<std::shared_ptr<BlobFileMeta>> blob_files;
uint64_t batch_size = 0;
uint64_t estimate_output_size = 0;
......@@ -39,7 +39,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
continue;
}
if (!stop_picking) {
blob_files.push_back(blob_file.get());
blob_files.emplace_back(blob_file);
batch_size += blob_file->file_size();
estimate_output_size += blob_file->live_data_size();
if (batch_size >= cf_options_.max_gc_batch_size ||
......
......@@ -209,6 +209,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
s = blob_gc_job.Prepare();
if (s.ok()) {
mutex_.Unlock();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::BeforeRunGCJob");
s = blob_gc_job.Run();
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC::AfterRunGCJob");
mutex_.Lock();
......
......@@ -1501,6 +1501,54 @@ TEST_F(TitanDBTest, CompactionDuringGC) {
CheckBlobFileCount(0);
}
// Regression test: calls DeleteFilesInRange while a background GC job is
// held between its start and its rewrite step, then checks that the key is
// not found and that a subsequent Flush still succeeds (i.e. no background
// error was recorded).
TEST_F(TitanDBTest, DeleteFilesInRangeDuringGC) {
// Enable background GC with a single GC thread.
options_.max_background_gc = 1;
options_.disable_background_gc = false;
// NOTE(review): presumably a very low ratio so the single blob file below
// qualifies for GC as soon as any of its data is discardable -- confirm.
options_.blob_file_discardable_ratio = 0.01;
Open();
// Write two versions of "k1" (10 KiB, then 100 KiB) with a snapshot taken in
// between, flush them, and then release the snapshot.
// NOTE(review): the snapshot presumably keeps the older version alive through
// the flush so that a later compaction makes it discardable -- confirm.
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v')));
auto snap = db_->GetSnapshot();
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(100 * 1024, 'v')));
Flush();
db_->ReleaseSnapshot(snap);
// Each pair {A, B} orders sync point B after sync point A:
//   1. the test's WaitGCStart waits for the GC thread to reach
//      BeforeRunGCJob;
//   2. the GC job's BeforeRewriteValidKeyToLSM waits for the test's
//      ContinueGC;
//   3. the test's WaitGCFinish waits for the GC job's
//      AfterRewriteValidKeyToLSM.
// NOTE(review): the two BlobGCJob::Finish::* sync points are defined outside
// this test; their exact placement should be checked in BlobGCJob::Finish.
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::BackgroundGC::BeforeRunGCJob",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart"},
{"TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC",
"BlobGCJob::Finish::BeforeRewriteValidKeyToLSM"},
{"BlobGCJob::Finish::AfterRewriteValidKeyToLSM",
"TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish"}});
SyncPoint::GetInstance()->EnableProcessing();
// Exactly one blob file is expected after the flush.
CheckBlobFileCount(1);
CompactAll();
// Export the blob file set and verify it still holds a single file before GC
// is triggered.
std::shared_ptr<BlobStorage> blob_storage = GetBlobStorage().lock();
ASSERT_TRUE(blob_storage != nullptr);
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(blob_files.size(), 1);
// Trigger GC.
CompactAll();
// Block until the GC job is about to run, so the range deletion below
// happens while the GC job is in flight.
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCStart");
// Null begin/end bounds.
// NOTE(review): presumably this covers the whole key range, so the GC input
// file is deleted underneath the running job -- confirm against the helper.
DeleteFilesInRange(nullptr, nullptr);
// Let the GC job proceed, then wait until it has passed the rewrite step.
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::ContinueGC");
TEST_SYNC_POINT("TitanDBTest::DeleteFilesInRangeDuringGC::WaitGCFinish");
// "k1" must no longer be readable.
std::string value;
Status s = db_->Get(ReadOptions(), "k1", &value);
ASSERT_TRUE(s.IsNotFound());
// There shouldn't be any background error, so a flush must still succeed.
ASSERT_OK(db_->Flush(FlushOptions()));
SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace titandb
} // namespace rocksdb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment