/Users/deen/code/yugabyte-db/src/yb/yql/pggate/ybc_pggate_tool.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | |
13 | | #include "yb/yql/pggate/ybc_pggate_tool.h" |
14 | | |
15 | | #include "yb/common/ybc-internal.h" |
16 | | |
17 | | #include "yb/rpc/rpc_controller.h" |
18 | | |
19 | | #include "yb/server/server_base_options.h" |
20 | | |
21 | | #include "yb/tserver/tserver_service.proxy.h" |
22 | | #include "yb/tserver/tserver_shared_mem.h" |
23 | | |
24 | | #include "yb/util/countdown_latch.h" |
25 | | #include "yb/util/shared_mem.h" |
26 | | |
27 | | #include "yb/yql/pggate/pg_env.h" |
28 | | #include "yb/yql/pggate/pggate.h" |
29 | | #include "yb/yql/pggate/pggate_flags.h" |
30 | | #include "yb/yql/pggate/ybc_pggate.h" |
31 | | |
32 | | namespace yb { |
33 | | namespace pggate { |
34 | | |
35 | | namespace { |
36 | | |
37 | | // Fetches relation's unique constraint name to specified buffer. |
38 | | // If relation is not an index and it has primary key the name of primary key index is returned. |
39 | | // In other cases, relation name is used. |
40 | | // |
41 | | // Not implemented for tools. |
42 | 0 | void FetchUniqueConstraintName(PgOid relation_id, char* dest, size_t max_size) { |
43 | 0 | CHECK(false) << "Not implemented"; |
44 | 0 | } |
45 | | |
46 | 0 | YBCPgMemctx GetCurrentToolYbMemctx() { |
47 | 0 | static YBCPgMemctx tool_memctx = nullptr; |
48 | |
|
49 | 0 | if (!tool_memctx) { |
50 | 0 | tool_memctx = YBCPgCreateMemctx(); |
51 | 0 | } |
52 | 0 | return tool_memctx; |
53 | 0 | } |
54 | | |
55 | | // Conversion Table. |
56 | | // Contain function pointers for conversion between PostgreSQL Datum to YugaByte data. |
57 | | // Currently it is not used in the tools and can be empty. |
58 | | static const YBCPgTypeEntity YBCEmptyTypeEntityTable[] = {}; |
59 | | |
60 | 0 | CHECKED_STATUS PrepareInitPgGateBackend() { |
61 | 0 | server::MasterAddresses master_addresses; |
62 | 0 | std::string resolved_str; |
63 | 0 | RETURN_NOT_OK(server::DetermineMasterAddresses( |
64 | 0 | "pggate_master_addresses", FLAGS_pggate_master_addresses, 0, &master_addresses, |
65 | 0 | &resolved_str)); |
66 | 0 | LOG(INFO) << "Master addresses: " << AsString(master_addresses); |
67 | |
|
68 | 0 | PgApiContext context; |
69 | 0 | struct Data { |
70 | 0 | boost::optional<tserver::TServerSharedObject> tserver_shared_object; |
71 | 0 | HostPort reached_host_port; |
72 | 0 | std::atomic<bool> flag{false}; |
73 | 0 | CountDownLatch latch{1}; |
74 | 0 | std::atomic<size_t> running{0}; |
75 | 0 | Status failure; |
76 | 0 | }; |
77 | 0 | static std::shared_ptr<Data> data = std::make_shared<Data>(); |
78 | 0 | data->tserver_shared_object.emplace(VERIFY_RESULT(tserver::TServerSharedObject::Create())); |
79 | 0 | data->running = 0; |
80 | 0 | for (const auto& list : master_addresses) { |
81 | 0 | data->running += list.size(); |
82 | 0 | } |
83 | 0 | for (const auto& list : master_addresses) { |
84 | 0 | for (const auto& host_port : list) { |
85 | 0 | tserver::TabletServerServiceProxy proxy(context.proxy_cache.get(), host_port); |
86 | 0 | struct ReqData { |
87 | 0 | tserver::GetSharedDataRequestPB req; |
88 | 0 | tserver::GetSharedDataResponsePB resp; |
89 | 0 | rpc::RpcController controller; |
90 | 0 | }; |
91 | 0 | auto req_data = std::make_shared<ReqData>(); |
92 | 0 | req_data->controller.set_timeout(std::chrono::seconds(60)); |
93 | 0 | proxy.GetSharedDataAsync( |
94 | 0 | req_data->req, &req_data->resp, &req_data->controller, |
95 | 0 | [req_data, host_port = host_port] { |
96 | 0 | if (req_data->controller.status().ok()) { |
97 | 0 | bool expected = false; |
98 | 0 | if (data->flag.compare_exchange_strong(expected, true)) { |
99 | 0 | memcpy(pointer_cast<char*>(&**data->tserver_shared_object), |
100 | 0 | req_data->resp.data().c_str(), req_data->resp.data().size()); |
101 | 0 | data->reached_host_port = host_port; |
102 | 0 | data->latch.CountDown(); |
103 | 0 | } |
104 | 0 | } else if (--data->running == 0) { |
105 | 0 | data->failure = req_data->controller.status(); |
106 | 0 | data->latch.CountDown(); |
107 | 0 | } |
108 | 0 | }); |
109 | 0 | } |
110 | 0 | } |
111 | |
|
112 | 0 | data->latch.Wait(); |
113 | 0 | RETURN_NOT_OK(data->failure); |
114 | |
|
115 | 0 | FLAGS_pggate_tserver_shm_fd = data->tserver_shared_object->GetFd(); |
116 | |
|
117 | 0 | auto& shared_data = **data->tserver_shared_object; |
118 | 0 | shared_data.SetHostEndpoint(shared_data.endpoint(), data->reached_host_port.host()); |
119 | 0 | LOG(INFO) << "Shared data fetched, endpoint: " << shared_data.endpoint() |
120 | 0 | << ", host: " << shared_data.host().ToBuffer() |
121 | 0 | << ", catalog version: " << shared_data.ysql_catalog_version() |
122 | 0 | << ", postgres_auth_key: " << shared_data.postgres_auth_key(); |
123 | |
|
124 | 0 | YBCPgCallbacks callbacks; |
125 | 0 | callbacks.FetchUniqueConstraintName = &FetchUniqueConstraintName; |
126 | 0 | callbacks.GetCurrentYbMemctx = &GetCurrentToolYbMemctx; |
127 | 0 | YBCInitPgGateEx(YBCEmptyTypeEntityTable, 0, callbacks, &context); |
128 | |
|
129 | 0 | return Status::OK(); |
130 | 0 | } |
131 | | |
132 | | } // anonymous namespace |
133 | | |
134 | | //-------------------------------------------------------------------------------------------------- |
135 | | // C API. |
136 | | //-------------------------------------------------------------------------------------------------- |
137 | | extern "C" { |
138 | | |
139 | 0 | void YBCSetMasterAddresses(const char* hosts) { |
140 | 0 | LOG(INFO) << "Setting custom master addresses: " << hosts; |
141 | 0 | FLAGS_pggate_master_addresses = hosts; |
142 | 0 | } |
143 | | |
144 | 0 | YBCStatus YBCInitPgGateBackend() { |
145 | 0 | auto status = PrepareInitPgGateBackend(); |
146 | 0 | if (!status.ok()) { |
147 | 0 | return ToYBCStatus(status); |
148 | 0 | } |
149 | 0 | return YBCPgInitSession(/* pg_env */ nullptr, /* database_name */ nullptr); |
150 | 0 | } |
151 | | |
152 | 0 | void YBCShutdownPgGateBackend() { |
153 | 0 | YBCDestroyPgGate(); |
154 | 0 | YBCPgDestroyMemctx(GetCurrentToolYbMemctx()); |
155 | 0 | } |
156 | | |
157 | | } // extern "C" |
158 | | |
159 | | } // namespace pggate |
160 | | } // namespace yb |