Unverified Commit 9e472e35 authored by Huachao Huang's avatar Huachao Huang Committed by GitHub

Provide an optimized file ingestion method (#218)

parent 4e4edb7f
...@@ -3203,6 +3203,11 @@ void crocksdb_flushoptions_set_wait( ...@@ -3203,6 +3203,11 @@ void crocksdb_flushoptions_set_wait(
opt->rep.wait = v; opt->rep.wait = v;
} }
void crocksdb_flushoptions_set_allow_write_stall(
crocksdb_flushoptions_t* opt, unsigned char v) {
opt->rep.allow_write_stall = v;
}
crocksdb_cache_t* crocksdb_cache_create_lru(size_t capacity, crocksdb_cache_t* crocksdb_cache_create_lru(size_t capacity,
int num_shard_bits, unsigned char strict_capacity_limit, double high_pri_pool_ratio) { int num_shard_bits, unsigned char strict_capacity_limit, double high_pri_pool_ratio) {
crocksdb_cache_t* c = new crocksdb_cache_t; crocksdb_cache_t* c = new crocksdb_cache_t;
...@@ -3455,6 +3460,44 @@ void crocksdb_ingest_external_file_cf( ...@@ -3455,6 +3460,44 @@ void crocksdb_ingest_external_file_cf(
SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep)); SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep));
} }
bool crocksdb_ingest_external_file_optimized(
crocksdb_t* db, crocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const crocksdb_ingestexternalfileoptions_t* opt, char** errptr) {
std::vector<std::string> files(list_len);
for (size_t i = 0; i < list_len; ++i) {
files[i] = std::string(file_list[i]);
}
bool has_flush = false;
// If the file being ingested is overlapped with the memtable, it
// will block writes and wait for flushing, which can cause high
// write latency. So we set `allow_blocking_flush = false`.
auto ingest_opts = opt->rep;
ingest_opts.allow_blocking_flush = false;
auto s = db->rep->IngestExternalFile(handle->rep, files, ingest_opts);
if (s.IsInvalidArgument() &&
s.ToString().find("External file requires flush") != std::string::npos) {
// When `allow_blocking_flush = false` and the file being ingested
// is overlapped with the memtable, `IngestExternalFile` returns
// an invalid argument error. It is tricky to search for the
// specific error message here but don't worry, the unit test
// ensures that we get this right. Then we can try to flush the
// memtable outside without blocking writes. We also set
// `allow_write_stall = false` to prevent the flush from
// triggering write stall.
has_flush = true;
FlushOptions flush_opts;
flush_opts.wait = true;
flush_opts.allow_write_stall = false;
// We don't check the status of this flush because we will
// fallback to a blocking ingestion anyway.
db->rep->Flush(flush_opts, handle->rep);
s = db->rep->IngestExternalFile(handle->rep, files, opt->rep);
}
SaveError(errptr, s);
return has_flush;
}
crocksdb_slicetransform_t* crocksdb_slicetransform_create( crocksdb_slicetransform_t* crocksdb_slicetransform_create(
void* state, void* state,
void (*destructor)(void*), void (*destructor)(void*),
......
...@@ -1292,6 +1292,8 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_flushoptions_destroy( ...@@ -1292,6 +1292,8 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_flushoptions_destroy(
crocksdb_flushoptions_t*); crocksdb_flushoptions_t*);
extern C_ROCKSDB_LIBRARY_API void crocksdb_flushoptions_set_wait( extern C_ROCKSDB_LIBRARY_API void crocksdb_flushoptions_set_wait(
crocksdb_flushoptions_t*, unsigned char); crocksdb_flushoptions_t*, unsigned char);
extern C_ROCKSDB_LIBRARY_API void crocksdb_flushoptions_set_allow_write_stall(
crocksdb_flushoptions_t*, unsigned char);
/* Cache */ /* Cache */
...@@ -1409,6 +1411,10 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_ingest_external_file_cf( ...@@ -1409,6 +1411,10 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_ingest_external_file_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* handle, crocksdb_t* db, crocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len, const char* const* file_list, const size_t list_len,
const crocksdb_ingestexternalfileoptions_t* opt, char** errptr); const crocksdb_ingestexternalfileoptions_t* opt, char** errptr);
extern C_ROCKSDB_LIBRARY_API bool crocksdb_ingest_external_file_optimized(
crocksdb_t* db, crocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const crocksdb_ingestexternalfileoptions_t* opt, char** errptr);
/* SliceTransform */ /* SliceTransform */
......
...@@ -913,6 +913,7 @@ extern "C" { ...@@ -913,6 +913,7 @@ extern "C" {
pub fn crocksdb_flushoptions_create() -> *mut DBFlushOptions; pub fn crocksdb_flushoptions_create() -> *mut DBFlushOptions;
pub fn crocksdb_flushoptions_destroy(opt: *mut DBFlushOptions); pub fn crocksdb_flushoptions_destroy(opt: *mut DBFlushOptions);
pub fn crocksdb_flushoptions_set_wait(opt: *mut DBFlushOptions, whether_wait: bool); pub fn crocksdb_flushoptions_set_wait(opt: *mut DBFlushOptions, whether_wait: bool);
pub fn crocksdb_flushoptions_set_allow_write_stall(opt: *mut DBFlushOptions, allow: bool);
pub fn crocksdb_flush( pub fn crocksdb_flush(
db: *mut DBInstance, db: *mut DBInstance,
...@@ -1217,6 +1218,14 @@ extern "C" { ...@@ -1217,6 +1218,14 @@ extern "C" {
opt: *const IngestExternalFileOptions, opt: *const IngestExternalFileOptions,
err: *mut *mut c_char, err: *mut *mut c_char,
); );
pub fn crocksdb_ingest_external_file_optimized(
db: *mut DBInstance,
handle: *const DBCFHandle,
file_list: *const *const c_char,
list_len: size_t,
opt: *const IngestExternalFileOptions,
err: *mut *mut c_char,
) -> bool;
// Restore Option // Restore Option
pub fn crocksdb_restore_options_create() -> *mut DBRestoreOptions; pub fn crocksdb_restore_options_create() -> *mut DBRestoreOptions;
......
...@@ -1355,6 +1355,30 @@ impl DB { ...@@ -1355,6 +1355,30 @@ impl DB {
Ok(()) Ok(())
} }
/// An optimized version of `ingest_external_file_cf`. It will
/// first try to ingest files without blocking and fallback to a
/// blocking ingestion if the optimization fails.
/// Returns true if a memtable is flushed without blocking.
pub fn ingest_external_file_optimized(
&self,
cf: &CFHandle,
opt: &IngestExternalFileOptions,
files: &[&str],
) -> Result<bool, String> {
let c_files = build_cstring_list(files);
let c_files_ptrs: Vec<*const _> = c_files.iter().map(|s| s.as_ptr()).collect();
let has_flush = unsafe {
ffi_try!(crocksdb_ingest_external_file_optimized(
self.inner,
cf.inner,
c_files_ptrs.as_ptr(),
c_files_ptrs.len(),
opt.inner
))
};
Ok(has_flush)
}
pub fn backup_at(&self, path: &str) -> Result<BackupEngine, String> { pub fn backup_at(&self, path: &str) -> Result<BackupEngine, String> {
let backup_engine = BackupEngine::open(DBOptions::new(), path).unwrap(); let backup_engine = BackupEngine::open(DBOptions::new(), path).unwrap();
unsafe { unsafe {
......
...@@ -1490,6 +1490,12 @@ impl FlushOptions { ...@@ -1490,6 +1490,12 @@ impl FlushOptions {
crocksdb_ffi::crocksdb_flushoptions_set_wait(self.inner, wait); crocksdb_ffi::crocksdb_flushoptions_set_wait(self.inner, wait);
} }
} }
pub fn set_allow_write_stall(&mut self, allow: bool) {
unsafe {
crocksdb_ffi::crocksdb_flushoptions_set_allow_write_stall(self.inner, allow);
}
}
} }
impl Drop for FlushOptions { impl Drop for FlushOptions {
......
...@@ -464,3 +464,40 @@ fn test_set_external_sst_file_global_seq_no() { ...@@ -464,3 +464,40 @@ fn test_set_external_sst_file_global_seq_no() {
.unwrap(); .unwrap();
check_kv(&db, None, &[(b"k1", Some(b"v1")), (b"k2", Some(b"v2"))]); check_kv(&db, None, &[(b"k1", Some(b"v1")), (b"k2", Some(b"v2"))]);
} }
#[test]
fn test_ingest_external_file_optimized() {
let path = TempDir::new("_rust_rocksdb_ingest_sst_optimized").expect("");
let db = create_default_database(&path);
let gen_path = TempDir::new("_rust_rocksdb_ingest_sst_gen_new_cf").expect("");
let test_sstfile = gen_path.path().join("test_sst_file_optimized");
let test_sstfile_str = test_sstfile.to_str().unwrap();
let handle = db.cf_handle("default").unwrap();
let ingest_opt = IngestExternalFileOptions::new();
gen_sst_put(ColumnFamilyOptions::new(), None, test_sstfile_str);
db.put_cf(handle, b"k0", b"k0").unwrap();
// No overlap with the memtable.
let has_flush = db
.ingest_external_file_optimized(handle, &ingest_opt, &[test_sstfile_str])
.unwrap();
assert!(!has_flush);
assert!(test_sstfile.exists());
assert_eq!(db.get_cf(handle, b"k1").unwrap().unwrap(), b"a");
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert_eq!(db.get_cf(handle, b"k3").unwrap().unwrap(), b"c");
db.put_cf(handle, b"k1", b"k1").unwrap();
// Overlap with the memtable.
let has_flush = db
.ingest_external_file_optimized(handle, &ingest_opt, &[test_sstfile_str])
.unwrap();
assert!(has_flush);
assert!(test_sstfile.exists());
assert_eq!(db.get_cf(handle, b"k1").unwrap().unwrap(), b"a");
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert_eq!(db.get_cf(handle, b"k3").unwrap().unwrap(), b"c");
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment