Unverified Commit d26d0a1f authored by Croxx's avatar Croxx Committed by GitHub

expose key boundaries of compaction filter context (#630)

expose `start_key` and `end_key` in `CompactionFilterContext`.
Signed-off-by: 's avatarMrCroxx <mrcroxx@outlook.com>
parent 2cd2f5a2
......@@ -3449,6 +3449,20 @@ crocksdb_table_properties_t* crocksdb_compactionfiltercontext_table_properties(
.get();
}
const char* crocksdb_compactionfiltercontext_start_key(
crocksdb_compactionfiltercontext_t* context, size_t* key_len) {
const Slice& result = context->rep.start_key;
*key_len = result.size();
return result.data();
}
const char* crocksdb_compactionfiltercontext_end_key(
crocksdb_compactionfiltercontext_t* context, size_t* key_len) {
const Slice& result = context->rep.end_key;
*key_len = result.size();
return result.data();
}
crocksdb_compactionfilterfactory_t* crocksdb_compactionfilterfactory_create(
void* state, void (*destructor)(void*),
crocksdb_compactionfilter_t* (*create_compaction_filter)(
......
......@@ -1579,6 +1579,14 @@ extern "C" {
context: *const DBCompactionFilterContext,
offset: usize,
) -> *const DBTableProperties;
pub fn crocksdb_compactionfiltercontext_start_key(
context: *const DBCompactionFilterContext,
key_len: *mut size_t,
) -> *const c_char;
pub fn crocksdb_compactionfiltercontext_end_key(
context: *const DBCompactionFilterContext,
key_len: *mut size_t,
) -> *const c_char;
// Compaction filter factory
pub fn crocksdb_compactionfilterfactory_create(
......
use std::ffi::CString;
use std::{ptr, slice};
use std::{ptr, slice, usize};
use crate::table_properties::TableProperties;
use crocksdb_ffi::CompactionFilterDecision as RawCompactionFilterDecision;
......@@ -199,6 +199,28 @@ impl CompactionFilterContext {
TableProperties::from_ptr(raw)
}
}
pub fn start_key(&self) -> &[u8] {
let ctx = &self.0 as *const DBCompactionFilterContext;
unsafe {
let mut start_key_len: usize = 0;
let start_key_ptr =
crocksdb_ffi::crocksdb_compactionfiltercontext_start_key(ctx, &mut start_key_len)
as *const u8;
slice::from_raw_parts(start_key_ptr, start_key_len)
}
}
pub fn end_key(&self) -> &[u8] {
let ctx = &self.0 as *const DBCompactionFilterContext;
unsafe {
let mut end_key_len: usize = 0;
let end_key_ptr =
crocksdb_ffi::crocksdb_compactionfiltercontext_end_key(ctx, &mut end_key_len)
as *const u8;
slice::from_raw_parts(end_key_ptr, end_key_len)
}
}
}
pub trait CompactionFilterFactory {
......@@ -300,16 +322,16 @@ pub unsafe fn new_compaction_filter_factory(
mod tests {
use libc::c_uchar;
use std::ffi::CString;
use std::str;
use std::sync::mpsc::{self, SyncSender};
use std::time::Duration;
use librocksdb_sys::DBTableFileCreationReason;
use super::{
new_compaction_filter_raw, CompactionFilter, CompactionFilterContext,
CompactionFilterFactory, DBCompactionFilter,
use crate::{
new_compaction_filter_raw, ColumnFamilyOptions, CompactionFilter, CompactionFilterContext,
CompactionFilterFactory, DBCompactionFilter, DBOptions, Writable, DB,
};
use crate::{ColumnFamilyOptions, DBOptions, Writable, DB};
struct Factory(SyncSender<()>);
impl Drop for Factory {
......@@ -335,6 +357,32 @@ mod tests {
}
}
struct KeyRangeFilter;
impl CompactionFilter for KeyRangeFilter {
fn filter(&mut self, _: usize, _: &[u8], _: &[u8], _: &mut Vec<u8>, _: &mut bool) -> bool {
false
}
}
struct KeyRangeFactory(SyncSender<Vec<u8>>);
impl CompactionFilterFactory for KeyRangeFactory {
fn create_compaction_filter(
&self,
context: &CompactionFilterContext,
) -> *mut DBCompactionFilter {
let start_key = context.start_key();
let end_key = context.end_key();
&self.0.send(start_key.to_owned()).unwrap();
&self.0.send(end_key.to_owned()).unwrap();
unsafe {
new_compaction_filter_raw(
CString::new("key_range_filter").unwrap(),
Box::new(KeyRangeFilter),
)
}
}
}
struct FlushFactory {}
struct FlushFilter {}
impl CompactionFilter for FlushFilter {
......@@ -426,6 +474,42 @@ mod tests {
assert!(rx.recv_timeout(Duration::from_secs(1)).is_ok());
}
#[test]
fn test_compaction_filter_factory_context_keys() {
let mut cf_opts = ColumnFamilyOptions::default();
let name = CString::new("compaction filter factory").unwrap();
let (tx, rx) = mpsc::sync_channel(2);
let factory = Box::new(KeyRangeFactory(tx)) as Box<dyn CompactionFilterFactory>;
cf_opts
.set_compaction_filter_factory(name, factory)
.unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let path = tempfile::Builder::new()
.prefix("test_factory_context_keys")
.tempdir()
.unwrap();
let mut db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
db.create_cf(("test", cf_opts)).unwrap();
let cfh = db.cf_handle("test").unwrap();
for i in 0..10 {
db.put_cf(
cfh,
format!("key{}", i).as_bytes(),
format!("value{}", i).as_bytes(),
)
.unwrap();
}
db.compact_range_cf(cfh, None, None);
let sk = rx.recv().unwrap();
let ek = rx.recv().unwrap();
println!("sk:{:?} ek:{:?}", sk, ek);
let sk = str::from_utf8(&sk).unwrap();
let ek = str::from_utf8(&ek).unwrap();
assert_eq!("key0", sk);
assert_eq!("key9", ek);
}
#[test]
fn test_flush_filter() {
// cf with filter
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment