YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/doc_operation-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include <thread>
15
16
#include "yb/common/common.pb.h"
17
#include "yb/common/index.h"
18
#include "yb/common/ql_protocol_util.h"
19
#include "yb/common/ql_resultset.h"
20
#include "yb/common/ql_rowblock.h"
21
#include "yb/common/ql_value.h"
22
#include "yb/common/transaction-test-util.h"
23
24
#include "yb/docdb/cql_operation.h"
25
#include "yb/docdb/doc_rowwise_iterator.h"
26
#include "yb/docdb/docdb_debug.h"
27
#include "yb/docdb/docdb_rocksdb_util.h"
28
#include "yb/docdb/docdb_test_base.h"
29
#include "yb/docdb/docdb_test_util.h"
30
#include "yb/docdb/ql_rocksdb_storage.h"
31
#include "yb/docdb/redis_operation.h"
32
33
#include "yb/gutil/casts.h"
34
35
#include "yb/rocksdb/db/filename.h"
36
#include "yb/rocksdb/db/internal_stats.h"
37
38
#include "yb/server/hybrid_clock.h"
39
40
#include "yb/util/random_util.h"
41
#include "yb/util/size_literals.h"
42
#include "yb/util/tostring.h"
43
44
DECLARE_uint64(rocksdb_max_file_size_for_compaction);
45
DECLARE_int32(rocksdb_level0_slowdown_writes_trigger);
46
DECLARE_int32(rocksdb_level0_stop_writes_trigger);
47
DECLARE_int32(rocksdb_level0_file_num_compaction_trigger);
48
49
using namespace std::literals; // NOLINT
50
51
namespace yb {
52
namespace docdb {
53
54
using server::HybridClock;
55
56
namespace {
57
58
0
std::vector<ColumnId> CreateColumnIds(size_t count) {
59
0
  std::vector<ColumnId> result;
60
0
  result.reserve(count);
61
0
  for (size_t i = 0; i != count; ++i) {
62
0
    result.emplace_back(i);
63
0
  }
64
0
  return result;
65
0
}
66
67
// Fixed hash code used when building DocKeys and read requests in these tests.
constexpr int32_t kFixedHashCode = 0;
// Sentinel for DiscardUntilFileFilter: passing it as the last discard file number makes the
// filter discard every file it is asked about.
constexpr int64_t kAlwaysDiscard = -1;
69
70
// DiscardUntilFileFilter is a test CompactionFileFilter than will be used to mark some
71
// files for direct deletion during the PickCompaction process in UniversalCompactionPicker.
72
// Any files with numbers less than the provided last discard file number will be
73
// included in the compaction but directly deleted.
74
//
75
// DiscardUntilFileFilter emulates behavior of DocDBCompactionFileFilter, which expires
76
// files based on their TTL.
77
class DiscardUntilFileFilter : public rocksdb::CompactionFileFilter {
78
 public:
79
  explicit DiscardUntilFileFilter(int64_t last_discard) :
80
0
      last_discard_(last_discard) {}
81
82
  // Filters all file numbers less than or equal to the filter's last_discard_.
83
  // Setting last_discard_ to the kAlwaysDiscard constant will result in every
84
  // incoming file being filtered.
85
0
  rocksdb::FilterDecision Filter(const rocksdb::FileMetaData* file) override {
86
0
    if (last_discard_ == kAlwaysDiscard ||
87
0
        file->fd.GetNumber() <= implicit_cast<uint64_t>(last_discard_)) {
88
0
      LOG(INFO) << "Filtering file: " << file->fd.GetNumber() << ", size: "
89
0
          << file->fd.GetBaseFileSize() << ", total file size: " << file->fd.GetTotalFileSize();
90
0
      return rocksdb::FilterDecision::kDiscard;
91
0
    }
92
0
    return rocksdb::FilterDecision::kKeep;
93
0
  }
94
95
0
  const char* Name() const override { return "DiscardUntilFileFilter"; }
96
97
 private:
98
  const int64_t last_discard_;
99
};
100
101
// A CompactionFileFilterFactory that takes a file number as input, and filters all
102
// file numbers lower than or equal to that one. Any larger file numbers will not
103
// be filtered.
104
class DiscardUntilFileFilterFactory : public rocksdb::CompactionFileFilterFactory {
105
 public:
106
  explicit DiscardUntilFileFilterFactory(const int64_t last_to_discard) :
107
0
      last_to_discard_(last_to_discard) {}
108
109
  std::unique_ptr<rocksdb::CompactionFileFilter> CreateCompactionFileFilter(
110
0
      const std::vector<rocksdb::FileMetaData*>& inputs) override {
111
0
    return std::make_unique<DiscardUntilFileFilter>(last_to_discard_);
112
0
  }
113
114
0
  const char* Name() const override { return "DiscardUntilFileFilterFactory"; }
115
116
 private:
117
  const int64_t last_to_discard_;
118
};
119
120
// MakeMaxFileSizeFunction will create a function that returns the
121
// rocksdb_max_file_size_for_compaction flag if it is set to a positive number, and returns
122
// the max uint64 otherwise. It does NOT take the schema's table TTL into consideration.
123
0
auto MakeMaxFileSizeFunction() {
124
  // Trick to get type of max_file_size_for_compaction field.
125
0
  typedef typename decltype(
126
0
      static_cast<rocksdb::Options*>(nullptr)->max_file_size_for_compaction)::element_type
127
0
      MaxFileSizeFunction;
128
0
  return std::make_shared<MaxFileSizeFunction>([]{
129
0
    if (FLAGS_rocksdb_max_file_size_for_compaction > 0) {
130
0
      return FLAGS_rocksdb_max_file_size_for_compaction;
131
0
    }
132
0
    return std::numeric_limits<uint64_t>::max();
133
0
  });
134
0
}
135
136
} // namespace
137
138
class DocOperationTest : public DocDBTestBase {
139
 public:
140
0
  DocOperationTest() {
141
0
    SeedRandom();
142
0
  }
143
144
0
  Schema CreateSchema() {
145
0
    ColumnSchema hash_column_schema("k", INT32, false, true);
146
0
    ColumnSchema column1_schema("c1", INT32, false, false);
147
0
    ColumnSchema column2_schema("c2", INT32, false, false);
148
0
    ColumnSchema column3_schema("c3", INT32, false, false);
149
0
    const vector<ColumnSchema> columns({hash_column_schema, column1_schema, column2_schema,
150
0
                                           column3_schema});
151
0
    Schema schema(columns, CreateColumnIds(columns.size()), 1);
152
0
    return schema;
153
0
  }
154
155
0
  void AddPrimaryKeyColumn(yb::QLWriteRequestPB* ql_writereq_pb, int32_t value) {
156
0
    ql_writereq_pb->add_hashed_column_values()->mutable_value()->set_int32_value(value);
157
0
  }
158
159
0
  void AddRangeKeyColumn(int32_t value, yb::QLWriteRequestPB* ql_writereq_pb) {
160
0
    ql_writereq_pb->add_range_column_values()->mutable_value()->set_int32_value(value);
161
0
  }
162
163
  void AddColumnValues(const Schema& schema,
164
                       const vector<int32_t>& column_values,
165
0
                       yb::QLWriteRequestPB* ql_writereq_pb) {
166
0
    ASSERT_EQ(schema.num_columns() - schema.num_key_columns(), column_values.size());
167
0
    for (size_t i = 0; i < column_values.size(); i++) {
168
0
      auto column = ql_writereq_pb->add_column_values();
169
0
      column->set_column_id(narrow_cast<int32_t>(schema.num_key_columns() + i));
170
0
      column->mutable_expr()->mutable_value()->set_int32_value(column_values[i]);
171
0
    }
172
0
  }
173
174
  void WriteQL(const QLWriteRequestPB& ql_writereq_pb, const Schema& schema,
175
               QLResponsePB* ql_writeresp_pb,
176
               const HybridTime& hybrid_time = HybridTime::kMax,
177
               const TransactionOperationContext& txn_op_context =
178
0
                   kNonTransactionalOperationContext) {
179
0
    IndexMap index_map;
180
0
    QLWriteOperation ql_write_op(ql_writereq_pb,
181
0
                                 std::shared_ptr<const Schema>(&schema, [](const Schema*){}),
182
0
                                 index_map, nullptr /* unique_index_key_schema */, txn_op_context);
183
0
    ASSERT_OK(ql_write_op.Init(ql_writeresp_pb));
184
0
    auto doc_write_batch = MakeDocWriteBatch();
185
0
    HybridTime restart_read_ht;
186
0
    ASSERT_OK(ql_write_op.Apply(
187
0
        {&doc_write_batch, CoarseTimePoint::max() /* deadline */, ReadHybridTime(),
188
0
         &restart_read_ht}));
189
0
    ASSERT_OK(WriteToRocksDB(doc_write_batch, hybrid_time));
190
0
  }
191
192
0
  void AssertWithTTL(QLWriteRequestPB_QLStmtType stmt_type) {
193
0
    if (stmt_type == QLWriteRequestPB::QL_STMT_INSERT) {
194
0
      AssertDocDbDebugDumpStrEq(R"#(
195
0
SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT<max>]) -> null; ttl: 2.000s
196
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ <max> w: 1 }]) -> 2; ttl: 2.000s
197
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 2 }]) -> 3; ttl: 2.000s
198
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 3 }]) -> 4; ttl: 2.000s
199
0
      )#");
200
0
    } else {
201
0
      AssertDocDbDebugDumpStrEq(R"#(
202
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT<max>]) -> 2; ttl: 2.000s
203
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 1 }]) -> 3; ttl: 2.000s
204
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 2 }]) -> 4; ttl: 2.000s
205
0
      )#");
206
0
    }
207
0
  }
208
209
0
  void AssertWithoutTTL(QLWriteRequestPB_QLStmtType stmt_type) {
210
0
    if (stmt_type == QLWriteRequestPB::QL_STMT_INSERT) {
211
0
      AssertDocDbDebugDumpStrEq(R"#(
212
0
SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT<max>]) -> null
213
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ <max> w: 1 }]) -> 2
214
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 2 }]) -> 3
215
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 3 }]) -> 4
216
0
      )#");
217
0
    } else {
218
0
      AssertDocDbDebugDumpStrEq(R"#(
219
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT<max>]) -> 2
220
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 1 }]) -> 3
221
0
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 2 }]) -> 4
222
0
      )#");
223
0
    }
224
0
  }
225
226
0
  void RunTestQLInsertUpdate(QLWriteRequestPB_QLStmtType stmt_type, const int ttl = -1) {
227
0
    yb::QLWriteRequestPB ql_writereq_pb;
228
0
    yb::QLResponsePB ql_writeresp_pb;
229
230
    // Define the schema.
231
0
    Schema schema = CreateSchema();
232
233
0
    ql_writereq_pb.set_type(stmt_type);
234
    // Add primary key column.
235
0
    AddPrimaryKeyColumn(&ql_writereq_pb, 1);
236
0
    ql_writereq_pb.set_hash_code(0);
237
238
0
    AddColumnValues(schema, {2, 3, 4}, &ql_writereq_pb);
239
240
0
    if (ttl != -1) {
241
0
      ql_writereq_pb.set_ttl(ttl);
242
0
    }
243
244
    // Write to docdb.
245
0
    WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb);
246
247
0
    if (ttl == -1) {
248
0
      AssertWithoutTTL(stmt_type);
249
0
    } else {
250
0
      AssertWithTTL(stmt_type);
251
0
    }
252
0
  }
253
254
  yb::QLWriteRequestPB WriteQLRowReq(QLWriteRequestPB_QLStmtType stmt_type, const Schema& schema,
255
                  const vector<int32_t>& column_values, const HybridTime& hybrid_time,
256
                  const TransactionOperationContext& txn_op_content =
257
0
                      kNonTransactionalOperationContext) {
258
0
    yb::QLWriteRequestPB ql_writereq_pb;
259
0
    ql_writereq_pb.set_type(stmt_type);
260
261
    // Add primary key column.
262
0
    ql_writereq_pb.set_hash_code(0);
263
0
    for (size_t i = 0; i != schema.num_key_columns(); ++i) {
264
0
      if (i < schema.num_hash_key_columns()) {
265
0
        AddPrimaryKeyColumn(&ql_writereq_pb, column_values[i]);
266
0
      } else {
267
0
        AddRangeKeyColumn(column_values[i], &ql_writereq_pb);
268
0
      }
269
0
    }
270
0
    std::vector<int32_t> values(column_values.begin() + schema.num_key_columns(),
271
0
                                column_values.end());
272
0
    AddColumnValues(schema, values, &ql_writereq_pb);
273
0
    return ql_writereq_pb;
274
0
  }
275
276
  void WriteQLRow(QLWriteRequestPB_QLStmtType stmt_type, const Schema& schema,
277
                  const vector<int32_t>& column_values, int64_t ttl, const HybridTime& hybrid_time,
278
                  const TransactionOperationContext& txn_op_content =
279
0
                      kNonTransactionalOperationContext) {
280
0
    yb::QLWriteRequestPB ql_writereq_pb = WriteQLRowReq(
281
0
        stmt_type, schema, column_values, hybrid_time, txn_op_content);
282
0
    ql_writereq_pb.set_ttl(ttl);
283
0
    yb::QLResponsePB ql_writeresp_pb;
284
    // Write to docdb.
285
0
    WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb, hybrid_time, txn_op_content);
286
0
  }
287
288
  void WriteQLRow(QLWriteRequestPB_QLStmtType stmt_type, const Schema& schema,
289
                  const vector<int32_t>& column_values, const HybridTime& hybrid_time,
290
                  const TransactionOperationContext& txn_op_content =
291
0
                      kNonTransactionalOperationContext) {
292
0
    yb::QLWriteRequestPB ql_writereq_pb = WriteQLRowReq(
293
0
        stmt_type, schema, column_values, hybrid_time, txn_op_content);
294
0
    yb::QLResponsePB ql_writeresp_pb;
295
    // Write to docdb.
296
0
    WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb, hybrid_time, txn_op_content);
297
0
  }
298
299
0
  QLRowBlock ReadQLRow(const Schema& schema, int32_t primary_key, const HybridTime& read_time) {
300
0
    QLReadRequestPB ql_read_req;
301
0
    ql_read_req.add_hashed_column_values()->mutable_value()->set_int32_value(primary_key);
302
0
    ql_read_req.set_hash_code(kFixedHashCode);
303
0
    ql_read_req.set_max_hash_code(kFixedHashCode);
304
305
0
    QLRowBlock row_block(schema, vector<ColumnId> ({ColumnId(0), ColumnId(1), ColumnId(2),
306
0
                                                        ColumnId(3)}));
307
0
    const Schema& projection = row_block.schema();
308
0
    QLRSRowDescPB *rsrow_desc_pb = ql_read_req.mutable_rsrow_desc();
309
0
    for (int32_t i = 0; i <= 3 ; i++) {
310
0
      ql_read_req.add_selected_exprs()->set_column_id(i);
311
0
      ql_read_req.mutable_column_refs()->add_ids(i);
312
313
0
      auto col = projection.column_by_id(ColumnId(i));
314
0
      EXPECT_OK(col);
315
0
      QLRSColDescPB *rscol_desc = rsrow_desc_pb->add_rscol_descs();
316
0
      rscol_desc->set_name(col->name());
317
0
      col->type()->ToQLTypePB(rscol_desc->mutable_ql_type());
318
0
    }
319
320
0
    QLReadOperation read_op(ql_read_req, kNonTransactionalOperationContext);
321
0
    QLRocksDBStorage ql_storage(doc_db());
322
0
    const QLRSRowDesc rsrow_desc(*rsrow_desc_pb);
323
0
    faststring rows_data;
324
0
    QLResultSet resultset(&rsrow_desc, &rows_data);
325
0
    HybridTime read_restart_ht;
326
0
    EXPECT_OK(read_op.Execute(
327
0
        ql_storage, CoarseTimePoint::max() /* deadline */, ReadHybridTime::SingleTime(read_time),
328
0
        schema, projection, &resultset, &read_restart_ht));
329
0
    EXPECT_FALSE(read_restart_ht.is_valid());
330
331
    // Transfer the column values from result set to rowblock.
332
0
    Slice data(rows_data.data(), rows_data.size());
333
0
    EXPECT_OK(row_block.Deserialize(YQL_CLIENT_CQL, &data));
334
0
    return row_block;
335
0
  }
336
337
0
  void SetMaxFileSizeForCompaction(const uint64_t max_size) {
338
0
    ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_max_file_size_for_compaction) = max_size;
339
    // Make a function that will always use rocksdb_max_file_size_for_compaction.
340
    // Normally, max_file_size_for_compaction is only used for tables with TTL.
341
0
    max_file_size_for_compaction_ = MakeMaxFileSizeFunction();
342
0
  }
343
};
344
345
0
// Writes a Redis string key with a 2000 ms TTL and verifies that the TTL can be decoded from
// the value stored in RocksDB.
TEST_F(DocOperationTest, TestRedisSetKVWithTTL) {
  // Write key with ttl to docdb.
  auto db = rocksdb();
  yb::RedisWriteRequestPB write_request_pb;
  write_request_pb.mutable_set_request()->set_ttl(2000);
  auto* key_value_pb = write_request_pb.mutable_key_value();
  key_value_pb->set_key("abc");
  key_value_pb->set_type(REDIS_TYPE_STRING);
  key_value_pb->set_hash_code(123);
  key_value_pb->add_value("xyz");
  RedisWriteOperation redis_write_operation(write_request_pb);
  auto doc_write_batch = MakeDocWriteBatch();
  ASSERT_OK(redis_write_operation.Apply(
      {&doc_write_batch, CoarseTimePoint::max() /* deadline */, ReadHybridTime()}));

  ASSERT_OK(WriteToRocksDB(doc_write_batch, HybridTime::FromMicros(1000)));

  // Read key from rocksdb.
  const KeyBytes doc_key = DocKey::FromRedisKey(123, "abc").Encode();
  rocksdb::ReadOptions read_opts;
  std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_opts));
  ROCKSDB_SEEK(iter.get(), doc_key.AsSlice());
  ASSERT_TRUE(iter->Valid());

  // Verify correct ttl.
  MonoDelta ttl;
  auto value = iter->value();
  ASSERT_OK(Value::DecodeTTL(&value, &ttl));
  EXPECT_EQ(2000, ttl.ToMilliseconds());
}
375
376
0
// INSERT with a 2000 ms TTL: every written entry must carry the TTL.
TEST_F(DocOperationTest, TestQLInsertWithTTL) {
  RunTestQLInsertUpdate(QLWriteRequestPB::QL_STMT_INSERT, 2000);
}
379
380
0
// UPDATE with a 2000 ms TTL: every written entry must carry the TTL.
TEST_F(DocOperationTest, TestQLUpdateWithTTL) {
  RunTestQLInsertUpdate(QLWriteRequestPB::QL_STMT_UPDATE, 2000);
}
383
384
0
// INSERT without a TTL (RunTestQLInsertUpdate's default ttl of -1 leaves it unset).
TEST_F(DocOperationTest, TestQLInsertWithoutTTL) {
  RunTestQLInsertUpdate(QLWriteRequestPB::QL_STMT_INSERT);
}
387
388
0
// UPDATE without a TTL (RunTestQLInsertUpdate's default ttl of -1 leaves it unset).
TEST_F(DocOperationTest, TestQLUpdateWithoutTTL) {
  RunTestQLInsertUpdate(QLWriteRequestPB::QL_STMT_UPDATE);
}
391
392
0
// Inserts a row whose three non-key columns carry empty (null) values and verifies that each
// of them is stored as a tombstone.
TEST_F(DocOperationTest, TestQLWriteNulls) {
  // Define the schema.
  Schema schema = CreateSchema();

  yb::QLWriteRequestPB insert_request;
  yb::QLResponsePB insert_response;
  insert_request.set_type(QLWriteRequestPB::QL_STMT_INSERT);
  insert_request.set_hash_code(0);

  // Add primary key column.
  AddPrimaryKeyColumn(&insert_request, 1);

  // Add null columns: the value message is created but no field of it is set.
  for (int column_id = 1; column_id <= 3; ++column_id) {
    auto* column = insert_request.add_column_values();
    column->set_column_id(column_id);
    column->mutable_expr()->mutable_value();
  }

  // Write to docdb.
  WriteQL(insert_request, schema, &insert_response);

  // Null columns are converted to tombstones.
  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT<max>]) -> null
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ <max> w: 1 }]) -> DEL
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ <max> w: 2 }]) -> DEL
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ <max> w: 3 }]) -> DEL
      )#");
}
423
424
// Inserts a single row with a 1000 ms TTL at physical time 1000 and verifies both the DocDB
// dump and the values returned when the row is read back at physical time 2000.
TEST_F(DocOperationTest, TestQLReadWriteSimple) {
  // Define the schema.
  Schema schema = CreateSchema();
  // The request/response protobufs are created inside WriteQLRow, so none are declared here.
  WriteQLRow(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, vector<int>({1, 1, 2, 3}),
           1000, HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(1000, 0));

  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT{ physical: 1000 }]) -> null; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 w: 1 }]) -> 1; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 w: 2 }]) -> 2; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 w: 3 }]) -> 3; ttl: 1.000s
      )#");

  // Now read the value.
  QLRowBlock row_block = ReadQLRow(schema, 1,
                                  HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(2000, 0));
  ASSERT_EQ(1, row_block.row_count());
  EXPECT_EQ(1, row_block.row(0).column(0).int32_value());
  EXPECT_EQ(1, row_block.row(0).column(1).int32_value());
  EXPECT_EQ(2, row_block.row(0).column(2).int32_value());
  EXPECT_EQ(3, row_block.row(0).column(3).int32_value());
}
449
450
0
// Writes kNumRows rows into one partition, deletes the rows whose range key lies in
// [kDeleteRangeLow, kDeleteRangeHigh], and checks via the RocksDB NEXT/SEEK tickers that the
// delete did not move the iterator more often than the deleted range justifies.
TEST_F(DocOperationTest, TestQLRangeDeleteWithStaticColumnAvoidsFullPartitionKeyScan) {
  constexpr int kNumRows = 10000;
  constexpr int kDeleteRangeLow = 100;
  constexpr int kDeleteRangeHigh = 200;

  // Define the schema with a partition key, range key, static column, and value.
  SchemaBuilder builder;
  builder.set_next_column_id(ColumnId(0));
  ASSERT_OK(builder.AddHashKeyColumn("k", INT32));
  ASSERT_OK(builder.AddKeyColumn("r", INT32));
  ASSERT_OK(builder.AddColumn(ColumnSchema("s", INT32, false, false, true), false));
  ASSERT_OK(builder.AddColumn(ColumnSchema("v", INT32), false));
  auto schema = builder.Build();

  // Write rows with the same partition key but different range key
  const auto write_time = HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(1000, 0);
  for (int row_num = 0; row_num < kNumRows; ++row_num) {
    WriteQLRow(
        QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, vector<int>({1, row_num, 0, row_num}),
        write_time);
  }

  // Total number of Next() and Seek() calls RocksDB has recorded so far.
  auto get_num_rocksdb_iter_moves = [this]() {
    auto next_calls = regular_db_options().statistics->getTickerCount(
        rocksdb::Tickers::NUMBER_DB_NEXT);
    auto seek_calls = regular_db_options().statistics->getTickerCount(
        rocksdb::Tickers::NUMBER_DB_SEEK);
    return next_calls + seek_calls;
  };

  auto rocksdb_iter_moves_before_delete = get_num_rocksdb_iter_moves();

  // Delete a subset of the partition
  yb::QLWriteRequestPB delete_request;
  yb::QLResponsePB delete_response;
  delete_request.set_type(QLWriteRequestPB_QLStmtType_QL_STMT_DELETE);
  AddPrimaryKeyColumn(&delete_request, 1);

  // WHERE <column 1> >= kDeleteRangeLow AND <column 1> <= kDeleteRangeHigh.
  auto* range_condition = delete_request.mutable_where_expr()->mutable_condition();
  range_condition->set_op(QLOperator::QL_OP_AND);
  QLAddInt32Condition(range_condition, 1, QL_OP_GREATER_THAN_EQUAL, kDeleteRangeLow);
  QLAddInt32Condition(range_condition, 1, QL_OP_LESS_THAN_EQUAL, kDeleteRangeHigh);
  delete_request.set_hash_code(0);

  WriteQL(delete_request, schema, &delete_response,
          HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(2000, 0),
          kNonTransactionalOperationContext);

  // During deletion, we expect to move the RocksDB iterator *at most* once per docdb row in range,
  // plus once for the first docdb row out of range. We say *at most*, because it's possible to have
  // various combinations of Next() vs. Seek() calls. This simply tests that we do not have more
  // than the maximum allowed based on the schema and the restriction that we should not scan
  // outside the boundaries of the relevant deletion range.
  auto max_num_rocksdb_moves_per_cql_row = 3;
  auto num_cql_rows_in_range = kDeleteRangeHigh - kDeleteRangeLow + 1;
  auto max_num_rocksdb_iter_moves = num_cql_rows_in_range * max_num_rocksdb_moves_per_cql_row;
  ASSERT_LE(
      get_num_rocksdb_iter_moves() - rocksdb_iter_moves_before_delete,
      max_num_rocksdb_iter_moves + 1);
}
510
511
0
// Writes three regular column values for one document key without writing the liveness
// (system) column, then verifies that the row can still be read.
TEST_F(DocOperationTest, TestQLReadWithoutLivenessColumn) {
  const DocKey doc_key(
      kFixedHashCode, PrimitiveValues(PrimitiveValue::Int32(100)), PrimitiveValues());
  KeyBytes encoded_doc_key(doc_key.Encode());

  // Columns 1..3 receive the values 2..4 at hybrid times 1000, 2000 and 3000.
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(1))),
                         Value(PrimitiveValue::Int32(2)), HybridTime(1000)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(2))),
                         Value(PrimitiveValue::Int32(3)), HybridTime(2000)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(3))),
                         Value(PrimitiveValue::Int32(4)), HybridTime(3000)));

  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> 2
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> 3
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> 4
      )#");

  // Now verify we can read without the system column id.
  Schema schema = CreateSchema();
  HybridTime read_time(3000);
  QLRowBlock row_block = ReadQLRow(schema, 100, read_time);
  ASSERT_EQ(1, row_block.row_count());
  EXPECT_EQ(100, row_block.row(0).column(0).int32_value());
  EXPECT_EQ(2, row_block.row(0).column(1).int32_value());
  EXPECT_EQ(3, row_block.row(0).column(2).int32_value());
  EXPECT_EQ(4, row_block.row(0).column(3).int32_value());
}
538
539
0
// Exercises row visibility in the presence of tombstones, in three steps:
//   1. a document whose columns are all tombstones yields no row;
//   2. the row becomes visible once one column holds a live value;
//   3. a row is visible when only its liveness system column is live.
TEST_F(DocOperationTest, TestQLReadWithTombstone) {
  // Step 1: key 100 with every column deleted.
  DocKey doc_key(0, PrimitiveValues(PrimitiveValue::Int32(100)), PrimitiveValues());
  KeyBytes encoded_doc_key(doc_key.Encode());
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(1))),
                         Value(PrimitiveValue::kTombstone), HybridTime(1000)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(2))),
                         Value(PrimitiveValue::kTombstone), HybridTime(2000)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(3))),
                         Value(PrimitiveValue::kTombstone), HybridTime(3000)));

  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> DEL
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> DEL
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> DEL
      )#");

  Schema schema = CreateSchema();
  DocRowwiseIterator iter(
      schema, schema, kNonTransactionalOperationContext, doc_db(),
      CoarseTimePoint::max() /* deadline */, ReadHybridTime::FromUint64(3000));
  ASSERT_OK(iter.Init(YQL_TABLE_TYPE));
  ASSERT_FALSE(ASSERT_RESULT(iter.HasNext()));

  // Step 2: now verify row exists even with one valid column.
  doc_key = DocKey(kFixedHashCode, PrimitiveValues(PrimitiveValue::Int32(100)), PrimitiveValues());
  encoded_doc_key = doc_key.Encode();
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(1))),
                         Value(PrimitiveValue::kTombstone), HybridTime(1001)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(2))),
                         Value(PrimitiveValue::Int32(2),
                               MonoDelta::FromMilliseconds(1)), HybridTime(2001)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(3))),
                         Value(PrimitiveValue::Int32(101)), HybridTime(3001)));

  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1001 }]) -> DEL
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> DEL
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2001 }]) -> \
    2; ttl: 0.001s
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> DEL
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3001 }]) -> 101
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> DEL
      )#");

  vector<PrimitiveValue> hashed_components({PrimitiveValue::Int32(100)});
  DocQLScanSpec ql_scan_spec(
      schema, kFixedHashCode, kFixedHashCode, hashed_components,
      /* req */ nullptr, /* if_req */ nullptr, rocksdb::kDefaultQueryId);

  DocRowwiseIterator ql_iter(
      schema, schema, kNonTransactionalOperationContext, doc_db(),
      CoarseTimePoint::max() /* deadline */, ReadHybridTime::FromMicros(3000));
  ASSERT_OK(ql_iter.Init(ql_scan_spec));
  ASSERT_TRUE(ASSERT_RESULT(ql_iter.HasNext()));
  QLTableRow value_map;
  ASSERT_OK(ql_iter.NextRow(&value_map));
  ASSERT_EQ(4, value_map.ColumnCount());
  // Column 1 is a tombstone and column 2 reads as null; only column 3 still holds a value.
  EXPECT_EQ(100, value_map.TestValue(0).value.int32_value());
  EXPECT_TRUE(IsNull(value_map.TestValue(1).value));
  EXPECT_TRUE(IsNull(value_map.TestValue(2).value));
  EXPECT_EQ(101, value_map.TestValue(3).value.int32_value());

  // Step 3: now verify row exists as long as liveness system column exists.
  doc_key = DocKey(kFixedHashCode, PrimitiveValues(PrimitiveValue::Int32(101)), PrimitiveValues());
  encoded_doc_key = doc_key.Encode();
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue::kLivenessColumn),
                         Value(PrimitiveValue(ValueType::kNullLow)),
                         HybridTime(1000)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(1))),
                         Value(PrimitiveValue::kTombstone), HybridTime(1000)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(2))),
                         Value(PrimitiveValue::Int32(2),
                               MonoDelta::FromMilliseconds(1)), HybridTime(2000)));
  ASSERT_OK(SetPrimitive(DocPath(encoded_doc_key, PrimitiveValue(ColumnId(3))),
                         Value(PrimitiveValue::kTombstone), HybridTime(3000)));

  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1001 }]) -> DEL
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> DEL
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2001 }]) -> \
    2; ttl: 0.001s
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> DEL
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3001 }]) -> 101
SubDocKey(DocKey(0x0000, [100], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> DEL
SubDocKey(DocKey(0x0000, [101], []), [SystemColumnId(0); HT{ physical: 0 logical: 1000 }]) -> null
SubDocKey(DocKey(0x0000, [101], []), [ColumnId(1); HT{ physical: 0 logical: 1000 }]) -> DEL
SubDocKey(DocKey(0x0000, [101], []), [ColumnId(2); HT{ physical: 0 logical: 2000 }]) -> \
    2; ttl: 0.001s
SubDocKey(DocKey(0x0000, [101], []), [ColumnId(3); HT{ physical: 0 logical: 3000 }]) -> DEL
      )#");

  vector<PrimitiveValue> hashed_components_system({PrimitiveValue::Int32(101)});
  DocQLScanSpec ql_scan_spec_system(
      schema, kFixedHashCode, kFixedHashCode, hashed_components_system,
      /* req */ nullptr, /* if_req */ nullptr, rocksdb::kDefaultQueryId);

  DocRowwiseIterator ql_iter_system(
      schema, schema, kNonTransactionalOperationContext, doc_db(),
      CoarseTimePoint::max() /* deadline */, ReadHybridTime::FromMicros(3000));
  ASSERT_OK(ql_iter_system.Init(ql_scan_spec_system));
  ASSERT_TRUE(ASSERT_RESULT(ql_iter_system.HasNext()));
  QLTableRow value_map_system;
  ASSERT_OK(ql_iter_system.NextRow(&value_map_system));
  ASSERT_EQ(4, value_map_system.ColumnCount());
  // Only the key column has a value; every regular column reads as null.
  EXPECT_EQ(101, value_map_system.TestValue(0).value.int32_value());
  EXPECT_TRUE(IsNull(value_map_system.TestValue(1).value));
  EXPECT_TRUE(IsNull(value_map_system.TestValue(2).value));
  EXPECT_TRUE(IsNull(value_map_system.TestValue(3).value));
}
647
648
namespace {
649
650
0
// Draws random 32-bit values from `rng` until one comes up that is not yet in `existing`,
// records it in `existing` and returns it.
//
// The distribution spans the whole uint32_t range. Every draw is converted to int32_t exactly
// once, so the value handed back to the caller is the very value stored in `existing`
// (previously the set stored the cast value while the return relied on an implicit narrowing).
int32_t NewInt(std::mt19937_64* rng, std::unordered_set<int32_t>* existing) {
  std::uniform_int_distribution<uint32_t> distribution;
  for (;;) {
    const auto candidate = static_cast<int32_t>(distribution(*rng));
    if (existing->insert(candidate).second) {
      return candidate;
    }
  }
}
659
660
// Returns a random integer from the closed interval [min, max] that was not yet present in
// `existing`, inserting it into `existing` before returning. Keeps drawing until an unused value
// comes up, so the caller must leave at least one unused value inside the interval.
int32_t NewInt(std::mt19937_64* rng, std::unordered_set<int32_t>* existing,
    const int32_t min, const int32_t max) {
  std::uniform_int_distribution<int32_t> distribution(min, max);
  int32_t candidate = distribution(*rng);
  while (!existing->insert(candidate).second) {
    candidate = distribution(*rng);
  }
  return candidate;
}
670
671
// Plain value type describing one row of the scan tests' three-column INT32 table.
struct RowData {
672
  int32_t k;  // Value of column "k" (the tests fill it with the shared hash key).
673
  int32_t r;  // Value of column "r" (the range key the scan conditions are built on).
674
  int32_t v;  // Value of column "v" (the regular value column).
675
};
676
677
// A row together with the hybrid time it was written at.
struct RowDataWithHt {
678
  RowData data;  // Column values of the row.
679
  HybridTime ht;  // Hybrid time passed to the write that stored `data`.
680
};
681
682
0
// Two rows compare equal only when all three columns match.
bool operator==(const RowData& lhs, const RowData& rhs) {
  if (lhs.k != rhs.k) {
    return false;
  }
  if (lhs.r != rhs.r) {
    return false;
  }
  return lhs.v == rhs.v;
}
685
686
0
// Lexicographic ordering over the columns (k, r, v).
bool operator<(const RowData& lhs, const RowData& rhs) {
  if (lhs.k != rhs.k) {
    return lhs.k < rhs.k;
  }
  if (lhs.r != rhs.r) {
    return lhs.r < rhs.r;
  }
  return lhs.v < rhs.v;
}
689
690
0
bool IsSameKey(const RowData& lhs, const RowData& rhs) {
691
0
  return lhs.k == rhs.k && lhs.r == rhs.r;
692
0
}
693
694
0
bool IsKeyLessThan(const RowData& lhs, const RowData& rhs) {
695
0
  return lhs.k == rhs.k ? lhs.r < rhs.r : lhs.k < rhs.k;
696
0
}
697
698
0
std::ostream& operator<<(std::ostream& out, const RowData& row) {
699
0
  return out << "{ k: " << row.k << " r: " << row.r << " v: " << row.v << " }";
700
0
}
701
702
0
bool IsSameKey(const RowDataWithHt& lhs, const RowDataWithHt& rhs) {
703
0
  return IsSameKey(lhs.data, rhs.data);
704
0
}
705
706
0
bool IsKeyLessThan(const RowDataWithHt& lhs, const RowDataWithHt& rhs) {
707
0
  return IsKeyLessThan(lhs.data, rhs.data);
708
0
}
709
710
0
// Sort order for timestamped rows: ascending by key; rows that share a key are ordered by
// hybrid time descending (greater time first), with the full row data as the last tie-breaker.
bool operator<(const RowDataWithHt& lhs, const RowDataWithHt& rhs) {
  if (!IsSameKey(lhs, rhs)) {
    return IsKeyLessThan(lhs, rhs);
  }
  if (lhs.ht == rhs.ht) {
    return lhs.data < rhs.data;
  }
  return lhs.ht > rhs.ht;
}
715
716
0
std::ostream& operator<<(std::ostream& out, const RowDataWithHt& row) {
717
0
  return out << "{ data: " << row.data << " ht: " << row.ht << " }";
718
0
}
719
720
// Returns the first position in [it, end) whose key differs from the key of `*it`, or `end` when
// every remaining element shares that key. `*it` is only read while visiting elements, so an
// empty range (it == end) is returned unchanged.
template<class It>
It MoveForwardToNextKey(It it, const It end) {
  // The predicate runs once per visited element; bind by const reference so no element is copied.
  return std::find_if_not(it, end, [it](const auto& k) { return IsSameKey(k, *it); });
}
724
725
// Returns the first position in [begin, it) whose key is not less than the key of `*it`, i.e.
// the start of the run of elements sharing the key of `*it` (or `it` itself when no earlier
// element has that key). Requires `it` to be dereferenceable and [begin, it) to be ordered by key.
template<class It>
It MoveBackToBeginningOfCurrentKey(const It begin, It it) {
  // Comparator arguments are bound by const reference so the binary search copies no elements.
  return std::lower_bound(
      begin, it, *it, [](const auto& k1, const auto& k2) { return IsKeyLessThan(k1, k2); });
}
730
731
template<class It>
732
0
std::pair<It, It> GetIteratorRange(const It begin, const It end, const It it, QLOperator op) {
733
0
  switch (op) {
734
0
    case QL_OP_EQUAL:
735
0
      {
736
0
        return std::make_pair(MoveBackToBeginningOfCurrentKey(begin, it),
737
0
            MoveForwardToNextKey(it, end));
738
0
      }
739
0
    case QL_OP_LESS_THAN:
740
0
      {
741
0
        return std::make_pair(begin, MoveBackToBeginningOfCurrentKey(begin, it));
742
0
      }
743
0
    case QL_OP_LESS_THAN_EQUAL:
744
0
      {
745
0
        return std::make_pair(begin, MoveForwardToNextKey(it, end));
746
0
      }
747
0
    case QL_OP_GREATER_THAN:
748
0
      {
749
0
        return std::make_pair(MoveForwardToNextKey(it, end), end);
750
0
      }
751
0
    case QL_OP_GREATER_THAN_EQUAL:
752
0
      {
753
0
        return std::make_pair(MoveBackToBeginningOfCurrentKey(begin, it), end);
754
0
      }
755
0
    default: // We should not handle all cases here
756
0
      LOG(FATAL) << "Unexpected op: " << QLOperator_Name(op);
757
0
      return std::make_pair(begin, end);
758
0
  }
759
0
}
760
761
} // namespace
762
763
class DocOperationScanTest : public DocOperationTest {
764
 protected:
765
0
  // Initializes rng_ through Seed() so the random keys, values and write times generated by the
  // fixture come from a prepared generator.
  DocOperationScanTest() {
766
0
    Seed(&rng_);
767
0
  }
768
769
0
  // Builds schema_ with three INT32 columns - hash column "k", range column "r" using the
  // requested sorting, and value column "v" - and remembers the sorting so PerformScans() can
  // work out the expected row order.
  void InitSchema(SortingType range_column_sorting) {
    range_column_sorting_type_ = range_column_sorting;

    const ColumnSchema k_column("k", INT32, false, true);
    const ColumnSchema r_column("r", INT32, false, false, false, false, 1, range_column_sorting);
    const ColumnSchema v_column("v", INT32, false, false);

    auto all_columns = { k_column, r_column, v_column };
    const auto column_ids = CreateColumnIds(all_columns.size());
    // The trailing 2 is the number of leading columns that form the key (k and r).
    schema_ = Schema(all_columns, column_ids, 2);
  }
777
778
  void InsertRows(const size_t num_rows_per_key,
779
0
      TransactionStatusManagerMock* txn_status_manager = nullptr) {
780
0
    ResetCurrentTransactionId();
781
0
    ASSERT_OK(DisableCompactions());
782
783
0
    std::unordered_set<int32_t> used_ints;
784
0
    h_key_ = NewInt(&rng_, &used_ints);
785
0
    rows_.clear();
786
0
    for (int32_t i = 0; i != kNumKeys; ++i) {
787
0
      int32_t r_key = NewInt(&rng_, &used_ints);
788
0
      for (size_t j = 0; j < num_rows_per_key; ++j) {
789
0
        RowData row_data = {h_key_, r_key, NewInt(&rng_, &used_ints)};
790
0
        auto ht = HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(
791
0
            NewInt(&rng_, &used_ints, kMinTime, kMaxTime), 0);
792
0
        RowDataWithHt row = {row_data, ht};
793
0
        rows_.push_back(row);
794
795
0
        std::unique_ptr<TransactionOperationContext> txn_op_context;
796
0
        boost::optional<TransactionId> txn_id;
797
0
        if (txn_status_manager) {
798
0
          if (RandomActWithProbability(0.5, &rng_)) {
799
0
            txn_id = TransactionId::GenerateRandom();
800
0
            SetCurrentTransactionId(*txn_id);
801
0
            txn_op_context = std::make_unique<TransactionOperationContext>(*txn_id,
802
0
                txn_status_manager);
803
0
          } else {
804
0
            ResetCurrentTransactionId();
805
0
          }
806
0
        }
807
0
        WriteQLRow(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT,
808
0
                   schema_,
809
0
                   { row_data.k, row_data.r, row_data.v },
810
0
                   1000,
811
0
                   ht,
812
0
                   txn_op_context ? *txn_op_context : kNonTransactionalOperationContext);
813
0
        if (txn_id) {
814
0
          txn_status_manager->Commit(*txn_id, ht);
815
0
        }
816
0
      }
817
818
0
      ASSERT_OK(FlushRocksDbAndWait());
819
0
    }
820
821
0
    DumpRocksDBToLog(rocksdb(), StorageDbType::kRegular);
822
0
    DumpRocksDBToLog(intents_db(), StorageDbType::kIntents);
823
0
  }
824
825
  void PerformScans(const bool is_forward_scan,
826
      const TransactionOperationContext& txn_op_context,
827
0
      boost::function<void(const size_t keys_in_scan_range)> after_scan_callback) {
828
0
    std::vector <PrimitiveValue> hashed_components = {PrimitiveValue::Int32(h_key_)};
829
0
    std::vector <QLOperator> operators = {
830
0
        QL_OP_EQUAL,
831
0
        QL_OP_LESS_THAN_EQUAL,
832
0
        QL_OP_GREATER_THAN_EQUAL,
833
0
    };
834
835
0
    std::shuffle(rows_.begin(), rows_.end(), rng_);
836
0
    auto ordered_rows = rows_;
837
0
    std::sort(ordered_rows.begin(), ordered_rows.end());
838
839
0
    for (const auto op : operators) {
840
0
      LOG(INFO) << "Testing: " << QLOperator_Name(op);
841
0
      for (const auto& row : rows_) {
842
0
        QLConditionPB condition;
843
0
        condition.add_operands()->set_column_id(1_ColId);
844
0
        condition.set_op(op);
845
0
        condition.add_operands()->mutable_value()->set_int32_value(row.data.r);
846
0
        auto it = std::lower_bound(ordered_rows.begin(), ordered_rows.end(), row);
847
0
        LOG(INFO) << "Bound: " << yb::ToString(*it);
848
0
        const auto range = GetIteratorRange(ordered_rows.begin(), ordered_rows.end(), it, op);
849
0
        std::vector <int32_t> ht_diffs;
850
0
        if (op == QL_OP_EQUAL) {
851
0
          ht_diffs = {0};
852
0
        } else {
853
0
          ht_diffs = {-1, 0, 1};
854
0
        }
855
0
        for (auto ht_diff : ht_diffs) {
856
0
          std::vector <RowDataWithHt> expected_rows;
857
0
          auto read_ht = ReadHybridTime::FromMicros(row.ht.GetPhysicalValueMicros() + ht_diff);
858
0
          LOG(INFO) << "Read time: " << read_ht;
859
0
          size_t keys_in_range = range.first < range.second;
860
0
          for (auto it_r = range.first; it_r < range.second;) {
861
0
            auto it = it_r;
862
0
            if (it_r->ht <= read_ht.read) {
863
0
              expected_rows.emplace_back(*it_r);
864
0
              while (it_r < range.second && IsSameKey(*it_r, *it)) {
865
0
                ++it_r;
866
0
              }
867
0
            } else {
868
0
              ++it_r;
869
0
            }
870
0
            if (it_r < range.second && !IsSameKey(*it_r, *it)) {
871
0
              ++keys_in_range;
872
0
            }
873
0
          }
874
875
0
          if (is_forward_scan ==
876
0
              (range_column_sorting_type_ == SortingType::kDescending)) {
877
0
            std::reverse(expected_rows.begin(), expected_rows.end());
878
0
          }
879
0
          DocQLScanSpec ql_scan_spec(
880
0
              schema_, kFixedHashCode, kFixedHashCode, hashed_components,
881
0
              &condition, nullptr /* if_ req */, rocksdb::kDefaultQueryId, is_forward_scan);
882
0
          DocRowwiseIterator ql_iter(
883
0
              schema_, schema_, txn_op_context, doc_db(), CoarseTimePoint::max() /* deadline */,
884
0
              read_ht);
885
0
          ASSERT_OK(ql_iter.Init(ql_scan_spec));
886
0
          LOG(INFO) << "Expected rows: " << yb::ToString(expected_rows);
887
0
          it = expected_rows.begin();
888
0
          while (ASSERT_RESULT(ql_iter.HasNext())) {
889
0
            QLTableRow value_map;
890
0
            ASSERT_OK(ql_iter.NextRow(&value_map));
891
0
            ASSERT_EQ(3, value_map.ColumnCount());
892
893
0
            RowData fetched_row = {value_map.TestValue(0_ColId).value.int32_value(),
894
0
                value_map.TestValue(1_ColId).value.int32_value(),
895
0
                value_map.TestValue(2_ColId).value.int32_value()};
896
0
            LOG(INFO) << "Fetched row: " << fetched_row;
897
0
            ASSERT_LT(it, expected_rows.end());
898
0
            ASSERT_EQ(fetched_row, it->data);
899
0
            it++;
900
0
          }
901
0
          ASSERT_EQ(expected_rows.end(), it);
902
903
0
          after_scan_callback(keys_in_range);
904
0
        }
905
0
      }
906
0
    }
907
0
  }
908
909
0
  // Runs the concrete scenario twice on a fresh database: first with a single version per key,
  // then - after destroying and reopening RocksDB - with five versions per key.
  void TestWithSortingType(SortingType sorting_type, bool is_forward_scan) {
    constexpr size_t kSingleVersionPerKey = 1;
    constexpr size_t kManyVersionsPerKey = 5;

    DoTestWithSortingType(sorting_type, is_forward_scan, kSingleVersionPerKey);

    ASSERT_OK(DestroyRocksDB());
    ASSERT_OK(ReopenRocksDB());

    DoTestWithSortingType(sorting_type, is_forward_scan, kManyVersionsPerKey);
  }
915
916
  virtual void DoTestWithSortingType(SortingType schema_type, bool is_forward_scan,
917
      size_t num_rows_per_key) = 0;
918
919
  constexpr static int32_t kNumKeys = 20;
920
  constexpr static uint32_t kMinTime = 500;
921
  constexpr static uint32_t kMaxTime = 1500;
922
923
  std::mt19937_64 rng_;
924
  SortingType range_column_sorting_type_;
925
  Schema schema_;
926
  int32_t h_key_;
927
  std::vector<RowDataWithHt> rows_;
928
};
929
930
// Scan fixture whose scenario is defined out of line; it overrides the per-sorting-type hook of
// DocOperationScanTest.
class DocOperationRangeFilterTest : public DocOperationScanTest {
931
 protected:
932
  // Runs the scenario for one range column sorting, scan direction and number of rows per key.
  void DoTestWithSortingType(SortingType sorting_type, bool is_forward_scan,
933
      size_t num_rows_per_key) override;
934
};
935
936
// Currently we test using one column and one scan type.
937
// TODO(akashnil): In future we want to implement and test arbitrary ASC DESC combinations for scan.
938
// Writes the test rows with compactions disabled, checks that exactly kNumKeys live files
// exist, and then verifies after every scan that the NO_TABLE_CACHE_ITERATORS ticker grew by
// exactly the number of keys inside the scanned range.
void DocOperationRangeFilterTest::DoTestWithSortingType(SortingType sorting_type,
    const bool is_forward_scan, const size_t num_rows_per_key) {
  ASSERT_OK(DisableCompactions());

  InitSchema(sorting_type);
  InsertRows(num_rows_per_key);

  {
    std::vector<rocksdb::LiveFileMetaData> files_on_disk;
    rocksdb()->GetLiveFilesMetaData(&files_on_disk);
    // Compare through a local copy of the constant, as the original code does (presumably so the
    // assertion macro does not bind a reference to the static constexpr member).
    const auto expected_live_files = kNumKeys;
    ASSERT_EQ(expected_live_files, files_on_disk.size());
  }

  const auto table_iterators_created = [this] {
    return rocksdb()->GetDBOptions().statistics->getTickerCount(
        rocksdb::NO_TABLE_CACHE_ITERATORS);
  };

  auto iterators_before = table_iterators_created();

  PerformScans(is_forward_scan, kNonTransactionalOperationContext,
      [&table_iterators_created, &iterators_before](const size_t key_in_scan_range) {
    const auto iterators_after = table_iterators_created();
    ASSERT_EQ(key_in_scan_range, iterators_after - iterators_before);
    iterators_before = iterators_after;
  });
}
963
964
0
// Range filter scenario: ascending range column, forward scan.
TEST_F_EX(DocOperationTest, QLRangeFilterAscendingForwardScan, DocOperationRangeFilterTest) {
  constexpr bool kIsForwardScan = true;
  TestWithSortingType(SortingType::kAscending, kIsForwardScan);
}
967
968
0
// Range filter scenario: descending range column, forward scan.
TEST_F_EX(DocOperationTest, QLRangeFilterDescendingForwardScan, DocOperationRangeFilterTest) {
  constexpr bool kIsForwardScan = true;
  TestWithSortingType(SortingType::kDescending, kIsForwardScan);
}
971
972
0
// Range filter scenario: ascending range column, reverse scan.
TEST_F_EX(DocOperationTest, QLRangeFilterAscendingReverseScan, DocOperationRangeFilterTest) {
  constexpr bool kIsForwardScan = false;
  TestWithSortingType(SortingType::kAscending, kIsForwardScan);
}
975
976
0
// Range filter scenario: descending range column, reverse scan.
TEST_F_EX(DocOperationTest, QLRangeFilterDescendingReverseScan, DocOperationRangeFilterTest) {
  constexpr bool kIsForwardScan = false;
  TestWithSortingType(SortingType::kDescending, kIsForwardScan);
}
979
980
class DocOperationTxnScanTest : public DocOperationScanTest {
981
 protected:
982
  void DoTestWithSortingType(SortingType sorting_type, bool is_forward_scan,
983
0
      size_t num_rows_per_key) override {
984
0
    ASSERT_OK(DisableCompactions());
985
986
0
    InitSchema(sorting_type);
987
988
0
    TransactionStatusManagerMock txn_status_manager;
989
0
    SetTransactionIsolationLevel(IsolationLevel::SNAPSHOT_ISOLATION);
990
991
0
    InsertRows(num_rows_per_key, &txn_status_manager);
992
993
0
    PerformScans(is_forward_scan,
994
0
                 TransactionOperationContext(TransactionId::GenerateRandom(), &txn_status_manager),
995
0
                 [](size_t){});
996
0
  }
997
};
998
999
0
// Transactional scan scenario: ascending range column, forward scan.
TEST_F_EX(DocOperationTest, QLTxnAscendingForwardScan, DocOperationTxnScanTest) {
  constexpr bool kIsForwardScan = true;
  TestWithSortingType(SortingType::kAscending, kIsForwardScan);
}
1002
1003
0
// Transactional scan scenario: descending range column, forward scan.
TEST_F_EX(DocOperationTest, QLTxnDescendingForwardScan, DocOperationTxnScanTest) {
  constexpr bool kIsForwardScan = true;
  TestWithSortingType(SortingType::kDescending, kIsForwardScan);
}
1006
1007
0
// Transactional scan scenario: ascending range column, reverse scan.
TEST_F_EX(DocOperationTest, QLTxnAscendingReverseScan, DocOperationTxnScanTest) {
  constexpr bool kIsForwardScan = false;
  TestWithSortingType(SortingType::kAscending, kIsForwardScan);
}
1010
1011
0
// Transactional scan scenario: descending range column, reverse scan.
TEST_F_EX(DocOperationTest, QLTxnDescendingReverseScan, DocOperationTxnScanTest) {
  constexpr bool kIsForwardScan = false;
  TestWithSortingType(SortingType::kDescending, kIsForwardScan);
}
1014
1015
0
// Checks TTL handling across history compaction:
//  1. a row written with a 1s TTL is purged completely when history is compacted at t1 (1001ms
//     after the write);
//  2. when the value columns are rewritten with a 2s TTL before the compaction, only the
//     rewritten values survive it (the system column entry and the 1s-TTL values are dropped);
//  3. the surviving row is still readable at t1 although its system column entry is gone.
TEST_F(DocOperationTest, TestQLCompactions) {
  yb::QLWriteRequestPB ql_writereq_pb;
  yb::QLResponsePB ql_writeresp_pb;

  HybridTime t0 = HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(1000, 0);
  HybridTime t0prime = HybridClock::HybridTimeFromMicrosecondsAndLogicalValue(1000, 1);
  HybridTime t1 = HybridClock::AddPhysicalTimeToHybridTime(t0, MonoDelta::FromMilliseconds(1001));

  // Define the schema.
  Schema schema = CreateSchema();
  WriteQLRow(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, vector<int>({1, 1, 2, 3}),
      1000, t0);

  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT{ physical: 1000 }]) -> null; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 w: 1 }]) -> 1; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 w: 2 }]) -> 2; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 w: 3 }]) -> 3; ttl: 1.000s
      )#");

  FullyCompactHistoryBefore(t1);

  // Verify all entries are purged.
  AssertDocDbDebugDumpStrEq(R"#(
      )#");

  // Add a row with a TTL.
  WriteQLRow(QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, vector<int>({1, 1, 2, 3}),
      1000, t0);
  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT{ physical: 1000 }]) -> null; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 w: 1 }]) -> 1; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 w: 2 }]) -> 2; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 w: 3 }]) -> 3; ttl: 1.000s
     )#");

  // Update the columns with a higher TTL. The request declared at the top of the test is the
  // one that gets filled in and written.
  ql_writereq_pb.set_type(
      QLWriteRequestPB_QLStmtType::QLWriteRequestPB_QLStmtType_QL_STMT_UPDATE);

  ql_writereq_pb.set_hash_code(kFixedHashCode);
  AddPrimaryKeyColumn(&ql_writereq_pb, 1);
  AddColumnValues(schema, {10, 20, 30}, &ql_writereq_pb);
  ql_writereq_pb.set_ttl(2000);

  // Write to docdb at the same physical time and a bumped-up logical time.
  WriteQL(ql_writereq_pb, schema, &ql_writeresp_pb, t0prime);

  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [1], []), [SystemColumnId(0); HT{ physical: 1000 }]) -> \
    null; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 logical: 1 }]) -> \
    10; ttl: 2.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 w: 1 }]) -> \
    1; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 logical: 1 w: 1 }]) -> \
    20; ttl: 2.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 w: 2 }]) -> \
    2; ttl: 1.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 logical: 1 w: 2 }]) -> \
    30; ttl: 2.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 w: 3 }]) -> 3; ttl: 1.000s
      )#");

  FullyCompactHistoryBefore(t1);

  // Verify the rest of the columns still live.
  AssertDocDbDebugDumpStrEq(R"#(
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(1); HT{ physical: 1000 logical: 1 }]) -> \
    10; ttl: 2.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(2); HT{ physical: 1000 logical: 1 w: 1 }]) -> \
    20; ttl: 2.000s
SubDocKey(DocKey(0x0000, [1], []), [ColumnId(3); HT{ physical: 1000 logical: 1 w: 2 }]) -> \
    30; ttl: 2.000s
      )#");

  // Verify reads work well without system column id.
  QLRowBlock row_block = ReadQLRow(schema, 1, t1);
  ASSERT_EQ(1, row_block.row_count());
  EXPECT_EQ(1, row_block.row(0).column(0).int32_value());
  EXPECT_EQ(10, row_block.row(0).column(1).int32_value());
  EXPECT_EQ(20, row_block.row(0).column(2).int32_value());
  EXPECT_EQ(30, row_block.row(0).column(3).int32_value());
}
1101
1102
namespace {
1103
1104
0
size_t GenerateFiles(int total_batches, DocOperationTest* test, const int kBigFileFrequency = 7) {
1105
0
  auto schema = test->CreateSchema();
1106
1107
0
  auto t0 = HybridTime::FromMicrosecondsAndLogicalValue(1000, 0);
1108
0
  const int kBigFileRows = 10000;
1109
0
  size_t expected_files = 0;
1110
0
  bool first = true;
1111
0
  for (int i = 0; i != total_batches; ++i) {
1112
0
    int base = i * kBigFileRows;
1113
0
    int count;
1114
0
    if (i % kBigFileFrequency == 0) {
1115
0
      count = kBigFileRows;
1116
0
      ++expected_files;
1117
0
      first = true;
1118
0
    } else {
1119
0
      count = 1;
1120
0
      if (first) {
1121
0
        ++expected_files;
1122
0
        first = false;
1123
0
      }
1124
0
    }
1125
0
    for (int j = base; j != base + count; ++j) {
1126
0
      test->WriteQLRow(
1127
0
          QLWriteRequestPB_QLStmtType_QL_STMT_INSERT, schema, {j, j, j, j}, 1000000, t0);
1128
0
    }
1129
0
    EXPECT_OK(test->FlushRocksDbAndWait());
1130
0
  }
1131
0
  return expected_files;
1132
0
}
1133
1134
// Returns how many of `files` have an uncompressed size strictly greater than `kMaxFileSize`.
int CountBigFiles(const std::vector<rocksdb::LiveFileMetaData>& files,
    const size_t kMaxFileSize) {
  int num_large_files = 0;
  // Iterate by const reference: only one field is inspected, so the metadata entries need not
  // be copied.
  for (const auto& file : files) {
    if (file.uncompressed_size > kMaxFileSize) {
      ++num_large_files;
    }
  }
  return num_large_files;
}
1144
1145
0
// Polls `db` every 500ms until its kNumRunningCompactions property reads zero. The first check
// happens only after an initial 500ms sleep. Fails the calling test (and returns) if the
// property cannot be read.
void WaitCompactionsDone(rocksdb::DB* db) {
  uint64_t running_compactions = 0;
  do {
    std::this_thread::sleep_for(500ms);
    running_compactions = 0;
    ASSERT_TRUE(db->GetIntProperty(
        rocksdb::DB::Properties::kNumRunningCompactions, &running_compactions));
  } while (running_compactions != 0);
}
1155
1156
} // namespace
1157
1158
0
// With a 100KB limit on files eligible for compaction, only the small files are expected to be
// compacted together: the final file count must match what GenerateFiles() predicts.
TEST_F(DocOperationTest, MaxFileSizeForCompaction) {
  google::FlagSaver flag_saver;
  const int kTotalBatches = 20;

  ASSERT_OK(DisableCompactions());
  const auto expected_files = GenerateFiles(kTotalBatches, this);

  // One file per flushed batch while compactions are disabled.
  std::vector<rocksdb::LiveFileMetaData> files_before;
  rocksdb()->GetLiveFilesMetaData(&files_before);
  ASSERT_EQ(kTotalBatches, files_before.size());

  SetMaxFileSizeForCompaction(100_KB);
  ASSERT_OK(ReinitDBOptions());

  WaitCompactionsDone(rocksdb());

  std::vector<rocksdb::LiveFileMetaData> files_after;
  rocksdb()->GetLiveFilesMetaData(&files_after);
  ASSERT_EQ(expected_files, files_after.size());
}
1178
1179
0
// After compaction with a 100KB file size limit, lowers the level-0 slowdown/stop write triggers
// below the number of remaining files and checks that neither the LEVEL0_NUM_FILES_TOTAL nor the
// LEVEL0_SLOWDOWN_TOTAL column family statistic has been bumped.
TEST_F(DocOperationTest, MaxFileSizeWithWritesTrigger) {
  google::FlagSaver flag_saver;
  const int kTotalBatches = 20;

  ASSERT_OK(DisableCompactions());
  GenerateFiles(kTotalBatches, this);

  SetMaxFileSizeForCompaction(100_KB);
  ASSERT_OK(ReinitDBOptions());

  WaitCompactionsDone(rocksdb());

  FLAGS_rocksdb_level0_slowdown_writes_trigger = 2;
  FLAGS_rocksdb_level0_stop_writes_trigger = 1;
  ASSERT_OK(ReinitDBOptions());

  // At least three files remain, i.e. more than either trigger configured above.
  std::vector<rocksdb::LiveFileMetaData> remaining_files;
  rocksdb()->GetLiveFilesMetaData(&remaining_files);
  ASSERT_GE(remaining_files.size(), 3);

  auto default_cf = down_cast<rocksdb::ColumnFamilyHandleImpl*>(
      regular_db_->DefaultColumnFamily());
  auto cf_stats = default_cf->cfd()->internal_stats();
  ASSERT_EQ(0, cf_stats->GetCFStats(rocksdb::InternalStats::LEVEL0_NUM_FILES_TOTAL));
  ASSERT_EQ(0, cf_stats->GetCFStats(rocksdb::InternalStats::LEVEL0_SLOWDOWN_TOTAL));
}
1205
1206
0
// With a file filter that discards every file, the file size limit for compaction must not
// protect big files: all written files are expected to be filtered out.
TEST_F(DocOperationTest, MaxFileSizeIgnoredWithFileFilter) {
  const size_t kMaxFileSize = 100_KB;
  const int kNumFilesToWrite = 20;
  const int kBigFileFrequency = 7;
  const int kExpectedBigFiles = (kNumFilesToWrite-1)/kBigFileFrequency + 1;

  ASSERT_OK(DisableCompactions());
  auto expected_files = GenerateFiles(kNumFilesToWrite, this, kBigFileFrequency);
  LOG(INFO) << "Files that would exist without compaction, if no filtering: " << expected_files;

  const auto files_before = rocksdb()->GetLiveFilesMetaData();
  ASSERT_EQ(kNumFilesToWrite, files_before.size());
  ASSERT_EQ(kExpectedBigFiles, CountBigFiles(files_before, kMaxFileSize));

  SetMaxFileSizeForCompaction(kMaxFileSize);

  // Use a filter factory that will expire every file.
  compaction_file_filter_factory_ =
      std::make_shared<DiscardUntilFileFilterFactory>(kAlwaysDiscard);

  ASSERT_OK(ReinitDBOptions());

  WaitCompactionsDone(rocksdb());

  const auto files_after = rocksdb()->GetLiveFilesMetaData();

  // We expect all files to be filtered and thus removed, no matter the size.
  ASSERT_EQ(0, files_after.size());
  auto statistics = rocksdb()->GetOptions().statistics;
  ASSERT_EQ(kNumFilesToWrite, statistics->getTickerCount(rocksdb::COMPACTION_FILES_FILTERED));
}
1236
1237
0
// A file filter expires the three earliest files (one big, two small). The remaining small
// files must still be compacted around the big files that are kept.
TEST_F(DocOperationTest, EarlyFilesFilteredBeforeBigFile) {
  const size_t kMaxFileSize = 100_KB;
  const int kNumFilesToWrite = 20;
  const int kBigFileFrequency = 7;
  const int kNumFilesToExpire = 3;
  const int kExpectedBigFiles = (kNumFilesToWrite-1)/kBigFileFrequency + 1;

  // Ensure that any number of files can trigger a compaction.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = 0;
  ASSERT_OK(DisableCompactions());

  // Expected files should be one fewer than if none were discarded, since the first "big file"
  // will be expired.
  const auto expected_files = GenerateFiles(kNumFilesToWrite, this, kBigFileFrequency) - 1;
  LOG(INFO) << "Expiring " << kNumFilesToExpire <<
      " files, first kept big file at " << kBigFileFrequency + 1;
  LOG(INFO) << "Expected files after compaction: " << expected_files;

  const auto files_before = rocksdb()->GetLiveFilesMetaData();
  ASSERT_EQ(kNumFilesToWrite, files_before.size());
  ASSERT_EQ(kExpectedBigFiles, CountBigFiles(files_before, kMaxFileSize));

  // Files will be ordered from latest to earliest, so select the nth file from the back.
  const auto& last_expired_file = files_before[files_before.size() - kNumFilesToExpire];
  auto last_to_discard = rocksdb::TableFileNameToNumber(last_expired_file.name);

  SetMaxFileSizeForCompaction(kMaxFileSize);

  // Use a filter factory that will expire exactly three files.
  compaction_file_filter_factory_ =
      std::make_shared<DiscardUntilFileFilterFactory>(last_to_discard);

  // Reinitialize the DB options with the file filter factory.
  ASSERT_OK(ReinitDBOptions());

  // Compactions will be kicked off as part of options reinitialization.
  WaitCompactionsDone(rocksdb());

  const auto files_after = rocksdb()->GetLiveFilesMetaData();

  // We expect three files to expire, one big file and two small ones.
  // The remaining small files should be compacted, leaving 5 files remaining.
  //
  // [large][small][small]....[large][small][small]...[large][small][small]...
  // becomes [compacted small][large][compacted small][large][compacted small]
  ASSERT_EQ(expected_files, files_after.size());

  auto statistics = rocksdb()->GetOptions().statistics;
  ASSERT_EQ(kNumFilesToExpire, statistics->getTickerCount(rocksdb::COMPACTION_FILES_FILTERED));
}
1285
1286
}  // namespace docdb
1287
}  // namespace yb