/Users/deen/code/yugabyte-db/src/yb/tablet/verifyrows-tablet-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <memory> |
34 | | |
35 | | #include <gtest/gtest.h> |
36 | | |
37 | | #include "yb/common/ql_expr.h" |
38 | | #include "yb/common/ql_protocol_util.h" |
39 | | |
40 | | #include "yb/docdb/ql_rowwise_iterator_interface.h" |
41 | | |
42 | | #include "yb/gutil/macros.h" |
43 | | #include "yb/gutil/strings/substitute.h" |
44 | | |
45 | | #include "yb/tablet/local_tablet_writer.h" |
46 | | #include "yb/tablet/tablet-test-base.h" |
47 | | #include "yb/tablet/tablet.h" |
48 | | |
49 | | #include "yb/util/countdown_latch.h" |
50 | | #include "yb/util/status_log.h" |
51 | | #include "yb/util/test_graph.h" |
52 | | #include "yb/util/thread.h" |
53 | | |
54 | | DEFINE_int32(num_counter_threads, 8, "Number of counting threads to launch"); |
55 | | DEFINE_int32(num_summer_threads, 1, "Number of summing threads to launch"); |
56 | | DEFINE_int32(num_slowreader_threads, 1, "Number of 'slow' reader threads to launch"); |
57 | | DEFINE_int32(inserts_per_thread, 1000, "Number of rows inserted by the inserter thread"); |
58 | | |
59 | | using std::shared_ptr; |
60 | | |
61 | | namespace yb { |
62 | | namespace tablet { |
63 | | |
64 | | // We use only one thread for now as each thread picks an OpId via a WriteTnxState for write and |
65 | | // could reach rocksdb::MemTable::SetLastOpId() async'ly causing out of order assertion. |
66 | | // There could be multiple threads, as long as they get OpId's assigned in order. |
67 | | const int kNumInsertThreads = 1; |
68 | | |
69 | | template<class SETUP> |
70 | | class VerifyRowsTabletTest : public TabletTestBase<SETUP> { |
71 | | // Import some names from superclass, since C++ is stingy about |
72 | | // letting us refer to the members otherwise. |
73 | | typedef TabletTestBase<SETUP> superclass; |
74 | | using superclass::schema_; |
75 | | using superclass::client_schema_; |
76 | | using superclass::tablet; |
77 | | using superclass::setup_; |
78 | | public: |
79 | 6 | virtual void SetUp() { |
80 | 6 | superclass::SetUp(); |
81 | | |
82 | | // Warm up code cache with all the projections we'll be using. |
83 | 6 | ASSERT_OK(tablet()->NewRowIterator(client_schema_)); |
84 | 6 | const SchemaPtr schema = tablet()->schema(); |
85 | 6 | ColumnSchema valcol = schema->column(schema->find_column("val")); |
86 | 6 | valcol_projection_ = Schema({ valcol }, 0); |
87 | 6 | ASSERT_OK(tablet()->NewRowIterator(valcol_projection_)); |
88 | | |
89 | 6 | ts_collector_.StartDumperThread(); |
90 | 6 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE5SetUpEv Line | Count | Source | 79 | 1 | virtual void SetUp() { | 80 | 1 | superclass::SetUp(); | 81 | | | 82 | | // Warm up code cache with all the projections we'll be using. | 83 | 1 | ASSERT_OK(tablet()->NewRowIterator(client_schema_)); | 84 | 1 | const SchemaPtr schema = tablet()->schema(); | 85 | 1 | ColumnSchema valcol = schema->column(schema->find_column("val")); | 86 | 1 | valcol_projection_ = Schema({ valcol }, 0); | 87 | 1 | ASSERT_OK(tablet()->NewRowIterator(valcol_projection_)); | 88 | | | 89 | 1 | ts_collector_.StartDumperThread(); | 90 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE5SetUpEv Line | Count | Source | 79 | 1 | virtual void SetUp() { | 80 | 1 | superclass::SetUp(); | 81 | | | 82 | | // Warm up code cache with all the projections we'll be using. | 83 | 1 | ASSERT_OK(tablet()->NewRowIterator(client_schema_)); | 84 | 1 | const SchemaPtr schema = tablet()->schema(); | 85 | 1 | ColumnSchema valcol = schema->column(schema->find_column("val")); | 86 | 1 | valcol_projection_ = Schema({ valcol }, 0); | 87 | 1 | ASSERT_OK(tablet()->NewRowIterator(valcol_projection_)); | 88 | | | 89 | 1 | ts_collector_.StartDumperThread(); | 90 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE5SetUpEv Line | Count | Source | 79 | 1 | virtual void SetUp() { | 80 | 1 | superclass::SetUp(); | 81 | | | 82 | | // Warm up code cache with all the projections we'll be using. | 83 | 1 | ASSERT_OK(tablet()->NewRowIterator(client_schema_)); | 84 | 1 | const SchemaPtr schema = tablet()->schema(); | 85 | 1 | ColumnSchema valcol = schema->column(schema->find_column("val")); | 86 | 1 | valcol_projection_ = Schema({ valcol }, 0); | 87 | 1 | ASSERT_OK(tablet()->NewRowIterator(valcol_projection_)); | 88 | | | 89 | 1 | ts_collector_.StartDumperThread(); | 90 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE5SetUpEv Line | Count | Source | 79 | 1 | virtual void SetUp() { | 80 | 1 | superclass::SetUp(); | 81 | | | 82 | | // Warm up code cache with all the projections we'll be using. | 83 | 1 | ASSERT_OK(tablet()->NewRowIterator(client_schema_)); | 84 | 1 | const SchemaPtr schema = tablet()->schema(); | 85 | 1 | ColumnSchema valcol = schema->column(schema->find_column("val")); | 86 | 1 | valcol_projection_ = Schema({ valcol }, 0); | 87 | 1 | ASSERT_OK(tablet()->NewRowIterator(valcol_projection_)); | 88 | | | 89 | 1 | ts_collector_.StartDumperThread(); | 90 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE5SetUpEv Line | Count | Source | 79 | 1 | virtual void SetUp() { | 80 | 1 | superclass::SetUp(); | 81 | | | 82 | | // Warm up code cache with all the projections we'll be using. | 83 | 1 | ASSERT_OK(tablet()->NewRowIterator(client_schema_)); | 84 | 1 | const SchemaPtr schema = tablet()->schema(); | 85 | 1 | ColumnSchema valcol = schema->column(schema->find_column("val")); | 86 | 1 | valcol_projection_ = Schema({ valcol }, 0); | 87 | 1 | ASSERT_OK(tablet()->NewRowIterator(valcol_projection_)); | 88 | | | 89 | 1 | ts_collector_.StartDumperThread(); | 90 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE5SetUpEv Line | Count | Source | 79 | 1 | virtual void SetUp() { | 80 | 1 | superclass::SetUp(); | 81 | | | 82 | | // Warm up code cache with all the projections we'll be using. | 83 | 1 | ASSERT_OK(tablet()->NewRowIterator(client_schema_)); | 84 | 1 | const SchemaPtr schema = tablet()->schema(); | 85 | 1 | ColumnSchema valcol = schema->column(schema->find_column("val")); | 86 | 1 | valcol_projection_ = Schema({ valcol }, 0); | 87 | 1 | ASSERT_OK(tablet()->NewRowIterator(valcol_projection_)); | 88 | | | 89 | 1 | ts_collector_.StartDumperThread(); | 90 | 1 | } |
|
91 | | |
92 | | VerifyRowsTabletTest() |
93 | | : running_insert_count_(kNumInsertThreads), |
94 | 6 | ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) { |
95 | 6 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEEC2Ev Line | Count | Source | 94 | 1 | ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) { | 95 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEEC2Ev Line | Count | Source | 94 | 1 | ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) { | 95 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEEC2Ev Line | Count | Source | 94 | 1 | ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) { | 95 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEEC2Ev Line | Count | Source | 94 | 1 | ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) { | 95 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEEC2Ev Line | Count | Source | 94 | 1 | ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) { | 95 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEEC2Ev Line | Count | Source | 94 | 1 | ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) { | 95 | 1 | } |
|
96 | | |
97 | 6 | void InsertThread(int tid) { |
98 | 6 | CountDownOnScopeExit dec_count(&running_insert_count_); |
99 | 6 | shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted"); |
100 | | |
101 | 6 | int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) |
102 | 6 | / kNumInsertThreads; |
103 | | |
104 | 6 | if (max_rows < FLAGS_inserts_per_thread) { |
105 | 1 | LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow"; |
106 | 1 | } |
107 | | |
108 | 6 | this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get()); |
109 | 6 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE12InsertThreadEi Line | Count | Source | 97 | 1 | void InsertThread(int tid) { | 98 | 1 | CountDownOnScopeExit dec_count(&running_insert_count_); | 99 | 1 | shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted"); | 100 | | | 101 | 1 | int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 102 | 1 | / kNumInsertThreads; | 103 | | | 104 | 1 | if (max_rows < FLAGS_inserts_per_thread) { | 105 | 0 | LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow"; | 106 | 0 | } | 107 | | | 108 | 1 | this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get()); | 109 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE12InsertThreadEi Line | Count | Source | 97 | 1 | void InsertThread(int tid) { | 98 | 1 | CountDownOnScopeExit dec_count(&running_insert_count_); | 99 | 1 | shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted"); | 100 | | | 101 | 1 | int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 102 | 1 | / kNumInsertThreads; | 103 | | | 104 | 1 | if (max_rows < FLAGS_inserts_per_thread) { | 105 | 1 | LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow"; | 106 | 1 | } | 107 | | | 108 | 1 | this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get()); | 109 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE12InsertThreadEi Line | Count | Source | 97 | 1 | void InsertThread(int tid) { | 98 | 1 | CountDownOnScopeExit dec_count(&running_insert_count_); | 99 | 1 | shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted"); | 100 | | | 101 | 1 | int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 102 | 1 | / kNumInsertThreads; | 103 | | | 104 | 1 | if (max_rows < FLAGS_inserts_per_thread) { | 105 | 0 | LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow"; | 106 | 0 | } | 107 | | | 108 | 1 | this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get()); | 109 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE12InsertThreadEi Line | Count | Source | 97 | 1 | void InsertThread(int tid) { | 98 | 1 | CountDownOnScopeExit dec_count(&running_insert_count_); | 99 | 1 | shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted"); | 100 | | | 101 | 1 | int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 102 | 1 | / kNumInsertThreads; | 103 | | | 104 | 1 | if (max_rows < FLAGS_inserts_per_thread) { | 105 | 0 | LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow"; | 106 | 0 | } | 107 | | | 108 | 1 | this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get()); | 109 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE12InsertThreadEi Line | Count | Source | 97 | 1 | void InsertThread(int tid) { | 98 | 1 | CountDownOnScopeExit dec_count(&running_insert_count_); | 99 | 1 | shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted"); | 100 | | | 101 | 1 | int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 102 | 1 | / kNumInsertThreads; | 103 | | | 104 | 1 | if (max_rows < FLAGS_inserts_per_thread) { | 105 | 0 | LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow"; | 106 | 0 | } | 107 | | | 108 | 1 | this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get()); | 109 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE12InsertThreadEi Line | Count | Source | 97 | 1 | void InsertThread(int tid) { | 98 | 1 | CountDownOnScopeExit dec_count(&running_insert_count_); | 99 | 1 | shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted"); | 100 | | | 101 | 1 | int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 102 | 1 | / kNumInsertThreads; | 103 | | | 104 | 1 | if (max_rows < FLAGS_inserts_per_thread) { | 105 | 0 | LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow"; | 106 | 0 | } | 107 | | | 108 | 1 | this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get()); | 109 | 1 | } |
|
110 | | |
111 | | void UpdateThread(int tid) { |
112 | | const Schema &schema = schema_; |
113 | | |
114 | | shared_ptr<TimeSeries> updates = ts_collector_.GetTimeSeries("updated"); |
115 | | |
116 | | LocalTabletWriter writer(this->tablet().get(), &this->client_schema_); |
117 | | |
118 | | faststring update_buf; |
119 | | |
120 | | uint64_t updates_since_last_report = 0; |
121 | | int col_idx = schema.num_key_columns() == 1 ? 2 : 3; |
122 | | LOG(INFO) << "Update thread using schema: " << schema.ToString(); |
123 | | |
124 | | YBPartialRow row(&client_schema_); |
125 | | |
126 | | QLTableRow value_map; |
127 | | |
128 | | while (running_insert_count_.count() > 0) { |
129 | | auto iter = tablet()->NewRowIterator(client_schema_); |
130 | | CHECK_OK(iter); |
131 | | |
132 | | while (ASSERT_RESULT((**iter).HasNext()) && running_insert_count_.count() > 0) { |
133 | | CHECK_OK((**iter).NextRow(&value_map)); |
134 | | |
135 | | unsigned int seed = 1234; |
136 | | if (rand_r(&seed) % 10 == 7) { |
137 | | // Increment the "val" |
138 | | |
139 | | QLValue value; |
140 | | CHECK_OK(value_map.GetValue(schema.column_id(col_idx), &value)); |
141 | | int32_t new_val = value.int32_value() + 1; |
142 | | |
143 | | // Rebuild the key by extracting the cells from the row |
144 | | QLWriteRequestPB req; |
145 | | setup_.BuildRowKeyFromExistingRow(&req, value_map); |
146 | | QLAddInt32ColumnValue(&req, kFirstColumnId + col_idx, new_val); |
147 | | CHECK_OK(writer.Write(&req)); |
148 | | |
149 | | if (++updates_since_last_report >= 10) { |
150 | | updates->AddValue(updates_since_last_report); |
151 | | updates_since_last_report = 0; |
152 | | } |
153 | | } |
154 | | } |
155 | | } |
156 | | } |
157 | | |
158 | | // Thread which iterates slowly over the first 10% of the data. |
159 | | // This is meant to test that outstanding iterators don't end up |
160 | | // trying to reference already-freed memory. |
161 | 6 | void SlowReaderThread(int tid) { |
162 | 6 | QLTableRow row; |
163 | | |
164 | 6 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) |
165 | 6 | / kNumInsertThreads; |
166 | | |
167 | 6 | int max_iters = kNumInsertThreads * max_rows / 10; |
168 | | |
169 | 53 | while (running_insert_count_.count() > 0) { |
170 | 53 | auto iter = tablet()->NewRowIterator(client_schema_); |
171 | 53 | ASSERT_OK(iter); |
172 | | |
173 | 368 | for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) { |
174 | 321 | ASSERT_OK((**iter).NextRow(&row)); |
175 | | |
176 | 321 | if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) { |
177 | 6 | return; |
178 | 6 | } |
179 | 321 | } |
180 | 53 | } |
181 | 6 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE16SlowReaderThreadEi Line | Count | Source | 161 | 1 | void SlowReaderThread(int tid) { | 162 | 1 | QLTableRow row; | 163 | | | 164 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 165 | 1 | / kNumInsertThreads; | 166 | | | 167 | 1 | int max_iters = kNumInsertThreads * max_rows / 10; | 168 | | | 169 | 12 | while (running_insert_count_.count() > 0) { | 170 | 12 | auto iter = tablet()->NewRowIterator(client_schema_); | 171 | 12 | ASSERT_OK(iter); | 172 | | | 173 | 77 | for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) { | 174 | 66 | ASSERT_OK((**iter).NextRow(&row)); | 175 | | | 176 | 66 | if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) { | 177 | 1 | return; | 178 | 1 | } | 179 | 66 | } | 180 | 12 | } | 181 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE16SlowReaderThreadEi Line | Count | Source | 161 | 1 | void SlowReaderThread(int tid) { | 162 | 1 | QLTableRow row; | 163 | | | 164 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 165 | 1 | / kNumInsertThreads; | 166 | | | 167 | 1 | int max_iters = kNumInsertThreads * max_rows / 10; | 168 | | | 169 | 9 | while (running_insert_count_.count() > 0) { | 170 | 9 | auto iter = tablet()->NewRowIterator(client_schema_); | 171 | 9 | ASSERT_OK(iter); | 172 | | | 173 | 17 | for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) { | 174 | 9 | ASSERT_OK((**iter).NextRow(&row)); | 175 | | | 176 | 9 | if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) { | 177 | 1 | return; | 178 | 1 | } | 179 | 9 | } | 180 | 9 | } | 181 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE16SlowReaderThreadEi Line | Count | Source | 161 | 1 | void SlowReaderThread(int tid) { | 162 | 1 | QLTableRow row; | 163 | | | 164 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 165 | 1 | / kNumInsertThreads; | 166 | | | 167 | 1 | int max_iters = kNumInsertThreads * max_rows / 10; | 168 | | | 169 | 10 | while (running_insert_count_.count() > 0) { | 170 | 10 | auto iter = tablet()->NewRowIterator(client_schema_); | 171 | 10 | ASSERT_OK(iter); | 172 | | | 173 | 72 | for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) { | 174 | 63 | ASSERT_OK((**iter).NextRow(&row)); | 175 | | | 176 | 63 | if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) { | 177 | 1 | return; | 178 | 1 | } | 179 | 63 | } | 180 | 10 | } | 181 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE16SlowReaderThreadEi Line | Count | Source | 161 | 1 | void SlowReaderThread(int tid) { | 162 | 1 | QLTableRow row; | 163 | | | 164 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 165 | 1 | / kNumInsertThreads; | 166 | | | 167 | 1 | int max_iters = kNumInsertThreads * max_rows / 10; | 168 | | | 169 | 2 | while (running_insert_count_.count() > 0) { | 170 | 2 | auto iter = tablet()->NewRowIterator(client_schema_); | 171 | 2 | ASSERT_OK(iter); | 172 | | | 173 | 66 | for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) { | 174 | 65 | ASSERT_OK((**iter).NextRow(&row)); | 175 | | | 176 | 65 | if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) { | 177 | 1 | return; | 178 | 1 | } | 179 | 65 | } | 180 | 2 | } | 181 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE16SlowReaderThreadEi Line | Count | Source | 161 | 1 | void SlowReaderThread(int tid) { | 162 | 1 | QLTableRow row; | 163 | | | 164 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 165 | 1 | / kNumInsertThreads; | 166 | | | 167 | 1 | int max_iters = kNumInsertThreads * max_rows / 10; | 168 | | | 169 | 9 | while (running_insert_count_.count() > 0) { | 170 | 9 | auto iter = tablet()->NewRowIterator(client_schema_); | 171 | 9 | ASSERT_OK(iter); | 172 | | | 173 | 66 | for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) { | 174 | 58 | ASSERT_OK((**iter).NextRow(&row)); | 175 | | | 176 | 58 | if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) { | 177 | 1 | return; | 178 | 1 | } | 179 | 58 | } | 180 | 9 | } | 181 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE16SlowReaderThreadEi Line | Count | Source | 161 | 1 | void SlowReaderThread(int tid) { | 162 | 1 | QLTableRow row; | 163 | | | 164 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 165 | 1 | / kNumInsertThreads; | 166 | | | 167 | 1 | int max_iters = kNumInsertThreads * max_rows / 10; | 168 | | | 169 | 11 | while (running_insert_count_.count() > 0) { | 170 | 11 | auto iter = tablet()->NewRowIterator(client_schema_); | 171 | 11 | ASSERT_OK(iter); | 172 | | | 173 | 70 | for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) { | 174 | 60 | ASSERT_OK((**iter).NextRow(&row)); | 175 | | | 176 | 60 | if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) { | 177 | 1 | return; | 178 | 1 | } | 179 | 60 | } | 180 | 11 | } | 181 | 1 | } |
|
182 | | |
183 | 6 | void SummerThread(int tid) { |
184 | 6 | shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned"); |
185 | | |
186 | 164 | while (running_insert_count_.count() > 0) { |
187 | 158 | CountSum(scanned_ts); |
188 | 158 | } |
189 | 6 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE12SummerThreadEi Line | Count | Source | 183 | 1 | void SummerThread(int tid) { | 184 | 1 | shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned"); | 185 | | | 186 | 34 | while (running_insert_count_.count() > 0) { | 187 | 33 | CountSum(scanned_ts); | 188 | 33 | } | 189 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE12SummerThreadEi Line | Count | Source | 183 | 1 | void SummerThread(int tid) { | 184 | 1 | shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned"); | 185 | | | 186 | 21 | while (running_insert_count_.count() > 0) { | 187 | 20 | CountSum(scanned_ts); | 188 | 20 | } | 189 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE12SummerThreadEi Line | Count | Source | 183 | 1 | void SummerThread(int tid) { | 184 | 1 | shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned"); | 185 | | | 186 | 30 | while (running_insert_count_.count() > 0) { | 187 | 29 | CountSum(scanned_ts); | 188 | 29 | } | 189 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE12SummerThreadEi Line | Count | Source | 183 | 1 | void SummerThread(int tid) { | 184 | 1 | shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned"); | 185 | | | 186 | 14 | while (running_insert_count_.count() > 0) { | 187 | 13 | CountSum(scanned_ts); | 188 | 13 | } | 189 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE12SummerThreadEi Line | Count | Source | 183 | 1 | void SummerThread(int tid) { | 184 | 1 | shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned"); | 185 | | | 186 | 31 | while (running_insert_count_.count() > 0) { | 187 | 30 | CountSum(scanned_ts); | 188 | 30 | } | 189 | 1 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE12SummerThreadEi Line | Count | Source | 183 | 1 | void SummerThread(int tid) { | 184 | 1 | shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned"); | 185 | | | 186 | 34 | while (running_insert_count_.count() > 0) { | 187 | 33 | CountSum(scanned_ts); | 188 | 33 | } | 189 | 1 | } |
|
190 | | |
191 | 164 | uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) { |
192 | 164 | uint64_t count_since_report = 0; |
193 | | |
194 | 164 | uint64_t sum = 0; |
195 | | |
196 | 164 | auto iter = tablet()->NewRowIterator(valcol_projection_); |
197 | 164 | CHECK_OK(iter); |
198 | | |
199 | 164 | QLTableRow row; |
200 | 20.9k | while (CHECK_RESULT((**iter).HasNext())) { |
201 | 20.9k | CHECK_OK((**iter).NextRow(&row)); |
202 | | |
203 | 20.9k | QLValue value; |
204 | 20.9k | CHECK_OK(row.GetValue(schema_.column_id(2), &value)); |
205 | 20.9k | if (!value.IsNull()) { |
206 | 0 | CHECK(value.value().has_int32_value()) << "Row: " << row.ToString(); |
207 | 19.0k | sum += value.int32_value(); |
208 | 19.0k | } |
209 | 20.9k | count_since_report += 1; |
210 | | |
211 | | // Report metrics if enough time has passed |
212 | 20.9k | if (count_since_report > 100) { |
213 | 169 | if (scanned_ts.get()) { |
214 | 123 | scanned_ts->AddValue(count_since_report); |
215 | 123 | } |
216 | 169 | count_since_report = 0; |
217 | 169 | } |
218 | 20.9k | } |
219 | | |
220 | 164 | if (scanned_ts.get()) { |
221 | 158 | scanned_ts->AddValue(count_since_report); |
222 | 158 | } |
223 | | |
224 | 164 | return sum; |
225 | 164 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE Line | Count | Source | 191 | 34 | uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) { | 192 | 34 | uint64_t count_since_report = 0; | 193 | | | 194 | 34 | uint64_t sum = 0; | 195 | | | 196 | 34 | auto iter = tablet()->NewRowIterator(valcol_projection_); | 197 | 34 | CHECK_OK(iter); | 198 | | | 199 | 34 | QLTableRow row; | 200 | 4.34k | while (CHECK_RESULT((**iter).HasNext())) { | 201 | 4.34k | CHECK_OK((**iter).NextRow(&row)); | 202 | | | 203 | 4.34k | QLValue value; | 204 | 4.34k | CHECK_OK(row.GetValue(schema_.column_id(2), &value)); | 205 | 4.34k | if (!value.IsNull()) { | 206 | 0 | CHECK(value.value().has_int32_value()) << "Row: " << row.ToString(); | 207 | 4.34k | sum += value.int32_value(); | 208 | 4.34k | } | 209 | 4.34k | count_since_report += 1; | 210 | | | 211 | | // Report metrics if enough time has passed | 212 | 4.34k | if (count_since_report > 100) { | 213 | 35 | if (scanned_ts.get()) { | 214 | 26 | scanned_ts->AddValue(count_since_report); | 215 | 26 | } | 216 | 35 | count_since_report = 0; | 217 | 35 | } | 218 | 4.34k | } | 219 | | | 220 | 34 | if (scanned_ts.get()) { | 221 | 33 | scanned_ts->AddValue(count_since_report); | 222 | 33 | } | 223 | | | 224 | 34 | return sum; | 225 | 34 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE Line | Count | Source | 191 | 21 | uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) { | 192 | 21 | uint64_t count_since_report = 0; | 193 | | | 194 | 21 | uint64_t sum = 0; | 195 | | | 196 | 21 | auto iter = tablet()->NewRowIterator(valcol_projection_); | 197 | 21 | CHECK_OK(iter); | 198 | | | 199 | 21 | QLTableRow row; | 200 | 524 | while (CHECK_RESULT((**iter).HasNext())) { | 201 | 524 | CHECK_OK((**iter).NextRow(&row)); | 202 | | | 203 | 524 | QLValue value; | 204 | 524 | CHECK_OK(row.GetValue(schema_.column_id(2), &value)); | 205 | 524 | if (!value.IsNull()) { | 206 | 0 | CHECK(value.value().has_int32_value()) << "Row: " << row.ToString(); | 207 | 524 | sum += value.int32_value(); | 208 | 524 | } | 209 | 524 | count_since_report += 1; | 210 | | | 211 | | // Report metrics if enough time has passed | 212 | 524 | if (count_since_report > 100) { | 213 | 2 | if (scanned_ts.get()) { | 214 | 1 | scanned_ts->AddValue(count_since_report); | 215 | 1 | } | 216 | 2 | count_since_report = 0; | 217 | 2 | } | 218 | 524 | } | 219 | | | 220 | 21 | if (scanned_ts.get()) { | 221 | 20 | scanned_ts->AddValue(count_since_report); | 222 | 20 | } | 223 | | | 224 | 21 | return sum; | 225 | 21 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE Line | Count | Source | 191 | 30 | uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) { | 192 | 30 | uint64_t count_since_report = 0; | 193 | | | 194 | 30 | uint64_t sum = 0; | 195 | | | 196 | 30 | auto iter = tablet()->NewRowIterator(valcol_projection_); | 197 | 30 | CHECK_OK(iter); | 198 | | | 199 | 30 | QLTableRow row; | 200 | 4.33k | while (CHECK_RESULT((**iter).HasNext())) { | 201 | 4.33k | CHECK_OK((**iter).NextRow(&row)); | 202 | | | 203 | 4.33k | QLValue value; | 204 | 4.33k | CHECK_OK(row.GetValue(schema_.column_id(2), &value)); | 205 | 4.33k | if (!value.IsNull()) { | 206 | 0 | CHECK(value.value().has_int32_value()) << "Row: " << row.ToString(); | 207 | 4.33k | sum += value.int32_value(); | 208 | 4.33k | } | 209 | 4.33k | count_since_report += 1; | 210 | | | 211 | | // Report metrics if enough time has passed | 212 | 4.33k | if (count_since_report > 100) { | 213 | 35 | if (scanned_ts.get()) { | 214 | 26 | scanned_ts->AddValue(count_since_report); | 215 | 26 | } | 216 | 35 | count_since_report = 0; | 217 | 35 | } | 218 | 4.33k | } | 219 | | | 220 | 30 | if (scanned_ts.get()) { | 221 | 29 | scanned_ts->AddValue(count_since_report); | 222 | 29 | } | 223 | | | 224 | 30 | return sum; | 225 | 30 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE Line | Count | Source | 191 | 14 | uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) { | 192 | 14 | uint64_t count_since_report = 0; | 193 | | | 194 | 14 | uint64_t sum = 0; | 195 | | | 196 | 14 | auto iter = tablet()->NewRowIterator(valcol_projection_); | 197 | 14 | CHECK_OK(iter); | 198 | | | 199 | 14 | QLTableRow row; | 200 | 3.78k | while (CHECK_RESULT((**iter).HasNext())) { | 201 | 3.78k | CHECK_OK((**iter).NextRow(&row)); | 202 | | | 203 | 3.78k | QLValue value; | 204 | 3.78k | CHECK_OK(row.GetValue(schema_.column_id(2), &value)); | 205 | 3.78k | if (!value.IsNull()) { | 206 | 0 | CHECK(value.value().has_int32_value()) << "Row: " << row.ToString(); | 207 | 3.78k | sum += value.int32_value(); | 208 | 3.78k | } | 209 | 3.78k | count_since_report += 1; | 210 | | | 211 | | // Report metrics if enough time has passed | 212 | 3.78k | if (count_since_report > 100) { | 213 | 32 | if (scanned_ts.get()) { | 214 | 23 | scanned_ts->AddValue(count_since_report); | 215 | 23 | } | 216 | 32 | count_since_report = 0; | 217 | 32 | } | 218 | 3.78k | } | 219 | | | 220 | 14 | if (scanned_ts.get()) { | 221 | 13 | scanned_ts->AddValue(count_since_report); | 222 | 13 | } | 223 | | | 224 | 14 | return sum; | 225 | 14 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE Line | Count | Source | 191 | 31 | uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) { | 192 | 31 | uint64_t count_since_report = 0; | 193 | | | 194 | 31 | uint64_t sum = 0; | 195 | | | 196 | 31 | auto iter = tablet()->NewRowIterator(valcol_projection_); | 197 | 31 | CHECK_OK(iter); | 198 | | | 199 | 31 | QLTableRow row; | 200 | 4.01k | while (CHECK_RESULT((**iter).HasNext())) { | 201 | 4.01k | CHECK_OK((**iter).NextRow(&row)); | 202 | | | 203 | 4.01k | QLValue value; | 204 | 4.01k | CHECK_OK(row.GetValue(schema_.column_id(2), &value)); | 205 | 4.01k | if (!value.IsNull()) { | 206 | 0 | CHECK(value.value().has_int32_value()) << "Row: " << row.ToString(); | 207 | 4.01k | sum += value.int32_value(); | 208 | 4.01k | } | 209 | 4.01k | count_since_report += 1; | 210 | | | 211 | | // Report metrics if enough time has passed | 212 | 4.01k | if (count_since_report > 100) { | 213 | 33 | if (scanned_ts.get()) { | 214 | 24 | scanned_ts->AddValue(count_since_report); | 215 | 24 | } | 216 | 33 | count_since_report = 0; | 217 | 33 | } | 218 | 4.01k | } | 219 | | | 220 | 31 | if (scanned_ts.get()) { | 221 | 30 | scanned_ts->AddValue(count_since_report); | 222 | 30 | } | 223 | | | 224 | 31 | return sum; | 225 | 31 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE Line | Count | Source | 191 | 34 | uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) { | 192 | 34 | uint64_t count_since_report = 0; | 193 | | | 194 | 34 | uint64_t sum = 0; | 195 | | | 196 | 34 | auto iter = tablet()->NewRowIterator(valcol_projection_); | 197 | 34 | CHECK_OK(iter); | 198 | | | 199 | 34 | QLTableRow row; | 200 | 3.98k | while (CHECK_RESULT((**iter).HasNext())) { | 201 | 3.98k | CHECK_OK((**iter).NextRow(&row)); | 202 | | | 203 | 3.98k | QLValue value; | 204 | 3.98k | CHECK_OK(row.GetValue(schema_.column_id(2), &value)); | 205 | 3.98k | if (!value.IsNull()) { | 206 | 0 | CHECK(value.value().has_int32_value()) << "Row: " << row.ToString(); | 207 | 2.00k | sum += value.int32_value(); | 208 | 2.00k | } | 209 | 3.98k | count_since_report += 1; | 210 | | | 211 | | // Report metrics if enough time has passed | 212 | 3.98k | if (count_since_report > 100) { | 213 | 32 | if (scanned_ts.get()) { | 214 | 23 | scanned_ts->AddValue(count_since_report); | 215 | 23 | } | 216 | 32 | count_since_report = 0; | 217 | 32 | } | 218 | 3.98k | } | 219 | | | 220 | 34 | if (scanned_ts.get()) { | 221 | 33 | scanned_ts->AddValue(count_since_report); | 222 | 33 | } | 223 | | | 224 | 34 | return sum; | 225 | 34 | } |
|
226 | | |
227 | | // Thread which cycles between inserting and deleting a test row, each time |
228 | | // with a different value. |
229 | 60 | void DeleteAndReinsertCycleThread(int tid) { |
230 | 60 | int32_t iteration = 0; |
231 | 60 | LocalTabletWriter writer(this->tablet().get()); |
232 | | |
233 | 60 | while (running_insert_count_.count() > 0) { |
234 | 0 | for (int i = 0; i < 100; i++) { |
235 | 0 | CHECK_OK(this->InsertTestRow(&writer, tid, iteration++)); |
236 | 0 | CHECK_OK(this->DeleteTestRow(&writer, tid)); |
237 | 0 | } |
238 | 0 | } |
239 | 60 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE28DeleteAndReinsertCycleThreadEi Line | Count | Source | 229 | 10 | void DeleteAndReinsertCycleThread(int tid) { | 230 | 10 | int32_t iteration = 0; | 231 | 10 | LocalTabletWriter writer(this->tablet().get()); | 232 | | | 233 | 10 | while (running_insert_count_.count() > 0) { | 234 | 0 | for (int i = 0; i < 100; i++) { | 235 | 0 | CHECK_OK(this->InsertTestRow(&writer, tid, iteration++)); | 236 | 0 | CHECK_OK(this->DeleteTestRow(&writer, tid)); | 237 | 0 | } | 238 | 0 | } | 239 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE28DeleteAndReinsertCycleThreadEi Line | Count | Source | 229 | 10 | void DeleteAndReinsertCycleThread(int tid) { | 230 | 10 | int32_t iteration = 0; | 231 | 10 | LocalTabletWriter writer(this->tablet().get()); | 232 | | | 233 | 10 | while (running_insert_count_.count() > 0) { | 234 | 0 | for (int i = 0; i < 100; i++) { | 235 | 0 | CHECK_OK(this->InsertTestRow(&writer, tid, iteration++)); | 236 | 0 | CHECK_OK(this->DeleteTestRow(&writer, tid)); | 237 | 0 | } | 238 | 0 | } | 239 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE28DeleteAndReinsertCycleThreadEi Line | Count | Source | 229 | 10 | void DeleteAndReinsertCycleThread(int tid) { | 230 | 10 | int32_t iteration = 0; | 231 | 10 | LocalTabletWriter writer(this->tablet().get()); | 232 | | | 233 | 10 | while (running_insert_count_.count() > 0) { | 234 | 0 | for (int i = 0; i < 100; i++) { | 235 | 0 | CHECK_OK(this->InsertTestRow(&writer, tid, iteration++)); | 236 | 0 | CHECK_OK(this->DeleteTestRow(&writer, tid)); | 237 | 0 | } | 238 | 0 | } | 239 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE28DeleteAndReinsertCycleThreadEi Line | Count | Source | 229 | 10 | void DeleteAndReinsertCycleThread(int tid) { | 230 | 10 | int32_t iteration = 0; | 231 | 10 | LocalTabletWriter writer(this->tablet().get()); | 232 | | | 233 | 10 | while (running_insert_count_.count() > 0) { | 234 | 0 | for (int i = 0; i < 100; i++) { | 235 | 0 | CHECK_OK(this->InsertTestRow(&writer, tid, iteration++)); | 236 | 0 | CHECK_OK(this->DeleteTestRow(&writer, tid)); | 237 | 0 | } | 238 | 0 | } | 239 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE28DeleteAndReinsertCycleThreadEi Line | Count | Source | 229 | 10 | void DeleteAndReinsertCycleThread(int tid) { | 230 | 10 | int32_t iteration = 0; | 231 | 10 | LocalTabletWriter writer(this->tablet().get()); | 232 | | | 233 | 10 | while (running_insert_count_.count() > 0) { | 234 | 0 | for (int i = 0; i < 100; i++) { | 235 | 0 | CHECK_OK(this->InsertTestRow(&writer, tid, iteration++)); | 236 | 0 | CHECK_OK(this->DeleteTestRow(&writer, tid)); | 237 | 0 | } | 238 | 0 | } | 239 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE28DeleteAndReinsertCycleThreadEi Line | Count | Source | 229 | 10 | void DeleteAndReinsertCycleThread(int tid) { | 230 | 10 | int32_t iteration = 0; | 231 | 10 | LocalTabletWriter writer(this->tablet().get()); | 232 | | | 233 | 10 | while (running_insert_count_.count() > 0) { | 234 | 0 | for (int i = 0; i < 100; i++) { | 235 | 0 | CHECK_OK(this->InsertTestRow(&writer, tid, iteration++)); | 236 | 0 | CHECK_OK(this->DeleteTestRow(&writer, tid)); | 237 | 0 | } | 238 | 0 | } | 239 | 10 | } |
|
240 | | |
241 | | // Thread which continuously sends updates at the same row, ignoring any |
242 | | // "not found" errors that might come back. This is used simultaneously with |
243 | | // DeleteAndReinsertCycleThread to check for races where we might accidentally |
244 | | // succeed in UPDATING a ghost row. |
245 | 60 | void StubbornlyUpdateSameRowThread(int tid) { |
246 | 60 | int32_t iteration = 0; |
247 | 60 | LocalTabletWriter writer(this->tablet().get()); |
248 | 60 | while (running_insert_count_.count() > 0) { |
249 | 0 | for (int i = 0; i < 100; i++) { |
250 | 0 | Status s = this->UpdateTestRow(&writer, tid, iteration++); |
251 | 0 | if (!s.ok() && !s.IsNotFound()) { |
252 | | // We expect "not found", but not any other errors. |
253 | 0 | CHECK_OK(s); |
254 | 0 | } |
255 | 0 | } |
256 | 0 | } |
257 | 60 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE29StubbornlyUpdateSameRowThreadEi Line | Count | Source | 245 | 10 | void StubbornlyUpdateSameRowThread(int tid) { | 246 | 10 | int32_t iteration = 0; | 247 | 10 | LocalTabletWriter writer(this->tablet().get()); | 248 | 10 | while (running_insert_count_.count() > 0) { | 249 | 0 | for (int i = 0; i < 100; i++) { | 250 | 0 | Status s = this->UpdateTestRow(&writer, tid, iteration++); | 251 | 0 | if (!s.ok() && !s.IsNotFound()) { | 252 | | // We expect "not found", but not any other errors. | 253 | 0 | CHECK_OK(s); | 254 | 0 | } | 255 | 0 | } | 256 | 0 | } | 257 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE29StubbornlyUpdateSameRowThreadEi Line | Count | Source | 245 | 10 | void StubbornlyUpdateSameRowThread(int tid) { | 246 | 10 | int32_t iteration = 0; | 247 | 10 | LocalTabletWriter writer(this->tablet().get()); | 248 | 10 | while (running_insert_count_.count() > 0) { | 249 | 0 | for (int i = 0; i < 100; i++) { | 250 | 0 | Status s = this->UpdateTestRow(&writer, tid, iteration++); | 251 | 0 | if (!s.ok() && !s.IsNotFound()) { | 252 | | // We expect "not found", but not any other errors. | 253 | 0 | CHECK_OK(s); | 254 | 0 | } | 255 | 0 | } | 256 | 0 | } | 257 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE29StubbornlyUpdateSameRowThreadEi Line | Count | Source | 245 | 10 | void StubbornlyUpdateSameRowThread(int tid) { | 246 | 10 | int32_t iteration = 0; | 247 | 10 | LocalTabletWriter writer(this->tablet().get()); | 248 | 10 | while (running_insert_count_.count() > 0) { | 249 | 0 | for (int i = 0; i < 100; i++) { | 250 | 0 | Status s = this->UpdateTestRow(&writer, tid, iteration++); | 251 | 0 | if (!s.ok() && !s.IsNotFound()) { | 252 | | // We expect "not found", but not any other errors. | 253 | 0 | CHECK_OK(s); | 254 | 0 | } | 255 | 0 | } | 256 | 0 | } | 257 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE29StubbornlyUpdateSameRowThreadEi Line | Count | Source | 245 | 10 | void StubbornlyUpdateSameRowThread(int tid) { | 246 | 10 | int32_t iteration = 0; | 247 | 10 | LocalTabletWriter writer(this->tablet().get()); | 248 | 10 | while (running_insert_count_.count() > 0) { | 249 | 0 | for (int i = 0; i < 100; i++) { | 250 | 0 | Status s = this->UpdateTestRow(&writer, tid, iteration++); | 251 | 0 | if (!s.ok() && !s.IsNotFound()) { | 252 | | // We expect "not found", but not any other errors. | 253 | 0 | CHECK_OK(s); | 254 | 0 | } | 255 | 0 | } | 256 | 0 | } | 257 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE29StubbornlyUpdateSameRowThreadEi Line | Count | Source | 245 | 10 | void StubbornlyUpdateSameRowThread(int tid) { | 246 | 10 | int32_t iteration = 0; | 247 | 10 | LocalTabletWriter writer(this->tablet().get()); | 248 | 10 | while (running_insert_count_.count() > 0) { | 249 | 0 | for (int i = 0; i < 100; i++) { | 250 | 0 | Status s = this->UpdateTestRow(&writer, tid, iteration++); | 251 | 0 | if (!s.ok() && !s.IsNotFound()) { | 252 | | // We expect "not found", but not any other errors. | 253 | 0 | CHECK_OK(s); | 254 | 0 | } | 255 | 0 | } | 256 | 0 | } | 257 | 10 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE29StubbornlyUpdateSameRowThreadEi Line | Count | Source | 245 | 10 | void StubbornlyUpdateSameRowThread(int tid) { | 246 | 10 | int32_t iteration = 0; | 247 | 10 | LocalTabletWriter writer(this->tablet().get()); | 248 | 10 | while (running_insert_count_.count() > 0) { | 249 | 0 | for (int i = 0; i < 100; i++) { | 250 | 0 | Status s = this->UpdateTestRow(&writer, tid, iteration++); | 251 | 0 | if (!s.ok() && !s.IsNotFound()) { | 252 | | // We expect "not found", but not any other errors. | 253 | 0 | CHECK_OK(s); | 254 | 0 | } | 255 | 0 | } | 256 | 0 | } | 257 | 10 | } |
|
258 | | |
259 | | template<typename FunctionType> |
260 | 30 | void StartThreads(int n_threads, const FunctionType &function) { |
261 | 168 | for (int i = 0; i < n_threads; i++) { |
262 | 138 | scoped_refptr<yb::Thread> new_thread; |
263 | 138 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i), |
264 | 138 | function, this, i, &new_thread)); |
265 | 138 | threads_.push_back(new_thread); |
266 | 138 | } |
267 | 30 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE12StartThreadsIMS3_FviEEEviRKT_ Line | Count | Source | 260 | 5 | void StartThreads(int n_threads, const FunctionType &function) { | 261 | 28 | for (int i = 0; i < n_threads; i++) { | 262 | 23 | scoped_refptr<yb::Thread> new_thread; | 263 | 23 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i), | 264 | 23 | function, this, i, &new_thread)); | 265 | 23 | threads_.push_back(new_thread); | 266 | 23 | } | 267 | 5 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE12StartThreadsIMS5_FviEEEviRKT_ Line | Count | Source | 260 | 5 | void StartThreads(int n_threads, const FunctionType &function) { | 261 | 28 | for (int i = 0; i < n_threads; i++) { | 262 | 23 | scoped_refptr<yb::Thread> new_thread; | 263 | 23 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i), | 264 | 23 | function, this, i, &new_thread)); | 265 | 23 | threads_.push_back(new_thread); | 266 | 23 | } | 267 | 5 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE12StartThreadsIMS5_FviEEEviRKT_ Line | Count | Source | 260 | 5 | void StartThreads(int n_threads, const FunctionType &function) { | 261 | 28 | for (int i = 0; i < n_threads; i++) { | 262 | 23 | scoped_refptr<yb::Thread> new_thread; | 263 | 23 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i), | 264 | 23 | function, this, i, &new_thread)); | 265 | 23 | threads_.push_back(new_thread); | 266 | 23 | } | 267 | 5 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE12StartThreadsIMS5_FviEEEviRKT_ Line | Count | Source | 260 | 5 | void StartThreads(int n_threads, const FunctionType &function) { | 261 | 28 | for (int i = 0; i < n_threads; i++) { | 262 | 23 | scoped_refptr<yb::Thread> new_thread; | 263 | 23 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i), | 264 | 23 | function, this, i, &new_thread)); | 265 | 23 | threads_.push_back(new_thread); | 266 | 23 | } | 267 | 5 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE12StartThreadsIMS5_FviEEEviRKT_ Line | Count | Source | 260 | 5 | void StartThreads(int n_threads, const FunctionType &function) { | 261 | 28 | for (int i = 0; i < n_threads; i++) { | 262 | 23 | scoped_refptr<yb::Thread> new_thread; | 263 | 23 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i), | 264 | 23 | function, this, i, &new_thread)); | 265 | 23 | threads_.push_back(new_thread); | 266 | 23 | } | 267 | 5 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE12StartThreadsIMS3_FviEEEviRKT_ Line | Count | Source | 260 | 5 | void StartThreads(int n_threads, const FunctionType &function) { | 261 | 28 | for (int i = 0; i < n_threads; i++) { | 262 | 23 | scoped_refptr<yb::Thread> new_thread; | 263 | 23 | CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i), | 264 | 23 | function, this, i, &new_thread)); | 265 | 23 | threads_.push_back(new_thread); | 266 | 23 | } | 267 | 5 | } |
|
268 | | |
269 | 12 | void JoinThreads() { |
270 | 156 | for (scoped_refptr<yb::Thread> thr : threads_) { |
271 | 156 | CHECK_OK(ThreadJoiner(thr.get()).Join()); |
272 | 156 | } |
273 | 12 | } _ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE11JoinThreadsEv Line | Count | Source | 269 | 2 | void JoinThreads() { | 270 | 26 | for (scoped_refptr<yb::Thread> thr : threads_) { | 271 | 26 | CHECK_OK(ThreadJoiner(thr.get()).Join()); | 272 | 26 | } | 273 | 2 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE11JoinThreadsEv Line | Count | Source | 269 | 2 | void JoinThreads() { | 270 | 26 | for (scoped_refptr<yb::Thread> thr : threads_) { | 271 | 26 | CHECK_OK(ThreadJoiner(thr.get()).Join()); | 272 | 26 | } | 273 | 2 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE11JoinThreadsEv Line | Count | Source | 269 | 2 | void JoinThreads() { | 270 | 26 | for (scoped_refptr<yb::Thread> thr : threads_) { | 271 | 26 | CHECK_OK(ThreadJoiner(thr.get()).Join()); | 272 | 26 | } | 273 | 2 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE11JoinThreadsEv Line | Count | Source | 269 | 2 | void JoinThreads() { | 270 | 26 | for (scoped_refptr<yb::Thread> thr : threads_) { | 271 | 26 | CHECK_OK(ThreadJoiner(thr.get()).Join()); | 272 | 26 | } | 273 | 2 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE11JoinThreadsEv Line | Count | Source | 269 | 2 | void JoinThreads() { | 270 | 26 | for (scoped_refptr<yb::Thread> thr : threads_) { | 271 | 26 | CHECK_OK(ThreadJoiner(thr.get()).Join()); | 272 | 26 | } | 273 | 2 | } |
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE11JoinThreadsEv Line | Count | Source | 269 | 2 | void JoinThreads() { | 270 | 26 | for (scoped_refptr<yb::Thread> thr : threads_) { | 271 | 26 | CHECK_OK(ThreadJoiner(thr.get()).Join()); | 272 | 26 | } | 273 | 2 | } |
|
274 | | |
275 | | std::vector<scoped_refptr<yb::Thread> > threads_; |
276 | | CountDownLatch running_insert_count_; |
277 | | |
278 | | // Projection with only an int column. |
279 | | // This is provided by both harnesses. |
280 | | Schema valcol_projection_; |
281 | | |
282 | | TimeSeriesCollector ts_collector_; |
283 | | }; |
284 | | |
285 | | TYPED_TEST_CASE(VerifyRowsTabletTest, TabletTestHelperTypes); |
286 | | |
287 | 6 | TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) { |
288 | 6 | if (1000 == FLAGS_inserts_per_thread) { |
289 | 6 | if (AllowSlowTests()) { |
290 | 0 | FLAGS_inserts_per_thread = 50000; |
291 | 0 | } |
292 | 6 | } |
293 | | |
294 | | // Spawn a bunch of threads, each of which will do updates. |
295 | 6 | this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread); |
296 | 6 | this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread); |
297 | 6 | this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread); |
298 | 6 | this->JoinThreads(); |
299 | 6 | LOG_TIMING(INFO, "Summing int32 column") { |
300 | 6 | uint64_t sum = this->CountSum(shared_ptr<TimeSeries>()); |
301 | 6 | LOG(INFO) << "Sum = " << sum; |
302 | 6 | } |
303 | | |
304 | 6 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) |
305 | 6 | / kNumInsertThreads; |
306 | | |
307 | 6 | this->VerifyTestRows(0, max_rows * kNumInsertThreads); |
308 | | |
309 | | // Start up a bunch of threads which repeatedly insert and delete the same |
310 | | // row, while flushing and compacting. This checks various concurrent handling |
311 | | // of DELETE/REINSERT during flushes. |
312 | 6 | google::FlagSaver saver; |
313 | 6 | this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); |
314 | 6 | this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); |
315 | | |
316 | | // Run very quickly in dev builds, longer in slow builds. |
317 | 6 | float runtime_seconds = AllowSlowTests() ? 2 : 0.1; |
318 | 6 | Stopwatch sw; |
319 | 6 | sw.start(); |
320 | 108 | while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND && |
321 | 102 | !this->HasFatalFailure()) { |
322 | 102 | SleepFor(MonoDelta::FromMicroseconds(5000)); |
323 | 102 | } |
324 | | |
325 | | // This is sort of a hack -- the flusher thread stops when it sees this |
326 | | // countdown latch go to 0. |
327 | 6 | this->running_insert_count_.Reset(0); |
328 | 6 | this->JoinThreads(); |
329 | 6 | } _ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_18StringKeyTestSetupEE8TestBodyEv Line | Count | Source | 287 | 1 | TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) { | 288 | 1 | if (1000 == FLAGS_inserts_per_thread) { | 289 | 1 | if (AllowSlowTests()) { | 290 | 0 | FLAGS_inserts_per_thread = 50000; | 291 | 0 | } | 292 | 1 | } | 293 | | | 294 | | // Spawn a bunch of threads, each of which will do updates. | 295 | 1 | this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread); | 296 | 1 | this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread); | 297 | 1 | this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread); | 298 | 1 | this->JoinThreads(); | 299 | 1 | LOG_TIMING(INFO, "Summing int32 column") { | 300 | 1 | uint64_t sum = this->CountSum(shared_ptr<TimeSeries>()); | 301 | 1 | LOG(INFO) << "Sum = " << sum; | 302 | 1 | } | 303 | | | 304 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 305 | 1 | / kNumInsertThreads; | 306 | | | 307 | 1 | this->VerifyTestRows(0, max_rows * kNumInsertThreads); | 308 | | | 309 | | // Start up a bunch of threads which repeatedly insert and delete the same | 310 | | // row, while flushing and compacting. This checks various concurrent handling | 311 | | // of DELETE/REINSERT during flushes. | 312 | 1 | google::FlagSaver saver; | 313 | 1 | this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); | 314 | 1 | this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); | 315 | | | 316 | | // Run very quickly in dev builds, longer in slow builds. | 317 | 1 | float runtime_seconds = AllowSlowTests() ? 2 : 0.1; | 318 | 1 | Stopwatch sw; | 319 | 1 | sw.start(); | 320 | 18 | while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND && | 321 | 17 | !this->HasFatalFailure()) { | 322 | 17 | SleepFor(MonoDelta::FromMicroseconds(5000)); | 323 | 17 | } | 324 | | | 325 | | // This is sort of a hack -- the flusher thread stops when it sees this | 326 | | // countdown latch go to 0. | 327 | 1 | this->running_insert_count_.Reset(0); | 328 | 1 | this->JoinThreads(); | 329 | 1 | } |
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE8TestBodyEv Line | Count | Source | 287 | 1 | TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) { | 288 | 1 | if (1000 == FLAGS_inserts_per_thread) { | 289 | 1 | if (AllowSlowTests()) { | 290 | 0 | FLAGS_inserts_per_thread = 50000; | 291 | 0 | } | 292 | 1 | } | 293 | | | 294 | | // Spawn a bunch of threads, each of which will do updates. | 295 | 1 | this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread); | 296 | 1 | this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread); | 297 | 1 | this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread); | 298 | 1 | this->JoinThreads(); | 299 | 1 | LOG_TIMING(INFO, "Summing int32 column") { | 300 | 1 | uint64_t sum = this->CountSum(shared_ptr<TimeSeries>()); | 301 | 1 | LOG(INFO) << "Sum = " << sum; | 302 | 1 | } | 303 | | | 304 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 305 | 1 | / kNumInsertThreads; | 306 | | | 307 | 1 | this->VerifyTestRows(0, max_rows * kNumInsertThreads); | 308 | | | 309 | | // Start up a bunch of threads which repeatedly insert and delete the same | 310 | | // row, while flushing and compacting. This checks various concurrent handling | 311 | | // of DELETE/REINSERT during flushes. | 312 | 1 | google::FlagSaver saver; | 313 | 1 | this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); | 314 | 1 | this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); | 315 | | | 316 | | // Run very quickly in dev builds, longer in slow builds. | 317 | 1 | float runtime_seconds = AllowSlowTests() ? 2 : 0.1; | 318 | 1 | Stopwatch sw; | 319 | 1 | sw.start(); | 320 | 18 | while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND && | 321 | 17 | !this->HasFatalFailure()) { | 322 | 17 | SleepFor(MonoDelta::FromMicroseconds(5000)); | 323 | 17 | } | 324 | | | 325 | | // This is sort of a hack -- the flusher thread stops when it sees this | 326 | | // countdown latch go to 0. | 327 | 1 | this->running_insert_count_.Reset(0); | 328 | 1 | this->JoinThreads(); | 329 | 1 | } |
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE8TestBodyEv Line | Count | Source | 287 | 1 | TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) { | 288 | 1 | if (1000 == FLAGS_inserts_per_thread) { | 289 | 1 | if (AllowSlowTests()) { | 290 | 0 | FLAGS_inserts_per_thread = 50000; | 291 | 0 | } | 292 | 1 | } | 293 | | | 294 | | // Spawn a bunch of threads, each of which will do updates. | 295 | 1 | this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread); | 296 | 1 | this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread); | 297 | 1 | this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread); | 298 | 1 | this->JoinThreads(); | 299 | 1 | LOG_TIMING(INFO, "Summing int32 column") { | 300 | 1 | uint64_t sum = this->CountSum(shared_ptr<TimeSeries>()); | 301 | 1 | LOG(INFO) << "Sum = " << sum; | 302 | 1 | } | 303 | | | 304 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 305 | 1 | / kNumInsertThreads; | 306 | | | 307 | 1 | this->VerifyTestRows(0, max_rows * kNumInsertThreads); | 308 | | | 309 | | // Start up a bunch of threads which repeatedly insert and delete the same | 310 | | // row, while flushing and compacting. This checks various concurrent handling | 311 | | // of DELETE/REINSERT during flushes. | 312 | 1 | google::FlagSaver saver; | 313 | 1 | this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); | 314 | 1 | this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); | 315 | | | 316 | | // Run very quickly in dev builds, longer in slow builds. | 317 | 1 | float runtime_seconds = AllowSlowTests() ? 2 : 0.1; | 318 | 1 | Stopwatch sw; | 319 | 1 | sw.start(); | 320 | 18 | while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND && | 321 | 17 | !this->HasFatalFailure()) { | 322 | 17 | SleepFor(MonoDelta::FromMicroseconds(5000)); | 323 | 17 | } | 324 | | | 325 | | // This is sort of a hack -- the flusher thread stops when it sees this | 326 | | // countdown latch go to 0. | 327 | 1 | this->running_insert_count_.Reset(0); | 328 | 1 | this->JoinThreads(); | 329 | 1 | } |
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE8TestBodyEv Line | Count | Source | 287 | 1 | TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) { | 288 | 1 | if (1000 == FLAGS_inserts_per_thread) { | 289 | 1 | if (AllowSlowTests()) { | 290 | 0 | FLAGS_inserts_per_thread = 50000; | 291 | 0 | } | 292 | 1 | } | 293 | | | 294 | | // Spawn a bunch of threads, each of which will do updates. | 295 | 1 | this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread); | 296 | 1 | this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread); | 297 | 1 | this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread); | 298 | 1 | this->JoinThreads(); | 299 | 1 | LOG_TIMING(INFO, "Summing int32 column") { | 300 | 1 | uint64_t sum = this->CountSum(shared_ptr<TimeSeries>()); | 301 | 1 | LOG(INFO) << "Sum = " << sum; | 302 | 1 | } | 303 | | | 304 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 305 | 1 | / kNumInsertThreads; | 306 | | | 307 | 1 | this->VerifyTestRows(0, max_rows * kNumInsertThreads); | 308 | | | 309 | | // Start up a bunch of threads which repeatedly insert and delete the same | 310 | | // row, while flushing and compacting. This checks various concurrent handling | 311 | | // of DELETE/REINSERT during flushes. | 312 | 1 | google::FlagSaver saver; | 313 | 1 | this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); | 314 | 1 | this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); | 315 | | | 316 | | // Run very quickly in dev builds, longer in slow builds. | 317 | 1 | float runtime_seconds = AllowSlowTests() ? 2 : 0.1; | 318 | 1 | Stopwatch sw; | 319 | 1 | sw.start(); | 320 | 18 | while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND && | 321 | 17 | !this->HasFatalFailure()) { | 322 | 17 | SleepFor(MonoDelta::FromMicroseconds(5000)); | 323 | 17 | } | 324 | | | 325 | | // This is sort of a hack -- the flusher thread stops when it sees this | 326 | | // countdown latch go to 0. | 327 | 1 | this->running_insert_count_.Reset(0); | 328 | 1 | this->JoinThreads(); | 329 | 1 | } |
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE8TestBodyEv Line | Count | Source | 287 | 1 | TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) { | 288 | 1 | if (1000 == FLAGS_inserts_per_thread) { | 289 | 1 | if (AllowSlowTests()) { | 290 | 0 | FLAGS_inserts_per_thread = 50000; | 291 | 0 | } | 292 | 1 | } | 293 | | | 294 | | // Spawn a bunch of threads, each of which will do updates. | 295 | 1 | this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread); | 296 | 1 | this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread); | 297 | 1 | this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread); | 298 | 1 | this->JoinThreads(); | 299 | 1 | LOG_TIMING(INFO, "Summing int32 column") { | 300 | 1 | uint64_t sum = this->CountSum(shared_ptr<TimeSeries>()); | 301 | 1 | LOG(INFO) << "Sum = " << sum; | 302 | 1 | } | 303 | | | 304 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 305 | 1 | / kNumInsertThreads; | 306 | | | 307 | 1 | this->VerifyTestRows(0, max_rows * kNumInsertThreads); | 308 | | | 309 | | // Start up a bunch of threads which repeatedly insert and delete the same | 310 | | // row, while flushing and compacting. This checks various concurrent handling | 311 | | // of DELETE/REINSERT during flushes. | 312 | 1 | google::FlagSaver saver; | 313 | 1 | this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); | 314 | 1 | this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); | 315 | | | 316 | | // Run very quickly in dev builds, longer in slow builds. | 317 | 1 | float runtime_seconds = AllowSlowTests() ? 2 : 0.1; | 318 | 1 | Stopwatch sw; | 319 | 1 | sw.start(); | 320 | 18 | while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND && | 321 | 17 | !this->HasFatalFailure()) { | 322 | 17 | SleepFor(MonoDelta::FromMicroseconds(5000)); | 323 | 17 | } | 324 | | | 325 | | // This is sort of a hack -- the flusher thread stops when it sees this | 326 | | // countdown latch go to 0. | 327 | 1 | this->running_insert_count_.Reset(0); | 328 | 1 | this->JoinThreads(); | 329 | 1 | } |
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_22NullableValueTestSetupEE8TestBodyEv Line | Count | Source | 287 | 1 | TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) { | 288 | 1 | if (1000 == FLAGS_inserts_per_thread) { | 289 | 1 | if (AllowSlowTests()) { | 290 | 0 | FLAGS_inserts_per_thread = 50000; | 291 | 0 | } | 292 | 1 | } | 293 | | | 294 | | // Spawn a bunch of threads, each of which will do updates. | 295 | 1 | this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread); | 296 | 1 | this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread); | 297 | 1 | this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread); | 298 | 1 | this->JoinThreads(); | 299 | 1 | LOG_TIMING(INFO, "Summing int32 column") { | 300 | 1 | uint64_t sum = this->CountSum(shared_ptr<TimeSeries>()); | 301 | 1 | LOG(INFO) << "Sum = " << sum; | 302 | 1 | } | 303 | | | 304 | 1 | auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads) | 305 | 1 | / kNumInsertThreads; | 306 | | | 307 | 1 | this->VerifyTestRows(0, max_rows * kNumInsertThreads); | 308 | | | 309 | | // Start up a bunch of threads which repeatedly insert and delete the same | 310 | | // row, while flushing and compacting. This checks various concurrent handling | 311 | | // of DELETE/REINSERT during flushes. | 312 | 1 | google::FlagSaver saver; | 313 | 1 | this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread); | 314 | 1 | this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread); | 315 | | | 316 | | // Run very quickly in dev builds, longer in slow builds. | 317 | 1 | float runtime_seconds = AllowSlowTests() ? 2 : 0.1; | 318 | 1 | Stopwatch sw; | 319 | 1 | sw.start(); | 320 | 18 | while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND && | 321 | 17 | !this->HasFatalFailure()) { | 322 | 17 | SleepFor(MonoDelta::FromMicroseconds(5000)); | 323 | 17 | } | 324 | | | 325 | | // This is sort of a hack -- the flusher thread stops when it sees this | 326 | | // countdown latch go to 0. | 327 | 1 | this->running_insert_count_.Reset(0); | 328 | 1 | this->JoinThreads(); | 329 | 1 | } |
|
330 | | |
331 | | // NOTE: Cannot add another TYPED_TEST here. The opid chosen for the next insert might |
332 | | // be out of order and will assert in rocksdb::MemTable::SetLastOpId(). |
333 | | |
334 | | } // namespace tablet |
335 | | } // namespace yb |