Commit ce9b8225 authored by Huachao Huang's avatar Huachao Huang

*: support table properties

parent 8336f84a
...@@ -29,6 +29,7 @@ path = "tests/test.rs" ...@@ -29,6 +29,7 @@ path = "tests/test.rs"
[dependencies] [dependencies]
libc = "0.2.11" libc = "0.2.11"
tempdir = "0.3.4" tempdir = "0.3.4"
byteorder = "1.0.0"
[dependencies.librocksdb_sys] [dependencies.librocksdb_sys]
path = "librocksdb_sys" path = "librocksdb_sys"
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/rate_limiter.h" #include "rocksdb/rate_limiter.h"
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
...@@ -83,6 +84,13 @@ using rocksdb::HistogramData; ...@@ -83,6 +84,13 @@ using rocksdb::HistogramData;
using rocksdb::PinnableSlice; using rocksdb::PinnableSlice;
using rocksdb::FilterBitsBuilder; using rocksdb::FilterBitsBuilder;
using rocksdb::FilterBitsReader; using rocksdb::FilterBitsReader;
using rocksdb::EntryType;
using rocksdb::SequenceNumber;
using rocksdb::UserCollectedProperties;
using rocksdb::TableProperties;
using rocksdb::TablePropertiesCollection;
using rocksdb::TablePropertiesCollector;
using rocksdb::TablePropertiesCollectorFactory;
using std::shared_ptr; using std::shared_ptr;
...@@ -2915,4 +2923,270 @@ const char* crocksdb_pinnableslice_value(const crocksdb_pinnableslice_t* v, ...@@ -2915,4 +2923,270 @@ const char* crocksdb_pinnableslice_value(const crocksdb_pinnableslice_t* v,
*vlen = v->rep.size(); *vlen = v->rep.size();
return v->rep.data(); return v->rep.data();
} }
struct crocksdb_user_collected_properties_t {
UserCollectedProperties* inner;
crocksdb_user_collected_properties_t(UserCollectedProperties* props) : inner(props) {}
};
void crocksdb_user_collected_properties_add(crocksdb_user_collected_properties_t* props,
const char* key, size_t key_len,
const char* value, size_t value_len) {
props->inner->emplace(std::make_pair(std::string(key, key_len), std::string(value, value_len)));
}
struct crocksdb_table_properties_collector_context_t {
void* collector;
const char* (*name)(void*);
void (*destructor)(void*);
void (*add_userkey)(void*,
const char* key, size_t key_len,
const char* value, size_t value_len,
int entry_type, uint64_t seq, uint64_t file_size);
void (*finish)(void*, crocksdb_user_collected_properties_t* props);
void (*readable_properties)(void*, crocksdb_user_collected_properties_t* props);
};
struct crocksdb_table_properties_collector_t : public TablePropertiesCollector {
crocksdb_table_properties_collector_context_t* rep_;
crocksdb_table_properties_collector_t(
crocksdb_table_properties_collector_context_t* context) : rep_(context) {}
virtual ~crocksdb_table_properties_collector_t() {
rep_->destructor(rep_);
}
virtual Status AddUserKey(const Slice& key,
const Slice& value,
EntryType entry_type,
SequenceNumber seq,
uint64_t file_size) override {
rep_->add_userkey(rep_,
key.data(), key.size(),
value.data(), value.size(),
entry_type, seq, file_size);
return Status::OK();
}
virtual Status Finish(UserCollectedProperties* inner) override {
crocksdb_user_collected_properties_t props(inner);
rep_->finish(rep_, &props);
return Status::OK();
}
virtual UserCollectedProperties GetReadableProperties() const override {
UserCollectedProperties inner;
crocksdb_user_collected_properties_t props(&inner);
rep_->readable_properties(rep_, &props);
return inner;
}
const char* Name() const override {
return rep_->name(rep_);
}
};
struct crocksdb_table_properties_collector_factory_context_t {
void* factory;
const char* (*name)(void*);
void (*destructor)(void*);
crocksdb_table_properties_collector_context_t*
(*create_table_properties_collector)(void*, uint32_t cf);
};
struct crocksdb_table_properties_collector_factory_t : public TablePropertiesCollectorFactory {
crocksdb_table_properties_collector_factory_context_t* rep_;
crocksdb_table_properties_collector_factory_t(
crocksdb_table_properties_collector_factory_context_t* context) : rep_(context) {}
virtual ~crocksdb_table_properties_collector_factory_t() {
rep_->destructor(rep_);
}
virtual TablePropertiesCollector* CreateTablePropertiesCollector (
TablePropertiesCollectorFactory::Context ctx) override {
auto context = rep_->create_table_properties_collector(rep_, ctx.column_family_id);
return new crocksdb_table_properties_collector_t(context);
}
const char* Name() const override {
return rep_->name(rep_);
}
};
crocksdb_table_properties_collector_factory_t*
crocksdb_table_properties_collector_factory_create(
crocksdb_table_properties_collector_factory_context_t* context) {
return new crocksdb_table_properties_collector_factory_t(context);
}
void crocksdb_table_properties_collector_factory_destroy(
crocksdb_table_properties_collector_factory_t* factory) {
delete factory;
}
void crocksdb_options_add_table_properties_collector_factory(
crocksdb_options_t* opt, crocksdb_table_properties_collector_factory_t* factory) {
opt->rep.table_properties_collector_factories.push_back(
std::shared_ptr<TablePropertiesCollectorFactory>(factory));
}
struct crocksdb_shallow_strings_map_t {
size_t size = 0;
const char** keys = nullptr;
size_t* keys_lens = nullptr;
const char** values = nullptr;
size_t* values_lens = nullptr;
~crocksdb_shallow_strings_map_t() {
delete[] keys;
delete[] keys_lens;
delete[] values;
delete[] values_lens;
}
void assign(const std::map<std::string, std::string>& items) {
size = items.size();
keys = new const char* [size];
keys_lens = new size_t [size];
values = new const char* [size];
values_lens = new size_t [size];
size_t i = 0;
for (auto it = items.begin(); it != items.end(); it++, i++) {
keys[i] = it->first.data();
keys_lens[i] = it->first.size();
values[i] = it->second.data();
values_lens[i] = it->second.size();
}
}
};
struct crocksdb_table_properties_t {
uint64_t data_size;
uint64_t index_size;
uint64_t filter_size;
uint64_t raw_key_size;
uint64_t raw_value_size;
uint64_t num_data_blocks;
uint64_t num_entries;
uint64_t format_version;
uint64_t fixed_key_len;
uint64_t column_family_id;
const char* column_family_name;
const char* filter_policy_name;
const char* comparator_name;
const char* merge_operator_name;
const char* prefix_extractor_name;
const char* property_collectors_names;
const char* compression_name;
crocksdb_shallow_strings_map_t user_collected_properties;
crocksdb_shallow_strings_map_t readable_properties;
void FromProperties(const std::shared_ptr<const TableProperties> props) {
data_size = props->data_size;
index_size = props->index_size;
filter_size = props->filter_size;
raw_key_size = props->raw_key_size;
raw_value_size = props->raw_value_size;
num_data_blocks = props->num_data_blocks;
num_entries = props->num_entries;
format_version = props->format_version;
fixed_key_len = props->fixed_key_len;
column_family_id = props->column_family_id;
column_family_name = props->column_family_name.c_str();
filter_policy_name = props->filter_policy_name.c_str();
comparator_name = props->comparator_name.c_str();
merge_operator_name = props->merge_operator_name.c_str();
prefix_extractor_name = props->prefix_extractor_name.c_str();
property_collectors_names = props->property_collectors_names.c_str();
compression_name = props->compression_name.c_str();
user_collected_properties.assign(props->user_collected_properties);
readable_properties.assign(props->readable_properties);
}
};
struct crocksdb_table_properties_collection_t {
TablePropertiesCollection* inner = nullptr;
size_t size = 0;
const char** keys = nullptr;
crocksdb_table_properties_t* values = nullptr;
~crocksdb_table_properties_collection_t() {
delete inner;
delete[] keys;
delete[] values;
}
void FromProperties(TablePropertiesCollection* props) {
inner = props;
size = props->size();
keys = new const char* [size];
values = new crocksdb_table_properties_t [size];
size_t i = 0;
for (auto it = props->begin(); it != props->end(); it++, i++) {
keys[i] = it->first.c_str();
values[i].FromProperties(it->second);
}
}
};
crocksdb_table_properties_collection_t*
crocksdb_table_properties_collection_create() {
return new crocksdb_table_properties_collection_t;
}
void crocksdb_table_properties_collection_destroy(
crocksdb_table_properties_collection_t* collection) {
delete collection;
}
void crocksdb_get_properties_of_all_tables(crocksdb_t* db,
crocksdb_table_properties_collection_t* collection, char** errptr) {
auto props = std::unique_ptr<TablePropertiesCollection>(new TablePropertiesCollection);
auto s = db->rep->GetPropertiesOfAllTables(props.get());
if (!s.ok()) {
SaveError(errptr, s);
return;
}
collection->FromProperties(props.release());
}
void crocksdb_get_properties_of_all_tables_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* cf,
crocksdb_table_properties_collection_t* collection, char** errptr) {
auto props = std::unique_ptr<TablePropertiesCollection>(new TablePropertiesCollection);
auto s = db->rep->GetPropertiesOfAllTables(cf->rep, props.get());
if (!s.ok()) {
SaveError(errptr, s);
return;
}
collection->FromProperties(props.release());
}
void crocksdb_get_properties_of_tables_in_range(
crocksdb_t* db, crocksdb_column_family_handle_t* cf,
int num_ranges,
const char* const* start_keys, const size_t* start_keys_lens,
const char* const* limit_keys, const size_t* limit_keys_lens,
crocksdb_table_properties_collection_t* collection, char** errptr) {
std::vector<Range> ranges;
for (int i = 0; i < num_ranges; i++) {
ranges.emplace_back(Range(Slice(start_keys[i], start_keys_lens[i]),
Slice(limit_keys[i], limit_keys_lens[i])));
}
auto props = std::unique_ptr<TablePropertiesCollection>(new TablePropertiesCollection);
auto s = db->rep->GetPropertiesOfTablesInRange(cf->rep, &ranges[0], ranges.size(), props.get());
if (!s.ok()) {
SaveError(errptr, s);
return;
}
collection->FromProperties(props.release());
}
} // end extern "C" } // end extern "C"
...@@ -111,6 +111,14 @@ typedef struct crocksdb_ingestexternalfileoptions_t crocksdb_ingestexternalfileo ...@@ -111,6 +111,14 @@ typedef struct crocksdb_ingestexternalfileoptions_t crocksdb_ingestexternalfileo
typedef struct crocksdb_sstfilewriter_t crocksdb_sstfilewriter_t; typedef struct crocksdb_sstfilewriter_t crocksdb_sstfilewriter_t;
typedef struct crocksdb_ratelimiter_t crocksdb_ratelimiter_t; typedef struct crocksdb_ratelimiter_t crocksdb_ratelimiter_t;
typedef struct crocksdb_pinnableslice_t crocksdb_pinnableslice_t; typedef struct crocksdb_pinnableslice_t crocksdb_pinnableslice_t;
typedef struct crocksdb_user_collected_properties_t
crocksdb_user_collected_properties_t;
typedef struct crocksdb_table_properties_collection_t
crocksdb_table_properties_collection_t;
typedef struct crocksdb_table_properties_collector_factory_t
crocksdb_table_properties_collector_factory_t;
typedef struct crocksdb_table_properties_collector_factory_context_t
crocksdb_table_properties_collector_factory_context_t;
/* DB operations */ /* DB operations */
...@@ -1173,6 +1181,48 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_pinnableslice_destroy( ...@@ -1173,6 +1181,48 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_pinnableslice_destroy(
extern C_ROCKSDB_LIBRARY_API const char* crocksdb_pinnableslice_value( extern C_ROCKSDB_LIBRARY_API const char* crocksdb_pinnableslice_value(
const crocksdb_pinnableslice_t* t, size_t* vlen); const crocksdb_pinnableslice_t* t, size_t* vlen);
/* Table Properties */
extern C_ROCKSDB_LIBRARY_API void
crocksdb_user_collected_properties_add(
crocksdb_user_collected_properties_t* props,
const char* key, size_t key_len, const char* value, size_t value_len);
extern C_ROCKSDB_LIBRARY_API crocksdb_table_properties_collector_factory_t*
crocksdb_table_properties_collector_factory_create(
crocksdb_table_properties_collector_factory_context_t* context);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_table_properties_collector_factory_destroy(
crocksdb_table_properties_collector_factory_t* factory);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_options_add_table_properties_collector_factory(
crocksdb_options_t* opt, crocksdb_table_properties_collector_factory_t* factory);
extern C_ROCKSDB_LIBRARY_API crocksdb_table_properties_collection_t*
crocksdb_table_properties_collection_create();
extern C_ROCKSDB_LIBRARY_API void
crocksdb_table_properties_collection_destroy(crocksdb_table_properties_collection_t*);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_get_propeties_of_all_tables(crocksdb_t* db,
crocksdb_table_properties_collection_t* props, char** errptr);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_get_propeties_of_all_tables_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* cf,
crocksdb_table_properties_collection_t* props, char** errptr);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_get_propeties_of_tables_in_range(
crocksdb_t* db, crocksdb_column_family_handle_t* cf,
int num_ranges,
const char* const* start_keys, const size_t* start_keys_lens,
const char* const* limit_keys, const size_t* limit_keys_lens,
crocksdb_table_properties_collection_t* props, char** errptr);
#ifdef __cplusplus #ifdef __cplusplus
} /* end extern "C" */ } /* end extern "C" */
#endif #endif
......
...@@ -17,7 +17,7 @@ extern crate libc; ...@@ -17,7 +17,7 @@ extern crate libc;
#[cfg(test)] #[cfg(test)]
extern crate tempdir; extern crate tempdir;
use libc::{c_char, c_uchar, c_int, c_void, size_t, uint64_t, c_double}; use libc::{c_char, c_uchar, c_int, c_void, size_t, uint8_t, uint64_t, c_double};
use std::ffi::CStr; use std::ffi::CStr;
pub enum DBOptions {} pub enum DBOptions {}
...@@ -45,6 +45,9 @@ pub enum DBRateLimiter {} ...@@ -45,6 +45,9 @@ pub enum DBRateLimiter {}
pub enum DBLogger {} pub enum DBLogger {}
pub enum DBCompactOptions {} pub enum DBCompactOptions {}
pub enum DBPinnableSlice {} pub enum DBPinnableSlice {}
pub enum DBUserCollectedProperties {}
pub enum DBTablePropertiesCollection {}
pub enum DBTablePropertiesCollectorFactory {}
pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy { pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy {
unsafe { crocksdb_filterpolicy_create_bloom(bits) } unsafe { crocksdb_filterpolicy_create_bloom(bits) }
...@@ -54,6 +57,16 @@ pub fn new_cache(capacity: size_t) -> *mut DBCache { ...@@ -54,6 +57,16 @@ pub fn new_cache(capacity: size_t) -> *mut DBCache {
unsafe { crocksdb_cache_create_lru(capacity) } unsafe { crocksdb_cache_create_lru(capacity) }
} }
#[repr(C)]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum DBEntryType {
DBPut = 0,
DBDelete = 1,
DBSingleDelete = 2,
DBMerge = 3,
DBOther = 4,
}
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
#[repr(C)] #[repr(C)]
pub enum DBCompressionType { pub enum DBCompressionType {
...@@ -806,6 +819,45 @@ extern "C" { ...@@ -806,6 +819,45 @@ extern "C" {
valLen: *mut size_t) valLen: *mut size_t)
-> *const u8; -> *const u8;
pub fn crocksdb_pinnableslice_destroy(v: *mut DBPinnableSlice); pub fn crocksdb_pinnableslice_destroy(v: *mut DBPinnableSlice);
pub fn crocksdb_user_collected_properties_add(props: *mut c_void,
key: *const uint8_t,
key_len: size_t,
value: *const uint8_t,
value_len: size_t);
pub fn crocksdb_table_properties_collector_factory_create
(context: *mut c_void)
-> *mut DBTablePropertiesCollectorFactory;
pub fn crocksdb_table_properties_collector_factory_destroy(
factory: *mut DBTablePropertiesCollectorFactory);
pub fn crocksdb_options_add_table_properties_collector_factory(
options: *mut DBOptions, factory: *mut DBTablePropertiesCollectorFactory);
pub fn crocksdb_table_properties_collection_create() -> *mut DBTablePropertiesCollection;
pub fn crocksdb_table_properties_collection_destroy(c: *mut DBTablePropertiesCollection);
pub fn crocksdb_get_properties_of_all_tables(db: *mut DBInstance,
props: *mut DBTablePropertiesCollection,
errptr: *mut *mut c_char);
pub fn crocksdb_get_properties_of_all_tables_cf(db: *mut DBInstance,
cf: *mut DBCFHandle,
props: *mut DBTablePropertiesCollection,
errptr: *mut *mut c_char);
pub fn crocksdb_get_properties_of_tables_in_range(db: *mut DBInstance,
cf: *mut DBCFHandle,
num_ranges: c_int,
start_keys: *const *const uint8_t,
start_keys_lens: *const size_t,
limit_keys: *const *const uint8_t,
limit_keys_lens: *const size_t,
props: *mut DBTablePropertiesCollection,
errptr: *mut *mut c_char);
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -26,14 +26,20 @@ pub mod merge_operator; ...@@ -26,14 +26,20 @@ pub mod merge_operator;
pub mod comparator; pub mod comparator;
mod compaction_filter; mod compaction_filter;
mod slice_transform; mod slice_transform;
mod table_properties;
mod table_properties_collector;
mod table_properties_collector_factory;
pub use compaction_filter::CompactionFilter; pub use compaction_filter::CompactionFilter;
pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType, DBRecoveryMode, DBInfoLogLevel, pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType, DBRecoveryMode, DBInfoLogLevel,
DBStatisticsTickerType, DBStatisticsHistogramType, new_bloom_filter, DBStatisticsTickerType, DBStatisticsHistogramType, new_bloom_filter,
CompactionPriority, self as crocksdb_ffi}; CompactionPriority, DBEntryType, self as crocksdb_ffi};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, CFHandle, Range, pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, CFHandle, Range,
BackupEngine, SstFileWriter}; BackupEngine, SstFileWriter};
pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions, WriteOptions, RestoreOptions, pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions, WriteOptions, RestoreOptions,
IngestExternalFileOptions, EnvOptions, HistogramData, CompactOptions}; IngestExternalFileOptions, EnvOptions, HistogramData, CompactOptions};
pub use slice_transform::SliceTransform; pub use slice_transform::SliceTransform;
pub use table_properties::{TableProperties, TablePropertiesCollection};
pub use table_properties_collector::TablePropertiesCollector;
pub use table_properties_collector_factory::TablePropertiesCollectorFactory;
...@@ -26,6 +26,7 @@ use std::fmt::{self, Debug, Formatter}; ...@@ -26,6 +26,7 @@ use std::fmt::{self, Debug, Formatter};
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::str::from_utf8; use std::str::from_utf8;
use table_properties::{TablePropertiesCollection, TablePropertiesCollectionHandle};
const DEFAULT_COLUMN_FAMILY: &'static str = "default"; const DEFAULT_COLUMN_FAMILY: &'static str = "default";
...@@ -1037,6 +1038,46 @@ impl DB { ...@@ -1037,6 +1038,46 @@ impl DB {
pub fn get_block_cache_usage_cf(&self, cf: &CFHandle) -> u64 { pub fn get_block_cache_usage_cf(&self, cf: &CFHandle) -> u64 {
self.get_options_cf(cf).get_block_cache_usage() self.get_options_cf(cf).get_block_cache_usage()
} }
pub fn get_properties_of_all_tables(&self) -> Result<TablePropertiesCollection, String> {
unsafe {
let props = TablePropertiesCollectionHandle::new();
ffi_try!(crocksdb_get_properties_of_all_tables(self.inner, props.inner));
props.normalize()
}
}
pub fn get_properties_of_all_tables_cf(&self,
cf: &CFHandle)
-> Result<TablePropertiesCollection, String> {
unsafe {
let props = TablePropertiesCollectionHandle::new();
ffi_try!(crocksdb_get_properties_of_all_tables_cf(self.inner, cf.inner, props.inner));
props.normalize()
}
}
pub fn get_properties_of_tables_in_range(&self,
cf: &CFHandle,
ranges: &[Range])
-> Result<TablePropertiesCollection, String> {
let start_keys: Vec<*const u8> = ranges.iter().map(|x| x.start_key.as_ptr()).collect();
let start_keys_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
let limit_keys: Vec<*const u8> = ranges.iter().map(|x| x.end_key.as_ptr()).collect();
let limit_keys_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
unsafe {
let props = TablePropertiesCollectionHandle::new();
ffi_try!(crocksdb_get_properties_of_tables_in_range(self.inner,
cf.inner,
ranges.len() as i32,
start_keys.as_ptr(),
start_keys_lens.as_ptr(),
limit_keys.as_ptr(),
limit_keys_lens.as_ptr(),
props.inner));
props.normalize()
}
}
} }
impl Writable for DB { impl Writable for DB {
......
...@@ -26,6 +26,8 @@ use merge_operator::MergeFn; ...@@ -26,6 +26,8 @@ use merge_operator::MergeFn;
use slice_transform::{SliceTransform, new_slice_transform}; use slice_transform::{SliceTransform, new_slice_transform};
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::mem; use std::mem;
use table_properties_collector_factory::{TablePropertiesCollectorFactory,
new_table_properties_collector_factory_context};
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct HistogramData { pub struct HistogramData {
...@@ -379,6 +381,16 @@ impl Options { ...@@ -379,6 +381,16 @@ impl Options {
} }
} }
pub fn add_table_properties_collector_factory(&mut self,
factory: Box<TablePropertiesCollectorFactory>) {
unsafe {
let context = new_table_properties_collector_factory_context(factory);
let context = Box::into_raw(context) as *mut c_void;
let f = crocksdb_ffi::crocksdb_table_properties_collector_factory_create(context);
crocksdb_ffi::crocksdb_options_add_table_properties_collector_factory(self.inner, f);
}
}
pub fn create_if_missing(&mut self, create_if_missing: bool) { pub fn create_if_missing(&mut self, create_if_missing: bool) {
unsafe { unsafe {
crocksdb_ffi::crocksdb_options_set_create_if_missing(self.inner, create_if_missing); crocksdb_ffi::crocksdb_options_set_create_if_missing(self.inner, create_if_missing);
......
use crocksdb_ffi::{self, DBTablePropertiesCollection};
use libc::{c_void, c_char, size_t, uint64_t};
use std::collections::HashMap;
use std::ffi::CStr;
use std::slice;
#[allow(dead_code)]
#[derive(Debug)]
pub struct TableProperties {
pub data_size: u64,
pub index_size: u64,
pub filter_size: u64,
pub raw_key_size: u64,
pub raw_value_size: u64,
pub num_data_blocks: u64,
pub num_entries: u64,
pub format_version: u64,
pub fixed_key_len: u64,
pub column_family_id: u64,
pub column_family_name: String,
pub filter_policy_name: String,
pub comparator_name: String,
pub merge_operator_name: String,
pub prefix_extractor_name: String,
pub property_collectors_names: String,
pub compression_name: String,
pub user_collected_properties: HashMap<Vec<u8>, Vec<u8>>,
pub readable_properties: HashMap<String, String>,
}
pub type TablePropertiesCollection = HashMap<String, TableProperties>;
fn ptr_to_string(ptr: *const c_char) -> Result<String, String> {
unsafe {
match CStr::from_ptr(ptr).to_str() {
Ok(s) => Ok(s.to_owned()),
Err(e) => Err(format!("{}", e)),
}
}
}
#[repr(C)]
struct CShallowStringsMap {
size: size_t,
keys: *const *const c_char,
keys_lens: *const size_t,
values: *const *const c_char,
values_lens: *const size_t,
}
impl CShallowStringsMap {
fn to_bytes_map(&self) -> Result<HashMap<Vec<u8>, Vec<u8>>, String> {
let mut res = HashMap::new();
unsafe {
let keys = slice::from_raw_parts(self.keys, self.size);
let keys_lens = slice::from_raw_parts(self.keys_lens, self.size);
let values = slice::from_raw_parts(self.values, self.size);
let values_lens = slice::from_raw_parts(self.values_lens, self.size);
for i in 0..self.size {
let k = slice::from_raw_parts(keys[i] as *const u8, keys_lens[i]);
let v = slice::from_raw_parts(values[i] as *const u8, values_lens[i]);
res.insert(k.to_owned(), v.to_owned());
}
}
Ok(res)
}
fn to_strings_map(&self) -> Result<HashMap<String, String>, String> {
let mut res = HashMap::new();
unsafe {
let keys = slice::from_raw_parts(self.keys, self.size);
let values = slice::from_raw_parts(self.values, self.size);
for i in 0..self.size {
let k = ptr_to_string(keys[i])?;
let v = ptr_to_string(values[i])?;
res.insert(k, v);
}
}
Ok(res)
}
}
#[repr(C)]
struct TablePropertiesContext {
data_size: uint64_t,
index_size: uint64_t,
filter_size: uint64_t,
raw_key_size: uint64_t,
raw_value_size: uint64_t,
num_data_blocks: uint64_t,
num_entries: uint64_t,
format_version: uint64_t,
fixed_key_len: uint64_t,
column_family_id: uint64_t,
column_family_name: *const c_char,
filter_policy_name: *const c_char,
comparator_name: *const c_char,
merge_operator_name: *const c_char,
prefix_extractor_name: *const c_char,
property_collectors_names: *const c_char,
compression_name: *const c_char,
user_collected_properties: CShallowStringsMap,
readable_properties: CShallowStringsMap,
}
#[repr(C)]
pub struct TablePropertiesCollectionContext {
inner: *mut c_void,
size: size_t,
keys: *const *const c_char,
values: *const TablePropertiesContext,
}
pub struct TablePropertiesCollectionHandle {
pub inner: *mut DBTablePropertiesCollection,
}
impl Drop for TablePropertiesCollectionHandle {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_table_properties_collection_destroy(self.inner);
}
}
}
impl TablePropertiesCollectionHandle {
pub fn new() -> TablePropertiesCollectionHandle {
unsafe {
TablePropertiesCollectionHandle {
inner: crocksdb_ffi::crocksdb_table_properties_collection_create(),
}
}
}
pub fn normalize(&self) -> Result<TablePropertiesCollection, String> {
let mut collection = TablePropertiesCollection::new();
unsafe {
let ctx = &*(self.inner as *mut TablePropertiesCollectionContext);
let keys = slice::from_raw_parts(ctx.keys, ctx.size);
let values = slice::from_raw_parts(ctx.values, ctx.size);
for i in 0..ctx.size {
let props = &values[i];
let k = ptr_to_string(keys[i])?;
let v = TableProperties {
data_size: props.data_size,
index_size: props.index_size,
filter_size: props.filter_size,
raw_key_size: props.raw_key_size,
raw_value_size: props.raw_value_size,
num_data_blocks: props.num_data_blocks,
num_entries: props.num_entries,
format_version: props.format_version,
fixed_key_len: props.fixed_key_len,
column_family_id: props.column_family_id,
column_family_name: ptr_to_string(props.column_family_name)?,
filter_policy_name: ptr_to_string(props.filter_policy_name)?,
comparator_name: ptr_to_string(props.comparator_name)?,
merge_operator_name: ptr_to_string(props.merge_operator_name)?,
prefix_extractor_name: ptr_to_string(props.prefix_extractor_name)?,
property_collectors_names: ptr_to_string(props.property_collectors_names)?,
compression_name: ptr_to_string(props.compression_name)?,
user_collected_properties: props.user_collected_properties.to_bytes_map()?,
readable_properties: props.readable_properties.to_strings_map()?,
};
collection.insert(k, v);
}
}
Ok(collection)
}
}
use crocksdb_ffi::{self, DBEntryType};
use libc::{c_void, c_char, c_int, uint8_t, uint64_t, size_t};
use std::collections::HashMap;
use std::mem;
use std::slice;
pub trait TablePropertiesCollector {
fn name(&self) -> &str;
fn add_userkey(&mut self, key: &[u8], value: &[u8], entry_type: DBEntryType);
fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>>;
fn readable_properties(&self) -> HashMap<String, String>;
}
#[repr(C)]
pub struct TablePropertiesCollectorContext {
collector: *mut c_void,
name: extern "C" fn(*mut c_void) -> *const c_char,
destructor: extern "C" fn(*mut c_void),
add_userkey: extern "C" fn(*mut c_void,
*const uint8_t,
size_t,
*const uint8_t,
size_t,
c_int,
uint64_t,
uint64_t),
finish: extern "C" fn(*mut c_void, *mut c_void),
readable_properties: extern "C" fn(*mut c_void, *mut c_void),
}
extern "C" fn name(context: *mut c_void) -> *const c_char {
unsafe {
let context = &mut *(context as *mut TablePropertiesCollectorContext);
let collector = &mut *(context.collector as *mut Box<TablePropertiesCollector>);
collector.name().as_ptr() as *const c_char
}
}
extern "C" fn destructor(context: *mut c_void) {
unsafe {
let context = Box::from_raw(context as *mut TablePropertiesCollectorContext);
Box::from_raw(context.collector as *mut Box<TablePropertiesCollector>);
}
}
pub extern "C" fn add_userkey(context: *mut c_void,
key: *const uint8_t,
key_len: size_t,
value: *const uint8_t,
value_len: size_t,
entry_type: c_int,
_: uint64_t,
_: uint64_t) {
unsafe {
let context = &mut *(context as *mut TablePropertiesCollectorContext);
let collector = &mut *(context.collector as *mut Box<TablePropertiesCollector>);
let key = slice::from_raw_parts(key, key_len);
let value = slice::from_raw_parts(value, value_len);
collector.add_userkey(key, value, mem::transmute(entry_type))
}
}
pub extern "C" fn finish(context: *mut c_void, props: *mut c_void) {
unsafe {
let context = &mut *(context as *mut TablePropertiesCollectorContext);
let collector = &mut *(context.collector as *mut Box<TablePropertiesCollector>);
for (key, value) in collector.finish() {
crocksdb_ffi::crocksdb_user_collected_properties_add(props,
key.as_ptr(),
key.len(),
value.as_ptr(),
value.len());
}
}
}
pub extern "C" fn readable_properties(context: *mut c_void, props: *mut c_void) {
unsafe {
let context = &mut *(context as *mut TablePropertiesCollectorContext);
let collector = &mut *(context.collector as *mut Box<TablePropertiesCollector>);
for (key, value) in collector.readable_properties() {
crocksdb_ffi::crocksdb_user_collected_properties_add(props,
key.as_ptr(),
key.len(),
value.as_ptr(),
value.len());
}
}
}
pub unsafe fn new_table_properties_collector_context(collector: Box<TablePropertiesCollector>)
-> Box<TablePropertiesCollectorContext> {
Box::new(TablePropertiesCollectorContext {
collector: Box::into_raw(Box::new(collector)) as *mut c_void,
name: name,
destructor: destructor,
add_userkey: add_userkey,
finish: finish,
readable_properties: readable_properties,
})
}
use libc::{c_void, c_char, uint32_t};
use table_properties_collector::{TablePropertiesCollector, new_table_properties_collector_context};
pub trait TablePropertiesCollectorFactory {
fn name(&self) -> &str;
fn create_table_properties_collector(&mut self, cf: u32) -> Box<TablePropertiesCollector>;
}
#[repr(C)]
pub struct TablePropertiesCollectorFactoryContext {
factory: *mut c_void,
name: extern "C" fn(*mut c_void) -> *const c_char,
destructor: extern "C" fn(*mut c_void),
create_table_properties_collector: extern "C" fn(*mut c_void, uint32_t) -> *mut c_void,
}
extern "C" fn name(context: *mut c_void) -> *const c_char {
unsafe {
let context = &mut *(context as *mut TablePropertiesCollectorFactoryContext);
let factory = &mut *(context.factory as *mut Box<TablePropertiesCollectorFactory>);
factory.name().as_ptr() as *const c_char
}
}
extern "C" fn destructor(context: *mut c_void) {
unsafe {
let context = Box::from_raw(context as *mut TablePropertiesCollectorFactoryContext);
Box::from_raw(context.factory as *mut Box<TablePropertiesCollectorFactory>);
}
}
extern "C" fn create_table_properties_collector(context: *mut c_void, cf: uint32_t) -> *mut c_void {
unsafe {
let context = &mut *(context as *mut TablePropertiesCollectorFactoryContext);
let factory = &mut *(context.factory as *mut Box<TablePropertiesCollectorFactory>);
let collector = factory.create_table_properties_collector(cf);
Box::into_raw(new_table_properties_collector_context(collector)) as *mut c_void
}
}
pub unsafe fn new_table_properties_collector_factory_context
(factory: Box<TablePropertiesCollectorFactory>)
-> Box<TablePropertiesCollectorFactoryContext> {
Box::new(TablePropertiesCollectorFactoryContext {
factory: Box::into_raw(Box::new(factory)) as *mut c_void,
name: name,
destructor: destructor,
create_table_properties_collector: create_table_properties_collector,
})
}
extern crate rocksdb; extern crate rocksdb;
extern crate tempdir; extern crate tempdir;
extern crate byteorder;
mod test_iterator; mod test_iterator;
mod test_multithreaded; mod test_multithreaded;
...@@ -11,3 +12,4 @@ mod test_ingest_external_file; ...@@ -11,3 +12,4 @@ mod test_ingest_external_file;
mod test_slice_transform; mod test_slice_transform;
mod test_prefix_extractor; mod test_prefix_extractor;
mod test_statistics; mod test_statistics;
mod test_table_properties;
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use rocksdb::{DB, Range, Options, Writable, DBEntryType, TablePropertiesCollection,
TablePropertiesCollector, TablePropertiesCollectorFactory};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt;
use std::io::Cursor;
use tempdir::TempDir;
enum Props {
NumKeys = 0,
NumPuts,
NumMerges,
NumDeletes,
}
fn encode_u32(x: u32) -> Vec<u8> {
let mut w = Vec::new();
w.write_u32::<LittleEndian>(x).unwrap();
w
}
fn decode_u32(x: &[u8]) -> u32 {
let mut r = Cursor::new(x);
r.read_u32::<LittleEndian>().unwrap()
}
struct ExampleCollector {
num_keys: u32,
num_puts: u32,
num_merges: u32,
num_deletes: u32,
last_key: Vec<u8>,
}
impl ExampleCollector {
fn new() -> ExampleCollector {
ExampleCollector {
num_keys: 0,
num_puts: 0,
num_merges: 0,
num_deletes: 0,
last_key: Vec::new(),
}
}
fn add(&mut self, other: &ExampleCollector) {
self.num_keys += other.num_keys;
self.num_puts += other.num_puts;
self.num_merges += other.num_merges;
self.num_deletes += other.num_deletes;
}
fn encode(&self) -> HashMap<Vec<u8>, Vec<u8>> {
let mut props = HashMap::new();
props.insert(vec![Props::NumKeys as u8], encode_u32(self.num_keys));
props.insert(vec![Props::NumPuts as u8], encode_u32(self.num_puts));
props.insert(vec![Props::NumMerges as u8], encode_u32(self.num_merges));
props.insert(vec![Props::NumDeletes as u8], encode_u32(self.num_deletes));
props
}
fn decode(props: &HashMap<Vec<u8>, Vec<u8>>) -> ExampleCollector {
let mut c = ExampleCollector::new();
c.num_keys = decode_u32(props.get(&vec![Props::NumKeys as u8]).unwrap());
c.num_puts = decode_u32(props.get(&vec![Props::NumPuts as u8]).unwrap());
c.num_merges = decode_u32(props.get(&vec![Props::NumMerges as u8]).unwrap());
c.num_deletes = decode_u32(props.get(&vec![Props::NumDeletes as u8]).unwrap());
c
}
}
impl fmt::Display for ExampleCollector {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f,
"keys={}, puts={}, merges={}, deletes={}",
self.num_keys,
self.num_puts,
self.num_merges,
self.num_deletes)
}
}
impl TablePropertiesCollector for ExampleCollector {
fn name(&self) -> &str {
"example-collector"
}
fn add_userkey(&mut self, key: &[u8], _: &[u8], entry_type: DBEntryType) {
if key.cmp(&self.last_key) != Ordering::Equal {
self.num_keys += 1;
self.last_key.clear();
self.last_key.extend_from_slice(key);
}
match entry_type {
DBEntryType::DBPut => self.num_puts += 1,
DBEntryType::DBMerge => self.num_merges += 1,
DBEntryType::DBDelete |
DBEntryType::DBSingleDelete => self.num_deletes += 1,
DBEntryType::DBOther => {}
}
}
fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>> {
self.encode()
}
fn readable_properties(&self) -> HashMap<String, String> {
let mut props = HashMap::new();
props.insert("num_keys".to_owned(), self.num_keys.to_string());
props.insert("num_puts".to_owned(), self.num_puts.to_string());
props.insert("num_merges".to_owned(), self.num_merges.to_string());
props.insert("num_deletes".to_owned(), self.num_deletes.to_string());
props
}
}
struct ExampleFactory {}
impl ExampleFactory {
fn new() -> ExampleFactory {
ExampleFactory {}
}
}
impl TablePropertiesCollectorFactory for ExampleFactory {
fn name(&self) -> &str {
"example-factory"
}
fn create_table_properties_collector(&mut self, _: u32) -> Box<TablePropertiesCollector> {
Box::new(ExampleCollector::new())
}
}
fn check_collection(collection: &TablePropertiesCollection,
num_files: u32,
num_keys: u32,
num_puts: u32,
num_merges: u32,
num_deletes: u32) {
let mut res = ExampleCollector::new();
for (_, props) in collection {
assert_eq!(props.property_collectors_names, "[example-factory]");
res.add(&ExampleCollector::decode(&props.user_collected_properties));
}
assert_eq!(collection.len() as u32, num_files);
assert_eq!(res.num_keys, num_keys);
assert_eq!(res.num_puts, num_puts);
assert_eq!(res.num_merges, num_merges);
assert_eq!(res.num_deletes, num_deletes);
}
#[test]
fn test_table_properties_collector_factory() {
let mut opts = Options::new();
opts.create_if_missing(true);
opts.add_table_properties_collector_factory(Box::new(ExampleFactory::new()));
let path = TempDir::new("_rust_rocksdb_collectortest").expect("");
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let samples = vec![(b"key1".to_vec(), b"value1".to_vec()),
(b"key2".to_vec(), b"value2".to_vec()),
(b"key3".to_vec(), b"value3".to_vec()),
(b"key4".to_vec(), b"value4".to_vec())];
// Put 4 keys.
for &(ref k, ref v) in &samples {
db.put(k, v).unwrap();
assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
}
db.flush(true).unwrap();
let collection = db.get_properties_of_all_tables().unwrap();
check_collection(&collection, 1, 4, 4, 0, 0);
// Delete 2 keys.
let cf = db.cf_handle("default").unwrap();
for &(ref k, _) in &samples[0..2] {
db.delete_cf(cf, k).unwrap();
}
db.flush_cf(cf, true).unwrap();
let collection = db.get_properties_of_all_tables_cf(cf).unwrap();
check_collection(&collection, 2, 6, 4, 0, 2);
// ["key2", "key3") covers two sst files.
let range = Range::new(b"key2", b"key3");
let collection = db.get_properties_of_tables_in_range(cf, &[range]).unwrap();
check_collection(&collection, 2, 6, 4, 0, 2);
// ["key3", "key4") covers only the first sst file.
let range = Range::new(b"key3", b"key4");
let collection = db.get_properties_of_tables_in_range(cf, &[range]).unwrap();
check_collection(&collection, 1, 4, 4, 0, 0);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment