/Users/deen/code/yugabyte-db/src/yb/docdb/docdb_test_util.cc
Line | Count | Source |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/docdb_test_util.h" |
15 | | |
16 | | #include <algorithm> |
17 | | #include <memory> |
18 | | #include <sstream> |
19 | | |
20 | | #include "yb/common/hybrid_time.h" |
21 | | #include "yb/common/ql_value.h" |
22 | | |
23 | | #include "yb/docdb/doc_key.h" |
24 | | #include "yb/docdb/doc_reader.h" |
25 | | #include "yb/docdb/docdb-internal.h" |
26 | | #include "yb/docdb/docdb.h" |
27 | | #include "yb/docdb/docdb_compaction_filter.h" |
28 | | #include "yb/docdb/docdb_debug.h" |
29 | | #include "yb/docdb/in_mem_docdb.h" |
30 | | |
31 | | #include "yb/gutil/strings/substitute.h" |
32 | | |
33 | | #include "yb/rocksdb/db/filename.h" |
34 | | |
35 | | #include "yb/rocksutil/write_batch_formatter.h" |
36 | | |
37 | | #include "yb/util/bytes_formatter.h" |
38 | | #include "yb/util/env.h" |
39 | | #include "yb/util/path_util.h" |
40 | | #include "yb/util/scope_exit.h" |
41 | | #include "yb/util/status.h" |
42 | | #include "yb/util/string_trim.h" |
43 | | #include "yb/util/test_macros.h" |
44 | | #include "yb/util/tostring.h" |
45 | | |
46 | | using std::endl; |
47 | | using std::make_shared; |
48 | | using std::string; |
49 | | using std::unique_ptr; |
50 | | using std::vector; |
51 | | using std::stringstream; |
52 | | |
53 | | using strings::Substitute; |
54 | | |
55 | | using yb::util::ApplyEagerLineContinuation; |
56 | | using yb::FormatBytesAsStr; |
57 | | using yb::util::TrimStr; |
58 | | using yb::util::LeftShiftTextBlock; |
59 | | using yb::util::TrimCppComments; |
60 | | |
61 | | namespace yb { |
62 | | namespace docdb { |
63 | | |
64 | | namespace { |
65 | | |
66 | | class NonTransactionalStatusProvider: public TransactionStatusManager { |
67 | | public: |
68 | 0 | HybridTime LocalCommitTime(const TransactionId& id) override { |
69 | 0 | Fail(); |
70 | 0 | return HybridTime::kInvalid; |
71 | 0 | } |
72 | | |
73 | 0 | boost::optional<CommitMetadata> LocalCommitData(const TransactionId& id) override { |
74 | 0 | Fail(); |
75 | 0 | return boost::none; |
76 | 0 | } |
77 | | |
78 | 0 | void RequestStatusAt(const StatusRequest& request) override { |
79 | 0 | Fail(); |
80 | 0 | } |
81 | | |
82 | 0 | Result<TransactionMetadata> PrepareMetadata(const TransactionMetadataPB& pb) override { |
83 | 0 | Fail(); |
84 | 0 | return STATUS(Expired, ""); |
85 | 0 | } |
86 | | |
87 | 0 | int64_t RegisterRequest() override { |
88 | 0 | Fail(); |
89 | 0 | return 0; |
90 | 0 | } |
91 | | |
92 | 0 | void UnregisterRequest(int64_t) override { |
93 | 0 | Fail(); |
94 | 0 | } |
95 | | |
96 | 0 | void Abort(const TransactionId& id, TransactionStatusCallback callback) override { |
97 | 0 | Fail(); |
98 | 0 | } |
99 | | |
100 | 0 | void Cleanup(TransactionIdSet&& set) override { |
101 | 0 | Fail(); |
102 | 0 | } |
103 | | |
104 | | void FillPriorities( |
105 | 0 | boost::container::small_vector_base<std::pair<TransactionId, uint64_t>>* inout) override { |
106 | 0 | Fail(); |
107 | 0 | } |
108 | | |
109 | 379k | HybridTime MinRunningHybridTime() const override { |
110 | 379k | return HybridTime::kMax; |
111 | 379k | } |
112 | | |
113 | 0 | Result<HybridTime> WaitForSafeTime(HybridTime safe_time, CoarseTimePoint deadline) override { |
114 | 0 | return STATUS(NotSupported, "WaitForSafeTime not implemented"); |
115 | 0 | } |
116 | | |
117 | 0 | const TabletId& tablet_id() const override { |
118 | 0 | static TabletId result; |
119 | 0 | return result; |
120 | 0 | } |
121 | | |
122 | | private: |
123 | 0 | static void Fail() { |
124 | 0 | LOG(FATAL) << "Internal error: trying to get transaction status for non transactional table"; |
125 | 0 | } |
126 | | }; |
127 | | |
128 | | NonTransactionalStatusProvider kNonTransactionalStatusProvider; |
129 | | |
130 | | } // namespace |
131 | | |
132 | | const TransactionOperationContext kNonTransactionalOperationContext = { |
133 | | TransactionId::Nil(), &kNonTransactionalStatusProvider |
134 | | }; |
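 | | // Usage note (sketch): tests that read non-transactional data pass this context into read |
 | | // paths such as TEST_GetSubDocument (see DocDBLoadGenerator::PerformOperation below), so that |
 | | // any accidental attempt to resolve a real transaction status fails loudly via Fail() above. |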
135 | | |
136 | 283k | ValueRef GenRandomPrimitiveValue(RandomNumberGenerator* rng, QLValuePB* holder) { |
137 | 283k | static vector<string> kFruit = { |
138 | 283k | "Apple", |
139 | 283k | "Apricot", |
140 | 283k | "Avocado", |
141 | 283k | "Banana", |
142 | 283k | "Bilberry", |
143 | 283k | "Blackberry", |
144 | 283k | "Blackcurrant", |
145 | 283k | "Blood orange", |
146 | 283k | "Blueberry", |
147 | 283k | "Boysenberry", |
148 | 283k | "Cantaloupe", |
149 | 283k | "Cherimoya", |
150 | 283k | "Cherry", |
151 | 283k | "Clementine", |
152 | 283k | "Cloudberry", |
153 | 283k | "Coconut", |
154 | 283k | "Cranberry", |
155 | 283k | "Cucumber", |
156 | 283k | "Currant", |
157 | 283k | "Custard apple", |
158 | 283k | "Damson", |
159 | 283k | "Date", |
160 | 283k | "Decaisnea Fargesii", |
161 | 283k | "Dragonfruit", |
162 | 283k | "Durian", |
163 | 283k | "Elderberry", |
164 | 283k | "Feijoa", |
165 | 283k | "Fig", |
166 | 283k | "Goji berry", |
167 | 283k | "Gooseberry", |
168 | 283k | "Grape", |
169 | 283k | "Grapefruit", |
170 | 283k | "Guava", |
171 | 283k | "Honeyberry", |
172 | 283k | "Honeydew", |
173 | 283k | "Huckleberry", |
174 | 283k | "Jabuticaba", |
175 | 283k | "Jackfruit", |
176 | 283k | "Jambul", |
177 | 283k | "Jujube", |
178 | 283k | "Juniper berry", |
179 | 283k | "Kiwifruit", |
180 | 283k | "Kumquat", |
181 | 283k | "Lemon", |
182 | 283k | "Lime", |
183 | 283k | "Longan", |
184 | 283k | "Loquat", |
185 | 283k | "Lychee", |
186 | 283k | "Mandarine", |
187 | 283k | "Mango", |
188 | 283k | "Marionberry", |
189 | 283k | "Melon", |
190 | 283k | "Miracle fruit", |
191 | 283k | "Mulberry", |
192 | 283k | "Nance", |
193 | 283k | "Nectarine", |
194 | 283k | "Olive", |
195 | 283k | "Orange", |
196 | 283k | "Papaya", |
197 | 283k | "Passionfruit", |
198 | 283k | "Peach", |
199 | 283k | "Pear", |
200 | 283k | "Persimmon", |
201 | 283k | "Physalis", |
202 | 283k | "Pineapple", |
203 | 283k | "Plantain", |
204 | 283k | "Plum", |
205 | 283k | "Plumcot (or Pluot)", |
206 | 283k | "Pomegranate", |
207 | 283k | "Pomelo", |
208 | 283k | "Prune (dried plum)", |
209 | 283k | "Purple mangosteen", |
210 | 283k | "Quince", |
211 | 283k | "Raisin", |
212 | 283k | "Rambutan", |
213 | 283k | "Raspberry", |
214 | 283k | "Redcurrant", |
215 | 283k | "Salak", |
216 | 283k | "Salal berry", |
217 | 283k | "Salmonberry", |
218 | 283k | "Satsuma", |
219 | 283k | "Star fruit", |
220 | 283k | "Strawberry", |
221 | 283k | "Tamarillo", |
222 | 283k | "Tamarind", |
223 | 283k | "Tangerine", |
224 | 283k | "Tomato", |
225 | 283k | "Ugli fruit", |
226 | 283k | "Watermelon", |
227 | 283k | "Yuzu" |
228 | 283k | }; |
229 | 283k | switch ((*rng)() % 6) { |
230 | 47.0k | case 0: |
231 | 47.0k | *holder = QLValue::Primitive(static_cast<int64_t>((*rng)())); |
232 | 47.0k | return ValueRef(*holder); |
233 | 46.6k | case 1: { |
234 | 46.6k | string s; |
235 | 396k | for (size_t j = 0; j < (*rng)() % 50; ++j) { |
236 | 349k | s.push_back((*rng)() & 0xff); |
237 | 349k | } |
238 | 46.6k | *holder = QLValue::Primitive(s); |
239 | 46.6k | return ValueRef(*holder); |
240 | 0 | } |
241 | 47.1k | case 2: return ValueRef(ValueType::kNullLow); |
242 | 47.1k | case 3: return ValueRef(ValueType::kTrue); |
243 | 47.6k | case 4: return ValueRef(ValueType::kFalse); |
244 | 47.4k | case 5: { |
245 | 47.4k | *holder = QLValue::Primitive(kFruit[(*rng)() % kFruit.size()]); |
246 | 47.4k | return ValueRef(*holder); |
247 | 0 | } |
248 | 283k | } |
249 | 0 | LOG(FATAL) << "Should never get here"; |
250 | 0 | return ValueRef(ValueType::kNullLow); // to make the compiler happy |
251 | 283k | } |
252 | | |
253 | 54.5k | PrimitiveValue GenRandomPrimitiveValue(RandomNumberGenerator* rng) { |
254 | 54.5k | QLValuePB value_holder; |
255 | 54.5k | auto value_ref = GenRandomPrimitiveValue(rng, &value_holder); |
256 | 54.5k | if (value_ref.custom_value_type() != ValueType::kLowest) { |
257 | 27.2k | return PrimitiveValue(value_ref.custom_value_type()); |
258 | 27.2k | } |
259 | 27.3k | return PrimitiveValue::FromQLValuePB(value_holder, SortingType::kNotSpecified, false); |
260 | 54.5k | } |
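 | | // Example (sketch, not taken from an actual test): callers usually seed the generator so the |
 | | // produced values are deterministic; the seed below is illustrative. |
 | | // |
 | | //   RandomNumberGenerator rng; |
 | | //   rng.seed(12345);  // fixed seed => reproducible value sequence |
 | | //   PrimitiveValue pv = GenRandomPrimitiveValue(&rng); |
 | | //   QLValuePB holder; |
 | | //   ValueRef ref = GenRandomPrimitiveValue(&rng, &holder);  // holder owns any int64/string payload |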
261 | | |
262 | | // Generate a vector of random primitive values. |
263 | 14.2k | vector<PrimitiveValue> GenRandomPrimitiveValues(RandomNumberGenerator* rng, int max_num) { |
264 | 14.2k | vector<PrimitiveValue> result; |
265 | 54.6k | for (size_t i = 0; i < (*rng)() % (max_num + 1); ++i) { |
266 | 40.3k | result.push_back(GenRandomPrimitiveValue(rng)); |
267 | 40.3k | } |
268 | 14.2k | return result; |
269 | 14.2k | } |
270 | | |
271 | 26 | DocKey CreateMinimalDocKey(RandomNumberGenerator* rng, UseHash use_hash) { |
272 | 26 | return use_hash ? DocKey(static_cast<DocKeyHash>((*rng)()), std::vector<PrimitiveValue>(), |
273 | 14 | std::vector<PrimitiveValue>()) : DocKey(); |
274 | 26 | } |
275 | | |
276 | 9.85k | DocKey GenRandomDocKey(RandomNumberGenerator* rng, UseHash use_hash) { |
277 | 9.85k | if (use_hash) { |
278 | 4.40k | return DocKey( |
279 | 4.40k | static_cast<uint32_t>((*rng)()), // this is just a random value, not a hash function result |
280 | 4.40k | GenRandomPrimitiveValues(rng), |
281 | 4.40k | GenRandomPrimitiveValues(rng)); |
282 | 5.45k | } else { |
283 | 5.45k | return DocKey(GenRandomPrimitiveValues(rng)); |
284 | 5.45k | } |
285 | 9.85k | } |
286 | | |
287 | 21 | vector<DocKey> GenRandomDocKeys(RandomNumberGenerator* rng, UseHash use_hash, int num_keys) { |
288 | 21 | vector<DocKey> result; |
289 | 21 | result.push_back(CreateMinimalDocKey(rng, use_hash)); |
290 | 4.87k | for (int iteration = 0; iteration < num_keys; ++iteration) { |
291 | 4.85k | result.push_back(GenRandomDocKey(rng, use_hash)); |
292 | 4.85k | } |
293 | 21 | return result; |
294 | 21 | } |
295 | | |
296 | 5 | vector<SubDocKey> GenRandomSubDocKeys(RandomNumberGenerator* rng, UseHash use_hash, int num_keys) { |
297 | 5 | vector<SubDocKey> result; |
298 | 5 | result.push_back(SubDocKey(CreateMinimalDocKey(rng, use_hash), HybridTime((*rng)()))); |
299 | 5.00k | for (int iteration = 0; iteration < num_keys; ++iteration) { |
300 | 5.00k | result.push_back(SubDocKey(GenRandomDocKey(rng, use_hash))); |
301 | 19.1k | for (size_t i = 0; i < (*rng)() % (kMaxNumRandomSubKeys + 1); ++i) { |
302 | 14.1k | result.back().AppendSubKeysAndMaybeHybridTime(GenRandomPrimitiveValue(rng)); |
303 | 14.1k | } |
304 | 5.00k | const IntraTxnWriteId write_id = static_cast<IntraTxnWriteId>( |
305 | 5.00k | (*rng)() % 2 == 0 ? 0 : (*rng)() % 1000000); |
306 | 5.00k | result.back().set_hybrid_time(DocHybridTime(HybridTime((*rng)()), write_id)); |
307 | 5.00k | } |
308 | 5 | return result; |
309 | 5 | } |
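 | | // Example (sketch): generating a deterministic batch of keys for a randomized test. The counts |
 | | // are arbitrary, and UseHash::kTrue/kFalse assumes the usual strongly-typed-bool spelling. Note |
 | | // that both helpers prepend a minimal key, so the result holds num_keys + 1 entries. |
 | | // |
 | | //   RandomNumberGenerator rng; |
 | | //   rng.seed(1); |
 | | //   auto doc_keys = GenRandomDocKeys(&rng, UseHash::kTrue, /* num_keys = */ 100); |
 | | //   auto sub_doc_keys = GenRandomSubDocKeys(&rng, UseHash::kFalse, /* num_keys = */ 50); |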
310 | | // ------------------------------------------------------------------------------------------------ |
311 | | |
312 | 14 | void LogicalRocksDBDebugSnapshot::Capture(rocksdb::DB* rocksdb) { |
313 | 14 | kvs.clear(); |
314 | 14 | rocksdb::ReadOptions read_options; |
315 | 14 | auto iter = unique_ptr<rocksdb::Iterator>(rocksdb->NewIterator(read_options)); |
316 | 14 | iter->SeekToFirst(); |
317 | 102 | while (iter->Valid()) { |
318 | 88 | kvs.emplace_back(iter->key().ToBuffer(), iter->value().ToBuffer()); |
319 | 88 | iter->Next(); |
320 | 88 | } |
321 | | // Save the DocDB debug dump as a string so we can check that we've properly restored the snapshot |
322 | | // in RestoreTo. |
323 | 14 | docdb_debug_dump_str = DocDBDebugDumpToStr(rocksdb); |
324 | 14 | } |
325 | | |
326 | 22 | void LogicalRocksDBDebugSnapshot::RestoreTo(rocksdb::DB *rocksdb) const { |
327 | 22 | rocksdb::ReadOptions read_options; |
328 | 22 | rocksdb::WriteOptions write_options; |
329 | 22 | auto iter = unique_ptr<rocksdb::Iterator>(rocksdb->NewIterator(read_options)); |
330 | 22 | iter->SeekToFirst(); |
331 | 114 | while (iter->Valid()) { |
332 | 92 | ASSERT_OK(rocksdb->Delete(write_options, iter->key())); |
333 | 92 | iter->Next(); |
334 | 92 | } |
335 | 132 | for (const auto& kv : kvs) { |
336 | 132 | ASSERT_OK(rocksdb->Put(write_options, kv.first, kv.second)); |
337 | 132 | } |
338 | 22 | ASSERT_OK(FullyCompactDB(rocksdb)); |
339 | 22 | ASSERT_EQ(docdb_debug_dump_str, DocDBDebugDumpToStr(rocksdb)); |
340 | 22 | } |
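 | | // Example (sketch): a test can bracket a destructive operation with Capture/RestoreTo to return |
 | | // to a known state; rocksdb_ stands for whichever rocksdb::DB* the fixture exposes. |
 | | // |
 | | //   LogicalRocksDBDebugSnapshot snapshot; |
 | | //   snapshot.Capture(rocksdb_); |
 | | //   // ... perform the writes / compactions under test ... |
 | | //   snapshot.RestoreTo(rocksdb_);  // re-compacts and checks the debug dump matches the capture |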
341 | | |
342 | | // ------------------------------------------------------------------------------------------------ |
343 | | |
344 | | DocDBLoadGenerator::DocDBLoadGenerator(DocDBRocksDBFixture* fixture, |
345 | | const int num_doc_keys, |
346 | | const int num_unique_subkeys, |
347 | | const UseHash use_hash, |
348 | | const ResolveIntentsDuringRead resolve_intents, |
349 | | const int deletion_chance, |
350 | | const int max_nesting_level, |
351 | | const uint64 random_seed, |
352 | | const int verification_frequency) |
353 | | : fixture_(fixture), |
354 | | doc_keys_(GenRandomDocKeys(&random_, use_hash, num_doc_keys)), |
355 | | resolve_intents_(resolve_intents), |
356 | | possible_subkeys_(GenRandomPrimitiveValues(&random_, num_unique_subkeys)), |
357 | | iteration_(1), |
358 | | deletion_chance_(deletion_chance), |
359 | | max_nesting_level_(max_nesting_level), |
360 | 17 | verification_frequency_(verification_frequency) { |
361 | 17 | CHECK_GE(max_nesting_level_, 1); |
362 | | // Use a fixed seed so that tests are deterministic. |
363 | 17 | random_.seed(random_seed); |
364 | | |
365 | | // This is done so we can use VerifySnapshot with in_mem_docdb_. That should perform a "latest" |
366 | | // read. |
367 | 17 | in_mem_docdb_.SetCaptureHybridTime(HybridTime::kMax); |
368 | 17 | } |
369 | | |
370 | 16 | DocDBLoadGenerator::~DocDBLoadGenerator() = default; |
371 | | |
372 | 228k | void DocDBLoadGenerator::PerformOperation(bool compact_history) { |
373 | | // Increment the iteration right away so we can return from the function at any time. |
374 | 228k | const int current_iteration = iteration_; |
375 | 228k | ++iteration_; |
376 | | |
377 | 228k | DOCDB_DEBUG_LOG("Starting iteration i=$0", current_iteration); |
378 | 228k | auto dwb = fixture_->MakeDocWriteBatch(); |
379 | 228k | const auto& doc_key = RandomElementOf(doc_keys_, &random_); |
380 | 228k | const KeyBytes encoded_doc_key(doc_key.Encode()); |
381 | | |
382 | 228k | const SubDocument* current_doc = in_mem_docdb_.GetDocument(doc_key); |
383 | | |
384 | 228k | bool is_deletion = false; |
385 | 228k | if (current_doc != nullptr && |
386 | 228k | current_doc->value_type() != ValueType::kObject) { |
387 | | // The entire document is not an object, let's delete it. |
388 | 20.6k | is_deletion = true; |
389 | 20.6k | } |
390 | | |
391 | 228k | vector<PrimitiveValue> subkeys; |
392 | 228k | if (!is_deletion) { |
393 | | // Add up to (max_nesting_level_ - 1) subkeys. Combined with the document key itself, this |
394 | | // gives us the desired maximum nesting level. |
395 | 696k | for (size_t j = 0; j < random_() % max_nesting_level_; ++j) { |
396 | 513k | if (current_doc != nullptr && current_doc->value_type() != ValueType::kObject) { |
397 | | // We can't add any more subkeys because we've found a primitive subdocument. |
398 | 25.4k | break; |
399 | 25.4k | } |
400 | 488k | subkeys.emplace_back(RandomElementOf(possible_subkeys_, &random_)); |
401 | 488k | if (current_doc != nullptr) { |
402 | 213k | current_doc = current_doc->GetChild(subkeys.back()); |
403 | 213k | } |
404 | 488k | } |
405 | 207k | } |
406 | | |
407 | 228k | const DocPath doc_path(encoded_doc_key, subkeys); |
408 | 228k | QLValuePB value_holder; |
409 | 228k | const auto value = GenRandomPrimitiveValue(&random_, &value_holder); |
410 | 228k | const HybridTime hybrid_time(current_iteration); |
411 | 228k | last_operation_ht_ = hybrid_time; |
412 | | |
413 | 228k | if (random_() % deletion_chance_ == 0) { |
414 | 2.20k | is_deletion = true; |
415 | 2.20k | } |
416 | | |
417 | 228k | const bool doc_already_exists_in_mem = |
418 | 228k | in_mem_docdb_.GetDocument(doc_key) != nullptr; |
419 | | |
420 | 228k | if (is_deletion) { |
421 | 22.7k | DOCDB_DEBUG_LOG("Iteration $0: deleting doc path $1", current_iteration, doc_path.ToString()); |
422 | 22.7k | ASSERT_OK(dwb.DeleteSubDoc(doc_path, ReadHybridTime::Max())); |
423 | 22.7k | ASSERT_OK(in_mem_docdb_.DeleteSubDoc(doc_path)); |
424 | 205k | } else { |
425 | 205k | DOCDB_DEBUG_LOG("Iteration $0: setting value at doc path $1 to $2", |
426 | 205k | current_iteration, doc_path.ToString(), value.ToString()); |
427 | 205k | auto pv = value.custom_value_type() != ValueType::kLowest |
428 | 205k | ? PrimitiveValue(value.custom_value_type()) |
429 | 205k | : PrimitiveValue::FromQLValuePB(value_holder, SortingType::kNotSpecified); |
430 | 205k | ASSERT_OK(in_mem_docdb_.SetPrimitive(doc_path, pv)); |
431 | 205k | const auto set_primitive_status = dwb.SetPrimitive(doc_path, value); |
432 | 205k | if (!set_primitive_status.ok()) { |
433 | 0 | DocDBDebugDump(rocksdb(), std::cerr, StorageDbType::kRegular); |
434 | 0 | LOG(INFO) << "doc_path=" << doc_path.ToString(); |
435 | 0 | } |
436 | 205k | ASSERT_OK(set_primitive_status); |
437 | 205k | } |
438 | | |
439 | | // We perform our randomly chosen operation first, both on the production version of DocDB |
440 | | // sitting on top of RocksDB, and on the in-memory single-threaded debug version used for |
441 | | // validation. |
442 | 228k | ASSERT_OK(fixture_->WriteToRocksDB(dwb, hybrid_time)); |
443 | 228k | const SubDocument* const subdoc_from_mem = in_mem_docdb_.GetDocument(doc_key); |
444 | | |
445 | 228k | TransactionOperationContext txn_op_context = GetReadOperationTransactionContext(); |
446 | | |
447 | | // In case we are asked to compact history, we read the document from RocksDB before and after the |
448 | | // compaction, and expect to get the same result in both cases. |
449 | 457k | for (int do_compaction_now = 0; do_compaction_now <= compact_history; ++do_compaction_now) { |
450 | 228k | if (do_compaction_now) { |
451 | | // This will happen between the two iterations of the loop. If compact_history is false, |
452 | | // there is only one iteration and the compaction does not happen. |
453 | 73 | fixture_->FullyCompactHistoryBefore(hybrid_time); |
454 | 73 | } |
455 | 228k | SubDocKey sub_doc_key(doc_key); |
456 | 228k | auto encoded_sub_doc_key = sub_doc_key.EncodeWithoutHt(); |
457 | 228k | auto doc_from_rocksdb_opt = ASSERT_RESULT(TEST_GetSubDocument( |
458 | 228k | encoded_sub_doc_key, doc_db(), rocksdb::kDefaultQueryId, txn_op_context, |
459 | 228k | CoarseTimePoint::max() /* deadline */)); |
460 | 228k | if (is_deletion && ( |
461 | 22.7k | doc_path.num_subkeys() == 0 || // Deleted the entire sub-document, |
462 | 22.7k | !doc_already_exists_in_mem)) { // or the document did not exist in the first place. |
463 | | // In this case, after performing the deletion operation, we definitely should not see the |
464 | | // top-level document in RocksDB or in the in-memory database. |
465 | 21.1k | ASSERT_FALSE(doc_from_rocksdb_opt); |
466 | 21.1k | ASSERT_EQ(nullptr, subdoc_from_mem); |
467 | 207k | } else { |
468 | | // This is not a deletion, or we've deleted a sub-key from a document, but the top-level |
469 | | // document should still be there in RocksDB. |
470 | 207k | ASSERT_TRUE(doc_from_rocksdb_opt); |
471 | 207k | ASSERT_NE(nullptr, subdoc_from_mem); |
472 | | |
473 | 207k | ASSERT_EQ(*subdoc_from_mem, *doc_from_rocksdb_opt); |
474 | 207k | DOCDB_DEBUG_LOG("Retrieved a document from RocksDB: $0", doc_from_rocksdb_opt->ToString()); |
475 | 207k | ASSERT_STR_EQ_VERBOSE_TRIMMED(subdoc_from_mem->ToString(), doc_from_rocksdb_opt->ToString()); |
476 | 207k | } |
477 | 228k | } |
478 | | |
479 | 228k | if (current_iteration % verification_frequency_ == 0) { |
480 | | // in_mem_docdb_ has its captured_at() hybrid_time set to HybridTime::kMax, so the following |
481 | | // will result in checking the latest state of DocDB stored in RocksDB against in_mem_docdb_. |
482 | 4.56k | ASSERT_NO_FATALS(VerifySnapshot(in_mem_docdb_)) |
483 | 4.56k | << "Discrepancy between RocksDB-based and in-memory DocDB state found after iteration " |
484 | 4.56k | << current_iteration; |
485 | 2.28k | } |
486 | 228k | } |
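 | | // Example (sketch): the call pattern the randomized DocDB tests are expected to follow, assuming |
 | | // a test derived from DocDBRocksDBFixture; every numeric parameter below is illustrative. |
 | | // |
 | | //   DocDBLoadGenerator load_gen(this, /* num_doc_keys = */ 50, /* num_unique_subkeys = */ 500, |
 | | //                               UseHash::kTrue, ResolveIntentsDuringRead::kTrue, |
 | | //                               /* deletion_chance = */ 100, /* max_nesting_level = */ 10, |
 | | //                               /* random_seed = */ 12345, /* verification_frequency = */ 100); |
 | | //   for (int i = 1; i <= 1000; ++i) { |
 | | //     load_gen.PerformOperation(/* compact_history = */ i % 100 == 0); |
 | | //     if (i % 250 == 0) { |
 | | //       load_gen.FlushRocksDB(); |
 | | //       load_gen.CaptureDocDbSnapshot(); |
 | | //     } |
 | | //   } |
 | | //   load_gen.VerifyOldestSnapshot(); |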
487 | | |
488 | 10.6k | HybridTime DocDBLoadGenerator::last_operation_ht() const { |
489 | 10.6k | CHECK(last_operation_ht_.is_valid()); |
490 | 10.6k | return last_operation_ht_; |
491 | 10.6k | } |
492 | | |
493 | 314 | void DocDBLoadGenerator::FlushRocksDB() { |
494 | 314 | LOG(INFO) << "Forcing a RocksDB flush after hybrid_time " << last_operation_ht().value(); |
495 | 314 | ASSERT_OK(fixture_->FlushRocksDbAndWait()); |
496 | 314 | } |
497 | | |
498 | 1.28k | void DocDBLoadGenerator::CaptureDocDbSnapshot() { |
499 | | // Capture snapshots from time to time. |
500 | 1.28k | docdb_snapshots_.emplace_back(); |
501 | 1.28k | docdb_snapshots_.back().CaptureAt(doc_db(), HybridTime::kMax); |
502 | 1.28k | docdb_snapshots_.back().SetCaptureHybridTime(last_operation_ht_); |
503 | 1.28k | } |
504 | | |
505 | 47 | void DocDBLoadGenerator::VerifyOldestSnapshot() { |
506 | 47 | if (!docdb_snapshots_.empty()) { |
507 | 42 | ASSERT_NO_FATALS(VerifySnapshot(GetOldestSnapshot())); |
508 | 42 | } |
509 | 47 | } |
510 | | |
511 | 47 | void DocDBLoadGenerator::CheckIfOldestSnapshotIsStillValid(const HybridTime cleanup_ht) { |
512 | 47 | if (docdb_snapshots_.empty()) { |
513 | 0 | return; |
514 | 0 | } |
515 | | |
516 | 47 | const InMemDocDbState* latest_snapshot_before_ht = nullptr; |
517 | 1.21k | for (const auto& snapshot : docdb_snapshots_) { |
518 | 1.21k | const HybridTime snap_ht = snapshot.captured_at(); |
519 | 1.21k | if (snap_ht.CompareTo(cleanup_ht) < 0 && |
520 | 1.21k | (latest_snapshot_before_ht == nullptr || |
521 | 631 | latest_snapshot_before_ht->captured_at().CompareTo(snap_ht) < 0)) { |
522 | 631 | latest_snapshot_before_ht = &snapshot; |
523 | 631 | } |
524 | 1.21k | } |
525 | | |
526 | 47 | if (latest_snapshot_before_ht == nullptr) { |
527 | 15 | return; |
528 | 15 | } |
529 | | |
530 | 32 | const auto& snapshot = *latest_snapshot_before_ht; |
531 | 32 | LOG(INFO) << "Checking whether snapshot at hybrid_time " |
532 | 32 | << snapshot.captured_at().ToDebugString() |
533 | 32 | << " is no longer valid after history cleanup for hybrid_times before " |
534 | 32 | << cleanup_ht.ToDebugString() |
535 | 32 | << ", last operation hybrid_time: " << last_operation_ht() << "."; |
536 | 32 | RecordSnapshotDivergence(snapshot, cleanup_ht); |
537 | 32 | } |
538 | | |
539 | 2.74k | void DocDBLoadGenerator::VerifyRandomDocDbSnapshot() { |
540 | 2.74k | if (!docdb_snapshots_.empty()) { |
541 | 2.73k | const int snapshot_idx = NextRandomInt(narrow_cast<int>(docdb_snapshots_.size())); |
542 | 2.73k | ASSERT_NO_FATALS(VerifySnapshot(docdb_snapshots_[snapshot_idx])); |
543 | 2.73k | } |
544 | 2.74k | } |
545 | | |
546 | 47 | void DocDBLoadGenerator::RemoveSnapshotsBefore(HybridTime ht) { |
547 | 47 | docdb_snapshots_.erase( |
548 | 47 | std::remove_if(docdb_snapshots_.begin(), |
549 | 47 | docdb_snapshots_.end(), |
550 | 1.21k | [=](const InMemDocDbState& entry) { return entry.captured_at() < ht; }), |
551 | 47 | docdb_snapshots_.end()); |
552 | | // Double-check that there is no state corruption in any of the snapshots. Such corruption |
553 | | // happened when I (Mikhail) initially forgot to add the "erase" call above (as per the |
554 | | // "erase/remove" C++ idiom), and ended up with a bunch of moved-from objects still in the |
555 | | // snapshots array. |
556 | 588 | for (const auto& snapshot : docdb_snapshots_) { |
557 | 588 | snapshot.SanityCheck(); |
558 | 588 | } |
559 | 47 | } |
560 | | |
561 | 42 | const InMemDocDbState& DocDBLoadGenerator::GetOldestSnapshot() { |
562 | 42 | CHECK(!docdb_snapshots_.empty()); |
563 | 42 | return *std::min_element( |
564 | 42 | docdb_snapshots_.begin(), |
565 | 42 | docdb_snapshots_.end(), |
566 | 546 | [](const InMemDocDbState& a, const InMemDocDbState& b) { |
567 | 546 | return a.captured_at() < b.captured_at(); |
568 | 546 | }); |
569 | 42 | } |
570 | | |
571 | 5.06k | void DocDBLoadGenerator::VerifySnapshot(const InMemDocDbState& snapshot) { |
572 | 5.06k | const HybridTime snap_ht = snapshot.captured_at(); |
573 | 5.06k | InMemDocDbState flashback_state; |
574 | | |
575 | 5.06k | string details_msg; |
576 | 5.06k | { |
577 | 5.06k | stringstream details_ss; |
578 | 5.06k | details_ss << "After operation at hybrid_time " << last_operation_ht().value() << ": " |
579 | 5.06k | << "performing a flashback query at hybrid_time " << snap_ht.ToDebugString() << " " |
580 | 5.06k | << "(last operation's hybrid_time: " << last_operation_ht() << ") " |
581 | 5.06k | << "and verifying it against the snapshot captured at that hybrid_time."; |
582 | 5.06k | details_msg = details_ss.str(); |
583 | 5.06k | } |
584 | 5.06k | LOG(INFO) << details_msg; |
585 | | |
586 | 5.06k | flashback_state.CaptureAt(doc_db(), snap_ht); |
587 | 5.06k | const bool is_match = flashback_state.EqualsAndLogDiff(snapshot); |
588 | 5.06k | if (!is_match) { |
589 | 0 | LOG(ERROR) << details_msg << "\nDOCDB SNAPSHOT VERIFICATION FAILED, DOCDB STATE:"; |
590 | 0 | fixture_->DocDBDebugDumpToConsole(); |
591 | 0 | } |
592 | 10.1k | ASSERT_TRUE(is_match) << details_msg; |
593 | 5.06k | } |
594 | | |
595 | | void DocDBLoadGenerator::RecordSnapshotDivergence(const InMemDocDbState &snapshot, |
596 | 32 | const HybridTime cleanup_ht) { |
597 | 32 | InMemDocDbState flashback_state; |
598 | 32 | const auto snap_ht = snapshot.captured_at(); |
599 | 32 | flashback_state.CaptureAt(doc_db(), snap_ht); |
600 | 32 | if (!flashback_state.EqualsAndLogDiff(snapshot, /* log_diff = */ false)) { |
601 | | // Implicitly converting hybrid_times to ints. That's OK, because we're using small enough |
602 | | // integer values for hybrid_times. |
603 | 26 | divergent_snapshot_ht_and_cleanup_ht_.emplace_back(snapshot.captured_at().value(), |
604 | 26 | cleanup_ht.value()); |
605 | 26 | } |
606 | 32 | } |
607 | | |
608 | 228k | TransactionOperationContext DocDBLoadGenerator::GetReadOperationTransactionContext() { |
609 | 228k | if (resolve_intents_) { |
610 | 88.4k | return kNonTransactionalOperationContext; |
611 | 88.4k | } |
612 | 140k | return TransactionOperationContext(); |
613 | 228k | } |
614 | | |
615 | | // ------------------------------------------------------------------------------------------------ |
616 | | |
617 | 222 | void DocDBRocksDBFixture::AssertDocDbDebugDumpStrEq(const string &expected) { |
618 | 222 | const string debug_dump_str = TrimDocDbDebugDumpStr(DocDBDebugDumpToStr()); |
619 | 222 | const string expected_str = TrimDocDbDebugDumpStr(expected); |
620 | 222 | if (expected_str != debug_dump_str) { |
621 | 0 | auto expected_lines = StringSplit(expected_str, '\n'); |
622 | 0 | auto actual_lines = StringSplit(debug_dump_str, '\n'); |
623 | 0 | vector<size_t> mismatch_line_numbers; |
624 | 0 | for (size_t i = 0; i < std::min(expected_lines.size(), actual_lines.size()); ++i) { |
625 | 0 | if (expected_lines[i] != actual_lines[i]) { |
626 | 0 | mismatch_line_numbers.push_back(i + 1); |
627 | 0 | } |
628 | 0 | } |
629 | 0 | LOG(ERROR) << "Assertion failure" |
630 | 0 | << "\nExpected DocDB contents:\n\n" << expected_str << "\n" |
631 | 0 | << "\nActual DocDB contents:\n\n" << debug_dump_str << "\n" |
632 | 0 | << "\nExpected # of lines: " << expected_lines.size() |
633 | 0 | << ", actual # of lines: " << actual_lines.size() |
634 | 0 | << "\nLines not matching: " << AsString(mismatch_line_numbers) |
635 | 0 | << "\nPlease check if source files have trailing whitespace and remove it."; |
636 | |
637 | 0 | FAIL(); |
638 | 0 | } |
639 | 222 | } |
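 | | // Example (sketch): tests typically pass the expected contents as a raw string literal with one |
 | | // key/value entry per line; the entry below is purely illustrative. |
 | | // |
 | | //   AssertDocDbDebugDumpStrEq(R"#( |
 | | //       SubDocKey(DocKey([], ["mydockey", 123456]), [HT{ physical: 1000 }]) -> "value1" |
 | | //       )#"); |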
640 | | |
641 | 270 | void DocDBRocksDBFixture::FullyCompactHistoryBefore(HybridTime history_cutoff) { |
642 | 270 | LOG(INFO) << "Major-compacting history before hybrid_time " << history_cutoff; |
643 | 270 | SetHistoryCutoffHybridTime(history_cutoff); |
644 | 270 | auto se = ScopeExit([this] { |
645 | 270 | SetHistoryCutoffHybridTime(HybridTime::kMin); |
646 | 270 | }); |
647 | | |
648 | 270 | ASSERT_OK(FlushRocksDbAndWait()); |
649 | 270 | ASSERT_OK(FullyCompactDB(regular_db_.get())); |
650 | 270 | } |
651 | | |
652 | | void DocDBRocksDBFixture::MinorCompaction( |
653 | | HybridTime history_cutoff, |
654 | | size_t num_files_to_compact, |
655 | 24 | ssize_t start_index) { |
656 | | |
657 | 24 | ASSERT_OK(FlushRocksDbAndWait()); |
658 | 24 | SetHistoryCutoffHybridTime(history_cutoff); |
659 | 24 | auto se = ScopeExit([this] { |
660 | 24 | SetHistoryCutoffHybridTime(HybridTime::kMin); |
661 | 24 | }); |
662 | | |
663 | 24 | rocksdb::ColumnFamilyMetaData cf_meta; |
664 | 24 | regular_db_->GetColumnFamilyMetaData(&cf_meta); |
665 | | |
666 | 24 | vector<string> compaction_input_file_names; |
667 | 24 | vector<string> remaining_file_names; |
668 | | |
669 | 24 | size_t initial_num_files = 0; |
670 | 24 | { |
671 | 24 | const auto& files = cf_meta.levels[0].files; |
672 | 24 | initial_num_files = files.size(); |
673 | 24 | ASSERT_LE(num_files_to_compact, files.size()); |
674 | 24 | vector<string> file_names; |
675 | 106 | for (const auto& sst_meta : files) { |
676 | 106 | file_names.push_back(sst_meta.name); |
677 | 106 | } |
678 | 24 | SortByKey(file_names.begin(), file_names.end(), rocksdb::TableFileNameToNumber); |
679 | | |
680 | 24 | if (start_index < 0) { |
681 | 20 | start_index = file_names.size() - num_files_to_compact; |
682 | 20 | } |
683 | | |
684 | 130 | for (size_t i = 0; i < file_names.size(); ++i106 ) { |
685 | 106 | if (implicit_cast<size_t>(start_index) <= i && |
686 | 106 | compaction_input_file_names.size() < num_files_to_compact62 ) { |
687 | 54 | compaction_input_file_names.push_back(file_names[i]); |
688 | 54 | } else { |
689 | 52 | remaining_file_names.push_back(file_names[i]); |
690 | 52 | } |
691 | 106 | } |
692 | 48 | ASSERT_EQ(num_files_to_compact, compaction_input_file_names.size()) |
693 | 48 | << "Tried to add " << num_files_to_compact << " files starting with index " << start_index |
694 | 48 | << ", ended up adding " << yb::ToString(compaction_input_file_names) |
695 | 48 | << " and leaving " << yb::ToString(remaining_file_names) << " out. All files: " |
696 | 48 | << yb::ToString(file_names); |
697 | | |
698 | 24 | LOG(INFO) << "Minor-compacting history before hybrid_time " << history_cutoff << ":\n" |
699 | 24 | << " files being compacted: " << yb::ToString(compaction_input_file_names) << "\n" |
700 | 24 | << " other files: " << yb::ToString(remaining_file_names); |
701 | | |
702 | 24 | ASSERT_OK(regular_db_->CompactFiles( |
703 | 24 | rocksdb::CompactionOptions(), |
704 | 24 | compaction_input_file_names, |
705 | 24 | /* output_level */ 0)); |
706 | 24 | const auto sstables_after_compaction = SSTableFileNames(); |
707 | 24 | LOG(INFO) << "SSTable files after compaction: " << sstables_after_compaction.size() |
708 | 24 | << " (" << yb::ToString(sstables_after_compaction) << ")"; |
709 | 52 | for (const auto& remaining_file : remaining_file_names) { |
710 | 104 | ASSERT_TRUE( |
711 | 104 | std::find(sstables_after_compaction.begin(), sstables_after_compaction.end(), |
712 | 104 | remaining_file) != sstables_after_compaction.end() |
713 | 104 | ) << "File " << remaining_file << " not found in file list after compaction: " |
714 | 104 | << yb::ToString(sstables_after_compaction) << ", even though none of these files were " |
715 | 104 | << "supposed to be compacted: " << yb::ToString(remaining_file_names); |
716 | 52 | } |
717 | 24 | } |
718 | | |
719 | 24 | regular_db_->GetColumnFamilyMetaData(&cf_meta); |
720 | 24 | vector<string> files_after_compaction; |
721 | 76 | for (const auto& sst_meta : cf_meta.levels[0].files) { |
722 | 76 | files_after_compaction.push_back(sst_meta.name); |
723 | 76 | } |
724 | 24 | const int64_t expected_resulting_num_files = initial_num_files - num_files_to_compact + 1; |
725 | 48 | ASSERT_EQ(expected_resulting_num_files, |
726 | 48 | static_cast<int64_t>(cf_meta.levels[0].files.size())) |
727 | 48 | << "Files after compaction: " << yb::ToString(files_after_compaction); |
728 | 24 | } |
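 | | // Example (sketch): a history-cleanup test might flush several write batches into separate L0 |
 | | // files and then minor-compact only the newest two; the numbers are illustrative. |
 | | // |
 | | //   // ... perform writes and call FlushRocksDbAndWait() a few times to accumulate SST files ... |
 | | //   MinorCompaction(HybridTime(5000), /* num_files_to_compact = */ 2, |
 | | //                   /* start_index = */ -1);  // a negative start index selects the newest files |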
729 | | |
730 | 24 | size_t DocDBRocksDBFixture::NumSSTableFiles() { |
731 | 24 | rocksdb::ColumnFamilyMetaData cf_meta; |
732 | 24 | regular_db_->GetColumnFamilyMetaData(&cf_meta); |
733 | 24 | return cf_meta.levels[0].files.size(); |
734 | 24 | } |
735 | | |
736 | 24 | StringVector DocDBRocksDBFixture::SSTableFileNames() { |
737 | 24 | rocksdb::ColumnFamilyMetaData cf_meta; |
738 | 24 | regular_db_->GetColumnFamilyMetaData(&cf_meta); |
739 | 24 | StringVector files; |
740 | 76 | for (const auto& sstable_meta : cf_meta.levels[0].files) { |
741 | 76 | files.push_back(sstable_meta.name); |
742 | 76 | } |
743 | 24 | SortByKey(files.begin(), files.end(), rocksdb::TableFileNameToNumber); |
744 | 24 | return files; |
745 | 24 | } |
746 | | |
747 | 26 | Status DocDBRocksDBFixture::FormatDocWriteBatch(const DocWriteBatch &dwb, string* dwb_str) { |
748 | 26 | WriteBatchFormatter formatter; |
749 | 26 | rocksdb::WriteBatch rocksdb_write_batch; |
750 | 26 | RETURN_NOT_OK(PopulateRocksDBWriteBatch(dwb, &rocksdb_write_batch)); |
751 | 26 | RETURN_NOT_OK(rocksdb_write_batch.Iterate(&formatter)); |
752 | 26 | *dwb_str = formatter.str(); |
753 | 26 | return Status::OK(); |
754 | 26 | } |
755 | | |
756 | 292 | Status FullyCompactDB(rocksdb::DB* rocksdb) { |
757 | 292 | rocksdb::CompactRangeOptions compact_range_options; |
758 | 292 | return rocksdb->CompactRange(compact_range_options, nullptr, nullptr); |
759 | 292 | } |
760 | | |
761 | 119 | Status DocDBRocksDBFixture::InitRocksDBDir() { |
762 | 119 | string test_dir; |
763 | 119 | RETURN_NOT_OK(Env::Default()->GetTestDirectory(&test_dir)); |
764 | 119 | rocksdb_dir_ = JoinPathSegments(test_dir, StringPrintf("mytestdb-%d", rand())); |
765 | 119 | CHECK(!rocksdb_dir_.empty()); // Check twice before we recursively delete anything. |
766 | 119 | CHECK_NE(rocksdb_dir_, "/"); |
767 | 119 | RETURN_NOT_OK(Env::Default()->DeleteRecursively(rocksdb_dir_)); |
768 | 119 | RETURN_NOT_OK(Env::Default()->DeleteRecursively(IntentsDBDir())); |
769 | 119 | return Status::OK(); |
770 | 119 | } |
771 | | |
772 | 0 | string DocDBRocksDBFixture::tablet_id() { |
773 | 0 | return "mytablet"; |
774 | 0 | } |
775 | | |
776 | 119 | Status DocDBRocksDBFixture::InitRocksDBOptions() { |
777 | 119 | RETURN_NOT_OK(InitCommonRocksDBOptionsForTests()); |
778 | 119 | return Status::OK(); |
779 | 119 | } |
780 | | |
781 | 444 | string TrimDocDbDebugDumpStr(const string& debug_dump_str) { |
782 | 444 | return TrimStr(ApplyEagerLineContinuation(LeftShiftTextBlock(TrimCppComments(debug_dump_str)))); |
783 | 444 | } |
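 | | // Example (sketch): the trimming lets expected strings carry indentation and C++ comments |
 | | // without affecting the comparison; the exact input/output below is illustrative only. |
 | | // |
 | | //   TrimDocDbDebugDumpStr(R"#( |
 | | //       SubDocKey(DocKey([], ["k"]), [HT{ physical: 1000 }]) -> "v" |
 | | //   )#"); |
 | | //   // => SubDocKey(DocKey([], ["k"]), [HT{ physical: 1000 }]) -> "v" |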
784 | | |
785 | | } // namespace docdb |
786 | | } // namespace yb |