Unverified Commit 79dc247a authored by Connor's avatar Connor Committed by GitHub

Store Titan stats in rocksdb statistics (#120)

* store stats in rocksdb statistics
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent f6dc570f
comment:
layout: "diff"
behavior: default
coverage:
status:
project: off
patch: off
......@@ -106,13 +106,13 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
EncodeBlobCache(&cache_key, cache_prefix_, handle.offset);
cache_handle = cache_->Lookup(cache_key);
if (cache_handle) {
RecordTick(stats_, TitanStats::BLOB_CACHE_HIT);
RecordTick(statistics(stats_), BLOB_CACHE_HIT);
auto blob = reinterpret_cast<OwnedSlice*>(cache_->Value(cache_handle));
buffer->PinSlice(*blob, UnrefCacheHandle, cache_.get(), cache_handle);
return DecodeInto(*blob, record);
}
}
RecordTick(stats_, TitanStats::BLOB_CACHE_MISS);
RecordTick(statistics(stats_), BLOB_CACHE_MISS);
OwnedSlice blob;
Status s = ReadRecord(handle, record, &blob);
......
......@@ -101,20 +101,22 @@ BlobGCJob::~BlobGCJob() {
LogFlush(db_options_.info_log.get());
}
// flush metrics
RecordTick(stats_, BLOB_DB_BYTES_READ, metrics_.bytes_read);
RecordTick(stats_, BLOB_DB_BYTES_WRITTEN, metrics_.bytes_written);
RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
RecordTick(statistics(stats_), BLOB_DB_BYTES_READ, metrics_.bytes_read);
RecordTick(statistics(stats_), BLOB_DB_BYTES_WRITTEN, metrics_.bytes_written);
RecordTick(statistics(stats_), BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
metrics_.gc_num_keys_overwritten);
RecordTick(stats_, BLOB_DB_GC_BYTES_OVERWRITTEN,
RecordTick(statistics(stats_), BLOB_DB_GC_BYTES_OVERWRITTEN,
metrics_.gc_bytes_overwritten);
RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
RecordTick(statistics(stats_), BLOB_DB_GC_NUM_KEYS_RELOCATED,
metrics_.gc_num_keys_relocated);
RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED, metrics_.gc_bytes_relocated);
RecordTick(stats_, BLOB_DB_GC_NUM_NEW_FILES, metrics_.gc_num_new_files);
RecordTick(stats_, BLOB_DB_GC_NUM_FILES, metrics_.gc_num_files);
RecordTick(stats_, TitanStats::GC_DISCARDABLE, metrics_.gc_discardable);
RecordTick(stats_, TitanStats::GC_SMALL_FILE, metrics_.gc_small_file);
RecordTick(stats_, TitanStats::GC_SAMPLE, metrics_.gc_sample);
RecordTick(statistics(stats_), BLOB_DB_GC_BYTES_RELOCATED,
metrics_.gc_bytes_relocated);
RecordTick(statistics(stats_), BLOB_DB_GC_NUM_NEW_FILES,
metrics_.gc_num_new_files);
RecordTick(statistics(stats_), BLOB_DB_GC_NUM_FILES, metrics_.gc_num_files);
RecordTick(statistics(stats_), GC_DISCARDABLE, metrics_.gc_discardable);
RecordTick(statistics(stats_), GC_SMALL_FILE, metrics_.gc_small_file);
RecordTick(statistics(stats_), GC_SAMPLE, metrics_.gc_sample);
}
Status BlobGCJob::Prepare() {
......@@ -394,7 +396,7 @@ Status BlobGCJob::InstallOutputBlobFiles() {
builder.second->GetLargestKey());
file->set_live_data_size(builder.second->live_data_size());
file->FileStateTransit(BlobFileMeta::FileEvent::kGCOutput);
RecordInHistogram(stats_, TitanStats::GC_OUTPUT_FILE_SIZE,
RecordInHistogram(statistics(stats_), GC_OUTPUT_FILE_SIZE,
file->file_size());
if (!tmp.empty()) {
tmp.append(" ");
......@@ -526,7 +528,7 @@ Status BlobGCJob::DeleteInputBlobFiles() {
Slice(file->smallest_key()).ToString(true).c_str(),
Slice(file->largest_key()).ToString(true).c_str());
metrics_.gc_num_files++;
RecordInHistogram(stats_, TitanStats::GC_INPUT_FILE_SIZE,
RecordInHistogram(statistics(stats_), GC_INPUT_FILE_SIZE,
file->file_size());
edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
}
......
......@@ -32,7 +32,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
}
auto blob_file = blob_storage->FindFile(gc_score.file_number).lock();
if (!CheckBlobFile(blob_file.get())) {
RecordTick(stats_, TitanStats::GC_NO_NEED, 1);
RecordTick(statistics(stats_), GC_NO_NEED, 1);
// Skip this file id this file is being GCed
// or this file had been GCed
ROCKS_LOG_INFO(db_options_.info_log, "Blob file %" PRIu64 " no need gc",
......@@ -53,7 +53,7 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
next_gc_size += blob_file->file_size();
if (next_gc_size > cf_options_.min_gc_batch_size) {
maybe_continue_next_time = true;
RecordTick(stats_, TitanStats::GC_REMAIN, 1);
RecordTick(statistics(stats_), GC_REMAIN, 1);
ROCKS_LOG_INFO(db_options_.info_log,
"remain more than %" PRIu64
" bytes to be gc and trigger after this gc",
......
......@@ -18,6 +18,7 @@
#include "db_iter.h"
#include "table_factory.h"
#include "titan_build_version.h"
#include "titan_stats.h"
namespace rocksdb {
namespace titandb {
......@@ -56,9 +57,9 @@ class TitanDBImpl::FileManager : public BlobFileManager {
VersionEdit edit;
edit.SetColumnFamilyID(cf_id);
for (auto& file : files) {
RecordTick(db_->stats_.get(), BLOB_DB_BLOB_FILE_SYNCED);
RecordTick(statistics(db_->stats_.get()), BLOB_DB_BLOB_FILE_SYNCED);
{
StopWatch sync_sw(db_->env_, db_->stats_.get(),
StopWatch sync_sw(db_->env_, statistics(db_->stats_.get()),
BLOB_DB_BLOB_FILE_SYNC_MICROS);
s = file.second->GetFile()->Sync(false);
}
......@@ -141,6 +142,7 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
}
dirname_ = db_options_.dirname;
if (db_options_.statistics != nullptr) {
db_options_.statistics = titandb::CreateDBStatistics();
stats_.reset(new TitanStats(db_options_.statistics.get()));
}
blob_manager_.reset(new FileManager(this));
......@@ -597,8 +599,8 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
nullptr /*read_callback*/, &is_blob_index);
if (!s.ok() || !is_blob_index) return s;
StopWatch get_sw(env_, stats_.get(), BLOB_DB_GET_MICROS);
RecordTick(stats_.get(), BLOB_DB_NUM_GET);
StopWatch get_sw(env_, statistics(stats_.get()), BLOB_DB_GET_MICROS);
RecordTick(statistics(stats_.get()), BLOB_DB_NUM_GET);
BlobIndex index;
s = index.DecodeFrom(value);
......@@ -616,10 +618,11 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
mutex_.Unlock();
if (storage) {
StopWatch read_sw(env_, stats_.get(), BLOB_DB_BLOB_FILE_READ_MICROS);
StopWatch read_sw(env_, statistics(stats_.get()),
BLOB_DB_BLOB_FILE_READ_MICROS);
s = storage->Get(options, index, &record, &buffer);
RecordTick(stats_.get(), BLOB_DB_NUM_KEYS_READ);
RecordTick(stats_.get(), BLOB_DB_BLOB_FILE_BYTES_READ,
RecordTick(statistics(stats_.get()), BLOB_DB_NUM_KEYS_READ);
RecordTick(statistics(stats_.get()), BLOB_DB_BLOB_FILE_BYTES_READ,
index.blob_handle.size);
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
......
......@@ -161,7 +161,7 @@ void TitanDBImpl::BackgroundCallGC() {
Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
uint32_t column_family_id) {
mutex_.AssertHeld();
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);
StopWatch gc_sw(env_, statistics(stats_.get()), BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc;
bool gc_merge_rewrite = false;
......@@ -217,7 +217,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
if (blob_gc->trigger_next() &&
(bg_gc_scheduled_ - 1 + gc_queue_.size() <
2 * static_cast<uint32_t>(db_options_.max_background_gc))) {
RecordTick(stats_.get(), TitanStats::GC_TRIGGER_NEXT, 1);
RecordTick(statistics(stats_.get()), GC_TRIGGER_NEXT, 1);
// There is still data remained to be GCed
// and the queue is not overwhelmed
// then put this cf to GC queue for next GC
......@@ -226,11 +226,11 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
}
if (s.ok()) {
RecordTick(stats_.get(), TitanStats::GC_SUCCESS, 1);
RecordTick(statistics(stats_.get()), GC_SUCCESS, 1);
// Done
} else {
SetBGError(s);
RecordTick(stats_.get(), TitanStats::GC_FAIL, 1);
RecordTick(statistics(stats_.get()), GC_FAIL, 1);
ROCKS_LOG_WARN(db_options_.info_log, "Titan GC error: %s",
s.ToString().c_str());
}
......
......@@ -46,36 +46,36 @@ class TitanDBIterator : public Iterator {
void SeekToFirst() override {
iter_->SeekToFirst();
if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(true);
RecordTick(stats_, BLOB_DB_NUM_SEEK);
RecordTick(statistics(stats_), BLOB_DB_NUM_SEEK);
}
}
void SeekToLast() override {
iter_->SeekToLast();
if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(false);
RecordTick(stats_, BLOB_DB_NUM_SEEK);
RecordTick(statistics(stats_), BLOB_DB_NUM_SEEK);
}
}
void Seek(const Slice& target) override {
iter_->Seek(target);
if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(true);
RecordTick(stats_, BLOB_DB_NUM_SEEK);
RecordTick(statistics(stats_), BLOB_DB_NUM_SEEK);
}
}
void SeekForPrev(const Slice& target) override {
iter_->SeekForPrev(target);
if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(false);
RecordTick(stats_, BLOB_DB_NUM_SEEK);
RecordTick(statistics(stats_), BLOB_DB_NUM_SEEK);
}
}
......@@ -83,9 +83,9 @@ class TitanDBIterator : public Iterator {
assert(Valid());
iter_->Next();
if (ShouldGetBlobValue()) {
StopWatch next_sw(env_, stats_, BLOB_DB_NEXT_MICROS);
StopWatch next_sw(env_, statistics(stats_), BLOB_DB_NEXT_MICROS);
GetBlobValue(true);
RecordTick(stats_, BLOB_DB_NUM_NEXT);
RecordTick(statistics(stats_), BLOB_DB_NUM_NEXT);
}
}
......@@ -93,9 +93,9 @@ class TitanDBIterator : public Iterator {
assert(Valid());
iter_->Prev();
if (ShouldGetBlobValue()) {
StopWatch prev_sw(env_, stats_, BLOB_DB_PREV_MICROS);
StopWatch prev_sw(env_, statistics(stats_), BLOB_DB_PREV_MICROS);
GetBlobValue(false);
RecordTick(stats_, BLOB_DB_NUM_PREV);
RecordTick(statistics(stats_), BLOB_DB_NUM_PREV);
}
}
......
......@@ -113,7 +113,8 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
std::string* index_value) {
if (!ok()) return;
StopWatch write_sw(db_options_.env, stats_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
StopWatch write_sw(db_options_.env, statistics(stats_),
BLOB_DB_BLOB_FILE_WRITE_MICROS);
if (!blob_builder_) {
status_ = blob_manager_->NewFile(&blob_handle_);
......@@ -125,9 +126,9 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
new BlobFileBuilder(db_options_, cf_options_, blob_handle_->GetFile()));
}
RecordTick(stats_, BLOB_DB_NUM_KEYS_WRITTEN);
RecordInHistogram(stats_, BLOB_DB_KEY_SIZE, key.size());
RecordInHistogram(stats_, BLOB_DB_VALUE_SIZE, value.size());
RecordTick(statistics(stats_), BLOB_DB_NUM_KEYS_WRITTEN);
RecordInHistogram(statistics(stats_), BLOB_DB_KEY_SIZE, key.size());
RecordInHistogram(statistics(stats_), BLOB_DB_VALUE_SIZE, value.size());
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_SIZE, value.size());
bytes_written_ += key.size() + value.size();
......@@ -137,7 +138,8 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
record.value = value;
index.file_number = blob_handle_->GetNumber();
blob_builder_->Add(record, &index.blob_handle);
RecordTick(stats_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, index.blob_handle.size);
RecordTick(statistics(stats_), BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
index.blob_handle.size);
bytes_written_ += record.size();
if (ok()) {
index.EncodeTo(index_value);
......
#include "titan_stats.h"
#include "titan/db.h"
#include "monitoring/statistics_impl.h"
#include <map>
#include <string>
namespace rocksdb {
namespace titandb {
std::shared_ptr<Statistics> CreateDBStatistics() {
return rocksdb::CreateDBStatistics<TITAN_TICKER_ENUM_MAX,
TITAN_HISTOGRAM_ENUM_MAX>();
}
static const std::string titandb_prefix = "rocksdb.titandb.";
static const std::string live_blob_size = "live-blob-size";
......
......@@ -8,14 +8,16 @@
#include "logging/log_buffer.h"
#include "monitoring/histogram.h"
#include "monitoring/statistics.h"
#include "rocksdb/iostats_context.h"
#include "rocksdb/statistics.h"
#include "titan/options.h"
namespace rocksdb {
namespace titandb {
std::shared_ptr<Statistics> CreateDBStatistics();
enum class InternalOpStatsType : int {
COUNT = 0,
BYTES_READ,
......@@ -134,34 +136,34 @@ class TitanInternalStats {
internal_op_stats_;
};
class TitanStats : public Statistics {
public:
enum TickerType : uint32_t {
BLOB_CACHE_HIT = TICKER_ENUM_MAX + 1,
BLOB_CACHE_MISS,
enum TickerType : uint32_t {
BLOB_CACHE_HIT = TICKER_ENUM_MAX,
BLOB_CACHE_MISS,
GC_NO_NEED,
GC_REMAIN,
GC_NO_NEED,
GC_REMAIN,
GC_DISCARDABLE,
GC_SAMPLE,
GC_SMALL_FILE,
GC_DISCARDABLE,
GC_SAMPLE,
GC_SMALL_FILE,
GC_FAIL,
GC_SUCCESS,
GC_TRIGGER_NEXT,
GC_FAIL,
GC_SUCCESS,
GC_TRIGGER_NEXT,
INTERNAL_TICKER_ENUM_MAX,
};
TITAN_TICKER_ENUM_MAX,
};
enum HistogramType : uint32_t {
TITAN_MANIFEST_FILE_SYNC_MICROS = HISTOGRAM_ENUM_MAX + 1,
GC_INPUT_FILE_SIZE,
GC_OUTPUT_FILE_SIZE,
enum HistogramType : uint32_t {
TITAN_MANIFEST_FILE_SYNC_MICROS = HISTOGRAM_ENUM_MAX,
GC_INPUT_FILE_SIZE,
GC_OUTPUT_FILE_SIZE,
INTERNAL_HISTOGRAM_ENUM_MAX,
};
TITAN_HISTOGRAM_ENUM_MAX,
};
class TitanStats {
public:
TitanStats(Statistics* stats) : stats_(stats) {}
// TODO: Initialize corresponding internal stats struct for Column families
......@@ -173,6 +175,8 @@ class TitanStats : public Statistics {
return Status::OK();
}
Statistics* statistics() { return stats_; }
TitanInternalStats* internal_stats(uint32_t cf_id) {
auto p = internal_stats_.find(cf_id);
if (p == internal_stats_.end()) {
......@@ -184,103 +188,26 @@ class TitanStats : public Statistics {
void DumpInternalOpStats(uint32_t cf_id, const std::string& cf_name);
uint64_t getTickerCount(uint32_t tickerType) const {
if (stats_ == nullptr) return 0;
if (tickerType > TICKER_ENUM_MAX) {
return tickers_[tickerType - (TICKER_ENUM_MAX + 1)].load(
std::memory_order_relaxed);
}
return stats_->getTickerCount(tickerType);
}
void histogramData(uint32_t type, HistogramData* const data) const {
if (stats_ == nullptr) return;
if (type > HISTOGRAM_ENUM_MAX) {
return histograms_[type - (HISTOGRAM_ENUM_MAX + 1)].Data(data);
}
return stats_->histogramData(type, data);
}
std::string getHistogramString(uint32_t type) const {
return stats_->getHistogramString(type);
}
void recordTick(uint32_t tickerType, uint64_t count = 0) {
if (stats_ == nullptr) return;
if (tickerType > TICKER_ENUM_MAX) {
tickers_[tickerType - (TICKER_ENUM_MAX + 1)].fetch_add(
count, std::memory_order_relaxed);
} else {
stats_->recordTick(tickerType, count);
}
}
void setTickerCount(uint32_t tickerType, uint64_t count) {
if (stats_ == nullptr) return;
if (tickerType > TICKER_ENUM_MAX) {
tickers_[tickerType - (TICKER_ENUM_MAX + 1)].store(
count, std::memory_order_relaxed);
} else {
stats_->setTickerCount(tickerType, count);
}
}
uint64_t getAndResetTickerCount(uint32_t tickerType) {
if (stats_ == nullptr) return 0;
if (tickerType > TICKER_ENUM_MAX) {
return tickers_[tickerType - (TICKER_ENUM_MAX + 1)].exchange(
0, std::memory_order_relaxed);
}
return stats_->getAndResetTickerCount(tickerType);
}
void measureTime(uint32_t histogramType, uint64_t time) {
if (stats_ == nullptr) return;
if (histogramType > HISTOGRAM_ENUM_MAX) {
histograms_[histogramType - (HISTOGRAM_ENUM_MAX + 1)].Add(time);
} else {
stats_->measureTime(histogramType, time);
}
}
// Resets all ticker and histogram stats
virtual Status Reset() {
Status Reset() {
for (auto& p : internal_stats_) {
p.second->Clear();
}
for (uint32_t type = TICKER_ENUM_MAX; type < INTERNAL_TICKER_ENUM_MAX;
type++) {
tickers_[type].store(0, std::memory_order_relaxed);
}
for (uint32_t type = HISTOGRAM_ENUM_MAX; type < INTERNAL_HISTOGRAM_ENUM_MAX;
type++) {
histograms_[type].Clear();
}
return stats_->Reset();
}
// String representation of the statistic object.
virtual std::string ToString() const { return stats_->ToString(); }
// Override this function to disable particular histogram collection
virtual bool HistEnabledForType(uint32_t type) const {
return type < INTERNAL_HISTOGRAM_ENUM_MAX;
}
private:
// RocksDB statistics
Statistics* stats_ = nullptr;
std::unordered_map<uint32_t, std::shared_ptr<TitanInternalStats>>
internal_stats_;
std::array<std::atomic<uint64_t>,
TickerType::INTERNAL_TICKER_ENUM_MAX - TICKER_ENUM_MAX>
tickers_;
std::array<HistogramImpl, INTERNAL_HISTOGRAM_ENUM_MAX - HISTOGRAM_ENUM_MAX>
histograms_;
};
// Utility functions for RocksDB stats types
inline Statistics* statistics(TitanStats* stats) {
return (stats) ? stats->statistics() : nullptr;
}
// Utility functions for Titan ticker and histogram stats types
inline void ResetStats(TitanStats* stats, uint32_t cf_id,
TitanInternalStats::StatsType type) {
......
......@@ -162,7 +162,7 @@ void UnrefCacheHandle(void* arg1, void* arg2) {
Status SyncTitanManifest(Env* env, TitanStats* stats,
const ImmutableDBOptions* db_options,
WritableFileWriter* file) {
StopWatch sw(env, stats, TitanStats::TITAN_MANIFEST_FILE_SYNC_MICROS);
StopWatch sw(env, statistics(stats), TITAN_MANIFEST_FILE_SYNC_MICROS);
return file->Sync(db_options->use_fsync);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment