YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/session.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/session.h"
15
16
#include "yb/client/async_rpc.h"
17
#include "yb/client/batcher.h"
18
#include "yb/client/client.h"
19
#include "yb/client/client_error.h"
20
#include "yb/client/error.h"
21
#include "yb/client/error_collector.h"
22
#include "yb/client/yb_op.h"
23
24
#include "yb/common/consistent_read_point.h"
25
26
#include "yb/consensus/consensus_error.h"
27
28
#include "yb/tserver/tserver_error.h"
29
30
#include "yb/util/debug-util.h"
31
#include "yb/util/logging.h"
32
#include "yb/util/metrics.h"
33
#include "yb/util/status_log.h"
34
35
using namespace std::literals;
36
using namespace std::placeholders;
37
38
DEFINE_int32(client_read_write_timeout_ms, 60000, "Timeout for client read and write operations.");
39
40
namespace yb {
41
namespace client {
42
43
using internal::AsyncRpcMetrics;
44
using internal::Batcher;
45
46
using std::shared_ptr;
47
48
124k
YBSession::YBSession(YBClient* client, const scoped_refptr<ClockBase>& clock) {
49
124k
  batcher_config_.client = client;
50
124k
  batcher_config_.non_transactional_read_point =
51
102k
      clock ? std::make_unique<ConsistentReadPoint>(clock) : nullptr;
52
124k
  const auto metric_entity = client->metric_entity();
53
123k
  async_rpc_metrics_ = metric_entity ? std::make_shared<AsyncRpcMetrics>(metric_entity) : nullptr;
54
124k
}
55
56
4.95M
void YBSession::SetReadPoint(const Restart restart) {
57
4.95M
  const auto& read_point = batcher_config_.non_transactional_read_point;
58
4.95M
  DCHECK_NOTNULL(read_point.get());
59
4.95M
  if (restart && read_point->IsRestartRequired()) {
60
33
    read_point->Restart();
61
4.95M
  } else {
62
4.95M
    read_point->SetCurrentReadTime();
63
4.95M
  }
64
4.95M
}
65
66
211k
void YBSession::SetReadPoint(const ReadHybridTime& read_time) {
67
211k
  batcher_config_.non_transactional_read_point->SetReadTime(read_time, {} /* local_limits */);
68
211k
}
69
70
0
bool YBSession::IsRestartRequired() {
71
0
  auto rp = read_point();
72
0
  return rp && rp->IsRestartRequired();
73
0
}
74
75
0
void YBSession::DeferReadPoint() {
76
0
  batcher_config_.non_transactional_read_point->Defer();
77
0
}
78
79
492k
void YBSession::SetTransaction(YBTransactionPtr transaction) {
80
492k
  batcher_config_.transaction = std::move(transaction);
81
492k
  internal::BatcherPtr old_batcher;
82
492k
  old_batcher.swap(batcher_);
83
273
  LOG_IF(DFATAL, old_batcher) << "SetTransaction with non empty batcher";
84
492k
}
85
86
4.81M
void YBSession::SetRejectionScoreSource(RejectionScoreSourcePtr rejection_score_source) {
87
4.81M
  if (batcher_) {
88
4.81M
    batcher_->SetRejectionScoreSource(rejection_score_source);
89
4.81M
  }
90
4.81M
  batcher_config_.rejection_score_source = std::move(rejection_score_source);
91
4.81M
}
92
93
102k
YBSession::~YBSession() {
94
102k
  WARN_NOT_OK(Close(true), "Closed Session with pending operations.");
95
102k
}
96
97
4.76M
void YBSession::Abort() {
98
4.76M
  if (batcher_ && batcher_->HasPendingOperations()) {
99
42
    batcher_.reset();
100
42
  }
101
4.76M
}
102
103
102k
Status YBSession::Close(bool force) {
104
102k
  if (batcher_) {
105
0
    if (batcher_->HasPendingOperations() && !force) {
106
0
      return STATUS(IllegalState, "Could not close. There are pending operations.");
107
0
    }
108
0
    batcher_.reset();
109
0
  }
110
102k
  return Status::OK();
111
102k
}
112
113
3.07k
void YBSession::SetTimeout(MonoDelta timeout) {
114
3.07k
  CHECK_GE(timeout, MonoDelta::kZero);
115
3.07k
  deadline_ = CoarseTimePoint();
116
3.07k
  timeout_ = timeout;
117
3.07k
  if (batcher_) {
118
0
    batcher_->SetDeadline(CoarseMonoClock::now() + timeout_);
119
0
  }
120
3.07k
}
121
122
5.64M
void YBSession::SetDeadline(CoarseTimePoint deadline) {
123
5.64M
  timeout_ = MonoDelta();
124
5.64M
  deadline_ = deadline;
125
5.64M
  if (batcher_) {
126
0
    batcher_->SetDeadline(deadline);
127
0
  }
128
5.64M
}
129
130
25.0k
Status YBSession::Flush() {
131
25.0k
  return FlushFuture().get().status;
132
25.0k
}
133
134
12.6k
FlushStatus YBSession::FlushAndGetOpsErrors() {
135
12.6k
  return FlushFuture().get();
136
12.6k
}
137
138
namespace {
139
140
5.77M
internal::BatcherPtr CreateBatcher(const YBSession::BatcherConfig& config) {
141
5.77M
  auto batcher = std::make_shared<internal::Batcher>(
142
5.77M
      config.client, config.session.lock(), config.transaction, config.read_point(),
143
5.77M
      config.force_consistent_read);
144
5.77M
  batcher->SetRejectionScoreSource(config.rejection_score_source);
145
5.77M
  return batcher;
146
5.77M
}
147
148
void FlushBatcherAsync(
149
    const internal::BatcherPtr& batcher, FlushCallback callback, YBSession::BatcherConfig config,
150
    const internal::IsWithinTransactionRetry is_within_transaction_retry);
151
152
void MoveErrorsAndRunCallback(
153
    const internal::BatcherPtr& done_batcher, CollectedErrors errors, FlushCallback callback,
154
5.74M
    const Status& status) {
155
5.64M
  const auto vlog_level = status.ok() ? 4 : 3;
156
18.4E
  VLOG_WITH_FUNC(vlog_level) << "Invoking callback, batcher: " << done_batcher->LogPrefix()
157
18.4E
                             << ", num_errors: " << errors.size() << " status: " << status;
158
0
  if (VLOG_IS_ON(5)) {
159
0
    for (auto& error : errors) {
160
0
      VLOG(5) << "Operation " << AsString(error->failed_op())
161
0
              << " failed with: " << AsString(error->status());
162
0
    }
163
0
  }
164
  // TODO: before enabling transaction sealing we might need to call Transaction::Flushed
165
  // for ops that we have retried, failed again and decided not to retry due to deadline.
166
  // See comments for YBTransaction::Impl::running_requests_ and
167
  // Batcher::RemoveInFlightOpsAfterFlushing.
168
  // https://github.com/yugabyte/yugabyte-db/issues/7984.
169
5.74M
  FlushStatus flush_status{status, std::move(errors)};
170
5.74M
  callback(&flush_status);
171
5.74M
}
172
173
void BatcherFlushDone(
174
    const internal::BatcherPtr& done_batcher, const Status& s,
175
5.74M
    FlushCallback callback, YBSession::BatcherConfig batcher_config) {
176
5.74M
  auto errors = done_batcher->GetAndClearPendingErrors();
177
5.74M
  size_t retriable_errors_count = 0;
178
496k
  for (auto& error : errors) {
179
496k
    retriable_errors_count += ShouldSessionRetryError(error->status());
180
496k
  }
181
5.76M
  if (errors.size() > retriable_errors_count || errors.empty()) {
182
    // We only retry failed ops if all of them failed with retriable errors.
183
5.76M
    MoveErrorsAndRunCallback(done_batcher, std::move(errors), std::move(callback), s);
184
5.76M
    return;
185
5.76M
  }
186
187
18.4E
  VLOG_WITH_FUNC(3) << "Retrying " << errors.size() << " operations from batcher "
188
18.4E
                    << done_batcher->LogPrefix() << " due to: " << s
189
18.4E
                    << ": (first op error: " << errors[0]->status() << ")";
190
191
18.4E
  internal::BatcherPtr retry_batcher = CreateBatcher(batcher_config);
192
18.4E
  retry_batcher->SetDeadline(done_batcher->deadline());
193
6.01k
  for (auto& error : errors) {
194
0
    VLOG_WITH_FUNC(5) << "Retrying " << AsString(error->failed_op())
195
0
                      << " due to: " << error->status();
196
6.01k
    const auto op = error->shared_failed_op();
197
6.01k
    op->ResetTablet();
198
6.01k
    retry_batcher->Add(op);
199
6.01k
  }
200
18.4E
  FlushBatcherAsync(retry_batcher, std::move(callback), batcher_config,
201
18.4E
      internal::IsWithinTransactionRetry::kTrue);
202
18.4E
}
203
204
void FlushBatcherAsync(
205
    const internal::BatcherPtr& batcher, FlushCallback callback,
206
    YBSession::BatcherConfig batcher_config,
207
5.72M
    const internal::IsWithinTransactionRetry is_within_transaction_retry) {
208
5.72M
  batcher->set_allow_local_calls_in_curr_thread(
209
5.72M
      batcher_config.allow_local_calls_in_curr_thread);
210
5.72M
  batcher->FlushAsync(
211
5.72M
      std::bind(
212
5.72M
          &BatcherFlushDone, batcher, _1, std::move(callback), batcher_config),
213
5.72M
      is_within_transaction_retry);
214
5.72M
}
215
216
} // namespace
217
218
5.75M
void YBSession::FlushAsync(FlushCallback callback) {
219
  // Swap in a new batcher to start building the next batch.
220
  // Save off the old batcher.
221
  //
222
  // Send off any buffered data. Important to do this outside of the lock
223
  // since the callback may itself try to take the lock, in the case that
224
  // the batch fails "inline" on the same thread.
225
226
5.75M
  internal::BatcherPtr old_batcher;
227
5.75M
  old_batcher.swap(batcher_);
228
5.75M
  if (old_batcher) {
229
5.74M
    FlushBatcherAsync(
230
5.74M
        old_batcher, std::move(callback), batcher_config_,
231
5.74M
        internal::IsWithinTransactionRetry::kFalse);
232
8.98k
  } else {
233
8.98k
    FlushStatus ok;
234
8.98k
    callback(&ok);
235
8.98k
  }
236
5.75M
}
237
238
38.4k
std::future<FlushStatus> YBSession::FlushFuture() {
239
38.4k
  auto promise = std::make_shared<std::promise<FlushStatus>>();
240
38.4k
  auto future = promise->get_future();
241
38.4k
  FlushAsync([promise](FlushStatus* status) {
242
38.4k
      promise->set_value(std::move(*status));
243
38.4k
  });
244
38.4k
  return future;
245
38.4k
}
246
247
375
Status YBSession::ReadSync(std::shared_ptr<YBOperation> yb_op) {
248
375
  CHECK(yb_op->read_only());
249
375
  return ApplyAndFlush(std::move(yb_op));
250
375
}
251
252
0
YBClient* YBSession::client() const {
253
0
  return batcher_config_.client;
254
0
}
255
256
5.76M
void YBSession::FlushStarted(internal::BatcherPtr batcher) {
257
5.76M
  std::lock_guard<simple_spinlock> l(lock_);
258
5.76M
  flushed_batchers_.insert(batcher);
259
5.76M
}
260
261
5.76M
void YBSession::FlushFinished(internal::BatcherPtr batcher) {
262
5.76M
  std::lock_guard<simple_spinlock> l(lock_);
263
5.76M
  CHECK_EQ(flushed_batchers_.erase(batcher), 1);
264
5.76M
}
265
266
104k
bool YBSession::allow_local_calls_in_curr_thread() const {
267
104k
  return batcher_config_.allow_local_calls_in_curr_thread;
268
104k
}
269
270
109k
void YBSession::set_allow_local_calls_in_curr_thread(bool flag) {
271
109k
  batcher_config_.allow_local_calls_in_curr_thread = flag;
272
109k
}
273
274
531k
void YBSession::SetInTxnLimit(HybridTime value) {
275
531k
  auto* rp = read_point();
276
71
  LOG_IF(DFATAL, rp == nullptr)
277
71
      << __FUNCTION__ << "(" << value << ") called on YBSession " << this
278
71
      << " but read point is null";
279
531k
  if (rp) {
280
531k
    rp->SetInTxnLimit(value);
281
531k
  }
282
531k
}
283
284
6.48M
ConsistentReadPoint* YBSession::BatcherConfig::read_point() const {
285
5.40M
  return transaction ? &transaction->read_point() : non_transactional_read_point.get();
286
6.48M
}
287
288
289
706k
ConsistentReadPoint* YBSession::read_point() {
290
706k
  return batcher_config_.read_point();
291
706k
}
292
293
11.0M
internal::Batcher& YBSession::Batcher() {
294
11.0M
  if (!batcher_) {
295
5.74M
    batcher_config_.session = shared_from_this();
296
5.74M
    batcher_ = CreateBatcher(batcher_config_);
297
5.74M
    if (deadline_ != CoarseTimePoint()) {
298
5.63M
      batcher_->SetDeadline(deadline_);
299
109k
    } else {
300
109k
      auto timeout = timeout_;
301
109k
      if (PREDICT_FALSE(!timeout.Initialized())) {
302
1.16k
        YB_LOG_EVERY_N(WARNING, 100000)
303
90
            << "Client writing with no deadline set, using 60 seconds.\n"
304
90
            << GetStackTrace();
305
1.16k
        timeout = MonoDelta::FromSeconds(60);
306
1.16k
      }
307
308
109k
      batcher_->SetDeadline(CoarseMonoClock::now() + timeout);
309
109k
    }
310
5.74M
  }
311
11.0M
  return *batcher_;
312
11.0M
}
313
314
11.0M
void YBSession::Apply(YBOperationPtr yb_op) {
315
11.0M
  Batcher().Add(yb_op);
316
11.0M
}
317
318
573
Status YBSession::ApplyAndFlush(YBOperationPtr yb_op) {
319
573
  Apply(std::move(yb_op));
320
321
573
  return FlushFuture().get().status;
322
573
}
323
324
900
bool YBSession::IsInProgress(YBOperationPtr yb_op) const {
325
900
  if (batcher_ && batcher_->Has(yb_op)) {
326
356
    return true;
327
356
  }
328
544
  std::lock_guard<simple_spinlock> l(lock_);
329
0
  for (const auto& b : flushed_batchers_) {
330
0
    if (b->Has(yb_op)) {
331
0
      return true;
332
0
    }
333
0
  }
334
544
  return false;
335
544
}
336
337
153
void YBSession::Apply(const std::vector<YBOperationPtr>& ops) {
338
153
  if (ops.empty()) {
339
3
    return;
340
3
  }
341
150
  auto& batcher = Batcher();
342
2.54k
  for (const auto& op : ops) {
343
2.54k
    batcher.Add(op);
344
2.54k
  }
345
150
}
346
347
153
Status YBSession::ApplyAndFlush(const std::vector<YBOperationPtr>& ops) {
348
153
  Apply(ops);
349
153
  return FlushFuture().get().status;
350
153
}
351
352
0
size_t YBSession::TEST_CountBufferedOperations() const {
353
0
  return batcher_ ? batcher_->CountBufferedOperations() : 0;
354
0
}
355
356
9.93M
bool YBSession::HasNotFlushedOperations() const {
357
9.93M
  return batcher_ != nullptr && batcher_->HasPendingOperations();
358
9.93M
}
359
360
0
bool YBSession::TEST_HasPendingOperations() const {
361
0
  if (batcher_ && batcher_->HasPendingOperations()) {
362
0
    return true;
363
0
  }
364
0
  std::lock_guard<simple_spinlock> l(lock_);
365
0
  for (const auto& b : flushed_batchers_) {
366
0
    if (b->HasPendingOperations()) {
367
0
      return true;
368
0
    }
369
0
  }
370
0
  return false;
371
0
}
372
373
4.89M
void YBSession::SetForceConsistentRead(ForceConsistentRead value) {
374
4.89M
  batcher_config_.force_consistent_read = value;
375
4.89M
  if (batcher_) {
376
5.30k
    batcher_->SetForceConsistentRead(value);
377
5.30k
  }
378
4.89M
}
379
380
620k
bool ShouldSessionRetryError(const Status& status) {
381
620k
  return IsRetryableClientError(status) ||
382
619k
         tserver::TabletServerError(status) == tserver::TabletServerErrorPB::TABLET_SPLIT ||
383
619k
         consensus::ConsensusError(status) == consensus::ConsensusErrorPB::TABLET_SPLIT;
384
620k
}
385
386
} // namespace client
387
} // namespace yb