Commit 8f271417 authored by fredchenbj's avatar fredchenbj Committed by dorianzheng

Supoort db with ttl open (#227)

* support DBWithTTL Open Interface

* add DBWithTTL_Open test

* run cargo fmt --all

* small fix to pass travis ci

* fix bug about with_ttl condition and code style

* small fix to pass travis ci

* small fix to pass travis ci

* unify the with_ttl funtion's conditon

* fix condition about with_ttl and add more tests

* little fix to pass travis ci

* little fix to pass travis ci

* fix with_ttl condition and make it more clear

* litte fix to pass travis ci
parent 352aa9a1
......@@ -32,8 +32,10 @@
#include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/write_batch.h"
#include "db/column_family.h"
#include "table/sst_file_writer_collectors.h"
#include "table/table_reader.h"
......@@ -64,6 +66,7 @@ using rocksdb::Comparator;
using rocksdb::CompressionType;
using rocksdb::WALRecoveryMode;
using rocksdb::DB;
using rocksdb::DBWithTTL;
using rocksdb::DBOptions;
using rocksdb::Env;
using rocksdb::EnvOptions;
......@@ -522,6 +525,20 @@ crocksdb_t* crocksdb_open(
return result;
}
crocksdb_t* crocksdb_open_with_ttl(
const crocksdb_options_t* options,
const char* name,
int ttl,
char** errptr) {
DBWithTTL* db;
if (SaveError(errptr, DBWithTTL::Open(options->rep, std::string(name), &db, ttl))) {
return nullptr;
}
crocksdb_t* result = new crocksdb_t;
result->rep = db;
return result;
}
crocksdb_t* crocksdb_open_for_read_only(
const crocksdb_options_t* options,
const char* name,
......@@ -666,6 +683,42 @@ crocksdb_t* crocksdb_open_column_families(
return result;
}
crocksdb_t* crocksdb_open_column_families_with_ttl(
const crocksdb_options_t* db_options,
const char* name,
int num_column_families,
const char** column_family_names,
const crocksdb_options_t** column_family_options,
const int32_t* ttl_array,
bool read_only,
crocksdb_column_family_handle_t** column_family_handles,
char** errptr) {
std::vector<ColumnFamilyDescriptor> column_families;
std::vector<int32_t> ttls;
for (int i = 0; i < num_column_families; i++) {
column_families.push_back(ColumnFamilyDescriptor(
std::string(column_family_names[i]),
ColumnFamilyOptions(column_family_options[i]->rep)));
ttls.push_back(ttl_array[i]);
}
DBWithTTL* db;
std::vector<ColumnFamilyHandle*> handles;
if (SaveError(errptr, DBWithTTL::Open(DBOptions(db_options->rep),
std::string(name), column_families, &handles, &db, ttls, read_only))) {
return nullptr;
}
for (size_t i = 0; i < handles.size(); i++) {
crocksdb_column_family_handle_t* c_handle = new crocksdb_column_family_handle_t;
c_handle->rep = handles[i];
column_family_handles[i] = c_handle;
}
crocksdb_t* result = new crocksdb_t;
result->rep = db;
return result;
}
crocksdb_t* crocksdb_open_for_read_only_column_families(
const crocksdb_options_t* db_options,
const char* name,
......
......@@ -165,6 +165,9 @@ typedef enum crocksdb_table_property_t {
extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open(
const crocksdb_options_t* options, const char* name, char** errptr);
extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open_with_ttl(
const crocksdb_options_t* options, const char* name, int ttl, char** errptr);
extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open_for_read_only(
const crocksdb_options_t* options, const char* name,
unsigned char error_if_log_file_exist, char** errptr);
......@@ -223,6 +226,14 @@ extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open_column_families(
const crocksdb_options_t** column_family_options,
crocksdb_column_family_handle_t** column_family_handles, char** errptr);
extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open_column_families_with_ttl(
const crocksdb_options_t* options, const char* name, int num_column_families,
const char** column_family_names,
const crocksdb_options_t** column_family_options,
const int32_t* ttl_array, bool read_only,
crocksdb_column_family_handle_t** column_family_handles,
char** errptr);
extern C_ROCKSDB_LIBRARY_API crocksdb_t*
crocksdb_open_for_read_only_column_families(
const crocksdb_options_t* options, const char* name, int num_column_families,
......
......@@ -517,6 +517,12 @@ extern "C" {
path: *const c_char,
err: *mut *mut c_char,
) -> *mut DBInstance;
pub fn crocksdb_open_with_ttl(
options: *mut Options,
path: *const c_char,
ttl: c_int,
err: *mut *mut c_char,
) -> *mut DBInstance;
pub fn crocksdb_open_for_read_only(
options: *mut Options,
path: *const c_char,
......@@ -852,6 +858,17 @@ extern "C" {
column_family_handles: *const *mut DBCFHandle,
err: *mut *mut c_char,
) -> *mut DBInstance;
pub fn crocksdb_open_column_families_with_ttl(
options: *const Options,
path: *const c_char,
num_column_families: c_int,
column_family_names: *const *const c_char,
column_family_options: *const *const Options,
ttl_array: *const c_int,
read_only: bool,
column_family_handles: *const *mut DBCFHandle,
err: *mut *mut c_char,
) -> *mut DBInstance;
pub fn crocksdb_open_for_read_only_column_families(
options: *const Options,
path: *const c_char,
......
......@@ -52,10 +52,13 @@ impl Drop for CFHandle {
}
}
fn ensure_default_cf_exists<'a>(list: &mut Vec<ColumnFamilyDescriptor<'a>>) {
fn ensure_default_cf_exists<'a>(list: &mut Vec<ColumnFamilyDescriptor<'a>>, ttls: &mut Vec<i32>) {
let contains = list.iter().any(|ref cf| cf.is_default());
if !contains {
list.push(ColumnFamilyDescriptor::default());
if ttls.len() > 0 {
ttls.push(0);
}
}
}
......@@ -370,11 +373,34 @@ impl DB {
DB::open_cf(opts, path, cfds)
}
pub fn open_with_ttl(opts: DBOptions, path: &str, ttls: &[i32]) -> Result<DB, String> {
let cfds: Vec<&str> = vec![];
if ttls.len() == 0 {
return Err("ttls is empty in with_ttl function".to_owned());
}
DB::open_cf_with_ttl(opts, path, cfds, ttls)
}
pub fn open_cf<'a, T>(opts: DBOptions, path: &str, cfds: Vec<T>) -> Result<DB, String>
where
T: Into<ColumnFamilyDescriptor<'a>>,
{
DB::open_cf_internal(opts, path, cfds, None)
DB::open_cf_internal(opts, path, cfds, &[], None)
}
pub fn open_cf_with_ttl<'a, T>(
opts: DBOptions,
path: &str,
cfds: Vec<T>,
ttls: &[i32],
) -> Result<DB, String>
where
T: Into<ColumnFamilyDescriptor<'a>>,
{
if ttls.len() == 0 {
return Err("ttls is empty in with_ttl function".to_owned());
}
DB::open_cf_internal(opts, path, cfds, ttls, None)
}
pub fn open_for_read_only(
......@@ -395,13 +421,14 @@ impl DB {
where
T: Into<ColumnFamilyDescriptor<'a>>,
{
DB::open_cf_internal(opts, path, cfds, Some(error_if_log_file_exist))
DB::open_cf_internal(opts, path, cfds, &[], Some(error_if_log_file_exist))
}
fn open_cf_internal<'a, T>(
opts: DBOptions,
path: &str,
cfds: Vec<T>,
ttls: &[i32],
// if none, open for read write mode.
// otherwise, open for read only.
error_if_log_file_exist: Option<bool>,
......@@ -424,7 +451,8 @@ impl DB {
})?;
let mut descs = cfds.into_iter().map(|t| t.into()).collect();
ensure_default_cf_exists(&mut descs);
let mut ttls_vec = ttls.to_vec();
ensure_default_cf_exists(&mut descs, &mut ttls_vec);
let (names, options) = split_descriptors(descs);
let cstrings = build_cstring_list(&names);
......@@ -441,6 +469,17 @@ impl DB {
} else {
false
};
let with_ttl = if ttls_vec.len() > 0 {
if ttls_vec.len() == cf_names.len() {
true
} else {
return Err("the length of ttls not equal to length of cfs".to_owned());
}
} else {
false
};
let db = {
let db_options = opts.inner;
let db_path = cpath.as_ptr();
......@@ -448,6 +487,8 @@ impl DB {
let db_cf_ptrs = cf_names.as_ptr();
let db_cf_opts = cf_options.as_ptr();
let db_cf_handles = cf_handles.as_ptr();
if !with_ttl {
if let Some(flag) = error_if_log_file_exist {
unsafe {
ffi_try!(crocksdb_open_for_read_only_column_families(
......@@ -472,7 +513,24 @@ impl DB {
))
}
}
} else {
let ttl_array = ttls_vec.as_ptr() as *const c_int;
unsafe {
ffi_try!(crocksdb_open_column_families_with_ttl(
db_options,
db_path,
db_cfs_count,
db_cf_ptrs,
db_cf_opts,
ttl_array,
readonly,
db_cf_handles
))
}
}
};
if cf_handles.iter().any(|h| h.is_null()) {
return Err(ERR_NULL_CF_HANDLE.to_owned());
}
......
......@@ -21,3 +21,4 @@ mod test_rocksdb_options;
mod test_slice_transform;
mod test_statistics;
mod test_table_properties;
mod test_ttl;
use rocksdb::{ColumnFamilyOptions, DBOptions, Writable, DB};
use tempdir::TempDir;
#[test]
pub fn test_ttl() {
let path = TempDir::new("_rust_rocksdb_ttl_test").expect("");
let path_str = path.path().to_str().unwrap();
// should be able to open db with ttl
{
let mut opts = DBOptions::new();
let cf_opts = ColumnFamilyOptions::new();
let ttl = 10;
opts.create_if_missing(true);
let mut db = match DB::open_cf_with_ttl(
opts,
path.path().to_str().unwrap(),
vec![("default", cf_opts)],
&[ttl],
) {
Ok(db) => {
println!("successfully opened db with ttl");
db
}
Err(e) => panic!("failed to open db with ttl: {}", e),
};
match db.create_cf("cf1") {
Ok(_) => println!("cf1 created successfully"),
Err(e) => {
panic!("could not create column family: {}", e);
}
}
assert_eq!(db.cf_names(), vec!["cf1", "default"]);
match db.create_cf("cf2") {
Ok(_) => println!("cf2 created successfully"),
Err(e) => {
panic!("could not create column family: {}", e);
}
}
assert_eq!(db.cf_names(), vec!["cf1", "cf2", "default"]);
drop(db);
}
// should be able to write, read over a cf with the length of ttls equals to that of cfs
{
let db = match DB::open_cf_with_ttl(
DBOptions::new(),
path_str,
vec![
("cf1", ColumnFamilyOptions::new()),
("cf2", ColumnFamilyOptions::new()),
("default", ColumnFamilyOptions::new()),
],
&[10, 10, 10],
) {
Ok(db) => {
println!("successfully opened cf with ttl");
db
}
Err(e) => panic!("failed to open cf with ttl: {}", e),
};
let cf1 = db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1");
let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok());
}
// should be able to write, read over a cf with the length of ttls equals to that of cfs.
// default cf could be with ttl 0 if it is not in cfds
{
let db = match DB::open_cf_with_ttl(
DBOptions::new(),
path_str,
vec![
("cf1", ColumnFamilyOptions::new()),
("cf2", ColumnFamilyOptions::new()),
],
&[10, 10],
) {
Ok(db) => {
println!("successfully opened cf with ttl");
db
}
Err(e) => panic!("failed to open cf with ttl: {}", e),
};
let cf1 = db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1");
let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok());
}
// should fail to open cf with ttl when the length of ttls not equal to that of cfs
{
let _db = match DB::open_cf_with_ttl(
DBOptions::new(),
path_str,
vec![
("cf1", ColumnFamilyOptions::new()),
("cf2", ColumnFamilyOptions::new()),
],
&[10],
) {
Ok(_) => panic!(
"should not have opened DB successfully with ttl \
when the length of ttl not equal to that of cfs"
),
Err(e) => assert!(e.starts_with("the length of ttls not equal to length of cfs")),
};
}
// should fail to open cf with ttl when the length of ttls not equal to that of cfs
// when default is in cfds, it's ttl must be supplied
{
let _db = match DB::open_cf_with_ttl(
DBOptions::new(),
path_str,
vec![
("cf1", ColumnFamilyOptions::new()),
("cf2", ColumnFamilyOptions::new()),
("default", ColumnFamilyOptions::new()),
],
&[10, 10],
) {
Ok(_) => panic!(
"should not have opened DB successfully with ttl \
when the length of ttl not equal to that of cfs"
),
Err(e) => assert!(e.starts_with("the length of ttls not equal to length of cfs")),
};
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment