Unverified Commit 3f082c17 authored by qupeng's avatar qupeng Committed by GitHub

expose seqno from compaction filter and iterator (#574)

parent d1e07646
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
#include "rocksdb/universal_compaction.h" #include "rocksdb/universal_compaction.h"
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/db_ttl.h" #include "rocksdb/utilities/db_ttl.h"
...@@ -392,27 +393,28 @@ struct crocksdb_map_property_t { ...@@ -392,27 +393,28 @@ struct crocksdb_map_property_t {
struct crocksdb_compactionfilter_t : public CompactionFilter { struct crocksdb_compactionfilter_t : public CompactionFilter {
void* state_; void* state_;
void (*destructor_)(void*); void (*destructor_)(void*);
Decision (*filter_v2_)(void*, int level, const char* key, size_t key_length, Decision (*filter_)(void*, int level, const char* key, size_t key_length,
ValueType value_type, const char* existing_value, uint64_t seqno, ValueType value_type,
size_t value_length, char** new_value, const char* existing_value, size_t value_length,
size_t* new_value_length, char** skip_until, char** new_value, size_t* new_value_length,
size_t* skip_until_length); char** skip_until, size_t* skip_until_length);
const char* (*name_)(void*); const char* (*name_)(void*);
virtual ~crocksdb_compactionfilter_t() { (*destructor_)(state_); } virtual ~crocksdb_compactionfilter_t() { (*destructor_)(state_); }
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, virtual Decision FilterV3(int level, const Slice& key, uint64_t seqno,
const Slice& existing_value, std::string* new_value, ValueType value_type, const Slice& existing_value,
std::string* new_value,
std::string* skip_until) const override { std::string* skip_until) const override {
char* c_new_value = nullptr; char* c_new_value = nullptr;
char* c_skip_until = nullptr; char* c_skip_until = nullptr;
size_t new_value_length, skip_until_length = 0; size_t new_value_length, skip_until_length = 0;
Decision result = (*filter_v2_)( Decision result =
state_, level, key.data(), key.size(), value_type, (*filter_)(state_, level, key.data(), key.size(), seqno, value_type,
existing_value.data(), existing_value.size(), &c_new_value, existing_value.data(), existing_value.size(), &c_new_value,
&new_value_length, &c_skip_until, &skip_until_length); &new_value_length, &c_skip_until, &skip_until_length);
if (result == Decision::kChangeValue) { if (result == Decision::kChangeValue) {
new_value->assign(c_new_value, new_value_length); new_value->assign(c_new_value, new_value_length);
free(c_new_value); free(c_new_value);
...@@ -1500,6 +1502,10 @@ const char* crocksdb_iter_value(const crocksdb_iterator_t* iter, size_t* vlen) { ...@@ -1500,6 +1502,10 @@ const char* crocksdb_iter_value(const crocksdb_iterator_t* iter, size_t* vlen) {
return s.data(); return s.data();
} }
bool crocksdb_iter_seqno(const crocksdb_iterator_t* iter, SequenceNumber* no) {
return iter->rep->seqno(no);
}
void crocksdb_iter_get_error(const crocksdb_iterator_t* iter, char** errptr) { void crocksdb_iter_get_error(const crocksdb_iterator_t* iter, char** errptr) {
SaveError(errptr, iter->rep->status()); SaveError(errptr, iter->rep->status());
} }
...@@ -3258,10 +3264,10 @@ custom cache ...@@ -3258,10 +3264,10 @@ custom cache
table_properties_collectors table_properties_collectors
*/ */
crocksdb_compactionfilter_t* crocksdb_compactionfilter_create_v2( crocksdb_compactionfilter_t* crocksdb_compactionfilter_create(
void* state, void (*destructor)(void*), void* state, void (*destructor)(void*),
CompactionFilter::Decision (*filter_v2)( CompactionFilter::Decision (*filter)(
void*, int level, const char* key, size_t key_length, void*, int level, const char* key, size_t key_length, uint64_t seqno,
CompactionFilter::ValueType value_type, const char* existing_value, CompactionFilter::ValueType value_type, const char* existing_value,
size_t value_length, char** new_value, size_t* new_value_length, size_t value_length, char** new_value, size_t* new_value_length,
char** skip_until, size_t* skip_until_length), char** skip_until, size_t* skip_until_length),
...@@ -3269,7 +3275,7 @@ crocksdb_compactionfilter_t* crocksdb_compactionfilter_create_v2( ...@@ -3269,7 +3275,7 @@ crocksdb_compactionfilter_t* crocksdb_compactionfilter_create_v2(
crocksdb_compactionfilter_t* result = new crocksdb_compactionfilter_t; crocksdb_compactionfilter_t* result = new crocksdb_compactionfilter_t;
result->state_ = state; result->state_ = state;
result->destructor_ = destructor; result->destructor_ = destructor;
result->filter_v2_ = filter_v2; result->filter_ = filter;
result->name_ = name; result->name_ = name;
return result; return result;
} }
......
...@@ -1310,23 +1310,6 @@ crocksdb_ratelimiter_get_bytes_per_second(crocksdb_ratelimiter_t* limiter); ...@@ -1310,23 +1310,6 @@ crocksdb_ratelimiter_get_bytes_per_second(crocksdb_ratelimiter_t* limiter);
extern C_ROCKSDB_LIBRARY_API int64_t crocksdb_ratelimiter_get_total_requests( extern C_ROCKSDB_LIBRARY_API int64_t crocksdb_ratelimiter_get_total_requests(
crocksdb_ratelimiter_t* limiter, unsigned char pri); crocksdb_ratelimiter_t* limiter, unsigned char pri);
/* Compaction Filter */
extern C_ROCKSDB_LIBRARY_API crocksdb_compactionfilter_t*
crocksdb_compactionfilter_create(
void* state, void (*destructor)(void*),
unsigned char (*filter)(void*, int level, const char* key,
size_t key_length, const char* existing_value,
size_t value_length, char** new_value,
size_t* new_value_length,
unsigned char* value_changed),
const char* (*name)(void*));
extern C_ROCKSDB_LIBRARY_API void
crocksdb_compactionfilter_set_ignore_snapshots(crocksdb_compactionfilter_t*,
unsigned char);
extern C_ROCKSDB_LIBRARY_API void crocksdb_compactionfilter_destroy(
crocksdb_compactionfilter_t*);
/* Compaction Filter Context */ /* Compaction Filter Context */
extern C_ROCKSDB_LIBRARY_API unsigned char extern C_ROCKSDB_LIBRARY_API unsigned char
......
Subproject commit ab0a8b90f38eb49af49ee452fe564c7e474ebd44 Subproject commit 531a25ee0f11bfa9e91cd63eb80f9bf301a31da5
Subproject commit 355b496bf9d78d2f52e4972ec22cc9d35c234275 Subproject commit fbbe2c873da2319c8986cb315f7ea8e75553a79c
...@@ -1117,6 +1117,7 @@ extern "C" { ...@@ -1117,6 +1117,7 @@ extern "C" {
pub fn crocksdb_iter_prev(iter: *mut DBIterator); pub fn crocksdb_iter_prev(iter: *mut DBIterator);
pub fn crocksdb_iter_key(iter: *const DBIterator, klen: *mut size_t) -> *mut u8; pub fn crocksdb_iter_key(iter: *const DBIterator, klen: *mut size_t) -> *mut u8;
pub fn crocksdb_iter_value(iter: *const DBIterator, vlen: *mut size_t) -> *mut u8; pub fn crocksdb_iter_value(iter: *const DBIterator, vlen: *mut size_t) -> *mut u8;
pub fn crocksdb_iter_seqno(iter: *const DBIterator, seqno: *mut u64) -> bool;
pub fn crocksdb_iter_get_error(iter: *const DBIterator, err: *mut *mut c_char); pub fn crocksdb_iter_get_error(iter: *const DBIterator, err: *mut *mut c_char);
// Write batch // Write batch
pub fn crocksdb_write( pub fn crocksdb_write(
...@@ -1516,14 +1517,15 @@ extern "C" { ...@@ -1516,14 +1517,15 @@ extern "C" {
propname: *const c_char, propname: *const c_char,
) -> *mut c_char; ) -> *mut c_char;
// Compaction filter // Compaction filter
pub fn crocksdb_compactionfilter_create_v2( pub fn crocksdb_compactionfilter_create(
state: *mut c_void, state: *mut c_void,
destructor: extern "C" fn(*mut c_void), destructor: extern "C" fn(*mut c_void),
filter_v2: extern "C" fn( filter: extern "C" fn(
*mut c_void, *mut c_void,
c_int, c_int,
*const u8, *const u8,
size_t, size_t,
u64,
CompactionFilterValueType, CompactionFilterValueType,
*const u8, *const u8,
size_t, size_t,
......
...@@ -8,7 +8,7 @@ pub use crocksdb_ffi::DBCompactionFilter; ...@@ -8,7 +8,7 @@ pub use crocksdb_ffi::DBCompactionFilter;
use crocksdb_ffi::{self, DBCompactionFilterContext, DBCompactionFilterFactory}; use crocksdb_ffi::{self, DBCompactionFilterContext, DBCompactionFilterFactory};
use libc::{c_char, c_int, c_void, malloc, memcpy, size_t}; use libc::{c_char, c_int, c_void, malloc, memcpy, size_t};
/// Decision used in `CompactionFilter::filter_v2`. /// Decision used in `CompactionFilter::filter`.
pub enum CompactionFilterDecision { pub enum CompactionFilterDecision {
/// The record will be kept instead of filtered. /// The record will be kept instead of filtered.
Keep, Keep,
...@@ -43,10 +43,11 @@ pub trait CompactionFilter { ...@@ -43,10 +43,11 @@ pub trait CompactionFilter {
} }
/// This method will overwrite `filter` if a `CompactionFilter` implements both of them. /// This method will overwrite `filter` if a `CompactionFilter` implements both of them.
fn filter_v2( fn featured_filter(
&mut self, &mut self,
level: usize, level: usize,
key: &[u8], key: &[u8],
_seqno: u64,
value: &[u8], value: &[u8],
value_type: CompactionFilterValueType, value_type: CompactionFilterValueType,
) -> CompactionFilterDecision { ) -> CompactionFilterDecision {
...@@ -84,11 +85,12 @@ extern "C" fn destructor(filter: *mut c_void) { ...@@ -84,11 +85,12 @@ extern "C" fn destructor(filter: *mut c_void) {
} }
} }
extern "C" fn filter_v2( extern "C" fn filter(
filter: *mut c_void, filter: *mut c_void,
level: c_int, level: c_int,
key: *const u8, key: *const u8,
key_len: size_t, key_len: size_t,
seqno: u64,
value_type: CompactionFilterValueType, value_type: CompactionFilterValueType,
value: *const u8, value: *const u8,
value_len: size_t, value_len: size_t,
...@@ -106,7 +108,7 @@ extern "C" fn filter_v2( ...@@ -106,7 +108,7 @@ extern "C" fn filter_v2(
let filter = &mut (*(filter as *mut CompactionFilterProxy)).filter; let filter = &mut (*(filter as *mut CompactionFilterProxy)).filter;
let key = slice::from_raw_parts(key, key_len); let key = slice::from_raw_parts(key, key_len);
let value = slice::from_raw_parts(value, value_len); let value = slice::from_raw_parts(value, value_len);
match filter.filter_v2(level as usize, key, value, value_type) { match filter.featured_filter(level as usize, key, seqno, value, value_type) {
CompactionFilterDecision::Keep => RawCompactionFilterDecision::Keep, CompactionFilterDecision::Keep => RawCompactionFilterDecision::Keep,
CompactionFilterDecision::Remove => RawCompactionFilterDecision::Remove, CompactionFilterDecision::Remove => RawCompactionFilterDecision::Remove,
CompactionFilterDecision::ChangeValue(new_v) => { CompactionFilterDecision::ChangeValue(new_v) => {
...@@ -155,12 +157,7 @@ pub unsafe fn new_compaction_filter_raw( ...@@ -155,12 +157,7 @@ pub unsafe fn new_compaction_filter_raw(
name: c_name, name: c_name,
filter: f, filter: f,
})); }));
crocksdb_ffi::crocksdb_compactionfilter_create_v2( crocksdb_ffi::crocksdb_compactionfilter_create(proxy as *mut c_void, destructor, filter, name)
proxy as *mut c_void,
destructor,
filter_v2,
name,
)
} }
pub struct CompactionFilterContext(DBCompactionFilterContext); pub struct CompactionFilterContext(DBCompactionFilterContext);
......
...@@ -305,6 +305,17 @@ impl<D> DBIterator<D> { ...@@ -305,6 +305,17 @@ impl<D> DBIterator<D> {
} }
} }
pub fn sequence(&self) -> Option<u64> {
debug_assert_eq!(self.valid(), Ok(true));
unsafe {
let mut seqno = 0;
if crocksdb_ffi::crocksdb_iter_seqno(self.inner, &mut seqno) {
return Some(seqno);
}
None
}
}
#[deprecated] #[deprecated]
pub fn kv(&self) -> Option<(Vec<u8>, Vec<u8>)> { pub fn kv(&self) -> Option<(Vec<u8>, Vec<u8>)> {
if self.valid().unwrap() { if self.valid().unwrap() {
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
// limitations under the License. // limitations under the License.
use std::ops::Deref; use std::ops::Deref;
use std::sync::mpsc::{self, SyncSender};
use std::sync::*; use std::sync::*;
use std::thread; use std::thread;
...@@ -409,3 +410,72 @@ fn test_fixed_suffix_seek() { ...@@ -409,3 +410,72 @@ fn test_fixed_suffix_seek() {
let vec = prev_collect(&mut iter); let vec = prev_collect(&mut iter);
assert!(vec.len() == 0); assert!(vec.len() == 0);
} }
#[test]
fn test_iter_sequence_number() {
struct TestCompactionFilter(SyncSender<(Vec<u8>, Vec<u8>, u64)>);
impl CompactionFilter for TestCompactionFilter {
fn featured_filter(
&mut self,
_: usize,
key: &[u8],
seqno: u64,
value: &[u8],
_: CompactionFilterValueType,
) -> CompactionFilterDecision {
self.0.send((key.to_vec(), value.to_vec(), seqno)).unwrap();
CompactionFilterDecision::Keep
}
}
let (tx, rx) = mpsc::sync_channel(8);
let filter = Box::new(TestCompactionFilter(tx));
let path = tempdir_with_prefix("_rust_rocksdb_sequence_number");
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.set_disable_auto_compactions(true);
cf_opts.set_num_levels(7);
cf_opts.set_compaction_filter("test", filter).unwrap();
let db = DB::open_cf(
opts,
path.path().to_str().unwrap(),
vec![("default", cf_opts)],
)
.unwrap();
db.put(b"key1", b"value11").unwrap();
db.flush(false).unwrap();
db.put(b"key1", b"value22").unwrap();
db.flush(false).unwrap();
db.put(b"key2", b"value21").unwrap();
db.flush(false).unwrap();
db.put(b"key2", b"value22").unwrap();
let mut iter = db.iter();
assert!(iter.seek(SeekKey::Key(b"key1")).unwrap());
assert_eq!(iter.key(), b"key1");
assert_eq!(iter.value(), b"value22");
assert_eq!(iter.sequence().unwrap(), 2);
assert!(iter.next().unwrap());
assert_eq!(iter.key(), b"key2");
assert_eq!(iter.value(), b"value22");
assert_eq!(iter.sequence().unwrap(), 4);
let mut compact_opts = CompactOptions::new();
compact_opts.set_bottommost_level_compaction(DBBottommostLevelCompaction::Force);
compact_opts.set_target_level(6);
let cf_default = db.cf_handle("default").unwrap();
db.compact_range_cf_opt(cf_default, &compact_opts, Some(b"a"), Some(b"z"));
let (k, v, seqno) = rx.recv().unwrap();
assert_eq!(k, b"key1");
assert_eq!(v, b"value22");
assert_eq!(seqno, 2);
let (k, v, seqno) = rx.recv().unwrap();
assert_eq!(k, b"key2");
assert_eq!(v, b"value22");
assert_eq!(seqno, 4);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment