Commit 07d44ea8 authored by disksing's avatar disksing

update open_cf(), add cf_names().

parent 97993162
...@@ -157,16 +157,15 @@ impl<'a> DBIterator<'a> { ...@@ -157,16 +157,15 @@ impl<'a> DBIterator<'a> {
unsafe { rocksdb_ffi::rocksdb_iter_valid(self.inner) } unsafe { rocksdb_ffi::rocksdb_iter_valid(self.inner) }
} }
fn new_cf(db: &'a DB, pub fn new_cf(db: &'a DB,
cf_handle: DBCFHandle, cf_handle: DBCFHandle,
readopts: &ReadOptions) readopts: &ReadOptions)
-> DBIterator<'a> { -> DBIterator<'a> {
unsafe { unsafe {
let iterator = let iterator =
rocksdb_ffi::rocksdb_create_iterator_cf(db.inner, rocksdb_ffi::rocksdb_create_iterator_cf(db.inner,
readopts.inner, readopts.inner,
cf_handle); cf_handle);
DBIterator { DBIterator {
db: db, db: db,
inner: iterator, inner: iterator,
...@@ -289,12 +288,13 @@ impl DB { ...@@ -289,12 +288,13 @@ impl DB {
} }
pub fn open(opts: &Options, path: &str) -> Result<DB, String> { pub fn open(opts: &Options, path: &str) -> Result<DB, String> {
DB::open_cf(opts, path, &[]) DB::open_cf(opts, path, &[], &[])
} }
pub fn open_cf(opts: &Options, pub fn open_cf(opts: &Options,
path: &str, path: &str,
cfs: &[&str]) cfs: &[&str],
cf_opts: &[&Options])
-> Result<DB, String> { -> Result<DB, String> {
let cpath = match CString::new(path.as_bytes()) { let cpath = match CString::new(path.as_bytes()) {
Ok(c) => c, Ok(c) => c,
...@@ -304,81 +304,71 @@ impl DB { ...@@ -304,81 +304,71 @@ impl DB {
.to_owned()) .to_owned())
} }
}; };
let cpath_ptr = cpath.as_ptr(); if let Err(e) = fs::create_dir_all(&Path::new(path)) {
let ospath = Path::new(path);
if let Err(e) = fs::create_dir_all(&ospath) {
return Err(format!("Failed to create rocksdb directory: \ return Err(format!("Failed to create rocksdb directory: \
src/rocksdb.rs: \ src/rocksdb.rs: \
{:?}", {:?}",
e)); e));
} }
let mut err: *const i8 = 0 as *const i8; if cfs.len() != cf_opts.len() {
let err_ptr: *mut *const i8 = &mut err; return Err(format!("cfs.len() and cf_opts.len() not match."));
let db: rocksdb_ffi::DBInstance; }
let mut cf_map = BTreeMap::new();
if cfs.len() == 0 { let mut cfs_v = cfs.to_vec();
unsafe { let mut cf_opts_v = cf_opts.to_vec();
db = rocksdb_ffi::rocksdb_open(opts.inner, // Always open the default column family
cpath_ptr as *const _, if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
err_ptr); cfs_v.push(DEFAULT_COLUMN_FAMILY);
} cf_opts_v.push(opts);
} else { }
let mut cfs_v = cfs.to_vec();
// Always open the default column family
if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
cfs_v.push(DEFAULT_COLUMN_FAMILY);
}
// We need to store our CStrings in an intermediate vector // We need to store our CStrings in an intermediate vector
// so that their pointers remain valid. // so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter() let c_cfs: Vec<CString> = cfs_v.iter()
.map(|cf| CString::new(cf.as_bytes()).unwrap()) .map(|cf| CString::new(cf.as_bytes()).unwrap())
.collect(); .collect();
let cfnames: Vec<*const _> = c_cfs.iter()
.map(|cf| cf.as_ptr())
.collect();
// These handles will be populated by DB.
let cfhandles: Vec<rocksdb_ffi::DBCFHandle> = cfs_v.iter()
.map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void))
.collect();
// TODO(tyler) allow options to be passed in.
let cfopts: Vec<rocksdb_ffi::DBOptions> = cfs_v.iter()
.map(|_| unsafe { rocksdb_ffi::rocksdb_options_create() })
.collect();
// Prepare to ship to C.
let cfopts_ptr: *const rocksdb_ffi::DBOptions = cfopts.as_ptr();
let handles: *const rocksdb_ffi::DBCFHandle = cfhandles.as_ptr();
let nfam = cfs_v.len();
unsafe {
db = rocksdb_ffi::rocksdb_open_column_families(opts.inner, cpath_ptr as *const _,
nfam as c_int,
cfnames.as_ptr() as *const _,
cfopts_ptr, handles, err_ptr);
}
for handle in &cfhandles { let cfnames: Vec<*const _> = c_cfs.iter()
if handle.0.is_null() { .map(|cf| cf.as_ptr())
return Err("Received null column family handle from DB." .collect();
.to_owned());
}
}
for (n, h) in cfs_v.iter().zip(cfhandles) { // These handles will be populated by DB.
cf_map.insert((*n).to_owned(), h); let cfhandles: Vec<rocksdb_ffi::DBCFHandle> = cfs_v.iter()
} .map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void))
} .collect();
let cfopts: Vec<rocksdb_ffi::DBOptions> =
cf_opts_v.iter().map(|x| x.inner).collect();
let db: rocksdb_ffi::DBInstance;
let mut err: *const i8 = 0 as *const i8;
let err_ptr: *mut *const i8 = &mut err;
unsafe {
db = rocksdb_ffi::rocksdb_open_column_families(opts.inner,
cpath.as_ptr() as *const _,
cfs_v.len() as c_int,
cfnames.as_ptr() as *const _,
cfopts.as_ptr(),
cfhandles.as_ptr(),
err_ptr);
}
if !err.is_null() { if !err.is_null() {
return Err(error_message(err)); return Err(error_message(err));
} }
for handle in &cfhandles {
if handle.0.is_null() {
return Err("Received null column family handle from DB."
.to_owned());
}
}
let mut cf_map = BTreeMap::new();
for (n, h) in cfs_v.iter().zip(cfhandles) {
cf_map.insert((*n).to_owned(), h);
}
if db.0.is_null() { if db.0.is_null() {
return Err("Could not initialize database.".to_owned()); return Err("Could not initialize database.".to_owned());
} }
...@@ -593,6 +583,11 @@ impl DB { ...@@ -593,6 +583,11 @@ impl DB {
self.cfs.get(name) self.cfs.get(name)
} }
/// get all column family names, without 'default'.
pub fn cf_names(&self) -> Vec<&str> {
self.cfs.iter().map(|(k, _)| k.as_str()).collect()
}
pub fn iter(&self) -> DBIterator { pub fn iter(&self) -> DBIterator {
let opts = ReadOptions::new(); let opts = ReadOptions::new();
self.iter_opt(&opts) self.iter_opt(&opts)
......
...@@ -33,6 +33,7 @@ pub fn test_column_family() { ...@@ -33,6 +33,7 @@ pub fn test_column_family() {
panic!("could not create column family: {}", e); panic!("could not create column family: {}", e);
} }
} }
assert_eq!(db.cf_names(), vec!["cf1", "default"]);
} }
// should fail to open db without specifying same column families // should fail to open db without specifying same column families
...@@ -56,7 +57,7 @@ pub fn test_column_family() { ...@@ -56,7 +57,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
match DB::open_cf(&opts, path_str, &["cf1"]) { match DB::open_cf(&opts, path_str, &["cf1"], &[&opts]) {
Ok(_) => println!("successfully opened db with column family"), Ok(_) => println!("successfully opened db with column family"),
Err(e) => panic!("failed to open db with column family: {}", e), Err(e) => panic!("failed to open db with column family: {}", e),
} }
...@@ -65,7 +66,7 @@ pub fn test_column_family() { ...@@ -65,7 +66,7 @@ pub fn test_column_family() {
{ {
let mut opts = Options::new(); let mut opts = Options::new();
opts.add_merge_operator("test operator", test_provided_merge); opts.add_merge_operator("test operator", test_provided_merge);
let db = match DB::open_cf(&opts, path_str, &["cf1"]) { let db = match DB::open_cf(&opts, path_str, &["cf1"], &[&opts]) {
Ok(db) => { Ok(db) => {
println!("successfully opened db with column family"); println!("successfully opened db with column family");
db db
...@@ -113,7 +114,11 @@ pub fn test_column_family() { ...@@ -113,7 +114,11 @@ pub fn test_column_family() {
} }
// should b able to drop a cf // should b able to drop a cf
{ {
let mut db = DB::open_cf(&Options::new(), path_str, &["cf1"]).unwrap(); let mut db = DB::open_cf(&Options::new(),
path_str,
&["cf1"],
&[&Options::new()])
.unwrap();
match db.drop_cf("cf1") { match db.drop_cf("cf1") {
Ok(_) => println!("cf1 successfully dropped."), Ok(_) => println!("cf1 successfully dropped."),
Err(e) => panic!("failed to drop column family: {}", e), Err(e) => panic!("failed to drop column family: {}", e),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment