Unverified commit db9052e9, authored by Connor and committed by GitHub

Add delete files in ranges API for titan (#317) (#329)

* add delete files in ranges API for titan
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
parent 68467cbb
......@@ -5227,4 +5227,66 @@ void ctitandb_create_iterators(
}
}
// Titan flavour of the single-range delete-files call, applied to the default
// column family of `db`.
//
// One RangePtr is built from the two keys and handed, together with
// `include_end`, to TitanDB::DeleteFilesInRanges. The returned status is
// reported through SaveError(errptr, ...). A null `start_key` or `limit_key`
// is forwarded as a null Slice pointer for that bound.
void ctitandb_delete_files_in_range(
    crocksdb_t* db,
    const char* start_key, size_t start_key_len,
    const char* limit_key, size_t limit_key_len,
    bool include_end, char** errptr) {
  // The slices must outlive `range`, which only stores pointers to them.
  Slice start_slice, limit_slice;
  Slice* start_ptr = nullptr;
  Slice* limit_ptr = nullptr;
  if (start_key) {
    start_slice = Slice(start_key, start_key_len);
    start_ptr = &start_slice;
  }
  if (limit_key) {
    limit_slice = Slice(limit_key, limit_key_len);
    limit_ptr = &limit_slice;
  }
  RangePtr range(start_ptr, limit_ptr);

  TitanDB* titan = static_cast<TitanDB*>(db->rep);
  SaveError(errptr,
            titan->DeleteFilesInRanges(db->rep->DefaultColumnFamily(), &range,
                                       1, include_end));
}
// Titan flavour of the single-range delete-files call for an explicit column
// family.
//
// One RangePtr is built from the two keys and handed, together with
// `include_end`, to TitanDB::DeleteFilesInRanges on `column_family->rep`. The
// returned status is reported through SaveError(errptr, ...). A null
// `start_key` or `limit_key` is forwarded as a null Slice pointer for that
// bound.
void ctitandb_delete_files_in_range_cf(
    crocksdb_t* db, crocksdb_column_family_handle_t* column_family,
    const char* start_key, size_t start_key_len,
    const char* limit_key, size_t limit_key_len,
    bool include_end, char** errptr) {
  // The slices must outlive `range`, which only stores pointers to them.
  Slice start_slice, limit_slice;
  Slice* start_ptr = nullptr;
  Slice* limit_ptr = nullptr;
  if (start_key) {
    start_slice = Slice(start_key, start_key_len);
    start_ptr = &start_slice;
  }
  if (limit_key) {
    limit_slice = Slice(limit_key, limit_key_len);
    limit_ptr = &limit_slice;
  }
  RangePtr range(start_ptr, limit_ptr);

  TitanDB* titan = static_cast<TitanDB*>(db->rep);
  SaveError(errptr,
            titan->DeleteFilesInRanges(column_family->rep, &range, 1,
                                       include_end));
}
// Titan flavour of the multi-range delete-files call for an explicit column
// family.
//
// `start_keys` / `limit_keys` are arrays of `num_ranges` key pointers, with
// the matching lengths in `start_keys_lens` / `limit_keys_lens`. A null entry
// in either key array is forwarded as a null Slice pointer for that bound.
// All ranges are passed in one call to TitanDB::DeleteFilesInRanges together
// with `include_end`; the returned status is reported through
// SaveError(errptr, ...).
void ctitandb_delete_files_in_ranges_cf(
    crocksdb_t* db, crocksdb_column_family_handle_t* cf,
    const char* const* start_keys, const size_t* start_keys_lens,
    const char* const* limit_keys, const size_t* limit_keys_lens,
    size_t num_ranges, bool include_end, char** errptr) {
  // `starts` and `limits` own the Slice objects that `ranges` points into, so
  // all three vectors must stay alive until DeleteFilesInRanges returns.
  std::vector<Slice> starts(num_ranges);
  std::vector<Slice> limits(num_ranges);
  std::vector<RangePtr> ranges(num_ranges);
  // Index with size_t: `auto i = 0` would deduce int, which compares
  // signed-against-unsigned and overflows for counts above INT_MAX.
  for (size_t i = 0; i < num_ranges; i++) {
    const Slice* start = nullptr;
    if (start_keys[i]) {
      starts[i] = Slice(start_keys[i], start_keys_lens[i]);
      start = &starts[i];
    }
    const Slice* limit = nullptr;
    if (limit_keys[i]) {
      limits[i] = Slice(limit_keys[i], limit_keys_lens[i]);
      limit = &limits[i];
    }
    ranges[i] = RangePtr(start, limit);
  }
  // data() is well-defined for an empty vector, whereas &ranges[0] is
  // undefined behaviour when num_ranges == 0.
  SaveError(
      errptr,
      static_cast<TitanDB*>(db->rep)->DeleteFilesInRanges(
          cf->rep, ranges.data(), num_ranges, include_end));
}
} // end extern "C"
......@@ -2085,6 +2085,24 @@ extern C_ROCKSDB_LIBRARY_API void ctitandb_create_iterators(
size_t size,
char** errptr);
/* Titan variant of the single-range delete-files call, applied to the default
 * column family of `db`.
 *
 * The implementation builds one range from (start_key, start_key_len) and
 * (limit_key, limit_key_len), forwards it with `include_end` to
 * TitanDB::DeleteFilesInRanges, and reports the resulting status through
 * `errptr`. A NULL start_key or limit_key is forwarded as a NULL bound
 * (presumably "unbounded" on that side -- confirm against the Titan API). */
extern C_ROCKSDB_LIBRARY_API void ctitandb_delete_files_in_range(
crocksdb_t* db,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len,
bool include_end, char** errptr);
/* Titan variant of the single-range delete-files call for the column family
 * `column_family`.
 *
 * The implementation builds one range from (start_key, start_key_len) and
 * (limit_key, limit_key_len), forwards it with `include_end` to
 * TitanDB::DeleteFilesInRanges, and reports the resulting status through
 * `errptr`. A NULL start_key or limit_key is forwarded as a NULL bound
 * (presumably "unbounded" on that side -- confirm against the Titan API). */
extern C_ROCKSDB_LIBRARY_API void ctitandb_delete_files_in_range_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* column_family,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len,
bool include_end, char** errptr);
/* Titan variant of the multi-range delete-files call for the column family
 * `cf`.
 *
 * start_keys / limit_keys are arrays of `num_ranges` key pointers, and
 * start_keys_lens / limit_keys_lens hold the matching key lengths. A NULL
 * entry in either key array is forwarded as a NULL bound for that range. The
 * implementation passes all ranges with `include_end` to
 * TitanDB::DeleteFilesInRanges in a single call and reports the resulting
 * status through `errptr`. */
extern C_ROCKSDB_LIBRARY_API void ctitandb_delete_files_in_ranges_cf(
crocksdb_t* db, crocksdb_column_family_handle_t* cf,
const char* const* start_keys, const size_t* start_keys_lens,
const char* const* limit_keys, const size_t* limit_keys_lens,
size_t num_ranges, bool include_end, char** errptr);
#ifdef __cplusplus
} /* end extern "C" */
#endif
......
Subproject commit ab0fa88daa6b3f8d7ba7fac6cc52346b51e4c039
Subproject commit 9cd140f9cbd41ba7b1d477e11a8f1c40a1c23e01
Subproject commit 7a03c83eddcafaa48c37a751045326deca9173ff
Subproject commit 862b680230647774c6825d6278de22d68a0be909
......@@ -1889,6 +1889,36 @@ extern "C" {
titan_readopts: *const DBTitanReadOptions,
cf_handle: *mut DBCFHandle,
) -> *mut DBIterator;
/// Binding for the C function `ctitandb_delete_files_in_range`.
///
/// Takes one key range as two pointer/length pairs plus `include_end`, and
/// operates on the default column family of `db`. The status of the call is
/// reported through `err`.
pub fn ctitandb_delete_files_in_range(
db: *mut DBInstance,
range_start_key: *const u8,
range_start_key_len: size_t,
range_limit_key: *const u8,
range_limit_key_len: size_t,
include_end: bool,
err: *mut *mut c_char,
);
/// Binding for the C function `ctitandb_delete_files_in_range_cf`.
///
/// Takes one key range as two pointer/length pairs plus `include_end`, and
/// operates on the column family `cf`. The status of the call is reported
/// through `err`.
pub fn ctitandb_delete_files_in_range_cf(
db: *mut DBInstance,
cf: *mut DBCFHandle,
range_start_key: *const u8,
range_start_key_len: size_t,
range_limit_key: *const u8,
range_limit_key_len: size_t,
include_end: bool,
err: *mut *mut c_char,
);
/// Binding for the C function `ctitandb_delete_files_in_ranges_cf`.
///
/// `start_keys` / `limit_keys` point to arrays of `num_ranges` key pointers,
/// and `start_keys_lens` / `limit_keys_lens` point to the matching arrays of
/// key lengths. All ranges are applied to the column family `cf` with the
/// given `include_end`. The status of the call is reported through `errptr`.
pub fn ctitandb_delete_files_in_ranges_cf(
db: *mut DBInstance,
cf: *mut DBCFHandle,
start_keys: *const *const uint8_t,
start_keys_lens: *const size_t,
limit_keys: *const *const uint8_t,
limit_keys_lens: *const size_t,
num_ranges: size_t,
include_end: bool,
errptr: *mut *mut c_char,
);
}
#[cfg(test)]
......
......@@ -1196,14 +1196,25 @@ impl DB {
include_end: bool,
) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_delete_files_in_range(
self.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t,
include_end
));
if self.is_titan() {
ffi_try!(ctitandb_delete_files_in_range(
self.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t,
include_end
));
} else {
ffi_try!(crocksdb_delete_files_in_range(
self.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t,
include_end
));
}
Ok(())
}
}
......@@ -1216,15 +1227,27 @@ impl DB {
include_end: bool,
) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_delete_files_in_range_cf(
self.inner,
cf.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t,
include_end
));
if self.is_titan() {
ffi_try!(ctitandb_delete_files_in_range_cf(
self.inner,
cf.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t,
include_end
));
} else {
ffi_try!(crocksdb_delete_files_in_range_cf(
self.inner,
cf.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t,
include_end
));
}
Ok(())
}
}
......@@ -1240,16 +1263,29 @@ impl DB {
let limit_keys: Vec<*const u8> = ranges.iter().map(|x| x.end_key.as_ptr()).collect();
let limit_keys_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
unsafe {
ffi_try!(crocksdb_delete_files_in_ranges_cf(
self.inner,
cf.inner,
start_keys.as_ptr(),
start_keys_lens.as_ptr(),
limit_keys.as_ptr(),
limit_keys_lens.as_ptr(),
ranges.len(),
include_end
));
if self.is_titan() {
ffi_try!(ctitandb_delete_files_in_ranges_cf(
self.inner,
cf.inner,
start_keys.as_ptr(),
start_keys_lens.as_ptr(),
limit_keys.as_ptr(),
limit_keys_lens.as_ptr(),
ranges.len(),
include_end
));
} else {
ffi_try!(crocksdb_delete_files_in_ranges_cf(
self.inner,
cf.inner,
start_keys.as_ptr(),
start_keys_lens.as_ptr(),
limit_keys.as_ptr(),
limit_keys_lens.as_ptr(),
ranges.len(),
include_end
));
}
}
Ok(())
}
......
......@@ -12,15 +12,17 @@
// limitations under the License.
use std::collections::HashMap;
use std::ops;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use tempdir::TempDir;
use rand::Rng;
use rocksdb::{
ColumnFamilyOptions, DBCompressionType, DBEntryType, DBOptions, ReadOptions, SeekKey,
TablePropertiesCollector, TablePropertiesCollectorFactory, TitanBlobIndex, TitanDBOptions,
UserCollectedProperties, Writable, DB,
CFHandle, ColumnFamilyOptions, CompactOptions, DBBottommostLevelCompaction, DBCompressionType,
DBEntryType, DBOptions, Range, ReadOptions, SeekKey, TablePropertiesCollector,
TablePropertiesCollectorFactory, TitanBlobIndex, TitanDBOptions, UserCollectedProperties,
Writable, DB,
};
fn encode_u32(x: u32) -> Vec<u8> {
......@@ -197,3 +199,81 @@ fn test_titan_blob_index() {
assert_eq!(index2.blob_size, index.blob_size);
assert_eq!(index2.blob_offset, index.blob_offset);
}
// Writes `key{i}` -> `value{i}` for every `i` in `range`, flushes the column family, and then
// runs a compaction that relocates the result to the last level (`num_levels - 1`).
fn generate_file_bottom_level(db: &DB, handle: &CFHandle, range: ops::Range<u32>) {
    range.for_each(|idx| {
        let key = format!("key{}", idx);
        let value = format!("value{}", idx);
        db.put_cf(handle, key.as_bytes(), value.as_bytes()).unwrap();
    });
    db.flush_cf(handle, true).unwrap();

    let cf_options = db.get_options_cf(handle);
    let last_level = cf_options.get_num_levels() as i32 - 1;

    // Only move the file to `last_level`; the bottommost-level compaction itself is skipped.
    let mut compaction = CompactOptions::new();
    compaction.set_change_level(true);
    compaction.set_target_level(last_level);
    compaction.set_bottommost_level_compaction(DBBottommostLevelCompaction::Skip);
    db.compact_range_cf_opt(handle, &compaction, None, None);
}
#[test]
fn test_titan_delete_files_in_ranges() {
    // Open a Titan-enabled DB in a temp dir; `min_blob_size` is set to 0 for the blob files.
    let tmp_dir = TempDir::new("_rust_rocksdb_test_titan_delete_files_in_multi_ranges").unwrap();
    let blob_dir = tmp_dir.path().join("titandb");
    let mut titan_opts = TitanDBOptions::new();
    titan_opts.set_dirname(blob_dir.to_str().unwrap());
    titan_opts.set_min_blob_size(0);

    let mut db_opts = DBOptions::new();
    db_opts.create_if_missing(true);
    db_opts.set_titandb_options(&titan_opts);

    let mut cf_opts = ColumnFamilyOptions::new();
    cf_opts.add_table_properties_collector_factory(
        "titan-collector",
        Box::new(TitanCollectorFactory::default()),
    );
    cf_opts.set_titandb_options(&titan_opts);

    let db = DB::open_cf(
        db_opts,
        tmp_dir.path().to_str().unwrap(),
        vec![("default", cf_opts)],
    )
    .unwrap();
    let default_cf = db.cf_handle("default").unwrap();

    // Three bottom-level files: ["key0", "key2"], ["key3", "key5"], ["key6", "key8"].
    generate_file_bottom_level(&db, default_cf, 0..3);
    generate_file_bottom_level(&db, default_cf, 3..6);
    generate_file_bottom_level(&db, default_cf, 6..9);

    // Overlapping ranges with an exclusive end: the first two files are expected to go,
    // while ["key6", "key8"] must survive because "key8" is not included.
    let overlapping = vec![
        Range::new(b"key0", b"key4"),
        Range::new(b"key2", b"key6"),
        Range::new(b"key4", b"key8"),
    ];
    db.delete_files_in_ranges_cf(default_cf, &overlapping, false)
        .unwrap();

    // Exactly "key6".."key8" should remain visible.
    let mut key_only_opts = ReadOptions::new();
    key_only_opts.set_titan_key_only(true);
    let mut survivors = db.iter_cf_opt(default_cf, key_only_opts);
    survivors.seek(SeekKey::Start);
    for expected in (6..9).map(|i| format!("key{}", i)) {
        assert!(survivors.valid());
        assert_eq!(survivors.key(), expected.as_bytes());
        survivors.next();
    }
    assert!(!survivors.valid());

    // With an inclusive end the remaining file is removed as well, leaving the DB empty.
    let last_file = vec![Range::new(b"key6", b"key8")];
    db.delete_files_in_ranges_cf(default_cf, &last_file, true)
        .unwrap();
    let mut remaining = db.iter();
    remaining.seek(SeekKey::Start);
    assert!(!remaining.valid());
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment