Unverified Commit d1e07646 authored by qupeng's avatar qupeng Committed by GitHub

port filter_v2 for compaction filter (#572)

parent 59af34d7
...@@ -392,36 +392,38 @@ struct crocksdb_map_property_t { ...@@ -392,36 +392,38 @@ struct crocksdb_map_property_t {
struct crocksdb_compactionfilter_t : public CompactionFilter { struct crocksdb_compactionfilter_t : public CompactionFilter {
void* state_; void* state_;
void (*destructor_)(void*); void (*destructor_)(void*);
unsigned char (*filter_)(void*, int level, const char* key, size_t key_length, Decision (*filter_v2_)(void*, int level, const char* key, size_t key_length,
const char* existing_value, size_t value_length, ValueType value_type, const char* existing_value,
char** new_value, size_t* new_value_length, size_t value_length, char** new_value,
unsigned char* value_changed); size_t* new_value_length, char** skip_until,
size_t* skip_until_length);
const char* (*name_)(void*); const char* (*name_)(void*);
unsigned char ignore_snapshots_;
virtual ~crocksdb_compactionfilter_t() { (*destructor_)(state_); } virtual ~crocksdb_compactionfilter_t() { (*destructor_)(state_); }
virtual bool Filter(int level, const Slice& key, const Slice& existing_value, virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
std::string* new_value, const Slice& existing_value, std::string* new_value,
bool* value_changed) const override { std::string* skip_until) const override {
char* c_new_value = nullptr; char* c_new_value = nullptr;
size_t new_value_length = 0; char* c_skip_until = nullptr;
unsigned char c_value_changed = 0; size_t new_value_length, skip_until_length = 0;
unsigned char result =
(*filter_)(state_, level, key.data(), key.size(), existing_value.data(), Decision result = (*filter_v2_)(
existing_value.size(), &c_new_value, &new_value_length, state_, level, key.data(), key.size(), value_type,
&c_value_changed); existing_value.data(), existing_value.size(), &c_new_value,
if (c_value_changed) { &new_value_length, &c_skip_until, &skip_until_length);
if (result == Decision::kChangeValue) {
new_value->assign(c_new_value, new_value_length); new_value->assign(c_new_value, new_value_length);
free(c_new_value); free(c_new_value);
*value_changed = true; } else if (result == Decision::kRemoveAndSkipUntil) {
skip_until->assign(c_skip_until, skip_until_length);
free(c_skip_until);
} }
return result; return result;
} }
virtual const char* Name() const override { return (*name_)(state_); } virtual const char* Name() const override { return (*name_)(state_); }
virtual bool IgnoreSnapshots() const override { return ignore_snapshots_; }
}; };
struct crocksdb_compactionfilterfactory_t : public CompactionFilterFactory { struct crocksdb_compactionfilterfactory_t : public CompactionFilterFactory {
...@@ -3256,28 +3258,22 @@ custom cache ...@@ -3256,28 +3258,22 @@ custom cache
table_properties_collectors table_properties_collectors
*/ */
crocksdb_compactionfilter_t* crocksdb_compactionfilter_create( crocksdb_compactionfilter_t* crocksdb_compactionfilter_create_v2(
void* state, void (*destructor)(void*), void* state, void (*destructor)(void*),
unsigned char (*filter)(void*, int level, const char* key, CompactionFilter::Decision (*filter_v2)(
size_t key_length, const char* existing_value, void*, int level, const char* key, size_t key_length,
size_t value_length, char** new_value, CompactionFilter::ValueType value_type, const char* existing_value,
size_t* new_value_length, size_t value_length, char** new_value, size_t* new_value_length,
unsigned char* value_changed), char** skip_until, size_t* skip_until_length),
const char* (*name)(void*)) { const char* (*name)(void*)) {
crocksdb_compactionfilter_t* result = new crocksdb_compactionfilter_t; crocksdb_compactionfilter_t* result = new crocksdb_compactionfilter_t;
result->state_ = state; result->state_ = state;
result->destructor_ = destructor; result->destructor_ = destructor;
result->filter_ = filter; result->filter_v2_ = filter_v2;
result->ignore_snapshots_ = true;
result->name_ = name; result->name_ = name;
return result; return result;
} }
void crocksdb_compactionfilter_set_ignore_snapshots(
crocksdb_compactionfilter_t* filter, unsigned char whether_ignore) {
filter->ignore_snapshots_ = whether_ignore;
}
void crocksdb_compactionfilter_destroy(crocksdb_compactionfilter_t* filter) { void crocksdb_compactionfilter_destroy(crocksdb_compactionfilter_t* filter) {
delete filter; delete filter;
} }
......
...@@ -446,6 +446,23 @@ pub enum DBSstPartitionerResult { ...@@ -446,6 +446,23 @@ pub enum DBSstPartitionerResult {
Required = 1, Required = 1,
} }
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(C)]
pub enum CompactionFilterValueType {
Value = 0,
MergeOperand = 1,
BlobIndex = 2,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(C)]
pub enum CompactionFilterDecision {
Keep = 0,
Remove = 1,
ChangeValue = 2,
RemoveAndSkipUntil = 3,
}
/// # Safety /// # Safety
/// ///
/// ptr must point to a valid CStr value /// ptr must point to a valid CStr value
...@@ -1499,26 +1516,24 @@ extern "C" { ...@@ -1499,26 +1516,24 @@ extern "C" {
propname: *const c_char, propname: *const c_char,
) -> *mut c_char; ) -> *mut c_char;
// Compaction filter // Compaction filter
pub fn crocksdb_compactionfilter_create( pub fn crocksdb_compactionfilter_create_v2(
state: *mut c_void, state: *mut c_void,
destructor: extern "C" fn(*mut c_void), destructor: extern "C" fn(*mut c_void),
filter: extern "C" fn( filter_v2: extern "C" fn(
*mut c_void, *mut c_void,
c_int, c_int,
*const u8, *const u8,
size_t, size_t,
CompactionFilterValueType,
*const u8, *const u8,
size_t, size_t,
*mut *mut u8, *mut *mut u8,
*mut size_t, *mut size_t,
*mut bool, *mut *mut u8,
) -> bool, *mut size_t,
) -> CompactionFilterDecision,
name: extern "C" fn(*mut c_void) -> *const c_char, name: extern "C" fn(*mut c_void) -> *const c_char,
) -> *mut DBCompactionFilter; ) -> *mut DBCompactionFilter;
pub fn crocksdb_compactionfilter_set_ignore_snapshots(
filter: *mut DBCompactionFilter,
ignore_snapshot: bool,
);
pub fn crocksdb_compactionfilter_destroy(filter: *mut DBCompactionFilter); pub fn crocksdb_compactionfilter_destroy(filter: *mut DBCompactionFilter);
// Compaction filter context // Compaction filter context
......
use std::ffi::CString; use std::ffi::CString;
use std::{mem, ptr, slice}; use std::{ptr, slice};
use crate::table_properties::TableProperties; use crate::table_properties::TableProperties;
use crocksdb_ffi::CompactionFilterDecision as RawCompactionFilterDecision;
pub use crocksdb_ffi::CompactionFilterValueType;
pub use crocksdb_ffi::DBCompactionFilter; pub use crocksdb_ffi::DBCompactionFilter;
use crocksdb_ffi::{self, DBCompactionFilterContext, DBCompactionFilterFactory}; use crocksdb_ffi::{self, DBCompactionFilterContext, DBCompactionFilterFactory};
use libc::{c_char, c_int, c_void, malloc, memcpy, size_t}; use libc::{c_char, c_int, c_void, malloc, memcpy, size_t};
/// Decision used in `CompactionFilter::filter_v2`.
pub enum CompactionFilterDecision {
/// The record will be kept instead of filtered.
Keep,
/// The record will be filtered, and a tombstone will be left.
Remove,
/// The record will be kept but the value will be replaced.
ChangeValue(Vec<u8>),
/// All records between [current, `until`) will be filtered without any tombstones left.
RemoveAndSkipUntil(Vec<u8>),
}
/// `CompactionFilter` allows an application to modify/delete a key-value at /// `CompactionFilter` allows an application to modify/delete a key-value at
/// the time of compaction. /// the time of compaction.
/// For more details, Please checkout rocksdb's documentation. /// For more details, Please checkout rocksdb's documentation.
...@@ -15,16 +29,43 @@ pub trait CompactionFilter { ...@@ -15,16 +29,43 @@ pub trait CompactionFilter {
/// of false indicates that the kv should be preserved in the /// of false indicates that the kv should be preserved in the
/// output of this compaction run and a return value of true /// output of this compaction run and a return value of true
/// indicates that this key-value should be removed from the /// indicates that this key-value should be removed from the
/// output of the compaction. The application can inspect /// output of the compaction. The application can inspect
/// the existing value of the key and make decision based on it. /// the existing value of the key and make decision based on it.
fn filter( fn filter(
&mut self,
_level: usize,
_key: &[u8],
_value: &[u8],
_new_value: &mut Vec<u8>,
_value_changed: &mut bool,
) -> bool {
false
}
/// This method will overwrite `filter` if a `CompactionFilter` implements both of them.
fn filter_v2(
&mut self, &mut self,
level: usize, level: usize,
key: &[u8], key: &[u8],
value: &[u8], value: &[u8],
new_value: &mut Vec<u8>, value_type: CompactionFilterValueType,
value_changed: &mut bool, ) -> CompactionFilterDecision {
) -> bool; match value_type {
CompactionFilterValueType::Value => {
let (mut new_value, mut value_changed) = (Vec::new(), false);
if self.filter(level, key, value, &mut new_value, &mut value_changed) {
return CompactionFilterDecision::Remove;
}
if value_changed {
CompactionFilterDecision::ChangeValue(new_value)
} else {
CompactionFilterDecision::Keep
}
}
// Currently `MergeOperand` and `BlobIndex` will always be kept.
_ => CompactionFilterDecision::Keep,
}
}
} }
#[repr(C)] #[repr(C)]
...@@ -43,44 +84,44 @@ extern "C" fn destructor(filter: *mut c_void) { ...@@ -43,44 +84,44 @@ extern "C" fn destructor(filter: *mut c_void) {
} }
} }
extern "C" fn filter( extern "C" fn filter_v2(
filter: *mut c_void, filter: *mut c_void,
level: c_int, level: c_int,
key: *const u8, key: *const u8,
key_len: size_t, key_len: size_t,
value_type: CompactionFilterValueType,
value: *const u8, value: *const u8,
value_len: size_t, value_len: size_t,
new_value: *mut *mut u8, new_value: *mut *mut u8,
new_value_len: *mut size_t, new_value_len: *mut size_t,
value_changed: *mut bool, skip_until: *mut *mut u8,
) -> bool { skip_until_length: *mut size_t,
) -> RawCompactionFilterDecision {
unsafe { unsafe {
*new_value = ptr::null_mut(); *new_value = ptr::null_mut();
*new_value_len = 0; *new_value_len = 0;
*value_changed = false; *skip_until = ptr::null_mut();
*skip_until_length = 0;
let filter = &mut *(filter as *mut CompactionFilterProxy); let filter = &mut (*(filter as *mut CompactionFilterProxy)).filter;
let key = slice::from_raw_parts(key, key_len); let key = slice::from_raw_parts(key, key_len);
let value = slice::from_raw_parts(value, value_len); let value = slice::from_raw_parts(value, value_len);
let mut new_value_v = Vec::default(); match filter.filter_v2(level as usize, key, value, value_type) {
let filtered = filter.filter.filter( CompactionFilterDecision::Keep => RawCompactionFilterDecision::Keep,
level as usize, CompactionFilterDecision::Remove => RawCompactionFilterDecision::Remove,
key, CompactionFilterDecision::ChangeValue(new_v) => {
value, *new_value_len = new_v.len();
&mut new_value_v, *new_value = malloc(*new_value_len) as *mut u8;
mem::transmute(&mut *value_changed), memcpy(*new_value as _, new_v.as_ptr() as _, *new_value_len);
); RawCompactionFilterDecision::ChangeValue
if *value_changed { }
*new_value_len = new_value_v.len(); CompactionFilterDecision::RemoveAndSkipUntil(until) => {
// The vector is allocated in Rust, so dup it before pass into C. *skip_until_length = until.len();
*new_value = malloc(*new_value_len) as *mut u8; *skip_until = malloc(*skip_until_length) as *mut u8;
memcpy( memcpy(*skip_until as _, until.as_ptr() as _, *skip_until_length);
new_value as *mut c_void, RawCompactionFilterDecision::RemoveAndSkipUntil
&new_value_v[0] as *const u8 as *const c_void, }
*new_value_len,
);
} }
filtered
} }
} }
...@@ -114,7 +155,12 @@ pub unsafe fn new_compaction_filter_raw( ...@@ -114,7 +155,12 @@ pub unsafe fn new_compaction_filter_raw(
name: c_name, name: c_name,
filter: f, filter: f,
})); }));
crocksdb_ffi::crocksdb_compactionfilter_create(proxy as *mut c_void, destructor, filter, name) crocksdb_ffi::crocksdb_compactionfilter_create_v2(
proxy as *mut c_void,
destructor,
filter_v2,
name,
)
} }
pub struct CompactionFilterContext(DBCompactionFilterContext); pub struct CompactionFilterContext(DBCompactionFilterContext);
......
...@@ -31,8 +31,9 @@ extern crate lazy_static; ...@@ -31,8 +31,9 @@ extern crate lazy_static;
pub use cloud::CloudEnvOptions; pub use cloud::CloudEnvOptions;
pub use compaction_filter::{ pub use compaction_filter::{
new_compaction_filter, new_compaction_filter_factory, new_compaction_filter_raw, new_compaction_filter, new_compaction_filter_factory, new_compaction_filter_raw,
CompactionFilter, CompactionFilterContext, CompactionFilterFactory, CompactionFilter, CompactionFilterContext, CompactionFilterDecision, CompactionFilterFactory,
CompactionFilterFactoryHandle, CompactionFilterHandle, DBCompactionFilter, CompactionFilterFactoryHandle, CompactionFilterHandle, CompactionFilterValueType,
DBCompactionFilter,
}; };
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
pub use encryption::{DBEncryptionMethod, EncryptionKeyManager, FileEncryptionInfo}; pub use encryption::{DBEncryptionMethod, EncryptionKeyManager, FileEncryptionInfo};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment