Unverified commit 85cdc278 authored by Connor, committed by GitHub

ignore missing blob file for iterator (#18)

* add key only iter option
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
parent c20e2e0e
......@@ -77,6 +77,25 @@ class TitanDB : public StackableDB {
Status DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) override =
0;
using StackableDB::NewIterator;
// Bridges the plain RocksDB iterator API to the Titan-specific overload by
// wrapping the caller's ReadOptions in a TitanReadOptions (which carries the
// Titan-only fields at their defaults).
Iterator* NewIterator(const ReadOptions& opts,
                      ColumnFamilyHandle* column_family) override {
  const TitanReadOptions titan_opts(opts);
  return NewIterator(titan_opts, column_family);
}
virtual Iterator* NewIterator(const TitanReadOptions& opts,
ColumnFamilyHandle* column_family) = 0;
using StackableDB::NewIterators;
// Bridges the plain RocksDB multi-column-family iterator API to the
// Titan-specific overload by wrapping the caller's ReadOptions in a
// TitanReadOptions (Titan-only fields keep their defaults).
Status NewIterators(const ReadOptions& options,
                    const std::vector<ColumnFamilyHandle*>& column_families,
                    std::vector<Iterator*>* iterators) override {
  const TitanReadOptions titan_options(options);
  return NewIterators(titan_options, column_families, iterators);
}
virtual Status NewIterators(
const TitanReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) = 0;
using StackableDB::Merge;
Status Merge(const WriteOptions&, ColumnFamilyHandle*, const Slice& /*key*/,
const Slice& /*value*/) override {
......
......@@ -25,6 +25,11 @@ struct TitanDBOptions : public DBOptions {
// Default: 1
int32_t max_background_gc{1};
// How often to schedule delete obsolete blob files periods
//
// Default: 10
uint32_t purge_obsolete_files_period{10}; // 10s
TitanDBOptions() = default;
explicit TitanDBOptions(const DBOptions& options) : DBOptions(options) {}
......@@ -172,19 +177,38 @@ struct TitanOptions : public TitanDBOptions, public TitanCFOptions {
: TitanDBOptions(options), TitanCFOptions(options) {}
TitanOptions& operator=(const Options& options) {
*dynamic_cast<TitanDBOptions*>(this) = options;
*dynamic_cast<TitanCFOptions*>(this) = options;
*static_cast<TitanDBOptions*>(this) = options;
*static_cast<TitanCFOptions*>(this) = options;
return *this;
}
operator Options() {
Options options;
*dynamic_cast<DBOptions*>(&options) = *dynamic_cast<DBOptions*>(this);
*dynamic_cast<ColumnFamilyOptions*>(&options) =
*dynamic_cast<ColumnFamilyOptions*>(this);
*static_cast<DBOptions*>(&options) = *static_cast<DBOptions*>(this);
*static_cast<ColumnFamilyOptions*>(&options) =
*static_cast<ColumnFamilyOptions*>(this);
return options;
}
};
// Read options for Titan, extending the base RocksDB ReadOptions with
// Titan-specific knobs.
struct TitanReadOptions : public ReadOptions {
// If true, iterators return keys only, without resolving values stored in
// blob files. This is mainly intended for a scan-delete pass after
// DeleteFilesInRange: DeleteFilesInRange may expose stale blob index keys
// whose blob files have already been removed, and a key-only scan avoids
// dereferencing those missing blob files.
//
// Default: false
bool key_only{false};
TitanReadOptions() = default;
// Construct from plain ReadOptions; Titan-specific fields keep their
// defaults (key_only == false).
explicit TitanReadOptions(const ReadOptions& options)
: ReadOptions(options) {}
// Assign only the base ReadOptions part; key_only is intentionally left
// unchanged so Titan-specific settings survive the assignment.
TitanReadOptions& operator=(const ReadOptions& options) {
*static_cast<ReadOptions*>(this) = options;
return *this;
}
};
} // namespace titandb
} // namespace rocksdb
......@@ -49,7 +49,7 @@ class BlobFileIteratorTest : public testing::Test {
}
}
void NewBuiler() {
void NewBuilder() {
TitanDBOptions db_options(titan_options_);
TitanCFOptions cf_options(titan_options_);
BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}, nullptr);
......@@ -73,7 +73,7 @@ class BlobFileIteratorTest : public testing::Test {
ASSERT_OK(builder_->status());
}
void FinishBuiler() {
void FinishBuilder() {
ASSERT_OK(builder_->Finish());
ASSERT_OK(builder_->status());
}
......@@ -88,7 +88,7 @@ class BlobFileIteratorTest : public testing::Test {
}
void TestBlobFileIterator() {
NewBuiler();
NewBuilder();
const int n = 1000;
std::vector<BlobHandle> handles(n);
......@@ -97,7 +97,7 @@ class BlobFileIteratorTest : public testing::Test {
AddKeyValue(id, id, &handles[i]);
}
FinishBuiler();
FinishBuilder();
NewBlobFileIterator();
......@@ -120,7 +120,7 @@ TEST_F(BlobFileIteratorTest, Basic) {
}
TEST_F(BlobFileIteratorTest, IterateForPrev) {
NewBuiler();
NewBuilder();
const int n = 1000;
std::vector<BlobHandle> handles(n);
for (int i = 0; i < n; i++) {
......@@ -128,7 +128,7 @@ TEST_F(BlobFileIteratorTest, IterateForPrev) {
AddKeyValue(id, id, &handles[i]);
}
FinishBuiler();
FinishBuilder();
NewBlobFileIterator();
......@@ -181,11 +181,11 @@ TEST_F(BlobFileIteratorTest, MergeIterator) {
const int kMaxKeyNum = 1000;
std::vector<BlobHandle> handles(kMaxKeyNum);
std::vector<std::unique_ptr<BlobFileIterator>> iters;
NewBuiler();
NewBuilder();
for (int i = 1; i < kMaxKeyNum; i++) {
AddKeyValue(GenKey(i), GenValue(i), &handles[i]);
if (i % 100 == 0) {
FinishBuiler();
FinishBuilder();
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
NewBlobFileReader(file_number_, 0, titan_options_, env_options_, env_,
......@@ -195,11 +195,11 @@ TEST_F(BlobFileIteratorTest, MergeIterator) {
file_size, TitanCFOptions()}));
file_number_ = Random::GetTLSInstance()->Next();
file_name_ = BlobFileName(dirname_, file_number_);
NewBuiler();
NewBuilder();
}
}
FinishBuiler();
FinishBuilder();
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file_name_, &file_size));
NewBlobFileReader(file_number_, 0, titan_options_, env_options_, env_,
......
......@@ -161,14 +161,16 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) {
}
// TODO: add do sample count metrics
auto records_size = file->file_size() - BlobFileHeader::kEncodedLength -
BlobFileFooter::kEncodedLength;
Status s;
uint64_t sample_size_window = static_cast<uint64_t>(
file->file_size() * blob_gc_->titan_cf_options().sample_file_size_ratio);
Random64 random64(file->file_size());
uint64_t sample_begin_offset =
random64.Uniform(file->file_size() - sample_size_window);
records_size * blob_gc_->titan_cf_options().sample_file_size_ratio);
uint64_t sample_begin_offset = BlobFileHeader::kEncodedLength;
if (records_size != sample_size_window) {
Random64 random64(records_size);
sample_begin_offset += random64.Uniform(records_size - sample_size_window);
}
std::unique_ptr<RandomAccessFileReader> file_reader;
const int readahead = 256 << 10;
s = NewBlobFileReader(file->file_number(), readahead, db_options_,
......@@ -184,11 +186,11 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) {
// TODO(@DorianZheng) sample_begin_offset maybe out of data block size, need
// more elegant solution
if (iter.status().IsInvalidArgument()) {
iter.IterateForPrev(0);
iter.IterateForPrev(BlobFileHeader::kEncodedLength);
}
if (!iter.status().ok()) {
fprintf(stderr,
"IterateForPrev faile, file number[%lu] size[%lu] status[%s]\n",
"IterateForPrev failed, file number[%lu] size[%lu] status[%s]\n",
static_cast<size_t>(file->file_number()),
static_cast<size_t>(file->file_size()),
iter.status().ToString().c_str());
......@@ -211,8 +213,8 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) {
assert(iter.status().ok());
return discardable_size >=
sample_size_window *
blob_gc_->titan_cf_options().blob_file_discardable_ratio;
std::ceil(sample_size_window *
blob_gc_->titan_cf_options().blob_file_discardable_ratio);
}
Status BlobGCJob::DoRunGC() {
......
......@@ -2,6 +2,7 @@
#include "blob_gc_picker.h"
#include "db_impl.h"
#include "rocksdb/convenience.h"
#include "util/testharness.h"
namespace rocksdb {
......@@ -36,6 +37,7 @@ class BlobGCJobTest : public testing::Test {
options_.create_if_missing = true;
options_.disable_background_gc = true;
options_.min_blob_size = 0;
options_.disable_auto_compactions = true;
options_.env->CreateDirIfMissing(dbname_);
options_.env->CreateDirIfMissing(options_.dirname);
}
......@@ -84,6 +86,15 @@ class BlobGCJobTest : public testing::Test {
ASSERT_OK(db_->Flush(fopts));
}
void CompactAll() {
auto opts = db_->GetOptions();
auto compact_opts = CompactRangeOptions();
compact_opts.change_level = true;
compact_opts.target_level = opts.num_levels - 1;
compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kSkip;
ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr));
}
void DestroyDB() {
Status s __attribute__((__unused__)) = db_->Close();
assert(s.ok());
......@@ -91,7 +102,7 @@ class BlobGCJobTest : public testing::Test {
db_ = nullptr;
}
void RunGC() {
void RunGC(bool expected = false) {
MutexLock l(mutex_);
Status s;
auto* cfh = base_db_->DefaultColumnFamily();
......@@ -102,6 +113,7 @@ class BlobGCJobTest : public testing::Test {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options.info_log.get());
cf_options.min_gc_batch_size = 0;
cf_options.blob_file_discardable_ratio = 0.4;
cf_options.sample_file_size_ratio = 1;
std::unique_ptr<BlobGC> blob_gc;
{
......@@ -111,6 +123,10 @@ class BlobGCJobTest : public testing::Test {
version_set_->GetBlobStorage(cfh->GetID()).lock().get());
}
if (expected) {
ASSERT_TRUE(blob_gc != nullptr);
}
if (blob_gc) {
blob_gc->SetColumnFamily(cfh);
......@@ -124,6 +140,9 @@ class BlobGCJobTest : public testing::Test {
{
mutex_->Unlock();
s = blob_gc_job.Run();
if (expected) {
ASSERT_OK(s);
}
mutex_->Lock();
}
......@@ -294,6 +313,62 @@ TEST_F(BlobGCJobTest, PurgeBlobs) {
DestroyDB();
}
// Verifies iterator behavior when DeleteFilesInRange exposes stale blob index
// entries: a normal iterator must surface Corruption for the missing blob
// file, while a key_only iterator must succeed without touching blob files.
TEST_F(BlobGCJobTest, DeleteFilesInRange) {
NewDB();
// Write two blob-backed keys, flush, and push the SST to the bottommost
// level (level 6, per the asserts below).
ASSERT_OK(db_->Put(WriteOptions(), GenKey(2), GenValue(21)));
ASSERT_OK(db_->Put(WriteOptions(), GenKey(4), GenValue(4)));
Flush();
CompactAll();
std::string value;
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &value));
ASSERT_EQ(value, "0");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
// Build an external SST that overlaps key 2 (with a newer value) and ingest
// it; the asserts below show it lands at level 5, above the original file.
SstFileWriter sst_file_writer(EnvOptions(), options_);
std::string sst_file = options_.dirname + "/for_ingest.sst";
ASSERT_OK(sst_file_writer.Open(sst_file));
ASSERT_OK(sst_file_writer.Put(GenKey(1), GenValue(1)));
ASSERT_OK(sst_file_writer.Put(GenKey(2), GenValue(22)));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(db_->IngestExternalFile({sst_file}, IngestExternalFileOptions()));
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level0", &value));
ASSERT_EQ(value, "0");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level5", &value));
ASSERT_EQ(value, "1");
ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level6", &value));
ASSERT_EQ(value, "1");
// Run GC and require that it actually picked work and succeeded (the `true`
// argument makes RunGC assert a GC was scheduled and Run() returned OK).
RunGC(true);
// Drop the level-5 ingested file covering [key0, key3); this can re-expose
// the old level-6 entry for key 2, whose blob index may now point at a blob
// file the GC above has already deleted.
std::string key0 = GenKey(0);
std::string key3 = GenKey(3);
Slice start = Slice(key0);
Slice end = Slice(key3);
ASSERT_OK(
DeleteFilesInRange(base_db_, db_->DefaultColumnFamily(), &start, &end));
// A regular scan resolves blob values and must hit Corruption on the
// missing blob file.
TitanReadOptions opts;
auto* iter = db_->NewIterator(opts, db_->DefaultColumnFamily());
iter->SeekToFirst();
while (iter->Valid()) {
iter->Next();
}
ASSERT_TRUE(iter->status().IsCorruption());
delete iter;
// A key-only scan never dereferences blob files, so the same walk succeeds.
opts.key_only = true;
iter = db_->NewIterator(opts, db_->DefaultColumnFamily());
iter->SeekToFirst();
while (iter->Valid()) {
iter->Next();
}
ASSERT_OK(iter->status());
delete iter;
DestroyDB();
}
} // namespace titandb
} // namespace rocksdb
......
......@@ -134,14 +134,11 @@ TitanDBImpl::TitanDBImpl(const TitanDBOptions& options,
TitanDBImpl::~TitanDBImpl() { Close(); }
// how often to schedule delete obsolete blob files periods
static constexpr uint32_t kDeleteObsoleteFilesPeriodSecs = 10; // 10s
void TitanDBImpl::StartBackgroundTasks() {
if (!thread_purge_obsolete_) {
thread_purge_obsolete_.reset(new rocksdb::RepeatableThread(
[this]() { TitanDBImpl::PurgeObsoleteFiles(); }, "titanbg", env_,
kDeleteObsoleteFilesPeriodSecs * 1000 * 1000));
db_options_.purge_obsolete_files_period * 1000 * 1000));
}
}
......@@ -404,7 +401,7 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
index.blob_handle.size);
}
if (s.IsCorruption()) {
ROCKS_LOG_DEBUG(db_options_.info_log,
ROCKS_LOG_ERROR(db_options_.info_log,
"Key:%s Snapshot:%" PRIu64 " GetBlobFile err:%s\n",
key.ToString(true).c_str(),
options.snapshot->GetSequenceNumber(),
......@@ -448,22 +445,22 @@ std::vector<Status> TitanDBImpl::MultiGetImpl(
return res;
}
Iterator* TitanDBImpl::NewIterator(const ReadOptions& options,
Iterator* TitanDBImpl::NewIterator(const TitanReadOptions& options,
ColumnFamilyHandle* handle) {
ReadOptions options_copy = options;
TitanReadOptions options_copy = options;
options_copy.total_order_seek = true;
std::shared_ptr<ManagedSnapshot> snapshot;
if (options_copy.snapshot) {
return NewIteratorImpl(options_copy, handle, snapshot);
}
ReadOptions ro(options_copy);
TitanReadOptions ro(options_copy);
snapshot.reset(new ManagedSnapshot(this));
ro.snapshot = snapshot->snapshot();
return NewIteratorImpl(ro, handle, snapshot);
}
Iterator* TitanDBImpl::NewIteratorImpl(
const ReadOptions& options, ColumnFamilyHandle* handle,
const TitanReadOptions& options, ColumnFamilyHandle* handle,
std::shared_ptr<ManagedSnapshot> snapshot) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();
......@@ -479,9 +476,10 @@ Iterator* TitanDBImpl::NewIteratorImpl(
}
Status TitanDBImpl::NewIterators(
const ReadOptions& options, const std::vector<ColumnFamilyHandle*>& handles,
const TitanReadOptions& options,
const std::vector<ColumnFamilyHandle*>& handles,
std::vector<Iterator*>* iterators) {
ReadOptions ro(options);
TitanReadOptions ro(options);
ro.total_order_seek = true;
std::shared_ptr<ManagedSnapshot> snapshot;
if (!ro.snapshot) {
......
......@@ -54,10 +54,11 @@ class TitanDBImpl : public TitanDB {
std::vector<std::string>* values) override;
using TitanDB::NewIterator;
Iterator* NewIterator(const ReadOptions& options,
Iterator* NewIterator(const TitanReadOptions& options,
ColumnFamilyHandle* handle) override;
Status NewIterators(const ReadOptions& options,
using TitanDB::NewIterators;
Status NewIterators(const TitanReadOptions& options,
const std::vector<ColumnFamilyHandle*>& handles,
std::vector<Iterator*>* iterators) override;
......@@ -103,7 +104,7 @@ class TitanDBImpl : public TitanDB {
const std::vector<ColumnFamilyHandle*>& handles,
const std::vector<Slice>& keys, std::vector<std::string>* values);
Iterator* NewIteratorImpl(const ReadOptions& options,
Iterator* NewIteratorImpl(const TitanReadOptions& options,
ColumnFamilyHandle* handle,
std::shared_ptr<ManagedSnapshot> snapshot);
......
......@@ -14,7 +14,7 @@ namespace titandb {
class TitanDBIterator : public Iterator {
public:
TitanDBIterator(const ReadOptions& options, BlobStorage* storage,
TitanDBIterator(const TitanReadOptions& options, BlobStorage* storage,
std::shared_ptr<ManagedSnapshot> snap,
std::unique_ptr<ArenaWrappedDBIter> iter, Env* env,
TitanStats* stats)
......@@ -98,14 +98,15 @@ class TitanDBIterator : public Iterator {
}
Slice value() const override {
assert(Valid());
assert(Valid() && !options_.key_only);
if (options_.key_only) return Slice();
if (!iter_->IsBlob()) return iter_->value();
return record_.value;
}
private:
bool Check() {
if (!iter_->Valid() || !iter_->IsBlob()) {
if (!iter_->Valid() || !iter_->IsBlob() || options_.key_only) {
status_ = iter_->status();
return false;
}
......@@ -128,6 +129,10 @@ class TitanDBIterator : public Iterator {
std::unique_ptr<BlobFilePrefetcher> prefetcher;
status_ = storage_->NewPrefetcher(index.file_number, &prefetcher);
if (status_.IsCorruption()) {
// If use `DeleteFilesInRanges`, we may encounter this failure,
// because `DeleteFilesInRanges` may expose an old key whose
// corresponding blob file has already been GCed out, so we
// cannot abort here.
fprintf(stderr,
"key:%s GetBlobValue err:%s with sequence number:%" PRIu64 "\n",
iter_->key().ToString(true).c_str(), status_.ToString().c_str(),
......@@ -146,7 +151,7 @@ class TitanDBIterator : public Iterator {
BlobRecord record_;
PinnableSlice buffer_;
ReadOptions options_;
TitanReadOptions options_;
BlobStorage* storage_;
std::shared_ptr<ManagedSnapshot> snap_;
std::unique_ptr<ArenaWrappedDBIter> iter_;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment