Unverified Commit 86d3e059 authored by Connor's avatar Connor Committed by GitHub

Introduce edit collector (#19)

* introduce edit collector
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent 5f6c3c85
......@@ -522,7 +522,7 @@ Status BlobGCJob::DeleteInputBlobFiles() {
metrics_.blob_db_gc_num_files++;
edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
}
s = version_set_->LogAndApply(&edit);
s = version_set_->LogAndApply(edit);
// TODO(@DorianZheng) Purge pending outputs
// base_db_->pending_outputs_.erase(handle->GetNumber());
return s;
......
......@@ -9,6 +9,7 @@
#include "rocksdb/status.h"
#include "titan/options.h"
#include "titan_stats.h"
#include "version_edit.h"
#include "version_set.h"
namespace rocksdb {
......
......@@ -51,7 +51,7 @@ class BlobStorage {
std::unique_ptr<BlobFilePrefetcher>* result);
// Finds the blob file meta for the specified file number. It is a
// corruption if the file doesn't exist in the specific version.
// corruption if the file doesn't exist.
std::weak_ptr<BlobFileMeta> FindFile(uint64_t file_number) const;
std::size_t NumBlobFiles() const {
......@@ -86,6 +86,8 @@ class BlobStorage {
void ComputeGCScore();
const TitanDBOptions& db_options() { return db_options_; }
const TitanCFOptions& cf_options() { return cf_options_; }
void AddBlobFile(std::shared_ptr<BlobFileMeta>& file);
......
......@@ -71,7 +71,7 @@ class TitanDBImpl::FileManager : public BlobFileManager {
{
MutexLock l(&db_->mutex_);
s = db_->vset_->LogAndApply(&edit);
s = db_->vset_->LogAndApply(edit);
for (const auto& file : files)
db_->pending_outputs_.erase(file.second->GetNumber());
}
......
#pragma once
#include <unordered_map>
#include "util/string_util.h"
#include "version_edit.h"
#include "version_set.h"
#include <inttypes.h>
namespace rocksdb {
namespace titandb {
class EditCollector {
public:
Status AddEdit(const VersionEdit& edit) {
auto cf_id = edit.column_family_id_;
auto& collector = column_families_[cf_id];
Status s;
for (auto& file : edit.added_files_) {
s = collector.AddFile(file);
if (!s.ok()) return s;
}
for (auto& file : edit.deleted_files_) {
collector.DeleteFile(file.first, file.second);
if (!s.ok()) return s;
}
if (edit.has_next_file_number_) {
if (edit.next_file_number_ < next_file_number_) {
return Status::Corruption("Edit has a smaller next file number " +
ToString(edit.next_file_number_) +
" than current " +
ToString(next_file_number_));
}
next_file_number_ = edit.next_file_number_;
has_next_file_number_ = true;
}
return Status::OK();
}
Status Apply(VersionSet& vset) {
for (auto& cf : column_families_) {
auto cf_id = cf.first;
auto storage = vset.GetBlobStorage(cf_id).lock();
if (!storage) {
// TODO: support OpenForReadOnly which doesn't open DB with all column
// family so there are maybe some invalid column family, but we can't
// just skip it otherwise blob files of the non-open column families
// will be regarded as obsolete and deleted.
continue;
}
Status s = cf.second.Apply(storage);
if (!s.ok()) return s;
}
return Status::OK();
}
Status GetNextFileNumber(uint64_t* next_file_number) {
if (has_next_file_number_) {
*next_file_number = next_file_number_;
return Status::OK();
}
return Status::Corruption("No next file number in manifest file");
}
private:
class CFEditCollector {
public:
Status AddFile(const std::shared_ptr<BlobFileMeta>& file) {
auto number = file->file_number();
if (added_files_.count(number) > 0) {
return Status::Corruption("Blob file " + ToString(number) +
" has been added twice");
}
added_files_.emplace(number, file);
return Status::OK();
}
Status DeleteFile(uint64_t number, SequenceNumber obsolete_sequence) {
if (deleted_files_.count(number) > 0) {
return Status::Corruption("Blob file " + ToString(number) +
" has been deleted twice");
}
deleted_files_.emplace(number, obsolete_sequence);
return Status::OK();
}
Status Apply(shared_ptr<BlobStorage>& storage) {
for (auto& file : added_files_) {
auto number = file.first;
auto blob = storage->FindFile(number).lock();
if (blob) {
if (blob->is_obsolete()) {
ROCKS_LOG_ERROR(storage->db_options().info_log,
"blob file %" PRIu64 " has been deleted before\n",
number);
return Status::Corruption("Blob file " + ToString(number) +
" has been deleted before");
} else {
ROCKS_LOG_ERROR(storage->db_options().info_log,
"blob file %" PRIu64 " has been added before\n",
number);
return Status::Corruption("Blob file " + ToString(number) +
" has been added before");
}
}
storage->AddBlobFile(file.second);
}
for (auto& file : deleted_files_) {
auto number = file.first;
auto blob = storage->FindFile(number).lock();
if (!blob) {
ROCKS_LOG_ERROR(storage->db_options().info_log,
"blob file %" PRIu64 " doesn't exist before\n",
number);
return Status::Corruption("Blob file " + ToString(number) +
" doesn't exist before");
} else if (blob->is_obsolete()) {
ROCKS_LOG_ERROR(storage->db_options().info_log,
"blob file %" PRIu64 " has been deleted already\n",
number);
return Status::Corruption("Blob file " + ToString(number) +
" has been deleted already");
}
storage->MarkFileObsolete(blob, file.second);
}
storage->ComputeGCScore();
return Status::OK();
}
private:
std::unordered_map<uint64_t, std::shared_ptr<BlobFileMeta>> added_files_;
std::unordered_map<uint64_t, SequenceNumber> deleted_files_;
};
bool has_next_file_number_{false};
uint64_t next_file_number_{0};
std::unordered_map<uint32_t, CFEditCollector> column_families_;
};
} // namespace titandb
} // namespace rocksdb
\ No newline at end of file
......@@ -5,6 +5,8 @@
#include "blob_format.h"
#include "rocksdb/slice.h"
#include <inttypes.h>
namespace rocksdb {
namespace titandb {
......@@ -33,6 +35,7 @@ class VersionEdit {
private:
friend class VersionSet;
friend class EditCollector;
bool has_next_file_number_{false};
uint64_t next_file_number_{0};
......
......@@ -2,8 +2,8 @@
#include <inttypes.h>
#include "edit_collector.h"
#include "util/filename.h"
#include "util/string_util.h"
namespace rocksdb {
namespace titandb {
......@@ -66,9 +66,6 @@ Status VersionSet::Recover() {
file.reset(new SequentialFileReader(std::move(f), file_name));
}
bool has_next_file_number = false;
uint64_t next_file_number = 0;
// Reads edits from the manifest and applies them one by one.
{
LogReporter reporter;
......@@ -77,25 +74,22 @@ Status VersionSet::Recover() {
0 /*initial_offset*/, 0);
Slice record;
std::string scratch;
EditCollector collector;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = DecodeInto(record, &edit);
if (!s.ok()) return s;
s = Apply(&edit);
s = collector.AddEdit(edit);
if (!s.ok()) return s;
if (edit.has_next_file_number_) {
assert(edit.next_file_number_ >= next_file_number);
next_file_number = edit.next_file_number_;
has_next_file_number = true;
}
}
s = collector.Apply(*this);
if (!s.ok()) return s;
uint64_t next_file_number = 0;
s = collector.GetNextFileNumber(&next_file_number);
if (!s.ok()) return s;
next_file_number_.store(next_file_number);
}
if (!has_next_file_number) {
return Status::Corruption("no next file number in manifest file");
}
next_file_number_.store(next_file_number);
auto new_manifest_file_number = NewFileNumber();
s = OpenManifest(new_manifest_file_number);
if (!s.ok()) return s;
......@@ -193,11 +187,11 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return s;
}
Status VersionSet::LogAndApply(VersionEdit* edit) {
Status VersionSet::LogAndApply(VersionEdit& edit) {
// TODO(@huachao): write manifest file unlocked
std::string record;
edit->SetNextFileNumber(next_file_number_.load());
edit->EncodeTo(&record);
edit.SetNextFileNumber(next_file_number_.load());
edit.EncodeTo(&record);
Status s = manifest_->AddRecord(record);
if (s.ok()) {
ImmutableDBOptions ioptions(db_options_);
......@@ -205,51 +199,9 @@ Status VersionSet::LogAndApply(VersionEdit* edit) {
}
if (!s.ok()) return s;
return Apply(edit);
}
Status VersionSet::Apply(VersionEdit* edit) {
auto cf_id = edit->column_family_id_;
auto it = column_families_.find(cf_id);
if (it == column_families_.end()) {
// TODO: support OpenForReadOnly which doesn't open DB with all column
// family so there are maybe some invalid column family, but we can't just
// skip it otherwise blob files of the non-open column families will be
// regarded as obsolete and deleted.
return Status::OK();
}
auto& files = it->second->files_;
for (auto& file : edit->deleted_files_) {
auto number = file.first;
auto blob_it = files.find(number);
if (blob_it == files.end()) {
return Status::Corruption("Blob file " + ToString(number) +
" doesn't exist before.");
} else if (blob_it->second->is_obsolete()) {
return Status::Corruption("Blob file " + ToString(number) +
" to delete has been deleted before.");
}
it->second->MarkFileObsolete(blob_it->second, file.second);
}
for (auto& file : edit->added_files_) {
auto number = file->file_number();
auto blob_it = files.find(number);
if (blob_it != files.end()) {
if (blob_it->second->is_obsolete()) {
return Status::Corruption("Blob file " + ToString(number) +
" to add has been deleted before.");
} else {
return Status::Corruption("Blob file " + ToString(number) +
" has been added before.");
}
}
it->second->AddBlobFile(file);
}
it->second->ComputeGCScore();
return Status::OK();
EditCollector collector;
collector.AddEdit(edit);
return collector.Apply(*this);
}
void VersionSet::AddColumnFamilies(
......@@ -277,7 +229,7 @@ Status VersionSet::DropColumnFamilies(
file.second->file_number());
edit.DeleteBlobFile(file.first, obsolete_sequence);
}
s = LogAndApply(&edit);
s = LogAndApply(edit);
if (!s.ok()) return s;
} else {
ROCKS_LOG_ERROR(db_options_.info_log, "column %u not found for drop\n",
......
......@@ -34,7 +34,7 @@ class VersionSet {
// Applies *edit on the current version to form a new version that is
// both saved to the manifest and installed as the new current version.
// REQUIRES: mutex is held
Status LogAndApply(VersionEdit* edit);
Status LogAndApply(VersionEdit& edit);
// Adds some column families with the specified options.
// REQUIRES: mutex is held
......@@ -85,8 +85,6 @@ class VersionSet {
Status WriteSnapshot(log::Writer* log);
Status Apply(VersionEdit* edit);
std::string dirname_;
Env* env_;
EnvOptions env_options_;
......
#include "edit_collector.h"
#include "testutil.h"
#include "util.h"
#include "util/filename.h"
......@@ -78,9 +79,11 @@ class VersionTest : public testing::Test {
}
void BuildAndCheck(std::vector<VersionEdit> edits) {
EditCollector collector;
for (auto& edit : edits) {
ASSERT_OK(vset_->Apply(&edit));
ASSERT_OK(collector.AddEdit(edit));
}
ASSERT_OK(collector.Apply(*vset_.get()));
for (auto& it : vset_->column_families_) {
auto& storage = column_families_[it.first];
// ignore obsolete file
......@@ -185,7 +188,7 @@ TEST_F(VersionTest, ObsoleteFiles) {
{
auto add1_1_5 = AddBlobFilesEdit(1, 1, 5);
MutexLock l(&mutex_);
vset_->LogAndApply(&add1_1_5);
vset_->LogAndApply(add1_1_5);
}
std::vector<std::string> of;
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
......@@ -193,7 +196,7 @@ TEST_F(VersionTest, ObsoleteFiles) {
{
auto del1_4_5 = DeleteBlobFilesEdit(1, 4, 5);
MutexLock l(&mutex_);
vset_->LogAndApply(&del1_4_5);
vset_->LogAndApply(del1_4_5);
}
vset_->GetObsoleteFiles(&of, kMaxSequenceNumber);
ASSERT_EQ(of.size(), 1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment