Unverified Commit fdd9bda0 authored by Huachao Huang's avatar Huachao Huang Committed by GitHub

Squashed 'librocksdb_sys/rocksdb/' changes from 185a70d..ba20fcf (#195)

parent b127863a
......@@ -450,8 +450,9 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
if (vstorage->NumLevelFiles(lvl) > 0) {
bool overlap_with_level = false;
status = IngestedFileOverlapWithLevel(sv, file_to_ingest, lvl,
&overlap_with_level);
status = sv->current->OverlapWithLevelIterator(ro, env_options_,
file_to_ingest->smallest_user_key, file_to_ingest->largest_user_key,
lvl, &overlap_with_level);
if (!status.ok()) {
return status;
}
......
......@@ -79,6 +79,33 @@ int FindFileInRange(const InternalKeyComparator& icmp,
return right;
}
Status OverlapWithIterator(const Comparator* ucmp,
const Slice& smallest_user_key,
const Slice& largest_user_key,
InternalIterator* iter,
bool* overlap) {
InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
kValueTypeForSeek);
iter->Seek(range_start.Encode());
if (!iter->status().ok()) {
return iter->status();
}
*overlap = false;
if (iter->Valid()) {
ParsedInternalKey seek_result;
if (!ParseInternalKey(iter->key(), &seek_result)) {
return Status::Corruption("DB have corrupted keys");
}
if (ucmp->Compare(seek_result.user_key, largest_user_key) <= 0) {
*overlap = true;
}
}
return iter->status();
}
// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
......@@ -875,6 +902,65 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
}
}
Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
const EnvOptions& env_options,
const Slice& smallest_user_key,
const Slice& largest_user_key,
int level, bool* overlap) {
assert(storage_info_.finalized_);
auto icmp = cfd_->internal_comparator();
auto ucmp = icmp.user_comparator();
Arena arena;
Status status;
RangeDelAggregator range_del_agg(icmp, {}, false);
*overlap = false;
if (level == 0) {
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto file = &storage_info_.LevelFilesBrief(0).files[i];
if (AfterFile(ucmp, &smallest_user_key, file) ||
BeforeFile(ucmp, &largest_user_key, file)) {
continue;
}
ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
read_options, env_options, cfd_->internal_comparator(), file->fd,
&range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0),
false, &arena, false /* skip_filters */, 0 /* level */));
status = OverlapWithIterator(
ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
if (!status.ok() || *overlap) {
break;
}
}
} else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
auto mem = arena.AllocateAligned(sizeof(LevelFileIteratorState));
auto state = new (mem)
LevelFileIteratorState(cfd_->table_cache(), read_options, env_options,
cfd_->internal_comparator(),
cfd_->internal_stats()->GetFileReadHist(level),
false /* for_compaction */,
cfd_->ioptions()->prefix_extractor != nullptr,
IsFilterSkipped(level), level, &range_del_agg);
mem = arena.AllocateAligned(sizeof(LevelFileNumIterator));
auto first_level_iter = new (mem) LevelFileNumIterator(
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
should_sample_file_read());
ScopedArenaIterator iter(
NewTwoLevelIterator(state, first_level_iter, &arena, false));
status = OverlapWithIterator(
ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
}
if (status.ok() && *overlap == false &&
range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
*overlap = true;
}
return status;
}
VersionStorageInfo::VersionStorageInfo(
const InternalKeyComparator* internal_comparator,
const Comparator* user_comparator, int levels,
......
......@@ -462,6 +462,11 @@ class Version {
MergeIteratorBuilder* merger_iter_builder,
int level, RangeDelAggregator* range_del_agg);
Status OverlapWithLevelIterator(const ReadOptions&, const EnvOptions&,
const Slice& smallest_user_key,
const Slice& largest_user_key,
int level, bool* overlap);
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status.
// Uses *operands to store merge_operator operations to apply later.
......
......@@ -67,7 +67,8 @@ BlockBasedFilterBlockBuilder::BlockBasedFilterBlockBuilder(
prefix_extractor_(prefix_extractor),
whole_key_filtering_(table_opt.whole_key_filtering),
prev_prefix_start_(0),
prev_prefix_size_(0) {
prev_prefix_size_(0),
num_added_(0) {
assert(policy_);
}
......@@ -91,6 +92,7 @@ void BlockBasedFilterBlockBuilder::Add(const Slice& key) {
// Add key to filter if needed
inline void BlockBasedFilterBlockBuilder::AddKey(const Slice& key) {
num_added_++;
start_.push_back(entries_.size());
entries_.append(key.data(), key.size());
}
......@@ -106,10 +108,9 @@ inline void BlockBasedFilterBlockBuilder::AddPrefix(const Slice& key) {
Slice prefix = prefix_extractor_->Transform(key);
// insert prefix only when it's different from the previous prefix.
if (prev.size() == 0 || prefix != prev) {
start_.push_back(entries_.size());
AddKey(prefix);
prev_prefix_start_ = entries_.size();
prev_prefix_size_ = prefix.size();
entries_.append(prefix.data(), prefix.size());
}
}
......
......@@ -41,6 +41,7 @@ class BlockBasedFilterBlockBuilder : public FilterBlockBuilder {
virtual bool IsBlockBased() override { return true; }
virtual void StartBlock(uint64_t block_offset) override;
virtual void Add(const Slice& key) override;
virtual size_t NumAdded() const override { return num_added_; }
virtual Slice Finish(const BlockHandle& tmp, Status* status) override;
using FilterBlockBuilder::Finish;
......@@ -65,6 +66,7 @@ class BlockBasedFilterBlockBuilder : public FilterBlockBuilder {
std::string result_; // Filter data computed so far
std::vector<Slice> tmp_entries_; // policy_->CreateFilter() argument
std::vector<uint32_t> filter_offsets_;
size_t num_added_; // Number of keys added
// No copying allowed
BlockBasedFilterBlockBuilder(const BlockBasedFilterBlockBuilder&);
......
......@@ -65,6 +65,7 @@ TEST_F(FilterBlockTest, EmptyBuilder) {
TEST_F(FilterBlockTest, SingleChunk) {
BlockBasedFilterBlockBuilder builder(nullptr, table_options_);
ASSERT_EQ(0, builder.NumAdded());
builder.StartBlock(100);
builder.Add("foo");
builder.Add("bar");
......@@ -73,6 +74,7 @@ TEST_F(FilterBlockTest, SingleChunk) {
builder.Add("box");
builder.StartBlock(300);
builder.Add("hello");
ASSERT_EQ(5, builder.NumAdded());
BlockContents block(builder.Finish(), false, kNoCompression);
BlockBasedFilterBlockReader reader(nullptr, table_options_, true,
std::move(block), nullptr);
......
......@@ -649,8 +649,11 @@ Status BlockBasedTableBuilder::Finish() {
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle,
compression_dict_block_handle, range_del_block_handle;
// Write filter block
if (ok() && r->filter_builder != nullptr) {
bool empty_filter_block = (r->filter_builder == nullptr ||
r->filter_builder->NumAdded() == 0);
if (ok() && !empty_filter_block) {
Status s = Status::Incomplete();
while (s.IsIncomplete()) {
Slice filter_content = r->filter_builder->Finish(filter_block_handle, &s);
......@@ -686,7 +689,7 @@ Status BlockBasedTableBuilder::Finish() {
}
if (ok()) {
if (r->filter_builder != nullptr) {
if (!empty_filter_block) {
// Add mapping from "<filter_block_prefix>.Name" to location
// of filter data.
std::string key;
......
......@@ -51,6 +51,7 @@ class FilterBlockBuilder {
virtual bool IsBlockBased() = 0; // If is blockbased filter
virtual void StartBlock(uint64_t block_offset) = 0; // Start new block filter
virtual void Add(const Slice& key) = 0; // Add a key to current filter
virtual size_t NumAdded() const = 0; // Number of keys added
Slice Finish() { // Generate Filter
const BlockHandle empty_handle;
Status dont_care_status;
......
......@@ -45,6 +45,7 @@ class FullFilterBlockBuilder : public FilterBlockBuilder {
virtual bool IsBlockBased() override { return false; }
virtual void StartBlock(uint64_t block_offset) override {}
virtual void Add(const Slice& key) override;
virtual size_t NumAdded() const override { return num_added_; }
virtual Slice Finish(const BlockHandle& tmp, Status* status) override;
using FilterBlockBuilder::Finish;
......
......@@ -163,11 +163,13 @@ TEST_F(FullFilterBlockTest, EmptyBuilder) {
TEST_F(FullFilterBlockTest, SingleChunk) {
FullFilterBlockBuilder builder(
nullptr, true, table_options_.filter_policy->GetFilterBitsBuilder());
ASSERT_EQ(0, builder.NumAdded());
builder.Add("foo");
builder.Add("bar");
builder.Add("box");
builder.Add("box");
builder.Add("hello");
ASSERT_EQ(5, builder.NumAdded());
Slice block = builder.Finish();
FullFilterBlockReader reader(
nullptr, true, block,
......
......@@ -25,7 +25,8 @@ PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder(
filter_bits_builder),
index_on_filter_block_builder_(index_block_restart_interval),
p_index_builder_(p_index_builder),
filters_in_partition_(0) {
filters_in_partition_(0),
num_added_(0) {
filters_per_partition_ =
filter_bits_builder_->CalculateNumEntry(partition_size);
}
......@@ -53,6 +54,7 @@ void PartitionedFilterBlockBuilder::AddKey(const Slice& key) {
MaybeCutAFilterBlock();
filter_bits_builder_->AddKey(key);
filters_in_partition_++;
num_added_++;
}
Slice PartitionedFilterBlockBuilder::Finish(
......
......@@ -33,6 +33,8 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
void AddKey(const Slice& key) override;
size_t NumAdded() const override { return num_added_; }
virtual Slice Finish(const BlockHandle& last_partition_block_handle,
Status* status) override;
......@@ -59,6 +61,8 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
uint32_t filters_per_partition_;
// The current number of filters in the last partition
uint32_t filters_in_partition_;
// Number of keys added
size_t num_added_;
};
class PartitionedFilterBlockReader : public FilterBlockReader {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment