Commit 339a4651 authored by zhangjinpeng1987's avatar zhangjinpeng1987

add rate limiter to limit disk IO when doing compaction and flush

parent 93f4208e
...@@ -35,6 +35,7 @@ pub enum DBWriteBatch {} ...@@ -35,6 +35,7 @@ pub enum DBWriteBatch {}
pub enum DBComparator {} pub enum DBComparator {}
pub enum DBFlushOptions {} pub enum DBFlushOptions {}
pub enum DBCompactionFilter {} pub enum DBCompactionFilter {}
pub enum DBRateLimiter {}
pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy { pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy {
unsafe { rocksdb_filterpolicy_create_bloom(bits) } unsafe { rocksdb_filterpolicy_create_bloom(bits) }
...@@ -186,6 +187,12 @@ extern "C" { ...@@ -186,6 +187,12 @@ extern "C" {
pub fn rocksdb_options_statistics_get_string(options: *mut DBOptions) -> *const c_char; pub fn rocksdb_options_statistics_get_string(options: *mut DBOptions) -> *const c_char;
pub fn rocksdb_options_set_stats_dump_period_sec(options: *mut DBOptions, v: usize); pub fn rocksdb_options_set_stats_dump_period_sec(options: *mut DBOptions, v: usize);
pub fn rocksdb_options_set_num_levels(options: *mut DBOptions, v: c_int); pub fn rocksdb_options_set_num_levels(options: *mut DBOptions, v: c_int);
pub fn rocksdb_options_set_ratelimiter(options: *mut DBOptions, limiter: *mut DBRateLimiter);
pub fn rocksdb_ratelimiter_create(rate_bytes_per_sec: i64,
refill_period_us: i64,
fairness: i32)
-> *mut DBRateLimiter;
pub fn rocksdb_ratelimiter_destroy(limiter: *mut DBRateLimiter);
pub fn rocksdb_filterpolicy_create_bloom_full(bits_per_key: c_int) -> *mut DBFilterPolicy; pub fn rocksdb_filterpolicy_create_bloom_full(bits_per_key: c_int) -> *mut DBFilterPolicy;
pub fn rocksdb_filterpolicy_create_bloom(bits_per_key: c_int) -> *mut DBFilterPolicy; pub fn rocksdb_filterpolicy_create_bloom(bits_per_key: c_int) -> *mut DBFilterPolicy;
pub fn rocksdb_open(options: *mut DBOptions, pub fn rocksdb_open(options: *mut DBOptions,
......
...@@ -20,7 +20,8 @@ use merge_operator::{self, MergeOperatorCallback, full_merge_callback, partial_m ...@@ -20,7 +20,8 @@ use merge_operator::{self, MergeOperatorCallback, full_merge_callback, partial_m
use merge_operator::MergeFn; use merge_operator::MergeFn;
use rocksdb_ffi::{self, DBOptions, DBWriteOptions, DBBlockBasedTableOptions, DBReadOptions, use rocksdb_ffi::{self, DBOptions, DBWriteOptions, DBBlockBasedTableOptions, DBReadOptions,
DBCompressionType, DBRecoveryMode, DBSnapshot, DBInstance, DBFlushOptions}; DBCompressionType, DBRecoveryMode, DBSnapshot, DBInstance, DBFlushOptions,
DBRateLimiter};
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::mem; use std::mem;
...@@ -87,6 +88,31 @@ impl BlockBasedOptions { ...@@ -87,6 +88,31 @@ impl BlockBasedOptions {
} }
} }
pub struct RateLimiter {
inner: *mut DBRateLimiter,
}
impl RateLimiter {
pub fn new(rate_bytes_per_sec: i64,
refill_period_us: i64,
fairness: i32) -> RateLimiter {
let limiter = unsafe {
rocksdb_ffi::rocksdb_ratelimiter_create(rate_bytes_per_sec,
refill_period_us,
fairness)
};
RateLimiter {
inner: limiter,
}
}
}
impl Drop for RateLimiter {
fn drop(&mut self) {
unsafe { rocksdb_ffi::rocksdb_ratelimiter_destroy(self.inner) }
}
}
/// The UnsafeSnap must be destroyed by db, it maybe be leaked /// The UnsafeSnap must be destroyed by db, it maybe be leaked
/// if not using it properly, hence named as unsafe. /// if not using it properly, hence named as unsafe.
/// ///
...@@ -542,6 +568,15 @@ impl Options { ...@@ -542,6 +568,15 @@ impl Options {
rocksdb_ffi::rocksdb_options_set_num_levels(self.inner, n); rocksdb_ffi::rocksdb_options_set_num_levels(self.inner, n);
} }
} }
pub fn set_ratelimiter(&mut self, rate_bytes_per_sec: i64) {
let rate_limiter = RateLimiter::new(rate_bytes_per_sec,
100 * 1000 /* 100ms should work for most cases */,
10 /* should be good by leaving it at default 10 */);
unsafe {
rocksdb_ffi::rocksdb_options_set_ratelimiter(self.inner, rate_limiter.inner);
}
}
} }
pub struct FlushOptions { pub struct FlushOptions {
......
...@@ -11,3 +11,14 @@ fn test_set_num_levels() { ...@@ -11,3 +11,14 @@ fn test_set_num_levels() {
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap(); let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
drop(db); drop(db);
} }
#[test]
fn test_set_ratelimiter() {
let path = TempDir::new("_rust_rocksdb_test_set_rate_limiter").expect("");
let mut opts = Options::new();
opts.create_if_missing(true);
// compaction and flush rate limited below 100MB/sec
opts.set_ratelimiter(100 * 1024 * 1024);
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
drop(db);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment