/Users/deen/code/yugabyte-db/src/yb/client/table.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/table.h" |
15 | | |
16 | | #include "yb/client/client.h" |
17 | | #include "yb/client/table_info.h" |
18 | | #include "yb/client/yb_op.h" |
19 | | |
20 | | #include "yb/gutil/casts.h" |
21 | | |
22 | | #include "yb/master/master_client.pb.h" |
23 | | |
24 | | #include "yb/util/logging.h" |
25 | | #include "yb/util/result.h" |
26 | | #include "yb/util/shared_lock.h" |
27 | | #include "yb/util/status_format.h" |
28 | | #include "yb/util/unique_lock.h" |
29 | | |
30 | | DEFINE_int32( |
31 | | max_num_tablets_for_table, 5000, |
32 | | "Max number of tablets that can be specified in a CREATE TABLE statement"); |
33 | | |
34 | | namespace yb { |
35 | | namespace client { |
36 | | |
37 | 140k | Result<YBTableType> PBToClientTableType(TableType table_type_from_pb) { |
38 | 140k | switch (table_type_from_pb) { |
39 | 49.5k | case TableType::YQL_TABLE_TYPE: |
40 | 49.5k | return YBTableType::YQL_TABLE_TYPE; |
41 | 700 | case TableType::REDIS_TABLE_TYPE: |
42 | 700 | return YBTableType::REDIS_TABLE_TYPE; |
43 | 90.4k | case TableType::PGSQL_TABLE_TYPE: |
44 | 90.4k | return YBTableType::PGSQL_TABLE_TYPE; |
45 | 1 | case TableType::TRANSACTION_STATUS_TABLE_TYPE: |
46 | 1 | return YBTableType::TRANSACTION_STATUS_TABLE_TYPE; |
47 | 140k | } |
48 | | |
49 | 0 | return STATUS_FORMAT( |
50 | 140k | InvalidArgument, "Invalid table type from master response: $0", table_type_from_pb); |
51 | 140k | } |
52 | | |
53 | 6.95k | TableType ClientToPBTableType(YBTableType table_type) { |
54 | 6.95k | switch (table_type) { |
55 | 1.79k | case YBTableType::YQL_TABLE_TYPE: |
56 | 1.79k | return TableType::YQL_TABLE_TYPE; |
57 | 13 | case YBTableType::REDIS_TABLE_TYPE: |
58 | 13 | return TableType::REDIS_TABLE_TYPE; |
59 | 5.14k | case YBTableType::PGSQL_TABLE_TYPE: |
60 | 5.14k | return TableType::PGSQL_TABLE_TYPE; |
61 | 0 | case YBTableType::TRANSACTION_STATUS_TABLE_TYPE: |
62 | 0 | return TableType::TRANSACTION_STATUS_TABLE_TYPE; |
63 | 0 | case YBTableType::UNKNOWN_TABLE_TYPE: |
64 | 0 | break; |
65 | 6.95k | } |
66 | 0 | FATAL_INVALID_ENUM_VALUE(YBTableType, table_type); |
67 | | // Returns a dummy value to avoid compilation warning. |
68 | 0 | return TableType::DEFAULT_TABLE_TYPE; |
69 | 6.95k | } |
70 | | |
71 | | YBTable::YBTable(const YBTableInfo& info, VersionedTablePartitionListPtr partitions) |
72 | 134k | : info_(std::make_unique<YBTableInfo>(info)), partitions_(std::move(partitions)) { |
73 | 134k | } |
74 | | |
75 | 50.6k | YBTable::~YBTable() { |
76 | 50.6k | } |
77 | | |
78 | | //-------------------------------------------------------------------------------------------------- |
79 | | |
80 | 17.1M | const YBTableName& YBTable::name() const { |
81 | 17.1M | return info_->table_name; |
82 | 17.1M | } |
83 | | |
84 | 12.5M | YBTableType YBTable::table_type() const { |
85 | 12.5M | return info_->table_type; |
86 | 12.5M | } |
87 | | |
88 | 35.3M | const string& YBTable::id() const { |
89 | 35.3M | return info_->table_id; |
90 | 35.3M | } |
91 | | |
92 | 30.0M | const YBSchema& YBTable::schema() const { |
93 | 30.0M | return info_->schema; |
94 | 30.0M | } |
95 | | |
96 | 3.18M | const Schema& YBTable::InternalSchema() const { |
97 | 3.18M | return internal::GetSchema(info_->schema); |
98 | 3.18M | } |
99 | | |
100 | 4.99M | const IndexMap& YBTable::index_map() const { |
101 | 4.99M | return info_->index_map; |
102 | 4.99M | } |
103 | | |
104 | 485k | bool YBTable::IsIndex() const { |
105 | 485k | return info_->index_info != boost::none; |
106 | 485k | } |
107 | | |
108 | 4.91k | bool YBTable::IsUniqueIndex() const { |
109 | 4.91k | return info_->index_info.is_initialized() && info_->index_info->is_unique(); |
110 | 4.91k | } |
111 | | |
112 | 1.40k | const IndexInfo& YBTable::index_info() const { |
113 | 1.40k | static IndexInfo kEmptyIndexInfo; |
114 | 1.40k | if (info_->index_info) { |
115 | 1.40k | return *info_->index_info; |
116 | 1.40k | } |
117 | 1 | return kEmptyIndexInfo; |
118 | 1.40k | } |
119 | | |
120 | 0 | bool YBTable::colocated() const { |
121 | 0 | return info_->colocated; |
122 | 0 | } |
123 | | |
124 | 6 | const boost::optional<master::ReplicationInfoPB>& YBTable::replication_info() const { |
125 | 6 | return info_->replication_info; |
126 | 6 | } |
127 | | |
128 | 1 | std::string YBTable::ToString() const { |
129 | 1 | return Format( |
130 | 1 | "$0 $1 IndexInfo: $2 IndexMap $3", (IsIndex() ? "Index Table"0 : "Normal Table"), id(), |
131 | 1 | yb::ToString(index_info()), yb::ToString(index_map())); |
132 | 1 | } |
133 | | |
134 | 35.0M | const PartitionSchema& YBTable::partition_schema() const { |
135 | 35.0M | return info_->partition_schema; |
136 | 35.0M | } |
137 | | |
138 | 0 | bool YBTable::IsHashPartitioned() const { |
139 | | // TODO(neil) After fixing github #5832, "partition_schema" must be used here. |
140 | | // return info_.partition_schema.IsHashPartitioning(); |
141 | 0 | return info_->schema.num_hash_key_columns() > 0; |
142 | 0 | } |
143 | | |
144 | 0 | bool YBTable::IsRangePartitioned() const { |
145 | | // TODO(neil) After fixing github #5832, "partition_schema" must be used here. |
146 | | // return info_.partition_schema.IsRangePartitioning(); |
147 | 0 | return info_->schema.num_hash_key_columns() == 0; |
148 | 0 | } |
149 | | |
150 | 3.23k | std::shared_ptr<const TablePartitionList> YBTable::GetPartitionsShared() const { |
151 | 3.23k | SharedLock<decltype(mutex_)> lock(mutex_); |
152 | 3.23k | return std::shared_ptr<const TablePartitionList>(partitions_, &partitions_->keys); |
153 | 3.23k | } |
154 | | |
155 | 23.3M | VersionedTablePartitionListPtr YBTable::GetVersionedPartitions() const { |
156 | 23.3M | SharedLock<decltype(mutex_)> lock(mutex_); |
157 | 23.3M | return partitions_; |
158 | 23.3M | } |
159 | | |
160 | 70 | TablePartitionList YBTable::GetPartitionsCopy() const { |
161 | 70 | TablePartitionList result; |
162 | | |
163 | 70 | SharedLock<decltype(mutex_)> lock(mutex_); |
164 | 70 | result.reserve(partitions_->keys.size()); |
165 | 630 | for (const auto& key : partitions_->keys) { |
166 | 630 | result.push_back(key); |
167 | 630 | } |
168 | 70 | return result; |
169 | 70 | } |
170 | | |
171 | 0 | int32_t YBTable::GetPartitionCount() const { |
172 | 0 | SharedLock<decltype(mutex_)> lock(mutex_); |
173 | 0 | return narrow_cast<int32_t>(partitions_->keys.size()); |
174 | 0 | } |
175 | | |
176 | 0 | int32_t YBTable::GetPartitionListVersion() const { |
177 | 0 | SharedLock<decltype(mutex_)> lock(mutex_); |
178 | 0 | return partitions_->version; |
179 | 0 | } |
180 | | |
181 | | //-------------------------------------------------------------------------------------------------- |
182 | | |
183 | 41.1k | std::unique_ptr<YBqlWriteOp> YBTable::NewQLWrite() { |
184 | 41.1k | return std::unique_ptr<YBqlWriteOp>(new YBqlWriteOp(shared_from_this())); |
185 | 41.1k | } |
186 | | |
187 | 2.02M | std::unique_ptr<YBqlWriteOp> YBTable::NewQLInsert() { |
188 | 2.02M | return YBqlWriteOp::NewInsert(shared_from_this()); |
189 | 2.02M | } |
190 | | |
191 | 3.46k | std::unique_ptr<YBqlWriteOp> YBTable::NewQLUpdate() { |
192 | 3.46k | return YBqlWriteOp::NewUpdate(shared_from_this()); |
193 | 3.46k | } |
194 | | |
195 | 1.16k | std::unique_ptr<YBqlWriteOp> YBTable::NewQLDelete() { |
196 | 1.16k | return YBqlWriteOp::NewDelete(shared_from_this()); |
197 | 1.16k | } |
198 | | |
199 | 7.49M | std::unique_ptr<YBqlReadOp> YBTable::NewQLSelect() { |
200 | 7.49M | return YBqlReadOp::NewSelect(shared_from_this()); |
201 | 7.49M | } |
202 | | |
203 | 960 | std::unique_ptr<YBqlReadOp> YBTable::NewQLRead() { |
204 | 960 | return std::unique_ptr<YBqlReadOp>(new YBqlReadOp(shared_from_this())); |
205 | 960 | } |
206 | | |
207 | 8 | size_t YBTable::FindPartitionStartIndex(const PartitionKey& partition_key, size_t group_by) const { |
208 | 8 | SharedLock<decltype(mutex_)> lock(mutex_); |
209 | 8 | return client::FindPartitionStartIndex(partitions_->keys, partition_key, group_by); |
210 | 8 | } |
211 | | |
212 | | PartitionKeyPtr YBTable::FindPartitionStart( |
213 | 8 | const PartitionKey& partition_key, size_t group_by) const { |
214 | 8 | SharedLock<decltype(mutex_)> lock(mutex_); |
215 | 8 | size_t idx = FindPartitionStartIndex(partition_key, group_by); |
216 | 8 | return std::shared_ptr<const std::string>(partitions_, &partitions_->keys[idx]); |
217 | 8 | } |
218 | | |
219 | 32 | void YBTable::InvokeRefreshPartitionsCallbacks(const Status& status) { |
220 | 32 | std::vector<StdStatusCallback> callbacks; |
221 | 32 | { |
222 | 32 | UniqueLock<decltype(refresh_partitions_callbacks_mutex_)> lock( |
223 | 32 | refresh_partitions_callbacks_mutex_); |
224 | 32 | refresh_partitions_callbacks_.swap(callbacks); |
225 | 32 | } |
226 | 19.2k | for (auto& callback : callbacks) { |
227 | 19.2k | callback(status); |
228 | 19.2k | } |
229 | 32 | } |
230 | | |
231 | 19.2k | void YBTable::RefreshPartitions(YBClient* client, StdStatusCallback callback) { |
232 | 19.2k | UniqueLock<decltype(refresh_partitions_callbacks_mutex_)> lock( |
233 | 19.2k | refresh_partitions_callbacks_mutex_); |
234 | 19.2k | bool was_empty = refresh_partitions_callbacks_.empty(); |
235 | 19.2k | refresh_partitions_callbacks_.emplace_back(std::move(callback)); |
236 | 19.2k | if (!was_empty) { |
237 | 19.1k | VLOG_WITH_FUNC0 (2) << Format( |
238 | 0 | "FetchPartitions is in progress for table $0 ($1), added callback", info_->table_name, |
239 | 0 | info_->table_id); |
240 | 19.1k | return; |
241 | 19.1k | } |
242 | | |
243 | 33 | VLOG_WITH_FUNC0 (2) << Format( |
244 | 0 | "Calling FetchPartitions for table $0 ($1)", info_->table_name, info_->table_id); |
245 | 33 | FetchPartitions(client, info_->table_id, [this](const FetchPartitionsResult& result) { |
246 | 32 | if (!result.ok()) { |
247 | 0 | InvokeRefreshPartitionsCallbacks(result.status()); |
248 | 0 | return; |
249 | 0 | } |
250 | 32 | const auto& partitions = *result; |
251 | 32 | { |
252 | 32 | std::lock_guard<rw_spinlock> partitions_lock(mutex_); |
253 | 32 | if (partitions->version < partitions_->version) { |
254 | | // This might happen if another split happens after we had fetched partition in the current |
255 | | // thread from master leader and partition list has been concurrently updated to version |
256 | | // newer than version we got in current thread. |
257 | | // In this case we can safely skip outdated partition list. |
258 | 0 | LOG(INFO) << Format( |
259 | 0 | "Received table $0 partition list version: $1, ours is newer: $2", id(), |
260 | 0 | partitions->version, partitions_->version); |
261 | 0 | return; |
262 | 0 | } |
263 | 32 | partitions_ = partitions; |
264 | 32 | partitions_are_stale_ = false; |
265 | 32 | } |
266 | 0 | InvokeRefreshPartitionsCallbacks(Status::OK()); |
267 | 32 | }); |
268 | 33 | } |
269 | | |
270 | 2.05k | void YBTable::MarkPartitionsAsStale() { |
271 | 2.05k | partitions_are_stale_ = true; |
272 | 2.05k | } |
273 | | |
274 | 23.2M | bool YBTable::ArePartitionsStale() const { |
275 | 23.2M | return partitions_are_stale_; |
276 | 23.2M | } |
277 | | |
278 | | void YBTable::FetchPartitions( |
279 | 134k | YBClient* client, const TableId& table_id, FetchPartitionsCallback callback) { |
280 | | // TODO: fetch the schema from the master here once catalog is available. |
281 | | // TODO(tsplit): consider optimizing this to not wait for all tablets to be running in case |
282 | | // of some tablet has been split and post-split tablets are not yet running. |
283 | 134k | client->GetTableLocations( |
284 | 134k | table_id, /* max_tablets = */ std::numeric_limits<int32_t>::max(), |
285 | 134k | RequireTabletsRunning::kTrue, |
286 | 134k | [table_id, callback = std::move(callback)] |
287 | 134k | (const Result<master::GetTableLocationsResponsePB*>& result) { |
288 | 134k | if (!result.ok()) { |
289 | 226 | callback(result.status()); |
290 | 226 | return; |
291 | 226 | } |
292 | 134k | const auto& resp = **result; |
293 | | |
294 | 134k | VLOG_WITH_FUNC1 (2) << Format( |
295 | 1 | "Fetched partitions for table $0, found $1 tablets", |
296 | 1 | table_id, resp.tablet_locations_size()); |
297 | | |
298 | 134k | auto partitions = std::make_shared<VersionedTablePartitionList>(); |
299 | 134k | partitions->version = resp.partition_list_version(); |
300 | 134k | partitions->keys.reserve(resp.tablet_locations().size()); |
301 | 212k | for (const auto& tablet_location : resp.tablet_locations()) { |
302 | 212k | partitions->keys.push_back(tablet_location.partition().partition_key_start()); |
303 | 212k | } |
304 | 134k | std::sort(partitions->keys.begin(), partitions->keys.end()); |
305 | | |
306 | 134k | callback(partitions); |
307 | 134k | }); |
308 | 134k | } |
309 | | |
310 | | //-------------------------------------------------------------------------------------------------- |
311 | | |
312 | | size_t FindPartitionStartIndex(const TablePartitionList& partitions, |
313 | | const PartitionKey& partition_key, |
314 | 48.2M | size_t group_by) { |
315 | 48.2M | auto it = std::lower_bound(partitions.begin(), partitions.end(), partition_key); |
316 | 48.2M | if (it == partitions.end() || *it > partition_key39.7M ) { |
317 | 23.9M | DCHECK(it != partitions.begin()) << "Could not find partition start while looking for " |
318 | 4.74k | << partition_key << " in " << yb::ToString(partitions); |
319 | 23.9M | --it; |
320 | 23.9M | } |
321 | 48.2M | return group_by <= 1 ? it - partitions.begin()48.1M : |
322 | 48.2M | (it - partitions.begin()) / group_by * group_by111k ; |
323 | 48.2M | } |
324 | | |
325 | | PartitionKeyPtr FindPartitionStart( |
326 | | const VersionedTablePartitionListPtr& versioned_partitions, const PartitionKey& partition_key, |
327 | 46.7M | size_t group_by) { |
328 | 46.7M | const auto idx = FindPartitionStartIndex(versioned_partitions->keys, partition_key, group_by); |
329 | 46.7M | return PartitionKeyPtr(versioned_partitions, &versioned_partitions->keys[idx]); |
330 | 46.7M | } |
331 | | |
332 | 0 | std::string VersionedTablePartitionList::ToString() const { |
333 | 0 | auto key_transform = [](const Slice& key) { |
334 | 0 | return key.ToDebugHexString(); |
335 | 0 | }; |
336 | 0 | return Format("{ version: $0 keys: $1 }", version, CollectionToString(keys, key_transform)); |
337 | 0 | } |
338 | | |
339 | | } // namespace client |
340 | | } // namespace yb |