Unverified Commit c4b1aca7 authored by Huachao Huang's avatar Huachao Huang Committed by GitHub

Add DeleteFilesInRanges and include_end option (#185)

parent e7a7ab9c
......@@ -116,6 +116,7 @@ using rocksdb::TablePropertiesCollector;
using rocksdb::TablePropertiesCollectorFactory;
using rocksdb::KeyVersion;
using rocksdb::DbPath;
using rocksdb::RangePtr;
using rocksdb::ColumnFamilyData;
using rocksdb::ColumnFamilyHandleImpl;
......@@ -3381,29 +3382,61 @@ void crocksdb_get_options_from_string(const crocksdb_options_t* base_options,
&new_options->rep));
}
void crocksdb_delete_file_in_range(crocksdb_t* db, const char* start_key,
size_t start_key_len, const char* limit_key,
size_t limit_key_len, char** errptr) {
void crocksdb_delete_files_in_range(
crocksdb_t* db,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len,
bool include_end, char** errptr) {
Slice a, b;
SaveError(
errptr,
DeleteFilesInRange(
db->rep, db->rep->DefaultColumnFamily(),
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)));
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr),
include_end));
}
void crocksdb_delete_file_in_range_cf(
void crocksdb_delete_files_in_range_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* column_family,
const char* start_key, size_t start_key_len, const char* limit_key,
size_t limit_key_len, char** errptr) {
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len,
bool include_end, char** errptr) {
Slice a, b;
SaveError(
errptr,
DeleteFilesInRange(
db->rep, column_family->rep,
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)));
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr),
include_end));
}
void crocksdb_delete_files_in_ranges_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* cf,
const char* const* start_keys, const size_t* start_keys_lens,
const char* const* limit_keys, const size_t* limit_keys_lens,
size_t num_ranges, bool include_end, char** errptr) {
Slice starts[num_ranges];
Slice limits[num_ranges];
RangePtr ranges[num_ranges];
for (auto i = 0; i < num_ranges; i++) {
const Slice* start = nullptr;
if (start_keys[i]) {
starts[i] = Slice(start_keys[i], start_keys_lens[i]);
start = &starts[i];
}
const Slice* limit = nullptr;
if (limit_keys[i]) {
limits[i] = Slice(limit_keys[i], limit_keys_lens[i]);
limit = &limits[i];
}
ranges[i] = RangePtr(start, limit);
}
SaveError(
errptr,
DeleteFilesInRanges(
db->rep, cf->rep, ranges, num_ranges, include_end));
}
void crocksdb_free(void* ptr) { free(ptr); }
......
......@@ -1396,14 +1396,23 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_get_options_from_string(
const crocksdb_options_t* base_options, const char* opts_str,
crocksdb_options_t* new_options, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_delete_file_in_range(
crocksdb_t* db, const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_delete_files_in_range(
crocksdb_t* db,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len,
bool include_end, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_delete_file_in_range_cf(
extern C_ROCKSDB_LIBRARY_API void crocksdb_delete_files_in_range_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* column_family,
const char* start_key, size_t start_key_len, const char* limit_key,
size_t limit_key_len, char** errptr);
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len,
bool include_end, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_delete_files_in_ranges_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* cf,
const char* const* start_keys, const size_t* start_keys_lens,
const char* const* limit_keys, const size_t* limit_keys_lens,
size_t num_ranges, bool include_end, char** errptr);
// referring to convention (3), this should be used by client
// to free memory that was malloc()ed
......
......@@ -1024,23 +1024,36 @@ extern "C" {
limit_key: *const u8,
limit_key_len: size_t,
);
pub fn crocksdb_delete_file_in_range(
pub fn crocksdb_delete_files_in_range(
db: *mut DBInstance,
range_start_key: *const u8,
range_start_key_len: size_t,
range_limit_key: *const u8,
range_limit_key_len: size_t,
include_end: bool,
err: *mut *mut c_char,
);
pub fn crocksdb_delete_file_in_range_cf(
pub fn crocksdb_delete_files_in_range_cf(
db: *mut DBInstance,
cf: *mut DBCFHandle,
range_start_key: *const u8,
range_start_key_len: size_t,
range_limit_key: *const u8,
range_limit_key_len: size_t,
include_end: bool,
err: *mut *mut c_char,
);
pub fn crocksdb_delete_files_in_ranges_cf(
db: *mut DBInstance,
cf: *mut DBCFHandle,
start_keys: *const *const uint8_t,
start_keys_lens: *const size_t,
limit_keys: *const *const uint8_t,
limit_keys_lens: *const size_t,
num_ranges: size_t,
include_end: bool,
errptr: *mut *mut c_char,
);
pub fn crocksdb_property_value(db: *mut DBInstance, propname: *const c_char) -> *mut c_char;
pub fn crocksdb_property_value_cf(
db: *mut DBInstance,
......
......@@ -1034,38 +1034,71 @@ impl DB {
}
}
pub fn delete_file_in_range(&self, start_key: &[u8], end_key: &[u8]) -> Result<(), String> {
pub fn delete_files_in_range(
&self,
start_key: &[u8],
end_key: &[u8],
include_end: bool,
) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_delete_file_in_range(
ffi_try!(crocksdb_delete_files_in_range(
self.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t
end_key.len() as size_t,
include_end
));
Ok(())
}
}
pub fn delete_file_in_range_cf(
pub fn delete_files_in_range_cf(
&self,
cf: &CFHandle,
start_key: &[u8],
end_key: &[u8],
include_end: bool,
) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_delete_file_in_range_cf(
ffi_try!(crocksdb_delete_files_in_range_cf(
self.inner,
cf.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t
end_key.len() as size_t,
include_end
));
Ok(())
}
}
pub fn delete_files_in_ranges_cf(
&self,
cf: &CFHandle,
ranges: &[Range],
include_end: bool,
) -> Result<(), String> {
let start_keys: Vec<*const u8> = ranges.iter().map(|x| x.start_key.as_ptr()).collect();
let start_keys_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
let limit_keys: Vec<*const u8> = ranges.iter().map(|x| x.end_key.as_ptr()).collect();
let limit_keys_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
unsafe {
ffi_try!(crocksdb_delete_files_in_ranges_cf(
self.inner,
cf.inner,
start_keys.as_ptr(),
start_keys_lens.as_ptr(),
limit_keys.as_ptr(),
limit_keys_lens.as_ptr(),
ranges.len(),
include_end
));
}
Ok(())
}
pub fn get_property_value(&self, name: &str) -> Option<String> {
self.get_property_value_cf_opt(None, name)
}
......
......@@ -64,7 +64,7 @@ fn test_delete_files_in_range_with_iter() {
let mut iter = db.iter();
// delete sst2
db.delete_file_in_range(b"key2", b"key7").unwrap();
db.delete_files_in_range(b"key2", b"key7", false).unwrap();
let mut count = 0;
assert!(iter.seek(SeekKey::Start));
......@@ -87,7 +87,7 @@ fn test_delete_files_in_range_with_snap() {
let snap = db.snapshot();
// delete sst2
db.delete_file_in_range(b"key2", b"key7").unwrap();
db.delete_files_in_range(b"key2", b"key7", false).unwrap();
let mut iter = snap.iter();
assert!(iter.seek(SeekKey::Start));
......@@ -142,7 +142,7 @@ fn test_delete_files_in_range_with_delete_range() {
// Before the fix, the file in the middle with keys 2 and 3 will be deleted,
// which can be a problem when we compact later. After the fix, no file will
// be deleted since they have an overlapped delete range [0, 6).
db.delete_file_in_range(b"1", b"4").unwrap();
db.delete_files_in_range(b"1", b"4", false).unwrap();
// Flush a file with keys 4 and 5 to level 0.
for i in 4..5 {
......@@ -167,3 +167,39 @@ fn test_delete_files_in_range_with_delete_range() {
assert_eq!(it.key(), b"5");
assert!(!it.next());
}
#[test]
fn test_delete_files_in_ranges() {
let path = TempDir::new("_rust_rocksdb_test_delete_files_in_multi_ranges").expect("");
let path_str = path.path().to_str().unwrap();
let db = initial_data(path_str);
// Delete files in multiple overlapped ranges.
// File ["key0", "key2"], ["key3", "key5"] should have been deleted,
// but file ["key6", "key8"] should not be deleted because "key8" is exclusive.
let mut ranges = Vec::new();
ranges.push(Range::new(b"key0", b"key4"));
ranges.push(Range::new(b"key2", b"key6"));
ranges.push(Range::new(b"key4", b"key8"));
let cf = db.cf_handle("default").unwrap();
db.delete_files_in_ranges_cf(cf, &ranges, false).unwrap();
// Check that ["key0", "key5"] have been deleted, but ["key6", "key8"] still exist.
let mut iter = db.iter();
iter.seek(SeekKey::Start);
for i in 6..9 {
assert!(iter.valid());
let k = format!("key{}", i);
assert_eq!(iter.key(), k.as_bytes());
iter.next();
}
assert!(!iter.valid());
// Delete the last file.
let ranges = vec![Range::new(b"key6", b"key8")];
db.delete_files_in_ranges_cf(cf, &ranges, true).unwrap();
let mut iter = db.iter();
iter.seek(SeekKey::Start);
assert!(!iter.valid());
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment