Unverified Commit 348d747c authored by Huachao Huang's avatar Huachao Huang Committed by GitHub

Add mem env and sequential file to gen sst in memory (#161)

parent f6788b64
......@@ -144,6 +144,7 @@ struct crocksdb_cache_t { shared_ptr<Cache> rep; };
struct crocksdb_livefiles_t { std::vector<LiveFileMetaData> rep; };
struct crocksdb_column_family_handle_t { ColumnFamilyHandle* rep; };
struct crocksdb_envoptions_t { EnvOptions rep; };
struct crocksdb_sequential_file_t { SequentialFile* rep; };
struct crocksdb_ingestexternalfileoptions_t { IngestExternalFileOptions rep; };
struct crocksdb_sstfilewriter_t { SstFileWriter* rep; };
struct crocksdb_externalsstfileinfo_t { ExternalSstFileInfo rep; };
......@@ -2866,6 +2867,37 @@ crocksdb_envoptions_t* crocksdb_envoptions_create() {
void crocksdb_envoptions_destroy(crocksdb_envoptions_t* opt) { delete opt; }
crocksdb_sequential_file_t* crocksdb_sequential_file_create(
crocksdb_env_t* env, const char* path,
const crocksdb_envoptions_t* opts, char **errptr) {
std::unique_ptr<SequentialFile> result;
if (SaveError(errptr, env->rep->NewSequentialFile(path, &result, opts->rep))) {
return nullptr;
}
auto file = new crocksdb_sequential_file_t;
file->rep = result.release();
return file;
}
size_t crocksdb_sequential_file_read(crocksdb_sequential_file_t* file,
size_t n, char* buf, char** errptr) {
Slice result;
if (SaveError(errptr, file->rep->Read(n, &result, buf))) {
return 0;
}
return result.size();
}
void crocksdb_sequential_file_skip(crocksdb_sequential_file_t* file,
size_t n, char** errptr) {
SaveError(errptr, file->rep->Skip(n));
}
void crocksdb_sequential_file_destroy(crocksdb_sequential_file_t* file) {
delete file->rep;
delete file;
}
crocksdb_sstfilewriter_t* crocksdb_sstfilewriter_create(
const crocksdb_envoptions_t* env, const crocksdb_options_t* io_options) {
crocksdb_sstfilewriter_t* writer = new crocksdb_sstfilewriter_t;
......
......@@ -107,6 +107,7 @@ typedef struct crocksdb_universal_compaction_options_t crocksdb_universal_compac
typedef struct crocksdb_livefiles_t crocksdb_livefiles_t;
typedef struct crocksdb_column_family_handle_t crocksdb_column_family_handle_t;
typedef struct crocksdb_envoptions_t crocksdb_envoptions_t;
typedef struct crocksdb_sequential_file_t crocksdb_sequential_file_t;
typedef struct crocksdb_ingestexternalfileoptions_t crocksdb_ingestexternalfileoptions_t;
typedef struct crocksdb_sstfilewriter_t crocksdb_sstfilewriter_t;
typedef struct crocksdb_externalsstfileinfo_t crocksdb_externalsstfileinfo_t;
......@@ -1197,6 +1198,16 @@ extern C_ROCKSDB_LIBRARY_API crocksdb_envoptions_t* crocksdb_envoptions_create()
extern C_ROCKSDB_LIBRARY_API void crocksdb_envoptions_destroy(
crocksdb_envoptions_t* opt);
extern C_ROCKSDB_LIBRARY_API crocksdb_sequential_file_t*
crocksdb_sequential_file_create(crocksdb_env_t* env, const char* path,
const crocksdb_envoptions_t* opts, char** errptr);
extern C_ROCKSDB_LIBRARY_API size_t crocksdb_sequential_file_read(
crocksdb_sequential_file_t*, size_t n, char* buf, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_sequential_file_skip(
crocksdb_sequential_file_t*, size_t n, char** errptr);
extern C_ROCKSDB_LIBRARY_API void crocksdb_sequential_file_destroy(
crocksdb_sequential_file_t*);
/* SstFile */
extern C_ROCKSDB_LIBRARY_API crocksdb_sstfilewriter_t*
......
......@@ -59,6 +59,8 @@ pub enum DBCompactionJobInfo {}
pub enum DBIngestionInfo {}
pub enum DBEventListener {}
pub enum DBKeyVersions {}
pub enum DBEnv {}
pub enum DBSequentialFile {}
pub fn new_bloom_filter(bits: c_int) -> *mut DBFilterPolicy {
unsafe { crocksdb_filterpolicy_create_bloom(bits) }
......@@ -104,6 +106,7 @@ pub enum DBCompactionStyle {
Level = 0,
Universal = 1,
Fifo = 2,
None = 3,
}
#[derive(Debug)]
......@@ -376,6 +379,7 @@ extern "C" {
options: *mut Options,
memtable_memory_budget: c_int,
);
pub fn crocksdb_options_set_env(options: *mut Options, env: *mut DBEnv);
pub fn crocksdb_options_set_compaction_filter(
options: *mut Options,
filter: *mut DBCompactionFilter,
......@@ -1050,10 +1054,35 @@ extern "C" {
);
pub fn crocksdb_compactionfilter_destroy(filter: *mut DBCompactionFilter);
// Env
pub fn crocksdb_create_default_env() -> *mut DBEnv;
pub fn crocksdb_create_mem_env() -> *mut DBEnv;
pub fn crocksdb_env_destroy(env: *mut DBEnv);
// EnvOptions
pub fn crocksdb_envoptions_create() -> *mut EnvOptions;
pub fn crocksdb_envoptions_destroy(opt: *mut EnvOptions);
// SequentialFile
pub fn crocksdb_sequential_file_create(
env: *mut DBEnv,
path: *const c_char,
opts: *mut EnvOptions,
err: *mut *mut c_char,
) -> *mut DBSequentialFile;
pub fn crocksdb_sequential_file_read(
file: *mut DBSequentialFile,
n: size_t,
buf: *mut u8,
err: *mut *mut c_char,
) -> size_t;
pub fn crocksdb_sequential_file_skip(
file: *mut DBSequentialFile,
n: size_t,
err: *mut *mut c_char,
);
pub fn crocksdb_sequential_file_destroy(file: *mut DBSequentialFile);
// IngestExternalFileOptions
pub fn crocksdb_ingestexternalfileoptions_create() -> *mut IngestExternalFileOptions;
pub fn crocksdb_ingestexternalfileoptions_set_move_files(
......
......@@ -37,8 +37,8 @@ pub use librocksdb_sys::{self as crocksdb_ffi, new_bloom_filter, CompactionPrior
DBCompactionStyle, DBCompressionType, DBEntryType, DBInfoLogLevel,
DBRecoveryMode, DBStatisticsHistogramType, DBStatisticsTickerType};
pub use merge_operator::MergeOperands;
pub use rocksdb::{BackupEngine, CFHandle, DBIterator, DBVector, Kv, Range, SeekKey, SstFileWriter,
Writable, WriteBatch, DB};
pub use rocksdb::{BackupEngine, CFHandle, DBIterator, DBVector, Env, Kv, Range, SeekKey,
SequentialFile, SstFileWriter, Writable, WriteBatch, DB};
pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions,
EnvOptions, FifoCompactionOptions, HistogramData,
IngestExternalFileOptions, RateLimiter, ReadOptions, RestoreOptions,
......
......@@ -13,9 +13,9 @@
// limitations under the License.
//
use crocksdb_ffi::{self, DBBackupEngine, DBCFHandle, DBCompressionType, DBInstance,
DBPinnableSlice, DBStatisticsHistogramType, DBStatisticsTickerType,
DBWriteBatch};
use crocksdb_ffi::{self, DBBackupEngine, DBCFHandle, DBCompressionType, DBEnv, DBInstance,
DBPinnableSlice, DBSequentialFile, DBStatisticsHistogramType,
DBStatisticsTickerType, DBWriteBatch};
use libc::{self, c_int, c_void, size_t};
use rocksdb_options::{ColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions, DBOptions,
EnvOptions, FlushOptions, HistogramData, IngestExternalFileOptions,
......@@ -25,6 +25,7 @@ use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::ffi::{CStr, CString};
use std::fmt::{self, Debug, Formatter};
use std::io;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::str::from_utf8;
......@@ -1850,6 +1851,100 @@ pub fn supported_compression() -> Vec<DBCompressionType> {
}
}
pub struct Env {
pub inner: *mut DBEnv,
}
impl Default for Env {
fn default() -> Env {
unsafe {
Env {
inner: crocksdb_ffi::crocksdb_create_default_env(),
}
}
}
}
impl Env {
pub fn new_mem() -> Env {
unsafe {
Env {
inner: crocksdb_ffi::crocksdb_create_mem_env(),
}
}
}
pub fn new_sequential_file(
&self,
path: &str,
opts: EnvOptions,
) -> Result<SequentialFile, String> {
unsafe {
let file_path = CString::new(path).unwrap();
let file = ffi_try!(crocksdb_sequential_file_create(
self.inner,
file_path.as_ptr(),
opts.inner
));
Ok(SequentialFile::new(file))
}
}
}
impl Drop for Env {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_env_destroy(self.inner);
}
}
}
pub struct SequentialFile {
inner: *mut DBSequentialFile,
}
impl SequentialFile {
fn new(inner: *mut DBSequentialFile) -> SequentialFile {
SequentialFile { inner: inner }
}
pub fn skip(&mut self, n: usize) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sequential_file_skip(self.inner, n as size_t));
Ok(())
}
}
}
impl io::Read for SequentialFile {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
unsafe {
let mut err = ptr::null_mut();
let size = crocksdb_ffi::crocksdb_sequential_file_read(
self.inner,
buf.len() as size_t,
buf.as_mut_ptr(),
&mut err,
);
if !err.is_null() {
return Err(io::Error::new(
io::ErrorKind::Other,
crocksdb_ffi::error_message(err),
));
}
Ok(size as usize)
}
}
}
impl Drop for SequentialFile {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_sequential_file_destroy(self.inner);
}
}
}
#[cfg(test)]
mod test {
use super::*;
......
......@@ -23,10 +23,12 @@ use event_listener::{new_event_listener, EventListener};
use libc::{self, c_double, c_int, c_uchar, c_void, size_t};
use merge_operator::{self, full_merge_callback, partial_merge_callback, MergeOperatorCallback};
use merge_operator::MergeFn;
use rocksdb::Env;
use slice_transform::{new_slice_transform, SliceTransform};
use std::ffi::{CStr, CString};
use std::mem;
use std::path::Path;
use std::sync::Arc;
use table_properties_collector_factory::{new_table_properties_collector_factory,
TablePropertiesCollectorFactory};
......@@ -824,6 +826,7 @@ impl DBOptions {
pub struct ColumnFamilyOptions {
pub inner: *mut Options,
env: Option<Arc<Env>>,
filter: Option<CompactionFilterHandle>,
}
......@@ -845,6 +848,7 @@ impl Default for ColumnFamilyOptions {
);
ColumnFamilyOptions {
inner: opts,
env: None,
filter: None,
}
}
......@@ -859,6 +863,7 @@ impl Clone for ColumnFamilyOptions {
assert!(!opts.is_null());
ColumnFamilyOptions {
inner: opts,
env: self.env.clone(),
filter: None,
}
}
......@@ -877,6 +882,7 @@ impl ColumnFamilyOptions {
);
ColumnFamilyOptions {
inner: inner,
env: None,
filter: None,
}
}
......@@ -890,6 +896,13 @@ impl ColumnFamilyOptions {
}
}
pub fn set_env(&mut self, env: Arc<Env>) {
unsafe {
crocksdb_ffi::crocksdb_options_set_env(self.inner, env.inner);
self.env = Some(env);
}
}
/// Set compaction filter.
///
/// filter will be dropped when this option is dropped or a new filter is
......
......@@ -13,6 +13,8 @@
use rocksdb::*;
use std::fs;
use std::io::{Read, Write};
use std::sync::Arc;
use tempdir::TempDir;
pub fn gen_sst(
......@@ -380,3 +382,49 @@ fn test_ingest_simulate_real_world() {
);
}
}
#[test]
fn test_mem_sst_file_writer() {
let path = TempDir::new("_rust_mem_sst_file_writer").expect("");
let db = create_default_database(&path);
let env = Arc::new(Env::new_mem());
let mut opts = db.get_options().clone();
opts.set_env(env.clone());
let mem_sst_path = path.path().join("mem_sst");
let mem_sst_str = mem_sst_path.to_str().unwrap();
gen_sst(
opts,
None,
mem_sst_str,
&[(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")],
);
// Check that the file is not on disk.
assert!(!mem_sst_path.exists());
let mut buf = Vec::new();
let mut sst = env.new_sequential_file(mem_sst_str, EnvOptions::new())
.unwrap();
sst.read_to_end(&mut buf).unwrap();
// Write the data to a temp file.
let sst_path = path.path().join("temp_sst_path");
fs::File::create(&sst_path)
.unwrap()
.write_all(&buf)
.unwrap();
// Ingest the temp file to check the test kvs.
let ingest_opts = IngestExternalFileOptions::new();
db.ingest_external_file(&ingest_opts, &[sst_path.to_str().unwrap()])
.unwrap();
check_kv(
&db,
None,
&[
(b"k1", Some(b"v1")),
(b"k2", Some(b"v2")),
(b"k3", Some(b"v3")),
],
);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment