/Users/deen/code/yugabyte-db/src/yb/consensus/retryable_requests.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/consensus/retryable_requests.h" |
15 | | |
16 | | #include <boost/multi_index/hashed_index.hpp> |
17 | | #include <boost/multi_index/member.hpp> |
18 | | #include <boost/multi_index/ordered_index.hpp> |
19 | | #include <boost/multi_index_container.hpp> |
20 | | |
21 | | #include "yb/consensus/consensus.pb.h" |
22 | | #include "yb/consensus/consensus_round.h" |
23 | | |
24 | | #include "yb/tablet/operations.pb.h" |
25 | | |
26 | | #include "yb/util/atomic.h" |
27 | | #include "yb/util/flag_tags.h" |
28 | | #include "yb/util/format.h" |
29 | | #include "yb/util/logging.h" |
30 | | #include "yb/util/metrics.h" |
31 | | #include "yb/util/opid.h" |
32 | | #include "yb/util/result.h" |
33 | | #include "yb/util/status_format.h" |
34 | | |
35 | | using namespace std::literals; |
36 | | |
37 | | DEFINE_int32(retryable_request_timeout_secs, 120, |
38 | | "Amount of time to keep write request in index, to prevent duplicate writes."); |
39 | | TAG_FLAG(retryable_request_timeout_secs, runtime); |
40 | | |
41 | | // We use this limit to prevent a request range from growing without bound, because that would |
42 | | // block log cleanup. I.e. even if we have a continuous request range, it will be split into |
43 | | // blocks that can be dropped independently. |
44 | | DEFINE_int32(retryable_request_range_time_limit_secs, 30, |
45 | | "Max delta in time for single op id range."); |
46 | | |
47 | | METRIC_DEFINE_gauge_int64(tablet, running_retryable_requests, |
48 | | "Number of running retryable requests.", |
49 | | yb::MetricUnit::kRequests, |
50 | | "Number of running retryable requests."); |
51 | | |
52 | | METRIC_DEFINE_gauge_int64(tablet, replicated_retryable_request_ranges, |
53 | | "Number of replicated retryable request ranges.", |
54 | | yb::MetricUnit::kRequests, |
55 | | "Number of replicated retryable request ranges."); |
56 | | |
57 | | namespace yb { |
58 | | namespace consensus { |
59 | | |
60 | | namespace { |
61 | | |
62 | | struct RunningRetryableRequest { |
63 | | RetryableRequestId request_id; |
64 | | RestartSafeCoarseTimePoint time; |
65 | | mutable std::vector<ConsensusRoundPtr> duplicate_rounds; |
66 | | |
67 | | RunningRetryableRequest( |
68 | | RetryableRequestId request_id_, RestartSafeCoarseTimePoint time_) |
69 | 7.01M | : request_id(request_id_), time(time_) {} |
70 | | |
71 | 0 | std::string ToString() const { |
72 | 0 | return YB_STRUCT_TO_STRING(request_id, time); |
73 | 0 | } |
74 | | }; |
75 | | |
76 | | struct ReplicatedRetryableRequestRange { |
77 | | mutable RetryableRequestId first_id; |
78 | | RetryableRequestId last_id; |
79 | | yb::OpId min_op_id; |
80 | | mutable RestartSafeCoarseTimePoint min_time; |
81 | | mutable RestartSafeCoarseTimePoint max_time; |
82 | | |
83 | | ReplicatedRetryableRequestRange(RetryableRequestId id, const yb::OpId& op_id, |
84 | | RestartSafeCoarseTimePoint time) |
85 | | : first_id(id), last_id(id), min_op_id(op_id), min_time(time), |
86 | 7.31M | max_time(time) {} |
87 | | |
88 | 685k | void InsertTime(const RestartSafeCoarseTimePoint& time) const { |
89 | 685k | min_time = std::min(min_time, time); |
90 | 685k | max_time = std::max(max_time, time); |
91 | 685k | } |
92 | | |
93 | 20.8k | void PrepareJoinWithPrev(const ReplicatedRetryableRequestRange& prev) const { |
94 | 20.8k | min_time = std::min(min_time, prev.min_time); |
95 | 20.8k | max_time = std::max(max_time, prev.max_time); |
96 | 20.8k | first_id = prev.first_id; |
97 | 20.8k | } |
98 | | |
99 | 0 | std::string ToString() const { |
100 | 0 | return Format("{ first_id: $0 last_id: $1 min_op_id: $2 min_time: $3 max_time: $4 }", |
101 | 0 | first_id, last_id, min_op_id, min_time, max_time); |
102 | 0 | } |
103 | | }; |
104 | | |
105 | | struct LastIdIndex; |
106 | | struct OpIdIndex; |
107 | | struct RequestIdIndex; |
108 | | |
109 | | typedef boost::multi_index_container < |
110 | | RunningRetryableRequest, |
111 | | boost::multi_index::indexed_by < |
112 | | boost::multi_index::hashed_unique < |
113 | | boost::multi_index::tag<RequestIdIndex>, |
114 | | boost::multi_index::member < |
115 | | RunningRetryableRequest, RetryableRequestId, &RunningRetryableRequest::request_id |
116 | | > |
117 | | > |
118 | | > |
119 | | > RunningRetryableRequests; |
120 | | |
121 | | typedef boost::multi_index_container < |
122 | | ReplicatedRetryableRequestRange, |
123 | | boost::multi_index::indexed_by < |
124 | | boost::multi_index::ordered_unique < |
125 | | boost::multi_index::tag<LastIdIndex>, |
126 | | boost::multi_index::member < |
127 | | ReplicatedRetryableRequestRange, RetryableRequestId, |
128 | | &ReplicatedRetryableRequestRange::last_id |
129 | | > |
130 | | >, |
131 | | boost::multi_index::ordered_unique < |
132 | | boost::multi_index::tag<OpIdIndex>, |
133 | | boost::multi_index::member < |
134 | | ReplicatedRetryableRequestRange, yb::OpId, |
135 | | &ReplicatedRetryableRequestRange::min_op_id |
136 | | > |
137 | | > |
138 | | > |
139 | | > ReplicatedRetryableRequestRanges; |
140 | | |
141 | | typedef ReplicatedRetryableRequestRanges::index<LastIdIndex>::type |
142 | | ReplicatedRetryableRequestRangesByLastId; |
143 | | |
144 | | struct ClientRetryableRequests { |
145 | | RunningRetryableRequests running; |
146 | | ReplicatedRetryableRequestRanges replicated; |
147 | | RetryableRequestId min_running_request_id = 0; |
148 | | RestartSafeCoarseTimePoint empty_since; |
149 | | }; |
150 | | |
151 | 656k | std::chrono::seconds RangeTimeLimit() { |
152 | 656k | return std::chrono::seconds(FLAGS_retryable_request_range_time_limit_secs); |
153 | 656k | } |
154 | | |
155 | | class ReplicateData { |
156 | | public: |
157 | 5.97M | ReplicateData() : client_id_(ClientId::Nil()), write_(nullptr) {} |
158 | | |
159 | | explicit ReplicateData(const tablet::WritePB* write, const OpIdPB& op_id) |
160 | | : client_id_(write->client_id1(), write->client_id2()), |
161 | 18.1M | write_(write), op_id_(OpId::FromPB(op_id)) { |
162 | | |
163 | 18.1M | } |
164 | | |
165 | 24.0M | static ReplicateData FromMsg(const ReplicateMsg& replicate_msg) { |
166 | 24.0M | if (!replicate_msg.has_write()) { |
167 | 5.97M | return ReplicateData(); |
168 | 5.97M | } |
169 | | |
170 | 18.1M | return ReplicateData(&replicate_msg.write(), replicate_msg.id()); |
171 | 24.0M | } |
172 | | |
173 | 24.0M | bool operator!() const { |
174 | 24.0M | return client_id_.IsNil(); |
175 | 24.0M | } |
176 | | |
177 | 0 | explicit operator bool() const { |
178 | 0 | return !!*this; |
179 | 0 | } |
180 | | |
181 | 15.0M | const ClientId& client_id() const { |
182 | 15.0M | return client_id_; |
183 | 15.0M | } |
184 | | |
185 | 8.00M | const tablet::WritePB& write() const { |
186 | 8.00M | return *write_; |
187 | 8.00M | } |
188 | | |
189 | 37.0M | RetryableRequestId request_id() const { |
190 | 37.0M | return write_->request_id(); |
191 | 37.0M | } |
192 | | |
193 | 0 | const yb::OpId& op_id() const { |
194 | 0 | return op_id_; |
195 | 0 | } |
196 | | |
197 | | private: |
198 | | ClientId client_id_; |
199 | | const tablet::WritePB* write_; |
200 | | yb::OpId op_id_; |
201 | | }; |
202 | | |
203 | 0 | std::ostream& operator<<(std::ostream& out, const ReplicateData& data) { |
204 | 0 | return out << data.client_id() << '/' << data.request_id() << ": " |
205 | 0 | << data.write().ShortDebugString() << " op_id: " << data.op_id(); |
206 | 0 | } |
207 | | |
208 | | } // namespace |
209 | | |
210 | | class RetryableRequests::Impl { |
211 | | public: |
212 | 292k | explicit Impl(std::string log_prefix) : log_prefix_(std::move(log_prefix)) { |
213 | 18.4E | VLOG_WITH_PREFIX(1) << "Start"; |
214 | 292k | } |
215 | | |
216 | 8.55M | bool Register(const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) { |
217 | 8.55M | auto data = ReplicateData::FromMsg(*round->replicate_msg()); |
218 | 8.55M | if (!data) { |
219 | 1.54M | return true; |
220 | 1.54M | } |
221 | | |
222 | 7.01M | if (entry_time == RestartSafeCoarseTimePoint()) { |
223 | 7.01M | entry_time = clock_.Now(); |
224 | 7.01M | } |
225 | | |
226 | 7.01M | ClientRetryableRequests& client_retryable_requests = clients_[data.client_id()]; |
227 | | |
228 | 7.01M | CleanupReplicatedRequests( |
229 | 7.01M | data.write().min_running_request_id(), &client_retryable_requests); |
230 | | |
231 | 7.01M | if (data.request_id() < client_retryable_requests.min_running_request_id) { |
232 | 0 | round->NotifyReplicationFinished( |
233 | 0 | STATUS_EC_FORMAT( |
234 | 0 | Expired, |
235 | 0 | MinRunningRequestIdStatusData(client_retryable_requests.min_running_request_id), |
236 | 0 | "Request id $0 is less than min running $1", data.request_id(), |
237 | 0 | client_retryable_requests.min_running_request_id), |
238 | 0 | round->bound_term(), nullptr /* applied_op_ids */); |
239 | 0 | return false; |
240 | 0 | } |
241 | | |
242 | 7.01M | auto& replicated_indexed_by_last_id = client_retryable_requests.replicated.get<LastIdIndex>(); |
243 | 7.01M | auto it = replicated_indexed_by_last_id.lower_bound(data.request_id()); |
244 | 7.01M | if (it != replicated_indexed_by_last_id.end() && it->first_id <= data.request_id()29.3k ) { |
245 | 1 | round->NotifyReplicationFinished( |
246 | 1 | STATUS(AlreadyPresent, "Duplicate request"), round->bound_term(), |
247 | 1 | nullptr /* applied_op_ids */); |
248 | 1 | return false; |
249 | 1 | } |
250 | | |
251 | 7.01M | auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>(); |
252 | 7.01M | auto emplace_result = running_indexed_by_request_id.emplace(data.request_id(), entry_time); |
253 | 7.01M | if (!emplace_result.second) { |
254 | 0 | emplace_result.first->duplicate_rounds.push_back(round); |
255 | 0 | return false; |
256 | 0 | } |
257 | | |
258 | 18.4E | VLOG_WITH_PREFIX(4) << "Running added " << data; |
259 | 7.01M | if (running_requests_gauge_) { |
260 | 6.58M | running_requests_gauge_->Increment(); |
261 | 6.58M | } |
262 | | |
263 | 7.01M | return true; |
264 | 7.01M | } |
265 | | |
266 | 50.1M | OpId CleanExpiredReplicatedAndGetMinOpId() { |
267 | 50.1M | OpId result = OpId::Max(); |
268 | 50.1M | auto now = clock_.Now(); |
269 | 50.1M | auto clean_start = now - GetAtomicFlag(&FLAGS_retryable_request_timeout_secs) * 1s; |
270 | 55.0M | for (auto ci = clients_.begin(); ci != clients_.end();) { |
271 | 4.91M | ClientRetryableRequests& client_retryable_requests = ci->second; |
272 | 4.91M | auto& op_id_index = client_retryable_requests.replicated.get<OpIdIndex>(); |
273 | 4.91M | auto it = op_id_index.begin(); |
274 | 4.91M | int64_t count = 0; |
275 | 4.91M | while (it != op_id_index.end() && it->max_time < clean_start4.33M ) { |
276 | 2.59k | ++it; |
277 | 2.59k | ++count; |
278 | 2.59k | } |
279 | 4.91M | if (replicated_request_ranges_gauge_) { |
280 | 4.69M | replicated_request_ranges_gauge_->DecrementBy(count); |
281 | 4.69M | } |
282 | 4.91M | if (it != op_id_index.end()) { |
283 | 4.33M | result = std::min(result, it->min_op_id); |
284 | 4.33M | op_id_index.erase(op_id_index.begin(), it); |
285 | 4.33M | } else { |
286 | 580k | op_id_index.clear(); |
287 | 580k | } |
288 | 4.91M | if (op_id_index.empty() && client_retryable_requests.running.empty()580k ) { |
289 | | // We delay deleting a client that has no requests, so that we can still filter out requests |
290 | | // whose request id is too small. |
291 | 521k | if (client_retryable_requests.empty_since == RestartSafeCoarseTimePoint()) { |
292 | 2.59k | client_retryable_requests.empty_since = now; |
293 | 518k | } else if (client_retryable_requests.empty_since < clean_start) { |
294 | 598 | ci = clients_.erase(ci); |
295 | 598 | continue; |
296 | 598 | } |
297 | 521k | } |
298 | 4.91M | ++ci; |
299 | 4.91M | } |
300 | | |
301 | 50.1M | return result; |
302 | 50.1M | } |
303 | | |
304 | | void ReplicationFinished( |
305 | 14.5M | const ReplicateMsg& replicate_msg, const Status& status, int64_t leader_term) { |
306 | 14.5M | auto data = ReplicateData::FromMsg(replicate_msg); |
307 | 14.5M | if (!data) { |
308 | 7.51M | return; |
309 | 7.51M | } |
310 | | |
311 | 7.00M | auto& client_retryable_requests = clients_[data.client_id()]; |
312 | 7.00M | auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>(); |
313 | 7.00M | auto running_it = running_indexed_by_request_id.find(data.request_id()); |
314 | 7.00M | if (running_it == running_indexed_by_request_id.end()) { |
315 | 0 | #ifndef NDEBUG |
316 | 0 | LOG_WITH_PREFIX(ERROR) << "Running requests: " |
317 | 0 | << AsString(running_indexed_by_request_id); |
318 | 0 | #endif |
319 | 0 | LOG_WITH_PREFIX(DFATAL) << "Replication finished for request with unknown id " << data; |
320 | 0 | return; |
321 | 0 | } |
322 | 18.4E | VLOG_WITH_PREFIX(4) << "Running " << (status.ok() ? "replicated "0 : "aborted ") << data |
323 | 18.4E | << ", " << status; |
324 | | |
325 | 7.00M | static Status duplicate_write_status = STATUS(AlreadyPresent, "Duplicate request"); |
326 | 18.4E | auto status_for_duplicate = status.ok()7.00M ? duplicate_write_status7.01M : status; |
327 | 7.00M | for (const auto& duplicate : running_it->duplicate_rounds) { |
328 | 0 | duplicate->NotifyReplicationFinished(status_for_duplicate, leader_term, |
329 | 0 | nullptr /* applied_op_ids */); |
330 | 0 | } |
331 | 7.00M | auto entry_time = running_it->time; |
332 | 7.00M | running_indexed_by_request_id.erase(running_it); |
333 | 7.00M | if (running_requests_gauge_) { |
334 | 6.57M | running_requests_gauge_->Decrement(); |
335 | 6.57M | } |
336 | | |
337 | 7.00M | if (status.ok()7.00M ) { |
338 | 7.00M | AddReplicated( |
339 | 7.00M | yb::OpId::FromPB(replicate_msg.id()), data, entry_time, &client_retryable_requests); |
340 | 7.00M | } |
341 | 7.00M | } |
342 | | |
343 | | void Bootstrap( |
344 | 1.00M | const ReplicateMsg& replicate_msg, RestartSafeCoarseTimePoint entry_time) { |
345 | 1.00M | auto data = ReplicateData::FromMsg(replicate_msg); |
346 | 1.00M | if (!data) { |
347 | 9.02k | return; |
348 | 9.02k | } |
349 | | |
350 | 993k | auto& client_retryable_requests = clients_[data.client_id()]; |
351 | 993k | auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>(); |
352 | 993k | if (running_indexed_by_request_id.count(data.request_id()) != 0) { |
353 | 0 | #ifndef NDEBUG |
354 | 0 | LOG_WITH_PREFIX(ERROR) << "Running requests: " |
355 | 0 | << yb::ToString(running_indexed_by_request_id); |
356 | 0 | #endif |
357 | 0 | LOG_WITH_PREFIX(DFATAL) << "Bootstrapped running request " << data; |
358 | 0 | return; |
359 | 0 | } |
360 | 993k | VLOG_WITH_PREFIX15 (4) << "Bootstrapped " << data15 ; |
361 | | |
362 | 993k | CleanupReplicatedRequests( |
363 | 993k | data.write().min_running_request_id(), &client_retryable_requests); |
364 | | |
365 | 993k | AddReplicated( |
366 | 993k | yb::OpId::FromPB(replicate_msg.id()), data, entry_time, &client_retryable_requests); |
367 | 993k | } |
368 | | |
369 | 24.2M | RestartSafeCoarseMonoClock& Clock() { |
370 | 24.2M | return clock_; |
371 | 24.2M | } |
372 | | |
373 | 142k | void SetMetricEntity(const scoped_refptr<MetricEntity>& metric_entity) { |
374 | 142k | running_requests_gauge_ = METRIC_running_retryable_requests.Instantiate(metric_entity, 0); |
375 | 142k | replicated_request_ranges_gauge_ = METRIC_replicated_retryable_request_ranges.Instantiate( |
376 | 142k | metric_entity, 0); |
377 | 142k | } |
378 | | |
379 | 0 | RetryableRequestsCounts TEST_Counts() { |
380 | 0 | RetryableRequestsCounts result; |
381 | 0 | for (const auto& p : clients_) { |
382 | 0 | result.running += p.second.running.size(); |
383 | 0 | result.replicated += p.second.replicated.size(); |
384 | 0 | LOG_WITH_PREFIX(INFO) << "Replicated: " << yb::ToString(p.second.replicated); |
385 | 0 | } |
386 | 0 | return result; |
387 | 0 | } |
388 | | |
389 | 0 | Result<RetryableRequestId> MinRunningRequestId(const ClientId& client_id) const { |
390 | 0 | const auto it = clients_.find(client_id); |
391 | 0 | if (it == clients_.end()) { |
392 | 0 | return STATUS_FORMAT(NotFound, "Client requests data not found for client $0", client_id); |
393 | 0 | } |
394 | 0 | return it->second.min_running_request_id; |
395 | 0 | } |
396 | | |
397 | | private: |
398 | | void CleanupReplicatedRequests( |
399 | | RetryableRequestId new_min_running_request_id, |
400 | 8.00M | ClientRetryableRequests* client_retryable_requests) { |
401 | 8.00M | auto& replicated_indexed_by_last_id = client_retryable_requests->replicated.get<LastIdIndex>(); |
402 | 8.00M | if (new_min_running_request_id > client_retryable_requests->min_running_request_id) { |
403 | | // We are no longer interested in ids below new_min_running_request_id. |
404 | | // |
405 | | // Request id intervals are ordered by the last id of the interval, and do not overlap. |
406 | | // So we find the first interval with last_id >= new_min_running_request_id |
407 | | // and trim it if necessary. |
408 | 7.42M | auto it = replicated_indexed_by_last_id.lower_bound(new_min_running_request_id); |
409 | 7.42M | if (it != replicated_indexed_by_last_id.end() && |
410 | 7.42M | it->first_id < new_min_running_request_id134k ) { |
411 | 128k | it->first_id = new_min_running_request_id; |
412 | 128k | } |
413 | 7.42M | if (replicated_request_ranges_gauge_) { |
414 | 6.07M | replicated_request_ranges_gauge_->DecrementBy( |
415 | 6.07M | std::distance(replicated_indexed_by_last_id.begin(), it)); |
416 | 6.07M | } |
417 | | // Remove all intervals whose ids are entirely below new_min_running_request_id. |
418 | 7.42M | replicated_indexed_by_last_id.erase(replicated_indexed_by_last_id.begin(), it); |
419 | 7.42M | client_retryable_requests->min_running_request_id = new_min_running_request_id; |
420 | 7.42M | } |
421 | 8.00M | } |
422 | | |
423 | | void AddReplicated(yb::OpId op_id, const ReplicateData& data, RestartSafeCoarseTimePoint time, |
424 | 8.00M | ClientRetryableRequests* client) { |
425 | 8.00M | auto request_id = data.request_id(); |
426 | 8.00M | auto& replicated_indexed_by_last_id = client->replicated.get<LastIdIndex>(); |
427 | 8.00M | auto request_it = replicated_indexed_by_last_id.lower_bound(request_id); |
428 | 8.00M | if (request_it != replicated_indexed_by_last_id.end() && request_it->first_id <= request_id59.3k ) { |
429 | 0 | #ifndef NDEBUG |
430 | 0 | LOG_WITH_PREFIX(ERROR) |
431 | 0 | << "Replicated requests: " << yb::ToString(client->replicated); |
432 | 0 | #endif |
433 | |
|
434 | 0 | LOG_WITH_PREFIX(DFATAL) << "Request already replicated: " << data; |
435 | 0 | return; |
436 | 0 | } |
437 | | |
438 | | // Check whether there is a range that starts right after this id, so that we can extend it. |
439 | | // Requests rarely attach to the beginning of an interval, so we do not check |
440 | | // RangeTimeLimit() here. |
441 | 8.00M | if (request_it != replicated_indexed_by_last_id.end() && |
442 | 8.00M | request_it->first_id == request_id + 159.3k ) { |
443 | 49.8k | op_id = std::min(request_it->min_op_id, op_id); |
444 | 49.8k | request_it->InsertTime(time); |
445 | | // If the previous range ends right before this id, then we can just join those ranges. |
446 | 49.8k | if (!TryJoinRanges(request_it, op_id, &replicated_indexed_by_last_id)) { |
447 | 29.0k | --(request_it->first_id); |
448 | 29.0k | UpdateMinOpId(request_it, op_id, &replicated_indexed_by_last_id); |
449 | 29.0k | } |
450 | 49.8k | return; |
451 | 49.8k | } |
452 | | |
453 | 7.95M | if (TryJoinToEndOfRange(request_it, op_id, request_id, time, &replicated_indexed_by_last_id)) { |
454 | 635k | return; |
455 | 635k | } |
456 | | |
457 | 7.31M | client->replicated.emplace(request_id, op_id, time); |
458 | 7.31M | if (replicated_request_ranges_gauge_) { |
459 | 6.00M | replicated_request_ranges_gauge_->Increment(); |
460 | 6.00M | } |
461 | 7.31M | } |
462 | | |
463 | | void UpdateMinOpId( |
464 | | ReplicatedRetryableRequestRangesByLastId::iterator request_it, |
465 | | yb::OpId min_op_id, |
466 | 685k | ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) { |
467 | 685k | if (min_op_id < request_it->min_op_id) { |
468 | 18.6k | replicated_indexed_by_last_id->modify(request_it, [min_op_id](auto& entry) { // NOLINT |
469 | 18.6k | entry.min_op_id = min_op_id; |
470 | 18.6k | }); |
471 | 18.6k | } |
472 | 685k | } |
473 | | |
474 | | bool TryJoinRanges( |
475 | | ReplicatedRetryableRequestRangesByLastId::iterator request_it, |
476 | | yb::OpId min_op_id, |
477 | 49.8k | ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) { |
478 | 49.8k | if (request_it == replicated_indexed_by_last_id->begin()) { |
479 | 24.6k | return false; |
480 | 24.6k | } |
481 | | |
482 | 25.2k | auto request_prev_it = request_it; |
483 | 25.2k | --request_prev_it; |
484 | | |
485 | | // We can join the ranges if there is exactly one id between them, and the request with that id |
486 | | // was just replicated... |
487 | 25.2k | if (request_prev_it->last_id + 2 != request_it->first_id) { |
488 | 4.37k | return false; |
489 | 4.37k | } |
490 | | |
491 | | // ...and the resulting time range fits within the limit. |
492 | 20.8k | if (request_it->max_time > request_prev_it->min_time + RangeTimeLimit()) { |
493 | 0 | return false; |
494 | 0 | } |
495 | | |
496 | 20.8k | min_op_id = std::min(min_op_id, request_prev_it->min_op_id); |
497 | 20.8k | request_it->PrepareJoinWithPrev(*request_prev_it); |
498 | 20.8k | replicated_indexed_by_last_id->erase(request_prev_it); |
499 | 20.8k | if (replicated_request_ranges_gauge_) { |
500 | 19.8k | replicated_request_ranges_gauge_->Decrement(); |
501 | 19.8k | } |
502 | 20.8k | UpdateMinOpId(request_it, min_op_id, replicated_indexed_by_last_id); |
503 | | |
504 | 20.8k | return true; |
505 | 20.8k | } |
506 | | |
507 | | bool TryJoinToEndOfRange( |
508 | | ReplicatedRetryableRequestRangesByLastId::iterator request_it, |
509 | | yb::OpId op_id, RetryableRequestId request_id, RestartSafeCoarseTimePoint time, |
510 | 7.95M | ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) { |
511 | 7.95M | if (request_it == replicated_indexed_by_last_id->begin()) { |
512 | 7.28M | return false; |
513 | 7.28M | } |
514 | | |
515 | 665k | --request_it; |
516 | | |
517 | 665k | if (request_it->last_id + 1 != request_id) { |
518 | 29.7k | return false; |
519 | 29.7k | } |
520 | | |
521 | | // It is rare for a request to attach to the end of a range while its time is lower than |
522 | | // min_time. So we can avoid checking for the case when |
523 | | // time + RangeTimeLimit() > request_it->max_time |
524 | 635k | if (time > request_it->min_time + RangeTimeLimit()) { |
525 | 0 | return false; |
526 | 0 | } |
527 | | |
528 | 635k | op_id = std::min(request_it->min_op_id, op_id); |
529 | 635k | request_it->InsertTime(time); |
530 | | // Strictly speaking we should use the modify function on client.replicated, but since the |
531 | | // order of the ranges is not expected to change, we can update last_id directly. |
532 | 635k | ++const_cast<ReplicatedRetryableRequestRange&>(*request_it).last_id; |
533 | | |
534 | 635k | UpdateMinOpId(request_it, op_id, replicated_indexed_by_last_id); |
535 | | |
536 | 635k | return true; |
537 | 635k | } |
538 | | |
539 | 0 | const std::string& LogPrefix() const { |
540 | 0 | return log_prefix_; |
541 | 0 | } |
542 | | |
543 | | const std::string log_prefix_; |
544 | | std::unordered_map<ClientId, ClientRetryableRequests, ClientIdHash> clients_; |
545 | | RestartSafeCoarseMonoClock clock_; |
546 | | scoped_refptr<AtomicGauge<int64_t>> running_requests_gauge_; |
547 | | scoped_refptr<AtomicGauge<int64_t>> replicated_request_ranges_gauge_; |
548 | | }; |
549 | | |
550 | | RetryableRequests::RetryableRequests(std::string log_prefix) |
551 | 292k | : impl_(new Impl(std::move(log_prefix))) { |
552 | 292k | } |
553 | | |
554 | 217k | RetryableRequests::~RetryableRequests() { |
555 | 217k | } |
556 | | |
557 | 0 | RetryableRequests::RetryableRequests(RetryableRequests&& rhs) : impl_(std::move(rhs.impl_)) {} |
558 | | |
559 | 142k | void RetryableRequests::operator=(RetryableRequests&& rhs) { |
560 | 142k | impl_ = std::move(rhs.impl_); |
561 | 142k | } |
562 | | |
563 | | bool RetryableRequests::Register( |
564 | 8.55M | const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) { |
565 | 8.55M | return impl_->Register(round, entry_time); |
566 | 8.55M | } |
567 | | |
568 | 50.1M | yb::OpId RetryableRequests::CleanExpiredReplicatedAndGetMinOpId() { |
569 | 50.1M | return impl_->CleanExpiredReplicatedAndGetMinOpId(); |
570 | 50.1M | } |
571 | | |
572 | | void RetryableRequests::ReplicationFinished( |
573 | 14.5M | const ReplicateMsg& replicate_msg, const Status& status, int64_t leader_term) { |
574 | 14.5M | impl_->ReplicationFinished(replicate_msg, status, leader_term); |
575 | 14.5M | } |
576 | | |
577 | | void RetryableRequests::Bootstrap( |
578 | 1.00M | const ReplicateMsg& replicate_msg, RestartSafeCoarseTimePoint entry_time) { |
579 | 1.00M | impl_->Bootstrap(replicate_msg, entry_time); |
580 | 1.00M | } |
581 | | |
582 | 24.3M | RestartSafeCoarseMonoClock& RetryableRequests::Clock() { |
583 | 24.3M | return impl_->Clock(); |
584 | 24.3M | } |
585 | | |
586 | 0 | RetryableRequestsCounts RetryableRequests::TEST_Counts() { |
587 | 0 | return impl_->TEST_Counts(); |
588 | 0 | } |
589 | | |
590 | | Result<RetryableRequestId> RetryableRequests::MinRunningRequestId( |
591 | 0 | const ClientId& client_id) const { |
592 | 0 | return impl_->MinRunningRequestId(client_id); |
593 | 0 | } |
594 | | |
595 | 142k | void RetryableRequests::SetMetricEntity(const scoped_refptr<MetricEntity>& metric_entity) { |
596 | 142k | impl_->SetMetricEntity(metric_entity); |
597 | 142k | } |
598 | | |
599 | | } // namespace consensus |
600 | | } // namespace yb |