YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/client/client_master_rpc.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/client/client_master_rpc.h"
15
16
#include "yb/rpc/outbound_call.h"
17
#include "yb/rpc/rpc_header.pb.h"
18
19
#include "yb/util/logging.h"
20
21
DEFINE_int64(reset_master_leader_timeout_ms, 15000,
22
             "Timeout to reset master leader in milliseconds.");
23
24
using namespace std::literals;
25
using namespace std::placeholders;
26
27
namespace yb {
28
namespace client {
29
namespace internal {
30
31
ClientMasterRpcBase::ClientMasterRpcBase(YBClient::Data* client_data, CoarseTimePoint deadline)
32
    : Rpc(deadline, client_data->messenger_, client_data->proxy_cache_.get()),
33
      client_data_(DCHECK_NOTNULL(client_data)),
34
419k
      retained_self_(client_data->rpcs_.InvalidHandle()) {
35
419k
}
36
37
428k
void ClientMasterRpcBase::SendRpc() {
38
428k
  DCHECK(retained_self_ != client_data_->rpcs_.InvalidHandle());
39
40
428k
  auto now = CoarseMonoClock::Now();
41
428k
  if (retrier().deadline() < now) {
42
62
    Finished(STATUS_FORMAT(TimedOut, "Request $0 timed out after deadline expired", *this));
43
62
    return;
44
62
  }
45
46
428k
  auto rpc_deadline = now + client_data_->default_rpc_timeout_;
47
428k
  mutable_retrier()->mutable_controller()->set_deadline(
48
428k
      std::min(rpc_deadline, retrier().deadline()));
49
428k
  CallRemoteMethod();
50
428k
}
51
52
696
void ClientMasterRpcBase::ResetMasterLeader(Retry retry) {
53
696
  client_data_->SetMasterServerProxyAsync(
54
620
      retry ? retrier().deadline()
55
76
            : CoarseMonoClock::now() + FLAGS_reset_master_leader_timeout_ms * 1ms,
56
696
      false /* skip_resolution */,
57
696
      true, /* wait for leader election */
58
620
      retry ? std::bind(&ClientMasterRpcBase::NewLeaderMasterDeterminedCb, this, _1)
59
76
            : StdStatusCallback([](auto){}));
60
696
}
61
62
72
void ClientMasterRpcBase::NewLeaderMasterDeterminedCb(const Status& status) {
63
72
  if (status.ok()) {
64
71
    mutable_retrier()->mutable_controller()->Reset();
65
71
    SendRpc();
66
1
  } else {
67
1
    LOG(WARNING) << "Failed to determine new Master: " << status.ToString();
68
1
    ScheduleRetry(status);
69
1
  }
70
72
}
71
72
428k
void ClientMasterRpcBase::Finished(const Status& status) {
73
428k
  auto resp_status = ResponseStatus();
74
428k
  if (status.ok() && !resp_status.ok()) {
75
13.5k
    LOG_WITH_PREFIX(INFO) << "Failed, got resp error: " << resp_status;
76
414k
  } else if (!status.ok()) {
77
69
    LOG_WITH_PREFIX(INFO) << "Failed: " << status;
78
69
  }
79
80
428k
  Status new_status = status;
81
428k
  if (new_status.ok() &&
82
428k
      mutable_retrier()->HandleResponse(this, &new_status, rpc::RetryWhenBusy::kFalse)) {
83
0
    return;
84
0
  }
85
86
428k
  if (new_status.ok() && !resp_status.ok()) {
87
13.5k
    master::MasterError master_error(resp_status);
88
13.5k
    if (master_error == master::MasterErrorPB::NOT_THE_LEADER ||
89
13.5k
        master_error == master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
90
3
      LOG(WARNING) << ToString() << ": Leader Master has changed ("
91
3
                   << client_data_->leader_master_hostport().ToString()
92
3
                   << " is no longer the leader), re-trying...";
93
3
      ResetMasterLeader(Retry::kTrue);
94
3
      return;
95
3
    }
96
97
13.5k
    if (resp_status.IsLeaderNotReadyToServe() || resp_status.IsLeaderHasNoLease()) {
98
0
      LOG(WARNING) << ToString() << ": Leader Master "
99
0
                   << client_data_->leader_master_hostport().ToString()
100
0
                   << " does not have a valid exclusive lease: "
101
0
                   << resp_status << ", re-trying...";
102
0
      ResetMasterLeader(Retry::kTrue);
103
0
      return;
104
0
    }
105
0
    VLOG(2) << "resp.error().status()=" << resp_status;
106
13.5k
    new_status = resp_status;
107
13.5k
  }
108
109
428k
  if (new_status.IsTimedOut()) {
110
94
    auto now = CoarseMonoClock::Now();
111
94
    if (now < retrier().deadline()) {
112
18
      LOG(WARNING) << ToString() << ": Leader Master ("
113
18
          << client_data_->leader_master_hostport().ToString()
114
18
          << ") timed out, " << MonoDelta(retrier().deadline() - now) << " left, re-trying...";
115
18
      ResetMasterLeader(Retry::kTrue);
116
18
      return;
117
76
    } else {
118
      // Operation deadline expired during this latest RPC.
119
76
      new_status = new_status.CloneAndPrepend(
120
76
          "RPC timed out after deadline expired");
121
76
      ResetMasterLeader(Retry::kFalse);
122
76
    }
123
94
  }
124
125
428k
  if (new_status.IsNetworkError() || new_status.IsRemoteError()) {
126
599
    LOG(WARNING) << ToString() << ": Encountered a network error from the Master("
127
599
                 << client_data_->leader_master_hostport().ToString() << "): "
128
599
                 << new_status.ToString() << ", retrying...";
129
599
    ResetMasterLeader(Retry::kTrue);
130
599
    return;
131
599
  }
132
133
427k
  if (ShouldRetry(new_status)) {
134
8.67k
    if (CoarseMonoClock::Now() > retrier().deadline()) {
135
0
      if (new_status.ok()) {
136
0
        new_status = STATUS(TimedOut, ToString() + " timed out");
137
0
      }
138
0
      LOG(WARNING) << new_status;
139
8.67k
    } else {
140
8.67k
      auto backoff_strategy = rpc::BackoffStrategy::kLinear;
141
8.67k
      if (rpc::RpcError(new_status) == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
142
0
        backoff_strategy = rpc::BackoffStrategy::kExponential;
143
0
      }
144
8.67k
      new_status = mutable_retrier()->DelayedRetry(this, new_status, backoff_strategy);
145
8.67k
      if (new_status.ok()) {
146
8.67k
        return;
147
8.67k
      }
148
419k
    }
149
8.67k
  }
150
151
419k
  auto retained_self = client_data_->rpcs_.Unregister(&retained_self_);
152
153
419k
  ProcessResponse(new_status);
154
419k
}
155
156
13.8k
std::string ClientMasterRpcBase::LogPrefix() const {
157
13.8k
  return AsString(this) + ": ";
158
13.8k
}
159
160
} // namespace internal
161
} // namespace client
162
} // namespace yb