/Users/deen/code/yugabyte-db/src/yb/client/batcher.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | #ifndef YB_CLIENT_BATCHER_H_ |
33 | | #define YB_CLIENT_BATCHER_H_ |
34 | | |
#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "yb/client/async_rpc.h"
#include "yb/client/error_collector.h"
#include "yb/client/transaction.h"

#include "yb/common/consistent_read_point.h"
#include "yb/common/retryable_request.h"
#include "yb/common/transaction.h"

#include "yb/gutil/macros.h"
#include "yb/gutil/ref_counted.h"

#include "yb/util/async_util.h"
#include "yb/util/atomic.h"
#include "yb/util/locks.h"
#include "yb/util/status_fwd.h"
#include "yb/util/threadpool.h"
55 | | |
56 | | namespace yb { |
57 | | namespace client { |
58 | | namespace internal { |
59 | | |
// A group of in-flight operations, delimited by a pair of iterators into an InFlightOps
// container.
// NOTE(review): presumably the usual half-open [begin, end) range -- confirm against the
// constructor and ToString() in batcher.cc.
struct InFlightOpsGroup {
  using Iterator = InFlightOps::const_iterator;

  // Defaults to false. Presumably set when the request built for this group must carry
  // transaction metadata -- TODO confirm where it is assigned.
  bool need_metadata = false;
  // Start of the group's range of operations.
  const Iterator begin;
  // End of the group's range of operations (see NOTE above about half-openness).
  const Iterator end;

  // Builds a group spanning [group_begin, group_end).
  InFlightOpsGroup(const Iterator& group_begin, const Iterator& group_end);
  // Human-readable description of the group, e.g. for logging.
  std::string ToString() const;
};
70 | | |
// Transaction metadata for a set of in-flight operations. It is stored as the `metadata`
// member of InFlightOpsGroupsWithMetadata.
struct InFlightOpsTransactionMetadata {
  // Metadata of the transaction the operations belong to.
  TransactionMetadata transaction;
  // Optional subtransaction metadata; empty when none was provided.
  // NOTE(review): the conditions under which it is populated are decided outside this
  // header -- confirm in the transaction code.
  boost::optional<SubTransactionMetadata> subtransaction;
};
75 | | |
76 | | struct InFlightOpsGroupsWithMetadata { |
77 | | static const size_t kPreallocatedCapacity = 40; |
78 | | |
79 | | boost::container::small_vector<InFlightOpsGroup, kPreallocatedCapacity> groups; |
80 | | InFlightOpsTransactionMetadata metadata; |
81 | | }; |
82 | | |
// Interface through which a batcher coordinates with the transaction its operations belong
// to: announcing expected operations, preparing request metadata, and reporting flushes.
class TxnBatcherIf {
 public:
  // Ask transaction to expect `count` operations in future. I.e. Prepare will be called with such
  // number of ops.
  virtual void ExpectOperations(size_t count) = 0;

  // Notifies transaction that specified ops were flushed with some status.
  // `used_read_time` is presumably the read time the flushed ops were executed with -- confirm
  // in the implementations.
  virtual void Flushed(
      const internal::InFlightOps& ops, const ReadHybridTime& used_read_time,
      const Status& status) = 0;

  // This function is used to init metadata of Write/Read request.
  // If we don't have enough information, then the function returns false and stores
  // the waiter, which will be invoked when we obtain such information.
  // Otherwise it presumably returns true with `ops_info->metadata` filled in (NOTE(review):
  // inferred from the parameter type -- confirm in implementations).
  virtual bool Prepare(
      internal::InFlightOpsGroupsWithMetadata* ops_info,
      ForceConsistentRead force_consistent_read,
      CoarseTimePoint deadline,
      Initial initial,
      Waiter waiter) = 0;

  // Virtual so implementations can be destroyed through a TxnBatcherIf pointer.
  virtual ~TxnBatcherIf() = default;
};
106 | | |
// Batcher state changes sequentially in the order listed below, with the exception that kAborted
// can be reached from any state.
YB_DEFINE_ENUM(
  BatcherState,
  (kGatheringOps) // Initial state, while we are adding operations to the batcher.
  (kResolvingTablets) // Flush was invoked on the batcher; waiting until tablets for all
                      // operations are resolved, then moving to the next state.
                      // Could change to kComplete in case of failure.
  (kTransactionPrepare) // Preparing the associated transaction for flushing the operations of
                        // this batcher; for instance, it picks a status tablet and fills in
                        // transaction metadata for this batcher.
                        // When there is no associated transaction or there are no operations,
                        // moves to the next state immediately.
  (kTransactionReady) // Transaction is ready: sending operations to the appropriate tablets and
                      // waiting for responses. When there is no transaction, we still send the
                      // operations, marking the transaction as auto ready.
  (kComplete) // Batcher complete.
  (kAborted) // Batcher was aborted.
);
126 | | |
127 | | // A Batcher is the class responsible for collecting row operations, routing them to the |
128 | | // correct tablet server, and possibly batching them together for better efficiency. |
129 | | // |
130 | | // It is a reference-counted class: the client session creating the batch holds one |
131 | | // reference, and all of the in-flight operations hold others. This allows the client |
132 | | // session to be destructed while ops are still in-flight, without the async callbacks |
133 | | // attempting to access a destructed Batcher. |
134 | | // |
135 | | // This class is not thread safe, i.e. it could be filled only from one thread, then flushed. |
136 | | // |
137 | | // The batcher changes state step by step, with appropriate operation done in the current step. |
138 | | // For instance on gathering step it does NOT lookup tablets, and on transaction prepare state |
139 | | // it only waits for transaction to be ready. |
140 | | // |
141 | | // Before calling FlushAsync all Batcher functions should be called sequentially, so no concurrent |
142 | | // access to Batcher state is happening. FlushAsync is doing all tablets lookups and this doesn't |
143 | | // modify Batcher state (only updates individual operations independently) until all lookups are |
144 | | // done. After all tablets lookups are completed, Batcher changes its state and calls |
145 | | // ExecuteOperations. This results in asynchronous calls to ProcessReadResponse/ProcessWriteResponse |
146 | | // as operations are completed, but these functions only read Batcher state and update individual |
147 | | // operations independently. |
148 | | class Batcher : public Runnable, public std::enable_shared_from_this<Batcher> { |
149 | | public: |
150 | | // Create a new batcher associated with the given session. |
151 | | // |
152 | | // Creates a weak_ptr to 'session'. |
153 | | Batcher(YBClient* client, |
154 | | const YBSessionPtr& session, |
155 | | YBTransactionPtr transaction, |
156 | | ConsistentReadPoint* read_point, |
157 | | bool force_consistent_read); |
158 | | ~Batcher(); |
159 | | |
160 | | // Set the timeout for this batcher. |
161 | | // |
162 | | // The timeout is currently set on all of the RPCs, but in the future will be relative |
163 | | // to when the Flush call is made (eg even if the lookup of the TS takes a long time, it |
164 | | // may time out before even sending an op). TODO: implement that |
165 | | void SetDeadline(CoarseTimePoint deadline); |
166 | | |
167 | | // Add a new operation to the batch. Requires that the batch has not yet been flushed. |
168 | | // TODO: in other flush modes, this may not be the case -- need to |
169 | | // update this when they're implemented. |
170 | | // |
171 | | // NOTE: If this returns not-OK, does not take ownership of 'write_op'. |
172 | | void Add(std::shared_ptr<YBOperation> yb_op); |
173 | | |
174 | | bool Has(const std::shared_ptr<YBOperation>& yb_op) const; |
175 | | |
176 | | // Return true if any operations are still pending. An operation is no longer considered |
177 | | // pending once it has either errored or succeeded. Operations are considering pending |
178 | | // as soon as they are added, even if Flush has not been called. |
179 | | bool HasPendingOperations() const; |
180 | | |
181 | | // Return the number of buffered operations. These are only those operations which are |
182 | | // "corked" (i.e not yet flushed). Once Flush has been called, this returns 0. |
183 | | size_t CountBufferedOperations() const; |
184 | | |
185 | | // Flush any buffered operations. The callback will be called once there are no |
186 | | // more pending operations from this Batcher. If all of the operations succeeded, |
187 | | // then the callback will receive Status::OK. Otherwise, it will receive failed status, |
188 | | // and the caller must inspect the ErrorCollector to retrieve more detailed |
189 | | // information on which operations failed. |
190 | | // If is_within_transaction_retry is true, all operations to be flushed by this batcher have |
191 | | // been already flushed, meaning we are now retrying them within the same session and the |
192 | | // associated transaction (if any) already expects them. |
193 | | void FlushAsync(StatusFunctor callback, IsWithinTransactionRetry is_within_transaction_retry); |
194 | | |
195 | 6.01M | CoarseTimePoint deadline() const { |
196 | 6.01M | return deadline_; |
197 | 6.01M | } |
198 | | |
199 | | rpc::Messenger* messenger() const; |
200 | | |
201 | | rpc::ProxyCache& proxy_cache() const; |
202 | | |
203 | 6.00M | const std::shared_ptr<AsyncRpcMetrics>& async_rpc_metrics() const { |
204 | 6.00M | return async_rpc_metrics_; |
205 | 6.00M | } |
206 | | |
207 | 6.60M | ConsistentReadPoint* read_point() { |
208 | 6.60M | return read_point_; |
209 | 6.60M | } |
210 | | |
211 | 5.30k | void SetForceConsistentRead(ForceConsistentRead value) { |
212 | 5.30k | force_consistent_read_ = value; |
213 | 5.30k | } |
214 | | |
215 | | YBTransactionPtr transaction() const; |
216 | | |
217 | 6.26M | const InFlightOpsGroupsWithMetadata& in_flight_ops() const { return ops_info_; } |
218 | | |
219 | 5.72M | void set_allow_local_calls_in_curr_thread(bool flag) { allow_local_calls_in_curr_thread_ = flag; } |
220 | | |
221 | 0 | bool allow_local_calls_in_curr_thread() const { return allow_local_calls_in_curr_thread_; } |
222 | | |
223 | | const std::string& proxy_uuid() const; |
224 | | |
225 | | const ClientId& client_id() const; |
226 | | |
227 | | std::pair<RetryableRequestId, RetryableRequestId> NextRequestIdAndMinRunningRequestId( |
228 | | const TabletId& tablet_id); |
229 | | void RequestFinished(const TabletId& tablet_id, RetryableRequestId request_id); |
230 | | |
231 | 10.5M | void SetRejectionScoreSource(RejectionScoreSourcePtr rejection_score_source) { |
232 | 10.5M | rejection_score_source_ = rejection_score_source; |
233 | 10.5M | } |
234 | | |
235 | | double RejectionScore(int attempt_num); |
236 | | |
237 | | // Returns errors occurred due tablet resolution or flushing operations to tablet server(s). |
238 | | // Caller takes ownership of the returned errors. |
239 | | CollectedErrors GetAndClearPendingErrors(); |
240 | | |
241 | | std::string LogPrefix() const; |
242 | | |
243 | | // This is a status error string used when there are multiple errors that need to be fetched |
244 | | // from the error collector. |
245 | | static const std::string kErrorReachingOutToTServersMsg; |
246 | | |
247 | | private: |
248 | | friend class RefCountedThreadSafe<Batcher>; |
249 | | friend class AsyncRpc; |
250 | | friend class WriteRpc; |
251 | | friend class ReadRpc; |
252 | | |
253 | | void Flushed(const InFlightOps& ops, const Status& status, FlushExtraResult flush_extra_result); |
254 | | |
255 | | // Combines new error to existing ones. I.e. updates combined error with new status. |
256 | | void CombineError(const InFlightOp& in_flight_op); |
257 | | |
258 | | void FlushFinished(); |
259 | | void AllLookupsDone(); |
260 | | std::shared_ptr<AsyncRpc> CreateRpc( |
261 | | const BatcherPtr& self, RemoteTablet* tablet, const InFlightOpsGroup& group, |
262 | | bool allow_local_calls_in_curr_thread, bool need_consistent_read); |
263 | | |
264 | | // Calls/Schedules flush_callback_ and resets it to free resources. |
265 | | void RunCallback(); |
266 | | |
267 | | // Log an error where an Rpc callback has response count mismatch. |
268 | | void AddOpCountMismatchError(); |
269 | | |
270 | | // Cleans up an RPC response, scooping out any errors and passing them up |
271 | | // to the batcher. |
272 | | void ProcessReadResponse(const ReadRpc &rpc, const Status &s); |
273 | | void ProcessWriteResponse(const WriteRpc &rpc, const Status &s); |
274 | | |
275 | | // Process RPC status. |
276 | | void ProcessRpcStatus(const AsyncRpc &rpc, const Status &s); |
277 | | |
278 | | // Async Callbacks. |
279 | | void TabletLookupFinished(InFlightOp* op, Result<internal::RemoteTabletPtr> result); |
280 | | |
281 | | void TransactionReady(const Status& status); |
282 | | |
283 | | // initial - whether this method is called first time for this batch. |
284 | | void ExecuteOperations(Initial initial); |
285 | | |
286 | | void Abort(const Status& status); |
287 | | |
288 | | void Run() override; |
289 | | |
290 | | std::map<PartitionKey, Status> CollectOpsErrors(); |
291 | | |
292 | | BatcherState state_ = BatcherState::kGatheringOps; |
293 | | |
294 | | YBClient* const client_; |
295 | | std::weak_ptr<YBSession> weak_session_; |
296 | | |
297 | | // Errors are reported into this error collector. |
298 | | ErrorCollector error_collector_; |
299 | | |
300 | | Status combined_error_; |
301 | | |
302 | | // If state is kFlushing, this member will be set to the user-provided |
303 | | // callback. Once there are no more in-flight operations, the callback |
304 | | // will be called exactly once (and the state changed to kFlushed). |
305 | | StatusFunctor flush_callback_; |
306 | | |
307 | | // All buffered or in-flight ops. |
308 | | // Added to this set during apply, removed during Finished of AsyncRpc. |
309 | | std::vector<std::shared_ptr<YBOperation>> ops_; |
310 | | std::vector<InFlightOp> ops_queue_; |
311 | | InFlightOpsGroupsWithMetadata ops_info_; |
312 | | |
313 | | // The absolute deadline for all in-flight ops. |
314 | | CoarseTimePoint deadline_; |
315 | | |
316 | | // Number of outstanding lookups across all in-flight ops. |
317 | | std::atomic<size_t> outstanding_lookups_{0}; |
318 | | std::atomic<size_t> outstanding_rpcs_{0}; |
319 | | |
320 | | // If true, we might allow the local calls to be run in the same IPC thread. |
321 | | bool allow_local_calls_in_curr_thread_ = true; |
322 | | |
323 | | std::shared_ptr<yb::client::internal::AsyncRpcMetrics> async_rpc_metrics_; |
324 | | |
325 | | YBTransactionPtr transaction_; |
326 | | |
327 | | // The consistent read point for this batch if it is specified. |
328 | | ConsistentReadPoint* read_point_ = nullptr; |
329 | | |
330 | | // Force consistent read on transactional table, even we have only single shard commands. |
331 | | ForceConsistentRead force_consistent_read_; |
332 | | |
333 | | RejectionScoreSourcePtr rejection_score_source_; |
334 | | |
335 | | DISALLOW_COPY_AND_ASSIGN(Batcher); |
336 | | }; |
337 | | |
338 | | } // namespace internal |
339 | | } // namespace client |
340 | | } // namespace yb |
341 | | #endif // YB_CLIENT_BATCHER_H_ |