Commit e4d5afe9 authored by kennytm's avatar kennytm Committed by Neil Shen

Export SstFileReader (#347)

parent 84451e36
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "rocksdb/perf_context.h" #include "rocksdb/perf_context.h"
#include "rocksdb/rate_limiter.h" #include "rocksdb/rate_limiter.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/sst_file_reader.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
...@@ -109,6 +110,7 @@ using rocksdb::Slice; ...@@ -109,6 +110,7 @@ using rocksdb::Slice;
using rocksdb::SliceParts; using rocksdb::SliceParts;
using rocksdb::SliceTransform; using rocksdb::SliceTransform;
using rocksdb::Snapshot; using rocksdb::Snapshot;
using rocksdb::SstFileReader;
using rocksdb::SstFileWriter; using rocksdb::SstFileWriter;
using rocksdb::ExternalSstFileInfo; using rocksdb::ExternalSstFileInfo;
using rocksdb::Status; using rocksdb::Status;
...@@ -220,6 +222,7 @@ struct crocksdb_column_family_handle_t { ColumnFamilyHandle* rep; }; ...@@ -220,6 +222,7 @@ struct crocksdb_column_family_handle_t { ColumnFamilyHandle* rep; };
struct crocksdb_envoptions_t { EnvOptions rep; }; struct crocksdb_envoptions_t { EnvOptions rep; };
struct crocksdb_sequential_file_t { SequentialFile* rep; }; struct crocksdb_sequential_file_t { SequentialFile* rep; };
struct crocksdb_ingestexternalfileoptions_t { IngestExternalFileOptions rep; }; struct crocksdb_ingestexternalfileoptions_t { IngestExternalFileOptions rep; };
// C handle that holds a pointer to the underlying rocksdb::SstFileReader.
struct crocksdb_sstfilereader_t { SstFileReader* rep; };
struct crocksdb_sstfilewriter_t { SstFileWriter* rep; }; struct crocksdb_sstfilewriter_t { SstFileWriter* rep; };
struct crocksdb_externalsstfileinfo_t { ExternalSstFileInfo rep; }; struct crocksdb_externalsstfileinfo_t { ExternalSstFileInfo rep; };
struct crocksdb_ratelimiter_t { RateLimiter* rep; }; struct crocksdb_ratelimiter_t { RateLimiter* rep; };
...@@ -3505,6 +3508,42 @@ void crocksdb_sequential_file_destroy(crocksdb_sequential_file_t* file) { ...@@ -3505,6 +3508,42 @@ void crocksdb_sequential_file_destroy(crocksdb_sequential_file_t* file) {
delete file; delete file;
} }
// Allocates a reader handle whose `rep` is a new SstFileReader constructed
// from `io_options->rep`. The returned handle is released with
// crocksdb_sstfilereader_destroy.
crocksdb_sstfilereader_t* crocksdb_sstfilereader_create(
    const crocksdb_options_t* io_options) {
  crocksdb_sstfilereader_t* handle = new crocksdb_sstfilereader_t;
  handle->rep = new SstFileReader(io_options->rep);
  return handle;
}
// Opens the SST file at `name` with the wrapped reader and hands the
// resulting status to SaveError together with `errptr`.
void crocksdb_sstfilereader_open(crocksdb_sstfilereader_t* reader,
                                 const char* name, char** errptr) {
  const std::string file_path(name);
  SaveError(errptr, reader->rep->Open(file_path));
}
// Wraps an iterator obtained from the reader (using `options->rep`) in a
// freshly allocated crocksdb_iterator_t and returns it to the caller.
crocksdb_iterator_t* crocksdb_sstfilereader_new_iterator(
    crocksdb_sstfilereader_t* reader, const crocksdb_readoptions_t* options) {
  crocksdb_iterator_t* wrapped = new crocksdb_iterator_t;
  wrapped->rep = reader->rep->NewIterator(options->rep);
  return wrapped;
}
void crocksdb_sstfilereader_read_table_properties(
const crocksdb_sstfilereader_t* reader,
void* ctx, void (*cb)(void*, const crocksdb_table_properties_t*)) {
auto props = reader->rep->GetTableProperties();
cb(ctx, reinterpret_cast<const crocksdb_table_properties_t*>(props.get()));
}
// Runs VerifyChecksum() on the wrapped reader and hands the resulting status
// to SaveError together with `errptr`.
void crocksdb_sstfilereader_verify_checksum(crocksdb_sstfilereader_t* reader,
                                            char** errptr) {
  Status verify_status = reader->rep->VerifyChecksum();
  SaveError(errptr, verify_status);
}
// Destroys the wrapped SstFileReader first and then the handle itself.
void crocksdb_sstfilereader_destroy(crocksdb_sstfilereader_t* reader) {
  SstFileReader* wrapped = reader->rep;
  delete wrapped;
  delete reader;
}
crocksdb_sstfilewriter_t* crocksdb_sstfilewriter_create( crocksdb_sstfilewriter_t* crocksdb_sstfilewriter_create(
const crocksdb_envoptions_t* env, const crocksdb_options_t* io_options) { const crocksdb_envoptions_t* env, const crocksdb_options_t* io_options) {
crocksdb_sstfilewriter_t* writer = new crocksdb_sstfilewriter_t; crocksdb_sstfilewriter_t* writer = new crocksdb_sstfilewriter_t;
...@@ -5320,7 +5359,7 @@ void ctitandb_delete_files_in_range( ...@@ -5320,7 +5359,7 @@ void ctitandb_delete_files_in_range(
RangePtr range( RangePtr range(
start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr, start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr,
limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr); limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr);
SaveError( SaveError(
errptr, errptr,
static_cast<TitanDB*>(db->rep)->DeleteFilesInRanges( static_cast<TitanDB*>(db->rep)->DeleteFilesInRanges(
...@@ -5338,7 +5377,7 @@ void ctitandb_delete_files_in_range_cf( ...@@ -5338,7 +5377,7 @@ void ctitandb_delete_files_in_range_cf(
RangePtr range( RangePtr range(
start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr, start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr,
limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr); limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr);
SaveError( SaveError(
errptr, errptr,
static_cast<TitanDB*>(db->rep)->DeleteFilesInRanges( static_cast<TitanDB*>(db->rep)->DeleteFilesInRanges(
......
...@@ -113,6 +113,7 @@ typedef struct crocksdb_column_family_handle_t crocksdb_column_family_handle_t; ...@@ -113,6 +113,7 @@ typedef struct crocksdb_column_family_handle_t crocksdb_column_family_handle_t;
typedef struct crocksdb_envoptions_t crocksdb_envoptions_t; typedef struct crocksdb_envoptions_t crocksdb_envoptions_t;
typedef struct crocksdb_sequential_file_t crocksdb_sequential_file_t; typedef struct crocksdb_sequential_file_t crocksdb_sequential_file_t;
typedef struct crocksdb_ingestexternalfileoptions_t crocksdb_ingestexternalfileoptions_t; typedef struct crocksdb_ingestexternalfileoptions_t crocksdb_ingestexternalfileoptions_t;
/* Opaque handle type used by the crocksdb_sstfilereader_* functions. */
typedef struct crocksdb_sstfilereader_t crocksdb_sstfilereader_t;
typedef struct crocksdb_sstfilewriter_t crocksdb_sstfilewriter_t; typedef struct crocksdb_sstfilewriter_t crocksdb_sstfilewriter_t;
typedef struct crocksdb_externalsstfileinfo_t crocksdb_externalsstfileinfo_t; typedef struct crocksdb_externalsstfileinfo_t crocksdb_externalsstfileinfo_t;
typedef struct crocksdb_ratelimiter_t crocksdb_ratelimiter_t; typedef struct crocksdb_ratelimiter_t crocksdb_ratelimiter_t;
...@@ -1391,6 +1392,29 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_sequential_file_destroy( ...@@ -1391,6 +1392,29 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_sequential_file_destroy(
/* SstFile */ /* SstFile */
/* Creates a reader handle from `io_options`; release it with
   crocksdb_sstfilereader_destroy. */
extern C_ROCKSDB_LIBRARY_API crocksdb_sstfilereader_t*
crocksdb_sstfilereader_create(const crocksdb_options_t* io_options);
/* Opens the SST file `name`; the resulting status is reported through
   `errptr`. */
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilereader_open(crocksdb_sstfilereader_t* reader,
const char* name, char** errptr);
/* Returns a new iterator over the reader, created with `options`. */
extern C_ROCKSDB_LIBRARY_API crocksdb_iterator_t*
crocksdb_sstfilereader_new_iterator(crocksdb_sstfilereader_t* reader,
const crocksdb_readoptions_t* options);
/* Invokes `cb(ctx, props)` once with the reader's table properties. */
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilereader_read_table_properties(
const crocksdb_sstfilereader_t* reader,
void* ctx, void (*cb)(void*, const crocksdb_table_properties_t*));
/* Verifies the checksum of the reader's file; the resulting status is
   reported through `errptr`. */
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilereader_verify_checksum(crocksdb_sstfilereader_t* reader,
char** errptr);
/* Destroys the reader handle and the reader it wraps. */
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilereader_destroy(crocksdb_sstfilereader_t* reader);
extern C_ROCKSDB_LIBRARY_API crocksdb_sstfilewriter_t* extern C_ROCKSDB_LIBRARY_API crocksdb_sstfilewriter_t*
crocksdb_sstfilewriter_create(const crocksdb_envoptions_t* env, crocksdb_sstfilewriter_create(const crocksdb_envoptions_t* env,
const crocksdb_options_t* io_options); const crocksdb_options_t* io_options);
...@@ -2013,7 +2037,7 @@ extern C_ROCKSDB_LIBRARY_API void ctitandb_options_destroy(ctitandb_options_t*); ...@@ -2013,7 +2037,7 @@ extern C_ROCKSDB_LIBRARY_API void ctitandb_options_destroy(ctitandb_options_t*);
extern C_ROCKSDB_LIBRARY_API ctitandb_options_t* ctitandb_options_copy( extern C_ROCKSDB_LIBRARY_API ctitandb_options_t* ctitandb_options_copy(
ctitandb_options_t*); ctitandb_options_t*);
extern C_ROCKSDB_LIBRARY_API ctitandb_options_t* ctitandb_get_titan_options_cf( extern C_ROCKSDB_LIBRARY_API ctitandb_options_t* ctitandb_get_titan_options_cf(
const crocksdb_t* db, crocksdb_column_family_handle_t* column_family); const crocksdb_t* db, crocksdb_column_family_handle_t* column_family);
...@@ -2076,7 +2100,7 @@ extern C_ROCKSDB_LIBRARY_API size_t ctitandb_options_get_blob_cache_usage( ...@@ -2076,7 +2100,7 @@ extern C_ROCKSDB_LIBRARY_API size_t ctitandb_options_get_blob_cache_usage(
extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_blob_cache_capacity( extern C_ROCKSDB_LIBRARY_API void ctitandb_options_set_blob_cache_capacity(
ctitandb_options_t* opt, size_t capacity, char **errptr); ctitandb_options_t* opt, size_t capacity, char **errptr);
extern C_ROCKSDB_LIBRARY_API size_t ctitandb_options_get_blob_cache_capacity( extern C_ROCKSDB_LIBRARY_API size_t ctitandb_options_get_blob_cache_capacity(
ctitandb_options_t* opt); ctitandb_options_t* opt);
......
...@@ -41,6 +41,7 @@ pub enum DBComparator {} ...@@ -41,6 +41,7 @@ pub enum DBComparator {}
pub enum DBFlushOptions {} pub enum DBFlushOptions {}
pub enum DBCompactionFilter {} pub enum DBCompactionFilter {}
pub enum EnvOptions {} pub enum EnvOptions {}
/// Opaque type used behind raw pointers for the C `crocksdb_sstfilereader_t` handle.
pub enum SstFileReader {}
pub enum SstFileWriter {} pub enum SstFileWriter {}
pub enum ExternalSstFileInfo {} pub enum ExternalSstFileInfo {}
pub enum IngestExternalFileOptions {} pub enum IngestExternalFileOptions {}
...@@ -1283,6 +1284,33 @@ extern "C" { ...@@ -1283,6 +1284,33 @@ extern "C" {
); );
pub fn crocksdb_ingestexternalfileoptions_destroy(opt: *mut IngestExternalFileOptions); pub fn crocksdb_ingestexternalfileoptions_destroy(opt: *mut IngestExternalFileOptions);
// SstFileReader
// Creates a reader handle from `io_options`; release it with
// `crocksdb_sstfilereader_destroy`.
pub fn crocksdb_sstfilereader_create(io_options: *const Options) -> *mut SstFileReader;
// Opens the SST file at the NUL-terminated path `name`; the resulting status
// is reported through `err`.
pub fn crocksdb_sstfilereader_open(
reader: *mut SstFileReader,
name: *const c_char,
err: *mut *mut c_char,
);
// Returns a new iterator over the reader, created with `options`.
pub fn crocksdb_sstfilereader_new_iterator(
reader: *mut SstFileReader,
options: *const DBReadOptions,
) -> *mut DBIterator;
// Calls `callback(ctx, props)` once with the reader's table properties.
pub fn crocksdb_sstfilereader_read_table_properties(
reader: *const SstFileReader,
ctx: *mut c_void,
callback: extern "C" fn(*mut c_void, *const DBTableProperties),
);
// Verifies the checksum of the reader's file; the resulting status is
// reported through `errptr`.
pub fn crocksdb_sstfilereader_verify_checksum(
reader: *mut SstFileReader,
errptr: *mut *mut c_char,
);
// Destroys the reader handle.
pub fn crocksdb_sstfilereader_destroy(reader: *mut SstFileReader);
// SstFileWriter // SstFileWriter
pub fn crocksdb_sstfilewriter_create( pub fn crocksdb_sstfilewriter_create(
env: *mut EnvOptions, env: *mut EnvOptions,
......
...@@ -36,7 +36,7 @@ pub use perf_context::{get_perf_level, set_perf_level, IOStatsContext, PerfConte ...@@ -36,7 +36,7 @@ pub use perf_context::{get_perf_level, set_perf_level, IOStatsContext, PerfConte
pub use rocksdb::{ pub use rocksdb::{
load_latest_options, run_ldb_tool, set_external_sst_file_global_seq_no, BackupEngine, CFHandle, load_latest_options, run_ldb_tool, set_external_sst_file_global_seq_no, BackupEngine, CFHandle,
Cache, DBIterator, DBVector, Env, ExternalSstFileInfo, Kv, MemoryAllocator, Range, SeekKey, Cache, DBIterator, DBVector, Env, ExternalSstFileInfo, Kv, MemoryAllocator, Range, SeekKey,
SequentialFile, SstFileWriter, Writable, WriteBatch, DB, SequentialFile, SstFileReader, SstFileWriter, Writable, WriteBatch, DB,
}; };
pub use rocksdb_options::{ pub use rocksdb_options::{
BlockBasedOptions, CColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions, BlockBasedOptions, CColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions,
......
...@@ -37,7 +37,7 @@ use std::path::{Path, PathBuf}; ...@@ -37,7 +37,7 @@ use std::path::{Path, PathBuf};
use std::str::from_utf8; use std::str::from_utf8;
use std::sync::Arc; use std::sync::Arc;
use std::{fs, ptr, slice}; use std::{fs, ptr, slice};
use table_properties::TablePropertiesCollection; use table_properties::{TableProperties, TablePropertiesCollection};
use util::is_power_of_two; use util::is_power_of_two;
pub struct CFHandle { pub struct CFHandle {
...@@ -122,7 +122,7 @@ pub struct Snapshot<D: Deref<Target = DB>> { ...@@ -122,7 +122,7 @@ pub struct Snapshot<D: Deref<Target = DB>> {
snap: UnsafeSnap, snap: UnsafeSnap,
} }
pub struct DBIterator<D: Deref<Target = DB>> { pub struct DBIterator<D> {
_db: D, _db: D,
_readopts: ReadOptions, _readopts: ReadOptions,
inner: *mut crocksdb_ffi::DBIterator, inner: *mut crocksdb_ffi::DBIterator,
...@@ -184,7 +184,9 @@ impl<D: Deref<Target = DB>> DBIterator<D> { ...@@ -184,7 +184,9 @@ impl<D: Deref<Target = DB>> DBIterator<D> {
} }
} }
} }
}
impl<D> DBIterator<D> {
pub fn seek(&mut self, key: SeekKey) -> bool { pub fn seek(&mut self, key: SeekKey) -> bool {
unsafe { unsafe {
match key { match key {
...@@ -269,7 +271,7 @@ impl<D: Deref<Target = DB>> DBIterator<D> { ...@@ -269,7 +271,7 @@ impl<D: Deref<Target = DB>> DBIterator<D> {
pub type Kv = (Vec<u8>, Vec<u8>); pub type Kv = (Vec<u8>, Vec<u8>);
impl<'b, D: Deref<Target = DB>> Iterator for &'b mut DBIterator<D> { impl<'b, D> Iterator for &'b mut DBIterator<D> {
type Item = Kv; type Item = Kv;
fn next(&mut self) -> Option<Kv> { fn next(&mut self) -> Option<Kv> {
...@@ -281,7 +283,7 @@ impl<'b, D: Deref<Target = DB>> Iterator for &'b mut DBIterator<D> { ...@@ -281,7 +283,7 @@ impl<'b, D: Deref<Target = DB>> Iterator for &'b mut DBIterator<D> {
} }
} }
impl<D: Deref<Target = DB>> Drop for DBIterator<D> { impl<D> Drop for DBIterator<D> {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe {
crocksdb_ffi::crocksdb_iter_destroy(self.inner); crocksdb_ffi::crocksdb_iter_destroy(self.inner);
...@@ -289,7 +291,7 @@ impl<D: Deref<Target = DB>> Drop for DBIterator<D> { ...@@ -289,7 +291,7 @@ impl<D: Deref<Target = DB>> Drop for DBIterator<D> {
} }
} }
unsafe impl<D: Deref<Target = DB> + Send> Send for DBIterator<D> {} unsafe impl<D: Send> Send for DBIterator<D> {}
unsafe impl<D: Deref<Target = DB> + Send + Sync> Send for Snapshot<D> {} unsafe impl<D: Deref<Target = DB> + Send + Sync> Send for Snapshot<D> {}
...@@ -2037,6 +2039,87 @@ impl Drop for BackupEngine { ...@@ -2037,6 +2039,87 @@ impl Drop for BackupEngine {
} }
} }
/// SstFileReader is used to read sst files that are generated by DB or
/// SstFileWriter.
pub struct SstFileReader {
// Raw handle returned by `crocksdb_sstfilereader_create`; released in `Drop`.
inner: *mut crocksdb_ffi::SstFileReader,
// The options the reader was created from, kept for as long as the reader
// exists (presumably because the native reader refers to them — TODO confirm).
_opt: ColumnFamilyOptions,
}
// NOTE(review): this asserts the reader can be moved to another thread even
// though it holds a raw pointer; confirm the native reader has no thread affinity.
unsafe impl Send for SstFileReader {}
impl SstFileReader {
    /// Creates a new SstFileReader.
    ///
    /// `opt` is moved into the reader and stored next to the native handle
    /// that was created from it.
    pub fn new(opt: ColumnFamilyOptions) -> Self {
        unsafe {
            Self {
                inner: crocksdb_ffi::crocksdb_sstfilereader_create(opt.inner),
                _opt: opt,
            }
        }
    }

    /// Opens a local SST file for reading.
    ///
    /// # Errors
    ///
    /// Returns an error if `name` contains an interior NUL byte, or if the
    /// native open call reports a failure.
    pub fn open(&mut self, name: &str) -> Result<(), String> {
        // `CString::new` accepts `&str` directly; no intermediate `String` needed.
        let path = CString::new(name).map_err(|e| format!("invalid path {}: {:?}", name, e))?;
        unsafe {
            Ok(ffi_try!(crocksdb_sstfilereader_open(
                self.inner,
                path.as_ptr()
            )))
        }
    }

    /// Returns an iterator over the file using default read options.
    pub fn iter(&self) -> DBIterator<&Self> {
        self.iter_opt(ReadOptions::new())
    }

    /// Returns an iterator over the file using the given read options.
    ///
    /// The iterator borrows the reader and takes ownership of `readopts`, so
    /// both stay alive for as long as the iterator does.
    pub fn iter_opt(&self, readopts: ReadOptions) -> DBIterator<&Self> {
        unsafe {
            DBIterator {
                inner: crocksdb_ffi::crocksdb_sstfilereader_new_iterator(
                    self.inner,
                    readopts.get_inner(),
                ),
                _db: self,
                _readopts: readopts,
            }
        }
    }

    /// Passes the table properties of the file to `action`.
    ///
    /// The properties are only borrowed for the duration of the call, which
    /// is why they are handed to a closure instead of being returned.
    pub fn read_table_properties<F: FnOnce(&TableProperties)>(&self, action: F) {
        extern "C" fn callback<F: FnOnce(&TableProperties)>(
            ctx: *mut c_void,
            ptr: *const crocksdb_ffi::DBTableProperties,
        ) {
            unsafe {
                // `ctx` points at the `Option<F>` owned by the enclosing call.
                // Taking the closure out of the slot transfers ownership
                // exactly once, so it can neither be dropped twice nor leaked.
                let slot = &mut *(ctx as *mut Option<F>);
                if let Some(caller) = slot.take() {
                    caller(TableProperties::from_ptr(ptr));
                }
            }
        }

        // If the native side never invokes the callback, the closure is still
        // dropped normally when `slot` goes out of scope.
        let mut slot = Some(action);
        unsafe {
            crocksdb_ffi::crocksdb_sstfilereader_read_table_properties(
                self.inner,
                &mut slot as *mut Option<F> as *mut c_void,
                callback::<F>,
            );
        }
    }

    /// Verifies the checksum of the opened file.
    ///
    /// # Errors
    ///
    /// Returns the error message reported by the native verification call.
    pub fn verify_checksum(&self) -> Result<(), String> {
        unsafe { Ok(ffi_try!(crocksdb_sstfilereader_verify_checksum(self.inner))) }
    }
}
impl Drop for SstFileReader {
    /// Releases the native reader handle held in `inner`.
    fn drop(&mut self) {
        let handle = self.inner;
        unsafe {
            crocksdb_ffi::crocksdb_sstfilereader_destroy(handle);
        }
    }
}
/// SstFileWriter is used to create sst files that can be added to database later /// SstFileWriter is used to create sst files that can be added to database later
/// All keys in files generated by SstFileWriter will have sequence number = 0 /// All keys in files generated by SstFileWriter will have sequence number = 0
pub struct SstFileWriter { pub struct SstFileWriter {
...@@ -2891,7 +2974,7 @@ mod test { ...@@ -2891,7 +2974,7 @@ mod test {
println!("{:?}", c); println!("{:?}", c);
println!("{}", c as u32); println!("{}", c as u32);
match c as u32 { match c as u32 {
0...5 | 7 | 0x40 => assert!(true), 0..=5 | 7 | 0x40 => assert!(true),
_ => assert!(false), _ => assert!(false),
} }
} }
......
...@@ -505,3 +505,50 @@ fn test_ingest_external_file_optimized() { ...@@ -505,3 +505,50 @@ fn test_ingest_external_file_optimized() {
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"b"); assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert_eq!(db.get_cf(handle, b"k3").unwrap().unwrap(), b"c"); assert_eq!(db.get_cf(handle, b"k3").unwrap().unwrap(), b"c");
} }
/// Generates an SST file, then checks that `SstFileReader` can open it,
/// verify its checksum, report three entries and iterate over them in order.
#[test]
fn test_read_sst() {
    let tmp = TempDir::new("_rust_rocksdb_test_read_sst").unwrap();
    let file = tmp.path().join("sst");
    let file_str = file.to_str().unwrap();
    gen_sst_put(ColumnFamilyOptions::new(), None, file_str);

    let mut reader = SstFileReader::new(ColumnFamilyOptions::default());
    reader.open(file_str).unwrap();
    reader.verify_checksum().unwrap();
    reader.read_table_properties(|props| {
        assert_eq!(props.num_entries(), 3);
    });

    let pairs: [(&[u8], &[u8]); 3] = [(b"k1", b"a"), (b"k2", b"b"), (b"k3", b"c")];
    let expected: Vec<(Vec<u8>, Vec<u8>)> = pairs
        .iter()
        .map(|&(k, v)| (k.to_vec(), v.to_vec()))
        .collect();

    let mut it = reader.iter();
    it.seek(SeekKey::Start);
    let actual = it.collect::<Vec<_>>();
    assert_eq!(actual, expected);
}
/// Corrupts one byte of a generated SST file and checks that the reader can
/// still open it but `verify_checksum` reports a checksum mismatch.
#[test]
fn test_read_invalid_sst() {
    let dir = TempDir::new("_rust_rocksdb_test_read_invalid_sst").unwrap();
    let sst_path = dir.path().join("sst");
    let sst_path_str = sst_path.to_str().unwrap();
    gen_sst_put(ColumnFamilyOptions::new(), None, sst_path_str);

    // Corrupt one byte (at offset 9) of the generated file.
    {
        use std::io::{Seek, SeekFrom};

        let mut f = fs::OpenOptions::new().write(true).open(&sst_path).unwrap();
        f.seek(SeekFrom::Start(9)).unwrap();
        // `write_all` guarantees the byte is written; plain `write` may
        // legitimately write nothing, which would leave the file intact.
        f.write_all(b"!").unwrap();
    }

    let mut reader = SstFileReader::new(ColumnFamilyOptions::default());
    reader.open(sst_path_str).unwrap();
    let error_message = reader.verify_checksum().unwrap_err();
    assert!(error_message.contains("checksum mismatch"));
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment