/Users/deen/code/yugabyte-db/ent/src/yb/tserver/twodc_write_implementations.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include <deque> |
14 | | |
15 | | #include "yb/common/transaction.h" |
16 | | |
17 | | #include "yb/docdb/docdb.h" |
18 | | #include "yb/docdb/key_bytes.h" |
19 | | |
20 | | #include "yb/tserver/twodc_write_interface.h" |
21 | | #include "yb/tserver/tserver.pb.h" |
22 | | |
23 | | #include "yb/cdc/cdc_service.pb.h" |
24 | | |
25 | | #include "yb/util/size_literals.h" |
26 | | #include "yb/util/flag_tags.h" |
27 | | #include "yb/util/flags.h" |
28 | | |
29 | | #include "yb/common/hybrid_time.h" |
30 | | |
31 | | DEFINE_int32(cdc_max_apply_batch_num_records, 1024, "Max CDC write request batch num records. If" |
32 | | " set to 0, there is no max num records, which" |
33 | | " means batches will be limited only by size."); |
34 | | TAG_FLAG(cdc_max_apply_batch_num_records, runtime); |
35 | | |
36 | | DEFINE_int32(cdc_max_apply_batch_size_bytes, 0, "Max CDC write request batch size in kb. If 0," |
37 | | " default to consensus_max_batch_size_bytes."); |
38 | | TAG_FLAG(cdc_max_apply_batch_size_bytes, runtime); |
39 | | |
40 | | DEFINE_test_flag(bool, twodc_write_hybrid_time, false, |
41 | | "Override external_hybrid_time with initialHybridTimeValue for testing."); |
42 | | |
43 | | DECLARE_uint64(consensus_max_batch_size_bytes); |
44 | | |
45 | | namespace yb { |
46 | | |
47 | | using namespace yb::size_literals; |
48 | | |
49 | | namespace tserver { |
50 | | namespace enterprise { |
51 | | |
52 | | CHECKED_STATUS CombineExternalIntents( |
53 | | const tablet::TransactionStatePB& transaction_state, |
54 | | const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>& pairs, |
55 | 0 | google::protobuf::RepeatedPtrField<docdb::KeyValuePairPB> *out) { |
56 | |
|
57 | 0 | class Provider : public docdb::ExternalIntentsProvider { |
58 | 0 | public: |
59 | 0 | Provider( |
60 | 0 | const Uuid& involved_tablet, |
61 | 0 | const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>* pairs, |
62 | 0 | docdb::KeyValuePairPB* out) |
63 | 0 | : involved_tablet_(involved_tablet), pairs_(*pairs), out_(out) { |
64 | 0 | } |
65 | |
|
66 | 0 | void SetKey(const Slice& slice) override { |
67 | 0 | out_->set_key(slice.cdata(), slice.size()); |
68 | 0 | } |
69 | |
|
70 | 0 | void SetValue(const Slice& slice) override { |
71 | 0 | out_->set_value(slice.cdata(), slice.size()); |
72 | 0 | } |
73 | |
|
74 | 0 | const Uuid& InvolvedTablet() override { |
75 | 0 | return involved_tablet_; |
76 | 0 | } |
77 | |
|
78 | 0 | boost::optional<std::pair<Slice, Slice>> Next() override { |
79 | 0 | if (next_idx_ >= pairs_.size()) { |
80 | 0 | return boost::none; |
81 | 0 | } |
82 | | |
83 | 0 | const auto& input = pairs_[next_idx_]; |
84 | 0 | ++next_idx_; |
85 | |
|
86 | 0 | return std::pair<Slice, Slice>(input.key(), input.value().binary_value()); |
87 | 0 | } |
88 | |
|
89 | 0 | private: |
90 | 0 | Uuid involved_tablet_; |
91 | 0 | const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>& pairs_; |
92 | 0 | docdb::KeyValuePairPB* out_; |
93 | 0 | int next_idx_ = 0; |
94 | 0 | }; |
95 | |
|
96 | 0 | auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(transaction_state.transaction_id())); |
97 | 0 | SCHECK_EQ(transaction_state.tablets().size(), 1, InvalidArgument, "Wrong tablets number"); |
98 | 0 | Uuid status_tablet; |
99 | 0 | RETURN_NOT_OK(status_tablet.FromHexString(transaction_state.tablets()[0])); |
100 | | |
101 | 0 | Provider provider(status_tablet, &pairs, out->Add()); |
102 | 0 | docdb::CombineExternalIntents(txn_id, &provider); |
103 | 0 | return Status::OK(); |
104 | 0 | } |
105 | | |
106 | | CHECKED_STATUS AddRecord( |
107 | | const cdc::CDCRecordPB& record, |
108 | 0 | docdb::KeyValueWriteBatchPB* write_batch) { |
109 | 0 | if (record.operation() == cdc::CDCRecordPB::APPLY) { |
110 | 0 | auto* apply_txn = write_batch->mutable_apply_external_transactions()->Add(); |
111 | 0 | apply_txn->set_transaction_id(record.transaction_state().transaction_id()); |
112 | 0 | apply_txn->set_commit_hybrid_time(record.transaction_state().commit_hybrid_time()); |
113 | 0 | return Status::OK(); |
114 | 0 | } |
115 | | |
116 | 0 | if (record.has_transaction_state()) { |
117 | 0 | return CombineExternalIntents( |
118 | 0 | record.transaction_state(), record.changes(), write_batch->mutable_write_pairs()); |
119 | 0 | } |
120 | | |
121 | 0 | for (const auto& kv_pair : record.changes()) { |
122 | 0 | auto* write_pair = write_batch->mutable_write_pairs()->Add(); |
123 | 0 | write_pair->set_key(kv_pair.key()); |
124 | 0 | write_pair->set_value(kv_pair.value().binary_value()); |
125 | 0 | if (PREDICT_FALSE(FLAGS_TEST_twodc_write_hybrid_time)) { |
126 | | // Used only for testing external hybrid time. |
127 | 0 | write_pair->set_external_hybrid_time(yb::kInitialHybridTimeValue); |
128 | 0 | } else { |
129 | 0 | write_pair->set_external_hybrid_time(record.time()); |
130 | 0 | } |
131 | 0 | } |
132 | |
|
133 | 0 | return Status::OK(); |
134 | 0 | } |
135 | | |
136 | | // The BatchedWriteImplementation strategy batches together multiple records per WriteRequestPB. |
137 | | // Max number of records in a request is cdc_max_apply_batch_num_records, and max size of a request |
138 | | // is cdc_max_apply_batch_size_kb. Batches are not sent by opid order, since a GetChangesResponse |
139 | | // can contain interleaved records to multiple tablets. Rather, we send batches to each tablet |
140 | | // in order for that tablet, before moving on to the next tablet. |
141 | | class BatchedWriteImplementation : public TwoDCWriteInterface { |
142 | 0 | ~BatchedWriteImplementation() = default; |
143 | | |
144 | 0 | Status ProcessRecord(const std::string& tablet_id, const cdc::CDCRecordPB& record) override { |
145 | 0 | auto it = records_.find(tablet_id); |
146 | 0 | if (it == records_.end()) { |
147 | 0 | it = records_.emplace(tablet_id, std::deque<std::unique_ptr<WriteRequestPB>>()).first; |
148 | 0 | } |
149 | |
|
150 | 0 | auto& queue = it->second; |
151 | |
|
152 | 0 | auto max_batch_records = FLAGS_cdc_max_apply_batch_num_records != 0 ? |
153 | 0 | FLAGS_cdc_max_apply_batch_num_records : std::numeric_limits<uint32_t>::max(); |
154 | 0 | auto max_batch_size = FLAGS_cdc_max_apply_batch_size_bytes != 0 ? |
155 | 0 | FLAGS_cdc_max_apply_batch_size_bytes : FLAGS_consensus_max_batch_size_bytes; |
156 | |
|
157 | 0 | if (queue.empty() || |
158 | 0 | implicit_cast<size_t>(queue.back()->write_batch().write_pairs_size()) |
159 | 0 | >= max_batch_records || |
160 | 0 | queue.back()->ByteSizeLong() >= max_batch_size) { |
161 | | // Create a new batch. |
162 | 0 | auto req = std::make_unique<WriteRequestPB>(); |
163 | 0 | req->set_tablet_id(tablet_id); |
164 | 0 | req->set_external_hybrid_time(record.time()); |
165 | 0 | queue.push_back(std::move(req)); |
166 | 0 | } |
167 | 0 | auto* write_request = queue.back().get(); |
168 | |
|
169 | 0 | return AddRecord(record, write_request->mutable_write_batch()); |
170 | 0 | } |
171 | | |
172 | 0 | std::unique_ptr<WriteRequestPB> GetNextWriteRequest() override { |
173 | 0 | if (records_.empty()) { |
174 | 0 | return nullptr; |
175 | 0 | } |
176 | 0 | auto& queue = records_.begin()->second; |
177 | 0 | auto next_req = std::move(queue.front()); |
178 | 0 | queue.pop_front(); |
179 | 0 | if (queue.empty()) { |
180 | 0 | records_.erase(next_req->tablet_id()); |
181 | 0 | } |
182 | 0 | return next_req; |
183 | 0 | } |
184 | | |
185 | | private: |
186 | | std::map<std::string, std::deque<std::unique_ptr<WriteRequestPB>>> records_; |
187 | | }; |
188 | | |
189 | 0 | void ResetWriteInterface(std::unique_ptr<TwoDCWriteInterface>* write_strategy) { |
190 | 0 | write_strategy->reset(new BatchedWriteImplementation()); |
191 | 0 | } |
192 | | |
193 | | } // namespace enterprise |
194 | | } // namespace tserver |
195 | | } // namespace yb |