Commit d7b40915 authored by Wu Jiayu's avatar Wu Jiayu Committed by yiwu-arbug

Limit blob file read IO with rate_limiter (#54)

Limit Titan blob file read IO with rate_limiter, to avoid burst of IO starve online workloads.
parent 9cd140f9
......@@ -28,7 +28,12 @@ Status NewBlobFileReader(uint64_t file_number, uint64_t readahead_size,
if (readahead_size > 0) {
file = NewReadaheadRandomAccessFile(std::move(file), readahead_size);
}
result->reset(new RandomAccessFileReader(std::move(file), file_name));
// Currently only `BlobGCJob` will call `NewBlobFileReader()`. We set
// `for_compaction=true` in this case to enable rate limiter.
result->reset(new RandomAccessFileReader(
std::move(file), file_name, nullptr /*env*/, nullptr /*stats*/,
0 /*hist_type*/, nullptr /*file_read_hist*/, env_options.rate_limiter,
true /*for compaction*/));
return s;
}
......
......@@ -131,8 +131,9 @@ class BlobGCJobTest : public testing::Test {
blob_gc->SetColumnFamily(cfh);
BlobGCJob blob_gc_job(blob_gc.get(), base_db_, mutex_, tdb_->db_options_,
tdb_->env_, EnvOptions(), tdb_->blob_manager_.get(),
version_set_, &log_buffer, nullptr, nullptr);
tdb_->env_, EnvOptions(options_),
tdb_->blob_manager_.get(), version_set_,
&log_buffer, nullptr, nullptr);
s = blob_gc_job.Prepare();
ASSERT_OK(s);
......@@ -260,6 +261,98 @@ TEST_F(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); }
TEST_F(BlobGCJobTest, RunGC) { TestRunGC(); }
TEST_F(BlobGCJobTest, GCLimiter) {
class TestLimiter : public RateLimiter {
public:
TestLimiter(RateLimiter::Mode mode)
: RateLimiter(mode), read(false), write(false) {}
size_t RequestToken(size_t bytes, size_t alignment,
Env::IOPriority io_priority, Statistics* stats,
RateLimiter::OpType op_type) override {
if (IsRateLimited(op_type)) {
if (op_type == RateLimiter::OpType::kRead) {
read = true;
} else {
write = true;
}
}
return bytes;
}
void SetBytesPerSecond(int64_t bytes_per_second) override {}
int64_t GetSingleBurstBytes() const override { return 0; }
int64_t GetTotalBytesThrough(
const Env::IOPriority pri = Env::IO_TOTAL) const override {
return 0;
}
int64_t GetTotalRequests(
const Env::IOPriority pri = Env::IO_TOTAL) const override {
return 0;
}
int64_t GetBytesPerSecond() const override { return 0; }
void Reset() {
read = false;
write = false;
}
bool ReadRequested() { return read; }
bool WriteRequested() { return write; }
private:
bool read;
bool write;
};
auto PutAndUpdate = [this] {
assert(db_);
for (int i = 0; i < MAX_KEY_NUM; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
for (int i = 0; i < MAX_KEY_NUM; i++) {
db_->Put(WriteOptions(), GenKey(i), GenValue(i));
}
Flush();
};
TestLimiter* test_limiter = new TestLimiter(RateLimiter::Mode::kWritesOnly);
options_.rate_limiter = std::shared_ptr<RateLimiter>(test_limiter);
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
ASSERT_TRUE(test_limiter->WriteRequested());
ASSERT_FALSE(test_limiter->ReadRequested());
DestroyDB();
test_limiter = new TestLimiter(RateLimiter::Mode::kReadsOnly);
options_.rate_limiter.reset(test_limiter);
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
ASSERT_FALSE(test_limiter->WriteRequested());
ASSERT_TRUE(test_limiter->ReadRequested());
DestroyDB();
test_limiter = new TestLimiter(RateLimiter::Mode::kAllIo);
options_.rate_limiter.reset(test_limiter);
NewDB();
PutAndUpdate();
test_limiter->Reset();
RunGC();
ASSERT_TRUE(test_limiter->WriteRequested());
ASSERT_TRUE(test_limiter->ReadRequested());
DestroyDB();
}
// Tests blob file will be kept after GC, if it is still visible by active
// snapshots.
TEST_F(BlobGCJobTest, PurgeBlobs) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment