Unverified Commit 817f576f authored by Connor's avatar Connor Committed by GitHub

Check edits before written to manifest (#37)

* check edits before written to manifest
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent 8ee2e794
...@@ -11,28 +11,36 @@ ...@@ -11,28 +11,36 @@
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
// A collector to apply edits in batch.
// The functions should be called in the sequence:
// AddEdit() -> Seal() -> Apply()
class EditCollector { class EditCollector {
public: public:
// Add the edit into the batch.
Status AddEdit(const VersionEdit& edit) { Status AddEdit(const VersionEdit& edit) {
if (sealed_)
return Status::Incomplete(
"Should be not called after Sealed() is called");
auto cf_id = edit.column_family_id_; auto cf_id = edit.column_family_id_;
auto& collector = column_families_[cf_id]; auto& collector = column_families_[cf_id];
Status s;
for (auto& file : edit.added_files_) { for (auto& file : edit.added_files_) {
s = collector.AddFile(file); status_ = collector.AddFile(file);
if (!s.ok()) return s; if (!status_.ok()) return status_;
} }
for (auto& file : edit.deleted_files_) { for (auto& file : edit.deleted_files_) {
collector.DeleteFile(file.first, file.second); status_ = collector.DeleteFile(file.first, file.second);
if (!s.ok()) return s; if (!status_.ok()) return status_;
} }
if (edit.has_next_file_number_) { if (edit.has_next_file_number_) {
if (edit.next_file_number_ < next_file_number_) { if (edit.next_file_number_ < next_file_number_) {
return Status::Corruption("Edit has a smaller next file number " + status_ =
ToString(edit.next_file_number_) + Status::Corruption("Edit has a smaller next file number " +
" than current " + ToString(edit.next_file_number_) +
ToString(next_file_number_)); " than current " + ToString(next_file_number_));
return status_;
} }
next_file_number_ = edit.next_file_number_; next_file_number_ = edit.next_file_number_;
has_next_file_number_ = true; has_next_file_number_ = true;
...@@ -40,7 +48,35 @@ class EditCollector { ...@@ -40,7 +48,35 @@ class EditCollector {
return Status::OK(); return Status::OK();
} }
// Seal the batch and check the validation of the edits.
Status Seal(VersionSet& vset) {
if (!status_.ok()) return status_;
for (auto& cf : column_families_) {
auto cf_id = cf.first;
auto storage = vset.GetBlobStorage(cf_id).lock();
if (!storage) {
// TODO: support OpenForReadOnly which doesn't open DB with all column
// family so there are maybe some invalid column family, but we can't
// just skip it otherwise blob files of the non-open column families
// will be regarded as obsolete and deleted.
continue;
}
status_ = cf.second.Seal(storage.get());
if (!status_.ok()) return status_;
}
sealed_ = true;
return Status::OK();
}
// Apply the edits of the batch.
Status Apply(VersionSet& vset) { Status Apply(VersionSet& vset) {
if (!status_.ok()) return status_;
if (!sealed_)
return Status::Incomplete(
"Should be not called until Sealed() is called");
for (auto& cf : column_families_) { for (auto& cf : column_families_) {
auto cf_id = cf.first; auto cf_id = cf.first;
auto storage = vset.GetBlobStorage(cf_id).lock(); auto storage = vset.GetBlobStorage(cf_id).lock();
...@@ -51,14 +87,16 @@ class EditCollector { ...@@ -51,14 +87,16 @@ class EditCollector {
// will be regarded as obsolete and deleted. // will be regarded as obsolete and deleted.
continue; continue;
} }
Status s = cf.second.Apply(storage); status_ = cf.second.Apply(storage.get());
if (!s.ok()) return s; if (!status_.ok()) return status_;
} }
return Status::OK(); return Status::OK();
} }
Status GetNextFileNumber(uint64_t* next_file_number) { Status GetNextFileNumber(uint64_t* next_file_number) {
if (!status_.ok()) return status_;
if (has_next_file_number_) { if (has_next_file_number_) {
*next_file_number = next_file_number_; *next_file_number = next_file_number_;
return Status::OK(); return Status::OK();
...@@ -88,7 +126,7 @@ class EditCollector { ...@@ -88,7 +126,7 @@ class EditCollector {
return Status::OK(); return Status::OK();
} }
Status Apply(shared_ptr<BlobStorage>& storage) { Status Seal(BlobStorage* storage) {
for (auto& file : added_files_) { for (auto& file : added_files_) {
auto number = file.first; auto number = file.first;
auto blob = storage->FindFile(number).lock(); auto blob = storage->FindFile(number).lock();
...@@ -107,11 +145,13 @@ class EditCollector { ...@@ -107,11 +145,13 @@ class EditCollector {
" has been added before"); " has been added before");
} }
} }
storage->AddBlobFile(file.second);
} }
for (auto& file : deleted_files_) { for (auto& file : deleted_files_) {
auto number = file.first; auto number = file.first;
if (added_files_.count(number) > 0) {
continue;
}
auto blob = storage->FindFile(number).lock(); auto blob = storage->FindFile(number).lock();
if (!blob) { if (!blob) {
ROCKS_LOG_ERROR(storage->db_options().info_log, ROCKS_LOG_ERROR(storage->db_options().info_log,
...@@ -126,6 +166,31 @@ class EditCollector { ...@@ -126,6 +166,31 @@ class EditCollector {
return Status::Corruption("Blob file " + ToString(number) + return Status::Corruption("Blob file " + ToString(number) +
" has been deleted already"); " has been deleted already");
} }
}
return Status::OK();
}
Status Apply(BlobStorage* storage) {
for (auto& file : added_files_) {
// just skip paired added and deleted files
if (deleted_files_.count(file.first) > 0) {
continue;
}
storage->AddBlobFile(file.second);
}
for (auto& file : deleted_files_) {
auto number = file.first;
// just skip paired added and deleted files
if (added_files_.count(number) > 0) {
continue;
}
auto blob = storage->FindFile(number).lock();
if (!blob) {
return Status::NotFound("Invalid file number " +
std::to_string(number));
}
storage->MarkFileObsolete(blob, file.second); storage->MarkFileObsolete(blob, file.second);
} }
...@@ -138,6 +203,9 @@ class EditCollector { ...@@ -138,6 +203,9 @@ class EditCollector {
std::unordered_map<uint64_t, SequenceNumber> deleted_files_; std::unordered_map<uint64_t, SequenceNumber> deleted_files_;
}; };
Status status_{Status::OK()};
//
bool sealed_{false};
bool has_next_file_number_{false}; bool has_next_file_number_{false};
uint64_t next_file_number_{0}; uint64_t next_file_number_{0};
std::unordered_map<uint32_t, CFEditCollector> column_families_; std::unordered_map<uint32_t, CFEditCollector> column_families_;
......
...@@ -98,6 +98,10 @@ class TitanDBTest : public testing::Test { ...@@ -98,6 +98,10 @@ class TitanDBTest : public testing::Test {
} }
} }
Status LogAndApply(VersionEdit& edit) {
return db_impl_->vset_->LogAndApply(edit);
}
void Put(uint64_t k, std::map<std::string, std::string>* data = nullptr) { void Put(uint64_t k, std::map<std::string, std::string>* data = nullptr) {
WriteOptions wopts; WriteOptions wopts;
std::string key = GenKey(k); std::string key = GenKey(k);
...@@ -448,6 +452,32 @@ TEST_F(TitanDBTest, DropColumnFamily) { ...@@ -448,6 +452,32 @@ TEST_F(TitanDBTest, DropColumnFamily) {
Close(); Close();
} }
TEST_F(TitanDBTest, VersionEditError) {
Open();
std::map<std::string, std::string> data;
Put(1, &data);
ASSERT_EQ(1, data.size());
VerifyDB(data);
auto cf_id = db_->DefaultColumnFamily()->GetID();
VersionEdit edit;
edit.SetColumnFamilyID(cf_id);
edit.AddBlobFile(std::make_shared<BlobFileMeta>(1, 1));
ASSERT_OK(LogAndApply(edit));
VerifyDB(data);
// add same blob file twice
VersionEdit edit1;
edit1.SetColumnFamilyID(cf_id);
edit1.AddBlobFile(std::make_shared<BlobFileMeta>(1, 1));
ASSERT_NOK(LogAndApply(edit));
Reopen();
VerifyDB(data);
}
#ifndef NDEBUG #ifndef NDEBUG
TEST_F(TitanDBTest, BlobFileIOError) { TEST_F(TitanDBTest, BlobFileIOError) {
std::unique_ptr<TitanFaultInjectionTestEnv> mock_env( std::unique_ptr<TitanFaultInjectionTestEnv> mock_env(
......
...@@ -82,8 +82,11 @@ Status VersionSet::Recover() { ...@@ -82,8 +82,11 @@ Status VersionSet::Recover() {
s = collector.AddEdit(edit); s = collector.AddEdit(edit);
if (!s.ok()) return s; if (!s.ok()) return s;
} }
s = collector.Seal(*this);
if (!s.ok()) return s;
s = collector.Apply(*this); s = collector.Apply(*this);
if (!s.ok()) return s; if (!s.ok()) return s;
uint64_t next_file_number = 0; uint64_t next_file_number = 0;
s = collector.GetNextFileNumber(&next_file_number); s = collector.GetNextFileNumber(&next_file_number);
if (!s.ok()) return s; if (!s.ok()) return s;
...@@ -209,17 +212,19 @@ Status VersionSet::LogAndApply(VersionEdit& edit) { ...@@ -209,17 +212,19 @@ Status VersionSet::LogAndApply(VersionEdit& edit) {
std::string record; std::string record;
edit.SetNextFileNumber(next_file_number_.load()); edit.SetNextFileNumber(next_file_number_.load());
edit.EncodeTo(&record); edit.EncodeTo(&record);
Status s = manifest_->AddRecord(record);
if (s.ok()) { EditCollector collector;
ImmutableDBOptions ioptions(db_options_); Status s = collector.AddEdit(edit);
s = SyncManifest(env_, &ioptions, manifest_->file()); if (!s.ok()) return s;
} s = collector.Seal(*this);
if (s.ok()) { if (!s.ok()) return s;
EditCollector collector; s = manifest_->AddRecord(record);
collector.AddEdit(edit); if (!s.ok()) return s;
s = collector.Apply(*this);
} ImmutableDBOptions ioptions(db_options_);
return s; s = SyncManifest(env_, &ioptions, manifest_->file());
if (!s.ok()) return s;
return collector.Apply(*this);
} }
void VersionSet::AddColumnFamilies( void VersionSet::AddColumnFamilies(
......
...@@ -83,6 +83,7 @@ class VersionTest : public testing::Test { ...@@ -83,6 +83,7 @@ class VersionTest : public testing::Test {
for (auto& edit : edits) { for (auto& edit : edits) {
ASSERT_OK(collector.AddEdit(edit)); ASSERT_OK(collector.AddEdit(edit));
} }
ASSERT_OK(collector.Seal(*vset_.get()));
ASSERT_OK(collector.Apply(*vset_.get())); ASSERT_OK(collector.Apply(*vset_.get()));
for (auto& it : vset_->column_families_) { for (auto& it : vset_->column_families_) {
auto& storage = column_families_[it.first]; auto& storage = column_families_[it.first];
...@@ -141,6 +142,57 @@ VersionEdit DeleteBlobFilesEdit(uint32_t cf_id, uint64_t start, uint64_t end) { ...@@ -141,6 +142,57 @@ VersionEdit DeleteBlobFilesEdit(uint32_t cf_id, uint64_t start, uint64_t end) {
return edit; return edit;
} }
TEST_F(VersionTest, InvalidEdit) {
// init state
{
auto add1_0_4 = AddBlobFilesEdit(1, 0, 4);
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_0_4));
ASSERT_OK(collector.Seal(*vset_.get()));
ASSERT_OK(collector.Apply(*vset_.get()));
}
// delete nonexistent blobs
{
auto del1_4_6 = DeleteBlobFilesEdit(1, 4, 6);
EditCollector collector;
ASSERT_OK(collector.AddEdit(del1_4_6));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
}
// add already existing blobs
{
auto add1_1_3 = AddBlobFilesEdit(1, 1, 3);
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_1_3));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
}
// add same blobs
{
auto add1_4_5_1 = AddBlobFilesEdit(1, 4, 5);
auto add1_4_5_2 = AddBlobFilesEdit(1, 4, 5);
EditCollector collector;
ASSERT_OK(collector.AddEdit(add1_4_5_1));
ASSERT_NOK(collector.AddEdit(add1_4_5_2));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
}
// delete same blobs
{
auto del1_3_4_1 = DeleteBlobFilesEdit(1, 3, 4);
auto del1_3_4_2 = DeleteBlobFilesEdit(1, 3, 4);
EditCollector collector;
ASSERT_OK(collector.AddEdit(del1_3_4_1));
ASSERT_NOK(collector.AddEdit(del1_3_4_2));
ASSERT_NOK(collector.Seal(*vset_.get()));
ASSERT_NOK(collector.Apply(*vset_.get()));
}
}
TEST_F(VersionTest, VersionBuilder) { TEST_F(VersionTest, VersionBuilder) {
// {(0, 4)}, {} // {(0, 4)}, {}
auto add1_0_4 = AddBlobFilesEdit(1, 0, 4); auto add1_0_4 = AddBlobFilesEdit(1, 0, 4);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment