Commit e28c5fe9 authored by follitude's avatar follitude Committed by zhangjinpeng1987

Add GetAllKeyVersions (#92)

parent f5ee9ed2
...@@ -9,7 +9,6 @@ ...@@ -9,7 +9,6 @@
#include "crocksdb/c.h" #include "crocksdb/c.h"
#include <stdlib.h>
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_filter.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
...@@ -30,7 +29,9 @@ ...@@ -30,7 +29,9 @@
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "rocksdb/universal_compaction.h" #include "rocksdb/universal_compaction.h"
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include <stdlib.h>
using rocksdb::Cache; using rocksdb::Cache;
using rocksdb::ColumnFamilyDescriptor; using rocksdb::ColumnFamilyDescriptor;
...@@ -96,6 +97,7 @@ using rocksdb::TableProperties; ...@@ -96,6 +97,7 @@ using rocksdb::TableProperties;
using rocksdb::TablePropertiesCollection; using rocksdb::TablePropertiesCollection;
using rocksdb::TablePropertiesCollector; using rocksdb::TablePropertiesCollector;
using rocksdb::TablePropertiesCollectorFactory; using rocksdb::TablePropertiesCollectorFactory;
using rocksdb::KeyVersion;
using std::shared_ptr; using std::shared_ptr;
...@@ -147,6 +149,10 @@ struct crocksdb_externalfileingestioninfo_t { ...@@ -147,6 +149,10 @@ struct crocksdb_externalfileingestioninfo_t {
ExternalFileIngestionInfo rep; ExternalFileIngestionInfo rep;
}; };
struct crocksdb_keyversions_t {
std::vector<KeyVersion> rep;
};
struct crocksdb_compactionfiltercontext_t { struct crocksdb_compactionfiltercontext_t {
CompactionFilter::Context rep; CompactionFilter::Context rep;
}; };
...@@ -1923,7 +1929,7 @@ void crocksdb_options_set_compression(crocksdb_options_t* opt, int t) { ...@@ -1923,7 +1929,7 @@ void crocksdb_options_set_compression(crocksdb_options_t* opt, int t) {
opt->rep.compression = static_cast<CompressionType>(t); opt->rep.compression = static_cast<CompressionType>(t);
} }
int crocksdb_options_get_compression(crocksdb_options_t* opt) { int crocksdb_options_get_compression(crocksdb_options_t *opt) {
return static_cast<int>(opt->rep.compression); return static_cast<int>(opt->rep.compression);
} }
...@@ -3485,8 +3491,44 @@ crocksdb_get_properties_of_tables_in_range( ...@@ -3485,8 +3491,44 @@ crocksdb_get_properties_of_tables_in_range(
return props.release(); return props.release();
} }
void crocksdb_set_bottommost_compression(crocksdb_options_t* opt, int c) { void crocksdb_set_bottommost_compression(crocksdb_options_t *opt, int c) {
opt->rep.bottommost_compression = static_cast<CompressionType>(c); opt->rep.bottommost_compression = static_cast<CompressionType>(c);
} }
// Get All Key Versions
void crocksdb_keyversions_destroy(crocksdb_keyversions_t *kvs) { delete kvs; }
crocksdb_keyversions_t *
crocksdb_get_all_key_versions(crocksdb_t *db, const char *begin_key,
size_t begin_keylen, const char *end_key,
size_t end_keylen, char **errptr) {
crocksdb_keyversions_t *result = new crocksdb_keyversions_t;
SaveError(errptr,
GetAllKeyVersions(db->rep, Slice(begin_key, begin_keylen),
Slice(end_key, end_keylen), &result->rep));
return result;
}
size_t crocksdb_keyversions_count(const crocksdb_keyversions_t *kvs) {
return kvs->rep.size();
}
const char *crocksdb_keyversions_key(const crocksdb_keyversions_t *kvs,
int index) {
return kvs->rep[index].user_key.c_str();
}
const char *crocksdb_keyversions_value(const crocksdb_keyversions_t *kvs,
int index) {
return kvs->rep[index].value.c_str();
}
uint64_t crocksdb_keyversions_seq(const crocksdb_keyversions_t *kvs,
int index) {
return kvs->rep[index].sequence;
}
int crocksdb_keyversions_type(const crocksdb_keyversions_t *kvs, int index) {
return kvs->rep[index].type;
}
} // end extern "C" } // end extern "C"
...@@ -129,6 +129,7 @@ typedef struct crocksdb_compactionjobinfo_t crocksdb_compactionjobinfo_t; ...@@ -129,6 +129,7 @@ typedef struct crocksdb_compactionjobinfo_t crocksdb_compactionjobinfo_t;
typedef struct crocksdb_externalfileingestioninfo_t typedef struct crocksdb_externalfileingestioninfo_t
crocksdb_externalfileingestioninfo_t; crocksdb_externalfileingestioninfo_t;
typedef struct crocksdb_eventlistener_t crocksdb_eventlistener_t; typedef struct crocksdb_eventlistener_t crocksdb_eventlistener_t;
typedef struct crocksdb_keyversions_t crocksdb_keyversions_t;
typedef enum crocksdb_table_property_t { typedef enum crocksdb_table_property_t {
kDataSize = 1, kDataSize = 1,
...@@ -695,8 +696,8 @@ extern C_ROCKSDB_LIBRARY_API size_t crocksdb_options_get_compression_level_numbe ...@@ -695,8 +696,8 @@ extern C_ROCKSDB_LIBRARY_API size_t crocksdb_options_get_compression_level_numbe
crocksdb_options_t* opt); crocksdb_options_t* opt);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_get_compression_per_level( extern C_ROCKSDB_LIBRARY_API void crocksdb_options_get_compression_per_level(
crocksdb_options_t* opt, int *level_values); crocksdb_options_t* opt, int *level_values);
extern C_ROCKSDB_LIBRARY_API void crocksdb_set_bottommost_compression( extern C_ROCKSDB_LIBRARY_API void
crocksdb_options_t* opt, int c); crocksdb_set_bottommost_compression(crocksdb_options_t *opt, int c);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_create_if_missing( extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_create_if_missing(
crocksdb_options_t*, unsigned char); crocksdb_options_t*, unsigned char);
extern C_ROCKSDB_LIBRARY_API void extern C_ROCKSDB_LIBRARY_API void
...@@ -930,8 +931,8 @@ enum { ...@@ -930,8 +931,8 @@ enum {
}; };
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_compression( extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_compression(
crocksdb_options_t*, int); crocksdb_options_t*, int);
extern C_ROCKSDB_LIBRARY_API int crocksdb_options_get_compression( extern C_ROCKSDB_LIBRARY_API int
crocksdb_options_t*); crocksdb_options_get_compression(crocksdb_options_t *);
enum { enum {
crocksdb_level_compaction = 0, crocksdb_level_compaction = 0,
...@@ -1425,6 +1426,31 @@ crocksdb_get_propeties_of_tables_in_range( ...@@ -1425,6 +1426,31 @@ crocksdb_get_propeties_of_tables_in_range(
const char* const* limit_keys, const size_t* limit_keys_lens, const char* const* limit_keys, const size_t* limit_keys_lens,
char** errptr); char** errptr);
/* Get All Key Versions */
extern C_ROCKSDB_LIBRARY_API void
crocksdb_keyversions_destroy(crocksdb_keyversions_t *kvs);
extern C_ROCKSDB_LIBRARY_API crocksdb_keyversions_t *
crocksdb_get_all_key_versions(crocksdb_t *db, const char *begin_key,
size_t begin_keylen, const char *end_key,
size_t end_keylen, char **errptr);
extern C_ROCKSDB_LIBRARY_API size_t
crocksdb_keyversions_count(const crocksdb_keyversions_t *kvs);
extern C_ROCKSDB_LIBRARY_API const char *
crocksdb_keyversions_key(const crocksdb_keyversions_t *kvs, int index);
extern C_ROCKSDB_LIBRARY_API const char *
crocksdb_keyversions_key(const crocksdb_keyversions_t *kvs, int index);
extern C_ROCKSDB_LIBRARY_API uint64_t
crocksdb_keyversions_seq(const crocksdb_keyversions_t *kvs, int index);
extern C_ROCKSDB_LIBRARY_API int
crocksdb_keyversions_type(const crocksdb_keyversions_t *kvs, int index);
#ifdef __cplusplus #ifdef __cplusplus
} /* end extern "C" */ } /* end extern "C" */
#endif #endif
......
...@@ -56,6 +56,7 @@ pub enum DBFlushJobInfo {} ...@@ -56,6 +56,7 @@ pub enum DBFlushJobInfo {}
pub enum DBCompactionJobInfo {} pub enum DBCompactionJobInfo {}
pub enum DBIngestionInfo {} pub enum DBIngestionInfo {}
pub enum DBEventListener {} pub enum DBEventListener {}
pub enum DBKeyVersions {}
pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy { pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy {
unsafe { crocksdb_filterpolicy_create_bloom(bits) } unsafe { crocksdb_filterpolicy_create_bloom(bits) }
...@@ -1037,6 +1038,26 @@ extern "C" { ...@@ -1037,6 +1038,26 @@ extern "C" {
-> *mut DBEventListener; -> *mut DBEventListener;
pub fn crocksdb_eventlistener_destroy(et: *mut DBEventListener); pub fn crocksdb_eventlistener_destroy(et: *mut DBEventListener);
pub fn crocksdb_options_add_eventlistener(opt: *mut DBOptions, et: *mut DBEventListener); pub fn crocksdb_options_add_eventlistener(opt: *mut DBOptions, et: *mut DBEventListener);
// Get All Key Versions
pub fn crocksdb_keyversions_destroy(kvs: *mut DBKeyVersions);
pub fn crocksdb_get_all_key_versions(db: *mut DBInstance,
begin_key: *const u8,
begin_keylen: size_t,
end_key: *const u8,
end_keylen: size_t,
errptr: *mut *mut c_char)
-> *mut DBKeyVersions;
pub fn crocksdb_keyversions_count(kvs: *mut DBKeyVersions) -> size_t;
pub fn crocksdb_keyversions_key(kvs: *mut DBKeyVersions, index: usize) -> *const c_char;
pub fn crocksdb_keyversions_value(kvs: *mut DBKeyVersions, index: usize) -> *const c_char;
pub fn crocksdb_keyversions_seq(kvs: *mut DBKeyVersions, index: usize) -> uint64_t;
pub fn crocksdb_keyversions_type(kvs: *mut DBKeyVersions, index: usize) -> c_int;
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -306,6 +306,13 @@ impl<'a> Range<'a> { ...@@ -306,6 +306,13 @@ impl<'a> Range<'a> {
} }
} }
pub struct KeyVersion {
pub key: String,
pub value: String,
pub seq: u64,
pub value_type: c_int,
}
impl DB { impl DB {
pub fn open_default(path: &str) -> Result<DB, String> { pub fn open_default(path: &str) -> Result<DB, String> {
let mut opts = Options::new(); let mut opts = Options::new();
...@@ -1081,6 +1088,35 @@ impl DB { ...@@ -1081,6 +1088,35 @@ impl DB {
Ok(TablePropertiesCollection::from_raw(props)) Ok(TablePropertiesCollection::from_raw(props))
} }
} }
pub fn get_all_key_versions(&self,
start_key: &[u8],
end_key: &[u8])
-> Result<Vec<KeyVersion>, String> {
unsafe {
let kvs = ffi_try!(crocksdb_get_all_key_versions(self.inner,
start_key.as_ptr(),
start_key.len() as size_t,
end_key.as_ptr(),
end_key.len() as size_t));
let size = crocksdb_ffi::crocksdb_keyversions_count(kvs) as usize;
let mut key_versions = Vec::with_capacity(size);
for i in 0..size {
key_versions.push(KeyVersion {
key: CStr::from_ptr(crocksdb_ffi::crocksdb_keyversions_key(kvs, i))
.to_string_lossy()
.into_owned(),
value: CStr::from_ptr(crocksdb_ffi::crocksdb_keyversions_value(kvs, i))
.to_string_lossy()
.into_owned(),
seq: crocksdb_ffi::crocksdb_keyversions_seq(kvs, i),
value_type: crocksdb_ffi::crocksdb_keyversions_type(kvs, i),
})
}
crocksdb_ffi::crocksdb_keyversions_destroy(kvs);
Ok(key_versions)
}
}
} }
impl Writable for DB { impl Writable for DB {
...@@ -1914,4 +1950,28 @@ mod test { ...@@ -1914,4 +1950,28 @@ mod test {
} }
} }
} }
#[test]
fn test_get_all_key_versions() {
let mut opts = Options::new();
opts.create_if_missing(true);
let path = TempDir::new("_rust_rocksdb_get_all_key_version_test").expect("");
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let samples = vec![(b"key1".to_vec(), b"value1".to_vec()),
(b"key2".to_vec(), b"value2".to_vec()),
(b"key3".to_vec(), b"value3".to_vec()),
(b"key4".to_vec(), b"value4".to_vec())];
// Put 4 keys.
for &(ref k, ref v) in &samples {
db.put(k, v).unwrap();
assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
}
db.flush(true).unwrap();
let key_versions = db.get_all_key_versions(b"key2", b"key4").unwrap();
assert_eq!(key_versions[1].key, "key3");
assert_eq!(key_versions[1].value, "value3");
assert_eq!(key_versions[1].seq, 3);
}
} }
...@@ -12,10 +12,10 @@ ...@@ -12,10 +12,10 @@
// limitations under the License. // limitations under the License.
use std::sync::Arc;
use std::sync::atomic::*;
use rocksdb::*; use rocksdb::*;
use std::sync::Arc;
use std::sync::atomic::*;
use tempdir::TempDir; use tempdir::TempDir;
use test_ingest_external_file::gen_sst; use test_ingest_external_file::gen_sst;
...@@ -84,7 +84,7 @@ fn test_event_listener_basic() { ...@@ -84,7 +84,7 @@ fn test_event_listener_basic() {
let db = DB::open(opts, path_str).unwrap(); let db = DB::open(opts, path_str).unwrap();
for i in 1..8000 { for i in 1..8000 {
db.put(format!("{:04}", i).as_bytes(), db.put(format!("{:04}", i).as_bytes(),
format!("{:04}", i).as_bytes()) format!("{:04}", i).as_bytes())
.unwrap(); .unwrap();
} }
db.flush(true).unwrap(); db.flush(true).unwrap();
...@@ -92,7 +92,7 @@ fn test_event_listener_basic() { ...@@ -92,7 +92,7 @@ fn test_event_listener_basic() {
for i in 1..8000 { for i in 1..8000 {
db.put(format!("{:04}", i).as_bytes(), db.put(format!("{:04}", i).as_bytes(),
format!("{:04}", i).as_bytes()) format!("{:04}", i).as_bytes())
.unwrap(); .unwrap();
} }
db.flush(true).unwrap(); db.flush(true).unwrap();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment