YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/peer_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/peer_manager.h"
34
35
#include <mutex>
36
37
#include "yb/consensus/consensus_peers.h"
38
39
#include "yb/gutil/map-util.h"
40
41
#include "yb/util/logging.h"
42
#include "yb/util/threadpool.h"
43
44
DECLARE_bool(enable_multi_raft_heartbeat_batcher);
45
46
namespace yb {
47
namespace consensus {
48
49
using log::Log;
50
using strings::Substitute;
51
52
PeerManager::PeerManager(
53
    const std::string tablet_id,
54
    const std::string local_uuid,
55
    PeerProxyFactory* peer_proxy_factory,
56
    PeerMessageQueue* queue,
57
    ThreadPoolToken* raft_pool_token,
58
    consensus::MultiRaftManager* multi_raft_manager)
59
    : tablet_id_(tablet_id),
60
      local_uuid_(local_uuid),
61
      peer_proxy_factory_(peer_proxy_factory),
62
      queue_(queue),
63
      raft_pool_token_(raft_pool_token),
64
150k
      multi_raft_manager_(multi_raft_manager) {
65
150k
}
66
67
75.6k
PeerManager::~PeerManager() {
68
75.6k
  Close();
69
75.6k
}
70
71
67.8k
void PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
72
67.8k
  
VLOG_WITH_PREFIX6
(1) << "Updating peers from new config: " << config.ShortDebugString()6
;
73
74
67.8k
  std::lock_guard<simple_spinlock> lock(lock_);
75
  // Create new peers.
76
202k
  for (const RaftPeerPB& peer_pb : config.peers()) {
77
202k
    if (peers_.find(peer_pb.permanent_uuid()) != peers_.end()) {
78
11.6k
      continue;
79
11.6k
    }
80
191k
    if (peer_pb.permanent_uuid() == local_uuid_) {
81
67.8k
      continue;
82
67.8k
    }
83
84
123k
    
VLOG_WITH_PREFIX26
(1) << "Adding remote peer. Peer: " << peer_pb.ShortDebugString()26
;
85
123k
    MultiRaftHeartbeatBatcherPtr multi_raft_batcher = nullptr;
86
123k
    if (multi_raft_manager_) {
87
123k
      multi_raft_batcher = multi_raft_manager_->AddOrGetBatcher(peer_pb);
88
123k
    }
89
123k
    auto remote_peer = Peer::NewRemotePeer(
90
123k
        peer_pb, tablet_id_, local_uuid_, peer_proxy_factory_->NewProxy(peer_pb), queue_,
91
123k
        multi_raft_batcher, raft_pool_token_, consensus_, peer_proxy_factory_->messenger());
92
123k
    if (!remote_peer.ok()) {
93
0
      LOG_WITH_PREFIX(WARNING)
94
0
          << "Failed to create remote peer for " << peer_pb.ShortDebugString() << ": "
95
0
          << remote_peer.status();
96
0
      return;
97
0
    }
98
99
123k
    peers_[peer_pb.permanent_uuid()] = std::move(*remote_peer);
100
123k
  }
101
67.8k
}
102
103
9.68M
void PeerManager::SignalRequest(RequestTriggerMode trigger_mode) {
104
9.68M
  std::lock_guard<simple_spinlock> lock(lock_);
105
27.5M
  for (auto iter = peers_.begin(); iter != peers_.end();) {
106
17.8M
    Status s = iter->second->SignalRequest(trigger_mode);
107
17.8M
    if (PREDICT_FALSE(s.IsIllegalState())) {
108
0
      LOG_WITH_PREFIX(WARNING)
109
0
          << "Peer was closed: " << s << ", removing from peers. Peer: "
110
0
          << (*iter).second->peer_pb().ShortDebugString();
111
0
      iter = peers_.erase(iter);
112
17.8M
    } else {
113
17.8M
      if (PREDICT_FALSE(!s.ok())) {
114
0
        LOG_WITH_PREFIX(WARNING)
115
0
            << "Peer " << (*iter).second->peer_pb().ShortDebugString()
116
0
            << " failed to send request: " << s;
117
0
      }
118
17.8M
      iter++;
119
17.8M
    }
120
17.8M
  }
121
9.68M
}
122
123
312k
void PeerManager::Close() {
124
312k
  std::lock_guard<simple_spinlock> lock(lock_);
125
312k
  for (const auto& entry : peers_) {
126
71.6k
    entry.second->Close();
127
71.6k
  }
128
312k
  peers_.clear();
129
312k
}
130
131
67.9k
void PeerManager::ClosePeersNotInConfig(const RaftConfigPB& config) {
132
67.9k
  std::unordered_map<std::string, RaftPeerPB> peers_in_config;
133
202k
  for (const RaftPeerPB &peer_pb : config.peers()) {
134
202k
    InsertOrDie(&peers_in_config, peer_pb.permanent_uuid(), peer_pb);
135
202k
  }
136
137
67.9k
  std::lock_guard<simple_spinlock> lock(lock_);
138
83.4k
  for (auto iter = peers_.begin(); iter != peers_.end();) {
139
15.5k
    auto peer = iter->second.get();
140
141
15.5k
    if (peer->peer_pb().permanent_uuid() == local_uuid_) {
142
0
      continue;
143
0
    }
144
145
15.5k
    auto it = peers_in_config.find(peer->peer_pb().permanent_uuid());
146
15.5k
    if (it == peers_in_config.end() ||
147
15.5k
        
it->second.member_type() != peer->peer_pb().member_type()13.7k
) {
148
3.81k
      peer->Close();
149
3.81k
      iter = peers_.erase(iter);
150
11.6k
    } else {
151
11.6k
      iter++;
152
11.6k
    }
153
15.5k
  }
154
67.9k
}
155
156
1
std::string PeerManager::LogPrefix() const {
157
1
  return MakeTabletLogPrefix(tablet_id_, local_uuid_);
158
1
}
159
160
} // namespace consensus
161
} // namespace yb