Unverified Commit b8839f7e authored by Neil Shen's avatar Neil Shen Committed by GitHub

Support atomic_flush (#289)

Signed-off-by: 's avatarNeil Shen <overvenus@gmail.com>
parent 6028deb5
......@@ -1380,6 +1380,19 @@ void crocksdb_flush_cf(
SaveError(errptr, db->rep->Flush(options->rep, column_family->rep));
}
void crocksdb_flush_cfs(
crocksdb_t* db,
const crocksdb_column_family_handle_t** column_familys,
int num_handles,
const crocksdb_flushoptions_t* options,
char** errptr) {
std::vector<rocksdb::ColumnFamilyHandle*> handles(num_handles);
for (int i = 0; i < num_handles; i++) {
handles[i] = column_familys[i]->rep;
}
SaveError(errptr, db->rep->Flush(options->rep, handles));
}
void crocksdb_flush_wal(
crocksdb_t* db,
unsigned char sync,
......@@ -2985,6 +2998,10 @@ void crocksdb_options_set_vector_memtable_factory(crocksdb_options_t* opt, uint6
opt->rep.memtable_factory.reset(new VectorRepFactory(reserved_bytes));
}
void crocksdb_options_set_atomic_flush(crocksdb_options_t* opt, unsigned char enable) {
opt->rep.atomic_flush = enable;
}
unsigned char crocksdb_load_latest_options(const char* dbpath, crocksdb_env_t* env,
crocksdb_options_t* db_options,
crocksdb_column_family_descriptor*** cf_descs,
......@@ -5564,7 +5581,7 @@ crocksdb_column_family_handle_t* ctitandb_create_column_family(
// Blindly cast db into TitanDB.
TitanDB* titan_db = reinterpret_cast<TitanDB*>(db->rep);
// Copy the ColumnFamilyOptions part of `column_family_options` into `titan_column_family_options`
*((ColumnFamilyOptions*)(&titan_column_family_options->rep)) =
*((ColumnFamilyOptions*)(&titan_column_family_options->rep)) =
column_family_options->rep;
crocksdb_column_family_handle_t* handle = new crocksdb_column_family_handle_t;
SaveError(errptr,
......
......@@ -495,6 +495,10 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_flush_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* column_family,
const crocksdb_flushoptions_t* options, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_flush_cfs(
crocksdb_t* db, const crocksdb_column_family_handle_t** column_familys,
int num_handles, const crocksdb_flushoptions_t* options, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_flush_wal(
crocksdb_t* db, unsigned char sync, char** errptr);
......@@ -1176,6 +1180,8 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_ratelimiter(
crocksdb_options_t* opt, crocksdb_ratelimiter_t* limiter);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_vector_memtable_factory(
crocksdb_options_t* opt, uint64_t reserved_bytes);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_atomic_flush(
crocksdb_options_t* opt, unsigned char enable);
enum {
compaction_by_compensated_size = 0,
......
......@@ -795,6 +795,7 @@ extern "C" {
pub fn crocksdb_options_get_db_path(options: *mut Options, idx: size_t) -> *const c_char;
pub fn crocksdb_options_get_path_target_size(options: *mut Options, idx: size_t) -> u64;
pub fn crocksdb_options_set_vector_memtable_factory(options: *mut Options, reserved_bytes: u64);
pub fn crocksdb_options_set_atomic_flush(option: *mut Options, enable: bool);
pub fn crocksdb_filterpolicy_create_bloom_full(bits_per_key: c_int) -> *mut DBFilterPolicy;
pub fn crocksdb_filterpolicy_create_bloom(bits_per_key: c_int) -> *mut DBFilterPolicy;
pub fn crocksdb_open(
......@@ -1211,6 +1212,13 @@ extern "C" {
options: *const DBFlushOptions,
err: *mut *mut c_char,
);
pub fn crocksdb_flush_cfs(
db: *mut DBInstance,
cfs: *const *mut DBCFHandle,
num_cfs: size_t,
options: *const DBFlushOptions,
err: *mut *mut c_char,
);
pub fn crocksdb_flush_wal(db: *mut DBInstance, sync: bool, err: *mut *mut c_char);
pub fn crocksdb_sync_wal(db: *mut DBInstance, err: *mut *mut c_char);
......
......@@ -1093,27 +1093,48 @@ impl DB {
}
/// Flush all memtable data.
/// If sync, the flush will wait until the flush is done.
pub fn flush(&self, sync: bool) -> Result<(), String> {
/// If wait, the flush will wait until the flush is done.
pub fn flush(&self, wait: bool) -> Result<(), String> {
unsafe {
let mut opts = FlushOptions::new();
opts.set_wait(sync);
opts.set_wait(wait);
ffi_try!(crocksdb_flush(self.inner, opts.inner));
Ok(())
}
}
/// Flush all memtable data for specified cf.
/// If sync, the flush will wait until the flush is done.
pub fn flush_cf(&self, cf: &CFHandle, sync: bool) -> Result<(), String> {
/// If wait, the flush will wait until the flush is done.
pub fn flush_cf(&self, cf: &CFHandle, wait: bool) -> Result<(), String> {
unsafe {
let mut opts = FlushOptions::new();
opts.set_wait(sync);
opts.set_wait(wait);
ffi_try!(crocksdb_flush_cf(self.inner, cf.inner, opts.inner));
Ok(())
}
}
/// Flushes multiple column families.
/// If atomic flush is not enabled, flush_cfs is equivalent to
/// calling flush_cf multiple times.
/// If atomic flush is enabled, flush_cfs will flush all column families
/// specified in `cfs` up to the latest sequence number at the time
/// when flush is requested.
pub fn flush_cfs(&self, cfs: &[&CFHandle], wait: bool) -> Result<(), String> {
unsafe {
let cfs: Vec<*mut _> = cfs.iter().map(|cf| cf.inner).collect();
let mut opts = FlushOptions::new();
opts.set_wait(wait);
ffi_try!(crocksdb_flush_cfs(
self.inner,
cfs.as_ptr(),
cfs.len(),
opts.inner
));
Ok(())
}
}
/// Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL
/// afterwards.
pub fn flush_wal(&self, sync: bool) -> Result<(), String> {
......@@ -3304,6 +3325,48 @@ mod test {
assert!(seq2 > seq1);
}
#[test]
fn test_atomic_flush() {
let path = tempdir_with_prefix("_rust_rocksdb_test_atomic_flush");
let cfs = ["default", "cf1", "cf2", "cf3"];
let mut cfs_opts = vec![];
for _ in 0..cfs.len() {
cfs_opts.push(ColumnFamilyOptions::new());
}
{
let mut opts = DBOptions::new();
opts.create_if_missing(true);
opts.set_atomic_flush(true);
let mut db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let wb = WriteBatch::new();
for (cf, cf_opts) in cfs.iter().zip(cfs_opts.iter().cloned()) {
if *cf != "default" {
db.create_cf((*cf, cf_opts)).unwrap();
}
let handle = db.cf_handle(cf).unwrap();
wb.put_cf(handle, b"k", cf.as_bytes()).unwrap();
}
let mut options = WriteOptions::new();
options.disable_wal(true);
db.write_opt(&wb, &options).unwrap();
let handles: Vec<_> = cfs.iter().map(|name| db.cf_handle(name).unwrap()).collect();
db.flush_cfs(&handles, true).unwrap();
}
let opts = DBOptions::new();
let db = DB::open_cf(
opts,
path.path().to_str().unwrap(),
cfs.iter().map(|cf| *cf).zip(cfs_opts).collect(),
)
.unwrap();
for cf in &cfs {
let handle = db.cf_handle(cf).unwrap();
assert_eq!(db.get_cf(handle, b"k").unwrap().unwrap(), cf.as_bytes());
}
}
#[test]
fn test_map_property() {
let path = tempdir_with_prefix("_rust_rocksdb_get_map_property");
......
......@@ -1092,6 +1092,12 @@ impl DBOptions {
}
}
pub fn set_atomic_flush(&self, enable: bool) {
unsafe {
crocksdb_ffi::crocksdb_options_set_atomic_flush(self.inner, enable);
}
}
pub fn get_db_paths_num(&self) -> usize {
unsafe { crocksdb_ffi::crocksdb_options_get_db_paths_num(self.inner) }
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment