Commit a1b66364 authored by Jay Lee's avatar Jay Lee

add flush and approximate size support

parent 097dee1e
......@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
use libc::{self, c_char, c_int, c_void, size_t};
use libc::{self, c_char, c_int, c_void, size_t, uint64_t};
use std::ffi::CStr;
use std::str::from_utf8;
......@@ -55,6 +55,9 @@ pub struct DBWriteBatch(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct DBComparator(pub *const c_void);
#[derive(Copy, Clone)]
#[repr(C)]
pub struct DBFlushOptions(pub *const c_void);
pub fn new_bloom_filter(bits: c_int) -> DBFilterPolicy {
unsafe { rocksdb_filterpolicy_create_bloom(bits) }
......@@ -409,12 +412,36 @@ extern "C" {
err: *mut *const i8);
pub fn rocksdb_column_family_handle_destroy(column_family_handle: DBCFHandle);
// Flush options
pub fn rocksdb_flushoptions_create() -> DBFlushOptions;
pub fn rocksdb_flushoptions_destroy(opt: DBFlushOptions);
pub fn rocksdb_flushoptions_set_wait(opt: DBFlushOptions,
whether_wait: bool);
pub fn rocksdb_flush(db: DBInstance,
options: DBFlushOptions,
err: *mut *const i8);
pub fn rocksdb_approximate_sizes(db: DBInstance,
num_ranges: c_int,
range_start_key: *const *const u8,
range_start_key_len: *const size_t,
range_limit_key: *const *const u8,
range_limit_key_len: *const size_t,
sizes: *mut uint64_t);
pub fn rocksdb_approximate_sizes_cf(db: DBInstance,
cf: DBCFHandle,
num_ranges: c_int,
range_start_key: *const *const u8,
range_start_key_len: *const size_t,
range_limit_key: *const *const u8,
range_limit_key_len: *const size_t,
sizes: *mut uint64_t);
}
#[cfg(test)]
mod test {
use super::*;
use libc::*;
use std::ffi::CString;
use tempdir::TempDir;
......@@ -434,13 +461,9 @@ mod test {
.unwrap();
let cpath_ptr = cpath.as_ptr();
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
let db = rocksdb_open(opts, cpath_ptr, err_ptr);
if !err.is_null() {
println!("failed to open rocksdb: {}", error_message(err));
}
assert!(err.is_null());
let mut err = 0 as *const i8;
let db = rocksdb_open(opts, cpath_ptr, &mut err);
assert!(err.is_null(), error_message(err));
let writeopts = rocksdb_writeoptions_create();
assert!(!writeopts.0.is_null());
......@@ -453,25 +476,43 @@ mod test {
4,
val.as_ptr(),
8,
err_ptr);
&mut err);
rocksdb_writeoptions_destroy(writeopts);
assert!(err.is_null());
assert!(err.is_null(), error_message(err));
let readopts = rocksdb_readoptions_create();
assert!(!readopts.0.is_null());
let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t;
let mut val_len = 0;
rocksdb_get(db,
readopts.clone(),
key.as_ptr(),
4,
val_len_ptr,
err_ptr);
&mut val_len,
&mut err);
rocksdb_readoptions_destroy(readopts);
assert!(err.is_null());
assert!(err.is_null(), error_message(err));
// flush first to get approximate size later.
let flush_opt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flush_opt, true);
rocksdb_flush(db, flush_opt, &mut err);
rocksdb_flushoptions_destroy(flush_opt);
assert!(err.is_null(), error_message(err));
let mut sizes = vec![0; 1];
rocksdb_approximate_sizes(db,
1,
vec![b"\x00\x00".as_ptr()].as_ptr(),
vec![1].as_ptr(),
vec![b"\xff\x00".as_ptr()].as_ptr(),
vec![1].as_ptr(),
sizes.as_mut_ptr());
assert_eq!(sizes.len(), 1);
assert!(sizes[0] > 0);
rocksdb_close(db);
rocksdb_destroy_db(opts, cpath_ptr, err_ptr);
rocksdb_destroy_db(opts, cpath_ptr, &mut err);
assert!(err.is_null());
}
}
......
......@@ -25,6 +25,8 @@ use libc::{self, c_int, c_void, size_t};
use rocksdb_ffi::{self, DBCFHandle, error_message};
use rocksdb_options::{Options, WriteOptions};
const DEFAULT_COLUMN_FAMILY: &'static str = "default";
pub struct DB {
inner: rocksdb_ffi::DBInstance,
cfs: BTreeMap<String, DBCFHandle>,
......@@ -223,6 +225,24 @@ pub trait Writable {
fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String>;
}
/// A range of keys, start_key is included, but not end_key.
///
/// You should make sure end_key is not less than start_key.
pub struct Range<'a> {
start_key: &'a [u8],
end_key: &'a [u8],
}
impl<'a> Range<'a> {
pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
assert!(start_key <= end_key);
Range {
start_key: start_key,
end_key: end_key,
}
}
}
impl DB {
pub fn open_default(path: &str) -> Result<DB, String> {
let mut opts = Options::new();
......@@ -271,8 +291,8 @@ impl DB {
} else {
let mut cfs_v = cfs.to_vec();
// Always open the default column family
if !cfs_v.contains(&"default") {
cfs_v.push("default");
if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
cfs_v.push(DEFAULT_COLUMN_FAMILY);
}
// We need to store our CStrings in an intermediate vector
......@@ -662,6 +682,91 @@ impl DB {
Ok(())
}
}
/// Flush all memtable data.
///
/// Due to lack of abi, only default cf is supported.
///
/// If sync, the flush will wait until the flush is done.
pub fn flush(&self, sync: bool) -> Result<(), String> {
unsafe {
let opts = rocksdb_ffi::rocksdb_flushoptions_create();
rocksdb_ffi::rocksdb_flushoptions_set_wait(opts, sync);
let mut err = 0 as *const i8;
rocksdb_ffi::rocksdb_flush(self.inner, opts, &mut err);
rocksdb_ffi::rocksdb_flushoptions_destroy(opts);
if !err.is_null() {
return Err(error_message(err));
}
Ok(())
}
}
/// Return the approximate file system space used by keys in each ranges.
///
/// Note that the returned sizes measure file system space usage, so
/// if the user data compresses by a factor of ten, the returned
/// sizes will be one-tenth the size of the corresponding user data size.
///
/// Due to lack of abi, only data flushed to disk is taken into account.
pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
self.get_approximate_sizes_cfopt(None, ranges)
}
pub fn get_approximate_sizes_cf(&self,
cf: DBCFHandle,
ranges: &[Range])
-> Vec<u64> {
self.get_approximate_sizes_cfopt(Some(cf), ranges)
}
fn get_approximate_sizes_cfopt(&self,
cf: Option<DBCFHandle>,
ranges: &[Range])
-> Vec<u64> {
let start_keys: Vec<*const u8> = ranges.iter()
.map(|x| x.start_key.as_ptr())
.collect();
let start_key_lens: Vec<u64> = ranges.iter()
.map(|x| x.start_key.len() as u64)
.collect();
let end_keys: Vec<*const u8> = ranges.iter()
.map(|x| x.end_key.as_ptr())
.collect();
let end_key_lens: Vec<u64> = ranges.iter()
.map(|x| x.end_key.len() as u64)
.collect();
let mut sizes: Vec<u64> = vec![0; ranges.len()];
let (n, sk_ptr, skl_ptr, ek_ptr, ekl_ptr, s_ptr) =
(ranges.len() as i32,
start_keys.as_ptr(),
start_key_lens.as_ptr(),
end_keys.as_ptr(),
end_key_lens.as_ptr(),
sizes.as_mut_ptr());
match cf {
None => unsafe {
rocksdb_ffi::rocksdb_approximate_sizes(self.inner,
n,
sk_ptr,
skl_ptr,
ek_ptr,
ekl_ptr,
s_ptr)
},
Some(cf) => unsafe {
rocksdb_ffi::rocksdb_approximate_sizes_cf(self.inner,
cf,
n,
sk_ptr,
skl_ptr,
ek_ptr,
ekl_ptr,
s_ptr)
},
}
sizes
}
}
impl Writable for DB {
......@@ -922,12 +1027,9 @@ mod test {
let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
let p = db.put(b"k3", b"v3333");
assert!(p.is_ok());
db.put(b"k1", b"v1111").expect("");
db.put(b"k2", b"v2222").expect("");
db.put(b"k3", b"v3333").expect("");
let iter = db.iterator(IteratorMode::Start);
for (k, v) in iter {
println!("Hello {}: {}",
......@@ -935,4 +1037,28 @@ mod test {
str::from_utf8(&*v).unwrap());
}
}
#[test]
fn approximate_size_test() {
let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
for i in 1..8000 {
db.put(format!("{:04}", i).as_bytes(),
format!("{:04}", i).as_bytes())
.expect("");
}
db.flush(true).expect("");
assert!(db.get(b"0001").expect("").is_some());
db.flush(true).expect("");
let sizes = db.get_approximate_sizes(&[Range::new(b"0000", b"2000"),
Range::new(b"2000", b"4000"),
Range::new(b"4000", b"6000"),
Range::new(b"6000", b"8000"),
Range::new(b"8000", b"9999")]);
assert_eq!(sizes.len(), 5);
for s in &sizes[0..4] {
assert!(*s > 0);
}
assert_eq!(sizes[4], 0);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment