Commit 8ee2e794 authored by Wu Jiayu's avatar Wu Jiayu Committed by yiwu-arbug

Turn titan into read-only on error during gc and version_change (#42)

- Set background error if encountered a critical error. Now it's set for two cases
    - BackgroundGC() error
    - LogAndApply() error after generate new blob file
- If background error is set, all following foreground write operations will be refused and return background error status
parent 53c1096f
......@@ -73,6 +73,9 @@ class TitanDBImpl::FileManager : public BlobFileManager {
{
MutexLock l(&db_->mutex_);
s = db_->vset_->LogAndApply(edit);
if (!s.ok()) {
db_->SetBGError(s);
}
for (const auto& file : files)
db_->pending_outputs_.erase(file.second->GetNumber());
}
......@@ -422,6 +425,7 @@ Status TitanDBImpl::CompactFiles(
const std::vector<std::string>& input_file_names, const int output_level,
const int output_path_id, std::vector<std::string>* const output_file_names,
CompactionJobInfo* compaction_job_info) {
if (HasBGError()) return GetBGError();
std::unique_ptr<CompactionJobInfo> compaction_job_info_ptr;
if (compaction_job_info == nullptr) {
compaction_job_info_ptr.reset(new CompactionJobInfo());
......@@ -437,6 +441,47 @@ Status TitanDBImpl::CompactFiles(
return s;
}
Status TitanDBImpl::Put(const rocksdb::WriteOptions& options,
rocksdb::ColumnFamilyHandle* column_family,
const rocksdb::Slice& key,
const rocksdb::Slice& value) {
return HasBGError() ? GetBGError()
: db_->Put(options, column_family, key, value);
}
Status TitanDBImpl::Write(const rocksdb::WriteOptions& options,
rocksdb::WriteBatch* updates) {
return HasBGError() ? GetBGError() : db_->Write(options, updates);
}
Status TitanDBImpl::Delete(const rocksdb::WriteOptions& options,
rocksdb::ColumnFamilyHandle* column_family,
const rocksdb::Slice& key) {
return HasBGError() ? GetBGError() : db_->Delete(options, column_family, key);
}
Status TitanDBImpl::IngestExternalFile(
rocksdb::ColumnFamilyHandle* column_family,
const std::vector<std::string>& external_files,
const rocksdb::IngestExternalFileOptions& options) {
return HasBGError()
? GetBGError()
: db_->IngestExternalFile(column_family, external_files, options);
}
Status TitanDBImpl::CompactRange(const rocksdb::CompactRangeOptions& options,
rocksdb::ColumnFamilyHandle* column_family,
const rocksdb::Slice* begin,
const rocksdb::Slice* end) {
return HasBGError() ? GetBGError()
: db_->CompactRange(options, column_family, begin, end);
}
Status TitanDBImpl::Flush(const rocksdb::FlushOptions& options,
rocksdb::ColumnFamilyHandle* column_family) {
return HasBGError() ? GetBGError() : db_->Flush(options, column_family);
}
Status TitanDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle* handle,
const Slice& key, PinnableSlice* value) {
if (options.snapshot) {
......@@ -867,5 +912,24 @@ void TitanDBImpl::OnCompactionCompleted(
}
}
Status TitanDBImpl::SetBGError(const Status& s) {
if (s.ok()) return s;
mutex_.AssertHeld();
Status bg_err = s;
if (!db_options_.listeners.empty()) {
// TODO(@jiayu) : check if mutex_ is freeable for future use case
mutex_.Unlock();
for (auto& listener : db_options_.listeners) {
listener->OnBackgroundError(BackgroundErrorReason::kCompaction, &bg_err);
}
mutex_.Lock();
}
if (!bg_err.ok()) {
bg_error_ = bg_err;
has_bg_error_.store(true);
}
return bg_err;
}
} // namespace titandb
} // namespace rocksdb
......@@ -43,6 +43,31 @@ class TitanDBImpl : public TitanDB {
Status CloseImpl();
using TitanDB::Put;
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override;
using TitanDB::Write;
Status Write(const WriteOptions& options, WriteBatch* updates) override;
using TitanDB::Delete;
Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key) override;
using TitanDB::IngestExternalFile;
Status IngestExternalFile(ColumnFamilyHandle* column_family,
const std::vector<std::string>& external_files,
const IngestExternalFileOptions& options) override;
using TitanDB::CompactRange;
Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family, const Slice* begin,
const Slice* end) override;
using TitanDB::Flush;
Status Flush(const FlushOptions& fopts,
ColumnFamilyHandle* column_family) override;
using TitanDB::Get;
Status Get(const ReadOptions& options, ColumnFamilyHandle* handle,
const Slice& key, PinnableSlice* value) override;
......@@ -155,6 +180,16 @@ class TitanDBImpl : public TitanDB {
return oldest_snapshot;
}
// REQUIRE: mutex_ held
Status SetBGError(const Status& s);
Status GetBGError() {
MutexLock l(&mutex_);
return bg_error_;
}
bool HasBGError() { return has_bg_error_.load(); }
FileLock* lock_{nullptr};
// The lock sequence must be Titan.mutex_.Lock() -> Base DB mutex_.Lock()
// while the unlock sequence must be Base DB mutex.Unlock() ->
......@@ -171,6 +206,9 @@ class TitanDBImpl : public TitanDB {
EnvOptions env_options_;
DBImpl* db_impl_;
TitanDBOptions db_options_;
// Turn DB into read-only if background error happened
Status bg_error_;
std::atomic_bool has_bg_error_{false};
// TitanStats is turned on only if statistics field of DBOptions
// is not null.
......
......@@ -65,7 +65,6 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;
Status s;
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
......@@ -108,6 +107,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
if (s.ok()) {
// Done
} else {
SetBGError(s);
ROCKS_LOG_WARN(db_options_.info_log, "Titan GC error: %s",
s.ToString().c_str());
}
......
......@@ -225,6 +225,30 @@ class TitanDBTest : public testing::Test {
DeleteDir(env_, dbname_);
}
void SetBGError(const Status& s) {
MutexLock l(&db_impl_->mutex_);
db_impl_->SetBGError(s);
}
void CallGC() {
db_impl_->bg_gc_scheduled_++;
db_impl_->BackgroundCallGC();
while (db_impl_->bg_gc_scheduled_)
;
}
// Make db ignore first bg_error
class BGErrorListener : public EventListener {
public:
void OnBackgroundError(BackgroundErrorReason reason,
Status* error) override {
if (++cnt == 1) *error = Status();
}
private:
int cnt{0};
};
Env* env_{Env::Default()};
std::string dbname_;
TitanOptions options_;
......@@ -748,6 +772,49 @@ TEST_F(TitanDBTest, FallbackModeEncounterMissingBlobFile) {
VerifyDB({{"bar", "v1"}});
}
TEST_F(TitanDBTest, BackgroundErrorHandling) {
options_.listeners.emplace_back(std::make_shared<BGErrorListener>());
Open();
std::string key = "key", val = "val";
SetBGError(Status::IOError(""));
// BG error is restored by listener for first time
ASSERT_OK(db_->Put(WriteOptions(), key, val));
SetBGError(Status::IOError(""));
ASSERT_OK(db_->Get(ReadOptions(), key, &val));
ASSERT_EQ(val, "val");
ASSERT_TRUE(db_->Put(WriteOptions(), key, val).IsIOError());
ASSERT_TRUE(db_->Flush(FlushOptions()).IsIOError());
ASSERT_TRUE(db_->Delete(WriteOptions(), key).IsIOError());
ASSERT_TRUE(
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr).IsIOError());
ASSERT_TRUE(
db_->CompactFiles(CompactionOptions(), std::vector<std::string>(), 1)
.IsIOError());
Close();
}
TEST_F(TitanDBTest, BackgroundErrorTrigger) {
std::unique_ptr<TitanFaultInjectionTestEnv> mock_env(
new TitanFaultInjectionTestEnv(env_));
options_.env = mock_env.get();
Open();
std::map<std::string, std::string> data;
const int kNumEntries = 100;
for (uint64_t i = 1; i <= kNumEntries; i++) {
Put(i, &data);
}
Flush();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
SyncPoint::GetInstance()->SetCallBack("VersionSet::LogAndApply", [&](void*) {
mock_env->SetFilesystemActive(false, Status::IOError("Injected error"));
});
SyncPoint::GetInstance()->EnableProcessing();
CallGC();
mock_env->SetFilesystemActive(true);
// Still failed for bg error
ASSERT_TRUE(db_impl_->Put(WriteOptions(), "key", "val").IsIOError());
Close();
}
} // namespace titandb
} // namespace rocksdb
......
......@@ -204,6 +204,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
}
Status VersionSet::LogAndApply(VersionEdit& edit) {
TEST_SYNC_POINT("VersionSet::LogAndApply");
// TODO(@huachao): write manifest file unlocked
std::string record;
edit.SetNextFileNumber(next_file_number_.load());
......@@ -213,11 +214,12 @@ Status VersionSet::LogAndApply(VersionEdit& edit) {
ImmutableDBOptions ioptions(db_options_);
s = SyncManifest(env_, &ioptions, manifest_->file());
}
if (!s.ok()) return s;
EditCollector collector;
collector.AddEdit(edit);
return collector.Apply(*this);
if (s.ok()) {
EditCollector collector;
collector.AddEdit(edit);
s = collector.Apply(*this);
}
return s;
}
void VersionSet::AddColumnFamilies(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment