Unverified commit 1939ecba authored by Huachao Huang, committed by GitHub

Squashed 'librocksdb_sys/rocksdb/' changes from 185a70d50e..d7bab10ebe (#229)

parent 8f271417
......@@ -3501,12 +3501,13 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
// ensure the auto compaction doesn't finish until manual compaction has
// had a chance to be delayed.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait", "CompactionJob::Run():End"}});
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"CompactionJob::Run():End"}});
} else {
// ensure the auto-compaction doesn't finish until manual compaction has
// continued without delay.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
{{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
......@@ -3554,12 +3555,13 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
// ensure the flush doesn't finish until manual compaction has had a
// chance to be delayed.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait", "FlushJob::WriteLevel0Table"}});
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"FlushJob::WriteLevel0Table"}});
} else {
// ensure the flush doesn't finish until manual compaction has continued
// without delay.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWaitDone",
{{"DBImpl::FlushMemTable:StallWaitDone",
"FlushJob::WriteLevel0Table"}});
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
......@@ -3569,6 +3571,7 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
FlushOptions flush_opts;
flush_opts.wait = false;
flush_opts.allow_write_stall = true;
dbfull()->Flush(flush_opts);
}
......@@ -3604,7 +3607,7 @@ TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
// The auto-compaction waits until the manual compaction finishes to ensure
// the signal comes from closing CF/DB, not from compaction making progress.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait",
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
{"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
"CompactionJob::Run():End"}});
......@@ -3655,18 +3658,21 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
// began. So it unblocks CompactRange and precludes its flush. Throughout the
// test, stall conditions are upheld via high L0 file count.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:StallWait",
{{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
{"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
"DBImpl::CompactRange:StallWaitDone"},
{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
"DBImpl::FlushMemTable:StallWaitDone"},
{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// used for the delayable flushes
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
for (int j = 0; j < 2; ++j) {
ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
}
Flush();
dbfull()->Flush(flush_opts);
}
auto manual_compaction_thread = port::Thread([this]() {
CompactRangeOptions cro;
......@@ -3676,7 +3682,7 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
Put(ToString(0), RandomString(&rnd, 1024));
Flush();
dbfull()->Flush(flush_opts);
Put(ToString(0), RandomString(&rnd, 1024));
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
manual_compaction_thread.join();
......
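For context on the test changes above: SyncPoint::LoadDependency takes {predecessor, successor} marker pairs, and a thread that reaches a successor marker blocks until some thread has passed the matching predecessor. A minimal sketch of the mechanism, with hypothetical marker names that are not part of this diff:

// Minimal sketch of the SyncPoint dependency mechanism; the marker names
// "WorkerThread:Reached" and "MainThread:Proceed" are hypothetical.
rocksdb::SyncPoint::GetInstance()->LoadDependency(
    {{"WorkerThread:Reached", "MainThread:Proceed"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// A thread calling TEST_SYNC_POINT("MainThread:Proceed") blocks until some
// thread has executed TEST_SYNC_POINT("WorkerThread:Reached"). The tests
// above use this to hold background jobs until the manual compaction has
// observed (or skipped) the renamed stall-wait sync points.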
......@@ -35,6 +35,7 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
Reopen(options);
FlushOptions no_wait;
no_wait.wait = false;
no_wait.allow_write_stall = true;
SyncPoint::GetInstance()->LoadDependency(
{{"VersionSet::LogAndApply:WriteManifest",
......
......@@ -385,7 +385,7 @@ class DBImpl : public DB {
Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true,
Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
ColumnFamilyHandle* cfh = nullptr);
// Wait for memtable compaction
......@@ -904,6 +904,10 @@ class DBImpl : public DB {
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason, bool writes_stopped = false);
// Wait until flushing this column family won't stall writes
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
bool* flush_needed);
// Wait for the memtable to be flushed.
// If flush_memtable_id is non-null, wait until the memtable with the ID
// gets flushed. Otherwise, wait until the column family doesn't have any
......
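Taken together, the header changes above move the stall wait out of CompactRange() and into the flush path, so every caller of FlushMemTable() benefits. A simplified sketch of the resulting control flow, as read from this diff rather than verbatim code:

// Sketch of the control flow after this change:
//
//   DBImpl::CompactRange(options, ...):
//     FlushOptions fo;
//     fo.allow_write_stall = options.allow_write_stall;
//     FlushMemTable(cfd, fo, FlushReason::kManualCompaction);
//
//   DBImpl::FlushMemTable(cfd, flush_options, flush_reason):
//     if (!flush_options.allow_write_stall):
//       WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
//       if error or !flush_needed: return;  // flush already happened elsewhere
//     switch the memtable and schedule the flush as before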
......@@ -324,60 +324,12 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
CleanupSuperVersion(super_version);
}
if (!options.allow_write_stall && flush_needed) {
InstrumentedMutexLock l(&mutex_);
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
do {
if (write_stall_condition != WriteStallCondition::kNormal) {
TEST_SYNC_POINT("DBImpl::CompactRange:StallWait");
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] CompactRange waiting on stall conditions to clear",
cfd->GetName().c_str());
bg_cv_.Wait();
}
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
uint64_t earliest_memtable_id =
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
if (earliest_memtable_id > orig_active_memtable_id) {
// We waited so long that the memtable we were originally waiting on was
// flushed.
flush_needed = false;
break;
}
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
const auto* vstorage = cfd->current()->storage_info();
// Skip stalling check if we're below auto-flush and auto-compaction
// triggers. If it stalled in these conditions, that'd mean the stall
// triggers are so low that stalling is needed for any background work. In
// that case we shouldn't wait since background work won't be scheduled.
if (cfd->imm()->NumNotFlushed() <
cfd->ioptions()->min_write_buffer_number_to_merge &&
vstorage->l0_delay_trigger_count() <
mutable_cf_options.level0_file_num_compaction_trigger) {
break;
}
// check whether one extra immutable memtable or an extra L0 file would
// cause write stalling mode to be entered. It could still enter stall
// mode due to pending compaction bytes, but that's less common
write_stall_condition =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + 1,
vstorage->l0_delay_trigger_count() + 1,
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
.first;
} while (write_stall_condition != WriteStallCondition::kNormal);
}
TEST_SYNC_POINT("DBImpl::CompactRange:StallWaitDone");
Status s;
if (flush_needed) {
s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction);
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
false /* writes_stopped */);
if (!s.ok()) {
LogFlush(immutable_db_options_.info_log);
return s;
......@@ -1078,6 +1030,14 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
FlushReason flush_reason, bool writes_stopped) {
Status s;
uint64_t flush_memtable_id = 0;
if (!flush_options.allow_write_stall) {
bool flush_needed = true;
s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
if (!s.ok() || !flush_needed) {
return s;
}
}
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
......@@ -1116,6 +1076,69 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
return s;
}
// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
// cause a write stall, for example if one memtable is already being flushed.
// This method tries to avoid a write stall (similar to CompactRange()
// behavior): it emulates how the SuperVersion / LSM would change if the flush
// happened, checks that against various constraints, and delays the flush if
// it would cause a write stall.
// The caller should check status and flush_needed to see if the flush already
// happened.
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
bool* flush_needed) {
{
*flush_needed = true;
InstrumentedMutexLock l(&mutex_);
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
do {
if (write_stall_condition != WriteStallCondition::kNormal) {
TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] WaitUntilFlushWouldNotStallWrites"
" waiting on stall conditions to clear",
cfd->GetName().c_str());
bg_cv_.Wait();
}
if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
uint64_t earliest_memtable_id =
std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
if (earliest_memtable_id > orig_active_memtable_id) {
// We waited so long that the memtable we were originally waiting on was
// flushed.
*flush_needed = false;
return Status::OK();
}
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
const auto* vstorage = cfd->current()->storage_info();
// Skip stalling check if we're below auto-flush and auto-compaction
// triggers. If it stalled in these conditions, that'd mean the stall
// triggers are so low that stalling is needed for any background work. In
// that case we shouldn't wait since background work won't be scheduled.
if (cfd->imm()->NumNotFlushed() <
cfd->ioptions()->min_write_buffer_number_to_merge &&
vstorage->l0_delay_trigger_count() <
mutable_cf_options.level0_file_num_compaction_trigger) {
break;
}
// Check whether one extra immutable memtable or an extra L0 file would
// cause write stalling mode to be entered. It could still enter stall
// mode due to pending compaction bytes, but that's less common.
write_stall_condition =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + 1,
vstorage->l0_delay_trigger_count() + 1,
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
.first;
} while (write_stall_condition != WriteStallCondition::kNormal);
}
return Status::OK();
}
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id) {
Status s;
......
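The heart of the heuristic above is the "+1" probe near the end of the loop: it asks whether adding one more immutable memtable and one more L0 file (exactly what the pending flush would produce) would push the column family out of WriteStallCondition::kNormal. Shown in isolation, with names following the code above; this is an illustration, not additional code in the diff:

// The "+1" probe in isolation (names as in WaitUntilFlushWouldNotStallWrites).
WriteStallCondition probe =
    ColumnFamilyData::GetWriteStallConditionAndCause(
        cfd->imm()->NumNotFlushed() + 1,         // one more imm memtable
        vstorage->l0_delay_trigger_count() + 1,  // one more L0 file
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
        .first;
// probe == kNormal: flushing now should not stall writes, so the loop exits
// and the flush proceeds. Otherwise the loop waits on bg_cv_ and re-checks
// after background work makes progress. The loop can also end early if the
// CF is dropped, the DB shuts down, the original memtable gets flushed by
// someone else (*flush_needed = false), or the CF is below the auto-flush
// and auto-compaction triggers.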
......@@ -100,9 +100,11 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
return SwitchMemtable(cfd, &write_context);
}
Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) {
Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
ColumnFamilyHandle* cfh) {
FlushOptions fo;
fo.wait = wait;
fo.allow_write_stall = allow_write_stall;
ColumnFamilyData* cfd;
if (cfh == nullptr) {
cfd = default_cf_handle_->cfd();
......
......@@ -4327,7 +4327,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
// Clean up memtable and L0. Block compaction threads. If we continue to
// write and flush memtables, we should see puts stop after 8 memtable
// flushes, since level0_stop_writes_trigger = 8.
dbfull()->TEST_FlushMemTable(true);
dbfull()->TEST_FlushMemTable(true, true);
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Block compaction
test::SleepingBackgroundTask sleeping_task_low;
......@@ -4340,7 +4340,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
WriteOptions wo;
while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
dbfull()->TEST_FlushMemTable(true);
dbfull()->TEST_FlushMemTable(true, true);
count++;
if (dbfull()->TEST_write_controler().IsStopped()) {
sleeping_task_low.WakeUp();
......@@ -4368,7 +4368,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
count = 0;
while (count < 64) {
ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
dbfull()->TEST_FlushMemTable(true);
dbfull()->TEST_FlushMemTable(true, true);
count++;
if (dbfull()->TEST_write_controler().IsStopped()) {
sleeping_task_low.WakeUp();
......@@ -5508,7 +5508,7 @@ TEST_F(DBTest, SoftLimit) {
for (int i = 0; i < 72; i++) {
Put(Key(i), std::string(5000, 'x'));
if (i % 10 == 0) {
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
}
dbfull()->TEST_WaitForCompact();
......@@ -5518,7 +5518,7 @@ TEST_F(DBTest, SoftLimit) {
for (int i = 0; i < 72; i++) {
Put(Key(i), std::string(5000, 'x'));
if (i % 10 == 0) {
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
}
dbfull()->TEST_WaitForCompact();
......@@ -5537,7 +5537,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(i), std::string(5000, 'x'));
Put(Key(100 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
......@@ -5572,7 +5572,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(10 + i), std::string(5000, 'x'));
Put(Key(90 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
// Wake up sleep task to enable compaction to run and waits
......@@ -5593,7 +5593,7 @@ TEST_F(DBTest, SoftLimit) {
Put(Key(20 + i), std::string(5000, 'x'));
Put(Key(80 - i), std::string(5000, 'x'));
// Flush the file. File size is around 30KB.
Flush();
dbfull()->TEST_FlushMemTable(true, true);
}
// Wake up sleep task to enable compaction to run and waits
// for it to go to sleep state again to make sure one compaction
......
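The pattern behind the db_test.cc updates above: tests that deliberately drive the DB into stall conditions must opt back into the old behavior, because with the new default a plain Flush() would wait out the very stall the test is creating. The two equivalent opt-ins used in this diff, sketched with the test fixtures' handles:

// Through the public API (as in the event-listener test below; db_ is the
// test fixture's DB handle):
FlushOptions fo;
fo.allow_write_stall = true;
db_->Flush(fo);

// Through the test hook (as in the db_test.cc hunks above):
dbfull()->TEST_FlushMemTable(true /* wait */, true /* allow_write_stall */);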
......@@ -417,7 +417,9 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
++i) {
Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
db_->Flush(FlushOptions(), handles_[1]);
FlushOptions fo;
fo.allow_write_stall = true;
db_->Flush(fo, handles_[1]);
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
}
ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
......
......@@ -1181,8 +1181,13 @@ struct FlushOptions {
// If true, the flush will wait until the flush is done.
// Default: true
bool wait;
FlushOptions() : wait(true) {}
// If true, the flush will proceed immediately even if it means writes will
// stall for the duration of the flush; if false, the operation will wait
// until it's possible to do the flush without causing a stall, or until the
// required flush is performed by someone else (foreground call or background
// thread).
// Default: false
bool allow_write_stall;
FlushOptions() : wait(true), allow_write_stall(false) {}
};
// Create a Logger from provided DBOptions
......
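A minimal usage sketch of the new option outside the test suite; the DB handle and error handling are assumed:

#include "rocksdb/db.h"

// Minimal sketch, assuming 'db' is an open rocksdb::DB*.
void FlushNow(rocksdb::DB* db) {
  rocksdb::FlushOptions fo;
  fo.wait = true;               // block until the flush completes
  fo.allow_write_stall = true;  // pre-change behavior: flush immediately,
                                // even if writes stall in the meantime
  rocksdb::Status s = db->Flush(fo);
  // With allow_write_stall = false (the new default), Flush() first waits
  // until flushing would not stall writes, and may return OK without
  // flushing if another flush already covered the active memtable.
  (void)s;  // a real caller would check s.ok()
}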
......@@ -1717,7 +1717,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
}
// flush only cfa memtable
s = db_impl->TEST_FlushMemTable(true, cfa);
s = db_impl->TEST_FlushMemTable(true, false, cfa);
ASSERT_OK(s);
switch (txn_db_options.write_policy) {
......@@ -1736,7 +1736,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
}
// flush only cfb memtable
s = db_impl->TEST_FlushMemTable(true, cfb);
s = db_impl->TEST_FlushMemTable(true, false, cfb);
ASSERT_OK(s);
// should show no dependency on logs
......@@ -5434,7 +5434,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
db->FlushWAL(true);
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
......@@ -5472,7 +5472,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
......@@ -5505,7 +5505,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
......@@ -5532,7 +5532,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
......@@ -5559,7 +5559,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
ASSERT_OK(db->FlushWAL(true));
// Flush only cf 1
reinterpret_cast<DBImpl*>(db->GetRootDB())
->TEST_FlushMemTable(true, handles[1]);
->TEST_FlushMemTable(true, false, handles[1]);
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
ASSERT_OK(ReOpenNoDelete(cfds, &handles));
txn0 = db->GetTransactionByName("xid");
......