Commit 909bc881 authored by David Greenberg's avatar David Greenberg

Improve safety of borrow checker with ffi objects

parent 57ecb0cf
......@@ -20,7 +20,7 @@ use std::mem;
use std::ptr;
use std::slice;
use rocksdb_options::RocksDBOptions;
use rocksdb_options::Options;
use rocksdb::RocksDB;
pub struct ComparatorCallback {
......@@ -69,11 +69,11 @@ fn test_reverse_compare(a: &[u8], b: &[u8]) -> c_int {
#[test]
fn compare_works() {
let path = "_rust_rocksdb_comparetest";
let opts = RocksDBOptions::new();
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_comparator("test comparator", test_reverse_compare);
let db = RocksDB::open(opts, path).unwrap();
let db = RocksDB::open(&opts, path).unwrap();
// TODO add interesting test
db.close();
assert!(RocksDB::destroy(opts, path).is_ok());
assert!(RocksDB::destroy(&opts, path).is_ok());
}
......@@ -92,14 +92,15 @@ pub enum RocksDBUniversalCompactionStyle {
rocksdb_total_size_compaction_stop_style = 1
}
//TODO audit the use of boolean arguments, b/c I think they need to be u8 instead...
#[link(name = "rocksdb")]
extern {
pub fn rocksdb_options_create() -> RocksDBOptions;
pub fn rocksdb_options_destroy(opts: RocksDBOptions);
pub fn rocksdb_cache_create_lru(capacity: size_t) -> RocksDBCache;
pub fn rocksdb_cache_destroy(cache: RocksDBCache);
pub fn rocksdb_block_based_options_create() -> RocksDBBlockBasedTableOptions;
pub fn rocksdb_block_based_options_destroy(
block_options: RocksDBBlockBasedTableOptions);
pub fn rocksdb_block_based_options_destroy(opts: RocksDBBlockBasedTableOptions);
pub fn rocksdb_block_based_options_set_block_size(
block_options: RocksDBBlockBasedTableOptions,
block_size: size_t);
......@@ -191,8 +192,26 @@ extern {
err: *mut i8);
pub fn rocksdb_readoptions_create() -> RocksDBReadOptions;
pub fn rocksdb_readoptions_destroy(readopts: RocksDBReadOptions);
pub fn rocksdb_readoptions_set_snapshot(read_opts: RocksDBReadOptions,
snapshot: RocksDBSnapshot);
pub fn rocksdb_readoptions_set_verify_checksums(
readopts: RocksDBReadOptions,
v: bool);
pub fn rocksdb_readoptions_set_fill_cache(
readopts: RocksDBReadOptions,
v: bool);
pub fn rocksdb_readoptions_set_snapshot(
readopts: RocksDBReadOptions,
snapshot: RocksDBSnapshot); //TODO how do I make this a const ref?
pub fn rocksdb_readoptions_set_iterate_upper_bound(
readopts: RocksDBReadOptions,
k: *const u8,
kLen: size_t);
pub fn rocksdb_readoptions_set_read_tier(
readopts: RocksDBReadOptions,
tier: c_int);
pub fn rocksdb_readoptions_set_tailing(
readopts: RocksDBReadOptions,
v: bool);
pub fn rocksdb_get(db: RocksDBInstance,
readopts: RocksDBReadOptions,
k: *const u8, kLen: size_t,
......
......@@ -38,7 +38,8 @@ pub use rocksdb::{
Writable,
};
pub use rocksdb_options::{
RocksDBOptions,
Options,
BlockBasedOptions,
};
pub use merge_operator::{
MergeOperands,
......
......@@ -17,7 +17,7 @@
extern crate rocksdb;
extern crate test;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::{Options, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
#[cfg(not(feature = "valgrind"))]
......@@ -57,10 +57,10 @@ fn concat_merge(new_key: &[u8], existing_val: Option<&[u8]>,
fn custom_merge() {
let path = "_rust_rocksdb_mergetest";
let opts = RocksDBOptions::new();
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(opts, path).unwrap();
let db = RocksDB::open(&opts, path).unwrap();
db.put(b"k1", b"a");
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
......@@ -79,16 +79,16 @@ fn custom_merge() {
.on_error( |e| { println!("error retrieving value: {}", e) });
db.close();
RocksDB::destroy(opts, path).is_ok();
RocksDB::destroy(&opts, path).is_ok();
}
#[cfg(feature = "valgrind")]
fn main() {
let path = "_rust_rocksdb_valgrind";
let opts = RocksDBOptions::new();
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge);
let db = RocksDB::open(opts, path).unwrap();
let db = RocksDB::open(&opts, path).unwrap();
loop {
db.put(b"k1", b"a");
db.merge(b"k1", b"b");
......@@ -114,12 +114,11 @@ mod tests {
use test::Bencher;
use std::thread::sleep_ms;
use rocksdb::{RocksDBOptions, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::{BlockBasedOptions, Options, RocksDB, MergeOperands, new_bloom_filter, Writable};
use rocksdb::RocksDBCompactionStyle::RocksDBUniversalCompaction;
fn tuned_for_somebody_elses_disk(path: &str, opts: RocksDBOptions) -> RocksDB {
fn tuned_for_somebody_elses_disk(path: &str, opts: &mut Options, blockopts: &mut BlockBasedOptions) -> RocksDB {
opts.create_if_missing(true);
opts.set_block_size(524288);
opts.set_max_open_files(10000);
opts.set_use_fsync(false);
opts.set_bytes_per_sync(8388608);
......@@ -136,12 +135,14 @@ mod tests {
opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4);
opts.set_filter_deletes(false);
blockopts.set_block_size(524288);
opts.set_block_based_table_factory(blockopts);
opts.set_disable_auto_compactions(true);
let filter = new_bloom_filter(10);
opts.set_filter(filter);
//opts.set_filter(filter);
RocksDB::open(opts, path).unwrap()
RocksDB::open(&opts, path).unwrap()
}
#[bench]
......@@ -149,8 +150,9 @@ mod tests {
// dirty hack due to parallel tests causing contention.
sleep_ms(1000);
let path = "_rust_rocksdb_optimizetest";
let opts = RocksDBOptions::new();
let db = tuned_for_somebody_elses_disk(path, opts);
let mut opts = Options::new();
let mut blockopts = BlockBasedOptions::new();
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64;
b.iter(|| {
db.put(i.to_string().as_bytes(), b"v1111");
......@@ -162,8 +164,9 @@ mod tests {
#[bench]
fn b_reads(b: &mut Bencher) {
let path = "_rust_rocksdb_optimizetest";
let opts = RocksDBOptions::new();
let db = tuned_for_somebody_elses_disk(path, opts);
let mut opts = Options::new();
let mut blockopts = BlockBasedOptions::new();
let db = tuned_for_somebody_elses_disk(path, &mut opts, &mut blockopts);
let mut i = 0 as u64;
b.iter(|| {
db.get(i.to_string().as_bytes()).on_error( |e| {
......@@ -173,6 +176,6 @@ mod tests {
i += 1;
});
db.close();
RocksDB::destroy(opts, path).is_ok();
RocksDB::destroy(&opts, path).is_ok();
}
}
......@@ -20,7 +20,7 @@ use std::mem;
use std::ptr;
use std::slice;
use rocksdb_options::{RocksDBOptions};
use rocksdb_options::{Options};
use rocksdb::{RocksDB, RocksDBResult, RocksDBVector, Writable};
pub struct MergeOperatorCallback {
......@@ -166,10 +166,10 @@ fn test_provided_merge(new_key: &[u8], existing_val: Option<&[u8]>,
#[test]
fn mergetest() {
let path = "_rust_rocksdb_mergetest";
let opts = RocksDBOptions::new();
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge);
let db = RocksDB::open(opts, path).unwrap();
let db = RocksDB::open(&opts, path).unwrap();
let p = db.put(b"k1", b"a");
assert!(p.is_ok());
db.merge(b"k1", b"b");
......@@ -189,10 +189,10 @@ fn mergetest() {
.on_error( |e| { println!("error reading value")}); //: {", e) });
assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1");
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
db.close();
assert!(RocksDB::destroy(opts, path).is_ok());
assert!(RocksDB::destroy(&opts, path).is_ok());
}
......@@ -25,45 +25,50 @@ use std::slice;
use std::str::from_utf8;
use rocksdb_ffi;
use rocksdb_ffi::RocksDBSnapshot;
use rocksdb_options::RocksDBOptions;
use rocksdb_options::Options;
pub struct RocksDB {
inner: rocksdb_ffi::RocksDBInstance,
}
pub struct WriteBatch {
inner: rocksdb_ffi::RocksDBWriteBatch
inner: rocksdb_ffi::RocksDBWriteBatch,
}
pub struct ReadOptions {
inner: rocksdb_ffi::RocksDBReadOptions,
}
// This is for the RocksDB and write batches to share the same API
pub trait Writable {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str>;
fn delete(&self, key: &[u8]) -> Result<(),&str>;
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn delete(&self, key: &[u8]) -> Result<(), String>;
}
fn error_message<'a>(ptr: *const i8) -> &'a str {
unsafe {
return from_utf8(CStr::from_ptr(ptr).to_bytes()).unwrap();
}
fn error_message(ptr: *const i8) -> String {
let c_str = unsafe { CStr::from_ptr(ptr) };
from_utf8(c_str.to_bytes()).unwrap().to_owned()
}
impl RocksDB {
pub fn open_default(path: &str) -> Result<RocksDB, &str> {
let opts = RocksDBOptions::new();
pub fn open_default(path: &str) -> Result<RocksDB, String> {
let mut opts = Options::new();
opts.create_if_missing(true);
RocksDB::open(opts, path)
RocksDB::open(&opts, path)
}
pub fn open(opts: RocksDBOptions, path: &str) -> Result<RocksDB, &str> {
let cpath = CString::new(path.as_bytes()).unwrap();
pub fn open(opts: &Options, path: &str) -> Result<RocksDB, String> {
let cpath = match CString::new(path.as_bytes()) {
Ok(c) => c,
Err(_) => return Err("Failed to convert path to CString when opening rocksdb".to_string()),
};
let cpath_ptr = cpath.as_ptr();
let ospath = Path::new(path);
if !ospath.exists() {
match fs::create_dir_all(&ospath) {
Err(e) => return Err("Failed to create rocksdb directory."),
Err(e) => return Err("Failed to create rocksdb directory.".to_string()),
Ok(_) => (),
}
}
......@@ -78,19 +83,20 @@ impl RocksDB {
if !err.is_null() {
return Err(error_message(err));
}
if db.0.is_null() {
return Err("Could not initialize database.");
let rocksdb_ffi::RocksDBInstance(db_ptr) = db;
if db_ptr.is_null() {
return Err("Could not initialize database.".to_string());
}
Ok(RocksDB{inner: db})
}
pub fn destroy(opts: RocksDBOptions, path: &str) -> Result<(), &str> {
pub fn destroy(opts: &Options, path: &str) -> Result<(), String> {
let cpath = CString::new(path.as_bytes()).unwrap();
let cpath_ptr = cpath.as_ptr();
let ospath = Path::new(path);
if !ospath.exists() {
return Err("path does not exist");
return Err("path does not exist".to_string());
}
let err = 0 as *mut i8;
......@@ -103,13 +109,13 @@ impl RocksDB {
Ok(())
}
pub fn repair(opts: RocksDBOptions, path: &str) -> Result<(), &str> {
pub fn repair(opts: Options, path: &str) -> Result<(), String> {
let cpath = CString::new(path.as_bytes()).unwrap();
let cpath_ptr = cpath.as_ptr();
let ospath = Path::new(path);
if !ospath.exists() {
return Err("path does not exist");
return Err("path does not exist".to_string());
}
let err = 0 as *mut i8;
......@@ -122,33 +128,27 @@ impl RocksDB {
Ok(())
}
pub fn create_snapshot(self) -> RocksDBSnapshot {
unsafe {
rocksdb_ffi::rocksdb_create_snapshot(self.inner)
}
}
pub fn write(&self, batch: WriteBatch) -> Result<(), &str> {
pub fn write(&self, batch: WriteBatch) -> Result<(), String> {
let writeopts = unsafe { rocksdb_ffi::rocksdb_writeoptions_create() };
let err = 0 as *mut i8;
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_write(self.inner, writeopts.clone(), batch.inner, err);
rocksdb_ffi::rocksdb_writeoptions_destroy(writeopts);
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
}
if !err.is_null() {
return Err(error_message(err));
}
return Ok(())
}
pub fn get(&self, key: &[u8]) -> RocksDBResult<RocksDBVector, &str> {
pub fn get(&self, key: &[u8]) -> RocksDBResult<RocksDBVector, String> {
unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create();
if readopts.0.is_null() {
return RocksDBResult::Error("Unable to create rocksdb read \
options. This is a fairly trivial call, and its failure \
may be indicative of a mis-compiled or mis-loaded rocksdb \
library.");
library.".to_string());
}
let val_len: size_t = 0;
......@@ -175,7 +175,7 @@ impl RocksDB {
}
impl Writable for RocksDB {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
......@@ -190,7 +190,7 @@ impl Writable for RocksDB {
}
}
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
......@@ -205,7 +205,7 @@ impl Writable for RocksDB {
}
}
fn delete(&self, key: &[u8]) -> Result<(),&str> {
fn delete(&self, key: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
......@@ -239,7 +239,7 @@ impl Drop for WriteBatch {
}
impl Writable for WriteBatch {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
......@@ -248,7 +248,7 @@ impl Writable for WriteBatch {
}
}
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
......@@ -257,7 +257,7 @@ impl Writable for WriteBatch {
}
}
fn delete(&self, key: &[u8]) -> Result<(),&str> {
fn delete(&self, key: &[u8]) -> Result<(), String> {
unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(),
key.len() as size_t);
......@@ -266,6 +266,23 @@ impl Writable for WriteBatch {
}
}
impl Drop for ReadOptions {
fn drop(&mut self) {
unsafe {
rocksdb_ffi::rocksdb_readoptions_destroy(self.inner)
}
}
}
impl ReadOptions {
fn fill_cache(&mut self, v: bool) {
unsafe {
rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v);
}
}
}
pub struct RocksDBVector {
base: Unique<u8>,
len: usize,
......@@ -378,13 +395,13 @@ fn external() {
let db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1");
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
db.close();
let opts = RocksDBOptions::new();
assert!(RocksDB::destroy(opts, path).is_ok());
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
}
#[test]
......@@ -398,7 +415,7 @@ fn writebatch_works() {
assert!(db.get(b"k1").is_none());
let p = db.write(batch);
assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1");
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "v1111");
}
{ // test delete
......@@ -409,6 +426,6 @@ fn writebatch_works() {
assert!(db.get(b"k1").is_none());
}
db.close();
let opts = RocksDBOptions::new();
assert!(RocksDB::destroy(opts, path).is_ok());
let opts = Options::new();
assert!(RocksDB::destroy(&opts, path).is_ok());
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment