Commit cb11b928 authored by zhangjinpeng1987's avatar zhangjinpeng1987

Merge branch 'master' of https://github.com/ngaut/rust-rocksdb

parents 2aebd37e ebf9de0f
...@@ -12,7 +12,6 @@ ...@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// //
#![feature(plugin)]
extern crate libc; extern crate libc;
...@@ -27,6 +26,7 @@ pub mod comparator; ...@@ -27,6 +26,7 @@ pub mod comparator;
pub use librocksdb_sys::{DBCompactionStyle, DBComparator, DBCompressionType, pub use librocksdb_sys::{DBCompactionStyle, DBComparator, DBCompressionType,
new_bloom_filter, self as rocksdb_ffi}; new_bloom_filter, self as rocksdb_ffi};
pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, ReadOptions}; pub use rocksdb::{DB, DBIterator, DBVector, Kv, ReadOptions, SeekKey,
Writable, WriteBatch};
pub use rocksdb_options::{BlockBasedOptions, Options, WriteOptions}; pub use rocksdb_options::{BlockBasedOptions, Options, WriteOptions};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
...@@ -143,7 +143,7 @@ fn main() { ...@@ -143,7 +143,7 @@ fn main() {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use rocksdb::{BlockBasedOptions, DB, Options, DBCompressionType}; use rocksdb::{BlockBasedOptions, DB, DBCompressionType, Options};
use rocksdb::DBCompactionStyle::DBUniversal; use rocksdb::DBCompactionStyle::DBUniversal;
#[allow(dead_code)] #[allow(dead_code)]
...@@ -151,7 +151,8 @@ mod tests { ...@@ -151,7 +151,8 @@ mod tests {
opts: &mut Options, opts: &mut Options,
blockopts: &mut BlockBasedOptions) blockopts: &mut BlockBasedOptions)
-> DB { -> DB {
let per_level_compression: [DBCompressionType; 7] = [DBCompressionType::DBNo, let per_level_compression: [DBCompressionType; 7] =
[DBCompressionType::DBNo,
DBCompressionType::DBNo, DBCompressionType::DBNo,
DBCompressionType::DBNo, DBCompressionType::DBNo,
DBCompressionType::DBLz4, DBCompressionType::DBLz4,
......
...@@ -44,9 +44,18 @@ pub struct ReadOptions { ...@@ -44,9 +44,18 @@ pub struct ReadOptions {
inner: rocksdb_ffi::DBReadOptions, inner: rocksdb_ffi::DBReadOptions,
} }
/// The UnsafeSnap must be destroyed by db, it maybe be leaked
/// if not using it properly, hence named as unsafe.
///
/// This object is convenient for wrapping snapshot by yourself. In most
/// cases, using `Snapshot` is enough.
pub struct UnsafeSnap {
inner: rocksdb_ffi::DBSnapshot,
}
pub struct Snapshot<'a> { pub struct Snapshot<'a> {
db: &'a DB, db: &'a DB,
inner: rocksdb_ffi::DBSnapshot, snap: UnsafeSnap,
} }
// We need to find a better way to add a lifetime in here. // We need to find a better way to add a lifetime in here.
...@@ -69,7 +78,7 @@ impl<'a> From<&'a [u8]> for SeekKey<'a> { ...@@ -69,7 +78,7 @@ impl<'a> From<&'a [u8]> for SeekKey<'a> {
} }
impl<'a> DBIterator<'a> { impl<'a> DBIterator<'a> {
fn new(db: &'a DB, readopts: &ReadOptions) -> DBIterator<'a> { pub fn new(db: &'a DB, readopts: &ReadOptions) -> DBIterator<'a> {
unsafe { unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner, let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner,
readopts.inner); readopts.inner);
...@@ -148,25 +157,19 @@ impl<'a> DBIterator<'a> { ...@@ -148,25 +157,19 @@ impl<'a> DBIterator<'a> {
unsafe { rocksdb_ffi::rocksdb_iter_valid(self.inner) } unsafe { rocksdb_ffi::rocksdb_iter_valid(self.inner) }
} }
fn new_cf(db: &'a DB, pub fn new_cf(db: &'a DB,
cf_handle: DBCFHandle, cf_handle: DBCFHandle,
readopts: &ReadOptions, readopts: &ReadOptions)
key: SeekKey) -> DBIterator<'a> {
-> Result<DBIterator<'a>, String> {
unsafe { unsafe {
let iterator = let iterator =
rocksdb_ffi::rocksdb_create_iterator_cf(db.inner, rocksdb_ffi::rocksdb_create_iterator_cf(db.inner,
readopts.inner, readopts.inner,
cf_handle); cf_handle);
DBIterator {
let mut rv = DBIterator {
db: db, db: db,
inner: iterator, inner: iterator,
}; }
rv.seek(key);
Ok(rv)
} }
} }
} }
...@@ -195,11 +198,11 @@ impl<'a> Drop for DBIterator<'a> { ...@@ -195,11 +198,11 @@ impl<'a> Drop for DBIterator<'a> {
impl<'a> Snapshot<'a> { impl<'a> Snapshot<'a> {
pub fn new(db: &DB) -> Snapshot { pub fn new(db: &DB) -> Snapshot {
let snapshot = unsafe {
unsafe { rocksdb_ffi::rocksdb_create_snapshot(db.inner) };
Snapshot { Snapshot {
db: db, db: db,
inner: snapshot, snap: db.unsafe_snap(),
}
} }
} }
...@@ -209,13 +212,17 @@ impl<'a> Snapshot<'a> { ...@@ -209,13 +212,17 @@ impl<'a> Snapshot<'a> {
} }
pub fn iter_opt(&self, mut opt: ReadOptions) -> DBIterator { pub fn iter_opt(&self, mut opt: ReadOptions) -> DBIterator {
opt.set_snapshot(self); unsafe {
opt.set_snapshot(&self.snap);
}
DBIterator::new(self.db, &opt) DBIterator::new(self.db, &opt)
} }
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> { pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
let mut readopts = ReadOptions::new(); let mut readopts = ReadOptions::new();
readopts.set_snapshot(self); unsafe {
readopts.set_snapshot(&self.snap);
}
self.db.get_opt(key, &readopts) self.db.get_opt(key, &readopts)
} }
...@@ -224,16 +231,16 @@ impl<'a> Snapshot<'a> { ...@@ -224,16 +231,16 @@ impl<'a> Snapshot<'a> {
key: &[u8]) key: &[u8])
-> Result<Option<DBVector>, String> { -> Result<Option<DBVector>, String> {
let mut readopts = ReadOptions::new(); let mut readopts = ReadOptions::new();
readopts.set_snapshot(self); unsafe {
readopts.set_snapshot(&self.snap);
}
self.db.get_cf_opt(cf, key, &readopts) self.db.get_cf_opt(cf, key, &readopts)
} }
} }
impl<'a> Drop for Snapshot<'a> { impl<'a> Drop for Snapshot<'a> {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe { self.db.release_snap(&self.snap) }
rocksdb_ffi::rocksdb_release_snapshot(self.db.inner, self.inner);
}
} }
} }
...@@ -281,12 +288,13 @@ impl DB { ...@@ -281,12 +288,13 @@ impl DB {
} }
pub fn open(opts: &Options, path: &str) -> Result<DB, String> { pub fn open(opts: &Options, path: &str) -> Result<DB, String> {
DB::open_cf(opts, path, &[]) DB::open_cf(opts, path, &[], &[])
} }
pub fn open_cf(opts: &Options, pub fn open_cf(opts: &Options,
path: &str, path: &str,
cfs: &[&str]) cfs: &[&str],
cf_opts: &[&Options])
-> Result<DB, String> { -> Result<DB, String> {
let cpath = match CString::new(path.as_bytes()) { let cpath = match CString::new(path.as_bytes()) {
Ok(c) => c, Ok(c) => c,
...@@ -296,33 +304,23 @@ impl DB { ...@@ -296,33 +304,23 @@ impl DB {
.to_owned()) .to_owned())
} }
}; };
let cpath_ptr = cpath.as_ptr(); if let Err(e) = fs::create_dir_all(&Path::new(path)) {
let ospath = Path::new(path);
if let Err(e) = fs::create_dir_all(&ospath) {
return Err(format!("Failed to create rocksdb directory: \ return Err(format!("Failed to create rocksdb directory: \
src/rocksdb.rs: \ src/rocksdb.rs: \
{:?}", {:?}",
e)); e));
} }
let mut err: *const i8 = 0 as *const i8; if cfs.len() != cf_opts.len() {
let err_ptr: *mut *const i8 = &mut err; return Err(format!("cfs.len() and cf_opts.len() not match."));
let db: rocksdb_ffi::DBInstance;
let mut cf_map = BTreeMap::new();
if cfs.len() == 0 {
unsafe {
db = rocksdb_ffi::rocksdb_open(opts.inner,
cpath_ptr as *const _,
err_ptr);
} }
} else {
let mut cfs_v = cfs.to_vec(); let mut cfs_v = cfs.to_vec();
let mut cf_opts_v = cf_opts.to_vec();
// Always open the default column family // Always open the default column family
if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) { if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
cfs_v.push(DEFAULT_COLUMN_FAMILY); cfs_v.push(DEFAULT_COLUMN_FAMILY);
cf_opts_v.push(opts);
} }
// We need to store our CStrings in an intermediate vector // We need to store our CStrings in an intermediate vector
...@@ -340,20 +338,23 @@ impl DB { ...@@ -340,20 +338,23 @@ impl DB {
.map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void)) .map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void))
.collect(); .collect();
// TODO(tyler) allow options to be passed in. let cfopts: Vec<rocksdb_ffi::DBOptions> =
let cfopts: Vec<rocksdb_ffi::DBOptions> = cfs_v.iter() cf_opts_v.iter().map(|x| x.inner).collect();
.map(|_| unsafe { rocksdb_ffi::rocksdb_options_create() })
.collect();
// Prepare to ship to C. let db: rocksdb_ffi::DBInstance;
let cfopts_ptr: *const rocksdb_ffi::DBOptions = cfopts.as_ptr(); let mut err: *const i8 = 0 as *const i8;
let handles: *const rocksdb_ffi::DBCFHandle = cfhandles.as_ptr(); let err_ptr: *mut *const i8 = &mut err;
let nfam = cfs_v.len();
unsafe { unsafe {
db = rocksdb_ffi::rocksdb_open_column_families(opts.inner, cpath_ptr as *const _, db = rocksdb_ffi::rocksdb_open_column_families(opts.inner,
nfam as c_int, cpath.as_ptr() as *const _,
cfs_v.len() as c_int,
cfnames.as_ptr() as *const _, cfnames.as_ptr() as *const _,
cfopts_ptr, handles, err_ptr); cfopts.as_ptr(),
cfhandles.as_ptr(),
err_ptr);
}
if !err.is_null() {
return Err(error_message(err));
} }
for handle in &cfhandles { for handle in &cfhandles {
...@@ -363,14 +364,11 @@ impl DB { ...@@ -363,14 +364,11 @@ impl DB {
} }
} }
let mut cf_map = BTreeMap::new();
for (n, h) in cfs_v.iter().zip(cfhandles) { for (n, h) in cfs_v.iter().zip(cfhandles) {
cf_map.insert((*n).to_owned(), h); cf_map.insert((*n).to_owned(), h);
} }
}
if !err.is_null() {
return Err(error_message(err));
}
if db.0.is_null() { if db.0.is_null() {
return Err("Could not initialize database.".to_owned()); return Err("Could not initialize database.".to_owned());
} }
...@@ -585,6 +583,11 @@ impl DB { ...@@ -585,6 +583,11 @@ impl DB {
self.cfs.get(name) self.cfs.get(name)
} }
/// get all column family names, including 'default'.
pub fn cf_names(&self) -> Vec<&str> {
self.cfs.iter().map(|(k, _)| k.as_str()).collect()
}
pub fn iter(&self) -> DBIterator { pub fn iter(&self) -> DBIterator {
let opts = ReadOptions::new(); let opts = ReadOptions::new();
self.iter_opt(&opts) self.iter_opt(&opts)
...@@ -594,18 +597,23 @@ impl DB { ...@@ -594,18 +597,23 @@ impl DB {
DBIterator::new(&self, opt) DBIterator::new(&self, opt)
} }
pub fn iter_cf(&self, pub fn iter_cf(&self, cf_handle: DBCFHandle) -> DBIterator {
cf_handle: DBCFHandle,
key: SeekKey)
-> Result<DBIterator, String> {
let opts = ReadOptions::new(); let opts = ReadOptions::new();
DBIterator::new_cf(&self, cf_handle, &opts, key) DBIterator::new_cf(&self, cf_handle, &opts)
} }
pub fn snapshot(&self) -> Snapshot { pub fn snapshot(&self) -> Snapshot {
Snapshot::new(self) Snapshot::new(self)
} }
pub unsafe fn unsafe_snap(&self) -> UnsafeSnap {
UnsafeSnap { inner: rocksdb_ffi::rocksdb_create_snapshot(self.inner) }
}
pub unsafe fn release_snap(&self, snap: &UnsafeSnap) {
rocksdb_ffi::rocksdb_release_snapshot(self.inner, snap.inner)
}
pub fn put_opt(&self, pub fn put_opt(&self,
key: &[u8], key: &[u8],
value: &[u8], value: &[u8],
...@@ -939,6 +947,14 @@ impl WriteBatch { ...@@ -939,6 +947,14 @@ impl WriteBatch {
pub fn new() -> WriteBatch { pub fn new() -> WriteBatch {
WriteBatch::default() WriteBatch::default()
} }
pub fn count(&self) -> usize {
unsafe { rocksdb_ffi::rocksdb_writebatch_count(self.inner) as usize }
}
pub fn is_empty(&self) -> bool {
self.count() == 0
}
} }
impl Drop for WriteBatch { impl Drop for WriteBatch {
...@@ -1061,12 +1077,10 @@ impl ReadOptions { ...@@ -1061,12 +1077,10 @@ impl ReadOptions {
} }
} }
fn set_snapshot(&mut self, snapshot: &Snapshot) { pub unsafe fn set_snapshot(&mut self, snapshot: &UnsafeSnap) {
unsafe {
rocksdb_ffi::rocksdb_readoptions_set_snapshot(self.inner, rocksdb_ffi::rocksdb_readoptions_set_snapshot(self.inner,
snapshot.inner); snapshot.inner);
} }
}
} }
pub struct DBVector { pub struct DBVector {
...@@ -1143,7 +1157,11 @@ mod test { ...@@ -1143,7 +1157,11 @@ mod test {
// test put // test put
let batch = WriteBatch::new(); let batch = WriteBatch::new();
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
assert_eq!(batch.count(), 0);
assert!(batch.is_empty());
let _ = batch.put(b"k1", b"v1111"); let _ = batch.put(b"k1", b"v1111");
assert_eq!(batch.count(), 1);
assert!(!batch.is_empty());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
let p = db.write(batch); let p = db.write(batch);
assert!(p.is_ok()); assert!(p.is_ok());
...@@ -1153,6 +1171,8 @@ mod test { ...@@ -1153,6 +1171,8 @@ mod test {
// test delete // test delete
let batch = WriteBatch::new(); let batch = WriteBatch::new();
let _ = batch.delete(b"k1"); let _ = batch.delete(b"k1");
assert_eq!(batch.count(), 1);
assert!(!batch.is_empty());
let p = db.write(batch); let p = db.write(batch);
assert!(p.is_ok()); assert!(p.is_ok());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
......
...@@ -193,7 +193,8 @@ impl Options { ...@@ -193,7 +193,8 @@ impl Options {
} }
} }
pub fn compression_per_level(&mut self, level_types: &[DBCompressionType]) { pub fn compression_per_level(&mut self,
level_types: &[DBCompressionType]) {
unsafe { unsafe {
rocksdb_ffi::rocksdb_options_set_compression_per_level(self.inner, rocksdb_ffi::rocksdb_options_set_compression_per_level(self.inner,
level_types.as_ptr(), level_types.as_ptr(),
...@@ -402,9 +403,11 @@ impl Options { ...@@ -402,9 +403,11 @@ impl Options {
pub fn set_report_bg_io_stats(&mut self, enable: bool) { pub fn set_report_bg_io_stats(&mut self, enable: bool) {
unsafe { unsafe {
if enable { if enable {
rocksdb_ffi::rocksdb_options_set_report_bg_io_stats(self.inner, 1); rocksdb_ffi::rocksdb_options_set_report_bg_io_stats(self.inner,
1);
} else { } else {
rocksdb_ffi::rocksdb_options_set_report_bg_io_stats(self.inner, 0); rocksdb_ffi::rocksdb_options_set_report_bg_io_stats(self.inner,
0);
} }
} }
} }
......
...@@ -33,6 +33,7 @@ pub fn test_column_family() { ...@@ -33,6 +33,7 @@ pub fn test_column_family() {
panic!("could not create column family: {}", e); panic!("could not create column family: {}", e);
} }
} }
assert_eq!(db.cf_names(), vec!["cf1", "default"]);
} }
// should fail to open db without specifying same column families // should fail to open db without specifying same column families
...@@ -56,7 +57,7 @@ pub fn test_column_family() { ...@@ -56,7 +57,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
match DB::open_cf(&opts, path_str, &["cf1"]) { match DB::open_cf(&opts, path_str, &["cf1"], &[&opts]) {
Ok(_) => println!("successfully opened db with column family"), Ok(_) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e), Err(e) => panic!("failed to open db with column family: {}", e),
} }
...@@ -65,7 +66,7 @@ pub fn test_column_family() { ...@@ -65,7 +66,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let db = match DB::open_cf(&opts, path_str, &["cf1"]) { let db = match DB::open_cf(&opts, path_str, &["cf1"], &[&opts]) {
Ok(db) => { Ok(db) => {
println!("successfully opened db with column family"); println!("successfully opened db with column family");
db db
...@@ -113,7 +114,11 @@ pub fn test_column_family() { ...@@ -113,7 +114,11 @@ pub fn test_column_family() {
} }
// should b able to drop a cf // should b able to drop a cf
{ {
let mut db = DB::open_cf(&Options::new(), path_str, &["cf1"]).unwrap(); let mut db = DB::open_cf(&Options::new(),
path_str,
&["cf1"],
&[&Options::new()])
.unwrap();
match db.drop_cf("cf1") { match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."), Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e), Err(e) => panic!("failed to drop column family: {}", e),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment