Unverified Commit 48754f31 authored by yiwu-arbug's avatar yiwu-arbug Committed by GitHub

Fix false corruption error when reading from v1 blob file (#221)

To support dictionary compression, we added a v2 blob file header to store extra flags. However, when reading a v1 blob file, the check in BlobFileReader falsely asserted that the file header is shorter than expected:
https://github.com/tikv/titan/blob/ecb5cba016096309cbd9b566bdc4ce62307527a0/src/blob_file_reader.cc#L103
https://github.com/tikv/titan/blob/ecb5cba016096309cbd9b566bdc4ce62307527a0/src/blob_format.h#L394
This change refactors the code to bypass that check, which fixes the issue.

The issue was introduced in https://github.com/tikv/titan/pull/189. TiKV is affected when Titan is enabled and it is upgraded from a pre-5.0 version to a >=5.0.0 version; in that case TiKV falls into a crash loop.

Also adding a titan_blob_file_dump tool to dump blob file content.
Signed-off-by: Yi Wu <yiwu@pingcap.com>
parent ecb5cba0
......@@ -156,6 +156,10 @@ if (WITH_TITAN_TOOLS)
add_executable(titan_manifest_dump tools/manifest_dump.cc)
target_include_directories(titan_manifest_dump PRIVATE ${gflags_INCLUDE_DIR})
target_link_libraries(titan_manifest_dump ${TOOLS_LIBS})
add_executable(titan_blob_file_dump tools/blob_file_dump.cc)
target_include_directories(titan_blob_file_dump PRIVATE ${gflags_INCLUDE_DIR})
target_link_libraries(titan_blob_file_dump ${TOOLS_LIBS})
endif()
# Installation - copy lib/ and include/
......
......@@ -5,26 +5,38 @@ namespace titandb {
BlobFileBuilder::BlobFileBuilder(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options,
WritableFileWriter* file)
WritableFileWriter* file,
uint32_t blob_file_version)
: builder_state_(cf_options.blob_file_compression_options.max_dict_bytes > 0
? BuilderState::kBuffered
: BuilderState::kUnbuffered),
cf_options_(cf_options),
file_(file),
blob_file_version_(blob_file_version),
encoder_(cf_options.blob_file_compression,
cf_options.blob_file_compression_options) {
#if ZSTD_VERSION_NUMBER < 10103
status_ = BlobFileHeader::ValidateVersion(blob_file_version_);
if (!status_.ok()) {
return;
}
if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) {
if (blob_file_version_ != BlobFileHeader::kVersion2) {
status_ = Status::NotSupported(
"dictionary comparession is not supported by blob file version 1");
}
#if ZSTD_VERSION_NUMBER < 10103
status_ = Status::NotSupported("ZSTD version too old.");
return;
}
#endif
}
WriteHeader();
}
void BlobFileBuilder::WriteHeader() {
BlobFileHeader header;
header.version = blob_file_version_;
if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) {
assert(blob_file_version_ == BlobFileHeader::kVersion2);
header.flags |= BlobFileHeader::kHasUncompressionDictionary;
}
std::string buffer;
......@@ -174,6 +186,7 @@ Status BlobFileBuilder::Finish(OutContexts* out_ctx) {
BlobFileFooter footer;
// if has compression dictionary, encode it into meta blocks
if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) {
assert(blob_file_version_ == BlobFileHeader::kVersion2);
BlockHandle meta_index_handle;
MetaIndexBuilder meta_index_builder;
WriteCompressionDictBlock(&meta_index_builder);
......
......@@ -67,7 +67,8 @@ class BlobFileBuilder {
// is building in "*file". Does not close the file. It is up to the
// caller to sync and close the file after calling Finish().
BlobFileBuilder(const TitanDBOptions& db_options,
const TitanCFOptions& cf_options, WritableFileWriter* file);
const TitanCFOptions& cf_options, WritableFileWriter* file,
uint32_t blob_file_version = BlobFileHeader::kVersion2);
// Tries to add the record to the file
// Notice:
......@@ -124,6 +125,7 @@ class BlobFileBuilder {
TitanCFOptions cf_options_;
WritableFileWriter* file_;
const uint32_t blob_file_version_;
Status status_;
BlobEncoder encoder_;
......
......@@ -28,7 +28,7 @@ bool BlobFileIterator::Init() {
return false;
}
BlobFileHeader blob_file_header;
status_ = blob_file_header.DecodeFrom(&slice);
status_ = DecodeInto(slice, &blob_file_header, true /*ignore_extra_bytes*/);
if (!status_.ok()) {
return false;
}
......
......@@ -125,7 +125,7 @@ Status BlobFileReader::ReadHeader(std::unique_ptr<RandomAccessFileReader>& file,
file->Read(0, BlobFileHeader::kMaxEncodedLength, &buffer, buffer.get());
if (!s.ok()) return s;
s = DecodeInto(buffer, header);
s = DecodeInto(buffer, header, true /*ignore_extra_bytes*/);
return s;
}
......
......@@ -50,7 +50,8 @@ class BlobFileTest : public testing::Test {
return s;
}
void TestBlobFilePrefetcher(TitanOptions options) {
void TestBlobFilePrefetcher(TitanOptions options,
uint32_t blob_file_version = 0) {
options.dirname = dirname_;
TitanDBOptions db_options(options);
TitanCFOptions cf_options(options);
......@@ -66,8 +67,15 @@ class BlobFileTest : public testing::Test {
file.reset(
new WritableFileWriter(std::move(f), file_name_, env_options_));
}
std::unique_ptr<BlobFileBuilder> builder(
new BlobFileBuilder(db_options, cf_options, file.get()));
std::unique_ptr<BlobFileBuilder> builder;
if (blob_file_version == 0) {
// Default blob file version
builder.reset(new BlobFileBuilder(db_options, cf_options, file.get()));
} else {
// Test with specific blob file version
builder.reset(new BlobFileBuilder(db_options, cf_options, file.get(),
blob_file_version));
}
for (int i = 0; i < n; i++) {
auto key = GenKey(i);
......@@ -115,7 +123,8 @@ class BlobFileTest : public testing::Test {
}
}
void TestBlobFileReader(TitanOptions options) {
void TestBlobFileReader(TitanOptions options,
uint32_t blob_file_version = 0) {
options.dirname = dirname_;
TitanDBOptions db_options(options);
TitanCFOptions cf_options(options);
......@@ -131,8 +140,16 @@ class BlobFileTest : public testing::Test {
file.reset(
new WritableFileWriter(std::move(f), file_name_, env_options_));
}
std::unique_ptr<BlobFileBuilder> builder(
new BlobFileBuilder(db_options, cf_options, file.get()));
std::unique_ptr<BlobFileBuilder> builder;
if (blob_file_version == 0) {
// Default blob file version
builder.reset(new BlobFileBuilder(db_options, cf_options, file.get()));
} else {
// Test with specific blob file version
builder.reset(new BlobFileBuilder(db_options, cf_options, file.get(),
blob_file_version));
}
for (int i = 0; i < n; i++) {
auto key = GenKey(i);
......@@ -197,6 +214,7 @@ class BlobFileTest : public testing::Test {
TEST_F(BlobFileTest, BlobFileReader) {
TitanOptions options;
TestBlobFileReader(options);
TestBlobFileReader(options, BlobFileHeader::kVersion1);
options.blob_file_compression = kLZ4Compression;
TestBlobFileReader(options);
}
......@@ -204,6 +222,7 @@ TEST_F(BlobFileTest, BlobFileReader) {
TEST_F(BlobFileTest, BlobFilePrefetcher) {
TitanOptions options;
TestBlobFilePrefetcher(options);
TestBlobFilePrefetcher(options, BlobFileHeader::kVersion1);
options.blob_cache = NewLRUCache(1 << 20);
TestBlobFilePrefetcher(options);
options.blob_file_compression = kLZ4Compression;
......
......@@ -352,6 +352,14 @@ struct BlobFileHeader {
uint32_t version = kVersion2;
uint32_t flags = 0;
// Checks whether `ver` is a blob file version this header type recognizes.
// Returns OK for kVersion1 and kVersion2; any other value yields
// InvalidArgument with the offending version number in the message.
static Status ValidateVersion(uint32_t ver) {
  const bool recognized =
      ver == BlobFileHeader::kVersion1 || ver == BlobFileHeader::kVersion2;
  if (recognized) {
    return Status::OK();
  }
  return Status::InvalidArgument("unrecognized blob file version " +
                                 ToString(ver));
}
uint64_t size() const {
return version == BlobFileHeader::kVersion1
? BlobFileHeader::kMinEncodedLength
......@@ -387,11 +395,12 @@ struct BlobFileFooter {
// A convenient template to decode a const slice.
template <typename T>
Status DecodeInto(const Slice& src, T* target) {
auto tmp = src;
auto s = target->DecodeFrom(&tmp);
if (s.ok() && !tmp.empty()) {
s = Status::Corruption(Slice());
Status DecodeInto(const Slice& src, T* target,
bool ignore_extra_bytes = false) {
Slice tmp = src;
Status s = target->DecodeFrom(&tmp);
if (!ignore_extra_bytes && s.ok() && !tmp.empty()) {
s = Status::Corruption("redundant bytes when decoding blob file");
}
return s;
}
......
// Copyright 2021-present TiKV Project Authors. Licensed under Apache-2.0.
#include "blob_file_iterator.h"
#include "file/filename.h"
#include "util/gflags_compat.h"
using GFLAGS_NAMESPACE::ParseCommandLineFlags;
using GFLAGS_NAMESPACE::SetUsageMessage;
// Command-line flags for the blob file dump tool.
// --path: the blob file to open.
// --dump: when set, every record read from the file is printed to stdout;
//         when unset, the file is only iterated to surface read errors.
DEFINE_string(path, "", "Path of blob file.");
DEFINE_bool(dump, false, "Print each record's key and value to stdout.");
// handle_error(s, location): if status expression `s` is not ok, print
// "error when <location>: <status>" to stderr and return 1 from the
// enclosing function.
//
// The body is wrapped in do { ... } while (0) so the macro behaves as a single
// statement (safe inside an unbraced if/else), and `s` is bound to a local
// reference so the expression is evaluated exactly once even on the failure
// path.
#define handle_error(s, location)                                \
  do {                                                           \
    const auto& handle_error_status_ = (s);                      \
    if (!handle_error_status_.ok()) {                            \
      fprintf(stderr, "error when %s: %s\n", (location),         \
              handle_error_status_.ToString().c_str());          \
      return 1;                                                  \
    }                                                            \
  } while (0)
namespace rocksdb {
namespace titandb {
// Opens the blob file named by --path and walks every record in it with a
// BlobFileIterator. When --dump is set, each record is printed to stdout as
// "<key>: <value>", with both fields rendered via ToString(true).
//
// Returns 0 when the whole file was iterated without error. On any failure
// (stat, open, or iteration) handle_error prints a message to stderr and this
// function returns 1.
int blob_file_dump() {
  Env* env = Env::Default();
  Status s;

  std::string file_name = FLAGS_path;
  uint64_t file_size = 0;
  s = env->GetFileSize(file_name, &file_size);
  handle_error(s, "getting file size");

  std::unique_ptr<RandomAccessFileReader> file;
  std::unique_ptr<RandomAccessFile> f;
  s = env->NewRandomAccessFile(file_name, &f, EnvOptions());
  handle_error(s, "open file");
  file.reset(new RandomAccessFileReader(std::move(f), file_name));

  // The tool has no real file number for the blob file, so a placeholder
  // value is passed to the iterator.
  std::unique_ptr<BlobFileIterator> iter(new BlobFileIterator(
      std::move(file), 1 /*fake file number*/, file_size, TitanCFOptions()));

  // Advance with Next() on every pass; without it the loop would spin on the
  // first record forever for any non-empty blob file.
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    handle_error(iter->status(), "status");
    if (FLAGS_dump) {
      std::string key = iter->key().ToString(true);
      std::string value = iter->value().ToString(true);
      fprintf(stdout, "%s: %s\n", key.c_str(), value.c_str());
    }
  }
  // Valid() turning false can mean end-of-file or a read error; the final
  // status check tells the two apart.
  handle_error(iter->status(), "reading blob file");
  return 0;
}
} // namespace titandb
} // namespace rocksdb
// Entry point: registers the usage banner, parses gflags options (removing
// recognized flags from argv), and runs the dump. The process exit code is
// whatever blob_file_dump() returns.
int main(int argc, char** argv) {
  std::string usage("\nUSAGE\n");
  usage += std::string(argv[0]);
  usage += " [OPTIONS]...";
  SetUsageMessage(usage);

  ParseCommandLineFlags(&argc, &argv, true);

  const int exit_code = rocksdb::titandb::blob_file_dump();
  return exit_code;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment