Commit c5a0aa66 authored by zhangjinpeng1987's avatar zhangjinpeng1987

DB hold Options

parent 48bad690
...@@ -90,7 +90,7 @@ fn custom_merge() { ...@@ -90,7 +90,7 @@ fn custom_merge() {
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", concat_merge); opts.add_merge_operator("test operator", concat_merge);
{ {
let db = DB::open(&opts, path).unwrap(); let db = DB::open(opts, path).unwrap();
db.put(b"k1", b"a").unwrap(); db.put(b"k1", b"a").unwrap();
db.merge(b"k1", b"b").unwrap(); db.merge(b"k1", b"b").unwrap();
db.merge(b"k1", b"c").unwrap(); db.merge(b"k1", b"c").unwrap();
...@@ -108,6 +108,7 @@ fn custom_merge() { ...@@ -108,6 +108,7 @@ fn custom_merge() {
Err(e) => println!("error retrieving value: {}", e), Err(e) => println!("error retrieving value: {}", e),
} }
} }
let opts = Options::new();
DB::destroy(&opts, path).is_ok(); DB::destroy(&opts, path).is_ok();
} }
...@@ -146,7 +147,7 @@ mod tests { ...@@ -146,7 +147,7 @@ mod tests {
#[allow(dead_code)] #[allow(dead_code)]
fn tuned_for_somebody_elses_disk(path: &str, fn tuned_for_somebody_elses_disk(path: &str,
opts: &mut Options, mut opts: Options,
blockopts: &mut BlockBasedOptions) blockopts: &mut BlockBasedOptions)
-> DB { -> DB {
let per_level_compression: [DBCompressionType; 7] = [DBCompressionType::DBNo, let per_level_compression: [DBCompressionType; 7] = [DBCompressionType::DBNo,
...@@ -189,7 +190,7 @@ mod tests { ...@@ -189,7 +190,7 @@ mod tests {
// let filter = new_bloom_filter(10); // let filter = new_bloom_filter(10);
// opts.set_filter(filter); // opts.set_filter(filter);
DB::open(&opts, path).unwrap() DB::open(opts, path).unwrap()
} }
// TODO(tyler) unstable // TODO(tyler) unstable
......
...@@ -180,7 +180,7 @@ mod test { ...@@ -180,7 +180,7 @@ mod test {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap(); let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let p = db.put(b"k1", b"a"); let p = db.put(b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
let _ = db.merge(b"k1", b"b"); let _ = db.merge(b"k1", b"b");
......
...@@ -46,6 +46,7 @@ pub struct DB { ...@@ -46,6 +46,7 @@ pub struct DB {
inner: *mut DBInstance, inner: *mut DBInstance,
cfs: BTreeMap<String, CFHandle>, cfs: BTreeMap<String, CFHandle>,
path: String, path: String,
opts: Options,
} }
unsafe impl Send for DB {} unsafe impl Send for DB {}
...@@ -265,14 +266,14 @@ impl DB { ...@@ -265,14 +266,14 @@ impl DB {
pub fn open_default(path: &str) -> Result<DB, String> { pub fn open_default(path: &str) -> Result<DB, String> {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
DB::open(&opts, path) DB::open(opts, path)
} }
pub fn open(opts: &Options, path: &str) -> Result<DB, String> { pub fn open(opts: Options, path: &str) -> Result<DB, String> {
DB::open_cf(opts, path, &[], &[]) DB::open_cf(opts, path, &[], &[])
} }
pub fn open_cf(opts: &Options, pub fn open_cf(opts: Options,
path: &str, path: &str,
cfs: &[&str], cfs: &[&str],
cf_opts: &[&Options]) cf_opts: &[&Options])
...@@ -294,61 +295,66 @@ impl DB { ...@@ -294,61 +295,66 @@ impl DB {
return Err(format!("cfs.len() and cf_opts.len() not match.")); return Err(format!("cfs.len() and cf_opts.len() not match."));
} }
let mut cfs_v = cfs.to_vec(); let (db, cf_map) = {
let mut cf_opts_v = cf_opts.to_vec(); let mut cfs_v = cfs.to_vec();
// Always open the default column family let mut cf_opts_v = cf_opts.to_vec();
if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) { // Always open the default column family
cfs_v.push(DEFAULT_COLUMN_FAMILY); if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
cf_opts_v.push(opts); cfs_v.push(DEFAULT_COLUMN_FAMILY);
} cf_opts_v.push(&opts);
}
// We need to store our CStrings in an intermediate vector
// so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter()
.map(|cf| CString::new(cf.as_bytes()).unwrap())
.collect();
let cfnames: Vec<*const _> = c_cfs.iter()
.map(|cf| cf.as_ptr())
.collect();
// These handles will be populated by DB.
let cfhandles: Vec<_> = cfs_v.iter()
.map(|_| 0 as *mut DBCFHandle)
.collect();
let cfopts: Vec<_> = cf_opts_v.iter() // We need to store our CStrings in an intermediate vector
.map(|x| x.inner as *const rocksdb_ffi::DBOptions) // so that their pointers remain valid.
.collect(); let c_cfs: Vec<CString> = cfs_v.iter()
.map(|cf| CString::new(cf.as_bytes()).unwrap())
.collect();
let cfnames: Vec<*const _> = c_cfs.iter()
.map(|cf| cf.as_ptr())
.collect();
// These handles will be populated by DB.
let cfhandles: Vec<_> = cfs_v.iter()
.map(|_| 0 as *mut DBCFHandle)
.collect();
let cfopts: Vec<_> = cf_opts_v.iter()
.map(|x| x.inner as *const rocksdb_ffi::DBOptions)
.collect();
let db = unsafe {
ffi_try!(rocksdb_open_column_families(opts.inner,
cpath.as_ptr(),
cfs_v.len() as c_int,
cfnames.as_ptr(),
cfopts.as_ptr(),
cfhandles.as_ptr()))
};
let db = unsafe { for handle in &cfhandles {
ffi_try!(rocksdb_open_column_families(opts.inner, if handle.is_null() {
cpath.as_ptr(), return Err("Received null column family handle from DB.".to_owned());
cfs_v.len() as c_int, }
cfnames.as_ptr(), }
cfopts.as_ptr(),
cfhandles.as_ptr()))
};
for handle in &cfhandles { let mut cf_map = BTreeMap::new();
if handle.is_null() { for (n, h) in cfs_v.iter().zip(cfhandles) {
return Err("Received null column family handle from DB.".to_owned()); cf_map.insert((*n).to_owned(), CFHandle { inner: h });
} }
}
let mut cf_map = BTreeMap::new(); if db.is_null() {
for (n, h) in cfs_v.iter().zip(cfhandles) { return Err("Could not initialize database.".to_owned());
cf_map.insert((*n).to_owned(), CFHandle { inner: h }); }
}
if db.is_null() { (db, cf_map)
return Err("Could not initialize database.".to_owned()); };
}
Ok(DB { Ok(DB {
inner: db, inner: db,
cfs: cf_map, cfs: cf_map,
path: path.to_owned(), path: path.to_owned(),
opts: opts,
}) })
} }
...@@ -796,6 +802,10 @@ impl DB { ...@@ -796,6 +802,10 @@ impl DB {
None None
} }
pub fn get_statistics(&self) -> Option<String> {
self.opts.get_statistics()
}
} }
impl Writable for DB { impl Writable for DB {
...@@ -1088,7 +1098,7 @@ mod test { ...@@ -1088,7 +1098,7 @@ mod test {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
let mut db = DB::open(&opts, path.path().to_str().unwrap()).unwrap(); let mut db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
for (&cf, &cf_opts) in cfs.iter().zip(&cfs_ref_opts) { for (&cf, &cf_opts) in cfs.iter().zip(&cfs_ref_opts) {
if cf == "default" { if cf == "default" {
continue; continue;
......
...@@ -566,12 +566,6 @@ impl Drop for FlushOptions { ...@@ -566,12 +566,6 @@ impl Drop for FlushOptions {
rocksdb_ffi::rocksdb_flushoptions_destroy(self.inner); rocksdb_ffi::rocksdb_flushoptions_destroy(self.inner);
} }
} }
pub fn set_num_levels(&mut self, n: c_int) {
unsafe {
rocksdb_ffi::rocksdb_options_set_num_levels(self.inner, n);
}
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -26,7 +26,7 @@ pub fn test_column_family() { ...@@ -26,7 +26,7 @@ pub fn test_column_family() {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let mut db = DB::open(&opts, path_str).unwrap(); let mut db = DB::open(opts, path_str).unwrap();
let opts = Options::new(); let opts = Options::new();
match db.create_cf("cf1", &opts) { match db.create_cf("cf1", &opts) {
Ok(_) => println!("cf1 created successfully"), Ok(_) => println!("cf1 created successfully"),
...@@ -41,7 +41,7 @@ pub fn test_column_family() { ...@@ -41,7 +41,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
match DB::open(&opts, path_str) { match DB::open(opts, path_str) {
Ok(_) => { Ok(_) => {
panic!("should not have opened DB successfully without \ panic!("should not have opened DB successfully without \
specifying column specifying column
...@@ -58,7 +58,7 @@ pub fn test_column_family() { ...@@ -58,7 +58,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
match DB::open_cf(&opts, path_str, &["cf1"], &[&opts]) { match DB::open_cf(Options::new(), path_str, &["cf1"], &[&opts]) {
Ok(_) => println!("successfully opened db with column family"), Ok(_) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e), Err(e) => panic!("failed to open db with column family: {}", e),
} }
...@@ -67,7 +67,7 @@ pub fn test_column_family() { ...@@ -67,7 +67,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let db = match DB::open_cf(&opts, path_str, &["cf1"], &[&opts]) { let db = match DB::open_cf(Options::new(), path_str, &["cf1"], &[&opts]) {
Ok(db) => { Ok(db) => {
println!("successfully opened db with column family"); println!("successfully opened db with column family");
db db
...@@ -115,7 +115,7 @@ pub fn test_column_family() { ...@@ -115,7 +115,7 @@ pub fn test_column_family() {
} }
// should b able to drop a cf // should b able to drop a cf
{ {
let mut db = DB::open_cf(&Options::new(), path_str, &["cf1"], &[&Options::new()]).unwrap(); let mut db = DB::open_cf(Options::new(), path_str, &["cf1"], &[&Options::new()]).unwrap();
match db.drop_cf("cf1") { match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."), Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e), Err(e) => panic!("failed to drop column family: {}", e),
......
...@@ -7,7 +7,7 @@ fn test_compact_range() { ...@@ -7,7 +7,7 @@ fn test_compact_range() {
let path = TempDir::new("_rust_rocksdb_test_compact_range").expect(""); let path = TempDir::new("_rust_rocksdb_test_compact_range").expect("");
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap(); let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let samples = vec![ let samples = vec![
(b"k1".to_vec(), b"value--------1".to_vec()), (b"k1".to_vec(), b"value--------1".to_vec()),
(b"k2".to_vec(), b"value--------2".to_vec()), (b"k2".to_vec(), b"value--------2".to_vec()),
......
...@@ -37,7 +37,7 @@ fn test_compaction_filter() { ...@@ -37,7 +37,7 @@ fn test_compaction_filter() {
})) }))
.unwrap(); .unwrap();
opts.create_if_missing(true); opts.create_if_missing(true);
let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap(); let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let samples = vec![ let samples = vec![
(b"key1".to_vec(), b"value1".to_vec()), (b"key1".to_vec(), b"value1".to_vec()),
(b"key2".to_vec(), b"value2".to_vec()), (b"key2".to_vec(), b"value2".to_vec()),
...@@ -58,7 +58,9 @@ fn test_compaction_filter() { ...@@ -58,7 +58,9 @@ fn test_compaction_filter() {
} }
drop(db); drop(db);
// reregister with ignore_snapshots set to true // reregister with ignore_snapshots set to true
let mut opts = Options::new();
opts.set_compaction_filter("test", opts.set_compaction_filter("test",
true, true,
Box::new(Filter { Box::new(Filter {
...@@ -69,7 +71,7 @@ fn test_compaction_filter() { ...@@ -69,7 +71,7 @@ fn test_compaction_filter() {
assert!(drop_called.load(Ordering::Relaxed)); assert!(drop_called.load(Ordering::Relaxed));
drop_called.store(false, Ordering::Relaxed); drop_called.store(false, Ordering::Relaxed);
{ {
let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap(); let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let _snap = db.snapshot(); let _snap = db.snapshot();
// Because ignore_snapshots is true, so all the keys will be compacted. // Because ignore_snapshots is true, so all the keys will be compacted.
db.compact_range(Some(b"key1"), Some(b"key3")); db.compact_range(Some(b"key1"), Some(b"key3"));
...@@ -79,6 +81,5 @@ fn test_compaction_filter() { ...@@ -79,6 +81,5 @@ fn test_compaction_filter() {
assert_eq!(*filtered_kvs.read().unwrap(), samples); assert_eq!(*filtered_kvs.read().unwrap(), samples);
} }
drop(opts);
assert!(drop_called.load(Ordering::Relaxed)); assert!(drop_called.load(Ordering::Relaxed));
} }
...@@ -118,7 +118,7 @@ fn read_with_upper_bound() { ...@@ -118,7 +118,7 @@ fn read_with_upper_bound() {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
{ {
let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap(); let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let writeopts = WriteOptions::new(); let writeopts = WriteOptions::new();
db.put_opt(b"k1-0", b"a", &writeopts).unwrap(); db.put_opt(b"k1-0", b"a", &writeopts).unwrap();
db.put_opt(b"k1-1", b"b", &writeopts).unwrap(); db.put_opt(b"k1-1", b"b", &writeopts).unwrap();
......
...@@ -8,6 +8,6 @@ fn test_set_num_levels() { ...@@ -8,6 +8,6 @@ fn test_set_num_levels() {
let mut opts = Options::new(); let mut opts = Options::new();
opts.create_if_missing(true); opts.create_if_missing(true);
opts.set_num_levels(2); opts.set_num_levels(2);
let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap(); let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
drop(db); drop(db);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment