Unverified commit 04850259, authored by yiwu-arbug, committed by GitHub

Update titandb_stress to the version in rocksdb 6.4.x (#76)

Summary:
#61 updated the default rocksdb to 6.4.x but didn't update titandb_stress accordingly. This change updates titandb_stress to pick up the recent changes.

Test Plan:
build
Signed-off-by: Yi Wu <yiwu@pingcap.com>
parent 07aa0655
......@@ -28,17 +28,14 @@ int main() {
}
#else
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif // __STDC_FORMAT_MACROS
#include <fcntl.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <algorithm>
#include <array>
#include <chrono>
#include <cinttypes>
#include <exception>
#include <queue>
#include <thread>
......@@ -58,6 +55,7 @@ int main() {
#include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
......@@ -85,7 +83,7 @@ using GFLAGS_NAMESPACE::SetUsageMessage;
static const long KB = 1024;
static const int kRandomValueMaxFactor = 3;
static const int kValueMaxLen = 1024 * 1024;
static const int kValueMaxLen = 100;
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
if (value > std::numeric_limits<uint32_t>::max()) {
......@@ -100,6 +98,8 @@ DEFINE_uint64(seed, 2341234, "Seed for PRNG");
static const bool FLAGS_seed_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
DEFINE_bool(read_only, false, "True if open DB in read-only mode during tests");
DEFINE_int64(max_key, 1 * KB * KB,
"Max number of key/values to place in database");
......@@ -133,6 +133,13 @@ DEFINE_bool(test_batches_snapshots, false,
"\t(b) No long validation at the end (more speed up)\n"
"\t(c) Test snapshot and atomicity of batch writes");
DEFINE_bool(atomic_flush, false,
"If set, enables atomic flush in the options.\n");
DEFINE_bool(test_atomic_flush, false,
"If set, runs the stress test dedicated to verifying atomic flush "
"functionality. Setting this implies `atomic_flush=true`.\n");
DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
DEFINE_int32(ttl, -1,
......@@ -201,6 +208,10 @@ DEFINE_double(memtable_prefix_bloom_size_ratio,
"creates prefix blooms for memtables, each with size "
"`write_buffer_size * memtable_prefix_bloom_size_ratio`.");
DEFINE_bool(memtable_whole_key_filtering,
rocksdb::Options().memtable_whole_key_filtering,
"Enable whole key filtering in memtables.");
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
"Maximum number of files to keep open at the same time "
"(use default if == 0)");
......@@ -232,6 +243,11 @@ DEFINE_int32(
static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version),
"Format version of SST files.");
DEFINE_int32(index_block_restart_interval,
rocksdb::BlockBasedTableOptions().index_block_restart_interval,
"Number of keys between restart points "
"for delta encoding of keys in index block.");
DEFINE_int32(max_background_compactions,
rocksdb::Options().max_background_compactions,
"The maximum number of concurrent background compactions "
......@@ -282,6 +298,9 @@ DEFINE_int32(set_in_place_one_in, 0,
DEFINE_int64(cache_size, 2LL * KB * KB * KB,
"Number of bytes to use as a cache of uncompressed data.");
DEFINE_bool(cache_index_and_filter_blocks, false,
"True if indexes/filters should be cached in block cache.");
DEFINE_bool(use_clock_cache, false,
"Replace default LRU block cache with clock cache.");
......@@ -320,6 +339,11 @@ DEFINE_bool(use_block_based_filter, false,
DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_string(secondaries_base, "",
"Use this path as the base path for secondary instances.");
DEFINE_bool(enable_secondary, false, "Enable secondary instance.");
DEFINE_string(
expected_values_path, "",
"File where the array of expected uint32_t values will be stored. If "
......@@ -366,6 +390,9 @@ extern std::vector<std::string> rocksdb_kill_prefix_blacklist;
DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
DEFINE_uint64(recycle_log_file_num, rocksdb::Options().recycle_log_file_num,
"Number of old WAL files to keep around for later recycling");
DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
"Target level-1 file size for compaction");
......@@ -417,6 +444,10 @@ DEFINE_int32(compact_range_one_in, 0,
"If non-zero, then CompactRange() will be called once for every N "
"operations on average. 0 indicates CompactRange() is disabled.");
DEFINE_int32(flush_one_in, 0,
"If non-zero, then Flush() will be called once for every N ops "
"on average. 0 indicates calls to Flush() are disabled.");
DEFINE_int32(compact_range_width, 10000,
"The width of the ranges passed to CompactRange().");
......@@ -424,22 +455,16 @@ DEFINE_int32(acquire_snapshot_one_in, 0,
"If non-zero, then acquires a snapshot once every N operations on "
"average.");
DEFINE_bool(compare_full_db_state_snapshot, false,
"If set we compare state of entire db (in one of the threads) with"
"each snapshot.");
DEFINE_uint64(snapshot_hold_ops, 0,
"If non-zero, then releases snapshots N operations after they're "
"acquired.");
DEFINE_bool(use_titandb, true, "Use TitanDB");
DEFINE_bool(disable_background_gc, false, "Disable background gc");
DEFINE_int32(max_background_gc,
rocksdb::titandb::TitanOptions().max_background_gc,
"The maximum number of concurrent background gc "
"that can occur in parallel.");
DEFINE_uint64(min_blob_size, 0,
"Smallest blob to store in a file. Blob smaller than this "
"will be inlined with the key in the LSM tree.");
DEFINE_bool(use_multiget, false,
"If set, use the batched MultiGet API for reads");
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
if (value < 0 || value > 100) {
......@@ -493,6 +518,23 @@ DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run");
static const bool FLAGS_num_iterations_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range);
DEFINE_uint64(
snap_refresh_nanos, 100 * 1000 * 1000,
"If non-zero, compactions will periodically refresh snapshot list.");
DEFINE_bool(use_titandb, true, "Use TitanDB");
DEFINE_bool(disable_background_gc, false, "Disable background gc");
DEFINE_int32(max_background_gc,
rocksdb::titandb::TitanOptions().max_background_gc,
"The maximum number of concurrent background gc "
"that can occur in parallel.");
DEFINE_uint64(min_blob_size, 0,
"Smallest blob to store in a file. Blob smaller than this "
"will be inlined with the key in the LSM tree.");
namespace {
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype);
......@@ -559,6 +601,14 @@ DEFINE_string(compression_type, "snappy",
static enum rocksdb::CompressionType FLAGS_compression_type_e =
rocksdb::kSnappyCompression;
DEFINE_int32(compression_max_dict_bytes, 0,
"Maximum size of dictionary used to prime the compression "
"library.");
DEFINE_int32(compression_zstd_max_train_bytes, 0,
"Maximum size of training data passed to zstd's dictionary "
"trainer.");
DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks");
static enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c;
......@@ -578,6 +628,13 @@ DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file");
DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
DEFINE_int32(secondary_catch_up_one_in, 0,
"If non-zero, the secondaries attemp to catch up with the primary "
"once for every N operations on average. 0 indicates the "
"secondaries do not try to catch up after open.");
static std::shared_ptr<rocksdb::Statistics> dbstats_secondaries;
enum RepFactory { kSkipList, kHashSkipList, kVectorRep };
namespace {
......@@ -594,6 +651,31 @@ enum RepFactory StringToRepFactory(const char* ctype) {
fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
return kSkipList;
}
#ifdef _MSC_VER
#pragma warning(push)
// truncation of constant value on static_cast
#pragma warning(disable : 4309)
#endif
// Computes the byte string that immediately follows every string having
// `src` as a prefix: the last byte that is not 0xFF is incremented and all
// bytes after it are reset to 0.
//
// Returns true and stores the result in `*v` on success. Returns false and
// leaves `*v` untouched when no such string exists, i.e. when `src` consists
// solely of 0xFF bytes or is empty (an empty prefix matches every key, so
// nothing sorts after it).
bool GetNextPrefix(const rocksdb::Slice& src, std::string* v) {
  std::string ret = src.ToString();
  const char kMaxByte = static_cast<char>(255);
  // Position of the right-most byte that can still be incremented.
  const size_t pos = ret.find_last_not_of(kMaxByte);
  if (pos == std::string::npos) {
    // Empty input or all FF. No next prefix.
    return false;
  }
  ret[pos] = ret[pos] + 1;
  // Everything to the right was 0xFF and wraps around to 0.
  for (size_t i = pos + 1; i < ret.size(); ++i) {
    ret[i] = 0;
  }
  *v = ret;
  return true;
}
#ifdef _MSC_VER
#pragma warning(pop)
#endif
} // namespace
static enum RepFactory FLAGS_rep_factory;
......@@ -634,6 +716,18 @@ static std::string Key(int64_t val) {
return big_endian_key;
}
// Decodes a big-endian key string back into its 64-bit integer value.
//
// The bytes of `big_endian_key` are reversed and handed to GetFixed64(),
// whose result is returned; on success the decoded value is stored in
// `*key_p`.
//
// `big_endian_key` must be exactly sizeof(uint64_t) bytes long. Debug builds
// assert on this; release builds return false instead of indexing past the
// end of a short key.
static bool GetIntVal(std::string big_endian_key, uint64_t* key_p) {
  const size_t size_key = sizeof(*key_p);
  assert(big_endian_key.size() == size_key);
  if (big_endian_key.size() != size_key) {
    // With asserts compiled out, a malformed key must not be decoded.
    return false;
  }
  // Reverse the byte order so the fixed-width decoder can consume it.
  std::string little_endian_key(big_endian_key.rbegin(),
                                big_endian_key.rend());
  Slice little_endian_slice = Slice(little_endian_key);
  return GetFixed64(&little_endian_slice, key_p);
}
static std::string StringToHex(const std::string& str) {
std::string result = "0x";
result.append(Slice(str).ToString(true));
......@@ -754,36 +848,36 @@ class Stats {
}
}
void AddBytesForWrites(int nwrites, size_t nbytes) {
void AddBytesForWrites(long nwrites, size_t nbytes) {
writes_ += nwrites;
bytes_ += nbytes;
}
void AddGets(int ngets, int nfounds) {
void AddGets(long ngets, long nfounds) {
founds_ += nfounds;
gets_ += ngets;
}
void AddPrefixes(int nprefixes, int count) {
void AddPrefixes(long nprefixes, long count) {
prefixes_ += nprefixes;
iterator_size_sums_ += count;
}
void AddIterations(int n) { iterations_ += n; }
void AddIterations(long n) { iterations_ += n; }
void AddDeletes(int n) { deletes_ += n; }
void AddDeletes(long n) { deletes_ += n; }
void AddSingleDeletes(size_t n) { single_deletes_ += n; }
void AddRangeDeletions(int n) { range_deletions_ += n; }
void AddRangeDeletions(long n) { range_deletions_ += n; }
void AddCoveredByRangeDeletions(int n) { covered_by_range_deletions_ += n; }
void AddCoveredByRangeDeletions(long n) { covered_by_range_deletions_ += n; }
void AddErrors(int n) { errors_ += n; }
void AddErrors(long n) { errors_ += n; }
void AddNumCompactFilesSucceed(int n) { num_compact_files_succeed_ += n; }
void AddNumCompactFilesSucceed(long n) { num_compact_files_succeed_ += n; }
void AddNumCompactFilesFailed(int n) { num_compact_files_failed_ += n; }
void AddNumCompactFilesFailed(long n) { num_compact_files_failed_ += n; }
void Report(const char* name) {
std::string extra;
......@@ -855,7 +949,8 @@ class SharedState {
stress_test_(stress_test),
verification_failure_(false),
no_overwrite_ids_(FLAGS_column_families),
values_(nullptr) {
values_(nullptr),
printing_verification_results_(false) {
// Pick random keys in each column family that will not experience
// overwrite
......@@ -1102,6 +1197,16 @@ class SharedState {
return expected_mmap_buffer_.get() != nullptr;
}
// Tries to flip the "printing verification results" flag from false to true.
// Returns false when this call performed the flip, and true when the flag
// was already set before the call.
bool PrintingVerificationResults() {
  bool expected = false;
  const bool flipped =
      printing_verification_results_.compare_exchange_strong(
          expected, true, std::memory_order_relaxed);
  return !flipped;
}
// Clears the "printing verification results" flag so that a later call to
// PrintingVerificationResults() can set it again.
void FinishPrintingVerificationResults() {
  const bool not_printing = false;
  printing_verification_results_.store(not_printing,
                                       std::memory_order_relaxed);
}
private:
port::Mutex mu_;
port::CondVar cv_;
......@@ -1129,6 +1234,7 @@ class SharedState {
// and storing it in the container may require copying depending on the impl.
std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_;
std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
std::atomic<bool> printing_verification_results_;
};
const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe;
......@@ -1152,6 +1258,8 @@ struct ThreadState {
Status status;
// The value of the Get
std::string value;
// optional state of all keys in the db
std::vector<bool>* key_vec;
};
std::queue<std::pair<uint64_t, SnapshotState> > snapshot_queue;
......@@ -1166,8 +1274,9 @@ class DbStressListener : public EventListener {
const std::vector<ColumnFamilyDescriptor>& column_families)
: db_name_(db_name),
db_paths_(db_paths),
column_families_(column_families) {}
virtual ~DbStressListener() {}
column_families_(column_families),
num_pending_file_creations_(0) {}
virtual ~DbStressListener() { assert(num_pending_file_creations_ == 0); }
#ifndef ROCKSDB_LITE
virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
assert(IsValidColumnFamilyName(info.cf_name));
......@@ -1192,17 +1301,24 @@ class DbStressListener : public EventListener {
std::chrono::microseconds(Random::GetTLSInstance()->Uniform(5000)));
}
// Counts a table file creation as in flight. The matching decrement happens
// in OnTableFileCreated(), and the destructor asserts the counter is zero.
virtual void OnTableFileCreationStarted(
    const TableFileCreationBriefInfo& /*info*/) override {
  num_pending_file_creations_.fetch_add(1);
}
virtual void OnTableFileCreated(const TableFileCreationInfo& info) override {
assert(info.db_name == db_name_);
assert(IsValidColumnFamilyName(info.cf_name));
VerifyFilePath(info.file_path);
if (info.file_size) {
VerifyFilePath(info.file_path);
}
assert(info.job_id > 0 || FLAGS_compact_files_one_in > 0);
if (info.status.ok()) {
assert(info.file_size > 0);
assert(info.table_properties.data_size > 0);
if (info.status.ok() && info.file_size > 0) {
assert(info.table_properties.data_size > 0 ||
info.table_properties.num_range_deletions > 0);
assert(info.table_properties.raw_key_size > 0);
assert(info.table_properties.num_entries > 0);
}
--num_pending_file_creations_;
}
protected:
......@@ -1275,6 +1391,7 @@ class DbStressListener : public EventListener {
std::string db_name_;
std::vector<DbPath> db_paths_;
std::vector<ColumnFamilyDescriptor> column_families_;
std::atomic<int> num_pending_file_creations_;
};
} // namespace
......@@ -1294,7 +1411,8 @@ class StressTest {
txn_db_(nullptr),
#endif
new_column_family_name_(1),
num_times_reopened_(0) {
num_times_reopened_(0),
db_preload_finished_(false) {
if (FLAGS_destroy_db_initially) {
std::vector<std::string> files;
FLAGS_env->GetChildren(FLAGS_db, &files);
......@@ -1303,7 +1421,14 @@ class StressTest {
FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
}
}
DestroyDB(FLAGS_db, Options());
Options options;
options.env = FLAGS_env;
Status s = DestroyDB(FLAGS_db, options);
if (!s.ok()) {
fprintf(stderr, "Cannot destroy original db: %s\n",
s.ToString().c_str());
exit(1);
}
}
}
......@@ -1313,6 +1438,17 @@ class StressTest {
}
column_families_.clear();
delete db_;
assert(secondaries_.size() == secondary_cfh_lists_.size());
size_t n = secondaries_.size();
for (size_t i = 0; i != n; ++i) {
for (auto* cf : secondary_cfh_lists_[i]) {
delete cf;
}
secondary_cfh_lists_[i].clear();
delete secondaries_[i];
}
secondaries_.clear();
}
std::shared_ptr<Cache> NewCache(size_t capacity) {
......@@ -1425,6 +1561,13 @@ class StressTest {
Open();
BuildOptionsTable();
SharedState shared(this);
if (FLAGS_read_only) {
now = FLAGS_env->NowMicros();
fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
FLAGS_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
PreloadDbAndReopenAsReadOnly(FLAGS_max_key, &shared);
}
uint32_t n = shared.GetNumThreads();
now = FLAGS_env->NowMicros();
......@@ -1507,6 +1650,60 @@ class StressTest {
}
}
#ifndef ROCKSDB_LITE
if (FLAGS_enable_secondary) {
now = FLAGS_env->NowMicros();
fprintf(stdout, "%s Start to verify secondaries against primary\n",
FLAGS_env->TimeToString(static_cast<uint64_t>(now) / 1000000)
.c_str());
}
for (size_t k = 0; k != secondaries_.size(); ++k) {
Status s = secondaries_[k]->TryCatchUpWithPrimary();
if (!s.ok()) {
fprintf(stderr, "Secondary failed to catch up with primary\n");
return false;
}
ReadOptions ropts;
ropts.total_order_seek = true;
// Verify only the default column family since the primary may have
// dropped other column families after most recent reopen.
std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
for (iter1->SeekToFirst(), iter2->SeekToFirst();
iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
if (iter1->key().compare(iter2->key()) != 0 ||
iter1->value().compare(iter2->value())) {
fprintf(stderr,
"Secondary %d contains different data from "
"primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
static_cast<int>(k),
iter1->key().ToString(/*hex=*/true).c_str(),
iter1->value().ToString(/*hex=*/true).c_str(),
iter2->key().ToString(/*hex=*/true).c_str(),
iter2->value().ToString(/*hex=*/true).c_str());
return false;
}
}
if (iter1->Valid() && !iter2->Valid()) {
fprintf(stderr,
"Secondary %d record count is smaller than that of primary\n",
static_cast<int>(k));
return false;
} else if (!iter1->Valid() && iter2->Valid()) {
fprintf(stderr,
"Secondary %d record count is larger than that of primary\n",
static_cast<int>(k));
return false;
}
}
if (FLAGS_enable_secondary) {
now = FLAGS_env->NowMicros();
fprintf(stdout, "%s Verification of secondaries succeeded\n",
FLAGS_env->TimeToString(static_cast<uint64_t>(now) / 1000000)
.c_str());
}
#endif // ROCKSDB_LITE
if (shared.HasVerificationFailedYet()) {
printf("Verification failed :(\n");
return false;
......@@ -1612,6 +1809,25 @@ class StressTest {
return base_key + thread->rand.Next() % FLAGS_active_width;
}
// Builds a non-decreasing list of keys inside the currently active key
// window. The window start advances with `iteration / FLAGS_ops_per_thread`
// across [0, FLAGS_max_key - FLAGS_active_width). The first key is drawn
// anywhere in the window; each following key is the previous one plus a
// random offset that keeps it inside the window. At least one key is always
// returned, even when `num_keys` is smaller than 1.
static std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
                                          uint64_t iteration) {
  const double progress =
      static_cast<double>(iteration) / FLAGS_ops_per_thread;
  const int64_t window_start = static_cast<int64_t>(
      progress * (FLAGS_max_key - FLAGS_active_width));

  std::vector<int64_t> result;
  result.reserve(num_keys);

  int64_t current = window_start + thread->rand.Next() % FLAGS_active_width;
  result.push_back(current);

  for (int produced = 1; produced < num_keys; ++produced) {
    // The offset may be zero, so consecutive keys can repeat.
    const auto room = FLAGS_active_width - (current - window_start);
    current = current + thread->rand.Next() % room;
    result.push_back(current);
  }
  return result;
}
static size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) {
size_t value_sz =
((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
......@@ -1654,9 +1870,114 @@ class StressTest {
")");
}
}
if (snap_state.key_vec != nullptr) {
// When `prefix_extractor` is set, seeking to beginning and scanning
// across prefixes are only supported with `total_order_seek` set.
ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db->NewIterator(ropt));
std::unique_ptr<std::vector<bool> > tmp_bitvec(
new std::vector<bool>(FLAGS_max_key));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
uint64_t key_val;
if (GetIntVal(iterator->key().ToString(), &key_val)) {
(*tmp_bitvec.get())[key_val] = true;
}
}
if (!std::equal(snap_state.key_vec->begin(), snap_state.key_vec->end(),
tmp_bitvec.get()->begin())) {
return Status::Corruption("Found inconsistent keys at this snapshot");
}
}
return Status::OK();
}
// Currently PreloadDb has to be single-threaded.
// Fills every column family with keys [0, number_of_keys), each carrying the
// value generated for value base 0, and mirrors every write into `shared`
// (first as pending, then as completed). Writes go through Merge or Put
// depending on FLAGS_use_merge, and through a transaction when FLAGS_use_txn
// is set (transactions are compiled out under ROCKSDB_LITE).
//
// After a successful load and flush, the column family handles and the DB
// are destroyed, db_preload_finished_ is set, and Open() is called again.
// On any failure the process reports the failing status and exits with
// code 1.
void PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
                                  SharedState* shared) {
  WriteOptions write_opts;
  write_opts.disableWAL = FLAGS_disable_wal;
  if (FLAGS_sync) {
    write_opts.sync = true;
  }
  char value[100];
  int cf_idx = 0;
  Status s;
  for (auto cfh : column_families_) {
    for (int64_t k = 0; k != number_of_keys; ++k) {
      std::string key_str = Key(k);
      Slice key = key_str;
      size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
      Slice v(value, sz);
      shared->Put(cf_idx, k, 0, true /* pending */);

      if (!FLAGS_use_txn) {
        s = FLAGS_use_merge ? db_->Merge(write_opts, cfh, key, v)
                            : db_->Put(write_opts, cfh, key, v);
      } else {
#ifndef ROCKSDB_LITE
        Transaction* txn;
        s = NewTxn(write_opts, &txn);
        if (s.ok()) {
          s = FLAGS_use_merge ? txn->Merge(cfh, key, v)
                              : txn->Put(cfh, key, v);
          if (s.ok()) {
            s = CommitTxn(txn);
          }
        }
#endif
      }

      // The expected state is finalized before the status is inspected,
      // matching the order used for the pending marker above.
      shared->Put(cf_idx, k, 0, false /* pending */);
      if (!s.ok()) {
        break;
      }
    }
    if (!s.ok()) {
      break;
    }
    ++cf_idx;
  }
  if (s.ok()) {
    s = db_->Flush(FlushOptions(), column_families_);
  }
  if (!s.ok()) {
    // Include the status so the cause of the failure is not lost.
    fprintf(stderr, "Failed to preload db: %s\n", s.ToString().c_str());
    exit(1);
  }

  // Tear down the writable instance before reopening.
  for (auto cf : column_families_) {
    delete cf;
  }
  column_families_.clear();
  delete db_;
  db_ = nullptr;
#ifndef ROCKSDB_LITE
  txn_db_ = nullptr;
#endif

  db_preload_finished_.store(true);
  auto now = FLAGS_env->NowMicros();
  fprintf(stdout, "%s Reopening database in read-only\n",
          FLAGS_env->TimeToString(now / 1000000).c_str());
  // Reopen as read-only, can ignore all options related to updates
  Open();
}
Status SetOptions(ThreadState* thread) {
assert(FLAGS_set_options_one_in > 0);
std::unordered_map<std::string, std::string> opts;
......@@ -1714,7 +2035,7 @@ class StressTest {
ReadOptions read_opts(FLAGS_verify_checksum, true);
WriteOptions write_opts;
auto shared = thread->shared;
char value[kValueMaxLen];
char value[100];
std::string from_db;
if (FLAGS_sync) {
write_opts.sync = true;
......@@ -1724,19 +2045,26 @@ class StressTest {
const int writeBound = prefixBound + (int)FLAGS_writepercent;
const int delBound = writeBound + (int)FLAGS_delpercent;
const int delRangeBound = delBound + (int)FLAGS_delrangepercent;
const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
int multiget_batch_size = 0;
thread->stats.Start();
for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
if (i != 0 && (i % (FLAGS_ops_per_thread / (FLAGS_reopen + 1))) == 0) {
// Check if the multiget batch crossed the ops_per_open boundary. If it
// did, then we should vote to reopen
if (i != 0 &&
(i % ops_per_open == 0 ||
i % ops_per_open < (i - multiget_batch_size) % ops_per_open)) {
{
thread->stats.FinishedSingleOp();
MutexLock l(thread->shared->GetMutex());
while (!thread->snapshot_queue.empty()) {
db_->ReleaseSnapshot(
thread->snapshot_queue.front().second.snapshot);
delete thread->snapshot_queue.front().second.key_vec;
thread->snapshot_queue.pop();
}
thread->shared->IncVotedReopen();
......@@ -1765,49 +2093,6 @@ class StressTest {
MaybeClearOneColumnFamily(thread);
#ifndef ROCKSDB_LITE
if (FLAGS_checkpoint_one_in > 0 &&
thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
std::string checkpoint_dir =
FLAGS_db + "/.checkpoint" + ToString(thread->tid);
DestroyDB(checkpoint_dir, Options());
Checkpoint* checkpoint;
Status s = Checkpoint::Create(db_, &checkpoint);
if (s.ok()) {
s = checkpoint->CreateCheckpoint(checkpoint_dir);
}
std::vector<std::string> files;
if (s.ok()) {
s = FLAGS_env->GetChildren(checkpoint_dir, &files);
}
DestroyDB(checkpoint_dir, Options());
delete checkpoint;
if (!s.ok()) {
printf("A checkpoint operation failed with: %s\n",
s.ToString().c_str());
}
}
if (FLAGS_backup_one_in > 0 &&
thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
BackupableDBOptions backup_opts(backup_dir);
BackupEngine* backup_engine = nullptr;
Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
if (s.ok()) {
s = backup_engine->CreateNewBackup(db_);
}
if (s.ok()) {
s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
}
if (!s.ok()) {
printf("A BackupEngine operation failed with: %s\n",
s.ToString().c_str());
}
if (backup_engine != nullptr) {
delete backup_engine;
}
}
if (FLAGS_compact_files_one_in > 0 &&
thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) {
auto* random_cf =
......@@ -1847,8 +2132,8 @@ class StressTest {
db_->CompactFiles(CompactionOptions(), random_cf, input_files,
static_cast<int>(output_level));
if (!s.ok()) {
printf("Unable to perform CompactFiles(): %s\n",
s.ToString().c_str());
fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
s.ToString().c_str());
thread->stats.AddNumCompactFilesFailed(1);
} else {
thread->stats.AddNumCompactFilesSucceed(1);
......@@ -1891,9 +2176,45 @@ class StressTest {
}
}
std::vector<int> rand_column_families =
GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
if (FLAGS_flush_one_in > 0 &&
thread->rand.Uniform(FLAGS_flush_one_in) == 0) {
FlushOptions flush_opts;
std::vector<ColumnFamilyHandle*> cfhs;
std::for_each(
rand_column_families.begin(), rand_column_families.end(),
[this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
Status status = db_->Flush(flush_opts, cfhs);
if (!status.ok()) {
fprintf(stdout, "Unable to perform Flush(): %s\n",
status.ToString().c_str());
}
}
std::vector<int64_t> rand_keys = GenerateKeys(rand_key);
if (FLAGS_ingest_external_file_one_in > 0 &&
thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) {
TestIngestExternalFile(thread, {rand_column_family}, {rand_key}, lock);
TestIngestExternalFile(thread, rand_column_families, rand_keys, lock);
}
if (FLAGS_backup_one_in > 0 &&
thread->rand.Uniform(FLAGS_backup_one_in) == 0) {
Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
if (!s.ok()) {
VerificationAbort(shared, "Backup/restore gave inconsistent state",
s);
}
}
if (FLAGS_checkpoint_one_in > 0 &&
thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) {
Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
if (!s.ok()) {
VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
}
}
if (FLAGS_acquire_snapshot_one_in > 0 &&
......@@ -1906,15 +2227,32 @@ class StressTest {
// will later read the same key before releasing the snapshot and verify
// that the results are the same.
auto status_at = db_->Get(ropt, column_family, key, &value_at);
std::vector<bool>* key_vec = nullptr;
if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
key_vec = new std::vector<bool>(FLAGS_max_key);
// When `prefix_extractor` is set, seeking to beginning and scanning
// across prefixes are only supported with `total_order_seek` set.
ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
uint64_t key_val;
if (GetIntVal(iterator->key().ToString(), &key_val)) {
(*key_vec)[key_val] = true;
}
}
}
ThreadState::SnapshotState snap_state = {
snapshot, rand_column_family, column_family->GetName(),
keystr, status_at, value_at};
keystr, status_at, value_at,
key_vec};
thread->snapshot_queue.emplace(
std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
snap_state);
}
while (!thread->snapshot_queue.empty() &&
i == thread->snapshot_queue.front().first) {
i >= thread->snapshot_queue.front().first) {
auto snap_state = thread->snapshot_queue.front().second;
assert(snap_state.snapshot);
// Note: this is unsafe as the cf might be dropped concurrently. But it
......@@ -1926,36 +2264,68 @@ class StressTest {
VerificationAbort(shared, "Snapshot gave inconsistent state", s);
}
db_->ReleaseSnapshot(snap_state.snapshot);
delete snap_state.key_vec;
thread->snapshot_queue.pop();
}
int prob_op = thread->rand.Uniform(100);
// Reset this in case we pick something other than a read op. We don't
// want to use a stale value when deciding at the beginning of the loop
// whether to vote to reopen
multiget_batch_size = 0;
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
// OPERATION read
TestGet(thread, read_opts, {rand_column_family}, {rand_key});
if (FLAGS_use_multiget) {
// Leave room for one more iteration of the loop with a single key
// batch. This is to ensure that each thread does exactly the same
// number of ops
multiget_batch_size = static_cast<int>(
std::min(static_cast<uint64_t>(thread->rand.Uniform(64)),
FLAGS_ops_per_thread - i - 1));
// If it's the last iteration, ensure that multiget_batch_size is 1
multiget_batch_size = std::max(multiget_batch_size, 1);
rand_keys = GenerateNKeys(thread, multiget_batch_size, i);
TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
i += multiget_batch_size - 1;
} else {
TestGet(thread, read_opts, rand_column_families, rand_keys);
}
} else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
// OPERATION prefix scan
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
// (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
// be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
// prefix
TestPrefixScan(thread, read_opts, {rand_column_family}, {rand_key});
TestPrefixScan(thread, read_opts, rand_column_families, rand_keys);
} else if (prefixBound <= prob_op && prob_op < writeBound) {
// OPERATION write
TestPut(thread, write_opts, read_opts, {rand_column_family}, {rand_key},
TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
value, lock);
} else if (writeBound <= prob_op && prob_op < delBound) {
// OPERATION delete
TestDelete(thread, write_opts, {rand_column_family}, {rand_key}, lock);
TestDelete(thread, write_opts, rand_column_families, rand_keys, lock);
} else if (delBound <= prob_op && prob_op < delRangeBound) {
// OPERATION delete range
TestDeleteRange(thread, write_opts, {rand_column_family}, {rand_key},
TestDeleteRange(thread, write_opts, rand_column_families, rand_keys,
lock);
} else {
// OPERATION iterate
TestIterate(thread, read_opts, {rand_column_family}, {rand_key});
TestIterate(thread, read_opts, rand_column_families, rand_keys);
}
thread->stats.FinishedSingleOp();
#ifndef ROCKSDB_LITE
uint32_t tid = thread->tid;
assert(secondaries_.empty() ||
static_cast<size_t>(tid) < secondaries_.size());
if (FLAGS_secondary_catch_up_one_in > 0 &&
thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) {
Status s = secondaries_[tid]->TryCatchUpWithPrimary();
if (!s.ok()) {
VerificationAbort(shared, "Secondary instance failed to catch up", s);
break;
}
}
#endif
}
thread->stats.Stop();
......@@ -1967,10 +2337,24 @@ class StressTest {
virtual bool ShouldAcquireMutexOnKey() const { return false; }
// Default selection: a one-element list holding `rand_column_family`. The
// total column family count is ignored here. Declared virtual so subclasses
// can override it.
virtual std::vector<int> GenerateColumnFamilies(
    const int /* num_column_families */, int rand_column_family) const {
  std::vector<int> selected;
  selected.push_back(rand_column_family);
  return selected;
}
// Default selection: a one-element list holding `rand_key`. Declared virtual
// so subclasses can override it.
virtual std::vector<int64_t> GenerateKeys(int64_t rand_key) const {
  std::vector<int64_t> selected;
  selected.push_back(rand_key);
  return selected;
}
// Read operation hook, implemented by subclasses. The operation loop calls
// it for a read op when FLAGS_use_multiget is not set, passing the column
// families from GenerateColumnFamilies() and the keys from GenerateKeys().
virtual Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
// Batched read operation hook, implemented by subclasses. The operation loop
// calls it for a read op when FLAGS_use_multiget is set, passing the keys
// produced by GenerateNKeys(). Returns a vector of Status values
// (presumably one per key -- confirm against the implementations).
virtual std::vector<Status> TestMultiGet(
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
......@@ -1979,8 +2363,7 @@ class StressTest {
virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& read_opts,
const std::vector<int>& cf_ids,
const std::vector<int64_t>& keys,
char (&value)[kValueMaxLen],
const std::vector<int64_t>& keys, char (&value)[100],
std::unique_ptr<MutexLock>& lock) = 0;
virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
......@@ -2007,6 +2390,30 @@ class StressTest {
const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions readoptionscopy = read_opts;
readoptionscopy.snapshot = snapshot;
std::string upper_bound_str;
Slice upper_bound;
if (thread->rand.OneIn(16)) {
// With a 1/16 chance, set an iterator upper bound.
int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
upper_bound_str = Key(rand_upper_key);
upper_bound = Slice(upper_bound_str);
// upper_bound can be smaller than the seek key, but the query itself
// should not crash either.
readoptionscopy.iterate_upper_bound = &upper_bound;
}
std::string lower_bound_str;
Slice lower_bound;
if (thread->rand.OneIn(16)) {
// With a 1/16 chance, set an iterator lower bound.
int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
lower_bound_str = Key(rand_lower_key);
lower_bound = Slice(lower_bound_str);
// lower_bound can be larger than the seek key, but the query itself
// should not crash either.
readoptionscopy.iterate_lower_bound = &lower_bound;
}
auto cfh = column_families_[rand_column_families[0]];
std::unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, cfh));
......@@ -2032,52 +2439,240 @@ class StressTest {
return s;
}
void VerificationAbort(SharedState* shared, std::string msg, Status s) const {
printf("Verification failed: %s. Status is %s\n", msg.c_str(),
s.ToString().c_str());
shared->SetVerificationFailure();
#ifdef ROCKSDB_LITE
// LITE-build placeholder: backup/restore cannot be exercised here, so any
// call reports the problem on stderr and terminates the process.
virtual Status TestBackupRestore(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */) {
  assert(false);
  fprintf(stderr, "RocksDB lite does not support TestBackupRestore\n");
  std::terminate();
}
void VerificationAbort(SharedState* shared, std::string msg, int cf,
int64_t key) const {
printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf,
key, msg.c_str());
shared->SetVerificationFailure();
// LITE-build placeholder: checkpoints cannot be exercised here, so any call
// reports the problem on stderr and terminates the process.
virtual Status TestCheckpoint(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */) {
  assert(false);
  fprintf(stderr, "RocksDB lite does not support TestCheckpoint\n");
  std::terminate();
}
void PrintEnv() const {
fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
kMinorVersion);
fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
fprintf(stdout, "TransactionDB : %s\n",
FLAGS_use_txn ? "true" : "false");
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
if (!FLAGS_test_batches_snapshots) {
fprintf(stdout, "Clear CFs one in : %d\n",
FLAGS_clear_column_family_one_in);
#else // ROCKSDB_LITE
// Backs up the live DB, restores the backup into a per-thread scratch
// directory, opens the restored copy, and spot-checks it.
//
// Entry i of `rand_column_families` and entry i of `rand_keys` form one
// (column family, key) pair; both vectors must have the same length. For
// each pair, the key must be present in the restored DB exactly when the
// shared expected state reports it as existing.
//
// Returns OK when every step succeeded and every checked key agreed;
// otherwise the status of the first failing step or a Corruption status
// describing the mismatch.
virtual Status TestBackupRestore(ThreadState* thread,
                                 const std::vector<int>& rand_column_families,
                                 const std::vector<int64_t>& rand_keys) {
  // Note the column families chosen by `rand_column_families` cannot be
  // dropped while the locks for `rand_keys` are held. So we should not have
  // to worry about accessing those column families throughout this function.
  assert(rand_column_families.size() == rand_keys.size());
  std::string backup_dir = FLAGS_db + "/.backup" + ToString(thread->tid);
  std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
  BackupableDBOptions backup_opts(backup_dir);
  BackupEngine* backup_engine = nullptr;
  Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
  if (s.ok()) {
    s = backup_engine->CreateNewBackup(db_);
  }
  if (s.ok()) {
    // Close and reopen the engine before restoring from it.
    delete backup_engine;
    backup_engine = nullptr;
    s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine);
  }
  if (s.ok()) {
    s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */,
                                                 restore_dir /* wal_dir */);
  }
  if (s.ok()) {
    s = backup_engine->PurgeOldBackups(0 /* num_backups_to_keep */);
  }
  DB* restored_db = nullptr;
  std::vector<ColumnFamilyHandle*> restored_cf_handles;
  if (s.ok()) {
    Options restore_options(options_);
    restore_options.listeners.clear();
    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    assert(FLAGS_clear_column_family_one_in == 0);
    for (const auto& name : column_family_names_) {
      cf_descriptors.emplace_back(name, ColumnFamilyOptions(restore_options));
    }
    s = DB::Open(DBOptions(restore_options), restore_dir, cf_descriptors,
                 &restored_cf_handles, &restored_db);
  }
  // for simplicity, currently only verifies existence/non-existence of a few
  // keys
  for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
    std::string key_str = Key(rand_keys[i]);
    Slice key = key_str;
    std::string restored_value;
    Status get_status = restored_db->Get(
        ReadOptions(), restored_cf_handles[rand_column_families[i]], key,
        &restored_value);
    bool exists =
        thread->shared->Exists(rand_column_families[i], rand_keys[i]);
    if (get_status.ok()) {
      if (!exists) {
        s = Status::Corruption(
            "key exists in restore but not in original db");
      }
    } else if (get_status.IsNotFound()) {
      if (exists) {
        s = Status::Corruption(
            "key exists in original db but not in restore");
      }
    } else {
      s = get_status;
    }
  }
  // Cleanup runs on every path, including failures above.
  if (backup_engine != nullptr) {
    delete backup_engine;
    backup_engine = nullptr;
  }
  if (restored_db != nullptr) {
    for (auto* cf_handle : restored_cf_handles) {
      restored_db->DestroyColumnFamilyHandle(cf_handle);
    }
    delete restored_db;
    restored_db = nullptr;
  }
  if (!s.ok()) {
    printf("A backup/restore operation failed with: %s\n",
           s.ToString().c_str());
  }
  return s;
}
// Creates a checkpoint of the live DB in a per-thread scratch directory,
// opens it read-only, and spot-checks it.
//
// Entry i of `rand_column_families` and entry i of `rand_keys` form one
// (column family, key) pair; both vectors must have the same length. For
// each pair, the key must be present in the checkpoint exactly when the
// shared expected state reports it as existing. The checkpoint is only
// opened and checked when FLAGS_clear_column_family_one_in is 0.
//
// Returns OK on success; otherwise the status of the first failing step or
// a Corruption status describing the mismatch.
virtual Status TestCheckpoint(ThreadState* thread,
                              const std::vector<int>& rand_column_families,
                              const std::vector<int64_t>& rand_keys) {
  // Note the column families chosen by `rand_column_families` cannot be
  // dropped while the locks for `rand_keys` are held. So we should not have
  // to worry about accessing those column families throughout this function.
  assert(rand_column_families.size() == rand_keys.size());
  std::string checkpoint_dir =
      FLAGS_db + "/.checkpoint" + ToString(thread->tid);
  DestroyDB(checkpoint_dir, Options());
  Checkpoint* checkpoint = nullptr;
  Status s = Checkpoint::Create(db_, &checkpoint);
  if (s.ok()) {
    s = checkpoint->CreateCheckpoint(checkpoint_dir);
  }
  // Free the Checkpoint object whether or not CreateCheckpoint succeeded;
  // releasing it only on success leaked it on every failed attempt.
  delete checkpoint;
  checkpoint = nullptr;
  std::vector<ColumnFamilyHandle*> cf_handles;
  DB* checkpoint_db = nullptr;
  if (s.ok()) {
    Options options(options_);
    options.listeners.clear();
    std::vector<ColumnFamilyDescriptor> cf_descs;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    if (FLAGS_clear_column_family_one_in == 0) {
      for (const auto& name : column_family_names_) {
        cf_descs.emplace_back(name, ColumnFamilyOptions(options));
      }
      s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
                              &cf_handles, &checkpoint_db);
    }
  }
  if (checkpoint_db != nullptr) {
    for (size_t i = 0; s.ok() && i < rand_column_families.size(); ++i) {
      std::string key_str = Key(rand_keys[i]);
      Slice key = key_str;
      std::string value;
      Status get_status = checkpoint_db->Get(
          ReadOptions(), cf_handles[rand_column_families[i]], key, &value);
      bool exists =
          thread->shared->Exists(rand_column_families[i], rand_keys[i]);
      if (get_status.ok()) {
        if (!exists) {
          s = Status::Corruption(
              "key exists in checkpoint but not in original db");
        }
      } else if (get_status.IsNotFound()) {
        if (exists) {
          s = Status::Corruption(
              "key exists in original db but not in checkpoint");
        }
      } else {
        s = get_status;
      }
    }
    for (auto cfh : cf_handles) {
      delete cfh;
    }
    cf_handles.clear();
    delete checkpoint_db;
    checkpoint_db = nullptr;
  }
  DestroyDB(checkpoint_dir, Options());
  if (!s.ok()) {
    fprintf(stderr, "A checkpoint operation failed with: %s\n",
            s.ToString().c_str());
  }
  return s;
}
#endif // ROCKSDB_LITE
// Prints `msg` together with the textual form of `s` to stdout and marks
// the shared state as having failed verification.
void VerificationAbort(SharedState* shared, std::string msg, Status s) const {
  const std::string status_text = s.ToString();
  printf("Verification failed: %s. Status is %s\n", msg.c_str(),
         status_text.c_str());
  shared->SetVerificationFailure();
}
// Prints `msg` for the given column family index and key to stdout and
// marks the shared state as having failed verification.
void VerificationAbort(SharedState* shared, std::string msg, int cf,
                       int64_t key) const {
  const char* const reason = msg.c_str();
  printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf,
         key, reason);
  shared->SetVerificationFailure();
}
void PrintEnv() const {
fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
kMinorVersion);
fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
fprintf(stdout, "TransactionDB : %s\n",
FLAGS_use_txn ? "true" : "false");
fprintf(stdout, "Read only mode : %s\n",
FLAGS_read_only ? "true" : "false");
fprintf(stdout, "Atomic flush : %s\n",
FLAGS_atomic_flush ? "true" : "false");
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
if (!FLAGS_test_batches_snapshots) {
fprintf(stdout, "Clear CFs one in : %d\n",
FLAGS_clear_column_family_one_in);
}
fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
fprintf(stdout, "Ops per thread : %lu\n",
(unsigned long)FLAGS_ops_per_thread);
std::string ttl_state("unused");
if (FLAGS_ttl > 0) {
ttl_state = NumberToString(FLAGS_ttl);
}
fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
fprintf(stdout, "Delete range percentage : %d%%\n",
FLAGS_delrangepercent);
fprintf(stdout, "No overwrite percentage : %d%%\n",
FLAGS_nooverwritepercent);
fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
FLAGS_db_write_buffer_size);
fprintf(stdout, "Write-buffer-size : %d\n",
FLAGS_write_buffer_size);
fprintf(stdout, "Iterations : %lu\n",
(unsigned long)FLAGS_num_iterations);
fprintf(stdout, "Max key : %lu\n",
(unsigned long)FLAGS_max_key);
......@@ -2095,6 +2690,8 @@ class StressTest {
fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
FLAGS_subcompactions);
fprintf(stdout, "Use MultiGet : %s\n",
FLAGS_use_multiget ? "true" : "false");
const char* memtablerep = "";
switch (FLAGS_rep_factory) {
......@@ -2118,6 +2715,8 @@ class StressTest {
fprintf(stdout, " %s\n", p.c_str());
}
}
fprintf(stdout, "Snapshot refresh nanos : %" PRIu64 "\n",
FLAGS_snap_refresh_nanos);
fprintf(stdout, "------------------------------------------------\n");
}
......@@ -2130,11 +2729,15 @@ class StressTest {
if (FLAGS_options_file.empty()) {
BlockBasedTableOptions block_based_options;
block_based_options.block_cache = cache_;
block_based_options.cache_index_and_filter_blocks =
FLAGS_cache_index_and_filter_blocks;
block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.checksum = FLAGS_checksum_type_e;
block_based_options.block_size = FLAGS_block_size;
block_based_options.format_version =
static_cast<uint32_t>(FLAGS_format_version);
block_based_options.index_block_restart_interval =
static_cast<int32_t>(FLAGS_index_block_restart_interval);
block_based_options.filter_policy = filter_policy_;
options_.table_factory.reset(
NewBlockBasedTableFactory(block_based_options));
......@@ -2147,6 +2750,8 @@ class StressTest {
FLAGS_max_write_buffer_number_to_maintain;
options_.memtable_prefix_bloom_size_ratio =
FLAGS_memtable_prefix_bloom_size_ratio;
options_.memtable_whole_key_filtering =
FLAGS_memtable_whole_key_filtering;
options_.max_background_compactions = FLAGS_max_background_compactions;
options_.max_background_flushes = FLAGS_max_background_flushes;
options_.compaction_style =
......@@ -2163,6 +2768,8 @@ class StressTest {
options_.use_direct_reads = FLAGS_use_direct_reads;
options_.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction;
options_.recycle_log_file_num =
static_cast<size_t>(FLAGS_recycle_log_file_num);
options_.target_file_size_base = FLAGS_target_file_size_base;
options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
......@@ -2174,6 +2781,10 @@ class StressTest {
options_.level0_file_num_compaction_trigger =
FLAGS_level0_file_num_compaction_trigger;
options_.compression = FLAGS_compression_type_e;
options_.compression_opts.max_dict_bytes =
FLAGS_compression_max_dict_bytes;
options_.compression_opts.zstd_max_train_bytes =
FLAGS_compression_zstd_max_train_bytes;
options_.create_if_missing = true;
options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
options_.inplace_update_support = FLAGS_in_place_update;
......@@ -2191,6 +2802,7 @@ class StressTest {
FLAGS_universal_max_merge_width;
options_.compaction_options_universal.max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent;
options_.atomic_flush = FLAGS_atomic_flush;
} else {
#ifdef ROCKSDB_LITE
fprintf(stderr, "--options_file not supported in lite mode\n");
......@@ -2254,6 +2866,7 @@ class StressTest {
} else {
options_.merge_operator = MergeOperators::CreatePutOperator();
}
options_.snap_refresh_nanos = FLAGS_snap_refresh_nanos;
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
......@@ -2334,8 +2947,13 @@ class StressTest {
&tdb);
db_ = tdb;
} else if (!FLAGS_use_txn) {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
if (db_preload_finished_.load() && FLAGS_read_only) {
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
} else {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
}
} else {
#ifndef ROCKSDB_LITE
TransactionDBOptions txn_db_options;
......@@ -2365,11 +2983,52 @@ class StressTest {
}
assert(!s.ok() || column_families_.size() ==
static_cast<size_t>(FLAGS_column_families));
if (FLAGS_enable_secondary) {
#ifndef ROCKSDB_LITE
secondaries_.resize(FLAGS_threads);
std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
secondary_cfh_lists_.clear();
secondary_cfh_lists_.resize(FLAGS_threads);
Options tmp_opts;
tmp_opts.max_open_files = FLAGS_open_files;
tmp_opts.statistics = dbstats_secondaries;
tmp_opts.env = FLAGS_env;
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
const std::string secondary_path =
FLAGS_secondaries_base + "/" + std::to_string(i);
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
cf_descriptors, &secondary_cfh_lists_[i],
&secondaries_[i]);
if (!s.ok()) {
break;
}
}
#else
fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
exit(1);
#endif
}
} else {
#ifndef ROCKSDB_LITE
DBWithTTL* db_with_ttl;
s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
db_ = db_with_ttl;
if (FLAGS_enable_secondary) {
secondaries_.resize(FLAGS_threads);
std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
Options tmp_opts;
tmp_opts.max_open_files = FLAGS_open_files;
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
const std::string secondary_path =
FLAGS_secondaries_base + "/" + std::to_string(i);
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
&secondaries_[i]);
if (!s.ok()) {
break;
}
}
}
#else
fprintf(stderr, "TTL is not supported in RocksDBLite\n");
exit(1);
......@@ -2392,6 +3051,17 @@ class StressTest {
txn_db_ = nullptr;
#endif
assert(secondaries_.size() == secondary_cfh_lists_.size());
size_t n = secondaries_.size();
for (size_t i = 0; i != n; ++i) {
for (auto* cf : secondary_cfh_lists_[i]) {
delete cf;
}
secondary_cfh_lists_[i].clear();
delete secondaries_[i];
}
secondaries_.clear();
num_times_reopened_++;
auto now = FLAGS_env->NowMicros();
fprintf(stdout, "%s Reopening database for the %dth time\n",
......@@ -2404,6 +3074,10 @@ class StressTest {
if (dbstats) {
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
}
if (dbstats_secondaries) {
fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
dbstats_secondaries->ToString().c_str());
}
}
std::shared_ptr<Cache> cache_;
......@@ -2420,6 +3094,11 @@ class StressTest {
int num_times_reopened_;
std::unordered_map<std::string, std::vector<std::string> > options_table_;
std::vector<std::string> options_index_;
std::atomic<bool> db_preload_finished_;
// Fields used for stress-testing secondary instance in the same process
std::vector<DB*> secondaries_;
std::vector<std::vector<ColumnFamilyHandle*> > secondary_cfh_lists_;
};
class NonBatchedOpsStressTest : public StressTest {
......@@ -2582,6 +3261,40 @@ class NonBatchedOpsStressTest : public StressTest {
return s;
}
// Issues one MultiGet for all of `rand_keys` against the column family
// selected by `rand_column_families[0]`, records a found / not-found / error
// stat for every result, and returns the per-key statuses.
virtual std::vector<Status> TestMultiGet(
    ThreadState* thread, const ReadOptions& read_opts,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  const size_t key_count = rand_keys.size();
  // Reserve up front so `key_storage` does not reallocate while slices over
  // its strings are being collected (assumes Slice references the string
  // rather than copying it -- TODO confirm).
  std::vector<std::string> key_storage;
  std::vector<Slice> key_slices;
  key_storage.reserve(key_count);
  key_slices.reserve(key_count);
  std::vector<PinnableSlice> values(key_count);
  std::vector<Status> statuses(key_count);
  ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
  for (const int64_t rand_key : rand_keys) {
    key_storage.emplace_back(Key(rand_key));
    key_slices.emplace_back(key_storage.back());
  }
  db_->MultiGet(read_opts, cfh, key_count, key_slices.data(), values.data(),
                statuses.data());
  for (const Status& st : statuses) {
    if (st.ok()) {
      thread->stats.AddGets(1, 1);  // found
      continue;
    }
    if (st.IsNotFound()) {
      thread->stats.AddGets(1, 0);  // not found
      continue;
    }
    thread->stats.AddErrors(1);  // any other status is an error
  }
  return statuses;
}
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
......@@ -2590,16 +3303,26 @@ class NonBatchedOpsStressTest : public StressTest {
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
Slice prefix = Slice(key.data(), FLAGS_prefix_size);
Iterator* iter = db_->NewIterator(read_opts, cfh);
int64_t count = 0;
std::string upper_bound;
Slice ub_slice;
ReadOptions ro_copy = read_opts;
if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) {
// For half of the time, set the upper bound to the next prefix
ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice;
}
Iterator* iter = db_->NewIterator(ro_copy, cfh);
long count = 0;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
++count;
}
assert(count <= (static_cast<int64_t>(1) << ((8 - FLAGS_prefix_size) * 8)));
assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));
Status s = iter->status();
if (iter->status().ok()) {
thread->stats.AddPrefixes(1, static_cast<int>(count));
thread->stats.AddPrefixes(1, count);
} else {
thread->stats.AddErrors(1);
}
......@@ -2611,8 +3334,7 @@ class NonBatchedOpsStressTest : public StressTest {
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
char (&value)[kValueMaxLen],
std::unique_ptr<MutexLock>& lock) {
char (&value)[100], std::unique_ptr<MutexLock>& lock) {
auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey();
int64_t rand_key = rand_keys[0];
......@@ -2867,7 +3589,7 @@ class NonBatchedOpsStressTest : public StressTest {
values.push_back(value_base);
shared->Put(column_family, key, value_base, true /* pending */);
char value[kValueMaxLen];
char value[100];
size_t value_len = GenerateValue(value_base, value, sizeof(value));
auto key_str = Key(key);
s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
......@@ -2945,7 +3667,7 @@ class BatchedOpsStressTest : public StressTest {
const ReadOptions& /* read_opts */,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
char (&value)[kValueMaxLen],
char (&value)[100],
std::unique_ptr<MutexLock>& /* lock */) {
uint32_t value_base =
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
......@@ -3092,6 +3814,76 @@ class BatchedOpsStressTest : public StressTest {
return s;
}
// For every key K in `rand_keys`, reads the ten keys "0"+K ... "9"+K from
// the column family selected by `rand_column_families[0]` with a single
// MultiGet under one snapshot, then checks that:
//   * each found value starts with the same digit as its key prefix, and
//   * all ten values are equal once that first byte is blanked out.
// Mismatches are reported on stderr; processing continues so that further
// errors can be found.
//
// Returns one Status per entry of `rand_keys`: the last non-OK status
// (including NotFound) seen among that key's ten lookups, or a
// default-constructed Status if all ten were found.
virtual std::vector<Status> TestMultiGet(
    ThreadState* thread, const ReadOptions& readoptions,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  size_t num_keys = rand_keys.size();
  std::vector<Status> ret_status(num_keys);
  std::array<std::string, 10> keys = {"0", "1", "2", "3", "4",
                                      "5", "6", "7", "8", "9"};
  size_t num_prefixes = keys.size();
  for (size_t rand_key = 0; rand_key < num_keys; ++rand_key) {
    std::vector<Slice> key_slices;
    std::vector<PinnableSlice> values(num_prefixes);
    std::vector<Status> statuses(num_prefixes);
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = db_->GetSnapshot();
    std::vector<std::string> key_str;
    key_str.reserve(num_prefixes);
    key_slices.reserve(num_prefixes);
    ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
    for (size_t key = 0; key < num_prefixes; ++key) {
      key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
      key_slices.emplace_back(key_str.back());
    }
    db_->MultiGet(readoptionscopy, cfh, num_prefixes, key_slices.data(),
                  values.data(), statuses.data());
    for (size_t i = 0; i < num_prefixes; i++) {
      const Status& s = statuses[i];
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        thread->stats.AddErrors(1);
        ret_status[rand_key] = s;
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        thread->stats.AddGets(1, 0);
        ret_status[rand_key] = s;
      } else {
        char expected_prefix = (keys[i])[0];
        if (values[i].empty()) {
          // An empty value has no first byte to inspect or blank out, so
          // report it instead of indexing past the end of the value.
          fprintf(stderr, "error expected prefix = %c actual = <empty value>\n",
                  expected_prefix);
        } else {
          char actual_prefix = (values[i])[0];
          if (actual_prefix != expected_prefix) {
            fprintf(stderr, "error expected prefix = %c actual = %c\n",
                    expected_prefix, actual_prefix);
          }
          std::string str;
          str.assign(values[i].data(), values[i].size());
          values[i].Reset();
          str[0] = ' ';  // blank out the differing character
          values[i].PinSelf(str);
        }
        thread->stats.AddGets(1, 1);
      }
    }
    db_->ReleaseSnapshot(readoptionscopy.snapshot);
    // Now that we retrieved all values, check that they all match
    for (size_t i = 1; i < num_prefixes; i++) {
      if (values[i] != values[0]) {
        fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
                key_str[i].c_str(), StringToHex(values[0].ToString()).c_str(),
                StringToHex(values[i].ToString()).c_str());
        // we continue after error rather than exiting so that we can
        // find more errors if any
      }
    }
  }
  return ret_status;
}
// Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
// in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
// of the key. Each of these 10 scans returns a series of values;
......@@ -3111,6 +3903,8 @@ class BatchedOpsStressTest : public StressTest {
ReadOptions readoptionscopy[10];
const Snapshot* snapshot = db_->GetSnapshot();
Iterator* iters[10];
std::string upper_bounds[10];
Slice ub_slices[10];
Status s = Status::OK();
for (int i = 0; i < 10; i++) {
prefixes[i] += key.ToString();
......@@ -3118,11 +3912,17 @@ class BatchedOpsStressTest : public StressTest {
prefix_slices[i] = Slice(prefixes[i]);
readoptionscopy[i] = readoptions;
readoptionscopy[i].snapshot = snapshot;
if (thread->rand.OneIn(2) &&
GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) {
// For half of the time, set the upper bound to the next prefix
ub_slices[i] = Slice(upper_bounds[i]);
readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]);
}
iters[i] = db_->NewIterator(readoptionscopy[i], cfh);
iters[i]->Seek(prefix_slices[i]);
}
int count = 0;
long count = 0;
while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
count++;
std::string values[10];
......@@ -3178,6 +3978,422 @@ class BatchedOpsStressTest : public StressTest {
// Intentionally empty: this test variant performs no verification pass here.
virtual void VerifyDb(ThreadState* /* thread */) const {}
};
class AtomicFlushStressTest : public StressTest {
public:
// Starts the batch id counter at zero; TestPut draws value bases from it.
AtomicFlushStressTest() : batch_id_(0) {}
// Empty body: nothing is released explicitly here.
virtual ~AtomicFlushStressTest() {}
// Writes one generated value under the key for `rand_keys[0]` to every
// column family listed in `rand_column_families`, all in a single
// WriteBatch. Uses Merge instead of Put when FLAGS_use_merge is set. The
// value is generated into `value` from the next batch id.
// Returns the status of the batch write.
virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
                       const ReadOptions& /* read_opts */,
                       const std::vector<int>& rand_column_families,
                       const std::vector<int64_t>& rand_keys,
                       char (&value)[100],
                       std::unique_ptr<MutexLock>& /* lock */) {
  const std::string key_str = Key(rand_keys[0]);
  const Slice key(key_str);
  const uint64_t value_base = batch_id_.fetch_add(1);
  const size_t value_len =
      GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
  const Slice payload(value, value_len);

  WriteBatch batch;
  for (const int cf_index : rand_column_families) {
    ColumnFamilyHandle* const handle = column_families_[cf_index];
    if (!FLAGS_use_merge) {
      batch.Put(handle, key, payload);
    } else {
      batch.Merge(handle, key, payload);
    }
  }

  Status s = db_->Write(write_opts, &batch);
  if (s.ok()) {
    auto num = static_cast<long>(rand_column_families.size());
    thread->stats.AddBytesForWrites(num, (value_len + 1) * num);
  } else {
    fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);
  }
  return s;
}
// Deletes the key for `rand_keys[0]` from every column family listed in
// `rand_column_families` using a single WriteBatch.
// Returns the status of the batch write.
virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
                          const std::vector<int>& rand_column_families,
                          const std::vector<int64_t>& rand_keys,
                          std::unique_ptr<MutexLock>& /* lock */) {
  const std::string key_str = Key(rand_keys[0]);
  const Slice key(key_str);

  WriteBatch batch;
  for (const int cf_index : rand_column_families) {
    batch.Delete(column_families_[cf_index], key);
  }

  Status s = db_->Write(write_opts, &batch);
  if (s.ok()) {
    thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
  } else {
    fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);
  }
  return s;
}
// Range-deletes [K, K + FLAGS_range_deletion_width) from every column family
// listed in `rand_column_families` using a single WriteBatch, where K starts
// as `rand_keys[0]`. If the range would extend past the shared max key, K is
// re-drawn so that K + FLAGS_range_deletion_width does not exceed it.
// Returns the status of the batch write.
virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
                               const std::vector<int>& rand_column_families,
                               const std::vector<int64_t>& rand_keys,
                               std::unique_ptr<MutexLock>& /* lock */) {
  int64_t rand_key = rand_keys[0];
  auto shared = thread->shared;
  int64_t max_key = shared->GetMaxKey();
  if (rand_key > max_key - FLAGS_range_deletion_width) {
    rand_key =
        thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
  }
  std::string key_str = Key(rand_key);
  Slice key = key_str;
  std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
  Slice end_key = end_key_str;
  WriteBatch batch;
  for (auto cf : rand_column_families) {
    // `cf` is already a column family index (an element of
    // `rand_column_families`), so it indexes `column_families_` directly.
    // Using it to index `rand_column_families` again picked the wrong
    // family or read out of bounds.
    ColumnFamilyHandle* cfh = column_families_[cf];
    batch.DeleteRange(cfh, key, end_key);
  }
  Status s = db_->Write(write_opts, &batch);
  if (!s.ok()) {
    fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
    thread->stats.AddErrors(1);
  } else {
    thread->stats.AddRangeDeletions(
        static_cast<long>(rand_column_families.size()));
  }
  return s;
}
// Unsupported for this test variant: any call reports the reason on stderr
// and terminates the process.
virtual void TestIngestExternalFile(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */,
    std::unique_ptr<MutexLock>& /* lock */) {
  assert(false);
  fprintf(stderr,
          "AtomicFlushStressTest does not support TestIngestExternalFile because it's not possible to verify the result\n");
  std::terminate();
}
// Reads the key for `rand_keys[0]` from one column family picked at random
// out of `rand_column_families`, records a found / not-found / error stat,
// and returns the Get status.
virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
                       const std::vector<int>& rand_column_families,
                       const std::vector<int64_t>& rand_keys) {
  const std::string key_str = Key(rand_keys[0]);
  const Slice key(key_str);
  const auto pick = thread->rand.Next() % rand_column_families.size();
  auto cfh = column_families_[rand_column_families[pick]];

  std::string from_db;
  Status s = db_->Get(readoptions, cfh, key, &from_db);
  if (s.ok() || s.IsNotFound()) {
    thread->stats.AddGets(1, s.ok() ? 1 : 0);
  } else {
    thread->stats.AddErrors(1);
  }
  return s;
}
// Issues one MultiGet for all of `rand_keys` against the column family
// selected by `rand_column_families[0]`, records a found / not-found / error
// stat for every result, and returns the per-key statuses.
virtual std::vector<Status> TestMultiGet(
    ThreadState* thread, const ReadOptions& read_opts,
    const std::vector<int>& rand_column_families,
    const std::vector<int64_t>& rand_keys) {
  size_t num_keys = rand_keys.size();
  std::vector<std::string> key_str;
  std::vector<Slice> keys;
  keys.reserve(num_keys);
  key_str.reserve(num_keys);
  std::vector<PinnableSlice> values(num_keys);
  std::vector<Status> statuses(num_keys);
  ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
  for (size_t i = 0; i < num_keys; ++i) {
    key_str.emplace_back(Key(rand_keys[i]));
    keys.emplace_back(key_str.back());
  }
  db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
                statuses.data());
  // Iterate by const reference: the statuses are only inspected, so there is
  // no need to copy each one.
  for (const auto& s : statuses) {
    if (s.ok()) {
      // found case
      thread->stats.AddGets(1, 1);
    } else if (s.IsNotFound()) {
      // not found case
      thread->stats.AddGets(1, 0);
    } else {
      // errors case
      thread->stats.AddErrors(1);
    }
  }
  return statuses;
}
// Counts the keys that share the first FLAGS_prefix_size bytes of the key
// for `rand_keys[0]`, scanning one column family picked at random out of
// `rand_column_families`. On half of the calls (when a next prefix can be
// computed) the iterator is given that next prefix as its upper bound.
// Returns the iterator's final status.
virtual Status TestPrefixScan(ThreadState* thread,
                              const ReadOptions& readoptions,
                              const std::vector<int>& rand_column_families,
                              const std::vector<int64_t>& rand_keys) {
  const std::string key_str = Key(rand_keys[0]);
  const Slice key(key_str);
  const Slice prefix(key.data(), FLAGS_prefix_size);

  // `upper_bound` and `ub_slice` are declared before the iterator and so
  // remain alive for as long as it is in use.
  std::string upper_bound;
  Slice ub_slice;
  ReadOptions ro_copy = readoptions;
  if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) {
    ub_slice = Slice(upper_bound);
    ro_copy.iterate_upper_bound = &ub_slice;
  }

  const auto pick = thread->rand.Next() % rand_column_families.size();
  auto cfh = column_families_[rand_column_families[pick]];
  Iterator* iter = db_->NewIterator(ro_copy, cfh);

  long count = 0;
  iter->Seek(prefix);
  while (iter->Valid() && iter->key().starts_with(prefix)) {
    ++count;
    iter->Next();
  }
  assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));

  Status s = iter->status();
  if (!s.ok()) {
    thread->stats.AddErrors(1);
  } else {
    thread->stats.AddPrefixes(1, count);
  }
  delete iter;
  return s;
}
#ifdef ROCKSDB_LITE
// LITE-build placeholder: checkpoints cannot be exercised here, so any call
// reports the problem on stderr and terminates the process.
virtual Status TestCheckpoint(
    ThreadState* /* thread */,
    const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */) {
  assert(false);
  fprintf(stderr, "RocksDB lite does not support TestCheckpoint\n");
  std::terminate();
}
#else
// Creates a checkpoint of the live DB in a per-thread scratch directory and,
// when FLAGS_clear_column_family_one_in is 0, opens it read-only with all
// column families. No key contents are compared in this variant; only the
// create/open steps are exercised. The scratch directory is destroyed before
// and after the attempt.
// Returns the status of the first failing step, or OK.
virtual Status TestCheckpoint(
    ThreadState* thread, const std::vector<int>& /* rand_column_families */,
    const std::vector<int64_t>& /* rand_keys */) {
  std::string checkpoint_dir =
      FLAGS_db + "/.checkpoint" + ToString(thread->tid);
  DestroyDB(checkpoint_dir, Options());
  Checkpoint* checkpoint = nullptr;
  Status s = Checkpoint::Create(db_, &checkpoint);
  if (s.ok()) {
    s = checkpoint->CreateCheckpoint(checkpoint_dir);
  }
  // Free the Checkpoint object whether or not CreateCheckpoint succeeded;
  // releasing it only on success leaked it on every failed attempt.
  delete checkpoint;
  checkpoint = nullptr;
  std::vector<ColumnFamilyHandle*> cf_handles;
  DB* checkpoint_db = nullptr;
  if (s.ok()) {
    Options options(options_);
    options.listeners.clear();
    std::vector<ColumnFamilyDescriptor> cf_descs;
    // TODO(ajkr): `column_family_names_` is not safe to access here when
    // `clear_column_family_one_in != 0`. But we can't easily switch to
    // `ListColumnFamilies` to get names because it won't necessarily give
    // the same order as `column_family_names_`.
    if (FLAGS_clear_column_family_one_in == 0) {
      for (const auto& name : column_family_names_) {
        cf_descs.emplace_back(name, ColumnFamilyOptions(options));
      }
      s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
                              &cf_handles, &checkpoint_db);
    }
  }
  if (checkpoint_db != nullptr) {
    for (auto cfh : cf_handles) {
      delete cfh;
    }
    cf_handles.clear();
    delete checkpoint_db;
    checkpoint_db = nullptr;
  }
  DestroyDB(checkpoint_dir, Options());
  if (!s.ok()) {
    fprintf(stderr, "A checkpoint operation failed with: %s\n",
            s.ToString().c_str());
  }
  return s;
}
#endif // !ROCKSDB_LITE
// Checks that all column families expose the same sequence of keys. One
// iterator per entry of column_families_ is advanced in lockstep, and a
// verification failure is recorded on thread->shared when an iterator reports
// an error, when some column families run out of data before the others, or
// when the current keys differ. Only keys are compared; values show up solely
// in the diagnostic output.
virtual void VerifyDb(ThreadState* thread) const {
  ReadOptions read_opts(FLAGS_verify_checksum, true);
  // SeekToFirst below needs a total-order iterator. The memtables of a column
  // family may (by default) support prefix-based iteration; with
  // total_order_seek left false, NewIterator returns a prefix-based iterator
  // that becomes invalid on SeekToFirst. The memtable, which contains the most
  // up-to-date key-values, could then no longer be iterated.
  read_opts.total_order_seek = true;
  assert(thread != nullptr);
  auto shared = thread->shared;

  const size_t cf_count = column_families_.size();
  std::vector<std::unique_ptr<Iterator> > iters;
  iters.reserve(cf_count);
  for (size_t cf = 0; cf < cf_count; ++cf) {
    iters.emplace_back(db_->NewIterator(read_opts, column_families_[cf]));
  }
  // Position the iterators only after every one of them has been created.
  for (size_t cf = 0; cf < cf_count; ++cf) {
    iters[cf]->SeekToFirst();
  }
  assert(cf_count == iters.size());
  std::vector<Status> statuses(cf_count, Status::OK());

  for (;;) {
    if (shared->HasVerificationFailedYet()) {
      break;
    }

    // Count the iterators that still have data and remember the status of
    // those that do not.
    size_t live_iters = 0;
    for (size_t cf = 0; cf < cf_count; ++cf) {
      if (iters[cf]->Valid()) {
        ++live_iters;
      } else {
        statuses[cf] = iters[cf]->status();
      }
    }

    if (live_iters == 0) {
      // Every iterator is exhausted: this is the normal end of the scan
      // unless at least one of them stopped because of an error.
      bool saw_error = false;
      for (size_t cf = 0; cf < cf_count; ++cf) {
        if (statuses[cf].ok()) {
          continue;
        }
        saw_error = true;
        fprintf(stderr, "Iterator on cf %s has error: %s\n",
                column_families_[cf]->GetName().c_str(),
                statuses[cf].ToString().c_str());
        shared->SetVerificationFailure();
      }
      if (!saw_error) {
        fprintf(stdout, "Finished scanning all column families.\n");
      }
      break;
    }

    if (live_iters != iters.size()) {
      // Only some column families ran out of data, which is a failure in
      // itself. Report the state of each column family before stopping.
      shared->SetVerificationFailure();
      for (size_t cf = 0; cf < cf_count; ++cf) {
        if (iters[cf]->Valid()) {
          fprintf(stderr, "cf %s has remaining data to scan\n",
                  column_families_[cf]->GetName().c_str());
        } else if (statuses[cf].ok()) {
          fprintf(stderr, "Finished scanning cf %s\n",
                  column_families_[cf]->GetName().c_str());
        } else {
          fprintf(stderr, "Iterator on cf %s has error: %s\n",
                  column_families_[cf]->GetName().c_str(),
                  statuses[cf].ToString().c_str());
        }
      }
      break;
    }

    if (shared->HasVerificationFailedYet()) {
      break;
    }

    // From here on every iterator is valid (and there is at least one).
    // While PrintingVerificationResults() reports true, retry the round
    // without advancing the iterators.
    // NOTE(review): presumably this means another thread is currently
    // printing its results -- confirm against SharedState.
    if (shared->PrintingVerificationResults()) {
      continue;
    }

    // The first column family serves as the reference for all the others.
    const Slice key = iters[0]->key();
    const Slice value = iters[0]->value();
    int mismatches = 0;
    for (size_t cf = 1; cf < cf_count; ++cf) {
      const int cmp = key.compare(iters[cf]->key());
      if (cmp == 0) {
        continue;
      }
      ++mismatches;
      const bool first_mismatch = (mismatches == 1);
      if (first_mismatch) {
        // Print the header and the reference entry once per round.
        fprintf(stderr, "Verification failed\n");
        fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
                db_->GetLatestSequenceNumber());
        fprintf(stderr, "[%s] %s => %s\n",
                column_families_[0]->GetName().c_str(),
                key.ToString(true /* hex */).c_str(),
                value.ToString(true /* hex */).c_str());
      }
      fprintf(stderr, "[%s] %s => %s\n",
              column_families_[cf]->GetName().c_str(),
              iters[cf]->key().ToString(true /* hex */).c_str(),
              iters[cf]->value().ToString(true /* hex */).c_str());
#ifndef ROCKSDB_LITE
      // Dump the internal key versions lying between the two differing user
      // keys, ordered so that begin_key is the smaller one.
      const Slice begin_key = (cmp < 0) ? key : iters[cf]->key();
      const Slice end_key = (cmp < 0) ? iters[cf]->key() : key;
      std::vector<KeyVersion> versions;
      const size_t kMaxNumIKeys = 8;
      const auto dump_internal_keys = [&](ColumnFamilyHandle* cfh) {
        Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
                                     kMaxNumIKeys, &versions);
        if (!s.ok()) {
          fprintf(stderr, "%s\n", s.ToString().c_str());
          return;
        }
        assert(nullptr != cfh);
        fprintf(stderr,
                "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
                ")\n",
                cfh->GetName().c_str(),
                begin_key.ToString(true /* hex */).c_str(),
                end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);
        for (const KeyVersion& kv : versions) {
          fprintf(stderr, " key %s seq %" PRIu64 " type %d\n",
                  Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
                  kv.type);
        }
      };
      if (first_mismatch) {
        dump_internal_keys(column_families_[0]);
      }
      dump_internal_keys(column_families_[cf]);
#endif  // ROCKSDB_LITE
      shared->SetVerificationFailure();
    }
    shared->FinishPrintingVerificationResults();

    // Move every iterator to its next entry for the following round.
    for (size_t cf = 0; cf < cf_count; ++cf) {
      iters[cf]->Next();
    }
  }
}
// Returns the indices 0, 1, ..., column_families_.size() - 1 in ascending
// order. Both parameters are ignored.
virtual std::vector<int> GenerateColumnFamilies(
    const int /* num_column_families */, int /* rand_column_family */) const {
  const int cf_count = static_cast<int>(column_families_.size());
  std::vector<int> all_cf_indices;
  for (int cf = 0; cf < cf_count; ++cf) {
    all_cf_indices.push_back(cf);
  }
  return all_cf_indices;
}
private:
std::atomic<int64_t> batch_id_;
};
} // namespace rocksdb
int main(int argc, char** argv) {
......@@ -3187,6 +4403,9 @@ int main(int argc, char** argv) {
if (FLAGS_statistics) {
dbstats = rocksdb::CreateDBStatistics();
if (FLAGS_enable_secondary) {
dbstats_secondaries = rocksdb::CreateDBStatistics();
}
}
FLAGS_compression_type_e =
StringToCompressionType(FLAGS_compression_type.c_str());
......@@ -3265,6 +4484,26 @@ int main(int argc, char** argv) {
"Error: nooverwritepercent must be 0 when using file ingestion\n");
exit(1);
}
if (FLAGS_clear_column_family_one_in > 0 && FLAGS_backup_one_in > 0) {
fprintf(stderr,
"Error: clear_column_family_one_in must be 0 when using backup\n");
exit(1);
}
if (FLAGS_test_atomic_flush) {
FLAGS_atomic_flush = true;
}
if (FLAGS_read_only) {
if (FLAGS_writepercent != 0 || FLAGS_delpercent != 0 ||
FLAGS_delrangepercent != 0) {
fprintf(stderr, "Error: updates are not supported in read only mode\n");
exit(1);
} else if (FLAGS_checkpoint_one_in > 0 &&
FLAGS_clear_column_family_one_in > 0) {
fprintf(stdout,
"Warn: checkpoint won't be validated since column families may "
"be dropped.\n");
}
}
// Choose a location for the test database if none given with --db=<path>
if (FLAGS_db.empty()) {
......@@ -3274,11 +4513,31 @@ int main(int argc, char** argv) {
FLAGS_db = default_db_path;
}
if (FLAGS_enable_secondary && FLAGS_secondaries_base.empty()) {
std::string default_secondaries_path;
FLAGS_env->GetTestDirectory(&default_secondaries_path);
default_secondaries_path += "/dbstress_secondaries";
rocksdb::Status s = FLAGS_env->CreateDirIfMissing(default_secondaries_path);
if (!s.ok()) {
fprintf(stderr, "Failed to create directory %s: %s\n",
default_secondaries_path.c_str(), s.ToString().c_str());
exit(1);
}
FLAGS_secondaries_base = default_secondaries_path;
}
if (!FLAGS_enable_secondary && FLAGS_secondary_catch_up_one_in > 0) {
fprintf(stderr, "Secondary instance is disabled.\n");
exit(1);
}
rocksdb_kill_odds = FLAGS_kill_random_test;
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
std::unique_ptr<rocksdb::StressTest> stress;
if (FLAGS_test_batches_snapshots) {
if (FLAGS_test_atomic_flush) {
stress.reset(new rocksdb::AtomicFlushStressTest());
} else if (FLAGS_test_batches_snapshots) {
stress.reset(new rocksdb::BatchedOpsStressTest());
} else {
stress.reset(new rocksdb::NonBatchedOpsStressTest());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment