Commit eb537e9a authored by Huachao Huang's avatar Huachao Huang

Merge commit 'c567d822' into release-2.0

parents d2fe0a96 c567d822
...@@ -715,6 +715,8 @@ void DBImpl::MarkLogsSynced( ...@@ -715,6 +715,8 @@ void DBImpl::MarkLogsSynced(
assert(log.getting_synced); assert(log.getting_synced);
if (status.ok() && logs_.size() > 1) { if (status.ok() && logs_.size() > 1) {
logs_to_free_.push_back(log.ReleaseWriter()); logs_to_free_.push_back(log.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it); it = logs_.erase(it);
} else { } else {
log.getting_synced = false; log.getting_synced = false;
...@@ -972,6 +974,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, ...@@ -972,6 +974,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
RecordTick(stats_, MEMTABLE_HIT); RecordTick(stats_, MEMTABLE_HIT);
} }
if (!done && !s.ok() && !s.IsMergeInProgress()) { if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, sv);
return s; return s;
} }
} }
......
...@@ -49,6 +49,9 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { ...@@ -49,6 +49,9 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id, "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number()); log->get_log_number());
s = log->file()->Sync(immutable_db_options_.use_fsync); s = log->file()->Sync(immutable_db_options_.use_fsync);
if (!s.ok()) {
break;
}
} }
if (s.ok()) { if (s.ok()) {
s = directories_.GetWalDir()->Fsync(); s = directories_.GetWalDir()->Fsync();
......
...@@ -57,8 +57,11 @@ Status Writer::AddRecord(const Slice& slice) { ...@@ -57,8 +57,11 @@ Status Writer::AddRecord(const Slice& slice) {
// Fill the trailer (literal below relies on kHeaderSize and // Fill the trailer (literal below relies on kHeaderSize and
// kRecyclableHeaderSize being <= 11) // kRecyclableHeaderSize being <= 11)
assert(header_size <= 11); assert(header_size <= 11);
dest_->Append( s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", leftover)); static_cast<size_t>(leftover)));
if (!s.ok()) {
break;
}
} }
block_offset_ = 0; block_offset_ = 0;
} }
......
...@@ -74,7 +74,7 @@ TEST_F(RepairTest, CorruptManifest) { ...@@ -74,7 +74,7 @@ TEST_F(RepairTest, CorruptManifest) {
Close(); Close();
ASSERT_OK(env_->FileExists(manifest_path)); ASSERT_OK(env_->FileExists(manifest_path));
CreateFile(env_, manifest_path, "blah"); CreateFile(env_, manifest_path, "blah", false /* use_fsync */);
ASSERT_OK(RepairDB(dbname_, CurrentOptions())); ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
Reopen(CurrentOptions()); Reopen(CurrentOptions());
...@@ -136,7 +136,7 @@ TEST_F(RepairTest, CorruptSst) { ...@@ -136,7 +136,7 @@ TEST_F(RepairTest, CorruptSst) {
Flush(); Flush();
auto sst_path = GetFirstSstPath(); auto sst_path = GetFirstSstPath();
ASSERT_FALSE(sst_path.empty()); ASSERT_FALSE(sst_path.empty());
CreateFile(env_, sst_path, "blah"); CreateFile(env_, sst_path, "blah", false /* use_fsync */);
Close(); Close();
ASSERT_OK(RepairDB(dbname_, CurrentOptions())); ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
......
...@@ -2565,7 +2565,9 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, ...@@ -2565,7 +2565,9 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// Unlock during expensive operations. New writes cannot get here // Unlock during expensive operations. New writes cannot get here
// because &w is ensuring that all new writes get queued. // because &w is ensuring that all new writes get queued.
{ {
// Before releasing mutex, make a copy of mutable_cf_options and pass to
// `PrepareApply` to avoided a potential data race with backgroundflush
MutableCFOptions mutable_cf_options_copy(mutable_cf_options);
mu->Unlock(); mu->Unlock();
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
...@@ -2605,7 +2607,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, ...@@ -2605,7 +2607,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!w.edit_list.front()->IsColumnFamilyManipulation()) { if (!w.edit_list.front()->IsColumnFamilyManipulation()) {
// This is cpu-heavy operations, which should be called outside mutex. // This is cpu-heavy operations, which should be called outside mutex.
v->PrepareApply(mutable_cf_options, true); v->PrepareApply(mutable_cf_options_copy, true);
} }
// Write new record to MANIFEST log // Write new record to MANIFEST log
......
...@@ -62,13 +62,12 @@ Status CopyFile(Env* env, const std::string& source, ...@@ -62,13 +62,12 @@ Status CopyFile(Env* env, const std::string& source,
} }
size -= slice.size(); size -= slice.size();
} }
dest_writer->Sync(use_fsync); return dest_writer->Sync(use_fsync);
return Status::OK();
} }
// Utility function to create a file with the provided contents // Utility function to create a file with the provided contents
Status CreateFile(Env* env, const std::string& destination, Status CreateFile(Env* env, const std::string& destination,
const std::string& contents) { const std::string& contents, bool use_fsync) {
const EnvOptions soptions; const EnvOptions soptions;
Status s; Status s;
unique_ptr<WritableFileWriter> dest_writer; unique_ptr<WritableFileWriter> dest_writer;
...@@ -79,7 +78,11 @@ Status CreateFile(Env* env, const std::string& destination, ...@@ -79,7 +78,11 @@ Status CreateFile(Env* env, const std::string& destination,
return s; return s;
} }
dest_writer.reset(new WritableFileWriter(std::move(destfile), soptions)); dest_writer.reset(new WritableFileWriter(std::move(destfile), soptions));
return dest_writer->Append(Slice(contents)); s = dest_writer->Append(Slice(contents));
if (!s.ok()) {
return s;
}
return dest_writer->Sync(use_fsync);
} }
Status DeleteSSTFile(const ImmutableDBOptions* db_options, Status DeleteSSTFile(const ImmutableDBOptions* db_options,
......
...@@ -19,7 +19,7 @@ extern Status CopyFile(Env* env, const std::string& source, ...@@ -19,7 +19,7 @@ extern Status CopyFile(Env* env, const std::string& source,
bool use_fsync); bool use_fsync);
extern Status CreateFile(Env* env, const std::string& destination, extern Status CreateFile(Env* env, const std::string& destination,
const std::string& contents); const std::string& contents, bool use_fsync);
extern Status DeleteSSTFile(const ImmutableDBOptions* db_options, extern Status DeleteSSTFile(const ImmutableDBOptions* db_options,
const std::string& fname, uint32_t path_id); const std::string& fname, uint32_t path_id);
......
...@@ -82,7 +82,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, ...@@ -82,7 +82,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
} /* copy_file_cb */, } /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents, FileType) { [&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str()); ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
return CreateFile(db_->GetEnv(), full_private_path + fname, contents); return CreateFile(db_->GetEnv(), full_private_path + fname, contents,
db_options.use_fsync);
} /* create_file_cb */, } /* create_file_cb */,
&sequence_number, log_size_for_flush); &sequence_number, log_size_for_flush);
// we copied all the files, enable file deletions // we copied all the files, enable file deletions
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment