/Users/deen/code/yugabyte-db/src/yb/docdb/docdb.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/docdb/docdb.h" |
15 | | |
16 | | #include <algorithm> |
17 | | #include <limits> |
18 | | #include <memory> |
19 | | #include <stack> |
20 | | #include <string> |
21 | | #include <unordered_map> |
22 | | #include <vector> |
23 | | |
24 | | #include "yb/common/hybrid_time.h" |
25 | | #include "yb/common/row_mark.h" |
26 | | #include "yb/common/transaction.h" |
27 | | |
28 | | #include "yb/docdb/conflict_resolution.h" |
29 | | #include "yb/docdb/cql_operation.h" |
30 | | #include "yb/docdb/docdb-internal.h" |
31 | | #include "yb/docdb/docdb.pb.h" |
32 | | #include "yb/docdb/docdb_debug.h" |
33 | | #include "yb/docdb/doc_kv_util.h" |
34 | | #include "yb/docdb/docdb_rocksdb_util.h" |
35 | | #include "yb/docdb/docdb_types.h" |
36 | | #include "yb/docdb/intent.h" |
37 | | #include "yb/docdb/intent_aware_iterator.h" |
38 | | #include "yb/docdb/pgsql_operation.h" |
39 | | #include "yb/docdb/rocksdb_writer.h" |
40 | | #include "yb/docdb/subdocument.h" |
41 | | #include "yb/docdb/value.h" |
42 | | #include "yb/docdb/value_type.h" |
43 | | |
44 | | #include "yb/gutil/casts.h" |
45 | | #include "yb/gutil/strings/substitute.h" |
46 | | |
47 | | #include "yb/rocksutil/write_batch_formatter.h" |
48 | | |
49 | | #include "yb/server/hybrid_clock.h" |
50 | | |
51 | | #include "yb/util/bitmap.h" |
52 | | #include "yb/util/bytes_formatter.h" |
53 | | #include "yb/util/enums.h" |
54 | | #include "yb/util/fast_varint.h" |
55 | | #include "yb/util/flag_tags.h" |
56 | | #include "yb/util/logging.h" |
57 | | #include "yb/util/metrics.h" |
58 | | #include "yb/util/pb_util.h" |
59 | | #include "yb/util/status.h" |
60 | | #include "yb/util/status_format.h" |
61 | | #include "yb/util/status_log.h" |
62 | | |
63 | | #include "yb/yql/cql/ql/util/errcodes.h" |
64 | | |
65 | | using std::endl; |
66 | | using std::list; |
67 | | using std::string; |
68 | | using std::stringstream; |
69 | | using std::unique_ptr; |
70 | | using std::shared_ptr; |
71 | | using std::stack; |
72 | | using std::vector; |
73 | | using std::make_shared; |
74 | | |
75 | | using yb::HybridTime; |
76 | | using yb::FormatBytesAsStr; |
77 | | using strings::Substitute; |
78 | | |
79 | | using namespace std::placeholders; |
80 | | |
81 | | DEFINE_int32(cdc_max_stream_intent_records, 1000, |
82 | | "Max number of intent records allowed in single cdc batch. "); |
83 | | |
84 | | namespace yb { |
85 | | namespace docdb { |
86 | | |
87 | | namespace { |
88 | | |
89 | | // key should be a valid prefix of a doc key, ending with some complete primitive value or group end. |
90 | | CHECKED_STATUS ApplyIntent(RefCntPrefix key, |
91 | | const IntentTypeSet intent_types, |
92 | 66.2M | LockBatchEntries *keys_locked) { |
93 | | // Have to strip kGroupEnd from end of key, because when only hash key is specified, we will |
94 | | // get two kGroupEnd at end of strong intent. |
95 | 66.2M | size_t size = key.size(); |
96 | 66.2M | if (size > 0) { |
97 | 58.8M | if (key.data()[0] == ValueTypeAsChar::kGroupEnd) { |
98 | 2.86M | if (size != 1) { |
99 | 0 | return STATUS_FORMAT(Corruption, "Key starting with group end: $0", |
100 | 0 | key.as_slice().ToDebugHexString()); |
101 | 0 | } |
102 | 2.86M | size = 0; |
103 | 55.9M | } else { |
104 | 89.1M | while (key.data()[size - 1] == ValueTypeAsChar::kGroupEnd) { |
105 | 33.1M | --size; |
106 | 33.1M | } |
107 | 55.9M | } |
108 | 58.8M | } |
109 | 66.2M | key.Resize(size); |
110 | 66.2M | keys_locked->push_back({key, intent_types}); |
111 | 66.2M | return Status::OK(); |
112 | 66.2M | } |
113 | | |
114 | | struct DetermineKeysToLockResult { |
115 | | LockBatchEntries lock_batch; |
116 | | bool need_read_snapshot; |
117 | | |
118 | 0 | std::string ToString() const { |
119 | 0 | return YB_STRUCT_TO_STRING(lock_batch, need_read_snapshot); |
120 | 0 | } |
121 | | }; |
122 | | |
123 | | Result<DetermineKeysToLockResult> DetermineKeysToLock( |
124 | | const std::vector<std::unique_ptr<DocOperation>>& doc_write_ops, |
125 | | const google::protobuf::RepeatedPtrField<KeyValuePairPB>& read_pairs, |
126 | | const IsolationLevel isolation_level, |
127 | | const OperationKind operation_kind, |
128 | | const RowMarkType row_mark_type, |
129 | | bool transactional_table, |
130 | 3.24M | PartialRangeKeyIntents partial_range_key_intents) { |
131 | 3.24M | DetermineKeysToLockResult result; |
132 | 3.24M | boost::container::small_vector<RefCntPrefix, 8> doc_paths; |
133 | 3.24M | boost::container::small_vector<size_t, 32> key_prefix_lengths; |
134 | 3.24M | result.need_read_snapshot = false; |
135 | 17.5M | for (const unique_ptr<DocOperation>& doc_op : doc_write_ops) { |
136 | 17.5M | doc_paths.clear(); |
137 | 17.5M | IsolationLevel level; |
138 | 17.5M | RETURN_NOT_OK(doc_op->GetDocPaths(GetDocPathsMode::kLock, &doc_paths, &level)); |
139 | 17.5M | if (isolation_level != IsolationLevel::NON_TRANSACTIONAL) { |
140 | 6.78M | level = isolation_level; |
141 | 6.78M | } |
142 | 17.5M | IntentTypeSet strong_intent_types = GetStrongIntentTypeSet(level, operation_kind, |
143 | 17.5M | row_mark_type); |
144 | 17.5M | if (isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION && |
145 | 17.5M | operation_kind == OperationKind::kWrite130k && |
146 | 17.5M | doc_op->RequireReadSnapshot()130k ) { |
147 | 72.9k | strong_intent_types = IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite}); |
148 | 72.9k | } |
149 | | |
150 | 17.5M | for (const auto& doc_path : doc_paths) { |
151 | 17.5M | key_prefix_lengths.clear(); |
152 | 17.5M | RETURN_NOT_OK(SubDocKey::DecodePrefixLengths(doc_path.as_slice(), &key_prefix_lengths)); |
153 | | // At least entire doc_path should be returned, so empty key_prefix_lengths is an error. |
154 | 17.5M | if (key_prefix_lengths.empty()) { |
155 | 0 | return STATUS_FORMAT(Corruption, "Unable to decode key prefixes from: $0", |
156 | 0 | doc_path.as_slice().ToDebugHexString()); |
157 | 0 | } |
158 | | // We will acquire strong lock on entire doc_path, so remove it from list of weak locks. |
159 | 17.5M | key_prefix_lengths.pop_back(); |
160 | 17.5M | auto partial_key = doc_path; |
161 | | // Acquire weak lock on empty key for transactional tables, |
162 | | // unless specified key is already empty. |
163 | 17.5M | if (doc_path.size() > 017.5M && transactional_table) { |
164 | 7.38M | partial_key.Resize(0); |
165 | 7.38M | RETURN_NOT_OK(ApplyIntent( |
166 | 7.38M | partial_key, StrongToWeak(strong_intent_types), &result.lock_batch)); |
167 | 7.38M | } |
168 | 35.5M | for (size_t prefix_length : key_prefix_lengths)17.5M { |
169 | 35.5M | partial_key.Resize(prefix_length); |
170 | 35.5M | RETURN_NOT_OK(ApplyIntent( |
171 | 35.5M | partial_key, StrongToWeak(strong_intent_types), &result.lock_batch)); |
172 | 35.5M | } |
173 | | |
174 | 17.5M | RETURN_NOT_OK(ApplyIntent(doc_path, strong_intent_types, &result.lock_batch)); |
175 | 17.5M | } |
176 | | |
177 | 17.5M | if (doc_op->RequireReadSnapshot()) { |
178 | 6.31M | result.need_read_snapshot = true; |
179 | 6.31M | } |
180 | 17.5M | } |
181 | | |
182 | 3.24M | if (!read_pairs.empty()) { |
183 | 314k | RETURN_NOT_OK(EnumerateIntents( |
184 | 314k | read_pairs, |
185 | 314k | [&result](IntentStrength strength, FullDocKey, Slice value, KeyBytes* key, LastKey) { |
186 | 314k | RefCntPrefix prefix(key->AsSlice()); |
187 | 314k | auto intent_types = strength == IntentStrength::kStrong |
188 | 314k | ? IntentTypeSet({IntentType::kStrongRead}) |
189 | 314k | : IntentTypeSet({IntentType::kWeakRead}); |
190 | 314k | return ApplyIntent(prefix, intent_types, &result.lock_batch); |
191 | 314k | }, partial_range_key_intents)); |
192 | 314k | } |
193 | | |
194 | 3.24M | return result; |
195 | 3.24M | } |
196 | | |
197 | | // Collapse keys_locked into a unique set of keys with intent_types representing the union of |
198 | | // intent_types originally present. In other words, suppose keys_locked is originally the following: |
199 | | // [ |
200 | | // (k1, {kWeakRead, kWeakWrite}), |
201 | | // (k1, {kStrongRead}), |
202 | | // (k2, {kWeakRead}), |
203 | | // (k3, {kStrongRead}), |
204 | | // (k2, {kStrongWrite}), |
205 | | // ] |
206 | | // Then after calling FilterKeysToLock we will have: |
207 | | // [ |
208 | | // (k1, {kWeakRead, kWeakWrite, kStrongRead}), |
209 | | // (k2, {kWeakRead, kStrongWrite}), |
210 | | // (k3, {kStrongRead}), |
211 | | // ] |
212 | | // Note that keys_locked is sorted by key first, so all entries with equal keys are collapsed. |
213 | 3.24M | void FilterKeysToLock(LockBatchEntries *keys_locked) { |
214 | 3.24M | if (keys_locked->empty()) { |
215 | 0 | return; |
216 | 0 | } |
217 | | |
218 | 3.24M | std::sort(keys_locked->begin(), keys_locked->end(), |
219 | 560M | [](const auto& lhs, const auto& rhs) { |
220 | 560M | return lhs.key < rhs.key; |
221 | 560M | }); |
222 | | |
223 | 3.24M | auto w = keys_locked->begin(); |
224 | 66.2M | for (auto it = keys_locked->begin(); ++it != keys_locked->end();) { |
225 | 62.9M | if (it->key == w->key) { |
226 | 22.0M | w->intent_types |= it->intent_types; |
227 | 40.9M | } else { |
228 | 40.9M | ++w; |
229 | 40.9M | *w = *it; |
230 | 40.9M | } |
231 | 62.9M | } |
232 | | |
233 | 3.24M | ++w; |
234 | 3.24M | keys_locked->erase(w, keys_locked->end()); |
235 | 3.24M | } |
236 | | |
237 | | } // namespace |
238 | | |
239 | | Result<PrepareDocWriteOperationResult> PrepareDocWriteOperation( |
240 | | const std::vector<std::unique_ptr<DocOperation>>& doc_write_ops, |
241 | | const google::protobuf::RepeatedPtrField<KeyValuePairPB>& read_pairs, |
242 | | const scoped_refptr<Histogram>& write_lock_latency, |
243 | | const IsolationLevel isolation_level, |
244 | | const OperationKind operation_kind, |
245 | | const RowMarkType row_mark_type, |
246 | | bool transactional_table, |
247 | | CoarseTimePoint deadline, |
248 | | PartialRangeKeyIntents partial_range_key_intents, |
249 | 3.24M | SharedLockManager *lock_manager) { |
250 | 3.24M | PrepareDocWriteOperationResult result; |
251 | | |
252 | 3.24M | auto determine_keys_to_lock_result = VERIFY_RESULT(DetermineKeysToLock( |
253 | 3.24M | doc_write_ops, read_pairs, isolation_level, operation_kind, row_mark_type, |
254 | 3.24M | transactional_table, partial_range_key_intents)); |
255 | 4.40k | VLOG_WITH_FUNC(4) << "determine_keys_to_lock_result=" << determine_keys_to_lock_result.ToString(); |
256 | 3.24M | if (determine_keys_to_lock_result.lock_batch.empty()) { |
257 | 0 | LOG(ERROR) << "Empty lock batch, doc_write_ops: " << yb::ToString(doc_write_ops) |
258 | 0 | << ", read pairs: " << yb::ToString(read_pairs); |
259 | 0 | return STATUS(Corruption, "Empty lock batch"); |
260 | 0 | } |
261 | 3.24M | result.need_read_snapshot = determine_keys_to_lock_result.need_read_snapshot; |
262 | | |
263 | 3.24M | FilterKeysToLock(&determine_keys_to_lock_result.lock_batch); |
264 | 3.24M | VLOG_WITH_FUNC1.86k (4) << "filtered determine_keys_to_lock_result=" |
265 | 1.86k | << determine_keys_to_lock_result.ToString(); |
266 | 3.24M | const MonoTime start_time = (write_lock_latency != nullptr) ? MonoTime::Now()3.24M : MonoTime()1.18k ; |
267 | 3.24M | result.lock_batch = LockBatch( |
268 | 3.24M | lock_manager, std::move(determine_keys_to_lock_result.lock_batch), deadline); |
269 | 3.24M | RETURN_NOT_OK_PREPEND( |
270 | 3.24M | result.lock_batch.status(), Format("Timeout: $0", deadline - ToCoarse(start_time))); |
271 | 3.24M | if (write_lock_latency != nullptr) { |
272 | 3.24M | const MonoDelta elapsed_time = MonoTime::Now().GetDeltaSince(start_time); |
273 | 3.24M | write_lock_latency->Increment(elapsed_time.ToMicroseconds()); |
274 | 3.24M | } |
275 | | |
276 | 3.24M | return result; |
277 | 3.24M | } |
278 | | |
279 | 30 | Status SetDocOpQLErrorResponse(DocOperation* doc_op, string err_msg) { |
280 | 30 | switch (doc_op->OpType()) { |
281 | 30 | case DocOperation::Type::QL_WRITE_OPERATION: { |
282 | 30 | const auto &resp = down_cast<QLWriteOperation *>(doc_op)->response(); |
283 | 30 | resp->set_status(QLResponsePB::YQL_STATUS_QUERY_ERROR); |
284 | 30 | resp->set_error_message(err_msg); |
285 | 30 | break; |
286 | 0 | } |
287 | 0 | case DocOperation::Type::PGSQL_WRITE_OPERATION: { |
288 | 0 | const auto &resp = down_cast<PgsqlWriteOperation *>(doc_op)->response(); |
289 | 0 | resp->set_status(PgsqlResponsePB::PGSQL_STATUS_USAGE_ERROR); |
290 | 0 | resp->set_error_message(err_msg); |
291 | 0 | break; |
292 | 0 | } |
293 | 0 | default: |
294 | 0 | return STATUS_FORMAT(InternalError, |
295 | 30 | "Invalid status (QLError) for doc operation %d", |
296 | 30 | doc_op->OpType()); |
297 | 30 | } |
298 | 30 | return Status::OK(); |
299 | 30 | } |
300 | | |
301 | | Status AssembleDocWriteBatch(const vector<unique_ptr<DocOperation>>& doc_write_ops, |
302 | | CoarseTimePoint deadline, |
303 | | const ReadHybridTime& read_time, |
304 | | const DocDB& doc_db, |
305 | | KeyValueWriteBatchPB* write_batch, |
306 | | InitMarkerBehavior init_marker_behavior, |
307 | | std::atomic<int64_t>* monotonic_counter, |
308 | | HybridTime* restart_read_ht, |
309 | 3.11M | const string& table_name) { |
310 | 3.11M | DCHECK_ONLY_NOTNULL(restart_read_ht); |
311 | 3.11M | DocWriteBatch doc_write_batch(doc_db, init_marker_behavior, monotonic_counter); |
312 | 3.11M | DocOperationApplyData data = {&doc_write_batch, deadline, read_time, restart_read_ht}; |
313 | 17.4M | for (const unique_ptr<DocOperation>& doc_op : doc_write_ops) { |
314 | 17.4M | Status s = doc_op->Apply(data); |
315 | 17.4M | if (s.IsQLError()) { |
316 | 30 | string error_msg; |
317 | 30 | if (ql::GetErrorCode(s) == ql::ErrorCode::CONDITION_NOT_SATISFIED) { |
318 | | // Generating the error message here because 'table_name' |
319 | | // is not available on the lower level - in doc_op->Apply(). |
320 | 19 | error_msg = Format("Condition on table $0 was not satisfied.", table_name); |
321 | 19 | } else { |
322 | 11 | error_msg = s.message().ToBuffer(); |
323 | 11 | } |
324 | | |
325 | | // Ensure we set appropriate error in the response object for QL errors. |
326 | 30 | RETURN_NOT_OK(SetDocOpQLErrorResponse(doc_op.get(), error_msg)); |
327 | 30 | continue; |
328 | 30 | } |
329 | | |
330 | 17.4M | RETURN_NOT_OK(s); |
331 | 17.4M | } |
332 | 3.11M | doc_write_batch.MoveToWriteBatchPB(write_batch); |
333 | 3.11M | return Status::OK(); |
334 | 3.11M | } |
335 | | |
336 | | namespace { |
337 | | |
338 | 0 | CHECKED_STATUS NotEnoughBytes(size_t present, size_t required, const Slice& full) { |
339 | 0 | return STATUS_FORMAT( |
340 | 0 | Corruption, "Not enough bytes in external intents $0 while $1 expected, full: $2", |
341 | 0 | present, required, full.ToDebugHexString()); |
342 | 0 | } |
343 | | |
344 | | CHECKED_STATUS PrepareApplyExternalIntentsBatch( |
345 | | HybridTime commit_ht, |
346 | | const Slice& original_input_value, |
347 | | rocksdb::WriteBatch* regular_batch, |
348 | 0 | IntraTxnWriteId* write_id) { |
349 | 0 | auto input_value = original_input_value; |
350 | 0 | DocHybridTimeBuffer doc_ht_buffer; |
351 | 0 | RETURN_NOT_OK(input_value.consume_byte(ValueTypeAsChar::kUuid)); |
352 | 0 | Uuid status_tablet; |
353 | 0 | RETURN_NOT_OK(status_tablet.FromSlice(input_value.Prefix(kUuidSize))); |
354 | 0 | input_value.remove_prefix(kUuidSize); |
355 | 0 | RETURN_NOT_OK(input_value.consume_byte(ValueTypeAsChar::kExternalIntents)); |
356 | 0 | for (;;) { |
357 | 0 | auto key_size = VERIFY_RESULT(util::FastDecodeUnsignedVarInt(&input_value)); |
358 | 0 | if (key_size == 0) { |
359 | 0 | break; |
360 | 0 | } |
361 | 0 | if (input_value.size() < key_size) { |
362 | 0 | return NotEnoughBytes(input_value.size(), key_size, original_input_value); |
363 | 0 | } |
364 | 0 | auto output_key = input_value.Prefix(key_size); |
365 | 0 | input_value.remove_prefix(key_size); |
366 | 0 | auto value_size = VERIFY_RESULT(util::FastDecodeUnsignedVarInt(&input_value)); |
367 | 0 | if (input_value.size() < value_size) { |
368 | 0 | return NotEnoughBytes(input_value.size(), value_size, original_input_value); |
369 | 0 | } |
370 | 0 | auto output_value = input_value.Prefix(value_size); |
371 | 0 | input_value.remove_prefix(value_size); |
372 | 0 | std::array<Slice, 2> key_parts = {{ |
373 | 0 | output_key, |
374 | 0 | doc_ht_buffer.EncodeWithValueType(commit_ht, *write_id), |
375 | 0 | }}; |
376 | 0 | std::array<Slice, 1> value_parts = {{ |
377 | 0 | output_value, |
378 | 0 | }}; |
379 | 0 | regular_batch->Put(key_parts, value_parts); |
380 | 0 | ++*write_id; |
381 | 0 | } |
382 | | |
383 | 0 | return Status::OK(); |
384 | 0 | } |
385 | | |
386 | | // Reads all stored external intents for provided transactions and prepares batches that will apply |
387 | | // them into regular db and remove from intents db. |
388 | | CHECKED_STATUS PrepareApplyExternalIntents( |
389 | | ExternalTxnApplyState* apply_external_transactions, |
390 | | rocksdb::WriteBatch* regular_batch, |
391 | | rocksdb::DB* intents_db, |
392 | 6.34M | rocksdb::WriteBatch* intents_batch) { |
393 | 6.34M | if (apply_external_transactions->empty()) { |
394 | 6.34M | return Status::OK(); |
395 | 6.34M | } |
396 | | |
397 | 501 | KeyBytes key_prefix; |
398 | 501 | KeyBytes key_upperbound; |
399 | 501 | Slice key_upperbound_slice; |
400 | | |
401 | 501 | auto iter = CreateRocksDBIterator( |
402 | 501 | intents_db, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, |
403 | 501 | /* user_key_for_filter= */ boost::none, |
404 | 501 | rocksdb::kDefaultQueryId, /* read_filter= */ nullptr, &key_upperbound_slice); |
405 | | |
406 | 501 | for (auto& apply : *apply_external_transactions) { |
407 | 0 | key_prefix.Clear(); |
408 | 0 | key_prefix.AppendValueType(ValueType::kExternalTransactionId); |
409 | 0 | key_prefix.AppendRawBytes(apply.first.AsSlice()); |
410 | | |
411 | 0 | key_upperbound = key_prefix; |
412 | 0 | key_upperbound.AppendValueType(ValueType::kMaxByte); |
413 | 0 | key_upperbound_slice = key_upperbound.AsSlice(); |
414 | | |
415 | 0 | IntraTxnWriteId& write_id = apply.second.write_id; |
416 | | |
417 | 0 | iter.Seek(key_prefix); |
418 | 0 | while (iter.Valid()) { |
419 | 0 | const Slice input_key(iter.key()); |
420 | | |
421 | 0 | if (!input_key.starts_with(key_prefix.AsSlice())) { |
422 | 0 | break; |
423 | 0 | } |
424 | | |
425 | 0 | if (regular_batch) { |
426 | 0 | RETURN_NOT_OK(PrepareApplyExternalIntentsBatch( |
427 | 0 | apply.second.commit_ht, iter.value(), regular_batch, &write_id)); |
428 | 0 | } |
429 | 0 | if (intents_batch) { |
430 | 0 | intents_batch->SingleDelete(input_key); |
431 | 0 | } |
432 | | |
433 | 0 | iter.Next(); |
434 | 0 | } |
435 | 0 | } |
436 | | |
437 | 501 | return Status::OK(); |
438 | 501 | } |
439 | | |
440 | 6.34M | ExternalTxnApplyState ProcessApplyExternalTransactions(const KeyValueWriteBatchPB& put_batch) { |
441 | 6.34M | ExternalTxnApplyState result; |
442 | 6.34M | for (const auto& apply : put_batch.apply_external_transactions()) { |
443 | 0 | auto txn_id = CHECK_RESULT(FullyDecodeTransactionId(apply.transaction_id())); |
444 | 0 | auto commit_ht = HybridTime(apply.commit_hybrid_time()); |
445 | 0 | result.emplace( |
446 | 0 | txn_id, |
447 | 0 | ExternalTxnApplyStateData{ |
448 | 0 | .commit_ht = commit_ht |
449 | 0 | }); |
450 | 0 | } |
451 | | |
452 | 6.34M | return result; |
453 | 6.34M | } |
454 | | |
455 | | } // namespace |
456 | | |
457 | | bool AddExternalPairToWriteBatch( |
458 | | const KeyValuePairPB& kv_pair, |
459 | | HybridTime hybrid_time, |
460 | | int write_id, |
461 | | ExternalTxnApplyState* apply_external_transactions, |
462 | | rocksdb::WriteBatch* regular_write_batch, |
463 | 125M | rocksdb::WriteBatch* intents_write_batch) { |
464 | 125M | DocHybridTimeBuffer doc_ht_buffer; |
465 | 125M | DocHybridTimeWordBuffer inverted_doc_ht_buffer; |
466 | | |
467 | 125M | CHECK(!kv_pair.key().empty()); |
468 | 125M | CHECK(!kv_pair.value().empty()); |
469 | | |
470 | 125M | if (kv_pair.key()[0] != ValueTypeAsChar::kExternalTransactionId) { |
471 | 125M | return true; |
472 | 125M | } |
473 | | |
474 | | // We replicate encoded SubDocKeys without a HybridTime at the end, and only append it here. |
475 | | // The reason for this is that the HybridTime timestamp is only picked at the time of |
476 | | // appending an entry to the tablet's Raft log. Also this is a good way to save network |
477 | | // bandwidth. |
478 | | // |
479 | | // "Write id" is the final component of our HybridTime encoding (or, to be more precise, |
480 | | // DocHybridTime encoding) that helps disambiguate between different updates to the |
481 | | // same key (row/column) within a transaction. We set it based on the position of the write |
482 | | // operation in its write batch. |
483 | | |
484 | 786 | hybrid_time = kv_pair.has_external_hybrid_time() ? |
485 | 786 | HybridTime(kv_pair.external_hybrid_time())0 : hybrid_time; |
486 | 786 | std::array<Slice, 2> key_parts = {{ |
487 | 786 | Slice(kv_pair.key()), |
488 | 786 | doc_ht_buffer.EncodeWithValueType(hybrid_time, write_id), |
489 | 786 | }}; |
490 | 786 | Slice key_value = kv_pair.value(); |
491 | | // This entry contains external intents. |
492 | 786 | Slice key = kv_pair.key(); |
493 | 786 | key.consume_byte(); |
494 | 786 | auto txn_id = CHECK_RESULT(DecodeTransactionId(&key)); |
495 | 786 | auto it = apply_external_transactions->find(txn_id); |
496 | 786 | if (it != apply_external_transactions->end()) { |
497 | | // The same write operation could contain external intents and instruct us to apply them. |
498 | 0 | CHECK_OK(PrepareApplyExternalIntentsBatch( |
499 | 0 | it->second.commit_ht, key_value, regular_write_batch, &it->second.write_id)); |
500 | 0 | return false; |
501 | 0 | } |
502 | 786 | key_parts[1] = InvertEncodedDocHT(key_parts[1], &inverted_doc_ht_buffer); |
503 | 786 | constexpr size_t kNumValueParts = 1; |
504 | 786 | intents_write_batch->Put(key_parts, { &key_value, kNumValueParts }); |
505 | | |
506 | 786 | return false; |
507 | 786 | } |
508 | | |
509 | | // Usually put_batch contains only records that should be applied to regular DB. |
510 | | // So apply_external_transactions will be empty and regular_entry will be true. |
511 | | // |
512 | | // But in the general case, on the consumer side of CDC, put_batch could contain various kinds of |
513 | | // records that should be applied into the regular and intents DBs. |
514 | | // They are: |
515 | | // apply_external_transactions |
516 | | // The list of external transactions that should be applied. |
517 | | // For each such transaction we should look up existing external intents (stored in intents DB) |
518 | | // and convert them to Put commands in regular_write_batch plus SingleDelete commands in |
519 | | // intents_write_batch. |
520 | | // write_pairs |
521 | | // Could contain regular entries, that should be stored into regular DB as is. |
522 | | // Also pair could contain external intents, that should be stored into intents DB. |
523 | | // But if apply_external_transactions contains transaction for those external intents, then |
524 | | // those intents will be applied directly to regular DB, avoiding unnecessary write to intents DB. |
525 | | // This case is very common for short running transactions. |
526 | | bool PrepareExternalWriteBatch( |
527 | | const KeyValueWriteBatchPB& put_batch, |
528 | | HybridTime hybrid_time, |
529 | | rocksdb::DB* intents_db, |
530 | | rocksdb::WriteBatch* regular_write_batch, |
531 | 6.34M | rocksdb::WriteBatch* intents_write_batch) { |
532 | 6.34M | CHECK(put_batch.read_pairs().empty()); |
533 | | |
534 | 6.34M | auto apply_external_transactions = ProcessApplyExternalTransactions(put_batch); |
535 | | |
536 | 6.34M | CHECK_OK(PrepareApplyExternalIntents( |
537 | 6.34M | &apply_external_transactions, regular_write_batch, intents_db, intents_write_batch)); |
538 | | |
539 | 6.34M | bool has_non_external_kvs = false; |
540 | 132M | for (int write_id = 0; write_id < put_batch.write_pairs_size(); ++write_id125M ) { |
541 | 125M | has_non_external_kvs = AddExternalPairToWriteBatch( |
542 | 125M | put_batch.write_pairs(write_id), hybrid_time, write_id, &apply_external_transactions, |
543 | 125M | regular_write_batch, intents_write_batch) || has_non_external_kvs0 ; |
544 | 125M | } |
545 | 6.34M | return has_non_external_kvs; |
546 | 6.34M | } |
547 | | |
548 | | namespace { |
549 | | |
550 | | // Checks if the given slice points to the part of an encoded SubDocKey past all of the subkeys |
551 | | // (and definitely past all the hash/range keys). The only remaining part could be a hybrid time. |
552 | 108M | inline bool IsEndOfSubKeys(const Slice& key) { |
553 | 108M | return key[0] == ValueTypeAsChar::kGroupEnd && |
554 | 108M | (87.5M key.size() == 187.5M || key[1] == ValueTypeAsChar::kHybridTime71.5M ); |
555 | 108M | } |
556 | | |
557 | | // Enumerates weak intent keys generated by considering specified prefixes of the given key and |
558 | | // invoking the provided callback with each combination considered, stored in encoded_key_buffer. |
559 | | // On return, *encoded_key_buffer contains the corresponding strong intent, for which the callback |
560 | | // has not yet been called. It is left to the caller to use the final state of encoded_key_buffer. |
561 | | // |
562 | | // The prefixes of the key considered are as follows: |
563 | | // 1. Up to and including the whole hash key. |
564 | | // 2. Up to and including the whole range key, or if partial_range_key_intents is |
565 | | // PartialRangeKeyIntents::kTrue, then enumerate the prefix up to the end of each component of |
566 | | // the range key separately. |
567 | | // 3. Up to and including each subkey component, separately. |
568 | | // |
569 | | // In any case, we stop short of enumerating the last intent key generated based on the above, as |
570 | | // this represents the strong intent key and will be stored in encoded_key_buffer at the end of this |
571 | | // call. |
572 | | // |
573 | | // The beginning of each intent key will also include any cotable_id or colocation_id, |
574 | | // if present. |
575 | | Status EnumerateWeakIntents( |
576 | | Slice key, |
577 | | const EnumerateIntentsCallback& functor, |
578 | | KeyBytes* encoded_key_buffer, |
579 | 87.6M | PartialRangeKeyIntents partial_range_key_intents) { |
580 | 87.6M | static const Slice kEmptyIntentValue; |
581 | | |
582 | 87.6M | encoded_key_buffer->Clear(); |
583 | 87.6M | if (key.empty()) { |
584 | 0 | return STATUS(Corruption, "An empty slice is not a valid encoded SubDocKey"); |
585 | 0 | } |
586 | | |
587 | 87.6M | const bool has_cotable_id = *key.cdata() == ValueTypeAsChar::kTableId; |
588 | 87.6M | const bool has_colocation_id = *key.cdata() == ValueTypeAsChar::kColocationId; |
589 | 87.6M | { |
590 | 87.6M | bool is_table_root_key = false; |
591 | 87.6M | if (has_cotable_id) { |
592 | 12.2M | const auto kMinExpectedSize = kUuidSize + 2; |
593 | 12.2M | if (key.size() < kMinExpectedSize) { |
594 | 0 | return STATUS_FORMAT( |
595 | 0 | Corruption, |
596 | 0 | "Expected an encoded SubDocKey starting with a cotable id to be at least $0 bytes long", |
597 | 0 | kMinExpectedSize); |
598 | 0 | } |
599 | 12.2M | encoded_key_buffer->AppendRawBytes(key.cdata(), kUuidSize + 1); |
600 | 12.2M | is_table_root_key = key[kUuidSize + 1] == ValueTypeAsChar::kGroupEnd; |
601 | 75.4M | } else if (has_colocation_id) { |
602 | 1.66k | const auto kMinExpectedSize = sizeof(ColocationId) + 2; |
603 | 1.66k | if (key.size() < kMinExpectedSize) { |
604 | 0 | return STATUS_FORMAT( |
605 | 0 | Corruption, |
606 | 0 | "Expected an encoded SubDocKey starting with a colocation id to be" |
607 | 0 | " at least $0 bytes long", |
608 | 0 | kMinExpectedSize); |
609 | 0 | } |
610 | 1.66k | encoded_key_buffer->AppendRawBytes(key.cdata(), sizeof(ColocationId) + 1); |
611 | 1.66k | is_table_root_key = key[sizeof(ColocationId) + 1] == ValueTypeAsChar::kGroupEnd; |
612 | 75.4M | } else { |
613 | 75.4M | is_table_root_key = *key.cdata() == ValueTypeAsChar::kGroupEnd; |
614 | 75.4M | } |
615 | | |
616 | 87.6M | encoded_key_buffer->AppendValueType(ValueType::kGroupEnd); |
617 | | |
618 | 87.6M | if (is_table_root_key) { |
619 | | // This must be a "table root" (or "tablet root") key (no hash components, no range |
620 | | // components, but the cotable might still be there). We are not really considering the case |
621 | | // of any subkeys under the empty key, so we can return here. |
622 | 95.7k | return Status::OK(); |
623 | 95.7k | } |
624 | 87.6M | } |
625 | | |
626 | | // For any non-empty key we already know that the empty key intent is weak. |
627 | 87.5M | RETURN_NOT_OK(functor( |
628 | 87.5M | IntentStrength::kWeak, FullDocKey::kFalse, kEmptyIntentValue, encoded_key_buffer, |
629 | 87.5M | LastKey::kFalse)); |
630 | | |
631 | 87.5M | auto hashed_part_size = VERIFY_RESULT(DocKey::EncodedSize(key, DocKeyPart::kUpToHash)); |
632 | | |
633 | | // Remove kGroupEnd that we just added to generate a weak intent. |
634 | 0 | encoded_key_buffer->RemoveLastByte(); |
635 | | |
636 | 87.5M | if (hashed_part_size != encoded_key_buffer->size()) { |
637 | | // A hash component is present. Note that if cotable id is present, hashed_part_size would |
638 | | // also include it, so we only need to append the new bytes. |
639 | 70.8M | encoded_key_buffer->AppendRawBytes( |
640 | 70.8M | key.cdata() + encoded_key_buffer->size(), hashed_part_size - encoded_key_buffer->size()); |
641 | 70.8M | key.remove_prefix(hashed_part_size); |
642 | 70.8M | if (key.empty()) { |
643 | 0 | return STATUS(Corruption, "Range key part missing, expected at least a kGroupEnd"); |
644 | 0 | } |
645 | | |
646 | | // Append the kGroupEnd at the end for the empty range part to make this a valid encoded DocKey. |
647 | 70.8M | encoded_key_buffer->AppendValueType(ValueType::kGroupEnd); |
648 | 70.8M | if (IsEndOfSubKeys(key)) { |
649 | | // This means the key ends at the hash component -- no range keys and no subkeys. |
650 | 13.3M | return Status::OK(); |
651 | 13.3M | } |
652 | | |
653 | | // Generate a weak intent that only includes the hash component. |
654 | 57.5M | RETURN_NOT_OK(functor( |
655 | 57.5M | IntentStrength::kWeak, FullDocKey(key[0] == ValueTypeAsChar::kGroupEnd), kEmptyIntentValue, |
656 | 57.5M | encoded_key_buffer, LastKey::kFalse)); |
657 | | |
658 | | // Remove the kGroupEnd we added a bit earlier so we can append some range components. |
659 | 57.5M | encoded_key_buffer->RemoveLastByte(); |
660 | 57.5M | } else { |
661 | | // No hash component. |
662 | 16.6M | key.remove_prefix(hashed_part_size); |
663 | 16.6M | } |
664 | | |
665 | | // Range components. |
666 | 74.1M | auto range_key_start = key.cdata(); |
667 | 74.1M | while (VERIFY_RESULT(ConsumePrimitiveValueFromKey(&key))) { |
668 | | // Append the consumed primitive value to encoded_key_buffer. |
669 | 37.4M | encoded_key_buffer->AppendRawBytes(range_key_start, key.cdata() - range_key_start); |
670 | | // We always need kGroupEnd at the end to make this a valid encoded DocKey. |
671 | 37.4M | encoded_key_buffer->AppendValueType(ValueType::kGroupEnd); |
672 | 37.4M | if (key.empty()) { |
673 | 0 | return STATUS(Corruption, "Range key part is not terminated with a kGroupEnd"); |
674 | 0 | } |
675 | 37.4M | if (IsEndOfSubKeys(key)) { |
676 | | // This is the last range key and there are no subkeys. |
677 | 2.60M | return Status::OK(); |
678 | 2.60M | } |
679 | 34.8M | FullDocKey full_doc_key(key[0] == ValueTypeAsChar::kGroupEnd); |
680 | 34.8M | if (partial_range_key_intents || full_doc_key19.9M ) { |
681 | 26.0M | RETURN_NOT_OK(functor( |
682 | 26.0M | IntentStrength::kWeak, full_doc_key, kEmptyIntentValue, encoded_key_buffer, |
683 | 26.0M | LastKey::kFalse)); |
684 | 26.0M | } |
685 | 34.8M | encoded_key_buffer->RemoveLastByte(); |
686 | 34.8M | range_key_start = key.cdata(); |
687 | 34.8M | } |
688 | | |
689 | | // We still need to append the kGroupEnd byte that closes the range portion to our buffer. |
690 | | // The corresponding kGroupEnd has already been consumed from the key slice by the last call to |
691 | | // ConsumePrimitiveValueFromKey, which returned false. |
692 | 71.5M | encoded_key_buffer->AppendValueType(ValueType::kGroupEnd); |
693 | | |
694 | | // Subkey components. |
695 | 71.5M | auto subkey_start = key.cdata(); |
696 | 71.5M | while (VERIFY_RESULT(SubDocKey::DecodeSubkey(&key))) { |
697 | | // Append the consumed value to encoded_key_buffer. |
698 | 71.5M | encoded_key_buffer->AppendRawBytes(subkey_start, key.cdata() - subkey_start); |
699 | 71.5M | if (key.empty() || *key.cdata() == ValueTypeAsChar::kHybridTime840 ) { |
700 | | // This was the last subkey. |
701 | 71.5M | return Status::OK(); |
702 | 71.5M | } |
703 | 48.0k | RETURN_NOT_OK(functor( |
704 | 48.0k | IntentStrength::kWeak, FullDocKey::kTrue, kEmptyIntentValue, encoded_key_buffer, |
705 | 48.0k | LastKey::kFalse)); |
706 | 48.0k | subkey_start = key.cdata(); |
707 | 48.0k | } |
708 | | |
709 | 37.5k | return STATUS( |
710 | 71.5M | Corruption, |
711 | 71.5M | "Expected to reach the end of the key after decoding last valid subkey"); |
712 | 71.5M | } |
713 | | |
714 | | } // anonymous namespace |
715 | | |
716 | | Status EnumerateIntents( |
717 | | Slice key, const Slice& intent_value, const EnumerateIntentsCallback& functor, |
718 | | KeyBytes* encoded_key_buffer, PartialRangeKeyIntents partial_range_key_intents, |
719 | 87.6M | LastKey last_key) { |
720 | 87.6M | RETURN_NOT_OK(EnumerateWeakIntents( |
721 | 87.6M | key, functor, encoded_key_buffer, partial_range_key_intents)); |
722 | 87.6M | return functor( |
723 | 87.6M | IntentStrength::kStrong, FullDocKey::kTrue, intent_value, encoded_key_buffer, last_key); |
724 | 87.6M | } |
725 | | |
726 | | Status EnumerateIntents( |
727 | | const google::protobuf::RepeatedPtrField<KeyValuePairPB> &kv_pairs, |
728 | 3.21M | const EnumerateIntentsCallback& functor, PartialRangeKeyIntents partial_range_key_intents) { |
729 | 3.21M | KeyBytes encoded_key; |
730 | | |
731 | 73.6M | for (int index = 0; index < kv_pairs.size(); ) { |
732 | 70.3M | const auto &kv_pair = kv_pairs.Get(index); |
733 | 70.3M | ++index; |
734 | 70.3M | CHECK(!kv_pair.key().empty()); |
735 | 70.3M | CHECK(!kv_pair.value().empty()); |
736 | 70.3M | RETURN_NOT_OK(EnumerateIntents( |
737 | 70.3M | kv_pair.key(), kv_pair.value(), functor, &encoded_key, partial_range_key_intents, |
738 | 70.3M | LastKey(index == kv_pairs.size()))); |
739 | 70.3M | } |
740 | | |
741 | 3.21M | return Status::OK(); |
742 | 3.21M | } |
743 | | |
744 | | // ------------------------------------------------------------------------------------------------ |
745 | | // Standalone functions |
746 | | // ------------------------------------------------------------------------------------------------ |
747 | | |
748 | 3.03M | void AppendTransactionKeyPrefix(const TransactionId& transaction_id, KeyBytes* out) { |
749 | 3.03M | out->AppendValueType(ValueType::kTransactionId); |
750 | 3.03M | out->AppendRawBytes(transaction_id.AsSlice()); |
751 | 3.03M | } |
752 | | |
753 | | Result<ApplyTransactionState> GetIntentsBatch( |
754 | | const TransactionId& transaction_id, |
755 | | const KeyBounds* key_bounds, |
756 | | const ApplyTransactionState* stream_state, |
757 | | rocksdb::DB* intents_db, |
758 | 147 | std::vector<IntentKeyValueForCDC>* key_value_intents) { |
759 | 147 | KeyBytes txn_reverse_index_prefix; |
760 | 147 | Slice transaction_id_slice = transaction_id.AsSlice(); |
761 | 147 | AppendTransactionKeyPrefix(transaction_id, &txn_reverse_index_prefix); |
762 | 147 | txn_reverse_index_prefix.AppendValueType(ValueType::kMaxByte); |
763 | 147 | Slice key_prefix = txn_reverse_index_prefix.AsSlice(); |
764 | 147 | key_prefix.remove_suffix(1); |
765 | 147 | const Slice reverse_index_upperbound = txn_reverse_index_prefix.AsSlice(); |
766 | | |
767 | 147 | auto reverse_index_iter = CreateRocksDBIterator( |
768 | 147 | intents_db, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none, |
769 | 147 | rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound); |
770 | | |
771 | 147 | BoundedRocksDbIterator intent_iter = CreateRocksDBIterator( |
772 | 147 | intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none, |
773 | 147 | rocksdb::kDefaultQueryId); |
774 | | |
775 | 147 | reverse_index_iter.Seek(key_prefix); |
776 | | |
777 | 147 | DocHybridTimeBuffer doc_ht_buffer; |
778 | 147 | IntraTxnWriteId write_id = 0; |
779 | 147 | if (stream_state != nullptr && stream_state->active() && stream_state->write_id != 00 ) { |
780 | 0 | reverse_index_iter.Seek(stream_state->key); |
781 | 0 | write_id = stream_state->write_id; |
782 | 0 | reverse_index_iter.Next(); |
783 | 0 | } |
784 | 147 | const uint64_t max_records = FLAGS_cdc_max_stream_intent_records; |
785 | 147 | const uint64_t write_id_limit = write_id + max_records; |
786 | | |
787 | 1.49k | while (reverse_index_iter.Valid()) { |
788 | 1.34k | const Slice key_slice(reverse_index_iter.key()); |
789 | | |
790 | 1.34k | if (!key_slice.starts_with(key_prefix)) { |
791 | 0 | break; |
792 | 0 | } |
793 | | // If the key ends at the transaction id then it is transaction metadata (status tablet, |
794 | | // isolation level etc.). |
795 | 1.34k | if (key_slice.size() > txn_reverse_index_prefix.size()) { |
796 | 1.19k | auto reverse_index_value = reverse_index_iter.value(); |
797 | 1.19k | if (!reverse_index_value.empty() && reverse_index_value[0] == ValueTypeAsChar::kBitSet) { |
798 | 0 | reverse_index_value.remove_prefix(1); |
799 | 0 | RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value)); |
800 | 0 | } |
801 | | // Value of reverse index is a key of original intent record, so seek it and check match. |
802 | 1.19k | if ((!key_bounds || key_bounds->IsWithinBounds(reverse_index_iter.value()))) { |
803 | | // return when we have reached the batch limit. |
804 | 1.19k | if (write_id >= write_id_limit) { |
805 | 0 | return ApplyTransactionState{ |
806 | 0 | .key = key_slice.ToBuffer(), |
807 | 0 | .write_id = write_id, |
808 | 0 | }; |
809 | 0 | } |
810 | 1.19k | { |
811 | 1.19k | intent_iter.Seek(reverse_index_value); |
812 | 1.19k | if (!intent_iter.Valid() || intent_iter.key() != reverse_index_value) { |
813 | 0 | LOG(WARNING) << "Unable to find intent: " << reverse_index_value.ToDebugHexString() |
814 | 0 | << " for " << key_slice.ToDebugHexString(); |
815 | 0 | return ApplyTransactionState{}; |
816 | 0 | } |
817 | | |
818 | 1.19k | auto intent = VERIFY_RESULT(ParseIntentKey(intent_iter.key(), transaction_id_slice)); |
819 | | |
820 | 1.19k | if (intent.types.Test(IntentType::kStrongWrite)) { |
821 | 617 | auto decoded_value = |
822 | 617 | VERIFY_RESULT(DecodeIntentValue(intent_iter.value(), &transaction_id_slice)); |
823 | 0 | write_id = decoded_value.write_id; |
824 | | |
825 | 617 | if (decoded_value.body.starts_with(ValueTypeAsChar::kRowLock)) { |
826 | 0 | continue; |
827 | 0 | } |
828 | | |
829 | 617 | std::array<Slice, 1> key_parts = {{ |
830 | 617 | intent.doc_path, |
831 | 617 | }}; |
832 | 617 | std::array<Slice, 1> value_parts = {{ |
833 | 617 | decoded_value.body, |
834 | 617 | }}; |
835 | | |
836 | 617 | IntentKeyValueForCDC intent_metadata; |
837 | 617 | intent_metadata.key = Slice(key_parts, &(intent_metadata.key_buf)); |
838 | 617 | intent_metadata.value = Slice(value_parts, &(intent_metadata.value_buf)); |
839 | 617 | intent_metadata.reverse_index_key = key_slice.ToBuffer(); |
840 | 617 | intent_metadata.write_id = write_id; |
841 | 617 | (*key_value_intents).push_back(intent_metadata); |
842 | | |
843 | 617 | VLOG(4) << "The size of intentKeyValues in GetIntentList " |
844 | 0 | << (*key_value_intents).size(); |
845 | 617 | ++write_id; |
846 | 617 | } |
847 | 1.19k | } |
848 | 1.19k | } |
849 | 1.19k | } |
850 | 1.34k | reverse_index_iter.Next(); |
851 | 1.34k | } |
852 | | |
853 | 147 | return ApplyTransactionState{}; |
854 | 147 | } |
855 | | |
856 | 0 | std::string ApplyTransactionState::ToString() const { |
857 | 0 | return Format( |
858 | 0 | "{ key: $0 write_id: $1 aborted: $2 }", Slice(key).ToDebugString(), write_id, aborted); |
859 | 0 | } |
860 | | |
861 | | void CombineExternalIntents( |
862 | | const TransactionId& txn_id, |
863 | 2 | ExternalIntentsProvider* provider) { |
864 | | // External intents are stored in the following format: |
865 | | // key: kExternalTransactionId, txn_id |
866 | | // value: size(intent1_key), intent1_key, size(intent1_value), intent1_value, size(intent2_key)... |
867 | | // where size is encoded as varint. |
868 | | |
869 | 2 | docdb::KeyBytes buffer; |
870 | 2 | buffer.AppendValueType(ValueType::kExternalTransactionId); |
871 | 2 | buffer.AppendRawBytes(txn_id.AsSlice()); |
872 | 2 | provider->SetKey(buffer.AsSlice()); |
873 | 2 | buffer.Clear(); |
874 | 2 | buffer.AppendValueType(ValueType::kUuid); |
875 | 2 | buffer.AppendRawBytes(provider->InvolvedTablet().AsSlice()); |
876 | 2 | buffer.AppendValueType(ValueType::kExternalIntents); |
877 | 6 | while (auto key_value = provider->Next()) { |
878 | 4 | buffer.AppendUInt64AsVarInt(key_value->first.size()); |
879 | 4 | buffer.AppendRawBytes(key_value->first); |
880 | 4 | buffer.AppendUInt64AsVarInt(key_value->second.size()); |
881 | 4 | buffer.AppendRawBytes(key_value->second); |
882 | 4 | } |
883 | 2 | buffer.AppendUInt64AsVarInt(0); |
884 | 2 | provider->SetValue(buffer.AsSlice()); |
885 | 2 | } |
886 | | |
887 | | } // namespace docdb |
888 | | } // namespace yb |