YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_status_resolver.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/transaction_status_resolver.h"
15
16
#include "yb/client/transaction_rpc.h"
17
18
#include "yb/common/wire_protocol.h"
19
20
#include "yb/rpc/rpc.h"
21
22
#include "yb/tablet/transaction_participant_context.h"
23
24
#include "yb/tserver/tserver_service.pb.h"
25
26
#include "yb/util/atomic.h"
27
#include "yb/util/countdown_latch.h"
28
#include "yb/util/flag_tags.h"
29
#include "yb/util/logging.h"
30
#include "yb/util/result.h"
31
#include "yb/util/status_format.h"
32
33
// Test-only flag. Impl::Execute() passes it to AtomicFlagSleepMs() after the request has been
// filled in and before the client is obtained and the status RPC is registered and started.
DEFINE_test_flag(int32, inject_status_resolver_delay_ms, 0,
34
                 "Inject delay before launching transaction status resolver RPC.");
35
36
// Test-only flag. Impl::Complete() passes it to AtomicFlagSleepMs() after the result promise has
// been fulfilled and before the run latch is counted down.
DEFINE_test_flag(int32, inject_status_resolver_complete_delay_ms, 0,
37
                 "Inject delay before counting down latch in transaction status resolver "
38
                 "complete.");
39
40
using namespace std::literals;
41
using namespace std::placeholders;
42
43
namespace yb {
44
namespace tablet {
45
46
// Implementation behind TransactionStatusResolver.
//
// Add() queues transaction ids per status tablet while the resolver is not running. Start() then
// drains the queues by issuing GetTransactionStatus requests: a single RPC handle is tracked, and
// the next request is launched from the response handler of the previous one. Decoded statuses
// are handed to the callback supplied at construction, and the outcome of the run is published
// through the promise behind ResultFuture().
class TransactionStatusResolver::Impl {
 public:
  // participant_context and rpcs are dereferenced here and kept as references.
  // NOTE(review): presumably both must outlive this object - confirm against callers.
  // max_transactions_per_request caps how many transaction ids go into one request; a value
  // <= 0 makes Execute() complete with OK without sending anything.
  Impl(TransactionParticipantContext* participant_context, rpc::Rpcs* rpcs,
       int max_transactions_per_request, TransactionStatusResolverCallback callback)
      : participant_context_(*participant_context), rpcs_(*rpcs),
        max_transactions_per_request_(max_transactions_per_request), callback_(std::move(callback)),
        log_prefix_(participant_context->LogPrefix()), handle_(rpcs_.InvalidHandle()) {}

  // Reports (DFATAL) when the object is destroyed without Shutdown() having been called.
  ~Impl() {
    LOG_IF_WITH_PREFIX(DFATAL, !closing_.load(std::memory_order_acquire))
        << "Destroy resolver without Shutdown";
  }

  // Sets the closing flag, so that the next Execute() completes with Aborted, and then blocks
  // until the run latch reaches zero. A DFATAL message is logged every time another 10 seconds
  // elapse without that happening.
  void Shutdown() {
    closing_.store(true, std::memory_order_release);
    while (!run_latch_.WaitFor(10s)) {
      LOG_WITH_PREFIX(DFATAL) << "Long wait for transaction status resolver to shutdown";
    }
  }

  // Begins a run over everything queued with Add(). The deadline is re-checked before each
  // request; once it has passed, the run completes with TimedOut.
  //
  // NOTE(review): every run fulfils result_promise_ in Complete(), and a std::promise can be
  // satisfied only once, so starting the same instance a second time looks unsafe - confirm
  // that callers never do this.
  void Start(CoarseTimePoint deadline) {
    VLOG_WITH_PREFIX(2) << "Start, queues: " << queues_.size();

    deadline_ = deadline;
    run_latch_.Reset(1);
    Execute();
  }

  // Returns the future of the promise that Complete() fulfils with the final status of the run.
  std::future<Status> ResultFuture() {
    return result_promise_.get_future();
  }

  // True while the run latch is non-zero, i.e. after Start() and before Complete() counts down.
  bool Running() {
    return run_latch_.count() != 0;
  }

  // Appends transaction_id to the queue of status_tablet. Expected to be called only while the
  // resolver is not running; a violation is reported with DFATAL.
  void Add(const TabletId& status_tablet, const TransactionId& transaction_id) {
    LOG_IF(DFATAL, run_latch_.count()) << "Add while running";
    queues_[status_tablet].push_back(transaction_id);
  }

 private:
  // Performs one step of the run: either finishes it (deadline passed, shutdown requested,
  // nothing left to resolve) or sends a status request for the first queue in queues_.
  void Execute() {
    LOG_IF(DFATAL, !run_latch_.count()) << "Execute while running is false";

    if (CoarseMonoClock::now() >= deadline_) {
      Complete(STATUS(TimedOut, "Timed out to resolve transaction statuses"));
      return;
    }
    if (closing_.load(std::memory_order_acquire)) {
      Complete(STATUS(Aborted, "Aborted because of shutdown"));
      return;
    }
    if (queues_.empty() || max_transactions_per_request_ <= 0) {
      Complete(Status::OK());
      return;
    }

    // We access queues_ only while adding transaction and after that while resolving
    // transaction statuses, which is NOT concurrent.
    // So we could avoid doing synchronization here.
    auto& tablet_id_and_queue = *queues_.begin();
    tserver::GetTransactionStatusRequestPB req;
    req.set_tablet_id(tablet_id_and_queue.first);
    req.set_propagated_hybrid_time(participant_context_.Now().ToUint64());
    const auto& tablet_queue = tablet_id_and_queue.second;
    // The ids are only copied into the request here; they are removed from the queue in
    // StatusReceived(), one per status that was actually returned.
    auto request_size = std::min<size_t>(max_transactions_per_request_, tablet_queue.size());
    for (size_t i = 0; i != request_size; ++i) {
      const auto& txn_id = tablet_queue[i];
      VLOG_WITH_PREFIX(4) << "Checking txn status: " << txn_id;
      req.add_transaction_id()->assign(pointer_cast<const char*>(txn_id.data()), txn_id.size());
    }

    AtomicFlagSleepMs(&FLAGS_TEST_inject_status_resolver_delay_ms);

    auto client = participant_context_.client_future().get();
    if (!client || !rpcs_.RegisterAndStart(
        client::GetTransactionStatus(
            std::min(deadline_, TransactionRpcDeadline()),
            nullptr /* tablet */,
            client,
            &req,
            std::bind(&Impl::StatusReceived, this, _1, _2, request_size)),
        &handle_)) {
      Complete(STATUS(Aborted, "Aborted because cannot start RPC"));
    }
  }

  const std::string& LogPrefix() const {
    return log_prefix_;
  }

  // Response handler for the request sent by Execute().
  //
  // status       - outcome of the RPC; replaced by the error carried in the response, if any.
  // response     - statuses for the first entries of the first queue in queues_.
  // request_size - number of transaction ids that were put into the request.
  //
  // An Aborted failure finishes the run; any other failure, or a response with inconsistent
  // sizes, triggers another Execute(), which re-sends the same ids because nothing has been
  // removed from the queue. On success the resolved ids are popped from the queue, the callback
  // is invoked with their statuses, and the next step is executed.
  void StatusReceived(Status status,
                      const tserver::GetTransactionStatusResponsePB& response,
                      int request_size) {
    VLOG_WITH_PREFIX(2) << "Received statuses: " << status << ", " << response.ShortDebugString();

    rpcs_.Unregister(&handle_);

    if (status.ok() && response.has_error()) {
      status = StatusFromPB(response.error().status());
    }

    if (!status.ok()) {
      LOG_WITH_PREFIX(WARNING) << "Failed to request transaction statuses: " << status;
      if (status.IsAborted()) {
        Complete(status);
      } else {
        Execute();
      }
      return;
    }

    if (response.has_propagated_hybrid_time()) {
      participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
    }

    const auto num_statuses = response.status().size();
    const auto num_aborted_sets = response.aborted_subtxn_set().size();
    // Node with old software version would always return 1 status.
    // Every status is paired below with the aborted sub-transaction set at the same index, so
    // there must be at least as many sets as statuses. Without the last condition a response
    // with request_size statuses but a single set would pass the check and then be indexed past
    // the end of aborted_subtxn_set.
    if ((num_statuses != 1 && num_statuses != request_size) ||
        (num_aborted_sets != 1 && num_aborted_sets != request_size) ||
        num_aborted_sets < num_statuses) {
      LOG_WITH_PREFIX(DFATAL)
          << "Bad response size, expected " << request_size << " entries, but found: "
          << response.ShortDebugString() << ", queue: " << AsString(queues_);
      Execute();
      return;
    }

    status_infos_.clear();
    status_infos_.resize(num_statuses);
    auto it = queues_.begin();
    auto& queue = it->second;
    for (int i = 0; i != num_statuses; ++i) {
      auto& status_info = status_infos_[i];
      // Statuses come back in request order, and the request was built from the front of this
      // queue, so the i-th status belongs to the current front element.
      status_info.transaction_id = queue.front();
      status_info.status = response.status(i);

      auto aborted_subtxn_set_or_status = AbortedSubTransactionSet::FromPB(
        response.aborted_subtxn_set(i).set());
      if (!aborted_subtxn_set_or_status.ok()) {
        Complete(STATUS_FORMAT(
            IllegalState, "Cannot deserialize AbortedSubTransactionSet: $0",
            response.aborted_subtxn_set(i).DebugString()));
        return;
      }
      status_info.aborted_subtxn_set = aborted_subtxn_set_or_status.get();

      if (i < response.status_hybrid_time().size()) {
        status_info.status_ht = HybridTime(response.status_hybrid_time(i));
      // Could happen only when coordinator has an old version.
      } else if (status_info.status == TransactionStatus::ABORTED) {
        status_info.status_ht = HybridTime::kMax;
      } else {
        Complete(STATUS_FORMAT(
            IllegalState, "Missing status hybrid time for transaction status: $0",
            TransactionStatus_Name(status_info.status)));
        return;
      }
      // A missing coordinator safe time is represented by a default-constructed HybridTime.
      status_info.coordinator_safe_time = i < response.coordinator_safe_time().size()
          ? HybridTime::FromPB(response.coordinator_safe_time(i)) : HybridTime();
      VLOG_WITH_PREFIX(4) << "Status: " << status_info.ToString();
      queue.pop_front();
    }
    if (queue.empty()) {
      VLOG_WITH_PREFIX(2) << "Processed queue for: " << it->first;
      queues_.erase(it);
    }

    callback_(status_infos_);

    Execute();
  }

  // Finishes the current run: publishes status through the result promise and then releases
  // the run latch, which unblocks Shutdown() and makes Running() return false.
  void Complete(const Status& status) {
    VLOG_WITH_PREFIX(2) << "Complete: " << status;
    result_promise_.set_value(status);
    AtomicFlagSleepMs(&FLAGS_TEST_inject_status_resolver_complete_delay_ms);
    run_latch_.CountDown();
  }

  TransactionParticipantContext& participant_context_;
  rpc::Rpcs& rpcs_;
  // Upper bound on the number of transaction ids placed into a single request.
  const int max_transactions_per_request_;
  // Invoked with the decoded statuses of every successfully processed response.
  TransactionStatusResolverCallback callback_;

  const std::string log_prefix_;
  // Handle of the request currently registered with rpcs_.
  rpc::Rpcs::Handle handle_;

  // Set by Shutdown(), checked at the start of every Execute().
  std::atomic<bool> closing_{false};
  // Reset to 1 by Start(), counted down by Complete().
  CountDownLatch run_latch_{0};
  CoarseTimePoint deadline_;
  // Pending transaction ids, keyed by status tablet.
  std::unordered_map<TabletId, std::deque<TransactionId>> queues_;
  // Scratch buffer, refilled for every response before it is passed to callback_.
  std::vector<TransactionStatusInfo> status_infos_;
  std::promise<Status> result_promise_;
};
244
245
// Builds the implementation object, handing every argument over to it unchanged.
TransactionStatusResolver::TransactionStatusResolver(
    TransactionParticipantContext* participant_context,
    rpc::Rpcs* rpcs,
    int max_transactions_per_request,
    TransactionStatusResolverCallback callback)
    : impl_(new Impl(participant_context, rpcs, max_transactions_per_request,
                     std::move(callback))) {}
251
252
428
// Nothing to do beyond destroying the members; defined in this file, where Impl is complete.
TransactionStatusResolver::~TransactionStatusResolver() = default;
253
254
426
void TransactionStatusResolver::Shutdown() {
255
426
  impl_->Shutdown();
256
426
}
257
258
void TransactionStatusResolver::Add(
259
432
    const TabletId& status_tablet, const TransactionId& transaction_id) {
260
432
  impl_->Add(status_tablet, transaction_id);
261
432
}
262
263
424
// Delegates to Impl::Start(), beginning resolution of the queued transactions until deadline.
void TransactionStatusResolver::Start(CoarseTimePoint deadline) { impl_->Start(deadline); }
266
267
0
std::future<Status> TransactionStatusResolver::ResultFuture() {
268
0
  return impl_->ResultFuture();
269
0
}
270
271
856
bool TransactionStatusResolver::Running() const {
272
856
  return impl_->Running();
273
856
}
274
275
} // namespace tablet
276
} // namespace yb