Unverified Commit 6eb4fda4 authored by Xintao's avatar Xintao Committed by GitHub

rocksdb: Atomize RenameFile in KeyManagedEncryptedEnv (#585)

* update rocksdb to atomize RenameFile in KeyManagedEncryptedEnv
* removed redundant rename logic from key manager
Signed-off-by: 's avatarXintao <hunterlxt@live.com>
parent 954c2a47
......@@ -3954,7 +3954,6 @@ struct crocksdb_encryption_key_manager_impl_t : public KeyManager {
crocksdb_encryption_key_manager_new_file_cb new_file;
crocksdb_encryption_key_manager_delete_file_cb delete_file;
crocksdb_encryption_key_manager_link_file_cb link_file;
crocksdb_encryption_key_manager_rename_file_cb rename_file;
virtual ~crocksdb_encryption_key_manager_impl_t() { destructor(state); }
......@@ -4004,17 +4003,6 @@ struct crocksdb_encryption_key_manager_impl_t : public KeyManager {
}
return s;
}
Status RenameFile(const std::string& src_fname,
const std::string& dst_fname) override {
const char* ret = rename_file(state, src_fname.c_str(), dst_fname.c_str());
Status s;
if (ret != nullptr) {
s = Status::Corruption(std::string(ret));
delete ret;
}
return s;
}
};
crocksdb_encryption_key_manager_t* crocksdb_encryption_key_manager_create(
......@@ -4022,8 +4010,7 @@ crocksdb_encryption_key_manager_t* crocksdb_encryption_key_manager_create(
crocksdb_encryption_key_manager_get_file_cb get_file,
crocksdb_encryption_key_manager_new_file_cb new_file,
crocksdb_encryption_key_manager_delete_file_cb delete_file,
crocksdb_encryption_key_manager_link_file_cb link_file,
crocksdb_encryption_key_manager_rename_file_cb rename_file) {
crocksdb_encryption_key_manager_link_file_cb link_file) {
std::shared_ptr<crocksdb_encryption_key_manager_impl_t> key_manager_impl =
std::make_shared<crocksdb_encryption_key_manager_impl_t>();
key_manager_impl->state = state;
......@@ -4032,7 +4019,6 @@ crocksdb_encryption_key_manager_t* crocksdb_encryption_key_manager_create(
key_manager_impl->new_file = new_file;
key_manager_impl->delete_file = delete_file;
key_manager_impl->link_file = link_file;
key_manager_impl->rename_file = rename_file;
crocksdb_encryption_key_manager_t* key_manager =
new crocksdb_encryption_key_manager_t;
key_manager->rep = key_manager_impl;
......@@ -4094,19 +4080,6 @@ const char* crocksdb_encryption_key_manager_link_file(
return nullptr;
}
const char* crocksdb_encryption_key_manager_rename_file(
crocksdb_encryption_key_manager_t* key_manager, const char* src_fname,
const char* dst_fname) {
assert(key_manager != nullptr && key_manager->rep != nullptr);
assert(src_fname != nullptr);
assert(dst_fname != nullptr);
Status s = key_manager->rep->RenameFile(src_fname, dst_fname);
if (!s.ok()) {
return strdup(s.ToString().c_str());
}
return nullptr;
}
crocksdb_env_t* crocksdb_key_managed_encrypted_env_create(
crocksdb_env_t* base_env, crocksdb_encryption_key_manager_t* key_manager) {
assert(base_env != nullptr);
......
......@@ -1590,8 +1590,6 @@ typedef const char* (*crocksdb_encryption_key_manager_delete_file_cb)(
void* state, const char* fname);
typedef const char* (*crocksdb_encryption_key_manager_link_file_cb)(
void* state, const char* src_fname, const char* dst_fname);
typedef const char* (*crocksdb_encryption_key_manager_rename_file_cb)(
void* state, const char* src_fname, const char* dst_fname);
extern C_ROCKSDB_LIBRARY_API crocksdb_encryption_key_manager_t*
crocksdb_encryption_key_manager_create(
......@@ -1599,8 +1597,7 @@ crocksdb_encryption_key_manager_create(
crocksdb_encryption_key_manager_get_file_cb get_file,
crocksdb_encryption_key_manager_new_file_cb new_file,
crocksdb_encryption_key_manager_delete_file_cb delete_file,
crocksdb_encryption_key_manager_link_file_cb link_file,
crocksdb_encryption_key_manager_rename_file_cb rename_file);
crocksdb_encryption_key_manager_link_file_cb link_file);
extern C_ROCKSDB_LIBRARY_API void crocksdb_encryption_key_manager_destroy(
crocksdb_encryption_key_manager_t*);
extern C_ROCKSDB_LIBRARY_API const char*
......@@ -1618,10 +1615,6 @@ extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_encryption_key_manager_link_file(
crocksdb_encryption_key_manager_t* key_manager, const char* src_fname,
const char* dst_fname);
extern C_ROCKSDB_LIBRARY_API const char*
crocksdb_encryption_key_manager_rename_file(
crocksdb_encryption_key_manager_t* key_manager, const char* src_fname,
const char* dst_fname);
extern C_ROCKSDB_LIBRARY_API crocksdb_env_t*
crocksdb_key_managed_encrypted_env_create(crocksdb_env_t*,
......
Subproject commit 7d209a859fbfed6095b3df9cf2a33fdd8056527b
Subproject commit 93e89a58edb820f2daa15362120617e49ef6ddd6
......@@ -1681,7 +1681,6 @@ extern "C" {
) -> *const c_char,
delete_file: extern "C" fn(*mut c_void, *const c_char) -> *const c_char,
link_file: extern "C" fn(*mut c_void, *const c_char, *const c_char) -> *const c_char,
rename_file: extern "C" fn(*mut c_void, *const c_char, *const c_char) -> *const c_char,
) -> *mut DBEncryptionKeyManagerInstance;
#[cfg(feature = "encryption")]
pub fn crocksdb_encryption_key_manager_destroy(
......@@ -1710,12 +1709,6 @@ extern "C" {
src_fname: *const c_char,
dst_fname: *const c_char,
) -> *const c_char;
#[cfg(feature = "encryption")]
pub fn crocksdb_encryption_key_manager_rename_file(
key_manager: *mut DBEncryptionKeyManagerInstance,
src_fname: *const c_char,
dst_fname: *const c_char,
) -> *const c_char;
#[cfg(feature = "encryption")]
pub fn crocksdb_key_managed_encrypted_env_create(
......
......@@ -61,7 +61,6 @@ pub trait EncryptionKeyManager: Sync + Send {
fn new_file(&self, fname: &str) -> Result<FileEncryptionInfo>;
fn delete_file(&self, fname: &str) -> Result<()>;
fn link_file(&self, src_fname: &str, dst_fname: &str) -> Result<()>;
fn rename_file(&self, src_fname: &str, dst_fname: &str) -> Result<()>;
}
// Copy rust-owned error message to C-owned string. Caller is responsible to delete the result.
......@@ -185,39 +184,6 @@ extern "C" fn encryption_key_manager_link_file(
}
}
extern "C" fn encryption_key_manager_rename_file(
ctx: *mut c_void,
src_fname: *const c_char,
dst_fname: *const c_char,
) -> *const c_char {
let key_manager = unsafe { &*(ctx as *mut Arc<dyn EncryptionKeyManager>) };
let src_fname = match unsafe { CStr::from_ptr(src_fname).to_str() } {
Ok(ret) => ret,
Err(err) => {
return copy_error(format!(
"Encryption key manager encounter non-utf8 file name: {}",
err
));
}
};
let dst_fname = match unsafe { CStr::from_ptr(dst_fname).to_str() } {
Ok(ret) => ret,
Err(err) => {
return copy_error(format!(
"Encryption key manager encounter non-utf8 file name: {}",
err
));
}
};
match key_manager.rename_file(src_fname, dst_fname) {
Ok(()) => ptr::null(),
Err(err) => copy_error(format!(
"Encryption key manager delete file failure: {}",
err
)),
}
}
pub struct DBEncryptionKeyManager {
pub inner: *mut DBEncryptionKeyManagerInstance,
}
......@@ -238,7 +204,6 @@ impl DBEncryptionKeyManager {
encryption_key_manager_new_file,
encryption_key_manager_delete_file,
encryption_key_manager_link_file,
encryption_key_manager_rename_file,
)
};
DBEncryptionKeyManager { inner: instance }
......@@ -376,28 +341,6 @@ impl EncryptionKeyManager for DBEncryptionKeyManager {
}
ret
}
fn rename_file(&self, src_fname: &str, dst_fname: &str) -> Result<()> {
use std::io::{Error, ErrorKind};
let ret: Result<()>;
unsafe {
let err = crocksdb_ffi::crocksdb_encryption_key_manager_rename_file(
self.inner,
CString::new(src_fname).unwrap().as_ptr(),
CString::new(dst_fname).unwrap().as_ptr(),
);
if err == ptr::null() {
ret = Ok(());
} else {
ret = Err(Error::new(
ErrorKind::Other,
format!("{}", CStr::from_ptr(err).to_str().unwrap()),
));
libc::free(err as _);
}
}
ret
}
}
#[cfg(test)]
......@@ -423,7 +366,6 @@ mod test {
pub new_file_called: AtomicUsize,
pub delete_file_called: AtomicUsize,
pub link_file_called: AtomicUsize,
pub rename_file_called: AtomicUsize,
pub fname: Mutex<String>,
pub dst_fname: Mutex<String>,
pub return_value: Option<FileEncryptionInfo>,
......@@ -437,7 +379,6 @@ mod test {
new_file_called: AtomicUsize::new(0),
delete_file_called: AtomicUsize::new(0),
link_file_called: AtomicUsize::new(0),
rename_file_called: AtomicUsize::new(0),
fname: Mutex::new("".to_string()),
dst_fname: Mutex::new("".to_string()),
return_value: None,
......@@ -493,23 +434,6 @@ mod test {
None => Err(Error::new(ErrorKind::Other, "")),
}
}
fn rename_file(&self, src_fname: &str, dst_fname: &str) -> Result<()> {
let key_manager = self.lock().unwrap();
key_manager
.rename_file_called
.fetch_add(1, Ordering::SeqCst);
key_manager.fname.lock().unwrap().insert_str(0, src_fname);
key_manager
.dst_fname
.lock()
.unwrap()
.insert_str(0, dst_fname);
match &key_manager.return_value {
Some(_) => Ok(()),
None => Err(Error::new(ErrorKind::Other, "")),
}
}
}
#[test]
......@@ -548,7 +472,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("get_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -562,7 +485,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("get_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -586,7 +508,6 @@ mod test {
assert_eq!(1, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("new_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -600,7 +521,6 @@ mod test {
assert_eq!(1, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("new_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -617,7 +537,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("delete_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -631,7 +550,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("delete_file_path", record.fname.lock().unwrap().as_str());
}
......@@ -650,7 +568,6 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("src_link_file_path", record.fname.lock().unwrap().as_str());
assert_eq!(
"dst_link_file_path",
......@@ -670,60 +587,10 @@ mod test {
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!("src_link_file_path", record.fname.lock().unwrap().as_str());
assert_eq!(
"dst_link_file_path",
record.dst_fname.lock().unwrap().as_str()
);
}
#[test]
fn rename_file() {
let key_manager = Arc::new(Mutex::new(TestEncryptionKeyManager {
return_value: Some(FileEncryptionInfo::default()),
..Default::default()
}));
let db_key_manager = DBEncryptionKeyManager::new(key_manager.clone());
assert!(db_key_manager
.rename_file("src_rename_file_path", "dst_rename_file_path")
.is_ok());
let record = key_manager.lock().unwrap();
assert_eq!(0, record.get_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!(
"src_rename_file_path",
record.fname.lock().unwrap().as_str()
);
assert_eq!(
"dst_rename_file_path",
record.dst_fname.lock().unwrap().as_str()
);
}
#[test]
fn rename_file_error() {
let key_manager = Arc::new(Mutex::new(TestEncryptionKeyManager::default()));
let db_key_manager = DBEncryptionKeyManager::new(key_manager.clone());
assert!(db_key_manager
.rename_file("src_rename_file_path", "dst_rename_file_path")
.is_err());
let record = key_manager.lock().unwrap();
assert_eq!(0, record.get_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.new_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.delete_file_called.load(Ordering::SeqCst));
assert_eq!(0, record.link_file_called.load(Ordering::SeqCst));
assert_eq!(1, record.rename_file_called.load(Ordering::SeqCst));
assert_eq!(
"src_rename_file_path",
record.fname.lock().unwrap().as_str()
);
assert_eq!(
"dst_rename_file_path",
record.dst_fname.lock().unwrap().as_str()
);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment