YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/verifyrows-tablet-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <memory>
34
35
#include <gtest/gtest.h>
36
37
#include "yb/common/ql_expr.h"
38
#include "yb/common/ql_protocol_util.h"
39
40
#include "yb/docdb/ql_rowwise_iterator_interface.h"
41
42
#include "yb/gutil/macros.h"
43
#include "yb/gutil/strings/substitute.h"
44
45
#include "yb/tablet/local_tablet_writer.h"
46
#include "yb/tablet/tablet-test-base.h"
47
#include "yb/tablet/tablet.h"
48
49
#include "yb/util/countdown_latch.h"
50
#include "yb/util/status_log.h"
51
#include "yb/util/test_graph.h"
52
#include "yb/util/thread.h"
53
54
DEFINE_int32(num_counter_threads, 8, "Number of counting threads to launch");
55
DEFINE_int32(num_summer_threads, 1, "Number of summing threads to launch");
56
DEFINE_int32(num_slowreader_threads, 1, "Number of 'slow' reader threads to launch");
57
DEFINE_int32(inserts_per_thread, 1000, "Number of rows inserted by the inserter thread");
58
59
using std::shared_ptr;
60
61
namespace yb {
62
namespace tablet {
63
64
// We use only one thread for now as each thread picks an OpId via a WriteTnxState for write and
65
// could reach rocksdb::MemTable::SetLastOpId() asynchronously, causing an out-of-order assertion.
66
// There could be multiple threads, as long as they get OpIds assigned in order.
67
const int kNumInsertThreads = 1;
68
69
template<class SETUP>
70
class VerifyRowsTabletTest : public TabletTestBase<SETUP> {
71
  // Import some names from superclass, since C++ is stingy about
72
  // letting us refer to the members otherwise.
73
  typedef TabletTestBase<SETUP> superclass;
74
  using superclass::schema_;
75
  using superclass::client_schema_;
76
  using superclass::tablet;
77
  using superclass::setup_;
78
 public:
79
6
  virtual void SetUp() {
80
6
    superclass::SetUp();
81
82
    // Warm up code cache with all the projections we'll be using.
83
6
    ASSERT_OK(tablet()->NewRowIterator(client_schema_));
84
6
    const SchemaPtr schema = tablet()->schema();
85
6
    ColumnSchema valcol = schema->column(schema->find_column("val"));
86
6
    valcol_projection_ = Schema({ valcol }, 0);
87
6
    ASSERT_OK(tablet()->NewRowIterator(valcol_projection_));
88
89
6
    ts_collector_.StartDumperThread();
90
6
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE5SetUpEv
Line
Count
Source
79
1
  virtual void SetUp() {
80
1
    superclass::SetUp();
81
82
    // Warm up code cache with all the projections we'll be using.
83
1
    ASSERT_OK(tablet()->NewRowIterator(client_schema_));
84
1
    const SchemaPtr schema = tablet()->schema();
85
1
    ColumnSchema valcol = schema->column(schema->find_column("val"));
86
1
    valcol_projection_ = Schema({ valcol }, 0);
87
1
    ASSERT_OK(tablet()->NewRowIterator(valcol_projection_));
88
89
1
    ts_collector_.StartDumperThread();
90
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE5SetUpEv
Line
Count
Source
79
1
  virtual void SetUp() {
80
1
    superclass::SetUp();
81
82
    // Warm up code cache with all the projections we'll be using.
83
1
    ASSERT_OK(tablet()->NewRowIterator(client_schema_));
84
1
    const SchemaPtr schema = tablet()->schema();
85
1
    ColumnSchema valcol = schema->column(schema->find_column("val"));
86
1
    valcol_projection_ = Schema({ valcol }, 0);
87
1
    ASSERT_OK(tablet()->NewRowIterator(valcol_projection_));
88
89
1
    ts_collector_.StartDumperThread();
90
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE5SetUpEv
Line
Count
Source
79
1
  virtual void SetUp() {
80
1
    superclass::SetUp();
81
82
    // Warm up code cache with all the projections we'll be using.
83
1
    ASSERT_OK(tablet()->NewRowIterator(client_schema_));
84
1
    const SchemaPtr schema = tablet()->schema();
85
1
    ColumnSchema valcol = schema->column(schema->find_column("val"));
86
1
    valcol_projection_ = Schema({ valcol }, 0);
87
1
    ASSERT_OK(tablet()->NewRowIterator(valcol_projection_));
88
89
1
    ts_collector_.StartDumperThread();
90
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE5SetUpEv
Line
Count
Source
79
1
  virtual void SetUp() {
80
1
    superclass::SetUp();
81
82
    // Warm up code cache with all the projections we'll be using.
83
1
    ASSERT_OK(tablet()->NewRowIterator(client_schema_));
84
1
    const SchemaPtr schema = tablet()->schema();
85
1
    ColumnSchema valcol = schema->column(schema->find_column("val"));
86
1
    valcol_projection_ = Schema({ valcol }, 0);
87
1
    ASSERT_OK(tablet()->NewRowIterator(valcol_projection_));
88
89
1
    ts_collector_.StartDumperThread();
90
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE5SetUpEv
Line
Count
Source
79
1
  virtual void SetUp() {
80
1
    superclass::SetUp();
81
82
    // Warm up code cache with all the projections we'll be using.
83
1
    ASSERT_OK(tablet()->NewRowIterator(client_schema_));
84
1
    const SchemaPtr schema = tablet()->schema();
85
1
    ColumnSchema valcol = schema->column(schema->find_column("val"));
86
1
    valcol_projection_ = Schema({ valcol }, 0);
87
1
    ASSERT_OK(tablet()->NewRowIterator(valcol_projection_));
88
89
1
    ts_collector_.StartDumperThread();
90
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE5SetUpEv
Line
Count
Source
79
1
  virtual void SetUp() {
80
1
    superclass::SetUp();
81
82
    // Warm up code cache with all the projections we'll be using.
83
1
    ASSERT_OK(tablet()->NewRowIterator(client_schema_));
84
1
    const SchemaPtr schema = tablet()->schema();
85
1
    ColumnSchema valcol = schema->column(schema->find_column("val"));
86
1
    valcol_projection_ = Schema({ valcol }, 0);
87
1
    ASSERT_OK(tablet()->NewRowIterator(valcol_projection_));
88
89
1
    ts_collector_.StartDumperThread();
90
1
  }
91
92
  VerifyRowsTabletTest()
93
      : running_insert_count_(kNumInsertThreads),
94
6
        ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
95
6
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEEC2Ev
Line
Count
Source
94
1
        ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
95
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEEC2Ev
Line
Count
Source
94
1
        ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
95
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEEC2Ev
Line
Count
Source
94
1
        ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
95
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEEC2Ev
Line
Count
Source
94
1
        ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
95
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEEC2Ev
Line
Count
Source
94
1
        ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
95
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEEC2Ev
Line
Count
Source
94
1
        ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
95
1
  }
96
97
6
  void InsertThread(int tid) {
98
6
    CountDownOnScopeExit dec_count(&running_insert_count_);
99
6
    shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
100
101
6
    int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
102
6
        / kNumInsertThreads;
103
104
6
    if (max_rows < FLAGS_inserts_per_thread) {
105
1
      LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow";
106
1
    }
107
108
6
    this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get());
109
6
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE12InsertThreadEi
Line
Count
Source
97
1
  void InsertThread(int tid) {
98
1
    CountDownOnScopeExit dec_count(&running_insert_count_);
99
1
    shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
100
101
1
    int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
102
1
        / kNumInsertThreads;
103
104
1
    if (max_rows < FLAGS_inserts_per_thread) {
105
0
      LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow";
106
0
    }
107
108
1
    this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get());
109
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE12InsertThreadEi
Line
Count
Source
97
1
  void InsertThread(int tid) {
98
1
    CountDownOnScopeExit dec_count(&running_insert_count_);
99
1
    shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
100
101
1
    int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
102
1
        / kNumInsertThreads;
103
104
1
    if (max_rows < FLAGS_inserts_per_thread) {
105
1
      LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow";
106
1
    }
107
108
1
    this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get());
109
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE12InsertThreadEi
Line
Count
Source
97
1
  void InsertThread(int tid) {
98
1
    CountDownOnScopeExit dec_count(&running_insert_count_);
99
1
    shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
100
101
1
    int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
102
1
        / kNumInsertThreads;
103
104
1
    if (max_rows < FLAGS_inserts_per_thread) {
105
0
      LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow";
106
0
    }
107
108
1
    this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get());
109
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE12InsertThreadEi
Line
Count
Source
97
1
  void InsertThread(int tid) {
98
1
    CountDownOnScopeExit dec_count(&running_insert_count_);
99
1
    shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
100
101
1
    int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
102
1
        / kNumInsertThreads;
103
104
1
    if (max_rows < FLAGS_inserts_per_thread) {
105
0
      LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow";
106
0
    }
107
108
1
    this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get());
109
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE12InsertThreadEi
Line
Count
Source
97
1
  void InsertThread(int tid) {
98
1
    CountDownOnScopeExit dec_count(&running_insert_count_);
99
1
    shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
100
101
1
    int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
102
1
        / kNumInsertThreads;
103
104
1
    if (max_rows < FLAGS_inserts_per_thread) {
105
0
      LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow";
106
0
    }
107
108
1
    this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get());
109
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE12InsertThreadEi
Line
Count
Source
97
1
  void InsertThread(int tid) {
98
1
    CountDownOnScopeExit dec_count(&running_insert_count_);
99
1
    shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
100
101
1
    int32_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
102
1
        / kNumInsertThreads;
103
104
1
    if (max_rows < FLAGS_inserts_per_thread) {
105
0
      LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow";
106
0
    }
107
108
1
    this->InsertTestRows(tid * max_rows, max_rows, 0, inserts.get());
109
1
  }
110
111
  void UpdateThread(int tid) {
112
    const Schema &schema = schema_;
113
114
    shared_ptr<TimeSeries> updates = ts_collector_.GetTimeSeries("updated");
115
116
    LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
117
118
    faststring update_buf;
119
120
    uint64_t updates_since_last_report = 0;
121
    int col_idx = schema.num_key_columns() == 1 ? 2 : 3;
122
    LOG(INFO) << "Update thread using schema: " << schema.ToString();
123
124
    YBPartialRow row(&client_schema_);
125
126
    QLTableRow value_map;
127
128
    while (running_insert_count_.count() > 0) {
129
      auto iter = tablet()->NewRowIterator(client_schema_);
130
      CHECK_OK(iter);
131
132
      while (ASSERT_RESULT((**iter).HasNext()) && running_insert_count_.count() > 0) {
133
        CHECK_OK((**iter).NextRow(&value_map));
134
135
        unsigned int seed = 1234;
136
        if (rand_r(&seed) % 10 == 7) {
137
          // Increment the "val" column.
138
139
          QLValue value;
140
          CHECK_OK(value_map.GetValue(schema.column_id(col_idx), &value));
141
          int32_t new_val = value.int32_value() + 1;
142
143
          // Rebuild the key by extracting the cells from the row
144
          QLWriteRequestPB req;
145
          setup_.BuildRowKeyFromExistingRow(&req, value_map);
146
          QLAddInt32ColumnValue(&req, kFirstColumnId + col_idx, new_val);
147
          CHECK_OK(writer.Write(&req));
148
149
          if (++updates_since_last_report >= 10) {
150
            updates->AddValue(updates_since_last_report);
151
            updates_since_last_report = 0;
152
          }
153
        }
154
      }
155
    }
156
  }
157
158
  // Thread which iterates slowly over the first 10% of the data.
159
  // This is meant to test that outstanding iterators don't end up
160
  // trying to reference already-freed memory.
161
6
  void SlowReaderThread(int tid) {
162
6
    QLTableRow row;
163
164
6
    auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
165
6
            / kNumInsertThreads;
166
167
6
    int max_iters = kNumInsertThreads * max_rows / 10;
168
169
53
    while (running_insert_count_.count() > 0) {
170
53
      auto iter = tablet()->NewRowIterator(client_schema_);
171
53
      ASSERT_OK(iter);
172
173
368
      for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) {
174
321
        ASSERT_OK((**iter).NextRow(&row));
175
176
321
        if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
177
6
          return;
178
6
        }
179
321
      }
180
53
    }
181
6
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE16SlowReaderThreadEi
Line
Count
Source
161
1
  void SlowReaderThread(int tid) {
162
1
    QLTableRow row;
163
164
1
    auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
165
1
            / kNumInsertThreads;
166
167
1
    int max_iters = kNumInsertThreads * max_rows / 10;
168
169
12
    while (running_insert_count_.count() > 0) {
170
12
      auto iter = tablet()->NewRowIterator(client_schema_);
171
12
      ASSERT_OK(iter);
172
173
77
      for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) {
174
66
        ASSERT_OK((**iter).NextRow(&row));
175
176
66
        if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
177
1
          return;
178
1
        }
179
66
      }
180
12
    }
181
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE16SlowReaderThreadEi
Line
Count
Source
161
1
  void SlowReaderThread(int tid) {
162
1
    QLTableRow row;
163
164
1
    auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
165
1
            / kNumInsertThreads;
166
167
1
    int max_iters = kNumInsertThreads * max_rows / 10;
168
169
9
    while (running_insert_count_.count() > 0) {
170
9
      auto iter = tablet()->NewRowIterator(client_schema_);
171
9
      ASSERT_OK(iter);
172
173
17
      for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) {
174
9
        ASSERT_OK((**iter).NextRow(&row));
175
176
9
        if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
177
1
          return;
178
1
        }
179
9
      }
180
9
    }
181
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE16SlowReaderThreadEi
Line
Count
Source
161
1
  void SlowReaderThread(int tid) {
162
1
    QLTableRow row;
163
164
1
    auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
165
1
            / kNumInsertThreads;
166
167
1
    int max_iters = kNumInsertThreads * max_rows / 10;
168
169
10
    while (running_insert_count_.count() > 0) {
170
10
      auto iter = tablet()->NewRowIterator(client_schema_);
171
10
      ASSERT_OK(iter);
172
173
72
      for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) {
174
63
        ASSERT_OK((**iter).NextRow(&row));
175
176
63
        if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
177
1
          return;
178
1
        }
179
63
      }
180
10
    }
181
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE16SlowReaderThreadEi
Line
Count
Source
161
1
  void SlowReaderThread(int tid) {
162
1
    QLTableRow row;
163
164
1
    auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
165
1
            / kNumInsertThreads;
166
167
1
    int max_iters = kNumInsertThreads * max_rows / 10;
168
169
2
    while (running_insert_count_.count() > 0) {
170
2
      auto iter = tablet()->NewRowIterator(client_schema_);
171
2
      ASSERT_OK(iter);
172
173
66
      for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) {
174
65
        ASSERT_OK((**iter).NextRow(&row));
175
176
65
        if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
177
1
          return;
178
1
        }
179
65
      }
180
2
    }
181
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE16SlowReaderThreadEi
Line
Count
Source
161
1
  void SlowReaderThread(int tid) {
162
1
    QLTableRow row;
163
164
1
    auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
165
1
            / kNumInsertThreads;
166
167
1
    int max_iters = kNumInsertThreads * max_rows / 10;
168
169
9
    while (running_insert_count_.count() > 0) {
170
9
      auto iter = tablet()->NewRowIterator(client_schema_);
171
9
      ASSERT_OK(iter);
172
173
66
      for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) {
174
58
        ASSERT_OK((**iter).NextRow(&row));
175
176
58
        if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
177
1
          return;
178
1
        }
179
58
      }
180
9
    }
181
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE16SlowReaderThreadEi
Line
Count
Source
161
1
  void SlowReaderThread(int tid) {
162
1
    QLTableRow row;
163
164
1
    auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
165
1
            / kNumInsertThreads;
166
167
1
    int max_iters = kNumInsertThreads * max_rows / 10;
168
169
11
    while (running_insert_count_.count() > 0) {
170
11
      auto iter = tablet()->NewRowIterator(client_schema_);
171
11
      ASSERT_OK(iter);
172
173
70
      for (int i = 0; i < max_iters && ASSERT_RESULT((**iter).HasNext()); i++) {
174
60
        ASSERT_OK((**iter).NextRow(&row));
175
176
60
        if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
177
1
          return;
178
1
        }
179
60
      }
180
11
    }
181
1
  }
182
183
6
  void SummerThread(int tid) {
184
6
    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
185
186
164
    while (running_insert_count_.count() > 0) {
187
158
      CountSum(scanned_ts);
188
158
    }
189
6
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE12SummerThreadEi
Line
Count
Source
183
1
  void SummerThread(int tid) {
184
1
    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
185
186
34
    while (running_insert_count_.count() > 0) {
187
33
      CountSum(scanned_ts);
188
33
    }
189
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE12SummerThreadEi
Line
Count
Source
183
1
  void SummerThread(int tid) {
184
1
    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
185
186
21
    while (running_insert_count_.count() > 0) {
187
20
      CountSum(scanned_ts);
188
20
    }
189
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE12SummerThreadEi
Line
Count
Source
183
1
  void SummerThread(int tid) {
184
1
    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
185
186
30
    while (running_insert_count_.count() > 0) {
187
29
      CountSum(scanned_ts);
188
29
    }
189
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE12SummerThreadEi
Line
Count
Source
183
1
  void SummerThread(int tid) {
184
1
    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
185
186
14
    while (running_insert_count_.count() > 0) {
187
13
      CountSum(scanned_ts);
188
13
    }
189
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE12SummerThreadEi
Line
Count
Source
183
1
  void SummerThread(int tid) {
184
1
    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
185
186
31
    while (running_insert_count_.count() > 0) {
187
30
      CountSum(scanned_ts);
188
30
    }
189
1
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE12SummerThreadEi
Line
Count
Source
183
1
  void SummerThread(int tid) {
184
1
    shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries("scanned");
185
186
34
    while (running_insert_count_.count() > 0) {
187
33
      CountSum(scanned_ts);
188
33
    }
189
1
  }
190
191
164
  uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
192
164
    uint64_t count_since_report = 0;
193
194
164
    uint64_t sum = 0;
195
196
164
    auto iter = tablet()->NewRowIterator(valcol_projection_);
197
164
    CHECK_OK(iter);
198
199
164
    QLTableRow row;
200
20.9k
    while (CHECK_RESULT((**iter).HasNext())) {
201
20.9k
      CHECK_OK((**iter).NextRow(&row));
202
203
20.9k
      QLValue value;
204
20.9k
      CHECK_OK(row.GetValue(schema_.column_id(2), &value));
205
20.9k
      if (!value.IsNull()) {
206
0
        CHECK(value.value().has_int32_value()) << "Row: " << row.ToString();
207
19.0k
        sum += value.int32_value();
208
19.0k
      }
209
20.9k
      count_since_report += 1;
210
211
      // Report metrics once more than 100 rows have been counted since the last report
212
20.9k
      if (count_since_report > 100) {
213
169
        if (scanned_ts.get()) {
214
123
          scanned_ts->AddValue(count_since_report);
215
123
        }
216
169
        count_since_report = 0;
217
169
      }
218
20.9k
    }
219
220
164
    if (scanned_ts.get()) {
221
158
      scanned_ts->AddValue(count_since_report);
222
158
    }
223
224
164
    return sum;
225
164
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE
Line
Count
Source
191
34
  uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
192
34
    uint64_t count_since_report = 0;
193
194
34
    uint64_t sum = 0;
195
196
34
    auto iter = tablet()->NewRowIterator(valcol_projection_);
197
34
    CHECK_OK(iter);
198
199
34
    QLTableRow row;
200
4.34k
    while (CHECK_RESULT((**iter).HasNext())) {
201
4.34k
      CHECK_OK((**iter).NextRow(&row));
202
203
4.34k
      QLValue value;
204
4.34k
      CHECK_OK(row.GetValue(schema_.column_id(2), &value));
205
4.34k
      if (!value.IsNull()) {
206
0
        CHECK(value.value().has_int32_value()) << "Row: " << row.ToString();
207
4.34k
        sum += value.int32_value();
208
4.34k
      }
209
4.34k
      count_since_report += 1;
210
211
      // Report metrics once more than 100 rows have been counted since the last report
212
4.34k
      if (count_since_report > 100) {
213
35
        if (scanned_ts.get()) {
214
26
          scanned_ts->AddValue(count_since_report);
215
26
        }
216
35
        count_since_report = 0;
217
35
      }
218
4.34k
    }
219
220
34
    if (scanned_ts.get()) {
221
33
      scanned_ts->AddValue(count_since_report);
222
33
    }
223
224
34
    return sum;
225
34
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE
Line
Count
Source
191
21
  uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
192
21
    uint64_t count_since_report = 0;
193
194
21
    uint64_t sum = 0;
195
196
21
    auto iter = tablet()->NewRowIterator(valcol_projection_);
197
21
    CHECK_OK(iter);
198
199
21
    QLTableRow row;
200
524
    while (CHECK_RESULT((**iter).HasNext())) {
201
524
      CHECK_OK((**iter).NextRow(&row));
202
203
524
      QLValue value;
204
524
      CHECK_OK(row.GetValue(schema_.column_id(2), &value));
205
524
      if (!value.IsNull()) {
206
0
        CHECK(value.value().has_int32_value()) << "Row: " << row.ToString();
207
524
        sum += value.int32_value();
208
524
      }
209
524
      count_since_report += 1;
210
211
      // Report metrics once more than 100 rows have been counted since the last report
212
524
      if (count_since_report > 100) {
213
2
        if (scanned_ts.get()) {
214
1
          scanned_ts->AddValue(count_since_report);
215
1
        }
216
2
        count_since_report = 0;
217
2
      }
218
524
    }
219
220
21
    if (scanned_ts.get()) {
221
20
      scanned_ts->AddValue(count_since_report);
222
20
    }
223
224
21
    return sum;
225
21
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE
Line
Count
Source
191
30
  uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
192
30
    uint64_t count_since_report = 0;
193
194
30
    uint64_t sum = 0;
195
196
30
    auto iter = tablet()->NewRowIterator(valcol_projection_);
197
30
    CHECK_OK(iter);
198
199
30
    QLTableRow row;
200
4.33k
    while (CHECK_RESULT((**iter).HasNext())) {
201
4.33k
      CHECK_OK((**iter).NextRow(&row));
202
203
4.33k
      QLValue value;
204
4.33k
      CHECK_OK(row.GetValue(schema_.column_id(2), &value));
205
4.33k
      if (!value.IsNull()) {
206
0
        CHECK(value.value().has_int32_value()) << "Row: " << row.ToString();
207
4.33k
        sum += value.int32_value();
208
4.33k
      }
209
4.33k
      count_since_report += 1;
210
211
      // Report metrics once more than 100 rows have been counted since the last report
212
4.33k
      if (count_since_report > 100) {
213
35
        if (scanned_ts.get()) {
214
26
          scanned_ts->AddValue(count_since_report);
215
26
        }
216
35
        count_since_report = 0;
217
35
      }
218
4.33k
    }
219
220
30
    if (scanned_ts.get()) {
221
29
      scanned_ts->AddValue(count_since_report);
222
29
    }
223
224
30
    return sum;
225
30
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE
Line
Count
Source
191
14
  uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
192
14
    uint64_t count_since_report = 0;
193
194
14
    uint64_t sum = 0;
195
196
14
    auto iter = tablet()->NewRowIterator(valcol_projection_);
197
14
    CHECK_OK(iter);
198
199
14
    QLTableRow row;
200
3.78k
    while (CHECK_RESULT((**iter).HasNext())) {
201
3.78k
      CHECK_OK((**iter).NextRow(&row));
202
203
3.78k
      QLValue value;
204
3.78k
      CHECK_OK(row.GetValue(schema_.column_id(2), &value));
205
3.78k
      if (!value.IsNull()) {
206
0
        CHECK(value.value().has_int32_value()) << "Row: " << row.ToString();
207
3.78k
        sum += value.int32_value();
208
3.78k
      }
209
3.78k
      count_since_report += 1;
210
211
      // Report metrics once more than 100 rows have been counted since the last report
212
3.78k
      if (count_since_report > 100) {
213
32
        if (scanned_ts.get()) {
214
23
          scanned_ts->AddValue(count_since_report);
215
23
        }
216
32
        count_since_report = 0;
217
32
      }
218
3.78k
    }
219
220
14
    if (scanned_ts.get()) {
221
13
      scanned_ts->AddValue(count_since_report);
222
13
    }
223
224
14
    return sum;
225
14
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE
Line
Count
Source
191
31
  uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
192
31
    uint64_t count_since_report = 0;
193
194
31
    uint64_t sum = 0;
195
196
31
    auto iter = tablet()->NewRowIterator(valcol_projection_);
197
31
    CHECK_OK(iter);
198
199
31
    QLTableRow row;
200
4.01k
    while (CHECK_RESULT((**iter).HasNext())) {
201
4.01k
      CHECK_OK((**iter).NextRow(&row));
202
203
4.01k
      QLValue value;
204
4.01k
      CHECK_OK(row.GetValue(schema_.column_id(2), &value));
205
4.01k
      if (!value.IsNull()) {
206
0
        CHECK(value.value().has_int32_value()) << "Row: " << row.ToString();
207
4.01k
        sum += value.int32_value();
208
4.01k
      }
209
4.01k
      count_since_report += 1;
210
211
      // Report metrics once more than 100 rows have been counted since the last report
212
4.01k
      if (count_since_report > 100) {
213
33
        if (scanned_ts.get()) {
214
24
          scanned_ts->AddValue(count_since_report);
215
24
        }
216
33
        count_since_report = 0;
217
33
      }
218
4.01k
    }
219
220
31
    if (scanned_ts.get()) {
221
30
      scanned_ts->AddValue(count_since_report);
222
30
    }
223
224
31
    return sum;
225
31
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE8CountSumERKNSt3__110shared_ptrINS_10TimeSeriesEEE
Line
Count
Source
191
34
  uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
192
34
    uint64_t count_since_report = 0;
193
194
34
    uint64_t sum = 0;
195
196
34
    auto iter = tablet()->NewRowIterator(valcol_projection_);
197
34
    CHECK_OK(iter);
198
199
34
    QLTableRow row;
200
3.98k
    while (CHECK_RESULT((**iter).HasNext())) {
201
3.98k
      CHECK_OK((**iter).NextRow(&row));
202
203
3.98k
      QLValue value;
204
3.98k
      CHECK_OK(row.GetValue(schema_.column_id(2), &value));
205
3.98k
      if (!value.IsNull()) {
206
0
        CHECK(value.value().has_int32_value()) << "Row: " << row.ToString();
207
2.00k
        sum += value.int32_value();
208
2.00k
      }
209
3.98k
      count_since_report += 1;
210
211
      // Report metrics if enough time has passed
212
3.98k
      if (count_since_report > 100) {
213
32
        if (scanned_ts.get()) {
214
23
          scanned_ts->AddValue(count_since_report);
215
23
        }
216
32
        count_since_report = 0;
217
32
      }
218
3.98k
    }
219
220
34
    if (scanned_ts.get()) {
221
33
      scanned_ts->AddValue(count_since_report);
222
33
    }
223
224
34
    return sum;
225
34
  }
226
227
  // Thread which cycles between inserting and deleting a test row, each time
228
  // with a different value.
229
60
  void DeleteAndReinsertCycleThread(int tid) {
230
60
    int32_t iteration = 0;
231
60
    LocalTabletWriter writer(this->tablet().get());
232
233
60
    while (running_insert_count_.count() > 0) {
234
0
      for (int i = 0; i < 100; i++) {
235
0
        CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
236
0
        CHECK_OK(this->DeleteTestRow(&writer, tid));
237
0
      }
238
0
    }
239
60
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE28DeleteAndReinsertCycleThreadEi
Line
Count
Source
229
10
  void DeleteAndReinsertCycleThread(int tid) {
230
10
    int32_t iteration = 0;
231
10
    LocalTabletWriter writer(this->tablet().get());
232
233
10
    while (running_insert_count_.count() > 0) {
234
0
      for (int i = 0; i < 100; i++) {
235
0
        CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
236
0
        CHECK_OK(this->DeleteTestRow(&writer, tid));
237
0
      }
238
0
    }
239
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE28DeleteAndReinsertCycleThreadEi
Line
Count
Source
229
10
  void DeleteAndReinsertCycleThread(int tid) {
230
10
    int32_t iteration = 0;
231
10
    LocalTabletWriter writer(this->tablet().get());
232
233
10
    while (running_insert_count_.count() > 0) {
234
0
      for (int i = 0; i < 100; i++) {
235
0
        CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
236
0
        CHECK_OK(this->DeleteTestRow(&writer, tid));
237
0
      }
238
0
    }
239
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE28DeleteAndReinsertCycleThreadEi
Line
Count
Source
229
10
  void DeleteAndReinsertCycleThread(int tid) {
230
10
    int32_t iteration = 0;
231
10
    LocalTabletWriter writer(this->tablet().get());
232
233
10
    while (running_insert_count_.count() > 0) {
234
0
      for (int i = 0; i < 100; i++) {
235
0
        CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
236
0
        CHECK_OK(this->DeleteTestRow(&writer, tid));
237
0
      }
238
0
    }
239
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE28DeleteAndReinsertCycleThreadEi
Line
Count
Source
229
10
  void DeleteAndReinsertCycleThread(int tid) {
230
10
    int32_t iteration = 0;
231
10
    LocalTabletWriter writer(this->tablet().get());
232
233
10
    while (running_insert_count_.count() > 0) {
234
0
      for (int i = 0; i < 100; i++) {
235
0
        CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
236
0
        CHECK_OK(this->DeleteTestRow(&writer, tid));
237
0
      }
238
0
    }
239
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE28DeleteAndReinsertCycleThreadEi
Line
Count
Source
229
10
  void DeleteAndReinsertCycleThread(int tid) {
230
10
    int32_t iteration = 0;
231
10
    LocalTabletWriter writer(this->tablet().get());
232
233
10
    while (running_insert_count_.count() > 0) {
234
0
      for (int i = 0; i < 100; i++) {
235
0
        CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
236
0
        CHECK_OK(this->DeleteTestRow(&writer, tid));
237
0
      }
238
0
    }
239
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE28DeleteAndReinsertCycleThreadEi
Line
Count
Source
229
10
  void DeleteAndReinsertCycleThread(int tid) {
230
10
    int32_t iteration = 0;
231
10
    LocalTabletWriter writer(this->tablet().get());
232
233
10
    while (running_insert_count_.count() > 0) {
234
0
      for (int i = 0; i < 100; i++) {
235
0
        CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
236
0
        CHECK_OK(this->DeleteTestRow(&writer, tid));
237
0
      }
238
0
    }
239
10
  }
240
241
  // Thread which continuously sends updates at the same row, ignoring any
242
  // "not found" errors that might come back. This is used simultaneously with
243
  // DeleteAndReinsertCycleThread to check for races where we might accidentally
244
  // succeed in UPDATING a ghost row.
245
60
  void StubbornlyUpdateSameRowThread(int tid) {
246
60
    int32_t iteration = 0;
247
60
    LocalTabletWriter writer(this->tablet().get());
248
60
    while (running_insert_count_.count() > 0) {
249
0
      for (int i = 0; i < 100; i++) {
250
0
        Status s = this->UpdateTestRow(&writer, tid, iteration++);
251
0
        if (!s.ok() && !s.IsNotFound()) {
252
          // We expect "not found", but not any other errors.
253
0
          CHECK_OK(s);
254
0
        }
255
0
      }
256
0
    }
257
60
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE29StubbornlyUpdateSameRowThreadEi
Line
Count
Source
245
10
  void StubbornlyUpdateSameRowThread(int tid) {
246
10
    int32_t iteration = 0;
247
10
    LocalTabletWriter writer(this->tablet().get());
248
10
    while (running_insert_count_.count() > 0) {
249
0
      for (int i = 0; i < 100; i++) {
250
0
        Status s = this->UpdateTestRow(&writer, tid, iteration++);
251
0
        if (!s.ok() && !s.IsNotFound()) {
252
          // We expect "not found", but not any other errors.
253
0
          CHECK_OK(s);
254
0
        }
255
0
      }
256
0
    }
257
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE29StubbornlyUpdateSameRowThreadEi
Line
Count
Source
245
10
  void StubbornlyUpdateSameRowThread(int tid) {
246
10
    int32_t iteration = 0;
247
10
    LocalTabletWriter writer(this->tablet().get());
248
10
    while (running_insert_count_.count() > 0) {
249
0
      for (int i = 0; i < 100; i++) {
250
0
        Status s = this->UpdateTestRow(&writer, tid, iteration++);
251
0
        if (!s.ok() && !s.IsNotFound()) {
252
          // We expect "not found", but not any other errors.
253
0
          CHECK_OK(s);
254
0
        }
255
0
      }
256
0
    }
257
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE29StubbornlyUpdateSameRowThreadEi
Line
Count
Source
245
10
  void StubbornlyUpdateSameRowThread(int tid) {
246
10
    int32_t iteration = 0;
247
10
    LocalTabletWriter writer(this->tablet().get());
248
10
    while (running_insert_count_.count() > 0) {
249
0
      for (int i = 0; i < 100; i++) {
250
0
        Status s = this->UpdateTestRow(&writer, tid, iteration++);
251
0
        if (!s.ok() && !s.IsNotFound()) {
252
          // We expect "not found", but not any other errors.
253
0
          CHECK_OK(s);
254
0
        }
255
0
      }
256
0
    }
257
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE29StubbornlyUpdateSameRowThreadEi
Line
Count
Source
245
10
  void StubbornlyUpdateSameRowThread(int tid) {
246
10
    int32_t iteration = 0;
247
10
    LocalTabletWriter writer(this->tablet().get());
248
10
    while (running_insert_count_.count() > 0) {
249
0
      for (int i = 0; i < 100; i++) {
250
0
        Status s = this->UpdateTestRow(&writer, tid, iteration++);
251
0
        if (!s.ok() && !s.IsNotFound()) {
252
          // We expect "not found", but not any other errors.
253
0
          CHECK_OK(s);
254
0
        }
255
0
      }
256
0
    }
257
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE29StubbornlyUpdateSameRowThreadEi
Line
Count
Source
245
10
  void StubbornlyUpdateSameRowThread(int tid) {
246
10
    int32_t iteration = 0;
247
10
    LocalTabletWriter writer(this->tablet().get());
248
10
    while (running_insert_count_.count() > 0) {
249
0
      for (int i = 0; i < 100; i++) {
250
0
        Status s = this->UpdateTestRow(&writer, tid, iteration++);
251
0
        if (!s.ok() && !s.IsNotFound()) {
252
          // We expect "not found", but not any other errors.
253
0
          CHECK_OK(s);
254
0
        }
255
0
      }
256
0
    }
257
10
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE29StubbornlyUpdateSameRowThreadEi
Line
Count
Source
245
10
  void StubbornlyUpdateSameRowThread(int tid) {
246
10
    int32_t iteration = 0;
247
10
    LocalTabletWriter writer(this->tablet().get());
248
10
    while (running_insert_count_.count() > 0) {
249
0
      for (int i = 0; i < 100; i++) {
250
0
        Status s = this->UpdateTestRow(&writer, tid, iteration++);
251
0
        if (!s.ok() && !s.IsNotFound()) {
252
          // We expect "not found", but not any other errors.
253
0
          CHECK_OK(s);
254
0
        }
255
0
      }
256
0
    }
257
10
  }
258
259
  template<typename FunctionType>
260
30
  void StartThreads(int n_threads, const FunctionType &function) {
261
168
    for (int i = 0; i < n_threads; i++) {
262
138
      scoped_refptr<yb::Thread> new_thread;
263
138
      CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i),
264
138
          function, this, i, &new_thread));
265
138
      threads_.push_back(new_thread);
266
138
    }
267
30
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE12StartThreadsIMS3_FviEEEviRKT_
Line
Count
Source
260
5
  void StartThreads(int n_threads, const FunctionType &function) {
261
28
    for (int i = 0; i < n_threads; i++) {
262
23
      scoped_refptr<yb::Thread> new_thread;
263
23
      CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i),
264
23
          function, this, i, &new_thread));
265
23
      threads_.push_back(new_thread);
266
23
    }
267
5
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE12StartThreadsIMS5_FviEEEviRKT_
Line
Count
Source
260
5
  void StartThreads(int n_threads, const FunctionType &function) {
261
28
    for (int i = 0; i < n_threads; i++) {
262
23
      scoped_refptr<yb::Thread> new_thread;
263
23
      CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i),
264
23
          function, this, i, &new_thread));
265
23
      threads_.push_back(new_thread);
266
23
    }
267
5
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE12StartThreadsIMS5_FviEEEviRKT_
Line
Count
Source
260
5
  void StartThreads(int n_threads, const FunctionType &function) {
261
28
    for (int i = 0; i < n_threads; i++) {
262
23
      scoped_refptr<yb::Thread> new_thread;
263
23
      CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i),
264
23
          function, this, i, &new_thread));
265
23
      threads_.push_back(new_thread);
266
23
    }
267
5
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE12StartThreadsIMS5_FviEEEviRKT_
Line
Count
Source
260
5
  void StartThreads(int n_threads, const FunctionType &function) {
261
28
    for (int i = 0; i < n_threads; i++) {
262
23
      scoped_refptr<yb::Thread> new_thread;
263
23
      CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i),
264
23
          function, this, i, &new_thread));
265
23
      threads_.push_back(new_thread);
266
23
    }
267
5
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE12StartThreadsIMS5_FviEEEviRKT_
Line
Count
Source
260
5
  void StartThreads(int n_threads, const FunctionType &function) {
261
28
    for (int i = 0; i < n_threads; i++) {
262
23
      scoped_refptr<yb::Thread> new_thread;
263
23
      CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i),
264
23
          function, this, i, &new_thread));
265
23
      threads_.push_back(new_thread);
266
23
    }
267
5
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE12StartThreadsIMS3_FviEEEviRKT_
Line
Count
Source
260
5
  void StartThreads(int n_threads, const FunctionType &function) {
261
28
    for (int i = 0; i < n_threads; i++) {
262
23
      scoped_refptr<yb::Thread> new_thread;
263
23
      CHECK_OK(yb::Thread::Create("test", strings::Substitute("test$0", i),
264
23
          function, this, i, &new_thread));
265
23
      threads_.push_back(new_thread);
266
23
    }
267
5
  }
268
269
12
  void JoinThreads() {
270
156
    for (scoped_refptr<yb::Thread> thr : threads_) {
271
156
     CHECK_OK(ThreadJoiner(thr.get()).Join());
272
156
    }
273
12
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_18StringKeyTestSetupEE11JoinThreadsEv
Line
Count
Source
269
2
  void JoinThreads() {
270
26
    for (scoped_refptr<yb::Thread> thr : threads_) {
271
26
     CHECK_OK(ThreadJoiner(thr.get()).Join());
272
26
    }
273
2
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE11JoinThreadsEv
Line
Count
Source
269
2
  void JoinThreads() {
270
26
    for (scoped_refptr<yb::Thread> thr : threads_) {
271
26
     CHECK_OK(ThreadJoiner(thr.get()).Join());
272
26
    }
273
2
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE11JoinThreadsEv
Line
Count
Source
269
2
  void JoinThreads() {
270
26
    for (scoped_refptr<yb::Thread> thr : threads_) {
271
26
     CHECK_OK(ThreadJoiner(thr.get()).Join());
272
26
    }
273
2
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE11JoinThreadsEv
Line
Count
Source
269
2
  void JoinThreads() {
270
26
    for (scoped_refptr<yb::Thread> thr : threads_) {
271
26
     CHECK_OK(ThreadJoiner(thr.get()).Join());
272
26
    }
273
2
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE11JoinThreadsEv
Line
Count
Source
269
2
  void JoinThreads() {
270
26
    for (scoped_refptr<yb::Thread> thr : threads_) {
271
26
     CHECK_OK(ThreadJoiner(thr.get()).Join());
272
26
    }
273
2
  }
_ZN2yb6tablet20VerifyRowsTabletTestINS0_22NullableValueTestSetupEE11JoinThreadsEv
Line
Count
Source
269
2
  void JoinThreads() {
270
26
    for (scoped_refptr<yb::Thread> thr : threads_) {
271
26
     CHECK_OK(ThreadJoiner(thr.get()).Join());
272
26
    }
273
2
  }
274
275
  std::vector<scoped_refptr<yb::Thread> > threads_;
276
  CountDownLatch running_insert_count_;
277
278
  // Projection with only an int column.
279
  // This is provided by both harnesses.
280
  Schema valcol_projection_;
281
282
  TimeSeriesCollector ts_collector_;
283
};
284
285
TYPED_TEST_CASE(VerifyRowsTabletTest, TabletTestHelperTypes);
286
287
6
TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) {
288
6
  if (1000 == FLAGS_inserts_per_thread) {
289
6
    if (AllowSlowTests()) {
290
0
      FLAGS_inserts_per_thread = 50000;
291
0
    }
292
6
  }
293
294
  // Spawn a bunch of threads, each of which will do updates.
295
6
  this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread);
296
6
  this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread);
297
6
  this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread);
298
6
  this->JoinThreads();
299
6
  LOG_TIMING(INFO, "Summing int32 column") {
300
6
    uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
301
6
    LOG(INFO) << "Sum = " << sum;
302
6
  }
303
304
6
  auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
305
6
          / kNumInsertThreads;
306
307
6
  this->VerifyTestRows(0, max_rows * kNumInsertThreads);
308
309
  // Start up a bunch of threads which repeatedly insert and delete the same
310
  // row, while flushing and compacting. This checks various concurrent handling
311
  // of DELETE/REINSERT during flushes.
312
6
  google::FlagSaver saver;
313
6
  this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread);
314
6
  this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread);
315
316
  // Run very quickly in dev builds, longer in slow builds.
317
6
  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
318
6
  Stopwatch sw;
319
6
  sw.start();
320
108
  while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
321
102
         !this->HasFatalFailure()) {
322
102
    SleepFor(MonoDelta::FromMicroseconds(5000));
323
102
  }
324
325
  // This is sort of a hack -- the flusher thread stops when it sees this
326
  // countdown latch go to 0.
327
6
  this->running_insert_count_.Reset(0);
328
6
  this->JoinThreads();
329
6
}
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_18StringKeyTestSetupEE8TestBodyEv
Line
Count
Source
287
1
TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) {
288
1
  if (1000 == FLAGS_inserts_per_thread) {
289
1
    if (AllowSlowTests()) {
290
0
      FLAGS_inserts_per_thread = 50000;
291
0
    }
292
1
  }
293
294
  // Spawn a bunch of threads, each of which will do updates.
295
1
  this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread);
296
1
  this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread);
297
1
  this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread);
298
1
  this->JoinThreads();
299
1
  LOG_TIMING(INFO, "Summing int32 column") {
300
1
    uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
301
1
    LOG(INFO) << "Sum = " << sum;
302
1
  }
303
304
1
  auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
305
1
          / kNumInsertThreads;
306
307
1
  this->VerifyTestRows(0, max_rows * kNumInsertThreads);
308
309
  // Start up a bunch of threads which repeatedly insert and delete the same
310
  // row, while flushing and compacting. This checks various concurrent handling
311
  // of DELETE/REINSERT during flushes.
312
1
  google::FlagSaver saver;
313
1
  this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread);
314
1
  this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread);
315
316
  // Run very quickly in dev builds, longer in slow builds.
317
1
  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
318
1
  Stopwatch sw;
319
1
  sw.start();
320
18
  while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
321
17
         !this->HasFatalFailure()) {
322
17
    SleepFor(MonoDelta::FromMicroseconds(5000));
323
17
  }
324
325
  // This is sort of a hack -- the flusher thread stops when it sees this
326
  // countdown latch go to 0.
327
1
  this->running_insert_count_.Reset(0);
328
1
  this->JoinThreads();
329
1
}
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_15IntKeyTestSetupILNS_8DataTypeE1EEEE8TestBodyEv
Line
Count
Source
287
1
TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) {
288
1
  if (1000 == FLAGS_inserts_per_thread) {
289
1
    if (AllowSlowTests()) {
290
0
      FLAGS_inserts_per_thread = 50000;
291
0
    }
292
1
  }
293
294
  // Spawn a bunch of threads, each of which will do updates.
295
1
  this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread);
296
1
  this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread);
297
1
  this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread);
298
1
  this->JoinThreads();
299
1
  LOG_TIMING(INFO, "Summing int32 column") {
300
1
    uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
301
1
    LOG(INFO) << "Sum = " << sum;
302
1
  }
303
304
1
  auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
305
1
          / kNumInsertThreads;
306
307
1
  this->VerifyTestRows(0, max_rows * kNumInsertThreads);
308
309
  // Start up a bunch of threads which repeatedly insert and delete the same
310
  // row, while flushing and compacting. This checks various concurrent handling
311
  // of DELETE/REINSERT during flushes.
312
1
  google::FlagSaver saver;
313
1
  this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread);
314
1
  this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread);
315
316
  // Run very quickly in dev builds, longer in slow builds.
317
1
  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
318
1
  Stopwatch sw;
319
1
  sw.start();
320
18
  while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
321
17
         !this->HasFatalFailure()) {
322
17
    SleepFor(MonoDelta::FromMicroseconds(5000));
323
17
  }
324
325
  // This is sort of a hack -- the flusher thread stops when it sees this
326
  // countdown latch go to 0.
327
1
  this->running_insert_count_.Reset(0);
328
1
  this->JoinThreads();
329
1
}
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_15IntKeyTestSetupILNS_8DataTypeE2EEEE8TestBodyEv
Line
Count
Source
287
1
TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) {
288
1
  if (1000 == FLAGS_inserts_per_thread) {
289
1
    if (AllowSlowTests()) {
290
0
      FLAGS_inserts_per_thread = 50000;
291
0
    }
292
1
  }
293
294
  // Spawn a bunch of threads, each of which will do updates.
295
1
  this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread);
296
1
  this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread);
297
1
  this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread);
298
1
  this->JoinThreads();
299
1
  LOG_TIMING(INFO, "Summing int32 column") {
300
1
    uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
301
1
    LOG(INFO) << "Sum = " << sum;
302
1
  }
303
304
1
  auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
305
1
          / kNumInsertThreads;
306
307
1
  this->VerifyTestRows(0, max_rows * kNumInsertThreads);
308
309
  // Start up a bunch of threads which repeatedly insert and delete the same
310
  // row, while flushing and compacting. This checks various concurrent handling
311
  // of DELETE/REINSERT during flushes.
312
1
  google::FlagSaver saver;
313
1
  this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread);
314
1
  this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread);
315
316
  // Run very quickly in dev builds, longer in slow builds.
317
1
  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
318
1
  Stopwatch sw;
319
1
  sw.start();
320
18
  while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
321
17
         !this->HasFatalFailure()) {
322
17
    SleepFor(MonoDelta::FromMicroseconds(5000));
323
17
  }
324
325
  // This is sort of a hack -- the flusher thread stops when it sees this
326
  // countdown latch go to 0.
327
1
  this->running_insert_count_.Reset(0);
328
1
  this->JoinThreads();
329
1
}
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_15IntKeyTestSetupILNS_8DataTypeE3EEEE8TestBodyEv
Line
Count
Source
287
1
TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) {
288
1
  if (1000 == FLAGS_inserts_per_thread) {
289
1
    if (AllowSlowTests()) {
290
0
      FLAGS_inserts_per_thread = 50000;
291
0
    }
292
1
  }
293
294
  // Spawn a bunch of threads, each of which will do updates.
295
1
  this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread);
296
1
  this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread);
297
1
  this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread);
298
1
  this->JoinThreads();
299
1
  LOG_TIMING(INFO, "Summing int32 column") {
300
1
    uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
301
1
    LOG(INFO) << "Sum = " << sum;
302
1
  }
303
304
1
  auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
305
1
          / kNumInsertThreads;
306
307
1
  this->VerifyTestRows(0, max_rows * kNumInsertThreads);
308
309
  // Start up a bunch of threads which repeatedly insert and delete the same
310
  // row, while flushing and compacting. This checks various concurrent handling
311
  // of DELETE/REINSERT during flushes.
312
1
  google::FlagSaver saver;
313
1
  this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread);
314
1
  this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread);
315
316
  // Run very quickly in dev builds, longer in slow builds.
317
1
  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
318
1
  Stopwatch sw;
319
1
  sw.start();
320
18
  while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
321
17
         !this->HasFatalFailure()) {
322
17
    SleepFor(MonoDelta::FromMicroseconds(5000));
323
17
  }
324
325
  // This is sort of a hack -- the flusher thread stops when it sees this
326
  // countdown latch go to 0.
327
1
  this->running_insert_count_.Reset(0);
328
1
  this->JoinThreads();
329
1
}
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_15IntKeyTestSetupILNS_8DataTypeE4EEEE8TestBodyEv
Line
Count
Source
287
1
TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) {
288
1
  if (1000 == FLAGS_inserts_per_thread) {
289
1
    if (AllowSlowTests()) {
290
0
      FLAGS_inserts_per_thread = 50000;
291
0
    }
292
1
  }
293
294
  // Spawn a bunch of threads, each of which will do updates.
295
1
  this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread);
296
1
  this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread);
297
1
  this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread);
298
1
  this->JoinThreads();
299
1
  LOG_TIMING(INFO, "Summing int32 column") {
300
1
    uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
301
1
    LOG(INFO) << "Sum = " << sum;
302
1
  }
303
304
1
  auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
305
1
          / kNumInsertThreads;
306
307
1
  this->VerifyTestRows(0, max_rows * kNumInsertThreads);
308
309
  // Start up a bunch of threads which repeatedly insert and delete the same
310
  // row, while flushing and compacting. This checks various concurrent handling
311
  // of DELETE/REINSERT during flushes.
312
1
  google::FlagSaver saver;
313
1
  this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread);
314
1
  this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread);
315
316
  // Run very quickly in dev builds, longer in slow builds.
317
1
  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
318
1
  Stopwatch sw;
319
1
  sw.start();
320
18
  while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
321
17
         !this->HasFatalFailure()) {
322
17
    SleepFor(MonoDelta::FromMicroseconds(5000));
323
17
  }
324
325
  // This is sort of a hack -- the flusher thread stops when it sees this
326
  // countdown latch go to 0.
327
1
  this->running_insert_count_.Reset(0);
328
1
  this->JoinThreads();
329
1
}
_ZN2yb6tablet41VerifyRowsTabletTest_DoTestAllAtOnce_TestINS0_22NullableValueTestSetupEE8TestBodyEv
Line
Count
Source
287
1
TYPED_TEST(VerifyRowsTabletTest, DoTestAllAtOnce) {
288
1
  if (1000 == FLAGS_inserts_per_thread) {
289
1
    if (AllowSlowTests()) {
290
0
      FLAGS_inserts_per_thread = 50000;
291
0
    }
292
1
  }
293
294
  // Spawn a bunch of threads, each of which will do updates.
295
1
  this->StartThreads(kNumInsertThreads, &TestFixture::InsertThread);
296
1
  this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread);
297
1
  this->StartThreads(FLAGS_num_slowreader_threads, &TestFixture::SlowReaderThread);
298
1
  this->JoinThreads();
299
1
  LOG_TIMING(INFO, "Summing int32 column") {
300
1
    uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
301
1
    LOG(INFO) << "Sum = " << sum;
302
1
  }
303
304
1
  auto max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * kNumInsertThreads)
305
1
          / kNumInsertThreads;
306
307
1
  this->VerifyTestRows(0, max_rows * kNumInsertThreads);
308
309
  // Start up a bunch of threads which repeatedly insert and delete the same
310
  // row, while flushing and compacting. This checks various concurrent handling
311
  // of DELETE/REINSERT during flushes.
312
1
  google::FlagSaver saver;
313
1
  this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread);
314
1
  this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread);
315
316
  // Run very quickly in dev builds, longer in slow builds.
317
1
  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
318
1
  Stopwatch sw;
319
1
  sw.start();
320
18
  while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
321
17
         !this->HasFatalFailure()) {
322
17
    SleepFor(MonoDelta::FromMicroseconds(5000));
323
17
  }
324
325
  // This is sort of a hack -- the flusher thread stops when it sees this
326
  // countdown latch go to 0.
327
1
  this->running_insert_count_.Reset(0);
328
1
  this->JoinThreads();
329
1
}
330
331
// NOTE: Cannot add another TYPED_TEST here. The opid chosen for the next insert might
332
// be out of order and will assert in rocksdb::MemTable::SetLastOpId().
333
334
} // namespace tablet
335
} // namespace yb