Commit 49238c5d authored by Shuai Li's avatar Shuai Li Committed by GitHub

add rocksdb options manual_wal_flush support (#123)

parent a2da29b1
...@@ -1109,6 +1109,13 @@ void crocksdb_flush_cf( ...@@ -1109,6 +1109,13 @@ void crocksdb_flush_cf(
SaveError(errptr, db->rep->Flush(options->rep, column_family->rep)); SaveError(errptr, db->rep->Flush(options->rep, column_family->rep));
} }
void crocksdb_flush_wal(
crocksdb_t* db,
unsigned char sync,
char** errptr) {
SaveError(errptr, db->rep->FlushWAL(sync));
}
void crocksdb_sync_wal( void crocksdb_sync_wal(
crocksdb_t* db, crocksdb_t* db,
char** errptr) { char** errptr) {
...@@ -2133,6 +2140,10 @@ void crocksdb_options_set_allow_concurrent_memtable_write(crocksdb_options_t* op ...@@ -2133,6 +2140,10 @@ void crocksdb_options_set_allow_concurrent_memtable_write(crocksdb_options_t* op
opt->rep.allow_concurrent_memtable_write = v; opt->rep.allow_concurrent_memtable_write = v;
} }
void crocksdb_options_set_manual_wal_flush(crocksdb_options_t* opt, unsigned char v) {
opt->rep.manual_wal_flush = v;
}
void crocksdb_options_set_enable_write_thread_adaptive_yield( void crocksdb_options_set_enable_write_thread_adaptive_yield(
crocksdb_options_t* opt, unsigned char v) { crocksdb_options_t* opt, unsigned char v) {
opt->rep.enable_write_thread_adaptive_yield = v; opt->rep.enable_write_thread_adaptive_yield = v;
......
...@@ -415,6 +415,9 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_flush_cf( ...@@ -415,6 +415,9 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_flush_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* column_family, crocksdb_t* db, crocksdb_column_family_handle_t* column_family,
const crocksdb_flushoptions_t* options, char** errptr); const crocksdb_flushoptions_t* options, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_flush_wal(
crocksdb_t* db, unsigned char sync, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_sync_wal( extern C_ROCKSDB_LIBRARY_API void crocksdb_sync_wal(
crocksdb_t* db, char** errptr); crocksdb_t* db, char** errptr);
...@@ -880,6 +883,8 @@ crocksdb_options_set_enable_pipelined_write(crocksdb_options_t *, ...@@ -880,6 +883,8 @@ crocksdb_options_set_enable_pipelined_write(crocksdb_options_t *,
extern C_ROCKSDB_LIBRARY_API void extern C_ROCKSDB_LIBRARY_API void
crocksdb_options_set_allow_concurrent_memtable_write(crocksdb_options_t *, crocksdb_options_set_allow_concurrent_memtable_write(crocksdb_options_t *,
unsigned char); unsigned char);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_manual_wal_flush(
crocksdb_options_t *, unsigned char);
extern C_ROCKSDB_LIBRARY_API void extern C_ROCKSDB_LIBRARY_API void
crocksdb_options_set_enable_write_thread_adaptive_yield(crocksdb_options_t*, crocksdb_options_set_enable_write_thread_adaptive_yield(crocksdb_options_t*,
unsigned char); unsigned char);
......
...@@ -357,6 +357,7 @@ extern "C" { ...@@ -357,6 +357,7 @@ extern "C" {
pub fn crocksdb_options_set_bytes_per_sync(options: *mut Options, bytes: u64); pub fn crocksdb_options_set_bytes_per_sync(options: *mut Options, bytes: u64);
pub fn crocksdb_options_set_enable_pipelined_write(options: *mut Options, v: bool); pub fn crocksdb_options_set_enable_pipelined_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_allow_concurrent_memtable_write(options: *mut Options, v: bool); pub fn crocksdb_options_set_allow_concurrent_memtable_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_manual_wal_flush(options: *mut Options, v: bool);
pub fn crocksdb_options_optimize_for_point_lookup( pub fn crocksdb_options_optimize_for_point_lookup(
options: *mut Options, options: *mut Options,
block_cache_size_mb: u64, block_cache_size_mb: u64,
...@@ -848,6 +849,7 @@ extern "C" { ...@@ -848,6 +849,7 @@ extern "C" {
options: *const DBFlushOptions, options: *const DBFlushOptions,
err: *mut *mut c_char, err: *mut *mut c_char,
); );
pub fn crocksdb_flush_wal(db: *mut DBInstance, sync: bool, err: *mut *mut c_char);
pub fn crocksdb_sync_wal(db: *mut DBInstance, err: *mut *mut c_char); pub fn crocksdb_sync_wal(db: *mut DBInstance, err: *mut *mut c_char);
pub fn crocksdb_approximate_sizes( pub fn crocksdb_approximate_sizes(
......
...@@ -810,6 +810,15 @@ impl DB { ...@@ -810,6 +810,15 @@ impl DB {
} }
} }
/// Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL
/// afterwards.
pub fn flush_wal(&self, sync: bool) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_flush_wal(self.inner, sync));
Ok(())
}
}
/// Sync the wal. Note that Write() followed by SyncWAL() is not exactly the /// Sync the wal. Note that Write() followed by SyncWAL() is not exactly the
/// same as Write() with sync=true: in the latter case the changes won't be /// same as Write() with sync=true: in the latter case the changes won't be
/// visible until the sync is done. /// visible until the sync is done.
......
...@@ -724,6 +724,12 @@ impl DBOptions { ...@@ -724,6 +724,12 @@ impl DBOptions {
} }
} }
pub fn manual_wal_flush(&self, v: bool) {
unsafe {
crocksdb_ffi::crocksdb_options_set_manual_wal_flush(self.inner, v);
}
}
/// the second parameter is a slice which contains tuples (path, target_size). /// the second parameter is a slice which contains tuples (path, target_size).
pub fn set_db_paths<T: AsRef<Path>>(&self, val: &[(T, u64)]) { pub fn set_db_paths<T: AsRef<Path>>(&self, val: &[(T, u64)]) {
let num_paths = val.len(); let num_paths = val.len();
......
...@@ -169,6 +169,19 @@ fn test_set_wal_opt() { ...@@ -169,6 +169,19 @@ fn test_set_wal_opt() {
drop(db); drop(db);
} }
#[cfg(not(windows))]
#[test]
fn test_flush_wal() {
let path = TempDir::new("_rust_rocksdb_test_flush_wal").expect("");
let mut opts = DBOptions::new();
opts.create_if_missing(true);
opts.manual_wal_flush(true);
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
db.put(b"key", b"value").unwrap();
db.flush_wal(false).unwrap();
drop(db);
}
#[cfg(not(windows))] #[cfg(not(windows))]
#[test] #[test]
fn test_sync_wal() { fn test_sync_wal() {
...@@ -411,6 +424,18 @@ fn test_allow_concurrent_memtable_write() { ...@@ -411,6 +424,18 @@ fn test_allow_concurrent_memtable_write() {
} }
} }
#[test]
fn test_manual_wal_flush() {
let path = TempDir::new("_rust_rocksdb_manual_wal_flush").expect("");
let mut opts = DBOptions::new();
opts.create_if_missing(true);
opts.manual_wal_flush(true);
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
for i in 0..200 {
db.put(format!("k_{}", i).as_bytes(), b"v").unwrap();
}
}
#[test] #[test]
fn test_enable_pipelined_write() { fn test_enable_pipelined_write() {
let path = TempDir::new("_rust_rocksdb_enable_pipelined_write").expect(""); let path = TempDir::new("_rust_rocksdb_enable_pipelined_write").expect("");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment