YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/transaction_manager.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/client/transaction_manager.h"
17
18
#include "yb/client/client.h"
19
#include "yb/client/meta_cache.h"
20
#include "yb/client/table.h"
21
#include "yb/client/yb_table_name.h"
22
23
#include "yb/master/catalog_manager.h"
24
25
#include "yb/rpc/tasks_pool.h"
26
27
#include "yb/server/server_base_options.h"
28
29
#include "yb/util/format.h"
30
#include "yb/util/status_format.h"
31
#include "yb/util/status_log.h"
32
#include "yb/util/string_util.h"
33
#include "yb/util/thread_restrictions.h"
34
35
// Passed as the last argument to the constructor of the TransactionManager thread pool
// (see TransactionManager::Impl).
DEFINE_uint64(transaction_manager_workers_limit, 50,
36
              "Max number of workers used by transaction manager");
37
38
// Passed to the TransactionManager thread pool constructor and used as the constructor
// argument of both task pools owned by TransactionManager::Impl.
DEFINE_uint64(transaction_manager_queue_limit, 500,
39
              "Max number of tasks used by transaction manager");
40
41
namespace yb {
42
namespace client {
43
44
namespace {
45
46
// Cache of tablet ids of the global transaction table and any transaction tables with
47
// the same placement.
48
class TransactionTableState {
49
 public:
50
  explicit TransactionTableState(LocalTabletFilter local_tablet_filter)
51
9.81k
      : local_tablet_filter_(local_tablet_filter) {
52
9.81k
  }
53
54
  void InvokeCallback(const PickStatusTabletCallback& callback,
55
411k
                      TransactionLocality locality) EXCLUDES(mutex_) {
56
411k
    SharedLock<yb::RWMutex> lock(mutex_);
57
411k
    const auto& tablets = PickTabletList(locality);
58
411k
    if (tablets.empty()) {
59
0
      callback(STATUS_FORMAT(
60
0
          IllegalState, "No $0 transaction tablets found", TransactionLocality_Name(locality)));
61
0
      return;
62
0
    }
63
411k
    if (PickStatusTabletId(tablets, callback)) {
64
399k
      return;
65
399k
    }
66
11.6k
    YB_LOG_EVERY_N_SECS
(WARNING, 1) << "No placement local transaction status tablet found"101
;
67
11.6k
    callback(RandomElement(tablets));
68
11.6k
  }
69
70
411k
  bool IsInitialized() {
71
411k
    return initialized_.load();
72
411k
  }
73
74
  void UpdateStatusTablets(uint64_t new_version,
75
3.26k
                           TransactionStatusTablets&& tablets) EXCLUDES(mutex_) {
76
3.26k
    std::lock_guard<yb::RWMutex> lock(mutex_);
77
3.26k
    if (!initialized_.load() || 
status_tablets_version_ < new_version2.25k
) {
78
2.09k
      tablets_ = std::move(tablets);
79
2.09k
      has_placement_local_tablets_.store(!tablets_.placement_local_tablets.empty());
80
2.09k
      status_tablets_version_ = new_version;
81
2.09k
      initialized_.store(true);
82
2.09k
    }
83
3.26k
  }
84
85
266k
  bool HasAnyPlacementLocalStatusTablets() {
86
266k
    return has_placement_local_tablets_.load();
87
266k
  }
88
89
88.5k
  uint64_t GetStatusTabletsVersion() EXCLUDES(mutex_) {
90
88.5k
    std::lock_guard<yb::RWMutex> lock(mutex_);
91
88.5k
    return status_tablets_version_;
92
88.5k
  }
93
94
 private:
95
  // Picks a status tablet id from 'tablets' filtered by 'filter'. Returns true if a
96
  // tablet id was picked successfully, and false if there were no applicable tablet ids.
97
  bool PickStatusTabletId(const std::vector<TabletId>& tablets,
98
411k
                          const PickStatusTabletCallback& callback) REQUIRES_SHARED(mutex_) {
99
411k
    if (tablets.empty()) {
100
0
      return false;
101
0
    }
102
411k
    if (local_tablet_filter_) {
103
411k
      std::vector<const TabletId*> ids;
104
411k
      ids.reserve(tablets.size());
105
8.48M
      for (const auto& id : tablets) {
106
8.48M
        ids.push_back(&id);
107
8.48M
      }
108
411k
      local_tablet_filter_(&ids);
109
411k
      if (!ids.empty()) {
110
399k
        callback(*RandomElement(ids));
111
399k
        return true;
112
399k
      }
113
11.2k
      return false;
114
411k
    }
115
17
    callback(RandomElement(tablets));
116
17
    return true;
117
411k
  }
118
119
  const std::vector<TabletId>& PickTabletList(TransactionLocality locality)
120
411k
      REQUIRES_SHARED(mutex_) {
121
411k
    if (tablets_.placement_local_tablets.empty()) {
122
410k
      return tablets_.global_tablets;
123
410k
    }
124
264
    switch (locality) {
125
239
      case TransactionLocality::GLOBAL:
126
239
        return tablets_.global_tablets;
127
25
      case TransactionLocality::LOCAL:
128
25
        return tablets_.placement_local_tablets;
129
264
    }
130
0
    FATAL_INVALID_ENUM_VALUE(TransactionLocality, locality);
131
0
  }
132
133
  LocalTabletFilter local_tablet_filter_;
134
135
  // Set to true once transaction tablets have been loaded at least once. global_tablets
136
  // is assumed to have at least one entry in it if this is true.
137
  std::atomic<bool> initialized_{false};
138
139
  // Set to true if there are any placement local transaction tablets.
140
  std::atomic<bool> has_placement_local_tablets_{false};
141
142
  // Locks the version/tablet lists. A read lock is acquired when picking
143
  // tablets, and a write lock is acquired when updating tablet lists.
144
  RWMutex mutex_;
145
146
  uint64_t status_tablets_version_ GUARDED_BY(mutex_) = 0;
147
148
  TransactionStatusTablets tablets_ GUARDED_BY(mutex_);
149
};
150
151
// Loads transaction tablets list to cache.
152
class LoadStatusTabletsTask {
153
 public:
154
  LoadStatusTabletsTask(YBClient* client,
155
                        TransactionTableState* table_state,
156
                        uint64_t version,
157
                        PickStatusTabletCallback callback = PickStatusTabletCallback(),
158
                        TransactionLocality locality = TransactionLocality::GLOBAL)
159
      : client_(client), table_state_(table_state), version_(version), callback_(callback),
160
3.36k
        locality_(locality) {
161
3.36k
  }
162
163
3.36k
  void Run() {
164
    // TODO(dtxn) async
165
3.36k
    auto tablets = GetTransactionStatusTablets();
166
3.36k
    if (!tablets.ok()) {
167
0
      YB_LOG_EVERY_N_SECS(ERROR, 1) << "Failed to get tablets of txn status tables: "
168
0
                                    << tablets.status();
169
0
      if (callback_) {
170
0
        callback_(tablets.status());
171
0
      }
172
0
      return;
173
0
    }
174
175
3.36k
    table_state_->UpdateStatusTablets(version_, std::move(*tablets));
176
177
3.36k
    if (callback_) {
178
2.13k
      table_state_->InvokeCallback(callback_, locality_);
179
2.13k
    }
180
3.36k
  }
181
182
3.31k
  void Done(const Status& status) {
183
3.31k
    if (!status.ok()) {
184
0
      callback_(status);
185
0
    }
186
3.31k
    callback_ = PickStatusTabletCallback();
187
3.31k
    client_ = nullptr;
188
3.31k
  }
189
190
 private:
191
3.36k
  Result<TransactionStatusTablets> GetTransactionStatusTablets() {
192
3.36k
    CloudInfoPB this_pb = yb::server::GetPlacementFromGFlags();
193
3.36k
    return client_->GetTransactionStatusTablets(this_pb);
194
3.36k
  }
195
196
  YBClient* client_;
197
  TransactionTableState* table_state_;
198
  uint64_t version_;
199
  PickStatusTabletCallback callback_;
200
  TransactionLocality locality_;
201
};
202
203
class InvokeCallbackTask {
204
 public:
205
  InvokeCallbackTask(TransactionTableState* table_state,
206
                     PickStatusTabletCallback callback,
207
                     TransactionLocality locality)
208
0
      : table_state_(table_state), callback_(std::move(callback)), locality_(locality) {
209
0
  }
210
211
0
  void Run() {
212
0
    table_state_->InvokeCallback(callback_, locality_);
213
0
  }
214
215
0
  void Done(const Status& status) {
216
0
    if (!status.ok()) {
217
0
      callback_(status);
218
0
    }
219
0
    callback_ = PickStatusTabletCallback();
220
0
  }
221
222
 private:
223
  TransactionTableState* table_state_;
224
  PickStatusTabletCallback callback_;
225
  TransactionLocality locality_;
226
};
227
} // namespace
228
229
// Implementation of TransactionManager. Owns the cached transaction status tablet state,
// the thread pool and task pools used to load/pick status tablets, the clock reference and
// the set of outstanding RPCs.
class TransactionManager::Impl {
 public:
  // 'clock' must be non-null (CHECKed). The thread pool and both task pools are sized from
  // the transaction_manager_* gflags.
  explicit Impl(YBClient* client, const scoped_refptr<ClockBase>& clock,
                LocalTabletFilter local_tablet_filter)
      : client_(client),
        clock_(clock),
        table_state_{std::move(local_tablet_filter)},
        thread_pool_(
            "TransactionManager", FLAGS_transaction_manager_queue_limit,
            FLAGS_transaction_manager_workers_limit),
        tasks_pool_(FLAGS_transaction_manager_queue_limit),
        invoke_callback_tasks_(FLAGS_transaction_manager_queue_limit) {
    CHECK(clock);
  }

  ~Impl() {
    Shutdown();
  }

  // Schedules a reload of the status tablets if the cached version is older than 'version'.
  // A failure to enqueue the reload is only logged.
  void UpdateTransactionTablesVersion(uint64_t version) {
    if (table_state_.GetStatusTabletsVersion() >= version) {
      return;
    }

    if (!tasks_pool_.Enqueue(&thread_pool_, client_, &table_state_, version)) {
      YB_LOG_EVERY_N_SECS(ERROR, 1) << "Update tasks overflow, number of tasks: "
                                    << tasks_pool_.size();
    }
  }

  // Picks a status tablet for 'locality' and reports it (or an error) through 'callback'.
  //  - Tablets already loaded: the callback is invoked inline when the current thread is
  //    allowed to wait, otherwise the work is handed to the thread pool.
  //  - Tablets not loaded yet: a load task (version 0) carrying the callback is enqueued.
  // If a task cannot be enqueued, 'callback' receives a ServiceUnavailable status.
  void PickStatusTablet(PickStatusTabletCallback callback, TransactionLocality locality) {
    if (table_state_.IsInitialized()) {
      if (ThreadRestrictions::IsWaitAllowed()) {
        table_state_.InvokeCallback(callback, locality);
      } else if (!invoke_callback_tasks_.Enqueue(
            &thread_pool_, &table_state_, callback, locality)) {
        callback(STATUS_FORMAT(ServiceUnavailable,
                               "Invoke callback queue overflow, number of tasks: $0",
                               invoke_callback_tasks_.size()));
      }
      return;
    }

    // 'callback' is passed by copy on purpose: it is still needed below if Enqueue fails.
    if (!tasks_pool_.Enqueue(
        &thread_pool_, client_, &table_state_, 0 /* version */, callback,
        locality)) {
      callback(STATUS_FORMAT(ServiceUnavailable, "Tasks overflow, exists: $0", tasks_pool_.size()));
    }
  }

  const scoped_refptr<ClockBase>& clock() const {
    return clock_;
  }

  YBClient* client() const {
    return client_;
  }

  rpc::Rpcs& rpcs() {
    return rpcs_;
  }

  HybridTime Now() const {
    return clock_->Now();
  }

  HybridTimeRange NowRange() const {
    return clock_->NowRange();
  }

  void UpdateClock(HybridTime time) {
    clock_->Update(time);
  }

  // Shuts down outstanding RPCs first, then the thread pool. Also called by the destructor.
  void Shutdown() {
    rpcs_.Shutdown();
    thread_pool_.Shutdown();
  }

  // Returns true if the cached tablet state contains placement local status tablets.
  bool PlacementLocalTransactionsPossible() {
    return table_state_.HasAnyPlacementLocalStatusTablets();
  }

  // Returns the version of the status tablet lists currently held in the cache.
  uint64_t GetLoadedStatusTabletsVersion() {
    return table_state_.GetStatusTabletsVersion();
  }

 private:
  YBClient* const client_;
  scoped_refptr<ClockBase> clock_;
  TransactionTableState table_state_;

  yb::rpc::ThreadPool thread_pool_; // TODO async operations instead of pool
  yb::rpc::TasksPool<LoadStatusTabletsTask> tasks_pool_;
  yb::rpc::TasksPool<InvokeCallbackTask> invoke_callback_tasks_;
  yb::rpc::Rpcs rpcs_;
};
327
328
// Creates the implementation object, handing over the client, the clock and the
// (moved) local tablet filter.
TransactionManager::TransactionManager(
    YBClient* client,
    const scoped_refptr<ClockBase>& clock,
    LocalTabletFilter local_tablet_filter)
    : impl_(new Impl(client, clock, std::move(local_tablet_filter))) {
}
332
333
6.71k
// Defaulted here, after the definition of Impl earlier in this file.
TransactionManager::~TransactionManager() = default;
334
335
88.5k
void TransactionManager::UpdateTransactionTablesVersion(uint64_t version) {
336
88.5k
  impl_->UpdateTransactionTablesVersion(version);
337
88.5k
}
338
339
void TransactionManager::PickStatusTablet(
340
411k
    PickStatusTabletCallback callback, TransactionLocality locality) {
341
411k
  impl_->PickStatusTablet(std::move(callback), locality);
342
411k
}
343
344
1.95M
// Returns the client pointer held by the implementation object.
YBClient* TransactionManager::client() const {
  YBClient* const result = impl_->client();
  return result;
}
347
348
3.43M
// Returns a reference to the Rpcs object owned by the implementation object.
rpc::Rpcs& TransactionManager::rpcs() {
  auto& impl = *impl_;
  return impl.rpcs();
}
351
352
633k
// Returns a reference to the clock pointer held by the implementation object.
const scoped_refptr<ClockBase>& TransactionManager::clock() const {
  const auto& impl = *impl_;
  return impl.clock();
}
355
356
876k
// Forwards to Impl::Now, which reads the current time from the clock.
HybridTime TransactionManager::Now() const {
  const auto& impl = *impl_;
  return impl.Now();
}
359
360
0
// Forwards to Impl::NowRange, which queries the clock.
HybridTimeRange TransactionManager::NowRange() const {
  const auto& impl = *impl_;
  return impl.NowRange();
}
363
364
866k
// Forwards to Impl::UpdateClock, which passes 'time' to the clock's Update().
void TransactionManager::UpdateClock(HybridTime time) {
  auto& impl = *impl_;
  impl.UpdateClock(time);
}
367
368
266k
bool TransactionManager::PlacementLocalTransactionsPossible() {
369
266k
  return impl_->PlacementLocalTransactionsPossible();
370
266k
}
371
372
0
uint64_t TransactionManager::GetLoadedStatusTabletsVersion() {
373
0
  return impl_->GetLoadedStatusTabletsVersion();
374
0
}
375
376
0
// Defaulted move constructor, defined here after the definition of Impl in this file.
TransactionManager::TransactionManager(TransactionManager&& rhs) = default;
377
0
// Defaulted move assignment, defined here after the definition of Impl in this file.
TransactionManager& TransactionManager::operator=(TransactionManager&& rhs) = default;
378
379
} // namespace client
380
} // namespace yb