/Users/deen/code/yugabyte-db/src/yb/yql/redis/redisserver/redis_service.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/yql/redis/redisserver/redis_service.h" |
15 | | |
16 | | #include <thread> |
17 | | |
18 | | #include <boost/algorithm/string/case_conv.hpp> |
19 | | #include <boost/lockfree/queue.hpp> |
20 | | #include <gflags/gflags.h> |
21 | | |
22 | | #include "yb/client/client.h" |
23 | | #include "yb/client/error.h" |
24 | | #include "yb/client/meta_cache.h" |
25 | | #include "yb/client/meta_data_cache.h" |
26 | | #include "yb/client/session.h" |
27 | | #include "yb/client/table.h" |
28 | | #include "yb/client/yb_op.h" |
29 | | #include "yb/client/yb_table_name.h" |
30 | | |
31 | | #include "yb/common/redis_protocol.pb.h" |
32 | | |
33 | | #include "yb/gutil/casts.h" |
34 | | #include "yb/gutil/strings/join.h" |
35 | | |
36 | | #include "yb/master/master_heartbeat.pb.h" |
37 | | |
38 | | #include "yb/rpc/connection.h" |
39 | | #include "yb/rpc/rpc_controller.h" |
40 | | #include "yb/rpc/rpc_introspection.pb.h" |
41 | | |
42 | | #include "yb/tserver/tablet_server_interface.h" |
43 | | #include "yb/tserver/tserver_service.proxy.h" |
44 | | |
45 | | #include "yb/util/locks.h" |
46 | | #include "yb/util/logging.h" |
47 | | #include "yb/util/memory/mc_types.h" |
48 | | #include "yb/util/metrics.h" |
49 | | #include "yb/util/redis_util.h" |
50 | | #include "yb/util/result.h" |
51 | | #include "yb/util/shared_lock.h" |
52 | | #include "yb/util/size_literals.h" |
53 | | |
54 | | #include "yb/yql/redis/redisserver/redis_commands.h" |
55 | | #include "yb/yql/redis/redisserver/redis_encoding.h" |
56 | | #include "yb/yql/redis/redisserver/redis_rpc.h" |
57 | | |
58 | | using yb::operator"" _MB; |
59 | | using namespace std::literals; |
60 | | using namespace std::placeholders; |
61 | | using yb::client::YBMetaDataCache; |
62 | | using strings::Substitute; |
63 | | using yb::rpc::Connection; |
64 | | |
// Latency histograms for the Redis service: one catch-all for unsupported
// methods, plus one each for the client-layer ("internal") portion of GET/SET.
DEFINE_REDIS_histogram_EX(error,
                          "yb.redisserver.RedisServerService.AnyMethod RPC Time",
                          "yb.redisserver.RedisServerService.ErrorUnsupportedMethod()");
DEFINE_REDIS_histogram_EX(get_internal,
                          "yb.redisserver.RedisServerService.Get RPC Time",
                          "in yb.client.Get");
DEFINE_REDIS_histogram_EX(set_internal,
                          "yb.redisserver.RedisServerService.Set RPC Time",
                          "in yb.client.Set");

// Declares a uint64 gauge named redis_<state>_sessions tracking session counts
// for the SessionPool (see DEFINE_REDIS_SESSION_GAUGE(allocated/available) below).
#define DEFINE_REDIS_SESSION_GAUGE(state) \
  METRIC_DEFINE_gauge_uint64( \
      server, \
      BOOST_PP_CAT(redis_, BOOST_PP_CAT(state, _sessions)), \
      "Number of " BOOST_PP_STRINGIZE(state) " sessions", \
      yb::MetricUnit::kUnits, \
      "Number of sessions " BOOST_PP_STRINGIZE(state) " by Redis service.") \
  /**/

DEFINE_REDIS_SESSION_GAUGE(allocated);
DEFINE_REDIS_SESSION_GAUGE(available);

METRIC_DEFINE_gauge_uint64(
    server, redis_monitoring_clients, "Number of clients running monitor", yb::MetricUnit::kUnits,
    "Number of clients running monitor ");

// Sanitizer builds run much slower, so give client RPCs a far more generous
// default timeout (10 min vs 3 s) to avoid spurious test failures.
#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
constexpr int32_t kDefaultRedisServiceTimeoutMs = 600000;
#else
constexpr int32_t kDefaultRedisServiceTimeoutMs = 3000;
#endif

DEFINE_int32(redis_service_yb_client_timeout_millis, kDefaultRedisServiceTimeoutMs,
             "Timeout in milliseconds for RPC calls from Redis service to master/tserver");

// In order to support up to three 64MB strings along with other strings,
// we have the total size of a redis command at 253_MB, which is less than the consensus size
// to account for the headers in the consensus layer.
DEFINE_uint64(redis_max_command_size, 253_MB, "Maximum size of the command in redis");

// Maximum value size is 64MB
DEFINE_uint64(redis_max_value_size, 64_MB, "Maximum size of the value in redis");
DEFINE_int32(redis_callbacks_threadpool_size, 64,
             "The maximum size for the threadpool which handles callbacks from the ybclient layer");

DEFINE_int32(redis_password_caching_duration_ms, 5000,
             "The duration for which we will cache the redis passwords. 0 to disable.");

// When true, Operation::GetKeys records each op's key so that conflicting
// read/write batches to the same key are chained instead of run in parallel.
DEFINE_bool(redis_safe_batch, true, "Use safe batching with Redis service");
DEFINE_bool(enable_redis_auth, true, "Enable AUTH for the Redis service");

// Placement flags owned by other modules; read here for proxy/placement logic.
DECLARE_string(placement_cloud);
DECLARE_string(placement_region);
DECLARE_string(placement_zone);
119 | | |
120 | | using yb::client::YBRedisOp; |
121 | | using yb::client::YBRedisReadOp; |
122 | | using yb::client::YBRedisWriteOp; |
123 | | using yb::client::YBClientBuilder; |
124 | | using yb::client::YBSchema; |
125 | | using yb::client::YBSession; |
126 | | using yb::client::YBStatusCallback; |
127 | | using yb::client::YBTable; |
128 | | using yb::client::YBTableName; |
129 | | using yb::rpc::ConnectionPtr; |
130 | | using yb::rpc::ConnectionWeakPtr; |
131 | | using yb::rpc::OutboundData; |
132 | | using yb::rpc::OutboundDataPtr; |
133 | | using yb::RedisResponsePB; |
134 | | |
135 | | namespace yb { |
136 | | namespace redisserver { |
137 | | |
138 | | typedef boost::container::small_vector_base<Slice> RedisKeyList; |
139 | | |
140 | | namespace { |
141 | | |
142 | | YB_DEFINE_ENUM(OperationType, (kNone)(kRead)(kWrite)(kLocal)); |
143 | | |
144 | | // Returns opposite operation type for specified type. Write for read, read for write. |
145 | 104k | OperationType Opposite(OperationType type) { |
146 | 104k | switch (type) { |
147 | 42.7k | case OperationType::kRead: |
148 | 42.7k | return OperationType::kWrite; |
149 | 61.5k | case OperationType::kWrite: |
150 | 61.5k | return OperationType::kRead; |
151 | 0 | case OperationType::kNone: FALLTHROUGH_INTENDED; |
152 | 0 | case OperationType::kLocal: |
153 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, type); |
154 | 104k | } |
155 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, type); |
156 | 0 | } |
157 | | |
158 | | class Operation { |
159 | | public: |
160 | | template <class Op> |
161 | | Operation(const std::shared_ptr<RedisInboundCall>& call, |
162 | | size_t index, |
163 | | std::shared_ptr<Op> operation, |
164 | | const rpc::RpcMethodMetrics& metrics) |
165 | | : type_(std::is_same<Op, YBRedisReadOp>::value ? OperationType::kRead : OperationType::kWrite), |
166 | | call_(call), |
167 | | index_(index), |
168 | | operation_(std::move(operation)), |
169 | | metrics_(metrics), |
170 | 104k | manual_response_(ManualResponse::kFalse) { |
171 | 104k | auto status = operation_->GetPartitionKey(&partition_key_); |
172 | 104k | if (!status.ok()) { |
173 | 0 | Respond(status); |
174 | 0 | } |
175 | 104k | } redis_service.cc:_ZN2yb11redisserver12_GLOBAL__N_19OperationC2INS_6client13YBRedisReadOpEEERKNSt3__110shared_ptrINS0_16RedisInboundCallEEEmNS7_IT_EERKNS_3rpc16RpcMethodMetricsE Line | Count | Source | 170 | 42.7k | manual_response_(ManualResponse::kFalse) { | 171 | 42.7k | auto status = operation_->GetPartitionKey(&partition_key_); | 172 | 42.7k | if (!status.ok()) { | 173 | 0 | Respond(status); | 174 | 0 | } | 175 | 42.7k | } |
redis_service.cc:_ZN2yb11redisserver12_GLOBAL__N_19OperationC2INS_6client14YBRedisWriteOpEEERKNSt3__110shared_ptrINS0_16RedisInboundCallEEEmNS7_IT_EERKNS_3rpc16RpcMethodMetricsE Line | Count | Source | 170 | 61.5k | manual_response_(ManualResponse::kFalse) { | 171 | 61.5k | auto status = operation_->GetPartitionKey(&partition_key_); | 172 | 61.5k | if (!status.ok()) { | 173 | 0 | Respond(status); | 174 | 0 | } | 175 | 61.5k | } |
|
176 | | |
177 | | Operation(const std::shared_ptr<RedisInboundCall>& call, |
178 | | size_t index, |
179 | | std::function<bool(client::YBSession*, const StatusFunctor&)> functor, |
180 | | std::string partition_key, |
181 | | const rpc::RpcMethodMetrics& metrics, |
182 | | ManualResponse manual_response) |
183 | | : type_(OperationType::kLocal), |
184 | | call_(call), |
185 | | index_(index), |
186 | | functor_(std::move(functor)), |
187 | | partition_key_(std::move(partition_key)), |
188 | | metrics_(metrics), |
189 | 297 | manual_response_(manual_response) { |
190 | 297 | } |
191 | | |
192 | 209k | bool responded() const { |
193 | 209k | return responded_.load(std::memory_order_acquire); |
194 | 209k | } |
195 | | |
196 | 0 | size_t index() const { |
197 | 0 | return index_; |
198 | 0 | } |
199 | | |
200 | 104k | OperationType type() const { |
201 | 104k | return type_; |
202 | 104k | } |
203 | | |
204 | 104k | const YBRedisOp& operation() const { |
205 | 104k | return *operation_; |
206 | 104k | } |
207 | | |
208 | 104k | bool has_operation() const { |
209 | 104k | return operation_ != nullptr; |
210 | 104k | } |
211 | | |
212 | 104k | size_t space_used_by_request() const { |
213 | 104k | return operation_ ? operation_->space_used_by_request() : 0; |
214 | 104k | } |
215 | | |
216 | 104k | RedisResponsePB& response() { |
217 | 104k | switch (type_) { |
218 | 42.8k | case OperationType::kRead: |
219 | 42.8k | return *down_cast<YBRedisReadOp*>(operation_.get())->mutable_response(); |
220 | 61.5k | case OperationType::kWrite: |
221 | 61.5k | return *down_cast<YBRedisWriteOp*>(operation_.get())->mutable_response(); |
222 | 0 | case OperationType::kNone: FALLTHROUGH_INTENDED; |
223 | 0 | case OperationType::kLocal: |
224 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, type_); |
225 | 104k | } |
226 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, type_); |
227 | 0 | } |
228 | | |
229 | 0 | const rpc::RpcMethodMetrics& metrics() const { |
230 | 0 | return metrics_; |
231 | 0 | } |
232 | | |
233 | 104k | const std::string& partition_key() const { |
234 | 104k | return partition_key_; |
235 | 104k | } |
236 | | |
237 | 104k | void SetTablet(const client::internal::RemoteTabletPtr& tablet) { |
238 | 104k | if (operation_) { |
239 | 104k | operation_->SetTablet(tablet); |
240 | 104k | } |
241 | 104k | tablet_ = tablet; |
242 | 104k | } |
243 | | |
244 | 0 | void ResetTable(std::shared_ptr<client::YBTable> table) { |
245 | 0 | if (operation_) { |
246 | 0 | operation_->ResetTable(table); |
247 | 0 | } |
248 | 0 | tablet_.reset(); |
249 | 0 | } |
250 | | |
251 | 209k | const client::internal::RemoteTabletPtr& tablet() const { |
252 | 209k | return tablet_; |
253 | 209k | } |
254 | | |
255 | 0 | RedisInboundCall& call() const { |
256 | 0 | return *call_; |
257 | 0 | } |
258 | | |
259 | 104k | void GetKeys(RedisKeyList* keys) const { |
260 | 104k | if (FLAGS_redis_safe_batch) { |
261 | 104k | keys->emplace_back(operation_ ? operation_->GetKey() : Slice()); |
262 | 104k | } |
263 | 104k | } |
264 | | |
265 | 104k | bool Apply(client::YBSession* session, const StatusFunctor& callback, bool* applied_operations) { |
266 | | // We should destroy functor after this call. |
267 | | // Because it could hold references to other objects. |
268 | | // So we more it to temp variable. |
269 | 104k | auto functor = std::move(functor_); |
270 | | |
271 | 104k | if (call_->aborted()) { |
272 | 0 | Respond(STATUS(Aborted, "")); |
273 | 0 | return false; |
274 | 0 | } |
275 | | |
276 | | // Used for DebugSleep |
277 | 104k | if (functor) { |
278 | 297 | return functor(session, callback); |
279 | 297 | } |
280 | | |
281 | 104k | session->Apply(operation_); |
282 | 104k | *applied_operations = true; |
283 | 104k | return true; |
284 | 104k | } |
285 | | |
286 | 104k | void Respond(const Status& status) { |
287 | 104k | responded_.store(true, std::memory_order_release); |
288 | 104k | if (manual_response_) { |
289 | 297 | return; |
290 | 297 | } |
291 | | |
292 | 104k | if (status.ok()) { |
293 | 104k | if (operation_) { |
294 | 104k | call_->RespondSuccess(index_, metrics_, &response()); |
295 | 0 | } else { |
296 | 0 | RedisResponsePB resp; |
297 | 0 | call_->RespondSuccess(index_, metrics_, &resp); |
298 | 0 | } |
299 | 89 | } else if ((type_ == OperationType::kRead || type_ == OperationType::kWrite) && |
300 | 89 | response().code() == RedisResponsePB_RedisStatusCode_SERVER_ERROR) { |
301 | 89 | call_->Respond(index_, false, &response()); |
302 | 0 | } else { |
303 | 0 | call_->RespondFailure(index_, status); |
304 | 0 | } |
305 | 104k | } |
306 | | |
307 | 0 | std::string ToString() const { |
308 | 0 | return Format("{ index: $0, operation: $1 }", index_, operation_); |
309 | 0 | } |
310 | | |
311 | | private: |
312 | | OperationType type_; |
313 | | std::shared_ptr<RedisInboundCall> call_; |
314 | | size_t index_; |
315 | | std::shared_ptr<YBRedisOp> operation_; |
316 | | std::function<bool(client::YBSession*, const StatusFunctor&)> functor_; |
317 | | std::string partition_key_; |
318 | | rpc::RpcMethodMetrics metrics_; |
319 | | ManualResponse manual_response_; |
320 | | client::internal::RemoteTabletPtr tablet_; |
321 | | std::atomic<bool> responded_{false}; |
322 | | }; |
323 | | |
324 | | class SessionPool { |
325 | | public: |
326 | | void Init(client::YBClient* client, |
327 | 181 | const scoped_refptr<MetricEntity>& metric_entity) { |
328 | 181 | client_ = client; |
329 | 181 | auto* proto = &METRIC_redis_allocated_sessions; |
330 | 181 | allocated_sessions_metric_ = proto->Instantiate(metric_entity, 0); |
331 | 181 | proto = &METRIC_redis_available_sessions; |
332 | 181 | available_sessions_metric_ = proto->Instantiate(metric_entity, 0); |
333 | 181 | } |
334 | | |
335 | 104k | std::shared_ptr<client::YBSession> Take() { |
336 | 104k | client::YBSession* result = nullptr; |
337 | 104k | if (!queue_.pop(result)) { |
338 | 242 | std::lock_guard<std::mutex> lock(mutex_); |
339 | 242 | auto session = client_->NewSession(); |
340 | 242 | session->SetTimeout( |
341 | 242 | MonoDelta::FromMilliseconds(FLAGS_redis_service_yb_client_timeout_millis)); |
342 | 242 | sessions_.push_back(session); |
343 | 242 | allocated_sessions_metric_->IncrementBy(1); |
344 | 242 | return session; |
345 | 242 | } |
346 | 104k | available_sessions_metric_->DecrementBy(1); |
347 | 104k | return result->shared_from_this(); |
348 | 104k | } |
349 | | |
350 | 104k | void Release(const std::shared_ptr<client::YBSession>& session) { |
351 | 104k | available_sessions_metric_->IncrementBy(1); |
352 | 104k | queue_.push(session.get()); |
353 | 104k | } |
354 | | private: |
355 | | client::YBClient* client_ = nullptr; |
356 | | std::mutex mutex_; |
357 | | std::vector<std::shared_ptr<client::YBSession>> sessions_; |
358 | | boost::lockfree::queue<client::YBSession*> queue_{30}; |
359 | | scoped_refptr<AtomicGauge<uint64_t>> allocated_sessions_metric_; |
360 | | scoped_refptr<AtomicGauge<uint64_t>> available_sessions_metric_; |
361 | | }; |
362 | | |
class Block;
typedef std::shared_ptr<Block> BlockPtr;

// A group of Operations that are flushed together on one YBSession. Blocks can
// be chained via next_ to enforce ordering (e.g. reads before conflicting
// writes). The Block itself is allocated on the BatchContext's arena, so the
// lifetime interplay between block_ and context_ below is deliberate and
// order-sensitive.
class Block : public std::enable_shared_from_this<Block> {
 public:
  typedef MCVector<Operation*> Ops;

  Block(const BatchContextPtr& context,
        Ops::allocator_type allocator,
        rpc::RpcMethodMetrics metrics_internal)
      : context_(context),
        ops_(allocator),
        metrics_internal_(std::move(metrics_internal)),
        start_(MonoTime::Now()) {
  }

  Block(const Block&) = delete;
  void operator=(const Block&) = delete;

  void AddOperation(Operation* operation) {
    ops_.push_back(operation);
  }

  // Takes a session from the pool, applies every op, and flushes async.
  // allow_local_calls_in_curr_thread is only honored when no block is chained
  // behind this one. If nothing was applied successfully, completes
  // immediately via Processed().
  void Launch(SessionPool* session_pool, bool allow_local_calls_in_curr_thread = true) {
    session_pool_ = session_pool;
    session_ = session_pool->Take();
    bool has_ok = false;
    bool applied_operations = false;
    // Supposed to be called only once.
    client::FlushCallback callback = BlockCallback(shared_from_this());
    // Adapter so local (functor) ops, which expect a StatusFunctor, feed the
    // same completion path as the session flush.
    auto status_callback = [callback](const Status& status){
      client::FlushStatus flush_status = {status, {}};
      callback(&flush_status);
    };
    for (auto* op : ops_) {
      has_ok = op->Apply(session_.get(), status_callback, &applied_operations) || has_ok;
    }
    if (has_ok) {
      if (applied_operations) {
        // Allow local calls in this thread only if no one is waiting behind us.
        session_->set_allow_local_calls_in_curr_thread(
            allow_local_calls_in_curr_thread && this->next_ == nullptr);
        session_->FlushAsync(std::move(callback));
      }
    } else {
      Processed();
    }
  }

  // Installs `next` as the block to launch after this one completes, returning
  // whatever was previously chained there (if anything).
  BlockPtr SetNext(const BlockPtr& next) {
    BlockPtr result = std::move(next_);
    next_ = next;
    return result;
  }

  std::string ToString() const {
    return Format("{ ops: $0 context: $1 next: $2 }",
                  ops_, static_cast<void*>(context_.get()), next_);
  }

 private:
  // Completion callback passed to FlushAsync. Holds its own reference to the
  // BatchContext because the Block (and its shared_ptr control data) live in
  // arena memory owned by that context; see the ordering comments below.
  class BlockCallback {
   public:
    explicit BlockCallback(BlockPtr block) : block_(std::move(block)) {
      // We remember block_->context_ to avoid issues with having multiple instances referring the
      // same block (that is allocated in arena and one of them calling block_->Done while another
      // still have reference to block and trying to update ref counter for it in destructor.
      context_ = block_ ? block_->context_ : nullptr;
    }

    ~BlockCallback() {
      // We only reset context_ after block_, because resetting context_ could free Arena memory
      // on which block_ is allocated together with its ref counter.
      block_.reset();
      context_.reset();
    }

    void operator()(client::FlushStatus* status) {
      // Block context owns the arena upon which this block is created.
      // Done is going to free up block's reference to context. So, unless we ensure that
      // the context lives beyond the block_.reset() we might get an error while updating the
      // ref-count for the block_ (in the area of arena owned by the context).
      auto context = block_->context_;
      DCHECK(context != nullptr) << block_.get();
      block_->Done(status);
      block_.reset();
    }
   private:
    BlockPtr block_;
    BatchContextPtr context_;
  };

  // Flush completion: records internal latency, collects per-op errors,
  // possibly retries once on tablet-not-found, then responds to every op.
  void Done(client::FlushStatus* flush_status) {
    MonoTime now = MonoTime::Now();
    metrics_internal_.handler_latency->Increment(now.GetDeltaSince(start_).ToMicroseconds());
    VLOG(3) << "Received status from call " << flush_status->status.ToString(true);

    std::unordered_map<const client::YBOperation*, Status> op_errors;
    bool tablet_not_found = false;
    if (!flush_status->status.ok()) {
      for (const auto& error : flush_status->errors) {
        if (error->status().IsNotFound()) {
          tablet_not_found = true;
        }
        // NOTE(review): std::move on error->status() likely copies if status()
        // returns a const reference — harmless, but worth confirming.
        op_errors[&error->failed_op()] = std::move(error->status());
        YB_LOG_EVERY_N_SECS(WARNING, 1) << "Explicit error while inserting: "
                                        << error->status().ToString();
      }
    }

    if (tablet_not_found && Retrying()) {
      // We will retry and not mark the ops as failed.
      return;
    }

    for (auto* op : ops_) {
      if (op->has_operation() && op_errors.find(&op->operation()) != op_errors.end()) {
        // Could check here for NotFound either.
        auto s = op_errors[&op->operation()];
        op->Respond(s);
      } else {
        op->Respond(Status::OK());
      }
    }

    Processed();
  }

  // Releases the session back to the pool, launches the next chained block
  // (propagating the local-calls setting), and drops the context reference.
  void Processed() {
    auto allow_local_calls_in_curr_thread = false;
    if (session_) {
      allow_local_calls_in_curr_thread = session_->allow_local_calls_in_curr_thread();
      session_pool_->Release(session_);
      session_.reset();
    }
    if (next_) {
      next_->Launch(session_pool_, allow_local_calls_in_curr_thread);
    }
    context_.reset();
  }

  // Attempts one retry after a tablet-not-found error by refreshing the cached
  // table. Returns true iff a retry was launched (the table id changed and the
  // retry budget was not exhausted).
  bool Retrying() {
    auto old_table = context_->table();
    context_->CleanYBTableFromCache();

    // num_retries_ starts at 1, so with kMaxRetries = 2 at most one retry runs.
    const int kMaxRetries = 2;
    if (num_retries_ >= kMaxRetries) {
      VLOG(3) << "Not retrying because we are past kMaxRetries. num_retries_ = " << num_retries_
              << " kMaxRetries = " << kMaxRetries;
      return false;
    }
    num_retries_++;

    auto allow_local_calls_in_curr_thread = false;
    if (session_) {
      allow_local_calls_in_curr_thread = session_->allow_local_calls_in_curr_thread();
      session_pool_->Release(session_);
      session_.reset();
    }

    auto table = context_->table();
    if (!table || table->id() == old_table->id()) {
      VLOG(3) << "Not retrying because new table is : " << (table ? table->id() : "nullptr")
              << " old table was " << old_table->id();
      return false;
    }

    // Swap out ops with the newer version of the ops referring to the newly created table.
    for (auto* op : ops_) {
      op->ResetTable(context_->table());
    }
    Launch(session_pool_, allow_local_calls_in_curr_thread);
    VLOG(3) << " Retrying with table : " << table->id() << " old table was " << old_table->id();
    return true;
  }

 private:
  BatchContextPtr context_;
  Ops ops_;
  rpc::RpcMethodMetrics metrics_internal_;
  MonoTime start_;
  SessionPool* session_pool_;
  std::shared_ptr<client::YBSession> session_;
  BlockPtr next_;
  int num_retries_ = 1;
};
549 | | |
550 | | typedef std::array<rpc::RpcMethodMetrics, kOperationTypeMapSize> InternalMetrics; |
551 | | |
552 | | struct BlockData { |
553 | 209k | explicit BlockData(Arena* arena) : used_keys(UsedKeys::allocator_type(arena)) {} |
554 | | |
555 | 0 | std::string ToString() const { |
556 | 0 | return Format("{ used_keys: $0 block: $1 count: $2 }", used_keys, block, count); |
557 | 0 | } |
558 | | |
559 | | typedef MCUnorderedSet<Slice, Slice::Hash> UsedKeys; |
560 | | UsedKeys used_keys; |
561 | | BlockPtr block; |
562 | | size_t count = 0; |
563 | | }; |
564 | | |
565 | | class TabletOperations { |
566 | | public: |
567 | | explicit TabletOperations(Arena* arena) |
568 | 104k | : read_data_(arena), write_data_(arena) { |
569 | 104k | } |
570 | | |
571 | 208k | BlockData& data(OperationType type) { |
572 | 208k | switch (type) { |
573 | 104k | case OperationType::kRead: |
574 | 104k | return read_data_; |
575 | 104k | case OperationType::kWrite: |
576 | 104k | return write_data_; |
577 | 0 | case OperationType::kNone: FALLTHROUGH_INTENDED; |
578 | 0 | case OperationType::kLocal: |
579 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, type); |
580 | 208k | } |
581 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, type); |
582 | 0 | } |
583 | | |
584 | 104k | void Done(SessionPool* session_pool, bool allow_local_calls_in_curr_thread) { |
585 | 104k | if (flush_head_) { |
586 | 297 | flush_head_->Launch(session_pool, allow_local_calls_in_curr_thread); |
587 | 104k | } else { |
588 | 104k | if (read_data_.block) { |
589 | 42.7k | read_data_.block->Launch(session_pool, allow_local_calls_in_curr_thread); |
590 | 42.7k | } |
591 | 104k | if (write_data_.block) { |
592 | 61.5k | write_data_.block->Launch(session_pool, allow_local_calls_in_curr_thread); |
593 | 61.5k | } |
594 | 104k | } |
595 | 104k | } |
596 | | |
597 | | void Process(const BatchContextPtr& context, |
598 | | Arena* arena, |
599 | | Operation* operation, |
600 | 104k | const InternalMetrics& metrics_internal) { |
601 | 104k | auto type = operation->type(); |
602 | 104k | if (type == OperationType::kLocal) { |
603 | 297 | ProcessLocalOperation(context, arena, operation, metrics_internal); |
604 | 297 | return; |
605 | 297 | } |
606 | 104k | boost::container::small_vector<Slice, RedisClientCommand::static_capacity> keys; |
607 | 104k | operation->GetKeys(&keys); |
608 | 104k | CheckConflicts(type, keys); |
609 | 104k | auto& data = this->data(type); |
610 | 104k | if (!data.block) { |
611 | 104k | ArenaAllocator<Block> alloc(arena); |
612 | 104k | data.block = std::allocate_shared<Block>( |
613 | 104k | alloc, context, alloc, metrics_internal[static_cast<size_t>(OperationType::kRead)]); |
614 | 104k | if (last_conflict_type_ == OperationType::kLocal) { |
615 | 0 | last_local_block_->SetNext(data.block); |
616 | 0 | last_conflict_type_ = type; |
617 | 104k | } else if (type == last_conflict_type_) { |
618 | 0 | auto old_value = this->data(Opposite(type)).block->SetNext(data.block); |
619 | 0 | if (old_value) { |
620 | 0 | LOG(DFATAL) << "Opposite already had next block: " |
621 | 0 | << operation->call().serialized_request().ToDebugString(); |
622 | 0 | } |
623 | 0 | } |
624 | 104k | } |
625 | 104k | data.block->AddOperation(operation); |
626 | 104k | RememberKeys(type, &keys); |
627 | 104k | } |
628 | | |
629 | 0 | std::string ToString() const { |
630 | 0 | return Format("{ read_data: $0 write_data: $1 flush_head: $2 last_local_block: $3 " |
631 | 0 | "last_conflict_type: $4 }", |
632 | 0 | read_data_, write_data_, flush_head_, last_local_block_, last_conflict_type_); |
633 | 0 | } |
634 | | |
635 | | private: |
636 | | void ProcessLocalOperation(const BatchContextPtr& context, |
637 | | Arena* arena, |
638 | | Operation* operation, |
639 | 297 | const InternalMetrics& metrics_internal) { |
640 | 297 | ArenaAllocator<Block> alloc(arena); |
641 | 297 | auto block = std::allocate_shared<Block>( |
642 | 297 | alloc, context, alloc, metrics_internal[static_cast<size_t>(OperationType::kLocal)]); |
643 | 297 | switch (last_conflict_type_) { |
644 | 297 | case OperationType::kNone: |
645 | 297 | if (read_data_.block) { |
646 | 0 | flush_head_ = read_data_.block; |
647 | 0 | if (write_data_.block) { |
648 | 0 | read_data_.block->SetNext(write_data_.block); |
649 | 0 | write_data_.block->SetNext(block); |
650 | 0 | } else { |
651 | 0 | read_data_.block->SetNext(block); |
652 | 0 | } |
653 | 297 | } else if (write_data_.block) { |
654 | 0 | flush_head_ = write_data_.block; |
655 | 0 | write_data_.block->SetNext(block); |
656 | 297 | } else { |
657 | 297 | flush_head_ = block; |
658 | 297 | } |
659 | 297 | break; |
660 | 0 | case OperationType::kRead: |
661 | 0 | read_data_.block->SetNext(block); |
662 | 0 | break; |
663 | 0 | case OperationType::kWrite: |
664 | 0 | write_data_.block->SetNext(block); |
665 | 0 | break; |
666 | 0 | case OperationType::kLocal: |
667 | 0 | last_local_block_->SetNext(block); |
668 | 0 | break; |
669 | 297 | } |
670 | 297 | read_data_.block = nullptr; |
671 | 297 | read_data_.used_keys.clear(); |
672 | 297 | write_data_.block = nullptr; |
673 | 297 | write_data_.used_keys.clear(); |
674 | 297 | last_local_block_ = block; |
675 | 297 | last_conflict_type_ = OperationType::kLocal; |
676 | 297 | block->AddOperation(operation); |
677 | 297 | } |
678 | | |
679 | 0 | void ConflictFound(OperationType type) { |
680 | 0 | auto& data = this->data(type); |
681 | 0 | auto& opposite_data = this->data(Opposite(type)); |
682 | |
|
683 | 0 | switch (last_conflict_type_) { |
684 | 0 | case OperationType::kNone: |
685 | 0 | flush_head_ = opposite_data.block; |
686 | 0 | opposite_data.block->SetNext(data.block); |
687 | 0 | break; |
688 | 0 | case OperationType::kWrite: |
689 | 0 | case OperationType::kRead: |
690 | 0 | data.block = nullptr; |
691 | 0 | data.used_keys.clear(); |
692 | 0 | break; |
693 | 0 | case OperationType::kLocal: |
694 | 0 | last_local_block_->SetNext(data.block); |
695 | 0 | break; |
696 | 0 | } |
697 | 0 | last_conflict_type_ = type; |
698 | 0 | } |
699 | | |
700 | 104k | void CheckConflicts(OperationType type, const RedisKeyList& keys) { |
701 | 104k | if (last_conflict_type_ == type) { |
702 | 0 | return; |
703 | 0 | } |
704 | 104k | if (last_conflict_type_ == OperationType::kLocal) { |
705 | 0 | return; |
706 | 0 | } |
707 | 104k | auto& opposite = data(Opposite(type)); |
708 | 104k | bool conflict = false; |
709 | 104k | for (const auto& key : keys) { |
710 | 104k | if (opposite.used_keys.count(key)) { |
711 | 0 | conflict = true; |
712 | 0 | break; |
713 | 0 | } |
714 | 104k | } |
715 | 104k | if (conflict) { |
716 | 0 | ConflictFound(type); |
717 | 0 | } |
718 | 104k | } |
719 | | |
720 | 104k | void RememberKeys(OperationType type, RedisKeyList* keys) { |
721 | 104k | BlockData* dest; |
722 | 104k | switch (type) { |
723 | 42.7k | case OperationType::kRead: |
724 | 42.7k | dest = &read_data_; |
725 | 42.7k | break; |
726 | 61.5k | case OperationType::kWrite: |
727 | 61.5k | dest = &write_data_; |
728 | 61.5k | break; |
729 | 0 | case OperationType::kNone: FALLTHROUGH_INTENDED; |
730 | 0 | case OperationType::kLocal: |
731 | 0 | FATAL_INVALID_ENUM_VALUE(OperationType, type); |
732 | 104k | } |
733 | 104k | for (auto& key : *keys) { |
734 | 104k | dest->used_keys.insert(std::move(key)); |
735 | 104k | } |
736 | 104k | } |
737 | | |
738 | | BlockData read_data_; |
739 | | BlockData write_data_; |
740 | | BlockPtr flush_head_; |
741 | | BlockPtr last_local_block_; |
742 | | |
743 | | // Type of command that caused last conflict between reads and writes. |
744 | | OperationType last_conflict_type_ = OperationType::kNone; |
745 | | }; |
746 | | |
747 | | YB_STRONGLY_TYPED_BOOL(IsMonitorMessage); |
748 | | |
// Shared state and behavior backing RedisServiceImpl: YBClient access, the
// per-database table cache, pub-sub bookkeeping (SUBSCRIBE/PSUBSCRIBE),
// MONITOR support and the redis-password cache.
struct RedisServiceImplData : public RedisServiceData {
  RedisServiceImplData(RedisServer* server, string&& yb_tier_master_addresses);

  // MONITOR support: register/unregister a monitoring connection, and echo a
  // handled command to all monitoring clients.
  void AppendToMonitors(Connection* conn) override;
  void RemoveFromMonitors(Connection* conn) override;
  void LogToMonitors(const string& end, const string& db, const RedisClientCommand& cmd) override;
  yb::Result<std::shared_ptr<client::YBTable>> GetYBTableForDB(const string& db_name) override;

  // Drops the cached YBTable for the given redis database from tables_cache_.
  void CleanYBTableFromCacheForDB(const string& table);

  // Pub-sub bookkeeping. 'type' selects pattern (PSUBSCRIBE) vs plain channel
  // (SUBSCRIBE) handling; 'subs' receives the subscription count after each
  // channel is processed (the value redis reports back to the client).
  void AppendToSubscribers(
      AsPattern type, const std::vector<std::string>& channels, rpc::Connection* conn,
      std::vector<size_t>* subs) override;
  void RemoveFromSubscribers(
      AsPattern type, const std::vector<std::string>& channels, rpc::Connection* conn,
      std::vector<size_t>* subs) override;
  void CleanUpSubscriptions(Connection* conn) override;
  size_t NumSubscribers(AsPattern type, const std::string& channel) override;
  std::unordered_set<std::string> GetSubscriptions(AsPattern type, rpc::Connection* conn) override;
  std::unordered_set<std::string> GetAllSubscriptions(AsPattern type) override;
  // Delivers a message to locally-connected subscribers; returns how many
  // clients it was pushed to.
  int Publish(const string& channel, const string& message);
  // Fans a publish out to the other proxies (see GetServerAddrsForChannel).
  void ForwardToInterestedProxies(
      const string& channel, const string& message, const IntFunctor& f) override;
  int PublishToLocalClients(IsMonitorMessage mode, const string& channel, const string& message);
  Result<vector<HostPortPB>> GetServerAddrsForChannel(const string& channel);
  // Caller is expected to hold pubsub_mutex_ (all callers in this file do).
  size_t NumSubscriptionsUnlocked(Connection* conn);

  CHECKED_STATUS GetRedisPasswords(vector<string>* passwords) override;
  // Lazily wires up client_, tables_cache_ and session_pool_ on first use.
  CHECKED_STATUS Initialize();
  bool initialized() const { return initialized_.load(std::memory_order_relaxed); }

  // NOTE(review): stale duplicate of the override declared above.
  // yb::Result<std::shared_ptr<client::YBTable>> GetYBTableForDB(const string& db_name);

  std::string yb_tier_master_addresses_;

  yb::rpc::RpcMethodMetrics metrics_error_;
  InternalMetrics metrics_internal_;

  // Mutex that protects the creation of client_ and populating db_to_opened_table_.
  std::mutex yb_mutex_;
  std::atomic<bool> initialized_;
  client::YBClient* client_ = nullptr;
  SessionPool session_pool_;
  std::unordered_map<std::string, std::shared_ptr<client::YBTable>> db_to_opened_table_;
  std::shared_ptr<client::YBMetaDataCache> tables_cache_;

  // Reader-writer lock guarding all pub-sub / monitoring maps below.
  rw_semaphore pubsub_mutex_;
  std::unordered_map<std::string, std::unordered_set<Connection*>> channels_to_clients_;
  std::unordered_map<std::string, std::unordered_set<Connection*>> patterns_to_clients_;
  struct ClientSubscription {
    std::unordered_set<std::string> channels;
    std::unordered_set<std::string> patterns;
  };
  // Reverse index of the two maps above, used for cleanup on disconnect.
  std::unordered_map<Connection*, ClientSubscription> clients_to_subscriptions_;

  std::unordered_set<Connection*> monitoring_clients_;
  scoped_refptr<AtomicGauge<uint64_t>> num_clients_monitoring_;

  // Guards the password cache; entries expire after
  // FLAGS_redis_password_caching_duration_ms (see GetRedisPasswords).
  std::mutex redis_password_mutex_;
  MonoTime redis_cached_password_validity_expiry_;
  vector<string> redis_cached_passwords_;

  RedisServer* server_ = nullptr;
};
813 | | |
814 | | class BatchContextImpl : public BatchContext { |
815 | | public: |
816 | | BatchContextImpl( |
817 | | const string& dbname, const std::shared_ptr<RedisInboundCall>& call, |
818 | | RedisServiceImplData* impl_data) |
819 | | : impl_data_(impl_data), |
820 | | db_name_(dbname), |
821 | | call_(call), |
822 | | consumption_(impl_data->server_->mem_tracker(), 0), |
823 | | operations_(&arena_), |
824 | | lookups_left_(0), |
825 | 104k | tablets_(&arena_) { |
826 | 104k | } |
827 | | |
828 | 104k | virtual ~BatchContextImpl() {} |
829 | | |
830 | 105k | const RedisClientCommand& command(size_t idx) const override { |
831 | 105k | return call_->client_batch()[idx]; |
832 | 105k | } |
833 | | |
834 | 862 | const std::shared_ptr<RedisInboundCall>& call() const override { |
835 | 862 | return call_; |
836 | 862 | } |
837 | | |
838 | 57 | client::YBClient* client() const override { |
839 | 57 | return impl_data_->client_; |
840 | 57 | } |
841 | | |
842 | 83 | RedisServiceImplData* service_data() override { |
843 | 83 | return impl_data_; |
844 | 83 | } |
845 | | |
846 | 513 | const RedisServer* server() override { |
847 | 513 | return impl_data_->server_; |
848 | 513 | } |
849 | | |
850 | 0 | void CleanYBTableFromCache() override { |
851 | 0 | impl_data_->CleanYBTableFromCacheForDB(db_name_); |
852 | 0 | } |
853 | | |
854 | 104k | std::shared_ptr<client::YBTable> table() override { |
855 | 104k | auto table = impl_data_->GetYBTableForDB(db_name_); |
856 | 104k | if (!table.ok()) { |
857 | 0 | return nullptr; |
858 | 0 | } |
859 | 104k | return *table; |
860 | 104k | } |
861 | | |
862 | 104k | void Commit() { |
863 | 104k | Commit(1); |
864 | 104k | } |
865 | | |
866 | 104k | void Commit(int retries) { |
867 | 104k | if (operations_.empty()) { |
868 | 468 | return; |
869 | 468 | } |
870 | | |
871 | 104k | auto table = impl_data_->GetYBTableForDB(db_name_); |
872 | 104k | if (!table.ok()) { |
873 | 0 | for (auto& operation : operations_) { |
874 | 0 | operation.Respond(table.status()); |
875 | 0 | } |
876 | 0 | } |
877 | 104k | auto deadline = CoarseMonoClock::Now() + FLAGS_redis_service_yb_client_timeout_millis * 1ms; |
878 | 104k | lookups_left_.store(operations_.size(), std::memory_order_release); |
879 | 104k | retry_lookups_.store(false, std::memory_order_release); |
880 | 104k | for (auto& operation : operations_) { |
881 | 104k | impl_data_->client_->LookupTabletByKey( |
882 | 104k | table.get(), operation.partition_key(), deadline, |
883 | 104k | std::bind( |
884 | 104k | &BatchContextImpl::LookupDone, scoped_refptr<BatchContextImpl>(this), &operation, |
885 | 104k | retries, _1)); |
886 | 104k | } |
887 | 104k | } |
888 | | |
889 | | void Apply( |
890 | | size_t index, |
891 | | std::shared_ptr<client::YBRedisReadOp> operation, |
892 | 42.7k | const rpc::RpcMethodMetrics& metrics) override { |
893 | 42.7k | DoApply(index, std::move(operation), metrics); |
894 | 42.7k | } |
895 | | |
896 | | void Apply( |
897 | | size_t index, |
898 | | std::shared_ptr<client::YBRedisWriteOp> operation, |
899 | 61.5k | const rpc::RpcMethodMetrics& metrics) override { |
900 | 61.5k | DoApply(index, std::move(operation), metrics); |
901 | 61.5k | } |
902 | | |
903 | | void Apply( |
904 | | size_t index, |
905 | | std::function<bool(client::YBSession*, const StatusFunctor&)> functor, |
906 | | std::string partition_key, |
907 | | const rpc::RpcMethodMetrics& metrics, |
908 | 297 | ManualResponse manual_response) override { |
909 | 297 | DoApply(index, std::move(functor), std::move(partition_key), metrics, manual_response); |
910 | 297 | } |
911 | | |
912 | 0 | std::string ToString() const { |
913 | 0 | return Format("{ tablets: $0 }", tablets_); |
914 | 0 | } |
915 | | |
916 | | private: |
917 | | template <class... Args> |
918 | 104k | void DoApply(Args&&... args) { |
919 | 104k | operations_.emplace_back(call_, std::forward<Args>(args)...); |
920 | 104k | if (PREDICT_FALSE(operations_.back().responded())) { |
921 | 0 | operations_.pop_back(); |
922 | 104k | } else { |
923 | 104k | consumption_.Add(operations_.back().space_used_by_request()); |
924 | 104k | } |
925 | 104k | } redis_service.cc:_ZN2yb11redisserver12_GLOBAL__N_116BatchContextImpl7DoApplyIJRmNSt3__110shared_ptrINS_6client13YBRedisReadOpEEERKNS_3rpc16RpcMethodMetricsEEEEvDpOT_ Line | Count | Source | 918 | 42.7k | void DoApply(Args&&... args) { | 919 | 42.7k | operations_.emplace_back(call_, std::forward<Args>(args)...); | 920 | 42.7k | if (PREDICT_FALSE(operations_.back().responded())) { | 921 | 0 | operations_.pop_back(); | 922 | 42.7k | } else { | 923 | 42.7k | consumption_.Add(operations_.back().space_used_by_request()); | 924 | 42.7k | } | 925 | 42.7k | } |
redis_service.cc:_ZN2yb11redisserver12_GLOBAL__N_116BatchContextImpl7DoApplyIJRmNSt3__110shared_ptrINS_6client14YBRedisWriteOpEEERKNS_3rpc16RpcMethodMetricsEEEEvDpOT_ Line | Count | Source | 918 | 61.5k | void DoApply(Args&&... args) { | 919 | 61.5k | operations_.emplace_back(call_, std::forward<Args>(args)...); | 920 | 61.5k | if (PREDICT_FALSE(operations_.back().responded())) { | 921 | 0 | operations_.pop_back(); | 922 | 61.5k | } else { | 923 | 61.5k | consumption_.Add(operations_.back().space_used_by_request()); | 924 | 61.5k | } | 925 | 61.5k | } |
redis_service.cc:_ZN2yb11redisserver12_GLOBAL__N_116BatchContextImpl7DoApplyIJRmNSt3__18functionIFbPNS_6client9YBSessionERKN5boost8functionIFvRKNS_6StatusEEEEEEENS5_12basic_stringIcNS5_11char_traitsIcEENS5_9allocatorIcEEEERKNS_3rpc16RpcMethodMetricsERNS_17StronglyTypedBoolINS0_18ManualResponse_TagEEEEEEvDpOT_ Line | Count | Source | 918 | 297 | void DoApply(Args&&... args) { | 919 | 297 | operations_.emplace_back(call_, std::forward<Args>(args)...); | 920 | 297 | if (PREDICT_FALSE(operations_.back().responded())) { | 921 | 0 | operations_.pop_back(); | 922 | 297 | } else { | 923 | 297 | consumption_.Add(operations_.back().space_used_by_request()); | 924 | 297 | } | 925 | 297 | } |
|
926 | | |
927 | | void LookupDone( |
928 | 104k | Operation* operation, int retries, const Result<client::internal::RemoteTabletPtr>& result) { |
929 | 104k | const int kMaxRetries = 2; |
930 | 104k | if (!result.ok()) { |
931 | 0 | auto status = result.status(); |
932 | 0 | if (status.IsNotFound() && retries < kMaxRetries) { |
933 | 0 | retry_lookups_.store(false, std::memory_order_release); |
934 | 0 | } else { |
935 | 0 | operation->Respond(status); |
936 | 0 | } |
937 | 104k | } else { |
938 | 104k | operation->SetTablet(*result); |
939 | 104k | } |
940 | 104k | if (lookups_left_.fetch_sub(1, std::memory_order_acq_rel) != 1) { |
941 | 264 | return; |
942 | 264 | } |
943 | | |
944 | 104k | if (retries < kMaxRetries && retry_lookups_.load(std::memory_order_acquire)) { |
945 | 0 | CleanYBTableFromCache(); |
946 | 0 | Commit(retries + 1); |
947 | 0 | return; |
948 | 0 | } |
949 | | |
950 | 104k | BatchContextPtr self(this); |
951 | 104k | for (auto& operation : operations_) { |
952 | 104k | if (!operation.responded()) { |
953 | 104k | auto it = tablets_.find(operation.tablet()->tablet_id()); |
954 | 104k | if (it == tablets_.end()) { |
955 | 104k | it = tablets_.emplace(operation.tablet()->tablet_id(), TabletOperations(&arena_)).first; |
956 | 104k | } |
957 | 104k | it->second.Process(self, &arena_, &operation, impl_data_->metrics_internal_); |
958 | 104k | } |
959 | 104k | } |
960 | | |
961 | 104k | size_t idx = 0; |
962 | 104k | for (auto& tablet : tablets_) { |
963 | 104k | tablet.second.Done(&impl_data_->session_pool_, ++idx == tablets_.size()); |
964 | 104k | } |
965 | 104k | tablets_.clear(); |
966 | 104k | } |
967 | | |
968 | | RedisServiceImplData* impl_data_ = nullptr; |
969 | | |
970 | | const string db_name_; |
971 | | std::shared_ptr<RedisInboundCall> call_; |
972 | | ScopedTrackedConsumption consumption_; |
973 | | |
974 | | Arena arena_; |
975 | | MCDeque<Operation> operations_; |
976 | | std::atomic<bool> retry_lookups_; |
977 | | std::atomic<size_t> lookups_left_; |
978 | | MCUnorderedMap<Slice, TabletOperations, Slice::Hash> tablets_; |
979 | | }; |
980 | | |
981 | | } // namespace |
982 | | |
// Private implementation of RedisServiceImpl: owns the command dispatch table
// and the shared service data, and handles each inbound call.
class RedisServiceImpl::Impl {
 public:
  Impl(RedisServer* server, string yb_tier_master_address);

  ~Impl() {
    // Wait for DebugSleep to finish.
    // We use DebugSleep only during tests.
    // So just for long enough, giving extra 400ms for it.
    std::this_thread::sleep_for(500ms);
  }

  void Handle(yb::rpc::InboundCallPtr call_ptr);

 private:
  // Registers every upper/lower-case spelling of the command name in
  // command_name_to_info_map_, so lookup is a single hash probe with no
  // case-folding. This inserts 2^len entries per command — acceptable only
  // because redis command names are short.
  void SetupMethod(const RedisCommandInfo& info) {
    auto info_ptr = std::make_shared<RedisCommandInfo>(info);
    std::string lower_name = boost::to_lower_copy(info.name);
    std::string upper_name = boost::to_upper_copy(info.name);
    size_t len = info.name.size();
    std::string temp(len, 0);
    for (size_t i = 0; i != (1ULL << len); ++i) {
      // Bit j of i selects upper vs lower case for character j.
      for (size_t j = 0; j != len; ++j) {
        temp[j] = i & (1 << j) ? upper_name[j] : lower_name[j];
      }
      // names_ is a deque, so names_.back() stays valid as the map key.
      names_.push_back(temp);
      CHECK(command_name_to_info_map_.emplace(names_.back(), info_ptr).second);
    }
  }

  // Rejects commands with any argument larger than FLAGS_redis_max_value_size.
  bool CheckArgumentSizeOK(const RedisClientCommand& cmd_args) {
    for (Slice arg : cmd_args) {
      if (arg.size() > FLAGS_redis_max_value_size) {
        return false;
      }
    }
    return true;
  }

  // A connection is implicitly authenticated when auth is disabled or no
  // passwords are configured; otherwise it must have passed AUTH already.
  bool CheckAuthentication(RedisConnectionContext* conn_context) {
    if (!conn_context->is_authenticated()) {
      vector<string> passwords;
      Status s = data_.GetRedisPasswords(&passwords);
      conn_context->set_authenticated(!FLAGS_enable_redis_auth || (s.ok() && passwords.empty()));
    }
    return conn_context->is_authenticated();
  }

  void PopulateHandlers();
  // Fetches the appropriate handler for the command, nullptr if none exists.
  const RedisCommandInfo* FetchHandler(const RedisClientCommand& cmd_args);

  // Owns the strings referenced (as Slices) by command_name_to_info_map_.
  std::deque<std::string> names_;
  std::unordered_map<Slice, RedisCommandInfoPtr, Slice::Hash> command_name_to_info_map_;

  RedisServiceImplData data_;
};
1039 | | |
// Only stores configuration; the YBClient and caches are wired up lazily in
// Initialize() on the first handled call.
RedisServiceImplData::RedisServiceImplData(RedisServer* server, string&& yb_tier_master_addresses)
    : yb_tier_master_addresses_(std::move(yb_tier_master_addresses)),
      initialized_(false),
      server_(server) {}
1044 | | |
1045 | | yb::Result<std::shared_ptr<client::YBTable>> RedisServiceImplData::GetYBTableForDB( |
1046 | 208k | const string& db_name) { |
1047 | 208k | std::shared_ptr<client::YBTable> table; |
1048 | 208k | YBTableName table_name = GetYBTableNameForRedisDatabase(db_name); |
1049 | 208k | bool was_cached = false; |
1050 | 208k | auto res = tables_cache_->GetTable(table_name, &table, &was_cached); |
1051 | 208k | if (!res.ok()) return res; |
1052 | 208k | return table; |
1053 | 208k | } |
1054 | | |
1055 | 0 | void RedisServiceImplData::AppendToMonitors(Connection* conn) { |
1056 | 0 | VLOG(3) << "AppendToMonitors (" << conn->ToString(); |
1057 | 0 | { |
1058 | 0 | boost::lock_guard<decltype(pubsub_mutex_)> lock(pubsub_mutex_); |
1059 | 0 | monitoring_clients_.insert(conn); |
1060 | 0 | num_clients_monitoring_->set_value(monitoring_clients_.size()); |
1061 | 0 | } |
1062 | 0 | auto& context = static_cast<RedisConnectionContext&>(conn->context()); |
1063 | 0 | if (context.ClientMode() != RedisClientMode::kMonitoring) { |
1064 | 0 | context.SetClientMode(RedisClientMode::kMonitoring); |
1065 | 0 | context.SetCleanupHook(std::bind(&RedisServiceImplData::RemoveFromMonitors, this, conn)); |
1066 | 0 | } |
1067 | 0 | } |
1068 | | |
1069 | 0 | void RedisServiceImplData::RemoveFromMonitors(Connection* conn) { |
1070 | 0 | VLOG(3) << "RemoveFromMonitors (" << conn->ToString(); |
1071 | 0 | { |
1072 | 0 | boost::lock_guard<decltype(pubsub_mutex_)> lock(pubsub_mutex_); |
1073 | 0 | monitoring_clients_.erase(conn); |
1074 | 0 | num_clients_monitoring_->set_value(monitoring_clients_.size()); |
1075 | 0 | } |
1076 | 0 | } |
1077 | | |
1078 | 0 | size_t RedisServiceImplData::NumSubscriptionsUnlocked(Connection* conn) { |
1079 | 0 | return clients_to_subscriptions_[conn].channels.size() + |
1080 | 0 | clients_to_subscriptions_[conn].patterns.size(); |
1081 | 0 | } |
1082 | | |
1083 | | void RedisServiceImplData::AppendToSubscribers( |
1084 | | AsPattern type, const std::vector<std::string>& channels, rpc::Connection* conn, |
1085 | 0 | std::vector<size_t>* subs) { |
1086 | 0 | boost::lock_guard<decltype(pubsub_mutex_)> lock(pubsub_mutex_); |
1087 | 0 | subs->clear(); |
1088 | 0 | for (const auto& channel : channels) { |
1089 | 0 | VLOG(3) << "AppendToSubscribers (" << type << ", " << channel << ", " << conn->ToString(); |
1090 | 0 | if (type == AsPattern::kTrue) { |
1091 | 0 | patterns_to_clients_[channel].insert(conn); |
1092 | 0 | clients_to_subscriptions_[conn].patterns.insert(channel); |
1093 | 0 | } else { |
1094 | 0 | channels_to_clients_[channel].insert(conn); |
1095 | 0 | clients_to_subscriptions_[conn].channels.insert(channel); |
1096 | 0 | } |
1097 | 0 | subs->push_back(NumSubscriptionsUnlocked(conn)); |
1098 | 0 | } |
1099 | 0 | auto& context = static_cast<RedisConnectionContext&>(conn->context()); |
1100 | 0 | if (context.ClientMode() != RedisClientMode::kSubscribed) { |
1101 | 0 | context.SetClientMode(RedisClientMode::kSubscribed); |
1102 | 0 | context.SetCleanupHook(std::bind(&RedisServiceImplData::CleanUpSubscriptions, this, conn)); |
1103 | 0 | } |
1104 | 0 | } |
1105 | | |
1106 | | void RedisServiceImplData::RemoveFromSubscribers( |
1107 | | AsPattern type, const std::vector<std::string>& channels, rpc::Connection* conn, |
1108 | 0 | std::vector<size_t>* subs) { |
1109 | 0 | boost::lock_guard<decltype(pubsub_mutex_)> lock(pubsub_mutex_); |
1110 | 0 | auto& map_to_clients = (type == AsPattern::kTrue ? patterns_to_clients_ : channels_to_clients_); |
1111 | 0 | auto& map_from_clients = |
1112 | 0 | (type == AsPattern::kTrue ? clients_to_subscriptions_[conn].patterns |
1113 | 0 | : clients_to_subscriptions_[conn].channels); |
1114 | |
|
1115 | 0 | subs->clear(); |
1116 | 0 | for (const auto& channel : channels) { |
1117 | 0 | map_to_clients[channel].erase(conn); |
1118 | 0 | if (map_to_clients[channel].empty()) { |
1119 | 0 | map_to_clients.erase(channel); |
1120 | 0 | } |
1121 | 0 | map_from_clients.erase(channel); |
1122 | 0 | subs->push_back(NumSubscriptionsUnlocked(conn)); |
1123 | 0 | } |
1124 | 0 | } |
1125 | | |
1126 | | std::unordered_set<string> RedisServiceImplData::GetSubscriptions( |
1127 | 0 | AsPattern type, Connection* conn) { |
1128 | 0 | SharedLock<decltype(pubsub_mutex_)> lock(pubsub_mutex_); |
1129 | 0 | return ( |
1130 | 0 | type == AsPattern::kTrue ? clients_to_subscriptions_[conn].patterns |
1131 | 0 | : clients_to_subscriptions_[conn].channels); |
1132 | 0 | } |
1133 | | |
1134 | | // ENG-4199: Consider getting all the cluster-wide subscriptions? |
1135 | 0 | std::unordered_set<string> RedisServiceImplData::GetAllSubscriptions(AsPattern type) { |
1136 | 0 | std::unordered_set<string> ret; |
1137 | 0 | SharedLock<decltype(pubsub_mutex_)> lock(pubsub_mutex_); |
1138 | 0 | for (const auto& element : |
1139 | 0 | (type == AsPattern::kTrue ? patterns_to_clients_ : channels_to_clients_)) { |
1140 | 0 | ret.insert(element.first); |
1141 | 0 | } |
1142 | 0 | return ret; |
1143 | 0 | } |
1144 | | |
1145 | | // ENG-4199: Consider getting all the cluster-wide subscribers? |
1146 | 0 | size_t RedisServiceImplData::NumSubscribers(AsPattern type, const std::string& channel) { |
1147 | 0 | SharedLock<decltype(pubsub_mutex_)> lock(pubsub_mutex_); |
1148 | 0 | const auto& look_in = (type ? patterns_to_clients_ : channels_to_clients_); |
1149 | 0 | const auto& iter = look_in.find(channel); |
1150 | 0 | return (iter == look_in.end() ? 0 : iter->second.size()); |
1151 | 0 | } |
1152 | | |
1153 | | void RedisServiceImplData::LogToMonitors( |
1154 | 104k | const string& end, const string& db, const RedisClientCommand& cmd) { |
1155 | 104k | { |
1156 | 104k | SharedLock<decltype(pubsub_mutex_)> rlock(pubsub_mutex_); |
1157 | 104k | if (monitoring_clients_.empty()) return; |
1158 | 18.4E | } |
1159 | | |
1160 | | // Prepare the string to be sent to all the monitoring clients. |
1161 | | // TODO: Use timestamp that works with converter. |
1162 | 18.4E | int64_t now_ms = ToMicroseconds(CoarseMonoClock::Now().time_since_epoch()); |
1163 | 18.4E | std::stringstream ss; |
1164 | 18.4E | ss << "+"; |
1165 | 18.4E | ss.setf(std::ios::fixed, std::ios::floatfield); |
1166 | 18.4E | ss.precision(6); |
1167 | 18.4E | ss << (now_ms / 1000000.0) << " [" << db << " " << end << "]"; |
1168 | 0 | for (auto& part : cmd) { |
1169 | 0 | ss << " \"" << part.ToBuffer() << "\""; |
1170 | 0 | } |
1171 | 18.4E | ss << "\r\n"; |
1172 | | |
1173 | 18.4E | PublishToLocalClients(IsMonitorMessage::kTrue, "", ss.str()); |
1174 | 18.4E | } |
1175 | | |
// Delivers a published message to subscribers connected to THIS proxy and
// returns how many clients it reached. Installed as the tserver's publisher
// callback in Initialize(), so forwarded publishes from other proxies also
// land here.
int RedisServiceImplData::Publish(const string& channel, const string& message) {
  VLOG(3) << "Forwarding to clients on channel " << channel;
  return PublishToLocalClients(IsMonitorMessage::kFalse, channel, message);
}
1180 | | |
1181 | | Result<vector<HostPortPB>> RedisServiceImplData::GetServerAddrsForChannel( |
1182 | 0 | const string& channel_unused) { |
1183 | | // TODO(Amit): Instead of forwarding blindly to all servers, figure out the |
1184 | | // ones that have a subscription and send it to them only. |
1185 | 0 | std::vector<master::TSInformationPB> live_tservers; |
1186 | 0 | Status s = CHECK_NOTNULL(server_->tserver())->GetLiveTServers(&live_tservers); |
1187 | 0 | if (!s.ok()) { |
1188 | 0 | LOG(WARNING) << s; |
1189 | 0 | return s; |
1190 | 0 | } |
1191 | | |
1192 | 0 | vector<HostPortPB> servers; |
1193 | 0 | const auto cloud_info_pb = server_->MakeCloudInfoPB(); |
1194 | | // Queue NEW_NODE event for all the live tservers. |
1195 | 0 | for (const master::TSInformationPB& ts_info : live_tservers) { |
1196 | 0 | const auto& hostport_pb = DesiredHostPort(ts_info.registration().common(), cloud_info_pb); |
1197 | 0 | if (hostport_pb.host().empty()) { |
1198 | 0 | LOG(WARNING) << "Skipping TS since it doesn't have any rpc address: " |
1199 | 0 | << ts_info.DebugString(); |
1200 | 0 | continue; |
1201 | 0 | } |
1202 | 0 | servers.push_back(hostport_pb); |
1203 | 0 | } |
1204 | 0 | return servers; |
1205 | 0 | } |
1206 | | |
// Aggregates the PublishResponsePB replies from the N proxies a publish was
// forwarded to; when the final reply arrives, invokes done_functor with the
// total number of clients reached. HandleResponse may be called concurrently
// from RPC callback threads -- both counters are atomic.
class PublishResponseHandler {
 public:
  PublishResponseHandler(int32_t n, IntFunctor f)
      : num_replies_pending(n), done_functor(std::move(f)) {}

  void HandleResponse(const tserver::PublishResponsePB* resp) {
    num_clients_forwarded_to.IncrementBy(resp->num_clients_forwarded_to());

    // Exactly one caller observes the counter reach zero and fires the
    // completion callback.
    if (0 == num_replies_pending.IncrementBy(-1)) {
      done_functor(num_clients_forwarded_to.Load());
    }
  }

 private:
  AtomicInt<int32_t> num_replies_pending;
  AtomicInt<int32_t> num_clients_forwarded_to{0};
  IntFunctor done_functor;
};
1225 | | |
// Forwards a publish to every live proxy (see GetServerAddrsForChannel) via
// the tserver Publish RPC, and invokes f with the cluster-wide count of
// clients reached once all replies are in.
void RedisServiceImplData::ForwardToInterestedProxies(
    const string& channel, const string& message, const IntFunctor& f) {
  auto interested_servers = GetServerAddrsForChannel(channel);
  if (!interested_servers.ok()) {
    LOG(ERROR) << "Could not get servers to forward to " << interested_servers.status();
    return;
  }
  // Shared across all per-server callbacks; fires f after the last reply.
  std::shared_ptr<PublishResponseHandler> resp_handler =
      std::make_shared<PublishResponseHandler>(interested_servers->size(), f);
  for (auto& hostport_pb : *interested_servers) {
    tserver::PublishRequestPB requestPB;
    requestPB.set_channel(channel);
    requestPB.set_message(message);
    std::shared_ptr<tserver::TabletServerServiceProxy> proxy =
        std::make_shared<tserver::TabletServerServiceProxy>(
            &client_->proxy_cache(), HostPortFromPB(hostport_pb));
    std::shared_ptr<tserver::PublishResponsePB> responsePB =
        std::make_shared<tserver::PublishResponsePB>();
    std::shared_ptr<yb::rpc::RpcController> rpcController = std::make_shared<rpc::RpcController>();
    // Hold a copy of the shared ptr in the callback to ensure that the proxy, responsePB and
    // rpcController are valid.
    // these self-destruct on the latter of the two events
    // (i) exit this loop, and
    // (ii) done with the callback.
    proxy->PublishAsync(
        requestPB, responsePB.get(), rpcController.get(),
        [resp_handler, responsePB, rpcController, proxy]() mutable {
          resp_handler->HandleResponse(responsePB.get());
          responsePB.reset();
          rpcController.reset();
          proxy.reset();
          resp_handler.reset();
        });
  }
}
1261 | | |
1262 | 0 | string MessageFor(const string& channel, const string& message) { |
1263 | 0 | vector<string> parts; |
1264 | 0 | parts.push_back(redisserver::EncodeAsBulkString("message").ToBuffer()); |
1265 | 0 | parts.push_back(redisserver::EncodeAsBulkString(channel).ToBuffer()); |
1266 | 0 | parts.push_back(redisserver::EncodeAsBulkString(message).ToBuffer()); |
1267 | 0 | return redisserver::EncodeAsArrayOfEncodedElements(parts); |
1268 | 0 | } |
1269 | | |
1270 | 0 | string PMessageFor(const string& pattern, const string& channel, const string& message) { |
1271 | 0 | vector<string> parts; |
1272 | 0 | parts.push_back(redisserver::EncodeAsBulkString("pmessage").ToBuffer()); |
1273 | 0 | parts.push_back(redisserver::EncodeAsBulkString(pattern).ToBuffer()); |
1274 | 0 | parts.push_back(redisserver::EncodeAsBulkString(channel).ToBuffer()); |
1275 | 0 | parts.push_back(redisserver::EncodeAsBulkString(message).ToBuffer()); |
1276 | 0 | return redisserver::EncodeAsArrayOfEncodedElements(parts); |
1277 | 0 | } |
1278 | | |
1279 | | int RedisServiceImplData::PublishToLocalClients( |
1280 | 0 | IsMonitorMessage mode, const string& channel, const string& message) { |
1281 | 0 | SharedLock<decltype(pubsub_mutex_)> rlock(pubsub_mutex_); |
1282 | |
|
1283 | 0 | int num_pushed_to = 0; |
1284 | | // Send the message to all the monitoring clients. |
1285 | 0 | OutboundDataPtr out; |
1286 | 0 | const std::unordered_set<Connection*>* clients = nullptr; |
1287 | 0 | if (mode == IsMonitorMessage::kTrue) { |
1288 | 0 | out = std::make_shared<yb::rpc::StringOutboundData>(message, "Monitor redis commands"); |
1289 | 0 | clients = &monitoring_clients_; |
1290 | 0 | } else { |
1291 | 0 | out = std::make_shared<yb::rpc::StringOutboundData>( |
1292 | 0 | MessageFor(channel, message), "Publishing to Channel"); |
1293 | 0 | clients = |
1294 | 0 | (channels_to_clients_.find(channel) == channels_to_clients_.end() |
1295 | 0 | ? nullptr |
1296 | 0 | : &channels_to_clients_[channel]); |
1297 | 0 | } |
1298 | 0 | if (clients) { |
1299 | | // Handle Monitor and Subscribe clients. |
1300 | 0 | for (auto connection : *clients) { |
1301 | 0 | DVLOG(3) << "Publishing to subscribed client " << connection->ToString(); |
1302 | 0 | connection->QueueOutboundData(out); |
1303 | 0 | num_pushed_to++; |
1304 | 0 | } |
1305 | 0 | } |
1306 | 0 | if (mode == IsMonitorMessage::kFalse) { |
1307 | | // Handle PSubscribe clients. |
1308 | 0 | for (auto& entry : patterns_to_clients_) { |
1309 | 0 | auto& pattern = entry.first; |
1310 | 0 | auto& clients_subscribed_to_pattern = entry.second; |
1311 | 0 | if (!RedisPatternMatch(pattern, channel, /* ignore case */ false)) { |
1312 | 0 | continue; |
1313 | 0 | } |
1314 | | |
1315 | 0 | OutboundDataPtr out = std::make_shared<yb::rpc::StringOutboundData>( |
1316 | 0 | PMessageFor(pattern, channel, message), "Publishing to Channel"); |
1317 | 0 | for (auto remote : clients_subscribed_to_pattern) { |
1318 | 0 | remote->QueueOutboundData(out); |
1319 | 0 | num_pushed_to++; |
1320 | 0 | } |
1321 | 0 | } |
1322 | 0 | } |
1323 | |
|
1324 | 0 | return num_pushed_to; |
1325 | 0 | } |
1326 | | |
1327 | 0 | void RedisServiceImplData::CleanUpSubscriptions(Connection* conn) { |
1328 | 0 | VLOG(3) << "CleanUpSubscriptions (" << conn->ToString(); |
1329 | 0 | boost::lock_guard<decltype(pubsub_mutex_)> wlock(pubsub_mutex_); |
1330 | 0 | if (monitoring_clients_.find(conn) != monitoring_clients_.end()) { |
1331 | 0 | monitoring_clients_.erase(conn); |
1332 | 0 | num_clients_monitoring_->set_value(monitoring_clients_.size()); |
1333 | 0 | } |
1334 | 0 | if (clients_to_subscriptions_.find(conn) != clients_to_subscriptions_.end()) { |
1335 | 0 | for (auto& channel : clients_to_subscriptions_[conn].channels) { |
1336 | 0 | channels_to_clients_[channel].erase(conn); |
1337 | 0 | if (channels_to_clients_[channel].empty()) { |
1338 | 0 | channels_to_clients_.erase(channel); |
1339 | 0 | } |
1340 | 0 | } |
1341 | 0 | for (auto& pattern : clients_to_subscriptions_[conn].patterns) { |
1342 | 0 | patterns_to_clients_[pattern].erase(conn); |
1343 | 0 | if (patterns_to_clients_[pattern].empty()) { |
1344 | 0 | patterns_to_clients_.erase(pattern); |
1345 | 0 | } |
1346 | 0 | } |
1347 | 0 | clients_to_subscriptions_.erase(conn); |
1348 | 0 | } |
1349 | 0 | } |
1350 | | |
// Lazily wires up the YBClient, table metadata cache and session pool on the
// first handled call. Serialized on yb_mutex_ so concurrent first calls
// initialize exactly once; initialized() stays a cheap relaxed read for the
// fast path.
Status RedisServiceImplData::Initialize() {
  boost::lock_guard<std::mutex> guard(yb_mutex_);
  if (!initialized()) {
    client_ = server_->tserver()->client();

    // Route tserver-side publish requests into this service's Publish().
    server_->tserver()->SetPublisher(std::bind(&RedisServiceImplData::Publish, this, _1, _2));

    tables_cache_ = std::make_shared<YBMetaDataCache>(
        client_, false /* Update roles permissions cache */);
    session_pool_.Init(client_, server_->metric_entity());

    // Release ordering: readers that observe 'true' also observe the
    // initialization writes above.
    initialized_.store(true, std::memory_order_release);
  }
  return Status::OK();
}
1366 | | |
// Evicts the cached YBTable for the given redis database, forcing the next
// GetYBTableForDB() to re-fetch it (used after tablet lookup failures).
void RedisServiceImplData::CleanYBTableFromCacheForDB(const string& db) {
  tables_cache_->RemoveCachedTable(GetYBTableNameForRedisDatabase(db));
}
1370 | | |
1371 | 526 | Status RedisServiceImplData::GetRedisPasswords(vector<string>* passwords) { |
1372 | 526 | MonoTime now = MonoTime::Now(); |
1373 | | |
1374 | 526 | std::lock_guard<std::mutex> lock(redis_password_mutex_); |
1375 | 526 | if (redis_cached_password_validity_expiry_.Initialized() && |
1376 | 345 | now < redis_cached_password_validity_expiry_) { |
1377 | 235 | *passwords = redis_cached_passwords_; |
1378 | 235 | return Status::OK(); |
1379 | 235 | } |
1380 | | |
1381 | 291 | RETURN_NOT_OK(client_->GetRedisPasswords(&redis_cached_passwords_)); |
1382 | 291 | *passwords = redis_cached_passwords_; |
1383 | 291 | redis_cached_password_validity_expiry_ = |
1384 | 291 | now + MonoDelta::FromMilliseconds(FLAGS_redis_password_caching_duration_ms); |
1385 | 291 | return Status::OK(); |
1386 | 291 | } |
1387 | | |
// Builds the command dispatch table (via SetupMethod for every redis command)
// and instantiates the service-level metrics.
void RedisServiceImpl::Impl::PopulateHandlers() {
  auto metric_entity = data_.server_->metric_entity();
  FillRedisCommands(metric_entity, std::bind(&Impl::SetupMethod, this, _1));

  // Set up metrics for erroneous calls.
  data_.metrics_error_.handler_latency = YB_REDIS_METRIC(error).Instantiate(metric_entity);
  data_.metrics_internal_[static_cast<size_t>(OperationType::kWrite)].handler_latency =
      YB_REDIS_METRIC(set_internal).Instantiate(metric_entity);
  data_.metrics_internal_[static_cast<size_t>(OperationType::kRead)].handler_latency =
      YB_REDIS_METRIC(get_internal).Instantiate(metric_entity);
  // Local operations share the read-latency metric.
  data_.metrics_internal_[static_cast<size_t>(OperationType::kLocal)].handler_latency =
      data_.metrics_internal_[static_cast<size_t>(OperationType::kRead)].handler_latency;

  auto* proto = &METRIC_redis_monitoring_clients;
  data_.num_clients_monitoring_ = proto->Instantiate(metric_entity, 0);
}
1404 | | |
1405 | 104k | const RedisCommandInfo* RedisServiceImpl::Impl::FetchHandler(const RedisClientCommand& cmd_args) { |
1406 | 104k | if (cmd_args.size() < 1) { |
1407 | 0 | return nullptr; |
1408 | 0 | } |
1409 | 104k | Slice cmd_name = cmd_args[0]; |
1410 | 104k | auto iter = command_name_to_info_map_.find(cmd_args[0]); |
1411 | 104k | if (iter == command_name_to_info_map_.end()) { |
1412 | 0 | YB_LOG_EVERY_N_SECS(ERROR, 60) |
1413 | 0 | << "Command " << cmd_name << " not yet supported. " |
1414 | 0 | << "Arguments: " << ToString(cmd_args) << ". " |
1415 | 0 | << "Raw: " << Slice(cmd_args[0].data(), cmd_args.back().end()).ToDebugString(); |
1416 | 0 | return nullptr; |
1417 | 0 | } |
1418 | 104k | return iter->second.get(); |
1419 | 104k | } |
1420 | | |
// Builds the command dispatch table eagerly; client connections are created
// lazily by RedisServiceImplData::Initialize().
RedisServiceImpl::Impl::Impl(RedisServer* server, string yb_tier_master_addresses)
    : data_(server, std::move(yb_tier_master_addresses)) {
  PopulateHandlers();
}
1425 | | |
1426 | 104k | bool AllowedInClientMode(const RedisCommandInfo* info, RedisClientMode mode) { |
1427 | 104k | if (mode == RedisClientMode::kMonitoring) { |
1428 | 0 | static std::unordered_set<string> allowed = {"quit"}; |
1429 | 0 | return allowed.find(info->name) != allowed.end(); |
1430 | 104k | } else if (mode == RedisClientMode::kSubscribed) { |
1431 | 0 | static std::unordered_set<string> allowed = {"subscribe", "unsubscribe", "psubscribe", |
1432 | 0 | "punsubscribe", "ping", "quit"}; |
1433 | 0 | return allowed.find(info->name) != allowed.end(); |
1434 | 104k | } else { |
1435 | | // kNormal. |
1436 | 104k | return true; |
1437 | 104k | } |
1438 | 104k | } |
1439 | | |
// Top-level dispatch for one inbound RPC, which may carry a batch of redis
// commands. Each command is validated (known handler, allowed in the
// connection's client mode, arity, argument sizes, authentication) and either
// failed individually via RespondWithFailure or handed to its registered
// handler functor. Commands accumulated into the same BatchContextImpl share a
// context that is committed once at the end; a successful SELECT mid-batch
// commits the old context and starts a fresh one against the new database.
void RedisServiceImpl::Impl::Handle(rpc::InboundCallPtr call_ptr) {
  auto call = std::static_pointer_cast<RedisInboundCall>(call_ptr);

  DVLOG(2) << "Asked to handle a call " << call->ToString();
  // Reject oversized requests up front, failing every command in the batch.
  if (call->serialized_request().size() > FLAGS_redis_max_command_size) {
    auto message = StrCat("Size of redis command ", call->serialized_request().size(),
                          ", but we only support up to length of ", FLAGS_redis_max_command_size);
    for (size_t idx = 0; idx != call->client_batch().size(); ++idx) {
      RespondWithFailure(call, idx, message);
    }
    return;
  }

  // Ensure that we have the required YBClient(s) initialized.
  // Initialization is lazy (first call pays the cost); if it fails, every
  // command in this batch is failed with the initialization error.
  if (!data_.initialized()) {
    auto status = data_.Initialize();
    if (!status.ok()) {
      auto message = StrCat("Could not open .redis table. ", status.ToString());
      for (size_t idx = 0; idx != call->client_batch().size(); ++idx) {
        RespondWithFailure(call, idx, message);
      }
      return;
    }
  }

  // Call could contain several commands, i.e. batch.
  // We process them as follows:
  // Each read commands are processed individually.
  // Sequential write commands use single session and the same batcher.
  const auto& batch = call->client_batch();
  auto conn = call->connection();
  const string remote = yb::ToString(conn->remote());
  RedisConnectionContext* conn_context = &(call->connection_context());
  string db_name = conn_context->redis_db_to_use();
  auto context = make_scoped_refptr<BatchContextImpl>(db_name, call, &data_);
  for (size_t idx = 0; idx != batch.size(); ++idx) {
    const RedisClientCommand& c = batch[idx];

    auto cmd_info = FetchHandler(c);

    // Handle the current redis command.
    if (cmd_info == nullptr) {
      RespondWithFailure(call, idx, "Unsupported call.");
      continue;
    } else if (!AllowedInClientMode(cmd_info, conn_context->ClientMode())) {
      RespondWithFailure(
          call, idx, Substitute(
              "Command $0 not allowed in client mode $1.", cmd_info->name,
              yb::ToString(conn_context->ClientMode())));
      continue;
    }

    // Arity convention (see branches below): a positive arity means the
    // command takes exactly (arity - 1) arguments; a negative arity means it
    // takes at least (|arity| - 1) arguments.
    size_t arity = static_cast<size_t>(std::abs(cmd_info->arity) - 1);
    bool exact_count = cmd_info->arity > 0;
    size_t passed_arguments = c.size() - 1;
    if (!exact_count && passed_arguments < arity) {
      // -X means that the command needs >= X arguments.
      YB_LOG_EVERY_N_SECS(ERROR, 60)
          << "Requested command " << c[0] << " does not have enough arguments."
          << " At least " << arity << " expected, but " << passed_arguments << " found.";
      RespondWithFailure(call, idx, "Too few arguments.");
    } else if (exact_count && passed_arguments != arity) {
      // X (> 0) means that the command needs exactly X arguments.
      YB_LOG_EVERY_N_SECS(ERROR, 60)
          << "Requested command " << c[0] << " has wrong number of arguments. "
          << arity << " expected, but " << passed_arguments << " found.";
      RespondWithFailure(call, idx, "Wrong number of arguments.");
    } else if (!CheckArgumentSizeOK(c)) {
      RespondWithFailure(call, idx, "Redis argument too long.");
    } else if (!CheckAuthentication(conn_context) && cmd_info->name != "auth") {
      RespondWithFailure(call, idx, "Authentication required.", "NOAUTH");
    } else {
      // CONFIG and MONITOR are excluded from the monitor stream — presumably
      // so monitoring clients do not see their own traffic; confirm against
      // LogToMonitors if this matters.
      if (cmd_info->name != "config" && cmd_info->name != "monitor") {
        data_.LogToMonitors(remote, db_name, c);
      }

      // Handle the call.
      cmd_info->functor(*cmd_info, idx, context.get());

      if (cmd_info->name == "select" && db_name != conn_context->redis_db_to_use()) {
        // A successful SELECT changed the database mid-batch: flush everything
        // batched so far against the old db and start a fresh context.
        context->Commit();
        db_name = conn_context->redis_db_to_use();
        context = make_scoped_refptr<BatchContextImpl>(db_name, call, &data_);
      }
    }
  }
  // Flush the remaining batched operations; responses are sent on completion.
  context->Commit();
}
1529 | | |
// Public service wrapper: registers this service's metrics entity with the
// RPC service base class and delegates all real work to the pimpl.
RedisServiceImpl::RedisServiceImpl(RedisServer* server, string yb_tier_master_address)
    : RedisServerServiceIf(server->metric_entity()),
      impl_(new Impl(server, std::move(yb_tier_master_address))) {}
1533 | | |
1534 | 0 | RedisServiceImpl::~RedisServiceImpl() { |
1535 | 0 | } |
1536 | | |
// RPC entry point: hands the inbound call to the pimpl, which performs all
// parsing, validation, dispatch, and response handling.
void RedisServiceImpl::Handle(yb::rpc::InboundCallPtr call) {
  impl_->Handle(std::move(call));
}
1540 | | |
// Registers this service in the RPC endpoint map under the redis protocol's
// serialized remote-method key, so inbound redis calls are routed here.
// The 0ULL is the method index paired with the service pointer.
void RedisServiceImpl::FillEndpoints(const rpc::RpcServicePtr& service, rpc::RpcEndpointMap* map) {
  map->emplace(RedisInboundCall::static_serialized_remote_method(), std::make_pair(service, 0ULL));
}
1544 | | |
1545 | | } // namespace redisserver |
1546 | | } // namespace yb |