Unverified Commit 73b29188 authored by yiwu-arbug's avatar yiwu-arbug Committed by GitHub

Add clang-format script and CI job (#9)

Adding scripts/format-diff.sh to use clang-format to format code changes since diverge from master. Also adding travis CI job to check if the code needs to format. Also format all existing code.
Signed-off-by: 's avatarYi Wu <yiwu@pingcap.com>
parent 05a37557
...@@ -9,10 +9,12 @@ addons: ...@@ -9,10 +9,12 @@ addons:
apt: apt:
sources: sources:
- ubuntu-toolchain-r-test - ubuntu-toolchain-r-test
- llvm-toolchain-xenial-7
packages: packages:
- g++-7 - g++-7
- libgflags-dev - libgflags-dev
- lcov - lcov
- clang-format-7
# For GCC build, we also report code coverage to codecov. # For GCC build, we also report code coverage to codecov.
matrix: matrix:
...@@ -26,28 +28,31 @@ matrix: ...@@ -26,28 +28,31 @@ matrix:
- compiler: clang - compiler: clang
env: SANITIZER="UBSAN" env: SANITIZER="UBSAN"
- env: COMPILER=gcc7 - env: COMPILER=gcc7
- env: FORMATTER=ON
install: install:
- git clone --depth=1 --branch=tikv-3.0 https://github.com/pingcap/rocksdb.git - git clone --depth=1 --branch=tikv-3.0 https://github.com/pingcap/rocksdb.git
- if [ "${COMPILER}" == gcc7 ]; then - if [ "${COMPILER}" == gcc7 ]; then
CC=gcc-7; CC=gcc-7;
CXX=g++-7; CXX=g++-7;
COVERAGE_OPT="-DCODE_COVERAGE=ON"; export COVERAGE_OPT="-DCODE_COVERAGE=ON";
fi fi
- if [ ! -z "${BUILD_TYPE}" ]; then - if [ ! -z "${BUILD_TYPE}" ]; then
BUILD_OPT="-DCMAKE_BUILD_TYPE=${BUILD_TYPE}"; export BUILD_OPT="-DCMAKE_BUILD_TYPE=${BUILD_TYPE}";
else else
BUILD_OPT="-DCMAKE_BUILD_TYPE=Debug"; export BUILD_OPT="-DCMAKE_BUILD_TYPE=Debug";
fi fi
- if [ ! -z "${SANITIZER}" ]; then - if [ ! -z "${SANITIZER}" ]; then
SANITIZER_OPT="-DWITH_${SANITIZER}=ON"; export SANITIZER_OPT="-DWITH_${SANITIZER}=ON";
TOOLS_OPT="-DWITH_TITAN_TOOLS=OFF"; export TOOLS_OPT="-DWITH_TITAN_TOOLS=OFF";
fi fi
script: script:
- cmake . -L -DROCKSDB_DIR=./rocksdb -DTRAVIS=ON ${BUILD_OPT} ${SANITIZER_OPT} ${TOOLS_OPT} ${COVERAGE_OPT} - if [ -z "${FORMATTER}" ]; then
- make -j4 bash scripts/travis-make.sh;
- ctest -R titan else
bash scripts/travis-format.sh;
fi
after_success: after_success:
- if [ "${COMPILER}" == gcc7 ]; then - if [ "${COMPILER}" == gcc7 ]; then
......
...@@ -32,4 +32,7 @@ cmake .. -DROCKSDB_DIR=<rocksdb_source_dir> -DWITH_SNAPPY=ON ...@@ -32,4 +32,7 @@ cmake .. -DROCKSDB_DIR=<rocksdb_source_dir> -DWITH_SNAPPY=ON
# Run tests after build. You need to filter tests by "titan" prefix. # Run tests after build. You need to filter tests by "titan" prefix.
ctest -R titan ctest -R titan
# To format code, install clang-format and run the script.
bash scripts/format-diff.sh
``` ```
...@@ -46,8 +46,7 @@ class TitanDB : public StackableDB { ...@@ -46,8 +46,7 @@ class TitanDB : public StackableDB {
using StackableDB::CreateColumnFamilies; using StackableDB::CreateColumnFamilies;
Status CreateColumnFamilies( Status CreateColumnFamilies(
const ColumnFamilyOptions& options, const ColumnFamilyOptions& options, const std::vector<std::string>& names,
const std::vector<std::string>& names,
std::vector<ColumnFamilyHandle*>* handles) override { std::vector<ColumnFamilyHandle*>* handles) override {
std::vector<TitanCFDescriptor> descs; std::vector<TitanCFDescriptor> descs;
for (auto& name : names) { for (auto& name : names) {
...@@ -75,7 +74,8 @@ class TitanDB : public StackableDB { ...@@ -75,7 +74,8 @@ class TitanDB : public StackableDB {
Status DropColumnFamilies( Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) override = 0; const std::vector<ColumnFamilyHandle*>& handles) override = 0;
Status DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) override = 0; Status DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) override =
0;
using StackableDB::Merge; using StackableDB::Merge;
Status Merge(const WriteOptions&, ColumnFamilyHandle*, const Slice& /*key*/, Status Merge(const WriteOptions&, ColumnFamilyHandle*, const Slice& /*key*/,
......
#!/bin/bash
git diff `git merge-base master HEAD` | clang-format-diff -style=google -p1 -i
#!/bin/bash
set -ev
git fetch --depth=1 origin master:master;
git diff $(git merge-base master HEAD) HEAD > diff;
cat diff | clang-format-diff-7 -style=google -p1 > formatted;
if [ -s formatted ]; then
cat formatted;
echo "Run scripts/format-diff.sh to format your code.";
exit 1;
fi;
#!/bin/bash
set -ev
cmake . -L -DROCKSDB_DIR=./rocksdb -DTRAVIS=ON ${BUILD_OPT} ${SANITIZER_OPT} ${TOOLS_OPT} ${COVERAGE_OPT}
make -j4
ctest -R titan
#pragma once #pragma once
#include "rocksdb/listener.h"
#include "db_impl.h" #include "db_impl.h"
#include "rocksdb/listener.h"
namespace rocksdb { namespace rocksdb {
......
#pragma once #pragma once
#include "util/file_reader_writer.h"
#include "blob_format.h" #include "blob_format.h"
#include "titan/options.h" #include "titan/options.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
#include "blob_file_cache.h" #include "blob_file_cache.h"
#include "util/filename.h"
#include "util.h" #include "util.h"
#include "util/filename.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
#pragma once #pragma once
#include "rocksdb/options.h"
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "blob_format.h" #include "blob_format.h"
#include "rocksdb/options.h"
#include "titan/options.h" #include "titan/options.h"
namespace rocksdb { namespace rocksdb {
......
#include "blob_file_iterator.h" #include "blob_file_iterator.h"
#include "util/crc32c.h"
#include "util.h" #include "util.h"
#include "util/crc32c.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
...@@ -3,13 +3,13 @@ ...@@ -3,13 +3,13 @@
#include <cstdint> #include <cstdint>
#include <queue> #include <queue>
#include "blob_format.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "table/internal_iterator.h" #include "table/internal_iterator.h"
#include "util/file_reader_writer.h"
#include "blob_format.h"
#include "titan/options.h" #include "titan/options.h"
#include "util.h" #include "util.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
...@@ -2,11 +2,11 @@ ...@@ -2,11 +2,11 @@
#include <cinttypes> #include <cinttypes>
#include "util/filename.h"
#include "util/testharness.h"
#include "blob_file_builder.h" #include "blob_file_builder.h"
#include "blob_file_cache.h" #include "blob_file_cache.h"
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "util/filename.h"
#include "util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -57,7 +57,8 @@ class BlobFileIteratorTest : public testing::Test { ...@@ -57,7 +57,8 @@ class BlobFileIteratorTest : public testing::Test {
{ {
std::unique_ptr<WritableFile> f; std::unique_ptr<WritableFile> f;
ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_)); ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_));
writable_file_.reset(new WritableFileWriter(std::move(f), file_name_, env_options_)); writable_file_.reset(
new WritableFileWriter(std::move(f), file_name_, env_options_));
} }
builder_.reset(new BlobFileBuilder(cf_options, writable_file_.get())); builder_.reset(new BlobFileBuilder(cf_options, writable_file_.get()));
} }
......
#pragma once #pragma once
#include "util/file_reader_writer.h"
#include "blob_format.h" #include "blob_format.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
#pragma once #pragma once
#include "util/file_reader_writer.h"
#include "blob_format.h" #include "blob_format.h"
#include "titan/options.h" #include "titan/options.h"
#include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
#pragma once #pragma once
#include "db_impl.h"
#include "rocksdb/listener.h" #include "rocksdb/listener.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "util/coding.h" #include "util/coding.h"
#include "db_impl.h"
#include "version_set.h" #include "version_set.h"
namespace rocksdb { namespace rocksdb {
......
...@@ -40,8 +40,8 @@ class BlobFileSizeCollectorTest : public testing::Test { ...@@ -40,8 +40,8 @@ class BlobFileSizeCollectorTest : public testing::Test {
void NewFileWriter(std::unique_ptr<WritableFileWriter>* result) { void NewFileWriter(std::unique_ptr<WritableFileWriter>* result) {
std::unique_ptr<WritableFile> writable_file; std::unique_ptr<WritableFile> writable_file;
ASSERT_OK(env_->NewWritableFile(file_name_, &writable_file, env_options_)); ASSERT_OK(env_->NewWritableFile(file_name_, &writable_file, env_options_));
result->reset( result->reset(new WritableFileWriter(std::move(writable_file), file_name_,
new WritableFileWriter(std::move(writable_file), file_name_, env_options_)); env_options_));
ASSERT_TRUE(*result); ASSERT_TRUE(*result);
} }
......
#include "util/filename.h"
#include "util/testharness.h"
#include "blob_file_builder.h" #include "blob_file_builder.h"
#include "blob_file_cache.h" #include "blob_file_cache.h"
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "util/filename.h"
#include "util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -31,7 +31,8 @@ class BlobFileTest : public testing::Test { ...@@ -31,7 +31,8 @@ class BlobFileTest : public testing::Test {
{ {
std::unique_ptr<WritableFile> f; std::unique_ptr<WritableFile> f;
ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_)); ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_));
file.reset(new WritableFileWriter(std::move(f), file_name_, env_options_)); file.reset(
new WritableFileWriter(std::move(f), file_name_, env_options_));
} }
std::unique_ptr<BlobFileBuilder> builder( std::unique_ptr<BlobFileBuilder> builder(
new BlobFileBuilder(cf_options, file.get())); new BlobFileBuilder(cf_options, file.get()));
...@@ -91,7 +92,8 @@ class BlobFileTest : public testing::Test { ...@@ -91,7 +92,8 @@ class BlobFileTest : public testing::Test {
{ {
std::unique_ptr<WritableFile> f; std::unique_ptr<WritableFile> f;
ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_)); ASSERT_OK(env_->NewWritableFile(file_name_, &f, env_options_));
file.reset(new WritableFileWriter(std::move(f), file_name_, env_options_)); file.reset(
new WritableFileWriter(std::move(f), file_name_, env_options_));
} }
std::unique_ptr<BlobFileBuilder> builder( std::unique_ptr<BlobFileBuilder> builder(
new BlobFileBuilder(cf_options, file.get())); new BlobFileBuilder(cf_options, file.get()));
......
#include "blob_format.h" #include "blob_format.h"
#include "util/testharness.h"
#include "testutil.h" #include "testutil.h"
#include "util.h" #include "util.h"
#include "util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
...@@ -12,9 +12,7 @@ BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files, ...@@ -12,9 +12,7 @@ BlobGC::BlobGC(std::vector<BlobFileMeta*>&& blob_files,
BlobGC::~BlobGC() {} BlobGC::~BlobGC() {}
void BlobGC::SetColumnFamily(ColumnFamilyHandle* cfh) { void BlobGC::SetColumnFamily(ColumnFamilyHandle* cfh) { cfh_ = cfh; }
cfh_ = cfh;
}
ColumnFamilyData* BlobGC::GetColumnFamilyData() { ColumnFamilyData* BlobGC::GetColumnFamilyData() {
auto* cfhi = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh_); auto* cfhi = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh_);
......
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
#include <memory> #include <memory>
#include "db/column_family.h"
#include "blob_format.h" #include "blob_format.h"
#include "db/column_family.h"
#include "titan/options.h" #include "titan/options.h"
namespace rocksdb { namespace rocksdb {
......
#pragma once #pragma once
#include "db/db_impl.h"
#include "rocksdb/status.h"
#include "blob_file_builder.h" #include "blob_file_builder.h"
#include "blob_file_iterator.h" #include "blob_file_iterator.h"
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "blob_gc.h" #include "blob_gc.h"
#include "db/db_impl.h"
#include "rocksdb/status.h"
#include "titan/options.h" #include "titan/options.h"
#include "version_set.h" #include "version_set.h"
......
#include "blob_gc_job.h" #include "blob_gc_job.h"
#include "util/testharness.h"
#include "blob_gc_picker.h" #include "blob_gc_picker.h"
#include "db_impl.h" #include "db_impl.h"
#include "util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -42,7 +42,9 @@ class BlobGCJobTest : public testing::Test { ...@@ -42,7 +42,9 @@ class BlobGCJobTest : public testing::Test {
~BlobGCJobTest() {} ~BlobGCJobTest() {}
void CheckBlobNumber(int expected) { void CheckBlobNumber(int expected) {
auto b = version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock(); auto b =
version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID())
.lock();
ASSERT_EQ(expected, b->files_.size()); ASSERT_EQ(expected, b->files_.size());
} }
...@@ -182,12 +184,14 @@ class BlobGCJobTest : public testing::Test { ...@@ -182,12 +184,14 @@ class BlobGCJobTest : public testing::Test {
db_->Delete(WriteOptions(), GenKey(i)); db_->Delete(WriteOptions(), GenKey(i));
} }
Flush(); Flush();
auto b = version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock(); auto b =
version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID())
.lock();
ASSERT_EQ(b->files_.size(), 1); ASSERT_EQ(b->files_.size(), 1);
auto old = b->files_.begin()->first; auto old = b->files_.begin()->first;
// for (auto& f : b->files_) { // for (auto& f : b->files_) {
// f.second->marked_for_sample = false; // f.second->marked_for_sample = false;
// } // }
std::unique_ptr<BlobFileIterator> iter; std::unique_ptr<BlobFileIterator> iter;
ASSERT_OK(NewIterator(b->files_.begin()->second->file_number(), ASSERT_OK(NewIterator(b->files_.begin()->second->file_number(),
b->files_.begin()->second->file_size(), &iter)); b->files_.begin()->second->file_size(), &iter));
...@@ -198,7 +202,8 @@ class BlobGCJobTest : public testing::Test { ...@@ -198,7 +202,8 @@ class BlobGCJobTest : public testing::Test {
ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0); ASSERT_TRUE(iter->key().compare(Slice(GenKey(i))) == 0);
} }
RunGC(); RunGC();
b = version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock(); b = version_set_->GetBlobStorage(base_db_->DefaultColumnFamily()->GetID())
.lock();
ASSERT_EQ(b->files_.size(), 1); ASSERT_EQ(b->files_.size(), 1);
auto new1 = b->files_.begin()->first; auto new1 = b->files_.begin()->first;
ASSERT_TRUE(old != new1); ASSERT_TRUE(old != new1);
...@@ -234,7 +239,8 @@ TEST_F(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); } ...@@ -234,7 +239,8 @@ TEST_F(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); }
TEST_F(BlobGCJobTest, RunGC) { TestRunGC(); } TEST_F(BlobGCJobTest, RunGC) { TestRunGC(); }
// Tests blob file will be kept after GC, if it is still visible by active snapshots. // Tests blob file will be kept after GC, if it is still visible by active
// snapshots.
TEST_F(BlobGCJobTest, PurgeBlobs) { TEST_F(BlobGCJobTest, PurgeBlobs) {
NewDB(); NewDB();
......
...@@ -16,7 +16,8 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC( ...@@ -16,7 +16,8 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
uint64_t batch_size = 0; uint64_t batch_size = 0;
// ROCKS_LOG_INFO(db_options_.info_log, "blob file num:%lu gc score:%lu", // ROCKS_LOG_INFO(db_options_.info_log, "blob file num:%lu gc score:%lu",
// blob_storage->NumBlobFiles(), blob_storage->gc_score().size()); // blob_storage->NumBlobFiles(),
// blob_storage->gc_score().size());
for (auto& gc_score : blob_storage->gc_score()) { for (auto& gc_score : blob_storage->gc_score()) {
auto blob_file = blob_storage->FindFile(gc_score.file_number).lock(); auto blob_file = blob_storage->FindFile(gc_score.file_number).lock();
assert(blob_file); assert(blob_file);
......
...@@ -2,14 +2,14 @@ ...@@ -2,14 +2,14 @@
#include <memory> #include <memory>
#include "db/column_family.h"
#include "db/write_callback.h"
#include "rocksdb/status.h"
#include "util/filename.h"
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "blob_format.h" #include "blob_format.h"
#include "blob_gc.h" #include "blob_gc.h"
#include "blob_storage.h" #include "blob_storage.h"
#include "db/column_family.h"
#include "db/write_callback.h"
#include "rocksdb/status.h"
#include "util/filename.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
......
#include "blob_gc_picker.h" #include "blob_gc_picker.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "blob_file_builder.h" #include "blob_file_builder.h"
#include "blob_file_cache.h" #include "blob_file_cache.h"
#include "blob_file_iterator.h" #include "blob_file_iterator.h"
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "util/filename.h"
#include "util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -22,8 +22,10 @@ class BlobGCPickerTest : public testing::Test { ...@@ -22,8 +22,10 @@ class BlobGCPickerTest : public testing::Test {
const TitanCFOptions& titan_cf_options) { const TitanCFOptions& titan_cf_options) {
auto blob_file_cache = std::make_shared<BlobFileCache>( auto blob_file_cache = std::make_shared<BlobFileCache>(
titan_db_options, titan_cf_options, NewLRUCache(128)); titan_db_options, titan_cf_options, NewLRUCache(128));
blob_storage_.reset(new BlobStorage(titan_db_options, titan_cf_options, blob_file_cache)); blob_storage_.reset(
basic_blob_gc_picker_.reset(new BasicBlobGCPicker(titan_db_options, titan_cf_options)); new BlobStorage(titan_db_options, titan_cf_options, blob_file_cache));
basic_blob_gc_picker_.reset(
new BasicBlobGCPicker(titan_db_options, titan_cf_options));
} }
void AddBlobFile(uint64_t file_number, uint64_t file_size, void AddBlobFile(uint64_t file_number, uint64_t file_size,
......
...@@ -37,7 +37,7 @@ std::weak_ptr<BlobFileMeta> BlobStorage::FindFile(uint64_t file_number) const { ...@@ -37,7 +37,7 @@ std::weak_ptr<BlobFileMeta> BlobStorage::FindFile(uint64_t file_number) const {
void BlobStorage::ExportBlobFiles( void BlobStorage::ExportBlobFiles(
std::map<uint64_t, std::weak_ptr<BlobFileMeta>>& ret) const { std::map<uint64_t, std::weak_ptr<BlobFileMeta>>& ret) const {
ReadLock rl(&mutex_); ReadLock rl(&mutex_);
for(auto& kv : files_) { for (auto& kv : files_) {
ret.emplace(kv.first, std::weak_ptr<BlobFileMeta>(kv.second)); ret.emplace(kv.first, std::weak_ptr<BlobFileMeta>(kv.second));
} }
} }
...@@ -47,13 +47,16 @@ void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) { ...@@ -47,13 +47,16 @@ void BlobStorage::AddBlobFile(std::shared_ptr<BlobFileMeta>& file) {
files_.emplace(std::make_pair(file->file_number(), file)); files_.emplace(std::make_pair(file->file_number(), file));
} }
void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, SequenceNumber obsolete_sequence) { void BlobStorage::MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
SequenceNumber obsolete_sequence) {
WriteLock wl(&mutex_); WriteLock wl(&mutex_);
obsolete_files_.push_back(std::make_pair(file->file_number(), obsolete_sequence)); obsolete_files_.push_back(
std::make_pair(file->file_number(), obsolete_sequence));
file->FileStateTransit(BlobFileMeta::FileEvent::kDelete); file->FileStateTransit(BlobFileMeta::FileEvent::kDelete);
} }
void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence) { void BlobStorage::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) {
WriteLock wl(&mutex_); WriteLock wl(&mutex_);
for (auto it = obsolete_files_.begin(); it != obsolete_files_.end();) { for (auto it = obsolete_files_.begin(); it != obsolete_files_.end();) {
...@@ -96,8 +99,7 @@ void BlobStorage::ComputeGCScore() { ...@@ -96,8 +99,7 @@ void BlobStorage::ComputeGCScore() {
gc_score_.push_back({}); gc_score_.push_back({});
auto& gcs = gc_score_.back(); auto& gcs = gc_score_.back();
gcs.file_number = file.first; gcs.file_number = file.first;
if (file.second->file_size() < if (file.second->file_size() < cf_options_.merge_small_file_threshold) {
cf_options_.merge_small_file_threshold) {
gcs.score = 1; gcs.score = 1;
} else { } else {
gcs.score = file.second->GetDiscardableRatio(); gcs.score = file.second->GetDiscardableRatio();
...@@ -111,6 +113,5 @@ void BlobStorage::ComputeGCScore() { ...@@ -111,6 +113,5 @@ void BlobStorage::ComputeGCScore() {
}); });
} }
} // namespace titandb } // namespace titandb
} // namespace rocksdb } // namespace rocksdb
...@@ -2,10 +2,10 @@ ...@@ -2,10 +2,10 @@
#include <inttypes.h> #include <inttypes.h>
#include "rocksdb/options.h"
#include "blob_file_cache.h" #include "blob_file_cache.h"
#include "blob_format.h" #include "blob_format.h"
#include "blob_gc.h" #include "blob_gc.h"
#include "rocksdb/options.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -21,12 +21,16 @@ class BlobStorage { ...@@ -21,12 +21,16 @@ class BlobStorage {
this->cf_options_ = bs.cf_options_; this->cf_options_ = bs.cf_options_;
} }
BlobStorage(const TitanDBOptions& _db_options, const TitanCFOptions& _cf_options, BlobStorage(const TitanDBOptions& _db_options,
const TitanCFOptions& _cf_options,
std::shared_ptr<BlobFileCache> _file_cache) std::shared_ptr<BlobFileCache> _file_cache)
: db_options_(_db_options), cf_options_(_cf_options), file_cache_(_file_cache), destroyed_(false) {} : db_options_(_db_options),
cf_options_(_cf_options),
file_cache_(_file_cache),
destroyed_(false) {}
~BlobStorage() { ~BlobStorage() {
for (auto& file: files_) { for (auto& file : files_) {
file_cache_->Evict(file.second->file_number()); file_cache_->Evict(file.second->file_number());
} }
} }
...@@ -78,9 +82,11 @@ class BlobStorage { ...@@ -78,9 +82,11 @@ class BlobStorage {
void AddBlobFile(std::shared_ptr<BlobFileMeta>& file); void AddBlobFile(std::shared_ptr<BlobFileMeta>& file);
void GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence); void GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence);
void MarkFileObsolete(std::shared_ptr<BlobFileMeta> file, SequenceNumber obsolete_sequence); void MarkFileObsolete(std::shared_ptr<BlobFileMeta> file,
SequenceNumber obsolete_sequence);
private: private:
friend class VersionSet; friend class VersionSet;
...@@ -103,7 +109,8 @@ class BlobStorage { ...@@ -103,7 +109,8 @@ class BlobStorage {
std::list<std::pair<uint64_t, SequenceNumber>> obsolete_files_; std::list<std::pair<uint64_t, SequenceNumber>> obsolete_files_;
// It is marked when the column family handle is destroyed, indicating the // It is marked when the column family handle is destroyed, indicating the
// in-memory data structure can be destroyed. Physical files may still be kept. // in-memory data structure can be destroyed. Physical files may still be
// kept.
bool destroyed_; bool destroyed_;
}; };
......
...@@ -287,7 +287,7 @@ Status TitanDBImpl::CreateColumnFamilies( ...@@ -287,7 +287,7 @@ Status TitanDBImpl::CreateColumnFamilies(
Status TitanDBImpl::DropColumnFamilies( Status TitanDBImpl::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& handles) { const std::vector<ColumnFamilyHandle*>& handles) {
std::vector<uint32_t> column_families; std::vector<uint32_t> column_families;
for (auto& handle: handles) { for (auto& handle : handles) {
column_families.emplace_back(handle->GetID()); column_families.emplace_back(handle->GetID());
} }
Status s = db_impl_->DropColumnFamilies(handles); Status s = db_impl_->DropColumnFamilies(handles);
...@@ -299,7 +299,8 @@ Status TitanDBImpl::DropColumnFamilies( ...@@ -299,7 +299,8 @@ Status TitanDBImpl::DropColumnFamilies(
return s; return s;
} }
Status TitanDBImpl::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) { Status TitanDBImpl::DestroyColumnFamilyHandle(
ColumnFamilyHandle* column_family) {
auto cf_id = column_family->GetID(); auto cf_id = column_family->GetID();
Status s = db_impl_->DestroyColumnFamilyHandle(column_family); Status s = db_impl_->DestroyColumnFamilyHandle(column_family);
...@@ -365,7 +366,8 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, ...@@ -365,7 +366,8 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
s = storage->Get(options, index, &record, &buffer); s = storage->Get(options, index, &record, &buffer);
if (s.IsCorruption()) { if (s.IsCorruption()) {
ROCKS_LOG_DEBUG(db_options_.info_log, "Key:%s Snapshot:%" PRIu64 " GetBlobFile err:%s\n", ROCKS_LOG_DEBUG(db_options_.info_log,
"Key:%s Snapshot:%" PRIu64 " GetBlobFile err:%s\n",
key.ToString(true).c_str(), key.ToString(true).c_str(),
options.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber(),
s.ToString().c_str()); s.ToString().c_str());
...@@ -432,8 +434,8 @@ Iterator* TitanDBImpl::NewIteratorImpl( ...@@ -432,8 +434,8 @@ Iterator* TitanDBImpl::NewIteratorImpl(
mutex_.Unlock(); mutex_.Unlock();
std::unique_ptr<ArenaWrappedDBIter> iter(db_impl_->NewIteratorImpl( std::unique_ptr<ArenaWrappedDBIter> iter(db_impl_->NewIteratorImpl(
options, cfd, options.snapshot->GetSequenceNumber(), nullptr /*read_callback*/, options, cfd, options.snapshot->GetSequenceNumber(),
true /*allow_blob*/, true /*allow_refresh*/)); nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/));
return new TitanDBIterator(options, storage.lock().get(), snapshot, return new TitanDBIterator(options, storage.lock().get(), snapshot,
std::move(iter)); std::move(iter));
} }
...@@ -456,9 +458,7 @@ Status TitanDBImpl::NewIterators( ...@@ -456,9 +458,7 @@ Status TitanDBImpl::NewIterators(
return Status::OK(); return Status::OK();
} }
const Snapshot* TitanDBImpl::GetSnapshot() { const Snapshot* TitanDBImpl::GetSnapshot() { return db_->GetSnapshot(); }
return db_->GetSnapshot();
}
void TitanDBImpl::ReleaseSnapshot(const Snapshot* snapshot) { void TitanDBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// TODO: // TODO:
......
#pragma once #pragma once
#include "blob_file_manager.h"
#include "db/db_impl.h" #include "db/db_impl.h"
#include "util/repeatable_thread.h"
#include "titan/db.h" #include "titan/db.h"
#include "blob_file_manager.h" #include "util/repeatable_thread.h"
#include "version_set.h" #include "version_set.h"
namespace rocksdb { namespace rocksdb {
...@@ -71,6 +71,7 @@ class TitanDBImpl : public TitanDB { ...@@ -71,6 +71,7 @@ class TitanDBImpl : public TitanDB {
void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info); void OnCompactionCompleted(const CompactionJobInfo& compaction_job_info);
void StartBackgroundTasks(); void StartBackgroundTasks();
private: private:
class FileManager; class FileManager;
friend class FileManager; friend class FileManager;
......
...@@ -25,7 +25,7 @@ class TitanDBIterator : public Iterator { ...@@ -25,7 +25,7 @@ class TitanDBIterator : public Iterator {
Status status() const override { Status status() const override {
// assume volatile inner iter // assume volatile inner iter
if(status_.ok()) { if (status_.ok()) {
return iter_->status(); return iter_->status();
} else { } else {
return status_; return status_;
......
#pragma once #pragma once
#include "table/table_builder.h"
#include "blob_file_builder.h" #include "blob_file_builder.h"
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "table/table_builder.h"
#include "titan/options.h" #include "titan/options.h"
namespace rocksdb { namespace rocksdb {
......
#include "table/table_builder.h" #include "table/table_builder.h"
#include "table/table_reader.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "blob_file_reader.h" #include "blob_file_reader.h"
#include "table/table_reader.h"
#include "table_factory.h" #include "table_factory.h"
#include "util/filename.h"
#include "util/testharness.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -80,7 +80,8 @@ class TableBuilderTest : public testing::Test { ...@@ -80,7 +80,8 @@ class TableBuilderTest : public testing::Test {
db_options_.dirname = tmpdir_; db_options_.dirname = tmpdir_;
cf_options_.min_blob_size = kMinBlobSize; cf_options_.min_blob_size = kMinBlobSize;
blob_manager_.reset(new FileManager(db_options_)); blob_manager_.reset(new FileManager(db_options_));
table_factory_.reset(new TitanTableFactory(db_options_, cf_options_, blob_manager_)); table_factory_.reset(
new TitanTableFactory(db_options_, cf_options_, blob_manager_));
} }
~TableBuilderTest() { ~TableBuilderTest() {
......
#pragma once #pragma once
#include "rocksdb/table.h"
#include "blob_file_manager.h" #include "blob_file_manager.h"
#include "rocksdb/table.h"
#include "titan/options.h" #include "titan/options.h"
namespace rocksdb { namespace rocksdb {
......
#include <inttypes.h> #include <inttypes.h>
#include <options/cf_options.h> #include <options/cf_options.h>
#include "titan/db.h" #include "blob_file_iterator.h"
#include "blob_file_reader.h"
#include "db_impl.h" #include "db_impl.h"
#include "db_iter.h"
#include "titan/db.h"
#include "titan_fault_injection_test_env.h" #include "titan_fault_injection_test_env.h"
#include "util/filename.h" #include "util/filename.h"
#include "util/random.h" #include "util/random.h"
#include "util/testharness.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "blob_file_reader.h" #include "util/testharness.h"
#include "blob_file_iterator.h"
#include "db_iter.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -113,14 +113,16 @@ class TitanDBTest : public testing::Test { ...@@ -113,14 +113,16 @@ class TitanDBTest : public testing::Test {
} }
} }
std::weak_ptr<BlobStorage> GetBlobStorage(ColumnFamilyHandle* cf_handle = nullptr) { std::weak_ptr<BlobStorage> GetBlobStorage(
if(cf_handle == nullptr) { ColumnFamilyHandle* cf_handle = nullptr) {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily(); cf_handle = db_->DefaultColumnFamily();
} }
return db_impl_->vset_->GetBlobStorage(cf_handle->GetID()); return db_impl_->vset_->GetBlobStorage(cf_handle->GetID());
} }
void VerifyDB(const std::map<std::string, std::string>& data, ReadOptions ropts = ReadOptions()) { void VerifyDB(const std::map<std::string, std::string>& data,
ReadOptions ropts = ReadOptions()) {
db_impl_->PurgeObsoleteFiles(); db_impl_->PurgeObsoleteFiles();
for (auto& kv : data) { for (auto& kv : data) {
...@@ -156,8 +158,7 @@ class TitanDBTest : public testing::Test { ...@@ -156,8 +158,7 @@ class TitanDBTest : public testing::Test {
} }
} }
void VerifyBlob( void VerifyBlob(uint64_t file_number,
uint64_t file_number,
const std::map<std::string, std::string>& data) { const std::map<std::string, std::string>& data) {
// Open blob file and iterate in-file records // Open blob file and iterate in-file records
EnvOptions env_opt; EnvOptions env_opt;
...@@ -166,16 +167,12 @@ class TitanDBTest : public testing::Test { ...@@ -166,16 +167,12 @@ class TitanDBTest : public testing::Test {
std::unique_ptr<RandomAccessFileReader> readable_file; std::unique_ptr<RandomAccessFileReader> readable_file;
std::string file_name = BlobFileName(options_.dirname, file_number); std::string file_name = BlobFileName(options_.dirname, file_number);
ASSERT_OK(env_->GetFileSize(file_name, &file_size)); ASSERT_OK(env_->GetFileSize(file_name, &file_size));
NewBlobFileReader(file_number, 0, options_, env_opt, env_, NewBlobFileReader(file_number, 0, options_, env_opt, env_, &readable_file);
&readable_file); BlobFileIterator iter(std::move(readable_file), file_number, file_size,
BlobFileIterator iter(std::move(readable_file), options_);
file_number,
file_size,
options_
);
iter.SeekToFirst(); iter.SeekToFirst();
for(auto& kv : data) { for (auto& kv : data) {
if(kv.second.size() < options_.min_blob_size) { if (kv.second.size() < options_.min_blob_size) {
continue; continue;
} }
ASSERT_EQ(iter.Valid(), true); ASSERT_EQ(iter.Valid(), true);
...@@ -360,7 +357,7 @@ TEST_F(TitanDBTest, IngestExternalFiles) { ...@@ -360,7 +357,7 @@ TEST_F(TitanDBTest, IngestExternalFiles) {
VerifyDB(total_data); VerifyDB(total_data);
Flush(); Flush();
VerifyDB(total_data); VerifyDB(total_data);
for(auto& handle : cf_handles_) { for (auto& handle : cf_handles_) {
auto blob = GetBlobStorage(handle); auto blob = GetBlobStorage(handle);
ASSERT_EQ(1, blob.lock()->NumBlobFiles()); ASSERT_EQ(1, blob.lock()->NumBlobFiles());
} }
...@@ -368,7 +365,7 @@ TEST_F(TitanDBTest, IngestExternalFiles) { ...@@ -368,7 +365,7 @@ TEST_F(TitanDBTest, IngestExternalFiles) {
CompactRangeOptions copt; CompactRangeOptions copt;
ASSERT_OK(db_->CompactRange(copt, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(copt, nullptr, nullptr));
VerifyDB(total_data); VerifyDB(total_data);
for(auto& handle : cf_handles_) { for (auto& handle : cf_handles_) {
auto blob = GetBlobStorage(handle); auto blob = GetBlobStorage(handle);
ASSERT_EQ(2, blob.lock()->NumBlobFiles()); ASSERT_EQ(2, blob.lock()->NumBlobFiles());
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files; std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
...@@ -376,7 +373,7 @@ TEST_F(TitanDBTest, IngestExternalFiles) { ...@@ -376,7 +373,7 @@ TEST_F(TitanDBTest, IngestExternalFiles) {
ASSERT_EQ(2, blob_files.size()); ASSERT_EQ(2, blob_files.size());
auto bf = blob_files.begin(); auto bf = blob_files.begin();
VerifyBlob(bf->first, original_data); VerifyBlob(bf->first, original_data);
bf ++; bf++;
VerifyBlob(bf->first, ingested_data); VerifyBlob(bf->first, ingested_data);
} }
} }
...@@ -384,19 +381,20 @@ TEST_F(TitanDBTest, IngestExternalFiles) { ...@@ -384,19 +381,20 @@ TEST_F(TitanDBTest, IngestExternalFiles) {
TEST_F(TitanDBTest, DropColumnFamily) { TEST_F(TitanDBTest, DropColumnFamily) {
Open(); Open();
const uint64_t kNumCF = 3; const uint64_t kNumCF = 3;
for(uint64_t i = 1; i <= kNumCF; i++) { for (uint64_t i = 1; i <= kNumCF; i++) {
AddCF(std::to_string(i)); AddCF(std::to_string(i));
} }
const uint64_t kNumEntries = 100; const uint64_t kNumEntries = 100;
std::map<std::string, std::string> data; std::map<std::string, std::string> data;
for(uint64_t i = 1; i <= kNumEntries; i++) { for (uint64_t i = 1; i <= kNumEntries; i++) {
Put(i, &data); Put(i, &data);
} }
VerifyDB(data); VerifyDB(data);
Flush(); Flush();
VerifyDB(data); VerifyDB(data);
// Destroy column families handle, check whether the data is preserved after a round of GC and restart. // Destroy column families handle, check whether the data is preserved after a
// round of GC and restart.
for (auto& handle : cf_handles_) { for (auto& handle : cf_handles_) {
db_->DestroyColumnFamilyHandle(handle); db_->DestroyColumnFamilyHandle(handle);
} }
...@@ -405,14 +403,14 @@ TEST_F(TitanDBTest, DropColumnFamily) { ...@@ -405,14 +403,14 @@ TEST_F(TitanDBTest, DropColumnFamily) {
Reopen(); Reopen();
VerifyDB(data); VerifyDB(data);
for(auto& handle : cf_handles_) { for (auto& handle : cf_handles_) {
// we can't drop default column family // we can't drop default column family
if (handle->GetName() == kDefaultColumnFamilyName) { if (handle->GetName() == kDefaultColumnFamilyName) {
continue; continue;
} }
ASSERT_OK(db_->DropColumnFamily(handle)); ASSERT_OK(db_->DropColumnFamily(handle));
// The data is actually deleted only after destroying all outstanding column family handles, // The data is actually deleted only after destroying all outstanding column
// so we can still read from the dropped column family. // family handles, so we can still read from the dropped column family.
VerifyDB(data); VerifyDB(data);
} }
...@@ -437,17 +435,13 @@ TEST_F(TitanDBTest, BlobFileIOError) { ...@@ -437,17 +435,13 @@ TEST_F(TitanDBTest, BlobFileIOError) {
ASSERT_OK(db_->CompactRange(copts, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(copts, nullptr, nullptr));
VerifyDB(data); VerifyDB(data);
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack("BlobFileReader::Get", [&](void*) {
"BlobFileReader::Get", [&](void *) { mock_env->SetFilesystemActive(false, Status::IOError("Injected error"));
mock_env->SetFilesystemActive(
false,
Status::IOError("Injected error")
);
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
for(auto& it : data) { for (auto& it : data) {
std::string value; std::string value;
if(it.second.size() > options_.min_blob_size) { if (it.second.size() > options_.min_blob_size) {
ASSERT_TRUE(db_->Get(ReadOptions(), it.first, &value).IsIOError()); ASSERT_TRUE(db_->Get(ReadOptions(), it.first, &value).IsIOError());
mock_env->SetFilesystemActive(true); mock_env->SetFilesystemActive(true);
} }
...@@ -499,12 +493,9 @@ TEST_F(TitanDBTest, FlushWriteIOErrorHandling) { ...@@ -499,12 +493,9 @@ TEST_F(TitanDBTest, FlushWriteIOErrorHandling) {
// no compaction to enable Flush // no compaction to enable Flush
VerifyDB(data); VerifyDB(data);
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
"FlushJob::Start", [&](void *) { mock_env->SetFilesystemActive(false,
mock_env->SetFilesystemActive( Status::IOError("FlushJob injected error"));
false,
Status::IOError("FlushJob injected error")
);
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
FlushOptions fopts; FlushOptions fopts;
...@@ -543,11 +534,9 @@ TEST_F(TitanDBTest, CompactionWriteIOErrorHandling) { ...@@ -543,11 +534,9 @@ TEST_F(TitanDBTest, CompactionWriteIOErrorHandling) {
VerifyDB(data); VerifyDB(data);
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"BackgroundCallCompaction:0", [&](void *) { "BackgroundCallCompaction:0", [&](void*) {
mock_env->SetFilesystemActive( mock_env->SetFilesystemActive(
false, false, Status::IOError("Compaction injected error"));
Status::IOError("Compaction injected error")
);
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
ASSERT_TRUE(db_->CompactRange(copts, nullptr, nullptr).IsIOError()); ASSERT_TRUE(db_->CompactRange(copts, nullptr, nullptr).IsIOError());
...@@ -590,7 +579,7 @@ TEST_F(TitanDBTest, BlobFileCorruptionErrorHandling) { ...@@ -590,7 +579,7 @@ TEST_F(TitanDBTest, BlobFileCorruptionErrorHandling) {
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
for (auto& it : data) { for (auto& it : data) {
std::string value; std::string value;
if(it.second.size() < options_.min_blob_size) { if (it.second.size() < options_.min_blob_size) {
continue; continue;
} }
ASSERT_TRUE(db_->Get(ReadOptions(), it.first, &value).IsCorruption()); ASSERT_TRUE(db_->Get(ReadOptions(), it.first, &value).IsCorruption());
......
...@@ -14,27 +14,23 @@ class TitanTestRandomAccessFile : public RandomAccessFile { ...@@ -14,27 +14,23 @@ class TitanTestRandomAccessFile : public RandomAccessFile {
public: public:
explicit TitanTestRandomAccessFile(std::unique_ptr<RandomAccessFile>&& f, explicit TitanTestRandomAccessFile(std::unique_ptr<RandomAccessFile>&& f,
TitanFaultInjectionTestEnv* env) TitanFaultInjectionTestEnv* env)
: target_(std::move(f)), : target_(std::move(f)), env_(env) {
env_(env) {
assert(target_ != nullptr); assert(target_ != nullptr);
} }
virtual ~TitanTestRandomAccessFile() { } virtual ~TitanTestRandomAccessFile() {}
Status Read(uint64_t offset, size_t n, Slice* result, Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override; char* scratch) const override;
Status Prefetch(uint64_t offset, size_t n) override; Status Prefetch(uint64_t offset, size_t n) override;
size_t GetUniqueId(char* id, size_t max_size) const override { size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size); return target_->GetUniqueId(id, max_size);
} }
void Hint(AccessPattern pattern) override { void Hint(AccessPattern pattern) override { return target_->Hint(pattern); }
return target_->Hint(pattern); bool use_direct_io() const override { return target_->use_direct_io(); }
}
bool use_direct_io() const override {
return target_->use_direct_io();
}
size_t GetRequiredBufferAlignment() const override { size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment(); return target_->GetRequiredBufferAlignment();
} }
Status InvalidateCache(size_t offset, size_t length) override; Status InvalidateCache(size_t offset, size_t length) override;
private: private:
std::unique_ptr<RandomAccessFile> target_; std::unique_ptr<RandomAccessFile> target_;
TitanFaultInjectionTestEnv* env_; TitanFaultInjectionTestEnv* env_;
...@@ -42,9 +38,8 @@ class TitanTestRandomAccessFile : public RandomAccessFile { ...@@ -42,9 +38,8 @@ class TitanTestRandomAccessFile : public RandomAccessFile {
class TitanFaultInjectionTestEnv : public FaultInjectionTestEnv { class TitanFaultInjectionTestEnv : public FaultInjectionTestEnv {
public: public:
TitanFaultInjectionTestEnv(Env* t) TitanFaultInjectionTestEnv(Env* t) : FaultInjectionTestEnv(t) {}
: FaultInjectionTestEnv(t) { } virtual ~TitanFaultInjectionTestEnv() {}
virtual ~TitanFaultInjectionTestEnv() { }
Status NewRandomAccessFile(const std::string& fname, Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result, std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) { const EnvOptions& soptions) {
...@@ -59,23 +54,24 @@ class TitanFaultInjectionTestEnv : public FaultInjectionTestEnv { ...@@ -59,23 +54,24 @@ class TitanFaultInjectionTestEnv : public FaultInjectionTestEnv {
} }
}; };
Status TitanTestRandomAccessFile::Read(uint64_t offset, size_t n, Status TitanTestRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
Slice* result, char* scratch) const { char* scratch) const {
if(!env_->IsFilesystemActive()) { if (!env_->IsFilesystemActive()) {
return env_->GetError(); return env_->GetError();
} }
return target_->Read(offset, n, result, scratch); return target_->Read(offset, n, result, scratch);
} }
Status TitanTestRandomAccessFile::Prefetch(uint64_t offset, size_t n) { Status TitanTestRandomAccessFile::Prefetch(uint64_t offset, size_t n) {
if(!env_->IsFilesystemActive()) { if (!env_->IsFilesystemActive()) {
return env_->GetError(); return env_->GetError();
} }
return target_->Prefetch(offset, n); return target_->Prefetch(offset, n);
} }
Status TitanTestRandomAccessFile::InvalidateCache(size_t offset, size_t length) { Status TitanTestRandomAccessFile::InvalidateCache(size_t offset,
if(!env_->IsFilesystemActive()) { size_t length) {
if (!env_->IsFilesystemActive()) {
return env_->GetError(); return env_->GetError();
} }
return target_->InvalidateCache(offset, length); return target_->InvalidateCache(offset, length);
......
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
#include <set> #include <set>
#include "rocksdb/slice.h"
#include "blob_format.h" #include "blob_format.h"
#include "rocksdb/slice.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -21,7 +21,8 @@ class VersionEdit { ...@@ -21,7 +21,8 @@ class VersionEdit {
added_files_.push_back(file); added_files_.push_back(file);
} }
void DeleteBlobFile(uint64_t file_number, SequenceNumber obsolete_sequence = 0) { void DeleteBlobFile(uint64_t file_number,
SequenceNumber obsolete_sequence = 0) {
deleted_files_.emplace_back(std::make_pair(file_number, obsolete_sequence)); deleted_files_.emplace_back(std::make_pair(file_number, obsolete_sequence));
} }
......
...@@ -210,10 +210,10 @@ Status VersionSet::Apply(VersionEdit* edit) { ...@@ -210,10 +210,10 @@ Status VersionSet::Apply(VersionEdit* edit) {
auto cf_id = edit->column_family_id_; auto cf_id = edit->column_family_id_;
auto it = column_families_.find(cf_id); auto it = column_families_.find(cf_id);
if (it == column_families_.end()) { if (it == column_families_.end()) {
// TODO: support OpenForReadOnly which doesn't open DB with all column family // TODO: support OpenForReadOnly which doesn't open DB with all column
// so there are maybe some invalid column family, but we can't just skip it // family so there are maybe some invalid column family, but we can't just
// otherwise blob files of the non-open column families will be regarded as // skip it otherwise blob files of the non-open column families will be
// obsolete and deleted. // regarded as obsolete and deleted.
return Status::OK(); return Status::OK();
} }
auto& files = it->second->files_; auto& files = it->second->files_;
...@@ -225,7 +225,8 @@ Status VersionSet::Apply(VersionEdit* edit) { ...@@ -225,7 +225,8 @@ Status VersionSet::Apply(VersionEdit* edit) {
fprintf(stderr, "blob file %" PRIu64 " doesn't exist before\n", number); fprintf(stderr, "blob file %" PRIu64 " doesn't exist before\n", number);
abort(); abort();
} else if (blob_it->second->is_obsolete()) { } else if (blob_it->second->is_obsolete()) {
fprintf(stderr, "blob file %" PRIu64 " has been deleted before\n", number); fprintf(stderr, "blob file %" PRIu64 " has been deleted before\n",
number);
abort(); abort();
} }
it->second->MarkFileObsolete(blob_it->second, file.second); it->second->MarkFileObsolete(blob_it->second, file.second);
...@@ -236,9 +237,11 @@ Status VersionSet::Apply(VersionEdit* edit) { ...@@ -236,9 +237,11 @@ Status VersionSet::Apply(VersionEdit* edit) {
auto blob_it = files.find(number); auto blob_it = files.find(number);
if (blob_it != files.end()) { if (blob_it != files.end()) {
if (blob_it->second->is_obsolete()) { if (blob_it->second->is_obsolete()) {
fprintf(stderr, "blob file %" PRIu64 " has been deleted before\n", number); fprintf(stderr, "blob file %" PRIu64 " has been deleted before\n",
number);
} else { } else {
fprintf(stderr, "blob file %" PRIu64 " has been added before\n", number); fprintf(stderr, "blob file %" PRIu64 " has been added before\n",
number);
} }
abort(); abort();
} }
...@@ -249,23 +252,27 @@ Status VersionSet::Apply(VersionEdit* edit) { ...@@ -249,23 +252,27 @@ Status VersionSet::Apply(VersionEdit* edit) {
return Status::OK(); return Status::OK();
} }
void VersionSet::AddColumnFamilies(const std::map<uint32_t, TitanCFOptions>& column_families) { void VersionSet::AddColumnFamilies(
const std::map<uint32_t, TitanCFOptions>& column_families) {
for (auto& cf : column_families) { for (auto& cf : column_families) {
auto file_cache = auto file_cache =
std::make_shared<BlobFileCache>(db_options_, cf.second, file_cache_); std::make_shared<BlobFileCache>(db_options_, cf.second, file_cache_);
auto blob_storage = std::make_shared<BlobStorage>(db_options_, cf.second, file_cache); auto blob_storage =
std::make_shared<BlobStorage>(db_options_, cf.second, file_cache);
column_families_.emplace(cf.first, blob_storage); column_families_.emplace(cf.first, blob_storage);
} }
} }
Status VersionSet::DropColumnFamilies(const std::vector<uint32_t>& column_families, SequenceNumber obsolete_sequence) { Status VersionSet::DropColumnFamilies(
const std::vector<uint32_t>& column_families,
SequenceNumber obsolete_sequence) {
Status s; Status s;
for (auto& cf_id : column_families) { for (auto& cf_id : column_families) {
auto it = column_families_.find(cf_id); auto it = column_families_.find(cf_id);
if (it != column_families_.end()) { if (it != column_families_.end()) {
VersionEdit edit; VersionEdit edit;
edit.SetColumnFamilyID(it->first); edit.SetColumnFamilyID(it->first);
for (auto& file: it->second->files_) { for (auto& file : it->second->files_) {
ROCKS_LOG_INFO(db_options_.info_log, "Titan add obsolete file [%llu]", ROCKS_LOG_INFO(db_options_.info_log, "Titan add obsolete file [%llu]",
file.second->file_number()); file.second->file_number());
edit.DeleteBlobFile(file.first, obsolete_sequence); edit.DeleteBlobFile(file.first, obsolete_sequence);
...@@ -273,8 +280,8 @@ Status VersionSet::DropColumnFamilies(const std::vector<uint32_t>& column_famili ...@@ -273,8 +280,8 @@ Status VersionSet::DropColumnFamilies(const std::vector<uint32_t>& column_famili
s = LogAndApply(&edit); s = LogAndApply(&edit);
if (!s.ok()) return s; if (!s.ok()) return s;
} else { } else {
ROCKS_LOG_ERROR(db_options_.info_log, ROCKS_LOG_ERROR(db_options_.info_log, "column %u not found for drop\n",
"column %u not found for drop\n", cf_id); cf_id);
return Status::NotFound("invalid column family"); return Status::NotFound("invalid column family");
} }
obsolete_columns_.insert(cf_id); obsolete_columns_.insert(cf_id);
...@@ -292,17 +299,18 @@ Status VersionSet::DestroyColumnFamily(uint32_t cf_id) { ...@@ -292,17 +299,18 @@ Status VersionSet::DestroyColumnFamily(uint32_t cf_id) {
} }
return Status::OK(); return Status::OK();
} }
ROCKS_LOG_ERROR(db_options_.info_log, ROCKS_LOG_ERROR(db_options_.info_log, "column %u not found for destroy\n",
"column %u not found for destroy\n", cf_id); cf_id);
return Status::NotFound("invalid column family"); return Status::NotFound("invalid column family");
} }
void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence) { void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence) {
for (auto it = column_families_.begin(); it != column_families_.end();) { for (auto it = column_families_.begin(); it != column_families_.end();) {
auto& cf_id = it->first; auto& cf_id = it->first;
auto& blob_storage = it->second; auto& blob_storage = it->second;
// In the case of dropping column family, obsolete blob files can be deleted only // In the case of dropping column family, obsolete blob files can be deleted
// after the column family handle is destroyed. // only after the column family handle is destroyed.
if (obsolete_columns_.find(cf_id) != obsolete_columns_.end()) { if (obsolete_columns_.find(cf_id) != obsolete_columns_.end()) {
++it; ++it;
continue; continue;
...@@ -310,7 +318,8 @@ void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files, Sequ ...@@ -310,7 +318,8 @@ void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files, Sequ
blob_storage->GetObsoleteFiles(obsolete_files, oldest_sequence); blob_storage->GetObsoleteFiles(obsolete_files, oldest_sequence);
// Cleanup obsolete column family when all the blob files for that are deleted. // Cleanup obsolete column family when all the blob files for that are
// deleted.
if (blob_storage->MaybeRemove()) { if (blob_storage->MaybeRemove()) {
it = column_families_.erase(it); it = column_families_.erase(it);
continue; continue;
...@@ -318,7 +327,8 @@ void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files, Sequ ...@@ -318,7 +327,8 @@ void VersionSet::GetObsoleteFiles(std::vector<std::string>* obsolete_files, Sequ
++it; ++it;
} }
obsolete_files->insert(obsolete_files->end(), obsolete_manifests_.begin(), obsolete_manifests_.end()); obsolete_files->insert(obsolete_files->end(), obsolete_manifests_.begin(),
obsolete_manifests_.end());
obsolete_manifests_.clear(); obsolete_manifests_.clear();
} }
......
...@@ -5,16 +5,16 @@ ...@@ -5,16 +5,16 @@
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include "blob_file_cache.h"
#include "blob_storage.h"
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "port/port_posix.h" #include "port/port_posix.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "util/mutexlock.h"
#include "blob_file_cache.h"
#include "titan/options.h" #include "titan/options.h"
#include "util/mutexlock.h"
#include "version_edit.h" #include "version_edit.h"
#include "blob_storage.h"
namespace rocksdb { namespace rocksdb {
namespace titandb { namespace titandb {
...@@ -43,7 +43,8 @@ class VersionSet { ...@@ -43,7 +43,8 @@ class VersionSet {
// Drops some column families. The obsolete files will be deleted in // Drops some column families. The obsolete files will be deleted in
// background when they will not be accessed anymore. // background when they will not be accessed anymore.
// REQUIRES: mutex is held // REQUIRES: mutex is held
Status DropColumnFamilies(const std::vector<uint32_t>& handles, SequenceNumber obsolete_sequence); Status DropColumnFamilies(const std::vector<uint32_t>& handles,
SequenceNumber obsolete_sequence);
// Destroy the column family. Only after this is called, the obsolete files // Destroy the column family. Only after this is called, the obsolete files
// of the dropped column family can be physical deleted. // of the dropped column family can be physical deleted.
...@@ -63,7 +64,8 @@ class VersionSet { ...@@ -63,7 +64,8 @@ class VersionSet {
} }
// REQUIRES: mutex is held // REQUIRES: mutex is held
void GetObsoleteFiles(std::vector<std::string>* obsolete_files, SequenceNumber oldest_sequence); void GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence);
// REQUIRES: mutex is held // REQUIRES: mutex is held
void MarkAllFilesForGC() { void MarkAllFilesForGC() {
...@@ -71,6 +73,7 @@ class VersionSet { ...@@ -71,6 +73,7 @@ class VersionSet {
cf.second->MarkAllFilesForGC(); cf.second->MarkAllFilesForGC();
} }
} }
private: private:
friend class BlobFileSizeCollectorTest; friend class BlobFileSizeCollectorTest;
friend class VersionTest; friend class VersionTest;
...@@ -91,10 +94,12 @@ class VersionSet { ...@@ -91,10 +94,12 @@ class VersionSet {
std::vector<std::string> obsolete_manifests_; std::vector<std::string> obsolete_manifests_;
// As rocksdb described, `DropColumnFamilies()` only records the drop of the column family specified by ColumnFamilyHandle. // As rocksdb described, `DropColumnFamilies()` only records the drop of the
// The actual data is not deleted until the client calls `delete column_family`, namely `DestroyColumnFamilyHandle()`. // column family specified by ColumnFamilyHandle. The actual data is not
// We can still continue using the column family if we have outstanding ColumnFamilyHandle pointer. // deleted until the client calls `delete column_family`, namely
// So here record the dropped column family but the handler is not destroyed. // `DestroyColumnFamilyHandle()`. We can still continue using the column
// family if we have outstanding ColumnFamilyHandle pointer. So here record
// the dropped column family but the handler is not destroyed.
std::unordered_set<uint32_t> obsolete_columns_; std::unordered_set<uint32_t> obsolete_columns_;
std::unordered_map<uint32_t, std::shared_ptr<BlobStorage>> column_families_; std::unordered_map<uint32_t, std::shared_ptr<BlobStorage>> column_families_;
......
#include "util/filename.h"
#include "util/testharness.h"
#include "testutil.h" #include "testutil.h"
#include "util.h" #include "util.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "version_edit.h" #include "version_edit.h"
#include "version_set.h" #include "version_set.h"
...@@ -82,7 +82,7 @@ class VersionTest : public testing::Test { ...@@ -82,7 +82,7 @@ class VersionTest : public testing::Test {
auto& storage = column_families_[it.first]; auto& storage = column_families_[it.first];
// ignore obsolete file // ignore obsolete file
auto size = 0; auto size = 0;
for (auto& file: it.second->files_) { for (auto& file : it.second->files_) {
if (!file.second->is_obsolete()) { if (!file.second->is_obsolete()) {
size++; size++;
} }
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment