Unverified Commit 4d2381ec authored by Connor's avatar Connor Committed by GitHub

cherry-pick some commits (#310)

* Fix titan config (#306)

* Add titan read option (#309)
Signed-off-by: 's avatarConnor1996 <zbk602423539@gmail.com>
parent 94d50c89
......@@ -170,6 +170,7 @@ using rocksdb::titandb::TitanCFOptions;
using rocksdb::titandb::TitanDB;
using rocksdb::titandb::TitanDBOptions;
using rocksdb::titandb::TitanOptions;
using rocksdb::titandb::TitanReadOptions;
using rocksdb::titandb::TitanBlobRunMode;
using std::shared_ptr;
......@@ -1074,17 +1075,20 @@ void crocksdb_create_iterators(
crocksdb_iterator_t** iterators,
size_t size,
char** errptr) {
std::vector<ColumnFamilyHandle*> column_families_vec;
std::vector<ColumnFamilyHandle*> column_families_vec(size);
for (size_t i = 0; i < size; i++) {
column_families_vec.push_back(column_families[i]->rep);
}
std::vector<Iterator*> res;
Status status = db->rep->NewIterators(opts->rep, column_families_vec, &res);
assert(res.size() == size);
if (SaveError(errptr, status)) {
for (size_t i = 0; i < res.size(); i++) {
delete res[i];
}
return;
}
assert(res.size() == size);
for (size_t i = 0; i < size; i++) {
iterators[i] = new crocksdb_iterator_t;
......@@ -5068,12 +5072,12 @@ void ctitandb_options_set_min_gc_batch_size(ctitandb_options_t* options,
}
void ctitandb_options_set_blob_file_discardable_ratio(
ctitandb_options_t* options, float ratio) {
ctitandb_options_t* options, double ratio) {
options->rep.blob_file_discardable_ratio = ratio;
}
void ctitandb_options_set_sample_file_size_ratio(ctitandb_options_t* options,
float ratio) {
double ratio) {
options->rep.sample_file_size_ratio = ratio;
}
......@@ -5087,6 +5091,11 @@ void ctitandb_options_set_max_background_gc(ctitandb_options_t* options,
options->rep.max_background_gc = size;
}
void ctitandb_options_set_purge_obsolete_files_period(ctitandb_options_t* options,
unsigned int period) {
options->rep.purge_obsolete_files_period = period;
}
void ctitandb_options_set_blob_cache(ctitandb_options_t* options,
crocksdb_cache_t* cache) {
if (cache) {
......@@ -5095,12 +5104,12 @@ void ctitandb_options_set_blob_cache(ctitandb_options_t* options,
}
void ctitandb_options_set_discardable_ratio(ctitandb_options_t* options,
float ratio) {
double ratio) {
options->rep.blob_file_discardable_ratio = ratio;
}
void ctitandb_options_set_sample_ratio(ctitandb_options_t* options,
float ratio) {
double ratio) {
options->rep.sample_file_size_ratio = ratio;
}
......@@ -5109,4 +5118,90 @@ void ctitandb_options_set_blob_run_mode(ctitandb_options_t* options,
options->rep.blob_run_mode = static_cast<TitanBlobRunMode>(mode);
}
/* TitanReadOptions */
struct ctitandb_readoptions_t {
TitanReadOptions rep;
};
ctitandb_readoptions_t* ctitandb_readoptions_create() {
return new ctitandb_readoptions_t;
}
void ctitandb_readoptions_destroy(ctitandb_readoptions_t* opts) {
delete opts;
}
bool ctitandb_readoptions_key_only(ctitandb_readoptions_t* opts) {
return opts->rep.key_only;
}
void ctitandb_readoptions_set_key_only(ctitandb_readoptions_t* opts,
bool v) {
opts->rep.key_only = v;
}
crocksdb_iterator_t* ctitandb_create_iterator(
crocksdb_t* db,
const crocksdb_readoptions_t* options,
const ctitandb_readoptions_t* titan_options) {
crocksdb_iterator_t* result = new crocksdb_iterator_t;
if (titan_options == nullptr) {
result->rep = db->rep->NewIterator(options->rep);
} else {
*(ReadOptions*)&titan_options->rep = options->rep;
result->rep = static_cast<TitanDB*>(db->rep)->NewIterator(titan_options->rep);
}
return result;
}
crocksdb_iterator_t* ctitandb_create_iterator_cf(
crocksdb_t* db,
const crocksdb_readoptions_t* options,
const ctitandb_readoptions_t* titan_options,
crocksdb_column_family_handle_t* column_family) {
crocksdb_iterator_t* result = new crocksdb_iterator_t;
if (titan_options == nullptr) {
result->rep = db->rep->NewIterator(options->rep, column_family->rep);
} else {
*(ReadOptions*)&titan_options->rep = options->rep;
result->rep = static_cast<TitanDB*>(db->rep)->NewIterator(titan_options->rep, column_family->rep);
}
return result;
}
void ctitandb_create_iterators(
crocksdb_t *db,
crocksdb_readoptions_t* options,
ctitandb_readoptions_t* titan_options,
crocksdb_column_family_handle_t** column_families,
crocksdb_iterator_t** iterators,
size_t size,
char** errptr) {
std::vector<ColumnFamilyHandle*> column_families_vec(size);
for (size_t i = 0; i < size; i++) {
column_families_vec.push_back(column_families[i]->rep);
}
std::vector<Iterator*> res;
Status status;
if (titan_options == nullptr) {
status = db->rep->NewIterators(options->rep, column_families_vec, &res);
} else {
*(ReadOptions*)&titan_options->rep = options->rep;
status = static_cast<TitanDB*>(db->rep)->NewIterators(titan_options->rep, column_families_vec, &res);
}
if (SaveError(errptr, status)) {
for (size_t i = 0; i < res.size(); i++) {
delete res[i];
}
return;
}
assert(res.size() == size);
for (size_t i = 0; i < size; i++) {
iterators[i] = new crocksdb_iterator_t;
iterators[i]->rep = res[i];
}
}
} // end extern "C"
......@@ -1965,6 +1965,7 @@ struct ctitandb_blob_index_t {
};
typedef struct ctitandb_options_t ctitandb_options_t;
typedef struct ctitandb_readoptions_t ctitandb_readoptions_t;
typedef struct ctitandb_blob_index_t ctitandb_blob_index_t;
extern C_ROCKSDB_LIBRARY_API crocksdb_t* ctitandb_open_column_families(
......@@ -2020,10 +2021,10 @@ extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_min_gc_batch_size(
extern C_ROCKSDB_LIBRARY_API void
ctitandb_options_set_blob_file_discardable_ratio(ctitandb_options_t* options,
float ratio);
double ratio);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_sample_file_size_ratio(
ctitandb_options_t* options, float ratio);
ctitandb_options_t* options, double ratio);
extern C_ROCKSDB_LIBRARY_API void
ctitandb_options_set_merge_small_file_threshold(ctitandb_options_t* options,
......@@ -2032,14 +2033,50 @@ ctitandb_options_set_merge_small_file_threshold(ctitandb_options_t* options,
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_max_background_gc(
ctitandb_options_t* options, int32_t size);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_purge_obsolete_files_period(
ctitandb_options_t* options, unsigned int period);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_blob_cache(
ctitandb_options_t* options, crocksdb_cache_t* cache);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_discardable_ratio(
ctitandb_options_t* options, float ratio);
ctitandb_options_t* options, double ratio);
extern void ctitandb_options_set_sample_ratio(ctitandb_options_t* options,
float ratio);
double ratio);
/* TitanReadOptions */
extern C_ROCKSDB_LIBRARY_API ctitandb_readoptions_t* ctitandb_readoptions_create();
extern C_ROCKSDB_LIBRARY_API void ctitandb_readoptions_destroy(ctitandb_readoptions_t* opts);
extern C_ROCKSDB_LIBRARY_API bool ctitandb_readoptions_key_only(ctitandb_readoptions_t* opts);
extern C_ROCKSDB_LIBRARY_API void ctitandb_readoptions_set_key_only(ctitandb_readoptions_t* opts,
bool v);
/* Titan Iterator */
extern C_ROCKSDB_LIBRARY_API crocksdb_iterator_t* ctitandb_create_iterator(
crocksdb_t* db,
const crocksdb_readoptions_t* options,
const ctitandb_readoptions_t* titan_options);
extern C_ROCKSDB_LIBRARY_API crocksdb_iterator_t* ctitandb_create_iterator_cf(
crocksdb_t* db,
const crocksdb_readoptions_t* options,
const ctitandb_readoptions_t* titan_options,
crocksdb_column_family_handle_t* column_family);
extern C_ROCKSDB_LIBRARY_API void ctitandb_create_iterators(
crocksdb_t *db,
crocksdb_readoptions_t* options,
ctitandb_readoptions_t* titan_options,
crocksdb_column_family_handle_t** column_families,
crocksdb_iterator_t** iterators,
size_t size,
char** errptr);
#ifdef __cplusplus
} /* end extern "C" */
......
Subproject commit b4b4181a6eff6961c0062415d557976679d5fbcf
Subproject commit ab0fa88daa6b3f8d7ba7fac6cc52346b51e4c039
......@@ -85,6 +85,7 @@ mod generated;
pub use generated::*;
pub enum DBTitanDBOptions {}
pub enum DBTitanReadOptions {}
#[derive(Clone, Debug, Default)]
#[repr(C)]
......@@ -1853,6 +1854,10 @@ extern "C" {
pub fn ctitandb_options_set_disable_background_gc(opts: *mut DBTitanDBOptions, disable: bool);
pub fn ctitandb_options_set_max_background_gc(opts: *mut DBTitanDBOptions, size: i32);
pub fn ctitandb_options_set_purge_obsolete_files_period(
opts: *mut DBTitanDBOptions,
period: usize,
);
pub fn ctitandb_options_set_min_gc_batch_size(opts: *mut DBTitanDBOptions, size: u64);
pub fn ctitandb_options_set_max_gc_batch_size(opts: *mut DBTitanDBOptions, size: u64);
pub fn ctitandb_options_set_blob_cache(opts: *mut DBTitanDBOptions, cache: *mut DBCache);
......@@ -1860,6 +1865,23 @@ extern "C" {
pub fn ctitandb_options_set_sample_ratio(opts: *mut DBTitanDBOptions, ratio: f64);
pub fn ctitandb_options_set_merge_small_file_threshold(opts: *mut DBTitanDBOptions, size: u64);
pub fn ctitandb_options_set_blob_run_mode(opts: *mut DBTitanDBOptions, t: DBTitanDBBlobRunMode);
pub fn ctitandb_readoptions_set_key_only(opts: *mut DBTitanReadOptions, v: bool);
pub fn ctitandb_readoptions_create() -> *mut DBTitanReadOptions;
pub fn ctitandb_readoptions_destroy(readopts: *mut DBTitanReadOptions);
pub fn ctitandb_create_iterator(
db: *mut DBInstance,
readopts: *const DBReadOptions,
titan_readopts: *const DBTitanReadOptions,
) -> *mut DBIterator;
pub fn ctitandb_create_iterator_cf(
db: *mut DBInstance,
readopts: *const DBReadOptions,
titan_readopts: *const DBTitanReadOptions,
cf_handle: *mut DBCFHandle,
) -> *mut DBIterator;
}
#[cfg(test)]
......
......@@ -104,6 +104,12 @@ impl Debug for DB {
unsafe impl Send for DB {}
unsafe impl Sync for DB {}
impl DB {
pub fn is_titan(&self) -> bool {
!self.opts.titan_inner.is_null()
}
}
pub struct WriteBatch {
inner: *mut DBWriteBatch,
}
......@@ -136,7 +142,15 @@ impl<'a> From<&'a [u8]> for SeekKey<'a> {
impl<D: Deref<Target = DB>> DBIterator<D> {
pub fn new(db: D, readopts: ReadOptions) -> DBIterator<D> {
unsafe {
let iterator = crocksdb_ffi::crocksdb_create_iterator(db.inner, readopts.get_inner());
let iterator = if db.is_titan() {
crocksdb_ffi::ctitandb_create_iterator(
db.inner,
readopts.get_inner(),
readopts.get_titan_inner(),
)
} else {
crocksdb_ffi::crocksdb_create_iterator(db.inner, readopts.get_inner())
};
DBIterator {
_db: db,
......@@ -146,6 +160,30 @@ impl<D: Deref<Target = DB>> DBIterator<D> {
}
}
pub fn new_cf(db: D, cf_handle: &CFHandle, readopts: ReadOptions) -> DBIterator<D> {
unsafe {
let iterator = if db.is_titan() {
crocksdb_ffi::ctitandb_create_iterator_cf(
db.inner,
readopts.get_inner(),
readopts.get_titan_inner(),
cf_handle.inner,
)
} else {
crocksdb_ffi::crocksdb_create_iterator_cf(
db.inner,
readopts.get_inner(),
cf_handle.inner,
)
};
DBIterator {
_db: db,
_readopts: readopts,
inner: iterator,
}
}
}
pub fn seek(&mut self, key: SeekKey) -> bool {
unsafe {
match key {
......@@ -226,21 +264,6 @@ impl<D: Deref<Target = DB>> DBIterator<D> {
}
Ok(())
}
pub fn new_cf(db: D, cf_handle: &CFHandle, readopts: ReadOptions) -> DBIterator<D> {
unsafe {
let iterator = crocksdb_ffi::crocksdb_create_iterator_cf(
db.inner,
readopts.get_inner(),
cf_handle.inner,
);
DBIterator {
_db: db,
_readopts: readopts,
inner: iterator,
}
}
}
}
pub type Kv = (Vec<u8>, Vec<u8>);
......
......@@ -20,7 +20,7 @@ use crocksdb_ffi::{
DBCompactionOptions, DBCompressionType, DBFifoCompactionOptions, DBFlushOptions,
DBInfoLogLevel, DBInstance, DBLRUCacheOptions, DBRateLimiter, DBRateLimiterMode, DBReadOptions,
DBRecoveryMode, DBRestoreOptions, DBSnapshot, DBStatisticsHistogramType,
DBStatisticsTickerType, DBTitanDBOptions, DBWriteOptions, Options,
DBStatisticsTickerType, DBTitanDBOptions, DBTitanReadOptions, DBWriteOptions, Options,
};
use event_listener::{new_event_listener, EventListener};
use libc::{self, c_double, c_int, c_uchar, c_void, size_t};
......@@ -252,11 +252,17 @@ pub struct ReadOptions {
inner: *mut DBReadOptions,
lower_bound: Vec<u8>,
upper_bound: Vec<u8>,
titan_inner: *mut DBTitanReadOptions,
}
impl Drop for ReadOptions {
fn drop(&mut self) {
unsafe { crocksdb_ffi::crocksdb_readoptions_destroy(self.inner) }
unsafe {
crocksdb_ffi::crocksdb_readoptions_destroy(self.inner);
if !self.titan_inner.is_null() {
crocksdb_ffi::ctitandb_readoptions_destroy(self.titan_inner);
}
}
}
}
......@@ -269,6 +275,7 @@ impl Default for ReadOptions {
inner: opts,
lower_bound: vec![],
upper_bound: vec![],
titan_inner: ptr::null_mut::<DBTitanReadOptions>(),
}
}
}
......@@ -392,10 +399,23 @@ impl ReadOptions {
}
}
pub unsafe fn get_inner(&self) -> *const DBReadOptions {
pub fn get_inner(&self) -> *const DBReadOptions {
self.inner
}
pub fn get_titan_inner(&self) -> *const DBTitanReadOptions {
self.titan_inner
}
pub fn set_titan_key_only(&mut self, v: bool) {
unsafe {
if self.titan_inner.is_null() {
self.titan_inner = crocksdb_ffi::ctitandb_readoptions_create();
}
crocksdb_ffi::ctitandb_readoptions_set_key_only(self.titan_inner, v);
}
}
pub fn set_table_filter(&mut self, filter: Box<TableFilter>) {
unsafe {
let f = Box::into_raw(Box::new(filter));
......
......@@ -70,6 +70,12 @@ impl TitanDBOptions {
}
}
pub fn set_purge_obsolete_files_period(&mut self, period: usize) {
unsafe {
crocksdb_ffi::ctitandb_options_set_purge_obsolete_files_period(self.inner, period);
}
}
pub fn set_min_gc_batch_size(&mut self, size: u64) {
unsafe {
crocksdb_ffi::ctitandb_options_set_min_gc_batch_size(self.inner, size);
......
......@@ -18,7 +18,7 @@ use tempdir::TempDir;
use rand::Rng;
use rocksdb::{
ColumnFamilyOptions, DBCompressionType, DBEntryType, DBOptions, SeekKey,
ColumnFamilyOptions, DBCompressionType, DBEntryType, DBOptions, ReadOptions, SeekKey,
TablePropertiesCollector, TablePropertiesCollectorFactory, TitanBlobIndex, TitanDBOptions,
UserCollectedProperties, Writable, DB,
};
......@@ -108,6 +108,7 @@ fn test_titandb() {
tdb_opts.set_min_blob_size(max_value_size / 2 + 1);
tdb_opts.set_blob_file_compression(DBCompressionType::No);
tdb_opts.set_disable_background_gc(true);
tdb_opts.set_purge_obsolete_files_period(10);
let mut opts = DBOptions::new();
opts.create_if_missing(true);
......@@ -148,6 +149,37 @@ fn test_titandb() {
}
}
let mut readopts = ReadOptions::new();
readopts.set_titan_key_only(true);
iter = db.iter_opt(readopts);
iter.seek(SeekKey::Start);
for i in 0..n {
for j in 0..n {
let k = (i * n + j) as u8;
let v = vec![k; (j + 1) as usize];
assert_eq!(db.get(&[k]).unwrap().unwrap(), &v);
assert!(iter.valid());
assert_eq!(iter.key(), &[k]);
iter.next();
}
}
let cf_handle = db.cf_handle("default").unwrap();
readopts = ReadOptions::new();
readopts.set_titan_key_only(true);
iter = db.iter_cf_opt(&cf_handle, readopts);
iter.seek(SeekKey::Start);
for i in 0..n {
for j in 0..n {
let k = (i * n + j) as u8;
let v = vec![k; (j + 1) as usize];
assert_eq!(db.get(&[k]).unwrap().unwrap(), &v);
assert!(iter.valid());
assert_eq!(iter.key(), &[k]);
iter.next();
}
}
let num_entries = n as u32 * max_value_size as u32;
check_table_properties(&db, num_entries / 2, num_entries);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment