Commit a22a722c authored by goroutine's avatar goroutine

Merge pull request #3 from BusyJay/busyjay/add-approximate-size-support

wraper: add flush and approximate size support
parents 097dee1e a1b66364
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// //
use libc::{self, c_char, c_int, c_void, size_t}; use libc::{self, c_char, c_int, c_void, size_t, uint64_t};
use std::ffi::CStr; use std::ffi::CStr;
use std::str::from_utf8; use std::str::from_utf8;
...@@ -55,6 +55,9 @@ pub struct DBWriteBatch(pub *const c_void); ...@@ -55,6 +55,9 @@ pub struct DBWriteBatch(pub *const c_void);
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
#[repr(C)] #[repr(C)]
pub struct DBComparator(pub *const c_void); pub struct DBComparator(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct DBFlushOptions(pub *const c_void);
pub fn new_bloom_filter(bits: c_int) -> DBFilterPolicy { pub fn new_bloom_filter(bits: c_int) -> DBFilterPolicy {
unsafe { rocksdb_filterpolicy_create_bloom(bits) } unsafe { rocksdb_filterpolicy_create_bloom(bits) }
...@@ -409,12 +412,36 @@ extern "C" { ...@@ -409,12 +412,36 @@ extern "C" {
err: *mut *const i8); err: *mut *const i8);
pub fn rocksdb_column_family_handle_destroy(column_family_handle: DBCFHandle); pub fn rocksdb_column_family_handle_destroy(column_family_handle: DBCFHandle);
// Flush options
pub fn rocksdb_flushoptions_create() -> DBFlushOptions;
pub fn rocksdb_flushoptions_destroy(opt: DBFlushOptions);
pub fn rocksdb_flushoptions_set_wait(opt: DBFlushOptions,
whether_wait: bool);
pub fn rocksdb_flush(db: DBInstance,
options: DBFlushOptions,
err: *mut *const i8);
pub fn rocksdb_approximate_sizes(db: DBInstance,
num_ranges: c_int,
range_start_key: *const *const u8,
range_start_key_len: *const size_t,
range_limit_key: *const *const u8,
range_limit_key_len: *const size_t,
sizes: *mut uint64_t);
pub fn rocksdb_approximate_sizes_cf(db: DBInstance,
cf: DBCFHandle,
num_ranges: c_int,
range_start_key: *const *const u8,
range_start_key_len: *const size_t,
range_limit_key: *const *const u8,
range_limit_key_len: *const size_t,
sizes: *mut uint64_t);
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use libc::*;
use std::ffi::CString; use std::ffi::CString;
use tempdir::TempDir; use tempdir::TempDir;
...@@ -434,13 +461,9 @@ mod test { ...@@ -434,13 +461,9 @@ mod test {
.unwrap(); .unwrap();
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
let mut err: *const i8 = 0 as *const i8; let mut err = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err; let db = rocksdb_open(opts, cpath_ptr, &mut err);
let db = rocksdb_open(opts, cpath_ptr, err_ptr); assert!(err.is_null(), error_message(err));
if !err.is_null() {
println!("failed to open rocksdb: {}", error_message(err));
}
assert!(err.is_null());
let writeopts = rocksdb_writeoptions_create(); let writeopts = rocksdb_writeoptions_create();
assert!(!writeopts.0.is_null()); assert!(!writeopts.0.is_null());
...@@ -453,25 +476,43 @@ mod test { ...@@ -453,25 +476,43 @@ mod test {
4, 4,
val.as_ptr(), val.as_ptr(),
8, 8,
err_ptr); &mut err);
rocksdb_writeoptions_destroy(writeopts); rocksdb_writeoptions_destroy(writeopts);
assert!(err.is_null()); assert!(err.is_null(), error_message(err));
let readopts = rocksdb_readoptions_create(); let readopts = rocksdb_readoptions_create();
assert!(!readopts.0.is_null()); assert!(!readopts.0.is_null());
let val_len: size_t = 0; let mut val_len = 0;
let val_len_ptr = &val_len as *const size_t;
rocksdb_get(db, rocksdb_get(db,
readopts.clone(), readopts.clone(),
key.as_ptr(), key.as_ptr(),
4, 4,
val_len_ptr, &mut val_len,
err_ptr); &mut err);
rocksdb_readoptions_destroy(readopts); rocksdb_readoptions_destroy(readopts);
assert!(err.is_null()); assert!(err.is_null(), error_message(err));
// flush first to get approximate size later.
let flush_opt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flush_opt, true);
rocksdb_flush(db, flush_opt, &mut err);
rocksdb_flushoptions_destroy(flush_opt);
assert!(err.is_null(), error_message(err));
let mut sizes = vec![0; 1];
rocksdb_approximate_sizes(db,
1,
vec![b"\x00\x00".as_ptr()].as_ptr(),
vec![1].as_ptr(),
vec![b"\xff\x00".as_ptr()].as_ptr(),
vec![1].as_ptr(),
sizes.as_mut_ptr());
assert_eq!(sizes.len(), 1);
assert!(sizes[0] > 0);
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(opts, cpath_ptr, err_ptr); rocksdb_destroy_db(opts, cpath_ptr, &mut err);
assert!(err.is_null()); assert!(err.is_null());
} }
} }
......
...@@ -25,6 +25,8 @@ use libc::{self, c_int, c_void, size_t}; ...@@ -25,6 +25,8 @@ use libc::{self, c_int, c_void, size_t};
use rocksdb_ffi::{self, DBCFHandle, error_message}; use rocksdb_ffi::{self, DBCFHandle, error_message};
use rocksdb_options::{Options, WriteOptions}; use rocksdb_options::{Options, WriteOptions};
const DEFAULT_COLUMN_FAMILY: &'static str = "default";
pub struct DB { pub struct DB {
inner: rocksdb_ffi::DBInstance, inner: rocksdb_ffi::DBInstance,
cfs: BTreeMap<String, DBCFHandle>, cfs: BTreeMap<String, DBCFHandle>,
...@@ -223,6 +225,24 @@ pub trait Writable { ...@@ -223,6 +225,24 @@ pub trait Writable {
fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String>; fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String>;
} }
/// A range of keys, start_key is included, but not end_key.
///
/// You should make sure end_key is not less than start_key.
pub struct Range<'a> {
start_key: &'a [u8],
end_key: &'a [u8],
}
impl<'a> Range<'a> {
pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
assert!(start_key <= end_key);
Range {
start_key: start_key,
end_key: end_key,
}
}
}
impl DB { impl DB {
pub fn open_default(path: &str) -> Result<DB, String> { pub fn open_default(path: &str) -> Result<DB, String> {
let mut opts = Options::new(); let mut opts = Options::new();
...@@ -271,8 +291,8 @@ impl DB { ...@@ -271,8 +291,8 @@ impl DB {
} else { } else {
let mut cfs_v = cfs.to_vec(); let mut cfs_v = cfs.to_vec();
// Always open the default column family // Always open the default column family
if !cfs_v.contains(&"default") { if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
cfs_v.push("default"); cfs_v.push(DEFAULT_COLUMN_FAMILY);
} }
// We need to store our CStrings in an intermediate vector // We need to store our CStrings in an intermediate vector
...@@ -662,6 +682,91 @@ impl DB { ...@@ -662,6 +682,91 @@ impl DB {
Ok(()) Ok(())
} }
} }
/// Flush all memtable data.
///
/// Due to lack of abi, only default cf is supported.
///
/// If sync, the flush will wait until the flush is done.
pub fn flush(&self, sync: bool) -> Result<(), String> {
unsafe {
let opts = rocksdb_ffi::rocksdb_flushoptions_create();
rocksdb_ffi::rocksdb_flushoptions_set_wait(opts, sync);
let mut err = 0 as *const i8;
rocksdb_ffi::rocksdb_flush(self.inner, opts, &mut err);
rocksdb_ffi::rocksdb_flushoptions_destroy(opts);
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
}
}
/// Return the approximate file system space used by keys in each ranges.
///
/// Note that the returned sizes measure file system space usage, so
/// if the user data compresses by a factor of ten, the returned
/// sizes will be one-tenth the size of the corresponding user data size.
///
/// Due to lack of abi, only data flushed to disk is taken into account.
pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
self.get_approximate_sizes_cfopt(None, ranges)
}
pub fn get_approximate_sizes_cf(&self,
cf: DBCFHandle,
ranges: &[Range])
-> Vec<u64> {
self.get_approximate_sizes_cfopt(Some(cf), ranges)
}
fn get_approximate_sizes_cfopt(&self,
cf: Option<DBCFHandle>,
ranges: &[Range])
-> Vec<u64> {
let start_keys: Vec<*const u8> = ranges.iter()
.map(|x| x.start_key.as_ptr())
.collect();
let start_key_lens: Vec<u64> = ranges.iter()
.map(|x| x.start_key.len() as u64)
.collect();
let end_keys: Vec<*const u8> = ranges.iter()
.map(|x| x.end_key.as_ptr())
.collect();
let end_key_lens: Vec<u64> = ranges.iter()
.map(|x| x.end_key.len() as u64)
.collect();
let mut sizes: Vec<u64> = vec![0; ranges.len()];
let (n, sk_ptr, skl_ptr, ek_ptr, ekl_ptr, s_ptr) =
(ranges.len() as i32,
start_keys.as_ptr(),
start_key_lens.as_ptr(),
end_keys.as_ptr(),
end_key_lens.as_ptr(),
sizes.as_mut_ptr());
match cf {
None => unsafe {
rocksdb_ffi::rocksdb_approximate_sizes(self.inner,
n,
sk_ptr,
skl_ptr,
ek_ptr,
ekl_ptr,
s_ptr)
},
Some(cf) => unsafe {
rocksdb_ffi::rocksdb_approximate_sizes_cf(self.inner,
cf,
n,
sk_ptr,
skl_ptr,
ek_ptr,
ekl_ptr,
s_ptr)
},
}
sizes
}
} }
impl Writable for DB { impl Writable for DB {
...@@ -922,12 +1027,9 @@ mod test { ...@@ -922,12 +1027,9 @@ mod test {
let path = TempDir::new("_rust_rocksdb_iteratortest").expect(""); let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap(); let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
let p = db.put(b"k1", b"v1111"); db.put(b"k1", b"v1111").expect("");
assert!(p.is_ok()); db.put(b"k2", b"v2222").expect("");
let p = db.put(b"k2", b"v2222"); db.put(b"k3", b"v3333").expect("");
assert!(p.is_ok());
let p = db.put(b"k3", b"v3333");
assert!(p.is_ok());
let iter = db.iterator(IteratorMode::Start); let iter = db.iterator(IteratorMode::Start);
for (k, v) in iter { for (k, v) in iter {
println!("Hello {}: {}", println!("Hello {}: {}",
...@@ -935,4 +1037,28 @@ mod test { ...@@ -935,4 +1037,28 @@ mod test {
str::from_utf8(&*v).unwrap()); str::from_utf8(&*v).unwrap());
} }
} }
#[test]
fn approximate_size_test() {
let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
for i in 1..8000 {
db.put(format!("{:04}", i).as_bytes(),
format!("{:04}", i).as_bytes())
.expect("");
}
db.flush(true).expect("");
assert!(db.get(b"0001").expect("").is_some());
db.flush(true).expect("");
let sizes = db.get_approximate_sizes(&[Range::new(b"0000", b"2000"),
Range::new(b"2000", b"4000"),
Range::new(b"4000", b"6000"),
Range::new(b"6000", b"8000"),
Range::new(b"8000", b"9999")]);
assert_eq!(sizes.len(), 5);
for s in &sizes[0..4] {
assert!(*s > 0);
}
assert_eq!(sizes[4], 0);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment