Commit 4ba34cdd authored by Jay, committed by GitHub

support compaction filter (#43)

parent 87f2aa24
......@@ -34,6 +34,7 @@ pub enum DBCFHandle {}
pub enum DBWriteBatch {}
pub enum DBComparator {}
pub enum DBFlushOptions {}
pub enum DBCompactionFilter {}
pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy {
unsafe { rocksdb_filterpolicy_create_bloom(bits) }
......@@ -125,6 +126,8 @@ extern "C" {
threads: c_int);
pub fn rocksdb_options_optimize_level_style_compaction(
options: *mut DBOptions, memtable_memory_budget: c_int);
pub fn rocksdb_options_set_compaction_filter(options: *mut DBOptions,
filter: *mut DBCompactionFilter);
pub fn rocksdb_options_set_create_if_missing(options: *mut DBOptions, v: bool);
pub fn rocksdb_options_set_max_open_files(options: *mut DBOptions,
files: c_int);
......@@ -444,6 +447,8 @@ extern "C" {
range_limit_key: *const *const u8,
range_limit_key_len: *const size_t,
sizes: *mut uint64_t);
pub fn rocksdb_compact_range(db: *mut DBInstance, start_key: *const u8, start_key_len: size_t, limit_key: *const u8, limit_key_len: size_t);
pub fn rocksdb_compact_range_cf(db: *mut DBInstance, cf: *mut DBCFHandle, start_key: *const u8, start_key_len: size_t, limit_key: *const u8, limit_key_len: size_t);
pub fn rocksdb_delete_file_in_range(db: *mut DBInstance,
range_start_key: *const u8,
range_start_key_len: size_t,
......@@ -464,6 +469,13 @@ extern "C" {
cf: *mut DBCFHandle,
propname: *const c_char)
-> *mut c_char;
// Compaction filter
pub fn rocksdb_compactionfilter_create(state: *mut c_void,
destructor: extern fn(*mut c_void),
filter: extern fn(*mut c_void, c_int, *const u8, size_t, *const u8, size_t, *mut *mut u8, *mut size_t, *mut bool) -> bool,
name: extern fn(*mut c_void) -> *const c_char) -> *mut DBCompactionFilter;
pub fn rocksdb_compactionfilter_set_ignore_snapshots(filter: *mut DBCompactionFilter, ignore_snapshot: bool);
pub fn rocksdb_compactionfilter_destroy(filter: *mut DBCompactionFilter);
}
#[cfg(test)]
......
use libc::{c_void, c_char, c_int, size_t};
use rocksdb_ffi::{self, DBCompactionFilter};
use std::ffi::CString;
use std::slice;
/// A hook that lets an application inspect — and drop — key-value pairs
/// while RocksDB compacts them.
///
/// For the full semantics, see rocksdb's own documentation.
// TODO: support change value
pub trait CompactionFilter {
    /// Called once for each key-value pair the compaction visits.
    ///
    /// Returning `true` removes the pair from the compaction output;
    /// returning `false` preserves it. `level` is the compaction level
    /// the entry currently lives on, and `key`/`value` expose the
    /// existing entry so the decision can be based on its contents.
    fn filter(&mut self, level: usize, key: &[u8], value: &[u8]) -> bool;
}
/// Trampoline state handed to the RocksDB C API as the opaque `state`
/// pointer for the compaction-filter callbacks.
///
/// `new_compaction_filter` leaks a boxed instance via `Box::into_raw`;
/// the `name`, `filter` and `destructor` extern "C" functions cast that
/// pointer back to `*mut CompactionFilterProxy` to reach these fields.
#[repr(C)]
pub struct CompactionFilterProxy {
    // NUL-terminated filter name reported back to RocksDB by `name`.
    name: CString,
    // User-supplied filter consulted for every key-value pair.
    filter: Box<CompactionFilter>,
}
/// C callback: returns the filter's name as a NUL-terminated C string.
///
/// `filter` is the opaque state pointer registered in
/// `new_compaction_filter`, i.e. a `*mut CompactionFilterProxy`; the
/// returned pointer stays valid for as long as that proxy is alive.
extern "C" fn name(filter: *mut c_void) -> *const c_char {
    unsafe { (*(filter as *mut CompactionFilterProxy)).name.as_ptr() }
}
/// C callback: reclaims the `CompactionFilterProxy` that
/// `new_compaction_filter` leaked via `Box::into_raw`, dropping the
/// boxed user filter with it.
extern "C" fn destructor(filter: *mut c_void) {
    unsafe {
        // Rebuild the Box from the raw pointer and drop it explicitly.
        // The original discarded the reconstructed Box as an unused
        // expression; `drop(...)` states the intent and avoids the
        // unused-result lint on newer toolchains.
        drop(Box::from_raw(filter as *mut CompactionFilterProxy));
    }
}
/// C callback invoked by RocksDB for each key-value pair during
/// compaction.
///
/// Recovers the `CompactionFilterProxy` from the opaque `filter`
/// pointer, wraps the raw key/value buffers in slices, and delegates
/// the keep-or-drop decision to the user filter. The returned bool
/// follows the C API convention used by the trait: `true` removes the
/// entry from the compaction output, `false` keeps it.
extern "C" fn filter(filter: *mut c_void,
                     level: c_int,
                     key: *const u8,
                     key_len: size_t,
                     value: *const u8,
                     value_len: size_t,
                     // New-value out-parameters; unused because value
                     // rewriting is not supported yet (see the TODO on
                     // `CompactionFilter`).
                     _: *mut *mut u8,
                     _: *mut size_t,
                     value_changed: *mut bool)
                     -> bool {
    unsafe {
        let filter = &mut *(filter as *mut CompactionFilterProxy);
        // SAFETY-relevant: assumes RocksDB passes valid buffers of the
        // stated lengths for the lifetime of this call.
        let key = slice::from_raw_parts(key, key_len);
        let value = slice::from_raw_parts(value, value_len);
        // We never replace the value, so report it as unchanged.
        *value_changed = false;
        filter.filter.filter(level as usize, key, value)
    }
}
/// Owning wrapper around a raw `*mut DBCompactionFilter`; the C object
/// is destroyed when this handle is dropped.
pub struct CompactionFilterHandle {
    pub inner: *mut DBCompactionFilter,
}
impl Drop for CompactionFilterHandle {
    // Releases the C-side compaction filter. NOTE(review): presumably
    // the C destroy routine also invokes the registered `destructor`
    // callback to free the proxy state — confirm against the RocksDB
    // C API before relying on it.
    fn drop(&mut self) {
        unsafe {
            rocksdb_ffi::rocksdb_compactionfilter_destroy(self.inner);
        }
    }
}
/// Registers `f` with the RocksDB C layer and returns an owning handle.
///
/// A `CompactionFilterProxy` holding the name and the user filter is
/// moved to the heap and its raw address is handed to RocksDB as the
/// callback state; the `destructor` callback reclaims it later.
///
/// # Safety
///
/// Calls directly into the RocksDB C API. The returned handle owns the
/// C filter object and must outlive any options it is registered with.
pub unsafe fn new_compaction_filter
    (c_name: CString,
     ignore_snapshots: bool,
     f: Box<CompactionFilter>)
     -> Result<CompactionFilterHandle, String> {
    // Leak the proxy to the heap; ownership passes to the C layer
    // until `destructor` rebuilds and drops the Box.
    let proxy = CompactionFilterProxy {
        name: c_name,
        filter: f,
    };
    let state = Box::into_raw(Box::new(proxy)) as *mut c_void;
    let raw_filter = rocksdb_ffi::rocksdb_compactionfilter_create(state,
                                                                  destructor,
                                                                  filter,
                                                                  name);
    rocksdb_ffi::rocksdb_compactionfilter_set_ignore_snapshots(raw_filter,
                                                               ignore_snapshots);
    Ok(CompactionFilterHandle { inner: raw_filter })
}
......@@ -23,10 +23,13 @@ pub mod rocksdb;
pub mod rocksdb_options;
pub mod merge_operator;
pub mod comparator;
mod compaction_filter;
pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType,
DBRecoveryMode, new_bloom_filter, self as rocksdb_ffi};
pub use compaction_filter::CompactionFilter;
pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType, DBRecoveryMode,
new_bloom_filter, self as rocksdb_ffi};
pub use merge_operator::MergeOperands;
pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, CFHandle};
pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch,
CFHandle};
pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions,
WriteOptions};
......@@ -35,7 +35,9 @@ pub struct CFHandle {
impl Drop for CFHandle {
fn drop(&mut self) {
unsafe { rocksdb_ffi::rocksdb_column_family_handle_destroy(self.inner); }
unsafe {
rocksdb_ffi::rocksdb_column_family_handle_destroy(self.inner);
}
}
}
......@@ -348,12 +350,12 @@ impl DB {
let mut err = 0 as *mut c_char;
let db = unsafe {
rocksdb_ffi::rocksdb_open_column_families(opts.inner,
cpath.as_ptr(),
cfs_v.len() as c_int,
cfnames.as_ptr(),
cfopts.as_ptr(),
cfhandles.as_ptr(),
&mut err)
cpath.as_ptr(),
cfs_v.len() as c_int,
cfnames.as_ptr(),
cfopts.as_ptr(),
cfhandles.as_ptr(),
&mut err)
};
if !err.is_null() {
return Err(error_message(err));
......@@ -386,7 +388,9 @@ impl DB {
let cpath = CString::new(path.as_bytes()).unwrap();
let mut err = 0 as *mut c_char;
unsafe {
rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr(), &mut err);
rocksdb_ffi::rocksdb_destroy_db(opts.inner,
cpath.as_ptr(),
&mut err);
}
if !err.is_null() {
return Err(error_message(err));
......@@ -398,7 +402,9 @@ impl DB {
let cpath = CString::new(path.as_bytes()).unwrap();
let mut err = 0 as *mut c_char;
unsafe {
rocksdb_ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr(), &mut err);
rocksdb_ffi::rocksdb_repair_db(opts.inner,
cpath.as_ptr(),
&mut err);
}
if !err.is_null() {
return Err(error_message(err));
......@@ -406,11 +412,15 @@ impl DB {
Ok(())
}
pub fn list_column_families(opts: &Options, path: &str) -> Result<Vec<String>, String> {
pub fn list_column_families(opts: &Options,
path: &str)
-> Result<Vec<String>, String> {
let cpath = match CString::new(path.as_bytes()) {
Ok(c) => c,
Err(_) => {
return Err("Failed to convert path to CString when list column families".to_owned())
return Err("Failed to convert path to CString when list \
column families"
.to_owned())
}
};
......@@ -418,19 +428,23 @@ impl DB {
unsafe {
let mut lencf: size_t = 0;
let mut err = 0 as *mut c_char;
let list = rocksdb_ffi::rocksdb_list_column_families(opts.inner,
cpath.as_ptr(),
&mut lencf,
&mut err);
let list =
rocksdb_ffi::rocksdb_list_column_families(opts.inner,
cpath.as_ptr(),
&mut lencf,
&mut err);
if !err.is_null() {
return Err(error_message(err));
}
let list_cfs = slice::from_raw_parts(list, lencf);
for &cf_name in list_cfs {
let cf = match CStr::from_ptr(cf_name).to_owned().into_string() {
Ok(s) => s,
Err(e) => return Err(format!("invalid utf8 bytes: {:?}", e)),
};
let cf =
match CStr::from_ptr(cf_name).to_owned().into_string() {
Ok(s) => s,
Err(e) => {
return Err(format!("invalid utf8 bytes: {:?}", e))
}
};
cfs.push(cf);
}
rocksdb_ffi::rocksdb_list_column_families_destroy(list, lencf);
......@@ -478,13 +492,12 @@ impl DB {
let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t;
let mut err = 0 as *mut c_char;
let val =
rocksdb_ffi::rocksdb_get(self.inner,
readopts.get_inner(),
key.as_ptr(),
key.len() as size_t,
val_len_ptr,
&mut err);
let val = rocksdb_ffi::rocksdb_get(self.inner,
readopts.get_inner(),
key.as_ptr(),
key.len() as size_t,
val_len_ptr,
&mut err);
if !err.is_null() {
return Err(error_message(err));
}
......@@ -509,14 +522,13 @@ impl DB {
let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t;
let mut err = 0 as *mut c_char;
let val =
rocksdb_ffi::rocksdb_get_cf(self.inner,
readopts.get_inner(),
cf.inner,
key.as_ptr(),
key.len() as size_t,
val_len_ptr,
&mut err);
let val = rocksdb_ffi::rocksdb_get_cf(self.inner,
readopts.get_inner(),
cf.inner,
key.as_ptr(),
key.len() as size_t,
val_len_ptr,
&mut err);
if !err.is_null() {
return Err(error_message(err));
}
......@@ -564,10 +576,8 @@ impl DB {
Entry::Occupied(mut e) => {
e.insert(handle);
e.into_mut()
},
Entry::Vacant(e) => {
e.insert(handle)
}
Entry::Vacant(e) => e.insert(handle),
})
}
}
......@@ -838,6 +848,30 @@ impl DB {
sizes
}
/// Manually compacts the given key range in the default column family.
///
/// `start_key` and `end_key` are passed straight through to
/// `rocksdb_compact_range`; see rocksdb's documentation for the exact
/// bound semantics.
pub fn compact_range(&self, start_key: &[u8], end_key: &[u8]) {
    unsafe {
        rocksdb_ffi::rocksdb_compact_range(self.inner,
                                           start_key.as_ptr(),
                                           start_key.len() as size_t,
                                           end_key.as_ptr(),
                                           // Cast added for consistency
                                           // with start_key and with the
                                           // declared FFI signature.
                                           end_key.len() as size_t);
    }
}
/// Manually compacts the given key range in the column family `cf`.
///
/// Same semantics as `compact_range`, scoped to one column family.
pub fn compact_range_cf(&self,
                        cf: &CFHandle,
                        start_key: &[u8],
                        end_key: &[u8]) {
    unsafe {
        rocksdb_ffi::rocksdb_compact_range_cf(self.inner,
                                              cf.inner,
                                              start_key.as_ptr(),
                                              start_key.len() as size_t,
                                              end_key.as_ptr(),
                                              // Cast added for consistency
                                              // with start_key and the
                                              // declared FFI signature.
                                              end_key.len() as size_t);
    }
}
pub fn delete_file_in_range(&self,
start_key: &[u8],
end_key: &[u8])
......@@ -1248,7 +1282,8 @@ mod test {
#[test]
fn list_column_families_test() {
let path = TempDir::new("_rust_rocksdb_list_column_families_test").expect("");
let path = TempDir::new("_rust_rocksdb_list_column_families_test")
.expect("");
let mut cfs = ["default", "cf1", "cf2", "cf3"];
{
let mut cfs_opts = vec![];
......@@ -1259,7 +1294,8 @@ mod test {
let mut opts = Options::new();
opts.create_if_missing(true);
let mut db = DB::open(&opts, path.path().to_str().unwrap()).unwrap();
let mut db = DB::open(&opts, path.path().to_str().unwrap())
.unwrap();
for (&cf, &cf_opts) in cfs.iter().zip(&cfs_ref_opts) {
if cf == "default" {
continue;
......@@ -1268,7 +1304,10 @@ mod test {
}
}
let opts_list_cfs = Options::new();
let mut cfs_vec = DB::list_column_families(&opts_list_cfs, path.path().to_str().unwrap()).unwrap();
let mut cfs_vec =
DB::list_column_families(&opts_list_cfs,
path.path().to_str().unwrap())
.unwrap();
cfs_vec.sort();
cfs.sort();
assert_eq!(cfs_vec, cfs);
......
......@@ -13,6 +13,8 @@
// limitations under the License.
//
use compaction_filter::{CompactionFilter, new_compaction_filter,
CompactionFilterHandle};
use comparator::{self, ComparatorCallback, compare_callback};
use libc::{c_int, size_t};
use merge_operator::{self, MergeOperatorCallback, full_merge_callback,
......@@ -20,7 +22,8 @@ use merge_operator::{self, MergeOperatorCallback, full_merge_callback,
use merge_operator::MergeFn;
use rocksdb_ffi::{self, DBOptions, DBWriteOptions, DBBlockBasedTableOptions,
DBReadOptions, DBCompressionType, DBRecoveryMode, DBSnapshot, DBInstance};
DBReadOptions, DBCompressionType, DBRecoveryMode,
DBSnapshot, DBInstance};
use std::ffi::CString;
use std::mem;
......@@ -213,6 +216,7 @@ impl WriteOptions {
/// Owned wrapper around a raw rocksdb `DBOptions` pointer.
pub struct Options {
    pub inner: *mut DBOptions,
    // Keeps the registered compaction filter alive for as long as these
    // options are; replaced (dropping the previous handle) each time
    // `set_compaction_filter` is called.
    filter: Option<CompactionFilterHandle>,
}
impl Drop for Options {
......@@ -228,7 +232,10 @@ impl Default for Options {
unsafe {
let opts = rocksdb_ffi::rocksdb_options_create();
assert!(!opts.is_null(), "Could not create rocksdb options");
Options { inner: opts }
Options {
inner: opts,
filter: None,
}
}
}
}
......@@ -253,6 +260,44 @@ impl Options {
}
}
/// Set the compaction filter.
///
/// The filter is dropped when this option is dropped or when a new
/// filter is set.
///
/// By default, compaction passes only keys written after the most
/// recent call to GetSnapshot() to the filter. If `ignore_snapshots`
/// is true, keys written before the last snapshot are passed to the
/// filter as well. For more details please check rocksdb's
/// documentation.
///
/// See also `CompactionFilter`.
pub fn set_compaction_filter<S>(&mut self,
                                name: S,
                                ignore_snapshots: bool,
                                filter: Box<CompactionFilter>)
                                -> Result<(), String>
    where S: Into<Vec<u8>>
{
    // Validate the name first; `CString::new` fails on interior NUL
    // bytes. This is safe code, so keep it outside the unsafe block
    // (the original wrapped the whole body in `unsafe`).
    let c_name = match CString::new(name) {
        Ok(s) => s,
        Err(e) => {
            return Err(format!("failed to convert to cstring: {:?}", e))
        }
    };
    unsafe {
        // Installing the new handle drops the previous one, if any.
        self.filter = Some(try!(new_compaction_filter(c_name,
                                                      ignore_snapshots,
                                                      filter)));
        rocksdb_ffi::rocksdb_options_set_compaction_filter(self.inner,
                                                           self.filter
                                                               .as_ref()
                                                               .unwrap()
                                                               .inner);
    }
    Ok(())
}
pub fn create_if_missing(&mut self, create_if_missing: bool) {
unsafe {
rocksdb_ffi::rocksdb_options_set_create_if_missing(
......
......@@ -4,3 +4,4 @@ extern crate tempdir;
mod test_iterator;
mod test_multithreaded;
mod test_column_family;
mod test_compaction_filter;
use tempdir::TempDir;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, Ordering};
use rocksdb::{Writable, DB, CompactionFilter, Options};
// Test fixture: records which key-value pairs were offered to the
// compaction filter and whether the filter itself was dropped.
struct Filter {
    // Set to true by `Drop::drop` when the filter is destroyed.
    drop_called: Arc<AtomicBool>,
    // Every (key, value) pair the compaction passed to `filter`.
    filtered_kvs: Arc<RwLock<Vec<(Vec<u8>, Vec<u8>)>>>,
}
impl CompactionFilter for Filter {
    // Record every pair the compaction offers us, then ask RocksDB to
    // drop it from the output by returning true.
    fn filter(&mut self, _: usize, key: &[u8], value: &[u8]) -> bool {
        let pair = (key.to_vec(), value.to_vec());
        self.filtered_kvs.write().unwrap().push(pair);
        true
    }
}
impl Drop for Filter {
    // Flag that the filter was destroyed, so the test can verify that
    // replacing the filter or dropping the options releases it.
    fn drop(&mut self) {
        self.drop_called.store(true, Ordering::Relaxed);
    }
}
#[test]
fn test_compaction_filter() {
    // Prefix named after THIS test; the original used
    // "_rust_rocksdb_writebacktest", copy-pasted from another test,
    // which made failure artifacts misleading.
    let path = TempDir::new("_rust_rocksdb_compaction_filter_test")
        .expect("");
    let mut opts = Options::new();
    let drop_called = Arc::new(AtomicBool::new(false));
    let filtered_kvs = Arc::new(RwLock::new(vec![]));
    // Register with ignore_snapshots set to false.
    opts.set_compaction_filter("test", false, Box::new(Filter {
        drop_called: drop_called.clone(),
        filtered_kvs: filtered_kvs.clone(),
    })).unwrap();
    opts.create_if_missing(true);
    let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap();
    let samples = vec![
        (b"key1".to_vec(), b"value1".to_vec()),
        (b"key2".to_vec(), b"value2".to_vec()),
    ];
    for &(ref k, ref v) in &samples {
        db.put(k, v).unwrap();
        assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
    }
    {
        let _snap = db.snapshot();
        // Because ignore_snapshots is false, a forced compaction must
        // not affect keys written before the snapshot.
        db.compact_range(b"key1", b"key3");
        for &(ref k, ref v) in &samples {
            assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
        }
        assert!(filtered_kvs.read().unwrap().is_empty());
    }
    drop(db);
    // Re-register with ignore_snapshots set to true; installing the new
    // filter must drop the old one.
    opts.set_compaction_filter("test", true, Box::new(Filter {
        drop_called: drop_called.clone(),
        filtered_kvs: filtered_kvs.clone(),
    })).unwrap();
    assert!(drop_called.load(Ordering::Relaxed));
    drop_called.store(false, Ordering::Relaxed);
    {
        let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap();
        let _snap = db.snapshot();
        // Because ignore_snapshots is true, all keys are compacted away
        // even under a live snapshot.
        db.compact_range(b"key1", b"key3");
        for &(ref k, _) in &samples {
            assert!(db.get(k).unwrap().is_none());
        }
        assert_eq!(*filtered_kvs.read().unwrap(), samples);
    }
    drop(opts);
    assert!(drop_called.load(Ordering::Relaxed));
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment