Commit 18aa0d37 authored by Michael Neumann's avatar Michael Neumann

Allow RocksDB to be shared across threads

parent f5e1b541
...@@ -32,6 +32,9 @@ pub struct RocksDB { ...@@ -32,6 +32,9 @@ pub struct RocksDB {
inner: rocksdb_ffi::RocksDBInstance, inner: rocksdb_ffi::RocksDBInstance,
} }
unsafe impl Send for RocksDB {}
unsafe impl Sync for RocksDB {}
pub struct WriteBatch { pub struct WriteBatch {
inner: rocksdb_ffi::RocksDBWriteBatch, inner: rocksdb_ffi::RocksDBWriteBatch,
} }
...@@ -159,9 +162,9 @@ impl <'a> Drop for Snapshot<'a> { ...@@ -159,9 +162,9 @@ impl <'a> Drop for Snapshot<'a> {
// This is for the RocksDB and write batches to share the same API // This is for the RocksDB and write batches to share the same API
pub trait Writable { pub trait Writable {
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String>; fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
fn delete(&mut self, key: &[u8]) -> Result<(), String>; fn delete(&self, key: &[u8]) -> Result<(), String>;
} }
fn error_message(ptr: *const i8) -> String { fn error_message(ptr: *const i8) -> String {
...@@ -308,7 +311,7 @@ impl RocksDB { ...@@ -308,7 +311,7 @@ impl RocksDB {
} }
impl Writable for RocksDB { impl Writable for RocksDB {
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
...@@ -324,7 +327,7 @@ impl Writable for RocksDB { ...@@ -324,7 +327,7 @@ impl Writable for RocksDB {
} }
} }
fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
...@@ -340,7 +343,7 @@ impl Writable for RocksDB { ...@@ -340,7 +343,7 @@ impl Writable for RocksDB {
} }
} }
fn delete(&mut self, key: &[u8]) -> Result<(), String> { fn delete(&self, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let mut err: *const i8 = 0 as *const i8; let mut err: *const i8 = 0 as *const i8;
...@@ -381,7 +384,7 @@ impl Drop for RocksDB { ...@@ -381,7 +384,7 @@ impl Drop for RocksDB {
} }
impl Writable for WriteBatch { impl Writable for WriteBatch {
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_put(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
...@@ -390,7 +393,7 @@ impl Writable for WriteBatch { ...@@ -390,7 +393,7 @@ impl Writable for WriteBatch {
} }
} }
fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), String> { fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_merge(self.inner, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
...@@ -399,7 +402,7 @@ impl Writable for WriteBatch { ...@@ -399,7 +402,7 @@ impl Writable for WriteBatch {
} }
} }
fn delete(&mut self, key: &[u8]) -> Result<(), String> { fn delete(&self, key: &[u8]) -> Result<(), String> {
unsafe { unsafe {
rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(), rocksdb_ffi::rocksdb_writebatch_delete(self.inner, key.as_ptr(),
key.len() as size_t); key.len() as size_t);
......
extern crate rocksdb; extern crate rocksdb;
mod test_iterator; mod test_iterator;
mod test_multithreaded;
use rocksdb::{Options, RocksDB, Writable, Direction, RocksDBResult};
use std::thread::{self, Builder};
use std::sync::Arc;
const N: usize = 1000_000;
#[test]
pub fn test_multithreaded() {
let path = "_rust_rocksdb_multithreadtest";
let db = RocksDB::open_default(path).unwrap();
let db = Arc::new(db);
db.put(b"key", b"value1");
let db1 = db.clone();
let j1 = thread::spawn(move|| {
for i in 1..N {
db1.put(b"key", b"value1");
}
});
let db2 = db.clone();
let j2 = thread::spawn(move|| {
for i in 1..N {
db2.put(b"key", b"value2");
}
});
let db3 = db.clone();
let j3 = thread::spawn(move|| {
for i in 1..N {
match db3.get(b"key") {
RocksDBResult::Some(v) => {
if &v[..] != b"value1" && &v[..] != b"value2" {
assert!(false);
}
}
_ => {
assert!(false);
}
}
}
});
j1.join();
j2.join();
j3.join();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment