Unverified Commit 8768067e authored by yiwu-arbug's avatar yiwu-arbug Committed by GitHub

Prevent CF being dropped while GC is running (#72)

Summary:
If `DropColumnFamilies` is called while GC is running, GC can fail because the CF is gone. The GC job will then set a background error, which halts Titan as a whole. To prevent this, we let `DropColumnFamilies` wait until there is no running GC before proceeding, and let the GC job wait for pending drop-CF requests before it starts running. Fixes #71.

Also fix `DropColumnFamilies` marking already-obsolete files as obsolete again, which causes an assertion failure.

Test Plan:
Ran titandb_stress; the above issues no longer reproduce.
parent 04850259
...@@ -249,10 +249,12 @@ Status BlobFileSet::DropColumnFamilies( ...@@ -249,10 +249,12 @@ Status BlobFileSet::DropColumnFamilies(
VersionEdit edit; VersionEdit edit;
edit.SetColumnFamilyID(it->first); edit.SetColumnFamilyID(it->first);
for (auto& file : it->second->files_) { for (auto& file : it->second->files_) {
ROCKS_LOG_INFO(db_options_.info_log, if (!file.second->is_obsolete()) {
"Titan add obsolete file [%" PRIu64 "]", ROCKS_LOG_INFO(db_options_.info_log,
file.second->file_number()); "Titan add obsolete file [%" PRIu64 "]",
edit.DeleteBlobFile(file.first, obsolete_sequence); file.second->file_number());
edit.DeleteBlobFile(file.first, obsolete_sequence);
}
} }
s = LogAndApply(edit); s = LogAndApply(edit);
if (!s.ok()) return s; if (!s.ok()) return s;
......
...@@ -76,6 +76,11 @@ class BlobFileSet { ...@@ -76,6 +76,11 @@ class BlobFileSet {
} }
} }
// Reports whether `cf_id` is currently recorded in obsolete_columns_.
// REQUIRES: mutex is held
bool IsColumnFamilyObsolete(uint32_t cf_id) {
  return obsolete_columns_.find(cf_id) != obsolete_columns_.end();
}
private: private:
friend class BlobFileSizeCollectorTest; friend class BlobFileSizeCollectorTest;
friend class VersionTest; friend class VersionTest;
......
...@@ -259,19 +259,10 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs, ...@@ -259,19 +259,10 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
s = blob_file_set_->Open(column_families); s = blob_file_set_->Open(column_families);
if (!s.ok()) return s; if (!s.ok()) return s;
static bool has_init_background_threads = false; // Initialize GC thread pool.
if (!has_init_background_threads) { if (!db_options_.disable_background_gc && db_options_.max_background_gc > 0) {
auto bottom_pri_threads_num = env_->IncBackgroundThreadsIfNeeded(db_options_.max_background_gc,
env_->GetBackgroundThreads(Env::Priority::BOTTOM); Env::Priority::BOTTOM);
if (!db_options_.disable_background_gc &&
db_options_.max_background_gc > 0) {
env_->IncBackgroundThreadsIfNeeded(
db_options_.max_background_gc + bottom_pri_threads_num,
Env::Priority::BOTTOM);
assert(env_->GetBackgroundThreads(Env::Priority::BOTTOM) ==
bottom_pri_threads_num + db_options_.max_background_gc);
}
has_init_background_threads = true;
} }
s = DB::Open(db_options_, dbname_, base_descs, handles, &db_); s = DB::Open(db_options_, dbname_, base_descs, handles, &db_);
...@@ -402,17 +393,35 @@ Status TitanDBImpl::CreateColumnFamilies( ...@@ -402,17 +393,35 @@ Status TitanDBImpl::CreateColumnFamilies(
Status TitanDBImpl::DropColumnFamilies( Status TitanDBImpl::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) { const std::vector<ColumnFamilyHandle*>& handles) {
TEST_SYNC_POINT("TitanDBImpl::DropColumnFamilies:Begin");
std::vector<uint32_t> column_families; std::vector<uint32_t> column_families;
std::string column_families_str; std::string column_families_str;
for (auto& handle : handles) { for (auto& handle : handles) {
column_families.emplace_back(handle->GetID()); column_families.emplace_back(handle->GetID());
column_families_str += "[" + handle->GetName() + "]"; column_families_str += "[" + handle->GetName() + "]";
} }
{
MutexLock l(&mutex_);
drop_cf_requests_++;
// Has to wait till no GC job is running before proceed, otherwise GC jobs
// can fail and set background error.
// TODO(yiwu): only wait for GC jobs of CFs being dropped.
while (bg_gc_running_ > 0) {
bg_cv_.Wait();
}
}
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::DropColumnFamilies:BeforeBaseDBDropCF",
nullptr);
Status s = db_impl_->DropColumnFamilies(handles); Status s = db_impl_->DropColumnFamilies(handles);
if (s.ok()) { if (s.ok()) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
SequenceNumber obsolete_sequence = db_impl_->GetLatestSequenceNumber(); SequenceNumber obsolete_sequence = db_impl_->GetLatestSequenceNumber();
s = blob_file_set_->DropColumnFamilies(column_families, obsolete_sequence); s = blob_file_set_->DropColumnFamilies(column_families, obsolete_sequence);
drop_cf_requests_--;
if (drop_cf_requests_ == 0) {
bg_cv_.SignalAll();
}
} }
if (s.ok()) { if (s.ok()) {
ROCKS_LOG_INFO(db_options_.info_log, "Dropped column families: %s", ROCKS_LOG_INFO(db_options_.info_log, "Dropped column families: %s",
......
...@@ -137,6 +137,11 @@ class TitanDBImpl : public TitanDB { ...@@ -137,6 +137,11 @@ class TitanDBImpl : public TitanDB {
Status TEST_StartGC(uint32_t column_family_id); Status TEST_StartGC(uint32_t column_family_id);
Status TEST_PurgeObsoleteFiles(); Status TEST_PurgeObsoleteFiles();
// Test-only accessor: returns the current value of bg_gc_running_, read
// while holding mutex_.
int TEST_bg_gc_running() {
  MutexLock guard(&mutex_);
  const int running = bg_gc_running_;
  return running;
}
private: private:
class FileManager; class FileManager;
friend class FileManager; friend class FileManager;
...@@ -200,6 +205,9 @@ class TitanDBImpl : public TitanDB { ...@@ -200,6 +205,9 @@ class TitanDBImpl : public TitanDB {
return oldest_snapshot; return oldest_snapshot;
} }
// REQUIRE: mutex_ held
bool HasPendingDropCFRequest(uint32_t cf_id);
// REQUIRE: mutex_ held // REQUIRE: mutex_ held
Status SetBGError(const Status& s); Status SetBGError(const Status& s);
...@@ -219,7 +227,9 @@ class TitanDBImpl : public TitanDB { ...@@ -219,7 +227,9 @@ class TitanDBImpl : public TitanDB {
// potential dead lock. // potential dead lock.
mutable port::Mutex mutex_; mutable port::Mutex mutex_;
// This condition variable is signaled on these conditions: // This condition variable is signaled on these conditions:
// * whenever bg_gc_scheduled_ goes down to 0 // * whenever bg_gc_scheduled_ goes down to 0.
// * whenever bg_gc_running_ goes down to 0.
// * whenever drop_cf_requests_ goes down to 0.
port::CondVar bg_cv_; port::CondVar bg_cv_;
std::string dbname_; std::string dbname_;
...@@ -253,10 +263,14 @@ class TitanDBImpl : public TitanDB { ...@@ -253,10 +263,14 @@ class TitanDBImpl : public TitanDB {
// pending_gc_ hold column families that already on gc_queue_. // pending_gc_ hold column families that already on gc_queue_.
std::deque<uint32_t> gc_queue_; std::deque<uint32_t> gc_queue_;
// Guarded by mutex_. // REQUIRE: mutex_ held.
int bg_gc_scheduled_{0}; int bg_gc_scheduled_ = 0;
// REQUIRE: mutex_ held // REQUIRE: mutex_ held.
int unscheduled_gc_{0}; int bg_gc_running_ = 0;
// REQUIRE: mutex_ held.
int unscheduled_gc_ = 0;
// REQUIRE: mutex_ held.
int drop_cf_requests_ = 0;
std::atomic_bool shuting_down_{false}; std::atomic_bool shuting_down_{false};
}; };
......
#include "db_impl.h" #include "db_impl.h"
#include "test_util/sync_point.h"
#include "blob_file_iterator.h" #include "blob_file_iterator.h"
#include "blob_gc_job.h" #include "blob_gc_job.h"
#include "blob_gc_picker.h" #include "blob_gc_picker.h"
...@@ -28,11 +30,17 @@ void TitanDBImpl::BGWorkGC(void* db) { ...@@ -28,11 +30,17 @@ void TitanDBImpl::BGWorkGC(void* db) {
void TitanDBImpl::BackgroundCallGC() { void TitanDBImpl::BackgroundCallGC() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning");
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
assert(bg_gc_scheduled_ > 0); assert(bg_gc_scheduled_ > 0);
while (drop_cf_requests_ > 0) {
bg_cv_.Wait();
}
bg_gc_running_++;
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC");
BackgroundGC(&log_buffer); BackgroundGC(&log_buffer);
{ {
...@@ -42,13 +50,14 @@ void TitanDBImpl::BackgroundCallGC() { ...@@ -42,13 +50,14 @@ void TitanDBImpl::BackgroundCallGC() {
mutex_.Lock(); mutex_.Lock();
} }
bg_gc_running_--;
bg_gc_scheduled_--; bg_gc_scheduled_--;
MaybeScheduleGC(); MaybeScheduleGC();
if (bg_gc_scheduled_ == 0) { if (bg_gc_scheduled_ == 0 || bg_gc_running_ == 0) {
// signal if // Signal DB destructor if bg_gc_scheduled_ drop to 0.
// * bg_gc_scheduled_ == 0 -- need to wakeup ~TitanDBImpl // Signal drop CF requests if bg_gc_running_ drop to 0.
// If none of this is true, there is no need to signal since nobody is // If none of this is true, there is no need to signal since nobody is
// waiting for it // waiting for it.
bg_cv_.SignalAll(); bg_cv_.SignalAll();
} }
// IMPORTANT: there should be no code after calling SignalAll. This call may // IMPORTANT: there should be no code after calling SignalAll. This call may
...@@ -67,13 +76,20 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { ...@@ -67,13 +76,20 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
Status s; Status s;
if (!gc_queue_.empty()) { if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue(); uint32_t column_family_id = PopFirstFromGCQueue();
auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get(); std::shared_ptr<BlobStorage> blob_storage;
// Skip CFs that have been dropped.
if (bs) { if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
const auto& cf_options = bs->cf_options(); blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock();
} else {
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr);
ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].",
cf_info_[column_family_id].name.c_str());
}
if (blob_storage != nullptr) {
const auto& cf_options = blob_storage->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker = std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options); std::make_shared<BasicBlobGCPicker>(db_options_, cf_options);
blob_gc = blob_gc_picker->PickBlobGC(bs); blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get());
if (blob_gc) { if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
...@@ -122,20 +138,23 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { ...@@ -122,20 +138,23 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
s.ToString().c_str()); s.ToString().c_str());
} }
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC:Finish");
return s; return s;
} }
// TODO(yiwu): merge with BackgroundGC().
Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
{
MutexLock l(&mutex_);
bg_gc_scheduled_++;
}
// BackgroundCallGC // BackgroundCallGC
Status s; Status s;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
assert(bg_gc_scheduled_ > 0); // Prevent CF being dropped while GC is running.
while (drop_cf_requests_ > 0) {
bg_cv_.Wait();
}
bg_gc_running_++;
bg_gc_scheduled_++;
// BackgroudGC // BackgroudGC
StopWatch gc_sw(env_, statistics(stats_.get()), BLOB_DB_GC_MICROS); StopWatch gc_sw(env_, statistics(stats_.get()), BLOB_DB_GC_MICROS);
...@@ -143,6 +162,10 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { ...@@ -143,6 +162,10 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
std::unique_ptr<BlobGC> blob_gc; std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh; std::unique_ptr<ColumnFamilyHandle> cfh;
if (blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
return Status::ColumnFamilyDropped(
"Column Family has been dropped before GC.");
}
auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get(); auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get();
const auto& cf_options = bs->cf_options(); const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker = std::shared_ptr<BlobGCPicker> blob_gc_picker =
...@@ -189,8 +212,9 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { ...@@ -189,8 +212,9 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
mutex_.Lock(); mutex_.Lock();
} }
bg_gc_running_--;
bg_gc_scheduled_--; bg_gc_scheduled_--;
if (bg_gc_scheduled_ == 0) { if (bg_gc_scheduled_ == 0 || bg_gc_running_ == 0) {
bg_cv_.SignalAll(); bg_cv_.SignalAll();
} }
} }
......
...@@ -45,6 +45,8 @@ class TitanDBTest : public testing::Test { ...@@ -45,6 +45,8 @@ class TitanDBTest : public testing::Test {
Close(); Close();
DeleteDir(env_, options_.dirname); DeleteDir(env_, options_.dirname);
DeleteDir(env_, dbname_); DeleteDir(env_, dbname_);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
} }
void Open() { void Open() {
...@@ -252,10 +254,17 @@ class TitanDBTest : public testing::Test { ...@@ -252,10 +254,17 @@ class TitanDBTest : public testing::Test {
} }
void CallGC() { void CallGC() {
db_impl_->bg_gc_scheduled_++; {
MutexLock l(&db_impl_->mutex_);
db_impl_->bg_gc_scheduled_++;
}
db_impl_->BackgroundCallGC(); db_impl_->BackgroundCallGC();
while (db_impl_->bg_gc_scheduled_) {
; MutexLock l(&db_impl_->mutex_);
while (db_impl_->bg_gc_scheduled_) {
db_impl_->bg_cv_.Wait();
}
}
} }
// Make db ignore first bg_error // Make db ignore first bg_error
...@@ -935,6 +944,89 @@ TEST_F(TitanDBTest, BackgroundErrorTrigger) { ...@@ -935,6 +944,89 @@ TEST_F(TitanDBTest, BackgroundErrorTrigger) {
Close(); Close();
} }
// Make sure DropColumnFamilies() waits while a GC job is running: by the time
// it is about to drop the CF in the base DB, no GC job may be running.
TEST_F(TitanDBTest, DropCFWhileGC) {
// min_blob_size = 0: presumably so every value is stored in a blob file
// (TODO confirm). Background GC is explicitly enabled for this test.
options_.min_blob_size = 0;
options_.blob_file_discardable_ratio = 0.1;
options_.disable_background_gc = false;
Open();
// Create CF.
std::vector<TitanCFDescriptor> descs = {{"new_cf", options_}};
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(db_->CreateColumnFamilies(descs, &handles));
ASSERT_EQ(1, handles.size());
auto* cfh = handles[0];
// Hold DropColumnFamilies() at its "Begin" sync point until a background GC
// job has reached "BeforeBackgroundGC", so the drop request is only issued
// once a GC job is under way.
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC",
"TitanDBImpl::DropColumnFamilies:Begin"}});
// Right before the base DB drops the CF, check that no GC job is running.
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::DropColumnFamilies:BeforeBaseDBDropCF",
[&](void*) { ASSERT_EQ(0, db_impl_->TEST_bg_gc_running()); });
SyncPoint::GetInstance()->EnableProcessing();
// Create two blob files, and trigger GC of the first one.
ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), cfh, "bar", "v1"));
ASSERT_OK(db_->Flush(FlushOptions(), cfh));
// Overwriting "foo" and flushing again produces the second file.
ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "v2"));
ASSERT_OK(db_->Flush(FlushOptions(), cfh));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr));
// Verify no GC job is running while we drop the CF (the actual check is the
// callback registered above).
ASSERT_OK(db_->DropColumnFamilies(handles));
// Cleanup.
ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
Close();
}
// Make sure a GC job will not run on a dropped CF: a GC job that starts after
// the CF has been dropped must skip it.
TEST_F(TitanDBTest, GCAfterDropCF) {
// min_blob_size = 0: presumably so every value is stored in a blob file
// (TODO confirm). Background GC is explicitly enabled for this test.
options_.min_blob_size = 0;
options_.blob_file_discardable_ratio = 0.1;
options_.disable_background_gc = false;
Open();
// Create CF.
std::vector<TitanCFDescriptor> descs = {{"new_cf", options_}};
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(db_->CreateColumnFamilies(descs, &handles));
ASSERT_EQ(1, handles.size());
auto* cfh = handles[0];
// Counts how many times the "CFDropped" sync point callback fires.
std::atomic<int> skip_dropped_cf_count{0};
// Two ordering constraints:
//  * the background GC job may not pass "BeforeGCRunning" until this test
//    has reached "AfterDropCF", i.e. until the CF has been dropped;
//  * this test may not pass "WaitGC" until BackgroundGC has reached "Finish".
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBTest::GCAfterDropCF:AfterDropCF",
"TitanDBImpl::BackgroundCallGC:BeforeGCRunning"},
{"TitanDBImpl::BackgroundGC:Finish",
"TitanDBTest::GCAfterDropCF:WaitGC"}});
// Record every time the GC job reports that it skipped a dropped CF.
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::BackgroundGC:CFDropped",
[&](void*) { skip_dropped_cf_count++; });
SyncPoint::GetInstance()->EnableProcessing();
// Create two blob files, and trigger GC of the first one.
ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), cfh, "bar", "v1"));
ASSERT_OK(db_->Flush(FlushOptions(), cfh));
// Overwriting "foo" and flushing again produces the second file.
ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "v2"));
ASSERT_OK(db_->Flush(FlushOptions(), cfh));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr));
// Drop CF before GC runs. Check if GC job skip the dropped CF.
ASSERT_OK(db_->DropColumnFamilies(handles));
// Release the GC job, then block until it has finished.
TEST_SYNC_POINT("TitanDBTest::GCAfterDropCF:AfterDropCF");
TEST_SYNC_POINT("TitanDBTest::GCAfterDropCF:WaitGC");
// Exactly one GC job is expected to have skipped the dropped CF.
ASSERT_EQ(1, skip_dropped_cf_count.load());
// Cleanup.
ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
Close();
}
} // namespace titandb } // namespace titandb
} // namespace rocksdb } // namespace rocksdb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment