YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/rpc_with_queue.h
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#ifndef YB_RPC_RPC_WITH_QUEUE_H
17
#define YB_RPC_RPC_WITH_QUEUE_H
18
19
#include <stdint.h>
20
21
#include <functional>
22
#include <mutex>
23
#include <type_traits>
24
#include <unordered_set>
25
26
#include "yb/rpc/connection_context.h"
27
#include "yb/rpc/inbound_call.h"
28
29
#include "yb/util/size_literals.h"
30
31
namespace yb {
32
namespace rpc {
33
34
// An InboundCall that additionally carries a fixed byte weight and two atomic
// status flags (reply available / aborted).
// The bare numbers interleaved below are the coverage report's line-number and
// hit-count gutter; they are not part of the header itself.
class QueueableInboundCall : public InboundCall {
35
 public:
36
  // Forwards `conn` and `call_processed_listener` to the InboundCall base, passing
  // nullptr for the rpc_metrics argument, and records `weight_in_bytes` for this call.
  QueueableInboundCall(ConnectionPtr conn, size_t weight_in_bytes,
37
                       CallProcessedListener call_processed_listener)
38
      : InboundCall(std::move(conn), nullptr /* rpc_metrics */, std::move(call_processed_listener)),
39
104k
        weight_in_bytes_(weight_in_bytes) {}
40
41
104k
  // Sets the has-reply flag (release store, pairs with the acquire load in has_reply()).
  void SetHasReply() {
42
104k
    has_reply_.store(true, std::memory_order_release);
43
104k
  }
44
45
104k
  // Returns whether SetHasReply() has been called (acquire load).
  bool has_reply() const {
46
104k
    return has_reply_.load(std::memory_order_acquire);
47
104k
  }
48
49
0
  // Sets the aborted flag (release store).
  // NOTE(review): `status` is not used by this body - confirm that is intentional.
  void Abort(const Status& status) {
50
0
    aborted_.store(true, std::memory_order_release);
51
0
  }
52
53
104k
  // Returns whether Abort() has been called (acquire load).
  bool aborted() const {
54
104k
    return aborted_.load(std::memory_order_acquire);
55
104k
  }
56
57
  // Context with queue has a limit on bytes used by queued commands.
58
  // `weight_in_bytes` is used to determine how many bytes this call consumes.
59
209k
  size_t weight_in_bytes() const { return weight_in_bytes_; }
60
61
 private:
62
  // Both flags start out false and are only ever set to true in this class.
  std::atomic<bool> has_reply_{false};
63
  std::atomic<bool> aborted_{false};
64
  // Fixed at construction; see weight_in_bytes().
  const size_t weight_in_bytes_;
65
};
66
67
// Connection context that keeps inbound calls in a queue (calls_queue_) and is
// configured with a limit on concurrent calls and a limit on queued bytes.
// Most member functions are only declared here; their definitions are not in this file.
// The bare numbers interleaved below are the coverage report's line-number and
// hit-count gutter; they are not part of the header itself.
class ConnectionContextWithQueue : public ConnectionContextBase {
68
 protected:
69
  // Protected constructor: the two limits are stored in max_concurrent_calls_ and
  // max_queued_bytes_ (presumably - the definition is elsewhere; confirm).
  explicit ConnectionContextWithQueue(
70
      size_t max_concurrent_calls,
71
      size_t max_queued_bytes);
72
73
  ~ConnectionContextWithQueue();
74
75
104k
  // Returns a listener that invokes CallProcessed() on this object.
  // The bound callable stores the raw `this` pointer, so it must not be invoked
  // after this context has been destroyed.
  InboundCall::CallProcessedListener call_processed_listener() {
76
104k
    return std::bind(&ConnectionContextWithQueue::CallProcessed, this, std::placeholders::_1);
77
104k
  }
78
79
209k
  // True while queued_bytes_ does not exceed max_queued_bytes_. The comparison is
  // inclusive, so a context sitting exactly at the limit still reports true.
  // queued_bytes_ is a plain (non-atomic) size_t.
  bool can_enqueue() const {
80
209k
    return queued_bytes_ <= max_queued_bytes_;
81
209k
  }
82
83
  void Enqueue(std::shared_ptr<QueueableInboundCall> call);
84
85
0
  // Returns the current value of processed_call_count_ (acquire load).
  uint64_t ProcessedCallCount() override {
86
0
    return processed_call_count_.load(std::memory_order_acquire);
87
0
  }
88
89
  void Shutdown(const Status& status) override;
90
91
 private:
92
  void AssignConnection(const ConnectionPtr& conn) override;
93
  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) override;
94
  bool Idle(std::string* reason_not_idle = nullptr) override;
95
  void QueueResponse(const ConnectionPtr& conn, InboundCallPtr call) override;
96
0
  // Replaces the stored idle listener with `listener`.
  void ListenIdle(IdleListener listener) override { idle_listener_ = std::move(listener); }
97
98
  // Target of the listener returned by call_processed_listener().
  void CallProcessed(InboundCall* call);
99
  void FlushOutboundQueue(Connection* conn);
100
  void FlushOutboundQueueAborted(const Status& status);
101
102
  const size_t max_concurrent_calls_;
103
  // Upper bound compared against queued_bytes_ in can_enqueue().
  const size_t max_queued_bytes_;
104
  size_t replies_being_sent_ = 0;
105
  size_t queued_bytes_ = 0;
106
107
  // Calls that are being processed by this connection/context.
108
  // At the top of the queue there are replies_being_sent_ calls, for which we are sending reply.
109
  // After that there are calls that are being processed.
110
  // first_without_reply_ points to the first of them.
111
  // There are not more than max_concurrent_calls_ entries in first two groups.
112
  // After end of queue there are calls that we received but processing did not start for them.
113
  std::deque<std::shared_ptr<QueueableInboundCall>> calls_queue_;
114
  std::shared_ptr<ReactorTask> flush_outbound_queue_task_;
115
116
  // First call that does not have reply yet.
117
  std::atomic<QueueableInboundCall*> first_without_reply_{nullptr};
118
  // Read by ProcessedCallCount().
  std::atomic<uint64_t> processed_call_count_{0};
119
  // Set by ListenIdle().
  IdleListener idle_listener_;
120
};
121
122
} // namespace rpc
123
} // namespace yb
124
125
#endif // YB_RPC_RPC_WITH_QUEUE_H