/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_client.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/pggate/pg_client.h" |
15 | | |
16 | | #include "yb/client/client-internal.h" |
17 | | #include "yb/client/table.h" |
18 | | #include "yb/client/table_info.h" |
19 | | #include "yb/client/tablet_server.h" |
20 | | #include "yb/client/yb_table_name.h" |
21 | | |
22 | | #include "yb/gutil/casts.h" |
23 | | |
24 | | #include "yb/rpc/poller.h" |
25 | | #include "yb/rpc/rpc_controller.h" |
26 | | |
27 | | #include "yb/tserver/pg_client.pb.h" |
28 | | #include "yb/tserver/pg_client.proxy.h" |
29 | | #include "yb/tserver/tserver_shared_mem.h" |
30 | | |
31 | | #include "yb/util/logging.h" |
32 | | #include "yb/util/protobuf_util.h" |
33 | | #include "yb/util/result.h" |
34 | | #include "yb/util/scope_exit.h" |
35 | | #include "yb/util/shared_mem.h" |
36 | | #include "yb/util/status.h" |
37 | | |
38 | | #include "yb/yql/pggate/pg_op.h" |
39 | | #include "yb/yql/pggate/pg_tabledesc.h" |
40 | | |
41 | | DECLARE_bool(use_node_hostname_for_local_tserver); |
42 | | DECLARE_int32(backfill_index_client_rpc_timeout_ms); |
43 | | DECLARE_int32(yb_client_admin_operation_timeout_sec); |
44 | | |
45 | | DEFINE_uint64(pg_client_heartbeat_interval_ms, 10000, "Pg client heartbeat interval in ms."); |
46 | | |
47 | | using namespace std::literals; |
48 | | |
49 | | namespace yb { |
50 | | namespace pggate { |
51 | | |
52 | | namespace { |
53 | | |
54 | | // Adding this value to RPC call timeout, so postgres could detect timeout by its own mechanism |
55 | | // and report it. |
56 | | const auto kExtraTimeout = 2s; |
57 | | |
58 | | struct PerformData { |
59 | | PgsqlOps operations; |
60 | | tserver::PgPerformResponsePB resp; |
61 | | rpc::RpcController controller; |
62 | | PerformCallback callback; |
63 | | |
64 | 2.06M | CHECKED_STATUS Process() { |
65 | 2.06M | auto& responses = *resp.mutable_responses(); |
66 | 2.06M | SCHECK_EQ(implicit_cast<size_t>(responses.size()), operations.size(), RuntimeError, |
67 | 2.06M | Format("Wrong number of responses: $0, while $1 expected", |
68 | 2.06M | responses.size(), operations.size())); |
69 | 12.3M | for (uint32_t i = 0; 2.06M i != operations.size(); ++i10.3M ) { |
70 | 10.3M | if (responses[i].has_rows_data_sidecar()10.3M ) { |
71 | 10.3M | operations[i]->rows_data() = VERIFY_RESULT( |
72 | 10.3M | controller.GetSidecarPtr(responses[i].rows_data_sidecar())); |
73 | 10.3M | } |
74 | 10.3M | operations[i]->response() = std::move(responses[i]); |
75 | 10.3M | } |
76 | 2.06M | return Status::OK(); |
77 | 2.06M | } |
78 | | }; |
79 | | |
// Splits a CamelCase identifier into space-separated words by inserting a space before every
// uppercase letter except one at the very start, e.g. "CreateTable" -> "Create Table".
// `name` must be a NUL-terminated string.
std::string PrettyFunctionName(const char* name) {
  std::string result;
  for (const char* ch = name; *ch; ++ch) {
    // std::isupper has undefined behavior for negative arguments (other than EOF), which a plain
    // char can produce for non-ASCII bytes, so classify the byte as unsigned char.
    if (!result.empty() && std::isupper(static_cast<unsigned char>(*ch))) {
      result += ' ';
    }
    result += *ch;
  }
  return result;
}
90 | | |
91 | | } // namespace |
92 | | |
93 | | class PgClient::Impl { |
94 | | public: |
95 | 6.08k | Impl() : heartbeat_poller_(std::bind(&Impl::Heartbeat, this, false)) { |
96 | 6.08k | tablet_server_count_cache_.fill(0); |
97 | 6.08k | } |
98 | | |
99 | 6.06k | ~Impl() { |
100 | 6.06k | CHECK(!proxy_); |
101 | 6.06k | } |
102 | | |
103 | | CHECKED_STATUS Start(rpc::ProxyCache* proxy_cache, |
104 | | rpc::Scheduler* scheduler, |
105 | 6.09k | const tserver::TServerSharedObject& tserver_shared_object) { |
106 | 6.09k | CHECK_NOTNULL(&tserver_shared_object); |
107 | 6.09k | MonoDelta resolve_cache_timeout; |
108 | 6.09k | const auto& tserver_shared_data_ = *tserver_shared_object; |
109 | 6.09k | HostPort host_port(tserver_shared_data_.endpoint()); |
110 | 6.09k | if (FLAGS_use_node_hostname_for_local_tserver) { |
111 | 6 | host_port = HostPort(tserver_shared_data_.host().ToBuffer(), |
112 | 6 | tserver_shared_data_.endpoint().port()); |
113 | 6 | resolve_cache_timeout = MonoDelta::kMax; |
114 | 6 | } |
115 | 6.09k | LOG(INFO) << "Using TServer host_port: " << host_port; |
116 | 6.09k | proxy_ = std::make_unique<tserver::PgClientServiceProxy>( |
117 | 6.09k | proxy_cache, host_port, nullptr /* protocol */, resolve_cache_timeout); |
118 | | |
119 | 6.09k | auto future = create_session_promise_.get_future(); |
120 | 6.09k | Heartbeat(true); |
121 | 6.09k | session_id_ = VERIFY_RESULT(future.get()); |
122 | 6.09k | LOG_WITH_PREFIX(INFO) << "Session id acquired"; |
123 | 6.09k | heartbeat_poller_.Start(scheduler, FLAGS_pg_client_heartbeat_interval_ms * 1ms); |
124 | 6.09k | return Status::OK(); |
125 | 6.09k | } |
126 | | |
127 | 6.07k | void Shutdown() { |
128 | 6.07k | heartbeat_poller_.Shutdown(); |
129 | 6.07k | proxy_ = nullptr; |
130 | 6.07k | } |
131 | | |
132 | 10.8k | void Heartbeat(bool create) { |
133 | 10.8k | { |
134 | 10.8k | bool expected = false; |
135 | 10.8k | if (!heartbeat_running_.compare_exchange_strong(expected, true)) { |
136 | 12 | LOG_WITH_PREFIX(DFATAL) << "Heartbeat did not complete yet"; |
137 | 12 | return; |
138 | 12 | } |
139 | 10.8k | } |
140 | 10.7k | tserver::PgHeartbeatRequestPB req; |
141 | 10.7k | if (!create) { |
142 | 4.69k | req.set_session_id(session_id_); |
143 | 4.69k | } |
144 | 10.7k | proxy_->HeartbeatAsync( |
145 | 10.7k | req, &heartbeat_resp_, PrepareHeartbeatController(), |
146 | 10.7k | [this, create] { |
147 | 10.7k | auto status = ResponseStatus(heartbeat_resp_); |
148 | 10.7k | if (create) { |
149 | 6.09k | if (!status.ok()) { |
150 | 0 | create_session_promise_.set_value(status); |
151 | 6.09k | } else { |
152 | 6.09k | create_session_promise_.set_value(heartbeat_resp_.session_id()); |
153 | 6.09k | } |
154 | 6.09k | } |
155 | 10.7k | heartbeat_running_ = false; |
156 | 10.7k | if (!status.ok()) { |
157 | 0 | LOG_WITH_PREFIX(WARNING) << "Heartbeat failed: " << status; |
158 | 0 | } |
159 | 10.7k | }); |
160 | 10.7k | } |
161 | | |
162 | 2 | void SetTimeout(MonoDelta timeout) { |
163 | 2 | timeout_ = timeout + kExtraTimeout; |
164 | 2 | } |
165 | | |
166 | | Result<PgTableDescPtr> OpenTable( |
167 | 191k | const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time) { |
168 | 191k | tserver::PgOpenTableRequestPB req; |
169 | 191k | req.set_table_id(table_id.GetYbTableId()); |
170 | 191k | req.set_reopen(reopen); |
171 | 191k | if (invalidate_cache_time != CoarseTimePoint()) { |
172 | 1.04k | req.set_invalidate_cache_time_us(ToMicroseconds(invalidate_cache_time.time_since_epoch())); |
173 | 1.04k | } |
174 | 191k | tserver::PgOpenTableResponsePB resp; |
175 | | |
176 | 191k | RETURN_NOT_OK(proxy_->OpenTable(req, &resp, PrepareController())); |
177 | 191k | RETURN_NOT_OK(ResponseStatus(resp)); |
178 | | |
179 | 191k | auto partitions = std::make_shared<client::VersionedTablePartitionList>(); |
180 | 191k | partitions->version = resp.partitions().version(); |
181 | 191k | partitions->keys.assign(resp.partitions().keys().begin(), resp.partitions().keys().end()); |
182 | | |
183 | 191k | auto result = make_scoped_refptr<PgTableDesc>( |
184 | 191k | table_id, resp.info(), std::move(partitions)); |
185 | 191k | RETURN_NOT_OK(result->Init()); |
186 | 191k | return result; |
187 | 191k | } |
188 | | |
189 | 218k | CHECKED_STATUS FinishTransaction(Commit commit, DdlMode ddl_mode) { |
190 | 218k | tserver::PgFinishTransactionRequestPB req; |
191 | 218k | req.set_session_id(session_id_); |
192 | 218k | req.set_commit(commit); |
193 | 218k | req.set_ddl_mode(ddl_mode); |
194 | 218k | tserver::PgFinishTransactionResponsePB resp; |
195 | | |
196 | 218k | RETURN_NOT_OK(proxy_->FinishTransaction(req, &resp, PrepareController())); |
197 | 218k | return ResponseStatus(resp); |
198 | 218k | } |
199 | | |
200 | 5.71k | Result<master::GetNamespaceInfoResponsePB> GetDatabaseInfo(uint32_t oid) { |
201 | 5.71k | tserver::PgGetDatabaseInfoRequestPB req; |
202 | 5.71k | req.set_oid(oid); |
203 | | |
204 | 5.71k | tserver::PgGetDatabaseInfoResponsePB resp; |
205 | | |
206 | 5.71k | RETURN_NOT_OK(proxy_->GetDatabaseInfo(req, &resp, PrepareController())); |
207 | 5.71k | RETURN_NOT_OK(ResponseStatus(resp)); |
208 | 5.71k | return resp.info(); |
209 | 5.71k | } |
210 | | |
211 | | CHECKED_STATUS SetActiveSubTransaction( |
212 | 61.7k | SubTransactionId id, tserver::PgPerformOptionsPB* options) { |
213 | 61.7k | tserver::PgSetActiveSubTransactionRequestPB req; |
214 | 61.7k | req.set_session_id(session_id_); |
215 | 61.7k | if (options) { |
216 | 7.28k | options->Swap(req.mutable_options()); |
217 | 7.28k | } |
218 | 61.7k | req.set_sub_transaction_id(id); |
219 | | |
220 | 61.7k | tserver::PgSetActiveSubTransactionResponsePB resp; |
221 | | |
222 | 61.7k | RETURN_NOT_OK(proxy_->SetActiveSubTransaction(req, &resp, PrepareController())); |
223 | 61.7k | return ResponseStatus(resp); |
224 | 61.7k | } |
225 | | |
226 | 13.5k | CHECKED_STATUS RollbackSubTransaction(SubTransactionId id) { |
227 | 13.5k | tserver::PgRollbackSubTransactionRequestPB req; |
228 | 13.5k | req.set_session_id(session_id_); |
229 | 13.5k | req.set_sub_transaction_id(id); |
230 | | |
231 | 13.5k | tserver::PgRollbackSubTransactionResponsePB resp; |
232 | | |
233 | 13.5k | RETURN_NOT_OK(proxy_->RollbackSubTransaction(req, &resp, PrepareController())); |
234 | 13.5k | return ResponseStatus(resp); |
235 | 13.5k | } |
236 | | |
237 | | CHECKED_STATUS InsertSequenceTuple(int64_t db_oid, |
238 | | int64_t seq_oid, |
239 | | uint64_t ysql_catalog_version, |
240 | | int64_t last_val, |
241 | 295 | bool is_called) { |
242 | 295 | tserver::PgInsertSequenceTupleRequestPB req; |
243 | 295 | req.set_session_id(session_id_); |
244 | 295 | req.set_db_oid(db_oid); |
245 | 295 | req.set_seq_oid(seq_oid); |
246 | 295 | req.set_ysql_catalog_version(ysql_catalog_version); |
247 | 295 | req.set_last_val(last_val); |
248 | 295 | req.set_is_called(is_called); |
249 | | |
250 | 295 | tserver::PgInsertSequenceTupleResponsePB resp; |
251 | | |
252 | 295 | RETURN_NOT_OK(proxy_->InsertSequenceTuple(req, &resp, PrepareController())); |
253 | 295 | return ResponseStatus(resp); |
254 | 295 | } |
255 | | |
256 | | Result<bool> UpdateSequenceTuple(int64_t db_oid, |
257 | | int64_t seq_oid, |
258 | | uint64_t ysql_catalog_version, |
259 | | int64_t last_val, |
260 | | bool is_called, |
261 | | boost::optional<int64_t> expected_last_val, |
262 | 2.97k | boost::optional<bool> expected_is_called) { |
263 | 2.97k | tserver::PgUpdateSequenceTupleRequestPB req; |
264 | 2.97k | req.set_session_id(session_id_); |
265 | 2.97k | req.set_db_oid(db_oid); |
266 | 2.97k | req.set_seq_oid(seq_oid); |
267 | 2.97k | req.set_ysql_catalog_version(ysql_catalog_version); |
268 | 2.97k | req.set_last_val(last_val); |
269 | 2.97k | req.set_is_called(is_called); |
270 | 2.97k | if (expected_last_val && expected_is_called2.95k ) { |
271 | 2.95k | req.set_has_expected(true); |
272 | 2.95k | req.set_expected_last_val(*expected_last_val); |
273 | 2.95k | req.set_expected_is_called(*expected_is_called); |
274 | 2.95k | } |
275 | | |
276 | 2.97k | tserver::PgUpdateSequenceTupleResponsePB resp; |
277 | | |
278 | 2.97k | RETURN_NOT_OK(proxy_->UpdateSequenceTuple(req, &resp, PrepareController())); |
279 | 2.97k | RETURN_NOT_OK(ResponseStatus(resp)); |
280 | 2.97k | return resp.skipped(); |
281 | 2.97k | } |
282 | | |
283 | | Result<std::pair<int64_t, bool>> ReadSequenceTuple(int64_t db_oid, |
284 | | int64_t seq_oid, |
285 | 3.23k | uint64_t ysql_catalog_version) { |
286 | 3.23k | tserver::PgReadSequenceTupleRequestPB req; |
287 | 3.23k | req.set_session_id(session_id_); |
288 | 3.23k | req.set_db_oid(db_oid); |
289 | 3.23k | req.set_seq_oid(seq_oid); |
290 | 3.23k | req.set_ysql_catalog_version(ysql_catalog_version); |
291 | | |
292 | 3.23k | tserver::PgReadSequenceTupleResponsePB resp; |
293 | | |
294 | 3.23k | RETURN_NOT_OK(proxy_->ReadSequenceTuple(req, &resp, PrepareController())); |
295 | 3.23k | RETURN_NOT_OK(ResponseStatus(resp)); |
296 | 3.23k | return std::make_pair(resp.last_val(), resp.is_called()); |
297 | 3.23k | } |
298 | | |
299 | 282 | CHECKED_STATUS DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) { |
300 | 282 | tserver::PgDeleteSequenceTupleRequestPB req; |
301 | 282 | req.set_session_id(session_id_); |
302 | 282 | req.set_db_oid(db_oid); |
303 | 282 | req.set_seq_oid(seq_oid); |
304 | | |
305 | 282 | tserver::PgDeleteSequenceTupleResponsePB resp; |
306 | | |
307 | 282 | RETURN_NOT_OK(proxy_->DeleteSequenceTuple(req, &resp, PrepareController())); |
308 | 282 | return ResponseStatus(resp); |
309 | 282 | } |
310 | | |
311 | 71 | CHECKED_STATUS DeleteDBSequences(int64_t db_oid) { |
312 | 71 | tserver::PgDeleteDBSequencesRequestPB req; |
313 | 71 | req.set_session_id(session_id_); |
314 | 71 | req.set_db_oid(db_oid); |
315 | | |
316 | 71 | tserver::PgDeleteDBSequencesResponsePB resp; |
317 | | |
318 | 71 | RETURN_NOT_OK(proxy_->DeleteDBSequences(req, &resp, PrepareController())); |
319 | 71 | return ResponseStatus(resp); |
320 | 71 | } |
321 | | |
322 | | void PerformAsync( |
323 | | tserver::PgPerformOptionsPB* options, |
324 | | PgsqlOps* operations, |
325 | 2.16M | const PerformCallback& callback) { |
326 | 2.16M | tserver::PgPerformRequestPB req; |
327 | 2.16M | req.set_session_id(session_id_); |
328 | 2.16M | *req.mutable_options() = std::move(*options); |
329 | 2.17M | auto se = ScopeExit([&req] { |
330 | 11.5M | for (auto& op : *req.mutable_ops()) { |
331 | 11.5M | if (!op.release_read()) { |
332 | 7.20M | op.release_write(); |
333 | 7.20M | } |
334 | 11.5M | } |
335 | 2.17M | }); |
336 | 2.16M | PrepareOperations(&req, operations); |
337 | | |
338 | 2.16M | auto data = std::make_shared<PerformData>(); |
339 | 2.16M | data->operations = std::move(*operations); |
340 | 2.16M | data->callback = callback; |
341 | 2.16M | data->controller.set_invoke_callback_mode(rpc::InvokeCallbackMode::kReactorThread); |
342 | | |
343 | 2.17M | proxy_->PerformAsync(req, &data->resp, SetupController(&data->controller), [data] { |
344 | 2.17M | PerformResult result; |
345 | 2.17M | result.status = data->controller.status(); |
346 | 2.17M | if (result.status.ok()) { |
347 | 2.17M | result.status = ResponseStatus(data->resp); |
348 | 2.17M | } |
349 | 2.17M | if (result.status.ok()) { |
350 | 2.06M | result.status = data->Process(); |
351 | 2.06M | } |
352 | 2.17M | if (result.status.ok() && data->resp.has_catalog_read_time()2.06M ) { |
353 | 77.1k | result.catalog_read_time = ReadHybridTime::FromPB(data->resp.catalog_read_time()); |
354 | 77.1k | } |
355 | 2.17M | data->callback(result); |
356 | 2.17M | }); |
357 | 2.16M | } |
358 | | |
359 | 2.16M | void PrepareOperations(tserver::PgPerformRequestPB* req, PgsqlOps* operations) { |
360 | 2.16M | auto& ops = *req->mutable_ops(); |
361 | 2.16M | ops.Reserve(narrow_cast<int>(operations->size())); |
362 | 11.5M | for (auto& op : *operations) { |
363 | 11.5M | auto* union_op = ops.Add(); |
364 | 11.5M | if (op->is_read()) { |
365 | 4.34M | auto& read_op = down_cast<PgsqlReadOp&>(*op); |
366 | 4.34M | union_op->set_allocated_read(&read_op.read_request()); |
367 | 4.34M | if (read_op.read_from_followers()) { |
368 | 93 | union_op->set_read_from_followers(true); |
369 | 93 | } |
370 | 7.20M | } else { |
371 | 7.20M | auto& write_op = down_cast<PgsqlWriteOp&>(*op); |
372 | 7.20M | if (write_op.write_time()) { |
373 | 371k | req->set_write_time(write_op.write_time().ToUint64()); |
374 | 371k | } |
375 | 7.20M | union_op->set_allocated_write(&write_op.write_request()); |
376 | 7.20M | } |
377 | 11.5M | if (op->read_time()) { |
378 | 48.5k | op->read_time().AddToPB(req->mutable_options()); |
379 | 48.5k | } |
380 | 11.5M | } |
381 | 2.16M | } |
382 | | |
383 | 805 | Result<std::pair<PgOid, PgOid>> ReserveOids(PgOid database_oid, PgOid next_oid, uint32_t count) { |
384 | 805 | tserver::PgReserveOidsRequestPB req; |
385 | 805 | req.set_database_oid(database_oid); |
386 | 805 | req.set_next_oid(next_oid); |
387 | 805 | req.set_count(count); |
388 | | |
389 | 805 | tserver::PgReserveOidsResponsePB resp; |
390 | | |
391 | 805 | RETURN_NOT_OK(proxy_->ReserveOids(req, &resp, PrepareController())); |
392 | 805 | RETURN_NOT_OK(ResponseStatus(resp)); |
393 | 805 | return std::pair<PgOid, PgOid>(resp.begin_oid(), resp.end_oid()); |
394 | 805 | } |
395 | | |
396 | 2 | Result<bool> IsInitDbDone() { |
397 | 2 | tserver::PgIsInitDbDoneRequestPB req; |
398 | 2 | tserver::PgIsInitDbDoneResponsePB resp; |
399 | | |
400 | 2 | RETURN_NOT_OK(proxy_->IsInitDbDone(req, &resp, PrepareController())); |
401 | 2 | RETURN_NOT_OK(ResponseStatus(resp)); |
402 | 2 | return resp.done(); |
403 | 2 | } |
404 | | |
405 | 22 | Result<uint64_t> GetCatalogMasterVersion() { |
406 | 22 | tserver::PgGetCatalogMasterVersionRequestPB req; |
407 | 22 | tserver::PgGetCatalogMasterVersionResponsePB resp; |
408 | | |
409 | 22 | RETURN_NOT_OK(proxy_->GetCatalogMasterVersion(req, &resp, PrepareController())); |
410 | 22 | RETURN_NOT_OK(ResponseStatus(resp)); |
411 | 22 | return resp.version(); |
412 | 22 | } |
413 | | |
414 | 0 | CHECKED_STATUS CreateSequencesDataTable() { |
415 | 0 | tserver::PgCreateSequencesDataTableRequestPB req; |
416 | 0 | tserver::PgCreateSequencesDataTableResponsePB resp; |
417 | |
|
418 | 0 | RETURN_NOT_OK(proxy_->CreateSequencesDataTable(req, &resp, PrepareController())); |
419 | 0 | return ResponseStatus(resp); |
420 | 0 | } |
421 | | |
422 | | Result<client::YBTableName> DropTable( |
423 | 4.14k | tserver::PgDropTableRequestPB* req, CoarseTimePoint deadline) { |
424 | 4.14k | req->set_session_id(session_id_); |
425 | 4.14k | tserver::PgDropTableResponsePB resp; |
426 | 4.14k | RETURN_NOT_OK(proxy_->DropTable(*req, &resp, PrepareController(deadline))); |
427 | 4.14k | RETURN_NOT_OK(ResponseStatus(resp)); |
428 | 4.14k | client::YBTableName result; |
429 | 4.14k | if (resp.has_indexed_table()) { |
430 | 667 | result.GetFromTableIdentifierPB(resp.indexed_table()); |
431 | 667 | } |
432 | 4.14k | return result; |
433 | 4.14k | } |
434 | | |
435 | | CHECKED_STATUS BackfillIndex( |
436 | 540 | tserver::PgBackfillIndexRequestPB* req, CoarseTimePoint deadline) { |
437 | 540 | tserver::PgBackfillIndexResponsePB resp; |
438 | 540 | req->set_session_id(session_id_); |
439 | | |
440 | 540 | RETURN_NOT_OK(proxy_->BackfillIndex(*req, &resp, PrepareController(deadline))); |
441 | 536 | return ResponseStatus(resp); |
442 | 540 | } |
443 | | |
444 | 6.02k | Result<int32> TabletServerCount(bool primary_only) { |
445 | 6.02k | if (tablet_server_count_cache_[primary_only] > 0) { |
446 | 5.84k | return tablet_server_count_cache_[primary_only]; |
447 | 5.84k | } |
448 | 180 | tserver::PgTabletServerCountRequestPB req; |
449 | 180 | tserver::PgTabletServerCountResponsePB resp; |
450 | 180 | req.set_primary_only(primary_only); |
451 | | |
452 | 180 | RETURN_NOT_OK(proxy_->TabletServerCount(req, &resp, PrepareController())); |
453 | 180 | RETURN_NOT_OK(ResponseStatus(resp)); |
454 | 180 | tablet_server_count_cache_[primary_only] = resp.count(); |
455 | 180 | return resp.count(); |
456 | 180 | } |
457 | | |
458 | 4 | Result<client::TabletServersInfo> ListLiveTabletServers(bool primary_only) { |
459 | 4 | tserver::PgListLiveTabletServersRequestPB req; |
460 | 4 | tserver::PgListLiveTabletServersResponsePB resp; |
461 | 4 | req.set_primary_only(primary_only); |
462 | | |
463 | 4 | RETURN_NOT_OK(proxy_->ListLiveTabletServers(req, &resp, PrepareController())); |
464 | 4 | RETURN_NOT_OK(ResponseStatus(resp)); |
465 | 4 | client::TabletServersInfo result; |
466 | 4 | result.reserve(resp.servers().size()); |
467 | 12 | for (const auto& server : resp.servers()) { |
468 | 12 | result.push_back(client::YBTabletServerPlacementInfo::FromPB(server)); |
469 | 12 | } |
470 | 4 | return result; |
471 | 4 | } |
472 | | |
473 | 1 | CHECKED_STATUS ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) { |
474 | 1 | tserver::PgValidatePlacementResponsePB resp; |
475 | 1 | RETURN_NOT_OK(proxy_->ValidatePlacement(*req, &resp, PrepareController())); |
476 | 1 | return ResponseStatus(resp); |
477 | 1 | } |
478 | | |
// Generates one synchronous Impl method per name `method`: it takes a Pg<method>RequestPB and a
// deadline, stamps the request with the session id and calls the proxy method of the same name.
// An RPC timeout is reported as TimedOut "Timed out waiting for <Method Name>" (method name
// split into words by PrettyFunctionName); any other RPC error is returned unchanged; otherwise
// the status carried by the response is returned.
#define YB_PG_CLIENT_SIMPLE_METHOD_IMPL(r, data, method) \
  CHECKED_STATUS method( \
      tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \
      CoarseTimePoint deadline) { \
    tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \
    req->set_session_id(session_id_); \
    auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \
    if (status.IsTimedOut()) { \
      return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \
    } \
    if (!status.ok()) { \
      return status; \
    } \
    return ResponseStatus(resp); \
  }
494 | | |
495 | | BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_SIMPLE_METHOD_IMPL, ~, YB_PG_CLIENT_SIMPLE_METHODS); |
496 | | |
497 | | private: |
498 | 6.10k | std::string LogPrefix() const { |
499 | 6.10k | return Format("S $0: ", session_id_); |
500 | 6.10k | } |
501 | | |
502 | | rpc::RpcController* SetupController( |
503 | 2.68M | rpc::RpcController* controller, CoarseTimePoint deadline = CoarseTimePoint()) { |
504 | 2.68M | if (deadline != CoarseTimePoint()) { |
505 | 6.97k | controller->set_deadline(deadline); |
506 | 2.67M | } else { |
507 | 2.67M | controller->set_timeout(timeout_); |
508 | 2.67M | } |
509 | 2.68M | return controller; |
510 | 2.68M | } |
511 | | |
512 | 509k | rpc::RpcController* PrepareController(CoarseTimePoint deadline = CoarseTimePoint()) { |
513 | 509k | controller_.Reset(); |
514 | 509k | return SetupController(&controller_, deadline); |
515 | 509k | } |
516 | | |
517 | 10.7k | rpc::RpcController* PrepareHeartbeatController() { |
518 | 10.7k | heartbeat_controller_.Reset(); |
519 | 10.7k | heartbeat_controller_.set_timeout(FLAGS_pg_client_heartbeat_interval_ms * 1ms - 1s); |
520 | 10.7k | return &heartbeat_controller_; |
521 | 10.7k | } |
522 | | |
523 | | std::unique_ptr<tserver::PgClientServiceProxy> proxy_; |
524 | | rpc::RpcController controller_; |
525 | | uint64_t session_id_ = 0; |
526 | | |
527 | | rpc::Poller heartbeat_poller_; |
528 | | std::atomic<bool> heartbeat_running_{false}; |
529 | | rpc::RpcController heartbeat_controller_; |
530 | | tserver::PgHeartbeatResponsePB heartbeat_resp_; |
531 | | std::promise<Result<uint64_t>> create_session_promise_; |
532 | | std::array<int, 2> tablet_server_count_cache_; |
533 | | MonoDelta timeout_ = FLAGS_yb_client_admin_operation_timeout_sec * 1s; |
534 | | }; |
535 | | |
536 | 6.09k | PgClient::PgClient() : impl_(new Impl) { |
537 | 6.09k | } |
538 | | |
539 | 6.07k | PgClient::~PgClient() { |
540 | 6.07k | } |
541 | | |
542 | | Status PgClient::Start( |
543 | | rpc::ProxyCache* proxy_cache, rpc::Scheduler* scheduler, |
544 | 6.09k | const tserver::TServerSharedObject& tserver_shared_object) { |
545 | 6.09k | return impl_->Start(proxy_cache, scheduler, tserver_shared_object); |
546 | 6.09k | } |
547 | | |
548 | 6.07k | void PgClient::Shutdown() { |
549 | 6.07k | impl_->Shutdown(); |
550 | 6.07k | } |
551 | | |
552 | 2 | void PgClient::SetTimeout(MonoDelta timeout) { |
553 | 2 | impl_->SetTimeout(timeout); |
554 | 2 | } |
555 | | |
556 | | Result<PgTableDescPtr> PgClient::OpenTable( |
557 | 191k | const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time) { |
558 | 191k | return impl_->OpenTable(table_id, reopen, invalidate_cache_time); |
559 | 191k | } |
560 | | |
561 | 218k | Status PgClient::FinishTransaction(Commit commit, DdlMode ddl_mode) { |
562 | 218k | return impl_->FinishTransaction(commit, ddl_mode); |
563 | 218k | } |
564 | | |
565 | 5.72k | Result<master::GetNamespaceInfoResponsePB> PgClient::GetDatabaseInfo(uint32_t oid) { |
566 | 5.72k | return impl_->GetDatabaseInfo(oid); |
567 | 5.72k | } |
568 | | |
569 | | Result<std::pair<PgOid, PgOid>> PgClient::ReserveOids( |
570 | 805 | PgOid database_oid, PgOid next_oid, uint32_t count) { |
571 | 805 | return impl_->ReserveOids(database_oid, next_oid, count); |
572 | 805 | } |
573 | | |
574 | 2 | Result<bool> PgClient::IsInitDbDone() { |
575 | 2 | return impl_->IsInitDbDone(); |
576 | 2 | } |
577 | | |
578 | 22 | Result<uint64_t> PgClient::GetCatalogMasterVersion() { |
579 | 22 | return impl_->GetCatalogMasterVersion(); |
580 | 22 | } |
581 | | |
582 | 0 | Status PgClient::CreateSequencesDataTable() { |
583 | 0 | return impl_->CreateSequencesDataTable(); |
584 | 0 | } |
585 | | |
586 | | Result<client::YBTableName> PgClient::DropTable( |
587 | 4.14k | tserver::PgDropTableRequestPB* req, CoarseTimePoint deadline) { |
588 | 4.14k | return impl_->DropTable(req, deadline); |
589 | 4.14k | } |
590 | | |
591 | | Status PgClient::BackfillIndex( |
592 | 540 | tserver::PgBackfillIndexRequestPB* req, CoarseTimePoint deadline) { |
593 | 540 | return impl_->BackfillIndex(req, deadline); |
594 | 540 | } |
595 | | |
596 | 6.04k | Result<int32> PgClient::TabletServerCount(bool primary_only) { |
597 | 6.04k | return impl_->TabletServerCount(primary_only); |
598 | 6.04k | } |
599 | | |
600 | 4 | Result<client::TabletServersInfo> PgClient::ListLiveTabletServers(bool primary_only) { |
601 | 4 | return impl_->ListLiveTabletServers(primary_only); |
602 | 4 | } |
603 | | |
604 | | Status PgClient::SetActiveSubTransaction( |
605 | 61.7k | SubTransactionId id, tserver::PgPerformOptionsPB* options) { |
606 | 61.7k | return impl_->SetActiveSubTransaction(id, options); |
607 | 61.7k | } |
608 | | |
609 | 13.5k | Status PgClient::RollbackSubTransaction(SubTransactionId id) { |
610 | 13.5k | return impl_->RollbackSubTransaction(id); |
611 | 13.5k | } |
612 | | |
613 | 1 | Status PgClient::ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) { |
614 | 1 | return impl_->ValidatePlacement(req); |
615 | 1 | } |
616 | | |
617 | | Status PgClient::InsertSequenceTuple(int64_t db_oid, |
618 | | int64_t seq_oid, |
619 | | uint64_t ysql_catalog_version, |
620 | | int64_t last_val, |
621 | 295 | bool is_called) { |
622 | 295 | return impl_->InsertSequenceTuple(db_oid, seq_oid, ysql_catalog_version, last_val, is_called); |
623 | 295 | } |
624 | | |
625 | | Result<bool> PgClient::UpdateSequenceTuple(int64_t db_oid, |
626 | | int64_t seq_oid, |
627 | | uint64_t ysql_catalog_version, |
628 | | int64_t last_val, |
629 | | bool is_called, |
630 | | boost::optional<int64_t> expected_last_val, |
631 | 2.97k | boost::optional<bool> expected_is_called) { |
632 | 2.97k | return impl_->UpdateSequenceTuple( |
633 | 2.97k | db_oid, seq_oid, ysql_catalog_version, last_val, is_called, expected_last_val, |
634 | 2.97k | expected_is_called); |
635 | 2.97k | } |
636 | | |
637 | | Result<std::pair<int64_t, bool>> PgClient::ReadSequenceTuple(int64_t db_oid, |
638 | | int64_t seq_oid, |
639 | 3.23k | uint64_t ysql_catalog_version) { |
640 | 3.23k | return impl_->ReadSequenceTuple(db_oid, seq_oid, ysql_catalog_version); |
641 | 3.23k | } |
642 | | |
643 | 282 | Status PgClient::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) { |
644 | 282 | return impl_->DeleteSequenceTuple(db_oid, seq_oid); |
645 | 282 | } |
646 | | |
647 | 71 | Status PgClient::DeleteDBSequences(int64_t db_oid) { |
648 | 71 | return impl_->DeleteDBSequences(db_oid); |
649 | 71 | } |
650 | | |
651 | | void PgClient::PerformAsync( |
652 | | tserver::PgPerformOptionsPB* options, |
653 | | PgsqlOps* operations, |
654 | 2.17M | const PerformCallback& callback) { |
655 | 2.17M | impl_->PerformAsync(options, operations, callback); |
656 | 2.17M | } |
657 | | |
658 | | #define YB_PG_CLIENT_SIMPLE_METHOD_DEFINE(r, data, method) \ |
659 | | Status PgClient::method( \ |
660 | | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \ |
661 | 6.50k | CoarseTimePoint deadline) { \ |
662 | 6.50k | return impl_->method(req, deadline); \ |
663 | 6.50k | } yb::pggate::PgClient::AlterDatabase(yb::tserver::PgAlterDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) Line | Count | Source | 661 | 3 | CoarseTimePoint deadline) { \ | 662 | 3 | return impl_->method(req, deadline); \ | 663 | 3 | } |
yb::pggate::PgClient::AlterTable(yb::tserver::PgAlterTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) Line | Count | Source | 661 | 522 | CoarseTimePoint deadline) { \ | 662 | 522 | return impl_->method(req, deadline); \ | 663 | 522 | } |
yb::pggate::PgClient::CreateDatabase(yb::tserver::PgCreateDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) Line | Count | Source | 661 | 134 | CoarseTimePoint deadline) { \ | 662 | 134 | return impl_->method(req, deadline); \ | 663 | 134 | } |
yb::pggate::PgClient::CreateTable(yb::tserver::PgCreateTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) Line | Count | Source | 661 | 5.05k | CoarseTimePoint deadline) { \ | 662 | 5.05k | return impl_->method(req, deadline); \ | 663 | 5.05k | } |
yb::pggate::PgClient::CreateTablegroup(yb::tserver::PgCreateTablegroupRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) Line | Count | Source | 661 | 54 | CoarseTimePoint deadline) { \ | 662 | 54 | return impl_->method(req, deadline); \ | 663 | 54 | } |
yb::pggate::PgClient::DropDatabase(yb::tserver::PgDropDatabaseRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) Line | Count | Source | 661 | 72 | CoarseTimePoint deadline) { \ | 662 | 72 | return impl_->method(req, deadline); \ | 663 | 72 | } |
yb::pggate::PgClient::DropTablegroup(yb::tserver::PgDropTablegroupRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) Line | Count | Source | 661 | 39 | CoarseTimePoint deadline) { \ | 662 | 39 | return impl_->method(req, deadline); \ | 663 | 39 | } |
yb::pggate::PgClient::TruncateTable(yb::tserver::PgTruncateTableRequestPB*, std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >) Line | Count | Source | 661 | 624 | CoarseTimePoint deadline) { \ | 662 | 624 | return impl_->method(req, deadline); \ | 663 | 624 | } |
|
664 | | |
665 | | BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_SIMPLE_METHOD_DEFINE, ~, YB_PG_CLIENT_SIMPLE_METHODS); |
666 | | |
667 | | } // namespace pggate |
668 | | } // namespace yb |