Unverified Commit 8ac5003c authored by Connor's avatar Connor Committed by GitHub

merge BackgroundGC with TEST_StartGC (#94)

* merge TEST_StartGC with BackgroundGC
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent b9915d9b
...@@ -561,12 +561,17 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, ...@@ -561,12 +561,17 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock(); auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock();
mutex_.Unlock(); mutex_.Unlock();
{ if (storage) {
StopWatch read_sw(env_, stats_.get(), BLOB_DB_BLOB_FILE_READ_MICROS); StopWatch read_sw(env_, stats_.get(), BLOB_DB_BLOB_FILE_READ_MICROS);
s = storage->Get(options, index, &record, &buffer); s = storage->Get(options, index, &record, &buffer);
RecordTick(stats_.get(), BLOB_DB_NUM_KEYS_READ); RecordTick(stats_.get(), BLOB_DB_NUM_KEYS_READ);
RecordTick(stats_.get(), BLOB_DB_BLOB_FILE_BYTES_READ, RecordTick(stats_.get(), BLOB_DB_BLOB_FILE_BYTES_READ,
index.blob_handle.size); index.blob_handle.size);
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
"Column family id:%" PRIu32 " not Found.", handle->GetID());
return Status::NotFound(
"Column family id: " + std::to_string(handle->GetID()) + " not Found.");
} }
if (s.IsCorruption()) { if (s.IsCorruption()) {
ROCKS_LOG_ERROR(db_options_.info_log, ROCKS_LOG_ERROR(db_options_.info_log,
...@@ -633,15 +638,20 @@ Iterator* TitanDBImpl::NewIteratorImpl( ...@@ -633,15 +638,20 @@ Iterator* TitanDBImpl::NewIteratorImpl(
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd(); auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
mutex_.Lock(); mutex_.Lock();
auto storage = blob_file_set_->GetBlobStorage(handle->GetID()); auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock();
mutex_.Unlock(); mutex_.Unlock();
if (!storage) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Column family id:%" PRIu32 " not Found.", handle->GetID());
return nullptr;
}
std::unique_ptr<ArenaWrappedDBIter> iter(db_impl_->NewIteratorImpl( std::unique_ptr<ArenaWrappedDBIter> iter(db_impl_->NewIteratorImpl(
options, cfd, options.snapshot->GetSequenceNumber(), options, cfd, options.snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/)); nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/));
return new TitanDBIterator(options, storage.lock().get(), snapshot, return new TitanDBIterator(options, storage.get(), snapshot, std::move(iter),
std::move(iter), env_, stats_.get(), env_, stats_.get(), db_options_.info_log.get());
db_options_.info_log.get());
} }
Status TitanDBImpl::NewIterators( Status TitanDBImpl::NewIterators(
......
...@@ -187,7 +187,7 @@ class TitanDBImpl : public TitanDB { ...@@ -187,7 +187,7 @@ class TitanDBImpl : public TitanDB {
static void BGWorkGC(void* db); static void BGWorkGC(void* db);
void BackgroundCallGC(); void BackgroundCallGC();
Status BackgroundGC(LogBuffer* log_buffer); Status BackgroundGC(LogBuffer* log_buffer, uint32_t column_family_id);
void PurgeObsoleteFiles(); void PurgeObsoleteFiles();
Status PurgeObsoleteFilesImpl(); Status PurgeObsoleteFilesImpl();
......
...@@ -29,9 +29,7 @@ void TitanDBImpl::BGWorkGC(void* db) { ...@@ -29,9 +29,7 @@ void TitanDBImpl::BGWorkGC(void* db) {
} }
void TitanDBImpl::BackgroundCallGC() { void TitanDBImpl::BackgroundCallGC() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning"); TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning");
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
assert(bg_gc_scheduled_ > 0); assert(bg_gc_scheduled_ > 0);
...@@ -41,13 +39,17 @@ void TitanDBImpl::BackgroundCallGC() { ...@@ -41,13 +39,17 @@ void TitanDBImpl::BackgroundCallGC() {
bg_gc_running_++; bg_gc_running_++;
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC"); TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC");
BackgroundGC(&log_buffer); if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
{ LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
mutex_.Unlock(); db_options_.info_log.get());
log_buffer.FlushBufferToLog(); BackgroundGC(&log_buffer, column_family_id);
LogFlush(db_options_.info_log.get()); {
mutex_.Lock(); mutex_.Unlock();
log_buffer.FlushBufferToLog();
LogFlush(db_options_.info_log.get());
mutex_.Lock();
}
} }
bg_gc_running_--; bg_gc_running_--;
...@@ -67,36 +69,35 @@ void TitanDBImpl::BackgroundCallGC() { ...@@ -67,36 +69,35 @@ void TitanDBImpl::BackgroundCallGC() {
} }
} }
Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
uint32_t column_family_id) {
mutex_.AssertHeld(); mutex_.AssertHeld();
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS); StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc; std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh; std::unique_ptr<ColumnFamilyHandle> cfh;
Status s; Status s;
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue(); std::shared_ptr<BlobStorage> blob_storage;
std::shared_ptr<BlobStorage> blob_storage; // Skip CFs that have been dropped.
// Skip CFs that have been dropped. if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) { blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock();
blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock(); } else {
} else { TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr);
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr); ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].",
ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].", cf_info_[column_family_id].name.c_str());
cf_info_[column_family_id].name.c_str()); }
} if (blob_storage != nullptr) {
if (blob_storage != nullptr) { const auto& cf_options = blob_storage->cf_options();
const auto& cf_options = blob_storage->cf_options(); std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::shared_ptr<BlobGCPicker> blob_gc_picker = std::make_shared<BasicBlobGCPicker>(db_options_, cf_options,
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options, stats_.get());
stats_.get()); blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get());
blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get());
if (blob_gc) {
if (blob_gc) { cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); assert(column_family_id == cfh->GetID());
assert(column_family_id == cfh->GetID()); blob_gc->SetColumnFamily(cfh.get());
blob_gc->SetColumnFamily(cfh.get());
}
} }
} }
...@@ -146,7 +147,6 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { ...@@ -146,7 +147,6 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
return s; return s;
} }
// TODO(yiwu): merge with BackgroundGC().
Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
// BackgroundCallGC // BackgroundCallGC
Status s; Status s;
...@@ -160,54 +160,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { ...@@ -160,54 +160,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
bg_gc_running_++; bg_gc_running_++;
bg_gc_scheduled_++; bg_gc_scheduled_++;
// BackgroudGC s = BackgroundGC(&log_buffer, column_family_id);
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;
if (blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
return Status::ColumnFamilyDropped(
"Column Family has been dropped before GC.");
}
auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get();
const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options, nullptr);
blob_gc = blob_gc_picker->PickBlobGC(bs);
if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get());
}
if (UNLIKELY(!blob_gc)) {
ROCKS_LOG_BUFFER(&log_buffer, "Titan GC nothing to do");
} else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(),
blob_file_set_.get(), &log_buffer, &shuting_down_,
stats_.get());
s = blob_gc_job.Prepare();
if (s.ok()) {
mutex_.Unlock();
s = blob_gc_job.Run();
mutex_.Lock();
}
if (s.ok()) {
s = blob_gc_job.Finish();
}
blob_gc->ReleaseGcFiles();
}
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Titan GC error: %s",
s.ToString().c_str());
}
{ {
mutex_.Unlock(); mutex_.Unlock();
......
...@@ -153,6 +153,10 @@ class TitanDBTest : public testing::Test { ...@@ -153,6 +153,10 @@ class TitanDBTest : public testing::Test {
return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID()); return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID());
} }
ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t cf_id) {
return db_impl_->db_impl_->GetColumnFamilyHandleUnlocked(cf_id).release();
}
void VerifyDB(const std::map<std::string, std::string>& data, void VerifyDB(const std::map<std::string, std::string>& data,
ReadOptions ropts = ReadOptions()) { ReadOptions ropts = ReadOptions()) {
db_impl_->PurgeObsoleteFiles(); db_impl_->PurgeObsoleteFiles();
...@@ -519,6 +523,35 @@ TEST_F(TitanDBTest, DropColumnFamily) { ...@@ -519,6 +523,35 @@ TEST_F(TitanDBTest, DropColumnFamily) {
Close(); Close();
} }
TEST_F(TitanDBTest, DestroyColumnFamilyHandle) {
Open();
const uint64_t kNumCF = 3;
for (uint64_t i = 1; i <= kNumCF; i++) {
AddCF(std::to_string(i));
}
const uint64_t kNumEntries = 10;
std::map<std::string, std::string> data;
for (uint64_t i = 1; i <= kNumEntries; i++) {
Put(i, &data);
}
VerifyDB(data);
Flush();
VerifyDB(data);
// Destroy column families handle, check whether GC skips the column families.
for (auto& handle : cf_handles_) {
auto cf_id = handle->GetID();
db_->DestroyColumnFamilyHandle(handle);
ASSERT_OK(db_impl_->TEST_StartGC(cf_id));
}
cf_handles_.clear();
VerifyDB(data);
Reopen();
VerifyDB(data);
Close();
}
TEST_F(TitanDBTest, DeleteFilesInRange) { TEST_F(TitanDBTest, DeleteFilesInRange) {
Open(); Open();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment