Unverified Commit 4dc4ba89 authored by Xinye Tao's avatar Xinye Tao Committed by GitHub

[PCP-21] Titan GC doesn’t affect online write (#121)

* Green Blob GC by merge operator
Signed-off-by: 's avatartabokie <xy.tao@outlook.com>
parent ffaa9d1a
...@@ -93,6 +93,7 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release")) ...@@ -93,6 +93,7 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
blob_format_test blob_format_test
blob_gc_job_test blob_gc_job_test
blob_gc_picker_test blob_gc_picker_test
blob_index_merge_operator_test
gc_stats_test gc_stats_test
table_builder_test table_builder_test
thread_safety_test thread_safety_test
......
...@@ -155,6 +155,16 @@ struct TitanCFOptions : public ColumnFamilyOptions { ...@@ -155,6 +155,16 @@ struct TitanCFOptions : public ColumnFamilyOptions {
// Default: 20 // Default: 20
int max_sorted_runs{20}; int max_sorted_runs{20};
// If set true, Titan will rewrite valid blob index from GC output as merge
// operands back to data store.
//
// With this feature enabled, Titan background GC won't block online write,
// trade-off being read performance slightly reduced compared to normal
// rewrite mode.
//
// Default: false
bool gc_merge_rewrite{false};
TitanCFOptions() = default; TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options) explicit TitanCFOptions(const ColumnFamilyOptions& options)
: ColumnFamilyOptions(options) {} : ColumnFamilyOptions(options) {}
...@@ -209,9 +219,11 @@ struct MutableTitanCFOptions { ...@@ -209,9 +219,11 @@ struct MutableTitanCFOptions {
MutableTitanCFOptions() : MutableTitanCFOptions(TitanCFOptions()) {} MutableTitanCFOptions() : MutableTitanCFOptions(TitanCFOptions()) {}
explicit MutableTitanCFOptions(const TitanCFOptions& opts) explicit MutableTitanCFOptions(const TitanCFOptions& opts)
: blob_run_mode(opts.blob_run_mode) {} : blob_run_mode(opts.blob_run_mode),
gc_merge_rewrite(opts.gc_merge_rewrite) {}
TitanBlobRunMode blob_run_mode; TitanBlobRunMode blob_run_mode;
bool gc_merge_rewrite;
}; };
struct TitanOptions : public TitanDBOptions, public TitanCFOptions { struct TitanOptions : public TitanDBOptions, public TitanCFOptions {
......
...@@ -46,15 +46,24 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */, ...@@ -46,15 +46,24 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */,
const Slice& value, EntryType type, const Slice& value, EntryType type,
SequenceNumber /* seq */, SequenceNumber /* seq */,
uint64_t /* file_size */) { uint64_t /* file_size */) {
if (type != kEntryBlobIndex) { if (type != kEntryBlobIndex && type != kEntryMerge) {
return Status::OK(); return Status::OK();
} }
BlobIndex index; Status s;
auto s = index.DecodeFrom(const_cast<Slice*>(&value)); MergeBlobIndex index;
if (type == kEntryMerge) {
s = index.DecodeFrom(const_cast<Slice*>(&value));
} else {
s = index.DecodeFromBase(const_cast<Slice*>(&value));
}
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (BlobIndex::IsDeletionMarker(index)) {
return Status::OK();
}
auto iter = blob_files_size_.find(index.file_number); auto iter = blob_files_size_.find(index.file_number);
if (iter == blob_files_size_.end()) { if (iter == blob_files_size_.end()) {
......
...@@ -85,6 +85,8 @@ TEST_F(BlobFileSizeCollectorTest, Basic) { ...@@ -85,6 +85,8 @@ TEST_F(BlobFileSizeCollectorTest, Basic) {
std::unique_ptr<TableBuilder> table_builder; std::unique_ptr<TableBuilder> table_builder;
NewTableBuilder(wfile.get(), &table_builder); NewTableBuilder(wfile.get(), &table_builder);
constexpr uint64_t kFirstFileNumber = 1ULL;
constexpr uint64_t kSecondFileNumber = 2ULL;
const int kNumEntries = 100; const int kNumEntries = 100;
char buf[16]; char buf[16];
for (int i = 0; i < kNumEntries; i++) { for (int i = 0; i < kNumEntries; i++) {
...@@ -97,9 +99,9 @@ TEST_F(BlobFileSizeCollectorTest, Basic) { ...@@ -97,9 +99,9 @@ TEST_F(BlobFileSizeCollectorTest, Basic) {
BlobIndex index; BlobIndex index;
if (i % 2 == 0) { if (i % 2 == 0) {
index.file_number = 0ULL; index.file_number = kFirstFileNumber;
} else { } else {
index.file_number = 1ULL; index.file_number = kSecondFileNumber;
} }
index.blob_handle.size = 10; index.blob_handle.size = 10;
std::string value; std::string value;
...@@ -130,8 +132,8 @@ TEST_F(BlobFileSizeCollectorTest, Basic) { ...@@ -130,8 +132,8 @@ TEST_F(BlobFileSizeCollectorTest, Basic) {
ASSERT_EQ(2, result.size()); ASSERT_EQ(2, result.size());
ASSERT_EQ(kNumEntries / 2 * 10, result[0]); ASSERT_EQ(kNumEntries / 2 * 10, result[kFirstFileNumber]);
ASSERT_EQ(kNumEntries / 2 * 10, result[1]); ASSERT_EQ(kNumEntries / 2 * 10, result[kSecondFileNumber]);
} }
} // namespace titandb } // namespace titandb
......
...@@ -125,9 +125,51 @@ Status BlobIndex::DecodeFrom(Slice* src) { ...@@ -125,9 +125,51 @@ Status BlobIndex::DecodeFrom(Slice* src) {
return s; return s;
} }
bool operator==(const BlobIndex& lhs, const BlobIndex& rhs) { void BlobIndex::EncodeDeletionMarkerTo(std::string* dst) {
return (lhs.file_number == rhs.file_number && dst->push_back(kBlobRecord);
lhs.blob_handle == rhs.blob_handle); PutVarint64(dst, 0);
BlobHandle dummy;
dummy.EncodeTo(dst);
}
bool BlobIndex::IsDeletionMarker(const BlobIndex& index) {
return index.file_number == 0;
}
bool BlobIndex::operator==(const BlobIndex& rhs) const {
return (file_number == rhs.file_number && blob_handle == rhs.blob_handle);
}
void MergeBlobIndex::EncodeTo(std::string* dst) const {
BlobIndex::EncodeTo(dst);
PutVarint64(dst, source_file_number);
PutVarint64(dst, source_file_offset);
}
void MergeBlobIndex::EncodeToBase(std::string* dst) const {
BlobIndex::EncodeTo(dst);
}
Status MergeBlobIndex::DecodeFrom(Slice* src) {
Status s = BlobIndex::DecodeFrom(src);
if (!s.ok()) {
return s;
}
if (!GetVarint64(src, &source_file_number) ||
!GetVarint64(src, &source_file_offset)) {
return Status::Corruption("MergeBlobIndex");
}
return s;
}
Status MergeBlobIndex::DecodeFromBase(Slice* src) {
return BlobIndex::DecodeFrom(src);
}
bool MergeBlobIndex::operator==(const MergeBlobIndex& rhs) const {
return (source_file_number == rhs.source_file_number &&
source_file_offset == rhs.source_file_offset &&
BlobIndex::operator==(rhs));
} }
void BlobFileMeta::EncodeTo(std::string* dst) const { void BlobFileMeta::EncodeTo(std::string* dst) const {
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "table/format.h" #include "table/format.h"
#include "util.h" #include "util.h"
...@@ -139,10 +140,26 @@ struct BlobIndex { ...@@ -139,10 +140,26 @@ struct BlobIndex {
uint64_t file_number{0}; uint64_t file_number{0};
BlobHandle blob_handle; BlobHandle blob_handle;
virtual ~BlobIndex() {}
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
static void EncodeDeletionMarkerTo(std::string* dst);
static bool IsDeletionMarker(const BlobIndex& index);
bool operator==(const BlobIndex& rhs) const;
};
struct MergeBlobIndex : public BlobIndex {
uint64_t source_file_number{0};
uint64_t source_file_offset{0};
void EncodeTo(std::string* dst) const; void EncodeTo(std::string* dst) const;
void EncodeToBase(std::string* dst) const;
Status DecodeFrom(Slice* src); Status DecodeFrom(Slice* src);
Status DecodeFromBase(Slice* src);
friend bool operator==(const BlobIndex& lhs, const BlobIndex& rhs); bool operator==(const MergeBlobIndex& rhs) const;
}; };
// Format of blob file meta (not fixed size): // Format of blob file meta (not fixed size):
......
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
#include "blob_gc_job.h" #include "blob_gc_job.h"
#include "blob_file_size_collector.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -73,7 +75,8 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback { ...@@ -73,7 +75,8 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
}; };
BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
const TitanDBOptions& titan_db_options, Env* env, const TitanDBOptions& titan_db_options,
bool gc_merge_rewrite, Env* env,
const EnvOptions& env_options, const EnvOptions& env_options,
BlobFileManager* blob_file_manager, BlobFileManager* blob_file_manager,
BlobFileSet* blob_file_set, LogBuffer* log_buffer, BlobFileSet* blob_file_set, LogBuffer* log_buffer,
...@@ -83,6 +86,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, ...@@ -83,6 +86,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
base_db_impl_(reinterpret_cast<DBImpl*>(base_db_)), base_db_impl_(reinterpret_cast<DBImpl*>(base_db_)),
mutex_(mutex), mutex_(mutex),
db_options_(titan_db_options), db_options_(titan_db_options),
gc_merge_rewrite_(gc_merge_rewrite),
env_(env), env_(env),
env_options_(env_options), env_options_(env_options),
blob_file_manager_(blob_file_manager), blob_file_manager_(blob_file_manager),
...@@ -227,21 +231,32 @@ Status BlobGCJob::DoRunGC() { ...@@ -227,21 +231,32 @@ Status BlobGCJob::DoRunGC() {
// blob index's size is counted in `RewriteValidKeyToLSM` // blob index's size is counted in `RewriteValidKeyToLSM`
metrics_.bytes_written += blob_record.size(); metrics_.bytes_written += blob_record.size();
BlobIndex new_blob_index; MergeBlobIndex new_blob_index;
new_blob_index.file_number = blob_file_handle->GetNumber(); new_blob_index.file_number = blob_file_handle->GetNumber();
new_blob_index.source_file_number = blob_index.file_number;
new_blob_index.source_file_offset = blob_index.blob_handle.offset;
blob_file_builder->Add(blob_record, &new_blob_index.blob_handle); blob_file_builder->Add(blob_record, &new_blob_index.blob_handle);
std::string index_entry; std::string index_entry;
new_blob_index.EncodeTo(&index_entry);
if (!gc_merge_rewrite_) {
// Store WriteBatch for rewriting new Key-Index pairs to LSM new_blob_index.EncodeToBase(&index_entry);
GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(), // Store WriteBatch for rewriting new Key-Index pairs to LSM
std::move(blob_index)); GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(),
callback.value = index_entry; std::move(blob_index));
rewrite_batches_.emplace_back( callback.value = index_entry;
std::make_pair(WriteBatch(), std::move(callback))); rewrite_batches_.emplace_back(
auto& wb = rewrite_batches_.back().first; std::make_pair(WriteBatch(), std::move(callback)));
s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), blob_record.key, auto& wb = rewrite_batches_.back().first;
index_entry); s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), blob_record.key,
index_entry);
} else {
new_blob_index.EncodeTo(&index_entry);
rewrite_batches_without_callback_.emplace_back(
std::make_pair(WriteBatch(), blob_index.blob_handle.size));
auto& wb = rewrite_batches_without_callback_.back().first;
s = WriteBatchInternal::Merge(&wb, cfh->GetID(), blob_record.key,
index_entry);
}
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
...@@ -433,32 +448,58 @@ Status BlobGCJob::RewriteValidKeyToLSM() { ...@@ -433,32 +448,58 @@ Status BlobGCJob::RewriteValidKeyToLSM() {
WriteOptions wo; WriteOptions wo;
wo.low_pri = true; wo.low_pri = true;
wo.ignore_missing_column_families = true; wo.ignore_missing_column_families = true;
for (auto& write_batch : rewrite_batches_) { if (!gc_merge_rewrite_) {
if (blob_gc_->GetColumnFamilyData()->IsDropped()) { for (auto& write_batch : rewrite_batches_) {
s = Status::Aborted("Column family drop"); if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
break; s = Status::Aborted("Column family drop");
} break;
if (IsShutingDown()) { }
s = Status::ShutdownInProgress(); if (IsShutingDown()) {
break; s = Status::ShutdownInProgress();
break;
}
s = db_impl->WriteWithCallback(wo, &write_batch.first,
&write_batch.second);
if (s.ok()) {
// count written bytes for new blob index.
metrics_.bytes_written += write_batch.first.GetDataSize();
metrics_.gc_num_keys_relocated++;
metrics_.gc_bytes_relocated += write_batch.second.blob_record_size();
// Key is successfully written to LSM.
} else if (s.IsBusy()) {
metrics_.gc_num_keys_overwritten++;
metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size();
// The key is overwritten in the meanwhile. Drop the blob record.
} else {
// We hit an error.
break;
}
// count read bytes in write callback
metrics_.bytes_read += write_batch.second.read_bytes();
} }
s = db_impl->WriteWithCallback(wo, &write_batch.first, &write_batch.second); } else {
if (s.ok()) { for (auto& write_batch : rewrite_batches_without_callback_) {
// count written bytes for new blob index. if (blob_gc_->GetColumnFamilyData()->IsDropped()) {
metrics_.bytes_written += write_batch.first.GetDataSize(); s = Status::Aborted("Column family drop");
metrics_.gc_num_keys_relocated++; break;
metrics_.gc_bytes_relocated += write_batch.second.blob_record_size(); }
// Key is successfully written to LSM. if (IsShutingDown()) {
} else if (s.IsBusy()) { s = Status::ShutdownInProgress();
metrics_.gc_num_keys_overwritten++; break;
metrics_.gc_bytes_overwritten += write_batch.second.blob_record_size(); }
// The key is overwritten in the meanwhile. Drop the blob record. s = db_impl->Write(wo, &write_batch.first);
} else { if (s.ok()) {
// We hit an error. // count written bytes for new blob index.
break; metrics_.bytes_written += write_batch.first.GetDataSize();
metrics_.gc_num_keys_relocated++;
metrics_.gc_bytes_relocated += write_batch.second;
// Key is successfully written to LSM.
} else {
// We hit an error.
break;
}
// read bytes is 0
} }
// count read bytes in write callback
metrics_.bytes_read += write_batch.second.read_bytes();
} }
if (s.IsBusy()) { if (s.IsBusy()) {
s = Status::OK(); s = Status::OK();
......
...@@ -17,15 +17,16 @@ namespace titandb { ...@@ -17,15 +17,16 @@ namespace titandb {
class BlobGCJob { class BlobGCJob {
public: public:
BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, BlobGCJob(BlobGC *blob_gc, DB *db, port::Mutex *mutex,
const TitanDBOptions& titan_db_options, Env* env, const TitanDBOptions &titan_db_options, bool gc_merge_rewrite,
const EnvOptions& env_options, BlobFileManager* blob_file_manager, Env *env, const EnvOptions &env_options,
BlobFileSet* blob_file_set, LogBuffer* log_buffer, BlobFileManager *blob_file_manager, BlobFileSet *blob_file_set,
std::atomic_bool* shuting_down, TitanStats* stats); LogBuffer *log_buffer, std::atomic_bool *shuting_down,
TitanStats *stats);
// No copying allowed // No copying allowed
BlobGCJob(const BlobGCJob&) = delete; BlobGCJob(const BlobGCJob &) = delete;
void operator=(const BlobGCJob&) = delete; void operator=(const BlobGCJob &) = delete;
~BlobGCJob(); ~BlobGCJob();
...@@ -42,26 +43,29 @@ class BlobGCJob { ...@@ -42,26 +43,29 @@ class BlobGCJob {
void UpdateInternalOpStats(); void UpdateInternalOpStats();
BlobGC* blob_gc_; BlobGC *blob_gc_;
DB* base_db_; DB *base_db_;
DBImpl* base_db_impl_; DBImpl *base_db_impl_;
port::Mutex* mutex_; port::Mutex *mutex_;
TitanDBOptions db_options_; TitanDBOptions db_options_;
Env* env_; const bool gc_merge_rewrite_;
Env *env_;
EnvOptions env_options_; EnvOptions env_options_;
BlobFileManager* blob_file_manager_; BlobFileManager *blob_file_manager_;
BlobFileSet* blob_file_set_; BlobFileSet *blob_file_set_;
LogBuffer* log_buffer_{nullptr}; LogBuffer *log_buffer_{nullptr};
std::vector<std::pair<std::unique_ptr<BlobFileHandle>, std::vector<std::pair<std::unique_ptr<BlobFileHandle>,
std::unique_ptr<BlobFileBuilder>>> std::unique_ptr<BlobFileBuilder>>>
blob_file_builders_; blob_file_builders_;
std::vector<std::pair<WriteBatch, GarbageCollectionWriteCallback>> std::vector<std::pair<WriteBatch, GarbageCollectionWriteCallback>>
rewrite_batches_; rewrite_batches_;
std::vector<std::pair<WriteBatch, uint64_t /*blob_record_size*/>>
rewrite_batches_without_callback_;
std::atomic_bool* shuting_down_{nullptr}; std::atomic_bool *shuting_down_{nullptr};
TitanStats* stats_; TitanStats *stats_;
struct { struct {
uint64_t bytes_read = 0; uint64_t bytes_read = 0;
......
...@@ -22,7 +22,7 @@ std::string GenValue(int i) { ...@@ -22,7 +22,7 @@ std::string GenValue(int i) {
return buffer; return buffer;
} }
class BlobGCJobTest : public testing::Test { class BlobGCJobTest : public testing::TestWithParam<bool /*gc_merge_mode*/> {
public: public:
std::string dbname_; std::string dbname_;
TitanDB* db_; TitanDB* db_;
...@@ -41,6 +41,7 @@ class BlobGCJobTest : public testing::Test { ...@@ -41,6 +41,7 @@ class BlobGCJobTest : public testing::Test {
options_.env->CreateDirIfMissing(dbname_); options_.env->CreateDirIfMissing(dbname_);
options_.env->CreateDirIfMissing(options_.dirname); options_.env->CreateDirIfMissing(options_.dirname);
} }
~BlobGCJobTest() { Close(); } ~BlobGCJobTest() { Close(); }
void DisableMergeSmall() { options_.merge_small_file_threshold = 0; } void DisableMergeSmall() { options_.merge_small_file_threshold = 0; }
...@@ -157,7 +158,7 @@ class BlobGCJobTest : public testing::Test { ...@@ -157,7 +158,7 @@ class BlobGCJobTest : public testing::Test {
blob_gc->SetColumnFamily(cfh); blob_gc->SetColumnFamily(cfh);
BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_, BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_,
tdb_->env_, EnvOptions(options_), GetParam(), tdb_->env_, EnvOptions(options_),
tdb_->blob_manager_.get(), blob_file_set_, tdb_->blob_manager_.get(), blob_file_set_,
&log_buffer, nullptr, nullptr); &log_buffer, nullptr, nullptr);
...@@ -216,8 +217,8 @@ class BlobGCJobTest : public testing::Test { ...@@ -216,8 +217,8 @@ class BlobGCJobTest : public testing::Test {
BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/); BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh); blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Env::Default(), EnvOptions(), nullptr, blob_file_set_, GetParam(), Env::Default(), EnvOptions(), nullptr,
nullptr, nullptr, nullptr); blob_file_set_, nullptr, nullptr, nullptr);
bool discardable = false; bool discardable = false;
ASSERT_OK(blob_gc_job.DiscardEntry(key, blob_index, &discardable)); ASSERT_OK(blob_gc_job.DiscardEntry(key, blob_index, &discardable));
ASSERT_FALSE(discardable); ASSERT_FALSE(discardable);
...@@ -280,11 +281,11 @@ class BlobGCJobTest : public testing::Test { ...@@ -280,11 +281,11 @@ class BlobGCJobTest : public testing::Test {
} }
}; };
TEST_F(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); } TEST_P(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); }
TEST_F(BlobGCJobTest, RunGC) { TestRunGC(); } TEST_P(BlobGCJobTest, RunGC) { TestRunGC(); }
TEST_F(BlobGCJobTest, GCLimiter) { TEST_P(BlobGCJobTest, GCLimiter) {
class TestLimiter : public RateLimiter { class TestLimiter : public RateLimiter {
public: public:
TestLimiter(RateLimiter::Mode mode) TestLimiter(RateLimiter::Mode mode)
...@@ -376,7 +377,7 @@ TEST_F(BlobGCJobTest, GCLimiter) { ...@@ -376,7 +377,7 @@ TEST_F(BlobGCJobTest, GCLimiter) {
Close(); Close();
} }
TEST_F(BlobGCJobTest, Reopen) { TEST_P(BlobGCJobTest, Reopen) {
DisableMergeSmall(); DisableMergeSmall();
NewDB(); NewDB();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
...@@ -403,7 +404,7 @@ TEST_F(BlobGCJobTest, Reopen) { ...@@ -403,7 +404,7 @@ TEST_F(BlobGCJobTest, Reopen) {
// Tests blob file will be kept after GC, if it is still visible by active // Tests blob file will be kept after GC, if it is still visible by active
// snapshots. // snapshots.
TEST_F(BlobGCJobTest, PurgeBlobs) { TEST_P(BlobGCJobTest, PurgeBlobs) {
NewDB(); NewDB();
auto snap1 = db_->GetSnapshot(); auto snap1 = db_->GetSnapshot();
...@@ -458,7 +459,7 @@ TEST_F(BlobGCJobTest, PurgeBlobs) { ...@@ -458,7 +459,7 @@ TEST_F(BlobGCJobTest, PurgeBlobs) {
CheckBlobNumber(1); CheckBlobNumber(1);
} }
TEST_F(BlobGCJobTest, DeleteFilesInRange) { TEST_P(BlobGCJobTest, DeleteFilesInRange) {
NewDB(); NewDB();
ASSERT_OK(db_->Put(WriteOptions(), GenKey(2), GenValue(21))); ASSERT_OK(db_->Put(WriteOptions(), GenKey(2), GenValue(21)));
...@@ -549,7 +550,7 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) { ...@@ -549,7 +550,7 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
delete iter; delete iter;
} }
TEST_F(BlobGCJobTest, LevelMergeGC) { TEST_P(BlobGCJobTest, LevelMergeGC) {
options_.level_merge = true; options_.level_merge = true;
options_.level_compaction_dynamic_level_bytes = true; options_.level_compaction_dynamic_level_bytes = true;
options_.blob_file_discardable_ratio = 0.5; options_.blob_file_discardable_ratio = 0.5;
...@@ -600,7 +601,7 @@ TEST_F(BlobGCJobTest, LevelMergeGC) { ...@@ -600,7 +601,7 @@ TEST_F(BlobGCJobTest, LevelMergeGC) {
BlobFileMeta::FileState::kNormal); BlobFileMeta::FileState::kNormal);
} }
TEST_F(BlobGCJobTest, RangeMergeScheduler) { TEST_P(BlobGCJobTest, RangeMergeScheduler) {
NewDB(); NewDB();
auto init_files = auto init_files =
[&](std::vector<std::vector<std::pair<std::string, std::string>>> [&](std::vector<std::vector<std::pair<std::string, std::string>>>
...@@ -771,7 +772,7 @@ TEST_F(BlobGCJobTest, RangeMergeScheduler) { ...@@ -771,7 +772,7 @@ TEST_F(BlobGCJobTest, RangeMergeScheduler) {
} }
} }
TEST_F(BlobGCJobTest, RangeMerge) { TEST_P(BlobGCJobTest, RangeMerge) {
options_.level_merge = true; options_.level_merge = true;
options_.level_compaction_dynamic_level_bytes = true; options_.level_compaction_dynamic_level_bytes = true;
options_.blob_file_discardable_ratio = 0.5; options_.blob_file_discardable_ratio = 0.5;
...@@ -821,6 +822,9 @@ TEST_F(BlobGCJobTest, RangeMerge) { ...@@ -821,6 +822,9 @@ TEST_F(BlobGCJobTest, RangeMerge) {
ASSERT_EQ(file->file_state(), BlobFileMeta::FileState::kObsolete); ASSERT_EQ(file->file_state(), BlobFileMeta::FileState::kObsolete);
} }
} }
INSTANTIATE_TEST_CASE_P(BlobGCJobTestParameterized, BlobGCJobTest,
::testing::Values(false, true));
} // namespace titandb } // namespace titandb
} // namespace rocksdb } // namespace rocksdb
......
#pragma once
#include "rocksdb/merge_operator.h"
#include "blob_file_set.h"
namespace rocksdb {
namespace titandb {
class BlobIndexMergeOperator : public MergeOperator {
public:
BlobIndexMergeOperator() = default;
// FullMergeV2 merges one base value with multiple merge operands and
// preserves latest value w.r.t. timestamp of original *put*. Each merge
// is the output of blob GC, and contains meta data including *src-file-no*
// and *src-file-offset*.
// Merge operation follows such rules:
// *. basic rule (keep base value): [Y][Z] ... [X](Y)(Z) => [X]
// a. same put (keep merge value): [Y] ... [X](Y)(X')(X") => [X"]
// we identify this case by checking *src-location* of merges against
// *blob-handle* of base.
// b. deletion (keep deletion marker): [delete](X)(Y) => [deletion marker]
// this is a workaround since vanilla rocksdb disallow empty result from
// merge.
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override {
Status s;
if (merge_in.existing_value && merge_in.value_type == kValue) {
merge_out->new_type = kValue;
merge_out->existing_operand = *merge_in.existing_value;
return true;
}
BlobIndex existing_index;
bool existing_index_valid = false;
if (merge_in.existing_value) {
assert(merge_in.value_type == kBlobIndex);
Slice copy = *merge_in.existing_value;
s = existing_index.DecodeFrom(&copy);
if (!s.ok()) {
return false;
}
existing_index_valid = !BlobIndex::IsDeletionMarker(existing_index);
}
if (!existing_index_valid) {
// this key must be deleted
merge_out->new_type = kBlobIndex;
merge_out->new_value.clear();
BlobIndex::EncodeDeletionMarkerTo(&merge_out->new_value);
return true;
}
MergeBlobIndex index;
BlobIndex merge_index;
for (auto operand : merge_in.operand_list) {
s = index.DecodeFrom(&operand);
if (!s.ok()) {
return false;
}
if (existing_index_valid) {
if (index.source_file_number == existing_index.file_number &&
index.source_file_offset == existing_index.blob_handle.offset) {
existing_index_valid = false;
merge_index = index;
}
} else if (index.source_file_number == merge_index.file_number &&
index.source_file_offset == merge_index.blob_handle.offset) {
merge_index = index;
}
}
merge_out->new_type = kBlobIndex;
if (existing_index_valid) {
merge_out->existing_operand = *merge_in.existing_value;
} else {
merge_out->new_value.clear();
merge_index.EncodeTo(&merge_out->new_value);
}
return true;
}
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const override {
return false;
}
const char* Name() const override { return "BlobGCOperator"; }
};
} // namespace titandb
} // namespace rocksdb
#include "test_util/testharness.h"
#include "blob_index_merge_operator.h"
namespace rocksdb {
namespace titandb {
std::string GenKey(int i) {
char buffer[32];
snprintf(buffer, sizeof(buffer), "k-%08d", i);
return buffer;
}
std::string GenValue(int i) {
char buffer[32];
snprintf(buffer, sizeof(buffer), "v-%08d", i);
return buffer;
}
BlobIndex GenBlobIndex(uint32_t i, uint32_t j = 0) {
BlobIndex index;
index.file_number = i;
index.blob_handle.offset = j;
index.blob_handle.size = 10;
return index;
}
MergeBlobIndex GenMergeBlobIndex(BlobIndex src, uint32_t i, uint32_t j = 0) {
MergeBlobIndex index;
index.file_number = i;
index.blob_handle.offset = j;
index.blob_handle.size = 10;
index.source_file_number = src.file_number;
index.source_file_offset = src.blob_handle.offset;
return index;
}
ValueType ToValueType(MergeOperator::MergeValueType value_type) {
switch (value_type) {
case MergeOperator::kDeletion:
return kTypeDeletion;
case MergeOperator::kValue:
return kTypeValue;
case MergeOperator::kBlobIndex:
return kTypeBlobIndex;
default:
return kTypeValue;
}
}
MergeOperator::MergeValueType ToMergeValueType(ValueType value_type) {
switch (value_type) {
case kTypeDeletion:
case kTypeSingleDeletion:
case kTypeRangeDeletion:
return MergeOperator::kDeletion;
case kTypeValue:
return MergeOperator::kValue;
case kTypeBlobIndex:
return MergeOperator::kBlobIndex;
default:
return MergeOperator::kValue;
}
}
class BlobIndexMergeOperatorTest : public testing::Test {
public:
std::string key_;
ValueType value_type_{kTypeDeletion};
std::string value_;
std::vector<std::string> operands_;
std::shared_ptr<BlobIndexMergeOperator> merge_operator_;
BlobIndexMergeOperatorTest()
: key_("k"),
merge_operator_(std::make_shared<BlobIndexMergeOperator>()) {}
void Put(std::string value, ValueType type = kTypeValue) {
value_ = value;
value_type_ = type;
operands_.clear();
}
void Put(BlobIndex blob_index) {
value_.clear();
blob_index.EncodeTo(&value_);
value_type_ = kTypeBlobIndex;
operands_.clear();
}
void Merge(MergeBlobIndex blob_index) {
std::string tmp;
blob_index.EncodeTo(&tmp);
operands_.emplace_back(tmp);
}
void Read(ValueType expect_type, std::string expect_value) {
std::string tmp_result_string;
Slice tmp_result_operand(nullptr, 0);
MergeOperator::MergeValueType merge_type = ToMergeValueType(value_type_);
Slice value = value_;
std::vector<Slice> operands;
for (auto& op : operands_) {
operands.emplace_back(op);
}
const MergeOperator::MergeOperationInput merge_in(
key_, merge_type,
merge_type == MergeOperator::kDeletion ? nullptr : &value, operands,
nullptr);
MergeOperator::MergeOperationOutput merge_out(tmp_result_string,
tmp_result_operand);
ASSERT_EQ(true, merge_operator_->FullMergeV2(merge_in, &merge_out));
ASSERT_EQ(true, merge_out.new_type != MergeOperator::kDeletion);
if (merge_out.new_type == merge_type) {
ASSERT_EQ(expect_type, value_type_);
} else {
ASSERT_EQ(expect_type, ToValueType(merge_out.new_type));
}
if (tmp_result_operand.data()) {
ASSERT_EQ(expect_value, tmp_result_operand);
} else {
ASSERT_EQ(expect_value, tmp_result_string);
}
}
void Clear() {
value_type_ = kTypeDeletion;
value_.clear();
operands_.clear();
}
};
TEST_F(BlobIndexMergeOperatorTest, KeepBaseValue) {
// [1] [2] (1->3)
Put(GenBlobIndex(2));
Merge(GenMergeBlobIndex(GenBlobIndex(1), 3));
std::string value;
GenBlobIndex(2).EncodeTo(&value);
Read(kTypeBlobIndex, value);
// [v] (1->2)
Clear();
Put(GenValue(1));
Merge(GenMergeBlobIndex(GenBlobIndex(1), 2));
Read(kTypeValue, GenValue(1));
}
TEST_F(BlobIndexMergeOperatorTest, KeepLatestMerge) {
// [1] (1->2) (3->4) (2->5)
Put(GenBlobIndex(1));
Merge(GenMergeBlobIndex(GenBlobIndex(1), 2));
Merge(GenMergeBlobIndex(GenBlobIndex(3), 4));
Merge(GenMergeBlobIndex(GenBlobIndex(2), 5));
std::string value;
GenBlobIndex(5).EncodeTo(&value);
Read(kTypeBlobIndex, value);
}
TEST_F(BlobIndexMergeOperatorTest, Delete) {
// [delete] (0->1)
Merge(GenMergeBlobIndex(GenBlobIndex(0), 1));
std::string value;
BlobIndex::EncodeDeletionMarkerTo(&value);
Read(kTypeBlobIndex, value);
// [deletion marker] (0->1)
Clear();
Put(value, kTypeBlobIndex);
Merge(GenMergeBlobIndex(GenBlobIndex(0), 1));
Read(kTypeBlobIndex, value);
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
...@@ -144,6 +144,7 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options, ...@@ -144,6 +144,7 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
stats_.reset(new TitanStats(db_options_.statistics.get())); stats_.reset(new TitanStats(db_options_.statistics.get()));
} }
blob_manager_.reset(new FileManager(this)); blob_manager_.reset(new FileManager(this));
shared_merge_operator_ = std::make_shared<BlobIndexMergeOperator>();
} }
TitanDBImpl::~TitanDBImpl() { Close(); } TitanDBImpl::~TitanDBImpl() { Close(); }
...@@ -264,6 +265,7 @@ Status TitanDBImpl::OpenImpl(const std::vector<TitanCFDescriptor>& descs, ...@@ -264,6 +265,7 @@ Status TitanDBImpl::OpenImpl(const std::vector<TitanCFDescriptor>& descs,
db_options_, desc.options, this, blob_manager_, &mutex_, db_options_, desc.options, this, blob_manager_, &mutex_,
blob_file_set_.get(), stats_.get())); blob_file_set_.get(), stats_.get()));
cf_opts.table_factory = titan_table_factories.back(); cf_opts.table_factory = titan_table_factories.back();
cf_opts.merge_operator = shared_merge_operator_;
} }
// Initialize GC thread pool. // Initialize GC thread pool.
if (!db_options_.disable_background_gc && db_options_.max_background_gc > 0) { if (!db_options_.disable_background_gc && db_options_.max_background_gc > 0) {
...@@ -397,6 +399,7 @@ Status TitanDBImpl::CreateColumnFamilies( ...@@ -397,6 +399,7 @@ Status TitanDBImpl::CreateColumnFamilies(
options.table_factory = titan_table_factory.back(); options.table_factory = titan_table_factory.back();
options.table_properties_collector_factories.emplace_back( options.table_properties_collector_factories.emplace_back(
std::make_shared<BlobFileSizeCollectorFactory>()); std::make_shared<BlobFileSizeCollectorFactory>());
options.merge_operator = shared_merge_operator_;
base_descs.emplace_back(desc.name, options); base_descs.emplace_back(desc.name, options);
} }
...@@ -601,6 +604,9 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, ...@@ -601,6 +604,9 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
s = index.DecodeFrom(value); s = index.DecodeFrom(value);
assert(s.ok()); assert(s.ok());
if (!s.ok()) return s; if (!s.ok()) return s;
if (BlobIndex::IsDeletionMarker(index)) {
return Status::NotFound("encounter deletion marker");
}
BlobRecord record; BlobRecord record;
PinnableSlice buffer; PinnableSlice buffer;
...@@ -955,22 +961,40 @@ Status TitanDBImpl::SetOptions( ...@@ -955,22 +961,40 @@ Status TitanDBImpl::SetOptions(
const std::unordered_map<std::string, std::string>& new_options) { const std::unordered_map<std::string, std::string>& new_options) {
Status s; Status s;
auto opts = new_options; auto opts = new_options;
auto p = opts.find("blob_run_mode"); bool set_blob_run_mode = false;
bool set_blob_run_mode = (p != opts.end()); TitanBlobRunMode blob_run_mode = TitanBlobRunMode::kNormal;
TitanBlobRunMode mode = TitanBlobRunMode::kNormal; bool set_gc_merge_rewrite = false;
if (set_blob_run_mode) { bool gc_merge_rewrite = false;
const std::string& blob_run_mode_string = p->second; {
auto pm = blob_run_mode_string_map.find(blob_run_mode_string); auto p = opts.find("blob_run_mode");
if (pm == blob_run_mode_string_map.end()) { set_blob_run_mode = (p != opts.end());
return Status::InvalidArgument("No blob_run_mode defined for " + if (set_blob_run_mode) {
blob_run_mode_string); const std::string& blob_run_mode_string = p->second;
} else { auto pm = blob_run_mode_string_map.find(blob_run_mode_string);
mode = pm->second; if (pm == blob_run_mode_string_map.end()) {
ROCKS_LOG_INFO(db_options_.info_log, "[%s] Set blob_run_mode: %s", return Status::InvalidArgument("No blob_run_mode defined for " +
column_family->GetName().c_str(), blob_run_mode_string);
blob_run_mode_string.c_str()); } else {
blob_run_mode = pm->second;
ROCKS_LOG_INFO(db_options_.info_log, "[%s] Set blob_run_mode: %s",
column_family->GetName().c_str(),
blob_run_mode_string.c_str());
}
opts.erase(p);
}
}
{
auto p = opts.find("gc_merge_rewrite");
set_gc_merge_rewrite = (p != opts.end());
if (set_gc_merge_rewrite) {
try {
gc_merge_rewrite = ParseBoolean("", p->second);
} catch (std::exception& e) {
return Status::InvalidArgument("Error parsing " + p->second + ":" +
std::string(e.what()));
}
opts.erase(p);
} }
opts.erase(p);
} }
if (opts.size() > 0) { if (opts.size() > 0) {
s = db_->SetOptions(column_family, opts); s = db_->SetOptions(column_family, opts);
...@@ -979,14 +1003,19 @@ Status TitanDBImpl::SetOptions( ...@@ -979,14 +1003,19 @@ Status TitanDBImpl::SetOptions(
} }
} }
// Make sure base db's SetOptions success before setting blob_run_mode. // Make sure base db's SetOptions success before setting blob_run_mode.
if (set_blob_run_mode) { if (set_blob_run_mode || set_gc_merge_rewrite) {
uint32_t cf_id = column_family->GetID(); uint32_t cf_id = column_family->GetID();
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
assert(cf_info_.count(cf_id) > 0); assert(cf_info_.count(cf_id) > 0);
TitanColumnFamilyInfo& cf_info = cf_info_[cf_id]; TitanColumnFamilyInfo& cf_info = cf_info_[cf_id];
cf_info.titan_table_factory->SetBlobRunMode(mode); if (set_blob_run_mode) {
cf_info.mutable_cf_options.blob_run_mode = mode; cf_info.titan_table_factory->SetBlobRunMode(blob_run_mode);
cf_info.mutable_cf_options.blob_run_mode = blob_run_mode;
}
if (set_gc_merge_rewrite) {
cf_info.mutable_cf_options.gc_merge_rewrite = gc_merge_rewrite;
}
} }
} }
return Status::OK(); return Status::OK();
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "blob_file_set.h" #include "blob_file_set.h"
#include "blob_index_merge_operator.h"
#include "table_factory.h" #include "table_factory.h"
#include "titan/db.h" #include "titan/db.h"
#include "titan_stats.h" #include "titan_stats.h"
...@@ -269,6 +270,7 @@ class TitanDBImpl : public TitanDB { ...@@ -269,6 +270,7 @@ class TitanDBImpl : public TitanDB {
DBImpl* db_impl_; DBImpl* db_impl_;
TitanDBOptions db_options_; TitanDBOptions db_options_;
std::unique_ptr<Directory> directory_; std::unique_ptr<Directory> directory_;
std::shared_ptr<BlobIndexMergeOperator> shared_merge_operator_;
std::atomic<bool> initialized_{false}; std::atomic<bool> initialized_{false};
......
...@@ -164,6 +164,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, ...@@ -164,6 +164,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS); StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc; std::unique_ptr<BlobGC> blob_gc;
bool gc_merge_rewrite;
std::unique_ptr<ColumnFamilyHandle> cfh; std::unique_ptr<ColumnFamilyHandle> cfh;
Status s; Status s;
...@@ -187,6 +188,8 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, ...@@ -187,6 +188,8 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID()); assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get()); blob_gc->SetColumnFamily(cfh.get());
gc_merge_rewrite =
cf_info_[column_family_id].mutable_cf_options.gc_merge_rewrite;
} }
} }
...@@ -196,10 +199,10 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, ...@@ -196,10 +199,10 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
// Nothing to do // Nothing to do
ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do"); ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do");
} else { } else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_,
env_options_, blob_manager_.get(), gc_merge_rewrite, env_, env_options_,
blob_file_set_.get(), log_buffer, &shuting_down_, blob_manager_.get(), blob_file_set_.get(), log_buffer,
stats_.get()); &shuting_down_, stats_.get());
s = blob_gc_job.Prepare(); s = blob_gc_job.Prepare();
if (s.ok()) { if (s.ok()) {
mutex_.Unlock(); mutex_.Unlock();
......
...@@ -47,7 +47,7 @@ class TitanDBIterator : public Iterator { ...@@ -47,7 +47,7 @@ class TitanDBIterator : public Iterator {
iter_->SeekToFirst(); iter_->SeekToFirst();
if (ShouldGetBlobValue()) { if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue(true);
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
} }
} }
...@@ -56,7 +56,7 @@ class TitanDBIterator : public Iterator { ...@@ -56,7 +56,7 @@ class TitanDBIterator : public Iterator {
iter_->SeekToLast(); iter_->SeekToLast();
if (ShouldGetBlobValue()) { if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue(false);
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
} }
} }
...@@ -65,7 +65,7 @@ class TitanDBIterator : public Iterator { ...@@ -65,7 +65,7 @@ class TitanDBIterator : public Iterator {
iter_->Seek(target); iter_->Seek(target);
if (ShouldGetBlobValue()) { if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue(true);
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
} }
} }
...@@ -74,7 +74,7 @@ class TitanDBIterator : public Iterator { ...@@ -74,7 +74,7 @@ class TitanDBIterator : public Iterator {
iter_->SeekForPrev(target); iter_->SeekForPrev(target);
if (ShouldGetBlobValue()) { if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue(false);
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
} }
} }
...@@ -84,7 +84,7 @@ class TitanDBIterator : public Iterator { ...@@ -84,7 +84,7 @@ class TitanDBIterator : public Iterator {
iter_->Next(); iter_->Next();
if (ShouldGetBlobValue()) { if (ShouldGetBlobValue()) {
StopWatch next_sw(env_, stats_, BLOB_DB_NEXT_MICROS); StopWatch next_sw(env_, stats_, BLOB_DB_NEXT_MICROS);
GetBlobValue(); GetBlobValue(true);
RecordTick(stats_, BLOB_DB_NUM_NEXT); RecordTick(stats_, BLOB_DB_NUM_NEXT);
} }
} }
...@@ -94,7 +94,7 @@ class TitanDBIterator : public Iterator { ...@@ -94,7 +94,7 @@ class TitanDBIterator : public Iterator {
iter_->Prev(); iter_->Prev();
if (ShouldGetBlobValue()) { if (ShouldGetBlobValue()) {
StopWatch prev_sw(env_, stats_, BLOB_DB_PREV_MICROS); StopWatch prev_sw(env_, stats_, BLOB_DB_PREV_MICROS);
GetBlobValue(); GetBlobValue(false);
RecordTick(stats_, BLOB_DB_NUM_PREV); RecordTick(stats_, BLOB_DB_NUM_PREV);
} }
} }
...@@ -120,7 +120,7 @@ class TitanDBIterator : public Iterator { ...@@ -120,7 +120,7 @@ class TitanDBIterator : public Iterator {
return true; return true;
} }
void GetBlobValue() { void GetBlobValue(bool forward) {
assert(iter_->status().ok()); assert(iter_->status().ok());
BlobIndex index; BlobIndex index;
...@@ -132,7 +132,30 @@ class TitanDBIterator : public Iterator { ...@@ -132,7 +132,30 @@ class TitanDBIterator : public Iterator {
status_.ToString().c_str()); status_.ToString().c_str());
return; return;
} }
while (BlobIndex::IsDeletionMarker(index)) {
// skip deletion marker
if (forward) {
iter_->Next();
} else {
iter_->Prev();
}
if (!ShouldGetBlobValue()) {
return;
} else {
status_ = DecodeInto(iter_->value(), &index);
if (!status_.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Titan iterator: failed to decode blob index %s: %s",
iter_->value().ToString(true /*hex*/).c_str(),
status_.ToString().c_str());
return;
}
}
}
GetBlobValueImpl(index);
}
void GetBlobValueImpl(const BlobIndex& index) {
auto it = files_.find(index.file_number); auto it = files_.find(index.file_number);
if (it == files_.end()) { if (it == files_.end()) {
std::unique_ptr<BlobFilePrefetcher> prefetcher; std::unique_ptr<BlobFilePrefetcher> prefetcher;
......
...@@ -42,7 +42,8 @@ TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts, ...@@ -42,7 +42,8 @@ TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts,
blob_file_discardable_ratio(immutable_opts.blob_file_discardable_ratio), blob_file_discardable_ratio(immutable_opts.blob_file_discardable_ratio),
sample_file_size_ratio(immutable_opts.sample_file_size_ratio), sample_file_size_ratio(immutable_opts.sample_file_size_ratio),
merge_small_file_threshold(immutable_opts.merge_small_file_threshold), merge_small_file_threshold(immutable_opts.merge_small_file_threshold),
blob_run_mode(mutable_opts.blob_run_mode) {} blob_run_mode(mutable_opts.blob_run_mode),
gc_merge_rewrite(mutable_opts.gc_merge_rewrite) {}
void TitanCFOptions::Dump(Logger* logger) const { void TitanCFOptions::Dump(Logger* logger) const {
ROCKS_LOG_HEADER(logger, ROCKS_LOG_HEADER(logger,
......
#pragma once #pragma once
#include "rocksdb/types.h"
#include "blob_file_builder.h" #include "blob_file_builder.h"
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "blob_file_set.h" #include "blob_file_set.h"
......
#include <inttypes.h> #include <inttypes.h>
#include <options/cf_options.h> #include <options/cf_options.h>
#include <initializer_list>
#include <map> #include <map>
#include <vector> #include <vector>
...@@ -15,22 +16,6 @@ ...@@ -15,22 +16,6 @@
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
struct ThreadParam {
ThreadParam() = default;
ThreadParam(uint32_t _concurrency, uint32_t _repeat, bool _sync)
: concurrency(_concurrency), repeat(_repeat), sync(_sync) {}
ThreadParam(const ThreadParam& rhs)
: concurrency(rhs.concurrency), repeat(rhs.repeat), sync(rhs.sync) {}
uint32_t concurrency{4};
uint32_t repeat{10};
// when set true, faster worker will run extra jobs util slowest
// worker finishes to maximize race condition.
bool sync{true};
};
void DeleteDir(Env* env, const std::string& dirname) { void DeleteDir(Env* env, const std::string& dirname) {
std::vector<std::string> filenames; std::vector<std::string> filenames;
env->GetChildren(dirname, &filenames); env->GetChildren(dirname, &filenames);
...@@ -83,6 +68,12 @@ class TitanThreadSafetyTest : public testing::Test { ...@@ -83,6 +68,12 @@ class TitanThreadSafetyTest : public testing::Test {
ASSERT_OK(db_->Put(wopts, handle, key, value)); ASSERT_OK(db_->Put(wopts, handle, key, value));
} }
void DeleteCF(ColumnFamilyHandle* handle, uint64_t k) {
WriteOptions wopts;
std::string key = GenKey(k);
ASSERT_OK(db_->Delete(wopts, handle, key));
}
void VerifyCF(ColumnFamilyHandle* handle, void VerifyCF(ColumnFamilyHandle* handle,
const std::map<std::string, std::string>& data, const std::map<std::string, std::string>& data,
ReadOptions ropts = ReadOptions()) { ReadOptions ropts = ReadOptions()) {
...@@ -105,6 +96,25 @@ class TitanThreadSafetyTest : public testing::Test { ...@@ -105,6 +96,25 @@ class TitanThreadSafetyTest : public testing::Test {
delete iterator; delete iterator;
} }
void ReadCF(ColumnFamilyHandle* handle,
const std::map<std::string, std::string>& data,
ReadOptions ropts = ReadOptions()) {
db_impl_->PurgeObsoleteFiles();
for (auto& kv : data) {
std::string value;
auto s = db_->Get(ropts, handle, kv.first, &value);
ASSERT_TRUE(s.ok() || s.IsNotFound());
}
Iterator* iterator = db_->NewIterator(ropts, handle);
iterator->SeekToFirst();
while (iterator->Valid()) {
iterator->Next();
}
delete iterator;
}
std::string GenKey(uint64_t k) { std::string GenKey(uint64_t k) {
char buf[64]; char buf[64];
snprintf(buf, sizeof(buf), "k-%08" PRIu64, k); snprintf(buf, sizeof(buf), "k-%08" PRIu64, k);
...@@ -119,6 +129,56 @@ class TitanThreadSafetyTest : public testing::Test { ...@@ -119,6 +129,56 @@ class TitanThreadSafetyTest : public testing::Test {
} }
} }
void RunJobs(
std::initializer_list<std::function<void(ColumnFamilyHandle*)>> jobs) {
Open();
std::vector<port::Thread> threads;
std::map<std::string, ColumnFamilyHandle*> handles;
std::map<std::string, uint32_t> ref_count;
uint32_t job_count = jobs.size();
unfinished_worker_.store(job_count * param_.num_column_family,
std::memory_order_relaxed);
for (uint32_t col = 0; col < param_.num_column_family; col++) {
std::string name = std::to_string(col);
TitanCFDescriptor desc(name, options_);
ColumnFamilyHandle* handle = nullptr;
ASSERT_OK(db_->CreateColumnFamily(desc, &handle));
{
MutexLock l(&mutex_);
handles[name] = handle;
ref_count[name] = job_count;
}
for (auto& job : jobs) {
threads.emplace_back([&, handle, name] {
for (uint32_t k = 0; k < param_.repeat; k++) {
job(handle);
}
unfinished_worker_.fetch_sub(1, std::memory_order_relaxed);
if (param_.sync) {
while (unfinished_worker_.load(std::memory_order_relaxed) != 0) {
job(handle);
}
}
bool need_drop = false;
{
MutexLock l(&mutex_);
if ((--ref_count[name]) == 0) {
ref_count.erase(name);
handles.erase(name);
need_drop = true;
}
}
if (need_drop) {
ASSERT_OK(db_->DropColumnFamily(handle));
db_->DestroyColumnFamilyHandle(handle);
}
});
}
}
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&port::Thread::join));
}
port::Mutex mutex_; port::Mutex mutex_;
Env* env_{Env::Default()}; Env* env_{Env::Default()};
std::string dbname_; std::string dbname_;
...@@ -126,85 +186,95 @@ class TitanThreadSafetyTest : public testing::Test { ...@@ -126,85 +186,95 @@ class TitanThreadSafetyTest : public testing::Test {
TitanDB* db_{nullptr}; TitanDB* db_{nullptr};
TitanDBImpl* db_impl_{nullptr}; TitanDBImpl* db_impl_{nullptr};
ThreadParam param_; struct TestParam {
TestParam() = default;
uint32_t num_column_family{4};
uint32_t repeat{10};
// when set true, faster worker will run extra jobs util slowest
// worker finishes to maximize race condition.
bool sync{true};
} param_;
std::atomic<uint32_t> unfinished_worker_; std::atomic<uint32_t> unfinished_worker_;
}; };
TEST_F(TitanThreadSafetyTest, Basic) { TEST_F(TitanThreadSafetyTest, Insert) {
Open();
const uint64_t kNumEntries = 100; const uint64_t kNumEntries = 100;
std::vector<port::Thread> threads;
std::map<std::string, ColumnFamilyHandle*> handles;
std::map<std::string, uint32_t> ref_count;
std::map<std::string, std::string> data; std::map<std::string, std::string> data;
for (uint64_t i = 1; i <= kNumEntries; i++) { for (uint64_t i = 1; i <= kNumEntries; i++) {
PutMap(data, i); PutMap(data, i);
} }
ASSERT_EQ(kNumEntries, data.size()); ASSERT_EQ(kNumEntries, data.size());
std::vector<std::function<void(ColumnFamilyHandle*)>> jobs = { RunJobs({// Write and Flush and Verify
// Write and Flush [&](ColumnFamilyHandle* handle) {
[&](ColumnFamilyHandle* handle) { ASSERT_TRUE(handle != nullptr);
ASSERT_TRUE(handle != nullptr); for (uint64_t i = 1; i <= kNumEntries; i++) {
for (uint64_t i = 1; i <= kNumEntries; i++) { PutCF(handle, i);
PutCF(handle, i); }
} FlushOptions fopts;
FlushOptions fopts; ASSERT_OK(db_->Flush(fopts, handle));
ASSERT_OK(db_->Flush(fopts, handle)); VerifyCF(handle, data);
VerifyCF(handle, data); },
}, [&](ColumnFamilyHandle* handle) {
// Compact ASSERT_TRUE(handle != nullptr);
[&](ColumnFamilyHandle* handle) { ReadCF(handle, data);
ASSERT_TRUE(handle != nullptr); },
CompactRangeOptions copts; // Compact
ASSERT_OK(db_->CompactRange(copts, handle, nullptr, nullptr)); [&](ColumnFamilyHandle* handle) {
}, ASSERT_TRUE(handle != nullptr);
// GC CompactRangeOptions copts;
[&](ColumnFamilyHandle* handle) { ASSERT_OK(db_->CompactRange(copts, handle, nullptr, nullptr));
ASSERT_TRUE(handle != nullptr); },
GC(handle); // GC
}}; [&](ColumnFamilyHandle* handle) {
uint32_t job_count = jobs.size(); ASSERT_TRUE(handle != nullptr);
unfinished_worker_.store(job_count * param_.concurrency, GC(handle);
std::memory_order_relaxed); }});
for (uint32_t col = 0; col < param_.concurrency; col++) { }
std::string name = std::to_string(col);
TitanCFDescriptor desc(name, options_); TEST_F(TitanThreadSafetyTest, InsertAndDelete) {
ColumnFamilyHandle* handle = nullptr; const uint64_t kNumEntries = 100;
ASSERT_OK(db_->CreateColumnFamily(desc, &handle)); std::map<std::string, std::string> data;
{ for (uint64_t i = 1; i <= kNumEntries; i++) {
MutexLock l(&mutex_); PutMap(data, i);
handles[name] = handle;
ref_count[name] = job_count;
}
for (uint32_t job = 0; job < job_count; job++) {
threads.emplace_back([&, job, handle, name] {
for (uint32_t k = 0; k < param_.repeat; k++) {
jobs[job](handle);
}
unfinished_worker_.fetch_sub(1, std::memory_order_relaxed);
if (param_.sync) {
while (unfinished_worker_.load(std::memory_order_relaxed) != 0) {
jobs[job](handle);
}
}
bool need_drop = false;
{
MutexLock l(&mutex_);
if ((--ref_count[name]) == 0) {
ref_count.erase(name);
handles.erase(name);
need_drop = true;
}
}
if (need_drop) {
ASSERT_OK(db_->DropColumnFamily(handle));
db_->DestroyColumnFamilyHandle(handle);
}
});
}
} }
std::for_each(threads.begin(), threads.end(), ASSERT_EQ(kNumEntries, data.size());
std::mem_fn(&port::Thread::join)); RunJobs({// Insert and Flush
[&](ColumnFamilyHandle* handle) {
ASSERT_TRUE(handle != nullptr);
for (uint64_t i = 1; i <= kNumEntries; i++) {
PutCF(handle, i);
}
FlushOptions fopts;
ASSERT_OK(db_->Flush(fopts, handle));
},
// Delete and Flush
[&](ColumnFamilyHandle* handle) {
ASSERT_TRUE(handle != nullptr);
for (uint64_t i = 1; i <= kNumEntries; i++) {
DeleteCF(handle, i);
}
FlushOptions fopts;
ASSERT_OK(db_->Flush(fopts, handle));
},
// Read
[&](ColumnFamilyHandle* handle) {
ASSERT_TRUE(handle != nullptr);
ReadCF(handle, data);
},
// Compact
[&](ColumnFamilyHandle* handle) {
ASSERT_TRUE(handle != nullptr);
CompactRangeOptions copts;
ASSERT_OK(db_->CompactRange(copts, handle, nullptr, nullptr));
},
// GC
[&](ColumnFamilyHandle* handle) {
ASSERT_TRUE(handle != nullptr);
GC(handle);
}});
} }
} // namespace titandb } // namespace titandb
......
...@@ -881,13 +881,24 @@ TEST_F(TitanDBTest, SetOptions) { ...@@ -881,13 +881,24 @@ TEST_F(TitanDBTest, SetOptions) {
std::unordered_map<std::string, std::string> opts; std::unordered_map<std::string, std::string> opts;
// Set titan options. // Set titan blob_run_mode.
opts["blob_run_mode"] = "kReadOnly"; opts["blob_run_mode"] = "kReadOnly";
ASSERT_OK(db_->SetOptions(opts)); ASSERT_OK(db_->SetOptions(opts));
titan_options = db_->GetTitanOptions(); titan_options = db_->GetTitanOptions();
ASSERT_EQ(TitanBlobRunMode::kReadOnly, titan_options.blob_run_mode); ASSERT_EQ(TitanBlobRunMode::kReadOnly, titan_options.blob_run_mode);
opts.clear(); opts.clear();
// Set titan gc_merge_rewrite.
opts["gc_merge_rewrite"] = "true";
ASSERT_OK(db_->SetOptions(opts));
titan_options = db_->GetTitanOptions();
ASSERT_EQ(true, titan_options.gc_merge_rewrite);
opts["gc_merge_rewrite"] = "0";
ASSERT_OK(db_->SetOptions(opts));
titan_options = db_->GetTitanOptions();
ASSERT_EQ(false, titan_options.gc_merge_rewrite);
opts.clear();
// Set column family options. // Set column family options.
opts["disable_auto_compactions"] = "true"; opts["disable_auto_compactions"] = "true";
ASSERT_OK(db_->SetOptions(opts)); ASSERT_OK(db_->SetOptions(opts));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment