Unverified Commit 4acb7d85 authored by zhangjinpeng1987's avatar zhangjinpeng1987 Committed by GitHub

support table filter (#169)

parent 53cb9a74
...@@ -2793,6 +2793,48 @@ void crocksdb_readoptions_set_ignore_range_deletions( ...@@ -2793,6 +2793,48 @@ void crocksdb_readoptions_set_ignore_range_deletions(
opt->rep.ignore_range_deletions = v; opt->rep.ignore_range_deletions = v;
} }
// Owns the user-supplied filter context and releases it with the
// user-supplied destructor. Intended to be held behind a shared_ptr so
// the context is destroyed exactly once.
struct TableFilterCtx {
  TableFilterCtx(void* ctx, void (*destroy)(void*))
      : ctx_(ctx), destroy_(destroy) {}
  ~TableFilterCtx() { destroy_(ctx_); }

  // Non-copyable: a copy would run destroy_ twice on the same ctx_.
  TableFilterCtx(const TableFilterCtx&) = delete;
  TableFilterCtx& operator=(const TableFilterCtx&) = delete;

  void* ctx_;               // opaque user state handed back to callbacks
  void (*destroy_)(void*);  // invoked exactly once, from the destructor
};
// Adapter that exposes a C callback pair as the callable table filter
// stored in ReadOptions.
//
// After a TableFilter is assigned to ReadOptions, the options object may
// be copied several times, so the user context lives behind a shared_ptr:
// ctx_ is destroyed only when the last copy of the filter goes out of
// scope.
struct TableFilter {
  TableFilter(void* ctx,
              int (*table_filter)(void*, const crocksdb_table_properties_t*),
              void (*destroy)(void*))
      : ctx_(std::make_shared<TableFilterCtx>(ctx, destroy)),
        table_filter_(table_filter) {}

  // Copying shares ownership of the context. The compiler-generated copy
  // does exactly that, so default it instead of hand-writing it.
  TableFilter(const TableFilter& f) = default;

  // Called by RocksDB for each table considered during iteration; a zero
  // return from the C callback means "skip this table".
  bool operator()(const TableProperties& prop) {
    return table_filter_(
        ctx_->ctx_,
        reinterpret_cast<const crocksdb_table_properties_t*>(&prop));
  }

  shared_ptr<TableFilterCtx> ctx_;
  int (*table_filter_)(void*, const crocksdb_table_properties_t*);

 private:
  TableFilter() {}
};
// Installs a C table-filter callback on the given read options.
// Ownership of `ctx` passes to the filter; `destroy` releases it once the
// last copy of the options stops referencing the filter.
void crocksdb_readoptions_set_table_filter(
    crocksdb_readoptions_t* opt, void* ctx,
    int (*table_filter)(void*, const crocksdb_table_properties_t*),
    void (*destroy)(void*)) {
  TableFilter filter(ctx, table_filter, destroy);
  opt->rep.table_filter = filter;
}
crocksdb_writeoptions_t* crocksdb_writeoptions_create() { crocksdb_writeoptions_t* crocksdb_writeoptions_create() {
return new crocksdb_writeoptions_t; return new crocksdb_writeoptions_t;
} }
......
...@@ -1136,6 +1136,12 @@ crocksdb_readoptions_set_background_purge_on_iterator_cleanup( ...@@ -1136,6 +1136,12 @@ crocksdb_readoptions_set_background_purge_on_iterator_cleanup(
extern C_ROCKSDB_LIBRARY_API void extern C_ROCKSDB_LIBRARY_API void
crocksdb_readoptions_set_ignore_range_deletions(crocksdb_readoptions_t *, crocksdb_readoptions_set_ignore_range_deletions(crocksdb_readoptions_t *,
unsigned char); unsigned char);
/* Installs a table filter on the read options. The filter callback is
 * invoked per SST table during iteration; `destroy` frees the context
 * when the last copy of the options releases the filter.
 * (Fixed parameter-name typo: "destory" -> "destroy".) */
extern C_ROCKSDB_LIBRARY_API void
crocksdb_readoptions_set_table_filter(
    crocksdb_readoptions_t*,
    void*,
    int (*table_filter)(void*, const crocksdb_table_properties_t*),
    void (*destroy)(void*));
/* Write options */ /* Write options */
......
...@@ -623,6 +623,12 @@ extern "C" { ...@@ -623,6 +623,12 @@ extern "C" {
v: bool, v: bool,
); );
pub fn crocksdb_readoptions_set_ignore_range_deletions(readopts: *mut DBReadOptions, v: bool); pub fn crocksdb_readoptions_set_ignore_range_deletions(readopts: *mut DBReadOptions, v: bool);
    // Installs a table filter on the read options. `ctx` is an owned,
    // type-erased pointer produced by `Box::into_raw` (presumably a
    // double-boxed trait object — see `ReadOptions::set_table_filter`);
    // `filter` is called once per SST table during iteration, and
    // `destroy` is called by the C side to free `ctx` when the last copy
    // of the options is dropped.
    pub fn crocksdb_readoptions_set_table_filter(
        readopts: *mut DBReadOptions,
        ctx: *mut c_void,
        filter: extern "C" fn(*mut c_void, *const DBTableProperties) -> c_int,
        destroy: extern "C" fn(*mut c_void),
    );
pub fn crocksdb_get( pub fn crocksdb_get(
db: *const DBInstance, db: *const DBInstance,
......
...@@ -30,6 +30,7 @@ mod table_properties; ...@@ -30,6 +30,7 @@ mod table_properties;
mod table_properties_collector; mod table_properties_collector;
mod table_properties_collector_factory; mod table_properties_collector_factory;
mod event_listener; mod event_listener;
mod table_filter;
pub use compaction_filter::CompactionFilter; pub use compaction_filter::CompactionFilter;
pub use event_listener::{CompactionJobInfo, EventListener, FlushJobInfo, IngestionInfo}; pub use event_listener::{CompactionJobInfo, EventListener, FlushJobInfo, IngestionInfo};
...@@ -45,6 +46,7 @@ pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions ...@@ -45,6 +46,7 @@ pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions
IngestExternalFileOptions, RateLimiter, ReadOptions, RestoreOptions, IngestExternalFileOptions, RateLimiter, ReadOptions, RestoreOptions,
WriteOptions}; WriteOptions};
pub use slice_transform::SliceTransform; pub use slice_transform::SliceTransform;
pub use table_filter::TableFilter;
pub use table_properties::{TableProperties, TablePropertiesCollection, pub use table_properties::{TableProperties, TablePropertiesCollection,
TablePropertiesCollectionView, UserCollectedProperties}; TablePropertiesCollectionView, UserCollectedProperties};
pub use table_properties_collector::TablePropertiesCollector; pub use table_properties_collector::TablePropertiesCollector;
......
...@@ -29,6 +29,7 @@ use std::ffi::{CStr, CString}; ...@@ -29,6 +29,7 @@ use std::ffi::{CStr, CString};
use std::mem; use std::mem;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use table_filter::{destroy_table_filter, table_filter, TableFilter};
use table_properties_collector_factory::{new_table_properties_collector_factory, use table_properties_collector_factory::{new_table_properties_collector_factory,
TablePropertiesCollectorFactory}; TablePropertiesCollectorFactory};
...@@ -375,6 +376,18 @@ impl ReadOptions { ...@@ -375,6 +376,18 @@ impl ReadOptions {
pub unsafe fn get_inner(&self) -> *const DBReadOptions { pub unsafe fn get_inner(&self) -> *const DBReadOptions {
self.inner self.inner
} }
pub fn set_table_filter(&mut self, filter: Box<TableFilter>) {
unsafe {
let f = Box::into_raw(Box::new(filter));
crocksdb_ffi::crocksdb_readoptions_set_table_filter(
self.inner,
mem::transmute(f),
table_filter,
destroy_table_filter,
);
}
}
} }
pub struct WriteOptions { pub struct WriteOptions {
......
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use crocksdb_ffi::DBTableProperties;
use libc::{c_int, c_void};
use std::mem;
use table_properties::TableProperties;
/// A callback to determine whether relevant keys for a scan may exist in
/// a given table, based on that table's properties. The callback is
/// passed the properties of each table during iteration; if it returns
/// `false`, the table will not be scanned. This only affects iterators
/// and has no impact on point lookups.
pub trait TableFilter {
    /// Returns `true` if the table described by `props` should be
    /// scanned for this iteration.
    fn table_filter(&self, props: &TableProperties) -> bool;
}
pub extern "C" fn table_filter(ctx: *mut c_void, props: *const DBTableProperties) -> c_int {
unsafe {
let filter = &*(ctx as *mut Box<TableFilter>);
filter.table_filter(mem::transmute(&*props)) as c_int
}
}
/// C-callable destructor: reclaims ownership of the boxed `TableFilter`
/// created in `ReadOptions::set_table_filter` and drops it. Must be
/// called exactly once per context pointer, which the C side guarantees
/// via the shared_ptr in its TableFilterCtx.
pub extern "C" fn destroy_table_filter(filter: *mut c_void) {
    unsafe {
        // Re-box the raw pointer so its Drop runs; the explicit drop()
        // makes the intent clear rather than relying on an unused
        // temporary being dropped.
        drop(Box::from_raw(filter as *mut Box<TableFilter>));
    }
}
...@@ -12,9 +12,9 @@ ...@@ -12,9 +12,9 @@
// limitations under the License. // limitations under the License.
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use rocksdb::{ColumnFamilyOptions, DBEntryType, DBOptions, Range, TablePropertiesCollection, use rocksdb::{ColumnFamilyOptions, DBEntryType, DBOptions, Range, ReadOptions, SeekKey,
TablePropertiesCollector, TablePropertiesCollectorFactory, UserCollectedProperties, TableFilter, TableProperties, TablePropertiesCollection, TablePropertiesCollector,
Writable, DB}; TablePropertiesCollectorFactory, UserCollectedProperties, Writable, DB};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use tempdir::TempDir; use tempdir::TempDir;
...@@ -214,3 +214,76 @@ fn test_table_properties_collector_factory() { ...@@ -214,3 +214,76 @@ fn test_table_properties_collector_factory() {
let collection = db.get_properties_of_tables_in_range(cf, &[range]).unwrap(); let collection = db.get_properties_of_tables_in_range(cf, &[range]).unwrap();
check_collection(&collection, 1, 4, 4, 0, 0); check_collection(&collection, 1, 4, 4, 0, 0);
} }
// Test filter: excludes from the scan any SST whose entry count exceeds
// a fixed threshold.
struct BigTableFilter {
    // Tables holding more entries than this are skipped.
    max_entries: u64,
}

impl BigTableFilter {
    // Builds a filter that rejects tables with more than `max_entries`
    // entries.
    pub fn new(max_entries: u64) -> BigTableFilter {
        BigTableFilter { max_entries: max_entries }
    }
}

impl TableFilter for BigTableFilter {
    fn table_filter(&self, props: &TableProperties) -> bool {
        // Scan the table only when it is small enough; a large SST is
        // filtered out of the iteration entirely.
        props.num_entries() <= self.max_entries
    }
}
#[test]
fn test_table_properties_with_table_filter() {
    // Open a DB whose default CF installs the example properties collector.
    let factory = ExampleFactory::new();
    let mut opts = DBOptions::new();
    let mut cf_opts = ColumnFamilyOptions::new();
    opts.create_if_missing(true);
    cf_opts.add_table_properties_collector_factory("example-collector", Box::new(factory));
    let path = TempDir::new("_rust_rocksdb_collector_with_table_filter").expect("");
    let db = DB::open_cf(
        opts,
        path.path().to_str().unwrap(),
        vec![("default", cf_opts)],
    ).unwrap();

    // First SST: 4 entries, which will exceed the filter threshold below.
    let big_batch = vec![
        (b"key1".to_vec(), b"value1".to_vec()),
        (b"key2".to_vec(), b"value2".to_vec()),
        (b"key3".to_vec(), b"value3".to_vec()),
        (b"key4".to_vec(), b"value4".to_vec()),
    ];
    for &(ref k, ref v) in &big_batch {
        db.put(k, v).unwrap();
        assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
    }
    db.flush(true).unwrap();

    // Second SST: 2 entries, small enough to pass the filter.
    let small_batch = vec![
        (b"key5".to_vec(), b"value5".to_vec()),
        (b"key6".to_vec(), b"value6".to_vec()),
    ];
    for &(ref k, ref v) in &small_batch {
        db.put(k, v).unwrap();
        assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
    }
    db.flush(true).unwrap();

    // Scan with a filter that skips SSTs holding more than 2 entries.
    let filter = BigTableFilter::new(2);
    let mut ropts = ReadOptions::new();
    ropts.set_table_filter(Box::new(filter));
    let mut iter = db.iter_opt(ropts);
    let key = b"key";
    let key5 = b"key5";
    assert!(iter.seek(SeekKey::from(key.as_ref())));
    // The 4-entry SST was skipped, so the first visible key is "key5".
    assert_eq!(iter.key(), key5.as_ref());
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment