YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/intent_aware_iterator.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/docdb/intent_aware_iterator.h"
15
16
#include <future>
17
18
#include "yb/common/doc_hybrid_time.h"
19
#include "yb/common/hybrid_time.h"
20
#include "yb/common/transaction.h"
21
22
#include "yb/docdb/docdb_fwd.h"
23
#include "yb/docdb/shared_lock_manager_fwd.h"
24
#include "yb/docdb/conflict_resolution.h"
25
#include "yb/docdb/doc_key.h"
26
#include "yb/docdb/doc_kv_util.h"
27
#include "yb/docdb/docdb-internal.h"
28
#include "yb/docdb/docdb_rocksdb_util.h"
29
#include "yb/docdb/intent.h"
30
#include "yb/docdb/key_bounds.h"
31
#include "yb/docdb/transaction_dump.h"
32
#include "yb/docdb/value.h"
33
#include "yb/docdb/value_type.h"
34
35
#include "yb/util/bytes_formatter.h"
36
#include "yb/util/debug-util.h"
37
#include "yb/util/result.h"
38
#include "yb/util/status_format.h"
39
#include "yb/util/trace.h"
40
41
using namespace std::literals;
42
43
namespace yb {
44
namespace docdb {
45
46
namespace {
47
48
25.2M
void GetIntentPrefixForKeyWithoutHt(const Slice& key, KeyBytes* out) {
49
25.2M
  out->Clear();
50
  // Since caller guarantees that key_bytes doesn't have hybrid time, we can simply use it
51
  // to get prefix for all related intents.
52
25.2M
  out->AppendRawBytes(key);
53
25.2M
}
54
55
13.0k
KeyBytes GetIntentPrefixForKeyWithoutHt(const Slice& key) {
56
13.0k
  KeyBytes result;
57
13.0k
  GetIntentPrefixForKeyWithoutHt(key, &result);
58
13.0k
  return result;
59
13.0k
}
60
61
886M
void AppendEncodedDocHt(const Slice& encoded_doc_ht, KeyBytes* key_bytes) {
62
886M
  key_bytes->AppendValueType(ValueType::kHybridTime);
63
886M
  key_bytes->AppendRawBytes(encoded_doc_ht);
64
886M
}
65
66
const char kStrongWriteTail[] = {
67
    ValueTypeAsChar::kIntentTypeSet,
68
    static_cast<char>(IntentTypeSet({IntentType::kStrongWrite}).ToUIntPtr()) };
69
70
const Slice kStrongWriteTailSlice = Slice(kStrongWriteTail, sizeof(kStrongWriteTail));
71
72
char kEmptyKeyStrongWriteTail[] = {
73
    ValueTypeAsChar::kGroupEnd,
74
    ValueTypeAsChar::kIntentTypeSet,
75
    static_cast<char>(IntentTypeSet({IntentType::kStrongWrite}).ToUIntPtr()) };
76
77
const Slice kEmptyKeyStrongWriteTailSlice =
78
    Slice(kEmptyKeyStrongWriteTail, sizeof(kEmptyKeyStrongWriteTail));
79
80
131M
Slice StrongWriteSuffix(const KeyBytes& key) {
81
131M
  return key.empty() ? 
kEmptyKeyStrongWriteTailSlice14
:
kStrongWriteTailSlice131M
;
82
131M
}
83
84
// We are not interested in weak and read intents here.
85
// So we can just skip them.
86
8.00M
void AppendStrongWrite(KeyBytes* out) {
87
8.00M
  out->AppendRawBytes(StrongWriteSuffix(*out));
88
8.00M
}
89
90
} // namespace
91
92
namespace {
93
94
struct DecodeStrongWriteIntentResult {
95
  Slice intent_prefix;
96
  Slice intent_value;
97
  DocHybridTime intent_time;
98
  DocHybridTime value_time;
99
  IntentTypeSet intent_types;
100
101
  // Whether this intent is from the same transaction as specified in the context.
102
  bool same_transaction = false;
103
104
0
  std::string ToString() const {
105
0
    return Format("{ intent_prefix: $0 intent_value: $1 intent_time: $2 value_time: $3 "
106
0
                  "same_transaction: $4 intent_types: $5 }",
107
0
                  intent_prefix.ToDebugHexString(), intent_value.ToDebugHexString(), intent_time,
108
0
                  value_time, same_transaction, intent_types);
109
0
  }
110
111
  // Returns the upper limit for the "value time" of an intent in order for the intent to be visible
112
  // in the read results. The "value time" is defined as follows:
113
  //   - For uncommitted transactions, the "value time" is the time when the intent was written.
114
  //     Note that same_transaction or in_txn_limit could only be set for uncommitted transactions.
115
  //   - For committed transactions, the "value time" is the commit time.
116
  //
117
  // The logic here is as follows:
118
  //   - When a transaction is reading its own intents, the in_txn_limit allows a statement to
119
  //     avoid seeing its own partial results. This is necessary for statements such as INSERT ...
120
  //     SELECT to avoid reading rows that the same statement generated and going into an infinite
121
  //     loop.
122
  //   - If an intent's hybrid time is greater than the tablet's local limit, then this intent
123
  //     cannot lead to a read restart and we only need to see it if its commit time is less than or
124
  //     equal to read_time.
125
  //   - If an intent's hybrid time is <= the tablet's local limit, then we cannot claim that
126
  //     the intent was written after the read transaction began based on the local limit, and we
127
  //     must compare the intent's commit time with global_limit and potentially perform a read
128
  //     restart, because the transaction that wrote the intent might have been committed before our
129
  //     read transaction began.
130
4.93M
  HybridTime MaxAllowedValueTime(const ReadHybridTime& read_time) const {
131
4.93M
    if (same_transaction) {
132
4.34M
      return read_time.in_txn_limit;
133
4.34M
    }
134
596k
    return intent_time.hybrid_time() > read_time.local_limit
135
596k
        ? 
read_time.read692
:
read_time.global_limit595k
;
136
4.93M
  }
137
};
138
139
0
std::ostream& operator<<(std::ostream& out, const DecodeStrongWriteIntentResult& result) {
140
0
  return out << result.ToString();
141
0
}
142
143
// Decodes intent based on intent_iterator and its transaction commit time if intent is a strong
144
// write intent, intent is not for row locking, and transaction is already committed at specified
145
// time or is current transaction.
146
// Returns DocHybridTime::kMin as value_time otherwise.
147
// For current transaction returns intent record hybrid time as value_time.
148
// Consumes intent from value_slice leaving only value itself.
149
Result<DecodeStrongWriteIntentResult> DecodeStrongWriteIntent(
150
    HybridTime global_limit,
151
    const TransactionOperationContext& txn_op_context,
152
    rocksdb::Iterator* intent_iter,
153
6.14M
    TransactionStatusCache* transaction_status_cache) {
154
6.14M
  DecodeStrongWriteIntentResult result;
155
6.14M
  auto decoded_intent_key = VERIFY_RESULT(DecodeIntentKey(intent_iter->key()));
156
0
  result.intent_prefix = decoded_intent_key.intent_prefix;
157
6.14M
  result.intent_types = decoded_intent_key.intent_types;
158
6.14M
  if (result.intent_types.Test(IntentType::kStrongWrite)) {
159
5.57M
    auto intent_value = intent_iter->value();
160
5.57M
    auto decoded_intent_value = VERIFY_RESULT(DecodeIntentValue(intent_value));
161
162
0
    auto decoded_txn_id = decoded_intent_value.transaction_id;
163
5.57M
    auto decoded_subtxn_id = decoded_intent_value.subtransaction_id;
164
165
5.57M
    result.intent_value = decoded_intent_value.body;
166
5.57M
    result.intent_time = decoded_intent_key.doc_ht;
167
5.57M
    result.same_transaction = decoded_txn_id == txn_op_context.transaction_id;
168
169
    // By setting the value time to kMin, we ensure the caller ignores this intent. This is true
170
    // because the caller is skipping all intents written before or at the same time as
171
    // intent_dht_from_same_txn_ or resolved_intent_txn_dht_, which of course are greater than or
172
    // equal to DocHybridTime::kMin.
173
5.57M
    if (result.intent_value.starts_with(ValueTypeAsChar::kRowLock)) {
174
125k
      result.value_time = DocHybridTime::kMin;
175
5.44M
    } else if (result.same_transaction) {
176
4.62M
      if (txn_op_context.subtransaction.aborted.Test(decoded_subtxn_id)) {
177
        // If this intent is from the same transaction, we can check the aborted set from this
178
        // txn_op_context to see whether the intent is still live. If not, mask it from the caller.
179
752
        result.value_time = DocHybridTime::kMin;
180
4.62M
      } else {
181
4.62M
        result.value_time = decoded_intent_key.doc_ht;
182
4.62M
      }
183
4.62M
    } else 
if (817k
result.intent_time.hybrid_time() > global_limit817k
) {
184
108k
      VTRACE(1, "Ignoring intent from a different txn written after read.global_limit");
185
108k
      result.value_time = DocHybridTime::kMin;
186
709k
    } else {
187
709k
      auto commit_data = VERIFY_RESULT(transaction_status_cache->GetCommitData(decoded_txn_id));
188
0
      auto commit_ht = commit_data.commit_ht;
189
709k
      auto aborted_subtxn_set = commit_data.aborted_subtxn_set;
190
709k
      auto is_aborted_subtxn = aborted_subtxn_set.Test(decoded_subtxn_id);
191
709k
      result.value_time = commit_ht == HybridTime::kMin || 
is_aborted_subtxn629k
192
709k
          ? 
DocHybridTime::kMin79.6k
193
709k
          : 
DocHybridTime(commit_ht, decoded_intent_value.write_id)629k
;
194
709k
      VLOG(4) << "Transaction id: " << decoded_txn_id
195
68
              << ", subtransaction id: " << decoded_subtxn_id
196
68
              << ", value time: " << result.value_time
197
68
              << ", value: " << result.intent_value.ToDebugHexString()
198
68
              << ", aborted subtxn set: " << aborted_subtxn_set.ToString();
199
709k
    }
200
5.57M
  } else {
201
569k
    result.value_time = DocHybridTime::kMin;
202
569k
  }
203
6.14M
  return result;
204
6.14M
}
205
206
// Given that key is well-formed DocDB encoded key, checks if it is an intent key for the same key
207
// as intent_prefix. If key is not well-formed DocDB encoded key, result could be true or false.
208
4.61M
bool IsIntentForTheSameKey(const Slice& key, const Slice& intent_prefix) {
209
4.61M
  return key.starts_with(intent_prefix) &&
210
4.61M
         
key.size() > intent_prefix.size()346k
&&
211
4.61M
         
IntentValueType(key[intent_prefix.size()])346k
;
212
4.61M
}
213
214
0
std::string DebugDumpKeyToStr(const Slice &key) {
215
0
  return key.ToDebugString() + " (" + SubDocKey::DebugSliceToString(key) + ")";
216
0
}
217
218
0
std::string DebugDumpKeyToStr(const KeyBytes &key) {
219
0
  return DebugDumpKeyToStr(key.AsSlice());
220
0
}
221
222
84.6M
bool DebugHasHybridTime(const Slice& subdoc_key_encoded) {
223
84.6M
  SubDocKey subdoc_key;
224
84.6M
  CHECK(subdoc_key.FullyDecodeFromKeyWithOptionalHybridTime(subdoc_key_encoded).ok());
225
84.6M
  return subdoc_key.has_hybrid_time();
226
84.6M
}
227
228
62.2M
std::string EncodeHybridTime(HybridTime value) {
229
62.2M
  return DocHybridTime(value, kMaxWriteId).EncodedInDocDbFormat();
230
62.2M
}
231
232
} // namespace
233
234
IntentAwareIterator::IntentAwareIterator(
235
    const DocDB& doc_db,
236
    const rocksdb::ReadOptions& read_opts,
237
    CoarseTimePoint deadline,
238
    const ReadHybridTime& read_time,
239
    const TransactionOperationContext& txn_op_context)
240
    : read_time_(read_time),
241
      encoded_read_time_read_(EncodeHybridTime(read_time_.read)),
242
      encoded_read_time_local_limit_(EncodeHybridTime(read_time_.local_limit)),
243
      encoded_read_time_global_limit_(EncodeHybridTime(read_time_.global_limit)),
244
      encoded_read_time_regular_limit_(
245
          read_time_.local_limit > read_time_.read ? Slice(encoded_read_time_local_limit_)
246
                                                   : Slice(encoded_read_time_read_)),
247
      txn_op_context_(txn_op_context),
248
20.7M
      transaction_status_cache_(txn_op_context_, read_time, deadline) {
249
20.7M
  VTRACE(1, __func__);
250
18.4E
  VLOG(4) << "IntentAwareIterator, read_time: " << read_time
251
18.4E
          << ", txn_op_context: " << txn_op_context_;
252
253
20.7M
  if (txn_op_context) {
254
12.3M
    VTRACE(1, "Checking MinRunningTime");
255
12.3M
    const auto min_running_ht = txn_op_context.txn_status_manager->MinRunningHybridTime();
256
12.3M
    if (min_running_ht != HybridTime::kMax && 
min_running_ht < read_time.global_limit7.62M
) {
257
7.61M
      intent_iter_ = docdb::CreateRocksDBIterator(doc_db.intents,
258
7.61M
                                                  doc_db.key_bounds,
259
7.61M
                                                  docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
260
7.61M
                                                  boost::none,
261
7.61M
                                                  rocksdb::kDefaultQueryId,
262
7.61M
                                                  nullptr /* file_filter */,
263
7.61M
                                                  &intent_upperbound_);
264
7.61M
    } else {
265
4.76M
      VLOG(4) << "No relevant transactions running: "
266
16.8k
              << "min_running_ht=" << min_running_ht << ", "
267
16.8k
              << "global_limit=" << read_time.global_limit;
268
4.76M
    }
269
12.3M
  }
270
20.7M
  VTRACE(2, "Done Checking MinRunningTime");
271
  // WARNING: It is important for regular DB iterator to be created after intents DB iterator,
272
  // otherwise consistency could break, for example in following scenario:
273
  // 1) Transaction T1 is committed with value v1 for k1, but not yet applied to regular DB.
274
  // 2) Client reads v1 for k1.
275
  // 3) Regular DB iterator is created on a regular DB snapshot containing no values for k1.
276
  // 4) Transaction T1 is applied, k1->v1 is written into regular DB, intent k1->v1 is deleted.
277
  // 5) Intents DB iterator is created on an intents DB snapshot containing no intents for k1.
278
  // 6) Client reads no values for k1.
279
20.7M
  iter_ = BoundedRocksDbIterator(doc_db.regular, read_opts, doc_db.key_bounds);
280
20.7M
  VTRACE(2, "Created iterator");
281
20.7M
}
282
283
3.10k
void IntentAwareIterator::Seek(const DocKey &doc_key) {
284
3.10k
  Seek(doc_key.Encode());
285
3.10k
}
286
287
35.1M
void IntentAwareIterator::Seek(const Slice& key) {
288
18.4E
  VLOG(4) << "Seek(" << SubDocKey::DebugSliceToString(key) << ")";
289
35.1M
  DOCDB_DEBUG_SCOPE_LOG(
290
0
      key.ToDebugString(),
291
0
      std::bind(&IntentAwareIterator::DebugDump, this));
292
35.1M
  if (!status_.ok()) {
293
0
    return;
294
0
  }
295
296
35.1M
  ROCKSDB_SEEK(&iter_, key);
297
35.1M
  skip_future_records_needed_ = true;
298
299
35.1M
  if (intent_iter_.Initialized()) {
300
8.01M
    seek_intent_iter_needed_ = SeekIntentIterNeeded::kSeek;
301
8.01M
    GetIntentPrefixForKeyWithoutHt(key, &seek_key_buffer_);
302
8.01M
    AppendStrongWrite(&seek_key_buffer_);
303
8.01M
  }
304
35.1M
}
305
306
83.6M
void IntentAwareIterator::SeekForward(const Slice& key) {
307
83.6M
  KeyBytes key_bytes;
308
  // Reserve space for key plus kMaxBytesPerEncodedHybridTime + 1 bytes for SeekForward() below to
309
  // avoid extra realloc while appending the read time.
310
83.6M
  key_bytes.Reserve(key.size() + kMaxBytesPerEncodedHybridTime + 1);
311
83.6M
  key_bytes.AppendRawBytes(key);
312
83.6M
  SeekForward(&key_bytes);
313
83.6M
}
314
315
803M
void IntentAwareIterator::SeekForward(KeyBytes* key_bytes) {
316
18.4E
  VLOG(4) << "SeekForward(" << SubDocKey::DebugSliceToString(*key_bytes) << ")";
317
803M
  DOCDB_DEBUG_SCOPE_LOG(
318
0
      SubDocKey::DebugSliceToString(*key_bytes),
319
0
      std::bind(&IntentAwareIterator::DebugDump, this));
320
803M
  if (!status_.ok()) {
321
0
    return;
322
0
  }
323
324
803M
  const size_t key_size = key_bytes->size();
325
803M
  AppendEncodedDocHt(encoded_read_time_global_limit_, key_bytes);
326
803M
  SeekForwardRegular(*key_bytes);
327
803M
  key_bytes->Truncate(key_size);
328
803M
  if (intent_iter_.Initialized() && 
status_.ok()123M
) {
329
123M
    UpdatePlannedIntentSeekForward(
330
123M
        *key_bytes, StrongWriteSuffix(*key_bytes), /* use_suffix_for_prefix= */ false);
331
123M
  }
332
803M
}
333
334
void IntentAwareIterator::UpdatePlannedIntentSeekForward(const Slice& key,
335
                                                         const Slice& suffix,
336
143M
                                                         bool use_suffix_for_prefix) {
337
143M
  if (seek_intent_iter_needed_ != SeekIntentIterNeeded::kNoNeed &&
338
143M
      
seek_key_buffer_.AsSlice().GreaterOrEqual(key, suffix)0
) {
339
0
    return;
340
0
  }
341
143M
  seek_key_buffer_.Clear();
342
143M
  seek_key_buffer_.AppendRawBytes(key);
343
143M
  seek_key_buffer_.AppendRawBytes(suffix);
344
143M
  if (
seek_intent_iter_needed_ == SeekIntentIterNeeded::kNoNeed143M
) {
345
143M
    seek_intent_iter_needed_ = SeekIntentIterNeeded::kSeekForward;
346
143M
  }
347
143M
  seek_key_prefix_ = seek_key_buffer_.AsSlice();
348
143M
  if (!use_suffix_for_prefix) {
349
123M
    seek_key_prefix_.remove_suffix(suffix.size());
350
123M
  }
351
143M
}
352
353
// TODO: If TTL rows are ever supported on subkeys, this may need to change appropriately.
354
// Otherwise, this function might seek past the TTL merge record, but not the original
355
// record for the actual subkey.
356
98.3M
void IntentAwareIterator::SeekPastSubKey(const Slice& key) {
357
18.4E
  VLOG(4) << "SeekPastSubKey(" << SubDocKey::DebugSliceToString(key) << ")";
358
98.3M
  if (!status_.ok()) {
359
0
    return;
360
0
  }
361
362
98.3M
  docdb::SeekPastSubKey(key, &iter_);
363
98.3M
  skip_future_records_needed_ = true;
364
98.3M
  if (intent_iter_.Initialized() && 
status_.ok()2.29M
) {
365
    // Skip all intents for subdoc_key.
366
2.29M
    char kSuffix = ValueTypeAsChar::kGreaterThanIntentType;
367
2.29M
    UpdatePlannedIntentSeekForward(key, Slice(&kSuffix, 1));
368
2.29M
  }
369
98.3M
}
370
371
135M
void IntentAwareIterator::SeekOutOfSubDoc(KeyBytes* key_bytes) {
372
18.4E
  VLOG(4) << "SeekOutOfSubDoc(" << SubDocKey::DebugSliceToString(*key_bytes) << ")";
373
135M
  if (!status_.ok()) {
374
0
    return;
375
0
  }
376
377
135M
  docdb::SeekOutOfSubKey(key_bytes, &iter_);
378
135M
  skip_future_records_needed_ = true;
379
135M
  if (intent_iter_.Initialized() && 
status_.ok()17.8M
) {
380
    // See comment for SubDocKey::AdvanceOutOfSubDoc.
381
17.8M
    const char kSuffix = ValueTypeAsChar::kMaxByte;
382
17.8M
    UpdatePlannedIntentSeekForward(*key_bytes, Slice(&kSuffix, 1));
383
17.8M
  }
384
135M
}
385
386
133M
void IntentAwareIterator::SeekOutOfSubDoc(const Slice& key) {
387
133M
  KeyBytes key_bytes;
388
  // Reserve space for key + 1 byte for docdb::SeekOutOfSubKey() above to avoid extra realloc while
389
  // appending kMaxByte.
390
133M
  key_bytes.Reserve(key.size() + 1);
391
133M
  key_bytes.AppendRawBytes(key);
392
133M
  SeekOutOfSubDoc(&key_bytes);
393
133M
}
394
395
1.23G
bool IntentAwareIterator::HasCurrentEntry() {
396
1.23G
  return iter_valid_ || 
resolved_intent_state_ == ResolvedIntentState::kValid139M
;
397
1.23G
}
398
399
716k
void IntentAwareIterator::SeekToLastDocKey() {
400
716k
  iter_.SeekToLast();
401
716k
  SkipFutureRecords(Direction::kBackward);
402
716k
  if (intent_iter_.Initialized()) {
403
0
    ResetIntentUpperbound();
404
0
    intent_iter_.SeekToLast();
405
0
    SeekToSuitableIntent<Direction::kBackward>();
406
0
    seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed;
407
0
    skip_future_intents_needed_ = false;
408
0
  }
409
716k
  if (HasCurrentEntry()) {
410
714k
    SeekToLatestDocKeyInternal();
411
714k
  }
412
716k
}
413
414
template <class T>
415
634k
void Assign(const T& value, T* out) {
416
634k
  if (out) {
417
634k
    *out = value;
418
634k
  }
419
634k
}
void yb::docdb::Assign<yb::Slice>(yb::Slice const&, yb::Slice*)
Line
Count
Source
415
317k
void Assign(const T& value, T* out) {
416
317k
  if (out) {
417
317k
    *out = value;
418
317k
  }
419
317k
}
void yb::docdb::Assign<yb::DocHybridTime>(yb::DocHybridTime const&, yb::DocHybridTime*)
Line
Count
Source
415
317k
void Assign(const T& value, T* out) {
416
317k
  if (out) {
417
317k
    *out = value;
418
317k
  }
419
317k
}
420
421
// If we reach a different key, stop seeking.
422
Status IntentAwareIterator::NextFullValue(
423
    DocHybridTime* latest_record_ht,
424
    Slice* result_value,
425
317k
    Slice* final_key) {
426
317k
  if (!latest_record_ht || !result_value)
427
0
    return STATUS(Corruption, "The arguments latest_record_ht and "
428
317k
                              "result_value cannot be null pointers.");
429
317k
  RETURN_NOT_OK(status_);
430
317k
  Slice v;
431
317k
  if (!valid() || !IsMergeRecord(v = value())) {
432
317k
    auto key_data = VERIFY_RESULT(FetchKey());
433
0
    Assign(key_data.key, final_key);
434
317k
    Assign(key_data.write_time, latest_record_ht);
435
317k
    *result_value = v;
436
317k
    return status_;
437
317k
  }
438
439
180
  *latest_record_ht = DocHybridTime::kMin;
440
180
  const auto key_data = VERIFY_RESULT(FetchKey());
441
0
  auto key = key_data.key;
442
180
  const size_t key_size = key.size();
443
180
  bool found_record = false;
444
445
390
  while ((found_record = iter_.Valid()) &&  // as long as we're pointing to a record
446
390
         (key = iter_.key()).starts_with(key_data.key) &&  // with the same key we started with
447
390
         key[key_size] == ValueTypeAsChar::kHybridTime && // whose key ends with a HT
448
390
         IsMergeRecord(v = iter_.value())) { // and whose value is a merge record
449
210
    iter_.Next(); // advance the iterator
450
210
  }
451
452
180
  if (found_record) {
453
180
    *result_value = v;
454
180
    *latest_record_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key));
455
0
    Assign(key, final_key);
456
180
  }
457
458
180
  found_record = false;
459
180
  if (intent_iter_.Initialized()) {
460
0
    while ((found_record = IsIntentForTheSameKey(intent_iter_.key(), key_data.key)) &&
461
0
           IsMergeRecord(v = intent_iter_.value())) {
462
0
      intent_iter_.Next();
463
0
    }
464
0
    DocHybridTime doc_ht;
465
0
    if (found_record && !(key = intent_iter_.key()).empty() &&
466
0
        (doc_ht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&key))) >= *latest_record_ht) {
467
0
      *latest_record_ht = doc_ht;
468
0
      *result_value = v;
469
0
      Assign(key, final_key);
470
0
    }
