Unverified Commit df009f56 authored by Xinye Tao's avatar Xinye Tao Committed by GitHub

port compaction begin and subcompaction listener (#589)

Add event listener for flush begin, compaction begin and subcompaction begin/completed.
Signed-off-by: 's avatartabokie <xy.tao@outlook.com>
parent 63e7ceac
...@@ -416,6 +416,11 @@ pub struct crocksdb_compactionjobinfo_t { ...@@ -416,6 +416,11 @@ pub struct crocksdb_compactionjobinfo_t {
} }
#[repr(C)] #[repr(C)]
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub struct crocksdb_subcompactionjobinfo_t {
_unused: [u8; 0],
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct crocksdb_externalfileingestioninfo_t { pub struct crocksdb_externalfileingestioninfo_t {
_unused: [u8; 0], _unused: [u8; 0],
} }
...@@ -1748,14 +1753,41 @@ extern "C" { ...@@ -1748,14 +1753,41 @@ extern "C" {
} }
extern "C" { extern "C" {
pub fn crocksdb_compactionjobinfo_total_input_bytes( pub fn crocksdb_compactionjobinfo_total_input_bytes(
info: *const crocksdb_compactionjobinfo_t, arg1: *const crocksdb_compactionjobinfo_t,
) -> u64; ) -> u64;
} }
extern "C" { extern "C" {
pub fn crocksdb_compactionjobinfo_total_output_bytes( pub fn crocksdb_compactionjobinfo_total_output_bytes(
info: *const crocksdb_compactionjobinfo_t, arg1: *const crocksdb_compactionjobinfo_t,
) -> u64;
}
extern "C" {
pub fn crocksdb_subcompactionjobinfo_status(
arg1: *const crocksdb_subcompactionjobinfo_t,
arg2: *mut *mut libc::c_char,
);
}
extern "C" {
pub fn crocksdb_subcompactionjobinfo_cf_name(
arg1: *const crocksdb_subcompactionjobinfo_t,
arg2: *mut usize,
) -> *const libc::c_char;
}
extern "C" {
pub fn crocksdb_subcompactionjobinfo_thread_id(
arg1: *const crocksdb_subcompactionjobinfo_t,
) -> u64; ) -> u64;
} }
extern "C" {
pub fn crocksdb_subcompactionjobinfo_base_input_level(
arg1: *const crocksdb_subcompactionjobinfo_t,
) -> libc::c_int;
}
extern "C" {
pub fn crocksdb_subcompactionjobinfo_output_level(
arg1: *const crocksdb_subcompactionjobinfo_t,
) -> libc::c_int;
}
extern "C" { extern "C" {
pub fn crocksdb_externalfileingestioninfo_cf_name( pub fn crocksdb_externalfileingestioninfo_cf_name(
arg1: *const crocksdb_externalfileingestioninfo_t, arg1: *const crocksdb_externalfileingestioninfo_t,
...@@ -1789,6 +1821,13 @@ extern "C" { ...@@ -1789,6 +1821,13 @@ extern "C" {
arg1: *const crocksdb_writestallinfo_t, arg1: *const crocksdb_writestallinfo_t,
) -> *const crocksdb_writestallcondition_t; ) -> *const crocksdb_writestallcondition_t;
} }
pub type on_flush_begin_cb = ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut libc::c_void,
arg2: *mut crocksdb_t,
arg3: *const crocksdb_flushjobinfo_t,
),
>;
pub type on_flush_completed_cb = ::std::option::Option< pub type on_flush_completed_cb = ::std::option::Option<
unsafe extern "C" fn( unsafe extern "C" fn(
arg1: *mut libc::c_void, arg1: *mut libc::c_void,
...@@ -1796,6 +1835,13 @@ pub type on_flush_completed_cb = ::std::option::Option< ...@@ -1796,6 +1835,13 @@ pub type on_flush_completed_cb = ::std::option::Option<
arg3: *const crocksdb_flushjobinfo_t, arg3: *const crocksdb_flushjobinfo_t,
), ),
>; >;
pub type on_compaction_begin_cb = ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut libc::c_void,
arg2: *mut crocksdb_t,
arg3: *const crocksdb_compactionjobinfo_t,
),
>;
pub type on_compaction_completed_cb = ::std::option::Option< pub type on_compaction_completed_cb = ::std::option::Option<
unsafe extern "C" fn( unsafe extern "C" fn(
arg1: *mut libc::c_void, arg1: *mut libc::c_void,
...@@ -1803,6 +1849,12 @@ pub type on_compaction_completed_cb = ::std::option::Option< ...@@ -1803,6 +1849,12 @@ pub type on_compaction_completed_cb = ::std::option::Option<
arg3: *const crocksdb_compactionjobinfo_t, arg3: *const crocksdb_compactionjobinfo_t,
), ),
>; >;
pub type on_subcompaction_begin_cb = ::std::option::Option<
unsafe extern "C" fn(arg1: *mut libc::c_void, arg2: *const crocksdb_subcompactionjobinfo_t),
>;
pub type on_subcompaction_completed_cb = ::std::option::Option<
unsafe extern "C" fn(arg1: *mut libc::c_void, arg2: *const crocksdb_subcompactionjobinfo_t),
>;
pub type on_external_file_ingested_cb = ::std::option::Option< pub type on_external_file_ingested_cb = ::std::option::Option<
unsafe extern "C" fn( unsafe extern "C" fn(
arg1: *mut libc::c_void, arg1: *mut libc::c_void,
...@@ -1831,8 +1883,12 @@ extern "C" { ...@@ -1831,8 +1883,12 @@ extern "C" {
pub fn crocksdb_eventlistener_create( pub fn crocksdb_eventlistener_create(
state_: *mut libc::c_void, state_: *mut libc::c_void,
destructor_: ::std::option::Option<unsafe extern "C" fn(arg1: *mut libc::c_void)>, destructor_: ::std::option::Option<unsafe extern "C" fn(arg1: *mut libc::c_void)>,
on_flush_begin: on_flush_begin_cb,
on_flush_completed: on_flush_completed_cb, on_flush_completed: on_flush_completed_cb,
on_compaction_begin: on_compaction_begin_cb,
on_compaction_completed: on_compaction_completed_cb, on_compaction_completed: on_compaction_completed_cb,
on_subcompaction_begin: on_subcompaction_begin_cb,
on_subcompaction_completed: on_subcompaction_completed_cb,
on_external_file_ingested: on_external_file_ingested_cb, on_external_file_ingested: on_external_file_ingested_cb,
on_background_error: on_background_error_cb, on_background_error: on_background_error_cb,
on_stall_conditions_changed: on_stall_conditions_changed_cb, on_stall_conditions_changed: on_stall_conditions_changed_cb,
...@@ -2696,6 +2752,12 @@ extern "C" { ...@@ -2696,6 +2752,12 @@ extern "C" {
rate_bytes_per_sec: i64, rate_bytes_per_sec: i64,
); );
} }
extern "C" {
pub fn crocksdb_ratelimiter_set_auto_tuned(
limiter: *mut crocksdb_ratelimiter_t,
auto_tuned: libc::c_uchar,
);
}
extern "C" { extern "C" {
pub fn crocksdb_ratelimiter_get_singleburst_bytes(limiter: *mut crocksdb_ratelimiter_t) -> i64; pub fn crocksdb_ratelimiter_get_singleburst_bytes(limiter: *mut crocksdb_ratelimiter_t) -> i64;
} }
...@@ -2719,43 +2781,17 @@ extern "C" { ...@@ -2719,43 +2781,17 @@ extern "C" {
extern "C" { extern "C" {
pub fn crocksdb_ratelimiter_get_bytes_per_second(limiter: *mut crocksdb_ratelimiter_t) -> i64; pub fn crocksdb_ratelimiter_get_bytes_per_second(limiter: *mut crocksdb_ratelimiter_t) -> i64;
} }
extern "C" {
pub fn crocksdb_ratelimiter_get_auto_tuned(
limiter: *mut crocksdb_ratelimiter_t,
) -> libc::c_uchar;
}
extern "C" { extern "C" {
pub fn crocksdb_ratelimiter_get_total_requests( pub fn crocksdb_ratelimiter_get_total_requests(
limiter: *mut crocksdb_ratelimiter_t, limiter: *mut crocksdb_ratelimiter_t,
pri: libc::c_uchar, pri: libc::c_uchar,
) -> i64; ) -> i64;
} }
extern "C" {
pub fn crocksdb_compactionfilter_create(
state: *mut libc::c_void,
destructor: ::std::option::Option<unsafe extern "C" fn(arg1: *mut libc::c_void)>,
filter: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut libc::c_void,
level: libc::c_int,
key: *const libc::c_char,
key_length: usize,
existing_value: *const libc::c_char,
value_length: usize,
new_value: *mut *mut libc::c_char,
new_value_length: *mut usize,
value_changed: *mut libc::c_uchar,
) -> libc::c_uchar,
>,
name: ::std::option::Option<
unsafe extern "C" fn(arg1: *mut libc::c_void) -> *const libc::c_char,
>,
) -> *mut crocksdb_compactionfilter_t;
}
extern "C" {
pub fn crocksdb_compactionfilter_set_ignore_snapshots(
arg1: *mut crocksdb_compactionfilter_t,
arg2: libc::c_uchar,
);
}
extern "C" {
pub fn crocksdb_compactionfilter_destroy(arg1: *mut crocksdb_compactionfilter_t);
}
extern "C" { extern "C" {
pub fn crocksdb_compactionfiltercontext_is_full_compaction( pub fn crocksdb_compactionfiltercontext_is_full_compaction(
context: *mut crocksdb_compactionfiltercontext_t, context: *mut crocksdb_compactionfiltercontext_t,
......
...@@ -139,6 +139,7 @@ using rocksdb::SstFileWriter; ...@@ -139,6 +139,7 @@ using rocksdb::SstFileWriter;
using rocksdb::SstPartitioner; using rocksdb::SstPartitioner;
using rocksdb::SstPartitionerFactory; using rocksdb::SstPartitionerFactory;
using rocksdb::Status; using rocksdb::Status;
using rocksdb::SubcompactionJobInfo;
using rocksdb::TableProperties; using rocksdb::TableProperties;
using rocksdb::TablePropertiesCollection; using rocksdb::TablePropertiesCollection;
using rocksdb::TablePropertiesCollector; using rocksdb::TablePropertiesCollector;
...@@ -364,6 +365,9 @@ struct crocksdb_writestallinfo_t { ...@@ -364,6 +365,9 @@ struct crocksdb_writestallinfo_t {
struct crocksdb_compactionjobinfo_t { struct crocksdb_compactionjobinfo_t {
CompactionJobInfo rep; CompactionJobInfo rep;
}; };
struct crocksdb_subcompactionjobinfo_t {
SubcompactionJobInfo rep;
};
struct crocksdb_externalfileingestioninfo_t { struct crocksdb_externalfileingestioninfo_t {
ExternalFileIngestionInfo rep; ExternalFileIngestionInfo rep;
}; };
...@@ -2167,6 +2171,34 @@ CompactionReason crocksdb_compactionjobinfo_compaction_reason( ...@@ -2167,6 +2171,34 @@ CompactionReason crocksdb_compactionjobinfo_compaction_reason(
return info->rep.compaction_reason; return info->rep.compaction_reason;
} }
/* SubcompactionJobInfo */
void crocksdb_subcompactionjobinfo_status(
const crocksdb_subcompactionjobinfo_t* info, char** errptr) {
SaveError(errptr, info->rep.status);
}
const char* crocksdb_subcompactionjobinfo_cf_name(
const crocksdb_subcompactionjobinfo_t* info, size_t* size) {
*size = info->rep.cf_name.size();
return info->rep.cf_name.data();
}
uint64_t crocksdb_subcompactionjobinfo_thread_id(
const crocksdb_subcompactionjobinfo_t* info) {
return info->rep.thread_id;
}
int crocksdb_subcompactionjobinfo_base_input_level(
const crocksdb_subcompactionjobinfo_t* info) {
return info->rep.base_input_level;
}
int crocksdb_subcompactionjobinfo_output_level(
const crocksdb_subcompactionjobinfo_t* info) {
return info->rep.output_level;
}
/* ExternalFileIngestionInfo */ /* ExternalFileIngestionInfo */
const char* crocksdb_externalfileingestioninfo_cf_name( const char* crocksdb_externalfileingestioninfo_cf_name(
...@@ -2212,22 +2244,41 @@ const crocksdb_writestallcondition_t* crocksdb_writestallinfo_prev( ...@@ -2212,22 +2244,41 @@ const crocksdb_writestallcondition_t* crocksdb_writestallinfo_prev(
struct crocksdb_eventlistener_t : public EventListener { struct crocksdb_eventlistener_t : public EventListener {
void* state_; void* state_;
void (*destructor_)(void*); void (*destructor_)(void*);
void (*on_flush_begin)(void*, crocksdb_t*, const crocksdb_flushjobinfo_t*);
void (*on_flush_completed)(void*, crocksdb_t*, void (*on_flush_completed)(void*, crocksdb_t*,
const crocksdb_flushjobinfo_t*); const crocksdb_flushjobinfo_t*);
void (*on_compaction_begin)(void*, crocksdb_t*,
const crocksdb_compactionjobinfo_t*);
void (*on_compaction_completed)(void*, crocksdb_t*, void (*on_compaction_completed)(void*, crocksdb_t*,
const crocksdb_compactionjobinfo_t*); const crocksdb_compactionjobinfo_t*);
void (*on_subcompaction_begin)(void*, const crocksdb_subcompactionjobinfo_t*);
void (*on_subcompaction_completed)(void*,
const crocksdb_subcompactionjobinfo_t*);
void (*on_external_file_ingested)( void (*on_external_file_ingested)(
void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*); void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*);
void (*on_background_error)(void*, crocksdb_backgrounderrorreason_t, void (*on_background_error)(void*, crocksdb_backgrounderrorreason_t,
crocksdb_status_ptr_t*); crocksdb_status_ptr_t*);
void (*on_stall_conditions_changed)(void*, const crocksdb_writestallinfo_t*); void (*on_stall_conditions_changed)(void*, const crocksdb_writestallinfo_t*);
virtual void OnFlushBegin(DB* db, const FlushJobInfo& info) {
crocksdb_t c_db = {db};
on_flush_begin(state_, &c_db,
reinterpret_cast<const crocksdb_flushjobinfo_t*>(&info));
}
virtual void OnFlushCompleted(DB* db, const FlushJobInfo& info) { virtual void OnFlushCompleted(DB* db, const FlushJobInfo& info) {
crocksdb_t c_db = {db}; crocksdb_t c_db = {db};
on_flush_completed(state_, &c_db, on_flush_completed(state_, &c_db,
reinterpret_cast<const crocksdb_flushjobinfo_t*>(&info)); reinterpret_cast<const crocksdb_flushjobinfo_t*>(&info));
} }
virtual void OnCompactionBegin(DB* db, const CompactionJobInfo& info) {
crocksdb_t c_db = {db};
on_compaction_begin(
state_, &c_db,
reinterpret_cast<const crocksdb_compactionjobinfo_t*>(&info));
}
virtual void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) { virtual void OnCompactionCompleted(DB* db, const CompactionJobInfo& info) {
crocksdb_t c_db = {db}; crocksdb_t c_db = {db};
on_compaction_completed( on_compaction_completed(
...@@ -2235,6 +2286,18 @@ struct crocksdb_eventlistener_t : public EventListener { ...@@ -2235,6 +2286,18 @@ struct crocksdb_eventlistener_t : public EventListener {
reinterpret_cast<const crocksdb_compactionjobinfo_t*>(&info)); reinterpret_cast<const crocksdb_compactionjobinfo_t*>(&info));
} }
virtual void OnSubcompactionBegin(const SubcompactionJobInfo& info) {
on_subcompaction_begin(
state_,
reinterpret_cast<const crocksdb_subcompactionjobinfo_t*>(&info));
}
virtual void OnSubcompactionCompleted(const SubcompactionJobInfo& info) {
on_subcompaction_completed(
state_,
reinterpret_cast<const crocksdb_subcompactionjobinfo_t*>(&info));
}
virtual void OnExternalFileIngested(DB* db, virtual void OnExternalFileIngested(DB* db,
const ExternalFileIngestionInfo& info) { const ExternalFileIngestionInfo& info) {
crocksdb_t c_db = {db}; crocksdb_t c_db = {db};
...@@ -2276,17 +2339,24 @@ struct crocksdb_eventlistener_t : public EventListener { ...@@ -2276,17 +2339,24 @@ struct crocksdb_eventlistener_t : public EventListener {
}; };
crocksdb_eventlistener_t* crocksdb_eventlistener_create( crocksdb_eventlistener_t* crocksdb_eventlistener_create(
void* state_, void (*destructor_)(void*), void* state_, void (*destructor_)(void*), on_flush_begin_cb on_flush_begin,
on_flush_completed_cb on_flush_completed, on_flush_completed_cb on_flush_completed,
on_compaction_begin_cb on_compaction_begin,
on_compaction_completed_cb on_compaction_completed, on_compaction_completed_cb on_compaction_completed,
on_subcompaction_begin_cb on_subcompaction_begin,
on_subcompaction_completed_cb on_subcompaction_completed,
on_external_file_ingested_cb on_external_file_ingested, on_external_file_ingested_cb on_external_file_ingested,
on_background_error_cb on_background_error, on_background_error_cb on_background_error,
on_stall_conditions_changed_cb on_stall_conditions_changed) { on_stall_conditions_changed_cb on_stall_conditions_changed) {
crocksdb_eventlistener_t* et = new crocksdb_eventlistener_t; crocksdb_eventlistener_t* et = new crocksdb_eventlistener_t;
et->state_ = state_; et->state_ = state_;
et->destructor_ = destructor_; et->destructor_ = destructor_;
et->on_flush_begin = on_flush_begin;
et->on_flush_completed = on_flush_completed; et->on_flush_completed = on_flush_completed;
et->on_compaction_begin = on_compaction_begin;
et->on_compaction_completed = on_compaction_completed; et->on_compaction_completed = on_compaction_completed;
et->on_subcompaction_begin = on_subcompaction_begin;
et->on_subcompaction_completed = on_subcompaction_completed;
et->on_external_file_ingested = on_external_file_ingested; et->on_external_file_ingested = on_external_file_ingested;
et->on_background_error = on_background_error; et->on_background_error = on_background_error;
et->on_stall_conditions_changed = on_stall_conditions_changed; et->on_stall_conditions_changed = on_stall_conditions_changed;
......
...@@ -138,6 +138,7 @@ typedef struct crocksdb_table_properties_collector_factory_t ...@@ -138,6 +138,7 @@ typedef struct crocksdb_table_properties_collector_factory_t
crocksdb_table_properties_collector_factory_t; crocksdb_table_properties_collector_factory_t;
typedef struct crocksdb_flushjobinfo_t crocksdb_flushjobinfo_t; typedef struct crocksdb_flushjobinfo_t crocksdb_flushjobinfo_t;
typedef struct crocksdb_compactionjobinfo_t crocksdb_compactionjobinfo_t; typedef struct crocksdb_compactionjobinfo_t crocksdb_compactionjobinfo_t;
typedef struct crocksdb_subcompactionjobinfo_t crocksdb_subcompactionjobinfo_t;
typedef struct crocksdb_externalfileingestioninfo_t typedef struct crocksdb_externalfileingestioninfo_t
crocksdb_externalfileingestioninfo_t; crocksdb_externalfileingestioninfo_t;
typedef struct crocksdb_eventlistener_t crocksdb_eventlistener_t; typedef struct crocksdb_eventlistener_t crocksdb_eventlistener_t;
...@@ -830,10 +831,22 @@ extern C_ROCKSDB_LIBRARY_API uint64_t ...@@ -830,10 +831,22 @@ extern C_ROCKSDB_LIBRARY_API uint64_t
crocksdb_compactionjobinfo_output_records(const crocksdb_compactionjobinfo_t*); crocksdb_compactionjobinfo_output_records(const crocksdb_compactionjobinfo_t*);
extern C_ROCKSDB_LIBRARY_API uint64_t extern C_ROCKSDB_LIBRARY_API uint64_t
crocksdb_compactionjobinfo_total_input_bytes( crocksdb_compactionjobinfo_total_input_bytes(
const crocksdb_compactionjobinfo_t* info); const crocksdb_compactionjobinfo_t*);
extern C_ROCKSDB_LIBRARY_API uint64_t extern C_ROCKSDB_LIBRARY_API uint64_t
crocksdb_compactionjobinfo_total_output_bytes( crocksdb_compactionjobinfo_total_output_bytes(
const crocksdb_compactionjobinfo_t* info); const crocksdb_compactionjobinfo_t*);
/* Subcompaction job info */
extern C_ROCKSDB_LIBRARY_API void crocksdb_subcompactionjobinfo_status(
const crocksdb_subcompactionjobinfo_t*, char**);
extern C_ROCKSDB_LIBRARY_API const char* crocksdb_subcompactionjobinfo_cf_name(
const crocksdb_subcompactionjobinfo_t*, size_t*);
extern C_ROCKSDB_LIBRARY_API uint64_t
crocksdb_subcompactionjobinfo_thread_id(const crocksdb_subcompactionjobinfo_t*);
extern C_ROCKSDB_LIBRARY_API int crocksdb_subcompactionjobinfo_base_input_level(
const crocksdb_subcompactionjobinfo_t*);
extern C_ROCKSDB_LIBRARY_API int crocksdb_subcompactionjobinfo_output_level(
const crocksdb_subcompactionjobinfo_t*);
/* External file ingestion info */ /* External file ingestion info */
extern C_ROCKSDB_LIBRARY_API const char* extern C_ROCKSDB_LIBRARY_API const char*
...@@ -856,10 +869,18 @@ crocksdb_writestallinfo_prev(const crocksdb_writestallinfo_t*); ...@@ -856,10 +869,18 @@ crocksdb_writestallinfo_prev(const crocksdb_writestallinfo_t*);
/* Event listener */ /* Event listener */
typedef void (*on_flush_begin_cb)(void*, crocksdb_t*,
const crocksdb_flushjobinfo_t*);
typedef void (*on_flush_completed_cb)(void*, crocksdb_t*, typedef void (*on_flush_completed_cb)(void*, crocksdb_t*,
const crocksdb_flushjobinfo_t*); const crocksdb_flushjobinfo_t*);
typedef void (*on_compaction_begin_cb)(void*, crocksdb_t*,
const crocksdb_compactionjobinfo_t*);
typedef void (*on_compaction_completed_cb)(void*, crocksdb_t*, typedef void (*on_compaction_completed_cb)(void*, crocksdb_t*,
const crocksdb_compactionjobinfo_t*); const crocksdb_compactionjobinfo_t*);
typedef void (*on_subcompaction_begin_cb)(
void*, const crocksdb_subcompactionjobinfo_t*);
typedef void (*on_subcompaction_completed_cb)(
void*, const crocksdb_subcompactionjobinfo_t*);
typedef void (*on_external_file_ingested_cb)( typedef void (*on_external_file_ingested_cb)(
void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*); void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*);
typedef void (*on_background_error_cb)(void*, crocksdb_backgrounderrorreason_t, typedef void (*on_background_error_cb)(void*, crocksdb_backgrounderrorreason_t,
...@@ -870,9 +891,12 @@ typedef void (*crocksdb_logger_logv_cb)(void*, int log_level, const char*); ...@@ -870,9 +891,12 @@ typedef void (*crocksdb_logger_logv_cb)(void*, int log_level, const char*);
extern C_ROCKSDB_LIBRARY_API crocksdb_eventlistener_t* extern C_ROCKSDB_LIBRARY_API crocksdb_eventlistener_t*
crocksdb_eventlistener_create( crocksdb_eventlistener_create(
void* state_, void (*destructor_)(void*), void* state_, void (*destructor_)(void*), on_flush_begin_cb on_flush_begin,
on_flush_completed_cb on_flush_completed, on_flush_completed_cb on_flush_completed,
on_compaction_begin_cb on_compaction_begin,
on_compaction_completed_cb on_compaction_completed, on_compaction_completed_cb on_compaction_completed,
on_subcompaction_begin_cb on_subcompaction_begin,
on_subcompaction_completed_cb on_subcompaction_completed,
on_external_file_ingested_cb on_external_file_ingested, on_external_file_ingested_cb on_external_file_ingested,
on_background_error_cb on_background_error, on_background_error_cb on_background_error,
on_stall_conditions_changed_cb on_stall_conditions_changed); on_stall_conditions_changed_cb on_stall_conditions_changed);
......
Subproject commit a487d256ee927afc17ffe2dd863e4769c4ae8f3b Subproject commit 2dfbfa51b7892a80debc18e52c863b93bd79e491
...@@ -130,6 +130,8 @@ pub struct DBFlushJobInfo(c_void); ...@@ -130,6 +130,8 @@ pub struct DBFlushJobInfo(c_void);
#[repr(C)] #[repr(C)]
pub struct DBCompactionJobInfo(c_void); pub struct DBCompactionJobInfo(c_void);
#[repr(C)] #[repr(C)]
pub struct DBSubcompactionJobInfo(c_void);
#[repr(C)]
pub struct DBIngestionInfo(c_void); pub struct DBIngestionInfo(c_void);
#[repr(C)] #[repr(C)]
pub struct DBEventListener(c_void); pub struct DBEventListener(c_void);
...@@ -2144,6 +2146,21 @@ extern "C" { ...@@ -2144,6 +2146,21 @@ extern "C" {
info: *const DBCompactionJobInfo, info: *const DBCompactionJobInfo,
) -> CompactionReason; ) -> CompactionReason;
pub fn crocksdb_subcompactionjobinfo_status(
info: *const DBSubcompactionJobInfo,
errptr: *mut *mut c_char,
);
pub fn crocksdb_subcompactionjobinfo_cf_name(
info: *const DBSubcompactionJobInfo,
size: *mut size_t,
) -> *const c_char;
pub fn crocksdb_subcompactionjobinfo_thread_id(info: *const DBSubcompactionJobInfo) -> u64;
pub fn crocksdb_subcompactionjobinfo_base_input_level(
info: *const DBSubcompactionJobInfo,
) -> c_int;
pub fn crocksdb_subcompactionjobinfo_output_level(info: *const DBSubcompactionJobInfo)
-> c_int;
pub fn crocksdb_externalfileingestioninfo_cf_name( pub fn crocksdb_externalfileingestioninfo_cf_name(
info: *const DBIngestionInfo, info: *const DBIngestionInfo,
size: *mut size_t, size: *mut size_t,
...@@ -2169,8 +2186,12 @@ extern "C" { ...@@ -2169,8 +2186,12 @@ extern "C" {
pub fn crocksdb_eventlistener_create( pub fn crocksdb_eventlistener_create(
state: *mut c_void, state: *mut c_void,
destructor: extern "C" fn(*mut c_void), destructor: extern "C" fn(*mut c_void),
flush: extern "C" fn(*mut c_void, *mut DBInstance, *const DBFlushJobInfo), flush_begin: extern "C" fn(*mut c_void, *mut DBInstance, *const DBFlushJobInfo),
compact: extern "C" fn(*mut c_void, *mut DBInstance, *const DBCompactionJobInfo), flush_completed: extern "C" fn(*mut c_void, *mut DBInstance, *const DBFlushJobInfo),
compact_begin: extern "C" fn(*mut c_void, *mut DBInstance, *const DBCompactionJobInfo),
compact_completed: extern "C" fn(*mut c_void, *mut DBInstance, *const DBCompactionJobInfo),
subcompact_begin: extern "C" fn(*mut c_void, *const DBSubcompactionJobInfo),
subcompact_completed: extern "C" fn(*mut c_void, *const DBSubcompactionJobInfo),
ingest: extern "C" fn(*mut c_void, *mut DBInstance, *const DBIngestionInfo), ingest: extern "C" fn(*mut c_void, *mut DBInstance, *const DBIngestionInfo),
bg_error: extern "C" fn(*mut c_void, DBBackgroundErrorReason, *mut DBStatusPtr), bg_error: extern "C" fn(*mut c_void, DBBackgroundErrorReason, *mut DBStatusPtr),
stall_conditions: extern "C" fn(*mut c_void, *const DBWriteStallInfo), stall_conditions: extern "C" fn(*mut c_void, *const DBWriteStallInfo),
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
use crocksdb_ffi::{ use crocksdb_ffi::{
self, CompactionReason, DBBackgroundErrorReason, DBCompactionJobInfo, DBEventListener, self, CompactionReason, DBBackgroundErrorReason, DBCompactionJobInfo, DBEventListener,
DBFlushJobInfo, DBIngestionInfo, DBInstance, DBStatusPtr, DBWriteStallInfo, DBFlushJobInfo, DBIngestionInfo, DBInstance, DBStatusPtr, DBSubcompactionJobInfo,
WriteStallCondition, DBWriteStallInfo, WriteStallCondition,
}; };
use libc::c_void; use libc::c_void;
use std::path::Path; use std::path::Path;
...@@ -130,6 +130,32 @@ impl CompactionJobInfo { ...@@ -130,6 +130,32 @@ impl CompactionJobInfo {
} }
} }
#[repr(transparent)]
pub struct SubcompactionJobInfo(DBSubcompactionJobInfo);
impl SubcompactionJobInfo {
pub fn status(&self) -> Result<(), String> {
unsafe { ffi_try!(crocksdb_subcompactionjobinfo_status(&self.0)) }
Ok(())
}
pub fn cf_name(&self) -> &str {
unsafe { fetch_str!(crocksdb_subcompactionjobinfo_cf_name(&self.0)) }
}
pub fn thread_id(&self) -> u64 {
unsafe { crocksdb_ffi::crocksdb_subcompactionjobinfo_thread_id(&self.0) }
}
pub fn base_input_level(&self) -> i32 {
unsafe { crocksdb_ffi::crocksdb_subcompactionjobinfo_base_input_level(&self.0) }
}
pub fn output_level(&self) -> i32 {
unsafe { crocksdb_ffi::crocksdb_subcompactionjobinfo_output_level(&self.0) }
}
}
#[repr(transparent)] #[repr(transparent)]
pub struct IngestionInfo(DBIngestionInfo); pub struct IngestionInfo(DBIngestionInfo);
...@@ -180,8 +206,12 @@ impl WriteStallInfo { ...@@ -180,8 +206,12 @@ impl WriteStallInfo {
/// For more information, please see /// For more information, please see
/// [doc of rocksdb](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/listener.h). /// [doc of rocksdb](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/listener.h).
pub trait EventListener: Send + Sync { pub trait EventListener: Send + Sync {
fn on_flush_begin(&self, _: &FlushJobInfo) {}
fn on_flush_completed(&self, _: &FlushJobInfo) {} fn on_flush_completed(&self, _: &FlushJobInfo) {}
fn on_compaction_begin(&self, _: &CompactionJobInfo) {}
fn on_compaction_completed(&self, _: &CompactionJobInfo) {} fn on_compaction_completed(&self, _: &CompactionJobInfo) {}
fn on_subcompaction_begin(&self, _: &SubcompactionJobInfo) {}
fn on_subcompaction_completed(&self, _: &SubcompactionJobInfo) {}
fn on_external_file_ingested(&self, _: &IngestionInfo) {} fn on_external_file_ingested(&self, _: &IngestionInfo) {}
fn on_background_error(&self, _: DBBackgroundErrorReason, _: Result<(), String>) {} fn on_background_error(&self, _: DBBackgroundErrorReason, _: Result<(), String>) {}
fn on_stall_conditions_changed(&self, _: &WriteStallInfo) {} fn on_stall_conditions_changed(&self, _: &WriteStallInfo) {}
...@@ -195,6 +225,16 @@ extern "C" fn destructor(ctx: *mut c_void) { ...@@ -195,6 +225,16 @@ extern "C" fn destructor(ctx: *mut c_void) {
// Maybe we should reuse db instance? // Maybe we should reuse db instance?
// TODO: refactor DB implement so that we can convert DBInstance to DB. // TODO: refactor DB implement so that we can convert DBInstance to DB.
extern "C" fn on_flush_begin(ctx: *mut c_void, _: *mut DBInstance, info: *const DBFlushJobInfo) {
let (ctx, info) = unsafe {
(
&*(ctx as *mut Box<dyn EventListener>),
&*(info as *const FlushJobInfo),
)
};
ctx.on_flush_begin(info);
}
extern "C" fn on_flush_completed( extern "C" fn on_flush_completed(
ctx: *mut c_void, ctx: *mut c_void,
_: *mut DBInstance, _: *mut DBInstance,
...@@ -209,6 +249,20 @@ extern "C" fn on_flush_completed( ...@@ -209,6 +249,20 @@ extern "C" fn on_flush_completed(
ctx.on_flush_completed(info); ctx.on_flush_completed(info);
} }
extern "C" fn on_compaction_begin(
ctx: *mut c_void,
_: *mut DBInstance,
info: *const DBCompactionJobInfo,
) {
let (ctx, info) = unsafe {
(
&*(ctx as *mut Box<dyn EventListener>),
&*(info as *const CompactionJobInfo),
)
};
ctx.on_compaction_begin(info);
}
extern "C" fn on_compaction_completed( extern "C" fn on_compaction_completed(
ctx: *mut c_void, ctx: *mut c_void,
_: *mut DBInstance, _: *mut DBInstance,
...@@ -223,6 +277,26 @@ extern "C" fn on_compaction_completed( ...@@ -223,6 +277,26 @@ extern "C" fn on_compaction_completed(
ctx.on_compaction_completed(info); ctx.on_compaction_completed(info);
} }
extern "C" fn on_subcompaction_begin(ctx: *mut c_void, info: *const DBSubcompactionJobInfo) {
let (ctx, info) = unsafe {
(
&*(ctx as *mut Box<dyn EventListener>),
&*(info as *const SubcompactionJobInfo),
)
};
ctx.on_subcompaction_begin(info);
}
extern "C" fn on_subcompaction_completed(ctx: *mut c_void, info: *const DBSubcompactionJobInfo) {
let (ctx, info) = unsafe {
(
&*(ctx as *mut Box<dyn EventListener>),
&*(info as *const SubcompactionJobInfo),
)
};
ctx.on_subcompaction_completed(info);
}
extern "C" fn on_external_file_ingested( extern "C" fn on_external_file_ingested(
ctx: *mut c_void, ctx: *mut c_void,
_: *mut DBInstance, _: *mut DBInstance,
...@@ -270,8 +344,12 @@ pub fn new_event_listener<L: EventListener>(l: L) -> *mut DBEventListener { ...@@ -270,8 +344,12 @@ pub fn new_event_listener<L: EventListener>(l: L) -> *mut DBEventListener {
crocksdb_ffi::crocksdb_eventlistener_create( crocksdb_ffi::crocksdb_eventlistener_create(
Box::into_raw(Box::new(p)) as *mut c_void, Box::into_raw(Box::new(p)) as *mut c_void,
destructor, destructor,
on_flush_begin,
on_flush_completed, on_flush_completed,
on_compaction_begin,
on_compaction_completed, on_compaction_completed,
on_subcompaction_begin,
on_subcompaction_completed,
on_external_file_ingested, on_external_file_ingested,
on_background_error, on_background_error,
on_stall_conditions_changed, on_stall_conditions_changed,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment