Commit 7a63c355 authored by Yi Wu's avatar Yi Wu

Copy Titan from latest pingcap/rocksdb tikv-3.0 branch

Signed-off-by: 's avatarYi Wu <yiwu@pingcap.com>
parent 77146969
#pragma once
#include "rocksdb/utilities/stackable_db.h"
#include "utilities/titandb/options.h"
namespace rocksdb {
namespace titandb {
struct TitanCFDescriptor {
std::string name;
TitanCFOptions options;
TitanCFDescriptor()
: name(kDefaultColumnFamilyName), options(TitanCFOptions()) {}
TitanCFDescriptor(const std::string& _name, const TitanCFOptions& _options)
: name(_name), options(_options) {}
};
class TitanDB : public StackableDB {
public:
static Status Open(const TitanOptions& options, const std::string& dbname,
TitanDB** db);
static Status Open(const TitanDBOptions& db_options,
const std::string& dbname,
const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles, TitanDB** db);
TitanDB() : StackableDB(nullptr) {}
using StackableDB::CreateColumnFamily;
Status CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& name,
ColumnFamilyHandle** handle) override {
TitanCFDescriptor desc(name, TitanCFOptions(options));
return CreateColumnFamily(desc, handle);
}
Status CreateColumnFamily(const TitanCFDescriptor& desc,
ColumnFamilyHandle** handle) {
std::vector<ColumnFamilyHandle*> handles;
Status s = CreateColumnFamilies({desc}, &handles);
if (s.ok()) {
*handle = handles[0];
}
return s;
}
using StackableDB::CreateColumnFamilies;
Status CreateColumnFamilies(
const ColumnFamilyOptions& options,
const std::vector<std::string>& names,
std::vector<ColumnFamilyHandle*>* handles) override {
std::vector<TitanCFDescriptor> descs;
for (auto& name : names) {
descs.emplace_back(name, TitanCFOptions(options));
}
return CreateColumnFamilies(descs, handles);
}
Status CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& base_descs,
std::vector<ColumnFamilyHandle*>* handles) override {
std::vector<TitanCFDescriptor> descs;
for (auto& desc : base_descs) {
descs.emplace_back(desc.name, TitanCFOptions(desc.options));
}
return CreateColumnFamilies(descs, handles);
}
virtual Status CreateColumnFamilies(
const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles) = 0;
Status DropColumnFamily(ColumnFamilyHandle* handle) override {
return DropColumnFamilies({handle});
}
virtual Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) override = 0;
using StackableDB::Merge;
Status Merge(const WriteOptions&, ColumnFamilyHandle*, const Slice& /*key*/,
const Slice& /*value*/) override {
return Status::NotSupported("TitanDB doesn't support this operation");
}
using rocksdb::StackableDB::SingleDelete;
virtual Status SingleDelete(const WriteOptions& /*wopts*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/) override {
return Status::NotSupported("Not supported operation in titan db.");
}
using rocksdb::StackableDB::CompactFiles;
virtual Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names, const int output_level,
const int output_path_id = -1,
std::vector<std::string>* const output_file_names = nullptr,
CompactionJobInfo* compaction_job_info = nullptr) override = 0;
};
} // namespace titandb
} // namespace rocksdb
/*
C bindings for TitanDB. May be useful as a stable ABI that can be
used by programs that keep rocksdb in a shared library, or for
a JNI api.
Does not support:
. getters for the option types
. custom comparators that implement key shortening
. capturing post-write-snapshot
. custom iter, db, env, cache implementations using just the C bindings
Some conventions:
(1) We expose just opaque struct pointers and functions to clients.
This allows us to change internal representations without having to
recompile clients.
(2) For simplicity, there is no equivalent to the Slice type. Instead,
the caller has to pass the pointer and length as separate
arguments.
(3) Errors are represented by a null-terminated c string. NULL
means no error. All operations that can raise an error are passed
a "char** errptr" as the last argument. One of the following must
be true on entry:
*errptr == NULL
*errptr points to a malloc()ed null-terminated error message
On success, a leveldb routine leaves *errptr unchanged.
On failure, leveldb frees the old value of *errptr and
set *errptr to a malloc()ed error message.
(4) Bools have the type unsigned char (0 == false; rest == true)
(5) All of the pointer arguments must be non-NULL.
*/
#ifndef ROCKSDB_TITAN_C_H
#define ROCKSDB_TITAN_C_H
#pragma once
#ifdef _WIN32
#ifdef ROCKSDB_DLL
#ifdef ROCKSDB_LIBRARY_EXPORTS
#define ROCKSDB_LIBRARY_API __declspec(dllexport)
#else
#define ROCKSDB_LIBRARY_API __declspec(dllimport)
#endif
#else
#define ROCKSDB_LIBRARY_API
#endif
#else
#define ROCKSDB_LIBRARY_API
#endif
#ifdef __cplusplus
extern "C" {
#endif
#include <stdarg.h>
#include <stddef.h>
#include <stdint.h>
#include "rocksdb/c.h"
/* Exported types */
// TitanDB
typedef struct titandb_options_t titandb_options_t;
extern ROCKSDB_LIBRARY_API rocksdb_t* titandb_open(
const titandb_options_t* options, const char* name, char** errptr);
extern ROCKSDB_LIBRARY_API titandb_options_t* titandb_options_create();
extern ROCKSDB_LIBRARY_API void titandb_options_destroy(titandb_options_t*);
extern ROCKSDB_LIBRARY_API void titandb_options_set_rocksdb(
titandb_options_t* options, rocksdb_options_t* rocksdb);
extern ROCKSDB_LIBRARY_API void titandb_options_set_dirname(
titandb_options_t* options, const char* name);
extern ROCKSDB_LIBRARY_API void titandb_options_set_min_blob_size(
titandb_options_t* options, uint64_t size);
extern ROCKSDB_LIBRARY_API void titandb_options_set_blob_file_compression(
titandb_options_t* options, int compression);
extern ROCKSDB_LIBRARY_API void titandb_options_set_blob_cache(
titandb_options_t* options, rocksdb_cache_t* blob_cache);
extern ROCKSDB_LIBRARY_API void titandb_options_set_disable_background_gc(
titandb_options_t* options, unsigned char disable);
extern ROCKSDB_LIBRARY_API void titandb_options_set_max_gc_batch_size(
titandb_options_t* options, uint64_t size);
extern ROCKSDB_LIBRARY_API void titandb_options_set_min_gc_batch_size(
titandb_options_t* options, uint64_t size);
extern ROCKSDB_LIBRARY_API void titandb_options_set_blob_file_discardable_ratio(
titandb_options_t* options, float ratio);
extern ROCKSDB_LIBRARY_API void titandb_options_set_sample_file_size_ratio(
titandb_options_t* options, float ratio);
extern ROCKSDB_LIBRARY_API void titandb_options_set_merge_small_file_threshold(
titandb_options_t* options, uint64_t size);
#ifdef __cplusplus
} /* end extern "C" */
#endif
#endif // ROCKSDB_TITAN_C_H
#include "base_db_listener.h"
namespace rocksdb {
namespace titandb {
BaseDbListener::BaseDbListener(TitanDBImpl* db) : db_impl_(db) {}
BaseDbListener::~BaseDbListener() {}
void BaseDbListener::OnFlushCompleted(DB* /*db*/,
const FlushJobInfo& flush_job_info) {
db_impl_->OnFlushCompleted(flush_job_info);
}
void BaseDbListener::OnCompactionCompleted(
DB* /* db */, const CompactionJobInfo& compaction_job_info) {
db_impl_->OnCompactionCompleted(compaction_job_info);
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/listener.h"
#include "utilities/titandb/db_impl.h"
namespace rocksdb {
namespace titandb {
class BaseDbListener final : public EventListener {
public:
BaseDbListener(TitanDBImpl* db);
~BaseDbListener();
void OnFlushCompleted(DB* db, const FlushJobInfo& flush_job_info) override;
void OnCompactionCompleted(
DB* db, const CompactionJobInfo& compaction_job_info) override;
private:
rocksdb::titandb::TitanDBImpl* db_impl_;
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_file_builder.h"
namespace rocksdb {
namespace titandb {
BlobFileBuilder::BlobFileBuilder(const TitanCFOptions& options,
WritableFileWriter* file)
: options_(options), file_(file), encoder_(options_.blob_file_compression) {
BlobFileHeader header;
std::string buffer;
header.EncodeTo(&buffer);
status_ = file_->Append(buffer);
}
void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) {
if (!ok()) return;
encoder_.EncodeRecord(record);
handle->offset = file_->GetFileSize();
handle->size = encoder_.GetEncodedSize();
status_ = file_->Append(encoder_.GetHeader());
if (ok()) {
status_ = file_->Append(encoder_.GetRecord());
}
}
Status BlobFileBuilder::Finish() {
if (!ok()) return status();
std::string buffer;
BlobFileFooter footer;
footer.EncodeTo(&buffer);
status_ = file_->Append(buffer);
if (ok()) {
status_ = file_->Flush();
}
return status();
}
void BlobFileBuilder::Abandon() {}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "util/file_reader_writer.h"
#include "utilities/titandb/blob_format.h"
#include "utilities/titandb/options.h"
namespace rocksdb {
namespace titandb {
// Blob file format:
//
// <begin>
// [blob record 1]
// [blob record 2]
// ...
// [blob record N]
// [meta block 1]
// [meta block 2]
// ...
// [meta block K]
// [meta index block]
// [footer]
// <end>
//
// 1. The sequence of blob records in the file are stored in sorted
// order. These records come one after another at the beginning of the
// file, and are compressed according to the compression options.
//
// 2. After the blob records we store a bunch of meta blocks, and a
// meta index block with block handles pointed to the meta blocks. The
// meta block and the meta index block are formatted the same as the
// BlockBasedTable.
class BlobFileBuilder {
public:
// Constructs a builder that will store the contents of the file it
// is building in "*file". Does not close the file. It is up to the
// caller to sync and close the file after calling Finish().
BlobFileBuilder(const TitanCFOptions& options, WritableFileWriter* file);
// Adds the record to the file and points the handle to it.
void Add(const BlobRecord& record, BlobHandle* handle);
// Returns non-ok iff some error has been detected.
Status status() const { return status_; }
// Finishes building the table.
// REQUIRES: Finish(), Abandon() have not been called.
Status Finish();
// Abandons building the table. If the caller is not going to call
// Finish(), it must call Abandon() before destroying this builder.
// REQUIRES: Finish(), Abandon() have not been called.
void Abandon();
private:
bool ok() const { return status().ok(); }
TitanCFOptions options_;
WritableFileWriter* file_;
Status status_;
BlobEncoder encoder_;
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_file_cache.h"
#include "util/filename.h"
#include "utilities/titandb/util.h"
namespace rocksdb {
namespace titandb {
namespace {
Slice EncodeFileNumber(const uint64_t* number) {
return Slice(reinterpret_cast<const char*>(number), sizeof(*number));
}
} // namespace
BlobFileCache::BlobFileCache(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
std::shared_ptr<Cache> cache)
: env_(db_options.env),
env_options_(db_options),
db_options_(db_options),
cf_options_(cf_options),
cache_(cache) {}
Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* buffer) {
Cache::Handle* cache_handle = nullptr;
Status s = FindFile(file_number, file_size, &cache_handle);
if (!s.ok()) return s;
auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle));
s = reader->Get(options, handle, record, buffer);
cache_->Release(cache_handle);
return s;
}
Status BlobFileCache::NewPrefetcher(
uint64_t file_number, uint64_t file_size,
std::unique_ptr<BlobFilePrefetcher>* result) {
Cache::Handle* cache_handle = nullptr;
Status s = FindFile(file_number, file_size, &cache_handle);
if (!s.ok()) return s;
auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle));
auto prefetcher = new BlobFilePrefetcher(reader);
prefetcher->RegisterCleanup(&UnrefCacheHandle, cache_.get(), cache_handle);
result->reset(prefetcher);
return s;
}
void BlobFileCache::Evict(uint64_t file_number) {
cache_->Erase(EncodeFileNumber(&file_number));
}
Status BlobFileCache::FindFile(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle) {
Status s;
Slice cache_key = EncodeFileNumber(&file_number);
*handle = cache_->Lookup(cache_key);
if (*handle) return s;
std::unique_ptr<RandomAccessFileReader> file;
{
std::unique_ptr<RandomAccessFile> f;
auto file_name = BlobFileName(db_options_.dirname, file_number);
s = env_->NewRandomAccessFile(file_name, &f, env_options_);
if (!s.ok()) return s;
if (db_options_.advise_random_on_open) {
f->Hint(RandomAccessFile::RANDOM);
}
file.reset(new RandomAccessFileReader(std::move(f), file_name));
}
std::unique_ptr<BlobFileReader> reader;
s = BlobFileReader::Open(cf_options_, std::move(file), file_size, &reader);
if (!s.ok()) return s;
cache_->Insert(cache_key, reader.release(), 1,
&DeleteCacheValue<BlobFileReader>, handle);
return s;
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/options.h"
#include "utilities/titandb/blob_file_reader.h"
#include "utilities/titandb/blob_format.h"
#include "utilities/titandb/options.h"
namespace rocksdb {
namespace titandb {
class BlobFileCache {
public:
// Constructs a blob file cache to cache opened files.
BlobFileCache(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options, std::shared_ptr<Cache> cache);
// Gets the blob record pointed by the handle in the specified file
// number. The corresponding file size must be exactly "file_size"
// bytes. The provided buffer is used to store the record data, so
// the buffer must be valid when the record is used.
Status Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const BlobHandle& handle, BlobRecord* record,
PinnableSlice* buffer);
// Creates a prefetcher for the specified file number.
Status NewPrefetcher(uint64_t file_number, uint64_t file_size,
std::unique_ptr<BlobFilePrefetcher>* result);
// Evicts the file cache for the specified file number.
void Evict(uint64_t file_number);
private:
// Finds the file for the specified file number. Opens the file if
// the file is not found in the cache and caches it.
// If successful, sets "*handle" to the cached file.
Status FindFile(uint64_t file_number, uint64_t file_size,
Cache::Handle** handle);
Env* env_;
EnvOptions env_options_;
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
std::shared_ptr<Cache> cache_;
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_file_iterator.h"
#include "util/crc32c.h"
#include "utilities/titandb/util.h"
namespace rocksdb {
namespace titandb {
BlobFileIterator::BlobFileIterator(
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_name,
uint64_t file_size, const TitanCFOptions& titan_cf_options)
: file_(std::move(file)),
file_number_(file_name),
file_size_(file_size),
titan_cf_options_(titan_cf_options) {}
BlobFileIterator::~BlobFileIterator() {}
bool BlobFileIterator::Init() {
Slice slice;
char header_buf[BlobFileHeader::kEncodedLength];
status_ = file_->Read(0, BlobFileHeader::kEncodedLength, &slice, header_buf);
if (!status_.ok()) {
return false;
}
BlobFileHeader blob_file_header;
status_ = blob_file_header.DecodeFrom(&slice);
if (!status_.ok()) {
return false;
}
char footer_buf[BlobFileFooter::kEncodedLength];
status_ = file_->Read(file_size_ - BlobFileFooter::kEncodedLength,
BlobFileFooter::kEncodedLength, &slice, footer_buf);
if (!status_.ok()) return false;
BlobFileFooter blob_file_footer;
status_ = blob_file_footer.DecodeFrom(&slice);
end_of_blob_record_ = file_size_ - BlobFileFooter::kEncodedLength -
blob_file_footer.meta_index_handle.size();
assert(end_of_blob_record_ > BlobFileHeader::kEncodedLength);
init_ = true;
return true;
}
void BlobFileIterator::SeekToFirst() {
if (!init_ && !Init()) return;
status_ = Status::OK();
iterate_offset_ = BlobFileHeader::kEncodedLength;
PrefetchAndGet();
}
bool BlobFileIterator::Valid() const { return valid_ && status().ok(); }
void BlobFileIterator::Next() {
assert(init_);
PrefetchAndGet();
}
Slice BlobFileIterator::key() const { return cur_blob_record_.key; }
Slice BlobFileIterator::value() const { return cur_blob_record_.value; }
void BlobFileIterator::IterateForPrev(uint64_t offset) {
if (!init_ && !Init()) return;
status_ = Status::OK();
if (offset >= end_of_blob_record_) {
iterate_offset_ = offset;
status_ = Status::InvalidArgument("Out of bound");
return;
}
uint64_t total_length = 0;
FixedSlice<kBlobHeaderSize> header_buffer;
iterate_offset_ = BlobFileHeader::kEncodedLength;
for (; iterate_offset_ < offset; iterate_offset_ += total_length) {
status_ = file_->Read(iterate_offset_, kBlobHeaderSize, &header_buffer,
header_buffer.get());
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
total_length = kBlobHeaderSize + decoder_.GetRecordSize();
}
if (iterate_offset_ > offset) iterate_offset_ -= total_length;
valid_ = false;
}
void BlobFileIterator::GetBlobRecord() {
FixedSlice<kBlobHeaderSize> header_buffer;
status_ = file_->Read(iterate_offset_, kBlobHeaderSize, &header_buffer,
header_buffer.get());
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
Slice record_slice;
auto record_size = decoder_.GetRecordSize();
buffer_.reserve(record_size);
status_ = file_->Read(iterate_offset_ + kBlobHeaderSize, record_size,
&record_slice, buffer_.data());
if (status_.ok()) {
status_ =
decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_);
}
if (!status_.ok()) return;
cur_record_offset_ = iterate_offset_;
cur_record_size_ = kBlobHeaderSize + record_size;
iterate_offset_ += cur_record_size_;
valid_ = true;
}
void BlobFileIterator::PrefetchAndGet() {
if (iterate_offset_ >= end_of_blob_record_) {
valid_ = false;
return;
}
if (readahead_begin_offset_ > iterate_offset_ ||
readahead_end_offset_ < iterate_offset_) {
// alignment
readahead_begin_offset_ =
iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1));
readahead_end_offset_ = readahead_begin_offset_;
readahead_size_ = kMinReadaheadSize;
}
auto min_blob_size =
iterate_offset_ + kBlobHeaderSize + titan_cf_options_.min_blob_size;
if (readahead_end_offset_ <= min_blob_size) {
while (readahead_end_offset_ + readahead_size_ <= min_blob_size &&
readahead_size_ < kMaxReadaheadSize)
readahead_size_ <<= 1;
file_->Prefetch(readahead_end_offset_, readahead_size_);
readahead_end_offset_ += readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1);
}
GetBlobRecord();
if (readahead_end_offset_ < iterate_offset_) {
readahead_end_offset_ = iterate_offset_;
}
}
BlobFileMergeIterator::BlobFileMergeIterator(
std::vector<std::unique_ptr<BlobFileIterator>>&& blob_file_iterators)
: blob_file_iterators_(std::move(blob_file_iterators)) {}
bool BlobFileMergeIterator::Valid() const {
if (current_ == nullptr) return false;
if (!status().ok()) return false;
return current_->Valid() && current_->status().ok();
}
void BlobFileMergeIterator::SeekToFirst() {
for (auto& iter : blob_file_iterators_) {
iter->SeekToFirst();
if (iter->status().ok() && iter->Valid()) min_heap_.push(iter.get());
}
if (!min_heap_.empty()) {
current_ = min_heap_.top();
min_heap_.pop();
} else {
status_ = Status::Aborted("No iterator is valid");
}
}
void BlobFileMergeIterator::Next() {
assert(current_ != nullptr);
current_->Next();
if (current_->status().ok() && current_->Valid()) min_heap_.push(current_);
current_ = min_heap_.top();
min_heap_.pop();
}
Slice BlobFileMergeIterator::key() const {
assert(current_ != nullptr);
return current_->key();
}
Slice BlobFileMergeIterator::value() const {
assert(current_ != nullptr);
return current_->value();
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include <cstdint>
#include <queue>
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/internal_iterator.h"
#include "util/file_reader_writer.h"
#include "utilities/titandb/blob_format.h"
#include "utilities/titandb/options.h"
#include "utilities/titandb/util.h"
namespace rocksdb {
namespace titandb {
class BlobFileIterator {
public:
const uint64_t kMinReadaheadSize = 4 << 10;
const uint64_t kMaxReadaheadSize = 256 << 10;
BlobFileIterator(std::unique_ptr<RandomAccessFileReader>&& file,
uint64_t file_name, uint64_t file_size,
const TitanCFOptions& titan_cf_options);
~BlobFileIterator();
bool Init();
bool Valid() const;
void SeekToFirst();
void Next();
Slice key() const;
Slice value() const;
Status status() const { return status_; }
void IterateForPrev(uint64_t);
BlobIndex GetBlobIndex() {
BlobIndex blob_index;
blob_index.file_number = file_number_;
blob_index.blob_handle.offset = cur_record_offset_;
blob_index.blob_handle.size = cur_record_size_;
return blob_index;
}
private:
// Blob file info
const std::unique_ptr<RandomAccessFileReader> file_;
const uint64_t file_number_;
const uint64_t file_size_;
TitanCFOptions titan_cf_options_;
bool init_{false};
uint64_t end_of_blob_record_{0};
// Iterator status
Status status_;
bool valid_{false};
BlobDecoder decoder_;
uint64_t iterate_offset_{0};
std::vector<char> buffer_;
OwnedSlice uncompressed_;
BlobRecord cur_blob_record_;
uint64_t cur_record_offset_;
uint64_t cur_record_size_;
uint64_t readahead_begin_offset_{0};
uint64_t readahead_end_offset_{0};
uint64_t readahead_size_{kMinReadaheadSize};
void PrefetchAndGet();
void GetBlobRecord();
};
class BlobFileMergeIterator {
public:
explicit BlobFileMergeIterator(
std::vector<std::unique_ptr<BlobFileIterator>>&&);
~BlobFileMergeIterator() = default;
bool Valid() const;
void SeekToFirst();
void Next();
Slice key() const;
Slice value() const;
Status status() const {
if (current_ != nullptr && !current_->status().ok())
return current_->status();
return status_;
}
BlobIndex GetBlobIndex() { return current_->GetBlobIndex(); }
private:
class IternalComparator {
public:
// Smaller value get Higher priority
bool operator()(const BlobFileIterator* iter1,
const BlobFileIterator* iter2) {
return BytewiseComparator()->Compare(iter1->key(), iter2->key()) > 0;
}
};
Status status_;
std::vector<std::unique_ptr<BlobFileIterator>> blob_file_iterators_;
std::priority_queue<BlobFileIterator*, std::vector<BlobFileIterator*>,
IternalComparator>
min_heap_;
BlobFileIterator* current_ = nullptr;
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_file_iterator.h"
#include <cinttypes>
#include "util/filename.h"
#include "util/testharness.h"
#include "utilities/titandb/blob_file_builder.h"
#include "utilities/titandb/blob_file_cache.h"
#include "utilities/titandb/blob_file_reader.h"
namespace rocksdb {
namespace titandb {
class BlobFileIteratorTest : public testing::Test {
public:
Env* env_{Env::Default()};
TitanOptions titan_options_;
EnvOptions env_options_;
std::string dirname_;
std::string file_name_;
uint64_t file_number_;
std::unique_ptr<BlobFileBuilder> builder_;
std::unique_ptr<WritableFileWriter> writable_file_;
std::unique_ptr<BlobFileIterator> blob_file_iterator_;
std::unique_ptr<RandomAccessFileReader> readable_file_;
BlobFileIteratorTest() : dirname_(test::TmpDir(env_)) {
titan_options_.dirname = dirname_;
file_number_ = Random::GetTLSInstance()->Next();
file_name_ = BlobFileName(dirname_, file_number_);
}
~BlobFileIteratorTest() {
env_->DeleteFile(file_name_);
env_->DeleteDir(dirname_);
}
std::string GenKey(uint64_t i) {
char buf[64];
snprintf(buf, sizeof(buf), "k-%08" PRIu64, i);
return buf;
}
std::string GenValue(uint64_t k) {
if (k % 2 == 0) {
return std::string(titan_options_.min_blob_size - 1, 'v');
} else {
return std::string(titan_options_.min_blob_size + 1, 'v');
}
}
void NewBuiler() {
TitanDBOptions db_options(titan_options_);
TitanCFOptions cf_options(titan_options_);
BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)});
{
std::unique_ptr<WritableFile> f;
ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_));
writable_file_.reset(new WritableFileWriter(std::move(f), file_name_, env_options_));
}
builder_.reset(new BlobFileBuilder(cf_options, writable_file_.get()));
}
void AddKeyValue(const std::string& key, const std::string& value,
BlobHandle* blob_handle) {
BlobRecord record;
record.key = key;
record.value = value;
builder_->Add(record, blob_handle);
ASSERT_OK(builder_->status());
}
void FinishBuiler() {
ASSERT_OK(builder_->Finish());
ASSERT_OK(builder_->status());
}
void NewBlobFileIterator() {
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
NewBlobFileReader(file_number_, 0, titan_options_, env_options_, env_,
&readable_file_);
blob_file_iterator_.reset(new BlobFileIterator{
std::move(readable_file_), file_number_, file_size, TitanCFOptions()});
}
void TestBlobFileIterator() {
NewBuiler();
const int n = 1000;
std::vector<BlobHandle> handles(n);
for (int i = 0; i < n; i++) {
auto id = std::to_string(i);
AddKeyValue(id, id, &handles[i]);
}
FinishBuiler();
NewBlobFileIterator();
blob_file_iterator_->SeekToFirst();
for (int i = 0; i < n; blob_file_iterator_->Next(), i++) {
ASSERT_OK(blob_file_iterator_->status());
ASSERT_EQ(blob_file_iterator_->Valid(), true);
auto id = std::to_string(i);
ASSERT_EQ(id, blob_file_iterator_->key());
ASSERT_EQ(id, blob_file_iterator_->value());
BlobIndex blob_index = blob_file_iterator_->GetBlobIndex();
ASSERT_EQ(handles[i], blob_index.blob_handle);
}
}
};
TEST_F(BlobFileIteratorTest, Basic) {
TitanOptions options;
TestBlobFileIterator();
}
TEST_F(BlobFileIteratorTest, IterateForPrev) {
NewBuiler();
const int n = 1000;
std::vector<BlobHandle> handles(n);
for (int i = 0; i < n; i++) {
auto id = std::to_string(i);
AddKeyValue(id, id, &handles[i]);
}
FinishBuiler();
NewBlobFileIterator();
int i = n / 2;
blob_file_iterator_->IterateForPrev(handles[i].offset);
ASSERT_OK(blob_file_iterator_->status());
for (blob_file_iterator_->Next(); i < n; i++, blob_file_iterator_->Next()) {
ASSERT_OK(blob_file_iterator_->status());
ASSERT_EQ(blob_file_iterator_->Valid(), true);
BlobIndex blob_index;
blob_index = blob_file_iterator_->GetBlobIndex();
ASSERT_EQ(handles[i], blob_index.blob_handle);
auto id = std::to_string(i);
ASSERT_EQ(id, blob_file_iterator_->key());
ASSERT_EQ(id, blob_file_iterator_->value());
}
auto idx = Random::GetTLSInstance()->Uniform(n);
blob_file_iterator_->IterateForPrev(handles[idx].offset);
ASSERT_OK(blob_file_iterator_->status());
blob_file_iterator_->Next();
ASSERT_OK(blob_file_iterator_->status());
ASSERT_TRUE(blob_file_iterator_->Valid());
BlobIndex blob_index;
blob_index = blob_file_iterator_->GetBlobIndex();
ASSERT_EQ(handles[idx], blob_index.blob_handle);
while ((idx = Random::GetTLSInstance()->Uniform(n)) == 0)
;
blob_file_iterator_->IterateForPrev(handles[idx].offset - kBlobHeaderSize -
1);
ASSERT_OK(blob_file_iterator_->status());
blob_file_iterator_->Next();
ASSERT_OK(blob_file_iterator_->status());
ASSERT_TRUE(blob_file_iterator_->Valid());
blob_index = blob_file_iterator_->GetBlobIndex();
ASSERT_EQ(handles[idx - 1], blob_index.blob_handle);
idx = Random::GetTLSInstance()->Uniform(n);
blob_file_iterator_->IterateForPrev(handles[idx].offset + 1);
ASSERT_OK(blob_file_iterator_->status());
blob_file_iterator_->Next();
ASSERT_OK(blob_file_iterator_->status());
ASSERT_TRUE(blob_file_iterator_->Valid());
blob_index = blob_file_iterator_->GetBlobIndex();
ASSERT_EQ(handles[idx], blob_index.blob_handle);
}
TEST_F(BlobFileIteratorTest, MergeIterator) {
const int kMaxKeyNum = 1000;
std::vector<BlobHandle> handles(kMaxKeyNum);
std::vector<std::unique_ptr<BlobFileIterator>> iters;
NewBuiler();
for (int i = 1; i < kMaxKeyNum; i++) {
AddKeyValue(GenKey(i), GenValue(i), &handles[i]);
if (i % 100 == 0) {
FinishBuiler();
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
NewBlobFileReader(file_number_, 0, titan_options_, env_options_, env_,
&readable_file_);
iters.emplace_back(std::unique_ptr<BlobFileIterator>(
new BlobFileIterator{std::move(readable_file_), file_number_,
file_size, TitanCFOptions()}));
file_number_ = Random::GetTLSInstance()->Next();
file_name_ = BlobFileName(dirname_, file_number_);
NewBuiler();
}
}
FinishBuiler();
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
NewBlobFileReader(file_number_, 0, titan_options_, env_options_, env_,
&readable_file_);
iters.emplace_back(std::unique_ptr<BlobFileIterator>(new BlobFileIterator{
std::move(readable_file_), file_number_, file_size, TitanCFOptions()}));
BlobFileMergeIterator iter(std::move(iters));
iter.SeekToFirst();
for (int i = 1; i < kMaxKeyNum; i++, iter.Next()) {
ASSERT_OK(iter.status());
ASSERT_TRUE(iter.Valid());
ASSERT_EQ(iter.key(), GenKey(i));
ASSERT_EQ(iter.value(), GenValue(i));
ASSERT_EQ(iter.GetBlobIndex().blob_handle, handles[i]);
}
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#pragma once
#include "util/file_reader_writer.h"
#include "utilities/titandb/blob_format.h"
namespace rocksdb {
namespace titandb {
// Contains information to complete a blob file creation.
class BlobFileHandle {
public:
virtual ~BlobFileHandle() {}
virtual uint64_t GetNumber() const = 0;
virtual const std::string& GetName() const = 0;
virtual WritableFileWriter* GetFile() const = 0;
};
// Manages the process of blob files creation.
class BlobFileManager {
public:
virtual ~BlobFileManager() {}
// Creates a new file. The new file should not be accessed until
// FinishFile() has been called.
// If successful, sets "*handle* to the new file handle.
virtual Status NewFile(std::unique_ptr<BlobFileHandle>* handle) = 0;
// Finishes the file with the provided metadata. Stops writting to
// the file anymore.
// REQUIRES: FinishFile(), DeleteFile() have not been called.
virtual Status FinishFile(uint32_t cf_id, std::shared_ptr<BlobFileMeta> file,
std::unique_ptr<BlobFileHandle>&& handle) {
std::vector<std::pair<std::shared_ptr<BlobFileMeta>,
std::unique_ptr<BlobFileHandle>>>
tmp;
tmp.emplace_back(std::make_pair(file, std::move(handle)));
return BatchFinishFiles(cf_id, tmp);
}
// Batch version of FinishFile
virtual Status BatchFinishFiles(
uint32_t cf_id,
const std::vector<std::pair<std::shared_ptr<BlobFileMeta>,
std::unique_ptr<BlobFileHandle>>>& files) {
(void)cf_id;
(void)files;
return Status::OK();
};
// Deletes the file. If the caller is not going to call
// FinishFile(), it must call DeleteFile() to release the handle.
// REQUIRES: FinishFile(), DeleteFile() have not been called.
virtual Status DeleteFile(std::unique_ptr<BlobFileHandle>&& handle) {
std::vector<std::unique_ptr<BlobFileHandle>> tmp;
tmp.emplace_back(std::move(handle));
return BatchDeleteFiles(tmp);
}
// Batch version of DeleteFile
virtual Status BatchDeleteFiles(
const std::vector<std::unique_ptr<BlobFileHandle>>& handles) {
(void)handles;
return Status::OK();
}
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_file_reader.h"
#include "util/crc32c.h"
#include "util/filename.h"
#include "util/sync_point.h"
namespace rocksdb {
namespace titandb {
Status NewBlobFileReader(uint64_t file_number, uint64_t readahead_size,
const TitanDBOptions& db_options,
const EnvOptions& env_options, Env* env,
std::unique_ptr<RandomAccessFileReader>* result) {
std::unique_ptr<RandomAccessFile> file;
auto file_name = BlobFileName(db_options.dirname, file_number);
Status s = env->NewRandomAccessFile(file_name, &file, env_options);
if (!s.ok()) return s;
if (readahead_size > 0) {
file = NewReadaheadRandomAccessFile(std::move(file), readahead_size);
}
result->reset(new RandomAccessFileReader(std::move(file), file_name));
return s;
}
const uint64_t kMaxReadaheadSize = 256 << 10;
namespace {
void GenerateCachePrefix(std::string* dst, Cache* cc, RandomAccessFile* file) {
char buffer[kMaxVarint64Length * 3 + 1];
auto size = file->GetUniqueId(buffer, sizeof(buffer));
if (size == 0) {
auto end = EncodeVarint64(buffer, cc->NewId());
size = end - buffer;
}
dst->assign(buffer, size);
}
void EncodeBlobCache(std::string* dst, const Slice& prefix, uint64_t offset) {
dst->assign(prefix.data(), prefix.size());
PutVarint64(dst, offset);
}
} // namespace
Status BlobFileReader::Open(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file,
uint64_t file_size,
std::unique_ptr<BlobFileReader>* result) {
if (file_size < BlobFileFooter::kEncodedLength) {
return Status::Corruption("file is too short to be a blob file");
}
FixedSlice<BlobFileFooter::kEncodedLength> buffer;
TRY(file->Read(file_size - BlobFileFooter::kEncodedLength,
BlobFileFooter::kEncodedLength, &buffer, buffer.get()));
BlobFileFooter footer;
TRY(DecodeInto(buffer, &footer));
auto reader = new BlobFileReader(options, std::move(file));
reader->footer_ = footer;
result->reset(reader);
return Status::OK();
}
BlobFileReader::BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file)
: options_(options), file_(std::move(file)), cache_(options.blob_cache) {
if (cache_) {
GenerateCachePrefix(&cache_prefix_, cache_.get(), file_->file());
}
}
Status BlobFileReader::Get(const ReadOptions& /*options*/,
const BlobHandle& handle, BlobRecord* record,
PinnableSlice* buffer) {
TEST_SYNC_POINT("BlobFileReader::Get");
std::string cache_key;
Cache::Handle* cache_handle = nullptr;
if (cache_) {
EncodeBlobCache(&cache_key, cache_prefix_, handle.offset);
cache_handle = cache_->Lookup(cache_key);
if (cache_handle) {
auto blob = reinterpret_cast<OwnedSlice*>(cache_->Value(cache_handle));
buffer->PinSlice(*blob, UnrefCacheHandle, cache_.get(), cache_handle);
return DecodeInto(*blob, record);
}
}
OwnedSlice blob;
TRY(ReadRecord(handle, record, &blob));
if (cache_) {
auto cache_value = new OwnedSlice(std::move(blob));
auto cache_size = cache_value->size() + sizeof(*cache_value);
cache_->Insert(cache_key, cache_value, cache_size,
&DeleteCacheValue<OwnedSlice>, &cache_handle);
buffer->PinSlice(*cache_value, UnrefCacheHandle, cache_.get(),
cache_handle);
} else {
buffer->PinSlice(blob, OwnedSlice::CleanupFunc, blob.release(), nullptr);
}
return Status::OK();
}
Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer) {
Slice blob;
CacheAllocationPtr ubuf(new char[handle.size]);
TRY(file_->Read(handle.offset, handle.size, &blob, ubuf.get()));
// something must be wrong
if (handle.size != blob.size()) {
fprintf(stderr, "ReadRecord actual size:%lu != blob size:%lu\n",
blob.size(), static_cast<std::size_t>(handle.size));
abort();
}
BlobDecoder decoder;
TRY(decoder.DecodeHeader(&blob));
buffer->reset(std::move(ubuf), blob);
TRY(decoder.DecodeRecord(&blob, record, buffer));
return Status::OK();
}
Status BlobFilePrefetcher::Get(const ReadOptions& options,
const BlobHandle& handle, BlobRecord* record,
PinnableSlice* buffer) {
if (handle.offset == last_offset_) {
last_offset_ = handle.offset + handle.size;
if (handle.offset + handle.size > readahead_limit_) {
readahead_size_ = std::max(handle.size, readahead_size_);
reader_->file_->Prefetch(handle.offset, readahead_size_);
readahead_limit_ = handle.offset + readahead_size_;
readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2);
}
} else {
last_offset_ = handle.offset + handle.size;
readahead_size_ = 0;
readahead_limit_ = 0;
}
return reader_->Get(options, handle, record, buffer);
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "util/file_reader_writer.h"
#include "utilities/titandb/blob_format.h"
#include "utilities/titandb/options.h"
namespace rocksdb {
namespace titandb {
Status NewBlobFileReader(uint64_t file_number, uint64_t readahead_size,
const TitanDBOptions& db_options,
const EnvOptions& env_options, Env* env,
std::unique_ptr<RandomAccessFileReader>* result);
class BlobFileReader {
public:
// Opens a blob file and read the necessary metadata from it.
// If successful, sets "*result" to the newly opened file reader.
static Status Open(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file,
uint64_t file_size,
std::unique_ptr<BlobFileReader>* result);
// Gets the blob record pointed by the handle in this file. The data
// of the record is stored in the provided buffer, so the buffer
// must be valid when the record is used.
Status Get(const ReadOptions& options, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* buffer);
private:
friend class BlobFilePrefetcher;
BlobFileReader(const TitanCFOptions& options,
std::unique_ptr<RandomAccessFileReader> file);
Status ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer);
TitanCFOptions options_;
std::unique_ptr<RandomAccessFileReader> file_;
std::shared_ptr<Cache> cache_;
std::string cache_prefix_;
// Information read from the file.
BlobFileFooter footer_;
};
// Performs readahead on continuous reads.
class BlobFilePrefetcher : public Cleanable {
public:
// Constructs a prefetcher with the blob file reader.
// "*reader" must be valid when the prefetcher is used.
BlobFilePrefetcher(BlobFileReader* reader) : reader_(reader) {}
Status Get(const ReadOptions& options, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* buffer);
private:
BlobFileReader* reader_;
uint64_t last_offset_{0};
uint64_t readahead_size_{0};
uint64_t readahead_limit_{0};
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_file_size_collector.h"
#include "base_db_listener.h"
namespace rocksdb {
namespace titandb {
TablePropertiesCollector*
BlobFileSizeCollectorFactory::CreateTablePropertiesCollector(
rocksdb::TablePropertiesCollectorFactory::Context /* context */) {
return new BlobFileSizeCollector();
}
const std::string BlobFileSizeCollector::kPropertiesName =
"TitanDB.blob_discardable_size";
bool BlobFileSizeCollector::Encode(
const std::map<uint64_t, uint64_t>& blob_files_size, std::string* result) {
PutVarint32(result, static_cast<uint32_t>(blob_files_size.size()));
for (const auto& bfs : blob_files_size) {
PutVarint64(result, bfs.first);
PutVarint64(result, bfs.second);
}
return true;
}
bool BlobFileSizeCollector::Decode(
Slice* slice, std::map<uint64_t, uint64_t>* blob_files_size) {
uint32_t num = 0;
if (!GetVarint32(slice, &num)) {
return false;
}
uint64_t file_number;
uint64_t size;
for (uint32_t i = 0; i < num; ++i) {
if (!GetVarint64(slice, &file_number)) {
return false;
}
if (!GetVarint64(slice, &size)) {
return false;
}
(*blob_files_size)[file_number] = size;
}
return true;
}
Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */,
const Slice& value, EntryType type,
SequenceNumber /* seq */,
uint64_t /* file_size */) {
if (type != kEntryBlobIndex) {
return Status::OK();
}
BlobIndex index;
auto s = index.DecodeFrom(const_cast<Slice*>(&value));
if (!s.ok()) {
return s;
}
auto iter = blob_files_size_.find(index.file_number);
if (iter == blob_files_size_.end()) {
blob_files_size_[index.file_number] = index.blob_handle.size;
} else {
iter->second += index.blob_handle.size;
}
return Status::OK();
}
Status BlobFileSizeCollector::Finish(UserCollectedProperties* properties) {
if (blob_files_size_.empty()) {
return Status::OK();
}
std::string res;
if (!Encode(blob_files_size_, &res) || res.empty()) {
fprintf(stderr, "blob file size collector encode failed\n");
abort();
}
properties->emplace(std::make_pair(kPropertiesName, res));
return Status::OK();
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/listener.h"
#include "rocksdb/table_properties.h"
#include "util/coding.h"
#include "utilities/titandb/db_impl.h"
#include "utilities/titandb/version_set.h"
namespace rocksdb {
namespace titandb {
class BlobFileSizeCollectorFactory final
: public TablePropertiesCollectorFactory {
public:
TablePropertiesCollector* CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context context) override;
const char* Name() const override { return "BlobFileSizeCollector"; }
};
class BlobFileSizeCollector final : public TablePropertiesCollector {
public:
const static std::string kPropertiesName;
static bool Encode(const std::map<uint64_t, uint64_t>& blob_files_size,
std::string* result);
static bool Decode(Slice* slice,
std::map<uint64_t, uint64_t>* blob_files_size);
Status AddUserKey(const Slice& key, const Slice& value, EntryType type,
SequenceNumber seq, uint64_t file_size) override;
Status Finish(UserCollectedProperties* properties) override;
UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties();
}
const char* Name() const override { return "BlobFileSizeCollector"; }
private:
std::map<uint64_t, uint64_t> blob_files_size_;
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_file_size_collector.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
class BlobFileSizeCollectorTest : public testing::Test {
public:
Env* env_{Env::Default()};
EnvOptions env_options_;
Options options_;
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
MutableCFOptions cf_moptions_;
ImmutableCFOptions cf_ioptions_;
std::unique_ptr<TableFactory> table_factory_;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>> collectors_;
std::string tmpdir_;
std::string file_name_;
BlobFileSizeCollectorTest()
: cf_moptions_(cf_options_),
cf_ioptions_(options_),
table_factory_(NewBlockBasedTableFactory()),
tmpdir_(test::TmpDir(env_)),
file_name_(tmpdir_ + "/TEST") {
db_options_.dirname = tmpdir_;
auto blob_file_size_collector_factory =
std::make_shared<BlobFileSizeCollectorFactory>();
collectors_.emplace_back(new UserKeyTablePropertiesCollectorFactory(
blob_file_size_collector_factory));
}
~BlobFileSizeCollectorTest() {
env_->DeleteFile(file_name_);
env_->DeleteDir(tmpdir_);
}
void NewFileWriter(std::unique_ptr<WritableFileWriter>* result) {
std::unique_ptr<WritableFile> writable_file;
ASSERT_OK(env_->NewWritableFile(file_name_, &writable_file, env_options_));
result->reset(
new WritableFileWriter(std::move(writable_file), file_name_, env_options_));
ASSERT_TRUE(*result);
}
void NewTableBuilder(WritableFileWriter* file,
std::unique_ptr<TableBuilder>* result) {
TableBuilderOptions options(cf_ioptions_, cf_moptions_,
cf_ioptions_.internal_comparator, &collectors_,
kNoCompression, CompressionOptions(), nullptr,
false, kDefaultColumnFamilyName, 0);
result->reset(table_factory_->NewTableBuilder(options, 0, file));
ASSERT_TRUE(*result);
}
void NewFileReader(std::unique_ptr<RandomAccessFileReader>* result) {
std::unique_ptr<RandomAccessFile> file;
ASSERT_OK(env_->NewRandomAccessFile(file_name_, &file, env_options_));
result->reset(
new RandomAccessFileReader(std::move(file), file_name_, env_));
ASSERT_TRUE(*result);
}
void NewTableReader(std::unique_ptr<RandomAccessFileReader>&& file,
std::unique_ptr<TableReader>* result) {
TableReaderOptions options(cf_ioptions_, nullptr, env_options_,
cf_ioptions_.internal_comparator);
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size));
ASSERT_TRUE(file_size > 0);
ASSERT_OK(table_factory_->NewTableReader(options, std::move(file),
file_size, result));
ASSERT_TRUE(*result);
}
};
TEST_F(BlobFileSizeCollectorTest, Basic) {
std::unique_ptr<WritableFileWriter> wfile;
NewFileWriter(&wfile);
std::unique_ptr<TableBuilder> table_builder;
NewTableBuilder(wfile.get(), &table_builder);
const int kNumEntries = 100;
char buf[16];
for (int i = 0; i < kNumEntries; i++) {
ParsedInternalKey ikey;
snprintf(buf, sizeof(buf), "%15d", i);
ikey.user_key = buf;
ikey.type = kTypeBlobIndex;
std::string key;
AppendInternalKey(&key, ikey);
BlobIndex index;
if (i % 2 == 0) {
index.file_number = 0ULL;
} else {
index.file_number = 1ULL;
}
index.blob_handle.size = 10;
std::string value;
index.EncodeTo(&value);
table_builder->Add(key, value);
}
ASSERT_OK(table_builder->status());
ASSERT_EQ(kNumEntries, table_builder->NumEntries());
ASSERT_OK(table_builder->Finish());
ASSERT_OK(wfile->Flush());
ASSERT_OK(wfile->Sync(true));
std::unique_ptr<RandomAccessFileReader> rfile;
NewFileReader(&rfile);
std::unique_ptr<TableReader> table_reader;
NewTableReader(std::move(rfile), &table_reader);
auto table_properties = table_reader->GetTableProperties();
ASSERT_TRUE(table_properties);
auto iter = table_properties->user_collected_properties.find(
BlobFileSizeCollector::kPropertiesName);
ASSERT_TRUE(iter != table_properties->user_collected_properties.end());
Slice raw_blob_file_size_prop(iter->second);
std::map<uint64_t, uint64_t> result;
BlobFileSizeCollector::Decode(&raw_blob_file_size_prop, &result);
ASSERT_EQ(2, result.size());
ASSERT_EQ(kNumEntries / 2 * 10, result[0]);
ASSERT_EQ(kNumEntries / 2 * 10, result[1]);
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#include "util/filename.h"
#include "util/testharness.h"
#include "utilities/titandb/blob_file_builder.h"
#include "utilities/titandb/blob_file_cache.h"
#include "utilities/titandb/blob_file_reader.h"
namespace rocksdb {
namespace titandb {
class BlobFileTest : public testing::Test {
public:
BlobFileTest() : dirname_(test::TmpDir(env_)) {
file_name_ = BlobFileName(dirname_, file_number_);
}
~BlobFileTest() {
env_->DeleteFile(file_name_);
env_->DeleteDir(dirname_);
}
void TestBlobFilePrefetcher(TitanOptions options) {
options.dirname = dirname_;
TitanDBOptions db_options(options);
TitanCFOptions cf_options(options);
BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)});
const int n = 100;
std::vector<BlobHandle> handles(n);
std::unique_ptr<WritableFileWriter> file;
{
std::unique_ptr<WritableFile> f;
ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_));
file.reset(new WritableFileWriter(std::move(f), file_name_, env_options_));
}
std::unique_ptr<BlobFileBuilder> builder(
new BlobFileBuilder(cf_options, file.get()));
for (int i = 0; i < n; i++) {
auto key = std::to_string(i);
auto value = std::string(1024, i);
BlobRecord record;
record.key = key;
record.value = value;
builder->Add(record, &handles[i]);
ASSERT_OK(builder->status());
}
ASSERT_OK(builder->Finish());
ASSERT_OK(builder->status());
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
ReadOptions ro;
std::unique_ptr<BlobFilePrefetcher> prefetcher;
ASSERT_OK(cache.NewPrefetcher(file_number_, file_size, &prefetcher));
for (int i = 0; i < n; i++) {
auto key = std::to_string(i);
auto value = std::string(1024, i);
BlobRecord expect;
expect.key = key;
expect.value = value;
BlobRecord record;
PinnableSlice buffer;
ASSERT_OK(
cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(
cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(prefetcher->Get(ro, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(prefetcher->Get(ro, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
}
}
void TestBlobFileReader(TitanOptions options) {
options.dirname = dirname_;
TitanDBOptions db_options(options);
TitanCFOptions cf_options(options);
BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)});
const int n = 100;
std::vector<BlobHandle> handles(n);
std::unique_ptr<WritableFileWriter> file;
{
std::unique_ptr<WritableFile> f;
ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_));
file.reset(new WritableFileWriter(std::move(f), file_name_, env_options_));
}
std::unique_ptr<BlobFileBuilder> builder(
new BlobFileBuilder(cf_options, file.get()));
for (int i = 0; i < n; i++) {
auto key = std::to_string(i);
auto value = std::string(1024, i);
BlobRecord record;
record.key = key;
record.value = value;
builder->Add(record, &handles[i]);
ASSERT_OK(builder->status());
}
ASSERT_OK(builder->Finish());
ASSERT_OK(builder->status());
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
ReadOptions ro;
std::unique_ptr<RandomAccessFileReader> random_access_file_reader;
ASSERT_OK(NewBlobFileReader(file_number_, 0, db_options, env_options_, env_,
&random_access_file_reader));
std::unique_ptr<BlobFileReader> blob_file_reader;
ASSERT_OK(BlobFileReader::Open(cf_options,
std::move(random_access_file_reader),
file_size, &blob_file_reader));
for (int i = 0; i < n; i++) {
auto key = std::to_string(i);
auto value = std::string(1024, i);
BlobRecord expect;
expect.key = key;
expect.value = value;
BlobRecord record;
PinnableSlice buffer;
ASSERT_OK(
cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(
cache.Get(ro, file_number_, file_size, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(blob_file_reader->Get(ro, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
buffer.Reset();
ASSERT_OK(blob_file_reader->Get(ro, handles[i], &record, &buffer));
ASSERT_EQ(record, expect);
}
}
Env* env_{Env::Default()};
EnvOptions env_options_;
std::string dirname_;
std::string file_name_;
uint64_t file_number_{1};
};
TEST_F(BlobFileTest, BlobFileReader) {
TitanOptions options;
TestBlobFileReader(options);
options.blob_file_compression = kLZ4Compression;
TestBlobFileReader(options);
}
TEST_F(BlobFileTest, BlobFilePrefetcher) {
TitanOptions options;
TestBlobFilePrefetcher(options);
options.blob_cache = NewLRUCache(1 << 20);
TestBlobFilePrefetcher(options);
options.blob_file_compression = kLZ4Compression;
TestBlobFilePrefetcher(options);
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#include "utilities/titandb/blob_format.h"
#include "util/crc32c.h"
#include "util/sync_point.h"
namespace rocksdb {
namespace titandb {
namespace {
bool GetChar(Slice* src, unsigned char* value) {
if (src->size() < 1) return false;
*value = *src->data();
src->remove_prefix(1);
return true;
}
} // namespace
void BlobRecord::EncodeTo(std::string* dst) const {
PutLengthPrefixedSlice(dst, key);
PutLengthPrefixedSlice(dst, value);
}
Status BlobRecord::DecodeFrom(Slice* src) {
if (!GetLengthPrefixedSlice(src, &key) ||
!GetLengthPrefixedSlice(src, &value)) {
return Status::Corruption("BlobRecord");
}
return Status::OK();
}
bool operator==(const BlobRecord& lhs, const BlobRecord& rhs) {
return lhs.key == rhs.key && lhs.value == rhs.value;
}
void BlobEncoder::EncodeRecord(const BlobRecord& record) {
record_buffer_.clear();
compressed_buffer_.clear();
CompressionType compression;
record.EncodeTo(&record_buffer_);
record_ = Compress(compression_ctx_, record_buffer_, &compressed_buffer_,
&compression);
EXPECT(record_.size() < std::numeric_limits<uint32_t>::max());
EncodeFixed32(header_ + 4, static_cast<uint32_t>(record_.size()));
header_[8] = compression;
uint32_t crc = crc32c::Value(header_ + 4, sizeof(header_) - 4);
crc = crc32c::Extend(crc, record_.data(), record_.size());
EncodeFixed32(header_, crc);
}
Status BlobDecoder::DecodeHeader(Slice* src) {
if (!GetFixed32(src, &crc_)) {
return Status::Corruption("BlobHeader");
}
header_crc_ = crc32c::Value(src->data(), kBlobHeaderSize - 4);
unsigned char compression;
if (!GetFixed32(src, &record_size_) || !GetChar(src, &compression)) {
return Status::Corruption("BlobHeader");
}
compression_ = static_cast<CompressionType>(compression);
return Status::OK();
}
Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
OwnedSlice* buffer) {
TEST_SYNC_POINT_CALLBACK("BlobDecoder::DecodeRecord", &crc_);
Slice input(src->data(), record_size_);
src->remove_prefix(record_size_);
uint32_t crc = crc32c::Extend(header_crc_, input.data(), input.size());
if (crc != crc_) {
return Status::Corruption("BlobRecord", "checksum mismatch");
}
if (compression_ == kNoCompression) {
return DecodeInto(input, record);
}
UncompressionContext ctx(compression_);
TRY(Uncompress(ctx, input, buffer));
return DecodeInto(*buffer, record);
}
void BlobHandle::EncodeTo(std::string* dst) const {
PutVarint64(dst, offset);
PutVarint64(dst, size);
}
Status BlobHandle::DecodeFrom(Slice* src) {
if (!GetVarint64(src, &offset) || !GetVarint64(src, &size)) {
return Status::Corruption("BlobHandle");
}
return Status::OK();
}
bool operator==(const BlobHandle& lhs, const BlobHandle& rhs) {
return lhs.offset == rhs.offset && lhs.size == rhs.size;
}
void BlobIndex::EncodeTo(std::string* dst) const {
dst->push_back(kBlobRecord);
PutVarint64(dst, file_number);
blob_handle.EncodeTo(dst);
}
Status BlobIndex::DecodeFrom(Slice* src) {
unsigned char type;
if (!GetChar(src, &type) || type != kBlobRecord ||
!GetVarint64(src, &file_number)) {
return Status::Corruption("BlobIndex");
}
Status s = blob_handle.DecodeFrom(src);
if (!s.ok()) {
return Status::Corruption("BlobIndex", s.ToString());
}
return s;
}
bool operator==(const BlobIndex& lhs, const BlobIndex& rhs) {
return (lhs.file_number == rhs.file_number &&
lhs.blob_handle == rhs.blob_handle);
}
void BlobFileMeta::EncodeTo(std::string* dst) const {
PutVarint64(dst, file_number_);
PutVarint64(dst, file_size_);
}
Status BlobFileMeta::DecodeFrom(Slice* src) {
if (!GetVarint64(src, &file_number_) || !GetVarint64(src, &file_size_)) {
return Status::Corruption("BlobFileMeta Decode failed");
}
return Status::OK();
}
bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs) {
return (lhs.file_number_ == rhs.file_number_ &&
lhs.file_size_ == rhs.file_size_);
}
void BlobFileMeta::FileStateTransit(const FileEvent& event) {
switch (event) {
case FileEvent::kFlushCompleted:
// blob file maybe generated by flush or gc, because gc will rewrite valid
// keys to memtable. If it's generated by gc, we will leave gc to change
// its file state. If it's generated by flush, we need to change it to
// normal state after flush completed.
assert(state_ == FileState::kPendingLSM ||
state_ == FileState::kPendingGC || state_ == FileState::kNormal ||
state_ == FileState::kBeingGC);
if (state_ == FileState::kPendingLSM) state_ = FileState::kNormal;
break;
case FileEvent::kGCCompleted:
// file is marked obsoleted during gc
if (state_ == FileState::kObsolete) {
break;
}
assert(state_ == FileState::kPendingGC || state_ == FileState::kBeingGC);
state_ = FileState::kNormal;
break;
case FileEvent::kCompactionCompleted:
assert(state_ == FileState::kPendingLSM);
state_ = FileState::kNormal;
break;
case FileEvent::kGCBegin:
assert(state_ == FileState::kNormal);
state_ = FileState::kBeingGC;
break;
case FileEvent::kGCOutput:
assert(state_ == FileState::kInit);
state_ = FileState::kPendingGC;
break;
case FileEvent::kFlushOrCompactionOutput:
assert(state_ == FileState::kInit);
state_ = FileState::kPendingLSM;
break;
case FileEvent::kDbRestart:
assert(state_ == FileState::kInit);
state_ = FileState::kNormal;
break;
case FileEvent::kDelete:
assert(state_ != FileState::kObsolete);
state_ = FileState::kObsolete;
break;
default:
fprintf(stderr,
"Unknown file event[%d], file number[%lu], file state[%d]",
static_cast<int>(event), static_cast<std::size_t>(file_number_),
static_cast<int>(state_));
abort();
}
}
void BlobFileMeta::AddDiscardableSize(uint64_t _discardable_size) {
assert(_discardable_size < file_size_);
discardable_size_ += _discardable_size;
assert(discardable_size_ < file_size_);
}
double BlobFileMeta::GetDiscardableRatio() const {
return static_cast<double>(discardable_size_) /
static_cast<double>(file_size_);
}
void BlobFileHeader::EncodeTo(std::string* dst) const {
PutFixed32(dst, kHeaderMagicNumber);
PutFixed32(dst, version);
}
Status BlobFileHeader::DecodeFrom(Slice* src) {
uint32_t magic_number = 0;
if (!GetFixed32(src, &magic_number) || magic_number != kHeaderMagicNumber) {
return Status::Corruption(
"Blob file header magic number missing or mismatched.");
}
if (!GetFixed32(src, &version) || version != kVersion1) {
return Status::Corruption("Blob file header version missing or invalid.");
}
return Status::OK();
}
void BlobFileFooter::EncodeTo(std::string* dst) const {
auto size = dst->size();
meta_index_handle.EncodeTo(dst);
// Add padding to make a fixed size footer.
dst->resize(size + kEncodedLength - 12);
PutFixed64(dst, kFooterMagicNumber);
Slice encoded(dst->data() + size, dst->size() - size);
PutFixed32(dst, crc32c::Value(encoded.data(), encoded.size()));
}
Status BlobFileFooter::DecodeFrom(Slice* src) {
auto data = src->data();
Status s = meta_index_handle.DecodeFrom(src);
if (!s.ok()) {
return Status::Corruption("BlobFileFooter", s.ToString());
}
// Remove padding.
src->remove_prefix(data + kEncodedLength - 12 - src->data());
uint64_t magic_number = 0;
if (!GetFixed64(src, &magic_number) || magic_number != kFooterMagicNumber) {
return Status::Corruption("BlobFileFooter", "magic number");
}
Slice decoded(data, src->data() - data);
uint32_t checksum = 0;
if (!GetFixed32(src, &checksum) ||
crc32c::Value(decoded.data(), decoded.size()) != checksum) {
return Status::Corruption("BlobFileFooter", "checksum");
}
return Status::OK();
}
bool operator==(const BlobFileFooter& lhs, const BlobFileFooter& rhs) {
return (lhs.meta_index_handle.offset() == rhs.meta_index_handle.offset() &&
lhs.meta_index_handle.size() == rhs.meta_index_handle.size());
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/format.h"
#include "utilities/titandb/util.h"
namespace rocksdb {
namespace titandb {
// Blob header format:
//
// crc : fixed32
// size : fixed32
// compression : char
const uint64_t kBlobHeaderSize = 9;
// Blob record format:
//
// key : varint64 length + length bytes
// value : varint64 length + length bytes
struct BlobRecord {
Slice key;
Slice value;
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
friend bool operator==(const BlobRecord& lhs, const BlobRecord& rhs);
};
class BlobEncoder {
public:
BlobEncoder(CompressionType compression) : compression_ctx_(compression) {}
void EncodeRecord(const BlobRecord& record);
Slice GetHeader() const { return Slice(header_, sizeof(header_)); }
Slice GetRecord() const { return record_; }
size_t GetEncodedSize() const { return sizeof(header_) + record_.size(); }
private:
char header_[kBlobHeaderSize];
Slice record_;
std::string record_buffer_;
std::string compressed_buffer_;
CompressionContext compression_ctx_;
};
class BlobDecoder {
public:
Status DecodeHeader(Slice* src);
Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer);
size_t GetRecordSize() const { return record_size_; }
private:
uint32_t crc_{0};
uint32_t header_crc_{0};
uint32_t record_size_{0};
CompressionType compression_{kNoCompression};
};
// Blob handle format:
//
// offset : varint64
// size : varint64
struct BlobHandle {
uint64_t offset{0};
uint64_t size{0};
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
friend bool operator==(const BlobHandle& lhs, const BlobHandle& rhs);
};
// Blob index format:
//
// type : char
// file_number_ : varint64
// blob_handle : varint64 offset + varint64 size
struct BlobIndex {
enum Type : unsigned char {
kBlobRecord = 1,
};
uint64_t file_number{0};
BlobHandle blob_handle;
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
friend bool operator==(const BlobIndex& lhs, const BlobIndex& rhs);
};
// Blob file meta format:
//
// file_number_ : varint64
// file_size_ : varint64
class BlobFileMeta {
public:
enum class FileEvent {
kInit,
kFlushCompleted,
kCompactionCompleted,
kGCCompleted,
kGCBegin,
kGCOutput,
kFlushOrCompactionOutput,
kDbRestart,
kDelete,
};
enum class FileState {
kInit, // file never at this state
kNormal,
kPendingLSM, // waiting keys adding to LSM
kBeingGC, // being gced
kPendingGC, // output of gc, waiting gc finish and keys adding to LSM
kObsolete, // already gced, but wait to be physical deleted
};
BlobFileMeta() = default;
BlobFileMeta(uint64_t _file_number, uint64_t _file_size)
: file_number_(_file_number), file_size_(_file_size) {}
friend bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs);
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; }
FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }
uint64_t discardable_size() const { return discardable_size_; }
void FileStateTransit(const FileEvent& event);
void AddDiscardableSize(uint64_t _discardable_size);
double GetDiscardableRatio() const;
private:
// Persistent field
uint64_t file_number_{0};
uint64_t file_size_{0};
// Not persistent field
FileState state_{FileState::kInit};
uint64_t discardable_size_{0};
// bool marked_for_gc_{false};
};
// Blob file header format.
// The header is mean to be compatible with header of BlobDB blob files, except
// we use a different magic number.
//
// magic_number : fixed32
// version : fixed32
struct BlobFileHeader {
// The first 32bits from $(echo titandb/blob | sha1sum).
static const uint32_t kHeaderMagicNumber = 0x2be0a614ul;
static const uint32_t kVersion1 = 1;
static const uint64_t kEncodedLength = 4 + 4;
uint32_t version = kVersion1;
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
};
// Blob file footer format:
//
// meta_index_handle : varint64 offset + varint64 size
// <padding> : [... kEncodedLength - 12] bytes
// magic_number : fixed64
// checksum : fixed32
struct BlobFileFooter {
// The first 64bits from $(echo titandb/blob | sha1sum).
static const uint64_t kFooterMagicNumber{0x2be0a6148e39edc6ull};
static const uint64_t kEncodedLength{BlockHandle::kMaxEncodedLength + 8 + 4};
BlockHandle meta_index_handle{BlockHandle::NullBlockHandle()};
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* src);
friend bool operator==(const BlobFileFooter& lhs, const BlobFileFooter& rhs);
};
// A convenient template to decode a const slice.
template <typename T>
Status DecodeInto(const Slice& src, T* target) {
auto tmp = src;
auto s = target->DecodeFrom(&tmp);
if (s.ok() && !tmp.empty()) {
s = Status::Corruption(Slice());
}
return s;
}
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_format.h"
#include "util/testharness.h"
#include "utilities/titandb/testutil.h"
#include "utilities/titandb/util.h"
namespace rocksdb {
namespace titandb {
class BlobFormatTest : public testing::Test {};
TEST(BlobFormatTest, BlobRecord) {
BlobRecord input;
CheckCodec(input);
input.key = "hello";
input.value = "world";
CheckCodec(input);
}
TEST(BlobFormatTest, BlobHandle) {
BlobHandle input;
CheckCodec(input);
input.offset = 2;
input.size = 3;
CheckCodec(input);
}
TEST(BlobFormatTest, BlobIndex) {
BlobIndex input;
CheckCodec(input);
input.file_number = 1;
input.blob_handle.offset = 2;
input.blob_handle.size = 3;
CheckCodec(input);
}
TEST(BlobFormatTest, BlobFileMeta) {
BlobFileMeta input(2, 3);
CheckCodec(input);
}
TEST(BlobFormatTest, BlobFileFooter) {
BlobFileFooter input;
CheckCodec(input);
input.meta_index_handle.set_offset(123);
input.meta_index_handle.set_size(321);
CheckCodec(input);
}
TEST(BlobFormatTest, BlobFileStateTransit) {
BlobFileMeta blob_file;
ASSERT_EQ(blob_file.file_state(), BlobFileMeta::FileState::kInit);
blob_file.FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
ASSERT_EQ(blob_file.file_state(), BlobFileMeta::FileState::kNormal);
blob_file.FileStateTransit(BlobFileMeta::FileEvent::kGCBegin);
ASSERT_EQ(blob_file.file_state(), BlobFileMeta::FileState::kBeingGC);
blob_file.FileStateTransit(BlobFileMeta::FileEvent::kGCCompleted);
BlobFileMeta compaction_output;
ASSERT_EQ(compaction_output.file_state(), BlobFileMeta::FileState::kInit);
compaction_output.FileStateTransit(
BlobFileMeta::FileEvent::kFlushOrCompactionOutput);
ASSERT_EQ(compaction_output.file_state(),
BlobFileMeta::FileState::kPendingLSM);
compaction_output.FileStateTransit(
BlobFileMeta::FileEvent::kCompactionCompleted);
ASSERT_EQ(compaction_output.file_state(), BlobFileMeta::FileState::kNormal);
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#include "utilities/titandb/blob_gc.h"
namespace rocksdb {
namespace titandb {
BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files,
TitanCFOptions&& _titan_cf_options)
: inputs_(std::move(blob_files)),
titan_cf_options_(std::move(_titan_cf_options)) {
MarkFilesBeingGC();
}
BlobGC::~BlobGC() {}
void BlobGC::SetColumnFamily(ColumnFamilyHandle* cfh) {
cfh_ = cfh;
}
ColumnFamilyData* BlobGC::GetColumnFamilyData() {
auto* cfhi = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh_);
return cfhi->cfd();
}
void BlobGC::AddOutputFile(BlobFileMeta* blob_file) {
blob_file->FileStateTransit(BlobFileMeta::FileEvent::kGCOutput);
outputs_.push_back(blob_file);
}
void BlobGC::MarkFilesBeingGC() {
for (auto& f : inputs_) {
f->FileStateTransit(BlobFileMeta::FileEvent::kGCBegin);
}
}
void BlobGC::ReleaseGcFiles() {
for (auto& f : inputs_) {
f->FileStateTransit(BlobFileMeta::FileEvent::kGCCompleted);
}
for (auto& f : outputs_) {
f->FileStateTransit(BlobFileMeta::FileEvent::kGCCompleted);
}
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include <memory>
#include "db/column_family.h"
#include "utilities/titandb/blob_format.h"
#include "utilities/titandb/options.h"
namespace rocksdb {
namespace titandb {
// A BlobGC encapsulates information about a blob gc.
class BlobGC {
public:
BlobGC(std::vector<BlobFileMeta*>&& blob_files,
TitanCFOptions&& _titan_cf_options);
// No copying allowed
BlobGC(const BlobGC&) = delete;
void operator=(const BlobGC&) = delete;
~BlobGC();
const std::vector<BlobFileMeta*>& inputs() { return inputs_; }
void set_sampled_inputs(std::vector<BlobFileMeta*>&& files) {
sampled_inputs_ = std::move(files);
}
const std::vector<BlobFileMeta*>& sampled_inputs() { return sampled_inputs_; }
const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }
void SetColumnFamily(ColumnFamilyHandle* cfh);
ColumnFamilyHandle* column_family_handle() { return cfh_; }
ColumnFamilyData* GetColumnFamilyData();
void MarkFilesBeingGC();
void AddOutputFile(BlobFileMeta*);
void ReleaseGcFiles();
private:
std::vector<BlobFileMeta*> inputs_;
std::vector<BlobFileMeta*> sampled_inputs_;
std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_;
ColumnFamilyHandle* cfh_{nullptr};
};
struct GCScore {
uint64_t file_number;
double score;
};
} // namespace titandb
} // namespace rocksdb
This diff is collapsed.
#pragma once
#include "db/db_impl.h"
#include "rocksdb/status.h"
#include "utilities/titandb/blob_file_builder.h"
#include "utilities/titandb/blob_file_iterator.h"
#include "utilities/titandb/blob_file_manager.h"
#include "utilities/titandb/blob_gc.h"
#include "utilities/titandb/options.h"
#include "utilities/titandb/version_set.h"
namespace rocksdb {
namespace titandb {
class BlobGCJob {
public:
BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
const TitanDBOptions& titan_db_options, Env* env,
const EnvOptions& env_options, BlobFileManager* blob_file_manager,
VersionSet* version_set, LogBuffer* log_buffer,
std::atomic_bool* shuting_down);
// No copying allowed
BlobGCJob(const BlobGCJob&) = delete;
void operator=(const BlobGCJob&) = delete;
~BlobGCJob();
// REQUIRE: mutex held
Status Prepare();
// REQUIRE: mutex not held
Status Run();
// REQUIRE: mutex held
Status Finish();
private:
class GarbageCollectionWriteCallback;
friend class BlobGCJobTest;
BlobGC* blob_gc_;
DB* base_db_;
DBImpl* base_db_impl_;
port::Mutex* mutex_;
TitanDBOptions db_options_;
Env* env_;
EnvOptions env_options_;
BlobFileManager* blob_file_manager_;
titandb::VersionSet* version_set_;
LogBuffer* log_buffer_{nullptr};
std::vector<std::pair<std::unique_ptr<BlobFileHandle>,
std::unique_ptr<BlobFileBuilder>>>
blob_file_builders_;
std::vector<std::pair<WriteBatch, GarbageCollectionWriteCallback>>
rewrite_batches_;
InternalKeyComparator* cmp_{nullptr};
std::atomic_bool* shuting_down_{nullptr};
Status SampleCandidateFiles();
bool DoSample(const BlobFileMeta* file);
Status DoRunGC();
Status BuildIterator(std::unique_ptr<BlobFileMergeIterator>* result);
bool DiscardEntry(const Slice& key, const BlobIndex& blob_index);
Status InstallOutputBlobFiles();
Status RewriteValidKeyToLSM();
Status DeleteInputBlobFiles() const;
bool IsShutingDown();
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_gc_job.h"
#include "util/testharness.h"
#include "utilities/titandb/blob_gc_picker.h"
#include "utilities/titandb/db_impl.h"
namespace rocksdb {
namespace titandb {
const static int MAX_KEY_NUM = 1000;
std::string GenKey(int i) {
char buffer[32];
snprintf(buffer, sizeof(buffer), "k-%08d", i);
return buffer;
}
std::string GenValue(int i) {
char buffer[32];
snprintf(buffer, sizeof(buffer), "v-%08d", i);
return buffer;
}
class BlobGCJobTest : public testing::Test {
public:
std::string dbname_;
TitanDB* db_;
DBImpl* base_db_;
TitanDBImpl* tdb_;
VersionSet* version_set_;
TitanOptions options_;
port::Mutex* mutex_;
BlobGCJobTest() : dbname_(test::TmpDir()) {
options_.dirname = dbname_ + "/titandb";
options_.create_if_missing = true;
options_.disable_background_gc = true;
options_.min_blob_size = 0;
options_.env->CreateDirIfMissing(dbname_);
options_.env->CreateDirIfMissing(options_.dirname);
}
~BlobGCJobTest() {}
void CheckBlobNumber(int expected) {
auto b = version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(expected, b->files_.size());
}
void ClearDir() {
std::vector<std::string> filenames;
options_.env->GetChildren(options_.dirname, &filenames);
for (auto& fname : filenames) {
if (fname != "." && fname != "..") {
ASSERT_OK(options_.env->DeleteFile(options_.dirname + "/" + fname));
}
}
options_.env->DeleteDir(options_.dirname);
filenames.clear();
options_.env->GetChildren(dbname_, &filenames);
for (auto& fname : filenames) {
if (fname != "." && fname != "..") {
options_.env->DeleteFile(dbname_ + "/" + fname);
}
}
}
void NewDB() {
ClearDir();
ASSERT_OK(TitanDB::Open(options_, dbname_, &db_));
tdb_ = reinterpret_cast<TitanDBImpl*>(db_);
version_set_ = tdb_->vset_.get();
mutex_ = &tdb_->mutex_;
base_db_ = reinterpret_cast<DBImpl*>(tdb_->GetRootDB());
}
void Flush() {
FlushOptions fopts;
fopts.wait = true;
ASSERT_OK(db_->Flush(fopts));
}
void DestroyDB() {
Status s __attribute__((__unused__)) = db_->Close();
assert(s.ok());
delete db_;
db_ = nullptr;
}
void RunGC() {
MutexLock l(mutex_);
Status s;
auto* cfh = base_db_->DefaultColumnFamily();
// Build BlobGC
TitanDBOptions db_options;
TitanCFOptions cf_options;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options.info_log.get());
cf_options.min_gc_batch_size = 0;
cf_options.blob_file_discardable_ratio = 0.4;
std::unique_ptr<BlobGC> blob_gc;
{
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options, cf_options);
blob_gc = blob_gc_picker->PickBlobGC(
version_set_->GetBlobStorage(cfh->GetID()).lock().get());
}
if (blob_gc) {
blob_gc->SetColumnFamily(cfh);
BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_,
tdb_->env_, EnvOptions(), tdb_->blob_manager_.get(),
version_set_, &log_buffer, nullptr);
s = blob_gc_job.Prepare();
ASSERT_OK(s);
{
mutex_->Unlock();
s = blob_gc_job.Run();
mutex_->Lock();
}
if (s.ok()) {
s = blob_gc_job.Finish();
ASSERT_OK(s);
}
}
mutex_->Unlock();
tdb_->PurgeObsoleteFiles();
mutex_->Lock();
}
Status NewIterator(uint64_t file_number, uint64_t file_size,
std::unique_ptr<BlobFileIterator>* iter) {
std::unique_ptr<RandomAccessFileReader> file;
Status s = NewBlobFileReader(file_number, 0, tdb_->db_options_,
tdb_->env_options_, tdb_->env_, &file);
if (!s.ok()) {
return s;
}
iter->reset(new BlobFileIterator(std::move(file), file_number, file_size,
TitanCFOptions()));
return Status::OK();
}
void TestDiscardEntry() {
NewDB();
auto* cfh = base_db_->DefaultColumnFamily();
BlobIndex blob_index;
blob_index.file_number = 0x81;
blob_index.blob_handle.offset = 0x98;
blob_index.blob_handle.size = 0x17;
std::string res;
blob_index.EncodeTo(&res);
std::string key = "test_discard_entry";
WriteBatch wb;
ASSERT_OK(WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), key, res));
auto rewrite_status = base_db_->Write(WriteOptions(), &wb);
std::vector<BlobFileMeta*> tmp;
BlobGC blob_gc(std::move(tmp), TitanCFOptions());
blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Env::Default(), EnvOptions(), nullptr, version_set_,
nullptr, nullptr);
ASSERT_FALSE(blob_gc_job.DiscardEntry(key, blob_index));
DestroyDB();
}
void TestRunGC() {
NewDB();
for (int i = 0; i < MAX_KEY_NUM; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
std::string result;
for (int i = 0; i < MAX_KEY_NUM; i++) {
if (i % 2 != 0) continue;
db_->Delete(WriteOptions(), GenKey(i));
}
Flush();
auto b = version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(b->files_.size(), 1);
auto old = b->files_.begin()->first;
// for (auto& f : b->files_) {
// f.second->marked_for_sample = false;
// }
std::unique_ptr<BlobFileIterator> iter;
ASSERT_OK(NewIterator(b->files_.begin()->second->file_number(),
b->files_.begin()->second->file_size(), &iter));
iter->SeekToFirst();
for (int i = 0; i < MAX_KEY_NUM; i++, iter->Next()) {
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0);
}
RunGC();
b = version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
ASSERT_EQ(b->files_.size(), 1);
auto new1 = b->files_.begin()->first;
ASSERT_TRUE(old != new1);
ASSERT_OK(NewIterator(b->files_.begin()->second->file_number(),
b->files_.begin()->second->file_size(), &iter));
iter->SeekToFirst();
auto* db_iter = db_->NewIterator(ReadOptions(), db_->DefaultColumnFamily());
db_iter->SeekToFirst();
for (int i = 0; i < MAX_KEY_NUM; i++) {
if (i % 2 == 0) continue;
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0);
ASSERT_TRUE(iter->value().compare(Slice(GenValue(i))) == 0);
ASSERT_OK(db_->Get(ReadOptions(), iter->key(), &result));
ASSERT_TRUE(iter->value().size() == result.size());
ASSERT_TRUE(iter->value().compare(result) == 0);
ASSERT_OK(db_iter->status());
ASSERT_TRUE(db_iter->Valid());
ASSERT_TRUE(db_iter->key().compare(Slice(GenKey(i))) == 0);
ASSERT_TRUE(db_iter->value().compare(Slice(GenValue(i))) == 0);
iter->Next();
db_iter->Next();
}
delete db_iter;
ASSERT_FALSE(iter->Valid() || !iter->status().ok());
DestroyDB();
}
};
TEST_F(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); }
TEST_F(BlobGCJobTest, RunGC) { TestRunGC(); }
// Tests blob file will be kept after GC, if it is still visible by active snapshots.
TEST_F(BlobGCJobTest, PurgeBlobs) {
NewDB();
auto snap1 = db_->GetSnapshot();
for (int i = 0; i < 10; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
CheckBlobNumber(1);
auto snap2 = db_->GetSnapshot();
auto snap3 = db_->GetSnapshot();
for (int i = 0; i < 10; i++) {
db_->Delete(WriteOptions(), GenKey(i));
}
Flush();
CheckBlobNumber(1);
auto snap4 = db_->GetSnapshot();
RunGC();
CheckBlobNumber(1);
for (int i = 10; i < 20; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
auto snap5 = db_->GetSnapshot();
CheckBlobNumber(2);
db_->ReleaseSnapshot(snap2);
RunGC();
CheckBlobNumber(2);
db_->ReleaseSnapshot(snap3);
RunGC();
CheckBlobNumber(2);
db_->ReleaseSnapshot(snap1);
RunGC();
CheckBlobNumber(2);
db_->ReleaseSnapshot(snap4);
RunGC();
CheckBlobNumber(1);
db_->ReleaseSnapshot(snap5);
RunGC();
CheckBlobNumber(1);
DestroyDB();
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#include "utilities/titandb/blob_gc_picker.h"
namespace rocksdb {
namespace titandb {
BasicBlobGCPicker::BasicBlobGCPicker(TitanDBOptions db_options,
TitanCFOptions cf_options)
: db_options_(db_options), cf_options_(cf_options) {}
BasicBlobGCPicker::~BasicBlobGCPicker() {}
std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
BlobStorage* blob_storage) {
Status s;
std::vector<BlobFileMeta*> blob_files;
uint64_t batch_size = 0;
// ROCKS_LOG_INFO(db_options_.info_log, "blob file num:%lu gc score:%lu",
// blob_storage->NumBlobFiles(), blob_storage->gc_score().size());
for (auto& gc_score : blob_storage->gc_score()) {
auto blob_file = blob_storage->FindFile(gc_score.file_number).lock();
assert(blob_file);
// ROCKS_LOG_INFO(db_options_.info_log,
// "file number:%lu score:%f being_gc:%d pending:%d, "
// "size:%lu discard:%lu mark_for_gc:%d
// mark_for_sample:%d", blob_file->file_number_,
// gc_score.score, blob_file->being_gc,
// blob_file->pending, blob_file->file_size_,
// blob_file->discardable_size_,
// blob_file->marked_for_gc_,
// blob_file->marked_for_sample);
if (!CheckBlobFile(blob_file.get())) {
ROCKS_LOG_INFO(db_options_.info_log, "file number:%lu no need gc",
blob_file->file_number());
continue;
}
blob_files.push_back(blob_file.get());
batch_size += blob_file->file_size();
if (batch_size >= cf_options_.max_gc_batch_size) break;
}
if (blob_files.empty() || batch_size < cf_options_.min_gc_batch_size)
return nullptr;
return std::unique_ptr<BlobGC>(
new BlobGC(std::move(blob_files), std::move(cf_options_)));
}
bool BasicBlobGCPicker::CheckBlobFile(BlobFileMeta* blob_file) const {
assert(blob_file->file_state() != BlobFileMeta::FileState::kInit);
if (blob_file->file_state() != BlobFileMeta::FileState::kNormal) return false;
return true;
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include <memory>
#include "db/column_family.h"
#include "db/write_callback.h"
#include "rocksdb/status.h"
#include "util/filename.h"
#include "utilities/titandb/blob_file_manager.h"
#include "utilities/titandb/blob_format.h"
#include "utilities/titandb/blob_gc.h"
#include "utilities/titandb/version.h"
namespace rocksdb {
namespace titandb {
class BlobGCPicker {
public:
BlobGCPicker(){};
virtual ~BlobGCPicker(){};
// Pick candidate blob files for a new gc.
// Returns nullptr if there is no gc to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the gc. Caller should delete the result.
virtual std::unique_ptr<BlobGC> PickBlobGC(BlobStorage* blob_storage) = 0;
};
class BasicBlobGCPicker final : public BlobGCPicker {
public:
BasicBlobGCPicker(TitanDBOptions, TitanCFOptions);
~BasicBlobGCPicker();
std::unique_ptr<BlobGC> PickBlobGC(BlobStorage* blob_storage) override;
private:
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
// Check if blob_file needs to gc, return true means we need pick this
// file for gc
bool CheckBlobFile(BlobFileMeta* blob_file) const;
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/blob_gc_picker.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "utilities/titandb/blob_file_builder.h"
#include "utilities/titandb/blob_file_cache.h"
#include "utilities/titandb/blob_file_iterator.h"
#include "utilities/titandb/blob_file_reader.h"
namespace rocksdb {
namespace titandb {
class BlobGCPickerTest : public testing::Test {
public:
std::unique_ptr<BlobStorage> blob_storage_;
std::unique_ptr<BlobGCPicker> basic_blob_gc_picker_;
BlobGCPickerTest() {}
~BlobGCPickerTest() {}
void NewBlobStorageAndPicker(const TitanDBOptions& titan_db_options,
const TitanCFOptions& titan_cf_options) {
auto blob_file_cache = std::make_shared<BlobFileCache>(
titan_db_options, titan_cf_options, NewLRUCache(128));
blob_storage_.reset(new BlobStorage(titan_cf_options, blob_file_cache));
basic_blob_gc_picker_.reset(new BasicBlobGCPicker(titan_db_options, titan_cf_options));
}
void AddBlobFile(uint64_t file_number, uint64_t file_size,
uint64_t discardable_size, bool being_gc = false) {
auto f = std::make_shared<BlobFileMeta>(file_number, file_size);
f->AddDiscardableSize(discardable_size);
f->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
if (being_gc) {
f->FileStateTransit(BlobFileMeta::FileEvent::kGCBegin);
}
blob_storage_->files_[file_number] = f;
}
void UpdateBlobStorage() { blob_storage_->ComputeGCScore(); }
};
TEST_F(BlobGCPickerTest, Basic) {
TitanDBOptions titan_db_options;
TitanCFOptions titan_cf_options;
titan_cf_options.min_gc_batch_size = 0;
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U, 0U);
UpdateBlobStorage();
auto blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_TRUE(blob_gc != nullptr);
ASSERT_EQ(blob_gc->inputs().size(), 1);
ASSERT_EQ(blob_gc->inputs()[0]->file_number(), 1U);
}
TEST_F(BlobGCPickerTest, BeingGC) {
TitanDBOptions titan_db_options;
TitanCFOptions titan_cf_options;
titan_cf_options.min_gc_batch_size = 0;
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U, 0U, true);
UpdateBlobStorage();
auto blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_EQ(nullptr, blob_gc);
NewBlobStorageAndPicker(titan_db_options, titan_cf_options);
AddBlobFile(1U, 1U, 0U, true);
AddBlobFile(2U, 1U, 0U);
UpdateBlobStorage();
blob_gc = basic_blob_gc_picker_->PickBlobGC(blob_storage_.get());
ASSERT_EQ(blob_gc->inputs().size(), 1);
ASSERT_EQ(blob_gc->inputs()[0]->file_number(), 2U);
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#include "rocksdb/utilities/titandb/db.h"
#include "utilities/titandb/db_impl.h"
namespace rocksdb {
namespace titandb {
Status TitanDB::Open(const TitanOptions& options, const std::string& dbname,
TitanDB** db) {
TitanDBOptions db_options(options);
TitanCFOptions cf_options(options);
std::vector<TitanCFDescriptor> descs;
descs.emplace_back(kDefaultColumnFamilyName, cf_options);
std::vector<ColumnFamilyHandle*> handles;
Status s = TitanDB::Open(db_options, dbname, descs, &handles, db);
if (s.ok()) {
assert(handles.size() == 1);
// DBImpl is always holding the default handle.
delete handles[0];
}
return s;
}
Status TitanDB::Open(const TitanDBOptions& db_options,
const std::string& dbname,
const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles, TitanDB** db) {
auto impl = new TitanDBImpl(db_options, dbname);
auto s = impl->Open(descs, handles);
if (s.ok()) {
*db = impl;
impl->StartBackgroundTasks();
} else {
*db = nullptr;
delete impl;
}
return s;
}
} // namespace titandb
} // namespace rocksdb
This diff is collapsed.
#pragma once
#include "db/db_impl.h"
#include "util/repeatable_thread.h"
#include "rocksdb/utilities/titandb/db.h"
#include "utilities/titandb/blob_file_manager.h"
#include "utilities/titandb/version_set.h"
namespace rocksdb {
namespace titandb {
class TitanDBImpl : public TitanDB {
public:
TitanDBImpl(const TitanDBOptions& options, const std::string& dbname);
~TitanDBImpl();
Status Open(const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles);
Status Close() override;
using TitanDB::CreateColumnFamilies;
Status CreateColumnFamilies(
const std::vector<TitanCFDescriptor>& descs,
std::vector<ColumnFamilyHandle*>* handles) override;
Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) override;
using TitanDB::CompactFiles;
Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names, const int output_level,
const int output_path_id = -1,
std::vector<std::string>* const output_file_names = nullptr,
CompactionJobInfo* compaction_job_info = nullptr) override;
Status CloseImpl();
using TitanDB::Get;
Status Get(const ReadOptions& options, ColumnFamilyHandle* handle,
const Slice& key, PinnableSlice* value) override;
using TitanDB::MultiGet;
std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& handles,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
using TitanDB::NewIterator;
Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* handle) override;
Status NewIterators(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& handles,
std::vector<Iterator*>* iterators) override;
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;
using TitanDB::GetOptions;
Options GetOptions(ColumnFamilyHandle* column_family) const override;
void OnFlushCompleted(const FlushJobInfo& flush_job_info);
void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info);
void StartBackgroundTasks();
private:
class FileManager;
friend class FileManager;
friend class BlobGCJobTest;
friend class BaseDbListener;
friend class TitanDBTest;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* handle,
const Slice& key, PinnableSlice* value);
std::vector<Status> MultiGetImpl(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& handles,
const std::vector<Slice>& keys, std::vector<std::string>* values);
Iterator* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyHandle* handle,
std::shared_ptr<ManagedSnapshot> snapshot);
// REQUIRE: mutex_ held
void AddToGCQueue(uint32_t column_family_id) {
gc_queue_.push_back(column_family_id);
}
// REQUIRE: gc_queue_ not empty
// REQUIRE: mutex_ held
uint32_t PopFirstFromGCQueue() {
assert(!gc_queue_.empty());
auto column_family_id = *gc_queue_.begin();
gc_queue_.pop_front();
return column_family_id;
}
// REQUIRE: mutex_ held
void MaybeScheduleGC();
static void BGWorkGC(void* db);
void BackgroundCallGC();
Status BackgroundGC(LogBuffer* log_buffer);
void PurgeObsoleteFiles();
SequenceNumber GetOldestSnapshotSequence() {
SequenceNumber oldest_snapshot = kMaxSequenceNumber;
{
// Need to lock DBImpl mutex before access snapshot list.
InstrumentedMutexLock l(db_impl_->mutex());
auto& snapshots = db_impl_->snapshots();
if (!snapshots.empty()) {
oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
}
}
return oldest_snapshot;
}
FileLock* lock_{nullptr};
// The lock sequence must be Titan.mutex_.Lock() -> Base DB mutex_.Lock()
// while the unlock sequence must be Base DB mutex.Unlock() ->
// Titan.mutex_.Unlock() Only if we all obey these sequence, we can prevent
// potential dead lock.
port::Mutex mutex_;
// This condition variable is signaled on these conditions:
// * whenever bg_gc_scheduled_ goes down to 0
port::CondVar bg_cv_;
std::string dbname_;
std::string dirname_;
Env* env_;
EnvOptions env_options_;
DBImpl* db_impl_;
TitanDBOptions db_options_;
std::unordered_map<uint32_t, std::shared_ptr<TableFactory>>
original_table_factory_;
// handle for purging obsolete blob files at fixed intervals
std::unique_ptr<RepeatableThread> thread_purge_obsolete_;
std::unique_ptr<VersionSet> vset_;
std::set<uint64_t> pending_outputs_;
std::shared_ptr<BlobFileManager> blob_manager_;
// gc_queue_ hold column families that we need to gc.
// pending_gc_ hold column families that already on gc_queue_.
std::deque<uint32_t> gc_queue_;
std::atomic_int bg_gc_scheduled_{0};
std::atomic_bool shuting_down_{false};
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/db_impl.h"
namespace rocksdb {
namespace titandb {
void TitanDBImpl::PurgeObsoleteFiles() {
Status s;
ObsoleteFiles obsolete_files;
auto oldest_sequence = GetOldestSnapshotSequence();
{
MutexLock l(&mutex_);
vset_->GetObsoleteFiles(&obsolete_files, oldest_sequence);
}
{
std::vector<std::string> candidate_files;
for (auto& blob_file : obsolete_files.blob_files) {
candidate_files.emplace_back(
BlobFileName(db_options_.dirname, std::get<0>(blob_file)));
}
for (auto& manifest : obsolete_files.manifests) {
candidate_files.emplace_back(std::move(manifest));
}
// dedup state.inputs so we don't try to delete the same
// file twice
std::sort(candidate_files.begin(), candidate_files.end());
candidate_files.erase(
std::unique(candidate_files.begin(), candidate_files.end()),
candidate_files.end());
for (const auto& candidate_file : candidate_files) {
ROCKS_LOG_INFO(db_options_.info_log, "Titan deleting obsolete file [%s]",
candidate_file.c_str());
s = env_->DeleteFile(candidate_file);
if (!s.ok()) {
fprintf(stderr, "Titan deleting file [%s] failed, status:%s",
candidate_file.c_str(), s.ToString().c_str());
abort();
}
}
}
}
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/db_impl.h"
#include "utilities/titandb/blob_file_iterator.h"
#include "utilities/titandb/blob_gc_job.h"
#include "utilities/titandb/blob_gc_picker.h"
namespace rocksdb {
namespace titandb {
void TitanDBImpl::MaybeScheduleGC() {
mutex_.AssertHeld();
if (db_options_.disable_background_gc) return;
if (shuting_down_.load(std::memory_order_acquire)) return;
if (bg_gc_scheduled_.load(std::memory_order_acquire) >=
db_options_.max_background_gc)
return;
bg_gc_scheduled_.fetch_add(1, std::memory_order_release);
env_->Schedule(&TitanDBImpl::BGWorkGC, this, Env::Priority::LOW, this);
}
void TitanDBImpl::BGWorkGC(void* db) {
reinterpret_cast<TitanDBImpl*>(db)->BackgroundCallGC();
}
void TitanDBImpl::BackgroundCallGC() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{
MutexLock l(&mutex_);
assert(bg_gc_scheduled_ > 0);
BackgroundGC(&log_buffer);
{
mutex_.Unlock();
log_buffer.FlushBufferToLog();
LogFlush(db_options_.info_log.get());
mutex_.Lock();
}
bg_gc_scheduled_--;
if (bg_gc_scheduled_ == 0) {
// signal if
// * bg_gc_scheduled_ == 0 -- need to wakeup ~TitanDBImpl
// If none of this is true, there is no need to signal since nobody is
// waiting for it
bg_cv_.SignalAll();
}
// IMPORTANT: there should be no code after calling SignalAll. This call may
// signal the DB destructor that it's OK to proceed with destruction. In
// that case, all DB variables will be dealloacated and referencing them
// will cause trouble.
}
}
Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
mutex_.AssertHeld();
std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;
Status s;
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
const auto& titan_cf_options = bs->titan_cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, titan_cf_options);
blob_gc = blob_gc_picker->PickBlobGC(bs);
if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get());
}
}
// TODO(@DorianZheng) Make sure enough room for GC
if (UNLIKELY(!blob_gc)) {
// Nothing to do
ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do");
} else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(), vset_.get(),
log_buffer, &shuting_down_);
s = blob_gc_job.Prepare();
if (s.ok()) {
mutex_.Unlock();
s = blob_gc_job.Run();
mutex_.Lock();
}
if (s.ok()) {
s = blob_gc_job.Finish();
}
blob_gc->ReleaseGcFiles();
}
if (s.ok()) {
// Done
} else {
ROCKS_LOG_WARN(db_options_.info_log, "Titan GC error: %s",
s.ToString().c_str());
}
return s;
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "db/db_iter.h"
namespace rocksdb {
namespace titandb {
class TitanDBIterator : public Iterator {
public:
TitanDBIterator(const ReadOptions& options, BlobStorage* storage,
std::shared_ptr<ManagedSnapshot> snap,
std::unique_ptr<ArenaWrappedDBIter> iter)
: options_(options),
storage_(storage),
snap_(snap),
iter_(std::move(iter)) {}
bool Valid() const override { return iter_->Valid() && status_.ok(); }
Status status() const override {
// assume volatile inner iter
if(status_.ok()) {
return iter_->status();
} else {
return status_;
}
}
void SeekToFirst() override {
iter_->SeekToFirst();
GetBlobValue();
}
void SeekToLast() override {
iter_->SeekToLast();
GetBlobValue();
}
void Seek(const Slice& target) override {
iter_->Seek(target);
GetBlobValue();
}
void SeekForPrev(const Slice& target) override {
iter_->SeekForPrev(target);
GetBlobValue();
}
void Next() override {
assert(Valid());
iter_->Next();
GetBlobValue();
}
void Prev() override {
assert(Valid());
iter_->Prev();
GetBlobValue();
}
Slice key() const override {
assert(Valid());
return iter_->key();
}
Slice value() const override {
assert(Valid());
if (!iter_->IsBlob()) return iter_->value();
return record_.value;
}
private:
void GetBlobValue() {
if (!iter_->Valid() || !iter_->IsBlob()) {
status_ = iter_->status();
return;
}
assert(iter_->status().ok());
BlobIndex index;
status_ = DecodeInto(iter_->value(), &index);
if (!status_.ok()) {
fprintf(stderr, "GetBlobValue decode blob index err:%s\n",
status_.ToString().c_str());
abort();
}
auto it = files_.find(index.file_number);
if (it == files_.end()) {
std::unique_ptr<BlobFilePrefetcher> prefetcher;
status_ = storage_->NewPrefetcher(index.file_number, &prefetcher);
if (status_.IsCorruption()) {
fprintf(stderr,
"key:%s GetBlobValue err:%s with sequence number:%" PRIu64 "\n",
iter_->key().ToString(true).c_str(), status_.ToString().c_str(),
options_.snapshot->GetSequenceNumber());
}
if (!status_.ok()) return;
it = files_.emplace(index.file_number, std::move(prefetcher)).first;
}
buffer_.Reset();
status_ = it->second->Get(options_, index.blob_handle, &record_, &buffer_);
}
Status status_;
BlobRecord record_;
PinnableSlice buffer_;
ReadOptions options_;
BlobStorage* storage_;
std::shared_ptr<ManagedSnapshot> snap_;
std::unique_ptr<ArenaWrappedDBIter> iter_;
std::map<uint64_t, std::unique_ptr<BlobFilePrefetcher>> files_;
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/options.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "rocksdb/convenience.h"
namespace rocksdb {
namespace titandb {
std::string TitanCFOptions::ToString() const {
char buf[256];
std::string str;
std::string res = "[titandb]\n";
snprintf(buf, sizeof(buf), "min_blob_size = %" PRIu64 "\n", min_blob_size);
res += buf;
GetStringFromCompressionType(&str, blob_file_compression);
snprintf(buf, sizeof(buf), "blob_file_compression = %s\n", str.c_str());
res += buf;
snprintf(buf, sizeof(buf), "blob_file_target_size = %" PRIu64 "\n",
blob_file_target_size);
res += buf;
return res;
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/options.h"
namespace rocksdb {
namespace titandb {
struct TitanDBOptions : public DBOptions {
// The directory to store data specific to TitanDB alongside with
// the base DB.
//
// Default: {dbname}/titandb
std::string dirname;
// Disable background GC
//
// Default: false
bool disable_background_gc{false};
// Max background GC thread
//
// Default: 1
int32_t max_background_gc{1};
TitanDBOptions() = default;
explicit TitanDBOptions(const DBOptions& options) : DBOptions(options) {}
TitanDBOptions& operator=(const DBOptions& options) {
*dynamic_cast<DBOptions*>(this) = options;
return *this;
}
};
struct TitanCFOptions : public ColumnFamilyOptions {
// The smallest value to store in blob files. Value smaller than
// this threshold will be inlined in base DB.
//
// Default: 4096
uint64_t min_blob_size{4096};
// The compression algorithm used to compress data in blob files.
//
// Default: kNoCompression
CompressionType blob_file_compression{kNoCompression};
// The desirable blob file size. This is not a hard limit but a wish.
//
// Default: 256MB
uint64_t blob_file_target_size{256 << 20};
// If non-NULL use the specified cache for blob records.
//
// Default: nullptr
std::shared_ptr<Cache> blob_cache;
// Max batch size for gc
//
// Default: 1GB
uint64_t max_gc_batch_size{1 << 30};
// Min batch size for gc
//
// Default: 512MB
uint64_t min_gc_batch_size{512 << 20};
// The ratio of how much discardable size of a blob file can be GC
//
// Default: 0.5
float blob_file_discardable_ratio{0.5};
// The ratio of how much size of a blob file need to be sample before GC
//
// Default: 0.1
float sample_file_size_ratio{0.1};
// The blob file size less than this option will be mark gc
//
// Default: 8MB
uint64_t merge_small_file_threshold{8 << 20};
TitanCFOptions() = default;
explicit TitanCFOptions(const ColumnFamilyOptions& options)
: ColumnFamilyOptions(options) {}
TitanCFOptions& operator=(const ColumnFamilyOptions& options) {
*dynamic_cast<ColumnFamilyOptions*>(this) = options;
return *this;
}
std::string ToString() const;
};
struct TitanOptions : public TitanDBOptions, public TitanCFOptions {
TitanOptions() = default;
explicit TitanOptions(const Options& options)
: TitanDBOptions(options), TitanCFOptions(options) {}
TitanOptions& operator=(const Options& options) {
*dynamic_cast<TitanDBOptions*>(this) = options;
*dynamic_cast<TitanCFOptions*>(this) = options;
return *this;
}
operator Options() {
Options options;
*dynamic_cast<DBOptions*>(&options) = *dynamic_cast<DBOptions*>(this);
*dynamic_cast<ColumnFamilyOptions*>(&options) =
*dynamic_cast<ColumnFamilyOptions*>(this);
return options;
}
};
} // namespace titandb
} // namespace rocksdb
#include "utilities/titandb/table_builder.h"
namespace rocksdb {
namespace titandb {
void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
if (!ok()) return;
ParsedInternalKey ikey;
if (!ParseInternalKey(key, &ikey)) {
status_ = Status::Corruption(Slice());
return;
}
if (ikey.type != kTypeValue || value.size() < cf_options_.min_blob_size) {
base_builder_->Add(key, value);
return;
}
std::string index_value;
AddBlob(ikey.user_key, value, &index_value);
if (!ok()) return;
ikey.type = kTypeBlobIndex;
std::string index_key;
AppendInternalKey(&index_key, ikey);
base_builder_->Add(index_key, index_value);
}
void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
std::string* index_value) {
if (!ok()) return;
if (!blob_builder_) {
status_ = blob_manager_->NewFile(&blob_handle_);
if (!ok()) return;
blob_builder_.reset(
new BlobFileBuilder(cf_options_, blob_handle_->GetFile()));
}
BlobIndex index;
BlobRecord record;
record.key = key;
record.value = value;
index.file_number = blob_handle_->GetNumber();
blob_builder_->Add(record, &index.blob_handle);
if (ok()) {
index.EncodeTo(index_value);
}
}
Status TitanTableBuilder::status() const {
Status s = status_;
if (s.ok()) {
s = base_builder_->status();
}
if (s.ok() && blob_builder_) {
s = blob_builder_->status();
}
return s;
}
Status TitanTableBuilder::Finish() {
base_builder_->Finish();
if (blob_builder_) {
blob_builder_->Finish();
if (ok()) {
std::shared_ptr<BlobFileMeta> file = std::make_shared<BlobFileMeta>(
blob_handle_->GetNumber(), blob_handle_->GetFile()->GetFileSize());
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushOrCompactionOutput);
status_ =
blob_manager_->FinishFile(cf_id_, file, std::move(blob_handle_));
// ROCKS_LOG_INFO(db_options_.info_log, "[%u] AddFile %lu", cf_id_,
// file->file_number_);
} else {
status_ = blob_manager_->DeleteFile(std::move(blob_handle_));
}
}
return status();
}
void TitanTableBuilder::Abandon() {
base_builder_->Abandon();
if (blob_builder_) {
blob_builder_->Abandon();
status_ = blob_manager_->DeleteFile(std::move(blob_handle_));
}
}
uint64_t TitanTableBuilder::NumEntries() const {
return base_builder_->NumEntries();
}
uint64_t TitanTableBuilder::FileSize() const {
return base_builder_->FileSize();
}
bool TitanTableBuilder::NeedCompact() const {
return base_builder_->NeedCompact();
}
TableProperties TitanTableBuilder::GetTableProperties() const {
return base_builder_->GetTableProperties();
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "table/table_builder.h"
#include "utilities/titandb/blob_file_builder.h"
#include "utilities/titandb/blob_file_manager.h"
#include "utilities/titandb/options.h"
namespace rocksdb {
namespace titandb {
class TitanTableBuilder : public TableBuilder {
public:
TitanTableBuilder(uint32_t cf_id, const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
std::unique_ptr<TableBuilder> base_builder,
std::shared_ptr<BlobFileManager> blob_manager)
: cf_id_(cf_id),
db_options_(db_options),
cf_options_(cf_options),
base_builder_(std::move(base_builder)),
blob_manager_(blob_manager) {}
void Add(const Slice& key, const Slice& value) override;
Status status() const override;
Status Finish() override;
void Abandon() override;
uint64_t NumEntries() const override;
uint64_t FileSize() const override;
bool NeedCompact() const override;
TableProperties GetTableProperties() const override;
private:
bool ok() const { return status().ok(); }
void AddBlob(const Slice& key, const Slice& value, std::string* index_value);
Status status_;
uint32_t cf_id_;
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
std::unique_ptr<TableBuilder> base_builder_;
std::unique_ptr<BlobFileHandle> blob_handle_;
std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<BlobFileBuilder> blob_builder_;
};
} // namespace titandb
} // namespace rocksdb
This diff is collapsed.
#include "utilities/titandb/table_factory.h"
#include "utilities/titandb/table_builder.h"
namespace rocksdb {
namespace titandb {
Status TitanTableFactory::NewTableReader(
const TableReaderOptions& options,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* result,
bool prefetch_index_and_filter_in_cache) const {
return base_factory_->NewTableReader(options, std::move(file), file_size,
result,
prefetch_index_and_filter_in_cache);
}
TableBuilder* TitanTableFactory::NewTableBuilder(
const TableBuilderOptions& options, uint32_t column_family_id,
WritableFileWriter* file) const {
std::unique_ptr<TableBuilder> base_builder(
base_factory_->NewTableBuilder(options, column_family_id, file));
return new TitanTableBuilder(column_family_id, db_options_, cf_options_,
std::move(base_builder), blob_manager_);
}
std::string TitanTableFactory::GetPrintableTableOptions() const {
return base_factory_->GetPrintableTableOptions() + cf_options_.ToString();
}
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/table.h"
#include "utilities/titandb/blob_file_manager.h"
#include "utilities/titandb/options.h"
namespace rocksdb {
namespace titandb {
class TitanTableFactory : public TableFactory {
public:
TitanTableFactory(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
std::shared_ptr<BlobFileManager> blob_manager)
: db_options_(db_options),
cf_options_(cf_options),
base_factory_(cf_options.table_factory),
blob_manager_(blob_manager) {}
const char* Name() const override { return "TitanTable"; }
Status NewTableReader(
const TableReaderOptions& options,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* result,
bool prefetch_index_and_filter_in_cache = true) const override;
TableBuilder* NewTableBuilder(const TableBuilderOptions& options,
uint32_t column_family_id,
WritableFileWriter* file) const override;
std::string GetPrintableTableOptions() const override;
Status SanitizeOptions(const DBOptions& db_options,
const ColumnFamilyOptions& cf_options) const override {
// Override this when we need to validate our options.
return base_factory_->SanitizeOptions(db_options, cf_options);
}
Status GetOptionString(std::string* opt_string,
const std::string& delimiter) const override {
// Override this when we need to persist our options.
return base_factory_->GetOptionString(opt_string, delimiter);
}
void* GetOptions() override { return base_factory_->GetOptions(); }
bool IsDeleteRangeSupported() const override {
return base_factory_->IsDeleteRangeSupported();
}
private:
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
std::shared_ptr<TableFactory> base_factory_;
std::shared_ptr<BlobFileManager> blob_manager_;
};
} // namespace titandb
} // namespace rocksdb
#pragma once
#include "rocksdb/cache.h"
#include "util/compression.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
template <typename T>
void CheckCodec(const T& input) {
std::string buffer;
input.EncodeTo(&buffer);
T output;
ASSERT_OK(DecodeInto(buffer, &output));
ASSERT_EQ(output, input);
}
} // namespace titandb
} // namespace rocksdb
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#include "utilities/titandb/util.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
class UtilTest : public testing::Test {};
TEST(UtilTest, Compression) {
std::string input(1024, 'a');
for (auto compression :
{kSnappyCompression, kZlibCompression, kLZ4Compression, kZSTD}) {
CompressionContext compression_ctx(compression);
std::string buffer;
auto compressed = Compress(compression_ctx, input, &buffer, &compression);
if (compression != kNoCompression) {
ASSERT_TRUE(compressed.size() <= input.size());
UncompressionContext uncompression_ctx(compression);
OwnedSlice output;
ASSERT_OK(Uncompress(uncompression_ctx, compressed, &output));
ASSERT_EQ(output, input);
}
}
}
} // namespace titandb
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment