YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_with_queue.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/rpc_with_queue.h"
17
18
#include "yb/gutil/casts.h"
19
20
#include "yb/rpc/connection.h"
21
#include "yb/rpc/messenger.h"
22
#include "yb/rpc/reactor.h"
23
#include "yb/rpc/rpc_introspection.pb.h"
24
25
#include "yb/util/string_util.h"
26
27
namespace yb {
28
namespace rpc {
29
30
ConnectionContextWithQueue::ConnectionContextWithQueue(
31
    size_t max_concurrent_calls,
32
    size_t max_queued_bytes)
33
1.51k
    : max_concurrent_calls_(max_concurrent_calls), max_queued_bytes_(max_queued_bytes) {
34
1.51k
}
35
36
472
ConnectionContextWithQueue::~ConnectionContextWithQueue() {
37
472
}
38
39
void ConnectionContextWithQueue::DumpPB(const DumpRunningRpcsRequestPB& req,
40
0
                                        RpcConnectionPB* resp) {
41
0
  for (auto& call : calls_queue_) {
42
0
    call->DumpPB(req, resp->add_calls_in_flight());
43
0
  }
44
0
}
45
46
210k
bool ConnectionContextWithQueue::Idle(std::string* reason_not_idle) {
47
210k
  if (calls_queue_.empty()) {
48
210k
    return true;
49
210k
  }
50
51
2
  if (reason_not_idle) {
52
0
    AppendWithSeparator(Format("$0 calls", calls_queue_.size()), reason_not_idle);
53
0
  }
54
55
2
  return false;
56
210k
}
57
58
209k
void ConnectionContextWithQueue::Enqueue(std::shared_ptr<QueueableInboundCall> call) {
59
209k
  auto reactor = call->connection()->reactor();
60
209k
  DCHECK(reactor->IsCurrentThread());
61
62
209k
  calls_queue_.push_back(call);
63
209k
  queued_bytes_ += call->weight_in_bytes();
64
65
209k
  size_t size = calls_queue_.size();
66
209k
  if (size == replies_being_sent_ + 1) {
67
209k
    first_without_reply_.store(call.get(), std::memory_order_release);
68
209k
  }
69
209k
  if (size <= max_concurrent_calls_) {
70
209k
    reactor->messenger()->Handle(call, Queue::kTrue);
71
209k
  }
72
209k
}
73
74
947
void ConnectionContextWithQueue::Shutdown(const Status& status) {
75
  // Could erase calls, that we did not start to process yet.
76
947
  if (calls_queue_.size() > max_concurrent_calls_) {
77
0
    calls_queue_.erase(calls_queue_.begin() + max_concurrent_calls_, calls_queue_.end());
78
0
  }
79
80
947
  for (auto& call : calls_queue_) {
81
2
    call->Abort(status);
82
2
  }
83
947
}
84
85
209k
void ConnectionContextWithQueue::CallProcessed(InboundCall* call) {
86
209k
  ++processed_call_count_;
87
209k
  auto reactor = call->connection()->reactor();
88
209k
  DCHECK(reactor->IsCurrentThread());
89
90
209k
  DCHECK(!calls_queue_.empty());
91
209k
  DCHECK_EQ(calls_queue_.front().get(), call);
92
209k
  DCHECK_GT(replies_being_sent_, 0);
93
94
209k
  bool could_enqueue = can_enqueue();
95
209k
  auto call_weight_in_bytes = down_cast<QueueableInboundCall*>(call)->weight_in_bytes();
96
209k
  queued_bytes_ -= call_weight_in_bytes;
97
98
209k
  calls_queue_.pop_front();
99
209k
  --replies_being_sent_;
100
209k
  if (calls_queue_.size() >= max_concurrent_calls_) {
101
0
    auto call_ptr = calls_queue_[max_concurrent_calls_ - 1];
102
0
    reactor->messenger()->Handle(call_ptr, Queue::kTrue);
103
0
  }
104
209k
  if (Idle() && idle_listener_) {
105
1
    idle_listener_();
106
1
  }
107
108
209k
  if (!could_enqueue && 
can_enqueue()0
) {
109
0
    call->connection()->ParseReceived();
110
0
  }
111
209k
}
112
113
void ConnectionContextWithQueue::QueueResponse(const ConnectionPtr& conn,
114
209k
                                               InboundCallPtr call) {
115
209k
  QueueableInboundCall* queueable_call = down_cast<QueueableInboundCall*>(call.get());
116
209k
  queueable_call->SetHasReply();
117
209k
  if (queueable_call == first_without_reply_.load(std::memory_order_acquire)) {
118
209k
    auto scheduled = conn->reactor()->ScheduleReactorTask(flush_outbound_queue_task_);
119
209k
    LOG_IF
(WARNING, !scheduled) << "Failed to schedule flush outbound queue"0
;
120
209k
  }
121
209k
}
122
123
209k
void ConnectionContextWithQueue::FlushOutboundQueue(Connection* conn) {
124
209k
  DCHECK(conn->reactor()->IsCurrentThread());
125
126
209k
  const size_t begin = replies_being_sent_;
127
209k
  size_t end = begin;
128
209k
  for (;;) {
129
209k
    size_t queue_size = calls_queue_.size();
130
419k
    while (end < queue_size) {
131
209k
      if (!calls_queue_[end]->has_reply()) {
132
0
        break;
133
0
      }
134
209k
      ++end;
135
209k
    }
136
209k
    auto new_first_without_reply = end < queue_size ? 
calls_queue_[end].get()0
: nullptr;
137
209k
    first_without_reply_.store(new_first_without_reply, std::memory_order_release);
138
    // It is usual case that we break here, but sometimes there could happen that before updating
139
    // first_without_reply_ we did QueueResponse for this call.
140
209k
    if (!new_first_without_reply || 
!new_first_without_reply->has_reply()0
) {
141
209k
      break;
142
209k
    }
143
209k
  }
144
145
209k
  if (begin != end) {
146
209k
    replies_being_sent_ = end;
147
209k
    boost::container::small_vector<OutboundDataPtr, 64> batch(
148
209k
        calls_queue_.begin() + begin,
149
209k
        calls_queue_.begin() + end);
150
209k
    conn->QueueOutboundDataBatch(batch);
151
209k
  }
152
209k
}
153
154
1.51k
void ConnectionContextWithQueue::AssignConnection(const ConnectionPtr& conn) {
155
1.51k
  flush_outbound_queue_task_ = MakeFunctorReactorTask(
156
1.51k
      std::bind(&ConnectionContextWithQueue::FlushOutboundQueue, this, conn.get()), conn,
157
1.51k
      SOURCE_LOCATION());
158
1.51k
}
159
160
} // namespace rpc
161
} // namespace yb