fangzongwu / rust-rocksdb / Commits

Commit bde37b67, authored Mar 23, 2016 by siddontang

    merge master and fix conflict

Parents: b87bb3f2, a22a722c

Showing 13 changed files with 547 additions and 387 deletions (+547 -387)
Files changed:

    .gitignore                    +5    -0
    Cargo.toml                    +2    -0
    src/comparator.rs             +1    -2
    src/ffi.rs                    +108  -59
    src/lib.rs                    +15   -6
    src/main.rs                   +3    -2
    src/merge_operator.rs         +36   -38
    src/rocksdb.rs                +218  -102
    src/rocksdb_options.rs        +20   -25
    test/test.rs                  +1    -0
    test/test_column_family.rs    +12   -8
    test/test_iterator.rs         +92   -110
    test/test_multithreaded.rs    +34   -35
.gitignore (+5 -0)

@@ -7,3 +7,7 @@ Cargo.lock
 _rust_rocksdb*
 *rlib
 tags
+.idea/
+out/
+*.iml
\ No newline at end of file
Cargo.toml (+2 -0)

@@ -25,3 +25,5 @@ path = "test/test.rs"
 [dependencies]
 libc = "0.1.8"
+tempdir = "0.3.4"
+clippy = "*"
src/comparator.rs (+1 -2)

@@ -12,8 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-extern crate libc;
-use self::libc::{c_char, c_int, c_void, size_t};
+use libc::{c_char, c_int, c_void, size_t};
 use std::ffi::CString;
 use std::mem;
 use std::slice;
src/ffi.rs (+108 -59)

@@ -12,8 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-extern crate libc;
-use self::libc::{c_char, c_int, c_void, size_t};
+use libc::{self, c_char, c_int, c_void, size_t, uint64_t};
 use std::ffi::CStr;
 use std::str::from_utf8;

@@ -56,6 +55,9 @@ pub struct DBWriteBatch(pub *const c_void);
 #[derive(Copy, Clone)]
 #[repr(C)]
 pub struct DBComparator(pub *const c_void);
+#[derive(Copy, Clone)]
+#[repr(C)]
+pub struct DBFlushOptions(pub *const c_void);

 pub fn new_bloom_filter(bits: c_int) -> DBFilterPolicy {
     unsafe { rocksdb_filterpolicy_create_bloom(bits) }

@@ -67,19 +69,19 @@ pub fn new_cache(capacity: size_t) -> DBCache {
 #[repr(C)]
 pub enum DBCompressionType {
-    DBNoCompression = 0,
-    DBSnappyCompression = 1,
-    DBZlibCompression = 2,
-    DBBz2Compression = 3,
-    DBLz4Compression = 4,
-    DBLz4hcCompression = 5,
+    DBNo = 0,
+    DBSnappy = 1,
+    DBZlib = 2,
+    DBBz2 = 3,
+    DBLz4 = 4,
+    DBLz4hc = 5,
 }

 #[repr(C)]
 pub enum DBCompactionStyle {
-    DBLevelCompaction = 0,
-    DBUniversalCompaction = 1,
-    DBFifoCompaction = 2,
+    DBLevel = 0,
+    DBUniversal = 1,
+    DBFifo = 2,
 }

 #[repr(C)]

@@ -92,7 +94,7 @@ pub fn error_message(ptr: *const i8) -> String {
     let c_str = unsafe { CStr::from_ptr(ptr as *const _) };
     let s = from_utf8(c_str.to_bytes()).unwrap().to_owned();
     unsafe {
-        libc::free(ptr as *mut libc::c_void);
+        libc::free(ptr as *mut c_void);
     }
     s
 }

@@ -410,61 +412,108 @@ extern "C" {
                                err: *mut *const i8);
     pub fn rocksdb_column_family_handle_destroy(column_family_handle: DBCFHandle);
+
+    // Flush options
+    pub fn rocksdb_flushoptions_create() -> DBFlushOptions;
+    pub fn rocksdb_flushoptions_destroy(opt: DBFlushOptions);
+    pub fn rocksdb_flushoptions_set_wait(opt: DBFlushOptions, whether_wait: bool);
+
+    pub fn rocksdb_flush(db: DBInstance,
+                         options: DBFlushOptions,
+                         err: *mut *const i8);
+
+    pub fn rocksdb_approximate_sizes(db: DBInstance,
+                                     num_ranges: c_int,
+                                     range_start_key: *const *const u8,
+                                     range_start_key_len: *const size_t,
+                                     range_limit_key: *const *const u8,
+                                     range_limit_key_len: *const size_t,
+                                     sizes: *mut uint64_t);
+    pub fn rocksdb_approximate_sizes_cf(db: DBInstance,
+                                        cf: DBCFHandle,
+                                        num_ranges: c_int,
+                                        range_start_key: *const *const u8,
+                                        range_start_key_len: *const size_t,
+                                        range_limit_key: *const *const u8,
+                                        range_limit_key_len: *const size_t,
+                                        sizes: *mut uint64_t);
 }

-#[test]
-fn internal() {
-    unsafe {
-        use std::ffi::CString;
-        let opts = rocksdb_options_create();
-        assert!(!opts.0.is_null());
-
-        rocksdb_options_increase_parallelism(opts, 0);
-        rocksdb_options_optimize_level_style_compaction(opts, 0);
-        rocksdb_options_set_create_if_missing(opts, true);
-
-        let rustpath = "_rust_rocksdb_internaltest";
-        let cpath = CString::new(rustpath).unwrap();
-        let cpath_ptr = cpath.as_ptr();
-
-        let mut err: *const i8 = 0 as *const i8;
-        let err_ptr: *mut *const i8 = &mut err;
-        let db = rocksdb_open(opts, cpath_ptr, err_ptr);
-        if !err.is_null() {
-            println!("failed to open rocksdb: {}", error_message(err));
-        }
-        assert!(err.is_null());
-
-        let writeopts = rocksdb_writeoptions_create();
-        assert!(!writeopts.0.is_null());
-
-        let key = b"name\x00";
-        let val = b"spacejam\x00";
-        rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, err_ptr);
-        rocksdb_writeoptions_destroy(writeopts);
-        assert!(err.is_null());
-
-        let readopts = rocksdb_readoptions_create();
-        assert!(!readopts.0.is_null());
-
-        let val_len: size_t = 0;
-        let val_len_ptr = &val_len as *const size_t;
-        rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, val_len_ptr, err_ptr);
-        rocksdb_readoptions_destroy(readopts);
-        assert!(err.is_null());
-
-        rocksdb_close(db);
-        rocksdb_destroy_db(opts, cpath_ptr, err_ptr);
-        assert!(err.is_null());
-    }
-}
+#[cfg(test)]
+mod test {
+    use super::*;
+    use std::ffi::CString;
+    use tempdir::TempDir;
+
+    #[test]
+    fn internal() {
+        unsafe {
+            let opts = rocksdb_options_create();
+            assert!(!opts.0.is_null());
+
+            rocksdb_options_increase_parallelism(opts, 0);
+            rocksdb_options_optimize_level_style_compaction(opts, 0);
+            rocksdb_options_set_create_if_missing(opts, true);
+
+            let rustpath = TempDir::new("_rust_rocksdb_internaltest").expect("");
+            let cpath = CString::new(rustpath.path().to_str().unwrap()).unwrap();
+            let cpath_ptr = cpath.as_ptr();
+
+            let mut err = 0 as *const i8;
+            let db = rocksdb_open(opts, cpath_ptr, &mut err);
+            assert!(err.is_null(), error_message(err));
+
+            let writeopts = rocksdb_writeoptions_create();
+            assert!(!writeopts.0.is_null());
+
+            let key = b"name\x00";
+            let val = b"spacejam\x00";
+            rocksdb_put(db, writeopts.clone(), key.as_ptr(), 4, val.as_ptr(), 8, &mut err);
+            rocksdb_writeoptions_destroy(writeopts);
+            assert!(err.is_null(), error_message(err));

+            let readopts = rocksdb_readoptions_create();
+            assert!(!readopts.0.is_null());
+
+            let mut val_len = 0;
+            rocksdb_get(db, readopts.clone(), key.as_ptr(), 4, &mut val_len, &mut err);
+            rocksdb_readoptions_destroy(readopts);
+            assert!(err.is_null(), error_message(err));
+
+            // flush first to get approximate size later.
+            let flush_opt = rocksdb_flushoptions_create();
+            rocksdb_flushoptions_set_wait(flush_opt, true);
+            rocksdb_flush(db, flush_opt, &mut err);
+            rocksdb_flushoptions_destroy(flush_opt);
+            assert!(err.is_null(), error_message(err));
+
+            let mut sizes = vec![0; 1];
+            rocksdb_approximate_sizes(db,
+                                      1,
+                                      vec![b"\x00\x00".as_ptr()].as_ptr(),
+                                      vec![1].as_ptr(),
+                                      vec![b"\xff\x00".as_ptr()].as_ptr(),
+                                      vec![1].as_ptr(),
+                                      sizes.as_mut_ptr());
+            assert_eq!(sizes.len(), 1);
+            assert!(sizes[0] > 0);
+
+            rocksdb_close(db);
+            rocksdb_destroy_db(opts, cpath_ptr, &mut err);
+            assert!(err.is_null());
+        }
+    }
+}
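Every one of these raw bindings reports failure through the same err: *mut *const i8 out-parameter, and error_message (defined above) both copies and frees the C string. A hedged helper sketch, hypothetical and not part of this commit, that captures the calling convention the internal test uses:

    // Hypothetical convenience wrapper around the error out-parameter convention.
    // error_message() copies the message and frees the underlying buffer.
    unsafe fn check_err(err: *const i8) -> Result<(), String> {
        if err.is_null() {
            Ok(())
        } else {
            Err(error_message(err))
        }
    }

    // Typical call site:
    //     let mut err = 0 as *const i8;
    //     rocksdb_flush(db, flush_opt, &mut err);
    //     try!(check_err(err));   // or assert!(err.is_null(), error_message(err)) in tests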
src/lib.rs (+15 -6)

@@ -12,14 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-pub use ffi as rocksdb_ffi;
-pub use ffi::{DBCompactionStyle, DBComparator, new_bloom_filter};
-pub use rocksdb::{DB, DBIterator, DBVector, Direction, IteratorMode, Writable, WriteBatch};
-pub use rocksdb_options::{BlockBasedOptions, Options, WriteOptions};
-pub use merge_operator::MergeOperands;
+#![feature(plugin)]
+#![plugin(clippy)]
+
 extern crate libc;
+#[cfg(test)]
+extern crate tempdir;

 pub mod rocksdb;
 pub mod ffi;
 pub mod rocksdb_options;
 pub mod merge_operator;
 pub mod comparator;
+
+pub use ffi::{DBCompactionStyle, DBComparator, new_bloom_filter, self as rocksdb_ffi};
+pub use rocksdb::{DB, DBIterator, DBVector, Direction, IteratorMode, Writable, WriteBatch};
+pub use rocksdb_options::{BlockBasedOptions, Options, WriteOptions};
+pub use merge_operator::MergeOperands;
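A hedged sketch of what a downstream crate sees after this re-export shuffle: the item names come from the pub use lines above (the ffi module remains reachable as rocksdb::rocksdb_ffi), while the path string is purely illustrative.

    extern crate rocksdb;

    use rocksdb::{DB, Options, Writable, IteratorMode};

    fn demo() -> Result<(), String> {
        let mut opts = Options::new();
        opts.create_if_missing(true);
        let db = try!(DB::open(&opts, "/tmp/rocksdb_demo"));   // illustrative path
        try!(db.put(b"k", b"v"));
        for (key, value) in db.iterator(IteratorMode::Start) {
            println!("{:?} => {:?}", key, value);
        }
        Ok(())
    }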
src/main.rs (+3 -2)

@@ -144,8 +144,9 @@ fn main() {
 #[cfg(test)]
 mod tests {
     use rocksdb::{BlockBasedOptions, DB, Options};
-    use rocksdb::DBCompactionStyle::DBUniversalCompaction;
+    use rocksdb::DBCompactionStyle::DBUniversal;

     #[allow(dead_code)]
     fn tuned_for_somebody_elses_disk(path: &str,
                                      opts: &mut Options,
                                      blockopts: &mut BlockBasedOptions)

@@ -163,7 +164,7 @@ mod tests {
         opts.set_min_write_buffer_number_to_merge(4);
         opts.set_level_zero_stop_writes_trigger(2000);
         opts.set_level_zero_slowdown_writes_trigger(0);
-        opts.set_compaction_style(DBUniversalCompaction);
+        opts.set_compaction_style(DBUniversal);
         opts.set_max_background_compactions(4);
         opts.set_max_background_flushes(4);
         opts.set_filter_deletes(false);
src/merge_operator.rs (+36 -38)

@@ -12,8 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-extern crate libc;
-use self::libc::{c_char, c_int, c_void, size_t};
+use libc::{self, c_char, c_int, c_void, size_t};
 use std::ffi::CString;
 use std::mem;
 use std::ptr;

@@ -128,9 +127,10 @@ impl MergeOperands {
 impl<'a> Iterator for &'a mut MergeOperands {
     type Item = &'a [u8];
     fn next(&mut self) -> Option<&'a [u8]> {
-        match self.cursor == self.num_operands {
-            true => None,
-            false => unsafe {
+        if self.cursor == self.num_operands {
+            None
+        } else {
+            unsafe {
                 let base = self.operands_list as usize;
                 let base_len = self.operands_list_len as usize;
                 let spacing = mem::size_of::<*const *const u8>();

@@ -142,7 +142,7 @@ impl<'a> Iterator for &'a mut MergeOperands {
                 self.cursor += 1;
                 Some(mem::transmute(slice::from_raw_parts(*(ptr as *const *const u8)
                                                               as *const u8,
                                                           len)))
-            },
+            }
         }
     }
 }

@@ -152,43 +152,42 @@ impl<'a> Iterator for &'a mut MergeOperands {
     }
 }

-#[allow(unused_variables)]
-#[allow(dead_code)]
-fn test_provided_merge(new_key: &[u8],
-                       existing_val: Option<&[u8]>,
-                       operands: &mut MergeOperands)
-                       -> Vec<u8> {
-    let nops = operands.size_hint().0;
-    let mut result: Vec<u8> = Vec::with_capacity(nops);
-    match existing_val {
-        Some(v) => {
-            for e in v {
-                result.push(*e);
-            }
-        }
-        None => (),
-    }
-    for op in operands {
-        for e in op {
-            result.push(*e);
-        }
-    }
-    result
-}
-
-#[allow(dead_code)]
-#[test]
-fn mergetest() {
-    use rocksdb_options::Options;
-    use rocksdb::{DB, DBVector, Writable};
-
-    let path = "_rust_rocksdb_mergetest";
-    let mut opts = Options::new();
-    opts.create_if_missing(true);
-    opts.add_merge_operator("test operator", test_provided_merge);
-    {
-        let db = DB::open(&opts, path).unwrap();
+#[cfg(test)]
+mod test {
+    use super::*;
+    use rocksdb_options::Options;
+    use rocksdb::{DB, DBVector, Writable};
+    use tempdir::TempDir;
+
+    #[allow(unused_variables)]
+    #[allow(dead_code)]
+    fn test_provided_merge(new_key: &[u8],
+                           existing_val: Option<&[u8]>,
+                           operands: &mut MergeOperands)
+                           -> Vec<u8> {
+        let nops = operands.size_hint().0;
+        let mut result: Vec<u8> = Vec::with_capacity(nops);
+        if let Some(v) = existing_val {
+            for e in v {
+                result.push(*e);
+            }
+        }
+        for op in operands {
+            for e in op {
+                result.push(*e);
+            }
+        }
+        result
+    }
+
+    #[allow(dead_code)]
+    #[test]
+    fn mergetest() {
+        let path = TempDir::new("_rust_rocksdb_mergetest").expect("");
+        let mut opts = Options::new();
+        opts.create_if_missing(true);
+        opts.add_merge_operator("test operator", test_provided_merge);
+        let db = DB::open(&opts, path.path().to_str().unwrap()).unwrap();
         let p = db.put(b"k1", b"a");
         assert!(p.is_ok());
         let _ = db.merge(b"k1", b"b");

@@ -204,7 +203,7 @@ fn mergetest() {
                 None => println!("did not read valid utf-8 out of the db"),
             }
         }
-        Err(e) => println!("error reading value"),
+        Err(e) => println!("error reading value {:?}", e),
         _ => panic!("value not present"),
     }

@@ -214,5 +213,4 @@ fn mergetest() {
     assert!(db.delete(b"k1").is_ok());
     assert!(db.get(b"k1").unwrap().is_none());
     }
-    assert!(DB::destroy(&opts, path).is_ok());
 }
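The mergetest above doubles as a usage recipe for the merge-operator API. A hedged sketch of the same flow outside the test harness; the names concat_merge and open_with_merge are illustrative, not part of the crate:

    use rocksdb::{DB, MergeOperands, Options, Writable};

    // Concatenate the existing value and all pending operands, like test_provided_merge.
    fn concat_merge(_key: &[u8],
                    existing_val: Option<&[u8]>,
                    operands: &mut MergeOperands)
                    -> Vec<u8> {
        let mut result = Vec::new();
        if let Some(v) = existing_val {
            result.extend_from_slice(v);
        }
        for op in operands {
            result.extend_from_slice(op);
        }
        result
    }

    fn open_with_merge(path: &str) -> Result<DB, String> {
        let mut opts = Options::new();
        opts.create_if_missing(true);
        opts.add_merge_operator("concat operator", concat_merge);
        DB::open(&opts, path)
    }

    // let db = open_with_merge(dir.path().to_str().unwrap()).unwrap();
    // db.put(b"k1", b"a").unwrap();
    // db.merge(b"k1", b"b").unwrap();   // later reads of k1 see the merged value "ab"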
src/rocksdb.rs (+218 -102)

@@ -12,9 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-extern crate libc;
-
 use std::collections::BTreeMap;
 use std::ffi::CString;
 use std::fs;

@@ -23,11 +20,13 @@ use std::path::Path;
 use std::slice;
 use std::str::from_utf8;

-use self::libc::{c_void, size_t};
+use libc::{self, c_int, c_void, size_t};
 use rocksdb_ffi::{self, DBCFHandle, error_message};
 use rocksdb_options::{Options, WriteOptions};

+const DEFAULT_COLUMN_FAMILY: &'static str = "default";
+
 pub struct DB {
     inner: rocksdb_ffi::DBInstance,
     cfs: BTreeMap<String, DBCFHandle>,

@@ -111,10 +110,7 @@ pub enum IteratorMode<'a> {
 impl DBIterator {
-    fn new<'b>(db: &DB,
-               readopts: &'b ReadOptions,
-               mode: IteratorMode)
-               -> DBIterator {
+    fn new(db: &DB, readopts: &ReadOptions, mode: IteratorMode) -> DBIterator {
         unsafe {
             let iterator = rocksdb_ffi::rocksdb_create_iterator(db.inner,
                                                                 readopts.inner);

@@ -244,6 +240,24 @@ pub trait Writable {
     fn delete_cf(&self, cf: DBCFHandle, key: &[u8]) -> Result<(), String>;
 }

+/// A range of keys, start_key is included, but not end_key.
+///
+/// You should make sure end_key is not less than start_key.
+pub struct Range<'a> {
+    start_key: &'a [u8],
+    end_key: &'a [u8],
+}
+
+impl<'a> Range<'a> {
+    pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
+        assert!(start_key <= end_key);
+        Range {
+            start_key: start_key,
+            end_key: end_key,
+        }
+    }
+}
+
 impl DB {
     pub fn open_default(path: &str) -> Result<DB, String> {
         let mut opts = Options::new();

@@ -264,19 +278,18 @@ impl DB {
             Err(_) => {
                 return Err("Failed to convert path to CString when opening \
-                            rocksdb".to_string())
+                            rocksdb".to_owned())
             }
         };
         let cpath_ptr = cpath.as_ptr();

         let ospath = Path::new(path);
-        match fs::create_dir_all(&ospath) {
-            Err(e) => {
-                return Err(format!("Failed to create rocksdb directory: \
-                                    {:?}",
-                                   e))
-            }
-            Ok(_) => (),
+        if let Err(e) = fs::create_dir_all(&ospath) {
+            return Err(format!("Failed to create rocksdb directory: \
+                                src/rocksdb.rs: \
+                                {:?}",
+                               e));
         }

         let mut err: *const i8 = 0 as *const i8;

@@ -293,8 +306,8 @@ impl DB {
         } else {
             let mut cfs_v = cfs.to_vec();
             // Always open the default column family
-            if !cfs_v.contains(&"default") {
-                cfs_v.push("default");
+            if !cfs_v.contains(&DEFAULT_COLUMN_FAMILY) {
+                cfs_v.push(DEFAULT_COLUMN_FAMILY);
             }

             // We need to store our CStrings in an intermediate vector

@@ -328,20 +341,20 @@ impl DB {
             let nfam = cfs_v.len();
             unsafe {
                 db = rocksdb_ffi::rocksdb_open_column_families(opts.inner,
                                                                cpath_ptr as *const _,
-                                                               nfam as libc::c_int,
+                                                               nfam as c_int,
                                                                cfnames.as_ptr() as *const _,
                                                                copts,
                                                                handles,
                                                                err_ptr);
             }

-            for handle in cfhandles.iter() {
+            for handle in &cfhandles {
                 if handle.0.is_null() {
-                    return Err("Received null column family handle from DB.".to_string());
+                    return Err("Received null column family handle from DB.".to_owned());
                 }
             }

             for (n, h) in cfs_v.iter().zip(cfhandles) {
-                cf_map.insert(n.to_string(), h);
+                cf_map.insert((*n).to_owned(), h);
             }
         }

@@ -349,7 +362,7 @@ impl DB {
             return Err(error_message(err));
         }
         if db.0.is_null() {
-            return Err("Could not initialize database.".to_string());
+            return Err("Could not initialize database.".to_owned());
         }

         Ok(DB {

@@ -407,7 +420,7 @@ impl DB {
         if !err.is_null() {
             return Err(error_message(err));
         }
-        return Ok(());
+        Ok(())
     }

     pub fn write(&self, batch: WriteBatch) -> Result<(), String> {

@@ -423,7 +436,7 @@ impl DB {
                         fairly trivial call, and its failure may be \
                         indicative of a mis-compiled or mis-loaded rocksdb \
-                        library.".to_string());
+                        library.".to_owned());
         }

         unsafe {

@@ -441,9 +454,10 @@ impl DB {
             if !err.is_null() {
                 return Err(error_message(err));
             }
-            match val.is_null() {
-                true => Ok(None),
-                false => Ok(Some(DBVector::from_c(val, val_len))),
+            if val.is_null() {
+                Ok(None)
+            } else {
+                Ok(Some(DBVector::from_c(val, val_len)))
             }
         }
     }

@@ -462,7 +476,7 @@ impl DB {
                         fairly trivial call, and its failure may be \
                         indicative of a mis-compiled or mis-loaded rocksdb \
-                        library.".to_string());
+                        library.".to_owned());
         }

         unsafe {

@@ -481,9 +495,10 @@ impl DB {
             if !err.is_null() {
                 return Err(error_message(err));
             }
-            match val.is_null() {
-                true => Ok(None),
-                false => Ok(Some(DBVector::from_c(val, val_len))),
+            if val.is_null() {
+                Ok(None)
+            } else {
+                Ok(Some(DBVector::from_c(val, val_len)))
             }
         }
     }

@@ -504,7 +519,7 @@ impl DB {
             Err(_) => {
                 return Err("Failed to convert path to CString when opening \
-                            rocksdb".to_string())
+                            rocksdb".to_owned())
             }
         };
         let cname_ptr = cname.as_ptr();

@@ -516,7 +531,7 @@ impl DB {
                                                           opts.inner,
                                                           cname_ptr as *const _,
                                                           err_ptr);
-            self.cfs.insert(name.to_string(), cf_handler);
+            self.cfs.insert(name.to_owned(), cf_handler);
             cf_handler
         };
         if !err.is_null() {

@@ -528,7 +543,7 @@ impl DB {
     pub fn drop_cf(&mut self, name: &str) -> Result<(), String> {
         let cf = self.cfs.get(name);
         if cf.is_none() {
-            return Err(format!("Invalid column family: {}", name).to_string());
+            return Err(format!("Invalid column family: {}", name).clone());
         }

         let mut err: *const i8 = 0 as *const i8;

@@ -693,6 +708,91 @@ impl DB {
         Ok(())
     }
+
+    /// Flush all memtable data.
+    ///
+    /// Due to lack of abi, only default cf is supported.
+    ///
+    /// If sync, the flush will wait until the flush is done.
+    pub fn flush(&self, sync: bool) -> Result<(), String> {
+        unsafe {
+            let opts = rocksdb_ffi::rocksdb_flushoptions_create();
+            rocksdb_ffi::rocksdb_flushoptions_set_wait(opts, sync);
+
+            let mut err = 0 as *const i8;
+            rocksdb_ffi::rocksdb_flush(self.inner, opts, &mut err);
+
+            rocksdb_ffi::rocksdb_flushoptions_destroy(opts);
+
+            if !err.is_null() {
+                return Err(error_message(err));
+            }
+
+            Ok(())
+        }
+    }
+
+    /// Return the approximate file system space used by keys in each ranges.
+    ///
+    /// Note that the returned sizes measure file system space usage, so
+    /// if the user data compresses by a factor of ten, the returned
+    /// sizes will be one-tenth the size of the corresponding user data size.
+    ///
+    /// Due to lack of abi, only data flushed to disk is taken into account.
+    pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
+        self.get_approximate_sizes_cfopt(None, ranges)
+    }
+
+    pub fn get_approximate_sizes_cf(&self, cf: DBCFHandle, ranges: &[Range]) -> Vec<u64> {
+        self.get_approximate_sizes_cfopt(Some(cf), ranges)
+    }
+
+    fn get_approximate_sizes_cfopt(&self,
+                                   cf: Option<DBCFHandle>,
+                                   ranges: &[Range])
+                                   -> Vec<u64> {
+        let start_keys: Vec<*const u8> = ranges.iter()
+                                               .map(|x| x.start_key.as_ptr())
+                                               .collect();
+        let start_key_lens: Vec<u64> = ranges.iter()
+                                             .map(|x| x.start_key.len() as u64)
+                                             .collect();
+        let end_keys: Vec<*const u8> = ranges.iter()
+                                             .map(|x| x.end_key.as_ptr())
+                                             .collect();
+        let end_key_lens: Vec<u64> = ranges.iter()
+                                           .map(|x| x.end_key.len() as u64)
+                                           .collect();
+        let mut sizes: Vec<u64> = vec![0; ranges.len()];
+        let (n, sk_ptr, skl_ptr, ek_ptr, ekl_ptr, s_ptr) = (ranges.len() as i32,
+                                                            start_keys.as_ptr(),
+                                                            start_key_lens.as_ptr(),
+                                                            end_keys.as_ptr(),
+                                                            end_key_lens.as_ptr(),
+                                                            sizes.as_mut_ptr());
+        match cf {
+            None => unsafe {
+                rocksdb_ffi::rocksdb_approximate_sizes(self.inner,
+                                                       n,
+                                                       sk_ptr,
+                                                       skl_ptr,
+                                                       ek_ptr,
+                                                       ekl_ptr,
+                                                       s_ptr)
+            },
+            Some(cf) => unsafe {
+                rocksdb_ffi::rocksdb_approximate_sizes_cf(self.inner,
+                                                          cf,
+                                                          n,
+                                                          sk_ptr,
+                                                          skl_ptr,
+                                                          ek_ptr,
+                                                          ekl_ptr,
+                                                          s_ptr)
+            },
+        }
+        sizes
+    }
 }

 impl Writable for DB {

@@ -746,7 +846,7 @@ impl Drop for WriteBatch {
 impl Drop for DB {
     fn drop(&mut self) {
         unsafe {
-            for (_, cf) in self.cfs.iter() {
+            for cf in self.cfs.values() {
                 rocksdb_ffi::rocksdb_column_family_handle_destroy(*cf);
             }
             rocksdb_ffi::rocksdb_close(self.inner);

@@ -874,7 +974,7 @@ impl Deref for DBVector {
 impl Drop for DBVector {
     fn drop(&mut self) {
         unsafe {
-            libc::free(self.base as *mut libc::c_void);
+            libc::free(self.base as *mut c_void);
         }
     }
 }

@@ -887,16 +987,22 @@ impl DBVector {
         }
     }

-    pub fn to_utf8<'a>(&'a self) -> Option<&'a str> {
+    pub fn to_utf8(&self) -> Option<&str> {
         from_utf8(self.deref()).ok()
     }
 }

-#[test]
-fn external() {
-    let path = "_rust_rocksdb_externaltest";
-    {
-        let db = DB::open_default(path).unwrap();
+#[cfg(test)]
+mod test {
+    use super::*;
+    use rocksdb_options::*;
+    use std::str;
+    use tempdir::TempDir;
+
+    #[test]
+    fn external() {
+        let path = TempDir::new("_rust_rocksdb_externaltest").expect("");
+        let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
         let p = db.put(b"k1", b"v1111");
         assert!(p.is_ok());
         let r: Result<Option<DBVector>, String> = db.get(b"k1");

@@ -904,76 +1010,83 @@ fn external() {
         assert!(db.delete(b"k1").is_ok());
         assert!(db.get(b"k1").unwrap().is_none());
-    }
-    let opts = Options::new();
-    let result = DB::destroy(&opts, path);
-    assert!(result.is_ok());
-}
-
-#[test]
-fn errors_do_stuff() {
-    let path = "_rust_rocksdb_error";
-    let db = DB::open_default(path).unwrap();
-    let opts = Options::new();
-    // The DB will still be open when we try to destroy and the lock should fail
-    match DB::destroy(&opts, path) {
-        Err(ref s) => {
-            assert!(s == "IO error: lock _rust_rocksdb_error/LOCK: No locks \
-                          available")
-        }
-        Ok(_) => panic!("should fail"),
-    }
-}
-
-#[test]
-fn writebatch_works() {
-    let path = "_rust_rocksdb_writebacktest";
-    {
-        let db = DB::open_default(path).unwrap();
-        {
-            // test put
-            let batch = WriteBatch::new();
-            assert!(db.get(b"k1").unwrap().is_none());
-            let _ = batch.put(b"k1", b"v1111");
-            assert!(db.get(b"k1").unwrap().is_none());
-            let p = db.write(batch);
-            assert!(p.is_ok());
-            let r: Result<Option<DBVector>, String> = db.get(b"k1");
-            assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
-        }
-        {
-            // test delete
-            let batch = WriteBatch::new();
-            let _ = batch.delete(b"k1");
-            let p = db.write(batch);
-            assert!(p.is_ok());
-            assert!(db.get(b"k1").unwrap().is_none());
-        }
-    }
-    let opts = Options::new();
-    assert!(DB::destroy(&opts, path).is_ok());
-}
-
-#[test]
-fn iterator_test() {
-    let path = "_rust_rocksdb_iteratortest";
-    {
-        let db = DB::open_default(path).unwrap();
-        let p = db.put(b"k1", b"v1111");
-        assert!(p.is_ok());
-        let p = db.put(b"k2", b"v2222");
-        let p = db.put(b"k3", b"v3333");
-        let iter = db.iterator(IteratorMode::Start);
-        for (k, v) in iter {
-            println!("Hello {}: {}",
-                     from_utf8(&*k).unwrap(),
-                     from_utf8(&*v).unwrap());
-        }
-    }
-    let opts = Options::new();
-    assert!(DB::destroy(&opts, path).is_ok());
+    }
+
+    #[allow(unused_variables)]
+    #[test]
+    fn errors_do_stuff() {
+        let path = TempDir::new("_rust_rocksdb_error").expect("");
+        let path_str = path.path().to_str().unwrap();
+        let db = DB::open_default(path_str).unwrap();
+        let opts = Options::new();
+        // The DB will still be open when we try to destroy and the lock should fail
+        match DB::destroy(&opts, path_str) {
+            Err(ref s) => assert!(s.contains("LOCK: No locks available")),
+            Ok(_) => panic!("should fail"),
+        }
+    }
+
+    #[test]
+    fn writebatch_works() {
+        let path = TempDir::new("_rust_rocksdb_writebacktest").expect("");
+        let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
+
+        // test put
+        let batch = WriteBatch::new();
+        assert!(db.get(b"k1").unwrap().is_none());
+        let _ = batch.put(b"k1", b"v1111");
+        assert!(db.get(b"k1").unwrap().is_none());
+        let p = db.write(batch);
+        assert!(p.is_ok());
+        let r: Result<Option<DBVector>, String> = db.get(b"k1");
+        assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
+
+        // test delete
+        let batch = WriteBatch::new();
+        let _ = batch.delete(b"k1");
+        let p = db.write(batch);
+        assert!(p.is_ok());
+        assert!(db.get(b"k1").unwrap().is_none());
+    }
+
+    #[test]
+    fn iterator_test() {
+        let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
+        let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
+        db.put(b"k1", b"v1111").expect("");
+        db.put(b"k2", b"v2222").expect("");
+        db.put(b"k3", b"v3333").expect("");
+        let iter = db.iterator(IteratorMode::Start);
+        for (k, v) in iter {
+            println!("Hello {}: {}",
+                     str::from_utf8(&*k).unwrap(),
+                     str::from_utf8(&*v).unwrap());
+        }
+    }
+
+    #[test]
+    fn approximate_size_test() {
+        let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
+        let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
+        for i in 1..8000 {
+            db.put(format!("{:04}", i).as_bytes(),
+                   format!("{:04}", i).as_bytes())
+              .expect("");
+        }
+        db.flush(true).expect("");
+        assert!(db.get(b"0001").expect("").is_some());
+        db.flush(true).expect("");
+        let sizes = db.get_approximate_sizes(&[Range::new(b"0000", b"2000"),
+                                               Range::new(b"2000", b"4000"),
+                                               Range::new(b"4000", b"6000"),
+                                               Range::new(b"6000", b"8000"),
+                                               Range::new(b"8000", b"9999")]);
+        assert_eq!(sizes.len(), 5);
+        for s in &sizes[0..4] {
+            assert!(*s > 0);
+        }
+        assert_eq!(sizes[4], 0);
+    }

 #[test]

@@ -985,7 +1098,10 @@ fn snapshot_test() {
     assert!(p.is_ok());

     let snap = db.snapshot();
-    let r: Result<Option<DBVector>, String> = snap.get(b"k1");
+    let mut r: Result<Option<DBVector>, String> = snap.get(b"k1");
     assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
+
+    r = db.get(b"k1");
+    assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");

     let p = db.put(b"k2", b"v2222");
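The two public additions in this file, flush and get_approximate_sizes, pair naturally: per the doc comment, only data already flushed to disk is counted. A hedged sketch of calling them together, written as crate-internal code (like the tests above) so DB and Range are in scope; the helper name and key ranges are illustrative only:

    // Flush memtables, then ask for the approximate on-disk size of two key ranges.
    fn report_sizes(db: &DB) -> Result<Vec<u64>, String> {
        // get_approximate_sizes only accounts for flushed data, so flush synchronously first.
        try!(db.flush(true));
        Ok(db.get_approximate_sizes(&[Range::new(b"a", b"k"), Range::new(b"k", b"z")]))
    }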
src/rocksdb_options.rs (+20 -25)

@@ -12,8 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 //
-extern crate libc;
-use self::libc::{c_int, size_t};
+use libc::{c_int, size_t};
 use std::ffi::CString;
 use std::mem;

@@ -136,12 +135,12 @@ impl Options {
         }
     }

-    pub fn add_merge_operator<'a>(&mut self,
-                                  name: &str,
-                                  merge_fn: fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) {
+    pub fn add_merge_operator(&mut self,
+                              name: &str,
+                              merge_fn: fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) {
         let cb = Box::new(MergeOperatorCallback {
             name: CString::new(name.as_bytes()).unwrap(),
             merge_fn: merge_fn,

@@ -159,9 +158,9 @@ impl Options {
         }
     }

-    pub fn add_comparator<'a>(&mut self,
-                              name: &str,
-                              compare_fn: fn(&[u8], &[u8]) -> i32) {
+    pub fn add_comparator(&mut self,
+                          name: &str,
+                          compare_fn: fn(&[u8], &[u8]) -> i32) {
         let cb = Box::new(ComparatorCallback {
             name: CString::new(name.as_bytes()).unwrap(),
             f: compare_fn,

@@ -193,13 +192,10 @@ impl Options {
     pub fn set_use_fsync(&mut self, useit: bool) {
         unsafe {
-            match useit {
-                true => {
-                    rocksdb_ffi::rocksdb_options_set_use_fsync(self.inner, 1)
-                }
-                false => {
-                    rocksdb_ffi::rocksdb_options_set_use_fsync(self.inner, 0)
-                }
+            if useit {
+                rocksdb_ffi::rocksdb_options_set_use_fsync(self.inner, 1)
+            } else {
+                rocksdb_ffi::rocksdb_options_set_use_fsync(self.inner, 0)
             }
         }
     }

@@ -212,13 +208,12 @@ impl Options {
     pub fn set_disable_data_sync(&mut self, disable: bool) {
         unsafe {
-            match disable {
-                true =>
-                    rocksdb_ffi::rocksdb_options_set_disable_data_sync(self.inner, 1),
-                false =>
-                    rocksdb_ffi::rocksdb_options_set_disable_data_sync(self.inner, 0),
+            if disable {
+                rocksdb_ffi::rocksdb_options_set_disable_data_sync(self.inner, 1);
+            } else {
+                rocksdb_ffi::rocksdb_options_set_disable_data_sync(self.inner, 0);
             }
         }
     }
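A small hedged sketch of configuring Options with the setters touched in this file; the values and the reverse comparator are illustrative, not recommendations, and the comparator must be a plain fn pointer to match add_comparator's signature:

    use rocksdb::Options;
    use std::cmp::Ordering;

    // Orders keys in reverse lexicographic order.
    fn reverse_compare(a: &[u8], b: &[u8]) -> i32 {
        match b.cmp(a) {
            Ordering::Less => -1,
            Ordering::Equal => 0,
            Ordering::Greater => 1,
        }
    }

    fn example_options() -> Options {
        let mut opts = Options::new();
        opts.create_if_missing(true);
        opts.set_use_fsync(false);           // the boolean setters now branch with if/else
        opts.set_disable_data_sync(false);
        opts.add_comparator("reverse comparator", reverse_compare);
        opts
    }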
test/test.rs (+1 -0)

 extern crate rocksdb;
+extern crate tempdir;

 mod test_iterator;
 mod test_multithreaded;
test/test_column_family.rs (+12 -8)

@@ -13,17 +13,19 @@
 // limitations under the License.
 //
 use rocksdb::{DB, MergeOperands, Options, Writable};
+use tempdir::TempDir;

 #[test]
 pub fn test_column_family() {
-    let path = "_rust_rocksdb_cftest";
+    let path = TempDir::new("_rust_rocksdb_cftest").expect("");
+    let path_str = path.path().to_str().unwrap();

     // should be able to create column families
     {
         let mut opts = Options::new();
         opts.create_if_missing(true);
         opts.add_merge_operator("test operator", test_provided_merge);
-        let mut db = DB::open(&opts, path).unwrap();
+        let mut db = DB::open(&opts, path_str).unwrap();
         let opts = Options::new();
         match db.create_cf("cf1", &opts) {
             Ok(_) => println!("cf1 created successfully"),

@@ -37,7 +39,7 @@ pub fn test_column_family() {
     {
         let mut opts = Options::new();
         opts.add_merge_operator("test operator", test_provided_merge);
-        match DB::open(&opts, path) {
+        match DB::open(&opts, path_str) {
             Ok(_) => {
                 panic!("should not have opened DB successfully without \
                         specifying column

@@ -54,7 +56,7 @@ pub fn test_column_family() {
     {
         let mut opts = Options::new();
         opts.add_merge_operator("test operator", test_provided_merge);
-        match DB::open_cf(&opts, path, &["cf1"]) {
+        match DB::open_cf(&opts, path_str, &["cf1"]) {
             Ok(_) => println!("successfully opened db with column family"),
             Err(e) => panic!("failed to open db with column family: {}", e),
         }

@@ -63,7 +65,7 @@ pub fn test_column_family() {
     {
         let mut opts = Options::new();
         opts.add_merge_operator("test operator", test_provided_merge);
-        let db = match DB::open_cf(&opts, path, &["cf1"]) {
+        let db = match DB::open_cf(&opts, path_str, &["cf1"]) {
             Ok(db) => {
                 println!("successfully opened db with column family");
                 db

@@ -76,6 +78,9 @@ pub fn test_column_family() {
                       "v1");
         let p = db.put_cf(cf1, b"k1", b"a");
         assert!(p.is_ok());
+        /*
+        // TODO support family merge operator
+        // have not finished yet, following codes won't work.
         db.merge_cf(cf1, b"k1", b"b").unwrap();
         db.merge_cf(cf1, b"k1", b"c").unwrap();
         db.merge_cf(cf1, b"k1", b"d").unwrap();

@@ -98,6 +103,7 @@ pub fn test_column_family() {
         // TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
         assert!(db.delete(b"k1").is_ok());
         assert!(db.get(b"k1").unwrap().is_none());
+        */
     }
     // TODO should be able to use writebatch ops with a cf
     {

@@ -107,14 +113,12 @@ pub fn test_column_family() {
     }
     // should b able to drop a cf
     {
-        let mut db = DB::open_cf(&Options::new(), path, &["cf1"]).unwrap();
+        let mut db = DB::open_cf(&Options::new(), path_str, &["cf1"]).unwrap();
         match db.drop_cf("cf1") {
             Ok(_) => println!("cf1 successfully dropped."),
             Err(e) => panic!("failed to drop column family: {}", e),
         }
     }
-
-    assert!(DB::destroy(&Options::new(), path).is_ok());
 }

 fn test_provided_merge(_: &[u8],
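test_column_family above exercises the column-family API end to end. A condensed, hedged sketch of the same calls; treating create_cf's Ok value as the new handle is an assumption based on the Ok(_) match in the test, and cf_roundtrip is an illustrative name:

    use rocksdb::{DB, Options, Writable};

    fn cf_roundtrip(path_str: &str) -> Result<(), String> {
        {
            let mut opts = Options::new();
            opts.create_if_missing(true);
            let mut db = try!(DB::open(&opts, path_str));
            let cf1 = try!(db.create_cf("cf1", &Options::new()));
            try!(db.put_cf(cf1, b"k1", b"v1"));
        }   // drop the DB so the file lock is released before reopening

        // Reopening must list every existing column family.
        let mut db = try!(DB::open_cf(&Options::new(), path_str, &["cf1"]));
        db.drop_cf("cf1")
    }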
test/test_iterator.rs (+92 -110)

-use rocksdb::{DB, Direction, IteratorMode, Options, Writable};
+use rocksdb::{DB, Direction, IteratorMode, Writable};
+use tempdir::TempDir;

 fn cba(input: &Box<[u8]>) -> Box<[u8]> {
     input.iter().cloned().collect::<Vec<_>>().into_boxed_slice()

@@ -6,113 +7,94 @@ fn cba(input: &Box<[u8]>) -> Box<[u8]> {
 #[test]
 pub fn test_iterator() {
-    let path = "_rust_rocksdb_iteratortest";
-    {
-        let k1: Box<[u8]> = b"k1".to_vec().into_boxed_slice();
-        let k2: Box<[u8]> = b"k2".to_vec().into_boxed_slice();
-        let k3: Box<[u8]> = b"k3".to_vec().into_boxed_slice();
-        let k4: Box<[u8]> = b"k4".to_vec().into_boxed_slice();
-        let v1: Box<[u8]> = b"v1111".to_vec().into_boxed_slice();
-        let v2: Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
-        let v3: Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
-        let v4: Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
-        let db = DB::open_default(path).unwrap();
-        let p = db.put(&*k1, &*v1);
-        assert!(p.is_ok());
-        let p = db.put(&*k2, &*v2);
-        assert!(p.is_ok());
-        let p = db.put(&*k3, &*v3);
-        assert!(p.is_ok());
-        let expected = vec![(cba(&k1), cba(&v1)),
-                            (cba(&k2), cba(&v2)),
-                            (cba(&k3), cba(&v3))];
-        {
-            let iterator1 = db.iterator(IteratorMode::Start);
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected);
-        }
-        // Test that it's idempotent
-        {
-            let iterator1 = db.iterator(IteratorMode::Start);
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::Start);
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::Start);
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected);
-        }
-        // Test it in reverse a few times
-        {
-            let iterator1 = db.iterator(IteratorMode::End);
-            let mut tmp_vec = iterator1.collect::<Vec<_>>();
-            tmp_vec.reverse();
-            assert_eq!(tmp_vec, expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::End);
-            let mut tmp_vec = iterator1.collect::<Vec<_>>();
-            tmp_vec.reverse();
-            assert_eq!(tmp_vec, expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::End);
-            let mut tmp_vec = iterator1.collect::<Vec<_>>();
-            tmp_vec.reverse();
-            assert_eq!(tmp_vec, expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::End);
-            let mut tmp_vec = iterator1.collect::<Vec<_>>();
-            tmp_vec.reverse();
-            assert_eq!(tmp_vec, expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::End);
-            let mut tmp_vec = iterator1.collect::<Vec<_>>();
-            tmp_vec.reverse();
-            assert_eq!(tmp_vec, expected);
-        }
-        // Try it forward again
-        {
-            let iterator1 = db.iterator(IteratorMode::Start);
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::Start);
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected);
-        }
-        let old_iterator = db.iterator(IteratorMode::Start);
-        let p = db.put(&*k4, &*v4);
-        assert!(p.is_ok());
-        let expected2 = vec![(cba(&k1), cba(&v1)),
-                             (cba(&k2), cba(&v2)),
-                             (cba(&k3), cba(&v3)),
-                             (cba(&k4), cba(&v4))];
-        {
-            assert_eq!(old_iterator.collect::<Vec<_>>(), expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::Start);
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected2);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::From(b"k2", Direction::Forward));
-            let expected = vec![(cba(&k2), cba(&v2)),
-                                (cba(&k3), cba(&v3)),
-                                (cba(&k4), cba(&v4))];
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected);
-        }
-        {
-            let iterator1 = db.iterator(IteratorMode::From(b"k2", Direction::Reverse));
-            let expected = vec![(cba(&k2), cba(&v2)),
-                                (cba(&k1), cba(&v1))];
-            assert_eq!(iterator1.collect::<Vec<_>>(), expected);
-        }
-    }
-    let opts = Options::new();
-    assert!(DB::destroy(&opts, path).is_ok());
+    let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
+
+    let k1: Box<[u8]> = b"k1".to_vec().into_boxed_slice();
+    let k2: Box<[u8]> = b"k2".to_vec().into_boxed_slice();
+    let k3: Box<[u8]> = b"k3".to_vec().into_boxed_slice();
+    let k4: Box<[u8]> = b"k4".to_vec().into_boxed_slice();
+    let v1: Box<[u8]> = b"v1111".to_vec().into_boxed_slice();
+    let v2: Box<[u8]> = b"v2222".to_vec().into_boxed_slice();
+    let v3: Box<[u8]> = b"v3333".to_vec().into_boxed_slice();
+    let v4: Box<[u8]> = b"v4444".to_vec().into_boxed_slice();
+    let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
+    let p = db.put(&*k1, &*v1);
+    assert!(p.is_ok());
+    let p = db.put(&*k2, &*v2);
+    assert!(p.is_ok());
+    let p = db.put(&*k3, &*v3);
+    assert!(p.is_ok());
+    let expected = vec![(cba(&k1), cba(&v1)),
+                        (cba(&k2), cba(&v2)),
+                        (cba(&k3), cba(&v3))];
+    let iterator1 = db.iterator(IteratorMode::Start);
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected);
+    // Test that it's idempotent
+    let iterator1 = db.iterator(IteratorMode::Start);
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected);
+    let iterator1 = db.iterator(IteratorMode::Start);
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected);
+    let iterator1 = db.iterator(IteratorMode::Start);
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected);
+    // Test it in reverse a few times
+    let iterator1 = db.iterator(IteratorMode::End);
+    let mut tmp_vec = iterator1.collect::<Vec<_>>();
+    tmp_vec.reverse();
+    assert_eq!(tmp_vec, expected);
+    let iterator1 = db.iterator(IteratorMode::End);
+    let mut tmp_vec = iterator1.collect::<Vec<_>>();
+    tmp_vec.reverse();
+    assert_eq!(tmp_vec, expected);
+    let iterator1 = db.iterator(IteratorMode::End);
+    let mut tmp_vec = iterator1.collect::<Vec<_>>();
+    tmp_vec.reverse();
+    assert_eq!(tmp_vec, expected);
+    let iterator1 = db.iterator(IteratorMode::End);
+    let mut tmp_vec = iterator1.collect::<Vec<_>>();
+    tmp_vec.reverse();
+    assert_eq!(tmp_vec, expected);
+    let iterator1 = db.iterator(IteratorMode::End);
+    let mut tmp_vec = iterator1.collect::<Vec<_>>();
+    tmp_vec.reverse();
+    assert_eq!(tmp_vec, expected);
+    // Try it forward again
+    let iterator1 = db.iterator(IteratorMode::Start);
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected);
+    let iterator1 = db.iterator(IteratorMode::Start);
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected);
+
+    let old_iterator = db.iterator(IteratorMode::Start);
+    let p = db.put(&*k4, &*v4);
+    assert!(p.is_ok());
+    let expected2 = vec![(cba(&k1), cba(&v1)),
+                         (cba(&k2), cba(&v2)),
+                         (cba(&k3), cba(&v3)),
+                         (cba(&k4), cba(&v4))];
+    assert_eq!(old_iterator.collect::<Vec<_>>(), expected);
+    let iterator1 = db.iterator(IteratorMode::Start);
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected2);
+    let iterator1 = db.iterator(IteratorMode::From(b"k2", Direction::Forward));
+    let expected = vec![(cba(&k2), cba(&v2)),
+                        (cba(&k3), cba(&v3)),
+                        (cba(&k4), cba(&v4))];
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected);
+    let iterator1 = db.iterator(IteratorMode::From(b"k2", Direction::Reverse));
+    let expected = vec![(cba(&k2), cba(&v2)),
+                        (cba(&k1), cba(&v1))];
+    assert_eq!(iterator1.collect::<Vec<_>>(), expected);
 }
test/test_multithreaded.rs (+34 -35)

-use rocksdb::{DB, Options, Writable};
+use rocksdb::{DB, Writable};
 use std::thread;
 use std::sync::Arc;
+use tempdir::TempDir;

 const N: usize = 100_000;

 #[test]
 pub fn test_multithreaded() {
-    let path = "_rust_rocksdb_multithreadtest";
-    {
-        let db = DB::open_default(path).unwrap();
-        let db = Arc::new(db);
-
-        db.put(b"key", b"value1").unwrap();
-
-        let db1 = db.clone();
-        let j1 = thread::spawn(move || {
-            for _ in 1..N {
-                db1.put(b"key", b"value1").unwrap();
-            }
-        });
-
-        let db2 = db.clone();
-        let j2 = thread::spawn(move || {
-            for _ in 1..N {
-                db2.put(b"key", b"value2").unwrap();
-            }
-        });
-
-        let db3 = db.clone();
-        let j3 = thread::spawn(move || {
-            for _ in 1..N {
-                match db3.get(b"key") {
-                    Ok(Some(v)) => {
-                        if &v[..] != b"value1" && &v[..] != b"value2" {
-                            assert!(false);
-                        }
-                    }
-                    _ => {
-                        assert!(false);
-                    }
-                }
-            }
-        });
-
-        j1.join().unwrap();
-        j2.join().unwrap();
-        j3.join().unwrap();
-    }
-    assert!(DB::destroy(&Options::new(), path).is_ok());
+    let path = TempDir::new("_rust_rocksdb_multithreadtest").expect("");
+
+    let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
+    let db = Arc::new(db);
+
+    db.put(b"key", b"value1").unwrap();
+
+    let db1 = db.clone();
+    let j1 = thread::spawn(move || {
+        for _ in 1..N {
+            db1.put(b"key", b"value1").unwrap();
+        }
+    });
+
+    let db2 = db.clone();
+    let j2 = thread::spawn(move || {
+        for _ in 1..N {
+            db2.put(b"key", b"value2").unwrap();
+        }
+    });
+
+    let db3 = db.clone();
+    let j3 = thread::spawn(move || {
+        for _ in 1..N {
+            match db3.get(b"key") {
+                Ok(Some(v)) => {
+                    if &v[..] != b"value1" && &v[..] != b"value2" {
+                        assert!(false);
+                    }
+                }
+                _ => {
+                    assert!(false);
+                }
+            }
+        }
+    });
+
+    j1.join().unwrap();
+    j2.join().unwrap();
+    j3.join().unwrap();
 }