YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/ysql_transaction_ddl.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/ysql_transaction_ddl.h"
15
16
#include "yb/client/transaction_rpc.h"
17
18
#include "yb/common/ql_expr.h"
19
#include "yb/common/wire_protocol.h"
20
21
#include "yb/docdb/doc_rowwise_iterator.h"
22
23
#include "yb/gutil/casts.h"
24
25
#include "yb/master/sys_catalog.h"
26
27
#include "yb/tablet/tablet.h"
28
#include "yb/tablet/tablet_metadata.h"
29
#include "yb/tablet/tablet_peer.h"
30
31
#include "yb/tserver/tserver_service.pb.h"
32
33
#include "yb/util/logging.h"
34
#include "yb/util/monotime.h"
35
#include "yb/util/net/net_fwd.h"
36
#include "yb/util/status_log.h"
37
38
DEFINE_int32(ysql_transaction_bg_task_wait_ms, 200,
39
  "Amount of time the catalog manager background task thread waits "
40
  "between runs");
41
42
namespace yb {
43
namespace master {
44
45
85
YsqlTransactionDdl::~YsqlTransactionDdl() {
46
  // Shutdown any outstanding RPCs.
47
85
  rpcs_.Shutdown();
48
85
}
49
50
void YsqlTransactionDdl::VerifyTransaction(
51
    const TransactionMetadata& transaction_metadata,
52
5.64k
    std::function<Status(bool)> complete_callback) {
53
5.64k
  SleepFor(MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms));
54
55
5.64k
  YB_LOG_EVERY_N_SECS(INFO, 1) << "Verifying Transaction " << transaction_metadata;
56
57
5.64k
  tserver::GetTransactionStatusRequestPB req;
58
5.64k
  req.set_tablet_id(transaction_metadata.status_tablet);
59
5.64k
  req.add_transaction_id()->assign(
60
5.64k
      pointer_cast<const char*>(transaction_metadata.transaction_id.data()),
61
5.64k
      transaction_metadata.transaction_id.size());
62
63
5.64k
  auto rpc_handle = rpcs_.Prepare();
64
5.64k
  if (rpc_handle == rpcs_.InvalidHandle()) {
65
0
    LOG(WARNING) << "Shutting down. Cannot send GetTransactionStatus: " << transaction_metadata;
66
0
    return;
67
0
  }
68
5.64k
  auto client = client_future_.get();
69
5.64k
  if (!client) {
70
0
    LOG(WARNING) << "Shutting down. Cannot get GetTransactionStatus: " << transaction_metadata;
71
0
    return;
72
0
  }
73
  // We need to query the TransactionCoordinator here.  Can't use TransactionStatusResolver in
74
  // TransactionParticipant since this TransactionMetadata may not have any actual data flushed yet.
75
5.64k
  *rpc_handle = client::GetTransactionStatus(
76
5.64k
      TransactionRpcDeadline(),
77
5.64k
      nullptr /* tablet */,
78
5.64k
      client,
79
5.64k
      &req,
80
5.64k
      [this, rpc_handle, transaction_metadata, complete_callback]
81
5.63k
          (Status status, const tserver::GetTransactionStatusResponsePB& resp) {
82
5.63k
        auto retained = rpcs_.Unregister(rpc_handle);
83
5.63k
        TransactionReceived(transaction_metadata, complete_callback, std::move(status), resp);
84
5.63k
      });
85
5.64k
  (**rpc_handle).SendRpc();
86
5.64k
}
87
88
void YsqlTransactionDdl::TransactionReceived(
89
    const TransactionMetadata& transaction,
90
    std::function<Status(bool)> complete_callback,
91
5.64k
    Status txn_status, const tserver::GetTransactionStatusResponsePB& resp) {
92
5.64k
  if (!txn_status.ok()) {
93
0
    LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString()
94
0
                 << ") failed with status " << txn_status;
95
0
    WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () {
96
0
      WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure");
97
0
    }), "Failed to enqueue callback");
98
    // #5981: Improve failure handling to retry transient errors or recognize transaction complete.
99
5.64k
  } else if (resp.has_error()) {
100
0
    const Status s = StatusFromPB(resp.error().status());
101
0
    const tserver::TabletServerErrorPB::Code code = resp.error().code();
102
0
    LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString()
103
0
                 << ") failed with error code " << tserver::TabletServerErrorPB::Code_Name(code)
104
0
                 << ": " << s;
105
0
    WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () {
106
0
      WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure");
107
0
    }), "Failed to enqueue callback");
108
    // #5981: Maybe have the same heuristic as above?
109
5.64k
  } else {
110
5.64k
    YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction.ToString()
111
1.49k
                                 << ", resp: " << resp.ShortDebugString();
112
5.64k
    bool is_pending = (resp.status_size() == 0);
113
11.2k
    for (int i = 0; i < resp.status_size() && !is_pending; ++i) {
114
      // NOTE: COMMITTED state is also "pending" because we need APPLIED.
115
5.64k
      is_pending = resp.status(i) == TransactionStatus::PENDING ||
116
1.49k
                   resp.status(i) == TransactionStatus::COMMITTED;
117
5.64k
    }
118
5.64k
    if (is_pending) {
119
      // Re-enqueue if transaction is still pending.
120
4.20k
      WARN_NOT_OK(thread_pool_->SubmitFunc(
121
4.20k
          std::bind(&YsqlTransactionDdl::VerifyTransaction, this, transaction, complete_callback)),
122
4.20k
          "Could not submit VerifyTransaction to thread pool");
123
1.43k
    } else {
124
      // If this transaction isn't pending, then the transaction is in a terminal state.
125
      // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS.
126
1.43k
      WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () {
127
1.43k
        WARN_NOT_OK(complete_callback(true /* txn_rpc_success */), "Callback failure");
128
1.43k
      }), "Failed to enqueue callback");
129
1.43k
    }
130
5.64k
  }
131
5.64k
}
132
133
1.43k
Result<bool> YsqlTransactionDdl::PgEntryExists(TableId pg_table_id, Result<uint32_t> entry_oid) {
134
1.43k
  auto tablet_peer = sys_catalog_->tablet_peer();
135
1.43k
  if (!tablet_peer || !tablet_peer->tablet()) {
136
0
    return STATUS(ServiceUnavailable, "SysCatalog unavailable");
137
0
  }
138
1.43k
  const tablet::Tablet* catalog_tablet = tablet_peer->tablet();
139
1.43k
  const Schema& pg_database_schema =
140
1.43k
      *VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_table_id))->schema;
141
142
  // Use Scan to query the 'pg_database' table, filtering by our 'oid'.
143
1.43k
  Schema projection;
144
1.43k
  RETURN_NOT_OK(pg_database_schema.CreateProjectionByNames({"oid"}, &projection,
145
1.43k
                pg_database_schema.num_key_columns()));
146
1.43k
  const auto oid_col_id = VERIFY_RESULT(projection.ColumnIdByName("oid")).rep();
147
1.43k
  auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator(
148
1.43k
      projection.CopyWithoutColumnIds(), {} /* read_hybrid_time */, pg_table_id));
149
1.43k
  auto e_oid_val = VERIFY_RESULT(std::move(entry_oid));
150
1.43k
  {
151
1.43k
    auto doc_iter = down_cast<docdb::DocRowwiseIterator*>(iter.get());
152
1.43k
    PgsqlConditionPB cond;
153
1.43k
    cond.add_operands()->set_column_id(oid_col_id);
154
1.43k
    cond.set_op(QL_OP_EQUAL);
155
1.43k
    cond.add_operands()->mutable_value()->set_uint32_value(e_oid_val);
156
1.43k
    const std::vector<docdb::PrimitiveValue> empty_key_components;
157
1.43k
    docdb::DocPgsqlScanSpec spec(
158
1.43k
        projection, rocksdb::kDefaultQueryId, empty_key_components, empty_key_components,
159
1.43k
        &cond, boost::none /* hash_code */, boost::none /* max_hash_code */, nullptr /* where */);
160
1.43k
    RETURN_NOT_OK(doc_iter->Init(spec));
161
1.43k
  }
162
163
  // Expect exactly one row, which means the transaction was a success.
164
1.43k
  QLTableRow row;
165
1.43k
  if (VERIFY_RESULT(iter->HasNext())) {
166
1.39k
    RETURN_NOT_OK(iter->NextRow(&row));
167
1.39k
    return true;
168
39
  }
169
39
  return false;
170
39
}
171
172
}  // namespace master
173
}  // namespace yb