Commit 61d419d3 authored by UncP's avatar UncP Committed by zhangjinpeng1987

add `rate limiter` related api (#147)

parent fbf2fa5b
...@@ -2423,6 +2423,35 @@ void crocksdb_ratelimiter_destroy(crocksdb_ratelimiter_t *limiter) { ...@@ -2423,6 +2423,35 @@ void crocksdb_ratelimiter_destroy(crocksdb_ratelimiter_t *limiter) {
delete limiter; delete limiter;
} }
void crocksdb_ratelimiter_set_bytes_per_second(crocksdb_ratelimiter_t *limiter,
int64_t rate_bytes_per_sec) {
limiter->rep->SetBytesPerSecond(rate_bytes_per_sec);
}
int64_t crocksdb_ratelimiter_get_singleburst_bytes(crocksdb_ratelimiter_t *limiter) {
return limiter->rep->GetSingleBurstBytes();
}
void crocksdb_ratelimiter_request(crocksdb_ratelimiter_t *limiter,
int64_t bytes, unsigned char pri) {
limiter->rep->Request(bytes, static_cast<Env::IOPriority>(pri), nullptr);
}
int64_t crocksdb_ratelimiter_get_total_bytes_through(
crocksdb_ratelimiter_t *limiter, unsigned char pri) {
return limiter->rep->GetTotalBytesThrough(static_cast<Env::IOPriority>(pri));
}
int64_t crocksdb_ratelimiter_get_bytes_per_second(
crocksdb_ratelimiter_t *limiter) {
return limiter->rep->GetBytesPerSecond();
}
int64_t crocksdb_ratelimiter_get_total_requests(
crocksdb_ratelimiter_t *limiter, unsigned char pri) {
return limiter->rep->GetTotalRequests(static_cast<Env::IOPriority>(pri));
}
/* /*
TODO: TODO:
DB::OpenForReadOnly DB::OpenForReadOnly
......
...@@ -988,6 +988,23 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_delayed_write_rate( ...@@ -988,6 +988,23 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_delayed_write_rate(
extern C_ROCKSDB_LIBRARY_API crocksdb_ratelimiter_t* crocksdb_ratelimiter_create( extern C_ROCKSDB_LIBRARY_API crocksdb_ratelimiter_t* crocksdb_ratelimiter_create(
int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness); int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness);
extern C_ROCKSDB_LIBRARY_API void crocksdb_ratelimiter_destroy(crocksdb_ratelimiter_t*); extern C_ROCKSDB_LIBRARY_API void crocksdb_ratelimiter_destroy(crocksdb_ratelimiter_t*);
extern C_ROCKSDB_LIBRARY_API void crocksdb_ratelimiter_set_bytes_per_second(
crocksdb_ratelimiter_t *limiter, int64_t rate_bytes_per_sec);
extern C_ROCKSDB_LIBRARY_API int64_t crocksdb_ratelimiter_get_singleburst_bytes(
crocksdb_ratelimiter_t *limiter);
enum {
env_io_priority_low = 0,
env_io_priority_high = 1,
env_io_priority_total = 2,
};
extern C_ROCKSDB_LIBRARY_API void crocksdb_ratelimiter_request(crocksdb_ratelimiter_t *limiter,
int64_t bytes, unsigned char pri);
extern C_ROCKSDB_LIBRARY_API int64_t crocksdb_ratelimiter_get_total_bytes_through(
crocksdb_ratelimiter_t *limiter, unsigned char pri);
extern C_ROCKSDB_LIBRARY_API int64_t crocksdb_ratelimiter_get_bytes_per_second(
crocksdb_ratelimiter_t *limiter);
extern C_ROCKSDB_LIBRARY_API int64_t crocksdb_ratelimiter_get_total_requests(
crocksdb_ratelimiter_t *limiter, unsigned char pri);
/* Compaction Filter */ /* Compaction Filter */
......
...@@ -512,6 +512,21 @@ extern "C" { ...@@ -512,6 +512,21 @@ extern "C" {
fairness: i32, fairness: i32,
) -> *mut DBRateLimiter; ) -> *mut DBRateLimiter;
pub fn crocksdb_ratelimiter_destroy(limiter: *mut DBRateLimiter); pub fn crocksdb_ratelimiter_destroy(limiter: *mut DBRateLimiter);
pub fn crocksdb_ratelimiter_set_bytes_per_second(
limiter: *mut DBRateLimiter,
bytes_per_sec: i64,
);
pub fn crocksdb_ratelimiter_get_singleburst_bytes(limiter: *mut DBRateLimiter) -> i64;
pub fn crocksdb_ratelimiter_request(limiter: *mut DBRateLimiter, bytes: i64, pri: c_uchar);
pub fn crocksdb_ratelimiter_get_total_bytes_through(
limiter: *mut DBRateLimiter,
pri: c_uchar,
) -> i64;
pub fn crocksdb_ratelimiter_get_bytes_per_second(limiter: *mut DBRateLimiter) -> i64;
pub fn crocksdb_ratelimiter_get_total_requests(
limiter: *mut DBRateLimiter,
pri: c_uchar,
) -> i64;
pub fn crocksdb_options_set_soft_pending_compaction_bytes_limit(options: *mut Options, v: u64); pub fn crocksdb_options_set_soft_pending_compaction_bytes_limit(options: *mut Options, v: u64);
pub fn crocksdb_options_set_hard_pending_compaction_bytes_limit(options: *mut Options, v: u64); pub fn crocksdb_options_set_hard_pending_compaction_bytes_limit(options: *mut Options, v: u64);
pub fn crocksdb_options_set_compaction_priority(options: *mut Options, v: CompactionPriority); pub fn crocksdb_options_set_compaction_priority(options: *mut Options, v: CompactionPriority);
...@@ -950,13 +965,18 @@ extern "C" { ...@@ -950,13 +965,18 @@ extern "C" {
pub fn crocksdb_fifo_compaction_options_create() -> *mut DBFifoCompactionOptions; pub fn crocksdb_fifo_compaction_options_create() -> *mut DBFifoCompactionOptions;
pub fn crocksdb_fifo_compaction_options_set_max_table_files_size( pub fn crocksdb_fifo_compaction_options_set_max_table_files_size(
fifo_opts: *mut DBFifoCompactionOptions, size: uint64_t); fifo_opts: *mut DBFifoCompactionOptions,
size: uint64_t,
);
pub fn crocksdb_fifo_compaction_options_set_ttl( pub fn crocksdb_fifo_compaction_options_set_ttl(
fifo_opts: *mut DBFifoCompactionOptions, ttl: uint64_t); fifo_opts: *mut DBFifoCompactionOptions,
ttl: uint64_t,
);
pub fn crocksdb_fifo_compaction_options_set_allow_compaction( pub fn crocksdb_fifo_compaction_options_set_allow_compaction(
fifo_opts: *mut DBFifoCompactionOptions, allow_compaction: bool); fifo_opts: *mut DBFifoCompactionOptions,
pub fn crocksdb_fifo_compaction_options_destroy( allow_compaction: bool,
fifo_opts: *mut DBFifoCompactionOptions); );
pub fn crocksdb_fifo_compaction_options_destroy(fifo_opts: *mut DBFifoCompactionOptions);
pub fn crocksdb_compact_range( pub fn crocksdb_compact_range(
db: *mut DBInstance, db: *mut DBInstance,
......
...@@ -40,8 +40,9 @@ pub use merge_operator::MergeOperands; ...@@ -40,8 +40,9 @@ pub use merge_operator::MergeOperands;
pub use rocksdb::{BackupEngine, CFHandle, DBIterator, DBVector, Kv, Range, SeekKey, SstFileWriter, pub use rocksdb::{BackupEngine, CFHandle, DBIterator, DBVector, Kv, Range, SeekKey, SstFileWriter,
Writable, WriteBatch, DB}; Writable, WriteBatch, DB};
pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions, pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions,
EnvOptions, HistogramData, IngestExternalFileOptions, ReadOptions, EnvOptions, FifoCompactionOptions, HistogramData,
RestoreOptions, WriteOptions, FifoCompactionOptions}; IngestExternalFileOptions, RateLimiter, ReadOptions, RestoreOptions,
WriteOptions};
pub use slice_transform::SliceTransform; pub use slice_transform::SliceTransform;
pub use table_properties::{TableProperties, TablePropertiesCollection, pub use table_properties::{TableProperties, TablePropertiesCollection,
TablePropertiesCollectionView, UserCollectedProperties}; TablePropertiesCollectionView, UserCollectedProperties};
......
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
use compaction_filter::{new_compaction_filter, CompactionFilter, CompactionFilterHandle}; use compaction_filter::{new_compaction_filter, CompactionFilter, CompactionFilterHandle};
use comparator::{self, compare_callback, ComparatorCallback}; use comparator::{self, compare_callback, ComparatorCallback};
use crocksdb_ffi::{self, DBBlockBasedTableOptions, DBCompactOptions, DBCompressionType, use crocksdb_ffi::{self, DBBlockBasedTableOptions, DBCompactOptions, DBCompressionType,
DBFlushOptions, DBInfoLogLevel, DBInstance, DBRateLimiter, DBReadOptions, DBFifoCompactionOptions, DBFlushOptions, DBInfoLogLevel, DBInstance,
DBRecoveryMode, DBRestoreOptions, DBSnapshot, DBStatisticsHistogramType, DBRateLimiter, DBReadOptions, DBRecoveryMode, DBRestoreOptions, DBSnapshot,
DBStatisticsTickerType, DBWriteOptions, DBFifoCompactionOptions, Options}; DBStatisticsHistogramType, DBStatisticsTickerType, DBWriteOptions, Options};
use event_listener::{new_event_listener, EventListener}; use event_listener::{new_event_listener, EventListener};
use libc::{self, c_double, c_int, c_uchar, c_void, size_t}; use libc::{self, c_double, c_int, c_uchar, c_void, size_t};
use merge_operator::{self, full_merge_callback, partial_merge_callback, MergeOperatorCallback}; use merge_operator::{self, full_merge_callback, partial_merge_callback, MergeOperatorCallback};
...@@ -164,6 +164,34 @@ impl RateLimiter { ...@@ -164,6 +164,34 @@ impl RateLimiter {
}; };
RateLimiter { inner: limiter } RateLimiter { inner: limiter }
} }
pub fn set_bytes_per_second(&mut self, bytes_per_sec: i64) {
unsafe {
crocksdb_ffi::crocksdb_ratelimiter_set_bytes_per_second(self.inner, bytes_per_sec);
}
}
pub fn get_singleburst_bytes(&self) -> i64 {
unsafe { crocksdb_ffi::crocksdb_ratelimiter_get_singleburst_bytes(self.inner) }
}
pub fn request(&mut self, bytes: i64, pri: c_uchar) {
unsafe {
crocksdb_ffi::crocksdb_ratelimiter_request(self.inner, bytes, pri);
}
}
pub fn get_total_bytes_through(&self, pri: c_uchar) -> i64 {
unsafe { crocksdb_ffi::crocksdb_ratelimiter_get_total_bytes_through(self.inner, pri) }
}
pub fn get_bytes_per_second(&self) -> i64 {
unsafe { crocksdb_ffi::crocksdb_ratelimiter_get_bytes_per_second(self.inner) }
}
pub fn get_total_requests(&self, pri: c_uchar) -> i64 {
unsafe { crocksdb_ffi::crocksdb_ratelimiter_get_total_requests(self.inner, pri) }
}
} }
impl Drop for RateLimiter { impl Drop for RateLimiter {
...@@ -1394,14 +1422,18 @@ impl FifoCompactionOptions { ...@@ -1394,14 +1422,18 @@ impl FifoCompactionOptions {
pub fn set_max_table_files_size(&mut self, max_table_files_size: u64) { pub fn set_max_table_files_size(&mut self, max_table_files_size: u64) {
unsafe { unsafe {
crocksdb_ffi::crocksdb_fifo_compaction_options_set_max_table_files_size( crocksdb_ffi::crocksdb_fifo_compaction_options_set_max_table_files_size(
self.inner, max_table_files_size); self.inner,
max_table_files_size,
);
} }
} }
pub fn set_allow_compaction(&mut self, allow_compaction: bool) { pub fn set_allow_compaction(&mut self, allow_compaction: bool) {
unsafe { unsafe {
crocksdb_ffi::crocksdb_fifo_compaction_options_set_allow_compaction( crocksdb_ffi::crocksdb_fifo_compaction_options_set_allow_compaction(
self.inner, allow_compaction); self.inner,
allow_compaction,
);
} }
} }
} }
......
...@@ -18,3 +18,4 @@ mod test_event_listener; ...@@ -18,3 +18,4 @@ mod test_event_listener;
mod test_delete_range; mod test_delete_range;
mod test_delete_files_in_range; mod test_delete_files_in_range;
mod test_read_only; mod test_read_only;
mod test_rate_limiter;
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use rocksdb::RateLimiter;
#[test]
fn test_rate_limiter() {
let mut rate_limiter = RateLimiter::new(10 * 1024 * 1024, 100 * 1000, 10);
assert_eq!(rate_limiter.get_singleburst_bytes(), 1 * 1024 * 1024);
rate_limiter.set_bytes_per_second(20 * 1024 * 1024);
assert_eq!(rate_limiter.get_bytes_per_second(), 20 * 1024 * 1024);
assert_eq!(rate_limiter.get_singleburst_bytes(), 2 * 1024 * 1024);
let low = 0;
let high = 1;
let total = 2;
assert_eq!(rate_limiter.get_total_bytes_through(total), 0);
rate_limiter.request(1024 * 1024, low);
assert_eq!(rate_limiter.get_total_bytes_through(low), 1024 * 1024);
rate_limiter.request(2048 * 1024, high);
assert_eq!(rate_limiter.get_total_bytes_through(high), 2048 * 1024);
assert_eq!(rate_limiter.get_total_bytes_through(total), 3072 * 1024);
}
...@@ -11,8 +11,9 @@ ...@@ -11,8 +11,9 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use rocksdb::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions, ReadOptions, use rocksdb::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions,
SeekKey, SliceTransform, Writable, WriteOptions, FifoCompactionOptions, DB}; FifoCompactionOptions, ReadOptions, SeekKey, SliceTransform, Writable, WriteOptions,
DB};
use rocksdb::crocksdb_ffi::{CompactionPriority, DBCompressionType, DBInfoLogLevel as InfoLogLevel, use rocksdb::crocksdb_ffi::{CompactionPriority, DBCompressionType, DBInfoLogLevel as InfoLogLevel,
DBStatisticsHistogramType as HistogramType, DBStatisticsHistogramType as HistogramType,
DBStatisticsTickerType as TickerType}; DBStatisticsTickerType as TickerType};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment