YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/flush_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/flush_manager.h"
15
16
#include <glog/logging.h>
17
18
#include "yb/master/async_flush_tablets_task.h"
19
#include "yb/master/catalog_entity_info.h"
20
#include "yb/master/catalog_manager_if.h"
21
#include "yb/master/master_admin.pb.h"
22
#include "yb/master/master_error.h"
23
#include "yb/master/master_util.h"
24
#include "yb/master/ts_descriptor.h"
25
26
#include "yb/util/status_log.h"
27
#include "yb/util/trace.h"
28
29
namespace yb {
30
namespace master {
31
32
using std::map;
33
using std::vector;
34
35
Status FlushManager::FlushTables(const FlushTablesRequestPB* req,
36
7
                                 FlushTablesResponsePB* resp) {
37
7
  LOG(INFO) << "Servicing FlushTables request: " << req->ShortDebugString();
38
39
  // Check request.
40
7
  if (req->tables_size() == 0) {
41
0
    const Status s = STATUS(IllegalState, "Empty table list in flush tables request",
42
0
                            req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST));
43
0
    return SetupError(resp->mutable_error(), s);
44
0
  }
45
46
  // Create a new flush request UUID.
47
7
  const FlushRequestId flush_id = catalog_manager_->GenerateId();
48
49
7
  const auto tables = VERIFY_RESULT(
50
7
      catalog_manager_->CollectTables(req->tables(), req->add_indexes()));
51
52
  // Per TS tablet lists for all provided tables.
53
7
  map<TabletServerId, vector<TabletId>> ts_tablet_map;
54
7
  scoped_refptr<TableInfo> table;
55
56
7
  for (const TableDescription& table_description : tables) {
57
7
    table = table_description.table_info;
58
59
    // Prepare per Tablet Server tablet lists.
60
7
    for (const scoped_refptr<TabletInfo>& tablet : table_description.tablet_infos) {
61
7
      TRACE("Locking tablet");
62
7
      auto l = tablet->LockForRead();
63
64
7
      auto locs = tablet->GetReplicaLocations();
65
21
      for (const TabletReplicaMap::value_type& replica : *locs) {
66
21
        const TabletServerId ts_uuid = replica.second.ts_desc->permanent_uuid();
67
21
        ts_tablet_map[ts_uuid].push_back(tablet->id());
68
21
      }
69
7
    }
70
7
  }
71
72
7
  DCHECK_GT(ts_tablet_map.size(), 0);
73
74
7
  {
75
7
    std::lock_guard<LockType> l(lock_);
76
7
    TRACE("Acquired flush manager lock");
77
78
    // Init Tablet Server id lists in memory storage.
79
7
    TSFlushingInfo& flush_info = flush_requests_[flush_id];
80
7
    flush_info.clear();
81
82
21
    for (const auto& ts : ts_tablet_map) {
83
21
      flush_info.ts_flushing_.insert(ts.first);
84
21
    }
85
86
7
    DCHECK_EQ(flush_info.ts_flushing_.size(), ts_tablet_map.size());
87
7
  }
88
89
  // Clean-up complete flushing requests.
90
7
  DeleteCompleteFlushRequests();
91
92
7
  DCHECK_ONLY_NOTNULL(table.get());
93
94
7
  const bool is_compaction = req->is_compaction();
95
96
  // Send FlushTablets requests to all Tablet Servers (one TS - one request).
97
21
  for (const auto& ts : ts_tablet_map) {
98
    // Using last table async task queue.
99
21
    SendFlushTabletsRequest(ts.first, table, ts.second, flush_id, is_compaction);
100
21
  }
101
102
7
  resp->set_flush_request_id(flush_id);
103
104
7
  LOG(INFO) << "Successfully started flushing request " << flush_id;
105
106
7
  return Status::OK();
107
7
}
108
109
Status FlushManager::IsFlushTablesDone(const IsFlushTablesDoneRequestPB* req,
110
19
                                       IsFlushTablesDoneResponsePB* resp) {
111
19
  LOG(INFO) << "Servicing IsFlushTablesDone request: " << req->ShortDebugString();
112
113
19
  std::lock_guard<LockType> l(lock_);
114
19
  TRACE("Acquired flush manager lock");
115
116
  // Check flush request id.
117
19
  const FlushRequestMap::const_iterator it = flush_requests_.find(req->flush_request_id());
118
119
19
  if (it == flush_requests_.end()) {
120
0
    const Status s = STATUS(NotFound, "The flush request was not found", req->flush_request_id(),
121
0
                            MasterError(MasterErrorPB::INVALID_REQUEST));
122
0
    return SetupError(resp->mutable_error(), s);
123
0
  }
124
125
19
  const TSFlushingInfo& flush_info = it->second;
126
19
  resp->set_done(flush_info.ts_flushing_.empty());
127
19
  resp->set_success(flush_info.ts_failed_.empty());
128
129
0
  VLOG(1) << "IsFlushTablesDone request: " << req->flush_request_id()
130
0
          << " Done: " << resp->done() << " Success " << resp->success();
131
19
  return Status::OK();
132
19
}
133
134
void FlushManager::SendFlushTabletsRequest(const TabletServerId& ts_uuid,
135
                                           const scoped_refptr<TableInfo>& table,
136
                                           const vector<TabletId>& tablet_ids,
137
                                           const FlushRequestId& flush_id,
138
21
                                           const bool is_compaction) {
139
21
  auto call = std::make_shared<AsyncFlushTablets>(
140
21
      master_, catalog_manager_->AsyncTaskPool(), ts_uuid, table, tablet_ids, flush_id,
141
21
      is_compaction);
142
21
  table->AddTask(call);
143
21
  WARN_NOT_OK(catalog_manager_->ScheduleTask(call), "Failed to send flush tablets request");
144
21
}
145
146
void FlushManager::HandleFlushTabletsResponse(const FlushRequestId& flush_id,
147
                                              const TabletServerId& ts_uuid,
148
21
                                              const Status& status) {
149
21
  LOG(INFO) << "Handling Flush Tablets Response from TS " << ts_uuid
150
21
            << ". Status: " << status << ". Flush request id: " << flush_id;
151
152
21
  std::lock_guard<LockType> l(lock_);
153
21
  TRACE("Acquired flush manager lock");
154
155
  // Check current flush request id.
156
21
  const FlushRequestMap::iterator it = flush_requests_.find(flush_id);
157
158
21
  if (it == flush_requests_.end()) {
159
0
    LOG(WARNING) << "Old flush request id is in the flush tablets response: " << flush_id;
160
0
    return;
161
0
  }
162
163
21
  TSFlushingInfo& flush_info = it->second;
164
165
21
  if (flush_info.ts_flushing_.erase(ts_uuid) > 0) {
166
21
    (status.IsOk() ? flush_info.ts_succeed_ : flush_info.ts_failed_).insert(ts_uuid);
167
168
    // Finish this flush request operation.
169
21
    if (flush_info.ts_flushing_.empty()) {
170
7
      if (flush_info.ts_failed_.empty()) {
171
7
        LOG(INFO) << "Successfully complete flush table request: " << flush_id;
172
0
      } else {
173
0
        LOG(WARNING) << "Failed flush table request: " << flush_id;
174
0
      }
175
7
    }
176
21
  }
177
178
0
  VLOG(1) << "Flush table request: " << flush_id
179
0
          << ". Flushing " << flush_info.ts_flushing_.size()
180
0
          << "; Succeed " << flush_info.ts_succeed_.size()
181
0
          << "; Failed " << flush_info.ts_failed_.size() << " TServers";
182
21
}
183
184
7
void FlushManager::DeleteCompleteFlushRequests() {
185
7
  std::lock_guard<LockType> l(lock_);
186
7
  TRACE("Acquired flush manager lock");
187
188
  // Clean-up complete flushing requests.
189
14
  for (FlushRequestMap::iterator it = flush_requests_.begin(); it != flush_requests_.end();) {
190
7
    if (it->second.ts_flushing_.empty()) {
191
0
      it = flush_requests_.erase(it);
192
7
    } else {
193
7
      ++it;
194
7
    }
195
7
  }
196
7
}
197
198
} // namespace master
199
} // namespace yb