/Users/deen/code/yugabyte-db/src/yb/docdb/doc_operation-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include <thread> |
15 | | |
16 | | #include "yb/common/common.pb.h" |
17 | | #include "yb/common/index.h" |
18 | | #include "yb/common/ql_protocol_util.h" |
19 | | #include "yb/common/ql_resultset.h" |
20 | | #include "yb/common/ql_rowblock.h" |
21 | | #include "yb/common/ql_value.h" |
22 | | #include "yb/common/transaction-test-util.h" |
23 | | |
24 | | #include "yb/docdb/cql_operation.h" |
25 | | #include "yb/docdb/doc_rowwise_iterator.h" |
26 | | #include "yb/docdb/docdb_debug.h" |
27 | | #include "yb/docdb/docdb_rocksdb_util.h" |
28 | | #include "yb/docdb/docdb_test_base.h" |
29 | | #include "yb/docdb/docdb_test_util.h" |
30 | | #include "yb/docdb/ql_rocksdb_storage.h" |
31 | | #include "yb/docdb/redis_operation.h" |
32 | | |
33 | | #include "yb/gutil/casts.h" |
34 | | |
35 | | #include "yb/rocksdb/db/filename.h" |
36 | | #include "yb/rocksdb/db/internal_stats.h" |
37 | | |
38 | | #include "yb/server/hybrid_clock.h" |
39 | | |
40 | | #include "yb/util/random_util.h" |
41 | | #include "yb/util/size_literals.h" |
42 | | #include "yb/util/tostring.h" |
43 | | |
44 | | DECLARE_uint64(rocksdb_max_file_size_for_compaction); |
45 | | DECLARE_int32(rocksdb_level0_slowdown_writes_trigger); |
46 | | DECLARE_int32(rocksdb_level0_stop_writes_trigger); |
47 | | DECLARE_int32(rocksdb_level0_file_num_compaction_trigger); |
48 | | |
49 | | using namespace std::literals; // NOLINT |
50 | | |
51 | | namespace yb { |
52 | | namespace docdb { |
53 | | |
54 | | using server::HybridClock; |
55 | | |
56 | | namespace { |
57 | | |
// Returns `count` column ids built from the consecutive indices 0, 1, ..., count - 1.
58 | 0 | std::vector<ColumnId> CreateColumnIds(size_t count) { |
59 | 0 | std::vector<ColumnId> result; |
60 | 0 | result.reserve(count); |
61 | 0 | for (size_t i = 0; i != count; ++i) { |
62 | 0 | result.emplace_back(i); |
63 | 0 | } |
64 | 0 | return result; |
65 | 0 | } |
66 | | |
// Hash code used by the tests in this file for read requests and hand-built DocKeys.
67 | | constexpr int32_t kFixedHashCode = 0; |
// Sentinel for DiscardUntilFileFilter: when it is the filter's last_discard value, every file is
// discarded regardless of its number.
68 | | const int64_t kAlwaysDiscard = -1; |
69 | | |
70 | | // DiscardUntilFileFilter is a test CompactionFileFilter that will be used to mark some |
71 | | // files for direct deletion during the PickCompaction process in UniversalCompactionPicker. |
72 | | // Any files with numbers less than or equal to the provided last discard file number will be |
73 | | // included in the compaction but directly deleted. |
74 | | // |
75 | | // DiscardUntilFileFilter emulates behavior of DocDBCompactionFileFilter, which expires |
76 | | // files based on their TTL. |
77 | | class DiscardUntilFileFilter : public rocksdb::CompactionFileFilter { |
78 | | public: |
79 | | explicit DiscardUntilFileFilter(int64_t last_discard) : |
80 | 0 | last_discard_(last_discard) {} |
81 | | |
82 | | // Filters all file numbers less than or equal to the filter's last_discard_. |
83 | | // Setting last_discard_ to the kAlwaysDiscard constant will result in every |
84 | | // incoming file being filtered. |
// Returns kDiscard (and logs the file number and sizes) for filtered files, kKeep otherwise.
// NOTE(review): last_discard_ is signed but is converted to uint64_t for the comparison, so a
// negative value other than kAlwaysDiscard would compare as a very large file number - confirm
// that callers never pass one.
85 | 0 | rocksdb::FilterDecision Filter(const rocksdb::FileMetaData* file) override { |
86 | 0 | if (last_discard_ == kAlwaysDiscard || |
87 | 0 | file->fd.GetNumber() <= implicit_cast<uint64_t>(last_discard_)) { |
88 | 0 | LOG(INFO) << "Filtering file: " << file->fd.GetNumber() << ", size: " |
89 | 0 | << file->fd.GetBaseFileSize() << ", total file size: " << file->fd.GetTotalFileSize(); |
90 | 0 | return rocksdb::FilterDecision::kDiscard; |
91 | 0 | } |
92 | 0 | return rocksdb::FilterDecision::kKeep; |
93 | 0 | } |
94 | | |
95 | 0 | const char* Name() const override { return "DiscardUntilFileFilter"; } |
96 | | |
97 | | private: |
// Highest file number to discard, or kAlwaysDiscard to discard every file.
98 | | const int64_t last_discard_; |
99 | | }; |
100 | | |
101 | | // A CompactionFileFilterFactory that takes a file number as input, and filters all |
102 | | // file numbers lower than or equal to that one. Any larger file numbers will not |
103 | | // be filtered. |
// Passing kAlwaysDiscard as the file number makes the created filters discard every file.
104 | | class DiscardUntilFileFilterFactory : public rocksdb::CompactionFileFilterFactory { |
105 | | public: |
106 | | explicit DiscardUntilFileFilterFactory(const int64_t last_to_discard) : |
107 | 0 | last_to_discard_(last_to_discard) {} |
108 | | |
// Creates a new DiscardUntilFileFilter configured with last_to_discard_. The `inputs` argument
// is not consulted.
109 | | std::unique_ptr<rocksdb::CompactionFileFilter> CreateCompactionFileFilter( |
110 | 0 | const std::vector<rocksdb::FileMetaData*>& inputs) override { |
111 | 0 | return std::make_unique<DiscardUntilFileFilter>(last_to_discard_); |
112 | 0 | } |
113 | | |
114 | 0 | const char* Name() const override { return "DiscardUntilFileFilterFactory"; } |
115 | | |
116 | | private: |
// Threshold handed to every filter this factory creates.
117 | | const int64_t last_to_discard_; |
118 | | }; |
119 | | |
120 | | // MakeMaxFileSizeFunction will create a function that returns the |
121 | | // rocksdb_max_file_size_for_compaction flag if it is set to a positive number, and returns |
122 | | // the max uint64 otherwise. It does NOT take the schema's table TTL into consideration. |
// The function is returned wrapped in a std::shared_ptr, and it reads the flag each time it is
// called rather than capturing the flag's value at creation time.
123 | 0 | auto MakeMaxFileSizeFunction() { |
124 | | // Trick to get the element_type of the rocksdb::Options::max_file_size_for_compaction field. |
125 | 0 | typedef typename decltype( |
126 | 0 | static_cast<rocksdb::Options*>(nullptr)->max_file_size_for_compaction)::element_type |
127 | 0 | MaxFileSizeFunction; |
128 | 0 | return std::make_shared<MaxFileSizeFunction>([]{ |
129 | 0 | if (FLAGS_rocksdb_max_file_size_for_compaction > 0) { |
130 | 0 | return FLAGS_rocksdb_max_file_size_for_compaction; |
131 | 0 | } |
132 | 0 | return std::numeric_limits<uint64_t>::max(); |
133 | 0 | }); |
134 | 0 | } |
135 | | |
136 | | } // namespace |
137 | | |
// Test fixture with helpers that build QL write/read requests, apply them to DocDB, and compare
// the resulting DocDB debug dump against expected text.
138 | | class DocOperationTest : public DocDBTestBase { |
139 | | public: |
140 | 0 | DocOperationTest() { |
141 | 0 | SeedRandom(); |
142 | 0 | } |
143 | | |
// Builds the schema used by most tests: an INT32 column "k" followed by INT32 columns "c1", "c2"
// and "c3", with column ids 0..3 (from CreateColumnIds).
// NOTE(review): the trailing `true` on "k" and the final `1` passed to Schema presumably mark "k"
// as the single hash key column - confirm against ColumnSchema/Schema.
144 | 0 | Schema CreateSchema() { |
145 | 0 | ColumnSchema hash_column_schema("k", INT32, false, true); |
146 | 0 | ColumnSchema column1_schema("c1", INT32, false, false); |
147 | 0 | ColumnSchema column2_schema("c2", INT32, false, false); |
148 | 0 | ColumnSchema column3_schema("c3", INT32, false, false); |
149 | 0 | const vector<ColumnSchema> columns({hash_column_schema, column1_schema, column2_schema, |
150 | 0 | column3_schema}); |
151 | 0 | Schema schema(columns, CreateColumnIds(columns.size()), 1); |
152 | 0 | return schema; |
153 | 0 | } |
154 | | |
// Appends `value` to the request as an int32 hashed column value.
155 | 0 | void AddPrimaryKeyColumn(yb::QLWriteRequestPB* ql_writereq_pb, int32_t value) { |
156 | 0 | ql_writereq_pb->add_hashed_column_values()->mutable_value()->set_int32_value(value); |
157 | 0 | } |
158 | | |
// Appends `value` to the request as an int32 range column value.
// NOTE(review): the argument order is the reverse of AddPrimaryKeyColumn.
159 | 0 | void AddRangeKeyColumn(int32_t value, yb::QLWriteRequestPB* ql_writereq_pb) { |
160 | 0 | ql_writereq_pb->add_range_column_values()->mutable_value()->set_int32_value(value); |
161 | 0 | } |
162 | | |
// Adds one int32 column value per entry of column_values, using column ids that start at
// schema.num_key_columns(). Asserts that column_values has exactly one entry per non-key column.
163 | | void AddColumnValues(const Schema& schema, |
164 | | const vector<int32_t>& column_values, |
165 | 0 | yb::QLWriteRequestPB* ql_writereq_pb) { |
166 | 0 | ASSERT_EQ(schema.num_columns() - schema.num_key_columns(), column_values.size()); |
167 | 0 | for (size_t i = 0; i < column_values.size(); i++) { |
168 | 0 | auto column = ql_writereq_pb->add_column_values(); |
169 | 0 | column->set_column_id(narrow_cast<int32_t>(schema.num_key_columns() + i)); |
170 | 0 | column->mutable_expr()->mutable_value()->set_int32_value(column_values[i]); |
171 | 0 | } |
172 | 0 | } |
173 | | |
// Runs the write request through a QLWriteOperation into a fresh DocWriteBatch and writes that
// batch to RocksDB at hybrid_time. The schema is passed as a shared_ptr with a no-op deleter, so
// `schema` itself is never deleted here and must stay alive for the duration of the call.
174 | | void WriteQL(const QLWriteRequestPB& ql_writereq_pb, const Schema& schema, |
175 | | QLResponsePB* ql_writeresp_pb, |
176 | | const HybridTime& hybrid_time = HybridTime::kMax, |
177 | | const TransactionOperationContext& txn_op_context = |
178 | 0 | kNonTransactionalOperationContext) { |
179 | 0 | IndexMap index_map; |
180 | 0 | QLWriteOperation ql_write_op(ql_writereq_pb, |
181 | 0 | std::shared_ptr<const Schema>(&schema, [](const Schema*){}), |
182 | 0 | index_map, nullptr /* unique_index_key_schema */, txn_op_context); |
183 | 0 | ASSERT_OK(ql_write_op.Init(ql_writeresp_pb)); |
184 | 0 | auto doc_write_batch = MakeDocWriteBatch(); |
185 | 0 | HybridTime restart_read_ht; |
186 | 0 | ASSERT_OK(ql_write_op.Apply( |
187 | 0 | {&doc_write_batch, CoarseTimePoint::max() /* deadline */, ReadHybridTime(), |
188 | 0 | &restart_read_ht})); |
189 | 0 | ASSERT_OK(WriteToRocksDB(doc_write_batch, hybrid_time)); |
190 | 0 | } |
191 | | |
// Checks the DocDB debug dump expected after RunTestQLInsertUpdate with a TTL (every entry shows
// "ttl: 2.000s"). For INSERT the dump additionally contains the SystemColumnId(0) entry.
192 | 0 | void AssertWithTTL(QLWriteRequestPB_QLStmtType stmt_type) { |
193 | 0 | if (stmt_type == QLWriteRequestPB::QL_STMT_INSERT) { |
194 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
195 | 0 | SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT<max>]) -> null; ttl: 2.000s |
196 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ <max> w: 1 }]) -> 2; ttl: 2.000s |
197 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 2 }]) -> 3; ttl: 2.000s |
198 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 3 }]) -> 4; ttl: 2.000s |
199 | 0 | )#"); |
200 | 0 | } else { |
201 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
202 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT<max>]) -> 2; ttl: 2.000s |
203 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 1 }]) -> 3; ttl: 2.000s |
204 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 2 }]) -> 4; ttl: 2.000s |
205 | 0 | )#"); |
206 | 0 | } |
207 | 0 | } |
208 | | |
// Same check as AssertWithTTL, but for a write without a TTL: the expected entries carry no
// "ttl:" suffix.
209 | 0 | void AssertWithoutTTL(QLWriteRequestPB_QLStmtType stmt_type) { |
210 | 0 | if (stmt_type == QLWriteRequestPB::QL_STMT_INSERT) { |
211 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
212 | 0 | SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT<max>]) -> null |
213 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ <max> w: 1 }]) -> 2 |
214 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 2 }]) -> 3 |
215 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 3 }]) -> 4 |
216 | 0 | )#"); |
217 | 0 | } else { |
218 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
219 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT<max>]) -> 2 |
220 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 1 }]) -> 3 |
221 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 2 }]) -> 4 |
222 | 0 | )#"); |
223 | 0 | } |
224 | 0 | } |
225 | | |
// Writes one row (hash key 1, column values 2, 3, 4) with the given statement type and verifies
// the debug dump. A ttl of -1 means "do not set a TTL on the request"; any other value is set
// as the request TTL and the TTL variant of the expected dump is checked.
226 | 0 | void RunTestQLInsertUpdate(QLWriteRequestPB_QLStmtType stmt_type, const int ttl = -1) { |
227 | 0 | yb::QLWriteRequestPB ql_writereq_pb; |
228 | 0 | yb::QLResponsePB ql_writeresp_pb; |
229 | | |
230 | | // Define the schema. |
231 | 0 | Schema schema = CreateSchema(); |
232 |

233 | 0 | ql_writereq_pb.set_type(stmt_type); |
234 | | // Add primary key column. |
235 | 0 | AddPrimaryKeyColumn(&ql_writereq_pb, 1); |
236 | 0 | ql_writereq_pb.set_hash_code(0); |
237 |

238 | 0 | AddColumnValues(schema, {2, 3, 4}, &ql_writereq_pb); |
239 |

240 | 0 | if (ttl != -1) { |
241 | 0 | ql_writereq_pb.set_ttl(ttl); |
242 | 0 | } |
243 | | |
244 | | // Write to docdb. |
245 | 0 | WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb); |
246 |

247 | 0 | if (ttl == -1) { |
248 | 0 | AssertWithoutTTL(stmt_type); |
249 | 0 | } else { |
250 | 0 | AssertWithTTL(stmt_type); |
251 | 0 | } |
252 | 0 | } |
253 | | |
// Builds a write request from column_values: the first schema.num_key_columns() entries are the
// key values and the remaining entries are passed to AddColumnValues as regular column values.
// NOTE(review): hybrid_time and txn_op_content are accepted but not used in this function.
254 | | yb::QLWriteRequestPB WriteQLRowReq(QLWriteRequestPB_QLStmtType stmt_type, const Schema& schema, |
255 | | const vector<int32_t>& column_values, const HybridTime& hybrid_time, |
256 | | const TransactionOperationContext& txn_op_content = |
257 | 0 | kNonTransactionalOperationContext) { |
258 | 0 | yb::QLWriteRequestPB ql_writereq_pb; |
259 | 0 | ql_writereq_pb.set_type(stmt_type); |
260 | | |
261 | | // Add primary key columns: hash key columns first, then range key columns. |
262 | 0 | ql_writereq_pb.set_hash_code(0); |
263 | 0 | for (size_t i = 0; i != schema.num_key_columns(); ++i) { |
264 | 0 | if (i < schema.num_hash_key_columns()) { |
265 | 0 | AddPrimaryKeyColumn(&ql_writereq_pb, column_values[i]); |
266 | 0 | } else { |
267 | 0 | AddRangeKeyColumn(column_values[i], &ql_writereq_pb); |
268 | 0 | } |
269 | 0 | } |
270 | 0 | std::vector<int32_t> values(column_values.begin() + schema.num_key_columns(), |
271 | 0 | column_values.end()); |
272 | 0 | AddColumnValues(schema, values, &ql_writereq_pb); |
273 | 0 | return ql_writereq_pb; |
274 | 0 | } |
275 | | |
// Builds a request with WriteQLRowReq, sets `ttl` on it, and writes it to DocDB at hybrid_time.
276 | | void WriteQLRow(QLWriteRequestPB_QLStmtType stmt_type, const Schema& schema, |
277 | | const vector<int32_t>& column_values, int64_t ttl, const HybridTime& hybrid_time, |
278 | | const TransactionOperationContext& txn_op_content = |
279 | 0 | kNonTransactionalOperationContext) { |
280 | 0 | yb::QLWriteRequestPB ql_writereq_pb = WriteQLRowReq( |
281 | 0 | stmt_type, schema, column_values, hybrid_time, txn_op_content); |
282 | 0 | ql_writereq_pb.set_ttl(ttl); |
283 | 0 | yb::QLResponsePB ql_writeresp_pb; |
284 | | // Write to docdb. |
285 | 0 | WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb, hybrid_time, txn_op_content); |
286 | 0 | } |
287 | | |
// Overload of WriteQLRow that does not set a TTL on the request.
288 | | void WriteQLRow(QLWriteRequestPB_QLStmtType stmt_type, const Schema& schema, |
289 | | const vector<int32_t>& column_values, const HybridTime& hybrid_time, |
290 | | const TransactionOperationContext& txn_op_content = |
291 | 0 | kNonTransactionalOperationContext) { |
292 | 0 | yb::QLWriteRequestPB ql_writereq_pb = WriteQLRowReq( |
293 | 0 | stmt_type, schema, column_values, hybrid_time, txn_op_content); |
294 | 0 | yb::QLResponsePB ql_writeresp_pb; |
295 | | // Write to docdb. |
296 | 0 | WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb, hybrid_time, txn_op_content); |
297 | 0 | } |
298 | | |
// Reads the row whose int32 hashed column value is primary_key at read_time, selecting column
// ids 0..3, and returns the deserialized result as a QLRowBlock. Failures are reported through
// EXPECT_* checks.
// NOTE(review): `col` is dereferenced right after EXPECT_OK(col); if EXPECT_OK is non-fatal (as
// gtest EXPECT_* normally is), a failed lookup would still be dereferenced - confirm.
299 | 0 | QLRowBlock ReadQLRow(const Schema& schema, int32_t primary_key, const HybridTime& read_time) { |
300 | 0 | QLReadRequestPB ql_read_req; |
301 | 0 | ql_read_req.add_hashed_column_values()->mutable_value()->set_int32_value(primary_key); |
302 | 0 | ql_read_req.set_hash_code(kFixedHashCode); |
303 | 0 | ql_read_req.set_max_hash_code(kFixedHashCode); |
304 |

305 | 0 | QLRowBlock row_block(schema, vector<ColumnId> ({ColumnId(0), ColumnId(1), ColumnId(2), |
306 | 0 | ColumnId(3)})); |
307 | 0 | const Schema& projection = row_block.schema(); |
308 | 0 | QLRSRowDescPB *rsrow_desc_pb = ql_read_req.mutable_rsrow_desc(); |
309 | 0 | for (int32_t i = 0; i <= 3 ; i++) { |
310 | 0 | ql_read_req.add_selected_exprs()->set_column_id(i); |
311 | 0 | ql_read_req.mutable_column_refs()->add_ids(i); |
312 |

313 | 0 | auto col = projection.column_by_id(ColumnId(i)); |
314 | 0 | EXPECT_OK(col); |
315 | 0 | QLRSColDescPB *rscol_desc = rsrow_desc_pb->add_rscol_descs(); |
316 | 0 | rscol_desc->set_name(col->name()); |
317 | 0 | col->type()->ToQLTypePB(rscol_desc->mutable_ql_type()); |
318 | 0 | } |
319 |

320 | 0 | QLReadOperation read_op(ql_read_req, kNonTransactionalOperationContext); |
321 | 0 | QLRocksDBStorage ql_storage(doc_db()); |
322 | 0 | const QLRSRowDesc rsrow_desc(*rsrow_desc_pb); |
323 | 0 | faststring rows_data; |
324 | 0 | QLResultSet resultset(&rsrow_desc, &rows_data); |
325 | 0 | HybridTime read_restart_ht; |
326 | 0 | EXPECT_OK(read_op.Execute( |
327 | 0 | ql_storage, CoarseTimePoint::max() /* deadline */, ReadHybridTime::SingleTime(read_time), |
328 | 0 | schema, projection, &resultset, &read_restart_ht)); |
329 | 0 | EXPECT_FALSE(read_restart_ht.is_valid()); |
330 | | |
331 | | // Transfer the column values from result set to rowblock. |
332 | 0 | Slice data(rows_data.data(), rows_data.size()); |
333 | 0 | EXPECT_OK(row_block.Deserialize(YQL_CLIENT_CQL, &data)); |
334 | 0 | return row_block; |
335 | 0 | } |
336 | | |
// Sets FLAGS_rocksdb_max_file_size_for_compaction to max_size and replaces
// max_file_size_for_compaction_ with a function that reads that flag.
337 | 0 | void SetMaxFileSizeForCompaction(const uint64_t max_size) { |
338 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_file_size_for_compaction) = max_size; |
339 | | // Make a function that will always use rocksdb_max_file_size_for_compaction. |
340 | | // Normally, max_file_size_for_compaction is only used for tables with TTL. |
341 | 0 | max_file_size_for_compaction_ = MakeMaxFileSizeFunction(); |
342 | 0 | } |
343 | | }; |
344 | | |
// Applies a Redis SET of string key "abc" (hash code 123) with a TTL of 2000, writes it to
// RocksDB, then seeks to the encoded key and checks that the TTL decoded from the stored value
// is 2000 milliseconds.
345 | 0 | TEST_F(DocOperationTest, TestRedisSetKVWithTTL) { |
346 | | // Write key with ttl to docdb. |
347 | 0 | auto db = rocksdb(); |
348 | 0 | yb::RedisWriteRequestPB redis_write_operation_pb; |
349 | 0 | auto set_request_pb = redis_write_operation_pb.mutable_set_request(); |
350 | 0 | set_request_pb->set_ttl(2000); |
351 | 0 | redis_write_operation_pb.mutable_key_value()->set_key("abc"); |
352 | 0 | redis_write_operation_pb.mutable_key_value()->set_type(REDIS_TYPE_STRING); |
353 | 0 | redis_write_operation_pb.mutable_key_value()->set_hash_code(123); |
354 | 0 | redis_write_operation_pb.mutable_key_value()->add_value("xyz"); |
355 | 0 | RedisWriteOperation redis_write_operation(redis_write_operation_pb); |
356 | 0 | auto doc_write_batch = MakeDocWriteBatch(); |
357 | 0 | ASSERT_OK(redis_write_operation.Apply( |
358 | 0 | {&doc_write_batch, CoarseTimePoint::max() /* deadline */, ReadHybridTime()})); |
359 |

360 | 0 | ASSERT_OK(WriteToRocksDB(doc_write_batch, HybridTime::FromMicros(1000))); |
361 | | |
362 | | // Read key from rocksdb. |
363 | 0 | const KeyBytes doc_key = DocKey::FromRedisKey(123, "abc").Encode(); |
364 | 0 | rocksdb::ReadOptions read_opts; |
365 | 0 | auto iter = std::unique_ptr<rocksdb::Iterator>(db->NewIterator(read_opts)); |
366 | 0 | ROCKSDB_SEEK(iter.get(), doc_key.AsSlice()); |
367 | 0 | ASSERT_TRUE(iter->Valid()); |
368 | | |
369 | | // Verify correct ttl. |
370 | 0 | MonoDelta ttl; |
371 | 0 | auto value = iter->value(); |
372 | 0 | ASSERT_OK(Value::DecodeTTL(&value, &ttl)); |
373 | 0 | EXPECT_EQ(2000, ttl.ToMilliseconds()); |
374 | 0 | } |
375 | | |
// INSERT with a request TTL of 2000; the expected dump is checked by RunTestQLInsertUpdate.
376 | 0 | TEST_F(DocOperationTest, TestQLInsertWithTTL) { |
377 | 0 | RunTestQLInsertUpdate(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, 2000); |
378 | 0 | } |
379 | | |
// UPDATE with a request TTL of 2000; the expected dump is checked by RunTestQLInsertUpdate.
380 | 0 | TEST_F(DocOperationTest, TestQLUpdateWithTTL) { |
381 | 0 | RunTestQLInsertUpdate(QLWriteRequestPB_QLStmtType_QL_STMT_UPDATE, 2000); |
382 | 0 | } |
383 | | |
// INSERT using the helper's default ttl of -1, i.e. no TTL is set on the request.
384 | 0 | TEST_F(DocOperationTest, TestQLInsertWithoutTTL) { |
385 | 0 | RunTestQLInsertUpdate(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT); |
386 | 0 | } |
387 | | |
// UPDATE using the helper's default ttl of -1, i.e. no TTL is set on the request.
388 | 0 | TEST_F(DocOperationTest, TestQLUpdateWithoutTTL) { |
389 | 0 | RunTestQLInsertUpdate(QLWriteRequestPB_QLStmtType_QL_STMT_UPDATE); |
390 | 0 | } |
391 | | |
// Inserts a row (hash key 1) whose three value columns have an expression value with no field
// set, and checks that each of those columns is stored as DEL in the debug dump.
392 | 0 | TEST_F(DocOperationTest, TestQLWriteNulls) { |
393 | 0 | yb::QLWriteRequestPB ql_writereq_pb; |
394 | 0 | yb::QLResponsePB ql_writeresp_pb; |
395 | | |
396 | | // Define the schema. |
397 | 0 | Schema schema = CreateSchema(); |
398 | 0 | ql_writereq_pb.set_type( |
399 | 0 | QLWriteRequestPB_QLStmtType::QLWriteRequestPB_QLStmtType_QL_STMT_INSERT); |
400 | 0 | ql_writereq_pb.set_hash_code(0); |
401 | | |
402 | | // Add primary key column. |
403 | 0 | AddPrimaryKeyColumn(&ql_writereq_pb, 1); |
404 | | |
405 | | // Add null columns: mutable_value() creates the value without setting any field on it. |
406 | 0 | for (int i = 0; i < 3; i++) { |
407 | 0 | auto column = ql_writereq_pb.add_column_values(); |
408 | 0 | column->set_column_id(i + 1); |
409 | 0 | column->mutable_expr()->mutable_value(); |
410 | 0 | } |
411 | | |
412 | | // Write to docdb. |
413 | 0 | WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb); |
414 | | |
415 | | // Null columns are converted to tombstones. |
416 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
417 | 0 | SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT<max>]) -> null |
418 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ <max> w: 1 }]) -> DEL |
419 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 2 }]) -> DEL |
420 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 3 }]) -> DEL |
421 | 0 | )#"); |
422 | 0 | } |
423 | | |
// Inserts the row (1, 1, 2, 3) with a TTL of 1000 at physical time 1000, checks the debug dump,
// then reads the row back at physical time 2000 and checks all four column values.
// NOTE(review): ql_writereq_pb and ql_writeresp_pb are declared but not used in this test;
// WriteQLRow builds its own request and response.
424 | | TEST_F(DocOperationTest, TestQLReadWriteSimple) { |
425 | | yb::QLWriteRequestPB ql_writereq_pb; |
426 | | yb::QLResponsePB ql_writeresp_pb; |
427 | | |
428 | | // Define the schema. |
429 | | Schema schema = CreateSchema(); |
430 | | WriteQLRow(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, vector<int>({1, 1, 2, 3}), |
431 | | 1000, HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(1000, 0)); |
432 | | |
433 | | AssertDocDbDebugDumpStrEq(R"#( |
434 | | SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT{ physical: 1000 }]) -> null; ttl: 1.000s |
435 | | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 w: 1 }]) -> 1; ttl: 1.000s |
436 | | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 w: 2 }]) -> 2; ttl: 1.000s |
437 | | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 w: 3 }]) -> 3; ttl: 1.000s |
438 | | )#"); |
439 | | |
440 | | // Now read the value. |
441 | | QLRowBlock row_block = ReadQLRow(schema, 1, |
442 | | HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(2000, 0)); |
443 | | ASSERT_EQ(1, row_block.row_count()); |
444 | | EXPECT_EQ(1, row_block.row(0).column(0).int32_value()); |
445 | | EXPECT_EQ(1, row_block.row(0).column(1).int32_value()); |
446 | | EXPECT_EQ(2, row_block.row(0).column(2).int32_value()); |
447 | | EXPECT_EQ(3, row_block.row(0).column(3).int32_value()); |
448 | | } |
449 | | |
// Inserts kNumRows rows that share hash key 1 and differ in range key, deletes the rows whose
// column id 1 value lies in [kDeleteRangeLow, kDeleteRangeHigh], and asserts an upper bound on
// the number of RocksDB Next/Seek ticker increments observed across the delete.
450 | 0 | TEST_F(DocOperationTest, TestQLRangeDeleteWithStaticColumnAvoidsFullPartitionKeyScan) { |
451 | 0 | constexpr int kNumRows = 10000; |
452 | 0 | constexpr int kDeleteRangeLow = 100; |
453 | 0 | constexpr int kDeleteRangeHigh = 200; |
454 | | |
455 | | // Define the schema with a partition key, range key, static column, and value. |
456 | 0 | SchemaBuilder builder; |
457 | 0 | builder.set_next_column_id(ColumnId(0)); |
458 | 0 | ASSERT_OK(builder.AddHashKeyColumn("k", INT32)); |
459 | 0 | ASSERT_OK(builder.AddKeyColumn("r", INT32)); |
460 | 0 | ASSERT_OK(builder.AddColumn(ColumnSchema("s", INT32, false, false, true), false)); |
461 | 0 | ASSERT_OK(builder.AddColumn(ColumnSchema("v", INT32), false)); |
462 | 0 | auto schema = builder.Build(); |
463 | | |
464 | | // Write rows with the same partition key but different range key |
465 | 0 | for (int row_num = 0; row_num < kNumRows; ++row_num) { |
466 | 0 | WriteQLRow( |
467 | 0 | QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, |
468 | 0 | schema, |
469 | 0 | vector<int>({1, row_num, 0, row_num}), |
470 | 0 | HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(1000, 0)); |
471 | 0 | } |
// Sum of the NUMBER_DB_NEXT and NUMBER_DB_SEEK tickers of the regular DB's statistics.
472 | 0 | auto get_num_rocksdb_iter_moves = [&]() { |
473 | 0 | auto num_next = regular_db_options().statistics->getTickerCount( |
474 | 0 | rocksdb::Tickers::NUMBER_DB_NEXT); |
475 | 0 | auto num_seek = regular_db_options().statistics->getTickerCount( |
476 | 0 | rocksdb::Tickers::NUMBER_DB_SEEK); |
477 | 0 | return num_next + num_seek; |
478 | 0 | }; |
479 |

480 | 0 | auto rocksdb_iter_moves_before_delete = get_num_rocksdb_iter_moves(); |
481 | | |
482 | | // Delete a subset of the partition |
483 | 0 | yb::QLWriteRequestPB ql_writereq_pb; |
484 | 0 | yb::QLResponsePB ql_writeresp_pb; |
485 | 0 | ql_writereq_pb.set_type(QLWriteRequestPB_QLStmtType_QL_STMT_DELETE); |
486 | 0 | AddPrimaryKeyColumn(&ql_writereq_pb, 1); |
487 |

488 | 0 | auto where_clause_and = ql_writereq_pb.mutable_where_expr()->mutable_condition(); |
489 | 0 | where_clause_and->set_op(QLOperator::QL_OP_AND); |
490 | 0 | QLAddInt32Condition(where_clause_and, 1, QL_OP_GREATER_THAN_EQUAL, kDeleteRangeLow); |
491 | 0 | QLAddInt32Condition(where_clause_and, 1, QL_OP_LESS_THAN_EQUAL, kDeleteRangeHigh); |
492 | 0 | ql_writereq_pb.set_hash_code(0); |
493 |

494 | 0 | WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb, |
495 | 0 | HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(2000, 0), |
496 | 0 | kNonTransactionalOperationContext); |
497 | | |
498 | | // During deletion, we expect to move the RocksDB iterator *at most* once per docdb row in range, |
499 | | // plus once for the first docdb row out of range. We say *at most*, because it's possible to have |
500 | | // various combinations of Next() vs. Seek() calls. This simply tests that we do not have more |
501 | | // than the maximum allowed based on the schema and the restriction that we should not scan |
502 | | // outside the boundaries of the relevant deletion range. |
// The bound used below is 3 moves per CQL row in the range, plus 1.
503 | 0 | auto num_cql_rows_in_range = kDeleteRangeHigh - kDeleteRangeLow + 1; |
504 | 0 | auto max_num_rocksdb_moves_per_cql_row = 3; |
505 | 0 | auto max_num_rocksdb_iter_moves = num_cql_rows_in_range * max_num_rocksdb_moves_per_cql_row; |
506 | 0 | ASSERT_LE( |
507 | 0 | get_num_rocksdb_iter_moves() - rocksdb_iter_moves_before_delete, |
508 | 0 | max_num_rocksdb_iter_moves + 1); |
509 | 0 | } |
510 | | |
// Writes values for column ids 1..3 of doc key [100] directly with SetPrimitive (no
// SystemColumnId entry is written), then checks that ReadQLRow still returns the row with the
// expected key and column values.
511 | 0 | TEST_F(DocOperationTest, TestQLReadWithoutLivenessColumn) { |
512 | 0 | const DocKey doc_key(kFixedHashCode, PrimitiveValues(PrimitiveValue::Int32(100)), |
513 | 0 | PrimitiveValues()); |
514 | 0 | KeyBytes encoded_doc_key(doc_key.Encode()); |
515 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(1))), |
516 | 0 | Value(PrimitiveValue::Int32(2)), HybridTime(1000))); |
517 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(2))), |
518 | 0 | Value(PrimitiveValue::Int32(3)), HybridTime(2000))); |
519 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(3))), |
520 | 0 | Value(PrimitiveValue::Int32(4)), HybridTime(3000))); |
521 |

522 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
523 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> 2 |
524 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> 3 |
525 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> 4 |
526 | 0 | )#"); |
527 | | |
528 | | // Now verify we can read without the system column id. |
529 | 0 | Schema schema = CreateSchema(); |
530 | 0 | HybridTime read_time(3000); |
531 | 0 | QLRowBlock row_block = ReadQLRow(schema, 100, read_time); |
532 | 0 | ASSERT_EQ(1, row_block.row_count()); |
533 | 0 | EXPECT_EQ(100, row_block.row(0).column(0).int32_value()); |
534 | 0 | EXPECT_EQ(2, row_block.row(0).column(1).int32_value()); |
535 | 0 | EXPECT_EQ(3, row_block.row(0).column(2).int32_value()); |
536 | 0 | EXPECT_EQ(4, row_block.row(0).column(3).int32_value()); |
537 | 0 | } |
538 | | |
// Exercises row visibility in three steps:
//   1. Doc key [100] with tombstones in all three value columns: the iterator reports no row.
//   2. Newer entries are written for doc key [100] (column 3 set to 101): the row is returned
//      with columns 1 and 2 null and column 3 equal to 101.
//   3. Doc key [101] with a liveness-column entry: the row is returned with columns 1..3 null.
539 | 0 | TEST_F(DocOperationTest, TestQLReadWithTombstone) { |
540 | 0 | DocKey doc_key(0, PrimitiveValues(PrimitiveValue::Int32(100)), PrimitiveValues()); |
541 | 0 | KeyBytes encoded_doc_key(doc_key.Encode()); |
542 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(1))), |
543 | 0 | Value(PrimitiveValue::kTombstone), HybridTime(1000))); |
544 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(2))), |
545 | 0 | Value(PrimitiveValue::kTombstone), HybridTime(2000))); |
546 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(3))), |
547 | 0 | Value(PrimitiveValue::kTombstone), HybridTime(3000))); |
548 |

549 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
550 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> DEL |
551 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> DEL |
552 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> DEL |
553 | 0 | )#"); |
554 |

// With only tombstones stored, iterating the table must not produce a row.
555 | 0 | Schema schema = CreateSchema(); |
556 | 0 | DocRowwiseIterator iter(schema, schema, kNonTransactionalOperationContext, |
557 | 0 | doc_db(), CoarseTimePoint::max() /* deadline */, |
558 | 0 | ReadHybridTime::FromUint64(3000)); |
559 | 0 | ASSERT_OK(iter.Init(YQL_TABLE_TYPE)); |
560 | 0 | ASSERT_FALSE(ASSERT_RESULT(iter.HasNext())); |
561 | | |
562 | | // Now verify row exists even with one valid column. |
563 | 0 | doc_key = DocKey(kFixedHashCode, PrimitiveValues(PrimitiveValue::Int32(100)), PrimitiveValues()); |
564 | 0 | encoded_doc_key = doc_key.Encode(); |
565 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(1))), |
566 | 0 | Value(PrimitiveValue::kTombstone), HybridTime(1001))); |
567 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(2))), |
568 | 0 | Value(PrimitiveValue::Int32(2), |
569 | 0 | MonoDelta::FromMilliseconds(1)), HybridTime(2001))); |
570 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(3))), |
571 | 0 | Value(PrimitiveValue::Int32(101)), HybridTime(3001))); |
572 |

573 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
574 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1001 }]) -> DEL |
575 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> DEL |
576 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2001 }]) -> \ |
577 | 0 | 2; ttl: 0.001s |
578 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> DEL |
579 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3001 }]) -> 101 |
580 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> DEL |
581 | 0 | )#"); |
582 |

583 | 0 | vector<PrimitiveValue> hashed_components({PrimitiveValue::Int32(100)}); |
584 | 0 | DocQLScanSpec ql_scan_spec(schema, kFixedHashCode, kFixedHashCode, hashed_components, |
585 | 0 | /* req */ nullptr, /* if_req */ nullptr, rocksdb::kDefaultQueryId); |
586 |

587 | 0 | DocRowwiseIterator ql_iter( |
588 | 0 | schema, schema, kNonTransactionalOperationContext, doc_db(), |
589 | 0 | CoarseTimePoint::max() /* deadline */, ReadHybridTime::FromMicros(3000)); |
590 | 0 | ASSERT_OK(ql_iter.Init(ql_scan_spec)); |
591 | 0 | ASSERT_TRUE(ASSERT_RESULT(ql_iter.HasNext())); |
592 | 0 | QLTableRow value_map; |
593 | 0 | ASSERT_OK(ql_iter.NextRow(&value_map)); |
594 | 0 | ASSERT_EQ(4, value_map.ColumnCount()); |
595 | 0 | EXPECT_EQ(100, value_map.TestValue(0).value.int32_value()); |
596 | 0 | EXPECT_TRUE(IsNull(value_map.TestValue(1).value)); |
597 | 0 | EXPECT_TRUE(IsNull(value_map.TestValue(2).value)); |
598 | 0 | EXPECT_EQ(101, value_map.TestValue(3).value.int32_value()); |
599 | | |
600 | | // Now verify row exists as long as liveness system column exists. |
601 | 0 | doc_key = DocKey(kFixedHashCode, PrimitiveValues(PrimitiveValue::Int32(101)), PrimitiveValues()); |
602 | 0 | encoded_doc_key = doc_key.Encode(); |
603 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue::kLivenessColumn), |
604 | 0 | Value(PrimitiveValue(ValueType::kNullLow)), |
605 | 0 | HybridTime(1000))); |
606 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(1))), |
607 | 0 | Value(PrimitiveValue::kTombstone), HybridTime(1000))); |
608 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(2))), |
609 | 0 | Value(PrimitiveValue::Int32(2), |
610 | 0 | MonoDelta::FromMilliseconds(1)), HybridTime(2000))); |
611 | 0 | ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(3))), |
612 | 0 | Value(PrimitiveValue::kTombstone), HybridTime(3000))); |
613 |

614 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
615 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1001 }]) -> DEL |
616 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> DEL |
617 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2001 }]) -> \ |
618 | 0 | 2; ttl: 0.001s |
619 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> DEL |
620 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3001 }]) -> 101 |
621 | 0 | SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> DEL |
622 | 0 | SubDocKey(DocKey(0x0000, [101], []), [SystemColumnId(0); HT{ physical: 0 logical: 1000 }]) -> null |
623 | 0 | SubDocKey(DocKey(0x0000, [101], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> DEL |
624 | 0 | SubDocKey(DocKey(0x0000, [101], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> \ |
625 | 0 | 2; ttl: 0.001s |
626 | 0 | SubDocKey(DocKey(0x0000, [101], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> DEL |
627 | 0 | )#"); |
628 |

629 | 0 | vector<PrimitiveValue> hashed_components_system({PrimitiveValue::Int32(101)}); |
630 | 0 | DocQLScanSpec ql_scan_spec_system(schema, kFixedHashCode, kFixedHashCode, |
631 | 0 | hashed_components_system, /* req */ nullptr, /* if_req */ nullptr, |
632 | 0 | rocksdb::kDefaultQueryId); |
633 |

634 | 0 | DocRowwiseIterator ql_iter_system( |
635 | 0 | schema, schema, kNonTransactionalOperationContext, doc_db(), |
636 | 0 | CoarseTimePoint::max() /* deadline */, ReadHybridTime::FromMicros(3000)); |
637 | 0 | ASSERT_OK(ql_iter_system.Init(ql_scan_spec_system)); |
638 | 0 | ASSERT_TRUE(ASSERT_RESULT(ql_iter_system.HasNext())); |
639 | 0 | QLTableRow value_map_system; |
640 | 0 | ASSERT_OK(ql_iter_system.NextRow(&value_map_system)); |
641 | 0 | ASSERT_EQ(4, value_map_system.ColumnCount()); |
642 | 0 | EXPECT_EQ(101, value_map_system.TestValue(0).value.int32_value()); |
643 | 0 | EXPECT_TRUE(IsNull(value_map_system.TestValue(1).value)); |
644 | 0 | EXPECT_TRUE(IsNull(value_map_system.TestValue(2).value)); |
645 | 0 | EXPECT_TRUE(IsNull(value_map_system.TestValue(3).value)); |
646 | 0 | } |
647 | | |
648 | | namespace { |
649 | | |
// Draws uniformly distributed 32-bit values from *rng until one is found that is not yet present
// in *existing, records that value in the set and returns it.
//
// The distribution yields uint32_t. Each draw is converted to int32_t exactly once, so the value
// stored in the set and the value returned to the caller are guaranteed to be the same.
int32_t NewInt(std::mt19937_64* rng, std::unordered_set<int32_t>* existing) {
  std::uniform_int_distribution<uint32_t> distribution;
  for (;;) {
    const auto candidate = static_cast<int32_t>(distribution(*rng));
    if (existing->insert(candidate).second) {
      return candidate;
    }
  }
}
659 | | |
// Same as the two-argument overload, but the candidates are drawn uniformly from the closed
// interval [min, max]. Keeps drawing until a value not yet contained in *existing is produced,
// so the caller must make sure the interval still has unused values left.
int32_t NewInt(std::mt19937_64* rng, std::unordered_set<int32_t>* existing,
               const int32_t min, const int32_t max) {
  std::uniform_int_distribution<int32_t> pick(min, max);
  auto candidate = pick(*rng);
  while (!existing->insert(candidate).second) {
    candidate = pick(*rng);
  }
  return candidate;
}
670 | | |
// One logical row of the scan-test table. Field order matters: instances are created with
// aggregate initialization as {k, r, v}.
struct RowData {
  int32_t k;  // Value written to column "k" (the hash column of the test schema).
  int32_t r;  // Value written to column "r" (the range column of the test schema).
  int32_t v;  // Value written to column "v" (the regular value column).
};
676 | | |
677 | | struct RowDataWithHt { |
678 | | RowData data; |
679 | | HybridTime ht; |
680 | | }; |
681 | | |
682 | 0 | bool operator==(const RowData& lhs, const RowData& rhs) { |
683 | 0 | return lhs.k == rhs.k && lhs.r == rhs.r && lhs.v == rhs.v; |
684 | 0 | } |
685 | | |
686 | 0 | bool operator<(const RowData& lhs, const RowData& rhs) { |
687 | 0 | return lhs.k == rhs.k ? (lhs.r == rhs.r ? lhs.v < rhs.v : lhs.r < rhs.r) : lhs.k < rhs.k; |
688 | 0 | } |
689 | | |
690 | 0 | bool IsSameKey(const RowData& lhs, const RowData& rhs) { |
691 | 0 | return lhs.k == rhs.k && lhs.r == rhs.r; |
692 | 0 | } |
693 | | |
694 | 0 | bool IsKeyLessThan(const RowData& lhs, const RowData& rhs) { |
695 | 0 | return lhs.k == rhs.k ? lhs.r < rhs.r : lhs.k < rhs.k; |
696 | 0 | } |
697 | | |
698 | 0 | std::ostream& operator<<(std::ostream& out, const RowData& row) { |
699 | 0 | return out << "{ k: " << row.k << " r: " << row.r << " v: " << row.v << " }"; |
700 | 0 | } |
701 | | |
702 | 0 | bool IsSameKey(const RowDataWithHt& lhs, const RowDataWithHt& rhs) { |
703 | 0 | return IsSameKey(lhs.data, rhs.data); |
704 | 0 | } |
705 | | |
706 | 0 | bool IsKeyLessThan(const RowDataWithHt& lhs, const RowDataWithHt& rhs) { |
707 | 0 | return IsKeyLessThan(lhs.data, rhs.data); |
708 | 0 | } |
709 | | |
710 | 0 | bool operator<(const RowDataWithHt& lhs, const RowDataWithHt& rhs) { |
711 | 0 | return IsSameKey(lhs, rhs) |
712 | 0 | ? (lhs.ht == rhs.ht ? lhs.data < rhs.data : lhs.ht > rhs.ht) |
713 | 0 | : IsKeyLessThan(lhs, rhs); |
714 | 0 | } |
715 | | |
716 | 0 | std::ostream& operator<<(std::ostream& out, const RowDataWithHt& row) { |
717 | 0 | return out << "{ data: " << row.data << " ht: " << row.ht << " }"; |
718 | 0 | } |
719 | | |
// Returns the first position in [it, end) whose element has a different key than *it, or `end`
// if every remaining element shares that key. When it == end the predicate is never invoked and
// `end` is returned.
//
// The predicate receives elements by const reference, so no element is copied while scanning.
template<class It>
It MoveForwardToNextKey(It it, const It end) {
  return std::find_if_not(
      it, end, [it](const auto& candidate) { return IsSameKey(candidate, *it); });
}
724 | | |
// Returns the first position in [begin, it) whose key is not less than the key of *it, i.e. the
// start of the run of elements sharing the key of *it. Returns `it` itself when no earlier
// element has that key.
//
// Preconditions: `it` must be dereferenceable (not the end iterator), and [begin, it) must be
// ordered by key, because a binary search is used.
//
// The comparator receives its arguments by const reference, so no element is copied.
template<class It>
It MoveBackToBeginningOfCurrentKey(const It begin, It it) {
  return std::lower_bound(
      begin, it, *it,
      [](const auto& element, const auto& target) { return IsKeyLessThan(element, target); });
}
730 | | |
731 | | template<class It> |
732 | 0 | std::pair<It, It> GetIteratorRange(const It begin, const It end, const It it, QLOperator op) { |
733 | 0 | switch (op) { |
734 | 0 | case QL_OP_EQUAL: |
735 | 0 | { |
736 | 0 | return std::make_pair(MoveBackToBeginningOfCurrentKey(begin, it), |
737 | 0 | MoveForwardToNextKey(it, end)); |
738 | 0 | } |
739 | 0 | case QL_OP_LESS_THAN: |
740 | 0 | { |
741 | 0 | return std::make_pair(begin, MoveBackToBeginningOfCurrentKey(begin, it)); |
742 | 0 | } |
743 | 0 | case QL_OP_LESS_THAN_EQUAL: |
744 | 0 | { |
745 | 0 | return std::make_pair(begin, MoveForwardToNextKey(it, end)); |
746 | 0 | } |
747 | 0 | case QL_OP_GREATER_THAN: |
748 | 0 | { |
749 | 0 | return std::make_pair(MoveForwardToNextKey(it, end), end); |
750 | 0 | } |
751 | 0 | case QL_OP_GREATER_THAN_EQUAL: |
752 | 0 | { |
753 | 0 | return std::make_pair(MoveBackToBeginningOfCurrentKey(begin, it), end); |
754 | 0 | } |
755 | 0 | default: // We should not handle all cases here |
756 | 0 | LOG(FATAL) << "Unexpected op: " << QLOperator_Name(op); |
757 | 0 | return std::make_pair(begin, end); |
758 | 0 | } |
759 | 0 | } |
760 | | |
761 | | } // namespace |
762 | | |
763 | | class DocOperationScanTest : public DocOperationTest { |
764 | | protected: |
765 | 0 | DocOperationScanTest() { |
766 | 0 | Seed(&rng_); |
767 | 0 | } |
768 | | |
769 | 0 | void InitSchema(SortingType range_column_sorting) { |
770 | 0 | range_column_sorting_type_ = range_column_sorting; |
771 | 0 | ColumnSchema hash_column("k", INT32, false, true); |
772 | 0 | ColumnSchema range_column("r", INT32, false, false, false, false, 1, range_column_sorting); |
773 | 0 | ColumnSchema value_column("v", INT32, false, false); |
774 | 0 | auto columns = { hash_column, range_column, value_column }; |
775 | 0 | schema_ = Schema(columns, CreateColumnIds(columns.size()), 2); |
776 | 0 | } |
777 | | |
778 | | void InsertRows(const size_t num_rows_per_key, |
779 | 0 | TransactionStatusManagerMock* txn_status_manager = nullptr) { |
780 | 0 | ResetCurrentTransactionId(); |
781 | 0 | ASSERT_OK(DisableCompactions()); |
782 | |
|
783 | 0 | std::unordered_set<int32_t> used_ints; |
784 | 0 | h_key_ = NewInt(&rng_, &used_ints); |
785 | 0 | rows_.clear(); |
786 | 0 | for (int32_t i = 0; i != kNumKeys; ++i) { |
787 | 0 | int32_t r_key = NewInt(&rng_, &used_ints); |
788 | 0 | for (size_t j = 0; j < num_rows_per_key; ++j) { |
789 | 0 | RowData row_data = {h_key_, r_key, NewInt(&rng_, &used_ints)}; |
790 | 0 | auto ht = HybridClock::HybridTimeFromMicrosecondsAndLogicalValue( |
791 | 0 | NewInt(&rng_, &used_ints, kMinTime, kMaxTime), 0); |
792 | 0 | RowDataWithHt row = {row_data, ht}; |
793 | 0 | rows_.push_back(row); |
794 | |
|
795 | 0 | std::unique_ptr<TransactionOperationContext> txn_op_context; |
796 | 0 | boost::optional<TransactionId> txn_id; |
797 | 0 | if (txn_status_manager) { |
798 | 0 | if (RandomActWithProbability(0.5, &rng_)) { |
799 | 0 | txn_id = TransactionId::GenerateRandom(); |
800 | 0 | SetCurrentTransactionId(*txn_id); |
801 | 0 | txn_op_context = std::make_unique<TransactionOperationContext>(*txn_id, |
802 | 0 | txn_status_manager); |
803 | 0 | } else { |
804 | 0 | ResetCurrentTransactionId(); |
805 | 0 | } |
806 | 0 | } |
807 | 0 | WriteQLRow(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, |
808 | 0 | schema_, |
809 | 0 | { row_data.k, row_data.r, row_data.v }, |
810 | 0 | 1000, |
811 | 0 | ht, |
812 | 0 | txn_op_context ? *txn_op_context : kNonTransactionalOperationContext); |
813 | 0 | if (txn_id) { |
814 | 0 | txn_status_manager->Commit(*txn_id, ht); |
815 | 0 | } |
816 | 0 | } |
817 | |
|
818 | 0 | ASSERT_OK(FlushRocksDbAndWait()); |
819 | 0 | } |
820 | |
|
821 | 0 | DumpRocksDBToLog(rocksdb(), StorageDbType::kRegular); |
822 | 0 | DumpRocksDBToLog(intents_db(), StorageDbType::kIntents); |
823 | 0 | } |
824 | | |
825 | | void PerformScans(const bool is_forward_scan, |
826 | | const TransactionOperationContext& txn_op_context, |
827 | 0 | boost::function<void(const size_t keys_in_scan_range)> after_scan_callback) { |
828 | 0 | std::vector <PrimitiveValue> hashed_components = {PrimitiveValue::Int32(h_key_)}; |
829 | 0 | std::vector <QLOperator> operators = { |
830 | 0 | QL_OP_EQUAL, |
831 | 0 | QL_OP_LESS_THAN_EQUAL, |
832 | 0 | QL_OP_GREATER_THAN_EQUAL, |
833 | 0 | }; |
834 | |
|
835 | 0 | std::shuffle(rows_.begin(), rows_.end(), rng_); |
836 | 0 | auto ordered_rows = rows_; |
837 | 0 | std::sort(ordered_rows.begin(), ordered_rows.end()); |
838 | |
|
839 | 0 | for (const auto op : operators) { |
840 | 0 | LOG(INFO) << "Testing: " << QLOperator_Name(op); |
841 | 0 | for (const auto& row : rows_) { |
842 | 0 | QLConditionPB condition; |
843 | 0 | condition.add_operands()->set_column_id(1_ColId); |
844 | 0 | condition.set_op(op); |
845 | 0 | condition.add_operands()->mutable_value()->set_int32_value(row.data.r); |
846 | 0 | auto it = std::lower_bound(ordered_rows.begin(), ordered_rows.end(), row); |
847 | 0 | LOG(INFO) << "Bound: " << yb::ToString(*it); |
848 | 0 | const auto range = GetIteratorRange(ordered_rows.begin(), ordered_rows.end(), it, op); |
849 | 0 | std::vector <int32_t> ht_diffs; |
850 | 0 | if (op == QL_OP_EQUAL) { |
851 | 0 | ht_diffs = {0}; |
852 | 0 | } else { |
853 | 0 | ht_diffs = {-1, 0, 1}; |
854 | 0 | } |
855 | 0 | for (auto ht_diff : ht_diffs) { |
856 | 0 | std::vector <RowDataWithHt> expected_rows; |
857 | 0 | auto read_ht = ReadHybridTime::FromMicros(row.ht.GetPhysicalValueMicros() + ht_diff); |
858 | 0 | LOG(INFO) << "Read time: " << read_ht; |
859 | 0 | size_t keys_in_range = range.first < range.second; |
860 | 0 | for (auto it_r = range.first; it_r < range.second;) { |
861 | 0 | auto it = it_r; |
862 | 0 | if (it_r->ht <= read_ht.read) { |
863 | 0 | expected_rows.emplace_back(*it_r); |
864 | 0 | while (it_r < range.second && IsSameKey(*it_r, *it)) { |
865 | 0 | ++it_r; |
866 | 0 | } |
867 | 0 | } else { |
868 | 0 | ++it_r; |
869 | 0 | } |
870 | 0 | if (it_r < range.second && !IsSameKey(*it_r, *it)) { |
871 | 0 | ++keys_in_range; |
872 | 0 | } |
873 | 0 | } |
874 | |
|
875 | 0 | if (is_forward_scan == |
876 | 0 | (range_column_sorting_type_ == SortingType::kDescending)) { |
877 | 0 | std::reverse(expected_rows.begin(), expected_rows.end()); |
878 | 0 | } |
879 | 0 | DocQLScanSpec ql_scan_spec( |
880 | 0 | schema_, kFixedHashCode, kFixedHashCode, hashed_components, |
881 | 0 | &condition, nullptr /* if_ req */, rocksdb::kDefaultQueryId, is_forward_scan); |
882 | 0 | DocRowwiseIterator ql_iter( |
883 | 0 | schema_, schema_, txn_op_context, doc_db(), CoarseTimePoint::max() /* deadline */, |
884 | 0 | read_ht); |
885 | 0 | ASSERT_OK(ql_iter.Init(ql_scan_spec)); |
886 | 0 | LOG(INFO) << "Expected rows: " << yb::ToString(expected_rows); |
887 | 0 | it = expected_rows.begin(); |
888 | 0 | while (ASSERT_RESULT(ql_iter.HasNext())) { |
889 | 0 | QLTableRow value_map; |
890 | 0 | ASSERT_OK(ql_iter.NextRow(&value_map)); |
891 | 0 | ASSERT_EQ(3, value_map.ColumnCount()); |
892 | |
|
893 | 0 | RowData fetched_row = {value_map.TestValue(0_ColId).value.int32_value(), |
894 | 0 | value_map.TestValue(1_ColId).value.int32_value(), |
895 | 0 | value_map.TestValue(2_ColId).value.int32_value()}; |
896 | 0 | LOG(INFO) << "Fetched row: " << fetched_row; |
897 | 0 | ASSERT_LT(it, expected_rows.end()); |
898 | 0 | ASSERT_EQ(fetched_row, it->data); |
899 | 0 | it++; |
900 | 0 | } |
901 | 0 | ASSERT_EQ(expected_rows.end(), it); |
902 | |
|
903 | 0 | after_scan_callback(keys_in_range); |
904 | 0 | } |
905 | 0 | } |
906 | 0 | } |
907 | 0 | } |
908 | | |
909 | 0 | void TestWithSortingType(SortingType sorting_type, bool is_forward_scan) { |
910 | 0 | DoTestWithSortingType(sorting_type, is_forward_scan, 1 /* num_rows_per_key */); |
911 | 0 | ASSERT_OK(DestroyRocksDB()); |
912 | 0 | ASSERT_OK(ReopenRocksDB()); |
913 | 0 | DoTestWithSortingType(sorting_type, is_forward_scan, 5 /* num_rows_per_key */); |
914 | 0 | } |
915 | | |
916 | | virtual void DoTestWithSortingType(SortingType schema_type, bool is_forward_scan, |
917 | | size_t num_rows_per_key) = 0; |
918 | | |
919 | | constexpr static int32_t kNumKeys = 20; |
920 | | constexpr static uint32_t kMinTime = 500; |
921 | | constexpr static uint32_t kMaxTime = 1500; |
922 | | |
923 | | std::mt19937_64 rng_; |
924 | | SortingType range_column_sorting_type_; |
925 | | Schema schema_; |
926 | | int32_t h_key_; |
927 | | std::vector<RowDataWithHt> rows_; |
928 | | }; |
929 | | |
930 | | class DocOperationRangeFilterTest : public DocOperationScanTest { |
931 | | protected: |
932 | | void DoTestWithSortingType(SortingType sorting_type, bool is_forward_scan, |
933 | | size_t num_rows_per_key) override; |
934 | | }; |
935 | | |
936 | | // Currently we test using one column and one scan type. |
937 | | // TODO(akashnil): In future we want to implement and test arbitrary ASC DESC combinations for scan. |
938 | | void DocOperationRangeFilterTest::DoTestWithSortingType(SortingType sorting_type, |
939 | 0 | const bool is_forward_scan, const size_t num_rows_per_key) { |
940 | 0 | ASSERT_OK(DisableCompactions()); |
941 | |
|
942 | 0 | InitSchema(sorting_type); |
943 | 0 | InsertRows(num_rows_per_key); |
944 | |
|
945 | 0 | { |
946 | 0 | std::vector<rocksdb::LiveFileMetaData> live_files; |
947 | 0 | rocksdb()->GetLiveFilesMetaData(&live_files); |
948 | 0 | const auto expected_live_files = kNumKeys; |
949 | 0 | ASSERT_EQ(expected_live_files, live_files.size()); |
950 | 0 | } |
951 | |
|
952 | 0 | auto old_iterators = |
953 | 0 | rocksdb()->GetDBOptions().statistics->getTickerCount(rocksdb::NO_TABLE_CACHE_ITERATORS); |
954 | |
|
955 | 0 | PerformScans(is_forward_scan, kNonTransactionalOperationContext, |
956 | 0 | [this, &old_iterators](const size_t key_in_scan_range) { |
957 | 0 | auto new_iterators = |
958 | 0 | rocksdb()->GetDBOptions().statistics->getTickerCount(rocksdb::NO_TABLE_CACHE_ITERATORS); |
959 | 0 | ASSERT_EQ(key_in_scan_range, new_iterators - old_iterators); |
960 | 0 | old_iterators = new_iterators; |
961 | 0 | }); |
962 | 0 | } |
963 | | |
964 | 0 | TEST_F_EX(DocOperationTest, QLRangeFilterAscendingForwardScan, DocOperationRangeFilterTest) { |
965 | 0 | TestWithSortingType(SortingType::kAscending, true); |
966 | 0 | } |
967 | | |
968 | 0 | TEST_F_EX(DocOperationTest, QLRangeFilterDescendingForwardScan, DocOperationRangeFilterTest) { |
969 | 0 | TestWithSortingType(SortingType::kDescending, true); |
970 | 0 | } |
971 | | |
972 | 0 | TEST_F_EX(DocOperationTest, QLRangeFilterAscendingReverseScan, DocOperationRangeFilterTest) { |
973 | 0 | TestWithSortingType(SortingType::kAscending, false); |
974 | 0 | } |
975 | | |
976 | 0 | TEST_F_EX(DocOperationTest, QLRangeFilterDescendingReverseScan, DocOperationRangeFilterTest) { |
977 | 0 | TestWithSortingType(SortingType::kDescending, false); |
978 | 0 | } |
979 | | |
980 | | class DocOperationTxnScanTest : public DocOperationScanTest { |
981 | | protected: |
982 | | void DoTestWithSortingType(SortingType sorting_type, bool is_forward_scan, |
983 | 0 | size_t num_rows_per_key) override { |
984 | 0 | ASSERT_OK(DisableCompactions()); |
985 | |
|
986 | 0 | InitSchema(sorting_type); |
987 | |
|
988 | 0 | TransactionStatusManagerMock txn_status_manager; |
989 | 0 | SetTransactionIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION); |
990 | |
|
991 | 0 | InsertRows(num_rows_per_key, &txn_status_manager); |
992 | |
|
993 | 0 | PerformScans(is_forward_scan, |
994 | 0 | TransactionOperationContext(TransactionId::GenerateRandom(), &txn_status_manager), |
995 | 0 | [](size_t){}); |
996 | 0 | } |
997 | | }; |
998 | | |
999 | 0 | TEST_F_EX(DocOperationTest, QLTxnAscendingForwardScan, DocOperationTxnScanTest) { |
1000 | 0 | TestWithSortingType(SortingType::kAscending, true); |
1001 | 0 | } |
1002 | | |
1003 | 0 | TEST_F_EX(DocOperationTest, QLTxnDescendingForwardScan, DocOperationTxnScanTest) { |
1004 | 0 | TestWithSortingType(SortingType::kDescending, true); |
1005 | 0 | } |
1006 | | |
1007 | 0 | TEST_F_EX(DocOperationTest, QLTxnAscendingReverseScan, DocOperationTxnScanTest) { |
1008 | 0 | TestWithSortingType(SortingType::kAscending, false); |
1009 | 0 | } |
1010 | | |
1011 | 0 | TEST_F_EX(DocOperationTest, QLTxnDescendingReverseScan, DocOperationTxnScanTest) { |
1012 | 0 | TestWithSortingType(SortingType::kDescending, false); |
1013 | 0 | } |
1014 | | |
1015 | 0 | TEST_F(DocOperationTest, TestQLCompactions) { |
1016 | 0 | yb::QLWriteRequestPB ql_writereq_pb; |
1017 | 0 | yb::QLResponsePB ql_writeresp_pb; |
1018 | |
|
1019 | 0 | HybridTime t0 = HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(1000, 0); |
1020 | 0 | HybridTime t0prime = HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(1000, 1); |
1021 | 0 | HybridTime t1 = HybridClock::AddPhysicalTimeToHybridTime(t0, MonoDelta::FromMilliseconds(1001)); |
1022 | | |
1023 | | // Define the schema. |
1024 | 0 | Schema schema = CreateSchema(); |
1025 | 0 | WriteQLRow(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, vector<int>({1, 1, 2, 3}), |
1026 | 0 | 1000, t0); |
1027 | |
|
1028 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
1029 | 0 | SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT{ physical: 1000 }]) -> null; ttl: 1.000s |
1030 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 w: 1 }]) -> 1; ttl: 1.000s |
1031 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 w: 2 }]) -> 2; ttl: 1.000s |
1032 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 w: 3 }]) -> 3; ttl: 1.000s |
1033 | 0 | )#"); |
1034 | |
|
1035 | 0 | FullyCompactHistoryBefore(t1); |
1036 | | |
1037 | | // Verify all entries are purged. |
1038 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
1039 | 0 | )#"); |
1040 | | |
1041 | | // Add a row with a TTL. |
1042 | 0 | WriteQLRow(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, vector<int>({1, 1, 2, 3}), |
1043 | 0 | 1000, t0); |
1044 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
1045 | 0 | SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT{ physical: 1000 }]) -> null; ttl: 1.000s |
1046 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 w: 1 }]) -> 1; ttl: 1.000s |
1047 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 w: 2 }]) -> 2; ttl: 1.000s |
1048 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 w: 3 }]) -> 3; ttl: 1.000s |
1049 | 0 | )#"); |
1050 | | |
1051 | | // Update the columns with a higher TTL. |
1052 | 0 | yb::QLWriteRequestPB ql_update_pb; |
1053 | 0 | yb::QLResponsePB ql_update_resp_pb; |
1054 | 0 | ql_writereq_pb.set_type( |
1055 | 0 | QLWriteRequestPB_QLStmtType::QLWriteRequestPB_QLStmtType_QL_STMT_UPDATE); |
1056 | |
|
1057 | 0 | ql_writereq_pb.set_hash_code(kFixedHashCode); |
1058 | 0 | AddPrimaryKeyColumn(&ql_writereq_pb, 1); |
1059 | 0 | AddColumnValues(schema, {10, 20, 30}, &ql_writereq_pb); |
1060 | 0 | ql_writereq_pb.set_ttl(2000); |
1061 | | |
1062 | | // Write to docdb at the same physical time and a bumped-up logical time. |
1063 | 0 | WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb, t0prime); |
1064 | |
|
1065 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
1066 | 0 | SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT{ physical: 1000 }]) -> \ |
1067 | 0 | null; ttl: 1.000s |
1068 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 logical: 1 }]) -> \ |
1069 | 0 | 10; ttl: 2.000s |
1070 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 w: 1 }]) -> \ |
1071 | 0 | 1; ttl: 1.000s |
1072 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 logical: 1 w: 1 }]) -> \ |
1073 | 0 | 20; ttl: 2.000s |
1074 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 w: 2 }]) -> \ |
1075 | 0 | 2; ttl: 1.000s |
1076 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 logical: 1 w: 2 }]) -> \ |
1077 | 0 | 30; ttl: 2.000s |
1078 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 w: 3 }]) -> 3; ttl: 1.000s |
1079 | 0 | )#"); |
1080 | |
|
1081 | 0 | FullyCompactHistoryBefore(t1); |
1082 | | |
1083 | | // Verify the rest of the columns still live. |
1084 | 0 | AssertDocDbDebugDumpStrEq(R"#( |
1085 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 logical: 1 }]) -> \ |
1086 | 0 | 10; ttl: 2.000s |
1087 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 logical: 1 w: 1 }]) -> \ |
1088 | 0 | 20; ttl: 2.000s |
1089 | 0 | SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 logical: 1 w: 2 }]) -> \ |
1090 | 0 | 30; ttl: 2.000s |
1091 | 0 | )#"); |
1092 | | |
1093 | | // Verify reads work well without system column id. |
1094 | 0 | QLRowBlock row_block = ReadQLRow(schema, 1, t1); |
1095 | 0 | ASSERT_EQ(1, row_block.row_count()); |
1096 | 0 | EXPECT_EQ(1, row_block.row(0).column(0).int32_value()); |
1097 | 0 | EXPECT_EQ(10, row_block.row(0).column(1).int32_value()); |
1098 | 0 | EXPECT_EQ(20, row_block.row(0).column(2).int32_value()); |
1099 | 0 | EXPECT_EQ(30, row_block.row(0).column(3).int32_value()); |
1100 | 0 | } |
1101 | | |
1102 | | namespace { |
1103 | | |
1104 | 0 | size_t GenerateFiles(int total_batches, DocOperationTest* test, const int kBigFileFrequency = 7) { |
1105 | 0 | auto schema = test->CreateSchema(); |
1106 | |
|
1107 | 0 | auto t0 = HybridTime::FromMicrosecondsAndLogicalValue(1000, 0); |
1108 | 0 | const int kBigFileRows = 10000; |
1109 | 0 | size_t expected_files = 0; |
1110 | 0 | bool first = true; |
1111 | 0 | for (int i = 0; i != total_batches; ++i) { |
1112 | 0 | int base = i * kBigFileRows; |
1113 | 0 | int count; |
1114 | 0 | if (i % kBigFileFrequency == 0) { |
1115 | 0 | count = kBigFileRows; |
1116 | 0 | ++expected_files; |
1117 | 0 | first = true; |
1118 | 0 | } else { |
1119 | 0 | count = 1; |
1120 | 0 | if (first) { |
1121 | 0 | ++expected_files; |
1122 | 0 | first = false; |
1123 | 0 | } |
1124 | 0 | } |
1125 | 0 | for (int j = base; j != base + count; ++j) { |
1126 | 0 | test->WriteQLRow( |
1127 | 0 | QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, {j, j, j, j}, 1000000, t0); |
1128 | 0 | } |
1129 | 0 | EXPECT_OK(test->FlushRocksDbAndWait()); |
1130 | 0 | } |
1131 | 0 | return expected_files; |
1132 | 0 | } |
1133 | | |
1134 | | int CountBigFiles(const std::vector<rocksdb::LiveFileMetaData>& files, |
1135 | 0 | const size_t kMaxFileSize) { |
1136 | 0 | int num_large_files = 0; |
1137 | 0 | for (auto file : files) { |
1138 | 0 | if (file.uncompressed_size > kMaxFileSize) { |
1139 | 0 | num_large_files++; |
1140 | 0 | } |
1141 | 0 | } |
1142 | 0 | return num_large_files; |
1143 | 0 | } |
1144 | | |
1145 | 0 | void WaitCompactionsDone(rocksdb::DB* db) { |
1146 | 0 | for (;;) { |
1147 | 0 | std::this_thread::sleep_for(500ms); |
1148 | 0 | uint64_t value = 0; |
1149 | 0 | ASSERT_TRUE(db->GetIntProperty(rocksdb::DB::Properties::kNumRunningCompactions, &value)); |
1150 | 0 | if (value == 0) { |
1151 | 0 | break; |
1152 | 0 | } |
1153 | 0 | } |
1154 | 0 | } |
1155 | | |
1156 | | } // namespace |
1157 | | |
1158 | 0 | TEST_F(DocOperationTest, MaxFileSizeForCompaction) { |
1159 | 0 | google::FlagSaver flag_saver; |
1160 | |
|
1161 | 0 | ASSERT_OK(DisableCompactions()); |
1162 | 0 | const int kTotalBatches = 20; |
1163 | 0 | auto expected_files = GenerateFiles(kTotalBatches, this); |
1164 | |
|
1165 | 0 | std::vector<rocksdb::LiveFileMetaData> files; |
1166 | 0 | rocksdb()->GetLiveFilesMetaData(&files); |
1167 | 0 | ASSERT_EQ(kTotalBatches, files.size()); |
1168 | |
|
1169 | 0 | SetMaxFileSizeForCompaction(100_KB); |
1170 | 0 | ASSERT_OK(ReinitDBOptions()); |
1171 | |
|
1172 | 0 | WaitCompactionsDone(rocksdb()); |
1173 | |
|
1174 | 0 | files.clear(); |
1175 | 0 | rocksdb()->GetLiveFilesMetaData(&files); |
1176 | 0 | ASSERT_EQ(expected_files, files.size()); |
1177 | 0 | } |
1178 | | |
1179 | 0 | TEST_F(DocOperationTest, MaxFileSizeWithWritesTrigger) { |
1180 | 0 | google::FlagSaver flag_saver; |
1181 | |
|
1182 | 0 | ASSERT_OK(DisableCompactions()); |
1183 | 0 | const int kTotalBatches = 20; |
1184 | 0 | GenerateFiles(kTotalBatches, this); |
1185 | |
|
1186 | 0 | SetMaxFileSizeForCompaction(100_KB); |
1187 | 0 | ASSERT_OK(ReinitDBOptions()); |
1188 | |
|
1189 | 0 | WaitCompactionsDone(rocksdb()); |
1190 | |
|
1191 | 0 | FLAGS_rocksdb_level0_slowdown_writes_trigger = 2; |
1192 | 0 | FLAGS_rocksdb_level0_stop_writes_trigger = 1; |
1193 | 0 | ASSERT_OK(ReinitDBOptions()); |
1194 | |
|
1195 | 0 | std::vector<rocksdb::LiveFileMetaData> files; |
1196 | 0 | rocksdb()->GetLiveFilesMetaData(&files); |
1197 | 0 | ASSERT_GE(files.size(), 3); |
1198 | |
|
1199 | 0 | auto handle_impl = down_cast<rocksdb::ColumnFamilyHandleImpl*>( |
1200 | 0 | regular_db_->DefaultColumnFamily()); |
1201 | 0 | auto stats = handle_impl->cfd()->internal_stats(); |
1202 | 0 | ASSERT_EQ(0, stats->GetCFStats(rocksdb::InternalStats::LEVEL0_NUM_FILES_TOTAL)); |
1203 | 0 | ASSERT_EQ(0, stats->GetCFStats(rocksdb::InternalStats::LEVEL0_SLOWDOWN_TOTAL)); |
1204 | 0 | } |
1205 | | |
1206 | 0 | TEST_F(DocOperationTest, MaxFileSizeIgnoredWithFileFilter) { |
1207 | 0 | ASSERT_OK(DisableCompactions()); |
1208 | 0 | const size_t kMaxFileSize = 100_KB; |
1209 | 0 | const int kNumFilesToWrite = 20; |
1210 | 0 | const int kBigFileFrequency = 7; |
1211 | 0 | const int kExpectedBigFiles = (kNumFilesToWrite-1)/kBigFileFrequency + 1; |
1212 | 0 | auto expected_files = GenerateFiles(kNumFilesToWrite, this, kBigFileFrequency); |
1213 | 0 | LOG(INFO) << "Files that would exist without compaction, if no filtering: " << expected_files; |
1214 | |
|
1215 | 0 | auto files = rocksdb()->GetLiveFilesMetaData(); |
1216 | 0 | ASSERT_EQ(kNumFilesToWrite, files.size()); |
1217 | 0 | ASSERT_EQ(kExpectedBigFiles, CountBigFiles(files, kMaxFileSize)); |
1218 | |
|
1219 | 0 | SetMaxFileSizeForCompaction(kMaxFileSize); |
1220 | | |
1221 | | // Use a filter factory that will expire every file. |
1222 | 0 | compaction_file_filter_factory_ = |
1223 | 0 | std::make_shared<DiscardUntilFileFilterFactory>(kAlwaysDiscard); |
1224 | |
|
1225 | 0 | ASSERT_OK(ReinitDBOptions()); |
1226 | |
|
1227 | 0 | WaitCompactionsDone(rocksdb()); |
1228 | |
|
1229 | 0 | files = rocksdb()->GetLiveFilesMetaData(); |
1230 | | |
1231 | | // We expect all files to be filtered and thus removed, no matter the size. |
1232 | 0 | ASSERT_EQ(0, files.size()); |
1233 | 0 | auto stats = rocksdb()->GetOptions().statistics; |
1234 | 0 | ASSERT_EQ(kNumFilesToWrite, stats->getTickerCount(rocksdb::COMPACTION_FILES_FILTERED)); |
1235 | 0 | } |
1236 | | |
1237 | 0 | TEST_F(DocOperationTest, EarlyFilesFilteredBeforeBigFile) { |
1238 | | // Ensure that any number of files can trigger a compaction. |
1239 | 0 | ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = 0; |
1240 | 0 | ASSERT_OK(DisableCompactions()); |
1241 | 0 | const size_t kMaxFileSize = 100_KB; |
1242 | 0 | const int kNumFilesToWrite = 20; |
1243 | 0 | const int kBigFileFrequency = 7; |
1244 | 0 | const int kNumFilesToExpire = 3; |
1245 | 0 | const int kExpectedBigFiles = (kNumFilesToWrite-1)/kBigFileFrequency + 1; |
1246 | | // Expected files should be one fewer than if none were discarded, since the first "big file" |
1247 | | // will be expired. |
1248 | 0 | auto expected_files = GenerateFiles(kNumFilesToWrite, this, kBigFileFrequency) - 1; |
1249 | 0 | LOG(INFO) << "Expiring " << kNumFilesToExpire << |
1250 | 0 | " files, first kept big file at " << kBigFileFrequency + 1; |
1251 | 0 | LOG(INFO) << "Expected files after compaction: " << expected_files; |
1252 | |
|
1253 | 0 | auto files = rocksdb()->GetLiveFilesMetaData(); |
1254 | 0 | ASSERT_EQ(kNumFilesToWrite, files.size()); |
1255 | 0 | ASSERT_EQ(kExpectedBigFiles, CountBigFiles(files, kMaxFileSize)); |
1256 | | |
1257 | | // Files will be ordered from latest to earliest, so select the nth file from the back. |
1258 | 0 | auto last_to_discard = |
1259 | 0 | rocksdb::TableFileNameToNumber(files[files.size() - kNumFilesToExpire].name); |
1260 | |
|
1261 | 0 | SetMaxFileSizeForCompaction(kMaxFileSize); |
1262 | | |
1263 | | // Use a filter factory that will expire exactly three files. |
1264 | 0 | compaction_file_filter_factory_ = |
1265 | 0 | std::make_shared<DiscardUntilFileFilterFactory>(last_to_discard); |
1266 | | |
1267 | | // Reinitialize the DB options with the file filter factory. |
1268 | 0 | ASSERT_OK(ReinitDBOptions()); |
1269 | | |
1270 | | // Compactions will be kicked off as part of options reinitialization. |
1271 | 0 | WaitCompactionsDone(rocksdb()); |
1272 | |
|
1273 | 0 | files = rocksdb()->GetLiveFilesMetaData(); |
1274 | | |
1275 | | // We expect three files to expire, one big file and two small ones. |
1276 | | // The remaining small files should be compacted, leaving 5 files remaining. |
1277 | | // |
1278 | | // [large][small][small]....[large][small][small]...[large][small][small]... |
1279 | | // becomes [compacted small][large][compacted small][large][compacted small] |
1280 | 0 | ASSERT_EQ(expected_files, files.size()); |
1281 | |
|
1282 | 0 | auto stats = rocksdb()->GetOptions().statistics; |
1283 | 0 | ASSERT_EQ(kNumFilesToExpire, stats->getTickerCount(rocksdb::COMPACTION_FILES_FILTERED)); |
1284 | 0 | } |
1285 | | |
1286 | | } // namespace docdb |
1287 | | } // namespace yb |