Commit 518aa2a2 authored by Wallace's avatar Wallace Committed by Yi Wu

add multi batch write interface (#436)

* add multi batch write interface
parent 48bf4dc6
......@@ -960,6 +960,20 @@ void crocksdb_write(
SaveError(errptr, db->rep->Write(options->rep, &batch->rep));
}
void crocksdb_write_multi_batch(
crocksdb_t* db,
const crocksdb_writeoptions_t* options,
crocksdb_writebatch_t** batches,
size_t batch_size,
char** errptr) {
thread_local std::vector<WriteBatch*> ws;
ws.clear();
for (size_t i = 0; i < batch_size; i ++) {
ws.push_back(&batches[i]->rep);
}
SaveError(errptr, db->rep->MultiThreadWrite(options->rep, ws));
}
char* crocksdb_get(
crocksdb_t* db,
const crocksdb_readoptions_t* options,
......@@ -2601,6 +2615,11 @@ void crocksdb_options_set_enable_pipelined_write(crocksdb_options_t *opt,
opt->rep.enable_pipelined_write = v;
}
void crocksdb_options_set_enable_multi_batch_write(crocksdb_options_t *opt,
unsigned char v) {
opt->rep.enable_multi_thread_write = v;
}
void crocksdb_options_set_unordered_write(crocksdb_options_t* opt,
unsigned char v) {
opt->rep.unordered_write = v;
......
......@@ -342,6 +342,10 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_write(
crocksdb_t* db, const crocksdb_writeoptions_t* options,
crocksdb_writebatch_t* batch, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_write_multi_batch(
crocksdb_t* db, const crocksdb_writeoptions_t* options,
crocksdb_writebatch_t** batches, size_t batch_size, char** errptr);
/* Returns NULL if not found. A malloc()ed array otherwise.
Stores the length of the array in *vallen. */
extern C_ROCKSDB_LIBRARY_API char* crocksdb_get(
......@@ -1051,6 +1055,9 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_bytes_per_sync(
extern C_ROCKSDB_LIBRARY_API void
crocksdb_options_set_enable_pipelined_write(crocksdb_options_t *, unsigned char);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_options_set_enable_multi_batch_write(crocksdb_options_t *opt,
unsigned char v);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_options_set_unordered_write(crocksdb_options_t*, unsigned char);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_options_set_allow_concurrent_memtable_write(crocksdb_options_t *,
......
......@@ -560,6 +560,7 @@ extern "C" {
pub fn crocksdb_options_set_use_fsync(options: *mut Options, v: c_int);
pub fn crocksdb_options_set_bytes_per_sync(options: *mut Options, bytes: u64);
pub fn crocksdb_options_set_enable_pipelined_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_enable_multi_batch_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_unordered_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_allow_concurrent_memtable_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_manual_wal_flush(options: *mut Options, v: bool);
......@@ -990,6 +991,13 @@ extern "C" {
batch: *mut DBWriteBatch,
err: *mut *mut c_char,
);
pub fn crocksdb_write_multi_batch(
db: *mut DBInstance,
writeopts: *const DBWriteOptions,
batch: *const *mut DBWriteBatch,
batchlen: size_t,
err: *mut *mut c_char,
);
pub fn crocksdb_writebatch_create() -> *mut DBWriteBatch;
pub fn crocksdb_writebatch_create_with_capacity(cap: size_t) -> *mut DBWriteBatch;
pub fn crocksdb_writebatch_create_from(rep: *const u8, size: size_t) -> *mut DBWriteBatch;
......
......@@ -766,6 +766,29 @@ impl DB {
Ok(())
}
pub fn multi_batch_write(
&self,
batches: &[WriteBatch],
writeopts: &WriteOptions,
) -> Result<(), String> {
unsafe {
let b: Vec<*mut DBWriteBatch> = batches
.iter()
.filter(|w| w.count() > 0)
.map(|w| w.inner)
.collect();
if !b.is_empty() {
ffi_try!(crocksdb_write_multi_batch(
self.inner,
writeopts.inner,
b.as_ptr(),
b.len()
));
}
}
Ok(())
}
pub fn write(&self, batch: &WriteBatch) -> Result<(), String> {
self.write_opt(batch, &WriteOptions::new())
}
......@@ -3263,6 +3286,29 @@ mod test {
assert!(mp.is_some());
}
#[test]
fn test_multi_batch_write() {
let mut opts = DBOptions::new();
opts.create_if_missing(true);
opts.enable_multi_batch_write(true);
let path = tempdir_with_prefix("_rust_rocksdb_multi_batch");
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let cf = db.cf_handle("default").unwrap();
let mut data = Vec::new();
for s in &[b"ab", b"cd", b"ef"] {
let w = WriteBatch::new();
w.put_cf(cf, s.to_vec().as_slice(), b"a").unwrap();
data.push(w);
}
db.multi_batch_write(&data, &WriteOptions::new()).unwrap();
for s in &[b"ab", b"cd", b"ef"] {
let v = db.get_cf(cf, s.to_vec().as_slice()).unwrap();
assert!(v.is_some());
assert_eq!(v.unwrap().to_utf8().unwrap(), "a");
}
}
#[test]
fn test_get_db_path_from_option() {
let mut opts = DBOptions::new();
......
......@@ -1028,6 +1028,12 @@ impl DBOptions {
}
}
pub fn enable_multi_batch_write(&self, v: bool) {
unsafe {
crocksdb_ffi::crocksdb_options_set_enable_multi_batch_write(self.inner, v);
}
}
pub fn enable_unordered_write(&self, v: bool) {
unsafe {
crocksdb_ffi::crocksdb_options_set_unordered_write(self.inner, v);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment