Commit 05a37557 authored by Connor's avatar Connor Committed by yiwu-arbug

Delete blob files only after destroy column handle (#6)

As Rocksdb described, `DropColumnFamilies()` only records the drop of the column family specified by ColumnFamilyHandle. The actual data is not deleted until the client calls `delete column_family`, namely `DestroyColumnFamilyHandle()`. We can still continue using the column family if we have outstanding ColumnFamilyHandle pointer.	

now there are some problems:

- after dropping column family, the blob files of it may be deleted before column family handle is destroyed.
- When opening DB without some existing column families(not supported yet), the blob file of these will be regarded as obsolete files and deleted. 
- After destroyed column handle(not drop the column, just not need to read/write it anymore), we need to remove inner blob storage and evict related cache.
parent b552d16f
......@@ -72,9 +72,11 @@ class TitanDB : public StackableDB {
return DropColumnFamilies({handle});
}
virtual Status DropColumnFamilies(
Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) override = 0;
Status DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) override = 0;
using StackableDB::Merge;
Status Merge(const WriteOptions&, ColumnFamilyHandle*, const Slice& /*key*/,
const Slice& /*value*/) override {
......@@ -82,14 +84,14 @@ class TitanDB : public StackableDB {
}
using rocksdb::StackableDB::SingleDelete;
virtual Status SingleDelete(const WriteOptions& /*wopts*/,
Status SingleDelete(const WriteOptions& /*wopts*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/) override {
return Status::NotSupported("Not supported operation in titan db.");
}
using rocksdb::StackableDB::CompactFiles;
virtual Status CompactFiles(
Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names, const int output_level,
......
......@@ -22,7 +22,7 @@ class BlobGCPickerTest : public testing::Test {
const TitanCFOptions& titan_cf_options) {
auto blob_file_cache = std::make_shared<BlobFileCache>(
titan_db_options, titan_cf_options, NewLRUCache(128));
blob_storage_.reset(new BlobStorage(titan_cf_options, blob_file_cache));
blob_storage_.reset(new BlobStorage(titan_db_options, titan_cf_options, blob_file_cache));
basic_blob_gc_picker_.reset(new BasicBlobGCPicker(titan_db_options, titan_cf_options));
}
......
......@@ -47,12 +47,40 @@ void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
files_.emplace(std::make_pair(file->file_number(), file));
}
void BlobStorage::DeleteBlobFile(uint64_t file) {
{
void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, SequenceNumber obsolete_sequence) {
WriteLock wl(&mutex_);
files_.erase(file);
obsolete_files_.push_back(std::make_pair(file->file_number(), obsolete_sequence));
file->FileStateTransit(BlobFileMeta::FileEvent::kDelete);
}
void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence) {
WriteLock wl(&mutex_);
for (auto it = obsolete_files_.begin(); it != obsolete_files_.end();) {
auto& file_number = it->first;
auto& obsolete_sequence = it->second;
// We check whether the oldest snapshot is no less than the last sequence
// by the time the blob file become obsolete. If so, the blob file is not
// visible to all existing snapshots.
if (oldest_sequence > obsolete_sequence) {
// remove obsolete files
files_.erase(file_number);
file_cache_->Evict(file_number);
ROCKS_LOG_INFO(db_options_.info_log,
"Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
") not visible to oldest snapshot %" PRIu64 ", delete it.",
file_number, obsolete_sequence, oldest_sequence);
if (obsolete_files) {
obsolete_files->emplace_back(
BlobFileName(db_options_.dirname, file_number));
}
it = obsolete_files_.erase(it);
continue;
}
++it;
}
file_cache_->Evict(file);
}
void BlobStorage::ComputeGCScore() {
......@@ -69,7 +97,7 @@ void BlobStorage::ComputeGCScore() {
auto& gcs = gc_score_.back();
gcs.file_number = file.first;
if (file.second->file_size() <
titan_cf_options_.merge_small_file_threshold) {
cf_options_.merge_small_file_threshold) {
gcs.score = 1;
} else {
gcs.score = file.second->GetDiscardableRatio();
......
#pragma once
#include <inttypes.h>
#include "rocksdb/options.h"
#include "blob_file_cache.h"
#include "blob_format.h"
......@@ -12,15 +14,22 @@ namespace titandb {
// column family. The version must be valid when this storage is used.
class BlobStorage {
public:
BlobStorage(const BlobStorage& bs) : mutex_() {
BlobStorage(const BlobStorage& bs) : destroyed_(false) {
this->files_ = bs.files_;
this->file_cache_ = bs.file_cache_;
this->titan_cf_options_ = bs.titan_cf_options_;
this->db_options_ = bs.db_options_;
this->cf_options_ = bs.cf_options_;
}
BlobStorage(const TitanCFOptions& _options,
BlobStorage(const TitanDBOptions& _db_options, const TitanCFOptions& _cf_options,
std::shared_ptr<BlobFileCache> _file_cache)
: titan_cf_options_(_options), mutex_(), file_cache_(_file_cache) {}
: db_options_(_db_options), cf_options_(_cf_options), file_cache_(_file_cache), destroyed_(false) {}
~BlobStorage() {
for (auto& file: files_) {
file_cache_->Evict(file.second->file_number());
}
}
// Gets the blob record pointed by the blob index. The provided
// buffer is used to store the record data, so the buffer must be
......@@ -48,19 +57,30 @@ class BlobStorage {
WriteLock wl(&mutex_);
for (auto& file : files_) {
file.second->FileStateTransit(BlobFileMeta::FileEvent::kDbRestart);
// file.second->marked_for_gc_ = true;
}
}
void MarkDestroyed() {
WriteLock wl(&mutex_);
destroyed_ = true;
}
bool MaybeRemove() const {
ReadLock rl(&mutex_);
return destroyed_ && obsolete_files_.empty();
}
const std::vector<GCScore> gc_score() { return gc_score_; }
void ComputeGCScore();
const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }
const TitanCFOptions& cf_options() { return cf_options_; }
void AddBlobFile(std::shared_ptr<BlobFileMeta>& file);
void DeleteBlobFile(uint64_t file);
void GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence);
void MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, SequenceNumber obsolete_sequence);
private:
friend class VersionSet;
......@@ -69,7 +89,8 @@ class BlobStorage {
friend class BlobGCJobTest;
friend class BlobFileSizeCollectorTest;
TitanCFOptions titan_cf_options_;
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
// Read Write Mutex, which protects the `files_` structures
mutable port::RWMutex mutex_;
......@@ -79,6 +100,11 @@ class BlobStorage {
std::shared_ptr<BlobFileCache> file_cache_;
std::vector<GCScore> gc_score_;
std::list<std::pair<uint64_t, SequenceNumber>> obsolete_files_;
// It is marked when the column family handle is destroyed, indicating the
// in-memory data structure can be destroyed. Physical files may still be kept.
bool destroyed_;
};
} // namespace titandb
......
......@@ -270,15 +270,15 @@ Status TitanDBImpl::CreateColumnFamilies(
base_descs.emplace_back(desc.name, options);
}
MutexLock l(&mutex_);
Status s = db_impl_->CreateColumnFamilies(base_descs, handles);
assert(handles->size() == descs.size());
if (s.ok()) {
std::map<uint32_t, TitanCFOptions> column_families;
for (size_t i = 0; i < descs.size(); i++) {
column_families.emplace((*handles)[i]->GetID(), descs[i].options);
}
MutexLock l(&mutex_);
vset_->AddColumnFamilies(column_families);
}
return s;
......@@ -287,24 +287,26 @@ Status TitanDBImpl::CreateColumnFamilies(
Status TitanDBImpl::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) {
std::vector<uint32_t> column_families;
std::vector<ColumnFamilyData*> cfds;
for (auto& handle : handles) {
column_families.push_back(handle->GetID());
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
cfds.push_back(cfd);
for (auto& handle: handles) {
column_families.emplace_back(handle->GetID());
}
MutexLock l(&mutex_);
// TODO:
// As rocksdb described, `DropColumnFamilies()` only records the drop of the column family specified by ColumnFamilyHandle.
// The actual data is not deleted until the client calls `delete column_family`, namely `DestroyColumnFamilyHandle()`.
// We can still continue using the column family if we have outstanding ColumnFamilyHandle pointer.
// So we should delete blob files in `DestroyColumnFamilyHandle()` but not here.
Status s = db_impl_->DropColumnFamilies(handles);
if (s.ok()) {
MutexLock l(&mutex_);
SequenceNumber obsolete_sequence = db_impl_->GetLatestSequenceNumber();
vset_->DropColumnFamilies(column_families, obsolete_sequence);
s = vset_->DropColumnFamilies(column_families, obsolete_sequence);
}
return s;
}
Status TitanDBImpl::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
auto cf_id = column_family->GetID();
Status s = db_impl_->DestroyColumnFamilyHandle(column_family);
if (s.ok()) {
MutexLock l(&mutex_);
// it just changes some marks and doesn't delete blob files physically.
vset_->DestroyColumnFamily(cf_id);
}
return s;
}
......
......@@ -28,6 +28,8 @@ class TitanDBImpl : public TitanDB {
Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) override;
Status DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) override;
using TitanDB::CompactFiles;
Status CompactFiles(
const CompactionOptions& compact_options,
......
......@@ -5,21 +5,11 @@ namespace titandb {
void TitanDBImpl::PurgeObsoleteFiles() {
Status s;
ObsoleteFiles obsolete_files;
std::vector<std::string> candidate_files;
auto oldest_sequence = GetOldestSnapshotSequence();
{
MutexLock l(&mutex_);
vset_->GetObsoleteFiles(&obsolete_files, oldest_sequence);
}
{
std::vector<std::string> candidate_files;
for (auto& blob_file : obsolete_files.blob_files) {
candidate_files.emplace_back(
BlobFileName(db_options_.dirname, std::get<0>(blob_file)));
}
for (auto& manifest : obsolete_files.manifests) {
candidate_files.emplace_back(std::move(manifest));
vset_->GetObsoleteFiles(&candidate_files, oldest_sequence);
}
// dedup state.inputs so we don't try to delete the same
......@@ -39,7 +29,6 @@ void TitanDBImpl::PurgeObsoleteFiles() {
abort();
}
}
}
}
} // namespace titandb
......
......@@ -69,9 +69,9 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
uint32_t column_family_id = PopFirstFromGCQueue();
auto bs = vset_->GetBlobStorage(column_family_id).lock().get();
const auto& titan_cf_options = bs->titan_cf_options();
const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, titan_cf_options);
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options);
blob_gc = blob_gc_picker->PickBlobGC(bs);
if (blob_gc) {
......
......@@ -53,6 +53,7 @@ class TitanDBTest : public testing::Test {
}
cf_handles_.clear();
ASSERT_OK(TitanDB::Open(db_options, dbname_, descs, &cf_handles_, &db_));
db_impl_ = reinterpret_cast<TitanDBImpl*>(db_);
}
}
......@@ -120,6 +121,8 @@ class TitanDBTest : public testing::Test {
}
void VerifyDB(const std::map<std::string, std::string>& data, ReadOptions ropts = ReadOptions()) {
db_impl_->PurgeObsoleteFiles();
for (auto& kv : data) {
std::string value;
ASSERT_OK(db_->Get(ropts, kv.first, &value));
......@@ -378,7 +381,7 @@ TEST_F(TitanDBTest, IngestExternalFiles) {
}
}
TEST_F(TitanDBTest, ReadAfterDropCF) {
TEST_F(TitanDBTest, DropColumnFamily) {
Open();
const uint64_t kNumCF = 3;
for(uint64_t i = 1; i <= kNumCF; i++) {
......@@ -392,10 +395,28 @@ TEST_F(TitanDBTest, ReadAfterDropCF) {
VerifyDB(data);
Flush();
VerifyDB(data);
// Destroy column families handle, check whether the data is preserved after a round of GC and restart.
for (auto& handle : cf_handles_) {
db_->DestroyColumnFamilyHandle(handle);
}
cf_handles_.clear();
VerifyDB(data);
Reopen();
VerifyDB(data);
for(auto& handle : cf_handles_) {
// we can't drop default column family
if (handle->GetName() == kDefaultColumnFamilyName) {
continue;
}
ASSERT_OK(db_->DropColumnFamily(handle));
// The data is actually deleted only after destroying all outstanding column family handles,
// so we can still read from the dropped column family.
VerifyDB(data);
}
Close();
}
#ifndef NDEBUG
......
......@@ -24,6 +24,7 @@ void VersionEdit::EncodeTo(std::string* dst) const {
file->EncodeTo(dst);
}
for (auto& file : deleted_files_) {
// obsolete sequence is a inpersistent field, so no need to encode it.
PutVarint32Varint64(dst, kDeletedBlobFile, file.first);
}
}
......
......@@ -2,7 +2,6 @@
#include <inttypes.h>
#include "util/autovector.h"
#include "util/filename.h"
namespace rocksdb {
......@@ -80,7 +79,8 @@ Status VersionSet::Recover() {
VersionEdit edit;
s = DecodeInto(record, &edit);
if (!s.ok()) return s;
Apply(&edit);
s = Apply(&edit);
if (!s.ok()) return s;
if (edit.has_next_file_number_) {
assert(edit.next_file_number_ >= next_file_number);
next_file_number = edit.next_file_number_;
......@@ -102,25 +102,13 @@ Status VersionSet::Recover() {
std::set<uint64_t> alive_files;
alive_files.insert(new_manifest_file_number);
for (const auto& bs : column_families_) {
autovector<uint64_t> obsolete_files;
// delete obsoleted files at reopen
// all the obsolete files's obsolete sequence are 0
bs.second->GetObsoleteFiles(nullptr, kMaxSequenceNumber);
for (const auto& f : bs.second->files_) {
if (f.second->is_obsolete()) {
// delete already obsoleted files at reopen
obsolete_files.push_back(f.second->file_number());
for (auto it = obsolete_files_.blob_files.begin(); it != obsolete_files_.blob_files.end(); ++it) {
if (std::get<0>(*it) == f.second->file_number()) {
it = this->obsolete_files_.blob_files.erase(it);
break;
}
}
} else {
alive_files.insert(f.second->file_number());
}
}
for (uint64_t obsolete_file : obsolete_files) {
bs.second->DeleteBlobFile(obsolete_file);
}
}
std::vector<std::string> files;
env_->GetChildren(dirname_, &files);
for (const auto& f : files) {
......@@ -168,7 +156,7 @@ Status VersionSet::OpenManifest(uint64_t file_number) {
if (!s.ok()) {
manifest_.reset();
obsolete_files_.manifests.emplace_back(file_name);
obsolete_manifests_.emplace_back(file_name);
}
return s;
}
......@@ -215,16 +203,18 @@ Status VersionSet::LogAndApply(VersionEdit* edit) {
}
if (!s.ok()) return s;
Apply(edit);
return s;
return Apply(edit);
}
void VersionSet::Apply(VersionEdit* edit) {
Status VersionSet::Apply(VersionEdit* edit) {
auto cf_id = edit->column_family_id_;
auto it = column_families_.find(cf_id);
if (it == column_families_.end()) {
// Ignore unknown column families.
return;
// TODO: support OpenForReadOnly which doesn't open DB with all column family
// so there are maybe some invalid column family, but we can't just skip it
// otherwise blob files of the non-open column families will be regarded as
// obsolete and deleted.
return Status::OK();
}
auto& files = it->second->files_;
......@@ -238,7 +228,7 @@ void VersionSet::Apply(VersionEdit* edit) {
fprintf(stderr, "blob file %" PRIu64 " has been deleted before\n", number);
abort();
}
MarkFileObsolete(blob_it->second, file.second, cf_id);
it->second->MarkFileObsolete(blob_it->second, file.second);
}
for (auto& file : edit->added_files_) {
......@@ -256,20 +246,22 @@ void VersionSet::Apply(VersionEdit* edit) {
}
it->second->ComputeGCScore();
return Status::OK();
}
void VersionSet::AddColumnFamilies(const std::map<uint32_t, TitanCFOptions>& column_families) {
for (auto& cf : column_families) {
auto file_cache =
std::make_shared<BlobFileCache>(db_options_, cf.second, file_cache_);
auto blob_storage = std::make_shared<BlobStorage>(cf.second, file_cache);
auto blob_storage = std::make_shared<BlobStorage>(db_options_, cf.second, file_cache);
column_families_.emplace(cf.first, blob_storage);
}
}
void VersionSet::DropColumnFamilies(const std::vector<uint32_t>& column_families, SequenceNumber obsolete_sequence) {
for (auto& cf : column_families) {
auto it = column_families_.find(cf);
Status VersionSet::DropColumnFamilies(const std::vector<uint32_t>& column_families, SequenceNumber obsolete_sequence) {
Status s;
for (auto& cf_id : column_families) {
auto it = column_families_.find(cf_id);
if (it != column_families_.end()) {
VersionEdit edit;
edit.SetColumnFamilyID(it->first);
......@@ -278,51 +270,56 @@ void VersionSet::DropColumnFamilies(const std::vector<uint32_t>& column_families
file.second->file_number());
edit.DeleteBlobFile(file.first, obsolete_sequence);
}
// TODO: check status
LogAndApply(&edit);
s = LogAndApply(&edit);
if (!s.ok()) return s;
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
"column %u not found for drop\n", cf_id);
return Status::NotFound("invalid column family");
}
obsolete_columns_.insert(cf);
obsolete_columns_.insert(cf_id);
}
return s;
}
void VersionSet::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, SequenceNumber obsolete_sequence, uint32_t cf_id) {
obsolete_files_.blob_files.push_back(std::make_tuple(file->file_number(), obsolete_sequence, cf_id));
file->FileStateTransit(BlobFileMeta::FileEvent::kDelete);
}
void VersionSet::GetObsoleteFiles(ObsoleteFiles* obsolete_files, SequenceNumber oldest_sequence) {
for (auto tuple_it = obsolete_files_.blob_files.begin(); tuple_it != obsolete_files_.blob_files.end();) {
auto& obsolete_sequence = std::get<1>(*tuple_it);
// We check whether the oldest snapshot is no less than the last sequence
// by the time the blob file become obsolete. If so, the blob file is not
// visible to all existing snapshots.
if (oldest_sequence > obsolete_sequence) {
auto& file_number = std::get<0>(*tuple_it);
auto& cf_id = std::get<2>(*tuple_it);
ROCKS_LOG_INFO(db_options_.info_log,
"Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
") not visible to oldest snapshot %" PRIu64 ", delete it.",
file_number, obsolete_sequence, oldest_sequence);
// Cleanup obsolete column family when all the blob files for that are deleted.
Status VersionSet::DestroyColumnFamily(uint32_t cf_id) {
obsolete_columns_.erase(cf_id);
auto it = column_families_.find(cf_id);
if (it != column_families_.end()) {
it->second->DeleteBlobFile(file_number);
if (it->second->files_.empty() && obsolete_columns_.find(cf_id) != obsolete_columns_.end()) {
it->second->MarkDestroyed();
if (it->second->MaybeRemove()) {
column_families_.erase(it);
obsolete_columns_.erase(cf_id);
}
} else {
fprintf(stderr, "column %u not found when deleting obsolete file%" PRIu64 "\n",
cf_id, file_number);
abort();
return Status::OK();
}
auto now = tuple_it++;
obsolete_files->blob_files.splice(obsolete_files->blob_files.end(), obsolete_files_.blob_files, now);
} else {
++tuple_it;
ROCKS_LOG_ERROR(db_options_.info_log,
"column %u not found for destroy\n", cf_id);
return Status::NotFound("invalid column family");
}
void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence) {
for (auto it = column_families_.begin(); it != column_families_.end();) {
auto& cf_id = it->first;
auto& blob_storage = it->second;
// In the case of dropping column family, obsolete blob files can be deleted only
// after the column family handle is destroyed.
if (obsolete_columns_.find(cf_id) != obsolete_columns_.end()) {
++it;
continue;
}
blob_storage->GetObsoleteFiles(obsolete_files, oldest_sequence);
// Cleanup obsolete column family when all the blob files for that are deleted.
if (blob_storage->MaybeRemove()) {
it = column_families_.erase(it);
continue;
}
++it;
}
obsolete_files_.manifests.swap(obsolete_files->manifests);
obsolete_files->insert(obsolete_files->end(), obsolete_manifests_.begin(), obsolete_manifests_.end());
obsolete_manifests_.clear();
}
} // namespace titandb
......
......@@ -19,20 +19,6 @@
namespace rocksdb {
namespace titandb {
struct ObsoleteFiles {
ObsoleteFiles() = default;
ObsoleteFiles(const ObsoleteFiles&) = delete;
ObsoleteFiles& operator=(const ObsoleteFiles&) = delete;
ObsoleteFiles(ObsoleteFiles&&) = delete;
ObsoleteFiles& operator=(ObsoleteFiles&&) = delete;
// TODO: make it map
// file_number -> (obsolete_sequence, cf_id)
std::list<std::tuple<uint64_t, SequenceNumber, uint32_t>> blob_files;
std::vector<std::string> manifests;
};
class VersionSet {
public:
explicit VersionSet(const TitanDBOptions& options);
......@@ -57,7 +43,12 @@ class VersionSet {
// Drops some column families. The obsolete files will be deleted in
// background when they will not be accessed anymore.
// REQUIRES: mutex is held
void DropColumnFamilies(const std::vector<uint32_t>& column_families, SequenceNumber obsolete_sequence);
Status DropColumnFamilies(const std::vector<uint32_t>& handles, SequenceNumber obsolete_sequence);
// Destroy the column family. Only after this is called, the obsolete files
// of the dropped column family can be physical deleted.
// REQUIRES: mutex is held
Status DestroyColumnFamily(uint32_t cf_id);
// Allocates a new file number.
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
......@@ -72,7 +63,7 @@ class VersionSet {
}
// REQUIRES: mutex is held
void GetObsoleteFiles(ObsoleteFiles* obsolete_files, SequenceNumber oldest_sequence);
void GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence);
// REQUIRES: mutex is held
void MarkAllFilesForGC() {
......@@ -90,9 +81,7 @@ class VersionSet {
Status WriteSnapshot(log::Writer* log);
void Apply(VersionEdit* edit);
void MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, SequenceNumber obsolete_sequence, uint32_t cf_id);
Status Apply(VersionEdit* edit);
std::string dirname_;
Env* env_;
......@@ -100,7 +89,12 @@ class VersionSet {
TitanDBOptions db_options_;
std::shared_ptr<Cache> file_cache_;
ObsoleteFiles obsolete_files_;
std::vector<std::string> obsolete_manifests_;
// As rocksdb described, `DropColumnFamilies()` only records the drop of the column family specified by ColumnFamilyHandle.
// The actual data is not deleted until the client calls `delete column_family`, namely `DestroyColumnFamilyHandle()`.
// We can still continue using the column family if we have outstanding ColumnFamilyHandle pointer.
// So here record the dropped column family but the handler is not destroyed.
std::unordered_set<uint32_t> obsolete_columns_;
std::unordered_map<uint32_t, std::shared_ptr<BlobStorage>> column_families_;
......
......@@ -18,7 +18,7 @@ void DeleteDir(Env* env, const std::string& dirname) {
ASSERT_OK(env->DeleteFile(dirname + "/" + fname));
}
}
env->DeleteDir(dirname);
ASSERT_OK(env->DeleteDir(dirname));
}
class VersionTest : public testing::Test {
......@@ -43,16 +43,18 @@ class VersionTest : public testing::Test {
}
void Reset() {
DeleteDir(env_, dbname_);
DeleteDir(env_, db_options_.dirname);
env_->CreateDirIfMissing(db_options_.dirname);
vset_.reset(new VersionSet(db_options_));
ASSERT_OK(vset_->Open({}));
column_families_.clear();
// Sets up some column families.
for (uint32_t id = 0; id < 10; id++) {
std::shared_ptr<BlobStorage> storage;
storage.reset(new BlobStorage(cf_options_, file_cache_));
storage.reset(new BlobStorage(db_options_, cf_options_, file_cache_));
column_families_.emplace(id, storage);
storage.reset(new BlobStorage(cf_options_, file_cache_));
storage.reset(new BlobStorage(db_options_, cf_options_, file_cache_));
vset_->column_families_.emplace(id, storage);
}
}
......@@ -74,7 +76,7 @@ class VersionTest : public testing::Test {
void BuildAndCheck(std::vector<VersionEdit> edits) {
for (auto& edit : edits) {
vset_->Apply(&edit);
ASSERT_OK(vset_->Apply(&edit));
}
for (auto& it : vset_->column_families_) {
auto& storage = column_families_[it.first];
......@@ -93,6 +95,10 @@ class VersionTest : public testing::Test {
}
}
}
void CheckColumnFamiliesSize(uint64_t size) {
ASSERT_EQ(vset_->column_families_.size(), size);
}
};
TEST_F(VersionTest, VersionEdit) {
......@@ -168,24 +174,40 @@ TEST_F(VersionTest, VersionBuilder) {
}
TEST_F(VersionTest, ObsoleteFiles) {
CheckColumnFamiliesSize(10);
std::map<uint32_t, TitanCFOptions> m;
m.insert({1, TitanCFOptions()});
m.insert({2, TitanCFOptions()});
vset_->AddColumnFamilies(m);
{
auto add1_0_4 = AddBlobFilesEdit(1, 0, 4);
auto add1_1_5 = AddBlobFilesEdit(1, 1, 5);
MutexLock l(&mutex_);
vset_->LogAndApply(&add1_0_4);
vset_->LogAndApply(&add1_1_5);
}
ObsoleteFiles of;
std::vector<std::string> of;
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.blob_files.size(), 0);
ASSERT_EQ(of.size(), 0);
{
auto del1_3_4 = DeleteBlobFilesEdit(1, 3, 4);
auto del1_4_5 = DeleteBlobFilesEdit(1, 4, 5);
MutexLock l(&mutex_);
vset_->LogAndApply(&del1_3_4);
vset_->LogAndApply(&del1_4_5);
}
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.blob_files.size(), 1);
ASSERT_EQ(of.size(), 1);
std::vector<uint32_t> cfs = {1};
ASSERT_OK(vset_->DropColumnFamilies(cfs, 0));
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 1);
CheckColumnFamiliesSize(10);
ASSERT_OK(vset_->DestroyColumnFamily(1));
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 4);
CheckColumnFamiliesSize(9);
ASSERT_OK(vset_->DestroyColumnFamily(2));
CheckColumnFamiliesSize(8);
}
} // namespace titandb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment