Commit 9aa42565 authored by Jay's avatar Jay Committed by GitHub

use deref instead of pure db reference (#144)

parent 2dd2dcbb
...@@ -97,16 +97,14 @@ pub struct WriteBatch { ...@@ -97,16 +97,14 @@ pub struct WriteBatch {
unsafe impl Send for WriteBatch {} unsafe impl Send for WriteBatch {}
pub struct Snapshot<'a> { pub struct Snapshot<D: Deref<Target = DB>> {
db: &'a DB, db: D,
snap: UnsafeSnap, snap: UnsafeSnap,
} }
// We need to find a better way to add a lifetime in here. pub struct DBIterator<D: Deref<Target = DB>> {
#[allow(dead_code)] _db: D,
pub struct DBIterator<'a> { _readopts: ReadOptions,
db: &'a DB,
readopts: ReadOptions,
inner: *mut crocksdb_ffi::DBIterator, inner: *mut crocksdb_ffi::DBIterator,
} }
...@@ -122,14 +120,14 @@ impl<'a> From<&'a [u8]> for SeekKey<'a> { ...@@ -122,14 +120,14 @@ impl<'a> From<&'a [u8]> for SeekKey<'a> {
} }
} }
impl<'a> DBIterator<'a> { impl<D: Deref<Target = DB>> DBIterator<D> {
pub fn new(db: &'a DB, readopts: ReadOptions) -> DBIterator<'a> { pub fn new(db: D, readopts: ReadOptions) -> DBIterator<D> {
unsafe { unsafe {
let iterator = crocksdb_ffi::crocksdb_create_iterator(db.inner, readopts.get_inner()); let iterator = crocksdb_ffi::crocksdb_create_iterator(db.inner, readopts.get_inner());
DBIterator { DBIterator {
db: db, _db: db,
readopts: readopts, _readopts: readopts,
inner: iterator, inner: iterator,
} }
} }
...@@ -209,7 +207,7 @@ impl<'a> DBIterator<'a> { ...@@ -209,7 +207,7 @@ impl<'a> DBIterator<'a> {
unsafe { crocksdb_ffi::crocksdb_iter_valid(self.inner) } unsafe { crocksdb_ffi::crocksdb_iter_valid(self.inner) }
} }
pub fn new_cf(db: &'a DB, cf_handle: &CFHandle, readopts: ReadOptions) -> DBIterator<'a> { pub fn new_cf(db: D, cf_handle: &CFHandle, readopts: ReadOptions) -> DBIterator<D> {
unsafe { unsafe {
let iterator = crocksdb_ffi::crocksdb_create_iterator_cf( let iterator = crocksdb_ffi::crocksdb_create_iterator_cf(
db.inner, db.inner,
...@@ -217,8 +215,8 @@ impl<'a> DBIterator<'a> { ...@@ -217,8 +215,8 @@ impl<'a> DBIterator<'a> {
cf_handle.inner, cf_handle.inner,
); );
DBIterator { DBIterator {
db: db, _db: db,
readopts: readopts, _readopts: readopts,
inner: iterator, inner: iterator,
} }
} }
...@@ -227,7 +225,7 @@ impl<'a> DBIterator<'a> { ...@@ -227,7 +225,7 @@ impl<'a> DBIterator<'a> {
pub type Kv = (Vec<u8>, Vec<u8>); pub type Kv = (Vec<u8>, Vec<u8>);
impl<'b, 'a> Iterator for &'b mut DBIterator<'a> { impl<'b, D: Deref<Target = DB>> Iterator for &'b mut DBIterator<D> {
type Item = Kv; type Item = Kv;
fn next(&mut self) -> Option<Kv> { fn next(&mut self) -> Option<Kv> {
...@@ -239,7 +237,7 @@ impl<'b, 'a> Iterator for &'b mut DBIterator<'a> { ...@@ -239,7 +237,7 @@ impl<'b, 'a> Iterator for &'b mut DBIterator<'a> {
} }
} }
impl<'a> Drop for DBIterator<'a> { impl<D: Deref<Target = DB>> Drop for DBIterator<D> {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe {
crocksdb_ffi::crocksdb_iter_destroy(self.inner); crocksdb_ffi::crocksdb_iter_destroy(self.inner);
...@@ -247,26 +245,44 @@ impl<'a> Drop for DBIterator<'a> { ...@@ -247,26 +245,44 @@ impl<'a> Drop for DBIterator<'a> {
} }
} }
impl<'a> Snapshot<'a> { unsafe impl<D: Deref<Target = DB> + Send> Send for DBIterator<D> {}
pub fn new(db: &DB) -> Snapshot {
unsafe impl<D: Deref<Target = DB> + Send + Sync> Send for Snapshot<D> {}
unsafe impl<D: Deref<Target = DB> + Send + Sync> Sync for Snapshot<D> {}
impl<D: Deref<Target = DB> + Clone> Snapshot<D> {
/// Create an iterator and clone the inner db.
///
/// Please note that, the snapshot struct could be dropped before the iterator
/// if use improperly, which seems safe though.
pub fn iter_opt_clone(&self, mut opt: ReadOptions) -> DBIterator<D> {
unsafe {
opt.set_snapshot(&self.snap);
}
DBIterator::new(self.db.clone(), opt)
}
}
impl<D: Deref<Target = DB>> Snapshot<D> {
pub fn new(db: D) -> Snapshot<D> {
unsafe { unsafe {
Snapshot { Snapshot {
db: db,
snap: db.unsafe_snap(), snap: db.unsafe_snap(),
db: db,
} }
} }
} }
pub fn iter(&self) -> DBIterator { pub fn iter(&self) -> DBIterator<&DB> {
let readopts = ReadOptions::new(); let readopts = ReadOptions::new();
self.iter_opt(readopts) self.iter_opt(readopts)
} }
pub fn iter_opt(&self, mut opt: ReadOptions) -> DBIterator { pub fn iter_opt(&self, mut opt: ReadOptions) -> DBIterator<&DB> {
unsafe { unsafe {
opt.set_snapshot(&self.snap); opt.set_snapshot(&self.snap);
} }
DBIterator::new(self.db, opt) DBIterator::new(&self.db, opt)
} }
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> { pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
...@@ -286,7 +302,7 @@ impl<'a> Snapshot<'a> { ...@@ -286,7 +302,7 @@ impl<'a> Snapshot<'a> {
} }
} }
impl<'a> Drop for Snapshot<'a> { impl<D: Deref<Target = DB>> Drop for Snapshot<D> {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { self.db.release_snap(&self.snap) } unsafe { self.db.release_snap(&self.snap) }
} }
...@@ -653,25 +669,25 @@ impl DB { ...@@ -653,25 +669,25 @@ impl DB {
self.cfs.iter().map(|(k, _)| k.as_str()).collect() self.cfs.iter().map(|(k, _)| k.as_str()).collect()
} }
pub fn iter(&self) -> DBIterator { pub fn iter(&self) -> DBIterator<&DB> {
let opts = ReadOptions::new(); let opts = ReadOptions::new();
self.iter_opt(opts) self.iter_opt(opts)
} }
pub fn iter_opt(&self, opt: ReadOptions) -> DBIterator { pub fn iter_opt(&self, opt: ReadOptions) -> DBIterator<&DB> {
DBIterator::new(&self, opt) DBIterator::new(&self, opt)
} }
pub fn iter_cf(&self, cf_handle: &CFHandle) -> DBIterator { pub fn iter_cf(&self, cf_handle: &CFHandle) -> DBIterator<&DB> {
let opts = ReadOptions::new(); let opts = ReadOptions::new();
DBIterator::new_cf(self, cf_handle, opts) DBIterator::new_cf(self, cf_handle, opts)
} }
pub fn iter_cf_opt(&self, cf_handle: &CFHandle, opts: ReadOptions) -> DBIterator { pub fn iter_cf_opt(&self, cf_handle: &CFHandle, opts: ReadOptions) -> DBIterator<&DB> {
DBIterator::new_cf(self, cf_handle, opts) DBIterator::new_cf(self, cf_handle, opts)
} }
pub fn snapshot(&self) -> Snapshot { pub fn snapshot(&self) -> Snapshot<&DB> {
Snapshot::new(self) Snapshot::new(self)
} }
......
...@@ -12,6 +12,10 @@ ...@@ -12,6 +12,10 @@
// limitations under the License. // limitations under the License.
use rocksdb::*; use rocksdb::*;
use rocksdb::rocksdb::Snapshot;
use std::ops::Deref;
use std::sync::*;
use std::thread;
use tempdir::TempDir; use tempdir::TempDir;
struct FixedPrefixTransform { struct FixedPrefixTransform {
...@@ -42,7 +46,7 @@ impl SliceTransform for FixedSuffixTransform { ...@@ -42,7 +46,7 @@ impl SliceTransform for FixedSuffixTransform {
} }
} }
fn prev_collect<'a>(iter: &mut DBIterator<'a>) -> Vec<Kv> { fn prev_collect<D: Deref<Target = DB>>(iter: &mut DBIterator<D>) -> Vec<Kv> {
let mut buf = vec![]; let mut buf = vec![];
while iter.valid() { while iter.valid() {
buf.push(iter.kv().unwrap()); buf.push(iter.kv().unwrap());
...@@ -51,7 +55,7 @@ fn prev_collect<'a>(iter: &mut DBIterator<'a>) -> Vec<Kv> { ...@@ -51,7 +55,7 @@ fn prev_collect<'a>(iter: &mut DBIterator<'a>) -> Vec<Kv> {
buf buf
} }
fn next_collect<'a>(iter: &mut DBIterator<'a>) -> Vec<Kv> { fn next_collect<D: Deref<Target = DB>>(iter: &mut DBIterator<D>) -> Vec<Kv> {
let mut buf = vec![]; let mut buf = vec![];
while iter.valid() { while iter.valid() {
buf.push(iter.kv().unwrap()); buf.push(iter.kv().unwrap());
...@@ -170,6 +174,49 @@ pub fn test_iterator() { ...@@ -170,6 +174,49 @@ pub fn test_iterator() {
assert!(!iter.valid()); assert!(!iter.valid());
} }
#[test]
fn test_send_iterator() {
let path = TempDir::new("_rust_rocksdb_iteratortest_send").expect("");
let db = Arc::new(DB::open_default(path.path().to_str().unwrap()).unwrap());
db.put(b"k1", b"v1").unwrap();
let opt = ReadOptions::new();
let iter = DBIterator::new(db.clone(), opt);
let make_checker = |mut iter: DBIterator<Arc<DB>>| {
let (tx, rx) = mpsc::channel();
let j = thread::spawn(move || {
rx.recv().unwrap();
iter.seek(SeekKey::Start);
assert_eq!(iter.key(), b"k1");
assert_eq!(iter.value(), b"v1");
});
(tx, j)
};
let (tx, handle) = make_checker(iter);
drop(db);
tx.send(()).unwrap();
handle.join().unwrap();
let db = Arc::new(DB::open_default(path.path().to_str().unwrap()).unwrap());
db.flush(true).unwrap();
let snap = Snapshot::new(db.clone());
let iter = snap.iter_opt_clone(ReadOptions::new());
db.put(b"k1", b"v2").unwrap();
db.flush(true).unwrap();
db.compact_range(None, None);
let (tx, handle) = make_checker(iter);
// iterator still holds the sst file, so it should be able to read the old value.
drop(snap);
drop(db);
tx.send(()).unwrap();
handle.join().unwrap();
}
#[test] #[test]
fn test_seek_for_prev() { fn test_seek_for_prev() {
let path = TempDir::new("_rust_rocksdb_seek_for_prev").expect(""); let path = TempDir::new("_rust_rocksdb_seek_for_prev").expect("");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment