YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/retryable_requests.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/consensus/retryable_requests.h"
15
16
#include <boost/multi_index/hashed_index.hpp>
17
#include <boost/multi_index/member.hpp>
18
#include <boost/multi_index/ordered_index.hpp>
19
#include <boost/multi_index_container.hpp>
20
21
#include "yb/consensus/consensus.pb.h"
22
#include "yb/consensus/consensus_round.h"
23
24
#include "yb/tablet/operations.pb.h"
25
26
#include "yb/util/atomic.h"
27
#include "yb/util/flag_tags.h"
28
#include "yb/util/format.h"
29
#include "yb/util/logging.h"
30
#include "yb/util/metrics.h"
31
#include "yb/util/opid.h"
32
#include "yb/util/result.h"
33
#include "yb/util/status_format.h"
34
35
using namespace std::literals;
36
37
// Retention timeout, in seconds. CleanExpiredReplicatedAndGetMinOpId() reads it through
// GetAtomicFlag() to compute the cutoff used both for expiring replicated ranges and for
// deciding when an empty client entry may be removed. Tagged as a runtime flag.
DEFINE_int32(retryable_request_timeout_secs, 120,
38
             "Amount of time to keep write request in index, to prevent duplicate writes.");
39
TAG_FLAG(retryable_request_timeout_secs, runtime);
40
41
// We use this limit to prevent a request range from growing indefinitely, because that would
42
// block log cleanup. I.e. even a continuous request range is split into blocks that can be
43
// dropped independently.
44
// Read directly (without GetAtomicFlag) by RangeTimeLimit(); this flag has no TAG_FLAG in
// this file.
DEFINE_int32(retryable_request_range_time_limit_secs, 30,
45
             "Max delta in time for single op id range.");
46
47
// Gauge prototype instantiated in Impl::SetMetricEntity(); incremented when a request is added
// to the running set in Register() and decremented in ReplicationFinished().
METRIC_DEFINE_gauge_int64(tablet, running_retryable_requests,
48
                          "Number of running retryable requests.",
49
                          yb::MetricUnit::kRequests,
50
                          "Number of running retryable requests.");
51
52
// Gauge prototype instantiated in Impl::SetMetricEntity(); adjusted whenever replicated ranges
// are created, joined, trimmed away or expired.
METRIC_DEFINE_gauge_int64(tablet, replicated_retryable_request_ranges,
53
                          "Number of replicated retryable request ranges.",
54
                          yb::MetricUnit::kRequests,
55
                          "Number of replicated retryable request ranges.");
56
57
namespace yb {
58
namespace consensus {
59
60
namespace {
61
62
struct RunningRetryableRequest {
63
  RetryableRequestId request_id;
64
  RestartSafeCoarseTimePoint time;
65
  mutable std::vector<ConsensusRoundPtr> duplicate_rounds;
66
67
  RunningRetryableRequest(
68
      RetryableRequestId request_id_, RestartSafeCoarseTimePoint time_)
69
3.66M
      : request_id(request_id_), time(time_) {}
70
71
0
  std::string ToString() const {
72
0
    return YB_STRUCT_TO_STRING(request_id, time);
73
0
  }
74
};
75
76
struct ReplicatedRetryableRequestRange {
77
  mutable RetryableRequestId first_id;
78
  RetryableRequestId last_id;
79
  yb::OpId min_op_id;
80
  mutable RestartSafeCoarseTimePoint min_time;
81
  mutable RestartSafeCoarseTimePoint max_time;
82
83
  ReplicatedRetryableRequestRange(RetryableRequestId id, const yb::OpId& op_id,
84
                              RestartSafeCoarseTimePoint time)
85
      : first_id(id), last_id(id), min_op_id(op_id), min_time(time),
86
3.94M
        max_time(time) {}
87
88
311k
  void InsertTime(const RestartSafeCoarseTimePoint& time) const {
89
311k
    min_time = std::min(min_time, time);
90
311k
    max_time = std::max(max_time, time);
91
311k
  }
92
93
5.65k
  void PrepareJoinWithPrev(const ReplicatedRetryableRequestRange& prev) const {
94
5.65k
    min_time = std::min(min_time, prev.min_time);
95
5.65k
    max_time = std::max(max_time, prev.max_time);
96
5.65k
    first_id = prev.first_id;
97
5.65k
  }
98
99
0
  std::string ToString() const {
100
0
    return Format("{ first_id: $0 last_id: $1 min_op_id: $2 min_time: $3 max_time: $4 }",
101
0
                  first_id, last_id, min_op_id, min_time, max_time);
102
0
  }
103
};
104
105
struct LastIdIndex;
106
struct OpIdIndex;
107
struct RequestIdIndex;
108
109
// Running requests of a single client, with a unique hashed index on the request id.
using RunningRetryableRequests = boost::multi_index_container<
    RunningRetryableRequest,
    boost::multi_index::indexed_by<
        boost::multi_index::hashed_unique<
            boost::multi_index::tag<RequestIdIndex>,
            boost::multi_index::member<
                RunningRetryableRequest, RetryableRequestId,
                &RunningRetryableRequest::request_id>>>>;
120
121
// Replicated request ranges of a single client. Two unique ordered indexes are maintained:
// LastIdIndex orders ranges by their last request id, OpIdIndex orders them by min_op_id.
using ReplicatedRetryableRequestRanges = boost::multi_index_container<
    ReplicatedRetryableRequestRange,
    boost::multi_index::indexed_by<
        boost::multi_index::ordered_unique<
            boost::multi_index::tag<LastIdIndex>,
            boost::multi_index::member<
                ReplicatedRetryableRequestRange, RetryableRequestId,
                &ReplicatedRetryableRequestRange::last_id>>,
        boost::multi_index::ordered_unique<
            boost::multi_index::tag<OpIdIndex>,
            boost::multi_index::member<
                ReplicatedRetryableRequestRange, yb::OpId,
                &ReplicatedRetryableRequestRange::min_op_id>>>>;
140
141
// The view of ReplicatedRetryableRequestRanges that is ordered by last request id.
using ReplicatedRetryableRequestRangesByLastId =
    ReplicatedRetryableRequestRanges::index<LastIdIndex>::type;
143
144
struct ClientRetryableRequests {
145
  RunningRetryableRequests running;
146
  ReplicatedRetryableRequestRanges replicated;
147
  RetryableRequestId min_running_request_id = 0;
148
  RestartSafeCoarseTimePoint empty_since;
149
};
150
151
297k
std::chrono::seconds RangeTimeLimit() {
152
297k
  return std::chrono::seconds(FLAGS_retryable_request_range_time_limit_secs);
153
297k
}
154
155
class ReplicateData {
156
 public:
157
3.42M
  ReplicateData() : client_id_(ClientId::Nil()), write_(nullptr) {}
158
159
  explicit ReplicateData(const tablet::WritePB* write, const OpIdPB& op_id)
160
      : client_id_(write->client_id1(), write->client_id2()),
161
9.56M
        write_(write), op_id_(OpId::FromPB(op_id)) {
162
163
9.56M
  }
164
165
12.9M
  static ReplicateData FromMsg(const ReplicateMsg& replicate_msg) {
166
12.9M
    if (!replicate_msg.has_write()) {
167
3.42M
      return ReplicateData();
168
3.42M
    }
169
170
9.56M
    return ReplicateData(&replicate_msg.write(), replicate_msg.id());
171
9.56M
  }
172
173
12.9M
  bool operator!() const {
174
12.9M
    return client_id_.IsNil();
175
12.9M
  }
176
177
0
  explicit operator bool() const {
178
0
    return !!*this;
179
0
  }
180
181
7.92M
  const ClientId& client_id() const {
182
7.92M
    return client_id_;
183
7.92M
  }
184
185
4.25M
  const tablet::WritePB& write() const {
186
4.25M
    return *write_;
187
4.25M
  }
188
189
19.5M
  RetryableRequestId request_id() const {
190
19.5M
    return write_->request_id();
191
19.5M
  }
192
193
0
  const yb::OpId& op_id() const {
194
0
    return op_id_;
195
0
  }
196
197
 private:
198
  ClientId client_id_;
199
  const tablet::WritePB* write_;
200
  yb::OpId op_id_;
201
};
202
203
0
std::ostream& operator<<(std::ostream& out, const ReplicateData& data) {
204
0
  return out << data.client_id() << '/' << data.request_id() << ": "
205
0
             << data.write().ShortDebugString() << " op_id: " << data.op_id();
206
0
}
207
208
} // namespace
209
210
class RetryableRequests::Impl {
211
 public:
212
172k
  explicit Impl(std::string log_prefix) : log_prefix_(std::move(log_prefix)) {
213
2
    VLOG_WITH_PREFIX(1) << "Start";
214
172k
  }
215
216
4.47M
  bool Register(const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) {
217
4.47M
    auto data = ReplicateData::FromMsg(*round->replicate_msg());
218
4.47M
    if (!data) {
219
813k
      return true;
220
813k
    }
221
222
3.66M
    if (entry_time == RestartSafeCoarseTimePoint()) {
223
3.66M
      entry_time = clock_.Now();
224
3.66M
    }
225
226
3.66M
    ClientRetryableRequests& client_retryable_requests = clients_[data.client_id()];
227
228
3.66M
    CleanupReplicatedRequests(
229
3.66M
        data.write().min_running_request_id(), &client_retryable_requests);
230
231
3.66M
    if (data.request_id() < client_retryable_requests.min_running_request_id) {
232
0
      round->NotifyReplicationFinished(
233
0
          STATUS_EC_FORMAT(
234
0
              Expired,
235
0
              MinRunningRequestIdStatusData(client_retryable_requests.min_running_request_id),
236
0
              "Request id $0 is less than min running $1", data.request_id(),
237
0
              client_retryable_requests.min_running_request_id),
238
0
          round->bound_term(), nullptr /* applied_op_ids */);
239
0
      return false;
240
0
    }
241
242
3.66M
    auto& replicated_indexed_by_last_id = client_retryable_requests.replicated.get<LastIdIndex>();
243
3.66M
    auto it = replicated_indexed_by_last_id.lower_bound(data.request_id());
244
3.66M
    if (it != replicated_indexed_by_last_id.end() && it->first_id <= data.request_id()) {
245
1
      round->NotifyReplicationFinished(
246
1
          STATUS(AlreadyPresent, "Duplicate request"), round->bound_term(),
247
1
          nullptr /* applied_op_ids */);
248
1
      return false;
249
1
    }
250
251
3.66M
    auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>();
252
3.66M
    auto emplace_result = running_indexed_by_request_id.emplace(data.request_id(), entry_time);
253
3.66M
    if (!emplace_result.second) {
254
0
      emplace_result.first->duplicate_rounds.push_back(round);
255
0
      return false;
256
0
    }
257
258
18.4E
    VLOG_WITH_PREFIX(4) << "Running added " << data;
259
3.66M
    if (running_requests_gauge_) {
260
3.54M
      running_requests_gauge_->Increment();
261
3.54M
    }
262
263
3.66M
    return true;
264
3.66M
  }
265
266
6.40M
  // Drops replicated ranges that are older than the retryable request timeout and returns the
  // smallest min_op_id among the first surviving range of every client (OpId::Max() when no
  // range survives).
  //
  // For every client the ranges are scanned in min_op_id order; leading ranges whose max_time
  // is before the expiration cutoff are erased and the scan stops at the first range that is
  // still alive.
  //
  // A client with neither replicated ranges nor running requests is not erased right away:
  // it is kept until it has been continuously empty for the timeout, so that requests with a
  // too small request id can still be filtered. The moment the client was first seen empty is
  // stored in empty_since and is reset as soon as the client is seen non-empty again, otherwise
  // a stale value from an earlier empty period would cause the client to be erased without
  // any delay.
  OpId CleanExpiredReplicatedAndGetMinOpId() {
    const auto now = clock_.Now();
    const auto clean_start = now - GetAtomicFlag(&FLAGS_retryable_request_timeout_secs) * 1s;

    OpId result = OpId::Max();
    for (auto ci = clients_.begin(); ci != clients_.end();) {
      ClientRetryableRequests& client = ci->second;
      auto& op_id_index = client.replicated.get<OpIdIndex>();

      // Find the first range that has not expired yet, counting the expired ones before it.
      int64_t expired_count = 0;
      auto first_alive = op_id_index.begin();
      while (first_alive != op_id_index.end() && first_alive->max_time < clean_start) {
        ++first_alive;
        ++expired_count;
      }
      if (replicated_request_ranges_gauge_) {
        replicated_request_ranges_gauge_->DecrementBy(expired_count);
      }
      if (first_alive != op_id_index.end()) {
        result = std::min(result, first_alive->min_op_id);
      }
      // When everything has expired this erases the whole index.
      op_id_index.erase(op_id_index.begin(), first_alive);

      const bool client_is_empty = op_id_index.empty() && client.running.empty();
      if (!client_is_empty) {
        // Start the deletion delay from scratch the next time this client becomes empty.
        client.empty_since = RestartSafeCoarseTimePoint();
      } else if (client.empty_since == RestartSafeCoarseTimePoint()) {
        client.empty_since = now;
      } else if (client.empty_since < clean_start) {
        ci = clients_.erase(ci);
        continue;
      }
      ++ci;
    }

    return result;
  }
303
304
  void ReplicationFinished(
305
7.90M
      const ReplicateMsg& replicate_msg, const Status& status, int64_t leader_term) {
306
7.90M
    auto data = ReplicateData::FromMsg(replicate_msg);
307
7.90M
    if (!data) {
308
4.23M
      return;
309
4.23M
    }
310
311
3.66M
    auto& client_retryable_requests = clients_[data.client_id()];
312
3.66M
    auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>();
313
3.66M
    auto running_it = running_indexed_by_request_id.find(data.request_id());
314
3.66M
    if (running_it == running_indexed_by_request_id.end()) {
315
0
#ifndef NDEBUG
316
0
      LOG_WITH_PREFIX(ERROR) << "Running requests: "
317
0
                             << AsString(running_indexed_by_request_id);
318
0
#endif
319
0
      LOG_WITH_PREFIX(DFATAL) << "Replication finished for request with unknown id " << data;
320
0
      return;
321
0
    }
322
18.4E
    VLOG_WITH_PREFIX(4) << "Running " << (status.ok() ? "replicated " : "aborted ") << data
323
18.4E
                        << ", " << status;
324
325
3.66M
    static Status duplicate_write_status = STATUS(AlreadyPresent, "Duplicate request");
326
18.4E
    auto status_for_duplicate = status.ok() ? duplicate_write_status : status;
327
0
    for (const auto& duplicate : running_it->duplicate_rounds) {
328
0
      duplicate->NotifyReplicationFinished(status_for_duplicate, leader_term,
329
0
                                           nullptr /* applied_op_ids */);
330
0
    }
331
3.66M
    auto entry_time = running_it->time;
332
3.66M
    running_indexed_by_request_id.erase(running_it);
333
3.66M
    if (running_requests_gauge_) {
334
3.53M
      running_requests_gauge_->Decrement();
335
3.53M
    }
336
337
3.66M
    if (status.ok()) {
338
3.66M
      AddReplicated(
339
3.66M
          yb::OpId::FromPB(replicate_msg.id()), data, entry_time, &client_retryable_requests);
340
3.66M
    }
341
3.66M
  }
342
343
  void Bootstrap(
344
601k
      const ReplicateMsg& replicate_msg, RestartSafeCoarseTimePoint entry_time) {
345
601k
    auto data = ReplicateData::FromMsg(replicate_msg);
346
601k
    if (!data) {
347
11.7k
      return;
348
11.7k
    }
349
350
590k
    auto& client_retryable_requests = clients_[data.client_id()];
351
590k
    auto& running_indexed_by_request_id = client_retryable_requests.running.get<RequestIdIndex>();
352
590k
    if (running_indexed_by_request_id.count(data.request_id()) != 0) {
353
0
#ifndef NDEBUG
354
0
      LOG_WITH_PREFIX(ERROR) << "Running requests: "
355
0
                             << yb::ToString(running_indexed_by_request_id);
356
0
#endif
357
0
      LOG_WITH_PREFIX(DFATAL) << "Bootstrapped running request " << data;
358
0
      return;
359
0
    }
360
590k
    VLOG_WITH_PREFIX(4) << "Bootstrapped " << data;
361
362
590k
    CleanupReplicatedRequests(
363
590k
       data.write().min_running_request_id(), &client_retryable_requests);
364
365
590k
    AddReplicated(
366
590k
        yb::OpId::FromPB(replicate_msg.id()), data, entry_time, &client_retryable_requests);
367
590k
  }
368
369
13.1M
  // Gives mutable access to the clock used for entry times and expiration.
  RestartSafeCoarseMonoClock& Clock() {
    auto& clock = clock_;
    return clock;
  }
372
373
83.3k
  void SetMetricEntity(const scoped_refptr<MetricEntity>& metric_entity) {
374
83.3k
    running_requests_gauge_ = METRIC_running_retryable_requests.Instantiate(metric_entity, 0);
375
83.3k
    replicated_request_ranges_gauge_ = METRIC_replicated_retryable_request_ranges.Instantiate(
376
83.3k
        metric_entity, 0);
377
83.3k
  }
378
379
0
  // Sums the number of running requests and of replicated ranges over all clients.
  // Also logs the replicated ranges of every client at INFO level.
  RetryableRequestsCounts TEST_Counts() {
    RetryableRequestsCounts counts;
    for (const auto& id_and_client : clients_) {
      const auto& client = id_and_client.second;
      counts.running += client.running.size();
      counts.replicated += client.replicated.size();
      LOG_WITH_PREFIX(INFO) << "Replicated: " << yb::ToString(client.replicated);
    }
    return counts;
  }
388
389
0
  // Returns the min running request id stored for `client_id`, or a NotFound status when
  // nothing is tracked for that client.
  Result<RetryableRequestId> MinRunningRequestId(const ClientId& client_id) const {
    const auto client_it = clients_.find(client_id);
    if (client_it != clients_.end()) {
      return client_it->second.min_running_request_id;
    }
    return STATUS_FORMAT(NotFound, "Client requests data not found for client $0", client_id);
  }
396
397
 private:
398
  void CleanupReplicatedRequests(
399
      RetryableRequestId new_min_running_request_id,
400
4.25M
      ClientRetryableRequests* client_retryable_requests) {
401
4.25M
    auto& replicated_indexed_by_last_id = client_retryable_requests->replicated.get<LastIdIndex>();
402
4.25M
    if (new_min_running_request_id > client_retryable_requests->min_running_request_id) {
403
      // We are not interested in ids below write_request.min_running_request_id() anymore.
404
      //
405
      // Request id intervals are ordered by last id of interval, and does not overlap.
406
      // So we are trying to find interval with last_id >= min_running_request_id
407
      // and trim it if necessary.
408
3.99M
      auto it = replicated_indexed_by_last_id.lower_bound(new_min_running_request_id);
409
3.99M
      if (it != replicated_indexed_by_last_id.end() &&
410
60.7k
          it->first_id < new_min_running_request_id) {
411
58.1k
        it->first_id = new_min_running_request_id;
412
58.1k
      }
413
3.99M
      if (replicated_request_ranges_gauge_) {
414
3.32M
        replicated_request_ranges_gauge_->DecrementBy(
415
3.32M
            std::distance(replicated_indexed_by_last_id.begin(), it));
416
3.32M
      }
417
      // Remove all intervals that has ids below write_request.min_running_request_id().
418
3.99M
      replicated_indexed_by_last_id.erase(replicated_indexed_by_last_id.begin(), it);
419
3.99M
      client_retryable_requests->min_running_request_id = new_min_running_request_id;
420
3.99M
    }
421
4.25M
  }
422
423
  void AddReplicated(yb::OpId op_id, const ReplicateData& data, RestartSafeCoarseTimePoint time,
424
4.25M
                     ClientRetryableRequests* client) {
425
4.25M
    auto request_id = data.request_id();
426
4.25M
    auto& replicated_indexed_by_last_id = client->replicated.get<LastIdIndex>();
427
4.25M
    auto request_it = replicated_indexed_by_last_id.lower_bound(request_id);
428
4.25M
    if (request_it != replicated_indexed_by_last_id.end() && request_it->first_id <= request_id) {
429
0
#ifndef NDEBUG
430
0
      LOG_WITH_PREFIX(ERROR)
431
0
          << "Replicated requests: " << yb::ToString(client->replicated);
432
0
#endif
433
434
0
      LOG_WITH_PREFIX(DFATAL) << "Request already replicated: " << data;
435
0
      return;
436
0
    }
437
438
    // Check that we have range right after this id, and we could extend it.
439
    // Requests rarely attaches to begin of interval, so we could don't check for
440
    // RangeTimeLimit() here.
441
4.25M
    if (request_it != replicated_indexed_by_last_id.end() &&
442
21.4k
        request_it->first_id == request_id + 1) {
443
19.4k
      op_id = std::min(request_it->min_op_id, op_id);
444
19.4k
      request_it->InsertTime(time);
445
      // If previous range is right before this id, then we could just join those ranges.
446
19.4k
      if (!TryJoinRanges(request_it, op_id, &replicated_indexed_by_last_id)) {
447
13.8k
        --(request_it->first_id);
448
13.8k
        UpdateMinOpId(request_it, op_id, &replicated_indexed_by_last_id);
449
13.8k
      }
450
19.4k
      return;
451
19.4k
    }
452
453
4.23M
    if (TryJoinToEndOfRange(request_it, op_id, request_id, time, &replicated_indexed_by_last_id)) {
454
291k
      return;
455
291k
    }
456
457
3.94M
    client->replicated.emplace(request_id, op_id, time);
458
3.94M
    if (replicated_request_ranges_gauge_) {
459
3.29M
      replicated_request_ranges_gauge_->Increment();
460
3.29M
    }
461
3.94M
  }
462
463
  void UpdateMinOpId(
464
      ReplicatedRetryableRequestRangesByLastId::iterator request_it,
465
      yb::OpId min_op_id,
466
311k
      ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) {
467
311k
    if (min_op_id < request_it->min_op_id) {
468
5.32k
      replicated_indexed_by_last_id->modify(request_it, [min_op_id](auto& entry) { // NOLINT
469
5.32k
        entry.min_op_id = min_op_id;
470
5.32k
      });
471
5.32k
    }
472
311k
  }
473
474
  bool TryJoinRanges(
475
      ReplicatedRetryableRequestRangesByLastId::iterator request_it,
476
      yb::OpId min_op_id,
477
19.4k
      ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) {
478
19.4k
    if (request_it == replicated_indexed_by_last_id->begin()) {
479
13.0k
      return false;
480
13.0k
    }
481
482
6.45k
    auto request_prev_it = request_it;
483
6.45k
    --request_prev_it;
484
485
    // We could join ranges if there is exactly one id between them, and request with that id was
486
    // just replicated...
487
6.45k
    if (request_prev_it->last_id + 2 != request_it->first_id) {
488
794
      return false;
489
794
    }
490
491
    // ...and time range will fit into limit.
492
5.65k
    if (request_it->max_time > request_prev_it->min_time + RangeTimeLimit()) {
493
0
      return false;
494
0
    }
495
496
5.65k
    min_op_id = std::min(min_op_id, request_prev_it->min_op_id);
497
5.65k
    request_it->PrepareJoinWithPrev(*request_prev_it);
498
5.65k
    replicated_indexed_by_last_id->erase(request_prev_it);
499
5.65k
    if (replicated_request_ranges_gauge_) {
500
5.12k
      replicated_request_ranges_gauge_->Decrement();
501
5.12k
    }
502
5.65k
    UpdateMinOpId(request_it, min_op_id, replicated_indexed_by_last_id);
503
504
5.65k
    return true;
505
5.65k
  }
506
507
  bool TryJoinToEndOfRange(
508
      ReplicatedRetryableRequestRangesByLastId::iterator request_it,
509
      yb::OpId op_id, RetryableRequestId request_id, RestartSafeCoarseTimePoint time,
510
4.23M
      ReplicatedRetryableRequestRangesByLastId* replicated_indexed_by_last_id) {
511
4.23M
    if (request_it == replicated_indexed_by_last_id->begin()) {
512
3.93M
      return false;
513
3.93M
    }
514
515
305k
    --request_it;
516
517
305k
    if (request_it->last_id + 1 != request_id) {
518
12.1k
      return false;
519
12.1k
    }
520
521
    // It is rare case when request is attaches to end of range, but his time is lower than
522
    // min_time. So we could avoid checking for the case when
523
    // time + RangeTimeLimit() > request_prev_it->max_time
524
293k
    if (time > request_it->min_time + RangeTimeLimit()) {
525
0
      return false;
526
0
    }
527
528
293k
    op_id = std::min(request_it->min_op_id, op_id);
529
293k
    request_it->InsertTime(time);
530
    // Actually we should use the modify function on client.replicated, but since the order of
531
    // ranges should not be changed, we could update last_id directly.
532
293k
    ++const_cast<ReplicatedRetryableRequestRange&>(*request_it).last_id;
533
534
293k
    UpdateMinOpId(request_it, op_id, replicated_indexed_by_last_id);
535
536
293k
    return true;
537
293k
  }
538
539
0
  const std::string& LogPrefix() const {
540
0
    return log_prefix_;
541
0
  }
542
543
  const std::string log_prefix_;
544
  std::unordered_map<ClientId, ClientRetryableRequests, ClientIdHash> clients_;
545
  RestartSafeCoarseMonoClock clock_;
546
  scoped_refptr<AtomicGauge<int64_t>> running_requests_gauge_;
547
  scoped_refptr<AtomicGauge<int64_t>> replicated_request_ranges_gauge_;
548
};
549
550
// Creates the implementation object, handing `log_prefix` over to it.
RetryableRequests::RetryableRequests(std::string log_prefix)
    : impl_(new Impl(std::move(log_prefix))) {}
553
554
131k
// Defined in this file, where Impl is a complete type.
RetryableRequests::~RetryableRequests() = default;
556
557
0
// Takes over the implementation object of `rhs`.
RetryableRequests::RetryableRequests(RetryableRequests&& rhs)
    : impl_(std::move(rhs.impl_)) {
}
558
559
83.3k
// Move assignment: takes over the implementation object of `rhs`. Returns nothing.
void RetryableRequests::operator=(RetryableRequests&& rhs) {
  auto& source = rhs.impl_;
  impl_ = std::move(source);
}
562
563
bool RetryableRequests::Register(
564
4.47M
    const ConsensusRoundPtr& round, RestartSafeCoarseTimePoint entry_time) {
565
4.47M
  return impl_->Register(round, entry_time);
566
4.47M
}
567
568
6.40M
// Forwards to Impl::CleanExpiredReplicatedAndGetMinOpId and returns its result.
yb::OpId RetryableRequests::CleanExpiredReplicatedAndGetMinOpId() {
  auto& impl = *impl_;
  return impl.CleanExpiredReplicatedAndGetMinOpId();
}
571
572
void RetryableRequests::ReplicationFinished(
573
7.90M
    const ReplicateMsg& replicate_msg, const Status& status, int64_t leader_term) {
574
7.90M
  impl_->ReplicationFinished(replicate_msg, status, leader_term);
575
7.90M
}
576
577
void RetryableRequests::Bootstrap(
578
601k
    const ReplicateMsg& replicate_msg, RestartSafeCoarseTimePoint entry_time) {
579
601k
  impl_->Bootstrap(replicate_msg, entry_time);
580
601k
}
581
582
13.1M
// Forwards to Impl::Clock and returns the same reference.
RestartSafeCoarseMonoClock& RetryableRequests::Clock() {
  auto& impl = *impl_;
  return impl.Clock();
}
585
586
0
// Forwards to Impl::TEST_Counts and returns its result.
RetryableRequestsCounts RetryableRequests::TEST_Counts() {
  auto& impl = *impl_;
  return impl.TEST_Counts();
}
589
590
// Forwards to Impl::MinRunningRequestId and returns its result.
Result<RetryableRequestId> RetryableRequests::MinRunningRequestId(
    const ClientId& client_id) const {
  const auto& impl = *impl_;
  return impl.MinRunningRequestId(client_id);
}
594
595
83.4k
void RetryableRequests::SetMetricEntity(const scoped_refptr<MetricEntity>& metric_entity) {
596
83.4k
  impl_->SetMetricEntity(metric_entity);
597
83.4k
}
598
599
} // namespace consensus
600
} // namespace yb