Commit eb3cc66d authored by dorianzheng's avatar dorianzheng Committed by Huachao Huang

Introduce a new storage engine: Titan (#256)

parent c912ea07
[submodule "rocksdb"]
path = librocksdb_sys/rocksdb
url = https://github.com/pingcap/rocksdb.git
branch = release-5.15
branch = titan-5.15
......@@ -45,6 +45,10 @@
#include "util/file_reader_writer.h"
#include "util/coding.h"
#include "rocksdb/utilities/titandb/db.h"
#include "utilities/titandb/blob_format.h"
#include "utilities/titandb/options.h"
#include <stdlib.h>
#if !defined(ROCKSDB_MAJOR) || !defined(ROCKSDB_MINOR) || !defined(ROCKSDB_PATCH)
......@@ -155,6 +159,13 @@ using rocksdb::IOStatsContext;
using rocksdb::BottommostLevelCompaction;
using rocksdb::LDBTool;
using rocksdb::titandb::BlobIndex;
using rocksdb::titandb::TitanCFDescriptor;
using rocksdb::titandb::TitanCFOptions;
using rocksdb::titandb::TitanDB;
using rocksdb::titandb::TitanDBOptions;
using rocksdb::titandb::TitanOptions;
using std::shared_ptr;
extern "C" {
......@@ -4852,4 +4863,147 @@ void crocksdb_run_ldb_tool(int argc, char** argv) {
LDBTool().Run(argc, argv);
}
/* Titan */
struct ctitandb_options_t {
TitanOptions rep;
};
crocksdb_t* ctitandb_open_column_families(
const char* name, const crocksdb_options_t* db_options,
const ctitandb_options_t* tdb_options, int num_column_families,
const char** column_family_names,
const crocksdb_options_t** column_family_options,
const ctitandb_options_t** titan_column_family_options,
crocksdb_column_family_handle_t** column_family_handles, char** errptr) {
std::vector<TitanCFDescriptor> column_families;
for (int i = 0; i < num_column_families; i++) {
*(ColumnFamilyOptions*)&titan_column_family_options[i]->rep =
column_family_options[i]->rep;
column_families.push_back(
TitanCFDescriptor(std::string(column_family_names[i]),
TitanCFOptions(titan_column_family_options[i]->rep)));
}
TitanDB* db;
std::vector<ColumnFamilyHandle*> handles;
*(DBOptions*)&tdb_options->rep = db_options->rep;
if (SaveError(errptr, TitanDB::Open(tdb_options->rep, std::string(name),
column_families, &handles, &db))) {
return nullptr;
}
for (size_t i = 0; i < handles.size(); i++) {
crocksdb_column_family_handle_t* c_handle =
new crocksdb_column_family_handle_t;
c_handle->rep = handles[i];
column_family_handles[i] = c_handle;
}
crocksdb_t* result = new crocksdb_t;
result->rep = db;
return result;
}
/* TitanDBOptions */
ctitandb_options_t* ctitandb_options_create() { return new ctitandb_options_t; }
void ctitandb_options_destroy(ctitandb_options_t* opts) { delete opts; }
ctitandb_options_t* ctitandb_options_copy(ctitandb_options_t* src) {
if (src == nullptr) {
return nullptr;
}
return new ctitandb_options_t{src->rep};
}
const char* ctitandb_options_dirname(ctitandb_options_t* opts) {
return opts->rep.dirname.c_str();
}
void ctitandb_options_set_dirname(ctitandb_options_t* opts, const char* name) {
opts->rep.dirname = name;
}
uint64_t ctitandb_options_min_blob_size(ctitandb_options_t* opts) {
return opts->rep.min_blob_size;
}
void ctitandb_options_set_min_blob_size(ctitandb_options_t* opts,
uint64_t size) {
opts->rep.min_blob_size = size;
}
int ctitandb_options_blob_file_compression(ctitandb_options_t* opts) {
return opts->rep.blob_file_compression;
}
void ctitandb_options_set_blob_file_compression(ctitandb_options_t* opts,
int type) {
opts->rep.blob_file_compression = static_cast<CompressionType>(type);
}
void ctitandb_decode_blob_index(const char* value, size_t value_size,
ctitandb_blob_index_t* index, char** errptr) {
Slice v(value, value_size);
BlobIndex bi;
if (SaveError(errptr, bi.DecodeFrom(&v))) {
return;
}
index->file_number = bi.file_number;
index->blob_offset = bi.blob_handle.offset;
index->blob_size = bi.blob_handle.size;
}
void ctitandb_options_set_disable_background_gc(ctitandb_options_t* options,
unsigned char disable) {
options->rep.disable_background_gc = disable;
}
void ctitandb_options_set_max_gc_batch_size(ctitandb_options_t* options,
uint64_t size) {
options->rep.max_gc_batch_size = size;
}
void ctitandb_options_set_min_gc_batch_size(ctitandb_options_t* options,
uint64_t size) {
options->rep.min_gc_batch_size = size;
}
void ctitandb_options_set_blob_file_discardable_ratio(
ctitandb_options_t* options, float ratio) {
options->rep.blob_file_discardable_ratio = ratio;
}
void ctitandb_options_set_sample_file_size_ratio(ctitandb_options_t* options,
float ratio) {
options->rep.sample_file_size_ratio = ratio;
}
void ctitandb_options_set_merge_small_file_threshold(
ctitandb_options_t* options, uint64_t size) {
options->rep.merge_small_file_threshold = size;
}
void ctitandb_options_set_max_background_gc(ctitandb_options_t* options,
int32_t size) {
options->rep.max_background_gc = size;
}
void ctitandb_options_set_blob_cache(ctitandb_options_t* options,
crocksdb_cache_t* cache) {
if (cache) {
options->rep.blob_cache = cache->rep;
}
}
void ctitandb_options_set_discardable_ratio(ctitandb_options_t* options,
float ratio) {
options->rep.blob_file_discardable_ratio = ratio;
}
void ctitandb_options_set_sample_ratio(ctitandb_options_t* options,
float ratio) {
options->rep.sample_file_size_ratio = ratio;
}
} // end extern "C"
......@@ -1927,6 +1927,88 @@ crocksdb_iostats_context_logger_nanos(crocksdb_iostats_context_t*);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_run_ldb_tool(int argc, char** argv);
/* Titan */
struct ctitandb_blob_index_t {
uint64_t file_number;
uint64_t blob_offset;
uint64_t blob_size;
};
typedef struct ctitandb_options_t ctitandb_options_t;
typedef struct ctitandb_blob_index_t ctitandb_blob_index_t;
extern C_ROCKSDB_LIBRARY_API crocksdb_t* ctitandb_open_column_families(
const char* name, const crocksdb_options_t* options,
const ctitandb_options_t* tdb_options, int num_column_families,
const char** column_family_names,
const crocksdb_options_t** column_family_options,
const ctitandb_options_t** titan_column_family_options,
crocksdb_column_family_handle_t** column_family_handles, char** errptr);
/* TitanDBOptions */
extern C_ROCKSDB_LIBRARY_API ctitandb_options_t* ctitandb_options_create();
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_destroy(ctitandb_options_t*);
extern C_ROCKSDB_LIBRARY_API ctitandb_options_t* ctitandb_options_copy(
ctitandb_options_t*);
extern C_ROCKSDB_LIBRARY_API const char* ctitandb_options_dirname(
ctitandb_options_t*);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_dirname(
ctitandb_options_t*, const char* name);
extern C_ROCKSDB_LIBRARY_API uint64_t
ctitandb_options_min_blob_size(ctitandb_options_t*);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_min_blob_size(
ctitandb_options_t*, uint64_t size);
extern C_ROCKSDB_LIBRARY_API int ctitandb_options_blob_file_compression(
ctitandb_options_t*);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_blob_file_compression(
ctitandb_options_t*, int type);
extern C_ROCKSDB_LIBRARY_API void ctitandb_decode_blob_index(
const char* value, size_t value_size, ctitandb_blob_index_t* index,
char** errptr);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_disable_background_gc(
ctitandb_options_t* options, unsigned char disable);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_max_gc_batch_size(
ctitandb_options_t* options, uint64_t size);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_min_gc_batch_size(
ctitandb_options_t* options, uint64_t size);
extern C_ROCKSDB_LIBRARY_API void
ctitandb_options_set_blob_file_discardable_ratio(ctitandb_options_t* options,
float ratio);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_sample_file_size_ratio(
ctitandb_options_t* options, float ratio);
extern C_ROCKSDB_LIBRARY_API void
ctitandb_options_set_merge_small_file_threshold(ctitandb_options_t* options,
uint64_t size);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_max_background_gc(
ctitandb_options_t* options, int32_t size);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_blob_cache(
ctitandb_options_t* options, crocksdb_cache_t* cache);
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_discardable_ratio(
ctitandb_options_t* options, float ratio);
extern void ctitandb_options_set_sample_ratio(ctitandb_options_t* options,
float ratio);
#ifdef __cplusplus
} /* end extern "C" */
#endif
......
Subproject commit 84eaf9c23a1a277835608dcb979ae97229391981
Subproject commit 2f9604595bd6ff8ea33b677f8dd8d0778f063c69
......@@ -83,6 +83,16 @@ pub enum WriteStallCondition {
mod generated;
pub use generated::*;
pub enum DBTitanDBOptions {}
#[derive(Clone, Debug, Default)]
#[repr(C)]
pub struct DBTitanBlobIndex {
pub file_number: u64,
pub blob_offset: u64,
pub blob_size: u64,
}
pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy {
unsafe { crocksdb_filterpolicy_create_bloom(bits) }
}
......@@ -103,7 +113,9 @@ pub enum DBEntryType {
Delete = 1,
SingleDelete = 2,
Merge = 3,
Other = 4,
RangeDeletion = 4,
BlobIndex = 5,
Other = 6,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
......@@ -1757,6 +1769,51 @@ extern "C" {
pub fn crocksdb_run_ldb_tool(argc: c_int, argv: *const *const c_char);
}
// Titan
extern "C" {
pub fn ctitandb_open_column_families(
path: *const c_char,
options: *const Options,
titan_options: *const DBTitanDBOptions,
num_column_families: c_int,
column_family_names: *const *const c_char,
column_family_options: *const *const Options,
titan_column_family_options: *const *const DBTitanDBOptions,
column_family_handles: *const *mut DBCFHandle,
err: *mut *mut c_char,
) -> *mut DBInstance;
pub fn ctitandb_options_create() -> *mut DBTitanDBOptions;
pub fn ctitandb_options_destroy(opts: *mut DBTitanDBOptions);
pub fn ctitandb_options_copy(opts: *mut DBTitanDBOptions) -> *mut DBTitanDBOptions;
pub fn ctitandb_options_dirname(opts: *mut DBTitanDBOptions) -> *const c_char;
pub fn ctitandb_options_set_dirname(opts: *mut DBTitanDBOptions, name: *const c_char);
pub fn ctitandb_options_min_blob_size(opts: *mut DBTitanDBOptions) -> u64;
pub fn ctitandb_options_set_min_blob_size(opts: *mut DBTitanDBOptions, size: u64);
pub fn ctitandb_options_blob_file_compression(opts: *mut DBTitanDBOptions)
-> DBCompressionType;
pub fn ctitandb_options_set_blob_file_compression(
opts: *mut DBTitanDBOptions,
t: DBCompressionType,
);
pub fn ctitandb_decode_blob_index(
value: *const u8,
value_size: u64,
index: *mut DBTitanBlobIndex,
errptr: *mut *mut c_char,
);
pub fn ctitandb_options_set_disable_background_gc(opts: *mut DBTitanDBOptions, disable: bool);
pub fn ctitandb_options_set_max_background_gc(opts: *mut DBTitanDBOptions, size: i32);
pub fn ctitandb_options_set_min_gc_batch_size(opts: *mut DBTitanDBOptions, size: u64);
pub fn ctitandb_options_set_max_gc_batch_size(opts: *mut DBTitanDBOptions, size: u64);
pub fn ctitandb_options_set_blob_cache(opts: *mut DBTitanDBOptions, cache: *mut DBCache);
pub fn ctitandb_options_set_discardable_ratio(opts: *mut DBTitanDBOptions, ratio: f64);
pub fn ctitandb_options_set_sample_ratio(opts: *mut DBTitanDBOptions, ratio: f64);
pub fn ctitandb_options_set_merge_small_file_threshold(opts: *mut DBTitanDBOptions, size: u64);
}
#[cfg(test)]
mod test {
use super::*;
......
......@@ -50,6 +50,7 @@ pub use table_properties::{
};
pub use table_properties_collector::TablePropertiesCollector;
pub use table_properties_collector_factory::TablePropertiesCollectorFactory;
pub use titan::{TitanBlobIndex, TitanDBOptions};
mod compaction_filter;
pub mod comparator;
......@@ -64,4 +65,5 @@ mod table_filter;
mod table_properties;
mod table_properties_collector;
mod table_properties_collector_factory;
mod titan;
mod util;
......@@ -466,6 +466,10 @@ impl DB {
.iter()
.map(|x| x.inner as *const crocksdb_ffi::Options)
.collect();
let titan_cf_options: Vec<_> = options
.iter()
.map(|x| x.titan_inner as *const crocksdb_ffi::DBTitanDBOptions)
.collect();
let readonly = if error_if_log_file_exist.is_some() {
true
......@@ -489,8 +493,18 @@ impl DB {
let db_cfs_count = cf_names.len() as c_int;
let db_cf_ptrs = cf_names.as_ptr();
let db_cf_opts = cf_options.as_ptr();
let titan_cf_opts = titan_cf_options.as_ptr();
let db_cf_handles = cf_handles.as_ptr();
let titan_options = opts.titan_inner;
if !titan_options.is_null() {
if error_if_log_file_exist.is_some() {
return Err("TitanDB doesn't support read only mode.".to_owned());
} else if with_ttl {
return Err("TitanDB doesn't support ttl.".to_owned());
}
}
if !with_ttl {
if let Some(flag) = error_if_log_file_exist {
unsafe {
......@@ -504,7 +518,7 @@ impl DB {
flag
))
}
} else {
} else if titan_options.is_null() {
unsafe {
ffi_try!(crocksdb_open_column_families(
db_options,
......@@ -515,6 +529,19 @@ impl DB {
db_cf_handles
))
}
} else {
unsafe {
ffi_try!(ctitandb_open_column_families(
db_path,
db_options,
titan_options,
db_cfs_count,
db_cf_ptrs,
db_cf_opts,
titan_cf_opts,
db_cf_handles
))
}
}
} else {
let ttl_array = ttls_vec.as_ptr() as *const c_int;
......
......@@ -19,7 +19,8 @@ use crocksdb_ffi::{
self, DBBlockBasedTableOptions, DBBottommostLevelCompaction, DBCompactOptions,
DBCompactionOptions, DBCompressionType, DBFifoCompactionOptions, DBFlushOptions,
DBInfoLogLevel, DBInstance, DBRateLimiter, DBReadOptions, DBRecoveryMode, DBRestoreOptions,
DBSnapshot, DBStatisticsHistogramType, DBStatisticsTickerType, DBWriteOptions, Options,
DBSnapshot, DBStatisticsHistogramType, DBStatisticsTickerType, DBTitanDBOptions,
DBWriteOptions, Options,
};
use event_listener::{new_event_listener, EventListener};
use libc::{self, c_double, c_int, c_uchar, c_void, size_t};
......@@ -30,11 +31,13 @@ use slice_transform::{new_slice_transform, SliceTransform};
use std::ffi::{CStr, CString};
use std::mem;
use std::path::Path;
use std::ptr;
use std::sync::Arc;
use table_filter::{destroy_table_filter, table_filter, TableFilter};
use table_properties_collector_factory::{
new_table_properties_collector_factory, TablePropertiesCollectorFactory,
};
use titan::TitanDBOptions;
#[derive(Default, Debug)]
pub struct HistogramData {
......@@ -548,12 +551,16 @@ impl Drop for CompactionOptions {
pub struct DBOptions {
pub inner: *mut Options,
env: Option<Arc<Env>>,
pub titan_inner: *mut DBTitanDBOptions,
}
impl Drop for DBOptions {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_options_destroy(self.inner);
if !self.titan_inner.is_null() {
crocksdb_ffi::ctitandb_options_destroy(self.titan_inner);
}
}
}
}
......@@ -566,6 +573,7 @@ impl Default for DBOptions {
DBOptions {
inner: opts,
env: None,
titan_inner: ptr::null_mut::<DBTitanDBOptions>(),
}
}
}
......@@ -576,9 +584,14 @@ impl Clone for DBOptions {
unsafe {
let opts = crocksdb_ffi::crocksdb_options_copy(self.inner);
assert!(!opts.is_null());
let mut titan_opts = ptr::null_mut::<DBTitanDBOptions>();
if !self.titan_inner.is_null() {
titan_opts = crocksdb_ffi::ctitandb_options_copy(self.titan_inner);
}
DBOptions {
inner: opts,
env: self.env.clone(),
titan_inner: titan_opts,
}
}
}
......@@ -597,9 +610,14 @@ impl DBOptions {
DBOptions {
inner: inner,
env: None,
titan_inner: ptr::null_mut::<DBTitanDBOptions>(),
}
}
pub fn set_titandb_options(&mut self, opts: &TitanDBOptions) {
self.titan_inner = unsafe { crocksdb_ffi::ctitandb_options_copy(opts.inner) }
}
pub fn increase_parallelism(&mut self, parallelism: i32) {
unsafe {
crocksdb_ffi::crocksdb_options_increase_parallelism(self.inner, parallelism);
......@@ -948,6 +966,7 @@ impl DBOptions {
pub struct ColumnFamilyOptions {
pub inner: *mut Options,
pub titan_inner: *mut DBTitanDBOptions,
env: Option<Arc<Env>>,
filter: Option<CompactionFilterHandle>,
}
......@@ -956,6 +975,9 @@ impl Drop for ColumnFamilyOptions {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_options_destroy(self.inner);
if !self.titan_inner.is_null() {
crocksdb_ffi::ctitandb_options_destroy(self.titan_inner);
}
}
}
}
......@@ -970,6 +992,7 @@ impl Default for ColumnFamilyOptions {
);
ColumnFamilyOptions {
inner: opts,
titan_inner: ptr::null_mut::<DBTitanDBOptions>(),
env: None,
filter: None,
}
......@@ -983,8 +1006,13 @@ impl Clone for ColumnFamilyOptions {
unsafe {
let opts = crocksdb_ffi::crocksdb_options_copy(self.inner);
assert!(!opts.is_null());
let mut titan_opts = ptr::null_mut::<DBTitanDBOptions>();
if !self.titan_inner.is_null() {
titan_opts = crocksdb_ffi::ctitandb_options_copy(self.titan_inner);
}
ColumnFamilyOptions {
inner: opts,
titan_inner: titan_opts,
env: self.env.clone(),
filter: None,
}
......@@ -1003,12 +1031,19 @@ impl ColumnFamilyOptions {
"could not new rocksdb options with null inner"
);
ColumnFamilyOptions {
inner: inner,
inner,
titan_inner: ptr::null_mut::<DBTitanDBOptions>(),
env: None,
filter: None,
}
}
pub fn set_titandb_options(&mut self, opts: &TitanDBOptions) {
if !opts.inner.is_null() {
self.titan_inner = unsafe { crocksdb_ffi::ctitandb_options_copy(opts.inner) }
}
}
pub fn optimize_level_style_compaction(&mut self, memtable_memory_budget: i32) {
unsafe {
crocksdb_ffi::crocksdb_options_optimize_level_style_compaction(
......
use std::ffi::{CStr, CString};
use std::ops::Deref;
use crocksdb_ffi::{self, DBCompressionType, DBTitanBlobIndex, DBTitanDBOptions};
use std::os::raw::c_double;
use std::os::raw::c_int;
use std::os::raw::c_uchar;
pub struct TitanDBOptions {
pub inner: *mut DBTitanDBOptions,
}
impl TitanDBOptions {
pub fn new() -> Self {
unsafe {
Self {
inner: crocksdb_ffi::ctitandb_options_create(),
}
}
}
pub fn dirname(&self) -> &str {
unsafe {
let name = crocksdb_ffi::ctitandb_options_dirname(self.inner);
CStr::from_ptr(name).to_str().unwrap()
}
}
pub fn set_dirname(&mut self, name: &str) {
let s = CString::new(name).unwrap();
unsafe {
crocksdb_ffi::ctitandb_options_set_dirname(self.inner, s.into_raw());
}
}
pub fn min_blob_size(&self) -> u64 {
unsafe { crocksdb_ffi::ctitandb_options_min_blob_size(self.inner) }
}
pub fn set_min_blob_size(&mut self, size: u64) {
unsafe {
crocksdb_ffi::ctitandb_options_set_min_blob_size(self.inner, size);
}
}
pub fn blob_file_compression(&self) -> DBCompressionType {
unsafe { crocksdb_ffi::ctitandb_options_blob_file_compression(self.inner) }
}
pub fn set_blob_file_compression(&mut self, t: DBCompressionType) {
unsafe {
crocksdb_ffi::ctitandb_options_set_blob_file_compression(self.inner, t);
}
}
pub fn set_disable_background_gc(&mut self, disable: bool) {
unsafe {
crocksdb_ffi::ctitandb_options_set_disable_background_gc(self.inner, disable);
}
}
pub fn set_max_background_gc(&mut self, size: i32) {
unsafe {
crocksdb_ffi::ctitandb_options_set_max_background_gc(self.inner, size);
}
}
pub fn set_min_gc_batch_size(&mut self, size: u64) {
unsafe {
crocksdb_ffi::ctitandb_options_set_min_gc_batch_size(self.inner, size);
}
}
pub fn set_max_gc_batch_size(&mut self, size: u64) {
unsafe {
crocksdb_ffi::ctitandb_options_set_max_gc_batch_size(self.inner, size);
}
}
pub fn set_blob_cache(
&mut self,
size: usize,
shard_bits: c_int,
capacity_limit: c_uchar,
pri_ratio: c_double,
) {
let cache = crocksdb_ffi::new_cache(size, shard_bits, capacity_limit, pri_ratio);
unsafe {
crocksdb_ffi::ctitandb_options_set_blob_cache(self.inner, cache);
crocksdb_ffi::crocksdb_cache_destroy(cache);
}
}
pub fn set_discardable_ratio(&mut self, ratio: f64) {
unsafe {
crocksdb_ffi::ctitandb_options_set_discardable_ratio(self.inner, ratio);
}
}
pub fn set_sample_ratio(&mut self, ratio: f64) {
unsafe {
crocksdb_ffi::ctitandb_options_set_sample_ratio(self.inner, ratio);
}
}
pub fn set_merge_small_file_threshold(&mut self, size: u64) {
unsafe {
crocksdb_ffi::ctitandb_options_set_merge_small_file_threshold(self.inner, size);
}
}
}
impl Drop for TitanDBOptions {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::ctitandb_options_destroy(self.inner);
}
}
}
#[derive(Debug, Default)]
pub struct TitanBlobIndex {
inner: DBTitanBlobIndex,
}
impl TitanBlobIndex {
pub fn decode_from(value: &[u8]) -> Result<Self, String> {
let mut index = Self::default();
unsafe {
ffi_try!(ctitandb_decode_blob_index(
value.as_ptr(),
value.len() as u64,
&mut index.inner as *mut DBTitanBlobIndex
));
}
Ok(index)
}
}
impl Deref for TitanBlobIndex {
type Target = DBTitanBlobIndex;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
extern crate byteorder;
extern crate crc;
extern crate rand;
extern crate rocksdb;
extern crate tempdir;
mod test_column_family;
mod test_compact_range;
mod test_compaction_filter;
......@@ -16,4 +22,5 @@ mod test_rocksdb_options;
mod test_slice_transform;
mod test_statistics;
mod test_table_properties;
mod test_titan;
mod test_ttl;
......@@ -114,7 +114,7 @@ impl TablePropertiesCollector for ExampleCollector {
DBEntryType::Put => self.num_puts += 1,
DBEntryType::Merge => self.num_merges += 1,
DBEntryType::Delete | DBEntryType::SingleDelete => self.num_deletes += 1,
DBEntryType::Other => {}
_ => {}
}
}
......
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use tempdir::TempDir;
use rocksdb::{
ColumnFamilyOptions, DBCompressionType, DBEntryType, DBOptions, SeekKey,
TablePropertiesCollector, TablePropertiesCollectorFactory, TitanBlobIndex, TitanDBOptions,
UserCollectedProperties, Writable, DB,
};
fn encode_u32(x: u32) -> Vec<u8> {
let mut w = Vec::new();
w.write_u32::<LittleEndian>(x).unwrap();
w
}
fn decode_u32(mut x: &[u8]) -> u32 {
x.read_u32::<LittleEndian>().unwrap()
}
#[derive(Default)]
struct TitanCollector {
num_blobs: u32,
num_entries: u32,
}
impl TitanCollector {
fn add(&mut self, other: &TitanCollector) {
self.num_blobs += other.num_blobs;
self.num_entries += other.num_entries;
}
fn encode(&self) -> HashMap<Vec<u8>, Vec<u8>> {
let mut props = HashMap::new();
props.insert(vec![0], encode_u32(self.num_blobs));
props.insert(vec![1], encode_u32(self.num_entries));
props
}
fn decode(props: &UserCollectedProperties) -> TitanCollector {
let mut c = TitanCollector::default();
c.num_blobs = decode_u32(&props[&[0]]);
c.num_entries = decode_u32(&props[&[1]]);
c
}
}
impl TablePropertiesCollector for TitanCollector {
fn add(&mut self, _: &[u8], value: &[u8], entry_type: DBEntryType, _: u64, _: u64) {
self.num_entries += 1;
if let DBEntryType::BlobIndex = entry_type {
self.num_blobs += 1;
let index = TitanBlobIndex::decode_from(value).unwrap();
assert!(index.file_number > 0);
assert!(index.blob_size > 0);
}
}
fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>> {
self.encode()
}
}
#[derive(Default)]
struct TitanCollectorFactory {}
impl TablePropertiesCollectorFactory for TitanCollectorFactory {
fn create_table_properties_collector(&mut self, _: u32) -> Box<TablePropertiesCollector> {
Box::new(TitanCollector::default())
}
}
fn check_table_properties(db: &DB, num_blobs: u32, num_entries: u32) {
let cf = db.cf_handle("default").unwrap();
let collection = db.get_properties_of_all_tables_cf(cf).unwrap();
let mut res = TitanCollector::default();
let props: HashMap<_, _> = collection.iter().collect();
for (_, v) in &props {
res.add(&TitanCollector::decode(v.user_collected_properties()));
}
assert_eq!(res.num_blobs, num_blobs);
assert_eq!(res.num_entries, num_entries);
}
#[test]
fn test_titandb() {
let max_value_size = 10;
let path = TempDir::new("test_titandb").unwrap();
let tdb_path = path.path().join("titandb");
let mut tdb_opts = TitanDBOptions::new();
tdb_opts.set_dirname(tdb_path.to_str().unwrap());
tdb_opts.set_min_blob_size(max_value_size / 2 + 1);
tdb_opts.set_blob_file_compression(DBCompressionType::No);
tdb_opts.set_disable_background_gc(true);
let mut opts = DBOptions::new();
opts.create_if_missing(true);
opts.set_titandb_options(&tdb_opts);
let mut cf_opts = ColumnFamilyOptions::new();
let f = TitanCollectorFactory::default();
cf_opts.add_table_properties_collector_factory("titan-collector", Box::new(f));
cf_opts.set_titandb_options(&tdb_opts);
let db = DB::open_cf(
opts,
path.path().to_str().unwrap(),
vec![("default", cf_opts)],
)
.unwrap();
let n = 10;
for i in 0..n {
for size in 0..max_value_size {
let k = (i * n + size) as u8;
let v = vec![k; (size + 1) as usize];
db.put(&[k], &v).unwrap();
}
db.flush(true).unwrap();
}
let mut iter = db.iter();
iter.seek(SeekKey::Start);
for i in 0..n {
for j in 0..n {
let k = (i * n + j) as u8;
let v = vec![k; (j + 1) as usize];
assert_eq!(db.get(&[k]).unwrap().unwrap(), &v);
assert!(iter.valid());
assert_eq!(iter.key(), &[k]);
assert_eq!(iter.value(), v.as_slice());
iter.next();
}
}
let num_entries = n as u32 * max_value_size as u32;
check_table_properties(&db, num_entries / 2, num_entries);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment