YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/batcher.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
#ifndef YB_CLIENT_BATCHER_H_
33
#define YB_CLIENT_BATCHER_H_
34
35
#include <unordered_map>
36
#include <unordered_set>
37
#include <vector>
38
39
#include "yb/client/async_rpc.h"
40
#include "yb/client/error_collector.h"
41
#include "yb/client/transaction.h"
42
43
#include "yb/common/consistent_read_point.h"
44
#include "yb/common/retryable_request.h"
45
#include "yb/common/transaction.h"
46
47
#include "yb/gutil/macros.h"
48
#include "yb/gutil/ref_counted.h"
49
50
#include "yb/util/async_util.h"
51
#include "yb/util/atomic.h"
52
#include "yb/util/locks.h"
53
#include "yb/util/status_fwd.h"
54
#include "yb/util/threadpool.h"
55
56
namespace yb {
57
namespace client {
58
namespace internal {
59
60
struct InFlightOpsGroup {
61
  using Iterator = InFlightOps::const_iterator;
62
63
  bool need_metadata = false;
64
  const Iterator begin;
65
  const Iterator end;
66
67
  InFlightOpsGroup(const Iterator& group_begin, const Iterator& group_end);
68
  std::string ToString() const;
69
};
70
71
struct InFlightOpsTransactionMetadata {
72
  TransactionMetadata transaction;
73
  boost::optional<SubTransactionMetadata> subtransaction;
74
};
75
76
struct InFlightOpsGroupsWithMetadata {
77
  static const size_t kPreallocatedCapacity = 40;
78
79
  boost::container::small_vector<InFlightOpsGroup, kPreallocatedCapacity> groups;
80
  InFlightOpsTransactionMetadata metadata;
81
};
82
83
class TxnBatcherIf {
84
 public:
85
  // Ask the transaction to expect `count` operations in the future, i.e. Prepare will be called
86
  // with that number of ops.
87
  virtual void ExpectOperations(size_t count) = 0;
88
89
  // Notifies transaction that specified ops were flushed with some status.
90
  virtual void Flushed(
91
      const internal::InFlightOps& ops, const ReadHybridTime& used_read_time,
92
      const Status& status) = 0;
93
94
  // This function is used to init metadata of Write/Read request.
95
  // If we don't have enough information, then the function returns false and stores
96
  // the waiter, which will be invoked when we obtain such information.
97
  virtual bool Prepare(
98
      internal::InFlightOpsGroupsWithMetadata* ops_info,
99
      ForceConsistentRead force_consistent_read,
100
      CoarseTimePoint deadline,
101
      Initial initial,
102
      Waiter waiter) = 0;
103
104
419k
  virtual ~TxnBatcherIf() = default;
105
};
106
107
// Batcher state changes sequentially in the order listed below, with the exception that kAborted
108
// could be reached from any state.
109
YB_DEFINE_ENUM(
110
    BatcherState,
111
    (kGatheringOps)       // Initial state, while we are adding operations to the batcher.
112
    (kResolvingTablets)   // Flush was invoked on batcher, waiting until tablets for all operations
113
                          // are resolved and move to the next state.
114
                          // Could change to kComplete in case of failure.
115
    (kTransactionPrepare) // Preparing associated transaction for flushing operations of this
116
                          // batcher, for instance it picks status tablet and fills
117
                          // transaction metadata for this batcher.
118
                          // When there is no associated transaction or there are no operations,
119
                          // it moves to the next state immediately.
120
    (kTransactionReady)   // Transaction is ready; sending operations to appropriate tablets and
121
                          // waiting for responses. When there is no transaction, we still send
122
                          // operations, marking the transaction as auto ready.
123
    (kComplete)           // Batcher complete.
124
    (kAborted)            // Batcher was aborted.
125
    );
126
127
// A Batcher is the class responsible for collecting row operations, routing them to the
128
// correct tablet server, and possibly batching them together for better efficiency.
129
//
130
// It is a reference-counted class: the client session creating the batch holds one
131
// reference, and all of the in-flight operations hold others. This allows the client
132
// session to be destructed while ops are still in-flight, without the async callbacks
133
// attempting to access a destructed Batcher.
134
//
135
// This class is not thread safe, i.e. it should be filled from only one thread and then flushed.
136
//
137
// The batcher changes state step by step, with appropriate operation done in the current step.
138
// For instance, in the gathering step it does NOT look up tablets, and in the transaction
139
// prepare state it only waits for the transaction to be ready.
140
//
141
// Before calling FlushAsync all Batcher functions should be called sequentially, so no concurrent
142
// access to Batcher state is happening. FlushAsync is doing all tablets lookups and this doesn't
143
// modify Batcher state (only updates individual operations independently) until all lookups are
144
// done. After all tablets lookups are completed, Batcher changes its state and calls
145
// ExecuteOperations. This results in asynchronous calls to ProcessReadResponse/ProcessWriteResponse
146
// as operations are completed, but these functions only read Batcher state and update individual
147
// operations independently.
148
class Batcher : public Runnable, public std::enable_shared_from_this<Batcher> {
149
 public:
150
  // Create a new batcher associated with the given session.
151
  //
152
  // Creates a weak_ptr to 'session'.
153
  Batcher(YBClient* client,
154
          const YBSessionPtr& session,
155
          YBTransactionPtr transaction,
156
          ConsistentReadPoint* read_point,
157
          bool force_consistent_read);
158
  ~Batcher();
159
160
  // Set the timeout for this batcher.
161
  //
162
  // The timeout is currently set on all of the RPCs, but in the future will be relative
163
  // to when the Flush call is made (eg even if the lookup of the TS takes a long time, it
164
  // may time out before even sending an op). TODO: implement that
165
  void SetDeadline(CoarseTimePoint deadline);
166
167
  // Add a new operation to the batch. Requires that the batch has not yet been flushed.
168
  // TODO: in other flush modes, this may not be the case -- need to
169
  // update this when they're implemented.
170
  //
171
  // NOTE: Returns void, so no status is reported; 'yb_op' is passed as a shared_ptr by value.
172
  void Add(std::shared_ptr<YBOperation> yb_op);
173
174
  bool Has(const std::shared_ptr<YBOperation>& yb_op) const;
175
176
  // Return true if any operations are still pending. An operation is no longer considered
177
  // pending once it has either errored or succeeded.  Operations are considered pending
178
  // as soon as they are added, even if Flush has not been called.
179
  bool HasPendingOperations() const;
180
181
  // Return the number of buffered operations. These are only those operations which are
182
  // "corked" (i.e. not yet flushed). Once Flush has been called, this returns 0.
183
  size_t CountBufferedOperations() const;
184
185
  // Flush any buffered operations. The callback will be called once there are no
186
  // more pending operations from this Batcher. If all of the operations succeeded,
187
  // then the callback will receive Status::OK. Otherwise, it will receive failed status,
188
  // and the caller must inspect the ErrorCollector to retrieve more detailed
189
  // information on which operations failed.
190
  // If is_within_transaction_retry is true, all operations to be flushed by this batcher have
191
  // been already flushed, meaning we are now retrying them within the same session and the
192
  // associated transaction (if any) already expects them.
193
  void FlushAsync(StatusFunctor callback, IsWithinTransactionRetry is_within_transaction_retry);
194
195
12.0M
  CoarseTimePoint deadline() const {
196
12.0M
    return deadline_;
197
12.0M
  }
198
199
  rpc::Messenger* messenger() const;
200
201
  rpc::ProxyCache& proxy_cache() const;
202
203
12.0M
  const std::shared_ptr<AsyncRpcMetrics>& async_rpc_metrics() const {
204
12.0M
    return async_rpc_metrics_;
205
12.0M
  }
206
207
13.6M
  ConsistentReadPoint* read_point() {
208
13.6M
    return read_point_;
209
13.6M
  }
210
211
5.09k
  void SetForceConsistentRead(ForceConsistentRead value) {
212
5.09k
    force_consistent_read_ = value;
213
5.09k
  }
214
215
  YBTransactionPtr transaction() const;
216
217
12.4M
  const InFlightOpsGroupsWithMetadata& in_flight_ops() const { return ops_info_; }
218
219
11.5M
  void set_allow_local_calls_in_curr_thread(bool flag) { allow_local_calls_in_curr_thread_ = flag; }
220
221
0
  bool allow_local_calls_in_curr_thread() const { return allow_local_calls_in_curr_thread_; }
222
223
  const std::string& proxy_uuid() const;
224
225
  const ClientId& client_id() const;
226
227
  std::pair<RetryableRequestId, RetryableRequestId> NextRequestIdAndMinRunningRequestId(
228
      const TabletId& tablet_id);
229
  void RequestFinished(const TabletId& tablet_id, RetryableRequestId request_id);
230
231
20.6M
  void SetRejectionScoreSource(RejectionScoreSourcePtr rejection_score_source) {
232
20.6M
    rejection_score_source_ = rejection_score_source;
233
20.6M
  }
234
235
  double RejectionScore(int attempt_num);
236
237
  // Returns errors from tablet resolution or from flushing operations to tablet server(s).
238
  // Caller takes ownership of the returned errors.
239
  CollectedErrors GetAndClearPendingErrors();
240
241
  std::string LogPrefix() const;
242
243
  // This is a status error string used when there are multiple errors that need to be fetched
244
  // from the error collector.
245
  static const std::string kErrorReachingOutToTServersMsg;
246
247
 private:
248
  friend class RefCountedThreadSafe<Batcher>;
249
  friend class AsyncRpc;
250
  friend class WriteRpc;
251
  friend class ReadRpc;
252
253
  void Flushed(const InFlightOps& ops, const Status& status, FlushExtraResult flush_extra_result);
254
255
  // Combines a new error with existing ones, i.e. updates the combined error with the new status.
256
  void CombineError(const InFlightOp& in_flight_op);
257
258
  void FlushFinished();
259
  void AllLookupsDone();
260
  std::shared_ptr<AsyncRpc> CreateRpc(
261
      const BatcherPtr& self, RemoteTablet* tablet, const InFlightOpsGroup& group,
262
      bool allow_local_calls_in_curr_thread, bool need_consistent_read);
263
264
  // Calls/Schedules flush_callback_ and resets it to free resources.
265
  void RunCallback();
266
267
  // Log an error when an RPC callback has a response count mismatch.
268
  void AddOpCountMismatchError();
269
270
  // Cleans up an RPC response, scooping out any errors and passing them up
271
  // to the batcher.
272
  void ProcessReadResponse(const ReadRpc &rpc, const Status &s);
273
  void ProcessWriteResponse(const WriteRpc &rpc, const Status &s);
274
275
  // Process RPC status.
276
  void ProcessRpcStatus(const AsyncRpc &rpc, const Status &s);
277
278
  // Async Callbacks.
279
  void TabletLookupFinished(InFlightOp* op, Result<internal::RemoteTabletPtr> result);
280
281
  void TransactionReady(const Status& status);
282
283
  // initial - whether this method is called first time for this batch.
284
  void ExecuteOperations(Initial initial);
285
286
  void Abort(const Status& status);
287
288
  void Run() override;
289
290
  std::map<PartitionKey, Status> CollectOpsErrors();
291
292
  BatcherState state_ = BatcherState::kGatheringOps;
293
294
  YBClient* const client_;
295
  std::weak_ptr<YBSession> weak_session_;
296
297
  // Errors are reported into this error collector.
298
  ErrorCollector error_collector_;
299
300
  Status combined_error_;
301
302
  // The user-provided flush callback (see FlushAsync). Once there are no more
303
  // in-flight operations, the callback will be called exactly once
304
  // (see RunCallback).
305
  StatusFunctor flush_callback_;
306
307
  // All buffered or in-flight ops.
308
  // Added to this set during apply, removed during Finished of AsyncRpc.
309
  std::vector<std::shared_ptr<YBOperation>> ops_;
310
  std::vector<InFlightOp> ops_queue_;
311
  InFlightOpsGroupsWithMetadata ops_info_;
312
313
  // The absolute deadline for all in-flight ops.
314
  CoarseTimePoint deadline_;
315
316
  // Number of outstanding lookups across all in-flight ops.
317
  std::atomic<size_t> outstanding_lookups_{0};
318
  std::atomic<size_t> outstanding_rpcs_{0};
319
320
  // If true, we might allow the local calls to be run in the same IPC thread.
321
  bool allow_local_calls_in_curr_thread_ = true;
322
323
  std::shared_ptr<yb::client::internal::AsyncRpcMetrics> async_rpc_metrics_;
324
325
  YBTransactionPtr transaction_;
326
327
  // The consistent read point for this batch if it is specified.
328
  ConsistentReadPoint* read_point_ = nullptr;
329
330
  // Force consistent read on transactional table, even if we have only single-shard commands.
331
  ForceConsistentRead force_consistent_read_;
332
333
  RejectionScoreSourcePtr rejection_score_source_;
334
335
  DISALLOW_COPY_AND_ASSIGN(Batcher);
336
};
337
338
}  // namespace internal
339
}  // namespace client
340
}  // namespace yb
341
#endif  // YB_CLIENT_BATCHER_H_