471
0
  }
472
473
180
  if (*latest_record_ht == DocHybridTime::kMin) {
474
0
    iter_valid_ = false;
475
0
  }
476
180
  return status_;
477
180
}
478
479
454k
bool IntentAwareIterator::PreparePrev(const Slice& key) {
480
18.4E
  VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key) << ")";
481
482
454k
  ROCKSDB_SEEK(&iter_, key);
483
484
454k
  if (iter_.Valid()) {
485
451k
    iter_.Prev();
486
451k
  } else {
487
2.89k
    iter_.SeekToLast();
488
2.89k
  }
489
454k
  SkipFutureRecords(Direction::kBackward);
490
491
454k
  if (intent_iter_.Initialized()) {
492
13.0k
    ResetIntentUpperbound();
493
13.0k
    ROCKSDB_SEEK(&intent_iter_, GetIntentPrefixForKeyWithoutHt(key));
494
13.0k
    if (intent_iter_.Valid()) {
495
13.0k
      intent_iter_.Prev();
496
13.0k
    } else {
497
2
      intent_iter_.SeekToLast();
498
2
    }
499
13.0k
    SeekToSuitableIntent<Direction::kBackward>();
500
13.0k
    seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed;
501
13.0k
    skip_future_intents_needed_ = false;
502
13.0k
  }
503
504
454k
  return HasCurrentEntry();
505
454k
}
506
507
0
void IntentAwareIterator::PrevSubDocKey(const KeyBytes& key_bytes) {
508
0
  if (PreparePrev(key_bytes)) {
509
0
    SeekToLatestSubDocKeyInternal();
510
0
  }
511
0
}
512
513
0
void IntentAwareIterator::PrevDocKey(const DocKey& doc_key) {
514
0
  PrevDocKey(doc_key.Encode().AsSlice());
515
0
}
516
517
454k
void IntentAwareIterator::PrevDocKey(const Slice& encoded_doc_key) {
518
454k
  if (PreparePrev(encoded_doc_key)) {
519
454k
    SeekToLatestDocKeyInternal();
520
454k
  }
521
454k
}
522
523
1.16M
Slice IntentAwareIterator::LatestSubDocKey() {
524
1.16M
  DCHECK(HasCurrentEntry())
525
31
      << "Expected iter_valid(" << iter_valid_ << ") || resolved_intent_state_("
526
31
      << resolved_intent_state_ << ") == ResolvedIntentState::kValid";
527
1.16M
  return IsEntryRegular(/* descending */ true) ? 
iter_.key()1.16M
528
1.16M
                                               : 
resolved_intent_key_prefix_.AsSlice()3.97k
;
529
1.16M
}
530
531
0
void IntentAwareIterator::SeekToLatestSubDocKeyInternal() {
532
0
  auto subdockey_slice = LatestSubDocKey();
533
534
  // Strip the hybrid time and seek the slice.
535
0
  auto doc_ht = DocHybridTime::DecodeFromEnd(&subdockey_slice);
536
0
  if (!doc_ht.ok()) {
537
0
    status_ = doc_ht.status();
538
0
    return;
539
0
  }
540
0
  subdockey_slice.remove_suffix(1);
541
0
  Seek(subdockey_slice);
542
0
}
543
544
1.16M
void IntentAwareIterator::SeekToLatestDocKeyInternal() {
545
1.16M
  auto subdockey_slice = LatestSubDocKey();
546
547
  // Seek to the first key for row containing found subdockey.
548
1.16M
  auto dockey_size = DocKey::EncodedSize(subdockey_slice, DocKeyPart::kWholeDocKey);
549
1.16M
  if (!dockey_size.ok()) {
550
0
    status_ = dockey_size.status();
551
0
    return;
552
0
  }
553
1.16M
  Seek(Slice(subdockey_slice.data(), *dockey_size));
554
1.16M
}
555
556
1.23G
void IntentAwareIterator::SeekIntentIterIfNeeded() {
557
1.23G
  if (seek_intent_iter_needed_ == SeekIntentIterNeeded::kNoNeed || 
!status_.ok()149M
) {
558
1.08G
    return;
559
1.08G
  }
560
149M
  status_ = SetIntentUpperbound();
561
149M
  if (!status_.ok()) {
562
0
    return;
563
0
  }
564
149M
  switch (seek_intent_iter_needed_) {
565
0
    case SeekIntentIterNeeded::kNoNeed:
566
0
      break;
567
7.87M
    case SeekIntentIterNeeded::kSeek:
568
7.87M
      VLOG
(4) << __func__ << ", seek: " << SubDocKey::DebugSliceToString(seek_key_buffer_)242
;
569
7.87M
      ROCKSDB_SEEK(&intent_iter_, seek_key_buffer_);
570
7.87M
      SeekToSuitableIntent<Direction::kForward>();
571
7.87M
      seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed;
572
7.87M
      return;
573
141M
    case SeekIntentIterNeeded::kSeekForward:
574
141M
      SeekForwardToSuitableIntent();
575
141M
      seek_intent_iter_needed_ = SeekIntentIterNeeded::kNoNeed;
576
141M
      return;
577
149M
  }
578
0
  FATAL_INVALID_ENUM_VALUE(SeekIntentIterNeeded, seek_intent_iter_needed_);
579
0
}
580
581
1.23G
bool IntentAwareIterator::valid() {
582
1.23G
  if (skip_future_records_needed_) {
583
1.14G
    SkipFutureRecords(Direction::kForward);
584
1.14G
  }
585
1.23G
  SeekIntentIterIfNeeded();
586
1.23G
  if (skip_future_intents_needed_) {
587
1.03G
    SkipFutureIntents();
588
1.03G
  }
589
1.23G
  return 
!status_.ok()1.23G
|| HasCurrentEntry();
590
1.23G
}
591
592
1.60G
bool IntentAwareIterator::IsEntryRegular(bool descending) {
593
1.60G
  if (PREDICT_FALSE(!iter_valid_)) {
594
8.87M
    return false;
595
8.87M
  }
596
1.60G
  if (resolved_intent_state_ == ResolvedIntentState::kValid) {
597
1.44M
    return (iter_.key().compare(resolved_intent_sub_doc_key_encoded_) < 0) != descending;
598
1.44M
  }
599
1.59G
  return true;
600
1.60G
}
601
602
849M
Result<FetchKeyResult> IntentAwareIterator::FetchKey() {
603
849M
  RETURN_NOT_OK(status_);
604
849M
  FetchKeyResult result;
605
849M
  if (IsEntryRegular()) {
606
843M
    result.key = iter_.key();
607
843M
    result.write_time = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(&result.key));
608
18.4E
    DCHECK(result.key.ends_with(ValueTypeAsChar::kHybridTime)) << result.key.ToDebugString();
609
843M
    result.key.remove_suffix(1);
610
843M
    result.same_transaction = false;
611
843M
    max_seen_ht_.MakeAtLeast(result.write_time.hybrid_time());
612
843M
  } else {
613
6.61M
    DCHECK_EQ(ResolvedIntentState::kValid, resolved_intent_state_);
614
6.61M
    result.key = resolved_intent_key_prefix_.AsSlice();
615
6.61M
    result.write_time = GetIntentDocHybridTime();
616
6.61M
    result.same_transaction = ResolvedIntentFromSameTransaction();
617
6.61M
    max_seen_ht_.MakeAtLeast(resolved_intent_txn_dht_.hybrid_time());
618
6.61M
  }
619
849M
  VLOG(4) << "Fetched key " << SubDocKey::DebugSliceToString(result.key)
620
97.2k
          << ", regular: " << IsEntryRegular()
621
97.2k
          << ", with time: " << result.write_time
622
97.2k
          << ", while read bounds are: " << read_time_;
623
624
849M
  YB_TRANSACTION_DUMP(
625
849M
      Read, txn_op_context_ ? txn_op_context_.txn_status_manager->tablet_id() : TabletId(),
626
849M
      txn_op_context_ ? txn_op_context_.transaction_id : TransactionId::Nil(),
627
849M
      read_time_, result.write_time, result.same_transaction,
628
849M
      result.key.size(), result.key, value().size(), value());
629
630
849M
  return result;
631
849M
}
632
633
763M
Slice IntentAwareIterator::value() {
634
763M
  if (IsEntryRegular()) {
635
18.4E
    VLOG(4) << "IntentAwareIterator::value() returning iter_.value(): "
636
18.4E
            << iter_.value().ToDebugHexString() << " or " << FormatSliceAsStr(iter_.value());
637
758M
    return iter_.value();
638
758M
  } else {
639
4.37M
    DCHECK_EQ(ResolvedIntentState::kValid, resolved_intent_state_);
640
18.4E
    VLOG(4) << "IntentAwareIterator::value() returning resolved_intent_value_: "
641
18.4E
            << resolved_intent_value_.AsSlice().ToDebugHexString();
642
4.37M
    return resolved_intent_value_;
643
4.37M
  }
644
763M
}
645
646
886M
void IntentAwareIterator::SeekForwardRegular(const Slice& slice) {
647
18.4E
  VLOG(4) << "SeekForwardRegular(" << SubDocKey::DebugSliceToString(slice) << ")";
648
886M
  docdb::SeekForward(slice, &iter_);
649
886M
  skip_future_records_needed_ = true;
650
886M
}
651
652
1.04G
bool IntentAwareIterator::SatisfyBounds(const Slice& slice) {
653
1.04G
  return upperbound_.empty() || 
slice.compare(upperbound_) <= 0911M
;
654
1.04G
}
655
656
6.14M
void IntentAwareIterator::ProcessIntent() {
657
6.14M
  auto decode_result = DecodeStrongWriteIntent(
658
6.14M
      read_time_.global_limit, txn_op_context_, &intent_iter_, &transaction_status_cache_);
659
6.14M
  if (!decode_result.ok()) {
660
0
    status_ = decode_result.status();
661
0
    return;
662
0
  }
663
6.14M
  VLOG(4) << "Intent decode: " << DebugIntentKeyToString(intent_iter_.key())
664
3
          << " => " << intent_iter_.value().ToDebugHexString() << ", result: " << *decode_result;
665
6.14M
  DOCDB_DEBUG_LOG(
666
6.14M
      "resolved_intent_txn_dht_: $0 value_time: $1 read_time: $2",
667
6.14M
      resolved_intent_txn_dht_.ToString(),
668
6.14M
      decode_result->value_time.ToString(),
669
6.14M
      read_time_.ToString());
670
6.14M
  auto resolved_intent_time = decode_result->same_transaction ? 
intent_dht_from_same_txn_4.75M
671
6.14M
                                                              : 
resolved_intent_txn_dht_1.38M
;
672
  // If we already resolved intent that is newer than this one, we should ignore current
673
  // intent because we are interested in the most recent intent only.
674
6.14M
  if (decode_result->value_time <= resolved_intent_time) {
675
1.20M
    return;
676
1.20M
  }
677
678
  // Ignore intent past read limit.
679
4.93M
  if (decode_result->value_time.hybrid_time() > decode_result->MaxAllowedValueTime(read_time_)) {
680
1.55k
    return;
681
1.55k
  }
682
683
4.93M
  if (resolved_intent_state_ == ResolvedIntentState::kNoIntent) {
684
4.92M
    resolved_intent_key_prefix_.Reset(decode_result->intent_prefix);
685
4.92M
    auto prefix = CurrentPrefix();
686
4.92M
    if (!decode_result->intent_prefix.starts_with(prefix)) {
687
0
      resolved_intent_state_ = ResolvedIntentState::kInvalidPrefix;
688
4.92M
    } else if (!SatisfyBounds(decode_result->intent_prefix)) {
689
0
      resolved_intent_state_ = ResolvedIntentState::kNoIntent;
690
4.92M
    } else {
691
4.92M
      resolved_intent_state_ = ResolvedIntentState::kValid;
692
4.92M
    }
693
4.92M
  }
694
4.93M
  if (decode_result->same_transaction) {
695
4.34M
    intent_dht_from_same_txn_ = decode_result->value_time;
696
    // We set resolved_intent_txn_dht_ to maximum possible time (time higher than read_time_.read
697
    // will cause read restart or will be ignored if higher than read_time_.global_limit) in
698
    // order to ignore intents/values from other transactions. But we save original intent time into
699
    // intent_dht_from_same_txn_, so we can compare time of intents for the same key from the same
700
    // transaction and select the latest one.
701
4.34M
    resolved_intent_txn_dht_ = DocHybridTime(read_time_.read, kMaxWriteId);
702
4.34M
  } else {
703
595k
    resolved_intent_txn_dht_ = decode_result->value_time;
704
595k
  }
705
4.93M
  resolved_intent_value_.Reset(decode_result->intent_value);
706
4.93M
}
707
708
4.92M
void IntentAwareIterator::UpdateResolvedIntentSubDocKeyEncoded() {
709
4.92M
  resolved_intent_sub_doc_key_encoded_.Reset(resolved_intent_key_prefix_.AsSlice());
710
4.92M
  resolved_intent_sub_doc_key_encoded_.AppendValueType(ValueType::kHybridTime);
711
4.92M
  resolved_intent_sub_doc_key_encoded_.AppendHybridTime(resolved_intent_txn_dht_);
712
4.92M
  VLOG(4) << "Resolved intent SubDocKey: "
713
5
          << DebugDumpKeyToStr(resolved_intent_sub_doc_key_encoded_);
714
4.92M
}
715
716
158M
void IntentAwareIterator::SeekForwardToSuitableIntent() {
717
18.4E
  VLOG(4) << __func__ << "(" << DebugDumpKeyToStr(seek_key_buffer_) << ")";
718
719
158M
  DOCDB_DEBUG_SCOPE_LOG(seek_key_buffer_.ToString(),
720
0
                        std::bind(&IntentAwareIterator::DebugDump, this));
721
158M
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent &&
722
158M
      
resolved_intent_key_prefix_.CompareTo(seek_key_prefix_) >= 06.32M
) {
723
18.4E
    VLOG(4) << __func__ << ", has suitable " << AsString(resolved_intent_state_) << " intent: "
724
18.4E
            << DebugDumpKeyToStr(resolved_intent_key_prefix_);
725
1.54M
    return;
726
1.54M
  }
727
728
157M
  if (VLOG_IS_ON(4)) {
729
0
    if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
730
0
      VLOG(4) << __func__ << ", has NOT suitable " << AsString(resolved_intent_state_)
731
0
              << " intent: " << DebugDumpKeyToStr(resolved_intent_key_prefix_);
732
0
    }
733
734
0
    if (intent_iter_.Valid()) {
735
0
      VLOG(4) << __func__ << ", current position: " << DebugDumpKeyToStr(intent_iter_.key());
736
0
    } else {
737
0
      VLOG(4) << __func__ << ", iterator invalid";
738
0
    }
739
0
  }
740
741
157M
  docdb::SeekForward(seek_key_buffer_.AsSlice(), &intent_iter_);
742
157M
  SeekToSuitableIntent<Direction::kForward>();
743
157M
}
744
745
template<Direction direction>
746
300M
void IntentAwareIterator::SeekToSuitableIntent() {
747
300M
  DOCDB_DEBUG_SCOPE_LOG
(/* msg */ "", std::bind(&IntentAwareIterator::DebugDump, this));0
748
300M
  resolved_intent_state_ = ResolvedIntentState::kNoIntent;
749
300M
  resolved_intent_txn_dht_ = DocHybridTime::kMin;
750
300M
  intent_dht_from_same_txn_ = DocHybridTime::kMin;
751
300M
  auto prefix = CurrentPrefix();
752
753
  // Find latest suitable intent for the first SubDocKey having suitable intents.
754
307M
  while (intent_iter_.Valid()) {
755
14.7M
    auto intent_key = intent_iter_.key();
756
14.7M
    if (intent_key[0] == ValueTypeAsChar::kTransactionId) {
757
      // If the intent iterator ever enters the transaction metadata and reverse index region, skip
758
      // past it.
759
538k
      switch (direction) {
760
538k
        case Direction::kForward: {
761
538k
          static const std::array<char, 1> kAfterTransactionId{ValueTypeAsChar::kTransactionId + 1};
762
538k
          static const Slice kAfterTxnRegion(kAfterTransactionId);
763
538k
          intent_iter_.Seek(kAfterTxnRegion);
764
538k
          break;
765
0
        }
766
2
        case Direction::kBackward:
767
2
          intent_upperbound_keybytes_.Clear();
768
2
          intent_upperbound_keybytes_.AppendValueType(ValueType::kTransactionId);
769
2
          intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
770
2
          intent_iter_.SeekToLast();
771
2
          break;
772
538k
      }
773
538k
      continue;
774
538k
    }
775
14.2M
    VLOG(4) << "Intent found: " << DebugIntentKeyToString(intent_key)
776
251
            << ", resolved state: " << yb::ToString(resolved_intent_state_);
777
14.2M
    if (resolved_intent_state_ != ResolvedIntentState::kNoIntent &&
778
        // Only scan intents for the first SubDocKey having suitable intents.
779
14.2M
        
!IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)4.61M
) {
780
4.27M
      break;
781
4.27M
    }
782
9.97M
    if (!intent_key.starts_with(prefix) || 
!SatisfyBounds(intent_key)8.74M
) {
783
3.83M
      break;
784
3.83M
    }
785
6.14M
    ProcessIntent();
786
6.14M
    if (!status_.ok()) {
787
0
      return;
788
0
    }
789
6.14M
    switch (direction) {
790
5.97M
      case Direction::kForward:
791
5.97M
        intent_iter_.Next();
792
5.97M
        break;
793
170k
      case Direction::kBackward:
794
170k
        intent_iter_.Prev();
795
170k
        break;
796
6.14M
    }
797
6.14M
  }
798
300M
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
799
4.92M
    UpdateResolvedIntentSubDocKeyEncoded();
800
4.92M
  }
801
300M
}
void yb::docdb::IntentAwareIterator::SeekToSuitableIntent<(yb::docdb::Direction)1>()
Line
Count
Source
746
13.0k
void IntentAwareIterator::SeekToSuitableIntent() {
747
13.0k
  DOCDB_DEBUG_SCOPE_LOG
(/* msg */ "", std::bind(&IntentAwareIterator::DebugDump, this));0
748
13.0k
  resolved_intent_state_ = ResolvedIntentState::kNoIntent;
749
13.0k
  resolved_intent_txn_dht_ = DocHybridTime::kMin;
750
13.0k
  intent_dht_from_same_txn_ = DocHybridTime::kMin;
751
13.0k
  auto prefix = CurrentPrefix();
752
753
  // Find latest suitable intent for the first SubDocKey having suitable intents.
754
183k
  while (intent_iter_.Valid()) {
755
182k
    auto intent_key = intent_iter_.key();
756
182k
    if (intent_key[0] == ValueTypeAsChar::kTransactionId) {
757
      // If the intent iterator ever enters the transaction metadata and reverse index region, skip
758
      // past it.
759
2
      switch (direction) {
760
0
        case Direction::kForward: {
761
0
          static const std::array<char, 1> kAfterTransactionId{ValueTypeAsChar::kTransactionId + 1};
762
0
          static const Slice kAfterTxnRegion(kAfterTransactionId);
763
0
          intent_iter_.Seek(kAfterTxnRegion);
764
0
          break;
765
0
        }
766
2
        case Direction::kBackward:
767
2
          intent_upperbound_keybytes_.Clear();
768
2
          intent_upperbound_keybytes_.AppendValueType(ValueType::kTransactionId);
769
2
          intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
770
2
          intent_iter_.SeekToLast();
771
2
          break;
772
2
      }
773
2
      continue;
774
2
    }
775
182k
    VLOG(4) << "Intent found: " << DebugIntentKeyToString(intent_key)
776
0
            << ", resolved state: " << yb::ToString(resolved_intent_state_);
777
182k
    if (resolved_intent_state_ != ResolvedIntentState::kNoIntent &&
778
        // Only scan intents for the first SubDocKey having suitable intents.
779
182k
        
!IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)30.3k
) {
780
12.0k
      break;
781
12.0k
    }
782
170k
    if (!intent_key.starts_with(prefix) || !SatisfyBounds(intent_key)) {
783
0
      break;
784
0
    }
785
170k
    ProcessIntent();
786
170k
    if (!status_.ok()) {
787
0
      return;
788
0
    }
789
170k
    switch (direction) {
790
0
      case Direction::kForward:
791
0
        intent_iter_.Next();
792
0
        break;
793
170k
      case Direction::kBackward:
794
170k
        intent_iter_.Prev();
795
170k
        break;
796
170k
    }
797
170k
  }
798
13.0k
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
799
12.0k
    UpdateResolvedIntentSubDocKeyEncoded();
800
12.0k
  }
801
13.0k
}
void yb::docdb::IntentAwareIterator::SeekToSuitableIntent<(yb::docdb::Direction)0>()
Line
Count
Source
746
300M
void IntentAwareIterator::SeekToSuitableIntent() {
747
300M
  DOCDB_DEBUG_SCOPE_LOG
(/* msg */ "", std::bind(&IntentAwareIterator::DebugDump, this));0
748
300M
  resolved_intent_state_ = ResolvedIntentState::kNoIntent;
749
300M
  resolved_intent_txn_dht_ = DocHybridTime::kMin;
750
300M
  intent_dht_from_same_txn_ = DocHybridTime::kMin;
751
300M
  auto prefix = CurrentPrefix();
752
753
  // Find latest suitable intent for the first SubDocKey having suitable intents.
754
306M
  while (intent_iter_.Valid()) {
755
14.6M
    auto intent_key = intent_iter_.key();
756
14.6M
    if (intent_key[0] == ValueTypeAsChar::kTransactionId) {
757
      // If the intent iterator ever enters the transaction metadata and reverse index region, skip
758
      // past it.
759
538k
      switch (direction) {
760
538k
        case Direction::kForward: {
761
538k
          static const std::array<char, 1> kAfterTransactionId{ValueTypeAsChar::kTransactionId + 1};
762
538k
          static const Slice kAfterTxnRegion(kAfterTransactionId);
763
538k
          intent_iter_.Seek(kAfterTxnRegion);
764
538k
          break;
765
0
        }
766
0
        case Direction::kBackward:
767
0
          intent_upperbound_keybytes_.Clear();
768
0
          intent_upperbound_keybytes_.AppendValueType(ValueType::kTransactionId);
769
0
          intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
770
0
          intent_iter_.SeekToLast();
771
0
          break;
772
538k
      }
773
538k
      continue;
774
538k
    }
775
14.0M
    VLOG(4) << "Intent found: " << DebugIntentKeyToString(intent_key)
776
251
            << ", resolved state: " << yb::ToString(resolved_intent_state_);
777
14.0M
    if (resolved_intent_state_ != ResolvedIntentState::kNoIntent &&
778
        // Only scan intents for the first SubDocKey having suitable intents.
779
14.0M
        
!IsIntentForTheSameKey(intent_key, resolved_intent_key_prefix_)4.58M
) {
780
4.26M
      break;
781
4.26M
    }
782
9.80M
    if (!intent_key.starts_with(prefix) || 
!SatisfyBounds(intent_key)8.57M
) {
783
3.83M
      break;
784
3.83M
    }
785
5.97M
    ProcessIntent();
786
5.97M
    if (!status_.ok()) {
787
0
      return;
788
0
    }
789
5.97M
    switch (direction) {
790
5.97M
      case Direction::kForward:
791
5.97M
        intent_iter_.Next();
792
5.97M
        break;
793
0
      case Direction::kBackward:
794
0
        intent_iter_.Prev();
795
0
        break;
796
5.97M
    }
797
5.97M
  }
798
300M
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
799
4.91M
    UpdateResolvedIntentSubDocKeyEncoded();
800
4.91M
  }
801
300M
}
802
803
0
void IntentAwareIterator::DebugDump() {
804
0
  bool is_valid = valid();
805
0
  LOG(INFO) << ">> IntentAwareIterator dump";
806
0
  LOG(INFO) << "iter_.Valid(): " << iter_.Valid();
807
0
  if (iter_.Valid()) {
808
0
    LOG(INFO) << "iter_.key(): " << DebugDumpKeyToStr(iter_.key());
809
0
  }
810
0
  if (intent_iter_.Initialized()) {
811
0
    LOG(INFO) << "intent_iter_.Valid(): " << intent_iter_.Valid();
812
0
    if (intent_iter_.Valid()) {
813
0
      LOG(INFO) << "intent_iter_.key(): " << intent_iter_.key().ToDebugHexString();
814
0
    }
815
0
  }
816
0
  LOG(INFO) << "resolved_intent_state_: " << yb::ToString(resolved_intent_state_);
817
0
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
818
0
    LOG(INFO) << "resolved_intent_sub_doc_key_encoded_: "
819
0
              << DebugDumpKeyToStr(resolved_intent_sub_doc_key_encoded_);
820
0
  }
821
0
  LOG(INFO) << "valid(): " << is_valid;
822
0
  if (valid()) {
823
0
    auto key_data = FetchKey();
824
0
    if (key_data.ok()) {
825
0
      LOG(INFO) << "key(): " << DebugDumpKeyToStr(key_data->key)
826
0
                << ", doc_ht: " << key_data->write_time;
827
0
    } else {
828
0
      LOG(INFO) << "key(): fetch failed: " << key_data.status();
829
0
    }
830
0
  }
831
0
  LOG(INFO) << "<< IntentAwareIterator dump";
832
0
}
833
834
// Seeks the intent iterator forward to key_without_ht and returns the doc hybrid time of the
// resolved intent when it is valid and its key prefix equals the intent prefix built for
// key_without_ht. Returns DocHybridTime::kInvalid when there is no such intent, or the error
// stored in status_ if the seek failed. A match also advances max_seen_ht_.
Result<DocHybridTime>
IntentAwareIterator::FindMatchingIntentRecordDocHybridTime(const Slice& key_without_ht) {
  VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key_without_ht) << ")";

  // seek_key_prefix_ is a view into seek_key_buffer_, so it is refreshed after the buffer is.
  GetIntentPrefixForKeyWithoutHt(key_without_ht, &seek_key_buffer_);
  seek_key_prefix_ = seek_key_buffer_.AsSlice();

  SeekForwardToSuitableIntent();
  RETURN_NOT_OK(status_);

  const bool exact_match =
      resolved_intent_state_ == ResolvedIntentState::kValid &&
      resolved_intent_key_prefix_.CompareTo(seek_key_buffer_) == 0;
  if (!exact_match) {
    return DocHybridTime::kInvalid;
  }

  max_seen_ht_.MakeAtLeast(resolved_intent_txn_dht_.hybrid_time());
  return GetIntentDocHybridTime();
}
853
854
// Compares the key at the current regular iterator position, with its trailing hybrid time
// stripped, against key_without_ht. On a match returns the doc hybrid time decoded from the end
// of the iterator key and advances max_seen_ht_; otherwise returns DocHybridTime::kInvalid.
Result<DocHybridTime>
IntentAwareIterator::GetMatchingRegularRecordDocHybridTime(
    const Slice& key_without_ht) {
  const Slice current_key = iter_.key();

  size_t encoded_ht_size = 0;
  RETURN_NOT_OK(CheckHybridTimeSizeAndValueType(current_key, &encoded_ht_size));

  // Drop the encoded hybrid time plus the one-byte value type marker that precedes it.
  Slice current_key_without_ht = current_key;
  current_key_without_ht.remove_suffix(1 + encoded_ht_size);

  if (key_without_ht != current_key_without_ht) {
    return DocHybridTime::kInvalid;
  }

  DocHybridTime record_dht = VERIFY_RESULT(DocHybridTime::DecodeFromEnd(current_key));
  max_seen_ht_.MakeAtLeast(record_dht.hybrid_time());
  return record_dht;
}
868
869
// Looks for a record of key_without_ht written after min_hybrid_time, considering both the
// matching intent (when the intent iterator is initialized) and the matching regular record.
//
// Returns:
//   - the error stored in status_ if the iterator is already in a failed state or a seek fails;
//   - HybridTime::kInvalid if the iterator is not valid on entry;
//   - otherwise the smaller of the intent / regular record hybrid times that are strictly
//     greater than min_hybrid_time, or a default-constructed HybridTime if neither qualifies.
//
// key_without_ht must not contain a hybrid time (checked with DCHECK).
Result<HybridTime> IntentAwareIterator::FindOldestRecord(
    const Slice& key_without_ht, HybridTime min_hybrid_time) {
  VLOG(4) << "FindOldestRecord("
          << SubDocKey::DebugSliceToString(key_without_ht) << " = "
          << key_without_ht.ToDebugHexString() << " , " << min_hybrid_time
          << ")";
  // NOTE: a stray "#define DOCDB_DEBUG" / "#undef DOCDB_DEBUG" pair used to surround this
  // statement. It did not change how the already-defined macro expands, and the #undef leaked
  // into the rest of the translation unit, so it was removed.
  DOCDB_DEBUG_SCOPE_LOG(SubDocKey::DebugSliceToString(key_without_ht) + ", " +
                            yb::ToString(min_hybrid_time),
                        std::bind(&IntentAwareIterator::DebugDump, this));
  DCHECK(!DebugHasHybridTime(key_without_ht)) << SubDocKey::DebugSliceToString(key_without_ht);

  RETURN_NOT_OK(status_);
  if (!valid()) {
    VLOG(4) << "Returning kInvalid";
    return HybridTime::kInvalid;
  }

  HybridTime result;

  // Step 1: intents. Only an intent newer than min_hybrid_time is a candidate.
  if (intent_iter_.Initialized()) {
    auto intent_dht = VERIFY_RESULT(FindMatchingIntentRecordDocHybridTime(key_without_ht));
    VLOG(4) << "Looking for Intent Record found ?  =  "
            << (intent_dht != DocHybridTime::kInvalid);
    if (intent_dht != DocHybridTime::kInvalid &&
        intent_dht.hybrid_time() > min_hybrid_time) {
      result = intent_dht.hybrid_time();
      VLOG(4) << " oldest_record_ht is now " << result;
    }
  } else {
    VLOG(4) << "intent_iter_ not Initialized";
  }

  // Step 2: regular records. Seek to <key, (min_hybrid_time, kMaxWriteId)> and then step one
  // record back (or to the very last record when the seek ran past the end).
  seek_key_buffer_.Reserve(key_without_ht.size() +
                           kMaxBytesPerEncodedHybridTime);
  seek_key_buffer_.Reset(key_without_ht);
  seek_key_buffer_.AppendValueType(ValueType::kHybridTime);
  seek_key_buffer_.AppendHybridTime(
      DocHybridTime(min_hybrid_time, kMaxWriteId));
  SeekForwardRegular(seek_key_buffer_);
  RETURN_NOT_OK(status_);
  if (iter_.Valid()) {
    iter_.Prev();
  } else {
    iter_.SeekToLast();
  }
  SkipFutureRecords(Direction::kForward);

  if (iter_valid_) {
    DocHybridTime regular_dht =
        VERIFY_RESULT(GetMatchingRegularRecordDocHybridTime(key_without_ht));
    VLOG(4) << "Looking for Matching Regular Record found   =  " << regular_dht;
    if (regular_dht != DocHybridTime::kInvalid &&
        regular_dht.hybrid_time() > min_hybrid_time) {
      // Keep whichever of the intent / regular candidates is older.
      result.MakeAtMost(regular_dht.hybrid_time());
    }
  } else {
    VLOG(4) << "iter_valid_ is false";
  }
  VLOG(4) << "Returning " << result;
  return result;
}
931
932
// Finds the latest record for key_without_ht that is newer than *latest_record_ht, looking at
// the matching intent (when the intent iterator is initialized) and then at the matching
// regular record.
//
//   latest_record_ht - in/out, must not be null; raised whenever a newer record is found.
//   result_value     - optional out; receives the value of the newest record found. A newer
//                      regular record takes precedence over a newer intent. Left untouched when
//                      nothing newer is found.
//
// Returns a Corruption status for a null latest_record_ht, the error stored in status_ if the
// iterator has failed, and OK otherwise (including when the iterator is not valid).
Status IntentAwareIterator::FindLatestRecord(
    const Slice& key_without_ht,
    DocHybridTime* latest_record_ht,
    Slice* result_value) {
  if (latest_record_ht == nullptr) {
    return STATUS(Corruption, "latest_record_ht should not be a null pointer");
  }
  VLOG(4) << __func__ << "(" << SubDocKey::DebugSliceToString(key_without_ht) << ", "
          << *latest_record_ht << ")";
  DOCDB_DEBUG_SCOPE_LOG(
      SubDocKey::DebugSliceToString(key_without_ht) + ", " + yb::ToString(latest_record_ht) + ", "
      + yb::ToString(result_value),
      std::bind(&IntentAwareIterator::DebugDump, this));
  DCHECK(!DebugHasHybridTime(key_without_ht)) << SubDocKey::DebugSliceToString(key_without_ht);

  RETURN_NOT_OK(status_);
  if (!valid()) {
    return Status::OK();
  }

  // Intents first.
  bool intent_is_newer = false;
  if (intent_iter_.Initialized()) {
    DocHybridTime intent_dht =
        VERIFY_RESULT(FindMatchingIntentRecordDocHybridTime(key_without_ht));
    if (intent_dht != DocHybridTime::kInvalid && intent_dht > *latest_record_ht) {
      *latest_record_ht = intent_dht;
      intent_is_newer = true;
    }
  }

  // Then regular records: seek to <key, read time global limit>.
  seek_key_buffer_.Reserve(key_without_ht.size() + encoded_read_time_global_limit_.size() + 1);
  seek_key_buffer_.Reset(key_without_ht);
  AppendEncodedDocHt(encoded_read_time_global_limit_, &seek_key_buffer_);

  SeekForwardRegular(seek_key_buffer_);
  RETURN_NOT_OK(status_);
  // After SeekForwardRegular(), we need to call valid() to skip future records and see if the
  // current key still matches the pushed prefix if any. If it does not, we are done.
  if (!valid()) {
    return Status::OK();
  }

  bool regular_is_newer = false;
  if (iter_valid_) {
    DocHybridTime regular_dht =
        VERIFY_RESULT(GetMatchingRegularRecordDocHybridTime(key_without_ht));
    if (regular_dht != DocHybridTime::kInvalid && regular_dht > *latest_record_ht) {
      *latest_record_ht = regular_dht;
      regular_is_newer = true;
    }
  }

  if (result_value == nullptr) {
    return Status::OK();
  }

  // The regular record was compared against the (possibly intent-updated) latest time, so if it
  // is newer it wins over the intent.
  if (regular_is_newer) {
    *result_value = iter_.value();
  } else if (intent_is_newer) {
    *result_value = resolved_intent_value_;
  }
  return Status::OK();
}
991
992
1.54G
void IntentAwareIterator::PushPrefix(const Slice& prefix) {
993
18.4E
  VLOG(4) << "PushPrefix: " << SubDocKey::DebugSliceToString(prefix);
994
1.54G
  prefix_stack_.push_back(prefix);
995
1.54G
  skip_future_records_needed_ = true;
996
1.54G
  skip_future_intents_needed_ = true;
997
1.54G
}
998
999
1.55G
void IntentAwareIterator::PopPrefix() {
1000
1.55G
  prefix_stack_.pop_back();
1001
1.55G
  skip_future_records_needed_ = true;
1002
1.55G
  skip_future_intents_needed_ = true;
1003
18.4E
  VLOG(4) << "PopPrefix: "
1004
18.4E
          << (prefix_stack_.empty() ? 
std::string()1
1005
18.4E
              : 
SubDocKey::DebugSliceToString(prefix_stack_.back())18.4E
);
1006
1.55G
}
1007
1008
1.59G
// Returns the prefix on top of the prefix stack, or an empty Slice when no prefix is pushed.
Slice IntentAwareIterator::CurrentPrefix() const {
  if (prefix_stack_.empty()) {
    return Slice();
  }
  return prefix_stack_.back();
}
1011
1012
1.14G
void IntentAwareIterator::SkipFutureRecords(const Direction direction) {
1013
1.14G
  skip_future_records_needed_ = false;
1014
1.14G
  if (!status_.ok()) {
1015
0
    return;
1016
0
  }
1017
1.14G
  auto prefix = CurrentPrefix();
1018
1.15G
  while (
iter_.Valid()1.14G
) {
1019
1.15G
    if (!iter_.key().starts_with(prefix)) {
1020
126M
      VLOG(4) << "Unmatched prefix: " << SubDocKey::DebugSliceToString(iter_.key())
1021
23.9k
              << ", prefix: " << SubDocKey::DebugSliceToString(prefix);
1022
126M
      iter_valid_ = false;
1023
126M
      return;
1024
126M
    }
1025
1.02G
    if (!SatisfyBounds(iter_.key())) {
1026
18.4E
      VLOG(4) << "Out of bounds: " << SubDocKey::DebugSliceToString(iter_.key())
1027
18.4E
              << ", upperbound: " << SubDocKey::DebugSliceToString(upperbound_);
1028
8.51M
      iter_valid_ = false;
1029
8.51M
      return;
1030
8.51M
    }
1031
1.01G
    Slice encoded_doc_ht = iter_.key();
1032
1.01G
    if (encoded_doc_ht.TryConsumeByte(ValueTypeAsChar::kTransactionApplyState)) {
1033
0
      if (!NextRegular(direction)) {
1034
0
        return;
1035
0
      }
1036
0
      continue;
1037
0
    }
1038
1.01G
    size_t doc_ht_size = 0;
1039
1.01G
    auto decode_status = DocHybridTime::CheckAndGetEncodedSize(encoded_doc_ht, &doc_ht_size);
1040
1.01G
    if (!decode_status.ok()) {
1041
0
      LOG(ERROR) << "Decode doc ht from key failed: " << decode_status
1042
0
                 << ", key: " << iter_.key().ToDebugHexString();
1043
0
      status_ = std::move(decode_status);
1044
0
      return;
1045
0
    }
1046
1.01G
    encoded_doc_ht.remove_prefix(encoded_doc_ht.size() - doc_ht_size);
1047
1.01G
    auto value = iter_.value();
1048
1.01G
    auto value_type = DecodeValueType(value);
1049
18.4E
    VLOG(4) << "Checking for skip, type " << value_type << ", encoded_doc_ht: "
1050
18.4E
            << DocHybridTime::DebugSliceToString(encoded_doc_ht)
1051
18.4E
            << " value: " << value.ToDebugHexString();
1052
1.01G
    if (value_type == ValueType::kHybridTime) {
1053
      // Value came from a transaction, we could try to filter it by original intent time.
1054
199M
      Slice encoded_intent_doc_ht = value;
1055
199M
      encoded_intent_doc_ht.consume_byte();
1056
      // The logic here replicates part of the logic in
1057
      // DecodeStrongWriteIntentResult:: MaxAllowedValueTime for intents that have been committed
1058
      // and applied to regular RocksDB only. Note that here we are comparing encoded hybrid times,
1059
      // so comparisons are reversed vs. the un-encoded case. If a value is found "invalid", it
1060
      // can't cause a read restart. If it is found "valid", it will cause a read restart if it is
1061
      // greater than read_time.read. That last comparison is done outside this function.
1062
199M
      Slice max_allowed = encoded_intent_doc_ht.compare(encoded_read_time_local_limit_) > 0
1063
199M
          ? 
Slice(encoded_read_time_global_limit_)199M
1064
199M
          : 
Slice(encoded_read_time_read_)67.0k
;
1065
199M
      if (encoded_doc_ht.compare(max_allowed) > 0) {
1066
199M
        iter_valid_ = true;
1067
199M
        return;
1068
199M
      }
1069
815M
    } else if (encoded_doc_ht.compare(encoded_read_time_regular_limit_) > 0) {
1070
      // If a value does not contain the hybrid time of the intent that wrote the original
1071
      // transaction, then it either (a) originated from a single-shard transaction or (b) the
1072
      // intent hybrid time has already been garbage-collected during a compaction because the
1073
      // corresponding transaction's commit time (stored in the key) became lower than the history
1074
      // cutoff. See the following commit for the details of this intent hybrid time GC.
1075
      //
1076
      // https://github.com/yugabyte/yugabyte-db/commit/26260e0143e521e219d93f4aba6310fcc030a628
1077
      //
1078
      // encoded_read_time_regular_limit_ is simply the encoded value of max(read_ht, local_limit).
1079
      // The above condition
1080
      //
1081
      //   encoded_doc_ht.compare(encoded_read_time_regular_limit_) >= 0
1082
      //
1083
      // corresponds to the following in terms of decoded hybrid times (order is reversed):
1084
      //
1085
      //   commit_ht <= max(read_ht, local_limit)
1086
      //
1087
      // and the inverse of that can be written as
1088
      //
1089
      //   commit_ht > read_ht && commit_ht > local_limit
1090
      //
1091
      // The reason this is correct here is that in case (a) the event of writing a single-shard
1092
      // record to the tablet would certainly be after our read transaction's start time in case
1093
      // commit_ht > local_limit, so it can never cause a read restart. In case (b) we know that
1094
      // commit_ht < history_cutoff and read_ht >= history_cutoff (by definition of history cutoff)
1095
      // so commit_ht < read_ht, and in this case read restart is impossible regardless of the
1096
      // value of local_limit.
1097
814M
      iter_valid_ = true;
1098
814M
      return;
1099
814M
    }
1100
18.4E
    VLOG(4) << "Skipping because of time: " << SubDocKey::DebugSliceToString(iter_.key())
1101
18.4E
            << ", read time: " << read_time_;
1102
1.49M
    if (!NextRegular(direction)) {
1103
0
      return;
1104
0
    }
1105
1.49M
  }
1106
18.4E
  iter_valid_ = false;
1107
18.4E
}
1108
1109
8.02M
// Advances the regular iterator by one record in `direction` and returns true. For a direction
// value that is neither kForward nor kBackward, stores a Corruption status in status_, logs it,
// clears iter_valid_ and returns false.
bool IntentAwareIterator::NextRegular(Direction direction) {
  if (direction == Direction::kForward) {
    iter_.Next(); // TODO(dtxn) use seek with the same key, but read limit as doc hybrid time.
    return true;
  }
  if (direction == Direction::kBackward) {
    iter_.Prev();
    return true;
  }

  status_ = STATUS_FORMAT(Corruption, "Unexpected direction: $0", direction);
  LOG(ERROR) << status_;
  iter_valid_ = false;
  return false;
}
1124
1125
1.03G
void IntentAwareIterator::SkipFutureIntents() {
1126
1.03G
  skip_future_intents_needed_ = false;
1127
1.03G
  if (!intent_iter_.Initialized() || 
!status_.ok()141M
) {
1128
888M
    return;
1129
888M
  }
1130
141M
  auto prefix = CurrentPrefix();
1131
141M
  if (resolved_intent_state_ != ResolvedIntentState::kNoIntent) {
1132
5.66M
    auto compare_result = resolved_intent_key_prefix_.AsSlice().compare_prefix(prefix);
1133
5.66M
    VLOG(4) << "Checking resolved intent subdockey: "
1134
1
            << DebugDumpKeyToStr(resolved_intent_key_prefix_)
1135
1
            << ", against new prefix: " << DebugDumpKeyToStr(prefix) << ": "
1136
1
            << compare_result;
1137
5.66M
    if (compare_result == 0) {
1138
5.66M
      if (!SatisfyBounds(resolved_intent_key_prefix_.AsSlice())) {
1139
0
        resolved_intent_state_ = ResolvedIntentState::kNoIntent;
1140
5.66M
      } else {
1141
5.66M
        resolved_intent_state_ = ResolvedIntentState::kValid;
1142
5.66M
      }
1143
5.66M
      return;
1144
5.66M
    } else 
if (319
compare_result > 0319
) {
1145
326
      resolved_intent_state_ = ResolvedIntentState::kInvalidPrefix;
1146
326
      return;
1147
326
    }
1148
5.66M
  }
1149
136M
  SeekToSuitableIntent<Direction::kForward>();
1150
136M
}
1151
1152
149M
// Derives the intent iterator upper bound from the current regular iterator position. Returns
// the decoding error if the hybrid time size of the regular key cannot be determined, OK
// otherwise.
Status IntentAwareIterator::SetIntentUpperbound() {
  if (!iter_.Valid()) {
    // In case the current position of the regular iterator is invalid, set the exclusive intent
    // upperbound high to be able to find all intents higher than the last regular record.
    ResetIntentUpperbound();
    return Status::OK();
  }

  intent_upperbound_keybytes_.Clear();

  // Strip ValueType::kHybridTime + DocHybridTime at the end of SubDocKey in iter_ and use what
  // is left, followed by ValueType::kMaxByte, as the upper bound.
  Slice key_without_ht = iter_.key();
  size_t encoded_ht_size = 0;
  RETURN_NOT_OK(DocHybridTime::CheckAndGetEncodedSize(key_without_ht, &encoded_ht_size));
  key_without_ht.remove_suffix(1 + encoded_ht_size);
  intent_upperbound_keybytes_.AppendRawBytes(key_without_ht);
  VLOG(4) << "SetIntentUpperbound = "
          << SubDocKey::DebugSliceToString(intent_upperbound_keybytes_.AsSlice());
  intent_upperbound_keybytes_.AppendValueType(ValueType::kMaxByte);

  intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
  intent_iter_.RevalidateAfterUpperBoundChange();
  return Status::OK();
}
1174
1175
2.96M
void IntentAwareIterator::ResetIntentUpperbound() {
1176
2.96M
  intent_upperbound_keybytes_.Clear();
1177
2.96M
  intent_upperbound_keybytes_.AppendValueType(ValueType::kHighest);
1178
2.96M
  intent_upperbound_ = intent_upperbound_keybytes_.AsSlice();
1179
2.96M
  intent_iter_.RevalidateAfterUpperBoundChange();
1180
18.4E
  VLOG(4) << "ResetIntentUpperbound = " << intent_upperbound_.ToDebugString();
1181
2.96M
}
1182
1183
}  // namespace docdb
1184
}  // namespace yb