YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/retryable_requests.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/consensus/retryable_requests.h"
15
16
#include <boost/multi_index/hashed_index.hpp>
17
#include <boost/multi_index/member.hpp>
18
#include <boost/multi_index/ordered_index.hpp>
19
#include <boost/multi_index_container.hpp>
20
21
#include "yb/consensus/consensus.pb.h"
22
#include "yb/consensus/consensus_round.h"
23
24
#include "yb/tablet/operations.pb.h"
25
26
#include "yb/util/atomic.h"
27
#include "yb/util/flag_tags.h"
28
#include "yb/util/format.h"
29
#include "yb/util/logging.h"
30
#include "yb/util/metrics.h"
31
#include "yb/util/opid.h"
32
#include "yb/util/result.h"
33
#include "yb/util/status_format.h"
34
35
using namespace std::literals;
36
37
// Read via GetAtomicFlag when computing the expiry cutoff for replicated request ranges.
DEFINE_int32(retryable_request_timeout_secs, 120,
38
             "Amount of time to keep write request in index, to prevent duplicate writes.");
39
TAG_FLAG(retryable_request_timeout_secs, runtime);
40
41
// We use this limit to prevent a request range from growing infinitely, because that would block
42
// log cleanup. I.e. even if we have a continuous request range, it is split into blocks that can be
43
// dropped independently.
44
DEFINE_int32(retryable_request_range_time_limit_secs, 30,
45
             "Max delta in time for single op id range.");
46
47
// Gauge that is incremented when a request is registered as running and decremented when its
// replication finishes.
METRIC_DEFINE_gauge_int64(tablet, running_retryable_requests,
48
                          "Number of running retryable requests.",
49
                          yb::MetricUnit::kRequests,
50
                          "Number of running retryable requests.");
51
52
// Gauge that tracks how many replicated request id ranges are currently stored.
METRIC_DEFINE_gauge_int64(tablet, replicated_retryable_request_ranges,
53
                          "Number of replicated retryable request ranges.",
54
                          yb::MetricUnit::kRequests,
55
                          "Number of replicated retryable request ranges.");
56
57
namespace yb {
58
namespace consensus {
59
60
namespace {
61
62
struct RunningRetryableRequest {
63
  RetryableRequestId request_id;
64
  RestartSafeCoarseTimePoint time;
65
  mutable std::vector<ConsensusRoundPtr> duplicate_rounds;
66
67
  RunningRetryableRequest(
68
      RetryableRequestId request_id_, RestartSafeCoarseTimePoint time_)
69
7.01M
      : request_id(request_id_), time(time_) {}
70
71
0
  std::string ToString() const {
72
0
    return YB_STRUCT_TO_STRING(request_id, time);
73
0
  }
74
};
75
76
struct ReplicatedRetryableRequestRange {
77
  mutable RetryableRequestId first_id;
78
  RetryableRequestId last_id;
79
  yb::OpId min_op_id;
80
  mutable RestartSafeCoarseTimePoint min_time;
81
  mutable RestartSafeCoarseTimePoint max_time;
82
83
  ReplicatedRetryableRequestRange(RetryableRequestId id, const yb::OpId& op_id,
84
                              RestartSafeCoarseTimePoint time)
85
      : first_id(id), last_id(id), min_op_id(op_id), min_time(time),
86
7.31M
        max_time(time) {}
87
88
685k
  void InsertTime(const RestartSafeCoarseTimePoint& time) const {
89
685k
    min_time = std::min(min_time, time);
90
685k
    max_time = std::max(max_time, time);
91
685k
  }
92
93
20.8k
  void PrepareJoinWithPrev(const ReplicatedRetryableRequestRange& prev) const {
94
20.8k
    min_time = std::min(min_time, prev.min_time);
95
20.8k
    max_time = std::max(max_time, prev.max_time);
96
20.8k
    first_id = prev.first_id;
97
20.8k
  }
98
99
0
  std::string ToString() const {
100
0
    return Format("{ first_id: $0 last_id: $1 min_op_id: $2 min_time: $3 max_time: $4 }",
101
0
                  first_id, last_id, min_op_id, min_time, max_time);
102
0
  }
103
};
104
105
struct LastIdIndex;
106
struct OpIdIndex;
107
struct RequestIdIndex;
108
109
typedef boost::multi_index_container <
110
    RunningRetryableRequest,
111
    boost::multi_index::indexed_by <
112
        boost::multi_index::hashed_unique <
113
            boost::multi_index::tag<RequestIdIndex>,
114
            boost::multi_index::member <
115
                RunningRetryableRequest, RetryableRequestId, &RunningRetryableRequest::request_id
116
            >
117
        >
118
    >
119
> RunningRetryableRequests;
120
121
typedef boost::multi_index_container <
122
    ReplicatedRetryableRequestRange,
123
    boost::multi_index::indexed_by <
124
        boost::multi_index::ordered_unique <
125
            boost::multi_index::tag<LastIdIndex>,
126
            boost::multi_index::member <
127
                ReplicatedRetryableRequestRange, RetryableRequestId,
128
                &ReplicatedRetryableRequestRange::last_id
129
            >
130
        >,
131
        boost::multi_index::ordered_unique <
132
            boost::multi_index::tag<OpIdIndex>,
133
            boost::multi_index::member <
134
                ReplicatedRetryableRequestRange, yb::OpId,
135
                &ReplicatedRetryableRequestRange::min_op_id
136
            >
137
        >
138
    >
139
> ReplicatedRetryableRequestRanges;
140
141
typedef ReplicatedRetryableRequestRanges::index<LastIdIndex>::type
142
    ReplicatedRetryableRequestRangesByLastId;
143
144
struct ClientRetryableRequests {
145
  RunningRetryableRequests running;
146
  ReplicatedRetryableRequestRanges replicated;
147
  RetryableRequestId min_running_request_id = 0;
148
  RestartSafeCoarseTimePoint empty_since;
149
};
150
151
656k
std::chrono::seconds RangeTimeLimit() {
152
656k
  return std::chrono::seconds(FLAGS_retryable_request_range_time_limit_secs);
153
656k
}
154
155
class ReplicateData {
156
 public:
157
5.97M
  ReplicateData() : client_id_(ClientId::Nil()), write_(nullptr) {}
158
159
  explicit ReplicateData(const tablet::WritePB* write, const OpIdPB& op_id)
160
      : client_id_(write->client_id1(), write->client_id2()),
161
18.1M
        write_(write), op_id_(OpId::FromPB(op_id)) {
162
163
18.1M
  }
164
165
24.0M
  static ReplicateData FromMsg(const ReplicateMsg& replicate_msg) {
166
24.0M
    if (!replicate_msg.has_write()) {
167
5.97M
      return ReplicateData();
168
5.97M
    }
169
170
18.1M
    return ReplicateData(&replicate_msg.write(), replicate_msg.id());
171
24.0M
  }
172
173
24.0M
  bool operator!() const {
174
24.0M
    return client_id_.IsNil();
175
24.0M
  }
176
177
0
  explicit operator bool() const {
178
0
    return !!*this;
179
0
  }
180
181
15.0M
  const ClientId& client_id() const {
182
15.0M
    return client_id_;
183
15.0M
  }
184
185
8.00M
  const tablet::WritePB& write() const {
186
8.00M
    return *write_;
187
8.00M
  }
188
189
37.0M
  RetryableRequestId request_id() const {
190
37.0M
    return write_->request_id();
191
37.0M
  }
192
193
0
  const yb::OpId& op_id() const {
194
0
    return op_id_;
195
0
  }
196
197
 private:
198
  ClientId client_id_;
199
  const tablet::WritePB* write_;
200
  yb::OpId op_id_;
201
};
202
203
0
std::ostream& operator<<(std::ostream& out, const ReplicateData& data) {
204
0
  return out << data.client_id() << '/' << data.request_id() << ": "
205
0
             << data.write().ShortDebugString() << " op_id: " << data.op_id();
206
0
}
207
208
} // namespace
209
210
class RetryableRequests::Impl {
211
 public:
212
292k
  explicit Impl(std::string log_prefix) : log_prefix_(std::move(log_prefix)) {
213
18.4E
    VLOG_WITH_PREFIX(1) << "Start";
214
292k
  }
215
216
8.55M
  bool Register(const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) {
217
8.55M
    auto data = ReplicateData::FromMsg(*round->replicate_msg());
218
8.55M
    if (!data) {
219
1.54M
      return true;
220
1.54M
    }
221
222
7.01M
    if (entry_time == RestartSafeCoarseTimePoint()) {
223
7.01M
      entry_time = clock_.Now();
224
7.01M
    }
225
226
7.01M
    ClientRetryableRequests& client_retryable_requests = clients_[data.client_id()];
227
228
7.01M
    CleanupReplicatedRequests(
229
7.01M
        data.write().min_running_request_id(), &client_retryable_requests);
230
231
7.01M
    if (data.request_id() < client_retryable_requests.min_running_request_id) {
232
0
      round->NotifyReplicationFinished(
233
0
          STATUS_EC_FORMAT(
234
0
              Expired,
235
0
              MinRunningRequestIdStatusData(client_retryable_requests.min_running_request_id),
236
0
              "Request id $0 is less than min running $1", data.request_id(),
237
0
              client_retryable_requests.min_running_request_id),
238
0
          round->bound_term(), nullptr /* applied_op_ids */);
239
0
      return false;
240
0
    }
241
242
7.01M
    auto& replicated_indexed_by_last_id = client_retryable_requests.replicated.get<LastIdIndex>();
243
7.01M
    auto it = replicated_indexed_by_last_id.lower_bound(data.request_id());
244
7.01M
    if (it != replicated_indexed_by_last_id.end() && 
it->first_id <= data.request_id()29.3k
) {
245
1
      round->NotifyReplicationFinished(
246
1
          STATUS(AlreadyPresent, "Duplicate request"), round->bound_term(),
247
1
          nullptr /* applied_op_ids */);
248
1
      return false;
249
1
    }
250
251
7.01M
    auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>();
252
7.01M
    auto emplace_result = running_indexed_by_request_id.emplace(data.request_id(), entry_time);
253
7.01M
    if (!emplace_result.second) {
254
0
      emplace_result.first->duplicate_rounds.push_back(round);
255
0
      return false;
256
0
    }
257
258
18.4E
    VLOG_WITH_PREFIX(4) << "Running added " << data;
259
7.01M
    if (running_requests_gauge_) {
260
6.58M
      running_requests_gauge_->Increment();
261
6.58M
    }
262
263
7.01M
    return true;
264
7.01M
  }
265
266
50.1M
  // Drops replicated request ranges that are older than the retryable request timeout and erases
  // clients that have had no requests at all for longer than that timeout.
  //
  // Returns the smallest min_op_id among the ranges that are kept, or OpId::Max() when no range
  // is kept.
  OpId CleanExpiredReplicatedAndGetMinOpId() {
    OpId result = OpId::Max();
    auto now = clock_.Now();
    auto clean_start = now - GetAtomicFlag(&FLAGS_retryable_request_timeout_secs) * 1s;
    for (auto ci = clients_.begin(); ci != clients_.end();) {
      ClientRetryableRequests& client_retryable_requests = ci->second;
      auto& op_id_index = client_retryable_requests.replicated.get<OpIdIndex>();

      // Ranges are visited in op id order; stop at the first one that has not expired yet.
      auto it = op_id_index.begin();
      int64_t count = 0;
      while (it != op_id_index.end() && it->max_time < clean_start) {
        ++it;
        ++count;
      }
      if (replicated_request_ranges_gauge_) {
        replicated_request_ranges_gauge_->DecrementBy(count);
      }
      if (it != op_id_index.end()) {
        result = std::min(result, it->min_op_id);
        op_id_index.erase(op_id_index.begin(), it);
      } else {
        op_id_index.clear();
      }

      if (op_id_index.empty() && client_retryable_requests.running.empty()) {
        // We delay deleting client with empty requests, to be able to filter requests with too
        // small request id.
        if (client_retryable_requests.empty_since == RestartSafeCoarseTimePoint()) {
          client_retryable_requests.empty_since = now;
        } else if (client_retryable_requests.empty_since < clean_start) {
          ci = clients_.erase(ci);
          continue;
        }
      } else {
        // The client has requests again. Forget when it was last seen empty, otherwise the next
        // idle period would inherit the stale timestamp and the client could be erased without
        // the delay described above.
        client_retryable_requests.empty_since = RestartSafeCoarseTimePoint();
      }
      ++ci;
    }

    return result;
  }
303
304
  void ReplicationFinished(
305
14.5M
      const ReplicateMsg& replicate_msg, const Status& status, int64_t leader_term) {
306
14.5M
    auto data = ReplicateData::FromMsg(replicate_msg);
307
14.5M
    if (!data) {
308
7.51M
      return;
309
7.51M
    }
310
311
7.00M
    auto& client_retryable_requests = clients_[data.client_id()];
312
7.00M
    auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>();
313
7.00M
    auto running_it = running_indexed_by_request_id.find(data.request_id());
314
7.00M
    if (running_it == running_indexed_by_request_id.end()) {
315
0
#ifndef NDEBUG
316
0
      LOG_WITH_PREFIX(ERROR) << "Running requests: "
317
0
                             << AsString(running_indexed_by_request_id);
318
0
#endif
319
0
      LOG_WITH_PREFIX(DFATAL) << "Replication finished for request with unknown id " << data;
320
0
      return;
321
0
    }
322
18.4E
    VLOG_WITH_PREFIX(4) << "Running " << (status.ok() ? 
"replicated "0
: "aborted ") << data
323
18.4E
                        << ", " << status;
324
325
7.00M
    static Status duplicate_write_status = STATUS(AlreadyPresent, "Duplicate request");
326
18.4E
    auto status_for_duplicate = 
status.ok()7.00M
?
duplicate_write_status7.01M
: status;
327
7.00M
    for (const auto& duplicate : running_it->duplicate_rounds) {
328
0
      duplicate->NotifyReplicationFinished(status_for_duplicate, leader_term,
329
0
                                           nullptr /* applied_op_ids */);
330
0
    }
331
7.00M
    auto entry_time = running_it->time;
332
7.00M
    running_indexed_by_request_id.erase(running_it);
333
7.00M
    if (running_requests_gauge_) {
334
6.57M
      running_requests_gauge_->Decrement();
335
6.57M
    }
336
337
7.00M
    if (
status.ok()7.00M
) {
338
7.00M
      AddReplicated(
339
7.00M
          yb::OpId::FromPB(replicate_msg.id()), data, entry_time, &client_retryable_requests);
340
7.00M
    }
341
7.00M
  }
342
343
  void Bootstrap(
344
1.00M
      const ReplicateMsg& replicate_msg, RestartSafeCoarseTimePoint entry_time) {
345
1.00M
    auto data = ReplicateData::FromMsg(replicate_msg);
346
1.00M
    if (!data) {
347
9.02k
      return;
348
9.02k
    }
349
350
993k
    auto& client_retryable_requests = clients_[data.client_id()];
351
993k
    auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>();
352
993k
    if (running_indexed_by_request_id.count(data.request_id()) != 0) {
353
0
#ifndef NDEBUG
354
0
      LOG_WITH_PREFIX(ERROR) << "Running requests: "
355
0
                             << yb::ToString(running_indexed_by_request_id);
356
0
#endif
357
0
      LOG_WITH_PREFIX(DFATAL) << "Bootstrapped running request " << data;
358
0
      return;
359
0
    }
360
993k
    
VLOG_WITH_PREFIX15
(4) << "Bootstrapped " << data15
;
361
362
993k
    CleanupReplicatedRequests(
363
993k
       data.write().min_running_request_id(), &client_retryable_requests);
364
365
993k
    AddReplicated(
366
993k
        yb::OpId::FromPB(replicate_msg.id()), data, entry_time, &client_retryable_requests);
367
993k
  }
368
369
24.2M
  // Gives access to the clock that is used to timestamp requests.
  RestartSafeCoarseMonoClock& Clock() {
    return clock_;
  }
372
373
142k
  void SetMetricEntity(const scoped_refptr<MetricEntity>& metric_entity) {
374
142k
    running_requests_gauge_ = METRIC_running_retryable_requests.Instantiate(metric_entity, 0);
375
142k
    replicated_request_ranges_gauge_ = METRIC_replicated_retryable_request_ranges.Instantiate(
376
142k
        metric_entity, 0);
377
142k
  }
378
379
0
  // Sums the number of running requests and of replicated ranges over all clients, logging the
  // replicated ranges of every client along the way.
  RetryableRequestsCounts TEST_Counts() {
    RetryableRequestsCounts counts;
    for (const auto& client_entry : clients_) {
      const ClientRetryableRequests& requests = client_entry.second;
      counts.running += requests.running.size();
      counts.replicated += requests.replicated.size();
      LOG_WITH_PREFIX(INFO) << "Replicated: " << yb::ToString(requests.replicated);
    }
    return counts;
  }
388
389
0
  // Returns the min running request id stored for `client_id`, or NotFound when nothing is
  // stored for that client.
  Result<RetryableRequestId> MinRunningRequestId(const ClientId& client_id) const {
    const auto client_it = clients_.find(client_id);
    if (client_it != clients_.end()) {
      return client_it->second.min_running_request_id;
    }
    return STATUS_FORMAT(NotFound, "Client requests data not found for client $0", client_id);
  }
396
397
 private:
398
  void CleanupReplicatedRequests(
399
      RetryableRequestId new_min_running_request_id,
400
8.00M
      ClientRetryableRequests* client_retryable_requests) {
401
8.00M
    auto& replicated_indexed_by_last_id = client_retryable_requests->replicated.get<LastIdIndex>();
402
8.00M
    if (new_min_running_request_id > client_retryable_requests->min_running_request_id) {
403
      // We are not interested in ids below write_request.min_running_request_id() anymore.
404
      //
405
      // Request id intervals are ordered by last id of interval, and does not overlap.
406
      // So we are trying to find interval with last_id >= min_running_request_id
407
      // and trim it if necessary.
408
7.42M
      auto it = replicated_indexed_by_last_id.lower_bound(new_min_running_request_id);
409
7.42M
      if (it != replicated_indexed_by_last_id.end() &&
410
7.42M
          
it->first_id < new_min_running_request_id134k
) {
411
128k
        it->first_id = new_min_running_request_id;
412
128k
      }
413
7.42M
      if (replicated_request_ranges_gauge_) {
414
6.07M
        replicated_request_ranges_gauge_->DecrementBy(
415
6.07M
            std::distance(replicated_indexed_by_last_id.begin(), it));
416
6.07M
      }
417
      // Remove all intervals that has ids below write_request.min_running_request_id().
418
7.42M
      replicated_indexed_by_last_id.erase(replicated_indexed_by_last_id.begin(), it);
419
7.42M
      client_retryable_requests->min_running_request_id = new_min_running_request_id;
420
7.42M
    }
421
8.00M
  }
422
423
  void AddReplicated(yb::OpId op_id, const ReplicateData& data, RestartSafeCoarseTimePoint time,
424
8.00M
                     ClientRetryableRequests* client) {
425
8.00M
    auto request_id = data.request_id();
426
8.00M
    auto& replicated_indexed_by_last_id = client->replicated.get<LastIdIndex>();
427
8.00M
    auto request_it = replicated_indexed_by_last_id.lower_bound(request_id);
428
8.00M
    if (request_it != replicated_indexed_by_last_id.end() && 
request_it->first_id <= request_id59.3k
) {
429
0
#ifndef NDEBUG
430
0
      LOG_WITH_PREFIX(ERROR)
431
0
          << "Replicated requests: " << yb::ToString(client->replicated);
432
0
#endif
433
434
0
      LOG_WITH_PREFIX(DFATAL) << "Request already replicated: " << data;
435
0
      return;
436
0
    }
437
438
    // Check that we have range right after this id, and we could extend it.
439
    // Requests rarely attaches to begin of interval, so we could don't check for
440
    // RangeTimeLimit() here.
441
8.00M
    if (request_it != replicated_indexed_by_last_id.end() &&
442
8.00M
        
request_it->first_id == request_id + 159.3k
) {
443
49.8k
      op_id = std::min(request_it->min_op_id, op_id);
444
49.8k
      request_it->InsertTime(time);
445
      // If previous range is right before this id, then we could just join those ranges.
446
49.8k
      if (!TryJoinRanges(request_it, op_id, &replicated_indexed_by_last_id)) {
447
29.0k
        --(request_it->first_id);
448
29.0k
        UpdateMinOpId(request_it, op_id, &replicated_indexed_by_last_id);
449
29.0k
      }
450
49.8k
      return;
451
49.8k
    }
452
453
7.95M
    if (TryJoinToEndOfRange(request_it, op_id, request_id, time, &replicated_indexed_by_last_id)) {
454
635k
      return;
455
635k
    }
456
457
7.31M
    client->replicated.emplace(request_id, op_id, time);
458
7.31M
    if (replicated_request_ranges_gauge_) {
459
6.00M
      replicated_request_ranges_gauge_->Increment();
460
6.00M
    }
461
7.31M
  }
462
463
  void UpdateMinOpId(
464
      ReplicatedRetryableRequestRangesByLastId::iterator request_it,
465
      yb::OpId min_op_id,
466
685k
      ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) {
467
685k
    if (min_op_id < request_it->min_op_id) {
468
18.6k
      replicated_indexed_by_last_id->modify(request_it, [min_op_id](auto& entry) { // NOLINT
469
18.6k
        entry.min_op_id = min_op_id;
470
18.6k
      });
471
18.6k
    }
472
685k
  }
473
474
  bool TryJoinRanges(
475
      ReplicatedRetryableRequestRangesByLastId::iterator request_it,
476
      yb::OpId min_op_id,
477
49.8k
      ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) {
478
49.8k
    if (request_it == replicated_indexed_by_last_id->begin()) {
479
24.6k
      return false;
480
24.6k
    }
481
482
25.2k
    auto request_prev_it = request_it;
483
25.2k
    --request_prev_it;
484
485
    // We could join ranges if there is exactly one id between them, and request with that id was
486
    // just replicated...
487
25.2k
    if (request_prev_it->last_id + 2 != request_it->first_id) {
488
4.37k
      return false;
489
4.37k
    }
490
491
    // ...and time range will fit into limit.
492
20.8k
    if (request_it->max_time > request_prev_it->min_time + RangeTimeLimit()) {
493
0
      return false;
494
0
    }
495
496
20.8k
    min_op_id = std::min(min_op_id, request_prev_it->min_op_id);
497
20.8k
    request_it->PrepareJoinWithPrev(*request_prev_it);
498
20.8k
    replicated_indexed_by_last_id->erase(request_prev_it);
499
20.8k
    if (replicated_request_ranges_gauge_) {
500
19.8k
      replicated_request_ranges_gauge_->Decrement();
501
19.8k
    }
502
20.8k
    UpdateMinOpId(request_it, min_op_id, replicated_indexed_by_last_id);
503
504
20.8k
    return true;
505
20.8k
  }
506
507
  bool TryJoinToEndOfRange(
508
      ReplicatedRetryableRequestRangesByLastId::iterator request_it,
509
      yb::OpId op_id, RetryableRequestId request_id, RestartSafeCoarseTimePoint time,
510
7.95M
      ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) {
511
7.95M
    if (request_it == replicated_indexed_by_last_id->begin()) {
512
7.28M
      return false;
513
7.28M
    }
514
515
665k
    --request_it;
516
517
665k
    if (request_it->last_id + 1 != request_id) {
518
29.7k
      return false;
519
29.7k
    }
520
521
    // It is rare case when request is attaches to end of range, but his time is lower than
522
    // min_time. So we could avoid checking for the case when
523
    // time + RangeTimeLimit() > request_prev_it->max_time
524
635k
    if (time > request_it->min_time + RangeTimeLimit()) {
525
0
      return false;
526
0
    }
527
528
635k
    op_id = std::min(request_it->min_op_id, op_id);
529
635k
    request_it->InsertTime(time);
530
    // Actually we should use the modify function on client.replicated, but since the order of
531
    // ranges should not be changed, we could update last_id directly.
532
635k
    ++const_cast<ReplicatedRetryableRequestRange&>(*request_it).last_id;
533
534
635k
    UpdateMinOpId(request_it, op_id, replicated_indexed_by_last_id);
535
536
635k
    return true;
537
635k
  }
538
539
0
  const std::string& LogPrefix() const {
540
0
    return log_prefix_;
541
0
  }
542
543
  const std::string log_prefix_;
544
  std::unordered_map<ClientId, ClientRetryableRequests, ClientIdHash> clients_;
545
  RestartSafeCoarseMonoClock clock_;
546
  scoped_refptr<AtomicGauge<int64_t>> running_requests_gauge_;
547
  scoped_refptr<AtomicGauge<int64_t>> replicated_request_ranges_gauge_;
548
};
549
550
// Creates the implementation object, handing over `log_prefix` to it.
RetryableRequests::RetryableRequests(std::string log_prefix)
    : impl_(new Impl(std::move(log_prefix))) {}
553
554
217k
// Defined here, where Impl is a complete type.
RetryableRequests::~RetryableRequests() = default;
556
557
0
// Takes over the implementation object of `rhs`.
RetryableRequests::RetryableRequests(RetryableRequests&& rhs)
    : impl_(std::move(rhs.impl_)) {
}
558
559
142k
// Replaces the implementation object with the one taken from `rhs`.
void RetryableRequests::operator=(RetryableRequests&& rhs) {
  auto& source = rhs.impl_;
  impl_ = std::move(source);
}
562
563
bool RetryableRequests::Register(
564
8.55M
    const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) {
565
8.55M
  return impl_->Register(round, entry_time);
566
8.55M
}
567
568
50.1M
// Forwards to Impl::CleanExpiredReplicatedAndGetMinOpId.
yb::OpId RetryableRequests::CleanExpiredReplicatedAndGetMinOpId() {
  Impl& impl = *impl_;
  return impl.CleanExpiredReplicatedAndGetMinOpId();
}
571
572
void RetryableRequests::ReplicationFinished(
573
14.5M
    const ReplicateMsg& replicate_msg, const Status& status, int64_t leader_term) {
574
14.5M
  impl_->ReplicationFinished(replicate_msg, status, leader_term);
575
14.5M
}
576
577
void RetryableRequests::Bootstrap(
578
1.00M
    const ReplicateMsg& replicate_msg, RestartSafeCoarseTimePoint entry_time) {
579
1.00M
  impl_->Bootstrap(replicate_msg, entry_time);
580
1.00M
}
581
582
24.3M
// Forwards to Impl::Clock.
RestartSafeCoarseMonoClock& RetryableRequests::Clock() {
  Impl& impl = *impl_;
  return impl.Clock();
}
585
586
0
// Forwards to Impl::TEST_Counts.
RetryableRequestsCounts RetryableRequests::TEST_Counts() {
  Impl& impl = *impl_;
  return impl.TEST_Counts();
}
589
590
// Forwards to Impl::MinRunningRequestId.
Result<RetryableRequestId> RetryableRequests::MinRunningRequestId(
    const ClientId& client_id) const {
  const Impl& impl = *impl_;
  return impl.MinRunningRequestId(client_id);
}
594
595
142k
void RetryableRequests::SetMetricEntity(const scoped_refptr<MetricEntity>& metric_entity) {
596
142k
  impl_->SetMetricEntity(metric_entity);
597
142k
}
598
599
} // namespace consensus
600
} // namespace yb