Unverified Commit 8c84a1a4 authored by Wu Jiayu's avatar Wu Jiayu Committed by GitHub

Live data based GC for level merge (#77)

This patch implemented live data based GC for level merge.

Delete blob files whose discardable size equals (file_size - file header - file footer) in OnCompactionCompleted().
Mark blob files on the last two levels to be merged in the next compaction if their discardable size reaches the GC threshold.
Use live-data-based GC instead of regular GC whenever level merge is enabled.
Signed-off-by: Yi Wu <yiwu@pingcap.com>
parent 04aab1c4
......@@ -26,7 +26,8 @@ struct TitanDBOptions : public DBOptions {
// Default: 1
int32_t max_background_gc{1};
// How often to schedule delete obsolete blob files periods
// How often to schedule delete obsolete blob files periods.
// If set zero, obsolete blob files won't be deleted.
//
// Default: 10
uint32_t purge_obsolete_files_period_sec{10}; // 10s
......
......@@ -77,17 +77,17 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
}
uint64_t total_length = 0;
FixedSlice<kBlobHeaderSize> header_buffer;
FixedSlice<kRecordHeaderSize> header_buffer;
iterate_offset_ = BlobFileHeader::kEncodedLength;
for (; iterate_offset_ < offset; iterate_offset_ += total_length) {
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(iterate_offset_, kBlobHeaderSize, &header_buffer,
status_ = file_->Read(iterate_offset_, kRecordHeaderSize, &header_buffer,
header_buffer.get(), true /*for_compaction*/);
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
total_length = kBlobHeaderSize + decoder_.GetRecordSize();
total_length = kRecordHeaderSize + decoder_.GetRecordSize();
}
if (iterate_offset_ > offset) iterate_offset_ -= total_length;
......@@ -95,10 +95,10 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
}
void BlobFileIterator::GetBlobRecord() {
FixedSlice<kBlobHeaderSize> header_buffer;
FixedSlice<kRecordHeaderSize> header_buffer;
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(iterate_offset_, kBlobHeaderSize, &header_buffer,
status_ = file_->Read(iterate_offset_, kRecordHeaderSize, &header_buffer,
header_buffer.get(), true /*for_compaction*/);
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
......@@ -109,7 +109,7 @@ void BlobFileIterator::GetBlobRecord() {
buffer_.resize(record_size);
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(iterate_offset_ + kBlobHeaderSize, record_size,
status_ = file_->Read(iterate_offset_ + kRecordHeaderSize, record_size,
&record_slice, buffer_.data(), true /*for_compaction*/);
if (status_.ok()) {
status_ =
......@@ -118,7 +118,7 @@ void BlobFileIterator::GetBlobRecord() {
if (!status_.ok()) return;
cur_record_offset_ = iterate_offset_;
cur_record_size_ = kBlobHeaderSize + record_size;
cur_record_size_ = kRecordHeaderSize + record_size;
iterate_offset_ += cur_record_size_;
valid_ = true;
}
......@@ -138,7 +138,7 @@ void BlobFileIterator::PrefetchAndGet() {
readahead_size_ = kMinReadaheadSize;
}
auto min_blob_size =
iterate_offset_ + kBlobHeaderSize + titan_cf_options_.min_blob_size;
iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size;
if (readahead_end_offset_ <= min_blob_size) {
while (readahead_end_offset_ + readahead_size_ <= min_blob_size &&
readahead_size_ < kMaxReadaheadSize)
......
......@@ -155,7 +155,7 @@ TEST_F(BlobFileIteratorTest, IterateForPrev) {
while ((idx = Random::GetTLSInstance()->Uniform(n)) == 0)
;
blob_file_iterator_->IterateForPrev(handles[idx].offset - kBlobHeaderSize -
blob_file_iterator_->IterateForPrev(handles[idx].offset - kRecordHeaderSize -
1);
ASSERT_OK(blob_file_iterator_->status());
blob_file_iterator_->Next();
......
......@@ -56,7 +56,7 @@ Status BlobDecoder::DecodeHeader(Slice* src) {
if (!GetFixed32(src, &crc_)) {
return Status::Corruption("BlobHeader");
}
header_crc_ = crc32c::Value(src->data(), kBlobHeaderSize - 4);
header_crc_ = crc32c::Value(src->data(), kRecordHeaderSize - 4);
unsigned char compression;
if (!GetFixed32(src, &record_size_) || !GetChar(src, &compression)) {
......@@ -218,6 +218,13 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) {
assert(state_ != FileState::kObsolete);
state_ = FileState::kObsolete;
break;
case FileEvent::kNeedMerge:
if (state_ == FileState::kToMerge) {
break;
}
assert(state_ == FileState::kNormal);
state_ = FileState::kToMerge;
break;
default:
assert(false);
}
......
......@@ -12,10 +12,10 @@ namespace titandb {
// Blob file overall format:
//
// [blob file header]
// [blob head + record 1]
// [blob head + record 2]
// [record head + record 1]
// [record head + record 2]
// ...
// [blob head + record N]
// [record head + record N]
// [blob file footer]
// Format of blob head (9 bytes):
......@@ -26,7 +26,9 @@ namespace titandb {
// | Fixed32 | Fixed32 | char |
// +---------+---------+-------------+
//
const uint64_t kBlobHeaderSize = 9;
const uint64_t kBlobHeaderSize = 8;
const uint64_t kRecordHeaderSize = 9;
const uint64_t kBlobFooterSize = BlockHandle::kMaxEncodedLength + 8 + 4;
// Format of blob record (not fixed size):
//
......@@ -64,7 +66,7 @@ class BlobEncoder {
size_t GetEncodedSize() const { return sizeof(header_) + record_.size(); }
private:
char header_[kBlobHeaderSize];
char header_[kRecordHeaderSize];
Slice record_;
std::string record_buffer_;
std::string compressed_buffer_;
......@@ -164,6 +166,7 @@ class BlobFileMeta {
kFlushOrCompactionOutput,
kDbRestart,
kDelete,
kNeedMerge,
};
enum class FileState {
......@@ -173,6 +176,7 @@ class BlobFileMeta {
kBeingGC, // being gced
kPendingGC, // output of gc, waiting gc finish and keys adding to LSM
kObsolete, // already gced, but wait to be physical deleted
kToMerge, // need merge to new blob file in next compaction
};
BlobFileMeta() = default;
......@@ -212,6 +216,9 @@ class BlobFileMeta {
void AddDiscardableSize(uint64_t _discardable_size);
double GetDiscardableRatio() const;
bool NoLiveData() {
return discardable_size_ == file_size_ - kBlobHeaderSize - kBlobFooterSize;
}
TitanInternalStats::StatsType GetDiscardableRatioLevel() const;
private:
......@@ -270,7 +277,7 @@ struct BlobFileHeader {
struct BlobFileFooter {
// The first 64bits from $(echo titandb/blob | sha1sum).
static const uint64_t kFooterMagicNumber{0x2be0a6148e39edc6ull};
static const uint64_t kEncodedLength{BlockHandle::kMaxEncodedLength + 8 + 4};
static const uint64_t kEncodedLength{kBlobFooterSize};
BlockHandle meta_index_handle{BlockHandle::NullBlockHandle()};
......
......@@ -527,6 +527,59 @@ TEST_F(BlobGCJobTest, DeleteFilesInRange) {
DestroyDB();
}
// Verifies live-data-based GC with level merge enabled: after compacting
// updated keys down to the bottom level, blob files with no live data are
// marked kObsolete, a bottom-level file whose discardable ratio exceeds
// blob_file_discardable_ratio is marked kToMerge, and the freshly written
// bottom-level file stays kNormal.
TEST_F(BlobGCJobTest, LevelMergeGC) {
options_.level_merge = true;
options_.level_compaction_dynamic_level_bytes = true;
options_.blob_file_discardable_ratio = 0.5;
// Disable the background obsolete-file purger so files stay in the
// kObsolete state long enough for the assertions below to observe it.
options_.purge_obsolete_files_period_sec = 0;
NewDB();
ColumnFamilyMetaData cf_meta;
std::vector<std::string> to_compact;
auto opts = db_->GetOptions();
// Write an initial batch of keys; the flush produces blob file 2
// (blob file numbering starts from 2 — see comment further down).
for (int i = 0; i < 10; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
CheckBlobNumber(1);
// compact level0 file to last level
db_->GetColumnFamilyMetaData(base_db_->DefaultColumnFamily(), &cf_meta);
to_compact.push_back(cf_meta.levels[0].files[0].name);
db_->CompactFiles(CompactionOptions(), base_db_->DefaultColumnFamily(),
to_compact, opts.num_levels - 1);
CheckBlobNumber(2);
// update most of keys
// Keys 1..9 overlap the first batch, so most of blob file 3's entries
// supersede the bottom-level file; key 10 is new.
for (int i = 1; i < 11; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
CheckBlobNumber(3);
// compact new level0 file to last level
db_->GetColumnFamilyMetaData(base_db_->DefaultColumnFamily(), &cf_meta);
to_compact[0] = cf_meta.levels[0].files[0].name;
db_->CompactFiles(CompactionOptions(), base_db_->DefaultColumnFamily(),
to_compact, opts.num_levels - 1);
CheckBlobNumber(4);
auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock();
// blob file number is start from 2, they are: old level0 blob file, old
// last level blob file, new level0 blob file, new last level blob file
// respectively.
// File 2: all entries rewritten by the first compaction -> no live data.
ASSERT_EQ(b->FindFile(2).lock()->file_state(),
BlobFileMeta::FileState::kObsolete);
// File 3: most (but not all) entries superseded, so its discardable
// ratio exceeds the 0.5 threshold -> flagged for merge.
ASSERT_EQ(b->FindFile(3).lock()->file_state(),
BlobFileMeta::FileState::kToMerge);
// File 4: the second level-0 file, fully rewritten by the second
// compaction -> no live data.
ASSERT_EQ(b->FindFile(4).lock()->file_state(),
BlobFileMeta::FileState::kObsolete);
// File 5: the just-written bottom-level file holds all live data.
ASSERT_EQ(b->FindFile(5).lock()->file_state(),
BlobFileMeta::FileState::kNormal);
DestroyDB();
}
} // namespace titandb
} // namespace rocksdb
......
......@@ -143,7 +143,8 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
TitanDBImpl::~TitanDBImpl() { Close(); }
void TitanDBImpl::StartBackgroundTasks() {
if (thread_purge_obsolete_ == nullptr) {
if (thread_purge_obsolete_ == nullptr &&
db_options_.purge_obsolete_files_period_sec > 0) {
thread_purge_obsolete_.reset(new rocksdb::RepeatableThread(
[this]() { TitanDBImpl::PurgeObsoleteFiles(); }, "titanbg", env_,
db_options_.purge_obsolete_files_period_sec * 1000 * 1000));
......@@ -159,10 +160,6 @@ void TitanDBImpl::StartBackgroundTasks() {
Status TitanDBImpl::ValidateOptions(
const TitanDBOptions& options,
const std::vector<TitanCFDescriptor>& column_families) const {
if (options.purge_obsolete_files_period_sec == 0) {
return Status::InvalidArgument(
"Require non-zero purge_obsolete_files_period_sec");
}
for (const auto& cf : column_families) {
if (cf.options.level_merge &&
!cf.options.level_compaction_dynamic_level_bytes) {
......@@ -752,7 +749,7 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
}
auto cf_id = column_family->GetID();
std::map<uint64_t, uint64_t> blob_files_size;
std::map<uint64_t, uint64_t> blob_files_discardable_size;
for (auto& collection : props) {
auto& prop = collection.second;
auto ucp_iter = prop->user_collected_properties.find(
......@@ -775,7 +772,7 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
}
for (auto& it : sst_blob_files_size) {
blob_files_size[it.first] += it.second;
blob_files_discardable_size[it.first] += it.second;
}
}
......@@ -799,7 +796,9 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
}
uint64_t delta = 0;
for (const auto& bfs : blob_files_size) {
VersionEdit edit;
auto cf_options = bs->cf_options();
for (const auto& bfs : blob_files_discardable_size) {
auto file = bs->FindFile(bfs.first).lock();
if (!file) {
// file has been gc out
......@@ -815,12 +814,25 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
AddStats(stats_.get(), cf_id, after, 1);
SubStats(stats_.get(), cf_id, before, 1);
}
if (cf_options.level_merge) {
if (file->NoLiveData()) {
edit.DeleteBlobFile(file->file_number(),
db_impl_->GetLatestSequenceNumber());
} else if (file->GetDiscardableRatio() >
cf_options.blob_file_discardable_ratio) {
file->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
}
}
}
SubStats(stats_.get(), cf_id, TitanInternalStats::LIVE_BLOB_SIZE, delta);
bs->ComputeGCScore();
if (cf_options.level_merge) {
blob_file_set_->LogAndApply(edit);
} else {
bs->ComputeGCScore();
AddToGCQueue(cf_id);
MaybeScheduleGC();
AddToGCQueue(cf_id);
MaybeScheduleGC();
}
return s;
}
......@@ -1003,7 +1015,7 @@ void TitanDBImpl::OnCompactionCompleted(
// TODO: Clean up blob file generated by the failed compaction.
return;
}
std::map<uint64_t, int64_t> blob_files_size;
std::map<uint64_t, int64_t> blob_files_size_diff;
std::set<uint64_t> outputs;
std::set<uint64_t> inputs;
auto calc_bfs = [&](const std::vector<std::string>& files, int coefficient,
......@@ -1046,9 +1058,10 @@ void TitanDBImpl::OnCompactionCompleted(
} else {
inputs.insert(input_bfs.first);
}
auto bfs_iter = blob_files_size.find(input_bfs.first);
if (bfs_iter == blob_files_size.end()) {
blob_files_size[input_bfs.first] = coefficient * input_bfs.second;
auto bfs_iter = blob_files_size_diff.find(input_bfs.first);
if (bfs_iter == blob_files_size_diff.end()) {
blob_files_size_diff[input_bfs.first] =
coefficient * input_bfs.second;
} else {
bfs_iter->second += coefficient * input_bfs.second;
}
......@@ -1089,7 +1102,9 @@ void TitanDBImpl::OnCompactionCompleted(
}
uint64_t delta = 0;
for (const auto& bfs : blob_files_size) {
VersionEdit edit;
auto cf_options = bs->cf_options();
for (const auto& bfs : blob_files_size_diff) {
// blob file size < 0 means discardable size > 0
if (bfs.second >= 0) {
continue;
......@@ -1109,13 +1124,34 @@ void TitanDBImpl::OnCompactionCompleted(
AddStats(stats_.get(), compaction_job_info.cf_id, after, 1);
SubStats(stats_.get(), compaction_job_info.cf_id, before, 1);
}
if (cf_options.level_merge) {
// After level merge, most entries of merged blob files are written to
// new blob files. Delete blob files which have no live data.
// Mark last two level blob files to merge in next compaction if
// discardable size reached GC threshold
if (file->NoLiveData()) {
edit.DeleteBlobFile(file->file_number(),
db_impl_->GetLatestSequenceNumber());
} else if (static_cast<int>(file->file_level()) >=
cf_options.num_levels - 2 &&
file->GetDiscardableRatio() >
cf_options.blob_file_discardable_ratio) {
file->FileStateTransit(BlobFileMeta::FileEvent::kNeedMerge);
}
}
}
SubStats(stats_.get(), compaction_job_info.cf_id,
TitanInternalStats::LIVE_BLOB_SIZE, delta);
bs->ComputeGCScore();
// If level merge is enabled, blob files will be deleted by live
// data based GC, so we don't need to trigger regular GC anymore
if (cf_options.level_merge) {
blob_file_set_->LogAndApply(edit);
} else {
bs->ComputeGCScore();
AddToGCQueue(compaction_job_info.cf_id);
MaybeScheduleGC();
AddToGCQueue(compaction_job_info.cf_id);
MaybeScheduleGC();
}
}
}
......
......@@ -220,7 +220,10 @@ TableProperties TitanTableBuilder::GetTableProperties() const {
bool TitanTableBuilder::ShouldMerge(
const std::shared_ptr<rocksdb::titandb::BlobFileMeta>& file) {
return file != nullptr && (int)file->file_level() < target_level_;
assert(cf_options_.level_merge);
return file != nullptr &&
(static_cast<int>(file->file_level()) < target_level_ ||
file->file_state() == BlobFileMeta::FileState::kToMerge);
}
void TitanTableBuilder::UpdateInternalOpStats() {
......
......@@ -61,8 +61,9 @@ class TitanOptionsTest : public testing::Test {
TitanDB* titan_db = nullptr;
};
TEST_F(TitanOptionsTest, PurgeObsoleteFilesPeriodSec) {
titan_options_.purge_obsolete_files_period_sec = 0;
TEST_F(TitanOptionsTest, LevelMerge) {
titan_options_.level_merge = true;
titan_options_.level_compaction_dynamic_level_bytes = false;
Status s = Open();
ASSERT_TRUE(s.IsInvalidArgument());
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment