Commit 812e1977 authored by Wu Jiayu, committed by yiwu-arbug

Use prefetcher for level merge (#116)

Since level merge reads blob files sequentially, we should use a prefetcher to accelerate merging.

experiment result:

Loading 300GB (average value size 1KB) on a Huawei NVMe device, the time spent reading blob files was reduced by about 50%.
Signed-off-by: wujy-cs <wujy.cs@gmail.com>
parent 0fc61eec
......@@ -35,11 +35,7 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
BlobRecord record;
PinnableSlice buffer;
auto storage = blob_storage_.lock();
assert(storage != nullptr);
ReadOptions options; // dummy option
Status get_status = storage->Get(options, index, &record, &buffer);
Status get_status = GetBlobRecord(index, &record, &buffer);
UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
&io_bytes_written_);
if (get_status.ok()) {
......@@ -85,8 +81,11 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
if (ShouldMerge(blob_file)) {
BlobRecord record;
PinnableSlice buffer;
Status s = storage->Get(ReadOptions(), index, &record, &buffer);
if (s.ok()) {
Status get_status = GetBlobRecord(index, &record, &buffer);
// If not ok, write original blob index as compaction output without
// doing level merge.
if (get_status.ok()) {
std::string index_value;
AddBlob(ikey.user_key, record.value, &index_value);
UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
......@@ -98,6 +97,11 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
base_builder_->Add(index_key, index_value);
return;
}
} else {
++error_read_cnt_;
ROCKS_LOG_DEBUG(db_options_.info_log,
"Read file %" PRIu64 " error during level merge: %s",
index.file_number, get_status.ToString().c_str());
}
}
base_builder_->Add(key, value);
......@@ -195,6 +199,11 @@ Status TitanTableBuilder::Finish() {
status_.ToString().c_str());
}
UpdateInternalOpStats();
if (error_read_cnt_ > 0) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Read file error %" PRIu64 " times during level merge",
error_read_cnt_);
}
return status();
}
......@@ -266,5 +275,29 @@ void TitanTableBuilder::UpdateInternalOpStats() {
}
}
// Reads the blob record referenced by `index`, routing the read through a
// per-blob-file prefetcher so that the sequential reads done during level
// merge benefit from readahead. Prefetchers are created lazily on first
// access to a file and cached in `input_file_prefetchers_` keyed by file
// number. Returns the status of prefetcher creation or of the read itself.
Status TitanTableBuilder::GetBlobRecord(const BlobIndex& index,
                                        BlobRecord* record,
                                        PinnableSlice* buffer) {
  Status s;
  auto iter = input_file_prefetchers_.find(index.file_number);
  if (iter == input_file_prefetchers_.end()) {
    // First read from this blob file: open a new prefetcher and cache it.
    auto storage = blob_storage_.lock();
    assert(storage != nullptr);
    std::unique_ptr<BlobFilePrefetcher> prefetcher;
    s = storage->NewPrefetcher(index.file_number, &prefetcher);
    if (!s.ok()) {
      // Creation failed; caller falls back (e.g. keeps the original blob
      // index as compaction output without merging).
      return s;
    }
    iter = input_file_prefetchers_
               .emplace(index.file_number, std::move(prefetcher))
               .first;
  }
  return iter->second->Get(ReadOptions(), index.blob_handle, record, buffer);
}
} // namespace titandb
} // namespace rocksdb
......@@ -57,6 +57,9 @@ class TitanTableBuilder : public TableBuilder {
void UpdateInternalOpStats();
Status GetBlobRecord(const BlobIndex& index, BlobRecord* record,
PinnableSlice* buffer);
Status status_;
uint32_t cf_id_;
TitanDBOptions db_options_;
......@@ -69,6 +72,8 @@ class TitanTableBuilder : public TableBuilder {
std::vector<
std::pair<std::shared_ptr<BlobFileMeta>, std::unique_ptr<BlobFileHandle>>>
finished_blobs_;
std::unordered_map<uint64_t, std::unique_ptr<BlobFilePrefetcher>>
input_file_prefetchers_;
TitanStats* stats_;
// target level in LSM-Tree for generated SSTs and blob files
......@@ -83,6 +88,7 @@ class TitanTableBuilder : public TableBuilder {
uint64_t bytes_written_ = 0;
uint64_t io_bytes_read_ = 0;
uint64_t io_bytes_written_ = 0;
uint64_t error_read_cnt_ = 0;
};
} // namespace titandb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment