YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/ysql_transaction_ddl.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/ysql_transaction_ddl.h"
15
16
#include "yb/client/transaction_rpc.h"
17
18
#include "yb/common/ql_expr.h"
19
#include "yb/common/wire_protocol.h"
20
21
#include "yb/docdb/doc_rowwise_iterator.h"
22
23
#include "yb/gutil/casts.h"
24
25
#include "yb/master/sys_catalog.h"
26
27
#include "yb/tablet/tablet.h"
28
#include "yb/tablet/tablet_metadata.h"
29
#include "yb/tablet/tablet_peer.h"
30
31
#include "yb/tserver/tserver_service.pb.h"
32
33
#include "yb/util/logging.h"
34
#include "yb/util/monotime.h"
35
#include "yb/util/net/net_fwd.h"
36
#include "yb/util/status_log.h"
37
38
DEFINE_int32(ysql_transaction_bg_task_wait_ms, 200,
39
  "Amount of time the catalog manager background task thread waits "
40
  "between runs");
41
42
namespace yb {
43
namespace master {
44
45
86
YsqlTransactionDdl::~YsqlTransactionDdl() {
46
  // Shutdown any outstanding RPCs.
47
86
  rpcs_.Shutdown();
48
86
}
49
50
void YsqlTransactionDdl::VerifyTransaction(
51
    const TransactionMetadata& transaction_metadata,
52
20.5k
    std::function<Status(bool)> complete_callback) {
53
20.5k
  SleepFor(MonoDelta::FromMilliseconds(FLAGS_ysql_transaction_bg_task_wait_ms));
54
55
20.5k
  YB_LOG_EVERY_N_SECS
(INFO, 1) << "Verifying Transaction " << transaction_metadata5.10k
;
56
57
20.5k
  tserver::GetTransactionStatusRequestPB req;
58
20.5k
  req.set_tablet_id(transaction_metadata.status_tablet);
59
20.5k
  req.add_transaction_id()->assign(
60
20.5k
      pointer_cast<const char*>(transaction_metadata.transaction_id.data()),
61
20.5k
      transaction_metadata.transaction_id.size());
62
63
20.5k
  auto rpc_handle = rpcs_.Prepare();
64
20.5k
  if (rpc_handle == rpcs_.InvalidHandle()) {
65
0
    LOG(WARNING) << "Shutting down. Cannot send GetTransactionStatus: " << transaction_metadata;
66
0
    return;
67
0
  }
68
20.5k
  auto client = client_future_.get();
69
20.5k
  if (!client) {
70
0
    LOG(WARNING) << "Shutting down. Cannot get GetTransactionStatus: " << transaction_metadata;
71
0
    return;
72
0
  }
73
  // We need to query the TransactionCoordinator here.  Can't use TransactionStatusResolver in
74
  // TransactionParticipant since this TransactionMetadata may not have any actual data flushed yet.
75
20.5k
  *rpc_handle = client::GetTransactionStatus(
76
20.5k
      TransactionRpcDeadline(),
77
20.5k
      nullptr /* tablet */,
78
20.5k
      client,
79
20.5k
      &req,
80
20.5k
      [this, rpc_handle, transaction_metadata, complete_callback]
81
20.5k
          (Status status, const tserver::GetTransactionStatusResponsePB& resp) {
82
20.4k
        auto retained = rpcs_.Unregister(rpc_handle);
83
20.4k
        TransactionReceived(transaction_metadata, complete_callback, std::move(status), resp);
84
20.4k
      });
85
20.5k
  (**rpc_handle).SendRpc();
86
20.5k
}
87
88
void YsqlTransactionDdl::TransactionReceived(
89
    const TransactionMetadata& transaction,
90
    std::function<Status(bool)> complete_callback,
91
20.5k
    Status txn_status, const tserver::GetTransactionStatusResponsePB& resp) {
92
20.5k
  if (!txn_status.ok()) {
93
1
    LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString()
94
1
                 << ") failed with status " << txn_status;
95
1
    WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () {
96
1
      WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure");
97
1
    }), "Failed to enqueue callback");
98
    // #5981: Improve failure handling to retry transient errors or recognize transaction complete.
99
20.5k
  } else if (resp.has_error()) {
100
0
    const Status s = StatusFromPB(resp.error().status());
101
0
    const tserver::TabletServerErrorPB::Code code = resp.error().code();
102
0
    LOG(WARNING) << "Transaction Status attempt (" << transaction.ToString()
103
0
                 << ") failed with error code " << tserver::TabletServerErrorPB::Code_Name(code)
104
0
                 << ": " << s;
105
0
    WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () {
106
0
      WARN_NOT_OK(complete_callback(false /* txn_rpc_success */), "Callback failure");
107
0
    }), "Failed to enqueue callback");
108
    // #5981: Maybe have the same heuristic as above?
109
20.5k
  } else {
110
20.5k
    YB_LOG_EVERY_N_SECS(INFO, 1) << "Got Response for " << transaction.ToString()
111
5.09k
                                 << ", resp: " << resp.ShortDebugString();
112
20.5k
    bool is_pending = (resp.status_size() == 0);
113
40.9k
    for (int i = 0; i < resp.status_size() && 
!is_pending20.4k
;
++i20.4k
) {
114
      // NOTE: COMMITTED state is also "pending" because we need APPLIED.
115
20.4k
      is_pending = resp.status(i) == TransactionStatus::PENDING ||
116
20.4k
                   
resp.status(i) == TransactionStatus::COMMITTED4.99k
;
117
20.4k
    }
118
20.5k
    if (is_pending) {
119
      // Re-enqueue if transaction is still pending.
120
15.6k
      WARN_NOT_OK(thread_pool_->SubmitFunc(
121
15.6k
          std::bind(&YsqlTransactionDdl::VerifyTransaction, this, transaction, complete_callback)),
122
15.6k
          "Could not submit VerifyTransaction to thread pool");
123
15.6k
    } else {
124
      // If this transaction isn't pending, then the transaction is in a terminal state.
125
      // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS.
126
4.85k
      WARN_NOT_OK(thread_pool_->SubmitFunc([complete_callback] () {
127
4.85k
        WARN_NOT_OK(complete_callback(true /* txn_rpc_success */), "Callback failure");
128
4.85k
      }), "Failed to enqueue callback");
129
4.85k
    }
130
20.5k
  }
131
20.5k
}
132
133
4.85k
Result<bool> YsqlTransactionDdl::PgEntryExists(TableId pg_table_id, Result<uint32_t> entry_oid) {
134
4.85k
  auto tablet_peer = sys_catalog_->tablet_peer();
135
4.85k
  if (!tablet_peer || !tablet_peer->tablet()) {
136
0
    return STATUS(ServiceUnavailable, "SysCatalog unavailable");
137
0
  }
138
4.85k
  const tablet::Tablet* catalog_tablet = tablet_peer->tablet();
139
4.85k
  const Schema& pg_database_schema =
140
4.85k
      *VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_table_id))->schema;
141
142
  // Use Scan to query the 'pg_database' table, filtering by our 'oid'.
143
0
  Schema projection;
144
4.85k
  RETURN_NOT_OK(pg_database_schema.CreateProjectionByNames({"oid"}, &projection,
145
4.85k
                pg_database_schema.num_key_columns()));
146
4.85k
  const auto oid_col_id = VERIFY_RESULT(projection.ColumnIdByName("oid")).rep();
147
4.85k
  auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator(
148
4.85k
      projection.CopyWithoutColumnIds(), {} /* read_hybrid_time */, pg_table_id));
149
4.85k
  auto e_oid_val = VERIFY_RESULT(std::move(entry_oid));
150
0
  {
151
4.85k
    auto doc_iter = down_cast<docdb::DocRowwiseIterator*>(iter.get());
152
4.85k
    PgsqlConditionPB cond;
153
4.85k
    cond.add_operands()->set_column_id(oid_col_id);
154
4.85k
    cond.set_op(QL_OP_EQUAL);
155
4.85k
    cond.add_operands()->mutable_value()->set_uint32_value(e_oid_val);
156
4.85k
    const std::vector<docdb::PrimitiveValue> empty_key_components;
157
4.85k
    docdb::DocPgsqlScanSpec spec(
158
4.85k
        projection, rocksdb::kDefaultQueryId, empty_key_components, empty_key_components,
159
4.85k
        &cond, boost::none /* hash_code */, boost::none /* max_hash_code */, nullptr /* where */);
160
4.85k
    RETURN_NOT_OK(doc_iter->Init(spec));
161
4.85k
  }
162
163
  // Expect exactly one row, which means the transaction was a success.
164
4.85k
  QLTableRow row;
165
4.85k
  if (VERIFY_RESULT(iter->HasNext())) {
166
4.69k
    RETURN_NOT_OK(iter->NextRow(&row));
167
4.69k
    return true;
168
4.69k
  }
169
155
  return false;
170
4.85k
}
171
172
}  // namespace master
173
}  // namespace yb