Unverified Commit 359aaca7 authored by Wallace's avatar Wallace Committed by GitHub

Add iter method for WriteBatch (#511)

* add write batch iterator and append interface
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
parent b6366e7f
...@@ -623,6 +623,10 @@ struct crocksdb_universal_compaction_options_t { ...@@ -623,6 +623,10 @@ struct crocksdb_universal_compaction_options_t {
rocksdb::CompactionOptionsUniversal* rep; rocksdb::CompactionOptionsUniversal* rep;
}; };
// C-API wrapper around a rocksdb WriteBatch iterator; owns `rep` (freed in
// crocksdb_writebatch_iterator_destroy).
struct crocksdb_writebatch_iterator_t {
  rocksdb::WriteBatch::Iterator* rep;
};
#ifdef OPENSSL #ifdef OPENSSL
struct crocksdb_file_encryption_info_t { struct crocksdb_file_encryption_info_t {
FileEncryptionInfo* rep; FileEncryptionInfo* rep;
...@@ -1691,22 +1695,67 @@ void crocksdb_writebatch_iterate(crocksdb_writebatch_t* b, void* state, ...@@ -1691,22 +1695,67 @@ void crocksdb_writebatch_iterate(crocksdb_writebatch_t* b, void* state,
const char* v, size_t vlen), const char* v, size_t vlen),
void (*deleted)(void*, const char* k, void (*deleted)(void*, const char* k,
size_t klen)) { size_t klen)) {
class H : public WriteBatch::Handler { class HandlerWrapper : public WriteBatch::Handler {
public:
void* state_;
void (*put_)(void*, const char* k, size_t klen, const char* v, size_t vlen);
void (*deleted_)(void*, const char* k, size_t klen);
void Put(const Slice& key, const Slice& value) override {
(*put_)(state_, key.data(), key.size(), value.data(), value.size());
}
void Delete(const Slice& key) override {
(*deleted_)(state_, key.data(), key.size());
}
};
HandlerWrapper handler;
handler.state_ = state;
handler.put_ = put;
handler.deleted_ = deleted;
b->rep.Iterate(&handler);
}
void crocksdb_writebatch_iterate_cf(
crocksdb_writebatch_t* b, void* state,
void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
void (*put_cf)(void*, uint32_t cf, const char* k, size_t klen,
const char* v, size_t vlen),
void (*deleted)(void*, const char* k, size_t klen),
void (*deleted_cf)(void*, uint32_t cf, const char* k, size_t klen)) {
class HandlerWrapper : public WriteBatch::Handler {
public: public:
void* state_; void* state_;
void (*put_)(void*, const char* k, size_t klen, const char* v, size_t vlen); void (*put_)(void*, const char* k, size_t klen, const char* v, size_t vlen);
void (*put_cf_)(void*, uint32_t cf, const char* k, size_t klen,
const char* v, size_t vlen);
void (*deleted_)(void*, const char* k, size_t klen); void (*deleted_)(void*, const char* k, size_t klen);
virtual void Put(const Slice& key, const Slice& value) override { void (*deleted_cf_)(void*, uint32_t cf, const char* k, size_t klen);
void Put(const Slice& key, const Slice& value) override {
(*put_)(state_, key.data(), key.size(), value.data(), value.size()); (*put_)(state_, key.data(), key.size(), value.data(), value.size());
} }
virtual void Delete(const Slice& key) override {
Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
(*put_cf_)(state_, column_family_id, key.data(), key.size(), value.data(),
value.size());
return Status::OK();
}
void Delete(const Slice& key) override {
(*deleted_)(state_, key.data(), key.size()); (*deleted_)(state_, key.data(), key.size());
} }
Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
(*deleted_cf_)(state_, column_family_id, key.data(), key.size());
return Status::OK();
}
}; };
H handler; HandlerWrapper handler;
handler.state_ = state; handler.state_ = state;
handler.put_ = put; handler.put_ = put;
handler.put_cf_ = put_cf;
handler.deleted_ = deleted; handler.deleted_ = deleted;
handler.deleted_cf_ = deleted_cf;
b->rep.Iterate(&handler); b->rep.Iterate(&handler);
} }
...@@ -1729,6 +1778,76 @@ void crocksdb_writebatch_rollback_to_save_point(crocksdb_writebatch_t* b, ...@@ -1729,6 +1778,76 @@ void crocksdb_writebatch_rollback_to_save_point(crocksdb_writebatch_t* b,
SaveError(errptr, b->rep.RollbackToSavePoint()); SaveError(errptr, b->rep.RollbackToSavePoint());
} }
// Replaces the batch's entire serialized representation with the `dlen`
// bytes at `data` (the bytes are copied into the batch).
void crocksdb_writebatch_set_content(crocksdb_writebatch_t* b, const char* data,
                                     size_t dlen) {
  rocksdb::WriteBatchInternal::SetContents(&b->rep, Slice(data, dlen));
}
// Appends the entries encoded in the serialized batch `data` to `dest`.
void crocksdb_writebatch_append_content(crocksdb_writebatch_t* dest,
                                        const char* data, size_t dlen) {
  rocksdb::WriteBatchInternal::AppendContents(&dest->rep, Slice(data, dlen));
}
int crocksdb_writebatch_ref_count(const char* data, size_t dlen) {
Slice s(data, dlen);
rocksdb::WriteBatch::WriteBatchRef ref(s);
return ref.Count();
}
// Creates an iterator over a serialized batch buffer that is NOT owned by a
// WriteBatch object, positioned at the first entry.
// NOTE(review): `ref` is destroyed when this function returns — this assumes
// the iterator returned by NewIterator() keeps pointers only into `data`,
// not into `ref` itself; confirm against WriteBatchRef::NewIterator. The
// caller must also keep `data` alive for the iterator's whole lifetime.
crocksdb_writebatch_iterator_t* crocksdb_writebatch_ref_iterator_create(
    const char* data, size_t dlen) {
  Slice input(data, dlen);
  rocksdb::WriteBatch::WriteBatchRef ref(input);
  auto it = new crocksdb_writebatch_iterator_t;
  it->rep = ref.NewIterator();
  it->rep->SeekToFirst();
  return it;
}
// Creates an iterator over the entries of `dest`, positioned at the first
// entry. The caller owns the result and must destroy it with
// crocksdb_writebatch_iterator_destroy.
crocksdb_writebatch_iterator_t* crocksdb_writebatch_iterator_create(
    crocksdb_writebatch_t* dest) {
  crocksdb_writebatch_iterator_t* iter = new crocksdb_writebatch_iterator_t;
  iter->rep = dest->rep.NewIterator();
  iter->rep->SeekToFirst();
  return iter;
}
// Frees the iterator and its wrapped rocksdb iterator.
void crocksdb_writebatch_iterator_destroy(crocksdb_writebatch_iterator_t* it) {
  delete it->rep;
  delete it;
}
// Returns non-zero while the iterator is positioned at a valid entry.
unsigned char crocksdb_writebatch_iterator_valid(
    crocksdb_writebatch_iterator_t* it) {
  return it->rep->Valid();
}
// Advances the iterator to the next entry; only legal while Valid().
void crocksdb_writebatch_iterator_next(crocksdb_writebatch_iterator_t* it) {
  it->rep->Next();
}
// Returns the current entry's key (not NUL-terminated); `*klen` receives its
// length. The pointer is valid until the iterator advances or is destroyed.
const char* crocksdb_writebatch_iterator_key(crocksdb_writebatch_iterator_t* it,
                                             size_t* klen) {
  Slice key = it->rep->Key();
  *klen = key.size();
  return key.data();
}
// Returns the current entry's value (not NUL-terminated); `*klen` receives
// its length. Empty for deletion entries.
const char* crocksdb_writebatch_iterator_value(
    crocksdb_writebatch_iterator_t* it, size_t* klen) {
  Slice value = it->rep->Value();
  *klen = value.size();
  return value.data();
}
// Returns the current entry's record type as the raw ValueType integer
// (mirrored on the Rust side by DBValueType).
int crocksdb_writebatch_iterator_value_type(
    crocksdb_writebatch_iterator_t* it) {
  return static_cast<int>(it->rep->GetValueType());
}
// Returns the column family id of the current entry (0 for the default cf).
uint32_t crocksdb_writebatch_iterator_column_family_id(
    crocksdb_writebatch_iterator_t* it) {
  return it->rep->GetColumnFamilyId();
}
crocksdb_block_based_table_options_t* crocksdb_block_based_options_create() { crocksdb_block_based_table_options_t* crocksdb_block_based_options_create() {
return new crocksdb_block_based_table_options_t; return new crocksdb_block_based_table_options_t;
} }
......
...@@ -152,6 +152,7 @@ typedef struct crocksdb_iostats_context_t crocksdb_iostats_context_t; ...@@ -152,6 +152,7 @@ typedef struct crocksdb_iostats_context_t crocksdb_iostats_context_t;
typedef struct crocksdb_writestallinfo_t crocksdb_writestallinfo_t; typedef struct crocksdb_writestallinfo_t crocksdb_writestallinfo_t;
typedef struct crocksdb_writestallcondition_t crocksdb_writestallcondition_t; typedef struct crocksdb_writestallcondition_t crocksdb_writestallcondition_t;
typedef struct crocksdb_map_property_t crocksdb_map_property_t; typedef struct crocksdb_map_property_t crocksdb_map_property_t;
typedef struct crocksdb_writebatch_iterator_t crocksdb_writebatch_iterator_t;
typedef enum crocksdb_table_property_t { typedef enum crocksdb_table_property_t {
kDataSize = 1, kDataSize = 1,
...@@ -644,6 +645,15 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_iterate( ...@@ -644,6 +645,15 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_iterate(
crocksdb_writebatch_t*, void* state, crocksdb_writebatch_t*, void* state,
void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen), void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
void (*deleted)(void*, const char* k, size_t klen)); void (*deleted)(void*, const char* k, size_t klen));
extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_iterate_cf(
crocksdb_writebatch_t* b, void* state,
void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen),
void (*put_cf)(void*, uint32_t cf, const char* k, size_t klen,
const char* v, size_t vlen),
void (*deleted)(void*, const char* k, size_t klen),
void (*deleted_cf)(void*, uint32_t cf, const char* k, size_t klen));
extern C_ROCKSDB_LIBRARY_API const char* crocksdb_writebatch_data( extern C_ROCKSDB_LIBRARY_API const char* crocksdb_writebatch_data(
crocksdb_writebatch_t*, size_t* size); crocksdb_writebatch_t*, size_t* size);
extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_set_save_point( extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_set_save_point(
...@@ -652,6 +662,33 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_pop_save_point( ...@@ -652,6 +662,33 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_pop_save_point(
crocksdb_writebatch_t*, char** errptr); crocksdb_writebatch_t*, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_rollback_to_save_point( extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_rollback_to_save_point(
crocksdb_writebatch_t*, char** errptr); crocksdb_writebatch_t*, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_rollback_to_save_point(
crocksdb_writebatch_t*, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_set_content(
crocksdb_writebatch_t* b, const char* data, size_t dlen);
extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_append_content(
crocksdb_writebatch_t* dest, const char* data, size_t dlen);
extern C_ROCKSDB_LIBRARY_API int crocksdb_writebatch_ref_count(const char* data,
size_t dlen);
extern C_ROCKSDB_LIBRARY_API crocksdb_writebatch_iterator_t*
crocksdb_writebatch_ref_iterator_create(const char* data, size_t dlen);
extern C_ROCKSDB_LIBRARY_API crocksdb_writebatch_iterator_t*
crocksdb_writebatch_iterator_create(crocksdb_writebatch_t* dest);
extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_iterator_destroy(
crocksdb_writebatch_iterator_t* it);
extern C_ROCKSDB_LIBRARY_API unsigned char crocksdb_writebatch_iterator_valid(
crocksdb_writebatch_iterator_t* it);
extern C_ROCKSDB_LIBRARY_API void crocksdb_writebatch_iterator_next(
crocksdb_writebatch_iterator_t* it);
extern C_ROCKSDB_LIBRARY_API const char* crocksdb_writebatch_iterator_key(
crocksdb_writebatch_iterator_t* it, size_t* klen);
extern C_ROCKSDB_LIBRARY_API const char* crocksdb_writebatch_iterator_value(
crocksdb_writebatch_iterator_t* it, size_t* klen);
extern C_ROCKSDB_LIBRARY_API int crocksdb_writebatch_iterator_value_type(
crocksdb_writebatch_iterator_t* it);
extern C_ROCKSDB_LIBRARY_API uint32_t
crocksdb_writebatch_iterator_column_family_id(
crocksdb_writebatch_iterator_t* it);
/* Block based table options */ /* Block based table options */
......
Subproject commit 1a540da001187d56c8afb25e53aa52aec2a16173 Subproject commit 9b549268cf61b7e1a90402f4f3c2b3cb442cd925
...@@ -163,6 +163,8 @@ pub struct DBFileEncryptionInfo(c_void); ...@@ -163,6 +163,8 @@ pub struct DBFileEncryptionInfo(c_void);
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
#[repr(C)] #[repr(C)]
pub struct DBEncryptionKeyManagerInstance(c_void); pub struct DBEncryptionKeyManagerInstance(c_void);
/// Opaque handle to a C `crocksdb_writebatch_iterator_t`.
#[repr(C)]
pub struct DBWriteBatchIterator(c_void);
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(C)] #[repr(C)]
...@@ -405,6 +407,23 @@ pub enum DBEncryptionMethod { ...@@ -405,6 +407,23 @@ pub enum DBEncryptionMethod {
Aes256Ctr = 4, Aes256Ctr = 4,
} }
/// Record-type tag of a write-batch entry.
// NOTE(review): discriminants mirror rocksdb's internal ValueType enum (the
// C side returns them via static_cast<int>); they must stay in sync with the
// pinned rocksdb submodule — confirm when bumping it.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(C)]
pub enum DBValueType {
    TypeDeletion = 0x0,
    TypeValue = 0x1,
    TypeMerge = 0x2,
    TypeColumnFamilyDeletion = 0x4, // WAL only.
    TypeColumnFamilyValue = 0x5,    // WAL only.
    TypeColumnFamilyMerge = 0x6,    // WAL only.
    TypeSingleDeletion = 0x7,
    TypeColumnFamilyRangeDeletion = 0xE, // WAL only.
    TypeRangeDeletion = 0xF,             // meta block
    TypeColumnFamilyBlobIndex = 0x10,    // Blob DB only
    TypeBlobIndex = 0x11,                // Blob DB only
    MaxValue = 0x7F,
}
#[cfg(feature = "encryption")] #[cfg(feature = "encryption")]
impl fmt::Display for DBEncryptionMethod { impl fmt::Display for DBEncryptionMethod {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
...@@ -1129,6 +1148,32 @@ extern "C" { ...@@ -1129,6 +1148,32 @@ extern "C" {
), ),
deleted_fn: extern "C" fn(state: *mut c_void, k: *const u8, klen: size_t), deleted_fn: extern "C" fn(state: *mut c_void, k: *const u8, klen: size_t),
); );
pub fn crocksdb_writebatch_iterate_cf(
batch: *mut DBWriteBatch,
state: *mut c_void,
put_fn: unsafe extern "C" fn(
state: *mut c_void,
k: *const u8,
klen: size_t,
v: *const u8,
vlen: size_t,
) -> (),
put_cf_fn: unsafe extern "C" fn(
state: *mut c_void,
cf: u32,
k: *const u8,
klen: size_t,
v: *const u8,
vlen: size_t,
) -> (),
delete_fn: unsafe extern "C" fn(state: *mut c_void, k: *const u8, klen: size_t) -> (),
delete_cf_fn: unsafe extern "C" fn(
state: *mut c_void,
cf: u32,
k: *const u8,
klen: size_t,
) -> (),
);
pub fn crocksdb_writebatch_data(batch: *mut DBWriteBatch, size: *mut size_t) -> *const u8; pub fn crocksdb_writebatch_data(batch: *mut DBWriteBatch, size: *mut size_t) -> *const u8;
pub fn crocksdb_writebatch_set_save_point(batch: *mut DBWriteBatch); pub fn crocksdb_writebatch_set_save_point(batch: *mut DBWriteBatch);
pub fn crocksdb_writebatch_pop_save_point(batch: *mut DBWriteBatch, err: *mut *mut c_char); pub fn crocksdb_writebatch_pop_save_point(batch: *mut DBWriteBatch, err: *mut *mut c_char);
...@@ -1136,7 +1181,35 @@ extern "C" { ...@@ -1136,7 +1181,35 @@ extern "C" {
batch: *mut DBWriteBatch, batch: *mut DBWriteBatch,
err: *mut *mut c_char, err: *mut *mut c_char,
); );
pub fn crocksdb_writebatch_set_content(batch: *mut DBWriteBatch, data: *const u8, dlen: size_t);
pub fn crocksdb_writebatch_append_content(
dest: *mut DBWriteBatch,
data: *const u8,
dlen: size_t,
);
pub fn crocksdb_writebatch_ref_count(data: *const u8, dlen: size_t) -> c_int;
pub fn crocksdb_writebatch_ref_iterator_create(
data: *const u8,
dlen: size_t,
) -> *mut DBWriteBatchIterator;
pub fn crocksdb_writebatch_iterator_create(
dest: *mut DBWriteBatch,
) -> *mut DBWriteBatchIterator;
pub fn crocksdb_writebatch_iterator_destroy(it: *mut DBWriteBatchIterator);
pub fn crocksdb_writebatch_iterator_valid(it: *mut DBWriteBatchIterator) -> bool;
pub fn crocksdb_writebatch_iterator_next(it: *mut DBWriteBatchIterator);
pub fn crocksdb_writebatch_iterator_key(
it: *mut DBWriteBatchIterator,
klen: *mut size_t,
) -> *mut u8;
pub fn crocksdb_writebatch_iterator_value(
it: *mut DBWriteBatchIterator,
vlen: *mut size_t,
) -> *mut u8;
pub fn crocksdb_writebatch_iterator_value_type(it: *mut DBWriteBatchIterator) -> DBValueType;
pub fn crocksdb_writebatch_iterator_column_family_id(it: *mut DBWriteBatchIterator) -> u32;
// Comparator // Comparator
pub fn crocksdb_options_set_comparator(options: *mut Options, cb: *mut DBComparator); pub fn crocksdb_options_set_comparator(options: *mut Options, cb: *mut DBComparator);
pub fn crocksdb_comparator_create( pub fn crocksdb_comparator_create(
......
...@@ -49,8 +49,7 @@ pub use perf_context::{get_perf_level, set_perf_level, IOStatsContext, PerfConte ...@@ -49,8 +49,7 @@ pub use perf_context::{get_perf_level, set_perf_level, IOStatsContext, PerfConte
pub use rocksdb::{ pub use rocksdb::{
load_latest_options, run_ldb_tool, run_sst_dump_tool, set_external_sst_file_global_seq_no, load_latest_options, run_ldb_tool, run_sst_dump_tool, set_external_sst_file_global_seq_no,
BackupEngine, CFHandle, Cache, DBIterator, DBVector, Env, ExternalSstFileInfo, MapProperty, BackupEngine, CFHandle, Cache, DBIterator, DBVector, Env, ExternalSstFileInfo, MapProperty,
MemoryAllocator, Range, SeekKey, SequentialFile, SstFileReader, SstFileWriter, Writable, MemoryAllocator, Range, SeekKey, SequentialFile, SstFileReader, SstFileWriter, Writable, DB,
WriteBatch, DB,
}; };
pub use rocksdb_options::{ pub use rocksdb_options::{
BlockBasedOptions, CColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions, BlockBasedOptions, CColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions,
...@@ -67,6 +66,7 @@ pub use table_properties::{ ...@@ -67,6 +66,7 @@ pub use table_properties::{
pub use table_properties_collector::TablePropertiesCollector; pub use table_properties_collector::TablePropertiesCollector;
pub use table_properties_collector_factory::TablePropertiesCollectorFactory; pub use table_properties_collector_factory::TablePropertiesCollectorFactory;
pub use titan::{TitanBlobIndex, TitanDBOptions}; pub use titan::{TitanBlobIndex, TitanDBOptions};
pub use write_batch::{WriteBatch, WriteBatchRef};
#[allow(deprecated)] #[allow(deprecated)]
pub use rocksdb::Kv; pub use rocksdb::Kv;
...@@ -92,6 +92,7 @@ mod table_properties_collector_factory; ...@@ -92,6 +92,7 @@ mod table_properties_collector_factory;
pub mod table_properties_rc; pub mod table_properties_rc;
mod table_properties_rc_handles; mod table_properties_rc_handles;
mod titan; mod titan;
mod write_batch;
#[cfg(test)] #[cfg(test)]
fn tempdir_with_prefix(prefix: &str) -> tempfile::TempDir { fn tempdir_with_prefix(prefix: &str) -> tempfile::TempDir {
......
...@@ -26,7 +26,6 @@ use rocksdb_options::{ ...@@ -26,7 +26,6 @@ use rocksdb_options::{
IngestExternalFileOptions, LRUCacheOptions, ReadOptions, RestoreOptions, UnsafeSnap, IngestExternalFileOptions, LRUCacheOptions, ReadOptions, RestoreOptions, UnsafeSnap,
WriteOptions, WriteOptions,
}; };
use std::collections::btree_map::Entry;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::fmt::{self, Debug, Formatter}; use std::fmt::{self, Debug, Formatter};
...@@ -46,6 +45,7 @@ use encryption::{DBEncryptionKeyManager, EncryptionKeyManager}; ...@@ -46,6 +45,7 @@ use encryption::{DBEncryptionKeyManager, EncryptionKeyManager};
use table_properties::{TableProperties, TablePropertiesCollection}; use table_properties::{TableProperties, TablePropertiesCollection};
use table_properties_rc::TablePropertiesCollection as RcTablePropertiesCollection; use table_properties_rc::TablePropertiesCollection as RcTablePropertiesCollection;
use titan::TitanDBOptions; use titan::TitanDBOptions;
use write_batch::WriteBatch;
pub struct CFHandle { pub struct CFHandle {
inner: *mut DBCFHandle, inner: *mut DBCFHandle,
...@@ -147,7 +147,8 @@ impl MapProperty { ...@@ -147,7 +147,8 @@ impl MapProperty {
pub struct DB { pub struct DB {
inner: *mut DBInstance, inner: *mut DBInstance,
cfs: BTreeMap<String, CFHandle>, cfs_by_name: BTreeMap<String, usize>,
cfs: Vec<Option<(String, CFHandle)>>,
path: String, path: String,
opts: DBOptions, opts: DBOptions,
_cf_opts: Vec<ColumnFamilyOptions>, _cf_opts: Vec<ColumnFamilyOptions>,
...@@ -169,12 +170,6 @@ impl DB { ...@@ -169,12 +170,6 @@ impl DB {
} }
} }
pub struct WriteBatch {
inner: *mut DBWriteBatch,
}
unsafe impl Send for WriteBatch {}
pub struct Snapshot<D: Deref<Target = DB>> { pub struct Snapshot<D: Deref<Target = DB>> {
db: D, db: D,
snap: UnsafeSnap, snap: UnsafeSnap,
...@@ -700,20 +695,25 @@ impl DB { ...@@ -700,20 +695,25 @@ impl DB {
opts.titan_inner = crocksdb_ffi::ctitandb_get_titan_db_options(db); opts.titan_inner = crocksdb_ffi::ctitandb_get_titan_db_options(db);
} }
} }
let mut cfs = Vec::with_capacity(names.len());
let cfs = names let mut cfs_by_name = BTreeMap::new();
.into_iter() for (name, h) in names.into_iter().zip(cf_handles) {
.zip(cf_handles) let handle = CFHandle { inner: h };
.map(|(s, h)| (s.to_owned(), CFHandle { inner: h })) let idx = handle.id() as usize;
.collect(); while cfs.len() <= idx {
cfs.push(None);
}
cfs[idx] = Some((name.to_owned(), handle));
cfs_by_name.insert(name.to_owned(), idx);
}
Ok(DB { Ok(DB {
cfs,
opts,
readonly,
cfs_by_name,
inner: db, inner: db,
cfs: cfs,
path: path.to_owned(), path: path.to_owned(),
opts: opts,
_cf_opts: options, _cf_opts: options,
readonly: readonly,
}) })
} }
...@@ -902,36 +902,56 @@ impl DB { ...@@ -902,36 +902,56 @@ impl DB {
}; };
let handle = CFHandle { inner: cf_handler }; let handle = CFHandle { inner: cf_handler };
self._cf_opts.push(cfd.options); self._cf_opts.push(cfd.options);
Ok(match self.cfs.entry(cfd.name.to_owned()) { let idx = handle.id() as usize;
Entry::Occupied(mut e) => { while idx >= self.cfs.len() {
e.insert(handle); self.cfs.push(None);
e.into_mut() }
} self.cfs[idx] = Some((cfd.name.to_owned(), handle));
Entry::Vacant(e) => e.insert(handle), self.cfs_by_name.insert(cfd.name.to_owned(), idx);
}) Ok(&self.cfs[idx].as_ref().unwrap().1)
} }
} }
pub fn drop_cf(&mut self, name: &str) -> Result<(), String> { pub fn drop_cf(&mut self, name: &str) -> Result<(), String> {
let cf = self.cfs.remove(name); let id = self.cfs_by_name.remove(name);
if cf.is_none() { let cf = match id {
return Err(format!("Invalid column family: {}", name)); None => return Err(format!("Invalid column family: {}", name)),
} Some(idx) => match self.cfs[idx].take() {
None => return Err(format!("Invalid column family: {}", name)),
Some((_, handle)) => handle,
},
};
unsafe { unsafe {
ffi_try!(crocksdb_drop_column_family(self.inner, cf.unwrap().inner)); ffi_try!(crocksdb_drop_column_family(self.inner, cf.inner));
} }
Ok(()) Ok(())
} }
pub fn cf_handle(&self, name: &str) -> Option<&CFHandle> { pub fn cf_handle(&self, name: &str) -> Option<&CFHandle> {
self.cfs.get(name) let idx = match self.cfs_by_name.get(name) {
None => return None,
Some(idx) => *idx,
};
self.cfs[idx].as_ref().map(|h| &h.1)
} }
/// get all column family names, including 'default'. /// get all column family names, including 'default'.
pub fn cf_names(&self) -> Vec<&str> { pub fn cf_names(&self) -> Vec<&str> {
self.cfs.iter().map(|(k, _)| k.as_str()).collect() self.cfs
.iter()
.filter(|handle| handle.is_some())
.map(|handle| handle.as_ref().unwrap().0.as_str())
.collect()
}
/// get all column family names, including 'default'.
pub fn cf_handle_by_id(&self, id: usize) -> Option<&CFHandle> {
if id >= self.cfs.len() {
return None;
}
self.cfs[id].as_ref().map(|h| &h.1)
} }
pub fn iter(&self) -> DBIterator<&DB> { pub fn iter(&self) -> DBIterator<&DB> {
...@@ -1924,74 +1944,6 @@ impl Writable for DB { ...@@ -1924,74 +1944,6 @@ impl Writable for DB {
} }
} }
impl Default for WriteBatch {
fn default() -> WriteBatch {
WriteBatch {
inner: unsafe { crocksdb_ffi::crocksdb_writebatch_create() },
}
}
}
impl WriteBatch {
pub fn new() -> WriteBatch {
WriteBatch::default()
}
pub fn with_capacity(cap: usize) -> WriteBatch {
WriteBatch {
inner: unsafe { crocksdb_ffi::crocksdb_writebatch_create_with_capacity(cap) },
}
}
pub fn count(&self) -> usize {
unsafe { crocksdb_ffi::crocksdb_writebatch_count(self.inner) as usize }
}
pub fn is_empty(&self) -> bool {
self.count() == 0
}
pub fn data_size(&self) -> usize {
unsafe {
let mut data_size: usize = 0;
let _ = crocksdb_ffi::crocksdb_writebatch_data(self.inner, &mut data_size);
return data_size;
}
}
pub fn clear(&self) {
unsafe {
crocksdb_ffi::crocksdb_writebatch_clear(self.inner);
}
}
pub fn set_save_point(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_writebatch_set_save_point(self.inner);
}
}
pub fn rollback_to_save_point(&mut self) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_writebatch_rollback_to_save_point(self.inner));
}
Ok(())
}
pub fn pop_save_point(&mut self) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_writebatch_pop_save_point(self.inner));
}
Ok(())
}
}
impl Drop for WriteBatch {
fn drop(&mut self) {
unsafe { crocksdb_ffi::crocksdb_writebatch_destroy(self.inner) }
}
}
impl Drop for DB { impl Drop for DB {
fn drop(&mut self) { fn drop(&mut self) {
// SyncWAL before call close. // SyncWAL before call close.
...@@ -2856,11 +2808,13 @@ pub fn run_sst_dump_tool(sst_dump_args: &[String], opts: &DBOptions) { ...@@ -2856,11 +2808,13 @@ pub fn run_sst_dump_tool(sst_dump_args: &[String], opts: &DBOptions) {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use librocksdb_sys::DBValueType;
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use std::str; use std::str;
use std::string::String; use std::string::String;
use std::thread; use std::thread;
use write_batch::WriteBatchRef;
use super::*; use super::*;
use crate::tempdir_with_prefix; use crate::tempdir_with_prefix;
...@@ -3505,4 +3459,130 @@ mod test { ...@@ -3505,4 +3459,130 @@ mod test {
CloudEnvOptions::new(), CloudEnvOptions::new(),
); );
} }
#[test]
fn test_write_append() {
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let path = tempdir_with_prefix("_rust_rocksdb_multi_batch");
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let cf = db.cf_handle("default").unwrap();
let mut wb = WriteBatch::new();
for s in &[b"ab", b"cd", b"ef"] {
let w = WriteBatch::new();
w.put_cf(cf, s.to_vec().as_slice(), b"a").unwrap();
wb.append(w.data());
}
db.write(&wb).unwrap();
for s in &[b"ab", b"cd", b"ef"] {
let v = db.get_cf(cf, s.to_vec().as_slice()).unwrap();
assert!(v.is_some());
assert_eq!(v.unwrap().to_utf8().unwrap(), "a");
}
}
    // Shared driver for the write-batch iteration tests: builds a DB with
    // cfs "default" and "cf1", seeds k1=v0 in both, records (put k2=v1,
    // delete k1) for both cfs into a batch, then hands the batch to
    // `iter_fn`, which is expected to replay every entry against the DB.
    // Afterwards both cfs must show k1 deleted and k2 == "v1".
    fn inner_test_write_batch_iter<F>(iter_fn: F)
    where
        F: FnOnce(&DB, &mut WriteBatch),
    {
        let temp_dir = tempdir_with_prefix("_rust_rocksdb_write_batch_iterate");
        let path = temp_dir.path().to_str().unwrap();
        let cfs = ["default", "cf1"];
        let mut cfs_opts = vec![];
        for _ in 0..cfs.len() {
            cfs_opts.push(ColumnFamilyOptions::new());
        }
        let mut opts = DBOptions::new();
        opts.create_if_missing(true);
        let mut db = DB::open(opts, path).unwrap();
        // "default" always exists; create only the extra cf.
        for (cf, cf_opts) in cfs.iter().zip(cfs_opts.iter().cloned()) {
            if *cf != "default" {
                db.create_cf((*cf, cf_opts)).unwrap();
            }
        }
        let mut wb = WriteBatch::new();
        let default_cf = db.cf_handle("default").unwrap();
        let cf1 = db.cf_handle("cf1").unwrap();
        // Seed state that the batch's deletes will remove.
        db.put_cf(default_cf, b"k1", b"v0").unwrap();
        db.put_cf(cf1, b"k1", b"v0").unwrap();
        // Recorded into the batch only — not applied until iter_fn replays.
        wb.put_cf(default_cf, b"k2", b"v1").unwrap();
        wb.put_cf(cf1, b"k2", b"v1").unwrap();
        wb.delete_cf(default_cf, b"k1").unwrap();
        wb.delete_cf(cf1, b"k1").unwrap();
        iter_fn(&db, &mut wb);
        // Both cfs must reflect the replayed batch.
        for cf in &["default", "cf1"] {
            let handle = db.cf_handle(cf).unwrap();
            let v0 = db.get_cf(handle, b"k1").unwrap();
            assert!(v0.is_none());
            let v1 = db.get_cf(handle, b"k2").unwrap();
            assert!(v1.is_some());
            assert_eq!(v1.unwrap().to_utf8().unwrap(), "v1");
        }
    }
#[test]
fn test_write_batch_iterate() {
inner_test_write_batch_iter(|db, wb| {
let cf_names = db.cf_names();
wb.iterate(&cf_names, |cf, write_type, key, value| {
let handle = db.cf_handle(cf).unwrap();
match write_type {
DBValueType::TypeValue => {
db.put_cf(handle, key, value.unwrap()).unwrap();
}
DBValueType::TypeDeletion => {
db.delete_cf(handle, key).unwrap();
}
_ => (),
}
});
});
}
#[test]
fn test_write_batch_ref_iter() {
inner_test_write_batch_iter(|db, wb| {
let wb_ref = WriteBatchRef::new(wb.data());
assert_eq!(wb.count(), wb_ref.count());
for (value_type, c, key, value) in wb_ref.iter() {
let handle = db.cf_handle_by_id(c as usize).unwrap();
match value_type {
DBValueType::TypeValue => {
db.put_cf(handle, key, value).unwrap();
}
DBValueType::TypeDeletion => {
db.delete_cf(handle, key).unwrap();
}
_ => {
println!("error type, cf: {}", c);
}
}
}
});
}
#[test]
fn test_write_batch_iter() {
inner_test_write_batch_iter(|db, wb| {
for (value_type, c, key, value) in wb.iter() {
let handle = db.cf_handle_by_id(c as usize).unwrap();
match value_type {
DBValueType::TypeValue => {
db.put_cf(handle, key, value).unwrap();
}
DBValueType::TypeDeletion => {
db.delete_cf(handle, key).unwrap();
}
_ => {
println!("error type, cf: {}", c);
}
}
}
});
}
} }
use crocksdb_ffi::{self, DBValueType, DBWriteBatch, DBWriteBatchIterator};
use libc::{c_void, size_t};
use std::marker::PhantomData;
use std::slice;
/// An owned RocksDB write batch; wraps the C `crocksdb_writebatch_t` handle.
pub struct WriteBatch {
    // Raw handle owned by this struct; freed in `Drop`.
    pub(crate) inner: *mut DBWriteBatch,
}
// SAFETY: the raw handle is only accessed through &self/&mut self methods.
// NOTE(review): assumes the C write-batch has no thread affinity — confirm
// against the crocksdb C API before relying on cross-thread moves.
unsafe impl Send for WriteBatch {}
impl Default for WriteBatch {
    /// Creates an empty write batch via the C API.
    fn default() -> WriteBatch {
        WriteBatch {
            inner: unsafe { crocksdb_ffi::crocksdb_writebatch_create() },
        }
    }
}
/// Trampoline state threaded through the C iterate callbacks as `*mut c_void`.
pub struct WriteBatchCallback<'a> {
    // Type-erased pointer to a `&mut &mut dyn FnMut(..)` that lives on the
    // caller's stack for the duration of the iteration.
    cb_ptr: *mut c_void,
    // Column family names; indexed by column family id in `invoke`.
    cfs: &'a [&'a str],
}
impl<'a> WriteBatchCallback<'a> {
    /// Resolves `cf_id` to a name and forwards one batch entry to the user
    /// closure; `value` is `None` for deletion entries.
    // NOTE(review): indexes `cfs` directly by cf id — this assumes each
    // name's slice position equals its column family id, which breaks if a
    // cf was dropped; panics on out-of-range ids. Confirm against callers.
    fn invoke(&mut self, cf_id: u32, value_type: DBValueType, key: &[u8], value: Option<&[u8]>) {
        let cf = self.cfs[cf_id as usize];
        unsafe {
            // SAFETY: cb_ptr was produced from a live `&mut &mut dyn FnMut`
            // in `WriteBatch::iterate` and outlives the iteration.
            let cb: &mut &mut dyn FnMut(&str, DBValueType, &[u8], Option<&[u8]>) =
                &mut *(self.cb_ptr as *mut _);
            cb(cf, value_type, key, value);
        }
    }
}
impl WriteBatch {
pub fn new() -> WriteBatch {
WriteBatch::default()
}
pub fn with_capacity(cap: usize) -> WriteBatch {
WriteBatch {
inner: unsafe { crocksdb_ffi::crocksdb_writebatch_create_with_capacity(cap) },
}
}
pub fn count(&self) -> usize {
unsafe { crocksdb_ffi::crocksdb_writebatch_count(self.inner) as usize }
}
pub fn is_empty(&self) -> bool {
self.count() == 0
}
pub fn data_size(&self) -> usize {
unsafe {
let mut data_size: usize = 0;
let _ = crocksdb_ffi::crocksdb_writebatch_data(self.inner, &mut data_size);
return data_size;
}
}
pub fn clear(&self) {
unsafe {
crocksdb_ffi::crocksdb_writebatch_clear(self.inner);
}
}
pub fn set_save_point(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_writebatch_set_save_point(self.inner);
}
}
pub fn rollback_to_save_point(&mut self) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_writebatch_rollback_to_save_point(self.inner));
}
Ok(())
}
pub fn pop_save_point(&mut self) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_writebatch_pop_save_point(self.inner));
}
Ok(())
}
pub fn data(&self) -> &[u8] {
let mut val_len: size_t = 0;
let val_len_ptr: *mut size_t = &mut val_len;
unsafe {
let val_ptr = crocksdb_ffi::crocksdb_writebatch_data(self.inner, val_len_ptr);
slice::from_raw_parts(val_ptr, val_len as usize)
}
}
pub fn append(&mut self, src: &[u8]) {
unsafe {
crocksdb_ffi::crocksdb_writebatch_append_content(
self.inner,
src.as_ptr(),
src.len() as size_t,
);
}
}
pub fn iterate<F>(&self, cfs: &[&str], mut iterator_fn: F)
where
F: FnMut(&str, DBValueType, &[u8], Option<&[u8]>),
{
unsafe {
let mut cb: &mut dyn FnMut(&str, DBValueType, &[u8], Option<&[u8]>) = &mut iterator_fn;
let cb_ptr = &mut cb;
let cb_proxy = Box::new(WriteBatchCallback {
cfs,
cb_ptr: cb_ptr as *mut _ as *mut c_void,
});
let state = Box::into_raw(cb_proxy) as *mut c_void;
crocksdb_ffi::crocksdb_writebatch_iterate_cf(
self.inner,
state,
put_fn,
put_cf_fn,
delete_fn,
delete_cf_fn,
);
}
}
pub fn iter(&self) -> WriteBatchIter {
WriteBatchIter::new(self)
}
}
/// Pull-based iterator over the entries of a write batch or of a serialized
/// batch buffer.
pub struct WriteBatchIter<'a> {
    // Ties the iterator's lifetime to the batch/buffer it reads from.
    props: PhantomData<&'a DBWriteBatchIterator>,
    // Raw C iterator handle; freed in `Drop`.
    inner: *mut DBWriteBatchIterator,
}
impl<'a> Drop for WriteBatchIter<'a> {
    /// Frees the underlying C iterator handle.
    fn drop(&mut self) {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_iterator_destroy(self.inner);
        }
    }
}
impl<'a> WriteBatchIter<'a> {
    /// Builds an iterator over all entries of `wb`, positioned at the first.
    fn new(wb: &'a WriteBatch) -> WriteBatchIter<'a> {
        let inner = unsafe { crocksdb_ffi::crocksdb_writebatch_iterator_create(wb.inner) };
        WriteBatchIter {
            inner,
            props: PhantomData,
        }
    }

    /// Builds an iterator over a serialized batch representation; `data`
    /// must outlive the iterator.
    fn from_bytes(data: &'a [u8]) -> WriteBatchIter<'a> {
        let inner = unsafe {
            crocksdb_ffi::crocksdb_writebatch_ref_iterator_create(
                data.as_ptr(),
                data.len() as size_t,
            )
        };
        WriteBatchIter {
            inner,
            props: PhantomData,
        }
    }
}
impl<'a> Iterator for WriteBatchIter<'a> {
    // (record type, column family id, key, value); value is empty for
    // deletion entries.
    type Item = (DBValueType, u32, &'a [u8], &'a [u8]);

    fn next(&mut self) -> Option<(DBValueType, u32, &'a [u8], &'a [u8])> {
        unsafe {
            if !crocksdb_ffi::crocksdb_writebatch_iterator_valid(self.inner) {
                return None;
            }
            // Read key/value/type/cf of the current entry BEFORE advancing;
            // the returned slices point into the batch buffer.
            // NOTE(review): assumes the C iterator never returns a null data
            // pointer (even for empty values) — from_raw_parts requires a
            // non-null, aligned pointer; confirm against the C side.
            let mut klen: size_t = 0;
            let k = crocksdb_ffi::crocksdb_writebatch_iterator_key(self.inner, &mut klen);
            let key = slice::from_raw_parts(k, klen);
            let mut vlen: size_t = 0;
            let v = crocksdb_ffi::crocksdb_writebatch_iterator_value(self.inner, &mut vlen);
            let val = slice::from_raw_parts(v, vlen);
            // Normalize WAL-only per-cf record types to their base variants
            // so callers only need to match the base types.
            let value_type = match crocksdb_ffi::crocksdb_writebatch_iterator_value_type(self.inner)
            {
                DBValueType::TypeColumnFamilyDeletion => DBValueType::TypeDeletion,
                DBValueType::TypeColumnFamilyValue => DBValueType::TypeValue,
                DBValueType::TypeColumnFamilyMerge => DBValueType::TypeMerge,
                DBValueType::TypeColumnFamilyRangeDeletion => DBValueType::TypeRangeDeletion,
                other => other,
            };
            let column_family =
                crocksdb_ffi::crocksdb_writebatch_iterator_column_family_id(self.inner);
            crocksdb_ffi::crocksdb_writebatch_iterator_next(self.inner);
            Some((value_type, column_family, key, val))
        }
    }
}
/// A zero-copy, read-only view over a serialized write-batch byte buffer.
pub struct WriteBatchRef<'a> {
    data: &'a [u8],
}
impl<'a> WriteBatchRef<'a> {
    /// Wraps a serialized write-batch representation without copying it.
    pub fn new(data: &'a [u8]) -> WriteBatchRef<'a> {
        WriteBatchRef { data }
    }

    /// Returns the number of entries recorded in the wrapped buffer.
    pub fn count(&self) -> usize {
        let n = unsafe {
            crocksdb_ffi::crocksdb_writebatch_ref_count(
                self.data.as_ptr(),
                self.data.len() as size_t,
            )
        };
        n as usize
    }

    /// Returns a pull-based iterator over the wrapped entries.
    pub fn iter(&self) -> WriteBatchIter<'a> {
        WriteBatchIter::from_bytes(self.data)
    }
}
/// C callback for default-cf `Put` records: forwards the entry to the user
/// closure with column family id 0.
pub unsafe extern "C" fn put_fn(
    state: *mut c_void,
    k: *const u8,
    klen: size_t,
    v: *const u8,
    vlen: size_t,
) {
    let proxy = &mut *(state as *mut WriteBatchCallback);
    let key = slice::from_raw_parts(k, klen as usize);
    let value = slice::from_raw_parts(v, vlen as usize);
    proxy.invoke(0, DBValueType::TypeValue, key, Some(value));
}
/// C callback for per-cf `Put` records: forwards the entry to the user
/// closure with the given column family id.
pub unsafe extern "C" fn put_cf_fn(
    state: *mut c_void,
    cf_id: u32,
    k: *const u8,
    klen: size_t,
    v: *const u8,
    vlen: size_t,
) {
    let proxy = &mut *(state as *mut WriteBatchCallback);
    let key = slice::from_raw_parts(k, klen as usize);
    let value = slice::from_raw_parts(v, vlen as usize);
    proxy.invoke(cf_id, DBValueType::TypeValue, key, Some(value));
}
/// C callback for default-cf `Delete` records: forwards the key with column
/// family id 0 and no value.
pub unsafe extern "C" fn delete_fn(state: *mut c_void, k: *const u8, klen: size_t) {
    let proxy = &mut *(state as *mut WriteBatchCallback);
    let key = slice::from_raw_parts(k, klen as usize);
    proxy.invoke(0, DBValueType::TypeDeletion, key, None);
}
/// C callback for per-cf `Delete` records: forwards the key with the given
/// column family id and no value.
pub unsafe extern "C" fn delete_cf_fn(state: *mut c_void, cf_id: u32, k: *const u8, klen: size_t) {
    let proxy = &mut *(state as *mut WriteBatchCallback);
    let key = slice::from_raw_parts(k, klen as usize);
    proxy.invoke(cf_id, DBValueType::TypeDeletion, key, None);
}
impl Drop for WriteBatch {
    /// Releases the underlying C write-batch handle.
    fn drop(&mut self) {
        unsafe { crocksdb_ffi::crocksdb_writebatch_destroy(self.inner) }
    }
}
...@@ -35,7 +35,7 @@ pub fn test_column_family() { ...@@ -35,7 +35,7 @@ pub fn test_column_family() {
panic!("could not create column family: {}", e); panic!("could not create column family: {}", e);
} }
} }
assert_eq!(db.cf_names(), vec!["cf1", "default"]); assert_eq!(db.cf_names(), vec!["default", "cf1"]);
} }
// should fail to open db without specifying same column families // should fail to open db without specifying same column families
......
...@@ -33,7 +33,7 @@ pub fn test_ttl() { ...@@ -33,7 +33,7 @@ pub fn test_ttl() {
panic!("could not create column family: {}", e); panic!("could not create column family: {}", e);
} }
} }
assert_eq!(db.cf_names(), vec!["cf1", "default"]); assert_eq!(db.cf_names(), vec!["default", "cf1"]);
match db.create_cf("cf2") { match db.create_cf("cf2") {
Ok(_) => println!("cf2 created successfully"), Ok(_) => println!("cf2 created successfully"),
...@@ -41,7 +41,7 @@ pub fn test_ttl() { ...@@ -41,7 +41,7 @@ pub fn test_ttl() {
panic!("could not create column family: {}", e); panic!("could not create column family: {}", e);
} }
} }
assert_eq!(db.cf_names(), vec!["cf1", "cf2", "default"]); assert_eq!(db.cf_names(), vec!["default", "cf1", "cf2"]);
drop(db); drop(db);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment