Commit 20edf912 authored by Dylan Wen's avatar Dylan Wen Committed by siddontang

support ingest external file (#77)

parent 1c921cc9
...@@ -35,6 +35,9 @@ pub enum DBWriteBatch {} ...@@ -35,6 +35,9 @@ pub enum DBWriteBatch {}
pub enum DBComparator {} pub enum DBComparator {}
pub enum DBFlushOptions {} pub enum DBFlushOptions {}
pub enum DBCompactionFilter {} pub enum DBCompactionFilter {}
pub enum EnvOptions {}
pub enum SstFileWriter {}
pub enum IngestExternalFileOptions {}
pub enum DBBackupEngine {} pub enum DBBackupEngine {}
pub enum DBRestoreOptions {} pub enum DBRestoreOptions {}
pub enum DBSliceTransform {} pub enum DBSliceTransform {}
...@@ -509,6 +512,54 @@ extern "C" { ...@@ -509,6 +512,54 @@ extern "C" {
ignore_snapshot: bool); ignore_snapshot: bool);
pub fn crocksdb_compactionfilter_destroy(filter: *mut DBCompactionFilter); pub fn crocksdb_compactionfilter_destroy(filter: *mut DBCompactionFilter);
// EnvOptions
pub fn crocksdb_envoptions_create() -> *mut EnvOptions;
pub fn crocksdb_envoptions_destroy(opt: *mut EnvOptions);
// IngestExternalFileOptions
pub fn crocksdb_ingestexternalfileoptions_create() -> *mut IngestExternalFileOptions;
pub fn crocksdb_ingestexternalfileoptions_set_move_files(opt: *mut IngestExternalFileOptions,
move_files: bool);
pub fn crocksdb_ingestexternalfileoptions_set_snapshot_consistency(
opt: *mut IngestExternalFileOptions, snapshot_consistency: bool);
pub fn crocksdb_ingestexternalfileoptions_set_allow_global_seqno(
opt: *mut IngestExternalFileOptions, allow_global_seqno: bool);
pub fn crocksdb_ingestexternalfileoptions_set_allow_blocking_flush(
opt: *mut IngestExternalFileOptions, allow_blocking_flush: bool);
pub fn crocksdb_ingestexternalfileoptions_destroy(opt: *mut IngestExternalFileOptions);
// SstFileWriter
pub fn crocksdb_sstfilewriter_create(env: *mut EnvOptions,
io_options: *const DBOptions)
-> *mut SstFileWriter;
pub fn crocksdb_sstfilewriter_create_with_comparator(env: *mut EnvOptions,
io_options: *const DBOptions,
comparator: *const DBComparator)
-> *mut SstFileWriter;
pub fn crocksdb_sstfilewriter_open(writer: *mut SstFileWriter,
name: *const c_char,
err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_add(writer: *mut SstFileWriter,
key: *const u8,
key_len: size_t,
val: *const u8,
val_len: size_t,
err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_finish(writer: *mut SstFileWriter, err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_destroy(writer: *mut SstFileWriter);
pub fn crocksdb_ingest_external_file(db: *mut DBInstance,
file_list: *const *const c_char,
list_len: size_t,
opt: *const IngestExternalFileOptions,
err: *mut *mut c_char);
pub fn crocksdb_ingest_external_file_cf(db: *mut DBInstance,
handle: *const DBCFHandle,
file_list: *const *const c_char,
list_len: size_t,
opt: *const IngestExternalFileOptions,
err: *mut *mut c_char);
// Restore Option // Restore Option
pub fn crocksdb_restore_options_create() -> *mut DBRestoreOptions; pub fn crocksdb_restore_options_create() -> *mut DBRestoreOptions;
pub fn crocksdb_restore_options_destroy(ropts: *mut DBRestoreOptions); pub fn crocksdb_restore_options_destroy(ropts: *mut DBRestoreOptions);
...@@ -551,8 +602,8 @@ extern "C" { ...@@ -551,8 +602,8 @@ extern "C" {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use libc::{self, c_void}; use libc::{self, c_void};
use std::{ptr, slice, fs};
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::ptr;
use super::*; use super::*;
use tempdir::TempDir; use tempdir::TempDir;
...@@ -635,4 +686,104 @@ mod test { ...@@ -635,4 +686,104 @@ mod test {
assert!(err.is_null()); assert!(err.is_null());
} }
} }
unsafe fn check_get(db: *mut DBInstance,
opt: *const DBReadOptions,
key: &[u8],
val: Option<&[u8]>) {
let mut val_len = 0;
let mut err = ptr::null_mut();
let res_ptr = crocksdb_get(db, opt, key.as_ptr(), key.len(), &mut val_len, &mut err);
assert!(err.is_null());
let res = if res_ptr.is_null() {
None
} else {
Some(slice::from_raw_parts(res_ptr, val_len))
};
assert_eq!(res, val);
if !res_ptr.is_null() {
libc::free(res_ptr as *mut libc::c_void);
}
}
#[test]
fn test_ingest_external_file() {
unsafe {
let opts = crocksdb_options_create();
crocksdb_options_set_create_if_missing(opts, true);
let rustpath = TempDir::new("_rust_rocksdb_internaltest").expect("");
let cpath = CString::new(rustpath.path().to_str().unwrap()).unwrap();
let cpath_ptr = cpath.as_ptr();
let mut err = ptr::null_mut();
let db = crocksdb_open(opts, cpath_ptr, &mut err);
assert!(err.is_null(), error_message(err));
let env_opt = crocksdb_envoptions_create();
let io_options = crocksdb_options_create();
let writer = crocksdb_sstfilewriter_create(env_opt, io_options);
let sst_dir = TempDir::new("_rust_rocksdb_internaltest").expect("");
let sst_path = sst_dir.path().join("sstfilename");
let c_sst_path = CString::new(sst_path.to_str().unwrap()).unwrap();
let c_sst_path_ptr = c_sst_path.as_ptr();
crocksdb_sstfilewriter_open(writer, c_sst_path_ptr, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk1".as_ptr(), 5, b"v1".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk2".as_ptr(), 5, b"v2".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk3".as_ptr(), 5, b"v3".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_finish(writer, &mut err);
assert!(err.is_null(), error_message(err));
let ing_opt = crocksdb_ingestexternalfileoptions_create();
let file_list = &[c_sst_path_ptr];
crocksdb_ingest_external_file(db, file_list.as_ptr(), 1, ing_opt, &mut err);
assert!(err.is_null(), error_message(err));
let roptions = crocksdb_readoptions_create();
check_get(db, roptions, b"sstk1", Some(b"v1"));
check_get(db, roptions, b"sstk2", Some(b"v2"));
check_get(db, roptions, b"sstk3", Some(b"v3"));
let snap = crocksdb_create_snapshot(db);
fs::remove_file(sst_path).unwrap();
crocksdb_sstfilewriter_open(writer, c_sst_path_ptr, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk2".as_ptr(), 5, "v4".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk22".as_ptr(), 6, "v5".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk3".as_ptr(), 5, "v6".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_finish(writer, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_ingest_external_file(db, file_list.as_ptr(), 1, ing_opt, &mut err);
assert!(err.is_null(), error_message(err));
check_get(db, roptions, b"sstk1", Some(b"v1"));
check_get(db, roptions, b"sstk2", Some(b"v4"));
check_get(db, roptions, b"sstk22", Some(b"v5"));
check_get(db, roptions, b"sstk3", Some(b"v6"));
let roptions2 = crocksdb_readoptions_create();
crocksdb_readoptions_set_snapshot(roptions2, snap);
check_get(db, roptions2, b"sstk1", Some(b"v1"));
check_get(db, roptions2, b"sstk2", Some(b"v2"));
check_get(db, roptions2, b"sstk22", None);
check_get(db, roptions2, b"sstk3", Some(b"v3"));
crocksdb_readoptions_destroy(roptions2);
crocksdb_readoptions_destroy(roptions);
crocksdb_release_snapshot(db, snap);
crocksdb_ingestexternalfileoptions_destroy(ing_opt);
crocksdb_sstfilewriter_destroy(writer);
crocksdb_options_destroy(io_options);
crocksdb_envoptions_destroy(env_opt);
}
}
} }
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use librocksdb_sys;
use rocksdb_options::{Options, EnvOptions};
use std::ffi::CString;
/// SstFileWriter is used to create sst files that can be added to database later
/// All keys in files generated by SstFileWriter will have sequence number = 0
pub struct SstFileWriter {
inner: *mut librocksdb_sys::SstFileWriter,
}
impl SstFileWriter {
pub fn new(env_opt: &EnvOptions, opt: &Options) -> SstFileWriter {
unsafe {
SstFileWriter {
inner: librocksdb_sys::crocksdb_sstfilewriter_create(env_opt.inner, opt.inner),
}
}
}
/// Prepare SstFileWriter to write into file located at "file_path".
pub fn open(&mut self, name: &str) -> Result<(), String> {
let path = match CString::new(name.to_owned()) {
Err(e) => return Err(format!("invalid path {}: {:?}", name, e)),
Ok(p) => p,
};
unsafe { Ok(ffi_try!(crocksdb_sstfilewriter_open(self.inner, path.as_ptr()))) }
}
/// Add key, value to currently opened file
/// REQUIRES: key is after any previously added key according to comparator.
pub fn add(&mut self, key: &[u8], val: &[u8]) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_add(self.inner,
key.as_ptr(),
key.len(),
val.as_ptr(),
val.len()));
Ok(())
}
}
/// Finalize writing to sst file and close file.
pub fn finish(&mut self) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_finish(self.inner));
Ok(())
}
}
}
impl Drop for SstFileWriter {
fn drop(&mut self) {
unsafe { librocksdb_sys::crocksdb_sstfilewriter_destroy(self.inner) }
}
}
...@@ -25,13 +25,16 @@ pub mod rocksdb_options; ...@@ -25,13 +25,16 @@ pub mod rocksdb_options;
pub mod merge_operator; pub mod merge_operator;
pub mod comparator; pub mod comparator;
mod compaction_filter; mod compaction_filter;
mod external_file;
mod slice_transform; mod slice_transform;
pub use compaction_filter::CompactionFilter; pub use compaction_filter::CompactionFilter;
pub use external_file::SstFileWriter;
pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType, DBRecoveryMode, new_bloom_filter, pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType, DBRecoveryMode, new_bloom_filter,
self as crocksdb_ffi}; self as crocksdb_ffi};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, CFHandle, Range, pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, CFHandle, Range,
BackupEngine}; BackupEngine};
pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions, WriteOptions, RestoreOptions}; pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions, WriteOptions, RestoreOptions,
IngestExternalFileOptions, EnvOptions};
pub use slice_transform::SliceTransform; pub use slice_transform::SliceTransform;
...@@ -15,15 +15,15 @@ ...@@ -15,15 +15,15 @@
use crocksdb_ffi::{self, DBWriteBatch, DBCFHandle, DBInstance, DBBackupEngine}; use crocksdb_ffi::{self, DBWriteBatch, DBCFHandle, DBInstance, DBBackupEngine};
use libc::{self, c_int, c_void, size_t}; use libc::{self, c_int, c_void, size_t};
use rocksdb_options::{Options, ReadOptions, UnsafeSnap, WriteOptions, FlushOptions, RestoreOptions}; use rocksdb_options::{Options, ReadOptions, UnsafeSnap, WriteOptions, FlushOptions,
RestoreOptions, IngestExternalFileOptions};
use std::{fs, ptr, slice};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::btree_map::Entry; use std::collections::btree_map::Entry;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::fs; use std::fmt::{self, Debug, Formatter};
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::ptr;
use std::slice;
use std::str::from_utf8; use std::str::from_utf8;
const DEFAULT_COLUMN_FAMILY: &'static str = "default"; const DEFAULT_COLUMN_FAMILY: &'static str = "default";
...@@ -40,6 +40,10 @@ impl Drop for CFHandle { ...@@ -40,6 +40,10 @@ impl Drop for CFHandle {
} }
} }
fn build_cstring_list(str_list: &[&str]) -> Vec<CString> {
str_list.into_iter().map(|s| CString::new(s.as_bytes()).unwrap()).collect()
}
pub struct DB { pub struct DB {
inner: *mut DBInstance, inner: *mut DBInstance,
cfs: BTreeMap<String, CFHandle>, cfs: BTreeMap<String, CFHandle>,
...@@ -323,9 +327,7 @@ impl DB { ...@@ -323,9 +327,7 @@ impl DB {
// We need to store our CStrings in an intermediate vector // We need to store our CStrings in an intermediate vector
// so that their pointers remain valid. // so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter() let c_cfs = build_cstring_list(&cfs_v);
.map(|cf| CString::new(cf.as_bytes()).unwrap())
.collect();
let cfnames: Vec<*const _> = c_cfs.iter() let cfnames: Vec<*const _> = c_cfs.iter()
.map(|cf| cf.as_ptr()) .map(|cf| cf.as_ptr())
...@@ -855,6 +857,42 @@ impl DB { ...@@ -855,6 +857,42 @@ impl DB {
self.opts.get_statistics() self.opts.get_statistics()
} }
pub fn get_options(&self) -> &Options {
&self.opts
}
pub fn ingest_external_file(&self,
opt: &IngestExternalFileOptions,
files: &[&str])
-> Result<(), String> {
let c_files = build_cstring_list(files);
let c_files_ptrs: Vec<*const _> = c_files.iter().map(|s| s.as_ptr()).collect();
unsafe {
ffi_try!(crocksdb_ingest_external_file(self.inner,
c_files_ptrs.as_ptr(),
c_files.len(),
opt.inner));
}
Ok(())
}
pub fn ingest_external_file_cf(&self,
cf: &CFHandle,
opt: &IngestExternalFileOptions,
files: &[&str])
-> Result<(), String> {
let c_files = build_cstring_list(files);
let c_files_ptrs: Vec<*const _> = c_files.iter().map(|s| s.as_ptr()).collect();
unsafe {
ffi_try!(crocksdb_ingest_external_file_cf(self.inner,
cf.inner,
c_files_ptrs.as_ptr(),
c_files_ptrs.len(),
opt.inner));
}
Ok(())
}
pub fn backup_at(&self, path: &str) -> Result<BackupEngine, String> { pub fn backup_at(&self, path: &str) -> Result<BackupEngine, String> {
let backup_engine = BackupEngine::open(Options::new(), path).unwrap(); let backup_engine = BackupEngine::open(Options::new(), path).unwrap();
unsafe { unsafe {
...@@ -1067,6 +1105,29 @@ pub struct DBVector { ...@@ -1067,6 +1105,29 @@ pub struct DBVector {
len: usize, len: usize,
} }
impl Debug for DBVector {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
unsafe {
write!(formatter,
"{:?}",
slice::from_raw_parts(self.base, self.len))
}
}
}
impl<'a> PartialEq<&'a [u8]> for DBVector {
fn eq(&self, rhs: &&[u8]) -> bool {
if self.len != rhs.len() {
return false;
}
unsafe {
libc::memcmp(self.base as *mut c_void,
rhs.as_ptr() as *mut c_void,
self.len) == 0
}
}
}
impl Deref for DBVector { impl Deref for DBVector {
type Target = [u8]; type Target = [u8];
fn deref(&self) -> &[u8] { fn deref(&self) -> &[u8] {
......
...@@ -626,6 +626,87 @@ impl Drop for FlushOptions { ...@@ -626,6 +626,87 @@ impl Drop for FlushOptions {
} }
} }
/// IngestExternalFileOptions is used by DB::ingest_external_file
pub struct IngestExternalFileOptions {
pub inner: *mut crocksdb_ffi::IngestExternalFileOptions,
}
impl IngestExternalFileOptions {
pub fn new() -> IngestExternalFileOptions {
unsafe {
IngestExternalFileOptions {
inner: crocksdb_ffi::crocksdb_ingestexternalfileoptions_create(),
}
}
}
/// If set to false, an ingested file keys could appear in existing snapshots
/// that where created before the file was ingested.
pub fn snapshot_consistent(self, whether_consistent: bool) -> IngestExternalFileOptions {
unsafe {
crocksdb_ffi::crocksdb_ingestexternalfileoptions_set_snapshot_consistency(
self.inner, whether_consistent);
}
self
}
/// If set to false, DB::ingest_external_file() will fail if the file key range
/// overlaps with existing keys or tombstones in the DB.
pub fn allow_global_seqno(self, whether_allow: bool) -> IngestExternalFileOptions {
unsafe {
crocksdb_ffi::crocksdb_ingestexternalfileoptions_set_allow_global_seqno(self.inner,
whether_allow);
}
self
}
/// If set to false and the file key range overlaps with the memtable key range
/// (memtable flush required), DB::ingest_external_file will fail.
pub fn allow_blocking_flush(self, whether_allow: bool) -> IngestExternalFileOptions {
unsafe {
crocksdb_ffi::crocksdb_ingestexternalfileoptions_set_allow_blocking_flush(self.inner,
whether_allow);
}
self
}
/// Set to true to move the files instead of copying them.
pub fn move_files(self, whether_move: bool) -> IngestExternalFileOptions {
unsafe {
crocksdb_ffi::crocksdb_ingestexternalfileoptions_set_move_files(self.inner,
whether_move);
}
self
}
}
impl Drop for IngestExternalFileOptions {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_ingestexternalfileoptions_destroy(self.inner);
}
}
}
/// Options while opening a file to read/write
pub struct EnvOptions {
pub inner: *mut crocksdb_ffi::EnvOptions,
}
impl EnvOptions {
pub fn new() -> EnvOptions {
unsafe { EnvOptions { inner: crocksdb_ffi::crocksdb_envoptions_create() } }
}
}
impl Drop for EnvOptions {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_envoptions_destroy(self.inner);
}
}
}
pub struct RestoreOptions { pub struct RestoreOptions {
pub inner: *mut DBRestoreOptions, pub inner: *mut DBRestoreOptions,
} }
......
...@@ -7,4 +7,5 @@ mod test_column_family; ...@@ -7,4 +7,5 @@ mod test_column_family;
mod test_compaction_filter; mod test_compaction_filter;
mod test_compact_range; mod test_compact_range;
mod test_rocksdb_options; mod test_rocksdb_options;
mod test_ingest_external_file;
mod test_slice_transform; mod test_slice_transform;
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use rocksdb::*;
use tempdir::TempDir;
use std::fs;
fn gen_sst(opt: &Options, path: &str, data: &[(&[u8], &[u8])]) {
let _ = fs::remove_file(path);
let env_opt = EnvOptions::new();
let mut writer = SstFileWriter::new(&env_opt, opt);
writer.open(path).unwrap();
for &(k, v) in data {
writer.add(k, v).unwrap();
}
writer.finish().unwrap();
}
#[test]
fn test_ingest_external_file() {
let path = TempDir::new("_rust_rocksdb_ingest_sst").expect("");
let path_str = path.path().to_str().unwrap();
let mut opts = Options::new();
opts.create_if_missing(true);
let mut db = DB::open(opts, path_str).unwrap();
let cf_opts = Options::new();
db.create_cf("cf1", &cf_opts).unwrap();
let handle = db.cf_handle("cf1").unwrap();
let gen_path = TempDir::new("_rust_rocksdb_ingest_sst_gen").expect("");
let test_sstfile = gen_path.path().join("test_sst_file");
let test_sstfile_str = test_sstfile.to_str().unwrap();
gen_sst(db.get_options(), test_sstfile_str, &[(b"k1", b"v1"), (b"k2", b"v2")]);
let mut ingest_opt = IngestExternalFileOptions::new();
db.ingest_external_file(&ingest_opt, &[test_sstfile_str]).unwrap();
assert!(test_sstfile.exists());
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
assert_eq!(db.get(b"k2").unwrap().unwrap(), b"v2");
gen_sst(&cf_opts, test_sstfile_str, &[(b"k1", b"v3"), (b"k2", b"v4")]);
db.ingest_external_file_cf(handle, &ingest_opt, &[test_sstfile_str]).unwrap();
assert_eq!(db.get_cf(handle, b"k1").unwrap().unwrap(), b"v3");
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"v4");
let snap = db.snapshot();
gen_sst(db.get_options(), test_sstfile_str, &[(b"k2", b"v5"), (b"k3", b"v6")]);
ingest_opt = ingest_opt.move_files(true);
db.ingest_external_file_cf(handle, &ingest_opt, &[test_sstfile_str]).unwrap();
assert_eq!(db.get_cf(handle, b"k1").unwrap().unwrap(), b"v3");
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"v5");
assert_eq!(db.get_cf(handle, b"k3").unwrap().unwrap(), b"v6");
assert_eq!(snap.get_cf(handle, b"k1").unwrap().unwrap(), b"v3");
assert_eq!(snap.get_cf(handle, b"k2").unwrap().unwrap(), b"v4");
assert!(snap.get_cf(handle, b"k3").unwrap().is_none());
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment