YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/batcher.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/client/batcher.h"
34
35
#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
43
44
#include <boost/optional/optional_io.hpp>
45
#include <glog/logging.h>
46
47
#include "yb/client/async_rpc.h"
48
#include "yb/client/client-internal.h"
49
#include "yb/client/client.h"
50
#include "yb/client/client_error.h"
51
#include "yb/client/error.h"
52
#include "yb/client/error_collector.h"
53
#include "yb/client/in_flight_op.h"
54
#include "yb/client/meta_cache.h"
55
#include "yb/client/rejection_score_source.h"
56
#include "yb/client/schema.h"
57
#include "yb/client/session.h"
58
#include "yb/client/table.h"
59
#include "yb/client/transaction.h"
60
#include "yb/client/yb_op.h"
61
#include "yb/client/yb_table_name.h"
62
63
#include "yb/common/wire_protocol.h"
64
65
#include "yb/gutil/stl_util.h"
66
#include "yb/gutil/strings/join.h"
67
68
#include "yb/util/debug-util.h"
69
#include "yb/util/flag_tags.h"
70
#include "yb/util/format.h"
71
#include "yb/util/logging.h"
72
#include "yb/util/result.h"
73
#include "yb/util/status_format.h"
74
#include "yb/util/trace.h"
75
76
// When this flag is set to false and we have separate errors for operation, then batcher would
77
// report IO Error status. Otherwise we will try to combine errors from separate operation to
78
// status of batch. Useful in tests, when we don't need complex error analysis.
79
DEFINE_test_flag(bool, combine_batcher_errors, false,
80
                 "Whether combine errors into batcher status.");
81
DEFINE_test_flag(double, simulate_tablet_lookup_does_not_match_partition_key_probability, 0.0,
82
                 "Probability for simulating the error that happens when a key is not in the key "
83
                 "range of the resolved tablet's partition.");
84
85
using std::pair;
86
using std::set;
87
using std::unique_ptr;
88
using std::shared_ptr;
89
using std::unordered_map;
90
using strings::Substitute;
91
92
using namespace std::placeholders;
93
94
namespace yb {
95
96
using tserver::WriteResponsePB;
97
using tserver::WriteResponsePB_PerRowErrorPB;
98
99
namespace client {
100
101
namespace internal {
102
103
// TODO: instead of using a string error message, make Batcher return a status other than IOError.
104
// (https://github.com/YugaByte/yugabyte-db/issues/702)
105
const std::string Batcher::kErrorReachingOutToTServersMsg(
106
    "Errors occurred while reaching out to the tablet servers");
107
108
namespace {
109
110
const auto kGeneralErrorStatus = STATUS(IOError, Batcher::kErrorReachingOutToTServersMsg);
111
112
}  // namespace
113
114
// About lock ordering in this file:
// ------------------------------
// The locks must be acquired in the following order:
//   - Batcher::lock_
//   - InFlightOp::lock_
//
// NOTE(review): none of the code in this file acquires either of these locks; this ordering may
// be stale — verify against batcher.h / in_flight_op.h before relying on it.
//
// It's generally important to release all the locks before either calling
// a user callback, or chaining to another async function, since that function
// may also chain directly to the callback. Without releasing locks first,
// the lock ordering may be violated, or a lock may deadlock on itself (these
// locks are non-reentrant).
// ------------------------------------------------------------
126
127
// Builds a batcher for `session`, which is only held through a weak reference. The optional
// `transaction` is shared. `read_point` is kept as a raw pointer and is not deleted here; it is
// presumably owned by the caller.
Batcher::Batcher(YBClient* client,
                 const YBSessionPtr& session,
                 YBTransactionPtr transaction,
                 ConsistentReadPoint* read_point,
                 bool force_consistent_read)
    : client_(client),
      weak_session_(session),
      async_rpc_metrics_(session->async_rpc_metrics()),
      transaction_(std::move(transaction)),
      read_point_(read_point),
      force_consistent_read_(force_consistent_read) {}
139
140
11.6M
// A batcher must not die with RPCs in flight (DFATAL). It must also be in a terminal state
// (complete or aborted) or never have started flushing (still gathering ops).
Batcher::~Batcher() {
  LOG_IF_WITH_PREFIX(DFATAL, outstanding_rpcs_ != 0)
      << "Destroying batcher with running rpcs: " << outstanding_rpcs_;
  CHECK(state_ == BatcherState::kComplete ||
        state_ == BatcherState::kAborted ||
        state_ == BatcherState::kGatheringOps) << "Bad state: " << state_;
}
148
149
0
void Batcher::Abort(const Status& status) {
150
0
  for (auto& op : ops_queue_) {
151
0
    error_collector_.AddError(op.yb_op, status);
152
0
  }
153
0
  combined_error_ = status;
154
0
  state_ = BatcherState::kAborted;
155
0
  FlushFinished();
156
0
}
157
158
11.6M
// Sets the deadline passed to tablet lookups and to transaction preparation.
void Batcher::SetDeadline(CoarseTimePoint deadline) { deadline_ = deadline; }
161
162
9.17M
bool Batcher::HasPendingOperations() const {
163
9.17M
  return !ops_.empty();
164
9.17M
}
165
166
0
size_t Batcher::CountBufferedOperations() const {
167
0
  if (state_ == BatcherState::kGatheringOps) {
168
0
    return ops_.size();
169
0
  } else {
170
    // If we've already started to flush, then the ops aren't
171
    // considered "buffered".
172
0
    return 0;
173
0
  }
174
0
}
175
176
11.6M
void Batcher::FlushFinished() {
177
11.6M
  if (
state_ != BatcherState::kAborted11.6M
) {
178
11.6M
    state_ = BatcherState::kComplete;
179
11.6M
  }
180
181
11.6M
  YBSessionPtr session = weak_session_.lock();
182
11.6M
  if (
session11.6M
) {
183
    // Important to do this outside of the lock so that we don't have
184
    // a lock inversion deadlock -- the session lock should always
185
    // come before the batcher lock.
186
11.6M
    session->FlushFinished(shared_from_this());
187
11.6M
  }
188
189
11.6M
  if (
combined_error_.ok()11.6M
&& error_collector_.CountErrors() != 0) {
190
    // In the general case, the user is responsible for fetching errors from the error collector.
191
    // TODO: use the Combined status here, so it is easy to recognize.
192
    // https://github.com/YugaByte/yugabyte-db/issues/702
193
135k
    combined_error_ = kGeneralErrorStatus;
194
135k
  }
195
196
11.6M
  RunCallback();
197
11.6M
}
198
199
11.5M
void Batcher::Run() {
200
11.5M
  flush_callback_(combined_error_);
201
11.5M
  flush_callback_ = StatusFunctor();
202
11.5M
}
203
204
11.5M
void Batcher::RunCallback() {
205
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << combined_error_;
206
207
11.5M
  if (!client_->callback_threadpool() ||
208
11.5M
      
!client_->callback_threadpool()->Submit(shared_from_this()).ok()0
) {
209
11.5M
    Run();
210
11.5M
  }
211
11.5M
}
212
213
void Batcher::FlushAsync(
214
11.6M
    StatusFunctor callback, const IsWithinTransactionRetry is_within_transaction_retry) {
215
11.6M
  
VLOG_WITH_PREFIX_AND_FUNC22.2k
(4) << "is_within_transaction_retry: " << is_within_transaction_retry22.2k
;
216
217
11.6M
  CHECK_EQ(state_, BatcherState::kGatheringOps);
218
11.6M
  state_ = BatcherState::kResolvingTablets;
219
220
11.6M
  const auto operations_count = ops_.size();
221
11.6M
  outstanding_lookups_ = operations_count;
222
223
11.6M
  flush_callback_ = std::move(callback);
224
11.6M
  auto session = weak_session_.lock();
225
226
11.6M
  if (session) {
227
    // Important to do this outside of the lock so that we don't have
228
    // a lock inversion deadlock -- the session lock should always
229
    // come before the batcher lock.
230
11.5M
    session->FlushStarted(shared_from_this());
231
11.5M
  }
232
233
11.6M
  auto transaction = this->transaction();
234
  // If YBSession retries previously failed ops within the same transaction, these ops are already
235
  // expected by transaction.
236
11.6M
  if (transaction && 
!is_within_transaction_retry1.16M
) {
237
1.16M
    transaction->batcher_if().ExpectOperations(operations_count);
238
1.16M
  }
239
240
11.6M
  ops_queue_.reserve(ops_.size());
241
23.2M
  for (auto& yb_op : ops_) {
242
23.2M
    ops_queue_.emplace_back(yb_op, ops_queue_.size());
243
23.2M
    auto& in_flight_op = ops_queue_.back();
244
23.2M
    auto status = yb_op->GetPartitionKey(&in_flight_op.partition_key);
245
246
23.2M
    if (status.ok() && 
yb_op->table()->partition_schema().IsHashPartitioning()23.2M
) {
247
20.4M
      if (in_flight_op.partition_key.empty()) {
248
241k
        if (!yb_op->read_only()) {
249
0
          status = STATUS_FORMAT(IllegalState, "Hash partition key is empty for $0", yb_op);
250
0
        }
251
20.2M
      } else {
252
20.2M
        yb_op->SetHashCode(PartitionSchema::DecodeMultiColumnHashValue(in_flight_op.partition_key));
253
20.2M
      }
254
20.4M
    }
255
256
23.2M
    if (!status.ok()) {
257
0
      combined_error_ = status;
258
0
      FlushFinished();
259
0
      return;
260
0
    }
261
23.2M
  }
262
263
11.6M
  auto shared_this = shared_from_this();
264
23.3M
  for (auto& op : ops_queue_) {
265
23.3M
    
VLOG_WITH_PREFIX14.2k
(4) << "Looking up tablet for " << op.ToString()
266
14.2k
                        << " partition key: " << Slice(op.partition_key).ToDebugHexString();
267
268
23.3M
    if (op.yb_op->tablet()) {
269
207k
      TabletLookupFinished(&op, op.yb_op->tablet());
270
23.0M
    } else {
271
23.0M
      client_->data_->meta_cache_->LookupTabletByKey(
272
23.0M
          op.yb_op->mutable_table(), op.partition_key, deadline_,
273
23.0M
          std::bind(&Batcher::TabletLookupFinished, shared_this, &op, _1));
274
23.0M
    }
275
23.3M
  }
276
11.6M
}
277
278
299
bool Batcher::Has(const std::shared_ptr<YBOperation>& yb_op) const {
279
301
  for (const auto& op : ops_) {
280
301
    if (op == yb_op) {
281
297
      return true;
282
297
    }
283
301
  }
284
2
  return false;
285
299
}
286
287
23.2M
void Batcher::Add(std::shared_ptr<YBOperation> op) {
288
23.2M
  if (state_ != BatcherState::kGatheringOps) {
289
0
    LOG_WITH_PREFIX(DFATAL)
290
0
        << "Adding op to batcher in a wrong state: " << state_ << "\n" << GetStackTrace();
291
0
    return;
292
0
  }
293
294
23.2M
  ops_.push_back(op);
295
23.2M
}
296
297
995k
void Batcher::CombineError(const InFlightOp& in_flight_op) {
298
995k
  if (ClientError(in_flight_op.error) == ClientErrorCode::kTablePartitionListIsStale) {
299
    // MetaCache returns ClientErrorCode::kTablePartitionListIsStale error for tablet lookup request
300
    // in case GetTabletLocations from master returns newer version of table partitions.
301
    // Since MetaCache has no write access to YBTable, it just returns an error which we receive
302
    // here and mark the table partitions as stale, so they will be refetched on retry.
303
2.01k
    in_flight_op.yb_op->MarkTablePartitionListAsStale();
304
2.01k
  }
305
306
995k
  error_collector_.AddError(in_flight_op.yb_op, in_flight_op.error);
307
995k
  if (FLAGS_TEST_combine_batcher_errors) {
308
4.01k
    if (combined_error_.ok()) {
309
5
      combined_error_ = in_flight_op.error.CloneAndPrepend(in_flight_op.ToString());
310
4.00k
    } else if (!combined_error_.IsCombined() &&
311
4.00k
               combined_error_.code() != in_flight_op.error.code()) {
312
0
      combined_error_ = STATUS(Combined, "Multiple failures");
313
0
    }
314
4.01k
  }
315
995k
}
316
317
void Batcher::TabletLookupFinished(
318
23.3M
    InFlightOp* op, Result<internal::RemoteTabletPtr> lookup_result) {
319
23.3M
  
VLOG_WITH_PREFIX_AND_FUNC7.79k
(lookup_result.ok() ? 4 : 3)
320
7.79k
      << "Op: " << op->ToString() << ", result: " << AsString(lookup_result);
321
322
23.3M
  if (lookup_result.ok()) {
323
23.3M
    op->tablet = *lookup_result;
324
23.3M
  } else {
325
9.22k
    op->error = lookup_result.status();
326
9.22k
  }
327
23.3M
  if (--outstanding_lookups_ == 0) {
328
11.6M
    AllLookupsDone();
329
11.6M
  }
330
23.3M
}
331
332
546
void Batcher::TransactionReady(const Status& status) {
333
546
  if (status.ok()) {
334
546
    ExecuteOperations(Initial::kFalse);
335
546
  } else {
336
0
    Abort(status);
337
0
  }
338
546
}
339
340
11.6M
std::map<PartitionKey, Status> Batcher::CollectOpsErrors() {
341
11.6M
  std::map<PartitionKey, Status> result;
342
23.3M
  for (auto& op : ops_queue_) {
343
23.3M
    if (
op.tablet23.3M
) {
344
23.3M
      const Partition& partition = op.tablet->partition();
345
346
23.3M
      bool partition_contains_row = false;
347
23.3M
      const auto& partition_key = op.partition_key;
348
23.3M
      switch (op.yb_op->type()) {
349
7.53M
        case YBOperation::QL_READ: FALLTHROUGH_INTENDED;
350
11.5M
        case YBOperation::QL_WRITE: FALLTHROUGH_INTENDED;
351
15.8M
        case YBOperation::PGSQL_READ: FALLTHROUGH_INTENDED;
352
23.0M
        case YBOperation::PGSQL_WRITE: FALLTHROUGH_INTENDED;
353
23.1M
        case YBOperation::REDIS_READ: FALLTHROUGH_INTENDED;
354
23.3M
        case YBOperation::REDIS_WRITE: {
355
23.3M
          partition_contains_row = partition.ContainsKey(partition_key);
356
23.3M
          break;
357
23.1M
        }
358
23.3M
      }
359
360
23.3M
      if (!partition_contains_row ||
361
23.3M
          (PREDICT_FALSE(
362
23.3M
              RandomActWithProbability(
363
23.3M
                  FLAGS_TEST_simulate_tablet_lookup_does_not_match_partition_key_probability) &&
364
23.3M
              op.yb_op->table()->name().namespace_name() == "yb_test"))) {
365
0
        const Schema& schema = GetSchema(op.yb_op->table()->schema());
366
0
        const PartitionSchema& partition_schema = op.yb_op->table()->partition_schema();
367
0
        const auto msg = Format(
368
0
            "Row $0 not in partition $1, partition key: $2, tablet: $3",
369
0
            op.yb_op->ToString(),
370
0
            partition_schema.PartitionDebugString(partition, schema),
371
0
            Slice(partition_key).ToDebugHexString(),
372
0
            op.tablet->tablet_id());
373
0
        LOG_WITH_PREFIX(DFATAL) << msg;
374
0
        op.error = STATUS(InternalError, msg);
375
0
      }
376
23.3M
    }
377
378
23.3M
    if (!op.error.ok()) {
379
884
      result.emplace(op.partition_key, op.error);
380
884
    }
381
23.3M
  }
382
383
11.6M
  return result;
384
11.6M
}
385
386
11.6M
void Batcher::AllLookupsDone() {
387
  // We're only ready to flush if both of the following conditions are true:
388
  // 1. The batcher is in the "resolving tablets" state (i.e. FlushAsync was called).
389
  // 2. All outstanding ops have finished lookup. Why? To avoid a situation
390
  //    where ops are flushed one by one as they finish lookup.
391
392
11.6M
  if (state_ != BatcherState::kResolvingTablets) {
393
0
    LOG(DFATAL) << __func__ << " is invoked in wrong state: " << state_;
394
0
    return;
395
0
  }
396
397
11.6M
  auto errors = CollectOpsErrors();
398
399
11.6M
  state_ = BatcherState::kTransactionPrepare;
400
401
11.6M
  
VLOG_WITH_PREFIX_AND_FUNC8.11k
(4)
402
8.11k
      << "Errors: " << errors.size() << ", ops queue: " << ops_queue_.size();
403
404
11.6M
  if (!errors.empty()) {
405
    // If some operation tablet lookup failed - set this error for all operations designated for
406
    // the same partition key. We are doing this to keep guarantee on the order of ops for the
407
    // same partition key (see InFlightOp::sequence_number_).
408
9.28k
    EraseIf([this, &errors](auto& op) {
409
9.28k
      if (op.error.ok()) {
410
8.40k
        auto lookup_error_it = errors.find(op.partition_key);
411
8.40k
        if (lookup_error_it != errors.end()) {
412
0
          op.error = lookup_error_it->second;
413
0
        }
414
8.40k
      }
415
9.28k
      if (!op.error.ok()) {
416
883
        this->CombineError(op);
417
883
        return true;
418
883
      }
419
8.40k
      return false;
420
9.28k
    }, &ops_queue_);
421
170
  }
422
423
  // Checking if ops_queue_ is empty after processing potential errors, because if some operation
424
  // tablet lookup failed, ops_queue_ could become empty inside `if (had_errors_) { ... }` block
425
  // above.
426
11.6M
  if (ops_queue_.empty()) {
427
24
    FlushFinished();
428
24
    return;
429
24
  }
430
431
  // All operations were added, and tablets for them were resolved.
432
  // So we could sort them.
433
11.6M
  std::sort(ops_queue_.begin(),
434
11.6M
            ops_queue_.end(),
435
56.1M
            [](const InFlightOp& lhs, const InFlightOp& rhs) {
436
56.1M
    if (lhs.tablet.get() == rhs.tablet.get()) {
437
44.6M
      auto lgroup = lhs.yb_op->group();
438
44.6M
      auto rgroup = rhs.yb_op->group();
439
44.6M
      if (lgroup != rgroup) {
440
111k
        return lgroup < rgroup;
441
111k
      }
442
44.5M
      return lhs.sequence_number < rhs.sequence_number;
443
44.6M
    }
444
11.5M
    return lhs.tablet.get() < rhs.tablet.get();
445
56.1M
  });
446
447
11.6M
  auto group_start = ops_queue_.begin();
448
11.6M
  auto current_group = (*group_start).yb_op->group();
449
11.6M
  const auto* current_tablet = (*group_start).tablet.get();
450
34.9M
  for (auto it = group_start; it != ops_queue_.end(); 
++it23.3M
) {
451
23.3M
    const auto it_group = (*it).yb_op->group();
452
23.3M
    const auto* it_tablet = (*it).tablet.get();
453
23.3M
    const auto it_table_partition_list_version = (*it).yb_op->partition_list_version();
454
23.3M
    if (it_table_partition_list_version.has_value() &&
455
23.3M
        
it_table_partition_list_version != it_tablet->partition_list_version()0
) {
456
0
      Abort(STATUS_EC_FORMAT(
457
0
          Aborted, ClientError(ClientErrorCode::kTablePartitionListVersionDoesNotMatch),
458
0
          "Operation $0 requested table partition list version $1, but ours is: $2",
459
0
          (*it).yb_op, it_table_partition_list_version.value(),
460
0
          it_tablet->partition_list_version()));
461
0
      return;
462
0
    }
463
23.3M
    if (current_tablet != it_tablet || 
current_group != it_group22.9M
) {
464
469k
      ops_info_.groups.emplace_back(group_start, it);
465
469k
      group_start = it;
466
469k
      current_group = it_group;
467
469k
      current_tablet = it_tablet;
468
469k
    }
469
23.3M
  }
470
11.6M
  ops_info_.groups.emplace_back(group_start, ops_queue_.end());
471
472
11.6M
  ExecuteOperations(Initial::kTrue);
473
11.6M
}
474
475
11.5M
// Sends one RPC per op group. Inside a transaction, the transaction must first prepare the
// metadata used by the RPCs. If it cannot do so yet, execution resumes later via
// TransactionReady().
void Batcher::ExecuteOperations(Initial initial) {
  VLOG_WITH_PREFIX_AND_FUNC(3) << "initial: " << initial;
  auto transaction = this->transaction();
  if (transaction) {
    // If this Batcher is executed in the context of a transaction, then the transaction should
    // initialize metadata used by RPC calls.
    //
    // If the transaction is not ready to do it yet, Prepare() returns false. The transaction then
    // notifies us through the provided callback (TransactionReady) once it can be done.
    if (!transaction->batcher_if().Prepare(
        &ops_info_, force_consistent_read_, deadline_, initial,
        std::bind(&Batcher::TransactionReady, shared_from_this(), _1))) {
      return;
    }
  }

  if (state_ != BatcherState::kTransactionPrepare) {
    // Batcher was aborted.
    LOG_IF(DFATAL, state_ != BatcherState::kAborted)
        << "Batcher in a wrong state at the moment the transaction became ready: " << state_;
    return;
  }
  state_ = BatcherState::kTransactionReady;

  // Reuse the local copy instead of calling this->transaction() again (another shared_ptr copy).
  const bool force_consistent_read = force_consistent_read_ || static_cast<bool>(transaction);

  // Use big enough value for preallocated storage, to avoid unnecessary allocations.
  boost::container::small_vector<std::shared_ptr<AsyncRpc>,
                                 InFlightOpsGroupsWithMetadata::kPreallocatedCapacity> rpcs;
  rpcs.reserve(ops_info_.groups.size());

  // Now create the RPCs for each group.
  // Consistent read is not required when the whole batch fits into one command.
  const auto need_consistent_read = force_consistent_read || ops_info_.groups.size() > 1;

  auto self = shared_from_this();
  for (const auto& group : ops_info_.groups) {
    // Allow local calls for the last group only.
    const auto allow_local_calls =
        allow_local_calls_in_curr_thread_ && (&group == &ops_info_.groups.back());
    rpcs.push_back(CreateRpc(
        self, group.begin->tablet.get(), group, allow_local_calls, need_consistent_read));
  }

  // Publish the RPC count before sending anything. Flushed() decrements it as RPCs complete,
  // which may presumably happen before this loop ends.
  outstanding_rpcs_.store(rpcs.size());
  for (const auto& rpc : rpcs) {
    if (transaction) {
      transaction->trace()->AddChildTrace(rpc->trace());
    }
    rpc->SendRpc();
  }
}
527
528
12.0M
// The client's RPC messenger.
rpc::Messenger* Batcher::messenger() const { return client_->messenger(); }
531
532
12.0M
// The client's RPC proxy cache.
rpc::ProxyCache& Batcher::proxy_cache() const { return client_->proxy_cache(); }
535
536
44.4M
// Returns (a copy of the handle to) the transaction this batcher runs in; empty if none.
YBTransactionPtr Batcher::transaction() const { return transaction_; }
539
540
9.53M
const std::string& Batcher::proxy_uuid() const {
541
9.53M
  return client_->proxy_uuid();
542
9.53M
}
543
544
2.54M
// The id of the owning client.
const ClientId& Batcher::client_id() const { return client_->id(); }
547
548
std::pair<RetryableRequestId, RetryableRequestId> Batcher::NextRequestIdAndMinRunningRequestId(
549
2.54M
    const TabletId& tablet_id) {
550
2.54M
  return client_->NextRequestIdAndMinRunningRequestId(tablet_id);
551
2.54M
}
552
553
2.54M
void Batcher::RequestFinished(const TabletId& tablet_id, RetryableRequestId request_id) {
554
2.54M
  client_->RequestFinished(tablet_id, request_id);
555
2.54M
}
556
557
std::shared_ptr<AsyncRpc> Batcher::CreateRpc(
558
    const BatcherPtr& self, RemoteTablet* tablet, const InFlightOpsGroup& group,
559
12.0M
    const bool allow_local_calls_in_curr_thread, const bool need_consistent_read) {
560
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(3) << "tablet: " << tablet->tablet_id();
561
562
12.0M
  CHECK(group.begin != group.end);
563
564
  // Create and send an RPC that aggregates the ops. The RPC is freed when
565
  // its callback completes.
566
  //
567
  // The RPC object takes ownership of the in flight ops.
568
  // The underlying YB OP is not directly owned, only a reference is kept.
569
570
  // Split the read operations according to consistency levels since based on consistency
571
  // levels the read algorithm would differ.
572
12.0M
  const auto op_group = (*group.begin).yb_op->group();
573
12.0M
  AsyncRpcData data {
574
12.0M
    .batcher = self,
575
12.0M
    .tablet = tablet,
576
12.0M
    .allow_local_calls_in_curr_thread = allow_local_calls_in_curr_thread,
577
12.0M
    .need_consistent_read = need_consistent_read,
578
12.0M
    .ops = InFlightOps(group.begin, group.end),
579
12.0M
    .need_metadata = group.need_metadata
580
12.0M
  };
581
582
12.0M
  switch (op_group) {
583
2.54M
    case OpGroup::kWrite:
584
2.54M
      return std::make_shared<WriteRpc>(data);
585
9.50M
    case OpGroup::kLeaderRead:
586
9.50M
      return std::make_shared<ReadRpc>(data, YBConsistencyLevel::STRONG);
587
22.0k
    case OpGroup::kConsistentPrefixRead:
588
22.0k
      return std::make_shared<ReadRpc>(data, YBConsistencyLevel::CONSISTENT_PREFIX);
589
12.0M
  }
590
0
  FATAL_INVALID_ENUM_VALUE(OpGroup, op_group);
591
0
}
592
593
using tserver::ReadResponsePB;
594
595
0
void Batcher::AddOpCountMismatchError() {
596
  // TODO: how to handle this kind of error where the array of response PB's don't match
597
  //       the size of the array of requests. We don't have a specific YBOperation to
598
  //       create an error with, because there are multiple YBOps in one Rpc.
599
0
  LOG_WITH_PREFIX(DFATAL) << "Received wrong number of responses compared to request(s) sent.";
600
0
}
601
602
// Invoked when all RPCs are responded, so no other methods should be run in parallel to it.
603
void Batcher::Flushed(
604
12.0M
    const InFlightOps& ops, const Status& status, FlushExtraResult flush_extra_result) {
605
12.0M
  auto transaction = this->transaction();
606
12.0M
  if (transaction) {
607
1.59M
    const auto ops_will_be_retried = !status.ok() && 
ShouldSessionRetryError(status)182k
;
608
1.59M
    if (
!ops_will_be_retried1.59M
) {
609
      // We don't call Transaction::Flushed for ops that will be retried within the same
610
      // transaction in order to keep transaction running until we finally retry all operations
611
      // successfully or decide to fail and abort the transaction.
612
      // We also don't call Transaction::Flushed for ops that have been retried, but failed during
613
      // the retry.
614
      // See comments for YBTransaction::Impl::running_requests_ and
615
      // YBSession::AddErrorsAndRunCallback.
616
      // https://github.com/yugabyte/yugabyte-db/issues/7984.
617
1.59M
      transaction->batcher_if().Flushed(ops, flush_extra_result.used_read_time, status);
618
1.59M
    }
619
1.59M
  }
620
12.0M
  if (status.ok() && 
read_point_11.8M
) {
621
11.6M
    read_point_->UpdateClock(flush_extra_result.propagated_hybrid_time);
622
11.6M
  }
623
624
12.0M
  if (--outstanding_rpcs_ == 0) {
625
23.2M
    for (auto& op : ops_queue_) {
626
23.2M
      if (!op.error.ok()) {
627
995k
        CombineError(op);
628
995k
      }
629
23.2M
    }
630
11.6M
    FlushFinished();
631
11.6M
  }
632
12.0M
}
633
634
12.0M
void Batcher::ProcessRpcStatus(const AsyncRpc &rpc, const Status &s) {
635
12.0M
  
VLOG_WITH_PREFIX_AND_FUNC7.36k
(4) << "rpc: " << AsString(rpc) << ", status: " << s7.36k
;
636
637
12.0M
  if (state_ != BatcherState::kTransactionReady) {
638
0
    LOG_WITH_PREFIX(DFATAL) << "ProcessRpcStatus in wrong state " << ToString(state_) << ": "
639
0
                            << rpc.ToString() << ", " << s;
640
0
    return;
641
0
  }
642
643
12.0M
  if (PREDICT_FALSE(!s.ok())) {
644
    // Mark each of the ops as failed, since the whole RPC failed.
645
994k
    for (auto& in_flight_op : rpc.ops()) {
646
994k
      in_flight_op.error = s;
647
994k
    }
648
183k
  }
649
12.0M
}
650
651
9.53M
void Batcher::ProcessReadResponse(const ReadRpc &rpc, const Status &s) {
652
9.53M
  ProcessRpcStatus(rpc, s);
653
9.53M
}
654
655
2.54M
void Batcher::ProcessWriteResponse(const WriteRpc &rpc, const Status &s) {
656
2.54M
  ProcessRpcStatus(rpc, s);
657
658
2.54M
  if (s.ok() && 
rpc.resp().has_propagated_hybrid_time()2.45M
) {
659
2.45M
    client_->data_->UpdateLatestObservedHybridTime(rpc.resp().propagated_hybrid_time());
660
2.45M
  }
661
662
  // Check individual row errors.
663
2.54M
  for (const auto& err_pb : rpc.resp().per_row_errors()) {
664
    // TODO: handle case where we get one of the more specific TS errors
665
    // like the tablet not being hosted?
666
667
0
    size_t row_index = err_pb.row_index();
668
0
    if (row_index >= rpc.ops().size()) {
669
0
      LOG_WITH_PREFIX(ERROR) << "Received a per_row_error for an out-of-bound op index "
670
0
                             << row_index << " (sent only " << rpc.ops().size() << " ops)";
671
0
      LOG_WITH_PREFIX(ERROR) << "Response from tablet " << rpc.tablet().tablet_id() << ":\n"
672
0
                             << rpc.resp().DebugString();
673
0
      continue;
674
0
    }
675
0
    shared_ptr<YBOperation> yb_op = rpc.ops()[row_index].yb_op;
676
0
    VLOG_WITH_PREFIX(1) << "Error on op " << yb_op->ToString() << ": "
677
0
                        << err_pb.error().ShortDebugString();
678
0
    rpc.ops()[err_pb.row_index()].error = StatusFromPB(err_pb.error());
679
0
  }
680
2.54M
}
681
682
12.1M
double Batcher::RejectionScore(int attempt_num) {
683
12.1M
  if (!rejection_score_source_) {
684
2.88M
    return 0.0;
685
2.88M
  }
686
687
9.23M
  return rejection_score_source_->Get(attempt_num);
688
12.1M
}
689
690
11.5M
// Returns all errors collected so far and clears the collector.
CollectedErrors Batcher::GetAndClearPendingErrors() { return error_collector_.GetAndClearErrors(); }
693
694
1
std::string Batcher::LogPrefix() const {
695
1
  const void* self = this;
696
1
  return Format(
697
1
      "Batcher ($0), session ($1): ", self, static_cast<void*>(weak_session_.lock().get()));
698
1
}
699
700
// A group is the half-open range [group_begin, group_end) of in-flight ops.
InFlightOpsGroup::InFlightOpsGroup(const Iterator& group_begin, const Iterator& group_end)
    : begin(group_begin), end(group_end) {}
703
704
0
std::string InFlightOpsGroup::ToString() const {
705
0
  return Format("{items: $0 need_metadata: $1}",
706
0
                AsString(boost::make_iterator_range(begin, end)),
707
0
                need_metadata);
708
0
}
709
710
}  // namespace internal
711
}  // namespace client
712
}  // namespace yb