Unverified Commit 468ddc97 authored by yiwu-arbug's avatar yiwu-arbug Committed by GitHub

Add internal operation stats and dump to info log periodically (#62)

Summary:
Add stats for read/write bytes and input/output file counts of internal operations, broken down by operation type (flush, compaction or GC). Later, after we introduce vtable, we can further break down compaction into top-level compaction, L_n-1 compaction and L_n compaction.

Test Plan:
Run db_bench and get sample output:
```
2019/08/29-23:49:29.593805 7f6fd97fa700 (Original Log Time 2019/08/29-23:49:29.593742) Titan internal stats for column family [default]:
2019/08/29-23:49:29.593813 7f6fd97fa700 (Original Log Time 2019/08/29-23:49:29.593753) OP          COUNT  READ(GB)  WRITE(GB) IO_READ(GB) IO_WRITE(GB)  FILE_IN FILE_OUT
2019/08/29-23:49:29.593821 7f6fd97fa700 (Original Log Time 2019/08/29-23:49:29.593760) ---------------------------------------------------------------------------------
2019/08/29-23:49:29.593830 7f6fd97fa700 (Original Log Time 2019/08/29-23:49:29.593768) Flush          3        0.0       34.7         0.0         18.0        0        3
2019/08/29-23:49:29.593839 7f6fd97fa700 (Original Log Time 2019/08/29-23:49:29.593784) Compaction     1        0.0        0.0         0.0          0.0        0        0
2019/08/29-23:49:29.593848 7f6fd97fa700 (Original Log Time 2019/08/29-23:49:29.593793) GC             1      360.9        1.3       354.3          2.8        8        0
```
parent c99cf9d2
......@@ -96,6 +96,7 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
table_builder_test
thread_safety_test
titan_db_test
titan_options_test
util_test
version_test)
set(TEST_LIBS
......
......@@ -29,7 +29,13 @@ struct TitanDBOptions : public DBOptions {
// How often to schedule deletion of obsolete blob files, in seconds.
//
// Default: 10
uint32_t purge_obsolete_files_period{10}; // 10s
uint32_t purge_obsolete_files_period_sec{10}; // 10s
// If non-zero, dump titan internal stats to info log every
// titan_stats_dump_period_sec.
//
// Default: 600 (10 min)
uint32_t titan_stats_dump_period_sec{600};
TitanDBOptions() = default;
explicit TitanDBOptions(const DBOptions& options) : DBOptions(options) {}
......
......@@ -27,6 +27,8 @@ struct BlobRecord {
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
size_t size() const { return key.size() + value.size(); }
friend bool operator==(const BlobRecord& lhs, const BlobRecord& rhs);
};
......
......@@ -112,7 +112,10 @@ BlobGCJob::~BlobGCJob() {
RecordTick(stats_, BLOB_DB_GC_NUM_FILES, metrics_.blob_db_gc_num_files);
}
Status BlobGCJob::Prepare() { return Status::OK(); }
// Captures the current IOStatsContext byte counters as the baseline against
// which this job's IO is later measured. Always returns OK.
Status BlobGCJob::Prepare() {
  uint64_t* const read_baseline = &prev_bytes_read_;
  uint64_t* const written_baseline = &prev_bytes_written_;
  SavePrevIOBytes(read_baseline, written_baseline);
  return Status::OK();
}
Status BlobGCJob::Run() {
Status s = SampleCandidateFiles();
......@@ -331,8 +334,7 @@ Status BlobGCJob::DoRunGC() {
blob_record.value = gc_iter->value();
// count written bytes for new blob record,
// blob index's size is counted in `RewriteValidKeyToLSM`
metrics_.blob_db_bytes_written +=
blob_record.key.size() + blob_record.value.size();
metrics_.blob_db_bytes_written += blob_record.size();
BlobIndex new_blob_index;
new_blob_index.file_number = blob_file_handle->GetNumber();
......@@ -455,6 +457,10 @@ Status BlobGCJob::Finish() {
s = DeleteInputBlobFiles();
}
if (s.ok()) {
UpdateInternalOpStats();
}
return s;
}
......@@ -583,5 +589,34 @@ bool BlobGCJob::IsShutingDown() {
return (shuting_down_ && shuting_down_->load(std::memory_order_acquire));
}
void BlobGCJob::UpdateInternalOpStats() {
if (stats_ == nullptr) {
return;
}
UpdateIOBytes(prev_bytes_read_, prev_bytes_written_, &io_bytes_read_,
&io_bytes_written_);
uint32_t cf_id = blob_gc_->column_family_handle()->GetID();
TitanInternalStats* internal_stats = stats_->internal_stats(cf_id);
if (internal_stats == nullptr) {
return;
}
InternalOpStats* internal_op_stats =
internal_stats->GetInternalOpStatsForType(InternalOpType::GC);
assert(internal_op_stats != nullptr);
AddStats(internal_op_stats, InternalOpStatsType::COUNT);
AddStats(internal_op_stats, InternalOpStatsType::BYTES_READ,
metrics_.blob_db_bytes_read);
AddStats(internal_op_stats, InternalOpStatsType::BYTES_WRITTEN,
metrics_.blob_db_bytes_written);
AddStats(internal_op_stats, InternalOpStatsType::IO_BYTES_READ,
io_bytes_read_);
AddStats(internal_op_stats, InternalOpStatsType::IO_BYTES_WRITTEN,
io_bytes_written_);
AddStats(internal_op_stats, InternalOpStatsType::INPUT_FILE_NUM,
metrics_.blob_db_gc_num_files);
AddStats(internal_op_stats, InternalOpStatsType::OUTPUT_FILE_NUM,
metrics_.blob_db_gc_num_new_files);
}
} // namespace titandb
} // namespace rocksdb
......@@ -40,6 +40,8 @@ class BlobGCJob {
class GarbageCollectionWriteCallback;
friend class BlobGCJobTest;
void UpdateInternalOpStats();
BlobGC* blob_gc_;
DB* base_db_;
DBImpl* base_db_impl_;
......@@ -72,6 +74,11 @@ class BlobGCJob {
uint64_t blob_db_gc_num_files = 0;
} metrics_;
uint64_t prev_bytes_read_ = 0;
uint64_t prev_bytes_written_ = 0;
uint64_t io_bytes_read_ = 0;
uint64_t io_bytes_written_ = 0;
Status SampleCandidateFiles();
Status DoSample(const BlobFileMeta* file, bool* selected);
Status DoRunGC();
......
......@@ -6,7 +6,9 @@
#include <inttypes.h>
#include "logging/log_buffer.h"
#include "port/port.h"
#include "util/autovector.h"
#include "base_db_listener.h"
#include "blob_file_builder.h"
......@@ -141,17 +143,35 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
TitanDBImpl::~TitanDBImpl() { Close(); }
// Starts the periodic background threads if they are not already running:
//  - "titanbg": purges obsolete blob files every
//    purge_obsolete_files_period_sec seconds;
//  - "titanst": dumps internal stats every titan_stats_dump_period_sec
//    seconds, only when that option is non-zero.
void TitanDBImpl::StartBackgroundTasks() {
  // The period options are uint32_t seconds. Widen to 64 bits before
  // converting to microseconds; otherwise the product wraps around for
  // periods longer than ~4294 seconds.
  constexpr uint64_t kMicrosPerSecond = 1000 * 1000;

  if (thread_purge_obsolete_ == nullptr) {
    const uint64_t purge_period_us =
        static_cast<uint64_t>(db_options_.purge_obsolete_files_period_sec) *
        kMicrosPerSecond;
    thread_purge_obsolete_.reset(new rocksdb::RepeatableThread(
        [this]() { TitanDBImpl::PurgeObsoleteFiles(); }, "titanbg", env_,
        purge_period_us));
  }
  if (thread_dump_stats_ == nullptr &&
      db_options_.titan_stats_dump_period_sec > 0) {
    const uint64_t dump_period_us =
        static_cast<uint64_t>(db_options_.titan_stats_dump_period_sec) *
        kMicrosPerSecond;
    thread_dump_stats_.reset(new rocksdb::RepeatableThread(
        [this]() { TitanDBImpl::DumpStats(); }, "titanst", env_,
        dump_period_us));
  }
}
// Checks the DB options before the database is opened.
// Returns InvalidArgument when purge_obsolete_files_period_sec is zero,
// otherwise OK.
Status TitanDBImpl::ValidateOptions() const {
  const bool has_purge_period =
      db_options_.purge_obsolete_files_period_sec != 0;
  if (has_purge_period) {
    return Status::OK();
  }
  return Status::InvalidArgument(
      "Require non-zero purge_obsolete_files_period_sec");
}
Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles) {
Status s = ValidateOptions();
if (!s.ok()) {
return s;
}
// Sets up directories for base DB and Titan.
Status s = env_->CreateDirIfMissing(dbname_);
s = env_->CreateDirIfMissing(dbname_);
if (!s.ok()) return s;
if (!db_options_.info_log) {
s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
......@@ -199,21 +219,22 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
for (size_t i = 0; i < descs.size(); i++) {
auto handle = (*handles)[i];
uint32_t cf_id = handle->GetID();
std::string cf_name = handle->GetName();
column_families.emplace(cf_id, descs[i].options);
db_->DestroyColumnFamilyHandle(handle);
// Replaces the provided table factory with TitanTableFactory.
// While we need to preserve original table_factory for GetOptions.
auto& base_table_factory = base_descs[i].options.table_factory;
assert(base_table_factory != nullptr);
immutable_cf_options_.emplace(cf_id,
ImmutableTitanCFOptions(descs[i].options));
mutable_cf_options_.emplace(cf_id,
MutableTitanCFOptions(descs[i].options));
base_table_factory_[cf_id] = base_table_factory;
titan_table_factory_[cf_id] = std::make_shared<TitanTableFactory>(
auto titan_table_factory = std::make_shared<TitanTableFactory>(
db_options_, descs[i].options, blob_manager_, &mutex_, vset_.get(),
stats_.get());
base_descs[i].options.table_factory = titan_table_factory_[cf_id];
cf_info_.emplace(cf_id,
TitanColumnFamilyInfo(
{cf_name, ImmutableTitanCFOptions(descs[i].options),
MutableTitanCFOptions(descs[i].options),
base_table_factory, titan_table_factory}));
base_descs[i].options.table_factory = titan_table_factory;
// Add TableProperties for collecting statistics GC
base_descs[i].options.table_properties_collector_factories.emplace_back(
std::make_shared<BlobFileSizeCollectorFactory>());
......@@ -337,14 +358,15 @@ Status TitanDBImpl::CreateColumnFamilies(
{
MutexLock l(&mutex_);
for (size_t i = 0; i < descs.size(); i++) {
uint32_t cf_id = (*handles)[i]->GetID();
ColumnFamilyHandle* handle = (*handles)[i];
uint32_t cf_id = handle->GetID();
column_families.emplace(cf_id, descs[i].options);
immutable_cf_options_.emplace(
cf_id, ImmutableTitanCFOptions(descs[i].options));
mutable_cf_options_.emplace(cf_id,
MutableTitanCFOptions(descs[i].options));
base_table_factory_[cf_id] = base_table_factory[i];
titan_table_factory_[cf_id] = titan_table_factory[i];
cf_info_.emplace(
cf_id,
TitanColumnFamilyInfo(
{handle->GetName(), ImmutableTitanCFOptions(descs[i].options),
MutableTitanCFOptions(descs[i].options), base_table_factory[i],
titan_table_factory[i]}));
}
vset_->AddColumnFamilies(column_families);
}
......@@ -378,10 +400,6 @@ Status TitanDBImpl::DropColumnFamilies(
Status s = db_impl_->DropColumnFamilies(handles);
if (s.ok()) {
MutexLock l(&mutex_);
for (auto cf_id : column_families) {
base_table_factory_.erase(cf_id);
titan_table_factory_.erase(cf_id);
}
SequenceNumber obsolete_sequence = db_impl_->GetLatestSequenceNumber();
s = vset_->DropColumnFamilies(column_families, obsolete_sequence);
}
......@@ -408,7 +426,12 @@ Status TitanDBImpl::DestroyColumnFamilyHandle(
if (s.ok()) {
MutexLock l(&mutex_);
// it just changes some marks and doesn't delete blob files physically.
vset_->DestroyColumnFamily(cf_id);
Status destroy_status = vset_->MaybeDestroyColumnFamily(cf_id);
// VersionSet will return NotFound status if the cf is not destroyed.
if (destroy_status.ok()) {
assert(cf_info_.count(cf_id) > 0);
cf_info_.erase(cf_id);
}
}
if (s.ok()) {
ROCKS_LOG_INFO(db_options_.info_log, "Destroyed column family handle [%s].",
......@@ -779,8 +802,8 @@ Options TitanDBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
uint32_t cf_id = column_family->GetID();
MutexLock l(&mutex_);
if (base_table_factory_.count(cf_id) > 0) {
options.table_factory = base_table_factory_.at(cf_id);
if (cf_info_.count(cf_id) > 0) {
options.table_factory = cf_info_.at(cf_id).base_table_factory;
} else {
ROCKS_LOG_ERROR(
db_options_.info_log,
......@@ -824,9 +847,10 @@ Status TitanDBImpl::SetOptions(
uint32_t cf_id = column_family->GetID();
{
MutexLock l(&mutex_);
auto& table_factory = titan_table_factory_[cf_id];
table_factory->SetBlobRunMode(mode);
mutable_cf_options_[cf_id].blob_run_mode = mode;
assert(cf_info_.count(cf_id) > 0);
TitanColumnFamilyInfo& cf_info = cf_info_[cf_id];
cf_info.titan_table_factory->SetBlobRunMode(mode);
cf_info.mutable_cf_options.blob_run_mode = mode;
}
}
return Status::OK();
......@@ -843,9 +867,11 @@ TitanOptions TitanDBImpl::GetTitanOptions(
uint32_t cf_id = column_family->GetID();
{
MutexLock l(&mutex_);
assert(cf_info_.count(cf_id) > 0);
const TitanColumnFamilyInfo& cf_info = cf_info_.at(cf_id);
*static_cast<TitanCFOptions*>(&titan_options) = TitanCFOptions(
static_cast<ColumnFamilyOptions>(base_options),
immutable_cf_options_.at(cf_id), mutable_cf_options_.at(cf_id));
cf_info.immutable_cf_options, cf_info.mutable_cf_options);
}
return titan_options;
}
......@@ -1076,5 +1102,28 @@ Status TitanDBImpl::SetBGError(const Status& s) {
return bg_err;
}
void TitanDBImpl::DumpStats() {
if (stats_ == nullptr) {
return;
}
LogBuffer log_buffer(InfoLogLevel::HEADER_LEVEL, db_options_.info_log.get());
{
MutexLock l(&mutex_);
for (auto& cf : cf_info_) {
TitanInternalStats* internal_stats = stats_->internal_stats(cf.first);
if (internal_stats == nullptr) {
ROCKS_LOG_WARN(db_options_.info_log,
"Column family [%s] missing internal stats.",
cf.second.name.c_str());
continue;
}
LogToBuffer(&log_buffer, "Titan internal stats for column family [%s]:",
cf.second.name.c_str());
internal_stats->DumpAndResetInternalOpStats(&log_buffer);
}
}
log_buffer.FlushBufferToLog();
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "blob_file_manager.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/statistics.h"
#include "util/repeatable_thread.h"
#include "blob_file_manager.h"
#include "table_factory.h"
#include "titan/db.h"
#include "util/repeatable_thread.h"
#include "titan_stats.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
// Per-column-family bookkeeping kept by TitanDBImpl, keyed by column family
// id in cf_info_.
// NOTE: instances are brace-initialized positionally, so the member order is
// part of the contract; do not reorder the fields.
struct TitanColumnFamilyInfo {
// Column family name, as returned by ColumnFamilyHandle::GetName().
const std::string name;
// Titan options fixed at column family creation.
const ImmutableTitanCFOptions immutable_cf_options;
// Titan options that can change at runtime (e.g. blob_run_mode via
// SetOptions).
MutableTitanCFOptions mutable_cf_options;
// The table factory originally supplied by the user; reported by GetOptions.
std::shared_ptr<TableFactory> base_table_factory;
// The Titan table factory installed in place of the user's one.
std::shared_ptr<TitanTableFactory> titan_table_factory;
};
class TitanDBImpl : public TitanDB {
public:
TitanDBImpl(const TitanDBOptions& options, const std::string& dbname);
......@@ -135,6 +145,8 @@ class TitanDBImpl : public TitanDB {
friend class TitanDBTest;
friend class TitanThreadSafetyTest;
Status ValidateOptions() const;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* handle,
const Slice& key, PinnableSlice* value);
......@@ -196,6 +208,8 @@ class TitanDBImpl : public TitanDB {
bool HasBGError() { return has_bg_error_.load(); }
void DumpStats();
FileLock* lock_{nullptr};
// The lock sequence must be Titan.mutex_.Lock() -> Base DB mutex_.Lock()
// while the unlock sequence must be Base DB mutex.Unlock() ->
......@@ -220,23 +234,15 @@ class TitanDBImpl : public TitanDB {
// is not null.
std::unique_ptr<TitanStats> stats_;
// Guarded by mutex_.
std::unordered_map<uint32_t, ImmutableTitanCFOptions> immutable_cf_options_;
// Guarded by mutex_.
std::unordered_map<uint32_t, MutableTitanCFOptions> mutable_cf_options_;
// Guarded by mutex_.
std::unordered_map<uint32_t, std::shared_ptr<TableFactory>>
base_table_factory_;
// Guarded by mutex_.
std::unordered_map<uint32_t, std::shared_ptr<TitanTableFactory>>
titan_table_factory_;
// Access while holding mutex_ lock or during DB open.
std::unordered_map<uint32_t, TitanColumnFamilyInfo> cf_info_;
// handle for purging obsolete blob files at fixed intervals
std::unique_ptr<RepeatableThread> thread_purge_obsolete_;
// handle for dump internal stats at fixed intervals.
std::unique_ptr<RepeatableThread> thread_dump_stats_;
std::unique_ptr<VersionSet> vset_;
std::set<uint64_t> pending_outputs_;
std::shared_ptr<BlobFileManager> blob_manager_;
......
......@@ -22,8 +22,11 @@ void TitanDBOptions::Dump(Logger* logger) const {
"TitanDBOptions.max_background_gc : %" PRIi32,
max_background_gc);
ROCKS_LOG_HEADER(logger,
"TitanDBOptions.purge_obsolete_files_period: %" PRIu32,
purge_obsolete_files_period);
"TitanDBOptions.purge_obsolete_files_period_sec: %" PRIu32,
purge_obsolete_files_period_sec);
ROCKS_LOG_HEADER(logger,
"TitanDBOptions.titan_stats_dump_period_sec: %" PRIu32,
titan_stats_dump_period_sec);
}
TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts,
......
......@@ -18,6 +18,10 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
return;
}
uint64_t prev_bytes_read = 0;
uint64_t prev_bytes_written = 0;
SavePrevIOBytes(&prev_bytes_read, &prev_bytes_written);
if (ikey.type == kTypeBlobIndex &&
cf_options_.blob_run_mode == TitanBlobRunMode::kFallback) {
// we ingest value from blob file
......@@ -33,14 +37,16 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
auto storage = blob_storage_.lock();
assert(storage != nullptr);
ReadOptions options; // dummy option
Status get_status = storage->Get(options, index, &record, &buffer);
UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
&io_bytes_written_);
if (get_status.ok()) {
ikey.type = kTypeValue;
std::string index_key;
AppendInternalKey(&index_key, ikey);
base_builder_->Add(index_key, record.value);
bytes_read_ += record.size();
} else {
// Get blob value can fail if corresponding blob file has been GC-ed
// deleted. In this case we write the blob index as is to compaction
......@@ -54,6 +60,8 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
// we write to blob file and insert index
std::string index_value;
AddBlob(ikey.user_key, value, &index_value);
UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
&io_bytes_written_);
if (ok()) {
ikey.type = kTypeBlobIndex;
std::string index_key;
......@@ -85,6 +93,7 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
MeasureTime(stats_, BLOB_DB_KEY_SIZE, key.size());
MeasureTime(stats_, BLOB_DB_VALUE_SIZE, value.size());
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_SIZE, value.size());
bytes_written_ += key.size() + value.size();
BlobIndex index;
BlobRecord record;
......@@ -93,6 +102,7 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
index.file_number = blob_handle_->GetNumber();
blob_builder_->Add(record, &index.blob_handle);
RecordTick(stats_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, index.blob_handle.size);
bytes_written_ += record.size();
if (ok()) {
index.EncodeTo(index_value);
}
......@@ -135,6 +145,7 @@ Status TitanTableBuilder::Finish() {
"Titan table builder failed on finish: %s",
status_.ToString().c_str());
}
UpdateInternalOpStats();
return status();
}
......@@ -166,5 +177,33 @@ TableProperties TitanTableBuilder::GetTableProperties() const {
return base_builder_->GetTableProperties();
}
void TitanTableBuilder::UpdateInternalOpStats() {
if (stats_ == nullptr) {
return;
}
TitanInternalStats* internal_stats = stats_->internal_stats(cf_id_);
if (internal_stats == nullptr) {
return;
}
InternalOpType op_type = InternalOpType::COMPACTION;
if (level_ == 0) {
op_type = InternalOpType::FLUSH;
}
InternalOpStats* internal_op_stats =
internal_stats->GetInternalOpStatsForType(op_type);
assert(internal_op_stats != nullptr);
AddStats(internal_op_stats, InternalOpStatsType::COUNT);
AddStats(internal_op_stats, InternalOpStatsType::BYTES_READ, bytes_read_);
AddStats(internal_op_stats, InternalOpStatsType::BYTES_WRITTEN,
bytes_written_);
AddStats(internal_op_stats, InternalOpStatsType::IO_BYTES_READ,
io_bytes_read_);
AddStats(internal_op_stats, InternalOpStatsType::IO_BYTES_WRITTEN,
io_bytes_written_);
if (blob_builder_ != nullptr) {
AddStats(internal_op_stats, InternalOpStatsType::OUTPUT_FILE_NUM);
}
}
} // namespace titandb
} // namespace rocksdb
......@@ -16,13 +16,15 @@ class TitanTableBuilder : public TableBuilder {
const TitanCFOptions& cf_options,
std::unique_ptr<TableBuilder> base_builder,
std::shared_ptr<BlobFileManager> blob_manager,
std::weak_ptr<BlobStorage> blob_storage, TitanStats* stats)
std::weak_ptr<BlobStorage> blob_storage, int level,
TitanStats* stats)
: cf_id_(cf_id),
db_options_(db_options),
cf_options_(cf_options),
base_builder_(std::move(base_builder)),
blob_manager_(blob_manager),
blob_storage_(blob_storage),
level_(level),
stats_(stats) {}
void Add(const Slice& key, const Slice& value) override;
......@@ -46,6 +48,8 @@ class TitanTableBuilder : public TableBuilder {
void AddBlob(const Slice& key, const Slice& value, std::string* index_value);
void UpdateInternalOpStats();
Status status_;
uint32_t cf_id_;
TitanDBOptions db_options_;
......@@ -55,8 +59,14 @@ class TitanTableBuilder : public TableBuilder {
std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<BlobFileBuilder> blob_builder_;
std::weak_ptr<BlobStorage> blob_storage_;
int level_;
TitanStats* stats_;
// counters
uint64_t bytes_read_ = 0;
uint64_t bytes_written_ = 0;
uint64_t io_bytes_read_ = 0;
uint64_t io_bytes_written_ = 0;
};
} // namespace titandb
......
......@@ -29,7 +29,7 @@ TableBuilder* TitanTableFactory::NewTableBuilder(
}
return new TitanTableBuilder(column_family_id, db_options_, cf_options,
std::move(base_builder), blob_manager_,
blob_storage, stats_);
blob_storage, options.level, stats_);
}
std::string TitanTableFactory::GetPrintableTableOptions() const {
......
#include "test_util/testharness.h"
#include "titan/db.h"
namespace rocksdb {
namespace titandb {
// Fixture for tests that exercise TitanDB option validation. Each test gets a
// database rooted at the test temp dir with the Titan directory underneath it;
// the destructor closes the DB (if opened) and removes its files.
class TitanOptionsTest : public testing::Test {
 public:
  TitanOptionsTest() : db_name_(test::TmpDir()) {
    titan_options_.create_if_missing = true;
    titan_options_.dirname = db_name_ + "/titandb";
  }

  ~TitanOptionsTest() {
    Status s = Close();
    assert(s.ok());
    (void)s;  // Avoid an unused-variable warning when asserts are compiled out.
  }

  // Opens the database with the current titan_options_.
  Status Open() { return TitanDB::Open(titan_options_, db_name_, &titan_db); }

  // Deletes every file directly inside `dirname`, then the directory itself.
  // Returns the first error encountered.
  Status DeleteDir(const std::string& dirname) {
    Env* env = Env::Default();
    std::vector<std::string> filenames;
    Status s = env->GetChildren(dirname, &filenames);
    if (!s.ok()) {
      return s;
    }
    for (const auto& fname : filenames) {
      // Some Env implementations list the special entries "." and ".."; they
      // are not files and trying to delete them would fail the cleanup.
      if (fname == "." || fname == "..") {
        continue;
      }
      s = env->DeleteFile(dirname + "/" + fname);
      if (!s.ok()) {
        return s;
      }
    }
    return env->DeleteDir(dirname);
  }

  // Closes and destroys the database if it is open. Safe to call when Open()
  // failed or was never called (returns OK in that case).
  Status Close() {
    if (titan_db == nullptr) {
      return Status::OK();
    }
    Status s = titan_db->Close();
    if (!s.ok()) {
      return s;
    }
    // Close() does not free the object; release it so the test does not leak.
    delete titan_db;
    titan_db = nullptr;
    s = DeleteDir(titan_options_.dirname);
    if (!s.ok()) {
      return s;
    }
    rocksdb::Options opts;
    return rocksdb::DestroyDB(db_name_, opts);
  }

 protected:
  std::string db_name_;
  TitanOptions titan_options_;
  TitanDB* titan_db = nullptr;
};
// Opening with a zero purge period must be rejected as an invalid argument.
TEST_F(TitanOptionsTest, PurgeObsoleteFilesPeriodSec) {
  titan_options_.purge_obsolete_files_period_sec = 0;
  const Status open_status = Open();
  ASSERT_TRUE(open_status.IsInvalidArgument());
}
} // namespace titandb
} // namespace rocksdb
// Test binary entry point: lets googletest consume its command-line flags,
// then runs every registered test and returns the overall result as the
// process exit code.
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -40,5 +40,46 @@ const std::unordered_map<std::string, TitanInternalStats::StatsType>
TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE},
};
// Display names for each InternalOpType, indexed by the enum's integer value
// (FLUSH, COMPACTION, GC). Used as the first column of the stats table written
// by DumpAndResetInternalOpStats.
const std::array<std::string,
static_cast<int>(InternalOpType::INTERNAL_OP_ENUM_MAX)>
TitanInternalStats::internal_op_names = {
"Flush ",
"Compaction",
"GC ",
};
// Appends a table of the per-operation stats (one row per InternalOpType) to
// `log_buffer` and resets every reported counter to zero.
// Byte counters are printed in GB with one decimal; COUNT and the file
// counters are printed as integers.
void TitanInternalStats::DumpAndResetInternalOpStats(LogBuffer* log_buffer) {
  constexpr double GB = 1.0 * 1024 * 1024 * 1024;
  LogToBuffer(log_buffer,
              "OP COUNT READ(GB) WRITE(GB) IO_READ(GB) IO_WRITE(GB) "
              " FILE_IN FILE_OUT");
  LogToBuffer(log_buffer,
              "----------------------------------------------------------------"
              "-----------------");
  for (int op = 0; op < static_cast<int>(InternalOpType::INTERNAL_OP_ENUM_MAX);
       op++) {
    InternalOpStats* op_stats = &internal_op_stats_[op];
    // The counters are uint64_t. Passing them to a "%d" conversion is a
    // printf type mismatch, so convert the integer columns explicitly to
    // unsigned long long and print them with "%llu".
    const auto count = static_cast<unsigned long long>(
        GetAndResetStats(op_stats, InternalOpStatsType::COUNT));
    const double read_gb =
        GetAndResetStats(op_stats, InternalOpStatsType::BYTES_READ) / GB;
    const double write_gb =
        GetAndResetStats(op_stats, InternalOpStatsType::BYTES_WRITTEN) / GB;
    const double io_read_gb =
        GetAndResetStats(op_stats, InternalOpStatsType::IO_BYTES_READ) / GB;
    const double io_write_gb =
        GetAndResetStats(op_stats, InternalOpStatsType::IO_BYTES_WRITTEN) / GB;
    const auto files_in = static_cast<unsigned long long>(
        GetAndResetStats(op_stats, InternalOpStatsType::INPUT_FILE_NUM));
    const auto files_out = static_cast<unsigned long long>(
        GetAndResetStats(op_stats, InternalOpStatsType::OUTPUT_FILE_NUM));
    LogToBuffer(log_buffer,
                "%s %5llu %10.1f %10.1f %10.1f %10.1f %8llu %8llu",
                internal_op_names[op].c_str(), count, read_gb, write_gb,
                io_read_gb, io_write_gb, files_in, files_out);
  }
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/statistics.h"
#include "titan/options.h"
#include <array>
#include <atomic>
#include <map>
#include <string>
#include <unordered_map>
#include "logging/log_buffer.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/statistics.h"
#include "titan/options.h"
namespace rocksdb {
namespace titandb {
// Counters tracked for each internal operation. The values double as indexes
// into an InternalOpStats array; INTERNAL_OP_STATS_ENUM_MAX is the number of
// counters and must stay last.
enum class InternalOpStatsType : int {
  COUNT = 0,
  BYTES_READ = 1,
  BYTES_WRITTEN = 2,
  IO_BYTES_READ = 3,
  IO_BYTES_WRITTEN = 4,
  INPUT_FILE_NUM = 5,
  OUTPUT_FILE_NUM = 6,
  INTERNAL_OP_STATS_ENUM_MAX = 7,
};

// Kinds of internal operation that stats are broken down by. The values are
// used as array indexes; INTERNAL_OP_ENUM_MAX is the number of kinds and must
// stay last.
enum class InternalOpType : int {
  FLUSH = 0,
  COMPACTION = 1,
  GC = 2,
  INTERNAL_OP_ENUM_MAX = 3,
};

// One atomic counter per InternalOpStatsType, for a single operation kind.
using InternalOpStats = std::array<
    std::atomic<uint64_t>,
    static_cast<size_t>(InternalOpStatsType::INTERNAL_OP_STATS_ENUM_MAX)>;
// Titan internal stats does NOT optimize race
// condition by making thread local copies of
// data.
class TitanInternalStats {
public:
enum StatsType {
LIVE_BLOB_SIZE,
LIVE_BLOB_SIZE = 0,
NUM_LIVE_BLOB_FILE,
NUM_OBSOLETE_BLOB_FILE,
LIVE_BLOB_FILE_SIZE,
OBSOLETE_BLOB_FILE_SIZE,
INTERNAL_STATS_ENUM_MAX,
};
TitanInternalStats() { Clear(); }
void Clear() {
for (int i = 0; i < INTERNAL_STATS_ENUM_MAX; i++) {
stats_[i].store(0, std::memory_order_relaxed);
for (int stat = 0; stat < INTERNAL_STATS_ENUM_MAX; stat++) {
stats_[stat].store(0, std::memory_order_relaxed);
}
for (int op = 0;
op < static_cast<int>(InternalOpType::INTERNAL_OP_ENUM_MAX); op++) {
assert(
internal_op_stats_[op].size() ==
static_cast<size_t>(InternalOpStatsType::INTERNAL_OP_STATS_ENUM_MAX));
for (int stat = 0;
stat <
static_cast<int>(InternalOpStatsType::INTERNAL_OP_STATS_ENUM_MAX);
stat++) {
internal_op_stats_[op][stat].store(0, std::memory_order_relaxed);
}
}
}
void ResetStats(StatsType type) {
stats_[type].store(0, std::memory_order_relaxed);
}
void AddStats(StatsType type, uint64_t value) {
auto& v = stats_[type];
v.fetch_add(value, std::memory_order_relaxed);
}
void SubStats(StatsType type, uint64_t value) {
auto& v = stats_[type];
v.fetch_sub(value, std::memory_order_relaxed);
}
bool GetIntProperty(const Slice& property, uint64_t* value) const {
auto p = stats_type_string_map.find(property.ToString());
if (p != stats_type_string_map.end()) {
......@@ -48,6 +94,7 @@ class TitanInternalStats {
}
return false;
}
bool GetStringProperty(const Slice& property, std::string* value) const {
uint64_t int_value;
if (GetIntProperty(property, &int_value)) {
......@@ -57,15 +104,30 @@ class TitanInternalStats {
return false;
}
InternalOpStats* GetInternalOpStatsForType(InternalOpType type) {
return &internal_op_stats_[static_cast<int>(type)];
}
void DumpAndResetInternalOpStats(LogBuffer* log_buffer);
private:
static const std::unordered_map<std::string, TitanInternalStats::StatsType>
stats_type_string_map;
std::atomic<uint64_t> stats_[INTERNAL_STATS_ENUM_MAX];
static const std::array<
std::string, static_cast<int>(InternalOpType::INTERNAL_OP_ENUM_MAX)>
internal_op_names;
std::array<std::atomic<uint64_t>, INTERNAL_STATS_ENUM_MAX> stats_;
std::array<InternalOpStats,
static_cast<size_t>(InternalOpType::INTERNAL_OP_ENUM_MAX)>
internal_op_stats_;
};
class TitanStats {
public:
TitanStats(Statistics* stats) : stats_(stats) {}
// TODO: Initialize corresponding internal stats struct for Column families
// created after DB open.
Status Initialize(std::map<uint32_t, TitanCFOptions> cf_options,
uint32_t default_cf) {
for (auto& opts : cf_options) {
......@@ -74,7 +136,9 @@ class TitanStats {
default_cf_ = default_cf;
return Status::OK();
}
Statistics* statistics() { return stats_; }
TitanInternalStats* internal_stats(uint32_t cf_id) {
auto p = internal_stats_.find(cf_id);
if (p == internal_stats_.end()) {
......@@ -84,6 +148,8 @@ class TitanStats {
}
}
void DumpInternalOpStats(uint32_t cf_id, const std::string& cf_name);
private:
Statistics* stats_ = nullptr;
uint32_t default_cf_ = 0;
......@@ -151,5 +217,50 @@ inline void SubStats(TitanStats* stats, uint32_t cf_id,
}
}
// Atomically reads the counter `type` from `stats` and sets it to zero in the
// same operation. Returns the value read, or 0 when `stats` is null.
inline uint64_t GetAndResetStats(InternalOpStats* stats,
                                 InternalOpStatsType type) {
  if (stats == nullptr) {
    return 0;
  }
  auto& counter = (*stats)[static_cast<int>(type)];
  return counter.exchange(0, std::memory_order_relaxed);
}
// Atomically adds `value` (default 1) to the counter `type` in `stats`.
// Does nothing when `stats` is null.
inline void AddStats(InternalOpStats* stats, InternalOpStatsType type,
                     uint64_t value = 1) {
  if (stats == nullptr) {
    return;
  }
  auto& counter = (*stats)[static_cast<int>(type)];
  counter.fetch_add(value, std::memory_order_relaxed);
}
// Atomically subtracts `value` (default 1) from the counter `type` in `stats`.
// Does nothing when `stats` is null.
inline void SubStats(InternalOpStats* stats, InternalOpStatsType type,
                     uint64_t value = 1) {
  if (stats == nullptr) {
    return;
  }
  auto& counter = (*stats)[static_cast<int>(type)];
  counter.fetch_sub(value, std::memory_order_relaxed);
}
// IOStatsContext helper
inline void SavePrevIOBytes(uint64_t* prev_bytes_read,
uint64_t* prev_bytes_written) {
IOStatsContext* io_stats = get_iostats_context();
if (io_stats != nullptr) {
*prev_bytes_read = io_stats->bytes_read;
*prev_bytes_written = io_stats->bytes_written;
}
}
inline void UpdateIOBytes(uint64_t prev_bytes_read, uint64_t prev_bytes_written,
uint64_t* bytes_read, uint64_t* bytes_written) {
IOStatsContext* io_stats = get_iostats_context();
if (io_stats != nullptr) {
*bytes_read += io_stats->bytes_read - prev_bytes_read;
*bytes_written += io_stats->bytes_written - prev_bytes_written;
}
}
} // namespace titandb
} // namespace rocksdb
......@@ -266,7 +266,7 @@ Status VersionSet::DropColumnFamilies(
return s;
}
Status VersionSet::DestroyColumnFamily(uint32_t cf_id) {
Status VersionSet::MaybeDestroyColumnFamily(uint32_t cf_id) {
obsolete_columns_.erase(cf_id);
auto it = column_families_.find(cf_id);
if (it != column_families_.end()) {
......
......@@ -50,7 +50,7 @@ class VersionSet {
// Destroy the column family. Only after this is called can the obsolete files
// of the dropped column family be physically deleted.
// REQUIRES: mutex is held
Status DestroyColumnFamily(uint32_t cf_id);
Status MaybeDestroyColumnFamily(uint32_t cf_id);
// Allocates a new file number by adding 1 to next_file_number_ and returning
// the result of fetch_add.
// NOTE(review): presumably next_file_number_ is a std::atomic, in which case
// the returned number is the value held before the increment; confirm against
// the member declaration.
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
......
......@@ -260,12 +260,12 @@ TEST_F(VersionTest, ObsoleteFiles) {
ASSERT_EQ(of.size(), 1);
CheckColumnFamiliesSize(10);
ASSERT_OK(vset_->DestroyColumnFamily(1));
ASSERT_OK(vset_->MaybeDestroyColumnFamily(1));
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 4);
CheckColumnFamiliesSize(9);
ASSERT_OK(vset_->DestroyColumnFamily(2));
ASSERT_OK(vset_->MaybeDestroyColumnFamily(2));
CheckColumnFamiliesSize(8);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment