YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_service.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/tserver/tablet_service.h"
34
35
#include <algorithm>
36
#include <memory>
37
#include <string>
38
#include <vector>
39
40
#include <glog/logging.h>
41
42
#include "yb/client/forward_rpc.h"
43
#include "yb/client/transaction.h"
44
#include "yb/client/transaction_pool.h"
45
46
#include "yb/common/ql_rowblock.h"
47
#include "yb/common/ql_value.h"
48
#include "yb/common/row_mark.h"
49
#include "yb/common/schema.h"
50
#include "yb/common/wire_protocol.h"
51
#include "yb/consensus/leader_lease.h"
52
#include "yb/consensus/consensus.pb.h"
53
#include "yb/consensus/raft_consensus.h"
54
55
#include "yb/docdb/cql_operation.h"
56
#include "yb/docdb/pgsql_operation.h"
57
58
#include "yb/gutil/bind.h"
59
#include "yb/gutil/casts.h"
60
#include "yb/gutil/stl_util.h"
61
#include "yb/gutil/stringprintf.h"
62
#include "yb/gutil/strings/escaping.h"
63
64
#include "yb/rpc/thread_pool.h"
65
66
#include "yb/server/hybrid_clock.h"
67
68
#include "yb/tablet/abstract_tablet.h"
69
#include "yb/tablet/metadata.pb.h"
70
#include "yb/tablet/operations/change_metadata_operation.h"
71
#include "yb/tablet/operations/split_operation.h"
72
#include "yb/tablet/operations/truncate_operation.h"
73
#include "yb/tablet/operations/update_txn_operation.h"
74
#include "yb/tablet/operations/write_operation.h"
75
#include "yb/tablet/read_result.h"
76
#include "yb/tablet/tablet.h"
77
#include "yb/tablet/tablet_bootstrap_if.h"
78
#include "yb/tablet/tablet_metadata.h"
79
#include "yb/tablet/tablet_metrics.h"
80
#include "yb/tablet/transaction_participant.h"
81
#include "yb/tablet/write_query.h"
82
83
#include "yb/tserver/read_query.h"
84
#include "yb/tserver/service_util.h"
85
#include "yb/tserver/tablet_server.h"
86
#include "yb/tserver/ts_tablet_manager.h"
87
#include "yb/tserver/tserver_error.h"
88
89
#include "yb/util/crc.h"
90
#include "yb/util/debug-util.h"
91
#include "yb/util/debug/long_operation_tracker.h"
92
#include "yb/util/debug/trace_event.h"
93
#include "yb/util/faststring.h"
94
#include "yb/util/flag_tags.h"
95
#include "yb/util/format.h"
96
#include "yb/util/logging.h"
97
#include "yb/util/math_util.h"
98
#include "yb/util/mem_tracker.h"
99
#include "yb/util/metrics.h"
100
#include "yb/util/monotime.h"
101
#include "yb/util/random_util.h"
102
#include "yb/util/scope_exit.h"
103
#include "yb/util/size_literals.h"
104
#include "yb/util/status.h"
105
#include "yb/util/status_callback.h"
106
#include "yb/util/status_format.h"
107
#include "yb/util/status_log.h"
108
#include "yb/util/string_util.h"
109
#include "yb/util/trace.h"
110
111
#include "yb/yql/pgwrapper/ysql_upgrade.h"
112
113
using namespace std::literals;  // NOLINT
114
115
DEFINE_int32(scanner_default_batch_size_bytes, 64 * 1024,
116
             "The default size for batches of scan results");
117
TAG_FLAG(scanner_default_batch_size_bytes, advanced);
118
TAG_FLAG(scanner_default_batch_size_bytes, runtime);
119
120
DEFINE_int32(scanner_max_batch_size_bytes, 8 * 1024 * 1024,
121
             "The maximum batch size that a client may request for "
122
             "scan results.");
123
TAG_FLAG(scanner_max_batch_size_bytes, advanced);
124
TAG_FLAG(scanner_max_batch_size_bytes, runtime);
125
126
DEFINE_int32(scanner_batch_size_rows, 100,
127
             "The number of rows to batch for servicing scan requests.");
128
TAG_FLAG(scanner_batch_size_rows, advanced);
129
TAG_FLAG(scanner_batch_size_rows, runtime);
130
131
// Fault injection flags.
132
DEFINE_test_flag(int32, scanner_inject_latency_on_each_batch_ms, 0,
133
                 "If set, the scanner will pause the specified number of milliesconds "
134
                 "before reading each batch of data on the tablet server.");
135
136
DEFINE_int32(max_wait_for_safe_time_ms, 5000,
137
             "Maximum time in milliseconds to wait for the safe time to advance when trying to "
138
             "scan at the given hybrid_time.");
139
140
DEFINE_int32(num_concurrent_backfills_allowed, -1,
141
             "Maximum number of concurrent backfill jobs that is allowed to run.");
142
143
DEFINE_test_flag(bool, tserver_noop_read_write, false, "Respond NOOP to read/write.");
144
145
DEFINE_uint64(index_backfill_upperbound_for_user_enforced_txn_duration_ms, 65000,
146
              "For Non-Txn tables, it is impossible to know at the tservers "
147
              "whether or not an 'old transaction' is still active. To avoid "
148
              "having such old transactions, we assume a bound on the duration "
149
              "of such transactions (during the backfill process) and wait "
150
              "it out. This flag denotes a conservative upper bound on the "
151
              "duration of such user enforced transactions.");
152
TAG_FLAG(index_backfill_upperbound_for_user_enforced_txn_duration_ms, evolving);
153
TAG_FLAG(index_backfill_upperbound_for_user_enforced_txn_duration_ms, runtime);
154
155
DEFINE_int32(index_backfill_additional_delay_before_backfilling_ms, 0,
156
             "Operations that are received by the tserver, and have decided how "
157
             "the indexes need to be updated (based on the IndexPermission), will "
158
             "not be added to the list of current transactions until they are "
159
             "replicated/applied. This delay allows for the GetSafeTime method "
160
             "to wait for such operations to be replicated/applied. Ideally, this "
161
             "value should be set to be something larger than the raft-heartbeat-interval "
162
             "but can be as high as the client_rpc_timeout if we want to be more conservative.");
163
TAG_FLAG(index_backfill_additional_delay_before_backfilling_ms, evolving);
164
TAG_FLAG(index_backfill_additional_delay_before_backfilling_ms, runtime);
165
166
DEFINE_int32(index_backfill_wait_for_old_txns_ms, 0,
167
             "Index backfill needs to wait for transactions that started before the "
168
             "WRITE_AND_DELETE phase to commit or abort before choosing a time for "
169
             "backfilling the index. This is the max time that the GetSafeTime call will "
170
             "wait for, before it resorts to attempt aborting old transactions. This is "
171
             "necessary to guard against the pathological active transaction that never "
172
             "commits from blocking the index backfill forever.");
173
TAG_FLAG(index_backfill_wait_for_old_txns_ms, evolving);
174
TAG_FLAG(index_backfill_wait_for_old_txns_ms, runtime);
175
176
DEFINE_test_flag(double, respond_write_failed_probability, 0.0,
177
                 "Probability to respond that write request is failed");
178
179
DEFINE_test_flag(bool, rpc_delete_tablet_fail, false, "Should delete tablet RPC fail.");
180
181
DECLARE_bool(disable_alter_vs_write_mutual_exclusion);
182
DECLARE_uint64(max_clock_skew_usec);
183
DECLARE_uint64(transaction_min_running_check_interval_ms);
184
DECLARE_int64(transaction_rpc_timeout_ms);
185
186
DEFINE_test_flag(int32, txn_status_table_tablet_creation_delay_ms, 0,
187
                 "Extra delay to slowdown creation of transaction status table tablet.");
188
189
DEFINE_test_flag(int32, leader_stepdown_delay_ms, 0,
190
                 "Amount of time to delay before starting a leader stepdown change.");
191
192
DEFINE_test_flag(int32, alter_schema_delay_ms, 0, "Delay before processing AlterSchema.");
193
194
DEFINE_test_flag(bool, disable_post_split_tablet_rbs_check, false,
195
                 "If true, bypass any checks made to reject remote boostrap requests for post "
196
                 "split tablets whose parent tablets are still present.");
197
198
DEFINE_test_flag(double, fail_tablet_split_probability, 0.0,
199
                 "Probability of failing in TabletServiceAdminImpl::SplitTablet.");
200
201
DEFINE_test_flag(bool, pause_tserver_get_split_key, false,
202
                 "Pause before processing a GetSplitKey request.");
203
204
DECLARE_int32(heartbeat_interval_ms);
205
DECLARE_uint64(rocksdb_max_file_size_for_compaction);
206
207
DECLARE_int32(ysql_transaction_abort_timeout_ms);
208
209
DEFINE_test_flag(bool, fail_alter_schema_after_abort_transactions, false,
210
                 "If true, setup an error status in AlterSchema and respond success to rpc call. "
211
                 "This failure should not cause the TServer to crash but "
212
                 "instead return an error message on the YSQL connection.");
213
214
double TEST_delay_create_transaction_probability = 0;
215
216
namespace yb {
217
namespace tserver {
218
219
using client::internal::ForwardReadRpc;
220
using client::internal::ForwardWriteRpc;
221
using consensus::ChangeConfigRequestPB;
222
using consensus::ChangeConfigResponsePB;
223
using consensus::Consensus;
224
using consensus::CONSENSUS_CONFIG_ACTIVE;
225
using consensus::CONSENSUS_CONFIG_COMMITTED;
226
using consensus::ConsensusConfigType;
227
using consensus::ConsensusRequestPB;
228
using consensus::ConsensusResponsePB;
229
using consensus::GetLastOpIdRequestPB;
230
using consensus::GetNodeInstanceRequestPB;
231
using consensus::GetNodeInstanceResponsePB;
232
using consensus::LeaderLeaseStatus;
233
using consensus::LeaderStepDownRequestPB;
234
using consensus::LeaderStepDownResponsePB;
235
using consensus::RaftPeerPB;
236
using consensus::RunLeaderElectionRequestPB;
237
using consensus::RunLeaderElectionResponsePB;
238
using consensus::StartRemoteBootstrapRequestPB;
239
using consensus::StartRemoteBootstrapResponsePB;
240
using consensus::UnsafeChangeConfigRequestPB;
241
using consensus::UnsafeChangeConfigResponsePB;
242
using consensus::VoteRequestPB;
243
using consensus::VoteResponsePB;
244
245
using std::unique_ptr;
246
using google::protobuf::RepeatedPtrField;
247
using rpc::RpcContext;
248
using std::shared_ptr;
249
using std::vector;
250
using std::string;
251
using strings::Substitute;
252
using tablet::ChangeMetadataOperation;
253
using tablet::Tablet;
254
using tablet::TabletPeer;
255
using tablet::TabletPeerPtr;
256
using tablet::TabletStatusPB;
257
using tablet::TruncateOperation;
258
using tablet::OperationCompletionCallback;
259
using tablet::WriteOperation;
260
261
namespace {
262
263
27.0M
// Returns the Raft consensus owned by `tablet_peer`.
// When the peer hands back a null consensus pointer, this returns a ServiceUnavailable status
// tagged with TabletServerErrorPB::TABLET_NOT_RUNNING instead of a null value.
Result<std::shared_ptr<consensus::RaftConsensus>> GetConsensus(const TabletPeerPtr& tablet_peer) {
  if (auto consensus = tablet_peer->shared_raft_consensus()) {
    return consensus;
  }
  return STATUS(ServiceUnavailable, "Consensus unavailable. Tablet not running")
      .CloneAndAddErrorCode(TabletServerError(TabletServerErrorPB::TABLET_NOT_RUNNING));
}
271
272
template<class RespClass>
273
std::shared_ptr<consensus::RaftConsensus> GetConsensusOrRespond(const TabletPeerPtr& tablet_peer,
274
                                                                RespClass* resp,
275
556
                                                                rpc::RpcContext* context) {
276
556
  auto result = GetConsensus(tablet_peer);
277
556
  if (!result.ok()) {
278
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
279
0
  }
280
556
  return result.get();
281
556
}
282
283
template<class RespClass>
284
bool GetConsensusOrRespond(const TabletPeerPtr& tablet_peer,
285
                           RespClass* resp,
286
                           rpc::RpcContext* context,
287
27.0M
                           shared_ptr<Consensus>* consensus) {
288
27.0M
  auto result = GetConsensus(tablet_peer);
289
27.0M
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
27.0M
  return (*consensus = result.get()) != nullptr;
294
27.0M
}
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::ConsensusResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::ConsensusResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*)
Line
Count
Source
287
25.5M
                           shared_ptr<Consensus>* consensus) {
288
25.5M
  auto result = GetConsensus(tablet_peer);
289
25.5M
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
25.5M
  return (*consensus = result.get()) != nullptr;
294
25.5M
}
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::VoteResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::VoteResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*)
Line
Count
Source
287
1.36M
                           shared_ptr<Consensus>* consensus) {
288
1.36M
  auto result = GetConsensus(tablet_peer);
289
1.36M
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
1.36M
  return (*consensus = result.get()) != nullptr;
294
1.36M
}
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::ChangeConfigResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::ChangeConfigResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*)
Line
Count
Source
287
5.29k
                           shared_ptr<Consensus>* consensus) {
288
5.29k
  auto result = GetConsensus(tablet_peer);
289
5.29k
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
5.29k
  return (*consensus = result.get()) != nullptr;
294
5.29k
}
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::UnsafeChangeConfigResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::UnsafeChangeConfigResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*)
Line
Count
Source
287
6
                           shared_ptr<Consensus>* consensus) {
288
6
  auto result = GetConsensus(tablet_peer);
289
6
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
6
  return (*consensus = result.get()) != nullptr;
294
6
}
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::RunLeaderElectionResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::RunLeaderElectionResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*)
Line
Count
Source
287
57.8k
                           shared_ptr<Consensus>* consensus) {
288
57.8k
  auto result = GetConsensus(tablet_peer);
289
57.8k
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
57.8k
  return (*consensus = result.get()) != nullptr;
294
57.8k
}
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::LeaderElectionLostResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::LeaderElectionLostResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*)
Line
Count
Source
287
77
                           shared_ptr<Consensus>* consensus) {
288
77
  auto result = GetConsensus(tablet_peer);
289
77
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
77
  return (*consensus = result.get()) != nullptr;
294
77
}
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::LeaderStepDownResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::LeaderStepDownResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*)
Line
Count
Source
287
54.0k
                           shared_ptr<Consensus>* consensus) {
288
54.0k
  auto result = GetConsensus(tablet_peer);
289
54.0k
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
54.0k
  return (*consensus = result.get()) != nullptr;
294
54.0k
}
tablet_service.cc:bool yb::tserver::(anonymous namespace)::GetConsensusOrRespond<yb::consensus::GetConsensusStateResponsePB>(std::__1::shared_ptr<yb::tablet::TabletPeer> const&, yb::consensus::GetConsensusStateResponsePB*, yb::rpc::RpcContext*, std::__1::shared_ptr<yb::consensus::Consensus>*)
Line
Count
Source
287
3.88k
                           shared_ptr<Consensus>* consensus) {
288
3.88k
  auto result = GetConsensus(tablet_peer);
289
3.88k
  if (!result.ok()) {
290
0
    SetupErrorAndRespond(resp->mutable_error(), result.status(), context);
291
0
    return false;
292
0
  }
293
3.88k
  return (*consensus = result.get()) != nullptr;
294
3.88k
}
295
296
} // namespace
297
298
template<class Resp>
299
bool TabletServiceImpl::CheckWriteThrottlingOrRespond(
300
2.56M
    double score, tablet::TabletPeer* tablet_peer, Resp* resp, rpc::RpcContext* context) {
301
  // Check for memory pressure; don't bother doing any additional work if we've
302
  // exceeded the limit.
303
2.56M
  auto status = CheckWriteThrottling(score, tablet_peer);
304
2.56M
  if (!status.ok()) {
305
15
    SetupErrorAndRespond(resp->mutable_error(), status, context);
306
15
    return false;
307
15
  }
308
309
2.56M
  return true;
310
2.56M
}
311
312
// Shorthand for the nested status-and-schema message of ListTabletsResponsePB.
using StatusAndSchemaPB = ListTabletsResponsePB::StatusAndSchemaPB;
313
314
class WriteQueryCompletionCallback {
315
 public:
316
  WriteQueryCompletionCallback(
317
      tablet::TabletPeerPtr tablet_peer,
318
      std::shared_ptr<rpc::RpcContext> context,
319
      WriteResponsePB* response,
320
      tablet::WriteQuery* query,
321
      const server::ClockPtr& clock,
322
      bool trace = false)
323
      : tablet_peer_(std::move(tablet_peer)),
324
        context_(std::move(context)),
325
        response_(response),
326
        query_(query),
327
        clock_(clock),
328
        include_trace_(trace),
329
2.56M
        trace_(include_trace_ ? Trace::CurrentTrace() : nullptr) {}
330
331
2.56M
  void operator()(Status status) const {
332
18.4E
    VLOG(1) << __PRETTY_FUNCTION__ << " completing with status " << status;
333
    // When we don't need to return any data, we could return success on duplicate request.
334
2.56M
    if (status.IsAlreadyPresent() &&
335
2.56M
        
query_->ql_write_ops()->empty()1
&&
336
2.56M
        
query_->pgsql_write_ops()->empty()1
&&
337
2.56M
        
query_->client_request()->redis_write_batch().empty()1
) {
338
1
      status = Status::OK();
339
1
    }
340
341
2.56M
    TRACE("Write completing with status $0", yb::ToString(status));
342
343
2.56M
    if (!status.ok()) {
344
90.4k
      LOG(INFO) << tablet_peer_->LogPrefix() << "Write failed: " << status;
345
90.4k
      if (include_trace_ && 
trace_0
) {
346
0
        response_->set_trace_buffer(trace_->DumpToString(true));
347
0
      }
348
90.4k
      SetupErrorAndRespond(get_error(), status, context_.get());
349
90.4k
      return;
350
90.4k
    }
351
352
    // Retrieve the rowblocks returned from the QL write operations and return them as RPC
353
    // sidecars. Populate the row schema also.
354
2.47M
    faststring rows_data;
355
2.47M
    for (const auto& ql_write_op : *query_->ql_write_ops()) {
356
273
      const auto& ql_write_req = ql_write_op->request();
357
273
      auto* ql_write_resp = ql_write_op->response();
358
273
      const QLRowBlock* rowblock = ql_write_op->rowblock();
359
273
      SchemaToColumnPBs(rowblock->schema(), ql_write_resp->mutable_column_schemas());
360
273
      rows_data.clear();
361
273
      rowblock->Serialize(ql_write_req.client(), &rows_data);
362
273
      ql_write_resp->set_rows_data_sidecar(
363
273
          narrow_cast<int32_t>(context_->AddRpcSidecar(rows_data)));
364
273
    }
365
366
2.47M
    if (!query_->pgsql_write_ops()->empty()) {
367
      // Retrieve the resultset returned from the PGSQL write operations and return them as RPC
368
      // sidecars.
369
370
644k
      size_t sidecars_size = 0;
371
7.16M
      for (const auto& pgsql_write_op : *query_->pgsql_write_ops()) {
372
7.16M
        sidecars_size += pgsql_write_op->result_buffer().size();
373
7.16M
      }
374
375
644k
      if (sidecars_size != 0) {
376
633k
        context_->ReserveSidecarSpace(sidecars_size);
377
7.14M
        for (const auto& pgsql_write_op : *query_->pgsql_write_ops()) {
378
7.14M
          auto* pgsql_write_resp = pgsql_write_op->response();
379
7.14M
          const faststring& result_buffer = pgsql_write_op->result_buffer();
380
7.14M
          if (
!result_buffer.empty()7.14M
) {
381
7.14M
            pgsql_write_resp->set_rows_data_sidecar(
382
7.14M
                narrow_cast<int32_t>(context_->AddRpcSidecar(result_buffer)));
383
7.14M
          }
384
7.14M
        }
385
633k
      }
386
644k
    }
387
388
2.47M
    if (include_trace_ && 
trace_0
) {
389
0
      response_->set_trace_buffer(trace_->DumpToString(true));
390
0
    }
391
2.47M
    response_->set_propagated_hybrid_time(clock_->Now().ToUint64());
392
2.47M
    context_->RespondSuccess();
393
18.4E
    VLOG(1) << __PRETTY_FUNCTION__ << " RespondedSuccess";
394
2.47M
  }
395
396
 private:
397
90.5k
  TabletServerErrorPB* get_error() const {
398
90.5k
    return response_->mutable_error();
399
90.5k
  }
400
401
  tablet::TabletPeerPtr tablet_peer_;
402
  const std::shared_ptr<rpc::RpcContext> context_;
403
  WriteResponsePB* const response_;
404
  tablet::WriteQuery* const query_;
405
  server::ClockPtr clock_;
406
  const bool include_trace_;
407
  scoped_refptr<Trace> trace_;
408
};
409
410
// Checksums the scan result.
411
class ScanResultChecksummer {
412
 public:
413
3.23k
  ScanResultChecksummer() {}
414
415
1.60M
  void HandleRow(const Schema& schema, const QLTableRow& row) {
416
1.60M
    QLValue value;
417
1.60M
    buffer_.clear();
418
7.31M
    for (uint32_t col_index = 0; col_index != schema.num_columns(); 
++col_index5.71M
) {
419
5.71M
      auto status = row.GetValue(schema.column_id(col_index), &value);
420
5.71M
      if (!status.ok()) {
421
0
        LOG(WARNING) << "Column " << schema.column_id(col_index)
422
0
                     << " not found in " << row.ToString();
423
0
        continue;
424
0
      }
425
5.71M
      buffer_.append(pointer_cast<const char*>(&col_index), sizeof(col_index));
426
5.71M
      if (schema.column(col_index).is_nullable()) {
427
2.98M
        uint8_t defined = value.IsNull() ? 
02.29k
:
12.98M
;
428
2.98M
        buffer_.append(pointer_cast<const char*>(&defined), sizeof(defined));
429
2.98M
      }
430
5.71M
      if (!value.IsNull()) {
431
5.71M
        value.value().AppendToString(&buffer_);
432
5.71M
      }
433
5.71M
    }
434
1.60M
    crc_->Compute(buffer_.c_str(), buffer_.size(), &agg_checksum_, nullptr);
435
1.60M
  }
436
437
  // Accessors for initializing / setting the checksum.
438
3.30k
  uint64_t agg_checksum() const { return agg_checksum_; }
439
440
 private:
441
  crc::Crc* const crc_ = crc::GetCrc32cInstance();
442
  uint64_t agg_checksum_ = 0;
443
  std::string buffer_;
444
};
445
446
// Resolves the tablet to read from.
// This forwards every argument to GetTablet, using this server's tablet peer lookup.
Result<std::shared_ptr<tablet::AbstractTablet>> TabletServiceImpl::GetTabletForRead(
  const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer,
  YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet) {
  return GetTablet(
      server_->tablet_peer_lookup(),
      tablet_id,
      std::move(tablet_peer),
      consistency_level,
      allow_split_tablet);
}
452
453
// Binds the service to `server`; the RPC service base is constructed with the server's
// metric entity.
TabletServiceImpl::TabletServiceImpl(TabletServerIf* server)
    : TabletServerServiceIf(server->MetricEnt()), server_(server) {}
457
458
// Binds the admin service to `server`; the RPC service base is constructed with the
// server's metric entity.
TabletServiceAdminImpl::TabletServiceAdminImpl(TabletServer* server)
    : TabletServerAdminServiceIf(server->MetricEnt()),
      server_(server) {
}
460
461
void TabletServiceAdminImpl::BackfillDone(
462
    const tablet::ChangeMetadataRequestPB* req, ChangeMetadataResponsePB* resp,
463
3.79k
    rpc::RpcContext context) {
464
3.79k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillDone", req, resp, &context)) {
465
0
    return;
466
0
  }
467
3.79k
  DVLOG
(3) << "Received BackfillDone RPC: " << req->DebugString()27
;
468
469
3.79k
  server::UpdateClock(*req, server_->Clock());
470
471
  // For now, we shall only allow this RPC on the leader.
472
3.79k
  auto tablet =
473
3.79k
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
474
3.79k
  if (!tablet) {
475
24
    return;
476
24
  }
477
478
3.77k
  auto operation = std::make_unique<ChangeMetadataOperation>(
479
3.77k
      tablet.peer->tablet(), tablet.peer->log(), req);
480
481
3.77k
  operation->set_completion_callback(
482
3.77k
      MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock()));
483
484
  // Submit the alter schema op. The RPC will be responded to asynchronously.
485
3.77k
  tablet.peer->Submit(std::move(operation), tablet.leader_term);
486
3.77k
}
487
488
void TabletServiceAdminImpl::GetSafeTime(
489
4.39k
    const GetSafeTimeRequestPB* req, GetSafeTimeResponsePB* resp, rpc::RpcContext context) {
490
4.39k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "GetSafeTime", req, resp, &context)) {
491
0
    return;
492
0
  }
493
4.39k
  DVLOG
(3) << "Received GetSafeTime RPC: " << req->DebugString()13
;
494
495
4.39k
  server::UpdateClock(*req, server_->Clock());
496
497
  // For now, we shall only allow this RPC on the leader.
498
4.39k
  auto tablet =
499
4.39k
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
500
4.39k
  if (!tablet) {
501
0
    return;
502
0
  }
503
4.39k
  const CoarseTimePoint& deadline = context.GetClientDeadline();
504
4.39k
  HybridTime min_hybrid_time(HybridTime::kMin);
505
4.39k
  if (req->has_min_hybrid_time_for_backfill()) {
506
4.37k
    min_hybrid_time = HybridTime(req->min_hybrid_time_for_backfill());
507
    // For Transactional tables, wait until there are no pending transactions that started
508
    // prior to min_hybrid_time. These may not have updated the index correctly, if they
509
    // happen to commit after the backfill scan, it is possible that they may miss updating
510
    // the index because the some operations may have taken place prior to min_hybrid_time.
511
    //
512
    // For Non-Txn tables, it is impossible to know at the tservers whether or not an "old
513
    // transaction" is still active. To avoid having such old transactions, we assume a
514
    // bound on the length of such transactions (during the backfill process) and wait it
515
    // out.
516
4.37k
    if (!tablet.peer->tablet()->transaction_participant()) {
517
510
      min_hybrid_time = min_hybrid_time.AddMilliseconds(
518
510
          FLAGS_index_backfill_upperbound_for_user_enforced_txn_duration_ms);
519
510
      VLOG(2) << "GetSafeTime called on a user enforced transaction tablet "
520
3
              << tablet.peer->tablet_id() << " will wait until "
521
3
              << min_hybrid_time << " is safe.";
522
3.86k
    } else {
523
      // Add some extra delay to wait for operations being replicated to be
524
      // applied.
525
3.86k
      SleepFor(MonoDelta::FromMilliseconds(
526
3.86k
          FLAGS_index_backfill_additional_delay_before_backfilling_ms));
527
528
3.86k
      auto txn_particpant = tablet.peer->tablet()->transaction_participant();
529
3.86k
      auto wait_until = CoarseMonoClock::Now() + FLAGS_index_backfill_wait_for_old_txns_ms * 1ms;
530
3.86k
      HybridTime min_running_ht;
531
3.86k
      for (;;) {
532
3.86k
        min_running_ht = txn_particpant->MinRunningHybridTime();
533
3.87k
        if (
(3.86k
min_running_ht3.86k
&& min_running_ht >= min_hybrid_time) ||
534
3.86k
            
CoarseMonoClock::Now() > wait_until3
) {
535
3.86k
          break;
536
3.86k
        }
537
18.4E
        VLOG(2) << "MinRunningHybridTime is " << min_running_ht
538
18.4E
                << " need to wait for " << min_hybrid_time;
539
18.4E
        SleepFor(MonoDelta::FromMilliseconds(FLAGS_transaction_min_running_check_interval_ms));
540
18.4E
      }
541
542
18.4E
      VLOG(2) << "Finally MinRunningHybridTime is " << min_running_ht;
543
3.86k
      if (min_running_ht < min_hybrid_time) {
544
3
        VLOG
(2) << "Aborting Txns that started prior to " << min_hybrid_time0
;
545
3
        auto s = txn_particpant->StopActiveTxnsPriorTo(min_hybrid_time, deadline);
546
3
        if (!s.ok()) {
547
0
          SetupErrorAndRespond(resp->mutable_error(), s, &context);
548
0
          return;
549
0
        }
550
3
      }
551
3.86k
    }
552
4.37k
  }
553
554
4.39k
  auto safe_time = tablet.peer->tablet()->SafeTime(
555
4.39k
      tablet::RequireLease::kTrue, min_hybrid_time, deadline);
556
4.39k
  if (!safe_time.ok()) {
557
3
    SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context);
558
3
    return;
559
3
  }
560
561
4.39k
  resp->set_safe_time(safe_time->ToUint64());
562
4.39k
  resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
563
18.4E
  VLOG(1) << "Tablet " << tablet.peer->tablet_id()
564
18.4E
          << " returning safe time " << yb::ToString(safe_time);
565
566
4.39k
  context.RespondSuccess();
567
4.39k
}
568
569
void TabletServiceAdminImpl::BackfillIndex(
570
4.49k
    const BackfillIndexRequestPB* req, BackfillIndexResponsePB* resp, rpc::RpcContext context) {
571
4.49k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "BackfillIndex", req, resp, &context)) {
572
0
    return;
573
0
  }
574
18.4E
  DVLOG(3) << "Received BackfillIndex RPC: " << req->DebugString();
575
576
4.49k
  server::UpdateClock(*req, server_->Clock());
577
578
  // For now, we shall only allow this RPC on the leader.
579
4.49k
  auto tablet =
580
4.49k
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
581
4.49k
  if (!tablet) {
582
0
    return;
583
0
  }
584
585
4.49k
  if (req->indexes().empty()) {
586
0
    SetupErrorAndRespond(
587
0
        resp->mutable_error(),
588
0
        STATUS(InvalidArgument, "No indexes given in request"),
589
0
        TabletServerErrorPB::OPERATION_NOT_SUPPORTED,
590
0
        &context);
591
0
    return;
592
0
  }
593
594
4.49k
  const CoarseTimePoint &deadline = context.GetClientDeadline();
595
4.49k
  const auto coarse_start = CoarseMonoClock::Now();
596
4.49k
  {
597
4.49k
    std::unique_lock<std::mutex> l(backfill_lock_);
598
6.12k
    while (num_tablets_backfilling_ >= FLAGS_num_concurrent_backfills_allowed) {
599
1.62k
      if (backfill_cond_.wait_until(l, deadline) == std::cv_status::timeout) {
600
0
        SetupErrorAndRespond(
601
0
            resp->mutable_error(),
602
0
            STATUS_FORMAT(ServiceUnavailable,
603
0
                          "Already running $0 backfill requests",
604
0
                          num_tablets_backfilling_),
605
0
            &context);
606
0
        return;
607
0
      }
608
1.62k
    }
609
4.49k
    num_tablets_backfilling_++;
610
4.49k
  }
611
4.36k
  auto se = ScopeExit([this] {
612
4.36k
    std::unique_lock<std::mutex> l(this->backfill_lock_);
613
4.36k
    this->num_tablets_backfilling_--;
614
4.36k
    this->backfill_cond_.notify_all();
615
4.36k
  });
616
617
  // Wait for SafeTime to get past read_at;
618
4.49k
  const HybridTime read_at(req->read_at_hybrid_time());
619
18.4E
  DVLOG(1) << "Waiting for safe time to be past " << read_at;
620
4.49k
  const auto safe_time =
621
4.49k
      tablet.peer->tablet()->SafeTime(tablet::RequireLease::kFalse, read_at, deadline);
622
4.49k
  DVLOG
(1) << "Got safe time " << safe_time.ToString()88
;
623
4.49k
  if (!safe_time.ok()) {
624
0
    LOG(ERROR) << "Could not get a good enough safe time " << safe_time.ToString();
625
0
    SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context);
626
0
    return;
627
0
  }
628
629
  // Don't work on the request if we have had to wait more than 50%
630
  // of the time allocated to us for the RPC.
631
  // Backfill is a costly operation, we do not want to start working
632
  // on it if we expect the client (master) to time out the RPC and
633
  // force us to redo the work.
634
4.49k
  const auto coarse_now = CoarseMonoClock::Now();
635
4.49k
  if (deadline - coarse_now < coarse_now - coarse_start) {
636
0
    SetupErrorAndRespond(
637
0
        resp->mutable_error(),
638
0
        STATUS_FORMAT(
639
0
            ServiceUnavailable, "Not enough time left $0", deadline - coarse_now),
640
0
        &context);
641
0
    return;
642
0
  }
643
644
4.49k
  bool all_at_backfill = true;
645
4.49k
  bool all_past_backfill = true;
646
4.49k
  bool is_pg_table = tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE;
647
4.49k
  const shared_ptr<IndexMap> index_map = tablet.peer->tablet_metadata()->index_map(
648
4.49k
    req->indexed_table_id());
649
4.49k
  std::vector<IndexInfo> indexes_to_backfill;
650
4.49k
  std::vector<TableId> index_ids;
651
4.49k
  for (const auto& idx : req->indexes()) {
652
4.43k
    auto result = index_map->FindIndex(idx.table_id());
653
4.43k
    if (result) {
654
4.42k
      const IndexInfo* index_info = *result;
655
4.42k
      indexes_to_backfill.push_back(*index_info);
656
4.42k
      index_ids.push_back(index_info->table_id());
657
658
4.42k
      IndexInfoPB idx_info_pb;
659
4.42k
      index_info->ToPB(&idx_info_pb);
660
4.42k
      if (!is_pg_table) {
661
2.45k
        all_at_backfill &=
662
2.45k
            idx_info_pb.index_permissions() == IndexPermissions::INDEX_PERM_DO_BACKFILL;
663
2.45k
      } else {
664
        // YSQL tables don't use all the docdb permissions, so use this approximation.
665
        // TODO(jason): change this back to being like YCQL once we bring the docdb permission
666
        // DO_BACKFILL back (issue #6218).
667
1.97k
        all_at_backfill &=
668
1.97k
            idx_info_pb.index_permissions() == IndexPermissions::INDEX_PERM_WRITE_AND_DELETE;
669
1.97k
      }
670
4.42k
      all_past_backfill &=
671
4.42k
          idx_info_pb.index_permissions() > IndexPermissions::INDEX_PERM_DO_BACKFILL;
672
4.42k
    } else {
673
13
      LOG(WARNING) << "index " << idx.table_id() << " not found in tablet metadata";
674
13
      all_at_backfill = false;
675
13
      all_past_backfill = false;
676
13
    }
677
4.43k
  }
678
679
4.49k
  if (!all_at_backfill) {
680
17
    if (all_past_backfill) {
681
      // Change this to see if for all indexes: IndexPermission > DO_BACKFILL.
682
8
      LOG(WARNING) << "Received BackfillIndex RPC: " << req->DebugString()
683
8
                   << " after all indexes have moved past DO_BACKFILL. IndexMap is "
684
8
                   << ToString(index_map);
685
      // This is possible if this tablet completed the backfill. But the master failed over before
686
      // other tablets could complete.
687
      // The new master is redoing the backfill. We are safe to ignore this request.
688
8
      context.RespondSuccess();
689
8
      return;
690
8
    }
691
692
9
    uint32_t our_schema_version = tablet.peer->tablet_metadata()->schema_version();
693
9
    uint32_t their_schema_version = req->schema_version();
694
9
    DCHECK_NE(our_schema_version, their_schema_version);
695
9
    SetupErrorAndRespond(
696
9
        resp->mutable_error(),
697
9
        STATUS_SUBSTITUTE(
698
9
            InvalidArgument,
699
9
            "Tablet has a different schema $0 vs $1. "
700
9
            "Requested index is not ready to backfill. IndexMap: $2",
701
9
            our_schema_version, their_schema_version, ToString(index_map)),
702
9
        TabletServerErrorPB::MISMATCHED_SCHEMA, &context);
703
9
    return;
704
17
  }
705
706
4.48k
  Status backfill_status;
707
4.48k
  std::string backfilled_until;
708
4.48k
  std::unordered_set<TableId> failed_indexes;
709
4.48k
  size_t number_rows_processed = 0;
710
4.48k
  if (is_pg_table) {
711
1.95k
    if (!req->has_namespace_name()) {
712
0
      SetupErrorAndRespond(
713
0
          resp->mutable_error(),
714
0
          STATUS(
715
0
              InvalidArgument,
716
0
              "Attempted backfill on YSQL table without supplying database name"),
717
0
          TabletServerErrorPB::OPERATION_NOT_SUPPORTED,
718
0
          &context);
719
0
      return;
720
0
    }
721
1.95k
    backfill_status = tablet.peer->tablet()->BackfillIndexesForYsql(
722
1.95k
        indexes_to_backfill,
723
1.95k
        req->start_key(),
724
1.95k
        deadline,
725
1.95k
        read_at,
726
1.95k
        server_->pgsql_proxy_bind_address(),
727
1.95k
        req->namespace_name(),
728
1.95k
        server_->GetSharedMemoryPostgresAuthKey(),
729
1.95k
        &number_rows_processed,
730
1.95k
        &backfilled_until);
731
1.95k
    if (backfill_status.IsIllegalState()) {
732
24
      DCHECK_EQ
(failed_indexes.size(), 0) << "We don't support batching in YSQL yet"0
;
733
24
      for (const auto& idx_info : indexes_to_backfill) {
734
24
        failed_indexes.insert(idx_info.table_id());
735
24
      }
736
24
      DCHECK_EQ
(failed_indexes.size(), 1) << "We don't support batching in YSQL yet"0
;
737
24
    }
738
2.52k
  } else if (tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE) {
739
2.43k
    backfill_status = tablet.peer->tablet()->BackfillIndexes(
740
2.43k
        indexes_to_backfill,
741
2.43k
        req->start_key(),
742
2.43k
        deadline,
743
2.43k
        read_at,
744
2.43k
        &number_rows_processed,
745
2.43k
        &backfilled_until,
746
2.43k
        &failed_indexes);
747
2.43k
  } else {
748
89
    SetupErrorAndRespond(
749
89
        resp->mutable_error(),
750
89
        STATUS(InvalidArgument, "Attempted backfill on tablet of invalid table type"),
751
89
        TabletServerErrorPB::OPERATION_NOT_SUPPORTED,
752
89
        &context);
753
89
    return;
754
89
  }
755
4.39k
  DVLOG(1) << "Tablet " << tablet.peer->tablet_id() << " backfilled indexes "
756
41
           << yb::ToString(index_ids) << " and got " << backfill_status
757
41
           << " backfilled until : " << backfilled_until;
758
759
4.39k
  resp->set_backfilled_until(backfilled_until);
760
4.39k
  resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
761
4.39k
  resp->set_number_rows_processed(number_rows_processed);
762
763
4.39k
  if (!backfill_status.ok()) {
764
29
    VLOG
(2) << " Failed indexes are " << yb::ToString(failed_indexes)0
;
765
29
    for (const auto& idx : failed_indexes) {
766
29
      *resp->add_failed_index_ids() = idx;
767
29
    }
768
29
    SetupErrorAndRespond(
769
29
        resp->mutable_error(),
770
29
        backfill_status,
771
29
        (backfill_status.IsIllegalState()
772
29
            ? TabletServerErrorPB::OPERATION_NOT_SUPPORTED
773
29
            : 
TabletServerErrorPB::UNKNOWN_ERROR0
),
774
29
        &context);
775
29
    return;
776
29
  }
777
778
4.36k
  context.RespondSuccess();
779
4.36k
}
780
781
// Handles a ChangeMetadata (alter schema) RPC on the tablet leader.
//
// Behavior:
//  - If the requested schema version is already applied with an identical schema (and no WAL
//    retention change is requested), the RPC is acked.
//  - If the tablet already has a newer schema, the request is rejected with
//    TABLET_HAS_A_NEWER_SCHEMA.
//  - Write permits are paused while the change is applied for:
//      * YSQL tables;
//      * YCQL tables, unless FLAGS_disable_alter_vs_write_mutual_exclusion is set.
//  - For YSQL tables, if should_abort_active_txns is set, active transactions other than the
//    DDL transaction itself are aborted.
//  - The change is submitted as a ChangeMetadataOperation. The RPC is answered from that
//    operation's completion callback.
void TabletServiceAdminImpl::AlterSchema(const tablet::ChangeMetadataRequestPB* req,
                                         ChangeMetadataResponsePB* resp,
                                         rpc::RpcContext context) {
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "ChangeMetadata", req, resp, &context)) {
    return;
  }
  VLOG(1) << "Received Change Metadata RPC: " << req->DebugString();
  if (FLAGS_TEST_alter_schema_delay_ms) {
    LOG(INFO) << __func__ << ": sleeping for " << FLAGS_TEST_alter_schema_delay_ms << "ms";
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_alter_schema_delay_ms));
    LOG(INFO) << __func__ << ": done sleeping for " << FLAGS_TEST_alter_schema_delay_ms << "ms";
  }

  server::UpdateClock(*req, server_->Clock());

  auto tablet = LookupLeaderTabletOrRespond(
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
  if (!tablet) {
    return;
  }

  tablet::TableInfoPtr table_info;
  if (req->has_alter_table_id()) {
    auto result = tablet.peer->tablet_metadata()->GetTableInfo(req->alter_table_id());
    if (!result.ok()) {
      SetupErrorAndRespond(resp->mutable_error(), result.status(),
                           TabletServerErrorPB::INVALID_SCHEMA, &context);
      return;
    }
    table_info = *result;
  } else {
    table_info = tablet.peer->tablet_metadata()->primary_table_info();
  }
  const Schema& tablet_schema = *table_info->schema;
  uint32_t schema_version = table_info->schema_version;
  // Sanity check, to verify that the tablet should have the same schema
  // specified in the request.
  Schema req_schema;
  Status s = SchemaFromPB(req->schema(), &req_schema);
  if (!s.ok()) {
    SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::INVALID_SCHEMA, &context);
    return;
  }

  // If the schema was already applied, respond as succeeded.
  if (!req->has_wal_retention_secs() && schema_version == req->schema_version()) {

    if (req_schema.Equals(tablet_schema)) {
      context.RespondSuccess();
      return;
    }

    schema_version = tablet.peer->tablet_metadata()->schema_version(
        req->has_alter_table_id() ? req->alter_table_id() : "");
    if (schema_version == req->schema_version()) {
      LOG(ERROR) << "The current schema does not match the request schema."
                 << " version=" << schema_version
                 << " current-schema=" << tablet_schema.ToString()
                 << " request-schema=" << req_schema.ToString()
                 << " (corruption)";
      SetupErrorAndRespond(resp->mutable_error(),
                           STATUS(Corruption, "got a different schema for the same version number"),
                           TabletServerErrorPB::MISMATCHED_SCHEMA, &context);
      return;
    }
  }

  // If the current schema is newer than the one in the request reject the request.
  if (schema_version > req->schema_version()) {
    LOG(ERROR) << "Tablet " << req->tablet_id() << " has a newer schema"
               << " version=" << schema_version
               << " req->schema_version()=" << req->schema_version()
               << "\n current-schema=" << tablet_schema.ToString()
               << "\n request-schema=" << req_schema.ToString();
    SetupErrorAndRespond(
        resp->mutable_error(),
        STATUS_SUBSTITUTE(
            InvalidArgument, "Tablet has a newer schema Tab $0. Req $1 vs Existing version : $2",
            req->tablet_id(), req->schema_version(), schema_version),
        TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA, &context);
    return;
  }

  VLOG(1) << "Tablet updating schema from "
          << " version=" << schema_version << " current-schema=" << tablet_schema.ToString()
          << " to request-schema=" << req_schema.ToString()
          << " for table ID=" << table_info->table_id;
  ScopedRWOperationPause pause_writes;
  if ((tablet.peer->tablet()->table_type() == TableType::YQL_TABLE_TYPE &&
       !GetAtomicFlag(&FLAGS_disable_alter_vs_write_mutual_exclusion)) ||
      tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) {
    // For schema change operations we will have to pause the write operations
    // until the schema change is done. This will be done synchronously.
    pause_writes = tablet.peer->tablet()->PauseWritePermits(context.GetClientDeadline());
    if (!pause_writes.ok()) {
      SetupErrorAndRespond(
          resp->mutable_error(),
          STATUS(
              TryAgain, "Could not lock the tablet against write operations for schema change"),
          &context);
      return;
    }

    // After write operation is paused, active transactions will be aborted for YSQL transactions.
    if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE &&
        req->should_abort_active_txns()) {
      DCHECK(req->has_transaction_id());
      if (tablet.peer->tablet()->transaction_participant() == nullptr) {
        auto status = STATUS(
            IllegalState, "Transaction participant is null for tablet " + req->tablet_id());
        LOG(ERROR) << status;
        SetupErrorAndRespond(
            resp->mutable_error(),
            status,
            &context);
        return;
      }
      HybridTime max_cutoff = HybridTime::kMax;
      CoarseTimePoint deadline =
          CoarseMonoClock::Now() +
          MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_abort_timeout_ms);
      // The transaction id comes from the RPC payload. A malformed or missing id is reported
      // back to the caller; CHECK_RESULT would have crashed the whole tablet server here.
      auto txn_id_result = TransactionId::FromString(req->transaction_id());
      if (!txn_id_result.ok()) {
        LOG(WARNING) << "Invalid transaction id in ChangeMetadata request for tablet "
                     << req->tablet_id() << ": " << txn_id_result.status();
        SetupErrorAndRespond(
            resp->mutable_error(),
            txn_id_result.status(),
            &context);
        return;
      }
      TransactionId txn_id = *txn_id_result;
      LOG(INFO) << "Aborting transactions that started prior to " << max_cutoff
                << " for tablet id " << req->tablet_id()
                << " excluding transaction with id " << txn_id;
      // There could be a chance where a transaction does not appear by transaction_participant
      // but has already begun replicating through Raft. Such transactions might succeed rather
      // than get aborted. This race condition is dismissable for this intermediate solution.
      Status abort_status = tablet.peer->tablet()->transaction_participant()->StopActiveTxnsPriorTo(
            max_cutoff, deadline, &txn_id);
      if (!abort_status.ok() ||
          PREDICT_FALSE(FLAGS_TEST_fail_alter_schema_after_abort_transactions)) {
        auto status = STATUS(TryAgain, "Transaction abort failed for tablet " + req->tablet_id());
        // Keep the underlying abort error in the log; the caller only sees a retryable TryAgain.
        LOG(WARNING) << status << " (abort status: " << abort_status << ")";
        SetupErrorAndRespond(
            resp->mutable_error(),
            status,
            &context);
        return;
      }
    }
  }
  auto operation = std::make_unique<ChangeMetadataOperation>(
      tablet.peer->tablet(), tablet.peer->log(), req);

  operation->set_completion_callback(
      MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock()));
  // The write pause (if any) is handed to the operation and held until it completes.
  operation->UsePermitToken(std::move(pause_writes));

  // Submit the alter schema op. The RPC will be responded to asynchronously.
  tablet.peer->Submit(std::move(operation), tablet.leader_term);
}
932
933
28.0M
// Unwraps a Result-producing expression inside a function returning void.
//
// `expr` is evaluated through RESULT_CHECKER_HELPER (defined elsewhere). If the result is not ok,
// the enclosing function returns immediately; otherwise the macro yields the contained value.
// It assumes RESULT_CHECKER_HELPER binds the evaluated result to `__result` before running the
// checker statement — TODO confirm against its definition.
//
// The macro never sends a response itself. It is meant to wrap helpers that have already
// responded on failure, e.g. LookupTabletPeerOrRespond in UpdateTransaction.
//
// NOTE(review): the expansion ends in ';', so `x = VERIFY_RESULT_OR_RETURN(e);` produces an extra
// empty statement; harmless, but kept as-is because other call sites may rely on it.
#define VERIFY_RESULT_OR_RETURN(expr) RESULT_CHECKER_HELPER( \
    expr, \
    if (!__result.ok()) { return; });
936
937
void TabletServiceImpl::VerifyTableRowRange(
938
    const VerifyTableRowRangeRequestPB* req,
939
    VerifyTableRowRangeResponsePB* resp,
940
0
    rpc::RpcContext context) {
941
0
  DVLOG(3) << "Received VerifyTableRowRange RPC: " << req->DebugString();
942
943
0
  server::UpdateClock(*req, server_->Clock());
944
945
0
  auto peer_tablet =
946
0
      LookupTabletPeerOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
947
0
  if (!peer_tablet) {
948
0
    return;
949
0
  }
950
951
0
  auto tablet = peer_tablet->tablet;
952
0
  bool is_pg_table = tablet->table_type() == TableType::PGSQL_TABLE_TYPE;
953
0
  if (is_pg_table) {
954
0
    SetupErrorAndRespond(
955
0
        resp->mutable_error(), STATUS(NotFound, "Verify operation not supported for PGSQL tables."),
956
0
        &context);
957
0
    return;
958
0
  }
959
960
0
  const CoarseTimePoint& deadline = context.GetClientDeadline();
961
962
  // Wait for SafeTime to get past read_at;
963
0
  const HybridTime read_at(req->read_time());
964
0
  DVLOG(1) << "Waiting for safe time to be past " << read_at;
965
0
  const auto safe_time = tablet->SafeTime(tablet::RequireLease::kFalse, read_at, deadline);
966
0
  DVLOG(1) << "Got safe time " << safe_time.ToString();
967
0
  if (!safe_time.ok()) {
968
0
    LOG(ERROR) << "Could not get a good enough safe time " << safe_time.ToString();
969
0
    SetupErrorAndRespond(resp->mutable_error(), safe_time.status(), &context);
970
0
    return;
971
0
  }
972
973
0
  auto valid_read_at = req->has_read_time() ? read_at : *safe_time;
974
0
  std::string verified_until = "";
975
0
  std::unordered_map<TableId, uint64> consistency_stats;
976
977
0
  if (peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_info) {
978
0
    auto index_info =
979
0
        *peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_info;
980
0
    const auto& table_id = index_info.indexed_table_id();
981
0
    Status verify_status = tablet->VerifyMainTableConsistencyForCQL(
982
0
        table_id, req->start_key(), req->num_rows(), deadline, valid_read_at, &consistency_stats,
983
0
        &verified_until);
984
0
    if (!verify_status.ok()) {
985
0
      SetupErrorAndRespond(resp->mutable_error(), verify_status, &context);
986
0
      return;
987
0
    }
988
989
0
    (*resp->mutable_consistency_stats())[table_id] = consistency_stats[table_id];
990
0
  } else {
991
0
    const IndexMap index_map =
992
0
        *peer_tablet->tablet_peer->tablet_metadata()->primary_table_info()->index_map;
993
0
    vector<IndexInfo> indexes;
994
0
    vector<TableId> index_ids;
995
0
    if (req->index_ids().empty()) {
996
0
      for (auto it = index_map.begin(); it != index_map.end(); it++) {
997
0
        indexes.push_back(it->second);
998
0
      }
999
0
    } else {
1000
0
      for (const auto& idx : req->index_ids()) {
1001
0
        auto result = index_map.FindIndex(idx);
1002
0
        if (result) {
1003
0
          const IndexInfo* index_info = *result;
1004
0
          indexes.push_back(*index_info);
1005
0
          index_ids.push_back(index_info->table_id());
1006
0
        } else {
1007
0
          LOG(WARNING) << "Index " << idx << " not found in tablet metadata";
1008
0
        }
1009
0
      }
1010
0
    }
1011
1012
0
    Status verify_status = tablet->VerifyIndexTableConsistencyForCQL(
1013
0
        indexes, req->start_key(), req->num_rows(), deadline, valid_read_at, &consistency_stats,
1014
0
        &verified_until);
1015
0
    if (!verify_status.ok()) {
1016
0
      SetupErrorAndRespond(resp->mutable_error(), verify_status, &context);
1017
0
      return;
1018
0
    }
1019
1020
0
    for (const IndexInfo& index : indexes) {
1021
0
      const auto& table_id = index.table_id();
1022
0
      (*resp->mutable_consistency_stats())[table_id] = consistency_stats[table_id];
1023
0
    }
1024
0
  }
1025
0
  resp->set_verified_until(verified_until);
1026
0
  context.RespondSuccess();
1027
0
}
1028
1029
void TabletServiceImpl::UpdateTransaction(const UpdateTransactionRequestPB* req,
1030
                                          UpdateTransactionResponsePB* resp,
1031
2.53M
                                          rpc::RpcContext context) {
1032
2.53M
  TRACE("UpdateTransaction");
1033
1034
2.53M
  if (req->state().status() == TransactionStatus::CREATED &&
1035
2.53M
      
RandomActWithProbability(TEST_delay_create_transaction_probability)411k
) {
1036
0
    std::this_thread::sleep_for(
1037
0
        (FLAGS_transaction_rpc_timeout_ms + RandomUniformInt(-200, 200)) * 1ms);
1038
0
  }
1039
1040
2.53M
  VLOG(1) << "UpdateTransaction: " << req->ShortDebugString()
1041
1.83k
          << ", context: " << context.ToString();
1042
2.53M
  LOG_IF(DFATAL, !req->has_propagated_hybrid_time())
1043
3.08k
      << __func__ << " missing propagated hybrid time for "
1044
3.08k
      << TransactionStatus_Name(req->state().status());
1045
2.53M
  UpdateClock(*req, server_->Clock());
1046
1047
2.53M
  LeaderTabletPeer tablet;
1048
2.53M
  auto txn_status = req->state().status();
1049
2.53M
  auto cleanup = txn_status == TransactionStatus::IMMEDIATE_CLEANUP ||
1050
2.53M
                 
txn_status == TransactionStatus::GRACEFUL_CLEANUP1.85M
;
1051
2.53M
  if (cleanup) {
1052
927k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1053
927k
        server_->tablet_peer_lookup(), req->tablet_id(), resp, &context));
1054
927k
    tablet.FillTabletPeer(std::move(peer_tablet));
1055
927k
    tablet.leader_term = OpId::kUnknownTerm;
1056
1.60M
  } else {
1057
1.60M
    tablet = LookupLeaderTabletOrRespond(
1058
1.60M
        server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1059
1.60M
  }
1060
2.53M
  if (!tablet) {
1061
3.77k
    return;
1062
3.77k
  }
1063
1064
2.52M
  auto state = std::make_unique<tablet::UpdateTxnOperation>(tablet.tablet.get(), &req->state());
1065
2.52M
  state->set_completion_callback(MakeRpcOperationCompletionCallback(
1066
2.52M
      std::move(context), resp, server_->Clock()));
1067
1068
2.52M
  if (req->state().status() == TransactionStatus::APPLYING || 
cleanup2.09M
) {
1069
1.36M
    auto* participant = tablet.tablet->transaction_participant();
1070
1.36M
    if (
participant1.36M
) {
1071
1.36M
      participant->Handle(std::move(state), tablet.leader_term);
1072
18.4E
    } else {
1073
18.4E
      state->CompleteWithStatus(STATUS_FORMAT(
1074
18.4E
          InvalidArgument, "Does not have transaction participant to process $0",
1075
18.4E
          req->state().status()));
1076
18.4E
    }
1077
1.36M
  } else {
1078
1.16M
    auto* coordinator = tablet.tablet->transaction_coordinator();
1079
1.16M
    if (coordinator) {
1080
1.16M
      coordinator->Handle(std::move(state), tablet.leader_term);
1081
1.16M
    } else {
1082
783
      state->CompleteWithStatus(STATUS_FORMAT(
1083
783
          InvalidArgument, "Does not have transaction coordinator to process $0",
1084
783
          req->state().status()));
1085
783
    }
1086
1.16M
  }
1087
2.52M
}
1088
1089
template <class Req, class Resp, class Action>
1090
void TabletServiceImpl::PerformAtLeader(
1091
296k
    const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) {
1092
296k
  UpdateClock(*req, server_->Clock());
1093
1094
296k
  auto tablet_peer = LookupLeaderTabletOrRespond(
1095
296k
      server_->tablet_peer_lookup(), req->tablet_id(), resp, context);
1096
1097
296k
  if (!tablet_peer) {
1098
223
    return;
1099
223
  }
1100
1101
296k
  auto status = action(tablet_peer);
1102
1103
296k
  if (*context) {
1104
296k
    resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
1105
296k
    if (
status.ok()296k
) {
1106
296k
      context->RespondSuccess();
1107
18.4E
    } else {
1108
18.4E
      SetupErrorAndRespond(resp->mutable_error(), status, context);
1109
18.4E
    }
1110
296k
  }
1111
296k
}
tablet_service.cc:void yb::tserver::TabletServiceImpl::PerformAtLeader<yb::tserver::GetTransactionStatusRequestPB const*, yb::tserver::GetTransactionStatusResponsePB, yb::tserver::TabletServiceImpl::GetTransactionStatus(yb::tserver::GetTransactionStatusRequestPB const*, yb::tserver::GetTransactionStatusResponsePB*, yb::rpc::RpcContext)::$_1>(yb::tserver::GetTransactionStatusRequestPB const* const&, yb::tserver::GetTransactionStatusResponsePB*, yb::rpc::RpcContext*, yb::tserver::TabletServiceImpl::GetTransactionStatus(yb::tserver::GetTransactionStatusRequestPB const*, yb::tserver::GetTransactionStatusResponsePB*, yb::rpc::RpcContext)::$_1 const&)
Line
Count
Source
1091
296k
    const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) {
1092
296k
  UpdateClock(*req, server_->Clock());
1093
1094
296k
  auto tablet_peer = LookupLeaderTabletOrRespond(
1095
296k
      server_->tablet_peer_lookup(), req->tablet_id(), resp, context);
1096
1097
296k
  if (!tablet_peer) {
1098
223
    return;
1099
223
  }
1100
1101
296k
  auto status = action(tablet_peer);
1102
1103
296k
  if (*context) {
1104
296k
    resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
1105
296k
    if (
status.ok()296k
) {
1106
296k
      context->RespondSuccess();
1107
18.4E
    } else {
1108
18.4E
      SetupErrorAndRespond(resp->mutable_error(), status, context);
1109
18.4E
    }
1110
296k
  }
1111
296k
}
Unexecuted instantiation: tablet_service.cc:void yb::tserver::TabletServiceImpl::PerformAtLeader<yb::tserver::GetTransactionStatusAtParticipantRequestPB const*, yb::tserver::GetTransactionStatusAtParticipantResponsePB, yb::tserver::TabletServiceImpl::GetTransactionStatusAtParticipant(yb::tserver::GetTransactionStatusAtParticipantRequestPB const*, yb::tserver::GetTransactionStatusAtParticipantResponsePB*, yb::rpc::RpcContext)::$_2>(yb::tserver::GetTransactionStatusAtParticipantRequestPB const* const&, yb::tserver::GetTransactionStatusAtParticipantResponsePB*, yb::rpc::RpcContext*, yb::tserver::TabletServiceImpl::GetTransactionStatusAtParticipant(yb::tserver::GetTransactionStatusAtParticipantRequestPB const*, yb::tserver::GetTransactionStatusAtParticipantResponsePB*, yb::rpc::RpcContext)::$_2 const&)
tablet_service.cc:void yb::tserver::TabletServiceImpl::PerformAtLeader<yb::tserver::GetSplitKeyRequestPB const*, yb::tserver::GetSplitKeyResponsePB, yb::tserver::TabletServiceImpl::GetSplitKey(yb::tserver::GetSplitKeyRequestPB const*, yb::tserver::GetSplitKeyResponsePB*, yb::rpc::RpcContext)::$_4>(yb::tserver::GetSplitKeyRequestPB const* const&, yb::tserver::GetSplitKeyResponsePB*, yb::rpc::RpcContext*, yb::tserver::TabletServiceImpl::GetSplitKey(yb::tserver::GetSplitKeyRequestPB const*, yb::tserver::GetSplitKeyResponsePB*, yb::rpc::RpcContext)::$_4 const&)
Line
Count
Source
1091
144
    const Req& req, Resp* resp, rpc::RpcContext* context, const Action& action) {
1092
144
  UpdateClock(*req, server_->Clock());
1093
1094
144
  auto tablet_peer = LookupLeaderTabletOrRespond(
1095
144
      server_->tablet_peer_lookup(), req->tablet_id(), resp, context);
1096
1097
144
  if (!tablet_peer) {
1098
0
    return;
1099
0
  }
1100
1101
144
  auto status = action(tablet_peer);
1102
1103
144
  if (*context) {
1104
144
    resp->set_propagated_hybrid_time(server_->Clock()->Now().ToUint64());
1105
144
    if (status.ok()) {
1106
143
      context->RespondSuccess();
1107
143
    } else {
1108
1
      SetupErrorAndRespond(resp->mutable_error(), status, context);
1109
1
    }
1110
144
  }
1111
144
}
1112
1113
void TabletServiceImpl::GetTransactionStatus(const GetTransactionStatusRequestPB* req,
1114
                                             GetTransactionStatusResponsePB* resp,
1115
296k
                                             rpc::RpcContext context) {
1116
296k
  TRACE("GetTransactionStatus");
1117
1118
296k
  PerformAtLeader(req, resp, &context,
1119
296k
      [req, resp, &context](const LeaderTabletPeer& tablet_peer) {
1120
296k
    auto* transaction_coordinator = tablet_peer.tablet->transaction_coordinator();
1121
296k
    if (!transaction_coordinator) {
1122
0
      return STATUS_FORMAT(
1123
0
          InvalidArgument, "No transaction coordinator at tablet $0",
1124
0
          tablet_peer.peer->tablet_id());
1125
0
    }
1126
296k
    return transaction_coordinator->GetStatus(
1127
296k
        req->transaction_id(), context.GetClientDeadline(), resp);
1128
296k
  });
1129
296k
}
1130
1131
void TabletServiceImpl::GetTransactionStatusAtParticipant(
1132
    const GetTransactionStatusAtParticipantRequestPB* req,
1133
    GetTransactionStatusAtParticipantResponsePB* resp,
1134
0
    rpc::RpcContext context) {
1135
0
  TRACE("GetTransactionStatusAtParticipant");
1136
1137
0
  PerformAtLeader(req, resp, &context,
1138
0
      [req, resp, &context](const LeaderTabletPeer& tablet_peer) -> Status {
1139
0
    auto* transaction_participant = tablet_peer.peer->tablet()->transaction_participant();
1140
0
    if (!transaction_participant) {
1141
0
      return STATUS_FORMAT(
1142
0
          InvalidArgument, "No transaction participant at tablet $0",
1143
0
          tablet_peer.peer->tablet_id());
1144
0
    }
1145
1146
0
    transaction_participant->GetStatus(
1147
0
        VERIFY_RESULT(FullyDecodeTransactionId(req->transaction_id())),
1148
0
        req->required_num_replicated_batches(), tablet_peer.leader_term, resp, &context);
1149
0
    return Status::OK();
1150
0
  });
1151
0
}
1152
1153
void TabletServiceImpl::AbortTransaction(const AbortTransactionRequestPB* req,
1154
                                         AbortTransactionResponsePB* resp,
1155
193k
                                         rpc::RpcContext context) {
1156
193k
  TRACE("AbortTransaction");
1157
1158
193k
  UpdateClock(*req, server_->Clock());
1159
1160
193k
  auto tablet = LookupLeaderTabletOrRespond(
1161
193k
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1162
193k
  if (!tablet) {
1163
6.74k
    return;
1164
6.74k
  }
1165
1166
186k
  server::ClockPtr clock(server_->Clock());
1167
186k
  auto context_ptr = std::make_shared<rpc::RpcContext>(std::move(context));
1168
186k
  tablet.peer->tablet()->transaction_coordinator()->Abort(
1169
186k
      req->transaction_id(),
1170
186k
      tablet.leader_term,
1171
186k
      [resp, context_ptr, clock, peer = tablet.peer](Result<TransactionStatusResult> result) {
1172
184k
        resp->set_propagated_hybrid_time(clock->Now().ToUint64());
1173
184k
        Status status;
1174
184k
        if (
result.ok()184k
) {
1175
184k
          auto leader_safe_time = peer->LeaderSafeTime();
1176
184k
          if (leader_safe_time.ok()) {
1177
184k
            resp->set_status(result->status);
1178
184k
            if (result->status_time.is_valid()) {
1179
22.5k
              resp->set_status_hybrid_time(result->status_time.ToUint64());
1180
22.5k
            }
1181
            // See comment above WaitForSafeTime in TransactionStatusCache::DoGetCommitData
1182
            // for details.
1183
184k
            resp->set_coordinator_safe_time(leader_safe_time->ToUint64());
1184
184k
            context_ptr->RespondSuccess();
1185
184k
            return;
1186
184k
          }
1187
1188
11
          status = leader_safe_time.status();
1189
18.4E
        } else {
1190
18.4E
          status = result.status();
1191
18.4E
        }
1192
18.4E
        SetupErrorAndRespond(resp->mutable_error(), status, context_ptr.get());
1193
18.4E
      });
1194
186k
}
1195
1196
void TabletServiceImpl::Truncate(const TruncateRequestPB* req,
1197
                                 TruncateResponsePB* resp,
1198
57.2k
                                 rpc::RpcContext context) {
1199
57.2k
  TRACE("Truncate");
1200
1201
57.2k
  UpdateClock(*req, server_->Clock());
1202
1203
57.2k
  auto tablet = LookupLeaderTabletOrRespond(
1204
57.2k
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1205
57.2k
  if (!tablet) {
1206
5
    return;
1207
5
  }
1208
1209
57.1k
  auto operation = std::make_unique<TruncateOperation>(tablet.peer->tablet(), &req->truncate());
1210
1211
57.1k
  operation->set_completion_callback(
1212
57.1k
      MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock()));
1213
1214
  // Submit the truncate tablet op. The RPC will be responded to asynchronously.
1215
57.1k
  tablet.peer->Submit(std::move(operation), tablet.leader_term);
1216
57.1k
}
1217
1218
void TabletServiceAdminImpl::CreateTablet(const CreateTabletRequestPB* req,
1219
                                          CreateTabletResponsePB* resp,
1220
139k
                                          rpc::RpcContext context) {
1221
139k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "CreateTablet", req, resp, &context)) {
1222
0
    return;
1223
0
  }
1224
139k
  auto status = DoCreateTablet(req, resp);
1225
139k
  if (!status.ok()) {
1226
12
    SetupErrorAndRespond(resp->mutable_error(), status, &context);
1227
139k
  } else {
1228
139k
    context.RespondSuccess();
1229
139k
  }
1230
139k
}
1231
1232
// Creates the tablet described by a CreateTablet request. It parses the schema and partition
// schema, builds the table metadata, decodes the snapshot schedule ids, and asks the tablet
// manager to create the tablet.
// Returns:
//   - an INVALID_SCHEMA-tagged error if the schema or partition schema cannot be parsed;
//   - a TABLET_ALREADY_EXISTS-tagged error if the manager reports AlreadyPresent;
//   - any other failure unchanged.
// `resp` is not written here.
Status TabletServiceAdminImpl::DoCreateTablet(const CreateTabletRequestPB* req,
                                              CreateTabletResponsePB* resp) {
  // Test hook: optionally delay creation of transaction status tablets.
  const bool delay_creation =
      FLAGS_TEST_txn_status_table_tablet_creation_delay_ms > 0 &&
      req->table_type() == TableType::TRANSACTION_STATUS_TABLE_TYPE;
  if (PREDICT_FALSE(delay_creation)) {
    std::this_thread::sleep_for(FLAGS_TEST_txn_status_table_tablet_creation_delay_ms * 1ms);
  }

  DVLOG(3) << "Received CreateTablet RPC: " << yb::ToString(*req);
  TRACE_EVENT1("tserver", "CreateTablet", "tablet_id", req->tablet_id());

  Schema schema;
  PartitionSchema partition_schema;
  Status schema_status = SchemaFromPB(req->schema(), &schema);
  if (schema_status.ok()) {
    DCHECK(schema.has_column_ids());
    schema_status = PartitionSchema::FromPB(req->partition_schema(), schema, &partition_schema);
  }
  if (!schema_status.ok()) {
    return schema_status.CloneAndAddErrorCode(
        TabletServerError(TabletServerErrorPB::INVALID_SCHEMA));
  }

  Partition partition;
  Partition::FromPB(req->partition(), &partition);

  LOG(INFO) << "Processing CreateTablet for T " << req->tablet_id() << " P " << req->dest_uuid()
            << " (table=" << req->table_name()
            << " [id=" << req->table_id() << "]), partition="
            << partition_schema.PartitionDebugString(partition, schema);
  VLOG(1) << "Full request: " << req->DebugString();

  auto table_info = std::make_shared<tablet::TableInfo>(
      req->table_id(), req->namespace_name(), req->table_name(), req->table_type(), schema,
      IndexMap(),
      req->has_index_info() ? boost::optional<IndexInfo>(req->index_info()) : boost::none,
      0 /* schema_version */, partition_schema);

  std::vector<SnapshotScheduleId> snapshot_schedules;
  snapshot_schedules.reserve(req->snapshot_schedules().size());
  for (const auto& encoded_schedule_id : req->snapshot_schedules()) {
    snapshot_schedules.push_back(
        VERIFY_RESULT(FullyDecodeSnapshotScheduleId(encoded_schedule_id)));
  }

  Status create_status = ResultToStatus(server_->tablet_manager()->CreateNewTablet(
      table_info, req->tablet_id(), partition, req->config(), req->colocated(),
      snapshot_schedules));
  if (create_status.ok()) {
    return Status::OK();
  }
  // AlreadyPresent gets a dedicated error code; every other failure passes through as-is.
  if (create_status.IsAlreadyPresent()) {
    return create_status.CloneAndAddErrorCode(
        TabletServerError(TabletServerErrorPB::TABLET_ALREADY_EXISTS));
  }
  return create_status;
}
1283
1284
void TabletServiceAdminImpl::DeleteTablet(const DeleteTabletRequestPB* req,
1285
                                          DeleteTabletResponsePB* resp,
1286
77.4k
                                          rpc::RpcContext context) {
1287
77.4k
  if (PREDICT_FALSE(FLAGS_TEST_rpc_delete_tablet_fail)) {
1288
0
    context.RespondFailure(STATUS(NetworkError, "Simulating network partition for test"));
1289
0
    return;
1290
0
  }
1291
1292
77.4k
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "DeleteTablet", req, resp, &context)) {
1293
1
    return;
1294
1
  }
1295
77.4k
  TRACE_EVENT2("tserver", "DeleteTablet",
1296
77.4k
               "tablet_id", req->tablet_id(),
1297
77.4k
               "reason", req->reason());
1298
1299
77.4k
  tablet::TabletDataState delete_type = tablet::TABLET_DATA_UNKNOWN;
1300
77.4k
  if (req->has_delete_type()) {
1301
76.9k
    delete_type = req->delete_type();
1302
76.9k
  }
1303
77.4k
  LOG(INFO) << "T " << req->tablet_id() << " P " << server_->permanent_uuid()
1304
77.4k
            << ": Processing DeleteTablet with delete_type " << TabletDataState_Name(delete_type)
1305
77.4k
            << (req->has_reason() ? 
(" (" + req->reason() + ")")77.4k
:
""20
)
1306
77.4k
            << (req->hide_only() ? 
" (Hide only)"36
:
""77.4k
)
1307
77.4k
            << " from " << context.requestor_string();
1308
18.4E
  VLOG(1) << "Full request: " << req->DebugString();
1309
1310
77.4k
  boost::optional<int64_t> cas_config_opid_index_less_or_equal;
1311
77.4k
  if (req->has_cas_config_opid_index_less_or_equal()) {
1312
3.47k
    cas_config_opid_index_less_or_equal = req->cas_config_opid_index_less_or_equal();
1313
3.47k
  }
1314
77.4k
  boost::optional<TabletServerErrorPB::Code> error_code;
1315
77.4k
  Status s = server_->tablet_manager()->DeleteTablet(req->tablet_id(),
1316
77.4k
                                                     delete_type,
1317
77.4k
                                                     cas_config_opid_index_less_or_equal,
1318
77.4k
                                                     req->hide_only(),
1319
77.4k
                                                     &error_code);
1320
77.4k
  if (PREDICT_FALSE(!s.ok())) {
1321
2.00k
    HandleErrorResponse(resp, &context, s, error_code);
1322
2.00k
    return;
1323
2.00k
  }
1324
75.4k
  context.RespondSuccess();
1325
75.4k
}
1326
1327
// TODO(sagnik): Modify this to actually create a copartitioned table
1328
void TabletServiceAdminImpl::CopartitionTable(const CopartitionTableRequestPB* req,
1329
                                              CopartitionTableResponsePB* resp,
1330
0
                                              rpc::RpcContext context) {
1331
0
  context.RespondSuccess();
1332
0
  LOG(INFO) << "tserver doesn't support co-partitioning yet";
1333
0
}
1334
1335
void TabletServiceAdminImpl::FlushTablets(const FlushTabletsRequestPB* req,
1336
                                          FlushTabletsResponsePB* resp,
1337
43
                                          rpc::RpcContext context) {
1338
43
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "FlushTablets", req, resp, &context)) {
1339
0
    return;
1340
0
  }
1341
1342
43
  if (!req->all_tablets() && 
req->tablet_ids_size() == 035
) {
1343
0
    const Status s = STATUS(InvalidArgument, "No tablet ids");
1344
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1345
0
    return;
1346
0
  }
1347
1348
43
  server::UpdateClock(*req, server_->Clock());
1349
1350
43
  TRACE_EVENT1("tserver", "FlushTablets",
1351
43
               "TS: ", req->dest_uuid());
1352
1353
43
  LOG(INFO) << "Processing FlushTablets from " << context.requestor_string();
1354
43
  VLOG
(1) << "Full FlushTablets request: " << req->DebugString()0
;
1355
43
  TabletPeers tablet_peers;
1356
43
  TSTabletManager::TabletPtrs tablet_ptrs;
1357
1358
43
  if (req->all_tablets()) {
1359
8
    tablet_peers = server_->tablet_manager()->GetTabletPeers(&tablet_ptrs);
1360
35
  } else {
1361
50
    for (const TabletId& id : req->tablet_ids()) {
1362
50
      auto tablet_peer = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1363
50
          server_->tablet_peer_lookup(), id, resp, &context));
1364
50
      tablet_peers.push_back(std::move(tablet_peer.tablet_peer));
1365
50
      auto tablet = tablet_peer.tablet;
1366
50
      if (tablet != nullptr) {
1367
50
        tablet_ptrs.push_back(std::move(tablet));
1368
50
      }
1369
50
    }
1370
35
  }
1371
43
  switch (req->operation()) {
1372
35
    case FlushTabletsRequestPB::FLUSH:
1373
50
      for (const tablet::TabletPtr& tablet : tablet_ptrs) {
1374
50
        resp->set_failed_tablet_id(tablet->tablet_id());
1375
50
        RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->Flush(tablet::FlushMode::kAsync), resp, &context);
1376
50
        resp->clear_failed_tablet_id();
1377
50
      }
1378
1379
      // Wait for end of all flush operations.
1380
50
      
for (const tablet::TabletPtr& tablet : tablet_ptrs)35
{
1381
50
        resp->set_failed_tablet_id(tablet->tablet_id());
1382
50
        RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->WaitForFlush(), resp, &context);
1383
50
        resp->clear_failed_tablet_id();
1384
50
      }
1385
35
      break;
1386
35
    case FlushTabletsRequestPB::COMPACT:
1387
0
      RETURN_UNKNOWN_ERROR_IF_NOT_OK(
1388
0
          server_->tablet_manager()->TriggerCompactionAndWait(tablet_ptrs), resp, &context);
1389
0
      break;
1390
8
    case FlushTabletsRequestPB::LOG_GC:
1391
8
      for (const auto& tablet : tablet_peers) {
1392
8
        resp->set_failed_tablet_id(tablet->tablet_id());
1393
8
        RETURN_UNKNOWN_ERROR_IF_NOT_OK(tablet->RunLogGC(), resp, &context);
1394
8
        resp->clear_failed_tablet_id();
1395
8
      }
1396
8
      break;
1397
43
  }
1398
1399
43
  context.RespondSuccess();
1400
43
}
1401
1402
void TabletServiceAdminImpl::CountIntents(
1403
    const CountIntentsRequestPB* req,
1404
    CountIntentsResponsePB* resp,
1405
0
    rpc::RpcContext context) {
1406
0
  TSTabletManager::TabletPtrs tablet_ptrs;
1407
0
  TabletPeers tablet_peers = server_->tablet_manager()->GetTabletPeers(&tablet_ptrs);
1408
0
  int64_t total_intents = 0;
1409
  // TODO: do this in parallel.
1410
  // TODO: per-tablet intent counts.
1411
0
  for (const auto& tablet : tablet_ptrs) {
1412
0
    auto num_intents = tablet->CountIntents();
1413
0
    if (!num_intents.ok()) {
1414
0
      SetupErrorAndRespond(resp->mutable_error(), num_intents.status(), &context);
1415
0
      return;
1416
0
    }
1417
0
    total_intents += *num_intents;
1418
0
  }
1419
0
  resp->set_num_intents(total_intents);
1420
0
  context.RespondSuccess();
1421
0
}
1422
1423
void TabletServiceAdminImpl::AddTableToTablet(
1424
    const AddTableToTabletRequestPB* req, AddTableToTabletResponsePB* resp,
1425
126
    rpc::RpcContext context) {
1426
126
  auto tablet_id = req->tablet_id();
1427
1428
126
  const auto tablet =
1429
126
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), tablet_id, resp, &context);
1430
126
  if (!tablet) {
1431
0
    return;
1432
0
  }
1433
126
  DVLOG
(3) << "Received AddTableToTablet RPC: " << yb::ToString(*req)0
;
1434
1435
126
  tablet::ChangeMetadataRequestPB change_req;
1436
126
  *change_req.mutable_add_table() = req->add_table();
1437
126
  change_req.set_tablet_id(tablet_id);
1438
126
  Status s = tablet::SyncReplicateChangeMetadataOperation(
1439
126
      &change_req, tablet.peer.get(), tablet.leader_term);
1440
126
  if (PREDICT_FALSE(!s.ok())) {
1441
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1442
0
    return;
1443
0
  }
1444
126
  context.RespondSuccess();
1445
126
}
1446
1447
void TabletServiceAdminImpl::RemoveTableFromTablet(
1448
    const RemoveTableFromTabletRequestPB* req,
1449
    RemoveTableFromTabletResponsePB* resp,
1450
81
    rpc::RpcContext context) {
1451
81
  auto tablet =
1452
81
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1453
81
  if (!tablet) {
1454
1
    return;
1455
1
  }
1456
1457
80
  tablet::ChangeMetadataRequestPB change_req;
1458
80
  change_req.set_remove_table_id(req->remove_table_id());
1459
80
  change_req.set_tablet_id(req->tablet_id());
1460
80
  Status s = tablet::SyncReplicateChangeMetadataOperation(
1461
80
      &change_req, tablet.peer.get(), tablet.leader_term);
1462
80
  if (PREDICT_FALSE(!s.ok())) {
1463
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1464
0
    return;
1465
0
  }
1466
80
  context.RespondSuccess();
1467
80
}
1468
1469
void TabletServiceAdminImpl::SplitTablet(
1470
141
    const tablet::SplitTabletRequestPB* req, SplitTabletResponsePB* resp, rpc::RpcContext context) {
1471
141
  if (!CheckUuidMatchOrRespond(server_->tablet_manager(), "SplitTablet", req, resp, &context)) {
1472
0
    return;
1473
0
  }
1474
141
  if (PREDICT_FALSE(FLAGS_TEST_fail_tablet_split_probability > 0) &&
1475
141
      
RandomActWithProbability(FLAGS_TEST_fail_tablet_split_probability)0
) {
1476
0
    return SetupErrorAndRespond(
1477
0
        resp->mutable_error(),
1478
0
        STATUS(InvalidArgument,  // Use InvalidArgument to hit IsDefinitelyPermanentError().
1479
0
            "Failing tablet split due to FLAGS_TEST_fail_tablet_split_probability"),
1480
0
        TabletServerErrorPB::UNKNOWN_ERROR,
1481
0
        &context);
1482
0
  }
1483
141
  TRACE_EVENT1("tserver", "SplitTablet", "tablet_id", req->tablet_id());
1484
1485
141
  server::UpdateClock(*req, server_->Clock());
1486
141
  auto leader_tablet_peer =
1487
141
      LookupLeaderTabletOrRespond(server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1488
141
  if (!leader_tablet_peer) {
1489
0
    return;
1490
0
  }
1491
1492
141
  {
1493
141
    auto tablet_data_state = leader_tablet_peer.peer->data_state();
1494
141
    if (tablet_data_state != tablet::TABLET_DATA_READY) {
1495
96
      auto s = tablet_data_state == tablet::TABLET_DATA_SPLIT_COMPLETED
1496
96
                  ? STATUS_FORMAT(AlreadyPresent, "Tablet $0 is already split.", req->tablet_id())
1497
96
                  : 
STATUS_FORMAT0
(
1498
96
                        InvalidArgument, "Invalid tablet $0 data state: $1", req->tablet_id(),
1499
96
                        tablet_data_state);
1500
96
      SetupErrorAndRespond(
1501
96
          resp->mutable_error(), s, TabletServerErrorPB::TABLET_NOT_RUNNING, &context);
1502
96
      return;
1503
96
    }
1504
141
  }
1505
1506
45
  auto state = std::make_unique<tablet::SplitOperation>(
1507
45
      leader_tablet_peer.peer->tablet(), server_->tablet_manager(), req);
1508
1509
45
  state->set_completion_callback(
1510
45
      MakeRpcOperationCompletionCallback(std::move(context), resp, server_->Clock()));
1511
1512
45
  leader_tablet_peer.peer->Submit(std::move(state), leader_tablet_peer.leader_term);
1513
45
}
1514
1515
void TabletServiceAdminImpl::UpgradeYsql(
1516
    const UpgradeYsqlRequestPB* req,
1517
    UpgradeYsqlResponsePB* resp,
1518
4
    rpc::RpcContext context) {
1519
4
  LOG(INFO) << "Starting YSQL upgrade";
1520
1521
4
  pgwrapper::YsqlUpgradeHelper upgrade_helper(server_->pgsql_proxy_bind_address(),
1522
4
                                              server_->GetSharedMemoryPostgresAuthKey(),
1523
4
                                              FLAGS_heartbeat_interval_ms);
1524
4
  const auto status = upgrade_helper.Upgrade();
1525
4
  if (!status.ok()) {
1526
0
    LOG(INFO) << "YSQL upgrade failed: " << status;
1527
0
    SetupErrorAndRespond(resp->mutable_error(), status, &context);
1528
0
    return;
1529
0
  }
1530
1531
4
  LOG(INFO) << "YSQL upgrade done successfully";
1532
4
  context.RespondSuccess();
1533
4
}
1534
1535
1536
0
// Reports whether a key/value write batch carries no work: it has neither write pairs nor
// external transactions to apply.
bool EmptyWriteBatch(const docdb::KeyValueWriteBatchPB& write_batch) {
  const bool no_write_pairs = write_batch.write_pairs().empty();
  const bool no_external_txns = write_batch.apply_external_transactions().empty();
  return no_write_pairs && no_external_txns;
}
1539
1540
void TabletServiceImpl::Write(const WriteRequestPB* req,
1541
                              WriteResponsePB* resp,
1542
2.61M
                              rpc::RpcContext context) {
1543
2.61M
  if (FLAGS_TEST_tserver_noop_read_write) {
1544
0
    for (int i = 0; i < req->ql_write_batch_size(); ++i) {
1545
0
      resp->add_ql_response_batch();
1546
0
    }
1547
0
    context.RespondSuccess();
1548
0
    return;
1549
0
  }
1550
2.61M
  TRACE("Start Write");
1551
2.61M
  TRACE_EVENT1("tserver", "TabletServiceImpl::Write",
1552
2.61M
               "tablet_id", req->tablet_id());
1553
2.61M
  VLOG
(2) << "Received Write RPC: " << req->DebugString()367
;
1554
2.61M
  UpdateClock(*req, server_->Clock());
1555
1556
2.61M
  auto tablet = LookupLeaderTabletOrRespond(
1557
2.61M
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context);
1558
2.61M
  if (!tablet ||
1559
2.61M
      !CheckWriteThrottlingOrRespond(
1560
2.56M
          req->rejection_score(), tablet.peer.get(), resp, &context)) {
1561
51.1k
    return;
1562
51.1k
  }
1563
1564
2.56M
  if (tablet.peer->tablet()->metadata()->hidden()) {
1565
3
    auto status = STATUS(NotFound, "Tablet not found", req->tablet_id());
1566
3
    SetupErrorAndRespond(
1567
3
        resp->mutable_error(), status, TabletServerErrorPB::TABLET_NOT_FOUND, &context);
1568
3
    return;
1569
3
  }
1570
1571
#if defined(DUMP_WRITE)
1572
  if (req->has_write_batch() && req->write_batch().has_transaction()) {
1573
    VLOG(1) << "Write with transaction: " << req->write_batch().transaction().ShortDebugString();
1574
    if (req->pgsql_write_batch_size() != 0) {
1575
      auto txn_id = CHECK_RESULT(FullyDecodeTransactionId(
1576
          req->write_batch().transaction().transaction_id()));
1577
      for (const auto& entry : req->pgsql_write_batch()) {
1578
        if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) {
1579
          auto key = entry.column_new_values(0).expr().value().int32_value();
1580
          LOG(INFO) << txn_id << " UPDATE: " << key << " = "
1581
                    << entry.column_new_values(1).expr().value().string_value();
1582
        } else if (
1583
            entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT ||
1584
            entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT) {
1585
          docdb::DocKey doc_key;
1586
          CHECK_OK(doc_key.FullyDecodeFrom(entry.ybctid_column_value().value().binary_value()));
1587
          LOG(INFO) << txn_id << " INSERT: " << doc_key.hashed_group()[0].GetInt32() << " = "
1588
                    << entry.column_values(0).expr().value().string_value();
1589
        } else if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_DELETE) {
1590
          LOG(INFO) << txn_id << " DELETE: " << entry.ShortDebugString();
1591
        }
1592
      }
1593
    }
1594
  }
1595
#endif
1596
1597
2.56M
  if (PREDICT_FALSE(req->has_write_batch() && !req->has_external_hybrid_time() &&
1598
2.56M
      (!req->write_batch().write_pairs().empty() || !req->write_batch().read_pairs().empty()))) {
1599
0
    Status s = STATUS(NotSupported, "Write Request contains write batch. This field should be "
1600
0
        "used only for post-processed write requests during "
1601
0
        "Raft replication.");
1602
0
    SetupErrorAndRespond(resp->mutable_error(), s,
1603
0
                         TabletServerErrorPB::INVALID_MUTATION,
1604
0
                         &context);
1605
0
    return;
1606
0
  }
1607
1608
2.56M
  bool has_operations = req->ql_write_batch_size() != 0 ||
1609
2.56M
                        
req->redis_write_batch_size() != 0815k
||
1610
2.56M
                        
req->pgsql_write_batch_size() != 0691k
||
1611
2.56M
                        
(1
req->has_external_hybrid_time()1
&&
!EmptyWriteBatch(req->write_batch())0
);
1612
2.56M
  if (!has_operations && 
tablet.peer->tablet()->table_type() != TableType::REDIS_TABLE_TYPE1
) {
1613
    // An empty request. This is fine, can just exit early with ok status instead of working hard.
1614
    // This doesn't need to go to Raft log.
1615
1
    MakeRpcOperationCompletionCallback<WriteResponsePB>(
1616
1
        std::move(context), resp, server_->Clock())(Status::OK());
1617
1
    return;
1618
1
  }
1619
1620
  // For postgres requests check that the syscatalog version matches.
1621
2.56M
  if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) {
1622
514k
    uint64_t last_breaking_catalog_version = 0; // unset.
1623
6.43M
    for (const auto& pg_req : req->pgsql_write_batch()) {
1624
6.43M
      if (pg_req.has_ysql_catalog_version()) {
1625
6.43M
        if (last_breaking_catalog_version == 0) {
1626
          // Initialize last breaking version if not yet set.
1627
513k
          server_->get_ysql_catalog_version(nullptr /* current_version */,
1628
513k
                                            &last_breaking_catalog_version);
1629
513k
        }
1630
6.43M
        if (pg_req.ysql_catalog_version() < last_breaking_catalog_version) {
1631
5
          SetupErrorAndRespond(resp->mutable_error(),
1632
5
              STATUS_SUBSTITUTE(QLError, "The catalog snapshot used for this "
1633
5
                                         "transaction has been invalidated."),
1634
5
              TabletServerErrorPB::MISMATCHED_SCHEMA, &context);
1635
5
          return;
1636
5
        }
1637
6.43M
      }
1638
6.43M
    }
1639
514k
  }
1640
1641
2.56M
  auto query = std::make_unique<tablet::WriteQuery>(
1642
2.56M
      tablet.leader_term, context.GetClientDeadline(), tablet.peer.get(),
1643
2.56M
      tablet.peer->tablet(), resp);
1644
2.56M
  query->set_client_request(*req);
1645
1646
2.56M
  auto context_ptr = std::make_shared<RpcContext>(std::move(context));
1647
2.56M
  if (RandomActWithProbability(GetAtomicFlag(&FLAGS_TEST_respond_write_failed_probability))) {
1648
0
    LOG(INFO) << "Responding with a failure to " << req->DebugString();
1649
0
    SetupErrorAndRespond(resp->mutable_error(), STATUS(LeaderHasNoLease, "TEST: Random failure"),
1650
0
                         context_ptr.get());
1651
2.56M
  } else {
1652
2.56M
    query->set_callback(WriteQueryCompletionCallback(
1653
2.56M
        tablet.peer, context_ptr, resp, query.get(), server_->Clock(), req->include_trace()));
1654
2.56M
  }
1655
1656
2.56M
  query->AdjustYsqlQueryTransactionality(req->pgsql_write_batch_size());
1657
1658
2.56M
  tablet.peer->WriteAsync(std::move(query));
1659
2.56M
}
1660
1661
void TabletServiceImpl::Read(const ReadRequestPB* req,
1662
                             ReadResponsePB* resp,
1663
9.56M
                             rpc::RpcContext context) {
1664
9.56M
  if (FLAGS_TEST_tserver_noop_read_write) {
1665
0
    context.RespondSuccess();
1666
0
    return;
1667
0
  }
1668
1669
9.56M
  PerformRead(server_, this, req, resp, std::move(context));
1670
9.56M
}
1671
1672
// Creates the consensus RPC service. `metric_entity` is passed to the generated service base,
// and `tablet_manager` is stored (not owned here) for tablet peer lookups.
ConsensusServiceImpl::ConsensusServiceImpl(const scoped_refptr<MetricEntity>& metric_entity,
                                           TabletPeerLookupIf* tablet_manager)
    : ConsensusServiceIf(metric_entity), tablet_manager_(tablet_manager) {}
1677
1678
182
// Nothing to release explicitly; the destructor is defaulted.
ConsensusServiceImpl::~ConsensusServiceImpl() = default;
1680
1681
void ConsensusServiceImpl::CompleteUpdateConsensusResponse(
1682
    std::shared_ptr<tablet::TabletPeer> tablet_peer,
1683
25.5M
    consensus::ConsensusResponsePB* resp) {
1684
25.5M
  auto tablet = tablet_peer->shared_tablet();
1685
25.5M
  if (
tablet25.5M
) {
1686
25.5M
    resp->set_num_sst_files(tablet->GetCurrentVersionNumSSTFiles());
1687
25.5M
  }
1688
25.5M
  resp->set_propagated_hybrid_time(tablet_peer->clock().Now().ToUint64());
1689
25.5M
}
1690
1691
void ConsensusServiceImpl::MultiRaftUpdateConsensus(
1692
      const consensus::MultiRaftConsensusRequestPB *req,
1693
      consensus::MultiRaftConsensusResponsePB *resp,
1694
0
      rpc::RpcContext context) {
1695
0
    DVLOG(3) << "Received Batch Consensus Update RPC: " << req->ShortDebugString();
1696
    // Effectively performs ConsensusServiceImpl::UpdateConsensus for
1697
    // each ConsensusRequestPB in the batch but does not fail the entire
1698
    // batch if a single request fails.
1699
0
    for (int i = 0; i < req->consensus_request_size(); i++) {
1700
      // Unfortunately, we have to use const_cast here,
1701
      // because the protobuf-generated interface only gives us a const request
1702
      // but we need to be able to move messages out of the request for efficiency.
1703
0
      auto consensus_req = const_cast<ConsensusRequestPB*>(&req->consensus_request(i));
1704
0
      auto consensus_resp = resp->add_consensus_response();;
1705
1706
0
      auto uuid_match_res = CheckUuidMatch(tablet_manager_, "UpdateConsensus", consensus_req,
1707
0
                                           context.requestor_string());
1708
0
      if (!uuid_match_res.ok()) {
1709
0
        SetupError(consensus_resp->mutable_error(), uuid_match_res.status());
1710
0
        continue;
1711
0
      }
1712
1713
0
      auto peer_tablet_res = LookupTabletPeer(tablet_manager_, consensus_req->tablet_id());
1714
0
      if (!peer_tablet_res.ok()) {
1715
0
        SetupError(consensus_resp->mutable_error(), peer_tablet_res.status());
1716
0
        continue;
1717
0
      }
1718
0
      auto tablet_peer = peer_tablet_res.get().tablet_peer;
1719
1720
      // Submit the update directly to the TabletPeer's Consensus instance.
1721
0
      auto consensus_res = GetConsensus(tablet_peer);
1722
0
      if (!consensus_res.ok()) {
1723
0
        SetupError(consensus_resp->mutable_error(), consensus_res.status());
1724
0
        continue;
1725
0
      }
1726
0
      auto consensus = *consensus_res;
1727
1728
0
      Status s = consensus->Update(
1729
0
        consensus_req, consensus_resp, context.GetClientDeadline());
1730
0
      if (PREDICT_FALSE(!s.ok())) {
1731
        // Clear the response first, since a partially-filled response could
1732
        // result in confusing a caller, or in having missing required fields
1733
        // in embedded optional messages.
1734
0
        consensus_resp->Clear();
1735
0
        SetupError(consensus_resp->mutable_error(), s);
1736
0
        continue;
1737
0
      }
1738
1739
0
      CompleteUpdateConsensusResponse(tablet_peer, consensus_resp);
1740
0
    }
1741
0
    context.RespondSuccess();
1742
0
}
1743
1744
void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
1745
                                           ConsensusResponsePB* resp,
1746
25.5M
                                           rpc::RpcContext context) {
1747
25.5M
  DVLOG
(3) << "Received Consensus Update RPC: " << req->ShortDebugString()31.8k
;
1748
25.5M
  if (!CheckUuidMatchOrRespond(tablet_manager_, "UpdateConsensus", req, resp, &context)) {
1749
95
    return;
1750
95
  }
1751
25.5M
  auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1752
25.5M
      tablet_manager_, req->tablet_id(), resp, &context));
1753
25.5M
  auto tablet_peer = peer_tablet.tablet_peer;
1754
1755
  // Submit the update directly to the TabletPeer's Consensus instance.
1756
25.5M
  shared_ptr<Consensus> consensus;
1757
25.5M
  if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) 
return0
;
1758
1759
  // Unfortunately, we have to use const_cast here, because the protobuf-generated interface only
1760
  // gives us a const request, but we need to be able to move messages out of the request for
1761
  // efficiency.
1762
25.5M
  Status s = consensus->Update(
1763
25.5M
      const_cast<ConsensusRequestPB*>(req), resp, context.GetClientDeadline());
1764
25.5M
  if (PREDICT_FALSE(!s.ok())) {
1765
    // Clear the response first, since a partially-filled response could
1766
    // result in confusing a caller, or in having missing required fields
1767
    // in embedded optional messages.
1768
1.93k
    resp->Clear();
1769
1770
1.93k
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1771
1.93k
    return;
1772
1.93k
  }
1773
1774
25.5M
  CompleteUpdateConsensusResponse(tablet_peer, resp);
1775
1776
25.5M
  context.RespondSuccess();
1777
25.5M
}
1778
1779
void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
1780
                                                VoteResponsePB* resp,
1781
1.39M
                                                rpc::RpcContext context) {
1782
1.39M
  DVLOG
(3) << "Received Consensus Request Vote RPC: " << req->DebugString()513
;
1783
1.39M
  if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, &context)) {
1784
2.26k
    return;
1785
2.26k
  }
1786
1.39M
  auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1787
1.36M
      tablet_manager_, req->tablet_id(), resp, &context));
1788
1.36M
  auto tablet_peer = peer_tablet.tablet_peer;
1789
1790
  // Submit the vote request directly to the consensus instance.
1791
1.36M
  shared_ptr<Consensus> consensus;
1792
1.36M
  if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) 
return0
;
1793
1.36M
  Status s = consensus->RequestVote(req, resp);
1794
1.36M
  RETURN_UNKNOWN_ERROR_IF_NOT_OK(s, resp, &context);
1795
1.36M
  context.RespondSuccess();
1796
1.36M
}
1797
1798
void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
1799
                                        ChangeConfigResponsePB* resp,
1800
5.29k
                                        RpcContext context) {
1801
5.29k
  VLOG
(1) << "Received ChangeConfig RPC: " << req->ShortDebugString()4
;
1802
  // If the destination uuid is empty string, it means the client was retrying after a leader
1803
  // stepdown and did not have a chance to update the uuid inside the request.
1804
  // TODO: Note that this can be removed once Java YBClient will reset change config's uuid
1805
  // correctly after leader step down.
1806
5.29k
  if (req->dest_uuid() != "" &&
1807
5.29k
      
!CheckUuidMatchOrRespond(tablet_manager_, "ChangeConfig", req, resp, &context)5.15k
) {
1808
0
    return;
1809
0
  }
1810
5.29k
  auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1811
5.29k
      tablet_manager_, req->tablet_id(), resp, &context));
1812
5.29k
  auto tablet_peer = peer_tablet.tablet_peer;
1813
1814
5.29k
  shared_ptr<Consensus> consensus;
1815
5.29k
  if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) 
return0
;
1816
5.29k
  boost::optional<TabletServerErrorPB::Code> error_code;
1817
5.29k
  std::shared_ptr<RpcContext> context_ptr = std::make_shared<RpcContext>(std::move(context));
1818
5.29k
  Status s = consensus->ChangeConfig(*req, BindHandleResponse(resp, context_ptr), &error_code);
1819
18.4E
  VLOG(1) << "Sent ChangeConfig req " << req->ShortDebugString() << " to consensus layer.";
1820
5.29k
  if (PREDICT_FALSE(!s.ok())) {
1821
1.67k
    HandleErrorResponse(resp, context_ptr.get(), s, error_code);
1822
1.67k
    return;
1823
1.67k
  }
1824
  // The success case is handled when the callback fires.
1825
5.29k
}
1826
1827
void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* req,
1828
                                              UnsafeChangeConfigResponsePB* resp,
1829
6
                                              RpcContext context) {
1830
6
  VLOG
(1) << "Received UnsafeChangeConfig RPC: " << req->ShortDebugString()0
;
1831
6
  if (!CheckUuidMatchOrRespond(tablet_manager_, "UnsafeChangeConfig", req, resp, &context)) {
1832
0
    return;
1833
0
  }
1834
6
  auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1835
6
      tablet_manager_, req->tablet_id(), resp, &context));
1836
6
  auto tablet_peer = peer_tablet.tablet_peer;
1837
1838
6
  shared_ptr<Consensus> consensus;
1839
6
  if (!GetConsensusOrRespond(tablet_peer, resp, &context, &consensus)) {
1840
0
    return;
1841
0
  }
1842
6
  boost::optional<TabletServerErrorPB::Code> error_code;
1843
6
  const Status s = consensus->UnsafeChangeConfig(*req, &error_code);
1844
6
  if (PREDICT_FALSE(!s.ok())) {
1845
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
1846
0
    HandleErrorResponse(resp, &context, s, error_code);
1847
0
    return;
1848
0
  }
1849
6
  context.RespondSuccess();
1850
6
}
1851
1852
void ConsensusServiceImpl::GetNodeInstance(const GetNodeInstanceRequestPB* req,
1853
                                           GetNodeInstanceResponsePB* resp,
1854
20.9k
                                           rpc::RpcContext context) {
1855
20.9k
  DVLOG
(3) << "Received Get Node Instance RPC: " << req->DebugString()6
;
1856
20.9k
  resp->mutable_node_instance()->CopyFrom(tablet_manager_->NodeInstance());
1857
20.9k
  auto status = tablet_manager_->GetRegistration(resp->mutable_registration());
1858
20.9k
  if (!status.ok()) {
1859
0
    context.RespondFailure(status);
1860
20.9k
  } else {
1861
20.9k
    context.RespondSuccess();
1862
20.9k
  }
1863
20.9k
}
1864
1865
namespace {
1866
1867
class RpcScope {
1868
 public:
1869
  template<class Req, class Resp>
1870
  RpcScope(TabletPeerLookupIf* tablet_manager,
1871
           const char* method_name,
1872
           const Req* req,
1873
           Resp* resp,
1874
           rpc::RpcContext* context)
1875
154k
      : context_(context) {
1876
154k
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1877
0
      return;
1878
0
    }
1879
154k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1880
116k
        tablet_manager, req->tablet_id(), resp, context));
1881
116k
    auto tablet_peer = peer_tablet.tablet_peer;
1882
1883
116k
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1884
0
      return;
1885
0
    }
1886
116k
    responded_ = false;
1887
116k
  }
tablet_service.cc:yb::tserver::(anonymous namespace)::RpcScope::RpcScope<yb::consensus::RunLeaderElectionRequestPB, yb::consensus::RunLeaderElectionResponsePB>(yb::tserver::TabletPeerLookupIf*, char const*, yb::consensus::RunLeaderElectionRequestPB const*, yb::consensus::RunLeaderElectionResponsePB*, yb::rpc::RpcContext*)
Line
Count
Source
1875
96.2k
      : context_(context) {
1876
96.2k
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1877
0
      return;
1878
0
    }
1879
96.2k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1880
57.9k
        tablet_manager, req->tablet_id(), resp, context));
1881
57.9k
    auto tablet_peer = peer_tablet.tablet_peer;
1882
1883
57.9k
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1884
0
      return;
1885
0
    }
1886
57.9k
    responded_ = false;
1887
57.9k
  }
tablet_service.cc:yb::tserver::(anonymous namespace)::RpcScope::RpcScope<yb::consensus::LeaderElectionLostRequestPB, yb::consensus::LeaderElectionLostResponsePB>(yb::tserver::TabletPeerLookupIf*, char const*, yb::consensus::LeaderElectionLostRequestPB const*, yb::consensus::LeaderElectionLostResponsePB*, yb::rpc::RpcContext*)
Line
Count
Source
1875
77
      : context_(context) {
1876
77
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1877
0
      return;
1878
0
    }
1879
77
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1880
77
        tablet_manager, req->tablet_id(), resp, context));
1881
77
    auto tablet_peer = peer_tablet.tablet_peer;
1882
1883
77
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1884
0
      return;
1885
0
    }
1886
77
    responded_ = false;
1887
77
  }
tablet_service.cc:yb::tserver::(anonymous namespace)::RpcScope::RpcScope<yb::consensus::LeaderStepDownRequestPB, yb::consensus::LeaderStepDownResponsePB>(yb::tserver::TabletPeerLookupIf*, char const*, yb::consensus::LeaderStepDownRequestPB const*, yb::consensus::LeaderStepDownResponsePB*, yb::rpc::RpcContext*)
Line
Count
Source
1875
54.1k
      : context_(context) {
1876
54.1k
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1877
0
      return;
1878
0
    }
1879
54.1k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1880
54.1k
        tablet_manager, req->tablet_id(), resp, context));
1881
54.1k
    auto tablet_peer = peer_tablet.tablet_peer;
1882
1883
54.1k
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1884
0
      return;
1885
0
    }
1886
54.1k
    responded_ = false;
1887
54.1k
  }
tablet_service.cc:yb::tserver::(anonymous namespace)::RpcScope::RpcScope<yb::consensus::GetConsensusStateRequestPB, yb::consensus::GetConsensusStateResponsePB>(yb::tserver::TabletPeerLookupIf*, char const*, yb::consensus::GetConsensusStateRequestPB const*, yb::consensus::GetConsensusStateResponsePB*, yb::rpc::RpcContext*)
Line
Count
Source
1875
4.06k
      : context_(context) {
1876
4.06k
    if (!CheckUuidMatchOrRespond(tablet_manager, method_name, req, resp, context)) {
1877
0
      return;
1878
0
    }
1879
4.06k
    auto peer_tablet = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
1880
3.88k
        tablet_manager, req->tablet_id(), resp, context));
1881
3.88k
    auto tablet_peer = peer_tablet.tablet_peer;
1882
1883
3.88k
    if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus_)) {
1884
0
      return;
1885
0
    }
1886
3.88k
    responded_ = false;
1887
3.88k
  }
1888
1889
154k
  ~RpcScope() {
1890
154k
    if (!responded_) {
1891
115k
      context_->RespondSuccess();
1892
115k
    }
1893
154k
  }
1894
1895
  template<class Resp>
1896
112k
  void CheckStatus(const Status& status, Resp* resp) {
1897
112k
    if (!status.ok()) {
1898
0
      LOG(INFO) << "Status failed: " << status.ToString();
1899
0
      SetupErrorAndRespond(resp->mutable_error(), status, context_);
1900
0
      responded_ = true;
1901
0
    }
1902
112k
  }
tablet_service.cc:void yb::tserver::(anonymous namespace)::RpcScope::CheckStatus<yb::consensus::RunLeaderElectionResponsePB>(yb::Status const&, yb::consensus::RunLeaderElectionResponsePB*)
Line
Count
Source
1896
57.8k
  void CheckStatus(const Status& status, Resp* resp) {
1897
57.8k
    if (!status.ok()) {
1898
0
      LOG(INFO) << "Status failed: " << status.ToString();
1899
0
      SetupErrorAndRespond(resp->mutable_error(), status, context_);
1900
0
      responded_ = true;
1901
0
    }
1902
57.8k
  }
tablet_service.cc:void yb::tserver::(anonymous namespace)::RpcScope::CheckStatus<yb::consensus::LeaderElectionLostResponsePB>(yb::Status const&, yb::consensus::LeaderElectionLostResponsePB*)
Line
Count
Source
1896
77
  void CheckStatus(const Status& status, Resp* resp) {
1897
77
    if (!status.ok()) {
1898
0
      LOG(INFO) << "Status failed: " << status.ToString();
1899
0
      SetupErrorAndRespond(resp->mutable_error(), status, context_);
1900
0
      responded_ = true;
1901
0
    }
1902
77
  }
tablet_service.cc:void yb::tserver::(anonymous namespace)::RpcScope::CheckStatus<yb::consensus::LeaderStepDownResponsePB>(yb::Status const&, yb::consensus::LeaderStepDownResponsePB*)
Line
Count
Source
1896
54.1k
  void CheckStatus(const Status& status, Resp* resp) {
1897
54.1k
    if (!status.ok()) {
1898
0
      LOG(INFO) << "Status failed: " << status.ToString();
1899
0
      SetupErrorAndRespond(resp->mutable_error(), status, context_);
1900
0
      responded_ = true;
1901
0
    }
1902
54.1k
  }
1903
1904
115k
  Consensus* operator->() {
1905
115k
    return consensus_.get();
1906
115k
  }
1907
1908
154k
  explicit operator bool() const {
1909
154k
    return !responded_;
1910
154k
  }
1911
1912
 private:
1913
  rpc::RpcContext* context_;
1914
  bool responded_ = true;
1915
  shared_ptr<Consensus> consensus_;
1916
};
1917
1918
} // namespace
1919
1920
void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* req,
1921
                                             RunLeaderElectionResponsePB* resp,
1922
96.3k
                                             rpc::RpcContext context) {
1923
96.3k
  VLOG
(1) << "Received Run Leader Election RPC: " << req->DebugString()157
;
1924
96.3k
  RpcScope scope(tablet_manager_, "RunLeaderElection", req, resp, &context);
1925
96.3k
  if (!scope) {
1926
38.3k
    return;
1927
38.3k
  }
1928
1929
57.9k
  Status s = scope->StartElection(consensus::LeaderElectionData {
1930
57.9k
    .mode = consensus::ElectionMode::ELECT_EVEN_IF_LEADER_IS_ALIVE,
1931
57.9k
    .pending_commit = req->has_committed_index(),
1932
57.9k
    .must_be_committed_opid = OpId::FromPB(req->committed_index()),
1933
57.9k
    .originator_uuid = req->has_originator_uuid() ? 
req->originator_uuid()9.97k
:
std::string()47.9k
,
1934
57.9k
    .suppress_vote_request = consensus::TEST_SuppressVoteRequest(req->suppress_vote_request()),
1935
57.9k
    .initial_election = req->initial_election() });
1936
57.9k
  scope.CheckStatus(s, resp);
1937
57.9k
}
1938
1939
// Handles a LeaderElectionLost RPC. It passes `election_lost_by_uuid` from the request to the
// tablet's consensus via ElectionLostByProtege(), reports any failure through the RpcScope, and
// logs the request and its outcome.
void ConsensusServiceImpl::LeaderElectionLost(const consensus::LeaderElectionLostRequestPB *req,
                                              consensus::LeaderElectionLostResponsePB *resp,
                                              ::yb::rpc::RpcContext context) {
  LOG(INFO) << "LeaderElectionLost, req: " << req->ShortDebugString();
  RpcScope scope(tablet_manager_, "LeaderElectionLost", req, resp, &context);
  if (!scope) {
    return;
  }
  auto status = scope->ElectionLostByProtege(req->election_lost_by_uuid());
  scope.CheckStatus(status, resp);
  // After CheckStatus(), the scope converts to false exactly when an error response was sent.
  // The ", " separator keeps the outcome word from running into "req:" in the log line.
  LOG(INFO) << "LeaderElectionLost, outcome: " << (scope ? "success" : "failure") << ", req: "
            << req->ShortDebugString();
}
1952
1953
void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
1954
                                          LeaderStepDownResponsePB* resp,
1955
54.1k
                                          RpcContext context) {
1956
54.1k
  LOG(INFO) << "Received Leader stepdown RPC: " << req->ShortDebugString();
1957
1958
54.1k
  if (PREDICT_FALSE(FLAGS_TEST_leader_stepdown_delay_ms > 0)) {
1959
2
    LOG(INFO) << "Delaying leader stepdown for "
1960
2
              << FLAGS_TEST_leader_stepdown_delay_ms << " ms.";
1961
2
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_leader_stepdown_delay_ms));
1962
2
  }
1963
1964
54.1k
  RpcScope scope(tablet_manager_, "LeaderStepDown", req, resp, &context);
1965
54.1k
  if (!scope) {
1966
0
    return;
1967
0
  }
1968
54.1k
  Status s = scope->StepDown(req, resp);
1969
54.1k
  LOG(INFO) << "Leader stepdown request " << req->ShortDebugString() << " success. Resp code="
1970
54.1k
            << TabletServerErrorPB::Code_Name(resp->error().code());
1971
54.1k
  scope.CheckStatus(s, resp);
1972
54.1k
}
1973
1974
// Returns the last OpId of the requested type for a RUNNING tablet. The request is rejected when
// opid_type is UNKNOWN_OPID_TYPE, when the uuid does not match, when the tablet is unknown or not
// RUNNING, or when it has no consensus instance.
void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req,
                                       consensus::GetLastOpIdResponsePB *resp,
                                       rpc::RpcContext context) {
  DVLOG(3) << "Received GetLastOpId RPC: " << req->DebugString();

  // Reject a malformed request before touching any tablet state.
  const bool bad_opid_type = req->opid_type() == consensus::UNKNOWN_OPID_TYPE;
  if (PREDICT_FALSE(bad_opid_type)) {
    HandleErrorResponse(resp, &context,
                        STATUS(InvalidArgument, "Invalid opid_type specified to GetLastOpId()"));
    return;
  }

  if (!CheckUuidMatchOrRespond(tablet_manager_, "GetLastOpId", req, resp, &context)) {
    return;
  }

  auto lookup = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
      tablet_manager_, req->tablet_id(), resp, &context));
  auto& peer = lookup.tablet_peer;

  if (peer->state() != tablet::RUNNING) {
    SetupErrorAndRespond(resp->mutable_error(),
                         STATUS(ServiceUnavailable, "Tablet Peer not in RUNNING state"),
                         TabletServerErrorPB::TABLET_NOT_RUNNING, &context);
    return;
  }

  auto raft = GetConsensusOrRespond(peer, resp, &context);
  if (!raft) {
    return;
  }

  // When op_type is present, the TEST_ variant that also takes the operation type is used.
  auto last_op_id = req->has_op_type()
      ? raft->TEST_GetLastOpIdWithType(req->opid_type(), req->op_type())
      : raft->GetLastOpId(req->opid_type());

  // RETURN_UNKNOWN_ERROR_IF_NOT_OK only accepts a Status, so the Result is checked explicitly.
  if (!last_op_id.ok()) {
    RETURN_UNKNOWN_ERROR_IF_NOT_OK(last_op_id.status(), resp, &context);
  }
  last_op_id->ToPB(resp->mutable_opid());
  context.RespondSuccess();
}
2012
2013
// Returns the ACTIVE or COMMITTED consensus state of a tablet together with its leader lease
// status. Any other ConsensusConfigType is rejected with InvalidArgument.
//
// The config type is validated BEFORE the RpcScope is created. Once a scope is active, its
// destructor sends a success response, so replying with an error while the scope exists would
// answer the RPC twice.
void ConsensusServiceImpl::GetConsensusState(const consensus::GetConsensusStateRequestPB *req,
                                             consensus::GetConsensusStateResponsePB *resp,
                                             rpc::RpcContext context) {
  DVLOG(3) << "Received GetConsensusState RPC: " << req->DebugString();

  const ConsensusConfigType type = req->type();
  if (PREDICT_FALSE(type != CONSENSUS_CONFIG_ACTIVE && type != CONSENSUS_CONFIG_COMMITTED)) {
    HandleErrorResponse(resp, &context,
        STATUS(InvalidArgument, Substitute("Unsupported ConsensusConfigType $0 ($1)",
                                           ConsensusConfigType_Name(type), type)));
    return;
  }

  RpcScope scope(tablet_manager_, "GetConsensusState", req, resp, &context);
  if (!scope) {
    return;
  }
  LeaderLeaseStatus leader_lease_status;
  *resp->mutable_cstate() = scope->ConsensusState(type, &leader_lease_status);
  resp->set_leader_lease_status(leader_lease_status);
}
2033
2034
void ConsensusServiceImpl::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* req,
2035
                                                StartRemoteBootstrapResponsePB* resp,
2036
10.6k
                                                rpc::RpcContext context) {
2037
10.6k
  if (!CheckUuidMatchOrRespond(tablet_manager_, "StartRemoteBootstrap", req, resp, &context)) {
2038
0
    return;
2039
0
  }
2040
10.6k
  if (req->has_split_parent_tablet_id()
2041
10.6k
      && 
!88
PREDICT_FALSE88
(FLAGS_TEST_disable_post_split_tablet_rbs_check)) {
2042
    // For any tablet that was the result of a split, the raft group leader will always send the
2043
    // split_parent_tablet_id. However, our local tablet manager should only know about the parent
2044
    // if it was part of the raft group which committed the split to the parent, and if the parent
2045
    // tablet has yet to be deleted across the cluster.
2046
86
    TabletPeerTablet result;
2047
86
    if (tablet_manager_->GetTabletPeer(req->split_parent_tablet_id(), &result.tablet_peer).ok()) {
2048
85
      YB_LOG_EVERY_N_SECS(WARNING, 30)
2049
2
          << "Start remote bootstrap rejected: parent tablet not yet split.";
2050
85
      SetupErrorAndRespond(
2051
85
          resp->mutable_error(),
2052
85
          STATUS(Incomplete, "Rejecting bootstrap request while parent tablet is present."),
2053
85
          TabletServerErrorPB::TABLET_SPLIT_PARENT_STILL_LIVE,
2054
85
          &context);
2055
85
      return;
2056
85
    }
2057
86
  }
2058
2059
10.5k
  Status s = tablet_manager_->StartRemoteBootstrap(*req);
2060
10.5k
  if (!s.ok()) {
2061
    // Using Status::AlreadyPresent for a remote bootstrap operation that is already in progress.
2062
8.54k
    if (s.IsAlreadyPresent()) {
2063
8.53k
      YB_LOG_EVERY_N_SECS
(WARNING, 30) << "Start remote bootstrap failed: " << s67
;
2064
8.53k
      SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::ALREADY_IN_PROGRESS,
2065
8.53k
                           &context);
2066
8.53k
      return;
2067
8.53k
    } else {
2068
6
      LOG(WARNING) << "Start remote bootstrap failed: " << s;
2069
6
    }
2070
8.54k
  }
2071
2072
2.02k
  RETURN_UNKNOWN_ERROR_IF_NOT_OK(s, resp, &context);
2073
2.01k
  context.RespondSuccess();
2074
2.01k
}
2075
2076
void TabletServiceImpl::NoOp(const NoOpRequestPB *req,
2077
                             NoOpResponsePB *resp,
2078
0
                             rpc::RpcContext context) {
2079
0
  context.RespondSuccess();
2080
0
}
2081
2082
void TabletServiceImpl::Publish(
2083
684
    const PublishRequestPB* req, PublishResponsePB* resp, rpc::RpcContext context) {
2084
684
  rpc::Publisher* publisher = server_->GetPublisher();
2085
684
  resp->set_num_clients_forwarded_to(publisher ? 
(*publisher)(req->channel(), req->message())239
:
0445
);
2086
684
  context.RespondSuccess();
2087
684
}
2088
2089
void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
2090
                                    ListTabletsResponsePB* resp,
2091
217
                                    rpc::RpcContext context) {
2092
217
  TabletPeers peers = server_->tablet_manager()->GetTabletPeers();
2093
217
  RepeatedPtrField<StatusAndSchemaPB>* peer_status = resp->mutable_status_and_schema();
2094
3.19k
  for (const TabletPeerPtr& peer : peers) {
2095
3.19k
    StatusAndSchemaPB* status = peer_status->Add();
2096
3.19k
    peer->GetTabletStatusPB(status->mutable_tablet_status());
2097
3.19k
    SchemaToPB(*peer->status_listener()->schema(), status->mutable_schema());
2098
3.19k
    peer->tablet_metadata()->partition_schema()->ToPB(status->mutable_partition_schema());
2099
3.19k
  }
2100
217
  context.RespondSuccess();
2101
217
}
2102
2103
void TabletServiceImpl::GetMasterAddresses(const GetMasterAddressesRequestPB* req,
2104
                                           GetMasterAddressesResponsePB* resp,
2105
9
                                           rpc::RpcContext context) {
2106
9
  resp->set_master_addresses(server::MasterAddressesToString(
2107
9
      *server_->tablet_manager()->server()->options().GetMasterAddresses()));
2108
9
  context.RespondSuccess();
2109
9
}
2110
2111
void TabletServiceImpl::GetLogLocation(
2112
    const GetLogLocationRequestPB* req,
2113
    GetLogLocationResponsePB* resp,
2114
0
    rpc::RpcContext context) {
2115
0
  resp->set_log_location(FLAGS_log_dir);
2116
0
  context.RespondSuccess();
2117
0
}
2118
2119
void TabletServiceImpl::ListTabletsForTabletServer(const ListTabletsForTabletServerRequestPB* req,
2120
                                                   ListTabletsForTabletServerResponsePB* resp,
2121
881
                                                   rpc::RpcContext context) {
2122
  // Replicating logic from path-handlers.
2123
881
  TabletPeers peers = server_->tablet_manager()->GetTabletPeers();
2124
4.41k
  for (const TabletPeerPtr& peer : peers) {
2125
4.41k
    TabletStatusPB status;
2126
4.41k
    peer->GetTabletStatusPB(&status);
2127
2128
4.41k
    ListTabletsForTabletServerResponsePB::Entry* data_entry = resp->add_entries();
2129
4.41k
    data_entry->set_table_name(status.table_name());
2130
4.41k
    data_entry->set_tablet_id(status.tablet_id());
2131
2132
4.41k
    std::shared_ptr<consensus::Consensus> consensus = peer->shared_consensus();
2133
4.41k
    data_entry->set_is_leader(consensus && 
consensus->role() == PeerRole::LEADER4.39k
);
2134
4.41k
    data_entry->set_state(status.state());
2135
2136
4.41k
    auto tablet = peer->shared_tablet();
2137
4.41k
    uint64_t num_sst_files = tablet ? 
tablet->GetCurrentVersionNumSSTFiles()4.39k
:
012
;
2138
4.41k
    data_entry->set_num_sst_files(num_sst_files);
2139
2140
4.41k
    uint64_t num_log_segments = peer->GetNumLogSegments();
2141
4.41k
    data_entry->set_num_log_segments(num_log_segments);
2142
2143
4.41k
    auto num_memtables = tablet ? 
tablet->GetNumMemtables()4.39k
:
std::make_pair(0, 0)12
;
2144
4.41k
    data_entry->set_num_memtables_intents(num_memtables.first);
2145
4.41k
    data_entry->set_num_memtables_regular(num_memtables.second);
2146
4.41k
  }
2147
2148
881
  context.RespondSuccess();
2149
881
}
2150
2151
namespace {
2152
2153
3.29k
Result<uint64_t> CalcChecksum(tablet::Tablet* tablet, CoarseTimePoint deadline) {
2154
3.29k
  const shared_ptr<Schema> schema = tablet->metadata()->schema();
2155
3.29k
  auto client_schema = schema->CopyWithoutColumnIds();
2156
3.29k
  auto iter = tablet->NewRowIterator(client_schema, {}, "", deadline);
2157
3.29k
  RETURN_NOT_OK(iter);
2158
2159
3.29k
  QLTableRow value_map;
2160
3.29k
  ScanResultChecksummer collector;
2161
2162
1.62M
  while (VERIFY_RESULT((**iter).HasNext())) {
2163
1.62M
    RETURN_NOT_OK((**iter).NextRow(&value_map));
2164
1.62M
    collector.HandleRow(*schema, value_map);
2165
1.62M
  }
2166
2167
3.29k
  return collector.agg_checksum();
2168
3.29k
}
2169
2170
} // namespace
2171
2172
// Resolves the requested tablet (split tablets allowed) at the requested consistency level and
// returns the checksum of its rows.
Result<uint64_t> TabletServiceImpl::DoChecksum(
    const ChecksumRequestPB* req, CoarseTimePoint deadline) {
  auto abstract_tablet = VERIFY_RESULT(GetTablet(
      server_->tablet_peer_lookup(), req->tablet_id(), /* tablet_peer = */ nullptr,
      req->consistency_level(), AllowSplitTablet::kTrue));
  auto* const raw_tablet = down_cast<tablet::Tablet*>(abstract_tablet.get());
  return CalcChecksum(raw_tablet, deadline);
}
2179
2180
void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
2181
                                 ChecksumResponsePB* resp,
2182
3.30k
                                 rpc::RpcContext context) {
2183
3.30k
  VLOG
(1) << "Full request: " << req->DebugString()7
;
2184
2185
3.30k
  auto checksum = DoChecksum(req, context.GetClientDeadline());
2186
3.30k
  if (!checksum.ok()) {
2187
1
    SetupErrorAndRespond(resp->mutable_error(), checksum.status(), &context);
2188
1
    return;
2189
1
  }
2190
2191
3.30k
  resp->set_checksum(*checksum);
2192
3.30k
  context.RespondSuccess();
2193
3.30k
}
2194
2195
void TabletServiceImpl::ImportData(const ImportDataRequestPB* req,
2196
                                   ImportDataResponsePB* resp,
2197
0
                                   rpc::RpcContext context) {
2198
0
  auto peer = VERIFY_RESULT_OR_RETURN(LookupTabletPeerOrRespond(
2199
0
      server_->tablet_peer_lookup(), req->tablet_id(), resp, &context));
2200
2201
0
  auto status = peer.tablet_peer->tablet()->ImportData(req->source_dir());
2202
0
  if (!status.ok()) {
2203
0
    SetupErrorAndRespond(resp->mutable_error(), status, &context);
2204
0
    return;
2205
0
  }
2206
0
  context.RespondSuccess();
2207
0
}
2208
2209
void TabletServiceImpl::GetTabletStatus(const GetTabletStatusRequestPB* req,
2210
                                        GetTabletStatusResponsePB* resp,
2211
1
                                        rpc::RpcContext context) {
2212
1
  const Status s = server_->GetTabletStatus(req, resp);
2213
1
  if (!s.ok()) {
2214
0
    SetupErrorAndRespond(resp->mutable_error(), s,
2215
0
                         s.IsNotFound() ? TabletServerErrorPB::TABLET_NOT_FOUND
2216
0
                                        : TabletServerErrorPB::UNKNOWN_ERROR,
2217
0
                         &context);
2218
0
    return;
2219
0
  }
2220
1
  context.RespondSuccess();
2221
1
}
2222
2223
void TabletServiceImpl::IsTabletServerReady(const IsTabletServerReadyRequestPB* req,
2224
                                            IsTabletServerReadyResponsePB* resp,
2225
9
                                            rpc::RpcContext context) {
2226
9
  Status s = server_->tablet_manager()->GetNumTabletsPendingBootstrap(resp);
2227
9
  if (!s.ok()) {
2228
0
    SetupErrorAndRespond(resp->mutable_error(), s, &context);
2229
0
    return;
2230
0
  }
2231
9
  context.RespondSuccess();
2232
9
}
2233
2234
void TabletServiceImpl::TakeTransaction(const TakeTransactionRequestPB* req,
2235
                                        TakeTransactionResponsePB* resp,
2236
0
                                        rpc::RpcContext context) {
2237
0
  auto transaction = server_->TransactionPool()->Take(
2238
0
      client::ForceGlobalTransaction(req->has_is_global() && req->is_global()),
2239
0
      context.GetClientDeadline());
2240
0
  auto metadata = transaction->Release();
2241
0
  if (!metadata.ok()) {
2242
0
    LOG(INFO) << "Take failed: " << metadata.status();
2243
0
    context.RespondFailure(metadata.status());
2244
0
    return;
2245
0
  }
2246
0
  metadata->ForceToPB(resp->mutable_metadata());
2247
0
  VLOG(2) << "Taken metadata: " << metadata->ToString();
2248
0
  context.RespondSuccess();
2249
0
}
2250
2251
void TabletServiceImpl::GetSplitKey(
2252
144
    const GetSplitKeyRequestPB* req, GetSplitKeyResponsePB* resp, RpcContext context) {
2253
144
  TEST_PAUSE_IF_FLAG(TEST_pause_tserver_get_split_key);
2254
144
  PerformAtLeader(req, resp, &context,
2255
144
      [resp](const LeaderTabletPeer& leader_tablet_peer) -> Status {
2256
144
        const auto& tablet = leader_tablet_peer.tablet;
2257
144
        if (FLAGS_rocksdb_max_file_size_for_compaction > 0 &&
2258
144
            
tablet->schema()->table_properties().HasDefaultTimeToLive()0
) {
2259
0
          auto s = STATUS(NotSupported, "Tablet splitting not supported for TTL tables.");
2260
0
          return s.CloneAndAddErrorCode(
2261
0
              TabletServerError(TabletServerErrorPB::TABLET_SPLIT_DISABLED_TTL_EXPIRY));
2262
0
        }
2263
144
        if (tablet->MayHaveOrphanedPostSplitData()) {
2264
0
          return STATUS(IllegalState, "Tablet has orphaned post-split data");
2265
0
        }
2266
144
        const auto split_encoded_key = 
VERIFY_RESULT143
(tablet->GetEncodedMiddleSplitKey());143
2267
0
        resp->set_split_encoded_key(split_encoded_key);
2268
143
        const auto doc_key_hash = VERIFY_RESULT(docdb::DecodeDocKeyHash(split_encoded_key));
2269
143
        if (doc_key_hash.has_value()) {
2270
143
          resp->set_split_partition_key(PartitionSchema::EncodeMultiColumnHashValue(
2271
143
              doc_key_hash.value()));
2272
143
        } else {
2273
0
          resp->set_split_partition_key(split_encoded_key);
2274
0
        }
2275
143
        return Status::OK();
2276
143
  });
2277
144
}
2278
2279
void TabletServiceImpl::GetSharedData(const GetSharedDataRequestPB* req,
2280
                                      GetSharedDataResponsePB* resp,
2281
8
                                      rpc::RpcContext context) {
2282
8
  auto& data = server_->SharedObject();
2283
8
  resp->mutable_data()->assign(pointer_cast<const char*>(&data), sizeof(data));
2284
8
  context.RespondSuccess();
2285
8
}
2286
2287
188
// Intentionally a no-op.
void TabletServiceImpl::Shutdown() {}
2289
2290
// Returns the handler-latency histogram of the given tablet server service RPC method, or
// nullptr when tablet_server_service_ is not set. lock_ is held for the whole lookup so that
// tablet_server_service_ cannot change during the operation.
scoped_refptr<Histogram> TabletServer::GetMetricsHistogram(
    TabletServerServiceRpcMethodIndexes metric) {
  std::lock_guard<simple_spinlock> guard(lock_);
  if (!tablet_server_service_) {
    return nullptr;
  }
  return tablet_server_service_->GetMetric(metric).handler_latency;
}
2300
2301
// Builds the forwarding service on top of `server`, whose metric entity it registers with and
// whose client it uses for forwarded RPCs. `impl` is not used by this constructor.
TabletServerForwardServiceImpl::TabletServerForwardServiceImpl(TabletServiceImpl *impl,
                                                               TabletServerIf *server)
    : TabletServerForwardServiceIf(server->MetricEnt()), server_(server) {}
2306
2307
void TabletServerForwardServiceImpl::Write(const WriteRequestPB* req,
2308
                                           WriteResponsePB* resp,
2309
0
                                           rpc::RpcContext context) {
2310
  // Forward the rpc to the required Tserver.
2311
0
  std::shared_ptr<ForwardWriteRpc> forward_rpc =
2312
0
    std::make_shared<ForwardWriteRpc>(req, resp, std::move(context), server_->client());
2313
0
  forward_rpc->SendRpc();
2314
0
}
2315
2316
void TabletServerForwardServiceImpl::Read(const ReadRequestPB* req,
2317
                                          ReadResponsePB* resp,
2318
0
                                          rpc::RpcContext context) {
2319
0
  std::shared_ptr<ForwardReadRpc> forward_rpc =
2320
0
    std::make_shared<ForwardReadRpc>(req, resp, std::move(context), server_->client());
2321
0
  forward_rpc->SendRpc();
2322
0
}
2323
2324
}  // namespace tserver
2325
}  // namespace yb