YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/write_query.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/write_query.h"
15
16
#include "yb/client/client.h"
17
#include "yb/client/error.h"
18
#include "yb/client/meta_data_cache.h"
19
#include "yb/client/session.h"
20
#include "yb/client/table.h"
21
#include "yb/client/transaction.h"
22
#include "yb/client/yb_op.h"
23
24
#include "yb/common/index.h"
25
#include "yb/common/row_mark.h"
26
#include "yb/common/schema.h"
27
28
#include "yb/docdb/conflict_resolution.h"
29
#include "yb/docdb/cql_operation.h"
30
#include "yb/docdb/doc_write_batch.h"
31
#include "yb/docdb/pgsql_operation.h"
32
#include "yb/docdb/redis_operation.h"
33
34
#include "yb/tablet/tablet_metadata.h"
35
#include "yb/tablet/operations/write_operation.h"
36
#include "yb/tablet/tablet.h"
37
#include "yb/tablet/tablet_metrics.h"
38
#include "yb/tablet/transaction_participant.h"
39
#include "yb/tablet/write_query_context.h"
40
41
#include "yb/tserver/tserver.pb.h"
42
43
#include "yb/util/logging.h"
44
#include "yb/util/metrics.h"
45
#include "yb/util/trace.h"
46
47
using namespace std::placeholders;
48
49
namespace yb {
50
namespace tablet {
51
52
namespace {
53
54
// Separate Redis / QL / row operations write batches from write_request in preparation for the
55
// write transaction. Leave just the tablet id behind. Return Redis / QL / row operations, etc.
56
// in batch_request.
57
1.59M
void SetupKeyValueBatch(const tserver::WriteRequestPB& client_request, WritePB* out_request) {
58
1.59M
  out_request->set_unused_tablet_id(""); // Backward compatibility.
59
1.59M
  auto& out_write_batch = *out_request->mutable_write_batch();
60
1.59M
  if (client_request.has_write_batch()) {
61
405k
    out_write_batch = client_request.write_batch();
62
405k
  }
63
1.59M
  out_write_batch.set_deprecated_may_have_metadata(true);
64
1.59M
  if (client_request.has_request_id()) {
65
1.31M
    out_request->set_client_id1(client_request.client_id1());
66
1.31M
    out_request->set_client_id2(client_request.client_id2());
67
1.31M
    out_request->set_request_id(client_request.request_id());
68
1.31M
    out_request->set_min_running_request_id(client_request.min_running_request_id());
69
1.31M
  }
70
1.59M
  out_request->set_batch_idx(client_request.batch_idx());
71
  // Actually, in production code, we could check for external hybrid time only when there are
72
  // no ql, pgsql, redis operations.
73
  // But in CDCServiceTest we have ql write batch with external time.
74
1.59M
  if (client_request.has_external_hybrid_time()) {
75
1.18k
    out_request->set_external_hybrid_time(client_request.external_hybrid_time());
76
1.18k
  }
77
1.59M
}
78
79
} // namespace
80
81
enum class WriteQuery::ExecuteMode {
82
  kSimple,
83
  kRedis,
84
  kCql,
85
  kPgsql,
86
};
87
88
WriteQuery::WriteQuery(
89
    int64_t term,
90
    CoarseTimePoint deadline,
91
    WriteQueryContext* context,
92
    Tablet* tablet,
93
    tserver::WriteResponsePB* response,
94
    docdb::OperationKind kind)
95
    : operation_(std::make_unique<WriteOperation>(tablet)),
96
      term_(term), deadline_(deadline),
97
      context_(context),
98
      response_(response),
99
      kind_(kind),
100
1.74M
      start_time_(CoarseMonoClock::Now()) {
101
1.74M
}
102
103
6.45M
WritePB& WriteQuery::request() {
104
6.45M
  return *operation_->mutable_request();
105
6.45M
}
106
107
1.64M
std::unique_ptr<WriteOperation> WriteQuery::PrepareSubmit() {
108
1.64M
  operation_->set_completion_callback(
109
1.64M
      [operation = operation_.get(), query = this](const Status& status) {
110
1.64M
    std::unique_ptr<WriteQuery> query_holder(query);
111
1.64M
    query->Finished(operation, status);
112
1.64M
  });
113
1.64M
  return std::move(operation_);
114
1.64M
}
115
116
1.74M
void WriteQuery::DoStartSynchronization(const Status& status) {
117
1.74M
  std::unique_ptr<WriteQuery> self(this);
118
  // Move submit_token_ so it is released after this function.
119
1.74M
  ScopedRWOperation submit_token(std::move(submit_token_));
120
  // If a restart read is required, then we return this fact to caller and don't perform the write
121
  // operation.
122
1.74M
  if (status.ok() && restart_read_ht_.is_valid()) {
123
0
    auto restart_time = response()->mutable_restart_read_time();
124
0
    restart_time->set_read_ht(restart_read_ht_.ToUint64());
125
0
    auto local_limit = context_->ReportReadRestart();
126
0
    if (!local_limit.ok()) {
127
0
      Cancel(local_limit.status());
128
0
      return;
129
0
    }
130
0
    restart_time->set_deprecated_max_of_read_time_and_local_limit_ht(local_limit->ToUint64());
131
0
    restart_time->set_local_limit_ht(local_limit->ToUint64());
132
    // Global limit is ignored by caller, so we don't set it.
133
0
    Cancel(Status::OK());
134
0
    return;
135
0
  }
136
137
1.74M
  if (!status.ok()) {
138
107k
    Cancel(status);
139
107k
    return;
140
107k
  }
141
142
1.64M
  context_->Submit(self.release()->PrepareSubmit(), term_);
143
1.64M
}
144
145
1.75M
void WriteQuery::Release() {
146
  // Free DocDB multi-level locks.
147
1.75M
  docdb_locks_.Reset();
148
1.75M
}
149
150
1.75M
WriteQuery::~WriteQuery() {
151
1.75M
}
152
153
1.61M
void WriteQuery::set_client_request(std::reference_wrapper<const tserver::WriteRequestPB> req) {
154
1.61M
  client_request_ = &req.get();
155
1.61M
  read_time_ = ReadHybridTime::FromReadTimePB(req.get());
156
1.61M
  allow_immediate_read_restart_ = !read_time_;
157
1.61M
}
158
159
0
void WriteQuery::set_client_request(std::unique_ptr<tserver::WriteRequestPB> req) {
160
0
  set_client_request(*req);
161
0
  client_request_holder_ = std::move(req);
162
0
}
163
164
1.64M
void WriteQuery::Finished(WriteOperation* operation, const Status& status) {
165
625
  LOG_IF(DFATAL, operation_) << "Finished not submitted operation: " << status;
166
167
1.64M
  if (status.ok()) {
168
1.62M
    TabletMetrics* metrics = operation->tablet()->metrics();
169
1.62M
    if (metrics) {
170
1.62M
      auto op_duration_usec = MonoDelta(CoarseMonoClock::now() - start_time_).ToMicroseconds();
171
1.62M
      metrics->write_op_duration_client_propagated_consistency->Increment(op_duration_usec);
172
1.62M
    }
173
1.62M
  }
174
175
1.64M
  Complete(status);
176
1.64M
}
177
178
107k
void WriteQuery::Cancel(const Status& status) {
179
69
  LOG_IF(DFATAL, !operation_) << "Cancelled submitted operation: " << status;
180
181
107k
  Complete(status);
182
107k
}
183
184
1.75M
void WriteQuery::Complete(const Status& status) {
185
1.75M
  Release();
186
187
1.75M
  if (callback_) {
188
1.65M
    callback_(status);
189
1.65M
  }
190
1.75M
}
191
192
1.74M
void WriteQuery::ExecuteDone(const Status& status) {
193
1.74M
  scoped_read_operation_.Reset();
194
1.74M
  switch (execute_mode_) {
195
140k
    case ExecuteMode::kSimple:
196
140k
      SimpleExecuteDone(status);
197
140k
      return;
198
61.5k
    case ExecuteMode::kRedis:
199
61.5k
      RedisExecuteDone(status);
200
61.5k
      return;
201
1.27M
    case ExecuteMode::kCql:
202
1.27M
      CqlExecuteDone(status);
203
1.27M
      return;
204
261k
    case ExecuteMode::kPgsql:
205
261k
      PgsqlExecuteDone(status);
206
261k
      return;
207
0
  }
208
0
  FATAL_INVALID_ENUM_VALUE(ExecuteMode, execute_mode_);
209
0
}
210
211
1.74M
Result<bool> WriteQuery::PrepareExecute() {
212
1.74M
  if (client_request_) {
213
1.60M
    auto* request = operation().AllocateRequest();
214
1.60M
    SetupKeyValueBatch(*client_request_, request);
215
216
1.60M
    if (!client_request_->redis_write_batch().empty()) {
217
61.5k
      return RedisPrepareExecute();
218
61.5k
    }
219
220
1.54M
    if (!client_request_->ql_write_batch().empty()) {
221
1.27M
      return CqlPrepareExecute();
222
1.27M
    }
223
224
261k
    if (!client_request_->pgsql_write_batch().empty()) {
225
260k
      return PgsqlPrepareExecute();
226
260k
    }
227
228
1.28k
    if (client_request_->has_write_batch() && client_request_->has_external_hybrid_time()) {
229
0
      return false;
230
0
    }
231
140k
  } else {
232
140k
    const auto* request = operation().request();
233
140k
    if (request && request->has_write_batch() && !request->write_batch().read_pairs().empty()) {
234
140k
      return SimplePrepareExecute();
235
140k
    }
236
1.35k
  }
237
238
  // Empty write should not happen, but we could handle it.
239
  // Just report it as error in release mode.
240
1.35k
  LOG(DFATAL) << "Empty write: " << AsString(client_request_) << ", " << AsString(request());
241
242
1.35k
  return STATUS(InvalidArgument, "Empty write");
243
1.35k
}
244
245
1.74M
CHECKED_STATUS WriteQuery::InitExecute(ExecuteMode mode) {
246
1.74M
  scoped_read_operation_ = tablet().CreateNonAbortableScopedRWOperation();
247
1.74M
  if (!scoped_read_operation_.ok()) {
248
7
    return MoveStatus(scoped_read_operation_);
249
7
  }
250
1.74M
  execute_mode_ = mode;
251
1.74M
  return Status::OK();
252
1.74M
}
253
254
61.5k
Result<bool> WriteQuery::RedisPrepareExecute() {
255
61.5k
  RETURN_NOT_OK(InitExecute(ExecuteMode::kRedis));
256
257
  // Since we take exclusive locks, it's okay to use Now as the read TS for writes.
258
61.5k
  const auto& redis_write_batch = client_request_->redis_write_batch();
259
260
61.5k
  doc_ops_.reserve(redis_write_batch.size());
261
61.5k
  for (const auto& redis_request : redis_write_batch) {
262
61.5k
    doc_ops_.emplace_back(new docdb::RedisWriteOperation(redis_request));
263
61.5k
  }
264
265
61.5k
  return true;
266
61.5k
}
267
268
140k
Result<bool> WriteQuery::SimplePrepareExecute() {
269
140k
  RETURN_NOT_OK(InitExecute(ExecuteMode::kSimple));
270
140k
  return true;
271
140k
}
272
273
1.27M
Result<bool> WriteQuery::CqlPrepareExecute() {
274
1.27M
  RETURN_NOT_OK(InitExecute(ExecuteMode::kCql));
275
276
1.27M
  auto& metadata = *tablet().metadata();
277
18.4E
  DVLOG(2) << "Schema version for  " << metadata.table_name() << ": " << metadata.schema_version();
278
279
1.27M
  const auto& ql_write_batch = client_request_->ql_write_batch();
280
281
1.27M
  doc_ops_.reserve(ql_write_batch.size());
282
283
1.27M
  auto txn_op_ctx = VERIFY_RESULT(tablet().CreateTransactionOperationContext(
284
1.27M
      request().write_batch().transaction(),
285
1.27M
      /* is_ysql_catalog_table */ false,
286
1.27M
      &request().write_batch().subtransaction()));
287
1.27M
  auto table_info = metadata.primary_table_info();
288
3.38M
  for (const auto& req : ql_write_batch) {
289
3.38M
    QLResponsePB* resp = response_->add_ql_response_batch();
290
3.38M
    if (!IsSchemaVersionCompatible(
291
3.38M
            table_info->schema_version, req.schema_version(),
292
1.54k
            req.is_compatible_with_previous_version())) {
293
18.4E
      DVLOG(1) << " On " << table_info->table_name
294
18.4E
               << " Setting status for write as YQL_STATUS_SCHEMA_VERSION_MISMATCH tserver's: "
295
18.4E
               << table_info->schema_version << " vs req's : " << req.schema_version()
296
18.4E
               << " is req compatible with prev version: "
297
18.4E
               << req.is_compatible_with_previous_version() << " for " << AsString(req);
298
1.54k
      resp->set_status(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH);
299
1.54k
      resp->set_error_message(Format(
300
1.54k
          "schema version mismatch for table $0: expected $1, got $2 (compt with prev: $3)",
301
1.54k
          table_info->table_id,
302
1.54k
          table_info->schema_version, req.schema_version(),
303
1.54k
          req.is_compatible_with_previous_version()));
304
3.38M
    } else {
305
6.20k
      DVLOG(3) << "Version matches : " << table_info->schema_version << " for "
306
6.20k
               << AsString(req);
307
3.38M
      auto write_op = std::make_unique<docdb::QLWriteOperation>(
308
3.38M
          req, std::shared_ptr<Schema>(table_info, table_info->schema.get()),
309
3.38M
          *table_info->index_map, tablet().unique_index_key_schema(),
310
3.38M
          txn_op_ctx);
311
3.38M
      RETURN_NOT_OK(write_op->Init(resp));
312
3.38M
      doc_ops_.emplace_back(std::move(write_op));
313
3.38M
    }
314
3.38M
  }
315
316
  // All operations has wrong schema version
317
1.27M
  if (doc_ops_.empty()) {
318
1.28k
    return false;
319
1.28k
  }
320
321
1.27M
  return true;
322
1.27M
}
323
324
260k
Result<bool> WriteQuery::PgsqlPrepareExecute() {
325
260k
  RETURN_NOT_OK(InitExecute(ExecuteMode::kPgsql));
326
327
260k
  const auto& pgsql_write_batch = client_request_->pgsql_write_batch();
328
329
260k
  doc_ops_.reserve(pgsql_write_batch.size());
330
331
260k
  TransactionOperationContext txn_op_ctx;
332
333
260k
  auto& metadata = *tablet().metadata();
334
260k
  bool colocated = metadata.colocated();
335
336
3.11M
  for (const auto& req : pgsql_write_batch) {
337
3.11M
    PgsqlResponsePB* resp = response_->add_pgsql_response_batch();
338
    // Table-level tombstones should not be requested for non-colocated tables.
339
3.11M
    if ((req.stmt_type() == PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED) && !colocated) {
340
0
      LOG(WARNING) << "cannot create table-level tombstone for a non-colocated table";
341
0
      resp->set_skipped(true);
342
0
      continue;
343
0
    }
344
3.11M
    const std::shared_ptr<tablet::TableInfo> table_info =
345
3.11M
        VERIFY_RESULT(metadata.GetTableInfo(req.table_id()));
346
3.11M
    if (table_info->schema_version != req.schema_version()) {
347
5
      resp->set_status(PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH);
348
5
      resp->set_error_message(
349
5
          Format("schema version mismatch for table $0: expected $1, got $2",
350
5
                 table_info->table_id,
351
5
                 table_info->schema_version,
352
5
                 req.schema_version()));
353
3.11M
    } else {
354
3.11M
      if (doc_ops_.empty()) {
355
        // Use the value of is_ysql_catalog_table from the first operation in the batch.
356
258k
        txn_op_ctx = VERIFY_RESULT(tablet().CreateTransactionOperationContext(
357
258k
            request().write_batch().transaction(),
358
258k
            table_info->schema->table_properties().is_ysql_catalog_table(),
359
258k
            &request().write_batch().subtransaction()));
360
258k
      }
361
3.11M
      auto write_op = std::make_unique<docdb::PgsqlWriteOperation>(
362
3.11M
          req, *table_info->schema, txn_op_ctx);
363
3.11M
      RETURN_NOT_OK(write_op->Init(resp));
364
3.11M
      doc_ops_.emplace_back(std::move(write_op));
365
3.11M
    }
366
3.11M
  }
367
368
  // All operations have wrong schema version.
369
260k
  if (doc_ops_.empty()) {
370
5
    return false;
371
5
  }
372
373
260k
  return true;
374
260k
}
375
376
1.74M
void WriteQuery::Execute(std::unique_ptr<WriteQuery> query) {
377
1.74M
  auto prepare_result = query->PrepareExecute();
378
1.74M
  if (!prepare_result.ok()) {
379
7
    StartSynchronization(std::move(query), prepare_result.status());
380
7
    return;
381
7
  }
382
383
1.74M
  if (!prepare_result.get()) {
384
1.28k
    StartSynchronization(std::move(query), Status::OK());
385
1.28k
    return;
386
1.28k
  }
387
388
1.74M
  auto* query_ptr = query.get();
389
1.74M
  query_ptr->self_ = std::move(query);
390
1.74M
  auto status = query_ptr->DoExecute();
391
1.74M
  if (!status.ok()) {
392
46
    query_ptr->ExecuteDone(status);
393
46
  }
394
1.74M
}
395
396
1.73M
CHECKED_STATUS WriteQuery::DoExecute() {
397
1.73M
  auto& write_batch = *request().mutable_write_batch();
398
1.73M
  isolation_level_ = VERIFY_RESULT(tablet().GetIsolationLevelFromPB(write_batch));
399
1.73M
  const RowMarkType row_mark_type = GetRowMarkTypeFromPB(write_batch);
400
1.73M
  const auto& metadata = *tablet().metadata();
401
402
1.73M
  const bool transactional_table = metadata.schema()->table_properties().is_transactional() ||
403
1.18M
                                   force_txn_path_;
404
405
1.73M
  if (!transactional_table && isolation_level_ != IsolationLevel::NON_TRANSACTIONAL) {
406
0
    YB_LOG_EVERY_N_SECS(DFATAL, 30)
407
0
        << "An attempt to perform a transactional operation on a non-transactional table: "
408
0
        << operation_->ToString();
409
0
  }
410
411
1.73M
  docdb::PartialRangeKeyIntents partial_range_key_intents(metadata.UsePartialRangeKeyIntents());
412
1.73M
  prepare_result_ = VERIFY_RESULT(docdb::PrepareDocWriteOperation(
413
1.73M
      doc_ops_, write_batch.read_pairs(), tablet().metrics()->write_lock_latency,
414
1.73M
      isolation_level_, kind(), row_mark_type, transactional_table,
415
1.73M
      deadline(), partial_range_key_intents, tablet().shared_lock_manager()));
416
417
1.73M
  auto* transaction_participant = tablet().transaction_participant();
418
1.73M
  if (transaction_participant) {
419
768k
    request_scope_ = RequestScope(transaction_participant);
420
768k
  }
421
422
1.73M
  if (!tablet().txns_enabled() || !transactional_table) {
423
1.13M
    CompleteExecute();
424
1.13M
    return Status::OK();
425
1.13M
  }
426
427
602k
  if (isolation_level_ == IsolationLevel::NON_TRANSACTIONAL) {
428
58.2k
    auto now = tablet().clock()->Now();
429
58.2k
    docdb::ResolveOperationConflicts(
430
58.2k
        doc_ops_, now, tablet().doc_db(), partial_range_key_intents,
431
58.2k
        transaction_participant, tablet().metrics()->transaction_conflicts.get(),
432
58.3k
        [this, now](const Result<HybridTime>& result) {
433
58.3k
          if (!result.ok()) {
434
3
            ExecuteDone(result.status());
435
3
            TRACE("InvokeCallback");
436
3
            return;
437
3
          }
438
58.3k
          NonTransactionalConflictsResolved(now, *result);
439
58.3k
          TRACE("NonTransactionalConflictsResolved");
440
58.3k
        });
441
58.2k
    return Status::OK();
442
58.2k
  }
443
444
544k
  if (isolation_level_ == IsolationLevel::SERIALIZABLE_ISOLATION &&
445
192k
      prepare_result_.need_read_snapshot) {
446
30.0k
    boost::container::small_vector<RefCntPrefix, 16> paths;
447
30.1k
    for (const auto& doc_op : doc_ops_) {
448
30.1k
      paths.clear();
449
30.1k
      IsolationLevel ignored_isolation_level;
450
30.1k
      RETURN_NOT_OK(doc_op->GetDocPaths(
451
30.1k
          docdb::GetDocPathsMode::kLock, &paths, &ignored_isolation_level));
452
30.1k
      for (const auto& path : paths) {
453
30.1k
        auto key = path.as_slice();
454
30.1k
        auto* pair = write_batch.mutable_read_pairs()->Add();
455
30.1k
        pair->set_key(key.data(), key.size());
456
        // Empty values are disallowed by docdb.
457
        // https://github.com/YugaByte/yugabyte-db/issues/736
458
30.1k
        pair->set_value(std::string(1, docdb::ValueTypeAsChar::kNullLow));
459
30.1k
        write_batch.set_wait_policy(WAIT_ERROR);
460
30.1k
      }
461
30.1k
    }
462
30.0k
  }
463
464
544k
  docdb::ResolveTransactionConflicts(
465
544k
      doc_ops_, write_batch, tablet().clock()->Now(),
466
299k
      read_time_ ? read_time_.read : HybridTime::kMax,
467
544k
      tablet().doc_db(), partial_range_key_intents,
468
544k
      transaction_participant, tablet().metrics()->transaction_conflicts.get(),
469
547k
      [this](const Result<HybridTime>& result) {
470
547k
        if (!result.ok()) {
471
100k
          ExecuteDone(result.status());
472
100k
          TRACE("ExecuteDone");
473
100k
          return;
474
100k
        }
475
446k
        TransactionalConflictsResolved();
476
446k
        TRACE("TransactionalConflictsResolved");
477
446k
      });
478
479
544k
  return Status::OK();
480
544k
}
481
482
58.2k
void WriteQuery::NonTransactionalConflictsResolved(HybridTime now, HybridTime result) {
483
58.2k
  if (now != result) {
484
3
    tablet().clock()->Update(result);
485
3
  }
486
487
58.2k
  CompleteExecute();
488
58.2k
}
489
490
446k
void WriteQuery::TransactionalConflictsResolved() {
491
446k
  auto status = DoTransactionalConflictsResolved();
492
446k
  if (!status.ok()) {
493
0
    LOG(DFATAL) << status;
494
0
    ExecuteDone(status);
495
0
  }
496
446k
}
497
498
446k
CHECKED_STATUS WriteQuery::DoTransactionalConflictsResolved() {
499
446k
  if (!read_time_) {
500
220k
    auto safe_time = VERIFY_RESULT(tablet().SafeTime(RequireLease::kTrue));
501
220k
    read_time_ = ReadHybridTime::FromHybridTimeRange(
502
220k
        {safe_time, tablet().clock()->NowRange().second});
503
225k
  } else if (prepare_result_.need_read_snapshot &&
504
139k
             isolation_level_ == IsolationLevel::SERIALIZABLE_ISOLATION) {
505
0
    return STATUS_FORMAT(
506
0
        InvalidArgument,
507
0
        "Read time should NOT be specified for serializable isolation level: $0",
508
0
        read_time_);
509
0
  }
510
511
446k
  CompleteExecute();
512
446k
  return Status::OK();
513
446k
}
514
515
1.64M
void WriteQuery::CompleteExecute() {
516
1.64M
  ExecuteDone(DoCompleteExecute());
517
1.64M
}
518
519
1.64M
CHECKED_STATUS WriteQuery::DoCompleteExecute() {
520
1.64M
  auto read_op = prepare_result_.need_read_snapshot
521
1.64M
      ? VERIFY_RESULT(ScopedReadOperation::Create(&tablet(), RequireLease::kTrue, read_time_))
522
1.41M
      : ScopedReadOperation();
523
  // Actual read hybrid time used for read-modify-write operation.
524
1.64M
  auto real_read_time = prepare_result_.need_read_snapshot
525
227k
      ? read_op.read_time()
526
      // When need_read_snapshot is false, this time is used only to write TTL field of record.
527
1.41M
      : ReadHybridTime::SingleTime(tablet().clock()->Now());
528
529
  // We expect all read operations for this transaction to be done in AssembleDocWriteBatch. Once
530
  // read_txn goes out of scope, the read point is deregistered.
531
1.64M
  bool local_limit_updated = false;
532
533
  // This loop may be executed multiple times multiple times only for serializable isolation or
534
  // when read_time was not yet picked for snapshot isolation.
535
  // In all other cases it is executed only once.
536
1.64M
  auto init_marker_behavior = tablet().table_type() == TableType::REDIS_TABLE_TYPE
537
61.5k
      ? docdb::InitMarkerBehavior::kRequired
538
1.58M
      : docdb::InitMarkerBehavior::kOptional;
539
1.64M
  for (;;) {
540
1.64M
    RETURN_NOT_OK(docdb::AssembleDocWriteBatch(
541
1.64M
        doc_ops_, deadline(), real_read_time, tablet().doc_db(),
542
1.64M
        request().mutable_write_batch(), init_marker_behavior,
543
1.64M
        tablet().monotonic_counter(), &restart_read_ht_,
544
1.64M
        tablet().metadata()->table_name()));
545
546
    // For serializable isolation we don't fix read time, so could do read restart locally,
547
    // instead of failing whole transaction.
548
1.64M
    if (!restart_read_ht_.is_valid() || !allow_immediate_read_restart_) {
549
1.63M
      break;
550
1.63M
    }
551
552
2.61k
    real_read_time.read = restart_read_ht_;
553
2.61k
    if (!local_limit_updated) {
554
0
      local_limit_updated = true;
555
0
      real_read_time.local_limit = std::min(
556
0
          real_read_time.local_limit, VERIFY_RESULT(tablet().SafeTime(RequireLease::kTrue)));
557
0
    }
558
559
2.61k
    restart_read_ht_ = HybridTime();
560
561
2.61k
    request().mutable_write_batch()->clear_write_pairs();
562
563
0
    for (auto& doc_op : doc_ops_) {
564
0
      doc_op->ClearResponse();
565
0
    }
566
2.61k
  }
567
568
1.64M
  if (allow_immediate_read_restart_ &&
569
1.24M
      isolation_level_ != IsolationLevel::NON_TRANSACTIONAL &&
570
110k
      response_) {
571
110k
    real_read_time.ToPB(response_->mutable_used_read_time());
572
110k
  }
573
574
1.64M
  if (restart_read_ht_.is_valid()) {
575
0
    return Status::OK();
576
0
  }
577
578
1.64M
  docdb_locks_ = std::move(prepare_result_.lock_batch);
579
580
1.64M
  return Status::OK();
581
1.64M
}
582
583
30.8M
Tablet& WriteQuery::tablet() const {
584
30.8M
  return *operation_->tablet();
585
30.8M
}
586
587
1.48M
void WriteQuery::AdjustYsqlQueryTransactionality(size_t ysql_batch_size) {
588
1.48M
  force_txn_path_ = ysql_batch_size > 0 && tablet().is_sys_catalog();
589
1.48M
}
590
591
61.5k
void WriteQuery::RedisExecuteDone(const Status& status) {
592
61.5k
  if (!status.ok() || restart_read_ht().is_valid()) {
593
0
    StartSynchronization(std::move(self_), status);
594
0
    return;
595
0
  }
596
61.5k
  for (auto& doc_op : doc_ops_) {
597
61.5k
    auto* redis_write_operation = down_cast<docdb::RedisWriteOperation*>(doc_op.get());
598
61.5k
    response_->add_redis_response_batch()->Swap(&redis_write_operation->response());
599
61.5k
  }
600
601
61.5k
  StartSynchronization(std::move(self_), Status::OK());
602
61.5k
}
603
604
1.27M
void WriteQuery::CqlExecuteDone(const Status& status) {
605
1.27M
  if (restart_read_ht().is_valid()) {
606
0
    StartSynchronization(std::move(self_), Status::OK());
607
0
    return;
608
0
  }
609
610
1.27M
  if (status.ok()) {
611
1.22M
    UpdateQLIndexes();
612
50.8k
  } else {
613
50.8k
    CompleteQLWriteBatch(status);
614
50.8k
  }
615
1.27M
}
616
617
1.27M
void WriteQuery::CompleteQLWriteBatch(const Status& status) {
618
1.27M
  if (!status.ok()) {
619
49.5k
    StartSynchronization(std::move(self_), status);
620
49.5k
    return;
621
49.5k
  }
622
623
1.22M
  bool is_unique_index = tablet().metadata()->is_unique_index();
624
625
3.34M
  for (auto& doc_op : doc_ops_) {
626
3.34M
    std::unique_ptr<docdb::QLWriteOperation> ql_write_op(
627
3.34M
        down_cast<docdb::QLWriteOperation*>(doc_op.release()));
628
3.34M
    if (is_unique_index &&
629
2.95k
        ql_write_op->request().type() == QLWriteRequestPB::QL_STMT_INSERT &&
630
2.33k
        ql_write_op->response()->has_applied() && !ql_write_op->response()->applied()) {
631
      // If this is an insert into a unique index and it fails to apply, report duplicate value err.
632
451
      ql_write_op->response()->set_status(QLResponsePB::YQL_STATUS_USAGE_ERROR);
633
451
      ql_write_op->response()->set_error_message(
634
451
          Format("Duplicate value disallowed by unique index $0",
635
451
          tablet().metadata()->table_name()));
636
0
      DVLOG(1) << "Could not apply the given operation " << AsString(ql_write_op->request())
637
0
               << " due to " << AsString(ql_write_op->response());
638
3.34M
    } else if (ql_write_op->rowblock() != nullptr) {
639
      // If the QL write op returns a rowblock, move the op to the transaction state to return the
640
      // rows data as a sidecar after the transaction completes.
641
272
      ql_write_ops_.emplace_back(std::move(ql_write_op));
642
272
    }
643
3.34M
  }
644
645
1.22M
  StartSynchronization(std::move(self_), Status::OK());
646
1.22M
}
647
648
1.22M
void WriteQuery::UpdateQLIndexes() {
649
1.22M
  client::YBClient* client = nullptr;
650
1.22M
  client::YBSessionPtr session;
651
1.22M
  client::YBTransactionPtr txn;
652
1.22M
  IndexOps index_ops;
653
1.22M
  const ChildTransactionDataPB* child_transaction_data = nullptr;
654
3.33M
  for (auto& doc_op : doc_ops_) {
655
3.33M
    auto* write_op = down_cast<docdb::QLWriteOperation*>(doc_op.get());
656
3.33M
    if (write_op->index_requests()->empty()) {
657
3.32M
      continue;
658
3.32M
    }
659
16.2k
    if (!client) {
660
15.8k
      client = &tablet().client();
661
15.8k
      session = std::make_shared<client::YBSession>(client);
662
15.8k
      session->SetDeadline(deadline());
663
15.8k
      if (write_op->request().has_child_transaction_data()) {
664
14.6k
        child_transaction_data = &write_op->request().child_transaction_data();
665
14.6k
        if (!tablet().transaction_manager()) {
666
0
          StartSynchronization(
667
0
              std::move(self_),
668
0
              STATUS(Corruption, "Transaction manager is not present for index update"));
669
0
          return;
670
0
        }
671
14.6k
        auto child_data = client::ChildTransactionData::FromPB(
672
14.6k
            write_op->request().child_transaction_data());
673
14.6k
        if (!child_data.ok()) {
674
0
          StartSynchronization(std::move(self_), child_data.status());
675
0
          return;
676
0
        }
677
14.6k
        txn = std::make_shared<client::YBTransaction>(tablet().transaction_manager(), *child_data);
678
14.6k
        session->SetTransaction(txn);
679
1.23k
      } else {
680
1.23k
        child_transaction_data = nullptr;
681
1.23k
      }
682
436
    } else if (write_op->request().has_child_transaction_data()) {
683
4
      DCHECK_ONLY_NOTNULL(child_transaction_data);
684
4
      DCHECK_EQ(child_transaction_data->ShortDebugString(),
685
4
                write_op->request().child_transaction_data().ShortDebugString());
686
432
    } else {
687
18.4E
      DCHECK(child_transaction_data == nullptr) <<
688
18.4E
          "Value: " << child_transaction_data->ShortDebugString();
689
432
    }
690
691
    // Apply the write ops to update the index
692
38.4k
    for (auto& pair : *write_op->index_requests()) {
693
38.4k
      client::YBTablePtr index_table;
694
38.4k
      bool cache_used_ignored = false;
695
38.4k
      auto metadata_cache = tablet().YBMetaDataCache();
696
38.4k
      if (!metadata_cache) {
697
0
        StartSynchronization(
698
0
            std::move(self_),
699
0
            STATUS(Corruption, "Table metadata cache is not present for index update"));
700
0
        return;
701
0
      }
702
      // TODO create async version of GetTable.
703
      // It is ok to have sync call here, because we use cache and it should not take too long.
704
38.4k
      auto status = metadata_cache->GetTable(pair.first->table_id(), &index_table,
705
38.4k
                                             &cache_used_ignored);
706
38.4k
      if (!status.ok()) {
707
0
        StartSynchronization(std::move(self_), status);
708
0
        return;
709
0
      }
710
38.4k
      std::shared_ptr<client::YBqlWriteOp> index_op(index_table->NewQLWrite());
711
38.4k
      index_op->mutable_request()->Swap(&pair.second);
712
38.4k
      index_op->mutable_request()->MergeFrom(pair.second);
713
38.4k
      session->Apply(index_op);
714
38.4k
      index_ops.emplace_back(std::move(index_op), write_op);
715
38.4k
    }
716
16.2k
  }
717
718
1.22M
  if (!session) {
719
1.20M
    CompleteQLWriteBatch(Status::OK());
720
1.20M
    return;
721
1.20M
  }
722
723
18.3k
  session->FlushAsync(std::bind(
724
18.3k
      &WriteQuery::UpdateQLIndexesFlushed, this, session, txn, std::move(index_ops), _1));
725
18.3k
}
726
727
void WriteQuery::UpdateQLIndexesFlushed(
728
    const client::YBSessionPtr& session, const client::YBTransactionPtr& txn,
729
15.8k
    const IndexOps& index_ops, client::FlushStatus* flush_status) {
730
15.8k
  std::unique_ptr<WriteQuery> query(std::move(self_));
731
732
15.8k
  const auto& status = flush_status->status;
733
15.8k
  if (PREDICT_FALSE(!status.ok())) {
734
    // When any error occurs during the dispatching of YBOperation, YBSession saves the error and
735
    // returns IOError. When it happens, retrieves the errors and discard the IOError.
736
112
    if (status.IsIOError()) {
737
112
      for (const auto& error : flush_status->errors) {
738
        // return just the first error seen.
739
112
        Cancel(error->status());
740
112
        return;
741
112
      }
742
112
    }
743
0
    Cancel(status);
744
0
    return;
745
15.7k
  }
746
747
15.7k
  ChildTransactionResultPB child_result;
748
15.7k
  if (txn) {
749
14.5k
    auto finish_result = txn->FinishChild();
750
14.5k
    if (!finish_result.ok()) {
751
0
      query->Cancel(finish_result.status());
752
0
      return;
753
0
    }
754
14.5k
    child_result = std::move(*finish_result);
755
14.5k
  }
756
757
  // Check the responses of the index write ops.
758
38.3k
  for (const auto& pair : index_ops) {
759
38.3k
    std::shared_ptr<client::YBqlWriteOp> index_op = pair.first;
760
38.3k
    auto* response = pair.second->response();
761
38.3k
    DCHECK_ONLY_NOTNULL(response);
762
38.3k
    auto* index_response = index_op->mutable_response();
763
764
38.3k
    if (index_response->status() != QLResponsePB::YQL_STATUS_OK) {
765
0
      DVLOG(1) << "Got status " << index_response->status() << " for " << AsString(index_op);
766
441
      response->set_status(index_response->status());
767
441
      response->set_error_message(std::move(*index_response->mutable_error_message()));
768
441
    }
769
38.3k
    if (txn) {
770
35.4k
      *response->mutable_child_transaction_result() = child_result;
771
35.4k
    }
772
38.3k
  }
773
774
15.7k
  self_ = std::move(query);
775
15.7k
  CompleteQLWriteBatch(Status::OK());
776
15.7k
}
777
778
261k
void WriteQuery::PgsqlExecuteDone(const Status& status) {
779
261k
  if (!status.ok() || restart_read_ht_.is_valid()) {
780
23.0k
    StartSynchronization(std::move(self_), status);
781
23.0k
    return;
782
23.0k
  }
783
784
3.09M
  for (auto& doc_op : doc_ops_) {
785
    // We'll need to return the number of rows inserted, updated, or deleted by each operation.
786
3.09M
    std::unique_ptr<docdb::PgsqlWriteOperation> pgsql_write_op(
787
3.09M
        down_cast<docdb::PgsqlWriteOperation*>(doc_op.release()));
788
3.09M
    pgsql_write_ops_.emplace_back(std::move(pgsql_write_op));
789
3.09M
  }
790
791
238k
  StartSynchronization(std::move(self_), Status::OK());
792
238k
}
793
794
140k
void WriteQuery::SimpleExecuteDone(const Status& status) {
795
140k
  StartSynchronization(std::move(self_), status);
796
140k
}
797
798
}  // namespace tablet
799
}  // namespace yb