Commit f5ee9ed2 authored by Jay's avatar Jay Committed by zhangjinpeng1987

support event listener (#95)

parent ab16db78
...@@ -18,18 +18,19 @@ ...@@ -18,18 +18,19 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/filter_policy.h" #include "rocksdb/filter_policy.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/listener.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h" #include "rocksdb/merge_operator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/rate_limiter.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/universal_compaction.h"
#include "rocksdb/statistics.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "rocksdb/rate_limiter.h" #include "rocksdb/universal_compaction.h"
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/write_batch.h"
using rocksdb::Cache; using rocksdb::Cache;
using rocksdb::ColumnFamilyDescriptor; using rocksdb::ColumnFamilyDescriptor;
...@@ -38,6 +39,7 @@ using rocksdb::ColumnFamilyOptions; ...@@ -38,6 +39,7 @@ using rocksdb::ColumnFamilyOptions;
using rocksdb::CompactionFilter; using rocksdb::CompactionFilter;
using rocksdb::CompactionFilterFactory; using rocksdb::CompactionFilterFactory;
using rocksdb::CompactionFilterContext; using rocksdb::CompactionFilterContext;
using rocksdb::CompactionJobInfo;
using rocksdb::CompactionOptionsFIFO; using rocksdb::CompactionOptionsFIFO;
using rocksdb::Comparator; using rocksdb::Comparator;
using rocksdb::CompressionType; using rocksdb::CompressionType;
...@@ -46,9 +48,12 @@ using rocksdb::DB; ...@@ -46,9 +48,12 @@ using rocksdb::DB;
using rocksdb::DBOptions; using rocksdb::DBOptions;
using rocksdb::Env; using rocksdb::Env;
using rocksdb::EnvOptions; using rocksdb::EnvOptions;
using rocksdb::ExternalFileIngestionInfo;
using rocksdb::EventListener;
using rocksdb::InfoLogLevel; using rocksdb::InfoLogLevel;
using rocksdb::FileLock; using rocksdb::FileLock;
using rocksdb::FilterPolicy; using rocksdb::FilterPolicy;
using rocksdb::FlushJobInfo;
using rocksdb::FlushOptions; using rocksdb::FlushOptions;
using rocksdb::IngestExternalFileOptions; using rocksdb::IngestExternalFileOptions;
using rocksdb::Iterator; using rocksdb::Iterator;
...@@ -132,6 +137,15 @@ struct crocksdb_sstfilewriter_t { SstFileWriter* rep; }; ...@@ -132,6 +137,15 @@ struct crocksdb_sstfilewriter_t { SstFileWriter* rep; };
struct crocksdb_ratelimiter_t { RateLimiter* rep; }; struct crocksdb_ratelimiter_t { RateLimiter* rep; };
struct crocksdb_histogramdata_t { HistogramData rep; }; struct crocksdb_histogramdata_t { HistogramData rep; };
struct crocksdb_pinnableslice_t { PinnableSlice rep; }; struct crocksdb_pinnableslice_t { PinnableSlice rep; };
struct crocksdb_flushjobinfo_t {
FlushJobInfo rep;
};
struct crocksdb_compactionjobinfo_t {
CompactionJobInfo rep;
};
struct crocksdb_externalfileingestioninfo_t {
ExternalFileIngestionInfo rep;
};
struct crocksdb_compactionfiltercontext_t { struct crocksdb_compactionfiltercontext_t {
CompactionFilter::Context rep; CompactionFilter::Context rep;
...@@ -1541,6 +1555,143 @@ size_t crocksdb_options_get_block_cache_usage(crocksdb_options_t *opt) { ...@@ -1541,6 +1555,143 @@ size_t crocksdb_options_get_block_cache_usage(crocksdb_options_t *opt) {
return 0; return 0;
} }
/* FlushJobInfo */
const char* crocksdb_flushjobinfo_cf_name(const crocksdb_flushjobinfo_t* info,
size_t* size) {
*size = info->rep.cf_name.size();
return info->rep.cf_name.data();
}
const char* crocksdb_flushjobinfo_file_path(const crocksdb_flushjobinfo_t* info,
size_t* size) {
*size = info->rep.file_path.size();
return info->rep.file_path.data();
}
const crocksdb_table_properties_t* crocksdb_flushjobinfo_table_properties(
const crocksdb_flushjobinfo_t* info) {
return reinterpret_cast<const crocksdb_table_properties_t*>(
&info->rep.table_properties);
}
/* CompactionJobInfo */
const char* crocksdb_compactionjobinfo_cf_name(
const crocksdb_compactionjobinfo_t* info, size_t* size) {
*size = info->rep.cf_name.size();
return info->rep.cf_name.data();
}
size_t crocksdb_compactionjobinfo_input_files_count(
const crocksdb_compactionjobinfo_t* info) {
return info->rep.input_files.size();
}
const char* crocksdb_compactionjobinfo_input_file_at(
const crocksdb_compactionjobinfo_t* info, size_t pos, size_t* size) {
const std::string& path = info->rep.input_files[pos];
*size = path.size();
return path.data();
}
size_t crocksdb_compactionjobinfo_output_files_count(
const crocksdb_compactionjobinfo_t* info) {
return info->rep.output_files.size();
}
const char* crocksdb_compactionjobinfo_output_file_at(
const crocksdb_compactionjobinfo_t* info, size_t pos, size_t* size) {
const std::string& path = info->rep.output_files[pos];
*size = path.size();
return path.data();
}
const crocksdb_table_properties_collection_t*
crocksdb_compactionjobinfo_table_properties(
const crocksdb_compactionjobinfo_t* info) {
return reinterpret_cast<const crocksdb_table_properties_collection_t*>(
&info->rep.table_properties);
}
/* ExternalFileIngestionInfo */
const char* crocksdb_externalfileingestioninfo_cf_name(
const crocksdb_externalfileingestioninfo_t* info, size_t* size) {
*size = info->rep.cf_name.size();
return info->rep.cf_name.data();
}
const char* crocksdb_externalfileingestioninfo_internal_file_path(
const crocksdb_externalfileingestioninfo_t* info, size_t* size) {
*size = info->rep.internal_file_path.size();
return info->rep.internal_file_path.data();
}
const crocksdb_table_properties_t*
crocksdb_externalfileingestioninfo_table_properties(
const crocksdb_externalfileingestioninfo_t* info) {
return reinterpret_cast<const crocksdb_table_properties_t*>(
&info->rep.table_properties);
}
/* event listener */
struct crocksdb_eventlistener_t : public EventListener {
void* state_;
void (*destructor_)(void*);
void (*on_flush_completed)(void*, crocksdb_t*,
const crocksdb_flushjobinfo_t*);
void (*on_compaction_completed)(void*, crocksdb_t*,
const crocksdb_compactionjobinfo_t*);
void (*on_external_file_ingested)(
void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*);
virtual void OnFlushCompleted(DB* db, const FlushJobInfo& info) {
crocksdb_t c_db = {db};
on_flush_completed(state_, &c_db,
reinterpret_cast<const crocksdb_flushjobinfo_t*>(&info));
}
virtual void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) {
crocksdb_t c_db = {db};
on_compaction_completed(
state_, &c_db,
reinterpret_cast<const crocksdb_compactionjobinfo_t*>(&info));
}
virtual void OnExternalFileIngested(DB* db,
const ExternalFileIngestionInfo& info) {
crocksdb_t c_db = {db};
on_external_file_ingested(
state_, &c_db,
reinterpret_cast<const crocksdb_externalfileingestioninfo_t*>(&info));
}
virtual ~crocksdb_eventlistener_t() { destructor_(state_); }
};
crocksdb_eventlistener_t* crocksdb_eventlistener_create(
void* state_, void (*destructor_)(void*),
on_flush_completed_cb on_flush_completed,
on_compaction_completed_cb on_compaction_completed,
on_external_file_ingested_cb on_external_file_ingested) {
crocksdb_eventlistener_t* et = new crocksdb_eventlistener_t;
et->state_ = state_;
et->destructor_ = destructor_;
et->on_flush_completed = on_flush_completed;
et->on_compaction_completed = on_compaction_completed;
et->on_external_file_ingested = on_external_file_ingested;
return et;
}
void crocksdb_eventlistener_destroy(crocksdb_eventlistener_t* t) { delete t; }
void crocksdb_options_add_eventlistener(crocksdb_options_t* opt,
crocksdb_eventlistener_t* t) {
opt->rep.listeners.emplace_back(std::shared_ptr<EventListener>(t));
}
crocksdb_cuckoo_table_options_t* crocksdb_cuckoo_table_options_t*
crocksdb_cuckoo_options_create() { crocksdb_cuckoo_options_create() {
return new crocksdb_cuckoo_table_options_t; return new crocksdb_cuckoo_table_options_t;
......
...@@ -124,6 +124,11 @@ typedef struct crocksdb_table_properties_collector_t ...@@ -124,6 +124,11 @@ typedef struct crocksdb_table_properties_collector_t
crocksdb_table_properties_collector_t; crocksdb_table_properties_collector_t;
typedef struct crocksdb_table_properties_collector_factory_t typedef struct crocksdb_table_properties_collector_factory_t
crocksdb_table_properties_collector_factory_t; crocksdb_table_properties_collector_factory_t;
typedef struct crocksdb_flushjobinfo_t crocksdb_flushjobinfo_t;
typedef struct crocksdb_compactionjobinfo_t crocksdb_compactionjobinfo_t;
typedef struct crocksdb_externalfileingestioninfo_t
crocksdb_externalfileingestioninfo_t;
typedef struct crocksdb_eventlistener_t crocksdb_eventlistener_t;
typedef enum crocksdb_table_property_t { typedef enum crocksdb_table_property_t {
kDataSize = 1, kDataSize = 1,
...@@ -579,6 +584,67 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_block_based_table_factory ...@@ -579,6 +584,67 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_block_based_table_factory
extern C_ROCKSDB_LIBRARY_API size_t crocksdb_options_get_block_cache_usage( extern C_ROCKSDB_LIBRARY_API size_t crocksdb_options_get_block_cache_usage(
crocksdb_options_t *opt); crocksdb_options_t *opt);
/* Flush job info */
extern C_ROCKSDB_LIBRARY_API const char* crocksdb_flushjobinfo_cf_name(
const crocksdb_flushjobinfo_t*, size_t*);
extern C_ROCKSDB_LIBRARY_API const char* crocksdb_flushjobinfo_file_path(
const crocksdb_flushjobinfo_t*, size_t*);
extern C_ROCKSDB_LIBRARY_API const crocksdb_table_properties_t*
crocksdb_flushjobinfo_table_properties(const crocksdb_flushjobinfo_t*);
/* Compaction job info */
extern C_ROCKSDB_LIBRARY_API const char* crocksdb_compactionjobinfo_cf_name(
const crocksdb_compactionjobinfo_t*, size_t*);
extern C_ROCKSDB_LIBRARY_API size_t
crocksdb_compactionjobinfo_input_files_count(
const crocksdb_compactionjobinfo_t*);
extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_compactionjobinfo_input_file_at(const crocksdb_compactionjobinfo_t*,
size_t pos, size_t*);
extern C_ROCKSDB_LIBRARY_API size_t
crocksdb_compactionjobinfo_output_files_count(
const crocksdb_compactionjobinfo_t*);
extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_compactionjobinfo_output_file_at(const crocksdb_compactionjobinfo_t*,
size_t pos, size_t*);
extern C_ROCKSDB_LIBRARY_API const crocksdb_table_properties_collection_t*
crocksdb_compactionjobinfo_table_properties(
const crocksdb_compactionjobinfo_t*);
/* External file ingestion info */
extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_externalfileingestioninfo_cf_name(
const crocksdb_externalfileingestioninfo_t*, size_t*);
extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_externalfileingestioninfo_internal_file_path(
const crocksdb_externalfileingestioninfo_t*, size_t*);
extern C_ROCKSDB_LIBRARY_API const crocksdb_table_properties_t*
crocksdb_externalfileingestioninfo_table_properties(
const crocksdb_externalfileingestioninfo_t*);
/* Event listener */
typedef void (*on_flush_completed_cb)(void*, crocksdb_t*,
const crocksdb_flushjobinfo_t*);
typedef void (*on_compaction_completed_cb)(void*, crocksdb_t*,
const crocksdb_compactionjobinfo_t*);
typedef void (*on_external_file_ingested_cb)(
void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*);
extern C_ROCKSDB_LIBRARY_API crocksdb_eventlistener_t*
crocksdb_eventlistener_create(
void* state_, void (*destructor_)(void*),
on_flush_completed_cb on_flush_completed,
on_compaction_completed_cb on_compaction_completed,
on_external_file_ingested_cb on_external_file_ingested);
extern C_ROCKSDB_LIBRARY_API void crocksdb_eventlistener_destroy(
crocksdb_eventlistener_t*);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_add_eventlistener(
crocksdb_options_t*, crocksdb_eventlistener_t*);
/* Cuckoo table options */ /* Cuckoo table options */
extern C_ROCKSDB_LIBRARY_API crocksdb_cuckoo_table_options_t* extern C_ROCKSDB_LIBRARY_API crocksdb_cuckoo_table_options_t*
......
...@@ -52,6 +52,10 @@ pub enum DBTablePropertiesCollection {} ...@@ -52,6 +52,10 @@ pub enum DBTablePropertiesCollection {}
pub enum DBTablePropertiesCollectionIterator {} pub enum DBTablePropertiesCollectionIterator {}
pub enum DBTablePropertiesCollector {} pub enum DBTablePropertiesCollector {}
pub enum DBTablePropertiesCollectorFactory {} pub enum DBTablePropertiesCollectorFactory {}
pub enum DBFlushJobInfo {}
pub enum DBCompactionJobInfo {}
pub enum DBIngestionInfo {}
pub enum DBEventListener {}
pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy { pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy {
unsafe { crocksdb_filterpolicy_create_bloom(bits) } unsafe { crocksdb_filterpolicy_create_bloom(bits) }
...@@ -904,9 +908,11 @@ extern "C" { ...@@ -904,9 +908,11 @@ extern "C" {
vlen: *mut size_t) vlen: *mut size_t)
-> *const uint8_t; -> *const uint8_t;
pub fn crocksdb_user_collected_properties_len(props: *const DBUserCollectedProperties) -> size_t; pub fn crocksdb_user_collected_properties_len(props: *const DBUserCollectedProperties)
-> size_t;
pub fn crocksdb_table_properties_collection_len(props: *const DBTablePropertiesCollection) -> size_t; pub fn crocksdb_table_properties_collection_len(props: *const DBTablePropertiesCollection)
-> size_t;
pub fn crocksdb_table_properties_collection_destroy(props: *mut DBTablePropertiesCollection); pub fn crocksdb_table_properties_collection_destroy(props: *mut DBTablePropertiesCollection);
...@@ -981,6 +987,56 @@ extern "C" { ...@@ -981,6 +987,56 @@ extern "C" {
errptr: *mut *mut c_char) errptr: *mut *mut c_char)
-> *mut DBTablePropertiesCollection; -> *mut DBTablePropertiesCollection;
pub fn crocksdb_flushjobinfo_cf_name(info: *const DBFlushJobInfo,
size: *mut size_t)
-> *const c_char;
pub fn crocksdb_flushjobinfo_file_path(info: *const DBFlushJobInfo,
size: *mut size_t)
-> *const c_char;
pub fn crocksdb_flushjobinfo_table_properties(info: *const DBFlushJobInfo)
-> *const DBTableProperties;
pub fn crocksdb_compactionjobinfo_cf_name(info: *const DBCompactionJobInfo,
size: *mut size_t)
-> *const c_char;
pub fn crocksdb_compactionjobinfo_input_files_count(info: *const DBCompactionJobInfo)
-> size_t;
pub fn crocksdb_compactionjobinfo_input_file_at(info: *const DBCompactionJobInfo,
pos: size_t,
len: *mut size_t)
-> *const c_char;
pub fn crocksdb_compactionjobinfo_output_files_count(info: *const DBCompactionJobInfo)
-> size_t;
pub fn crocksdb_compactionjobinfo_output_file_at(info: *const DBCompactionJobInfo,
pos: size_t,
len: *mut size_t)
-> *const c_char;
pub fn crocksdb_compactionjobinfo_table_properties(info: *const DBCompactionJobInfo)
-> *const DBTablePropertiesCollection;
pub fn crocksdb_externalfileingestioninfo_cf_name(info: *const DBIngestionInfo,
size: *mut size_t)
-> *const c_char;
pub fn crocksdb_externalfileingestioninfo_internal_file_path(info: *const DBIngestionInfo,
size: *mut size_t)
-> *const c_char;
pub fn crocksdb_externalfileingestioninfo_table_properties(info: *const DBIngestionInfo)
-> *const DBTableProperties;
pub fn crocksdb_eventlistener_create(state: *mut c_void,
destructor: extern "C" fn(*mut c_void),
flush: extern "C" fn(*mut c_void,
*mut DBInstance,
*const DBFlushJobInfo),
compact: extern "C" fn(*mut c_void,
*mut DBInstance,
*const DBCompactionJobInfo),
ingest: extern "C" fn(*mut c_void,
*mut DBInstance,
*const DBIngestionInfo))
-> *mut DBEventListener;
pub fn crocksdb_eventlistener_destroy(et: *mut DBEventListener);
pub fn crocksdb_options_add_eventlistener(opt: *mut DBOptions, et: *mut DBEventListener);
} }
#[cfg(test)] #[cfg(test)]
......
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use {TableProperties, TablePropertiesCollectionView};
use crocksdb_ffi::{self, DBInstance, DBFlushJobInfo, DBCompactionJobInfo, DBIngestionInfo,
DBEventListener};
use libc::c_void;
use std::{slice, mem, str};
use std::path::Path;
macro_rules! fetch_str {
($func:ident($($arg:expr),*)) => ({
let mut len = 0;
let ptr = crocksdb_ffi::$func($($arg),*, &mut len);
let s = slice::from_raw_parts(ptr as *const u8, len);
str::from_utf8(s).unwrap()
})
}
pub struct FlushJobInfo(DBFlushJobInfo);
impl FlushJobInfo {
pub fn cf_name(&self) -> &str {
unsafe { fetch_str!(crocksdb_flushjobinfo_cf_name(&self.0)) }
}
pub fn file_path(&self) -> &Path {
let p = unsafe { fetch_str!(crocksdb_flushjobinfo_file_path(&self.0)) };
Path::new(p)
}
pub fn table_properties(&self) -> &TableProperties {
unsafe {
let prop = crocksdb_ffi::crocksdb_flushjobinfo_table_properties(&self.0);
TableProperties::from_ptr(prop)
}
}
}
pub struct CompactionJobInfo(DBCompactionJobInfo);
impl CompactionJobInfo {
pub fn cf_name(&self) -> &str {
unsafe { fetch_str!(crocksdb_compactionjobinfo_cf_name(&self.0)) }
}
pub fn input_file_count(&self) -> usize {
unsafe { crocksdb_ffi::crocksdb_compactionjobinfo_input_files_count(&self.0) }
}
pub fn input_file_at(&self, pos: usize) -> &Path {
let p = unsafe { fetch_str!(crocksdb_compactionjobinfo_input_file_at(&self.0, pos)) };
Path::new(p)
}
pub fn output_file_count(&self) -> usize {
unsafe { crocksdb_ffi::crocksdb_compactionjobinfo_output_files_count(&self.0) }
}
pub fn output_file_at(&self, pos: usize) -> &Path {
let p = unsafe { fetch_str!(crocksdb_compactionjobinfo_output_file_at(&self.0, pos)) };
Path::new(p)
}
pub fn table_properties(&self) -> &TablePropertiesCollectionView {
unsafe {
let prop = crocksdb_ffi::crocksdb_compactionjobinfo_table_properties(&self.0);
TablePropertiesCollectionView::from_ptr(prop)
}
}
}
pub struct IngestionInfo(DBIngestionInfo);
impl IngestionInfo {
pub fn cf_name(&self) -> &str {
unsafe { fetch_str!(crocksdb_externalfileingestioninfo_cf_name(&self.0)) }
}
pub fn internal_file_path(&self) -> &Path {
let p =
unsafe { fetch_str!(crocksdb_externalfileingestioninfo_internal_file_path(&self.0)) };
Path::new(p)
}
pub fn table_properties(&self) -> &TableProperties {
unsafe {
let prop = crocksdb_ffi::crocksdb_externalfileingestioninfo_table_properties(&self.0);
TableProperties::from_ptr(prop)
}
}
}
/// EventListener trait contains a set of call-back functions that will
/// be called when specific RocksDB event happens such as flush. It can
/// be used as a building block for developing custom features such as
/// stats-collector or external compaction algorithm.
///
/// Note that call-back functions should not run for an extended period of
/// time before the function returns, otherwise RocksDB may be blocked.
/// For more information, please see
/// [doc of rocksdb](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/listener.h).
pub trait EventListener: Send + Sync {
fn on_flush_completed(&self, _: &FlushJobInfo) {}
fn on_compaction_completed(&self, _: &CompactionJobInfo) {}
fn on_external_file_ingested(&self, _: &IngestionInfo) {}
}
extern "C" fn destructor(ctx: *mut c_void) {
unsafe {
Box::from_raw(ctx as *mut Box<EventListener>);
}
}
// Maybe we should reuse db instance?
// TODO: refactor DB implement so that we can convert DBInstance to DB.
extern "C" fn on_flush_completed(ctx: *mut c_void,
_: *mut DBInstance,
info: *const DBFlushJobInfo) {
let (ctx, info) = unsafe { (&*(ctx as *mut Box<EventListener>), mem::transmute(&*info)) };
ctx.on_flush_completed(info);
}
extern "C" fn on_compaction_completed(ctx: *mut c_void,
_: *mut DBInstance,
info: *const DBCompactionJobInfo) {
let (ctx, info) = unsafe { (&*(ctx as *mut Box<EventListener>), mem::transmute(&*info)) };
ctx.on_compaction_completed(info);
}
extern "C" fn on_external_file_ingested(ctx: *mut c_void,
_: *mut DBInstance,
info: *const DBIngestionInfo) {
let (ctx, info) = unsafe { (&*(ctx as *mut Box<EventListener>), mem::transmute(&*info)) };
ctx.on_external_file_ingested(info);
}
pub fn new_event_listener<L: EventListener>(l: L) -> *mut DBEventListener {
let p: Box<EventListener> = Box::new(l);
unsafe {
crocksdb_ffi::crocksdb_eventlistener_create(Box::into_raw(Box::new(p)) as *mut c_void,
destructor,
on_flush_completed,
on_compaction_completed,
on_external_file_ingested)
}
}
...@@ -29,8 +29,10 @@ mod slice_transform; ...@@ -29,8 +29,10 @@ mod slice_transform;
mod table_properties; mod table_properties;
mod table_properties_collector; mod table_properties_collector;
mod table_properties_collector_factory; mod table_properties_collector_factory;
mod event_listener;
pub use compaction_filter::CompactionFilter; pub use compaction_filter::CompactionFilter;
pub use event_listener::{EventListener, CompactionJobInfo, IngestionInfo, FlushJobInfo};
pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType, DBRecoveryMode, DBInfoLogLevel, pub use librocksdb_sys::{DBCompactionStyle, DBCompressionType, DBRecoveryMode, DBInfoLogLevel,
DBStatisticsTickerType, DBStatisticsHistogramType, new_bloom_filter, DBStatisticsTickerType, DBStatisticsHistogramType, new_bloom_filter,
CompactionPriority, DBEntryType, self as crocksdb_ffi}; CompactionPriority, DBEntryType, self as crocksdb_ffi};
...@@ -40,6 +42,7 @@ pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, C ...@@ -40,6 +42,7 @@ pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch, C
pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions, WriteOptions, RestoreOptions, pub use rocksdb_options::{BlockBasedOptions, Options, ReadOptions, WriteOptions, RestoreOptions,
IngestExternalFileOptions, EnvOptions, HistogramData, CompactOptions}; IngestExternalFileOptions, EnvOptions, HistogramData, CompactOptions};
pub use slice_transform::SliceTransform; pub use slice_transform::SliceTransform;
pub use table_properties::{TableProperties, TablePropertiesCollection, UserCollectedProperties}; pub use table_properties::{TableProperties, TablePropertiesCollection,
TablePropertiesCollectionView, UserCollectedProperties};
pub use table_properties_collector::TablePropertiesCollector; pub use table_properties_collector::TablePropertiesCollector;
pub use table_properties_collector_factory::TablePropertiesCollectorFactory; pub use table_properties_collector_factory::TablePropertiesCollectorFactory;
...@@ -20,6 +20,7 @@ use crocksdb_ffi::{self, DBOptions, DBWriteOptions, DBBlockBasedTableOptions, DB ...@@ -20,6 +20,7 @@ use crocksdb_ffi::{self, DBOptions, DBWriteOptions, DBBlockBasedTableOptions, DB
DBRestoreOptions, DBCompressionType, DBRecoveryMode, DBSnapshot, DBInstance, DBRestoreOptions, DBCompressionType, DBRecoveryMode, DBSnapshot, DBInstance,
DBFlushOptions, DBStatisticsTickerType, DBStatisticsHistogramType, DBFlushOptions, DBStatisticsTickerType, DBStatisticsHistogramType,
DBRateLimiter, DBInfoLogLevel, DBCompactOptions}; DBRateLimiter, DBInfoLogLevel, DBCompactOptions};
use event_listener::{EventListener, new_event_listener};
use libc::{self, c_int, size_t, c_void}; use libc::{self, c_int, size_t, c_void};
use merge_operator::{self, MergeOperatorCallback, full_merge_callback, partial_merge_callback}; use merge_operator::{self, MergeOperatorCallback, full_merge_callback, partial_merge_callback};
use merge_operator::MergeFn; use merge_operator::MergeFn;
...@@ -394,6 +395,11 @@ impl Options { ...@@ -394,6 +395,11 @@ impl Options {
} }
} }
pub fn add_event_listener<L: EventListener>(&mut self, l: L) {
let handle = new_event_listener(l);
unsafe { crocksdb_ffi::crocksdb_options_add_eventlistener(self.inner, handle) }
}
pub fn add_table_properties_collector_factory(&mut self, pub fn add_table_properties_collector_factory(&mut self,
fname: &str, fname: &str,
factory: Box<TablePropertiesCollectorFactory>) { factory: Box<TablePropertiesCollectorFactory>) {
......
...@@ -18,24 +18,16 @@ use crocksdb_ffi::{self, DBTableProperties, DBTableProperty, DBUserCollectedProp ...@@ -18,24 +18,16 @@ use crocksdb_ffi::{self, DBTableProperties, DBTableProperty, DBUserCollectedProp
use libc::size_t; use libc::size_t;
use std::{slice, str, mem}; use std::{slice, str, mem};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ops::Index; use std::ops::{Index, Deref};
pub struct TablePropertiesCollection { pub struct TablePropertiesCollectionView(DBTablePropertiesCollection);
inner: *mut DBTablePropertiesCollection,
}
impl Drop for TablePropertiesCollection { impl TablePropertiesCollectionView {
fn drop(&mut self) { pub unsafe fn from_ptr<'a>(collection: *const DBTablePropertiesCollection)
unsafe { -> &'a TablePropertiesCollectionView {
crocksdb_ffi::crocksdb_table_properties_collection_destroy(self.inner); let c = &*collection;
} mem::transmute(c)
}
}
impl TablePropertiesCollection {
pub unsafe fn from_raw(ptr: *mut DBTablePropertiesCollection) -> TablePropertiesCollection {
TablePropertiesCollection { inner: ptr }
} }
pub fn iter(&self) -> TablePropertiesCollectionIter { pub fn iter(&self) -> TablePropertiesCollectionIter {
...@@ -43,7 +35,7 @@ impl TablePropertiesCollection { ...@@ -43,7 +35,7 @@ impl TablePropertiesCollection {
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
unsafe { crocksdb_ffi::crocksdb_table_properties_collection_len(self.inner) } unsafe { crocksdb_ffi::crocksdb_table_properties_collection_len(&self.0) }
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
...@@ -51,7 +43,7 @@ impl TablePropertiesCollection { ...@@ -51,7 +43,7 @@ impl TablePropertiesCollection {
} }
} }
impl<'a> IntoIterator for &'a TablePropertiesCollection { impl<'a> IntoIterator for &'a TablePropertiesCollectionView {
type Item = (&'a str, &'a TableProperties); type Item = (&'a str, &'a TableProperties);
type IntoIter = TablePropertiesCollectionIter<'a>; type IntoIter = TablePropertiesCollectionIter<'a>;
...@@ -74,11 +66,11 @@ impl<'a> Drop for TablePropertiesCollectionIter<'a> { ...@@ -74,11 +66,11 @@ impl<'a> Drop for TablePropertiesCollectionIter<'a> {
} }
impl<'a> TablePropertiesCollectionIter<'a> { impl<'a> TablePropertiesCollectionIter<'a> {
fn new(props: &'a TablePropertiesCollection) -> TablePropertiesCollectionIter<'a> { fn new(props: &'a TablePropertiesCollectionView) -> TablePropertiesCollectionIter<'a> {
unsafe { unsafe {
TablePropertiesCollectionIter { TablePropertiesCollectionIter {
props: PhantomData, props: PhantomData,
inner: crocksdb_ffi::crocksdb_table_properties_collection_iter_create(props.inner), inner: crocksdb_ffi::crocksdb_table_properties_collection_iter_create(&props.0),
} }
} }
} }
...@@ -106,16 +98,40 @@ impl<'a> Iterator for TablePropertiesCollectionIter<'a> { ...@@ -106,16 +98,40 @@ impl<'a> Iterator for TablePropertiesCollectionIter<'a> {
} }
} }
pub struct TablePropertiesCollection {
inner: *mut DBTablePropertiesCollection,
}
impl Drop for TablePropertiesCollection {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_table_properties_collection_destroy(self.inner);
}
}
}
impl TablePropertiesCollection {
pub unsafe fn from_raw(ptr: *mut DBTablePropertiesCollection) -> TablePropertiesCollection {
TablePropertiesCollection { inner: ptr }
}
}
impl Deref for TablePropertiesCollection {
type Target = TablePropertiesCollectionView;
fn deref(&self) -> &TablePropertiesCollectionView {
unsafe { TablePropertiesCollectionView::from_ptr(self.inner) }
}
}
pub struct TableProperties { pub struct TableProperties {
inner: DBTableProperties, inner: DBTableProperties,
} }
impl TableProperties { impl TableProperties {
fn from_ptr<'a>(ptr: *const DBTableProperties) -> &'a TableProperties { pub unsafe fn from_ptr<'a>(ptr: *const DBTableProperties) -> &'a TableProperties {
unsafe { let res = &*ptr;
let res = &*ptr; mem::transmute(res)
mem::transmute(res)
}
} }
fn get_u64(&self, prop: DBTableProperty) -> u64 { fn get_u64(&self, prop: DBTableProperty) -> u64 {
......
...@@ -13,3 +13,4 @@ mod test_slice_transform; ...@@ -13,3 +13,4 @@ mod test_slice_transform;
mod test_prefix_extractor; mod test_prefix_extractor;
mod test_statistics; mod test_statistics;
mod test_table_properties; mod test_table_properties;
mod test_event_listener;
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::atomic::*;
use rocksdb::*;
use tempdir::TempDir;
use test_ingest_external_file::gen_sst;
#[derive(Default, Clone)]
struct EventCounter {
flush: Arc<AtomicUsize>,
compaction: Arc<AtomicUsize>,
ingestion: Arc<AtomicUsize>,
drop_count: Arc<AtomicUsize>,
}
impl Drop for EventCounter {
fn drop(&mut self) {
self.drop_count.fetch_add(1, Ordering::SeqCst);
}
}
impl EventListener for EventCounter {
fn on_flush_completed(&self, info: &FlushJobInfo) {
assert!(!info.cf_name().is_empty());
assert!(info.file_path().exists());
assert_ne!(info.table_properties().data_size(), 0);
self.flush.fetch_add(1, Ordering::SeqCst);
}
fn on_compaction_completed(&self, info: &CompactionJobInfo) {
assert!(!info.cf_name().is_empty());
let input_file_count = info.input_file_count();
assert_ne!(input_file_count, 0);
for i in 0..input_file_count {
let path = info.input_file_at(i);
assert!(path.exists());
}
let output_file_count = info.output_file_count();
assert_ne!(output_file_count, 0);
for i in 0..output_file_count {
let path = info.output_file_at(i);
assert!(path.exists());
}
let props = info.table_properties();
assert_eq!(props.len(), output_file_count + input_file_count);
self.compaction.fetch_add(1, Ordering::SeqCst);
}
fn on_external_file_ingested(&self, info: &IngestionInfo) {
assert!(!info.cf_name().is_empty());
assert!(info.internal_file_path().exists());
assert_ne!(info.table_properties().data_size(), 0);
self.ingestion.fetch_add(1, Ordering::SeqCst);
}
}
#[test]
fn test_event_listener_basic() {
let path = TempDir::new("_rust_rocksdb_event_listener_flush").expect("");
let path_str = path.path().to_str().unwrap();
let mut opts = Options::new();
let counter = EventCounter::default();
opts.add_event_listener(counter.clone());
opts.create_if_missing(true);
let db = DB::open(opts, path_str).unwrap();
for i in 1..8000 {
db.put(format!("{:04}", i).as_bytes(),
format!("{:04}", i).as_bytes())
.unwrap();
}
db.flush(true).unwrap();
assert_ne!(counter.flush.load(Ordering::SeqCst), 0);
for i in 1..8000 {
db.put(format!("{:04}", i).as_bytes(),
format!("{:04}", i).as_bytes())
.unwrap();
}
db.flush(true).unwrap();
let flush_cnt = counter.flush.load(Ordering::SeqCst);
assert_ne!(flush_cnt, 0);
assert_eq!(counter.compaction.load(Ordering::SeqCst), 0);
db.compact_range(None, None);
assert_eq!(counter.flush.load(Ordering::SeqCst), flush_cnt);
assert_ne!(counter.compaction.load(Ordering::SeqCst), 0);
drop(db);
assert_eq!(counter.drop_count.load(Ordering::SeqCst), 1);
}
#[test]
fn test_event_listener_ingestion() {
let path = TempDir::new("_rust_rocksdb_event_listener_ingestion").expect("");
let path_str = path.path().to_str().unwrap();
let mut opts = Options::new();
let counter = EventCounter::default();
opts.add_event_listener(counter.clone());
opts.create_if_missing(true);
let db = DB::open(opts, path_str).unwrap();
let gen_path = TempDir::new("_rust_rocksdb_ingest_sst_gen").expect("");
let test_sstfile = gen_path.path().join("test_sst_file");
let test_sstfile_str = test_sstfile.to_str().unwrap();
let default_options = db.get_options();
gen_sst(default_options,
Some(db.cf_handle("default").unwrap()),
test_sstfile_str,
&[(b"k1", b"v1"), (b"k2", b"v2")]);
let ingest_opt = IngestExternalFileOptions::new();
db.ingest_external_file(&ingest_opt, &[test_sstfile_str])
.unwrap();
assert!(test_sstfile.exists());
assert_eq!(db.get(b"k1").unwrap().unwrap(), b"v1");
assert_eq!(db.get(b"k2").unwrap().unwrap(), b"v2");
assert_ne!(counter.ingestion.load(Ordering::SeqCst), 0);
}
...@@ -16,7 +16,7 @@ use rocksdb::*; ...@@ -16,7 +16,7 @@ use rocksdb::*;
use std::fs; use std::fs;
use tempdir::TempDir; use tempdir::TempDir;
fn gen_sst(opt: Options, cf: Option<&CFHandle>, path: &str, data: &[(&[u8], &[u8])]) { pub fn gen_sst(opt: Options, cf: Option<&CFHandle>, path: &str, data: &[(&[u8], &[u8])]) {
let _ = fs::remove_file(path); let _ = fs::remove_file(path);
let env_opt = EnvOptions::new(); let env_opt = EnvOptions::new();
let mut writer = if cf.is_some() { let mut writer = if cf.is_some() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment