YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/batcher.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/client/batcher.h"
34
35
#include <algorithm>
36
#include <memory>
37
#include <mutex>
38
#include <set>
39
#include <string>
40
#include <unordered_map>
41
#include <utility>
42
#include <vector>
43
44
#include <boost/optional/optional_io.hpp>
45
#include <glog/logging.h>
46
47
#include "yb/client/async_rpc.h"
48
#include "yb/client/client-internal.h"
49
#include "yb/client/client.h"
50
#include "yb/client/client_error.h"
51
#include "yb/client/error.h"
52
#include "yb/client/error_collector.h"
53
#include "yb/client/in_flight_op.h"
54
#include "yb/client/meta_cache.h"
55
#include "yb/client/rejection_score_source.h"
56
#include "yb/client/schema.h"
57
#include "yb/client/session.h"
58
#include "yb/client/table.h"
59
#include "yb/client/transaction.h"
60
#include "yb/client/yb_op.h"
61
#include "yb/client/yb_table_name.h"
62
63
#include "yb/common/wire_protocol.h"
64
65
#include "yb/gutil/stl_util.h"
66
#include "yb/gutil/strings/join.h"
67
68
#include "yb/util/debug-util.h"
69
#include "yb/util/flag_tags.h"
70
#include "yb/util/format.h"
71
#include "yb/util/logging.h"
72
#include "yb/util/result.h"
73
#include "yb/util/status_format.h"
74
#include "yb/util/trace.h"
75
76
// When this flag is set to false and we have separate errors for operation, then batcher would
77
// report IO Error status. Otherwise we will try to combine errors from separate operation to
78
// status of batch. Useful in tests, when we don't need complex error analysis.
79
DEFINE_test_flag(bool, combine_batcher_errors, false,
80
                 "Whether combine errors into batcher status.");
81
DEFINE_test_flag(double, simulate_tablet_lookup_does_not_match_partition_key_probability, 0.0,
82
                 "Probability for simulating the error that happens when a key is not in the key "
83
                 "range of the resolved tablet's partition.");
84
85
using std::pair;
86
using std::set;
87
using std::unique_ptr;
88
using std::shared_ptr;
89
using std::unordered_map;
90
using strings::Substitute;
91
92
using namespace std::placeholders;
93
94
namespace yb {
95
96
using tserver::WriteResponsePB;
97
using tserver::WriteResponsePB_PerRowErrorPB;
98
99
namespace client {
100
101
namespace internal {
102
103
// TODO: instead of using a string error message, make Batcher return a status other than IOError.
104
// (https://github.com/YugaByte/yugabyte-db/issues/702)
105
const std::string Batcher::kErrorReachingOutToTServersMsg(
106
    "Errors occurred while reaching out to the tablet servers");
107
108
namespace {
109
110
const auto kGeneralErrorStatus = STATUS(IOError, Batcher::kErrorReachingOutToTServersMsg);
111
112
}  // namespace
113
114
// About lock ordering in this file:
115
// ------------------------------
116
// The locks must be acquired in the following order:
117
//   - Batcher::lock_
118
//   - InFlightOp::lock_
119
//
120
// It's generally important to release all the locks before either calling
121
// a user callback, or chaining to another async function, since that function
122
// may also chain directly to the callback. Without releasing locks first,
123
// the lock ordering may be violated, or a lock may deadlock on itself (these
124
// locks are non-reentrant).
125
// ------------------------------------------------------------
126
127
// Creates a batcher for `session`.
//
// Only a weak reference to the session is stored; the async RPC metrics are taken from the
// session at construction time. `transaction` is moved into the batcher, while `client` and
// `read_point` are stored as raw pointers.
Batcher::Batcher(YBClient* client,
                 const YBSessionPtr& session,
                 YBTransactionPtr transaction,
                 ConsistentReadPoint* read_point,
                 bool force_consistent_read)
    : client_(client),
      weak_session_(session),
      async_rpc_metrics_(session->async_rpc_metrics()),
      transaction_(std::move(transaction)),
      read_point_(read_point),
      force_consistent_read_(force_consistent_read) {}
139
140
5.77M
// Verifies that the batcher is not destroyed in the middle of a flush: there must be no
// outstanding RPCs and the state must be one of complete, aborted or still gathering ops.
Batcher::~Batcher() {
  LOG_IF_WITH_PREFIX(DFATAL, outstanding_rpcs_ != 0)
      << "Destroying batcher with running rpcs: " << outstanding_rpcs_;

  // The condition is deliberately left inline: CHECK includes its text in the failure message.
  CHECK(
      state_ == BatcherState::kComplete || state_ == BatcherState::kAborted ||
      state_ == BatcherState::kGatheringOps)
      << "Bad state: " << state_;
}
148
149
0
void Batcher::Abort(const Status& status) {
150
0
  for (auto& op : ops_queue_) {
151
0
    error_collector_.AddError(op.yb_op, status);
152
0
  }
153
0
  combined_error_ = status;
154
0
  state_ = BatcherState::kAborted;
155
0
  FlushFinished();
156
0
}
157
158
5.77M
// Remembers the deadline that is later handed to tablet lookups and transaction preparation.
void Batcher::SetDeadline(CoarseTimePoint deadline) {
  deadline_ = deadline;
}
161
162
4.81M
bool Batcher::HasPendingOperations() const {
163
4.81M
  return !ops_.empty();
164
4.81M
}
165
166
0
size_t Batcher::CountBufferedOperations() const {
167
0
  if (state_ == BatcherState::kGatheringOps) {
168
0
    return ops_.size();
169
0
  } else {
170
    // If we've already started to flush, then the ops aren't
171
    // considered "buffered".
172
0
    return 0;
173
0
  }
174
0
}
175
176
5.76M
void Batcher::FlushFinished() {
177
5.76M
  if (state_ != BatcherState::kAborted) {
178
5.76M
    state_ = BatcherState::kComplete;
179
5.76M
  }
180
181
5.76M
  YBSessionPtr session = weak_session_.lock();
182
5.77M
  if (session) {
183
    // Important to do this outside of the lock so that we don't have
184
    // a lock inversion deadlock -- the session lock should always
185
    // come before the batcher lock.
186
5.77M
    session->FlushFinished(shared_from_this());
187
5.77M
  }
188
189
5.77M
  if (combined_error_.ok() && error_collector_.CountErrors() != 0) {
190
    // In the general case, the user is responsible for fetching errors from the error collector.
191
    // TODO: use the Combined status here, so it is easy to recognize.
192
    // https://github.com/YugaByte/yugabyte-db/issues/702
193
103k
    combined_error_ = kGeneralErrorStatus;
194
103k
  }
195
196
5.76M
  RunCallback();
197
5.76M
}
198
199
5.75M
void Batcher::Run() {
200
5.75M
  flush_callback_(combined_error_);
201
5.75M
  flush_callback_ = StatusFunctor();
202
5.75M
}
203
204
5.75M
void Batcher::RunCallback() {
205
18.4E
  VLOG_WITH_PREFIX_AND_FUNC(4) << combined_error_;
206
207
5.75M
  if (!client_->callback_threadpool() ||
208
5.75M
      !client_->callback_threadpool()->Submit(shared_from_this()).ok()) {
209
5.75M
    Run();
210
5.75M
  }
211
5.75M
}
212
213
void Batcher::FlushAsync(
214
5.77M
    StatusFunctor callback, const IsWithinTransactionRetry is_within_transaction_retry) {
215
12.6k
  VLOG_WITH_PREFIX_AND_FUNC(4) << "is_within_transaction_retry: " << is_within_transaction_retry;
216
217
5.77M
  CHECK_EQ(state_, BatcherState::kGatheringOps);
218
5.77M
  state_ = BatcherState::kResolvingTablets;
219
220
5.77M
  const auto operations_count = ops_.size();
221
5.77M
  outstanding_lookups_ = operations_count;
222
223
5.77M
  flush_callback_ = std::move(callback);
224
5.77M
  auto session = weak_session_.lock();
225
226
5.77M
  if (session) {
227
    // Important to do this outside of the lock so that we don't have
228
    // a lock inversion deadlock -- the session lock should always
229
    // come before the batcher lock.
230
5.75M
    session->FlushStarted(shared_from_this());
231
5.75M
  }
232
233
5.77M
  auto transaction = this->transaction();
234
  // If YBSession retries previously failed ops within the same transaction, these ops are already
235
  // expected by transaction.
236
5.77M
  if (transaction && !is_within_transaction_retry) {
237
568k
    transaction->batcher_if().ExpectOperations(operations_count);
238
568k
  }
239
240
5.77M
  ops_queue_.reserve(ops_.size());
241
11.0M
  for (auto& yb_op : ops_) {
242
11.0M
    ops_queue_.emplace_back(yb_op, ops_queue_.size());
243
11.0M
    auto& in_flight_op = ops_queue_.back();
244
11.0M
    auto status = yb_op->GetPartitionKey(&in_flight_op.partition_key);
245
246
11.0M
    if (status.ok() && yb_op->table()->partition_schema().IsHashPartitioning()) {
247
9.98M
      if (in_flight_op.partition_key.empty()) {
248
188k
        if (!yb_op->read_only()) {
249
0
          status = STATUS_FORMAT(IllegalState, "Hash partition key is empty for $0", yb_op);
250
0
        }
251
9.79M
      } else {
252
9.79M
        yb_op->SetHashCode(PartitionSchema::DecodeMultiColumnHashValue(in_flight_op.partition_key));
253
9.79M
      }
254
9.98M
    }
255
256
11.0M
    if (!status.ok()) {
257
0
      combined_error_ = status;
258
0
      FlushFinished();
259
0
      return;
260
0
    }
261
11.0M
  }
262
263
5.77M
  auto shared_this = shared_from_this();
264
11.0M
  for (auto& op : ops_queue_) {
265
4.13k
    VLOG_WITH_PREFIX(4) << "Looking up tablet for " << op.ToString()
266
4.13k
                        << " partition key: " << Slice(op.partition_key).ToDebugHexString();
267
268
11.0M
    if (op.yb_op->tablet()) {
269
104k
      TabletLookupFinished(&op, op.yb_op->tablet());
270
10.9M
    } else {
271
10.9M
      client_->data_->meta_cache_->LookupTabletByKey(
272
10.9M
          op.yb_op->mutable_table(), op.partition_key, deadline_,
273
10.9M
          std::bind(&Batcher::TabletLookupFinished, shared_this, &op, _1));
274
10.9M
    }
275
11.0M
  }
276
5.77M
}
277
278
358
bool Batcher::Has(const std::shared_ptr<YBOperation>& yb_op) const {
279
361
  for (const auto& op : ops_) {
280
361
    if (op == yb_op) {
281
357
      return true;
282
357
    }
283
361
  }
284
1
  return false;
285
358
}
286
287
11.0M
void Batcher::Add(std::shared_ptr<YBOperation> op) {
288
11.0M
  if (state_ != BatcherState::kGatheringOps) {
289
0
    LOG_WITH_PREFIX(DFATAL)
290
0
        << "Adding op to batcher in a wrong state: " << state_ << "\n" << GetStackTrace();
291
0
    return;
292
0
  }
293
294
11.0M
  ops_.push_back(op);
295
11.0M
}
296
297
496k
void Batcher::CombineError(const InFlightOp& in_flight_op) {
298
496k
  if (ClientError(in_flight_op.error) == ClientErrorCode::kTablePartitionListIsStale) {
299
    // MetaCache returns ClientErrorCode::kTablePartitionListIsStale error for tablet lookup request
300
    // in case GetTabletLocations from master returns newer version of table partitions.
301
    // Since MetaCache has no write access to YBTable, it just returns an error which we receive
302
    // here and mark the table partitions as stale, so they will be refetched on retry.
303
502
    in_flight_op.yb_op->MarkTablePartitionListAsStale();
304
502
  }
305
306
496k
  error_collector_.AddError(in_flight_op.yb_op, in_flight_op.error);
307
496k
  if (FLAGS_TEST_combine_batcher_errors) {
308
5
    if (combined_error_.ok()) {
309
5
      combined_error_ = in_flight_op.error.CloneAndPrepend(in_flight_op.ToString());
310
0
    } else if (!combined_error_.IsCombined() &&
311
0
               combined_error_.code() != in_flight_op.error.code()) {
312
0
      combined_error_ = STATUS(Combined, "Multiple failures");
313
0
    }
314
5
  }
315
496k
}
316
317
void Batcher::TabletLookupFinished(
318
11.1M
    InFlightOp* op, Result<internal::RemoteTabletPtr> lookup_result) {
319
221
  VLOG_WITH_PREFIX_AND_FUNC(lookup_result.ok() ? 4 : 3)
320
221
      << "Op: " << op->ToString() << ", result: " << AsString(lookup_result);
321
322
11.1M
  if (lookup_result.ok()) {
323
11.0M
    op->tablet = *lookup_result;
324
5.40k
  } else {
325
5.40k
    op->error = lookup_result.status();
326
5.40k
  }
327
11.1M
  if (--outstanding_lookups_ == 0) {
328
5.77M
    AllLookupsDone();
329
5.77M
  }
330
11.1M
}
331
332
196
void Batcher::TransactionReady(const Status& status) {
333
196
  if (status.ok()) {
334
196
    ExecuteOperations(Initial::kFalse);
335
0
  } else {
336
0
    Abort(status);
337
0
  }
338
196
}
339
340
5.77M
std::map<PartitionKey, Status> Batcher::CollectOpsErrors() {
341
5.77M
  std::map<PartitionKey, Status> result;
342
11.0M
  for (auto& op : ops_queue_) {
343
11.0M
    if (op.tablet) {
344
11.0M
      const Partition& partition = op.tablet->partition();
345
346
11.0M
      bool partition_contains_row = false;
347
11.0M
      const auto& partition_key = op.partition_key;
348
11.0M
      switch (op.yb_op->type()) {
349
3.93M
        case YBOperation::QL_READ: FALLTHROUGH_INTENDED;
350
6.96M
        case YBOperation::QL_WRITE: FALLTHROUGH_INTENDED;
351
8.86M
        case YBOperation::PGSQL_READ: FALLTHROUGH_INTENDED;
352
10.9M
        case YBOperation::PGSQL_WRITE: FALLTHROUGH_INTENDED;
353
11.0M
        case YBOperation::REDIS_READ: FALLTHROUGH_INTENDED;
354
11.0M
        case YBOperation::REDIS_WRITE: {
355
11.0M
          partition_contains_row = partition.ContainsKey(partition_key);
356
11.0M
          break;
357
11.1M
        }
358
11.1M
      }
359
360
11.1M
      if (!partition_contains_row ||
361
11.0M
          (PREDICT_FALSE(
362
11.0M
              RandomActWithProbability(
363
11.0M
                  FLAGS_TEST_simulate_tablet_lookup_does_not_match_partition_key_probability) &&
364
0
              op.yb_op->table()->name().namespace_name() == "yb_test"))) {
365
0
        const Schema& schema = GetSchema(op.yb_op->table()->schema());
366
0
        const PartitionSchema& partition_schema = op.yb_op->table()->partition_schema();
367
0
        const auto msg = Format(
368
0
            "Row $0 not in partition $1, partition key: $2, tablet: $3",
369
0
            op.yb_op->ToString(),
370
0
            partition_schema.PartitionDebugString(partition, schema),
371
0
            Slice(partition_key).ToDebugHexString(),
372
0
            op.tablet->tablet_id());
373
0
        LOG_WITH_PREFIX(DFATAL) << msg;
374
0
        op.error = STATUS(InternalError, msg);
375
0
      }
376
11.1M
    }
377
378
11.1M
    if (!op.error.ok()) {
379
1.88k
      result.emplace(op.partition_key, op.error);
380
1.88k
    }
381
11.1M
  }
382
383
5.78M
  return result;
384
5.77M
}
385
386
5.77M
void Batcher::AllLookupsDone() {
387
  // We're only ready to flush if both of the following conditions are true:
388
  // 1. The batcher is in the "resolving tablets" state (i.e. FlushAsync was called).
389
  // 2. All outstanding ops have finished lookup. Why? To avoid a situation
390
  //    where ops are flushed one by one as they finish lookup.
391
392
5.77M
  if (state_ != BatcherState::kResolvingTablets) {
393
0
    LOG(DFATAL) << __func__ << " is invoked in wrong state: " << state_;
394
0
    return;
395
0
  }
396
397
5.77M
  auto errors = CollectOpsErrors();
398
399
5.77M
  state_ = BatcherState::kTransactionPrepare;
400
401
9.96k
  VLOG_WITH_PREFIX_AND_FUNC(4)
402
9.96k
      << "Errors: " << errors.size() << ", ops queue: " << ops_queue_.size();
403
404
5.77M
  if (!errors.empty()) {
405
    // If some operation tablet lookup failed - set this error for all operations designated for
406
    // the same partition key. We are doing this to keep guarantee on the order of ops for the
407
    // same partition key (see InFlightOp::sequence_number_).
408
4.25k
    EraseIf([this, &errors](auto& op) {
409
4.25k
      if (op.error.ok()) {
410
2.37k
        auto lookup_error_it = errors.find(op.partition_key);
411
2.37k
        if (lookup_error_it != errors.end()) {
412
0
          op.error = lookup_error_it->second;
413
0
        }
414
2.37k
      }
415
4.25k
      if (!op.error.ok()) {
416
1.88k
        this->CombineError(op);
417
1.88k
        return true;
418
1.88k
      }
419
2.36k
      return false;
420
2.36k
    }, &ops_queue_);
421
106
  }
422
423
  // Checking if ops_queue_ is empty after processing potential errors, because if some operation
424
  // tablet lookup failed, ops_queue_ could become empty inside `if (had_errors_) { ... }` block
425
  // above.
426
5.77M
  if (ops_queue_.empty()) {
427
35
    FlushFinished();
428
35
    return;
429
35
  }
430
431
  // All operations were added, and tablets for them were resolved.
432
  // So we could sort them.
433
5.77M
  std::sort(ops_queue_.begin(),
434
5.77M
            ops_queue_.end(),
435
19.6M
            [](const InFlightOp& lhs, const InFlightOp& rhs) {
436
19.6M
    if (lhs.tablet.get() == rhs.tablet.get()) {
437
15.5M
      auto lgroup = lhs.yb_op->group();
438
15.5M
      auto rgroup = rhs.yb_op->group();
439
15.5M
      if (lgroup != rgroup) {
440
38.0k
        return lgroup < rgroup;
441
38.0k
      }
442
15.5M
      return lhs.sequence_number < rhs.sequence_number;
443
15.5M
    }
444
4.14M
    return lhs.tablet.get() < rhs.tablet.get();
445
4.14M
  });
446
447
5.77M
  auto group_start = ops_queue_.begin();
448
5.77M
  auto current_group = (*group_start).yb_op->group();
449
5.77M
  const auto* current_tablet = (*group_start).tablet.get();
450
16.8M
  for (auto it = group_start; it != ops_queue_.end(); ++it) {
451
11.0M
    const auto it_group = (*it).yb_op->group();
452
11.0M
    const auto* it_tablet = (*it).tablet.get();
453
11.0M
    const auto it_table_partition_list_version = (*it).yb_op->partition_list_version();
454
11.0M
    if (it_table_partition_list_version.has_value() &&
455
0
        it_table_partition_list_version != it_tablet->partition_list_version()) {
456
0
      Abort(STATUS_EC_FORMAT(
457
0
          Aborted, ClientError(ClientErrorCode::kTablePartitionListVersionDoesNotMatch),
458
0
          "Operation $0 requested table partition list version $1, but ours is: $2",
459
0
          (*it).yb_op, it_table_partition_list_version.value(),
460
0
          it_tablet->partition_list_version()));
461
0
      return;
462
0
    }
463
11.0M
    if (current_tablet != it_tablet || current_group != it_group) {
464
238k
      ops_info_.groups.emplace_back(group_start, it);
465
238k
      group_start = it;
466
238k
      current_group = it_group;
467
238k
      current_tablet = it_tablet;
468
238k
    }
469
11.0M
  }
470
5.77M
  ops_info_.groups.emplace_back(group_start, ops_queue_.end());
471
472
5.77M
  ExecuteOperations(Initial::kTrue);
473
5.77M
}
474
475
5.75M
// Creates one RPC per operation group and sends them.
//
// When the batcher runs within a transaction, the transaction is first asked to prepare. If it
// is not ready yet, this function returns and is re-entered through TransactionReady() with
// `initial` set to Initial::kFalse.
void Batcher::ExecuteOperations(Initial initial) {
  VLOG_WITH_PREFIX_AND_FUNC(3) << "initial: " << initial;

  auto transaction = this->transaction();
  if (transaction) {
    // If this Batcher is executed in context of transaction,
    // then this transaction should initialize metadata used by RPC calls.
    //
    // If transaction is not yet ready to do it, then it will notify us via the provided
    // callback when it could be done.
    const bool prepared = transaction->batcher_if().Prepare(
        &ops_info_, force_consistent_read_, deadline_, initial,
        std::bind(&Batcher::TransactionReady, shared_from_this(), _1));
    if (!prepared) {
      return;
    }
  }

  if (state_ != BatcherState::kTransactionPrepare) {
    // Batcher was aborted.
    LOG_IF(DFATAL, state_ != BatcherState::kAborted)
        << "Batcher in a wrong state at the moment the transaction became ready: " << state_;
    return;
  }
  state_ = BatcherState::kTransactionReady;

  const bool force_consistent_read = force_consistent_read_ || this->transaction();

  // Use big enough value for preallocated storage, to avoid unnecessary allocations.
  boost::container::small_vector<std::shared_ptr<AsyncRpc>,
                                 InFlightOpsGroupsWithMetadata::kPreallocatedCapacity> rpcs;
  rpcs.reserve(ops_info_.groups.size());

  // Now flush the ops for each group.
  // Consistent read is not required when whole batch fits into one command.
  const auto need_consistent_read = force_consistent_read || ops_info_.groups.size() > 1;

  auto self = shared_from_this();
  for (const auto& group : ops_info_.groups) {
    // Allow local calls for last group only.
    const bool is_last_group = &group == &ops_info_.groups.back();
    const auto allow_local_calls = allow_local_calls_in_curr_thread_ && is_last_group;
    rpcs.push_back(CreateRpc(
        self, group.begin->tablet.get(), group, allow_local_calls, need_consistent_read));
  }

  // Publish the RPC count before sending anything: Flushed() decrements it once per RPC.
  outstanding_rpcs_.store(rpcs.size());
  for (const auto& rpc : rpcs) {
    if (transaction) {
      transaction->trace()->AddChildTrace(rpc->trace());
    }
    rpc->SendRpc();
  }
}
527
528
6.00M
// Returns the messenger of the client this batcher belongs to.
rpc::Messenger* Batcher::messenger() const {
  return client_->messenger();
}
531
532
5.99M
// Returns the proxy cache of the client this batcher belongs to.
rpc::ProxyCache& Batcher::proxy_cache() const {
  return client_->proxy_cache();
}
535
536
22.3M
// Returns the transaction this batcher was created with (by value).
YBTransactionPtr Batcher::transaction() const {
  return transaction_;
}
539
540
4.68M
const std::string& Batcher::proxy_uuid() const {
541
4.68M
  return client_->proxy_uuid();
542
4.68M
}
543
544
1.32M
// Returns the id of the client this batcher belongs to.
const ClientId& Batcher::client_id() const {
  return client_->id();
}
547
548
std::pair<RetryableRequestId, RetryableRequestId> Batcher::NextRequestIdAndMinRunningRequestId(
549
1.32M
    const TabletId& tablet_id) {
550
1.32M
  return client_->NextRequestIdAndMinRunningRequestId(tablet_id);
551
1.32M
}
552
553
1.32M
void Batcher::RequestFinished(const TabletId& tablet_id, RetryableRequestId request_id) {
554
1.32M
  client_->RequestFinished(tablet_id, request_id);
555
1.32M
}
556
557
std::shared_ptr<AsyncRpc> Batcher::CreateRpc(
558
    const BatcherPtr& self, RemoteTablet* tablet, const InFlightOpsGroup& group,
559
6.00M
    const bool allow_local_calls_in_curr_thread, const bool need_consistent_read) {
560
3.74k
  VLOG_WITH_PREFIX_AND_FUNC(3) << "tablet: " << tablet->tablet_id();
561
562
6.00M
  CHECK(group.begin != group.end);
563
564
  // Create and send an RPC that aggregates the ops. The RPC is freed when
565
  // its callback completes.
566
  //
567
  // The RPC object takes ownership of the in flight ops.
568
  // The underlying YB OP is not directly owned, only a reference is kept.
569
570
  // Split the read operations according to consistency levels since based on consistency
571
  // levels the read algorithm would differ.
572
6.00M
  const auto op_group = (*group.begin).yb_op->group();
573
6.00M
  AsyncRpcData data {
574
6.00M
    .batcher = self,
575
6.00M
    .tablet = tablet,
576
6.00M
    .allow_local_calls_in_curr_thread = allow_local_calls_in_curr_thread,
577
6.00M
    .need_consistent_read = need_consistent_read,
578
6.00M
    .ops = InFlightOps(group.begin, group.end),
579
6.00M
    .need_metadata = group.need_metadata
580
6.00M
  };
581
582
6.00M
  switch (op_group) {
583
1.32M
    case OpGroup::kWrite:
584
1.32M
      return std::make_shared<WriteRpc>(data);
585
4.67M
    case OpGroup::kLeaderRead:
586
4.67M
      return std::make_shared<ReadRpc>(data, YBConsistencyLevel::STRONG);
587
11.8k
    case OpGroup::kConsistentPrefixRead:
588
11.8k
      return std::make_shared<ReadRpc>(data, YBConsistencyLevel::CONSISTENT_PREFIX);
589
0
  }
590
0
  FATAL_INVALID_ENUM_VALUE(OpGroup, op_group);
591
0
}
592
593
using tserver::ReadResponsePB;
594
595
0
void Batcher::AddOpCountMismatchError() {
596
  // TODO: how to handle this kind of error where the array of response PB's don't match
597
  //       the size of the array of requests. We don't have a specific YBOperation to
598
  //       create an error with, because there are multiple YBOps in one Rpc.
599
0
  LOG_WITH_PREFIX(DFATAL) << "Received wrong number of responses compared to request(s) sent.";
600
0
}
601
602
// Invoked when all RPCs are responded, so no other methods should be run in parallel to it.
603
void Batcher::Flushed(
604
5.99M
    const InFlightOps& ops, const Status& status, FlushExtraResult flush_extra_result) {
605
5.99M
  auto transaction = this->transaction();
606
5.99M
  if (transaction) {
607
780k
    const auto ops_will_be_retried = !status.ok() && ShouldSessionRetryError(status);
608
780k
    if (!ops_will_be_retried) {
609
      // We don't call Transaction::Flushed for ops that will be retried within the same
610
      // transaction in order to keep transaction running until we finally retry all operations
611
      // successfully or decide to fail and abort the transaction.
612
      // We also don't call Transaction::Flushed for ops that have been retried, but failed during
613
      // the retry.
614
      // See comments for YBTransaction::Impl::running_requests_ and
615
      // YBSession::AddErrorsAndRunCallback.
616
      // https://github.com/yugabyte/yugabyte-db/issues/7984.
617
780k
      transaction->batcher_if().Flushed(ops, flush_extra_result.used_read_time, status);
618
780k
    }
619
780k
  }
620
5.99M
  if (status.ok() && read_point_) {
621
5.71M
    read_point_->UpdateClock(flush_extra_result.propagated_hybrid_time);
622
5.71M
  }
623
624
5.99M
  if (--outstanding_rpcs_ == 0) {
625
11.0M
    for (auto& op : ops_queue_) {
626
11.0M
      if (!op.error.ok()) {
627
495k
        CombineError(op);
628
495k
      }
629
11.0M
    }
630
5.77M
    FlushFinished();
631
5.77M
  }
632
5.99M
}
633
634
5.99M
void Batcher::ProcessRpcStatus(const AsyncRpc &rpc, const Status &s) {
635
2.49k
  VLOG_WITH_PREFIX_AND_FUNC(4) << "rpc: " << AsString(rpc) << ", status: " << s;
636
637
5.99M
  if (state_ != BatcherState::kTransactionReady) {
638
0
    LOG_WITH_PREFIX(DFATAL) << "ProcessRpcStatus in wrong state " << ToString(state_) << ": "
639
0
                            << rpc.ToString() << ", " << s;
640
0
    return;
641
0
  }
642
643
5.99M
  if (PREDICT_FALSE(!s.ok())) {
644
    // Mark each of the ops as failed, since the whole RPC failed.
645
494k
    for (auto& in_flight_op : rpc.ops()) {
646
494k
      in_flight_op.error = s;
647
494k
    }
648
124k
  }
649
5.99M
}
650
651
4.67M
void Batcher::ProcessReadResponse(const ReadRpc &rpc, const Status &s) {
652
4.67M
  ProcessRpcStatus(rpc, s);
653
4.67M
}
654
655
1.32M
void Batcher::ProcessWriteResponse(const WriteRpc &rpc, const Status &s) {
656
1.32M
  ProcessRpcStatus(rpc, s);
657
658
1.32M
  if (s.ok() && rpc.resp().has_propagated_hybrid_time()) {
659
1.24M
    client_->data_->UpdateLatestObservedHybridTime(rpc.resp().propagated_hybrid_time());
660
1.24M
  }
661
662
  // Check individual row errors.
663
0
  for (const auto& err_pb : rpc.resp().per_row_errors()) {
664
    // TODO: handle case where we get one of the more specific TS errors
665
    // like the tablet not being hosted?
666
667
0
    size_t row_index = err_pb.row_index();
668
0
    if (row_index >= rpc.ops().size()) {
669
0
      LOG_WITH_PREFIX(ERROR) << "Received a per_row_error for an out-of-bound op index "
670
0
                             << row_index << " (sent only " << rpc.ops().size() << " ops)";
671
0
      LOG_WITH_PREFIX(ERROR) << "Response from tablet " << rpc.tablet().tablet_id() << ":\n"
672
0
                             << rpc.resp().DebugString();
673
0
      continue;
674
0
    }
675
0
    shared_ptr<YBOperation> yb_op = rpc.ops()[row_index].yb_op;
676
0
    VLOG_WITH_PREFIX(1) << "Error on op " << yb_op->ToString() << ": "
677
0
                        << err_pb.error().ShortDebugString();
678
0
    rpc.ops()[err_pb.row_index()].error = StatusFromPB(err_pb.error());
679
0
  }
680
1.32M
}
681
682
6.03M
double Batcher::RejectionScore(int attempt_num) {
683
6.03M
  if (!rejection_score_source_) {
684
1.16M
    return 0.0;
685
1.16M
  }
686
687
4.86M
  return rejection_score_source_->Get(attempt_num);
688
4.86M
}
689
690
5.74M
// Hands the errors accumulated in the error collector over to the caller and clears them.
CollectedErrors Batcher::GetAndClearPendingErrors() {
  return error_collector_.GetAndClearErrors();
}
693
694
0
std::string Batcher::LogPrefix() const {
695
0
  const void* self = this;
696
0
  return Format(
697
0
      "Batcher ($0), session ($1): ", self, static_cast<void*>(weak_session_.lock().get()));
698
0
}
699
700
// Creates a group covering the half-open iterator range [group_begin, group_end).
InFlightOpsGroup::InFlightOpsGroup(const Iterator& group_begin, const Iterator& group_end)
    : begin(group_begin), end(group_end) {}
703
704
0
std::string InFlightOpsGroup::ToString() const {
705
0
  return Format("{items: $0 need_metadata: $1}",
706
0
                AsString(boost::make_iterator_range(begin, end)),
707
0
                need_metadata);
708
0
}
709
710
}  // namespace internal
711
}  // namespace client
712
}  // namespace yb