Commit a302dfb3 authored by Cholerae Hu's avatar Cholerae Hu Committed by zhangjinpeng1987

tests: add more real world test case of ingest (#114)

parent a67bce54
......@@ -97,11 +97,7 @@ fn concat_merge(_: &[u8], existing_val: Option<&[u8]>, operands: &mut MergeOpera
#[test]
fn test_ingest_external_file() {
let path = TempDir::new("_rust_rocksdb_ingest_sst").expect("");
let path_str = path.path().to_str().unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut db = DB::open(opts, path_str).unwrap();
let mut db = create_default_database(&path);
let cf_opts = ColumnFamilyOptions::new();
db.create_cf("cf1", cf_opts).unwrap();
let handle = db.cf_handle("cf1").unwrap();
......@@ -204,10 +200,7 @@ fn test_ingest_external_file_new() {
#[test]
fn test_ingest_external_file_new_cf() {
let path = TempDir::new("_rust_rocksdb_ingest_sst_new_cf").expect("");
let path_str = path.path().to_str().unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut db = DB::open(opts, path_str).unwrap();
let mut db = create_default_database(&path);
let gen_path = TempDir::new("_rust_rocksdb_ingest_sst_gen_new_cf").expect("");
let test_sstfile = gen_path.path().join("test_sst_file_new_cf");
let test_sstfile_str = test_sstfile.to_str().unwrap();
......@@ -246,3 +239,108 @@ fn test_ingest_external_file_new_cf() {
assert_eq!(snap.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert_eq!(snap.get_cf(handle, b"k3").unwrap().unwrap(), b"c");
}
fn check_kv(db: &DB, cf: Option<&CFHandle>, data: &[(&[u8], Option<&[u8]>)]) {
for &(k, v) in data {
let handle = cf.unwrap_or(db.cf_handle("default").unwrap());
if v.is_none() {
assert!(db.get_cf(handle, k).unwrap().is_none());
} else {
assert_eq!(db.get_cf(handle, k).unwrap().unwrap(), v.unwrap());
}
}
}
fn put_delete_and_generate_sst_cf(opt: ColumnFamilyOptions, db: &DB, cf: &CFHandle, path: &str) {
db.put_cf(cf, b"k1", b"v1").unwrap();
db.put_cf(cf, b"k2", b"v2").unwrap();
db.put_cf(cf, b"k3", b"v3").unwrap();
db.put_cf(cf, b"k4", b"v4").unwrap();
db.delete_cf(cf, b"k1").unwrap();
db.delete_cf(cf, b"k3").unwrap();
gen_sst_from_cf(opt, db, cf, path);
}
fn gen_sst_from_cf(opt: ColumnFamilyOptions, db: &DB, cf: &CFHandle, path: &str) {
let env_opt = EnvOptions::new();
let mut writer = SstFileWriter::new_cf(env_opt, opt, cf);
writer.open(path).unwrap();
let mut iter = db.iter_cf(cf);
iter.seek(SeekKey::Start);
while iter.valid() {
writer.add(iter.key(), iter.value()).unwrap();
iter.next();
}
writer.finish().unwrap();
}
fn create_default_database(path: &TempDir) -> DB {
let path_str = path.path().to_str().unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
DB::open(opts, path_str).unwrap()
}
fn create_cfs(db: &mut DB, cfs: &[&str]) {
for cf in cfs {
if *cf != "default" {
let cf_opts = ColumnFamilyOptions::new();
db.create_cf(cf, cf_opts).unwrap();
}
}
}
#[test]
fn test_ingest_simulate_real_world() {
const ALL_CFS: [&str; 3] = ["lock", "write", "default"];
let path = TempDir::new("_rust_rocksdb_ingest_real_world_1").expect("");
let mut db = create_default_database(&path);
let gen_path = TempDir::new("_rust_rocksdb_ingest_real_world_new_cf").expect("");
create_cfs(&mut db, &ALL_CFS);
for cf in &ALL_CFS {
let handle = db.cf_handle(cf).unwrap();
let cf_opts = ColumnFamilyOptions::new();
put_delete_and_generate_sst_cf(cf_opts,
&db,
&handle,
gen_path.path().join(cf).to_str().unwrap());
}
let path2 = TempDir::new("_rust_rocksdb_ingest_real_world_2").expect("");
let mut db2 = create_default_database(&path2);
for cf in &ALL_CFS {
if *cf != "default" {
let cf_opts = ColumnFamilyOptions::new();
db2.create_cf(cf, cf_opts).unwrap();
}
}
for cf in &ALL_CFS {
let handle = db2.cf_handle(cf).unwrap();
let mut ingest_opt = IngestExternalFileOptions::new();
ingest_opt.move_files(true);
db2.ingest_external_file_cf(handle,
&ingest_opt,
&[gen_path.path().join(cf).to_str().unwrap()])
.unwrap();
check_kv(&db,
db.cf_handle(cf),
&[(b"k1", None), (b"k2", Some(b"v2")), (b"k3", None), (b"k4", Some(b"v4"))]);
let cf_opts = ColumnFamilyOptions::new();
gen_sst_from_cf(cf_opts,
&db2,
&handle,
gen_path.path().join(cf).to_str().unwrap());
}
for cf in &ALL_CFS {
let handle = db.cf_handle(cf).unwrap();
let ingest_opt = IngestExternalFileOptions::new();
db.ingest_external_file_cf(handle,
&ingest_opt,
&[gen_path.path().join(cf).to_str().unwrap()])
.unwrap();
check_kv(&db,
db.cf_handle(cf),
&[(b"k1", None), (b"k2", Some(b"v2")), (b"k3", None), (b"k4", Some(b"v4"))]);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment