YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/ybc_pggate_tool.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include "yb/yql/pggate/ybc_pggate_tool.h"
14
15
#include "yb/common/ybc-internal.h"
16
17
#include "yb/rpc/rpc_controller.h"
18
19
#include "yb/server/server_base_options.h"
20
21
#include "yb/tserver/tserver_service.proxy.h"
22
#include "yb/tserver/tserver_shared_mem.h"
23
24
#include "yb/util/countdown_latch.h"
25
#include "yb/util/shared_mem.h"
26
27
#include "yb/yql/pggate/pg_env.h"
28
#include "yb/yql/pggate/pggate.h"
29
#include "yb/yql/pggate/pggate_flags.h"
30
#include "yb/yql/pggate/ybc_pggate.h"
31
32
namespace yb {
33
namespace pggate {
34
35
namespace {
36
37
// Fetches relation's unique constraint name to specified buffer.
38
// If relation is not an index and it has primary key the name of primary key index is returned.
39
// In other cases, relation name is used.
40
//
41
// Not implemented for tools.
42
0
void FetchUniqueConstraintName(PgOid relation_id, char* dest, size_t max_size) {
43
0
  CHECK(false) << "Not implemented";
44
0
}
45
46
0
// Returns the memory context used by the tool, creating it via YBCPgCreateMemctx() the first
// time it is requested (or whenever the cached handle is still null).
//
// The handle is cached in a function-local static that is never reset by this function, so every
// later call returns the same value.
YBCPgMemctx GetCurrentToolYbMemctx() {
  static YBCPgMemctx cached_memctx = nullptr;

  if (cached_memctx != nullptr) {
    return cached_memctx;
  }

  cached_memctx = YBCPgCreateMemctx();
  return cached_memctx;
}
54
55
// Conversion Table.
56
// Contains function pointers for converting between PostgreSQL Datum and YugaByte data.
57
// Currently it is not used in the tools and can be empty.
58
// NOTE(review): an empty-brace initializer for an array of unknown bound relies on the
// zero-length-array compiler extension (accepted by GCC/Clang, not ISO C++). The table is passed
// to YBCInitPgGateEx together with an entry count of 0.
static const YBCPgTypeEntity YBCEmptyTypeEntityTable[] = {};
59
60
0
// Prepares the process for pggate initialization in a standalone tool.
//
// Steps:
//   1. Resolve the addresses listed in FLAGS_pggate_master_addresses.
//   2. Create a TServerSharedObject and send a GetSharedData RPC to every resolved address in
//      parallel. The first successful response is copied into the shared object; if every
//      request fails, the status of the request that failed last is returned.
//   3. Publish the shared object's fd through FLAGS_pggate_tserver_shm_fd, record the host that
//      answered, and call YBCInitPgGateEx with the tool callbacks.
//
// Returns OK on success, or the error from address resolution, shared object creation, or the
// RPCs. Returns IllegalState when address resolution yields no addresses at all.
CHECKED_STATUS PrepareInitPgGateBackend() {
  server::MasterAddresses master_addresses;
  std::string resolved_str;
  RETURN_NOT_OK(server::DetermineMasterAddresses(
      "pggate_master_addresses", FLAGS_pggate_master_addresses, 0, &master_addresses,
      &resolved_str));
  LOG(INFO) << "Master addresses: " << AsString(master_addresses);

  PgApiContext context;
  struct Data {
    boost::optional<tserver::TServerSharedObject> tserver_shared_object;
    HostPort reached_host_port;
    // Set by the first successful response; later successes are ignored.
    std::atomic<bool> flag{false};
    // Released once: either by the first success or by the last failure.
    CountDownLatch latch{1};
    // Number of requests that have not failed yet.
    std::atomic<size_t> running{0};
    Status failure;
  };
  // Kept static on purpose: RPC callbacks reference it without capturing and may still run after
  // this function has returned, and the shared object whose fd is published below lives inside it.
  static std::shared_ptr<Data> data = std::make_shared<Data>();
  data->tserver_shared_object.emplace(VERIFY_RESULT(tserver::TServerSharedObject::Create()));

  size_t num_targets = 0;
  for (const auto& list : master_addresses) {
    num_targets += list.size();
  }
  if (num_targets == 0) {
    // Without this guard no request is issued, nothing ever counts the latch down, and the
    // Wait() below would block forever.
    return STATUS(IllegalState, "No master addresses to fetch shared data from");
  }
  data->running = num_targets;

  for (const auto& list : master_addresses) {
    for (const auto& host_port : list) {
      tserver::TabletServerServiceProxy proxy(context.proxy_cache.get(), host_port);
      // Request state is heap-allocated and owned by the callback so it outlives this iteration.
      struct ReqData {
        tserver::GetSharedDataRequestPB req;
        tserver::GetSharedDataResponsePB resp;
        rpc::RpcController controller;
      };
      auto req_data = std::make_shared<ReqData>();
      req_data->controller.set_timeout(std::chrono::seconds(60));
      proxy.GetSharedDataAsync(
          req_data->req, &req_data->resp, &req_data->controller,
          [req_data, host_port = host_port] {
        if (req_data->controller.status().ok()) {
          bool expected = false;
          if (data->flag.compare_exchange_strong(expected, true)) {
            auto& shared_object = **data->tserver_shared_object;
            const auto& payload = req_data->resp.data();
            const size_t capacity = sizeof(shared_object);
            if (payload.size() != capacity) {
              LOG(WARNING) << "Unexpected shared data size: " << payload.size()
                           << ", expected: " << capacity;
            }
            // Never copy more than the shared object can hold, whatever the peer sent.
            const size_t copy_size = payload.size() < capacity ? payload.size() : capacity;
            memcpy(pointer_cast<char*>(&shared_object), payload.data(), copy_size);
            data->reached_host_port = host_port;
            data->latch.CountDown();
          }
        } else if (--data->running == 0) {
          // Every request failed; report the status of the one that failed last.
          data->failure = req_data->controller.status();
          data->latch.CountDown();
        }
      });
    }
  }

  data->latch.Wait();
  RETURN_NOT_OK(data->failure);

  FLAGS_pggate_tserver_shm_fd = data->tserver_shared_object->GetFd();

  auto& shared_data = **data->tserver_shared_object;
  shared_data.SetHostEndpoint(shared_data.endpoint(), data->reached_host_port.host());
  // NOTE(review): postgres_auth_key looks like a credential; confirm it is acceptable to write it
  // to the INFO log.
  LOG(INFO) << "Shared data fetched, endpoint: " << shared_data.endpoint()
            << ", host: " << shared_data.host().ToBuffer()
            << ", catalog version: " << shared_data.ysql_catalog_version()
            << ", postgres_auth_key: " << shared_data.postgres_auth_key();

  // Value-initialize so that any callback slot not assigned below is null rather than garbage.
  YBCPgCallbacks callbacks{};
  callbacks.FetchUniqueConstraintName = &FetchUniqueConstraintName;
  callbacks.GetCurrentYbMemctx = &GetCurrentToolYbMemctx;
  YBCInitPgGateEx(YBCEmptyTypeEntityTable, 0, callbacks, &context);

  return Status::OK();
}
131
132
} // anonymous namespace
133
134
//--------------------------------------------------------------------------------------------------
135
// C API.
136
//--------------------------------------------------------------------------------------------------
137
extern "C" {
138
139
0
void YBCSetMasterAddresses(const char* hosts) {
140
0
  LOG(INFO) << "Setting custom master addresses: " << hosts;
141
0
  FLAGS_pggate_master_addresses = hosts;
142
0
}
143
144
0
// Initializes the pggate backend for a tool process.
//
// Runs PrepareInitPgGateBackend() first. If that fails, its status is converted with
// ToYBCStatus() and returned; otherwise a session is started through YBCPgInitSession() with
// null pg_env and database name, and that call's result is returned.
YBCStatus YBCInitPgGateBackend() {
  auto prepare_status = PrepareInitPgGateBackend();
  return prepare_status.ok()
      ? YBCPgInitSession(/* pg_env */ nullptr, /* database_name */ nullptr)
      : ToYBCStatus(prepare_status);
}
151
152
0
void YBCShutdownPgGateBackend() {
153
0
  YBCDestroyPgGate();
154
0
  YBCPgDestroyMemctx(GetCurrentToolYbMemctx());
155
0
}
156
157
} // extern "C"
158
159
} // namespace pggate
160
} // namespace yb