YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/peer_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/peer_manager.h"
34
35
#include <mutex>
36
37
#include "yb/consensus/consensus_peers.h"
38
39
#include "yb/gutil/map-util.h"
40
41
#include "yb/util/logging.h"
42
#include "yb/util/threadpool.h"
43
44
DECLARE_bool(enable_multi_raft_heartbeat_batcher);
45
46
namespace yb {
47
namespace consensus {
48
49
using log::Log;
50
using strings::Substitute;
51
52
// Constructs a PeerManager for the tablet identified by `tablet_id`, running on
// the server identified by `local_uuid`.
//
// The proxy factory, message queue, Raft thread pool token and multi-raft
// manager pointers are stored as given. `multi_raft_manager` may be null;
// UpdateRaftConfig() checks it before use.
//
// The string parameters are taken by value and moved into the members, so an
// rvalue argument is never copied and an lvalue argument is copied only once.
// (Top-level const on a by-value parameter is not part of the function type, so
// this definition still matches a declaration spelled `const std::string`.)
PeerManager::PeerManager(
    std::string tablet_id,
    std::string local_uuid,
    PeerProxyFactory* peer_proxy_factory,
    PeerMessageQueue* queue,
    ThreadPoolToken* raft_pool_token,
    consensus::MultiRaftManager* multi_raft_manager)
    : tablet_id_(std::move(tablet_id)),
      local_uuid_(std::move(local_uuid)),
      peer_proxy_factory_(peer_proxy_factory),
      queue_(queue),
      raft_pool_token_(raft_pool_token),
      multi_raft_manager_(multi_raft_manager) {
}
66
67
47.8k
// Destructor: closes every tracked peer and empties the peer map by delegating
// to Close().
PeerManager::~PeerManager() {
  this->Close();
}
70
71
38.0k
void PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
72
18.4E
  VLOG_WITH_PREFIX(1) << "Updating peers from new config: " << config.ShortDebugString();
73
74
38.0k
  std::lock_guard<simple_spinlock> lock(lock_);
75
  // Create new peers.
76
113k
  for (const RaftPeerPB& peer_pb : config.peers()) {
77
113k
    if (peers_.find(peer_pb.permanent_uuid()) != peers_.end()) {
78
6.24k
      continue;
79
6.24k
    }
80
107k
    if (peer_pb.permanent_uuid() == local_uuid_) {
81
38.0k
      continue;
82
38.0k
    }
83
84
69.5k
    VLOG_WITH_PREFIX(1) << "Adding remote peer. Peer: " << peer_pb.ShortDebugString();
85
69.5k
    MultiRaftHeartbeatBatcherPtr multi_raft_batcher = nullptr;
86
69.5k
    if (multi_raft_manager_) {
87
69.5k
      multi_raft_batcher = multi_raft_manager_->AddOrGetBatcher(peer_pb);
88
69.5k
    }
89
69.5k
    auto remote_peer = Peer::NewRemotePeer(
90
69.5k
        peer_pb, tablet_id_, local_uuid_, peer_proxy_factory_->NewProxy(peer_pb), queue_,
91
69.5k
        multi_raft_batcher, raft_pool_token_, consensus_, peer_proxy_factory_->messenger());
92
69.5k
    if (!remote_peer.ok()) {
93
0
      LOG_WITH_PREFIX(WARNING)
94
0
          << "Failed to create remote peer for " << peer_pb.ShortDebugString() << ": "
95
0
          << remote_peer.status();
96
0
      return;
97
0
    }
98
99
69.5k
    peers_[peer_pb.permanent_uuid()] = std::move(*remote_peer);
100
69.5k
  }
101
38.0k
}
102
103
5.11M
// Forwards `trigger_mode` to SignalRequest() on every tracked peer.
//
// A peer whose call returns an IllegalState status is logged and erased from
// peers_. Any other non-OK status is logged and the peer is kept. The whole
// pass runs while holding lock_.
void PeerManager::SignalRequest(RequestTriggerMode trigger_mode) {
  std::lock_guard<simple_spinlock> lock(lock_);

  auto it = peers_.begin();
  while (it != peers_.end()) {
    Status status = it->second->SignalRequest(trigger_mode);

    if (PREDICT_FALSE(status.IsIllegalState())) {
      LOG_WITH_PREFIX(WARNING)
          << "Peer was closed: " << status << ", removing from peers. Peer: "
          << it->second->peer_pb().ShortDebugString();
      // erase() hands back the iterator following the removed element.
      it = peers_.erase(it);
      continue;
    }

    if (PREDICT_FALSE(!status.ok())) {
      LOG_WITH_PREFIX(WARNING)
          << "Peer " << it->second->peer_pb().ShortDebugString()
          << " failed to send request: " << status;
    }
    ++it;
  }
}
122
123
189k
void PeerManager::Close() {
124
189k
  std::lock_guard<simple_spinlock> lock(lock_);
125
41.3k
  for (const auto& entry : peers_) {
126
41.3k
    entry.second->Close();
127
41.3k
  }
128
189k
  peers_.clear();
129
189k
}
130
131
38.1k
void PeerManager::ClosePeersNotInConfig(const RaftConfigPB& config) {
132
38.1k
  std::unordered_map<std::string, RaftPeerPB> peers_in_config;
133
113k
  for (const RaftPeerPB &peer_pb : config.peers()) {
134
113k
    InsertOrDie(&peers_in_config, peer_pb.permanent_uuid(), peer_pb);
135
113k
  }
136
137
38.1k
  std::lock_guard<simple_spinlock> lock(lock_);
138
46.2k
  for (auto iter = peers_.begin(); iter != peers_.end();) {
139
8.13k
    auto peer = iter->second.get();
140
141
8.13k
    if (peer->peer_pb().permanent_uuid() == local_uuid_) {
142
0
      continue;
143
0
    }
144
145
8.13k
    auto it = peers_in_config.find(peer->peer_pb().permanent_uuid());
146
8.13k
    if (it == peers_in_config.end() ||
147
7.22k
        it->second.member_type() != peer->peer_pb().member_type()) {
148
1.89k
      peer->Close();
149
1.89k
      iter = peers_.erase(iter);
150
6.24k
    } else {
151
6.24k
      iter++;
152
6.24k
    }
153
8.13k
  }
154
38.1k
}
155
156
0
std::string PeerManager::LogPrefix() const {
157
0
  return MakeTabletLogPrefix(tablet_id_, local_uuid_);
158
0
}
159
160
} // namespace consensus
161
} // namespace yb