Commit d991b347 authored by kennytm's avatar kennytm Committed by yiwu-arbug

Export SstFileReader (#347) (#357)

Cherry-pick #347 for TiKV v3.1.
parent 7574dae9
......@@ -26,6 +26,7 @@
#include "rocksdb/perf_context.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/sst_file_reader.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
......@@ -110,6 +111,7 @@ using rocksdb::Slice;
using rocksdb::SliceParts;
using rocksdb::SliceTransform;
using rocksdb::Snapshot;
using rocksdb::SstFileReader;
using rocksdb::SstFileWriter;
using rocksdb::ExternalSstFileInfo;
using rocksdb::Status;
......@@ -222,6 +224,7 @@ struct crocksdb_column_family_handle_t { ColumnFamilyHandle* rep; };
struct crocksdb_envoptions_t { EnvOptions rep; };
struct crocksdb_sequential_file_t { SequentialFile* rep; };
struct crocksdb_ingestexternalfileoptions_t { IngestExternalFileOptions rep; };
struct crocksdb_sstfilereader_t { SstFileReader* rep; };
struct crocksdb_sstfilewriter_t { SstFileWriter* rep; };
struct crocksdb_externalsstfileinfo_t { ExternalSstFileInfo rep; };
struct crocksdb_ratelimiter_t { RateLimiter* rep; };
......@@ -3524,6 +3527,42 @@ void crocksdb_sequential_file_destroy(crocksdb_sequential_file_t* file) {
delete file;
}
crocksdb_sstfilereader_t* crocksdb_sstfilereader_create(
const crocksdb_options_t* io_options) {
auto reader = new crocksdb_sstfilereader_t;
reader->rep = new SstFileReader(io_options->rep);
return reader;
}
void crocksdb_sstfilereader_open(crocksdb_sstfilereader_t* reader,
const char* name, char** errptr) {
SaveError(errptr, reader->rep->Open(std::string(name)));
}
crocksdb_iterator_t* crocksdb_sstfilereader_new_iterator(
crocksdb_sstfilereader_t* reader, const crocksdb_readoptions_t* options) {
auto it = new crocksdb_iterator_t;
it->rep = reader->rep->NewIterator(options->rep);
return it;
}
void crocksdb_sstfilereader_read_table_properties(
const crocksdb_sstfilereader_t* reader,
void* ctx, void (*cb)(void*, const crocksdb_table_properties_t*)) {
auto props = reader->rep->GetTableProperties();
cb(ctx, reinterpret_cast<const crocksdb_table_properties_t*>(props.get()));
}
void crocksdb_sstfilereader_verify_checksum(crocksdb_sstfilereader_t* reader,
char** errptr) {
SaveError(errptr, reader->rep->VerifyChecksum());
}
void crocksdb_sstfilereader_destroy(crocksdb_sstfilereader_t* reader) {
delete reader->rep;
delete reader;
}
crocksdb_sstfilewriter_t* crocksdb_sstfilewriter_create(
const crocksdb_envoptions_t* env, const crocksdb_options_t* io_options) {
crocksdb_sstfilewriter_t* writer = new crocksdb_sstfilewriter_t;
......@@ -5306,7 +5345,7 @@ void ctitandb_delete_files_in_range(
RangePtr range(
start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr,
limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr);
SaveError(
errptr,
static_cast<TitanDB*>(db->rep)->DeleteFilesInRanges(
......@@ -5324,7 +5363,7 @@ void ctitandb_delete_files_in_range_cf(
RangePtr range(
start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr,
limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr);
SaveError(
errptr,
static_cast<TitanDB*>(db->rep)->DeleteFilesInRanges(
......
......@@ -114,6 +114,7 @@ typedef struct crocksdb_column_family_handle_t crocksdb_column_family_handle_t;
typedef struct crocksdb_envoptions_t crocksdb_envoptions_t;
typedef struct crocksdb_sequential_file_t crocksdb_sequential_file_t;
typedef struct crocksdb_ingestexternalfileoptions_t crocksdb_ingestexternalfileoptions_t;
typedef struct crocksdb_sstfilereader_t crocksdb_sstfilereader_t;
typedef struct crocksdb_sstfilewriter_t crocksdb_sstfilewriter_t;
typedef struct crocksdb_externalsstfileinfo_t crocksdb_externalsstfileinfo_t;
typedef struct crocksdb_ratelimiter_t crocksdb_ratelimiter_t;
......@@ -1396,6 +1397,29 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_sequential_file_destroy(
/* SstFile */
extern C_ROCKSDB_LIBRARY_API crocksdb_sstfilereader_t*
crocksdb_sstfilereader_create(const crocksdb_options_t* io_options);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilereader_open(crocksdb_sstfilereader_t* reader,
const char* name, char** errptr);
extern C_ROCKSDB_LIBRARY_API crocksdb_iterator_t*
crocksdb_sstfilereader_new_iterator(crocksdb_sstfilereader_t* reader,
const crocksdb_readoptions_t* options);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilereader_read_table_properties(
const crocksdb_sstfilereader_t* reader,
void* ctx, void (*cb)(void*, const crocksdb_table_properties_t*));
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilereader_verify_checksum(crocksdb_sstfilereader_t* reader,
char** errptr);
extern C_ROCKSDB_LIBRARY_API void
crocksdb_sstfilereader_destroy(crocksdb_sstfilereader_t* reader);
extern C_ROCKSDB_LIBRARY_API crocksdb_sstfilewriter_t*
crocksdb_sstfilewriter_create(const crocksdb_envoptions_t* env,
const crocksdb_options_t* io_options);
......
......@@ -41,6 +41,7 @@ pub enum DBComparator {}
pub enum DBFlushOptions {}
pub enum DBCompactionFilter {}
pub enum EnvOptions {}
pub enum SstFileReader {}
pub enum SstFileWriter {}
pub enum ExternalSstFileInfo {}
pub enum IngestExternalFileOptions {}
......@@ -1266,6 +1267,33 @@ extern "C" {
);
pub fn crocksdb_ingestexternalfileoptions_destroy(opt: *mut IngestExternalFileOptions);
// SstFileReader
pub fn crocksdb_sstfilereader_create(io_options: *const Options) -> *mut SstFileReader;
pub fn crocksdb_sstfilereader_open(
reader: *mut SstFileReader,
name: *const c_char,
err: *mut *mut c_char,
);
pub fn crocksdb_sstfilereader_new_iterator(
reader: *mut SstFileReader,
options: *const DBReadOptions,
) -> *mut DBIterator;
pub fn crocksdb_sstfilereader_read_table_properties(
reader: *const SstFileReader,
ctx: *mut c_void,
callback: extern "C" fn(*mut c_void, *const DBTableProperties),
);
pub fn crocksdb_sstfilereader_verify_checksum(
reader: *mut SstFileReader,
errptr: *mut *mut c_char,
);
pub fn crocksdb_sstfilereader_destroy(reader: *mut SstFileReader);
// SstFileWriter
pub fn crocksdb_sstfilewriter_create(
env: *mut EnvOptions,
......
......@@ -36,7 +36,7 @@ pub use perf_context::{get_perf_level, set_perf_level, IOStatsContext, PerfConte
pub use rocksdb::{
load_latest_options, run_ldb_tool, set_external_sst_file_global_seq_no, BackupEngine, CFHandle,
Cache, DBIterator, DBVector, Env, ExternalSstFileInfo, Kv, MemoryAllocator, Range, SeekKey,
SequentialFile, SstFileWriter, Writable, WriteBatch, DB,
SequentialFile, SstFileReader, SstFileWriter, Writable, WriteBatch, DB,
};
pub use rocksdb_options::{
BlockBasedOptions, CColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions,
......
......@@ -37,7 +37,7 @@ use std::path::{Path, PathBuf};
use std::str::from_utf8;
use std::sync::Arc;
use std::{fs, ptr, slice};
use table_properties::TablePropertiesCollection;
use table_properties::{TableProperties, TablePropertiesCollection};
use util::is_power_of_two;
pub struct CFHandle {
......@@ -122,7 +122,7 @@ pub struct Snapshot<D: Deref<Target = DB>> {
snap: UnsafeSnap,
}
pub struct DBIterator<D: Deref<Target = DB>> {
pub struct DBIterator<D> {
_db: D,
_readopts: ReadOptions,
inner: *mut crocksdb_ffi::DBIterator,
......@@ -184,7 +184,9 @@ impl<D: Deref<Target = DB>> DBIterator<D> {
}
}
}
}
impl<D> DBIterator<D> {
pub fn seek(&mut self, key: SeekKey) -> bool {
unsafe {
match key {
......@@ -269,7 +271,7 @@ impl<D: Deref<Target = DB>> DBIterator<D> {
pub type Kv = (Vec<u8>, Vec<u8>);
impl<'b, D: Deref<Target = DB>> Iterator for &'b mut DBIterator<D> {
impl<'b, D> Iterator for &'b mut DBIterator<D> {
type Item = Kv;
fn next(&mut self) -> Option<Kv> {
......@@ -281,7 +283,7 @@ impl<'b, D: Deref<Target = DB>> Iterator for &'b mut DBIterator<D> {
}
}
impl<D: Deref<Target = DB>> Drop for DBIterator<D> {
impl<D> Drop for DBIterator<D> {
fn drop(&mut self) {
unsafe {
crocksdb_ffi::crocksdb_iter_destroy(self.inner);
......@@ -289,7 +291,7 @@ impl<D: Deref<Target = DB>> Drop for DBIterator<D> {
}
}
unsafe impl<D: Deref<Target = DB> + Send> Send for DBIterator<D> {}
unsafe impl<D: Send> Send for DBIterator<D> {}
unsafe impl<D: Deref<Target = DB> + Send + Sync> Send for Snapshot<D> {}
......@@ -2020,6 +2022,87 @@ impl Drop for BackupEngine {
}
}
// SstFileReader is used to read sst files that are generated by DB or
// SstFileWriter.
pub struct SstFileReader {
inner: *mut crocksdb_ffi::SstFileReader,
_opt: ColumnFamilyOptions,
}
unsafe impl Send for SstFileReader {}
impl SstFileReader {
/// Creates a new SstFileReader.
pub fn new(opt: ColumnFamilyOptions) -> Self {
unsafe {
Self {
inner: crocksdb_ffi::crocksdb_sstfilereader_create(opt.inner),
_opt: opt,
}
}
}
/// Opens a local SST file for reading.
pub fn open(&mut self, name: &str) -> Result<(), String> {
let path =
CString::new(name.to_owned()).map_err(|e| format!("invalid path {}: {:?}", name, e))?;
unsafe {
Ok(ffi_try!(crocksdb_sstfilereader_open(
self.inner,
path.as_ptr()
)))
}
}
pub fn iter(&self) -> DBIterator<&Self> {
self.iter_opt(ReadOptions::new())
}
pub fn iter_opt(&self, readopts: ReadOptions) -> DBIterator<&Self> {
unsafe {
DBIterator {
inner: crocksdb_ffi::crocksdb_sstfilereader_new_iterator(
self.inner,
readopts.get_inner(),
),
_db: self,
_readopts: readopts,
}
}
}
pub fn read_table_properties<F: FnOnce(&TableProperties)>(&self, mut action: F) {
extern "C" fn callback<F: FnOnce(&TableProperties)>(
ctx: *mut c_void,
ptr: *const crocksdb_ffi::DBTableProperties,
) {
unsafe {
let caller = ptr::read(ctx as *mut F);
caller(TableProperties::from_ptr(ptr));
}
}
unsafe {
crocksdb_ffi::crocksdb_sstfilereader_read_table_properties(
self.inner,
&mut action as *mut F as *mut c_void,
callback::<F>,
);
mem::forget(action);
}
}
pub fn verify_checksum(&self) -> Result<(), String> {
unsafe { Ok(ffi_try!(crocksdb_sstfilereader_verify_checksum(self.inner))) }
}
}
impl Drop for SstFileReader {
fn drop(&mut self) {
unsafe { crocksdb_ffi::crocksdb_sstfilereader_destroy(self.inner) }
}
}
/// SstFileWriter is used to create sst files that can be added to database later
/// All keys in files generated by SstFileWriter will have sequence number = 0
pub struct SstFileWriter {
......@@ -2874,7 +2957,7 @@ mod test {
println!("{:?}", c);
println!("{}", c as u32);
match c as u32 {
0...5 | 7 | 0x40 => assert!(true),
0..=5 | 7 | 0x40 => assert!(true),
_ => assert!(false),
}
}
......
......@@ -505,3 +505,50 @@ fn test_ingest_external_file_optimized() {
assert_eq!(db.get_cf(handle, b"k2").unwrap().unwrap(), b"b");
assert_eq!(db.get_cf(handle, b"k3").unwrap().unwrap(), b"c");
}
#[test]
fn test_read_sst() {
let dir = TempDir::new("_rust_rocksdb_test_read_sst").unwrap();
let sst_path = dir.path().join("sst");
let sst_path_str = sst_path.to_str().unwrap();
gen_sst_put(ColumnFamilyOptions::new(), None, sst_path_str);
let mut reader = SstFileReader::new(ColumnFamilyOptions::default());
reader.open(sst_path_str).unwrap();
reader.verify_checksum().unwrap();
reader.read_table_properties(|props| {
assert_eq!(props.num_entries(), 3);
});
let mut it = reader.iter();
it.seek(SeekKey::Start);
assert_eq!(
it.collect::<Vec<_>>(),
vec![
(b"k1".to_vec(), b"a".to_vec()),
(b"k2".to_vec(), b"b".to_vec()),
(b"k3".to_vec(), b"c".to_vec()),
]
);
}
#[test]
fn test_read_invalid_sst() {
let dir = TempDir::new("_rust_rocksdb_test_read_invalid_sst").unwrap();
let sst_path = dir.path().join("sst");
let sst_path_str = sst_path.to_str().unwrap();
gen_sst_put(ColumnFamilyOptions::new(), None, sst_path_str);
// corrupt one byte.
{
use std::io::{Seek, SeekFrom};
let mut f = fs::OpenOptions::new().write(true).open(&sst_path).unwrap();
f.seek(SeekFrom::Start(9)).unwrap();
f.write(b"!").unwrap();
}
let mut reader = SstFileReader::new(ColumnFamilyOptions::default());
reader.open(sst_path_str).unwrap();
let error_message = reader.verify_checksum().unwrap_err();
assert!(error_message.contains("checksum mismatch"));
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment