Unverified Commit f79d2ba2 authored by yiwu-arbug's avatar yiwu-arbug Committed by GitHub

Add OnBackgroundError handler API (#349)

Add OnBackgroundError handler API. 

For the API, also adding crocksdb_status_ptr_t struct, which may be later extend to allow setting a different status (since rocksdb OnBackgroundError allow overriding the background error).
Signed-off-by: 's avatarYi Wu <yiwu@pingcap.com>
parent 34f9c021
...@@ -89,6 +89,7 @@ using rocksdb::InfoLogLevel; ...@@ -89,6 +89,7 @@ using rocksdb::InfoLogLevel;
using rocksdb::FileLock; using rocksdb::FileLock;
using rocksdb::FilterPolicy; using rocksdb::FilterPolicy;
using rocksdb::FlushJobInfo; using rocksdb::FlushJobInfo;
using rocksdb::BackgroundErrorReason;
using rocksdb::WriteStallInfo; using rocksdb::WriteStallInfo;
using rocksdb::WriteStallCondition; using rocksdb::WriteStallCondition;
using rocksdb::FlushOptions; using rocksdb::FlushOptions;
...@@ -184,6 +185,7 @@ extern "C" { ...@@ -184,6 +185,7 @@ extern "C" {
const char* block_base_table_str = "BlockBasedTable"; const char* block_base_table_str = "BlockBasedTable";
struct crocksdb_t { DB* rep; }; struct crocksdb_t { DB* rep; };
struct crocksdb_status_ptr_t { Status* rep; };
struct crocksdb_backup_engine_t { BackupEngine* rep; }; struct crocksdb_backup_engine_t { BackupEngine* rep; };
struct crocksdb_backup_engine_info_t { std::vector<BackupInfo> rep; }; struct crocksdb_backup_engine_info_t { std::vector<BackupInfo> rep; };
struct crocksdb_restore_options_t { RestoreOptions rep; }; struct crocksdb_restore_options_t { RestoreOptions rep; };
...@@ -599,6 +601,10 @@ crocksdb_t* crocksdb_open_for_read_only( ...@@ -599,6 +601,10 @@ crocksdb_t* crocksdb_open_for_read_only(
return result; return result;
} }
void crocksdb_status_ptr_get_error(crocksdb_status_ptr_t* status, char** errptr) {
SaveError(errptr, *(status->rep));
}
crocksdb_backup_engine_t* crocksdb_backup_engine_open( crocksdb_backup_engine_t* crocksdb_backup_engine_open(
const crocksdb_options_t* options, const char* path, char** errptr) { const crocksdb_options_t* options, const char* path, char** errptr) {
BackupEngine* be; BackupEngine* be;
...@@ -2012,6 +2018,8 @@ struct crocksdb_eventlistener_t : public EventListener { ...@@ -2012,6 +2018,8 @@ struct crocksdb_eventlistener_t : public EventListener {
const crocksdb_compactionjobinfo_t*); const crocksdb_compactionjobinfo_t*);
void (*on_external_file_ingested)( void (*on_external_file_ingested)(
void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*); void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*);
void (*on_background_error)(
void*, crocksdb_backgrounderrorreason_t, crocksdb_status_ptr_t*);
void (*on_stall_conditions_changed)(void*, const crocksdb_writestallinfo_t*); void (*on_stall_conditions_changed)(void*, const crocksdb_writestallinfo_t*);
virtual void OnFlushCompleted(DB* db, const FlushJobInfo& info) { virtual void OnFlushCompleted(DB* db, const FlushJobInfo& info) {
...@@ -2035,6 +2043,30 @@ struct crocksdb_eventlistener_t : public EventListener { ...@@ -2035,6 +2043,30 @@ struct crocksdb_eventlistener_t : public EventListener {
reinterpret_cast<const crocksdb_externalfileingestioninfo_t*>(&info)); reinterpret_cast<const crocksdb_externalfileingestioninfo_t*>(&info));
} }
virtual void OnBackgroundError(BackgroundErrorReason reason, Status* status) {
crocksdb_backgrounderrorreason_t r;
switch (reason) {
case BackgroundErrorReason::kFlush:
r = crocksdb_backgrounderrorreason_t::kFlush;
break;
case BackgroundErrorReason::kCompaction:
r = crocksdb_backgrounderrorreason_t::kCompaction;
break;
case BackgroundErrorReason::kWriteCallback:
r = crocksdb_backgrounderrorreason_t::kWriteCallback;
break;
case BackgroundErrorReason::kMemTable:
r = crocksdb_backgrounderrorreason_t::kMemTable;
break;
default:
assert(false);
}
crocksdb_status_ptr_t* s = new crocksdb_status_ptr_t;
s->rep = status;
on_background_error(state_, r, s);
delete s;
}
virtual void OnStallConditionsChanged(const WriteStallInfo& info) { virtual void OnStallConditionsChanged(const WriteStallInfo& info) {
on_stall_conditions_changed( on_stall_conditions_changed(
state_, state_,
...@@ -2049,6 +2081,7 @@ crocksdb_eventlistener_t* crocksdb_eventlistener_create( ...@@ -2049,6 +2081,7 @@ crocksdb_eventlistener_t* crocksdb_eventlistener_create(
on_flush_completed_cb on_flush_completed, on_flush_completed_cb on_flush_completed,
on_compaction_completed_cb on_compaction_completed, on_compaction_completed_cb on_compaction_completed,
on_external_file_ingested_cb on_external_file_ingested, on_external_file_ingested_cb on_external_file_ingested,
on_background_error_cb on_background_error,
on_stall_conditions_changed_cb on_stall_conditions_changed) { on_stall_conditions_changed_cb on_stall_conditions_changed) {
crocksdb_eventlistener_t* et = new crocksdb_eventlistener_t; crocksdb_eventlistener_t* et = new crocksdb_eventlistener_t;
et->state_ = state_; et->state_ = state_;
...@@ -2056,6 +2089,7 @@ crocksdb_eventlistener_t* crocksdb_eventlistener_create( ...@@ -2056,6 +2089,7 @@ crocksdb_eventlistener_t* crocksdb_eventlistener_create(
et->on_flush_completed = on_flush_completed; et->on_flush_completed = on_flush_completed;
et->on_compaction_completed = on_compaction_completed; et->on_compaction_completed = on_compaction_completed;
et->on_external_file_ingested = on_external_file_ingested; et->on_external_file_ingested = on_external_file_ingested;
et->on_background_error = on_background_error;
et->on_stall_conditions_changed = on_stall_conditions_changed; et->on_stall_conditions_changed = on_stall_conditions_changed;
return et; return et;
} }
......
...@@ -71,6 +71,7 @@ extern "C" { ...@@ -71,6 +71,7 @@ extern "C" {
/* Exported types */ /* Exported types */
typedef struct crocksdb_t crocksdb_t; typedef struct crocksdb_t crocksdb_t;
typedef struct crocksdb_status_ptr_t crocksdb_status_ptr_t;
typedef struct crocksdb_backup_engine_t crocksdb_backup_engine_t; typedef struct crocksdb_backup_engine_t crocksdb_backup_engine_t;
typedef struct crocksdb_backup_engine_info_t crocksdb_backup_engine_info_t; typedef struct crocksdb_backup_engine_info_t crocksdb_backup_engine_info_t;
typedef struct crocksdb_restore_options_t crocksdb_restore_options_t; typedef struct crocksdb_restore_options_t crocksdb_restore_options_t;
...@@ -172,6 +173,13 @@ typedef enum crocksdb_ratelimiter_mode_t { ...@@ -172,6 +173,13 @@ typedef enum crocksdb_ratelimiter_mode_t {
kAllIo = 3, kAllIo = 3,
} crocksdb_ratelimiter_mode_t; } crocksdb_ratelimiter_mode_t;
typedef enum crocksdb_backgrounderrorreason_t {
kFlush = 1,
kCompaction = 2,
kWriteCallback = 3,
kMemTable = 4,
} crocksdb_backgrounderrorreason_t;
/* DB operations */ /* DB operations */
extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open( extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open(
...@@ -184,6 +192,9 @@ extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open_for_read_only( ...@@ -184,6 +192,9 @@ extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open_for_read_only(
const crocksdb_options_t* options, const char* name, const crocksdb_options_t* options, const char* name,
unsigned char error_if_log_file_exist, char** errptr); unsigned char error_if_log_file_exist, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_status_ptr_get_error(
crocksdb_status_ptr_t*, char** errptr);
extern C_ROCKSDB_LIBRARY_API crocksdb_backup_engine_t* crocksdb_backup_engine_open( extern C_ROCKSDB_LIBRARY_API crocksdb_backup_engine_t* crocksdb_backup_engine_open(
const crocksdb_options_t* options, const char* path, char** errptr); const crocksdb_options_t* options, const char* path, char** errptr);
...@@ -760,6 +771,8 @@ typedef void (*on_compaction_completed_cb)(void*, crocksdb_t*, ...@@ -760,6 +771,8 @@ typedef void (*on_compaction_completed_cb)(void*, crocksdb_t*,
const crocksdb_compactionjobinfo_t*); const crocksdb_compactionjobinfo_t*);
typedef void (*on_external_file_ingested_cb)( typedef void (*on_external_file_ingested_cb)(
void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*); void*, crocksdb_t*, const crocksdb_externalfileingestioninfo_t*);
typedef void (*on_background_error_cb)(void*, crocksdb_backgrounderrorreason_t,
crocksdb_status_ptr_t*);
typedef void (*on_stall_conditions_changed_cb)(void*, const crocksdb_writestallinfo_t*); typedef void (*on_stall_conditions_changed_cb)(void*, const crocksdb_writestallinfo_t*);
extern C_ROCKSDB_LIBRARY_API crocksdb_eventlistener_t* extern C_ROCKSDB_LIBRARY_API crocksdb_eventlistener_t*
...@@ -768,6 +781,7 @@ crocksdb_eventlistener_create( ...@@ -768,6 +781,7 @@ crocksdb_eventlistener_create(
on_flush_completed_cb on_flush_completed, on_flush_completed_cb on_flush_completed,
on_compaction_completed_cb on_compaction_completed, on_compaction_completed_cb on_compaction_completed,
on_external_file_ingested_cb on_external_file_ingested, on_external_file_ingested_cb on_external_file_ingested,
on_background_error_cb on_background_error,
on_stall_conditions_changed_cb on_stall_conditions_changed); on_stall_conditions_changed_cb on_stall_conditions_changed);
extern C_ROCKSDB_LIBRARY_API void crocksdb_eventlistener_destroy( extern C_ROCKSDB_LIBRARY_API void crocksdb_eventlistener_destroy(
crocksdb_eventlistener_t*); crocksdb_eventlistener_t*);
...@@ -2002,7 +2016,6 @@ crocksdb_iostats_context_logger_nanos(crocksdb_iostats_context_t*); ...@@ -2002,7 +2016,6 @@ crocksdb_iostats_context_logger_nanos(crocksdb_iostats_context_t*);
extern C_ROCKSDB_LIBRARY_API void extern C_ROCKSDB_LIBRARY_API void
crocksdb_run_ldb_tool(int argc, char** argv, const crocksdb_options_t* opts); crocksdb_run_ldb_tool(int argc, char** argv, const crocksdb_options_t* opts);
/* Titan */ /* Titan */
struct ctitandb_blob_index_t { struct ctitandb_blob_index_t {
uint64_t file_number; uint64_t file_number;
......
...@@ -74,6 +74,7 @@ pub enum DBCompactionOptions {} ...@@ -74,6 +74,7 @@ pub enum DBCompactionOptions {}
pub enum DBPerfContext {} pub enum DBPerfContext {}
pub enum DBIOStatsContext {} pub enum DBIOStatsContext {}
pub enum DBWriteStallInfo {} pub enum DBWriteStallInfo {}
pub enum DBStatusPtr {}
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(C)] #[repr(C)]
...@@ -291,6 +292,15 @@ pub enum IndexType { ...@@ -291,6 +292,15 @@ pub enum IndexType {
TwoLevelIndexSearch = 2, TwoLevelIndexSearch = 2,
} }
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(C)]
pub enum DBBackgroundErrorReason {
Flush = 1,
Compaction = 2,
WriteCallback = 3,
MemTable = 4,
}
pub fn error_message(ptr: *mut c_char) -> String { pub fn error_message(ptr: *mut c_char) -> String {
let c_str = unsafe { CStr::from_ptr(ptr) }; let c_str = unsafe { CStr::from_ptr(ptr) };
let s = format!("{}", c_str.to_string_lossy()); let s = format!("{}", c_str.to_string_lossy());
...@@ -325,6 +335,7 @@ macro_rules! ffi_try { ...@@ -325,6 +335,7 @@ macro_rules! ffi_try {
// TODO audit the use of boolean arguments, b/c I think they need to be u8 // TODO audit the use of boolean arguments, b/c I think they need to be u8
// instead... // instead...
extern "C" { extern "C" {
pub fn crocksdb_status_ptr_get_error(status: *mut DBStatusPtr, err: *mut *mut c_char);
pub fn crocksdb_get_db_options(db: *mut DBInstance) -> *mut Options; pub fn crocksdb_get_db_options(db: *mut DBInstance) -> *mut Options;
pub fn crocksdb_set_db_options( pub fn crocksdb_set_db_options(
db: *mut DBInstance, db: *mut DBInstance,
...@@ -1708,6 +1719,7 @@ extern "C" { ...@@ -1708,6 +1719,7 @@ extern "C" {
flush: extern "C" fn(*mut c_void, *mut DBInstance, *const DBFlushJobInfo), flush: extern "C" fn(*mut c_void, *mut DBInstance, *const DBFlushJobInfo),
compact: extern "C" fn(*mut c_void, *mut DBInstance, *const DBCompactionJobInfo), compact: extern "C" fn(*mut c_void, *mut DBInstance, *const DBCompactionJobInfo),
ingest: extern "C" fn(*mut c_void, *mut DBInstance, *const DBIngestionInfo), ingest: extern "C" fn(*mut c_void, *mut DBInstance, *const DBIngestionInfo),
bg_error: extern "C" fn(*mut c_void, DBBackgroundErrorReason, *mut DBStatusPtr),
stall_conditions: extern "C" fn(*mut c_void, *const DBWriteStallInfo), stall_conditions: extern "C" fn(*mut c_void, *const DBWriteStallInfo),
) -> *mut DBEventListener; ) -> *mut DBEventListener;
pub fn crocksdb_eventlistener_destroy(et: *mut DBEventListener); pub fn crocksdb_eventlistener_destroy(et: *mut DBEventListener);
......
...@@ -12,8 +12,9 @@ ...@@ -12,8 +12,9 @@
// limitations under the License. // limitations under the License.
use crocksdb_ffi::{ use crocksdb_ffi::{
self, CompactionReason, DBCompactionJobInfo, DBEventListener, DBFlushJobInfo, DBIngestionInfo, self, CompactionReason, DBBackgroundErrorReason, DBCompactionJobInfo, DBEventListener,
DBInstance, DBWriteStallInfo, WriteStallCondition, DBFlushJobInfo, DBIngestionInfo, DBInstance, DBStatusPtr, DBWriteStallInfo,
WriteStallCondition,
}; };
use libc::c_void; use libc::c_void;
use std::path::Path; use std::path::Path;
...@@ -178,6 +179,7 @@ pub trait EventListener: Send + Sync { ...@@ -178,6 +179,7 @@ pub trait EventListener: Send + Sync {
fn on_flush_completed(&self, _: &FlushJobInfo) {} fn on_flush_completed(&self, _: &FlushJobInfo) {}
fn on_compaction_completed(&self, _: &CompactionJobInfo) {} fn on_compaction_completed(&self, _: &CompactionJobInfo) {}
fn on_external_file_ingested(&self, _: &IngestionInfo) {} fn on_external_file_ingested(&self, _: &IngestionInfo) {}
fn on_background_error(&self, _: DBBackgroundErrorReason, _: Result<(), String>) {}
fn on_stall_conditions_changed(&self, _: &WriteStallInfo) {} fn on_stall_conditions_changed(&self, _: &WriteStallInfo) {}
} }
...@@ -231,6 +233,23 @@ extern "C" fn on_external_file_ingested( ...@@ -231,6 +233,23 @@ extern "C" fn on_external_file_ingested(
ctx.on_external_file_ingested(info); ctx.on_external_file_ingested(info);
} }
extern "C" fn on_background_error(
ctx: *mut c_void,
reason: DBBackgroundErrorReason,
status: *mut DBStatusPtr,
) {
let (ctx, result) = unsafe {
(
&*(ctx as *mut Box<dyn EventListener>),
|| -> Result<(), String> {
ffi_try!(crocksdb_status_ptr_get_error(status));
Ok(())
}(),
)
};
ctx.on_background_error(reason, result);
}
extern "C" fn on_stall_conditions_changed(ctx: *mut c_void, info: *const DBWriteStallInfo) { extern "C" fn on_stall_conditions_changed(ctx: *mut c_void, info: *const DBWriteStallInfo) {
let (ctx, info) = unsafe { let (ctx, info) = unsafe {
( (
...@@ -250,6 +269,7 @@ pub fn new_event_listener<L: EventListener>(l: L) -> *mut DBEventListener { ...@@ -250,6 +269,7 @@ pub fn new_event_listener<L: EventListener>(l: L) -> *mut DBEventListener {
on_flush_completed, on_flush_completed,
on_compaction_completed, on_compaction_completed,
on_external_file_ingested, on_external_file_ingested,
on_background_error,
on_stall_conditions_changed, on_stall_conditions_changed,
) )
} }
......
...@@ -26,9 +26,9 @@ pub use event_listener::{ ...@@ -26,9 +26,9 @@ pub use event_listener::{
}; };
pub use librocksdb_sys::{ pub use librocksdb_sys::{
self as crocksdb_ffi, new_bloom_filter, CompactionPriority, CompactionReason, self as crocksdb_ffi, new_bloom_filter, CompactionPriority, CompactionReason,
DBBottommostLevelCompaction, DBCompactionStyle, DBCompressionType, DBEntryType, DBInfoLogLevel, DBBackgroundErrorReason, DBBottommostLevelCompaction, DBCompactionStyle, DBCompressionType,
DBRateLimiterMode, DBRecoveryMode, DBStatisticsHistogramType, DBStatisticsTickerType, DBEntryType, DBInfoLogLevel, DBRateLimiterMode, DBRecoveryMode, DBStatisticsHistogramType,
DBTitanDBBlobRunMode, IndexType, WriteStallCondition, DBStatisticsTickerType, DBStatusPtr, DBTitanDBBlobRunMode, IndexType, WriteStallCondition,
}; };
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
pub use metadata::{ColumnFamilyMetaData, LevelMetaData, SstFileMetaData}; pub use metadata::{ColumnFamilyMetaData, LevelMetaData, SstFileMetaData};
......
...@@ -121,6 +121,17 @@ impl EventListener for StallEventCounter { ...@@ -121,6 +121,17 @@ impl EventListener for StallEventCounter {
} }
} }
#[derive(Default, Clone)]
struct BackgroundErrorCounter {
background_error: Arc<AtomicUsize>,
}
impl EventListener for BackgroundErrorCounter {
fn on_background_error(&self, _: DBBackgroundErrorReason, _: Result<(), String>) {
self.background_error.fetch_add(1, Ordering::SeqCst);
}
}
#[test] #[test]
fn test_event_listener_stall_conditions_changed() { fn test_event_listener_stall_conditions_changed() {
let path = TempDir::new("_rust_rocksdb_event_listener_stall_conditions").expect(""); let path = TempDir::new("_rust_rocksdb_event_listener_stall_conditions").expect("");
...@@ -243,3 +254,23 @@ fn test_event_listener_ingestion() { ...@@ -243,3 +254,23 @@ fn test_event_listener_ingestion() {
assert_eq!(db.get(b"k2").unwrap().unwrap(), b"v2"); assert_eq!(db.get(b"k2").unwrap().unwrap(), b"v2");
assert_ne!(counter.ingestion.load(Ordering::SeqCst), 0); assert_ne!(counter.ingestion.load(Ordering::SeqCst), 0);
} }
#[test]
fn test_event_listener_background_error() {
// TODO(yiwu): should create a test Env object which inject some IO error, to
// actually trigger background error.
let path = TempDir::new("_rust_rocksdb_event_listener_ingestion").expect("");
let path_str = path.path().to_str().unwrap();
let mut opts = DBOptions::new();
let counter = BackgroundErrorCounter::default();
opts.add_event_listener(counter.clone());
opts.create_if_missing(true);
let db = DB::open(opts, path_str).unwrap();
for i in 1..10 {
db.put(format!("{:04}", i).as_bytes(), b"value").unwrap();
db.flush(false).unwrap();
}
assert_eq!(counter.background_error.load(Ordering::SeqCst), 0);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment