Commit ed089679 authored by Jay's avatar Jay

refactor iterator (#12)

parent 1f605a95
......@@ -27,7 +27,6 @@ pub mod comparator;
pub use librocksdb_sys::{DBCompactionStyle, DBComparator, DBCompressionType,
new_bloom_filter, self as rocksdb_ffi};
pub use rocksdb::{DB, DBIterator, DBVector, Direction, IteratorMode, Kv,
Writable, WriteBatch};
pub use rocksdb::{DB, DBIterator, DBVector, Kv, SeekKey, Writable, WriteBatch};
pub use rocksdb_options::{BlockBasedOptions, Options, WriteOptions};
pub use merge_operator::MergeOperands;
......@@ -129,13 +129,13 @@ fn main() {
db.merge(b"k1", b"efg");
db.merge(b"k1", b"h");
db.get(b"k1")
.map(|value| {
match value.to_utf8() {
Some(v) => (),
None => panic!("value corrupted"),
}
})
.or_else(|e| panic!("error retrieving value: {}", e));
.map(|value| {
match value.to_utf8() {
Some(v) => (),
None => panic!("value corrupted"),
}
})
.or_else(|e| panic!("error retrieving value: {}", e));
db.delete(b"k1");
}
}
......
......@@ -53,71 +53,22 @@ pub struct Snapshot<'a> {
pub struct DBIterator<'a> {
db: &'a DB,
inner: rocksdb_ffi::DBIterator,
direction: Direction,
just_seeked: bool,
}
pub enum Direction {
Forward,
Reverse,
}
pub type Kv<'a> = (&'a [u8], &'a [u8]);
impl<'a> Iterator for DBIterator<'a> {
type Item = Kv<'a>;
fn next(&mut self) -> Option<Kv<'a>> {
let native_iter = self.inner;
if self.just_seeked {
self.just_seeked = false;
} else {
match self.direction {
Direction::Forward => unsafe {
rocksdb_ffi::rocksdb_iter_next(native_iter)
},
Direction::Reverse => unsafe {
rocksdb_ffi::rocksdb_iter_prev(native_iter)
},
}
}
if unsafe { rocksdb_ffi::rocksdb_iter_valid(native_iter) } {
let mut key_len: size_t = 0;
let key_len_ptr: *mut size_t = &mut key_len;
let mut val_len: size_t = 0;
let val_len_ptr: *mut size_t = &mut val_len;
let key_ptr = unsafe {
rocksdb_ffi::rocksdb_iter_key(native_iter, key_len_ptr)
};
let key = unsafe {
slice::from_raw_parts(key_ptr, key_len as usize)
};
let val_ptr = unsafe {
rocksdb_ffi::rocksdb_iter_value(native_iter, val_len_ptr)
};
let val = unsafe {
slice::from_raw_parts(val_ptr, val_len as usize)
};
Some((key, val))
} else {
None
}
}
}
pub enum IteratorMode<'a> {
pub enum SeekKey<'a> {
Start,
End,
From(&'a [u8], Direction),
Key(&'a [u8]),
}
impl<'a> From<&'a [u8]> for SeekKey<'a> {
fn from(bs: &'a [u8]) -> SeekKey {
SeekKey::Key(bs)
}
}
impl<'a> DBIterator<'a> {
fn new(db: &'a DB,
readopts: &ReadOptions,
mode: IteratorMode)
-> DBIterator<'a> {
fn new(db: &'a DB, readopts: &ReadOptions, key: SeekKey) -> DBIterator<'a> {
unsafe {
let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner,
readopts.inner);
......@@ -125,35 +76,74 @@ impl<'a> DBIterator<'a> {
let mut rv = DBIterator {
db: db,
inner: iterator,
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
rv.set_mode(mode);
rv.seek(key);
rv
}
}
pub fn set_mode(&mut self, mode: IteratorMode) {
pub fn seek(&mut self, key: SeekKey) -> bool {
unsafe {
match mode {
IteratorMode::Start => {
rocksdb_ffi::rocksdb_iter_seek_to_first(self.inner);
self.direction = Direction::Forward;
match key {
SeekKey::Start => {
rocksdb_ffi::rocksdb_iter_seek_to_first(self.inner)
}
IteratorMode::End => {
rocksdb_ffi::rocksdb_iter_seek_to_last(self.inner);
self.direction = Direction::Reverse;
SeekKey::End => {
rocksdb_ffi::rocksdb_iter_seek_to_last(self.inner)
}
IteratorMode::From(key, dir) => {
SeekKey::Key(key) => {
rocksdb_ffi::rocksdb_iter_seek(self.inner,
key.as_ptr(),
key.len() as size_t);
self.direction = dir;
key.len() as size_t)
}
};
self.just_seeked = true;
}
}
self.valid()
}
pub fn prev(&mut self) -> bool {
unsafe {
rocksdb_ffi::rocksdb_iter_prev(self.inner);
}
self.valid()
}
pub fn next(&mut self) -> bool {
unsafe {
rocksdb_ffi::rocksdb_iter_next(self.inner);
}
self.valid()
}
pub fn key(&self) -> &[u8] {
assert!(self.valid());
let mut key_len: size_t = 0;
let key_len_ptr: *mut size_t = &mut key_len;
unsafe {
let key_ptr = rocksdb_ffi::rocksdb_iter_key(self.inner,
key_len_ptr);
slice::from_raw_parts(key_ptr, key_len as usize)
}
}
pub fn value(&self) -> &[u8] {
assert!(self.valid());
let mut val_len: size_t = 0;
let val_len_ptr: *mut size_t = &mut val_len;
unsafe {
let val_ptr = rocksdb_ffi::rocksdb_iter_value(self.inner,
val_len_ptr);
slice::from_raw_parts(val_ptr, val_len as usize)
}
}
pub fn kv(&self) -> Option<(Vec<u8>, Vec<u8>)> {
if self.valid() {
Some((self.key().to_vec(), self.value().to_vec()))
} else {
None
}
}
......@@ -164,7 +154,7 @@ impl<'a> DBIterator<'a> {
fn new_cf(db: &'a DB,
cf_handle: DBCFHandle,
readopts: &ReadOptions,
mode: IteratorMode)
key: SeekKey)
-> Result<DBIterator<'a>, String> {
unsafe {
let iterator =
......@@ -175,17 +165,29 @@ impl<'a> DBIterator<'a> {
let mut rv = DBIterator {
db: db,
inner: iterator,
direction: Direction::Forward, // blown away by set_mode()
just_seeked: false,
};
rv.set_mode(mode);
rv.seek(key);
Ok(rv)
}
}
}
pub type Kv = (Vec<u8>, Vec<u8>);
impl<'a> Iterator for DBIterator<'a> {
type Item = Kv;
fn next(&mut self) -> Option<Kv> {
let kv = self.kv();
if kv.is_some() {
DBIterator::next(self);
}
kv
}
}
impl<'a> Drop for DBIterator<'a> {
fn drop(&mut self) {
unsafe {
......@@ -196,19 +198,18 @@ impl<'a> Drop for DBIterator<'a> {
impl<'a> Snapshot<'a> {
pub fn new(db: &DB) -> Snapshot {
let snapshot = unsafe {
rocksdb_ffi::rocksdb_create_snapshot(db.inner)
};
let snapshot =
unsafe { rocksdb_ffi::rocksdb_create_snapshot(db.inner) };
Snapshot {
db: db,
inner: snapshot,
}
}
pub fn iterator(&self, mode: IteratorMode) -> DBIterator {
pub fn iter(&self, key: SeekKey) -> DBIterator {
let mut readopts = ReadOptions::new();
readopts.set_snapshot(self);
DBIterator::new(self.db, &readopts, mode)
DBIterator::new(self.db, &readopts, key)
}
pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
......@@ -291,7 +292,7 @@ impl DB {
Err(_) => {
return Err("Failed to convert path to CString when opening \
rocksdb"
.to_owned())
.to_owned())
}
};
let cpath_ptr = cpath.as_ptr();
......@@ -326,27 +327,22 @@ impl DB {
// We need to store our CStrings in an intermediate vector
// so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter()
.map(|cf| {
CString::new(cf.as_bytes())
.unwrap()
})
.collect();
.map(|cf| CString::new(cf.as_bytes()).unwrap())
.collect();
let cfnames: Vec<*const _> = c_cfs.iter()
.map(|cf| cf.as_ptr())
.collect();
.map(|cf| cf.as_ptr())
.collect();
// These handles will be populated by DB.
let cfhandles: Vec<rocksdb_ffi::DBCFHandle> =
cfs_v.iter()
.map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void))
.collect();
let cfhandles: Vec<rocksdb_ffi::DBCFHandle> = cfs_v.iter()
.map(|_| rocksdb_ffi::DBCFHandle(0 as *mut c_void))
.collect();
// TODO(tyler) allow options to be passed in.
let cfopts: Vec<rocksdb_ffi::DBOptions> =
cfs_v.iter()
.map(|_| unsafe { rocksdb_ffi::rocksdb_options_create() })
.collect();
let cfopts: Vec<rocksdb_ffi::DBOptions> = cfs_v.iter()
.map(|_| unsafe { rocksdb_ffi::rocksdb_options_create() })
.collect();
// Prepare to ship to C.
let cfopts_ptr: *const rocksdb_ffi::DBOptions = cfopts.as_ptr();
......@@ -362,7 +358,7 @@ impl DB {
for handle in &cfhandles {
if handle.0.is_null() {
return Err("Received null column family handle from DB."
.to_owned());
.to_owned());
}
}
......@@ -455,7 +451,7 @@ impl DB {
fairly trivial call, and its failure may be \
indicative of a mis-compiled or mis-loaded rocksdb \
library."
.to_owned());
.to_owned());
}
unsafe {
......@@ -495,7 +491,7 @@ impl DB {
fairly trivial call, and its failure may be \
indicative of a mis-compiled or mis-loaded rocksdb \
library."
.to_owned());
.to_owned());
}
unsafe {
......@@ -538,7 +534,7 @@ impl DB {
Err(_) => {
return Err("Failed to convert path to CString when opening \
rocksdb"
.to_owned())
.to_owned())
}
};
let cname_ptr = cname.as_ptr();
......@@ -583,17 +579,17 @@ impl DB {
self.cfs.get(name)
}
pub fn iterator(&self, mode: IteratorMode) -> DBIterator {
pub fn iter(&self, key: SeekKey) -> DBIterator {
let opts = ReadOptions::new();
DBIterator::new(&self, &opts, mode)
DBIterator::new(&self, &opts, key)
}
pub fn iterator_cf(&self,
cf_handle: DBCFHandle,
mode: IteratorMode)
-> Result<DBIterator, String> {
pub fn iter_cf(&self,
cf_handle: DBCFHandle,
key: SeekKey)
-> Result<DBIterator, String> {
let opts = ReadOptions::new();
DBIterator::new_cf(&self, cf_handle, &opts, mode)
DBIterator::new_cf(&self, cf_handle, &opts, key)
}
pub fn snapshot(&self) -> Snapshot {
......@@ -770,17 +766,17 @@ impl DB {
ranges: &[Range])
-> Vec<u64> {
let start_keys: Vec<*const u8> = ranges.iter()
.map(|x| x.start_key.as_ptr())
.collect();
.map(|x| x.start_key.as_ptr())
.collect();
let start_key_lens: Vec<u64> = ranges.iter()
.map(|x| x.start_key.len() as u64)
.collect();
.map(|x| x.start_key.len() as u64)
.collect();
let end_keys: Vec<*const u8> = ranges.iter()
.map(|x| x.end_key.as_ptr())
.collect();
.map(|x| x.end_key.as_ptr())
.collect();
let end_key_lens: Vec<u64> = ranges.iter()
.map(|x| x.end_key.len() as u64)
.collect();
.map(|x| x.end_key.len() as u64)
.collect();
let mut sizes: Vec<u64> = vec![0; ranges.len()];
let (n,
start_key_ptr,
......@@ -1091,7 +1087,7 @@ mod test {
db.put(b"k1", b"v1111").expect("");
db.put(b"k2", b"v2222").expect("");
db.put(b"k3", b"v3333").expect("");
let iter = db.iterator(IteratorMode::Start);
let iter = db.iter(SeekKey::Start);
for (k, v) in iter {
println!("Hello {}: {}",
str::from_utf8(&*k).unwrap(),
......@@ -1105,8 +1101,8 @@ mod test {
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
for i in 1..8000 {
db.put(format!("{:04}", i).as_bytes(),
format!("{:04}", i).as_bytes())
.expect("");
format!("{:04}", i).as_bytes())
.expect("");
}
db.flush(true).expect("");
assert!(db.get(b"0001").expect("").is_some());
......
......@@ -60,9 +60,8 @@ impl Drop for WriteOptions {
impl Default for BlockBasedOptions {
fn default() -> BlockBasedOptions {
let block_opts = unsafe {
rocksdb_ffi::rocksdb_block_based_options_create()
};
let block_opts =
unsafe { rocksdb_ffi::rocksdb_block_based_options_create() };
let rocksdb_ffi::DBBlockBasedTableOptions(opt_ptr) = block_opts;
if opt_ptr.is_null() {
panic!("Could not create rocksdb block based options".to_string());
......
use rocksdb::{DB, Direction, IteratorMode, Writable, Kv};
use rocksdb::{DB, Writable, SeekKey, DBIterator, Kv};
use tempdir::TempDir;
fn collect<'a, T: Iterator<Item=Kv<'a>>>(iter: T) -> Vec<(Vec<u8>, Vec<u8>)> {
iter.map(|(k, v)| (k.to_vec(), v.to_vec())).collect()
fn prev_collect<'a>(mut iter: DBIterator<'a>) -> Vec<Kv> {
let mut buf = vec![];
while iter.valid() {
buf.push(iter.kv().unwrap());
iter.prev();
}
buf
}
#[test]
......@@ -28,96 +33,79 @@ pub fn test_iterator() {
(k2.to_vec(), v2.to_vec()),
(k3.to_vec(), v3.to_vec())];
let iterator1 = db.iterator(IteratorMode::Start);
assert_eq!(collect(iterator1), expected);
let mut iter = db.iter(SeekKey::Start);
assert_eq!(iter.collect::<Vec<_>>(), expected);
// Test that it's idempotent
let iterator1 = db.iterator(IteratorMode::Start);
assert_eq!(collect(iterator1), expected);
let iterator1 = db.iterator(IteratorMode::Start);
assert_eq!(collect(iterator1), expected);
let iterator1 = db.iterator(IteratorMode::Start);
assert_eq!(collect(iterator1), expected);
iter = db.iter(SeekKey::Start);
assert_eq!(iter.collect::<Vec<_>>(), expected);
// Test it in reverse a few times
let iterator1 = db.iterator(IteratorMode::End);
let mut tmp_vec = collect(iterator1);
iter = db.iter(SeekKey::End);
let mut tmp_vec = prev_collect(iter);
tmp_vec.reverse();
assert_eq!(tmp_vec, expected);
let iterator1 = db.iterator(IteratorMode::End);
let mut tmp_vec = collect(iterator1);
tmp_vec.reverse();
assert_eq!(tmp_vec, expected);
let iterator1 = db.iterator(IteratorMode::End);
let mut tmp_vec = collect(iterator1);
tmp_vec.reverse();
assert_eq!(tmp_vec, expected);
let iterator1 = db.iterator(IteratorMode::End);
let mut tmp_vec = collect(iterator1);
tmp_vec.reverse();
assert_eq!(tmp_vec, expected);
let iterator1 = db.iterator(IteratorMode::End);
let mut tmp_vec = collect(iterator1);
iter = db.iter(SeekKey::End);
let mut tmp_vec = prev_collect(iter);
tmp_vec.reverse();
assert_eq!(tmp_vec, expected);
// Try it forward again
let iterator1 = db.iterator(IteratorMode::Start);
assert_eq!(collect(iterator1), expected);
iter = db.iter(SeekKey::Start);
assert_eq!(iter.collect::<Vec<_>>(), expected);
let iterator1 = db.iterator(IteratorMode::Start);
assert_eq!(collect(iterator1), expected);
iter = db.iter(SeekKey::Start);
assert_eq!(iter.collect::<Vec<_>>(), expected);
let old_iterator = db.iterator(IteratorMode::Start);
let old_iterator = db.iter(SeekKey::Start);
let p = db.put(&*k4, &*v4);
assert!(p.is_ok());
let expected2 = vec![(k1.to_vec(), v1.to_vec()),
(k2.to_vec(), v2.to_vec()),
(k3.to_vec(), v3.to_vec()),
(k4.to_vec(), v4.to_vec())];
assert_eq!(collect(old_iterator), expected);
assert_eq!(old_iterator.collect::<Vec<_>>(), expected);
let iterator1 = db.iterator(IteratorMode::Start);
assert_eq!(collect(iterator1), expected2);
iter = db.iter(SeekKey::Start);
assert_eq!(iter.collect::<Vec<_>>(), expected2);
let iterator1 = db.iterator(IteratorMode::From(b"k2",
Direction::Forward));
iter = db.iter(SeekKey::Key(k2));
let expected = vec![(k2.to_vec(), v2.to_vec()),
(k3.to_vec(), v3.to_vec()),
(k4.to_vec(), v4.to_vec())];
assert_eq!(collect(iterator1), expected);
assert_eq!(iter.collect::<Vec<_>>(), expected);
let iterator1 = db.iterator(IteratorMode::From(b"k2",
Direction::Reverse));
iter = db.iter(SeekKey::Key(k2));
let expected = vec![(k2.to_vec(), v2.to_vec()), (k1.to_vec(), v1.to_vec())];
assert_eq!(collect(iterator1), expected);
let iterator1 = db.iterator(IteratorMode::From(b"k0", Direction::Forward));
assert!(iterator1.valid());
let iterator2 = db.iterator(IteratorMode::From(b"k1", Direction::Forward));
assert!(iterator2.valid());
let iterator3 = db.iterator(IteratorMode::From(b"k11", Direction::Forward));
assert!(iterator3.valid());
let iterator4 = db.iterator(IteratorMode::From(b"k5", Direction::Forward));
assert!(!iterator4.valid());
let iterator5 = db.iterator(IteratorMode::From(b"k0", Direction::Reverse));
assert!(iterator5.valid());
let iterator6 = db.iterator(IteratorMode::From(b"k1", Direction::Reverse));
assert!(iterator6.valid());
let iterator7 = db.iterator(IteratorMode::From(b"k11", Direction::Reverse));
assert!(iterator7.valid());
let iterator8 = db.iterator(IteratorMode::From(b"k5", Direction::Reverse));
assert!(!iterator8.valid());
let mut iterator1 = db.iterator(IteratorMode::From(b"k4", Direction::Forward));
iterator1.next();
assert!(iterator1.valid());
iterator1.next();
assert!(!iterator1.valid());
assert_eq!(prev_collect(iter), expected);
iter = db.iter(SeekKey::Key(b"k0"));
assert!(iter.valid());
iter.seek(SeekKey::Key(b"k1"));
assert!(iter.valid());
iter.seek(SeekKey::Key(b"k11"));
assert!(iter.valid());
iter.seek(SeekKey::Key(b"k5"));
assert!(!iter.valid());
iter.seek(SeekKey::Key(b"k0"));
assert!(iter.valid());
iter.seek(SeekKey::Key(b"k1"));
assert!(iter.valid());
iter.seek(SeekKey::Key(b"k11"));
assert!(iter.valid());
iter.seek(SeekKey::Key(b"k5"));
assert!(!iter.valid());
iter.seek(SeekKey::Key(b"k4"));
assert!(iter.valid());
iter.prev();
assert!(iter.valid());
iter.next();
assert!(iter.valid());
iter.next();
assert!(!iter.valid());
// Once iterator is invalid, it can't be reverted.
iter.prev();
assert!(!iter.valid());
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment