/Users/deen/code/yugabyte-db/src/yb/master/backfill_index.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/backfill_index.h" |
15 | | |
16 | | #include <pthread.h> |
17 | | #include <stdlib.h> |
18 | | #include <sys/types.h> |
19 | | |
20 | | #include <algorithm> |
21 | | #include <bitset> |
22 | | #include <functional> |
23 | | #include <memory> |
24 | | #include <mutex> |
25 | | #include <set> |
26 | | #include <string> |
27 | | #include <unordered_map> |
28 | | #include <vector> |
29 | | |
30 | | #include <boost/optional.hpp> |
31 | | #include <boost/preprocessor/cat.hpp> |
32 | | #include <glog/logging.h> |
33 | | |
34 | | #include "yb/common/partial_row.h" |
35 | | #include "yb/common/partition.h" |
36 | | #include "yb/common/wire_protocol.h" |
37 | | |
38 | | #include "yb/docdb/doc_rowwise_iterator.h" |
39 | | |
40 | | #include "yb/gutil/atomicops.h" |
41 | | #include "yb/gutil/callback.h" |
42 | | #include "yb/gutil/casts.h" |
43 | | #include "yb/gutil/map-util.h" |
44 | | #include "yb/gutil/mathlimits.h" |
45 | | #include "yb/gutil/ref_counted.h" |
46 | | #include "yb/gutil/stl_util.h" |
47 | | #include "yb/gutil/strings/escaping.h" |
48 | | #include "yb/gutil/strings/join.h" |
49 | | #include "yb/gutil/strings/substitute.h" |
50 | | #include "yb/gutil/sysinfo.h" |
51 | | |
52 | | #include "yb/master/master_fwd.h" |
53 | | #include "yb/master/async_rpc_tasks.h" |
54 | | #include "yb/master/catalog_manager.h" |
55 | | #include "yb/master/master.h" |
56 | | #include "yb/master/master_ddl.pb.h" |
57 | | #include "yb/master/sys_catalog.h" |
58 | | |
59 | | #include "yb/tablet/tablet.h" |
60 | | #include "yb/tablet/tablet_metadata.h" |
61 | | #include "yb/tablet/tablet_peer.h" |
62 | | |
63 | | #include "yb/tserver/tserver_admin.proxy.h" |
64 | | |
65 | | #include "yb/util/flag_tags.h" |
66 | | #include "yb/util/format.h" |
67 | | #include "yb/util/math_util.h" |
68 | | #include "yb/util/monotime.h" |
69 | | #include "yb/util/random_util.h" |
70 | | #include "yb/util/result.h" |
71 | | #include "yb/util/status_format.h" |
72 | | #include "yb/util/status_log.h" |
73 | | #include "yb/util/threadpool.h" |
74 | | #include "yb/util/trace.h" |
75 | | #include "yb/util/uuid.h" |
76 | | |
77 | | DEFINE_int32(ysql_index_backfill_rpc_timeout_ms, 60 * 1000, // 1 min. |
78 | | "Timeout used by the master when attempting to backfill a YSQL tablet during index " |
79 | | "creation."); |
80 | | TAG_FLAG(ysql_index_backfill_rpc_timeout_ms, advanced); |
81 | | TAG_FLAG(ysql_index_backfill_rpc_timeout_ms, runtime); |
82 | | |
83 | | DEFINE_int32(index_backfill_rpc_timeout_ms, 1 * 30 * 1000, // 30 sec. |
84 | | "Timeout used by the master when attempting to backfill a tablet " |
85 | | "during index creation."); |
86 | | TAG_FLAG(index_backfill_rpc_timeout_ms, advanced); |
87 | | TAG_FLAG(index_backfill_rpc_timeout_ms, runtime); |
88 | | |
89 | | DEFINE_int32(index_backfill_rpc_max_retries, 150, |
90 | | "Number of times to retry backfilling a tablet chunk " |
91 | | "during index creation."); |
92 | | TAG_FLAG(index_backfill_rpc_max_retries, advanced); |
93 | | TAG_FLAG(index_backfill_rpc_max_retries, runtime); |
94 | | |
95 | | DEFINE_int32(index_backfill_rpc_max_delay_ms, 10 * 60 * 1000, // 10 min. |
96 | | "Maximum delay before retrying a backfill tablet chunk request " |
97 | | "during index creation."); |
98 | | TAG_FLAG(index_backfill_rpc_max_delay_ms, advanced); |
99 | | TAG_FLAG(index_backfill_rpc_max_delay_ms, runtime); |
100 | | |
101 | | DEFINE_int32(index_backfill_wait_for_alter_table_completion_ms, 100, |
102 | | "Delay before retrying to see if an in-progress alter table has " |
103 | | "completed, during index backfill."); |
104 | | TAG_FLAG(index_backfill_wait_for_alter_table_completion_ms, advanced); |
105 | | TAG_FLAG(index_backfill_wait_for_alter_table_completion_ms, runtime); |
106 | | |
107 | | DEFINE_bool(defer_index_backfill, false, |
108 | | "Defer index backfill so that backfills can be performed as a batch later on."); |
109 | | TAG_FLAG(defer_index_backfill, advanced); |
110 | | TAG_FLAG(defer_index_backfill, runtime); |
111 | | |
112 | | DEFINE_test_flag(int32, slowdown_backfill_alter_table_rpcs_ms, 0, |
113 | | "Slows down the send alter table rpc's so that the master may be stopped between " |
114 | | "different phases."); |
115 | | |
116 | | DEFINE_test_flag( |
117 | | int32, slowdown_backfill_job_deletion_ms, 0, |
118 | | "Slows down backfill job deletion so that backfill job can be read by test."); |
119 | | |
120 | | namespace yb { |
121 | | namespace master { |
122 | | |
123 | | using namespace std::literals; |
124 | | using server::MonitoredTaskState; |
125 | | using strings::Substitute; |
126 | | using tserver::TabletServerErrorPB; |
127 | | |
128 | | namespace { |
129 | | |
130 | | // Peek into pg_index table to get an index (boolean) status from YSQL perspective. |
131 | | Result<bool> GetPgIndexStatus( |
132 | | CatalogManager* catalog_manager, |
133 | | const TableId& idx_id, |
134 | 0 | const std::string& status_col_name) { |
135 | 0 | const auto pg_index_id = |
136 | 0 | GetPgsqlTableId(VERIFY_RESULT(GetPgsqlDatabaseOid(idx_id)), kPgIndexTableOid); |
137 | |
|
138 | 0 | const tablet::Tablet* catalog_tablet = |
139 | 0 | catalog_manager->tablet_peer()->tablet(); |
140 | 0 | const Schema& pg_index_schema = |
141 | 0 | *VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_index_id))->schema; |
142 | |
|
143 | 0 | Schema projection; |
144 | 0 | RETURN_NOT_OK(pg_index_schema.CreateProjectionByNames({"indexrelid", status_col_name}, |
145 | 0 | &projection, |
146 | 0 | pg_index_schema.num_key_columns())); |
147 | |
|
148 | 0 | const auto indexrelid_col_id = VERIFY_RESULT(projection.ColumnIdByName("indexrelid")).rep(); |
149 | 0 | const auto status_col_id = VERIFY_RESULT(projection.ColumnIdByName(status_col_name)).rep(); |
150 | |
|
151 | 0 | const auto idx_oid = VERIFY_RESULT(GetPgsqlTableOid(idx_id)); |
152 | |
|
153 | 0 | auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator(projection.CopyWithoutColumnIds(), |
154 | 0 | {} /* read_hybrid_time */, |
155 | 0 | pg_index_id)); |
156 | | |
157 | | // Filtering by 'indexrelid' == idx_oid. |
158 | 0 | { |
159 | 0 | auto doc_iter = down_cast<docdb::DocRowwiseIterator*>(iter.get()); |
160 | 0 | PgsqlConditionPB cond; |
161 | 0 | cond.add_operands()->set_column_id(indexrelid_col_id); |
162 | 0 | cond.set_op(QL_OP_EQUAL); |
163 | 0 | cond.add_operands()->mutable_value()->set_uint32_value(idx_oid); |
164 | 0 | const std::vector<docdb::PrimitiveValue> empty_key_components; |
165 | 0 | docdb::DocPgsqlScanSpec spec(projection, |
166 | 0 | rocksdb::kDefaultQueryId, |
167 | 0 | empty_key_components, |
168 | 0 | empty_key_components, |
169 | 0 | &cond, |
170 | 0 | boost::none /* hash_code */, |
171 | 0 | boost::none /* max_hash_code */, |
172 | 0 | nullptr /* where_expr */); |
173 | 0 | RETURN_NOT_OK(doc_iter->Init(spec)); |
174 | 0 | } |
175 | | |
176 | | // Expecting one row at most. |
177 | 0 | QLTableRow row; |
178 | 0 | if (VERIFY_RESULT(iter->HasNext())) { |
179 | 0 | RETURN_NOT_OK(iter->NextRow(&row)); |
180 | 0 | return row.GetColumn(status_col_id)->bool_value(); |
181 | 0 | } |
182 | | |
183 | | // For practical purposes, an absent index is the same as having false status column value. |
184 | 0 | return false; |
185 | 0 | } |
186 | | |
187 | | // Before advancing index permissions, we need to make sure the Postgres side has advanced |
188 | | // sufficiently - that the state tracked in pg_index hasn't fallen behind the desired |
189 | | // permission by more than one step. |
190 | | Result<bool> ShouldProceedWithPgsqlIndexPermissionUpdate( |
191 | | CatalogManager* catalog_manager, |
192 | | const TableId& idx_id, |
193 | 88 | IndexPermissions new_perm) { |
194 | | // TODO(alex, jason): Add the appropriate cases for dropping index path |
195 | 88 | switch (new_perm) { |
196 | 0 | case INDEX_PERM_WRITE_AND_DELETE: { |
197 | 0 | auto live = VERIFY_RESULT(GetPgIndexStatus(catalog_manager, idx_id, "indislive")); |
198 | 0 | if (!live) { |
199 | 0 | VLOG(1) << "Index " << idx_id << " is not yet live, skipping permission update"; |
200 | 0 | } |
201 | 0 | return live; |
202 | 0 | } |
203 | 0 | case INDEX_PERM_DO_BACKFILL: { |
204 | 0 | auto ready = VERIFY_RESULT(GetPgIndexStatus(catalog_manager, idx_id, "indisready")); |
205 | 0 | if (!ready) { |
206 | 0 | VLOG(1) << "Index " << idx_id << " is not yet ready, skipping permission update"; |
207 | 0 | } |
208 | 0 | return ready; |
209 | 0 | } |
210 | 88 | default: |
211 | | // No need to wait for anything |
212 | 88 | return true; |
213 | 88 | } |
214 | 88 | } |
215 | | |
216 | | } // namespace |
217 | | |
218 | 2.51k | void MultiStageAlterTable::CopySchemaDetailsToFullyApplied(SysTablesEntryPB* pb) { |
219 | 0 | VLOG(4) << "Setting fully_applied_schema_version to " << pb->version(); |
220 | 2.51k | pb->mutable_fully_applied_schema()->CopyFrom(pb->schema()); |
221 | 2.51k | pb->set_fully_applied_schema_version(pb->version()); |
222 | 2.51k | pb->mutable_fully_applied_indexes()->CopyFrom(pb->indexes()); |
223 | 2.51k | if (pb->has_index_info()) { |
224 | 0 | pb->mutable_fully_applied_index_info()->CopyFrom(pb->index_info()); |
225 | 0 | } |
226 | 2.51k | } |
227 | | |
228 | | Status MultiStageAlterTable::ClearFullyAppliedAndUpdateState( |
229 | | CatalogManager* catalog_manager, |
230 | | const scoped_refptr<TableInfo>& table, |
231 | | boost::optional<uint32_t> expected_version, |
232 | 4.39k | bool update_state_to_running) { |
233 | 4.39k | auto l = table->LockForWrite(); |
234 | 4.39k | uint32_t current_version = l->pb.version(); |
235 | 4.39k | if (expected_version && *expected_version != current_version) { |
236 | 3 | return STATUS(AlreadyPresent, "Table has already moved to a different version."); |
237 | 3 | } |
238 | 4.39k | l.mutable_data()->pb.clear_fully_applied_schema(); |
239 | 4.39k | l.mutable_data()->pb.clear_fully_applied_schema_version(); |
240 | 4.39k | l.mutable_data()->pb.clear_fully_applied_indexes(); |
241 | 4.39k | l.mutable_data()->pb.clear_fully_applied_index_info(); |
242 | 3.89k | auto new_state = update_state_to_running ? SysTablesEntryPB::RUNNING : SysTablesEntryPB::ALTERING; |
243 | 4.39k | l.mutable_data()->set_state(new_state, Format("Current schema version=$0", current_version)); |
244 | | |
245 | 4.39k | Status s = catalog_manager->sys_catalog_->Upsert(catalog_manager->leader_ready_term(), table); |
246 | 4.39k | if (!s.ok()) { |
247 | 0 | LOG(WARNING) << "An error occurred while updating sys-tables: " << s.ToString() |
248 | 0 | << ". This master may not be the leader anymore."; |
249 | 0 | return s; |
250 | 0 | } |
251 | | |
252 | 4.39k | l.Commit(); |
253 | 4.39k | LOG(INFO) << table->ToString() << " - Alter table completed version=" << current_version |
254 | 4.39k | << ", state: " << SysTablesEntryPB::State_Name(new_state); |
255 | 4.39k | return Status::OK(); |
256 | 4.39k | } |
257 | | |
258 | | Result<bool> MultiStageAlterTable::UpdateIndexPermission( |
259 | | CatalogManager* catalog_manager, |
260 | | const scoped_refptr<TableInfo>& indexed_table, |
261 | | const std::unordered_map<TableId, IndexPermissions>& perm_mapping, |
262 | 1.65k | boost::optional<uint32_t> current_version) { |
263 | 1.65k | TRACE(__func__); |
264 | 5 | DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table); |
265 | 1.65k | if (FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms > 0) { |
266 | 179 | TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms); |
267 | 5 | DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table) << " sleeping for " |
268 | 5 | << FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms |
269 | 5 | << "ms BEFORE updating the index permission to " << ToString(perm_mapping); |
270 | 179 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms)); |
271 | 6 | DVLOG(3) << __PRETTY_FUNCTION__ << " Done Sleeping"; |
272 | 179 | TRACE("Done Sleeping"); |
273 | 179 | } |
274 | | |
275 | 1.65k | bool permissions_updated = false; |
276 | 1.65k | { |
277 | 1.65k | TRACE("Locking indexed table"); |
278 | 1.65k | auto l = indexed_table->LockForWrite(); |
279 | 1.65k | auto& indexed_table_data = *l.mutable_data(); |
280 | 1.65k | auto& indexed_table_pb = indexed_table_data.pb; |
281 | 1.65k | if (current_version && *current_version != indexed_table_pb.version()) { |
282 | 160 | LOG(INFO) << "The table schema version " |
283 | 160 | << "seems to have already been updated to " << indexed_table_pb.version() |
284 | 160 | << " We wanted to do this update at " << *current_version; |
285 | 160 | return STATUS_SUBSTITUTE( |
286 | 160 | AlreadyPresent, "Schema was already updated to $0 before we got to it (expected $1).", |
287 | 160 | indexed_table_pb.version(), *current_version); |
288 | 160 | } |
289 | | |
290 | 1.49k | CopySchemaDetailsToFullyApplied(&indexed_table_pb); |
291 | 1.49k | bool is_pgsql = indexed_table_pb.table_type() == TableType::PGSQL_TABLE_TYPE; |
292 | 4.25k | for (int i = 0; i < indexed_table_pb.indexes_size(); i++) { |
293 | 2.76k | IndexInfoPB* idx_pb = indexed_table_pb.mutable_indexes(i); |
294 | 2.76k | auto& idx_table_id = idx_pb->table_id(); |
295 | 2.76k | if (perm_mapping.find(idx_table_id) != perm_mapping.end()) { |
296 | 1.51k | const auto new_perm = perm_mapping.at(idx_table_id); |
297 | 1.51k | if (idx_pb->index_permissions() >= new_perm) { |
298 | 1 | LOG(WARNING) << "Index " << idx_pb->table_id() << " on table " |
299 | 1 | << indexed_table->ToString() << " has index_permission " |
300 | 1 | << IndexPermissions_Name(idx_pb->index_permissions()) << " already past " |
301 | 1 | << IndexPermissions_Name(new_perm) << ". Will not update it"; |
302 | 1 | continue; |
303 | 1 | } |
304 | | // TODO(alex, amit): Non-OK status here should be converted to TryAgain, |
305 | | // which should be handled on an upper level. |
306 | 1.51k | if (is_pgsql && !VERIFY_RESULT(ShouldProceedWithPgsqlIndexPermissionUpdate(catalog_manager, |
307 | 1.51k | idx_table_id, |
308 | 0 | new_perm))) { |
309 | 0 | continue; |
310 | 0 | } |
311 | 1.51k | idx_pb->set_index_permissions(new_perm); |
312 | 1.51k | permissions_updated = true; |
313 | 1.51k | } |
314 | 2.76k | } |
315 | | |
316 | 1.49k | if (permissions_updated) { |
317 | 1.48k | indexed_table_pb.set_version(indexed_table_pb.version() + 1); |
318 | 1.48k | indexed_table_pb.set_updates_only_index_permissions(true); |
319 | 2 | } else { |
320 | 1 | VLOG(1) << "Index permissions update skipped, leaving schema_version at " |
321 | 1 | << indexed_table_pb.version(); |
322 | 2 | } |
323 | 1.49k | indexed_table_data.set_state( |
324 | 1.49k | SysTablesEntryPB::ALTERING, |
325 | 1.49k | Format("Update index permission version=$0 ts=$1", |
326 | 1.49k | indexed_table_pb.version(), LocalTimeAsString())); |
327 | | |
328 | | // Update sys-catalog with the new indexed table info. |
329 | 1.49k | TRACE("Updating indexed table metadata on disk"); |
330 | 1.49k | RETURN_NOT_OK(catalog_manager->sys_catalog_->Upsert( |
331 | 1.49k | catalog_manager->leader_ready_term(), indexed_table)); |
332 | | |
333 | | // Update the in-memory state. |
334 | 1.49k | TRACE("Committing in-memory state"); |
335 | 1.49k | l.Commit(); |
336 | 1.49k | } |
337 | 1.49k | if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms > 0)) { |
338 | 138 | TRACE("Sleeping for $0 ms", |
339 | 138 | FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms); |
340 | 5 | DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table) << " sleeping for " |
341 | 5 | << FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms |
342 | 5 | << "ms AFTER updating the index permission to " << ToString(perm_mapping); |
343 | 138 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms)); |
344 | 12 | DVLOG(3) << __PRETTY_FUNCTION__ << " Done Sleeping"; |
345 | 138 | TRACE("Done Sleeping"); |
346 | 138 | } |
347 | 1.49k | return permissions_updated; |
348 | 1.49k | } |
349 | | |
350 | | Status MultiStageAlterTable::StartBackfillingData( |
351 | | CatalogManager* catalog_manager, |
352 | | const scoped_refptr<TableInfo>& indexed_table, |
353 | | const std::vector<IndexInfoPB>& idx_infos, |
354 | 499 | boost::optional<uint32_t> current_version) { |
355 | | // We leave the table state as ALTERING so that a master failover can resume the backfill. |
356 | 499 | RETURN_NOT_OK(ClearFullyAppliedAndUpdateState( |
357 | 499 | catalog_manager, indexed_table, current_version, /* change_state to RUNNING */ false)); |
358 | | |
359 | 499 | RETURN_NOT_OK(indexed_table->SetIsBackfilling()); |
360 | | |
361 | 452 | TRACE("Starting backfill process"); |
362 | 452 | VLOG(0) << __func__ << " starting backfill on " << indexed_table->ToString() << " for " |
363 | 452 | << yb::ToString(idx_infos); |
364 | | |
365 | 452 | auto ns_info = catalog_manager->FindNamespaceById(indexed_table->namespace_id()); |
366 | 452 | RETURN_NOT_OK_PREPEND(ns_info, "Unable to get namespace info for backfill"); |
367 | | |
368 | 452 | auto backfill_table = std::make_shared<BackfillTable>( |
369 | 452 | catalog_manager->master_, catalog_manager->AsyncTaskPool(), indexed_table, idx_infos, |
370 | 452 | *ns_info); |
371 | 452 | backfill_table->Launch(); |
372 | 452 | return Status::OK(); |
373 | 452 | } |
374 | | |
375 | | // Returns true if the given IndexPermissions is a transient state. |
376 | | // Returns false if it is a state where the index can remain, viz: READ_WRITE_AND_DELETE. |
377 | | // INDEX_UNUSED is considered transient because it needs to delete the index. |
378 | 0 | bool IsTransientState(IndexPermissions perm) { |
379 | 0 | return perm != INDEX_PERM_READ_WRITE_AND_DELETE && perm != INDEX_PERM_NOT_USED; |
380 | 0 | } |
381 | | |
382 | 1.15k | IndexPermissions NextPermission(IndexPermissions perm) { |
383 | 1.15k | switch (perm) { |
384 | 462 | case INDEX_PERM_DELETE_ONLY: |
385 | 462 | return INDEX_PERM_WRITE_AND_DELETE; |
386 | 438 | case INDEX_PERM_WRITE_AND_DELETE: |
387 | 438 | return INDEX_PERM_DO_BACKFILL; |
388 | 0 | case INDEX_PERM_DO_BACKFILL: |
389 | 0 | CHECK(false) << "Not expected to be here."; |
390 | 0 | return INDEX_PERM_DELETE_ONLY; |
391 | 0 | case INDEX_PERM_READ_WRITE_AND_DELETE: |
392 | 0 | CHECK(false) << "Not expected to be here."; |
393 | 0 | return INDEX_PERM_DELETE_ONLY; |
394 | 135 | case INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING: |
395 | 135 | return INDEX_PERM_DELETE_ONLY_WHILE_REMOVING; |
396 | 120 | case INDEX_PERM_DELETE_ONLY_WHILE_REMOVING: |
397 | 120 | return INDEX_PERM_INDEX_UNUSED; |
398 | 0 | case INDEX_PERM_INDEX_UNUSED: |
399 | 0 | case INDEX_PERM_NOT_USED: |
400 | 0 | CHECK(false) << "Not expected to be here."; |
401 | 0 | return INDEX_PERM_DELETE_ONLY; |
402 | 0 | } |
403 | 0 | CHECK(false) << "Not expected to be here."; |
404 | 0 | return INDEX_PERM_DELETE_ONLY; |
405 | 0 | } |
406 | | |
407 | | Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( |
408 | | CatalogManager* catalog_manager, const scoped_refptr<TableInfo>& indexed_table, |
409 | 5.56k | uint32_t current_version, bool respect_backfill_deferrals) { |
410 | 7 | DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table); |
411 | | |
412 | 5.56k | const bool is_ysql_table = (indexed_table->GetTableType() == TableType::PGSQL_TABLE_TYPE); |
413 | 5.56k | const bool defer_backfill = !is_ysql_table && GetAtomicFlag(&FLAGS_defer_index_backfill); |
414 | 5.56k | const bool is_backfilling = indexed_table->IsBackfilling(); |
415 | | |
416 | 5.56k | std::unordered_map<TableId, IndexPermissions> indexes_to_update; |
417 | 5.56k | vector<IndexInfoPB> indexes_to_backfill; |
418 | 5.56k | vector<IndexInfoPB> deferred_indexes; |
419 | 5.56k | vector<IndexInfoPB> indexes_to_delete; |
420 | 5.56k | { |
421 | 5.56k | TRACE("Locking indexed table"); |
422 | 11 | VLOG(1) << ("Locking indexed table"); |
423 | 5.56k | auto l = indexed_table->LockForRead(); |
424 | 7 | VLOG(1) << ("Locked indexed table"); |
425 | 5.56k | if (current_version != l->pb.version()) { |
426 | 0 | LOG(WARNING) << "Somebody launched the next version before we got to it."; |
427 | 0 | return Status::OK(); |
428 | 0 | } |
429 | | |
430 | | // Attempt to find an index that requires us to just launch the next state (i.e. not backfill) |
431 | 9.95k | for (int i = 0; i < l->pb.indexes_size(); i++) { |
432 | 4.38k | const IndexInfoPB& idx_pb = l->pb.indexes(i); |
433 | 4.38k | if (!idx_pb.has_index_permissions()) { |
434 | 402 | continue; |
435 | 402 | } |
436 | 3.98k | if (idx_pb.index_permissions() == INDEX_PERM_DO_BACKFILL) { |
437 | 706 | if (respect_backfill_deferrals && (defer_backfill || idx_pb.is_backfill_deferred())) { |
438 | 23 | LOG(INFO) << "Deferring index-backfill for " << idx_pb.table_id(); |
439 | 23 | deferred_indexes.emplace_back(idx_pb); |
440 | 683 | } else { |
441 | 683 | indexes_to_backfill.emplace_back(idx_pb); |
442 | 683 | } |
443 | 3.27k | } else if (idx_pb.index_permissions() == INDEX_PERM_INDEX_UNUSED) { |
444 | 143 | indexes_to_delete.emplace_back(idx_pb); |
445 | | // For YSQL, there should never be indexes to update from master side because postgres drives |
446 | | // permission changes. |
447 | 3.13k | } else if (idx_pb.index_permissions() != INDEX_PERM_READ_WRITE_AND_DELETE && !is_ysql_table) { |
448 | 1.15k | indexes_to_update.emplace(idx_pb.table_id(), NextPermission(idx_pb.index_permissions())); |
449 | 1.15k | } |
450 | 3.98k | } |
451 | | |
452 | | // TODO(#6218): Do we really not want to continue backfill |
453 | | // across master failovers for YSQL? |
454 | 5.56k | if (!is_ysql_table && !is_backfilling && l.data().pb.backfill_jobs_size() > 0) { |
455 | | // If a backfill job was started for a set of indexes and then the leader |
456 | | // fails over, we should be careful that we are restarting the backfill job |
457 | | // with the same set of indexes. |
458 | | // A new index could have been added since the time the last backfill job started on |
459 | | // the old master. The safe time calculated for the earlier set of indexes may not be |
460 | | // valid for the new index(es) to use. |
461 | 0 | DCHECK(l.data().pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 " |
462 | 0 | "outstanding backfill job."; |
463 | 0 | const BackfillJobPB& backfill_job = l.data().pb.backfill_jobs(0); |
464 | 0 | VLOG(3) << "Found an in-progress backfill-job " << yb::ToString(backfill_job); |
465 | | // Do not allow for any other indexes to piggy back with this backfill. |
466 | 0 | indexes_to_backfill.clear(); |
467 | 0 | deferred_indexes.clear(); |
468 | 0 | for (int i = 0; i < backfill_job.indexes_size(); i++) { |
469 | 0 | const IndexInfoPB& idx_pb = backfill_job.indexes(i); |
470 | 0 | indexes_to_backfill.push_back(idx_pb); |
471 | 0 | } |
472 | 0 | } |
473 | 5.56k | } |
474 | | |
475 | 5.56k | if (indexes_to_update.empty() && |
476 | 4.44k | indexes_to_delete.empty() && |
477 | 4.30k | (is_backfilling || indexes_to_backfill.empty())) { |
478 | 3.89k | TRACE("Not necessary to launch next version"); |
479 | 1 | VLOG(1) << "Not necessary to launch next version"; |
480 | 3.89k | return ClearFullyAppliedAndUpdateState( |
481 | 3.89k | catalog_manager, indexed_table, current_version, /* change state to RUNNING */ true); |
482 | 3.89k | } |
483 | | |
484 | | // For YSQL online schema migration of indexes, instead of master driving the schema changes, |
485 | | // postgres will drive it. Postgres will use three of the DocDB index permissions: |
486 | | // |
487 | | // - INDEX_PERM_WRITE_AND_DELETE (set from the start) |
488 | | // - INDEX_PERM_READ_WRITE_AND_DELETE (set by master) |
489 | | // - INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING (set by master) |
490 | | // |
491 | | // This changes how we treat indexes_to_foo: |
492 | | // |
493 | | // - indexes_to_update should always be empty because we never want master to set index |
494 | | // permissions. |
495 | | // - indexes_to_delete is impossible to be nonempty, and, in the future, when we do use |
496 | | // INDEX_PERM_INDEX_UNUSED, we want to use some other delete trigger that makes sure no |
497 | | // transactions are left using the index. Prepare for that by doing nothing when nonempty. |
498 | | // - indexes_to_backfill is impossible to be nonempty, but, in the future, we want to set |
499 | | // INDEX_PERM_DO_BACKFILL so that backfill resumes on master leader changes. Prepare for that |
500 | | // by handling indexes_to_backfill like for YCQL. |
501 | | // |
502 | | // TODO(jason): when using INDEX_PERM_DO_BACKFILL, update this comment (issue #6218). |
503 | | |
504 | 1.67k | if (!indexes_to_update.empty()) { |
505 | 4 | VLOG(1) << "Updating index permissions for " << yb::ToString(indexes_to_update) << " on " |
506 | 4 | << indexed_table->ToString(); |
507 | 1.11k | Result<bool> permissions_updated = |
508 | 959 | VERIFY_RESULT(UpdateIndexPermission(catalog_manager, indexed_table, indexes_to_update, |
509 | 959 | current_version)); |
510 | | |
511 | 959 | if (!permissions_updated.ok()) { |
512 | 0 | LOG(WARNING) << "Could not update index permissions." |
513 | 0 | << " Possible that the master-leader has changed, or a race " |
514 | 0 | << "with another thread trying to launch next version: " |
515 | 0 | << permissions_updated.ToString(); |
516 | 0 | } |
517 | | |
518 | 959 | if (permissions_updated.ok() && *permissions_updated) { |
519 | 4 | VLOG(1) << "Sending alter table request with updated permissions"; |
520 | 959 | RETURN_NOT_OK(catalog_manager->SendAlterTableRequest(indexed_table)); |
521 | 959 | return Status::OK(); |
522 | 553 | } |
523 | 959 | } |
524 | | |
525 | 553 | if (!indexes_to_delete.empty()) { |
526 | 143 | const auto& index_info_to_update = indexes_to_delete[0]; |
527 | 1 | VLOG(3) << "Deleting the index and the entry in the indexed table for " |
528 | 1 | << yb::ToString(index_info_to_update); |
529 | 143 | DeleteTableRequestPB req; |
530 | 143 | DeleteTableResponsePB resp; |
531 | 143 | req.mutable_table()->set_table_id(index_info_to_update.table_id()); |
532 | 143 | req.set_is_index_table(true); |
533 | 143 | RETURN_NOT_OK(catalog_manager->DeleteTableInternal(&req, &resp, nullptr)); |
534 | 109 | return Status::OK(); |
535 | 410 | } |
536 | | |
537 | 410 | if (!indexes_to_backfill.empty()) { |
538 | 1 | VLOG(3) << "Backfilling " << yb::ToString(indexes_to_backfill) |
539 | 1 | << (deferred_indexes.empty() |
540 | 1 | ? "" |
541 | 0 | : yb::Format(" along with deferred indexes $0", |
542 | 0 | yb::ToString(deferred_indexes))); |
543 | 4 | for (auto& deferred_idx : deferred_indexes) { |
544 | 4 | indexes_to_backfill.emplace_back(deferred_idx); |
545 | 4 | } |
546 | 410 | WARN_NOT_OK( |
547 | 410 | StartBackfillingData( |
548 | 410 | catalog_manager, indexed_table.get(), indexes_to_backfill, current_version), |
549 | 410 | yb::Format("Could not launch backfill for $0", indexed_table->ToString())); |
550 | 410 | } |
551 | | |
552 | 410 | return Status::OK(); |
553 | 410 | } |
554 | | |
555 | | // ----------------------------------------------------------------------------------------------- |
556 | | // BackfillTableJob |
557 | | // ----------------------------------------------------------------------------------------------- |
558 | 0 | std::string BackfillTableJob::description() const { |
559 | 0 | const std::shared_ptr<BackfillTable> retain_bt = backfill_table_; |
560 | 0 | auto curr_state = state(); |
561 | 0 | if (!IsStateTerminal(curr_state) && retain_bt) { |
562 | 0 | return retain_bt->description(); |
563 | 0 | } else if (curr_state == MonitoredTaskState::kFailed) { |
564 | 0 | return Format("Backfilling $0 Failed", requested_index_names_); |
565 | 0 | } else if (curr_state == MonitoredTaskState::kAborted) { |
566 | 0 | return Format("Backfilling $0 Aborted", requested_index_names_); |
567 | 0 | } else { |
568 | 0 | DCHECK(curr_state == MonitoredTaskState::kComplete); |
569 | 0 | return Format("Backfilling $0 Done", requested_index_names_); |
570 | 0 | } |
571 | 0 | } |
572 | | |
573 | 0 | MonitoredTaskState BackfillTableJob::AbortAndReturnPrevState(const Status& status) { |
574 | 0 | auto old_state = state(); |
575 | 0 | while (!IsStateTerminal(old_state)) { |
576 | 0 | if (state_.compare_exchange_strong(old_state, |
577 | 0 | MonitoredTaskState::kAborted)) { |
578 | 0 | return old_state; |
579 | 0 | } |
580 | 0 | old_state = state(); |
581 | 0 | } |
582 | 0 | return old_state; |
583 | 0 | } |
584 | | |
585 | 872 | void BackfillTableJob::SetState(MonitoredTaskState new_state) { |
586 | 872 | auto old_state = state(); |
587 | 872 | if (!IsStateTerminal(old_state)) { |
588 | 871 | if (state_.compare_exchange_strong(old_state, new_state) && IsStateTerminal(new_state)) { |
589 | 419 | MarkDone(); |
590 | 419 | } |
591 | 871 | } |
592 | 872 | } |
593 | | // ----------------------------------------------------------------------------------------------- |
594 | | // BackfillTable |
595 | | // ----------------------------------------------------------------------------------------------- |
596 | | |
597 | | namespace { |
598 | | |
599 | 452 | std::unordered_set<TableId> IndexIdsFromInfos(const std::vector<IndexInfoPB>& indexes) { |
600 | 452 | std::unordered_set<TableId> idx_ids; |
601 | 464 | for (const auto& idx_info : indexes) { |
602 | 464 | idx_ids.insert(idx_info.table_id()); |
603 | 464 | } |
604 | 452 | return idx_ids; |
605 | 452 | } |
606 | | |
607 | | std::string RetrieveIndexNames(CatalogManager* mgr, |
608 | 3.17k | const std::unordered_set<std::string>& index_ids) { |
609 | 3.17k | std::ostringstream out; |
610 | 3.17k | out << "{ "; |
611 | 3.17k | bool first = true; |
612 | 3.22k | for (const auto& index_id : index_ids) { |
613 | 3.22k | const auto table_info = mgr->GetTableInfo(index_id); |
614 | 3.22k | if (!table_info) { |
615 | 0 | LOG(WARNING) << "No table info can be found with index table id " << index_id; |
616 | 0 | continue; |
617 | 0 | } |
618 | 3.22k | if (!first) { |
619 | 47 | out << ", "; |
620 | 47 | } |
621 | 3.22k | first = false; |
622 | | |
623 | 3.22k | out << table_info->name(); |
624 | 3.22k | } |
625 | 3.17k | out << " }"; |
626 | 3.17k | return out.str(); |
627 | 3.17k | } |
628 | | |
629 | | } // namespace |
630 | | |
 | | // Captures the set of indexes to backfill and restores any previously
 | | // persisted backfill-job state (rows processed, chosen backfilling timestamp)
 | | // so that a new master leader can resume an in-flight backfill job.
631 | | BackfillTable::BackfillTable(
632 | | Master* master, ThreadPool* callback_pool, const scoped_refptr<TableInfo>& indexed_table,
633 | | std::vector<IndexInfoPB> indexes, const scoped_refptr<NamespaceInfo>& ns_info)
634 | | : master_(master),
635 | | callback_pool_(callback_pool),
636 | | indexed_table_(indexed_table),
637 | | index_infos_(indexes),
638 | | requested_index_ids_(IndexIdsFromInfos(indexes)),
639 | | requested_index_names_(RetrieveIndexNames(
640 | | master->catalog_manager_impl(), requested_index_ids_)),
641 | 452 | ns_info_(ns_info) {
642 | 452 | auto l = indexed_table_->LockForRead();
643 | 452 | schema_version_ = indexed_table_->metadata().state().pb.version();
644 | 452 | leader_term_ = master_->catalog_manager()->leader_ready_term();
 | | // Resume the processed-row counter from the persisted job, if one exists.
645 | 452 | if (l.data().pb.backfill_jobs_size() > 0) {
646 | 0 | number_rows_processed_.store(l.data().pb.backfill_jobs(0).num_rows_processed());
647 | 452 | } else {
648 | 452 | number_rows_processed_.store(0);
649 | 452 | }
650 | |
 | | // If a backfilling timestamp was already persisted, reuse it instead of
 | | // computing a new safe time; this keeps the backfill read snapshot stable
 | | // across master leader changes.
651 | 452 | const auto& pb = indexed_table_->metadata().state().pb;
652 | 452 | if (pb.backfill_jobs_size() > 0 && pb.backfill_jobs(0).has_backfilling_timestamp() &&
653 | 0 | read_time_for_backfill_.FromUint64(pb.backfill_jobs(0).backfilling_timestamp()).ok()) {
654 | 0 | DCHECK(pb.backfill_jobs_size() == 1) << "Expect only 1 outstanding backfill job";
655 | 0 | DCHECK(implicit_cast<size_t>(pb.backfill_jobs(0).indexes_size()) == index_infos_.size())
656 | 0 | << "Expect to use the same set of indexes.";
657 | 0 | timestamp_chosen_.store(true, std::memory_order_release);
658 | 0 | VLOG_WITH_PREFIX(1) << "Will be using " << read_time_for_backfill_
659 | 0 | << " for backfill";
660 | 452 | } else {
661 | 452 | read_time_for_backfill_ = HybridTime::kInvalid;
662 | 452 | timestamp_chosen_.store(false, std::memory_order_release);
663 | 452 | }
664 | 452 | done_.store(false, std::memory_order_release);
665 | 452 | }
666 | | |
 | | // Returns the ids of the indexes in the (single) outstanding backfill job
 | | // that are still IN_PROGRESS. Returns an empty set if no backfill job is
 | | // present, i.e. some other task already finalized the job.
667 | 8.40k | const std::unordered_set<TableId> BackfillTable::indexes_to_build() const {
668 | 8.40k | std::unordered_set<TableId> indexes_to_build;
669 | 8.40k | {
670 | 8.40k | auto l = indexed_table_->LockForRead();
671 | 8.40k | const auto& indexed_table_pb = l.data().pb;
672 | 8.40k | if (indexed_table_pb.backfill_jobs_size() == 0) {
673 | | // Some other task already marked the backfill job as done.
674 | 5 | return {};
675 | 5 | }
676 | 18.4E | DCHECK(indexed_table_pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 "
677 | 18.4E | "outstanding backfill job.";
 | | // Only indexes still marked IN_PROGRESS need building; SUCCESS/FAILED
 | | // entries are excluded.
678 | 8.51k | for (const auto& kv_pair : indexed_table_pb.backfill_jobs(0).backfill_state()) {
679 | 8.51k | if (kv_pair.second == BackfillJobPB::IN_PROGRESS) {
680 | 8.49k | indexes_to_build.insert(kv_pair.first);
681 | 8.49k | }
682 | 8.51k | }
683 | 8.39k | }
684 | 8.39k | return indexes_to_build;
685 | 8.39k | }
686 | | |
 | | // Starts the backfill job: registers a monitored task, persists the job (and
 | | // its per-index IN_PROGRESS state) into the sys catalog, then either computes
 | | // the safe read time or — if one was already persisted — goes straight to
 | | // backfilling.
687 | 452 | void BackfillTable::Launch() {
688 | 452 | backfill_job_ = std::make_shared<BackfillTableJob>(shared_from_this());
689 | 452 | backfill_job_->SetState(MonitoredTaskState::kRunning);
690 | 452 | master_->catalog_manager_impl()->jobs_tracker_->AddTask(backfill_job_);
691 | |
692 | 452 | {
693 | 452 | auto l = indexed_table_->LockForWrite();
 | | // Only create and persist the job if none exists yet; on resume (job
 | | // already persisted) this step is skipped.
694 | 452 | if (l.data().pb.backfill_jobs_size() == 0) {
695 | 452 | auto* backfill_job = l.mutable_data()->pb.add_backfill_jobs();
696 | 464 | for (const auto& idx_info : index_infos_) {
697 | 464 | backfill_job->add_indexes()->CopyFrom(idx_info);
698 | 464 | backfill_job->mutable_backfill_state()->insert(
699 | 464 | {idx_info.table_id(), BackfillJobPB::IN_PROGRESS});
700 | 464 | }
701 | 452 | auto s = master_->catalog_manager_impl()->sys_catalog_->Upsert(
702 | 452 | leader_term(), indexed_table_);
 | | // If persistence fails, abandon the launch entirely rather than run an
 | | // unpersisted job.
703 | 452 | if (!s.ok()) {
704 | 0 | LOG(WARNING) << "Failed to persist backfill jobs. Abandoning launch. " << s;
705 | 0 | return;
706 | 0 | }
707 | 452 | l.Commit();
708 | 452 | }
709 | 452 | }
710 | 452 | if (!timestamp_chosen_.load(std::memory_order_acquire)) {
711 | 452 | LaunchComputeSafeTimeForRead();
712 | 0 | } else {
713 | 0 | LaunchBackfill();
714 | 0 | }
715 | 452 | }
716 | | |
 | | // Fans out a GetSafeTimeForTablet task to every tablet of the indexed table.
 | | // tablets_pending_ counts down as responses arrive (see UpdateSafeTime);
 | | // min_cutoff is the current master clock used as the minimum hybrid time.
717 | 452 | void BackfillTable::LaunchComputeSafeTimeForRead() {
718 | 452 | auto tablets = indexed_table_->GetTablets();
719 | |
720 | 452 | num_tablets_.store(tablets.size(), std::memory_order_release);
721 | 452 | tablets_pending_.store(tablets.size(), std::memory_order_release);
722 | 452 | auto min_cutoff = master()->clock()->Now();
723 | 2.66k | for (const scoped_refptr<TabletInfo>& tablet : tablets) {
724 | 2.66k | auto get_safetime = std::make_shared<GetSafeTimeForTablet>(
725 | 2.66k | shared_from_this(), tablet, min_cutoff);
726 | 2.66k | get_safetime->Launch();
727 | 2.66k | }
728 | 452 | }
729 | | |
 | | // Log prefix naming the index table(s) this backfill covers.
730 | 886 | std::string BackfillTable::LogPrefix() const {
731 | 886 | return Format("Backfill Index Table(s) $0 ", requested_index_names_);
732 | 886 | }
733 | | |
 | | // Human-readable progress string for the monitored-task UI: reports either
 | | // safe-time collection progress, backfill progress (tablets + rows), or
 | | // completion, depending on the job phase.
734 | 0 | std::string BackfillTable::description() const {
735 | 0 | auto num_pending = tablets_pending_.load(std::memory_order_acquire);
736 | 0 | auto num_tablets = num_tablets_.load(std::memory_order_acquire);
737 | 0 | return Format(
738 | 0 | "Backfill Index Table(s) $0 : $1", requested_index_names_,
739 | 0 | (timestamp_chosen()
740 | 0 | ? (done() ? Format("Backfill $0/$1 tablets done", num_pending, num_tablets)
741 | 0 | : Format(
742 | 0 | "Backfilling $0/$1 tablets with $2 rows done", num_pending, num_tablets,
743 | 0 | number_rows_processed_.load()))
744 | 0 | : Format("Waiting to GetSafeTime from $0/$1 tablets", num_pending, num_tablets)));
745 | 0 | }
746 | | |
 | | // Name of the namespace the indexed table belongs to.
747 | 235 | const std::string BackfillTable::GetNamespaceName() const {
748 | 235 | return ns_info_->name();
749 | 235 | }
750 | | |
 | | // Adds 'number_rows_processed' to the running total and persists the new
 | | // total into the backfill job in the sys catalog. No-op (OK) if the job was
 | | // already finalized by another task.
751 | 2.62k | Status BackfillTable::UpdateRowsProcessedForIndexTable(const uint64_t number_rows_processed) {
752 | 2.62k | auto l = indexed_table_->LockForWrite();
753 | |
754 | 2.62k | if (l.data().pb.backfill_jobs_size() == 0) {
755 | | // Some other task already marked the backfill job as done.
756 | 3 | return Status::OK();
757 | 3 | }
758 | |
759 | | // This is consistent with logic assuming that we have only one backfill job in queue
760 | | // We might in the future change this to a for loop to account for multiple backfill jobs
761 | 2.62k | number_rows_processed_.fetch_add(number_rows_processed);
762 | 2.62k | auto* indexed_table_pb = l.mutable_data()->pb.mutable_backfill_jobs(0);
763 | 2.62k | indexed_table_pb->set_num_rows_processed(number_rows_processed_.load());
764 | 0 | VLOG(2) << "Updated backfill task to having processed " << number_rows_processed
765 | 0 | << " more rows. Total rows processed is: " << number_rows_processed_;
766 | |
 | | // Persist before committing the in-memory change, matching the
 | | // write-to-disk-then-commit pattern used throughout this file.
767 | 2.62k | RETURN_NOT_OK(
768 | 2.62k | master_->catalog_manager_impl()->sys_catalog_->Upsert(leader_term(), indexed_table_));
769 | 2.62k | l.Commit();
770 | 2.62k | return Status::OK();
771 | 2.62k | }
772 | | |
 | | // Callback for each tablet's GetSafeTime result. On error, marks all
 | | // still-pending indexes FAILED (once — guarded by the timestamp_chosen_
 | | // exchange). On success, folds 'ht' into the running max; when the last
 | | // pending tablet reports, persists the chosen backfilling timestamp and
 | | // launches the actual backfill.
773 | 2.66k | Status BackfillTable::UpdateSafeTime(const Status& s, HybridTime ht) {
774 | 2.66k | if (!s.ok()) {
775 | | // Move on to ABORTED permission.
776 | 4 | LOG_WITH_PREFIX(ERROR)
777 | 4 | << "Failed backfill. Could not compute safe time for "
778 | 4 | << yb::ToString(indexed_table_) << " " << s;
 | | // exchange(true) ensures only the first failing tablet marks the job
 | | // failed; later failures are ignored.
779 | 4 | if (!timestamp_chosen_.exchange(true)) {
780 | 4 | RETURN_NOT_OK_PREPEND(
781 | 4 | MarkAllIndexesAsFailed(), "Failed to mark backfill as failed. Abandoning.");
782 | 4 | }
783 | 4 | return Status::OK();
784 | 2.65k | }
785 | |
786 | | // Need to guard this.
787 | 2.65k | HybridTime read_timestamp;
788 | 2.65k | {
 | | // read_time_for_backfill_ is updated from multiple tablet callbacks
 | | // concurrently, hence the spinlock around the max computation.
789 | 2.65k | std::lock_guard<simple_spinlock> l(mutex_);
790 | 1 | VLOG(2) << "Updating read_time_for_backfill_ to max{ "
791 | 1 | << read_time_for_backfill_.ToString() << ", " << ht.ToString()
792 | 1 | << " }.";
793 | 2.65k | read_time_for_backfill_.MakeAtLeast(ht);
794 | 2.65k | read_timestamp = read_time_for_backfill_;
795 | 2.65k | }
796 | |
797 | | // If OK then move on to doing backfill.
798 | 2.65k | if (!timestamp_chosen() && --tablets_pending_ == 0) {
799 | 448 | LOG_WITH_PREFIX(INFO) << "Completed fetching SafeTime for the table "
800 | 448 | << yb::ToString(indexed_table_) << " will be using "
801 | 448 | << read_timestamp.ToString();
802 | 448 | {
 | | // Persist the chosen timestamp so a new master leader resumes with the
 | | // same read snapshot instead of picking a new one.
803 | 448 | auto l = indexed_table_->LockForWrite();
804 | 448 | DCHECK_EQ(l.mutable_data()->pb.backfill_jobs_size(), 1);
805 | 448 | auto* backfill_job = l.mutable_data()->pb.mutable_backfill_jobs(0);
806 | 448 | backfill_job->set_backfilling_timestamp(read_timestamp.ToUint64());
807 | 448 | RETURN_NOT_OK_PREPEND(
808 | 448 | master_->catalog_manager_impl()->sys_catalog_->Upsert(
809 | 448 | leader_term(), indexed_table_),
810 | 448 | "Failed to persist backfilling timestamp. Abandoning.");
811 | 448 | l.Commit();
812 | 448 | }
813 | 1 | VLOG_WITH_PREFIX(2) << "Saved " << read_timestamp
814 | 1 | << " as backfilling_timestamp";
815 | 448 | timestamp_chosen_.store(true, std::memory_order_release);
816 | 448 | LaunchBackfill();
817 | 448 | }
818 | 2.65k | return Status::OK();
819 | 2.65k | }
820 | | |
 | | // Fans out a BackfillTablet task per tablet, using the already-chosen
 | | // read_time_for_backfill_. Resets the pending-tablet counters for the
 | | // backfill phase (they were previously used for the safe-time phase).
821 | 448 | void BackfillTable::LaunchBackfill() {
822 | 1 | VLOG_WITH_PREFIX(1) << "launching backfill with timestamp: "
823 | 1 | << read_time_for_backfill_;
824 | 448 | auto tablets = indexed_table_->GetTablets();
825 | |
826 | 448 | num_tablets_.store(tablets.size(), std::memory_order_release);
827 | 448 | tablets_pending_.store(tablets.size(), std::memory_order_release);
828 | 2.62k | for (const scoped_refptr<TabletInfo>& tablet : tablets) {
829 | 2.62k | auto backfill_tablet = std::make_shared<BackfillTablet>(shared_from_this(), tablet);
830 | 2.62k | backfill_tablet->Launch();
831 | 2.62k | }
832 | 448 | }
833 | | |
834 | 2.53k | void BackfillTable::Done(const Status& s, const std::unordered_set<TableId>& failed_indexes) { |
835 | 2.53k | if (!s.ok()) { |
836 | 9 | LOG_WITH_PREFIX(ERROR) << "failed to backfill the index: " << yb::ToString(failed_indexes) |
837 | 9 | << " due to " << s; |
838 | 9 | WARN_NOT_OK( |
839 | 9 | MarkIndexesAsFailed(failed_indexes, s.message().ToBuffer()), |
840 | 9 | "Couldn't to mark Indexes as failed"); |
841 | 9 | CheckIfDone(); |
842 | 9 | return; |
843 | 9 | } |
844 | | |
845 | | // If OK then move on to READ permissions. |
846 | 2.52k | if (!done() && --tablets_pending_ == 0) { |
847 | 421 | LOG_WITH_PREFIX(INFO) << "Completed backfilling the index table."; |
848 | 421 | done_.store(true, std::memory_order_release); |
849 | 421 | WARN_NOT_OK(MarkAllIndexesAsSuccess(), "Failed to complete backfill."); |
850 | 421 | WARN_NOT_OK(UpdateIndexPermissionsForIndexes(), "Failed to complete backfill."); |
851 | 2.10k | } else { |
852 | 0 | VLOG_WITH_PREFIX(1) << "Still backfilling " << tablets_pending_ << " more tablets."; |
853 | 2.10k | } |
854 | 2.52k | } |
855 | | |
 | | // Marks only the given indexes as FAILED, recording 'message' as the
 | | // backfill error message on each.
856 | | Status BackfillTable::MarkIndexesAsFailed(
857 | 9 | const std::unordered_set<TableId>& failed_indexes, const string& message) {
858 | 9 | return MarkIndexesAsDesired(failed_indexes, BackfillJobPB::FAILED, message);
859 | 9 | }
860 | | |
 | | // Marks every still-in-progress index as FAILED (used when safe-time
 | | // computation fails before backfill even starts).
861 | 4 | Status BackfillTable::MarkAllIndexesAsFailed() {
862 | 4 | return MarkIndexesAsDesired(indexes_to_build(), BackfillJobPB::FAILED, "failed");
863 | 4 | }
864 | | |
 | | // Marks every still-in-progress index as SUCCESS (all tablets completed).
865 | 421 | Status BackfillTable::MarkAllIndexesAsSuccess() {
866 | 421 | return MarkIndexesAsDesired(indexes_to_build(), BackfillJobPB::SUCCESS, "");
867 | 421 | }
868 | | |
 | | // Transitions the given indexes to 'state' in the backfill job's state map,
 | | // sets/clears the per-index backfill error message, clears the deferred
 | | // flag, and persists everything to the sys catalog. No-op (OK) if the job
 | | // was already finalized.
 | | // NOTE(review): 'message' is taken by value as a const string — presumably
 | | // mirroring the header declaration; a const reference would avoid a copy.
 | | // Confirm against backfill_index.h before changing.
869 | | Status BackfillTable::MarkIndexesAsDesired(
870 | | const std::unordered_set<TableId>& index_ids_set, BackfillJobPB_State state,
871 | 434 | const string message) {
872 | 1 | VLOG_WITH_PREFIX(3) << "Marking " << yb::ToString(index_ids_set)
873 | 1 | << " as " << BackfillJobPB_State_Name(state)
874 | 1 | << " due to " << message;
875 | 434 | if (!index_ids_set.empty()) {
876 | 434 | auto l = indexed_table_->LockForWrite();
877 | 434 | auto& indexed_table_pb = l.mutable_data()->pb;
878 | 0 | DCHECK_LE(indexed_table_pb.backfill_jobs_size(), 1) << "For now we only expect to have up to 1 "
879 | 0 | "outstanding backfill job.";
880 | 434 | if (indexed_table_pb.backfill_jobs_size() == 0) {
881 | | // Some other task already marked the backfill job as done.
882 | 1 | return Status::OK();
883 | 1 | }
 | | // Update the per-index state in the job's backfill_state map.
884 | 433 | auto* backfill_state_pb = indexed_table_pb.mutable_backfill_jobs(0)->mutable_backfill_state();
885 | 440 | for (const auto& idx_id : index_ids_set) {
886 | 440 | backfill_state_pb->at(idx_id) = state;
887 | 1 | VLOG(2) << "Marking index " << idx_id << " as " << BackfillJobPB_State_Name(state);
888 | 440 | }
 | | // Also reflect the outcome on the indexed table's IndexInfoPB entries.
889 | 1.26k | for (int i = 0; i < indexed_table_pb.indexes_size(); i++) {
890 | 830 | IndexInfoPB* idx_pb = indexed_table_pb.mutable_indexes(i);
891 | 830 | if (index_ids_set.find(idx_pb->table_id()) != index_ids_set.end()) {
892 | | // Should this also move to the BackfillJob instead?
893 | 439 | if (!message.empty()) {
894 | 8 | idx_pb->set_backfill_error_message(message);
895 | 431 | } else {
896 | 431 | idx_pb->clear_backfill_error_message();
897 | 431 | }
898 | 439 | idx_pb->clear_is_backfill_deferred();
899 | 439 | }
900 | 830 | }
901 | 433 | RETURN_NOT_OK(
902 | 433 | master_->catalog_manager_impl()->sys_catalog_->Upsert(leader_term(), indexed_table_));
903 | 433 | l.Commit();
904 | 433 | }
905 | 433 | return Status::OK();
906 | 434 | }
907 | | |
 | | // If no indexes remain IN_PROGRESS (e.g. after failures), marks the job done
 | | // and kicks off the final permission updates.
908 | 9 | void BackfillTable::CheckIfDone() {
909 | 9 | if (indexes_to_build().empty()) {
910 | 8 | done_.store(true, std::memory_order_release);
911 | 8 | WARN_NOT_OK(
912 | 8 | UpdateIndexPermissionsForIndexes(), "Could not update index permissions after backfill");
913 | 8 | }
914 | 9 | }
915 | | |
 | | // Finalizes the job: translates each index's backfill outcome into its next
 | | // permission (READ_WRITE_AND_DELETE on success, WRITE_AND_DELETE_WHILE_REMOVING
 | | // on failure), enables delete-marker GC on successful indexes, applies the
 | | // permission changes, clears checkpoint state, and notifies tablet servers.
916 | 429 | Status BackfillTable::UpdateIndexPermissionsForIndexes() {
917 | 429 | std::unordered_map<TableId, IndexPermissions> permissions_to_set;
918 | 429 | bool all_success = true;
919 | 429 | {
920 | 429 | auto l = indexed_table_->LockForRead();
921 | 429 | const auto& indexed_table_pb = l.data().pb;
922 | 429 | if (indexed_table_pb.backfill_jobs_size() == 0) {
923 | | // Some other task already marked the backfill job as done.
924 | 1 | return Status::OK();
925 | 1 | }
926 | 0 | DCHECK(indexed_table_pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 "
927 | 0 | "outstanding backfill job.";
928 | 440 | for (const auto& kv_pair : indexed_table_pb.backfill_jobs(0).backfill_state()) {
929 | 1 | VLOG(2) << "Reading backfill_state for " << kv_pair.first << " as "
930 | 1 | << BackfillJobPB_State_Name(kv_pair.second);
931 | 0 | DCHECK_NE(kv_pair.second, BackfillJobPB::IN_PROGRESS)
932 | 0 | << __func__ << " is expected to be only called after all indexes are done.";
933 | 440 | const bool success = (kv_pair.second == BackfillJobPB::SUCCESS);
934 | 440 | all_success &= success;
935 | 440 | permissions_to_set.emplace(
936 | 440 | kv_pair.first,
937 | 432 | success ? INDEX_PERM_READ_WRITE_AND_DELETE : INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING);
938 | 440 | }
939 | 428 | }
940 | |
 | | // Successful indexes can now let compactions GC delete markers.
941 | 440 | for (const auto& kv_pair : permissions_to_set) {
942 | 440 | if (kv_pair.second == INDEX_PERM_READ_WRITE_AND_DELETE) {
943 | 432 | RETURN_NOT_OK(AllowCompactionsToGCDeleteMarkers(kv_pair.first));
944 | 432 | }
945 | 440 | }
946 | |
947 | 428 | RETURN_NOT_OK_PREPEND(
948 | 428 | MultiStageAlterTable::UpdateIndexPermission(
949 | 428 | master_->catalog_manager_impl(), indexed_table_, permissions_to_set, boost::none),
950 | 428 | "Could not update permissions after backfill. "
951 | 428 | "Possible that the master-leader has changed.");
952 | 428 | backfill_job_->SetState(
953 | 413 | all_success ? MonitoredTaskState::kComplete : MonitoredTaskState::kFailed);
954 | 428 | RETURN_NOT_OK(ClearCheckpointStateInTablets());
955 | 428 | indexed_table_->ClearIsBackfilling();
956 | |
957 | 21 | VLOG(1) << "Sending alter table requests to the Indexed table";
958 | 428 | RETURN_NOT_OK(master_->catalog_manager_impl()->SendAlterTableRequest(indexed_table_));
959 | 21 | VLOG(1) << "DONE Sending alter table requests to the Indexed table";
960 | |
961 | 428 | LOG(INFO) << "Done backfill on " << indexed_table_->ToString() << " setting permissions to "
962 | 428 | << yb::ToString(permissions_to_set);
963 | 428 | return Status::OK();
964 | 428 | }
965 | | |
 | | // Removes per-tablet backfilled_until checkpoints for the requested indexes
 | | // and then deletes the backfill job itself from the indexed table's metadata,
 | | // persisting both to the sys catalog.
966 | 420 | Status BackfillTable::ClearCheckpointStateInTablets() {
967 | 420 | auto tablets = indexed_table_->GetTablets();
968 | 420 | std::vector<TabletInfo*> tablet_ptrs;
 | | // Start mutations on all tablets, batch the sys-catalog write, then commit
 | | // each mutation only after the write succeeds.
969 | 2.48k | for (scoped_refptr<TabletInfo>& tablet : tablets) {
970 | 2.48k | tablet_ptrs.push_back(tablet.get());
971 | 2.48k | tablet->mutable_metadata()->StartMutation();
972 | 2.48k | auto& pb = tablet->mutable_metadata()->mutable_dirty()->pb;
973 | 2.51k | for (const auto& idx : requested_index_ids_) {
974 | 2.51k | pb.mutable_backfilled_until()->erase(idx);
975 | 2.51k | }
976 | 2.48k | }
977 | 420 | RETURN_NOT_OK_PREPEND(
978 | 420 | master()->catalog_manager()->sys_catalog()->Upsert(leader_term(), tablet_ptrs),
979 | 420 | "Could not persist that the table is done backfilling.");
980 | 2.48k | for (scoped_refptr<TabletInfo>& tablet : tablets) {
981 | 1 | VLOG(2) << "Done backfilling the table. " << yb::ToString(tablet)
982 | 1 | << " clearing backfilled_until";
983 | 2.48k | tablet->mutable_metadata()->CommitMutation();
984 | 2.48k | }
985 | |
 | | // Test-only hook to widen the window before job deletion.
986 | 420 | if (FLAGS_TEST_slowdown_backfill_job_deletion_ms > 0) {
987 | 21 | SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_job_deletion_ms));
988 | 21 | }
989 | |
990 | 420 | {
991 | 420 | auto l = indexed_table_->LockForWrite();
992 | 0 | DCHECK_LE(l.data().pb.backfill_jobs_size(), 1) << "For now we only expect to have up to 1 "
993 | 0 | "outstanding backfill job.";
994 | 420 | l.mutable_data()->pb.clear_backfill_jobs();
995 | 420 | RETURN_NOT_OK_PREPEND(master_->catalog_manager_impl()->sys_catalog_->Upsert(
996 | 420 | leader_term(), indexed_table_),
997 | 420 | "Could not clear backfilling timestamp.");
998 | 420 | l.Commit();
999 | 420 | }
1000 | 13 | VLOG_WITH_PREFIX(2) << "Cleared backfilling timestamp.";
1001 | 420 | return Status::OK();
1002 | 420 | }
1003 | | |
 | | // Clears retain_delete_markers on the (successfully backfilled) index table
 | | // so compactions may GC delete markers, then notifies its tablets. Waits for
 | | // any in-flight alter (ALTERING state) to finish first. Returns OK — without
 | | // acting — if the index was deleted or is in an unexpected state.
1004 | | Status BackfillTable::AllowCompactionsToGCDeleteMarkers(
1005 | 432 | const TableId &index_table_id) {
1006 | 0 | DVLOG(3) << __PRETTY_FUNCTION__;
1007 | 432 | auto res = master_->catalog_manager()->FindTableById(index_table_id);
 | | // A concurrently-issued DROP INDEX can legitimately remove the table.
1008 | 432 | if (!res && res.status().IsNotFound()) {
1009 | 0 | LOG(ERROR) << "Index " << index_table_id << " was not found."
1010 | 0 | << " This is ok in case somebody issued a delete index. : " << res.ToString();
1011 | 0 | return Status::OK();
1012 | 0 | }
1013 | 432 | scoped_refptr<TableInfo> index_table_info = VERIFY_RESULT_PREPEND(std::move(res),
1014 | 432 | Format("Could not find the index table $0", index_table_id));
1015 | |
1016 | | // Add a sleep here to wait until the Table is fully created.
1017 | 432 | bool is_ready = false;
1018 | 432 | bool first_run = true;
1019 | 432 | do {
1020 | 432 | if (!first_run) {
1021 | 0 | YB_LOG_EVERY_N_SECS(INFO, 1) << "Waiting for the previous alter table to "
1022 | 0 | "complete on the index table "
1023 | 0 | << index_table_id;
1024 | 0 | SleepFor(
1025 | 0 | MonoDelta::FromMilliseconds(FLAGS_index_backfill_wait_for_alter_table_completion_ms));
1026 | 0 | }
1027 | 432 | first_run = false;
1028 | 432 | {
1029 | 0 | VLOG(2) << __func__ << ": Trying to lock index table for Read";
1030 | 432 | auto l = index_table_info->LockForRead();
1031 | 432 | auto state = l->pb.state();
 | | // Any state other than RUNNING/ALTERING (e.g. DELETING) means we should
 | | // not touch the table; treated as success so other indexes still proceed.
1032 | 432 | if (state != SysTablesEntryPB::RUNNING && state != SysTablesEntryPB::ALTERING) {
1033 | 1 | LOG(ERROR) << "Index " << index_table_id << " is in state "
1034 | 1 | << SysTablesEntryPB_State_Name(state) << " : cannot enable compactions on it";
1035 | | // Treating it as success so that we can proceed with updating other indexes.
1036 | 1 | return Status::OK();
1037 | 1 | }
1038 | 431 | is_ready = state == SysTablesEntryPB::RUNNING;
1039 | 431 | }
1040 | 0 | VLOG(2) << __func__ << ": Unlocked index table for Read";
1041 | 431 | } while (!is_ready);
1042 | 431 | {
1043 | 431 | TRACE("Locking index table");
1044 | 0 | VLOG(2) << __func__ << ": Trying to lock index table for Write";
1045 | 431 | auto l = index_table_info->LockForWrite();
1046 | 0 | VLOG(2) << __func__ << ": locked index table for Write";
1047 | 431 | l.mutable_data()->pb.mutable_schema()->mutable_table_properties()
1048 | 431 | ->set_retain_delete_markers(false);
1049 | |
1050 | | // Update sys-catalog with the new indexed table info.
1051 | 431 | TRACE("Updating index table metadata on disk");
1052 | 431 | RETURN_NOT_OK_PREPEND(
1053 | 431 | master_->catalog_manager_impl()->sys_catalog_->Upsert(
1054 | 431 | leader_term(), index_table_info),
1055 | 431 | yb::Format(
1056 | 431 | "Could not update index_table_info for $0 to enable compactions.",
1057 | 431 | index_table_id));
1058 | |
1059 | | // Update the in-memory state.
1060 | 431 | TRACE("Committing in-memory state");
1061 | 431 | l.Commit();
1062 | 431 | }
1063 | 0 | VLOG(2) << __func__ << ": Unlocked index table for Read";
1064 | 0 | VLOG(1) << "Sending backfill done requests to the Index table";
1065 | 431 | RETURN_NOT_OK(SendRpcToAllowCompactionsToGCDeleteMarkers(index_table_info));
1066 | 0 | VLOG(1) << "DONE Sending backfill done requests to the Index table";
1067 | 431 | return Status::OK();
1068 | 431 | }
1069 | | |
 | | // Sends a backfill-done RPC to every tablet of the given (index) table.
1070 | | Status BackfillTable::SendRpcToAllowCompactionsToGCDeleteMarkers(
1071 | 431 | const scoped_refptr<TableInfo> &table) {
1072 | 431 | auto tablets = table->GetTablets();
1073 | |
1074 | 2.50k | for (const scoped_refptr<TabletInfo>& tablet : tablets) {
1075 | 2.50k | RETURN_NOT_OK(SendRpcToAllowCompactionsToGCDeleteMarkers(tablet, table->id()));
1076 | 2.50k | }
1077 | 431 | return Status::OK();
1078 | 431 | }
1079 | | |
 | | // Schedules an async AsyncBackfillDone task against a single tablet; the
 | | // task is registered with the tablet's table so it is tracked/aborted with it.
1080 | | Status BackfillTable::SendRpcToAllowCompactionsToGCDeleteMarkers(
1081 | 2.50k | const scoped_refptr<TabletInfo> &tablet, const std::string &table_id) {
1082 | 2.50k | auto call = std::make_shared<AsyncBackfillDone>(master_, callback_pool_, tablet, table_id);
1083 | 2.50k | tablet->table()->AddTask(call);
1084 | 2.50k | RETURN_NOT_OK_PREPEND(
1085 | 2.50k | master_->catalog_manager()->ScheduleTask(call),
1086 | 2.50k | "Failed to send backfill done request");
1087 | 2.50k | return Status::OK();
1088 | 2.50k | }
1089 | | |
1090 | | // ----------------------------------------------------------------------------------------------- |
1091 | | // BackfillTablet |
1092 | | // ----------------------------------------------------------------------------------------------- |
 | | // Per-tablet backfill driver. Restores the resume point (backfilled_until_)
 | | // from the tablet's persisted per-index checkpoints so a restarted master
 | | // continues where the previous one left off.
1093 | | BackfillTablet::BackfillTablet(
1094 | | std::shared_ptr<BackfillTable> backfill_table, const scoped_refptr<TabletInfo>& tablet)
1095 | 2.62k | : backfill_table_(backfill_table), tablet_(tablet) {
1096 | 2.62k | const auto& index_ids = backfill_table->indexes_to_build();
1097 | 2.62k | {
1098 | 2.62k | auto l = tablet_->LockForRead();
1099 | 2.62k | const auto& pb = tablet_->metadata().state().pb;
1100 | 2.62k | Partition::FromPB(pb.partition(), &partition_);
1101 | | // calculate backfilled_until_ as the largest key which all (active) indexes have backfilled.
 | | // NOTE(review): an empty checkpoint value appears to mean "this index has
 | | // finished the tablet" — hence done_ is set from backfilled_until_.empty()
 | | // as the minimum is computed. Confirm against the tserver checkpointing.
1102 | 2.65k | for (const TableId& idx_id : index_ids) {
1103 | 2.65k | if (pb.backfilled_until().find(idx_id) != pb.backfilled_until().end()) {
1104 | 0 | auto key = pb.backfilled_until().at(idx_id);
1105 | 0 | if (backfilled_until_.empty() || key.compare(backfilled_until_) < 0) {
1106 | 0 | VLOG(2) << "Updating backfilled_until_ as " << key;
1107 | 0 | backfilled_until_ = key;
1108 | 0 | done_.store(backfilled_until_.empty(), std::memory_order_release);
1109 | 0 | }
1110 | 0 | }
1111 | 2.65k | }
1112 | 2.62k | }
1113 | 2.62k | if (!backfilled_until_.empty()) {
1114 | 0 | VLOG_WITH_PREFIX(1) << " resuming backfill from "
1115 | 0 | << yb::ToString(backfilled_until_);
1116 | 2.62k | } else if (done()) {
1117 | 0 | VLOG_WITH_PREFIX(1) << " backfill already done.";
1118 | 2.62k | } else {
1119 | 1 | VLOG_WITH_PREFIX(1) << " beginning backfill from "
1120 | 1 | << "<start-of-the-tablet>";
1121 | 2.62k | }
1122 | 2.62k | }
1123 | | |
 | | // Log prefix naming the indexes being built and this tablet's id.
1124 | 3 | std::string BackfillTablet::LogPrefix() const {
1125 | 3 | return Format("Backfill Index(es) $0 for tablet $1 ",
1126 | 3 | yb::ToString(backfill_table_->indexes_to_build()),
1127 | 3 | tablet_->id());
1128 | 3 | }
1129 | | |
 | | // If this tablet has finished, reports success up to the table-level job;
 | | // otherwise (and as long as the overall job isn't done, e.g. failed) launches
 | | // the next BackfillChunk starting at the current resume key.
1130 | 5.25k | void BackfillTablet::LaunchNextChunkOrDone() {
1131 | 5.25k | if (done()) {
1132 | 0 | VLOG_WITH_PREFIX(1) << "is done";
1133 | 2.52k | backfill_table_->Done(Status::OK(), /* failed_indexes */ {});
1134 | 2.72k | } else if (!backfill_table_->done()) {
1135 | 1 | VLOG_WITH_PREFIX(2) << "Launching next chunk from " << backfilled_until_;
1136 | 2.72k | auto chunk = std::make_shared<BackfillChunk>(shared_from_this(),
1137 | 2.72k | backfilled_until_);
1138 | 2.72k | chunk->Launch();
1139 | 2.72k | }
1140 | 5.25k | }
1141 | | |
 | | // Chunk-completion callback. Propagates failures to the table-level job,
 | | // persists the new resume key / row count when provided, then continues with
 | | // the next chunk (or reports done).
 | | // NOTE(review): on failure this still falls through to checkpointing and
 | | // LaunchNextChunkOrDone — presumably intentional so partial progress (a
 | | // provided backfilled_until) is not lost; confirm before restructuring.
1142 | | void BackfillTablet::Done(
1143 | | const Status& status, const boost::optional<string>& backfilled_until,
1144 | 2.62k | const uint64_t number_rows_processed, const std::unordered_set<TableId>& failed_indexes) {
1145 | 2.62k | if (!status.ok()) {
1146 | 9 | LOG(INFO) << "Failed to backfill the tablet " << yb::ToString(tablet_) << ": " << status
1147 | 9 | << "\nFailed_indexes are " << yb::ToString(failed_indexes);
1148 | 9 | backfill_table_->Done(status, failed_indexes);
1149 | 9 | }
1150 | |
1151 | 2.62k | if (backfilled_until) {
1152 | 2.62k | auto s = UpdateBackfilledUntil(*backfilled_until, number_rows_processed);
1153 | 2.62k | if (!s.ok()) {
1154 | 0 | LOG(WARNING) << "Could not persist how far the tablet is done backfilling. " << s.ToString();
1155 | 0 | return;
1156 | 0 | }
1157 | 2.62k | }
1158 | |
1159 | 2.62k | LaunchNextChunkOrDone();
1160 | 2.62k | }
1161 | | |
 | | // Persists the new per-index resume key for this tablet into the sys catalog
 | | // and updates the table-level processed-row counter. An empty resume key
 | | // signals the tablet is fully backfilled.
1162 | | Status BackfillTablet::UpdateBackfilledUntil(
1163 | 2.62k | const string& backfilled_until, const uint64_t number_rows_processed) {
1164 | 2.62k | backfilled_until_ = backfilled_until;
1165 | 1 | VLOG_WITH_PREFIX(2) << "Done backfilling the tablet " << yb::ToString(tablet_) << " until "
1166 | 1 | << yb::ToString(backfilled_until_);
1167 | 2.62k | {
1168 | 2.62k | auto l = tablet_->LockForWrite();
1169 | 2.65k | for (const auto& idx_id : backfill_table_->indexes_to_build()) {
1170 | 2.65k | l.mutable_data()->pb.mutable_backfilled_until()->insert({idx_id, backfilled_until_});
1171 | 2.65k | }
1172 | 2.62k | RETURN_NOT_OK(backfill_table_->master()->catalog_manager()->sys_catalog()->Upsert(
1173 | 2.62k | backfill_table_->leader_term(), tablet_));
1174 | 2.62k | l.Commit();
1175 | 2.62k | }
1176 | |
1177 | | // This is the last chunk.
1178 | 2.62k | if (backfilled_until_.empty()) {
1179 | 2.52k | LOG(INFO) << "Done backfilling the tablet " << yb::ToString(tablet_);
1180 | 2.52k | done_.store(true, std::memory_order_release);
1181 | 2.52k | }
1182 | 2.62k | return backfill_table_->UpdateRowsProcessedForIndexTable(number_rows_processed);
1183 | 2.62k | }
1184 | | |
1185 | | // ----------------------------------------------------------------------------------------------- |
1186 | | // GetSafeTimeForTablet |
1187 | | // ----------------------------------------------------------------------------------------------- |
1188 | | |
 | | // Registers this task with the table and kicks off the retrying RPC (Run
 | | // picks the target tablet server).
1189 | 2.66k | void GetSafeTimeForTablet::Launch() {
1190 | 2.66k | tablet_->table()->AddTask(shared_from_this());
1191 | 2.66k | Status status = Run();
1192 | | // Need to print this after Run() because that's where it picks the TS which description()
1193 | | // needs.
1194 | 2.66k | if (status.ok()) {
1195 | 1 | VLOG(3) << "Started GetSafeTimeForTablet : " << this->description();
1196 | 0 | } else {
1197 | 0 | LOG(WARNING) << Substitute("Failed to send GetSafeTime request for $0. ",
1198 | 0 | tablet_->ToString())
1199 | 0 | << status;
1200 | 0 | }
1201 | 2.66k | }
1202 | | |
 | | // Issues the async GetSafeTime RPC to the chosen tablet server, passing
 | | // min_cutoff_ as the minimum hybrid time for backfill and propagating the
 | | // master's current clock. Always returns true (request was sent).
1203 | 2.66k | bool GetSafeTimeForTablet::SendRequest(int attempt) {
1204 | 1 | VLOG(1) << __PRETTY_FUNCTION__;
1205 | 2.66k | tserver::GetSafeTimeRequestPB req;
1206 | 2.66k | req.set_dest_uuid(permanent_uuid());
1207 | 2.66k | req.set_tablet_id(tablet_->tablet_id());
1208 | 2.66k | auto now = backfill_table_->master()->clock()->Now().ToUint64();
1209 | 2.66k | req.set_min_hybrid_time_for_backfill(min_cutoff_.ToUint64());
1210 | 2.66k | req.set_propagated_hybrid_time(now);
1211 | |
1212 | 2.66k | ts_admin_proxy_->GetSafeTimeAsync(req, &resp_, &rpc_, BindRpcCallback());
1213 | 1 | VLOG(1) << "Send " << description() << " to " << permanent_uuid()
1214 | 1 | << " (attempt " << attempt << "):\n"
1215 | 1 | << req.DebugString();
1216 | 2.66k | return true;
1217 | 2.66k | }
1218 | | |
 | | // RPC-response handler: fatal tablet-server error codes fail the task
 | | // permanently (no retry); other errors only log (the retry machinery will
 | | // re-send). On success, transitions the task to complete. Always folds the
 | | // response's hybrid time into the master clock.
1219 | 2.65k | void GetSafeTimeForTablet::HandleResponse(int attempt) {
1220 | 4 | VLOG(1) << __PRETTY_FUNCTION__;
1221 | 2.65k | Status status = Status::OK();
1222 | 2.65k | if (resp_.has_error()) {
1223 | 5 | status = StatusFromPB(resp_.error().status());
1224 | |
1225 | | // Do not retry on a fatal error
1226 | 5 | switch (resp_.error().code()) {
1227 | 0 | case TabletServerErrorPB::TABLET_NOT_FOUND:
1228 | 0 | case TabletServerErrorPB::MISMATCHED_SCHEMA:
1229 | 0 | case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:
1230 | 0 | case TabletServerErrorPB::OPERATION_NOT_SUPPORTED:
1231 | 0 | LOG(WARNING) << "TS " << permanent_uuid() << ": GetSafeTime failed for tablet "
1232 | 0 | << tablet_->ToString() << " no further retry: " << status;
1233 | 0 | TransitionToFailedState(MonitoredTaskState::kRunning, status);
1234 | 0 | break;
1235 | 5 | default:
1236 | 5 | LOG(WARNING) << "TS " << permanent_uuid() << ": GetSafeTime failed for tablet "
1237 | 5 | << tablet_->ToString() << ": " << status << " code " << resp_.error().code();
1238 | 5 | break;
1239 | 2.65k | }
1240 | 2.65k | } else {
1241 | 2.65k | TransitionToCompleteState();
1242 | 2 | VLOG(1) << "TS " << permanent_uuid() << ": GetSafeTime complete on tablet "
1243 | 2 | << tablet_->ToString();
1244 | 2.65k | }
1245 | |
1246 | 2.65k | server::UpdateClock(resp_, master_->clock());
1247 | 2.65k | }
1248 | | |
 | | // Final task callback: derives the outcome (error status or the safe time
 | | // from the response) and reports it to BackfillTable::UpdateSafeTime, which
 | | // aggregates across tablets. Runs regardless of success/failure.
1249 | 2.66k | void GetSafeTimeForTablet::UnregisterAsyncTaskCallback() {
1250 | 2.66k | Status status;
1251 | 2.66k | HybridTime safe_time;
1252 | 2.66k | if (resp_.has_error()) {
1253 | 0 | status = StatusFromPB(resp_.error().status());
1254 | 0 | VLOG(3) << "GetSafeTime for " << tablet_->ToString() << " got an error. Returning "
1255 | 0 | << safe_time;
 | | // Task ended without completing (e.g. aborted or retries exhausted).
1256 | 2.66k | } else if (state() != MonitoredTaskState::kComplete) {
1257 | 4 | status = STATUS_FORMAT(InternalError, "$0 in state $1", description(), state());
1258 | 2.65k | } else {
1259 | 2.65k | safe_time = HybridTime(resp_.safe_time());
1260 | 2.65k | if (safe_time.is_special()) {
1261 | 0 | LOG(ERROR) << "GetSafeTime for " << tablet_->ToString() << " got " << safe_time;
1262 | 2.65k | } else {
1263 | 1 | VLOG(3) << "GetSafeTime for " << tablet_->ToString() << " got " << safe_time;
1264 | 2.65k | }
1265 | 2.65k | }
1266 | 2.66k | WARN_NOT_OK(backfill_table_->UpdateSafeTime(status, safe_time),
1267 | 2.66k | "Could not UpdateSafeTime");
1268 | 2.66k | }
1269 | | |
 | | // Target tablet server's uuid, or "" if no target has been picked yet.
1270 | 2.67k | TabletServerId GetSafeTimeForTablet::permanent_uuid() {
1271 | 2.67k | return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
1272 | 2.67k | }
1273 | | |
 | | // One chunk of a tablet's backfill, starting at 'start_key'. Targets the
 | | // tablet's leader replica; snapshots the set of indexes still being built at
 | | // construction time. The overall deadline is unbounded — per-attempt
 | | // timeouts come from ComputeDeadline().
1274 | | BackfillChunk::BackfillChunk(std::shared_ptr<BackfillTablet> backfill_tablet,
1275 | | const std::string& start_key)
1276 | | : RetryingTSRpcTask(backfill_tablet->master(),
1277 | | backfill_tablet->threadpool(),
1278 | | std::unique_ptr<TSPicker>(new PickLeaderReplica(backfill_tablet->tablet())),
1279 | | backfill_tablet->tablet()->table().get()),
1280 | | indexes_being_backfilled_(backfill_tablet->indexes_to_build()),
1281 | | backfill_tablet_(backfill_tablet),
1282 | | start_key_(start_key),
1283 | | requested_index_names_(RetrieveIndexNames(backfill_tablet->master()->catalog_manager_impl(),
1284 | 2.72k | indexes_being_backfilled_)) {
1285 | 2.72k | deadline_ = MonoTime::Max(); // Never time out.
1286 | 2.72k | }
1287 | | |
1288 | | // ----------------------------------------------------------------------------------------------- |
1289 | | // BackfillChunk |
1290 | | // ----------------------------------------------------------------------------------------------- |
1291 | 2.72k | void BackfillChunk::Launch() { |
1292 | 2.72k | backfill_tablet_->tablet()->table()->AddTask(shared_from_this()); |
1293 | 2.72k | Status status = Run(); |
1294 | 2.72k | WARN_NOT_OK( |
1295 | 2.72k | status, Substitute( |
1296 | 2.72k | "Failed to send backfill Chunk request for $0", |
1297 | 2.72k | backfill_tablet_->tablet().get()->ToString())); |
1298 | | |
1299 | | // Need to print this after Run() because that's where it picks the TS which description() |
1300 | | // needs. |
1301 | 2.72k | if (status.ok()) { |
1302 | 2.72k | LOG(INFO) << "Started BackfillChunk : " << this->description(); |
1303 | 2.72k | } |
1304 | 2.72k | } |
1305 | | |
1306 | 2.72k | MonoTime BackfillChunk::ComputeDeadline() { |
1307 | 2.72k | MonoTime timeout = MonoTime::Now(); |
1308 | 2.72k | if (GetTableType() == TableType::PGSQL_TABLE_TYPE) { |
1309 | 235 | timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_ysql_index_backfill_rpc_timeout_ms)); |
1310 | 2.48k | } else { |
1311 | 2.48k | DCHECK(GetTableType() == TableType::YQL_TABLE_TYPE); |
1312 | 2.48k | timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_index_backfill_rpc_timeout_ms)); |
1313 | 2.48k | } |
1314 | 2.72k | return MonoTime::Earliest(timeout, deadline_); |
1315 | 2.72k | } |
1316 | | |
// Maximum retry count for a backfill chunk RPC; taken from the
// --index_backfill_rpc_max_retries gflag.
int BackfillChunk::num_max_retries() {
  return FLAGS_index_backfill_rpc_max_retries;
}
1320 | | |
// Upper bound (ms) on the retry backoff delay; taken from the
// --index_backfill_rpc_max_delay_ms gflag.
int BackfillChunk::max_delay_ms() {
  return FLAGS_index_backfill_rpc_max_delay_ms;
}
1324 | | |
1325 | 8.09k | std::string BackfillChunk::description() const { |
1326 | 8.09k | return yb::Format("Backfilling indexes $0 for tablet $1 from key '$2'", |
1327 | 8.09k | requested_index_names_, tablet_id(), |
1328 | 8.09k | b2a_hex(start_key_)); |
1329 | 8.09k | } |
1330 | | |
// Builds and sends the BackfillIndex RPC for this chunk to the tablet server.
// Returns true iff the RPC was actually sent; otherwise the task has already
// been transitioned to a terminal (complete/failed) state.
bool BackfillChunk::SendRequest(int attempt) {
  VLOG(1) << __PRETTY_FUNCTION__;
  if (indexes_being_backfilled_.empty()) {
    // No indexes left to build for this chunk: nothing to send.
    TransitionToCompleteState();
    return false;
  }

  tserver::BackfillIndexRequestPB req;
  req.set_dest_uuid(permanent_uuid());
  req.set_tablet_id(backfill_tablet_->tablet()->tablet_id());
  req.set_read_at_hybrid_time(backfill_tablet_->read_time_for_backfill().ToUint64());
  req.set_schema_version(backfill_tablet_->schema_version());
  req.set_start_key(start_key_);
  req.set_indexed_table_id(backfill_tablet_->indexed_table_id());
  if (GetTableType() == TableType::PGSQL_TABLE_TYPE) {
    // YSQL-only field: the namespace (database) name is sent along with the
    // request for PGSQL tables.
    req.set_namespace_name(backfill_tablet_->GetNamespaceName());
  }
  // Attach the IndexInfoPB for every index we were asked to backfill, and
  // track which of the requested indexes we actually found.
  std::unordered_set<TableId> found_idxs;
  for (const IndexInfoPB& idx_info : backfill_tablet_->index_infos()) {
    if (indexes_being_backfilled_.find(idx_info.table_id()) != indexes_being_backfilled_.end()) {
      req.add_indexes()->CopyFrom(idx_info);
      found_idxs.insert(idx_info.table_id());
    }
  }
  if (found_idxs.size() != indexes_being_backfilled_.size()) {
    // We could not find the IndexInfoPB for all the requested indexes. This can happen
    // if that index was deleted while the backfill was still going on.
    // We are going to fail fast and mark that index as failed.
    for (auto& idx : indexes_being_backfilled_) {
      if (found_idxs.find(idx) == found_idxs.end()) {
        VLOG_WITH_PREFIX(3) << "Marking " << idx << " as failed";
        *resp_.add_failed_index_ids() = idx;
      }
    }
    const string error_message("Could not find IndexInfoPB for some indexes");
    // Also populate resp_ with the error so that UnregisterAsyncTaskCallback
    // sees a failed response and reports the missing indexes as failed.
    resp_.mutable_error()->mutable_status()->set_code(AppStatusPB::NOT_FOUND);
    resp_.mutable_error()->mutable_status()->set_message(error_message);
    TransitionToFailedState(MonitoredTaskState::kRunning,
                            STATUS(NotFound, error_message));
    return false;
  }
  req.set_propagated_hybrid_time(backfill_tablet_->master()->clock()->Now().ToUint64());

  ts_admin_proxy_->BackfillIndexAsync(req, &resp_, &rpc_, BindRpcCallback());
  VLOG(1) << "Send " << description() << " to " << permanent_uuid()
          << " (attempt " << attempt << "):\n"
          << req.DebugString();
  return true;
}
1380 | | |
// Handles the BackfillIndex RPC response for this attempt. Fatal tserver
// errors fail the task permanently; any other error is only logged (the task
// is left in its current state, presumably so the base RetryingTSRpcTask can
// retry it — TODO confirm); a clean response completes the task.
void BackfillChunk::HandleResponse(int attempt) {
  VLOG(1) << __PRETTY_FUNCTION__ << " response is " << yb::ToString(resp_);
  Status status;
  if (resp_.has_error()) {
    status = StatusFromPB(resp_.error().status());

    // Do not retry on a fatal error
    switch (resp_.error().code()) {
      // These stacked case labels all share the fail-fast handling below.
      case TabletServerErrorPB::MISMATCHED_SCHEMA:
      case TabletServerErrorPB::OPERATION_NOT_SUPPORTED:
      case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:
      case TabletServerErrorPB::TABLET_NOT_FOUND:
        LOG(WARNING) << "TS " << permanent_uuid() << ": backfill failed for tablet "
                     << backfill_tablet_->tablet()->ToString() << " no further retry: " << status
                     << " response was " << yb::ToString(resp_);
        TransitionToFailedState(MonitoredTaskState::kRunning, status);
        break;
      default:
        // Non-fatal error: log only, without transitioning the task state.
        LOG(WARNING) << "TS " << permanent_uuid() << ": backfill failed for tablet "
                     << backfill_tablet_->tablet()->ToString() << ": " << status.ToString()
                     << " code " << resp_.error().code();
        break;
    }
  } else {
    TransitionToCompleteState();
    VLOG(1) << "TS " << permanent_uuid() << ": backfill complete on tablet "
            << backfill_tablet_->tablet()->ToString();
  }

  // Propagate the tserver's hybrid time into the master's clock.
  server::UpdateClock(resp_, master_->clock());
}
1412 | | |
// Invoked when this chunk task is unregistered. Converts the final task/RPC
// outcome into (status, failed_indexes) and reports the chunk as done to the
// owning BackfillTablet. Aborted tasks report nothing.
void BackfillChunk::UnregisterAsyncTaskCallback() {
  if (state() == MonitoredTaskState::kAborted) {
    VLOG(1) << " was aborted";
    return;
  }

  Status status;
  std::unordered_set<TableId> failed_indexes;
  if (resp_.has_error()) {
    status = StatusFromPB(resp_.error().status());
    if (resp_.failed_index_ids_size() > 0) {
      // The response names the specific indexes that failed; record exactly those.
      for (int i = 0; i < resp_.failed_index_ids_size(); i++) {
        VLOG(1) << " Added to failed index " << resp_.failed_index_ids(i);
        failed_indexes.insert(resp_.failed_index_ids(i));
      }
    } else {
      // No specific index was marked as a failure. So consider all of them as failed.
      failed_indexes = indexes_being_backfilled_;
    }
  } else if (state() != MonitoredTaskState::kComplete) {
    // There is no response, so the error happened even before we could
    // get a response. Mark all indexes as failed.
    failed_indexes = indexes_being_backfilled_;
    VLOG(3) << "Considering all indexes : "
            << yb::ToString(indexes_being_backfilled_)
            << " as failed.";
    status = STATUS_FORMAT(InternalError, "$0 in state $1", description(), state());
  }

  // Report completion, forwarding how far the tserver got (backfilled_until)
  // when the response carries it.
  if (resp_.has_backfilled_until()) {
    backfill_tablet_->Done(
        status, resp_.backfilled_until(), resp_.number_rows_processed(), failed_indexes);
  } else {
    backfill_tablet_->Done(status, boost::none, resp_.number_rows_processed(), failed_indexes);
  }
}
1449 | | |
1450 | 2.73k | TabletServerId BackfillChunk::permanent_uuid() { |
1451 | 2.73k | return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : ""; |
1452 | 2.73k | } |
1453 | | |
1454 | | } // namespace master |
1455 | | } // namespace yb |