/Users/deen/code/yugabyte-db/src/yb/tserver/pg_client_service.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tserver/pg_client_service.h" |
15 | | |
16 | | #include <shared_mutex> |
17 | | |
18 | | #include <queue> |
19 | | |
20 | | #include <boost/multi_index/hashed_index.hpp> |
21 | | #include <boost/multi_index/mem_fun.hpp> |
22 | | #include <boost/multi_index/ordered_index.hpp> |
23 | | #include <boost/multi_index_container.hpp> |
24 | | |
25 | | #include "yb/client/client.h" |
26 | | #include "yb/client/schema.h" |
27 | | #include "yb/client/table.h" |
28 | | #include "yb/client/table_creator.h" |
29 | | #include "yb/client/tablet_server.h" |
30 | | |
31 | | #include "yb/common/partition.h" |
32 | | #include "yb/common/pg_types.h" |
33 | | #include "yb/common/wire_protocol.h" |
34 | | |
35 | | #include "yb/master/master_admin.proxy.h" |
36 | | |
37 | | #include "yb/rpc/rpc_context.h" |
38 | | #include "yb/rpc/rpc_controller.h" |
39 | | #include "yb/rpc/scheduler.h" |
40 | | |
41 | | #include "yb/tserver/pg_client_session.h" |
42 | | #include "yb/tserver/pg_create_table.h" |
43 | | #include "yb/tserver/pg_table_cache.h" |
44 | | |
45 | | #include "yb/util/net/net_util.h" |
46 | | #include "yb/util/result.h" |
47 | | #include "yb/util/shared_lock.h" |
48 | | #include "yb/util/status_format.h" |
49 | | #include "yb/util/status_log.h" |
50 | | #include "yb/util/status.h" |
51 | | |
52 | | using namespace std::literals; |
53 | | |
54 | | DEFINE_uint64(pg_client_session_expiration_ms, 60000, |
55 | | "Pg client session expiration time in milliseconds."); |
56 | | |
57 | | namespace yb { |
58 | | namespace tserver { |
59 | | |
60 | | namespace { |
61 | | |
62 | | template <class Resp> |
63 | 521k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { |
64 | 521k | if (!status.ok()) { |
65 | 16.2k | StatusToPB(status, resp->mutable_status()); |
66 | 16.2k | } |
67 | 521k | context->RespondSuccess(); |
68 | 521k | } pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgPerformResponsePB>(yb::Status const&, yb::tserver::PgPerformResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 13 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 13 | if (!status.ok()) { | 65 | 13 | StatusToPB(status, resp->mutable_status()); | 66 | 13 | } | 67 | 13 | context->RespondSuccess(); | 68 | 13 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgAlterDatabaseResponsePB>(yb::Status const&, yb::tserver::PgAlterDatabaseResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 3 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 3 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 3 | context->RespondSuccess(); | 68 | 3 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgAlterTableResponsePB>(yb::Status const&, yb::tserver::PgAlterTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 522 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 522 | if (!status.ok()) { | 65 | 2 | StatusToPB(status, resp->mutable_status()); | 66 | 2 | } | 67 | 522 | context->RespondSuccess(); | 68 | 522 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgBackfillIndexResponsePB>(yb::Status const&, yb::tserver::PgBackfillIndexResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 540 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 540 | if (!status.ok()) { | 65 | 10 | StatusToPB(status, resp->mutable_status()); | 66 | 10 | } | 67 | 540 | context->RespondSuccess(); | 68 | 540 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgCreateDatabaseResponsePB>(yb::Status const&, yb::tserver::PgCreateDatabaseResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 134 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 134 | if (!status.ok()) { | 65 | 3 | StatusToPB(status, resp->mutable_status()); | 66 | 3 | } | 67 | 134 | context->RespondSuccess(); | 68 | 134 | } |
Unexecuted instantiation: pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgCreateSequencesDataTableResponsePB>(yb::Status const&, yb::tserver::PgCreateSequencesDataTableResponsePB*, yb::rpc::RpcContext*) pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgCreateTableResponsePB>(yb::Status const&, yb::tserver::PgCreateTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 5.05k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 5.05k | if (!status.ok()) { | 65 | 27 | StatusToPB(status, resp->mutable_status()); | 66 | 27 | } | 67 | 5.05k | context->RespondSuccess(); | 68 | 5.05k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgCreateTablegroupResponsePB>(yb::Status const&, yb::tserver::PgCreateTablegroupResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 54 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 54 | if (!status.ok()) { | 65 | 2 | StatusToPB(status, resp->mutable_status()); | 66 | 2 | } | 67 | 54 | context->RespondSuccess(); | 68 | 54 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgDeleteDBSequencesResponsePB>(yb::Status const&, yb::tserver::PgDeleteDBSequencesResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 71 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 71 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 71 | context->RespondSuccess(); | 68 | 71 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgDeleteSequenceTupleResponsePB>(yb::Status const&, yb::tserver::PgDeleteSequenceTupleResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 282 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 282 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 282 | context->RespondSuccess(); | 68 | 282 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgDropDatabaseResponsePB>(yb::Status const&, yb::tserver::PgDropDatabaseResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 72 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 72 | if (!status.ok()) { | 65 | 1 | StatusToPB(status, resp->mutable_status()); | 66 | 1 | } | 67 | 72 | context->RespondSuccess(); | 68 | 72 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgDropTableResponsePB>(yb::Status const&, yb::tserver::PgDropTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 4.14k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 4.14k | if (!status.ok()) { | 65 | 7 | StatusToPB(status, resp->mutable_status()); | 66 | 7 | } | 67 | 4.14k | context->RespondSuccess(); | 68 | 4.14k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgDropTablegroupResponsePB>(yb::Status const&, yb::tserver::PgDropTablegroupResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 39 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 39 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 39 | context->RespondSuccess(); | 68 | 39 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgFinishTransactionResponsePB>(yb::Status const&, yb::tserver::PgFinishTransactionResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 218k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 218k | if (!status.ok()) { | 65 | 16.0k | StatusToPB(status, resp->mutable_status()); | 66 | 16.0k | } | 67 | 218k | context->RespondSuccess(); | 68 | 218k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgGetCatalogMasterVersionResponsePB>(yb::Status const&, yb::tserver::PgGetCatalogMasterVersionResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 22 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 22 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 22 | context->RespondSuccess(); | 68 | 22 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgGetDatabaseInfoResponsePB>(yb::Status const&, yb::tserver::PgGetDatabaseInfoResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 5.72k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 5.72k | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 5.72k | context->RespondSuccess(); | 68 | 5.72k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgHeartbeatResponsePB>(yb::Status const&, yb::tserver::PgHeartbeatResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 10.7k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 10.7k | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 10.7k | context->RespondSuccess(); | 68 | 10.7k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgInsertSequenceTupleResponsePB>(yb::Status const&, yb::tserver::PgInsertSequenceTupleResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 295 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 295 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 295 | context->RespondSuccess(); | 68 | 295 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgIsInitDbDoneResponsePB>(yb::Status const&, yb::tserver::PgIsInitDbDoneResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 2 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 2 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 2 | context->RespondSuccess(); | 68 | 2 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgListLiveTabletServersResponsePB>(yb::Status const&, yb::tserver::PgListLiveTabletServersResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 4 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 4 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 4 | context->RespondSuccess(); | 68 | 4 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgOpenTableResponsePB>(yb::Status const&, yb::tserver::PgOpenTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 191k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 191k | if (!status.ok()) { | 65 | 64 | StatusToPB(status, resp->mutable_status()); | 66 | 64 | } | 67 | 191k | context->RespondSuccess(); | 68 | 191k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgReadSequenceTupleResponsePB>(yb::Status const&, yb::tserver::PgReadSequenceTupleResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 3.23k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 3.23k | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 3.23k | context->RespondSuccess(); | 68 | 3.23k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgReserveOidsResponsePB>(yb::Status const&, yb::tserver::PgReserveOidsResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 805 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 805 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 805 | context->RespondSuccess(); | 68 | 805 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgRollbackSubTransactionResponsePB>(yb::Status const&, yb::tserver::PgRollbackSubTransactionResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 13.5k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 13.5k | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 13.5k | context->RespondSuccess(); | 68 | 13.5k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgSetActiveSubTransactionResponsePB>(yb::Status const&, yb::tserver::PgSetActiveSubTransactionResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 61.7k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 61.7k | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 61.7k | context->RespondSuccess(); | 68 | 61.7k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgTabletServerCountResponsePB>(yb::Status const&, yb::tserver::PgTabletServerCountResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 148 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 148 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 148 | context->RespondSuccess(); | 68 | 148 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgTruncateTableResponsePB>(yb::Status const&, yb::tserver::PgTruncateTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 624 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 624 | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 624 | context->RespondSuccess(); | 68 | 624 | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgUpdateSequenceTupleResponsePB>(yb::Status const&, yb::tserver::PgUpdateSequenceTupleResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 2.97k | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 2.97k | if (!status.ok()) { | 65 | 0 | StatusToPB(status, resp->mutable_status()); | 66 | 0 | } | 67 | 2.97k | context->RespondSuccess(); | 68 | 2.97k | } |
pg_client_service.cc:void yb::tserver::(anonymous namespace)::Respond<yb::tserver::PgValidatePlacementResponsePB>(yb::Status const&, yb::tserver::PgValidatePlacementResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 63 | 1 | void Respond(const Status& status, Resp* resp, rpc::RpcContext* context) { | 64 | 1 | if (!status.ok()) { | 65 | 1 | StatusToPB(status, resp->mutable_status()); | 66 | 1 | } | 67 | 1 | context->RespondSuccess(); | 68 | 1 | } |
|
69 | | |
70 | | } // namespace |
71 | | |
72 | | template <class T> |
73 | | class Expirable { |
74 | | public: |
75 | | template <class... Args> |
76 | | explicit Expirable(CoarseDuration lifetime, Args&&... args) |
77 | | : lifetime_(lifetime), expiration_(NewExpiration()), |
78 | 6.09k | value_(std::forward<Args>(args)...) { |
79 | 6.09k | } |
80 | | |
81 | 6.54k | CoarseTimePoint expiration() const { |
82 | 6.54k | return expiration_.load(std::memory_order_acquire); |
83 | 6.54k | } |
84 | | |
85 | 2.48M | void Touch() { |
86 | 2.48M | auto new_expiration = NewExpiration(); |
87 | 2.48M | auto old_expiration = expiration_.load(std::memory_order_acquire); |
88 | 2.48M | while (new_expiration > old_expiration2.48M ) { |
89 | 2.48M | if (expiration_.compare_exchange_weak( |
90 | 2.48M | old_expiration, new_expiration, std::memory_order_acq_rel)) { |
91 | 2.48M | break; |
92 | 2.48M | } |
93 | 2.48M | } |
94 | 2.48M | } |
95 | | |
96 | 4.98M | const T& value() const { |
97 | 4.98M | return value_; |
98 | 4.98M | } |
99 | | |
100 | | private: |
101 | 2.49M | CoarseTimePoint NewExpiration() const { |
102 | 2.49M | return CoarseMonoClock::now() + lifetime_; |
103 | 2.49M | } |
104 | | |
105 | | const CoarseDuration lifetime_; |
106 | | std::atomic<CoarseTimePoint> expiration_; |
107 | | T value_; |
108 | | }; |
109 | | |
110 | | template <class Extractor> |
111 | | class ApplyToValue { |
112 | | public: |
113 | | using result_type = typename Extractor::result_type; |
114 | | |
115 | | template <class T> |
116 | 2.49M | auto operator()(const T& t) const { |
117 | 2.49M | return extractor_(t.value()); |
118 | 2.49M | } |
119 | | |
120 | | private: |
121 | | Extractor extractor_; |
122 | | }; |
123 | | |
124 | | class PgClientServiceImpl::Impl { |
125 | | public: |
126 | | explicit Impl( |
127 | | const std::shared_future<client::YBClient*>& client_future, |
128 | | const scoped_refptr<ClockBase>& clock, |
129 | | TransactionPoolProvider transaction_pool_provider, |
130 | | rpc::Scheduler* scheduler) |
131 | | : client_future_(client_future), |
132 | | clock_(clock), |
133 | | transaction_pool_provider_(std::move(transaction_pool_provider)), |
134 | | table_cache_(client_future), |
135 | 16.7k | check_expired_sessions_(scheduler) { |
136 | 16.7k | ScheduleCheckExpiredSessions(CoarseMonoClock::now()); |
137 | 16.7k | } |
138 | | |
139 | 182 | ~Impl() { |
140 | 182 | check_expired_sessions_.Shutdown(); |
141 | 182 | } |
142 | | |
143 | | CHECKED_STATUS Heartbeat( |
144 | 10.7k | const PgHeartbeatRequestPB& req, PgHeartbeatResponsePB* resp, rpc::RpcContext* context) { |
145 | 10.7k | if (req.session_id()) { |
146 | 4.68k | return ResultToStatus(DoGetSession(req.session_id())); |
147 | 4.68k | } |
148 | | |
149 | 6.09k | auto session_id = ++session_serial_no_; |
150 | 6.09k | auto session = std::make_shared<PgClientSession>( |
151 | 6.09k | &client(), clock_, transaction_pool_provider_, &table_cache_, session_id); |
152 | 6.09k | resp->set_session_id(session_id); |
153 | | |
154 | 6.09k | std::lock_guard<rw_spinlock> lock(mutex_); |
155 | 6.09k | auto it = sessions_.emplace( |
156 | 6.09k | FLAGS_pg_client_session_expiration_ms * 1ms, std::move(session)).first; |
157 | 6.09k | session_expiration_queue_.push({it->expiration(), session_id}); |
158 | 6.09k | return Status::OK(); |
159 | 10.7k | } |
160 | | |
161 | | CHECKED_STATUS OpenTable( |
162 | 191k | const PgOpenTableRequestPB& req, PgOpenTableResponsePB* resp, rpc::RpcContext* context) { |
163 | 191k | if (req.invalidate_cache_time_us()) { |
164 | 1.04k | table_cache_.InvalidateAll(CoarseTimePoint() + req.invalidate_cache_time_us() * 1us); |
165 | 1.04k | } |
166 | 191k | if (req.reopen()) { |
167 | 9 | table_cache_.Invalidate(req.table_id()); |
168 | 9 | } |
169 | 191k | RETURN_NOT_OK(table_cache_.GetInfo( |
170 | 191k | req.table_id(), resp->mutable_info(), resp->mutable_partitions())); |
171 | 191k | return Status::OK(); |
172 | 191k | } |
173 | | |
174 | | CHECKED_STATUS GetDatabaseInfo( |
175 | | const PgGetDatabaseInfoRequestPB& req, PgGetDatabaseInfoResponsePB* resp, |
176 | 5.71k | rpc::RpcContext* context) { |
177 | 5.71k | RETURN_NOT_OK(client().GetNamespaceInfo( |
178 | 5.71k | GetPgsqlNamespaceId(req.oid()), "" /* namespace_name */, YQL_DATABASE_PGSQL, |
179 | 5.71k | resp->mutable_info())); |
180 | | |
181 | 5.71k | return Status::OK(); |
182 | 5.71k | } |
183 | | |
184 | | CHECKED_STATUS IsInitDbDone( |
185 | | const PgIsInitDbDoneRequestPB& req, PgIsInitDbDoneResponsePB* resp, |
186 | 2 | rpc::RpcContext* context) { |
187 | 2 | HostPort master_leader_host_port = client().GetMasterLeaderAddress(); |
188 | 2 | auto proxy = std::make_shared<master::MasterAdminProxy>( |
189 | 2 | &client().proxy_cache(), master_leader_host_port); |
190 | 2 | rpc::RpcController rpc; |
191 | 2 | master::IsInitDbDoneRequestPB master_req; |
192 | 2 | master::IsInitDbDoneResponsePB master_resp; |
193 | 2 | RETURN_NOT_OK(proxy->IsInitDbDone(master_req, &master_resp, &rpc)); |
194 | 2 | if (master_resp.has_error()) { |
195 | 0 | return STATUS_FORMAT( |
196 | 0 | RuntimeError, |
197 | 0 | "IsInitDbDone RPC response hit error: $0", |
198 | 0 | master_resp.error().ShortDebugString()); |
199 | 0 | } |
200 | 2 | if (master_resp.done() && master_resp.has_initdb_error()0 && |
201 | 2 | !master_resp.initdb_error().empty()0 ) { |
202 | 0 | return STATUS_FORMAT(RuntimeError, "initdb failed: $0", master_resp.initdb_error()); |
203 | 0 | } |
204 | 2 | VLOG(1) << "IsInitDbDone response: " << master_resp.ShortDebugString()0 ; |
205 | | // We return true if initdb finished running, as well as if we know that it created the first |
206 | | // table (pg_proc) to make initdb idempotent on upgrades. |
207 | 2 | resp->set_done(master_resp.done() || master_resp.pg_proc_exists()); |
208 | 2 | return Status::OK(); |
209 | 2 | } |
210 | | |
211 | | CHECKED_STATUS ReserveOids( |
212 | 805 | const PgReserveOidsRequestPB& req, PgReserveOidsResponsePB* resp, rpc::RpcContext* context) { |
213 | 805 | uint32_t begin_oid, end_oid; |
214 | 805 | RETURN_NOT_OK(client().ReservePgsqlOids( |
215 | 805 | GetPgsqlNamespaceId(req.database_oid()), req.next_oid(), req.count(), &begin_oid, |
216 | 805 | &end_oid)); |
217 | 805 | resp->set_begin_oid(begin_oid); |
218 | 805 | resp->set_end_oid(end_oid); |
219 | | |
220 | 805 | return Status::OK(); |
221 | 805 | } |
222 | | |
223 | | CHECKED_STATUS GetCatalogMasterVersion( |
224 | | const PgGetCatalogMasterVersionRequestPB& req, |
225 | | PgGetCatalogMasterVersionResponsePB* resp, |
226 | 22 | rpc::RpcContext* context) { |
227 | 22 | uint64_t version; |
228 | 22 | RETURN_NOT_OK(client().GetYsqlCatalogMasterVersion(&version)); |
229 | 22 | resp->set_version(version); |
230 | 22 | return Status::OK(); |
231 | 22 | } |
232 | | |
233 | | CHECKED_STATUS CreateSequencesDataTable( |
234 | | const PgCreateSequencesDataTableRequestPB& req, |
235 | | PgCreateSequencesDataTableResponsePB* resp, |
236 | 0 | rpc::RpcContext* context) { |
237 | 0 | return tserver::CreateSequencesDataTable(&client(), context->GetClientDeadline()); |
238 | 0 | } |
239 | | |
240 | | CHECKED_STATUS TabletServerCount( |
241 | | const PgTabletServerCountRequestPB& req, PgTabletServerCountResponsePB* resp, |
242 | 148 | rpc::RpcContext* context) { |
243 | 148 | int result = 0; |
244 | 148 | RETURN_NOT_OK(client().TabletServerCount(&result, req.primary_only(), /* use_cache= */ true)); |
245 | 148 | resp->set_count(result); |
246 | 148 | return Status::OK(); |
247 | 148 | } |
248 | | |
249 | | CHECKED_STATUS ListLiveTabletServers( |
250 | | const PgListLiveTabletServersRequestPB& req, PgListLiveTabletServersResponsePB* resp, |
251 | 4 | rpc::RpcContext* context) { |
252 | 4 | auto tablet_servers = VERIFY_RESULT(client().ListLiveTabletServers(req.primary_only())); |
253 | 12 | for (const auto& server : tablet_servers) { |
254 | 12 | server.ToPB(resp->mutable_servers()->Add()); |
255 | 12 | } |
256 | 4 | return Status::OK(); |
257 | 4 | } |
258 | | |
259 | | CHECKED_STATUS ValidatePlacement( |
260 | | const PgValidatePlacementRequestPB& req, PgValidatePlacementResponsePB* resp, |
261 | 1 | rpc::RpcContext* context) { |
262 | 1 | master::ReplicationInfoPB replication_info; |
263 | 1 | master::PlacementInfoPB* live_replicas = replication_info.mutable_live_replicas(); |
264 | | |
265 | 1 | for (const auto& block : req.placement_infos()) { |
266 | 1 | auto pb = live_replicas->add_placement_blocks(); |
267 | 1 | pb->mutable_cloud_info()->set_placement_cloud(block.cloud()); |
268 | 1 | pb->mutable_cloud_info()->set_placement_region(block.region()); |
269 | 1 | pb->mutable_cloud_info()->set_placement_zone(block.zone()); |
270 | 1 | pb->set_min_num_replicas(block.min_num_replicas()); |
271 | 1 | } |
272 | 1 | live_replicas->set_num_replicas(req.num_replicas()); |
273 | | |
274 | 1 | return client().ValidateReplicationInfo(replication_info); |
275 | 1 | } |
276 | | |
277 | | void Perform( |
278 | 2.17M | const PgPerformRequestPB& req, PgPerformResponsePB* resp, rpc::RpcContext* context) { |
279 | 2.17M | auto status = DoPerform(req, resp, context); |
280 | 2.17M | if (!status.ok()) { |
281 | 13 | Respond(status, resp, context); |
282 | 13 | } |
283 | 2.17M | } |
284 | | |
285 | | #define PG_CLIENT_SESSION_METHOD_FORWARD(r, data, method) \ |
286 | | CHECKED_STATUS method( \ |
287 | | const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)& req, \ |
288 | | BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB)* resp, \ |
289 | 311k | rpc::RpcContext* context) { \ |
290 | 311k | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ |
291 | 311k | } yb::tserver::PgClientServiceImpl::Impl::AlterDatabase(yb::tserver::PgAlterDatabaseRequestPB const&, yb::tserver::PgAlterDatabaseResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 3 | rpc::RpcContext* context) { \ | 290 | 3 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 3 | } |
yb::tserver::PgClientServiceImpl::Impl::AlterTable(yb::tserver::PgAlterTableRequestPB const&, yb::tserver::PgAlterTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 522 | rpc::RpcContext* context) { \ | 290 | 522 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 522 | } |
yb::tserver::PgClientServiceImpl::Impl::BackfillIndex(yb::tserver::PgBackfillIndexRequestPB const&, yb::tserver::PgBackfillIndexResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 540 | rpc::RpcContext* context) { \ | 290 | 540 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 540 | } |
yb::tserver::PgClientServiceImpl::Impl::CreateDatabase(yb::tserver::PgCreateDatabaseRequestPB const&, yb::tserver::PgCreateDatabaseResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 134 | rpc::RpcContext* context) { \ | 290 | 134 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 134 | } |
yb::tserver::PgClientServiceImpl::Impl::CreateTable(yb::tserver::PgCreateTableRequestPB const&, yb::tserver::PgCreateTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 5.05k | rpc::RpcContext* context) { \ | 290 | 5.05k | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 5.05k | } |
yb::tserver::PgClientServiceImpl::Impl::CreateTablegroup(yb::tserver::PgCreateTablegroupRequestPB const&, yb::tserver::PgCreateTablegroupResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 54 | rpc::RpcContext* context) { \ | 290 | 54 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 54 | } |
yb::tserver::PgClientServiceImpl::Impl::DeleteDBSequences(yb::tserver::PgDeleteDBSequencesRequestPB const&, yb::tserver::PgDeleteDBSequencesResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 71 | rpc::RpcContext* context) { \ | 290 | 71 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 71 | } |
yb::tserver::PgClientServiceImpl::Impl::DeleteSequenceTuple(yb::tserver::PgDeleteSequenceTupleRequestPB const&, yb::tserver::PgDeleteSequenceTupleResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 282 | rpc::RpcContext* context) { \ | 290 | 282 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 282 | } |
yb::tserver::PgClientServiceImpl::Impl::DropDatabase(yb::tserver::PgDropDatabaseRequestPB const&, yb::tserver::PgDropDatabaseResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 72 | rpc::RpcContext* context) { \ | 290 | 72 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 72 | } |
yb::tserver::PgClientServiceImpl::Impl::DropTable(yb::tserver::PgDropTableRequestPB const&, yb::tserver::PgDropTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 4.14k | rpc::RpcContext* context) { \ | 290 | 4.14k | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 4.14k | } |
yb::tserver::PgClientServiceImpl::Impl::DropTablegroup(yb::tserver::PgDropTablegroupRequestPB const&, yb::tserver::PgDropTablegroupResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 39 | rpc::RpcContext* context) { \ | 290 | 39 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 39 | } |
yb::tserver::PgClientServiceImpl::Impl::FinishTransaction(yb::tserver::PgFinishTransactionRequestPB const&, yb::tserver::PgFinishTransactionResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 218k | rpc::RpcContext* context) { \ | 290 | 218k | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 218k | } |
yb::tserver::PgClientServiceImpl::Impl::InsertSequenceTuple(yb::tserver::PgInsertSequenceTupleRequestPB const&, yb::tserver::PgInsertSequenceTupleResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 295 | rpc::RpcContext* context) { \ | 290 | 295 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 295 | } |
yb::tserver::PgClientServiceImpl::Impl::ReadSequenceTuple(yb::tserver::PgReadSequenceTupleRequestPB const&, yb::tserver::PgReadSequenceTupleResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 3.23k | rpc::RpcContext* context) { \ | 290 | 3.23k | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 3.23k | } |
yb::tserver::PgClientServiceImpl::Impl::RollbackSubTransaction(yb::tserver::PgRollbackSubTransactionRequestPB const&, yb::tserver::PgRollbackSubTransactionResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 13.5k | rpc::RpcContext* context) { \ | 290 | 13.5k | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 13.5k | } |
yb::tserver::PgClientServiceImpl::Impl::SetActiveSubTransaction(yb::tserver::PgSetActiveSubTransactionRequestPB const&, yb::tserver::PgSetActiveSubTransactionResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 61.7k | rpc::RpcContext* context) { \ | 290 | 61.7k | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 61.7k | } |
yb::tserver::PgClientServiceImpl::Impl::TruncateTable(yb::tserver::PgTruncateTableRequestPB const&, yb::tserver::PgTruncateTableResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 624 | rpc::RpcContext* context) { \ | 290 | 624 | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 624 | } |
yb::tserver::PgClientServiceImpl::Impl::UpdateSequenceTuple(yb::tserver::PgUpdateSequenceTupleRequestPB const&, yb::tserver::PgUpdateSequenceTupleResponsePB*, yb::rpc::RpcContext*) Line | Count | Source | 289 | 2.97k | rpc::RpcContext* context) { \ | 290 | 2.97k | return VERIFY_RESULT(GetSession(req))->method(req, resp, context); \ | 291 | 2.97k | } |
|
292 | | |
293 | | BOOST_PP_SEQ_FOR_EACH(PG_CLIENT_SESSION_METHOD_FORWARD, ~, PG_CLIENT_SESSION_METHODS); |
294 | | |
295 | | private: |
296 | 12.7k | client::YBClient& client() { return *client_future_.get(); } |
297 | | |
298 | | template <class Req> |
299 | 2.48M | Result<PgClientSessionLocker> GetSession(const Req& req) { |
300 | 2.48M | return GetSession(req.session_id()); |
301 | 2.48M | } yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgPerformRequestPB>(yb::tserver::PgPerformRequestPB const&) Line | Count | Source | 299 | 2.17M | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 2.17M | return GetSession(req.session_id()); | 301 | 2.17M | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgAlterDatabaseRequestPB>(yb::tserver::PgAlterDatabaseRequestPB const&) Line | Count | Source | 299 | 3 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 3 | return GetSession(req.session_id()); | 301 | 3 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgAlterTableRequestPB>(yb::tserver::PgAlterTableRequestPB const&) Line | Count | Source | 299 | 522 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 522 | return GetSession(req.session_id()); | 301 | 522 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgBackfillIndexRequestPB>(yb::tserver::PgBackfillIndexRequestPB const&) Line | Count | Source | 299 | 540 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 540 | return GetSession(req.session_id()); | 301 | 540 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgCreateDatabaseRequestPB>(yb::tserver::PgCreateDatabaseRequestPB const&) Line | Count | Source | 299 | 134 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 134 | return GetSession(req.session_id()); | 301 | 134 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgCreateTableRequestPB>(yb::tserver::PgCreateTableRequestPB const&) Line | Count | Source | 299 | 5.05k | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 5.05k | return GetSession(req.session_id()); | 301 | 5.05k | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgCreateTablegroupRequestPB>(yb::tserver::PgCreateTablegroupRequestPB const&) Line | Count | Source | 299 | 54 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 54 | return GetSession(req.session_id()); | 301 | 54 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgDeleteDBSequencesRequestPB>(yb::tserver::PgDeleteDBSequencesRequestPB const&) Line | Count | Source | 299 | 71 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 71 | return GetSession(req.session_id()); | 301 | 71 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgDeleteSequenceTupleRequestPB>(yb::tserver::PgDeleteSequenceTupleRequestPB const&) Line | Count | Source | 299 | 282 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 282 | return GetSession(req.session_id()); | 301 | 282 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgDropDatabaseRequestPB>(yb::tserver::PgDropDatabaseRequestPB const&) Line | Count | Source | 299 | 72 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 72 | return GetSession(req.session_id()); | 301 | 72 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgDropTableRequestPB>(yb::tserver::PgDropTableRequestPB const&) Line | Count | Source | 299 | 4.14k | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 4.14k | return GetSession(req.session_id()); | 301 | 4.14k | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgDropTablegroupRequestPB>(yb::tserver::PgDropTablegroupRequestPB const&) Line | Count | Source | 299 | 39 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 39 | return GetSession(req.session_id()); | 301 | 39 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgFinishTransactionRequestPB>(yb::tserver::PgFinishTransactionRequestPB const&) Line | Count | Source | 299 | 218k | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 218k | return GetSession(req.session_id()); | 301 | 218k | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgInsertSequenceTupleRequestPB>(yb::tserver::PgInsertSequenceTupleRequestPB const&) Line | Count | Source | 299 | 295 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 295 | return GetSession(req.session_id()); | 301 | 295 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgReadSequenceTupleRequestPB>(yb::tserver::PgReadSequenceTupleRequestPB const&) Line | Count | Source | 299 | 3.23k | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 3.23k | return GetSession(req.session_id()); | 301 | 3.23k | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgRollbackSubTransactionRequestPB>(yb::tserver::PgRollbackSubTransactionRequestPB const&) Line | Count | Source | 299 | 13.5k | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 13.5k | return GetSession(req.session_id()); | 301 | 13.5k | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgSetActiveSubTransactionRequestPB>(yb::tserver::PgSetActiveSubTransactionRequestPB const&) Line | Count | Source | 299 | 61.7k | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 61.7k | return GetSession(req.session_id()); | 301 | 61.7k | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgTruncateTableRequestPB>(yb::tserver::PgTruncateTableRequestPB const&) Line | Count | Source | 299 | 624 | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 624 | return GetSession(req.session_id()); | 301 | 624 | } |
yb::Result<yb::tserver::PgClientSessionLocker> yb::tserver::PgClientServiceImpl::Impl::GetSession<yb::tserver::PgUpdateSequenceTupleRequestPB>(yb::tserver::PgUpdateSequenceTupleRequestPB const&) Line | Count | Source | 299 | 2.97k | Result<PgClientSessionLocker> GetSession(const Req& req) { | 300 | 2.97k | return GetSession(req.session_id()); | 301 | 2.97k | } |
|
302 | | |
303 | 2.48M | Result<PgClientSession&> DoGetSession(uint64_t session_id) { |
304 | 2.48M | SharedLock<rw_spinlock> lock(mutex_); |
305 | 2.48M | DCHECK_NE(session_id, 0); |
306 | 2.48M | auto it = sessions_.find(session_id); |
307 | 2.48M | if (it == sessions_.end()) { |
308 | 0 | return STATUS_FORMAT(InvalidArgument, "Unknown session: $0", session_id); |
309 | 0 | } |
310 | 2.48M | const_cast<SessionsEntry&>(*it).Touch(); |
311 | 2.48M | return *it->value(); |
312 | 2.48M | } |
313 | | |
314 | 2.48M | Result<PgClientSessionLocker> GetSession(uint64_t session_id) { |
315 | 2.48M | return PgClientSessionLocker(&VERIFY_RESULT_REF(DoGetSession(session_id))); |
316 | 2.48M | } |
317 | | |
318 | 143k | void ScheduleCheckExpiredSessions(CoarseTimePoint now) REQUIRES(mutex_) { |
319 | 143k | auto time = session_expiration_queue_.empty() |
320 | 143k | ? CoarseTimePoint(now + FLAGS_pg_client_session_expiration_ms * 1ms)142k |
321 | 143k | : session_expiration_queue_.top().first + 1s570 ; |
322 | 143k | check_expired_sessions_.Schedule([this](const Status& status) { |
323 | 126k | if (!status.ok()) { |
324 | 182 | return; |
325 | 182 | } |
326 | 126k | this->CheckExpiredSessions(); |
327 | 126k | }, time - now); |
328 | 143k | } |
329 | | |
330 | 126k | void CheckExpiredSessions() { |
331 | 126k | auto now = CoarseMonoClock::now(); |
332 | 126k | std::lock_guard<rw_spinlock> lock(mutex_); |
333 | 126k | while (!session_expiration_queue_.empty()) { |
334 | 1.02k | auto& top = session_expiration_queue_.top(); |
335 | 1.02k | if (top.first > now) { |
336 | 570 | break; |
337 | 570 | } |
338 | 452 | auto id = top.second; |
339 | 452 | session_expiration_queue_.pop(); |
340 | 452 | auto it = sessions_.find(id); |
341 | 452 | if (it != sessions_.end()) { |
342 | 452 | auto current_expiration = it->expiration(); |
343 | 452 | if (current_expiration > now) { |
344 | 168 | session_expiration_queue_.push({current_expiration, id}); |
345 | 284 | } else { |
346 | 284 | sessions_.erase(it); |
347 | 284 | } |
348 | 452 | } |
349 | 452 | } |
350 | 126k | ScheduleCheckExpiredSessions(now); |
351 | 126k | } |
352 | | |
353 | | CHECKED_STATUS DoPerform( |
354 | 2.17M | const PgPerformRequestPB& req, PgPerformResponsePB* resp, rpc::RpcContext* context) { |
355 | 2.17M | return VERIFY_RESULT(GetSession(req))->Perform(req, resp, context); |
356 | 2.17M | } |
357 | | |
358 | | std::shared_future<client::YBClient*> client_future_; |
359 | | scoped_refptr<ClockBase> clock_; |
360 | | TransactionPoolProvider transaction_pool_provider_; |
361 | | PgTableCache table_cache_; |
362 | | rw_spinlock mutex_; |
363 | | |
364 | | class ExpirationTag; |
365 | | |
366 | | using SessionsEntry = Expirable<std::shared_ptr<PgClientSession>>; |
367 | | boost::multi_index_container< |
368 | | SessionsEntry, |
369 | | boost::multi_index::indexed_by< |
370 | | boost::multi_index::hashed_unique< |
371 | | ApplyToValue< |
372 | | boost::multi_index::const_mem_fun<PgClientSession, uint64_t, &PgClientSession::id> |
373 | | > |
374 | | > |
375 | | > |
376 | | > sessions_ GUARDED_BY(mutex_); |
377 | | |
378 | | using ExpirationEntry = std::pair<CoarseTimePoint, uint64_t>; |
379 | | |
380 | | struct CompareExpiration { |
381 | 16.7k | bool operator()(const ExpirationEntry& lhs, const ExpirationEntry& rhs) const { |
382 | 16.7k | return rhs.first > lhs.first; |
383 | 16.7k | } |
384 | | }; |
385 | | |
386 | | std::priority_queue<ExpirationEntry, |
387 | | std::vector<ExpirationEntry>, |
388 | | CompareExpiration> session_expiration_queue_; |
389 | | |
390 | | std::atomic<int64_t> session_serial_no_{0}; |
391 | | |
392 | | rpc::ScheduledTaskTracker check_expired_sessions_; |
393 | | }; |
394 | | |
395 | | PgClientServiceImpl::PgClientServiceImpl( |
396 | | const std::shared_future<client::YBClient*>& client_future, |
397 | | const scoped_refptr<ClockBase>& clock, |
398 | | TransactionPoolProvider transaction_pool_provider, |
399 | | const scoped_refptr<MetricEntity>& entity, |
400 | | rpc::Scheduler* scheduler) |
401 | | : PgClientServiceIf(entity), |
402 | 16.7k | impl_(new Impl(client_future, clock, std::move(transaction_pool_provider), scheduler)) {} |
403 | | |
404 | 182 | PgClientServiceImpl::~PgClientServiceImpl() {} |
405 | | |
406 | | void PgClientServiceImpl::Perform( |
407 | 2.17M | const PgPerformRequestPB* req, PgPerformResponsePB* resp, rpc::RpcContext context) { |
408 | 2.17M | impl_->Perform(*req, resp, &context); |
409 | 2.17M | } |
410 | | |
411 | | #define YB_PG_CLIENT_METHOD_DEFINE(r, data, method) \ |
412 | | void PgClientServiceImpl::method( \ |
413 | | const BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), RequestPB)* req, \ |
414 | | BOOST_PP_CAT(BOOST_PP_CAT(Pg, method), ResponsePB)* resp, \ |
415 | 521k | rpc::RpcContext context) { \ |
416 | 521k | Respond(impl_->method(*req, resp, &context), resp, &context); \ |
417 | 521k | } yb::tserver::PgClientServiceImpl::AlterDatabase(yb::tserver::PgAlterDatabaseRequestPB const*, yb::tserver::PgAlterDatabaseResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 3 | rpc::RpcContext context) { \ | 416 | 3 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 3 | } |
yb::tserver::PgClientServiceImpl::AlterTable(yb::tserver::PgAlterTableRequestPB const*, yb::tserver::PgAlterTableResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 522 | rpc::RpcContext context) { \ | 416 | 522 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 522 | } |
yb::tserver::PgClientServiceImpl::BackfillIndex(yb::tserver::PgBackfillIndexRequestPB const*, yb::tserver::PgBackfillIndexResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 540 | rpc::RpcContext context) { \ | 416 | 540 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 540 | } |
yb::tserver::PgClientServiceImpl::CreateDatabase(yb::tserver::PgCreateDatabaseRequestPB const*, yb::tserver::PgCreateDatabaseResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 134 | rpc::RpcContext context) { \ | 416 | 134 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 134 | } |
Unexecuted instantiation: yb::tserver::PgClientServiceImpl::CreateSequencesDataTable(yb::tserver::PgCreateSequencesDataTableRequestPB const*, yb::tserver::PgCreateSequencesDataTableResponsePB*, yb::rpc::RpcContext) yb::tserver::PgClientServiceImpl::CreateTable(yb::tserver::PgCreateTableRequestPB const*, yb::tserver::PgCreateTableResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 5.05k | rpc::RpcContext context) { \ | 416 | 5.05k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 5.05k | } |
yb::tserver::PgClientServiceImpl::CreateTablegroup(yb::tserver::PgCreateTablegroupRequestPB const*, yb::tserver::PgCreateTablegroupResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 54 | rpc::RpcContext context) { \ | 416 | 54 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 54 | } |
yb::tserver::PgClientServiceImpl::DeleteDBSequences(yb::tserver::PgDeleteDBSequencesRequestPB const*, yb::tserver::PgDeleteDBSequencesResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 71 | rpc::RpcContext context) { \ | 416 | 71 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 71 | } |
yb::tserver::PgClientServiceImpl::DeleteSequenceTuple(yb::tserver::PgDeleteSequenceTupleRequestPB const*, yb::tserver::PgDeleteSequenceTupleResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 282 | rpc::RpcContext context) { \ | 416 | 282 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 282 | } |
yb::tserver::PgClientServiceImpl::DropDatabase(yb::tserver::PgDropDatabaseRequestPB const*, yb::tserver::PgDropDatabaseResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 72 | rpc::RpcContext context) { \ | 416 | 72 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 72 | } |
yb::tserver::PgClientServiceImpl::DropTable(yb::tserver::PgDropTableRequestPB const*, yb::tserver::PgDropTableResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 4.14k | rpc::RpcContext context) { \ | 416 | 4.14k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 4.14k | } |
yb::tserver::PgClientServiceImpl::DropTablegroup(yb::tserver::PgDropTablegroupRequestPB const*, yb::tserver::PgDropTablegroupResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 39 | rpc::RpcContext context) { \ | 416 | 39 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 39 | } |
yb::tserver::PgClientServiceImpl::FinishTransaction(yb::tserver::PgFinishTransactionRequestPB const*, yb::tserver::PgFinishTransactionResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 218k | rpc::RpcContext context) { \ | 416 | 218k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 218k | } |
yb::tserver::PgClientServiceImpl::GetCatalogMasterVersion(yb::tserver::PgGetCatalogMasterVersionRequestPB const*, yb::tserver::PgGetCatalogMasterVersionResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 22 | rpc::RpcContext context) { \ | 416 | 22 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 22 | } |
yb::tserver::PgClientServiceImpl::GetDatabaseInfo(yb::tserver::PgGetDatabaseInfoRequestPB const*, yb::tserver::PgGetDatabaseInfoResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 5.72k | rpc::RpcContext context) { \ | 416 | 5.72k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 5.72k | } |
yb::tserver::PgClientServiceImpl::Heartbeat(yb::tserver::PgHeartbeatRequestPB const*, yb::tserver::PgHeartbeatResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 10.7k | rpc::RpcContext context) { \ | 416 | 10.7k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 10.7k | } |
yb::tserver::PgClientServiceImpl::InsertSequenceTuple(yb::tserver::PgInsertSequenceTupleRequestPB const*, yb::tserver::PgInsertSequenceTupleResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 295 | rpc::RpcContext context) { \ | 416 | 295 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 295 | } |
yb::tserver::PgClientServiceImpl::IsInitDbDone(yb::tserver::PgIsInitDbDoneRequestPB const*, yb::tserver::PgIsInitDbDoneResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 2 | rpc::RpcContext context) { \ | 416 | 2 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 2 | } |
yb::tserver::PgClientServiceImpl::ListLiveTabletServers(yb::tserver::PgListLiveTabletServersRequestPB const*, yb::tserver::PgListLiveTabletServersResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 4 | rpc::RpcContext context) { \ | 416 | 4 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 4 | } |
yb::tserver::PgClientServiceImpl::OpenTable(yb::tserver::PgOpenTableRequestPB const*, yb::tserver::PgOpenTableResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 191k | rpc::RpcContext context) { \ | 416 | 191k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 191k | } |
yb::tserver::PgClientServiceImpl::ReadSequenceTuple(yb::tserver::PgReadSequenceTupleRequestPB const*, yb::tserver::PgReadSequenceTupleResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 3.23k | rpc::RpcContext context) { \ | 416 | 3.23k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 3.23k | } |
yb::tserver::PgClientServiceImpl::ReserveOids(yb::tserver::PgReserveOidsRequestPB const*, yb::tserver::PgReserveOidsResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 805 | rpc::RpcContext context) { \ | 416 | 805 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 805 | } |
yb::tserver::PgClientServiceImpl::RollbackSubTransaction(yb::tserver::PgRollbackSubTransactionRequestPB const*, yb::tserver::PgRollbackSubTransactionResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 13.5k | rpc::RpcContext context) { \ | 416 | 13.5k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 13.5k | } |
yb::tserver::PgClientServiceImpl::SetActiveSubTransaction(yb::tserver::PgSetActiveSubTransactionRequestPB const*, yb::tserver::PgSetActiveSubTransactionResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 61.7k | rpc::RpcContext context) { \ | 416 | 61.7k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 61.7k | } |
yb::tserver::PgClientServiceImpl::TabletServerCount(yb::tserver::PgTabletServerCountRequestPB const*, yb::tserver::PgTabletServerCountResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 148 | rpc::RpcContext context) { \ | 416 | 148 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 148 | } |
yb::tserver::PgClientServiceImpl::TruncateTable(yb::tserver::PgTruncateTableRequestPB const*, yb::tserver::PgTruncateTableResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 624 | rpc::RpcContext context) { \ | 416 | 624 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 624 | } |
yb::tserver::PgClientServiceImpl::UpdateSequenceTuple(yb::tserver::PgUpdateSequenceTupleRequestPB const*, yb::tserver::PgUpdateSequenceTupleResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 2.97k | rpc::RpcContext context) { \ | 416 | 2.97k | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 2.97k | } |
yb::tserver::PgClientServiceImpl::ValidatePlacement(yb::tserver::PgValidatePlacementRequestPB const*, yb::tserver::PgValidatePlacementResponsePB*, yb::rpc::RpcContext) Line | Count | Source | 415 | 1 | rpc::RpcContext context) { \ | 416 | 1 | Respond(impl_->method(*req, resp, &context), resp, &context); \ | 417 | 1 | } |
|
418 | | |
419 | | BOOST_PP_SEQ_FOR_EACH(YB_PG_CLIENT_METHOD_DEFINE, ~, YB_PG_CLIENT_METHODS); |
420 | | |
421 | | } // namespace tserver |
422 | | } // namespace yb |