Unverified Commit 427b3481 authored by Connor's avatar Connor Committed by GitHub

Add ticker and histrogram metrics (#12)

* add metrics
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent e4939168
...@@ -3,9 +3,12 @@ ...@@ -3,9 +3,12 @@
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
BlobFileBuilder::BlobFileBuilder(const TitanCFOptions& options, BlobFileBuilder::BlobFileBuilder(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
WritableFileWriter* file) WritableFileWriter* file)
: options_(options), file_(file), encoder_(options_.blob_file_compression) { : cf_options_(cf_options),
file_(file),
encoder_(cf_options_.blob_file_compression) {
BlobFileHeader header; BlobFileHeader header;
std::string buffer; std::string buffer;
header.EncodeTo(&buffer); header.EncodeTo(&buffer);
...@@ -34,6 +37,7 @@ Status BlobFileBuilder::Finish() { ...@@ -34,6 +37,7 @@ Status BlobFileBuilder::Finish() {
status_ = file_->Append(buffer); status_ = file_->Append(buffer);
if (ok()) { if (ok()) {
// The Sync will be done in `BatchFinishFiles`
status_ = file_->Flush(); status_ = file_->Flush();
} }
return status(); return status();
......
...@@ -36,7 +36,8 @@ class BlobFileBuilder { ...@@ -36,7 +36,8 @@ class BlobFileBuilder {
// Constructs a builder that will store the contents of the file it // Constructs a builder that will store the contents of the file it
// is building in "*file". Does not close the file. It is up to the // is building in "*file". Does not close the file. It is up to the
// caller to sync and close the file after calling Finish(). // caller to sync and close the file after calling Finish().
BlobFileBuilder(const TitanCFOptions& options, WritableFileWriter* file); BlobFileBuilder(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options, WritableFileWriter* file);
// Adds the record to the file and points the handle to it. // Adds the record to the file and points the handle to it.
void Add(const BlobRecord& record, BlobHandle* handle); void Add(const BlobRecord& record, BlobHandle* handle);
...@@ -56,7 +57,7 @@ class BlobFileBuilder { ...@@ -56,7 +57,7 @@ class BlobFileBuilder {
private: private:
bool ok() const { return status().ok(); } bool ok() const { return status().ok(); }
TitanCFOptions options_; TitanCFOptions cf_options_;
WritableFileWriter* file_; WritableFileWriter* file_;
Status status_; Status status_;
......
...@@ -21,7 +21,8 @@ BlobFileCache::BlobFileCache(const TitanDBOptions& db_options, ...@@ -21,7 +21,8 @@ BlobFileCache::BlobFileCache(const TitanDBOptions& db_options,
env_options_(db_options), env_options_(db_options),
db_options_(db_options), db_options_(db_options),
cf_options_(cf_options), cf_options_(cf_options),
cache_(cache) {} cache_(cache),
stats_(db_options.statistics.get()) {}
Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number, Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const BlobHandle& handle, uint64_t file_size, const BlobHandle& handle,
...@@ -59,8 +60,10 @@ Status BlobFileCache::FindFile(uint64_t file_number, uint64_t file_size, ...@@ -59,8 +60,10 @@ Status BlobFileCache::FindFile(uint64_t file_number, uint64_t file_size,
Status s; Status s;
Slice cache_key = EncodeFileNumber(&file_number); Slice cache_key = EncodeFileNumber(&file_number);
*handle = cache_->Lookup(cache_key); *handle = cache_->Lookup(cache_key);
if (*handle) return s; if (*handle) {
// TODO: add file reader cache hit/miss metrics
return s;
}
std::unique_ptr<RandomAccessFileReader> file; std::unique_ptr<RandomAccessFileReader> file;
{ {
std::unique_ptr<RandomAccessFile> f; std::unique_ptr<RandomAccessFile> f;
...@@ -74,7 +77,8 @@ Status BlobFileCache::FindFile(uint64_t file_number, uint64_t file_size, ...@@ -74,7 +77,8 @@ Status BlobFileCache::FindFile(uint64_t file_number, uint64_t file_size,
} }
std::unique_ptr<BlobFileReader> reader; std::unique_ptr<BlobFileReader> reader;
s = BlobFileReader::Open(cf_options_, std::move(file), file_size, &reader); s = BlobFileReader::Open(cf_options_, std::move(file), file_size, &reader,
stats_);
if (!s.ok()) return s; if (!s.ok()) return s;
cache_->Insert(cache_key, reader.release(), 1, cache_->Insert(cache_key, reader.release(), 1,
......
...@@ -41,6 +41,7 @@ class BlobFileCache { ...@@ -41,6 +41,7 @@ class BlobFileCache {
TitanDBOptions db_options_; TitanDBOptions db_options_;
TitanCFOptions cf_options_; TitanCFOptions cf_options_;
std::shared_ptr<Cache> cache_; std::shared_ptr<Cache> cache_;
Statistics* stats_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -60,7 +60,8 @@ class BlobFileIteratorTest : public testing::Test { ...@@ -60,7 +60,8 @@ class BlobFileIteratorTest : public testing::Test {
writable_file_.reset( writable_file_.reset(
new WritableFileWriter(std::move(f), file_name_, env_options_)); new WritableFileWriter(std::move(f), file_name_, env_options_));
} }
builder_.reset(new BlobFileBuilder(cf_options, writable_file_.get())); builder_.reset(
new BlobFileBuilder(db_options, cf_options, writable_file_.get()));
} }
void AddKeyValue(const std::string& key, const std::string& value, void AddKeyValue(const std::string& key, const std::string& value,
......
...@@ -47,7 +47,8 @@ void EncodeBlobCache(std::string* dst, const Slice& prefix, uint64_t offset) { ...@@ -47,7 +47,8 @@ void EncodeBlobCache(std::string* dst, const Slice& prefix, uint64_t offset) {
Status BlobFileReader::Open(const TitanCFOptions& options, Status BlobFileReader::Open(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file, std::unique_ptr<RandomAccessFileReader> file,
uint64_t file_size, uint64_t file_size,
std::unique_ptr<BlobFileReader>* result) { std::unique_ptr<BlobFileReader>* result,
Statistics* stats) {
if (file_size < BlobFileFooter::kEncodedLength) { if (file_size < BlobFileFooter::kEncodedLength) {
return Status::Corruption("file is too short to be a blob file"); return Status::Corruption("file is too short to be a blob file");
} }
...@@ -59,15 +60,19 @@ Status BlobFileReader::Open(const TitanCFOptions& options, ...@@ -59,15 +60,19 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
BlobFileFooter footer; BlobFileFooter footer;
TRY(DecodeInto(buffer, &footer)); TRY(DecodeInto(buffer, &footer));
auto reader = new BlobFileReader(options, std::move(file)); auto reader = new BlobFileReader(options, std::move(file), stats);
reader->footer_ = footer; reader->footer_ = footer;
result->reset(reader); result->reset(reader);
return Status::OK(); return Status::OK();
} }
BlobFileReader::BlobFileReader(const TitanCFOptions& options, BlobFileReader::BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file) std::unique_ptr<RandomAccessFileReader> file,
: options_(options), file_(std::move(file)), cache_(options.blob_cache) { Statistics* stats)
: options_(options),
file_(std::move(file)),
cache_(options.blob_cache),
stats_(stats) {
if (cache_) { if (cache_) {
GenerateCachePrefix(&cache_prefix_, cache_.get(), file_->file()); GenerateCachePrefix(&cache_prefix_, cache_.get(), file_->file());
} }
...@@ -84,11 +89,15 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/, ...@@ -84,11 +89,15 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
EncodeBlobCache(&cache_key, cache_prefix_, handle.offset); EncodeBlobCache(&cache_key, cache_prefix_, handle.offset);
cache_handle = cache_->Lookup(cache_key); cache_handle = cache_->Lookup(cache_key);
if (cache_handle) { if (cache_handle) {
RecordTick(stats_, BLOCK_CACHE_DATA_HIT);
RecordTick(stats_, BLOCK_CACHE_HIT);
auto blob = reinterpret_cast<OwnedSlice*>(cache_->Value(cache_handle)); auto blob = reinterpret_cast<OwnedSlice*>(cache_->Value(cache_handle));
buffer->PinSlice(*blob, UnrefCacheHandle, cache_.get(), cache_handle); buffer->PinSlice(*blob, UnrefCacheHandle, cache_.get(), cache_handle);
return DecodeInto(*blob, record); return DecodeInto(*blob, record);
} }
} }
RecordTick(stats_, BLOCK_CACHE_DATA_MISS);
RecordTick(stats_, BLOCK_CACHE_MISS);
OwnedSlice blob; OwnedSlice blob;
TRY(ReadRecord(handle, record, &blob)); TRY(ReadRecord(handle, record, &blob));
......
...@@ -19,7 +19,8 @@ class BlobFileReader { ...@@ -19,7 +19,8 @@ class BlobFileReader {
static Status Open(const TitanCFOptions& options, static Status Open(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file, std::unique_ptr<RandomAccessFileReader> file,
uint64_t file_size, uint64_t file_size,
std::unique_ptr<BlobFileReader>* result); std::unique_ptr<BlobFileReader>* result,
Statistics* stats);
// Gets the blob record pointed by the handle in this file. The data // Gets the blob record pointed by the handle in this file. The data
// of the record is stored in the provided buffer, so the buffer // of the record is stored in the provided buffer, so the buffer
...@@ -31,7 +32,8 @@ class BlobFileReader { ...@@ -31,7 +32,8 @@ class BlobFileReader {
friend class BlobFilePrefetcher; friend class BlobFilePrefetcher;
BlobFileReader(const TitanCFOptions& options, BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file); std::unique_ptr<RandomAccessFileReader> file,
Statistics* stats);
Status ReadRecord(const BlobHandle& handle, BlobRecord* record, Status ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer); OwnedSlice* buffer);
...@@ -44,6 +46,8 @@ class BlobFileReader { ...@@ -44,6 +46,8 @@ class BlobFileReader {
// Information read from the file. // Information read from the file.
BlobFileFooter footer_; BlobFileFooter footer_;
Statistics* stats_;
}; };
// Performs readahead on continuous reads. // Performs readahead on continuous reads.
......
...@@ -35,7 +35,7 @@ class BlobFileTest : public testing::Test { ...@@ -35,7 +35,7 @@ class BlobFileTest : public testing::Test {
new WritableFileWriter(std::move(f), file_name_, env_options_)); new WritableFileWriter(std::move(f), file_name_, env_options_));
} }
std::unique_ptr<BlobFileBuilder> builder( std::unique_ptr<BlobFileBuilder> builder(
new BlobFileBuilder(cf_options, file.get())); new BlobFileBuilder(db_options, cf_options, file.get()));
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
auto key = std::to_string(i); auto key = std::to_string(i);
...@@ -96,7 +96,7 @@ class BlobFileTest : public testing::Test { ...@@ -96,7 +96,7 @@ class BlobFileTest : public testing::Test {
new WritableFileWriter(std::move(f), file_name_, env_options_)); new WritableFileWriter(std::move(f), file_name_, env_options_));
} }
std::unique_ptr<BlobFileBuilder> builder( std::unique_ptr<BlobFileBuilder> builder(
new BlobFileBuilder(cf_options, file.get())); new BlobFileBuilder(db_options, cf_options, file.get()));
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
auto key = std::to_string(i); auto key = std::to_string(i);
...@@ -120,7 +120,7 @@ class BlobFileTest : public testing::Test { ...@@ -120,7 +120,7 @@ class BlobFileTest : public testing::Test {
std::unique_ptr<BlobFileReader> blob_file_reader; std::unique_ptr<BlobFileReader> blob_file_reader;
ASSERT_OK(BlobFileReader::Open(cf_options, ASSERT_OK(BlobFileReader::Open(cf_options,
std::move(random_access_file_reader), std::move(random_access_file_reader),
file_size, &blob_file_reader)); file_size, &blob_file_reader, nullptr));
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
auto key = std::to_string(i); auto key = std::to_string(i);
auto value = std::string(1024, i); auto value = std::string(1024, i);
......
...@@ -8,7 +8,7 @@ namespace titandb { ...@@ -8,7 +8,7 @@ namespace titandb {
class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback { class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
public: public:
GarbageCollectionWriteCallback(ColumnFamilyHandle* cfh, std::string&& _key, GarbageCollectionWriteCallback(ColumnFamilyHandle* cfh, std::string&& _key,
BlobIndex&& blob_index) BlobIndex&& blob_index, Statistics* stats)
: cfh_(cfh), key_(std::move(_key)), blob_index_(blob_index) { : cfh_(cfh), key_(std::move(_key)), blob_index_(blob_index) {
assert(!key_.empty()); assert(!key_.empty());
} }
...@@ -26,6 +26,7 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback { ...@@ -26,6 +26,7 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
fprintf(stderr, "GetImpl err, status:%s\n", s.ToString().c_str()); fprintf(stderr, "GetImpl err, status:%s\n", s.ToString().c_str());
abort(); abort();
} }
read_bytes_ = key_.size() + index_entry.size();
if (s.IsNotFound()) { if (s.IsNotFound()) {
// Either the key is deleted or updated with a newer version which is // Either the key is deleted or updated with a newer version which is
// inlined in LSM. // inlined in LSM.
...@@ -55,11 +56,16 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback { ...@@ -55,11 +56,16 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
std::string key() { return key_; } std::string key() { return key_; }
uint64_t read_bytes() { return read_bytes_; }
uint64_t blob_record_size() { return blob_index_.blob_handle.size; }
private: private:
ColumnFamilyHandle* cfh_; ColumnFamilyHandle* cfh_;
// Key to check // Key to check
std::string key_; std::string key_;
BlobIndex blob_index_; BlobIndex blob_index_;
uint64_t read_bytes_;
}; };
BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
...@@ -78,10 +84,24 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex, ...@@ -78,10 +84,24 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
blob_file_manager_(blob_file_manager), blob_file_manager_(blob_file_manager),
version_set_(version_set), version_set_(version_set),
log_buffer_(log_buffer), log_buffer_(log_buffer),
shuting_down_(shuting_down) {} shuting_down_(shuting_down),
stats_(db_options_.statistics.get()) {}
BlobGCJob::~BlobGCJob() { BlobGCJob::~BlobGCJob() {
if (cmp_) delete cmp_; // flush metrics
RecordTick(stats_, BLOB_DB_BYTES_READ, metrics_.blob_db_bytes_read);
RecordTick(stats_, BLOB_DB_BYTES_WRITTEN, metrics_.blob_db_bytes_written);
RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
metrics_.blob_db_gc_num_keys_overwritten);
RecordTick(stats_, BLOB_DB_GC_BYTES_OVERWRITTEN,
metrics_.blob_db_gc_bytes_overwritten);
RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
metrics_.blob_db_gc_num_keys_relocated);
RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
metrics_.blob_db_gc_bytes_relocated);
RecordTick(stats_, BLOB_DB_GC_NUM_NEW_FILES,
metrics_.blob_db_gc_num_new_files);
RecordTick(stats_, BLOB_DB_GC_NUM_FILES, metrics_.blob_db_gc_num_files);
} }
Status BlobGCJob::Prepare() { return Status::OK(); } Status BlobGCJob::Prepare() { return Status::OK(); }
...@@ -140,6 +160,8 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) { ...@@ -140,6 +160,8 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) {
return true; return true;
} }
// TODO: add do sample count metrics
Status s; Status s;
uint64_t sample_size_window = static_cast<uint64_t>( uint64_t sample_size_window = static_cast<uint64_t>(
file->file_size() * blob_gc_->titan_cf_options().sample_file_size_ratio); file->file_size() * blob_gc_->titan_cf_options().sample_file_size_ratio);
...@@ -185,6 +207,7 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) { ...@@ -185,6 +207,7 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) {
discardable_size += total_length; discardable_size += total_length;
} }
} }
metrics_.blob_db_bytes_read += iterated_size;
assert(iter.status().ok()); assert(iter.status().ok());
return discardable_size >= return discardable_size >=
...@@ -230,6 +253,9 @@ Status BlobGCJob::DoRunGC() { ...@@ -230,6 +253,9 @@ Status BlobGCJob::DoRunGC() {
break; break;
} }
BlobIndex blob_index = gc_iter->GetBlobIndex(); BlobIndex blob_index = gc_iter->GetBlobIndex();
// count read bytes for blob record of gc candidate files
metrics_.blob_db_bytes_read += blob_index.blob_handle.size;
if (!last_key.empty() && !gc_iter->key().compare(last_key)) { if (!last_key.empty() && !gc_iter->key().compare(last_key)) {
if (last_key_valid) { if (last_key_valid) {
continue; continue;
...@@ -240,6 +266,8 @@ Status BlobGCJob::DoRunGC() { ...@@ -240,6 +266,8 @@ Status BlobGCJob::DoRunGC() {
} }
if (DiscardEntry(gc_iter->key(), blob_index)) { if (DiscardEntry(gc_iter->key(), blob_index)) {
metrics_.blob_db_gc_num_keys_overwritten++;
metrics_.blob_db_gc_bytes_overwritten += blob_index.blob_handle.size;
continue; continue;
} }
...@@ -259,8 +287,9 @@ Status BlobGCJob::DoRunGC() { ...@@ -259,8 +287,9 @@ Status BlobGCJob::DoRunGC() {
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
blob_file_builder = unique_ptr<BlobFileBuilder>(new BlobFileBuilder( blob_file_builder = unique_ptr<BlobFileBuilder>(
blob_gc_->titan_cf_options(), blob_file_handle->GetFile())); new BlobFileBuilder(db_options_, blob_gc_->titan_cf_options(),
blob_file_handle->GetFile()));
file_size = 0; file_size = 0;
} }
assert(blob_file_handle); assert(blob_file_handle);
...@@ -269,8 +298,10 @@ Status BlobGCJob::DoRunGC() { ...@@ -269,8 +298,10 @@ Status BlobGCJob::DoRunGC() {
BlobRecord blob_record; BlobRecord blob_record;
blob_record.key = gc_iter->key(); blob_record.key = gc_iter->key();
blob_record.value = gc_iter->value(); blob_record.value = gc_iter->value();
// count written bytes for new blob record,
// file_size_ += blob_record.key.size() + blob_record.value.size(); // blob index's size is counted in `RewriteValidKeyToLSM`
metrics_.blob_db_bytes_written +=
blob_record.key.size() + blob_record.value.size();
BlobIndex new_blob_index; BlobIndex new_blob_index;
new_blob_index.file_number = blob_file_handle->GetNumber(); new_blob_index.file_number = blob_file_handle->GetNumber();
...@@ -280,7 +311,7 @@ Status BlobGCJob::DoRunGC() { ...@@ -280,7 +311,7 @@ Status BlobGCJob::DoRunGC() {
// Store WriteBatch for rewriting new Key-Index pairs to LSM // Store WriteBatch for rewriting new Key-Index pairs to LSM
GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(), GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(),
std::move(blob_index)); std::move(blob_index), stats_);
callback.value = index_entry; callback.value = index_entry;
rewrite_batches_.emplace_back( rewrite_batches_.emplace_back(
std::make_pair(WriteBatch(), std::move(callback))); std::make_pair(WriteBatch(), std::move(callback)));
...@@ -341,6 +372,8 @@ bool BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index) { ...@@ -341,6 +372,8 @@ bool BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index) {
fprintf(stderr, "GetImpl err, status:%s\n", s.ToString().c_str()); fprintf(stderr, "GetImpl err, status:%s\n", s.ToString().c_str());
abort(); abort();
} }
// count read bytes for checking LSM entry
metrics_.blob_db_bytes_read += key.size() + index_entry.size();
if (s.IsNotFound() || !is_blob_index) { if (s.IsNotFound() || !is_blob_index) {
// Either the key is deleted or updated with a newer version which is // Either the key is deleted or updated with a newer version which is
// inlined in LSM. // inlined in LSM.
...@@ -384,6 +417,7 @@ Status BlobGCJob::InstallOutputBlobFiles() { ...@@ -384,6 +417,7 @@ Status BlobGCJob::InstallOutputBlobFiles() {
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
metrics_.blob_db_gc_num_new_files++;
} }
if (s.ok()) { if (s.ok()) {
std::vector<std::pair<std::shared_ptr<BlobFileMeta>, std::vector<std::pair<std::shared_ptr<BlobFileMeta>,
...@@ -424,13 +458,23 @@ Status BlobGCJob::RewriteValidKeyToLSM() { ...@@ -424,13 +458,23 @@ Status BlobGCJob::RewriteValidKeyToLSM() {
} }
s = db_impl->WriteWithCallback(wo, &write_batch.first, &write_batch.second); s = db_impl->WriteWithCallback(wo, &write_batch.first, &write_batch.second);
if (s.ok()) { if (s.ok()) {
// Key is successfully written to LSM // count written bytes for new blob index.
metrics_.blob_db_bytes_written += write_batch.first.GetDataSize();
metrics_.blob_db_gc_num_keys_relocated++;
metrics_.blob_db_gc_bytes_relocated +=
write_batch.second.blob_record_size();
// Key is successfully written to LSM.
} else if (s.IsBusy()) { } else if (s.IsBusy()) {
metrics_.blob_db_gc_num_keys_overwritten++;
metrics_.blob_db_gc_bytes_overwritten +=
write_batch.second.blob_record_size();
// The key is overwritten in the meanwhile. Drop the blob record. // The key is overwritten in the meanwhile. Drop the blob record.
} else { } else {
// We hit an error. // We hit an error.
break; break;
} }
// count read bytes in write callback
metrics_.blob_db_bytes_read += write_batch.second.read_bytes();
} }
if (s.IsBusy()) { if (s.IsBusy()) {
s = Status::OK(); s = Status::OK();
...@@ -443,7 +487,7 @@ Status BlobGCJob::RewriteValidKeyToLSM() { ...@@ -443,7 +487,7 @@ Status BlobGCJob::RewriteValidKeyToLSM() {
return s; return s;
} }
Status BlobGCJob::DeleteInputBlobFiles() const { Status BlobGCJob::DeleteInputBlobFiles() {
SequenceNumber obsolete_sequence = base_db_impl_->GetLatestSequenceNumber(); SequenceNumber obsolete_sequence = base_db_impl_->GetLatestSequenceNumber();
Status s; Status s;
...@@ -452,6 +496,7 @@ Status BlobGCJob::DeleteInputBlobFiles() const { ...@@ -452,6 +496,7 @@ Status BlobGCJob::DeleteInputBlobFiles() const {
for (const auto& file : blob_gc_->sampled_inputs()) { for (const auto& file : blob_gc_->sampled_inputs()) {
ROCKS_LOG_INFO(db_options_.info_log, "Titan add obsolete file [%llu]", ROCKS_LOG_INFO(db_options_.info_log, "Titan add obsolete file [%llu]",
file->file_number()); file->file_number());
metrics_.blob_db_gc_num_files++;
edit.DeleteBlobFile(file->file_number(), obsolete_sequence); edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
} }
s = version_set_->LogAndApply(&edit); s = version_set_->LogAndApply(&edit);
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "blob_gc.h" #include "blob_gc.h"
#include "db/db_impl.h" #include "db/db_impl.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "titan/options.h" #include "titan/options.h"
#include "version_set.h" #include "version_set.h"
...@@ -45,7 +46,7 @@ class BlobGCJob { ...@@ -45,7 +46,7 @@ class BlobGCJob {
Env* env_; Env* env_;
EnvOptions env_options_; EnvOptions env_options_;
BlobFileManager* blob_file_manager_; BlobFileManager* blob_file_manager_;
titandb::VersionSet* version_set_; VersionSet* version_set_;
LogBuffer* log_buffer_{nullptr}; LogBuffer* log_buffer_{nullptr};
std::vector<std::pair<std::unique_ptr<BlobFileHandle>, std::vector<std::pair<std::unique_ptr<BlobFileHandle>,
...@@ -53,10 +54,22 @@ class BlobGCJob { ...@@ -53,10 +54,22 @@ class BlobGCJob {
blob_file_builders_; blob_file_builders_;
std::vector<std::pair<WriteBatch, GarbageCollectionWriteCallback>> std::vector<std::pair<WriteBatch, GarbageCollectionWriteCallback>>
rewrite_batches_; rewrite_batches_;
InternalKeyComparator* cmp_{nullptr};
std::atomic_bool* shuting_down_{nullptr}; std::atomic_bool* shuting_down_{nullptr};
Statistics* stats_;
struct {
uint64_t blob_db_bytes_read;
uint64_t blob_db_bytes_written;
uint64_t blob_db_gc_num_keys_overwritten;
uint64_t blob_db_gc_bytes_overwritten;
uint64_t blob_db_gc_num_keys_relocated;
uint64_t blob_db_gc_bytes_relocated;
uint64_t blob_db_gc_num_new_files;
uint64_t blob_db_gc_num_files;
} metrics_;
Status SampleCandidateFiles(); Status SampleCandidateFiles();
bool DoSample(const BlobFileMeta* file); bool DoSample(const BlobFileMeta* file);
Status DoRunGC(); Status DoRunGC();
...@@ -64,7 +77,7 @@ class BlobGCJob { ...@@ -64,7 +77,7 @@ class BlobGCJob {
bool DiscardEntry(const Slice& key, const BlobIndex& blob_index); bool DiscardEntry(const Slice& key, const BlobIndex& blob_index);
Status InstallOutputBlobFiles(); Status InstallOutputBlobFiles();
Status RewriteValidKeyToLSM(); Status RewriteValidKeyToLSM();
Status DeleteInputBlobFiles() const; Status DeleteInputBlobFiles();
bool IsShutingDown(); bool IsShutingDown();
}; };
......
...@@ -77,6 +77,7 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files, ...@@ -77,6 +77,7 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
if (obsolete_files) { if (obsolete_files) {
obsolete_files->emplace_back( obsolete_files->emplace_back(
BlobFileName(db_options_.dirname, file_number)); BlobFileName(db_options_.dirname, file_number));
// TODO: add obsolete files count metrics
} }
it = obsolete_files_.erase(it); it = obsolete_files_.erase(it);
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "blob_format.h" #include "blob_format.h"
#include "blob_gc.h" #include "blob_gc.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/statistics.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -19,6 +20,7 @@ class BlobStorage { ...@@ -19,6 +20,7 @@ class BlobStorage {
this->file_cache_ = bs.file_cache_; this->file_cache_ = bs.file_cache_;
this->db_options_ = bs.db_options_; this->db_options_ = bs.db_options_;
this->cf_options_ = bs.cf_options_; this->cf_options_ = bs.cf_options_;
this->stats_ = bs.stats_;
} }
BlobStorage(const TitanDBOptions& _db_options, BlobStorage(const TitanDBOptions& _db_options,
...@@ -27,7 +29,8 @@ class BlobStorage { ...@@ -27,7 +29,8 @@ class BlobStorage {
: db_options_(_db_options), : db_options_(_db_options),
cf_options_(_cf_options), cf_options_(_cf_options),
file_cache_(_file_cache), file_cache_(_file_cache),
destroyed_(false) {} destroyed_(false),
stats_(_db_options.statistics.get()) {}
~BlobStorage() { ~BlobStorage() {
for (auto& file : files_) { for (auto& file : files_) {
...@@ -112,6 +115,8 @@ class BlobStorage { ...@@ -112,6 +115,8 @@ class BlobStorage {
// in-memory data structure can be destroyed. Physical files may still be // in-memory data structure can be destroyed. Physical files may still be
// kept. // kept.
bool destroyed_; bool destroyed_;
Statistics* stats_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -51,7 +51,12 @@ class TitanDBImpl::FileManager : public BlobFileManager { ...@@ -51,7 +51,12 @@ class TitanDBImpl::FileManager : public BlobFileManager {
VersionEdit edit; VersionEdit edit;
edit.SetColumnFamilyID(cf_id); edit.SetColumnFamilyID(cf_id);
for (auto& file : files) { for (auto& file : files) {
s = file.second->GetFile()->Sync(false); RecordTick(db_->stats_, BLOB_DB_BLOB_FILE_SYNCED);
{
StopWatch sync_sw(db_->env_, db_->stats_,
BLOB_DB_BLOB_FILE_SYNC_MICROS);
s = file.second->GetFile()->Sync(false);
}
if (s.ok()) { if (s.ok()) {
s = file.second->GetFile()->Close(); s = file.second->GetFile()->Close();
} }
...@@ -113,7 +118,8 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options, ...@@ -113,7 +118,8 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
dbname_(dbname), dbname_(dbname),
env_(options.env), env_(options.env),
env_options_(options), env_options_(options),
db_options_(options) { db_options_(options),
stats_(options.statistics.get()) {
if (db_options_.dirname.empty()) { if (db_options_.dirname.empty()) {
db_options_.dirname = dbname_ + "/titandb"; db_options_.dirname = dbname_ + "/titandb";
} }
...@@ -124,7 +130,7 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options, ...@@ -124,7 +130,7 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
TitanDBImpl::~TitanDBImpl() { Close(); } TitanDBImpl::~TitanDBImpl() { Close(); }
// how often to schedule delete obs files periods // how often to schedule delete obsolete blob files periods
static constexpr uint32_t kDeleteObsoleteFilesPeriodSecs = 10; // 10s static constexpr uint32_t kDeleteObsoleteFilesPeriodSecs = 10; // 10s
void TitanDBImpl::StartBackgroundTasks() { void TitanDBImpl::StartBackgroundTasks() {
...@@ -352,6 +358,8 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, ...@@ -352,6 +358,8 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
nullptr /*read_callback*/, &is_blob_index); nullptr /*read_callback*/, &is_blob_index);
if (!s.ok() || !is_blob_index) return s; if (!s.ok() || !is_blob_index) return s;
StopWatch get_sw(env_, stats_, BLOB_DB_GET_MICROS);
BlobIndex index; BlobIndex index;
s = index.DecodeFrom(value); s = index.DecodeFrom(value);
assert(s.ok()); assert(s.ok());
...@@ -364,7 +372,12 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, ...@@ -364,7 +372,12 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
auto storage = vset_->GetBlobStorage(handle->GetID()).lock(); auto storage = vset_->GetBlobStorage(handle->GetID()).lock();
mutex_.Unlock(); mutex_.Unlock();
s = storage->Get(options, index, &record, &buffer); {
StopWatch read_sw(env_, stats_, BLOB_DB_BLOB_FILE_READ_MICROS);
s = storage->Get(options, index, &record, &buffer);
RecordTick(stats_, BLOB_DB_NUM_KEYS_READ);
RecordTick(stats_, BLOB_DB_BLOB_FILE_BYTES_READ, index.blob_handle.size);
}
if (s.IsCorruption()) { if (s.IsCorruption()) {
ROCKS_LOG_DEBUG(db_options_.info_log, ROCKS_LOG_DEBUG(db_options_.info_log,
"Key:%s Snapshot:%" PRIu64 " GetBlobFile err:%s\n", "Key:%s Snapshot:%" PRIu64 " GetBlobFile err:%s\n",
...@@ -437,7 +450,7 @@ Iterator* TitanDBImpl::NewIteratorImpl( ...@@ -437,7 +450,7 @@ Iterator* TitanDBImpl::NewIteratorImpl(
options, cfd, options.snapshot->GetSequenceNumber(), options, cfd, options.snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/)); nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/));
return new TitanDBIterator(options, storage.lock().get(), snapshot, return new TitanDBIterator(options, storage.lock().get(), snapshot,
std::move(iter)); std::move(iter), env_, stats_);
} }
Status TitanDBImpl::NewIterators( Status TitanDBImpl::NewIterators(
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "db/db_impl.h" #include "db/db_impl.h"
#include "rocksdb/statistics.h"
#include "titan/db.h" #include "titan/db.h"
#include "util/repeatable_thread.h" #include "util/repeatable_thread.h"
#include "version_set.h" #include "version_set.h"
...@@ -143,6 +144,10 @@ class TitanDBImpl : public TitanDB { ...@@ -143,6 +144,10 @@ class TitanDBImpl : public TitanDB {
EnvOptions env_options_; EnvOptions env_options_;
DBImpl* db_impl_; DBImpl* db_impl_;
TitanDBOptions db_options_; TitanDBOptions db_options_;
// statistics object sharing with RocksDB
Statistics* stats_;
std::unordered_map<uint32_t, std::shared_ptr<TableFactory>> std::unordered_map<uint32_t, std::shared_ptr<TableFactory>>
original_table_factory_; original_table_factory_;
......
...@@ -60,6 +60,7 @@ void TitanDBImpl::BackgroundCallGC() { ...@@ -60,6 +60,7 @@ void TitanDBImpl::BackgroundCallGC() {
Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) { Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
mutex_.AssertHeld(); mutex_.AssertHeld();
StopWatch gc_sw(env_, stats_, BLOB_DB_GC_MICROS);
std::unique_ptr<BlobGC> blob_gc; std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh; std::unique_ptr<ColumnFamilyHandle> cfh;
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <inttypes.h> #include <inttypes.h>
#include "db/db_iter.h" #include "db/db_iter.h"
#include "rocksdb/statistics.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -15,11 +16,14 @@ class TitanDBIterator : public Iterator { ...@@ -15,11 +16,14 @@ class TitanDBIterator : public Iterator {
public: public:
TitanDBIterator(const ReadOptions& options, BlobStorage* storage, TitanDBIterator(const ReadOptions& options, BlobStorage* storage,
std::shared_ptr<ManagedSnapshot> snap, std::shared_ptr<ManagedSnapshot> snap,
std::unique_ptr<ArenaWrappedDBIter> iter) std::unique_ptr<ArenaWrappedDBIter> iter, Env* env,
Statistics* stats)
: options_(options), : options_(options),
storage_(storage), storage_(storage),
snap_(snap), snap_(snap),
iter_(std::move(iter)) {} iter_(std::move(iter)),
env_(env),
stats_(stats) {}
bool Valid() const override { return iter_->Valid() && status_.ok(); } bool Valid() const override { return iter_->Valid() && status_.ok(); }
...@@ -34,34 +38,58 @@ class TitanDBIterator : public Iterator { ...@@ -34,34 +38,58 @@ class TitanDBIterator : public Iterator {
void SeekToFirst() override { void SeekToFirst() override {
iter_->SeekToFirst(); iter_->SeekToFirst();
GetBlobValue(); if (Check()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK);
}
} }
void SeekToLast() override { void SeekToLast() override {
iter_->SeekToLast(); iter_->SeekToLast();
GetBlobValue(); if (Check()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK);
}
} }
void Seek(const Slice& target) override { void Seek(const Slice& target) override {
iter_->Seek(target); iter_->Seek(target);
GetBlobValue(); if (Check()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK);
}
} }
void SeekForPrev(const Slice& target) override { void SeekForPrev(const Slice& target) override {
iter_->SeekForPrev(target); iter_->SeekForPrev(target);
GetBlobValue(); if (Check()) {
StopWatch seek_sw(env_, stats_, BLOB_DB_SEEK_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK);
}
} }
void Next() override { void Next() override {
assert(Valid()); assert(Valid());
iter_->Next(); iter_->Next();
GetBlobValue(); if (Check()) {
StopWatch next_sw(env_, stats_, BLOB_DB_NEXT_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_NEXT);
}
} }
void Prev() override { void Prev() override {
assert(Valid()); assert(Valid());
iter_->Prev(); iter_->Prev();
GetBlobValue(); if (Check()) {
StopWatch prev_sw(env_, stats_, BLOB_DB_PREV_MICROS);
GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_PREV);
}
} }
Slice key() const override { Slice key() const override {
...@@ -76,11 +104,15 @@ class TitanDBIterator : public Iterator { ...@@ -76,11 +104,15 @@ class TitanDBIterator : public Iterator {
} }
private: private:
void GetBlobValue() { bool Check() {
if (!iter_->Valid() || !iter_->IsBlob()) { if (!iter_->Valid() || !iter_->IsBlob()) {
status_ = iter_->status(); status_ = iter_->status();
return; return false;
} }
return true;
}
void GetBlobValue() {
assert(iter_->status().ok()); assert(iter_->status().ok());
BlobIndex index; BlobIndex index;
...@@ -107,6 +139,7 @@ class TitanDBIterator : public Iterator { ...@@ -107,6 +139,7 @@ class TitanDBIterator : public Iterator {
buffer_.Reset(); buffer_.Reset();
status_ = it->second->Get(options_, index.blob_handle, &record_, &buffer_); status_ = it->second->Get(options_, index.blob_handle, &record_, &buffer_);
return;
} }
Status status_; Status status_;
...@@ -118,6 +151,9 @@ class TitanDBIterator : public Iterator { ...@@ -118,6 +151,9 @@ class TitanDBIterator : public Iterator {
std::shared_ptr<ManagedSnapshot> snap_; std::shared_ptr<ManagedSnapshot> snap_;
std::unique_ptr<ArenaWrappedDBIter> iter_; std::unique_ptr<ArenaWrappedDBIter> iter_;
std::map<uint64_t, std::unique_ptr<BlobFilePrefetcher>> files_; std::map<uint64_t, std::unique_ptr<BlobFilePrefetcher>> files_;
Env* env_;
Statistics* stats_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -30,20 +30,26 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) { ...@@ -30,20 +30,26 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value, void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
std::string* index_value) { std::string* index_value) {
if (!ok()) return; if (!ok()) return;
StopWatch write_sw(db_options_.env, stats_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
if (!blob_builder_) { if (!blob_builder_) {
status_ = blob_manager_->NewFile(&blob_handle_); status_ = blob_manager_->NewFile(&blob_handle_);
if (!ok()) return; if (!ok()) return;
blob_builder_.reset( blob_builder_.reset(
new BlobFileBuilder(cf_options_, blob_handle_->GetFile())); new BlobFileBuilder(db_options_, cf_options_, blob_handle_->GetFile()));
} }
RecordTick(stats_, BLOB_DB_NUM_KEYS_WRITTEN);
MeasureTime(stats_, BLOB_DB_KEY_SIZE, key.size());
MeasureTime(stats_, BLOB_DB_VALUE_SIZE, value.size());
BlobIndex index; BlobIndex index;
BlobRecord record; BlobRecord record;
record.key = key; record.key = key;
record.value = value; record.value = value;
index.file_number = blob_handle_->GetNumber(); index.file_number = blob_handle_->GetNumber();
blob_builder_->Add(record, &index.blob_handle); blob_builder_->Add(record, &index.blob_handle);
RecordTick(stats_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, index.blob_handle.size);
if (ok()) { if (ok()) {
index.EncodeTo(index_value); index.EncodeTo(index_value);
} }
...@@ -70,8 +76,6 @@ Status TitanTableBuilder::Finish() { ...@@ -70,8 +76,6 @@ Status TitanTableBuilder::Finish() {
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushOrCompactionOutput); file->FileStateTransit(BlobFileMeta::FileEvent::kFlushOrCompactionOutput);
status_ = status_ =
blob_manager_->FinishFile(cf_id_, file, std::move(blob_handle_)); blob_manager_->FinishFile(cf_id_, file, std::move(blob_handle_));
// ROCKS_LOG_INFO(db_options_.info_log, "[%u] AddFile %lu", cf_id_,
// file->file_number_);
} else { } else {
status_ = blob_manager_->DeleteFile(std::move(blob_handle_)); status_ = blob_manager_->DeleteFile(std::move(blob_handle_));
} }
......
...@@ -18,7 +18,8 @@ class TitanTableBuilder : public TableBuilder { ...@@ -18,7 +18,8 @@ class TitanTableBuilder : public TableBuilder {
db_options_(db_options), db_options_(db_options),
cf_options_(cf_options), cf_options_(cf_options),
base_builder_(std::move(base_builder)), base_builder_(std::move(base_builder)),
blob_manager_(blob_manager) {} blob_manager_(blob_manager),
stats_(db_options.statistics.get()) {}
void Add(const Slice& key, const Slice& value) override; void Add(const Slice& key, const Slice& value) override;
...@@ -49,6 +50,8 @@ class TitanTableBuilder : public TableBuilder { ...@@ -49,6 +50,8 @@ class TitanTableBuilder : public TableBuilder {
std::unique_ptr<BlobFileHandle> blob_handle_; std::unique_ptr<BlobFileHandle> blob_handle_;
std::shared_ptr<BlobFileManager> blob_manager_; std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<BlobFileBuilder> blob_builder_; std::unique_ptr<BlobFileBuilder> blob_builder_;
Statistics* stats_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -126,8 +126,8 @@ class TableBuilderTest : public testing::Test { ...@@ -126,8 +126,8 @@ class TableBuilderTest : public testing::Test {
NewFileReader(blob_name_, &file); NewFileReader(blob_name_, &file);
uint64_t file_size = 0; uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(blob_name_, &file_size)); ASSERT_OK(env_->GetFileSize(blob_name_, &file_size));
ASSERT_OK( ASSERT_OK(BlobFileReader::Open(cf_options_, std::move(file), file_size,
BlobFileReader::Open(cf_options_, std::move(file), file_size, result)); result, nullptr));
} }
void NewTableReader(std::unique_ptr<TableReader>* result) { void NewTableReader(std::unique_ptr<TableReader>* result) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment