Unverified Commit 0fc61eec authored by yiwu-arbug's avatar yiwu-arbug Committed by GitHub

TitanDBImpl::Open only open base DB once (#114)

Summary:
Refactor `TitanDBImpl::Open` to open base DB only once. Previously we open the DB twice - the first time only to obtain `cf_id` for each CF, and then initialize internals before open base DB for real. The patch change to open base DB only once, initialize internals after that. Before `TitanDBImpl` is initialized, `BaseDbListener` will be no-op and `TableBuilder` will not output blob files.

Test Plan:
Added new tests
parent e5731f44
...@@ -158,9 +158,8 @@ struct TitanCFOptions : public ColumnFamilyOptions { ...@@ -158,9 +158,8 @@ struct TitanCFOptions : public ColumnFamilyOptions {
TitanCFOptions() = default; TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options) explicit TitanCFOptions(const ColumnFamilyOptions& options)
: ColumnFamilyOptions(options) {} : ColumnFamilyOptions(options) {}
explicit TitanCFOptions(const ColumnFamilyOptions&, TitanCFOptions(const ColumnFamilyOptions&, const ImmutableTitanCFOptions&,
const ImmutableTitanCFOptions&, const MutableTitanCFOptions&);
const MutableTitanCFOptions&);
TitanCFOptions& operator=(const ColumnFamilyOptions& options) { TitanCFOptions& operator=(const ColumnFamilyOptions& options) {
*dynamic_cast<ColumnFamilyOptions*>(this) = options; *dynamic_cast<ColumnFamilyOptions*>(this) = options;
......
#include "base_db_listener.h" #include "base_db_listener.h"
#include "db_impl.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
BaseDbListener::BaseDbListener(TitanDBImpl* db) : db_impl_(db) {} BaseDbListener::BaseDbListener(TitanDBImpl* db) : db_impl_(db) {
assert(db_impl_ != nullptr);
}
BaseDbListener::~BaseDbListener() {} BaseDbListener::~BaseDbListener() {}
void BaseDbListener::OnFlushCompleted(DB* /*db*/, void BaseDbListener::OnFlushCompleted(DB* /*db*/,
const FlushJobInfo& flush_job_info) { const FlushJobInfo& flush_job_info) {
db_impl_->OnFlushCompleted(flush_job_info); if (db_impl_->initialized()) {
db_impl_->OnFlushCompleted(flush_job_info);
}
} }
void BaseDbListener::OnCompactionCompleted( void BaseDbListener::OnCompactionCompleted(
DB* /* db */, const CompactionJobInfo& compaction_job_info) { DB* /* db */, const CompactionJobInfo& compaction_job_info) {
db_impl_->OnCompactionCompleted(compaction_job_info); if (db_impl_->initialized()) {
db_impl_->OnCompactionCompleted(compaction_job_info);
}
} }
} // namespace titandb } // namespace titandb
......
#pragma once #pragma once
#include "db_impl.h"
#include "rocksdb/listener.h" #include "rocksdb/listener.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
class TitanDBImpl;
class BaseDbListener final : public EventListener { class BaseDbListener final : public EventListener {
public: public:
BaseDbListener(TitanDBImpl* db); BaseDbListener(TitanDBImpl* db);
...@@ -22,5 +22,4 @@ class BaseDbListener final : public EventListener { ...@@ -22,5 +22,4 @@ class BaseDbListener final : public EventListener {
}; };
} // namespace titandb } // namespace titandb
} // namespace rocksdb } // namespace rocksdb
...@@ -29,7 +29,6 @@ Status TitanDB::Open(const TitanDBOptions& db_options, ...@@ -29,7 +29,6 @@ Status TitanDB::Open(const TitanDBOptions& db_options,
auto s = impl->Open(descs, handles); auto s = impl->Open(descs, handles);
if (s.ok()) { if (s.ok()) {
*db = impl; *db = impl;
impl->StartBackgroundTasks();
} else { } else {
*db = nullptr; *db = nullptr;
delete impl; delete impl;
......
...@@ -175,114 +175,144 @@ Status TitanDBImpl::ValidateOptions( ...@@ -175,114 +175,144 @@ Status TitanDBImpl::ValidateOptions(
Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs, Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles) { std::vector<ColumnFamilyHandle*>* handles) {
if (handles == nullptr) {
return Status::InvalidArgument("handles must be non-null.");
}
Status s = OpenImpl(descs, handles);
// Cleanup after failure.
if (!s.ok()) {
if (handles->size() > 0) {
assert(db_ != nullptr);
for (ColumnFamilyHandle* cfh : *handles) {
Status destroy_handle_status = db_->DestroyColumnFamilyHandle(cfh);
if (!destroy_handle_status.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to destroy CF handle after open failure: %s",
destroy_handle_status.ToString().c_str());
}
}
handles->clear();
}
if (db_ != nullptr) {
Status close_status = db_->Close();
if (!close_status.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to close base DB after open failure: %s",
close_status.ToString().c_str());
}
db_ = nullptr;
db_impl_ = nullptr;
}
if (lock_) {
env_->UnlockFile(lock_);
lock_ = nullptr;
}
}
return s;
}
Status TitanDBImpl::OpenImpl(const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles) {
Status s = ValidateOptions(db_options_, descs); Status s = ValidateOptions(db_options_, descs);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
// Sets up directories for base DB and Titan. // Sets up directories for base DB and Titan.
s = env_->CreateDirIfMissing(dbname_); s = env_->CreateDirIfMissing(dbname_);
if (!s.ok()) return s; if (!s.ok()) {
return s;
}
if (!db_options_.info_log) { if (!db_options_.info_log) {
s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log); s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
if (!s.ok()) return s; if (!s.ok()) {
return s;
}
} }
s = env_->CreateDirIfMissing(dirname_); s = env_->CreateDirIfMissing(dirname_);
if (!s.ok()) return s; if (!s.ok()) {
return s;
}
s = env_->LockFile(LockFileName(dirname_), &lock_); s = env_->LockFile(LockFileName(dirname_), &lock_);
if (!s.ok()) return s; if (!s.ok()) {
return s;
// Descriptors for initial DB open to get CF ids.
std::vector<ColumnFamilyDescriptor> init_descs;
// Descriptors for actually open DB.
std::vector<ColumnFamilyDescriptor> base_descs;
for (auto& desc : descs) {
init_descs.emplace_back(desc.name, desc.options);
base_descs.emplace_back(desc.name, desc.options);
} }
std::map<uint32_t, TitanCFOptions> column_families;
// Opens the base DB first to collect the column families information
//
// Disable compaction at this point because we haven't add table properties
// collector. A compaction can generate a SST file without blob size table
// property. A later compaction after Titan DB open can cause crash because
// OnCompactionCompleted use table property to discover blob files generated
// by the compaction, and get confused by missing property.
//
// We also avoid flush here because we haven't replaced the table factory
// yet, but rocksdb may still flush if memtable is full. This is fine though,
// since values in memtable are raw values.
for (auto& desc : init_descs) {
desc.options.disable_auto_compactions = true;
}
db_options_.avoid_flush_during_recovery = true;
// Add EventListener to collect statistics for GC
db_options_.listeners.emplace_back(std::make_shared<BaseDbListener>(this));
// Note that info log is initialized after `CreateLoggerFromOptions`, // Note that info log is initialized after `CreateLoggerFromOptions`,
// so new `BlobFileSet` here but not in constructor is to get a proper info // so new `BlobFileSet` here but not in constructor is to get a proper info
// log. // log.
blob_file_set_.reset(new BlobFileSet(db_options_, stats_.get())); blob_file_set_.reset(new BlobFileSet(db_options_, stats_.get()));
// Setup options.
s = DB::Open(db_options_, dbname_, init_descs, handles, &db_); db_options_.listeners.emplace_back(std::make_shared<BaseDbListener>(this));
if (s.ok()) { // Descriptors for actually open DB.
for (size_t i = 0; i < descs.size(); i++) { std::vector<ColumnFamilyDescriptor> base_descs;
auto handle = (*handles)[i]; std::vector<std::shared_ptr<TitanTableFactory>> titan_table_factories;
uint32_t cf_id = handle->GetID(); for (auto& desc : descs) {
std::string cf_name = handle->GetName(); base_descs.emplace_back(desc.name, desc.options);
column_families.emplace(cf_id, descs[i].options); ColumnFamilyOptions& cf_opts = base_descs.back().options;
db_->DestroyColumnFamilyHandle(handle); // Disable compactions before everything is initialized.
// Replaces the provided table factory with TitanTableFactory. cf_opts.disable_auto_compactions = true;
// While we need to preserve original table_factory for GetOptions. cf_opts.table_properties_collector_factories.emplace_back(
auto& base_table_factory = base_descs[i].options.table_factory; std::make_shared<BlobFileSizeCollectorFactory>());
assert(base_table_factory != nullptr); titan_table_factories.push_back(std::make_shared<TitanTableFactory>(
auto titan_table_factory = std::make_shared<TitanTableFactory>( db_options_, desc.options, this, blob_manager_, &mutex_,
db_options_, descs[i].options, blob_manager_, &mutex_, blob_file_set_.get(), stats_.get()));
blob_file_set_.get(), stats_.get()); cf_opts.table_factory = titan_table_factories.back();
cf_info_.emplace(cf_id,
TitanColumnFamilyInfo(
{cf_name, ImmutableTitanCFOptions(descs[i].options),
MutableTitanCFOptions(descs[i].options),
base_table_factory, titan_table_factory}));
base_descs[i].options.table_factory = titan_table_factory;
// Add TableProperties for collecting statistics GC
base_descs[i].options.table_properties_collector_factories.emplace_back(
std::make_shared<BlobFileSizeCollectorFactory>());
}
handles->clear();
s = db_->Close();
delete db_;
db_ = nullptr;
}
if (!s.ok()) return s;
if (stats_.get()) {
stats_->Initialize(column_families);
} }
s = blob_file_set_->Open(column_families);
if (!s.ok()) return s;
// Initialize GC thread pool. // Initialize GC thread pool.
if (!db_options_.disable_background_gc && db_options_.max_background_gc > 0) { if (!db_options_.disable_background_gc && db_options_.max_background_gc > 0) {
env_->IncBackgroundThreadsIfNeeded(db_options_.max_background_gc, env_->IncBackgroundThreadsIfNeeded(db_options_.max_background_gc,
Env::Priority::BOTTOM); Env::Priority::BOTTOM);
} }
// Open base DB.
s = DB::Open(db_options_, dbname_, base_descs, handles, &db_); s = DB::Open(db_options_, dbname_, base_descs, handles, &db_);
if (s.ok()) { if (!s.ok()) {
db_impl_ = reinterpret_cast<DBImpl*>(db_->GetRootDB()); db_ = nullptr;
ROCKS_LOG_INFO(db_options_.info_log, "Titan DB open."); handles->clear();
ROCKS_LOG_HEADER(db_options_.info_log, "Titan git sha: %s", return s;
titan_build_git_sha); }
db_options_.Dump(db_options_.info_log.get()); db_impl_ = reinterpret_cast<DBImpl*>(db_->GetRootDB());
for (auto& desc : descs) { assert(db_ != nullptr);
ROCKS_LOG_HEADER(db_options_.info_log, assert(handles->size() == descs.size());
"Column family [%s], options:", desc.name.c_str()); std::map<uint32_t, TitanCFOptions> column_families;
desc.options.Dump(db_options_.info_log.get()); std::vector<ColumnFamilyHandle*> cf_with_compaction;
for (size_t i = 0; i < descs.size(); i++) {
cf_info_.emplace((*handles)[i]->GetID(),
TitanColumnFamilyInfo(
{(*handles)[i]->GetName(),
ImmutableTitanCFOptions(descs[i].options),
MutableTitanCFOptions(descs[i].options),
descs[i].options.table_factory /*base_table_factory*/,
titan_table_factories[i]}));
column_families[(*handles)[i]->GetID()] = descs[i].options;
if (!descs[i].options.disable_auto_compactions) {
cf_with_compaction.push_back((*handles)[i]);
} }
} else { }
ROCKS_LOG_ERROR(db_options_.info_log, "Titan DB open failed: %s", // Initialize Titan internals.
s.ToString().c_str()); if (stats_ != nullptr) {
stats_->Initialize(column_families);
}
s = blob_file_set_->Open(column_families);
if (!s.ok()) {
return s;
}
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::OpenImpl:BeforeInitialized", db_);
// Initialization done.
initialized_ = true;
// Enable compaction and background tasks after initilization.
s = db_->EnableAutoCompaction(cf_with_compaction);
if (!s.ok()) {
return s;
}
StartBackgroundTasks();
// Dump options.
ROCKS_LOG_INFO(db_options_.info_log, "Titan DB open.");
ROCKS_LOG_HEADER(db_options_.info_log, "Titan git sha: %s",
titan_build_git_sha);
db_options_.Dump(db_options_.info_log.get());
for (auto& desc : descs) {
ROCKS_LOG_HEADER(db_options_.info_log,
"Column family [%s], options:", desc.name.c_str());
desc.options.Dump(db_options_.info_log.get());
} }
return s; return s;
} }
...@@ -346,8 +376,8 @@ Status TitanDBImpl::CreateColumnFamilies( ...@@ -346,8 +376,8 @@ Status TitanDBImpl::CreateColumnFamilies(
// Replaces the provided table factory with TitanTableFactory. // Replaces the provided table factory with TitanTableFactory.
base_table_factory.emplace_back(options.table_factory); base_table_factory.emplace_back(options.table_factory);
titan_table_factory.emplace_back(std::make_shared<TitanTableFactory>( titan_table_factory.emplace_back(std::make_shared<TitanTableFactory>(
db_options_, desc.options, blob_manager_, &mutex_, blob_file_set_.get(), db_options_, desc.options, this, blob_manager_, &mutex_,
stats_.get())); blob_file_set_.get(), stats_.get()));
options.table_factory = titan_table_factory.back(); options.table_factory = titan_table_factory.back();
options.table_properties_collector_factories.emplace_back( options.table_properties_collector_factories.emplace_back(
std::make_shared<BlobFileSizeCollectorFactory>()); std::make_shared<BlobFileSizeCollectorFactory>());
...@@ -1015,6 +1045,11 @@ bool TitanDBImpl::GetIntProperty(ColumnFamilyHandle* column_family, ...@@ -1015,6 +1045,11 @@ bool TitanDBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
} }
void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Begin");
if (!initialized()) {
assert(false);
return;
}
const auto& tps = flush_job_info.table_properties; const auto& tps = flush_job_info.table_properties;
auto ucp_iter = tps.user_collected_properties.find( auto ucp_iter = tps.user_collected_properties.find(
BlobFileSizeCollector::kPropertiesName); BlobFileSizeCollector::kPropertiesName);
...@@ -1068,6 +1103,11 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { ...@@ -1068,6 +1103,11 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
void TitanDBImpl::OnCompactionCompleted( void TitanDBImpl::OnCompactionCompleted(
const CompactionJobInfo& compaction_job_info) { const CompactionJobInfo& compaction_job_info) {
TEST_SYNC_POINT("TitanDBImpl::OnCompactionCompleted:Begin");
if (!initialized()) {
assert(false);
return;
}
if (!compaction_job_info.status.ok()) { if (!compaction_job_info.status.ok()) {
// TODO: Clean up blob file generated by the failed compaction. // TODO: Clean up blob file generated by the failed compaction.
return; return;
......
...@@ -128,12 +128,16 @@ class TitanDBImpl : public TitanDB { ...@@ -128,12 +128,16 @@ class TitanDBImpl : public TitanDB {
bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property, bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property,
uint64_t* value) override; uint64_t* value) override;
bool initialized() const { return initialized_; }
void OnFlushCompleted(const FlushJobInfo& flush_job_info); void OnFlushCompleted(const FlushJobInfo& flush_job_info);
void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info); void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info);
void StartBackgroundTasks(); void StartBackgroundTasks();
void TEST_set_initialized(bool initialized) { initialized_ = initialized; }
Status TEST_StartGC(uint32_t column_family_id); Status TEST_StartGC(uint32_t column_family_id);
Status TEST_PurgeObsoleteFiles(); Status TEST_PurgeObsoleteFiles();
...@@ -150,6 +154,9 @@ class TitanDBImpl : public TitanDB { ...@@ -150,6 +154,9 @@ class TitanDBImpl : public TitanDB {
friend class TitanDBTest; friend class TitanDBTest;
friend class TitanThreadSafetyTest; friend class TitanThreadSafetyTest;
Status OpenImpl(const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles);
Status ValidateOptions( Status ValidateOptions(
const TitanDBOptions& options, const TitanDBOptions& options,
const std::vector<TitanCFDescriptor>& column_families) const; const std::vector<TitanCFDescriptor>& column_families) const;
...@@ -242,6 +249,9 @@ class TitanDBImpl : public TitanDB { ...@@ -242,6 +249,9 @@ class TitanDBImpl : public TitanDB {
EnvOptions env_options_; EnvOptions env_options_;
DBImpl* db_impl_; DBImpl* db_impl_;
TitanDBOptions db_options_; TitanDBOptions db_options_;
std::atomic<bool> initialized_{false};
// Turn DB into read-only if background error happened // Turn DB into read-only if background error happened
Status bg_error_; Status bg_error_;
std::atomic_bool has_bg_error_{false}; std::atomic_bool has_bg_error_{false};
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "blob_file_set.h" #include "blob_file_set.h"
#include "db_impl.h"
#include "table_builder.h" #include "table_builder.h"
#include "table_factory.h" #include "table_factory.h"
...@@ -98,6 +99,60 @@ class FileManager : public BlobFileManager { ...@@ -98,6 +99,60 @@ class FileManager : public BlobFileManager {
BlobFileSet* blob_file_set_; BlobFileSet* blob_file_set_;
}; };
class TestTableFactory : public TableFactory {
private:
std::shared_ptr<TableFactory> base_factory_;
mutable TableBuilder* latest_table_builder_ = nullptr;
public:
TestTableFactory(std::shared_ptr<TableFactory>& base_factory)
: base_factory_(base_factory) {}
const char* Name() const override { return "TestTableFactory"; }
Status NewTableReader(
const TableReaderOptions& options,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* result,
bool prefetch_index_and_filter_in_cache) const override {
return base_factory_->NewTableReader(options, std::move(file), file_size,
result,
prefetch_index_and_filter_in_cache);
}
TableBuilder* NewTableBuilder(const TableBuilderOptions& options,
uint32_t column_family_id,
WritableFileWriter* file) const override {
latest_table_builder_ =
base_factory_->NewTableBuilder(options, column_family_id, file);
return latest_table_builder_;
}
std::string GetPrintableTableOptions() const override {
return base_factory_->GetPrintableTableOptions();
}
Status SanitizeOptions(const DBOptions& db_options,
const ColumnFamilyOptions& cf_options) const override {
// Override this when we need to validate our options.
return base_factory_->SanitizeOptions(db_options, cf_options);
}
Status GetOptionString(std::string* opt_string,
const std::string& delimiter) const override {
// Override this when we need to persist our options.
return base_factory_->GetOptionString(opt_string, delimiter);
}
void* GetOptions() override { return base_factory_->GetOptions(); }
bool IsDeleteRangeSupported() const override {
return base_factory_->IsDeleteRangeSupported();
}
TableBuilder* latest_table_builder() const { return latest_table_builder_; }
};
class TableBuilderTest : public testing::Test { class TableBuilderTest : public testing::Test {
public: public:
TableBuilderTest() TableBuilderTest()
...@@ -110,11 +165,18 @@ class TableBuilderTest : public testing::Test { ...@@ -110,11 +165,18 @@ class TableBuilderTest : public testing::Test {
cf_options_.min_blob_size = kMinBlobSize; cf_options_.min_blob_size = kMinBlobSize;
blob_file_set_.reset(new BlobFileSet(db_options_, nullptr)); blob_file_set_.reset(new BlobFileSet(db_options_, nullptr));
std::map<uint32_t, TitanCFOptions> cfs{{0, cf_options_}}; std::map<uint32_t, TitanCFOptions> cfs{{0, cf_options_}};
db_impl_.reset(new TitanDBImpl(db_options_, tmpdir_));
db_impl_->TEST_set_initialized(true);
blob_file_set_->AddColumnFamilies(cfs); blob_file_set_->AddColumnFamilies(cfs);
blob_manager_.reset(new FileManager(db_options_, blob_file_set_.get())); blob_manager_.reset(new FileManager(db_options_, blob_file_set_.get()));
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_, // Replace base table facotry.
blob_manager_, &mutex_, base_table_factory_ =
blob_file_set_.get(), nullptr)); std::make_shared<TestTableFactory>(cf_options_.table_factory);
cf_options_.table_factory = base_table_factory_;
cf_ioptions_.table_factory = base_table_factory_.get();
table_factory_.reset(new TitanTableFactory(
db_options_, cf_options_, db_impl_.get(), blob_manager_, &mutex_,
blob_file_set_.get(), nullptr));
} }
~TableBuilderTest() { ~TableBuilderTest() {
...@@ -205,11 +267,40 @@ class TableBuilderTest : public testing::Test { ...@@ -205,11 +267,40 @@ class TableBuilderTest : public testing::Test {
std::string tmpdir_; std::string tmpdir_;
std::string base_name_; std::string base_name_;
std::string blob_name_; std::string blob_name_;
std::unique_ptr<TitanDBImpl> db_impl_;
std::shared_ptr<TestTableFactory> base_table_factory_;
std::unique_ptr<TableFactory> table_factory_; std::unique_ptr<TableFactory> table_factory_;
std::shared_ptr<BlobFileManager> blob_manager_; std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<BlobFileSet> blob_file_set_; std::unique_ptr<BlobFileSet> blob_file_set_;
}; };
// Before TitanDBImpl initialized, table factory should return base table
// builder.
TEST_F(TableBuilderTest, BeforeDBInitialized) {
CompressionOptions compression_opts;
TableBuilderOptions opts(cf_ioptions_, cf_moptions_,
cf_ioptions_.internal_comparator, &collectors_,
kNoCompression, 0 /*sample_for_compression*/,
compression_opts, false /*skip_filters*/,
kDefaultColumnFamilyName, 0 /*target_level*/);
db_impl_->TEST_set_initialized(false);
std::unique_ptr<WritableFileWriter> file1;
NewBaseFileWriter(&file1);
std::unique_ptr<TableBuilder> builder1(
table_factory_->NewTableBuilder(opts, 0 /*cf_id*/, file1.get()));
ASSERT_EQ(builder1.get(), base_table_factory_->latest_table_builder());
builder1->Abandon();
db_impl_->TEST_set_initialized(true);
std::unique_ptr<WritableFileWriter> file2;
NewBaseFileWriter(&file2);
std::unique_ptr<TableBuilder> builder2(
table_factory_->NewTableBuilder(opts, 0 /*cf_id*/, file2.get()));
ASSERT_NE(builder2.get(), base_table_factory_->latest_table_builder());
builder2->Abandon();
}
TEST_F(TableBuilderTest, Basic) { TEST_F(TableBuilderTest, Basic) {
std::unique_ptr<WritableFileWriter> base_file; std::unique_ptr<WritableFileWriter> base_file;
NewBaseFileWriter(&base_file); NewBaseFileWriter(&base_file);
...@@ -356,9 +447,9 @@ TEST_F(TableBuilderTest, NumEntries) { ...@@ -356,9 +447,9 @@ TEST_F(TableBuilderTest, NumEntries) {
// To test size of each blob file is around blob_file_target_size after building // To test size of each blob file is around blob_file_target_size after building
TEST_F(TableBuilderTest, TargetSize) { TEST_F(TableBuilderTest, TargetSize) {
cf_options_.blob_file_target_size = kTargetBlobFileSize; cf_options_.blob_file_target_size = kTargetBlobFileSize;
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_, table_factory_.reset(new TitanTableFactory(
blob_manager_, &mutex_, db_options_, cf_options_, db_impl_.get(), blob_manager_, &mutex_,
blob_file_set_.get(), nullptr)); blob_file_set_.get(), nullptr));
std::unique_ptr<WritableFileWriter> base_file; std::unique_ptr<WritableFileWriter> base_file;
NewBaseFileWriter(&base_file); NewBaseFileWriter(&base_file);
std::unique_ptr<TableBuilder> table_builder; std::unique_ptr<TableBuilder> table_builder;
...@@ -387,9 +478,9 @@ TEST_F(TableBuilderTest, TargetSize) { ...@@ -387,9 +478,9 @@ TEST_F(TableBuilderTest, TargetSize) {
// correct // correct
TEST_F(TableBuilderTest, LevelMerge) { TEST_F(TableBuilderTest, LevelMerge) {
cf_options_.level_merge = true; cf_options_.level_merge = true;
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_, table_factory_.reset(new TitanTableFactory(
blob_manager_, &mutex_, db_options_, cf_options_, db_impl_.get(), blob_manager_, &mutex_,
blob_file_set_.get(), nullptr)); blob_file_set_.get(), nullptr));
std::unique_ptr<WritableFileWriter> base_file; std::unique_ptr<WritableFileWriter> base_file;
NewBaseFileWriter(&base_file); NewBaseFileWriter(&base_file);
std::unique_ptr<TableBuilder> table_builder; std::unique_ptr<TableBuilder> table_builder;
......
#include "table_factory.h" #include "table_factory.h"
#include "db_impl.h"
#include "table_builder.h" #include "table_builder.h"
namespace rocksdb { namespace rocksdb {
...@@ -20,6 +21,9 @@ TableBuilder* TitanTableFactory::NewTableBuilder( ...@@ -20,6 +21,9 @@ TableBuilder* TitanTableFactory::NewTableBuilder(
WritableFileWriter* file) const { WritableFileWriter* file) const {
std::unique_ptr<TableBuilder> base_builder( std::unique_ptr<TableBuilder> base_builder(
base_factory_->NewTableBuilder(options, column_family_id, file)); base_factory_->NewTableBuilder(options, column_family_id, file));
if (!db_impl_->initialized()) {
return base_builder.release();
}
TitanCFOptions cf_options = cf_options_; TitanCFOptions cf_options = cf_options_;
cf_options.blob_run_mode = blob_run_mode_.load(); cf_options.blob_run_mode = blob_run_mode_.load();
std::weak_ptr<BlobStorage> blob_storage; std::weak_ptr<BlobStorage> blob_storage;
......
...@@ -11,10 +11,12 @@ ...@@ -11,10 +11,12 @@
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
class TitanDBImpl;
class TitanTableFactory : public TableFactory { class TitanTableFactory : public TableFactory {
public: public:
TitanTableFactory(const TitanDBOptions& db_options, TitanTableFactory(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options, const TitanCFOptions& cf_options, TitanDBImpl* db_impl,
std::shared_ptr<BlobFileManager> blob_manager, std::shared_ptr<BlobFileManager> blob_manager,
port::Mutex* db_mutex, BlobFileSet* blob_file_set, port::Mutex* db_mutex, BlobFileSet* blob_file_set,
TitanStats* stats) TitanStats* stats)
...@@ -22,6 +24,7 @@ class TitanTableFactory : public TableFactory { ...@@ -22,6 +24,7 @@ class TitanTableFactory : public TableFactory {
cf_options_(cf_options), cf_options_(cf_options),
blob_run_mode_(cf_options.blob_run_mode), blob_run_mode_(cf_options.blob_run_mode),
base_factory_(cf_options.table_factory), base_factory_(cf_options.table_factory),
db_impl_(db_impl),
blob_manager_(blob_manager), blob_manager_(blob_manager),
db_mutex_(db_mutex), db_mutex_(db_mutex),
blob_file_set_(blob_file_set), blob_file_set_(blob_file_set),
...@@ -66,6 +69,7 @@ class TitanTableFactory : public TableFactory { ...@@ -66,6 +69,7 @@ class TitanTableFactory : public TableFactory {
const TitanCFOptions cf_options_; const TitanCFOptions cf_options_;
std::atomic<TitanBlobRunMode> blob_run_mode_; std::atomic<TitanBlobRunMode> blob_run_mode_;
std::shared_ptr<TableFactory> base_factory_; std::shared_ptr<TableFactory> base_factory_;
TitanDBImpl* db_impl_;
std::shared_ptr<BlobFileManager> blob_manager_; std::shared_ptr<BlobFileManager> blob_manager_;
port::Mutex* db_mutex_; port::Mutex* db_mutex_;
BlobFileSet* blob_file_set_; BlobFileSet* blob_file_set_;
......
...@@ -310,6 +310,33 @@ class TitanDBTest : public testing::Test { ...@@ -310,6 +310,33 @@ class TitanDBTest : public testing::Test {
std::vector<ColumnFamilyHandle*> cf_handles_; std::vector<ColumnFamilyHandle*> cf_handles_;
}; };
TEST_F(TitanDBTest, Open) {
std::atomic<bool> checked_before_initialized{false};
std::atomic<bool> background_job_started{false};
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::OnFlushCompleted:Begin",
[&](void*) { background_job_started = true; });
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::OnCompactionCompleted:Begin",
[&](void*) { background_job_started = true; });
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::OpenImpl:BeforeInitialized", [&](void* arg) {
checked_before_initialized = true;
TitanDBImpl* db = reinterpret_cast<TitanDBImpl*>(arg);
// Try to trigger flush and compaction. Listeners should not be call.
ASSERT_OK(db->Put(WriteOptions(), "k1", "v1"));
ASSERT_OK(db->Flush(FlushOptions()));
ASSERT_OK(db->Put(WriteOptions(), "k1", "v2"));
ASSERT_OK(db->Put(WriteOptions(), "k2", "v3"));
ASSERT_OK(db->Flush(FlushOptions()));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});
SyncPoint::GetInstance()->EnableProcessing();
Open();
ASSERT_TRUE(checked_before_initialized.load());
ASSERT_FALSE(background_job_started.load());
}
TEST_F(TitanDBTest, Basic) { TEST_F(TitanDBTest, Basic) {
const uint64_t kNumKeys = 100; const uint64_t kNumKeys = 100;
std::map<std::string, std::string> data; std::map<std::string, std::string> data;
......
...@@ -168,7 +168,7 @@ class TitanStats : public Statistics { ...@@ -168,7 +168,7 @@ class TitanStats : public Statistics {
// created after DB open. // created after DB open.
Status Initialize(std::map<uint32_t, TitanCFOptions> cf_options) { Status Initialize(std::map<uint32_t, TitanCFOptions> cf_options) {
for (auto& opts : cf_options) { for (auto& opts : cf_options) {
internal_stats_[opts.first] = NewTitanInternalStats(opts.second); internal_stats_[opts.first] = std::make_shared<TitanInternalStats>();
} }
return Status::OK(); return Status::OK();
} }
...@@ -279,11 +279,6 @@ class TitanStats : public Statistics { ...@@ -279,11 +279,6 @@ class TitanStats : public Statistics {
tickers_; tickers_;
std::array<HistogramImpl, INTERNAL_HISTOGRAM_ENUM_MAX - HISTOGRAM_ENUM_MAX> std::array<HistogramImpl, INTERNAL_HISTOGRAM_ENUM_MAX - HISTOGRAM_ENUM_MAX>
histograms_; histograms_;
std::shared_ptr<TitanInternalStats> NewTitanInternalStats(
TitanCFOptions& opts) {
return std::make_shared<TitanInternalStats>();
}
}; };
// Utility functions for Titan ticker and histogram stats types // Utility functions for Titan ticker and histogram stats types
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment