Unverified Commit af4cd023 authored by Connor's avatar Connor Committed by GitHub

Add delete blob files in ranges (#51)

* add delete blobs in ranges
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent 69d79667
......@@ -284,6 +284,30 @@ Status BlobFileSet::MaybeDestroyColumnFamily(uint32_t cf_id) {
return Status::NotFound("invalid column family");
}
Status BlobFileSet::DeleteBlobFilesInRanges(uint32_t cf_id,
const RangePtr* ranges, size_t n,
bool include_end,
SequenceNumber obsolete_sequence) {
auto it = column_families_.find(cf_id);
if (it != column_families_.end()) {
VersionEdit edit;
edit.SetColumnFamilyID(cf_id);
std::vector<uint64_t> files;
Status s = it->second->GetBlobFilesInRanges(ranges, n, include_end, &files);
if (!s.ok()) return s;
for (auto file_number : files) {
edit.DeleteBlobFile(file_number);
}
s = LogAndApply(edit);
return s;
}
ROCKS_LOG_ERROR(db_options_.info_log,
"column %u not found for delete blob files in ranges\n",
cf_id);
return Status::NotFound("invalid column family");
}
void BlobFileSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) {
for (auto it = column_families_.begin(); it != column_families_.end();) {
......
......@@ -53,6 +53,12 @@ class BlobFileSet {
// REQUIRES: mutex is held
Status MaybeDestroyColumnFamily(uint32_t cf_id);
// Logical deletes all the blobs within the ranges.
// REQUIRES: mutex is held
Status DeleteBlobFilesInRanges(uint32_t cf_id, const RangePtr* ranges,
size_t n, bool include_end,
SequenceNumber obsolete_sequence);
// Allocates a new file number.
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
......
......@@ -465,6 +465,9 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
Flush();
CompactAll();
std::string value;
// Now the LSM structure is:
// L6: [2, 4]
// with 1 alive blob file
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &value));
ASSERT_EQ(value, "0");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
......@@ -489,6 +492,10 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
ASSERT_OK(sst_file_writer.Put(GenKey(2), GenValue(22)));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(db_->IngestExternalFile({sst_file}, IngestExternalFileOptions()));
// Now the LSM structure is:
// L5: [1, 2]
// L6: [2, 4]
// with 1 alive blob file
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &value));
ASSERT_EQ(value, "0");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level5", &value));
......@@ -496,6 +503,11 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
// GC and purge blob file
// Now the LSM structure is:
// L5: [1, 2]
// L6: [2, 4]
// with 0 blob file
CheckBlobNumber(1);
RunGC(true);
......@@ -506,15 +518,21 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
Slice end = Slice(key3);
ASSERT_OK(
DeleteFilesInRange(base_db_, db_->DefaultColumnFamily(), &start, &end));
// Now the LSM structure is:
// L6: [2, 4]
// with 0 blob file
TitanReadOptions opts;
auto* iter = db_->NewIterator(opts, db_->DefaultColumnFamily());
iter->SeekToFirst();
while (iter->Valid()) {
iter->Next();
}
// `DeleteFilesInRange` may expose old blob index.
ASSERT_TRUE(iter->status().IsCorruption());
delete iter;
// Set key only to ignore the stale blob indexes.
opts.key_only = true;
iter = db_->NewIterator(opts, db_->DefaultColumnFamily());
iter->SeekToFirst();
......
......@@ -24,6 +24,44 @@ Status BlobStorage::NewPrefetcher(uint64_t file_number,
result);
}
Status BlobStorage::GetBlobFilesInRanges(const RangePtr* ranges, size_t n,
bool include_end,
std::vector<uint64_t>* files) {
MutexLock l(&mutex_);
for (size_t i = 0; i < n; i++) {
const Slice* begin = ranges[i].start;
const Slice* end = ranges[i].limit;
auto cmp = cf_options_.comparator;
// nullptr means the minimum or maximum.
for (auto it = ((begin != nullptr) ? blob_ranges_.lower_bound(*begin)
: blob_ranges_.begin());
it != ((end != nullptr) ? blob_ranges_.upper_bound(*end)
: blob_ranges_.end());
it++) {
// Obsolete files are to be deleted, so just skip.
if (it->second->is_obsolete()) continue;
// The smallest and largest key of blob file meta of the old version are
// empty, so skip.
if (it->second->largest_key().empty() && end) continue;
if ((end == nullptr) ||
(include_end && cmp->Compare(it->second->largest_key(), *end) <= 0) ||
(!include_end && cmp->Compare(it->second->largest_key(), *end) < 0)) {
files->push_back(it->second->file_number());
}
assert(it->second->smallest_key().empty() ||
(!begin || cmp->Compare(it->second->smallest_key(), *begin) >= 0));
}
ROCKS_LOG_INFO(db_options_.info_log,
"Get %" PRIuPTR " blob files in the range [%s, %s%c",
files->size(), begin ? begin->ToString(true).c_str() : " ",
end ? end->ToString(true).c_str() : " ",
include_end ? ']' : ')');
}
return Status::OK();
}
std::weak_ptr<BlobFileMeta> BlobStorage::FindFile(uint64_t file_number) const {
MutexLock l(&mutex_);
auto it = files_.find(file_number);
......@@ -45,14 +83,27 @@ void BlobStorage::ExportBlobFiles(
void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
MutexLock l(&mutex_);
files_.emplace(std::make_pair(file->file_number(), file));
blob_ranges_.emplace(std::make_pair(file->smallest_key(), file));
AddStats(stats_, cf_id_, TitanInternalStats::LIVE_BLOB_FILE_SIZE,
file->file_size());
AddStats(stats_, cf_id_, TitanInternalStats::NUM_LIVE_BLOB_FILE, 1);
}
void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
bool BlobStorage::MarkFileObsolete(uint64_t file_number,
SequenceNumber obsolete_sequence) {
MutexLock l(&mutex_);
auto file = files_.find(file_number);
if (file == files_.end()) {
return false;
}
MarkFileObsoleteLocked(file->second, obsolete_sequence);
return true;
}
void BlobStorage::MarkFileObsoleteLocked(std::shared_ptr<BlobFileMeta> file,
SequenceNumber obsolete_sequence) {
mutex_.AssertHeld();
obsolete_files_.push_back(
std::make_pair(file->file_number(), obsolete_sequence));
file->FileStateTransit(BlobFileMeta::FileEvent::kDelete);
......@@ -66,6 +117,29 @@ void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
AddStats(stats_, cf_id_, TitanInternalStats::NUM_OBSOLETE_BLOB_FILE, 1);
}
bool BlobStorage::RemoveFile(uint64_t file_number) {
mutex_.AssertHeld();
auto file = files_.find(file_number);
if (file == files_.end()) {
return false;
}
// Removes from blob_ranges_
auto p = blob_ranges_.equal_range(file->second->smallest_key());
for (auto it = p.first; it != p.second; it++) {
if (it->second->file_number() == file->second->file_number()) {
it = blob_ranges_.erase(it);
break;
}
}
SubStats(stats_, cf_id_, TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE,
file->second->file_size());
SubStats(stats_, cf_id_, TitanInternalStats::NUM_OBSOLETE_BLOB_FILE, 1);
files_.erase(file_number);
file_cache_->Evict(file_number);
return true;
}
void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) {
MutexLock l(&mutex_);
......@@ -80,14 +154,8 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
// visible to all existing snapshots.
if (oldest_sequence > obsolete_sequence) {
// remove obsolete files
auto p = files_.find(file_number);
assert(p != files_.end());
file_dropped++;
file_dropped_size += p->second->file_size();
files_.erase(p);
file_cache_->Evict(file_number);
bool __attribute__((__unused__)) removed = RemoveFile(file_number);
assert(removed);
ROCKS_LOG_INFO(db_options_.info_log,
"Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
") not visible to oldest snapshot %" PRIu64 ", delete it.",
......@@ -95,7 +163,6 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
if (obsolete_files) {
obsolete_files->emplace_back(
BlobFileName(db_options_.dirname, file_number));
// TODO: add obsolete files count metrics
}
it = obsolete_files_.erase(it);
......@@ -103,10 +170,6 @@ void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
}
++it;
}
SubStats(stats_, cf_id_, TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE,
file_dropped_size);
SubStats(stats_, cf_id_, TitanInternalStats::NUM_OBSOLETE_BLOB_FILE,
file_dropped);
}
void BlobStorage::ComputeGCScore() {
......
......@@ -31,6 +31,7 @@ class BlobStorage {
: db_options_(_db_options),
cf_options_(_cf_options),
cf_id_(cf_id),
blob_ranges_(InternalComparator(_cf_options.comparator)),
file_cache_(_file_cache),
destroyed_(false),
stats_(stats) {}
......@@ -41,6 +42,15 @@ class BlobStorage {
}
}
const TitanDBOptions& db_options() { return db_options_; }
const TitanCFOptions& cf_options() { return cf_options_; }
const std::vector<GCScore> gc_score() {
MutexLock l(&mutex_);
return gc_score_;
}
// Gets the blob record pointed by the blob index. The provided
// buffer is used to store the record data, so the buffer must be
// valid when the record is used.
......@@ -51,18 +61,15 @@ class BlobStorage {
Status NewPrefetcher(uint64_t file_number,
std::unique_ptr<BlobFilePrefetcher>* result);
// Get all the blob files within the ranges.
Status GetBlobFilesInRanges(const RangePtr* ranges, size_t n,
bool include_end, std::vector<uint64_t>* files);
// Finds the blob file meta for the specified file number. It is a
// corruption if the file doesn't exist.
std::weak_ptr<BlobFileMeta> FindFile(uint64_t file_number) const;
std::size_t NumBlobFiles() const {
MutexLock l(&mutex_);
return files_.size();
}
void ExportBlobFiles(
std::map<uint64_t, std::weak_ptr<BlobFileMeta>>& ret) const;
// Marks all the blob files so that they can be picked by GC job.
void MarkAllFilesForGC() {
MutexLock l(&mutex_);
for (auto& file : files_) {
......@@ -73,34 +80,52 @@ class BlobStorage {
}
}
// The corresponding column family is dropped, so mark destroyed and we can
// remove this blob storage later.
void MarkDestroyed() {
MutexLock l(&mutex_);
destroyed_ = true;
}
// Returns whether this blob storage can be deleted now.
bool MaybeRemove() const {
MutexLock l(&mutex_);
return destroyed_ && obsolete_files_.empty();
}
const std::vector<GCScore> gc_score() {
MutexLock l(&mutex_);
return gc_score_;
}
// Computes GC score.
void ComputeGCScore();
const TitanDBOptions& db_options() { return db_options_; }
const TitanCFOptions& cf_options() { return cf_options_; }
// Add a new blob file to this blob storage.
void AddBlobFile(std::shared_ptr<BlobFileMeta>& file);
// Gets all obsolete blob files whose obsolete_sequence is smaller than the
// oldest_sequence. Note that the files returned would be erased from internal
// structure, so for the next call, the files returned before wouldn't be
// returned again.
void GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence);
void MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
SequenceNumber obsolete_sequence);
// Mark the file as obsolete, and retrun value indicates whether the file is
// founded.
bool MarkFileObsolete(uint64_t file_number, SequenceNumber obsolete_sequence);
// Returns the number of blob files, including obsolete files.
std::size_t NumBlobFiles() const {
MutexLock l(&mutex_);
return files_.size();
}
// Returns the number of obsolete blob files.
// TODO: use this method to calculate `kNumObsoleteBlobFile` DB property.
std::size_t NumObsoleteBlobFiles() const {
MutexLock l(&mutex_);
return obsolete_files_.size();
}
// Exports all blob files' meta. Only for tests.
void ExportBlobFiles(
std::map<uint64_t, std::weak_ptr<BlobFileMeta>>& ret) const;
private:
friend class BlobFileSet;
......@@ -109,6 +134,10 @@ class BlobStorage {
friend class BlobGCJobTest;
friend class BlobFileSizeCollectorTest;
void MarkFileObsoleteLocked(std::shared_ptr<BlobFileMeta> file,
SequenceNumber obsolete_sequence);
bool RemoveFile(uint64_t file_number);
TitanDBOptions db_options_;
TitanCFOptions cf_options_;
uint32_t cf_id_;
......@@ -117,6 +146,26 @@ class BlobStorage {
// Only BlobStorage OWNS BlobFileMeta
std::unordered_map<uint64_t, std::shared_ptr<BlobFileMeta>> files_;
class InternalComparator {
public:
// The default constructor is not supposed to be used.
// It is only to make std::multimap can compile.
InternalComparator() : comparator_(nullptr){};
explicit InternalComparator(const Comparator* comparator)
: comparator_(comparator){};
bool operator()(const Slice& key1, const Slice& key2) const {
assert(comparator_ != nullptr);
return comparator_->Compare(key1, key2) < 0;
}
private:
const Comparator* comparator_;
};
// smallest_key -> file_meta
std::multimap<const Slice, std::shared_ptr<BlobFileMeta>, InternalComparator>
blob_ranges_;
std::shared_ptr<BlobFileCache> file_cache_;
std::vector<GCScore> gc_score_;
......
......@@ -796,6 +796,11 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
if (!s.ok()) return s;
MutexLock l(&mutex_);
SequenceNumber obsolete_sequence = db_impl_->GetLatestSequenceNumber();
s = blob_file_set_->DeleteBlobFilesInRanges(cf_id, ranges, n, include_end,
obsolete_sequence);
if (!s.ok()) return s;
auto bs = blob_file_set_->GetBlobStorage(cf_id).lock();
if (!bs) {
// TODO: Should treat it as background error and make DB read-only.
......
......@@ -186,12 +186,10 @@ class EditCollector {
if (added_files_.count(number) > 0) {
continue;
}
auto blob = storage->FindFile(number).lock();
if (!blob) {
if (!storage->MarkFileObsolete(number, file.second)) {
return Status::NotFound("Invalid file number " +
std::to_string(number));
}
storage->MarkFileObsolete(blob, file.second);
}
storage->ComputeGCScore();
......
......@@ -137,12 +137,12 @@ void TitanTableBuilder::AddBlob(const Slice& key, const Slice& value,
index.EncodeTo(index_value);
if (blob_handle_->GetFile()->GetFileSize() >=
cf_options_.blob_file_target_size) {
FinishBlob();
FinishBlobFile();
}
}
}
void TitanTableBuilder::FinishBlob() {
void TitanTableBuilder::FinishBlobFile() {
if (blob_builder_) {
blob_builder_->Finish();
if (ok()) {
......@@ -179,7 +179,7 @@ Status TitanTableBuilder::status() const {
Status TitanTableBuilder::Finish() {
base_builder_->Finish();
FinishBlob();
FinishBlobFile();
status_ = blob_manager_->BatchFinishFiles(cf_id_, finished_blobs_);
if (!status_.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
......
......@@ -53,7 +53,7 @@ class TitanTableBuilder : public TableBuilder {
bool ShouldMerge(const std::shared_ptr<BlobFileMeta>& file);
void FinishBlob();
void FinishBlobFile();
void UpdateInternalOpStats();
......
......@@ -593,36 +593,35 @@ TEST_F(TitanDBTest, DeleteFilesInRange) {
// The LSM structure is:
// L0: [11, 21, 31] [41, 51] [61, 71, 81, 91]
// L6: [12, 22, 32] [42, 52] [62, 72, 82, 92]
// with 6 blob files
// with 6 alive blob files
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &value));
ASSERT_EQ(value, "3");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "3");
std::string key40 = GenKey(40);
std::string key70 = GenKey(70);
std::string key80 = GenKey(80);
Slice start = Slice(key40);
Slice end = Slice(key70);
Slice end = Slice(key80);
DeleteFilesInRange(&start, &end);
// Now the LSM structure is:
// L0: [11, 21, 31] [41, 51] [61, 71, 81, 91]
// L6: [12, 22, 32] [62, 72, 82, 92]
// with 6 blob files
// with 4 alive blob files and 2 obsolete blob files
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &value));
ASSERT_EQ(value, "3");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "2");
auto blob = GetBlobStorage(db_->DefaultColumnFamily());
auto before = blob.lock()->NumBlobFiles();
ASSERT_EQ(before, 6);
auto blob = GetBlobStorage(db_->DefaultColumnFamily()).lock();
ASSERT_EQ(blob->NumBlobFiles(), 6);
// These two files are marked obsolete directly by `DeleteBlobFilesInRanges`
ASSERT_EQ(blob->NumObsoleteBlobFiles(), 2);
ASSERT_OK(db_impl_->TEST_StartGC(db_->DefaultColumnFamily()->GetID()));
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
// The blob file of deleted SST should be GCed.
ASSERT_EQ(before - 1, blob.lock()->NumBlobFiles());
ASSERT_EQ(blob->NumBlobFiles(), 4);
ASSERT_EQ(blob->NumObsoleteBlobFiles(), 0);
Close();
}
......
......@@ -284,6 +284,85 @@ TEST_F(VersionTest, ObsoleteFiles) {
CheckColumnFamiliesSize(8);
}
TEST_F(VersionTest, DeleteBlobsInRange) {
// The blob files' range are:
// 1:[00--------------------------------------------------------99]
// 2:[00----10]
// 3: [07---------------------55]
// 4: [25------50]
// 5: [25-------51]
// 6: [50------65]
// 7: [90----99]
// 8: [50-------------79]
// 9: [70---80]
// 10: [60----72]
// 11: [75-----------91]
// 12: [30--------------------------------85]
// 13: [50--------------80]
// 14:[]
auto metas = std::vector<std::pair<std::string, std::string>>{
std::make_pair("00", "99"), std::make_pair("00", "10"),
std::make_pair("07", "55"), std::make_pair("25", "50"),
std::make_pair("25", "51"), std::make_pair("50", "65"),
std::make_pair("90", "99"), std::make_pair("50", "79"),
std::make_pair("70", "80"), std::make_pair("60", "72"),
std::make_pair("75", "91"), std::make_pair("30", "85"),
std::make_pair("50", "80"), std::make_pair("", ""),
};
VersionEdit edit;
edit.SetColumnFamilyID(1);
for (size_t i = 0; i < metas.size(); i++) {
auto file = std::make_shared<BlobFileMeta>(i + 1, i + 1, 0, 0,
std::move(metas[i].first),
std::move(metas[i].second));
edit.AddBlobFile(file);
}
EditCollector collector;
ASSERT_OK(collector.AddEdit(edit));
ASSERT_OK(collector.Seal(*blob_file_set_.get()));
ASSERT_OK(collector.Apply(*blob_file_set_.get()));
Slice begin = Slice("50");
Slice end = Slice("80");
RangePtr range(&begin, &end);
auto blob = blob_file_set_->GetBlobStorage(1).lock();
blob_file_set_->DeleteBlobFilesInRanges(1, &range, 1, false /* include_end */,
0);
ASSERT_EQ(blob->NumBlobFiles(), metas.size());
// obsolete files: 6, 8, 10
ASSERT_EQ(blob->NumObsoleteBlobFiles(), 3);
blob_file_set_->DeleteBlobFilesInRanges(1, &range, 1, true /* include_end */,
0);
ASSERT_EQ(blob->NumBlobFiles(), metas.size());
// obsolete file: 6, 8, 9, 10, 13
ASSERT_EQ(blob->NumObsoleteBlobFiles(), 5);
std::vector<std::string> obsolete_files;
blob_file_set_->GetObsoleteFiles(&obsolete_files, 1);
ASSERT_EQ(blob->NumBlobFiles(), 9);
Slice begin1 = Slice("");
Slice end1 = Slice("99");
RangePtr range1(&begin1, &end1);
blob_file_set_->DeleteBlobFilesInRanges(1, &range1, 1,
false /* include_end */, 0);
// obsolete file: 2, 3, 4, 5, 11, 12
ASSERT_EQ(blob->NumObsoleteBlobFiles(), 6);
RangePtr range2(nullptr, nullptr);
blob_file_set_->DeleteBlobFilesInRanges(1, &range2, 1, true /* include_end */,
0);
// obsolete file: 1, 2, 3, 4, 5, 7, 11, 12, 14
ASSERT_EQ(blob->NumObsoleteBlobFiles(), 9);
blob_file_set_->GetObsoleteFiles(&obsolete_files, 1);
ASSERT_EQ(blob->NumBlobFiles(), 0);
}
TEST_F(VersionTest, BlobFileMetaV1ToV2) {
VersionEdit edit;
edit.SetColumnFamilyID(1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment