Unverified Commit bc358745 authored by Myth's avatar Myth Committed by GitHub

Add checkpoint support (#207)

* Add checkpoint support
Signed-off-by: 's avatarMyth <caipengbo@outlook.com>
parent 1098aac8
...@@ -103,6 +103,7 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release")) ...@@ -103,6 +103,7 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
table_builder_test table_builder_test
thread_safety_test thread_safety_test
titan_db_test titan_db_test
titan_checkpoint_test
titan_options_test titan_options_test
util_test util_test
compaction_filter_test compaction_filter_test
......
#pragma once
#include "db.h"
namespace rocksdb {
namespace titandb {
class Checkpoint {
public:
// Creates a Checkpoint object to be used for creating openable snapshots
static Status Create(TitanDB* db, Checkpoint** checkpoint_ptr);
// Builds an openable snapshot of TitanDB.
// base_checkpoint_dir: checkpoint directory of base DB
// titan_checkpoint_dir: checkpoint directory of TitanDB, if not specified,
// default value is {base_checkpoint_dir}/titandb.
// The specified directory should contain absolute path and not exist, it
// will be created by the API.
// When a checkpoint is created:
// (1) SST and blob files are hard linked if the output directory is on the
// same filesystem as the database, and copied otherwise.
// (2) MANIFEST file specific to TitanDB will be regenerated based on all
// existing blob files.
// (3) other required files are always copied.
// log_size_for_flush: if the total log file size is equal or larger than
// this value, then a flush is triggered for all the column families. The
// default value is 0, which means flush is always triggered. If you move
// away from the default, the checkpoint may not contain up-to-date data
// if WAL writing is not always enabled.
// Flush will always trigger if it is 2PC.
virtual Status CreateCheckpoint(const std::string& base_checkpoint_dir,
const std::string& titan_checkpoint_dir = "",
uint64_t log_size_for_flush = 0);
virtual ~Checkpoint() {}
};
} // namespace titandb
} // namespace rocksdb
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
class VersionEdit;
struct TitanCFDescriptor { struct TitanCFDescriptor {
std::string name; std::string name;
TitanCFOptions options; TitanCFOptions options;
...@@ -108,6 +110,23 @@ class TitanDB : public StackableDB { ...@@ -108,6 +110,23 @@ class TitanDB : public StackableDB {
return Status::NotSupported("TitanDB doesn't support this operation"); return Status::NotSupported("TitanDB doesn't support this operation");
} }
using StackableDB::DisableFileDeletions;
Status DisableFileDeletions() override {
return Status::NotSupported("TitanDB doesn't support this operation");
}
using StackableDB::EnableFileDeletions;
Status EnableFileDeletions(bool /*force*/) override {
return Status::NotSupported("TitanDB doesn't support this operation");
}
// Get all files in /titandb directory after disable file deletions
// edits include all blob file records of every column family
virtual Status GetAllTitanFiles(std::vector<std::string>& /*files*/,
std::vector<VersionEdit>* /*edits*/) {
return Status::NotSupported("TitanDB doesn't support this operation");
}
using rocksdb::StackableDB::SingleDelete; using rocksdb::StackableDB::SingleDelete;
Status SingleDelete(const WriteOptions& /*wopts*/, Status SingleDelete(const WriteOptions& /*wopts*/,
ColumnFamilyHandle* /*column_family*/, ColumnFamilyHandle* /*column_family*/,
......
...@@ -153,13 +153,16 @@ Status BlobFileSet::OpenManifest(uint64_t file_number) { ...@@ -153,13 +153,16 @@ Status BlobFileSet::OpenManifest(uint64_t file_number) {
ImmutableDBOptions ioptions(db_options_); ImmutableDBOptions ioptions(db_options_);
s = SyncTitanManifest(env_, stats_, &ioptions, manifest_->file()); s = SyncTitanManifest(env_, stats_, &ioptions, manifest_->file());
} }
uint64_t old_manifest_file_number = manifest_file_number_;
if (s.ok()) { if (s.ok()) {
// Makes "CURRENT" file that points to the new manifest file. // Makes "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dirname_, file_number, nullptr); s = SetCurrentFile(env_, dirname_, file_number, nullptr);
manifest_file_number_ = file_number;
} }
if (!s.ok()) { if (!s.ok()) {
manifest_.reset(); manifest_.reset();
manifest_file_number_ = old_manifest_file_number;
obsolete_manifests_.emplace_back(file_name); obsolete_manifests_.emplace_back(file_name);
} }
return s; return s;
...@@ -326,5 +329,45 @@ void BlobFileSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files, ...@@ -326,5 +329,45 @@ void BlobFileSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
obsolete_manifests_.clear(); obsolete_manifests_.clear();
} }
void BlobFileSet::GetAllFiles(std::vector<std::string>* files,
std::vector<VersionEdit>* edits) {
std::vector<std::string> all_blob_files;
edits->clear();
edits->reserve(column_families_.size());
// Saves global information
{
VersionEdit edit;
edit.SetNextFileNumber(next_file_number_.load());
std::string record;
edit.EncodeTo(&record);
edits->emplace_back(edit);
}
// Saves all blob files
for (auto& cf : column_families_) {
VersionEdit edit;
edit.SetColumnFamilyID(cf.first);
auto& blob_storage = cf.second;
blob_storage->GetAllFiles(&all_blob_files);
for (auto& file : blob_storage->files_) {
edit.AddBlobFile(file.second);
}
edits->emplace_back(edit);
}
files->clear();
files->reserve(all_blob_files.size() + 2);
for (auto& live_file : all_blob_files) {
files->emplace_back(live_file);
}
// Append current MANIFEST and CURRENT file name
files->emplace_back(DescriptorFileName("", manifest_file_number_));
files->emplace_back(CurrentFileName(""));
}
} // namespace titandb } // namespace titandb
} // namespace rocksdb } // namespace rocksdb
...@@ -82,6 +82,10 @@ class BlobFileSet { ...@@ -82,6 +82,10 @@ class BlobFileSet {
void GetObsoleteFiles(std::vector<std::string>* obsolete_files, void GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence); SequenceNumber oldest_sequence);
// REQUIRES: mutex is held
void GetAllFiles(std::vector<std::string>* files,
std::vector<VersionEdit>* edits);
// REQUIRES: mutex is held // REQUIRES: mutex is held
bool IsColumnFamilyObsolete(uint32_t cf_id) { bool IsColumnFamilyObsolete(uint32_t cf_id) {
return obsolete_columns_.count(cf_id) > 0; return obsolete_columns_.count(cf_id) > 0;
...@@ -118,6 +122,7 @@ class BlobFileSet { ...@@ -118,6 +122,7 @@ class BlobFileSet {
std::unordered_map<uint32_t, std::shared_ptr<BlobStorage>> column_families_; std::unordered_map<uint32_t, std::shared_ptr<BlobStorage>> column_families_;
std::unique_ptr<log::Writer> manifest_; std::unique_ptr<log::Writer> manifest_;
std::atomic<uint64_t> next_file_number_{1}; std::atomic<uint64_t> next_file_number_{1};
uint64_t manifest_file_number_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -187,6 +187,16 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files, ...@@ -187,6 +187,16 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
} }
} }
void BlobStorage::GetAllFiles(std::vector<std::string>* files) {
MutexLock l(&mutex_);
for (auto& file : files_) {
uint64_t file_number = file.first;
// relative to dirname
files->emplace_back(BlobFileName("", file_number));
}
}
void BlobStorage::ComputeGCScore() { void BlobStorage::ComputeGCScore() {
// TODO: no need to recompute all everytime // TODO: no need to recompute all everytime
MutexLock l(&mutex_); MutexLock l(&mutex_);
......
...@@ -104,6 +104,9 @@ class BlobStorage { ...@@ -104,6 +104,9 @@ class BlobStorage {
void GetObsoleteFiles(std::vector<std::string>* obsolete_files, void GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence); SequenceNumber oldest_sequence);
// Gets all files (start with '/titandb' prefix), including obsolete files.
void GetAllFiles(std::vector<std::string>* files);
// Mark the file as obsolete, and retrun value indicates whether the file is // Mark the file as obsolete, and retrun value indicates whether the file is
// founded. // founded.
bool MarkFileObsolete(uint64_t file_number, SequenceNumber obsolete_sequence); bool MarkFileObsolete(uint64_t file_number, SequenceNumber obsolete_sequence);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "titan_build_version.h" #include "titan_build_version.h"
#include "titan_stats.h" #include "titan_stats.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/mutexlock.h"
#include "util/threadpool_imp.h" #include "util/threadpool_imp.h"
namespace rocksdb { namespace rocksdb {
...@@ -765,6 +766,64 @@ void TitanDBImpl::ReleaseSnapshot(const Snapshot* snapshot) { ...@@ -765,6 +766,64 @@ void TitanDBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
db_->ReleaseSnapshot(snapshot); db_->ReleaseSnapshot(snapshot);
} }
Status TitanDBImpl::DisableFileDeletions() {
// Disable base DB file deletions.
Status s = db_impl_->DisableFileDeletions();
if (!s.ok()) {
return s;
}
int count = 0;
{
// Hold delete_titandb_file_mutex_ to make sure no
// PurgeObsoleteFiles job is running.
MutexLock l(&delete_titandb_file_mutex_);
count = ++disable_titandb_file_deletions_;
}
ROCKS_LOG_INFO(db_options_.info_log,
"Disalbed blob file deletions. count: %d", count);
return Status::OK();
}
Status TitanDBImpl::EnableFileDeletions(bool force) {
// Enable base DB file deletions.
Status s = db_impl_->EnableFileDeletions(force);
if (!s.ok()) {
return s;
}
int count = 0;
{
MutexLock l(&delete_titandb_file_mutex_);
if (force) {
disable_titandb_file_deletions_ = 0;
} else if (disable_titandb_file_deletions_ > 0) {
count = --disable_titandb_file_deletions_;
}
assert(count >= 0);
}
ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d",
count);
return Status::OK();
}
Status TitanDBImpl::GetAllTitanFiles(std::vector<std::string>& files,
std::vector<VersionEdit>* edits) {
Status s = DisableFileDeletions();
if (!s.ok()) {
return s;
}
{
MutexLock l(&mutex_);
blob_file_set_->GetAllFiles(&files, edits);
}
return EnableFileDeletions(false);
}
Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family, Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
const RangePtr* ranges, size_t n, const RangePtr* ranges, size_t n,
bool include_end) { bool include_end) {
......
...@@ -110,6 +110,16 @@ class TitanDBImpl : public TitanDB { ...@@ -110,6 +110,16 @@ class TitanDBImpl : public TitanDB {
void ReleaseSnapshot(const Snapshot* snapshot) override; void ReleaseSnapshot(const Snapshot* snapshot) override;
using TitanDB::DisableFileDeletions;
Status DisableFileDeletions() override;
using TitanDB::EnableFileDeletions;
Status EnableFileDeletions(bool force) override;
using TitanDB::GetAllTitanFiles;
Status GetAllTitanFiles(std::vector<std::string>& files,
std::vector<VersionEdit>* edits) override;
Status DeleteFilesInRanges(ColumnFamilyHandle* column_family, Status DeleteFilesInRanges(ColumnFamilyHandle* column_family,
const RangePtr* ranges, size_t n, const RangePtr* ranges, size_t n,
bool include_end = true) override; bool include_end = true) override;
...@@ -324,6 +334,13 @@ class TitanDBImpl : public TitanDB { ...@@ -324,6 +334,13 @@ class TitanDBImpl : public TitanDB {
// REQUIRE: mutex_ held. // REQUIRE: mutex_ held.
int drop_cf_requests_ = 0; int drop_cf_requests_ = 0;
// PurgeObsoleteFiles, DisableFileDeletions and EnableFileDeletions block
// on the mutex to avoid contention.
mutable port::Mutex delete_titandb_file_mutex_;
// REQUIRES: access with delete_titandb_file_mutex_ held.
int disable_titandb_file_deletions_ = 0;
std::atomic_bool shuting_down_{false}; std::atomic_bool shuting_down_{false};
}; };
......
...@@ -5,6 +5,12 @@ namespace titandb { ...@@ -5,6 +5,12 @@ namespace titandb {
Status TitanDBImpl::PurgeObsoleteFilesImpl() { Status TitanDBImpl::PurgeObsoleteFilesImpl() {
Status s; Status s;
MutexLock delete_file_lock(&delete_titandb_file_mutex_);
if (disable_titandb_file_deletions_ > 0) {
return s;
}
std::vector<std::string> candidate_files; std::vector<std::string> candidate_files;
auto oldest_sequence = GetOldestSnapshotSequence(); auto oldest_sequence = GetOldestSnapshotSequence();
{ {
......
This diff is collapsed.
#pragma once
#include "file/filename.h"
#include "titan/checkpoint.h"
namespace rocksdb {
namespace titandb {
class VersionEdit;
class TitanCheckpointImpl : public Checkpoint {
public:
explicit TitanCheckpointImpl(TitanDB* db) : db_(db) {}
// Follow these steps to build an openable snapshot of TitanDB:
// (1) Create base db checkpoint.
// (2) Hard linked all existing blob files(live + obsolete) if the output
// directory is on the same filesystem, and copied otherwise.
// (3) Create MANIFEST file include all records about existing blob files.
// (4) Craft CURRENT file manually based on MANIFEST file number.
// This will include redundant blob files, but hopefully not a lot of them,
// and on restart Titan will recalculate GC stats and GC out those redundant
// blob files.
using Checkpoint::CreateCheckpoint;
virtual Status CreateCheckpoint(const std::string& base_checkpoint_dir,
const std::string& titan_checkpoint_dir = "",
uint64_t log_size_for_flush = 0) override;
// Checkpoint logic can be customized by providing callbacks for link, copy,
// or create.
Status CreateCustomCheckpoint(
const TitanDBOptions& titandb_options,
std::function<Status(const std::string& src_dirname,
const std::string& fname, FileType type)>
link_file_cb,
std::function<Status(const std::string& src_dirname,
const std::string& fname, uint64_t size_limit_bytes,
FileType type)>
copy_file_cb,
std::function<Status(const std::string& fname,
const std::string& contents, FileType type)>
create_file_cb,
uint64_t log_size_for_flush, const std::string full_private_path);
private:
void CleanStagingDirectory(const std::string& path, Logger* info_log);
// Create titan manifest file based on the content of VersionEdit
Status CreateTitanManifest(const std::string& file_name,
std::vector<VersionEdit>* edits);
private:
TitanDB* db_;
};
} // namespace titandb
} // namespace rocksdb
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment