Commit 5f0b895c authored by Tyler Neely's avatar Tyler Neely

more thrashing about for merge operator support

parent 12c7afad
...@@ -128,7 +128,7 @@ extern { ...@@ -128,7 +128,7 @@ extern {
) -> *const c_char, ) -> *const c_char,
partial_merge: extern fn( partial_merge: extern fn(
arg: *mut c_void, key: *const c_char, key_len: size_t, arg: *mut c_void, key: *const c_char, key_len: size_t,
operands_list: *const c_void, operands_list_len: *const c_void, operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int, num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t success: *mut u8, new_value_length: *mut size_t
) -> *const c_char, ) -> *const c_char,
......
...@@ -54,9 +54,10 @@ impl RocksDBOptions { ...@@ -54,9 +54,10 @@ impl RocksDBOptions {
} }
} }
pub fn set_merge_operator(&self, mo: rocksdb_ffi::RocksDBMergeOperator) { pub fn add_merge_operator(&self, name: &[str], merge_fn: for <'b> fn (String, Option<String>, &mut MergeOperands) -> &'b [u8]) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo); let mo = MergeOperator::new(name, merge_fn);
rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo.mo);
} }
} }
} }
...@@ -355,20 +356,6 @@ fn external() { ...@@ -355,20 +356,6 @@ fn external() {
assert!(RocksDB::destroy(opts, path).is_ok()); assert!(RocksDB::destroy(opts, path).is_ok());
} }
extern "C" fn null_destructor(args: *mut c_void) {
println!("in null_destructor");
}
extern "C" fn mergeoperator_name(args: *mut c_void) -> *const c_char {
println!("in mergeoperator_name");
let name = "test_mo".to_c_str();
unsafe {
let buf = libc::malloc(8 as size_t);
ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8);
println!("returning from mergeoperator_name");
buf as *const c_char
}
}
struct MergeOperands<'a> { struct MergeOperands<'a> {
operands_list: *const *const c_char, operands_list: *const *const c_char,
operands_list_len: *const size_t, operands_list_len: *const size_t,
...@@ -389,7 +376,7 @@ impl <'a> MergeOperands<'a> { ...@@ -389,7 +376,7 @@ impl <'a> MergeOperands<'a> {
} }
} }
impl <'a> Iterator<&'a [u8]> for MergeOperands<'a> { impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> {
fn next(&mut self) -> Option<&'a [u8]> { fn next(&mut self) -> Option<&'a [u8]> {
use std::raw::Slice; use std::raw::Slice;
match self.cursor == self.num_operands { match self.cursor == self.num_operands {
...@@ -418,70 +405,114 @@ impl <'a> Iterator<&'a [u8]> for MergeOperands<'a> { ...@@ -418,70 +405,114 @@ impl <'a> Iterator<&'a [u8]> for MergeOperands<'a> {
} }
} }
extern "C" fn full_merge( struct MergeOperatorState<'a> {
arg: *mut c_void, key: *const c_char, key_len: size_t, name: &'a [str],
existing_value: *const c_char, existing_value_len: size_t, merge_fn: for <'b> fn (String, Option<String>, &mut MergeOperands) -> &'b [u8],
operands_list: *const *const c_char, operands_list_len: *const size_t, }
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
println!("in the FULL merge operator");
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint);
println!("returning from FULL merge"); struct MergeOperator<'a> {
//TODO rust will "free" this when it goes out of scope, copy this to a non-gc'd buffer to return mo: rocksdb_ffi::RocksDBMergeOperator,
merge(key, Some(oldval), operands); state: MergeOperatorState<'a>,
}
let buf = libc::malloc(1 as size_t); impl <'a> MergeOperator<'a> {
match buf.is_null() { pub fn new<'a>(name: &'a [str], merge_fn: for <'b> fn (String, Option<String>, &mut MergeOperands) -> &'b [u8]) -> &'a MergeOperator<'a> {
false => { let state = &MergeOperatorState {
*new_value_length = 1 as size_t; name: name,
*success = 1 as u8; merge_fn: merge_fn,
let newval = "2"; };
ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1);
println!("returning from full_merge"); let ffi_operator = rocksdb_ffi::rocksdb_mergeoperator_create(
buf as *const c_char state as *mut c_void,
}, state.null_destructor,
true => { state.full_merge,
println!("returning from full_merge"); state.partial_merge,
0 as *const c_char None,
} state.mergeoperator_name);
&MergeOperator {
mo: ffi_operator,
state: state,
} }
} }
} }
extern "C" fn partial_merge(
arg: *mut c_void, key: *const c_char, key_len: size_t, impl <'a> MergeOperatorState<'a> {
operands_list: *const c_void, operands_list_len: *const c_void,
num_operands: c_int, extern "C" fn null_destructor(&self) {
success: *mut u8, new_value_length: *mut size_t) -> *const c_char { println!("in null_destructor");
unsafe { }
println!("in the PARTIAL merge operator");
*new_value_length = 2; extern "C" fn mergeoperator_name(&self) -> *const c_char {
*success = 1 as u8; println!("in mergeoperator_name");
let buf = libc::malloc(1 as size_t); let name = self.name.to_c_str();
match buf.is_null() { unsafe {
false => { let buf = libc::malloc(8 as size_t);
println!("number of operands: {}", num_operands); ptr::copy_memory(&mut *buf, name.as_ptr() as *const c_void, 8);
println!("first operand: {}", from_buf_len(operands_list as *const u8, 1)); println!("returning from mergeoperator_name");
*new_value_length = 1 as size_t; buf as *const c_char
*success = 1 as u8; }
let newval = "2"; }
ptr::copy_memory(&mut *buf, newval.as_ptr() as *const c_void, 1);
println!("returning from partial_merge"); extern "C" fn full_merge(
buf as *const c_char &self, key: *const c_char, key_len: size_t,
}, existing_value: *const c_char, existing_value_len: size_t,
true => { operands_list: *const *const c_char, operands_list_len: *const size_t,
println!("returning from partial_merge"); num_operands: c_int,
0 as *const c_char success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
} unsafe {
println!("in the FULL merge operator");
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint);
let result = self.merge_fn(key, Some(oldval), operands);
let buf = libc::malloc(result.len() as size_t);
assert!(buf.is_not_null());
*new_value_length = 1 as size_t;
*success = 1 as u8;
let newval = "2";
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
println!("returning from full_merge");
buf as *const c_char
}
}
extern "C" fn partial_merge(
&self, key: *const c_char, key_len: size_t,
operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
println!("in the PARTIAL merge operator");
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let result = self.merge_fn(key, None, operands);
let buf = libc::malloc(result.len() as size_t);
assert!(buf.is_not_null());
*new_value_length = 1 as size_t;
*success = 1 as u8;
let newval = "2";
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
buf as *const c_char
} }
} }
} }
fn merge<'a>(new_key: String, existing_val: Option<String>, operands: &mut MergeOperands) -> &'a [u8] { fn create_full_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option<String>,
for op in *operands { mut operands: &mut MergeOperands) -> &'a [u8]) {
}
fn create_partial_merge(provided_merge: for<'a> fn (new_key: String, existing_val: Option<String>,
mut operands: &mut MergeOperands) -> &'a [u8]) {
}
fn test_provided_merge<'a>(new_key: String, existing_val: Option<String>,
mut operands: &mut MergeOperands) -> &'a [u8] {
for op in operands {
println!("op: {}", from_utf8(op)); println!("op: {}", from_utf8(op));
} }
...@@ -494,15 +525,8 @@ fn mergetest() { ...@@ -494,15 +525,8 @@ fn mergetest() {
let path = "_rust_rocksdb_mergetest"; let path = "_rust_rocksdb_mergetest";
unsafe { unsafe {
let opts = RocksDBOptions::new(); let opts = RocksDBOptions::new();
let mo = rocksdb_ffi::rocksdb_mergeoperator_create(
0 as *mut c_void,
null_destructor,
full_merge,
partial_merge,
None,
mergeoperator_name);
opts.create_if_missing(true); opts.create_if_missing(true);
opts.set_merge_operator(mo); opts.add_merge_operator("test operator", test_provided_merge);
let db = RocksDB::open(opts, path).unwrap(); let db = RocksDB::open(opts, path).unwrap();
let p = db.put(b"k1", b"1"); let p = db.put(b"k1", b"1");
assert!(p.is_ok()); assert!(p.is_ok());
...@@ -524,7 +548,7 @@ fn mergetest() { ...@@ -524,7 +548,7 @@ fn mergetest() {
assert!(m.is_ok()); assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "2"); //assert!(r.unwrap().to_utf8().unwrap() == "yoyo");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
db.close(); db.close();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment