Commit d65ca9df authored by Huachao Huang, committed by dorianzheng

Provide an optimized file ingestion method (#218) (#270)

parent c4d27e4f
......@@ -3352,6 +3352,41 @@ void crocksdb_ingest_external_file_cf(
SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep));
}
// Ingests the external files named in `file_list` into the column family
// `handle`, trying to avoid stalling writers.
//
// A first ingestion is attempted with `allow_blocking_flush = false`. If that
// attempt is rejected because the files overlap the memtable, the memtable is
// flushed explicitly and the ingestion is retried with the caller's original
// options.
//
// The status of the last ingestion attempt is reported through `errptr`.
// Returns true when the flush-and-retry path was taken, false otherwise.
bool crocksdb_ingest_external_file_optimized(
    crocksdb_t* db, crocksdb_column_family_handle_t* handle,
    const char* const* file_list, const size_t list_len,
    const crocksdb_ingestexternalfileoptions_t* opt, char** errptr) {
  // Copy the C strings into owned std::string paths.
  std::vector<std::string> paths;
  paths.reserve(list_len);
  for (size_t idx = 0; idx < list_len; ++idx) {
    paths.emplace_back(file_list[idx]);
  }

  // An ingestion that overlaps the memtable would otherwise block writes
  // while it waits for a flush, which hurts write latency. Work on a copy of
  // the caller's options with blocking flushes disabled.
  auto non_blocking_opts = opt->rep;
  non_blocking_opts.allow_blocking_flush = false;
  auto status =
      db->rep->IngestExternalFile(handle->rep, paths, non_blocking_opts);

  // With blocking flushes disabled, an overlap with the memtable surfaces as
  // an invalid-argument status. Matching on the message text is fragile, but
  // the unit test guards this exact string.
  const bool needs_flush =
      status.IsInvalidArgument() &&
      status.ToString().find("External file requires flush") !=
          std::string::npos;
  if (!needs_flush) {
    SaveError(errptr, status);
    return false;
  }

  // Flush the memtable ourselves so writers are not blocked. The flush status
  // is intentionally ignored: the retry below uses the caller's original
  // options and therefore falls back to a blocking ingestion anyway.
  FlushOptions flush_opts;
  flush_opts.wait = true;
  db->rep->Flush(flush_opts, handle->rep);

  status = db->rep->IngestExternalFile(handle->rep, paths, opt->rep);
  SaveError(errptr, status);
  return true;
}
crocksdb_slicetransform_t* crocksdb_slicetransform_create(
void* state,
void (*destructor)(void*),
......
......@@ -1375,6 +1375,10 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_ingest_external_file_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const crocksdb_ingestexternalfileoptions_t* opt, char** errptr);
/* Ingests the `list_len` external files named in `file_list` into the column
   family `handle`. A non-blocking ingestion is tried first; if it is rejected
   because a flush is required, the memtable is flushed and the ingestion is
   retried with the options in `opt`. The final status is reported through
   `errptr`. Returns true if the flush-and-retry path was taken. */
extern C_ROCKSDB_LIBRARY_API bool crocksdb_ingest_external_file_optimized(
crocksdb_t* db, crocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const crocksdb_ingestexternalfileoptions_t* opt, char** errptr);
/* SliceTransform */
......
......@@ -1295,6 +1295,14 @@ extern "C" {
opt: *const IngestExternalFileOptions,
err: *mut *mut c_char,
);
/// Binding for the C function `crocksdb_ingest_external_file_optimized`.
///
/// `file_list` points to `list_len` C-string file paths. Failures are
/// reported through `err`; the returned flag is true when the C side had
/// to flush the memtable and retry the ingestion.
pub fn crocksdb_ingest_external_file_optimized(
db: *mut DBInstance,
handle: *const DBCFHandle,
file_list: *const *const c_char,
list_len: size_t,
opt: *const IngestExternalFileOptions,
err: *mut *mut c_char,
) -> bool;
// Restore Option
pub fn crocksdb_restore_options_create() -> *mut DBRestoreOptions;
......
......@@ -1297,6 +1297,30 @@ impl DB {
Ok(())
}
/// Ingests external SST `files` into column family `cf`, preferring a
/// path that does not block writes.
///
/// This is an optimized counterpart of `ingest_external_file_cf`: a
/// non-blocking ingestion is attempted first, and a blocking ingestion
/// is used as the fallback when that optimization fails.
///
/// Returns `Ok(true)` if a memtable had to be flushed (without
/// blocking) along the way, `Ok(false)` otherwise.
///
/// # Errors
///
/// Returns the error message reported by the underlying library when
/// the ingestion fails.
pub fn ingest_external_file_optimized(
    &self,
    cf: &CFHandle,
    opt: &IngestExternalFileOptions,
    files: &[&str],
) -> Result<bool, String> {
    // `owned_paths` must outlive the FFI call: `raw_paths` only borrows
    // the buffers it owns.
    let owned_paths = build_cstring_list(files);
    let mut raw_paths: Vec<*const _> = Vec::with_capacity(owned_paths.len());
    for path in owned_paths.iter() {
        raw_paths.push(path.as_ptr());
    }

    // SAFETY: the pointer array and the strings it references stay alive
    // for the duration of the call. NOTE(review): assumes `self.inner`,
    // `cf.inner` and `opt.inner` are valid handles - confirm against the
    // owning types.
    unsafe {
        Ok(ffi_try!(crocksdb_ingest_external_file_optimized(
            self.inner,
            cf.inner,
            raw_paths.as_ptr(),
            raw_paths.len(),
            opt.inner
        )))
    }
}
pub fn backup_at(&self, path: &str) -> Result<BackupEngine, String> {
let backup_engine = BackupEngine::open(DBOptions::new(), path).unwrap();
unsafe {
......
......@@ -868,7 +868,7 @@ impl DBOptions {
Err(_) => {
return Err(
"Failed to convert path to CString when creating rocksdb info log".to_owned(),
)
);
}
};
......
......@@ -468,3 +468,40 @@ fn test_set_external_sst_file_global_seq_no() {
.unwrap();
check_kv(&db, None, &[(b"k1", Some(b"v1")), (b"k2", Some(b"v2"))]);
}
#[test]
fn test_ingest_external_file_optimized() {
    // Covers both outcomes of `ingest_external_file_optimized`: an
    // ingestion that needs no flush, and one that overlaps the memtable
    // and therefore reports a flush.
    let db_dir = TempDir::new("_rust_rocksdb_ingest_sst_optimized").expect("");
    let db = create_default_database(&db_dir);

    let sst_dir = TempDir::new("_rust_rocksdb_ingest_sst_gen_new_cf").expect("");
    let sst_path = sst_dir.path().join("test_sst_file_optimized");
    let sst_path_str = sst_path.to_str().unwrap();

    let default_cf = db.cf_handle("default").unwrap();
    let opts = IngestExternalFileOptions::new();

    // Per the assertions below, the generated file holds k1..k3.
    gen_sst_put(ColumnFamilyOptions::new(), None, sst_path_str);

    // Case 1: the memtable only holds `k0`, which the file does not
    // overlap, so no flush is expected.
    db.put_cf(default_cf, b"k0", b"k0").unwrap();
    assert!(!db
        .ingest_external_file_optimized(default_cf, &opts, &[sst_path_str])
        .unwrap());
    assert!(sst_path.exists());
    assert_eq!(db.get_cf(default_cf, b"k1").unwrap().unwrap(), b"a");
    assert_eq!(db.get_cf(default_cf, b"k2").unwrap().unwrap(), b"b");
    assert_eq!(db.get_cf(default_cf, b"k3").unwrap().unwrap(), b"c");

    // Case 2: writing `k1` makes the memtable overlap the file, so the
    // ingestion must report a flush, and the ingested values win.
    db.put_cf(default_cf, b"k1", b"k1").unwrap();
    assert!(db
        .ingest_external_file_optimized(default_cf, &opts, &[sst_path_str])
        .unwrap());
    assert!(sst_path.exists());
    assert_eq!(db.get_cf(default_cf, b"k1").unwrap().unwrap(), b"a");
    assert_eq!(db.get_cf(default_cf, b"k2").unwrap().unwrap(), b"b");
    assert_eq!(db.get_cf(default_cf, b"k3").unwrap().unwrap(), b"c");
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment