Commit eb27fcf7 authored by siddontang's avatar siddontang

*: merge master and fix conflict

parents a7c90f5d ffdce61a
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
name = "librocksdb_sys" name = "librocksdb_sys"
version = "0.1.0" version = "0.1.0"
authors = ["Jay Lee <busyjaylee@gmail.com>"] authors = ["Jay Lee <busyjaylee@gmail.com>"]
build = "build.rs"
[dependencies] [dependencies]
libc = "0.1.8" libc = "0.1.8"
......
use std::{env, fs, str};
use std::path::PathBuf;
use std::process::Command;
macro_rules! t {
($e:expr) => (match $e {
Ok(n) => n,
Err(e) => panic!("\n{} failed with {}\n", stringify!($e), e),
})
}
fn main() {
let want_static = env::var("ROCKSDB_SYS_STATIC").map(|s| s == "1").unwrap_or(false);
if !want_static {
return;
}
let target = env::var("TARGET").unwrap();
if !target.contains("linux") && !target.contains("darwin") {
// only linux and apple support static link right now
return;
}
let dst = PathBuf::from(env::var_os("OUT_DIR").unwrap());
let build = dst.join("build");
t!(fs::create_dir_all(&build));
let fest_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
let p = PathBuf::from(fest_dir).join("build.sh");
for lib in &["z", "snappy", "bz2", "lz4", "rocksdb"] {
let lib_name = format!("lib{}.a", lib);
let src = build.join(&lib_name);
let dst = dst.join(&lib_name);
if dst.exists() {
continue;
}
if !src.exists() {
let mut cmd = Command::new(p.as_path());
cmd.current_dir(&build).args(&[format!("compile_{}", lib)]);
if *lib == "rocksdb" {
if let Some(s) = env::var("ROCKSDB_SYS_PORTABLE").ok() {
cmd.env("PORTABLE", s);
}
}
run(&mut cmd);
}
if let Err(e) = fs::rename(src.as_path(), dst.as_path()) {
panic!("failed to move {} to {}: {:?}", src.display(), dst.display(), e);
}
}
println!("cargo:rustc-link-lib=static=rocksdb");
println!("cargo:rustc-link-lib=static=z");
println!("cargo:rustc-link-lib=static=bz2");
println!("cargo:rustc-link-lib=static=lz4");
println!("cargo:rustc-link-lib=static=snappy");
println!("cargo:rustc-link-search=native={}", dst.display());
let mut cpp_linked = false;
if let Ok(libs) = env::var("ROCKSDB_OTHER_STATIC") {
for lib in libs.split(":") {
if lib == "stdc++" {
cpp_linked = true;
}
println!("cargo:rustc-link-lib=static={}", lib);
}
if let Ok(pathes) = env::var("ROCKSDB_OTHER_STATIC_PATH") {
for p in pathes.split(":") {
println!("cargo:rustc-link-search=native={}", p);
}
}
}
if !cpp_linked {
let output = Command::new(p.as_path()).arg("find_stdcxx").output().unwrap();
if output.status.success() && !output.stdout.is_empty() {
if let Ok(path_str) = str::from_utf8(&output.stdout) {
let path = PathBuf::from(path_str);
if path.is_absolute() {
println!("cargo:rustc-link-lib=static=stdc++");
println!("cargo:rustc-link-search=native={}", path.parent().unwrap().display());
return;
}
}
}
println!("failed to detect libstdc++.a: {:?}, fallback to dynamic", output);
println!("cargo:rustc-link-lib=stdc++");
}
}
fn run(cmd: &mut Command) {
println!("running: {:?}", cmd);
let status = match cmd.status() {
Ok(s) => s,
Err(e) => panic!("{:?} failed: {}", cmd, e),
};
if !status.success() {
panic!("{:?} failed: {}", cmd, status);
}
}
#!/usr/bin/env bash
set -e
con=1
if [[ -f /proc/cpuinfo ]]; then
con=`grep -c processor /proc/cpuinfo`
else
con=`sysctl -n hw.ncpu 2>/dev/null || echo 1`
fi
function error() {
echo $@ >&2
return 1
}
function md5_check() {
if which md5sum &>/dev/null; then
hash=`md5sum $1 | cut -d ' ' -f 1`
elif which openssl &>/dev/null; then
hash=`openssl md5 -hex $1 | cut -d ' ' -f 2`
else
error can\'t find hash tool.
fi
[[ "$hash" == "$2" ]] || error $1: hash not correct, expect $2, got $hash
}
function download() {
if [[ -f $2 ]] && md5_check $2 $3; then
return
fi
if which wget &>/dev/null; then
wget $1 -O $2
elif which curl &>/dev/null; then
curl -L $1 -o $2
else
error can\'t find wget and curl.
fi
md5_check $2 $3
}
function compile_z() {
if [[ -f libz.a ]]; then
return
fi
rm -rf zlib-1.2.8
download http://zlib.net/zlib-1.2.8.tar.gz zlib-1.2.8.tar.gz 44d667c142d7cda120332623eab69f40
tar xf zlib-1.2.8.tar.gz
cd zlib-1.2.8
CFLAGS='-fPIC' ./configure --static
make -j $con
cp libz.a ../
cd ..
}
function compile_bz2() {
if [[ -f libbz2.a ]]; then
return
fi
rm -rf bzip2-1.0.6
download http://www.bzip.org/1.0.6/bzip2-1.0.6.tar.gz bzip2-1.0.6.tar.gz 00b516f4704d4a7cb50a1d97e6e8e15b
tar xvzf bzip2-1.0.6.tar.gz
cd bzip2-1.0.6
make CFLAGS='-fPIC -O2 -g -D_FILE_OFFSET_BITS=64' -j $con
cp libbz2.a ../
cd ..
}
function compile_snappy() {
if [[ -f libsnappy.a ]]; then
return
fi
rm -rf snappy-1.1.1
download http://pkgs.fedoraproject.org/repo/pkgs/snappy/snappy-1.1.1.tar.gz/8887e3b7253b22a31f5486bca3cbc1c2/snappy-1.1.1.tar.gz snappy-1.1.1.tar.gz 8887e3b7253b22a31f5486bca3cbc1c2
tar xvzf snappy-1.1.1.tar.gz
cd snappy-1.1.1
./configure --with-pic --enable-static
make -j $con
mv .libs/libsnappy.a ../
cd ..
}
function compile_lz4() {
if [[ -f liblz4.a ]]; then
return
fi
rm -rf lz4-r127
download https://github.com/Cyan4973/lz4/archive/r131.tar.gz lz4-r131.tar.gz 42b09fab42331da9d3fb33bd5c560de9
tar xvzf lz4-r131.tar.gz
cd lz4-r131/lib
make CFLAGS='-fPIC' all -j $con
mv liblz4.a ../../
cd ../..
}
function compile_rocksdb() {
if [[ -f librocksdb.a ]]; then
return
fi
version=4.9.fb
echo building rocksdb-$version
rm -rf rocksdb-$version
download https://github.com/facebook/rocksdb/archive/$version.tar.gz rocksdb-$version.tar.gz 75f00635d4dcf0200db54a9244ac5f1d
tar xf rocksdb-$version.tar.gz
wd=`pwd`
cd rocksdb-$version
export EXTRA_CFLAGS="-fPIC -I${wd}/zlib-1.2.8 -I${wd}/bzip2-1.0.6 -I${wd}/snappy-1.1.1 -I${wd}/lz4-r131/lib"
export EXTRA_CXXFLAGS="-DZLIB -DBZIP2 -DSNAPPY -DLZ4 $EXTRA_CFLAGS"
make static_lib -j $con
mv librocksdb.a ../
}
function find_stdcxx() {
if g++ --version &>/dev/null; then
CXX=g++
elif clang++ --version &>/dev/null; then
CXX=clang++
else
error failed to find valid cxx compiler.
fi
$CXX --print-file-name libstdc++.a
}
if [[ $# -ne 1 ]]; then
error $0 [compile_bz2\|compile_z\|compile_lz4\|compile_rocksdb\|compile_snappy\|find_stdcxx]
fi
$1
...@@ -17,7 +17,7 @@ extern crate libc; ...@@ -17,7 +17,7 @@ extern crate libc;
#[cfg(test)] #[cfg(test)]
extern crate tempdir; extern crate tempdir;
use libc::{c_char, c_int, c_void, size_t, uint64_t}; use libc::{c_char, c_uchar, c_int, c_void, size_t, uint64_t};
use std::ffi::CStr; use std::ffi::CStr;
use std::str::from_utf8; use std::str::from_utf8;
...@@ -72,6 +72,7 @@ pub fn new_cache(capacity: size_t) -> DBCache { ...@@ -72,6 +72,7 @@ pub fn new_cache(capacity: size_t) -> DBCache {
unsafe { rocksdb_cache_create_lru(capacity) } unsafe { rocksdb_cache_create_lru(capacity) }
} }
#[derive(Copy, Clone)]
#[repr(C)] #[repr(C)]
pub enum DBCompressionType { pub enum DBCompressionType {
DBNo = 0, DBNo = 0,
...@@ -123,6 +124,8 @@ extern "C" { ...@@ -123,6 +124,8 @@ extern "C" {
pub fn rocksdb_block_based_options_set_block_restart_interval( pub fn rocksdb_block_based_options_set_block_restart_interval(
block_options: DBBlockBasedTableOptions, block_options: DBBlockBasedTableOptions,
block_restart_interval: c_int); block_restart_interval: c_int);
pub fn rocksdb_block_based_options_set_cache_index_and_filter_blocks(
block_options: DBBlockBasedTableOptions, v: c_uchar);
pub fn rocksdb_block_based_options_set_filter_policy( pub fn rocksdb_block_based_options_set_filter_policy(
block_options: DBBlockBasedTableOptions, block_options: DBBlockBasedTableOptions,
filter_policy: DBFilterPolicy); filter_policy: DBFilterPolicy);
...@@ -186,6 +189,9 @@ extern "C" { ...@@ -186,6 +189,9 @@ extern "C" {
cs: DBCompactionStyle); cs: DBCompactionStyle);
pub fn rocksdb_options_set_compression(options: DBOptions, pub fn rocksdb_options_set_compression(options: DBOptions,
compression_style_no: DBCompressionType); compression_style_no: DBCompressionType);
pub fn rocksdb_options_set_compression_per_level(options: DBOptions,
level_values: *const DBCompressionType,
num_levels: size_t);
pub fn rocksdb_options_set_max_background_compactions( pub fn rocksdb_options_set_max_background_compactions(
options: DBOptions, max_bg_compactions: c_int); options: DBOptions, max_bg_compactions: c_int);
pub fn rocksdb_options_set_max_background_flushes(options: DBOptions, pub fn rocksdb_options_set_max_background_flushes(options: DBOptions,
...@@ -193,6 +199,9 @@ extern "C" { ...@@ -193,6 +199,9 @@ extern "C" {
pub fn rocksdb_options_set_filter_deletes(options: DBOptions, v: bool); pub fn rocksdb_options_set_filter_deletes(options: DBOptions, v: bool);
pub fn rocksdb_options_set_disable_auto_compactions(options: DBOptions, pub fn rocksdb_options_set_disable_auto_compactions(options: DBOptions,
v: c_int); v: c_int);
pub fn rocksdb_options_set_report_bg_io_stats(options: DBOptions, v: c_int);
pub fn rocksdb_filterpolicy_create_bloom_full(bits_per_key: c_int)
-> DBFilterPolicy;
pub fn rocksdb_filterpolicy_create_bloom(bits_per_key: c_int) pub fn rocksdb_filterpolicy_create_bloom(bits_per_key: c_int)
-> DBFilterPolicy; -> DBFilterPolicy;
pub fn rocksdb_open(options: DBOptions, pub fn rocksdb_open(options: DBOptions,
...@@ -460,12 +469,20 @@ extern "C" { ...@@ -460,12 +469,20 @@ extern "C" {
range_limit_key: *const u8, range_limit_key: *const u8,
range_limit_key_len: size_t, range_limit_key_len: size_t,
err: *mut *const i8); err: *mut *const i8);
pub fn rocksdb_property_value(db: DBInstance,
propname: *const c_char)
-> *mut c_char;
pub fn rocksdb_property_value_cf(db: DBInstance,
cf: DBCFHandle,
propname: *const c_char)
-> *mut c_char;
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use std::ffi::CString; use std::ffi::{CStr, CString};
use libc::{self, c_void};
use tempdir::TempDir; use tempdir::TempDir;
#[test] #[test]
...@@ -542,6 +559,21 @@ mod test { ...@@ -542,6 +559,21 @@ mod test {
&mut err); &mut err);
assert!(err.is_null(), error_message(err)); assert!(err.is_null(), error_message(err));
let propname = CString::new("rocksdb.total-sst-files-size")
.unwrap();
let value = rocksdb_property_value(db, propname.as_ptr());
assert!(!value.is_null());
let sst_size =
CStr::from_ptr(value).to_str().unwrap().parse::<u64>().unwrap();
assert!(sst_size > 0);
libc::free(value as *mut c_void);
let propname = CString::new("fake_key").unwrap();
let value = rocksdb_property_value(db, propname.as_ptr());
assert!(value.is_null());
libc::free(value as *mut c_void);
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(opts, cpath_ptr, &mut err); rocksdb_destroy_db(opts, cpath_ptr, &mut err);
assert!(err.is_null()); assert!(err.is_null());
......
...@@ -12,7 +12,6 @@ ...@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// //
#![feature(plugin)]
extern crate libc; extern crate libc;
...@@ -27,6 +26,7 @@ pub mod comparator; ...@@ -27,6 +26,7 @@ pub mod comparator;
pub use librocksdb_sys::{DBCompactionStyle, DBComparator, DBCompressionType, pub use librocksdb_sys::{DBCompactionStyle, DBComparator, DBCompressionType,
new_bloom_filter, self as rocksdb_ffi}; new_bloom_filter, self as rocksdb_ffi};
pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch}; pub use rocksdb::{DB, DBIterator, DBVector, Kv, ReadOptions, SeekKey,
Writable, WriteBatch};
pub use rocksdb_options::{BlockBasedOptions, Options, WriteOptions}; pub use rocksdb_options::{BlockBasedOptions, Options, WriteOptions};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
...@@ -143,7 +143,7 @@ fn main() { ...@@ -143,7 +143,7 @@ fn main() {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use rocksdb::{BlockBasedOptions, DB, Options}; use rocksdb::{BlockBasedOptions, DB, DBCompressionType, Options};
use rocksdb::DBCompactionStyle::DBUniversal; use rocksdb::DBCompactionStyle::DBUniversal;
#[allow(dead_code)] #[allow(dead_code)]
...@@ -151,6 +151,15 @@ mod tests { ...@@ -151,6 +151,15 @@ mod tests {
opts: &mut Options, opts: &mut Options,
blockopts: &mut BlockBasedOptions) blockopts: &mut BlockBasedOptions)
-> DB { -> DB {
let per_level_compression: [DBCompressionType; 7] =
[DBCompressionType::DBNo,
DBCompressionType::DBNo,
DBCompressionType::DBNo,
DBCompressionType::DBLz4,
DBCompressionType::DBLz4,
DBCompressionType::DBLz4,
DBCompressionType::DBLz4];
opts.create_if_missing(true); opts.create_if_missing(true);
opts.set_max_open_files(10000); opts.set_max_open_files(10000);
opts.set_use_fsync(false); opts.set_use_fsync(false);
...@@ -168,7 +177,11 @@ mod tests { ...@@ -168,7 +177,11 @@ mod tests {
opts.set_max_background_compactions(4); opts.set_max_background_compactions(4);
opts.set_max_background_flushes(4); opts.set_max_background_flushes(4);
opts.set_filter_deletes(false); opts.set_filter_deletes(false);
opts.set_report_bg_io_stats(true);
opts.compression_per_level(&per_level_compression);
blockopts.set_block_size(524288); blockopts.set_block_size(524288);
blockopts.set_cache_index_and_filter_blocks(true);
blockopts.set_bloom_filter(10, false);
opts.set_block_based_table_factory(blockopts); opts.set_block_based_table_factory(blockopts);
opts.set_disable_auto_compactions(true); opts.set_disable_auto_compactions(true);
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
// //
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ffi::CString; use std::ffi::{CStr, CString};
use std::fs; use std::fs;
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::Path;
...@@ -30,6 +30,7 @@ const DEFAULT_COLUMN_FAMILY: &'static str = "default"; ...@@ -30,6 +30,7 @@ const DEFAULT_COLUMN_FAMILY: &'static str = "default";
pub struct DB { pub struct DB {
inner: rocksdb_ffi::DBInstance, inner: rocksdb_ffi::DBInstance,
cfs: BTreeMap<String, DBCFHandle>, cfs: BTreeMap<String, DBCFHandle>,
path: String,
} }
unsafe impl Send for DB {} unsafe impl Send for DB {}
...@@ -43,9 +44,18 @@ pub struct ReadOptions { ...@@ -43,9 +44,18 @@ pub struct ReadOptions {
inner: rocksdb_ffi::DBReadOptions, inner: rocksdb_ffi::DBReadOptions,
} }
/// The UnsafeSnap must be destroyed by db, it maybe be leaked
/// if not using it properly, hence named as unsafe.
///
/// This object is convenient for wrapping snapshot by yourself. In most
/// cases, using `Snapshot` is enough.
pub struct UnsafeSnap {
inner: rocksdb_ffi::DBSnapshot,
}
pub struct Snapshot<'a> { pub struct Snapshot<'a> {
db: &'a DB, db: &'a DB,
inner: rocksdb_ffi::DBSnapshot, snap: UnsafeSnap,
} }
// We need to find a better way to add a lifetime in here. // We need to find a better way to add a lifetime in here.
...@@ -68,7 +78,7 @@ impl<'a> From<&'a [u8]> for SeekKey<'a> { ...@@ -68,7 +78,7 @@ impl<'a> From<&'a [u8]> for SeekKey<'a> {
} }
impl<'a> DBIterator<'a> { impl<'a> DBIterator<'a> {
fn new(db: &'a DB, readopts: &ReadOptions) -> DBIterator<'a> { pub fn new(db: &'a DB, readopts: &ReadOptions) -> DBIterator<'a> {
unsafe { unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner,
readopts.inner); readopts.inner);
...@@ -147,25 +157,19 @@ impl<'a> DBIterator<'a> { ...@@ -147,25 +157,19 @@ impl<'a> DBIterator<'a> {
unsafe { rocksdb_ffi::rocksdb_iter_valid(self.inner) } unsafe { rocksdb_ffi::rocksdb_iter_valid(self.inner) }
} }
fn new_cf(db: &'a DB, pub fn new_cf(db: &'a DB,
cf_handle: DBCFHandle, cf_handle: DBCFHandle,
readopts: &ReadOptions, readopts: &ReadOptions)
key: SeekKey) -> DBIterator<'a> {
-> Result<DBIterator<'a>, String> {
unsafe { unsafe {
let iterator = let iterator =
rocksdb_ffi::rocksdb_create_iterator_cf(db.inner, rocksdb_ffi::rocksdb_create_iterator_cf(db.inner,
readopts.inner, readopts.inner,
cf_handle); cf_handle);
DBIterator {
let mut rv = DBIterator {
db: db, db: db,
inner: iterator, inner: iterator,
}; }
rv.seek(key);
Ok(rv)
} }
} }
} }
...@@ -194,23 +198,31 @@ impl<'a> Drop for DBIterator<'a> { ...@@ -194,23 +198,31 @@ impl<'a> Drop for DBIterator<'a> {
impl<'a> Snapshot<'a> { impl<'a> Snapshot<'a> {
pub fn new(db: &DB) -> Snapshot { pub fn new(db: &DB) -> Snapshot {
let snapshot = unsafe {
unsafe { rocksdb_ffi::rocksdb_create_snapshot(db.inner) }; Snapshot {
Snapshot { db: db,
db: db, snap: db.unsafe_snap(),
inner: snapshot, }
} }
} }
pub fn iter(&self) -> DBIterator { pub fn iter(&self) -> DBIterator {
let mut readopts = ReadOptions::new(); let readopts = ReadOptions::new();
readopts.set_snapshot(self); self.iter_opt(readopts)
DBIterator::new(self.db, &readopts) }
pub fn iter_opt(&self, mut opt: ReadOptions) -> DBIterator {
unsafe {
opt.set_snapshot(&self.snap);
}
DBIterator::new(self.db, &opt)
} }
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> { pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
let mut readopts = ReadOptions::new(); let mut readopts = ReadOptions::new();
readopts.set_snapshot(self); unsafe {
readopts.set_snapshot(&self.snap);
}
self.db.get_opt(key, &readopts) self.db.get_opt(key, &readopts)
} }
...@@ -219,16 +231,16 @@ impl<'a> Snapshot<'a> { ...@@ -219,16 +231,16 @@ impl<'a> Snapshot<'a> {
key: &[u8]) key: &[u8])
-> Result<Option<DBVector>, String> { -> Result<Option<DBVector>, String> {
let mut readopts = ReadOptions::new(); let mut readopts = ReadOptions::new();
readopts.set_snapshot(self); unsafe {
readopts.set_snapshot(&self.snap);
}
self.db.get_cf_opt(cf, key, &readopts) self.db.get_cf_opt(cf, key, &readopts)
} }
} }
impl<'a> Drop for Snapshot<'a> { impl<'a> Drop for Snapshot<'a> {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe { self.db.release_snap(&self.snap) }
rocksdb_ffi::rocksdb_release_snapshot(self.db.inner, self.inner);
}
} }
} }
...@@ -276,12 +288,13 @@ impl DB { ...@@ -276,12 +288,13 @@ impl DB {
} }
pub fn open(opts: &Options, path: &str) -> Result<DB, String> { pub fn open(opts: &Options, path: &str) -> Result<DB, String> {
DB::open_cf(opts, path, &[]) DB::open_cf(opts, path, &[], &[])
} }
pub fn open_cf(opts: &Options, pub fn open_cf(opts: &Options,
path: &str, path: &str,
cfs: &[&str]) cfs: &[&str],
cf_opts: &[&Options])
-> Result<DB, String> { -> Result<DB, String> {
let cpath = match CString::new(path.as_bytes()) { let cpath = match CString::new(path.as_bytes()) {
Ok(c) => c, Ok(c) => c,
...@@ -291,81 +304,71 @@ impl DB { ...@@ -291,81 +304,71 @@ impl DB {
.to_owned()) .to_owned())
} }
}; };
let cpath_ptr = cpath.as_ptr(); if let Err(e) = fs::create_dir_all(&Path::new(path)) {
let ospath = Path::new(path);
if let Err(e) = fs::create_dir_all(&ospath) {
return Err(format!("Failed to create rocksdb directory: \ return Err(format!("Failed to create rocksdb directory: \
src/rocksdb.rs: \ src/rocksdb.rs: \
{:?}", {:?}",
e)); e));
} }
let mut err: *const i8 = 0 as *const i8; if cfs.len() != cf_opts.len() {
let err_ptr: *mut *const i8 = &mut err; return Err(format!("cfs.len() and cf_opts.len() not match."));
let db: rocksdb_ffi::DBInstance; }
let mut cf_map = BTreeMap::new();
if cfs.len() == 0 { let mut cfs_v = cfs.to_vec();
unsafe { let mut cf_opts_v = cf_opts.to_vec();
db = rocksdb_ffi::rocksdb_open(opts.inner, // Always open the default column family
cpath_ptr as *const _, if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
err_ptr); cfs_v.push(DEFAULT_COLUMN_FAMILY);
} cf_opts_v.push(opts);
} else { }
let mut cfs_v = cfs.to_vec();
// Always open the default column family
if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
cfs_v.push(DEFAULT_COLUMN_FAMILY);
}
// We need to store our CStrings in an intermediate vector // We need to store our CStrings in an intermediate vector
// so that their pointers remain valid. // so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter() let c_cfs: Vec<CString> = cfs_v.iter()
.map(|cf| CString::new(cf.as_bytes()).unwrap()) .map(|cf| CString::new(cf.as_bytes()).unwrap())
.collect(); .collect();
let cfnames: Vec<*const _> = c_cfs.iter()
.map(|cf| cf.as_ptr())
.collect();
// These handles will be populated by DB.
let cfhandles: Vec<rocksdb_ffi::DBCFHandle> = cfs_v.iter()
.map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void))
.collect();
// TODO(tyler) allow options to be passed in.
let cfopts: Vec<rocksdb_ffi::DBOptions> = cfs_v.iter()
.map(|_| unsafe { rocksdb_ffi::rocksdb_options_create() })
.collect();
// Prepare to ship to C.
let cfopts_ptr: *const rocksdb_ffi::DBOptions = cfopts.as_ptr();
let handles: *const rocksdb_ffi::DBCFHandle = cfhandles.as_ptr();
let nfam = cfs_v.len();
unsafe {
db = rocksdb_ffi::rocksdb_open_column_families(opts.inner, cpath_ptr as *const _,
nfam as c_int,
cfnames.as_ptr() as *const _,
cfopts_ptr, handles, err_ptr);
}
for handle in &cfhandles { let cfnames: Vec<*const _> = c_cfs.iter()
if handle.0.is_null() { .map(|cf| cf.as_ptr())
return Err("Received null column family handle from DB." .collect();
.to_owned());
}
}
for (n, h) in cfs_v.iter().zip(cfhandles) { // These handles will be populated by DB.
cf_map.insert((*n).to_owned(), h); let cfhandles: Vec<rocksdb_ffi::DBCFHandle> = cfs_v.iter()
} .map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void))
} .collect();
let cfopts: Vec<rocksdb_ffi::DBOptions> =
cf_opts_v.iter().map(|x| x.inner).collect();
let db: rocksdb_ffi::DBInstance;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe {
db = rocksdb_ffi::rocksdb_open_column_families(opts.inner,
cpath.as_ptr() as *const _,
cfs_v.len() as c_int,
cfnames.as_ptr() as *const _,
cfopts.as_ptr(),
cfhandles.as_ptr(),
err_ptr);
}
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
} }
for handle in &cfhandles {
if handle.0.is_null() {
return Err("Received null column family handle from DB."
.to_owned());
}
}
let mut cf_map = BTreeMap::new();
for (n, h) in cfs_v.iter().zip(cfhandles) {
cf_map.insert((*n).to_owned(), h);
}
if db.0.is_null() { if db.0.is_null() {
return Err("Could not initialize database.".to_owned()); return Err("Could not initialize database.".to_owned());
} }
...@@ -373,6 +376,7 @@ impl DB { ...@@ -373,6 +376,7 @@ impl DB {
Ok(DB { Ok(DB {
inner: db, inner: db,
cfs: cf_map, cfs: cf_map,
path: path.to_owned(),
}) })
} }
...@@ -410,6 +414,10 @@ impl DB { ...@@ -410,6 +414,10 @@ impl DB {
Ok(()) Ok(())
} }
pub fn path(&self) -> &str {
&self.path
}
pub fn write_opt(&self, pub fn write_opt(&self,
batch: WriteBatch, batch: WriteBatch,
writeopts: &WriteOptions) writeopts: &WriteOptions)
...@@ -575,23 +583,37 @@ impl DB { ...@@ -575,23 +583,37 @@ impl DB {
self.cfs.get(name) self.cfs.get(name)
} }
/// get all column family names, including 'default'.
pub fn cf_names(&self) -> Vec<&str> {
self.cfs.iter().map(|(k, _)| k.as_str()).collect()
}
pub fn iter(&self) -> DBIterator { pub fn iter(&self) -> DBIterator {
let opts = ReadOptions::new(); let opts = ReadOptions::new();
DBIterator::new(&self, &opts) self.iter_opt(&opts)
}
pub fn iter_opt(&self, opt: &ReadOptions) -> DBIterator {
DBIterator::new(&self, opt)
} }
pub fn iter_cf(&self, pub fn iter_cf(&self, cf_handle: DBCFHandle) -> DBIterator {
cf_handle: DBCFHandle,
key: SeekKey)
-> Result<DBIterator, String> {
let opts = ReadOptions::new(); let opts = ReadOptions::new();
DBIterator::new_cf(&self, cf_handle, &opts, key) DBIterator::new_cf(&self, cf_handle, &opts)
} }
pub fn snapshot(&self) -> Snapshot { pub fn snapshot(&self) -> Snapshot {
Snapshot::new(self) Snapshot::new(self)
} }
pub unsafe fn unsafe_snap(&self) -> UnsafeSnap {
UnsafeSnap { inner: rocksdb_ffi::rocksdb_create_snapshot(self.inner) }
}
pub unsafe fn release_snap(&self, snap: &UnsafeSnap) {
rocksdb_ffi::rocksdb_release_snapshot(self.inner, snap.inner)
}
pub fn put_opt(&self, pub fn put_opt(&self,
key: &[u8], key: &[u8],
value: &[u8], value: &[u8],
...@@ -848,6 +870,75 @@ impl DB { ...@@ -848,6 +870,75 @@ impl DB {
Ok(()) Ok(())
} }
} }
pub fn get_property_value(&self, name: &str) -> Option<String> {
self.get_property_value_cf_opt(None, name)
}
pub fn get_property_value_cf(&self,
cf: DBCFHandle,
name: &str)
-> Option<String> {
self.get_property_value_cf_opt(Some(cf), name)
}
/// Return the int property in rocksdb.
/// Return None if the property not exists or not int type.
pub fn get_property_int(&self, name: &str) -> Option<u64> {
self.get_property_int_cf_opt(None, name)
}
pub fn get_property_int_cf(&self,
cf: DBCFHandle,
name: &str)
-> Option<u64> {
self.get_property_int_cf_opt(Some(cf), name)
}
fn get_property_value_cf_opt(&self,
cf: Option<DBCFHandle>,
name: &str)
-> Option<String> {
unsafe {
let prop_name = CString::new(name).unwrap();
let value = match cf {
None => {
rocksdb_ffi::rocksdb_property_value(self.inner,
prop_name.as_ptr())
}
Some(cf) => {
rocksdb_ffi::rocksdb_property_value_cf(self.inner,
cf,
prop_name.as_ptr())
}
};
if value.is_null() {
return None;
}
// Must valid UTF-8 format.
let s = CStr::from_ptr(value).to_str().unwrap().to_owned();
libc::free(value as *mut c_void);
Some(s)
}
}
fn get_property_int_cf_opt(&self,
cf: Option<DBCFHandle>,
name: &str)
-> Option<u64> {
// Rocksdb guarantees that the return property int
// value is u64 if exists.
if let Some(value) = self.get_property_value_cf_opt(cf, name) {
if let Ok(num) = value.as_str().parse::<u64>() {
return Some(num);
}
}
None
}
} }
impl Writable for DB { impl Writable for DB {
...@@ -896,6 +987,14 @@ impl WriteBatch { ...@@ -896,6 +987,14 @@ impl WriteBatch {
pub fn new() -> WriteBatch { pub fn new() -> WriteBatch {
WriteBatch::default() WriteBatch::default()
} }
pub fn count(&self) -> usize {
unsafe { rocksdb_ffi::rocksdb_writebatch_count(self.inner) as usize }
}
pub fn is_empty(&self) -> bool {
self.count() == 0
}
} }
impl Drop for WriteBatch { impl Drop for WriteBatch {
...@@ -1005,24 +1104,22 @@ impl Default for ReadOptions { ...@@ -1005,24 +1104,22 @@ impl Default for ReadOptions {
} }
impl ReadOptions { impl ReadOptions {
fn new() -> ReadOptions { pub fn new() -> ReadOptions {
ReadOptions::default() ReadOptions::default()
} }
// TODO add snapshot setting here // TODO add snapshot setting here
// TODO add snapshot wrapper structs with proper destructors; // TODO add snapshot wrapper structs with proper destructors;
// that struct needs an "iterator" impl too. // that struct needs an "iterator" impl too.
#[allow(dead_code)] #[allow(dead_code)]
fn fill_cache(&mut self, v: bool) { pub fn fill_cache(&mut self, v: bool) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v); rocksdb_ffi::rocksdb_readoptions_set_fill_cache(self.inner, v);
} }
} }
fn set_snapshot(&mut self, snapshot: &Snapshot) { pub unsafe fn set_snapshot(&mut self, snapshot: &UnsafeSnap) {
unsafe { rocksdb_ffi::rocksdb_readoptions_set_snapshot(self.inner,
rocksdb_ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner);
snapshot.inner);
}
} }
} }
...@@ -1100,7 +1197,11 @@ mod test { ...@@ -1100,7 +1197,11 @@ mod test {
// test put // test put
let batch = WriteBatch::new(); let batch = WriteBatch::new();
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
assert_eq!(batch.count(), 0);
assert!(batch.is_empty());
let _ = batch.put(b"k1", b"v1111"); let _ = batch.put(b"k1", b"v1111");
assert_eq!(batch.count(), 1);
assert!(!batch.is_empty());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
let p = db.write(batch); let p = db.write(batch);
assert!(p.is_ok()); assert!(p.is_ok());
...@@ -1110,6 +1211,8 @@ mod test { ...@@ -1110,6 +1211,8 @@ mod test {
// test delete // test delete
let batch = WriteBatch::new(); let batch = WriteBatch::new();
let _ = batch.delete(b"k1"); let _ = batch.delete(b"k1");
assert_eq!(batch.count(), 1);
assert!(!batch.is_empty());
let p = db.write(batch); let p = db.write(batch);
assert!(p.is_ok()); assert!(p.is_ok());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
...@@ -1155,6 +1258,21 @@ mod test { ...@@ -1155,6 +1258,21 @@ mod test {
} }
assert_eq!(sizes[4], 0); assert_eq!(sizes[4], 0);
} }
#[test]
fn property_test() {
let path = TempDir::new("_rust_rocksdb_propertytest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
db.put(b"a1", b"v1").unwrap();
db.flush(true).unwrap();
let prop_name = "rocksdb.total-sst-files-size";
let st1 = db.get_property_int(prop_name).unwrap();
assert!(st1 > 0);
db.put(b"a2", b"v2").unwrap();
db.flush(true).unwrap();
let st2 = db.get_property_int(prop_name).unwrap();
assert!(st2 > st1);
}
} }
#[test] #[test]
......
...@@ -90,6 +90,28 @@ impl BlockBasedOptions { ...@@ -90,6 +90,28 @@ impl BlockBasedOptions {
rocksdb_ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache); rocksdb_ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache);
} }
} }
pub fn set_bloom_filter(&mut self,
bits_per_key: c_int,
block_based: bool) {
unsafe {
let bloom = if block_based {
rocksdb_ffi::rocksdb_filterpolicy_create_bloom(bits_per_key)
} else {
rocksdb_ffi::rocksdb_filterpolicy_create_bloom_full(bits_per_key)
};
rocksdb_ffi::rocksdb_block_based_options_set_filter_policy(self.inner,
bloom);
}
}
pub fn set_cache_index_and_filter_blocks(&mut self, v: bool) {
unsafe {
rocksdb_ffi::rocksdb_block_based_options_set_cache_index_and_filter_blocks(self.inner,
v as u8);
}
}
} }
// TODO figure out how to create these in a Rusty way // TODO figure out how to create these in a Rusty way
...@@ -162,6 +184,15 @@ impl Options { ...@@ -162,6 +184,15 @@ impl Options {
} }
} }
pub fn compression_per_level(&mut self,
level_types: &[DBCompressionType]) {
unsafe {
rocksdb_ffi::rocksdb_options_set_compression_per_level(self.inner,
level_types.as_ptr(),
level_types.len() as size_t)
}
}
pub fn add_merge_operator(&mut self, name: &str, merge_fn: MergeFn) { pub fn add_merge_operator(&mut self, name: &str, merge_fn: MergeFn) {
let cb = Box::new(MergeOperatorCallback { let cb = Box::new(MergeOperatorCallback {
name: CString::new(name.as_bytes()).unwrap(), name: CString::new(name.as_bytes()).unwrap(),
...@@ -359,6 +390,18 @@ impl Options { ...@@ -359,6 +390,18 @@ impl Options {
rocksdb_ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner); rocksdb_ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner);
} }
} }
pub fn set_report_bg_io_stats(&mut self, enable: bool) {
unsafe {
if enable {
rocksdb_ffi::rocksdb_options_set_report_bg_io_stats(self.inner,
1);
} else {
rocksdb_ffi::rocksdb_options_set_report_bg_io_stats(self.inner,
0);
}
}
}
} }
impl Default for WriteOptions { impl Default for WriteOptions {
......
...@@ -33,6 +33,7 @@ pub fn test_column_family() { ...@@ -33,6 +33,7 @@ pub fn test_column_family() {
panic!("could not create column family: {}", e); panic!("could not create column family: {}", e);
} }
} }
assert_eq!(db.cf_names(), vec!["cf1", "default"]);
} }
// should fail to open db without specifying same column families // should fail to open db without specifying same column families
...@@ -56,7 +57,7 @@ pub fn test_column_family() { ...@@ -56,7 +57,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
match DB::open_cf(&opts, path_str, &["cf1"]) { match DB::open_cf(&opts, path_str, &["cf1"], &[&opts]) {
Ok(_) => println!("successfully opened db with column family"), Ok(_) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e), Err(e) => panic!("failed to open db with column family: {}", e),
} }
...@@ -65,7 +66,7 @@ pub fn test_column_family() { ...@@ -65,7 +66,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let db = match DB::open_cf(&opts, path_str, &["cf1"]) { let db = match DB::open_cf(&opts, path_str, &["cf1"], &[&opts]) {
Ok(db) => { Ok(db) => {
println!("successfully opened db with column family"); println!("successfully opened db with column family");
db db
...@@ -113,7 +114,11 @@ pub fn test_column_family() { ...@@ -113,7 +114,11 @@ pub fn test_column_family() {
} }
// should b able to drop a cf // should b able to drop a cf
{ {
let mut db = DB::open_cf(&Options::new(), path_str, &["cf1"]).unwrap(); let mut db = DB::open_cf(&Options::new(),
path_str,
&["cf1"],
&[&Options::new()])
.unwrap();
match db.drop_cf("cf1") { match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."), Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e), Err(e) => panic!("failed to drop column family: {}", e),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment