/Users/deen/code/yugabyte-db/src/yb/master/flush_manager.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/flush_manager.h" |
15 | | |
16 | | #include <glog/logging.h> |
17 | | |
18 | | #include "yb/master/async_flush_tablets_task.h" |
19 | | #include "yb/master/catalog_entity_info.h" |
20 | | #include "yb/master/catalog_manager_if.h" |
21 | | #include "yb/master/master_admin.pb.h" |
22 | | #include "yb/master/master_error.h" |
23 | | #include "yb/master/master_util.h" |
24 | | #include "yb/master/ts_descriptor.h" |
25 | | |
26 | | #include "yb/util/status_log.h" |
27 | | #include "yb/util/trace.h" |
28 | | |
29 | | namespace yb { |
30 | | namespace master { |
31 | | |
32 | | using std::map; |
33 | | using std::vector; |
34 | | |
35 | | Status FlushManager::FlushTables(const FlushTablesRequestPB* req, |
36 | 7 | FlushTablesResponsePB* resp) { |
37 | 7 | LOG(INFO) << "Servicing FlushTables request: " << req->ShortDebugString(); |
38 | | |
39 | | // Check request. |
40 | 7 | if (req->tables_size() == 0) { |
41 | 0 | const Status s = STATUS(IllegalState, "Empty table list in flush tables request", |
42 | 0 | req->ShortDebugString(), MasterError(MasterErrorPB::INVALID_REQUEST)); |
43 | 0 | return SetupError(resp->mutable_error(), s); |
44 | 0 | } |
45 | | |
46 | | // Create a new flush request UUID. |
47 | 7 | const FlushRequestId flush_id = catalog_manager_->GenerateId(); |
48 | | |
49 | 7 | const auto tables = VERIFY_RESULT( |
50 | 7 | catalog_manager_->CollectTables(req->tables(), req->add_indexes())); |
51 | | |
52 | | // Per TS tablet lists for all provided tables. |
53 | 7 | map<TabletServerId, vector<TabletId>> ts_tablet_map; |
54 | 7 | scoped_refptr<TableInfo> table; |
55 | | |
56 | 7 | for (const TableDescription& table_description : tables) { |
57 | 7 | table = table_description.table_info; |
58 | | |
59 | | // Prepare per Tablet Server tablet lists. |
60 | 7 | for (const scoped_refptr<TabletInfo>& tablet : table_description.tablet_infos) { |
61 | 7 | TRACE("Locking tablet"); |
62 | 7 | auto l = tablet->LockForRead(); |
63 | | |
64 | 7 | auto locs = tablet->GetReplicaLocations(); |
65 | 21 | for (const TabletReplicaMap::value_type& replica : *locs) { |
66 | 21 | const TabletServerId ts_uuid = replica.second.ts_desc->permanent_uuid(); |
67 | 21 | ts_tablet_map[ts_uuid].push_back(tablet->id()); |
68 | 21 | } |
69 | 7 | } |
70 | 7 | } |
71 | | |
72 | 7 | DCHECK_GT(ts_tablet_map.size(), 0); |
73 | | |
74 | 7 | { |
75 | 7 | std::lock_guard<LockType> l(lock_); |
76 | 7 | TRACE("Acquired flush manager lock"); |
77 | | |
78 | | // Init Tablet Server id lists in memory storage. |
79 | 7 | TSFlushingInfo& flush_info = flush_requests_[flush_id]; |
80 | 7 | flush_info.clear(); |
81 | | |
82 | 21 | for (const auto& ts : ts_tablet_map) { |
83 | 21 | flush_info.ts_flushing_.insert(ts.first); |
84 | 21 | } |
85 | | |
86 | 7 | DCHECK_EQ(flush_info.ts_flushing_.size(), ts_tablet_map.size()); |
87 | 7 | } |
88 | | |
89 | | // Clean-up complete flushing requests. |
90 | 7 | DeleteCompleteFlushRequests(); |
91 | | |
92 | 7 | DCHECK_ONLY_NOTNULL(table.get()); |
93 | | |
94 | 7 | const bool is_compaction = req->is_compaction(); |
95 | | |
96 | | // Send FlushTablets requests to all Tablet Servers (one TS - one request). |
97 | 21 | for (const auto& ts : ts_tablet_map) { |
98 | | // Using last table async task queue. |
99 | 21 | SendFlushTabletsRequest(ts.first, table, ts.second, flush_id, is_compaction); |
100 | 21 | } |
101 | | |
102 | 7 | resp->set_flush_request_id(flush_id); |
103 | | |
104 | 7 | LOG(INFO) << "Successfully started flushing request " << flush_id; |
105 | | |
106 | 7 | return Status::OK(); |
107 | 7 | } |
108 | | |
109 | | Status FlushManager::IsFlushTablesDone(const IsFlushTablesDoneRequestPB* req, |
110 | 19 | IsFlushTablesDoneResponsePB* resp) { |
111 | 19 | LOG(INFO) << "Servicing IsFlushTablesDone request: " << req->ShortDebugString(); |
112 | | |
113 | 19 | std::lock_guard<LockType> l(lock_); |
114 | 19 | TRACE("Acquired flush manager lock"); |
115 | | |
116 | | // Check flush request id. |
117 | 19 | const FlushRequestMap::const_iterator it = flush_requests_.find(req->flush_request_id()); |
118 | | |
119 | 19 | if (it == flush_requests_.end()) { |
120 | 0 | const Status s = STATUS(NotFound, "The flush request was not found", req->flush_request_id(), |
121 | 0 | MasterError(MasterErrorPB::INVALID_REQUEST)); |
122 | 0 | return SetupError(resp->mutable_error(), s); |
123 | 0 | } |
124 | | |
125 | 19 | const TSFlushingInfo& flush_info = it->second; |
126 | 19 | resp->set_done(flush_info.ts_flushing_.empty()); |
127 | 19 | resp->set_success(flush_info.ts_failed_.empty()); |
128 | | |
129 | 0 | VLOG(1) << "IsFlushTablesDone request: " << req->flush_request_id() |
130 | 0 | << " Done: " << resp->done() << " Success " << resp->success(); |
131 | 19 | return Status::OK(); |
132 | 19 | } |
133 | | |
134 | | void FlushManager::SendFlushTabletsRequest(const TabletServerId& ts_uuid, |
135 | | const scoped_refptr<TableInfo>& table, |
136 | | const vector<TabletId>& tablet_ids, |
137 | | const FlushRequestId& flush_id, |
138 | 21 | const bool is_compaction) { |
139 | 21 | auto call = std::make_shared<AsyncFlushTablets>( |
140 | 21 | master_, catalog_manager_->AsyncTaskPool(), ts_uuid, table, tablet_ids, flush_id, |
141 | 21 | is_compaction); |
142 | 21 | table->AddTask(call); |
143 | 21 | WARN_NOT_OK(catalog_manager_->ScheduleTask(call), "Failed to send flush tablets request"); |
144 | 21 | } |
145 | | |
146 | | void FlushManager::HandleFlushTabletsResponse(const FlushRequestId& flush_id, |
147 | | const TabletServerId& ts_uuid, |
148 | 21 | const Status& status) { |
149 | 21 | LOG(INFO) << "Handling Flush Tablets Response from TS " << ts_uuid |
150 | 21 | << ". Status: " << status << ". Flush request id: " << flush_id; |
151 | | |
152 | 21 | std::lock_guard<LockType> l(lock_); |
153 | 21 | TRACE("Acquired flush manager lock"); |
154 | | |
155 | | // Check current flush request id. |
156 | 21 | const FlushRequestMap::iterator it = flush_requests_.find(flush_id); |
157 | | |
158 | 21 | if (it == flush_requests_.end()) { |
159 | 0 | LOG(WARNING) << "Old flush request id is in the flush tablets response: " << flush_id; |
160 | 0 | return; |
161 | 0 | } |
162 | | |
163 | 21 | TSFlushingInfo& flush_info = it->second; |
164 | | |
165 | 21 | if (flush_info.ts_flushing_.erase(ts_uuid) > 0) { |
166 | 21 | (status.IsOk() ? flush_info.ts_succeed_ : flush_info.ts_failed_).insert(ts_uuid); |
167 | | |
168 | | // Finish this flush request operation. |
169 | 21 | if (flush_info.ts_flushing_.empty()) { |
170 | 7 | if (flush_info.ts_failed_.empty()) { |
171 | 7 | LOG(INFO) << "Successfully complete flush table request: " << flush_id; |
172 | 0 | } else { |
173 | 0 | LOG(WARNING) << "Failed flush table request: " << flush_id; |
174 | 0 | } |
175 | 7 | } |
176 | 21 | } |
177 | | |
178 | 0 | VLOG(1) << "Flush table request: " << flush_id |
179 | 0 | << ". Flushing " << flush_info.ts_flushing_.size() |
180 | 0 | << "; Succeed " << flush_info.ts_succeed_.size() |
181 | 0 | << "; Failed " << flush_info.ts_failed_.size() << " TServers"; |
182 | 21 | } |
183 | | |
184 | 7 | void FlushManager::DeleteCompleteFlushRequests() { |
185 | 7 | std::lock_guard<LockType> l(lock_); |
186 | 7 | TRACE("Acquired flush manager lock"); |
187 | | |
188 | | // Clean-up complete flushing requests. |
189 | 14 | for (FlushRequestMap::iterator it = flush_requests_.begin(); it != flush_requests_.end();) { |
190 | 7 | if (it->second.ts_flushing_.empty()) { |
191 | 0 | it = flush_requests_.erase(it); |
192 | 7 | } else { |
193 | 7 | ++it; |
194 | 7 | } |
195 | 7 | } |
196 | 7 | } |
197 | | |
198 | | } // namespace master |
199 | | } // namespace yb |