Commit fb75c242 authored by Kohei Watanabe's avatar Kohei Watanabe Committed by siddontang

Read only mode at opening DB (#141)

parent cc1db4a1
...@@ -510,6 +510,12 @@ extern "C" { ...@@ -510,6 +510,12 @@ extern "C" {
path: *const c_char, path: *const c_char,
err: *mut *mut c_char, err: *mut *mut c_char,
) -> *mut DBInstance; ) -> *mut DBInstance;
pub fn crocksdb_open_for_read_only(
options: *mut Options,
path: *const c_char,
error_if_log_file_exist: bool,
err: *mut *mut c_char,
) -> *mut DBInstance;
pub fn crocksdb_writeoptions_create() -> *mut DBWriteOptions; pub fn crocksdb_writeoptions_create() -> *mut DBWriteOptions;
pub fn crocksdb_writeoptions_destroy(writeopts: *mut DBWriteOptions); pub fn crocksdb_writeoptions_destroy(writeopts: *mut DBWriteOptions);
pub fn crocksdb_writeoptions_set_sync(writeopts: *mut DBWriteOptions, v: bool); pub fn crocksdb_writeoptions_set_sync(writeopts: *mut DBWriteOptions, v: bool);
...@@ -828,6 +834,16 @@ extern "C" { ...@@ -828,6 +834,16 @@ extern "C" {
column_family_handles: *const *mut DBCFHandle, column_family_handles: *const *mut DBCFHandle,
err: *mut *mut c_char, err: *mut *mut c_char,
) -> *mut DBInstance; ) -> *mut DBInstance;
pub fn crocksdb_open_for_read_only_column_families(
options: *const Options,
path: *const c_char,
num_column_families: c_int,
column_family_names: *const *const c_char,
column_family_options: *const *const Options,
column_family_handles: *const *mut DBCFHandle,
error_if_log_file_exist: bool,
err: *mut *mut c_char,
) -> *mut DBInstance;
pub fn crocksdb_create_column_family( pub fn crocksdb_create_column_family(
db: *mut DBInstance, db: *mut DBInstance,
column_family_options: *const Options, column_family_options: *const Options,
......
...@@ -352,10 +352,38 @@ impl DB { ...@@ -352,10 +352,38 @@ impl DB {
where where
T: Into<ColumnFamilyDescriptor<'a>>, T: Into<ColumnFamilyDescriptor<'a>>,
{ {
DB::open_cf_internal(opts, path, cfds) DB::open_cf_internal(opts, path, cfds, None)
} }
fn open_cf_internal<'a, T>(opts: DBOptions, path: &str, cfds: Vec<T>) -> Result<DB, String> pub fn open_for_read_only(
opts: DBOptions,
path: &str,
error_if_log_file_exist: bool,
) -> Result<DB, String> {
let cfds: Vec<&str> = vec![];
DB::open_cf_for_read_only(opts, path, cfds, error_if_log_file_exist)
}
pub fn open_cf_for_read_only<'a, T>(
opts: DBOptions,
path: &str,
cfds: Vec<T>,
error_if_log_file_exist: bool,
) -> Result<DB, String>
where
T: Into<ColumnFamilyDescriptor<'a>>,
{
DB::open_cf_internal(opts, path, cfds, Some(error_if_log_file_exist))
}
fn open_cf_internal<'a, T>(
opts: DBOptions,
path: &str,
cfds: Vec<T>,
// if none, open for read write mode.
// otherwise, open for read only.
error_if_log_file_exist: Option<bool>,
) -> Result<DB, String>
where where
T: Into<ColumnFamilyDescriptor<'a>>, T: Into<ColumnFamilyDescriptor<'a>>,
{ {
...@@ -393,15 +421,29 @@ impl DB { ...@@ -393,15 +421,29 @@ impl DB {
let db_cf_ptrs = cf_names.as_ptr(); let db_cf_ptrs = cf_names.as_ptr();
let db_cf_opts = cf_options.as_ptr(); let db_cf_opts = cf_options.as_ptr();
let db_cf_handles = cf_handles.as_ptr(); let db_cf_handles = cf_handles.as_ptr();
unsafe { if let Some(flag) = error_if_log_file_exist {
ffi_try!(crocksdb_open_column_families( unsafe {
db_options, ffi_try!(crocksdb_open_for_read_only_column_families(
db_path, db_options,
db_cfs_count, db_path,
db_cf_ptrs, db_cfs_count,
db_cf_opts, db_cf_ptrs,
db_cf_handles db_cf_opts,
)) db_cf_handles,
flag
))
}
} else {
unsafe {
ffi_try!(crocksdb_open_column_families(
db_options,
db_path,
db_cfs_count,
db_cf_ptrs,
db_cf_opts,
db_cf_handles
))
}
} }
}; };
if cf_handles.iter().any(|h| h.is_null()) { if cf_handles.iter().any(|h| h.is_null()) {
......
...@@ -17,3 +17,4 @@ mod test_table_properties; ...@@ -17,3 +17,4 @@ mod test_table_properties;
mod test_event_listener; mod test_event_listener;
mod test_delete_range; mod test_delete_range;
mod test_delete_files_in_range; mod test_delete_files_in_range;
mod test_read_only;
use rocksdb::{ColumnFamilyOptions, DBOptions, Writable, DB};
use tempdir::TempDir;
macro_rules! check_kv {
( $db:expr, $key:expr, $val:expr ) => {
assert_eq!($db.get($key).unwrap().unwrap(), $val);
};
( $db:expr, $cf:expr, $key:expr, $val:expr ) => {
assert_eq!($db.get_cf($cf, $key).unwrap().unwrap(), $val);
}
}
#[test]
fn test_open_for_read_only() {
let temp = TempDir::new("_rust_rocksdb_test_open_for_read_only").expect("");
let path = temp.path().to_str().unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let rw = DB::open_default(path).unwrap();
rw.put(b"k1", b"v1").unwrap();
rw.put(b"k2", b"v2").unwrap();
rw.put(b"k3", b"v3").unwrap();
check_kv!(rw, b"k1", b"v1");
check_kv!(rw, b"k2", b"v2");
check_kv!(rw, b"k3", b"v3");
let r1 = DB::open_for_read_only(opts.clone(), path, false).unwrap();
check_kv!(r1, b"k1", b"v1");
check_kv!(r1, b"k2", b"v2");
check_kv!(r1, b"k3", b"v3");
let r2 = DB::open_for_read_only(opts.clone(), path, false).unwrap();
check_kv!(r2, b"k1", b"v1");
check_kv!(r2, b"k2", b"v2");
check_kv!(r2, b"k3", b"v3");
let r3 = DB::open_for_read_only(opts.clone(), path, false).unwrap();
check_kv!(r3, b"k1", b"v1");
check_kv!(r3, b"k2", b"v2");
check_kv!(r3, b"k3", b"v3");
drop(rw);
drop(r1);
drop(r2);
drop(r3);
}
#[test]
fn test_open_cf_for_read_only() {
let temp = TempDir::new("_rust_rocksdb_test_open_cf_for_read_only").expect("");
let path = temp.path().to_str().unwrap();
{
let mut rw = DB::open_default(path).unwrap();
let cf_opts = ColumnFamilyOptions::new();
let _ = rw.create_cf("cf1", cf_opts.clone()).unwrap();
let _ = rw.create_cf("cf2", cf_opts.clone()).unwrap();
}
{
let rw = DB::open_cf(DBOptions::new(), path, vec!["cf1", "cf2"]).unwrap();
let cf1 = rw.cf_handle("cf1").unwrap();
rw.put_cf(cf1, b"cf1_k1", b"cf1_v1").unwrap();
rw.put_cf(cf1, b"cf1_k2", b"cf1_v2").unwrap();
rw.put_cf(cf1, b"cf1_k3", b"cf1_v3").unwrap();
check_kv!(rw, cf1, b"cf1_k1", b"cf1_v1");
check_kv!(rw, cf1, b"cf1_k2", b"cf1_v2");
check_kv!(rw, cf1, b"cf1_k3", b"cf1_v3");
let cf2 = rw.cf_handle("cf2").unwrap();
rw.put_cf(cf2, b"cf2_k1", b"cf2_v1").unwrap();
rw.put_cf(cf2, b"cf2_k2", b"cf2_v2").unwrap();
rw.put_cf(cf2, b"cf2_k3", b"cf2_v3").unwrap();
check_kv!(rw, cf2, b"cf2_k1", b"cf2_v1");
check_kv!(rw, cf2, b"cf2_k2", b"cf2_v2");
check_kv!(rw, cf2, b"cf2_k3", b"cf2_v3");
}
{
let r1 = DB::open_cf_for_read_only(DBOptions::new(), path, vec!["cf1"], false).unwrap();
let cf1 = r1.cf_handle("cf1").unwrap();
check_kv!(r1, cf1, b"cf1_k1", b"cf1_v1");
check_kv!(r1, cf1, b"cf1_k2", b"cf1_v2");
check_kv!(r1, cf1, b"cf1_k3", b"cf1_v3");
let r2 = DB::open_cf_for_read_only(DBOptions::new(), path, vec!["cf2"], false).unwrap();
let cf2 = r2.cf_handle("cf2").unwrap();
check_kv!(r2, cf2, b"cf2_k1", b"cf2_v1");
check_kv!(r2, cf2, b"cf2_k2", b"cf2_v2");
check_kv!(r2, cf2, b"cf2_k3", b"cf2_v3");
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment