Unverified commit c99cf9d2, authored by yiwu-arbug, committed by GitHub

Upgrade RocksDB to 6.4 (#61)

Summary:
Update the code base to build against RocksDB 6.4.x. There are a few internal interface changes between 5.18 and 6.4, and several files have moved to new directories. titandb_bench is also updated to match the db_bench version in 6.4.x.
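For reference, the most mechanical of those interface changes: `RandomAccessFileReader` no longer takes a `for_compaction` flag at construction; in 6.4 each `Read()` call opts into rate limiting instead. A minimal sketch paraphrasing the hunks below; the include path and the free-standing helper are assumptions, not code from this patch.

```cpp
#include "util/file_reader_writer.h"  // RandomAccessFileReader (path assumed for the 6.4 tree)

// Hypothetical helper showing the 6.4 call shape. In 5.18 the reader was
// constructed with a trailing true /*for compaction*/ argument and Read()
// took four arguments; in 6.4 the flag moves onto Read() itself, where
// for_compaction=true engages the rate limiter.
rocksdb::Status ReadBlobHeader(rocksdb::RandomAccessFileReader* file,
                               size_t length, rocksdb::Slice* result,
                               char* scratch) {
  return file->Read(0 /*offset*/, length, result, scratch,
                    true /*for_compaction*/);
}
```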

Test Plan:
Travis
parent 19ea115a
......@@ -9,7 +9,7 @@ if (NOT ROCKSDB_GIT_REPO)
endif()
if (NOT ROCKSDB_GIT_BRANCH)
set(ROCKSDB_GIT_BRANCH "tikv-3.0")
set(ROCKSDB_GIT_BRANCH "6.4.tikv")
endif()
if (NOT DEFINED ROCKSDB_DIR)
......
......@@ -42,5 +42,5 @@ ctest -R titan
bash scripts/format-diff.sh
```
## Compatibility
Currently Titan is only compatible with RocksDB 5.18.
## Compatibility with RocksDB
The current version of Titan is developed and tested against RocksDB 6.4.
......@@ -32,7 +32,7 @@ endif()
option(WITH_ZLIB "build with zlib" OFF)
if (WITH_ZLIB)
find_package(zlib REQUIRED)
find_package(ZLIB REQUIRED)
add_definitions(-DZLIB)
include_directories(${ZLIB_INCLUDE_DIR})
endif()
......
......@@ -3,8 +3,8 @@
#include <map>
#include <unordered_map>
#include "logging/logging.h"
#include "rocksdb/options.h"
#include "util/logging.h"
namespace rocksdb {
namespace titandb {
......
#include "blob_file_cache.h"
#include "file/filename.h"
#include "util.h"
#include "util/filename.h"
namespace rocksdb {
namespace titandb {
......
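The same header relocations recur throughout this patch. Collected in one place for reference (every pair below is taken from a hunk in this diff; new 6.4 path first, old 5.18 path in the comment):

```cpp
#include "file/filename.h"                       // was "util/filename.h"
#include "logging/logging.h"                     // was "util/logging.h"
#include "test_util/testharness.h"               // was "util/testharness.h"
#include "test_util/sync_point.h"                // was "util/sync_point.h"
#include "test_util/testutil.h"                  // was "util/testutil.h"
#include "test_util/fault_injection_test_env.h"  // was "util/fault_injection_test_env.h"
#include "db/db_impl/db_impl.h"                  // was "db/db_impl.h"
```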
......@@ -19,7 +19,10 @@ BlobFileIterator::~BlobFileIterator() {}
bool BlobFileIterator::Init() {
Slice slice;
char header_buf[BlobFileHeader::kEncodedLength];
status_ = file_->Read(0, BlobFileHeader::kEncodedLength, &slice, header_buf);
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(0, BlobFileHeader::kEncodedLength, &slice, header_buf,
true /*for_compaction*/);
if (!status_.ok()) {
return false;
}
......@@ -29,8 +32,11 @@ bool BlobFileIterator::Init() {
return false;
}
char footer_buf[BlobFileFooter::kEncodedLength];
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(file_size_ - BlobFileFooter::kEncodedLength,
BlobFileFooter::kEncodedLength, &slice, footer_buf);
BlobFileFooter::kEncodedLength, &slice, footer_buf,
true /*for_compaction*/);
if (!status_.ok()) return false;
BlobFileFooter blob_file_footer;
status_ = blob_file_footer.DecodeFrom(&slice);
......@@ -74,8 +80,10 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
FixedSlice<kBlobHeaderSize> header_buffer;
iterate_offset_ = BlobFileHeader::kEncodedLength;
for (; iterate_offset_ < offset; iterate_offset_ += total_length) {
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(iterate_offset_, kBlobHeaderSize, &header_buffer,
header_buffer.get());
header_buffer.get(), true /*for_compaction*/);
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
......@@ -88,8 +96,10 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) {
void BlobFileIterator::GetBlobRecord() {
FixedSlice<kBlobHeaderSize> header_buffer;
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(iterate_offset_, kBlobHeaderSize, &header_buffer,
header_buffer.get());
header_buffer.get(), true /*for_compaction*/);
if (!status_.ok()) return;
status_ = decoder_.DecodeHeader(&header_buffer);
if (!status_.ok()) return;
......@@ -97,8 +107,10 @@ void BlobFileIterator::GetBlobRecord() {
Slice record_slice;
auto record_size = decoder_.GetRecordSize();
buffer_.resize(record_size);
// With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator
// is only used for GC, we always set for_compaction to true.
status_ = file_->Read(iterate_offset_ + kBlobHeaderSize, record_size,
&record_slice, buffer_.data());
&record_slice, buffer_.data(), true /*for_compaction*/);
if (status_.ok()) {
status_ =
decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_);
......
......@@ -14,6 +14,7 @@
namespace rocksdb {
namespace titandb {
// Used by the GC job to iterate through a blob file.
class BlobFileIterator {
public:
const uint64_t kMinReadaheadSize = 4 << 10;
......
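Since the class comment above is a single line, a hedged sketch of how the GC job might drive this iterator: `Init()` and its rate-limited reads are grounded in the hunks above, while the `SeekToFirst()`/`Valid()`/`Next()` loop shape is an assumption about the rest of the class, which this diff does not show.

```cpp
// Illustrative only: methods other than Init() are assumed from the
// iterator-style interface, not shown in this diff.
void ScanForGC(rocksdb::titandb::BlobFileIterator* it) {
  if (!it->Init()) return;  // reads header and footer with for_compaction=true
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // each step decodes one blob record, rate-limited as a compaction read
  }
}
```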
......@@ -2,11 +2,12 @@
#include <cinttypes>
#include "file/filename.h"
#include "test_util/testharness.h"
#include "blob_file_builder.h"
#include "blob_file_cache.h"
#include "blob_file_reader.h"
#include "util/filename.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......
......@@ -6,10 +6,10 @@
#include <inttypes.h>
#include "file/filename.h"
#include "test_util/sync_point.h"
#include "util/crc32c.h"
#include "util/filename.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "titan_stats.h"
......@@ -28,12 +28,9 @@ Status NewBlobFileReader(uint64_t file_number, uint64_t readahead_size,
if (readahead_size > 0) {
file = NewReadaheadRandomAccessFile(std::move(file), readahead_size);
}
// Currently only `BlobGCJob` will call `NewBlobFileReader()`. We set
// `for_compaction=true` in this case to enable rate limiter.
result->reset(new RandomAccessFileReader(
std::move(file), file_name, nullptr /*env*/, nullptr /*stats*/,
0 /*hist_type*/, nullptr /*file_read_hist*/, env_options.rate_limiter,
true /*for compaction*/));
0 /*hist_type*/, nullptr /*file_read_hist*/, env_options.rate_limiter));
return s;
}
......
#include "test_util/testharness.h"
#include "blob_file_size_collector.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......@@ -50,8 +51,9 @@ class BlobFileSizeCollectorTest : public testing::Test {
CompressionOptions compression_opts;
TableBuilderOptions options(cf_ioptions_, cf_moptions_,
cf_ioptions_.internal_comparator, &collectors_,
kNoCompression, compression_opts, nullptr,
false, kDefaultColumnFamilyName, 0);
kNoCompression, 0 /*sample_for_compression*/,
compression_opts, false /*skip_filters*/,
kDefaultColumnFamilyName, 0 /*level*/);
result->reset(table_factory_->NewTableBuilder(options, 0, file));
ASSERT_TRUE(*result);
}
......
#include "file/filename.h"
#include "test_util/testharness.h"
#include "blob_file_builder.h"
#include "blob_file_cache.h"
#include "blob_file_reader.h"
#include "util/filename.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......
#include "blob_format.h"
#include "test_util/sync_point.h"
#include "util/crc32c.h"
#include "util/sync_point.h"
namespace rocksdb {
namespace titandb {
......@@ -40,7 +40,7 @@ void BlobEncoder::EncodeRecord(const BlobRecord& record) {
CompressionType compression;
record.EncodeTo(&record_buffer_);
record_ = Compress(compression_ctx_, record_buffer_, &compressed_buffer_,
record_ = Compress(compression_info_, record_buffer_, &compressed_buffer_,
&compression);
assert(record_.size() < std::numeric_limits<uint32_t>::max());
......@@ -82,7 +82,8 @@ Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
return DecodeInto(input, record);
}
UncompressionContext ctx(compression_);
Status s = Uncompress(ctx, input, buffer);
UncompressionInfo info(ctx, UncompressionDict::GetEmptyDict(), compression_);
Status s = Uncompress(info, input, buffer);
if (!s.ok()) {
return s;
}
......
......@@ -32,7 +32,11 @@ struct BlobRecord {
class BlobEncoder {
public:
BlobEncoder(CompressionType compression) : compression_ctx_(compression) {}
BlobEncoder(CompressionType compression)
: compression_ctx_(compression),
compression_info_(compression_opt_, compression_ctx_,
CompressionDict::GetEmptyDict(), compression,
0 /*sample_for_compression*/) {}
void EncodeRecord(const BlobRecord& record);
......@@ -46,7 +50,9 @@ class BlobEncoder {
Slice record_;
std::string record_buffer_;
std::string compressed_buffer_;
CompressionOptions compression_opt_;
CompressionContext compression_ctx_;
CompressionInfo compression_info_;
};
class BlobDecoder {
......
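For context on the `CompressionContext` to `CompressionInfo` migration above: 6.4 bundles options, context, dictionary, type, and a sampling rate into one object, which is why `BlobEncoder` now keeps all three pieces as members. A minimal construction mirroring the new initializer (include path assumed):

```cpp
#include "util/compression.h"  // CompressionInfo et al. (path assumed for the 6.4 tree)

// CompressionInfo stores references, so opts and ctx must outlive info,
// hence the member ordering in BlobEncoder above.
void BuildCompressionInfo() {
  rocksdb::CompressionOptions opts;
  rocksdb::CompressionContext ctx(rocksdb::kSnappyCompression);
  rocksdb::CompressionInfo info(opts, ctx,
                                rocksdb::CompressionDict::GetEmptyDict(),
                                rocksdb::kSnappyCompression,
                                0 /*sample_for_compression*/);
  (void)info;  // would be passed to Compress() in real use
}
```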
#include "test_util/testharness.h"
#include "blob_format.h"
#include "testutil.h"
#include "util.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......
......@@ -3,6 +3,8 @@
#endif
#include <inttypes.h>
#include <memory>
#include "blob_gc_job.h"
namespace rocksdb {
......@@ -491,7 +493,7 @@ Status BlobGCJob::InstallOutputBlobFiles() {
}
}
} else {
std::vector<unique_ptr<BlobFileHandle>> handles;
std::vector<std::unique_ptr<BlobFileHandle>> handles;
std::string to_delete_files;
for (auto& builder : this->blob_file_builders_) {
if (!to_delete_files.empty()) {
......@@ -565,7 +567,8 @@ Status BlobGCJob::DeleteInputBlobFiles() {
VersionEdit edit;
edit.SetColumnFamilyID(blob_gc_->column_family_handle()->GetID());
for (const auto& file : blob_gc_->sampled_inputs()) {
ROCKS_LOG_INFO(db_options_.info_log, "Titan add obsolete file [%llu]",
ROCKS_LOG_INFO(db_options_.info_log,
"Titan add obsolete file [%" PRIu64 "]",
file->file_number());
metrics_.blob_db_gc_num_files++;
edit.DeleteBlobFile(file->file_number(), obsolete_sequence);
......
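Several hunks here replace `%llu` with `PRIu64` when logging file numbers. The reason, in self-contained form:

```cpp
#include <cinttypes>
#include <cstdint>
#include <cstdio>

// uint64_t is not guaranteed to be unsigned long long on every platform,
// so "%llu" can mismatch the argument type; PRIu64 always expands to the
// correct conversion specifier for uint64_t.
void LogObsoleteFile(uint64_t file_number) {
  std::printf("Titan add obsolete file [%" PRIu64 "]\n", file_number);
}
```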
......@@ -4,7 +4,7 @@
#include "blob_file_iterator.h"
#include "blob_file_manager.h"
#include "blob_gc.h"
#include "db/db_impl.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "titan/options.h"
......
#include "blob_gc_job.h"
#include "rocksdb/convenience.h"
#include "test_util/testharness.h"
#include "blob_gc_job.h"
#include "blob_gc_picker.h"
#include "db_impl.h"
#include "rocksdb/convenience.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......
#include "blob_gc_picker.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
namespace rocksdb {
namespace titandb {
......@@ -16,9 +22,6 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
uint64_t batch_size = 0;
uint64_t estimate_output_size = 0;
// ROCKS_LOG_INFO(db_options_.info_log, "blob file num:%lu gc score:%lu",
// blob_storage->NumBlobFiles(),
// blob_storage->gc_score().size());
bool stop_picking = false;
bool maybe_continue_next_time = false;
uint64_t next_gc_size = 0;
......@@ -30,23 +33,11 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
// or this file had been GCed
continue;
}
// ROCKS_LOG_INFO(db_options_.info_log,
// "file number:%lu score:%f being_gc:%d pending:%d, "
// "size:%lu discard:%lu mark_for_gc:%d
// mark_for_sample:%d", blob_file->file_number_,
// gc_score.score, blob_file->being_gc,
// blob_file->pending, blob_file->file_size_,
// blob_file->discardable_size_,
// blob_file->marked_for_gc_,
// blob_file->marked_for_sample);
if (!CheckBlobFile(blob_file.get())) {
ROCKS_LOG_INFO(db_options_.info_log, "file number:%lu no need gc",
ROCKS_LOG_INFO(db_options_.info_log, "Blob file %" PRIu64 " no need gc",
blob_file->file_number());
continue;
}
if (!stop_picking) {
blob_files.push_back(blob_file.get());
batch_size += blob_file->file_size();
......
......@@ -2,14 +2,15 @@
#include <memory>
#include "db/column_family.h"
#include "db/write_callback.h"
#include "file/filename.h"
#include "rocksdb/status.h"
#include "blob_file_manager.h"
#include "blob_format.h"
#include "blob_gc.h"
#include "blob_storage.h"
#include "db/column_family.h"
#include "db/write_callback.h"
#include "rocksdb/status.h"
#include "util/filename.h"
namespace rocksdb {
namespace titandb {
......
#include "blob_gc_picker.h"
#include "file/filename.h"
#include "test_util/testharness.h"
#include "blob_file_builder.h"
#include "blob_file_cache.h"
#include "blob_file_iterator.h"
#include "blob_file_reader.h"
#include "util/filename.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......
......@@ -65,7 +65,8 @@ class TitanDBImpl::FileManager : public BlobFileManager {
}
if (!s.ok()) return s;
ROCKS_LOG_INFO(db_->db_options_.info_log, "Titan adding blob file [%llu]",
ROCKS_LOG_INFO(db_->db_options_.info_log,
"Titan adding blob file [%" PRIu64 "]",
file.first->file_number());
edit.AddBlobFile(file.first);
}
......@@ -746,7 +747,7 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
if (!bs) {
// TODO: Should treat it as background error and make DB read-only.
ROCKS_LOG_ERROR(db_options_.info_log,
"Column family id:% " PRIu32 " not Found.", cf_id);
"Column family id:%" PRIu32 " not Found.", cf_id);
return Status::NotFound("Column family id: " + std::to_string(cf_id) +
" not Found.");
}
......@@ -1008,7 +1009,7 @@ void TitanDBImpl::OnCompactionCompleted(
if (!bs) {
// TODO: Should treat it as background error and make DB read-only.
ROCKS_LOG_ERROR(db_options_.info_log,
"OnCompactionCompleted[%d] Column family id:% " PRIu32
"OnCompactionCompleted[%d] Column family id:%" PRIu32
" not Found.",
compaction_job_info.job_id, compaction_job_info.cf_id);
return;
......
#pragma once
#include "blob_file_manager.h"
#include "db/db_impl.h"
#include "db/db_impl/db_impl.h"
#include "rocksdb/statistics.h"
#include "table_factory.h"
#include "titan/db.h"
......
......@@ -10,8 +10,8 @@
#include <unordered_map>
#include "db/db_iter.h"
#include "logging/logging.h"
#include "rocksdb/env.h"
#include "util/logging.h"
#include "titan_stats.h"
......
......@@ -6,9 +6,9 @@
#include <inttypes.h>
#include "logging/logging.h"
#include "options/options_helper.h"
#include "rocksdb/convenience.h"
#include "util/logging.h"
namespace rocksdb {
namespace titandb {
......
#include "table/table_builder.h"
#include "file/filename.h"
#include "table/table_reader.h"
#include "test_util/testharness.h"
#include "blob_file_manager.h"
#include "blob_file_reader.h"
#include "table/table_reader.h"
#include "table_factory.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "version_set.h"
namespace rocksdb {
......@@ -149,8 +150,9 @@ class TableBuilderTest : public testing::Test {
CompressionOptions compression_opts;
TableBuilderOptions options(cf_ioptions_, cf_moptions_,
cf_ioptions_.internal_comparator, &collectors_,
kNoCompression, compression_opts, nullptr,
false, kDefaultColumnFamilyName, 0);
kNoCompression, 0 /*sample_for_compression*/,
compression_opts, false /*skip_filters*/,
kDefaultColumnFamilyName, 0 /*level*/);
result->reset(table_factory_->NewTableBuilder(options, 0, file));
}
......@@ -203,7 +205,9 @@ TEST_F(TableBuilderTest, Basic) {
ReadOptions ro;
std::unique_ptr<InternalIterator> iter;
iter.reset(base_reader->NewIterator(ro, nullptr));
iter.reset(base_reader->NewIterator(ro, nullptr /*prefix_extractor*/,
nullptr /*arena*/, false /*skip_filters*/,
TableReaderCaller::kUncategorized));
iter->SeekToFirst();
for (char i = 0; i < n; i++) {
ASSERT_TRUE(iter->Valid());
......@@ -252,7 +256,9 @@ TEST_F(TableBuilderTest, NoBlob) {
ReadOptions ro;
std::unique_ptr<InternalIterator> iter;
iter.reset(base_reader->NewIterator(ro, nullptr));
iter.reset(base_reader->NewIterator(ro, nullptr /*prefix_extractor*/,
nullptr /*arena*/, false /*skip_filters*/,
TableReaderCaller::kUncategorized));
iter->SeekToFirst();
for (char i = 0; i < n; i++) {
ASSERT_TRUE(iter->Valid());
......
#pragma once
#include "rocksdb/cache.h"
#include "test_util/testharness.h"
#include "util/compression.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......
......@@ -5,12 +5,12 @@
#include "db_impl.h"
#include "db_iter.h"
#include "file/filename.h"
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "test_util/testharness.h"
#include "titan/db.h"
#include "util/filename.h"
#include "util/random.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......
......@@ -2,17 +2,18 @@
#include <options/cf_options.h>
#include <unordered_map>
#include "file/filename.h"
#include "rocksdb/utilities/debug.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "util/random.h"
#include "blob_file_iterator.h"
#include "blob_file_reader.h"
#include "db_impl.h"
#include "db_iter.h"
#include "rocksdb/utilities/debug.h"
#include "titan/db.h"
#include "titan_fault_injection_test_env.h"
#include "util/filename.h"
#include "util/random.h"
#include "util/sync_point.h"
#include "util/testharness.h"
namespace rocksdb {
namespace titandb {
......
#pragma once
#include "rocksdb/env.h"
#include "util/fault_injection_test_env.h"
#include "test_util/fault_injection_test_env.h"
#include <memory>
......@@ -78,4 +78,4 @@ Status TitanTestRandomAccessFile::InvalidateCache(size_t offset,
}
} // namespace titandb
} // namespace rocksdb
\ No newline at end of file
} // namespace rocksdb
......@@ -11,46 +11,46 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
return compressed_size < raw_size - (raw_size / 8u);
}
Slice Compress(const CompressionContext& ctx, const Slice& input,
Slice Compress(const CompressionInfo& info, const Slice& input,
std::string* output, CompressionType* type) {
*type = ctx.type();
if (ctx.type() == kNoCompression) {
*type = info.type();
if (info.type() == kNoCompression) {
return input;
}
// Returns compressed block contents if:
// (1) the compression method is supported in this platform and
// (2) the compression rate is "good enough".
switch (ctx.type()) {
switch (info.type()) {
case kSnappyCompression:
if (Snappy_Compress(ctx, input.data(), input.size(), output) &&
if (Snappy_Compress(info, input.data(), input.size(), output) &&
GoodCompressionRatio(output->size(), input.size())) {
return *output;
}
break;
case kZlibCompression:
if (Zlib_Compress(ctx, kCompressionFormat, input.data(), input.size(),
if (Zlib_Compress(info, kCompressionFormat, input.data(), input.size(),
output) &&
GoodCompressionRatio(output->size(), input.size())) {
return *output;
}
break;
case kBZip2Compression:
if (BZip2_Compress(ctx, kCompressionFormat, input.data(), input.size(),
if (BZip2_Compress(info, kCompressionFormat, input.data(), input.size(),
output) &&
GoodCompressionRatio(output->size(), input.size())) {
return *output;
}
break;
case kLZ4Compression:
if (LZ4_Compress(ctx, kCompressionFormat, input.data(), input.size(),
if (LZ4_Compress(info, kCompressionFormat, input.data(), input.size(),
output) &&
GoodCompressionRatio(output->size(), input.size())) {
return *output;
}
break;
case kLZ4HCCompression:
if (LZ4HC_Compress(ctx, kCompressionFormat, input.data(), input.size(),
if (LZ4HC_Compress(info, kCompressionFormat, input.data(), input.size(),
output) &&
GoodCompressionRatio(output->size(), input.size())) {
return *output;
......@@ -64,7 +64,7 @@ Slice Compress(const CompressionContext& ctx, const Slice& input,
break;
case kZSTD:
case kZSTDNotFinalCompression:
if (ZSTD_Compress(ctx, input.data(), input.size(), output) &&
if (ZSTD_Compress(info, input.data(), input.size(), output) &&
GoodCompressionRatio(output->size(), input.size())) {
return *output;
}
......@@ -78,13 +78,13 @@ Slice Compress(const CompressionContext& ctx, const Slice& input,
return input;
}
Status Uncompress(const UncompressionContext& ctx, const Slice& input,
Status Uncompress(const UncompressionInfo& info, const Slice& input,
OwnedSlice* output) {
int size = 0;
CacheAllocationPtr ubuf;
assert(ctx.type() != kNoCompression);
assert(info.type() != kNoCompression);
switch (ctx.type()) {
switch (info.type()) {
case kSnappyCompression: {
size_t usize = 0;
if (!Snappy_GetUncompressedLength(input.data(), input.size(), &usize)) {
......@@ -98,7 +98,7 @@ Status Uncompress(const UncompressionContext& ctx, const Slice& input,
break;
}
case kZlibCompression:
ubuf = Zlib_Uncompress(ctx, input.data(), input.size(), &size,
ubuf = Zlib_Uncompress(info, input.data(), input.size(), &size,
kCompressionFormat);
if (!ubuf.get()) {
return Status::Corruption("Corrupted compressed blob", "Zlib");
......@@ -114,7 +114,7 @@ Status Uncompress(const UncompressionContext& ctx, const Slice& input,
output->reset(std::move(ubuf), size);
break;
case kLZ4Compression:
ubuf = LZ4_Uncompress(ctx, input.data(), input.size(), &size,
ubuf = LZ4_Uncompress(info, input.data(), input.size(), &size,
kCompressionFormat);
if (!ubuf.get()) {
return Status::Corruption("Corrupted compressed blob", "LZ4");
......@@ -122,7 +122,7 @@ Status Uncompress(const UncompressionContext& ctx, const Slice& input,
output->reset(std::move(ubuf), size);
break;
case kLZ4HCCompression:
ubuf = LZ4_Uncompress(ctx, input.data(), input.size(), &size,
ubuf = LZ4_Uncompress(info, input.data(), input.size(), &size,
kCompressionFormat);
if (!ubuf.get()) {
return Status::Corruption("Corrupted compressed blob", "LZ4HC");
......@@ -138,7 +138,7 @@ Status Uncompress(const UncompressionContext& ctx, const Slice& input,
break;
case kZSTD:
case kZSTDNotFinalCompression:
ubuf = ZSTD_Uncompress(ctx, input.data(), input.size(), &size);
ubuf = ZSTD_Uncompress(info, input.data(), input.size(), &size);
if (!ubuf.get()) {
return Status::Corruption("Corrupted compressed blob", "ZSTD");
}
......
......@@ -55,13 +55,13 @@ class FixedSlice : public Slice {
// compressed data. However, if the compression ratio is not good, it
// returns the input slice directly and sets "*type" to
// kNoCompression.
Slice Compress(const CompressionContext& ctx, const Slice& input,
Slice Compress(const CompressionInfo& info, const Slice& input,
std::string* output, CompressionType* type);
// Uncompresses the input data according to the uncompression type.
// If successful, fills "*buffer" with the uncompressed data and
// points "*output" to it.
Status Uncompress(const UncompressionContext& ctx, const Slice& input,
Status Uncompress(const UncompressionInfo& info, const Slice& input,
OwnedSlice* output);
void UnrefCacheHandle(void* cache, void* handle);
......
#include "util.h"
#include "util/testharness.h"
#include "test_util/testharness.h"
namespace rocksdb {
namespace titandb {
......@@ -10,14 +11,20 @@ TEST(UtilTest, Compression) {
std::string input(1024, 'a');
for (auto compression :
{kSnappyCompression, kZlibCompression, kLZ4Compression, kZSTD}) {
CompressionOptions compression_opt;
CompressionContext compression_ctx(compression);
CompressionInfo compression_info(
compression_opt, compression_ctx, CompressionDict::GetEmptyDict(),
compression, 0 /* sample_for_compression */);
std::string buffer;
auto compressed = Compress(compression_ctx, input, &buffer, &compression);
auto compressed = Compress(compression_info, input, &buffer, &compression);
if (compression != kNoCompression) {
ASSERT_TRUE(compressed.size() <= input.size());
UncompressionContext uncompression_ctx(compression);
UncompressionInfo uncompression_info(
uncompression_ctx, UncompressionDict::GetEmptyDict(), compression);
OwnedSlice output;
ASSERT_OK(Uncompress(uncompression_ctx, compressed, &output));
ASSERT_OK(Uncompress(uncompression_info, compressed, &output));
ASSERT_EQ(output, input);
}
}
......
......@@ -2,8 +2,9 @@
#include <inttypes.h>
#include "file/filename.h"
#include "edit_collector.h"
#include "util/filename.h"
namespace rocksdb {
namespace titandb {
......@@ -71,7 +72,7 @@ Status VersionSet::Recover() {
LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file), &reporter, true /*checksum*/,
0 /*initial_offset*/, 0);
0 /*log_num*/);
Slice record;
std::string scratch;
EditCollector collector;
......@@ -248,7 +249,8 @@ Status VersionSet::DropColumnFamilies(
VersionEdit edit;
edit.SetColumnFamilyID(it->first);
for (auto& file : it->second->files_) {
ROCKS_LOG_INFO(db_options_.info_log, "Titan add obsolete file [%llu]",
ROCKS_LOG_INFO(db_options_.info_log,
"Titan add obsolete file [%" PRIu64 "]",
file.second->file_number());
edit.DeleteBlobFile(file.first, obsolete_sequence);
}
......
#include "file/filename.h"
#include "test_util/testharness.h"
#include "edit_collector.h"
#include "testutil.h"
#include "util.h"
#include "util/filename.h"
#include "util/testharness.h"
#include "version_edit.h"
#include "version_set.h"
......
......@@ -7,10 +7,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#ifdef GFLAGS
#ifdef NUMA
#include <numa.h>
......@@ -20,11 +16,12 @@
#include <unistd.h>
#endif
#include <fcntl.h>
#include <inttypes.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <atomic>
#include <cinttypes>
#include <condition_variable>
#include <cstddef>
#include <memory>
......@@ -32,7 +29,7 @@
#include <thread>
#include <unordered_map>
#include "db/db_impl.h"
#include "db/db_impl/db_impl.h"
#include "db/malloc_stats.h"
#include "db/version_set.h"
#include "hdfs/env_hdfs.h"
......@@ -52,6 +49,7 @@
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_util.h"
......@@ -59,6 +57,8 @@
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
#include "test_util/testutil.h"
#include "test_util/transaction_test_util.h"
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/crc32c.h"
......@@ -67,8 +67,6 @@
#include "util/random.h"
#include "util/stderr_logger.h"
#include "util/string_util.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "util/xxhash.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/merge_operators.h"
......@@ -104,6 +102,7 @@ DEFINE_string(
"compact,"
"compactall,"
"multireadrandom,"
"mixgraph,"
"readseq,"
"readtocache,"
"readreverse,"
......@@ -519,6 +518,8 @@ DEFINE_int32(bloom_bits, -1,
DEFINE_double(memtable_bloom_size_ratio, 0,
"Ratio of memtable size used for bloom filter. 0 means no bloom "
"filter.");
DEFINE_bool(memtable_whole_key_filtering, false,
"Try to use whole key bloom filter in memtables.");
DEFINE_bool(memtable_use_huge_page, false,
"Try to use huge page in memtables.");
......@@ -527,6 +528,14 @@ DEFINE_bool(use_existing_db, false,
" database. If you set this flag and also specify a benchmark that"
" wants a fresh database, that benchmark will fail.");
DEFINE_bool(use_existing_keys, false,
"If true, uses existing keys in the DB, "
"rather than generating new ones. This involves some startup "
"latency to load all keys into memory. It is supported for the "
"same read/overwrite benchmarks as `-use_existing_db=true`, which "
"must also be set for this flag to be enabled. When this flag is "
"set, the value for `-num` will be ignored.");
DEFINE_bool(show_table_properties, false,
"If true, then per-level table"
" properties will be printed on every stats-interval when"
......@@ -564,6 +573,8 @@ DEFINE_bool(verify_checksum, true,
" from storage");
DEFINE_bool(statistics, false, "Database statistics");
DEFINE_int32(stats_level, rocksdb::StatsLevel::kExceptDetailedTimers,
"stats level for statistics");
DEFINE_string(statistics_string, "", "Serialized statistics string");
static class std::shared_ptr<rocksdb::Statistics> dbstats;
......@@ -707,6 +718,7 @@ DEFINE_string(
"RocksDB options related command-line arguments, all other arguments "
"that are related to RocksDB options will be ignored:\n"
"\t--use_existing_db\n"
"\t--use_existing_keys\n"
"\t--statistics\n"
"\t--row_cache_size\n"
"\t--row_cache_numshardbits\n"
......@@ -741,13 +753,8 @@ DEFINE_uint64(blob_db_max_ttl_range, 86400,
DEFINE_uint64(blob_db_ttl_range_secs, 3600,
"TTL bucket size to use when creating blob files.");
DEFINE_uint64(blob_db_bytes_per_sync, 0, "Bytes to sync blob file at.");
DEFINE_uint64(blob_db_file_size, 256 * 1024 * 1024,
"Target size of each blob file.");
DEFINE_uint64(blob_db_min_blob_size, 0,
"Smallest blob to store in a file. Blob smaller than this "
"Smallest blob to store in a file. Blobs smaller than this "
"will be inlined with the key in the LSM tree.");
// Titan Options
......@@ -757,6 +764,24 @@ DEFINE_uint64(titan_db_min_blob_size, 0,
"Smallest blob to store in a file. Blobs smaller than this "
"will be inlined with the key in the LSM tree.");
DEFINE_uint64(blob_db_bytes_per_sync, 0, "Bytes to sync blob file at.");
DEFINE_uint64(blob_db_file_size, 256 * 1024 * 1024,
"Target size of each blob file.");
// Secondary DB instance Options
DEFINE_bool(use_secondary_db, false,
"Open a RocksDB secondary instance. A primary instance can be "
"running in another db_bench process.");
DEFINE_string(secondary_path, "",
"Path to a directory used by the secondary instance to store "
"private files, e.g. info log.");
DEFINE_int32(secondary_update_interval, 5,
"Secondary instance attempts to catch up with the primary every "
"secondary_update_interval seconds.");
#endif // ROCKSDB_LITE
DEFINE_bool(report_bg_io_stats, false,
......@@ -767,6 +792,20 @@ DEFINE_bool(use_stderr_info_logger, false,
DEFINE_string(trace_file, "", "Trace workload to a file. ");
DEFINE_int32(trace_replay_fast_forward, 1,
"Fast forward trace replay, must >= 1. ");
DEFINE_int32(block_cache_trace_sampling_frequency, 1,
"Block cache trace sampling frequency, termed s. It uses spatial "
"downsampling and samples accesses to one out of s blocks.");
DEFINE_int64(
block_cache_trace_max_trace_file_size_in_bytes,
uint64_t{64} * 1024 * 1024 * 1024,
"The maximum block cache trace file size in bytes. Block cache accesses "
"will not be logged if the trace file size exceeds this threshold. Default "
"is 64 GB.");
DEFINE_string(block_cache_trace_file, "", "Block cache trace file path.");
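A sketch of the one-in-s spatial downsampling the flag text above describes; this helper is illustrative, not the tracer's actual implementation:

```cpp
#include <cstdint>

// Spatial downsampling decides per block (e.g. by a hash of the block key),
// so a sampled block has all of its accesses traced and an unsampled block
// has none.
bool ShouldTraceBlock(uint64_t block_key_hash, uint64_t sampling_frequency) {
  return sampling_frequency <= 1 ||
         block_key_hash % sampling_frequency == 0;
}
```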
static enum rocksdb::CompressionType StringToCompressionType(
const char* ctype) {
assert(ctype);
......@@ -807,6 +846,8 @@ DEFINE_string(compression_type, "snappy",
static enum rocksdb::CompressionType FLAGS_compression_type_e =
rocksdb::kSnappyCompression;
DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression");
DEFINE_int32(compression_level, rocksdb::CompressionOptions().level,
"Compression level. The meaning of this value is library-"
"dependent. If unset, we try to use the default for the library "
......@@ -902,6 +943,9 @@ DEFINE_uint64(delayed_write_rate, 8388608u,
DEFINE_bool(enable_pipelined_write, true,
"Allow WAL and memtable writes to be pipelined");
DEFINE_bool(unordered_write, false,
"Enable unordered writes, which provide higher throughput but relax "
"the guarantees around atomic reads and immutable snapshots");
DEFINE_bool(allow_concurrent_memtable_write, true,
"Allow multi-writers to update mem tables in parallel.");
......@@ -955,6 +999,52 @@ DEFINE_uint64(
"If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
"is the global rate in bytes/second.");
// the parameters of mix_graph
DEFINE_double(key_dist_a, 0.0,
"The parameter 'a' of key access distribution model "
"f(x)=a*x^b");
DEFINE_double(key_dist_b, 0.0,
"The parameter 'b' of key access distribution model "
"f(x)=a*x^b");
DEFINE_double(value_theta, 0.0,
"The parameter 'theta' of Generalized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(value_k, 0.0,
"The parameter 'k' of Generalized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(value_sigma, 0.0,
"The parameter 'sigma' of Generalized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_theta, 0.0,
"The parameter 'theta' of Generalized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_k, 0.0,
"The parameter 'k' of Generalized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_sigma, 0.0,
"The parameter 'sigma' of Generalized Pareto Distribution "
"f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(mix_get_ratio, 1.0,
"The ratio of Get queries of mix_graph workload");
DEFINE_double(mix_put_ratio, 0.0,
"The ratio of Put queries of mix_graph workload");
DEFINE_double(mix_seek_ratio, 0.0,
"The ratio of Seek queries of mix_graph workload");
DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator");
DEFINE_int64(mix_ave_kv_size, 512,
"The average key-value size of this workload");
DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
DEFINE_double(
sine_mix_rate_noise, 0.0,
"Add the noise ratio to the sine rate, it is between 0.0 and 1.0");
DEFINE_bool(sine_mix_rate, false,
"Enable the sine QPS control on the mix workload");
DEFINE_uint64(
sine_mix_rate_interval_milliseconds, 10000,
"Interval of which the sine wave read_rate_limit is recalculated");
DEFINE_int64(mix_accesses, -1,
"The total query accesses of mix_graph workload");
DEFINE_uint64(
benchmark_read_rate_limit, 0,
"If non-zero, db_bench will rate-limit the reads from RocksDB. This "
......@@ -1052,6 +1142,7 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
}
return true;
}
DEFINE_int32(prefix_size, 0,
"control the prefix size for HashSkipList and "
"plain table");
......@@ -1059,6 +1150,14 @@ DEFINE_int64(keys_per_prefix, 0,
"control average number of keys generated "
"per prefix, 0 means no special handling of the prefix, "
"i.e. use the prefix comes with the generated random number.");
DEFINE_bool(total_order_seek, false,
"Enable total order seek regardless of index format.");
DEFINE_bool(prefix_same_as_start, false,
"Enforce iterator to return keys with prefix same as seek key.");
DEFINE_bool(
seek_missing_prefix, false,
"Iterator seek to keys with non-exist prefixes. Require prefix_size > 8");
DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
"If non-zero, enable "
"memtable insert with hint with the given prefix size.");
......@@ -1075,13 +1174,23 @@ DEFINE_bool(identity_as_first_hash, false,
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec,
"Gap between printing stats to log in seconds");
DEFINE_uint64(stats_persist_period_sec,
rocksdb::Options().stats_persist_period_sec,
"Gap between persisting stats in seconds");
DEFINE_bool(persist_stats_to_disk, rocksdb::Options().persist_stats_to_disk,
"whether to persist stats to disk");
DEFINE_uint64(stats_history_buffer_size,
rocksdb::Options().stats_history_buffer_size,
"Max number of stats snapshots to keep in memory");
DEFINE_int64(multiread_stride, 0,
"Stride length for the keys in a MultiGet batch");
DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");
enum RepFactory {
kSkipList,
kPrefixHash,
kVectorRep,
kHashLinkedList,
kCuckoo
};
static enum RepFactory StringToRepFactory(const char* ctype) {
......@@ -1095,8 +1204,6 @@ static enum RepFactory StringToRepFactory(const char* ctype) {
return kVectorRep;
else if (!strcasecmp(ctype, "hash_linkedlist"))
return kHashLinkedList;
else if (!strcasecmp(ctype, "cuckoo"))
return kCuckoo;
fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
return kSkipList;
......@@ -1130,6 +1237,7 @@ DEFINE_int32(skip_list_lookahead, 0,
DEFINE_bool(report_file_operations, false,
"if report number of file "
"operations");
DEFINE_int32(readahead_size, 0, "Iterator readahead size");
static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
......@@ -1196,7 +1304,7 @@ class ReportFileOpEnv : public EnvWrapper {
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
virtual Status Read(size_t n, Slice* result, char* scratch) override {
Status Read(size_t n, Slice* result, char* scratch) override {
counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Read(n, result, scratch);
counters_->bytes_read_.fetch_add(result->size(),
......@@ -1204,7 +1312,7 @@ class ReportFileOpEnv : public EnvWrapper {
return rv;
}
virtual Status Skip(uint64_t n) override { return target_->Skip(n); }
Status Skip(uint64_t n) override { return target_->Skip(n); }
};
Status s = target()->NewSequentialFile(f, r, soptions);
......@@ -1227,8 +1335,8 @@ class ReportFileOpEnv : public EnvWrapper {
CountingFile(std::unique_ptr<RandomAccessFile>&& target,
ReportFileOpCounters* counters)
: target_(std::move(target)), counters_(counters) {}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
Status rv = target_->Read(offset, n, result, scratch);
counters_->bytes_read_.fetch_add(result->size(),
......@@ -1479,7 +1587,6 @@ class ReporterAgent {
private:
std::string Header() const { return "secs_elapsed,interval_qps"; }
void SleepAndReport() {
uint64_t kMicrosInSecond = 1000 * 1000;
auto time_started = env_->NowMicros();
while (true) {
{
......@@ -1808,7 +1915,7 @@ class Stats {
double throughput = (double)done_ / elapsed;
fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
name.ToString().c_str(), elapsed * 1e6 / done_, (long)throughput,
name.ToString().c_str(), seconds_ * 1e6 / done_, (long)throughput,
(extra.empty() ? "" : " "), extra.c_str());
if (FLAGS_histogram) {
for (auto it = hist_.begin(); it != hist_.end(); ++it) {
......@@ -2005,11 +2112,12 @@ class Benchmark {
int64_t range_tombstone_width_;
int64_t max_num_range_tombstones_;
WriteOptions write_options_;
titandb::TitanOptions
open_options_; // keep options around to properly destroy db later
#ifndef ROCKSDB_LITE
TraceOptions trace_options_;
TraceOptions block_cache_trace_options_;
#endif
titandb::TitanOptions
open_options_; // keep options around to properly destroy db later
int64_t reads_;
int64_t deletes_;
double read_random_exp_range_;
......@@ -2018,31 +2126,37 @@ class Benchmark {
int64_t merge_keys_;
bool report_file_operations_;
bool use_blob_db_;
std::vector<std::string> keys_;
class ErrorHandlerListener : public EventListener {
public:
#ifndef ROCKSDB_LITE
ErrorHandlerListener()
: cv_(&mutex_), no_auto_recovery_(false), recovery_complete_(false) {}
: mutex_(),
cv_(&mutex_),
no_auto_recovery_(false),
recovery_complete_(false) {}
~ErrorHandlerListener() {}
~ErrorHandlerListener() override {}
void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
Status /*bg_error*/, bool* auto_recovery) {
Status /*bg_error*/,
bool* auto_recovery) override {
if (*auto_recovery && no_auto_recovery_) {
*auto_recovery = false;
}
}
void OnErrorRecoveryCompleted(Status /*old_bg_error*/) {
void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
InstrumentedMutexLock l(&mutex_);
recovery_complete_ = true;
cv_.SignalAll();
}
bool WaitForRecovery(uint64_t /*abs_time_us*/) {
bool WaitForRecovery(uint64_t abs_time_us) {
InstrumentedMutexLock l(&mutex_);
if (!recovery_complete_) {
cv_.Wait(/*abs_time_us*/);
cv_.TimedWait(abs_time_us);
}
if (recovery_complete_) {
recovery_complete_ = false;
......@@ -2058,6 +2172,10 @@ class Benchmark {
InstrumentedCondVar cv_;
bool no_auto_recovery_;
bool recovery_complete_;
#else // ROCKSDB_LITE
bool WaitForRecovery(uint64_t /*abs_time_us*/) { return true; }
void EnableAutoRecovery(bool /*enable*/) {}
#endif // ROCKSDB_LITE
};
std::shared_ptr<ErrorHandlerListener> listener_;
......@@ -2070,35 +2188,35 @@ class Benchmark {
return true;
}
inline bool CompressSlice(const CompressionContext& compression_ctx,
inline bool CompressSlice(const CompressionInfo& compression_info,
const Slice& input, std::string* compressed) {
bool ok = true;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = Snappy_Compress(compression_ctx, input.data(), input.size(),
ok = Snappy_Compress(compression_info, input.data(), input.size(),
compressed);
break;
case rocksdb::kZlibCompression:
ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(),
ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case rocksdb::kBZip2Compression:
ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(),
ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case rocksdb::kLZ4Compression:
ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(),
ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(),
ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case rocksdb::kXpressCompression:
ok = XPRESS_Compress(input.data(), input.size(), compressed);
break;
case rocksdb::kZSTD:
ok = ZSTD_Compress(compression_ctx, input.data(), input.size(),
ok = ZSTD_Compress(compression_info, input.data(), input.size(),
compressed);
break;
default:
......@@ -2142,6 +2260,8 @@ class Benchmark {
auto compression = CompressionTypeToString(FLAGS_compression_type_e);
fprintf(stdout, "Compression: %s\n", compression.c_str());
fprintf(stdout, "Compression sampling rate: %" PRId64 "\n",
FLAGS_sample_for_compression);
switch (FLAGS_rep_factory) {
case kPrefixHash:
......@@ -2156,9 +2276,6 @@ class Benchmark {
case kHashLinkedList:
fprintf(stdout, "Memtablerep: hash_linkedlist\n");
break;
case kCuckoo:
fprintf(stdout, "Memtablerep: cuckoo\n");
break;
}
fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
......@@ -2181,10 +2298,12 @@ class Benchmark {
const int len = FLAGS_block_size;
std::string input_str(len, 'y');
std::string compressed;
CompressionContext compression_ctx(FLAGS_compression_type_e,
Options().compression_opts);
bool result =
CompressSlice(compression_ctx, Slice(input_str), &compressed);
CompressionOptions opts;
CompressionContext context(FLAGS_compression_type_e);
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e,
FLAGS_sample_for_compression);
bool result = CompressSlice(info, Slice(input_str), &compressed);
if (!result) {
fprintf(stdout, "WARNING: %s compression is not enabled\n",
......@@ -2285,13 +2404,13 @@ class Benchmark {
class KeepFilter : public CompactionFilter {
public:
virtual bool Filter(int /*level*/, const Slice& /*key*/,
const Slice& /*value*/, std::string* /*new_value*/,
bool* /*value_changed*/) const override {
bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
std::string* /*new_value*/,
bool* /*value_changed*/) const override {
return false;
}
virtual const char* Name() const override { return "KeepFilter"; }
const char* Name() const override { return "KeepFilter"; }
};
std::shared_ptr<Cache> NewCache(int64_t capacity) {
......@@ -2299,16 +2418,17 @@ class Benchmark {
return nullptr;
}
if (FLAGS_use_clock_cache) {
auto cache = NewClockCache((size_t)capacity, FLAGS_cache_numshardbits);
auto cache = NewClockCache(static_cast<size_t>(capacity),
FLAGS_cache_numshardbits);
if (!cache) {
fprintf(stderr, "Clock cache not supported.");
exit(1);
}
return cache;
} else {
return NewLRUCache((size_t)capacity, FLAGS_cache_numshardbits,
false /*strict_capacity_limit*/,
FLAGS_cache_high_pri_pool_ratio);
return NewLRUCache(
static_cast<size_t>(capacity), FLAGS_cache_numshardbits,
false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio);
}
}
......@@ -2429,6 +2549,13 @@ class Benchmark {
// | key 00000 |
// ----------------------------
void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
if (!keys_.empty()) {
assert(FLAGS_use_existing_keys);
assert(keys_.size() == static_cast<size_t>(num_keys));
assert(v < static_cast<uint64_t>(num_keys));
*key = keys_[v];
return;
}
char* start = const_cast<char*>(key->data());
char* pos = start;
if (keys_per_prefix_ > 0) {
......@@ -2463,6 +2590,17 @@ class Benchmark {
}
}
void GenerateKeyFromIntForSeek(uint64_t v, int64_t num_keys, Slice* key) {
GenerateKeyFromInt(v, num_keys, key);
if (FLAGS_seek_missing_prefix) {
assert(prefix_size_ > 8);
char* key_ptr = const_cast<char*>(key->data());
// This relies on GenerateKeyFromInt filling paddings with '0's.
// Putting a '1' will create a non-existing prefix.
key_ptr[8] = '1';
}
}
std::string GetPathForMultiple(std::string base_name, size_t id) {
if (!base_name.empty()) {
#ifndef OS_WIN
......@@ -2639,6 +2777,10 @@ class Benchmark {
} else if (name == "readreverse") {
method = &Benchmark::ReadReverse;
} else if (name == "readrandom") {
if (FLAGS_multiread_stride) {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
}
method = &Benchmark::ReadRandom;
} else if (name == "readrandomfast") {
method = &Benchmark::ReadRandomFast;
......@@ -2646,6 +2788,8 @@ class Benchmark {
fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
entries_per_batch_);
method = &Benchmark::MultiReadRandom;
} else if (name == "mixgraph") {
method = &Benchmark::MixGraph;
} else if (name == "readmissing") {
++key_size_;
method = &Benchmark::ReadRandom;
......@@ -2745,6 +2889,8 @@ class Benchmark {
PrintStats("rocksdb.levelstats");
} else if (name == "sstables") {
PrintStats("rocksdb.sstables");
} else if (name == "stats_history") {
PrintStatsHistory();
} else if (name == "replay") {
if (num_threads > 1) {
fprintf(stderr, "Multi-threaded replay is not yet supported\n");
......@@ -2809,6 +2955,47 @@ class Benchmark {
fprintf(stdout, "Tracing the workload to: [%s]\n",
FLAGS_trace_file.c_str());
}
// Start block cache tracing.
if (!FLAGS_block_cache_trace_file.empty()) {
// Sanity checks.
if (FLAGS_block_cache_trace_sampling_frequency <= 0) {
fprintf(stderr,
"Block cache trace sampling frequency must be higher than "
"0.\n");
exit(1);
}
if (FLAGS_block_cache_trace_max_trace_file_size_in_bytes <= 0) {
fprintf(stderr,
"The maximum file size for block cache tracing must be "
"higher than 0.\n");
exit(1);
}
block_cache_trace_options_.max_trace_file_size =
FLAGS_block_cache_trace_max_trace_file_size_in_bytes;
block_cache_trace_options_.sampling_frequency =
FLAGS_block_cache_trace_sampling_frequency;
std::unique_ptr<TraceWriter> block_cache_trace_writer;
Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
FLAGS_block_cache_trace_file,
&block_cache_trace_writer);
if (!s.ok()) {
fprintf(stderr,
"Encountered an error when creating trace writer, %s\n",
s.ToString().c_str());
exit(1);
}
s = db_.db->StartBlockCacheTrace(block_cache_trace_options_,
std::move(block_cache_trace_writer));
if (!s.ok()) {
fprintf(
stderr,
"Encountered an error when starting block cache tracing, %s\n",
s.ToString().c_str());
exit(1);
}
fprintf(stdout, "Tracing block cache accesses to: [%s]\n",
FLAGS_block_cache_trace_file.c_str());
}
#endif // ROCKSDB_LITE
if (num_warmup > 0) {
......@@ -2837,6 +3024,12 @@ class Benchmark {
}
}
if (secondary_update_thread_) {
secondary_update_stopped_.store(1, std::memory_order_relaxed);
secondary_update_thread_->join();
secondary_update_thread_.reset();
}
#ifndef ROCKSDB_LITE
if (name != "replay" && FLAGS_trace_file != "") {
Status s = db_.db->EndTrace();
......@@ -2845,6 +3038,14 @@ class Benchmark {
s.ToString().c_str());
}
}
if (!FLAGS_block_cache_trace_file.empty()) {
Status s = db_.db->EndBlockCacheTrace();
if (!s.ok()) {
fprintf(stderr,
"Encountered an error ending the block cache tracing, %s\n",
s.ToString().c_str());
}
}
#endif // ROCKSDB_LITE
if (FLAGS_statistics) {
......@@ -2856,11 +3057,22 @@ class Benchmark {
->ToString()
.c_str());
}
#ifndef ROCKSDB_LITE
if (FLAGS_use_secondary_db) {
fprintf(stdout, "Secondary instance updated %" PRIu64 " times.\n",
secondary_db_updates_);
}
#endif // ROCKSDB_LITE
}
private:
std::shared_ptr<TimestampEmulator> timestamp_emulator_;
std::unique_ptr<port::Thread> secondary_update_thread_;
std::atomic<int> secondary_update_stopped_{0};
#ifndef ROCKSDB_LITE
uint64_t secondary_db_updates_ = 0;
#endif // ROCKSDB_LITE
struct ThreadArg {
Benchmark* bm;
SharedState* shared;
......@@ -3040,13 +3252,15 @@ class Benchmark {
int64_t produced = 0;
bool ok = true;
std::string compressed;
CompressionContext compression_ctx(FLAGS_compression_type_e,
Options().compression_opts);
CompressionOptions opts;
CompressionContext context(FLAGS_compression_type_e);
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e,
FLAGS_sample_for_compression);
// Compress 1G
while (ok && bytes < int64_t(1) << 30) {
compressed.clear();
ok = CompressSlice(compression_ctx, input, &compressed);
ok = CompressSlice(info, input, &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
......@@ -3068,11 +3282,17 @@ class Benchmark {
Slice input = gen.Generate(FLAGS_block_size);
std::string compressed;
CompressionContext compression_ctx(FLAGS_compression_type_e);
CompressionOptions compression_opts;
CompressionInfo compression_info(
compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e, FLAGS_sample_for_compression);
UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
CompressionContext compression_ctx(FLAGS_compression_type_e,
Options().compression_opts);
UncompressionInfo uncompression_info(uncompression_ctx,
UncompressionDict::GetEmptyDict(),
FLAGS_compression_type_e);
bool ok = CompressSlice(compression_ctx, input, &compressed);
bool ok = CompressSlice(compression_info, input, &compressed);
int64_t bytes = 0;
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
......@@ -3093,7 +3313,7 @@ class Benchmark {
}
case rocksdb::kZlibCompression:
uncompressed =
Zlib_Uncompress(uncompression_ctx, compressed.data(),
Zlib_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
break;
......@@ -3103,12 +3323,12 @@ class Benchmark {
ok = uncompressed.get() != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
break;
......@@ -3118,7 +3338,7 @@ class Benchmark {
ok = uncompressed.get() != nullptr;
break;
case rocksdb::kZSTD:
uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size);
ok = uncompressed.get() != nullptr;
break;
......@@ -3189,9 +3409,10 @@ class Benchmark {
options.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction;
#ifndef ROCKSDB_LITE
options.ttl = FLAGS_fifo_compaction_ttl;
options.compaction_options_fifo = CompactionOptionsFIFO(
FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
FLAGS_fifo_compaction_allow_compaction, FLAGS_fifo_compaction_ttl);
FLAGS_fifo_compaction_allow_compaction);
#endif // ROCKSDB_LITE
if (FLAGS_prefix_size != 0) {
options.prefix_extractor.reset(
......@@ -3209,6 +3430,7 @@ class Benchmark {
}
options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
if (FLAGS_memtable_insert_with_hint_prefix_size > 0) {
options.memtable_insert_with_hint_prefix_extractor.reset(
NewCappedPrefixTransform(
......@@ -3254,10 +3476,6 @@ class Benchmark {
case kVectorRep:
options.memtable_factory.reset(new VectorRepFactory);
break;
case kCuckoo:
options.memtable_factory.reset(NewHashCuckooRepFactory(
options.write_buffer_size, FLAGS_key_size + FLAGS_value_size));
break;
#else
default:
fprintf(stderr, "Only skip list is supported in lite mode\n");
......@@ -3411,9 +3629,10 @@ class Benchmark {
}
if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
(unsigned int)FLAGS_num_levels) {
static_cast<unsigned int>(FLAGS_num_levels)) {
fprintf(stderr, "Insufficient number of fanouts specified %d\n",
(int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
static_cast<int>(
FLAGS_max_bytes_for_level_multiplier_additional_v.size()));
exit(1);
}
options.max_bytes_for_level_multiplier_additional =
......@@ -3425,6 +3644,7 @@ class Benchmark {
options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type_e;
options.sample_for_compression = FLAGS_sample_for_compression;
options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
options.max_total_wal_size = FLAGS_max_total_wal_size;
......@@ -3453,6 +3673,7 @@ class Benchmark {
options.enable_write_thread_adaptive_yield =
FLAGS_enable_write_thread_adaptive_yield;
options.enable_pipelined_write = FLAGS_enable_pipelined_write;
options.unordered_write = FLAGS_unordered_write;
options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
options.rate_limit_delay_max_milliseconds =
......@@ -3512,6 +3733,11 @@ class Benchmark {
fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
exit(1);
}
if (FLAGS_use_secondary_db &&
(FLAGS_transaction_db || FLAGS_optimistic_transaction_db)) {
fprintf(stderr, "Cannot use use_secondary_db flag with transaction_db\n");
exit(1);
}
#endif // ROCKSDB_LITE
}
......@@ -3525,6 +3751,11 @@ class Benchmark {
options.dump_malloc_stats = FLAGS_dump_malloc_stats;
options.stats_dump_period_sec =
static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
options.stats_persist_period_sec =
static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
options.persist_stats_to_disk = FLAGS_persist_stats_to_disk;
options.stats_history_buffer_size =
static_cast<size_t>(FLAGS_stats_history_buffer_size);
options.compression_opts.level = FLAGS_compression_level;
options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
......@@ -3582,16 +3813,9 @@ class Benchmark {
}
options.listeners.emplace_back(listener_);
opts->min_blob_size = FLAGS_titan_db_min_blob_size;
opts->min_gc_batch_size = 128 << 20;
opts->blob_file_compression = FLAGS_compression_type_e;
if (FLAGS_cache_size) {
opts->blob_cache = cache_;
}
opts->max_background_gc = FLAGS_max_background_jobs;
opts->max_gc_batch_size = 8LLU << 30;
if (FLAGS_num_multi_db <= 1) {
OpenDb(options, FLAGS_db, &db_);
} else {
......@@ -3612,6 +3836,19 @@ class Benchmark {
options.compaction_filter = new KeepFilter();
fprintf(stdout, "A noop compaction filter is used\n");
}
if (FLAGS_use_existing_keys) {
// Only work on single database
assert(db_.db != nullptr);
ReadOptions read_opts;
read_opts.total_order_seek = true;
Iterator* iter = db_.db->NewIterator(read_opts);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
keys_.emplace_back(iter->key().ToString());
}
delete iter;
FLAGS_num = keys_.size();
}
}
void Open(titandb::TitanOptions* opts) {
......@@ -3674,6 +3911,11 @@ class Benchmark {
} else if (FLAGS_transaction_db) {
TransactionDB* ptr;
TransactionDBOptions txn_db_options;
if (options.unordered_write) {
options.two_write_queues = true;
txn_db_options.skip_concurrency_control = true;
txn_db_options.write_policy = WRITE_PREPARED;
}
s = TransactionDB::Open(options, txn_db_options, db_name,
column_families, &db->cfh, &ptr);
if (s.ok()) {
......@@ -3712,6 +3954,11 @@ class Benchmark {
} else if (FLAGS_transaction_db) {
TransactionDB* ptr = nullptr;
TransactionDBOptions txn_db_options;
if (options.unordered_write) {
options.two_write_queues = true;
txn_db_options.skip_concurrency_control = true;
txn_db_options.write_policy = WRITE_PREPARED;
}
s = CreateLoggerFromOptions(db_name, options, &options.info_log);
if (s.ok()) {
s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
......@@ -3739,6 +3986,32 @@ class Benchmark {
if (s.ok()) {
db->db = ptr;
}
} else if (FLAGS_use_secondary_db) {
if (FLAGS_secondary_path.empty()) {
std::string default_secondary_path;
FLAGS_env->GetTestDirectory(&default_secondary_path);
default_secondary_path += "/dbbench_secondary";
FLAGS_secondary_path = default_secondary_path;
}
s = DB::OpenAsSecondary(options, db_name, FLAGS_secondary_path, &db->db);
if (s.ok() && FLAGS_secondary_update_interval > 0) {
secondary_update_thread_.reset(new port::Thread(
[this](int interval, DBWithColumnFamilies* _db) {
while (0 == secondary_update_stopped_.load(
std::memory_order_relaxed)) {
Status secondary_update_status =
_db->db->TryCatchUpWithPrimary();
if (!secondary_update_status.ok()) {
fprintf(stderr, "Failed to catch up with primary: %s\n",
secondary_update_status.ToString().c_str());
break;
}
++secondary_db_updates_;
FLAGS_env->SleepForMicroseconds(interval * 1000000);
}
},
FLAGS_secondary_update_interval, db));
}
#endif // ROCKSDB_LITE
} else {
s = DB::Open(options, db_name, &db->db);
......@@ -4471,6 +4744,8 @@ class Benchmark {
int64_t read = 0;
int64_t found = 0;
int64_t bytes = 0;
int num_keys = 0;
int64_t key_rand = GetRandomKey(&thread->rand);
ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
......@@ -4482,8 +4757,21 @@ class Benchmark {
// We use same key_rand as seed for key and column family so that we can
// deterministically find the cfh corresponding to a particular key, as it
// is done in DoWrite method.
int64_t key_rand = GetRandomKey(&thread->rand);
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
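// With a stride configured, draw a fresh random base key once per batch and
// advance by FLAGS_multiread_stride within the batch, clamping the base so
// the whole batch stays inside [0, FLAGS_num).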
if (entries_per_batch_ > 1 && FLAGS_multiread_stride) {
if (++num_keys == entries_per_batch_) {
num_keys = 0;
key_rand = GetRandomKey(&thread->rand);
if ((key_rand + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
FLAGS_num) {
key_rand = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
}
} else {
key_rand += FLAGS_multiread_stride;
}
} else {
key_rand = GetRandomKey(&thread->rand);
}
read++;
Status s;
if (FLAGS_num_column_families > 1) {
......@@ -4535,6 +4823,9 @@ class Benchmark {
std::vector<Slice> keys;
std::vector<std::unique_ptr<const char[]>> key_guards;
std::vector<std::string> values(entries_per_batch_);
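// The batched MultiGet overload below fills caller-owned PinnableSlices in
// place, avoiding the extra copy into std::string that the vector-based
// overload performs.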
PinnableSlice* pin_values = new PinnableSlice[entries_per_batch_];
std::unique_ptr<PinnableSlice[]> pin_values_guard(pin_values);
std::vector<Status> stat_list(entries_per_batch_);
while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
key_guards.push_back(std::unique_ptr<const char[]>());
keys.push_back(AllocateKey(&key_guards.back()));
......@@ -4543,21 +4834,52 @@ class Benchmark {
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
for (int64_t i = 0; i < entries_per_batch_; ++i) {
GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
if (FLAGS_multiread_stride) {
int64_t key = GetRandomKey(&thread->rand);
if ((key + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
static_cast<int64_t>(FLAGS_num)) {
key = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
}
for (int64_t i = 0; i < entries_per_batch_; ++i) {
GenerateKeyFromInt(key, FLAGS_num, &keys[i]);
key += FLAGS_multiread_stride;
}
} else {
for (int64_t i = 0; i < entries_per_batch_; ++i) {
GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
}
}
std::vector<Status> statuses = db->MultiGet(options, keys, &values);
assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
read += entries_per_batch_;
num_multireads++;
for (int64_t i = 0; i < entries_per_batch_; ++i) {
if (statuses[i].ok()) {
++found;
} else if (!statuses[i].IsNotFound()) {
fprintf(stderr, "MultiGet returned an error: %s\n",
statuses[i].ToString().c_str());
abort();
if (!FLAGS_multiread_batched) {
std::vector<Status> statuses = db->MultiGet(options, keys, &values);
assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);
read += entries_per_batch_;
num_multireads++;
for (int64_t i = 0; i < entries_per_batch_; ++i) {
if (statuses[i].ok()) {
++found;
} else if (!statuses[i].IsNotFound()) {
fprintf(stderr, "MultiGet returned an error: %s\n",
statuses[i].ToString().c_str());
abort();
}
}
} else {
db->MultiGet(options, db->DefaultColumnFamily(), keys.size(),
keys.data(), pin_values, stat_list.data());
read += entries_per_batch_;
num_multireads++;
for (int64_t i = 0; i < entries_per_batch_; ++i) {
if (stat_list[i].ok()) {
++found;
} else if (!stat_list[i].IsNotFound()) {
fprintf(stderr, "MultiGet returned an error: %s\n",
stat_list[i].ToString().c_str());
abort();
}
stat_list[i] = Status::OK();
pin_values[i].Reset();
}
}
if (thread->shared->read_rate_limiter.get() != nullptr &&
......@@ -4575,6 +4897,255 @@ class Benchmark {
thread->stats.AddMessage(msg);
}
// Inverse CDF of the generalized Pareto distribution:
// theta - sigma * log(u) when k == 0, theta + sigma * (u^-k - 1) / k otherwise.
int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
double ret;
if (k == 0.0) {
ret = theta - sigma * std::log(u);
} else {
ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k;
}
return static_cast<int64_t>(ceil(ret));
}
// Inverse of the power function y = a * x^b, i.e. x = (y / a)^(1 / b)
int64_t PowerCdfInversion(double u, double a, double b) {
double ret;
ret = std::pow((u / a), (1 / b));
return static_cast<int64_t>(ceil(ret));
}
// Add random noise to the target QPS; the noise band is derived from FLAGS_sine_a
double AddNoise(double origin, double noise_ratio) {
if (noise_ratio < 0.0 || noise_ratio > 1.0) {
return origin;
}
int band_int = static_cast<int>(FLAGS_sine_a);
double delta = (rand() % band_int - band_int / 2) * noise_ratio;
if (origin + delta < 0) {
return origin;
} else {
return (origin + delta);
}
}
// Decides the query type for a mixed workload.
// 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 Merge
class QueryDecider {
public:
std::vector<int> type_;
std::vector<double> ratio_;
int range_;
QueryDecider() {}
~QueryDecider() {}
Status Initiate(std::vector<double> ratio_input) {
int range_max = 1000;
double sum = 0.0;
for (auto& ratio : ratio_input) {
sum += ratio;
}
range_ = 0;
for (auto& ratio : ratio_input) {
range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
type_.push_back(range_);
ratio_.push_back(ratio / sum);
}
return Status::OK();
}
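// Maps a random number onto the cumulative buckets built by Initiate();
// bucket i covers [type_[i - 1], type_[i]).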
int GetType(int64_t rand_num) {
if (rand_num < 0) {
rand_num = rand_num * (-1);
}
assert(range_ != 0);
int pos = static_cast<int>(rand_num % range_);
for (int i = 0; i < static_cast<int>(type_.size()); i++) {
if (pos < type_[i]) {
return i;
}
}
return 0;
}
};
// The graph workload: a mix of Get, Put, and Iterator queries
void MixGraph(ThreadState* thread) {
int64_t read = 0; // including single gets and Next of iterators
int64_t gets = 0;
int64_t puts = 0;
int64_t found = 0;
int64_t seek = 0;
int64_t seek_found = 0;
int64_t bytes = 0;
const int64_t default_value_max = 1 * 1024 * 1024;
int64_t value_max = default_value_max;
int64_t scan_len_max = FLAGS_mix_max_scan_len;
double write_rate = 1000000.0;
double read_rate = 1000000.0;
std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
FLAGS_mix_seek_ratio};
char value_buffer[default_value_max];
QueryDecider query;
RandomGenerator gen;
Status s;
if (value_max > FLAGS_mix_max_value_size) {
value_max = FLAGS_mix_max_value_size;
}
ReadOptions options(FLAGS_verify_checksum, true);
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
PinnableSlice pinnable_val;
query.Initiate(ratio);
// Set up the initial rate limiters that cap QPS when a sine workload is configured
if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
static_cast<int64_t>(read_rate), 100000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kReadsOnly));
thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
}
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
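// Two-stage key generation: a uniform draw is skewed through the power CDF,
// and the result seeds Random64 so hot keys are scattered across the whole
// key space rather than clustered at one end.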
int64_t rand_v, key_rand, key_seed;
rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
double u = static_cast<double>(rand_v) / FLAGS_num;
key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
Random64 rand(key_seed);
key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
int query_type = query.GetType(rand_v);
// Recompute the target QPS once the sine interval has elapsed
uint64_t now = FLAGS_env->NowMicros();
uint64_t usecs_since_last;
if (now > thread->stats.GetSineInterval()) {
usecs_since_last = now - thread->stats.GetSineInterval();
} else {
usecs_since_last = 0;
}
if (usecs_since_last >
(FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
double usecs_since_start =
static_cast<double>(now - thread->stats.GetStart());
thread->stats.ResetSineInterval();
double mix_rate_with_noise = AddNoise(
SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
write_rate =
mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;
thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
static_cast<int64_t>(read_rate),
FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
RateLimiter::Mode::kReadsOnly));
}
// Start the query
if (query_type == 0) {
// the Get query
gets++;
read++;
if (FLAGS_num_column_families > 1) {
s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
&pinnable_val);
} else {
pinnable_val.Reset();
s = db_with_cfh->db->Get(options,
db_with_cfh->db->DefaultColumnFamily(), key,
&pinnable_val);
}
if (s.ok()) {
found++;
bytes += key.size() + pinnable_val.size();
} else if (!s.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
abort();
}
if (thread->shared->read_rate_limiter.get() != nullptr &&
read % 256 == 255) {
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
} else if (query_type == 1) {
// the Put query
puts++;
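// Value sizes follow the Pareto distribution; negative draws fall back to
// 10 bytes and oversized draws are wrapped modulo value_max.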
int64_t value_size = ParetoCdfInversion(
u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
if (value_size < 0) {
value_size = 10;
} else if (value_size > value_max) {
value_size = value_size % value_max;
}
s = db_with_cfh->db->Put(
write_options_, key,
gen.Generate(static_cast<unsigned int>(value_size)));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
if (thread->shared->write_rate_limiter) {
thread->shared->write_rate_limiter->Request(
key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
RateLimiter::OpType::kWrite);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
} else if (query_type == 2) {
// Seek query
if (db_with_cfh->db != nullptr) {
Iterator* single_iter = nullptr;
single_iter = db_with_cfh->db->NewIterator(options);
if (single_iter != nullptr) {
single_iter->Seek(key);
seek++;
read++;
if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
seek_found++;
}
int64_t scan_length =
ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
FLAGS_iter_sigma) %
scan_len_max;
for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
Slice value = single_iter->value();
memcpy(value_buffer, value.data(),
std::min(value.size(), sizeof(value_buffer)));
bytes += single_iter->key().size() + single_iter->value().size();
single_iter->Next();
assert(single_iter->status().ok());
}
}
delete single_iter;
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
}
}
char msg[256];
snprintf(msg, sizeof(msg),
"( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64
" in %" PRIu64 " found)\n",
gets, puts, seek, found, read);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
get_perf_context()->ToString());
}
}
void IteratorCreation(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
......@@ -4599,7 +5170,10 @@ class Benchmark {
int64_t found = 0;
int64_t bytes = 0;
ReadOptions options(FLAGS_verify_checksum, true);
options.total_order_seek = FLAGS_total_order_seek;
options.prefix_same_as_start = FLAGS_prefix_same_as_start;
options.tailing = FLAGS_use_tailing_iterator;
options.readahead_size = FLAGS_readahead_size;
Iterator* single_iter = nullptr;
std::vector<Iterator*> multi_iters;
......@@ -4623,7 +5197,8 @@ class Benchmark {
char value_buffer[256];
while (!duration.Done(1)) {
int64_t seek_pos = thread->rand.Next() % FLAGS_num;
GenerateKeyFromInt((uint64_t)seek_pos, FLAGS_num, &key);
GenerateKeyFromIntForSeek(static_cast<uint64_t>(seek_pos), FLAGS_num,
&key);
if (FLAGS_max_scan_distance != 0) {
if (FLAGS_reverse_iterator) {
GenerateKeyFromInt(
......@@ -4632,9 +5207,10 @@ class Benchmark {
FLAGS_num, &lower_bound);
options.iterate_lower_bound = &lower_bound;
} else {
GenerateKeyFromInt(
(uint64_t)std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance),
FLAGS_num, &upper_bound);
auto min_num =
std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance);
GenerateKeyFromInt(static_cast<uint64_t>(min_num), FLAGS_num,
&upper_bound);
options.iterate_upper_bound = &upper_bound;
}
}
......@@ -4798,7 +5374,7 @@ class Benchmark {
// Wait for the writes to be finished
if (!hint_printed) {
fprintf(stderr, "Reads are finished. Have %d more writes to do\n",
(int)writes_ - written);
static_cast<int>(writes_) - written);
hint_printed = true;
}
} else {
......@@ -5699,7 +6275,8 @@ class Benchmark {
void Compact(ThreadState* thread) {
DB* db = SelectDB(thread);
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
cro.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
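// Unlike kForce, kForceOptimized still compacts the bottommost level but
// skips files that this manual compaction itself just produced.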
db->CompactRange(cro, nullptr, nullptr);
}
......@@ -5721,6 +6298,39 @@ class Benchmark {
}
}
void PrintStatsHistory() {
if (db_.db != nullptr) {
PrintStatsHistoryImpl(db_.db, false);
}
for (const auto& db_with_cfh : multi_dbs_) {
PrintStatsHistoryImpl(db_with_cfh.db, true);
}
}
void PrintStatsHistoryImpl(DB* db, bool print_header) {
if (print_header) {
fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
}
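// The range [0, port::kMaxUint64) requests every persisted stats snapshot;
// each iterator position holds one timestamped map of stat name to value.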
std::unique_ptr<StatsHistoryIterator> shi;
Status s = db->GetStatsHistory(0, port::kMaxUint64, &shi);
if (!s.ok()) {
fprintf(stdout, "%s\n", s.ToString().c_str());
return;
}
assert(shi);
while (shi->Valid()) {
uint64_t stats_time = shi->GetStatsTime();
fprintf(stdout, "------ %s ------\n",
TimeToHumanString(static_cast<int>(stats_time)).c_str());
for (auto& entry : shi->GetStatsMap()) {
fprintf(stdout, " %" PRIu64 " %s %" PRIu64 "\n", stats_time,
entry.first.c_str(), entry.second);
}
shi->Next();
}
}
void PrintStats(const char* key) {
if (db_.db != nullptr) {
PrintStats(db_.db, key, false);
......@@ -5762,6 +6372,8 @@ class Benchmark {
}
Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
std::move(trace_reader));
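// Fast-forward divides the recorded time gaps between trace entries, so the
// replay runs at a multiple of the originally captured speed.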
replayer.SetFastForward(
static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
s = replayer.Replay();
if (s.ok()) {
fprintf(stdout, "Replay started from trace_file: %s\n",
......@@ -5790,13 +6402,12 @@ int db_bench_tool(int argc, char** argv) {
exit(1);
}
if (!FLAGS_statistics_string.empty()) {
std::unique_ptr<Statistics> custom_stats_guard;
dbstats.reset(NewCustomObject<Statistics>(FLAGS_statistics_string,
&custom_stats_guard));
custom_stats_guard.release();
Status s = ObjectRegistry::NewInstance()->NewSharedObject<Statistics>(
FLAGS_statistics_string, &dbstats);
if (dbstats == nullptr) {
fprintf(stderr, "No Statistics registered matching string: %s\n",
FLAGS_statistics_string.c_str());
fprintf(stderr,
"No Statistics registered matching string: %s status=%s\n",
FLAGS_statistics_string.c_str(), s.ToString().c_str());
exit(1);
}
}
......@@ -5804,6 +6415,9 @@ int db_bench_tool(int argc, char** argv) {
if (FLAGS_statistics) {
dbstats = rocksdb::CreateDBStatistics();
}
if (dbstats) {
dbstats->set_stats_level(static_cast<StatsLevel>(FLAGS_stats_level));
}
FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;
std::vector<std::string> fanout = rocksdb::StringSplit(
......@@ -5821,18 +6435,24 @@ int db_bench_tool(int argc, char** argv) {
StringToCompressionType(FLAGS_compression_type.c_str());
#ifndef ROCKSDB_LITE
std::unique_ptr<Env> custom_env_guard;
if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
exit(1);
} else if (!FLAGS_env_uri.empty()) {
FLAGS_env = NewCustomObject<Env>(FLAGS_env_uri, &custom_env_guard);
Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env);
if (FLAGS_env == nullptr) {
fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
exit(1);
}
}
#endif // ROCKSDB_LITE
if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) {
fprintf(stderr,
"`-use_existing_db` must be true for `-use_existing_keys` to be "
"settable\n");
exit(1);
}
if (!FLAGS_hdfs.empty()) {
FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
}
......@@ -5875,6 +6495,11 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_stats_interval = 1000;
}
if (FLAGS_seek_missing_prefix && FLAGS_prefix_size <= 8) {
fprintf(stderr, "prefix_size > 8 required by --seek_missing_prefix\n");
exit(1);
}
rocksdb::Benchmark benchmark;
benchmark.Run();
......
......@@ -43,9 +43,10 @@ int main() {
#include <queue>
#include <thread>
#include "db/db_impl.h"
#include "db/db_impl/db_impl.h"
#include "db/version_set.h"
#include "hdfs/env_hdfs.h"
#include "logging/logging.h"
#include "monitoring/histogram.h"
#include "options/options_helper.h"
#include "port/port.h"
......@@ -65,15 +66,14 @@ int main() {
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/gflags_compat.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/string_util.h"
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
#include "util/sync_point.h"
#include "test_util/sync_point.h"
#endif // !(defined NDEBUG) || !defined(OS_WIN)
#include "util/testutil.h"
#include "test_util/testutil.h"
#include "utilities/merge_operators.h"
......