Unverified commit ec440e2a, authored by Xinye Tao, committed by GitHub

Add Titan internal stats and metrics for blob file and live value (#15)

* Add Titan internal stats

* add metrics for blob monitoring

* add override specifier

* address comments

* fix minor issue

* address comments

* address comments

* address comments

* merge master

* turn on test output

* fix merge issue

* fix segfault in background gc

* fix loop

* remove loop
parent 70dc779b
...@@ -32,6 +32,7 @@ matrix: ...@@ -32,6 +32,7 @@ matrix:
install: install:
- git clone --depth=1 --branch=tikv-3.0 https://github.com/pingcap/rocksdb.git - git clone --depth=1 --branch=tikv-3.0 https://github.com/pingcap/rocksdb.git
- export CTEST_OUTPUT_ON_FAILURE=1
- if [ "${COMPILER}" == gcc7 ]; then - if [ "${COMPILER}" == gcc7 ]; then
CC=gcc-7; CC=gcc-7;
CXX=g++-7; CXX=g++-7;
......
...@@ -106,6 +106,34 @@ class TitanDB : public StackableDB { ...@@ -106,6 +106,34 @@ class TitanDB : public StackableDB {
Status SetOptions(ColumnFamilyHandle* column_family, Status SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& const std::unordered_map<std::string, std::string>&
new_options) override = 0; new_options) override = 0;
// Names of the Titan-specific properties. Each member holds the property
// name quoted in the comment above it; the member definitions are not
// visible in this header.
struct Properties {
// "rocksdb.titandb.live-blob-size" - returns total blob value size
// referenced by LSM tree.
static const std::string kLiveBlobSize;
// "rocksdb.titandb.num-live-blob-file" - returns the number of live blob
// files.
static const std::string kNumLiveBlobFile;
// "rocksdb.titandb.num-obsolete-blob-file" - returns the number of
// obsolete blob files.
static const std::string kNumObsoleteBlobFile;
// "rocksdb.titandb.live-blob-file-size" - returns total size of live blob
// files.
static const std::string kLiveBlobFileSize;
// "rocksdb.titandb.obsolete-blob-file-size" - returns total size of
// obsolete blob files.
static const std::string kObsoleteBlobFileSize;
};
// Looks up the string-valued "property" for "column_family" and stores the
// result in "*value". Pure virtual: the concrete Titan DB implementation
// must provide it.
bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property,
std::string* value) override = 0;
// Convenience overload: same lookup, performed on the default column family.
bool GetProperty(const Slice& property, std::string* value) override {
return GetProperty(DefaultColumnFamily(), property, value);
}
// Looks up the integer-valued "property" for "column_family" and stores the
// result in "*value". Pure virtual: the concrete Titan DB implementation
// must provide it.
bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property,
uint64_t* value) override = 0;
// Convenience overload: same lookup, performed on the default column family.
bool GetIntProperty(const Slice& property, uint64_t* value) override {
return GetIntProperty(DefaultColumnFamily(), property, value);
}
}; };
} // namespace titandb } // namespace titandb
......
...@@ -29,7 +29,7 @@ struct TitanDBOptions : public DBOptions { ...@@ -29,7 +29,7 @@ struct TitanDBOptions : public DBOptions {
explicit TitanDBOptions(const DBOptions& options) : DBOptions(options) {} explicit TitanDBOptions(const DBOptions& options) : DBOptions(options) {}
TitanDBOptions& operator=(const DBOptions& options) { TitanDBOptions& operator=(const DBOptions& options) {
*dynamic_cast<DBOptions*>(this) = options; *static_cast<DBOptions*>(this) = options;
return *this; return *this;
} }
}; };
......
...@@ -16,13 +16,13 @@ Slice EncodeFileNumber(const uint64_t* number) { ...@@ -16,13 +16,13 @@ Slice EncodeFileNumber(const uint64_t* number) {
BlobFileCache::BlobFileCache(const TitanDBOptions& db_options, BlobFileCache::BlobFileCache(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options, const TitanCFOptions& cf_options,
std::shared_ptr<Cache> cache) std::shared_ptr<Cache> cache, TitanStats* stats)
: env_(db_options.env), : env_(db_options.env),
env_options_(db_options), env_options_(db_options),
db_options_(db_options), db_options_(db_options),
cf_options_(cf_options), cf_options_(cf_options),
cache_(cache), cache_(cache),
stats_(db_options.statistics.get()) {} stats_(stats) {}
Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number, Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const BlobHandle& handle, uint64_t file_size, const BlobHandle& handle,
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "blob_format.h" #include "blob_format.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "titan/options.h" #include "titan/options.h"
#include "titan_stats.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -12,7 +13,8 @@ class BlobFileCache { ...@@ -12,7 +13,8 @@ class BlobFileCache {
public: public:
// Constructs a blob file cache to cache opened files. // Constructs a blob file cache to cache opened files.
BlobFileCache(const TitanDBOptions& db_options, BlobFileCache(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options, std::shared_ptr<Cache> cache); const TitanCFOptions& cf_options, std::shared_ptr<Cache> cache,
TitanStats* stats);
// Gets the blob record pointed by the handle in the specified file // Gets the blob record pointed by the handle in the specified file
// number. The corresponding file size must be exactly "file_size" // number. The corresponding file size must be exactly "file_size"
...@@ -41,7 +43,7 @@ class BlobFileCache { ...@@ -41,7 +43,7 @@ class BlobFileCache {
TitanDBOptions db_options_; TitanDBOptions db_options_;
TitanCFOptions cf_options_; TitanCFOptions cf_options_;
std::shared_ptr<Cache> cache_; std::shared_ptr<Cache> cache_;
Statistics* stats_; TitanStats* stats_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -52,7 +52,7 @@ class BlobFileIteratorTest : public testing::Test { ...@@ -52,7 +52,7 @@ class BlobFileIteratorTest : public testing::Test {
void NewBuiler() { void NewBuiler() {
TitanDBOptions db_options(titan_options_); TitanDBOptions db_options(titan_options_);
TitanCFOptions cf_options(titan_options_); TitanCFOptions cf_options(titan_options_);
BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}); BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}, nullptr);
{ {
std::unique_ptr<WritableFile> f; std::unique_ptr<WritableFile> f;
......
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "titan_stats.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/filename.h" #include "util/filename.h"
...@@ -48,7 +49,7 @@ Status BlobFileReader::Open(const TitanCFOptions& options, ...@@ -48,7 +49,7 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file, std::unique_ptr<RandomAccessFileReader> file,
uint64_t file_size, uint64_t file_size,
std::unique_ptr<BlobFileReader>* result, std::unique_ptr<BlobFileReader>* result,
Statistics* stats) { TitanStats* stats) {
if (file_size < BlobFileFooter::kEncodedLength) { if (file_size < BlobFileFooter::kEncodedLength) {
return Status::Corruption("file is too short to be a blob file"); return Status::Corruption("file is too short to be a blob file");
} }
...@@ -68,7 +69,7 @@ Status BlobFileReader::Open(const TitanCFOptions& options, ...@@ -68,7 +69,7 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
BlobFileReader::BlobFileReader(const TitanCFOptions& options, BlobFileReader::BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file, std::unique_ptr<RandomAccessFileReader> file,
Statistics* stats) TitanStats* stats)
: options_(options), : options_(options),
file_(std::move(file)), file_(std::move(file)),
cache_(options.blob_cache), cache_(options.blob_cache),
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "blob_format.h" #include "blob_format.h"
#include "titan/options.h" #include "titan/options.h"
#include "titan_stats.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
...@@ -20,7 +21,7 @@ class BlobFileReader { ...@@ -20,7 +21,7 @@ class BlobFileReader {
std::unique_ptr<RandomAccessFileReader> file, std::unique_ptr<RandomAccessFileReader> file,
uint64_t file_size, uint64_t file_size,
std::unique_ptr<BlobFileReader>* result, std::unique_ptr<BlobFileReader>* result,
Statistics* stats); TitanStats* stats);
// Gets the blob record pointed by the handle in this file. The data // Gets the blob record pointed by the handle in this file. The data
// of the record is stored in the provided buffer, so the buffer // of the record is stored in the provided buffer, so the buffer
...@@ -33,7 +34,7 @@ class BlobFileReader { ...@@ -33,7 +34,7 @@ class BlobFileReader {
BlobFileReader(const TitanCFOptions& options, BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file, std::unique_ptr<RandomAccessFileReader> file,
Statistics* stats); TitanStats* stats);
Status ReadRecord(const BlobHandle& handle, BlobRecord* record, Status ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer); OwnedSlice* buffer);
...@@ -47,7 +48,7 @@ class BlobFileReader { ...@@ -47,7 +48,7 @@ class BlobFileReader {
// Information read from the file. // Information read from the file.
BlobFileFooter footer_; BlobFileFooter footer_;
Statistics* stats_; TitanStats* stats_;
}; };
// Performs readahead on continuous reads. // Performs readahead on continuous reads.
......
...@@ -22,7 +22,7 @@ class BlobFileTest : public testing::Test { ...@@ -22,7 +22,7 @@ class BlobFileTest : public testing::Test {
options.dirname = dirname_; options.dirname = dirname_;
TitanDBOptions db_options(options); TitanDBOptions db_options(options);
TitanCFOptions cf_options(options); TitanCFOptions cf_options(options);
BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}); BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}, nullptr);
const int n = 100; const int n = 100;
std::vector<BlobHandle> handles(n); std::vector<BlobHandle> handles(n);
...@@ -83,7 +83,7 @@ class BlobFileTest : public testing::Test { ...@@ -83,7 +83,7 @@ class BlobFileTest : public testing::Test {
options.dirname = dirname_; options.dirname = dirname_;
TitanDBOptions db_options(options); TitanDBOptions db_options(options);
TitanCFOptions cf_options(options); TitanCFOptions cf_options(options);
BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}); BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}, nullptr);
const int n = 100; const int n = 100;
std::vector<BlobHandle> handles(n); std::vector<BlobHandle> handles(n);
......
...@@ -8,7 +8,7 @@ namespace titandb { ...@@ -8,7 +8,7 @@ namespace titandb {
class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback { class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
public: public:
GarbageCollectionWriteCallback(ColumnFamilyHandle* cfh, std::string&& _key, GarbageCollectionWriteCallback(ColumnFamilyHandle* cfh, std::string&& _key,
BlobIndex&& blob_index, Statistics* stats) BlobIndex&& blob_index)
: cfh_(cfh), key_(std::move(_key)), blob_index_(blob_index) { : cfh_(cfh), key_(std::move(_key)), blob_index_(blob_index) {
assert(!key_.empty()); assert(!key_.empty());
} }
...@@ -73,7 +73,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, ...@@ -73,7 +73,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
const EnvOptions& env_options, const EnvOptions& env_options,
BlobFileManager* blob_file_manager, BlobFileManager* blob_file_manager,
VersionSet* version_set, LogBuffer* log_buffer, VersionSet* version_set, LogBuffer* log_buffer,
std::atomic_bool* shuting_down) std::atomic_bool* shuting_down, TitanStats* stats)
: blob_gc_(blob_gc), : blob_gc_(blob_gc),
base_db_(db), base_db_(db),
base_db_impl_(reinterpret_cast<DBImpl*>(base_db_)), base_db_impl_(reinterpret_cast<DBImpl*>(base_db_)),
...@@ -85,7 +85,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, ...@@ -85,7 +85,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
version_set_(version_set), version_set_(version_set),
log_buffer_(log_buffer), log_buffer_(log_buffer),
shuting_down_(shuting_down), shuting_down_(shuting_down),
stats_(db_options_.statistics.get()) {} stats_(stats) {}
BlobGCJob::~BlobGCJob() { BlobGCJob::~BlobGCJob() {
// flush metrics // flush metrics
...@@ -311,7 +311,7 @@ Status BlobGCJob::DoRunGC() { ...@@ -311,7 +311,7 @@ Status BlobGCJob::DoRunGC() {
// Store WriteBatch for rewriting new Key-Index pairs to LSM // Store WriteBatch for rewriting new Key-Index pairs to LSM
GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(), GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(),
std::move(blob_index), stats_); std::move(blob_index));
callback.value = index_entry; callback.value = index_entry;
rewrite_batches_.emplace_back( rewrite_batches_.emplace_back(
std::make_pair(WriteBatch(), std::move(callback))); std::make_pair(WriteBatch(), std::move(callback)));
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "titan/options.h" #include "titan/options.h"
#include "titan_stats.h"
#include "version_set.h" #include "version_set.h"
namespace rocksdb { namespace rocksdb {
...@@ -19,7 +20,7 @@ class BlobGCJob { ...@@ -19,7 +20,7 @@ class BlobGCJob {
const TitanDBOptions& titan_db_options, Env* env, const TitanDBOptions& titan_db_options, Env* env,
const EnvOptions& env_options, BlobFileManager* blob_file_manager, const EnvOptions& env_options, BlobFileManager* blob_file_manager,
VersionSet* version_set, LogBuffer* log_buffer, VersionSet* version_set, LogBuffer* log_buffer,
std::atomic_bool* shuting_down); std::atomic_bool* shuting_down, TitanStats* stats);
// No copying allowed // No copying allowed
BlobGCJob(const BlobGCJob&) = delete; BlobGCJob(const BlobGCJob&) = delete;
...@@ -57,7 +58,7 @@ class BlobGCJob { ...@@ -57,7 +58,7 @@ class BlobGCJob {
std::atomic_bool* shuting_down_{nullptr}; std::atomic_bool* shuting_down_{nullptr};
Statistics* stats_; TitanStats* stats_;
struct { struct {
uint64_t blob_db_bytes_read; uint64_t blob_db_bytes_read;
......
...@@ -116,7 +116,7 @@ class BlobGCJobTest : public testing::Test { ...@@ -116,7 +116,7 @@ class BlobGCJobTest : public testing::Test {
BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_, BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_,
tdb_->env_, EnvOptions(), tdb_->blob_manager_.get(), tdb_->env_, EnvOptions(), tdb_->blob_manager_.get(),
version_set_, &log_buffer, nullptr); version_set_, &log_buffer, nullptr, nullptr);
s = blob_gc_job.Prepare(); s = blob_gc_job.Prepare();
ASSERT_OK(s); ASSERT_OK(s);
...@@ -170,7 +170,7 @@ class BlobGCJobTest : public testing::Test { ...@@ -170,7 +170,7 @@ class BlobGCJobTest : public testing::Test {
blob_gc.SetColumnFamily(cfh); blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Env::Default(), EnvOptions(), nullptr, version_set_, Env::Default(), EnvOptions(), nullptr, version_set_,
nullptr, nullptr); nullptr, nullptr, nullptr);
ASSERT_FALSE(blob_gc_job.DiscardEntry(key, blob_index)); ASSERT_FALSE(blob_gc_job.DiscardEntry(key, blob_index));
DestroyDB(); DestroyDB();
} }
......
...@@ -21,9 +21,9 @@ class BlobGCPickerTest : public testing::Test { ...@@ -21,9 +21,9 @@ class BlobGCPickerTest : public testing::Test {
void NewBlobStorageAndPicker(const TitanDBOptions& titan_db_options, void NewBlobStorageAndPicker(const TitanDBOptions& titan_db_options,
const TitanCFOptions& titan_cf_options) { const TitanCFOptions& titan_cf_options) {
auto blob_file_cache = std::make_shared<BlobFileCache>( auto blob_file_cache = std::make_shared<BlobFileCache>(
titan_db_options, titan_cf_options, NewLRUCache(128)); titan_db_options, titan_cf_options, NewLRUCache(128), nullptr);
blob_storage_.reset( blob_storage_.reset(new BlobStorage(titan_db_options, titan_cf_options, 0,
new BlobStorage(titan_db_options, titan_cf_options, blob_file_cache)); blob_file_cache, nullptr));
basic_blob_gc_picker_.reset( basic_blob_gc_picker_.reset(
new BasicBlobGCPicker(titan_db_options, titan_cf_options)); new BasicBlobGCPicker(titan_db_options, titan_cf_options));
} }
......
...@@ -45,6 +45,9 @@ void BlobStorage::ExportBlobFiles( ...@@ -45,6 +45,9 @@ void BlobStorage::ExportBlobFiles(
void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) { void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
files_.emplace(std::make_pair(file->file_number(), file)); files_.emplace(std::make_pair(file->file_number(), file));
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_FILE_SIZE,
file->file_size());
AddStats(stats_, cf_id_, TitanInternalStats::NUM_LIVE_BLOB_FILE, 1);
} }
void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
...@@ -53,12 +56,22 @@ void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, ...@@ -53,12 +56,22 @@ void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
obsolete_files_.push_back( obsolete_files_.push_back(
std::make_pair(file->file_number(), obsolete_sequence)); std::make_pair(file->file_number(), obsolete_sequence));
file->FileStateTransit(BlobFileMeta::FileEvent::kDelete); file->FileStateTransit(BlobFileMeta::FileEvent::kDelete);
SubStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_SIZE,
file->file_size() - file->discardable_size());
SubStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_FILE_SIZE,
file->file_size());
SubStats(stats_, cf_id_, TitanInternalStats::NUM_LIVE_BLOB_FILE, 1);
AddStats(stats_, cf_id_, TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE,
file->file_size());
AddStats(stats_, cf_id_, TitanInternalStats::NUM_OBSOLETE_BLOB_FILE, 1);
} }
void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files, void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) { SequenceNumber oldest_sequence) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
uint32_t file_dropped = 0;
uint64_t file_dropped_size = 0;
for (auto it = obsolete_files_.begin(); it != obsolete_files_.end();) { for (auto it = obsolete_files_.begin(); it != obsolete_files_.end();) {
auto& file_number = it->first; auto& file_number = it->first;
auto& obsolete_sequence = it->second; auto& obsolete_sequence = it->second;
...@@ -67,7 +80,12 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files, ...@@ -67,7 +80,12 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
// visible to all existing snapshots. // visible to all existing snapshots.
if (oldest_sequence > obsolete_sequence) { if (oldest_sequence > obsolete_sequence) {
// remove obsolete files // remove obsolete files
files_.erase(file_number); auto p = files_.find(file_number);
assert(p != files_.end());
file_dropped++;
file_dropped_size += p->second->file_size();
files_.erase(p);
file_cache_->Evict(file_number); file_cache_->Evict(file_number);
ROCKS_LOG_INFO(db_options_.info_log, ROCKS_LOG_INFO(db_options_.info_log,
...@@ -85,6 +103,10 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files, ...@@ -85,6 +103,10 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
} }
++it; ++it;
} }
SubStats(stats_, cf_id_, TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE,
file_dropped_size);
SubStats(stats_, cf_id_, TitanInternalStats::NUM_OBSOLETE_BLOB_FILE,
file_dropped);
} }
void BlobStorage::ComputeGCScore() { void BlobStorage::ComputeGCScore() {
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
#include "blob_format.h" #include "blob_format.h"
#include "blob_gc.h" #include "blob_gc.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/statistics.h" #include "titan_stats.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -20,17 +20,19 @@ class BlobStorage { ...@@ -20,17 +20,19 @@ class BlobStorage {
this->file_cache_ = bs.file_cache_; this->file_cache_ = bs.file_cache_;
this->db_options_ = bs.db_options_; this->db_options_ = bs.db_options_;
this->cf_options_ = bs.cf_options_; this->cf_options_ = bs.cf_options_;
this->cf_id_ = bs.cf_id_;
this->stats_ = bs.stats_; this->stats_ = bs.stats_;
} }
BlobStorage(const TitanDBOptions& _db_options, BlobStorage(const TitanDBOptions& _db_options,
const TitanCFOptions& _cf_options, const TitanCFOptions& _cf_options, uint32_t cf_id,
std::shared_ptr<BlobFileCache> _file_cache) std::shared_ptr<BlobFileCache> _file_cache, TitanStats* stats)
: db_options_(_db_options), : db_options_(_db_options),
cf_options_(_cf_options), cf_options_(_cf_options),
cf_id_(cf_id),
file_cache_(_file_cache), file_cache_(_file_cache),
destroyed_(false), destroyed_(false),
stats_(_db_options.statistics.get()) {} stats_(stats) {}
~BlobStorage() { ~BlobStorage() {
for (auto& file : files_) { for (auto& file : files_) {
...@@ -103,6 +105,7 @@ class BlobStorage { ...@@ -103,6 +105,7 @@ class BlobStorage {
TitanDBOptions db_options_; TitanDBOptions db_options_;
TitanCFOptions cf_options_; TitanCFOptions cf_options_;
uint32_t cf_id_;
mutable port::Mutex mutex_; mutable port::Mutex mutex_;
...@@ -118,7 +121,7 @@ class BlobStorage { ...@@ -118,7 +121,7 @@ class BlobStorage {
// kept. // kept.
bool destroyed_; bool destroyed_;
Statistics* stats_; TitanStats* stats_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -51,9 +51,9 @@ class TitanDBImpl::FileManager : public BlobFileManager { ...@@ -51,9 +51,9 @@ class TitanDBImpl::FileManager : public BlobFileManager {
VersionEdit edit; VersionEdit edit;
edit.SetColumnFamilyID(cf_id); edit.SetColumnFamilyID(cf_id);
for (auto& file : files) { for (auto& file : files) {
RecordTick(db_->stats_, BLOB_DB_BLOB_FILE_SYNCED); RecordTick(statistics(db_->stats_.get()), BLOB_DB_BLOB_FILE_SYNCED);
{ {
StopWatch sync_sw(db_->env_, db_->stats_, StopWatch sync_sw(db_->env_, statistics(db_->stats_.get()),
BLOB_DB_BLOB_FILE_SYNC_MICROS); BLOB_DB_BLOB_FILE_SYNC_MICROS);
s = file.second->GetFile()->Sync(false); s = file.second->GetFile()->Sync(false);
} }
...@@ -79,7 +79,11 @@ class TitanDBImpl::FileManager : public BlobFileManager { ...@@ -79,7 +79,11 @@ class TitanDBImpl::FileManager : public BlobFileManager {
Status BatchDeleteFiles( Status BatchDeleteFiles(
const std::vector<std::unique_ptr<BlobFileHandle>>& handles) override { const std::vector<std::unique_ptr<BlobFileHandle>>& handles) override {
Status s; Status s;
for (auto& handle : handles) s = db_->env_->DeleteFile(handle->GetName()); uint64_t file_size = 0;
for (auto& handle : handles) {
s = db_->env_->DeleteFile(handle->GetName());
file_size += handle->GetFile()->GetFileSize();
}
{ {
MutexLock l(&db_->mutex_); MutexLock l(&db_->mutex_);
for (const auto& handle : handles) for (const auto& handle : handles)
...@@ -116,13 +120,15 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options, ...@@ -116,13 +120,15 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
dbname_(dbname), dbname_(dbname),
env_(options.env), env_(options.env),
env_options_(options), env_options_(options),
db_options_(options), db_options_(options) {
stats_(options.statistics.get()) {
if (db_options_.dirname.empty()) { if (db_options_.dirname.empty()) {
db_options_.dirname = dbname_ + "/titandb"; db_options_.dirname = dbname_ + "/titandb";
} }
dirname_ = db_options_.dirname; dirname_ = db_options_.dirname;
vset_.reset(new VersionSet(db_options_)); if (db_options_.statistics != nullptr) {
stats_.reset(new TitanStats(db_options_.statistics.get()));
}
vset_.reset(new VersionSet(db_options_, stats_.get()));
blob_manager_.reset(new FileManager(this)); blob_manager_.reset(new FileManager(this));
} }
...@@ -175,7 +181,8 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs, ...@@ -175,7 +181,8 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
assert(base_table_factory != nullptr); assert(base_table_factory != nullptr);
base_table_factory_[cf_id] = base_table_factory; base_table_factory_[cf_id] = base_table_factory;
titan_table_factory_[cf_id] = std::make_shared<TitanTableFactory>( titan_table_factory_[cf_id] = std::make_shared<TitanTableFactory>(
db_options_, descs[i].options, blob_manager_, &mutex_, vset_.get()); db_options_, descs[i].options, blob_manager_, &mutex_, vset_.get(),
stats_.get());
base_descs[i].options.table_factory = titan_table_factory_[cf_id]; base_descs[i].options.table_factory = titan_table_factory_[cf_id];
// Add TableProperties for collecting statistics GC // Add TableProperties for collecting statistics GC
base_descs[i].options.table_properties_collector_factories.emplace_back( base_descs[i].options.table_properties_collector_factories.emplace_back(
...@@ -211,6 +218,9 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs, ...@@ -211,6 +218,9 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
s = DB::Open(db_options_, dbname_, base_descs, handles, &db_); s = DB::Open(db_options_, dbname_, base_descs, handles, &db_);
if (s.ok()) { if (s.ok()) {
db_impl_ = reinterpret_cast<DBImpl*>(db_->GetRootDB()); db_impl_ = reinterpret_cast<DBImpl*>(db_->GetRootDB());
if (stats_.get()) {
stats_->Initialize(column_families, db_->DefaultColumnFamily()->GetID());
}
} }
return s; return s;
} }
...@@ -274,7 +284,8 @@ Status TitanDBImpl::CreateColumnFamilies( ...@@ -274,7 +284,8 @@ Status TitanDBImpl::CreateColumnFamilies(
// Replaces the provided table factory with TitanTableFactory. // Replaces the provided table factory with TitanTableFactory.
base_table_factory.emplace_back(options.table_factory); base_table_factory.emplace_back(options.table_factory);
titan_table_factory.emplace_back(std::make_shared<TitanTableFactory>( titan_table_factory.emplace_back(std::make_shared<TitanTableFactory>(
db_options_, desc.options, blob_manager_, &mutex_, vset_.get())); db_options_, desc.options, blob_manager_, &mutex_, vset_.get(),
stats_.get()));
options.table_factory = titan_table_factory.back(); options.table_factory = titan_table_factory.back();
base_descs.emplace_back(desc.name, options); base_descs.emplace_back(desc.name, options);
} }
...@@ -370,7 +381,7 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, ...@@ -370,7 +381,7 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
nullptr /*read_callback*/, &is_blob_index); nullptr /*read_callback*/, &is_blob_index);
if (!s.ok() || !is_blob_index) return s; if (!s.ok() || !is_blob_index) return s;
StopWatch get_sw(env_, stats_, BLOB_DB_GET_MICROS); StopWatch get_sw(env_, statistics(stats_.get()), BLOB_DB_GET_MICROS);
BlobIndex index; BlobIndex index;
s = index.DecodeFrom(value); s = index.DecodeFrom(value);
...@@ -385,10 +396,12 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, ...@@ -385,10 +396,12 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
mutex_.Unlock(); mutex_.Unlock();
{ {
StopWatch read_sw(env_, stats_, BLOB_DB_BLOB_FILE_READ_MICROS); StopWatch read_sw(env_, statistics(stats_.get()),
BLOB_DB_BLOB_FILE_READ_MICROS);
s = storage->Get(options, index, &record, &buffer); s = storage->Get(options, index, &record, &buffer);
RecordTick(stats_, BLOB_DB_NUM_KEYS_READ); RecordTick(statistics(stats_.get()), BLOB_DB_NUM_KEYS_READ);
RecordTick(stats_, BLOB_DB_BLOB_FILE_BYTES_READ, index.blob_handle.size); RecordTick(statistics(stats_.get()), BLOB_DB_BLOB_FILE_BYTES_READ,
index.blob_handle.size);
} }
if (s.IsCorruption()) { if (s.IsCorruption()) {
ROCKS_LOG_DEBUG(db_options_.info_log, ROCKS_LOG_DEBUG(db_options_.info_log,
...@@ -462,7 +475,7 @@ Iterator* TitanDBImpl::NewIteratorImpl( ...@@ -462,7 +475,7 @@ Iterator* TitanDBImpl::NewIteratorImpl(
options, cfd, options.snapshot->GetSequenceNumber(), options, cfd, options.snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/)); nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/));
return new TitanDBIterator(options, storage.lock().get(), snapshot, return new TitanDBIterator(options, storage.lock().get(), snapshot,
std::move(iter), env_, stats_); std::move(iter), env_, stats_.get());
} }
Status TitanDBImpl::NewIterators( Status TitanDBImpl::NewIterators(
...@@ -544,6 +557,40 @@ Status TitanDBImpl::SetOptions( ...@@ -544,6 +557,40 @@ Status TitanDBImpl::SetOptions(
return Status::OK(); return Status::OK();
} }
// Resolves a string property for "column_family".
//
// The per-column-family Titan internal stats are asked first. If no
// TitanStats object exists, the column family has no internal stats, or the
// internal stats lookup returns false, the request is forwarded to the base
// DB implementation and its result is returned.
bool TitanDBImpl::GetProperty(ColumnFamilyHandle* column_family,
                              const Slice& property, std::string* value) {
  assert(column_family != nullptr);
  if (stats_ != nullptr) {
    auto cf_stats = stats_->internal_stats(column_family->GetID());
    if (cf_stats != nullptr && cf_stats->GetStringProperty(property, value)) {
      // Answered by Titan's own stats; no need to consult the base DB.
      return true;
    }
  }
  return db_impl_->GetProperty(column_family, property, value);
}
// Resolves an integer property for "column_family".
//
// The per-column-family Titan internal stats are asked first. If no
// TitanStats object exists, the column family has no internal stats, or the
// internal stats lookup returns false, the request is forwarded to the base
// DB implementation and its result is returned.
bool TitanDBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                                 const Slice& property, uint64_t* value) {
  assert(column_family != nullptr);
  if (stats_ != nullptr) {
    auto cf_stats = stats_->internal_stats(column_family->GetID());
    if (cf_stats != nullptr && cf_stats->GetIntProperty(property, value)) {
      // Answered by Titan's own stats; no need to consult the base DB.
      return true;
    }
  }
  return db_impl_->GetIntProperty(column_family, property, value);
}
void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
const auto& tps = flush_job_info.table_properties; const auto& tps = flush_job_info.table_properties;
auto ucp_iter = tps.user_collected_properties.find( auto ucp_iter = tps.user_collected_properties.find(
...@@ -651,6 +698,7 @@ void TitanDBImpl::OnCompactionCompleted( ...@@ -651,6 +698,7 @@ void TitanDBImpl::OnCompactionCompleted(
file->FileStateTransit(BlobFileMeta::FileEvent::kCompactionCompleted); file->FileStateTransit(BlobFileMeta::FileEvent::kCompactionCompleted);
} }
uint64_t delta = 0;
for (const auto& bfs : blob_files_size) { for (const auto& bfs : blob_files_size) {
// blob file size < 0 means discardable size > 0 // blob file size < 0 means discardable size > 0
if (bfs.second >= 0) { if (bfs.second >= 0) {
...@@ -661,8 +709,13 @@ void TitanDBImpl::OnCompactionCompleted( ...@@ -661,8 +709,13 @@ void TitanDBImpl::OnCompactionCompleted(
// file has been gc out // file has been gc out
continue; continue;
} }
if (!file->is_obsolete()) {
delta += -bfs.second;
}
file->AddDiscardableSize(static_cast<uint64_t>(-bfs.second)); file->AddDiscardableSize(static_cast<uint64_t>(-bfs.second));
} }
SubStats(stats_.get(), compaction_job_info.cf_id,
TitanInternalStats::LIVE_BLOB_SIZE, delta);
bs->ComputeGCScore(); bs->ComputeGCScore();
AddToGCQueue(compaction_job_info.cf_id); AddToGCQueue(compaction_job_info.cf_id);
......
...@@ -73,6 +73,14 @@ class TitanDBImpl : public TitanDB { ...@@ -73,6 +73,14 @@ class TitanDBImpl : public TitanDB {
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& new_options) override; const std::unordered_map<std::string, std::string>& new_options) override;
using TitanDB::GetProperty;
bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property,
std::string* value) override;
using TitanDB::GetIntProperty;
bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property,
uint64_t* value) override;
void OnFlushCompleted(const FlushJobInfo& flush_job_info); void OnFlushCompleted(const FlushJobInfo& flush_job_info);
void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info); void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info);
...@@ -153,8 +161,9 @@ class TitanDBImpl : public TitanDB { ...@@ -153,8 +161,9 @@ class TitanDBImpl : public TitanDB {
DBImpl* db_impl_; DBImpl* db_impl_;
TitanDBOptions db_options_; TitanDBOptions db_options_;
// statistics object sharing with RocksDB // TitanStats is turned on only if statistics field of DBOptions
Statistics* stats_; // is not null.
std::unique_ptr<TitanStats> stats_;
std::unordered_map<uint32_t, std::shared_ptr<TableFactory>> std::unordered_map<uint32_t, std::shared_ptr<TableFactory>>
base_table_factory_; base_table_factory_;
......
...@@ -60,7 +60,7 @@ void TitanDBImpl::BackgroundCallGC() { ...@@ -60,7 +60,7 @@ void TitanDBImpl::BackgroundCallGC() {
Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
mutex_.AssertHeld(); mutex_.AssertHeld();
StopWatch gc_sw(env_, stats_, BLOB_DB_GC_MICROS); StopWatch gc_sw(env_, statistics(stats_.get()), BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc; std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh; std::unique_ptr<ColumnFamilyHandle> cfh;
...@@ -68,8 +68,9 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { ...@@ -68,8 +68,9 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
if (!gc_queue_.empty()) { if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue(); uint32_t column_family_id = PopFirstFromGCQueue();
auto bs = vset_->GetBlobStorage(column_family_id).lock().get(); auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
if (bs) {
const auto& cf_options = bs->cf_options(); const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker = std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options); std::make_shared<BasicBlobGCPicker>(db_options_, cf_options);
...@@ -81,6 +82,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { ...@@ -81,6 +82,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
blob_gc->SetColumnFamily(cfh.get()); blob_gc->SetColumnFamily(cfh.get());
} }
} }
}
// TODO(@DorianZheng) Make sure enough room for GC // TODO(@DorianZheng) Make sure enough room for GC
...@@ -90,7 +92,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { ...@@ -90,7 +92,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
} else { } else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(), vset_.get(), env_options_, blob_manager_.get(), vset_.get(),
log_buffer, &shuting_down_); log_buffer, &shuting_down_, stats_.get());
s = blob_gc_job.Prepare(); s = blob_gc_job.Prepare();
if (s.ok()) { if (s.ok()) {
mutex_.Unlock(); mutex_.Unlock();
...@@ -126,7 +128,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { ...@@ -126,7 +128,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
assert(bg_gc_scheduled_ > 0); assert(bg_gc_scheduled_ > 0);
// BackgroudGC // BackgroudGC
StopWatch gc_sw(env_, stats_, BLOB_DB_GC_MICROS); StopWatch gc_sw(env_, statistics(stats_.get()), BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc; std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh; std::unique_ptr<ColumnFamilyHandle> cfh;
...@@ -148,7 +150,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { ...@@ -148,7 +150,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
} else { } else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(), vset_.get(), env_options_, blob_manager_.get(), vset_.get(),
&log_buffer, &shuting_down_); &log_buffer, &shuting_down_, stats_.get());
s = blob_gc_job.Prepare(); s = blob_gc_job.Prepare();
if (s.ok()) { if (s.ok()) {
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
#include <inttypes.h> #include <inttypes.h>
#include "db/db_iter.h" #include "db/db_iter.h"
#include "rocksdb/statistics.h" #include "titan_stats.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -17,7 +17,7 @@ class TitanDBIterator : public Iterator { ...@@ -17,7 +17,7 @@ class TitanDBIterator : public Iterator {
TitanDBIterator(const ReadOptions& options, BlobStorage* storage, TitanDBIterator(const ReadOptions& options, BlobStorage* storage,
std::shared_ptr<ManagedSnapshot> snap, std::shared_ptr<ManagedSnapshot> snap,
std::unique_ptr<ArenaWrappedDBIter> iter, Env* env, std::unique_ptr<ArenaWrappedDBIter> iter, Env* env,
Statistics* stats) TitanStats* stats)
: options_(options), : options_(options),
storage_(storage), storage_(storage),
snap_(snap), snap_(snap),
...@@ -39,7 +39,7 @@ class TitanDBIterator : public Iterator { ...@@ -39,7 +39,7 @@ class TitanDBIterator : public Iterator {
void SeekToFirst() override { void SeekToFirst() override {
iter_->SeekToFirst(); iter_->SeekToFirst();
if (Check()) { if (Check()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
} }
...@@ -48,7 +48,7 @@ class TitanDBIterator : public Iterator { ...@@ -48,7 +48,7 @@ class TitanDBIterator : public Iterator {
void SeekToLast() override { void SeekToLast() override {
iter_->SeekToLast(); iter_->SeekToLast();
if (Check()) { if (Check()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
} }
...@@ -57,7 +57,7 @@ class TitanDBIterator : public Iterator { ...@@ -57,7 +57,7 @@ class TitanDBIterator : public Iterator {
void Seek(const Slice& target) override { void Seek(const Slice& target) override {
iter_->Seek(target); iter_->Seek(target);
if (Check()) { if (Check()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
} }
...@@ -66,7 +66,7 @@ class TitanDBIterator : public Iterator { ...@@ -66,7 +66,7 @@ class TitanDBIterator : public Iterator {
void SeekForPrev(const Slice& target) override { void SeekForPrev(const Slice& target) override {
iter_->SeekForPrev(target); iter_->SeekForPrev(target);
if (Check()) { if (Check()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
} }
...@@ -76,7 +76,7 @@ class TitanDBIterator : public Iterator { ...@@ -76,7 +76,7 @@ class TitanDBIterator : public Iterator {
assert(Valid()); assert(Valid());
iter_->Next(); iter_->Next();
if (Check()) { if (Check()) {
StopWatch next_sw(env_, stats_, BLOB_DB_NEXT_MICROS); StopWatch next_sw(env_, statistics(stats_), BLOB_DB_NEXT_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_NEXT); RecordTick(stats_, BLOB_DB_NUM_NEXT);
} }
...@@ -86,7 +86,7 @@ class TitanDBIterator : public Iterator { ...@@ -86,7 +86,7 @@ class TitanDBIterator : public Iterator {
assert(Valid()); assert(Valid());
iter_->Prev(); iter_->Prev();
if (Check()) { if (Check()) {
StopWatch prev_sw(env_, stats_, BLOB_DB_PREV_MICROS); StopWatch prev_sw(env_, statistics(stats_), BLOB_DB_PREV_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_PREV); RecordTick(stats_, BLOB_DB_NUM_PREV);
} }
...@@ -153,7 +153,7 @@ class TitanDBIterator : public Iterator { ...@@ -153,7 +153,7 @@ class TitanDBIterator : public Iterator {
std::map<uint64_t, std::unique_ptr<BlobFilePrefetcher>> files_; std::map<uint64_t, std::unique_ptr<BlobFilePrefetcher>> files_;
Env* env_; Env* env_;
Statistics* stats_; TitanStats* stats_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -56,7 +56,8 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) { ...@@ -56,7 +56,8 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value, void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
std::string* index_value) { std::string* index_value) {
if (!ok()) return; if (!ok()) return;
StopWatch write_sw(db_options_.env, stats_, BLOB_DB_BLOB_FILE_WRITE_MICROS); StopWatch write_sw(db_options_.env, statistics(stats_),
BLOB_DB_BLOB_FILE_WRITE_MICROS);
if (!blob_builder_) { if (!blob_builder_) {
status_ = blob_manager_->NewFile(&blob_handle_); status_ = blob_manager_->NewFile(&blob_handle_);
...@@ -68,6 +69,7 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value, ...@@ -68,6 +69,7 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
RecordTick(stats_, BLOB_DB_NUM_KEYS_WRITTEN); RecordTick(stats_, BLOB_DB_NUM_KEYS_WRITTEN);
MeasureTime(stats_, BLOB_DB_KEY_SIZE, key.size()); MeasureTime(stats_, BLOB_DB_KEY_SIZE, key.size());
MeasureTime(stats_, BLOB_DB_VALUE_SIZE, value.size()); MeasureTime(stats_, BLOB_DB_VALUE_SIZE, value.size());
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_SIZE, value.size());
BlobIndex index; BlobIndex index;
BlobRecord record; BlobRecord record;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "titan/options.h" #include "titan/options.h"
#include "titan_stats.h"
#include "version_set.h" #include "version_set.h"
namespace rocksdb { namespace rocksdb {
...@@ -15,14 +16,14 @@ class TitanTableBuilder : public TableBuilder { ...@@ -15,14 +16,14 @@ class TitanTableBuilder : public TableBuilder {
const TitanCFOptions& cf_options, const TitanCFOptions& cf_options,
std::unique_ptr<TableBuilder> base_builder, std::unique_ptr<TableBuilder> base_builder,
std::shared_ptr<BlobFileManager> blob_manager, std::shared_ptr<BlobFileManager> blob_manager,
std::weak_ptr<BlobStorage> blob_storage) std::weak_ptr<BlobStorage> blob_storage, TitanStats* stats)
: cf_id_(cf_id), : cf_id_(cf_id),
db_options_(db_options), db_options_(db_options),
cf_options_(cf_options), cf_options_(cf_options),
base_builder_(std::move(base_builder)), base_builder_(std::move(base_builder)),
blob_manager_(blob_manager), blob_manager_(blob_manager),
blob_storage_(blob_storage), blob_storage_(blob_storage),
stats_(db_options.statistics.get()) {} stats_(stats) {}
void Add(const Slice& key, const Slice& value) override; void Add(const Slice& key, const Slice& value) override;
...@@ -55,7 +56,7 @@ class TitanTableBuilder : public TableBuilder { ...@@ -55,7 +56,7 @@ class TitanTableBuilder : public TableBuilder {
std::unique_ptr<BlobFileBuilder> blob_builder_; std::unique_ptr<BlobFileBuilder> blob_builder_;
std::weak_ptr<BlobStorage> blob_storage_; std::weak_ptr<BlobStorage> blob_storage_;
Statistics* stats_; TitanStats* stats_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -80,10 +80,11 @@ class TableBuilderTest : public testing::Test { ...@@ -80,10 +80,11 @@ class TableBuilderTest : public testing::Test {
blob_name_(BlobFileName(tmpdir_, kTestFileNumber)) { blob_name_(BlobFileName(tmpdir_, kTestFileNumber)) {
db_options_.dirname = tmpdir_; db_options_.dirname = tmpdir_;
cf_options_.min_blob_size = kMinBlobSize; cf_options_.min_blob_size = kMinBlobSize;
vset_.reset(new VersionSet(db_options_)); vset_.reset(new VersionSet(db_options_, nullptr));
blob_manager_.reset(new FileManager(db_options_)); blob_manager_.reset(new FileManager(db_options_));
table_factory_.reset(new TitanTableFactory( table_factory_.reset(new TitanTableFactory(db_options_, cf_options_,
db_options_, cf_options_, blob_manager_, &mutex_, vset_.get())); blob_manager_, &mutex_,
vset_.get(), nullptr));
} }
~TableBuilderTest() { ~TableBuilderTest() {
......
...@@ -32,7 +32,7 @@ TableBuilder* TitanTableFactory::NewTableBuilder( ...@@ -32,7 +32,7 @@ TableBuilder* TitanTableFactory::NewTableBuilder(
} }
return new TitanTableBuilder(column_family_id, db_options_, cf_options, return new TitanTableBuilder(column_family_id, db_options_, cf_options,
std::move(base_builder), blob_manager_, std::move(base_builder), blob_manager_,
blob_storage); blob_storage, stats_);
} }
std::string TitanTableFactory::GetPrintableTableOptions() const { std::string TitanTableFactory::GetPrintableTableOptions() const {
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "titan/options.h" #include "titan/options.h"
#include "titan_stats.h"
#include "version_set.h" #include "version_set.h"
namespace rocksdb { namespace rocksdb {
...@@ -13,14 +14,15 @@ class TitanTableFactory : public TableFactory { ...@@ -13,14 +14,15 @@ class TitanTableFactory : public TableFactory {
TitanTableFactory(const TitanDBOptions& db_options, TitanTableFactory(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options, const TitanCFOptions& cf_options,
std::shared_ptr<BlobFileManager> blob_manager, std::shared_ptr<BlobFileManager> blob_manager,
port::Mutex* db_mutex, VersionSet* vset) port::Mutex* db_mutex, VersionSet* vset, TitanStats* stats)
: db_options_(db_options), : db_options_(db_options),
immutable_cf_options_(cf_options), immutable_cf_options_(cf_options),
mutable_cf_options_(cf_options), mutable_cf_options_(cf_options),
base_factory_(cf_options.table_factory), base_factory_(cf_options.table_factory),
blob_manager_(blob_manager), blob_manager_(blob_manager),
db_mutex_(db_mutex), db_mutex_(db_mutex),
vset_(vset) {} vset_(vset),
stats_(stats) {}
const char* Name() const override { return "TitanTable"; } const char* Name() const override { return "TitanTable"; }
...@@ -69,6 +71,7 @@ class TitanTableFactory : public TableFactory { ...@@ -69,6 +71,7 @@ class TitanTableFactory : public TableFactory {
std::shared_ptr<BlobFileManager> blob_manager_; std::shared_ptr<BlobFileManager> blob_manager_;
port::Mutex* db_mutex_; port::Mutex* db_mutex_;
VersionSet* vset_; VersionSet* vset_;
TitanStats* stats_;
}; };
} // namespace titandb } // namespace titandb
......
#include "titan_stats.h"
#include "titan/db.h"
#include <map>
#include <string>
namespace rocksdb {
namespace titandb {
// Building blocks for the Titan property names. They are private to this
// translation unit; the public names are the TitanDB::Properties constants
// defined below, each formed as prefix + suffix.
namespace {
const std::string kPropertyPrefix = "rocksdb.titandb.";
const std::string kLiveBlobSizeSuffix = "live-blob-size";
const std::string kNumLiveBlobFileSuffix = "num-live-blob-file";
const std::string kNumObsoleteBlobFileSuffix = "num-obsolete-blob-file";
const std::string kLiveBlobFileSizeSuffix = "live-blob-file-size";
const std::string kObsoleteBlobFileSizeSuffix = "obsolete-blob-file-size";
}  // namespace

// Out-of-line definitions of the property name constants declared in
// TitanDB::Properties.
const std::string TitanDB::Properties::kLiveBlobSize =
    kPropertyPrefix + kLiveBlobSizeSuffix;
const std::string TitanDB::Properties::kNumLiveBlobFile =
    kPropertyPrefix + kNumLiveBlobFileSuffix;
const std::string TitanDB::Properties::kNumObsoleteBlobFile =
    kPropertyPrefix + kNumObsoleteBlobFileSuffix;
const std::string TitanDB::Properties::kLiveBlobFileSize =
    kPropertyPrefix + kLiveBlobFileSizeSuffix;
const std::string TitanDB::Properties::kObsoleteBlobFileSize =
    kPropertyPrefix + kObsoleteBlobFileSizeSuffix;

// Lookup table from a full property name to the internal counter that backs
// it. It is defined after the property strings above so that they are already
// constructed when the table is built.
const std::unordered_map<std::string, TitanInternalStats::StatsType>
    TitanInternalStats::stats_type_string_map = {
        {TitanDB::Properties::kLiveBlobSize,
         TitanInternalStats::LIVE_BLOB_SIZE},
        {TitanDB::Properties::kNumLiveBlobFile,
         TitanInternalStats::NUM_LIVE_BLOB_FILE},
        {TitanDB::Properties::kNumObsoleteBlobFile,
         TitanInternalStats::NUM_OBSOLETE_BLOB_FILE},
        {TitanDB::Properties::kLiveBlobFileSize,
         TitanInternalStats::LIVE_BLOB_FILE_SIZE},
        {TitanDB::Properties::kObsoleteBlobFileSize,
         TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE},
};
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/statistics.h"
#include "titan/options.h"
#include <atomic>
#include <map>
#include <string>
#include <unordered_map>
namespace rocksdb {
namespace titandb {
// A fixed set of atomic uint64 counters describing Titan blob state, each
// addressable by a StatsType and, through GetIntProperty/GetStringProperty,
// by its "rocksdb.titandb.*" property name.
//
// Titan internal stats does NOT reduce contention by keeping thread-local
// copies of the data: every update is a relaxed atomic operation on the one
// shared counter.
class TitanInternalStats {
 public:
  enum StatsType {
    LIVE_BLOB_SIZE,           // TitanDB::Properties::kLiveBlobSize
    NUM_LIVE_BLOB_FILE,       // TitanDB::Properties::kNumLiveBlobFile
    NUM_OBSOLETE_BLOB_FILE,   // TitanDB::Properties::kNumObsoleteBlobFile
    LIVE_BLOB_FILE_SIZE,      // TitanDB::Properties::kLiveBlobFileSize
    OBSOLETE_BLOB_FILE_SIZE,  // TitanDB::Properties::kObsoleteBlobFileSize
    INTERNAL_STATS_ENUM_MAX,  // number of counters; not a counter itself
  };

  // Starts every counter at zero. Before C++20 the default constructor of
  // std::atomic leaves the value uninitialized, so without this constructor a
  // default-initialized TitanInternalStats would expose indeterminate
  // counters.
  TitanInternalStats() { Clear(); }

  // Sets every counter to zero.
  void Clear() {
    for (auto& counter : stats_) {
      counter.store(0, std::memory_order_relaxed);
    }
  }

  // Sets the counter selected by `type` to zero.
  void ResetStats(StatsType type) {
    stats_[type].store(0, std::memory_order_relaxed);
  }

  // Adds `value` to the counter selected by `type`.
  void AddStats(StatsType type, uint64_t value) {
    stats_[type].fetch_add(value, std::memory_order_relaxed);
  }

  // Subtracts `value` from the counter selected by `type`. The counter is
  // unsigned, so subtracting more than its current value wraps around.
  void SubStats(StatsType type, uint64_t value) {
    stats_[type].fetch_sub(value, std::memory_order_relaxed);
  }

  // Looks `property` up by its full name. On a match stores the current
  // counter value in `*value` and returns true; otherwise returns false and
  // leaves `*value` untouched.
  bool GetIntProperty(const Slice& property, uint64_t* value) const {
    const auto it = stats_type_string_map.find(property.ToString());
    if (it == stats_type_string_map.end()) {
      return false;
    }
    *value = stats_[it->second].load(std::memory_order_relaxed);
    return true;
  }

  // Same lookup as GetIntProperty, with the counter rendered in decimal.
  // Returns false and leaves `*value` untouched for unknown properties.
  bool GetStringProperty(const Slice& property, std::string* value) const {
    uint64_t int_value = 0;
    if (!GetIntProperty(property, &int_value)) {
      return false;
    }
    *value = std::to_string(int_value);
    return true;
  }

 private:
  // Maps a full property name to the counter that backs it.
  static const std::unordered_map<std::string, TitanInternalStats::StatsType>
      stats_type_string_map;
  // One counter per StatsType, indexed by the enum value.
  std::atomic<uint64_t> stats_[INTERNAL_STATS_ENUM_MAX];
};
class TitanStats {
public:
TitanStats(Statistics* stats) : stats_(stats) {}
Status Initialize(std::map<uint32_t, TitanCFOptions> cf_options,
uint32_t default_cf) {
for (auto& opts : cf_options) {
internal_stats_[opts.first] = NewTitanInternalStats(opts.second);
}
default_cf_ = default_cf;
return Status::OK();
}
Statistics* statistics() { return stats_; }
TitanInternalStats* internal_stats(uint32_t cf_id) {
auto p = internal_stats_.find(cf_id);
if (p == internal_stats_.end()) {
return nullptr;
} else {
return p->second.get();
}
}
private:
Statistics* stats_ = nullptr;
uint32_t default_cf_ = 0;
std::unordered_map<uint32_t, std::shared_ptr<TitanInternalStats>>
internal_stats_;
std::shared_ptr<TitanInternalStats> NewTitanInternalStats(
TitanCFOptions& opts) {
return std::make_shared<TitanInternalStats>();
}
};
// Utility functions
// Null-tolerant accessor: yields the Statistics object held by `stats`, or
// nullptr when `stats` itself is null.
inline Statistics* statistics(TitanStats* stats) {
  if (stats == nullptr) {
    return nullptr;
  }
  return stats->statistics();
}
// Forwards a ticker increment of `count` (default 1) to the Statistics held
// by `stats`. Does nothing when `stats` or its Statistics is null.
inline void RecordTick(TitanStats* stats, uint32_t ticker_type,
                       uint64_t count = 1) {
  if (!stats) return;
  if (Statistics* sink = stats->statistics()) {
    sink->recordTick(ticker_type, count);
  }
}
// Forwards the sample `time` for `histogram_type` to the Statistics held by
// `stats`. Does nothing when `stats` or its Statistics is null.
inline void MeasureTime(TitanStats* stats, uint32_t histogram_type,
                        uint64_t time) {
  if (!stats) return;
  if (Statistics* sink = stats->statistics()) {
    sink->measureTime(histogram_type, time);
  }
}
// Forwards an absolute ticker value `count` to the Statistics held by
// `stats`. Does nothing when `stats` or its Statistics is null.
inline void SetTickerCount(TitanStats* stats, uint32_t ticker_type,
                           uint64_t count) {
  if (!stats) return;
  if (Statistics* sink = stats->statistics()) {
    sink->setTickerCount(ticker_type, count);
  }
}
// Zeroes the internal counter `type` of column family `cf_id`. Does nothing
// when `stats` is null or has no internal stats for that column family.
inline void ResetStats(TitanStats* stats, uint32_t cf_id,
                       TitanInternalStats::StatsType type) {
  if (!stats) return;
  TitanInternalStats* const cf_stats = stats->internal_stats(cf_id);
  if (!cf_stats) return;
  cf_stats->ResetStats(type);
}
// Adds `value` to the internal counter `type` of column family `cf_id`. Does
// nothing when `stats` is null or has no internal stats for that column
// family.
inline void AddStats(TitanStats* stats, uint32_t cf_id,
                     TitanInternalStats::StatsType type, uint64_t value) {
  if (!stats) return;
  TitanInternalStats* const cf_stats = stats->internal_stats(cf_id);
  if (!cf_stats) return;
  cf_stats->AddStats(type, value);
}
// Subtracts `value` from the internal counter `type` of column family
// `cf_id`. Does nothing when `stats` is null or has no internal stats for
// that column family.
inline void SubStats(TitanStats* stats, uint32_t cf_id,
                     TitanInternalStats::StatsType type, uint64_t value) {
  if (!stats) return;
  TitanInternalStats* const cf_stats = stats->internal_stats(cf_id);
  if (!cf_stats) return;
  cf_stats->SubStats(type, value);
}
} // namespace titandb
} // namespace rocksdb
...@@ -9,11 +9,12 @@ namespace titandb { ...@@ -9,11 +9,12 @@ namespace titandb {
const size_t kMaxFileCacheSize = 1024 * 1024; const size_t kMaxFileCacheSize = 1024 * 1024;
VersionSet::VersionSet(const TitanDBOptions& options) VersionSet::VersionSet(const TitanDBOptions& options, TitanStats* stats)
: dirname_(options.dirname), : dirname_(options.dirname),
env_(options.env), env_(options.env),
env_options_(options), env_options_(options),
db_options_(options) { db_options_(options),
stats_(stats) {
auto file_cache_size = db_options_.max_open_files; auto file_cache_size = db_options_.max_open_files;
if (file_cache_size < 0) { if (file_cache_size < 0) {
file_cache_size = kMaxFileCacheSize; file_cache_size = kMaxFileCacheSize;
...@@ -255,10 +256,10 @@ Status VersionSet::Apply(VersionEdit* edit) { ...@@ -255,10 +256,10 @@ Status VersionSet::Apply(VersionEdit* edit) {
void VersionSet::AddColumnFamilies( void VersionSet::AddColumnFamilies(
const std::map<uint32_t, TitanCFOptions>& column_families) { const std::map<uint32_t, TitanCFOptions>& column_families) {
for (auto& cf : column_families) { for (auto& cf : column_families) {
auto file_cache = auto file_cache = std::make_shared<BlobFileCache>(db_options_, cf.second,
std::make_shared<BlobFileCache>(db_options_, cf.second, file_cache_); file_cache_, stats_);
auto blob_storage = auto blob_storage = std::make_shared<BlobStorage>(
std::make_shared<BlobStorage>(db_options_, cf.second, file_cache); db_options_, cf.second, cf.first, file_cache, stats_);
column_families_.emplace(cf.first, blob_storage); column_families_.emplace(cf.first, blob_storage);
} }
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "titan/options.h" #include "titan/options.h"
#include "titan_stats.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "version_edit.h" #include "version_edit.h"
...@@ -21,7 +22,7 @@ namespace titandb { ...@@ -21,7 +22,7 @@ namespace titandb {
class VersionSet { class VersionSet {
public: public:
explicit VersionSet(const TitanDBOptions& options); explicit VersionSet(const TitanDBOptions& options, TitanStats* stats);
// Sets up the storage specified in "options.dirname". // Sets up the storage specified in "options.dirname".
// If the manifest doesn't exist, it will create one. // If the manifest doesn't exist, it will create one.
...@@ -92,6 +93,8 @@ class VersionSet { ...@@ -92,6 +93,8 @@ class VersionSet {
TitanDBOptions db_options_; TitanDBOptions db_options_;
std::shared_ptr<Cache> file_cache_; std::shared_ptr<Cache> file_cache_;
TitanStats* stats_;
std::vector<std::string> obsolete_manifests_; std::vector<std::string> obsolete_manifests_;
// As rocksdb described, `DropColumnFamilies()` only records the drop of the // As rocksdb described, `DropColumnFamilies()` only records the drop of the
......
...@@ -38,7 +38,8 @@ class VersionTest : public testing::Test { ...@@ -38,7 +38,8 @@ class VersionTest : public testing::Test {
env_->CreateDirIfMissing(dbname_); env_->CreateDirIfMissing(dbname_);
env_->CreateDirIfMissing(db_options_.dirname); env_->CreateDirIfMissing(db_options_.dirname);
auto cache = NewLRUCache(db_options_.max_open_files); auto cache = NewLRUCache(db_options_.max_open_files);
file_cache_.reset(new BlobFileCache(db_options_, cf_options_, cache)); file_cache_.reset(
new BlobFileCache(db_options_, cf_options_, cache, nullptr));
Reset(); Reset();
} }
...@@ -46,15 +47,17 @@ class VersionTest : public testing::Test { ...@@ -46,15 +47,17 @@ class VersionTest : public testing::Test {
DeleteDir(env_, db_options_.dirname); DeleteDir(env_, db_options_.dirname);
env_->CreateDirIfMissing(db_options_.dirname); env_->CreateDirIfMissing(db_options_.dirname);
vset_.reset(new VersionSet(db_options_)); vset_.reset(new VersionSet(db_options_, nullptr));
ASSERT_OK(vset_->Open({})); ASSERT_OK(vset_->Open({}));
column_families_.clear(); column_families_.clear();
// Sets up some column families. // Sets up some column families.
for (uint32_t id = 0; id < 10; id++) { for (uint32_t id = 0; id < 10; id++) {
std::shared_ptr<BlobStorage> storage; std::shared_ptr<BlobStorage> storage;
storage.reset(new BlobStorage(db_options_, cf_options_, file_cache_)); storage.reset(
new BlobStorage(db_options_, cf_options_, id, file_cache_, nullptr));
column_families_.emplace(id, storage); column_families_.emplace(id, storage);
storage.reset(new BlobStorage(db_options_, cf_options_, file_cache_)); storage.reset(
new BlobStorage(db_options_, cf_options_, id, file_cache_, nullptr));
vset_->column_families_.emplace(id, storage); vset_->column_families_.emplace(id, storage);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment