YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_with_call_id.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/rpc_with_call_id.h"
17
18
#include "yb/rpc/connection.h"
19
#include "yb/rpc/reactor.h"
20
#include "yb/rpc/rpc_introspection.pb.h"
21
22
#include "yb/util/status_format.h"
23
#include "yb/util/string_util.h"
24
25
namespace yb {
26
namespace rpc {
27
28
3.76M
// No explicit initialization is performed here; members are default-initialized.
ConnectionContextWithCallId::ConnectionContextWithCallId() = default;
29
30
void ConnectionContextWithCallId::DumpPB(const DumpRunningRpcsRequestPB& req,
31
21
                                         RpcConnectionPB* resp) {
32
21
  for (const auto &entry : calls_being_handled_) {
33
1
    entry.second->DumpPB(req, resp->add_calls_in_flight());
34
1
  }
35
21
}
36
37
998M
bool ConnectionContextWithCallId::Idle(std::string* reason_not_idle) {
38
998M
  if (calls_being_handled_.empty()) {
39
983M
    return true;
40
983M
  }
41
42
14.4M
  if (reason_not_idle) {
43
0
    AppendWithSeparator(
44
0
        Format("$0 calls being handled: $1", calls_being_handled_.size(), calls_being_handled_),
45
0
        reason_not_idle);
46
0
  }
47
48
14.4M
  return false;
49
998M
}
50
51
79.7M
// Registers call in calls_being_handled_ under the id returned by ExtractCallId(call).
//
// Returns Status::OK() when the id was not present yet. If an entry with the same id already
// exists, the existing entry is left untouched, a warning is logged and a NetworkError status
// is returned.
Status ConnectionContextWithCallId::Store(InboundCall* call) {
  const uint64_t call_id = ExtractCallId(call);
  const auto insertion = calls_being_handled_.emplace(call_id, call);
  if (insertion.second) {
    return Status::OK();
  }

  // emplace() did not insert: this id is already tracked.
  LOG(WARNING) << call->connection()->ToString() << ": received call ID " << call_id
               << " but was already processing this ID! Ignoring";
  return STATUS_FORMAT(NetworkError, "Received duplicate call id: $0", call_id);
}
60
61
18.7k
// Intentionally a no-op: nothing is done with status in this implementation.
void ConnectionContextWithCallId::Shutdown(const Status& status) {
  // Nothing to do.
}
63
64
79.7M
// Marks call as finished: bumps processed_call_count_ and removes the call from
// calls_being_handled_.
//
// If no entry exists for the call's id, or the entry refers to a different call, a DFATAL
// message is logged and the map is left unchanged (the counter has already been incremented
// at that point). After a successful removal, idle_listener_ is invoked when the context has
// become idle and a listener is set.
void ConnectionContextWithCallId::CallProcessed(InboundCall* call) {
  // Debug-only check on the calling thread, as reported by the connection's reactor.
  DCHECK(call->connection()->reactor()->IsCurrentThreadOrStartedClosing());

  ++processed_call_count_;

  const auto call_id = ExtractCallId(call);
  const auto pos = calls_being_handled_.find(call_id);
  const bool found = pos != calls_being_handled_.end();
  if (!found || pos->second != call) {
    const std::string existing = found ? pos->second->ToString() : "<NONE>";
    LOG(DFATAL) << "Processed call with invalid id: " << call_id << ", call: " << call->ToString()
                << ", existing: " << existing;
    return;
  }

  calls_being_handled_.erase(pos);

  // Same evaluation order as a short-circuit `Idle() && idle_listener_`.
  if (Idle()) {
    if (idle_listener_) {
      idle_listener_();
    }
  }
}
81
82
void ConnectionContextWithCallId::QueueResponse(const ConnectionPtr& conn,
83
79.7M
                                                InboundCallPtr call) {
84
79.7M
  conn->QueueOutboundData(std::move(call));
85
79.7M
}
86
87
} // namespace rpc
88
} // namespace yb