Unverified Commit 0a3f87a2 authored by yiwu-arbug's avatar yiwu-arbug Committed by GitHub

Temp fix for data loss caused by concurrent flush (#96)

Summary:
Temporary fix for #93. Fixing it by waiting until no flush is ongoing before starting a GC job. The drawback is GC can starve if flush runs so frequent that there's no gap between flush jobs.

Test Plan:
See the new test. The test will fail with "Corruption: Missing blob file" error before the fix.
parent 8ac5003c
......@@ -1017,6 +1017,7 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted);
}
}
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Finished");
}
void TitanDBImpl::OnCompactionCompleted(
......
......@@ -107,6 +107,12 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
// Nothing to do
ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do");
} else {
{
mutex_.Unlock();
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh.get())->cfd();
db_impl_->WaitForFlushMemTable(cfd);
mutex_.Lock();
}
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(),
blob_file_set_.get(), log_buffer, &shuting_down_,
......
......@@ -2,7 +2,9 @@
#include <options/cf_options.h>
#include <unordered_map>
#include "db/db_impl/db_impl.h"
#include "file/filename.h"
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
......@@ -1105,6 +1107,76 @@ TEST_F(TitanDBTest, GCAfterDropCF) {
Close();
}
TEST_F(TitanDBTest, GCBeforeFlushCommit) {
std::atomic<bool> is_first_flush{true};
DBImpl* db_impl = nullptr;
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBTest::GCBeforeFlushCommit:PauseInstall",
"TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"},
{"TitanDBImpl::OnFlushCompleted:Finished",
"TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush"}});
SyncPoint::GetInstance()->SetCallBack("FlushJob::InstallResults", [&](void*) {
if (is_first_flush) {
is_first_flush = false;
} else {
// skip waiting for the second flush.
return;
}
auto* db_mutex = db_impl->mutex();
db_mutex->Unlock();
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:PauseInstall");
Env::Default()->SleepForMicroseconds(1000 * 1000); // 1s
db_mutex->Lock();
});
options_.create_if_missing = true;
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options_.max_background_jobs = 8;
options_.max_write_buffer_number = 4;
options_.min_blob_size = 0;
options_.merge_small_file_threshold = 1024 * 1024;
options_.disable_background_gc = true;
Open();
uint32_t cf_id = db_->DefaultColumnFamily()->GetID();
db_impl = reinterpret_cast<DBImpl*>(db_->GetRootDB());
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v"));
// t1 will wait for the second flush complete before install super version.
auto t1 = port::Thread([&]() {
// flush_opts.wait = true
ASSERT_OK(db_->Flush(FlushOptions()));
});
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitFlushPause");
// In the second flush we check if memtable has been committed, and signal
// the first flush to proceed.
ASSERT_OK(db_->Put(WriteOptions(), "bar", "v"));
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush");
// Set GC mark to force GC select the file.
auto blob_storage = GetBlobStorage().lock();
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(2, blob_files.size());
auto second_file = blob_files.rbegin()->second.lock();
second_file->set_gc_mark(true);
ASSERT_OK(db_impl_->TEST_StartGC(cf_id));
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
t1.join();
// Check value after memtable committed.
std::string value;
// Before fixing the issue, this call will return
// Corruption: Missing blob file error.
ASSERT_OK(db_->Get(ReadOptions(), "bar", &value));
ASSERT_EQ("v", value);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace titandb
} // namespace rocksdb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment