Unverified Commit cc16af35 authored by Xintao's avatar Xintao Committed by GitHub

rocksdb: Pick Atomize Rename operation related PRs into tikv-4.x (#640)

update rocksdb to include tikv/rocksdb#246 for tikv-4.x

include 6eb4fda4Signed-off-by: 's avatarXintao <hunterlxt@live.com>
parent 333af419
......@@ -3771,7 +3771,6 @@ struct crocksdb_encryption_key_manager_impl_t : public KeyManager {
crocksdb_encryption_key_manager_new_file_cb new_file;
crocksdb_encryption_key_manager_delete_file_cb delete_file;
crocksdb_encryption_key_manager_link_file_cb link_file;
crocksdb_encryption_key_manager_rename_file_cb rename_file;
virtual ~crocksdb_encryption_key_manager_impl_t() {
destructor(state);
......@@ -3823,17 +3822,6 @@ struct crocksdb_encryption_key_manager_impl_t : public KeyManager {
}
return s;
}
Status RenameFile(
const std::string& src_fname, const std::string& dst_fname) override {
const char* ret = rename_file(state, src_fname.c_str(), dst_fname.c_str());
Status s;
if (ret != nullptr) {
s = Status::Corruption(std::string(ret));
delete ret;
}
return s;
}
};
crocksdb_encryption_key_manager_t* crocksdb_encryption_key_manager_create(
......@@ -3841,8 +3829,7 @@ crocksdb_encryption_key_manager_t* crocksdb_encryption_key_manager_create(
crocksdb_encryption_key_manager_get_file_cb get_file,
crocksdb_encryption_key_manager_new_file_cb new_file,
crocksdb_encryption_key_manager_delete_file_cb delete_file,
crocksdb_encryption_key_manager_link_file_cb link_file,
crocksdb_encryption_key_manager_rename_file_cb rename_file) {
crocksdb_encryption_key_manager_link_file_cb link_file) {
std::shared_ptr<crocksdb_encryption_key_manager_impl_t> key_manager_impl =
std::make_shared<crocksdb_encryption_key_manager_impl_t>();
key_manager_impl->state = state;
......@@ -3851,8 +3838,8 @@ crocksdb_encryption_key_manager_t* crocksdb_encryption_key_manager_create(
key_manager_impl->new_file = new_file;
key_manager_impl->delete_file = delete_file;
key_manager_impl->link_file = link_file;
key_manager_impl->rename_file = rename_file;
crocksdb_encryption_key_manager_t* key_manager = new crocksdb_encryption_key_manager_t;
crocksdb_encryption_key_manager_t* key_manager =
new crocksdb_encryption_key_manager_t;
key_manager->rep = key_manager_impl;
return key_manager;
}
......@@ -3912,19 +3899,6 @@ const char* crocksdb_encryption_key_manager_link_file(
return nullptr;
}
const char* crocksdb_encryption_key_manager_rename_file(
crocksdb_encryption_key_manager_t* key_manager, const char* src_fname,
const char* dst_fname) {
assert(key_manager != nullptr && key_manager->rep != nullptr);
assert(src_fname != nullptr);
assert(dst_fname != nullptr);
Status s = key_manager->rep->RenameFile(src_fname, dst_fname);
if (!s.ok()) {
return strdup(s.ToString().c_str());
}
return nullptr;
}
crocksdb_env_t* crocksdb_key_managed_encrypted_env_create(
crocksdb_env_t* base_env, crocksdb_encryption_key_manager_t* key_manager) {
assert(base_env != nullptr);
......
......@@ -1490,8 +1490,6 @@ typedef const char* (*crocksdb_encryption_key_manager_delete_file_cb)(
void* state, const char* fname);
typedef const char* (*crocksdb_encryption_key_manager_link_file_cb)(
void* state, const char* src_fname, const char* dst_fname);
typedef const char* (*crocksdb_encryption_key_manager_rename_file_cb)(
void* state, const char* src_fname, const char* dst_fname);
extern C_ROCKSDB_LIBRARY_API crocksdb_encryption_key_manager_t*
crocksdb_encryption_key_manager_create(
......@@ -1499,8 +1497,7 @@ crocksdb_encryption_key_manager_create(
crocksdb_encryption_key_manager_get_file_cb get_file,
crocksdb_encryption_key_manager_new_file_cb new_file,
crocksdb_encryption_key_manager_delete_file_cb delete_file,
crocksdb_encryption_key_manager_link_file_cb link_file,
crocksdb_encryption_key_manager_rename_file_cb rename_file);
crocksdb_encryption_key_manager_link_file_cb link_file);
extern C_ROCKSDB_LIBRARY_API void crocksdb_encryption_key_manager_destroy(
crocksdb_encryption_key_manager_t*);
extern C_ROCKSDB_LIBRARY_API const char*
......@@ -1518,10 +1515,6 @@ extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_encryption_key_manager_link_file(
crocksdb_encryption_key_manager_t* key_manager, const char* src_fname,
const char* dst_fname);
extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_encryption_key_manager_rename_file(
crocksdb_encryption_key_manager_t* key_manager, const char* src_fname,
const char* dst_fname);
extern C_ROCKSDB_LIBRARY_API crocksdb_env_t*
crocksdb_key_managed_encrypted_env_create(
......
Subproject commit cf58bfefe2fda52cc7d4c8faf3173aa8cd41901d
Subproject commit db1164ae424c919e9ee835cc528489e06798cc39
......@@ -1528,7 +1528,6 @@ extern "C" {
) -> *const c_char,
delete_file: extern "C" fn(*mut c_void, *const c_char) -> *const c_char,
link_file: extern "C" fn(*mut c_void, *const c_char, *const c_char) -> *const c_char,
rename_file: extern "C" fn(*mut c_void, *const c_char, *const c_char) -> *const c_char,
) -> *mut DBEncryptionKeyManagerInstance;
#[cfg(feature = "encryption")]
pub fn crocksdb_encryption_key_manager_destroy(
......@@ -1557,12 +1556,6 @@ extern "C" {
src_fname: *const c_char,
dst_fname: *const c_char,
) -> *const c_char;
#[cfg(feature = "encryption")]
pub fn crocksdb_encryption_key_manager_rename_file(
key_manager: *mut DBEncryptionKeyManagerInstance,
src_fname: *const c_char,
dst_fname: *const c_char,
) -> *const c_char;
#[cfg(feature = "encryption")]
pub fn crocksdb_key_managed_encrypted_env_create(
......
......@@ -61,7 +61,6 @@ pub trait EncryptionKeyManager: Sync + Send {
fn new_file(&self, fname: &str) -> Result<FileEncryptionInfo>;
fn delete_file(&self, fname: &str) -> Result<()>;
fn link_file(&self, src_fname: &str, dst_fname: &str) -> Result<()>;
fn rename_file(&self, src_fname: &str, dst_fname: &str) -> Result<()>;
}
// Copy rust-owned error message to C-owned string. Caller is responsible to delete the result.
......@@ -185,39 +184,6 @@ extern "C" fn encryption_key_manager_link_file(
}
}
extern "C" fn encryption_key_manager_rename_file(
ctx: *mut c_void,
src_fname: *const c_char,
dst_fname: *const c_char,
) -> *const c_char {
let key_manager = unsafe { &*(ctx as *mut Arc<dyn EncryptionKeyManager>) };
let src_fname = match unsafe { CStr::from_ptr(src_fname).to_str() } {
Ok(ret) => ret,
Err(err) => {
return copy_error(format!(
"Encryption key manager encounter non-utf8 file name: {}",
err
));
}
};
let dst_fname = match unsafe { CStr::from_ptr(dst_fname).to_str() } {
Ok(ret) => ret,
Err(err) => {
return copy_error(format!(
"Encryption key manager encounter non-utf8 file name: {}",
err
));
}
};
match key_manager.rename_file(src_fname, dst_fname) {
Ok(()) => ptr::null(),
Err(err) => copy_error(format!(
"Encryption key manager delete file failure: {}",
err
)),
}
}
pub struct DBEncryptionKeyManager {
pub inner: *mut DBEncryptionKeyManagerInstance,
}
......@@ -238,7 +204,6 @@ impl DBEncryptionKeyManager {
encryption_key_manager_new_file,
encryption_key_manager_delete_file,
encryption_key_manager_link_file,
encryption_key_manager_rename_file,
)
};
DBEncryptionKeyManager { inner: instance }
......@@ -376,28 +341,6 @@ impl EncryptionKeyManager for DBEncryptionKeyManager {
}
ret
}
fn rename_file(&self, src_fname: &str, dst_fname: &str) -> Result<()> {
use std::io::{Error, ErrorKind};
let ret: Result<()>;
unsafe {
let err = crocksdb_ffi::crocksdb_encryption_key_manager_rename_file(
self.inner,
CString::new(src_fname).unwrap().as_ptr(),
CString::new(dst_fname).unwrap().as_ptr(),
);
if err == ptr::null() {
ret = Ok(());
} else {
ret = Err(Error::new(
ErrorKind::Other,
format!("{}", CStr::from_ptr(err).to_str().unwrap()),
));
libc::free(err as _);
}
}
ret
}
}
#[cfg(test)]
......@@ -423,7 +366,6 @@ mod test {
pub new_file_called: AtomicUsize,
pub delete_file_called: AtomicUsize,
pub link_file_called: AtomicUsize,
pub rename_file_called: AtomicUsize,
pub fname: Mutex<String>,
pub dst_fname: Mutex<String>,
pub return_value: Option<FileEncryptionInfo>,
......@@ -437,7 +379,6 @@ mod test {
new_file_called: AtomicUsize::new(0),
delete_file_called: AtomicUsize::new(0),
link_file_called: AtomicUsize::new(0),
rename_file_called: AtomicUsize::new(0),
fname: Mutex::new("".to_string()),
dst_fname: Mutex::new("".to_string()),
return_value: None,
......@@ -493,23 +434,6 @@ mod test {
None => Err(Error::new(ErrorKind::Other, "")),
}
}
fn rename_file(&self, src_fname: &str, dst_fname: &str) -> Result<()> {
let key_manager = self.lock().unwrap();
key_manager
.rename_file_called
.fetch_add(1, Ordering::SeqCst);
key_manager.fname.lock().unwrap().insert_str(0, src_fname);
key_manager
.dst_fname
.lock()
.unwrap()
.insert_str(0, dst_fname);
match &key_manager.return_value {
Some(_) => Ok(()),
None => Err(Error::new(ErrorKind::Other, "")),
}
}
}
#[test]
......@@ -548,7 +472,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("get_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -562,7 +485,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("get_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -586,7 +508,6 @@ mod test {
assert_eq!(1, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("new_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -600,7 +521,6 @@ mod test {
assert_eq!(1, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("new_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -617,7 +537,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("delete_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -631,7 +550,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("delete_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -650,7 +568,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("src_link_file_path", record.fname.lock().unwrap().as_str());
assert_eq!(
"dst_link_file_path",
......@@ -670,60 +587,10 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("src_link_file_path", record.fname.lock().unwrap().as_str());
assert_eq!(
"dst_link_file_path",
record.dst_fname.lock().unwrap().as_str()
);
}
#[test]
fn rename_file() {
let key_manager = Arc::new(Mutex::new(TestEncryptionKeyManager {
return_value: Some(FileEncryptionInfo::default()),
..Default::default()
}));
let db_key_manager = DBEncryptionKeyManager::new(key_manager.clone());
assert!(db_key_manager
.rename_file("src_rename_file_path", "dst_rename_file_path")
.is_ok());
let record = key_manager.lock().unwrap();
assert_eq!(0, record.get_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!(
"src_rename_file_path",
record.fname.lock().unwrap().as_str()
);
assert_eq!(
"dst_rename_file_path",
record.dst_fname.lock().unwrap().as_str()
);
}
#[test]
fn rename_file_error() {
let key_manager = Arc::new(Mutex::new(TestEncryptionKeyManager::default()));
let db_key_manager = DBEncryptionKeyManager::new(key_manager.clone());
assert!(db_key_manager
.rename_file("src_rename_file_path", "dst_rename_file_path")
.is_err());
let record = key_manager.lock().unwrap();
assert_eq!(0, record.get_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!(
"src_rename_file_path",
record.fname.lock().unwrap().as_str()
);
assert_eq!(
"dst_rename_file_path",
record.dst_fname.lock().unwrap().as_str()
);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment