YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/session.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/session.h"
15
16
#include "yb/client/async_rpc.h"
17
#include "yb/client/batcher.h"
18
#include "yb/client/client.h"
19
#include "yb/client/client_error.h"
20
#include "yb/client/error.h"
21
#include "yb/client/error_collector.h"
22
#include "yb/client/yb_op.h"
23
24
#include "yb/common/consistent_read_point.h"
25
26
#include "yb/consensus/consensus_error.h"
27
28
#include "yb/tserver/tserver_error.h"
29
30
#include "yb/util/debug-util.h"
31
#include "yb/util/logging.h"
32
#include "yb/util/metrics.h"
33
#include "yb/util/status_log.h"
34
35
using namespace std::literals;
36
using namespace std::placeholders;
37
38
DEFINE_int32(client_read_write_timeout_ms, 60000, "Timeout for client read and write operations.");
39
40
namespace yb {
41
namespace client {
42
43
using internal::AsyncRpcMetrics;
44
using internal::Batcher;
45
46
using std::shared_ptr;
47
48
130k
YBSession::YBSession(YBClient* client, const scoped_refptr<ClockBase>& clock) {
49
130k
  batcher_config_.client = client;
50
130k
  batcher_config_.non_transactional_read_point =
51
130k
      clock ? 
std::make_unique<ConsistentReadPoint>(clock)109k
:
nullptr20.8k
;
52
130k
  const auto metric_entity = client->metric_entity();
53
130k
  async_rpc_metrics_ = metric_entity ? 
std::make_shared<AsyncRpcMetrics>(metric_entity)129k
:
nullptr1.37k
;
54
130k
}
55
56
9.48M
void YBSession::SetReadPoint(const Restart restart) {
57
9.48M
  const auto& read_point = batcher_config_.non_transactional_read_point;
58
9.48M
  DCHECK_NOTNULL(read_point.get());
59
9.48M
  if (restart && 
read_point->IsRestartRequired()90.4k
) {
60
522
    read_point->Restart();
61
9.48M
  } else {
62
9.48M
    read_point->SetCurrentReadTime();
63
9.48M
  }
64
9.48M
}
65
66
702k
void YBSession::SetReadPoint(const ReadHybridTime& read_time) {
67
702k
  batcher_config_.non_transactional_read_point->SetReadTime(read_time, {} /* local_limits */);
68
702k
}
69
70
508
bool YBSession::IsRestartRequired() {
71
508
  auto rp = read_point();
72
508
  return rp && rp->IsRestartRequired();
73
508
}
74
75
80
void YBSession::DeferReadPoint() {
76
80
  batcher_config_.non_transactional_read_point->Defer();
77
80
}
78
79
825k
void YBSession::SetTransaction(YBTransactionPtr transaction) {
80
825k
  batcher_config_.transaction = std::move(transaction);
81
825k
  internal::BatcherPtr old_batcher;
82
825k
  old_batcher.swap(batcher_);
83
825k
  LOG_IF
(DFATAL, old_batcher) << "SetTransaction with non empty batcher"827
;
84
825k
}
85
86
9.14M
void YBSession::SetRejectionScoreSource(RejectionScoreSourcePtr rejection_score_source) {
87
9.14M
  if (batcher_) {
88
9.14M
    batcher_->SetRejectionScoreSource(rejection_score_source);
89
9.14M
  }
90
9.14M
  batcher_config_.rejection_score_source = std::move(rejection_score_source);
91
9.14M
}
92
93
99.4k
YBSession::~YBSession() {
94
99.4k
  WARN_NOT_OK(Close(true), "Closed Session with pending operations.");
95
99.4k
}
96
97
9.10M
void YBSession::Abort() {
98
9.10M
  if (batcher_ && 
batcher_->HasPendingOperations()58
) {
99
58
    batcher_.reset();
100
58
  }
101
9.10M
}
102
103
99.4k
Status YBSession::Close(bool force) {
104
99.4k
  if (batcher_) {
105
0
    if (batcher_->HasPendingOperations() && !force) {
106
0
      return STATUS(IllegalState, "Could not close. There are pending operations.");
107
0
    }
108
0
    batcher_.reset();
109
0
  }
110
99.4k
  return Status::OK();
111
99.4k
}
112
113
1.79k
void YBSession::SetTimeout(MonoDelta timeout) {
114
1.79k
  CHECK_GE(timeout, MonoDelta::kZero);
115
1.79k
  deadline_ = CoarseTimePoint();
116
1.79k
  timeout_ = timeout;
117
1.79k
  if (batcher_) {
118
0
    batcher_->SetDeadline(CoarseMonoClock::now() + timeout_);
119
0
  }
120
1.79k
}
121
122
11.4M
void YBSession::SetDeadline(CoarseTimePoint deadline) {
123
11.4M
  timeout_ = MonoDelta();
124
11.4M
  deadline_ = deadline;
125
11.4M
  if (batcher_) {
126
0
    batcher_->SetDeadline(deadline);
127
0
  }
128
11.4M
}
129
130
3.13k
Status YBSession::Flush() {
131
3.13k
  return FlushFuture().get().status;
132
3.13k
}
133
134
12.5k
FlushStatus YBSession::FlushAndGetOpsErrors() {
135
12.5k
  return FlushFuture().get();
136
12.5k
}
137
138
namespace {
139
140
11.6M
internal::BatcherPtr CreateBatcher(const YBSession::BatcherConfig& config) {
141
11.6M
  auto batcher = std::make_shared<internal::Batcher>(
142
11.6M
      config.client, config.session.lock(), config.transaction, config.read_point(),
143
11.6M
      config.force_consistent_read);
144
11.6M
  batcher->SetRejectionScoreSource(config.rejection_score_source);
145
11.6M
  return batcher;
146
11.6M
}
147
148
void FlushBatcherAsync(
149
    const internal::BatcherPtr& batcher, FlushCallback callback, YBSession::BatcherConfig config,
150
    const internal::IsWithinTransactionRetry is_within_transaction_retry);
151
152
void MoveErrorsAndRunCallback(
153
    const internal::BatcherPtr& done_batcher, CollectedErrors errors, FlushCallback callback,
154
11.5M
    const Status& status) {
155
11.5M
  const auto vlog_level = status.ok() ? 
411.4M
:
3140k
;
156
11.5M
  
VLOG_WITH_FUNC7.92k
(vlog_level) << "Invoking callback, batcher: " << done_batcher->LogPrefix()
157
7.92k
                             << ", num_errors: " << errors.size() << " status: " << status;
158
11.5M
  if (VLOG_IS_ON(5)) {
159
0
    for (auto& error : errors) {
160
0
      VLOG(5) << "Operation " << AsString(error->failed_op())
161
0
              << " failed with: " << AsString(error->status());
162
0
    }
163
0
  }
164
  // TODO: before enabling transaction sealing we might need to call Transaction::Flushed
165
  // for ops that we have retried, failed again and decided not to retry due to deadline.
166
  // See comments for YBTransaction::Impl::running_requests_ and
167
  // Batcher::RemoveInFlightOpsAfterFlushing.
168
  // https://github.com/yugabyte/yugabyte-db/issues/7984.
169
11.5M
  FlushStatus flush_status{status, std::move(errors)};
170
11.5M
  callback(&flush_status);
171
11.5M
}
172
173
void BatcherFlushDone(
174
    const internal::BatcherPtr& done_batcher, const Status& s,
175
11.5M
    FlushCallback callback, YBSession::BatcherConfig batcher_config) {
176
11.5M
  auto errors = done_batcher->GetAndClearPendingErrors();
177
11.5M
  size_t retriable_errors_count = 0;
178
11.5M
  for (auto& error : errors) {
179
995k
    retriable_errors_count += ShouldSessionRetryError(error->status());
180
995k
  }
181
11.5M
  if (
errors.size() > retriable_errors_count11.5M
||
errors.empty()11.4M
) {
182
    // We only retry failed ops if all of them failed with retriable errors.
183
11.5M
    MoveErrorsAndRunCallback(done_batcher, std::move(errors), std::move(callback), s);
184
11.5M
    return;
185
11.5M
  }
186
187
18.4E
  
VLOG_WITH_FUNC18.4E
(3) << "Retrying " << errors.size() << " operations from batcher "
188
18.4E
                    << done_batcher->LogPrefix() << " due to: " << s
189
18.4E
                    << ": (first op error: " << errors[0]->status() << ")";
190
191
18.4E
  internal::BatcherPtr retry_batcher = CreateBatcher(batcher_config);
192
18.4E
  retry_batcher->SetDeadline(done_batcher->deadline());
193
18.4E
  for (auto& error : errors) {
194
9.03k
    
VLOG_WITH_FUNC0
(5) << "Retrying " << AsString(error->failed_op())
195
0
                      << " due to: " << error->status();
196
9.03k
    const auto op = error->shared_failed_op();
197
9.03k
    op->ResetTablet();
198
9.03k
    retry_batcher->Add(op);
199
9.03k
  }
200
18.4E
  FlushBatcherAsync(retry_batcher, std::move(callback), batcher_config,
201
18.4E
      internal::IsWithinTransactionRetry::kTrue);
202
18.4E
}
203
204
void FlushBatcherAsync(
205
    const internal::BatcherPtr& batcher, FlushCallback callback,
206
    YBSession::BatcherConfig batcher_config,
207
11.5M
    const internal::IsWithinTransactionRetry is_within_transaction_retry) {
208
11.5M
  batcher->set_allow_local_calls_in_curr_thread(
209
11.5M
      batcher_config.allow_local_calls_in_curr_thread);
210
11.5M
  batcher->FlushAsync(
211
11.5M
      std::bind(
212
11.5M
          &BatcherFlushDone, batcher, _1, std::move(callback), batcher_config),
213
11.5M
      is_within_transaction_retry);
214
11.5M
}
215
216
} // namespace
217
218
11.5M
void YBSession::FlushAsync(FlushCallback callback) {
219
  // Swap in a new batcher to start building the next batch.
220
  // Save off the old batcher.
221
  //
222
  // Send off any buffered data. Important to do this outside of the lock
223
  // since the callback may itself try to take the lock, in the case that
224
  // the batch fails "inline" on the same thread.
225
226
11.5M
  internal::BatcherPtr old_batcher;
227
11.5M
  old_batcher.swap(batcher_);
228
11.5M
  if (old_batcher) {
229
11.5M
    FlushBatcherAsync(
230
11.5M
        old_batcher, std::move(callback), batcher_config_,
231
11.5M
        internal::IsWithinTransactionRetry::kFalse);
232
11.5M
  } else {
233
2.88k
    FlushStatus ok;
234
2.88k
    callback(&ok);
235
2.88k
  }
236
11.5M
}
237
238
23.5k
std::future<FlushStatus> YBSession::FlushFuture() {
239
23.5k
  auto promise = std::make_shared<std::promise<FlushStatus>>();
240
23.5k
  auto future = promise->get_future();
241
23.5k
  FlushAsync([promise](FlushStatus* status) {
242
23.5k
      promise->set_value(std::move(*status));
243
23.5k
  });
244
23.5k
  return future;
245
23.5k
}
246
247
3.84k
Status YBSession::ReadSync(std::shared_ptr<YBOperation> yb_op) {
248
3.84k
  CHECK(yb_op->read_only());
249
3.84k
  return ApplyAndFlush(std::move(yb_op));
250
3.84k
}
251
252
0
YBClient* YBSession::client() const {
253
0
  return batcher_config_.client;
254
0
}
255
256
11.6M
void YBSession::FlushStarted(internal::BatcherPtr batcher) {
257
11.6M
  std::lock_guard<simple_spinlock> l(lock_);
258
11.6M
  flushed_batchers_.insert(batcher);
259
11.6M
}
260
261
11.5M
void YBSession::FlushFinished(internal::BatcherPtr batcher) {
262
11.5M
  std::lock_guard<simple_spinlock> l(lock_);
263
11.5M
  CHECK_EQ(flushed_batchers_.erase(batcher), 1);
264
11.5M
}
265
266
208k
bool YBSession::allow_local_calls_in_curr_thread() const {
267
208k
  return batcher_config_.allow_local_calls_in_curr_thread;
268
208k
}
269
270
220k
void YBSession::set_allow_local_calls_in_curr_thread(bool flag) {
271
220k
  batcher_config_.allow_local_calls_in_curr_thread = flag;
272
220k
}
273
274
995k
void YBSession::SetInTxnLimit(HybridTime value) {
275
995k
  auto* rp = read_point();
276
995k
  LOG_IF(DFATAL, rp == nullptr)
277
915
      << __FUNCTION__ << "(" << value << ") called on YBSession " << this
278
915
      << " but read point is null";
279
995k
  if (rp) {
280
994k
    rp->SetInTxnLimit(value);
281
994k
  }
282
995k
}
283
284
12.7M
ConsistentReadPoint* YBSession::BatcherConfig::read_point() const {
285
12.7M
  return transaction ? 
&transaction->read_point()1.88M
:
non_transactional_read_point.get()10.8M
;
286
12.7M
}
287
288
289
1.15M
ConsistentReadPoint* YBSession::read_point() {
290
1.15M
  return batcher_config_.read_point();
291
1.15M
}
292
293
23.2M
internal::Batcher& YBSession::Batcher() {
294
23.2M
  if (!batcher_) {
295
11.5M
    batcher_config_.session = shared_from_this();
296
11.5M
    batcher_ = CreateBatcher(batcher_config_);
297
11.5M
    if (deadline_ != CoarseTimePoint()) {
298
11.3M
      batcher_->SetDeadline(deadline_);
299
11.3M
    } else {
300
166k
      auto timeout = timeout_;
301
166k
      if (PREDICT_FALSE(!timeout.Initialized())) {
302
1.29k
        YB_LOG_EVERY_N(WARNING, 100000)
303
108
            << "Client writing with no deadline set, using 60 seconds.\n"
304
108
            << GetStackTrace();
305
1.29k
        timeout = MonoDelta::FromSeconds(60);
306
1.29k
      }
307
308
166k
      batcher_->SetDeadline(CoarseMonoClock::now() + timeout);
309
166k
    }
310
11.5M
  }
311
23.2M
  return *batcher_;
312
23.2M
}
313
314
23.2M
void YBSession::Apply(YBOperationPtr yb_op) {
315
23.2M
  Batcher().Add(yb_op);
316
23.2M
}
317
318
7.48k
Status YBSession::ApplyAndFlush(YBOperationPtr yb_op) {
319
7.48k
  Apply(std::move(yb_op));
320
321
7.48k
  return FlushFuture().get().status;
322
7.48k
}
323
324
301
bool YBSession::IsInProgress(YBOperationPtr yb_op) const {
325
301
  if (batcher_ && 
batcher_->Has(yb_op)299
) {
326
297
    return true;
327
297
  }
328
4
  std::lock_guard<simple_spinlock> l(lock_);
329
4
  for (const auto& b : flushed_batchers_) {
330
0
    if (b->Has(yb_op)) {
331
0
      return true;
332
0
    }
333
0
  }
334
4
  return false;
335
4
}
336
337
306
void YBSession::Apply(const std::vector<YBOperationPtr>& ops) {
338
306
  if (ops.empty()) {
339
5
    return;
340
5
  }
341
301
  auto& batcher = Batcher();
342
5.21k
  for (const auto& op : ops) {
343
5.21k
    batcher.Add(op);
344
5.21k
  }
345
301
}
346
347
306
Status YBSession::ApplyAndFlush(const std::vector<YBOperationPtr>& ops) {
348
306
  Apply(ops);
349
306
  return FlushFuture().get().status;
350
306
}
351
352
0
size_t YBSession::TEST_CountBufferedOperations() const {
353
0
  return batcher_ ? batcher_->CountBufferedOperations() : 0;
354
0
}
355
356
18.6M
bool YBSession::HasNotFlushedOperations() const {
357
18.6M
  return batcher_ != nullptr && 
batcher_->HasPendingOperations()9.16M
;
358
18.6M
}
359
360
0
bool YBSession::TEST_HasPendingOperations() const {
361
0
  if (batcher_ && batcher_->HasPendingOperations()) {
362
0
    return true;
363
0
  }
364
0
  std::lock_guard<simple_spinlock> l(lock_);
365
0
  for (const auto& b : flushed_batchers_) {
366
0
    if (b->HasPendingOperations()) {
367
0
      return true;
368
0
    }
369
0
  }
370
0
  return false;
371
0
}
372
373
9.27M
void YBSession::SetForceConsistentRead(ForceConsistentRead value) {
374
9.27M
  batcher_config_.force_consistent_read = value;
375
9.27M
  if (batcher_) {
376
5.09k
    batcher_->SetForceConsistentRead(value);
377
5.09k
  }
378
9.27M
}
379
380
1.17M
bool ShouldSessionRetryError(const Status& status) {
381
1.17M
  return IsRetryableClientError(status) ||
382
1.17M
         
tserver::TabletServerError(status) == tserver::TabletServerErrorPB::TABLET_SPLIT1.17M
||
383
1.17M
         
consensus::ConsensusError(status) == consensus::ConsensusErrorPB::TABLET_SPLIT1.17M
;
384
1.17M
}
385
386
} // namespace client
387
} // namespace yb