Unverified Commit c4e3271c authored by Xu Qiaolun's avatar Xu Qiaolun Committed by GitHub

feat: Titan should return user value to compaction filter (#164)

Closes #163 
Signed-off-by: 's avatarXu Qiaolun <Jamesxql@Gmail.com>
parent 47ff7da9
dist: xenial
dist: bionic
language: cpp
......
......@@ -100,6 +100,7 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
titan_db_test
titan_options_test
util_test
compaction_filter_test
version_test)
set(TEST_LIBS
titan
......
......@@ -165,6 +165,12 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// Default: false
bool gc_merge_rewrite{false};
// If set true, Titan will pass empty value in user compaction filter,
// improves compaction performance by avoid fetching value from blob files.
//
// Default: false
bool skip_value_in_compaction_filter{false};
TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options)
: ColumnFamilyOptions(options) {}
......@@ -192,7 +198,8 @@ struct ImmutableTitanCFOptions {
blob_file_discardable_ratio(opts.blob_file_discardable_ratio),
sample_file_size_ratio(opts.sample_file_size_ratio),
merge_small_file_threshold(opts.merge_small_file_threshold),
level_merge(opts.level_merge) {}
level_merge(opts.level_merge),
skip_value_in_compaction_filter(opts.skip_value_in_compaction_filter) {}
uint64_t min_blob_size;
......@@ -213,6 +220,8 @@ struct ImmutableTitanCFOptions {
uint64_t merge_small_file_threshold;
bool level_merge;
bool skip_value_in_compaction_filter;
};
struct MutableTitanCFOptions {
......
#pragma once
#include <string>
#include <utility>
#include "db_impl.h"
#include "rocksdb/compaction_filter.h"
#include "util/mutexlock.h"
namespace rocksdb {
namespace titandb {
class TitanCompactionFilter final : public CompactionFilter {
public:
TitanCompactionFilter(TitanDBImpl *db, const std::string &cf_name,
const CompactionFilter *original,
std::unique_ptr<CompactionFilter> &&owned_filter,
std::shared_ptr<BlobStorage> blob_storage,
bool skip_value)
: db_(db),
cf_name_(cf_name),
blob_storage_(std::move(blob_storage)),
original_filter_(original),
owned_filter_(std::move(owned_filter)),
skip_value_(skip_value),
filter_name_(std::string("TitanCompactionfilter.")
.append(original_filter_->Name())) {
assert(blob_storage_ != nullptr);
assert(original_filter_ != nullptr);
}
const char *Name() const override { return filter_name_.c_str(); }
Decision FilterV2(int level, const Slice &key, ValueType value_type,
const Slice &value, std::string *new_value,
std::string *skip_until) const override {
if (skip_value_) {
return original_filter_->FilterV2(level, key, value_type, Slice(),
new_value, skip_until);
}
if (value_type != kBlobIndex) {
return original_filter_->FilterV2(level, key, value_type, value,
new_value, skip_until);
}
BlobIndex blob_index;
Slice original_value(value.data());
Status s = blob_index.DecodeFrom(&original_value);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_->db_options_.info_log,
"[%s] Unable to decode blob index", cf_name_.c_str());
// TODO(yiwu): Better to fail the compaction as well, but current
// compaction filter API doesn't support it.
{
MutexLock l(&db_->mutex_);
db_->SetBGError(s);
}
// Unable to decode blob index. Keeping the value.
return Decision::kKeep;
}
if (BlobIndex::IsDeletionMarker(blob_index)) {
// TODO(yiwu): handle deletion marker at bottom level.
return Decision::kKeep;
}
BlobRecord record;
PinnableSlice buffer;
ReadOptions read_options;
s = blob_storage_->Get(read_options, blob_index, &record, &buffer);
if (s.IsCorruption()) {
// Could be cause by blob file beinged GC-ed, or real corruption.
// TODO(yiwu): Tell the two cases apart.
return Decision::kKeep;
} else if (s.ok()) {
auto decision = original_filter_->FilterV2(
level, key, kValue, record.value, new_value, skip_until);
// It would be a problem if it change the value whereas the value_type
// is still kBlobIndex. For now, just returns kKeep.
// TODO: we should make rocksdb Filter API support changing value_type
// assert(decision != CompactionFilter::Decision::kChangeValue);
if (decision == Decision::kChangeValue) {
{
MutexLock l(&db_->mutex_);
db_->SetBGError(Status::NotSupported(
"It would be a problem if it change the value whereas the "
"value_type is still kBlobIndex."));
}
decision = Decision::kKeep;
}
return decision;
} else {
{
MutexLock l(&db_->mutex_);
db_->SetBGError(s);
}
// GetBlobRecord failed, keep the value.
return Decision::kKeep;
}
}
private:
TitanDBImpl *db_;
const std::string cf_name_;
std::shared_ptr<BlobStorage> blob_storage_;
const CompactionFilter *original_filter_;
const std::unique_ptr<CompactionFilter> owned_filter_;
bool skip_value_;
std::string filter_name_;
};
class TitanCompactionFilterFactory final : public CompactionFilterFactory {
public:
TitanCompactionFilterFactory(
const CompactionFilter *original_filter,
std::shared_ptr<CompactionFilterFactory> original_filter_factory,
TitanDBImpl *db, bool skip_value, const std::string &cf_name)
: original_filter_(original_filter),
original_filter_factory_(original_filter_factory),
titan_db_impl_(db),
skip_value_(skip_value),
cf_name_(cf_name) {
assert(original_filter != nullptr || original_filter_factory != nullptr);
if (original_filter_ != nullptr) {
factory_name_ = std::string("TitanCompactionFilterFactory.")
.append(original_filter_->Name());
} else {
factory_name_ = std::string("TitanCompactionFilterFactory.")
.append(original_filter_factory_->Name());
}
}
const char *Name() const override { return factory_name_.c_str(); }
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context &context) override {
assert(original_filter_ != nullptr || original_filter_factory_ != nullptr);
std::shared_ptr<BlobStorage> blob_storage;
{
MutexLock l(&titan_db_impl_->mutex_);
blob_storage = titan_db_impl_->blob_file_set_
->GetBlobStorage(context.column_family_id)
.lock();
}
if (blob_storage == nullptr) {
assert(false);
// Shouldn't be here, but ignore compaction filter when we hit error.
return nullptr;
}
const CompactionFilter *original_filter = original_filter_;
std::unique_ptr<CompactionFilter> original_filter_from_factory;
if (original_filter == nullptr) {
original_filter_from_factory =
original_filter_factory_->CreateCompactionFilter(context);
original_filter = original_filter_from_factory.get();
}
return std::unique_ptr<CompactionFilter>(new TitanCompactionFilter(
titan_db_impl_, cf_name_, original_filter,
std::move(original_filter_from_factory), blob_storage, skip_value_));
}
private:
const CompactionFilter *original_filter_;
std::shared_ptr<CompactionFilterFactory> original_filter_factory_;
TitanDBImpl *titan_db_impl_;
bool skip_value_;
const std::string cf_name_;
std::string factory_name_;
};
} // namespace titandb
} // namespace rocksdb
#include "db_impl.h"
#include "test_util/testharness.h"
namespace rocksdb {
namespace titandb {
class TestCompactionFilter : public CompactionFilter {
public:
explicit TestCompactionFilter(uint64_t min_blob_size)
: min_blob_size_(min_blob_size) {}
const char *Name() const override { return "DeleteCompactionFilter"; }
bool Filter(int level, const Slice &key, const Slice &value,
std::string * /*&new_value*/,
bool * /*value_changed*/) const override {
AssertValue(key, value);
return !value.starts_with("remain");
}
private:
void AssertValue(const Slice &key, const Slice &value) const {
if (key.ToString() == "mykey") {
ASSERT_EQ(value.ToString(), "myvalue");
}
if (key.ToString() == "bigkey") {
ASSERT_EQ(value.ToString(), std::string(min_blob_size_ + 1, 'v'));
}
if (key.starts_with("skip")) {
ASSERT_EQ(value, Slice());
}
}
uint64_t min_blob_size_;
};
class TitanCompactionFilterTest : public testing::Test {
public:
TitanCompactionFilterTest() : dbname_(test::TmpDir()) {
options_.dirname = dbname_ + "/titandb";
options_.create_if_missing = true;
options_.disable_background_gc = true;
options_.disable_auto_compactions = true;
options_.compaction_filter =
new TestCompactionFilter(options_.min_blob_size);
DeleteDir(options_.dirname);
DeleteDir(dbname_);
}
~TitanCompactionFilterTest() override {
Close();
delete options_.compaction_filter;
DeleteDir(options_.dirname);
DeleteDir(dbname_);
}
static void DeleteDir(const std::string &dirname) {
Env *env = Env::Default();
std::vector<std::string> filenames;
env->GetChildren(dirname, &filenames);
for (auto &fname : filenames) {
env->DeleteFile(dirname + "/" + fname);
}
env->DeleteDir(dirname);
}
void Open() {
ASSERT_OK(TitanDB::Open(options_, dbname_, &db_));
db_impl_ = reinterpret_cast<TitanDBImpl *>(db_);
}
void Close() {
if (!db_) return;
ASSERT_OK(db_->Close());
delete db_;
db_ = nullptr;
}
Status Get(const std::string &key, std::string *value) {
ReadOptions ropts;
return db_->Get(ropts, key, value);
}
Status Put(const std::string &key, const std::string &value) {
WriteOptions wopts;
return db_->Put(wopts, key, value);
}
std::string GetBigValue() {
return std::string(options_.min_blob_size + 1, 'v');
}
void CompactAll() {
CompactRangeOptions copts;
ASSERT_OK(db_->CompactRange(copts, nullptr, nullptr));
}
protected:
std::string dbname_;
TitanOptions options_;
TitanDB *db_{nullptr};
TitanDBImpl *db_impl_{nullptr};
};
TEST_F(TitanCompactionFilterTest, CompactNormalValue) {
Open();
Status s = Put("mykey", "myvalue");
ASSERT_OK(s);
std::string value;
s = Get("mykey", &value);
ASSERT_OK(s);
ASSERT_EQ(value, "myvalue");
CompactAll();
s = Get("mykey", &value);
ASSERT_TRUE(s.IsNotFound());
}
TEST_F(TitanCompactionFilterTest, CompactBlobValue) {
Open();
std::string value = GetBigValue();
ASSERT_GT(value.length(), options_.min_blob_size);
Status s = Put("bigkey", value);
ASSERT_OK(s);
std::string value1;
s = Get("bigkey", &value1);
ASSERT_OK(s);
ASSERT_EQ(value1, value);
CompactAll();
s = Get("bigkey", &value1);
ASSERT_TRUE(s.IsNotFound());
}
TEST_F(TitanCompactionFilterTest, CompactUpdateValue) {
options_.blob_file_discardable_ratio = 0.01;
options_.min_blob_size = 1;
options_.target_file_size_base = 1;
Open();
ASSERT_OK(db_->Put(WriteOptions(), "update-key", "remain1"));
ASSERT_OK(db_->Put(WriteOptions(), "update-another-key", "remain2"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_OK(db_->Put(WriteOptions(), "update-key", "value"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
uint32_t cf_id = db_->DefaultColumnFamily()->GetID();
ASSERT_OK(db_impl_->TEST_StartGC(cf_id));
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "update-key", &value).IsNotFound());
ASSERT_OK(db_->Get(ReadOptions(), "update-another-key", &value));
ASSERT_EQ(value, "remain2");
}
TEST_F(TitanCompactionFilterTest, CompactSkipValue) {
options_.skip_value_in_compaction_filter = true;
Open();
ASSERT_OK(db_->Put(WriteOptions(), "skip-key", "skip-value"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
std::string value;
ASSERT_TRUE(db_->Get(ReadOptions(), "skip-key", &value).IsNotFound());
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
\ No newline at end of file
......@@ -5,21 +5,22 @@
#endif
#include <inttypes.h>
#include "logging/log_buffer.h"
#include "monitoring/statistics_impl.h"
#include "port/port.h"
#include "util/autovector.h"
#include "util/threadpool_imp.h"
#include "base_db_listener.h"
#include "blob_file_builder.h"
#include "blob_file_iterator.h"
#include "blob_file_size_collector.h"
#include "blob_gc.h"
#include "compaction_filter.h"
#include "db_iter.h"
#include "logging/log_buffer.h"
#include "monitoring/statistics_impl.h"
#include "port/port.h"
#include "table_factory.h"
#include "titan_build_version.h"
#include "titan_stats.h"
#include "util/autovector.h"
#include "util/threadpool_imp.h"
namespace rocksdb {
namespace titandb {
......@@ -269,6 +270,15 @@ Status TitanDBImpl::OpenImpl(const std::vector<TitanCFDescriptor>& descs,
blob_file_set_.get(), stats_.get()));
cf_opts.table_factory = titan_table_factories.back();
cf_opts.merge_operator = shared_merge_operator_;
if (cf_opts.compaction_filter != nullptr ||
cf_opts.compaction_filter_factory != nullptr) {
std::shared_ptr<TitanCompactionFilterFactory> titan_cf_factory =
std::make_shared<TitanCompactionFilterFactory>(
cf_opts.compaction_filter, cf_opts.compaction_filter_factory,
this, desc.options.skip_value_in_compaction_filter, desc.name);
cf_opts.compaction_filter = nullptr;
cf_opts.compaction_filter_factory = titan_cf_factory;
}
}
// Initialize GC thread pool.
if (!db_options_.disable_background_gc && db_options_.max_background_gc > 0) {
......@@ -400,6 +410,16 @@ Status TitanDBImpl::CreateColumnFamilies(
std::make_shared<BlobFileSizeCollectorFactory>());
options.merge_operator = shared_merge_operator_;
base_descs.emplace_back(desc.name, options);
if (options.compaction_filter != nullptr ||
options.compaction_filter_factory != nullptr) {
std::shared_ptr<TitanCompactionFilterFactory> titan_cf_factory =
std::make_shared<TitanCompactionFilterFactory>(
options.compaction_filter, options.compaction_filter_factory,
this, desc.options.skip_value_in_compaction_filter, desc.name);
options.compaction_filter = nullptr;
options.compaction_filter_factory = titan_cf_factory;
}
}
Status s = db_impl_->CreateColumnFamilies(base_descs, handles);
......
......@@ -23,6 +23,9 @@ struct TitanColumnFamilyInfo {
std::shared_ptr<TitanTableFactory> titan_table_factory;
};
class TitanCompactionFilterFactory;
class TitanCompactionFilter;
class TitanDBImpl : public TitanDB {
public:
TitanDBImpl(const TitanDBOptions& options, const std::string& dbname);
......@@ -167,6 +170,8 @@ class TitanDBImpl : public TitanDB {
friend class BaseDbListener;
friend class TitanDBTest;
friend class TitanThreadSafetyTest;
friend class TitanCompactionFilterFactory;
friend class TitanCompactionFilter;
Status OpenImpl(const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles);
......
......@@ -43,7 +43,9 @@ TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts,
sample_file_size_ratio(immutable_opts.sample_file_size_ratio),
merge_small_file_threshold(immutable_opts.merge_small_file_threshold),
blob_run_mode(mutable_opts.blob_run_mode),
gc_merge_rewrite(mutable_opts.gc_merge_rewrite) {}
gc_merge_rewrite(mutable_opts.gc_merge_rewrite),
skip_value_in_compaction_filter(
immutable_opts.skip_value_in_compaction_filter) {}
void TitanCFOptions::Dump(Logger* logger) const {
ROCKS_LOG_HEADER(logger,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment