/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_with_call_id.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include "yb/rpc/rpc_with_call_id.h" |
17 | | |
18 | | #include "yb/rpc/connection.h" |
19 | | #include "yb/rpc/reactor.h" |
20 | | #include "yb/rpc/rpc_introspection.pb.h" |
21 | | |
22 | | #include "yb/util/status_format.h" |
23 | | #include "yb/util/string_util.h" |
24 | | |
25 | | namespace yb { |
26 | | namespace rpc { |
27 | | |
28 | 944k | ConnectionContextWithCallId::ConnectionContextWithCallId() {} |
29 | | |
30 | | void ConnectionContextWithCallId::DumpPB(const DumpRunningRpcsRequestPB& req, |
31 | 21 | RpcConnectionPB* resp) { |
32 | 1 | for (const auto &entry : calls_being_handled_) { |
33 | 1 | entry.second->DumpPB(req, resp->add_calls_in_flight()); |
34 | 1 | } |
35 | 21 | } |
36 | | |
37 | 83.3M | bool ConnectionContextWithCallId::Idle(std::string* reason_not_idle) { |
38 | 83.3M | if (calls_being_handled_.empty()) { |
39 | 79.7M | return true; |
40 | 79.7M | } |
41 | | |
42 | 3.60M | if (reason_not_idle) { |
43 | 0 | AppendWithSeparator( |
44 | 0 | Format("$0 calls being handled: $1", calls_being_handled_.size(), calls_being_handled_), |
45 | 0 | reason_not_idle); |
46 | 0 | } |
47 | | |
48 | 3.60M | return false; |
49 | 3.60M | } |
50 | | |
51 | 24.1M | Status ConnectionContextWithCallId::Store(InboundCall* call) { |
52 | 24.1M | uint64_t call_id = ExtractCallId(call); |
53 | 24.1M | if (!calls_being_handled_.emplace(call_id, call).second) { |
54 | 0 | LOG(WARNING) << call->connection()->ToString() << ": received call ID " << call_id |
55 | 0 | << " but was already processing this ID! Ignoring"; |
56 | 0 | return STATUS_FORMAT(NetworkError, "Received duplicate call id: $0", call_id); |
57 | 0 | } |
58 | 24.1M | return Status::OK(); |
59 | 24.1M | } |
60 | | |
61 | 17.9k | void ConnectionContextWithCallId::Shutdown(const Status& status) { |
62 | 17.9k | } |
63 | | |
64 | 24.1M | void ConnectionContextWithCallId::CallProcessed(InboundCall* call) { |
65 | 24.1M | DCHECK(call->connection()->reactor()->IsCurrentThreadOrStartedClosing()); |
66 | | |
67 | 24.1M | ++processed_call_count_; |
68 | 24.1M | auto id = ExtractCallId(call); |
69 | 24.1M | auto it = calls_being_handled_.find(id); |
70 | 24.1M | if (it == calls_being_handled_.end() || it->second != call) { |
71 | 0 | std::string existing = it == calls_being_handled_.end() ? "<NONE>" : it->second->ToString(); |
72 | 0 | LOG(DFATAL) << "Processed call with invalid id: " << id << ", call: " << call->ToString() |
73 | 0 | << ", existing: " << existing; |
74 | 0 | return; |
75 | 0 | } |
76 | 24.1M | calls_being_handled_.erase(it); |
77 | 24.1M | if (Idle() && idle_listener_) { |
78 | 167 | idle_listener_(); |
79 | 167 | } |
80 | 24.1M | } |
81 | | |
82 | | void ConnectionContextWithCallId::QueueResponse(const ConnectionPtr& conn, |
83 | 24.1M | InboundCallPtr call) { |
84 | 24.1M | conn->QueueOutboundData(std::move(call)); |
85 | 24.1M | } |
86 | | |
87 | | } // namespace rpc |
88 | | } // namespace yb |