Unverified Commit 2fa68dd5 authored by Croxx's avatar Croxx Committed by GitHub

Allow compaction filter on flush (#635)

Ref: tikv/rocksdb#pr243
Signed-off-by: 's avatarMrCroxx <mrcroxx@outlook.com>
parent 9af34338
...@@ -78,7 +78,6 @@ using rocksdb::ColumnFamilyDescriptor; ...@@ -78,7 +78,6 @@ using rocksdb::ColumnFamilyDescriptor;
using rocksdb::ColumnFamilyHandle; using rocksdb::ColumnFamilyHandle;
using rocksdb::ColumnFamilyOptions; using rocksdb::ColumnFamilyOptions;
using rocksdb::CompactionFilter; using rocksdb::CompactionFilter;
using rocksdb::CompactionFilterContext;
using rocksdb::CompactionFilterFactory; using rocksdb::CompactionFilterFactory;
using rocksdb::CompactionJobInfo; using rocksdb::CompactionJobInfo;
using rocksdb::CompactionOptionsFIFO; using rocksdb::CompactionOptionsFIFO;
...@@ -139,6 +138,7 @@ using rocksdb::SstPartitioner; ...@@ -139,6 +138,7 @@ using rocksdb::SstPartitioner;
using rocksdb::SstPartitionerFactory; using rocksdb::SstPartitionerFactory;
using rocksdb::Status; using rocksdb::Status;
using rocksdb::SubcompactionJobInfo; using rocksdb::SubcompactionJobInfo;
using rocksdb::TableFileCreationReason;
using rocksdb::TableProperties; using rocksdb::TableProperties;
using rocksdb::TablePropertiesCollection; using rocksdb::TablePropertiesCollection;
using rocksdb::TablePropertiesCollector; using rocksdb::TablePropertiesCollector;
...@@ -436,6 +436,7 @@ struct crocksdb_compactionfilterfactory_t : public CompactionFilterFactory { ...@@ -436,6 +436,7 @@ struct crocksdb_compactionfilterfactory_t : public CompactionFilterFactory {
void (*destructor_)(void*); void (*destructor_)(void*);
crocksdb_compactionfilter_t* (*create_compaction_filter_)( crocksdb_compactionfilter_t* (*create_compaction_filter_)(
void*, crocksdb_compactionfiltercontext_t* context); void*, crocksdb_compactionfiltercontext_t* context);
bool (*should_filter_table_file_creation_)(void*, int reason);
const char* (*name_)(void*); const char* (*name_)(void*);
virtual ~crocksdb_compactionfilterfactory_t() { (*destructor_)(state_); } virtual ~crocksdb_compactionfilterfactory_t() { (*destructor_)(state_); }
...@@ -448,6 +449,12 @@ struct crocksdb_compactionfilterfactory_t : public CompactionFilterFactory { ...@@ -448,6 +449,12 @@ struct crocksdb_compactionfilterfactory_t : public CompactionFilterFactory {
return std::unique_ptr<CompactionFilter>(cf); return std::unique_ptr<CompactionFilter>(cf);
} }
virtual bool ShouldFilterTableFileCreation(
TableFileCreationReason reason) const override {
int creason = static_cast<int>(reason);
return (*should_filter_table_file_creation_)(state_, creason);
}
virtual const char* Name() const override { return (*name_)(state_); } virtual const char* Name() const override { return (*name_)(state_); }
}; };
...@@ -3446,12 +3453,15 @@ crocksdb_compactionfilterfactory_t* crocksdb_compactionfilterfactory_create( ...@@ -3446,12 +3453,15 @@ crocksdb_compactionfilterfactory_t* crocksdb_compactionfilterfactory_create(
void* state, void (*destructor)(void*), void* state, void (*destructor)(void*),
crocksdb_compactionfilter_t* (*create_compaction_filter)( crocksdb_compactionfilter_t* (*create_compaction_filter)(
void*, crocksdb_compactionfiltercontext_t* context), void*, crocksdb_compactionfiltercontext_t* context),
bool (*should_filter_table_file_creation)(void*, int reason),
const char* (*name)(void*)) { const char* (*name)(void*)) {
crocksdb_compactionfilterfactory_t* result = crocksdb_compactionfilterfactory_t* result =
new crocksdb_compactionfilterfactory_t; new crocksdb_compactionfilterfactory_t;
result->state_ = state; result->state_ = state;
result->destructor_ = destructor; result->destructor_ = destructor;
result->create_compaction_filter_ = create_compaction_filter; result->create_compaction_filter_ = create_compaction_filter;
result->should_filter_table_file_creation_ =
should_filter_table_file_creation;
result->name_ = name; result->name_ = name;
return result; return result;
} }
......
...@@ -80,6 +80,12 @@ typedef struct crocksdb_lru_cache_options_t crocksdb_lru_cache_options_t; ...@@ -80,6 +80,12 @@ typedef struct crocksdb_lru_cache_options_t crocksdb_lru_cache_options_t;
typedef struct crocksdb_cache_t crocksdb_cache_t; typedef struct crocksdb_cache_t crocksdb_cache_t;
typedef struct crocksdb_memory_allocator_t crocksdb_memory_allocator_t; typedef struct crocksdb_memory_allocator_t crocksdb_memory_allocator_t;
typedef struct crocksdb_compactionfilter_t crocksdb_compactionfilter_t; typedef struct crocksdb_compactionfilter_t crocksdb_compactionfilter_t;
enum {
crocksdb_table_file_creation_reason_flush = 0,
crocksdb_table_file_creation_reason_compaction = 1,
crocksdb_table_file_creation_reason_recovery = 2,
crocksdb_table_file_creation_reason_misc = 3,
};
typedef struct crocksdb_compactionfiltercontext_t typedef struct crocksdb_compactionfiltercontext_t
crocksdb_compactionfiltercontext_t; crocksdb_compactionfiltercontext_t;
typedef struct crocksdb_compactionfilterfactory_t typedef struct crocksdb_compactionfilterfactory_t
...@@ -1392,6 +1398,7 @@ crocksdb_compactionfilterfactory_create( ...@@ -1392,6 +1398,7 @@ crocksdb_compactionfilterfactory_create(
void* state, void (*destructor)(void*), void* state, void (*destructor)(void*),
crocksdb_compactionfilter_t* (*create_compaction_filter)( crocksdb_compactionfilter_t* (*create_compaction_filter)(
void*, crocksdb_compactionfiltercontext_t* context), void*, crocksdb_compactionfiltercontext_t* context),
bool (*should_filter_table_file_creation)(void*, int reasion),
const char* (*name)(void*)); const char* (*name)(void*));
extern C_ROCKSDB_LIBRARY_API void crocksdb_compactionfilterfactory_destroy( extern C_ROCKSDB_LIBRARY_API void crocksdb_compactionfilterfactory_destroy(
crocksdb_compactionfilterfactory_t*); crocksdb_compactionfilterfactory_t*);
......
Subproject commit 1098aac80da9703b9608d1b8f338531dfcbe9ba4 Subproject commit bc358745d7768d15cfded4190d0d2427f16065b6
...@@ -83,6 +83,7 @@ pub struct DBCompactionFilter(c_void); ...@@ -83,6 +83,7 @@ pub struct DBCompactionFilter(c_void);
pub struct DBCompactionFilterFactory(c_void); pub struct DBCompactionFilterFactory(c_void);
#[repr(C)] #[repr(C)]
pub struct DBCompactionFilterContext(c_void); pub struct DBCompactionFilterContext(c_void);
#[repr(C)] #[repr(C)]
pub struct EnvOptions(c_void); pub struct EnvOptions(c_void);
#[repr(C)] #[repr(C)]
...@@ -465,6 +466,15 @@ pub enum CompactionFilterDecision { ...@@ -465,6 +466,15 @@ pub enum CompactionFilterDecision {
RemoveAndSkipUntil = 3, RemoveAndSkipUntil = 3,
} }
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(C)]
pub enum DBTableFileCreationReason {
Flush = 0,
Compaction = 1,
Recovery = 2,
Misc = 3,
}
/// # Safety /// # Safety
/// ///
/// ptr must point to a valid CStr value /// ptr must point to a valid CStr value
...@@ -1578,6 +1588,10 @@ extern "C" { ...@@ -1578,6 +1588,10 @@ extern "C" {
*mut c_void, *mut c_void,
*const DBCompactionFilterContext, *const DBCompactionFilterContext,
) -> *mut DBCompactionFilter, ) -> *mut DBCompactionFilter,
should_filter_table_file_creation: extern "C" fn(
*const c_void,
DBTableFileCreationReason,
) -> bool,
name: extern "C" fn(*mut c_void) -> *const c_char, name: extern "C" fn(*mut c_void) -> *const c_char,
) -> *mut DBCompactionFilterFactory; ) -> *mut DBCompactionFilterFactory;
pub fn crocksdb_compactionfilterfactory_destroy(factory: *mut DBCompactionFilterFactory); pub fn crocksdb_compactionfilterfactory_destroy(factory: *mut DBCompactionFilterFactory);
......
...@@ -5,7 +5,9 @@ use crate::table_properties::TableProperties; ...@@ -5,7 +5,9 @@ use crate::table_properties::TableProperties;
use crocksdb_ffi::CompactionFilterDecision as RawCompactionFilterDecision; use crocksdb_ffi::CompactionFilterDecision as RawCompactionFilterDecision;
pub use crocksdb_ffi::CompactionFilterValueType; pub use crocksdb_ffi::CompactionFilterValueType;
pub use crocksdb_ffi::DBCompactionFilter; pub use crocksdb_ffi::DBCompactionFilter;
use crocksdb_ffi::{self, DBCompactionFilterContext, DBCompactionFilterFactory}; use crocksdb_ffi::{
self, DBCompactionFilterContext, DBCompactionFilterFactory, DBTableFileCreationReason,
};
use libc::{c_char, c_int, c_void, malloc, memcpy, size_t}; use libc::{c_char, c_int, c_void, malloc, memcpy, size_t};
/// Decision used in `CompactionFilter::filter`. /// Decision used in `CompactionFilter::filter`.
...@@ -204,6 +206,13 @@ pub trait CompactionFilterFactory { ...@@ -204,6 +206,13 @@ pub trait CompactionFilterFactory {
&self, &self,
context: &CompactionFilterContext, context: &CompactionFilterContext,
) -> *mut DBCompactionFilter; ) -> *mut DBCompactionFilter;
/// Returns whether a thread creating table files for the specified `reason`
/// should have invoke `create_compaction_filter` and pass KVs through the returned
/// filter.
fn should_filter_table_file_creation(&self, reason: DBTableFileCreationReason) -> bool {
matches!(reason, DBTableFileCreationReason::Flush)
}
} }
#[repr(C)] #[repr(C)]
...@@ -216,6 +225,7 @@ mod factory { ...@@ -216,6 +225,7 @@ mod factory {
use super::{CompactionFilterContext, CompactionFilterFactoryProxy}; use super::{CompactionFilterContext, CompactionFilterFactoryProxy};
use crocksdb_ffi::{DBCompactionFilter, DBCompactionFilterContext}; use crocksdb_ffi::{DBCompactionFilter, DBCompactionFilterContext};
use libc::{c_char, c_void}; use libc::{c_char, c_void};
use librocksdb_sys::DBTableFileCreationReason;
pub(super) extern "C" fn name(factory: *mut c_void) -> *const c_char { pub(super) extern "C" fn name(factory: *mut c_void) -> *const c_char {
unsafe { unsafe {
...@@ -240,6 +250,17 @@ mod factory { ...@@ -240,6 +250,17 @@ mod factory {
factory.factory.create_compaction_filter(context) factory.factory.create_compaction_filter(context)
} }
} }
pub(super) extern "C" fn should_filter_table_file_creation(
factory: *const c_void,
reason: DBTableFileCreationReason,
) -> bool {
unsafe {
let factory = &*(factory as *const CompactionFilterFactoryProxy);
let reason: DBTableFileCreationReason = reason as DBTableFileCreationReason;
factory.factory.should_filter_table_file_creation(reason)
}
}
} }
pub struct CompactionFilterFactoryHandle { pub struct CompactionFilterFactoryHandle {
...@@ -267,6 +288,7 @@ pub unsafe fn new_compaction_filter_factory( ...@@ -267,6 +288,7 @@ pub unsafe fn new_compaction_filter_factory(
proxy as *mut c_void, proxy as *mut c_void,
self::factory::destructor, self::factory::destructor,
self::factory::create_compaction_filter, self::factory::create_compaction_filter,
self::factory::should_filter_table_file_creation,
self::factory::name, self::factory::name,
); );
...@@ -279,10 +301,13 @@ mod tests { ...@@ -279,10 +301,13 @@ mod tests {
use std::sync::mpsc::{self, SyncSender}; use std::sync::mpsc::{self, SyncSender};
use std::time::Duration; use std::time::Duration;
use librocksdb_sys::DBTableFileCreationReason;
use super::{ use super::{
CompactionFilter, CompactionFilterContext, CompactionFilterFactory, DBCompactionFilter, new_compaction_filter_raw, CompactionFilter, CompactionFilterContext,
CompactionFilterFactory, DBCompactionFilter,
}; };
use crate::{ColumnFamilyOptions, DBOptions, DB}; use crate::{ColumnFamilyOptions, DBOptions, Writable, DB};
struct Factory(SyncSender<()>); struct Factory(SyncSender<()>);
impl Drop for Factory { impl Drop for Factory {
...@@ -308,6 +333,29 @@ mod tests { ...@@ -308,6 +333,29 @@ mod tests {
} }
} }
struct FlushFactory {}
struct FlushFilter {}
impl CompactionFilter for FlushFilter {
fn filter(&mut self, _: usize, _: &[u8], _: &[u8], _: &mut Vec<u8>, _: &mut bool) -> bool {
true
}
}
impl CompactionFilterFactory for FlushFactory {
fn should_filter_table_file_creation(&self, reason: DBTableFileCreationReason) -> bool {
matches!(reason, DBTableFileCreationReason::Flush)
}
fn create_compaction_filter(
&self,
_context: &CompactionFilterContext,
) -> *mut DBCompactionFilter {
let filter = Box::new(FlushFilter {});
let name = CString::new("flush_compaction_filter").unwrap();
unsafe { new_compaction_filter_raw(name, filter) }
}
}
#[test] #[test]
fn test_factory_destructor() { fn test_factory_destructor() {
let (tx, rx) = mpsc::sync_channel(1); let (tx, rx) = mpsc::sync_channel(1);
...@@ -375,4 +423,43 @@ mod tests { ...@@ -375,4 +423,43 @@ mod tests {
drop(db); drop(db);
assert!(rx.recv_timeout(Duration::from_secs(1)).is_ok()); assert!(rx.recv_timeout(Duration::from_secs(1)).is_ok());
} }
#[test]
fn test_flush_filter() {
// cf with filter
let name = CString::new("test_flush_filter_factory").unwrap();
let factory = Box::new(FlushFactory {}) as Box<dyn CompactionFilterFactory>;
let mut cf_opts_wf = ColumnFamilyOptions::default();
cf_opts_wf
.set_compaction_filter_factory(name, factory)
.unwrap();
cf_opts_wf.set_disable_auto_compactions(true);
// cf without filter
let mut cf_opts_of = ColumnFamilyOptions::default();
cf_opts_of.set_disable_auto_compactions(true);
// db
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let path = tempfile::Builder::new()
.prefix("test_factory_context_keys")
.tempdir()
.unwrap();
let mut db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
db.create_cf(("wf", cf_opts_wf)).unwrap();
db.create_cf(("of", cf_opts_of)).unwrap();
let cfh_wf = db.cf_handle("wf").unwrap();
let cfh_of = db.cf_handle("of").unwrap();
// put data
db.put_cf(cfh_wf, b"k", b"v").unwrap();
db.put_cf(cfh_of, b"k", b"v").unwrap();
db.flush_cf(cfh_wf, true).unwrap();
db.flush_cf(cfh_of, true).unwrap();
// assert
assert!(db.get_cf(cfh_wf, b"k").unwrap().is_none());
assert!(db.get_cf(cfh_of, b"k").unwrap().is_some());
}
} }
...@@ -45,8 +45,8 @@ pub use librocksdb_sys::{ ...@@ -45,8 +45,8 @@ pub use librocksdb_sys::{
DBBackgroundErrorReason, DBBottommostLevelCompaction, DBCompactionStyle, DBCompressionType, DBBackgroundErrorReason, DBBottommostLevelCompaction, DBCompactionStyle, DBCompressionType,
DBEntryType, DBInfoLogLevel, DBRateLimiterMode, DBRecoveryMode, DBEntryType, DBInfoLogLevel, DBRateLimiterMode, DBRecoveryMode,
DBSstPartitionerResult as SstPartitionerResult, DBStatisticsHistogramType, DBSstPartitionerResult as SstPartitionerResult, DBStatisticsHistogramType,
DBStatisticsTickerType, DBStatusPtr, DBTitanDBBlobRunMode, DBValueType, IndexType, DBStatisticsTickerType, DBStatusPtr, DBTableFileCreationReason, DBTitanDBBlobRunMode,
WriteStallCondition, DBValueType, IndexType, WriteStallCondition,
}; };
pub use logger::Logger; pub use logger::Logger;
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
......
...@@ -2081,11 +2081,11 @@ impl IngestExternalFileOptions { ...@@ -2081,11 +2081,11 @@ impl IngestExternalFileOptions {
); );
} }
} }
pub fn get_write_global_seqno(&self) -> bool { pub fn get_write_global_seqno(&self) -> bool {
unsafe { unsafe {
crocksdb_ffi::crocksdb_ingestexternalfileoptions_get_write_global_seqno(self.inner) crocksdb_ffi::crocksdb_ingestexternalfileoptions_get_write_global_seqno(self.inner)
} }
} }
/// If set to true, a global_seqno will be written to a given offset in the external SST file /// If set to true, a global_seqno will be written to a given offset in the external SST file
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment