Commit 1cba2117 authored by follitude's avatar follitude Committed by zhangjinpeng1987

support new DB operations (#108)

parent 19c24b9b
......@@ -2800,8 +2800,7 @@ void crocksdb_envoptions_destroy(crocksdb_envoptions_t* opt) { delete opt; }
crocksdb_sstfilewriter_t* crocksdb_sstfilewriter_create(
const crocksdb_envoptions_t* env, const crocksdb_options_t* io_options) {
crocksdb_sstfilewriter_t* writer = new crocksdb_sstfilewriter_t;
writer->rep =
new SstFileWriter(env->rep, io_options->rep, io_options->rep.comparator);
writer->rep = new SstFileWriter(env->rep, io_options->rep);
return writer;
}
......@@ -2810,17 +2809,7 @@ crocksdb_sstfilewriter_t* crocksdb_sstfilewriter_create_cf(
crocksdb_column_family_handle_t* column_family) {
crocksdb_sstfilewriter_t* writer = new crocksdb_sstfilewriter_t;
writer->rep =
new SstFileWriter(env->rep, io_options->rep, io_options->rep.comparator, column_family->rep);
return writer;
}
crocksdb_sstfilewriter_t* crocksdb_sstfilewriter_create_with_comparator(
const crocksdb_envoptions_t* env, const crocksdb_options_t* io_options,
const crocksdb_comparator_t* comparator,
crocksdb_column_family_handle_t* column_family) {
crocksdb_sstfilewriter_t* writer = new crocksdb_sstfilewriter_t;
writer->rep =
new SstFileWriter(env->rep, io_options->rep, comparator, column_family->rep);
new SstFileWriter(env->rep, io_options->rep, column_family->rep);
return writer;
}
......@@ -2829,12 +2818,31 @@ void crocksdb_sstfilewriter_open(crocksdb_sstfilewriter_t* writer,
SaveError(errptr, writer->rep->Open(std::string(name)));
}
void crocksdb_sstfilewriter_add(crocksdb_sstfilewriter_t* writer, const char* key,
size_t keylen, const char* val, size_t vallen,
char** errptr) {
void crocksdb_sstfilewriter_add(crocksdb_sstfilewriter_t *writer,
const char *key, size_t keylen, const char *val,
size_t vallen, char **errptr) {
SaveError(errptr, writer->rep->Add(Slice(key, keylen), Slice(val, vallen)));
}
void crocksdb_sstfilewriter_put(crocksdb_sstfilewriter_t *writer,
const char *key, size_t keylen, const char *val,
size_t vallen, char **errptr) {
SaveError(errptr, writer->rep->Put(Slice(key, keylen), Slice(val, vallen)));
}
void crocksdb_sstfilewriter_merge(crocksdb_sstfilewriter_t *writer,
const char *key, size_t keylen,
const char *val, size_t vallen,
char **errptr) {
SaveError(errptr, writer->rep->Merge(Slice(key, keylen), Slice(val, vallen)));
}
void crocksdb_sstfilewriter_delete(crocksdb_sstfilewriter_t *writer,
const char *key, size_t keylen,
char **errptr) {
SaveError(errptr, writer->rep->Delete(Slice(key, keylen)));
}
void crocksdb_sstfilewriter_finish(crocksdb_sstfilewriter_t* writer,
char** errptr) {
SaveError(errptr, writer->rep->Finish(NULL));
......
......@@ -1175,16 +1175,23 @@ extern C_ROCKSDB_LIBRARY_API crocksdb_sstfilewriter_t*
crocksdb_sstfilewriter_create_cf(const crocksdb_envoptions_t* env,
const crocksdb_options_t* io_options,
crocksdb_column_family_handle_t* column_family);
extern C_ROCKSDB_LIBRARY_API crocksdb_sstfilewriter_t*
crocksdb_sstfilewriter_create_with_comparator(
const crocksdb_envoptions_t* env, const crocksdb_options_t* io_options,
const crocksdb_comparator_t* comparator,
crocksdb_column_family_handle_t* column_family);
extern C_ROCKSDB_LIBRARY_API void crocksdb_sstfilewriter_open(
crocksdb_sstfilewriter_t* writer, const char* name, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_sstfilewriter_add(
crocksdb_sstfilewriter_t* writer, const char* key, size_t keylen,
const char* val, size_t vallen, char** errptr);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilewriter_add(crocksdb_sstfilewriter_t *writer, const char *key,
size_t keylen, const char *val, size_t vallen,
char **errptr);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilewriter_put(crocksdb_sstfilewriter_t *writer, const char *key,
size_t keylen, const char *val, size_t vallen,
char **errptr);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilewriter_merge(crocksdb_sstfilewriter_t *writer, const char *key,
size_t keylen, const char *val, size_t vallen,
char **errptr);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilewriter_delete(crocksdb_sstfilewriter_t *writer, const char *key,
size_t keylen, char **errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_sstfilewriter_finish(
crocksdb_sstfilewriter_t* writer, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_sstfilewriter_destroy(
......
......@@ -798,11 +798,6 @@ extern "C" {
io_options: *const Options,
cf: *mut DBCFHandle)
-> *mut SstFileWriter;
pub fn crocksdb_sstfilewriter_create_with_comparator(env: *mut EnvOptions,
io_options: *const Options,
comparator: *const DBComparator,
cf: *mut DBCFHandle)
-> *mut SstFileWriter;
pub fn crocksdb_sstfilewriter_open(writer: *mut SstFileWriter,
name: *const c_char,
err: *mut *mut c_char);
......@@ -812,6 +807,22 @@ extern "C" {
val: *const u8,
val_len: size_t,
err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_put(writer: *mut SstFileWriter,
key: *const u8,
key_len: size_t,
val: *const u8,
val_len: size_t,
err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_merge(writer: *mut SstFileWriter,
key: *const u8,
key_len: size_t,
val: *const u8,
val_len: size_t,
err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_delete(writer: *mut SstFileWriter,
key: *const u8,
key_len: size_t,
err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_finish(writer: *mut SstFileWriter, err: *mut *mut c_char);
pub fn crocksdb_sstfilewriter_destroy(writer: *mut SstFileWriter);
......@@ -1219,9 +1230,9 @@ mod test {
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk1".as_ptr(), 5, b"v1".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk2".as_ptr(), 5, b"v2".as_ptr(), 2, &mut err);
crocksdb_sstfilewriter_put(writer, b"sstk2".as_ptr(), 5, b"v2".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, b"sstk3".as_ptr(), 5, b"v3".as_ptr(), 2, &mut err);
crocksdb_sstfilewriter_put(writer, b"sstk3".as_ptr(), 5, b"v3".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_finish(writer, &mut err);
assert!(err.is_null(), error_message(err));
......@@ -1242,9 +1253,9 @@ mod test {
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk2".as_ptr(), 5, "v4".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk22".as_ptr(), 6, "v5".as_ptr(), 2, &mut err);
crocksdb_sstfilewriter_put(writer, "sstk22".as_ptr(), 6, "v5".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_add(writer, "sstk3".as_ptr(), 5, "v6".as_ptr(), 2, &mut err);
crocksdb_sstfilewriter_put(writer, "sstk3".as_ptr(), 5, "v6".as_ptr(), 2, &mut err);
assert!(err.is_null(), error_message(err));
crocksdb_sstfilewriter_finish(writer, &mut err);
assert!(err.is_null(), error_message(err));
......
......@@ -1514,6 +1514,35 @@ impl SstFileWriter {
}
}
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_put(self.inner,
key.as_ptr(),
key.len(),
val.as_ptr(),
val.len()));
Ok(())
}
}
pub fn merge(&mut self, key: &[u8], val: &[u8]) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_merge(self.inner,
key.as_ptr(),
key.len(),
val.as_ptr(),
val.len()));
Ok(())
}
}
pub fn delete(&mut self, key: &[u8]) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_delete(self.inner, key.as_ptr(), key.len()));
Ok(())
}
}
/// Finalize writing to sst file and close file.
pub fn finish(&mut self) -> Result<(), String> {
unsafe {
......
......@@ -31,9 +31,69 @@ pub fn gen_sst(opt: ColumnFamilyOptions,
for &(k, v) in data {
writer.add(k, v).unwrap();
}
writer.finish().unwrap();
}
fn gen_sst_put(opt: ColumnFamilyOptions, cf: Option<&CFHandle>, path: &str) {
let _ = fs::remove_file(path);
let env_opt = EnvOptions::new();
let mut writer = if cf.is_some() {
SstFileWriter::new_cf(env_opt, opt, cf.unwrap())
} else {
SstFileWriter::new(env_opt, opt)
};
writer.open(path).unwrap();
writer.put(b"k1", b"a").unwrap();
writer.put(b"k2", b"b").unwrap();
writer.put(b"k3", b"c").unwrap();
writer.finish().unwrap();
}
fn gen_sst_merge(opt: ColumnFamilyOptions, cf: Option<&CFHandle>, path: &str) {
let _ = fs::remove_file(path);
let env_opt = EnvOptions::new();
let mut writer = if cf.is_some() {
SstFileWriter::new_cf(env_opt, opt, cf.unwrap())
} else {
SstFileWriter::new(env_opt, opt)
};
writer.open(path).unwrap();
writer.merge(b"k3", b"d").unwrap();
writer.finish().unwrap();
}
fn gen_sst_delete(opt: ColumnFamilyOptions, cf: Option<&CFHandle>, path: &str) {
let _ = fs::remove_file(path);
let env_opt = EnvOptions::new();
let mut writer = if cf.is_some() {
SstFileWriter::new_cf(env_opt, opt, cf.unwrap())
} else {
SstFileWriter::new(env_opt, opt)
};
writer.open(path).unwrap();
writer.delete(b"k3").unwrap();
writer.finish().unwrap();
}
fn concat_merge(_: &[u8], existing_val: Option<&[u8]>, operands: &mut MergeOperands) -> Vec<u8> {
let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
match existing_val {
Some(v) => {
for e in v {
result.push(*e)
}
}
None => (),
}
for op in operands {
for e in op {
result.push(*e);
}
}
result
}
#[test]
fn test_ingest_external_file() {
let path = TempDir::new("_rust_rocksdb_ingest_sst").expect("");
......@@ -45,17 +105,15 @@ fn test_ingest_external_file() {
let cf_opts = ColumnFamilyOptions::new();
db.create_cf("cf1", cf_opts).unwrap();
let handle = db.cf_handle("cf1").unwrap();
let gen_path = TempDir::new("_rust_rocksdb_ingest_sst_gen").expect("");
let test_sstfile = gen_path.path().join("test_sst_file");
let test_sstfile_str = test_sstfile.to_str().unwrap();
let default_options = db.get_options();
gen_sst(default_options,
Some(db.cf_handle("default").unwrap()),
test_sstfile_str,
&[(b"k1", b"v1"), (b"k2", b"v2")]);
let mut ingest_opt = IngestExternalFileOptions::new();
db.ingest_external_file(&ingest_opt, &[test_sstfile_str])
.unwrap();
......@@ -71,7 +129,6 @@ fn test_ingest_external_file() {
.unwrap();
assert_eq!(db.get_cf(handle, b"k1").unwrap().unwrap(), b"v3");
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"v4");
let snap = db.snapshot();
gen_sst(ColumnFamilyOptions::new(),
......@@ -88,4 +145,104 @@ fn test_ingest_external_file() {
assert_eq!(snap.get_cf(handle, b"k1").unwrap().unwrap(), b"v3");
assert_eq!(snap.get_cf(handle, b"k2").unwrap().unwrap(), b"v4");
assert!(snap.get_cf(handle, b"k3").unwrap().is_none());
}
#[test]
fn test_ingest_external_file_new() {
let path = TempDir::new("_rust_rocksdb_ingest_sst_new").expect("");
let path_str = path.path().to_str().unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("merge operator", concat_merge);
let db = DB::open_cf(opts, path_str, vec!["default"], vec![cf_opts]).unwrap();
let gen_path = TempDir::new("_rust_rocksdb_ingest_sst_gen_new").expect("");
let test_sstfile = gen_path.path().join("test_sst_file_new");
let test_sstfile_str = test_sstfile.to_str().unwrap();
let default_options = db.get_options();
gen_sst_put(default_options,
Some(db.cf_handle("default").unwrap()),
test_sstfile_str);
let mut ingest_opt = IngestExternalFileOptions::new();
db.ingest_external_file(&ingest_opt, &[test_sstfile_str])
.unwrap();
assert!(test_sstfile.exists());
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"a");
assert_eq!(db.get(b"k2").unwrap().unwrap(), b"b");
assert_eq!(db.get(b"k3").unwrap().unwrap(), b"c");
let snap = db.snapshot();
let default_options = db.get_options();
gen_sst_merge(default_options,
Some(db.cf_handle("default").unwrap()),
test_sstfile_str);
db.ingest_external_file(&ingest_opt, &[test_sstfile_str])
.unwrap();
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"a");
assert_eq!(db.get(b"k2").unwrap().unwrap(), b"b");
assert_eq!(db.get(b"k3").unwrap().unwrap(), b"cd");
let default_options = db.get_options();
gen_sst_delete(default_options,
Some(db.cf_handle("default").unwrap()),
test_sstfile_str);
ingest_opt.move_files(true);
db.ingest_external_file(&ingest_opt, &[test_sstfile_str])
.unwrap();
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"a");
assert_eq!(db.get(b"k2").unwrap().unwrap(), b"b");
assert!(db.get(b"k3").unwrap().is_none());
assert_eq!(snap.get(b"k1").unwrap().unwrap(), b"a");
assert_eq!(snap.get(b"k2").unwrap().unwrap(), b"b");
assert_eq!(snap.get(b"k3").unwrap().unwrap(), b"c");
}
#[test]
fn test_ingest_external_file_new_cf() {
let path = TempDir::new("_rust_rocksdb_ingest_sst_new_cf").expect("");
let path_str = path.path().to_str().unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut db = DB::open(opts, path_str).unwrap();
let gen_path = TempDir::new("_rust_rocksdb_ingest_sst_gen_new_cf").expect("");
let test_sstfile = gen_path.path().join("test_sst_file_new_cf");
let test_sstfile_str = test_sstfile.to_str().unwrap();
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.add_merge_operator("merge operator", concat_merge);
db.create_cf("cf1", cf_opts).unwrap();
let handle = db.cf_handle("cf1").unwrap();
let mut ingest_opt = IngestExternalFileOptions::new();
gen_sst_put(ColumnFamilyOptions::new(), None, test_sstfile_str);
db.ingest_external_file_cf(handle, &ingest_opt, &[test_sstfile_str])
.unwrap();
assert!(test_sstfile.exists());
assert_eq!(db.get_cf(handle, b"k1").unwrap().unwrap(), b"a");
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert_eq!(db.get_cf(handle, b"k3").unwrap().unwrap(), b"c");
let snap = db.snapshot();
ingest_opt.move_files(true);
gen_sst_merge(ColumnFamilyOptions::new(), None, test_sstfile_str);
db.ingest_external_file_cf(handle, &ingest_opt, &[test_sstfile_str])
.unwrap();
assert_eq!(db.get_cf(handle, b"k1").unwrap().unwrap(), b"a");
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert_eq!(db.get_cf(handle, b"k3").unwrap().unwrap(), b"cd");
gen_sst_delete(ColumnFamilyOptions::new(), None, test_sstfile_str);
db.ingest_external_file_cf(handle, &ingest_opt, &[test_sstfile_str])
.unwrap();
assert_eq!(db.get_cf(handle, b"k1").unwrap().unwrap(), b"a");
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert!(db.get_cf(handle, b"k3").unwrap().is_none());
assert_eq!(snap.get_cf(handle, b"k1").unwrap().unwrap(), b"a");
assert_eq!(snap.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert_eq!(snap.get_cf(handle, b"k3").unwrap().unwrap(), b"c");
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment