Unverified Commit 70dc779b authored by Xinye Tao's avatar Xinye Tao Committed by GitHub

Add blob-run-mode for downgrading Titan (#11)

* Add blob-run-mode for downgrading Titan

* fix format issue

* address comment

* change string req to kabab-case

* address comments

* address comments

* fix bug from merge

* address comments

* address comments

* address comments

* address comment

* address comment

* pick comments from mutex-fix

* remove mutex fix

* fix mutex issue in blob storage
parent 3429decb
......@@ -98,6 +98,14 @@ class TitanDB : public StackableDB {
const int output_path_id = -1,
std::vector<std::string>* const output_file_names = nullptr,
CompactionJobInfo* compaction_job_info = nullptr) override = 0;
using rocksdb::StackableDB::GetOptions;
Options GetOptions(ColumnFamilyHandle* column_family) const override = 0;
using rocksdb::StackableDB::SetOptions;
Status SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>&
new_options) override = 0;
};
} // namespace titandb
......
......@@ -2,6 +2,9 @@
#include "rocksdb/options.h"
#include <map>
#include <unordered_map>
namespace rocksdb {
namespace titandb {
......@@ -31,6 +34,30 @@ struct TitanDBOptions : public DBOptions {
}
};
enum class TitanBlobRunMode {
kNormal = 0, // Titan process read/write as normal
kReadOnly = 1, // Titan stop writing value into blob log during flush
// and compaction. Existing values in blob log is still
// readable and garbage collected.
kFallback = 2, // On flush and compaction, Titan will convert blob
// index into real value, by reading from blob log,
// and store the value in SST file.
};
struct TitanOptionsHelper {
static std::map<TitanBlobRunMode, std::string> blob_run_mode_to_string;
static std::unordered_map<std::string, TitanBlobRunMode>
blob_run_mode_string_map;
};
static auto& blob_run_mode_to_string =
TitanOptionsHelper::blob_run_mode_to_string;
static auto& blob_run_mode_string_map =
TitanOptionsHelper::blob_run_mode_string_map;
struct ImmutableTitanCFOptions;
struct MutableTitanCFOptions;
struct TitanCFOptions : public ColumnFamilyOptions {
// The smallest value to store in blob files. Value smaller than
// this threshold will be inlined in base DB.
......@@ -53,34 +80,41 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// Default: nullptr
std::shared_ptr<Cache> blob_cache;
// Max batch size for gc
// Max batch size for GC.
//
// Default: 1GB
uint64_t max_gc_batch_size{1 << 30};
// Min batch size for gc
// Min batch size for GC.
//
// Default: 512MB
uint64_t min_gc_batch_size{512 << 20};
// The ratio of how much discardable size of a blob file can be GC
// The ratio of how much discardable size of a blob file can be GC.
//
// Default: 0.5
float blob_file_discardable_ratio{0.5};
// The ratio of how much size of a blob file need to be sample before GC
// The ratio of how much size of a blob file need to be sample before GC.
//
// Default: 0.1
float sample_file_size_ratio{0.1};
// The blob file size less than this option will be mark gc
// The blob file size less than this option will be mark GC.
//
// Default: 8MB
uint64_t merge_small_file_threshold{8 << 20};
// The mode used to process blob file.
//
// Default: kNormal
TitanBlobRunMode blob_run_mode{TitanBlobRunMode::kNormal};
TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options)
: ColumnFamilyOptions(options) {}
explicit TitanCFOptions(const ImmutableTitanCFOptions&,
const MutableTitanCFOptions&);
TitanCFOptions& operator=(const ColumnFamilyOptions& options) {
*dynamic_cast<ColumnFamilyOptions*>(this) = options;
......@@ -90,6 +124,48 @@ struct TitanCFOptions : public ColumnFamilyOptions {
std::string ToString() const;
};
struct ImmutableTitanCFOptions {
ImmutableTitanCFOptions() : ImmutableTitanCFOptions(TitanCFOptions()) {}
explicit ImmutableTitanCFOptions(const TitanCFOptions& opts)
: min_blob_size(opts.min_blob_size),
blob_file_compression(opts.blob_file_compression),
blob_file_target_size(opts.blob_file_target_size),
blob_cache(opts.blob_cache),
max_gc_batch_size(opts.max_gc_batch_size),
min_gc_batch_size(opts.min_gc_batch_size),
blob_file_discardable_ratio(opts.blob_file_discardable_ratio),
sample_file_size_ratio(opts.sample_file_size_ratio),
merge_small_file_threshold(opts.merge_small_file_threshold) {}
uint64_t min_blob_size;
CompressionType blob_file_compression;
uint64_t blob_file_target_size;
std::shared_ptr<Cache> blob_cache;
uint64_t max_gc_batch_size;
uint64_t min_gc_batch_size;
float blob_file_discardable_ratio;
float sample_file_size_ratio;
uint64_t merge_small_file_threshold;
};
struct MutableTitanCFOptions {
MutableTitanCFOptions() : MutableTitanCFOptions(TitanCFOptions()) {}
explicit MutableTitanCFOptions(const TitanCFOptions& opts)
: blob_run_mode(opts.blob_run_mode) {}
TitanBlobRunMode blob_run_mode;
};
struct TitanOptions : public TitanDBOptions, public TitanCFOptions {
TitanOptions() = default;
explicit TitanOptions(const Options& options)
......
......@@ -41,10 +41,13 @@ class BlobGCJobTest : public testing::Test {
}
~BlobGCJobTest() {}
std::weak_ptr<BlobStorage> GetBlobStorage(uint32_t cf_id) {
MutexLock l(mutex_);
return version_set_->GetBlobStorage(cf_id);
}
void CheckBlobNumber(int expected) {
auto b =
version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID())
.lock();
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(expected, b->files_.size());
}
......@@ -184,9 +187,7 @@ class BlobGCJobTest : public testing::Test {
db_->Delete(WriteOptions(), GenKey(i));
}
Flush();
auto b =
version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID())
.lock();
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(b->files_.size(), 1);
auto old = b->files_.begin()->first;
// for (auto& f : b->files_) {
......@@ -202,8 +203,7 @@ class BlobGCJobTest : public testing::Test {
ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0);
}
RunGC();
b = version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID())
.lock();
b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(b->files_.size(), 1);
auto new1 = b->files_.begin()->first;
ASSERT_TRUE(old != new1);
......
......@@ -25,7 +25,7 @@ Status BlobStorage::NewPrefetcher(uint64_t file_number,
}
std::weak_ptr<BlobFileMeta> BlobStorage::FindFile(uint64_t file_number) const {
ReadLock rl(&mutex_);
MutexLock l(&mutex_);
auto it = files_.find(file_number);
if (it != files_.end()) {
assert(file_number == it->second->file_number());
......@@ -36,20 +36,20 @@ std::weak_ptr<BlobFileMeta> BlobStorage::FindFile(uint64_t file_number) const {
void BlobStorage::ExportBlobFiles(
std::map<uint64_t, std::weak_ptr<BlobFileMeta>>& ret) const {
ReadLock rl(&mutex_);
MutexLock l(&mutex_);
for (auto& kv : files_) {
ret.emplace(kv.first, std::weak_ptr<BlobFileMeta>(kv.second));
}
}
void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
WriteLock wl(&mutex_);
MutexLock l(&mutex_);
files_.emplace(std::make_pair(file->file_number(), file));
}
void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
SequenceNumber obsolete_sequence) {
WriteLock wl(&mutex_);
MutexLock l(&mutex_);
obsolete_files_.push_back(
std::make_pair(file->file_number(), obsolete_sequence));
file->FileStateTransit(BlobFileMeta::FileEvent::kDelete);
......@@ -57,7 +57,7 @@ void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) {
WriteLock wl(&mutex_);
MutexLock l(&mutex_);
for (auto it = obsolete_files_.begin(); it != obsolete_files_.end();) {
auto& file_number = it->first;
......@@ -89,22 +89,20 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
void BlobStorage::ComputeGCScore() {
// TODO: no need to recompute all everytime
MutexLock l(&mutex_);
gc_score_.clear();
{
ReadLock rl(&mutex_);
for (auto& file : files_) {
if (file.second->is_obsolete()) {
continue;
}
gc_score_.push_back({});
auto& gcs = gc_score_.back();
gcs.file_number = file.first;
if (file.second->file_size() < cf_options_.merge_small_file_threshold) {
gcs.score = 1;
} else {
gcs.score = file.second->GetDiscardableRatio();
}
for (auto& file : files_) {
if (file.second->is_obsolete()) {
continue;
}
gc_score_.push_back({});
auto& gcs = gc_score_.back();
gcs.file_number = file.first;
if (file.second->file_size() < cf_options_.merge_small_file_threshold) {
gcs.score = 1;
} else {
gcs.score = file.second->GetDiscardableRatio();
}
}
......
......@@ -53,7 +53,7 @@ class BlobStorage {
std::weak_ptr<BlobFileMeta> FindFile(uint64_t file_number) const;
std::size_t NumBlobFiles() const {
ReadLock rl(&mutex_);
MutexLock l(&mutex_);
return files_.size();
}
......@@ -61,23 +61,26 @@ class BlobStorage {
std::map<uint64_t, std::weak_ptr<BlobFileMeta>>& ret) const;
void MarkAllFilesForGC() {
WriteLock wl(&mutex_);
MutexLock l(&mutex_);
for (auto& file : files_) {
file.second->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
}
}
void MarkDestroyed() {
WriteLock wl(&mutex_);
MutexLock l(&mutex_);
destroyed_ = true;
}
bool MaybeRemove() const {
ReadLock rl(&mutex_);
MutexLock l(&mutex_);
return destroyed_ && obsolete_files_.empty();
}
const std::vector<GCScore> gc_score() { return gc_score_; }
const std::vector<GCScore> gc_score() {
MutexLock l(&mutex_);
return gc_score_;
}
void ComputeGCScore();
......@@ -101,8 +104,7 @@ class BlobStorage {
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
// Read Write Mutex, which protects the `files_` structures
mutable port::RWMutex mutex_;
mutable port::Mutex mutex_;
// Only BlobStorage OWNS BlobFileMeta
std::unordered_map<uint64_t, std::shared_ptr<BlobFileMeta>> files_;
......
......@@ -112,9 +112,7 @@ class TitanDBImpl::FileManager : public BlobFileManager {
TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
const std::string& dbname)
: TitanDB(),
mutex_(),
bg_cv_(&mutex_),
: bg_cv_(&mutex_),
dbname_(dbname),
env_(options.env),
env_options_(options),
......@@ -173,11 +171,12 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
db_->DestroyColumnFamilyHandle(handle);
// Replaces the provided table factory with TitanTableFactory.
// While we need to preserve original table_factory for GetOptions.
auto& original_table_factory = base_descs[i].options.table_factory;
assert(original_table_factory != nullptr);
original_table_factory_[cf_id] = original_table_factory;
base_descs[i].options.table_factory = std::make_shared<TitanTableFactory>(
db_options_, descs[i].options, blob_manager_);
auto& base_table_factory = base_descs[i].options.table_factory;
assert(base_table_factory != nullptr);
base_table_factory_[cf_id] = base_table_factory;
titan_table_factory_[cf_id] = std::make_shared<TitanTableFactory>(
db_options_, descs[i].options, blob_manager_, &mutex_, vset_.get());
base_descs[i].options.table_factory = titan_table_factory_[cf_id];
// Add TableProperties for collecting statistics GC
base_descs[i].options.table_properties_collector_factories.emplace_back(
std::make_shared<BlobFileSizeCollectorFactory>());
......@@ -268,11 +267,15 @@ Status TitanDBImpl::CreateColumnFamilies(
const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles) {
std::vector<ColumnFamilyDescriptor> base_descs;
std::vector<std::shared_ptr<TableFactory>> base_table_factory;
std::vector<std::shared_ptr<TitanTableFactory>> titan_table_factory;
for (auto& desc : descs) {
ColumnFamilyOptions options = desc.options;
// Replaces the provided table factory with TitanTableFactory.
options.table_factory.reset(
new TitanTableFactory(db_options_, desc.options, blob_manager_));
base_table_factory.emplace_back(options.table_factory);
titan_table_factory.emplace_back(std::make_shared<TitanTableFactory>(
db_options_, desc.options, blob_manager_, &mutex_, vset_.get()));
options.table_factory = titan_table_factory.back();
base_descs.emplace_back(desc.name, options);
}
......@@ -281,11 +284,16 @@ Status TitanDBImpl::CreateColumnFamilies(
if (s.ok()) {
std::map<uint32_t, TitanCFOptions> column_families;
for (size_t i = 0; i < descs.size(); i++) {
column_families.emplace((*handles)[i]->GetID(), descs[i].options);
{
MutexLock l(&mutex_);
for (size_t i = 0; i < descs.size(); i++) {
uint32_t cf_id = (*handles)[i]->GetID();
column_families.emplace(cf_id, descs[i].options);
base_table_factory_[cf_id] = base_table_factory[i];
titan_table_factory_[cf_id] = titan_table_factory[i];
}
vset_->AddColumnFamilies(column_families);
}
MutexLock l(&mutex_);
vset_->AddColumnFamilies(column_families);
}
return s;
}
......@@ -299,6 +307,10 @@ Status TitanDBImpl::DropColumnFamilies(
Status s = db_impl_->DropColumnFamilies(handles);
if (s.ok()) {
MutexLock l(&mutex_);
for (auto cf_id : column_families) {
base_table_factory_.erase(cf_id);
titan_table_factory_.erase(cf_id);
}
SequenceNumber obsolete_sequence = db_impl_->GetLatestSequenceNumber();
s = vset_->DropColumnFamilies(column_families, obsolete_sequence);
}
......@@ -484,8 +496,10 @@ Options TitanDBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
assert(column_family != nullptr);
Options options = db_->GetOptions(column_family);
uint32_t cf_id = column_family->GetID();
if (original_table_factory_.count(cf_id) > 0) {
options.table_factory = original_table_factory_.at(cf_id);
MutexLock l(&mutex_);
if (base_table_factory_.count(cf_id) > 0) {
options.table_factory = base_table_factory_.at(cf_id);
} else {
ROCKS_LOG_ERROR(
db_options_.info_log,
......@@ -496,6 +510,40 @@ Options TitanDBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
return options;
}
Status TitanDBImpl::SetOptions(
ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& new_options) {
Status s;
auto opts = new_options;
auto p = opts.find("blob_run_mode");
bool set_blob_run_mode = (p != opts.end());
std::string blob_run_mode_string;
if (set_blob_run_mode) {
blob_run_mode_string = p->second;
opts.erase(p);
}
if (opts.size() > 0) {
s = db_->SetOptions(column_family, opts);
if (!s.ok()) {
return s;
}
}
TitanBlobRunMode mode = TitanBlobRunMode::kNormal;
auto pm = blob_run_mode_string_map.find(blob_run_mode_string);
if (pm == blob_run_mode_string_map.end()) {
return Status::InvalidArgument("No blob_run_mode defined for " +
blob_run_mode_string);
} else {
mode = pm->second;
}
{
MutexLock l(&mutex_);
auto& table_factory = titan_table_factory_[column_family->GetID()];
table_factory->SetBlobRunMode(mode);
}
return Status::OK();
}
void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
const auto& tps = flush_job_info.table_properties;
auto ucp_iter = tps.user_collected_properties.find(
......
......@@ -3,6 +3,7 @@
#include "blob_file_manager.h"
#include "db/db_impl.h"
#include "rocksdb/statistics.h"
#include "table_factory.h"
#include "titan/db.h"
#include "util/repeatable_thread.h"
#include "version_set.h"
......@@ -67,6 +68,11 @@ class TitanDBImpl : public TitanDB {
using TitanDB::GetOptions;
Options GetOptions(ColumnFamilyHandle* column_family) const override;
using TitanDB::SetOptions;
Status SetOptions(
ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& new_options) override;
void OnFlushCompleted(const FlushJobInfo& flush_job_info);
void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info);
......@@ -135,7 +141,7 @@ class TitanDBImpl : public TitanDB {
// while the unlock sequence must be Base DB mutex.Unlock() ->
// Titan.mutex_.Unlock() Only if we all obey these sequence, we can prevent
// potential dead lock.
port::Mutex mutex_;
mutable port::Mutex mutex_;
// This condition variable is signaled on these conditions:
// * whenever bg_gc_scheduled_ goes down to 0
port::CondVar bg_cv_;
......@@ -151,7 +157,9 @@ class TitanDBImpl : public TitanDB {
Statistics* stats_;
std::unordered_map<uint32_t, std::shared_ptr<TableFactory>>
original_table_factory_;
base_table_factory_;
std::unordered_map<uint32_t, std::shared_ptr<TitanTableFactory>>
titan_table_factory_;
// handle for purging obsolete blob files at fixed intervals
std::unique_ptr<RepeatableThread> thread_purge_obsolete_;
......
......@@ -11,6 +11,19 @@
namespace rocksdb {
namespace titandb {
TitanCFOptions::TitanCFOptions(const ImmutableTitanCFOptions& immutable_opts,
const MutableTitanCFOptions& mutable_opts)
: min_blob_size(immutable_opts.min_blob_size),
blob_file_compression(immutable_opts.blob_file_compression),
blob_file_target_size(immutable_opts.blob_file_target_size),
blob_cache(immutable_opts.blob_cache),
max_gc_batch_size(immutable_opts.max_gc_batch_size),
min_gc_batch_size(immutable_opts.min_gc_batch_size),
blob_file_discardable_ratio(immutable_opts.blob_file_discardable_ratio),
sample_file_size_ratio(immutable_opts.sample_file_size_ratio),
merge_small_file_threshold(immutable_opts.merge_small_file_threshold),
blob_run_mode(mutable_opts.blob_run_mode) {}
std::string TitanCFOptions::ToString() const {
char buf[256];
std::string str;
......@@ -23,8 +36,23 @@ std::string TitanCFOptions::ToString() const {
snprintf(buf, sizeof(buf), "blob_file_target_size = %" PRIu64 "\n",
blob_file_target_size);
res += buf;
snprintf(buf, sizeof(buf), "blob_run_mode = %s\n",
blob_run_mode_to_string[blob_run_mode].c_str());
res += buf;
return res;
}
std::map<TitanBlobRunMode, std::string>
TitanOptionsHelper::blob_run_mode_to_string = {
{TitanBlobRunMode::kNormal, "kNormal"},
{TitanBlobRunMode::kReadOnly, "kReadOnly"},
{TitanBlobRunMode::kFallback, "kFallback"}};
std::unordered_map<std::string, TitanBlobRunMode>
TitanOptionsHelper::blob_run_mode_string_map = {
{"kNormal", TitanBlobRunMode::kNormal},
{"kReadOnly", TitanBlobRunMode::kReadOnly},
{"kFallback", TitanBlobRunMode::kFallback}};
} // namespace titandb
} // namespace rocksdb
......@@ -12,19 +12,45 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
return;
}
if (ikey.type != kTypeValue || value.size() < cf_options_.min_blob_size) {
base_builder_->Add(key, value);
return;
}
if (ikey.type == kTypeBlobIndex &&
cf_options_.blob_run_mode == TitanBlobRunMode::kFallback) {
// we ingest value from blob file
Slice copy = value;
BlobIndex index;
status_ = index.DecodeFrom(&copy);
if (!ok()) {
return;
}
std::string index_value;
AddBlob(ikey.user_key, value, &index_value);
if (!ok()) return;
BlobRecord record;
PinnableSlice buffer;
auto storage = blob_storage_.lock();
assert(storage != nullptr);
ikey.type = kTypeBlobIndex;
std::string index_key;
AppendInternalKey(&index_key, ikey);
base_builder_->Add(index_key, index_value);
ReadOptions options; // dummy option
status_ = storage->Get(options, index, &record, &buffer);
if (ok()) {
ikey.type = kTypeValue;
std::string index_key;
AppendInternalKey(&index_key, ikey);
base_builder_->Add(index_key, record.value);
}
} else if (ikey.type == kTypeValue &&
value.size() >= cf_options_.min_blob_size &&
cf_options_.blob_run_mode == TitanBlobRunMode::kNormal) {
// we write to blob file and insert index
std::string index_value;
AddBlob(ikey.user_key, value, &index_value);
if (ok()) {
ikey.type = kTypeBlobIndex;
std::string index_key;
AppendInternalKey(&index_key, ikey);
base_builder_->Add(index_key, index_value);
}
} else {
base_builder_->Add(key, value);
}
}
void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
......
......@@ -4,6 +4,7 @@
#include "blob_file_manager.h"
#include "table/table_builder.h"
#include "titan/options.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -13,12 +14,14 @@ class TitanTableBuilder : public TableBuilder {
TitanTableBuilder(uint32_t cf_id, const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
std::unique_ptr<TableBuilder> base_builder,
std::shared_ptr<BlobFileManager> blob_manager)
std::shared_ptr<BlobFileManager> blob_manager,
std::weak_ptr<BlobStorage> blob_storage)
: cf_id_(cf_id),
db_options_(db_options),
cf_options_(cf_options),
base_builder_(std::move(base_builder)),
blob_manager_(blob_manager),
blob_storage_(blob_storage),
stats_(db_options.statistics.get()) {}
void Add(const Slice& key, const Slice& value) override;
......@@ -50,6 +53,7 @@ class TitanTableBuilder : public TableBuilder {
std::unique_ptr<BlobFileHandle> blob_handle_;
std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<BlobFileBuilder> blob_builder_;
std::weak_ptr<BlobStorage> blob_storage_;
Statistics* stats_;
};
......
......@@ -5,6 +5,7 @@
#include "table_factory.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -79,9 +80,10 @@ class TableBuilderTest : public testing::Test {
blob_name_(BlobFileName(tmpdir_, kTestFileNumber)) {
db_options_.dirname = tmpdir_;
cf_options_.min_blob_size = kMinBlobSize;
vset_.reset(new VersionSet(db_options_));
blob_manager_.reset(new FileManager(db_options_));
table_factory_.reset(
new TitanTableFactory(db_options_, cf_options_, blob_manager_));
table_factory_.reset(new TitanTableFactory(
db_options_, cf_options_, blob_manager_, &mutex_, vset_.get()));
}
~TableBuilderTest() {
......@@ -151,6 +153,8 @@ class TableBuilderTest : public testing::Test {
result->reset(table_factory_->NewTableBuilder(options, 0, file));
}
port::Mutex mutex_;
Env* env_{Env::Default()};
EnvOptions env_options_;
Options options_;
......@@ -165,6 +169,7 @@ class TableBuilderTest : public testing::Test {
std::string blob_name_;
std::unique_ptr<TableFactory> table_factory_;
std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<VersionSet> vset_;
};
TEST_F(TableBuilderTest, Basic) {
......
......@@ -20,12 +20,25 @@ TableBuilder* TitanTableFactory::NewTableBuilder(
WritableFileWriter* file) const {
std::unique_ptr<TableBuilder> base_builder(
base_factory_->NewTableBuilder(options, column_family_id, file));
return new TitanTableBuilder(column_family_id, db_options_, cf_options_,
std::move(base_builder), blob_manager_);
TitanCFOptions cf_options;
{
MutexLock l(&mutex_);
cf_options = TitanCFOptions(immutable_cf_options_, mutable_cf_options_);
}
std::weak_ptr<BlobStorage> blob_storage;
{
MutexLock l(db_mutex_);
blob_storage = vset_->GetBlobStorage(column_family_id);
}
return new TitanTableBuilder(column_family_id, db_options_, cf_options,
std::move(base_builder), blob_manager_,
blob_storage);
}
std::string TitanTableFactory::GetPrintableTableOptions() const {
return base_factory_->GetPrintableTableOptions() + cf_options_.ToString();
MutexLock l(&mutex_);
return base_factory_->GetPrintableTableOptions() +
TitanCFOptions(immutable_cf_options_, mutable_cf_options_).ToString();
}
} // namespace titandb
......
......@@ -3,6 +3,7 @@
#include "blob_file_manager.h"
#include "rocksdb/table.h"
#include "titan/options.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -11,11 +12,15 @@ class TitanTableFactory : public TableFactory {
public:
TitanTableFactory(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
std::shared_ptr<BlobFileManager> blob_manager)
std::shared_ptr<BlobFileManager> blob_manager,
port::Mutex* db_mutex, VersionSet* vset)
: db_options_(db_options),
cf_options_(cf_options),
immutable_cf_options_(cf_options),
mutable_cf_options_(cf_options),
base_factory_(cf_options.table_factory),
blob_manager_(blob_manager) {}
blob_manager_(blob_manager),
db_mutex_(db_mutex),
vset_(vset) {}
const char* Name() const override { return "TitanTable"; }
......@@ -45,15 +50,25 @@ class TitanTableFactory : public TableFactory {
void* GetOptions() override { return base_factory_->GetOptions(); }
void SetBlobRunMode(TitanBlobRunMode mode) {
MutexLock l(&mutex_);
mutable_cf_options_.blob_run_mode = mode;
}
bool IsDeleteRangeSupported() const override {
return base_factory_->IsDeleteRangeSupported();
}
private:
mutable port::Mutex mutex_;
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
ImmutableTitanCFOptions immutable_cf_options_;
MutableTitanCFOptions mutable_cf_options_;
std::shared_ptr<TableFactory> base_factory_;
std::shared_ptr<BlobFileManager> blob_manager_;
port::Mutex* db_mutex_;
VersionSet* vset_;
};
} // namespace titandb
......
#include <inttypes.h>
#include <options/cf_options.h>
#include <unordered_map>
#include "blob_file_iterator.h"
#include "blob_file_reader.h"
#include "db_impl.h"
#include "db_iter.h"
#include "rocksdb/utilities/debug.h"
#include "titan/db.h"
#include "titan_fault_injection_test_env.h"
#include "util/filename.h"
......@@ -122,6 +124,7 @@ class TitanDBTest : public testing::Test {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
MutexLock l(&db_impl_->mutex_);
return db_impl_->vset_->GetBlobStorage(cf_handle->GetID());
}
......@@ -610,6 +613,90 @@ TEST_F(TitanDBTest, BlobFileCorruptionErrorHandling) {
}
#endif // !NDEBUG
TEST_F(TitanDBTest, Options) {
Open();
std::unordered_map<std::string, std::string> opts;
opts["blob_run_mode"] = "kReadOnly";
ASSERT_OK(db_->SetOptions(opts));
opts["disable_auto_compactions"] = "true";
ASSERT_OK(db_->SetOptions(opts));
}
TEST_F(TitanDBTest, BlobRunModeBasic) {
options_.disable_background_gc = true;
Open();
const uint64_t kNumEntries = 1000;
const uint64_t kMaxKeys = 100000;
std::unordered_map<std::string, std::string> opts;
std::map<std::string, std::string> data;
std::vector<KeyVersion> version;
std::string begin_key;
std::string end_key;
uint64_t num_blob_files;
for (uint64_t i = 1; i <= kNumEntries; i++) {
Put(i, &data);
}
begin_key = GenKey(1);
end_key = GenKey(kNumEntries);
ASSERT_EQ(kNumEntries, data.size());
VerifyDB(data);
Flush();
auto blob = GetBlobStorage();
num_blob_files = blob.lock()->NumBlobFiles();
VerifyDB(data);
GetAllKeyVersions(db_, begin_key, end_key, kMaxKeys, &version);
for (auto v : version) {
if (data[v.user_key].size() >= options_.min_blob_size) {
ASSERT_EQ(v.type, static_cast<int>(ValueType::kTypeBlobIndex));
} else {
ASSERT_EQ(v.type, static_cast<int>(ValueType::kTypeValue));
}
}
version.clear();
opts["blob_run_mode"] = "kReadOnly";
db_->SetOptions(opts);
for (uint64_t i = kNumEntries + 1; i <= kNumEntries * 2; i++) {
Put(i, &data);
}
begin_key = GenKey(kNumEntries + 1);
end_key = GenKey(kNumEntries * 2);
ASSERT_EQ(kNumEntries * 2, data.size());
VerifyDB(data);
Flush();
blob = GetBlobStorage();
ASSERT_EQ(num_blob_files, blob.lock()->NumBlobFiles());
VerifyDB(data);
GetAllKeyVersions(db_, begin_key, end_key, kMaxKeys, &version);
for (auto v : version) {
ASSERT_EQ(v.type, static_cast<int>(ValueType::kTypeValue));
}
version.clear();
opts["blob_run_mode"] = "fallback";
db_->SetOptions(opts);
for (uint64_t i = kNumEntries * 2 + 1; i <= kNumEntries * 3; i++) {
Put(i, &data);
}
begin_key = GenKey(kNumEntries * 2 + 1);
end_key = GenKey(kNumEntries * 3);
ASSERT_EQ(kNumEntries * 3, data.size());
VerifyDB(data);
Flush();
blob = GetBlobStorage();
ASSERT_EQ(num_blob_files, blob.lock()->NumBlobFiles());
VerifyDB(data);
GetAllKeyVersions(db_, begin_key, end_key, kMaxKeys, &version);
for (auto v : version) {
ASSERT_EQ(v.type, static_cast<int>(ValueType::kTypeValue));
}
version.clear();
}
} // namespace titandb
} // namespace rocksdb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment