YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/ent/src/yb/cdc/cdc_service.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#ifndef ENT_SRC_YB_CDC_CDC_SERVICE_H
14
#define ENT_SRC_YB_CDC_CDC_SERVICE_H
15
16
#include <memory>
17
18
#include "yb/cdc/cdc_error.h"
19
#include "yb/cdc/cdc_metrics.h"
20
#include "yb/cdc/cdc_producer.h"
21
#include "yb/cdc/cdc_service.proxy.h"
22
#include "yb/cdc/cdc_service.service.h"
23
#include "yb/cdc/cdc_util.h"
24
#include "yb/client/async_initializer.h"
25
26
#include "yb/common/schema.h"
27
28
#include "yb/master/master_client.fwd.h"
29
30
#include "yb/rpc/rpc.h"
31
#include "yb/rpc/rpc_context.h"
32
#include "yb/rpc/rpc_controller.h"
33
34
#include "yb/util/net/net_util.h"
35
#include "yb/util/service_util.h"
36
37
namespace yb {
38
39
namespace client {
40
41
class TableHandle;
42
43
}
44
45
namespace tserver {
46
47
class TSTabletManager;
48
49
}
50
51
namespace cdc {
52
53
typedef std::unordered_map<HostPort, std::shared_ptr<CDCServiceProxy>, HostPortHash>
54
    CDCServiceProxyMap;
55
56
static const char* const kRecordType = "record_type";
57
static const char* const kRecordFormat = "record_format";
58
static const char* const kRetentionSec = "retention_sec";
59
static const char* const kSourceType = "source_type";
60
static const char* const kCheckpointType = "checkpoint_type";
61
static const char* const kIdType = "id_type";
62
static const char* const kNamespaceId = "NAMESPACEID";
63
static const char* const kTableId = "TABLEID";
64
65
struct TabletCheckpoint {
66
  OpId op_id;
67
  // Timestamp at which the op ID was last updated.
68
  CoarseTimePoint last_update_time;
69
70
4.21k
  bool ExpiredAt(std::chrono::milliseconds duration, std::chrono::time_point<CoarseMonoClock> now) {
71
4.21k
    return (now - last_update_time) > duration;
72
4.21k
  }
73
};
74
75
class CDCServiceImpl : public CDCServiceIf {
76
 public:
77
  CDCServiceImpl(tserver::TSTabletManager* tablet_manager,
78
                 const scoped_refptr<MetricEntity>& metric_entity_server,
79
                 MetricRegistry* metric_registry);
80
81
  CDCServiceImpl(const CDCServiceImpl&) = delete;
82
  void operator=(const CDCServiceImpl&) = delete;
83
84
  ~CDCServiceImpl();
85
86
  void CreateCDCStream(const CreateCDCStreamRequestPB* req,
87
                       CreateCDCStreamResponsePB* resp,
88
                       rpc::RpcContext rpc) override;
89
  void DeleteCDCStream(const DeleteCDCStreamRequestPB *req,
90
                       DeleteCDCStreamResponsePB* resp,
91
                       rpc::RpcContext rpc) override;
92
  void ListTablets(const ListTabletsRequestPB *req,
93
                   ListTabletsResponsePB* resp,
94
                   rpc::RpcContext rpc) override;
95
  void GetChanges(const GetChangesRequestPB* req,
96
                  GetChangesResponsePB* resp,
97
                  rpc::RpcContext rpc) override;
98
  void GetCheckpoint(const GetCheckpointRequestPB* req,
99
                     GetCheckpointResponsePB* resp,
100
                     rpc::RpcContext rpc) override;
101
102
  // Update peers in other tablet servers about the latest minimum applied cdc index for a specific
103
  // tablet.
104
  void UpdateCdcReplicatedIndex(const UpdateCdcReplicatedIndexRequestPB* req,
105
                                UpdateCdcReplicatedIndexResponsePB* resp,
106
                                rpc::RpcContext rpc) override;
107
108
  void GetLatestEntryOpId(const GetLatestEntryOpIdRequestPB* req,
109
                          GetLatestEntryOpIdResponsePB* resp,
110
                          rpc::RpcContext context) override;
111
112
  void BootstrapProducer(const BootstrapProducerRequestPB* req,
113
                         BootstrapProducerResponsePB* resp,
114
                         rpc::RpcContext rpc) override;
115
116
  void GetCDCDBStreamInfo(const GetCDCDBStreamInfoRequestPB* req,
117
                          GetCDCDBStreamInfoResponsePB* resp,
118
                          rpc::RpcContext context) override;
119
120
  Result<SetCDCCheckpointResponsePB> SetCDCCheckpoint(
121
      const SetCDCCheckpointRequestPB& req, CoarseTimePoint deadline) override;
122
123
  void Shutdown() override;
124
125
  // Used in cdc_service-int-test.cc.
126
  std::shared_ptr<CDCTabletMetrics> GetCDCTabletMetrics(
127
      const ProducerTabletInfo& producer,
128
      std::shared_ptr<tablet::TabletPeer> tablet_peer = nullptr);
129
130
  void UpdateCDCTabletMetrics(const GetChangesResponsePB* resp,
131
                              const ProducerTabletInfo& producer_tablet,
132
                              const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
133
                              const OpId& op_id,
134
                              int64_t last_readable_index);
135
136
0
  std::shared_ptr<CDCServerMetrics> GetCDCServerMetrics() {
137
0
    return server_metrics_;
138
0
  }
139
140
  // Returns true if this server has received a GetChanges call.
141
  bool CDCEnabled();
142
143
 private:
144
  FRIEND_TEST(CDCServiceTestMultipleServersOneTablet, TestMetricsAfterServerFailure);
145
146
  class Impl;
147
148
  template <class ReqType, class RespType>
149
  bool CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc);
150
151
  Result<OpId> GetLastCheckpoint(const ProducerTabletInfo& producer_tablet,
152
                                 const client::YBSessionPtr& session);
153
154
  Result<std::vector<pair<std::string, std::string>>> GetDBStreamInfo(
155
          const std::string& db_stream_id,
156
          const client::YBSessionPtr& session);
157
158
  Result<std::string> GetCdcStreamId(const ProducerTabletInfo& producer_tablet,
159
                                     const std::shared_ptr<client::YBSession>& session);
160
161
  CHECKED_STATUS UpdateCheckpoint(const ProducerTabletInfo& producer_tablet,
162
                                  const OpId& sent_op_id,
163
                                  const OpId& commit_op_id,
164
                                  const client::YBSessionPtr& session,
165
                                  uint64_t last_record_hybrid_time);
166
167
  Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> GetTablets(
168
      const CDCStreamId& stream_id);
169
170
  Status CreateCDCStreamForTable(
171
      const TableId& table_id,
172
      const std::unordered_map<std::string, std::string>& options,
173
      const CDCStreamId& stream_id);
174
175
  void RollbackPartialCreate(const CDCCreationState& creation_state);
176
177
  Result<NamespaceId> GetNamespaceId(const std::string& ns_name);
178
179
  Result<std::shared_ptr<StreamMetadata>> GetStream(const std::string& stream_id);
180
181
  std::shared_ptr<StreamMetadata> GetStreamMetadataFromCache(const std::string& stream_id);
182
  void AddStreamMetadataToCache(const std::string& stream_id,
183
                                const std::shared_ptr<StreamMetadata>& stream_metadata);
184
185
  CHECKED_STATUS CheckTabletValidForStream(const ProducerTabletInfo& producer_info);
186
187
  void TabletLeaderGetChanges(const GetChangesRequestPB* req,
188
                              GetChangesResponsePB* resp,
189
                              std::shared_ptr<rpc::RpcContext> context,
190
                              std::shared_ptr<tablet::TabletPeer> peer);
191
192
  void TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req,
193
                                 GetCheckpointResponsePB* resp,
194
                                 rpc::RpcContext* context,
195
                                 const std::shared_ptr<tablet::TabletPeer>& peer);
196
197
  Result<OpId> TabletLeaderLatestEntryOpId(const TabletId& tablet_id);
198
199
  Result<client::internal::RemoteTabletPtr> GetRemoteTablet(const TabletId& tablet_id);
200
  Result<client::internal::RemoteTabletServer *> GetLeaderTServer(const TabletId& tablet_id);
201
  CHECKED_STATUS GetTServers(const TabletId& tablet_id,
202
                             std::vector<client::internal::RemoteTabletServer*>* servers);
203
204
  std::shared_ptr<CDCServiceProxy> GetCDCServiceProxy(client::internal::RemoteTabletServer* ts);
205
206
  OpId GetMinSentCheckpointForTablet(const std::string& tablet_id);
207
208
  std::shared_ptr<MemTracker> GetMemTracker(
209
      const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
210
      const ProducerTabletInfo& producer_info);
211
212
  OpId GetMinAppliedCheckpointForTablet(const std::string& tablet_id,
213
                                        const client::YBSessionPtr& session);
214
215
  CHECKED_STATUS UpdatePeersCdcMinReplicatedIndex(const TabletId& tablet_id, int64_t min_index,
216
                                                  int64_t min_term = -1);
217
218
  void ComputeLagMetric(int64_t last_replicated_micros, int64_t metric_last_timestamp_micros,
219
                        int64_t cdc_state_last_replication_time_micros,
220
                        scoped_refptr<AtomicGauge<int64_t>> metric);
221
222
  // Update metrics async_replication_sent_lag_micros and async_replication_committed_lag_micros.
223
  // Called periodically default 1s.
224
  void UpdateLagMetrics();
225
226
  // This method is used to read the cdc_state table to find the minimum replicated index for each
227
  // tablet and then update the peers' log objects. Also used to update lag metrics.
228
  void UpdatePeersAndMetrics();
229
230
  MicrosTime GetLastReplicatedTime(const std::shared_ptr<tablet::TabletPeer>& tablet_peer);
231
232
  bool ShouldUpdateLagMetrics(MonoTime time_since_update_metrics);
233
234
  Result<std::shared_ptr<client::TableHandle>> GetCdcStateTable() EXCLUDES(mutex_);
235
236
  void RefreshCdcStateTable() EXCLUDES(mutex_);
237
238
  Status RefreshCacheOnFail(const Status& s) EXCLUDES(mutex_);
239
240
  client::YBClient* client();
241
242
  void CreateEntryInCdcStateTable(
243
      const std::shared_ptr<client::TableHandle>& cdc_state_table,
244
      std::vector<ProducerTabletInfo>* producer_entries_modified,
245
      std::vector<client::YBOperationPtr>* ops,
246
      const CDCStreamId& stream_id,
247
      const TableId& table_id,
248
      const TabletId& tablet_id);
249
250
  Status CreateCDCStreamForNamespace(
251
      const CreateCDCStreamRequestPB* req,
252
      CreateCDCStreamResponsePB* resp,
253
      CoarseTimePoint deadline);
254
255
  rpc::Rpcs rpcs_;
256
257
  tserver::TSTabletManager* tablet_manager_;
258
259
  MetricRegistry* metric_registry_;
260
261
  std::shared_ptr<CDCServerMetrics> server_metrics_;
262
263
  // Used to protect tablet_checkpoints_ and stream_metadata_ maps.
264
  mutable rw_spinlock mutex_;
265
266
  std::unique_ptr<Impl> impl_;
267
268
  std::shared_ptr<client::TableHandle> cdc_state_table_ GUARDED_BY(mutex_);
269
270
  std::unordered_map<std::string, std::shared_ptr<StreamMetadata>> stream_metadata_
271
      GUARDED_BY(mutex_);
272
273
  // Map of HostPort -> CDCServiceProxy. This is used to redirect requests to tablet leader's
274
  // CDC service proxy.
275
  CDCServiceProxyMap cdc_service_map_ GUARDED_BY(mutex_);
276
277
  // Thread with a few functions:
278
  //
279
  // Read the cdc_state table and get the minimum checkpoint for each tablet
280
  // and then, for each tablet this tserver is a leader, update the log minimum cdc replicated
281
  // index so we can use this information to correctly keep log files that are needed so we
282
  // can continue replicating cdc records. This runs periodically to handle
283
  // leadership changes (FLAGS_update_min_cdc_indices_interval_secs).
284
  // TODO(hector): It would be better to do this update only when a local peer becomes a leader.
285
  //
286
  // Periodically update lag metrics (FLAGS_update_metrics_interval_ms).
287
  std::unique_ptr<std::thread> update_peers_and_metrics_thread_;
288
289
  // True when this service is stopped. Used to inform
290
  // get_minimum_checkpoints_and_update_peers_thread_ that it should exit.
291
  bool cdc_service_stopped_ GUARDED_BY(mutex_){false};
292
293
  // True when this service has received a GetChanges request on a valid replication stream.
294
  std::atomic<bool> cdc_enabled_{false};
295
};
296
297
}  // namespace cdc
298
}  // namespace yb
299
300
#endif  // ENT_SRC_YB_CDC_CDC_SERVICE_H