Unverified Commit 45ca78eb authored by qupeng's avatar qupeng Committed by GitHub

compaction_filter: improve some APIs (#437)

Signed-off-by: 's avatarqupeng <qupeng@pingcap.com>
Co-authored-by: 's avatarzhangjinpeng1987 <zhangjinpeng@pingcap.com>
parent 74299c38
......@@ -31,3 +31,4 @@ path = "librocksdb_sys"
rand = "0.7"
crc = "1.8"
tempfile = "3.1"
tempdir = "0.3"
......@@ -302,6 +302,7 @@ struct crocksdb_compactionfilter_t : public CompactionFilter {
&c_new_value, &new_value_length, &c_value_changed);
if (c_value_changed) {
new_value->assign(c_new_value, new_value_length);
free(c_new_value);
*value_changed = true;
}
return result;
......@@ -3050,7 +3051,7 @@ crocksdb_compactionfilter_t* crocksdb_compactionfilter_create(
result->state_ = state;
result->destructor_ = destructor;
result->filter_ = filter;
result->ignore_snapshots_ = false;
result->ignore_snapshots_ = true;
result->name_ = name;
return result;
}
......
......@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
extern crate bzip2_sys;
extern crate libc;
......@@ -80,6 +79,10 @@ pub struct DBFlushOptions(c_void);
#[repr(C)]
pub struct DBCompactionFilter(c_void);
#[repr(C)]
pub struct DBCompactionFilterFactory(c_void);
#[repr(C)]
pub struct DBCompactionFilterContext(c_void);
#[repr(C)]
pub struct EnvOptions(c_void);
#[repr(C)]
pub struct SstFileReader(c_void);
......@@ -560,6 +563,10 @@ extern "C" {
options: *mut Options,
filter: *mut DBCompactionFilter,
);
pub fn crocksdb_options_set_compaction_filter_factory(
options: *mut Options,
filter: *mut DBCompactionFilterFactory,
);
pub fn crocksdb_options_set_create_if_missing(options: *mut Options, v: bool);
pub fn crocksdb_options_set_max_open_files(options: *mut Options, files: c_int);
pub fn crocksdb_options_set_max_total_wal_size(options: *mut Options, size: u64);
......@@ -1349,6 +1356,26 @@ extern "C" {
);
pub fn crocksdb_compactionfilter_destroy(filter: *mut DBCompactionFilter);
// Compaction filter context
pub fn crocksdb_compactionfiltercontext_is_full_compaction(
context: *const DBCompactionFilterContext,
) -> bool;
pub fn crocksdb_compactionfiltercontext_is_manual_compaction(
context: *const DBCompactionFilterContext,
) -> bool;
// Compaction filter factory
pub fn crocksdb_compactionfilterfactory_create(
state: *mut c_void,
destructor: extern "C" fn(*mut c_void),
create_compaction_filter: extern "C" fn(
*mut c_void,
*const DBCompactionFilterContext,
) -> *mut DBCompactionFilter,
name: extern "C" fn(*mut c_void) -> *const c_char,
) -> *mut DBCompactionFilterFactory;
pub fn crocksdb_compactionfilterfactory_destroy(factory: *mut DBCompactionFilterFactory);
// Env
pub fn crocksdb_default_env_create() -> *mut DBEnv;
pub fn crocksdb_mem_env_create() -> *mut DBEnv;
......
use crocksdb_ffi::{self, DBCompactionFilter};
use libc::{c_char, c_int, c_void, size_t};
use std::ffi::CString;
use std::slice;
use std::{mem, ptr, slice};
pub use crocksdb_ffi::DBCompactionFilter;
use crocksdb_ffi::{self, DBCompactionFilterContext, DBCompactionFilterFactory};
use libc::{c_char, c_int, c_void, malloc, memcpy, size_t};
/// `CompactionFilter` allows an application to modify/delete a key-value at
/// the time of compaction.
/// For more details, Please checkout rocksdb's documentation.
// TODO: support change value
pub trait CompactionFilter {
/// The compaction process invokes this
/// method for kv that is being compacted. A return value
......@@ -15,11 +16,18 @@ pub trait CompactionFilter {
/// indicates that this key-value should be removed from the
/// output of the compaction. The application can inspect
/// the existing value of the key and make decision based on it.
fn filter(&mut self, level: usize, key: &[u8], value: &[u8]) -> bool;
fn filter(
&mut self,
level: usize,
key: &[u8],
value: &[u8],
new_value: &mut Vec<u8>,
value_changed: &mut bool,
) -> bool;
}
#[repr(C)]
pub struct CompactionFilterProxy {
struct CompactionFilterProxy {
name: CString,
filter: Box<dyn CompactionFilter>,
}
......@@ -41,16 +49,37 @@ extern "C" fn filter(
key_len: size_t,
value: *const u8,
value_len: size_t,
_: *mut *mut u8,
_: *mut size_t,
new_value: *mut *mut u8,
new_value_len: *mut size_t,
value_changed: *mut bool,
) -> bool {
unsafe {
*new_value = ptr::null_mut();
*new_value_len = 0;
*value_changed = false;
let filter = &mut *(filter as *mut CompactionFilterProxy);
let key = slice::from_raw_parts(key, key_len);
let value = slice::from_raw_parts(value, value_len);
*value_changed = false;
filter.filter.filter(level as usize, key, value)
let mut new_value_v = Vec::default();
let filtered = filter.filter.filter(
level as usize,
key,
value,
&mut new_value_v,
mem::transmute(&mut *value_changed),
);
if *value_changed {
*new_value_len = new_value_v.len();
// The vector is allocated in Rust, so dup it before pass into C.
*new_value = malloc(*new_value_len) as *mut u8;
memcpy(
new_value as *mut c_void,
&new_value_v[0] as *const u8 as *const c_void,
*new_value_len,
);
}
filtered
}
}
......@@ -68,19 +97,206 @@ impl Drop for CompactionFilterHandle {
pub unsafe fn new_compaction_filter(
c_name: CString,
ignore_snapshots: bool,
f: Box<dyn CompactionFilter>,
) -> Result<CompactionFilterHandle, String> {
) -> CompactionFilterHandle {
let filter = new_compaction_filter_raw(c_name, f);
CompactionFilterHandle { inner: filter }
}
/// Just like `new_compaction_filter`, but returns a raw pointer instead of a RAII struct.
/// Generally used in `CompactionFilterFactory::create_compaction_filter`.
pub unsafe fn new_compaction_filter_raw(
c_name: CString,
f: Box<dyn CompactionFilter>,
) -> *mut DBCompactionFilter {
let proxy = Box::into_raw(Box::new(CompactionFilterProxy {
name: c_name,
filter: f,
}));
let filter = crocksdb_ffi::crocksdb_compactionfilter_create(
crocksdb_ffi::crocksdb_compactionfilter_create(proxy as *mut c_void, destructor, filter, name)
}
pub struct CompactionFilterContext(DBCompactionFilterContext);
impl CompactionFilterContext {
pub fn is_full_compaction(&self) -> bool {
let ctx = &self.0 as *const DBCompactionFilterContext;
unsafe { crocksdb_ffi::crocksdb_compactionfiltercontext_is_full_compaction(ctx) }
}
pub fn is_manual_compaction(&self) -> bool {
let ctx = &self.0 as *const DBCompactionFilterContext;
unsafe { crocksdb_ffi::crocksdb_compactionfiltercontext_is_manual_compaction(ctx) }
}
}
pub trait CompactionFilterFactory {
fn create_compaction_filter(
&self,
context: &CompactionFilterContext,
) -> *mut DBCompactionFilter;
}
#[repr(C)]
struct CompactionFilterFactoryProxy {
name: CString,
factory: Box<dyn CompactionFilterFactory>,
}
mod factory {
use super::{CompactionFilterContext, CompactionFilterFactoryProxy};
use crocksdb_ffi::{DBCompactionFilter, DBCompactionFilterContext};
use libc::{c_char, c_void};
pub(super) extern "C" fn name(factory: *mut c_void) -> *const c_char {
unsafe {
let proxy = &*(factory as *mut CompactionFilterFactoryProxy);
proxy.name.as_ptr()
}
}
pub(super) extern "C" fn destructor(factory: *mut c_void) {
unsafe {
Box::from_raw(factory as *mut CompactionFilterFactoryProxy);
}
}
pub(super) extern "C" fn create_compaction_filter(
factory: *mut c_void,
context: *const DBCompactionFilterContext,
) -> *mut DBCompactionFilter {
unsafe {
let factory = &mut *(factory as *mut CompactionFilterFactoryProxy);
let context: &CompactionFilterContext = &*(context as *const CompactionFilterContext);
factory.factory.create_compaction_filter(context)
}
}
}
pub struct CompactionFilterFactoryHandle {
pub inner: *mut DBCompactionFilterFactory,
}
impl Drop for CompactionFilterFactoryHandle {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_compactionfilterfactory_destroy(self.inner);
}
}
}
pub unsafe fn new_compaction_filter_factory(
c_name: CString,
f: Box<dyn CompactionFilterFactory>,
) -> Result<CompactionFilterFactoryHandle, String> {
let proxy = Box::into_raw(Box::new(CompactionFilterFactoryProxy {
name: c_name,
factory: f,
}));
let factory = crocksdb_ffi::crocksdb_compactionfilterfactory_create(
proxy as *mut c_void,
destructor,
filter,
name,
self::factory::destructor,
self::factory::create_compaction_filter,
self::factory::name,
);
crocksdb_ffi::crocksdb_compactionfilter_set_ignore_snapshots(filter, ignore_snapshots);
Ok(CompactionFilterHandle { inner: filter })
Ok(CompactionFilterFactoryHandle { inner: factory })
}
#[cfg(test)]
mod tests {
use std::ffi::CString;
use std::sync::mpsc::{self, SyncSender};
use std::time::Duration;
use super::{
CompactionFilter, CompactionFilterContext, CompactionFilterFactory, DBCompactionFilter,
};
use crate::{ColumnFamilyOptions, DBOptions, DB};
struct Factory(SyncSender<()>);
impl Drop for Factory {
fn drop(&mut self) {
self.0.send(()).unwrap();
}
}
impl CompactionFilterFactory for Factory {
fn create_compaction_filter(&self, _: &CompactionFilterContext) -> *mut DBCompactionFilter {
return std::ptr::null_mut();
}
}
struct Filter(SyncSender<()>);
impl Drop for Filter {
fn drop(&mut self) {
self.0.send(()).unwrap();
}
}
impl CompactionFilter for Filter {
fn filter(&mut self, _: usize, _: &[u8], _: &[u8], _: &mut Vec<u8>, _: &mut bool) -> bool {
false
}
}
#[test]
fn test_factory_destructor() {
let (tx, rx) = mpsc::sync_channel(1);
let mut cf_opts = ColumnFamilyOptions::default();
let name = CString::new("compaction filter factory").unwrap();
let factory = Box::new(Factory(tx)) as Box<dyn CompactionFilterFactory>;
cf_opts
.set_compaction_filter_factory(name, factory)
.unwrap();
drop(cf_opts);
assert!(rx.recv_timeout(Duration::from_secs(1)).is_ok());
let dir = tempdir::TempDir::new("compaction_filter").unwrap();
let path = dir.path().to_str().unwrap();
let (tx, rx) = mpsc::sync_channel(1);
let mut db_opts = DBOptions::default();
db_opts.create_if_missing(true);
let mut cfds = Vec::new();
cfds.push(("default", {
let mut cf_opts = ColumnFamilyOptions::default();
let name = CString::new("compaction filter factory").unwrap();
let factory = Box::new(Factory(tx)) as Box<dyn CompactionFilterFactory>;
cf_opts
.set_compaction_filter_factory(name, factory)
.unwrap();
cf_opts
}));
let db = DB::open_cf(db_opts, path, cfds);
drop(db);
assert!(rx.recv_timeout(Duration::from_secs(1)).is_ok());
}
#[test]
fn test_filter_destructor() {
let (tx, rx) = mpsc::sync_channel(1);
let mut cf_opts = ColumnFamilyOptions::default();
let name = CString::new("compaction filter factory").unwrap();
let filter = Box::new(Filter(tx)) as Box<dyn CompactionFilter>;
cf_opts.set_compaction_filter(name, filter).unwrap();
drop(cf_opts);
assert!(rx.recv_timeout(Duration::from_secs(1)).is_ok());
let dir = tempdir::TempDir::new("compaction_filter").unwrap();
let path = dir.path().to_str().unwrap();
let (tx, rx) = mpsc::sync_channel(1);
let mut db_opts = DBOptions::default();
db_opts.create_if_missing(true);
let mut cfds = Vec::new();
cfds.push(("default", {
let mut cf_opts = ColumnFamilyOptions::default();
let name = CString::new("compaction filter factory").unwrap();
let filter = Box::new(Filter(tx)) as Box<dyn CompactionFilter>;
cf_opts.set_compaction_filter(name, filter).unwrap();
cf_opts
}));
let db = DB::open_cf(db_opts, path, cfds);
drop(db);
assert!(rx.recv_timeout(Duration::from_secs(1)).is_ok());
}
}
......@@ -24,7 +24,11 @@ pub extern crate librocksdb_sys;
#[cfg(test)]
extern crate tempfile;
pub use compaction_filter::CompactionFilter;
pub use compaction_filter::{
new_compaction_filter, new_compaction_filter_factory, new_compaction_filter_raw,
CompactionFilter, CompactionFilterContext, CompactionFilterFactory,
CompactionFilterFactoryHandle, CompactionFilterHandle, DBCompactionFilter,
};
pub use event_listener::{
CompactionJobInfo, EventListener, FlushJobInfo, IngestionInfo, WriteStallInfo,
};
......
......@@ -13,7 +13,10 @@
// limitations under the License.
//
use compaction_filter::{new_compaction_filter, CompactionFilter, CompactionFilterHandle};
use compaction_filter::{
new_compaction_filter, new_compaction_filter_factory, CompactionFilter,
CompactionFilterFactory, CompactionFilterHandle,
};
use comparator::{self, compare_callback, ComparatorCallback};
use crocksdb_ffi::{
self, DBBlockBasedTableOptions, DBBottommostLevelCompaction, DBCompactOptions,
......@@ -1226,16 +1229,12 @@ impl ColumnFamilyOptions {
/// set.
///
/// By default, compaction will only pass keys written after the most
/// recent call to GetSnapshot() to filter. However, if `ignore_snapshots`
/// is set to true, even if the keys were written before the last snapshot
/// will be passed to filter too. For more details please checkout
/// rocksdb's documentation.
/// recent call to GetSnapshot() to filter.
///
/// See also `CompactionFilter`.
pub fn set_compaction_filter<S>(
&mut self,
name: S,
ignore_snapshots: bool,
filter: Box<dyn CompactionFilter>,
) -> Result<(), String>
where
......@@ -1246,11 +1245,32 @@ impl ColumnFamilyOptions {
Ok(s) => s,
Err(e) => return Err(format!("failed to convert to cstring: {:?}", e)),
};
self.filter = Some(new_compaction_filter(c_name, ignore_snapshots, filter)?);
crocksdb_ffi::crocksdb_options_set_compaction_filter(
self.inner,
self.filter.as_ref().unwrap().inner,
);
let filter = new_compaction_filter(c_name, filter);
crocksdb_ffi::crocksdb_options_set_compaction_filter(self.inner, filter.inner);
self.filter = Some(filter);
Ok(())
}
}
/// Set compaction filter factory.
///
/// See also `CompactionFilterFactory`.
pub fn set_compaction_filter_factory<S>(
&mut self,
name: S,
factory: Box<dyn CompactionFilterFactory>,
) -> Result<(), String>
where
S: Into<Vec<u8>>,
{
let c_name = match CString::new(name) {
Ok(s) => s,
Err(e) => return Err(format!("failed to convert to cstring: {:?}", e)),
};
unsafe {
let factory = new_compaction_filter_factory(c_name, factory)?;
crocksdb_ffi::crocksdb_options_set_compaction_filter_factory(self.inner, factory.inner);
std::mem::forget(factory); // Deconstructor will be called after `self` is dropped.
Ok(())
}
}
......
......@@ -24,7 +24,14 @@ struct Filter {
}
impl CompactionFilter for Filter {
fn filter(&mut self, _: usize, key: &[u8], value: &[u8]) -> bool {
fn filter(
&mut self,
_: usize,
key: &[u8],
value: &[u8],
_: &mut Vec<u8>,
_: &mut bool,
) -> bool {
self.filtered_kvs
.write()
.unwrap()
......@@ -42,63 +49,23 @@ impl Drop for Filter {
#[test]
fn test_compaction_filter() {
let path = tempdir_with_prefix("_rust_rocksdb_writebacktest");
let mut cf_opts = ColumnFamilyOptions::new();
let drop_called = Arc::new(AtomicBool::new(false));
let filtered_kvs = Arc::new(RwLock::new(vec![]));
// set ignore_snapshots to false
cf_opts
.set_compaction_filter(
"test",
false,
Box::new(Filter {
drop_called: drop_called.clone(),
filtered_kvs: filtered_kvs.clone(),
}),
)
.unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let db = DB::open_cf(
opts,
path.path().to_str().unwrap(),
vec![("default", cf_opts)],
)
.unwrap();
let samples = vec![
(b"key1".to_vec(), b"value1".to_vec()),
(b"key2".to_vec(), b"value2".to_vec()),
];
for &(ref k, ref v) in &samples {
db.put(k, v).unwrap();
assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
}
{
let _snap = db.snapshot();
// Because ignore_snapshots is false, so force compact will not effect
// the keys written before.
db.compact_range(Some(b"key1"), Some(b"key3"));
for &(ref k, ref v) in &samples {
assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
}
assert!(filtered_kvs.read().unwrap().is_empty());
}
drop(db);
// reregister with ignore_snapshots set to true
let mut cf_opts = ColumnFamilyOptions::new();
let opts = DBOptions::new();
cf_opts
.set_compaction_filter(
"test",
true,
Box::new(Filter {
drop_called: drop_called.clone(),
filtered_kvs: filtered_kvs.clone(),
}),
)
.unwrap();
assert!(drop_called.load(Ordering::Relaxed));
drop_called.store(false, Ordering::Relaxed);
let mut opts = DBOptions::new();
opts.create_if_missing(true);
{
let db = DB::open_cf(
opts,
......@@ -106,6 +73,17 @@ fn test_compaction_filter() {
vec![("default", cf_opts)],
)
.unwrap();
let samples = vec![
(b"key1".to_vec(), b"value1".to_vec()),
(b"key2".to_vec(), b"value2".to_vec()),
];
for &(ref k, ref v) in &samples {
db.put(k, v).unwrap();
assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
}
let _snap = db.snapshot();
// Because ignore_snapshots is true, so all the keys will be compacted.
db.compact_range(Some(b"key1"), Some(b"key3"));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment