/Users/deen/code/yugabyte-db/src/yb/client/session.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/session.h" |
15 | | |
16 | | #include "yb/client/async_rpc.h" |
17 | | #include "yb/client/batcher.h" |
18 | | #include "yb/client/client.h" |
19 | | #include "yb/client/client_error.h" |
20 | | #include "yb/client/error.h" |
21 | | #include "yb/client/error_collector.h" |
22 | | #include "yb/client/yb_op.h" |
23 | | |
24 | | #include "yb/common/consistent_read_point.h" |
25 | | |
26 | | #include "yb/consensus/consensus_error.h" |
27 | | |
28 | | #include "yb/tserver/tserver_error.h" |
29 | | |
30 | | #include "yb/util/debug-util.h" |
31 | | #include "yb/util/logging.h" |
32 | | #include "yb/util/metrics.h" |
33 | | #include "yb/util/status_log.h" |
34 | | |
35 | | using namespace std::literals; |
36 | | using namespace std::placeholders; |
37 | | |
38 | | DEFINE_int32(client_read_write_timeout_ms, 60000, "Timeout for client read and write operations."); |
39 | | |
40 | | namespace yb { |
41 | | namespace client { |
42 | | |
43 | | using internal::AsyncRpcMetrics; |
44 | | using internal::Batcher; |
45 | | |
46 | | using std::shared_ptr; |
47 | | |
48 | 124k | YBSession::YBSession(YBClient* client, const scoped_refptr<ClockBase>& clock) { |
49 | 124k | batcher_config_.client = client; |
50 | 124k | batcher_config_.non_transactional_read_point = |
51 | 102k | clock ? std::make_unique<ConsistentReadPoint>(clock) : nullptr; |
52 | 124k | const auto metric_entity = client->metric_entity(); |
53 | 123k | async_rpc_metrics_ = metric_entity ? std::make_shared<AsyncRpcMetrics>(metric_entity) : nullptr; |
54 | 124k | } |
55 | | |
56 | 4.95M | void YBSession::SetReadPoint(const Restart restart) { |
57 | 4.95M | const auto& read_point = batcher_config_.non_transactional_read_point; |
58 | 4.95M | DCHECK_NOTNULL(read_point.get()); |
59 | 4.95M | if (restart && read_point->IsRestartRequired()) { |
60 | 33 | read_point->Restart(); |
61 | 4.95M | } else { |
62 | 4.95M | read_point->SetCurrentReadTime(); |
63 | 4.95M | } |
64 | 4.95M | } |
65 | | |
66 | 211k | void YBSession::SetReadPoint(const ReadHybridTime& read_time) { |
67 | 211k | batcher_config_.non_transactional_read_point->SetReadTime(read_time, {} /* local_limits */); |
68 | 211k | } |
69 | | |
70 | 0 | bool YBSession::IsRestartRequired() { |
71 | 0 | auto rp = read_point(); |
72 | 0 | return rp && rp->IsRestartRequired(); |
73 | 0 | } |
74 | | |
75 | 0 | void YBSession::DeferReadPoint() { |
76 | 0 | batcher_config_.non_transactional_read_point->Defer(); |
77 | 0 | } |
78 | | |
79 | 492k | void YBSession::SetTransaction(YBTransactionPtr transaction) { |
80 | 492k | batcher_config_.transaction = std::move(transaction); |
81 | 492k | internal::BatcherPtr old_batcher; |
82 | 492k | old_batcher.swap(batcher_); |
83 | 273 | LOG_IF(DFATAL, old_batcher) << "SetTransaction with non empty batcher"; |
84 | 492k | } |
85 | | |
86 | 4.81M | void YBSession::SetRejectionScoreSource(RejectionScoreSourcePtr rejection_score_source) { |
87 | 4.81M | if (batcher_) { |
88 | 4.81M | batcher_->SetRejectionScoreSource(rejection_score_source); |
89 | 4.81M | } |
90 | 4.81M | batcher_config_.rejection_score_source = std::move(rejection_score_source); |
91 | 4.81M | } |
92 | | |
93 | 102k | YBSession::~YBSession() { |
94 | 102k | WARN_NOT_OK(Close(true), "Closed Session with pending operations."); |
95 | 102k | } |
96 | | |
97 | 4.76M | void YBSession::Abort() { |
98 | 4.76M | if (batcher_ && batcher_->HasPendingOperations()) { |
99 | 42 | batcher_.reset(); |
100 | 42 | } |
101 | 4.76M | } |
102 | | |
103 | 102k | Status YBSession::Close(bool force) { |
104 | 102k | if (batcher_) { |
105 | 0 | if (batcher_->HasPendingOperations() && !force) { |
106 | 0 | return STATUS(IllegalState, "Could not close. There are pending operations."); |
107 | 0 | } |
108 | 0 | batcher_.reset(); |
109 | 0 | } |
110 | 102k | return Status::OK(); |
111 | 102k | } |
112 | | |
113 | 3.07k | void YBSession::SetTimeout(MonoDelta timeout) { |
114 | 3.07k | CHECK_GE(timeout, MonoDelta::kZero); |
115 | 3.07k | deadline_ = CoarseTimePoint(); |
116 | 3.07k | timeout_ = timeout; |
117 | 3.07k | if (batcher_) { |
118 | 0 | batcher_->SetDeadline(CoarseMonoClock::now() + timeout_); |
119 | 0 | } |
120 | 3.07k | } |
121 | | |
122 | 5.64M | void YBSession::SetDeadline(CoarseTimePoint deadline) { |
123 | 5.64M | timeout_ = MonoDelta(); |
124 | 5.64M | deadline_ = deadline; |
125 | 5.64M | if (batcher_) { |
126 | 0 | batcher_->SetDeadline(deadline); |
127 | 0 | } |
128 | 5.64M | } |
129 | | |
130 | 25.0k | Status YBSession::Flush() { |
131 | 25.0k | return FlushFuture().get().status; |
132 | 25.0k | } |
133 | | |
134 | 12.6k | FlushStatus YBSession::FlushAndGetOpsErrors() { |
135 | 12.6k | return FlushFuture().get(); |
136 | 12.6k | } |
137 | | |
138 | | namespace { |
139 | | |
140 | 5.77M | internal::BatcherPtr CreateBatcher(const YBSession::BatcherConfig& config) { |
141 | 5.77M | auto batcher = std::make_shared<internal::Batcher>( |
142 | 5.77M | config.client, config.session.lock(), config.transaction, config.read_point(), |
143 | 5.77M | config.force_consistent_read); |
144 | 5.77M | batcher->SetRejectionScoreSource(config.rejection_score_source); |
145 | 5.77M | return batcher; |
146 | 5.77M | } |
147 | | |
148 | | void FlushBatcherAsync( |
149 | | const internal::BatcherPtr& batcher, FlushCallback callback, YBSession::BatcherConfig config, |
150 | | const internal::IsWithinTransactionRetry is_within_transaction_retry); |
151 | | |
152 | | void MoveErrorsAndRunCallback( |
153 | | const internal::BatcherPtr& done_batcher, CollectedErrors errors, FlushCallback callback, |
154 | 5.74M | const Status& status) { |
155 | 5.64M | const auto vlog_level = status.ok() ? 4 : 3; |
156 | 18.4E | VLOG_WITH_FUNC(vlog_level) << "Invoking callback, batcher: " << done_batcher->LogPrefix() |
157 | 18.4E | << ", num_errors: " << errors.size() << " status: " << status; |
158 | 0 | if (VLOG_IS_ON(5)) { |
159 | 0 | for (auto& error : errors) { |
160 | 0 | VLOG(5) << "Operation " << AsString(error->failed_op()) |
161 | 0 | << " failed with: " << AsString(error->status()); |
162 | 0 | } |
163 | 0 | } |
164 | | // TODO: before enabling transaction sealing we might need to call Transaction::Flushed |
165 | | // for ops that we have retried, failed again and decided not to retry due to deadline. |
166 | | // See comments for YBTransaction::Impl::running_requests_ and |
167 | | // Batcher::RemoveInFlightOpsAfterFlushing. |
168 | | // https://github.com/yugabyte/yugabyte-db/issues/7984. |
169 | 5.74M | FlushStatus flush_status{status, std::move(errors)}; |
170 | 5.74M | callback(&flush_status); |
171 | 5.74M | } |
172 | | |
173 | | void BatcherFlushDone( |
174 | | const internal::BatcherPtr& done_batcher, const Status& s, |
175 | 5.74M | FlushCallback callback, YBSession::BatcherConfig batcher_config) { |
176 | 5.74M | auto errors = done_batcher->GetAndClearPendingErrors(); |
177 | 5.74M | size_t retriable_errors_count = 0; |
178 | 496k | for (auto& error : errors) { |
179 | 496k | retriable_errors_count += ShouldSessionRetryError(error->status()); |
180 | 496k | } |
181 | 5.76M | if (errors.size() > retriable_errors_count || errors.empty()) { |
182 | | // We only retry failed ops if all of them failed with retriable errors. |
183 | 5.76M | MoveErrorsAndRunCallback(done_batcher, std::move(errors), std::move(callback), s); |
184 | 5.76M | return; |
185 | 5.76M | } |
186 | | |
187 | 18.4E | VLOG_WITH_FUNC(3) << "Retrying " << errors.size() << " operations from batcher " |
188 | 18.4E | << done_batcher->LogPrefix() << " due to: " << s |
189 | 18.4E | << ": (first op error: " << errors[0]->status() << ")"; |
190 | | |
191 | 18.4E | internal::BatcherPtr retry_batcher = CreateBatcher(batcher_config); |
192 | 18.4E | retry_batcher->SetDeadline(done_batcher->deadline()); |
193 | 6.01k | for (auto& error : errors) { |
194 | 0 | VLOG_WITH_FUNC(5) << "Retrying " << AsString(error->failed_op()) |
195 | 0 | << " due to: " << error->status(); |
196 | 6.01k | const auto op = error->shared_failed_op(); |
197 | 6.01k | op->ResetTablet(); |
198 | 6.01k | retry_batcher->Add(op); |
199 | 6.01k | } |
200 | 18.4E | FlushBatcherAsync(retry_batcher, std::move(callback), batcher_config, |
201 | 18.4E | internal::IsWithinTransactionRetry::kTrue); |
202 | 18.4E | } |
203 | | |
204 | | void FlushBatcherAsync( |
205 | | const internal::BatcherPtr& batcher, FlushCallback callback, |
206 | | YBSession::BatcherConfig batcher_config, |
207 | 5.72M | const internal::IsWithinTransactionRetry is_within_transaction_retry) { |
208 | 5.72M | batcher->set_allow_local_calls_in_curr_thread( |
209 | 5.72M | batcher_config.allow_local_calls_in_curr_thread); |
210 | 5.72M | batcher->FlushAsync( |
211 | 5.72M | std::bind( |
212 | 5.72M | &BatcherFlushDone, batcher, _1, std::move(callback), batcher_config), |
213 | 5.72M | is_within_transaction_retry); |
214 | 5.72M | } |
215 | | |
216 | | } // namespace |
217 | | |
218 | 5.75M | void YBSession::FlushAsync(FlushCallback callback) { |
219 | | // Swap in a new batcher to start building the next batch. |
220 | | // Save off the old batcher. |
221 | | // |
222 | | // Send off any buffered data. Important to do this outside of the lock |
223 | | // since the callback may itself try to take the lock, in the case that |
224 | | // the batch fails "inline" on the same thread. |
225 | | |
226 | 5.75M | internal::BatcherPtr old_batcher; |
227 | 5.75M | old_batcher.swap(batcher_); |
228 | 5.75M | if (old_batcher) { |
229 | 5.74M | FlushBatcherAsync( |
230 | 5.74M | old_batcher, std::move(callback), batcher_config_, |
231 | 5.74M | internal::IsWithinTransactionRetry::kFalse); |
232 | 8.98k | } else { |
233 | 8.98k | FlushStatus ok; |
234 | 8.98k | callback(&ok); |
235 | 8.98k | } |
236 | 5.75M | } |
237 | | |
238 | 38.4k | std::future<FlushStatus> YBSession::FlushFuture() { |
239 | 38.4k | auto promise = std::make_shared<std::promise<FlushStatus>>(); |
240 | 38.4k | auto future = promise->get_future(); |
241 | 38.4k | FlushAsync([promise](FlushStatus* status) { |
242 | 38.4k | promise->set_value(std::move(*status)); |
243 | 38.4k | }); |
244 | 38.4k | return future; |
245 | 38.4k | } |
246 | | |
247 | 375 | Status YBSession::ReadSync(std::shared_ptr<YBOperation> yb_op) { |
248 | 375 | CHECK(yb_op->read_only()); |
249 | 375 | return ApplyAndFlush(std::move(yb_op)); |
250 | 375 | } |
251 | | |
252 | 0 | YBClient* YBSession::client() const { |
253 | 0 | return batcher_config_.client; |
254 | 0 | } |
255 | | |
256 | 5.76M | void YBSession::FlushStarted(internal::BatcherPtr batcher) { |
257 | 5.76M | std::lock_guard<simple_spinlock> l(lock_); |
258 | 5.76M | flushed_batchers_.insert(batcher); |
259 | 5.76M | } |
260 | | |
261 | 5.76M | void YBSession::FlushFinished(internal::BatcherPtr batcher) { |
262 | 5.76M | std::lock_guard<simple_spinlock> l(lock_); |
263 | 5.76M | CHECK_EQ(flushed_batchers_.erase(batcher), 1); |
264 | 5.76M | } |
265 | | |
266 | 104k | bool YBSession::allow_local_calls_in_curr_thread() const { |
267 | 104k | return batcher_config_.allow_local_calls_in_curr_thread; |
268 | 104k | } |
269 | | |
270 | 109k | void YBSession::set_allow_local_calls_in_curr_thread(bool flag) { |
271 | 109k | batcher_config_.allow_local_calls_in_curr_thread = flag; |
272 | 109k | } |
273 | | |
274 | 531k | void YBSession::SetInTxnLimit(HybridTime value) { |
275 | 531k | auto* rp = read_point(); |
276 | 71 | LOG_IF(DFATAL, rp == nullptr) |
277 | 71 | << __FUNCTION__ << "(" << value << ") called on YBSession " << this |
278 | 71 | << " but read point is null"; |
279 | 531k | if (rp) { |
280 | 531k | rp->SetInTxnLimit(value); |
281 | 531k | } |
282 | 531k | } |
283 | | |
284 | 6.48M | ConsistentReadPoint* YBSession::BatcherConfig::read_point() const { |
285 | 5.40M | return transaction ? &transaction->read_point() : non_transactional_read_point.get(); |
286 | 6.48M | } |
287 | | |
288 | | |
289 | 706k | ConsistentReadPoint* YBSession::read_point() { |
290 | 706k | return batcher_config_.read_point(); |
291 | 706k | } |
292 | | |
293 | 11.0M | internal::Batcher& YBSession::Batcher() { |
294 | 11.0M | if (!batcher_) { |
295 | 5.74M | batcher_config_.session = shared_from_this(); |
296 | 5.74M | batcher_ = CreateBatcher(batcher_config_); |
297 | 5.74M | if (deadline_ != CoarseTimePoint()) { |
298 | 5.63M | batcher_->SetDeadline(deadline_); |
299 | 109k | } else { |
300 | 109k | auto timeout = timeout_; |
301 | 109k | if (PREDICT_FALSE(!timeout.Initialized())) { |
302 | 1.16k | YB_LOG_EVERY_N(WARNING, 100000) |
303 | 90 | << "Client writing with no deadline set, using 60 seconds.\n" |
304 | 90 | << GetStackTrace(); |
305 | 1.16k | timeout = MonoDelta::FromSeconds(60); |
306 | 1.16k | } |
307 | | |
308 | 109k | batcher_->SetDeadline(CoarseMonoClock::now() + timeout); |
309 | 109k | } |
310 | 5.74M | } |
311 | 11.0M | return *batcher_; |
312 | 11.0M | } |
313 | | |
314 | 11.0M | void YBSession::Apply(YBOperationPtr yb_op) { |
315 | 11.0M | Batcher().Add(yb_op); |
316 | 11.0M | } |
317 | | |
318 | 573 | Status YBSession::ApplyAndFlush(YBOperationPtr yb_op) { |
319 | 573 | Apply(std::move(yb_op)); |
320 | | |
321 | 573 | return FlushFuture().get().status; |
322 | 573 | } |
323 | | |
324 | 900 | bool YBSession::IsInProgress(YBOperationPtr yb_op) const { |
325 | 900 | if (batcher_ && batcher_->Has(yb_op)) { |
326 | 356 | return true; |
327 | 356 | } |
328 | 544 | std::lock_guard<simple_spinlock> l(lock_); |
329 | 0 | for (const auto& b : flushed_batchers_) { |
330 | 0 | if (b->Has(yb_op)) { |
331 | 0 | return true; |
332 | 0 | } |
333 | 0 | } |
334 | 544 | return false; |
335 | 544 | } |
336 | | |
337 | 153 | void YBSession::Apply(const std::vector<YBOperationPtr>& ops) { |
338 | 153 | if (ops.empty()) { |
339 | 3 | return; |
340 | 3 | } |
341 | 150 | auto& batcher = Batcher(); |
342 | 2.54k | for (const auto& op : ops) { |
343 | 2.54k | batcher.Add(op); |
344 | 2.54k | } |
345 | 150 | } |
346 | | |
347 | 153 | Status YBSession::ApplyAndFlush(const std::vector<YBOperationPtr>& ops) { |
348 | 153 | Apply(ops); |
349 | 153 | return FlushFuture().get().status; |
350 | 153 | } |
351 | | |
352 | 0 | size_t YBSession::TEST_CountBufferedOperations() const { |
353 | 0 | return batcher_ ? batcher_->CountBufferedOperations() : 0; |
354 | 0 | } |
355 | | |
356 | 9.93M | bool YBSession::HasNotFlushedOperations() const { |
357 | 9.93M | return batcher_ != nullptr && batcher_->HasPendingOperations(); |
358 | 9.93M | } |
359 | | |
360 | 0 | bool YBSession::TEST_HasPendingOperations() const { |
361 | 0 | if (batcher_ && batcher_->HasPendingOperations()) { |
362 | 0 | return true; |
363 | 0 | } |
364 | 0 | std::lock_guard<simple_spinlock> l(lock_); |
365 | 0 | for (const auto& b : flushed_batchers_) { |
366 | 0 | if (b->HasPendingOperations()) { |
367 | 0 | return true; |
368 | 0 | } |
369 | 0 | } |
370 | 0 | return false; |
371 | 0 | } |
372 | | |
373 | 4.89M | void YBSession::SetForceConsistentRead(ForceConsistentRead value) { |
374 | 4.89M | batcher_config_.force_consistent_read = value; |
375 | 4.89M | if (batcher_) { |
376 | 5.30k | batcher_->SetForceConsistentRead(value); |
377 | 5.30k | } |
378 | 4.89M | } |
379 | | |
380 | 620k | bool ShouldSessionRetryError(const Status& status) { |
381 | 620k | return IsRetryableClientError(status) || |
382 | 619k | tserver::TabletServerError(status) == tserver::TabletServerErrorPB::TABLET_SPLIT || |
383 | 619k | consensus::ConsensusError(status) == consensus::ConsensusErrorPB::TABLET_SPLIT; |
384 | 620k | } |
385 | | |
386 | | } // namespace client |
387 | | } // namespace yb |