/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_client.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/pggate/pg_client.h" |
15 | | |
16 | | #include "yb/client/client-internal.h" |
17 | | #include "yb/client/table.h" |
18 | | #include "yb/client/table_info.h" |
19 | | #include "yb/client/tablet_server.h" |
20 | | #include "yb/client/yb_table_name.h" |
21 | | |
22 | | #include "yb/gutil/casts.h" |
23 | | |
24 | | #include "yb/rpc/poller.h" |
25 | | #include "yb/rpc/rpc_controller.h" |
26 | | |
27 | | #include "yb/tserver/pg_client.pb.h" |
28 | | #include "yb/tserver/pg_client.proxy.h" |
29 | | #include "yb/tserver/tserver_shared_mem.h" |
30 | | |
31 | | #include "yb/util/logging.h" |
32 | | #include "yb/util/protobuf_util.h" |
33 | | #include "yb/util/result.h" |
34 | | #include "yb/util/scope_exit.h" |
35 | | #include "yb/util/shared_mem.h" |
36 | | #include "yb/util/status.h" |
37 | | |
38 | | #include "yb/yql/pggate/pg_op.h" |
39 | | #include "yb/yql/pggate/pg_tabledesc.h" |
40 | | |
41 | | DECLARE_bool(use_node_hostname_for_local_tserver); |
42 | | DECLARE_int32(backfill_index_client_rpc_timeout_ms); |
43 | | DECLARE_int32(yb_client_admin_operation_timeout_sec); |
44 | | |
45 | | DEFINE_uint64(pg_client_heartbeat_interval_ms, 10000, "Pg client heartbeat interval in ms."); |
46 | | |
47 | | using namespace std::literals; |
48 | | |
49 | | namespace yb { |
50 | | namespace pggate { |
51 | | |
52 | | namespace { |
53 | | |
54 | | // Adding this value to RPC call timeout, so postgres could detect timeout by its own mechanism |
55 | | // and report it. |
56 | | const auto kExtraTimeout = 2s; |
57 | | |
58 | | struct PerformData { |
59 | | PgsqlOps operations; |
60 | | tserver::PgPerformResponsePB resp; |
61 | | rpc::RpcController controller; |
62 | | PerformCallback callback; |
63 | | |
64 | 728k | CHECKED_STATUS Process() { |
65 | 728k | auto& responses = *resp.mutable_responses(); |
66 | 728k | SCHECK_EQ(implicit_cast<size_t>(responses.size()), operations.size(), RuntimeError, |
67 | 728k | Format("Wrong number of responses: $0, while $1 expected", |
68 | 728k | responses.size(), operations.size())); |
69 | 4.18M | for (uint32_t i = 0; i != operations.size(); ++i) { |
70 | 3.45M | if (responses[i].has_rows_data_sidecar()) { |
71 | 3.45M | operations[i]->rows_data() = VERIFY_RESULT( |
72 | 3.45M | controller.GetSidecarPtr(responses[i].rows_data_sidecar())); |
73 | 3.45M | } |
74 | 3.45M | operations[i]->response() = std::move(responses[i]); |
75 | 3.45M | } |
76 | 728k | return Status::OK(); |
77 | 728k | } |
78 | | }; |
79 | | |
80 | 0 | std::string PrettyFunctionName(const char* name) { |
81 | 0 | std::string result; |
82 | 0 | for (const char* ch = name; *ch; ++ch) { |
83 | 0 | if (!result.empty() && std::isupper(*ch)) { |
84 | 0 | result += ' '; |
85 | 0 | } |
86 | 0 | result += *ch; |
87 | 0 | } |
88 | 0 | return result; |
89 | 0 | } |
90 | | |
91 | | } // namespace |
92 | | |
93 | | class PgClient::Impl { |
94 | | public: |
95 | 1.65k | Impl() : heartbeat_poller_(std::bind(&Impl::Heartbeat, this, false)) { |
96 | 1.65k | tablet_server_count_cache_.fill(0); |
97 | 1.65k | } |
98 | | |
99 | 1.64k | ~Impl() { |
100 | 1.64k | CHECK(!proxy_); |
101 | 1.64k | } |
102 | | |
103 | | CHECKED_STATUS Start(rpc::ProxyCache* proxy_cache, |
104 | | rpc::Scheduler* scheduler, |
105 | 1.65k | const tserver::TServerSharedObject& tserver_shared_object) { |
106 | 1.65k | CHECK_NOTNULL(&tserver_shared_object); |
107 | 1.65k | MonoDelta resolve_cache_timeout; |
108 | 1.65k | const auto& tserver_shared_data_ = *tserver_shared_object; |
109 | 1.65k | HostPort host_port(tserver_shared_data_.endpoint()); |
110 | 1.65k | if (FLAGS_use_node_hostname_for_local_tserver) { |
111 | 0 | host_port = HostPort(tserver_shared_data_.host().ToBuffer(), |
112 | 0 | tserver_shared_data_.endpoint().port()); |
113 | 0 | resolve_cache_timeout = MonoDelta::kMax; |
114 | 0 | } |
115 | 1.65k | LOG(INFO) << "Using TServer host_port: " << host_port; |
116 | 1.65k | proxy_ = std::make_unique<tserver::PgClientServiceProxy>( |
117 | 1.65k | proxy_cache, host_port, nullptr /* protocol */, resolve_cache_timeout); |
118 | | |
119 | 1.65k | auto future = create_session_promise_.get_future(); |
120 | 1.65k | Heartbeat(true); |
121 | 1.65k | session_id_ = VERIFY_RESULT(future.get()); |
122 | 1.65k | LOG_WITH_PREFIX(INFO) << "Session id acquired"; |
123 | 1.65k | heartbeat_poller_.Start(scheduler, FLAGS_pg_client_heartbeat_interval_ms * 1ms); |
124 | 1.65k | return Status::OK(); |
125 | 1.65k | } |
126 | | |
127 | 1.65k | void Shutdown() { |
128 | 1.65k | heartbeat_poller_.Shutdown(); |
129 | 1.65k | proxy_ = nullptr; |
130 | 1.65k | } |
131 | | |
132 | 2.74k | void Heartbeat(bool create) { |
133 | 2.74k | { |
134 | 2.74k | bool expected = false; |
135 | 2.74k | if (!heartbeat_running_.compare_exchange_strong(expected, true)) { |
136 | 0 | LOG_WITH_PREFIX(DFATAL) << "Heartbeat did not complete yet"; |
137 | 0 | return; |
138 | 0 | } |
139 | 2.74k | } |
140 | 2.74k | tserver::PgHeartbeatRequestPB req; |
141 | 2.74k | if (!create) { |
142 | 1.09k | req.set_session_id(session_id_); |
143 | 1.09k | } |
144 | 2.74k | proxy_->HeartbeatAsync( |
145 | 2.74k | req, &heartbeat_resp_, PrepareHeartbeatController(), |
146 | 2.74k | [this, create] { |
147 | 2.74k | auto status = ResponseStatus(heartbeat_resp_); |
148 | 2.74k | if (create) { |
149 | 1.65k | if (!status.ok()) { |
150 | 0 | create_session_promise_.set_value(status); |
151 | 1.65k | } else { |
152 | 1.65k | create_session_promise_.set_value(heartbeat_resp_.session_id()); |
153 | 1.65k | } |
154 | 1.65k | } |
155 | 2.74k | heartbeat_running_ = false; |
156 | 2.74k | if (!status.ok()) { |
157 | 0 | LOG_WITH_PREFIX(WARNING) << "Heartbeat failed: " << status; |
158 | 0 | } |
159 | 2.74k | }); |
160 | 2.74k | } |
161 | | |
162 | 0 | void SetTimeout(MonoDelta timeout) { |
163 | 0 | timeout_ = timeout + kExtraTimeout; |
164 | 0 | } |
165 | | |
166 | | Result<PgTableDescPtr> OpenTable( |
167 | 64.5k | const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time) { |
168 | 64.5k | tserver::PgOpenTableRequestPB req; |
169 | 64.5k | req.set_table_id(table_id.GetYBTableId()); |
170 | 64.5k | req.set_reopen(reopen); |
171 | 64.5k | if (invalidate_cache_time != CoarseTimePoint()) { |
172 | 551 | req.set_invalidate_cache_time_us(ToMicroseconds(invalidate_cache_time.time_since_epoch())); |
173 | 551 | } |
174 | 64.5k | tserver::PgOpenTableResponsePB resp; |
175 | | |
176 | 64.5k | RETURN_NOT_OK(proxy_->OpenTable(req, &resp, PrepareController())); |
177 | 64.5k | RETURN_NOT_OK(ResponseStatus(resp)); |
178 | | |
179 | 64.5k | client::YBTableInfo info; |
180 | 64.5k | RETURN_NOT_OK(client::CreateTableInfoFromTableSchemaResp(resp.info(), &info)); |
181 | | |
182 | 64.5k | auto partitions = std::make_shared<client::VersionedTablePartitionList>(); |
183 | 64.5k | partitions->version = resp.partitions().version(); |
184 | 64.5k | partitions->keys.assign(resp.partitions().keys().begin(), resp.partitions().keys().end()); |
185 | | |
186 | 64.5k | return make_scoped_refptr<PgTableDesc>( |
187 | 64.5k | table_id, std::make_shared<client::YBTable>(info, std::move(partitions))); |
188 | 64.5k | } |
189 | | |
190 | 82.3k | CHECKED_STATUS FinishTransaction(Commit commit, DdlMode ddl_mode) { |
191 | 82.3k | tserver::PgFinishTransactionRequestPB req; |
192 | 82.3k | req.set_session_id(session_id_); |
193 | 82.3k | req.set_commit(commit); |
194 | 82.3k | req.set_ddl_mode(ddl_mode); |
195 | 82.3k | tserver::PgFinishTransactionResponsePB resp; |
196 | | |
197 | 82.3k | RETURN_NOT_OK(proxy_->FinishTransaction(req, &resp, PrepareController())); |
198 | 82.3k | return ResponseStatus(resp); |
199 | 82.3k | } |
200 | | |
201 | 1.61k | Result<master::GetNamespaceInfoResponsePB> GetDatabaseInfo(uint32_t oid) { |
202 | 1.61k | tserver::PgGetDatabaseInfoRequestPB req; |
203 | 1.61k | req.set_oid(oid); |
204 | | |
205 | 1.61k | tserver::PgGetDatabaseInfoResponsePB resp; |
206 | | |
207 | 1.61k | RETURN_NOT_OK(proxy_->GetDatabaseInfo(req, &resp, PrepareController())); |
208 | 1.61k | RETURN_NOT_OK(ResponseStatus(resp)); |
209 | 1.61k | return resp.info(); |
210 | 1.61k | } |
211 | | |
212 | | CHECKED_STATUS SetActiveSubTransaction( |
213 | 48.8k | SubTransactionId id, tserver::PgPerformOptionsPB* options) { |
214 | 48.8k | tserver::PgSetActiveSubTransactionRequestPB req; |
215 | 48.8k | req.set_session_id(session_id_); |
216 | 48.8k | if (options) { |
217 | 307 | options->Swap(req.mutable_options()); |
218 | 307 | } |
219 | 48.8k | req.set_sub_transaction_id(id); |
220 | | |
221 | 48.8k | tserver::PgSetActiveSubTransactionResponsePB resp; |
222 | | |
223 | 48.8k | RETURN_NOT_OK(proxy_->SetActiveSubTransaction(req, &resp, PrepareController())); |
224 | 48.8k | return ResponseStatus(resp); |
225 | 48.8k | } |
226 | | |
227 | 23.5k | CHECKED_STATUS RollbackSubTransaction(SubTransactionId id) { |
228 | 23.5k | tserver::PgRollbackSubTransactionRequestPB req; |
229 | 23.5k | req.set_session_id(session_id_); |
230 | 23.5k | req.set_sub_transaction_id(id); |
231 | | |
232 | 23.5k | tserver::PgRollbackSubTransactionResponsePB resp; |
233 | | |
234 | 23.5k | RETURN_NOT_OK(proxy_->RollbackSubTransaction(req, &resp, PrepareController())); |
235 | 23.5k | return ResponseStatus(resp); |
236 | 23.5k | } |
237 | | |
238 | | void PerformAsync( |
239 | | tserver::PgPerformOptionsPB* options, |
240 | | PgsqlOps* operations, |
241 | 774k | const PerformCallback& callback) { |
242 | 774k | tserver::PgPerformRequestPB req; |
243 | 774k | req.set_session_id(session_id_); |
244 | 774k | *req.mutable_options() = std::move(*options); |
245 | 775k | auto se = ScopeExit([&req] { |
246 | 4.02M | for (auto& op : *req.mutable_ops()) { |
247 | 4.02M | if (!op.release_read()) { |
248 | 2.12M | op.release_write(); |
249 | 2.12M | } |
250 | 4.02M | } |
251 | 775k | }); |
252 | 774k | PrepareOperations(&req, operations); |
253 | | |
254 | 774k | auto data = std::make_shared<PerformData>(); |
255 | 774k | data->operations = std::move(*operations); |
256 | 774k | data->callback = callback; |
257 | 774k | data->controller.set_invoke_callback_mode(rpc::InvokeCallbackMode::kReactorThread); |
258 | | |
259 | 775k | proxy_->PerformAsync(req, &data->resp, SetupController(&data->controller), [data] { |
260 | 775k | PerformResult result; |
261 | 775k | result.status = data->controller.status(); |
262 | 775k | if (result.status.ok()) { |
263 | 775k | result.status = ResponseStatus(data->resp); |
264 | 775k | } |
265 | 775k | if (result.status.ok()) { |
266 | 728k | result.status = data->Process(); |
267 | 728k | } |
268 | 775k | if (result.status.ok() && data->resp.has_catalog_read_time()) { |
269 | 18.5k | result.catalog_read_time = ReadHybridTime::FromPB(data->resp.catalog_read_time()); |
270 | 18.5k | } |
271 | 775k | data->callback(result); |
272 | 775k | }); |
273 | 774k | } |
274 | | |
275 | 774k | void PrepareOperations(tserver::PgPerformRequestPB* req, PgsqlOps* operations) { |
276 | 774k | auto& ops = *req->mutable_ops(); |
277 | 774k | ops.Reserve(narrow_cast<int>(operations->size())); |
278 | 4.02M | for (auto& op : *operations) { |
279 | 4.02M | auto* union_op = ops.Add(); |
280 | 4.02M | if (op->is_read()) { |
281 | 1.89M | auto& read_op = down_cast<PgsqlReadOp&>(*op); |
282 | 1.89M | union_op->set_allocated_read(&read_op.read_request()); |
283 | 1.89M | if (read_op.read_from_followers()) { |
284 | 0 | union_op->set_read_from_followers(true); |
285 | 0 | } |
286 | 2.12M | } else { |
287 | 2.12M | auto& write_op = down_cast<PgsqlWriteOp&>(*op); |
288 | 2.12M | if (write_op.write_time()) { |
289 | 103k | req->set_write_time(write_op.write_time().ToUint64()); |
290 | 103k | } |
291 | 2.12M | union_op->set_allocated_write(&write_op.write_request()); |
292 | 2.12M | } |
293 | 4.02M | if (op->read_time()) { |
294 | 24.8k | op->read_time().AddToPB(req->mutable_options()); |
295 | 24.8k | } |
296 | 4.02M | } |
297 | 774k | } |
298 | | |
299 | 380 | Result<std::pair<PgOid, PgOid>> ReserveOids(PgOid database_oid, PgOid next_oid, uint32_t count) { |
300 | 380 | tserver::PgReserveOidsRequestPB req; |
301 | 380 | req.set_database_oid(database_oid); |
302 | 380 | req.set_next_oid(next_oid); |
303 | 380 | req.set_count(count); |
304 | | |
305 | 380 | tserver::PgReserveOidsResponsePB resp; |
306 | | |
307 | 380 | RETURN_NOT_OK(proxy_->ReserveOids(req, &resp, PrepareController())); |
308 | 380 | RETURN_NOT_OK(ResponseStatus(resp)); |
309 | 380 | return std::pair<PgOid, PgOid>(resp.begin_oid(), resp.end_oid()); |
310 | 380 | } |
311 | | |
312 | 0 | Result<bool> IsInitDbDone() { |
313 | 0 | tserver::PgIsInitDbDoneRequestPB req; |
314 | 0 | tserver::PgIsInitDbDoneResponsePB resp; |
315 | | |
316 | 0 | RETURN_NOT_OK(proxy_->IsInitDbDone(req, &resp, PrepareController())); |
317 | 0 | RETURN_NOT_OK(ResponseStatus(resp)); |
318 | 0 | return resp.done(); |
319 | 0 | } |
320 | | |
321 | 0 | Result<uint64_t> GetCatalogMasterVersion() { |
322 | 0 | tserver::PgGetCatalogMasterVersionRequestPB req; |
323 | 0 | tserver::PgGetCatalogMasterVersionResponsePB resp; |
324 | | |
325 | 0 | RETURN_NOT_OK(proxy_->GetCatalogMasterVersion(req, &resp, PrepareController())); |
326 | 0 | RETURN_NOT_OK(ResponseStatus(resp)); |
327 | 0 | return resp.version(); |
328 | 0 | } |
329 | | |
330 | 21 | CHECKED_STATUS CreateSequencesDataTable() { |
331 | 21 | tserver::PgCreateSequencesDataTableRequestPB req; |
332 | 21 | tserver::PgCreateSequencesDataTableResponsePB resp; |
333 | | |
334 | 21 | RETURN_NOT_OK(proxy_->CreateSequencesDataTable(req, &resp, PrepareController())); |
335 | 21 | return ResponseStatus(resp); |
336 | 21 | } |
337 | | |
338 | | Result<client::YBTableName> DropTable( |
339 | 1.17k | tserver::PgDropTableRequestPB* req, CoarseTimePoint deadline) { |
340 | 1.17k | req->set_session_id(session_id_); |
341 | 1.17k | tserver::PgDropTableResponsePB resp; |
342 | 1.17k | RETURN_NOT_OK(proxy_->DropTable(*req, &resp, PrepareController(deadline))); |
343 | 1.17k | RETURN_NOT_OK(ResponseStatus(resp)); |
344 | 1.17k | client::YBTableName result; |
345 | 1.17k | if (resp.has_indexed_table()) { |
346 | 140 | result.GetFromTableIdentifierPB(resp.indexed_table()); |
347 | 140 | } |
348 | 1.17k | return result; |
349 | 1.17k | } |
350 | | |
351 | | CHECKED_STATUS BackfillIndex( |
352 | 89 | tserver::PgBackfillIndexRequestPB* req, CoarseTimePoint deadline) { |
353 | 89 | tserver::PgBackfillIndexResponsePB resp; |
354 | 89 | req->set_session_id(session_id_); |
355 | | |
356 | 89 | RETURN_NOT_OK(proxy_->BackfillIndex(*req, &resp, PrepareController(deadline))); |
357 | 89 | return ResponseStatus(resp); |
358 | 89 | } |
359 | | |
360 | 178 | Result<int32> TabletServerCount(bool primary_only) { |
361 | 178 | if (tablet_server_count_cache_[primary_only] > 0) { |
362 | 121 | return tablet_server_count_cache_[primary_only]; |
363 | 121 | } |
364 | 57 | tserver::PgTabletServerCountRequestPB req; |
365 | 57 | tserver::PgTabletServerCountResponsePB resp; |
366 | 57 | req.set_primary_only(primary_only); |
367 | | |
368 | 57 | RETURN_NOT_OK(proxy_->TabletServerCount(req, &resp, PrepareController())); |
369 | 57 | RETURN_NOT_OK(ResponseStatus(resp)); |
370 | 57 | tablet_server_count_cache_[primary_only] = resp.count(); |
371 | 57 | return resp.count(); |
372 | 57 | } |
373 | | |
374 | 2 | Result<client::TabletServersInfo> ListLiveTabletServers(bool primary_only) { |
375 | 2 | tserver::PgListLiveTabletServersRequestPB req; |
376 | 2 | tserver::PgListLiveTabletServersResponsePB resp; |
377 | 2 | req.set_primary_only(primary_only); |
378 | | |
379 | 2 | RETURN_NOT_OK(proxy_->ListLiveTabletServers(req, &resp, PrepareController())); |
380 | 2 | RETURN_NOT_OK(ResponseStatus(resp)); |
381 | 2 | client::TabletServersInfo result; |
382 | 2 | result.reserve(resp.servers().size()); |
383 | 6 | for (const auto& server : resp.servers()) { |
384 | 6 | result.push_back(client::YBTabletServerPlacementInfo::FromPB(server)); |
385 | 6 | } |
386 | 2 | return result; |
387 | 2 | } |
388 | | |
389 | 0 | CHECKED_STATUS ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) { |
390 | 0 | tserver::PgValidatePlacementResponsePB resp; |
391 | 0 | RETURN_NOT_OK(proxy_->ValidatePlacement(*req, &resp, PrepareController())); |
392 | 0 | return ResponseStatus(resp); |
393 | 0 | } |
394 | | |
395 | | #define YB_PG_CLIENT_SIMPLE_METHOD_IMPL(r, data, method) \ |
396 | | CHECKED_STATUS method( \ |
397 | | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \ |
398 | 1.64k | CoarseTimePoint deadline) { \ |
399 | 1.64k | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \ |
400 | 1.64k | req->set_session_id(session_id_); \ |
401 | 1.64k | auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \ |
402 | 1.64k | if (!status.ok()) { \ |
403 | 0 | if (status.IsTimedOut()) { \ |
404 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \ |
405 | 0 | } \ |
406 | 0 | return status; \ |
407 | 0 | } \ |
408 | 1.64k | return ResponseStatus(resp); \ |
409 | 1.64k | } Unexecuted instantiation: _ZN2yb6pggate8PgClient4Impl13AlterDatabaseEPNS_7tserver24PgAlterDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE _ZN2yb6pggate8PgClient4Impl10AlterTableEPNS_7tserver21PgAlterTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 398 | 155 | CoarseTimePoint deadline) { \ | 399 | 155 | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \ | 400 | 155 | req->set_session_id(session_id_); \ | 401 | 155 | auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \ | 402 | 155 | if (!status.ok()) { \ | 403 | 0 | if (status.IsTimedOut()) { \ | 404 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \ | 405 | 0 | } \ | 406 | 0 | return status; \ | 407 | 0 | } \ | 408 | 155 | return ResponseStatus(resp); \ | 409 | 155 | } |
_ZN2yb6pggate8PgClient4Impl14CreateDatabaseEPNS_7tserver25PgCreateDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 398 | 22 | CoarseTimePoint deadline) { \ | 399 | 22 | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \ | 400 | 22 | req->set_session_id(session_id_); \ | 401 | 22 | auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \ | 402 | 22 | if (!status.ok()) { \ | 403 | 0 | if (status.IsTimedOut()) { \ | 404 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \ | 405 | 0 | } \ | 406 | 0 | return status; \ | 407 | 0 | } \ | 408 | 22 | return ResponseStatus(resp); \ | 409 | 22 | } |
_ZN2yb6pggate8PgClient4Impl11CreateTableEPNS_7tserver22PgCreateTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 398 | 1.41k | CoarseTimePoint deadline) { \ | 399 | 1.41k | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \ | 400 | 1.41k | req->set_session_id(session_id_); \ | 401 | 1.41k | auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \ | 402 | 1.41k | if (!status.ok()) { \ | 403 | 0 | if (status.IsTimedOut()) { \ | 404 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \ | 405 | 0 | } \ | 406 | 0 | return status; \ | 407 | 0 | } \ | 408 | 1.41k | return ResponseStatus(resp); \ | 409 | 1.41k | } |
_ZN2yb6pggate8PgClient4Impl16CreateTablegroupEPNS_7tserver27PgCreateTablegroupRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 398 | 1 | CoarseTimePoint deadline) { \ | 399 | 1 | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \ | 400 | 1 | req->set_session_id(session_id_); \ | 401 | 1 | auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \ | 402 | 1 | if (!status.ok()) { \ | 403 | 0 | if (status.IsTimedOut()) { \ | 404 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \ | 405 | 0 | } \ | 406 | 0 | return status; \ | 407 | 0 | } \ | 408 | 1 | return ResponseStatus(resp); \ | 409 | 1 | } |
_ZN2yb6pggate8PgClient4Impl12DropDatabaseEPNS_7tserver23PgDropDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 398 | 21 | CoarseTimePoint deadline) { \ | 399 | 21 | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \ | 400 | 21 | req->set_session_id(session_id_); \ | 401 | 21 | auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \ | 402 | 21 | if (!status.ok()) { \ | 403 | 0 | if (status.IsTimedOut()) { \ | 404 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \ | 405 | 0 | } \ | 406 | 0 | return status; \ | 407 | 0 | } \ | 408 | 21 | return ResponseStatus(resp); \ | 409 | 21 | } |
_ZN2yb6pggate8PgClient4Impl14DropTablegroupEPNS_7tserver25PgDropTablegroupRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 398 | 1 | CoarseTimePoint deadline) { \ | 399 | 1 | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \ | 400 | 1 | req->set_session_id(session_id_); \ | 401 | 1 | auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \ | 402 | 1 | if (!status.ok()) { \ | 403 | 0 | if (status.IsTimedOut()) { \ | 404 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \ | 405 | 0 | } \ | 406 | 0 | return status; \ | 407 | 0 | } \ | 408 | 1 | return ResponseStatus(resp); \ | 409 | 1 | } |
_ZN2yb6pggate8PgClient4Impl13TruncateTableEPNS_7tserver24PgTruncateTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS7_8durationIxNS6_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 398 | 31 | CoarseTimePoint deadline) { \ | 399 | 31 | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB) resp; \ | 400 | 31 | req->set_session_id(session_id_); \ | 401 | 31 | auto status = proxy_->method(*req, &resp, PrepareController(deadline)); \ | 402 | 31 | if (!status.ok()) { \ | 403 | 0 | if (status.IsTimedOut()) { \ | 404 | 0 | return STATUS_FORMAT(TimedOut, "Timed out waiting for $0", PrettyFunctionName(__func__)); \ | 405 | 0 | } \ | 406 | 0 | return status; \ | 407 | 0 | } \ | 408 | 31 | return ResponseStatus(resp); \ | 409 | 31 | } |
|
410 | | |
411 | | BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_SIMPLE_METHOD_IMPL, ~, YB_PG_CLIENT_SIMPLE_METHODS); |
412 | | |
413 | | private: |
414 | 1.65k | std::string LogPrefix() const { |
415 | 1.65k | return Format("S $0: ", session_id_); |
416 | 1.65k | } |
417 | | |
418 | | rpc::RpcController* SetupController( |
419 | 999k | rpc::RpcController* controller, CoarseTimePoint deadline = CoarseTimePoint()) { |
420 | 999k | if (deadline != CoarseTimePoint()) { |
421 | 1.71k | controller->set_deadline(deadline); |
422 | 997k | } else { |
423 | 997k | controller->set_timeout(timeout_); |
424 | 997k | } |
425 | 999k | return controller; |
426 | 999k | } |
427 | | |
428 | 224k | rpc::RpcController* PrepareController(CoarseTimePoint deadline = CoarseTimePoint()) { |
429 | 224k | controller_.Reset(); |
430 | 224k | return SetupController(&controller_, deadline); |
431 | 224k | } |
432 | | |
433 | 2.74k | rpc::RpcController* PrepareHeartbeatController() { |
434 | 2.74k | heartbeat_controller_.Reset(); |
435 | 2.74k | heartbeat_controller_.set_timeout(FLAGS_pg_client_heartbeat_interval_ms * 1ms - 1s); |
436 | 2.74k | return &heartbeat_controller_; |
437 | 2.74k | } |
438 | | |
439 | | std::unique_ptr<tserver::PgClientServiceProxy> proxy_; |
440 | | rpc::RpcController controller_; |
441 | | uint64_t session_id_ = 0; |
442 | | |
443 | | rpc::Poller heartbeat_poller_; |
444 | | std::atomic<bool> heartbeat_running_{false}; |
445 | | rpc::RpcController heartbeat_controller_; |
446 | | tserver::PgHeartbeatResponsePB heartbeat_resp_; |
447 | | std::promise<Result<uint64_t>> create_session_promise_; |
448 | | std::array<int, 2> tablet_server_count_cache_; |
449 | | MonoDelta timeout_ = FLAGS_yb_client_admin_operation_timeout_sec * 1s; |
450 | | }; |
451 | | |
452 | 1.65k | PgClient::PgClient() : impl_(new Impl) { |
453 | 1.65k | } |
454 | | |
455 | 1.65k | PgClient::~PgClient() { |
456 | 1.65k | } |
457 | | |
458 | | Status PgClient::Start( |
459 | | rpc::ProxyCache* proxy_cache, rpc::Scheduler* scheduler, |
460 | 1.65k | const tserver::TServerSharedObject& tserver_shared_object) { |
461 | 1.65k | return impl_->Start(proxy_cache, scheduler, tserver_shared_object); |
462 | 1.65k | } |
463 | | |
464 | 1.65k | void PgClient::Shutdown() { |
465 | 1.65k | impl_->Shutdown(); |
466 | 1.65k | } |
467 | | |
468 | 0 | void PgClient::SetTimeout(MonoDelta timeout) { |
469 | 0 | impl_->SetTimeout(timeout); |
470 | 0 | } |
471 | | |
472 | | Result<PgTableDescPtr> PgClient::OpenTable( |
473 | 64.6k | const PgObjectId& table_id, bool reopen, CoarseTimePoint invalidate_cache_time) { |
474 | 64.6k | return impl_->OpenTable(table_id, reopen, invalidate_cache_time); |
475 | 64.6k | } |
476 | | |
477 | 82.4k | Status PgClient::FinishTransaction(Commit commit, DdlMode ddl_mode) { |
478 | 82.4k | return impl_->FinishTransaction(commit, ddl_mode); |
479 | 82.4k | } |
480 | | |
481 | 1.61k | Result<master::GetNamespaceInfoResponsePB> PgClient::GetDatabaseInfo(uint32_t oid) { |
482 | 1.61k | return impl_->GetDatabaseInfo(oid); |
483 | 1.61k | } |
484 | | |
485 | | Result<std::pair<PgOid, PgOid>> PgClient::ReserveOids( |
486 | 380 | PgOid database_oid, PgOid next_oid, uint32_t count) { |
487 | 380 | return impl_->ReserveOids(database_oid, next_oid, count); |
488 | 380 | } |
489 | | |
490 | 0 | Result<bool> PgClient::IsInitDbDone() { |
491 | 0 | return impl_->IsInitDbDone(); |
492 | 0 | } |
493 | | |
494 | 0 | Result<uint64_t> PgClient::GetCatalogMasterVersion() { |
495 | 0 | return impl_->GetCatalogMasterVersion(); |
496 | 0 | } |
497 | | |
498 | 21 | Status PgClient::CreateSequencesDataTable() { |
499 | 21 | return impl_->CreateSequencesDataTable(); |
500 | 21 | } |
501 | | |
502 | | Result<client::YBTableName> PgClient::DropTable( |
503 | 1.17k | tserver::PgDropTableRequestPB* req, CoarseTimePoint deadline) { |
504 | 1.17k | return impl_->DropTable(req, deadline); |
505 | 1.17k | } |
506 | | |
507 | | Status PgClient::BackfillIndex( |
508 | 89 | tserver::PgBackfillIndexRequestPB* req, CoarseTimePoint deadline) { |
509 | 89 | return impl_->BackfillIndex(req, deadline); |
510 | 89 | } |
511 | | |
512 | 178 | Result<int32> PgClient::TabletServerCount(bool primary_only) { |
513 | 178 | return impl_->TabletServerCount(primary_only); |
514 | 178 | } |
515 | | |
516 | 2 | Result<client::TabletServersInfo> PgClient::ListLiveTabletServers(bool primary_only) { |
517 | 2 | return impl_->ListLiveTabletServers(primary_only); |
518 | 2 | } |
519 | | |
520 | | Status PgClient::SetActiveSubTransaction( |
521 | 48.8k | SubTransactionId id, tserver::PgPerformOptionsPB* options) { |
522 | 48.8k | return impl_->SetActiveSubTransaction(id, options); |
523 | 48.8k | } |
524 | | |
525 | 23.5k | Status PgClient::RollbackSubTransaction(SubTransactionId id) { |
526 | 23.5k | return impl_->RollbackSubTransaction(id); |
527 | 23.5k | } |
528 | | |
529 | 0 | Status PgClient::ValidatePlacement(const tserver::PgValidatePlacementRequestPB* req) { |
530 | 0 | return impl_->ValidatePlacement(req); |
531 | 0 | } |
532 | | |
533 | | void PgClient::PerformAsync( |
534 | | tserver::PgPerformOptionsPB* options, |
535 | | PgsqlOps* operations, |
536 | 775k | const PerformCallback& callback) { |
537 | 775k | impl_->PerformAsync(options, operations, callback); |
538 | 775k | } |
539 | | |
540 | | #define YB_PG_CLIENT_SIMPLE_METHOD_DEFINE(r, data, method) \ |
541 | | Status PgClient::method( \ |
542 | | tserver::BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \ |
543 | 1.64k | CoarseTimePoint deadline) { \ |
544 | 1.64k | return impl_->method(req, deadline); \ |
545 | 1.64k | } Unexecuted instantiation: _ZN2yb6pggate8PgClient13AlterDatabaseEPNS_7tserver24PgAlterDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE _ZN2yb6pggate8PgClient10AlterTableEPNS_7tserver21PgAlterTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 543 | 155 | CoarseTimePoint deadline) { \ | 544 | 155 | return impl_->method(req, deadline); \ | 545 | 155 | } |
_ZN2yb6pggate8PgClient14CreateDatabaseEPNS_7tserver25PgCreateDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 543 | 22 | CoarseTimePoint deadline) { \ | 544 | 22 | return impl_->method(req, deadline); \ | 545 | 22 | } |
_ZN2yb6pggate8PgClient11CreateTableEPNS_7tserver22PgCreateTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 543 | 1.41k | CoarseTimePoint deadline) { \ | 544 | 1.41k | return impl_->method(req, deadline); \ | 545 | 1.41k | } |
_ZN2yb6pggate8PgClient16CreateTablegroupEPNS_7tserver27PgCreateTablegroupRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 543 | 1 | CoarseTimePoint deadline) { \ | 544 | 1 | return impl_->method(req, deadline); \ | 545 | 1 | } |
_ZN2yb6pggate8PgClient12DropDatabaseEPNS_7tserver23PgDropDatabaseRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 543 | 21 | CoarseTimePoint deadline) { \ | 544 | 21 | return impl_->method(req, deadline); \ | 545 | 21 | } |
_ZN2yb6pggate8PgClient14DropTablegroupEPNS_7tserver25PgDropTablegroupRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 543 | 1 | CoarseTimePoint deadline) { \ | 544 | 1 | return impl_->method(req, deadline); \ | 545 | 1 | } |
_ZN2yb6pggate8PgClient13TruncateTableEPNS_7tserver24PgTruncateTableRequestPBENSt3__16chrono10time_pointINS_15CoarseMonoClockENS6_8durationIxNS5_5ratioILl1ELl1000000000EEEEEEE Line | Count | Source | 543 | 31 | CoarseTimePoint deadline) { \ | 544 | 31 | return impl_->method(req, deadline); \ | 545 | 31 | } |
|
546 | | |
547 | | BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_SIMPLE_METHOD_DEFINE, ~, YB_PG_CLIENT_SIMPLE_METHODS); |
548 | | |
549 | | } // namespace pggate |
550 | | } // namespace yb |