Unverified Commit c141298a authored by UncP's avatar UncP Committed by GitHub

modify sst file `seq no` (#173)

* modify external sst file's global seq no
parent e6492cbc
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
extern crate cc; extern crate cc;
extern crate cmake; extern crate cmake;
...@@ -85,12 +84,15 @@ fn build_rocksdb() -> Build { ...@@ -85,12 +84,15 @@ fn build_rocksdb() -> Build {
_ => "Debug", _ => "Debug",
}; };
println!("cargo:rustc-link-search=native={}/{}", build_dir, profile); println!("cargo:rustc-link-search=native={}/{}", build_dir, profile);
build.define("OS_WIN", None);
} else { } else {
println!("cargo:rustc-link-search=native={}", build_dir); println!("cargo:rustc-link-search=native={}", build_dir);
build.define("ROCKSDB_PLATFORM_POSIX", None);
} }
let cur_dir = env::current_dir().unwrap(); let cur_dir = env::current_dir().unwrap();
build.include(cur_dir.join("rocksdb").join("include")); build.include(cur_dir.join("rocksdb").join("include"));
build.include(cur_dir.join("rocksdb"));
println!("cargo:rustc-link-lib=static=rocksdb"); println!("cargo:rustc-link-lib=static=rocksdb");
println!("cargo:rustc-link-lib=static=z"); println!("cargo:rustc-link-lib=static=z");
......
...@@ -31,6 +31,14 @@ ...@@ -31,6 +31,14 @@
#include "rocksdb/utilities/backupable_db.h" #include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/debug.h" #include "rocksdb/utilities/debug.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "db/column_family.h"
#include "table/sst_file_writer_collectors.h"
#include "table/table_reader.h"
#include "table/block_based_table_factory.h"
#include "util/file_reader_writer.h"
#include "util/coding.h"
#include <stdlib.h> #include <stdlib.h>
#if !defined(ROCKSDB_MAJOR) || !defined(ROCKSDB_MINOR) || !defined(ROCKSDB_PATCH) #if !defined(ROCKSDB_MAJOR) || !defined(ROCKSDB_MINOR) || !defined(ROCKSDB_PATCH)
...@@ -109,6 +117,19 @@ using rocksdb::TablePropertiesCollectorFactory; ...@@ -109,6 +117,19 @@ using rocksdb::TablePropertiesCollectorFactory;
using rocksdb::KeyVersion; using rocksdb::KeyVersion;
using rocksdb::DbPath; using rocksdb::DbPath;
using rocksdb::ColumnFamilyData;
using rocksdb::ColumnFamilyHandleImpl;
using rocksdb::TableReaderOptions;
using rocksdb::TableReader;
using rocksdb::BlockBasedTableFactory;
using rocksdb::RandomAccessFile;
using rocksdb::RandomAccessFileReader;
using rocksdb::RandomRWFile;
using rocksdb::ExternalSstFilePropertyNames;
using rocksdb::DecodeFixed32;
using rocksdb::DecodeFixed64;
using rocksdb::PutFixed64;
using std::shared_ptr; using std::shared_ptr;
extern "C" { extern "C" {
...@@ -2444,7 +2465,7 @@ crocksdb_ratelimiter_t* crocksdb_ratelimiter_create( ...@@ -2444,7 +2465,7 @@ crocksdb_ratelimiter_t* crocksdb_ratelimiter_create(
void crocksdb_ratelimiter_destroy(crocksdb_ratelimiter_t *limiter) { void crocksdb_ratelimiter_destroy(crocksdb_ratelimiter_t *limiter) {
if (limiter->rep) { if (limiter->rep) {
delete limiter->rep; delete limiter->rep;
} }
delete limiter; delete limiter;
} }
...@@ -3787,4 +3808,111 @@ int crocksdb_keyversions_type(const crocksdb_keyversions_t *kvs, int index) { ...@@ -3787,4 +3808,111 @@ int crocksdb_keyversions_type(const crocksdb_keyversions_t *kvs, int index) {
return kvs->rep[index].type; return kvs->rep[index].type;
} }
struct ExternalSstFileModifier {
ExternalSstFileModifier(Env *env, ColumnFamilyData *cfd, DBOptions &db_options)
:env_(env), cfd_(cfd), env_options_(db_options), table_reader_(nullptr) { }
Status Open(std::string file) {
file_ = file;
// Get External Sst File Size
uint64_t file_size;
auto status = env_->GetFileSize(file_, &file_size);
if (!status.ok()) {
return status;
}
// Open External Sst File
std::unique_ptr<RandomAccessFile> sst_file;
std::unique_ptr<RandomAccessFileReader> sst_file_reader;
status = env_->NewRandomAccessFile(file_, &sst_file, env_options_);
if (!status.ok()) {
return status;
}
sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file), file_));
// Get Table Reader
status = cfd_->ioptions()->table_factory->NewTableReader(
TableReaderOptions(*cfd_->ioptions(), env_options_,
cfd_->internal_comparator()),
std::move(sst_file_reader), file_size, &table_reader_);
return status;
}
Status SetGlobalSeqNo(uint64_t seq_no, uint64_t *pre_seq_no) {
if (table_reader_ == nullptr) {
return Status::InvalidArgument("File is not open or seq-no has been modified");
}
// Get the external file properties
auto props = table_reader_->GetTableProperties();
const auto& uprops = props->user_collected_properties;
// Validate version and seqno offset
auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
if (version_iter == uprops.end()) {
return Status::Corruption("External file version not found");
}
uint32_t version = DecodeFixed32(version_iter->second.c_str());
if (version != 2) {
return Status::NotSupported("External file version should be 2");
}
auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
if (seqno_iter == uprops.end()) {
return Status::Corruption("External file global sequence number not found");
}
*pre_seq_no = DecodeFixed64(seqno_iter->second.c_str());
uint64_t offset = props->properties_offsets.at(ExternalSstFilePropertyNames::kGlobalSeqno);
if (offset == 0) {
return Status::Corruption("Was not able to find file global seqno field");
}
if (*pre_seq_no == seq_no) {
// This file already have the correct global seqno
return Status::OK();
}
std::unique_ptr<RandomRWFile> rwfile;
auto status = env_->NewRandomRWFile(file_, &rwfile, env_options_);
if (!status.ok()) {
return status;
}
// Write the new seqno in the global sequence number field in the file
std::string seqno_val;
PutFixed64(&seqno_val, seq_no);
status = rwfile->Write(offset, seqno_val);
return status;
}
private:
Env *env_;
ColumnFamilyData *cfd_;
EnvOptions env_options_;
std::string file_;
std::unique_ptr<TableReader> table_reader_;
};
// !!! this function is dangerous because it uses rocksdb's non-public API !!!
// find the offset of external sst file's `global seq no` and modify it.
uint64_t crocksdb_set_external_sst_file_global_seq_no(
crocksdb_t *db,
crocksdb_column_family_handle_t *column_family,
const char *file,
uint64_t seq_no,
char **errptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family->rep);
auto db_options = db->rep->GetDBOptions();
ExternalSstFileModifier modifier(db->rep->GetEnv(), cfh->cfd(), db_options);
auto s = modifier.Open(std::string(file));
uint64_t pre_seq_no = 0;
if (!s.ok()) {
SaveError(errptr, s);
return pre_seq_no;
}
s = modifier.SetGlobalSeqNo(seq_no, &pre_seq_no);
if (!s.ok()) {
SaveError(errptr, s);
}
return pre_seq_no;
}
} // end extern "C" } // end extern "C"
...@@ -1561,6 +1561,15 @@ crocksdb_keyversions_seq(const crocksdb_keyversions_t *kvs, int index); ...@@ -1561,6 +1561,15 @@ crocksdb_keyversions_seq(const crocksdb_keyversions_t *kvs, int index);
extern C_ROCKSDB_LIBRARY_API int extern C_ROCKSDB_LIBRARY_API int
crocksdb_keyversions_type(const crocksdb_keyversions_t *kvs, int index); crocksdb_keyversions_type(const crocksdb_keyversions_t *kvs, int index);
/* Modify Sst File Seq No */
extern C_ROCKSDB_LIBRARY_API uint64_t
crocksdb_set_external_sst_file_global_seq_no(
crocksdb_t *db,
crocksdb_column_family_handle_t *column_family,
const char *file,
uint64_t seq_no,
char **errptr);
#ifdef __cplusplus #ifdef __cplusplus
} /* end extern "C" */ } /* end extern "C" */
#endif #endif
......
...@@ -1496,6 +1496,14 @@ extern "C" { ...@@ -1496,6 +1496,14 @@ extern "C" {
pub fn crocksdb_keyversions_seq(kvs: *mut DBKeyVersions, index: usize) -> uint64_t; pub fn crocksdb_keyversions_seq(kvs: *mut DBKeyVersions, index: usize) -> uint64_t;
pub fn crocksdb_keyversions_type(kvs: *mut DBKeyVersions, index: usize) -> c_int; pub fn crocksdb_keyversions_type(kvs: *mut DBKeyVersions, index: usize) -> c_int;
pub fn crocksdb_set_external_sst_file_global_seq_no(
db: *mut DBInstance,
handle: *mut DBCFHandle,
file: *const c_char,
seq_no: u64,
err: *mut *mut c_char,
) -> u64;
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -37,8 +37,9 @@ pub use librocksdb_sys::{self as crocksdb_ffi, new_bloom_filter, CompactionPrior ...@@ -37,8 +37,9 @@ pub use librocksdb_sys::{self as crocksdb_ffi, new_bloom_filter, CompactionPrior
DBCompactionStyle, DBCompressionType, DBEntryType, DBInfoLogLevel, DBCompactionStyle, DBCompressionType, DBEntryType, DBInfoLogLevel,
DBRecoveryMode, DBStatisticsHistogramType, DBStatisticsTickerType}; DBRecoveryMode, DBStatisticsHistogramType, DBStatisticsTickerType};
pub use merge_operator::MergeOperands; pub use merge_operator::MergeOperands;
pub use rocksdb::{BackupEngine, CFHandle, DBIterator, DBVector, Env, ExternalSstFileInfo, Kv, pub use rocksdb::{set_external_sst_file_global_seq_no, BackupEngine, CFHandle, DBIterator,
Range, SeekKey, SequentialFile, SstFileWriter, Writable, WriteBatch, DB}; DBVector, Env, ExternalSstFileInfo, Kv, Range, SeekKey, SequentialFile,
SstFileWriter, Writable, WriteBatch, DB};
pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions, pub use rocksdb_options::{BlockBasedOptions, ColumnFamilyOptions, CompactOptions, DBOptions,
EnvOptions, FifoCompactionOptions, HistogramData, EnvOptions, FifoCompactionOptions, HistogramData,
IngestExternalFileOptions, RateLimiter, ReadOptions, RestoreOptions, IngestExternalFileOptions, RateLimiter, ReadOptions, RestoreOptions,
......
...@@ -11,7 +11,6 @@ ...@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
//
use crocksdb_ffi::{self, DBBackupEngine, DBCFHandle, DBCompressionType, DBEnv, DBInstance, use crocksdb_ffi::{self, DBBackupEngine, DBCFHandle, DBCompressionType, DBEnv, DBInstance,
DBPinnableSlice, DBSequentialFile, DBStatisticsHistogramType, DBPinnableSlice, DBSequentialFile, DBStatisticsHistogramType,
...@@ -1964,6 +1963,24 @@ impl Drop for SequentialFile { ...@@ -1964,6 +1963,24 @@ impl Drop for SequentialFile {
} }
} }
pub fn set_external_sst_file_global_seq_no(
db: &DB,
cf: &CFHandle,
file: &str,
seq_no: u64,
) -> Result<u64, String> {
let cfile = CString::new(file).unwrap();
unsafe {
let pre_seq_no = ffi_try!(crocksdb_set_external_sst_file_global_seq_no(
db.inner,
cf.inner,
cfile.as_ptr(),
seq_no
));
Ok(pre_seq_no)
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
......
...@@ -432,3 +432,34 @@ fn test_mem_sst_file_writer() { ...@@ -432,3 +432,34 @@ fn test_mem_sst_file_writer() {
assert!(env.delete_file(mem_sst_str).is_ok()); assert!(env.delete_file(mem_sst_str).is_ok());
assert!(env.file_exists(mem_sst_str).is_err()); assert!(env.file_exists(mem_sst_str).is_err());
} }
#[test]
fn test_set_external_sst_file_global_seq_no() {
let db_path = TempDir::new("_rust_rocksdb_set_external_sst_file_global_seq_no_db").expect("");
let db = create_default_database(&db_path);
let path = TempDir::new("_rust_rocksdb_set_external_sst_file_global_seq_no").expect("");
let file = path.path().join("sst_file");
let sstfile_str = file.to_str().unwrap();
gen_sst(
ColumnFamilyOptions::new(),
Some(db.cf_handle("default").unwrap()),
sstfile_str,
&[(b"k1", b"v1"), (b"k2", b"v2")],
);
let handle = db.cf_handle("default").unwrap();
let seq_no = 1;
// varify change seq_no
let r1 = set_external_sst_file_global_seq_no(&db, &handle, sstfile_str, seq_no);
assert!(r1.unwrap() != seq_no);
// varify that seq_no are equal
let r2 = set_external_sst_file_global_seq_no(&db, &handle, sstfile_str, seq_no);
assert!(r2.unwrap() == seq_no);
// change seq_no back to 0 so that it can be ingested
assert!(set_external_sst_file_global_seq_no(&db, &handle, sstfile_str, 0).is_ok());
db.ingest_external_file(&IngestExternalFileOptions::new(), &[sstfile_str])
.unwrap();
check_kv(&db, None, &[(b"k1", Some(b"v1")), (b"k2", Some(b"v2"))]);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment