Commit 5d278e1f authored by Huachao Huang's avatar Huachao Huang Committed by UncP

Add wrapper for CompactFiles (#190)

* Add wrapper for CompactFiles
parent 882bf4ec
......@@ -134,6 +134,7 @@ using rocksdb::VectorRepFactory;
using rocksdb::ColumnFamilyMetaData;
using rocksdb::LevelMetaData;
using rocksdb::SstFileMetaData;
using rocksdb::CompactionOptions;
using std::shared_ptr;
......@@ -205,6 +206,9 @@ struct crocksdb_level_meta_data_t {
struct crocksdb_sst_file_meta_data_t {
SstFileMetaData rep;
};
struct crocksdb_compaction_options_t {
CompactionOptions rep;
};
struct crocksdb_compactionfilter_t : public CompactionFilter {
void* state_;
......@@ -1988,6 +1992,11 @@ void crocksdb_options_set_target_file_size_base(
opt->rep.target_file_size_base = n;
}
uint64_t crocksdb_options_get_target_file_size_base(
const crocksdb_options_t* opt) {
return opt->rep.target_file_size_base;
}
void crocksdb_options_set_target_file_size_multiplier(
crocksdb_options_t* opt, int n) {
opt->rep.target_file_size_multiplier = n;
......@@ -4081,4 +4090,39 @@ const char* crocksdb_sst_file_meta_data_name(const crocksdb_sst_file_meta_data_t
return meta->rep.name.data();
}
crocksdb_compaction_options_t* crocksdb_compaction_options_create() {
return new crocksdb_compaction_options_t();
}
void crocksdb_compaction_options_destroy(crocksdb_compaction_options_t* opts) {
delete opts;
}
void crocksdb_compaction_options_set_compression(
crocksdb_compaction_options_t* opts,
int compression) {
opts->rep.compression = static_cast<CompressionType>(compression);
}
void crocksdb_compaction_options_set_output_file_size_limit(
crocksdb_compaction_options_t* opts,
size_t size) {
opts->rep.output_file_size_limit = size;
}
void crocksdb_compact_files_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* cf,
crocksdb_compaction_options_t* opts,
const char** input_file_names,
size_t input_file_count,
int output_level,
char** errptr) {
std::vector<std::string> input_files;
for (size_t i = 0; i < input_file_count; i++) {
input_files.push_back(input_file_names[i]);
}
auto s = db->rep->CompactFiles(opts->rep, cf->rep, input_files, output_level);
SaveError(errptr, s);
}
} // end extern "C"
......@@ -135,6 +135,7 @@ typedef struct crocksdb_keyversions_t crocksdb_keyversions_t;
typedef struct crocksdb_column_family_meta_data_t crocksdb_column_family_meta_data_t;
typedef struct crocksdb_level_meta_data_t crocksdb_level_meta_data_t;
typedef struct crocksdb_sst_file_meta_data_t crocksdb_sst_file_meta_data_t;
typedef struct crocksdb_compaction_options_t crocksdb_compaction_options_t;
typedef enum crocksdb_table_property_t {
kDataSize = 1,
......@@ -791,6 +792,8 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_level0_stop_writes_trigge
crocksdb_options_t*, int);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_target_file_size_base(
crocksdb_options_t*, uint64_t);
extern C_ROCKSDB_LIBRARY_API uint64_t crocksdb_options_get_target_file_size_base(
const crocksdb_options_t*);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_target_file_size_multiplier(
crocksdb_options_t*, int);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_max_bytes_for_level_base(
......@@ -1630,6 +1633,24 @@ crocksdb_sst_file_meta_data_size(const crocksdb_sst_file_meta_data_t*);
extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_sst_file_meta_data_name(const crocksdb_sst_file_meta_data_t*);
/* CompactFiles */
extern C_ROCKSDB_LIBRARY_API crocksdb_compaction_options_t*
crocksdb_compaction_options_create();
extern C_ROCKSDB_LIBRARY_API void
crocksdb_compaction_options_destroy(crocksdb_compaction_options_t*);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_compaction_options_set_compression(crocksdb_compaction_options_t*, int);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_compaction_options_set_output_file_size_limit(crocksdb_compaction_options_t*, size_t);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_compact_files_cf(crocksdb_t*, crocksdb_column_family_handle_t*,
crocksdb_compaction_options_t*,
const char** input_file_names,
size_t input_file_count,
int output_level,
char** errptr);
#ifdef __cplusplus
} /* end extern "C" */
#endif
......
......@@ -65,6 +65,7 @@ pub enum DBSequentialFile {}
pub enum DBColumnFamilyMetaData {}
pub enum DBLevelMetaData {}
pub enum DBSstFileMetaData {}
pub enum DBCompactionOptions {}
pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy {
unsafe { crocksdb_filterpolicy_create_bloom(bits) }
......@@ -417,6 +418,7 @@ extern "C" {
pub fn crocksdb_options_set_level0_stop_writes_trigger(options: *mut Options, no: c_int);
pub fn crocksdb_options_set_write_buffer_size(options: *mut Options, bytes: u64);
pub fn crocksdb_options_set_target_file_size_base(options: *mut Options, bytes: u64);
pub fn crocksdb_options_get_target_file_size_base(options: *const Options) -> u64;
pub fn crocksdb_options_set_target_file_size_multiplier(options: *mut Options, mul: c_int);
pub fn crocksdb_options_set_max_bytes_for_level_base(options: *mut Options, bytes: u64);
pub fn crocksdb_options_set_max_bytes_for_level_multiplier(options: *mut Options, mul: f64);
......@@ -1566,6 +1568,27 @@ extern "C" {
pub fn crocksdb_sst_file_meta_data_size(meta: *const DBSstFileMetaData) -> size_t;
pub fn crocksdb_sst_file_meta_data_name(meta: *const DBSstFileMetaData) -> *const c_char;
pub fn crocksdb_compaction_options_create() -> *mut DBCompactionOptions;
pub fn crocksdb_compaction_options_destroy(opts: *mut DBCompactionOptions);
pub fn crocksdb_compaction_options_set_compression(
opts: *mut DBCompactionOptions,
compression: DBCompressionType,
);
pub fn crocksdb_compaction_options_set_output_file_size_limit(
opts: *mut DBCompactionOptions,
size: size_t,
);
pub fn crocksdb_compact_files_cf(
db: *mut DBInstance,
cf: *mut DBCFHandle,
opts: *const DBCompactionOptions,
input_file_names: *const *const c_char,
input_file_count: size_t,
output_level: c_int,
errptr: *mut *mut c_char,
);
}
#[cfg(test)]
......
......@@ -43,10 +43,10 @@ pub use metadata::{ColumnFamilyMetaData, LevelMetaData, SstFileMetaData};
pub use rocksdb::{set_external_sst_file_global_seq_no, BackupEngine, CFHandle, DBIterator,
DBVector, Env, ExternalSstFileInfo, Kv, Range, SeekKey, SequentialFile,
SstFileWriter, Writable, WriteBatch, DB};
pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions,
EnvOptions, FifoCompactionOptions, HistogramData,
IngestExternalFileOptions, RateLimiter, ReadOptions, RestoreOptions,
WriteOptions};
pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions,
CompactionOptions, DBOptions, EnvOptions, FifoCompactionOptions,
HistogramData, IngestExternalFileOptions, RateLimiter, ReadOptions,
RestoreOptions, WriteOptions};
pub use slice_transform::SliceTransform;
pub use table_filter::TableFilter;
pub use table_properties::{TableProperties, TablePropertiesCollection,
......
......@@ -15,11 +15,12 @@
use crocksdb_ffi::{self, DBBackupEngine, DBCFHandle, DBCompressionType, DBEnv, DBInstance,
DBPinnableSlice, DBSequentialFile, DBStatisticsHistogramType,
DBStatisticsTickerType, DBWriteBatch};
use libc::{self, c_int, c_void, size_t};
use libc::{self, c_char, c_int, c_void, size_t};
use metadata::ColumnFamilyMetaData;
use rocksdb_options::{ColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions, DBOptions,
EnvOptions, FlushOptions, HistogramData, IngestExternalFileOptions,
ReadOptions, RestoreOptions, UnsafeSnap, WriteOptions};
use rocksdb_options::{ColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions,
CompactionOptions, DBOptions, EnvOptions, FlushOptions, HistogramData,
IngestExternalFileOptions, ReadOptions, RestoreOptions, UnsafeSnap,
WriteOptions};
use std::{fs, ptr, slice};
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
......@@ -1379,6 +1380,27 @@ impl DB {
ColumnFamilyMetaData::from_ptr(inner)
}
}
pub fn compact_files_cf(
&self,
cf: &CFHandle,
opts: &CompactionOptions,
input_files: &[String],
output_level: i32,
) -> Result<(), String> {
unsafe {
let input_file_names: Vec<_> = input_files.iter().map(|f| f.as_ptr()).collect();
ffi_try!(crocksdb_compact_files_cf(
self.inner,
cf.inner,
opts.inner,
input_file_names.as_ptr() as *const *const c_char,
input_file_names.len(),
output_level
));
Ok(())
}
}
}
impl Writable for DB {
......
......@@ -15,10 +15,11 @@
use compaction_filter::{new_compaction_filter, CompactionFilter, CompactionFilterHandle};
use comparator::{self, compare_callback, ComparatorCallback};
use crocksdb_ffi::{self, DBBlockBasedTableOptions, DBCompactOptions, DBCompressionType,
DBFifoCompactionOptions, DBFlushOptions, DBInfoLogLevel, DBInstance,
DBRateLimiter, DBReadOptions, DBRecoveryMode, DBRestoreOptions, DBSnapshot,
DBStatisticsHistogramType, DBStatisticsTickerType, DBWriteOptions, Options};
use crocksdb_ffi::{self, DBBlockBasedTableOptions, DBCompactOptions, DBCompactionOptions,
DBCompressionType, DBFifoCompactionOptions, DBFlushOptions, DBInfoLogLevel,
DBInstance, DBRateLimiter, DBReadOptions, DBRecoveryMode, DBRestoreOptions,
DBSnapshot, DBStatisticsHistogramType, DBStatisticsTickerType, DBWriteOptions,
Options};
use event_listener::{new_event_listener, EventListener};
use libc::{self, c_double, c_int, c_uchar, c_void, size_t};
use merge_operator::{self, full_merge_callback, partial_merge_callback, MergeOperatorCallback};
......@@ -493,6 +494,40 @@ impl Drop for CompactOptions {
}
}
pub struct CompactionOptions {
pub inner: *mut DBCompactionOptions,
}
impl CompactionOptions {
pub fn new() -> CompactionOptions {
unsafe {
CompactionOptions {
inner: crocksdb_ffi::crocksdb_compaction_options_create(),
}
}
}
pub fn set_compression(&mut self, compression: DBCompressionType) {
unsafe {
crocksdb_ffi::crocksdb_compaction_options_set_compression(self.inner, compression);
}
}
pub fn set_output_file_size_limit(&mut self, size: usize) {
unsafe {
crocksdb_ffi::crocksdb_compaction_options_set_output_file_size_limit(self.inner, size);
}
}
}
impl Drop for CompactionOptions {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_compaction_options_destroy(self.inner);
}
}
}
pub struct DBOptions {
pub inner: *mut Options,
}
......@@ -1145,6 +1180,10 @@ impl ColumnFamilyOptions {
}
}
pub fn get_target_file_size_base(&self) -> u64 {
unsafe { crocksdb_ffi::crocksdb_options_get_target_file_size_base(self.inner) }
}
pub fn set_min_write_buffer_number_to_merge(&mut self, to_merge: c_int) {
unsafe {
crocksdb_ffi::crocksdb_options_set_min_write_buffer_number_to_merge(
......
......@@ -11,7 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use rocksdb::{ColumnFamilyOptions, DBOptions, Writable, DB};
use rocksdb::{CFHandle, ColumnFamilyOptions, CompactionOptions, DBCompressionType, DBOptions,
Writable, DB};
use tempdir::TempDir;
#[test]
......@@ -50,3 +51,51 @@ fn test_metadata() {
}
}
}
fn get_files_cf(db: &DB, cf: &CFHandle, max_level: usize) -> Vec<String> {
let mut files = Vec::new();
let cf_meta = db.get_column_family_meta_data(cf);
for (i, level) in cf_meta.get_levels().iter().enumerate() {
if i > max_level {
break;
}
for f in level.get_files() {
files.push(f.get_name());
}
}
files
}
#[test]
fn test_compact_files() {
let path = TempDir::new("_rust_rocksdb_test_metadata").unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut cf_opts = ColumnFamilyOptions::new();
cf_opts.set_disable_auto_compactions(true);
let db = DB::open_cf(
opts,
path.path().to_str().unwrap(),
vec![("default", cf_opts)],
).unwrap();
let cf_handle = db.cf_handle("default").unwrap();
let cf_opts = db.get_options_cf(cf_handle);
let output_file_size = cf_opts.get_target_file_size_base();
let mut opts = CompactionOptions::new();
opts.set_compression(DBCompressionType::Zstd);
opts.set_output_file_size_limit(output_file_size as usize);
let num_files = 5;
for i in 0..num_files {
let b = &[i as u8];
db.put(b, b).unwrap();
db.flush(true).unwrap();
}
let input_files = get_files_cf(&db, cf_handle, 0);
assert_eq!(input_files.len(), num_files);
db.compact_files_cf(cf_handle, &opts, &input_files, 0)
.unwrap();
assert_eq!(get_files_cf(&db, cf_handle, 0).len(), 1);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment