Commit a09580fe authored by Tyler Neely's avatar Tyler Neely

small refactor to get CF's working on batches at the cost of OOB CF handling

parent c5e6fb30
File added
...@@ -21,7 +21,7 @@ use std::ptr; ...@@ -21,7 +21,7 @@ use std::ptr;
use std::slice; use std::slice;
use rocksdb_options::Options; use rocksdb_options::Options;
use rocksdb::RocksDB; use rocksdb::DB;
pub struct ComparatorCallback { pub struct ComparatorCallback {
pub name: CString, pub name: CString,
...@@ -75,8 +75,8 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int { ...@@ -75,8 +75,8 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int {
// opts.create_if_missing(true); // opts.create_if_missing(true);
// opts.add_comparator("test comparator", test_reverse_compare); // opts.add_comparator("test comparator", test_reverse_compare);
// { // {
// let db = RocksDB::open(&opts, path).unwrap(); // let db = DB::open(&opts, path).unwrap();
// // TODO add interesting test // // TODO add interesting test
// } // }
// assert!(RocksDB::destroy(&opts, path).is_ok()); // assert!(DB::destroy(&opts, path).is_ok());
//} //}
This diff is collapsed.
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
#![crate_type = "lib"] #![crate_type = "lib"]
pub use ffi as rocksdb_ffi; pub use ffi as rocksdb_ffi;
pub use ffi::{new_bloom_filter, RocksDBCompactionStyle, RocksDBComparator}; pub use ffi::{new_bloom_filter, DBCompactionStyle, DBComparator};
pub use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, WriteBatch, Writable, Direction}; pub use rocksdb::{DB, DBResult, DBVector, WriteBatch, Writable, Direction};
pub use rocksdb_options::{Options, BlockBasedOptions}; pub use rocksdb_options::{Options, BlockBasedOptions};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
pub mod rocksdb; pub mod rocksdb;
......
...@@ -14,13 +14,13 @@ ...@@ -14,13 +14,13 @@
limitations under the License. limitations under the License.
*/ */
extern crate rocksdb; extern crate rocksdb;
use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable, }; use rocksdb::{Options, DB, MergeOperands, new_bloom_filter, Writable, };
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use rocksdb::DBCompactionStyle::DBUniversalCompaction;
//fn snapshot_test() { //fn snapshot_test() {
// let path = "_rust_rocksdb_iteratortest"; // let path = "_rust_rocksdb_iteratortest";
// { // {
// let mut db = RocksDB::open_default(path).unwrap(); // let mut db = DB::open_default(path).unwrap();
// let p = db.put(b"k1", b"v1111"); // let p = db.put(b"k1", b"v1111");
// assert!(p.is_ok()); // assert!(p.is_ok());
// let p = db.put(b"k2", b"v2222"); // let p = db.put(b"k2", b"v2222");
...@@ -41,13 +41,13 @@ use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; ...@@ -41,13 +41,13 @@ use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
// }; // };
// } // }
// let opts = Options::new(); // let opts = Options::new();
// assert!(RocksDB::destroy(&opts, path).is_ok()); // assert!(DB::destroy(&opts, path).is_ok());
//} //}
#[cfg(not(feature = "valgrind"))] #[cfg(not(feature = "valgrind"))]
fn main() { fn main() {
let path = "/tmp/rust-rocksdb"; let path = "/tmp/rust-rocksdb";
let mut db = RocksDB::open_default(path).unwrap(); let mut db = DB::open_default(path).unwrap();
assert!(db.put(b"my key", b"my value").is_ok()); assert!(db.put(b"my key", b"my value").is_ok());
db.get(b"my key").map( |value| { db.get(b"my key").map( |value| {
match value.to_utf8() { match value.to_utf8() {
...@@ -88,7 +88,7 @@ fn custom_merge() { ...@@ -88,7 +88,7 @@ fn custom_merge() {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge); opts.add_merge_operator("test operator", concat_merge);
{ {
let mut db = RocksDB::open(&opts, path).unwrap(); let mut db = DB::open(&opts, path).unwrap();
db.put(b"k1", b"a"); db.put(b"k1", b"a");
db.merge(b"k1", b"b"); db.merge(b"k1", b"b");
db.merge(b"k1", b"c"); db.merge(b"k1", b"c");
...@@ -107,7 +107,7 @@ fn custom_merge() { ...@@ -107,7 +107,7 @@ fn custom_merge() {
.on_error( |e| { println!("error retrieving value: {}", e) }); .on_error( |e| { println!("error retrieving value: {}", e) });
} }
RocksDB::destroy(&opts, path).is_ok(); DB::destroy(&opts, path).is_ok();
} }
#[cfg(feature = "valgrind")] #[cfg(feature = "valgrind")]
...@@ -116,7 +116,7 @@ fn main() { ...@@ -116,7 +116,7 @@ fn main() {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge); opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(&opts, path).unwrap(); let db = DB::open(&opts, path).unwrap();
loop { loop {
db.put(b"k1", b"a"); db.put(b"k1", b"a");
db.merge(b"k1", b"b"); db.merge(b"k1", b"b");
...@@ -141,10 +141,10 @@ fn main() { ...@@ -141,10 +141,10 @@ fn main() {
mod tests { mod tests {
use std::thread::sleep_ms; use std::thread::sleep_ms;
use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable }; use rocksdb::{BlockBasedOptions, Options, DB, MergeOperands, new_bloom_filter, Writable };
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction; use rocksdb::DBCompactionStyle::DBUniversalCompaction;
fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB { fn tuned_for_somebody_elses_disk(path: &str, opts: & mut Options, blockopts: &mut BlockBasedOptions) -> DB {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.set_max_open_files(10000); opts.set_max_open_files(10000);
opts.set_use_fsync(false); opts.set_use_fsync(false);
...@@ -158,7 +158,7 @@ mod tests { ...@@ -158,7 +158,7 @@ mod tests {
opts.set_min_write_buffer_number_to_merge(4); opts.set_min_write_buffer_number_to_merge(4);
opts.set_level_zero_stop_writes_trigger(2000); opts.set_level_zero_stop_writes_trigger(2000);
opts.set_level_zero_slowdown_writes_trigger(0); opts.set_level_zero_slowdown_writes_trigger(0);
opts.set_compaction_style(RocksDBUniversalCompaction); opts.set_compaction_style(DBUniversalCompaction);
opts.set_max_background_compactions(4); opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4); opts.set_max_background_flushes(4);
opts.set_filter_deletes(false); opts.set_filter_deletes(false);
...@@ -169,7 +169,7 @@ mod tests { ...@@ -169,7 +169,7 @@ mod tests {
let filter = new_bloom_filter(10); let filter = new_bloom_filter(10);
//opts.set_filter(filter); //opts.set_filter(filter);
RocksDB::open(&opts, path).unwrap() DB::open(&opts, path).unwrap()
} }
/* TODO(tyler) unstable /* TODO(tyler) unstable
...@@ -204,7 +204,7 @@ mod tests { ...@@ -204,7 +204,7 @@ mod tests {
i += 1; i += 1;
}); });
} }
RocksDB::destroy(&opts, path).is_ok(); DB::destroy(&opts, path).is_ok();
} }
*/ */
} }
...@@ -21,7 +21,7 @@ use std::ptr; ...@@ -21,7 +21,7 @@ use std::ptr;
use std::slice; use std::slice;
use rocksdb_options::Options; use rocksdb_options::Options;
use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, Writable}; use rocksdb::{DB, DBResult, DBVector, Writable};
pub struct MergeOperatorCallback { pub struct MergeOperatorCallback {
pub name: CString, pub name: CString,
...@@ -187,7 +187,7 @@ fn mergetest() { ...@@ -187,7 +187,7 @@ fn mergetest() {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
{ {
let mut db = RocksDB::open(&opts, path).unwrap(); let mut db = DB::open(&opts, path).unwrap();
let p = db.put(b"k1", b"a"); let p = db.put(b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
db.merge(b"k1", b"b"); db.merge(b"k1", b"b");
...@@ -207,10 +207,10 @@ fn mergetest() { ...@@ -207,10 +207,10 @@ fn mergetest() {
.on_error( |e| { println!("error reading value")}); //: {", e) }); .on_error( |e| { println!("error reading value")}); //: {", e) });
assert!(m.is_ok()); assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); let r: DBResult<DBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
} }
assert!(RocksDB::destroy(&opts, path).is_ok()); assert!(DB::destroy(&opts, path).is_ok());
} }
This diff is collapsed.
...@@ -24,11 +24,11 @@ use merge_operator::{self, MergeOperatorCallback, MergeOperands, ...@@ -24,11 +24,11 @@ use merge_operator::{self, MergeOperatorCallback, MergeOperands,
use comparator::{self, ComparatorCallback, compare_callback}; use comparator::{self, ComparatorCallback, compare_callback};
pub struct BlockBasedOptions { pub struct BlockBasedOptions {
inner: rocksdb_ffi::RocksDBBlockBasedTableOptions, inner: rocksdb_ffi::DBBlockBasedTableOptions,
} }
pub struct Options { pub struct Options {
pub inner: rocksdb_ffi::RocksDBOptions, pub inner: rocksdb_ffi::DBOptions,
} }
impl Drop for Options { impl Drop for Options {
...@@ -50,7 +50,7 @@ impl Drop for BlockBasedOptions { ...@@ -50,7 +50,7 @@ impl Drop for BlockBasedOptions {
impl BlockBasedOptions { impl BlockBasedOptions {
pub fn new() -> BlockBasedOptions { pub fn new() -> BlockBasedOptions {
let block_opts = unsafe {rocksdb_ffi::rocksdb_block_based_options_create() }; let block_opts = unsafe {rocksdb_ffi::rocksdb_block_based_options_create() };
let rocksdb_ffi::RocksDBBlockBasedTableOptions(opt_ptr) = block_opts; let rocksdb_ffi::DBBlockBasedTableOptions(opt_ptr) = block_opts;
if opt_ptr.is_null() { if opt_ptr.is_null() {
panic!("Could not create rocksdb block based options".to_string()); panic!("Could not create rocksdb block based options".to_string());
} }
...@@ -65,21 +65,21 @@ impl BlockBasedOptions { ...@@ -65,21 +65,21 @@ impl BlockBasedOptions {
} }
//TODO figure out how to create these in a Rusty way //TODO figure out how to create these in a Rusty way
////pub fn set_filter(&mut self, filter: rocksdb_ffi::RocksDBFilterPolicy) { ////pub fn set_filter(&mut self, filter: rocksdb_ffi::DBFilterPolicy) {
//// unsafe { //// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_filter_policy( //// rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(
//// self.inner, filter); //// self.inner, filter);
//// } //// }
////} ////}
////pub fn set_cache(&mut self, cache: rocksdb_ffi::RocksDBCache) { ////pub fn set_cache(&mut self, cache: rocksdb_ffi::DBCache) {
//// unsafe { //// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_block_cache( //// rocksdb_ffi::rocksdb_block_based_options_set_block_cache(
//// self.inner, cache); //// self.inner, cache);
//// } //// }
////} ////}
////pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::RocksDBCache) { ////pub fn set_cache_compressed(&mut self, cache: rocksdb_ffi::DBCache) {
//// unsafe { //// unsafe {
//// rocksdb_ffi::rocksdb_block_based_options_set_block_cache_compressed( //// rocksdb_ffi::rocksdb_block_based_options_set_block_cache_compressed(
//// self.inner, cache); //// self.inner, cache);
...@@ -92,7 +92,7 @@ impl Options { ...@@ -92,7 +92,7 @@ impl Options {
pub fn new() -> Options { pub fn new() -> Options {
unsafe { unsafe {
let opts = rocksdb_ffi::rocksdb_options_create(); let opts = rocksdb_ffi::rocksdb_options_create();
let rocksdb_ffi::RocksDBOptions(opt_ptr) = opts; let rocksdb_ffi::DBOptions(opt_ptr) = opts;
if opt_ptr.is_null() { if opt_ptr.is_null() {
panic!("Could not create rocksdb options".to_string()); panic!("Could not create rocksdb options".to_string());
} }
...@@ -258,7 +258,7 @@ impl Options { ...@@ -258,7 +258,7 @@ impl Options {
} }
} }
pub fn set_compaction_style(&mut self, style: rocksdb_ffi::RocksDBCompactionStyle) { pub fn set_compaction_style(&mut self, style: rocksdb_ffi::DBCompactionStyle) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_compaction_style( rocksdb_ffi::rocksdb_options_set_compaction_style(
self.inner, style); self.inner, style);
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
use rocksdb::{Options, RocksDB, RocksDBResult, Writable, Direction, MergeOperands}; use rocksdb::{Options, DB, DBResult, Writable, Direction, MergeOperands};
#[test] #[test]
pub fn test_column_family() { pub fn test_column_family() {
...@@ -24,7 +24,7 @@ pub fn test_column_family() { ...@@ -24,7 +24,7 @@ pub fn test_column_family() {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let mut db = RocksDB::open(&opts, path).unwrap(); let mut db = DB::open(&opts, path).unwrap();
let opts = Options::new(); let opts = Options::new();
match db.create_cf("cf1", &opts) { match db.create_cf("cf1", &opts) {
Ok(_) => println!("cf1 created successfully"), Ok(_) => println!("cf1 created successfully"),
...@@ -38,7 +38,7 @@ pub fn test_column_family() { ...@@ -38,7 +38,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
match RocksDB::open(&opts, path) { match DB::open(&opts, path) {
Ok(_) => panic!("should not have opened DB successfully without specifying column Ok(_) => panic!("should not have opened DB successfully without specifying column
families"), families"),
Err(e) => assert!(e.starts_with("Invalid argument: You have to open all column families.")), Err(e) => assert!(e.starts_with("Invalid argument: You have to open all column families.")),
...@@ -49,33 +49,34 @@ pub fn test_column_family() { ...@@ -49,33 +49,34 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
match RocksDB::open_cf(&opts, path, &["cf1"]) { match DB::open_cf(&opts, path, &["cf1"]) {
Ok(_) => println!("successfully opened db with column family"), Ok(_) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e), Err(e) => panic!("failed to open db with column family: {}", e),
} }
} }
// should be able to write, read, merge, batch, and iterate over a cf // TODO should be able to write, read, merge, batch, and iterate over a cf
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let mut db = match RocksDB::open_cf(&opts, path, &["cf1"]) { let mut db = match DB::open_cf(&opts, path, &["cf1"]) {
Ok(db) => { Ok(db) => {
println!("successfully opened db with column family"); println!("successfully opened db with column family");
db db
}, },
Err(e) => panic!("failed to open db with column family: {}", e), Err(e) => panic!("failed to open db with column family: {}", e),
}; };
assert!(db.put_cf("cf1", b"k1", b"v1").is_ok()); let cf1 = *db.cf_handle("cf1").unwrap();
assert!(db.get_cf("cf1", b"k1").unwrap().to_utf8().unwrap() == "v1"); assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
let p = db.put_cf("cf1", b"k1", b"a"); assert!(db.get_cf(cf1, b"k1").unwrap().to_utf8().unwrap() == "v1");
let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
db.merge_cf("cf1", b"k1", b"b"); db.merge_cf(cf1, b"k1", b"b");
db.merge_cf("cf1", b"k1", b"c"); db.merge_cf(cf1, b"k1", b"c");
db.merge_cf("cf1", b"k1", b"d"); db.merge_cf(cf1, b"k1", b"d");
db.merge_cf("cf1", b"k1", b"efg"); db.merge_cf(cf1, b"k1", b"efg");
let m = db.merge_cf("cf1", b"k1", b"h"); let m = db.merge_cf(cf1, b"k1", b"h");
println!("m is {:?}", m); println!("m is {:?}", m);
assert!(m.is_ok()); // TODO assert!(m.is_ok());
db.get(b"k1").map( |value| { db.get(b"k1").map( |value| {
match value.to_utf8() { match value.to_utf8() {
Some(v) => Some(v) =>
...@@ -86,21 +87,27 @@ pub fn test_column_family() { ...@@ -86,21 +87,27 @@ pub fn test_column_family() {
}).on_absent( || { println!("value not present!") }) }).on_absent( || { println!("value not present!") })
.on_error( |e| { println!("error reading value")}); //: {", e) }); .on_error( |e| { println!("error reading value")}); //: {", e) });
let r = db.get_cf("cf1", b"k1"); let r = db.get_cf(cf1, b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); // TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
} }
// TODO should be able to use writebatch ops with a cf
{
}
// TODO should be able to iterate over a cf
{
}
// should b able to drop a cf // should b able to drop a cf
{ {
let mut db = RocksDB::open_cf(&Options::new(), path, &["cf1"]).unwrap(); let mut db = DB::open_cf(&Options::new(), path, &["cf1"]).unwrap();
match db.drop_cf("cf1") { match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."), Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e), Err(e) => panic!("failed to drop column family: {}", e),
} }
} }
assert!(RocksDB::destroy(&Options::new(), path).is_ok()); assert!(DB::destroy(&Options::new(), path).is_ok());
} }
fn test_provided_merge(new_key: &[u8], fn test_provided_merge(new_key: &[u8],
......
use rocksdb::{Options, RocksDB, Writable, Direction}; use rocksdb::{Options, DB, Writable, Direction};
use std; use std;
fn cba(input: &Box<[u8]>) -> Box<[u8]> { fn cba(input: &Box<[u8]>) -> Box<[u8]> {
...@@ -17,7 +17,7 @@ pub fn test_iterator() { ...@@ -17,7 +17,7 @@ pub fn test_iterator() {
let v2:Box<[u8]> = b"v2222".to_vec().into_boxed_slice(); let v2:Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
let v3:Box<[u8]> = b"v3333".to_vec().into_boxed_slice(); let v3:Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
let v4:Box<[u8]> = b"v4444".to_vec().into_boxed_slice(); let v4:Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
let mut db = RocksDB::open_default(path).unwrap(); let mut db = DB::open_default(path).unwrap();
let p = db.put(&*k1, &*v1); let p = db.put(&*k1, &*v1);
assert!(p.is_ok()); assert!(p.is_ok());
let p = db.put(&*k2, &*v2); let p = db.put(&*k2, &*v2);
...@@ -109,6 +109,6 @@ pub fn test_iterator() { ...@@ -109,6 +109,6 @@ pub fn test_iterator() {
} }
} }
let opts = Options::new(); let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok()); assert!(DB::destroy(&opts, path).is_ok());
} }
use rocksdb::{Options, RocksDB, Writable, Direction, RocksDBResult}; use rocksdb::{Options, DB, Writable, Direction, DBResult};
use std::thread::{self, Builder}; use std::thread::{self, Builder};
use std::sync::Arc; use std::sync::Arc;
...@@ -8,7 +8,7 @@ const N: usize = 100_000; ...@@ -8,7 +8,7 @@ const N: usize = 100_000;
pub fn test_multithreaded() { pub fn test_multithreaded() {
let path = "_rust_rocksdb_multithreadtest"; let path = "_rust_rocksdb_multithreadtest";
{ {
let db = RocksDB::open_default(path).unwrap(); let db = DB::open_default(path).unwrap();
let db = Arc::new(db); let db = Arc::new(db);
db.put(b"key", b"value1"); db.put(b"key", b"value1");
...@@ -31,7 +31,7 @@ pub fn test_multithreaded() { ...@@ -31,7 +31,7 @@ pub fn test_multithreaded() {
let j3 = thread::spawn(move|| { let j3 = thread::spawn(move|| {
for i in 1..N { for i in 1..N {
match db3.get(b"key") { match db3.get(b"key") {
RocksDBResult::Some(v) => { DBResult::Some(v) => {
if &v[..] != b"value1" && &v[..] != b"value2" { if &v[..] != b"value1" && &v[..] != b"value2" {
assert!(false); assert!(false);
} }
...@@ -47,5 +47,5 @@ pub fn test_multithreaded() { ...@@ -47,5 +47,5 @@ pub fn test_multithreaded() {
j2.join(); j2.join();
j3.join(); j3.join();
} }
assert!(RocksDB::destroy(&Options::new(), path).is_ok()); assert!(DB::destroy(&Options::new(), path).is_ok());
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment