Commit 5f6c3c85 authored by yiwu-arbug's avatar yiwu-arbug Committed by Connor

Eliminate use of abort() (#26)

* Eliminate use of abort()
Signed-off-by: 's avatarYi Wu <yiwu@pingcap.com>
parent c5f2551d
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "titan_stats.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/filename.h" #include "util/filename.h"
#include "util/string_util.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "titan_stats.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -55,11 +63,17 @@ Status BlobFileReader::Open(const TitanCFOptions& options, ...@@ -55,11 +63,17 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
} }
FixedSlice<BlobFileFooter::kEncodedLength> buffer; FixedSlice<BlobFileFooter::kEncodedLength> buffer;
TRY(file->Read(file_size - BlobFileFooter::kEncodedLength, Status s = file->Read(file_size - BlobFileFooter::kEncodedLength,
BlobFileFooter::kEncodedLength, &buffer, buffer.get())); BlobFileFooter::kEncodedLength, &buffer, buffer.get());
if (!s.ok()) {
return s;
}
BlobFileFooter footer; BlobFileFooter footer;
TRY(DecodeInto(buffer, &footer)); s = DecodeInto(buffer, &footer);
if (!s.ok()) {
return s;
}
auto reader = new BlobFileReader(options, std::move(file), stats); auto reader = new BlobFileReader(options, std::move(file), stats);
reader->footer_ = footer; reader->footer_ = footer;
...@@ -101,7 +115,10 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/, ...@@ -101,7 +115,10 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
RecordTick(stats_, BLOCK_CACHE_MISS); RecordTick(stats_, BLOCK_CACHE_MISS);
OwnedSlice blob; OwnedSlice blob;
TRY(ReadRecord(handle, record, &blob)); Status s = ReadRecord(handle, record, &blob);
if (!s.ok()) {
return s;
}
if (cache_) { if (cache_) {
auto cache_value = new OwnedSlice(std::move(blob)); auto cache_value = new OwnedSlice(std::move(blob));
...@@ -121,19 +138,24 @@ Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record, ...@@ -121,19 +138,24 @@ Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer) { OwnedSlice* buffer) {
Slice blob; Slice blob;
CacheAllocationPtr ubuf(new char[handle.size]); CacheAllocationPtr ubuf(new char[handle.size]);
TRY(file_->Read(handle.offset, handle.size, &blob, ubuf.get())); Status s = file_->Read(handle.offset, handle.size, &blob, ubuf.get());
// something must be wrong if (!s.ok()) {
if (handle.size != blob.size()) { return s;
fprintf(stderr, "ReadRecord actual size:%lu != blob size:%lu\n", }
blob.size(), static_cast<std::size_t>(handle.size)); if (handle.size != static_cast<uint64_t>(blob.size())) {
abort(); return Status::Corruption(
"ReadRecord actual size: " + ToString(blob.size()) +
" not equal to blob size " + ToString(handle.size));
} }
BlobDecoder decoder; BlobDecoder decoder;
TRY(decoder.DecodeHeader(&blob)); s = decoder.DecodeHeader(&blob);
if (!s.ok()) {
return s;
}
buffer->reset(std::move(ubuf), blob); buffer->reset(std::move(ubuf), blob);
TRY(decoder.DecodeRecord(&blob, record, buffer)); s = decoder.DecodeRecord(&blob, record, buffer);
return Status::OK(); return s;
} }
Status BlobFilePrefetcher::Get(const ReadOptions& options, Status BlobFilePrefetcher::Get(const ReadOptions& options,
......
...@@ -72,10 +72,9 @@ Status BlobFileSizeCollector::Finish(UserCollectedProperties* properties) { ...@@ -72,10 +72,9 @@ Status BlobFileSizeCollector::Finish(UserCollectedProperties* properties) {
} }
std::string res; std::string res;
if (!Encode(blob_files_size_, &res) || res.empty()) { bool ok __attribute__((__unused__)) = Encode(blob_files_size_, &res);
fprintf(stderr, "blob file size collector encode failed\n"); assert(ok);
abort(); assert(!res.empty());
}
properties->emplace(std::make_pair(kPropertiesName, res)); properties->emplace(std::make_pair(kPropertiesName, res));
return Status::OK(); return Status::OK();
} }
......
...@@ -43,7 +43,7 @@ void BlobEncoder::EncodeRecord(const BlobRecord& record) { ...@@ -43,7 +43,7 @@ void BlobEncoder::EncodeRecord(const BlobRecord& record) {
record_ = Compress(compression_ctx_, record_buffer_, &compressed_buffer_, record_ = Compress(compression_ctx_, record_buffer_, &compressed_buffer_,
&compression); &compression);
EXPECT(record_.size() < std::numeric_limits<uint32_t>::max()); assert(record_.size() < std::numeric_limits<uint32_t>::max());
EncodeFixed32(header_ + 4, static_cast<uint32_t>(record_.size())); EncodeFixed32(header_ + 4, static_cast<uint32_t>(record_.size()));
header_[8] = compression; header_[8] = compression;
...@@ -82,7 +82,10 @@ Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record, ...@@ -82,7 +82,10 @@ Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
return DecodeInto(input, record); return DecodeInto(input, record);
} }
UncompressionContext ctx(compression_); UncompressionContext ctx(compression_);
TRY(Uncompress(ctx, input, buffer)); Status s = Uncompress(ctx, input, buffer);
if (!s.ok()) {
return s;
}
return DecodeInto(*buffer, record); return DecodeInto(*buffer, record);
} }
...@@ -188,11 +191,7 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) { ...@@ -188,11 +191,7 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) {
state_ = FileState::kObsolete; state_ = FileState::kObsolete;
break; break;
default: default:
fprintf(stderr, assert(false);
"Unknown file event[%d], file number[%lu], file state[%d]",
static_cast<int>(event), static_cast<std::size_t>(file_number_),
static_cast<int>(state_));
abort();
} }
} }
......
...@@ -23,8 +23,7 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback { ...@@ -23,8 +23,7 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
nullptr /*value_found*/, nullptr /*value_found*/,
nullptr /*read_callback*/, &is_blob_index); nullptr /*read_callback*/, &is_blob_index);
if (!s.ok() && !s.IsNotFound()) { if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "GetImpl err, status:%s\n", s.ToString().c_str()); return s;
abort();
} }
read_bytes_ = key_.size() + index_entry.size(); read_bytes_ = key_.size() + index_entry.size();
if (s.IsNotFound()) { if (s.IsNotFound()) {
...@@ -39,9 +38,7 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback { ...@@ -39,9 +38,7 @@ class BlobGCJob::GarbageCollectionWriteCallback : public WriteCallback {
BlobIndex other_blob_index; BlobIndex other_blob_index;
s = other_blob_index.DecodeFrom(&index_entry); s = other_blob_index.DecodeFrom(&index_entry);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Decode blob index [%s] failed, err:%s\n", return s;
index_entry.ToString(true).c_str(), s.ToString().c_str());
abort();
} }
if (!(blob_index_ == other_blob_index)) { if (!(blob_index_ == other_blob_index)) {
...@@ -107,9 +104,10 @@ BlobGCJob::~BlobGCJob() { ...@@ -107,9 +104,10 @@ BlobGCJob::~BlobGCJob() {
Status BlobGCJob::Prepare() { return Status::OK(); } Status BlobGCJob::Prepare() { return Status::OK(); }
Status BlobGCJob::Run() { Status BlobGCJob::Run() {
Status s; Status s = SampleCandidateFiles();
if (!s.ok()) {
s = SampleCandidateFiles(); return s;
}
std::string tmp; std::string tmp;
for (const auto& f : blob_gc_->inputs()) { for (const auto& f : blob_gc_->inputs()) {
...@@ -134,7 +132,9 @@ Status BlobGCJob::Run() { ...@@ -134,7 +132,9 @@ Status BlobGCJob::Run() {
log_buffer_->FlushBufferToLog(); log_buffer_->FlushBufferToLog();
LogFlush(db_options_.info_log.get()); LogFlush(db_options_.info_log.get());
if (!s.ok()) return s; if (blob_gc_->sampled_inputs().empty()) {
return Status::OK();
}
return DoRunGC(); return DoRunGC();
} }
...@@ -142,22 +142,27 @@ Status BlobGCJob::Run() { ...@@ -142,22 +142,27 @@ Status BlobGCJob::Run() {
Status BlobGCJob::SampleCandidateFiles() { Status BlobGCJob::SampleCandidateFiles() {
std::vector<BlobFileMeta*> result; std::vector<BlobFileMeta*> result;
for (const auto& file : blob_gc_->inputs()) { for (const auto& file : blob_gc_->inputs()) {
if (DoSample(file)) { bool selected = false;
Status s = DoSample(file, &selected);
if (!s.ok()) {
return s;
}
if (selected) {
result.push_back(file); result.push_back(file);
} }
} }
if (!result.empty()) {
if (result.empty()) return Status::Aborted("No blob file need to be gc"); blob_gc_->set_sampled_inputs(std::move(result));
}
blob_gc_->set_sampled_inputs(std::move(result));
return Status::OK(); return Status::OK();
} }
bool BlobGCJob::DoSample(const BlobFileMeta* file) { Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) {
assert(selected != nullptr);
if (file->GetDiscardableRatio() >= if (file->GetDiscardableRatio() >=
blob_gc_->titan_cf_options().blob_file_discardable_ratio) { blob_gc_->titan_cf_options().blob_file_discardable_ratio) {
return true; *selected = true;
return Status::OK();
} }
// TODO: add do sample count metrics // TODO: add do sample count metrics
...@@ -176,9 +181,7 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) { ...@@ -176,9 +181,7 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) {
s = NewBlobFileReader(file->file_number(), readahead, db_options_, s = NewBlobFileReader(file->file_number(), readahead, db_options_,
env_options_, env_, &file_reader); env_options_, env_, &file_reader);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "NewBlobFileReader failed, status:%s\n", return s;
s.ToString().c_str());
abort();
} }
BlobFileIterator iter(std::move(file_reader), file->file_number(), BlobFileIterator iter(std::move(file_reader), file->file_number(),
file->file_size(), blob_gc_->titan_cf_options()); file->file_size(), blob_gc_->titan_cf_options());
...@@ -189,12 +192,13 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) { ...@@ -189,12 +192,13 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) {
iter.IterateForPrev(BlobFileHeader::kEncodedLength); iter.IterateForPrev(BlobFileHeader::kEncodedLength);
} }
if (!iter.status().ok()) { if (!iter.status().ok()) {
fprintf(stderr, s = iter.status();
"IterateForPrev failed, file number[%lu] size[%lu] status[%s]\n", ROCKS_LOG_ERROR(db_options_.info_log,
static_cast<size_t>(file->file_number()), "IterateForPrev failed, file number[%" PRIu64
static_cast<size_t>(file->file_size()), "] size[%" PRIu64 "] status[%s]",
iter.status().ToString().c_str()); file->file_number(), file->file_size(),
abort(); s.ToString().c_str());
return s;
} }
uint64_t iterated_size{0}; uint64_t iterated_size{0};
...@@ -205,16 +209,23 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) { ...@@ -205,16 +209,23 @@ bool BlobGCJob::DoSample(const BlobFileMeta* file) {
BlobIndex blob_index = iter.GetBlobIndex(); BlobIndex blob_index = iter.GetBlobIndex();
uint64_t total_length = blob_index.blob_handle.size; uint64_t total_length = blob_index.blob_handle.size;
iterated_size += total_length; iterated_size += total_length;
if (DiscardEntry(iter.key(), blob_index)) { bool discardable = false;
s = DiscardEntry(iter.key(), blob_index, &discardable);
if (!s.ok()) {
return s;
}
if (discardable) {
discardable_size += total_length; discardable_size += total_length;
} }
} }
metrics_.blob_db_bytes_read += iterated_size; metrics_.blob_db_bytes_read += iterated_size;
assert(iter.status().ok()); assert(iter.status().ok());
return discardable_size >= *selected =
std::ceil(sample_size_window * discardable_size >=
blob_gc_->titan_cf_options().blob_file_discardable_ratio); std::ceil(sample_size_window *
blob_gc_->titan_cf_options().blob_file_discardable_ratio);
return s;
} }
Status BlobGCJob::DoRunGC() { Status BlobGCJob::DoRunGC() {
...@@ -267,7 +278,12 @@ Status BlobGCJob::DoRunGC() { ...@@ -267,7 +278,12 @@ Status BlobGCJob::DoRunGC() {
last_key_valid = false; last_key_valid = false;
} }
if (DiscardEntry(gc_iter->key(), blob_index)) { bool discardable = false;
s = DiscardEntry(gc_iter->key(), blob_index, &discardable);
if (!s.ok()) {
break;
}
if (discardable) {
metrics_.blob_db_gc_num_keys_overwritten++; metrics_.blob_db_gc_num_keys_overwritten++;
metrics_.blob_db_gc_bytes_overwritten += blob_index.blob_handle.size; metrics_.blob_db_gc_bytes_overwritten += blob_index.blob_handle.size;
continue; continue;
...@@ -365,31 +381,34 @@ Status BlobGCJob::BuildIterator( ...@@ -365,31 +381,34 @@ Status BlobGCJob::BuildIterator(
return s; return s;
} }
bool BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index) { Status BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index,
bool* discardable) {
assert(discardable != nullptr);
PinnableSlice index_entry; PinnableSlice index_entry;
bool is_blob_index; bool is_blob_index = false;
auto s = base_db_impl_->GetImpl( Status s = base_db_impl_->GetImpl(
ReadOptions(), blob_gc_->column_family_handle(), key, &index_entry, ReadOptions(), blob_gc_->column_family_handle(), key, &index_entry,
nullptr /*value_found*/, nullptr /*read_callback*/, &is_blob_index); nullptr /*value_found*/, nullptr /*read_callback*/, &is_blob_index);
if (!s.ok() && !s.IsNotFound()) { if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "GetImpl err, status:%s\n", s.ToString().c_str()); return s;
abort();
} }
// count read bytes for checking LSM entry // count read bytes for checking LSM entry
metrics_.blob_db_bytes_read += key.size() + index_entry.size(); metrics_.blob_db_bytes_read += key.size() + index_entry.size();
if (s.IsNotFound() || !is_blob_index) { if (s.IsNotFound() || !is_blob_index) {
// Either the key is deleted or updated with a newer version which is // Either the key is deleted or updated with a newer version which is
// inlined in LSM. // inlined in LSM.
return true; *discardable = true;
return Status::OK();
} }
BlobIndex other_blob_index; BlobIndex other_blob_index;
s = other_blob_index.DecodeFrom(&index_entry); s = other_blob_index.DecodeFrom(&index_entry);
if (!s.ok()) { if (!s.ok()) {
abort(); return s;
} }
return !(blob_index == other_blob_index); *discardable = !(blob_index == other_blob_index);
return Status::OK();
} }
// We have to make sure crash consistency, but LSM db MANIFEST and BLOB db // We have to make sure crash consistency, but LSM db MANIFEST and BLOB db
......
...@@ -72,10 +72,11 @@ class BlobGCJob { ...@@ -72,10 +72,11 @@ class BlobGCJob {
} metrics_; } metrics_;
Status SampleCandidateFiles(); Status SampleCandidateFiles();
bool DoSample(const BlobFileMeta* file); Status DoSample(const BlobFileMeta* file, bool* selected);
Status DoRunGC(); Status DoRunGC();
Status BuildIterator(std::unique_ptr<BlobFileMergeIterator>* result); Status BuildIterator(std::unique_ptr<BlobFileMergeIterator>* result);
bool DiscardEntry(const Slice& key, const BlobIndex& blob_index); Status DiscardEntry(const Slice& key, const BlobIndex& blob_index,
bool* discardable);
Status InstallOutputBlobFiles(); Status InstallOutputBlobFiles();
Status RewriteValidKeyToLSM(); Status RewriteValidKeyToLSM();
Status DeleteInputBlobFiles(); Status DeleteInputBlobFiles();
......
...@@ -190,7 +190,9 @@ class BlobGCJobTest : public testing::Test { ...@@ -190,7 +190,9 @@ class BlobGCJobTest : public testing::Test {
BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(),
Env::Default(), EnvOptions(), nullptr, version_set_, Env::Default(), EnvOptions(), nullptr, version_set_,
nullptr, nullptr, nullptr); nullptr, nullptr, nullptr);
ASSERT_FALSE(blob_gc_job.DiscardEntry(key, blob_index)); bool discardable = false;
ASSERT_OK(blob_gc_job.DiscardEntry(key, blob_index, &discardable));
ASSERT_FALSE(discardable);
DestroyDB(); DestroyDB();
} }
......
#include "db_impl.h" #include "db_impl.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "port/port.h"
#include "base_db_listener.h" #include "base_db_listener.h"
#include "blob_file_builder.h" #include "blob_file_builder.h"
#include "blob_file_iterator.h" #include "blob_file_iterator.h"
...@@ -8,12 +16,6 @@ ...@@ -8,12 +16,6 @@
#include "db_iter.h" #include "db_iter.h"
#include "table_factory.h" #include "table_factory.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -535,7 +537,8 @@ Iterator* TitanDBImpl::NewIteratorImpl( ...@@ -535,7 +537,8 @@ Iterator* TitanDBImpl::NewIteratorImpl(
options, cfd, options.snapshot->GetSequenceNumber(), options, cfd, options.snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/)); nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/));
return new TitanDBIterator(options, storage.lock().get(), snapshot, return new TitanDBIterator(options, storage.lock().get(), snapshot,
std::move(iter), env_, stats_.get()); std::move(iter), env_, stats_.get(),
db_options_.info_log.get());
} }
Status TitanDBImpl::NewIterators( Status TitanDBImpl::NewIterators(
...@@ -666,9 +669,12 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { ...@@ -666,9 +669,12 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
std::map<uint64_t, uint64_t> blob_files_size; std::map<uint64_t, uint64_t> blob_files_size;
Slice src{ucp_iter->second}; Slice src{ucp_iter->second};
if (!BlobFileSizeCollector::Decode(&src, &blob_files_size)) { if (!BlobFileSizeCollector::Decode(&src, &blob_files_size)) {
fprintf(stderr, "BlobFileSizeCollector::Decode failed size:%lu\n", // TODO: Should treat it as background error and make DB read-only.
ucp_iter->second.size()); ROCKS_LOG_ERROR(db_options_.info_log,
abort(); "OnFlushCompleted[%d]: failed to decode table property, "
"prroperty size: %" ROCKSDB_PRIszt ".",
flush_job_info.job_id, ucp_iter->second.size());
assert(false);
} }
assert(!blob_files_size.empty()); assert(!blob_files_size.empty());
std::set<uint64_t> outputs; std::set<uint64_t> outputs;
...@@ -680,8 +686,13 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { ...@@ -680,8 +686,13 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
auto blob_storage = vset_->GetBlobStorage(flush_job_info.cf_id).lock(); auto blob_storage = vset_->GetBlobStorage(flush_job_info.cf_id).lock();
if (!blob_storage) { if (!blob_storage) {
fprintf(stderr, "Column family id:%u Not Found\n", flush_job_info.cf_id); // TODO: Should treat it as background error and make DB read-only.
abort(); ROCKS_LOG_ERROR(db_options_.info_log,
"OnFlushCompleted[%d]: Column family id: %" PRIu32
" Not Found.",
flush_job_info.job_id, flush_job_info.cf_id);
assert(false);
return;
} }
for (const auto& file_number : outputs) { for (const auto& file_number : outputs) {
auto file = blob_storage->FindFile(file_number).lock(); auto file = blob_storage->FindFile(file_number).lock();
...@@ -696,18 +707,23 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { ...@@ -696,18 +707,23 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
void TitanDBImpl::OnCompactionCompleted( void TitanDBImpl::OnCompactionCompleted(
const CompactionJobInfo& compaction_job_info) { const CompactionJobInfo& compaction_job_info) {
if (!compaction_job_info.status.ok()) {
// TODO: Clean up blob file generated by the failed compaction.
return;
}
std::map<uint64_t, int64_t> blob_files_size; std::map<uint64_t, int64_t> blob_files_size;
std::set<uint64_t> outputs; std::set<uint64_t> outputs;
std::set<uint64_t> inputs; std::set<uint64_t> inputs;
auto calc_bfs = [&compaction_job_info, &blob_files_size, &outputs, &inputs]( auto calc_bfs = [&](const std::vector<std::string>& files, int coefficient,
const std::vector<std::string>& files, int coefficient,
bool output) { bool output) {
for (const auto& file : files) { for (const auto& file : files) {
auto tp_iter = compaction_job_info.table_properties.find(file); auto tp_iter = compaction_job_info.table_properties.find(file);
if (tp_iter == compaction_job_info.table_properties.end()) { if (tp_iter == compaction_job_info.table_properties.end()) {
if (output) { if (output) {
fprintf(stderr, "can't find property for output\n"); ROCKS_LOG_WARN(
abort(); db_options_.info_log,
"OnCompactionCompleted[%d]: No table properties for file %s.",
compaction_job_info.job_id, file.c_str());
} }
continue; continue;
} }
...@@ -721,8 +737,14 @@ void TitanDBImpl::OnCompactionCompleted( ...@@ -721,8 +737,14 @@ void TitanDBImpl::OnCompactionCompleted(
std::string s = ucp_iter->second; std::string s = ucp_iter->second;
Slice slice{s}; Slice slice{s};
if (!BlobFileSizeCollector::Decode(&slice, &input_blob_files_size)) { if (!BlobFileSizeCollector::Decode(&slice, &input_blob_files_size)) {
fprintf(stderr, "BlobFileSizeCollector::Decode failed\n"); // TODO: Should treat it as background error and make DB read-only.
abort(); ROCKS_LOG_ERROR(
db_options_.info_log,
"OnCompactionCompleted[%d]: failed to decode table property, "
"compaction file: %s, property size: %" ROCKSDB_PRIszt ".",
compaction_job_info.job_id, file.c_str(), s.size());
assert(false);
continue;
} }
for (const auto& input_bfs : input_blob_files_size) { for (const auto& input_bfs : input_blob_files_size) {
if (output) { if (output) {
...@@ -749,15 +771,23 @@ void TitanDBImpl::OnCompactionCompleted( ...@@ -749,15 +771,23 @@ void TitanDBImpl::OnCompactionCompleted(
MutexLock l(&mutex_); MutexLock l(&mutex_);
auto bs = vset_->GetBlobStorage(compaction_job_info.cf_id).lock(); auto bs = vset_->GetBlobStorage(compaction_job_info.cf_id).lock();
if (!bs) { if (!bs) {
fprintf(stderr, "Column family id:%u Not Found\n", // TODO: Should treat it as background error and make DB read-only.
compaction_job_info.cf_id); ROCKS_LOG_ERROR(db_options_.info_log,
"OnCompactionCompleted[%d] Column family id:% " PRIu32
" not Found.",
compaction_job_info.job_id, compaction_job_info.cf_id);
return; return;
} }
for (const auto& o : outputs) { for (const auto& file_number : outputs) {
auto file = bs->FindFile(o).lock(); auto file = bs->FindFile(file_number).lock();
if (!file) { if (!file) {
fprintf(stderr, "OnCompactionCompleted get file failed\n"); // TODO: Should treat it as background error and make DB read-only.
abort(); ROCKS_LOG_ERROR(
db_options_.info_log,
"OnCompactionCompleted[%d]: Failed to get file %" PRIu64,
compaction_job_info.job_id, file_number);
assert(false);
return;
} }
file->FileStateTransit(BlobFileMeta::FileEvent::kCompactionCompleted); file->FileStateTransit(BlobFileMeta::FileEvent::kCompactionCompleted);
} }
......
...@@ -24,6 +24,10 @@ Status TitanDBImpl::PurgeObsoleteFilesImpl() { ...@@ -24,6 +24,10 @@ Status TitanDBImpl::PurgeObsoleteFilesImpl() {
candidate_file.c_str()); candidate_file.c_str());
Status delete_status = env_->DeleteFile(candidate_file); Status delete_status = env_->DeleteFile(candidate_file);
if (!s.ok()) { if (!s.ok()) {
// Move on despite error deleting the file.
ROCKS_LOG_ERROR(db_options_.info_log,
"Titan deleting file [%s] failed, status:%s",
candidate_file.c_str(), s.ToString().c_str());
s = delete_status; s = delete_status;
} }
} }
......
...@@ -6,7 +6,13 @@ ...@@ -6,7 +6,13 @@
#include <inttypes.h> #include <inttypes.h>
#include <memory>
#include <unordered_map>
#include "db/db_iter.h" #include "db/db_iter.h"
#include "rocksdb/env.h"
#include "util/logging.h"
#include "titan_stats.h" #include "titan_stats.h"
namespace rocksdb { namespace rocksdb {
...@@ -17,13 +23,14 @@ class TitanDBIterator : public Iterator { ...@@ -17,13 +23,14 @@ class TitanDBIterator : public Iterator {
TitanDBIterator(const TitanReadOptions& options, BlobStorage* storage, TitanDBIterator(const TitanReadOptions& options, BlobStorage* storage,
std::shared_ptr<ManagedSnapshot> snap, std::shared_ptr<ManagedSnapshot> snap,
std::unique_ptr<ArenaWrappedDBIter> iter, Env* env, std::unique_ptr<ArenaWrappedDBIter> iter, Env* env,
TitanStats* stats) TitanStats* stats, Logger* info_log)
: options_(options), : options_(options),
storage_(storage), storage_(storage),
snap_(snap), snap_(snap),
iter_(std::move(iter)), iter_(std::move(iter)),
env_(env), env_(env),
stats_(stats) {} stats_(stats),
info_log_(info_log) {}
bool Valid() const override { return iter_->Valid() && status_.ok(); } bool Valid() const override { return iter_->Valid() && status_.ok(); }
...@@ -38,7 +45,7 @@ class TitanDBIterator : public Iterator { ...@@ -38,7 +45,7 @@ class TitanDBIterator : public Iterator {
void SeekToFirst() override { void SeekToFirst() override {
iter_->SeekToFirst(); iter_->SeekToFirst();
if (Check()) { if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
...@@ -47,7 +54,7 @@ class TitanDBIterator : public Iterator { ...@@ -47,7 +54,7 @@ class TitanDBIterator : public Iterator {
void SeekToLast() override { void SeekToLast() override {
iter_->SeekToLast(); iter_->SeekToLast();
if (Check()) { if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
...@@ -56,7 +63,7 @@ class TitanDBIterator : public Iterator { ...@@ -56,7 +63,7 @@ class TitanDBIterator : public Iterator {
void Seek(const Slice& target) override { void Seek(const Slice& target) override {
iter_->Seek(target); iter_->Seek(target);
if (Check()) { if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
...@@ -65,7 +72,7 @@ class TitanDBIterator : public Iterator { ...@@ -65,7 +72,7 @@ class TitanDBIterator : public Iterator {
void SeekForPrev(const Slice& target) override { void SeekForPrev(const Slice& target) override {
iter_->SeekForPrev(target); iter_->SeekForPrev(target);
if (Check()) { if (ShouldGetBlobValue()) {
StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS); StopWatch seek_sw(env_, statistics(stats_), BLOB_DB_SEEK_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_SEEK); RecordTick(stats_, BLOB_DB_NUM_SEEK);
...@@ -75,7 +82,7 @@ class TitanDBIterator : public Iterator { ...@@ -75,7 +82,7 @@ class TitanDBIterator : public Iterator {
void Next() override { void Next() override {
assert(Valid()); assert(Valid());
iter_->Next(); iter_->Next();
if (Check()) { if (ShouldGetBlobValue()) {
StopWatch next_sw(env_, statistics(stats_), BLOB_DB_NEXT_MICROS); StopWatch next_sw(env_, statistics(stats_), BLOB_DB_NEXT_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_NEXT); RecordTick(stats_, BLOB_DB_NUM_NEXT);
...@@ -85,7 +92,7 @@ class TitanDBIterator : public Iterator { ...@@ -85,7 +92,7 @@ class TitanDBIterator : public Iterator {
void Prev() override { void Prev() override {
assert(Valid()); assert(Valid());
iter_->Prev(); iter_->Prev();
if (Check()) { if (ShouldGetBlobValue()) {
StopWatch prev_sw(env_, statistics(stats_), BLOB_DB_PREV_MICROS); StopWatch prev_sw(env_, statistics(stats_), BLOB_DB_PREV_MICROS);
GetBlobValue(); GetBlobValue();
RecordTick(stats_, BLOB_DB_NUM_PREV); RecordTick(stats_, BLOB_DB_NUM_PREV);
...@@ -105,7 +112,7 @@ class TitanDBIterator : public Iterator { ...@@ -105,7 +112,7 @@ class TitanDBIterator : public Iterator {
} }
private: private:
bool Check() { bool ShouldGetBlobValue() {
if (!iter_->Valid() || !iter_->IsBlob() || options_.key_only) { if (!iter_->Valid() || !iter_->IsBlob() || options_.key_only) {
status_ = iter_->status(); status_ = iter_->status();
return false; return false;
...@@ -119,31 +126,38 @@ class TitanDBIterator : public Iterator { ...@@ -119,31 +126,38 @@ class TitanDBIterator : public Iterator {
BlobIndex index; BlobIndex index;
status_ = DecodeInto(iter_->value(), &index); status_ = DecodeInto(iter_->value(), &index);
if (!status_.ok()) { if (!status_.ok()) {
fprintf(stderr, "GetBlobValue decode blob index err:%s\n", ROCKS_LOG_ERROR(info_log_,
status_.ToString().c_str()); "Titan iterator: failed to decode blob index %s: %s",
abort(); iter_->value().ToString(true /*hex*/).c_str(),
status_.ToString().c_str());
return;
} }
auto it = files_.find(index.file_number); auto it = files_.find(index.file_number);
if (it == files_.end()) { if (it == files_.end()) {
std::unique_ptr<BlobFilePrefetcher> prefetcher; std::unique_ptr<BlobFilePrefetcher> prefetcher;
status_ = storage_->NewPrefetcher(index.file_number, &prefetcher); status_ = storage_->NewPrefetcher(index.file_number, &prefetcher);
if (status_.IsCorruption()) { if (!status_.ok()) {
// If use `DeleteFilesInRanges`, we may encounter this failure, ROCKS_LOG_ERROR(
// because `DeleteFilesInRanges` may expose an old key whose info_log_,
// corresponding blob file has already been GCed out, so we "Titan iterator: failed to create prefetcher for blob file %" PRIu64
// cannot abort here. ": %s",
fprintf(stderr, index.file_number, status_.ToString().c_str());
"key:%s GetBlobValue err:%s with sequence number:%" PRIu64 "\n", return;
iter_->key().ToString(true).c_str(), status_.ToString().c_str(),
options_.snapshot->GetSequenceNumber());
} }
if (!status_.ok()) return;
it = files_.emplace(index.file_number, std::move(prefetcher)).first; it = files_.emplace(index.file_number, std::move(prefetcher)).first;
} }
buffer_.Reset(); buffer_.Reset();
status_ = it->second->Get(options_, index.blob_handle, &record_, &buffer_); status_ = it->second->Get(options_, index.blob_handle, &record_, &buffer_);
if (!status_.ok()) {
ROCKS_LOG_ERROR(
info_log_,
"Titan iterator: failed to read blob value from file %" PRIu64
", offset %" PRIu64 ", size %" PRIu64 ": %s\n",
index.file_number, index.blob_handle.offset, index.blob_handle.size,
status_.ToString().c_str());
}
return; return;
} }
...@@ -155,10 +169,11 @@ class TitanDBIterator : public Iterator { ...@@ -155,10 +169,11 @@ class TitanDBIterator : public Iterator {
BlobStorage* storage_; BlobStorage* storage_;
std::shared_ptr<ManagedSnapshot> snap_; std::shared_ptr<ManagedSnapshot> snap_;
std::unique_ptr<ArenaWrappedDBIter> iter_; std::unique_ptr<ArenaWrappedDBIter> iter_;
std::map<uint64_t, std::unique_ptr<BlobFilePrefetcher>> files_; std::unordered_map<uint64_t, std::unique_ptr<BlobFilePrefetcher>> files_;
Env* env_; Env* env_;
TitanStats* stats_; TitanStats* stats_;
Logger* info_log_;
}; };
} // namespace titandb } // namespace titandb
......
...@@ -6,17 +6,6 @@ ...@@ -6,17 +6,6 @@
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
#define TRY(expr) \
do { \
auto s = (expr); \
if (!s.ok()) return s; \
} while (0)
#define EXPECT(expr) \
do { \
if (!(expr)) abort(); \
} while (0)
// A slice pointed to an owned buffer. // A slice pointed to an owned buffer.
class OwnedSlice : public Slice { class OwnedSlice : public Slice {
public: public:
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <inttypes.h> #include <inttypes.h>
#include "util/filename.h" #include "util/filename.h"
#include "util/string_util.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -223,12 +224,11 @@ Status VersionSet::Apply(VersionEdit* edit) { ...@@ -223,12 +224,11 @@ Status VersionSet::Apply(VersionEdit* edit) {
auto number = file.first; auto number = file.first;
auto blob_it = files.find(number); auto blob_it = files.find(number);
if (blob_it == files.end()) { if (blob_it == files.end()) {
fprintf(stderr, "blob file %" PRIu64 " doesn't exist before\n", number); return Status::Corruption("Blob file " + ToString(number) +
abort(); " doesn't exist before.");
} else if (blob_it->second->is_obsolete()) { } else if (blob_it->second->is_obsolete()) {
fprintf(stderr, "blob file %" PRIu64 " has been deleted before\n", return Status::Corruption("Blob file " + ToString(number) +
number); " to delete has been deleted before.");
abort();
} }
it->second->MarkFileObsolete(blob_it->second, file.second); it->second->MarkFileObsolete(blob_it->second, file.second);
} }
...@@ -238,13 +238,12 @@ Status VersionSet::Apply(VersionEdit* edit) { ...@@ -238,13 +238,12 @@ Status VersionSet::Apply(VersionEdit* edit) {
auto blob_it = files.find(number); auto blob_it = files.find(number);
if (blob_it != files.end()) { if (blob_it != files.end()) {
if (blob_it->second->is_obsolete()) { if (blob_it->second->is_obsolete()) {
fprintf(stderr, "blob file %" PRIu64 " has been deleted before\n", return Status::Corruption("Blob file " + ToString(number) +
number); " to add has been deleted before.");
} else { } else {
fprintf(stderr, "blob file %" PRIu64 " has been added before\n", return Status::Corruption("Blob file " + ToString(number) +
number); " has been added before.");
} }
abort();
} }
it->second->AddBlobFile(file); it->second->AddBlobFile(file);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment