YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/session.h
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_CLIENT_SESSION_H
15
#define YB_CLIENT_SESSION_H
16
17
#include <future>
18
#include <unordered_set>
19
20
#include "yb/client/client_fwd.h"
21
22
#include "yb/common/common_fwd.h"
23
24
#include "yb/gutil/ref_counted.h"
25
26
#include "yb/util/locks.h"
27
#include "yb/util/monotime.h"
28
29
namespace yb {
30
31
class ConsistentReadPoint;
32
33
struct ReadHybridTime;
34
35
namespace client {
36
37
namespace internal {
38
class Batcher;
39
class ErrorCollector;
40
} // internal
41
42
YB_STRONGLY_TYPED_BOOL(Restart);
43
44
struct NODISCARD_CLASS FlushStatus {
45
  Status status = Status::OK();
46
  // Contains more detailed per-operation list of errors if status is not OK.
47
  CollectedErrors errors;
48
};
49
50
// A YBSession belongs to a specific YBClient, and represents a context in
51
// which all read/write data access should take place. Within a session,
52
// multiple operations may be accumulated and batched together for better
53
// efficiency. Settings like timeouts, priorities, and trace IDs are also set
54
// per session.
55
//
56
// A YBSession's main purpose is grouping multiple data-access
57
// operations together into batches or transactions. It is important to note
58
// the distinction between these two:
59
//
60
// * A batch is a set of operations which are grouped together in order to
61
//   amortize fixed costs such as RPC call overhead and round trip times.
62
//   A batch DOES NOT imply any ACID-like guarantees. Within a batch, some
63
//   operations may succeed while others fail, and concurrent readers may see
64
//   partial results. If the client crashes mid-batch, it is possible that some
65
//   of the operations will be made durable while others were lost.
66
//
67
// * In contrast, a transaction is a set of operations which are treated as an
68
//   indivisible semantic unit, per the usual definitions of database transactions
69
//   and isolation levels.
70
//
71
// YBSession is separate from YBTable because a given batch or transaction
72
// may span multiple tables. Even in the context of batching and not multi-table
73
// ACID transactions, we may be able to coalesce writes to different tables hosted
74
// on the same server into the same RPC.
75
//
76
// YBSession is separate from YBClient because, in a multi-threaded
77
// application, different threads may need to concurrently execute
78
// transactions. Similar to a JDBC "session", transaction boundaries will be
79
// delineated on a per-session basis -- in between a "BeginTransaction" and
80
// "Commit" call on a given session, all operations will be part of the same
81
// transaction. Meanwhile another concurrent Session object can safely run
82
// non-transactional work or other transactions without interfering.
83
//
84
// Additionally, there is a guarantee that writes from different sessions do not
85
// get batched together into the same RPCs -- this means that latency-sensitive
86
// clients can run through the same YBClient object as throughput-oriented
87
// clients, perhaps by setting the latency-sensitive session's timeouts low and
88
// priorities high. Without the separation of batches, a latency-sensitive
89
// single-row insert might get batched along with 10MB worth of inserts from the
90
// batch writer, thus delaying the response significantly.
91
//
92
// Users who are familiar with the Hibernate ORM framework should find this
93
// concept of a Session familiar.
94
//
95
// This class is not thread-safe.
96
class YBSession : public std::enable_shared_from_this<YBSession> {
97
 public:
98
  explicit YBSession(YBClient* client, const scoped_refptr<ClockBase>& clock = nullptr);
99
100
  ~YBSession();
101
102
  // Set the consistent read point used by the non-transactional operations in this session. If the
103
  // operations are restarted and the last read point indicates that a restart is required,
104
  // the read point will be updated to restart read-time. Otherwise, the read point will be set to
105
  // the current time.
106
  void SetReadPoint(Restart restart);
107
108
  void SetReadPoint(const ReadHybridTime& read_time);
109
110
  // Returns true if our current read point requires restart.
111
  bool IsRestartRequired();
112
113
  // Defer the read hybrid time to the global limit.  Since the global limit should not change for a
114
  // session, this call is idempotent.
115
  void DeferReadPoint();
116
117
  // Changes the transaction used by this session.
118
  void SetTransaction(YBTransactionPtr transaction);
119
120
  // Set the timeout for writes made in this session.
121
  void SetTimeout(MonoDelta delta);
122
123
  void SetDeadline(CoarseTimePoint deadline);
124
125
  CHECKED_STATUS ReadSync(std::shared_ptr<YBOperation> yb_op);
126
127
  // TODO: add "doAs" ability here for proxy servers to be able to act on behalf of
128
  // other users, assuming access rights.
129
130
  // Apply the write operation.
131
  //
132
  // Applied operations are just added to the session and wait to be flushed.
133
  void Apply(YBOperationPtr yb_op);
134
  CHECKED_STATUS ApplyAndFlush(YBOperationPtr yb_op);
135
136
  bool IsInProgress(YBOperationPtr yb_op) const;
137
138
  void Apply(const std::vector<YBOperationPtr>& ops);
139
  CHECKED_STATUS ApplyAndFlush(const std::vector<YBOperationPtr>& ops);
140
141
  // Flush any pending writes.
142
  //
143
  // Returns a bad status if session failed to resolve tablets for at least some operations or
144
  // if there are any pending errors after operations have been flushed.
145
  // FlushAndGetOpsErrors could be used instead of Flush to get info about which specific
146
  // operations failed.
147
  //
148
  // Async version invokes callback as soon as all operations have been flushed and passes
149
  // general status and which specific operations failed.
150
  //
151
  // In the case that the async version of this method is used, then the callback
152
  // will be called upon completion of the operations which were buffered since the
153
  // last flush. In other words, in the following sequence:
154
  //
155
  //    session->Apply(a);
156
  //    session->FlushAsync(callback_1);
157
  //    session->Apply(b);
158
  //    session->FlushAsync(callback_2);
159
  //
160
  // ... 'callback_2' will be triggered once 'b' has been inserted, regardless of whether
161
  // 'a' has completed or not.
162
  //
163
  // Note that this also means that, if FlushAsync is called twice in succession, with
164
  // no intervening operations, the second flush will return immediately. For example:
165
  //
166
  //    session->Apply(a);
167
  //    session->FlushAsync(callback_1); // called when 'a' is inserted
168
  //    session->FlushAsync(callback_2); // called immediately!
169
  //
170
  // Note that, as in all other async functions in YB, the callback may be called
171
  // either from an IO thread or the same thread which calls FlushAsync. The callback
172
  // should not block.
173
  //
174
  // For FlushAsync, 'callback' must remain valid until it is invoked.
175
  void FlushAsync(FlushCallback callback);
176
  std::future<FlushStatus> FlushFuture();
177
  CHECKED_STATUS Flush();
178
  FlushStatus FlushAndGetOpsErrors();
179
180
  // Abort the unflushed or in-flight operations in the session.
181
  void Abort();
182
183
  // Close the session.
184
  // Returns Status::IllegalState() if 'force' is false and there are still pending
185
  // operations. If 'force' is true batcher_ is aborted even if there are pending
186
  // operations.
187
  CHECKED_STATUS Close(bool force = false);
188
189
  // Return true if there are operations which have not yet been delivered to the
190
  // cluster. This may include buffered operations (i.e. those that have not yet been
191
  // flushed) as well as in-flight operations (i.e. those that are in the process of
192
  // being sent to the servers).
193
  // TODO: maybe "incomplete" or "undelivered" is clearer?
194
  bool TEST_HasPendingOperations() const;
195
196
  // Return the number of buffered operations. These are operations that have
197
  // not yet been flushed - i.e. they are not en-route yet.
198
  //
199
  // Note that this is different than TEST_HasPendingOperations() above, which includes
200
  // operations which have been sent and not yet responded to.
201
  size_t TEST_CountBufferedOperations() const;
202
203
  // Returns true if this session has operations that have not been flushed.
204
  bool HasNotFlushedOperations() const;
205
206
  // Allow local calls to run in the current thread.
207
  void set_allow_local_calls_in_curr_thread(bool flag);
208
  bool allow_local_calls_in_curr_thread() const;
209
210
  // Sets the in-transaction read limit for this session.
211
  void SetInTxnLimit(HybridTime value);
212
213
  YBClient* client() const;
214
215
  // Sets force consistent read mode: if true, the consistent read point is used even if we have
216
  // only one command to flush.
217
  // It is useful when a whole statement is executed using multiple flushes.
218
  void SetForceConsistentRead(ForceConsistentRead value);
219
220
5.76M
  const internal::AsyncRpcMetricsPtr& async_rpc_metrics() const {
221
5.76M
    return async_rpc_metrics_;
222
5.76M
  }
223
224
  // Called by Batcher when a flush has started/finished.
225
  void FlushStarted(internal::BatcherPtr batcher);
226
  void FlushFinished(internal::BatcherPtr batcher);
227
228
  ConsistentReadPoint* read_point();
229
230
  void SetRejectionScoreSource(RejectionScoreSourcePtr rejection_score_source);
231
232
  struct BatcherConfig {
233
    std::weak_ptr<YBSession> session;
234
    client::YBClient* client;
235
    YBTransactionPtr transaction;
236
    std::shared_ptr<ConsistentReadPoint> non_transactional_read_point;
237
    bool allow_local_calls_in_curr_thread = true;
238
    bool force_consistent_read = false;
239
    RejectionScoreSourcePtr rejection_score_source;
240
241
    ConsistentReadPoint* read_point() const;
242
  };
243
244
 private:
245
  friend class YBClient;
246
  friend class internal::Batcher;
247
248
  internal::Batcher& Batcher();
249
250
  BatcherConfig batcher_config_;
251
252
  // Lock protecting flushed_batchers_.
253
  mutable simple_spinlock lock_;
254
255
  // The current batcher being prepared.
256
  internal::BatcherPtr batcher_;
257
258
  // Any batchers which have been flushed but not yet finished.
259
  //
260
  // Upon a batch finishing, it will call FlushFinished(), which removes the batcher from
261
  // this set. This set does not hold any reference count to the Batcher, since, while
262
  // the flush is active, the batcher manages its own refcount. The Batcher will always
263
  // call FlushFinished() before it destructs itself, so we're guaranteed that these
264
  // pointers stay valid.
265
  std::unordered_set<internal::BatcherPtr> flushed_batchers_;
266
267
  // For a session, only one of deadline and timeout can be active.
268
  // When a new batcher is created, its deadline is set to the session deadline or
269
  // current time + session timeout.
270
  CoarseTimePoint deadline_;
271
  MonoDelta timeout_;
272
273
  internal::AsyncRpcMetricsPtr async_rpc_metrics_;
274
275
  DISALLOW_COPY_AND_ASSIGN(YBSession);
276
};
277
278
// In case of tablet splitting YBSession can flush an operation to an outdated tablet and this can
279
// be retried by the session internally without returning error to upper layers.
280
bool ShouldSessionRetryError(const Status& status);
281
282
} // namespace client
283
} // namespace yb
284
285
#endif // YB_CLIENT_SESSION_H