Unverified Commit 167e65b2 authored by Connor's avatar Connor Committed by GitHub

Fix wrong assert delta < 0 for cocurrent compaction while flush (#172)

* fix cocurrent compaction while flush
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent c4e3271c
...@@ -280,6 +280,15 @@ class BlobFileMeta { ...@@ -280,6 +280,15 @@ class BlobFileMeta {
// Not persistent field // Not persistent field
// Size of data with reference from SST files. // Size of data with reference from SST files.
//
// Because the new generated SST is added to superversion before
// `OnFlushCompleted()`/`OnCompactionCompleted()` is called, so if there is a
// later compaction trigger by the new generated SST, the later
// `OnCompactionCompleted()` maybe called before the previous events'
// `OnFlushCompleted()`/`OnCompactionCompleted()` is called.
// So when state_ == kPendingLSM, it uses this to record the delta as a
// positive number if any later compaction is trigger before previous
// `OnCompactionCompleted()` is called.
std::atomic<uint64_t> live_data_size_{0}; std::atomic<uint64_t> live_data_size_{0};
std::atomic<FileState> state_{FileState::kInit}; std::atomic<FileState> state_{FileState::kInit};
}; };
......
...@@ -91,7 +91,14 @@ void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) { ...@@ -91,7 +91,14 @@ void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
files_.emplace(std::make_pair(file->file_number(), file)); files_.emplace(std::make_pair(file->file_number(), file));
blob_ranges_.emplace(std::make_pair(Slice(file->smallest_key()), file)); blob_ranges_.emplace(std::make_pair(Slice(file->smallest_key()), file));
levels_file_count_[file->file_level()]++; levels_file_count_[file->file_level()]++;
AddStats(stats_, cf_id_, file->GetDiscardableRatioLevel(), 1); if (file->live_data_size() != 0) {
// When live data size == 0, it means the live size of blob file is unknown
// now.
// So don't count this metrics now, it will delayed to when setting the real
// live data size
// in `InitializeGC()` and `OnFlushCompleted()`/`OnCompactionCompleted()`.
AddStats(stats_, cf_id_, file->GetDiscardableRatioLevel(), 1);
}
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_FILE_SIZE, AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_FILE_SIZE,
file->file_size()); file->file_size());
AddStats(stats_, cf_id_, TitanInternalStats::NUM_LIVE_BLOB_FILE, 1); AddStats(stats_, cf_id_, TitanInternalStats::NUM_LIVE_BLOB_FILE, 1);
......
...@@ -1109,6 +1109,7 @@ bool TitanDBImpl::GetIntProperty(ColumnFamilyHandle* column_family, ...@@ -1109,6 +1109,7 @@ bool TitanDBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
} }
void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Begin1");
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Begin"); TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Begin");
if (!initialized()) { if (!initialized()) {
assert(false); assert(false);
...@@ -1164,10 +1165,18 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { ...@@ -1164,10 +1165,18 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
delta = 0; delta = 0;
} }
// the metrics is counted in the table builder when flushing, if (file->live_data_size() != 0) {
// so update it when updateing the live data size. // Because the flushed SST is added to superversion before
SubStats(stats_.get(), flush_job_info.cf_id, // `OnFlushCompleted()` is called, so if there is a concurrent
file->GetDiscardableRatioLevel(), 1); // compaction, `OnCompactionCompleted()` maybe called before
// `OnFlushCompleted()` is called.
// In this case, the state of the blob file generated by the flush is
// still `kPendingLSM`, while the blob file size delta is for the
// compaction event. So it is possible that delta is negative.
// It records the delta as a positive number if any, so here subtract it
// from the total live data size.
delta -= file->live_data_size();
}
file->set_live_data_size(static_cast<uint64_t>(delta)); file->set_live_data_size(static_cast<uint64_t>(delta));
AddStats(stats_.get(), flush_job_info.cf_id, AddStats(stats_.get(), flush_job_info.cf_id,
file->GetDiscardableRatioLevel(), 1); file->GetDiscardableRatioLevel(), 1);
...@@ -1250,17 +1259,44 @@ void TitanDBImpl::OnCompactionCompleted( ...@@ -1250,17 +1259,44 @@ void TitanDBImpl::OnCompactionCompleted(
continue; continue;
} }
if (file->file_state() == BlobFileMeta::FileState::kPendingLSM) { if (file->file_state() == BlobFileMeta::FileState::kPendingLSM) {
if (delta < 0) { if (delta <= 0) {
// Cannot happen.. // Because the new generated SST is added to superversion before
ROCKS_LOG_WARN(db_options_.info_log, // `OnFlushCompleted()`/`OnCompactionCompleted()` is called, so if
"OnCompactionCompleted[%d]: New blob file %" PRIu64 // there is a later compaction trigger by the new generated SST, the
" live size being negative", // later `OnCompactionCompleted()` maybe called before the previous
// events' `OnFlushCompleted()`/`OnCompactionCompleted()` is called.
// In this case, the state of the blob file generated by the
// flush/compaction is still `kPendingLSM`, while the blob file size
// delta is for the later compaction event. So it is possible that
// delta is negative.
// To make the live data size accurate, here records the delta as a
// positive number. And the delta will be subtracted with total live
// data size in the previous
// `OnFlushCompleted()`/`OnCompactionCompleted()`.
bool ok = file->UpdateLiveDataSize(static_cast<uint64_t>(-delta));
if (!ok) {
// Cannot happen
ROCKS_LOG_WARN(
db_options_.info_log,
"OnCompactionCompleted[%d]: pendingLSM Blob file %" PRIu64
" live size below zero.",
compaction_job_info.job_id, file_number);
assert(false);
}
ROCKS_LOG_INFO(db_options_.info_log,
"OnCompactionCompleted[%d]: Get blob file %" PRIu64
" live size being negative, maybe due to "
"OnFlushCompleted() is called yet",
compaction_job_info.job_id, file_number); compaction_job_info.job_id, file_number);
assert(false); continue;
delta = 0; }
if (file->live_data_size() != 0) {
// It records the delta as a positive number if any later compaction
// is trigger before previous `OnCompactionCompleted()` is called, so
// here subtract it
// from the total live data size.
delta -= file->live_data_size();
} }
SubStats(stats_.get(), compaction_job_info.cf_id,
file->GetDiscardableRatioLevel(), 1);
file->set_live_data_size(static_cast<uint64_t>(delta)); file->set_live_data_size(static_cast<uint64_t>(delta));
AddStats(stats_.get(), compaction_job_info.cf_id, AddStats(stats_.get(), compaction_job_info.cf_id,
file->GetDiscardableRatioLevel(), 1); file->GetDiscardableRatioLevel(), 1);
......
...@@ -83,8 +83,6 @@ Status TitanDBImpl::InitializeGC( ...@@ -83,8 +83,6 @@ Status TitanDBImpl::InitializeGC(
blob_storage->FindFile(file_size.first).lock(); blob_storage->FindFile(file_size.first).lock();
if (file != nullptr) { if (file != nullptr) {
assert(file->live_data_size() == 0); assert(file->live_data_size() == 0);
SubStats(stats_.get(), cf_handle->GetID(),
file->GetDiscardableRatioLevel(), 1);
file->set_live_data_size(static_cast<uint64_t>(file_size.second)); file->set_live_data_size(static_cast<uint64_t>(file_size.second));
AddStats(stats_.get(), cf_handle->GetID(), AddStats(stats_.get(), cf_handle->GetID(),
file->GetDiscardableRatioLevel(), 1); file->GetDiscardableRatioLevel(), 1);
......
...@@ -1401,6 +1401,46 @@ TEST_F(TitanDBTest, GCAfterReopen) { ...@@ -1401,6 +1401,46 @@ TEST_F(TitanDBTest, GCAfterReopen) {
ASSERT_GT(file2->file_number(), file_number1); ASSERT_GT(file2->file_number(), file_number1);
} }
TEST_F(TitanDBTest, CompactionDuringFlush) {
options_.max_background_gc = 1;
options_.disable_background_gc = true;
Open();
ASSERT_OK(db_->Put(WriteOptions(), "k1", "value"));
Flush();
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::OnFlushCompleted:Begin1",
"TitanDBTest::CompactionDuringFlush::WaitFlushStart"},
{"TitanDBTest::CompactionDuringFlush::ContinueFlush",
"TitanDBImpl::OnFlushCompleted:Begin"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->Put(WriteOptions(), "k1", std::string(10 * 1024, 'v')));
auto snap = db_->GetSnapshot();
ASSERT_OK(db_->Delete(WriteOptions(), "k1"));
port::Thread writer([&]() { Flush(); });
TEST_SYNC_POINT("TitanDBTest::CompactionDuringFlush::WaitFlushStart");
db_->ReleaseSnapshot(snap);
auto compact_opts = CompactRangeOptions();
compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
TEST_SYNC_POINT("TitanDBTest::CompactionDuringFlush::ContinueFlush");
writer.join();
CheckBlobFileCount(1);
SyncPoint::GetInstance()->DisableProcessing();
std::string value;
Status s = db_->Get(ReadOptions(), "k1", &value);
ASSERT_TRUE(s.IsNotFound());
// it shouldn't be any background error
ASSERT_OK(db_->Flush(FlushOptions()));
}
TEST_F(TitanDBTest, CompactionDuringGC) { TEST_F(TitanDBTest, CompactionDuringGC) {
options_.max_background_gc = 1; options_.max_background_gc = 1;
options_.disable_background_gc = false; options_.disable_background_gc = false;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment