YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/write_query.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/write_query.h"
15
16
#include "yb/client/client.h"
17
#include "yb/client/error.h"
18
#include "yb/client/meta_data_cache.h"
19
#include "yb/client/session.h"
20
#include "yb/client/table.h"
21
#include "yb/client/transaction.h"
22
#include "yb/client/yb_op.h"
23
24
#include "yb/common/index.h"
25
#include "yb/common/row_mark.h"
26
#include "yb/common/schema.h"
27
28
#include "yb/docdb/conflict_resolution.h"
29
#include "yb/docdb/cql_operation.h"
30
#include "yb/docdb/doc_write_batch.h"
31
#include "yb/docdb/pgsql_operation.h"
32
#include "yb/docdb/redis_operation.h"
33
34
#include "yb/tablet/tablet_metadata.h"
35
#include "yb/tablet/operations/write_operation.h"
36
#include "yb/tablet/tablet.h"
37
#include "yb/tablet/tablet_metrics.h"
38
#include "yb/tablet/transaction_participant.h"
39
#include "yb/tablet/write_query_context.h"
40
41
#include "yb/tserver/tserver.pb.h"
42
43
#include "yb/util/logging.h"
44
#include "yb/util/metrics.h"
45
#include "yb/util/trace.h"
46
47
using namespace std::placeholders;
48
49
namespace yb {
50
namespace tablet {
51
52
namespace {
53
54
// Separate Redis / QL / row operations write batches from write_request in preparation for the
55
// write transaction. Leave just the tablet id behind. Return Redis / QL / row operations, etc.
56
// in batch_request.
57
2.93M
void SetupKeyValueBatch(const tserver::WriteRequestPB& client_request, WritePB* out_request) {
58
2.93M
  out_request->set_unused_tablet_id(""); // Backward compatibility.
59
2.93M
  auto& out_write_batch = *out_request->mutable_write_batch();
60
2.93M
  if (client_request.has_write_batch()) {
61
673k
    out_write_batch = client_request.write_batch();
62
673k
  }
63
2.93M
  out_write_batch.set_deprecated_may_have_metadata(true);
64
2.93M
  if (client_request.has_request_id()) {
65
2.52M
    out_request->set_client_id1(client_request.client_id1());
66
2.52M
    out_request->set_client_id2(client_request.client_id2());
67
2.52M
    out_request->set_request_id(client_request.request_id());
68
2.52M
    out_request->set_min_running_request_id(client_request.min_running_request_id());
69
2.52M
  }
70
2.93M
  out_request->set_batch_idx(client_request.batch_idx());
71
  // Actually, in production code, we could check for external hybrid time only when there are
72
  // no ql, pgsql, redis operations.
73
  // But in CDCServiceTest we have ql write batch with external time.
74
2.93M
  if (client_request.has_external_hybrid_time()) {
75
1.87k
    out_request->set_external_hybrid_time(client_request.external_hybrid_time());
76
1.87k
  }
77
2.93M
}
78
79
} // namespace
80
81
enum class WriteQuery::ExecuteMode {
82
  kSimple,
83
  kRedis,
84
  kCql,
85
  kPgsql,
86
};
87
88
WriteQuery::WriteQuery(
89
    int64_t term,
90
    CoarseTimePoint deadline,
91
    WriteQueryContext* context,
92
    Tablet* tablet,
93
    tserver::WriteResponsePB* response,
94
    docdb::OperationKind kind)
95
    : operation_(std::make_unique<WriteOperation>(tablet)),
96
      term_(term), deadline_(deadline),
97
      context_(context),
98
      response_(response),
99
      kind_(kind),
100
3.26M
      start_time_(CoarseMonoClock::Now()) {
101
3.26M
}
102
103
11.9M
WritePB& WriteQuery::request() {
104
11.9M
  return *operation_->mutable_request();
105
11.9M
}
106
107
3.11M
std::unique_ptr<WriteOperation> WriteQuery::PrepareSubmit() {
108
3.11M
  operation_->set_completion_callback(
109
3.11M
      [operation = operation_.get(), query = this](const Status& status) {
110
3.11M
    std::unique_ptr<WriteQuery> query_holder(query);
111
3.11M
    query->Finished(operation, status);
112
3.11M
  });
113
3.11M
  return std::move(operation_);
114
3.11M
}
115
116
3.26M
void WriteQuery::DoStartSynchronization(const Status& status) {
117
3.26M
  std::unique_ptr<WriteQuery> self(this);
118
  // Move submit_token_ so it is released after this function.
119
3.26M
  ScopedRWOperation submit_token(std::move(submit_token_));
120
  // If a restart read is required, then we return this fact to caller and don't perform the write
121
  // operation.
122
3.26M
  if (status.ok() && 
restart_read_ht_.is_valid()3.11M
) {
123
0
    auto restart_time = response()->mutable_restart_read_time();
124
0
    restart_time->set_read_ht(restart_read_ht_.ToUint64());
125
0
    auto local_limit = context_->ReportReadRestart();
126
0
    if (!local_limit.ok()) {
127
0
      Cancel(local_limit.status());
128
0
      return;
129
0
    }
130
0
    restart_time->set_deprecated_max_of_read_time_and_local_limit_ht(local_limit->ToUint64());
131
0
    restart_time->set_local_limit_ht(local_limit->ToUint64());
132
    // Global limit is ignored by caller, so we don't set it.
133
0
    Cancel(Status::OK());
134
0
    return;
135
0
  }
136
137
3.26M
  if (!status.ok()) {
138
146k
    Cancel(status);
139
146k
    return;
140
146k
  }
141
142
3.11M
  context_->Submit(self.release()->PrepareSubmit(), term_);
143
3.11M
}
144
145
3.26M
void WriteQuery::Release() {
146
  // Free DocDB multi-level locks.
147
3.26M
  docdb_locks_.Reset();
148
3.26M
}
149
150
3.26M
WriteQuery::~WriteQuery() {
151
3.26M
}
152
153
2.95M
void WriteQuery::set_client_request(std::reference_wrapper<const tserver::WriteRequestPB> req) {
154
2.95M
  client_request_ = &req.get();
155
2.95M
  read_time_ = ReadHybridTime::FromReadTimePB(req.get());
156
2.95M
  allow_immediate_read_restart_ = !read_time_;
157
2.95M
}
158
159
0
void WriteQuery::set_client_request(std::unique_ptr<tserver::WriteRequestPB> req) {
160
0
  set_client_request(*req);
161
0
  client_request_holder_ = std::move(req);
162
0
}
163
164
3.11M
void WriteQuery::Finished(WriteOperation* operation, const Status& status) {
165
3.11M
  LOG_IF
(DFATAL, operation_) << "Finished not submitted operation: " << status1.31k
;
166
167
3.11M
  if (status.ok()) {
168
3.08M
    TabletMetrics* metrics = operation->tablet()->metrics();
169
3.08M
    if (metrics) {
170
3.08M
      auto op_duration_usec = MonoDelta(CoarseMonoClock::now() - start_time_).ToMicroseconds();
171
3.08M
      metrics->write_op_duration_client_propagated_consistency->Increment(op_duration_usec);
172
3.08M
    }
173
3.08M
  }
174
175
3.11M
  Complete(status);
176
3.11M
}
177
178
146k
void WriteQuery::Cancel(const Status& status) {
179
146k
  LOG_IF
(DFATAL, !operation_) << "Cancelled submitted operation: " << status75
;
180
181
146k
  Complete(status);
182
146k
}
183
184
3.26M
void WriteQuery::Complete(const Status& status) {
185
3.26M
  Release();
186
187
3.26M
  if (callback_) {
188
3.16M
    callback_(status);
189
3.16M
  }
190
3.26M
}
191
192
3.24M
void WriteQuery::ExecuteDone(const Status& status) {
193
3.24M
  scoped_read_operation_.Reset();
194
3.24M
  switch (execute_mode_) {
195
314k
    case ExecuteMode::kSimple:
196
314k
      SimpleExecuteDone(status);
197
314k
      return;
198
123k
    case ExecuteMode::kRedis:
199
123k
      RedisExecuteDone(status);
200
123k
      return;
201
2.11M
    case ExecuteMode::kCql:
202
2.11M
      CqlExecuteDone(status);
203
2.11M
      return;
204
695k
    case ExecuteMode::kPgsql:
205
695k
      PgsqlExecuteDone(status);
206
695k
      return;
207
3.24M
  }
208
0
  FATAL_INVALID_ENUM_VALUE(ExecuteMode, execute_mode_);
209
0
}
210
211
3.24M
// Builds the doc operations for this write.
// Returns:
//   * true when there is work for DoExecute();
//   * false when the query should go straight to synchronization;
//   * an error when preparation fails or the write is empty.
Result<bool> WriteQuery::PrepareExecute() {
  if (client_request_ != nullptr) {
    SetupKeyValueBatch(*client_request_, operation().AllocateRequest());

    // The first non-empty typed batch decides the mode: Redis, then CQL, then PGSQL.
    if (!client_request_->redis_write_batch().empty()) {
      return RedisPrepareExecute();
    }
    if (!client_request_->ql_write_batch().empty()) {
      return CqlPrepareExecute();
    }
    if (!client_request_->pgsql_write_batch().empty()) {
      return PgsqlPrepareExecute();
    }
    // A raw write batch carrying an external hybrid time needs no execution step.
    if (client_request_->has_write_batch() && client_request_->has_external_hybrid_time()) {
      return false;
    }
  } else {
    const auto* existing_request = operation().request();
    const bool has_read_pairs = existing_request != nullptr &&
                                existing_request->has_write_batch() &&
                                !existing_request->write_batch().read_pairs().empty();
    if (has_read_pairs) {
      return SimplePrepareExecute();
    }
  }

  // An empty write should not happen, but it is handled anyway:
  // debug builds crash (DFATAL), release builds report InvalidArgument.
  LOG(DFATAL) << "Empty write: " << AsString(client_request_) << ", " << AsString(request());
  return STATUS(InvalidArgument, "Empty write");
}
244
245
3.24M
CHECKED_STATUS WriteQuery::InitExecute(ExecuteMode mode) {
246
3.24M
  scoped_read_operation_ = tablet().CreateNonAbortableScopedRWOperation();
247
3.24M
  if (!scoped_read_operation_.ok()) {
248
99
    return MoveStatus(scoped_read_operation_);
249
99
  }
250
3.24M
  execute_mode_ = mode;
251
3.24M
  return Status::OK();
252
3.24M
}
253
254
123k
Result<bool> WriteQuery::RedisPrepareExecute() {
255
123k
  RETURN_NOT_OK(InitExecute(ExecuteMode::kRedis));
256
257
  // Since we take exclusive locks, it's okay to use Now as the read TS for writes.
258
123k
  const auto& redis_write_batch = client_request_->redis_write_batch();
259
260
123k
  doc_ops_.reserve(redis_write_batch.size());
261
123k
  for (const auto& redis_request : redis_write_batch) {
262
123k
    doc_ops_.emplace_back(new docdb::RedisWriteOperation(redis_request));
263
123k
  }
264
265
123k
  return true;
266
123k
}
267
268
314k
Result<bool> WriteQuery::SimplePrepareExecute() {
269
314k
  RETURN_NOT_OK(InitExecute(ExecuteMode::kSimple));
270
314k
  return true;
271
314k
}
272
273
2.11M
// Builds a QLWriteOperation for every QL request whose schema version is compatible with the
// tablet's primary table. Incompatible requests get a YQL_STATUS_SCHEMA_VERSION_MISMATCH
// response instead. Returns false when no operation was created.
Result<bool> WriteQuery::CqlPrepareExecute() {
  RETURN_NOT_OK(InitExecute(ExecuteMode::kCql));

  auto& metadata = *tablet().metadata();
  DVLOG(2) << "Schema version for  " << metadata.table_name() << ": " << metadata.schema_version();

  const auto& ql_write_batch = client_request_->ql_write_batch();
  doc_ops_.reserve(ql_write_batch.size());

  // One transaction context is shared by every QL operation of the batch.
  auto txn_op_ctx = VERIFY_RESULT(tablet().CreateTransactionOperationContext(
      request().write_batch().transaction(),
      /* is_ysql_catalog_table */ false,
      &request().write_batch().subtransaction()));
  auto table_info = metadata.primary_table_info();

  for (const auto& req : ql_write_batch) {
    QLResponsePB* resp = response_->add_ql_response_batch();
    const bool compatible = IsSchemaVersionCompatible(
        table_info->schema_version, req.schema_version(),
        req.is_compatible_with_previous_version());
    if (!compatible) {
      DVLOG(1) << " On " << table_info->table_name
               << " Setting status for write as YQL_STATUS_SCHEMA_VERSION_MISMATCH tserver's: "
               << table_info->schema_version << " vs req's : " << req.schema_version()
               << " is req compatible with prev version: "
               << req.is_compatible_with_previous_version() << " for " << AsString(req);
      resp->set_status(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH);
      resp->set_error_message(Format(
          "schema version mismatch for table $0: expected $1, got $2 (compt with prev: $3)",
          table_info->table_id,
          table_info->schema_version, req.schema_version(),
          req.is_compatible_with_previous_version()));
      continue;
    }

    DVLOG(3) << "Version matches : " << table_info->schema_version << " for "
             << AsString(req);
    // Aliasing shared_ptr: shares ownership of table_info while pointing at its schema.
    auto write_op = std::make_unique<docdb::QLWriteOperation>(
        req, std::shared_ptr<Schema>(table_info, table_info->schema.get()),
        *table_info->index_map, tablet().unique_index_key_schema(),
        txn_op_ctx);
    RETURN_NOT_OK(write_op->Init(resp));
    doc_ops_.emplace_back(std::move(write_op));
  }

  // False when every operation had an incompatible schema version.
  return !doc_ops_.empty();
}
323
324
694k
// Builds a PgsqlWriteOperation for every PGSQL request whose schema version matches its table.
// Two kinds of request produce no operation:
//   * colocated-truncate requests on a non-colocated tablet are marked skipped;
//   * version mismatches get a PGSQL_STATUS_SCHEMA_VERSION_MISMATCH response.
// Returns false when no operation was created.
Result<bool> WriteQuery::PgsqlPrepareExecute() {
  RETURN_NOT_OK(InitExecute(ExecuteMode::kPgsql));

  const auto& pgsql_write_batch = client_request_->pgsql_write_batch();
  doc_ops_.reserve(pgsql_write_batch.size());

  auto& metadata = *tablet().metadata();
  const bool colocated = metadata.colocated();

  // Filled in from the first request that passes the schema version check.
  TransactionOperationContext txn_op_ctx;

  for (const auto& req : pgsql_write_batch) {
    PgsqlResponsePB* resp = response_->add_pgsql_response_batch();

    // Table-level tombstones should not be requested for non-colocated tables.
    const bool tombstone_on_non_colocated =
        req.stmt_type() == PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED && !colocated;
    if (tombstone_on_non_colocated) {
      LOG(WARNING) << "cannot create table-level tombstone for a non-colocated table";
      resp->set_skipped(true);
      continue;
    }

    const std::shared_ptr<tablet::TableInfo> table_info =
        VERIFY_RESULT(metadata.GetTableInfo(req.table_id()));
    if (table_info->schema_version != req.schema_version()) {
      resp->set_status(PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH);
      resp->set_error_message(
          Format("schema version mismatch for table $0: expected $1, got $2",
                 table_info->table_id,
                 table_info->schema_version,
                 req.schema_version()));
      continue;
    }

    if (doc_ops_.empty()) {
      // is_ysql_catalog_table is taken from the first accepted operation in the batch.
      txn_op_ctx = VERIFY_RESULT(tablet().CreateTransactionOperationContext(
          request().write_batch().transaction(),
          table_info->schema->table_properties().is_ysql_catalog_table(),
          &request().write_batch().subtransaction()));
    }
    auto write_op = std::make_unique<docdb::PgsqlWriteOperation>(
        req, *table_info->schema, txn_op_ctx);
    RETURN_NOT_OK(write_op->Init(resp));
    doc_ops_.emplace_back(std::move(write_op));
  }

  // False when every request was skipped or had a mismatched schema version.
  return !doc_ops_.empty();
}
375
376
3.24M
void WriteQuery::Execute(std::unique_ptr<WriteQuery> query) {
377
3.24M
  auto prepare_result = query->PrepareExecute();
378
3.24M
  if (!prepare_result.ok()) {
379
99
    StartSynchronization(std::move(query), prepare_result.status());
380
99
    return;
381
99
  }
382
383
3.24M
  if (!prepare_result.get()) {
384
1.29k
    StartSynchronization(std::move(query), Status::OK());
385
1.29k
    return;
386
1.29k
  }
387
388
3.24M
  auto* query_ptr = query.get();
389
3.24M
  query_ptr->self_ = std::move(query);
390
3.24M
  auto status = query_ptr->DoExecute();
391
3.24M
  if (!status.ok()) {
392
711
    query_ptr->ExecuteDone(status);
393
711
  }
394
3.24M
}
395
396
3.24M
// Prepares the doc write operation and then either completes execution immediately or starts
// the appropriate asynchronous conflict resolution. The resolution callbacks continue the flow.
CHECKED_STATUS WriteQuery::DoExecute() {
  auto& write_batch = *request().mutable_write_batch();
  isolation_level_ = VERIFY_RESULT(tablet().GetIsolationLevelFromPB(write_batch));
  const RowMarkType row_mark_type = GetRowMarkTypeFromPB(write_batch);
  const auto& metadata = *tablet().metadata();

  // force_txn_path_ makes the table be treated as transactional regardless of its properties.
  const bool transactional_table =
      metadata.schema()->table_properties().is_transactional() || force_txn_path_;
  if (!transactional_table && isolation_level_ != IsolationLevel::NON_TRANSACTIONAL) {
    YB_LOG_EVERY_N_SECS(DFATAL, 30)
        << "An attempt to perform a transactional operation on a non-transactional table: "
        << operation_->ToString();
  }

  // The result carries the lock batch (later moved to docdb_locks_) and need_read_snapshot.
  docdb::PartialRangeKeyIntents partial_range_key_intents(metadata.UsePartialRangeKeyIntents());
  prepare_result_ = VERIFY_RESULT(docdb::PrepareDocWriteOperation(
      doc_ops_, write_batch.read_pairs(), tablet().metrics()->write_lock_latency,
      isolation_level_, kind(), row_mark_type, transactional_table,
      deadline(), partial_range_key_intents, tablet().shared_lock_manager()));

  auto* const participant = tablet().transaction_participant();
  if (participant) {
    request_scope_ = RequestScope(participant);
  }

  // Without transactions on this table there are no conflicts to resolve.
  if (!tablet().txns_enabled() || !transactional_table) {
    CompleteExecute();
    return Status::OK();
  }

  if (isolation_level_ == IsolationLevel::NON_TRANSACTIONAL) {
    const auto now = tablet().clock()->Now();
    auto on_resolved = [this, now](const Result<HybridTime>& result) {
      if (!result.ok()) {
        ExecuteDone(result.status());
        TRACE("InvokeCallback");
        return;
      }
      NonTransactionalConflictsResolved(now, *result);
      TRACE("NonTransactionalConflictsResolved");
    };
    docdb::ResolveOperationConflicts(
        doc_ops_, now, tablet().doc_db(), partial_range_key_intents,
        participant, tablet().metrics()->transaction_conflicts.get(),
        std::move(on_resolved));
    return Status::OK();
  }

  if (isolation_level_ == IsolationLevel::SERIALIZABLE_ISOLATION &&
      prepare_result_.need_read_snapshot) {
    // For each doc op, add every lock path as a read pair to the write batch, and set the
    // batch's wait policy to WAIT_ERROR.
    boost::container::small_vector<RefCntPrefix, 16> lock_paths;
    for (const auto& doc_op : doc_ops_) {
      lock_paths.clear();
      IsolationLevel ignored_isolation_level;
      RETURN_NOT_OK(doc_op->GetDocPaths(
          docdb::GetDocPathsMode::kLock, &lock_paths, &ignored_isolation_level));
      for (const auto& path : lock_paths) {
        const auto key = path.as_slice();
        auto* read_pair = write_batch.mutable_read_pairs()->Add();
        read_pair->set_key(key.data(), key.size());
        // Empty values are disallowed by docdb.
        // https://github.com/YugaByte/yugabyte-db/issues/736
        read_pair->set_value(std::string(1, docdb::ValueTypeAsChar::kNullLow));
        write_batch.set_wait_policy(WAIT_ERROR);
      }
    }
  }

  const HybridTime read_ht = read_time_ ? read_time_.read : HybridTime::kMax;
  docdb::ResolveTransactionConflicts(
      doc_ops_, write_batch, tablet().clock()->Now(), read_ht,
      tablet().doc_db(), partial_range_key_intents,
      participant, tablet().metrics()->transaction_conflicts.get(),
      [this](const Result<HybridTime>& result) {
        if (!result.ok()) {
          ExecuteDone(result.status());
          TRACE("ExecuteDone");
          return;
        }
        TransactionalConflictsResolved();
        TRACE("TransactionalConflictsResolved");
      });
  return Status::OK();
}
481
482
122k
void WriteQuery::NonTransactionalConflictsResolved(HybridTime now, HybridTime result) {
483
122k
  if (now != result) {
484
10
    tablet().clock()->Update(result);
485
10
  }
486
487
122k
  CompleteExecute();
488
122k
}
489
490
858k
void WriteQuery::TransactionalConflictsResolved() {
491
858k
  auto status = DoTransactionalConflictsResolved();
492
858k
  if (!status.ok()) {
493
0
    LOG(DFATAL) << status;
494
0
    ExecuteDone(status);
495
0
  }
496
858k
}
497
498
858k
CHECKED_STATUS WriteQuery::DoTransactionalConflictsResolved() {
499
858k
  if (!read_time_) {
500
427k
    auto safe_time = VERIFY_RESULT(tablet().SafeTime(RequireLease::kTrue));
501
0
    read_time_ = ReadHybridTime::FromHybridTimeRange(
502
427k
        {safe_time, tablet().clock()->NowRange().second});
503
430k
  } else if (prepare_result_.need_read_snapshot &&
504
430k
             
isolation_level_ == IsolationLevel::SERIALIZABLE_ISOLATION359k
) {
505
0
    return STATUS_FORMAT(
506
0
        InvalidArgument,
507
0
        "Read time should NOT be specified for serializable isolation level: $0",
508
0
        read_time_);
509
0
  }
510
511
858k
  CompleteExecute();
512
858k
  return Status::OK();
513
858k
}
514
515
3.11M
void WriteQuery::CompleteExecute() {
516
3.11M
  ExecuteDone(DoCompleteExecute());
517
3.11M
}
518
519
3.11M
// Assembles the DocDB write batch for doc_ops_. When local read restarts are allowed, it retries
// at the restart time. On success without a pending restart, it takes over the lock batch
// produced by PrepareDocWriteOperation.
CHECKED_STATUS WriteQuery::DoCompleteExecute() {
  const bool need_read_snapshot = prepare_result_.need_read_snapshot;

  // All reads for this write are expected to happen inside AssembleDocWriteBatch. When a
  // snapshot is needed, read_op keeps the read point registered until this function returns.
  auto read_op = need_read_snapshot
      ? VERIFY_RESULT(ScopedReadOperation::Create(&tablet(), RequireLease::kTrue, read_time_))
      : ScopedReadOperation();

  // Read time used by read-modify-write operations. Without a read snapshot it is used only to
  // write the TTL field of the record.
  auto real_read_time = need_read_snapshot
      ? read_op.read_time()
      : ReadHybridTime::SingleTime(tablet().clock()->Now());

  const auto init_marker_behavior = tablet().table_type() == TableType::REDIS_TABLE_TYPE
      ? docdb::InitMarkerBehavior::kRequired
      : docdb::InitMarkerBehavior::kOptional;

  // The batch is normally assembled once. It is rebuilt only when a read restart is reported
  // and a local restart is allowed (no client-supplied read time). The local limit is tightened
  // to the tablet safe time on the first such retry only.
  bool local_limit_updated = false;
  for (;;) {
    RETURN_NOT_OK(docdb::AssembleDocWriteBatch(
        doc_ops_, deadline(), real_read_time, tablet().doc_db(),
        request().mutable_write_batch(), init_marker_behavior,
        tablet().monotonic_counter(), &restart_read_ht_,
        tablet().metadata()->table_name()));

    const bool restart_locally = restart_read_ht_.is_valid() && allow_immediate_read_restart_;
    if (!restart_locally) {
      break;
    }

    real_read_time.read = restart_read_ht_;
    if (!local_limit_updated) {
      local_limit_updated = true;
      real_read_time.local_limit = std::min(
          real_read_time.local_limit, VERIFY_RESULT(tablet().SafeTime(RequireLease::kTrue)));
    }
    restart_read_ht_ = HybridTime();

    // Discard this attempt's write pairs and op responses before assembling again.
    request().mutable_write_batch()->clear_write_pairs();
    for (auto& doc_op : doc_ops_) {
      doc_op->ClearResponse();
    }
  }

  if (allow_immediate_read_restart_ &&
      isolation_level_ != IsolationLevel::NON_TRANSACTIONAL &&
      response_) {
    real_read_time.ToPB(response_->mutable_used_read_time());
  }

  // With a pending (non-local) read restart, the lock batch stays in prepare_result_.
  // DoStartSynchronization() reports restart_read_ht_ back in the response.
  if (!restart_read_ht_.is_valid()) {
    docdb_locks_ = std::move(prepare_result_.lock_batch);
  }
  return Status::OK();
}
582
583
55.7M
Tablet& WriteQuery::tablet() const {
584
55.7M
  return *operation_->tablet();
585
55.7M
}
586
587
2.87M
void WriteQuery::AdjustYsqlQueryTransactionality(size_t ysql_batch_size) {
588
2.87M
  force_txn_path_ = ysql_batch_size > 0 && 
tablet().is_sys_catalog()1.00M
;
589
2.87M
}
590
591
123k
void WriteQuery::RedisExecuteDone(const Status& status) {
592
123k
  if (!status.ok() || restart_read_ht().is_valid()) {
593
0
    StartSynchronization(std::move(self_), status);
594
0
    return;
595
0
  }
596
123k
  
for (auto& doc_op : doc_ops_)123k
{
597
123k
    auto* redis_write_operation = down_cast<docdb::RedisWriteOperation*>(doc_op.get());
598
123k
    response_->add_redis_response_batch()->Swap(&redis_write_operation->response());
599
123k
  }
600
601
123k
  StartSynchronization(std::move(self_), Status::OK());
602
123k
}
603
604
2.11M
void WriteQuery::CqlExecuteDone(const Status& status) {
605
2.11M
  if (restart_read_ht().is_valid()) {
606
0
    StartSynchronization(std::move(self_), Status::OK());
607
0
    return;
608
0
  }
609
610
2.11M
  if (status.ok()) {
611
2.08M
    UpdateQLIndexes();
612
2.08M
  } else {
613
26.9k
    CompleteQLWriteBatch(status);
614
26.9k
  }
615
2.11M
}
616
617
2.11M
void WriteQuery::CompleteQLWriteBatch(const Status& status) {
618
2.11M
  if (!status.ok()) {
619
24.8k
    StartSynchronization(std::move(self_), status);
620
24.8k
    return;
621
24.8k
  }
622
623
2.08M
  bool is_unique_index = tablet().metadata()->is_unique_index();
624
625
4.49M
  for (auto& doc_op : doc_ops_) {
626
4.49M
    std::unique_ptr<docdb::QLWriteOperation> ql_write_op(
627
4.49M
        down_cast<docdb::QLWriteOperation*>(doc_op.release()));
628
4.49M
    if (is_unique_index &&
629
4.49M
        
ql_write_op->request().type() == QLWriteRequestPB::QL_STMT_INSERT2.90k
&&
630
4.49M
        
ql_write_op->response()->has_applied()2.30k
&&
!ql_write_op->response()->applied()451
) {
631
      // If this is an insert into a unique index and it fails to apply, report duplicate value err.
632
451
      ql_write_op->response()->set_status(QLResponsePB::YQL_STATUS_USAGE_ERROR);
633
451
      ql_write_op->response()->set_error_message(
634
451
          Format("Duplicate value disallowed by unique index $0",
635
451
          tablet().metadata()->table_name()));
636
451
      DVLOG(1) << "Could not apply the given operation " << AsString(ql_write_op->request())
637
0
               << " due to " << AsString(ql_write_op->response());
638
4.49M
    } else if (ql_write_op->rowblock() != nullptr) {
639
      // If the QL write op returns a rowblock, move the op to the transaction state to return the
640
      // rows data as a sidecar after the transaction completes.
641
273
      ql_write_ops_.emplace_back(std::move(ql_write_op));
642
273
    }
643
4.49M
  }
644
645
2.08M
  StartSynchronization(std::move(self_), Status::OK());
646
2.08M
}
647
648
2.08M
void WriteQuery::UpdateQLIndexes() {
649
2.08M
  client::YBClient* client = nullptr;
650
2.08M
  client::YBSessionPtr session;
651
2.08M
  client::YBTransactionPtr txn;
652
2.08M
  IndexOps index_ops;
653
2.08M
  const ChildTransactionDataPB* child_transaction_data = nullptr;
654
4.48M
  for (auto& doc_op : doc_ops_) {
655
4.48M
    auto* write_op = down_cast<docdb::QLWriteOperation*>(doc_op.get());
656
4.48M
    if (write_op->index_requests()->empty()) {
657
4.47M
      continue;
658
4.47M
    }
659
15.1k
    
if (14.0k
!client14.0k
) {
660
15.1k
      client = &tablet().client();
661
15.1k
      session = std::make_shared<client::YBSession>(client);
662
15.1k
      session->SetDeadline(deadline());
663
15.1k
      if (write_op->request().has_child_transaction_data()) {
664
13.8k
        child_transaction_data = &write_op->request().child_transaction_data();
665
13.8k
        if (!tablet().transaction_manager()) {
666
0
          StartSynchronization(
667
0
              std::move(self_),
668
0
              STATUS(Corruption, "Transaction manager is not present for index update"));
669
0
          return;
670
0
        }
671
13.8k
        auto child_data = client::ChildTransactionData::FromPB(
672
13.8k
            write_op->request().child_transaction_data());
673
13.8k
        if (!child_data.ok()) {
674
0
          StartSynchronization(std::move(self_), child_data.status());
675
0
          return;
676
0
        }
677
13.8k
        txn = std::make_shared<client::YBTransaction>(tablet().transaction_manager(), *child_data);
678
13.8k
        session->SetTransaction(txn);
679
13.8k
      } else {
680
1.24k
        child_transaction_data = nullptr;
681
1.24k
      }
682
18.4E
    } else if (write_op->request().has_child_transaction_data()) {
683
4
      DCHECK_ONLY_NOTNULL(child_transaction_data);
684
4
      DCHECK_EQ(child_transaction_data->ShortDebugString(),
685
4
                write_op->request().child_transaction_data().ShortDebugString());
686
18.4E
    } else {
687
18.4E
      DCHECK(child_transaction_data == nullptr) <<
688
18.4E
          "Value: " << child_transaction_data->ShortDebugString();
689
18.4E
    }
690
691
    // Apply the write ops to update the index
692
36.2k
    
for (auto& pair : *write_op->index_requests())14.0k
{
693
36.2k
      client::YBTablePtr index_table;
694
36.2k
      bool cache_used_ignored = false;
695
36.2k
      auto metadata_cache = tablet().YBMetaDataCache();
696
36.2k
      if (!metadata_cache) {
697
0
        StartSynchronization(
698
0
            std::move(self_),
699
0
            STATUS(Corruption, "Table metadata cache is not present for index update"));
700
0
        return;
701
0
      }
702
      // TODO create async version of GetTable.
703
      // It is ok to have sync call here, because we use cache and it should not take too long.
704
36.2k
      auto status = metadata_cache->GetTable(pair.first->table_id(), &index_table,
705
36.2k
                                             &cache_used_ignored);
706
36.2k
      if (!status.ok()) {
707
0
        StartSynchronization(std::move(self_), status);
708
0
        return;
709
0
      }
710
36.2k
      std::shared_ptr<client::YBqlWriteOp> index_op(index_table->NewQLWrite());
711
36.2k
      index_op->mutable_request()->Swap(&pair.second);
712
36.2k
      index_op->mutable_request()->MergeFrom(pair.second);
713
36.2k
      session->Apply(index_op);
714
36.2k
      index_ops.emplace_back(std::move(index_op), write_op);
715
36.2k
    }
716
14.0k
  }
717
718
2.08M
  if (!session) {
719
2.06M
    CompleteQLWriteBatch(Status::OK());
720
2.06M
    return;
721
2.06M
  }
722
723
18.8k
  session->FlushAsync(std::bind(
724
18.8k
      &WriteQuery::UpdateQLIndexesFlushed, this, session, txn, std::move(index_ops), _1));
725
18.8k
}
726
727
void WriteQuery::UpdateQLIndexesFlushed(
728
    const client::YBSessionPtr& session, const client::YBTransactionPtr& txn,
729
15.1k
    const IndexOps& index_ops, client::FlushStatus* flush_status) {
730
15.1k
  std::unique_ptr<WriteQuery> query(std::move(self_));
731
732
15.1k
  const auto& status = flush_status->status;
733
15.1k
  if (PREDICT_FALSE(!status.ok())) {
734
    // When any error occurs during the dispatching of YBOperation, YBSession saves the error and
735
    // returns IOError. When it happens, retrieves the errors and discard the IOError.
736
145
    if (status.IsIOError()) {
737
145
      for (const auto& error : flush_status->errors) {
738
        // return just the first error seen.
739
145
        Cancel(error->status());
740
145
        return;
741
145
      }
742
145
    }
743
0
    Cancel(status);
744
0
    return;
745
145
  }
746
747
14.9k
  ChildTransactionResultPB child_result;
748
14.9k
  if (txn) {
749
13.7k
    auto finish_result = txn->FinishChild();
750
13.7k
    if (!finish_result.ok()) {
751
0
      query->Cancel(finish_result.status());
752
0
      return;
753
0
    }
754
13.7k
    child_result = std::move(*finish_result);
755
13.7k
  }
756
757
  // Check the responses of the index write ops.
758
36.0k
  
for (const auto& pair : index_ops)14.9k
{
759
36.0k
    std::shared_ptr<client::YBqlWriteOp> index_op = pair.first;
760
36.0k
    auto* response = pair.second->response();
761
36.0k
    DCHECK_ONLY_NOTNULL(response);
762
36.0k
    auto* index_response = index_op->mutable_response();
763
764
36.0k
    if (index_response->status() != QLResponsePB::YQL_STATUS_OK) {
765
441
      DVLOG
(1) << "Got status " << index_response->status() << " for " << AsString(index_op)0
;
766
441
      response->set_status(index_response->status());
767
441
      response->set_error_message(std::move(*index_response->mutable_error_message()));
768
441
    }
769
36.0k
    if (txn) {
770
33.2k
      *response->mutable_child_transaction_result() = child_result;
771
33.2k
    }
772
36.0k
  }
773
774
14.9k
  self_ = std::move(query);
775
14.9k
  CompleteQLWriteBatch(Status::OK());
776
14.9k
}
777
778
695k
void WriteQuery::PgsqlExecuteDone(const Status& status) {
779
695k
  if (!status.ok() || 
restart_read_ht_.is_valid()647k
) {
780
47.6k
    StartSynchronization(std::move(self_), status);
781
47.6k
    return;
782
47.6k
  }
783
784
12.8M
  
for (auto& doc_op : doc_ops_)647k
{
785
    // We'll need to return the number of rows inserted, updated, or deleted by each operation.
786
12.8M
    std::unique_ptr<docdb::PgsqlWriteOperation> pgsql_write_op(
787
12.8M
        down_cast<docdb::PgsqlWriteOperation*>(doc_op.release()));
788
12.8M
    pgsql_write_ops_.emplace_back(std::move(pgsql_write_op));
789
12.8M
  }
790
791
647k
  StartSynchronization(std::move(self_), Status::OK());
792
647k
}
793
794
314k
void WriteQuery::SimpleExecuteDone(const Status& status) {
795
314k
  StartSynchronization(std::move(self_), status);
796
314k
}
797
798
}  // namespace tablet
799
}  // namespace yb