YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/table.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/table.h"
15
16
#include "yb/client/client.h"
17
#include "yb/client/table_info.h"
18
#include "yb/client/yb_op.h"
19
20
#include "yb/gutil/casts.h"
21
22
#include "yb/master/master_client.pb.h"
23
24
#include "yb/util/logging.h"
25
#include "yb/util/result.h"
26
#include "yb/util/shared_lock.h"
27
#include "yb/util/status_format.h"
28
#include "yb/util/unique_lock.h"
29
30
// Flag: maximum tablet count accepted in a CREATE TABLE statement (default 5000).
DEFINE_int32(max_num_tablets_for_table, 5000,
             "Max number of tablets that can be specified in a CREATE TABLE statement");
33
34
namespace yb {
35
namespace client {
36
37
152k
// Converts the protobuf TableType received from the master into the client-side YBTableType.
// Returns an InvalidArgument status when the value matches none of the handled enumerators.
Result<YBTableType> PBToClientTableType(TableType table_type_from_pb) {
  // The switch has no default label; any unmatched value falls out to the error return below.
  switch (table_type_from_pb) {
    case TableType::YQL_TABLE_TYPE: return YBTableType::YQL_TABLE_TYPE;
    case TableType::REDIS_TABLE_TYPE: return YBTableType::REDIS_TABLE_TYPE;
    case TableType::PGSQL_TABLE_TYPE: return YBTableType::PGSQL_TABLE_TYPE;
    case TableType::TRANSACTION_STATUS_TABLE_TYPE:
      return YBTableType::TRANSACTION_STATUS_TABLE_TYPE;
  }

  return STATUS_FORMAT(
      InvalidArgument, "Invalid table type from master response: $0", table_type_from_pb);
}
52
53
3.15k
// Converts the client-side YBTableType into the protobuf TableType.
// UNKNOWN_TABLE_TYPE (and any value not listed) reaches FATAL_INVALID_ENUM_VALUE.
TableType ClientToPBTableType(YBTableType table_type) {
  switch (table_type) {
    case YBTableType::YQL_TABLE_TYPE: return TableType::YQL_TABLE_TYPE;
    case YBTableType::REDIS_TABLE_TYPE: return TableType::REDIS_TABLE_TYPE;
    case YBTableType::PGSQL_TABLE_TYPE: return TableType::PGSQL_TABLE_TYPE;
    case YBTableType::TRANSACTION_STATUS_TABLE_TYPE:
      return TableType::TRANSACTION_STATUS_TABLE_TYPE;
    case YBTableType::UNKNOWN_TABLE_TYPE:
      break;
  }
  FATAL_INVALID_ENUM_VALUE(YBTableType, table_type);
  // Dummy return value; present only to avoid a compilation warning.
  return TableType::DEFAULT_TABLE_TYPE;
}
70
71
// Stores a private heap copy of `info` and takes over the supplied partition list pointer.
YBTable::YBTable(const YBTableInfo& info, VersionedTablePartitionListPtr partitions)
    : info_(std::make_unique<YBTableInfo>(info)),
      partitions_(std::move(partitions)) {}
74
75
89.4k
// Out-of-line destructor; performs no work beyond destroying the members.
YBTable::~YBTable() = default;
77
78
//--------------------------------------------------------------------------------------------------
79
80
8.68M
// Returns the table name held in this table's YBTableInfo.
const YBTableName& YBTable::name() const {
  const YBTableInfo& table_info = *info_;
  return table_info.table_name;
}
83
84
6.37M
// Returns the table type held in this table's YBTableInfo.
YBTableType YBTable::table_type() const {
  const YBTableInfo& table_info = *info_;
  return table_info.table_type;
}
87
88
16.1M
// Returns the table id held in this table's YBTableInfo.
const string& YBTable::id() const {
  const YBTableInfo& table_info = *info_;
  return table_info.table_id;
}
91
92
21.5M
// Returns the client-facing schema held in this table's YBTableInfo.
const YBSchema& YBTable::schema() const {
  const YBTableInfo& table_info = *info_;
  return table_info.schema;
}
95
96
113M
// Returns the internal Schema obtained from the client-facing schema via internal::GetSchema.
const Schema& YBTable::InternalSchema() const {
  const auto& client_schema = info_->schema;
  return internal::GetSchema(client_schema);
}
99
100
3.53M
// Returns the index map held in this table's YBTableInfo.
const IndexMap& YBTable::index_map() const {
  const YBTableInfo& table_info = *info_;
  return table_info.index_map;
}
103
104
478k
bool YBTable::IsIndex() const {
105
478k
  return info_->index_info != boost::none;
106
478k
}
107
108
9.54k
bool YBTable::IsUniqueIndex() const {
109
9.54k
  return info_->index_info.is_initialized() && info_->index_info->is_unique();
110
9.54k
}
111
112
1.34k
// Returns the stored IndexInfo when present; otherwise a reference to a shared,
// default-constructed IndexInfo.
const IndexInfo& YBTable::index_info() const {
  const auto& maybe_index = info_->index_info;
  if (!maybe_index) {
    static IndexInfo kEmptyIndexInfo;
    return kEmptyIndexInfo;
  }
  return *maybe_index;
}
119
120
1.91k
bool YBTable::colocated() const {
121
1.91k
  return info_->colocated;
122
1.91k
}
123
124
6
// Returns the optional replication info held in this table's YBTableInfo (may be empty).
const boost::optional<master::ReplicationInfoPB>& YBTable::replication_info() const {
  const YBTableInfo& table_info = *info_;
  return table_info.replication_info;
}
127
128
0
std::string YBTable::ToString() const {
129
0
  return Format(
130
0
      "$0 $1 IndexInfo: $2 IndexMap $3", (IsIndex() ? "Index Table" : "Normal Table"), id(),
131
0
      yb::ToString(index_info()), yb::ToString(index_map()));
132
0
}
133
134
23.6M
// Returns the partition schema held in this table's YBTableInfo.
const PartitionSchema& YBTable::partition_schema() const {
  const YBTableInfo& table_info = *info_;
  return table_info.partition_schema;
}
137
138
0
bool YBTable::IsHashPartitioned() const {
139
  // TODO(neil) After fixing github #5832, "partition_schema" must be used here.
140
  // return info_.partition_schema.IsHashPartitioning();
141
0
  return info_->schema.num_hash_key_columns() > 0;
142
0
}
143
144
0
bool YBTable::IsRangePartitioned() const {
145
  // TODO(neil) After fixing github #5832, "partition_schema" must be used here.
146
  // return info_.partition_schema.IsRangePartitioning();
147
0
  return info_->schema.num_hash_key_columns() == 0;
148
0
}
149
150
69
std::shared_ptr<const TablePartitionList> YBTable::GetPartitionsShared() const {
151
69
  SharedLock<decltype(mutex_)> lock(mutex_);
152
69
  return std::shared_ptr<const TablePartitionList>(partitions_, &partitions_->keys);
153
69
}
154
155
11.1M
// Returns a copy of the current versioned partition list pointer, read under a shared lock.
VersionedTablePartitionListPtr YBTable::GetVersionedPartitions() const {
  SharedLock<decltype(mutex_)> guard(mutex_);
  VersionedTablePartitionListPtr snapshot = partitions_;
  return snapshot;
}
159
160
33
// Returns a by-value copy of the current partition start keys.
// The copy is made while the shared lock on mutex_ is held, so it reflects one consistent list.
TablePartitionList YBTable::GetPartitionsCopy() const {
  SharedLock<decltype(mutex_)> lock(mutex_);
  // Copy-construct the container directly instead of a manual reserve + push_back loop.
  return partitions_->keys;
}
170
171
0
int32_t YBTable::GetPartitionCount() const {
172
0
  SharedLock<decltype(mutex_)> lock(mutex_);
173
0
  return narrow_cast<int32_t>(partitions_->keys.size());
174
0
}
175
176
0
int32_t YBTable::GetPartitionListVersion() const {
177
0
  SharedLock<decltype(mutex_)> lock(mutex_);
178
0
  return partitions_->version;
179
0
}
180
181
//--------------------------------------------------------------------------------------------------
182
183
48.0k
std::unique_ptr<YBqlWriteOp> YBTable::NewQLWrite() {
184
48.0k
  return std::unique_ptr<YBqlWriteOp>(new YBqlWriteOp(shared_from_this()));
185
48.0k
}
186
187
1.29M
std::unique_ptr<YBqlWriteOp> YBTable::NewQLInsert() {
188
1.29M
  return YBqlWriteOp::NewInsert(shared_from_this());
189
1.29M
}
190
191
3.44k
std::unique_ptr<YBqlWriteOp> YBTable::NewQLUpdate() {
192
3.44k
  return YBqlWriteOp::NewUpdate(shared_from_this());
193
3.44k
}
194
195
1.11k
std::unique_ptr<YBqlWriteOp> YBTable::NewQLDelete() {
196
1.11k
  return YBqlWriteOp::NewDelete(shared_from_this());
197
1.11k
}
198
199
3.89M
std::unique_ptr<YBqlReadOp> YBTable::NewQLSelect() {
200
3.89M
  return YBqlReadOp::NewSelect(shared_from_this());
201
3.89M
}
202
203
620
std::unique_ptr<YBqlReadOp> YBTable::NewQLRead() {
204
620
  return std::unique_ptr<YBqlReadOp>(new YBqlReadOp(shared_from_this()));
205
620
}
206
207
0
size_t YBTable::FindPartitionStartIndex(const PartitionKey& partition_key, size_t group_by) const {
208
0
  SharedLock<decltype(mutex_)> lock(mutex_);
209
0
  return client::FindPartitionStartIndex(partitions_->keys, partition_key, group_by);
210
0
}
211
212
// Returns a pointer to the start key of the partition (or partition group, when group_by > 1)
// that contains `partition_key`. The returned pointer shares ownership with the partition list
// it points into, so it stays valid even if partitions_ is replaced afterwards.
PartitionKeyPtr YBTable::FindPartitionStart(
    const PartitionKey& partition_key, size_t group_by) const {
  SharedLock<decltype(mutex_)> lock(mutex_);
  // Call the free function directly rather than the member FindPartitionStartIndex(): the member
  // acquires mutex_ in shared mode itself, and re-acquiring a reader-writer lock on the same
  // thread can deadlock if a writer starts waiting between the two acquisitions.
  const size_t idx = client::FindPartitionStartIndex(partitions_->keys, partition_key, group_by);
  return std::shared_ptr<const std::string>(partitions_, &partitions_->keys[idx]);
}
218
219
12
void YBTable::InvokeRefreshPartitionsCallbacks(const Status& status) {
220
12
  std::vector<StdStatusCallback> callbacks;
221
12
  {
222
12
    UniqueLock<decltype(refresh_partitions_callbacks_mutex_)> lock(
223
12
        refresh_partitions_callbacks_mutex_);
224
12
    refresh_partitions_callbacks_.swap(callbacks);
225
12
  }
226
16.0k
  for (auto& callback : callbacks) {
227
16.0k
    callback(status);
228
16.0k
  }
229
12
}
230
231
16.0k
// Asynchronously refreshes the table's partition list and reports completion through `callback`.
//
// Callers are coalesced: `callback` is appended to the pending queue, and only the caller that
// found the queue empty starts a FetchPartitions request. When that request finishes, every
// queued callback is invoked with the outcome.
//
// NOTE(review): the completion lambda captures a raw `this`; this assumes the table outlives the
// in-flight fetch -- confirm against callers.
void YBTable::RefreshPartitions(YBClient* client, StdStatusCallback callback) {
  {
    // Hold the callbacks mutex only while touching the queue. It is released before the fetch is
    // issued, so a completion that runs inline (should that ever happen) cannot self-deadlock in
    // InvokeRefreshPartitionsCallbacks, which takes the same mutex.
    UniqueLock<decltype(refresh_partitions_callbacks_mutex_)> lock(
        refresh_partitions_callbacks_mutex_);
    const bool was_empty = refresh_partitions_callbacks_.empty();
    refresh_partitions_callbacks_.emplace_back(std::move(callback));
    if (!was_empty) {
      VLOG_WITH_FUNC(2) << Format(
          "FetchPartitions is in progress for table $0 ($1), added callback", info_->table_name,
          info_->table_id);
      return;
    }
  }

  VLOG_WITH_FUNC(2) << Format(
      "Calling FetchPartitions for table $0 ($1)", info_->table_name, info_->table_id);
  FetchPartitions(client, info_->table_id, [this](const FetchPartitionsResult& result) {
    if (!result.ok()) {
      InvokeRefreshPartitionsCallbacks(result.status());
      return;
    }
    const auto& partitions = *result;
    {
      std::lock_guard<rw_spinlock> partitions_lock(mutex_);
      if (partitions->version < partitions_->version) {
        // This might happen if another split happens after we had fetched partition in the current
        // thread from master leader and partition list has been concurrently updated to version
        // newer than version we got in current thread.
        // In this case we can safely skip outdated partition list.
        LOG(INFO) << Format(
            "Received table $0 partition list version: $1, ours is newer: $2", id(),
            partitions->version, partitions_->version);
      } else {
        partitions_ = partitions;
        partitions_are_stale_ = false;
      }
    }
    // Always complete the waiters, including when the fetched list was skipped as outdated.
    // Otherwise the queued callbacks would never fire, and because the queue would stay
    // non-empty, every later RefreshPartitions call would only enqueue and never fetch.
    InvokeRefreshPartitionsCallbacks(Status::OK());
  });
}
269
270
533
void YBTable::MarkPartitionsAsStale() {
271
533
  partitions_are_stale_ = true;
272
533
}
273
274
11.0M
bool YBTable::ArePartitionsStale() const {
275
11.0M
  return partitions_are_stale_;
276
11.0M
}
277
278
void YBTable::FetchPartitions(
279
85.4k
    YBClient* client, const TableId& table_id, FetchPartitionsCallback callback) {
280
  // TODO: fetch the schema from the master here once catalog is available.
281
  // TODO(tsplit): consider optimizing this to not wait for all tablets to be running in case
282
  // of some tablet has been split and post-split tablets are not yet running.
283
85.4k
  client->GetTableLocations(
284
85.4k
      table_id, /* max_tablets = */ std::numeric_limits<int32_t>::max(),
285
85.4k
      RequireTabletsRunning::kTrue,
286
85.4k
      [table_id, callback = std::move(callback)]
287
85.5k
          (const Result<master::GetTableLocationsResponsePB*>& result) {
288
85.5k
        if (!result.ok()) {
289
43
          callback(result.status());
290
43
          return;
291
43
        }
292
85.4k
        const auto& resp = **result;
293
294
0
        VLOG_WITH_FUNC(2) << Format(
295
0
            "Fetched partitions for table $0, found $1 tablets",
296
0
            table_id, resp.tablet_locations_size());
297
298
85.4k
        auto partitions = std::make_shared<VersionedTablePartitionList>();
299
85.4k
        partitions->version = resp.partition_list_version();
300
85.4k
        partitions->keys.reserve(resp.tablet_locations().size());
301
132k
        for (const auto& tablet_location : resp.tablet_locations()) {
302
132k
          partitions->keys.push_back(tablet_location.partition().partition_key_start());
303
132k
        }
304
85.4k
        std::sort(partitions->keys.begin(), partitions->keys.end());
305
306
85.4k
        callback(partitions);
307
85.4k
      });
308
85.4k
}
309
310
//--------------------------------------------------------------------------------------------------
311
312
size_t FindPartitionStartIndex(const TablePartitionList& partitions,
313
                               const PartitionKey& partition_key,
314
23.0M
                               size_t group_by) {
315
23.0M
  auto it = std::lower_bound(partitions.begin(), partitions.end(), partition_key);
316
23.0M
  if (it == partitions.end() || *it > partition_key) {
317
18.4E
    DCHECK(it != partitions.begin()) << "Could not find partition start while looking for "
318
18.4E
        << partition_key << " in " << yb::ToString(partitions);
319
11.4M
    --it;
320
11.4M
  }
321
22.9M
  return group_by <= 1 ? it - partitions.begin() :
322
60.6k
                         (it - partitions.begin()) / group_by * group_by;
323
23.0M
}
324
325
// Looks up the partition (group) start key for `partition_key` in `versioned_partitions` and
// returns a pointer to it that shares ownership with the whole versioned list.
PartitionKeyPtr FindPartitionStart(
    const VersionedTablePartitionListPtr& versioned_partitions, const PartitionKey& partition_key,
    size_t group_by) {
  const auto& keys = versioned_partitions->keys;
  const auto start_index = FindPartitionStartIndex(keys, partition_key, group_by);
  return PartitionKeyPtr(versioned_partitions, &keys[start_index]);
}
331
332
0
std::string VersionedTablePartitionList::ToString() const {
333
0
  auto key_transform = [](const Slice& key) {
334
0
    return key.ToDebugHexString();
335
0
  };
336
0
  return Format("{ version: $0 keys: $1 }", version, CollectionToString(keys, key_transform));
337
0
}
338
339
} // namespace client
340
} // namespace yb