YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_service.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/tablet_service.h"
34
35
#include <algorithm>
36
#include <memory>
37
#include <string>
38
#include <vector>
39
40
#include <glog/logging.h>
41
42
#include "yb/client/forward_rpc.h"
43
#include "yb/client/transaction.h"
44
#include "yb/client/transaction_pool.h"
45
46
#include "yb/common/ql_rowblock.h"
47
#include "yb/common/ql_value.h"
48
#include "yb/common/row_mark.h"
49
#include "yb/common/schema.h"
50
#include "yb/common/wire_protocol.h"
51
#include "yb/consensus/leader_lease.h"
52
#include "yb/consensus/consensus.pb.h"
53
#include "yb/consensus/raft_consensus.h"
54
55
#include "yb/docdb/cql_operation.h"
56
#include "yb/docdb/pgsql_operation.h"
57
58
#include "yb/gutil/bind.h"
59
#include "yb/gutil/casts.h"
60
#include "yb/gutil/stl_util.h"
61
#include "yb/gutil/stringprintf.h"
62
#include "yb/gutil/strings/escaping.h"
63
64
#include "yb/rpc/thread_pool.h"
65
66
#include "yb/server/hybrid_clock.h"
67
68
#include "yb/tablet/abstract_tablet.h"
69
#include "yb/tablet/metadata.pb.h"
70
#include "yb/tablet/operations/change_metadata_operation.h"
71
#include "yb/tablet/operations/split_operation.h"
72
#include "yb/tablet/operations/truncate_operation.h"
73
#include "yb/tablet/operations/update_txn_operation.h"
74
#include "yb/tablet/operations/write_operation.h"
75
#include "yb/tablet/read_result.h"
76
#include "yb/tablet/tablet.h"
77
#include "yb/tablet/tablet_bootstrap_if.h"
78
#include "yb/tablet/tablet_metadata.h"
79
#include "yb/tablet/tablet_metrics.h"
80
#include "yb/tablet/transaction_participant.h"
81
#include "yb/tablet/write_query.h"
82
83
#include "yb/tserver/read_query.h"
84
#include "yb/tserver/service_util.h"
85
#include "yb/tserver/tablet_server.h"
86
#include "yb/tserver/ts_tablet_manager.h"
87
#include "yb/tserver/tserver_error.h"
88
89
#include "yb/util/crc.h"
90
#include "yb/util/debug-util.h"
91
#include "yb/util/debug/long_operation_tracker.h"
92
#include "yb/util/debug/trace_event.h"
93
#include "yb/util/faststring.h"
94
#include "yb/util/flag_tags.h"
95
#include "yb/util/format.h"
96
#include "yb/util/logging.h"
97
#include "yb/util/math_util.h"
98
#include "yb/util/mem_tracker.h"
99
#include "yb/util/metrics.h"
100
#include "yb/util/monotime.h"
101
#include "yb/util/random_util.h"
102
#include "yb/util/scope_exit.h"
103
#include "yb/util/size_literals.h"
104
#include "yb/util/status.h"
105
#include "yb/util/status_callback.h"
106
#include "yb/util/status_format.h"
107
#include "yb/util/status_log.h"
108
#include "yb/util/string_util.h"
109
#include "yb/util/trace.h"
110
111
#include "yb/yql/pgwrapper/ysql_upgrade.h"
112
113
using namespace std::literals;  // NOLINT
114
115
DEFINE_int32(scanner_default_batch_size_bytes, 64 * 1024,
116
             "The default size for batches of scan results");
117
TAG_FLAG(scanner_default_batch_size_bytes, advanced);
118
TAG_FLAG(scanner_default_batch_size_bytes, runtime);
119
120
DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024,
121
             "The maximum batch size that a client may request for "
122
             "scan results.");
123
TAG_FLAG(scanner_max_batch_size_bytes, advanced);
124
TAG_FLAG(scanner_max_batch_size_bytes, runtime);
125
126
DEFINE_int32(scanner_batch_size_rows, 100,
127
             "The number of rows to batch for servicing scan requests.");
128
TAG_FLAG(scanner_batch_size_rows, advanced);
129
TAG_FLAG(scanner_batch_size_rows, runtime);
130
131
// Fault injection flags.
132
DEFINE_test_flag(int32, scanner_inject_latency_on_each_batch_ms, 0,
133
                 "If set, the scanner will pause the specified number of milliesconds "
134
                 "before reading each batch of data on the tablet server.");
135
136
DEFINE_int32(max_wait_for_safe_time_ms, 5000,
137
             "Maximum time in milliseconds to wait for the safe time to advance when trying to "
138
             "scan at the given hybrid_time.");
139
140
DEFINE_int32(num_concurrent_backfills_allowed, -1,
141
             "Maximum number of concurrent backfill jobs that is allowed to run.");
142
143
DEFINE_test_flag(bool, tserver_noop_read_write, false, "Respond NOOP to read/write.");
144
145
DEFINE_uint64(index_backfill_upperbound_for_user_enforced_txn_duration_ms, 65000,
146
              "For Non-Txn tables, it is impossible to know at the tservers "
147
              "whether or not an 'old transaction' is still active. To avoid "
148
              "having such old transactions, we assume a bound on the duration "
149
              "of such transactions (during the backfill process) and wait "
150
              "it out. This flag denotes a conservative upper bound on the "
151
              "duration of such user enforced transactions.");
152
TAG_FLAG(index_backfill_upperbound_for_user_enforced_txn_duration_ms, evolving);
153
TAG_FLAG(index_backfill_upperbound_for_user_enforced_txn_duration_ms, runtime);
154
155
DEFINE_int32(index_backfill_additional_delay_before_backfilling_ms, 0,
156
             "Operations that are received by the tserver, and have decided how "
157
             "the indexes need to be updated (based on the IndexPermission), will "
158
             "not be added to the list of current transactions until they are "
159
             "replicated/applied. This delay allows for the GetSafeTime method "
160
             "to wait for such operations to be replicated/applied. Ideally, this "
161
             "value should be set to be something larger than the raft-heartbeat-interval "
162
             "but can be as high as the client_rpc_timeout if we want to be more conservative.");
163
TAG_FLAG(index_backfill_additional_delay_before_backfilling_ms, evolving);
164
TAG_FLAG(index_backfill_additional_delay_before_backfilling_ms, runtime);
165
166
DEFINE_int32(index_backfill_wait_for_old_txns_ms, 0,
167
             "Index backfill needs to wait for transactions that started before the "
168
             "WRITE_AND_DELETE phase to commit or abort before choosing a time for "
169
             "backfilling the index. This is the max time that the GetSafeTime call will "
170
             "wait for, before it resorts to attempt aborting old transactions. This is "
171
             "necessary to guard against the pathological active transaction that never "
172
             "commits from blocking the index backfill forever.");
173
TAG_FLAG(index_backfill_wait_for_old_txns_ms, evolving);
174
TAG_FLAG(index_backfill_wait_for_old_txns_ms, runtime);
175
176
DEFINE_test_flag(double, respond_write_failed_probability, 0.0,
177
                 "Probability to respond that write request is failed");
178
179
DEFINE_test_flag(bool, rpc_delete_tablet_fail, false, "Should delete tablet RPC fail.");
180
181
DECLARE_bool(disable_alter_vs_write_mutual_exclusion);
182
DECLARE_uint64(max_clock_skew_usec);
183
DECLARE_uint64(transaction_min_running_check_interval_ms);
184
DECLARE_int64(transaction_rpc_timeout_ms);
185
186
DEFINE_test_flag(int32, txn_status_table_tablet_creation_delay_ms, 0,
187
                 "Extra delay to slowdown creation of transaction status table tablet.");
188
189
DEFINE_test_flag(int32, leader_stepdown_delay_ms, 0,
190
                 "Amount of time to delay before starting a leader stepdown change.");
191
192
DEFINE_test_flag(int32, alter_schema_delay_ms, 0, "Delay before processing AlterSchema.");
193
194
DEFINE_test_flag(bool, disable_post_split_tablet_rbs_check, false,
195
                 "If true, bypass any checks made to reject remote boostrap requests for post "
196
                 "split tablets whose parent tablets are still present.");
197
198
DEFINE_test_flag(double, fail_tablet_split_probability, 0.0,
199
                 "Probability of failing in TabletServiceAdminImpl::SplitTablet.");
200
201
DEFINE_test_flag(bool, pause_tserver_get_split_key, false,
202
                 "Pause before processing a GetSplitKey request.");
203
204
DECLARE_int32(heartbeat_interval_ms);
205
206
DECLARE_int32(ysql_transaction_abort_timeout_ms);
207
208
DEFINE_test_flag(bool, fail_alter_schema_after_abort_transactions, false,
209
                 "If true, setup an error status in AlterSchema and respond success to rpc call. "
210
                 "This failure should not cause the TServer to crash but "
211
                 "instead return an error message on the YSQL connection.");
212
213
double TEST_delay_create_transaction_probability = 0;
214
215
namespace yb {
216
namespace tserver {
217
218
using client::internal::ForwardReadRpc;
219
using client::internal::ForwardWriteRpc;
220
using consensus::ChangeConfigRequestPB;
221
using consensus::ChangeConfigResponsePB;
222
using consensus::Consensus;
223
using consensus::CONSENSUS_CONFIG_ACTIVE;
224
using consensus::CONSENSUS_CONFIG_COMMITTED;
225
using consensus::ConsensusConfigType;
226
using consensus::ConsensusRequestPB;
227
using consensus::ConsensusResponsePB;
228
using consensus::GetLastOpIdRequestPB;
229
using consensus::GetNodeInstanceRequestPB;
230
using consensus::GetNodeInstanceResponsePB;
231
using consensus::LeaderLeaseStatus;
232
using consensus::LeaderStepDownRequestPB;
233
using consensus::LeaderStepDownResponsePB;
234
using consensus::RaftPeerPB;
235
using consensus::RunLeaderElectionRequestPB;
236
using consensus::RunLeaderElectionResponsePB;
237
using consensus::StartRemoteBootstrapRequestPB;
238
using consensus::StartRemoteBootstrapResponsePB;
239
using consensus::UnsafeChangeConfigRequestPB;
240
using consensus::UnsafeChangeConfigResponsePB;
241
using consensus::VoteRequestPB;
242
using consensus::VoteResponsePB;
243
244
using std::unique_ptr;
245
using google::protobuf::RepeatedPtrField;
246
using rpc::RpcContext;
247
using std::shared_ptr;
248
using std::vector;
249
using std::string;
250
using strings::Substitute;
251
using tablet::ChangeMetadataOperation;
252
using tablet::Tablet;
253
using tablet::TabletPeer;
254
using tablet::TabletPeerPtr;
255
using tablet::TabletStatusPB;
256
using tablet::TruncateOperation;
257
using tablet::OperationCompletionCallback;
258
using tablet::WriteOperation;
259
260
namespace {
261
262
10.4M
Result<std::shared_ptr<consensus::RaftConsensus>> GetConsensus(const TabletPeerPtr& tablet_peer) {
263
10.4M
  auto result = tablet_peer->shared_raft_consensus();
264
10.4M
  if (!result) {
265
0
    Status s = STATUS(ServiceUnavailable, "Consensus unavailable. Tablet not running");
266
0
    return s.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::TABLET_NOT_RUNNING));
267
0
  }
268
10.4M
  return result;
269
10.4M
}
270
271
template<class RespClass>
272
std::shared_ptr<consensus::RaftConsensus> GetConsensusOrRespond(const TabletPeerPtr& tablet_peer,
273
                                                                RespClass* resp,
274
557
                                                                rpc::RpcContext* context) {
275
557
  auto result = GetConsensus(tablet_peer);
276
557
  if (!result.ok()) {
277
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
278
0
  }
279
557
  return result.get();
280
557
}
281
282
template<class RespClass>
283
bool GetConsensusOrRespond(const TabletPeerPtr& tablet_peer,
284
                           RespClass* resp,
285
                           rpc::RpcContext* context,
286
10.4M
                           shared_ptr<Consensus>* consensus) {
287
10.4M
  auto result = GetConsensus(tablet_peer);
288
10.4M
  if (!result.ok()) {
289
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
290
0
    return false;
291
0
  }
292
10.4M
  return (*consensus = result.get()) != nullptr;
293
10.4M
}
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus19ConsensusResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE
Line
Count
Source
286
10.2M
                           shared_ptr<Consensus>* consensus) {
287
10.2M
  auto result = GetConsensus(tablet_peer);
288
10.2M
  if (!result.ok()) {
289
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
290
0
    return false;
291
0
  }
292
10.2M
  return (*consensus = result.get()) != nullptr;
293
10.2M
}
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus14VoteResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE
Line
Count
Source
286
125k
                           shared_ptr<Consensus>* consensus) {
287
125k
  auto result = GetConsensus(tablet_peer);
288
125k
  if (!result.ok()) {
289
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
290
0
    return false;
291
0
  }
292
125k
  return (*consensus = result.get()) != nullptr;
293
125k
}
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus22ChangeConfigResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE
Line
Count
Source
286
2.50k
                           shared_ptr<Consensus>* consensus) {
287
2.50k
  auto result = GetConsensus(tablet_peer);
288
2.50k
  if (!result.ok()) {
289
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
290
0
    return false;
291
0
  }
292
2.50k
  return (*consensus = result.get()) != nullptr;
293
2.50k
}
Unexecuted instantiation: tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus28UnsafeChangeConfigResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus27RunLeaderElectionResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE
Line
Count
Source
286
32.8k
                           shared_ptr<Consensus>* consensus) {
287
32.8k
  auto result = GetConsensus(tablet_peer);
288
32.8k
  if (!result.ok()) {
289
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
290
0
    return false;
291
0
  }
292
32.8k
  return (*consensus = result.get()) != nullptr;
293
32.8k
}
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus28LeaderElectionLostResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE
Line
Count
Source
286
44
                           shared_ptr<Consensus>* consensus) {
287
44
  auto result = GetConsensus(tablet_peer);
288
44
  if (!result.ok()) {
289
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
290
0
    return false;
291
0
  }
292
44
  return (*consensus = result.get()) != nullptr;
293
44
}
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus24LeaderStepDownResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE
Line
Count
Source
286
5.87k
                           shared_ptr<Consensus>* consensus) {
287
5.87k
  auto result = GetConsensus(tablet_peer);
288
5.87k
  if (!result.ok()) {
289
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
290
0
    return false;
291
0
  }
292
5.87k
  return (*consensus = result.get()) != nullptr;
293
5.87k
}
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_121GetConsensusOrRespondINS_9consensus27GetConsensusStateResponsePBEEEbRKNSt3__110shared_ptrINS_6tablet10TabletPeerEEEPT_PNS_3rpc10RpcContextEPNS6_INS3_9ConsensusEEE
Line
Count
Source
286
3.68k
                           shared_ptr<Consensus>* consensus) {
287
3.68k
  auto result = GetConsensus(tablet_peer);
288
3.68k
  if (!result.ok()) {
289
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
290
0
    return false;
291
0
  }
292
3.68k
  return (*consensus = result.get()) != nullptr;
293
3.68k
}
294
295
} // namespace
296
297
template<class Resp>
298
bool TabletServiceImpl::CheckWriteThrottlingOrRespond(
299
1.34M
    double score, tablet::TabletPeer* tablet_peer, Resp* resp, rpc::RpcContext* context) {
300
  // Check for memory pressure; don't bother doing any additional work if we've
301
  // exceeded the limit.
302
1.34M
  auto status = CheckWriteThrottling(score, tablet_peer);
303
1.34M
  if (!status.ok()) {
304
22
    SetupErrorAndRespond(resp->mutable_error(), status, context);
305
22
    return false;
306
22
  }
307
308
1.34M
  return true;
309
1.34M
}
310
311
typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
312
313
class WriteQueryCompletionCallback {
314
 public:
315
  WriteQueryCompletionCallback(
316
      tablet::TabletPeerPtr tablet_peer,
317
      std::shared_ptr<rpc::RpcContext> context,
318
      WriteResponsePB* response,
319
      tablet::WriteQuery* query,
320
      const server::ClockPtr& clock,
321
      bool trace = false)
322
      : tablet_peer_(std::move(tablet_peer)),
323
        context_(std::move(context)),
324
        response_(response),
325
        query_(query),
326
        clock_(clock),
327
        include_trace_(trace),
328
1.34M
        trace_(include_trace_ ? Trace::CurrentTrace() : nullptr) {}
329
330
1.34M
  void operator()(Status status) const {
331
444
    VLOG(1) << __PRETTY_FUNCTION__ << " completing with status " << status;
332
    // When we don't need to return any data, we could return success on duplicate request.
333
1.34M
    if (status.IsAlreadyPresent() &&
334
1
        query_->ql_write_ops()->empty() &&
335
1
        query_->pgsql_write_ops()->empty() &&
336
1
        query_->client_request()->redis_write_batch().empty()) {
337
1
      status = Status::OK();
338
1
    }
339
340
1.34M
    TRACE("Write completing with status $0", yb::ToString(status));
341
342
1.34M
    if (!status.ok()) {
343
80.5k
      LOG(INFO) << tablet_peer_->LogPrefix() << "Write failed: " << status;
344
80.5k
      if (include_trace_ && trace_) {
345
0
        response_->set_trace_buffer(trace_->DumpToString(true));
346
0
      }
347
80.5k
      SetupErrorAndRespond(get_error(), status, context_.get());
348
80.5k
      return;
349
80.5k
    }
350
351
    // Retrieve the rowblocks returned from the QL write operations and return them as RPC
352
    // sidecars. Populate the row schema also.
353
1.26M
    faststring rows_data;
354
273
    for (const auto& ql_write_op : *query_->ql_write_ops()) {
355
273
      const auto& ql_write_req = ql_write_op->request();
356
273
      auto* ql_write_resp = ql_write_op->response();
357
273
      const QLRowBlock* rowblock = ql_write_op->rowblock();
358
273
      SchemaToColumnPBs(rowblock->schema(), ql_write_resp->mutable_column_schemas());
359
273
      rows_data.clear();
360
273
      rowblock->Serialize(ql_write_req.client(), &rows_data);
361
273
      ql_write_resp->set_rows_data_sidecar(
362
273
          narrow_cast<int32_t>(context_->AddRpcSidecar(rows_data)));
363
273
    }
364
365
1.26M
    if (!query_->pgsql_write_ops()->empty()) {
366
      // Retrieve the resultset returned from the PGSQL write operations and return them as RPC
367
      // sidecars.
368
369
237k
      size_t sidecars_size = 0;
370
2.10M
      for (const auto& pgsql_write_op : *query_->pgsql_write_ops()) {
371
2.10M
        sidecars_size += pgsql_write_op->result_buffer().size();
372
2.10M
      }
373
374
237k
      if (sidecars_size != 0) {
375
236k
        context_->ReserveSidecarSpace(sidecars_size);
376
2.10M
        for (const auto& pgsql_write_op : *query_->pgsql_write_ops()) {
377
2.10M
          auto* pgsql_write_resp = pgsql_write_op->response();
378
2.10M
          const faststring& result_buffer = pgsql_write_op->result_buffer();
379
2.10M
          if (!result_buffer.empty()) {
380
2.10M
            pgsql_write_resp->set_rows_data_sidecar(
381
2.10M
                narrow_cast<int32_t>(context_->AddRpcSidecar(result_buffer)));
382
2.10M
          }
383
2.10M
        }
384
236k
      }
385
237k
    }
386
387
1.26M
    if (include_trace_ && trace_) {
388
0
      response_->set_trace_buffer(trace_->DumpToString(true));
389
0
    }
390
1.26M
    response_->set_propagated_hybrid_time(clock_->Now().ToUint64());
391
1.26M
    context_->RespondSuccess();
392
18.4E
    VLOG(1) << __PRETTY_FUNCTION__ << " RespondedSuccess";
393
1.26M
  }
394
395
 private:
396
80.6k
  TabletServerErrorPB* get_error() const {
397
80.6k
    return response_->mutable_error();
398
80.6k
  }
399
400
  tablet::TabletPeerPtr tablet_peer_;
401
  const std::shared_ptr<rpc::RpcContext> context_;
402
  WriteResponsePB* const response_;
403
  tablet::WriteQuery* const query_;
404
  server::ClockPtr clock_;
405
  const bool include_trace_;
406
  scoped_refptr<Trace> trace_;
407
};
408
409
// Checksums the scan result.
410
class ScanResultChecksummer {
411
 public:
412
2.81k
  ScanResultChecksummer() {}
413
414
1.72M
  void HandleRow(const Schema& schema, const QLTableRow& row) {
415
1.72M
    QLValue value;
416
1.72M
    buffer_.clear();
417
7.81M
    for (uint32_t col_index = 0; col_index != schema.num_columns(); ++col_index) {
418
6.08M
      auto status = row.GetValue(schema.column_id(col_index), &value);
419
6.08M
      if (!status.ok()) {
420
0
        LOG(WARNING) << "Column " << schema.column_id(col_index)
421
0
                     << " not found in " << row.ToString();
422
0
        continue;
423
0
      }
424
6.08M
      buffer_.append(pointer_cast<const char*>(&col_index), sizeof(col_index));
425
6.08M
      if (schema.column(col_index).is_nullable()) {
426
3.04M
        uint8_t defined = value.IsNull() ? 0 : 1;
427
3.04M
        buffer_.append(pointer_cast<const char*>(&defined), sizeof(defined));
428
3.04M
      }
429
6.08M
      if (!value.IsNull()) {
430
6.03M
        value.value().AppendToString(&buffer_);
431
6.03M
      }
432
6.08M
    }
433
1.72M
    crc_->Compute(buffer_.c_str(), buffer_.size(), &agg_checksum_, nullptr);
434
1.72M
  }
435
436
  // Accessor for the aggregate checksum accumulated so far.
437
2.90k
  uint64_t agg_checksum() const { return agg_checksum_; }
438
439
 private:
440
  crc::Crc* const crc_ = crc::GetCrc32cInstance();
441
  uint64_t agg_checksum_ = 0;
442
  std::string buffer_;
443
};
444
445
Result<std::shared_ptr<tablet::AbstractTablet>> TabletServiceImpl::GetTabletForRead(
446
  const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer,
447
3.91M
  YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet) {
448
3.91M
  return GetTablet(server_->tablet_peer_lookup(), tablet_id, std::move(tablet_peer),
449
3.91M
                   consistency_level, allow_split_tablet);
450
3.91M
}
451
452
TabletServiceImpl::TabletServiceImpl(TabletServerIf* server)
453
    : TabletServerServiceIf(server->MetricEnt()),
454
11.2k
      server_(server) {
455
11.2k
}
456
457
TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server)
458
5.81k
    : TabletServerAdminServiceIf(server->MetricEnt()), server_(server) {}
459
460
void TabletServiceAdminImpl::BackfillDone(
461
    const tablet::ChangeMetadataRequestPB* req, ChangeMetadataResponsePB* resp,
462
2.47k
    rpc::RpcContext context) {
463
2.47k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillDone", req, resp, &context)) {
464
0
    return;
465
0
  }
466
23
  DVLOG(3) << "Received BackfillDone RPC: " << req->DebugString();
467
468
2.47k
  server::UpdateClock(*req, server_->Clock());
469
470
  // For now, we shall only allow this RPC on the leader.
471
2.47k
  auto tablet =
472
2.47k
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
473
2.47k
  if (!tablet) {
474
11
    return;
475
11
  }
476
477
2.45k
  auto operation = std::make_unique<ChangeMetadataOperation>(
478
2.45k
      tablet.peer->tablet(), tablet.peer->log(), req);
479
480
2.45k
  operation->set_completion_callback(
481
2.45k
      MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock()));
482
483
  // Submit the change-metadata op for BackfillDone. The RPC will be responded to asynchronously.
484
2.45k
  tablet.peer->Submit(std::move(operation), tablet.leader_term);
485
2.45k
}
486
487
void TabletServiceAdminImpl::GetSafeTime(
488
2.65k
    const GetSafeTimeRequestPB* req, GetSafeTimeResponsePB* resp, rpc::RpcContext context) {
489
2.65k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "GetSafeTime", req, resp, &context)) {
490
0
    return;
491
0
  }
492
7
  DVLOG(3) << "Received GetSafeTime RPC: " << req->DebugString();
493
494
2.65k
  server::UpdateClock(*req, server_->Clock());
495
496
  // For now, we shall only allow this RPC on the leader.
497
2.65k
  auto tablet =
498
2.65k
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
499
2.65k
  if (!tablet) {
500
0
    return;
501
0
  }
502
2.65k
  const CoarseTimePoint& deadline = context.GetClientDeadline();
503
2.65k
  HybridTime min_hybrid_time(HybridTime::kMin);
504
2.65k
  if (req->has_min_hybrid_time_for_backfill()) {
505
2.64k
    min_hybrid_time = HybridTime(req->min_hybrid_time_for_backfill());
506
    // For Transactional tables, wait until there are no pending transactions that started
507
    // prior to min_hybrid_time. These may not have updated the index correctly, if they
508
    // happen to commit after the backfill scan, it is possible that they may miss updating
509
    // the index because some operations may have taken place prior to min_hybrid_time.
510
    //
511
    // For Non-Txn tables, it is impossible to know at the tservers whether or not an "old
512
    // transaction" is still active. To avoid having such old transactions, we assume a
513
    // bound on the length of such transactions (during the backfill process) and wait it
514
    // out.
515
2.64k
    if (!tablet.peer->tablet()->transaction_participant()) {
516
498
      min_hybrid_time = min_hybrid_time.AddMilliseconds(
517
498
          FLAGS_index_backfill_upperbound_for_user_enforced_txn_duration_ms);
518
0
      VLOG(2) << "GetSafeTime called on a user enforced transaction tablet "
519
0
              << tablet.peer->tablet_id() << " will wait until "
520
0
              << min_hybrid_time << " is safe.";
521
2.14k
    } else {
522
      // Add some extra delay to wait for operations being replicated to be
523
      // applied.
524
2.14k
      SleepFor(MonoDelta::FromMilliseconds(
525
2.14k
          FLAGS_index_backfill_additional_delay_before_backfilling_ms));
526
527
2.14k
      auto txn_particpant = tablet.peer->tablet()->transaction_participant();
528
2.14k
      auto wait_until = CoarseMonoClock::Now() + FLAGS_index_backfill_wait_for_old_txns_ms * 1ms;
529
2.14k
      HybridTime min_running_ht;
530
2.13k
      for (;;) {
531
2.13k
        min_running_ht = txn_particpant->MinRunningHybridTime();
532
2.15k
        if ((min_running_ht && min_running_ht >= min_hybrid_time) ||
533
2.14k
            CoarseMonoClock::Now() > wait_until) {
534
2.14k
          break;
535
2.14k
        }
536
18.4E
        VLOG(2) << "MinRunningHybridTime is " << min_running_ht
537
18.4E
                << " need to wait for " << min_hybrid_time;
538
18.4E
        SleepFor(MonoDelta::FromMilliseconds(FLAGS_transaction_min_running_check_interval_ms));
539
18.4E
      }
540
541
18.4E
      VLOG(2) << "Finally MinRunningHybridTime is " << min_running_ht;
542
2.14k
      if (min_running_ht < min_hybrid_time) {
543
0
        VLOG(2) << "Aborting Txns that started prior to " << min_hybrid_time;
544
1
        auto s = txn_particpant->StopActiveTxnsPriorTo(min_hybrid_time, deadline);
545
1
        if (!s.ok()) {
546
0
          SetupErrorAndRespond(resp->mutable_error(), s, &context);
547
0
          return;
548
0
        }
549
2.65k
      }
550
2.14k
    }
551
2.64k
  }
552
553
2.65k
  auto safe_time = tablet.peer->tablet()->SafeTime(
554
2.65k
      tablet::RequireLease::kTrue, min_hybrid_time, deadline);
555
2.65k
  if (!safe_time.ok()) {
556
5
    SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context);
557
5
    return;
558
5
  }
559
560
2.64k
  resp->set_safe_time(safe_time->ToUint64());
561
2.64k
  resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
562
18.4E
  VLOG(1) << "Tablet " << tablet.peer->tablet_id()
563
18.4E
          << " returning safe time " << yb::ToString(safe_time);
564
565
2.64k
  context.RespondSuccess();
566
2.64k
}
567
568
void TabletServiceAdminImpl::BackfillIndex(
569
2.70k
    const BackfillIndexRequestPB* req, BackfillIndexResponsePB* resp, rpc::RpcContext context) {
570
2.70k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillIndex", req, resp, &context)) {
571
0
    return;
572
0
  }
573
1
  DVLOG(3) << "Received BackfillIndex RPC: " << req->DebugString();
574
575
2.70k
  server::UpdateClock(*req, server_->Clock());
576
577
  // For now, we shall only allow this RPC on the leader.
578
2.70k
  auto tablet =
579
2.70k
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
580
2.70k
  if (!tablet) {
581
0
    return;
582
0
  }
583
584
2.70k
  if (req->indexes().empty()) {
585
0
    SetupErrorAndRespond(
586
0
        resp->mutable_error(),
587
0
        STATUS(InvalidArgument, "No indexes given in request"),
588
0
        TabletServerErrorPB::OPERATION_NOT_SUPPORTED,
589
0
        &context);
590
0
    return;
591
0
  }
592
593
2.70k
  const CoarseTimePoint &deadline = context.GetClientDeadline();
594
2.70k
  const auto coarse_start = CoarseMonoClock::Now();
595
2.70k
  {
596
2.70k
    std::unique_lock<std::mutex> l(backfill_lock_);
597
3.63k
    while (num_tablets_backfilling_ >= FLAGS_num_concurrent_backfills_allowed) {
598
934
      if (backfill_cond_.wait_until(l, deadline) == std::cv_status::timeout) {
599
0
        SetupErrorAndRespond(
600
0
            resp->mutable_error(),
601
0
            STATUS_FORMAT(ServiceUnavailable,
602
0
                          "Already running $0 backfill requests",
603
0
                          num_tablets_backfilling_),
604
0
            &context);
605
0
        return;
606
0
      }
607
934
    }
608
2.70k
    num_tablets_backfilling_++;
609
2.70k
  }
610
2.62k
  auto se = ScopeExit([this] {
611
2.62k
    std::unique_lock<std::mutex> l(this->backfill_lock_);
612
2.62k
    this->num_tablets_backfilling_--;
613
2.62k
    this->backfill_cond_.notify_all();
614
2.62k
  });
615
616
  // Wait for SafeTime to get past read_at;
617
2.70k
  const HybridTime read_at(req->read_at_hybrid_time());
618
18.4E
  DVLOG(1) << "Waiting for safe time to be past " << read_at;
619
2.70k
  const auto safe_time =
620
2.70k
      tablet.peer->tablet()->SafeTime(tablet::RequireLease::kFalse, read_at, deadline);
621
78
  DVLOG(1) << "Got safe time " << safe_time.ToString();
622
2.70k
  if (!safe_time.ok()) {
623
0
    LOG(ERROR) << "Could not get a good enough safe time " << safe_time.ToString();
624
0
    SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context);
625
0
    return;
626
0
  }
627
628
  // Don't work on the request if we have had to wait more than 50%
629
  // of the time allocated to us for the RPC.
630
  // Backfill is a costly operation, we do not want to start working
631
  // on it if we expect the client (master) to time out the RPC and
632
  // force us to redo the work.
633
2.70k
  const auto coarse_now = CoarseMonoClock::Now();
634
2.70k
  if (deadline - coarse_now < coarse_now - coarse_start) {
635
0
    SetupErrorAndRespond(
636
0
        resp->mutable_error(),
637
0
        STATUS_FORMAT(
638
0
            ServiceUnavailable, "Not enough time left $0", deadline - coarse_now),
639
0
        &context);
640
0
    return;
641
0
  }
642
643
2.70k
  bool all_at_backfill = true;
644
2.70k
  bool all_past_backfill = true;
645
2.70k
  bool is_pg_table = tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE;
646
2.70k
  const shared_ptr<IndexMap> index_map = tablet.peer->tablet_metadata()->index_map(
647
2.70k
    req->indexed_table_id());
648
2.70k
  std::vector<IndexInfo> indexes_to_backfill;
649
2.70k
  std::vector<TableId> index_ids;
650
2.65k
  for (const auto& idx : req->indexes()) {
651
2.65k
    auto result = index_map->FindIndex(idx.table_id());
652
2.65k
    if (result) {
653
2.64k
      const IndexInfo* index_info = *result;
654
2.64k
      indexes_to_backfill.push_back(*index_info);
655
2.64k
      index_ids.push_back(index_info->table_id());
656
657
2.64k
      IndexInfoPB idx_info_pb;
658
2.64k
      index_info->ToPB(&idx_info_pb);
659
2.64k
      if (!is_pg_table) {
660
2.40k
        all_at_backfill &=
661
2.40k
            idx_info_pb.index_permissions() == IndexPermissions::INDEX_PERM_DO_BACKFILL;
662
239
      } else {
663
        // YSQL tables don't use all the docdb permissions, so use this approximation.
664
        // TODO(jason): change this back to being like YCQL once we bring the docdb permission
665
        // DO_BACKFILL back (issue #6218).
666
239
        all_at_backfill &=
667
239
            idx_info_pb.index_permissions() == IndexPermissions::INDEX_PERM_WRITE_AND_DELETE;
668
239
      }
669
2.64k
      all_past_backfill &=
670
2.64k
          idx_info_pb.index_permissions() > IndexPermissions::INDEX_PERM_DO_BACKFILL;
671
6
    } else {
672
6
      LOG(WARNING) << "index " << idx.table_id() << " not found in tablet metadata";
673
6
      all_at_backfill = false;
674
6
      all_past_backfill = false;
675
6
    }
676
2.65k
  }
677
678
2.70k
  if (!all_at_backfill) {
679
0
    if (all_past_backfill) {
680
      // Change this to see if for all indexes: IndexPermission > DO_BACKFILL.
681
0
      LOG(WARNING) << "Received BackfillIndex RPC: " << req->DebugString()
682
0
                   << " after all indexes have moved past DO_BACKFILL. IndexMap is "
683
0
                   << ToString(index_map);
684
      // This is possible if this tablet completed the backfill. But the master failed over before
685
      // other tablets could complete.
686
      // The new master is redoing the backfill. We are safe to ignore this request.
687
0
      context.RespondSuccess();
688
0
      return;
689
0
    }
690
691
0
    uint32_t our_schema_version = tablet.peer->tablet_metadata()->schema_version();
692
0
    uint32_t their_schema_version = req->schema_version();
693
0
    DCHECK_NE(our_schema_version, their_schema_version);
694
0
    SetupErrorAndRespond(
695
0
        resp->mutable_error(),
696
0
        STATUS_SUBSTITUTE(
697
0
            InvalidArgument,
698
0
            "Tablet has a different schema $0 vs $1. "
699
0
            "Requested index is not ready to backfill. IndexMap: $2",
700
0
            our_schema_version, their_schema_version, ToString(index_map)),
701
0
        TabletServerErrorPB::MISMATCHED_SCHEMA, &context);
702
0
    return;
703
0
  }
704
705
2.70k
  Status backfill_status;
706
2.70k
  std::string backfilled_until;
707
2.70k
  std::unordered_set<TableId> failed_indexes;
708
2.70k
  size_t number_rows_processed = 0;
709
2.70k
  if (is_pg_table) {
710
235
    if (!req->has_namespace_name()) {
711
0
      SetupErrorAndRespond(
712
0
          resp->mutable_error(),
713
0
          STATUS(
714
0
              InvalidArgument,
715
0
              "Attempted backfill on YSQL table without supplying database name"),
716
0
          TabletServerErrorPB::OPERATION_NOT_SUPPORTED,
717
0
          &context);
718
0
      return;
719
0
    }
720
235
    backfill_status = tablet.peer->tablet()->BackfillIndexesForYsql(
721
235
        indexes_to_backfill,
722
235
        req->start_key(),
723
235
        deadline,
724
235
        read_at,
725
235
        server_->pgsql_proxy_bind_address(),
726
235
        req->namespace_name(),
727
235
        server_->GetSharedMemoryPostgresAuthKey(),
728
235
        &number_rows_processed,
729
235
        &backfilled_until);
730
235
    if (backfill_status.IsIllegalState()) {
731
0
      DCHECK_EQ(failed_indexes.size(), 0) << "We don't support batching in YSQL yet";
732
1
      for (const auto& idx_info : indexes_to_backfill) {
733
1
        failed_indexes.insert(idx_info.table_id());
734
1
      }
735
0
      DCHECK_EQ(failed_indexes.size(), 1) << "We don't support batching in YSQL yet";
736
1
    }
737
2.46k
  } else if (tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE) {
738
2.39k
    backfill_status = tablet.peer->tablet()->BackfillIndexes(
739
2.39k
        indexes_to_backfill,
740
2.39k
        req->start_key(),
741
2.39k
        deadline,
742
2.39k
        read_at,
743
2.39k
        &number_rows_processed,
744
2.39k
        &backfilled_until,
745
2.39k
        &failed_indexes);
746
76
  } else {
747
76
    SetupErrorAndRespond(
748
76
        resp->mutable_error(),
749
76
        STATUS(InvalidArgument, "Attempted backfill on tablet of invalid table type"),
750
76
        TabletServerErrorPB::OPERATION_NOT_SUPPORTED,
751
76
        &context);
752
76
    return;
753
76
  }
754
0
  DVLOG(1) << "Tablet " << tablet.peer->tablet_id() << " backfilled indexes "
755
0
           << yb::ToString(index_ids) << " and got " << backfill_status
756
0
           << " backfilled until : " << backfilled_until;
757
758
2.62k
  resp->set_backfilled_until(backfilled_until);
759
2.62k
  resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
760
2.62k
  resp->set_number_rows_processed(number_rows_processed);
761
762
2.62k
  if (!backfill_status.ok()) {
763
0
    VLOG(2) << " Failed indexes are " << yb::ToString(failed_indexes);
764
6
    for (const auto& idx : failed_indexes) {
765
6
      *resp->add_failed_index_ids() = idx;
766
6
    }
767
6
    SetupErrorAndRespond(
768
6
        resp->mutable_error(),
769
6
        backfill_status,
770
6
        (backfill_status.IsIllegalState()
771
6
            ? TabletServerErrorPB::OPERATION_NOT_SUPPORTED
772
0
            : TabletServerErrorPB::UNKNOWN_ERROR),
773
6
        &context);
774
6
    return;
775
6
  }
776
777
2.62k
  context.RespondSuccess();
778
2.62k
}
779
780
// Handles the ChangeMetadata (alter schema) admin RPC.
//
// The request is checked against the schema and schema version currently recorded for the
// target table (the table named by alter_table_id, or the primary table when it is absent):
//   - same version and same schema (and no WAL retention change): respond success immediately;
//   - same version but a different schema: respond with a Corruption / MISMATCHED_SCHEMA error;
//   - tablet version newer than the request: respond with TABLET_HAS_A_NEWER_SCHEMA.
// Otherwise writes are paused where required, active YSQL transactions are optionally
// aborted, and a ChangeMetadataOperation is submitted. In that case the RPC is answered
// asynchronously by the operation's completion callback.
void TabletServiceAdminImpl::AlterSchema(const tablet::ChangeMetadataRequestPB* req,
                                         ChangeMetadataResponsePB* resp,
                                         rpc::RpcContext context) {
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "ChangeMetadata", req, resp, &context)) {
    return;
  }
  VLOG(1) << "Received Change Metadata RPC: " << req->DebugString();
  if (FLAGS_TEST_alter_schema_delay_ms) {
    LOG(INFO) << __func__ << ": sleeping for " << FLAGS_TEST_alter_schema_delay_ms << "ms";
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_alter_schema_delay_ms));
    LOG(INFO) << __func__ << ": done sleeping for " << FLAGS_TEST_alter_schema_delay_ms << "ms";
  }

  server::UpdateClock(*req, server_->Clock());

  auto tablet = LookupLeaderTabletOrRespond(
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
  if (!tablet) {
    return;
  }

  tablet::TableInfoPtr table_info;
  if (req->has_alter_table_id()) {
    auto result = tablet.peer->tablet_metadata()->GetTableInfo(req->alter_table_id());
    if (!result.ok()) {
      SetupErrorAndRespond(resp->mutable_error(), result.status(),
                           TabletServerErrorPB::INVALID_SCHEMA, &context);
      return;
    }
    table_info = *result;
  } else {
    table_info = tablet.peer->tablet_metadata()->primary_table_info();
  }
  const Schema& tablet_schema = *table_info->schema;
  uint32_t schema_version = table_info->schema_version;
  // Sanity check, to verify that the tablet should have the same schema
  // specified in the request.
  Schema req_schema;
  Status s = SchemaFromPB(req->schema(), &req_schema);
  if (!s.ok()) {
    SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::INVALID_SCHEMA, &context);
    return;
  }

  // If the schema was already applied, respond as succeeded.
  if (!req->has_wal_retention_secs() && schema_version == req->schema_version()) {

    if (req_schema.Equals(tablet_schema)) {
      context.RespondSuccess();
      return;
    }

    // Re-read the version from the metadata before declaring a mismatch.
    schema_version = tablet.peer->tablet_metadata()->schema_version(
        req->has_alter_table_id() ? req->alter_table_id() : "");
    if (schema_version == req->schema_version()) {
      LOG(ERROR) << "The current schema does not match the request schema."
                 << " version=" << schema_version
                 << " current-schema=" << tablet_schema.ToString()
                 << " request-schema=" << req_schema.ToString()
                 << " (corruption)";
      SetupErrorAndRespond(resp->mutable_error(),
                           STATUS(Corruption, "got a different schema for the same version number"),
                           TabletServerErrorPB::MISMATCHED_SCHEMA, &context);
      return;
    }
  }

  // If the current schema is newer than the one in the request reject the request.
  if (schema_version > req->schema_version()) {
    LOG(ERROR) << "Tablet " << req->tablet_id() << " has a newer schema"
               << " version=" << schema_version
               << " req->schema_version()=" << req->schema_version()
               << "\n current-schema=" << tablet_schema.ToString()
               << "\n request-schema=" << req_schema.ToString();
    SetupErrorAndRespond(
        resp->mutable_error(),
        STATUS_SUBSTITUTE(
            InvalidArgument, "Tablet has a newer schema Tab $0. Req $1 vs Existing version : $2",
            req->tablet_id(), req->schema_version(), schema_version),
        TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, &context);
    return;
  }

  VLOG(1) << "Tablet updating schema from "
          << " version=" << schema_version << " current-schema=" << tablet_schema.ToString()
          << " to request-schema=" << req_schema.ToString()
          << " for table ID=" << table_info->table_id;
  ScopedRWOperationPause pause_writes;
  if ((tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE &&
       !GetAtomicFlag(&FLAGS_disable_alter_vs_write_mutual_exclusion)) ||
      tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) {
    // For schema change operations we will have to pause the write operations
    // until the schema change is done. This will be done synchronously.
    pause_writes = tablet.peer->tablet()->PauseWritePermits(context.GetClientDeadline());
    if (!pause_writes.ok()) {
      SetupErrorAndRespond(
          resp->mutable_error(),
          STATUS(
              TryAgain, "Could not lock the tablet against write operations for schema change"),
          &context);
      return;
    }

    // After write operation is paused, active transactions will be aborted for YSQL transactions.
    if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE &&
        req->should_abort_active_txns()) {
      DCHECK(req->has_transaction_id());
      auto* txn_participant = tablet.peer->tablet()->transaction_participant();
      if (txn_participant == nullptr) {
        auto status = STATUS(
            IllegalState, "Transaction participant is null for tablet " + req->tablet_id());
        LOG(ERROR) << status;
        SetupErrorAndRespond(
            resp->mutable_error(),
            status,
            &context);
        return;
      }

      // The transaction id comes from the request, so a malformed value is reported back to
      // the caller as an error instead of aborting the whole process.
      auto txn_id_result = TransactionId::FromString(req->transaction_id());
      if (!txn_id_result.ok()) {
        LOG(ERROR) << "Invalid transaction id in ChangeMetadata request for tablet "
                   << req->tablet_id() << ": " << txn_id_result.status();
        SetupErrorAndRespond(resp->mutable_error(), txn_id_result.status(), &context);
        return;
      }
      TransactionId txn_id = *txn_id_result;

      HybridTime max_cutoff = HybridTime::kMax;
      CoarseTimePoint deadline =
          CoarseMonoClock::Now() +
          MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_abort_timeout_ms);
      LOG(INFO) << "Aborting transactions that started prior to " << max_cutoff
                << " for tablet id " << req->tablet_id()
                << " excluding transaction with id " << txn_id;
      // There could be a chance where a transaction does not appear by transaction_participant
      // but has already begun replicating through Raft. Such transactions might succeed rather
      // than get aborted. This race condition is dismissable for this intermediate solution.
      Status abort_status = txn_participant->StopActiveTxnsPriorTo(max_cutoff, deadline, &txn_id);
      if (!abort_status.ok() ||
          PREDICT_FALSE(FLAGS_TEST_fail_alter_schema_after_abort_transactions)) {
        // The caller always gets TryAgain; the underlying status is logged so the real cause
        // of the failure is not lost.
        auto retry_status =
            STATUS(TryAgain, "Transaction abort failed for tablet " + req->tablet_id());
        LOG(WARNING) << retry_status << ", abort status: " << abort_status;
        SetupErrorAndRespond(
            resp->mutable_error(),
            retry_status,
            &context);
        return;
      }
    }
  }
  auto operation = std::make_unique<ChangeMetadataOperation>(
      tablet.peer->tablet(), tablet.peer->log(), req);

  operation->set_completion_callback(
      MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock()));
  // Hand the write-pause token to the operation.
  operation->UsePermitToken(std::move(pause_writes));

  // Submit the alter schema op. The RPC will be responded to asynchronously.
  tablet.peer->Submit(std::move(operation), tablet.leader_term);
}
931
932
11.0M
// Evaluates `expr` through RESULT_CHECKER_HELPER; when the result is not ok() the enclosing
// function returns (with no value). Intended for void functions.
#define VERIFY_RESULT_OR_RETURN(expr) \
    RESULT_CHECKER_HELPER(expr, if (!__result.ok()) { return; });
935
936
void TabletServiceImpl::VerifyTableRowRange(
937
    const VerifyTableRowRangeRequestPB* req,
938
    VerifyTableRowRangeResponsePB* resp,
939
0
    rpc::RpcContext context) {
940
0
  DVLOG(3) << "Received VerifyTableRowRange RPC: " << req->DebugString();
941
942
0
  server::UpdateClock(*req, server_->Clock());
943
944
0
  auto peer_tablet =
945
0
      LookupTabletPeerOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
946
0
  if (!peer_tablet) {
947
0
    return;
948
0
  }
949
950
0
  auto tablet = peer_tablet->tablet;
951
0
  bool is_pg_table = tablet->table_type() == TableType::PGSQL_TABLE_TYPE;
952
0
  if (is_pg_table) {
953
0
    SetupErrorAndRespond(
954
0
        resp->mutable_error(), STATUS(NotFound, "Verify operation not supported for PGSQL tables."),
955
0
        &context);
956
0
    return;
957
0
  }
958
959
0
  const CoarseTimePoint& deadline = context.GetClientDeadline();
960
961
  // Wait for SafeTime to get past read_at;
962
0
  const HybridTime read_at(req->read_time());
963
0
  DVLOG(1) << "Waiting for safe time to be past " << read_at;
964
0
  const auto safe_time = tablet->SafeTime(tablet::RequireLease::kFalse, read_at, deadline);
965
0
  DVLOG(1) << "Got safe time " << safe_time.ToString();
966
0
  if (!safe_time.ok()) {
967
0
    LOG(ERROR) << "Could not get a good enough safe time " << safe_time.ToString();
968
0
    SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context);
969
0
    return;
970
0
  }
971
972
0
  auto valid_read_at = req->has_read_time() ? read_at : *safe_time;
973
0
  std::string verified_until = "";
974
0
  std::unordered_map<TableId, uint64> consistency_stats;
975
976
0
  if (peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_info) {
977
0
    auto index_info =
978
0
        *peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_info;
979
0
    const auto& table_id = index_info.indexed_table_id();
980
0
    Status verify_status = tablet->VerifyMainTableConsistencyForCQL(
981
0
        table_id, req->start_key(), req->num_rows(), deadline, valid_read_at, &consistency_stats,
982
0
        &verified_until);
983
0
    if (!verify_status.ok()) {
984
0
      SetupErrorAndRespond(resp->mutable_error(), verify_status, &context);
985
0
      return;
986
0
    }
987
988
0
    (*resp->mutable_consistency_stats())[table_id] = consistency_stats[table_id];
989
0
  } else {
990
0
    const IndexMap index_map =
991
0
        *peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_map;
992
0
    vector<IndexInfo> indexes;
993
0
    vector<TableId> index_ids;
994
0
    if (req->index_ids().empty()) {
995
0
      for (auto it = index_map.begin(); it != index_map.end(); it++) {
996
0
        indexes.push_back(it->second);
997
0
      }
998
0
    } else {
999
0
      for (const auto& idx : req->index_ids()) {
1000
0
        auto result = index_map.FindIndex(idx);
1001
0
        if (result) {
1002
0
          const IndexInfo* index_info = *result;
1003
0
          indexes.push_back(*index_info);
1004
0
          index_ids.push_back(index_info->table_id());
1005
0
        } else {
1006
0
          LOG(WARNING) << "Index " << idx << " not found in tablet metadata";
1007
0
        }
1008
0
      }
1009
0
    }
1010
1011
0
    Status verify_status = tablet->VerifyIndexTableConsistencyForCQL(
1012
0
        indexes, req->start_key(), req->num_rows(), deadline, valid_read_at, &consistency_stats,
1013
0
        &verified_until);
1014
0
    if (!verify_status.ok()) {
1015
0
      SetupErrorAndRespond(resp->mutable_error(), verify_status, &context);
1016
0
      return;
1017
0
    }
1018
1019
0
    for (const IndexInfo& index : indexes) {
1020
0
      const auto& table_id = index.table_id();
1021
0
      (*resp->mutable_consistency_stats())[table_id] = consistency_stats[table_id];
1022
0
    }
1023
0
  }
1024
0
  resp->set_verified_until(verified_until);
1025
0
  context.RespondSuccess();
1026
0
}
1027
1028
void TabletServiceImpl::UpdateTransaction(const UpdateTransactionRequestPB* req,
1029
                                          UpdateTransactionResponsePB* resp,
1030
1.48M
                                          rpc::RpcContext context) {
1031
1.48M
  TRACE("UpdateTransaction");
1032
1033
1.48M
  if (req->state().status() == TransactionStatus::CREATED &&
1034
243k
      RandomActWithProbability(TEST_delay_create_transaction_probability)) {
1035
0
    std::this_thread::sleep_for(
1036
0
        (FLAGS_transaction_rpc_timeout_ms + RandomUniformInt(-200, 200)) * 1ms);
1037
0
  }
1038
1039
783
  VLOG(1) << "UpdateTransaction: " << req->ShortDebugString()
1040
783
          << ", context: " << context.ToString();
1041
1.77k
  LOG_IF(DFATAL, !req->has_propagated_hybrid_time())
1042
1.77k
      << __func__ << " missing propagated hybrid time for "
1043
1.77k
      << TransactionStatus_Name(req->state().status());
1044
1.48M
  UpdateClock(*req, server_->Clock());
1045
1046
1.48M
  LeaderTabletPeer tablet;
1047
1.48M
  auto txn_status = req->state().status();
1048
1.48M
  auto cleanup = txn_status == TransactionStatus::IMMEDIATE_CLEANUP ||
1049
1.12M
                 txn_status == TransactionStatus::GRACEFUL_CLEANUP;
1050
1.48M
  if (cleanup) {
1051
548k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1052
548k
        server_->tablet_peer_lookup(), req->tablet_id(), resp, &context));
1053
548k
    tablet.FillTabletPeer(std::move(peer_tablet));
1054
548k
    tablet.leader_term = OpId::kUnknownTerm;
1055
940k
  } else {
1056
940k
    tablet = LookupLeaderTabletOrRespond(
1057
940k
        server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1058
940k
  }
1059
1.48M
  if (!tablet) {
1060
6.71k
    return;
1061
6.71k
  }
1062
1063
1.48M
  auto state = std::make_unique<tablet::UpdateTxnOperation>(tablet.tablet.get(), &req->state());
1064
1.48M
  state->set_completion_callback(MakeRpcOperationCompletionCallback(
1065
1.48M
      std::move(context), resp, server_->Clock()));
1066
1067
1.48M
  if (req->state().status() == TransactionStatus::APPLYING || cleanup) {
1068
794k
    auto* participant = tablet.tablet->transaction_participant();
1069
794k
    if (participant) {
1070
793k
      participant->Handle(std::move(state), tablet.leader_term);
1071
1.21k
    } else {
1072
1.21k
      state->CompleteWithStatus(STATUS_FORMAT(
1073
1.21k
          InvalidArgument, "Does not have transaction participant to process $0",
1074
1.21k
          req->state().status()));
1075
1.21k
    }
1076
686k
  } else {
1077
686k
    auto* coordinator = tablet.tablet->transaction_coordinator();
1078
686k
    if (coordinator) {
1079
685k
      coordinator->Handle(std::move(state), tablet.leader_term);
1080
1.43k
    } else {
1081
1.43k
      state->CompleteWithStatus(STATUS_FORMAT(
1082
1.43k
          InvalidArgument, "Does not have transaction coordinator to process $0",
1083
1.43k
          req->state().status()));
1084
1.43k
    }
1085
686k
  }
1086
1.48M
}
1087
1088
template <class Req, class Resp, class Action>
1089
void TabletServiceImpl::PerformAtLeader(
1090
208k
    const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) {
1091
208k
  UpdateClock(*req, server_->Clock());
1092
1093
208k
  auto tablet_peer = LookupLeaderTabletOrRespond(
1094
208k
      server_->tablet_peer_lookup(), req->tablet_id(), resp, context);
1095
1096
208k
  if (!tablet_peer) {
1097
47
    return;
1098
47
  }
1099
1100
208k
  auto status = action(tablet_peer);
1101
1102
208k
  if (*context) {
1103
208k
    resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
1104
208k
    if (status.ok()) {
1105
208k
      context->RespondSuccess();
1106
18.4E
    } else {
1107
18.4E
      SetupErrorAndRespond(resp->mutable_error(), status, context);
1108
18.4E
    }
1109
208k
  }
1110
208k
}
tablet_service.cc:_ZN2yb7tserver17TabletServiceImpl15PerformAtLeaderIPKNS0_29GetTransactionStatusRequestPBENS0_30GetTransactionStatusResponsePBEZNS1_20GetTransactionStatusES5_PS6_NS_3rpc10RpcContextEE3$_1EEvRKT_PT0_PS9_RKT1_
Line
Count
Source
1090
208k
    const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) {
1091
208k
  UpdateClock(*req, server_->Clock());
1092
1093
208k
  auto tablet_peer = LookupLeaderTabletOrRespond(
1094
208k
      server_->tablet_peer_lookup(), req->tablet_id(), resp, context);
1095
1096
208k
  if (!tablet_peer) {
1097
45
    return;
1098
45
  }
1099
1100
208k
  auto status = action(tablet_peer);
1101
1102
208k
  if (*context) {
1103
208k
    resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
1104
208k
    if (status.ok()) {
1105
208k
      context->RespondSuccess();
1106
18.4E
    } else {
1107
18.4E
      SetupErrorAndRespond(resp->mutable_error(), status, context);
1108
18.4E
    }
1109
208k
  }
1110
208k
}
Unexecuted instantiation: tablet_service.cc:_ZN2yb7tserver17TabletServiceImpl15PerformAtLeaderIPKNS0_42GetTransactionStatusAtParticipantRequestPBENS0_43GetTransactionStatusAtParticipantResponsePBEZNS1_33GetTransactionStatusAtParticipantES5_PS6_NS_3rpc10RpcContextEE3$_2EEvRKT_PT0_PS9_RKT1_
tablet_service.cc:_ZN2yb7tserver17TabletServiceImpl15PerformAtLeaderIPKNS0_20GetSplitKeyRequestPBENS0_21GetSplitKeyResponsePBEZNS1_11GetSplitKeyES5_PS6_NS_3rpc10RpcContextEE3$_4EEvRKT_PT0_PS9_RKT1_
Line
Count
Source
1090
49
    const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) {
1091
49
  UpdateClock(*req, server_->Clock());
1092
1093
49
  auto tablet_peer = LookupLeaderTabletOrRespond(
1094
49
      server_->tablet_peer_lookup(), req->tablet_id(), resp, context);
1095
1096
49
  if (!tablet_peer) {
1097
2
    return;
1098
2
  }
1099
1100
47
  auto status = action(tablet_peer);
1101
1102
47
  if (*context) {
1103
47
    resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
1104
47
    if (status.ok()) {
1105
46
      context->RespondSuccess();
1106
1
    } else {
1107
1
      SetupErrorAndRespond(resp->mutable_error(), status, context);
1108
1
    }
1109
47
  }
1110
47
}
1111
1112
void TabletServiceImpl::GetTransactionStatus(const GetTransactionStatusRequestPB* req,
1113
                                             GetTransactionStatusResponsePB* resp,
1114
208k
                                             rpc::RpcContext context) {
1115
208k
  TRACE("GetTransactionStatus");
1116
1117
208k
  PerformAtLeader(req, resp, &context,
1118
208k
      [req, resp, &context](const LeaderTabletPeer& tablet_peer) {
1119
208k
    auto* transaction_coordinator = tablet_peer.tablet->transaction_coordinator();
1120
208k
    if (!transaction_coordinator) {
1121
0
      return STATUS_FORMAT(
1122
0
          InvalidArgument, "No transaction coordinator at tablet $0",
1123
0
          tablet_peer.peer->tablet_id());
1124
0
    }
1125
208k
    return transaction_coordinator->GetStatus(
1126
208k
        req->transaction_id(), context.GetClientDeadline(), resp);
1127
208k
  });
1128
208k
}
1129
1130
void TabletServiceImpl::GetTransactionStatusAtParticipant(
1131
    const GetTransactionStatusAtParticipantRequestPB* req,
1132
    GetTransactionStatusAtParticipantResponsePB* resp,
1133
0
    rpc::RpcContext context) {
1134
0
  TRACE("GetTransactionStatusAtParticipant");
1135
1136
0
  PerformAtLeader(req, resp, &context,
1137
0
      [req, resp, &context](const LeaderTabletPeer& tablet_peer) -> Status {
1138
0
    auto* transaction_participant = tablet_peer.peer->tablet()->transaction_participant();
1139
0
    if (!transaction_participant) {
1140
0
      return STATUS_FORMAT(
1141
0
          InvalidArgument, "No transaction participant at tablet $0",
1142
0
          tablet_peer.peer->tablet_id());
1143
0
    }
1144
1145
0
    transaction_participant->GetStatus(
1146
0
        VERIFY_RESULT(FullyDecodeTransactionId(req->transaction_id())),
1147
0
        req->required_num_replicated_batches(), tablet_peer.leader_term, resp, &context);
1148
0
    return Status::OK();
1149
0
  });
1150
0
}
1151
1152
void TabletServiceImpl::AbortTransaction(const AbortTransactionRequestPB* req,
1153
                                         AbortTransactionResponsePB* resp,
1154
126k
                                         rpc::RpcContext context) {
1155
126k
  TRACE("AbortTransaction");
1156
1157
126k
  UpdateClock(*req, server_->Clock());
1158
1159
126k
  auto tablet = LookupLeaderTabletOrRespond(
1160
126k
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1161
126k
  if (!tablet) {
1162
9.80k
    return;
1163
9.80k
  }
1164
1165
116k
  server::ClockPtr clock(server_->Clock());
1166
116k
  auto context_ptr = std::make_shared<rpc::RpcContext>(std::move(context));
1167
116k
  tablet.peer->tablet()->transaction_coordinator()->Abort(
1168
116k
      req->transaction_id(),
1169
116k
      tablet.leader_term,
1170
115k
      [resp, context_ptr, clock, peer = tablet.peer](Result<TransactionStatusResult> result) {
1171
115k
        resp->set_propagated_hybrid_time(clock->Now().ToUint64());
1172
115k
        Status status;
1173
115k
        if (result.ok()) {
1174
115k
          auto leader_safe_time = peer->LeaderSafeTime();
1175
115k
          if (leader_safe_time.ok()) {
1176
115k
            resp->set_status(result->status);
1177
115k
            if (result->status_time.is_valid()) {
1178
25.7k
              resp->set_status_hybrid_time(result->status_time.ToUint64());
1179
25.7k
            }
1180
            // See comment above WaitForSafeTime in TransactionStatusCache::DoGetCommitData
1181
            // for details.
1182
115k
            resp->set_coordinator_safe_time(leader_safe_time->ToUint64());
1183
115k
            context_ptr->RespondSuccess();
1184
115k
            return;
1185
115k
          }
1186
1187
13
          status = leader_safe_time.status();
1188
18.4E
        } else {
1189
18.4E
          status = result.status();
1190
18.4E
        }
1191
18.4E
        SetupErrorAndRespond(resp->mutable_error(), status, context_ptr.get());
1192
18.4E
      });
1193
116k
}
1194
1195
void TabletServiceImpl::Truncate(const TruncateRequestPB* req,
1196
                                 TruncateResponsePB* resp,
1197
53.6k
                                 rpc::RpcContext context) {
1198
53.6k
  TRACE("Truncate");
1199
1200
53.6k
  UpdateClock(*req, server_->Clock());
1201
1202
53.6k
  auto tablet = LookupLeaderTabletOrRespond(
1203
53.6k
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1204
53.6k
  if (!tablet) {
1205
9
    return;
1206
9
  }
1207
1208
53.5k
  auto operation = std::make_unique<TruncateOperation>(tablet.peer->tablet(), &req->truncate());
1209
1210
53.5k
  operation->set_completion_callback(
1211
53.5k
      MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock()));
1212
1213
  // Submit the truncate tablet op. The RPC will be responded to asynchronously.
1214
53.5k
  tablet.peer->Submit(std::move(operation), tablet.leader_term);
1215
53.5k
}
1216
1217
void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req,
1218
                                          CreateTabletResponsePB* resp,
1219
81.9k
                                          rpc::RpcContext context) {
1220
81.9k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "CreateTablet", req, resp, &context)) {
1221
0
    return;
1222
0
  }
1223
81.9k
  auto status = DoCreateTablet(req, resp);
1224
81.9k
  if (!status.ok()) {
1225
6
    SetupErrorAndRespond(resp->mutable_error(), status, &context);
1226
81.9k
  } else {
1227
81.9k
    context.RespondSuccess();
1228
81.9k
  }
1229
81.9k
}
1230
1231
// Creates a new tablet as described by the request.
//
// Steps: optional test-only delay for transaction status tablets, decoding of the schema,
// partition schema and partition, decoding of the snapshot schedule ids, and finally the call
// into the tablet manager.
//
// Returns:
//   - a status tagged with INVALID_SCHEMA when the schema or partition schema cannot be decoded;
//   - the decode error when a snapshot schedule id cannot be decoded;
//   - a status tagged with TABLET_ALREADY_EXISTS when the tablet manager reports AlreadyPresent;
//   - any other tablet manager error unchanged;
//   - OK otherwise.
Status TabletServiceAdminImpl::DoCreateTablet(const CreateTabletRequestPB* req,
                                              CreateTabletResponsePB* resp) {
  const bool delay_txn_status_tablet =
      FLAGS_TEST_txn_status_table_tablet_creation_delay_ms > 0 &&
      req->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE;
  if (PREDICT_FALSE(delay_txn_status_tablet)) {
    std::this_thread::sleep_for(FLAGS_TEST_txn_status_table_tablet_creation_delay_ms * 1ms);
  }

  DVLOG(3) << "Received CreateTablet RPC: " << yb::ToString(*req);
  TRACE_EVENT1("tserver", "CreateTablet",
               "tablet_id", req->tablet_id());

  // Decode the schema first; the partition schema is only decoded when the schema is valid.
  Schema schema;
  PartitionSchema partition_schema;
  auto s = SchemaFromPB(req->schema(), &schema);
  if (s.ok()) {
    DCHECK(schema.has_column_ids());
    s = PartitionSchema::FromPB(req->partition_schema(), schema, &partition_schema);
  }
  if (!s.ok()) {
    return s.CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::INVALID_SCHEMA));
  }

  Partition partition;
  Partition::FromPB(req->partition(), &partition);

  LOG(INFO) << "Processing CreateTablet for T " << req->tablet_id() << " P " << req->dest_uuid()
            << " (table=" << req->table_name()
            << " [id=" << req->table_id() << "]), partition="
            << partition_schema.PartitionDebugString(partition, schema);
  VLOG(1) << "Full request: " << req->DebugString();

  auto table_info = std::make_shared<tablet::TableInfo>(
      req->table_id(), req->namespace_name(), req->table_name(), req->table_type(), schema,
      IndexMap(),
      req->has_index_info() ? boost::optional<IndexInfo>(req->index_info()) : boost::none,
      0 /* schema_version */, partition_schema);

  std::vector<SnapshotScheduleId> snapshot_schedules;
  snapshot_schedules.reserve(req->snapshot_schedules().size());
  for (const auto& encoded_id : req->snapshot_schedules()) {
    snapshot_schedules.push_back(VERIFY_RESULT(FullyDecodeSnapshotScheduleId(encoded_id)));
  }

  s = ResultToStatus(server_->tablet_manager()->CreateNewTablet(
      table_info, req->tablet_id(), partition, req->config(), req->colocated(),
      snapshot_schedules));
  if (PREDICT_FALSE(!s.ok())) {
    if (s.IsAlreadyPresent()) {
      return s.CloneAndAddErrorCode(
          TabletServerError(TabletServerErrorPB::TABLET_ALREADY_EXISTS));
    }
    return s;
  }
  return Status::OK();
}
1282
1283
void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req,
1284
                                          DeleteTabletResponsePB* resp,
1285
49.3k
                                          rpc::RpcContext context) {
1286
49.3k
  if (PREDICT_FALSE(FLAGS_TEST_rpc_delete_tablet_fail)) {
1287
0
    context.RespondFailure(STATUS(NetworkError, "Simulating network partition for test"));
1288
0
    return;
1289
0
  }
1290
1291
49.3k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "DeleteTablet", req, resp, &context)) {
1292
0
    return;
1293
0
  }
1294
49.3k
  TRACE_EVENT2("tserver", "DeleteTablet",
1295
49.3k
               "tablet_id", req->tablet_id(),
1296
49.3k
               "reason", req->reason());
1297
1298
49.3k
  tablet::TabletDataState delete_type = tablet::TABLET_DATA_UNKNOWN;
1299
49.3k
  if (req->has_delete_type()) {
1300
49.0k
    delete_type = req->delete_type();
1301
49.0k
  }
1302
49.3k
  LOG(INFO) << "T " << req->tablet_id() << " P " << server_->permanent_uuid()
1303
49.3k
            << ": Processing DeleteTablet with delete_type " << TabletDataState_Name(delete_type)
1304
49.2k
            << (req->has_reason() ? (" (" + req->reason() + ")") : "")
1305
49.3k
            << (req->hide_only() ? " (Hide only)" : "")
1306
49.3k
            << " from " << context.requestor_string();
1307
18.4E
  VLOG(1) << "Full request: " << req->DebugString();
1308
1309
49.3k
  boost::optional<int64_t> cas_config_opid_index_less_or_equal;
1310
49.3k
  if (req->has_cas_config_opid_index_less_or_equal()) {
1311
2.42k
    cas_config_opid_index_less_or_equal = req->cas_config_opid_index_less_or_equal();
1312
2.42k
  }
1313
49.3k
  boost::optional<TabletServerErrorPB::Code> error_code;
1314
49.3k
  Status s = server_->tablet_manager()->DeleteTablet(req->tablet_id(),
1315
49.3k
                                                     delete_type,
1316
49.3k
                                                     cas_config_opid_index_less_or_equal,
1317
49.3k
                                                     req->hide_only(),
1318
49.3k
                                                     &error_code);
1319
49.3k
  if (PREDICT_FALSE(!s.ok())) {
1320
1.76k
    HandleErrorResponse(resp, &context, s, error_code);
1321
1.76k
    return;
1322
1.76k
  }
1323
47.5k
  context.RespondSuccess();
1324
47.5k
}
1325
1326
// TODO(sagnik): Modify this to actually create a copartitioned table
1327
void TabletServiceAdminImpl::CopartitionTable(const CopartitionTableRequestPB* req,
1328
                                              CopartitionTableResponsePB* resp,
1329
0
                                              rpc::RpcContext context) {
1330
0
  context.RespondSuccess();
1331
0
  LOG(INFO) << "tserver doesn't support co-partitioning yet";
1332
0
}
1333
1334
void TabletServiceAdminImpl::FlushTablets(const FlushTabletsRequestPB* req,
1335
                                          FlushTabletsResponsePB* resp,
1336
30
                                          rpc::RpcContext context) {
1337
30
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "FlushTablets", req, resp, &context)) {
1338
0
    return;
1339
0
  }
1340
1341
30
  if (!req->all_tablets() && req->tablet_ids_size() == 0) {
1342
0
    const Status s = STATUS(InvalidArgument, "No tablet ids");
1343
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1344
0
    return;
1345
0
  }
1346
1347
30
  server::UpdateClock(*req, server_->Clock());
1348
1349
30
  TRACE_EVENT1("tserver", "FlushTablets",
1350
30
               "TS: ", req->dest_uuid());
1351
1352
30
  LOG(INFO) << "Processing FlushTablets from " << context.requestor_string();
1353
0
  VLOG(1) << "Full FlushTablets request: " << req->DebugString();
1354
30
  TabletPeers tablet_peers;
1355
30
  TSTabletManager::TabletPtrs tablet_ptrs;
1356
1357
30
  if (req->all_tablets()) {
1358
4
    tablet_peers = server_->tablet_manager()->GetTabletPeers(&tablet_ptrs);
1359
26
  } else {
1360
26
    for (const TabletId& id : req->tablet_ids()) {
1361
26
      auto tablet_peer = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1362
26
          server_->tablet_peer_lookup(), id, resp, &context));
1363
26
      tablet_peers.push_back(std::move(tablet_peer.tablet_peer));
1364
26
      auto tablet = tablet_peer.tablet;
1365
26
      if (tablet != nullptr) {
1366
26
        tablet_ptrs.push_back(std::move(tablet));
1367
26
      }
1368
26
    }
1369
26
  }
1370
30
  switch (req->operation()) {
1371
26
    case FlushTabletsRequestPB::FLUSH:
1372
26
      for (const tablet::TabletPtr& tablet : tablet_ptrs) {
1373
26
        resp->set_failed_tablet_id(tablet->tablet_id());
1374
26
        RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->Flush(tablet::FlushMode::kAsync), resp, &context);
1375
26
        resp->clear_failed_tablet_id();
1376
26
      }
1377
1378
      // Wait for end of all flush operations.
1379
26
      for (const tablet::TabletPtr& tablet : tablet_ptrs) {
1380
26
        resp->set_failed_tablet_id(tablet->tablet_id());
1381
26
        RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->WaitForFlush(), resp, &context);
1382
26
        resp->clear_failed_tablet_id();
1383
26
      }
1384
26
      break;
1385
0
    case FlushTabletsRequestPB::COMPACT:
1386
0
      RETURN_UNKNOWN_ERROR_IF_NOT_OK(
1387
0
          server_->tablet_manager()->TriggerCompactionAndWait(tablet_ptrs), resp, &context);
1388
0
      break;
1389
4
    case FlushTabletsRequestPB::LOG_GC:
1390
4
      for (const auto& tablet : tablet_peers) {
1391
4
        resp->set_failed_tablet_id(tablet->tablet_id());
1392
4
        RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->RunLogGC(), resp, &context);
1393
4
        resp->clear_failed_tablet_id();
1394
4
      }
1395
4
      break;
1396
30
  }
1397
1398
30
  context.RespondSuccess();
1399
30
}
1400
1401
void TabletServiceAdminImpl::CountIntents(
1402
    const CountIntentsRequestPB* req,
1403
    CountIntentsResponsePB* resp,
1404
0
    rpc::RpcContext context) {
1405
0
  TSTabletManager::TabletPtrs tablet_ptrs;
1406
0
  TabletPeers tablet_peers = server_->tablet_manager()->GetTabletPeers(&tablet_ptrs);
1407
0
  int64_t total_intents = 0;
1408
  // TODO: do this in parallel.
1409
  // TODO: per-tablet intent counts.
1410
0
  for (const auto& tablet : tablet_ptrs) {
1411
0
    auto num_intents = tablet->CountIntents();
1412
0
    if (!num_intents.ok()) {
1413
0
      SetupErrorAndRespond(resp->mutable_error(), num_intents.status(), &context);
1414
0
      return;
1415
0
    }
1416
0
    total_intents += *num_intents;
1417
0
  }
1418
0
  resp->set_num_intents(total_intents);
1419
0
  context.RespondSuccess();
1420
0
}
1421
1422
void TabletServiceAdminImpl::AddTableToTablet(
1423
    const AddTableToTabletRequestPB* req, AddTableToTabletResponsePB* resp,
1424
27
    rpc::RpcContext context) {
1425
27
  auto tablet_id = req->tablet_id();
1426
1427
27
  const auto tablet =
1428
27
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), tablet_id, resp, &context);
1429
27
  if (!tablet) {
1430
0
    return;
1431
0
  }
1432
0
  DVLOG(3) << "Received AddTableToTablet RPC: " << yb::ToString(*req);
1433
1434
27
  tablet::ChangeMetadataRequestPB change_req;
1435
27
  *change_req.mutable_add_table() = req->add_table();
1436
27
  change_req.set_tablet_id(tablet_id);
1437
27
  Status s = tablet::SyncReplicateChangeMetadataOperation(
1438
27
      &change_req, tablet.peer.get(), tablet.leader_term);
1439
27
  if (PREDICT_FALSE(!s.ok())) {
1440
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1441
0
    return;
1442
0
  }
1443
27
  context.RespondSuccess();
1444
27
}
1445
1446
void TabletServiceAdminImpl::RemoveTableFromTablet(
1447
    const RemoveTableFromTabletRequestPB* req,
1448
    RemoveTableFromTabletResponsePB* resp,
1449
15
    rpc::RpcContext context) {
1450
15
  auto tablet =
1451
15
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1452
15
  if (!tablet) {
1453
0
    return;
1454
0
  }
1455
1456
15
  tablet::ChangeMetadataRequestPB change_req;
1457
15
  change_req.set_remove_table_id(req->remove_table_id());
1458
15
  change_req.set_tablet_id(req->tablet_id());
1459
15
  Status s = tablet::SyncReplicateChangeMetadataOperation(
1460
15
      &change_req, tablet.peer.get(), tablet.leader_term);
1461
15
  if (PREDICT_FALSE(!s.ok())) {
1462
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1463
0
    return;
1464
0
  }
1465
15
  context.RespondSuccess();
1466
15
}
1467
1468
void TabletServiceAdminImpl::SplitTablet(
1469
44
    const tablet::SplitTabletRequestPB* req, SplitTabletResponsePB* resp, rpc::RpcContext context) {
1470
44
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "SplitTablet", req, resp, &context)) {
1471
0
    return;
1472
0
  }
1473
44
  if (PREDICT_FALSE(FLAGS_TEST_fail_tablet_split_probability > 0) &&
1474
0
      RandomActWithProbability(FLAGS_TEST_fail_tablet_split_probability)) {
1475
0
    return SetupErrorAndRespond(
1476
0
        resp->mutable_error(),
1477
0
        STATUS(InvalidArgument,  // Use InvalidArgument to hit IsDefinitelyPermanentError().
1478
0
            "Failing tablet split due to FLAGS_TEST_fail_tablet_split_probability"),
1479
0
        TabletServerErrorPB::UNKNOWN_ERROR,
1480
0
        &context);
1481
0
  }
1482
44
  TRACE_EVENT1("tserver", "SplitTablet", "tablet_id", req->tablet_id());
1483
1484
44
  server::UpdateClock(*req, server_->Clock());
1485
44
  auto leader_tablet_peer =
1486
44
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1487
44
  if (!leader_tablet_peer) {
1488
0
    return;
1489
0
  }
1490
1491
44
  {
1492
44
    auto tablet_data_state = leader_tablet_peer.peer->data_state();
1493
44
    if (tablet_data_state != tablet::TABLET_DATA_READY) {
1494
25
      auto s = tablet_data_state == tablet::TABLET_DATA_SPLIT_COMPLETED
1495
25
                  ? STATUS_FORMAT(AlreadyPresent, "Tablet $0 is already split.", req->tablet_id())
1496
0
                  : STATUS_FORMAT(
1497
25
                        InvalidArgument, "Invalid tablet $0 data state: $1", req->tablet_id(),
1498
25
                        tablet_data_state);
1499
25
      SetupErrorAndRespond(
1500
25
          resp->mutable_error(), s, TabletServerErrorPB::TABLET_NOT_RUNNING, &context);
1501
25
      return;
1502
25
    }
1503
19
  }
1504
1505
19
  auto state = std::make_unique<tablet::SplitOperation>(
1506
19
      leader_tablet_peer.peer->tablet(), server_->tablet_manager(), req);
1507
1508
19
  state->set_completion_callback(
1509
19
      MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock()));
1510
1511
19
  leader_tablet_peer.peer->Submit(std::move(state), leader_tablet_peer.leader_term);
1512
19
}
1513
1514
void TabletServiceAdminImpl::UpgradeYsql(
1515
    const UpgradeYsqlRequestPB* req,
1516
    UpgradeYsqlResponsePB* resp,
1517
0
    rpc::RpcContext context) {
1518
0
  LOG(INFO) << "Starting YSQL upgrade";
1519
1520
0
  pgwrapper::YsqlUpgradeHelper upgrade_helper(server_->pgsql_proxy_bind_address(),
1521
0
                                              server_->GetSharedMemoryPostgresAuthKey(),
1522
0
                                              FLAGS_heartbeat_interval_ms);
1523
0
  const auto status = upgrade_helper.Upgrade();
1524
0
  if (!status.ok()) {
1525
0
    LOG(INFO) << "YSQL upgrade failed: " << status;
1526
0
    SetupErrorAndRespond(resp->mutable_error(), status, &context);
1527
0
    return;
1528
0
  }
1529
1530
0
  LOG(INFO) << "YSQL upgrade done successfully";
1531
0
  context.RespondSuccess();
1532
0
}
1533
1534
1535
0
// Returns true when the write batch has neither write pairs nor external transactions to apply.
bool EmptyWriteBatch(const docdb::KeyValueWriteBatchPB& write_batch) {
  if (!write_batch.write_pairs().empty()) {
    return false;
  }
  return write_batch.apply_external_transactions().empty();
}
1538
1539
void TabletServiceImpl::Write(const WriteRequestPB* req,
1540
                              WriteResponsePB* resp,
1541
1.39M
                              rpc::RpcContext context) {
1542
1.39M
  if (FLAGS_TEST_tserver_noop_read_write) {
1543
0
    for (int i = 0; i < req->ql_write_batch_size(); ++i) {
1544
0
      resp->add_ql_response_batch();
1545
0
    }
1546
0
    context.RespondSuccess();
1547
0
    return;
1548
0
  }
1549
1.39M
  TRACE("Start Write");
1550
1.39M
  TRACE_EVENT1("tserver", "TabletServiceImpl::Write",
1551
1.39M
               "tablet_id", req->tablet_id());
1552
147
  VLOG(2) << "Received Write RPC: " << req->DebugString();
1553
1.39M
  UpdateClock(*req, server_->Clock());
1554
1555
1.39M
  auto tablet = LookupLeaderTabletOrRespond(
1556
1.39M
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1557
1.39M
  if (!tablet ||
1558
1.34M
      !CheckWriteThrottlingOrRespond(
1559
45.6k
          req->rejection_score(), tablet.peer.get(), resp, &context)) {
1560
45.6k
    return;
1561
45.6k
  }
1562
1563
1.34M
  if (tablet.peer->tablet()->metadata()->hidden()) {
1564
0
    auto status = STATUS(NotFound, "Tablet not found", req->tablet_id());
1565
0
    SetupErrorAndRespond(
1566
0
        resp->mutable_error(), status, TabletServerErrorPB::TABLET_NOT_FOUND, &context);
1567
0
    return;
1568
0
  }
1569
1570
#if defined(DUMP_WRITE)
1571
  if (req->has_write_batch() && req->write_batch().has_transaction()) {
1572
    VLOG(1) << "Write with transaction: " << req->write_batch().transaction().ShortDebugString();
1573
    if (req->pgsql_write_batch_size() != 0) {
1574
      auto txn_id = CHECK_RESULT(FullyDecodeTransactionId(
1575
          req->write_batch().transaction().transaction_id()));
1576
      for (const auto& entry : req->pgsql_write_batch()) {
1577
        if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) {
1578
          auto key = entry.column_new_values(0).expr().value().int32_value();
1579
          LOG(INFO) << txn_id << " UPDATE: " << key << " = "
1580
                    << entry.column_new_values(1).expr().value().string_value();
1581
        } else if (
1582
            entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT ||
1583
            entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT) {
1584
          docdb::DocKey doc_key;
1585
          CHECK_OK(doc_key.FullyDecodeFrom(entry.ybctid_column_value().value().binary_value()));
1586
          LOG(INFO) << txn_id << " INSERT: " << doc_key.hashed_group()[0].GetInt32() << " = "
1587
                    << entry.column_values(0).expr().value().string_value();
1588
        } else if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_DELETE) {
1589
          LOG(INFO) << txn_id << " DELETE: " << entry.ShortDebugString();
1590
        }
1591
      }
1592
    }
1593
  }
1594
#endif
1595
1596
1.34M
  if (PREDICT_FALSE(req->has_write_batch() && !req->has_external_hybrid_time() &&
1597
0
      (!req->write_batch().write_pairs().empty() || !req->write_batch().read_pairs().empty()))) {
1598
0
    Status s = STATUS(NotSupported, "Write Request contains write batch. This field should be "
1599
0
        "used only for post-processed write requests during "
1600
0
        "Raft replication.");
1601
0
    SetupErrorAndRespond(resp->mutable_error(), s,
1602
0
                         TabletServerErrorPB::INVALID_MUTATION,
1603
0
                         &context);
1604
0
    return;
1605
0
  }
1606
1607
1.34M
  bool has_operations = req->ql_write_batch_size() != 0 ||
1608
319k
                        req->redis_write_batch_size() != 0 ||
1609
258k
                        req->pgsql_write_batch_size() != 0 ||
1610
1
                        (req->has_external_hybrid_time() && !EmptyWriteBatch(req->write_batch()));
1611
1.34M
  if (!has_operations && tablet.peer->tablet()->table_type() != TableType::REDIS_TABLE_TYPE) {
1612
    // An empty request. This is fine, can just exit early with ok status instead of working hard.
1613
    // This doesn't need to go to Raft log.
1614
1
    MakeRpcOperationCompletionCallback<WriteResponsePB>(
1615
1
        std::move(context), resp, server_->Clock())(Status::OK());
1616
1
    return;
1617
1
  }
1618
1619
  // For postgres requests check that the syscatalog version matches.
1620
1.34M
  if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) {
1621
214k
    uint64_t last_breaking_catalog_version = 0; // unset.
1622
1.90M
    for (const auto& pg_req : req->pgsql_write_batch()) {
1623
1.90M
      if (pg_req.has_ysql_catalog_version()) {
1624
1.90M
        if (last_breaking_catalog_version == 0) {
1625
          // Initialize last breaking version if not yet set.
1626
214k
          server_->get_ysql_catalog_version(nullptr /* current_version */,
1627
214k
                                            &last_breaking_catalog_version);
1628
214k
        }
1629
1.90M
        if (pg_req.ysql_catalog_version() < last_breaking_catalog_version) {
1630
1
          SetupErrorAndRespond(resp->mutable_error(),
1631
1
              STATUS_SUBSTITUTE(QLError, "The catalog snapshot used for this "
1632
1
                                         "transaction has been invalidated."),
1633
1
              TabletServerErrorPB::MISMATCHED_SCHEMA, &context);
1634
1
          return;
1635
1
        }
1636
1.90M
      }
1637
1.90M
    }
1638
214k
  }
1639
1640
1.34M
  auto query = std::make_unique<tablet::WriteQuery>(
1641
1.34M
      tablet.leader_term, context.GetClientDeadline(), tablet.peer.get(),
1642
1.34M
      tablet.peer->tablet(), resp);
1643
1.34M
  query->set_client_request(*req);
1644
1645
1.34M
  auto context_ptr = std::make_shared<RpcContext>(std::move(context));
1646
1.34M
  if (RandomActWithProbability(GetAtomicFlag(&FLAGS_TEST_respond_write_failed_probability))) {
1647
0
    LOG(INFO) << "Responding with a failure to " << req->DebugString();
1648
0
    SetupErrorAndRespond(resp->mutable_error(), STATUS(LeaderHasNoLease, "TEST: Random failure"),
1649
0
                         context_ptr.get());
1650
1.34M
  } else {
1651
1.34M
    query->set_callback(WriteQueryCompletionCallback(
1652
1.34M
        tablet.peer, context_ptr, resp, query.get(), server_->Clock(), req->include_trace()));
1653
1.34M
  }
1654
1655
1.34M
  query->AdjustYsqlQueryTransactionality(req->pgsql_write_batch_size());
1656
1657
1.34M
  tablet.peer->WriteAsync(std::move(query));
1658
1.34M
}
1659
1660
void TabletServiceImpl::Read(const ReadRequestPB* req,
1661
                             ReadResponsePB* resp,
1662
4.70M
                             rpc::RpcContext context) {
1663
4.70M
  if (FLAGS_TEST_tserver_noop_read_write) {
1664
0
    context.RespondSuccess();
1665
0
    return;
1666
0
  }
1667
1668
4.70M
  PerformRead(server_, this, req, resp, std::move(context));
1669
4.70M
}
1670
1671
// Constructs the consensus service: forwards the metric entity to the generated service base
// class and stores the tablet peer lookup interface used by the RPC handlers.
ConsensusServiceImpl::ConsensusServiceImpl(const scoped_refptr<MetricEntity>& metric_entity,
                                           TabletPeerLookupIf* tablet_manager)
    : ConsensusServiceIf(metric_entity), tablet_manager_(tablet_manager) {}
1676
1677
160
// Nothing to release explicitly; the destructor is still defined out of line in this file.
ConsensusServiceImpl::~ConsensusServiceImpl() = default;
1679
1680
void ConsensusServiceImpl::CompleteUpdateConsensusResponse(
1681
    std::shared_ptr<tablet::TabletPeer> tablet_peer,
1682
10.2M
    consensus::ConsensusResponsePB* resp) {
1683
10.2M
  auto tablet = tablet_peer->shared_tablet();
1684
10.2M
  if (tablet) {
1685
10.2M
    resp->set_num_sst_files(tablet->GetCurrentVersionNumSSTFiles());
1686
10.2M
  }
1687
10.2M
  resp->set_propagated_hybrid_time(tablet_peer->clock().Now().ToUint64());
1688
10.2M
}
1689
1690
void ConsensusServiceImpl::MultiRaftUpdateConsensus(
1691
      const consensus::MultiRaftConsensusRequestPB *req,
1692
      consensus::MultiRaftConsensusResponsePB *resp,
1693
0
      rpc::RpcContext context) {
1694
0
    DVLOG(3) << "Received Batch Consensus Update RPC: " << req->ShortDebugString();
1695
    // Effectively performs ConsensusServiceImpl::UpdateConsensus for
1696
    // each ConsensusRequestPB in the batch but does not fail the entire
1697
    // batch if a single request fails.
1698
0
    for (int i = 0; i < req->consensus_request_size(); i++) {
1699
      // Unfortunately, we have to use const_cast here,
1700
      // because the protobuf-generated interface only gives us a const request
1701
      // but we need to be able to move messages out of the request for efficiency.
1702
0
      auto consensus_req = const_cast<ConsensusRequestPB*>(&req->consensus_request(i));
1703
0
      auto consensus_resp = resp->add_consensus_response();;
1704
1705
0
      auto uuid_match_res = CheckUuidMatch(tablet_manager_, "UpdateConsensus", consensus_req,
1706
0
                                           context.requestor_string());
1707
0
      if (!uuid_match_res.ok()) {
1708
0
        SetupError(consensus_resp->mutable_error(), uuid_match_res.status());
1709
0
        continue;
1710
0
      }
1711
1712
0
      auto peer_tablet_res = LookupTabletPeer(tablet_manager_, consensus_req->tablet_id());
1713
0
      if (!peer_tablet_res.ok()) {
1714
0
        SetupError(consensus_resp->mutable_error(), peer_tablet_res.status());
1715
0
        continue;
1716
0
      }
1717
0
      auto tablet_peer = peer_tablet_res.get().tablet_peer;
1718
1719
      // Submit the update directly to the TabletPeer's Consensus instance.
1720
0
      auto consensus_res = GetConsensus(tablet_peer);
1721
0
      if (!consensus_res.ok()) {
1722
0
        SetupError(consensus_resp->mutable_error(), consensus_res.status());
1723
0
        continue;
1724
0
      }
1725
0
      auto consensus = *consensus_res;
1726
1727
0
      Status s = consensus->Update(
1728
0
        consensus_req, consensus_resp, context.GetClientDeadline());
1729
0
      if (PREDICT_FALSE(!s.ok())) {
1730
        // Clear the response first, since a partially-filled response could
1731
        // result in confusing a caller, or in having missing required fields
1732
        // in embedded optional messages.
1733
0
        consensus_resp->Clear();
1734
0
        SetupError(consensus_resp->mutable_error(), s);
1735
0
        continue;
1736
0
      }
1737
1738
0
      CompleteUpdateConsensusResponse(tablet_peer, consensus_resp);
1739
0
    }
1740
0
    context.RespondSuccess();
1741
0
}
1742
1743
void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
1744
                                           ConsensusResponsePB* resp,
1745
10.3M
                                           rpc::RpcContext context) {
1746
982
  DVLOG(3) << "Received Consensus Update RPC: " << req->ShortDebugString();
1747
10.3M
  if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, &context)) {
1748
12
    return;
1749
12
  }
1750
10.3M
  auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1751
10.2M
      tablet_manager_, req->tablet_id(), resp, &context));
1752
10.2M
  auto tablet_peer = peer_tablet.tablet_peer;
1753
1754
  // Submit the update directly to the TabletPeer's Consensus instance.
1755
10.2M
  shared_ptr<Consensus> consensus;
1756
10.2M
  if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return;
1757
1758
  // Unfortunately, we have to use const_cast here, because the protobuf-generated interface only
1759
  // gives us a const request, but we need to be able to move messages out of the request for
1760
  // efficiency.
1761
10.2M
  Status s = consensus->Update(
1762
10.2M
      const_cast<ConsensusRequestPB*>(req), resp, context.GetClientDeadline());
1763
10.2M
  if (PREDICT_FALSE(!s.ok())) {
1764
    // Clear the response first, since a partially-filled response could
1765
    // result in confusing a caller, or in having missing required fields
1766
    // in embedded optional messages.
1767
520
    resp->Clear();
1768
1769
520
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1770
520
    return;
1771
520
  }
1772
1773
10.2M
  CompleteUpdateConsensusResponse(tablet_peer, resp);
1774
1775
10.2M
  context.RespondSuccess();
1776
10.2M
}
1777
1778
void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
1779
                                                VoteResponsePB* resp,
1780
144k
                                                rpc::RpcContext context) {
1781
121
  DVLOG(3) << "Received Consensus Request Vote RPC: " << req->DebugString();
1782
144k
  if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, &context)) {
1783
8
    return;
1784
8
  }
1785
144k
  auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1786
125k
      tablet_manager_, req->tablet_id(), resp, &context));
1787
125k
  auto tablet_peer = peer_tablet.tablet_peer;
1788
1789
  // Submit the vote request directly to the consensus instance.
1790
125k
  shared_ptr<Consensus> consensus;
1791
125k
  if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return;
1792
125k
  Status s = consensus->RequestVote(req, resp);
1793
125k
  RETURN_UNKNOWN_ERROR_IF_NOT_OK(s, resp, &context);
1794
125k
  context.RespondSuccess();
1795
125k
}
1796
1797
void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
1798
                                        ChangeConfigResponsePB* resp,
1799
2.50k
                                        RpcContext context) {
1800
0
  VLOG(1) << "Received ChangeConfig RPC: " << req->ShortDebugString();
1801
  // If the destination uuid is empty string, it means the client was retrying after a leader
1802
  // stepdown and did not have a chance to update the uuid inside the request.
1803
  // TODO: Note that this can be removed once Java YBClient will reset change config's uuid
1804
  // correctly after leader step down.
1805
2.50k
  if (req->dest_uuid() != "" &&
1806
2.41k
      !CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, &context)) {
1807
0
    return;
1808
0
  }
1809
2.50k
  auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1810
2.50k
      tablet_manager_, req->tablet_id(), resp, &context));
1811
2.50k
  auto tablet_peer = peer_tablet.tablet_peer;
1812
1813
2.50k
  shared_ptr<Consensus> consensus;
1814
2.50k
  if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) return;
1815
2.50k
  boost::optional<TabletServerErrorPB::Code> error_code;
1816
2.50k
  std::shared_ptr<RpcContext> context_ptr = std::make_shared<RpcContext>(std::move(context));
1817
2.50k
  Status s = consensus->ChangeConfig(*req, BindHandleResponse(resp, context_ptr), &error_code);
1818
1
  VLOG(1) << "Sent ChangeConfig req " << req->ShortDebugString() << " to consensus layer.";
1819
2.50k
  if (PREDICT_FALSE(!s.ok())) {
1820
717
    HandleErrorResponse(resp, context_ptr.get(), s, error_code);
1821
717
    return;
1822
717
  }
1823
  // The success case is handled when the callback fires.
1824
2.50k
}
1825
1826
void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* req,
1827
                                              UnsafeChangeConfigResponsePB* resp,
1828
0
                                              RpcContext context) {
1829
0
  VLOG(1) << "Received UnsafeChangeConfig RPC: " << req->ShortDebugString();
1830
0
  if (!CheckUuidMatchOrRespond(tablet_manager_, "UnsafeChangeConfig", req, resp, &context)) {
1831
0
    return;
1832
0
  }
1833
0
  auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1834
0
      tablet_manager_, req->tablet_id(), resp, &context));
1835
0
  auto tablet_peer = peer_tablet.tablet_peer;
1836
1837
0
  shared_ptr<Consensus> consensus;
1838
0
  if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) {
1839
0
    return;
1840
0
  }
1841
0
  boost::optional<TabletServerErrorPB::Code> error_code;
1842
0
  const Status s = consensus->UnsafeChangeConfig(*req, &error_code);
1843
0
  if (PREDICT_FALSE(!s.ok())) {
1844
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1845
0
    HandleErrorResponse(resp, &context, s, error_code);
1846
0
    return;
1847
0
  }
1848
0
  context.RespondSuccess();
1849
0
}
1850
1851
void ConsensusServiceImpl::GetNodeInstance(const GetNodeInstanceRequestPB* req,
1852
                                           GetNodeInstanceResponsePB* resp,
1853
14.5k
                                           rpc::RpcContext context) {
1854
0
  DVLOG(3) << "Received Get Node Instance RPC: " << req->DebugString();
1855
14.5k
  resp->mutable_node_instance()->CopyFrom(tablet_manager_->NodeInstance());
1856
14.5k
  auto status = tablet_manager_->GetRegistration(resp->mutable_registration());
1857
14.5k
  if (!status.ok()) {
1858
0
    context.RespondFailure(status);
1859
14.5k
  } else {
1860
14.5k
    context.RespondSuccess();
1861
14.5k
  }
1862
14.5k
}
1863
1864
namespace {
1865
1866
class RpcScope {
1867
 public:
1868
  template<class Req, class Resp>
1869
  RpcScope(TabletPeerLookupIf* tablet_manager,
1870
           const char* method_name,
1871
           const Req* req,
1872
           Resp* resp,
1873
           rpc::RpcContext* context)
1874
66.1k
      : context_(context) {
1875
66.1k
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1876
0
      return;
1877
0
    }
1878
66.1k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1879
42.5k
        tablet_manager, req->tablet_id(), resp, context));
1880
42.5k
    auto tablet_peer = peer_tablet.tablet_peer;
1881
1882
42.5k
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1883
0
      return;
1884
0
    }
1885
42.5k
    responded_ = false;
1886
42.5k
  }
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScopeC2INS_9consensus26RunLeaderElectionRequestPBENS4_27RunLeaderElectionResponsePBEEEPNS0_18TabletPeerLookupIfEPKcPKT_PT0_PNS_3rpc10RpcContextE
Line
Count
Source
1874
56.5k
      : context_(context) {
1875
56.5k
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1876
0
      return;
1877
0
    }
1878
56.5k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1879
32.9k
        tablet_manager, req->tablet_id(), resp, context));
1880
32.9k
    auto tablet_peer = peer_tablet.tablet_peer;
1881
1882
32.9k
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1883
0
      return;
1884
0
    }
1885
32.9k
    responded_ = false;
1886
32.9k
  }
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScopeC2INS_9consensus27LeaderElectionLostRequestPBENS4_28LeaderElectionLostResponsePBEEEPNS0_18TabletPeerLookupIfEPKcPKT_PT0_PNS_3rpc10RpcContextE
Line
Count
Source
1874
44
      : context_(context) {
1875
44
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1876
0
      return;
1877
0
    }
1878
44
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1879
44
        tablet_manager, req->tablet_id(), resp, context));
1880
44
    auto tablet_peer = peer_tablet.tablet_peer;
1881
1882
44
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1883
0
      return;
1884
0
    }
1885
44
    responded_ = false;
1886
44
  }
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScopeC2INS_9consensus23LeaderStepDownRequestPBENS4_24LeaderStepDownResponsePBEEEPNS0_18TabletPeerLookupIfEPKcPKT_PT0_PNS_3rpc10RpcContextE
Line
Count
Source
1874
5.88k
      : context_(context) {
1875
5.88k
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1876
0
      return;
1877
0
    }
1878
5.88k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1879
5.88k
        tablet_manager, req->tablet_id(), resp, context));
1880
5.88k
    auto tablet_peer = peer_tablet.tablet_peer;
1881
1882
5.88k
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1883
0
      return;
1884
0
    }
1885
5.88k
    responded_ = false;
1886
5.88k
  }
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScopeC2INS_9consensus26GetConsensusStateRequestPBENS4_27GetConsensusStateResponsePBEEEPNS0_18TabletPeerLookupIfEPKcPKT_PT0_PNS_3rpc10RpcContextE
Line
Count
Source
1874
3.68k
      : context_(context) {
1875
3.68k
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1876
0
      return;
1877
0
    }
1878
3.68k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1879
3.68k
        tablet_manager, req->tablet_id(), resp, context));
1880
3.68k
    auto tablet_peer = peer_tablet.tablet_peer;
1881
1882
3.68k
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1883
0
      return;
1884
0
    }
1885
3.68k
    responded_ = false;
1886
3.68k
  }
1887
1888
66.1k
  ~RpcScope() {
1889
66.1k
    if (!responded_) {
1890
42.5k
      context_->RespondSuccess();
1891
42.5k
    }
1892
66.1k
  }
1893
1894
  template<class Resp>
1895
38.8k
  void CheckStatus(const Status& status, Resp* resp) {
1896
38.8k
    if (!status.ok()) {
1897
0
      LOG(INFO) << "Status failed: " << status.ToString();
1898
0
      SetupErrorAndRespond(resp->mutable_error(), status, context_);
1899
0
      responded_ = true;
1900
0
    }
1901
38.8k
  }
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScope11CheckStatusINS_9consensus27RunLeaderElectionResponsePBEEEvRKNS_6StatusEPT_
Line
Count
Source
1895
32.9k
  void CheckStatus(const Status& status, Resp* resp) {
1896
32.9k
    if (!status.ok()) {
1897
0
      LOG(INFO) << "Status failed: " << status.ToString();
1898
0
      SetupErrorAndRespond(resp->mutable_error(), status, context_);
1899
0
      responded_ = true;
1900
0
    }
1901
32.9k
  }
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScope11CheckStatusINS_9consensus28LeaderElectionLostResponsePBEEEvRKNS_6StatusEPT_
Line
Count
Source
1895
44
  void CheckStatus(const Status& status, Resp* resp) {
1896
44
    if (!status.ok()) {
1897
0
      LOG(INFO) << "Status failed: " << status.ToString();
1898
0
      SetupErrorAndRespond(resp->mutable_error(), status, context_);
1899
0
      responded_ = true;
1900
0
    }
1901
44
  }
tablet_service.cc:_ZN2yb7tserver12_GLOBAL__N_18RpcScope11CheckStatusINS_9consensus24LeaderStepDownResponsePBEEEvRKNS_6StatusEPT_
Line
Count
Source
1895
5.88k
  void CheckStatus(const Status& status, Resp* resp) {
1896
5.88k
    if (!status.ok()) {
1897
0
      LOG(INFO) << "Status failed: " << status.ToString();
1898
0
      SetupErrorAndRespond(resp->mutable_error(), status, context_);
1899
0
      responded_ = true;
1900
0
    }
1901
5.88k
  }
1902
1903
42.5k
  Consensus* operator->() {
1904
42.5k
    return consensus_.get();
1905
42.5k
  }
1906
1907
66.1k
  explicit operator bool() const {
1908
66.1k
    return !responded_;
1909
66.1k
  }
1910
1911
 private:
1912
  rpc::RpcContext* context_;
1913
  bool responded_ = true;
1914
  shared_ptr<Consensus> consensus_;
1915
};
1916
1917
} // namespace
1918
1919
void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* req,
1920
                                             RunLeaderElectionResponsePB* resp,
1921
56.5k
                                             rpc::RpcContext context) {
1922
60
  VLOG(1) << "Received Run Leader Election RPC: " << req->DebugString();
1923
56.5k
  RpcScope scope(tablet_manager_, "RunLeaderElection", req, resp, &context);
1924
56.5k
  if (!scope) {
1925
23.5k
    return;
1926
23.5k
  }
1927
1928
32.9k
  Status s = scope->StartElection(consensus::LeaderElectionData {
1929
32.9k
    .mode = consensus::ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE,
1930
32.9k
    .pending_commit = req->has_committed_index(),
1931
32.9k
    .must_be_committed_opid = OpId::FromPB(req->committed_index()),
1932
28.0k
    .originator_uuid = req->has_originator_uuid() ? req->originator_uuid() : std::string(),
1933
32.9k
    .suppress_vote_request = consensus::TEST_SuppressVoteRequest(req->suppress_vote_request()),
1934
32.9k
    .initial_election = req->initial_election() });
1935
32.9k
  scope.CheckStatus(s, resp);
1936
32.9k
}
1937
1938
// Handles the LeaderElectionLost RPC: after the common RpcScope preamble succeeds, notifies the
// tablet's consensus instance that the peer named in the request lost its election, then logs
// the outcome. The response is sent by RpcScope (success) or by CheckStatus (failure).
void ConsensusServiceImpl::LeaderElectionLost(const consensus::LeaderElectionLostRequestPB *req,
                                              consensus::LeaderElectionLostResponsePB *resp,
                                              ::yb::rpc::RpcContext context) {
  LOG(INFO) << "LeaderElectionLost, req: " << req->ShortDebugString();
  RpcScope scope(tablet_manager_, "LeaderElectionLost", req, resp, &context);
  if (!scope) {
    return;
  }
  auto status = scope->ElectionLostByProtege(req->election_lost_by_uuid());
  scope.CheckStatus(status, resp);
  // After CheckStatus the scope converts to false iff an error response was sent.
  // The separator before "req: " was missing, which produced "successreq: ..." in the log.
  LOG(INFO) << "LeaderElectionLost, outcome: " << (scope ? "success" : "failure") << ", req: "
            << req->ShortDebugString();
}
1951
1952
void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
1953
                                          LeaderStepDownResponsePB* resp,
1954
5.88k
                                          RpcContext context) {
1955
5.88k
  LOG(INFO) << "Received Leader stepdown RPC: " << req->ShortDebugString();
1956
1957
5.88k
  if (PREDICT_FALSE(FLAGS_TEST_leader_stepdown_delay_ms > 0)) {
1958
3
    LOG(INFO) << "Delaying leader stepdown for "
1959
3
              << FLAGS_TEST_leader_stepdown_delay_ms << " ms.";
1960
3
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_leader_stepdown_delay_ms));
1961
3
  }
1962
1963
5.88k
  RpcScope scope(tablet_manager_, "LeaderStepDown", req, resp, &context);
1964
5.88k
  if (!scope) {
1965
0
    return;
1966
0
  }
1967
5.88k
  Status s = scope->StepDown(req, resp);
1968
5.88k
  LOG(INFO) << "Leader stepdown request " << req->ShortDebugString() << " success. Resp code="
1969
5.88k
            << TabletServerErrorPB::Code_Name(resp->error().code());
1970
5.88k
  scope.CheckStatus(s, resp);
1971
5.88k
}
1972
1973
// Handles the GetLastOpId RPC: rejects an unknown opid_type, validates the destination UUID,
// requires the tablet peer to be in the RUNNING state, and then returns the last op id of the
// requested type from the tablet's consensus instance.
void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req,
                                       consensus::GetLastOpIdResponsePB *resp,
                                       rpc::RpcContext context) {
  DVLOG(3) << "Received GetLastOpId RPC: " << req->DebugString();

  const bool unknown_opid_type = req->opid_type() == consensus::UNKNOWN_OPID_TYPE;
  if (PREDICT_FALSE(unknown_opid_type)) {
    HandleErrorResponse(resp, &context,
                        STATUS(InvalidArgument, "Invalid opid_type specified to GetLastOpId()"));
    return;
  }

  if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, &context)) {
    return;
  }
  auto lookup = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
      tablet_manager_, req->tablet_id(), resp, &context));
  auto peer = lookup.tablet_peer;

  if (peer->state() != tablet::RUNNING) {
    SetupErrorAndRespond(resp->mutable_error(),
                         STATUS(ServiceUnavailable, "Tablet Peer not in RUNNING state"),
                         TabletServerErrorPB::TABLET_NOT_RUNNING, &context);
    return;
  }

  auto consensus = GetConsensusOrRespond(peer, resp, &context);
  if (!consensus) {
    // GetConsensusOrRespond has already answered the RPC.
    return;
  }

  // The op_type filter is only applied when the request carries one.
  auto last_op_id = req->has_op_type()
      ? consensus->TEST_GetLastOpIdWithType(req->opid_type(), req->op_type())
      : consensus->GetLastOpId(req->opid_type());

  // RETURN_UNKNOWN_ERROR_IF_NOT_OK does not support Result, so have to add extra check here.
  if (!last_op_id.ok()) {
    RETURN_UNKNOWN_ERROR_IF_NOT_OK(last_op_id.status(), resp, &context);
  }
  last_op_id->ToPB(resp->mutable_opid());
  context.RespondSuccess();
}
2011
2012
// Handles the GetConsensusState RPC: after the common RpcScope preamble succeeds, validates the
// requested config type and fills the response with the consensus state and leader lease status.
// The response is sent by RpcScope (success) or by CheckStatus (failure).
void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateRequestPB *req,
                                             consensus::GetConsensusStateResponsePB *resp,
                                             rpc::RpcContext context) {
  DVLOG(3) << "Received GetConsensusState RPC: " << req->DebugString();

  RpcScope scope(tablet_manager_, "GetConsensusState", req, resp, &context);
  if (!scope) {
    return;
  }
  ConsensusConfigType type = req->type();
  if (PREDICT_FALSE(type != CONSENSUS_CONFIG_ACTIVE && type != CONSENSUS_CONFIG_COMMITTED)) {
    // Report the error through the scope so that it records the RPC as answered. Responding
    // directly on the context here would leave the scope unaware, and its destructor would then
    // send RespondSuccess() on top of the error response.
    scope.CheckStatus(
        STATUS(InvalidArgument, Substitute("Unsupported ConsensusConfigType $0 ($1)",
                                           ConsensusConfigType_Name(type), type)),
        resp);
    return;
  }
  LeaderLeaseStatus leader_lease_status;
  *resp->mutable_cstate() = scope->ConsensusState(req->type(), &leader_lease_status);
  resp->set_leader_lease_status(leader_lease_status);
}
2032
2033
void ConsensusServiceImpl::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* req,
2034
                                                StartRemoteBootstrapResponsePB* resp,
2035
4.91k
                                                rpc::RpcContext context) {
2036
4.91k
  if (!CheckUuidMatchOrRespond(tablet_manager_, "StartRemoteBootstrap", req, resp, &context)) {
2037
0
    return;
2038
0
  }
2039
4.91k
  if (req->has_split_parent_tablet_id()
2040
84
      && !PREDICT_FALSE(FLAGS_TEST_disable_post_split_tablet_rbs_check)) {
2041
    // For any tablet that was the result of a split, the raft group leader will always send the
2042
    // split_parent_tablet_id. However, our local tablet manager should only know about the parent
2043
    // if it was part of the raft group which committed the split to the parent, and if the parent
2044
    // tablet has yet to be deleted across the cluster.
2045
82
    TabletPeerTablet result;
2046
82
    if (tablet_manager_->GetTabletPeer(req->split_parent_tablet_id(), &result.tablet_peer).ok()) {
2047
78
      YB_LOG_EVERY_N_SECS(WARNING, 30)
2048
1
          << "Start remote bootstrap rejected: parent tablet not yet split.";
2049
78
      SetupErrorAndRespond(
2050
78
          resp->mutable_error(),
2051
78
          STATUS(Incomplete, "Rejecting bootstrap request while parent tablet is present."),
2052
78
          TabletServerErrorPB::TABLET_SPLIT_PARENT_STILL_LIVE,
2053
78
          &context);
2054
78
      return;
2055
78
    }
2056
4.83k
  }
2057
2058
4.83k
  Status s = tablet_manager_->StartRemoteBootstrap(*req);
2059
4.83k
  if (!s.ok()) {
2060
    // Using Status::AlreadyPresent for a remote bootstrap operation that is already in progress.
2061
3.83k
    if (s.IsAlreadyPresent()) {
2062
3.82k
      YB_LOG_EVERY_N_SECS(WARNING, 30) << "Start remote bootstrap failed: " << s;
2063
3.82k
      SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::ALREADY_IN_PROGRESS,
2064
3.82k
                           &context);
2065
3.82k
      return;
2066
16
    } else {
2067
16
      LOG(WARNING) << "Start remote bootstrap failed: " << s;
2068
16
    }
2069
3.83k
  }
2070
2071
1.01k
  RETURN_UNKNOWN_ERROR_IF_NOT_OK(s, resp, &context);
2072
996
  context.RespondSuccess();
2073
996
}
2074
2075
void TabletServiceImpl::NoOp(const NoOpRequestPB *req,
2076
                             NoOpResponsePB *resp,
2077
0
                             rpc::RpcContext context) {
2078
0
  context.RespondSuccess();
2079
0
}
2080
2081
void TabletServiceImpl::Publish(
2082
0
    const PublishRequestPB* req, PublishResponsePB* resp, rpc::RpcContext context) {
2083
0
  rpc::Publisher* publisher = server_->GetPublisher();
2084
0
  resp->set_num_clients_forwarded_to(publisher ? (*publisher)(req->channel(), req->message()) : 0);
2085
0
  context.RespondSuccess();
2086
0
}
2087
2088
void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
2089
                                    ListTabletsResponsePB* resp,
2090
213
                                    rpc::RpcContext context) {
2091
213
  TabletPeers peers = server_->tablet_manager()->GetTabletPeers();
2092
213
  RepeatedPtrField<StatusAndSchemaPB>* peer_status = resp->mutable_status_and_schema();
2093
3.14k
  for (const TabletPeerPtr& peer : peers) {
2094
3.14k
    StatusAndSchemaPB* status = peer_status->Add();
2095
3.14k
    peer->GetTabletStatusPB(status->mutable_tablet_status());
2096
3.14k
    SchemaToPB(*peer->status_listener()->schema(), status->mutable_schema());
2097
3.14k
    peer->tablet_metadata()->partition_schema()->ToPB(status->mutable_partition_schema());
2098
3.14k
  }
2099
213
  context.RespondSuccess();
2100
213
}
2101
2102
void TabletServiceImpl::GetMasterAddresses(const GetMasterAddressesRequestPB* req,
2103
                                           GetMasterAddressesResponsePB* resp,
2104
6
                                           rpc::RpcContext context) {
2105
6
  resp->set_master_addresses(server::MasterAddressesToString(
2106
6
      *server_->tablet_manager()->server()->options().GetMasterAddresses()));
2107
6
  context.RespondSuccess();
2108
6
}
2109
2110
void TabletServiceImpl::GetLogLocation(
2111
    const GetLogLocationRequestPB* req,
2112
    GetLogLocationResponsePB* resp,
2113
0
    rpc::RpcContext context) {
2114
0
  resp->set_log_location(FLAGS_log_dir);
2115
0
  context.RespondSuccess();
2116
0
}
2117
2118
void TabletServiceImpl::ListTabletsForTabletServer(const ListTabletsForTabletServerRequestPB* req,
2119
                                                   ListTabletsForTabletServerResponsePB* resp,
2120
892
                                                   rpc::RpcContext context) {
2121
  // Replicating logic from path-handlers.
2122
892
  TabletPeers peers = server_->tablet_manager()->GetTabletPeers();
2123
3.77k
  for (const TabletPeerPtr& peer : peers) {
2124
3.77k
    TabletStatusPB status;
2125
3.77k
    peer->GetTabletStatusPB(&status);
2126
2127
3.77k
    ListTabletsForTabletServerResponsePB::Entry* data_entry = resp->add_entries();
2128
3.77k
    data_entry->set_table_name(status.table_name());
2129
3.77k
    data_entry->set_tablet_id(status.tablet_id());
2130
2131
3.77k
    std::shared_ptr<consensus::Consensus> consensus = peer->shared_consensus();
2132
3.77k
    data_entry->set_is_leader(consensus && consensus->role() == PeerRole::LEADER);
2133
3.77k
    data_entry->set_state(status.state());
2134
2135
3.77k
    auto tablet = peer->shared_tablet();
2136
3.74k
    uint64_t num_sst_files = tablet ? tablet->GetCurrentVersionNumSSTFiles() : 0;
2137
3.77k
    data_entry->set_num_sst_files(num_sst_files);
2138
2139
3.77k
    uint64_t num_log_segments = peer->GetNumLogSegments();
2140
3.77k
    data_entry->set_num_log_segments(num_log_segments);
2141
2142
3.74k
    auto num_memtables = tablet ? tablet->GetNumMemtables() : std::make_pair(0, 0);
2143
3.77k
    data_entry->set_num_memtables_intents(num_memtables.first);
2144
3.77k
    data_entry->set_num_memtables_regular(num_memtables.second);
2145
3.77k
  }
2146
2147
892
  context.RespondSuccess();
2148
892
}
2149
2150
namespace {
2151
2152
2.89k
// Iterates over every row of `tablet` (using the tablet schema without column ids) and feeds
// each row to a ScanResultChecksummer. Returns the aggregated checksum, or the first error
// produced while creating or advancing the row iterator.
Result<uint64_t> CalcChecksum(tablet::Tablet* tablet, CoarseTimePoint deadline) {
  const shared_ptr<Schema> tablet_schema = tablet->metadata()->schema();
  auto projection = tablet_schema->CopyWithoutColumnIds();
  auto iter = tablet->NewRowIterator(projection, {}, "", deadline);
  RETURN_NOT_OK(iter);

  QLTableRow row;
  ScanResultChecksummer checksummer;

  for (;;) {
    const bool has_next = VERIFY_RESULT((**iter).HasNext());
    if (!has_next) {
      break;
    }
    RETURN_NOT_OK((**iter).NextRow(&row));
    checksummer.HandleRow(*tablet_schema, row);
  }

  return checksummer.agg_checksum();
}
2168
2169
} // namespace
2170
2171
// Resolves the tablet named in the request (split tablets are allowed) at the requested
// consistency level and computes its checksum with CalcChecksum.
Result<uint64_t> TabletServiceImpl::DoChecksum(
    const ChecksumRequestPB* req, CoarseTimePoint deadline) {
  auto resolved_tablet = VERIFY_RESULT(GetTablet(
      server_->tablet_peer_lookup(), req->tablet_id(), /* tablet_peer = */ nullptr,
      req->consistency_level(), AllowSplitTablet::kTrue));
  auto* const tablet = down_cast<tablet::Tablet*>(resolved_tablet.get());
  return CalcChecksum(tablet, deadline);
}
2178
2179
void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
2180
                                 ChecksumResponsePB* resp,
2181
2.90k
                                 rpc::RpcContext context) {
2182
3
  VLOG(1) << "Full request: " << req->DebugString();
2183
2184
2.90k
  auto checksum = DoChecksum(req, context.GetClientDeadline());
2185
2.90k
  if (!checksum.ok()) {
2186
4
    SetupErrorAndRespond(resp->mutable_error(), checksum.status(), &context);
2187
4
    return;
2188
4
  }
2189
2190
2.90k
  resp->set_checksum(*checksum);
2191
2.90k
  context.RespondSuccess();
2192
2.90k
}
2193
2194
void TabletServiceImpl::ImportData(const ImportDataRequestPB* req,
2195
                                   ImportDataResponsePB* resp,
2196
0
                                   rpc::RpcContext context) {
2197
0
  auto peer = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
2198
0
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context));
2199
2200
0
  auto status = peer.tablet_peer->tablet()->ImportData(req->source_dir());
2201
0
  if (!status.ok()) {
2202
0
    SetupErrorAndRespond(resp->mutable_error(), status, &context);
2203
0
    return;
2204
0
  }
2205
0
  context.RespondSuccess();
2206
0
}
2207
2208
void TabletServiceImpl::GetTabletStatus(const GetTabletStatusRequestPB* req,
2209
                                        GetTabletStatusResponsePB* resp,
2210
1
                                        rpc::RpcContext context) {
2211
1
  const Status s = server_->GetTabletStatus(req, resp);
2212
1
  if (!s.ok()) {
2213
0
    SetupErrorAndRespond(resp->mutable_error(), s,
2214
0
                         s.IsNotFound() ? TabletServerErrorPB::TABLET_NOT_FOUND
2215
0
                                        : TabletServerErrorPB::UNKNOWN_ERROR,
2216
0
                         &context);
2217
0
    return;
2218
0
  }
2219
1
  context.RespondSuccess();
2220
1
}
2221
2222
void TabletServiceImpl::IsTabletServerReady(const IsTabletServerReadyRequestPB* req,
2223
                                            IsTabletServerReadyResponsePB* resp,
2224
9
                                            rpc::RpcContext context) {
2225
9
  Status s = server_->tablet_manager()->GetNumTabletsPendingBootstrap(resp);
2226
9
  if (!s.ok()) {
2227
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
2228
0
    return;
2229
0
  }
2230
9
  context.RespondSuccess();
2231
9
}
2232
2233
void TabletServiceImpl::TakeTransaction(const TakeTransactionRequestPB* req,
2234
                                        TakeTransactionResponsePB* resp,
2235
0
                                        rpc::RpcContext context) {
2236
0
  auto transaction = server_->TransactionPool()->Take(
2237
0
      client::ForceGlobalTransaction(req->has_is_global() && req->is_global()));
2238
0
  auto metadata = transaction->Release();
2239
0
  if (!metadata.ok()) {
2240
0
    LOG(INFO) << "Take failed: " << metadata.status();
2241
0
    context.RespondFailure(metadata.status());
2242
0
    return;
2243
0
  }
2244
0
  metadata->ForceToPB(resp->mutable_metadata());
2245
0
  VLOG(2) << "Taken metadata: " << metadata->ToString();
2246
0
  context.RespondSuccess();
2247
0
}
2248
2249
void TabletServiceImpl::GetSplitKey(
2250
49
    const GetSplitKeyRequestPB* req, GetSplitKeyResponsePB* resp, RpcContext context) {
2251
49
  TEST_PAUSE_IF_FLAG(TEST_pause_tserver_get_split_key);
2252
49
  PerformAtLeader(req, resp, &context,
2253
47
      [resp](const LeaderTabletPeer& leader_tablet_peer) -> Status {
2254
47
        const auto& tablet = leader_tablet_peer.tablet;
2255
2256
47
        if (tablet->MayHaveOrphanedPostSplitData()) {
2257
0
          return STATUS(IllegalState, "Tablet has orphaned post-split data");
2258
0
        }
2259
47
        const auto split_encoded_key = VERIFY_RESULT(tablet->GetEncodedMiddleSplitKey());
2260
46
        resp->set_split_encoded_key(split_encoded_key);
2261
46
        const auto doc_key_hash = VERIFY_RESULT(docdb::DecodeDocKeyHash(split_encoded_key));
2262
46
        if (doc_key_hash.has_value()) {
2263
46
          resp->set_split_partition_key(PartitionSchema::EncodeMultiColumnHashValue(
2264
46
              doc_key_hash.value()));
2265
0
        } else {
2266
0
          resp->set_split_partition_key(split_encoded_key);
2267
0
        }
2268
46
        return Status::OK();
2269
46
  });
2270
49
}
2271
2272
void TabletServiceImpl::GetSharedData(const GetSharedDataRequestPB* req,
2273
                                      GetSharedDataResponsePB* resp,
2274
0
                                      rpc::RpcContext context) {
2275
0
  auto& data = server_->SharedObject();
2276
0
  resp->mutable_data()->assign(pointer_cast<const char*>(&data), sizeof(data));
2277
0
  context.RespondSuccess();
2278
0
}
2279
2280
160
// Shutdown hook for the tablet service; this implementation has nothing to do.
void TabletServiceImpl::Shutdown() {
  // Intentionally empty.
}
2282
2283
// Returns the handler latency histogram for the given RPC method of the tablet server service,
// or nullptr when the service is not set.
scoped_refptr<Histogram> TabletServer::GetMetricsHistogram(
    TabletServerServiceRpcMethodIndexes metric) {
  // Returns the metric Histogram by holding a lock to make sure tablet_server_service_ remains
  // unchanged during the operation.
  std::lock_guard<simple_spinlock> guard(lock_);
  if (!tablet_server_service_) {
    return nullptr;
  }
  return tablet_server_service_->GetMetric(metric).handler_latency;
}
2293
2294
// Builds the forwarding service on top of the server's metric entity and remembers the server.
// The `impl` argument is not used by this constructor.
TabletServerForwardServiceImpl::TabletServerForwardServiceImpl(TabletServiceImpl *impl,
                                                               TabletServerIf *server)
    : TabletServerForwardServiceIf(server->MetricEnt()),
      server_(server) {
}
2299
2300
void TabletServerForwardServiceImpl::Write(const WriteRequestPB* req,
2301
                                           WriteResponsePB* resp,
2302
0
                                           rpc::RpcContext context) {
2303
  // Forward the rpc to the required Tserver.
2304
0
  std::shared_ptr<ForwardWriteRpc> forward_rpc =
2305
0
    std::make_shared<ForwardWriteRpc>(req, resp, std::move(context), server_->client());
2306
0
  forward_rpc->SendRpc();
2307
0
}
2308
2309
void TabletServerForwardServiceImpl::Read(const ReadRequestPB* req,
2310
                                          ReadResponsePB* resp,
2311
0
                                          rpc::RpcContext context) {
2312
0
  std::shared_ptr<ForwardReadRpc> forward_rpc =
2313
0
    std::make_shared<ForwardReadRpc>(req, resp, std::move(context), server_->client());
2314
0
  forward_rpc->SendRpc();
2315
0
}
2316
2317
}  // namespace tserver
2318
}  // namespace yb