Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
T
titan
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
fangzongwu
titan
Commits
423ce1f7
Commit
423ce1f7
authored
Mar 27, 2020
by
Connor
Committed by
yiwu-arbug
May 11, 2020
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add cf abtest for Titan (#151)
Signed-off-by:
Connor1996
<
zbk602423539@gmail.com
>
parent
5f839436
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
97 additions
and
16 deletions
+97
-16
titandb_stress.cc
tools/titandb_stress.cc
+97
-16
No files found.
tools/titandb_stress.cc
View file @
423ce1f7
...
@@ -138,9 +138,10 @@ DEFINE_bool(test_batches_snapshots, false,
...
@@ -138,9 +138,10 @@ DEFINE_bool(test_batches_snapshots, false,
DEFINE_bool
(
atomic_flush
,
false
,
DEFINE_bool
(
atomic_flush
,
false
,
"If set, enables atomic flush in the options.
\n
"
);
"If set, enables atomic flush in the options.
\n
"
);
DEFINE_bool
(
test_atomic_flush
,
false
,
DEFINE_bool
(
test_cf_consistency
,
false
,
"If set, runs the stress test dedicated to verifying atomic flush "
"If set, runs the stress test dedicated to verifying writes to "
"functionality. Setting this implies `atomic_flush=true`.
\n
"
);
"multiple column families are consistent. Setting this implies "
"`atomic_flush=true` is set true if `disable_wal=false`.
\n
"
);
DEFINE_int32
(
threads
,
32
,
"Number of concurrent threads to run."
);
DEFINE_int32
(
threads
,
32
,
"Number of concurrent threads to run."
);
...
@@ -2944,6 +2945,18 @@ class StressTest {
...
@@ -2944,6 +2945,18 @@ class StressTest {
for
(
const
auto
&
cfd
:
cf_descriptors
)
{
for
(
const
auto
&
cfd
:
cf_descriptors
)
{
titan_cf_descriptors
.
emplace_back
(
titan_cf_descriptors
.
emplace_back
(
titandb
::
TitanCFDescriptor
{
cfd
.
name
,
options_
});
titandb
::
TitanCFDescriptor
{
cfd
.
name
,
options_
});
if
(
FLAGS_test_cf_consistency
)
{
// if test cf consistency, make some column families not store value
// in blob
if
(
titan_cf_descriptors
.
size
()
%
2
==
0
)
{
titan_cf_descriptors
.
back
().
options
.
min_blob_size
=
1024
*
1024
*
1024
;
}
fprintf
(
stdout
,
"Create Titan column family %s with min_blob_size %lu
\n
"
,
cfd
.
name
.
c_str
(),
titan_cf_descriptors
.
back
().
options
.
min_blob_size
);
}
}
}
titandb
::
TitanDB
*
tdb
;
titandb
::
TitanDB
*
tdb
;
s
=
titandb
::
TitanDB
::
Open
(
titan_db_options
,
FLAGS_db
,
s
=
titandb
::
TitanDB
::
Open
(
titan_db_options
,
FLAGS_db
,
...
@@ -3219,6 +3232,14 @@ class NonBatchedOpsStressTest : public StressTest {
...
@@ -3219,6 +3232,14 @@ class NonBatchedOpsStressTest : public StressTest {
std
::
vector
<
titandb
::
TitanCFDescriptor
>
tmp
;
std
::
vector
<
titandb
::
TitanCFDescriptor
>
tmp
;
tmp
.
emplace_back
(
titandb
::
TitanCFDescriptor
{
tmp
.
emplace_back
(
titandb
::
TitanCFDescriptor
{
new_name
,
titandb
::
TitanCFOptions
(
options_
)});
new_name
,
titandb
::
TitanCFOptions
(
options_
)});
if
(
FLAGS_test_cf_consistency
)
{
if
(
new_column_family_name_
%
2
==
0
)
{
tmp
.
back
().
options
.
min_blob_size
=
1024
*
1024
*
1024
;
}
fprintf
(
stdout
,
"recreate Titan column family %s with min_blob_size %lu
\n
"
,
new_name
.
c_str
(),
tmp
.
back
().
options
.
min_blob_size
);
}
std
::
vector
<
ColumnFamilyHandle
*>
result
;
std
::
vector
<
ColumnFamilyHandle
*>
result
;
s
=
tdb
->
CreateColumnFamilies
(
tmp
,
&
result
);
s
=
tdb
->
CreateColumnFamilies
(
tmp
,
&
result
);
if
(
s
.
ok
())
{
if
(
s
.
ok
())
{
...
@@ -3982,11 +4003,11 @@ class BatchedOpsStressTest : public StressTest {
...
@@ -3982,11 +4003,11 @@ class BatchedOpsStressTest : public StressTest {
virtual
void
VerifyDb
(
ThreadState
*
/* thread */
)
const
{}
virtual
void
VerifyDb
(
ThreadState
*
/* thread */
)
const
{}
};
};
class
AtomicFlush
StressTest
:
public
StressTest
{
class
CfConsistency
StressTest
:
public
StressTest
{
public
:
public
:
AtomicFlush
StressTest
()
:
batch_id_
(
0
)
{}
CfConsistency
StressTest
()
:
batch_id_
(
0
)
{}
virtual
~
AtomicFlush
StressTest
()
{}
virtual
~
CfConsistency
StressTest
()
{}
virtual
Status
TestPut
(
ThreadState
*
thread
,
WriteOptions
&
write_opts
,
virtual
Status
TestPut
(
ThreadState
*
thread
,
WriteOptions
&
write_opts
,
const
ReadOptions
&
/* read_opts */
,
const
ReadOptions
&
/* read_opts */
,
...
@@ -4080,7 +4101,7 @@ class AtomicFlushStressTest : public StressTest {
...
@@ -4080,7 +4101,7 @@ class AtomicFlushStressTest : public StressTest {
std
::
unique_ptr
<
MutexLock
>&
/* lock */
)
{
std
::
unique_ptr
<
MutexLock
>&
/* lock */
)
{
assert
(
false
);
assert
(
false
);
fprintf
(
stderr
,
fprintf
(
stderr
,
"
AtomicFlush
StressTest does not support TestIngestExternalFile "
"
CfConsistency
StressTest does not support TestIngestExternalFile "
"because it's not possible to verify the result
\n
"
);
"because it's not possible to verify the result
\n
"
);
std
::
terminate
();
std
::
terminate
();
}
}
...
@@ -4090,12 +4111,72 @@ class AtomicFlushStressTest : public StressTest {
...
@@ -4090,12 +4111,72 @@ class AtomicFlushStressTest : public StressTest {
const
std
::
vector
<
int64_t
>&
rand_keys
)
{
const
std
::
vector
<
int64_t
>&
rand_keys
)
{
std
::
string
key_str
=
Key
(
rand_keys
[
0
]);
std
::
string
key_str
=
Key
(
rand_keys
[
0
]);
Slice
key
=
key_str
;
Slice
key
=
key_str
;
auto
cfh
=
Status
s
;
column_families_
[
rand_column_families
[
thread
->
rand
.
Next
()
%
bool
is_consistent
=
true
;
rand_column_families
.
size
()]];
std
::
string
from_db
;
if
(
thread
->
rand
.
OneIn
(
2
))
{
Status
s
=
db_
->
Get
(
readoptions
,
cfh
,
key
,
&
from_db
);
// 1/2 chance, does a random read from random CF
if
(
s
.
ok
())
{
auto
cfh
=
column_families_
[
rand_column_families
[
thread
->
rand
.
Next
()
%
rand_column_families
.
size
()]];
std
::
string
from_db
;
s
=
db_
->
Get
(
readoptions
,
cfh
,
key
,
&
from_db
);
}
else
{
// 1/2 chance, comparing one key is the same across all CFs
const
Snapshot
*
snapshot
=
db_
->
GetSnapshot
();
ReadOptions
readoptionscopy
=
readoptions
;
readoptionscopy
.
snapshot
=
snapshot
;
std
::
string
value0
;
s
=
db_
->
Get
(
readoptionscopy
,
column_families_
[
rand_column_families
[
0
]],
key
,
&
value0
);
if
(
s
.
ok
()
||
s
.
IsNotFound
())
{
bool
found
=
s
.
ok
();
for
(
size_t
i
=
1
;
i
<
rand_column_families
.
size
();
i
++
)
{
std
::
string
value1
;
s
=
db_
->
Get
(
readoptionscopy
,
column_families_
[
rand_column_families
[
i
]],
key
,
&
value1
);
if
(
!
s
.
ok
()
&&
!
s
.
IsNotFound
())
{
break
;
}
if
(
!
found
&&
s
.
ok
())
{
fprintf
(
stderr
,
"Get() return different results with key %s
\n
"
,
key_str
.
c_str
());
fprintf
(
stderr
,
"CF %s is not found
\n
"
,
column_family_names_
[
0
].
c_str
());
fprintf
(
stderr
,
"CF %s returns value %s
\n
"
,
column_family_names_
[
i
].
c_str
(),
value1
.
c_str
());
is_consistent
=
false
;
}
else
if
(
found
&&
s
.
IsNotFound
())
{
fprintf
(
stderr
,
"Get() return different results with key %s
\n
"
,
key_str
.
c_str
());
fprintf
(
stderr
,
"CF %s returns value %s
\n
"
,
column_family_names_
[
0
].
c_str
(),
value0
.
c_str
());
fprintf
(
stderr
,
"CF %s is not found
\n
"
,
column_family_names_
[
i
].
c_str
());
is_consistent
=
false
;
}
else
if
(
s
.
ok
()
&&
value0
!=
value1
)
{
fprintf
(
stderr
,
"Get() return different results with key %s
\n
"
,
key_str
.
c_str
());
fprintf
(
stderr
,
"CF %s returns value %s
\n
"
,
column_family_names_
[
0
].
c_str
(),
value0
.
c_str
());
fprintf
(
stderr
,
"CF %s returns value %s
\n
"
,
column_family_names_
[
i
].
c_str
(),
value1
.
c_str
());
is_consistent
=
false
;
}
if
(
!
is_consistent
)
{
break
;
}
}
}
db_
->
ReleaseSnapshot
(
snapshot
);
}
if
(
!
is_consistent
)
{
thread
->
stats
.
AddErrors
(
1
);
// Fail fast to preserve the DB state.
thread
->
shared
->
SetVerificationFailure
();
}
else
if
(
s
.
ok
())
{
thread
->
stats
.
AddGets
(
1
,
1
);
thread
->
stats
.
AddGets
(
1
,
1
);
}
else
if
(
s
.
IsNotFound
())
{
}
else
if
(
s
.
IsNotFound
())
{
thread
->
stats
.
AddGets
(
1
,
0
);
thread
->
stats
.
AddGets
(
1
,
0
);
...
@@ -4493,7 +4574,7 @@ int main(int argc, char** argv) {
...
@@ -4493,7 +4574,7 @@ int main(int argc, char** argv) {
"Error: clear_column_family_one_in must be 0 when using backup
\n
"
);
"Error: clear_column_family_one_in must be 0 when using backup
\n
"
);
exit
(
1
);
exit
(
1
);
}
}
if
(
FLAGS_test_
atomic_flush
)
{
if
(
FLAGS_test_
cf_consistency
&&
FLAGS_disable_wal
)
{
FLAGS_atomic_flush
=
true
;
FLAGS_atomic_flush
=
true
;
}
}
if
(
FLAGS_read_only
)
{
if
(
FLAGS_read_only
)
{
...
@@ -4539,8 +4620,8 @@ int main(int argc, char** argv) {
...
@@ -4539,8 +4620,8 @@ int main(int argc, char** argv) {
rocksdb_kill_prefix_blacklist
=
SplitString
(
FLAGS_kill_prefix_blacklist
);
rocksdb_kill_prefix_blacklist
=
SplitString
(
FLAGS_kill_prefix_blacklist
);
std
::
unique_ptr
<
rocksdb
::
StressTest
>
stress
;
std
::
unique_ptr
<
rocksdb
::
StressTest
>
stress
;
if
(
FLAGS_test_
atomic_flush
)
{
if
(
FLAGS_test_
cf_consistency
)
{
stress
.
reset
(
new
rocksdb
::
AtomicFlush
StressTest
());
stress
.
reset
(
new
rocksdb
::
CfConsistency
StressTest
());
}
else
if
(
FLAGS_test_batches_snapshots
)
{
}
else
if
(
FLAGS_test_batches_snapshots
)
{
stress
.
reset
(
new
rocksdb
::
BatchedOpsStressTest
());
stress
.
reset
(
new
rocksdb
::
BatchedOpsStressTest
());
}
else
{
}
else
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment