YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/transaction_status_resolver.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/transaction_status_resolver.h"
15
16
#include "yb/client/transaction_rpc.h"
17
18
#include "yb/common/wire_protocol.h"
19
20
#include "yb/rpc/rpc.h"
21
22
#include "yb/tablet/transaction_participant_context.h"
23
24
#include "yb/tserver/tserver_service.pb.h"
25
26
#include "yb/util/atomic.h"
27
#include "yb/util/countdown_latch.h"
28
#include "yb/util/flag_tags.h"
29
#include "yb/util/logging.h"
30
#include "yb/util/result.h"
31
#include "yb/util/status_format.h"
32
33
// Test-only delay in milliseconds. Execute() sleeps for this long after building the request and
// before starting the GetTransactionStatus RPC.
DEFINE_test_flag(
    int32, inject_status_resolver_delay_ms, 0,
    "Inject delay before launching transaction status resolver RPC.");

// Test-only delay in milliseconds. Complete() sleeps for this long after publishing the result
// and before counting down the run latch.
DEFINE_test_flag(
    int32, inject_status_resolver_complete_delay_ms, 0,
    "Inject delay before counting down latch in transaction status resolver complete.");
39
40
using namespace std::literals;
41
using namespace std::placeholders;
42
43
namespace yb {
44
namespace tablet {
45
46
// Implementation of TransactionStatusResolver.
//
// Transaction ids are accumulated per status tablet with Add(). Start() then requests their
// statuses with GetTransactionStatus RPCs: every RPC targets a single status tablet and carries
// at most max_transactions_per_request ids. Each successfully parsed batch is handed to the
// callback, and the final result of the run is published through the promise behind
// ResultFuture().
class TransactionStatusResolver::Impl {
 public:
  // participant_context and rpcs are stored by reference, so both must be non-null.
  Impl(TransactionParticipantContext* participant_context, rpc::Rpcs* rpcs,
       int max_transactions_per_request, TransactionStatusResolverCallback callback)
      : participant_context_(*participant_context),
        rpcs_(*rpcs),
        max_transactions_per_request_(max_transactions_per_request),
        callback_(std::move(callback)),
        log_prefix_(participant_context->LogPrefix()),
        handle_(rpcs_.InvalidHandle()) {
  }

  // Shutdown() is expected to have been called before destruction.
  ~Impl() {
    const bool shutdown_requested = closing_.load(std::memory_order_acquire);
    LOG_IF_WITH_PREFIX(DFATAL, !shutdown_requested) << "Destroy resolver without Shutdown";
  }

  // Marks the resolver as closing and blocks until the run latch reaches zero, complaining
  // after every 10 seconds spent waiting.
  void Shutdown() {
    closing_.store(true, std::memory_order_release);
    while (!run_latch_.WaitFor(10s)) {
      LOG_WITH_PREFIX(DFATAL) << "Long wait for transaction status resolver to shutdown";
    }
  }

  // Begins resolving everything queued by Add(). The run is completed with TimedOut once
  // deadline has passed.
  void Start(CoarseTimePoint deadline) {
    VLOG_WITH_PREFIX(2) << "Start, queues: " << queues_.size();

    deadline_ = deadline;
    run_latch_.Reset(1);
    Execute();
  }

  // Future for the status passed to Complete().
  std::future<Status> ResultFuture() {
    return result_promise_.get_future();
  }

  // True between Start() and the latch count down performed by Complete().
  bool Running() {
    const auto pending_runs = run_latch_.count();
    return pending_runs != 0;
  }

  // Queues transaction_id to be checked against status_tablet. Must not be called while running.
  void Add(const TabletId& status_tablet, const TransactionId& transaction_id) {
    LOG_IF(DFATAL, run_latch_.count()) << "Add while running";
    auto& tablet_queue = queues_[status_tablet];
    tablet_queue.push_back(transaction_id);
  }

 private:
  // Prefix used by the *_WITH_PREFIX logging macros.
  const std::string& LogPrefix() const {
    return log_prefix_;
  }

  // Performs a single step of the run: either finishes it (deadline passed, shutdown requested,
  // nothing left to request) or launches the RPC for the next batch of transaction ids.
  void Execute() {
    LOG_IF(DFATAL, !run_latch_.count()) << "Execute while running is false";

    if (CoarseMonoClock::now() >= deadline_) {
      Complete(STATUS(TimedOut, "Timed out to resolve transaction statuses"));
      return;
    }
    if (closing_.load(std::memory_order_acquire)) {
      Complete(STATUS(Aborted, "Aborted because of shutdown"));
      return;
    }
    const bool nothing_to_request = queues_.empty() || max_transactions_per_request_ <= 0;
    if (nothing_to_request) {
      Complete(Status::OK());
      return;
    }

    // queues_ is touched only while transactions are being added and, after that, while their
    // statuses are being resolved. Those two phases are NOT concurrent, so no synchronization
    // is done here.
    auto& next_entry = *queues_.begin();
    const auto& pending_ids = next_entry.second;
    auto request_size = std::min<size_t>(max_transactions_per_request_, pending_ids.size());

    tserver::GetTransactionStatusRequestPB req;
    req.set_tablet_id(next_entry.first);
    req.set_propagated_hybrid_time(participant_context_.Now().ToUint64());
    for (size_t idx = 0; idx != request_size; ++idx) {
      const auto& pending_id = pending_ids[idx];
      VLOG_WITH_PREFIX(4) << "Checking txn status: " << pending_id;
      req.add_transaction_id()->assign(
          pointer_cast<const char*>(pending_id.data()), pending_id.size());
    }

    AtomicFlagSleepMs(&FLAGS_TEST_inject_status_resolver_delay_ms);

    auto client = participant_context_.client_future().get();
    bool rpc_started = false;
    if (client) {
      rpc_started = rpcs_.RegisterAndStart(
          client::GetTransactionStatus(
              std::min(deadline_, TransactionRpcDeadline()),
              nullptr /* tablet */,
              client,
              &req,
              std::bind(&Impl::StatusReceived, this, _1, _2, request_size)),
          &handle_);
    }
    if (!rpc_started) {
      Complete(STATUS(Aborted, "Aborted because cannot start RPC"));
    }
  }

  // RPC completion handler. request_size is the number of transaction ids sent in the request.
  //
  // Failed requests are retried via Execute() unless the failure is Aborted, which finishes the
  // run. A well formed response is converted into status_infos_, the resolved ids are removed
  // from the queue, the callback is invoked and the next step is executed.
  void StatusReceived(Status status,
                      const tserver::GetTransactionStatusResponsePB& response,
                      int request_size) {
    VLOG_WITH_PREFIX(2) << "Received statuses: " << status << ", " << response.ShortDebugString();

    rpcs_.Unregister(&handle_);

    if (status.ok() && response.has_error()) {
      status = StatusFromPB(response.error().status());
    }

    if (!status.ok()) {
      LOG_WITH_PREFIX(WARNING) << "Failed to request transaction statuses: " << status;
      if (!status.IsAborted()) {
        Execute();
        return;
      }
      Complete(status);
      return;
    }

    if (response.has_propagated_hybrid_time()) {
      participant_context_.UpdateClock(HybridTime(response.propagated_hybrid_time()));
    }

    const auto num_statuses = response.status().size();
    const auto num_aborted_sets = response.aborted_subtxn_set().size();
    // Node with old software version would always return 1 status.
    const bool statuses_size_ok = num_statuses == 1 || num_statuses == request_size;
    // Old node may not populate aborted sub transaction sets.
    const bool aborted_sets_size_ok = num_aborted_sets == 0 || num_aborted_sets == request_size;
    if (!statuses_size_ok || !aborted_sets_size_ok) {
      LOG_WITH_PREFIX(DFATAL)
          << "Bad response size, expected " << request_size << " entries, but found: "
          << response.ShortDebugString() << ", queue: " << AsString(queues_);
      Execute();
      return;
    }

    status_infos_.clear();
    status_infos_.resize(num_statuses);
    auto queue_it = queues_.begin();
    auto& pending_ids = queue_it->second;
    for (int idx = 0; idx != num_statuses; ++idx) {
      auto& info = status_infos_[idx];
      info.transaction_id = pending_ids.front();
      info.status = response.status(idx);

      if (PREDICT_FALSE(response.aborted_subtxn_set().empty())) {
        YB_LOG_EVERY_N(WARNING, 1)
            << "Empty aborted_subtxn_set in transaction status response. "
            << "This should only happen when nodes are on different versions, e.g. during upgrade.";
      } else {
        const auto& aborted_set_pb = response.aborted_subtxn_set(idx);
        auto parsed_aborted_set = AbortedSubTransactionSet::FromPB(aborted_set_pb.set());
        if (!parsed_aborted_set.ok()) {
          Complete(STATUS_FORMAT(
              IllegalState, "Cannot deserialize AbortedSubTransactionSet: $0",
              aborted_set_pb.DebugString()));
          return;
        }
        info.aborted_subtxn_set = parsed_aborted_set.get();
      }

      if (idx < response.status_hybrid_time().size()) {
        info.status_ht = HybridTime(response.status_hybrid_time(idx));
      } else if (info.status != TransactionStatus::ABORTED) {
        // A missing status hybrid time could happen only when coordinator has an old version,
        // and is tolerated only for aborted transactions.
        Complete(STATUS_FORMAT(
            IllegalState, "Missing status hybrid time for transaction status: $0",
            TransactionStatus_Name(info.status)));
        return;
      } else {
        info.status_ht = HybridTime::kMax;
      }

      if (idx < response.coordinator_safe_time().size()) {
        info.coordinator_safe_time = HybridTime::FromPB(response.coordinator_safe_time(idx));
      } else {
        info.coordinator_safe_time = HybridTime();
      }
      VLOG_WITH_PREFIX(4) << "Status: " << info.ToString();
      pending_ids.pop_front();
    }
    if (pending_ids.empty()) {
      VLOG_WITH_PREFIX(2) << "Processed queue for: " << queue_it->first;
      queues_.erase(queue_it);
    }

    callback_(status_infos_);

    Execute();
  }

  // Finishes the current run: publishes status through the promise and then counts down the
  // run latch, which Running() and Shutdown() observe.
  void Complete(const Status& status) {
    VLOG_WITH_PREFIX(2) << "Complete: " << status;
    result_promise_.set_value(status);
    AtomicFlagSleepMs(&FLAGS_TEST_inject_status_resolver_complete_delay_ms);
    run_latch_.CountDown();
  }

  TransactionParticipantContext& participant_context_;
  rpc::Rpcs& rpcs_;
  // Upper bound on the number of transaction ids placed into a single request.
  const int max_transactions_per_request_;
  // Invoked with every successfully parsed batch of statuses.
  TransactionStatusResolverCallback callback_;

  const std::string log_prefix_;
  // Handle of the RPC registered in rpcs_ by Execute().
  rpc::Rpcs::Handle handle_;

  // Set by Shutdown().
  std::atomic<bool> closing_{false};
  // Reset to 1 by Start() and counted down by Complete().
  CountDownLatch run_latch_{0};
  CoarseTimePoint deadline_;
  // Transaction ids waiting for resolution, keyed by status tablet id.
  std::unordered_map<TabletId, std::deque<TransactionId>> queues_;
  // Scratch buffer for the batch passed to callback_.
  std::vector<TransactionStatusInfo> status_infos_;
  std::promise<Status> result_promise_;
};
250
251
// Creates the resolver by constructing its Impl from the supplied arguments.
TransactionStatusResolver::TransactionStatusResolver(
    TransactionParticipantContext* participant_context,
    rpc::Rpcs* rpcs,
    int max_transactions_per_request,
    TransactionStatusResolverCallback callback)
    : impl_(new Impl(participant_context, rpcs, max_transactions_per_request,
                     std::move(callback))) {
}
257
258
1.67k
// Defined in this file, where Impl is a complete type.
TransactionStatusResolver::~TransactionStatusResolver() = default;
259
260
1.68k
void TransactionStatusResolver::Shutdown() {
261
1.68k
  impl_->Shutdown();
262
1.68k
}
263
264
void TransactionStatusResolver::Add(
265
1.72k
    const TabletId& status_tablet, const TransactionId& transaction_id) {
266
1.72k
  impl_->Add(status_tablet, transaction_id);
267
1.72k
}
268
269
1.68k
// Forwards to Impl::Start(), which begins resolving the queued transactions until deadline.
void TransactionStatusResolver::Start(CoarseTimePoint deadline) {
  auto& impl = *impl_;
  impl.Start(deadline);
}
272
273
15
std::future<Status> TransactionStatusResolver::ResultFuture() {
274
15
  return impl_->ResultFuture();
275
15
}
276
277
3.44k
bool TransactionStatusResolver::Running() const {
278
3.44k
  return impl_->Running();
279
3.44k
}
280
281
} // namespace tablet
282
} // namespace yb