YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/transaction_cleanup.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/transaction_cleanup.h"
15
16
#include "yb/client/client.h"
17
#include "yb/client/meta_cache.h"
18
19
#include "yb/rpc/rpc_controller.h"
20
21
#include "yb/tserver/tserver_service.proxy.h"
22
23
#include "yb/util/logging.h"
24
#include "yb/util/result.h"
25
26
// Declares the GracefulCleanup capability. TransactionCleanup::LookupTabletDone checks
// CAPABILITY_GracefulCleanup on every tablet server before sending it a graceful cleanup request.
// NOTE(review): the second argument is presumably the capability's identifier - confirm against
// the DEFINE_CAPABILITY macro definition.
DEFINE_CAPABILITY(GracefulCleanup, 0x5512d2a9);
27
28
using namespace std::literals;
29
using namespace std::placeholders;
30
31
namespace yb {
32
namespace client {
33
34
namespace {
35
36
class TransactionCleanup : public std::enable_shared_from_this<TransactionCleanup> {
37
 public:
38
  TransactionCleanup(
39
      YBClient* client, const scoped_refptr<ClockBase>& clock, const TransactionId& transaction_id,
40
      Sealed sealed, CleanupType type)
41
      : client_(client), clock_(clock), transaction_id_(transaction_id), sealed_(sealed),
42
208k
        type_(type) {
43
208k
  }
44
45
208k
  void Perform(const std::vector<TabletId>& tablet_ids) {
46
208k
    auto self = shared_from_this();
47
309k
    for (const auto& tablet_id : tablet_ids) {
48
      // TODO(tsplit): pass table if needed as a part of
49
      // https://github.com/yugabyte/yugabyte-db/issues/4942.
50
309k
      client_->LookupTabletById(
51
309k
          tablet_id,
52
309k
          /* table =*/ nullptr,
53
309k
          master::IncludeInactive::kFalse,
54
309k
          TransactionRpcDeadline(),
55
309k
          std::bind(&TransactionCleanup::LookupTabletDone, this, _1, self),
56
309k
          client::UseCache::kTrue);
57
309k
    }
58
208k
  }
59
60
 private:
61
  void LookupTabletDone(const Result<internal::RemoteTabletPtr>& remote_tablet,
62
309k
                        const std::shared_ptr<TransactionCleanup>& self) {
63
309k
    if (!remote_tablet.ok()) {
64
      // Intents will be cleaned up later in this case.
65
36
      LOG_WITH_PREFIX(WARNING) << "Tablet lookup failed: " << remote_tablet.status();
66
36
      return;
67
36
    }
68
309k
    
VLOG_WITH_PREFIX102
(1) << "Lookup tablet for cleanup done: " << yb::ToString(*remote_tablet)102
;
69
309k
    auto remote_tablet_servers = (**remote_tablet).GetRemoteTabletServers(
70
309k
        internal::IncludeFailedReplicas::kTrue);
71
72
309k
    constexpr auto kCallTimeout = 15s;
73
309k
    auto now = clock_->Now().ToUint64();
74
75
309k
    const auto& tablet_id = (**remote_tablet).tablet_id();
76
77
309k
    std::lock_guard<std::mutex> lock(mutex_);
78
309k
    calls_.reserve(calls_.size() + remote_tablet_servers.size());
79
927k
    for (auto* server : remote_tablet_servers) {
80
927k
      if (type_ == CleanupType::kGraceful && 
!server->HasCapability(CAPABILITY_GracefulCleanup)248k
) {
81
0
        VLOG_WITH_PREFIX(1)
82
0
            << "Skipping graceful cleanup at T " << (**remote_tablet).tablet_id() << " P "
83
0
            << server->permanent_uuid() << " because server does support it";
84
0
        continue;
85
0
      }
86
927k
      
VLOG_WITH_PREFIX21
(2) << "Sending cleanup to T " << (**remote_tablet).tablet_id() << " P "
87
21
                          << server->permanent_uuid();
88
927k
      auto status = server->InitProxy(client_);
89
927k
      if (!status.ok()) {
90
0
        LOG_WITH_PREFIX(WARNING) << "Failed to init proxy to " << server->ToString() << ": "
91
0
                                 << status;
92
0
        continue;
93
0
      }
94
927k
      calls_.emplace_back();
95
927k
      auto& call = calls_.back();
96
97
927k
      auto& request = call.request;
98
927k
      request.set_tablet_id(tablet_id);
99
927k
      request.set_propagated_hybrid_time(now);
100
927k
      auto& state = *request.mutable_state();
101
927k
      state.set_transaction_id(transaction_id_.data(), transaction_id_.size());
102
927k
      state.set_status(type_ == CleanupType::kImmediate ? 
TransactionStatus::IMMEDIATE_CLEANUP678k
103
927k
                                                        : 
TransactionStatus::GRACEFUL_CLEANUP248k
);
104
927k
      state.set_sealed(sealed_);
105
106
927k
      call.controller.set_timeout(kCallTimeout);
107
108
927k
      server->proxy()->UpdateTransactionAsync(
109
927k
          request, &call.response, &call.controller,
110
927k
          [this, self, remote_tablet = *remote_tablet, server] {
111
927k
            
VLOG_WITH_PREFIX34
(3) << "Cleaned intents at T " << remote_tablet->tablet_id() << " P "
112
34
                                << server->permanent_uuid();
113
927k
          });
114
927k
    }
115
309k
  }
116
117
36
  std::string LogPrefix() const {
118
36
    return Format("ID $0: ", transaction_id_);
119
36
  }
120
121
  struct Call {
122
    tserver::UpdateTransactionRequestPB request;
123
    tserver::UpdateTransactionResponsePB response;
124
    rpc::RpcController controller;
125
  };
126
127
  YBClient* const client_;
128
  const scoped_refptr<ClockBase> clock_;
129
  const TransactionId transaction_id_;
130
  const Sealed sealed_;
131
  const CleanupType type_;
132
133
  std::mutex mutex_;
134
  boost::container::stable_vector<Call> calls_ GUARDED_BY(mutex_);
135
};
136
137
} // namespace
138
139
void CleanupTransaction(
140
    YBClient* client, const scoped_refptr<ClockBase>& clock, const TransactionId& transaction_id,
141
208k
    Sealed sealed, CleanupType type, const std::vector<TabletId>& tablets) {
142
208k
  auto cleanup = std::make_shared<TransactionCleanup>(
143
208k
      client, clock, transaction_id, sealed, type);
144
208k
  cleanup->Perform(tablets);
145
208k
}
146
147
} // namespace client
148
} // namespace yb