Unverified Commit 07aa0655 authored by Connor's avatar Connor Committed by GitHub

Rename version_set to blob_file_set (#69)

* rename version_set to blob_set
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent f963b880
#include "version_set.h"
#include "blob_file_set.h"
#include <inttypes.h>
......@@ -11,7 +11,7 @@ namespace titandb {
const size_t kMaxFileCacheSize = 1024 * 1024;
VersionSet::VersionSet(const TitanDBOptions& options, TitanStats* stats)
BlobFileSet::BlobFileSet(const TitanDBOptions& options, TitanStats* stats)
: dirname_(options.dirname),
env_(options.env),
env_options_(options),
......@@ -24,7 +24,7 @@ VersionSet::VersionSet(const TitanDBOptions& options, TitanStats* stats)
file_cache_ = NewLRUCache(file_cache_size);
}
Status VersionSet::Open(
Status BlobFileSet::Open(
const std::map<uint32_t, TitanCFOptions>& column_families) {
// Sets up initial column families.
AddColumnFamilies(column_families);
......@@ -39,7 +39,7 @@ Status VersionSet::Open(
return OpenManifest(NewFileNumber());
}
Status VersionSet::Recover() {
Status BlobFileSet::Recover() {
struct LogReporter : public log::Reader::Reporter {
Status* status;
void Corruption(size_t, const Status& s) override {
......@@ -145,7 +145,7 @@ Status VersionSet::Recover() {
return Status::OK();
}
Status VersionSet::OpenManifest(uint64_t file_number) {
Status BlobFileSet::OpenManifest(uint64_t file_number) {
Status s;
auto file_name = DescriptorFileName(dirname_, file_number);
......@@ -177,7 +177,7 @@ Status VersionSet::OpenManifest(uint64_t file_number) {
return s;
}
Status VersionSet::WriteSnapshot(log::Writer* log) {
Status BlobFileSet::WriteSnapshot(log::Writer* log) {
Status s;
// Saves global information
{
......@@ -207,8 +207,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return s;
}
Status VersionSet::LogAndApply(VersionEdit& edit) {
TEST_SYNC_POINT("VersionSet::LogAndApply");
Status BlobFileSet::LogAndApply(VersionEdit& edit) {
TEST_SYNC_POINT("BlobFileSet::LogAndApply");
// TODO(@huachao): write manifest file unlocked
std::string record;
edit.SetNextFileNumber(next_file_number_.load());
......@@ -228,7 +228,7 @@ Status VersionSet::LogAndApply(VersionEdit& edit) {
return collector.Apply(*this);
}
void VersionSet::AddColumnFamilies(
void BlobFileSet::AddColumnFamilies(
const std::map<uint32_t, TitanCFOptions>& column_families) {
for (auto& cf : column_families) {
auto file_cache = std::make_shared<BlobFileCache>(db_options_, cf.second,
......@@ -239,7 +239,7 @@ void VersionSet::AddColumnFamilies(
}
}
Status VersionSet::DropColumnFamilies(
Status BlobFileSet::DropColumnFamilies(
const std::vector<uint32_t>& column_families,
SequenceNumber obsolete_sequence) {
Status s;
......@@ -266,7 +266,7 @@ Status VersionSet::DropColumnFamilies(
return s;
}
Status VersionSet::MaybeDestroyColumnFamily(uint32_t cf_id) {
Status BlobFileSet::MaybeDestroyColumnFamily(uint32_t cf_id) {
obsolete_columns_.erase(cf_id);
auto it = column_families_.find(cf_id);
if (it != column_families_.end()) {
......@@ -281,8 +281,8 @@ Status VersionSet::MaybeDestroyColumnFamily(uint32_t cf_id) {
return Status::NotFound("invalid column family");
}
void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) {
void BlobFileSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) {
for (auto it = column_families_.begin(); it != column_families_.end();) {
auto& cf_id = it->first;
auto& blob_storage = it->second;
......
......@@ -20,9 +20,11 @@
namespace rocksdb {
namespace titandb {
class VersionSet {
// BlobFileSet is the set of all the blobs file generated by Titan.
// It records blob file meta in terms of column family.
class BlobFileSet {
public:
explicit VersionSet(const TitanDBOptions& options, TitanStats* stats);
explicit BlobFileSet(const TitanDBOptions& options, TitanStats* stats);
// Sets up the storage specified in "options.dirname".
// If the manifest doesn't exist, it will create one.
......@@ -31,8 +33,7 @@ class VersionSet {
// outside of the provided column families.
Status Open(const std::map<uint32_t, TitanCFOptions>& column_families);
// Applies *edit on the current version to form a new version that is
// both saved to the manifest and installed as the new current version.
// Applies *edit and saved to the manifest.
// REQUIRES: mutex is held
Status LogAndApply(VersionEdit& edit);
......
#pragma once
#include "blob_file_set.h"
#include "db_impl.h"
#include "rocksdb/listener.h"
#include "rocksdb/table_properties.h"
#include "util/coding.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......
......@@ -76,7 +76,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
const TitanDBOptions& titan_db_options, Env* env,
const EnvOptions& env_options,
BlobFileManager* blob_file_manager,
VersionSet* version_set, LogBuffer* log_buffer,
BlobFileSet* blob_file_set, LogBuffer* log_buffer,
std::atomic_bool* shuting_down, TitanStats* stats)
: blob_gc_(blob_gc),
base_db_(db),
......@@ -86,7 +86,7 @@ BlobGCJob::BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
env_(env),
env_options_(env_options),
blob_file_manager_(blob_file_manager),
version_set_(version_set),
blob_file_set_(blob_file_set),
log_buffer_(log_buffer),
shuting_down_(shuting_down),
stats_(stats) {}
......@@ -586,7 +586,7 @@ Status BlobGCJob::DeleteInputBlobFiles() {
metrics_.blob_db_gc_num_files++;
edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
}
s = version_set_->LogAndApply(edit);
s = blob_file_set_->LogAndApply(edit);
// TODO(@DorianZheng) Purge pending outputs
// base_db_->pending_outputs_.erase(handle->GetNumber());
return s;
......
......@@ -3,6 +3,7 @@
#include "blob_file_builder.h"
#include "blob_file_iterator.h"
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "blob_gc.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/statistics.h"
......@@ -10,7 +11,6 @@
#include "titan/options.h"
#include "titan_stats.h"
#include "version_edit.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -20,7 +20,7 @@ class BlobGCJob {
BlobGCJob(BlobGC* blob_gc, DB* db, port::Mutex* mutex,
const TitanDBOptions& titan_db_options, Env* env,
const EnvOptions& env_options, BlobFileManager* blob_file_manager,
VersionSet* version_set, LogBuffer* log_buffer,
BlobFileSet* blob_file_set, LogBuffer* log_buffer,
std::atomic_bool* shuting_down, TitanStats* stats);
// No copying allowed
......@@ -50,7 +50,7 @@ class BlobGCJob {
Env* env_;
EnvOptions env_options_;
BlobFileManager* blob_file_manager_;
VersionSet* version_set_;
BlobFileSet* blob_file_set_;
LogBuffer* log_buffer_{nullptr};
std::vector<std::pair<std::unique_ptr<BlobFileHandle>,
......
......@@ -28,7 +28,7 @@ class BlobGCJobTest : public testing::Test {
TitanDB* db_;
DBImpl* base_db_;
TitanDBImpl* tdb_;
VersionSet* version_set_;
BlobFileSet* blob_file_set_;
TitanOptions options_;
port::Mutex* mutex_;
......@@ -45,7 +45,7 @@ class BlobGCJobTest : public testing::Test {
std::weak_ptr<BlobStorage> GetBlobStorage(uint32_t cf_id) {
MutexLock l(mutex_);
return version_set_->GetBlobStorage(cf_id);
return blob_file_set_->GetBlobStorage(cf_id);
}
void CheckBlobNumber(int expected) {
......@@ -75,7 +75,7 @@ class BlobGCJobTest : public testing::Test {
ClearDir();
ASSERT_OK(TitanDB::Open(options_, dbname_, &db_));
tdb_ = reinterpret_cast<TitanDBImpl*>(db_);
version_set_ = tdb_->vset_.get();
blob_file_set_ = tdb_->blob_file_set_.get();
mutex_ = &tdb_->mutex_;
base_db_ = reinterpret_cast<DBImpl*>(tdb_->GetRootDB());
}
......@@ -120,7 +120,7 @@ class BlobGCJobTest : public testing::Test {
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options, cf_options);
blob_gc = blob_gc_picker->PickBlobGC(
version_set_->GetBlobStorage(cfh->GetID()).lock().get());
blob_file_set_->GetBlobStorage(cfh->GetID()).lock().get());
}
if (expected) {
......@@ -132,7 +132,7 @@ class BlobGCJobTest : public testing::Test {
BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_,
tdb_->env_, EnvOptions(options_),
tdb_->blob_manager_.get(), version_set_,
tdb_->blob_manager_.get(), blob_file_set_,
&log_buffer, nullptr, nullptr);
s = blob_gc_job.Prepare();
......@@ -189,7 +189,7 @@ class BlobGCJobTest : public testing::Test {
BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/);
blob_gc.SetColumnFamily(cfh);
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Env::Default(), EnvOptions(), nullptr, version_set_,
Env::Default(), EnvOptions(), nullptr, blob_file_set_,
nullptr, nullptr, nullptr);
bool discardable = false;
ASSERT_OK(blob_gc_job.DiscardEntry(key, blob_index, &discardable));
......
#include "blob_storage.h"
#include "version_set.h"
#include "blob_file_set.h"
namespace rocksdb {
namespace titandb {
......
......@@ -13,7 +13,7 @@ namespace rocksdb {
namespace titandb {
// Provides methods to access the blob storage for a specific
// column family. The version must be valid when this storage is used.
// column family.
class BlobStorage {
public:
BlobStorage(const BlobStorage& bs) : destroyed_(false) {
......@@ -101,7 +101,7 @@ class BlobStorage {
SequenceNumber obsolete_sequence);
private:
friend class VersionSet;
friend class BlobFileSet;
friend class VersionTest;
friend class BlobGCPickerTest;
friend class BlobGCJobTest;
......
......@@ -27,7 +27,7 @@ class TitanDBImpl::FileManager : public BlobFileManager {
FileManager(TitanDBImpl* db) : db_(db) {}
Status NewFile(std::unique_ptr<BlobFileHandle>* handle) override {
auto number = db_->vset_->NewFileNumber();
auto number = db_->blob_file_set_->NewFileNumber();
auto name = BlobFileName(db_->dirname_, number);
Status s;
......@@ -75,7 +75,7 @@ class TitanDBImpl::FileManager : public BlobFileManager {
{
MutexLock l(&db_->mutex_);
s = db_->vset_->LogAndApply(edit);
s = db_->blob_file_set_->LogAndApply(edit);
if (!s.ok()) {
db_->SetBGError(s);
}
......@@ -220,9 +220,9 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
// Add EventListener to collect statistics for GC
db_options_.listeners.emplace_back(std::make_shared<BaseDbListener>(this));
// Note that info log is initialized after `CreateLoggerFromOptions`,
// so new `VersionSet` here but not in constructor is to get a proper info
// so new `BlobFileSet` here but not in constructor is to get a proper info
// log.
vset_.reset(new VersionSet(db_options_, stats_.get()));
blob_file_set_.reset(new BlobFileSet(db_options_, stats_.get()));
s = DB::Open(db_options_, dbname_, init_descs, handles, &db_);
if (s.ok()) {
......@@ -237,8 +237,8 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
auto& base_table_factory = base_descs[i].options.table_factory;
assert(base_table_factory != nullptr);
auto titan_table_factory = std::make_shared<TitanTableFactory>(
db_options_, descs[i].options, blob_manager_, &mutex_, vset_.get(),
stats_.get());
db_options_, descs[i].options, blob_manager_, &mutex_,
blob_file_set_.get(), stats_.get());
cf_info_.emplace(cf_id,
TitanColumnFamilyInfo(
{cf_name, ImmutableTitanCFOptions(descs[i].options),
......@@ -256,7 +256,7 @@ Status TitanDBImpl::Open(const std::vector<TitanCFDescriptor>& descs,
}
if (!s.ok()) return s;
s = vset_->Open(column_families);
s = blob_file_set_->Open(column_families);
if (!s.ok()) return s;
static bool has_init_background_threads = false;
......@@ -355,7 +355,7 @@ Status TitanDBImpl::CreateColumnFamilies(
// Replaces the provided table factory with TitanTableFactory.
base_table_factory.emplace_back(options.table_factory);
titan_table_factory.emplace_back(std::make_shared<TitanTableFactory>(
db_options_, desc.options, blob_manager_, &mutex_, vset_.get(),
db_options_, desc.options, blob_manager_, &mutex_, blob_file_set_.get(),
stats_.get()));
options.table_factory = titan_table_factory.back();
base_descs.emplace_back(desc.name, options);
......@@ -379,7 +379,7 @@ Status TitanDBImpl::CreateColumnFamilies(
MutableTitanCFOptions(descs[i].options), base_table_factory[i],
titan_table_factory[i]}));
}
vset_->AddColumnFamilies(column_families);
blob_file_set_->AddColumnFamilies(column_families);
}
}
if (s.ok()) {
......@@ -412,7 +412,7 @@ Status TitanDBImpl::DropColumnFamilies(
if (s.ok()) {
MutexLock l(&mutex_);
SequenceNumber obsolete_sequence = db_impl_->GetLatestSequenceNumber();
s = vset_->DropColumnFamilies(column_families, obsolete_sequence);
s = blob_file_set_->DropColumnFamilies(column_families, obsolete_sequence);
}
if (s.ok()) {
ROCKS_LOG_INFO(db_options_.info_log, "Dropped column families: %s",
......@@ -437,8 +437,8 @@ Status TitanDBImpl::DestroyColumnFamilyHandle(
if (s.ok()) {
MutexLock l(&mutex_);
// it just changes some marks and doesn't delete blob files physically.
Status destroy_status = vset_->MaybeDestroyColumnFamily(cf_id);
// VersionSet will return NotFound status if the cf is not destroyed.
Status destroy_status = blob_file_set_->MaybeDestroyColumnFamily(cf_id);
// BlobFileSet will return NotFound status if the cf is not destroyed.
if (destroy_status.ok()) {
assert(cf_info_.count(cf_id) > 0);
cf_info_.erase(cf_id);
......@@ -548,7 +548,7 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
PinnableSlice buffer;
mutex_.Lock();
auto storage = vset_->GetBlobStorage(handle->GetID()).lock();
auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock();
mutex_.Unlock();
{
......@@ -624,7 +624,7 @@ Iterator* TitanDBImpl::NewIteratorImpl(
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
mutex_.Lock();
auto storage = vset_->GetBlobStorage(handle->GetID());
auto storage = blob_file_set_->GetBlobStorage(handle->GetID());
mutex_.Unlock();
std::unique_ptr<ArenaWrappedDBIter> iter(db_impl_->NewIteratorImpl(
......@@ -777,7 +777,7 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
if (!s.ok()) return s;
MutexLock l(&mutex_);
auto bs = vset_->GetBlobStorage(cf_id).lock();
auto bs = blob_file_set_->GetBlobStorage(cf_id).lock();
if (!bs) {
// TODO: Should treat it as background error and make DB read-only.
ROCKS_LOG_ERROR(db_options_.info_log,
......@@ -954,7 +954,8 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
{
MutexLock l(&mutex_);
auto blob_storage = vset_->GetBlobStorage(flush_job_info.cf_id).lock();
auto blob_storage =
blob_file_set_->GetBlobStorage(flush_job_info.cf_id).lock();
if (!blob_storage) {
// TODO: Should treat it as background error and make DB read-only.
ROCKS_LOG_ERROR(db_options_.info_log,
......@@ -1042,7 +1043,7 @@ void TitanDBImpl::OnCompactionCompleted(
{
MutexLock l(&mutex_);
auto bs = vset_->GetBlobStorage(compaction_job_info.cf_id).lock();
auto bs = blob_file_set_->GetBlobStorage(compaction_job_info.cf_id).lock();
if (!bs) {
// TODO: Should treat it as background error and make DB read-only.
ROCKS_LOG_ERROR(db_options_.info_log,
......
......@@ -5,10 +5,10 @@
#include "util/repeatable_thread.h"
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "table_factory.h"
#include "titan/db.h"
#include "titan_stats.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -245,7 +245,7 @@ class TitanDBImpl : public TitanDB {
// handle for dump internal stats at fixed intervals.
std::unique_ptr<RepeatableThread> thread_dump_stats_;
std::unique_ptr<VersionSet> vset_;
std::unique_ptr<BlobFileSet> blob_file_set_;
std::set<uint64_t> pending_outputs_;
std::shared_ptr<BlobFileManager> blob_manager_;
......
......@@ -9,7 +9,7 @@ Status TitanDBImpl::PurgeObsoleteFilesImpl() {
auto oldest_sequence = GetOldestSnapshotSequence();
{
MutexLock l(&mutex_);
vset_->GetObsoleteFiles(&candidate_files, oldest_sequence);
blob_file_set_->GetObsoleteFiles(&candidate_files, oldest_sequence);
}
// dedup state.inputs so we don't try to delete the same
......
......@@ -67,7 +67,7 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
Status s;
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get();
if (bs) {
const auto& cf_options = bs->cf_options();
......@@ -90,8 +90,9 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do");
} else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(), vset_.get(),
log_buffer, &shuting_down_, stats_.get());
env_options_, blob_manager_.get(),
blob_file_set_.get(), log_buffer, &shuting_down_,
stats_.get());
s = blob_gc_job.Prepare();
if (s.ok()) {
mutex_.Unlock();
......@@ -142,7 +143,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;
auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get();
const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options);
......@@ -158,8 +159,9 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
ROCKS_LOG_BUFFER(&log_buffer, "Titan GC nothing to do");
} else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(), vset_.get(),
&log_buffer, &shuting_down_, stats_.get());
env_options_, blob_manager_.get(),
blob_file_set_.get(), &log_buffer, &shuting_down_,
stats_.get());
s = blob_gc_job.Prepare();
if (s.ok()) {
......
......@@ -2,9 +2,9 @@
#include <unordered_map>
#include "blob_file_set.h"
#include "util/string_util.h"
#include "version_edit.h"
#include "version_set.h"
#include <inttypes.h>
......@@ -49,12 +49,12 @@ class EditCollector {
}
// Seal the batch and check the validation of the edits.
Status Seal(VersionSet& vset) {
Status Seal(BlobFileSet& blob_file_set) {
if (!status_.ok()) return status_;
for (auto& cf : column_families_) {
auto cf_id = cf.first;
auto storage = vset.GetBlobStorage(cf_id).lock();
auto storage = blob_file_set.GetBlobStorage(cf_id).lock();
if (!storage) {
// TODO: support OpenForReadOnly which doesn't open DB with all column
// family so there are maybe some invalid column family, but we can't
......@@ -71,7 +71,7 @@ class EditCollector {
}
// Apply the edits of the batch.
Status Apply(VersionSet& vset) {
Status Apply(BlobFileSet& blob_file_set) {
if (!status_.ok()) return status_;
if (!sealed_)
return Status::Incomplete(
......@@ -79,7 +79,7 @@ class EditCollector {
for (auto& cf : column_families_) {
auto cf_id = cf.first;
auto storage = vset.GetBlobStorage(cf_id).lock();
auto storage = blob_file_set.GetBlobStorage(cf_id).lock();
if (!storage) {
// TODO: support OpenForReadOnly which doesn't open DB with all column
// family so there are maybe some invalid column family, but we can't
......
......@@ -2,10 +2,10 @@
#include "blob_file_builder.h"
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "table/table_builder.h"
#include "titan/options.h"
#include "titan_stats.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......
......@@ -5,9 +5,9 @@
#include "blob_file_manager.h"
#include "blob_file_reader.h"
#include "blob_file_set.h"
#include "table_builder.h"
#include "table_factory.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -18,8 +18,10 @@ const uint64_t kTargetBlobFileSize = 4096;
class FileManager : public BlobFileManager {
public:
FileManager(const TitanDBOptions& db_options, VersionSet* vset)
: db_options_(db_options), number_(kTestFileNumber), vset_(vset) {}
FileManager(const TitanDBOptions& db_options, BlobFileSet* blob_file_set)
: db_options_(db_options),
number_(kTestFileNumber),
blob_file_set_(blob_file_set) {}
Status NewFile(std::unique_ptr<BlobFileHandle>* handle) override {
auto number = number_.fetch_add(1);
......@@ -41,7 +43,7 @@ class FileManager : public BlobFileManager {
Status s = handle->GetFile()->Sync(true);
if (s.ok()) {
s = handle->GetFile()->Close();
auto storage = vset_->GetBlobStorage(0).lock();
auto storage = blob_file_set_->GetBlobStorage(0).lock();
storage->AddBlobFile(file);
}
return s;
......@@ -93,7 +95,7 @@ class FileManager : public BlobFileManager {
EnvOptions env_options_;
TitanDBOptions db_options_;
std::atomic<uint64_t> number_{0};
VersionSet* vset_;
BlobFileSet* blob_file_set_;
};
class TableBuilderTest : public testing::Test {
......@@ -106,13 +108,13 @@ class TableBuilderTest : public testing::Test {
blob_name_(BlobFileName(tmpdir_, kTestFileNumber)) {
db_options_.dirname = tmpdir_;
cf_options_.min_blob_size = kMinBlobSize;
vset_.reset(new VersionSet(db_options_, nullptr));
blob_file_set_.reset(new BlobFileSet(db_options_, nullptr));
std::map<uint32_t, TitanCFOptions> cfs{{0, cf_options_}};
vset_->AddColumnFamilies(cfs);
blob_manager_.reset(new FileManager(db_options_, vset_.get()));
blob_file_set_->AddColumnFamilies(cfs);
blob_manager_.reset(new FileManager(db_options_, blob_file_set_.get()));
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_,
blob_manager_, &mutex_,
vset_.get(), nullptr));
blob_file_set_.get(), nullptr));
}
~TableBuilderTest() {
......@@ -205,7 +207,7 @@ class TableBuilderTest : public testing::Test {
std::string blob_name_;
std::unique_ptr<TableFactory> table_factory_;
std::shared_ptr<BlobFileManager> blob_manager_;
std::unique_ptr<VersionSet> vset_;
std::unique_ptr<BlobFileSet> blob_file_set_;
};
TEST_F(TableBuilderTest, Basic) {
......@@ -354,8 +356,9 @@ TEST_F(TableBuilderTest, NumEntries) {
// To test size of each blob file is around blob_file_target_size after building
TEST_F(TableBuilderTest, TargetSize) {
cf_options_.blob_file_target_size = kTargetBlobFileSize;
table_factory_.reset(new TitanTableFactory(
db_options_, cf_options_, blob_manager_, &mutex_, vset_.get(), nullptr));
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_,
blob_manager_, &mutex_,
blob_file_set_.get(), nullptr));
std::unique_ptr<WritableFileWriter> base_file;
NewBaseFileWriter(&base_file);
std::unique_ptr<TableBuilder> table_builder;
......@@ -384,8 +387,9 @@ TEST_F(TableBuilderTest, TargetSize) {
// correct
TEST_F(TableBuilderTest, LevelMerge) {
cf_options_.level_merge = true;
table_factory_.reset(new TitanTableFactory(
db_options_, cf_options_, blob_manager_, &mutex_, vset_.get(), nullptr));
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_,
blob_manager_, &mutex_,
blob_file_set_.get(), nullptr));
std::unique_ptr<WritableFileWriter> base_file;
NewBaseFileWriter(&base_file);
std::unique_ptr<TableBuilder> table_builder;
......@@ -437,7 +441,7 @@ TEST_F(TableBuilderTest, LevelMerge) {
// Compare key, index and blob records after level merge
first_iter->SeekToFirst();
second_iter->SeekToFirst();
auto storage = vset_->GetBlobStorage(0).lock();
auto storage = blob_file_set_->GetBlobStorage(0).lock();
for (unsigned char i = 0; i < n; i++) {
ASSERT_TRUE(first_iter->Valid());
ASSERT_TRUE(second_iter->Valid());
......
......@@ -30,7 +30,7 @@ TableBuilder* TitanTableFactory::NewTableBuilder(
{
MutexLock l(db_mutex_);
blob_storage = vset_->GetBlobStorage(column_family_id);
blob_storage = blob_file_set_->GetBlobStorage(column_family_id);
}
return new TitanTableBuilder(
......
......@@ -3,10 +3,10 @@
#include <atomic>
#include "blob_file_manager.h"
#include "blob_file_set.h"
#include "rocksdb/table.h"
#include "titan/options.h"
#include "titan_stats.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -16,14 +16,15 @@ class TitanTableFactory : public TableFactory {
TitanTableFactory(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
std::shared_ptr<BlobFileManager> blob_manager,
port::Mutex* db_mutex, VersionSet* vset, TitanStats* stats)
port::Mutex* db_mutex, BlobFileSet* blob_file_set,
TitanStats* stats)
: db_options_(db_options),
cf_options_(cf_options),
blob_run_mode_(cf_options.blob_run_mode),
base_factory_(cf_options.table_factory),
blob_manager_(blob_manager),
db_mutex_(db_mutex),
vset_(vset),
blob_file_set_(blob_file_set),
stats_(stats) {}
const char* Name() const override { return "TitanTable"; }
......@@ -67,7 +68,7 @@ class TitanTableFactory : public TableFactory {
std::shared_ptr<TableFactory> base_factory_;
std::shared_ptr<BlobFileManager> blob_manager_;
port::Mutex* db_mutex_;
VersionSet* vset_;
BlobFileSet* blob_file_set_;
TitanStats* stats_;
};
......
......@@ -102,7 +102,7 @@ class TitanDBTest : public testing::Test {
}
Status LogAndApply(VersionEdit& edit) {
return db_impl_->vset_->LogAndApply(edit);
return db_impl_->blob_file_set_->LogAndApply(edit);
}
void Put(uint64_t k, std::map<std::string, std::string>* data = nullptr) {
......@@ -132,7 +132,7 @@ class TitanDBTest : public testing::Test {
cf_handle = db_->DefaultColumnFamily();
}
MutexLock l(&db_impl_->mutex_);
return db_impl_->vset_->GetBlobStorage(cf_handle->GetID());
return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID());
}
void VerifyDB(const std::map<std::string, std::string>& data,
......@@ -924,7 +924,7 @@ TEST_F(TitanDBTest, BackgroundErrorTrigger) {
}
Flush();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
SyncPoint::GetInstance()->SetCallBack("VersionSet::LogAndApply", [&](void*) {
SyncPoint::GetInstance()->SetCallBack("BlobFileSet::LogAndApply", [&](void*) {
mock_env->SetFilesystemActive(false, Status::IOError("Injected error"));
});
SyncPoint::GetInstance()->EnableProcessing();
......
......@@ -43,7 +43,7 @@ class VersionEdit {
friend bool operator==(const VersionEdit& lhs, const VersionEdit& rhs);
private:
friend class VersionSet;
friend class BlobFileSet;
friend class VersionTest;
friend class EditCollector;
......
#include "file/filename.h"
#include "test_util/testharness.h"
#include "blob_file_set.h"
#include "blob_format.h"
#include "edit_collector.h"
#include "testutil.h"
#include "util.h"
#include "version_edit.h"
#include "version_set.h"
namespace rocksdb {
namespace titandb {
......@@ -30,7 +30,7 @@ class VersionTest : public testing::Test {
TitanCFOptions cf_options_;
std::shared_ptr<BlobFileCache> file_cache_;
std::map<uint32_t, std::shared_ptr<BlobStorage>> column_families_;
std::unique_ptr<VersionSet> vset_;
std::unique_ptr<BlobFileSet> blob_file_set_;
port::Mutex mutex_;
std::string dbname_;
Env* env_;
......@@ -50,8 +50,8 @@ class VersionTest : public testing::Test {
DeleteDir(env_, db_options_.dirname);
env_->CreateDirIfMissing(db_options_.dirname);
vset_.reset(new VersionSet(db_options_, nullptr));
ASSERT_OK(vset_->Open({}));
blob_file_set_.reset(new BlobFileSet(db_options_, nullptr));
ASSERT_OK(blob_file_set_->Open({}));
column_families_.clear();
// Sets up some column families.
for (uint32_t id = 0; id < 10; id++) {
......@@ -61,7 +61,7 @@ class VersionTest : public testing::Test {
column_families_.emplace(id, storage);
storage.reset(
new BlobStorage(db_options_, cf_options_, id, file_cache_, nullptr));
vset_->column_families_.emplace(id, storage);
blob_file_set_->column_families_.emplace(id, storage);
}
}
......@@ -85,9 +85,9 @@ class VersionTest : public testing::Test {
for (auto& edit : edits) {
ASSERT_OK(collector.AddEdit(edit));
}
ASSERT_OK(collector.Seal(*vset_.get()));
ASSERT_OK(collector.Apply(*vset_.get()));
for (auto& it : vset_->column_families_) {
ASSERT_OK(collector.Seal(*blob_file_set_.get()));
ASSERT_OK(collector.Apply(*blob_file_set_.get()));
for (auto& it : blob_file_set_->column_families_) {
auto& storage = column_families_[it.first];
// ignore obsolete file
auto size = 0;
......@@ -106,7 +106,7 @@ class VersionTest : public testing::Test {
}
void CheckColumnFamiliesSize(uint64_t size) {
ASSERT_EQ(vset_->column_families_.size(), size);
ASSERT_EQ(blob_file_set_->column_families_.size(), size);
}
void LegacyEncode(const VersionEdit& edit, std::string* dst) {
......@@ -164,8 +164,8 @@ TEST_F(VersionTest, InvalidEdit) {
auto add1_0_4 = AddBlobFilesEdit(1, 0, 4);
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_0_4));
ASSERT_OK(collector.Seal(*vset_.get()));
ASSERT_OK(collector.Apply(*vset_.get()));
ASSERT_OK(collector.Seal(*blob_file_set_.get()));
ASSERT_OK(collector.Apply(*blob_file_set_.get()));
}
// delete nonexistent blobs
......@@ -173,8 +173,8 @@ TEST_F(VersionTest, InvalidEdit) {
auto del1_4_6 = DeleteBlobFilesEdit(1, 4, 6);
EditCollector collector;
ASSERT_OK(collector.AddEdit(del1_4_6));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
ASSERT_NOK(collector.Seal(*blob_file_set_.get()));
ASSERT_NOK(collector.Apply(*blob_file_set_.get()));
}
// add already existing blobs
......@@ -182,8 +182,8 @@ TEST_F(VersionTest, InvalidEdit) {
auto add1_1_3 = AddBlobFilesEdit(1, 1, 3);
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_1_3));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
ASSERT_NOK(collector.Seal(*blob_file_set_.get()));
ASSERT_NOK(collector.Apply(*blob_file_set_.get()));
}
// add same blobs
......@@ -193,8 +193,8 @@ TEST_F(VersionTest, InvalidEdit) {
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_4_5_1));
ASSERT_NOK(collector.AddEdit(add1_4_5_2));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
ASSERT_NOK(collector.Seal(*blob_file_set_.get()));
ASSERT_NOK(collector.Apply(*blob_file_set_.get()));
}
// delete same blobs
......@@ -204,8 +204,8 @@ TEST_F(VersionTest, InvalidEdit) {
EditCollector collector;
ASSERT_OK(collector.AddEdit(del1_3_4_1));
ASSERT_NOK(collector.AddEdit(del1_3_4_2));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
ASSERT_NOK(collector.Seal(*blob_file_set_.get()));
ASSERT_NOK(collector.Apply(*blob_file_set_.get()));
}
}
......@@ -252,35 +252,35 @@ TEST_F(VersionTest, ObsoleteFiles) {
std::map<uint32_t, TitanCFOptions> m;
m.insert({1, TitanCFOptions()});
m.insert({2, TitanCFOptions()});
vset_->AddColumnFamilies(m);
blob_file_set_->AddColumnFamilies(m);
{
auto add1_1_5 = AddBlobFilesEdit(1, 1, 5);
MutexLock l(&mutex_);
vset_->LogAndApply(add1_1_5);
blob_file_set_->LogAndApply(add1_1_5);
}
std::vector<std::string> of;
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
blob_file_set_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 0);
{
auto del1_4_5 = DeleteBlobFilesEdit(1, 4, 5);
MutexLock l(&mutex_);
vset_->LogAndApply(del1_4_5);
blob_file_set_->LogAndApply(del1_4_5);
}
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
blob_file_set_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 1);
std::vector<uint32_t> cfs = {1};
ASSERT_OK(vset_->DropColumnFamilies(cfs, 0));
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_OK(blob_file_set_->DropColumnFamilies(cfs, 0));
blob_file_set_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 1);
CheckColumnFamiliesSize(10);
ASSERT_OK(vset_->MaybeDestroyColumnFamily(1));
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_OK(blob_file_set_->MaybeDestroyColumnFamily(1));
blob_file_set_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 4);
CheckColumnFamiliesSize(9);
ASSERT_OK(vset_->MaybeDestroyColumnFamily(2));
ASSERT_OK(blob_file_set_->MaybeDestroyColumnFamily(2));
CheckColumnFamiliesSize(8);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment