Commit 9d2b70ef authored by yiwu-arbug's avatar yiwu-arbug Committed by Connor

Cherry-pick recent changes to tikv-3.0 branch (#89)

cherry-pick following patches
```
ff30897d 2019-09-23 zbk602423539@gmail.. reset gc mark after sampling (#88)
cb67cd56 2019-09-22 zbk602423539@gmail.. fix wrong property after restart (#82)
c6e408c2 2019-09-20 yiwu@pingcap.com     Fix blob file format description (#83)
fc4106d4 2019-09-18 yiwu@pingcap.com     Fix new CF missing BlobFileSizeCollector (#78)
10710bb1 2019-09-18 zbk602423539@gmail.. Add more metrics (#79)
8768067e 2019-09-17 yiwu@pingcap.com     Prevent CF being dropped while GC is running (#72)
07aa0655 2019-09-16 zbk602423539@gmail.. Rename version_set to blob_file_set (#69)
f963b880 2019-09-16 zbk602423539@gmail.. support histogram for titan stats (#74)
144e20f0 2019-09-12 wujy.cs@gmail.com    gc breakdown metrics (#73)
715dbd69 2019-09-11 zbk602423539@gmail.. Update BlobFileMeta format (#68)
468ddc97 2019-09-03 yiwu@pingcap.com     Add internal operation stats and dump to info log periodically (#62)
```
parent 19ea115a
......@@ -96,6 +96,7 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
table_builder_test
thread_safety_test
titan_db_test
titan_options_test
util_test
version_test)
set(TEST_LIBS
......
......@@ -158,6 +158,21 @@ class TitanDB : public StackableDB {
// "rocksdb.titandb.obsolete-blob-file-size" - returns size of obsolete
// blob files.
static const std::string kObsoleteBlobFileSize;
// "rocksdb.titandb.discardable_ratio_le0_file_num" - returns count of
// files whose discardable ratio is less than or equal to 0%.
static const std::string kNumDiscardableRatioLE0File;
// "rocksdb.titandb.discardable_ratio_le20_file_num" - returns count of
// files whose discardable ratio is less than or equal to 20%.
static const std::string kNumDiscardableRatioLE20File;
// "rocksdb.titandb.discardable_ratio_le50_file_num" - returns count of
// files whose discardable ratio is less than or equal to 50%.
static const std::string kNumDiscardableRatioLE50File;
// "rocksdb.titandb.discardable_ratio_le80_file_num" - returns count of
// files whose discardable ratio is less than or equal to 80%.
static const std::string kNumDiscardableRatioLE80File;
// "rocksdb.titandb.discardable_ratio_le100_file_num" - returns count of
// files whose discardable ratio is less than or equal to 100%.
static const std::string kNumDiscardableRatioLE100File;
};
bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property,
......
......@@ -29,7 +29,13 @@ struct TitanDBOptions : public DBOptions {
// How often (in seconds) to schedule deletion of obsolete blob files.
//
// Default: 10
uint32_t purge_obsolete_files_period{10}; // 10s
uint32_t purge_obsolete_files_period_sec{10}; // 10s
// If non-zero, dump titan internal stats to info log every
// titan_stats_dump_period_sec.
//
// Default: 600 (10 min)
uint32_t titan_stats_dump_period_sec{600};
TitanDBOptions() = default;
explicit TitanDBOptions(const DBOptions& options) : DBOptions(options) {}
......
......@@ -25,6 +25,15 @@ void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) {
status_ = file_->Append(encoder_.GetHeader());
if (ok()) {
status_ = file_->Append(encoder_.GetRecord());
// The keys added into blob files are in order.
if (smallest_key_.empty()) {
smallest_key_.assign(record.key.data(), record.key.size());
}
assert(cf_options_.comparator->Compare(record.key, Slice(smallest_key_)) >=
0);
assert(cf_options_.comparator->Compare(record.key, Slice(largest_key_)) >=
0);
largest_key_.assign(record.key.data(), record.key.size());
}
}
......
......@@ -54,6 +54,9 @@ class BlobFileBuilder {
// REQUIRES: Finish(), Abandon() have not been called.
void Abandon();
const std::string& GetSmallestKey() { return smallest_key_; }
const std::string& GetLargestKey() { return largest_key_; }
private:
bool ok() const { return status().ok(); }
......@@ -62,6 +65,9 @@ class BlobFileBuilder {
Status status_;
BlobEncoder encoder_;
std::string smallest_key_;
std::string largest_key_;
};
} // namespace titandb
......
......@@ -144,8 +144,10 @@ void BlobFileIterator::PrefetchAndGet() {
}
BlobFileMergeIterator::BlobFileMergeIterator(
std::vector<std::unique_ptr<BlobFileIterator>>&& blob_file_iterators)
: blob_file_iterators_(std::move(blob_file_iterators)) {}
std::vector<std::unique_ptr<BlobFileIterator>>&& blob_file_iterators,
const Comparator* comparator)
: blob_file_iterators_(std::move(blob_file_iterators)),
min_heap_(BlobFileIterComparator(comparator)) {}
bool BlobFileMergeIterator::Valid() const {
if (current_ == nullptr) return false;
......
......@@ -75,7 +75,7 @@ class BlobFileIterator {
class BlobFileMergeIterator {
public:
explicit BlobFileMergeIterator(
std::vector<std::unique_ptr<BlobFileIterator>>&&);
std::vector<std::unique_ptr<BlobFileIterator>>&&, const Comparator*);
~BlobFileMergeIterator() = default;
......@@ -93,19 +93,28 @@ class BlobFileMergeIterator {
BlobIndex GetBlobIndex() { return current_->GetBlobIndex(); }
private:
class IternalComparator {
class BlobFileIterComparator {
public:
// The default constructor is not supposed to be used.
// It exists only so that std::priority_queue can compile.
BlobFileIterComparator() : comparator_(nullptr){};
explicit BlobFileIterComparator(const Comparator* comparator)
: comparator_(comparator){};
// Smaller values get higher priority.
bool operator()(const BlobFileIterator* iter1,
const BlobFileIterator* iter2) {
return BytewiseComparator()->Compare(iter1->key(), iter2->key()) > 0;
assert(comparator_ != nullptr);
return comparator_->Compare(iter1->key(), iter2->key()) > 0;
}
private:
const Comparator* comparator_;
};
Status status_;
std::vector<std::unique_ptr<BlobFileIterator>> blob_file_iterators_;
std::priority_queue<BlobFileIterator*, std::vector<BlobFileIterator*>,
IternalComparator>
BlobFileIterComparator>
min_heap_;
BlobFileIterator* current_ = nullptr;
};
......
......@@ -93,8 +93,7 @@ class BlobFileIteratorTest : public testing::Test {
const int n = 1000;
std::vector<BlobHandle> handles(n);
for (int i = 0; i < n; i++) {
auto id = std::to_string(i);
AddKeyValue(id, id, &handles[i]);
AddKeyValue(GenKey(i), GenValue(i), &handles[i]);
}
FinishBuilder();
......@@ -105,9 +104,8 @@ class BlobFileIteratorTest : public testing::Test {
for (int i = 0; i < n; blob_file_iterator_->Next(), i++) {
ASSERT_OK(blob_file_iterator_->status());
ASSERT_EQ(blob_file_iterator_->Valid(), true);
auto id = std::to_string(i);
ASSERT_EQ(id, blob_file_iterator_->key());
ASSERT_EQ(id, blob_file_iterator_->value());
ASSERT_EQ(GenKey(i), blob_file_iterator_->key());
ASSERT_EQ(GenValue(i), blob_file_iterator_->value());
BlobIndex blob_index = blob_file_iterator_->GetBlobIndex();
ASSERT_EQ(handles[i], blob_index.blob_handle);
}
......@@ -124,8 +122,7 @@ TEST_F(BlobFileIteratorTest, IterateForPrev) {
const int n = 1000;
std::vector<BlobHandle> handles(n);
for (int i = 0; i < n; i++) {
auto id = std::to_string(i);
AddKeyValue(id, id, &handles[i]);
AddKeyValue(GenKey(i), GenValue(i), &handles[i]);
}
FinishBuilder();
......@@ -141,9 +138,8 @@ TEST_F(BlobFileIteratorTest, IterateForPrev) {
BlobIndex blob_index;
blob_index = blob_file_iterator_->GetBlobIndex();
ASSERT_EQ(handles[i], blob_index.blob_handle);
auto id = std::to_string(i);
ASSERT_EQ(id, blob_file_iterator_->key());
ASSERT_EQ(id, blob_file_iterator_->value());
ASSERT_EQ(GenKey(i), blob_file_iterator_->key());
ASSERT_EQ(GenValue(i), blob_file_iterator_->value());
}
auto idx = Random::GetTLSInstance()->Uniform(n);
......@@ -206,7 +202,7 @@ TEST_F(BlobFileIteratorTest, MergeIterator) {
&readable_file_);
iters.emplace_back(std::unique_ptr<BlobFileIterator>(new BlobFileIterator{
std::move(readable_file_), file_number_, file_size, TitanCFOptions()}));
BlobFileMergeIterator iter(std::move(iters));
BlobFileMergeIterator iter(std::move(iters), titan_options_.comparator);
iter.SeekToFirst();
int i = 1;
......
......@@ -109,15 +109,13 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
EncodeBlobCache(&cache_key, cache_prefix_, handle.offset);
cache_handle = cache_->Lookup(cache_key);
if (cache_handle) {
RecordTick(stats_, BLOCK_CACHE_DATA_HIT);
RecordTick(stats_, BLOCK_CACHE_HIT);
RecordTick(stats_, TitanStats::BLOB_CACHE_HIT);
auto blob = reinterpret_cast<OwnedSlice*>(cache_->Value(cache_handle));
buffer->PinSlice(*blob, UnrefCacheHandle, cache_.get(), cache_handle);
return DecodeInto(*blob, record);
}
}
RecordTick(stats_, BLOCK_CACHE_DATA_MISS);
RecordTick(stats_, BLOCK_CACHE_MISS);
RecordTick(stats_, TitanStats::BLOB_CACHE_MISS);
OwnedSlice blob;
Status s = ReadRecord(handle, record, &blob);
......
#include "version_set.h"
#include "blob_file_set.h"
#include <inttypes.h>
......@@ -10,7 +10,7 @@ namespace titandb {
const size_t kMaxFileCacheSize = 1024 * 1024;
VersionSet::VersionSet(const TitanDBOptions& options, TitanStats* stats)
BlobFileSet::BlobFileSet(const TitanDBOptions& options, TitanStats* stats)
: dirname_(options.dirname),
env_(options.env),
env_options_(options),
......@@ -23,7 +23,7 @@ VersionSet::VersionSet(const TitanDBOptions& options, TitanStats* stats)
file_cache_ = NewLRUCache(file_cache_size);
}
Status VersionSet::Open(
Status BlobFileSet::Open(
const std::map<uint32_t, TitanCFOptions>& column_families) {
// Sets up initial column families.
AddColumnFamilies(column_families);
......@@ -38,7 +38,7 @@ Status VersionSet::Open(
return OpenManifest(NewFileNumber());
}
Status VersionSet::Recover() {
Status BlobFileSet::Recover() {
struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t, const Status& s) override {
......@@ -95,6 +95,12 @@ Status VersionSet::Recover() {
"Next blob file number is %" PRIu64 ".", next_file_number);
}
// Make sure GC is performed on all files at startup.
MarkAllFilesForGC();
for (auto& cf : column_families_) {
cf.second->ComputeGCScore();
}
auto new_manifest_file_number = NewFileNumber();
s = OpenManifest(new_manifest_file_number);
if (!s.ok()) return s;
......@@ -138,13 +144,10 @@ Status VersionSet::Recover() {
env_->DeleteFile(dirname_ + "/" + f);
}
// Make sure perform gc on all files at the beginning
MarkAllFilesForGC();
return Status::OK();
}
Status VersionSet::OpenManifest(uint64_t file_number) {
Status BlobFileSet::OpenManifest(uint64_t file_number) {
Status s;
auto file_name = DescriptorFileName(dirname_, file_number);
......@@ -162,7 +165,7 @@ Status VersionSet::OpenManifest(uint64_t file_number) {
s = WriteSnapshot(manifest_.get());
if (s.ok()) {
ImmutableDBOptions ioptions(db_options_);
s = SyncManifest(env_, &ioptions, manifest_->file());
s = SyncTitanManifest(env_, stats_, &ioptions, manifest_->file());
}
if (s.ok()) {
// Makes "CURRENT" file that points to the new manifest file.
......@@ -176,7 +179,7 @@ Status VersionSet::OpenManifest(uint64_t file_number) {
return s;
}
Status VersionSet::WriteSnapshot(log::Writer* log) {
Status BlobFileSet::WriteSnapshot(log::Writer* log) {
Status s;
// Saves global information
{
......@@ -206,8 +209,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return s;
}
Status VersionSet::LogAndApply(VersionEdit& edit) {
TEST_SYNC_POINT("VersionSet::LogAndApply");
Status BlobFileSet::LogAndApply(VersionEdit& edit) {
TEST_SYNC_POINT("BlobFileSet::LogAndApply");
// TODO(@huachao): write manifest file unlocked
std::string record;
edit.SetNextFileNumber(next_file_number_.load());
......@@ -222,12 +225,12 @@ Status VersionSet::LogAndApply(VersionEdit& edit) {
if (!s.ok()) return s;
ImmutableDBOptions ioptions(db_options_);
s = SyncManifest(env_, &ioptions, manifest_->file());
s = SyncTitanManifest(env_, stats_, &ioptions, manifest_->file());
if (!s.ok()) return s;
return collector.Apply(*this);
}
void VersionSet::AddColumnFamilies(
void BlobFileSet::AddColumnFamilies(
const std::map<uint32_t, TitanCFOptions>& column_families) {
for (auto& cf : column_families) {
auto file_cache = std::make_shared<BlobFileCache>(db_options_, cf.second,
......@@ -238,7 +241,7 @@ void VersionSet::AddColumnFamilies(
}
}
Status VersionSet::DropColumnFamilies(
Status BlobFileSet::DropColumnFamilies(
const std::vector<uint32_t>& column_families,
SequenceNumber obsolete_sequence) {
Status s;
......@@ -248,10 +251,13 @@ Status VersionSet::DropColumnFamilies(
VersionEdit edit;
edit.SetColumnFamilyID(it->first);
for (auto& file : it->second->files_) {
ROCKS_LOG_INFO(db_options_.info_log, "Titan add obsolete file [%llu]",
if (!file.second->is_obsolete()) {
ROCKS_LOG_INFO(db_options_.info_log,
"Titan add obsolete file [%" PRIu64 "]",
file.second->file_number());
edit.DeleteBlobFile(file.first, obsolete_sequence);
}
}
s = LogAndApply(edit);
if (!s.ok()) return s;
} else {
......@@ -264,7 +270,7 @@ Status VersionSet::DropColumnFamilies(
return s;
}
Status VersionSet::DestroyColumnFamily(uint32_t cf_id) {
Status BlobFileSet::MaybeDestroyColumnFamily(uint32_t cf_id) {
obsolete_columns_.erase(cf_id);
auto it = column_families_.find(cf_id);
if (it != column_families_.end()) {
......@@ -279,7 +285,7 @@ Status VersionSet::DestroyColumnFamily(uint32_t cf_id) {
return Status::NotFound("invalid column family");
}
void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
void BlobFileSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) {
for (auto it = column_families_.begin(); it != column_families_.end();) {
auto& cf_id = it->first;
......
......@@ -20,9 +20,11 @@
namespace rocksdb {
namespace titandb {
class VersionSet {
// BlobFileSet is the set of all the blobs file generated by Titan.
// It records blob file meta in terms of column family.
class BlobFileSet {
public:
explicit VersionSet(const TitanDBOptions& options, TitanStats* stats);
explicit BlobFileSet(const TitanDBOptions& options, TitanStats* stats);
// Sets up the storage specified in "options.dirname".
// If the manifest doesn't exist, it will create one.
......@@ -31,8 +33,7 @@ class VersionSet {
// outside of the provided column families.
Status Open(const std::map<uint32_t, TitanCFOptions>& column_families);
// Applies *edit on the current version to form a new version that is
// both saved to the manifest and installed as the new current version.
// Applies *edit and saved to the manifest.
// REQUIRES: mutex is held
Status LogAndApply(VersionEdit& edit);
......@@ -50,7 +51,7 @@ class VersionSet {
// Destroy the column family. Only after this is called, the obsolete files
// of the dropped column family can be physical deleted.
// REQUIRES: mutex is held
Status DestroyColumnFamily(uint32_t cf_id);
Status MaybeDestroyColumnFamily(uint32_t cf_id);
// Allocates a new file number.
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
......@@ -75,6 +76,11 @@ class VersionSet {
}
}
// REQUIRES: mutex is held
bool IsColumnFamilyObsolete(uint32_t cf_id) {
return obsolete_columns_.count(cf_id) > 0;
}
private:
friend class BlobFileSizeCollectorTest;
friend class VersionTest;
......
#pragma once
#include "blob_file_set.h"
#include "db_impl.h"
#include "rocksdb/listener.h"
#include "rocksdb/table_properties.h"
#include "util/coding.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......
......@@ -4,6 +4,8 @@
#include "util/filename.h"
#include "util/testharness.h"
#include <cinttypes>
namespace rocksdb {
namespace titandb {
......@@ -18,6 +20,14 @@ class BlobFileTest : public testing::Test {
env_->DeleteDir(dirname_);
}
// Builds the deterministic test key for index `i`:
// "k-" followed by the index zero-padded to at least eight digits.
std::string GenKey(uint64_t i) {
  char key_buf[32];
  int written = snprintf(key_buf, sizeof(key_buf), "k-%08" PRIu64, i);
  return std::string(key_buf, written);
}
std::string GenValue(uint64_t i) { return std::string(1024, i); }
void TestBlobFilePrefetcher(TitanOptions options) {
options.dirname = dirname_;
TitanDBOptions db_options(options);
......@@ -38,8 +48,8 @@ class BlobFileTest : public testing::Test {
new BlobFileBuilder(db_options, cf_options, file.get()));
for (int i = 0; i < n; i++) {
auto key = std::to_string(i);
auto value = std::string(1024, i);
auto key = GenKey(i);
auto value = GenValue(i);
BlobRecord record;
record.key = key;
record.value = value;
......@@ -56,8 +66,8 @@ class BlobFileTest : public testing::Test {
std::unique_ptr<BlobFilePrefetcher> prefetcher;
ASSERT_OK(cache.NewPrefetcher(file_number_, file_size, &prefetcher));
for (int i = 0; i < n; i++) {
auto key = std::to_string(i);
auto value = std::string(1024, i);
auto key = GenKey(i);
auto value = GenValue(i);
BlobRecord expect;
expect.key = key;
expect.value = value;
......@@ -99,8 +109,8 @@ class BlobFileTest : public testing::Test {
new BlobFileBuilder(db_options, cf_options, file.get()));
for (int i = 0; i < n; i++) {
auto key = std::to_string(i);
auto value = std::string(1024, i);
auto key = GenKey(i);
auto value = GenValue(i);
BlobRecord record;
record.key = key;
record.value = value;
......@@ -122,8 +132,8 @@ class BlobFileTest : public testing::Test {
std::move(random_access_file_reader),
file_size, &blob_file_reader, nullptr));
for (int i = 0; i < n; i++) {
auto key = std::to_string(i);
auto value = std::string(1024, i);
auto key = GenKey(i);
auto value = GenValue(i);
BlobRecord expect;
expect.key = key;
expect.value = value;
......
......@@ -132,11 +132,36 @@ bool operator==(const BlobIndex& lhs, const BlobIndex& rhs) {
// Serializes this blob file meta into *dst in the current (non-legacy)
// manifest format: file number, file size, file entries, file level,
// then the length-prefixed smallest and largest keys.
// Field order and types must stay in sync with BlobFileMeta::DecodeFrom.
void BlobFileMeta::EncodeTo(std::string* dst) const {
PutVarint64(dst, file_number_);
PutVarint64(dst, file_size_);
PutVarint64(dst, file_entries_);
PutVarint32(dst, file_level_);
PutLengthPrefixedSlice(dst, smallest_key_);
PutLengthPrefixedSlice(dst, largest_key_);
}
Status BlobFileMeta::DecodeFrom(Slice* src) {
Status BlobFileMeta::DecodeFromLegacy(Slice* src) {
if (!GetVarint64(src, &file_number_) || !GetVarint64(src, &file_size_)) {
return Status::Corruption("BlobFileMeta Decode failed");
return Status::Corruption("BlobFileMeta decode legacy failed");
}
assert(smallest_key_.empty());
assert(largest_key_.empty());
return Status::OK();
}
// Deserializes a blob file meta written by EncodeTo (current format:
// file number, file size, file entries, file level, followed by the
// length-prefixed smallest and largest keys).
// Returns Status::Corruption if any field is missing or malformed.
Status BlobFileMeta::DecodeFrom(Slice* src) {
  if (!GetVarint64(src, &file_number_) || !GetVarint64(src, &file_size_) ||
      !GetVarint64(src, &file_entries_) || !GetVarint32(src, &file_level_)) {
    return Status::Corruption("BlobFileMeta decode failed");
  }
  // Early-return on each malformed key; error messages use a consistent
  // lowercase "decode failed" wording.
  Slice str;
  if (!GetLengthPrefixedSlice(src, &str)) {
    return Status::Corruption("BlobSmallestKey decode failed");
  }
  smallest_key_.assign(str.data(), str.size());
  if (!GetLengthPrefixedSlice(src, &str)) {
    return Status::Corruption("BlobLargestKey decode failed");
  }
  largest_key_.assign(str.data(), str.size());
  return Status::OK();
}
......@@ -155,7 +180,7 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) {
// normal state after flush completed.
assert(state_ == FileState::kPendingLSM ||
state_ == FileState::kPendingGC || state_ == FileState::kNormal ||
state_ == FileState::kBeingGC);
state_ == FileState::kBeingGC || state_ == FileState::kObsolete);
if (state_ == FileState::kPendingLSM) state_ = FileState::kNormal;
break;
case FileEvent::kGCCompleted:
......@@ -201,6 +226,27 @@ void BlobFileMeta::AddDiscardableSize(uint64_t _discardable_size) {
assert(discardable_size_ < file_size_);
}
// Maps this file's discardable ratio to the internal-stats bucket it
// belongs to (<=0%, <=20%, <=50%, <=80%, <=100%).
// Aborts the process if the ratio exceeds 1.0 beyond floating-point
// tolerance, since that indicates corrupted size accounting.
TitanInternalStats::StatsType BlobFileMeta::GetDiscardableRatioLevel() const {
  auto ratio = GetDiscardableRatio();
  TitanInternalStats::StatsType type;
  if (ratio == 0) {
    type = TitanInternalStats::NUM_DISCARDABLE_RATIO_LE0;
  } else if (ratio <= 0.2) {
    type = TitanInternalStats::NUM_DISCARDABLE_RATIO_LE20;
  } else if (ratio <= 0.5) {
    type = TitanInternalStats::NUM_DISCARDABLE_RATIO_LE50;
  } else if (ratio <= 0.8) {
    type = TitanInternalStats::NUM_DISCARDABLE_RATIO_LE80;
  } else if (ratio <= 1.0 ||
             (ratio - 1.0) < std::numeric_limits<double>::epsilon()) {
    // Tolerate tiny floating-point overshoot above 1.0.
    type = TitanInternalStats::NUM_DISCARDABLE_RATIO_LE100;
  } else {
    // Fixed typo in diagnostic ("discarable") and added trailing newline.
    fprintf(stderr, "invalid discardable ratio\n");
    abort();
  }
  return type;
}
double BlobFileMeta::GetDiscardableRatio() const {
return static_cast<double>(discardable_size_) /
static_cast<double>(file_size_);
......
......@@ -9,17 +9,33 @@
namespace rocksdb {
namespace titandb {
// Blob header format:
// Blob file overall format:
//
// [blob file header]
// [blob head + record 1]
// [blob head + record 2]
// ...
// [blob head + record N]
// [blob file footer]
// Format of blob head (9 bytes):
//
// +---------+---------+-------------+
// | crc | size | compression |
// +---------+---------+-------------+
// | Fixed32 | Fixed32 | char |
// +---------+---------+-------------+
//
// crc : fixed32
// size : fixed32
// compression : char
const uint64_t kBlobHeaderSize = 9;
// Blob record format:
// Format of blob record (not fixed size):
//
// +--------------------+----------------------+
// | key | value |
// +--------------------+----------------------+
// | Varint64 + key_len | Varint64 + value_len |
// +--------------------+----------------------+
//
// key : varint64 length + length bytes
// value : varint64 length + length bytes
struct BlobRecord {
Slice key;
Slice value;
......@@ -27,6 +43,8 @@ struct BlobRecord {
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
size_t size() const { return key.size() + value.size(); }
friend bool operator==(const BlobRecord& lhs, const BlobRecord& rhs);
};
......@@ -63,10 +81,14 @@ class BlobDecoder {
CompressionType compression_{kNoCompression};
};
// Blob handle format:
// Format of blob handle (not fixed size):
//
// +----------+----------+
// | offset | size |
// +----------+----------+
// | Varint64 | Varint64 |
// +----------+----------+
//
// offset : varint64
// size : varint64
struct BlobHandle {
uint64_t offset{0};
uint64_t size{0};
......@@ -77,11 +99,16 @@ struct BlobHandle {
friend bool operator==(const BlobHandle& lhs, const BlobHandle& rhs);
};
// Blob index format:
// Format of blob index (not fixed size):
//
// +------+-------------+------------------------------------+
// | type | file number | blob handle |
// +------+-------------+------------------------------------+
// | char | Varint64 | Varint64(offset) + Varint64(size)  |
// +------+-------------+------------------------------------+
//
// type : char
// file_number_ : varint64
// blob_handle : varint64 offset + varint64 size
// It is stored in LSM-Tree as the value of key, then Titan can use this blob
// index to locate actual value from blob file.
struct BlobIndex {
enum Type : unsigned char {
kBlobRecord = 1,
......@@ -95,10 +122,30 @@ struct BlobIndex {
friend bool operator==(const BlobIndex& lhs, const BlobIndex& rhs);
};
// Blob file meta format:
// Format of blob file meta (not fixed size):
//
// +-------------+-----------+--------------+------------+
// | file number | file size | file entries | file level |
// +-------------+-----------+--------------+------------+
// | Varint64 | Varint64 | Varint64 | Varint32 |
// +-------------+-----------+--------------+------------+
// +--------------------+--------------------+
// | smallest key | largest key |
// +--------------------+--------------------+
// | Varint32 + key_len | Varint32 + key_len |
// +--------------------+--------------------+
//
// The blob file meta is stored in Titan's manifest so that the in-memory
// metadata of all blob files can be constructed quickly on startup.
//
// Legacy format:
//
// +-------------+-----------+
// | file number | file size |
// +-------------+-----------+
// | Varint64 | Varint64 |
// +-------------+-----------+
//
// file_number_ : varint64
// file_size_ : varint64
class BlobFileMeta {
public:
enum class FileEvent {
......@@ -123,16 +170,30 @@ class BlobFileMeta {
};
BlobFileMeta() = default;
BlobFileMeta(uint64_t _file_number, uint64_t _file_size)
: file_number_(_file_number), file_size_(_file_size) {}
BlobFileMeta(uint64_t _file_number, uint64_t _file_size,
uint64_t _file_entries, uint32_t _file_level,
const std::string& _smallest_key,
const std::string& _largest_key)
: file_number_(_file_number),
file_size_(_file_size),
file_entries_(_file_entries),
file_level_(_file_level),
smallest_key_(_smallest_key),
largest_key_(_largest_key) {}
friend bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs);
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
Status DecodeFromLegacy(Slice* src);
uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; }
uint64_t file_entries() const { return file_entries_; }
uint32_t file_level() const { return file_level_; }
Slice smallest_key() const { return smallest_key_; }
Slice largest_key() const { return largest_key_; }
FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }
uint64_t discardable_size() const { return discardable_size_; }
......@@ -144,11 +205,19 @@ class BlobFileMeta {
void AddDiscardableSize(uint64_t _discardable_size);
double GetDiscardableRatio() const;
TitanInternalStats::StatsType GetDiscardableRatioLevel() const;
private:
// Persistent field
uint64_t file_number_{0};
uint64_t file_size_{0};
uint64_t file_entries_;
// Target level of compaction/flush which generates this blob file
uint32_t file_level_;
// Empty `smallest_key_` and `largest_key_` mean the key range is unknown;
// this can only happen when the file comes from a legacy version.
std::string smallest_key_;
std::string largest_key_;
// Not persistent field
FileState state_{FileState::kInit};
......@@ -159,12 +228,16 @@ class BlobFileMeta {
bool gc_mark_{false};
};
// Blob file header format.
// Format of blob file header (8 bytes):
//
// +--------------+---------+
// | magic number | version |
// +--------------+---------+
// | Fixed32 | Fixed32 |
// +--------------+---------+
//
// The header is meant to be compatible with the header of BlobDB blob files,
// except that we use a different magic number.
//
// magic_number : fixed32
// version : fixed32
struct BlobFileHeader {
// The first 32bits from $(echo titandb/blob | sha1sum).
static const uint32_t kHeaderMagicNumber = 0x2be0a614ul;
......@@ -177,12 +250,16 @@ struct BlobFileHeader {
Status DecodeFrom(Slice* src);
};
// Blob file footer format:
// Format of blob file footer (BlockHandle::kMaxEncodedLength + 12):
//
// +---------------------+-------------+--------------+----------+
// | meta index handle | padding | magic number | checksum |
// +---------------------+-------------+--------------+----------+
// | Varint64 + Varint64 | padding_len | Fixed64 | Fixed32 |
// +---------------------+-------------+--------------+----------+
//
// meta_index_handle : varint64 offset + varint64 size
// <padding> : [... kEncodedLength - 12] bytes
// magic_number : fixed64
// checksum : fixed32
// To make the blob file footer fixed size,
// the padding_len is `BlockHandle::kMaxEncodedLength - meta_handle_len`
struct BlobFileFooter {
// The first 64bits from $(echo titandb/blob | sha1sum).
static const uint64_t kFooterMagicNumber{0x2be0a6148e39edc6ull};
......
......@@ -34,7 +34,7 @@ TEST(BlobFormatTest, BlobIndex) {
}
TEST(BlobFormatTest, BlobFileMeta) {
BlobFileMeta input(2, 3);
BlobFileMeta input(2, 3, 0, 0, "0", "9");
CheckCodec(input);
}
......
......@@ -21,7 +21,6 @@ ColumnFamilyData* BlobGC::GetColumnFamilyData() {
}
void BlobGC::AddOutputFile(BlobFileMeta* blob_file) {
blob_file->FileStateTransit(BlobFileMeta::FileEvent::kGCOutput);
outputs_.push_back(blob_file);
}
......@@ -33,6 +32,7 @@ void BlobGC::MarkFilesBeingGC() {
void BlobGC::ReleaseGcFiles() {
for (auto& f : inputs_) {
f->set_gc_mark(false);
f->FileStateTransit(BlobFileMeta::FileEvent::kGCCompleted);
}
......
This diff is collapsed.
......@@ -3,6 +3,7 @@
#include "blob_file_builder.h"
#include "blob_file_iterator.h"
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "blob_gc.h"
#include "db/db_impl.h"
#include "rocksdb/statistics.h"
......@@ -10,7 +11,6 @@
#include "titan/options.h"
#include "titan_stats.h"
#include "version_edit.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -20,7 +20,7 @@ class BlobGCJob {
BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
const TitanDBOptions& titan_db_options, Env* env,
const EnvOptions& env_options, BlobFileManager* blob_file_manager,
VersionSet* version_set, LogBuffer* log_buffer,
BlobFileSet* blob_file_set, LogBuffer* log_buffer,
std::atomic_bool* shuting_down, TitanStats* stats);
// No copying allowed
......@@ -40,6 +40,8 @@ class BlobGCJob {
class GarbageCollectionWriteCallback;
friend class BlobGCJobTest;
void UpdateInternalOpStats();
BlobGC* blob_gc_;
DB* base_db_;
DBImpl* base_db_impl_;
......@@ -48,7 +50,7 @@ class BlobGCJob {
Env* env_;
EnvOptions env_options_;
BlobFileManager* blob_file_manager_;
VersionSet* version_set_;
BlobFileSet* blob_file_set_;
LogBuffer* log_buffer_{nullptr};
std::vector<std::pair<std::unique_ptr<BlobFileHandle>,
......@@ -62,16 +64,27 @@ class BlobGCJob {
TitanStats* stats_;
struct {
uint64_t blob_db_bytes_read = 0;
uint64_t blob_db_bytes_written = 0;
uint64_t blob_db_gc_num_keys_overwritten = 0;
uint64_t blob_db_gc_bytes_overwritten = 0;
uint64_t blob_db_gc_num_keys_relocated = 0;
uint64_t blob_db_gc_bytes_relocated = 0;
uint64_t blob_db_gc_num_new_files = 0;
uint64_t blob_db_gc_num_files = 0;
uint64_t bytes_read = 0;
uint64_t bytes_written = 0;
uint64_t gc_num_keys_overwritten = 0;
uint64_t gc_bytes_overwritten = 0;
uint64_t gc_num_keys_relocated = 0;
uint64_t gc_bytes_relocated = 0;
uint64_t gc_num_new_files = 0;
uint64_t gc_num_files = 0;
uint64_t gc_small_file = 0;
uint64_t gc_discardable = 0;
uint64_t gc_sample = 0;
uint64_t gc_sampling_micros = 0;
uint64_t gc_read_lsm_micros = 0;
uint64_t gc_update_lsm_micros = 0;
} metrics_;
uint64_t prev_bytes_read_ = 0;
uint64_t prev_bytes_written_ = 0;
uint64_t io_bytes_read_ = 0;
uint64_t io_bytes_written_ = 0;
Status SampleCandidateFiles();
Status DoSample(const BlobFileMeta* file, bool* selected);
Status DoRunGC();
......
......@@ -28,7 +28,7 @@ class BlobGCJobTest : public testing::Test {
TitanDB* db_;
DBImpl* base_db_;
TitanDBImpl* tdb_;
VersionSet* version_set_;
BlobFileSet* blob_file_set_;
TitanOptions options_;
port::Mutex* mutex_;
......@@ -43,9 +43,11 @@ class BlobGCJobTest : public testing::Test {
}
~BlobGCJobTest() {}
void DisableMergeSmall() { options_.merge_small_file_threshold = 0; }
std::weak_ptr<BlobStorage> GetBlobStorage(uint32_t cf_id) {
MutexLock l(mutex_);
return version_set_->GetBlobStorage(cf_id);
return blob_file_set_->GetBlobStorage(cf_id);
}
void CheckBlobNumber(int expected) {
......@@ -73,13 +75,22 @@ class BlobGCJobTest : public testing::Test {
// Creates a brand-new database for a test: wipes the test directory,
// then opens a fresh TitanDB instance on it.
void NewDB() {
ClearDir();
Open();
}
void Open() {
ASSERT_OK(TitanDB::Open(options_, dbname_, &db_));
tdb_ = reinterpret_cast<TitanDBImpl*>(db_);
version_set_ = tdb_->vset_.get();
blob_file_set_ = tdb_->blob_file_set_.get();
mutex_ = &tdb_->mutex_;
base_db_ = reinterpret_cast<DBImpl*>(tdb_->GetRootDB());
}
// Closes the current DB instance and opens a new one on the same dbname_.
// NOTE(review): DestroyDB() here appears to only close and release the
// handle (not remove on-disk state), so Reopen should exercise the
// recovery path — confirm against DestroyDB's definition.
void Reopen() {
DestroyDB();
Open();
}
void Flush() {
FlushOptions fopts;
fopts.wait = true;
......@@ -95,6 +106,11 @@ class BlobGCJobTest : public testing::Test {
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
}
// Recomputes the GC score of the default column family's blob storage.
// NOTE(review): the weak_ptr lock() result is not null-checked — assumes
// the blob storage is still alive when this helper is called.
void ReComputeGCScore() {
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
b->ComputeGCScore();
}
void DestroyDB() {
Status s __attribute__((__unused__)) = db_->Close();
assert(s.ok());
......@@ -102,7 +118,7 @@ class BlobGCJobTest : public testing::Test {
db_ = nullptr;
}
void RunGC(bool expected = false) {
void RunGC(bool expected, bool disable_merge_small = false) {
MutexLock l(mutex_);
Status s;
auto* cfh = base_db_->DefaultColumnFamily();
......@@ -112,27 +128,28 @@ class BlobGCJobTest : public testing::Test {
TitanCFOptions cf_options;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options.info_log.get());
cf_options.min_gc_batch_size = 0;
if (disable_merge_small) {
cf_options.merge_small_file_threshold = 0;
}
cf_options.blob_file_discardable_ratio = 0.4;
cf_options.sample_file_size_ratio = 1;
std::unique_ptr<BlobGC> blob_gc;
{
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options, cf_options);
std::make_shared<BasicBlobGCPicker>(db_options, cf_options, nullptr);
blob_gc = blob_gc_picker->PickBlobGC(
version_set_->GetBlobStorage(cfh->GetID()).lock().get());
blob_file_set_->GetBlobStorage(cfh->GetID()).lock().get());
}
if (expected) {
ASSERT_TRUE(blob_gc != nullptr);
}
ASSERT_TRUE((blob_gc != nullptr) == expected);
if (blob_gc) {
blob_gc->SetColumnFamily(cfh);
BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_,
tdb_->env_, EnvOptions(options_),
tdb_->blob_manager_.get(), version_set_,
tdb_->blob_manager_.get(), blob_file_set_,
&log_buffer, nullptr, nullptr);
s = blob_gc_job.Prepare();
......@@ -151,6 +168,7 @@ class BlobGCJobTest : public testing::Test {
s = blob_gc_job.Finish();
ASSERT_OK(s);
}
blob_gc->ReleaseGcFiles();
}
mutex_->Unlock();
......@@ -189,7 +207,7 @@ class BlobGCJobTest : public testing::Test {
BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Env::Default(), EnvOptions(), nullptr, version_set_,
Env::Default(), EnvOptions(), nullptr, blob_file_set_,
nullptr, nullptr, nullptr);
bool discardable = false;
ASSERT_OK(blob_gc_job.DiscardEntry(key, blob_index, &discardable));
......@@ -205,16 +223,14 @@ class BlobGCJobTest : public testing::Test {
Flush();
std::string result;
for (int i = 0; i < MAX_KEY_NUM; i++) {
if (i % 2 != 0) continue;
if (i % 3 == 0) continue;
db_->Delete(WriteOptions(), GenKey(i));
}
Flush();
CompactAll();
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(b->files_.size(), 1);
auto old = b->files_.begin()->first;
// for (auto& f : b->files_) {
// f.second->marked_for_sample = false;
// }
std::unique_ptr<BlobFileIterator> iter;
ASSERT_OK(NewIterator(b->files_.begin()->second->file_number(),
b->files_.begin()->second->file_size(), &iter));
......@@ -224,7 +240,7 @@ class BlobGCJobTest : public testing::Test {
ASSERT_TRUE(iter->Valid());
ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0);
}
RunGC();
RunGC(true);
b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(b->files_.size(), 1);
auto new1 = b->files_.begin()->first;
......@@ -235,7 +251,7 @@ class BlobGCJobTest : public testing::Test {
auto* db_iter = db_->NewIterator(ReadOptions(), db_->DefaultColumnFamily());
db_iter->SeekToFirst();
for (int i = 0; i < MAX_KEY_NUM; i++) {
if (i % 2 == 0) continue;
if (i % 3 != 0) continue;
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0);
......@@ -327,7 +343,7 @@ TEST_F(BlobGCJobTest, GCLimiter) {
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
RunGC(true);
ASSERT_TRUE(test_limiter->WriteRequested());
ASSERT_FALSE(test_limiter->ReadRequested());
DestroyDB();
......@@ -337,7 +353,7 @@ TEST_F(BlobGCJobTest, GCLimiter) {
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
RunGC(true);
ASSERT_FALSE(test_limiter->WriteRequested());
ASSERT_TRUE(test_limiter->ReadRequested());
DestroyDB();
......@@ -347,12 +363,36 @@ TEST_F(BlobGCJobTest, GCLimiter) {
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
RunGC(true);
ASSERT_TRUE(test_limiter->WriteRequested());
ASSERT_TRUE(test_limiter->ReadRequested());
DestroyDB();
}
TEST_F(BlobGCJobTest, Reopen) {
DisableMergeSmall();
NewDB();
for (int i = 0; i < 10; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
CheckBlobNumber(1);
Reopen();
RunGC(true, true);
CheckBlobNumber(1);
// trigger compute gc score
ReComputeGCScore();
RunGC(false, true);
CheckBlobNumber(1);
DestroyDB();
}
// Tests blob file will be kept after GC, if it is still visible by active
// snapshots.
TEST_F(BlobGCJobTest, PurgeBlobs) {
......@@ -375,34 +415,38 @@ TEST_F(BlobGCJobTest, PurgeBlobs) {
CheckBlobNumber(1);
auto snap4 = db_->GetSnapshot();
RunGC();
CheckBlobNumber(1);
for (int i = 10; i < 20; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
auto snap5 = db_->GetSnapshot();
CheckBlobNumber(2);
// merge two blob files into one
CompactAll();
RunGC(true);
CheckBlobNumber(3);
auto snap5 = db_->GetSnapshot();
db_->ReleaseSnapshot(snap2);
RunGC();
RunGC(false);
CheckBlobNumber(3);
db_->ReleaseSnapshot(snap3);
RunGC();
RunGC(false);
CheckBlobNumber(3);
db_->ReleaseSnapshot(snap1);
RunGC();
RunGC(false);
CheckBlobNumber(3);
db_->ReleaseSnapshot(snap4);
RunGC();
CheckBlobNumber(2);
RunGC(false);
CheckBlobNumber(3);
db_->ReleaseSnapshot(snap5);
RunGC();
RunGC(false);
CheckBlobNumber(1);
DestroyDB();
......@@ -413,6 +457,11 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
ASSERT_OK(db_->Put(WriteOptions(), GenKey(2), GenValue(21)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(4), GenValue(4)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(5), GenValue(5)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(6), GenValue(5)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(7), GenValue(5)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(8), GenValue(5)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(9), GenValue(5)));
Flush();
CompactAll();
std::string value;
......@@ -421,6 +470,18 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(5)));
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(6)));
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(7)));
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(8)));
ASSERT_OK(db_->Delete(WriteOptions(), GenKey(9)));
CompactAll();
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &value));
ASSERT_EQ(value, "0");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
SstFileWriter sst_file_writer(EnvOptions(), options_);
std::string sst_file = options_.dirname + "/for_ingest.sst";
ASSERT_OK(sst_file_writer.Open(sst_file));
......@@ -435,6 +496,8 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
CheckBlobNumber(1);
RunGC(true);
std::string key0 = GenKey(0);
......
......@@ -4,8 +4,9 @@ namespace rocksdb {
namespace titandb {
BasicBlobGCPicker::BasicBlobGCPicker(TitanDBOptions db_options,
TitanCFOptions cf_options)
: db_options_(db_options), cf_options_(cf_options) {}
TitanCFOptions cf_options,
TitanStats* stats)
: db_options_(db_options), cf_options_(cf_options), stats_(stats) {}
BasicBlobGCPicker::~BasicBlobGCPicker() {}
......@@ -23,26 +24,15 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
bool maybe_continue_next_time = false;
uint64_t next_gc_size = 0;
for (auto& gc_score : blob_storage->gc_score()) {
if (gc_score.score < cf_options_.blob_file_discardable_ratio) {
break;
}
auto blob_file = blob_storage->FindFile(gc_score.file_number).lock();
if (!blob_file ||
blob_file->file_state() == BlobFileMeta::FileState::kBeingGC) {
if (!CheckBlobFile(blob_file.get())) {
RecordTick(stats_, TitanStats::GC_NO_NEED, 1);
// Skip this file id this file is being GCed
// or this file had been GCed
continue;
}
// ROCKS_LOG_INFO(db_options_.info_log,
// "file number:%lu score:%f being_gc:%d pending:%d, "
// "size:%lu discard:%lu mark_for_gc:%d
// mark_for_sample:%d", blob_file->file_number_,
// gc_score.score, blob_file->being_gc,
// blob_file->pending, blob_file->file_size_,
// blob_file->discardable_size_,
// blob_file->marked_for_gc_,
// blob_file->marked_for_sample);
if (!CheckBlobFile(blob_file.get())) {
ROCKS_LOG_INFO(db_options_.info_log, "file number:%lu no need gc",
ROCKS_LOG_INFO(db_options_.info_log, "Blob file %" PRIu64 " no need gc",
blob_file->file_number());
continue;
}
......@@ -59,38 +49,44 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
stop_picking = true;
}
} else {
if (blob_file->file_size() <= cf_options_.merge_small_file_threshold ||
blob_file->gc_mark() ||
blob_file->GetDiscardableRatio() >=
cf_options_.blob_file_discardable_ratio) {
next_gc_size += blob_file->file_size();
if (next_gc_size > cf_options_.min_gc_batch_size) {
maybe_continue_next_time = true;
RecordTick(stats_, TitanStats::GC_REMAIN, 1);
ROCKS_LOG_INFO(db_options_.info_log,
"remain more than %" PRIu64
" bytes to be gc and trigger after this gc",
next_gc_size);
break;
}
} else {
break;
}
}
}
ROCKS_LOG_DEBUG(db_options_.info_log,
"got batch size %" PRIu64 ", estimate output %" PRIu64
" bytes",
batch_size, estimate_output_size);
if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size)
if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size) {
return nullptr;
}
// if there is only one small file to merge, no need to perform
if (blob_files.size() == 1 &&
blob_files[0]->file_size() <= cf_options_.merge_small_file_threshold &&
blob_files[0]->gc_mark() == false &&
blob_files[0]->GetDiscardableRatio() <
cf_options_.blob_file_discardable_ratio) {
return nullptr;
}
return std::unique_ptr<BlobGC>(new BlobGC(
std::move(blob_files), std::move(cf_options_), maybe_continue_next_time));
}
bool BasicBlobGCPicker::CheckBlobFile(BlobFileMeta* blob_file) const {
assert(blob_file->file_state() != BlobFileMeta::FileState::kInit);
if (blob_file->file_state() != BlobFileMeta::FileState::kNormal) return false;
assert(blob_file != nullptr &&
blob_file->file_state() != BlobFileMeta::FileState::kInit);
if (blob_file != nullptr &&
blob_file->file_state() != BlobFileMeta::FileState::kNormal)
return false;
return true;
}
......
......@@ -28,7 +28,7 @@ class BlobGCPicker {
class BasicBlobGCPicker final : public BlobGCPicker {
public:
BasicBlobGCPicker(TitanDBOptions, TitanCFOptions);
BasicBlobGCPicker(TitanDBOptions, TitanCFOptions, TitanStats*);
~BasicBlobGCPicker();
std::unique_ptr<BlobGC> PickBlobGC(BlobStorage* blob_storage) override;
......@@ -36,6 +36,7 @@ class BasicBlobGCPicker final : public BlobGCPicker {
private:
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
TitanStats* stats_;
// Check if blob_file needs to gc, return true means we need pick this
// file for gc
......
......@@ -25,12 +25,13 @@ class BlobGCPickerTest : public testing::Test {
blob_storage_.reset(new BlobStorage(titan_db_options, titan_cf_options, 0,
blob_file_cache, nullptr));
basic_blob_gc_picker_.reset(
new BasicBlobGCPicker(titan_db_options, titan_cf_options));
new BasicBlobGCPicker(titan_db_options, titan_cf_options, nullptr));
}
void AddBlobFile(uint64_t file_number, uint64_t file_size,
uint64_t discardable_size, bool being_gc = false) {
auto f = std::make_shared<BlobFileMeta>(file_number, file_size);
auto f =
std::make_shared<BlobFileMeta>(file_number, file_size, 0, 0, "", "");
f->AddDiscardableSize(discardable_size);
f->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
if (being_gc) {
......@@ -55,9 +56,13 @@ TEST_F(BlobGCPickerTest, Basic) {
AddBlobFile(1U, 1U, 0U);
UpdateBlobStorage();
auto blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc == nullptr);
AddBlobFile(2U, 1U, 0U);
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->inputs().size(), 1);
ASSERT_EQ(blob_gc->inputs()[0]->file_number(), 1U);
ASSERT_EQ(blob_gc->inputs().size(), 2);
}
TEST_F(BlobGCPickerTest, BeingGC) {
......@@ -72,10 +77,12 @@ TEST_F(BlobGCPickerTest, BeingGC) {
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U, 0U, true);
AddBlobFile(2U, 1U, 0U);
AddBlobFile(3U, 1U, 0U);
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_EQ(blob_gc->inputs().size(), 1);
ASSERT_EQ(blob_gc->inputs()[0]->file_number(), 2U);
ASSERT_EQ(blob_gc->inputs().size(), 2);
ASSERT_NE(blob_gc->inputs()[0]->file_number(), 1U);
ASSERT_NE(blob_gc->inputs()[1]->file_number(), 1U);
}
TEST_F(BlobGCPickerTest, TriggerNext) {
......@@ -92,15 +99,6 @@ TEST_F(BlobGCPickerTest, TriggerNext) {
auto blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->trigger_next(), true);
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(2U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(3U, 1U << 30, 0U); // valid_size = 1GB
AddBlobFile(4U, 1U << 30, 0U); // valid_size = 1GB
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->trigger_next(), false);
}
TEST_F(BlobGCPickerTest, PickFileAndTriggerNext) {
......
#include "blob_storage.h"
#include "version_set.h"
#include "blob_file_set.h"
namespace rocksdb {
namespace titandb {
......
......@@ -13,7 +13,7 @@ namespace rocksdb {
namespace titandb {
// Provides methods to access the blob storage for a specific
// column family. The version must be valid when this storage is used.
// column family.
class BlobStorage {
public:
BlobStorage(const BlobStorage& bs) : destroyed_(false) {
......@@ -68,6 +68,8 @@ class BlobStorage {
for (auto& file : files_) {
file.second->set_gc_mark(true);
file.second->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
auto level = file.second->GetDiscardableRatioLevel();
AddStats(stats_, cf_id_, level, 1);
}
}
......@@ -101,7 +103,7 @@ class BlobStorage {
SequenceNumber obsolete_sequence);
private:
friend class VersionSet;
friend class BlobFileSet;
friend class VersionTest;
friend class BlobGCPickerTest;
friend class BlobGCJobTest;
......
This diff is collapsed.
#pragma once
#include "blob_file_manager.h"
#include "db/db_impl.h"
#include "rocksdb/statistics.h"
#include "util/repeatable_thread.h"
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "table_factory.h"
#include "titan/db.h"
#include "util/repeatable_thread.h"
#include "version_set.h"
#include "titan_stats.h"
namespace rocksdb {
namespace titandb {
struct TitanColumnFamilyInfo {
const std::string name;
const ImmutableTitanCFOptions immutable_cf_options;
MutableTitanCFOptions mutable_cf_options;
std::shared_ptr<TableFactory> base_table_factory;
std::shared_ptr<TitanTableFactory> titan_table_factory;
};
class TitanDBImpl : public TitanDB {
public:
TitanDBImpl(const TitanDBOptions& options, const std::string& dbname);
......@@ -127,6 +137,11 @@ class TitanDBImpl : public TitanDB {
Status TEST_StartGC(uint32_t column_family_id);
Status TEST_PurgeObsoleteFiles();
int TEST_bg_gc_running() {
MutexLock l(&mutex_);
return bg_gc_running_;
}
private:
class FileManager;
friend class FileManager;
......@@ -135,6 +150,8 @@ class TitanDBImpl : public TitanDB {
friend class TitanDBTest;
friend class TitanThreadSafetyTest;
Status ValidateOptions() const;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* handle,
const Slice& key, PinnableSlice* value);
......@@ -186,6 +203,9 @@ class TitanDBImpl : public TitanDB {
return oldest_snapshot;
}
// REQUIRE: mutex_ held
bool HasPendingDropCFRequest(uint32_t cf_id);
// REQUIRE: mutex_ held
Status SetBGError(const Status& s);
......@@ -196,6 +216,8 @@ class TitanDBImpl : public TitanDB {
bool HasBGError() { return has_bg_error_.load(); }
void DumpStats();
FileLock* lock_{nullptr};
// The lock sequence must be Titan.mutex_.Lock() -> Base DB mutex_.Lock()
// while the unlock sequence must be Base DB mutex.Unlock() ->
......@@ -203,7 +225,9 @@ class TitanDBImpl : public TitanDB {
// potential dead lock.
mutable port::Mutex mutex_;
// This condition variable is signaled on these conditions:
// * whenever bg_gc_scheduled_ goes down to 0
// * whenever bg_gc_scheduled_ goes down to 0.
// * whenever bg_gc_running_ goes down to 0.
// * whenever drop_cf_requests_ goes down to 0.
port::CondVar bg_cv_;
std::string dbname_;
......@@ -220,24 +244,16 @@ class TitanDBImpl : public TitanDB {
// is not null.
std::unique_ptr<TitanStats> stats_;
// Guarded by mutex_.
std::unordered_map<uint32_t, ImmutableTitanCFOptions> immutable_cf_options_;
// Guarded by mutex_.
std::unordered_map<uint32_t, MutableTitanCFOptions> mutable_cf_options_;
// Guarded by mutex_.
std::unordered_map<uint32_t, std::shared_ptr<TableFactory>>
base_table_factory_;
// Guarded by mutex_.
std::unordered_map<uint32_t, std::shared_ptr<TitanTableFactory>>
titan_table_factory_;
// Access while holding mutex_ lock or during DB open.
std::unordered_map<uint32_t, TitanColumnFamilyInfo> cf_info_;
// handle for purging obsolete blob files at fixed intervals
std::unique_ptr<RepeatableThread> thread_purge_obsolete_;
std::unique_ptr<VersionSet> vset_;
// handle for dump internal stats at fixed intervals.
std::unique_ptr<RepeatableThread> thread_dump_stats_;
std::unique_ptr<BlobFileSet> blob_file_set_;
std::set<uint64_t> pending_outputs_;
std::shared_ptr<BlobFileManager> blob_manager_;
......@@ -245,10 +261,14 @@ class TitanDBImpl : public TitanDB {
// pending_gc_ hold column families that already on gc_queue_.
std::deque<uint32_t> gc_queue_;
// Guarded by mutex_.
int bg_gc_scheduled_{0};
// REQUIRE: mutex_ held
int unscheduled_gc_{0};
// REQUIRE: mutex_ held.
int bg_gc_scheduled_ = 0;
// REQUIRE: mutex_ held.
int bg_gc_running_ = 0;
// REQUIRE: mutex_ held.
int unscheduled_gc_ = 0;
// REQUIRE: mutex_ held.
int drop_cf_requests_ = 0;
std::atomic_bool shuting_down_{false};
};
......
......@@ -9,7 +9,7 @@ Status TitanDBImpl::PurgeObsoleteFilesImpl() {
auto oldest_sequence = GetOldestSnapshotSequence();
{
MutexLock l(&mutex_);
vset_->GetObsoleteFiles(&candidate_files, oldest_sequence);
blob_file_set_->GetObsoleteFiles(&candidate_files, oldest_sequence);
}
// dedup state.inputs so we don't try to delete the same
......
#include "db_impl.h"
#include "util/sync_point.h"
#include "blob_file_iterator.h"
#include "blob_gc_job.h"
#include "blob_gc_picker.h"
......@@ -28,11 +30,17 @@ void TitanDBImpl::BGWorkGC(void* db) {
void TitanDBImpl::BackgroundCallGC() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning");
{
MutexLock l(&mutex_);
assert(bg_gc_scheduled_ > 0);
while (drop_cf_requests_ > 0) {
bg_cv_.Wait();
}
bg_gc_running_++;
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC");
BackgroundGC(&log_buffer);
{
......@@ -42,13 +50,14 @@ void TitanDBImpl::BackgroundCallGC() {
mutex_.Lock();
}
bg_gc_running_--;
bg_gc_scheduled_--;
MaybeScheduleGC();
if (bg_gc_scheduled_ == 0) {
// signal if
// * bg_gc_scheduled_ == 0 -- need to wakeup ~TitanDBImpl
if (bg_gc_scheduled_ == 0 || bg_gc_running_ == 0) {
// Signal DB destructor if bg_gc_scheduled_ drop to 0.
// Signal drop CF requests if bg_gc_running_ drop to 0.
// If none of this is true, there is no need to signal since nobody is
// waiting for it
// waiting for it.
bg_cv_.SignalAll();
}
// IMPORTANT: there should be no code after calling SignalAll. This call may
......@@ -60,20 +69,28 @@ void TitanDBImpl::BackgroundCallGC() {
Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
mutex_.AssertHeld();
StopWatch gc_sw(env_, statistics(stats_.get()), BLOB_DB_GC_MICROS);
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;
Status s;
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
if (bs) {
const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobStorage> blob_storage;
// Skip CFs that have been dropped.
if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock();
} else {
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr);
ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].",
cf_info_[column_family_id].name.c_str());
}
if (blob_storage != nullptr) {
const auto& cf_options = blob_storage->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options);
blob_gc = blob_gc_picker->PickBlobGC(bs);
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options,
stats_.get());
blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get());
if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
......@@ -90,8 +107,9 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do");
} else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(), vset_.get(),
log_buffer, &shuting_down_, stats_.get());
env_options_, blob_manager_.get(),
blob_file_set_.get(), log_buffer, &shuting_down_,
stats_.get());
s = blob_gc_job.Prepare();
if (s.ok()) {
mutex_.Unlock();
......@@ -106,6 +124,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
if (blob_gc->trigger_next() &&
(bg_gc_scheduled_ - 1 + gc_queue_.size() <
2 * static_cast<uint32_t>(db_options_.max_background_gc))) {
RecordTick(stats_.get(), TitanStats::GC_TRIGGER_NEXT, 1);
// There is still data remained to be GCed
// and the queue is not overwhelmed
// then put this cf to GC queue for next GC
......@@ -114,38 +133,47 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
}
if (s.ok()) {
RecordTick(stats_.get(), TitanStats::GC_SUCCESS, 1);
// Done
} else {
SetBGError(s);
RecordTick(stats_.get(), TitanStats::GC_FAIL, 1);
ROCKS_LOG_WARN(db_options_.info_log, "Titan GC error: %s",
s.ToString().c_str());
}
TEST_SYNC_POINT("TitanDBImpl::BackgroundGC:Finish");
return s;
}
// TODO(yiwu): merge with BackgroundGC().
Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
{
MutexLock l(&mutex_);
bg_gc_scheduled_++;
}
// BackgroundCallGC
Status s;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{
MutexLock l(&mutex_);
assert(bg_gc_scheduled_ > 0);
// Prevent CF being dropped while GC is running.
while (drop_cf_requests_ > 0) {
bg_cv_.Wait();
}
bg_gc_running_++;
bg_gc_scheduled_++;
// BackgroudGC
StopWatch gc_sw(env_, statistics(stats_.get()), BLOB_DB_GC_MICROS);
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;
auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
if (blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
return Status::ShutdownInProgress(
"Column Family has been dropped before GC.");
}
auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get();
const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options);
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options, nullptr);
blob_gc = blob_gc_picker->PickBlobGC(bs);
if (blob_gc) {
......@@ -158,8 +186,9 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
ROCKS_LOG_BUFFER(&log_buffer, "Titan GC nothing to do");
} else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(), vset_.get(),
&log_buffer, &shuting_down_, stats_.get());
env_options_, blob_manager_.get(),
blob_file_set_.get(), &log_buffer, &shuting_down_,
stats_.get());
s = blob_gc_job.Prepare();
if (s.ok()) {
......@@ -187,8 +216,9 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
mutex_.Lock();
}
bg_gc_running_--;
bg_gc_scheduled_--;
if (bg_gc_scheduled_ == 0) {
if (bg_gc_scheduled_ == 0 || bg_gc_running_ == 0) {
bg_cv_.SignalAll();
}
}
......
......@@ -46,7 +46,7 @@ class TitanDBIterator : public Iterator {
void SeekToFirst() override {
iter_->SeekToFirst();
if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK);
}
......@@ -55,7 +55,7 @@ class TitanDBIterator : public Iterator {
void SeekToLast() override {
iter_->SeekToLast();
if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK);
}
......@@ -64,7 +64,7 @@ class TitanDBIterator : public Iterator {
void Seek(const Slice& target) override {
iter_->Seek(target);
if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK);
}
......@@ -73,7 +73,7 @@ class TitanDBIterator : public Iterator {
void SeekForPrev(const Slice& target) override {
iter_->SeekForPrev(target);
if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK);
}
......@@ -83,7 +83,7 @@ class TitanDBIterator : public Iterator {
assert(Valid());
iter_->Next();
if (ShouldGetBlobValue()) {
StopWatch next_sw(env_, statistics(stats_), BLOB_DB_NEXT_MICROS);
StopWatch next_sw(env_, stats_, BLOB_DB_NEXT_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_NEXT);
}
......@@ -93,7 +93,7 @@ class TitanDBIterator : public Iterator {
assert(Valid());
iter_->Prev();
if (ShouldGetBlobValue()) {
StopWatch prev_sw(env_, statistics(stats_), BLOB_DB_PREV_MICROS);
StopWatch prev_sw(env_, stats_, BLOB_DB_PREV_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_PREV);
}
......
......@@ -2,9 +2,9 @@
#include <unordered_map>
#include "blob_file_set.h"
#include "util/string_util.h"
#include "version_edit.h"
#include "version_set.h"
#include <inttypes.h>
......@@ -49,12 +49,12 @@ class EditCollector {
}
// Seal the batch and check the validation of the edits.
Status Seal(VersionSet& vset) {
Status Seal(BlobFileSet& blob_file_set) {
if (!status_.ok()) return status_;
for (auto& cf : column_families_) {
auto cf_id = cf.first;
auto storage = vset.GetBlobStorage(cf_id).lock();
auto storage = blob_file_set.GetBlobStorage(cf_id).lock();
if (!storage) {
// TODO: support OpenForReadOnly which doesn't open DB with all column
// family so there are maybe some invalid column family, but we can't
......@@ -71,7 +71,7 @@ class EditCollector {
}
// Apply the edits of the batch.
Status Apply(VersionSet& vset) {
Status Apply(BlobFileSet& blob_file_set) {
if (!status_.ok()) return status_;
if (!sealed_)
return Status::Incomplete(
......@@ -79,7 +79,7 @@ class EditCollector {
for (auto& cf : column_families_) {
auto cf_id = cf.first;
auto storage = vset.GetBlobStorage(cf_id).lock();
auto storage = blob_file_set.GetBlobStorage(cf_id).lock();
if (!storage) {
// TODO: support OpenForReadOnly which doesn't open DB with all column
// family so there are maybe some invalid column family, but we can't
......
......@@ -22,8 +22,11 @@ void TitanDBOptions::Dump(Logger* logger) const {
"TitanDBOptions.max_background_gc : %" PRIi32,
max_background_gc);
ROCKS_LOG_HEADER(logger,
"TitanDBOptions.purge_obsolete_files_period: %" PRIu32,
purge_obsolete_files_period);
"TitanDBOptions.purge_obsolete_files_period_sec: %" PRIu32,
purge_obsolete_files_period_sec);
ROCKS_LOG_HEADER(logger,
"TitanDBOptions.titan_stats_dump_period_sec: %" PRIu32,
titan_stats_dump_period_sec);
}
TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts,
......
......@@ -5,6 +5,7 @@
#endif
#include <inttypes.h>
#include "monitoring/statistics.h"
namespace rocksdb {
namespace titandb {
......@@ -18,6 +19,10 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
return;
}
uint64_t prev_bytes_read = 0;
uint64_t prev_bytes_written = 0;
SavePrevIOBytes(&prev_bytes_read, &prev_bytes_written);
if (ikey.type == kTypeBlobIndex &&
cf_options_.blob_run_mode == TitanBlobRunMode::kFallback) {
// we ingest value from blob file
......@@ -33,14 +38,16 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
auto storage = blob_storage_.lock();
assert(storage != nullptr);
ReadOptions options; // dummy option
Status get_status = storage->Get(options, index, &record, &buffer);
UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
&io_bytes_written_);
if (get_status.ok()) {
ikey.type = kTypeValue;
std::string index_key;
AppendInternalKey(&index_key, ikey);
base_builder_->Add(index_key, record.value);
bytes_read_ += record.size();
} else {
// Get blob value can fail if corresponding blob file has been GC-ed
// deleted. In this case we write the blob index as is to compaction
......@@ -54,6 +61,8 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
// we write to blob file and insert index
std::string index_value;
AddBlob(ikey.user_key, value, &index_value);
UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
&io_bytes_written_);
if (ok()) {
ikey.type = kTypeBlobIndex;
std::string index_key;
......@@ -68,8 +77,7 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
std::string* index_value) {
if (!ok()) return;
StopWatch write_sw(db_options_.env, statistics(stats_),
BLOB_DB_BLOB_FILE_WRITE_MICROS);
StopWatch write_sw(db_options_.env, stats_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
if (!blob_builder_) {
status_ = blob_manager_->NewFile(&blob_handle_);
......@@ -85,6 +93,7 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
MeasureTime(stats_, BLOB_DB_KEY_SIZE, key.size());
MeasureTime(stats_, BLOB_DB_VALUE_SIZE, value.size());
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_SIZE, value.size());
bytes_written_ += key.size() + value.size();
BlobIndex index;
BlobRecord record;
......@@ -93,6 +102,7 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
index.file_number = blob_handle_->GetNumber();
blob_builder_->Add(record, &index.blob_handle);
RecordTick(stats_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, index.blob_handle.size);
bytes_written_ += record.size();
if (ok()) {
index.EncodeTo(index_value);
}
......@@ -118,7 +128,8 @@ Status TitanTableBuilder::Finish() {
"Titan table builder finish output file %" PRIu64 ".",
blob_handle_->GetNumber());
std::shared_ptr<BlobFileMeta> file = std::make_shared<BlobFileMeta>(
blob_handle_->GetNumber(), blob_handle_->GetFile()->GetFileSize());
blob_handle_->GetNumber(), blob_handle_->GetFile()->GetFileSize(), 0,
0, blob_builder_->GetSmallestKey(), blob_builder_->GetLargestKey());
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushOrCompactionOutput);
status_ =
blob_manager_->FinishFile(cf_id_, file, std::move(blob_handle_));
......@@ -135,6 +146,7 @@ Status TitanTableBuilder::Finish() {
"Titan table builder failed on finish: %s",
status_.ToString().c_str());
}
UpdateInternalOpStats();
return status();
}
......@@ -166,5 +178,33 @@ TableProperties TitanTableBuilder::GetTableProperties() const {
return base_builder_->GetTableProperties();
}
void TitanTableBuilder::UpdateInternalOpStats() {
if (stats_ == nullptr) {
return;
}
TitanInternalStats* internal_stats = stats_->internal_stats(cf_id_);
if (internal_stats == nullptr) {
return;
}
InternalOpType op_type = InternalOpType::COMPACTION;
if (level_ == 0) {
op_type = InternalOpType::FLUSH;
}
InternalOpStats* internal_op_stats =
internal_stats->GetInternalOpStatsForType(op_type);
assert(internal_op_stats != nullptr);
AddStats(internal_op_stats, InternalOpStatsType::COUNT);
AddStats(internal_op_stats, InternalOpStatsType::BYTES_READ, bytes_read_);
AddStats(internal_op_stats, InternalOpStatsType::BYTES_WRITTEN,
bytes_written_);
AddStats(internal_op_stats, InternalOpStatsType::IO_BYTES_READ,
io_bytes_read_);
AddStats(internal_op_stats, InternalOpStatsType::IO_BYTES_WRITTEN,
io_bytes_written_);
if (blob_builder_ != nullptr) {
AddStats(internal_op_stats, InternalOpStatsType::OUTPUT_FILE_NUM);
}
}
} // namespace titandb
} // namespace rocksdb
......@@ -2,10 +2,10 @@
#include "blob_file_builder.h"
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "table/table_builder.h"
#include "titan/options.h"
#include "titan_stats.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -16,13 +16,15 @@ class TitanTableBuilder : public TableBuilder {
const TitanCFOptions& cf_options,
std::unique_ptr<TableBuilder> base_builder,
std::shared_ptr<BlobFileManager> blob_manager,
std::weak_ptr<BlobStorage> blob_storage, TitanStats* stats)
std::weak_ptr<BlobStorage> blob_storage, int level,
TitanStats* stats)
: cf_id_(cf_id),
db_options_(db_options),
cf_options_(cf_options),
base_builder_(std::move(base_builder)),
blob_manager_(blob_manager),
blob_storage_(blob_storage),
level_(level),
stats_(stats) {}
void Add(const Slice& key, const Slice& value) override;
......@@ -46,6 +48,8 @@ class TitanTableBuilder : public TableBuilder {
void AddBlob(const Slice& key, const Slice& value, std::string* index_value);
void UpdateInternalOpStats();
Status status_;
uint32_t cf_id_;
TitanDBOptions db_options_;
......@@ -55,8 +59,14 @@ class TitanTableBuilder : public TableBuilder {
std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<BlobFileBuilder> blob_builder_;
std::weak_ptr<BlobStorage> blob_storage_;
int level_;
TitanStats* stats_;
// counters
uint64_t bytes_read_ = 0;
uint64_t bytes_written_ = 0;
uint64_t io_bytes_read_ = 0;
uint64_t io_bytes_written_ = 0;
};
} // namespace titandb
......
#include "table/table_builder.h"
#include "blob_file_manager.h"
#include "blob_file_reader.h"
#include "table/table_reader.h"
#include "table_factory.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "version_set.h"
#include "blob_file_manager.h"
#include "blob_file_reader.h"
#include "blob_file_set.h"
#include "table_factory.h"
namespace rocksdb {
namespace titandb {
......@@ -80,11 +81,11 @@ class TableBuilderTest : public testing::Test {
blob_name_(BlobFileName(tmpdir_, kTestFileNumber)) {
db_options_.dirname = tmpdir_;
cf_options_.min_blob_size = kMinBlobSize;
vset_.reset(new VersionSet(db_options_, nullptr));
blob_file_set_.reset(new BlobFileSet(db_options_, nullptr));
blob_manager_.reset(new FileManager(db_options_));
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_,
blob_manager_, &mutex_,
vset_.get(), nullptr));
blob_file_set_.get(), nullptr));
}
~TableBuilderTest() {
......@@ -170,7 +171,7 @@ class TableBuilderTest : public testing::Test {
std::string blob_name_;
std::unique_ptr<TableFactory> table_factory_;
std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<VersionSet> vset_;
std::unique_ptr<BlobFileSet> blob_file_set_;
};
TEST_F(TableBuilderTest, Basic) {
......
......@@ -25,11 +25,11 @@ TableBuilder* TitanTableFactory::NewTableBuilder(
std::weak_ptr<BlobStorage> blob_storage;
{
MutexLock l(db_mutex_);
blob_storage = vset_->GetBlobStorage(column_family_id);
blob_storage = blob_file_set_->GetBlobStorage(column_family_id);
}
return new TitanTableBuilder(column_family_id, db_options_, cf_options,
std::move(base_builder), blob_manager_,
blob_storage, stats_);
blob_storage, options.level, stats_);
}
std::string TitanTableFactory::GetPrintableTableOptions() const {
......
......@@ -3,10 +3,10 @@
#include <atomic>
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "rocksdb/table.h"
#include "titan/options.h"
#include "titan_stats.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -16,14 +16,15 @@ class TitanTableFactory : public TableFactory {
TitanTableFactory(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
std::shared_ptr<BlobFileManager> blob_manager,
port::Mutex* db_mutex, VersionSet* vset, TitanStats* stats)
port::Mutex* db_mutex, BlobFileSet* blob_file_set,
TitanStats* stats)
: db_options_(db_options),
cf_options_(cf_options),
blob_run_mode_(cf_options.blob_run_mode),
base_factory_(cf_options.table_factory),
blob_manager_(blob_manager),
db_mutex_(db_mutex),
vset_(vset),
blob_file_set_(blob_file_set),
stats_(stats) {}
const char* Name() const override { return "TitanTable"; }
......@@ -67,7 +68,7 @@ class TitanTableFactory : public TableFactory {
std::shared_ptr<TableFactory> base_factory_;
std::shared_ptr<BlobFileManager> blob_manager_;
port::Mutex* db_mutex_;
VersionSet* vset_;
BlobFileSet* blob_file_set_;
TitanStats* stats_;
};
......
......@@ -4,8 +4,10 @@
#include "blob_file_iterator.h"
#include "blob_file_reader.h"
#include "blob_file_size_collector.h"
#include "db_impl.h"
#include "db_iter.h"
#include "monitoring/statistics.h"
#include "rocksdb/utilities/debug.h"
#include "titan/db.h"
#include "titan_fault_injection_test_env.h"
......@@ -36,6 +38,7 @@ class TitanDBTest : public testing::Test {
options_.merge_small_file_threshold = 0;
options_.disable_background_gc = true;
options_.blob_file_compression = CompressionType::kLZ4Compression;
options_.statistics = CreateDBStatistics();
DeleteDir(env_, options_.dirname);
DeleteDir(env_, dbname_);
}
......@@ -44,6 +47,8 @@ class TitanDBTest : public testing::Test {
Close();
DeleteDir(env_, options_.dirname);
DeleteDir(env_, dbname_);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
void Open() {
......@@ -101,7 +106,7 @@ class TitanDBTest : public testing::Test {
}
Status LogAndApply(VersionEdit& edit) {
return db_impl_->vset_->LogAndApply(edit);
return db_impl_->blob_file_set_->LogAndApply(edit);
}
void Put(uint64_t k, std::map<std::string, std::string>* data = nullptr) {
......@@ -117,6 +122,15 @@ class TitanDBTest : public testing::Test {
}
}
void Delete(uint64_t k) {
WriteOptions wopts;
std::string key = GenKey(k);
ASSERT_OK(db_->Delete(wopts, key));
for (auto& handle : cf_handles_) {
ASSERT_OK(db_->Delete(wopts, handle, key));
}
}
void Flush() {
FlushOptions fopts;
ASSERT_OK(db_->Flush(fopts));
......@@ -125,13 +139,17 @@ class TitanDBTest : public testing::Test {
}
}
// Thin forwarder to DB::GetIntProperty on the default column family;
// the return-value semantics are those of the underlying DB.
bool GetIntProperty(const Slice& property, uint64_t* value) {
return db_->GetIntProperty(property, value);
}
// Returns the BlobStorage for `cf_handle` (default CF when null) as a
// weak_ptr, so callers must lock() it before use.
// NOTE(review): the DB mutex is taken around the blob_file_set_ read --
// this appears to be its locking contract; confirm against TitanDBImpl.
std::weak_ptr<BlobStorage> GetBlobStorage(
ColumnFamilyHandle* cf_handle = nullptr) {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
MutexLock l(&db_impl_->mutex_);
return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID());
}
void VerifyDB(const std::map<std::string, std::string>& data,
......@@ -251,10 +269,17 @@ class TitanDBTest : public testing::Test {
}
// Runs one GC job synchronously on the calling thread.
void CallGC() {
// Mimic the scheduler: bump the scheduled-job count under the DB mutex
// before invoking the background entry point directly.
{
MutexLock l(&db_impl_->mutex_);
db_impl_->bg_gc_scheduled_++;
}
db_impl_->BackgroundCallGC();
// Wait (on the condvar, not a busy spin) until the scheduled count drops
// back to zero; presumably BackgroundCallGC decrements it and signals
// bg_cv_ on completion -- confirm against TitanDBImpl.
{
MutexLock l(&db_impl_->mutex_);
while (db_impl_->bg_gc_scheduled_) {
db_impl_->bg_cv_.Wait();
}
}
}
// Make db ignore first bg_error
......@@ -356,6 +381,20 @@ TEST_F(TitanDBTest, DBIterSeek) {
}
}
// Verifies kNumLiveBlobFile counts the single blob file produced by the
// flush, and that the value is still correct after a DB reopen.
TEST_F(TitanDBTest, GetProperty) {
Open();
for (uint64_t k = 1; k <= 100; k++) {
Put(k);
}
Flush();
uint64_t value;
ASSERT_TRUE(GetIntProperty(TitanDB::Properties::kNumLiveBlobFile, &value));
ASSERT_EQ(value, 1);
// Reopen and re-check: the property must be rebuilt from persisted state.
Reopen();
ASSERT_TRUE(GetIntProperty(TitanDB::Properties::kNumLiveBlobFile, &value));
ASSERT_EQ(value, 1);
}
TEST_F(TitanDBTest, Snapshot) {
Open();
std::map<std::string, std::string> data;
......@@ -429,6 +468,17 @@ TEST_F(TitanDBTest, IngestExternalFiles) {
}
}
// A freshly created CF must carry exactly one table-properties collector
// factory, and it must be the BlobFileSizeCollectorFactory.
TEST_F(TitanDBTest, NewColumnFamilyHasBlobFileSizeCollector) {
Open();
AddCF("new_cf");
Options opt = db_->GetOptions(cf_handles_.back());
ASSERT_EQ(1, opt.table_properties_collector_factories.size());
// Factories are compared by name, since the instances differ.
std::unique_ptr<BlobFileSizeCollectorFactory> prop_collector_factory(
new BlobFileSizeCollectorFactory());
ASSERT_EQ(std::string(prop_collector_factory->Name()),
std::string(opt.table_properties_collector_factories[0]->Name()));
}
TEST_F(TitanDBTest, DropColumnFamily) {
Open();
const uint64_t kNumCF = 3;
......@@ -552,7 +602,7 @@ TEST_F(TitanDBTest, VersionEditError) {
auto cf_id = db_->DefaultColumnFamily()->GetID();
VersionEdit edit;
edit.SetColumnFamilyID(cf_id);
edit.AddBlobFile(std::make_shared<BlobFileMeta>(1, 1));
edit.AddBlobFile(std::make_shared<BlobFileMeta>(1, 1, 0, 0, "", ""));
ASSERT_OK(LogAndApply(edit));
VerifyDB(data);
......@@ -560,7 +610,7 @@ TEST_F(TitanDBTest, VersionEditError) {
// add same blob file twice
VersionEdit edit1;
edit1.SetColumnFamilyID(cf_id);
edit1.AddBlobFile(std::make_shared<BlobFileMeta>(1, 1));
edit1.AddBlobFile(std::make_shared<BlobFileMeta>(1, 1, 0, 0, "", ""));
ASSERT_NOK(LogAndApply(edit));
Reopen();
......@@ -876,6 +926,7 @@ TEST_F(TitanDBTest, FallbackModeEncounterMissingBlobFile) {
ASSERT_EQ(1, GetBlobStorage().lock()->NumBlobFiles());
ASSERT_OK(db_->Delete(WriteOptions(), "foo"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
uint32_t default_cf_id = db_->DefaultColumnFamily()->GetID();
// GC the first blob file.
ASSERT_OK(db_impl_->TEST_StartGC(default_cf_id));
......@@ -922,8 +973,11 @@ TEST_F(TitanDBTest, BackgroundErrorTrigger) {
Put(i, &data);
}
Flush();
for (uint64_t i = 1; i <= kNumEntries; i++) {
Delete(i);
}
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
SyncPoint::GetInstance()->SetCallBack("VersionSet::LogAndApply", [&](void*) {
SyncPoint::GetInstance()->SetCallBack("BlobFileSet::LogAndApply", [&](void*) {
mock_env->SetFilesystemActive(false, Status::IOError("Injected error"));
});
SyncPoint::GetInstance()->EnableProcessing();
......@@ -934,6 +988,89 @@ TEST_F(TitanDBTest, BackgroundErrorTrigger) {
Close();
}
// Make sure DropColumnFamilies() will wait if there's running GC job.
TEST_F(TitanDBTest, DropCFWhileGC) {
options_.min_blob_size = 0;
options_.blob_file_discardable_ratio = 0.1;
options_.disable_background_gc = false;
Open();
// Create CF.
std::vector<TitanCFDescriptor> descs = {{"new_cf", options_}};
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(db_->CreateColumnFamilies(descs, &handles));
ASSERT_EQ(1, handles.size());
auto* cfh = handles[0];
// Delay DropColumnFamilies() until a GC job is about to start, then assert
// inside the drop path that no GC job is still running -- i.e. the drop
// waited for GC to finish before dropping the CF in the base DB.
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC",
"TitanDBImpl::DropColumnFamilies:Begin"}});
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::DropColumnFamilies:BeforeBaseDBDropCF",
[&](void*) { ASSERT_EQ(0, db_impl_->TEST_bg_gc_running()); });
SyncPoint::GetInstance()->EnableProcessing();
// Create two blob files, and trigger GC of the first one.
ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), cfh, "bar", "v1"));
ASSERT_OK(db_->Flush(FlushOptions(), cfh));
ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "v2"));
ASSERT_OK(db_->Flush(FlushOptions(), cfh));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr));
// Verify no GC job is running while we drop the CF.
ASSERT_OK(db_->DropColumnFamilies(handles));
// Cleanup.
ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
Close();
}
// Make sure GC job will not run on a dropped CF.
TEST_F(TitanDBTest, GCAfterDropCF) {
options_.min_blob_size = 0;
options_.blob_file_discardable_ratio = 0.1;
options_.disable_background_gc = false;
Open();
// Create CF.
std::vector<TitanCFDescriptor> descs = {{"new_cf", options_}};
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(db_->CreateColumnFamilies(descs, &handles));
ASSERT_EQ(1, handles.size());
auto* cfh = handles[0];
// Counts how many GC jobs hit the "CF dropped" path.
std::atomic<int> skip_dropped_cf_count{0};
// Hold the GC job until after the CF is dropped, and wait for GC to finish
// before asserting on the counter.
SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBTest::GCAfterDropCF:AfterDropCF",
"TitanDBImpl::BackgroundCallGC:BeforeGCRunning"},
{"TitanDBImpl::BackgroundGC:Finish",
"TitanDBTest::GCAfterDropCF:WaitGC"}});
SyncPoint::GetInstance()->SetCallBack(
"TitanDBImpl::BackgroundGC:CFDropped",
[&](void*) { skip_dropped_cf_count++; });
SyncPoint::GetInstance()->EnableProcessing();
// Create two blob files, and trigger GC of the first one.
ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), cfh, "bar", "v1"));
ASSERT_OK(db_->Flush(FlushOptions(), cfh));
ASSERT_OK(db_->Put(WriteOptions(), cfh, "foo", "v2"));
ASSERT_OK(db_->Flush(FlushOptions(), cfh));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr));
// Drop CF before GC runs. Check if GC job skip the dropped CF.
ASSERT_OK(db_->DropColumnFamilies(handles));
TEST_SYNC_POINT("TitanDBTest::GCAfterDropCF:AfterDropCF");
TEST_SYNC_POINT("TitanDBTest::GCAfterDropCF:WaitGC");
// Exactly one GC job should have observed the dropped CF and skipped it.
ASSERT_EQ(1, skip_dropped_cf_count.load());
// Cleanup.
ASSERT_OK(db_->DestroyColumnFamilyHandle(cfh));
Close();
}
} // namespace titandb
} // namespace rocksdb
......
#include "util/testharness.h"
#include "titan/db.h"
namespace rocksdb {
namespace titandb {
// Fixture for options-validation tests: owns a TitanDB opened in a temp
// directory and tears down both the DB object and its on-disk state.
class TitanOptionsTest : public testing::Test {
 public:
  TitanOptionsTest() : db_name_(test::TmpDir()) {
    titan_options_.create_if_missing = true;
    titan_options_.dirname = db_name_ + "/titandb";
  }

  ~TitanOptionsTest() {
    Status s = Close();
    assert(s.ok());
    (void)s;  // avoid an unused-variable warning when NDEBUG disables assert
  }

  // Opens a TitanDB at db_name_ using the current titan_options_.
  Status Open() { return TitanDB::Open(titan_options_, db_name_, &titan_db); }

  // Deletes every file directly inside `dirname`, then the directory
  // itself. Note: does not recurse into subdirectories.
  Status DeleteDir(const std::string& dirname) {
    Status s;
    Env* env = Env::Default();
    std::vector<std::string> filenames;
    s = env->GetChildren(dirname, &filenames);
    if (!s.ok()) {
      return s;
    }
    for (auto& fname : filenames) {
      s = env->DeleteFile(dirname + "/" + fname);
      if (!s.ok()) {
        return s;
      }
    }
    s = env->DeleteDir(dirname);
    return s;
  }

  // Closes and frees the DB (if open), then destroys on-disk state: first
  // the Titan blob directory, then the base DB. Returns the first error.
  Status Close() {
    Status s;
    if (titan_db != nullptr) {
      s = titan_db->Close();
      // TitanDB::Open() heap-allocates the DB object and Close() does not
      // free it; delete it here so the fixture does not leak it per test.
      delete titan_db;
      titan_db = nullptr;
      if (!s.ok()) {
        return s;
      }
      s = DeleteDir(titan_options_.dirname);
      if (!s.ok()) {
        return s;
      }
      rocksdb::Options opts;
      s = rocksdb::DestroyDB(db_name_, opts);
    }
    return s;
  }

 protected:
  std::string db_name_;          // base DB directory (temp dir)
  TitanOptions titan_options_;   // options under test; mutate before Open()
  TitanDB* titan_db = nullptr;   // owned; freed in Close()
};
// purge_obsolete_files_period_sec == 0 must be rejected by Open() with
// InvalidArgument.
TEST_F(TitanOptionsTest, PurgeObsoleteFilesPeriodSec) {
titan_options_.purge_obsolete_files_period_sec = 0;
Status s = Open();
ASSERT_TRUE(s.IsInvalidArgument());
}
} // namespace titandb
} // namespace rocksdb
// gtest entry point: runs every test registered in this binary.
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -14,6 +14,16 @@ static const std::string num_live_blob_file = "num-live-blob-file";
static const std::string num_obsolete_blob_file = "num-obsolete-blob-file";
static const std::string live_blob_file_size = "live-blob-file-size";
static const std::string obsolete_blob_file_size = "obsolete-blob-file-size";
static const std::string num_discardable_ratio_le0_file =
"num-discardable-ratio-le0-file";
static const std::string num_discardable_ratio_le20_file =
"num-discardable-ratio-le20-file";
static const std::string num_discardable_ratio_le50_file =
"num-discardable-ratio-le50-file";
static const std::string num_discardable_ratio_le80_file =
"num-discardable-ratio-le80-file";
static const std::string num_discardable_ratio_le100_file =
"num-discardable-ratio-le100-file";
const std::string TitanDB::Properties::kLiveBlobSize =
titandb_prefix + live_blob_size;
......@@ -25,6 +35,16 @@ const std::string TitanDB::Properties::kLiveBlobFileSize =
titandb_prefix + live_blob_file_size;
const std::string TitanDB::Properties::kObsoleteBlobFileSize =
titandb_prefix + obsolete_blob_file_size;
const std::string TitanDB::Properties::kNumDiscardableRatioLE0File =
titandb_prefix + num_discardable_ratio_le0_file;
const std::string TitanDB::Properties::kNumDiscardableRatioLE20File =
titandb_prefix + num_discardable_ratio_le20_file;
const std::string TitanDB::Properties::kNumDiscardableRatioLE50File =
titandb_prefix + num_discardable_ratio_le50_file;
const std::string TitanDB::Properties::kNumDiscardableRatioLE80File =
titandb_prefix + num_discardable_ratio_le80_file;
const std::string TitanDB::Properties::kNumDiscardableRatioLE100File =
titandb_prefix + num_discardable_ratio_le100_file;
const std::unordered_map<std::string, TitanInternalStats::StatsType>
TitanInternalStats::stats_type_string_map = {
......@@ -38,7 +58,69 @@ const std::unordered_map<std::string, TitanInternalStats::StatsType>
TitanInternalStats::LIVE_BLOB_FILE_SIZE},
{TitanDB::Properties::kObsoleteBlobFileSize,
TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE},
{TitanDB::Properties::kNumDiscardableRatioLE0File,
TitanInternalStats::NUM_DISCARDABLE_RATIO_LE0},
{TitanDB::Properties::kNumDiscardableRatioLE20File,
TitanInternalStats::NUM_DISCARDABLE_RATIO_LE20},
{TitanDB::Properties::kNumDiscardableRatioLE50File,
TitanInternalStats::NUM_DISCARDABLE_RATIO_LE50},
{TitanDB::Properties::kNumDiscardableRatioLE80File,
TitanInternalStats::NUM_DISCARDABLE_RATIO_LE80},
{TitanDB::Properties::kNumDiscardableRatioLE100File,
TitanInternalStats::NUM_DISCARDABLE_RATIO_LE100},
};
// Human-readable label for each InternalOpType, padded with trailing
// spaces so the dumped stats rows line up in the log.
const std::array<std::string,
static_cast<int>(InternalOpType::INTERNAL_OP_ENUM_MAX)>
TitanInternalStats::internal_op_names = {
"Flush ",
"Compaction",
"GC ",
};
// Dumps one formatted row per internal operation type (flush, compaction,
// GC) to `log_buffer`, resetting each dumped counter as it is read.
void TitanInternalStats::DumpAndResetInternalOpStats(LogBuffer* log_buffer) {
  constexpr double GB = 1.0 * 1024 * 1024 * 1024;
  // Microseconds per second: the GC timing counters are stored in micros.
  constexpr double SECOND = 1.0 * 1000000;
  // The row format below prints eleven values; the header must list all
  // eleven columns (the three GC timing columns were previously missing).
  LogToBuffer(log_buffer,
              "OP COUNT READ(GB) WRITE(GB) IO_READ(GB) IO_WRITE(GB) "
              " FILE_IN FILE_OUT GC_SAMPLING(SEC) GC_READ_LSM(SEC)"
              " GC_UPDATE_LSM(SEC)");
  LogToBuffer(log_buffer,
              "----------------------------------------------------------------"
              "--------------------------------------------------------");
  for (int op = 0; op < static_cast<int>(InternalOpType::INTERNAL_OP_ENUM_MAX);
       op++) {
    // NOTE(review): GetAndResetStats presumably returns the accumulated
    // value and zeroes the counter -- confirm against its definition.
    LogToBuffer(
        log_buffer,
        "%s %5d %10.1f %10.1f %10.1f %10.1f %8d %8d %10.1f %10.1f %10.1f",
        internal_op_names[op].c_str(),
        GetAndResetStats(&internal_op_stats_[op], InternalOpStatsType::COUNT),
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::BYTES_READ) /
            GB,
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::BYTES_WRITTEN) /
            GB,
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::IO_BYTES_READ) /
            GB,
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::IO_BYTES_WRITTEN) /
            GB,
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::INPUT_FILE_NUM),
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::OUTPUT_FILE_NUM),
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::GC_SAMPLING_MICROS) /
            SECOND,
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::GC_READ_LSM_MICROS) /
            SECOND,
        GetAndResetStats(&internal_op_stats_[op],
                         InternalOpStatsType::GC_UPDATE_LSM_MICROS) /
            SECOND);
  }
}
} // namespace titandb
} // namespace rocksdb
This diff is collapsed.
#include "util.h"
#include "util/stop_watch.h"
namespace rocksdb {
namespace titandb {
......@@ -157,5 +159,12 @@ void UnrefCacheHandle(void* arg1, void* arg2) {
cache->Release(h);
}
// Syncs the Titan MANIFEST file and records the elapsed time under the
// TITAN_MANIFEST_FILE_SYNC_MICROS histogram.
Status SyncTitanManifest(Env* env, TitanStats* stats,
                         const ImmutableDBOptions* db_options,
                         WritableFileWriter* file) {
  // NOTE(review): StopWatch presumably reports elapsed micros when it
  // leaves scope, so the measurement covers the whole Sync() call.
  StopWatch sync_timer(env, stats,
                       TitanStats::TITAN_MANIFEST_FILE_SYNC_MICROS);
  Status sync_status = file->Sync(db_options->use_fsync);
  return sync_status;
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "options/db_options.h"
#include "rocksdb/cache.h"
#include "util/compression.h"
#include "util/file_reader_writer.h"
#include "titan_stats.h"
namespace rocksdb {
namespace titandb {
......@@ -71,5 +75,9 @@ void DeleteCacheValue(const Slice&, void* value) {
delete reinterpret_cast<T*>(value);
}
Status SyncTitanManifest(Env* env, TitanStats* stats,
const ImmutableDBOptions* db_options,
WritableFileWriter* file);
} // namespace titandb
} // namespace rocksdb
......@@ -5,13 +5,6 @@
namespace rocksdb {
namespace titandb {
// Field tags identifying each record type in an encoded VersionEdit.
enum Tag {
kNextFileNumber = 1,
kColumnFamilyID = 10,
kAddedBlobFile = 11,
kDeletedBlobFile = 12,
};
void VersionEdit::EncodeTo(std::string* dst) const {
if (has_next_file_number_) {
PutVarint32Varint64(dst, kNextFileNumber, next_file_number_);
......@@ -20,7 +13,7 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32Varint32(dst, kColumnFamilyID, column_family_id_);
for (auto& file : added_files_) {
PutVarint32(dst, kAddedBlobFile);
PutVarint32(dst, kAddedBlobFileV2);
file->EncodeTo(dst);
}
for (auto& file : deleted_files_) {
......@@ -33,6 +26,7 @@ Status VersionEdit::DecodeFrom(Slice* src) {
uint32_t tag;
uint64_t file_number;
std::shared_ptr<BlobFileMeta> blob_file;
Status s;
const char* error = nullptr;
while (!error && !src->empty()) {
......@@ -54,12 +48,23 @@ Status VersionEdit::DecodeFrom(Slice* src) {
error = "column family id";
}
break;
// for compatibility issue
case kAddedBlobFile:
blob_file = std::make_shared<BlobFileMeta>();
if (blob_file->DecodeFrom(src).ok()) {
s = blob_file->DecodeFromLegacy(src);
if (s.ok()) {
AddBlobFile(blob_file);
} else {
error = s.ToString().c_str();
}
break;
case kAddedBlobFileV2:
blob_file = std::make_shared<BlobFileMeta>();
s = blob_file->DecodeFrom(src);
if (s.ok()) {
AddBlobFile(blob_file);
} else {
error = "added blob file";
error = s.ToString().c_str();
}
break;
case kDeletedBlobFile:
......
......@@ -10,6 +10,15 @@
namespace rocksdb {
namespace titandb {
// Field tags used when encoding/decoding a VersionEdit record.
enum Tag {
kNextFileNumber = 1,
kColumnFamilyID = 10,
kAddedBlobFile = 11,
kDeletedBlobFile = 12, // Deprecated, leave here for backward compatibility
kAddedBlobFileV2 = 13, // Comparing to kAddedBlobFile, it newly includes
// smallest_key and largest_key of blob file
};
class VersionEdit {
public:
void SetNextFileNumber(uint64_t v) {
......@@ -34,7 +43,8 @@ class VersionEdit {
friend bool operator==(const VersionEdit& lhs, const VersionEdit& rhs);
private:
friend class VersionSet;
friend class BlobFileSet;
friend class VersionTest;
friend class EditCollector;
bool has_next_file_number_{false};
......
#include "blob_file_set.h"
#include "blob_format.h"
#include "edit_collector.h"
#include "testutil.h"
#include "util.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "version_edit.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -28,7 +29,7 @@ class VersionTest : public testing::Test {
TitanCFOptions cf_options_;
std::shared_ptr<BlobFileCache> file_cache_;
std::map<uint32_t, std::shared_ptr<BlobStorage>> column_families_;
std::unique_ptr<VersionSet> vset_;
std::unique_ptr<BlobFileSet> blob_file_set_;
port::Mutex mutex_;
std::string dbname_;
Env* env_;
......@@ -48,8 +49,8 @@ class VersionTest : public testing::Test {
DeleteDir(env_, db_options_.dirname);
env_->CreateDirIfMissing(db_options_.dirname);
vset_.reset(new VersionSet(db_options_, nullptr));
ASSERT_OK(vset_->Open({}));
blob_file_set_.reset(new BlobFileSet(db_options_, nullptr));
ASSERT_OK(blob_file_set_->Open({}));
column_families_.clear();
// Sets up some column families.
for (uint32_t id = 0; id < 10; id++) {
......@@ -59,14 +60,14 @@ class VersionTest : public testing::Test {
column_families_.emplace(id, storage);
storage.reset(
new BlobStorage(db_options_, cf_options_, id, file_cache_, nullptr));
vset_->column_families_.emplace(id, storage);
blob_file_set_->column_families_.emplace(id, storage);
}
}
void AddBlobFiles(uint32_t cf_id, uint64_t start, uint64_t end) {
auto storage = column_families_[cf_id];
for (auto i = start; i < end; i++) {
auto file = std::make_shared<BlobFileMeta>(i, i);
auto file = std::make_shared<BlobFileMeta>(i, i, 0, 0, "", "");
storage->files_.emplace(i, file);
}
}
......@@ -83,9 +84,9 @@ class VersionTest : public testing::Test {
for (auto& edit : edits) {
ASSERT_OK(collector.AddEdit(edit));
}
ASSERT_OK(collector.Seal(*vset_.get()));
ASSERT_OK(collector.Apply(*vset_.get()));
for (auto& it : vset_->column_families_) {
ASSERT_OK(collector.Seal(*blob_file_set_.get()));
ASSERT_OK(collector.Apply(*blob_file_set_.get()));
for (auto& it : blob_file_set_->column_families_) {
auto& storage = column_families_[it.first];
// ignore obsolete file
auto size = 0;
......@@ -104,7 +105,21 @@ class VersionTest : public testing::Test {
}
// Asserts the number of column families currently tracked by the set.
void CheckColumnFamiliesSize(uint64_t size) {
ASSERT_EQ(blob_file_set_->column_families_.size(), size);
}
// Encodes `edit` in the legacy (pre-V2) format: added files carry the
// kAddedBlobFile tag with only file number and size (no key range).
void LegacyEncode(const VersionEdit& edit, std::string* dst) {
PutVarint32Varint32(dst, Tag::kColumnFamilyID, edit.column_family_id_);
for (auto& file : edit.added_files_) {
PutVarint32(dst, Tag::kAddedBlobFile);
PutVarint64(dst, file->file_number());
PutVarint64(dst, file->file_size());
}
for (auto& file : edit.deleted_files_) {
// obsolete sequence is a non-persistent field, so no need to encode it.
PutVarint32Varint64(dst, Tag::kDeletedBlobFile, file.first);
}
}
};
......@@ -114,8 +129,8 @@ TEST_F(VersionTest, VersionEdit) {
input.SetNextFileNumber(1);
input.SetColumnFamilyID(2);
CheckCodec(input);
auto file1 = std::make_shared<BlobFileMeta>(3, 4);
auto file2 = std::make_shared<BlobFileMeta>(5, 6);
auto file1 = std::make_shared<BlobFileMeta>(3, 4, 0, 0, "", "");
auto file2 = std::make_shared<BlobFileMeta>(5, 6, 0, 0, "", "");
input.AddBlobFile(file1);
input.AddBlobFile(file2);
input.DeleteBlobFile(7);
......@@ -127,7 +142,7 @@ VersionEdit AddBlobFilesEdit(uint32_t cf_id, uint64_t start, uint64_t end) {
VersionEdit edit;
edit.SetColumnFamilyID(cf_id);
for (auto i = start; i < end; i++) {
auto file = std::make_shared<BlobFileMeta>(i, i);
auto file = std::make_shared<BlobFileMeta>(i, i, 0, 0, "", "");
edit.AddBlobFile(file);
}
return edit;
......@@ -148,8 +163,8 @@ TEST_F(VersionTest, InvalidEdit) {
auto add1_0_4 = AddBlobFilesEdit(1, 0, 4);
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_0_4));
ASSERT_OK(collector.Seal(*vset_.get()));
ASSERT_OK(collector.Apply(*vset_.get()));
ASSERT_OK(collector.Seal(*blob_file_set_.get()));
ASSERT_OK(collector.Apply(*blob_file_set_.get()));
}
// delete nonexistent blobs
......@@ -157,8 +172,8 @@ TEST_F(VersionTest, InvalidEdit) {
auto del1_4_6 = DeleteBlobFilesEdit(1, 4, 6);
EditCollector collector;
ASSERT_OK(collector.AddEdit(del1_4_6));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
ASSERT_NOK(collector.Seal(*blob_file_set_.get()));
ASSERT_NOK(collector.Apply(*blob_file_set_.get()));
}
// add already existing blobs
......@@ -166,8 +181,8 @@ TEST_F(VersionTest, InvalidEdit) {
auto add1_1_3 = AddBlobFilesEdit(1, 1, 3);
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_1_3));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
ASSERT_NOK(collector.Seal(*blob_file_set_.get()));
ASSERT_NOK(collector.Apply(*blob_file_set_.get()));
}
// add same blobs
......@@ -177,8 +192,8 @@ TEST_F(VersionTest, InvalidEdit) {
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_4_5_1));
ASSERT_NOK(collector.AddEdit(add1_4_5_2));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
ASSERT_NOK(collector.Seal(*blob_file_set_.get()));
ASSERT_NOK(collector.Apply(*blob_file_set_.get()));
}
// delete same blobs
......@@ -188,8 +203,8 @@ TEST_F(VersionTest, InvalidEdit) {
EditCollector collector;
ASSERT_OK(collector.AddEdit(del1_3_4_1));
ASSERT_NOK(collector.AddEdit(del1_3_4_2));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
ASSERT_NOK(collector.Seal(*blob_file_set_.get()));
ASSERT_NOK(collector.Apply(*blob_file_set_.get()));
}
}
......@@ -236,38 +251,52 @@ TEST_F(VersionTest, ObsoleteFiles) {
std::map<uint32_t, TitanCFOptions> m;
m.insert({1, TitanCFOptions()});
m.insert({2, TitanCFOptions()});
vset_->AddColumnFamilies(m);
blob_file_set_->AddColumnFamilies(m);
{
auto add1_1_5 = AddBlobFilesEdit(1, 1, 5);
MutexLock l(&mutex_);
vset_->LogAndApply(add1_1_5);
blob_file_set_->LogAndApply(add1_1_5);
}
std::vector<std::string> of;
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
blob_file_set_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 0);
{
auto del1_4_5 = DeleteBlobFilesEdit(1, 4, 5);
MutexLock l(&mutex_);
vset_->LogAndApply(del1_4_5);
blob_file_set_->LogAndApply(del1_4_5);
}
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
blob_file_set_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 1);
std::vector<uint32_t> cfs = {1};
ASSERT_OK(vset_->DropColumnFamilies(cfs, 0));
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_OK(blob_file_set_->DropColumnFamilies(cfs, 0));
blob_file_set_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 1);
CheckColumnFamiliesSize(10);
ASSERT_OK(vset_->DestroyColumnFamily(1));
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_OK(blob_file_set_->MaybeDestroyColumnFamily(1));
blob_file_set_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 4);
CheckColumnFamiliesSize(9);
ASSERT_OK(vset_->DestroyColumnFamily(2));
ASSERT_OK(blob_file_set_->MaybeDestroyColumnFamily(2));
CheckColumnFamiliesSize(8);
}
// Backward compatibility: an edit encoded in the legacy V1 format
// (kAddedBlobFile) must decode successfully, and the decoded edit must
// round-trip through the current codec.
TEST_F(VersionTest, BlobFileMetaV1ToV2) {
VersionEdit edit;
edit.SetColumnFamilyID(1);
edit.AddBlobFile(std::make_shared<BlobFileMeta>(1, 1, 0, 0, "", ""));
edit.DeleteBlobFile(1);
edit.AddBlobFile(std::make_shared<BlobFileMeta>(2, 2, 0, 0, "", ""));
std::string str;
LegacyEncode(edit, &str);
VersionEdit edit1;
ASSERT_OK(DecodeInto(Slice(str), &edit1));
CheckCodec(edit1);
}
} // namespace titandb
} // namespace rocksdb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment