/Users/deen/code/yugabyte-db/src/yb/client/universe_key_client.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/universe_key_client.h" |
15 | | |
16 | | #include "yb/encryption/encryption.pb.h" |
17 | | |
18 | | #include "yb/master/master_encryption.proxy.h" |
19 | | |
20 | | #include "yb/rpc/rpc_controller.h" |
21 | | |
22 | | using namespace std::chrono_literals; |
23 | | |
24 | | namespace yb { |
25 | | namespace client { |
26 | | |
27 | 7.88k | void UniverseKeyClient::GetUniverseKeyRegistryAsync() { |
28 | 12.8k | for (const auto& host_port : hps_) { |
29 | 12.8k | SendAsyncRequest(host_port); |
30 | 12.8k | } |
31 | 7.88k | } |
32 | | |
33 | 0 | void UniverseKeyClient::GetUniverseKeyRegistrySync() { |
34 | 0 | for (const auto& host_port : hps_) { |
35 | 0 | SendAsyncRequest(host_port); |
36 | 0 | } |
37 | 0 | std::unique_lock<decltype(mutex_)> l(mutex_); |
38 | 0 | cond_.wait(l, [&] { return callback_triggered_; } ); |
39 | 0 | } |
40 | | |
41 | 52.2k | void UniverseKeyClient::SendAsyncRequest(HostPort host_port) { |
42 | 52.2k | master::GetUniverseKeyRegistryRequestPB req; |
43 | 52.2k | auto resp = std::make_shared<master::GetUniverseKeyRegistryResponsePB>(); |
44 | 52.2k | auto rpc = std::make_shared<rpc::RpcController>(); |
45 | 52.2k | rpc->set_timeout(10s); |
46 | | |
47 | 52.2k | master::MasterEncryptionProxy peer_proxy(proxy_cache_, host_port); |
48 | 52.2k | peer_proxy.GetUniverseKeyRegistryAsync( |
49 | 52.2k | req, resp.get(), rpc.get(), |
50 | 52.2k | std::bind(&UniverseKeyClient::ProcessGetUniverseKeyRegistryResponse, this, resp, rpc, |
51 | 52.2k | host_port)); |
52 | 52.2k | } |
53 | | |
54 | | void UniverseKeyClient::ProcessGetUniverseKeyRegistryResponse( |
55 | | std::shared_ptr<master::GetUniverseKeyRegistryResponsePB> resp, |
56 | | std::shared_ptr<rpc::RpcController> rpc, |
57 | 52.2k | HostPort host_port) { |
58 | 52.2k | if (!rpc->status().ok() || resp->has_error()12.7k ) { |
59 | 39.4k | LOG(WARNING) << Format("Rpc status: $0, resp: $1", rpc->status(), resp->ShortDebugString()); |
60 | | // Always retry the request on failure. |
61 | 39.4k | SendAsyncRequest(host_port); |
62 | 39.4k | return; |
63 | 39.4k | } |
64 | 12.7k | std::unique_lock<decltype(mutex_)> l(mutex_); |
65 | 12.7k | LOG(INFO) << "Received universe keys from master: " << host_port.ToString(); |
66 | 12.7k | callback_(resp->universe_keys()); |
67 | 12.7k | callback_triggered_ = true; |
68 | 12.7k | cond_.notify_all(); |
69 | 12.7k | } |
70 | | |
71 | | } // namespace client |
72 | | } // namespace yb |