Unverified Commit 96be57ec authored by Connor's avatar Connor Committed by GitHub

Add cf abtest for Titan (#151)

parent 79dc247a
......@@ -138,9 +138,10 @@ DEFINE_bool(test_batches_snapshots, false,
DEFINE_bool(atomic_flush, false,
"If set, enables atomic flush in the options.\n");
DEFINE_bool(test_atomic_flush, false,
"If set, runs the stress test dedicated to verifying atomic flush "
"functionality. Setting this implies `atomic_flush=true`.\n");
DEFINE_bool(test_cf_consistency, false,
"If set, runs the stress test dedicated to verifying writes to "
"multiple column families are consistent. Setting this implies "
"`atomic_flush=true` is set true if `disable_wal=false`.\n");
DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
......@@ -2944,6 +2945,18 @@ class StressTest {
for (const auto& cfd : cf_descriptors) {
titan_cf_descriptors.emplace_back(
titandb::TitanCFDescriptor{cfd.name, options_});
if (FLAGS_test_cf_consistency) {
// if test cf consistency, make some column families not store value
// in blob
if (titan_cf_descriptors.size() % 2 == 0) {
titan_cf_descriptors.back().options.min_blob_size =
1024 * 1024 * 1024;
}
fprintf(stdout,
"Create Titan column family %s with min_blob_size %lu\n",
cfd.name.c_str(),
titan_cf_descriptors.back().options.min_blob_size);
}
}
titandb::TitanDB* tdb;
s = titandb::TitanDB::Open(titan_db_options, FLAGS_db,
......@@ -3219,6 +3232,14 @@ class NonBatchedOpsStressTest : public StressTest {
std::vector<titandb::TitanCFDescriptor> tmp;
tmp.emplace_back(titandb::TitanCFDescriptor{
new_name, titandb::TitanCFOptions(options_)});
if (FLAGS_test_cf_consistency) {
if (new_column_family_name_ % 2 == 0) {
tmp.back().options.min_blob_size = 1024 * 1024 * 1024;
}
fprintf(stdout,
"recreate Titan column family %s with min_blob_size %lu\n",
new_name.c_str(), tmp.back().options.min_blob_size);
}
std::vector<ColumnFamilyHandle*> result;
s = tdb->CreateColumnFamilies(tmp, &result);
if (s.ok()) {
......@@ -3982,11 +4003,11 @@ class BatchedOpsStressTest : public StressTest {
virtual void VerifyDb(ThreadState* /* thread */) const {}
};
class AtomicFlushStressTest : public StressTest {
class CfConsistencyStressTest : public StressTest {
public:
AtomicFlushStressTest() : batch_id_(0) {}
CfConsistencyStressTest() : batch_id_(0) {}
virtual ~AtomicFlushStressTest() {}
virtual ~CfConsistencyStressTest() {}
virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& /* read_opts */,
......@@ -4080,7 +4101,7 @@ class AtomicFlushStressTest : public StressTest {
std::unique_ptr<MutexLock>& /* lock */) {
assert(false);
fprintf(stderr,
"AtomicFlushStressTest does not support TestIngestExternalFile "
"CfConsistencyStressTest does not support TestIngestExternalFile "
"because it's not possible to verify the result\n");
std::terminate();
}
......@@ -4090,12 +4111,72 @@ class AtomicFlushStressTest : public StressTest {
const std::vector<int64_t>& rand_keys) {
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
auto cfh =
column_families_[rand_column_families[thread->rand.Next() %
rand_column_families.size()]];
std::string from_db;
Status s = db_->Get(readoptions, cfh, key, &from_db);
if (s.ok()) {
Status s;
bool is_consistent = true;
if (thread->rand.OneIn(2)) {
// 1/2 chance, does a random read from random CF
auto cfh =
column_families_[rand_column_families[thread->rand.Next() %
rand_column_families.size()]];
std::string from_db;
s = db_->Get(readoptions, cfh, key, &from_db);
} else {
// 1/2 chance, comparing one key is the same across all CFs
const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions readoptionscopy = readoptions;
readoptionscopy.snapshot = snapshot;
std::string value0;
s = db_->Get(readoptionscopy, column_families_[rand_column_families[0]],
key, &value0);
if (s.ok() || s.IsNotFound()) {
bool found = s.ok();
for (size_t i = 1; i < rand_column_families.size(); i++) {
std::string value1;
s = db_->Get(readoptionscopy,
column_families_[rand_column_families[i]], key, &value1);
if (!s.ok() && !s.IsNotFound()) {
break;
}
if (!found && s.ok()) {
fprintf(stderr, "Get() return different results with key %s\n",
key_str.c_str());
fprintf(stderr, "CF %s is not found\n",
column_family_names_[0].c_str());
fprintf(stderr, "CF %s returns value %s\n",
column_family_names_[i].c_str(), value1.c_str());
is_consistent = false;
} else if (found && s.IsNotFound()) {
fprintf(stderr, "Get() return different results with key %s\n",
key_str.c_str());
fprintf(stderr, "CF %s returns value %s\n",
column_family_names_[0].c_str(), value0.c_str());
fprintf(stderr, "CF %s is not found\n",
column_family_names_[i].c_str());
is_consistent = false;
} else if (s.ok() && value0 != value1) {
fprintf(stderr, "Get() return different results with key %s\n",
key_str.c_str());
fprintf(stderr, "CF %s returns value %s\n",
column_family_names_[0].c_str(), value0.c_str());
fprintf(stderr, "CF %s returns value %s\n",
column_family_names_[i].c_str(), value1.c_str());
is_consistent = false;
}
if (!is_consistent) {
break;
}
}
}
db_->ReleaseSnapshot(snapshot);
}
if (!is_consistent) {
thread->stats.AddErrors(1);
// Fail fast to preserve the DB state.
thread->shared->SetVerificationFailure();
} else if (s.ok()) {
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
thread->stats.AddGets(1, 0);
......@@ -4493,7 +4574,7 @@ int main(int argc, char** argv) {
"Error: clear_column_family_one_in must be 0 when using backup\n");
exit(1);
}
if (FLAGS_test_atomic_flush) {
if (FLAGS_test_cf_consistency && FLAGS_disable_wal) {
FLAGS_atomic_flush = true;
}
if (FLAGS_read_only) {
......@@ -4539,8 +4620,8 @@ int main(int argc, char** argv) {
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
std::unique_ptr<rocksdb::StressTest> stress;
if (FLAGS_test_atomic_flush) {
stress.reset(new rocksdb::AtomicFlushStressTest());
if (FLAGS_test_cf_consistency) {
stress.reset(new rocksdb::CfConsistencyStressTest());
} else if (FLAGS_test_batches_snapshots) {
stress.reset(new rocksdb::BatchedOpsStressTest());
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment