YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/redis/redisserver/redis_service.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/yql/redis/redisserver/redis_service.h"

#include <thread>

#include <boost/algorithm/string/case_conv.hpp>
#include <boost/lockfree/queue.hpp>
#include <gflags/gflags.h>

#include "yb/client/client.h"
#include "yb/client/error.h"
#include "yb/client/meta_cache.h"
#include "yb/client/meta_data_cache.h"
#include "yb/client/session.h"
#include "yb/client/table.h"
#include "yb/client/yb_op.h"
#include "yb/client/yb_table_name.h"

#include "yb/common/redis_protocol.pb.h"

#include "yb/gutil/casts.h"
#include "yb/gutil/strings/join.h"

#include "yb/master/master_heartbeat.pb.h"

#include "yb/rpc/connection.h"
#include "yb/rpc/rpc_controller.h"
#include "yb/rpc/rpc_introspection.pb.h"

#include "yb/tserver/tablet_server_interface.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/locks.h"
#include "yb/util/logging.h"
#include "yb/util/memory/mc_types.h"
#include "yb/util/metrics.h"
#include "yb/util/redis_util.h"
#include "yb/util/result.h"
#include "yb/util/shared_lock.h"
#include "yb/util/size_literals.h"

#include "yb/yql/redis/redisserver/redis_commands.h"
#include "yb/yql/redis/redisserver/redis_encoding.h"
#include "yb/yql/redis/redisserver/redis_rpc.h"

using yb::operator"" _MB;
using namespace std::literals;
using namespace std::placeholders;
using yb::client::YBMetaDataCache;
using strings::Substitute;
using yb::rpc::Connection;
DEFINE_REDIS_histogram_EX(error,
                          "yb.redisserver.RedisServerService.AnyMethod RPC Time",
                          "yb.redisserver.RedisServerService.ErrorUnsupportedMethod()");
DEFINE_REDIS_histogram_EX(get_internal,
                          "yb.redisserver.RedisServerService.Get RPC Time",
                          "in yb.client.Get");
DEFINE_REDIS_histogram_EX(set_internal,
                          "yb.redisserver.RedisServerService.Set RPC Time",
                          "in yb.client.Set");

#define DEFINE_REDIS_SESSION_GAUGE(state) \
  METRIC_DEFINE_gauge_uint64( \
      server, \
      BOOST_PP_CAT(redis_, BOOST_PP_CAT(state, _sessions)), \
      "Number of " BOOST_PP_STRINGIZE(state) " sessions", \
      yb::MetricUnit::kUnits, \
      "Number of sessions " BOOST_PP_STRINGIZE(state) " by Redis service.") \
      /**/

DEFINE_REDIS_SESSION_GAUGE(allocated);
DEFINE_REDIS_SESSION_GAUGE(available);
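
// Illustrative note (not part of the original file): each DEFINE_REDIS_SESSION_GAUGE(state)
// invocation above expands, via BOOST_PP_CAT/BOOST_PP_STRINGIZE, to a gauge definition
// roughly equivalent to:
//
//   METRIC_DEFINE_gauge_uint64(
//       server, redis_allocated_sessions,
//       "Number of allocated sessions", yb::MetricUnit::kUnits,
//       "Number of sessions allocated by Redis service.");
//
// which is why SessionPool::Init below can refer to METRIC_redis_allocated_sessions and
// METRIC_redis_available_sessions directly.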

METRIC_DEFINE_gauge_uint64(
    server, redis_monitoring_clients, "Number of clients running monitor", yb::MetricUnit::kUnits,
    "Number of clients running monitor");

#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
constexpr int32_t kDefaultRedisServiceTimeoutMs = 600000;
#else
constexpr int32_t kDefaultRedisServiceTimeoutMs = 3000;
#endif

DEFINE_int32(redis_service_yb_client_timeout_millis, kDefaultRedisServiceTimeoutMs,
             "Timeout in milliseconds for RPC calls from Redis service to master/tserver");

// In order to support up to three 64MB strings along with other strings, we cap the total
// size of a redis command at 253_MB, which is kept below the consensus message size to
// account for the headers in the consensus layer.
DEFINE_uint64(redis_max_command_size, 253_MB, "Maximum size of the command in redis");

// Maximum value size is 64MB
DEFINE_uint64(redis_max_value_size, 64_MB, "Maximum size of the value in redis");
DEFINE_int32(redis_callbacks_threadpool_size, 64,
             "The maximum size for the threadpool which handles callbacks from the ybclient layer");

DEFINE_int32(redis_password_caching_duration_ms, 5000,
             "The duration for which we will cache the redis passwords. 0 to disable.");

DEFINE_bool(redis_safe_batch, true, "Use safe batching with Redis service");
DEFINE_bool(enable_redis_auth, true, "Enable AUTH for the Redis service");

DECLARE_string(placement_cloud);
DECLARE_string(placement_region);
DECLARE_string(placement_zone);
using yb::client::YBRedisOp;
using yb::client::YBRedisReadOp;
using yb::client::YBRedisWriteOp;
using yb::client::YBClientBuilder;
using yb::client::YBSchema;
using yb::client::YBSession;
using yb::client::YBStatusCallback;
using yb::client::YBTable;
using yb::client::YBTableName;
using yb::rpc::ConnectionPtr;
using yb::rpc::ConnectionWeakPtr;
using yb::rpc::OutboundData;
using yb::rpc::OutboundDataPtr;
using yb::RedisResponsePB;

namespace yb {
namespace redisserver {

typedef boost::container::small_vector_base<Slice> RedisKeyList;

namespace {

YB_DEFINE_ENUM(OperationType, (kNone)(kRead)(kWrite)(kLocal));

// Returns the opposite operation type for the specified type: write for read, read for write.
OperationType Opposite(OperationType type) {
  switch (type) {
    case OperationType::kRead:
      return OperationType::kWrite;
    case OperationType::kWrite:
      return OperationType::kRead;
    case OperationType::kNone: FALLTHROUGH_INTENDED;
    case OperationType::kLocal:
      FATAL_INVALID_ENUM_VALUE(OperationType, type);
  }
  FATAL_INVALID_ENUM_VALUE(OperationType, type);
}
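
// Illustrative note (not part of the original file): YB_DEFINE_ENUM generates the enum
// class together with string helpers, and Opposite() is an involution on the two batched
// types, e.g.:
//
//   DCHECK(Opposite(OperationType::kRead) == OperationType::kWrite);
//   DCHECK(Opposite(Opposite(OperationType::kWrite)) == OperationType::kWrite);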

class Operation {
 public:
  template <class Op>
  Operation(const std::shared_ptr<RedisInboundCall>& call,
            size_t index,
            std::shared_ptr<Op> operation,
            const rpc::RpcMethodMetrics& metrics)
    : type_(std::is_same<Op, YBRedisReadOp>::value ? OperationType::kRead : OperationType::kWrite),
      call_(call),
      index_(index),
      operation_(std::move(operation)),
      metrics_(metrics),
      manual_response_(ManualResponse::kFalse) {
    auto status = operation_->GetPartitionKey(&partition_key_);
    if (!status.ok()) {
      Respond(status);
    }
  }

  Operation(const std::shared_ptr<RedisInboundCall>& call,
            size_t index,
            std::function<bool(client::YBSession*, const StatusFunctor&)> functor,
            std::string partition_key,
            const rpc::RpcMethodMetrics& metrics,
            ManualResponse manual_response)
    : type_(OperationType::kLocal),
      call_(call),
      index_(index),
      functor_(std::move(functor)),
      partition_key_(std::move(partition_key)),
      metrics_(metrics),
      manual_response_(manual_response) {
  }

  bool responded() const {
    return responded_.load(std::memory_order_acquire);
  }

  size_t index() const {
    return index_;
  }

  OperationType type() const {
    return type_;
  }

  const YBRedisOp& operation() const {
    return *operation_;
  }

  bool has_operation() const {
    return operation_ != nullptr;
  }

  size_t space_used_by_request() const {
    return operation_ ? operation_->space_used_by_request() : 0;
  }

  RedisResponsePB& response() {
    switch (type_) {
      case OperationType::kRead:
        return *down_cast<YBRedisReadOp*>(operation_.get())->mutable_response();
      case OperationType::kWrite:
        return *down_cast<YBRedisWriteOp*>(operation_.get())->mutable_response();
      case OperationType::kNone: FALLTHROUGH_INTENDED;
      case OperationType::kLocal:
        FATAL_INVALID_ENUM_VALUE(OperationType, type_);
    }
    FATAL_INVALID_ENUM_VALUE(OperationType, type_);
  }

  const rpc::RpcMethodMetrics& metrics() const {
    return metrics_;
  }

  const std::string& partition_key() const {
    return partition_key_;
  }

  void SetTablet(const client::internal::RemoteTabletPtr& tablet) {
    if (operation_) {
      operation_->SetTablet(tablet);
    }
    tablet_ = tablet;
  }

  void ResetTable(std::shared_ptr<client::YBTable> table) {
    if (operation_) {
      operation_->ResetTable(table);
    }
    tablet_.reset();
  }

  const client::internal::RemoteTabletPtr& tablet() const {
    return tablet_;
  }

  RedisInboundCall& call() const {
    return *call_;
  }

  void GetKeys(RedisKeyList* keys) const {
    if (FLAGS_redis_safe_batch) {
      keys->emplace_back(operation_ ? operation_->GetKey() : Slice());
    }
  }

  bool Apply(client::YBSession* session, const StatusFunctor& callback, bool* applied_operations) {
    // We should destroy the functor after this call, because it could hold references to
    // other objects. So we move it to a temporary variable first.
    auto functor = std::move(functor_);

    if (call_->aborted()) {
      Respond(STATUS(Aborted, ""));
      return false;
    }

    // Used for DebugSleep
    if (functor) {
      return functor(session, callback);
    }

    session->Apply(operation_);
    *applied_operations = true;
    return true;
  }

  void Respond(const Status& status) {
    responded_.store(true, std::memory_order_release);
    if (manual_response_) {
      return;
    }

    if (status.ok()) {
      if (operation_) {
        call_->RespondSuccess(index_, metrics_, &response());
      } else {
        RedisResponsePB resp;
        call_->RespondSuccess(index_, metrics_, &resp);
      }
    } else if ((type_ == OperationType::kRead || type_ == OperationType::kWrite) &&
               response().code() == RedisResponsePB_RedisStatusCode_SERVER_ERROR) {
      call_->Respond(index_, false, &response());
    } else {
      call_->RespondFailure(index_, status);
    }
  }

  std::string ToString() const {
    return Format("{ index: $0, operation: $1 }", index_, operation_);
  }

 private:
  OperationType type_;
  std::shared_ptr<RedisInboundCall> call_;
  size_t index_;
  std::shared_ptr<YBRedisOp> operation_;
  std::function<bool(client::YBSession*, const StatusFunctor&)> functor_;
  std::string partition_key_;
  rpc::RpcMethodMetrics metrics_;
  ManualResponse manual_response_;
  client::internal::RemoteTabletPtr tablet_;
  std::atomic<bool> responded_{false};
};

class SessionPool {
 public:
  void Init(client::YBClient* client,
            const scoped_refptr<MetricEntity>& metric_entity) {
    client_ = client;
    auto* proto = &METRIC_redis_allocated_sessions;
    allocated_sessions_metric_ = proto->Instantiate(metric_entity, 0);
    proto = &METRIC_redis_available_sessions;
    available_sessions_metric_ = proto->Instantiate(metric_entity, 0);
  }

  std::shared_ptr<client::YBSession> Take() {
    client::YBSession* result = nullptr;
    if (!queue_.pop(result)) {
      std::lock_guard<std::mutex> lock(mutex_);
      auto session = client_->NewSession();
      session->SetTimeout(
          MonoDelta::FromMilliseconds(FLAGS_redis_service_yb_client_timeout_millis));
      sessions_.push_back(session);
      allocated_sessions_metric_->IncrementBy(1);
      return session;
    }
    available_sessions_metric_->DecrementBy(1);
    return result->shared_from_this();
  }

  void Release(const std::shared_ptr<client::YBSession>& session) {
    available_sessions_metric_->IncrementBy(1);
    queue_.push(session.get());
  }

 private:
  client::YBClient* client_ = nullptr;
  std::mutex mutex_;
  std::vector<std::shared_ptr<client::YBSession>> sessions_;
  boost::lockfree::queue<client::YBSession*> queue_{30};
  scoped_refptr<AtomicGauge<uint64_t>> allocated_sessions_metric_;
  scoped_refptr<AtomicGauge<uint64_t>> available_sessions_metric_;
};
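
// Illustrative sketch (not part of the original file) of the pool contract assumed by the
// Block class below: a session is borrowed for the duration of one flush and returned
// afterwards, so the lock-free queue is just a fast path for reuse:
//
//   SessionPool pool;
//   pool.Init(client, metric_entity);
//   auto session = pool.Take();          // pops a cached session or creates a new one
//   session->Apply(op);
//   session->FlushAsync([&pool, session](client::FlushStatus*) {
//     pool.Release(session);             // makes the raw pointer available for reuse
//   });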

class Block;
typedef std::shared_ptr<Block> BlockPtr;

class Block : public std::enable_shared_from_this<Block> {
 public:
  typedef MCVector<Operation*> Ops;

  Block(const BatchContextPtr& context,
        Ops::allocator_type allocator,
        rpc::RpcMethodMetrics metrics_internal)
      : context_(context),
        ops_(allocator),
        metrics_internal_(std::move(metrics_internal)),
        start_(MonoTime::Now()) {
  }

  Block(const Block&) = delete;
  void operator=(const Block&) = delete;

  void AddOperation(Operation* operation) {
    ops_.push_back(operation);
  }

  void Launch(SessionPool* session_pool, bool allow_local_calls_in_curr_thread = true) {
    session_pool_ = session_pool;
    session_ = session_pool->Take();
    bool has_ok = false;
    bool applied_operations = false;
    // Supposed to be called only once.
    client::FlushCallback callback = BlockCallback(shared_from_this());
    auto status_callback = [callback](const Status& status){
      client::FlushStatus flush_status = {status, {}};
      callback(&flush_status);
    };
    for (auto* op : ops_) {
      has_ok = op->Apply(session_.get(), status_callback, &applied_operations) || has_ok;
    }
    if (has_ok) {
      if (applied_operations) {
        // Allow local calls in this thread only if no one is waiting behind us.
        session_->set_allow_local_calls_in_curr_thread(
            allow_local_calls_in_curr_thread && this->next_ == nullptr);
        session_->FlushAsync(std::move(callback));
      }
    } else {
      Processed();
    }
  }

  BlockPtr SetNext(const BlockPtr& next) {
    BlockPtr result = std::move(next_);
    next_ = next;
    return result;
  }

  std::string ToString() const {
    return Format("{ ops: $0 context: $1 next: $2 }",
                  ops_, static_cast<void*>(context_.get()), next_);
  }

 private:
  class BlockCallback {
   public:
    explicit BlockCallback(BlockPtr block) : block_(std::move(block)) {
      // We remember block_->context_ to avoid issues when multiple instances refer to the
      // same block (which is allocated in an arena): one of them may call block_->Done while
      // another still holds a reference to the block and tries to update its ref counter in
      // the destructor.
      context_ = block_ ? block_->context_ : nullptr;
    }

    ~BlockCallback() {
      // We only reset context_ after block_, because resetting context_ could free the Arena
      // memory on which block_ is allocated together with its ref counter.
      block_.reset();
      context_.reset();
    }

    void operator()(client::FlushStatus* status) {
      // The block context owns the arena upon which this block is created.
      // Done is going to free up the block's reference to the context. So, unless we ensure
      // that the context lives beyond the block_.reset(), we might get an error while
      // updating the ref-count for block_ (in the area of the arena owned by the context).
      auto context = block_->context_;
      DCHECK(context != nullptr) << block_.get();
      block_->Done(status);
      block_.reset();
    }
   private:
    BlockPtr block_;
    BatchContextPtr context_;
  };

  void Done(client::FlushStatus* flush_status) {
    MonoTime now = MonoTime::Now();
    metrics_internal_.handler_latency->Increment(now.GetDeltaSince(start_).ToMicroseconds());
    VLOG(3) << "Received status from call " << flush_status->status.ToString(true);

    std::unordered_map<const client::YBOperation*, Status> op_errors;
    bool tablet_not_found = false;
    if (!flush_status->status.ok()) {
      for (const auto& error : flush_status->errors) {
        if (error->status().IsNotFound()) {
          tablet_not_found = true;
        }
        op_errors[&error->failed_op()] = std::move(error->status());
        YB_LOG_EVERY_N_SECS(WARNING, 1) << "Explicit error while inserting: "
                                        << error->status().ToString();
      }
    }

    if (tablet_not_found && Retrying()) {
        // We will retry and not mark the ops as failed.
        return;
    }

    for (auto* op : ops_) {
      if (op->has_operation() && op_errors.find(&op->operation()) != op_errors.end()) {
        // Could also check for NotFound here.
        auto s = op_errors[&op->operation()];
        op->Respond(s);
      } else {
        op->Respond(Status::OK());
      }
    }

    Processed();
  }

  void Processed() {
    auto allow_local_calls_in_curr_thread = false;
    if (session_) {
      allow_local_calls_in_curr_thread = session_->allow_local_calls_in_curr_thread();
      session_pool_->Release(session_);
      session_.reset();
    }
    if (next_) {
      next_->Launch(session_pool_, allow_local_calls_in_curr_thread);
    }
    context_.reset();
  }

  bool Retrying() {
    auto old_table = context_->table();
    context_->CleanYBTableFromCache();

    const int kMaxRetries = 2;
    if (num_retries_ >= kMaxRetries) {
      VLOG(3) << "Not retrying because we are past kMaxRetries. num_retries_ = " << num_retries_
              << " kMaxRetries = " << kMaxRetries;
      return false;
    }
    num_retries_++;

    auto allow_local_calls_in_curr_thread = false;
    if (session_) {
      allow_local_calls_in_curr_thread = session_->allow_local_calls_in_curr_thread();
      session_pool_->Release(session_);
      session_.reset();
    }

    auto table = context_->table();
    if (!table || table->id() == old_table->id()) {
      VLOG(3) << "Not retrying because new table is : " << (table ? table->id() : "nullptr")
              << " old table was " << old_table->id();
      return false;
    }

    // Swap out ops with the newer version of the ops referring to the newly created table.
    for (auto* op : ops_) {
      op->ResetTable(context_->table());
    }
    Launch(session_pool_, allow_local_calls_in_curr_thread);
    VLOG(3) << " Retrying with table : " << table->id() << " old table was " << old_table->id();
    return true;
  }

 private:
  BatchContextPtr context_;
  Ops ops_;
  rpc::RpcMethodMetrics metrics_internal_;
  MonoTime start_;
  SessionPool* session_pool_;
  std::shared_ptr<client::YBSession> session_;
  BlockPtr next_;
  int num_retries_ = 1;
};
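
// Illustrative sketch (not part of the original file) of how blocks chain: conflicting
// batches are linked via SetNext(), and Processed() launches the successor, so two blocks
// touching the same key execute strictly in order:
//
//   auto reads = std::make_shared<Block>(context, alloc, metrics);   // hypothetical setup
//   auto writes = std::make_shared<Block>(context, alloc, metrics);
//   reads->SetNext(writes);        // writes wait for reads
//   reads->Launch(&session_pool);  // Processed() will Launch(writes) when reads finish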

typedef std::array<rpc::RpcMethodMetrics, kOperationTypeMapSize> InternalMetrics;

struct BlockData {
  explicit BlockData(Arena* arena) : used_keys(UsedKeys::allocator_type(arena)) {}

  std::string ToString() const {
    return Format("{ used_keys: $0 block: $1 count: $2 }", used_keys, block, count);
  }

  typedef MCUnorderedSet<Slice, Slice::Hash> UsedKeys;
  UsedKeys used_keys;
  BlockPtr block;
  size_t count = 0;
};
class TabletOperations {
 public:
  explicit TabletOperations(Arena* arena)
      : read_data_(arena), write_data_(arena) {
  }

  BlockData& data(OperationType type) {
    switch (type) {
      case OperationType::kRead:
        return read_data_;
      case OperationType::kWrite:
        return write_data_;
      case OperationType::kNone: FALLTHROUGH_INTENDED;
      case OperationType::kLocal:
        FATAL_INVALID_ENUM_VALUE(OperationType, type);
    }
    FATAL_INVALID_ENUM_VALUE(OperationType, type);
  }

  void Done(SessionPool* session_pool, bool allow_local_calls_in_curr_thread) {
    if (flush_head_) {
      flush_head_->Launch(session_pool, allow_local_calls_in_curr_thread);
    } else {
      if (read_data_.block) {
        read_data_.block->Launch(session_pool, allow_local_calls_in_curr_thread);
      }
      if (write_data_.block) {
        write_data_.block->Launch(session_pool, allow_local_calls_in_curr_thread);
      }
    }
  }

  void Process(const BatchContextPtr& context,
               Arena* arena,
               Operation* operation,
               const InternalMetrics& metrics_internal) {
    auto type = operation->type();
    if (type == OperationType::kLocal) {
      ProcessLocalOperation(context, arena, operation, metrics_internal);
      return;
    }
    boost::container::small_vector<Slice, RedisClientCommand::static_capacity> keys;
    operation->GetKeys(&keys);
    CheckConflicts(type, keys);
    auto& data = this->data(type);
    if (!data.block) {
      ArenaAllocator<Block> alloc(arena);
      data.block = std::allocate_shared<Block>(
          alloc, context, alloc, metrics_internal[static_cast<size_t>(OperationType::kRead)]);
      if (last_conflict_type_ == OperationType::kLocal) {
        last_local_block_->SetNext(data.block);
        last_conflict_type_ = type;
      } else if (type == last_conflict_type_) {
        auto old_value = this->data(Opposite(type)).block->SetNext(data.block);
        if (old_value) {
          LOG(DFATAL) << "Opposite already had next block: "
                      << operation->call().serialized_request().ToDebugString();
        }
      }
    }
    data.block->AddOperation(operation);
    RememberKeys(type, &keys);
  }

  std::string ToString() const {
    return Format("{ read_data: $0 write_data: $1 flush_head: $2 last_local_block: $3 "
                  "last_conflict_type: $4 }",
                  read_data_, write_data_, flush_head_, last_local_block_, last_conflict_type_);
  }

 private:
  void ProcessLocalOperation(const BatchContextPtr& context,
                             Arena* arena,
                             Operation* operation,
                             const InternalMetrics& metrics_internal) {
    ArenaAllocator<Block> alloc(arena);
    auto block = std::allocate_shared<Block>(
        alloc, context, alloc, metrics_internal[static_cast<size_t>(OperationType::kLocal)]);
    switch (last_conflict_type_) {
      case OperationType::kNone:
        if (read_data_.block) {
          flush_head_ = read_data_.block;
          if (write_data_.block) {
            read_data_.block->SetNext(write_data_.block);
            write_data_.block->SetNext(block);
          } else {
            read_data_.block->SetNext(block);
          }
        } else if (write_data_.block) {
          flush_head_ = write_data_.block;
          write_data_.block->SetNext(block);
        } else {
          flush_head_ = block;
        }
        break;
      case OperationType::kRead:
        read_data_.block->SetNext(block);
        break;
      case OperationType::kWrite:
        write_data_.block->SetNext(block);
        break;
      case OperationType::kLocal:
        last_local_block_->SetNext(block);
        break;
    }
    read_data_.block = nullptr;
    read_data_.used_keys.clear();
    write_data_.block = nullptr;
    write_data_.used_keys.clear();
    last_local_block_ = block;
    last_conflict_type_ = OperationType::kLocal;
    block->AddOperation(operation);
  }

  void ConflictFound(OperationType type) {
    auto& data = this->data(type);
    auto& opposite_data = this->data(Opposite(type));

    switch (last_conflict_type_) {
      case OperationType::kNone:
        flush_head_ = opposite_data.block;
        opposite_data.block->SetNext(data.block);
        break;
      case OperationType::kWrite:
      case OperationType::kRead:
        data.block = nullptr;
        data.used_keys.clear();
        break;
      case OperationType::kLocal:
        last_local_block_->SetNext(data.block);
        break;
    }
    last_conflict_type_ = type;
  }

  void CheckConflicts(OperationType type, const RedisKeyList& keys) {
    if (last_conflict_type_ == type) {
      return;
    }
    if (last_conflict_type_ == OperationType::kLocal) {
      return;
    }
    auto& opposite = data(Opposite(type));
    bool conflict = false;
    for (const auto& key : keys) {
      if (opposite.used_keys.count(key)) {
        conflict = true;
        break;
      }
    }
    if (conflict) {
      ConflictFound(type);
    }
  }

  void RememberKeys(OperationType type, RedisKeyList* keys) {
    BlockData* dest;
    switch (type) {
      case OperationType::kRead:
        dest = &read_data_;
        break;
      case OperationType::kWrite:
        dest = &write_data_;
        break;
      case OperationType::kNone: FALLTHROUGH_INTENDED;
      case OperationType::kLocal:
        FATAL_INVALID_ENUM_VALUE(OperationType, type);
    }
    for (auto& key : *keys) {
      dest->used_keys.insert(std::move(key));
    }
  }

  BlockData read_data_;
  BlockData write_data_;
  BlockPtr flush_head_;
  BlockPtr last_local_block_;

  // Type of command that caused the last conflict between reads and writes.
  OperationType last_conflict_type_ = OperationType::kNone;
};
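
// Illustrative example (not part of the original file) of the safe-batching rule enforced
// above: per tablet there is at most one read block and one write block, and a key-level
// conflict forces ordering between them. For a client pipeline
//
//   GET k1; SET k1 v; GET k2
//
// GET k1 goes into the read block; SET k1 conflicts on k1, so the write block is chained
// to run after the read block; GET k2 touches no conflicting key and simply joins the
// existing read block.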
YB_STRONGLY_TYPED_BOOL(IsMonitorMessage);

struct RedisServiceImplData : public RedisServiceData {
  RedisServiceImplData(RedisServer* server, string&& yb_tier_master_addresses);

  void AppendToMonitors(Connection* conn) override;
  void RemoveFromMonitors(Connection* conn) override;
  void LogToMonitors(const string& end, const string& db, const RedisClientCommand& cmd) override;
  yb::Result<std::shared_ptr<client::YBTable>> GetYBTableForDB(const string& db_name) override;

  void CleanYBTableFromCacheForDB(const string& table);

  void AppendToSubscribers(
      AsPattern type, const std::vector<std::string>& channels, rpc::Connection* conn,
      std::vector<size_t>* subs) override;
  void RemoveFromSubscribers(
      AsPattern type, const std::vector<std::string>& channels, rpc::Connection* conn,
      std::vector<size_t>* subs) override;
  void CleanUpSubscriptions(Connection* conn) override;
  size_t NumSubscribers(AsPattern type, const std::string& channel) override;
  std::unordered_set<std::string> GetSubscriptions(AsPattern type, rpc::Connection* conn) override;
  std::unordered_set<std::string> GetAllSubscriptions(AsPattern type) override;
  int Publish(const string& channel, const string& message);
  void ForwardToInterestedProxies(
      const string& channel, const string& message, const IntFunctor& f) override;
  int PublishToLocalClients(IsMonitorMessage mode, const string& channel, const string& message);
  Result<vector<HostPortPB>> GetServerAddrsForChannel(const string& channel);
  size_t NumSubscriptionsUnlocked(Connection* conn);

  CHECKED_STATUS GetRedisPasswords(vector<string>* passwords) override;
  CHECKED_STATUS Initialize();
  bool initialized() const { return initialized_.load(std::memory_order_relaxed); }

  // yb::Result<std::shared_ptr<client::YBTable>> GetYBTableForDB(const string& db_name);

  std::string yb_tier_master_addresses_;

  yb::rpc::RpcMethodMetrics metrics_error_;
  InternalMetrics metrics_internal_;

  // Mutex that protects the creation of client_ and populating db_to_opened_table_.
  std::mutex yb_mutex_;
  std::atomic<bool> initialized_;
  client::YBClient* client_ = nullptr;
  SessionPool session_pool_;
  std::unordered_map<std::string, std::shared_ptr<client::YBTable>> db_to_opened_table_;
  std::shared_ptr<client::YBMetaDataCache> tables_cache_;

  rw_semaphore pubsub_mutex_;
  std::unordered_map<std::string, std::unordered_set<Connection*>> channels_to_clients_;
  std::unordered_map<std::string, std::unordered_set<Connection*>> patterns_to_clients_;
  struct ClientSubscription {
    std::unordered_set<std::string> channels;
    std::unordered_set<std::string> patterns;
  };
  std::unordered_map<Connection*, ClientSubscription> clients_to_subscriptions_;

  std::unordered_set<Connection*> monitoring_clients_;
  scoped_refptr<AtomicGauge<uint64_t>> num_clients_monitoring_;

  std::mutex redis_password_mutex_;
  MonoTime redis_cached_password_validity_expiry_;
  vector<string> redis_cached_passwords_;

  RedisServer* server_ = nullptr;
};
class BatchContextImpl : public BatchContext {
 public:
  BatchContextImpl(
      const string& dbname, const std::shared_ptr<RedisInboundCall>& call,
      RedisServiceImplData* impl_data)
      : impl_data_(impl_data),
        db_name_(dbname),
        call_(call),
        consumption_(impl_data->server_->mem_tracker(), 0),
        operations_(&arena_),
        lookups_left_(0),
        tablets_(&arena_) {
  }

  virtual ~BatchContextImpl() {}

  const RedisClientCommand& command(size_t idx) const override {
    return call_->client_batch()[idx];
  }

  const std::shared_ptr<RedisInboundCall>& call() const override {
    return call_;
  }

  client::YBClient* client() const override {
    return impl_data_->client_;
  }

  RedisServiceImplData* service_data() override {
    return impl_data_;
  }

  const RedisServer* server() override {
    return impl_data_->server_;
  }

  void CleanYBTableFromCache() override {
    impl_data_->CleanYBTableFromCacheForDB(db_name_);
  }

  std::shared_ptr<client::YBTable> table() override {
    auto table = impl_data_->GetYBTableForDB(db_name_);
    if (!table.ok()) {
      return nullptr;
    }
    return *table;
  }

  void Commit() {
    Commit(1);
  }

  void Commit(int retries) {
    if (operations_.empty()) {
      return;
    }

    auto table = impl_data_->GetYBTableForDB(db_name_);
    if (!table.ok()) {
      for (auto& operation : operations_) {
        operation.Respond(table.status());
      }
    }
    auto deadline = CoarseMonoClock::Now() + FLAGS_redis_service_yb_client_timeout_millis * 1ms;
    lookups_left_.store(operations_.size(), std::memory_order_release);
    retry_lookups_.store(false, std::memory_order_release);
    for (auto& operation : operations_) {
      impl_data_->client_->LookupTabletByKey(
          table.get(), operation.partition_key(), deadline,
          std::bind(
              &BatchContextImpl::LookupDone, scoped_refptr<BatchContextImpl>(this), &operation,
              retries, _1));
    }
  }

  void Apply(
      size_t index,
      std::shared_ptr<client::YBRedisReadOp> operation,
      const rpc::RpcMethodMetrics& metrics) override {
    DoApply(index, std::move(operation), metrics);
  }

  void Apply(
      size_t index,
      std::shared_ptr<client::YBRedisWriteOp> operation,
      const rpc::RpcMethodMetrics& metrics) override {
    DoApply(index, std::move(operation), metrics);
  }

  void Apply(
      size_t index,
      std::function<bool(client::YBSession*, const StatusFunctor&)> functor,
      std::string partition_key,
      const rpc::RpcMethodMetrics& metrics,
      ManualResponse manual_response) override {
    DoApply(index, std::move(functor), std::move(partition_key), metrics, manual_response);
  }

  std::string ToString() const {
    return Format("{ tablets: $0 }", tablets_);
  }

 private:
  template <class... Args>
  void DoApply(Args&&... args) {
    operations_.emplace_back(call_, std::forward<Args>(args)...);
    if (PREDICT_FALSE(operations_.back().responded())) {
      operations_.pop_back();
    } else {
      consumption_.Add(operations_.back().space_used_by_request());
    }
  }

  void LookupDone(
      Operation* operation, int retries, const Result<client::internal::RemoteTabletPtr>& result) {
    constexpr int kMaxRetries = 2;
    if (!result.ok()) {
      auto status = result.status();
      if (status.IsNotFound() && retries < kMaxRetries) {
        retry_lookups_.store(false, std::memory_order_release);
      } else {
        operation->Respond(status);
      }
    } else {
      operation->SetTablet(*result);
    }
    if (lookups_left_.fetch_sub(1, std::memory_order_acq_rel) != 1) {
      return;
    }

    if (retries < kMaxRetries && retry_lookups_.load(std::memory_order_acquire)) {
      CleanYBTableFromCache();
      Commit(retries + 1);
      return;
    }

    BatchContextPtr self(this);
    for (auto& operation : operations_) {
      if (!operation.responded()) {
        auto it = tablets_.find(operation.tablet()->tablet_id());
        if (it == tablets_.end()) {
          it = tablets_.emplace(operation.tablet()->tablet_id(), TabletOperations(&arena_)).first;
        }
        it->second.Process(self, &arena_, &operation, impl_data_->metrics_internal_);
      }
    }

    size_t idx = 0;
    for (auto& tablet : tablets_) {
      tablet.second.Done(&impl_data_->session_pool_, ++idx == tablets_.size());
    }
    tablets_.clear();
  }

  RedisServiceImplData* impl_data_ = nullptr;

  const string db_name_;
  std::shared_ptr<RedisInboundCall> call_;
  ScopedTrackedConsumption consumption_;

  Arena arena_;
  MCDeque<Operation> operations_;
  std::atomic<bool> retry_lookups_;
  std::atomic<size_t> lookups_left_;
  MCUnorderedMap<Slice, TabletOperations, Slice::Hash> tablets_;
};

} // namespace
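
// Illustrative summary (not part of the original file) of the batch pipeline implemented
// by the anonymous namespace above, for one inbound pipelined call:
//
//   BatchContextImpl::Apply(...)   // buffer each command as an Operation
//   BatchContextImpl::Commit()     // async tablet lookup per partition key
//   LookupDone(...)                // last finished lookup groups ops per tablet
//   TabletOperations::Process(...) // split into read/write Blocks, chain on conflicts
//   Block::Launch(...)             // take a session from the pool and FlushAsync
//   Operation::Respond(...)        // per-op response back to the RedisInboundCall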

class RedisServiceImpl::Impl {
 public:
  Impl(RedisServer* server, string yb_tier_master_address);

  ~Impl() {
    // Wait for DebugSleep to finish.
    // We use DebugSleep only during tests, so just wait long enough, giving an extra 400ms
    // beyond its duration.
    std::this_thread::sleep_for(500ms);
  }

  void Handle(yb::rpc::InboundCallPtr call_ptr);

 private:
  static constexpr size_t kMaxCommandLen = 32;

  void SetupMethod(const RedisCommandInfo& info) {
    CHECK_LE(info.name.length(), kMaxCommandLen);
    auto info_ptr = std::make_shared<RedisCommandInfo>(info);
    std::string lower_name = boost::to_lower_copy(info.name);
    names_.push_back(lower_name);
    CHECK(command_name_to_info_map_.emplace(names_.back(), info_ptr).second);
  }

  bool CheckArgumentSizeOK(const RedisClientCommand& cmd_args) {
    for (Slice arg : cmd_args) {
      if (arg.size() > FLAGS_redis_max_value_size) {
        return false;
      }
    }
    return true;
  }

  bool CheckAuthentication(RedisConnectionContext* conn_context) {
    if (!conn_context->is_authenticated()) {
      vector<string> passwords;
      Status s = data_.GetRedisPasswords(&passwords);
      conn_context->set_authenticated(
          !FLAGS_enable_redis_auth || (s.ok() && passwords.empty()));
    }
    return conn_context->is_authenticated();
  }

  void PopulateHandlers();
  // Fetches the appropriate handler for the command, nullptr if none exists.
  const RedisCommandInfo* FetchHandler(const RedisClientCommand& cmd_args);

  std::deque<std::string> names_;
  std::unordered_map<Slice, RedisCommandInfoPtr, Slice::Hash> command_name_to_info_map_;

  RedisServiceImplData data_;
};
RedisServiceImplData::RedisServiceImplData(RedisServer* server, string&& yb_tier_master_addresses)
    : yb_tier_master_addresses_(std::move(yb_tier_master_addresses)),
      initialized_(false),
      server_(server) {}

yb::Result<std::shared_ptr<client::YBTable>> RedisServiceImplData::GetYBTableForDB(
    const string& db_name) {
  std::shared_ptr<client::YBTable> table;
  YBTableName table_name = GetYBTableNameForRedisDatabase(db_name);
  bool was_cached = false;
  auto res = tables_cache_->GetTable(table_name, &table, &was_cached);
  if (!res.ok()) return res;
  return table;
}

void RedisServiceImplData::AppendToMonitors(Connection* conn) {
  VLOG(3) << "AppendToMonitors (" << conn->ToString();
  {
    boost::lock_guard<decltype(pubsub_mutex_)> lock(pubsub_mutex_);
    monitoring_clients_.insert(conn);
    num_clients_monitoring_->set_value(monitoring_clients_.size());
  }
  auto& context = static_cast<RedisConnectionContext&>(conn->context());
  if (context.ClientMode() != RedisClientMode::kMonitoring) {
    context.SetClientMode(RedisClientMode::kMonitoring);
    context.SetCleanupHook(std::bind(&RedisServiceImplData::RemoveFromMonitors, this, conn));
  }
}

void RedisServiceImplData::RemoveFromMonitors(Connection* conn) {
  VLOG(3) << "RemoveFromMonitors (" << conn->ToString();
  {
    boost::lock_guard<decltype(pubsub_mutex_)> lock(pubsub_mutex_);
    monitoring_clients_.erase(conn);
    num_clients_monitoring_->set_value(monitoring_clients_.size());
  }
}

size_t RedisServiceImplData::NumSubscriptionsUnlocked(Connection* conn) {
  return clients_to_subscriptions_[conn].channels.size() +
         clients_to_subscriptions_[conn].patterns.size();
}

void RedisServiceImplData::AppendToSubscribers(
    AsPattern type, const std::vector<std::string>& channels, rpc::Connection* conn,
    std::vector<size_t>* subs) {
  boost::lock_guard<decltype(pubsub_mutex_)> lock(pubsub_mutex_);
  subs->clear();
  for (const auto& channel : channels) {
    VLOG(3) << "AppendToSubscribers (" << type << ", " << channel << ", " << conn->ToString();
    if (type == AsPattern::kTrue) {
      patterns_to_clients_[channel].insert(conn);
      clients_to_subscriptions_[conn].patterns.insert(channel);
    } else {
      channels_to_clients_[channel].insert(conn);
      clients_to_subscriptions_[conn].channels.insert(channel);
    }
    subs->push_back(NumSubscriptionsUnlocked(conn));
  }
  auto& context = static_cast<RedisConnectionContext&>(conn->context());
  if (context.ClientMode() != RedisClientMode::kSubscribed) {
    context.SetClientMode(RedisClientMode::kSubscribed);
    context.SetCleanupHook(std::bind(&RedisServiceImplData::CleanUpSubscriptions, this, conn));
  }
}

void RedisServiceImplData::RemoveFromSubscribers(
    AsPattern type, const std::vector<std::string>& channels, rpc::Connection* conn,
    std::vector<size_t>* subs) {
  boost::lock_guard<decltype(pubsub_mutex_)> lock(pubsub_mutex_);
  auto& map_to_clients = (type == AsPattern::kTrue ? patterns_to_clients_ : channels_to_clients_);
  auto& map_from_clients =
      (type == AsPattern::kTrue ? clients_to_subscriptions_[conn].patterns
                                : clients_to_subscriptions_[conn].channels);

  subs->clear();
  for (const auto& channel : channels) {
    map_to_clients[channel].erase(conn);
    if (map_to_clients[channel].empty()) {
      map_to_clients.erase(channel);
    }
    map_from_clients.erase(channel);
    subs->push_back(NumSubscriptionsUnlocked(conn));
  }
}

std::unordered_set<string> RedisServiceImplData::GetSubscriptions(
    AsPattern type, Connection* conn) {
  SharedLock<decltype(pubsub_mutex_)> lock(pubsub_mutex_);
  return (
      type == AsPattern::kTrue ? clients_to_subscriptions_[conn].patterns
                               : clients_to_subscriptions_[conn].channels);
}

// ENG-4199: Consider getting all the cluster-wide subscriptions?
std::unordered_set<string> RedisServiceImplData::GetAllSubscriptions(AsPattern type) {
  std::unordered_set<string> ret;
  SharedLock<decltype(pubsub_mutex_)> lock(pubsub_mutex_);
  for (const auto& element :
       (type == AsPattern::kTrue ? patterns_to_clients_ : channels_to_clients_)) {
    ret.insert(element.first);
  }
  return ret;
}

// ENG-4199: Consider getting all the cluster-wide subscribers?
size_t RedisServiceImplData::NumSubscribers(AsPattern type, const std::string& channel) {
  SharedLock<decltype(pubsub_mutex_)> lock(pubsub_mutex_);
  const auto& look_in = (type ? patterns_to_clients_ : channels_to_clients_);
  const auto& iter = look_in.find(channel);
  return (iter == look_in.end() ? 0 : iter->second.size());
}

void RedisServiceImplData::LogToMonitors(
    const string& end, const string& db, const RedisClientCommand& cmd) {
  {
    SharedLock<decltype(pubsub_mutex_)> rlock(pubsub_mutex_);
    if (monitoring_clients_.empty()) return;
  }

  // Prepare the string to be sent to all the monitoring clients.
  // TODO: Use timestamp that works with converter.
  int64_t now_ms = ToMicroseconds(CoarseMonoClock::Now().time_since_epoch());
  std::stringstream ss;
  ss << "+";
  ss.setf(std::ios::fixed, std::ios::floatfield);
  ss.precision(6);
  ss << (now_ms / 1000000.0) << " [" << db << " " << end << "]";
  for (auto& part : cmd) {
    ss << " \"" << part.ToBuffer() << "\"";
  }
  ss << "\r\n";

  PublishToLocalClients(IsMonitorMessage::kTrue, "", ss.str());
}
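
// Illustrative note (not part of the original file): the monitor line assembled above has
// the shape that redis MONITOR clients expect, e.g. for "SET k v" on db 0 from a client at
// 127.0.0.1:52114 it would look roughly like:
//
//   +1647967415.123456 [0 127.0.0.1:52114] "SET" "k" "v"
//
// (the timestamp comes from CoarseMonoClock, so its epoch may differ from wall-clock time,
// as the TODO above notes).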

int RedisServiceImplData::Publish(const string& channel, const string& message) {
  VLOG(3) << "Forwarding to clients on channel " << channel;
  return PublishToLocalClients(IsMonitorMessage::kFalse, channel, message);
}

Result<vector<HostPortPB>> RedisServiceImplData::GetServerAddrsForChannel(
    const string& channel_unused) {
  // TODO(Amit): Instead of forwarding blindly to all servers, figure out the
  // ones that have a subscription and send it to them only.
  std::vector<master::TSInformationPB> live_tservers;
  Status s = CHECK_NOTNULL(server_->tserver())->GetLiveTServers(&live_tservers);
  if (!s.ok()) {
    LOG(WARNING) << s;
    return s;
  }

  vector<HostPortPB> servers;
  const auto cloud_info_pb = server_->MakeCloudInfoPB();
  // Queue NEW_NODE event for all the live tservers.
  for (const master::TSInformationPB& ts_info : live_tservers) {
    const auto& hostport_pb = DesiredHostPort(ts_info.registration().common(), cloud_info_pb);
    if (hostport_pb.host().empty()) {
      LOG(WARNING) << "Skipping TS since it doesn't have any rpc address: "
                   << ts_info.DebugString();
      continue;
    }
    servers.push_back(hostport_pb);
  }
  return servers;
}

class PublishResponseHandler {
 public:
  PublishResponseHandler(int32_t n, IntFunctor f)
      : num_replies_pending(n), done_functor(std::move(f)) {}

  void HandleResponse(const tserver::PublishResponsePB* resp) {
    num_clients_forwarded_to.IncrementBy(resp->num_clients_forwarded_to());

    if (0 == num_replies_pending.IncrementBy(-1)) {
      done_functor(num_clients_forwarded_to.Load());
    }
  }

 private:
  AtomicInt<int32_t> num_replies_pending;
  AtomicInt<int32_t> num_clients_forwarded_to{0};
  IntFunctor done_functor;
};
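
// Illustrative note (not part of the original file): PublishResponseHandler acts as an
// atomic countdown latch shared across the per-tserver callbacks below, e.g. with 3 live
// tservers:
//
//   auto handler = std::make_shared<PublishResponseHandler>(3, done_functor);
//   // Each RPC completion calls handler->HandleResponse(resp); the third (last) one sees
//   // the pending counter reach zero and fires done_functor with the summed client count.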

void RedisServiceImplData::ForwardToInterestedProxies(
    const string& channel, const string& message, const IntFunctor& f) {
  auto interested_servers = GetServerAddrsForChannel(channel);
  if (!interested_servers.ok()) {
    LOG(ERROR) << "Could not get servers to forward to " << interested_servers.status();
    return;
  }
  std::shared_ptr<PublishResponseHandler> resp_handler =
      std::make_shared<PublishResponseHandler>(interested_servers->size(), f);
  for (auto& hostport_pb : *interested_servers) {
    tserver::PublishRequestPB requestPB;
    requestPB.set_channel(channel);
    requestPB.set_message(message);
    std::shared_ptr<tserver::TabletServerServiceProxy> proxy =
        std::make_shared<tserver::TabletServerServiceProxy>(
            &client_->proxy_cache(), HostPortFromPB(hostport_pb));
    std::shared_ptr<tserver::PublishResponsePB> responsePB =
        std::make_shared<tserver::PublishResponsePB>();
    std::shared_ptr<yb::rpc::RpcController> rpcController = std::make_shared<rpc::RpcController>();
    // Hold copies of the shared ptrs in the callback to ensure that the proxy, responsePB
    // and rpcController stay valid. They self-destruct at the later of two events:
    //  (i)  exiting this loop, and
    //  (ii) finishing the callback.
    proxy->PublishAsync(
        requestPB, responsePB.get(), rpcController.get(),
        [resp_handler, responsePB, rpcController, proxy]() mutable {
          resp_handler->HandleResponse(responsePB.get());
          responsePB.reset();
          rpcController.reset();
          proxy.reset();
          resp_handler.reset();
        });
  }
}

string MessageFor(const string& channel, const string& message) {
  vector<string> parts;
  parts.push_back(redisserver::EncodeAsBulkString("message").ToBuffer());
  parts.push_back(redisserver::EncodeAsBulkString(channel).ToBuffer());
  parts.push_back(redisserver::EncodeAsBulkString(message).ToBuffer());
  return redisserver::EncodeAsArrayOfEncodedElements(parts);
}

string PMessageFor(const string& pattern, const string& channel, const string& message) {
  vector<string> parts;
  parts.push_back(redisserver::EncodeAsBulkString("pmessage").ToBuffer());
  parts.push_back(redisserver::EncodeAsBulkString(pattern).ToBuffer());
  parts.push_back(redisserver::EncodeAsBulkString(channel).ToBuffer());
  parts.push_back(redisserver::EncodeAsBulkString(message).ToBuffer());
  return redisserver::EncodeAsArrayOfEncodedElements(parts);
}
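
// Illustrative example (not part of the original file): assuming the encoding helpers
// follow the RESP wire format, MessageFor("news", "hi") would produce the three-element
// array
//
//   *3\r\n$7\r\nmessage\r\n$4\r\nnews\r\n$2\r\nhi\r\n
//
// which is what a subscribed redis client expects for a published message.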

int RedisServiceImplData::PublishToLocalClients(
    IsMonitorMessage mode, const string& channel, const string& message) {
  SharedLock<decltype(pubsub_mutex_)> rlock(pubsub_mutex_);

  int num_pushed_to = 0;
  // Send the message to all the monitoring clients.
  OutboundDataPtr out;
  const std::unordered_set<Connection*>* clients = nullptr;
  if (mode == IsMonitorMessage::kTrue) {
    out = std::make_shared<yb::rpc::StringOutboundData>(message, "Monitor redis commands");
    clients = &monitoring_clients_;
  } else {
    out = std::make_shared<yb::rpc::StringOutboundData>(
        MessageFor(channel, message), "Publishing to Channel");
    clients =
        (channels_to_clients_.find(channel) == channels_to_clients_.end()
             ? nullptr
             : &channels_to_clients_[channel]);
  }
  if (clients) {
    // Handle Monitor and Subscribe clients.
    for (auto connection : *clients) {
      DVLOG(3) << "Publishing to subscribed client " << connection->ToString();
      connection->QueueOutboundData(out);
      num_pushed_to++;
    }
  }
  if (mode == IsMonitorMessage::kFalse) {
    // Handle PSubscribe clients.
    for (auto& entry : patterns_to_clients_) {
      auto& pattern = entry.first;
      auto& clients_subscribed_to_pattern = entry.second;
      if (!RedisPatternMatch(pattern, channel, /* ignore case */ false)) {
        continue;
      }

      OutboundDataPtr out = std::make_shared<yb::rpc::StringOutboundData>(
          PMessageFor(pattern, channel, message), "Publishing to Channel");
      for (auto remote : clients_subscribed_to_pattern) {
        remote->QueueOutboundData(out);
        num_pushed_to++;
      }
    }
  }

  return num_pushed_to;
}

void RedisServiceImplData::CleanUpSubscriptions(Connection* conn) {
1323
83
  VLOG
(3) << "CleanUpSubscriptions (" << conn->ToString()0
;
1324
83
  boost::lock_guard<decltype(pubsub_mutex_)> wlock(pubsub_mutex_);
1325
83
  if (monitoring_clients_.find(conn) != monitoring_clients_.end()) {
1326
0
    monitoring_clients_.erase(conn);
1327
0
    num_clients_monitoring_->set_value(monitoring_clients_.size());
1328
0
  }
1329
83
  if (clients_to_subscriptions_.find(conn) != clients_to_subscriptions_.end()) {
1330
43
    for (auto& channel : clients_to_subscriptions_[conn].channels) {
1331
13
      channels_to_clients_[channel].erase(conn);
1332
13
      if (channels_to_clients_[channel].empty()) {
1333
10
        channels_to_clients_.erase(channel);
1334
10
      }
1335
13
    }
1336
43
    for (auto& pattern : clients_to_subscriptions_[conn].patterns) {
1337
6
      patterns_to_clients_[pattern].erase(conn);
1338
6
      if (patterns_to_clients_[pattern].empty()) {
1339
6
        patterns_to_clients_.erase(pattern);
1340
6
      }
1341
6
    }
1342
43
    clients_to_subscriptions_.erase(conn);
1343
43
  }
1344
83
}
1345
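CleanUpSubscriptions keeps two indexes consistent: the reverse map clients_to_subscriptions_ and the forward maps channels_to_clients_ / patterns_to_clients_, erasing forward-map buckets that become empty so publishing never iterates dead channels. A reduced sketch of that bookkeeping with a single channel map and a hypothetical Conn type:

#include <string>
#include <unordered_map>
#include <unordered_set>

struct Conn;  // opaque stand-in for rpc::Connection

std::unordered_map<std::string, std::unordered_set<Conn*>> channels_to_conns;
std::unordered_map<Conn*, std::unordered_set<std::string>> conns_to_channels;

void RemoveConnection(Conn* conn) {
  auto it = conns_to_channels.find(conn);
  if (it == conns_to_channels.end()) return;
  for (const auto& channel : it->second) {
    auto& subs = channels_to_conns[channel];
    subs.erase(conn);
    if (subs.empty()) channels_to_conns.erase(channel);  // drop empty bucket
  }
  conns_to_channels.erase(it);
}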
1346
572
Status RedisServiceImplData::Initialize() {
1347
572
  boost::lock_guard<std::mutex> guard(yb_mutex_);
1348
572
  if (!initialized()) {
1349
572
    client_ = server_->tserver()->client();
1350
1351
572
    server_->tserver()->SetPublisher(std::bind(&RedisServiceImplData::Publish, this, _1, _2));
1352
1353
572
    tables_cache_ = std::make_shared<YBMetaDataCache>(
1354
572
        client_, false /* Update roles permissions cache */);
1355
572
    session_pool_.Init(client_, server_->metric_entity());
1356
1357
572
    initialized_.store(true, std::memory_order_release);
1358
572
  }
1359
572
  return Status::OK();
1360
572
}
1361
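Initialize() stores initialized_ with memory_order_release only after the client, metadata cache, and session pool are in place; a reader that observes the flag through a matching acquire load is then guaranteed to see those writes as well. A minimal sketch of that release/acquire pairing, using hypothetical names:

#include <atomic>
#include <mutex>

class Service {
 public:
  bool initialized() const {
    // Acquire pairs with the release store below: once this returns true,
    // all writes made before the store are visible to this thread.
    return initialized_.load(std::memory_order_acquire);
  }

  void Initialize() {
    std::lock_guard<std::mutex> guard(mutex_);
    if (!initialized()) {
      state_ = 42;  // stand-in for client/cache/session-pool setup
      initialized_.store(true, std::memory_order_release);
    }
  }

 private:
  std::mutex mutex_;
  int state_ = 0;
  std::atomic<bool> initialized_{false};
};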
1362
0
void RedisServiceImplData::CleanYBTableFromCacheForDB(const string& db) {
1363
0
  tables_cache_->RemoveCachedTable(GetYBTableNameForRedisDatabase(db));
1364
0
}
1365
1366
1.66k
Status RedisServiceImplData::GetRedisPasswords(vector<string>* passwords) {
1367
1.66k
  MonoTime now = MonoTime::Now();
1368
1369
1.66k
  std::lock_guard<std::mutex> lock(redis_password_mutex_);
1370
1.66k
  if (redis_cached_password_validity_expiry_.Initialized() &&
1371
1.66k
      now < redis_cached_password_validity_expiry_) {
1372
490
    *passwords = redis_cached_passwords_;
1373
490
    return Status::OK();
1374
490
  }
1375
1376
1.17k
  RETURN_NOT_OK(client_->GetRedisPasswords(&redis_cached_passwords_));
1377
1.17k
  *passwords = redis_cached_passwords_;
1378
1.17k
  redis_cached_password_validity_expiry_ =
1379
1.17k
      now + MonoDelta::FromMilliseconds(FLAGS_redis_password_caching_duration_ms);
1380
1.17k
  return Status::OK();
1381
1.17k
}
1382
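GetRedisPasswords is a time-bounded cache: it serves the cached passwords until the expiry deadline passes, then refetches and pushes the deadline out by FLAGS_redis_password_caching_duration_ms. The same shape in a standalone sketch, with std::chrono standing in for MonoTime and a hypothetical FetchFromSource():

#include <chrono>
#include <mutex>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

std::mutex cache_mutex;
std::vector<std::string> cached_values;
Clock::time_point cache_expiry;  // default-constructed, i.e. already expired

std::vector<std::string> FetchFromSource() {  // hypothetical remote lookup
  return {"secret"};
}

std::vector<std::string> GetCached(std::chrono::milliseconds ttl) {
  std::lock_guard<std::mutex> lock(cache_mutex);
  const auto now = Clock::now();
  if (now < cache_expiry) {
    return cached_values;  // still fresh; avoid the remote lookup
  }
  cached_values = FetchFromSource();  // refresh and extend the deadline
  cache_expiry = now + ttl;
  return cached_values;
}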
1383
2.93k
void RedisServiceImpl::Impl::PopulateHandlers() {
1384
2.93k
  auto metric_entity = data_.server_->metric_entity();
1385
2.93k
  FillRedisCommands(metric_entity, std::bind(&Impl::SetupMethod, this, _1));
1386
1387
  // Set up metrics for erroneous calls and for internal read/write/local operations.
1388
2.93k
  data_.metrics_error_.handler_latency = YB_REDIS_METRIC(error).Instantiate(metric_entity);
1389
2.93k
  data_.metrics_internal_[static_cast<size_t>(OperationType::kWrite)].handler_latency =
1390
2.93k
      YB_REDIS_METRIC(set_internal).Instantiate(metric_entity);
1391
2.93k
  data_.metrics_internal_[static_cast<size_t>(OperationType::kRead)].handler_latency =
1392
2.93k
      YB_REDIS_METRIC(get_internal).Instantiate(metric_entity);
1393
2.93k
  data_.metrics_internal_[static_cast<size_t>(OperationType::kLocal)].handler_latency =
1394
2.93k
      data_.metrics_internal_[static_cast<size_t>(OperationType::kRead)].handler_latency;
1395
1396
2.93k
  auto* proto = &METRIC_redis_monitoring_clients;
1397
2.93k
  data_.num_clients_monitoring_ = proto->Instantiate(metric_entity, 0);
1398
2.93k
}
1399
1400
209k
const RedisCommandInfo* RedisServiceImpl::Impl::FetchHandler(const RedisClientCommand& cmd_args) {
1401
209k
  if (cmd_args.size() < 1) {
1402
0
    return nullptr;
1403
0
  }
1404
209k
  Slice cmd_name = cmd_args[0];
1405
209k
  size_t len = cmd_name.size();
1406
209k
  if (len > kMaxCommandLen) {
1407
0
    return nullptr;
1408
0
  }
1409
209k
  char lower_cmd[kMaxCommandLen];
1410
1.05M
  for (size_t i = 0; i != len; ++i) {
1411
841k
    lower_cmd[i] = std::tolower(cmd_name[i]);
1412
841k
  }
1413
209k
  auto iter = command_name_to_info_map_.find(Slice(lower_cmd, len));
1414
209k
  if (iter == command_name_to_info_map_.end()) {
1415
1
    YB_LOG_EVERY_N_SECS(ERROR, 60)
1416
1
        << "Command " << cmd_name << " not yet supported. "
1417
1
        << "Arguments: " << ToString(cmd_args) << ". "
1418
1
        << "Raw: " << Slice(cmd_args[0].data(), cmd_args.back().end()).ToDebugString();
1419
1
    return nullptr;
1420
1
  }
1421
209k
  return iter->second.get();
1422
209k
}
1423
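FetchHandler lower-cases the command name into a fixed stack buffer (anything longer than kMaxCommandLen was already rejected) and probes the handler map through a non-owning Slice, so the hot path performs no heap allocation. A sketch of the same idea with std::string_view; the std::string temporary in the find() call is a simplification that the Slice-keyed map in the real code avoids:

#include <cctype>
#include <string>
#include <string_view>
#include <unordered_map>

constexpr size_t kMaxCmdLen = 32;  // assumed bound, mirroring kMaxCommandLen

const int* FindHandler(const std::unordered_map<std::string, int>& handlers,
                       std::string_view cmd) {
  if (cmd.empty() || cmd.size() > kMaxCmdLen) return nullptr;
  char lower[kMaxCmdLen];  // stack buffer, one byte per command character
  for (size_t i = 0; i != cmd.size(); ++i) {
    lower[i] = static_cast<char>(std::tolower(static_cast<unsigned char>(cmd[i])));
  }
  auto it = handlers.find(std::string(lower, cmd.size()));
  return it == handlers.end() ? nullptr : &it->second;
}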
1424
RedisServiceImpl::Impl::Impl(RedisServer* server, string yb_tier_master_addresses)
1425
2.93k
    : data_(server, std::move(yb_tier_master_addresses)) {
1426
2.93k
  PopulateHandlers();
1427
2.93k
}
1428
1429
209k
bool AllowedInClientMode(const RedisCommandInfo* info, RedisClientMode mode) {
1430
209k
  if (mode == RedisClientMode::kMonitoring) {
1431
0
    static std::unordered_set<string> allowed = {"quit"};
1432
0
    return allowed.find(info->name) != allowed.end();
1433
209k
  } else if (mode == RedisClientMode::kSubscribed) {
1434
86
    static std::unordered_set<string> allowed = {"subscribe",    "unsubscribe", "psubscribe",
1435
86
                                                 "punsubscribe", "ping",        "quit"};
1436
86
    return allowed.find(info->name) != allowed.end();
1437
209k
  } else {
1438
    // kNormal.
1439
209k
    return true;
1440
209k
  }
1441
209k
}
1442
1443
209k
void RedisServiceImpl::Impl::Handle(rpc::InboundCallPtr call_ptr) {
1444
209k
  auto call = std::static_pointer_cast<RedisInboundCall>(call_ptr);
1445
1446
209k
  DVLOG(2) << "Asked to handle a call " << call->ToString();
1447
209k
  if (call->serialized_request().size() > FLAGS_redis_max_command_size) {
1448
0
    auto message = StrCat("Size of redis command ", call->serialized_request().size(),
1449
0
                          ", but we only support up to length of ", FLAGS_redis_max_command_size);
1450
0
    for (size_t idx = 0; idx != call->client_batch().size(); ++idx) {
1451
0
      RespondWithFailure(call, idx, message);
1452
0
    }
1453
0
    return;
1454
0
  }
1455
1456
  // Ensure that we have the required YBClient(s) initialized.
1457
209k
  if (!data_.initialized()) {
1458
572
    auto status = data_.Initialize();
1459
572
    if (!status.ok()) {
1460
0
      auto message = StrCat("Could not open .redis table. ", status.ToString());
1461
0
      for (size_t idx = 0; idx != call->client_batch().size(); ++idx) {
1462
0
        RespondWithFailure(call, idx, message);
1463
0
      }
1464
0
      return;
1465
0
    }
1466
572
  }
1467
1468
  // A call could contain several commands, i.e. a batch.
1470
  // We process them as follows:
1471
  // Each read command is processed individually.
1472
  // Sequential write commands share a single session and the same batcher.
1472
209k
  const auto& batch = call->client_batch();
1473
209k
  auto conn = call->connection();
1474
209k
  const string remote = yb::ToString(conn->remote());
1475
209k
  RedisConnectionContext* conn_context = &(call->connection_context());
1476
209k
  string db_name = conn_context->redis_db_to_use();
1477
209k
  auto context = make_scoped_refptr<BatchContextImpl>(db_name, call, &data_);
1478
419k
  for (size_t idx = 0; idx != batch.size(); ++idx) {
1479
209k
    const RedisClientCommand& c = batch[idx];
1480
1481
209k
    auto cmd_info = FetchHandler(c);
1482
1483
    // Handle the current redis command.
1484
209k
    if (cmd_info == nullptr) {
1485
1
      RespondWithFailure(call, idx, "Unsupported call.");
1486
1
      continue;
1487
209k
    } else if (!AllowedInClientMode(cmd_info, conn_context->ClientMode())) {
1488
1
      RespondWithFailure(
1489
1
          call, idx, Substitute(
1490
1
                         "Command $0 not allowed in client mode $1.", cmd_info->name,
1491
1
                         yb::ToString(conn_context->ClientMode())));
1492
1
      continue;
1493
1
    }
1494
1495
209k
    size_t arity = static_cast<size_t>(std::abs(cmd_info->arity) - 1);
1496
209k
    bool exact_count = cmd_info->arity > 0;
1497
209k
    size_t passed_arguments = c.size() - 1;
1498
209k
    if (!exact_count && passed_arguments < arity) {
1499
      // -X means that the command needs >= X arguments.
1500
0
      YB_LOG_EVERY_N_SECS(ERROR, 60)
1501
0
          << "Requested command " << c[0] << " does not have enough arguments."
1502
0
          << " At least " << arity << " expected, but " << passed_arguments << " found.";
1503
0
      RespondWithFailure(call, idx, "Too few arguments.");
1504
209k
    } else if (exact_count && passed_arguments != arity) {
1505
      // X (> 0) means that the command needs exactly X arguments.
1506
0
      YB_LOG_EVERY_N_SECS(ERROR, 60)
1507
0
          << "Requested command " << c[0] << " has wrong number of arguments. "
1508
0
          << arity << " expected, but " << passed_arguments << " found.";
1509
0
      RespondWithFailure(call, idx, "Wrong number of arguments.");
1510
209k
    } else if (!CheckArgumentSizeOK(c)) {
1511
0
      RespondWithFailure(call, idx, "Redis argument too long.");
1512
209k
    } else if (!CheckAuthentication(conn_context) && cmd_info->name != "auth") {
1513
2
      RespondWithFailure(call, idx, "Authentication required.", "NOAUTH");
1514
209k
    } else {
1515
209k
      if (cmd_info->name != "config" && 
cmd_info->name != "monitor"209k
) {
1516
209k
        data_.LogToMonitors(remote, db_name, c);
1517
209k
      }
1518
1519
      // Handle the call.
1520
209k
      cmd_info->functor(*cmd_info, idx, context.get());
1521
1522
209k
      if (cmd_info->name == "select" && db_name != conn_context->redis_db_to_use()) {
1523
        // SELECT switched the database: commit the current batch and start a new context.
1524
115
        context->Commit();
1525
115
        db_name = conn_context->redis_db_to_use();
1526
115
        context = make_scoped_refptr<BatchContextImpl>(db_name, call, &data_);
1527
115
      }
1528
209k
    }
1529
209k
  }
1530
209k
  context->Commit();
1531
209k
}
1532
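The arity check in Handle() follows the Redis convention: a positive arity means the command takes exactly arity - 1 arguments (arity counts the command name itself), and a negative arity means at least |arity| - 1 arguments. A small standalone check mirroring that logic; the example arities are the standard Redis values for GET and SET:

#include <cstdlib>
#include <string>

// Returns an error message, or an empty string if the argument count is valid.
std::string CheckArity(int arity, size_t passed_arguments) {
  const size_t required = static_cast<size_t>(std::abs(arity)) - 1;
  const bool exact = arity > 0;
  if (!exact && passed_arguments < required) return "Too few arguments.";
  if (exact && passed_arguments != required) return "Wrong number of arguments.";
  return "";
}
// GET has arity 2: CheckArity(2, 1) passes (exactly one key).
// SET has arity -3: CheckArity(-3, 2) passes (key and value, more allowed).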
1533
RedisServiceImpl::RedisServiceImpl(RedisServer* server, string yb_tier_master_address)
1534
    : RedisServerServiceIf(server->metric_entity()),
1535
2.93k
      impl_(new Impl(server, std::move(yb_tier_master_address))) {}
1536
1537
0
RedisServiceImpl::~RedisServiceImpl() {
1538
0
}
1539
1540
209k
void RedisServiceImpl::Handle(yb::rpc::InboundCallPtr call) {
1541
209k
  impl_->Handle(std::move(call));
1542
209k
}
1543
1544
2.93k
void RedisServiceImpl::FillEndpoints(const rpc::RpcServicePtr& service, rpc::RpcEndpointMap* map) {
1545
2.93k
  map->emplace(RedisInboundCall::static_serialized_remote_method(), std::make_pair(service, 0ULL));
1546
2.93k
}
1547
1548
}  // namespace redisserver
1549
}  // namespace yb