YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/intent_aware_iterator.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/intent_aware_iterator.h"
15
16
#include <future>
17
18
#include "yb/common/doc_hybrid_time.h"
19
#include "yb/common/hybrid_time.h"
20
#include "yb/common/transaction.h"
21
22
#include "yb/docdb/docdb_fwd.h"
23
#include "yb/docdb/shared_lock_manager_fwd.h"
24
#include "yb/docdb/conflict_resolution.h"
25
#include "yb/docdb/doc_key.h"
26
#include "yb/docdb/doc_kv_util.h"
27
#include "yb/docdb/docdb-internal.h"
28
#include "yb/docdb/docdb_rocksdb_util.h"
29
#include "yb/docdb/intent.h"
30
#include "yb/docdb/key_bounds.h"
31
#include "yb/docdb/transaction_dump.h"
32
#include "yb/docdb/value.h"
33
#include "yb/docdb/value_type.h"
34
35
#include "yb/util/bytes_formatter.h"
36
#include "yb/util/debug-util.h"
37
#include "yb/util/result.h"
38
#include "yb/util/status_format.h"
39
#include "yb/util/trace.h"
40
41
using namespace std::literals;
42
43
namespace yb {
44
namespace docdb {
45
46
namespace {
47
48
7.90M
void GetIntentPrefixForKeyWithoutHt(const Slice& key, KeyBytes* out) {
49
7.90M
  out->Clear();
50
  // Since caller guarantees that key_bytes doesn't have hybrid time, we can simply use it
51
  // to get prefix for all related intents.
52
7.90M
  out->AppendRawBytes(key);
53
7.90M
}
54
55
0
// Value-returning convenience wrapper around GetIntentPrefixForKeyWithoutHt(key, out).
KeyBytes GetIntentPrefixForKeyWithoutHt(const Slice& key) {
  KeyBytes prefix;
  GetIntentPrefixForKeyWithoutHt(key, &prefix);
  return prefix;
}
60
61
324M
void AppendEncodedDocHt(const Slice& encoded_doc_ht, KeyBytes* key_bytes) {
62
324M
  key_bytes->AppendValueType(ValueType::kHybridTime);
63
324M
  key_bytes->AppendRawBytes(encoded_doc_ht);
64
324M
}
65
66
const char kStrongWriteTail[] = {
67
    ValueTypeAsChar::kIntentTypeSet,
68
    static_cast<char>(IntentTypeSet({IntentType::kStrongWrite}).ToUIntPtr()) };
69
70
const Slice kStrongWriteTailSlice = Slice(kStrongWriteTail, sizeof(kStrongWriteTail));
71
72
char kEmptyKeyStrongWriteTail[] = {
73
    ValueTypeAsChar::kGroupEnd,
74
    ValueTypeAsChar::kIntentTypeSet,
75
    static_cast<char>(IntentTypeSet({IntentType::kStrongWrite}).ToUIntPtr()) };
76
77
const Slice kEmptyKeyStrongWriteTailSlice =
78
    Slice(kEmptyKeyStrongWriteTail, sizeof(kEmptyKeyStrongWriteTail));
79
80
45.0M
// Returns the tail that turns `key` into a strong-write intent seek key.
// An empty key gets the variant that starts with kGroupEnd.
Slice StrongWriteSuffix(const KeyBytes& key) {
  if (key.empty()) {
    return kEmptyKeyStrongWriteTailSlice;
  }
  return kStrongWriteTailSlice;
}
83
84
// We are not interested in weak and read intents here.
85
// So could just skip them.
86
2.72M
void AppendStrongWrite(KeyBytes* out) {
87
2.72M
  out->AppendRawBytes(StrongWriteSuffix(*out));
88
2.72M
}
89
90
} // namespace
91
92
namespace {
93
94
struct DecodeStrongWriteIntentResult {
95
  Slice intent_prefix;
96
  Slice intent_value;
97
  DocHybridTime intent_time;
98
  DocHybridTime value_time;
99
  IntentTypeSet intent_types;
100
101
  // Whether this intent from the same transaction as specified in context.
102
  bool same_transaction = false;
103
104
0
  std::string ToString() const {
105
0
    return Format("{ intent_prefix: $0 intent_value: $1 intent_time: $2 value_time: $3 "
106
0
                  "same_transaction: $4 intent_types: $5 }",
107
0
                  intent_prefix.ToDebugHexString(), intent_value.ToDebugHexString(), intent_time,
108
0
                  value_time, same_transaction, intent_types);
109
0
  }
110
111
  // Returns the upper limit for the "value time" of an intent in order for the intent to be visible
112
  // in the read results. The "value time" is defined as follows:
113
  //   - For uncommitted transactions, the "value time" is the time when the intent was written.
114
  //     Note that same_transaction or in_txn_limit could only be set for uncommited transactions.
115
  //   - For committed transactions, the "value time" is the commit time.
116
  //
117
  // The logic here is as follows:
118
  //   - When a transaction is reading its own intents, the in_txn_limit allows a statement to
119
  //     avoid seeing its own partial results. This is necessary for statements such as INSERT ...
120
  //     SELECT to avoid reading rows that the same statement generated and going into an infinite
121
  //     loop.
122
  //   - If an intent's hybrid time is greater than the tablet's local limit, then this intent
123
  //     cannot lead to a read restart and we only need to see it if its commit time is less than or
124
  //     equal to read_time.
125
  //   - If an intent's hybrid time is <= than the tablet's local limit, then we cannot claim that
126
  //     the intent was written after the read transaction began based on the local limit, and we
127
  //     must compare the intent's commit time with global_limit and potentially perform a read
128
  //     restart, because the transaction that wrote the intent might have been committed before our
129
  //     read transaction begin.
130
1.62M
  HybridTime MaxAllowedValueTime(const ReadHybridTime& read_time) const {
131
1.62M
    if (same_transaction) {
132
1.55M
      return read_time.in_txn_limit;
133
1.55M
    }
134
66.5k
    return intent_time.hybrid_time() > read_time.local_limit
135
66.4k
        ? read_time.read : read_time.global_limit;
136
66.5k
  }
137
};
138
139
0
std::ostream& operator<<(std::ostream& out, const DecodeStrongWriteIntentResult& result) {
140
0
  return out << result.ToString();
141
0
}
142
143
// Decodes intent based on intent_iterator and its transaction commit time if intent is a strong
144
// write intent, intent is not for row locking, and transaction is already committed at specified
145
// time or is current transaction.
146
// Returns HybridTime::kMin as value_time otherwise.
147
// For current transaction returns intent record hybrid time as value_time.
148
// Consumes intent from value_slice leaving only value itself.
149
Result<DecodeStrongWriteIntentResult> DecodeStrongWriteIntent(
150
    HybridTime global_limit,
151
    const TransactionOperationContext& txn_op_context,
152
    rocksdb::Iterator* intent_iter,
153
2.17M
    TransactionStatusCache* transaction_status_cache) {
154
2.17M
  DecodeStrongWriteIntentResult result;
155
2.17M
  auto decoded_intent_key = VERIFY_RESULT(DecodeIntentKey(intent_iter->key()));
156
2.17M
  result.intent_prefix = decoded_intent_key.intent_prefix;
157
2.17M
  result.intent_types = decoded_intent_key.intent_types;
158
2.17M
  if (result.intent_types.Test(IntentType::kStrongWrite)) {
159
2.05M
    auto intent_value = intent_iter->value();
160
2.05M
    auto decoded_intent_value = VERIFY_RESULT(DecodeIntentValue(intent_value));
161
162
2.05M
    auto decoded_txn_id = decoded_intent_value.transaction_id;
163
2.05M
    auto decoded_subtxn_id = decoded_intent_value.subtransaction_id;
164
165
2.05M
    result.intent_value = decoded_intent_value.body;
166
2.05M
    result.intent_time = decoded_intent_key.doc_ht;
167
2.05M
    result.same_transaction = decoded_txn_id == txn_op_context.transaction_id;
168
169
    // By setting the value time to kMin, we ensure the caller ignores this intent. This is true
170
    // because the caller is skipping all intents written before or at the same time as
171
    // intent_dht_from_same_txn_ or resolved_intent_txn_dht_, which of course are greater than or
172
    // equal to DocHybridTime::kMin.
173
2.05M
    if (result.intent_value.starts_with(ValueTypeAsChar::kRowLock)) {
174
246k
      result.value_time = DocHybridTime::kMin;
175
1.80M
    } else if (result.same_transaction) {
176
1.68M
      if (txn_op_context.subtransaction.aborted.Test(decoded_subtxn_id)) {
177
        // If this intent is from the same transaction, we can check the aborted set from this
178
        // txn_op_context to see whether the intent is still live. If not, mask it from the caller.
179
408
        result.value_time = DocHybridTime::kMin;
180
1.68M
      } else {
181
1.68M
        result.value_time = decoded_intent_key.doc_ht;
182
1.68M
      }
183
119k
    } else if (result.intent_time.hybrid_time() > global_limit) {
184
1.42k
      VTRACE(1, "Ignoring intent from a different txn written after read.global_limit");
185
1.42k
      result.value_time = DocHybridTime::kMin;
186
117k
    } else {
187
117k
      auto commit_data = VERIFY_RESULT(transaction_status_cache->GetCommitData(decoded_txn_id));
188
117k
      auto commit_ht = commit_data.commit_ht;
189
117k
      auto aborted_subtxn_set = commit_data.aborted_subtxn_set;
190
117k
      auto is_aborted_subtxn = aborted_subtxn_set.Test(decoded_subtxn_id);
191
117k
      result.value_time = commit_ht == HybridTime::kMin || is_aborted_subtxn
192
47.6k
          ? DocHybridTime::kMin
193
70.1k
          : DocHybridTime(commit_ht, decoded_intent_value.write_id);
194
65
      VLOG(4) << "Transaction id: " << decoded_txn_id
195
65
              << ", subtransaction id: " << decoded_subtxn_id
196
65
              << ", value time: " << result.value_time
197
65
              << ", value: " << result.intent_value.ToDebugHexString()
198
65
              << ", aborted subtxn set: " << aborted_subtxn_set.ToString();
199
117k
    }
200
119k
  } else {
201
119k
    result.value_time = DocHybridTime::kMin;
202
119k
  }
203
2.17M
  return result;
204
2.17M
}
205
206
// Given that key is well-formed DocDB encoded key, checks if it is an intent key for the same key
207
// as intent_prefix. If key is not well-formed DocDB encoded key, result could be true or false.
208
1.59M
bool IsIntentForTheSameKey(const Slice& key, const Slice& intent_prefix) {
209
1.59M
  return key.starts_with(intent_prefix) &&
210
143k
         key.size() > intent_prefix.size() &&
211
143k
         IntentValueType(key[intent_prefix.size()]);
212
1.59M
}
213
214
2
std::string DebugDumpKeyToStr(const Slice &key) {
215
2
  return key.ToDebugString() + " (" + SubDocKey::DebugSliceToString(key) + ")";
216
2
}
217
218
1
std::string DebugDumpKeyToStr(const KeyBytes &key) {
219
1
  return DebugDumpKeyToStr(key.AsSlice());
220
1
}
221
222
34.4M
bool DebugHasHybridTime(const Slice& subdoc_key_encoded) {
223
34.4M
  SubDocKey subdoc_key;
224
34.4M
  CHECK(subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(subdoc_key_encoded).ok());
225
34.4M
  return subdoc_key.has_hybrid_time();
226
34.4M
}
227
228
24.8M
// Encodes `value` in DocDB format as a DocHybridTime with the maximal write id (kMaxWriteId).
std::string EncodeHybridTime(HybridTime value) {
  const DocHybridTime doc_ht(value, kMaxWriteId);
  return doc_ht.EncodedInDocDbFormat();
}
231
232
} // namespace
233
234
// Creates an iterator over the regular DB that also takes intents into account as of read_time.
// The intents DB iterator is created only when all of the following hold:
//   - txn_op_context is set;
//   - the transaction status manager reports a minimum running hybrid time (not kMax);
//   - that time is below read_time.global_limit.
// Otherwise no running transaction can be relevant, and only the regular DB iterator is created.
IntentAwareIterator::IntentAwareIterator(
    const DocDB& doc_db,
    const rocksdb::ReadOptions& read_opts,
    CoarseTimePoint deadline,
    const ReadHybridTime& read_time,
    const TransactionOperationContext& txn_op_context)
    : read_time_(read_time),
      encoded_read_time_read_(EncodeHybridTime(read_time_.read)),
      encoded_read_time_local_limit_(EncodeHybridTime(read_time_.local_limit)),
      encoded_read_time_global_limit_(EncodeHybridTime(read_time_.global_limit)),
      // Slice into one of the encoded members above. NOTE(review): this relies on those members
      // being declared before this one in the header — confirm.
      encoded_read_time_regular_limit_(
          read_time_.local_limit > read_time_.read ? Slice(encoded_read_time_local_limit_)
                                                   : Slice(encoded_read_time_read_)),
      txn_op_context_(txn_op_context),
      transaction_status_cache_(txn_op_context_, read_time, deadline) {
  VTRACE(1, __func__);
  VLOG(4) << "IntentAwareIterator, read_time: " << read_time
          << ", txn_op_context: " << txn_op_context_;

  if (txn_op_context) {
    VTRACE(1, "Checking MinRunningTime");
    const auto min_running_ht = txn_op_context.txn_status_manager->MinRunningHybridTime();
    if (min_running_ht != HybridTime::kMax && min_running_ht < read_time.global_limit) {
      intent_iter_ = docdb::CreateRocksDBIterator(doc_db.intents,
                                                  doc_db.key_bounds,
                                                  docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
                                                  boost::none,
                                                  rocksdb::kDefaultQueryId,
                                                  nullptr /* file_filter */,
                                                  &intent_upperbound_);
    } else {
      VLOG(4) << "No relevant transactions running: "
              << "min_running_ht=" << min_running_ht << ", "
              << "global_limit=" << read_time.global_limit;
    }
  }
  VTRACE(2, "Done Checking MinRunningTime");
  // WARNING: It is important for the regular DB iterator to be created after the intents DB
  // iterator. Otherwise consistency could break, for example in the following scenario:
  // 1) Transaction T1 is committed with value v1 for k1, but not yet applied to regular DB.
  // 2) Client reads v1 for k1.
  // 3) Regular DB iterator is created on a regular DB snapshot containing no values for k1.
  // 4) Transaction T1 is applied, k1->v1 is written into regular DB, intent k1->v1 is deleted.
  // 5) Intents DB iterator is created on an intents DB snapshot containing no intents for k1.
  // 6) Client reads no values for k1.
  iter_ = BoundedRocksDbIterator(doc_db.regular, read_opts, doc_db.key_bounds);
  VTRACE(2, "Created iterator");
}
282
283
1.44k
void IntentAwareIterator::Seek(const DocKey &doc_key) {
284
1.44k
  Seek(doc_key.Encode());
285
1.44k
}
286
287
19.5M
void IntentAwareIterator::Seek(const Slice& key) {
288
1.80k
  VLOG(4) << "Seek(" << SubDocKey::DebugSliceToString(key) << ")";
289
19.5M
  DOCDB_DEBUG_SCOPE_LOG(
290
0
      key.ToDebugString(),
291
0
      std::bind(&IntentAwareIterator::DebugDump, this));
292
19.5M
  if (!status_.ok()) {
293
0
    return;
294
0
  }
295
296
19.5M
  ROCKSDB_SEEK(&iter_, key);
297
19.5M
  skip_future_records_needed_ = true;
298
299
19.5M
  if (intent_iter_.Initialized()) {
300
2.72M
    seek_intent_iter_needed_ = SeekIntentIterNeeded::kSeek;
301
2.72M
    GetIntentPrefixForKeyWithoutHt(key, &seek_key_buffer_);
302
2.72M
    AppendStrongWrite(&seek_key_buffer_);
303
2.72M
  }
304
19.5M
}
305
306
34.1M
void IntentAwareIterator::SeekForward(const Slice& key) {
307
34.1M
  KeyBytes key_bytes;
308
  // Reserve space for key plus kMaxBytesPerEncodedHybridTime + 1 bytes for SeekForward() below to
309
  // avoid extra realloc while appending the read time.
310
34.1M
  key_bytes.Reserve(key.size() + kMaxBytesPerEncodedHybridTime + 1);
311
34.1M
  key_bytes.AppendRawBytes(key);
312
34.1M
  SeekForward(&key_bytes);
313
34.1M
}
314
315
290M
// Moves forward to *key_bytes (never backward).
// The regular iterator is sought to the key extended with the encoded global read limit. The
// buffer is truncated back to its original size afterwards, so the caller sees it unchanged.
// When intents are in use and no error occurred, a forward seek of the intents iterator is
// planned as well.
void IntentAwareIterator::SeekForward(KeyBytes* key_bytes) {
  VLOG(4) << "SeekForward(" << SubDocKey::DebugSliceToString(*key_bytes) << ")";
  DOCDB_DEBUG_SCOPE_LOG(
      SubDocKey::DebugSliceToString(*key_bytes),
      std::bind(&IntentAwareIterator::DebugDump, this));
  if (!status_.ok()) {
    return;
  }

  const size_t original_size = key_bytes->size();
  AppendEncodedDocHt(encoded_read_time_global_limit_, key_bytes);
  SeekForwardRegular(*key_bytes);
  key_bytes->Truncate(original_size);

  if (!intent_iter_.Initialized() || !status_.ok()) {
    return;
  }
  UpdatePlannedIntentSeekForward(
      *key_bytes, StrongWriteSuffix(*key_bytes), /* use_suffix_for_prefix= */ false);
}
333
334
void IntentAwareIterator::UpdatePlannedIntentSeekForward(const Slice& key,
335
                                                         const Slice& suffix,
336
48.4M
                                                         bool use_suffix_for_prefix) {
337
48.4M
  if (seek_intent_iter_needed_ != SeekIntentIterNeeded::kNoNeed &&
338
0
      seek_key_buffer_.AsSlice().GreaterOrEqual(key, suffix)) {
339
0
    return;
340
0
  }
341
48.4M
  seek_key_buffer_.Clear();
342
48.4M
  seek_key_buffer_.AppendRawBytes(key);
343
48.4M
  seek_key_buffer_.AppendRawBytes(suffix);
344
48.5M
  if (seek_intent_iter_needed_ == SeekIntentIterNeeded::kNoNeed) {
345
48.5M
    seek_intent_iter_needed_ = SeekIntentIterNeeded::kSeekForward;
346
48.5M
  }
347
48.4M
  seek_key_prefix_ = seek_key_buffer_.AsSlice();
348
48.4M
  if (!use_suffix_for_prefix) {
349
42.3M
    seek_key_prefix_.remove_suffix(suffix.size());
350
42.3M
  }
351
48.4M
}
352
353
// TODO: If TTL rows are ever supported on subkeys, this may need to change appropriately.
354
// Otherwise, this function might seek past the TTL merge record, but not the original
355
// record for the actual subkey.
356
23.0M
void IntentAwareIterator::SeekPastSubKey(const Slice& key) {
357
18.4E
  VLOG(4) << "SeekPastSubKey(" << SubDocKey::DebugSliceToString(key) << ")";
358
23.0M
  if (!status_.ok()) {
359
0
    return;
360
0
  }
361
362
23.0M
  docdb::SeekPastSubKey(key, &iter_);
363
23.0M
  skip_future_records_needed_ = true;
364
23.0M
  if (intent_iter_.Initialized() && status_.ok()) {
365
    // Skip all intents for subdoc_key.
366
858k
    char kSuffix = ValueTypeAsChar::kGreaterThanIntentType;
367
858k
    UpdatePlannedIntentSeekForward(key, Slice(&kSuffix, 1));
368
858k
  }
369
23.0M
}
370
371
41.2M
// Moves the regular iterator past the whole subdocument rooted at *key_bytes. When intents are
// in use and no error occurred, it also plans a forward seek of the intents iterator to
// *key_bytes followed by kMaxByte.
void IntentAwareIterator::SeekOutOfSubDoc(KeyBytes* key_bytes) {
  VLOG(4) << "SeekOutOfSubDoc(" << SubDocKey::DebugSliceToString(*key_bytes) << ")";
  if (!status_.ok()) {
    return;
  }

  docdb::SeekOutOfSubKey(key_bytes, &iter_);
  skip_future_records_needed_ = true;
  if (!intent_iter_.Initialized() || !status_.ok()) {
    return;
  }
  // See comment for SubDocKey::AdvanceOutOfSubDoc.
  const char kSuffix = ValueTypeAsChar::kMaxByte;
  UpdatePlannedIntentSeekForward(*key_bytes, Slice(&kSuffix, 1));
}
385
386
40.2M
void IntentAwareIterator::SeekOutOfSubDoc(const Slice& key) {
387
40.2M
  KeyBytes key_bytes;
388
  // Reserve space for key + 1 byte for docdb::SeekOutOfSubKey() above to avoid extra realloc while
389
  // appending kMaxByte.
390
40.2M
  key_bytes.Reserve(key.size() + 1);
391
40.2M
  key_bytes.AppendRawBytes(key);
392
40.2M
  SeekOutOfSubDoc(&key_bytes);
393
40.2M
}
394
395
441M
bool IntentAwareIterator::HasCurrentEntry() {
396
441M
  return iter_valid_ || resolved_intent_state_ == ResolvedIntentState::kValid;
397
441M
}
398
399
0
void IntentAwareIterator::SeekToLastDocKey() {
400
0
  iter_.SeekToLast();
401
0
  SkipFutureRecords(Direction::kBackward);
402
0
  if (intent_iter_.Initialized()) {
403
0
    ResetIntentUpperbound();
404
0
    intent_iter_.SeekToLast();
405
0
    SeekToSuitableIntent<Direction::kBackward>();
406
0
    seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed;
407
0
    skip_future_intents_needed_ = false;
408
0
  }
409
0
  if (HasCurrentEntry()) {
410
0
    SeekToLatestDocKeyInternal();
411
0
  }
412
0
}
413
414
// Copies value into *out when the caller supplied an output location. A null `out` is silently
// ignored, which makes the corresponding output parameter optional.
template <class T>
void Assign(const T& value, T* out) {
  if (out == nullptr) {
    return;
  }
  *out = value;
}
420
421
// If we reach a different key, stop seeking.
422
Status IntentAwareIterator::NextFullValue(
423
    DocHybridTime* latest_record_ht,
424
    Slice* result_value,
425
615
    Slice* final_key) {
426
615
  if (!latest_record_ht || !result_value)
427
0
    return STATUS(Corruption, "The arguments latest_record_ht and "
428
615
                              "result_value cannot be null pointers.");
429
615
  RETURN_NOT_OK(status_);
430
615
  Slice v;
431
615
  if (!valid() || !IsMergeRecord(v = value())) {
432
525
    auto key_data = VERIFY_RESULT(FetchKey());
433
525
    Assign(key_data.key, final_key);
434
525
    Assign(key_data.write_time, latest_record_ht);
435
525
    *result_value = v;
436
525
    return status_;
437
90
  }
438
439
90
  *latest_record_ht = DocHybridTime::kMin;
440
90
  const auto key_data = VERIFY_RESULT(FetchKey());
441
90
  auto key = key_data.key;
442
90
  const size_t key_size = key.size();
443
90
  bool found_record = false;
444
445
195
  while ((found_record = iter_.Valid()) &&  // as long as we're pointing to a record
446
195
         (key = iter_.key()).starts_with(key_data.key) &&  // with the same key we started with
447
195
         key[key_size] == ValueTypeAsChar::kHybridTime && // whose key ends with a HT
448
195
         IsMergeRecord(v = iter_.value())) { // and whose value is a merge record
449
105
    iter_.Next(); // advance the iterator
450
105
  }
451
452
90
  if (found_record) {
453
90
    *result_value = v;
454
90
    *latest_record_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key));
455
90
    Assign(key, final_key);
456
90
  }
457
458
90
  found_record = false;
459
90
  if (intent_iter_.Initialized()) {
460
0
    while ((found_record = IsIntentForTheSameKey(intent_iter_.key(), key_data.key)) &&
461
0
           IsMergeRecord(v = intent_iter_.value())) {
462
0
      intent_iter_.Next();
463
0
    }
464
0
    DocHybridTime doc_ht;
465
0
    if (found_record && !(key = intent_iter_.key()).empty() &&
466
0
        (doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key))) >= *latest_record_ht) {
467
0
      *latest_record_ht = doc_ht;
468
0
      *result_value = v;
469
0
      Assign(key, final_key);
470
0
    }
471
0
  }
472
473
90
  if (*latest_record_ht == DocHybridTime::kMin) {
474
0
    iter_valid_ = false;
475
0
  }
476
90
  return status_;
477
90
}
478
479
413k
bool IntentAwareIterator::PreparePrev(const Slice& key) {
480
18.4E
  VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key) << ")";
481
482
413k
  ROCKSDB_SEEK(&iter_, key);
483
484
413k
  if (iter_.Valid()) {
485
413k
    iter_.Prev();
486
18.4E
  } else {
487
18.4E
    iter_.SeekToLast();
488
18.4E
  }
489
413k
  SkipFutureRecords(Direction::kBackward);
490
491
413k
  if (intent_iter_.Initialized()) {
492
0
    ResetIntentUpperbound();
493
0
    ROCKSDB_SEEK(&intent_iter_, GetIntentPrefixForKeyWithoutHt(key));
494
0
    if (intent_iter_.Valid()) {
495
0
      intent_iter_.Prev();
496
0
    } else {
497
0
      intent_iter_.SeekToLast();
498
0
    }
499
0
    SeekToSuitableIntent<Direction::kBackward>();
500
0
    seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed;
501
0
    skip_future_intents_needed_ = false;
502
0
  }
503
504
413k
  return HasCurrentEntry();
505
413k
}
506
507
0
void IntentAwareIterator::PrevSubDocKey(const KeyBytes& key_bytes) {
508
0
  if (PreparePrev(key_bytes)) {
509
0
    SeekToLatestSubDocKeyInternal();
510
0
  }
511
0
}
512
513
0
void IntentAwareIterator::PrevDocKey(const DocKey& doc_key) {
514
0
  PrevDocKey(doc_key.Encode().AsSlice());
515
0
}
516
517
413k
void IntentAwareIterator::PrevDocKey(const Slice& encoded_doc_key) {
518
413k
  if (PreparePrev(encoded_doc_key)) {
519
413k
    SeekToLatestDocKeyInternal();
520
413k
  }
521
413k
}
522
523
413k
// Returns the key of the current entry when iterating in descending order. This is the regular
// iterator's key when that entry is chosen, otherwise the resolved intent key prefix.
// Requires HasCurrentEntry() (DCHECKed).
Slice IntentAwareIterator::LatestSubDocKey() {
  DCHECK(HasCurrentEntry())
      << "Expected iter_valid(" << iter_valid_ << ") || resolved_intent_state_("
      << resolved_intent_state_ << ") == ResolvedIntentState::kValid";
  if (IsEntryRegular(/* descending */ true)) {
    return iter_.key();
  }
  return resolved_intent_key_prefix_.AsSlice();
}
530
531
0
void IntentAwareIterator::SeekToLatestSubDocKeyInternal() {
532
0
  auto subdockey_slice = LatestSubDocKey();
533
534
  // Strip the hybrid time and seek the slice.
535
0
  auto doc_ht = DocHybridTime::DecodeFromEnd(&subdockey_slice);
536
0
  if (!doc_ht.ok()) {
537
0
    status_ = doc_ht.status();
538
0
    return;
539
0
  }
540
0
  subdockey_slice.remove_suffix(1);
541
0
  Seek(subdockey_slice);
542
0
}
543
544
413k
void IntentAwareIterator::SeekToLatestDocKeyInternal() {
545
413k
  auto subdockey_slice = LatestSubDocKey();
546
547
  // Seek to the first key for row containing found subdockey.
548
413k
  auto dockey_size = DocKey::EncodedSize(subdockey_slice, DocKeyPart::kWholeDocKey);
549
413k
  if (!dockey_size.ok()) {
550
0
    status_ = dockey_size.status();
551
0
    return;
552
0
  }
553
413k
  Seek(Slice(subdockey_slice.data(), *dockey_size));
554
413k
}
555
556
440M
void IntentAwareIterator::SeekIntentIterIfNeeded() {
557
440M
  if (seek_intent_iter_needed_ == SeekIntentIterNeeded::kNoNeed || !status_.ok()) {
558
390M
    return;
559
390M
  }
560
50.6M
  status_ = SetIntentUpperbound();
561
50.6M
  if (!status_.ok()) {
562
0
    return;
563
0
  }
564
50.6M
  switch (seek_intent_iter_needed_) {
565
0
    case SeekIntentIterNeeded::kNoNeed:
566
0
      break;
567
2.67M
    case SeekIntentIterNeeded::kSeek:
568
400
      VLOG(4) << __func__ << ", seek: " << SubDocKey::DebugSliceToString(seek_key_buffer_);
569
2.67M
      ROCKSDB_SEEK(&intent_iter_, seek_key_buffer_);
570
2.67M
      SeekToSuitableIntent<Direction::kForward>();
571
2.67M
      seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed;
572
2.67M
      return;
573
48.0M
    case SeekIntentIterNeeded::kSeekForward:
574
48.0M
      SeekForwardToSuitableIntent();
575
48.0M
      seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed;
576
48.0M
      return;
577
0
  }
578
0
  FATAL_INVALID_ENUM_VALUE(SeekIntentIterNeeded, seek_intent_iter_needed_);
579
0
}
580
581
440M
bool IntentAwareIterator::valid() {
582
440M
  if (skip_future_records_needed_) {
583
405M
    SkipFutureRecords(Direction::kForward);
584
405M
  }
585
440M
  SeekIntentIterIfNeeded();
586
440M
  if (skip_future_intents_needed_) {
587
353M
    SkipFutureIntents();
588
353M
  }
589
440M
  return !status_.ok() || HasCurrentEntry();
590
440M
}
591
592
545M
bool IntentAwareIterator::IsEntryRegular(bool descending) {
593
545M
  if (PREDICT_FALSE(!iter_valid_)) {
594
3.02M
    return false;
595
3.02M
  }
596
542M
  if (resolved_intent_state_ == ResolvedIntentState::kValid) {
597
295k
    return (iter_.key().compare(resolved_intent_sub_doc_key_encoded_) < 0) != descending;
598
295k
  }
599
542M
  return true;
600
542M
}
601
602
294M
// Returns the current entry's key (without hybrid time), its write time, and whether it comes
// from the reading transaction's own intent. Also advances max_seen_ht_ to the entry's time.
// Fails if status_ is not OK or the regular key's hybrid time cannot be decoded.
Result<FetchKeyResult> IntentAwareIterator::FetchKey() {
  RETURN_NOT_OK(status_);
  FetchKeyResult result;
  if (!IsEntryRegular()) {
    // Current entry is the resolved intent.
    DCHECK_EQ(ResolvedIntentState::kValid, resolved_intent_state_);
    result.key = resolved_intent_key_prefix_.AsSlice();
    result.write_time = GetIntentDocHybridTime();
    result.same_transaction = ResolvedIntentFromSameTransaction();
    max_seen_ht_.MakeAtLeast(resolved_intent_txn_dht_.hybrid_time());
  } else {
    // Current entry is a regular record: strip the encoded hybrid time and its marker byte.
    result.key = iter_.key();
    result.write_time = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&result.key));
    DCHECK(result.key.ends_with(ValueTypeAsChar::kHybridTime)) << result.key.ToDebugString();
    result.key.remove_suffix(1);
    result.same_transaction = false;
    max_seen_ht_.MakeAtLeast(result.write_time.hybrid_time());
  }
  VLOG(4) << "Fetched key " << SubDocKey::DebugSliceToString(result.key)
          << ", regular: " << IsEntryRegular()
          << ", with time: " << result.write_time
          << ", while read bounds are: " << read_time_;

  YB_TRANSACTION_DUMP(
      Read, txn_op_context_ ? txn_op_context_.txn_status_manager->tablet_id() : TabletId(),
      txn_op_context_ ? txn_op_context_.transaction_id : TransactionId::Nil(),
      read_time_, result.write_time, result.same_transaction,
      result.key.size(), result.key, value().size(), value());

  return result;
}
632
633
250M
// Returns the value of the current entry: the resolved intent's value when the intent is the
// current entry, otherwise the regular iterator's value.
Slice IntentAwareIterator::value() {
  if (!IsEntryRegular()) {
    DCHECK_EQ(ResolvedIntentState::kValid, resolved_intent_state_);
    VLOG(4) << "IntentAwareIterator::value() returning resolved_intent_value_: "
            << resolved_intent_value_.AsSlice().ToDebugHexString();
    return resolved_intent_value_;
  }
  VLOG(4) << "IntentAwareIterator::value() returning iter_.value(): "
          << iter_.value().ToDebugHexString() << " or " << FormatSliceAsStr(iter_.value());
  return iter_.value();
}
645
646
324M
void IntentAwareIterator::SeekForwardRegular(const Slice& slice) {
647
18.4E
  VLOG(4) << "SeekForwardRegular(" << SubDocKey::DebugSliceToString(slice) << ")";
648
324M
  docdb::SeekForward(slice, &iter_);
649
324M
  skip_future_records_needed_ = true;
650
324M
}
651
652
372M
bool IntentAwareIterator::SatisfyBounds(const Slice& slice) {
653
372M
  return upperbound_.empty() || slice.compare(upperbound_) <= 0;
654
372M
}
655
656
2.17M
void IntentAwareIterator::ProcessIntent() {
657
2.17M
  auto decode_result = DecodeStrongWriteIntent(
658
2.17M
      read_time_.global_limit, txn_op_context_, &intent_iter_, &transaction_status_cache_);
659
2.17M
  if (!decode_result.ok()) {
660
0
    status_ = decode_result.status();
661
0
    return;
662
0
  }
663
18.4E
  VLOG(4) << "Intent decode: " << DebugIntentKeyToString(intent_iter_.key())
664
18.4E
          << " => " << intent_iter_.value().ToDebugHexString() << ", result: " << *decode_result;
665
2.17M
  DOCDB_DEBUG_LOG(
666
2.17M
      "resolved_intent_txn_dht_: $0 value_time: $1 read_time: $2",
667
2.17M
      resolved_intent_txn_dht_.ToString(),
668
2.17M
      decode_result->value_time.ToString(),
669
2.17M
      read_time_.ToString());
670
1.93M
  auto resolved_intent_time = decode_result->same_transaction ? intent_dht_from_same_txn_
671
238k
                                                              : resolved_intent_txn_dht_;
672
  // If we already resolved intent that is newer that this one, we should ignore current
673
  // intent because we are interested in the most recent intent only.
674
2.17M
  if (decode_result->value_time <= resolved_intent_time) {
675
552k
    return;
676
552k
  }
677
678
  // Ignore intent past read limit.
679
1.62M
  if (decode_result->value_time.hybrid_time() > decode_result->MaxAllowedValueTime(read_time_)) {
680
47
    return;
681
47
  }
682
683
1.62M
  if (resolved_intent_state_ == ResolvedIntentState::kNoIntent) {
684
1.62M
    resolved_intent_key_prefix_.Reset(decode_result->intent_prefix);
685
1.62M
    auto prefix = CurrentPrefix();
686
1.62M
    if (!decode_result->intent_prefix.starts_with(prefix)) {
687
0
      resolved_intent_state_ = ResolvedIntentState::kInvalidPrefix;
688
1.62M
    } else if (!SatisfyBounds(decode_result->intent_prefix)) {
689
0
      resolved_intent_state_ = ResolvedIntentState::kNoIntent;
690
1.62M
    } else {
691
1.62M
      resolved_intent_state_ = ResolvedIntentState::kValid;
692
1.62M
    }
693
1.62M
  }
694
1.62M
  if (decode_result->same_transaction) {
695
1.55M
    intent_dht_from_same_txn_ = decode_result->value_time;
696
    // We set resolved_intent_txn_dht_ to maximum possible time (time higher than read_time_.read
697
    // will cause read restart or will be ignored if higher than read_time_.global_limit) in
698
    // order to ignore intents/values from other transactions. But we save origin intent time into
699
    // intent_dht_from_same_txn_, so we can compare time of intents for the same key from the same
700
    // transaction and select the latest one.
701
1.55M
    resolved_intent_txn_dht_ = DocHybridTime(read_time_.read, kMaxWriteId);
702
66.6k
  } else {
703
66.6k
    resolved_intent_txn_dht_ = decode_result->value_time;
704
66.6k
  }
705
1.62M
  resolved_intent_value_.Reset(decode_result->intent_value);
706
1.62M
}
707
708
1.62M
void IntentAwareIterator::UpdateResolvedIntentSubDocKeyEncoded() {
709
1.62M
  resolved_intent_sub_doc_key_encoded_.Reset(resolved_intent_key_prefix_.AsSlice());
710
1.62M
  resolved_intent_sub_doc_key_encoded_.AppendValueType(ValueType::kHybridTime);
711
1.62M
  resolved_intent_sub_doc_key_encoded_.AppendHybridTime(resolved_intent_txn_dht_);
712
18.4E
  VLOG(4) << "Resolved intent SubDocKey: "
713
18.4E
          << DebugDumpKeyToStr(resolved_intent_sub_doc_key_encoded_);
714
1.62M
}
715
716
53.2M
void IntentAwareIterator::SeekForwardToSuitableIntent() {
717
18.4E
  VLOG(4) << __func__ << "(" << DebugDumpKeyToStr(seek_key_buffer_) << ")";
718
719
53.2M
  DOCDB_DEBUG_SCOPE_LOG(seek_key_buffer_.ToString(),
720
0
                        std::bind(&IntentAwareIterator::DebugDump, this));
721
53.2M
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent &&
722
1.91M
      resolved_intent_key_prefix_.CompareTo(seek_key_prefix_) >= 0) {
723
18.4E
    VLOG(4) << __func__ << ", has suitable " << AsString(resolved_intent_state_) << " intent: "
724
18.4E
            << DebugDumpKeyToStr(resolved_intent_key_prefix_);
725
326k
    return;
726
326k
  }
727
728
52.8M
  if (VLOG_IS_ON(4)) {
729
0
    if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
730
0
      VLOG(4) << __func__ << ", has NOT suitable " << AsString(resolved_intent_state_)
731
0
              << " intent: " << DebugDumpKeyToStr(resolved_intent_key_prefix_);
732
0
    }
733
734
0
    if (intent_iter_.Valid()) {
735
0
      VLOG(4) << __func__ << ", current position: " << DebugDumpKeyToStr(intent_iter_.key());
736
0
    } else {
737
0
      VLOG(4) << __func__ << ", iterator invalid";
738
0
    }
739
0
  }
740
741
52.8M
  docdb::SeekForward(seek_key_buffer_.AsSlice(), &intent_iter_);
742
52.8M
  SeekToSuitableIntent<Direction::kForward>();
743
52.8M
}
744
745
template<Direction direction>
746
101M
void IntentAwareIterator::SeekToSuitableIntent() {
747
101M
  DOCDB_DEBUG_SCOPE_LOG(/* msg */ "", std::bind(&IntentAwareIterator::DebugDump, this));
748
101M
  resolved_intent_state_ = ResolvedIntentState::kNoIntent;
749
101M
  resolved_intent_txn_dht_ = DocHybridTime::kMin;
750
101M
  intent_dht_from_same_txn_ = DocHybridTime::kMin;
751
101M
  auto prefix = CurrentPrefix();
752
753
  // Find latest suitable intent for the first SubDocKey having suitable intents.
754
104M
  while (intent_iter_.Valid()) {
755
5.01M
    auto intent_key = intent_iter_.key();
756
5.01M
    if (intent_key[0] == ValueTypeAsChar::kTransactionId) {
757
      // If the intent iterator ever enters the transaction metadata and reverse index region, skip
758
      // past it.
759
457k
      switch (direction) {
760
457k
        case Direction::kForward: {
761
457k
          static const std::array<char, 1> kAfterTransactionId{ValueTypeAsChar::kTransactionId + 1};
762
457k
          static const Slice kAfterTxnRegion(kAfterTransactionId);
763
457k
          intent_iter_.Seek(kAfterTxnRegion);
764
457k
          break;
765
0
        }
766
0
        case Direction::kBackward:
767
0
          intent_upperbound_keybytes_.Clear();
768
0
          intent_upperbound_keybytes_.AppendValueType(ValueType::kTransactionId);
769
0
          intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
770
0
          intent_iter_.SeekToLast();
771
0
          break;
772
457k
      }
773
457k
      continue;
774
457k
    }
775
45
    VLOG(4) << "Intent found: " << DebugIntentKeyToString(intent_key)
776
45
            << ", resolved state: " << yb::ToString(resolved_intent_state_);
777
4.55M
    if (resolved_intent_state_ != ResolvedIntentState::kNoIntent &&
778
        // Only scan intents for the first SubDocKey having suitable intents.
779
1.59M
        !IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)) {
780
1.45M
      break;
781
1.45M
    }
782
3.09M
    if (!intent_key.starts_with(prefix) || !SatisfyBounds(intent_key)) {
783
924k
      break;
784
924k
    }
785
2.17M
    ProcessIntent();
786
2.17M
    if (!status_.ok()) {
787
0
      return;
788
0
    }
789
2.17M
    switch (direction) {
790
2.17M
      case Direction::kForward:
791
2.17M
        intent_iter_.Next();
792
2.17M
        break;
793
0
      case Direction::kBackward:
794
0
        intent_iter_.Prev();
795
0
        break;
796
2.17M
    }
797
2.17M
  }
798
101M
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
799
1.62M
    UpdateResolvedIntentSubDocKeyEncoded();
800
1.62M
  }
801
101M
}
Unexecuted instantiation: _ZN2yb5docdb19IntentAwareIterator20SeekToSuitableIntentILNS0_9DirectionE1EEEvv
_ZN2yb5docdb19IntentAwareIterator20SeekToSuitableIntentILNS0_9DirectionE0EEEvv
Line
Count
Source
746
101M
void IntentAwareIterator::SeekToSuitableIntent() {
747
101M
  DOCDB_DEBUG_SCOPE_LOG(/* msg */ "", std::bind(&IntentAwareIterator::DebugDump, this));
748
101M
  resolved_intent_state_ = ResolvedIntentState::kNoIntent;
749
101M
  resolved_intent_txn_dht_ = DocHybridTime::kMin;
750
101M
  intent_dht_from_same_txn_ = DocHybridTime::kMin;
751
101M
  auto prefix = CurrentPrefix();
752
753
  // Find latest suitable intent for the first SubDocKey having suitable intents.
754
104M
  while (intent_iter_.Valid()) {
755
5.01M
    auto intent_key = intent_iter_.key();
756
5.01M
    if (intent_key[0] == ValueTypeAsChar::kTransactionId) {
757
      // If the intent iterator ever enters the transaction metadata and reverse index region, skip
758
      // past it.
759
457k
      switch (direction) {
760
457k
        case Direction::kForward: {
761
457k
          static const std::array<char, 1> kAfterTransactionId{ValueTypeAsChar::kTransactionId + 1};
762
457k
          static const Slice kAfterTxnRegion(kAfterTransactionId);
763
457k
          intent_iter_.Seek(kAfterTxnRegion);
764
457k
          break;
765
0
        }
766
0
        case Direction::kBackward:
767
0
          intent_upperbound_keybytes_.Clear();
768
0
          intent_upperbound_keybytes_.AppendValueType(ValueType::kTransactionId);
769
0
          intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
770
0
          intent_iter_.SeekToLast();
771
0
          break;
772
457k
      }
773
457k
      continue;
774
457k
    }
775
45
    VLOG(4) << "Intent found: " << DebugIntentKeyToString(intent_key)
776
45
            << ", resolved state: " << yb::ToString(resolved_intent_state_);
777
4.55M
    if (resolved_intent_state_ != ResolvedIntentState::kNoIntent &&
778
        // Only scan intents for the first SubDocKey having suitable intents.
779
1.59M
        !IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)) {
780
1.45M
      break;
781
1.45M
    }
782
3.09M
    if (!intent_key.starts_with(prefix) || !SatisfyBounds(intent_key)) {
783
924k
      break;
784
924k
    }
785
2.17M
    ProcessIntent();
786
2.17M
    if (!status_.ok()) {
787
0
      return;
788
0
    }
789
2.17M
    switch (direction) {
790
2.17M
      case Direction::kForward:
791
2.17M
        intent_iter_.Next();
792
2.17M
        break;
793
0
      case Direction::kBackward:
794
0
        intent_iter_.Prev();
795
0
        break;
796
2.17M
    }
797
2.17M
  }
798
101M
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
799
1.62M
    UpdateResolvedIntentSubDocKeyEncoded();
800
1.62M
  }
801
101M
}
802
803
0
void IntentAwareIterator::DebugDump() {
804
0
  bool is_valid = valid();
805
0
  LOG(INFO) << ">> IntentAwareIterator dump";
806
0
  LOG(INFO) << "iter_.Valid(): " << iter_.Valid();
807
0
  if (iter_.Valid()) {
808
0
    LOG(INFO) << "iter_.key(): " << DebugDumpKeyToStr(iter_.key());
809
0
  }
810
0
  if (intent_iter_.Initialized()) {
811
0
    LOG(INFO) << "intent_iter_.Valid(): " << intent_iter_.Valid();
812
0
    if (intent_iter_.Valid()) {
813
0
      LOG(INFO) << "intent_iter_.key(): " << intent_iter_.key().ToDebugHexString();
814
0
    }
815
0
  }
816
0
  LOG(INFO) << "resolved_intent_state_: " << yb::ToString(resolved_intent_state_);
817
0
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
818
0
    LOG(INFO) << "resolved_intent_sub_doc_key_encoded_: "
819
0
              << DebugDumpKeyToStr(resolved_intent_sub_doc_key_encoded_);
820
0
  }
821
0
  LOG(INFO) << "valid(): " << is_valid;
822
0
  if (valid()) {
823
0
    auto key_data = FetchKey();
824
0
    if (key_data.ok()) {
825
0
      LOG(INFO) << "key(): " << DebugDumpKeyToStr(key_data->key)
826
0
                << ", doc_ht: " << key_data->write_time;
827
0
    } else {
828
0
      LOG(INFO) << "key(): fetch failed: " << key_data.status();
829
0
    }
830
0
  }
831
0
  LOG(INFO) << "<< IntentAwareIterator dump";
832
0
}
833
834
// Seeks the intent iterator to the intent prefix of `key_without_ht`. If the resolved intent is
// valid and its key prefix equals that prefix exactly, records its time in max_seen_ht_ and
// returns GetIntentDocHybridTime(). Otherwise returns DocHybridTime::kInvalid. Propagates
// status_ if seeking failed.
Result<DocHybridTime>
IntentAwareIterator::FindMatchingIntentRecordDocHybridTime(const Slice& key_without_ht) {
  VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key_without_ht) << ")";
  GetIntentPrefixForKeyWithoutHt(key_without_ht, &seek_key_buffer_);
  seek_key_prefix_ = seek_key_buffer_.AsSlice();

  SeekForwardToSuitableIntent();
  RETURN_NOT_OK(status_);

  const bool exact_match =
      resolved_intent_state_ == ResolvedIntentState::kValid &&
      resolved_intent_key_prefix_.CompareTo(seek_key_buffer_) == 0;
  if (!exact_match) {
    return DocHybridTime::kInvalid;
  }

  max_seen_ht_.MakeAtLeast(resolved_intent_txn_dht_.hybrid_time());
  return GetIntentDocHybridTime();
}
853
854
// If the regular iterator's current key, with its kHybridTime marker and encoded DocHybridTime
// stripped, equals `key_without_ht`, decodes that DocHybridTime, records it in max_seen_ht_ and
// returns it. Otherwise returns DocHybridTime::kInvalid. Propagates decoding errors.
Result<DocHybridTime>
IntentAwareIterator::GetMatchingRegularRecordDocHybridTime(
    const Slice& key_without_ht) {
  size_t other_encoded_ht_size = 0;
  RETURN_NOT_OK(CheckHybridTimeSizeAndValueType(iter_.key(), &other_encoded_ht_size));

  Slice iter_key_without_ht = iter_.key();
  // One byte for the kHybridTime value type plus the encoded DocHybridTime itself.
  iter_key_without_ht.remove_suffix(1 + other_encoded_ht_size);
  if (key_without_ht != iter_key_without_ht) {
    return DocHybridTime::kInvalid;
  }

  DocHybridTime doc_ht;
  RETURN_NOT_OK(DecodeHybridTimeFromEndOfKey(iter_.key(), &doc_ht));
  max_seen_ht_.MakeAtLeast(doc_ht.hybrid_time());
  return doc_ht;
}
869
870
// Looks for the smallest hybrid time strictly greater than `min_hybrid_time` among two candidates
// for `key_without_ht`:
//   (a) the intent found by FindMatchingIntentRecordDocHybridTime() (when intents are in use);
//   (b) the regular record reached by seeking iter_ to (key_without_ht, min_hybrid_time), stepping
//       back one record (or to the last record if the seek ran off the end), and skipping future
//       records.
// Returns HybridTime::kInvalid if the iterator is not valid(). If neither candidate qualifies,
// returns a default-constructed HybridTime (presumably kInvalid; confirm in hybrid_time.h).
// NOTE(review): only one candidate of each kind is examined; confirm this matches callers'
// notion of "oldest".
Result<HybridTime> IntentAwareIterator::FindOldestRecord(
    const Slice& key_without_ht, HybridTime min_hybrid_time) {
  VLOG(4) << "FindOldestRecord("
          << SubDocKey::DebugSliceToString(key_without_ht) << " = "
          << key_without_ht.ToDebugHexString() << " , " << min_hybrid_time
          << ")";
  // A function-local `#define DOCDB_DEBUG` / `#undef DOCDB_DEBUG` used to surround this macro.
  // It had no effect: the macro's expansion was fixed when it was defined. The #undef could also
  // silently drop a build-wide DOCDB_DEBUG for the rest of this translation unit.
  DOCDB_DEBUG_SCOPE_LOG(SubDocKey::DebugSliceToString(key_without_ht) + ", " +
                            yb::ToString(min_hybrid_time),
                        std::bind(&IntentAwareIterator::DebugDump, this));
  DCHECK(!DebugHasHybridTime(key_without_ht));

  RETURN_NOT_OK(status_);
  if (!valid()) {
    VLOG(4) << "Returning kInvalid";
    return HybridTime::kInvalid;
  }

  HybridTime result;
  if (intent_iter_.Initialized()) {
    auto intent_dht = VERIFY_RESULT(FindMatchingIntentRecordDocHybridTime(key_without_ht));
    VLOG(4) << "Looking for Intent Record found ?  =  "
            << (intent_dht != DocHybridTime::kInvalid);
    if (intent_dht != DocHybridTime::kInvalid &&
        intent_dht.hybrid_time() > min_hybrid_time) {
      result = intent_dht.hybrid_time();
      VLOG(4) << " oldest_record_ht is now " << result;
    }
  } else {
    VLOG(4) << "intent_iter_ not Initialized";
  }

  // key + one ValueType::kHybridTime byte + encoded DocHybridTime. The value-type byte was
  // previously not accounted for, unlike the equivalent Reserve() in FindLatestRecord().
  seek_key_buffer_.Reserve(key_without_ht.size() + 1 +
                           kMaxBytesPerEncodedHybridTime);
  seek_key_buffer_.Reset(key_without_ht);
  seek_key_buffer_.AppendValueType(ValueType::kHybridTime);
  seek_key_buffer_.AppendHybridTime(
      DocHybridTime(min_hybrid_time, kMaxWriteId));
  SeekForwardRegular(seek_key_buffer_);
  RETURN_NOT_OK(status_);
  if (iter_.Valid()) {
    iter_.Prev();
  } else {
    iter_.SeekToLast();
  }
  SkipFutureRecords(Direction::kForward);

  if (iter_valid_) {
    DocHybridTime regular_dht =
        VERIFY_RESULT(GetMatchingRegularRecordDocHybridTime(key_without_ht));
    VLOG(4) << "Looking for Matching Regular Record found   =  " << regular_dht;
    if (regular_dht != DocHybridTime::kInvalid &&
        regular_dht.hybrid_time() > min_hybrid_time) {
      result.MakeAtMost(regular_dht.hybrid_time());
    }
  } else {
    VLOG(4) << "iter_valid_ is false";
  }
  VLOG(4) << "Returning " << result;
  return result;
}
932
933
// Updates *latest_record_ht with the newest DocHybridTime for `key_without_ht`, considering the
// matching intent (when intents are in use) and the matching regular record. A candidate is only
// taken if it is later than the incoming *latest_record_ht. If `result_value` is non-null and a
// later candidate was taken, it receives that record's value. The regular record wins when both
// were taken.
// Returns Corruption if `latest_record_ht` is null; propagates iterator/decoding errors.
Status IntentAwareIterator::FindLatestRecord(
    const Slice& key_without_ht,
    DocHybridTime* latest_record_ht,
    Slice* result_value) {
  if (latest_record_ht == nullptr) {
    return STATUS(Corruption, "latest_record_ht should not be a null pointer");
  }
  VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key_without_ht) << ", "
          << *latest_record_ht << ")";
  DOCDB_DEBUG_SCOPE_LOG(
      SubDocKey::DebugSliceToString(key_without_ht) + ", " + yb::ToString(latest_record_ht) + ", "
      + yb::ToString(result_value),
      std::bind(&IntentAwareIterator::DebugDump, this));
  DCHECK(!DebugHasHybridTime(key_without_ht)) << SubDocKey::DebugSliceToString(key_without_ht);

  RETURN_NOT_OK(status_);
  if (!valid()) {
    return Status::OK();
  }

  // Candidate 1: the matching intent.
  bool found_later_intent_result = false;
  if (intent_iter_.Initialized()) {
    const DocHybridTime intent_dht =
        VERIFY_RESULT(FindMatchingIntentRecordDocHybridTime(key_without_ht));
    found_later_intent_result =
        intent_dht != DocHybridTime::kInvalid && intent_dht > *latest_record_ht;
    if (found_later_intent_result) {
      *latest_record_ht = intent_dht;
    }
  }

  // Candidate 2: the matching regular record, seeking from the global read limit.
  seek_key_buffer_.Reserve(key_without_ht.size() + encoded_read_time_global_limit_.size() + 1);
  seek_key_buffer_.Reset(key_without_ht);
  AppendEncodedDocHt(encoded_read_time_global_limit_, &seek_key_buffer_);

  SeekForwardRegular(seek_key_buffer_);
  RETURN_NOT_OK(status_);
  // valid() skips future records and re-checks the pushed prefix, if any; stop if nothing is left.
  if (!valid()) {
    return Status::OK();
  }

  bool found_later_regular_result = false;
  if (iter_valid_) {
    const DocHybridTime regular_dht =
        VERIFY_RESULT(GetMatchingRegularRecordDocHybridTime(key_without_ht));
    found_later_regular_result =
        regular_dht != DocHybridTime::kInvalid && regular_dht > *latest_record_ht;
    if (found_later_regular_result) {
      *latest_record_ht = regular_dht;
    }
  }

  if (result_value != nullptr) {
    if (found_later_regular_result) {
      *result_value = iter_.value();
    } else if (found_later_intent_result) {
      *result_value = resolved_intent_value_;
    }
  }
  return Status::OK();
}
992
993
539M
void IntentAwareIterator::PushPrefix(const Slice& prefix) {
994
18.4E
  VLOG(4) << "PushPrefix: " << SubDocKey::DebugSliceToString(prefix);
995
539M
  prefix_stack_.push_back(prefix);
996
539M
  skip_future_records_needed_ = true;
997
539M
  skip_future_intents_needed_ = true;
998
539M
}
999
1000
539M
void IntentAwareIterator::PopPrefix() {
1001
539M
  prefix_stack_.pop_back();
1002
539M
  skip_future_records_needed_ = true;
1003
539M
  skip_future_intents_needed_ = true;
1004
97.9k
  VLOG(4) << "PopPrefix: "
1005
0
          << (prefix_stack_.empty() ? std::string()
1006
97.9k
              : SubDocKey::DebugSliceToString(prefix_stack_.back()));
1007
539M
}
1008
1009
556M
// Returns the innermost pushed prefix, or an empty Slice when no prefix has been pushed.
Slice IntentAwareIterator::CurrentPrefix() const {
  if (prefix_stack_.empty()) {
    return Slice();
  }
  return prefix_stack_.back();
}
1012
1013
406M
void IntentAwareIterator::SkipFutureRecords(const Direction direction) {
1014
406M
  skip_future_records_needed_ = false;
1015
406M
  if (!status_.ok()) {
1016
0
    return;
1017
0
  }
1018
406M
  auto prefix = CurrentPrefix();
1019
405M
  while (iter_.Valid()) {
1020
404M
    if (!iter_.key().starts_with(prefix)) {
1021
611
      VLOG(4) << "Unmatched prefix: " << SubDocKey::DebugSliceToString(iter_.key())
1022
611
              << ", prefix: " << SubDocKey::DebugSliceToString(prefix);
1023
39.3M
      iter_valid_ = false;
1024
39.3M
      return;
1025
39.3M
    }
1026
365M
    if (!SatisfyBounds(iter_.key())) {
1027
64
      VLOG(4) << "Out of bounds: " << SubDocKey::DebugSliceToString(iter_.key())
1028
64
              << ", upperbound: " << SubDocKey::DebugSliceToString(upperbound_);
1029
3.97M
      iter_valid_ = false;
1030
3.97M
      return;
1031
3.97M
    }
1032
361M
    Slice encoded_doc_ht = iter_.key();
1033
361M
    if (encoded_doc_ht.TryConsumeByte(ValueTypeAsChar::kTransactionApplyState)) {
1034
0
      if (!NextRegular(direction)) {
1035
0
        return;
1036
0
      }
1037
0
      continue;
1038
0
    }
1039
361M
    size_t doc_ht_size = 0;
1040
361M
    auto decode_status = DocHybridTime::CheckAndGetEncodedSize(encoded_doc_ht, &doc_ht_size);
1041
361M
    if (!decode_status.ok()) {
1042
0
      LOG(ERROR) << "Decode doc ht from key failed: " << decode_status
1043
0
                 << ", key: " << iter_.key().ToDebugHexString();
1044
0
      status_ = std::move(decode_status);
1045
0
      return;
1046
0
    }
1047
361M
    encoded_doc_ht.remove_prefix(encoded_doc_ht.size() - doc_ht_size);
1048
361M
    auto value = iter_.value();
1049
361M
    auto value_type = DecodeValueType(value);
1050
18.4E
    VLOG(4) << "Checking for skip, type " << value_type << ", encoded_doc_ht: "
1051
18.4E
            << DocHybridTime::DebugSliceToString(encoded_doc_ht)
1052
18.4E
            << " value: " << value.ToDebugHexString();
1053
361M
    if (value_type == ValueType::kHybridTime) {
1054
      // Value came from a transaction, we could try to filter it by original intent time.
1055
89.3M
      Slice encoded_intent_doc_ht = value;
1056
89.3M
      encoded_intent_doc_ht.consume_byte();
1057
      // The logic here replicates part of the logic in
1058
      // DecodeStrongWriteIntentResult:: MaxAllowedValueTime for intents that have been committed
1059
      // and applied to regular RocksDB only. Note that here we are comparing encoded hybrid times,
1060
      // so comparisons are reversed vs. the un-encoded case. If a value is found "invalid", it
1061
      // can't cause a read restart. If it is found "valid", it will cause a read restart if it is
1062
      // greater than read_time.read. That last comparison is done outside this function.
1063
89.3M
      Slice max_allowed = encoded_intent_doc_ht.compare(encoded_read_time_local_limit_) > 0
1064
89.3M
          ? Slice(encoded_read_time_global_limit_)
1065
10.9k
          : Slice(encoded_read_time_read_);
1066
89.3M
      if (encoded_doc_ht.compare(max_allowed) > 0) {
1067
89.3M
        iter_valid_ = true;
1068
89.3M
        return;
1069
89.3M
      }
1070
272M
    } else if (encoded_doc_ht.compare(encoded_read_time_regular_limit_) > 0) {
1071
      // If a value does not contain the hybrid time of the intent that wrote the original
1072
      // transaction, then it either (a) originated from a single-shard transaction or (b) the
1073
      // intent hybrid time has already been garbage-collected during a compaction because the
1074
      // corresponding transaction's commit time (stored in the key) became lower than the history
1075
      // cutoff. See the following commit for the details of this intent hybrid time GC.
1076
      //
1077
      // https://github.com/yugabyte/yugabyte-db/commit/26260e0143e521e219d93f4aba6310fcc030a628
1078
      //
1079
      // encoded_read_time_regular_limit_ is simply the encoded value of max(read_ht, local_limit).
1080
      // The above condition
1081
      //
1082
      //   encoded_doc_ht.compare(encoded_read_time_regular_limit_) >= 0
1083
      //
1084
      // corresponds to the following in terms of decoded hybrid times (order is reversed):
1085
      //
1086
      //   commit_ht <= max(read_ht, local_limit)
1087
      //
1088
      // and the inverse of that can be written as
1089
      //
1090
      //   commit_ht > read_ht && commit_ht > local_limit
1091
      //
1092
      // The reason this is correct here is that in case (a) the event of writing a single-shard
1093
      // record to the tablet would certainly be after our read transaction's start time in case
1094
      // commit_ht > local_limit, so it can never cause a read restart. In case (b) we know that
1095
      // commit_ht < history_cutoff and read_ht >= history_cutoff (by definition of history cutoff)
1096
      // so commit_ht < read_ht, and in this case read restart is impossible regardless of the
1097
      // value of local_limit.
1098
272M
      iter_valid_ = true;
1099
272M
      return;
1100
272M
    }
1101
18.4E
    VLOG(4) << "Skipping because of time: " << SubDocKey::DebugSliceToString(iter_.key())
1102
18.4E
            << ", read time: " << read_time_;
1103
18.4E
    if (!NextRegular(direction)) {
1104
0
      return;
1105
0
    }
1106
18.4E
  }
1107
390k
  iter_valid_ = false;
1108
390k
}
1109
1110
240k
// Steps the regular iterator one record in `direction` and returns true. For any value that is
// neither kForward nor kBackward, it records a Corruption status, marks iter_ invalid and returns
// false.
bool IntentAwareIterator::NextRegular(Direction direction) {
  if (direction == Direction::kForward) {
    iter_.Next(); // TODO(dtxn) use seek with the same key, but read limit as doc hybrid time.
    return true;
  }
  if (direction == Direction::kBackward) {
    iter_.Prev();
    return true;
  }

  status_ = STATUS_FORMAT(Corruption, "Unexpected direction: $0", direction);
  LOG(ERROR) << status_;
  iter_valid_ = false;
  return false;
}
1125
1126
353M
void IntentAwareIterator::SkipFutureIntents() {
1127
353M
  skip_future_intents_needed_ = false;
1128
353M
  if (!intent_iter_.Initialized() || !status_.ok()) {
1129
305M
    return;
1130
305M
  }
1131
48.1M
  auto prefix = CurrentPrefix();
1132
48.1M
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
1133
1.75M
    auto compare_result = resolved_intent_key_prefix_.AsSlice().compare_prefix(prefix);
1134
18.4E
    VLOG(4) << "Checking resolved intent subdockey: "
1135
18.4E
            << DebugDumpKeyToStr(resolved_intent_key_prefix_)
1136
18.4E
            << ", against new prefix: " << DebugDumpKeyToStr(prefix) << ": "
1137
18.4E
            << compare_result;
1138
1.75M
    if (compare_result == 0) {
1139
1.75M
      if (!SatisfyBounds(resolved_intent_key_prefix_.AsSlice())) {
1140
0
        resolved_intent_state_ = ResolvedIntentState::kNoIntent;
1141
1.75M
      } else {
1142
1.75M
        resolved_intent_state_ = ResolvedIntentState::kValid;
1143
1.75M
      }
1144
1.75M
      return;
1145
306
    } else if (compare_result > 0) {
1146
306
      resolved_intent_state_ = ResolvedIntentState::kInvalidPrefix;
1147
306
      return;
1148
306
    }
1149
46.3M
  }
1150
46.3M
  SeekToSuitableIntent<Direction::kForward>();
1151
46.3M
}
1152
1153
50.7M
// Sets the intent iterator's exclusive upper bound from the current regular record. The bound is
// that record's key with its trailing kHybridTime marker + DocHybridTime stripped, followed by
// ValueType::kMaxByte. If iter_ is not valid, the bound is reset high via
// ResetIntentUpperbound() instead.
//
// Returns an error if the DocHybridTime at the end of iter_.key() cannot be decoded. In that case
// the existing upper bound is left untouched. intent_upperbound_ is a Slice into
// intent_upperbound_keybytes_, so clearing that buffer before the decode could fail used to leave
// intent_upperbound_ referring to a cleared buffer.
Status IntentAwareIterator::SetIntentUpperbound() {
  if (!iter_.Valid()) {
    // In case the current position of the regular iterator is invalid, set the exclusive intent
    // upperbound high to be able to find all intents higher than the last regular record.
    ResetIntentUpperbound();
    return Status::OK();
  }

  // Strip ValueType::kHybridTime + DocHybridTime at the end of SubDocKey in iter_.
  Slice subdoc_key = iter_.key();
  size_t doc_ht_size = 0;
  RETURN_NOT_OK(DocHybridTime::CheckAndGetEncodedSize(subdoc_key, &doc_ht_size));
  subdoc_key.remove_suffix(1 + doc_ht_size);

  // Decoding succeeded; only now rebuild the buffer backing intent_upperbound_.
  intent_upperbound_keybytes_.Clear();
  intent_upperbound_keybytes_.AppendRawBytes(subdoc_key);
  VLOG(4) << "SetIntentUpperbound = "
          << SubDocKey::DebugSliceToString(intent_upperbound_keybytes_.AsSlice());
  intent_upperbound_keybytes_.AppendValueType(ValueType::kMaxByte);
  intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
  intent_iter_.RevalidateAfterUpperBoundChange();
  return Status::OK();
}
1175
1176
861k
void IntentAwareIterator::ResetIntentUpperbound() {
1177
861k
  intent_upperbound_keybytes_.Clear();
1178
861k
  intent_upperbound_keybytes_.AppendValueType(ValueType::kHighest);
1179
861k
  intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
1180
861k
  intent_iter_.RevalidateAfterUpperBoundChange();
1181
18.4E
  VLOG(4) << "ResetIntentUpperbound = " << intent_upperbound_.ToDebugString();
1182
861k
}
1183
1184
}  // namespace docdb
1185
}  // namespace yb