/Users/deen/code/yugabyte-db/src/yb/rpc/rpc-bench.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <string> |
34 | | #include <thread> |
35 | | |
36 | | #include <gtest/gtest.h> |
37 | | |
38 | | #include "yb/rpc/proxy.h" |
39 | | #include "yb/rpc/rpc-test-base.h" |
40 | | #include "yb/rpc/rpc_controller.h" |
41 | | #include "yb/rpc/rtest.proxy.h" |
42 | | |
43 | | #include "yb/util/countdown_latch.h" |
44 | | #include "yb/util/net/net_util.h" |
45 | | #include "yb/util/status_log.h" |
46 | | #include "yb/util/test_util.h" |
47 | | #include "yb/util/thread.h" |
48 | | |
49 | | using namespace std::literals; // NOLINT |
50 | | |
51 | | using std::string; |
52 | | using std::shared_ptr; |
53 | | |
54 | | namespace yb { |
55 | | namespace rpc { |
56 | | |
// Test fixture for the RPC benchmark. It derives from RpcTestBase and adds the
// two pieces of state that the benchmark body and its worker threads share.
57 | | class RpcBench : public RpcTestBase { |
58 | | public: |
59 | 1 | RpcBench() {} |
60 | | |
61 | | protected: |
// ClientThread is a friend so that worker threads can reach the protected
// members below (and CreateMessenger, which Run calls through bench_).
62 | | friend class ClientThread; |
63 | | |
// Address handed to every CalculatorServiceProxy built by the workers. The
// test body passes a pointer to it into StartTestServerWithGeneratedCode,
// which presumably fills in the bound server address - TODO confirm.
64 | | HostPort server_hostport_; |
// Loop flag for the workers: starts true, is polled with an acquire load in
// ClientThread::Run and cleared with a release store by the test body.
65 | | std::atomic<bool> should_run_{true}; |
66 | | }; |
67 | | |
// One benchmark worker. Each instance owns a std::thread that issues
// CalculatorService Add calls against the fixture server in a loop until the
// fixture clears should_run_, counting the calls it completed.
// All members are public; the test body reads request_count_ directly.
68 | | class ClientThread { |
69 | | public: |
// Stores the (non-owning) fixture pointer and zeroes the request counter.
// No thread is created here; call Start for that.
70 | | explicit ClientThread(RpcBench *bench) |
71 | | : bench_(bench), |
72 | 16 | request_count_(0) { |
73 | 16 | } |
74 | | |
// Launches the worker thread, which executes Run on this object.
75 | 16 | void Start() { |
76 | 16 | thread_.reset(new std::thread(&ClientThread::Run, this)); |
77 | 16 | } |
78 | | |
// Blocks until the worker thread finishes. thread_ is only set by Start, so
// calling Join before Start dereferences a null pointer.
79 | 0 | void Join() { |
80 | 0 | thread_->join(); |
81 | 0 | } |
82 | | |
// Thread body: builds a private messenger, proxy cache and proxy, then sends
// Add requests until should_run_ becomes false.
83 | 16 | void Run() { |
// NOTE(review): CDSAttacher is declared elsewhere; presumably a scoped
// per-thread attach/detach guard - confirm against its definition.
84 | 16 | CDSAttacher attacher; |
// Every worker creates its own messenger rather than sharing one.
85 | 16 | auto client_messenger = CreateAutoShutdownMessengerHolder(bench_->CreateMessenger("Client")); |
86 | 16 | ProxyCache proxy_cache(client_messenger.get()); |
87 | | |
88 | 16 | rpc_test::CalculatorServiceProxy p(&proxy_cache, HostPort(bench_->server_hostport_)); |
89 | | |
// Request and response objects are reused across iterations; only the
// controller is rebuilt for each call.
90 | 16 | rpc_test::AddRequestPB req; |
91 | 16 | rpc_test::AddResponsePB resp; |
92 | 83 | while (bench_->should_run_.load(std::memory_order_acquire)) { |
// Both operands are the running request count, so the expected sum changes
// on every iteration.
93 | 67 | req.set_x(request_count_); |
94 | 67 | req.set_y(request_count_); |
95 | 67 | RpcController controller; |
96 | 67 | controller.set_timeout(MonoDelta::FromSeconds(10)); |
// The response is read on the very next line, so Add is used as a blocking
// call here. CHECK_OK and CHECK_EQ presumably abort the process on failure.
97 | 67 | CHECK_OK(p.Add(req, &resp, &controller)); |
98 | 67 | CHECK_EQ(req.x() + req.y(), resp.result()); |
99 | 67 | request_count_++; |
100 | 67 | } |
101 | 16 | } |
102 | | |
// Worker thread; null until Start is called.
103 | | std::unique_ptr<std::thread> thread_; |
// Fixture that owns the shared flag and server address; not owned here.
104 | | RpcBench *bench_; |
// Number of completed Add calls. Plain int: written only by the worker and
// read by the test body after Join, which orders the two accesses.
105 | | int request_count_; |
106 | | }; |
107 | | |
108 | | |
// Throughput benchmark: starts the generated-code test server, runs a fixed
// number of ClientThread workers against it for ten seconds, then logs
// requests per second and user/system CPU time per request.
109 | | // Test making successful RPC calls. |
110 | 1 | TEST_F(RpcBench, BenchmarkCalls) { |
// NOTE(review): options is configured but never passed to anything in this
// test, so n_worker_threads = 1 has no visible effect - confirm whether it
// was meant to go to StartTestServerWithGeneratedCode.
111 | 1 | TestServerOptions options; |
112 | 1 | options.n_worker_threads = 1; |
113 | | |
114 | | // Set up server. |
115 | 1 | StartTestServerWithGeneratedCode(&server_hostport_); |
116 | | |
117 | | // Set up client. |
118 | 1 | LOG(INFO) << "Connecting to " << server_hostport_; |
// NOTE(review): this messenger (and client_options) is not used again below;
// each ClientThread builds its own messenger inside Run.
119 | 1 | MessengerOptions client_options = kDefaultClientMessengerOptions; |
120 | 1 | client_options.n_reactors = 2; |
121 | 1 | auto client_messenger = CreateAutoShutdownMessengerHolder("Client", client_options); |
122 | | |
// The stopwatch is started before any worker is launched and stopped after
// all of them are joined, so thread startup and teardown are included.
123 | 1 | Stopwatch sw(Stopwatch::ALL_THREADS); |
124 | 1 | sw.start(); |
125 | | |
126 | 1 | std::vector<std::unique_ptr<ClientThread>> threads; |
// Fewer workers are used when built with the thread or address sanitizer.
127 | | #if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER) |
128 | | constexpr int kNumThreads = 4; |
129 | | #else |
130 | 1 | constexpr int kNumThreads = 16; |
131 | 1 | #endif |
132 | 17 | for (int i = 0; i < kNumThreads; i++) { |
133 | 16 | auto thr = std::make_unique<ClientThread>(this); |
134 | 16 | thr->Start(); |
135 | 16 | threads.push_back(std::move(thr)); |
136 | 16 | } |
137 | | |
// Let the workers run for the measurement window, then ask them to stop.
// The release store pairs with the acquire load in ClientThread::Run.
138 | 1 | std::this_thread::sleep_for(10s); |
139 | 1 | should_run_.store(false, std::memory_order_release); |
140 | | |
141 | 1 | int total_reqs = 0; |
142 | | |
// Each counter is read only after its worker has been joined.
143 | 0 | for (const auto& thr : threads) { |
144 | 0 | thr->Join(); |
145 | 0 | total_reqs += thr->request_count_; |
146 | 0 | } |
147 | 1 | sw.stop(); |
148 | | |
// NOTE(review): total_reqs is used as a divisor without a zero check; if no
// request completed, the two per-request figures are not meaningful.
// The division by 1000.0 presumably converts nanoseconds to microseconds -
// confirm the unit of the Stopwatch user/system fields.
149 | 1 | float reqs_per_second = static_cast<float>(total_reqs / sw.elapsed().wall_seconds()); |
150 | 1 | float user_cpu_micros_per_req = static_cast<float>(sw.elapsed().user / 1000.0 / total_reqs); |
151 | 1 | float sys_cpu_micros_per_req = static_cast<float>(sw.elapsed().system / 1000.0 / total_reqs); |
152 | | |
153 | 1 | LOG(INFO) << "Reqs/sec: " << reqs_per_second; |
154 | 1 | LOG(INFO) << "User CPU per req: " << user_cpu_micros_per_req << "us"; |
155 | 1 | LOG(INFO) << "Sys CPU per req: " << sys_cpu_micros_per_req << "us"; |
156 | 1 | } |
157 | | |
158 | | } // namespace rpc |
159 | | } // namespace yb |
160 | | |