Unverified commit 1de531cd, authored by Connor, committed by GitHub

update discardable size before delete files in ranges (#43)

* update discardable size before delete files in range
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
parent 91bbb5fc
......@@ -124,6 +124,10 @@ class TitanDB : public StackableDB {
std::vector<std::string>* const output_file_names = nullptr,
CompactionJobInfo* compaction_job_info = nullptr) override = 0;
// Deletes files covered by the given key ranges of `column_family`.
//
// `ranges` points to an array of `n` RangePtr entries; each entry carries a
// `start` and a `limit` user-key pointer. `include_end` defaults to true.
// NOTE(review): presumably a null `start`/`limit` means "unbounded" and
// `include_end` controls whether the limit key itself is part of the range,
// mirroring the base DB's DeleteFilesInRanges -- confirm against the RocksDB
// convenience API.
//
// Pure virtual: TitanDBImpl provides the implementation.
virtual Status DeleteFilesInRanges(ColumnFamilyHandle* column_family,
const RangePtr* ranges, size_t n,
bool include_end = true) = 0;
using rocksdb::StackableDB::GetOptions;
Options GetOptions(ColumnFamilyHandle* column_family) const override = 0;
......
......@@ -628,6 +628,150 @@ void TitanDBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
db_->ReleaseSnapshot(snapshot);
}
// Deletes the SST files of `column_family` that fall inside `ranges[0..n)` in
// the base DB, and accounts the blob data referenced by those SSTs as
// discardable so that blob GC can reclaim it.
//
// Steps:
//   1. Pin the current Version and collect the table properties of every SST
//      (levels >= 1) that lies within one of the ranges and is not being
//      compacted.
//   2. Decode the per-blob-file sizes recorded in those properties.
//   3. Call the base DB's DeleteFilesInRanges.
//   4. Add the decoded sizes to the discardable size of the matching blob
//      files, update LIVE_BLOB_SIZE, recompute the GC score and schedule GC.
//
// Params:
//   column_family - column family to operate on.
//   ranges, n     - array of `n` ranges; a null `start`/`limit` is turned into
//                   a null internal key bound when selecting files.
//   include_end   - when false, a file whose largest user key equals the
//                   range limit is skipped during property collection.
//
// Returns the status of the table-property lookup if it fails, the base DB's
// status if the deletion fails, NotFound if the blob storage of the column
// family no longer exists, and the base DB's (OK) status otherwise.
Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
                                        const RangePtr* ranges, size_t n,
                                        bool include_end) {
  TablePropertiesCollection props;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  Version* version = nullptr;

  // Increment the ref count
  {
    InstrumentedMutexLock l(db_impl_->mutex());
    version = cfd->current();
    version->Ref();
  }

  auto* vstorage = version->storage_info();
  for (size_t i = 0; i < n; i++) {
    auto begin = ranges[i].start, end = ranges[i].limit;
    // Get all the files within range except L0, cause `DeleteFilesInRanges`
    // would not delete the files in L0.
    for (int level = 1; level < vstorage->num_non_empty_levels(); level++) {
      // Probe the level being iterated (not the range index `i`).
      if (vstorage->LevelFiles(level).empty() ||
          !vstorage->OverlapInLevel(level, begin, end)) {
        continue;
      }

      // Convert the user-key bounds into internal-key bounds; a null bound
      // stays null.
      InternalKey begin_storage, end_storage, *begin_key, *end_key;
      if (begin == nullptr) {
        begin_key = nullptr;
      } else {
        begin_storage.SetMinPossibleForUserKey(*begin);
        begin_key = &begin_storage;
      }
      if (end == nullptr) {
        end_key = nullptr;
      } else {
        end_storage.SetMaxPossibleForUserKey(*end);
        end_key = &end_storage;
      }

      std::vector<FileMetaData*> files;
      vstorage->GetCleanInputsWithinInterval(level, begin_key, end_key, &files,
                                             -1 /* hint_index */,
                                             nullptr /* file_index */);
      for (const auto& file_meta : files) {
        if (file_meta->being_compacted) {
          continue;
        }
        if (!include_end && end != nullptr &&
            cfd->user_comparator()->Compare(file_meta->largest.user_key(),
                                            *end) == 0) {
          continue;
        }
        auto fname =
            TableFileName(cfd->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                          file_meta->fd.GetPathId());
        if (props.count(fname) == 0) {
          std::shared_ptr<const TableProperties> table_properties;
          Status s =
              version->GetTableProperties(&table_properties, file_meta, &fname);
          if (s.ok() && table_properties) {
            props.insert({fname, table_properties});
          } else {
            // Release the Version reference taken above before bailing out,
            // otherwise the pinned Version is leaked.
            InstrumentedMutexLock unref_guard(db_impl_->mutex());
            version->Unref();
            return s;
          }
        }
      }
    }
  }

  // Decrement the ref count
  {
    InstrumentedMutexLock l(db_impl_->mutex());
    version->Unref();
  }

  auto cf_id = column_family->GetID();
  // Blob file number -> total size of the blob data referenced from the SSTs
  // that are about to be deleted.
  std::map<uint64_t, uint64_t> blob_files_size;
  for (auto& collection : props) {
    auto& prop = collection.second;
    auto ucp_iter = prop->user_collected_properties.find(
        BlobFileSizeCollector::kPropertiesName);
    // this sst file doesn't contain any blob index
    if (ucp_iter == prop->user_collected_properties.end()) {
      continue;
    }
    std::map<uint64_t, uint64_t> sst_blob_files_size;
    const std::string& str = ucp_iter->second;
    Slice slice{str};
    if (!BlobFileSizeCollector::Decode(&slice, &sst_blob_files_size)) {
      // TODO: Should treat it as background error and make DB read-only.
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "failed to decode table property, "
                      "deleted file: %s, property size: %" ROCKSDB_PRIszt ".",
                      collection.first.c_str(), str.size());
      assert(false);
      continue;
    }

    for (auto& it : sst_blob_files_size) {
      blob_files_size[it.first] += it.second;
    }
  }

  // Here could be a running compaction install a new version after obtain
  // current and before we call DeleteFilesInRange for the base DB. In this case
  // the properties we get could be inaccurate.
  // TODO: we can use the OnTableFileDeleted callback after adding table
  // property field to TableFileDeletionInfo.
  Status s =
      db_impl_->DeleteFilesInRanges(column_family, ranges, n, include_end);
  if (!s.ok()) return s;

  MutexLock l(&mutex_);
  auto bs = vset_->GetBlobStorage(cf_id).lock();
  if (!bs) {
    // TODO: Should treat it as background error and make DB read-only.
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Column family id: %" PRIu32 " not Found.", cf_id);
    return Status::NotFound("Column family id: " + std::to_string(cf_id) +
                            " not Found.");
  }

  // Sum of the sizes attributed to blob files that are not yet obsolete; this
  // is the amount subtracted from LIVE_BLOB_SIZE below.
  uint64_t delta = 0;
  for (const auto& bfs : blob_files_size) {
    auto file = bs->FindFile(bfs.first).lock();
    if (!file) {
      // file has been gc out
      continue;
    }
    if (!file->is_obsolete()) {
      delta += bfs.second;
    }
    file->AddDiscardableSize(static_cast<uint64_t>(bfs.second));
  }
  SubStats(stats_.get(), cf_id, TitanInternalStats::LIVE_BLOB_SIZE, delta);
  bs->ComputeGCScore();

  AddToGCQueue(cf_id);
  MaybeScheduleGC();

  return s;
}
Options TitanDBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
assert(column_family != nullptr);
Options options = db_->GetOptions(column_family);
......@@ -674,7 +818,7 @@ Status TitanDBImpl::SetOptions(
return s;
}
}
// Make sure base db's SetOptions sucesss before setting blob_run_mode.
// Make sure base db's SetOptions success before setting blob_run_mode.
if (set_blob_run_mode) {
uint32_t cf_id = column_family->GetID();
{
......@@ -760,7 +904,7 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
// TODO: Should treat it as background error and make DB read-only.
ROCKS_LOG_ERROR(db_options_.info_log,
"OnFlushCompleted[%d]: failed to decode table property, "
"prroperty size: %" ROCKSDB_PRIszt ".",
"property size: %" ROCKSDB_PRIszt ".",
flush_job_info.job_id, ucp_iter->second.size());
assert(false);
}
......
......@@ -91,6 +91,10 @@ class TitanDBImpl : public TitanDB {
void ReleaseSnapshot(const Snapshot* snapshot) override;
// Override of TitanDB::DeleteFilesInRanges.
// `ranges` points to `n` RangePtr entries for `column_family`; `include_end`
// defaults to true.
Status DeleteFilesInRanges(ColumnFamilyHandle* column_family,
const RangePtr* ranges, size_t n,
bool include_end = true) override;
using TitanDB::GetOptions;
Options GetOptions(ColumnFamilyHandle* column_family) const override;
......
......@@ -33,6 +33,8 @@ class TitanDBTest : public testing::Test {
options_.create_if_missing = true;
options_.min_blob_size = 32;
options_.min_gc_batch_size = 1;
options_.merge_small_file_threshold = 0;
options_.disable_background_gc = true;
options_.blob_file_compression = CompressionType::kLZ4Compression;
DeleteDir(env_, options_.dirname);
DeleteDir(env_, dbname_);
......@@ -193,6 +195,20 @@ class TitanDBTest : public testing::Test {
}
}
void CompactAll() {
auto opts = db_->GetOptions();
auto compact_opts = CompactRangeOptions();
compact_opts.change_level = true;
compact_opts.target_level = opts.num_levels - 1;
compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
}
void DeleteFilesInRange(const Slice* begin, const Slice* end) {
RangePtr range(begin, end);
ASSERT_OK(db_->DeleteFilesInRanges(db_->DefaultColumnFamily(), &range, 1));
}
std::string GenKey(uint64_t i) {
char buf[64];
snprintf(buf, sizeof(buf), "k-%08" PRIu64, i);
......@@ -452,6 +468,79 @@ TEST_F(TitanDBTest, DropColumnFamily) {
Close();
}
// Checks that DeleteFilesInRanges removes the covered SST from the bottom
// level and that a following GC + purge drops the blob file referenced by the
// deleted SST.
TEST_F(TitanDBTest, DeleteFilesInRange) {
  Open();

  // First generation: keys 11, 21, ..., 91 (key = 10 * i + 1, value seed = i),
  // flushed as three SSTs [11, 21, 31] [41, 51] [61, 71, 81, 91].
  for (int i = 1; i <= 3; i++) {
    ASSERT_OK(db_->Put(WriteOptions(), GenKey(10 * i + 1), GenValue(i)));
  }
  Flush();
  for (int i = 4; i <= 5; i++) {
    ASSERT_OK(db_->Put(WriteOptions(), GenKey(10 * i + 1), GenValue(i)));
  }
  Flush();
  for (int i = 6; i <= 9; i++) {
    ASSERT_OK(db_->Put(WriteOptions(), GenKey(10 * i + 1), GenValue(i)));
  }
  Flush();

  // Push the first generation out of L0; all three SSTs must end up in L6.
  CompactAll();

  std::string num_files;
  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &num_files));
  ASSERT_EQ(num_files, "0");
  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &num_files));
  ASSERT_EQ(num_files, "3");

  // Second generation: keys 12, 22, ..., 92 (key = 10 * i + 2), flushed with
  // the same grouping; these SSTs stay in L0.
  for (int i = 1; i <= 3; i++) {
    ASSERT_OK(db_->Put(WriteOptions(), GenKey(10 * i + 2), GenValue(i)));
  }
  Flush();
  for (int i = 4; i <= 5; i++) {
    ASSERT_OK(db_->Put(WriteOptions(), GenKey(10 * i + 2), GenValue(i)));
  }
  Flush();
  for (int i = 6; i <= 9; i++) {
    ASSERT_OK(db_->Put(WriteOptions(), GenKey(10 * i + 2), GenValue(i)));
  }
  Flush();

  // The LSM structure is:
  // L0: [12, 22, 32] [42, 52] [62, 72, 82, 92]
  // L6: [11, 21, 31] [41, 51] [61, 71, 81, 91]
  // with 6 blob files
  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &num_files));
  ASSERT_EQ(num_files, "3");
  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &num_files));
  ASSERT_EQ(num_files, "3");

  // Delete files within [40, 70]; only the L6 file [41, 51] lies entirely
  // inside the range.
  const std::string range_begin = GenKey(40);
  const std::string range_end = GenKey(70);
  Slice start(range_begin);
  Slice end(range_end);
  DeleteFilesInRange(&start, &end);

  // Now the LSM structure is:
  // L0: [12, 22, 32] [42, 52] [62, 72, 82, 92]
  // L6: [11, 21, 31] [61, 71, 81, 91]
  // with 6 blob files
  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &num_files));
  ASSERT_EQ(num_files, "3");
  ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &num_files));
  ASSERT_EQ(num_files, "2");

  auto blob_storage = GetBlobStorage(db_->DefaultColumnFamily());
  auto files_before_gc = blob_storage.lock()->NumBlobFiles();
  ASSERT_EQ(files_before_gc, 6);

  ASSERT_OK(db_impl_->TEST_StartGC(db_->DefaultColumnFamily()->GetID()));
  ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());

  // The blob file of deleted SST should be GCed.
  ASSERT_EQ(files_before_gc - 1, blob_storage.lock()->NumBlobFiles());
  Close();
}
TEST_F(TitanDBTest, VersionEditError) {
Open();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment