YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/redis/redisserver/redis_rpc.h
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#ifndef YB_YQL_REDIS_REDISSERVER_REDIS_RPC_H
17
#define YB_YQL_REDIS_REDISSERVER_REDIS_RPC_H
18
19
#include <stdint.h>
20
21
#include <type_traits>
22
23
#include <boost/container/small_vector.hpp>
24
#include <boost/version.hpp>
25
26
#include "yb/common/redis_protocol.pb.h"
27
28
#include "yb/rpc/connection_context.h"
29
#include "yb/rpc/growable_buffer.h"
30
#include "yb/rpc/rpc_with_queue.h"
31
32
#include "yb/util/net/net_fwd.h"
33
#include "yb/util/size_literals.h"
34
35
#include "yb/yql/redis/redisserver/redis_fwd.h"
36
37
namespace yb {
38
39
class MemTracker;
40
41
namespace redisserver {
42
43
class RedisParser;
44
45
YB_DEFINE_ENUM(RedisClientMode, (kNormal)(kSubscribed)(kMonitoring));
46
47
class RedisConnectionContext : public rpc::ConnectionContextWithQueue {
48
 public:
49
  RedisConnectionContext(
50
      rpc::GrowableBufferAllocator* allocator,
51
      const MemTrackerPtr& call_tracker);
52
  ~RedisConnectionContext();
53
209k
  bool is_authenticated() const {
54
209k
    return authenticated_.load(std::memory_order_acquire);
55
209k
  }
56
526
  void set_authenticated(bool flag) {
57
526
    authenticated_.store(flag, std::memory_order_release);
58
526
  }
59
60
104k
  std::string redis_db_to_use() const {
61
104k
    return redis_db_name_;
62
104k
  }
63
64
83
  void use_redis_db(const std::string& name) {
65
83
    redis_db_name_ = name;
66
83
  }
67
68
1.40k
  static std::string Name() { return "Redis"; }
69
70
314k
  RedisClientMode ClientMode() { return mode_.load(std::memory_order_acquire); }
71
72
0
  void SetClientMode(RedisClientMode mode) { mode_.store(mode, std::memory_order_release); }
73
74
0
  void SetCleanupHook(std::function<void()> hook) { cleanup_hook_ = std::move(hook); }
75
76
  // Shutdown this context. Clean up the subscriptions if any.
77
  void Shutdown(const Status& status) override;
78
79
  CHECKED_STATUS ReportPendingWriteBytes(size_t bytes_in_queue) override;
80
81
 private:
82
528
  void Connected(const rpc::ConnectionPtr& connection) override {}
83
84
0
  rpc::RpcConnectionPB::StateType State() override {
85
0
    return rpc::RpcConnectionPB::OPEN;
86
0
  }
87
88
  Result<rpc::ProcessCallsResult> ProcessCalls(const rpc::ConnectionPtr& connection,
89
                                               const IoVecs& bytes_to_process,
90
                                               rpc::ReadBufferFull read_buffer_full) override;
91
92
629k
  rpc::StreamReadBuffer& ReadBuffer() override {
93
629k
    return read_buffer_;
94
629k
  }
95
96
  // Takes ownership of data content.
97
  CHECKED_STATUS HandleInboundCall(const rpc::ConnectionPtr& connection,
98
                                   size_t commands_in_batch,
99
                                   rpc::CallData* data);
100
101
  std::unique_ptr<RedisParser> parser_;
102
  rpc::GrowableBuffer read_buffer_;
103
  size_t commands_in_batch_ = 0;
104
  size_t end_of_batch_ = 0;
105
  std::atomic<bool> authenticated_{false};
106
  std::string redis_db_name_ = "0";
107
  std::atomic<RedisClientMode> mode_{RedisClientMode::kNormal};
108
  CoarseTimePoint soft_limit_exceeded_since_{CoarseTimePoint::max()};
109
  std::function<void()> cleanup_hook_;
110
111
  MemTrackerPtr call_mem_tracker_;
112
};
113
114
class RedisInboundCall : public rpc::QueueableInboundCall {
115
 public:
116
  explicit RedisInboundCall(
117
     rpc::ConnectionPtr conn,
118
     size_t weight_in_bytes,
119
     CallProcessedListener call_processed_listener);
120
121
  ~RedisInboundCall();
122
  // Takes ownership of data content.
123
  CHECKED_STATUS ParseFrom(const MemTrackerPtr& mem_tracker, size_t commands, rpc::CallData* data);
124
125
  // Serialize the response packet for the finished call.
126
  // The resulting slices refer to memory in this object.
127
  void DoSerialize(boost::container::small_vector_base<RefCntBuffer>* output) override;
128
  void GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const;
129
  void LogTrace() const override;
130
  std::string ToString() const override;
131
  bool DumpPB(const rpc::DumpRunningRpcsRequestPB& req, rpc::RpcCallInProgressPB* resp) override;
132
133
  CoarseTimePoint GetClientDeadline() const override;
134
135
209k
  RedisClientBatch& client_batch() { return client_batch_; }
136
  RedisConnectionContext& connection_context() const;
137
138
  Slice serialized_remote_method() const override;
139
  Slice method_name() const override;
140
141
  static Slice static_serialized_remote_method();
142
143
  void Respond(size_t idx, bool is_success, RedisResponsePB* resp);
144
145
  void RespondFailure(rpc::ErrorStatusPB::RpcErrorCodePB error_code, const Status& status) override;
146
147
  void RespondFailure(size_t idx, const Status& status);
148
  void RespondSuccess(size_t idx,
149
                      const rpc::RpcMethodMetrics& metrics,
150
                      RedisResponsePB* resp);
151
48
  void MarkForClose() { quit_.store(true, std::memory_order_release); }
152
153
104k
  size_t ObjectSize() const override { return sizeof(*this); }
154
155
104k
  size_t DynamicMemoryUsage() const override {
156
104k
    return QueueableInboundCall::DynamicMemoryUsage() +
157
104k
           DynamicMemoryUsageOf(responses_, ready_, client_batch_);
158
104k
  }
159
160
 private:
161
162
  // The connection on which this inbound call arrived.
163
  static constexpr size_t batch_capacity = RedisClientBatch::static_capacity;
164
  boost::container::small_vector<RedisResponsePB, batch_capacity> responses_;
165
  boost::container::small_vector<std::atomic<size_t>, batch_capacity> ready_;
166
  std::atomic<size_t> ready_count_{0};
167
  std::atomic<bool> had_failures_{false};
168
  RedisClientBatch client_batch_;
169
170
  // Atomic bool to indicate if the command batch has been parsed.
171
  std::atomic<bool> parsed_ = {false};
172
173
  // Atomic bool to indicate if the quit command is present
174
  std::atomic<bool> quit_ = {false};
175
176
  ScopedTrackedConsumption consumption_;
177
};
178
179
} // namespace redisserver
180
} // namespace yb
181
182
#endif // YB_YQL_REDIS_REDISSERVER_REDIS_RPC_H