YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/client/client_master_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/client_master_rpc.h"
15
16
#include "yb/rpc/outbound_call.h"
17
#include "yb/rpc/rpc_header.pb.h"
18
19
#include "yb/util/logging.h"
20
21
// Runtime flag (milliseconds, default 15000). ClientMasterRpcBase::ResetMasterLeader adds this
// value to the current time to form the deadline of the master leader lookup it starts when it
// is called with Retry::kFalse.
DEFINE_int64(reset_master_leader_timeout_ms, 15000,
22
             "Timeout to reset master leader in milliseconds.");
23
24
using namespace std::literals;
25
using namespace std::placeholders;
26
27
namespace yb {
28
namespace client {
29
namespace internal {
30
31
// Sets up the state shared by RPCs that are sent to the master.
//
// client_data: client internals providing the messenger, the proxy cache and the RPC registry
//              (rpcs_); must not be null.
// deadline:    deadline forwarded to the Rpc base class.
ClientMasterRpcBase::ClientMasterRpcBase(YBClient::Data* client_data, CoarseTimePoint deadline)
    // The Rpc base is initialized before any member, so the null check has to guard the base
    // arguments rather than the client_data_ member. Both arguments are checked because the
    // order in which they are evaluated is unspecified.
    : Rpc(deadline,
          DCHECK_NOTNULL(client_data)->messenger_,
          DCHECK_NOTNULL(client_data)->proxy_cache_.get()),
      client_data_(client_data),
      // Starts as the registry's invalid handle; SendRpc() DCHECKs that it has been replaced.
      retained_self_(client_data->rpcs_.InvalidHandle()) {
}
36
37
593k
void ClientMasterRpcBase::SendRpc() {
38
593k
  DCHECK(retained_self_ != client_data_->rpcs_.InvalidHandle());
39
40
593k
  auto now = CoarseMonoClock::Now();
41
593k
  if (retrier().deadline() < now) {
42
10
    Finished(STATUS_FORMAT(TimedOut, "Request $0 timed out after deadline expired", *this));
43
10
    return;
44
10
  }
45
46
593k
  auto rpc_deadline = now + client_data_->default_rpc_timeout_;
47
593k
  mutable_retrier()->mutable_controller()->set_deadline(
48
593k
      std::min(rpc_deadline, retrier().deadline()));
49
593k
  CallRemoteMethod();
50
593k
}
51
52
853
// Starts an asynchronous refresh of the client's master server proxy.
//
// retry: with Retry::kTrue the refresh is bounded by the retrier's deadline and its result is
//        delivered to NewLeaderMasterDeterminedCb(); with Retry::kFalse the refresh is bounded by
//        --reset_master_leader_timeout_ms counted from now and its result is ignored.
void ClientMasterRpcBase::ResetMasterLeader(Retry retry) {
  // Use CoarseMonoClock::Now(), matching every other clock read in this file.
  const auto lookup_deadline = retry
      ? retrier().deadline()
      : CoarseMonoClock::Now() + FLAGS_reset_master_leader_timeout_ms * 1ms;

  client_data_->SetMasterServerProxyAsync(
      lookup_deadline,
      false /* skip_resolution */,
      true /* wait for leader election */,
      // The no-op callback does not capture `this`, so the fire-and-forget refresh never calls
      // back into this object.
      retry ? std::bind(&ClientMasterRpcBase::NewLeaderMasterDeterminedCb, this, _1)
            : StdStatusCallback([](auto) {}));
}
61
62
149
void ClientMasterRpcBase::NewLeaderMasterDeterminedCb(const Status& status) {
63
149
  if (status.ok()) {
64
110
    mutable_retrier()->mutable_controller()->Reset();
65
110
    SendRpc();
66
110
  } else {
67
39
    LOG(WARNING) << "Failed to determine new Master: " << status.ToString();
68
39
    ScheduleRetry(status);
69
39
  }
70
149
}
71
72
593k
void ClientMasterRpcBase::Finished(const Status& status) {
73
593k
  auto resp_status = ResponseStatus();
74
593k
  if (status.ok() && 
!resp_status.ok()593k
) {
75
15.4k
    LOG_WITH_PREFIX(INFO) << "Failed, got resp error: " << resp_status;
76
578k
  } else if (!status.ok()) {
77
63
    LOG_WITH_PREFIX(INFO) << "Failed: " << status;
78
63
  }
79
80
593k
  Status new_status = status;
81
593k
  if (new_status.ok() &&
82
593k
      
mutable_retrier()->HandleResponse(this, &new_status, rpc::RetryWhenBusy::kFalse)593k
) {
83
0
    return;
84
0
  }
85
86
593k
  if (new_status.ok() && 
!resp_status.ok()592k
) {
87
15.5k
    master::MasterError master_error(resp_status);
88
15.5k
    if (master_error == master::MasterErrorPB::NOT_THE_LEADER ||
89
15.5k
        
master_error == master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED15.4k
) {
90
46
      LOG(WARNING) << ToString() << ": Leader Master has changed ("
91
46
                   << client_data_->leader_master_hostport().ToString()
92
46
                   << " is no longer the leader), re-trying...";
93
46
      ResetMasterLeader(Retry::kTrue);
94
46
      return;
95
46
    }
96
97
15.4k
    if (resp_status.IsLeaderNotReadyToServe() || resp_status.IsLeaderHasNoLease()) {
98
0
      LOG(WARNING) << ToString() << ": Leader Master "
99
0
                   << client_data_->leader_master_hostport().ToString()
100
0
                   << " does not have a valid exclusive lease: "
101
0
                   << resp_status << ", re-trying...";
102
0
      ResetMasterLeader(Retry::kTrue);
103
0
      return;
104
0
    }
105
15.4k
    VLOG
(2) << "resp.error().status()=" << resp_status0
;
106
15.4k
    new_status = resp_status;
107
15.4k
  }
108
109
593k
  if (new_status.IsTimedOut()) {
110
78
    auto now = CoarseMonoClock::Now();
111
78
    if (now < retrier().deadline()) {
112
6
      LOG(WARNING) << ToString() << ": Leader Master ("
113
6
          << client_data_->leader_master_hostport().ToString()
114
6
          << ") timed out, " << MonoDelta(retrier().deadline() - now) << " left, re-trying...";
115
6
      ResetMasterLeader(Retry::kTrue);
116
6
      return;
117
72
    } else {
118
      // Operation deadline expired during this latest RPC.
119
72
      new_status = STATUS_FORMAT(
120
72
          TimedOut, "$0 timed out after deadline expired, passed $1 of $2",
121
72
          *this, now - retrier().start(), retrier().deadline() - retrier().start());
122
72
      ResetMasterLeader(Retry::kFalse);
123
72
    }
124
78
  }
125
126
593k
  if (new_status.IsNetworkError() || 
new_status.IsRemoteError()592k
) {
127
729
    LOG(WARNING) << ToString() << ": Encountered a network error from the Master("
128
729
                 << client_data_->leader_master_hostport().ToString() << "): "
129
729
                 << new_status.ToString() << ", retrying...";
130
729
    ResetMasterLeader(Retry::kTrue);
131
729
    return;
132
729
  }
133
134
592k
  if (ShouldRetry(new_status)) {
135
9.53k
    if (CoarseMonoClock::Now() > retrier().deadline()) {
136
0
      if (new_status.ok()) {
137
0
        new_status = STATUS(TimedOut, ToString() + " timed out");
138
0
      }
139
0
      LOG(WARNING) << new_status;
140
9.53k
    } else {
141
9.53k
      auto backoff_strategy = rpc::BackoffStrategy::kLinear;
142
9.53k
      if (rpc::RpcError(new_status) == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
143
0
        backoff_strategy = rpc::BackoffStrategy::kExponential;
144
0
      }
145
9.53k
      new_status = mutable_retrier()->DelayedRetry(this, new_status, backoff_strategy);
146
9.54k
      if (
new_status.ok()9.53k
) {
147
9.54k
        return;
148
9.54k
      }
149
9.53k
    }
150
9.53k
  }
151
152
583k
  auto retained_self = client_data_->rpcs_.Unregister(&retained_self_);
153
154
583k
  ProcessResponse(new_status);
155
583k
}
156
157
15.8k
std::string ClientMasterRpcBase::LogPrefix() const {
158
15.8k
  return AsString(this) + ": ";
159
15.8k
}
160
161
} // namespace internal
162
} // namespace client
163
} // namespace yb