/Users/deen/code/yugabyte-db/src/yb/master/backfill_index.cc
Line  | Count  | Source (jump to first uncovered line)  | 
1  |  | // Copyright (c) YugaByte, Inc.  | 
2  |  | //  | 
3  |  | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except  | 
4  |  | // in compliance with the License.  You may obtain a copy of the License at  | 
5  |  | //  | 
6  |  | // http://www.apache.org/licenses/LICENSE-2.0  | 
7  |  | //  | 
8  |  | // Unless required by applicable law or agreed to in writing, software distributed under the License  | 
9  |  | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express  | 
10  |  | // or implied.  See the License for the specific language governing permissions and limitations  | 
11  |  | // under the License.  | 
12  |  | //  | 
13  |  |  | 
14  |  | #include "yb/master/backfill_index.h"  | 
15  |  |  | 
16  |  | #include <pthread.h>  | 
17  |  | #include <stdlib.h>  | 
18  |  | #include <sys/types.h>  | 
19  |  |  | 
20  |  | #include <algorithm>  | 
21  |  | #include <bitset>  | 
22  |  | #include <functional>  | 
23  |  | #include <memory>  | 
24  |  | #include <mutex>  | 
25  |  | #include <set>  | 
26  |  | #include <string>  | 
27  |  | #include <unordered_map>  | 
28  |  | #include <vector>  | 
29  |  |  | 
30  |  | #include <boost/optional.hpp>  | 
31  |  | #include <boost/preprocessor/cat.hpp>  | 
32  |  | #include <glog/logging.h>  | 
33  |  |  | 
34  |  | #include "yb/common/partial_row.h"  | 
35  |  | #include "yb/common/partition.h"  | 
36  |  | #include "yb/common/wire_protocol.h"  | 
37  |  |  | 
38  |  | #include "yb/docdb/doc_rowwise_iterator.h"  | 
39  |  |  | 
40  |  | #include "yb/gutil/atomicops.h"  | 
41  |  | #include "yb/gutil/callback.h"  | 
42  |  | #include "yb/gutil/casts.h"  | 
43  |  | #include "yb/gutil/map-util.h"  | 
44  |  | #include "yb/gutil/mathlimits.h"  | 
45  |  | #include "yb/gutil/ref_counted.h"  | 
46  |  | #include "yb/gutil/stl_util.h"  | 
47  |  | #include "yb/gutil/strings/escaping.h"  | 
48  |  | #include "yb/gutil/strings/join.h"  | 
49  |  | #include "yb/gutil/strings/substitute.h"  | 
50  |  | #include "yb/gutil/sysinfo.h"  | 
51  |  |  | 
52  |  | #include "yb/master/master_fwd.h"  | 
53  |  | #include "yb/master/async_rpc_tasks.h"  | 
54  |  | #include "yb/master/catalog_manager.h"  | 
55  |  | #include "yb/master/master.h"  | 
56  |  | #include "yb/master/master_ddl.pb.h"  | 
57  |  | #include "yb/master/sys_catalog.h"  | 
58  |  |  | 
59  |  | #include "yb/tablet/tablet.h"  | 
60  |  | #include "yb/tablet/tablet_metadata.h"  | 
61  |  | #include "yb/tablet/tablet_peer.h"  | 
62  |  |  | 
63  |  | #include "yb/tserver/tserver_admin.proxy.h"  | 
64  |  |  | 
65  |  | #include "yb/util/flag_tags.h"  | 
66  |  | #include "yb/util/format.h"  | 
67  |  | #include "yb/util/math_util.h"  | 
68  |  | #include "yb/util/monotime.h"  | 
69  |  | #include "yb/util/random_util.h"  | 
70  |  | #include "yb/util/result.h"  | 
71  |  | #include "yb/util/status_format.h"  | 
72  |  | #include "yb/util/status_log.h"  | 
73  |  | #include "yb/util/threadpool.h"  | 
74  |  | #include "yb/util/trace.h"  | 
75  |  | #include "yb/util/uuid.h"  | 
76  |  |  | 
77  |  | DEFINE_int32(ysql_index_backfill_rpc_timeout_ms, 60 * 1000, // 1 min.  | 
78  |  |              "Timeout used by the master when attempting to backfill a YSQL tablet during index "  | 
79  |  |              "creation.");  | 
80  |  | TAG_FLAG(ysql_index_backfill_rpc_timeout_ms, advanced);  | 
81  |  | TAG_FLAG(ysql_index_backfill_rpc_timeout_ms, runtime);  | 
82  |  |  | 
83  |  | DEFINE_int32(index_backfill_rpc_timeout_ms, 1 * 30 * 1000, // 30 sec.  | 
84  |  |              "Timeout used by the master when attempting to backfill a tablet "  | 
85  |  |              "during index creation.");  | 
86  |  | TAG_FLAG(index_backfill_rpc_timeout_ms, advanced);  | 
87  |  | TAG_FLAG(index_backfill_rpc_timeout_ms, runtime);  | 
88  |  |  | 
89  |  | DEFINE_int32(index_backfill_rpc_max_retries, 150,  | 
90  |  |              "Number of times to retry backfilling a tablet chunk "  | 
91  |  |              "during index creation.");  | 
92  |  | TAG_FLAG(index_backfill_rpc_max_retries, advanced);  | 
93  |  | TAG_FLAG(index_backfill_rpc_max_retries, runtime);  | 
94  |  |  | 
95  |  | DEFINE_int32(index_backfill_rpc_max_delay_ms, 10 * 60 * 1000, // 10 min.  | 
96  |  |              "Maximum delay before retrying a backfill tablet chunk request "  | 
97  |  |              "during index creation.");  | 
98  |  | TAG_FLAG(index_backfill_rpc_max_delay_ms, advanced);  | 
99  |  | TAG_FLAG(index_backfill_rpc_max_delay_ms, runtime);  | 
100  |  |  | 
101  |  | DEFINE_int32(index_backfill_wait_for_alter_table_completion_ms, 100,  | 
102  |  |              "Delay before retrying to see if an in-progress alter table has "  | 
103  |  |              "completed, during index backfill.");  | 
104  |  | TAG_FLAG(index_backfill_wait_for_alter_table_completion_ms, advanced);  | 
105  |  | TAG_FLAG(index_backfill_wait_for_alter_table_completion_ms, runtime);  | 
106  |  |  | 
107  |  | DEFINE_bool(defer_index_backfill, false,  | 
108  |  |             "Defer index backfill so that backfills can be performed as a batch later on.");  | 
109  |  | TAG_FLAG(defer_index_backfill, advanced);  | 
110  |  | TAG_FLAG(defer_index_backfill, runtime);  | 
111  |  |  | 
112  |  | DEFINE_test_flag(int32, slowdown_backfill_alter_table_rpcs_ms, 0,  | 
113  |  |     "Slows down the send alter table rpc's so that the master may be stopped between "  | 
114  |  |     "different phases.");  | 
115  |  |  | 
116  |  | DEFINE_test_flag(  | 
117  |  |     int32, slowdown_backfill_job_deletion_ms, 0,  | 
118  |  |     "Slows down backfill job deletion so that backfill job can be read by test.");  | 
119  |  |  | 
120  |  | namespace yb { | 
121  |  | namespace master { | 
122  |  |  | 
123  |  | using namespace std::literals;  | 
124  |  | using server::MonitoredTaskState;  | 
125  |  | using strings::Substitute;  | 
126  |  | using tserver::TabletServerErrorPB;  | 
127  |  |  | 
128  |  | namespace { | 
129  |  |  | 
130  |  | // Peek into pg_index table to get an index (boolean) status from YSQL perspective.  | 
131  |  | Result<bool> GetPgIndexStatus(  | 
132  |  |     CatalogManager* catalog_manager,  | 
133  |  |     const TableId& idx_id,  | 
134  | 0  |     const std::string& status_col_name) { | 
135  | 0  |   const auto pg_index_id =  | 
136  | 0  |       GetPgsqlTableId(VERIFY_RESULT(GetPgsqlDatabaseOid(idx_id)), kPgIndexTableOid);  | 
137  |  |  | 
138  | 0  |   const tablet::Tablet* catalog_tablet =  | 
139  | 0  |       catalog_manager->tablet_peer()->tablet();  | 
140  | 0  |   const Schema& pg_index_schema =  | 
141  | 0  |       *VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_index_id))->schema;  | 
142  |  |  | 
143  | 0  |   Schema projection;  | 
144  | 0  |   RETURN_NOT_OK(pg_index_schema.CreateProjectionByNames({"indexrelid", status_col_name}, | 
145  | 0  |                                                         &projection,  | 
146  | 0  |                                                         pg_index_schema.num_key_columns()));  | 
147  |  |  | 
148  | 0  |   const auto indexrelid_col_id = VERIFY_RESULT(projection.ColumnIdByName("indexrelid")).rep(); | 
149  | 0  |   const auto status_col_id     = VERIFY_RESULT(projection.ColumnIdByName(status_col_name)).rep();  | 
150  |  |  | 
151  | 0  |   const auto idx_oid = VERIFY_RESULT(GetPgsqlTableOid(idx_id));  | 
152  |  |  | 
153  | 0  |   auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator(projection.CopyWithoutColumnIds(),  | 
154  | 0  |                                                            {} /* read_hybrid_time */, | 
155  | 0  |                                                            pg_index_id));  | 
156  |  |  | 
157  |  |   // Filtering by 'indexrelid' == idx_oid.  | 
158  | 0  |   { | 
159  | 0  |     auto doc_iter = down_cast<docdb::DocRowwiseIterator*>(iter.get());  | 
160  | 0  |     PgsqlConditionPB cond;  | 
161  | 0  |     cond.add_operands()->set_column_id(indexrelid_col_id);  | 
162  | 0  |     cond.set_op(QL_OP_EQUAL);  | 
163  | 0  |     cond.add_operands()->mutable_value()->set_uint32_value(idx_oid);  | 
164  | 0  |     const std::vector<docdb::PrimitiveValue> empty_key_components;  | 
165  | 0  |     docdb::DocPgsqlScanSpec spec(projection,  | 
166  | 0  |                                  rocksdb::kDefaultQueryId,  | 
167  | 0  |                                  empty_key_components,  | 
168  | 0  |                                  empty_key_components,  | 
169  | 0  |                                  &cond,  | 
170  | 0  |                                  boost::none /* hash_code */,  | 
171  | 0  |                                  boost::none /* max_hash_code */,  | 
172  | 0  |                                  nullptr /* where_expr */);  | 
173  | 0  |     RETURN_NOT_OK(doc_iter->Init(spec));  | 
174  | 0  |   }  | 
175  |  |  | 
176  |  |   // Expecting one row at most.  | 
177  | 0  |   QLTableRow row;  | 
178  | 0  |   if (VERIFY_RESULT(iter->HasNext())) { | 
179  | 0  |     RETURN_NOT_OK(iter->NextRow(&row));  | 
180  | 0  |     return row.GetColumn(status_col_id)->bool_value();  | 
181  | 0  |   }  | 
182  |  |  | 
183  |  |   // For practical purposes, an absent index is the same as having false status column value.  | 
184  | 0  |   return false;  | 
185  | 0  | }  | 
186  |  |  | 
187  |  | // Before advancing index permissions, we need to make sure Postgres side has advanced sufficiently  | 
188  |  | // - that the state tracked in pg_index haven't fallen behind from the desired permission  | 
189  |  | // for more than one step.  | 
190  |  | Result<bool> ShouldProceedWithPgsqlIndexPermissionUpdate(  | 
191  |  |     CatalogManager* catalog_manager,  | 
192  |  |     const TableId& idx_id,  | 
193  | 534  |     IndexPermissions new_perm) { | 
194  |  |   // TODO(alex, jason): Add the appropriate cases for dropping index path  | 
195  | 534  |   switch (new_perm) { | 
196  | 0  |     case INDEX_PERM_WRITE_AND_DELETE: { | 
197  | 0  |       auto live = VERIFY_RESULT(GetPgIndexStatus(catalog_manager, idx_id, "indislive"));  | 
198  | 0  |       if (!live) { | 
199  | 0  |         VLOG(1) << "Index " << idx_id << " is not yet live, skipping permission update";  | 
200  | 0  |       }  | 
201  | 0  |       return live;  | 
202  | 0  |     }  | 
203  | 0  |     case INDEX_PERM_DO_BACKFILL: { | 
204  | 0  |       auto ready = VERIFY_RESULT(GetPgIndexStatus(catalog_manager, idx_id, "indisready"));  | 
205  | 0  |       if (!ready) { | 
206  | 0  |         VLOG(1) << "Index " << idx_id << " is not yet ready, skipping permission update";  | 
207  | 0  |       }  | 
208  | 0  |       return ready;  | 
209  | 0  |     }  | 
210  | 534  |     default:  | 
211  |  |       // No need to wait for anything  | 
212  | 534  |       return true;  | 
213  | 534  |   }  | 
214  | 534  | }  | 
215  |  |  | 
216  |  | } // namespace  | 
217  |  |  | 
218  | 4.41k  | void MultiStageAlterTable::CopySchemaDetailsToFullyApplied(SysTablesEntryPB* pb) { | 
219  | 4.41k  |   VLOG(4) << "Setting fully_applied_schema_version to " << pb->version()0 ;  | 
220  | 4.41k  |   pb->mutable_fully_applied_schema()->CopyFrom(pb->schema());  | 
221  | 4.41k  |   pb->set_fully_applied_schema_version(pb->version());  | 
222  | 4.41k  |   pb->mutable_fully_applied_indexes()->CopyFrom(pb->indexes());  | 
223  | 4.41k  |   if (pb->has_index_info()) { | 
224  | 0  |     pb->mutable_fully_applied_index_info()->CopyFrom(pb->index_info());  | 
225  | 0  |   }  | 
226  | 4.41k  | }  | 
227  |  |  | 
228  |  | Status MultiStageAlterTable::ClearFullyAppliedAndUpdateState(  | 
229  |  |     CatalogManager* catalog_manager,  | 
230  |  |     const scoped_refptr<TableInfo>& table,  | 
231  |  |     boost::optional<uint32_t> expected_version,  | 
232  | 9.85k  |     bool update_state_to_running) { | 
233  | 9.85k  |   auto l = table->LockForWrite();  | 
234  | 9.85k  |   uint32_t current_version = l->pb.version();  | 
235  | 9.85k  |   if (expected_version && *expected_version != current_version9.30k ) {  | 
236  | 9  |     return STATUS(AlreadyPresent, "Table has already moved to a different version.");  | 
237  | 9  |   }  | 
238  | 9.84k  |   l.mutable_data()->pb.clear_fully_applied_schema();  | 
239  | 9.84k  |   l.mutable_data()->pb.clear_fully_applied_schema_version();  | 
240  | 9.84k  |   l.mutable_data()->pb.clear_fully_applied_indexes();  | 
241  | 9.84k  |   l.mutable_data()->pb.clear_fully_applied_index_info();  | 
242  | 9.84k  |   auto new_state = update_state_to_running ? SysTablesEntryPB::RUNNING8.89k  : SysTablesEntryPB::ALTERING950 ;  | 
243  | 9.84k  |   l.mutable_data()->set_state(new_state, Format("Current schema version=$0", current_version)); | 
244  |  |  | 
245  | 9.84k  |   Status s = catalog_manager->sys_catalog_->Upsert(catalog_manager->leader_ready_term(), table);  | 
246  | 9.84k  |   if (!s.ok()) { | 
247  | 0  |     LOG(WARNING) << "An error occurred while updating sys-tables: " << s.ToString()  | 
248  | 0  |                  << ". This master may not be the leader anymore.";  | 
249  | 0  |     return s;  | 
250  | 0  |   }  | 
251  |  |  | 
252  | 9.84k  |   l.Commit();  | 
253  | 9.84k  |   LOG(INFO) << table->ToString() << " - Alter table completed version=" << current_version  | 
254  | 9.84k  |             << ", state: " << SysTablesEntryPB::State_Name(new_state);  | 
255  | 9.84k  |   return Status::OK();  | 
256  | 9.84k  | }  | 
257  |  |  | 
258  |  | Result<bool> MultiStageAlterTable::UpdateIndexPermission(  | 
259  |  |     CatalogManager* catalog_manager,  | 
260  |  |     const scoped_refptr<TableInfo>& indexed_table,  | 
261  |  |     const std::unordered_map<TableId, IndexPermissions>& perm_mapping,  | 
262  | 2.13k  |     boost::optional<uint32_t> current_version) { | 
263  | 2.13k  |   TRACE(__func__);  | 
264  | 2.13k  |   DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table)5 ;  | 
265  | 2.13k  |   if (FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms > 0) { | 
266  | 196  |     TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms); | 
267  | 196  |     DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table) << " sleeping for "  | 
268  | 5  |              << FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms  | 
269  | 5  |              << "ms BEFORE updating the index permission to " << ToString(perm_mapping);  | 
270  | 196  |     SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms));  | 
271  | 196  |     DVLOG(3) << __PRETTY_FUNCTION__ << " Done Sleeping"7 ;  | 
272  | 196  |     TRACE("Done Sleeping"); | 
273  | 196  |   }  | 
274  |  |  | 
275  | 2.13k  |   bool permissions_updated = false;  | 
276  | 2.13k  |   { | 
277  | 2.13k  |     TRACE("Locking indexed table"); | 
278  | 2.13k  |     auto l = indexed_table->LockForWrite();  | 
279  | 2.13k  |     auto& indexed_table_data = *l.mutable_data();  | 
280  | 2.13k  |     auto& indexed_table_pb = indexed_table_data.pb;  | 
281  | 2.13k  |     if (current_version && *current_version != indexed_table_pb.version()1.14k ) {  | 
282  | 180  |       LOG(INFO) << "The table schema version "  | 
283  | 180  |                 << "seems to have already been updated to " << indexed_table_pb.version()  | 
284  | 180  |                 << " We wanted to do this update at " << *current_version;  | 
285  | 180  |       return STATUS_SUBSTITUTE(  | 
286  | 180  |           AlreadyPresent, "Schema was already updated to $0 before we got to it (expected $1).",  | 
287  | 180  |           indexed_table_pb.version(), *current_version);  | 
288  | 180  |     }  | 
289  |  |  | 
290  | 1.95k  |     CopySchemaDetailsToFullyApplied(&indexed_table_pb);  | 
291  | 1.95k  |     bool is_pgsql = indexed_table_pb.table_type() == TableType::PGSQL_TABLE_TYPE;  | 
292  | 5.48k  |     for (int i = 0; i < indexed_table_pb.indexes_size(); i++3.52k ) {  | 
293  | 3.52k  |       IndexInfoPB* idx_pb = indexed_table_pb.mutable_indexes(i);  | 
294  | 3.52k  |       auto& idx_table_id = idx_pb->table_id();  | 
295  | 3.52k  |       if (perm_mapping.find(idx_table_id) != perm_mapping.end()) { | 
296  | 1.97k  |         const auto new_perm = perm_mapping.at(idx_table_id);  | 
297  | 1.97k  |         if (idx_pb->index_permissions() >= new_perm) { | 
298  | 1  |           LOG(WARNING) << "Index " << idx_pb->table_id() << " on table "  | 
299  | 1  |                        << indexed_table->ToString() << " has index_permission "  | 
300  | 1  |                        << IndexPermissions_Name(idx_pb->index_permissions()) << " already past "  | 
301  | 1  |                        << IndexPermissions_Name(new_perm) << ". Will not update it";  | 
302  | 1  |           continue;  | 
303  | 1  |         }  | 
304  |  |         // TODO(alex, amit): Non-OK status here should be converted to TryAgain,  | 
305  |  |         //                   which should be handled on an upper level.  | 
306  | 1.97k  |         if (is_pgsql && !534 VERIFY_RESULT534 (ShouldProceedWithPgsqlIndexPermissionUpdate(catalog_manager,  | 
307  | 1.97k  |                                                                                    idx_table_id,  | 
308  | 1.97k  |                                                                                    new_perm))) { | 
309  | 0  |           continue;  | 
310  | 0  |         }  | 
311  | 1.97k  |         idx_pb->set_index_permissions(new_perm);  | 
312  | 1.97k  |         permissions_updated = true;  | 
313  | 1.97k  |       }  | 
314  | 3.52k  |     }  | 
315  |  |  | 
316  | 1.95k  |     if (permissions_updated) { | 
317  | 1.94k  |       indexed_table_pb.set_version(indexed_table_pb.version() + 1);  | 
318  | 1.94k  |       indexed_table_pb.set_updates_only_index_permissions(true);  | 
319  | 1.94k  |     } else { | 
320  | 7  |       VLOG(1) << "Index permissions update skipped, leaving schema_version at "  | 
321  | 1  |               << indexed_table_pb.version();  | 
322  | 7  |     }  | 
323  | 1.95k  |     indexed_table_data.set_state(  | 
324  | 1.95k  |         SysTablesEntryPB::ALTERING,  | 
325  | 1.95k  |         Format("Update index permission version=$0 ts=$1", | 
326  | 1.95k  |                indexed_table_pb.version(), LocalTimeAsString()));  | 
327  |  |  | 
328  |  |     // Update sys-catalog with the new indexed table info.  | 
329  | 1.95k  |     TRACE("Updating indexed table metadata on disk"); | 
330  | 1.95k  |     RETURN_NOT_OK(catalog_manager->sys_catalog_->Upsert(  | 
331  | 1.95k  |         catalog_manager->leader_ready_term(), indexed_table));  | 
332  |  |  | 
333  |  |     // Update the in-memory state.  | 
334  | 1.95k  |     TRACE("Committing in-memory state"); | 
335  | 1.95k  |     l.Commit();  | 
336  | 1.95k  |   }  | 
337  | 1.95k  |   if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms > 0)) { | 
338  | 139  |     TRACE("Sleeping for $0 ms", | 
339  | 139  |           FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms);  | 
340  | 139  |     DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table) << " sleeping for "  | 
341  | 5  |              << FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms  | 
342  | 5  |              << "ms AFTER updating the index permission to " << ToString(perm_mapping);  | 
343  | 139  |     SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms));  | 
344  | 139  |     DVLOG(3) << __PRETTY_FUNCTION__ << " Done Sleeping"13 ;  | 
345  | 139  |     TRACE("Done Sleeping"); | 
346  | 139  |   }  | 
347  | 1.95k  |   return permissions_updated;  | 
348  | 1.95k  | }  | 
349  |  |  | 
350  |  | Status MultiStageAlterTable::StartBackfillingData(  | 
351  |  |     CatalogManager* catalog_manager,  | 
352  |  |     const scoped_refptr<TableInfo>& indexed_table,  | 
353  |  |     const std::vector<IndexInfoPB>& idx_infos,  | 
354  | 947  |     boost::optional<uint32_t> current_version) { | 
355  |  |   // We leave the table state as ALTERING so that a master failover can resume the backfill.  | 
356  | 947  |   RETURN_NOT_OK(ClearFullyAppliedAndUpdateState(  | 
357  | 947  |       catalog_manager, indexed_table, current_version, /* change_state to RUNNING */ false));  | 
358  |  |  | 
359  | 947  |   RETURN_NOT_OK(indexed_table->SetIsBackfilling());  | 
360  |  |  | 
361  | 906  |   TRACE("Starting backfill process"); | 
362  | 906  |   VLOG(0) << __func__ << " starting backfill on " << indexed_table->ToString() << " for "  | 
363  | 906  |           << yb::ToString(idx_infos);  | 
364  |  |  | 
365  | 906  |   auto ns_info = catalog_manager->FindNamespaceById(indexed_table->namespace_id());  | 
366  | 906  |   RETURN_NOT_OK_PREPEND(ns_info, "Unable to get namespace info for backfill");  | 
367  |  |  | 
368  | 906  |   auto backfill_table = std::make_shared<BackfillTable>(  | 
369  | 906  |       catalog_manager->master_, catalog_manager->AsyncTaskPool(), indexed_table, idx_infos,  | 
370  | 906  |       *ns_info);  | 
371  | 906  |   backfill_table->Launch();  | 
372  | 906  |   return Status::OK();  | 
373  | 906  | }  | 
374  |  |  | 
375  |  | // Returns true, if the said IndexPermissions is a transient state.  | 
376  |  | // Returns false, if it is a state where the index can be. viz: READ_WRITE_AND_DELETE  | 
377  |  | // INDEX_UNUSED is considered transcient because it needs to delete the index.  | 
378  | 0  | bool IsTransientState(IndexPermissions perm) { | 
379  | 0  |   return perm != INDEX_PERM_READ_WRITE_AND_DELETE && perm != INDEX_PERM_NOT_USED;  | 
380  | 0  | }  | 
381  |  |  | 
382  | 1.18k  | IndexPermissions NextPermission(IndexPermissions perm) { | 
383  | 1.18k  |   switch (perm) { | 
384  | 480  |     case INDEX_PERM_DELETE_ONLY:  | 
385  | 480  |       return INDEX_PERM_WRITE_AND_DELETE;  | 
386  | 457  |     case INDEX_PERM_WRITE_AND_DELETE:  | 
387  | 457  |       return INDEX_PERM_DO_BACKFILL;  | 
388  | 0  |     case INDEX_PERM_DO_BACKFILL:  | 
389  | 0  |       CHECK(false) << "Not expected to be here.";  | 
390  | 0  |       return INDEX_PERM_DELETE_ONLY;  | 
391  | 0  |     case INDEX_PERM_READ_WRITE_AND_DELETE:  | 
392  | 0  |       CHECK(false) << "Not expected to be here.";  | 
393  | 0  |       return INDEX_PERM_DELETE_ONLY;  | 
394  | 131  |     case INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING:  | 
395  | 131  |       return INDEX_PERM_DELETE_ONLY_WHILE_REMOVING;  | 
396  | 119  |     case INDEX_PERM_DELETE_ONLY_WHILE_REMOVING:  | 
397  | 119  |       return INDEX_PERM_INDEX_UNUSED;  | 
398  | 0  |     case INDEX_PERM_INDEX_UNUSED:  | 
399  | 0  |     case INDEX_PERM_NOT_USED:  | 
400  | 0  |       CHECK(false) << "Not expected to be here.";  | 
401  | 0  |       return INDEX_PERM_DELETE_ONLY;  | 
402  | 1.18k  |   }  | 
403  | 0  |   CHECK(false) << "Not expected to be here.";  | 
404  | 0  |   return INDEX_PERM_DELETE_ONLY;  | 
405  | 1.18k  | }  | 
406  |  |  | 
407  |  | Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary(  | 
408  |  |     CatalogManager* catalog_manager, const scoped_refptr<TableInfo>& indexed_table,  | 
409  | 10.5k  |     uint32_t current_version, bool respect_backfill_deferrals) { | 
410  | 10.5k  |   DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table)7 ;  | 
411  |  |  | 
412  | 10.5k  |   const bool is_ysql_table = (indexed_table->GetTableType() == TableType::PGSQL_TABLE_TYPE);  | 
413  | 10.5k  |   const bool defer_backfill = !is_ysql_table && GetAtomicFlag(&FLAGS_defer_index_backfill)2.47k ;  | 
414  | 10.5k  |   const bool is_backfilling = indexed_table->IsBackfilling();  | 
415  |  |  | 
416  | 10.5k  |   std::unordered_map<TableId, IndexPermissions> indexes_to_update;  | 
417  | 10.5k  |   vector<IndexInfoPB> indexes_to_backfill;  | 
418  | 10.5k  |   vector<IndexInfoPB> deferred_indexes;  | 
419  | 10.5k  |   vector<IndexInfoPB> indexes_to_delete;  | 
420  | 10.5k  |   { | 
421  | 10.5k  |     TRACE("Locking indexed table"); | 
422  | 10.5k  |     VLOG(1) << ("Locking indexed table")9 ;  | 
423  | 10.5k  |     auto l = indexed_table->LockForRead();  | 
424  | 10.5k  |     VLOG(1) << ("Locked indexed table")11 ;  | 
425  | 10.5k  |     if (current_version != l->pb.version()) { | 
426  | 0  |       LOG(WARNING) << "Somebody launched the next version before we got to it.";  | 
427  | 0  |       return Status::OK();  | 
428  | 0  |     }  | 
429  |  |  | 
430  |  |     // Attempt to find an index that requires us to just launch the next state (i.e. not backfill)  | 
431  | 17.5k  |     for (int i = 0; 10.5k i < l->pb.indexes_size(); i++6.98k ) {  | 
432  | 6.98k  |       const IndexInfoPB& idx_pb = l->pb.indexes(i);  | 
433  | 6.98k  |       if (!idx_pb.has_index_permissions()) { | 
434  | 762  |         continue;  | 
435  | 762  |       }  | 
436  | 6.22k  |       if (idx_pb.index_permissions() == INDEX_PERM_DO_BACKFILL) { | 
437  | 640  |         if (respect_backfill_deferrals && (639 defer_backfill639  || idx_pb.is_backfill_deferred()639 )) {  | 
438  | 22  |           LOG(INFO) << "Deferring index-backfill for " << idx_pb.table_id();  | 
439  | 22  |           deferred_indexes.emplace_back(idx_pb);  | 
440  | 618  |         } else { | 
441  | 618  |           indexes_to_backfill.emplace_back(idx_pb);  | 
442  | 618  |         }  | 
443  | 5.58k  |       } else if (idx_pb.index_permissions() == INDEX_PERM_INDEX_UNUSED) { | 
444  | 124  |         indexes_to_delete.emplace_back(idx_pb);  | 
445  |  |       // For YSQL, there should never be indexes to update from master side because postgres drives  | 
446  |  |       // permission changes.  | 
447  | 5.45k  |       } else if (idx_pb.index_permissions() != INDEX_PERM_READ_WRITE_AND_DELETE && !is_ysql_table1.83k ) {  | 
448  | 1.18k  |         indexes_to_update.emplace(idx_pb.table_id(), NextPermission(idx_pb.index_permissions()));  | 
449  | 1.18k  |       }  | 
450  | 6.22k  |     }  | 
451  |  |  | 
452  |  |     // TODO(#6218): Do we really not want to continue backfill  | 
453  |  |     // across master failovers for YSQL?  | 
454  | 10.5k  |     if (!is_ysql_table && !is_backfilling2.47k  && l.data().pb.backfill_jobs_size() > 02.33k ) {  | 
455  |  |       // If a backfill job was started for a set of indexes and then the leader  | 
456  |  |       // fails over, we should be careful that we are restarting the backfill job  | 
457  |  |       // with the same set of indexes.  | 
458  |  |       // A new index could have been added since the time the last backfill job started on  | 
459  |  |       // the old master. The safe time calculated for the earlier set of indexes may not be  | 
460  |  |       // valid for the new index(es) to use.  | 
461  | 0  |       DCHECK(l.data().pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 "  | 
462  | 0  |                                                         "outstanding backfill job.";  | 
463  | 0  |       const BackfillJobPB& backfill_job = l.data().pb.backfill_jobs(0);  | 
464  | 0  |       VLOG(3) << "Found an in-progress backfill-job " << yb::ToString(backfill_job);  | 
465  |  |       // Do not allow for any other indexes to piggy back with this backfill.  | 
466  | 0  |       indexes_to_backfill.clear();  | 
467  | 0  |       deferred_indexes.clear();  | 
468  | 0  |       for (int i = 0; i < backfill_job.indexes_size(); i++) { | 
469  | 0  |         const IndexInfoPB& idx_pb = backfill_job.indexes(i);  | 
470  | 0  |         indexes_to_backfill.push_back(idx_pb);  | 
471  | 0  |       }  | 
472  | 0  |     }  | 
473  | 10.5k  |   }  | 
474  |  |  | 
475  | 10.5k  |   if (indexes_to_update.empty() &&  | 
476  | 10.5k  |       indexes_to_delete.empty()9.43k  &&  | 
477  | 10.5k  |       (9.31k is_backfilling9.31k  || indexes_to_backfill.empty()9.27k )) {  | 
478  | 8.90k  |     TRACE("Not necessary to launch next version"); | 
479  | 8.90k  |     VLOG(1) << "Not necessary to launch next version"4 ;  | 
480  | 8.90k  |     return ClearFullyAppliedAndUpdateState(  | 
481  | 8.90k  |         catalog_manager, indexed_table, current_version, /* change state to RUNNING */ true);  | 
482  | 8.90k  |   }  | 
483  |  |  | 
484  |  |   // For YSQL online schema migration of indexes, instead of master driving the schema changes,  | 
485  |  |   // postgres will drive it.  Postgres will use three of the DocDB index permissions:  | 
486  |  |   //  | 
487  |  |   // - INDEX_PERM_WRITE_AND_DELETE (set from the start)  | 
488  |  |   // - INDEX_PERM_READ_WRITE_AND_DELETE (set by master)  | 
489  |  |   // - INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING (set by master)  | 
490  |  |   //  | 
491  |  |   // This changes how we treat indexes_to_foo:  | 
492  |  |   //  | 
493  |  |   // - indexes_to_update should always be empty because we never want master to set index  | 
494  |  |   //   permissions.  | 
495  |  |   // - indexes_to_delete is impossible to be nonempty, and, in the future, when we do use  | 
496  |  |   //   INDEX_PERM_INDEX_UNUSED, we want to use some other delete trigger that makes sure no  | 
497  |  |   //   transactions are left using the index.  Prepare for that by doing nothing when nonempty.  | 
498  |  |   // - indexes_to_backfill is impossible to be nonempty, but, in the future, we want to set  | 
499  |  |   //   INDEX_PERM_DO_BACKFILL so that backfill resumes on master leader changes.  Prepare for that  | 
500  |  |   //   by handling indexes_to_backfill like for YCQL.  | 
501  |  |   //  | 
502  |  |   // TODO(jason): when using INDEX_PERM_DO_BACKFILL, update this comment (issue #6218).  | 
503  |  |  | 
504  | 1.67k  |   if (!indexes_to_update.empty()) { | 
505  | 1.14k  |     VLOG(1) << "Updating index permissions for " << yb::ToString(indexes_to_update) << " on "  | 
506  | 4  |             << indexed_table->ToString();  | 
507  | 1.14k  |     Result<bool> permissions_updated =  | 
508  | 1.14k  |         VERIFY_RESULT966 (UpdateIndexPermission(catalog_manager, indexed_table, indexes_to_update,  | 
509  | 966  |                                             current_version));  | 
510  |  |  | 
511  | 966  |     if (!permissions_updated.ok()) { | 
512  | 0  |       LOG(WARNING) << "Could not update index permissions."  | 
513  | 0  |                    << " Possible that the master-leader has changed, or a race "  | 
514  | 0  |                    << "with another thread trying to launch next version: "  | 
515  | 0  |                    << permissions_updated.ToString();  | 
516  | 0  |     }  | 
517  |  |  | 
518  | 966  |     if (permissions_updated.ok() && *permissions_updated) { | 
519  | 966  |       VLOG(1) << "Sending alter table request with updated permissions"4 ;  | 
520  | 966  |       RETURN_NOT_OK(catalog_manager->SendAlterTableRequest(indexed_table));  | 
521  | 966  |       return Status::OK();  | 
522  | 966  |     }  | 
523  | 966  |   }  | 
524  |  |  | 
525  | 530  |   if (!indexes_to_delete.empty()) { | 
526  | 124  |     const auto& index_info_to_update = indexes_to_delete[0];  | 
527  | 124  |     VLOG(3) << "Deleting the index and the entry in the indexed table for "  | 
528  | 1  |             << yb::ToString(index_info_to_update);  | 
529  | 124  |     DeleteTableRequestPB req;  | 
530  | 124  |     DeleteTableResponsePB resp;  | 
531  | 124  |     req.mutable_table()->set_table_id(index_info_to_update.table_id());  | 
532  | 124  |     req.set_is_index_table(true);  | 
533  | 124  |     RETURN_NOT_OK(catalog_manager->DeleteTableInternal(&req, &resp, nullptr));  | 
534  | 110  |     return Status::OK();  | 
535  | 124  |   }  | 
536  |  |  | 
537  | 407  |   if (406 !indexes_to_backfill.empty()406 ) {  | 
538  | 407  |     VLOG(3) << "Backfilling " << yb::ToString(indexes_to_backfill)  | 
539  | 1  |             << (deferred_indexes.empty()  | 
540  | 1  |                  ? ""  | 
541  | 1  |                  : yb::Format(" along with deferred indexes $0", | 
542  | 0  |                               yb::ToString(deferred_indexes)));  | 
543  | 407  |     for (auto& deferred_idx : deferred_indexes) { | 
544  | 5  |       indexes_to_backfill.emplace_back(deferred_idx);  | 
545  | 5  |     }  | 
546  | 407  |     WARN_NOT_OK(  | 
547  | 407  |         StartBackfillingData(  | 
548  | 407  |             catalog_manager, indexed_table.get(), indexes_to_backfill, current_version),  | 
549  | 407  |         yb::Format("Could not launch backfill for $0", indexed_table->ToString())); | 
550  | 407  |   }  | 
551  |  |  | 
552  | 406  |   return Status::OK();  | 
553  | 530  | }  | 
554  |  |  | 
555  |  | // -----------------------------------------------------------------------------------------------  | 
556  |  | // BackfillTableJob  | 
557  |  | // -----------------------------------------------------------------------------------------------  | 
558  | 0  | std::string BackfillTableJob::description() const { | 
559  | 0  |   const std::shared_ptr<BackfillTable> retain_bt = backfill_table_;  | 
560  | 0  |   auto curr_state = state();  | 
561  | 0  |   if (!IsStateTerminal(curr_state) && retain_bt) { | 
562  | 0  |     return retain_bt->description();  | 
563  | 0  |   } else if (curr_state == MonitoredTaskState::kFailed) { | 
564  | 0  |     return Format("Backfilling $0 Failed", requested_index_names_); | 
565  | 0  |   } else if (curr_state == MonitoredTaskState::kAborted) { | 
566  | 0  |     return Format("Backfilling $0 Aborted", requested_index_names_); | 
567  | 0  |   } else { | 
568  | 0  |     DCHECK(curr_state == MonitoredTaskState::kComplete);  | 
569  | 0  |     return Format("Backfilling $0 Done", requested_index_names_); | 
570  | 0  |   }  | 
571  | 0  | }  | 
572  |  |  | 
573  | 0  | MonitoredTaskState BackfillTableJob::AbortAndReturnPrevState(const Status& status) { | 
574  | 0  |   auto old_state = state();  | 
575  | 0  |   while (!IsStateTerminal(old_state)) { | 
576  | 0  |     if (state_.compare_exchange_strong(old_state,  | 
577  | 0  |                                        MonitoredTaskState::kAborted)) { | 
578  | 0  |       return old_state;  | 
579  | 0  |     }  | 
580  | 0  |     old_state = state();  | 
581  | 0  |   }  | 
582  | 0  |   return old_state;  | 
583  | 0  | }  | 
584  |  |  | 
585  | 1.78k  | void BackfillTableJob::SetState(MonitoredTaskState new_state) { | 
586  | 1.78k  |   auto old_state = state();  | 
587  | 1.78k  |   if (!IsStateTerminal(old_state)) { | 
588  | 1.77k  |     if (state_.compare_exchange_strong(old_state, new_state) && IsStateTerminal(new_state)) { | 
589  | 871  |       MarkDone();  | 
590  | 871  |     }  | 
591  | 1.77k  |   }  | 
592  | 1.78k  | }  | 
593  |  | // -----------------------------------------------------------------------------------------------  | 
594  |  | // BackfillTable  | 
595  |  | // -----------------------------------------------------------------------------------------------  | 
596  |  |  | 
597  |  | namespace { | 
598  |  |  | 
599  | 906  | std::unordered_set<TableId> IndexIdsFromInfos(const std::vector<IndexInfoPB>& indexes) { | 
600  | 906  |   std::unordered_set<TableId> idx_ids;  | 
601  | 918  |   for (const auto& idx_info : indexes) { | 
602  | 918  |     idx_ids.insert(idx_info.table_id());  | 
603  | 918  |   }  | 
604  | 906  |   return idx_ids;  | 
605  | 906  | }  | 
606  |  |  | 
607  |  | std::string RetrieveIndexNames(CatalogManager* mgr,  | 
608  | 5.43k  |                                const std::unordered_set<std::string>& index_ids) { | 
609  | 5.43k  |   std::ostringstream out;  | 
610  | 5.43k  |   out << "{ "; | 
611  | 5.43k  |   bool first = true;  | 
612  | 5.47k  |   for (const auto& index_id : index_ids) { | 
613  | 5.47k  |     const auto table_info = mgr->GetTableInfo(index_id);  | 
614  | 5.47k  |     if (!table_info) { | 
615  | 0  |       LOG(WARNING) << "No table info can be found with index table id " << index_id;  | 
616  | 0  |       continue;  | 
617  | 0  |     }  | 
618  | 5.47k  |     if (!first) { | 
619  | 47  |       out << ", ";  | 
620  | 47  |     }  | 
621  | 5.47k  |     first = false;  | 
622  |  |  | 
623  | 5.47k  |     out << table_info->name();  | 
624  | 5.47k  |   }  | 
625  | 5.43k  |   out << " }";  | 
626  | 5.43k  |   return out.str();  | 
627  | 5.43k  | }  | 
628  |  |  | 
629  |  | }  // namespace  | 
630  |  |  | 
631  |  | BackfillTable::BackfillTable(  | 
632  |  |     Master* master, ThreadPool* callback_pool, const scoped_refptr<TableInfo>& indexed_table,  | 
633  |  |     std::vector<IndexInfoPB> indexes, const scoped_refptr<NamespaceInfo>& ns_info)  | 
634  |  |     : master_(master),  | 
635  |  |       callback_pool_(callback_pool),  | 
636  |  |       indexed_table_(indexed_table),  | 
637  |  |       index_infos_(indexes),  | 
638  |  |       requested_index_ids_(IndexIdsFromInfos(indexes)),  | 
639  |  |       requested_index_names_(RetrieveIndexNames(  | 
640  |  |           master->catalog_manager_impl(), requested_index_ids_)),  | 
641  | 906  |       ns_info_(ns_info) { | 
642  | 906  |   auto l = indexed_table_->LockForRead();  | 
643  | 906  |   schema_version_ = indexed_table_->metadata().state().pb.version();  | 
644  | 906  |   leader_term_ = master_->catalog_manager()->leader_ready_term();  | 
645  | 906  |   if (l.data().pb.backfill_jobs_size() > 0) { | 
646  | 0  |     number_rows_processed_.store(l.data().pb.backfill_jobs(0).num_rows_processed());  | 
647  | 906  |   } else { | 
648  | 906  |     number_rows_processed_.store(0);  | 
649  | 906  |   }  | 
650  |  |  | 
651  | 906  |   const auto& pb = indexed_table_->metadata().state().pb;  | 
652  | 906  |   if (pb.backfill_jobs_size() > 0 && pb.backfill_jobs(0).has_backfilling_timestamp()0  &&  | 
653  | 906  |       read_time_for_backfill_.FromUint64(pb.backfill_jobs(0).backfilling_timestamp()).ok()0 ) {  | 
654  | 0  |     DCHECK(pb.backfill_jobs_size() == 1) << "Expect only 1 outstanding backfill job";  | 
655  | 0  |     DCHECK(implicit_cast<size_t>(pb.backfill_jobs(0).indexes_size()) == index_infos_.size())  | 
656  | 0  |         << "Expect to use the same set of indexes.";  | 
657  | 0  |     timestamp_chosen_.store(true, std::memory_order_release);  | 
658  | 0  |     VLOG_WITH_PREFIX(1) << "Will be using " << read_time_for_backfill_  | 
659  | 0  |                         << " for backfill";  | 
660  | 906  |   } else { | 
661  | 906  |     read_time_for_backfill_ = HybridTime::kInvalid;  | 
662  | 906  |     timestamp_chosen_.store(false, std::memory_order_release);  | 
663  | 906  |   }  | 
664  | 906  |   done_.store(false, std::memory_order_release);  | 
665  | 906  | }  | 
666  |  |  | 
667  | 14.1k  | const std::unordered_set<TableId> BackfillTable::indexes_to_build() const { | 
668  | 14.1k  |   std::unordered_set<TableId> indexes_to_build;  | 
669  | 14.1k  |   { | 
670  | 14.1k  |     auto l = indexed_table_->LockForRead();  | 
671  | 14.1k  |     const auto& indexed_table_pb = l.data().pb;  | 
672  | 14.1k  |     if (indexed_table_pb.backfill_jobs_size() == 0) { | 
673  |  |       // Some other task already marked the backfill job as done.  | 
674  | 56  |       return {}; | 
675  | 56  |     }  | 
676  | 18.4E  |     DCHECK(indexed_table_pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 "  | 
677  | 18.4E  |                                                           "outstanding backfill job.";  | 
678  | 14.2k  |     for (const auto& kv_pair : indexed_table_pb.backfill_jobs(0).backfill_state()) { | 
679  | 14.2k  |       if (kv_pair.second == BackfillJobPB::IN_PROGRESS) { | 
680  | 14.2k  |         indexes_to_build.insert(kv_pair.first);  | 
681  | 14.2k  |       }  | 
682  | 14.2k  |     }  | 
683  | 14.1k  |   }  | 
684  | 0  |   return indexes_to_build;  | 
685  | 14.1k  | }  | 
686  |  |  | 
687  | 906  | void BackfillTable::Launch() { | 
688  | 906  |   backfill_job_ = std::make_shared<BackfillTableJob>(shared_from_this());  | 
689  | 906  |   backfill_job_->SetState(MonitoredTaskState::kRunning);  | 
690  | 906  |   master_->catalog_manager_impl()->jobs_tracker_->AddTask(backfill_job_);  | 
691  |  |  | 
692  | 906  |   { | 
693  | 906  |     auto l = indexed_table_->LockForWrite();  | 
694  | 906  |     if (l.data().pb.backfill_jobs_size() == 0) { | 
695  | 906  |       auto* backfill_job = l.mutable_data()->pb.add_backfill_jobs();  | 
696  | 918  |       for (const auto& idx_info : index_infos_) { | 
697  | 918  |         backfill_job->add_indexes()->CopyFrom(idx_info);  | 
698  | 918  |         backfill_job->mutable_backfill_state()->insert(  | 
699  | 918  |             {idx_info.table_id(), BackfillJobPB::IN_PROGRESS}); | 
700  | 918  |       }  | 
701  | 906  |       auto s = master_->catalog_manager_impl()->sys_catalog_->Upsert(  | 
702  | 906  |               leader_term(), indexed_table_);  | 
703  | 906  |       if (!s.ok()) { | 
704  | 0  |         LOG(WARNING) << "Failed to persist backfill jobs. Abandoning launch. " << s;  | 
705  | 0  |         return;  | 
706  | 0  |       }  | 
707  | 906  |       l.Commit();  | 
708  | 906  |     }  | 
709  | 906  |   }  | 
710  | 906  |   if (!timestamp_chosen_.load(std::memory_order_acquire)) { | 
711  | 906  |     LaunchComputeSafeTimeForRead();  | 
712  | 906  |   } else { | 
713  | 0  |     LaunchBackfill();  | 
714  | 0  |   }  | 
715  | 906  | }  | 
716  |  |  | 
717  | 906  | void BackfillTable::LaunchComputeSafeTimeForRead() { | 
718  | 906  |   auto tablets = indexed_table_->GetTablets();  | 
719  |  |  | 
720  | 906  |   num_tablets_.store(tablets.size(), std::memory_order_release);  | 
721  | 906  |   tablets_pending_.store(tablets.size(), std::memory_order_release);  | 
722  | 906  |   auto min_cutoff = master()->clock()->Now();  | 
723  | 4.40k  |   for (const scoped_refptr<TabletInfo>& tablet : tablets) { | 
724  | 4.40k  |     auto get_safetime = std::make_shared<GetSafeTimeForTablet>(  | 
725  | 4.40k  |         shared_from_this(), tablet, min_cutoff);  | 
726  | 4.40k  |     get_safetime->Launch();  | 
727  | 4.40k  |   }  | 
728  | 906  | }  | 
729  |  |  | 
730  | 1.81k  | std::string BackfillTable::LogPrefix() const { | 
731  | 1.81k  |   return Format("Backfill Index Table(s) $0 ", requested_index_names_); | 
732  | 1.81k  | }  | 
733  |  |  | 
734  | 0  | std::string BackfillTable::description() const { | 
735  | 0  |   auto num_pending = tablets_pending_.load(std::memory_order_acquire);  | 
736  | 0  |   auto num_tablets = num_tablets_.load(std::memory_order_acquire);  | 
737  | 0  |   return Format(  | 
738  | 0  |       "Backfill Index Table(s) $0 : $1", requested_index_names_,  | 
739  | 0  |       (timestamp_chosen()  | 
740  | 0  |            ? (done() ? Format("Backfill $0/$1 tablets done", num_pending, num_tablets) | 
741  | 0  |                      : Format(  | 
742  | 0  |                            "Backfilling $0/$1 tablets with $2 rows done", num_pending, num_tablets,  | 
743  | 0  |                            number_rows_processed_.load()))  | 
744  | 0  |            : Format("Waiting to GetSafeTime from $0/$1 tablets", num_pending, num_tablets))); | 
745  | 0  | }  | 
746  |  |  | 
747  | 1.99k  | const std::string BackfillTable::GetNamespaceName() const { | 
748  | 1.99k  |   return ns_info_->name();  | 
749  | 1.99k  | }  | 
750  |  |  | 
751  | 4.32k  | Status BackfillTable::UpdateRowsProcessedForIndexTable(const uint64_t number_rows_processed) { | 
752  | 4.32k  |   auto l = indexed_table_->LockForWrite();  | 
753  |  |  | 
754  | 4.32k  |   if (l.data().pb.backfill_jobs_size() == 0) { | 
755  |  |     // Some other task already marked the backfill job as done.  | 
756  | 31  |     return Status::OK();  | 
757  | 31  |   }  | 
758  |  |  | 
759  |  |   // This is consistent with logic assuming that we have only one backfill job in queue  | 
760  |  |   // We might in the future change this to a for loop to account for multiple backfill jobs  | 
761  | 4.29k  |   number_rows_processed_.fetch_add(number_rows_processed);  | 
762  | 4.29k  |   auto* indexed_table_pb = l.mutable_data()->pb.mutable_backfill_jobs(0);  | 
763  | 4.29k  |   indexed_table_pb->set_num_rows_processed(number_rows_processed_.load());  | 
764  | 4.29k  |   VLOG(2) << "Updated backfill task to having processed " << number_rows_processed  | 
765  | 0  |           << " more rows. Total rows processed is: " << number_rows_processed_;  | 
766  |  |  | 
767  | 4.29k  |   RETURN_NOT_OK(  | 
768  | 4.29k  |       master_->catalog_manager_impl()->sys_catalog_->Upsert(leader_term(), indexed_table_));  | 
769  | 4.29k  |   l.Commit();  | 
770  | 4.29k  |   return Status::OK();  | 
771  | 4.29k  | }  | 
772  |  |  | 
773  | 4.40k  | Status BackfillTable::UpdateSafeTime(const Status& s, HybridTime ht) { | 
774  | 4.40k  |   if (!s.ok()) { | 
775  |  |     // Move on to ABORTED permission.  | 
776  | 1  |     LOG_WITH_PREFIX(ERROR)  | 
777  | 1  |         << "Failed backfill. Could not compute safe time for "  | 
778  | 1  |         << yb::ToString(indexed_table_) << " " << s;  | 
779  | 1  |     if (!timestamp_chosen_.exchange(true)) { | 
780  | 1  |       RETURN_NOT_OK_PREPEND(  | 
781  | 1  |           MarkAllIndexesAsFailed(), "Failed to mark backfill as failed. Abandoning.");  | 
782  | 1  |     }  | 
783  | 1  |     return Status::OK();  | 
784  | 1  |   }  | 
785  |  |  | 
786  |  |   // Need to guard this.  | 
787  | 4.39k  |   HybridTime read_timestamp;  | 
788  | 4.39k  |   { | 
789  | 4.39k  |     std::lock_guard<simple_spinlock> l(mutex_);  | 
790  | 4.39k  |     VLOG(2) << "Updating read_time_for_backfill_ to max{ " | 
791  | 1  |             << read_time_for_backfill_.ToString() << ", " << ht.ToString()  | 
792  | 1  |             << " }.";  | 
793  | 4.39k  |     read_time_for_backfill_.MakeAtLeast(ht);  | 
794  | 4.39k  |     read_timestamp = read_time_for_backfill_;  | 
795  | 4.39k  |   }  | 
796  |  |  | 
797  |  |   // If OK then move on to doing backfill.  | 
798  | 4.39k  |   if (!timestamp_chosen() && --tablets_pending_ == 04.39k ) {  | 
799  | 905  |     LOG_WITH_PREFIX(INFO) << "Completed fetching SafeTime for the table "  | 
800  | 905  |                           << yb::ToString(indexed_table_) << " will be using "  | 
801  | 905  |                           << read_timestamp.ToString();  | 
802  | 905  |     { | 
803  | 905  |       auto l = indexed_table_->LockForWrite();  | 
804  | 905  |       DCHECK_EQ(l.mutable_data()->pb.backfill_jobs_size(), 1);  | 
805  | 905  |       auto* backfill_job = l.mutable_data()->pb.mutable_backfill_jobs(0);  | 
806  | 905  |       backfill_job->set_backfilling_timestamp(read_timestamp.ToUint64());  | 
807  | 905  |       RETURN_NOT_OK_PREPEND(  | 
808  | 905  |           master_->catalog_manager_impl()->sys_catalog_->Upsert(  | 
809  | 905  |               leader_term(), indexed_table_),  | 
810  | 905  |           "Failed to persist backfilling timestamp. Abandoning.");  | 
811  | 905  |       l.Commit();  | 
812  | 905  |     }  | 
813  | 1  |     VLOG_WITH_PREFIX(2) << "Saved " << read_timestamp  | 
814  | 1  |                         << " as backfilling_timestamp";  | 
815  | 905  |     timestamp_chosen_.store(true, std::memory_order_release);  | 
816  | 905  |     LaunchBackfill();  | 
817  | 905  |   }  | 
818  | 4.39k  |   return Status::OK();  | 
819  | 4.39k  | }  | 
820  |  |  | 
821  | 905  | void BackfillTable::LaunchBackfill() { | 
822  | 905  |   VLOG_WITH_PREFIX1 (1) << "launching backfill with timestamp: "  | 
823  | 1  |                       << read_time_for_backfill_;  | 
824  | 905  |   auto tablets = indexed_table_->GetTablets();  | 
825  |  |  | 
826  | 905  |   num_tablets_.store(tablets.size(), std::memory_order_release);  | 
827  | 905  |   tablets_pending_.store(tablets.size(), std::memory_order_release);  | 
828  | 4.39k  |   for (const scoped_refptr<TabletInfo>& tablet : tablets) { | 
829  | 4.39k  |     auto backfill_tablet = std::make_shared<BackfillTablet>(shared_from_this(), tablet);  | 
830  | 4.39k  |     backfill_tablet->Launch();  | 
831  | 4.39k  |   }  | 
832  | 905  | }  | 
833  |  |  | 
834  | 4.23k  | void BackfillTable::Done(const Status& s, const std::unordered_set<TableId>& failed_indexes) { | 
835  | 4.23k  |   if (!s.ok()) { | 
836  | 41  |     LOG_WITH_PREFIX(ERROR) << "failed to backfill the index: " << yb::ToString(failed_indexes)  | 
837  | 41  |                            << " due to " << s;  | 
838  | 41  |     WARN_NOT_OK(  | 
839  | 41  |         MarkIndexesAsFailed(failed_indexes, s.message().ToBuffer()),  | 
840  | 41  |         "Couldn't to mark Indexes as failed");  | 
841  | 41  |     CheckIfDone();  | 
842  | 41  |     return;  | 
843  | 41  |   }  | 
844  |  |  | 
845  |  |   // If OK then move on to READ permissions.  | 
846  | 4.18k  |   if (!done() && --tablets_pending_ == 04.16k ) {  | 
847  | 868  |     LOG_WITH_PREFIX(INFO) << "Completed backfilling the index table.";  | 
848  | 868  |     done_.store(true, std::memory_order_release);  | 
849  | 868  |     WARN_NOT_OK(MarkAllIndexesAsSuccess(), "Failed to complete backfill.");  | 
850  | 868  |     WARN_NOT_OK(UpdateIndexPermissionsForIndexes(), "Failed to complete backfill.");  | 
851  | 3.32k  |   } else { | 
852  | 3.32k  |     VLOG_WITH_PREFIX0 (1) << "Still backfilling " << tablets_pending_ << " more tablets."0 ;  | 
853  | 3.32k  |   }  | 
854  | 4.18k  | }  | 
855  |  |  | 
856  |  | Status BackfillTable::MarkIndexesAsFailed(  | 
857  | 41  |     const std::unordered_set<TableId>& failed_indexes, const string& message) { | 
858  | 41  |   return MarkIndexesAsDesired(failed_indexes, BackfillJobPB::FAILED, message);  | 
859  | 41  | }  | 
860  |  |  | 
861  | 1  | Status BackfillTable::MarkAllIndexesAsFailed() { | 
862  | 1  |   return MarkIndexesAsDesired(indexes_to_build(), BackfillJobPB::FAILED, "failed");  | 
863  | 1  | }  | 
864  |  |  | 
865  | 868  | Status BackfillTable::MarkAllIndexesAsSuccess() { | 
866  | 868  |   return MarkIndexesAsDesired(indexes_to_build(), BackfillJobPB::SUCCESS, "");  | 
867  | 868  | }  | 
868  |  |  | 
869  |  | Status BackfillTable::MarkIndexesAsDesired(  | 
870  |  |     const std::unordered_set<TableId>& index_ids_set, BackfillJobPB_State state,  | 
871  | 910  |     const string message) { | 
872  | 910  |   VLOG_WITH_PREFIX1 (3) << "Marking " << yb::ToString(index_ids_set)  | 
873  | 1  |                       << " as " << BackfillJobPB_State_Name(state)  | 
874  | 1  |                       << " due to " << message;  | 
875  | 910  |   if (!index_ids_set.empty()) { | 
876  | 910  |     auto l = indexed_table_->LockForWrite();  | 
877  | 910  |     auto& indexed_table_pb = l.mutable_data()->pb;  | 
878  | 910  |     DCHECK_LE(indexed_table_pb.backfill_jobs_size(), 1) << "For now we only expect to have up to 1 "  | 
879  | 0  |                                                            "outstanding backfill job.";  | 
880  | 910  |     if (indexed_table_pb.backfill_jobs_size() == 0) { | 
881  |  |       // Some other task already marked the backfill job as done.  | 
882  | 24  |       return Status::OK();  | 
883  | 24  |     }  | 
884  | 886  |     auto* backfill_state_pb = indexed_table_pb.mutable_backfill_jobs(0)->mutable_backfill_state();  | 
885  | 896  |     for (const auto& idx_id : index_ids_set) { | 
886  | 896  |       backfill_state_pb->at(idx_id) = state;  | 
887  | 896  |       VLOG(2) << "Marking index " << idx_id << " as " << BackfillJobPB_State_Name(state)1 ;  | 
888  | 896  |     }  | 
889  | 2.46k  |     for (int i = 0; i < indexed_table_pb.indexes_size(); i++1.57k ) {  | 
890  | 1.57k  |       IndexInfoPB* idx_pb = indexed_table_pb.mutable_indexes(i);  | 
891  | 1.57k  |       if (index_ids_set.find(idx_pb->table_id()) != index_ids_set.end()) { | 
892  |  |         // Should this also move to the BackfillJob instead?  | 
893  | 891  |         if (!message.empty()) { | 
894  | 13  |           idx_pb->set_backfill_error_message(message);  | 
895  | 878  |         } else { | 
896  | 878  |           idx_pb->clear_backfill_error_message();  | 
897  | 878  |         }  | 
898  | 891  |         idx_pb->clear_is_backfill_deferred();  | 
899  | 891  |       }  | 
900  | 1.57k  |     }  | 
901  | 886  |     RETURN_NOT_OK(  | 
902  | 886  |         master_->catalog_manager_impl()->sys_catalog_->Upsert(leader_term(), indexed_table_));  | 
903  | 886  |     l.Commit();  | 
904  | 886  |   }  | 
905  | 886  |   return Status::OK();  | 
906  | 910  | }  | 
907  |  |  | 
908  | 41  | void BackfillTable::CheckIfDone() { | 
909  | 41  |   if (indexes_to_build().empty()) { | 
910  | 40  |     done_.store(true, std::memory_order_release);  | 
911  | 40  |     WARN_NOT_OK(  | 
912  | 40  |         UpdateIndexPermissionsForIndexes(), "Could not update index permissions after backfill");  | 
913  | 40  |   }  | 
914  | 41  | }  | 
915  |  |  | 
916  | 908  | Status BackfillTable::UpdateIndexPermissionsForIndexes() { | 
917  | 908  |   std::unordered_map<TableId, IndexPermissions> permissions_to_set;  | 
918  | 908  |   bool all_success = true;  | 
919  | 908  |   { | 
920  | 908  |     auto l = indexed_table_->LockForRead();  | 
921  | 908  |     const auto& indexed_table_pb = l.data().pb;  | 
922  | 908  |     if (indexed_table_pb.backfill_jobs_size() == 0) { | 
923  |  |       // Some other task already marked the backfill job as done.  | 
924  | 24  |       return Status::OK();  | 
925  | 24  |     }  | 
926  | 884  |     DCHECK(indexed_table_pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 "  | 
927  | 0  |                                                           "outstanding backfill job.";  | 
928  | 896  |     for (const auto& kv_pair : indexed_table_pb.backfill_jobs(0).backfill_state()) { | 
929  | 896  |       VLOG(2) << "Reading backfill_state for " << kv_pair.first << " as "  | 
930  | 1  |               << BackfillJobPB_State_Name(kv_pair.second);  | 
931  | 896  |       DCHECK_NE(kv_pair.second, BackfillJobPB::IN_PROGRESS)  | 
932  | 0  |           << __func__ << " is expected to be only called after all indexes are done.";  | 
933  | 896  |       const bool success = (kv_pair.second == BackfillJobPB::SUCCESS);  | 
934  | 896  |       all_success &= success;  | 
935  | 896  |       permissions_to_set.emplace(  | 
936  | 896  |           kv_pair.first,  | 
937  | 896  |           success ? INDEX_PERM_READ_WRITE_AND_DELETE879  : INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING17 );  | 
938  | 896  |     }  | 
939  | 884  |   }  | 
940  |  |  | 
941  | 896  |   for (const auto& kv_pair : permissions_to_set) { | 
942  | 896  |     if (kv_pair.second == INDEX_PERM_READ_WRITE_AND_DELETE) { | 
943  | 879  |       RETURN_NOT_OK(AllowCompactionsToGCDeleteMarkers(kv_pair.first));  | 
944  | 879  |     }  | 
945  | 896  |   }  | 
946  |  |  | 
947  | 884  |   RETURN_NOT_OK_PREPEND(  | 
948  | 884  |       MultiStageAlterTable::UpdateIndexPermission(  | 
949  | 884  |           master_->catalog_manager_impl(), indexed_table_, permissions_to_set, boost::none),  | 
950  | 884  |       "Could not update permissions after backfill. "  | 
951  | 884  |       "Possible that the master-leader has changed.");  | 
952  | 884  |   backfill_job_->SetState(  | 
953  | 884  |       all_success ? MonitoredTaskState::kComplete859  : MonitoredTaskState::kFailed25 );  | 
954  | 884  |   RETURN_NOT_OK(ClearCheckpointStateInTablets());  | 
955  | 884  |   indexed_table_->ClearIsBackfilling();  | 
956  |  |  | 
957  | 884  |   VLOG(1) << "Sending alter table requests to the Indexed table"20 ;  | 
958  | 884  |   RETURN_NOT_OK(master_->catalog_manager_impl()->SendAlterTableRequest(indexed_table_));  | 
959  | 884  |   VLOG(1) << "DONE Sending alter table requests to the Indexed table"20 ;  | 
960  |  |  | 
961  | 884  |   LOG(INFO) << "Done backfill on " << indexed_table_->ToString() << " setting permissions to "  | 
962  | 884  |             << yb::ToString(permissions_to_set);  | 
963  | 884  |   return Status::OK();  | 
964  | 884  | }  | 
965  |  |  | 
966  | 875  | Status BackfillTable::ClearCheckpointStateInTablets() { | 
967  | 875  |   auto tablets = indexed_table_->GetTablets();  | 
968  | 875  |   std::vector<TabletInfo*> tablet_ptrs;  | 
969  | 4.23k  |   for (scoped_refptr<TabletInfo>& tablet : tablets) { | 
970  | 4.23k  |     tablet_ptrs.push_back(tablet.get());  | 
971  | 4.23k  |     tablet->mutable_metadata()->StartMutation();  | 
972  | 4.23k  |     auto& pb = tablet->mutable_metadata()->mutable_dirty()->pb;  | 
973  | 4.26k  |     for (const auto& idx : requested_index_ids_) { | 
974  | 4.26k  |       pb.mutable_backfilled_until()->erase(idx);  | 
975  | 4.26k  |     }  | 
976  | 4.23k  |   }  | 
977  | 875  |   RETURN_NOT_OK_PREPEND(  | 
978  | 875  |       master()->catalog_manager()->sys_catalog()->Upsert(leader_term(), tablet_ptrs),  | 
979  | 875  |       "Could not persist that the table is done backfilling.");  | 
980  | 4.23k  |   for (scoped_refptr<TabletInfo>& tablet : tablets) { | 
981  | 4.23k  |     VLOG(2) << "Done backfilling the table. " << yb::ToString(tablet)  | 
982  | 1  |             << " clearing backfilled_until";  | 
983  | 4.23k  |     tablet->mutable_metadata()->CommitMutation();  | 
984  | 4.23k  |   }  | 
985  |  |  | 
986  | 875  |   if (FLAGS_TEST_slowdown_backfill_job_deletion_ms > 0) { | 
987  | 20  |     SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_job_deletion_ms));  | 
988  | 20  |   }  | 
989  |  |  | 
990  | 875  |   { | 
991  | 875  |     auto l = indexed_table_->LockForWrite();  | 
992  | 875  |     DCHECK_LE(l.data().pb.backfill_jobs_size(), 1) << "For now we only expect to have up to 1 "  | 
993  | 0  |                                                        "outstanding backfill job.";  | 
994  | 875  |     l.mutable_data()->pb.clear_backfill_jobs();  | 
995  | 875  |     RETURN_NOT_OK_PREPEND(master_->catalog_manager_impl()->sys_catalog_->Upsert(  | 
996  | 875  |                               leader_term(), indexed_table_),  | 
997  | 875  |                           "Could not clear backfilling timestamp.");  | 
998  | 875  |     l.Commit();  | 
999  | 875  |   }  | 
1000  | 11  |   VLOG_WITH_PREFIX(2) << "Cleared backfilling timestamp.";  | 
1001  | 875  |   return Status::OK();  | 
1002  | 875  | }  | 
1003  |  |  | 
1004  |  | Status BackfillTable::AllowCompactionsToGCDeleteMarkers(  | 
1005  | 879  |     const TableId &index_table_id) { | 
1006  | 879  |   DVLOG(3) << __PRETTY_FUNCTION__0 ;  | 
1007  | 879  |   auto res = master_->catalog_manager()->FindTableById(index_table_id);  | 
1008  | 879  |   if (!res && res.status().IsNotFound()0 ) {  | 
1009  | 0  |     LOG(ERROR) << "Index " << index_table_id << " was not found."  | 
1010  | 0  |                << " This is ok in case somebody issued a delete index. : " << res.ToString();  | 
1011  | 0  |     return Status::OK();  | 
1012  | 0  |   }  | 
1013  | 879  |   scoped_refptr<TableInfo> index_table_info = VERIFY_RESULT_PREPEND(std::move(res),  | 
1014  | 879  |       Format("Could not find the index table $0", index_table_id)); | 
1015  |  |  | 
1016  |  |   // Add a sleep here to wait until the Table is fully created.  | 
1017  | 0  |   bool is_ready = false;  | 
1018  | 879  |   bool first_run = true;  | 
1019  | 879  |   do { | 
1020  | 879  |     if (!first_run) { | 
1021  | 0  |       YB_LOG_EVERY_N_SECS(INFO, 1) << "Waiting for the previous alter table to "  | 
1022  | 0  |                                       "complete on the index table "  | 
1023  | 0  |                                    << index_table_id;  | 
1024  | 0  |       SleepFor(  | 
1025  | 0  |           MonoDelta::FromMilliseconds(FLAGS_index_backfill_wait_for_alter_table_completion_ms));  | 
1026  | 0  |     }  | 
1027  | 879  |     first_run = false;  | 
1028  | 879  |     { | 
1029  | 879  |       VLOG(2) << __func__ << ": Trying to lock index table for Read"0 ;  | 
1030  | 879  |       auto l = index_table_info->LockForRead();  | 
1031  | 879  |       auto state = l->pb.state();  | 
1032  | 879  |       if (state != SysTablesEntryPB::RUNNING && state != SysTablesEntryPB::ALTERING2 ) {  | 
1033  | 2  |         LOG(ERROR) << "Index " << index_table_id << " is in state "  | 
1034  | 2  |                    << SysTablesEntryPB_State_Name(state) << " : cannot enable compactions on it";  | 
1035  |  |         // Treating it as success so that we can proceed with updating other indexes.  | 
1036  | 2  |         return Status::OK();  | 
1037  | 2  |       }  | 
1038  | 877  |       is_ready = state == SysTablesEntryPB::RUNNING;  | 
1039  | 877  |     }  | 
1040  | 0  |     VLOG(2) << __func__ << ": Unlocked index table for Read";  | 
1041  | 877  |   } while (!is_ready);  | 
1042  | 877  |   { | 
1043  | 877  |     TRACE("Locking index table"); | 
1044  | 877  |     VLOG(2) << __func__ << ": Trying to lock index table for Write"0 ;  | 
1045  | 877  |     auto l = index_table_info->LockForWrite();  | 
1046  | 877  |     VLOG(2) << __func__ << ": locked index table for Write"0 ;  | 
1047  | 877  |     l.mutable_data()->pb.mutable_schema()->mutable_table_properties()  | 
1048  | 877  |         ->set_retain_delete_markers(false);  | 
1049  |  |  | 
1050  |  |     // Update sys-catalog with the new indexed table info.  | 
1051  | 877  |     TRACE("Updating index table metadata on disk"); | 
1052  | 877  |     RETURN_NOT_OK_PREPEND(  | 
1053  | 877  |         master_->catalog_manager_impl()->sys_catalog_->Upsert(  | 
1054  | 877  |             leader_term(), index_table_info),  | 
1055  | 877  |         yb::Format(  | 
1056  | 877  |             "Could not update index_table_info for $0 to enable compactions.",  | 
1057  | 877  |             index_table_id));  | 
1058  |  |  | 
1059  |  |     // Update the in-memory state.  | 
1060  | 877  |     TRACE("Committing in-memory state"); | 
1061  | 877  |     l.Commit();  | 
1062  | 877  |   }  | 
1063  | 0  |   VLOG(2) << __func__ << ": Unlocked index table for Read";  | 
1064  | 877  |   VLOG(1) << "Sending backfill done requests to the Index table"0 ;  | 
1065  | 877  |   RETURN_NOT_OK(SendRpcToAllowCompactionsToGCDeleteMarkers(index_table_info));  | 
1066  | 877  |   VLOG(1) << "DONE Sending backfill done requests to the Index table"0 ;  | 
1067  | 877  |   return Status::OK();  | 
1068  | 877  | }  | 
1069  |  |  | 
1070  |  | Status BackfillTable::SendRpcToAllowCompactionsToGCDeleteMarkers(  | 
1071  | 877  |     const scoped_refptr<TableInfo> &table) { | 
1072  | 877  |   auto tablets = table->GetTablets();  | 
1073  |  |  | 
1074  | 3.80k  |   for (const scoped_refptr<TabletInfo>& tablet : tablets) { | 
1075  | 3.80k  |     RETURN_NOT_OK(SendRpcToAllowCompactionsToGCDeleteMarkers(tablet, table->id()));  | 
1076  | 3.80k  |   }  | 
1077  | 877  |   return Status::OK();  | 
1078  | 877  | }  | 
1079  |  |  | 
1080  |  | Status BackfillTable::SendRpcToAllowCompactionsToGCDeleteMarkers(  | 
1081  | 3.80k  |     const scoped_refptr<TabletInfo> &tablet, const std::string &table_id) { | 
1082  | 3.80k  |   auto call = std::make_shared<AsyncBackfillDone>(master_, callback_pool_, tablet, table_id);  | 
1083  | 3.80k  |   tablet->table()->AddTask(call);  | 
1084  | 3.80k  |   RETURN_NOT_OK_PREPEND(  | 
1085  | 3.80k  |       master_->catalog_manager()->ScheduleTask(call),  | 
1086  | 3.80k  |       "Failed to send backfill done request");  | 
1087  | 3.80k  |   return Status::OK();  | 
1088  | 3.80k  | }  | 
1089  |  |  | 
1090  |  | // -----------------------------------------------------------------------------------------------  | 
1091  |  | // BackfillTablet  | 
1092  |  | // -----------------------------------------------------------------------------------------------  | 
1093  |  | BackfillTablet::BackfillTablet(  | 
1094  |  |     std::shared_ptr<BackfillTable> backfill_table, const scoped_refptr<TabletInfo>& tablet)  | 
1095  | 4.39k  |     : backfill_table_(backfill_table), tablet_(tablet) { | 
1096  | 4.39k  |   const auto& index_ids = backfill_table->indexes_to_build();  | 
1097  | 4.39k  |   { | 
1098  | 4.39k  |     auto l = tablet_->LockForRead();  | 
1099  | 4.39k  |     const auto& pb = tablet_->metadata().state().pb;  | 
1100  | 4.39k  |     Partition::FromPB(pb.partition(), &partition_);  | 
1101  |  |     // calculate backfilled_until_ as the largest key which all (active) indexes have backfilled.  | 
1102  | 4.42k  |     for (const TableId& idx_id : index_ids) { | 
1103  | 4.42k  |       if (pb.backfilled_until().find(idx_id) != pb.backfilled_until().end()) { | 
1104  | 0  |         auto key = pb.backfilled_until().at(idx_id);  | 
1105  | 0  |         if (backfilled_until_.empty() || key.compare(backfilled_until_) < 0) { | 
1106  | 0  |           VLOG(2) << "Updating backfilled_until_ as " << key;  | 
1107  | 0  |           backfilled_until_ = key;  | 
1108  | 0  |           done_.store(backfilled_until_.empty(), std::memory_order_release);  | 
1109  | 0  |         }  | 
1110  | 0  |       }  | 
1111  | 4.42k  |     }  | 
1112  | 4.39k  |   }  | 
1113  | 4.39k  |   if (!backfilled_until_.empty()) { | 
1114  | 0  |     VLOG_WITH_PREFIX(1) << " resuming backfill from "  | 
1115  | 0  |                         << yb::ToString(backfilled_until_);  | 
1116  | 4.39k  |   } else if (done()) { | 
1117  | 0  |     VLOG_WITH_PREFIX(1) << " backfill already done.";  | 
1118  | 4.39k  |   } else { | 
1119  | 4.39k  |     VLOG_WITH_PREFIX(1) << " beginning backfill from "  | 
1120  | 1  |                         << "<start-of-the-tablet>";  | 
1121  | 4.39k  |   }  | 
1122  | 4.39k  | }  | 
1123  |  |  | 
1124  | 3  | std::string BackfillTablet::LogPrefix() const { | 
1125  | 3  |   return Format("Backfill Index(es) $0 for tablet $1 ", | 
1126  | 3  |                 yb::ToString(backfill_table_->indexes_to_build()),  | 
1127  | 3  |                 tablet_->id());  | 
1128  | 3  | }  | 
1129  |  |  | 
1130  | 8.73k  | void BackfillTablet::LaunchNextChunkOrDone() { | 
1131  | 8.73k  |   if (done()) { | 
1132  | 4.18k  |     VLOG_WITH_PREFIX(1) << "is done";  | 
1133  | 4.18k  |     backfill_table_->Done(Status::OK(), /* failed_indexes */ {}); | 
1134  | 4.54k  |   } else if (!backfill_table_->done()) { | 
1135  | 4.52k  |     VLOG_WITH_PREFIX(2) << "Launching next chunk from " << backfilled_until_;  | 
1136  | 4.52k  |     auto chunk = std::make_shared<BackfillChunk>(shared_from_this(),  | 
1137  | 4.52k  |                                                  backfilled_until_);  | 
1138  | 4.52k  |     chunk->Launch();  | 
1139  | 4.52k  |   }  | 
1140  | 8.73k  | }  | 
1141  |  |  | 
1142  |  | void BackfillTablet::Done(  | 
1143  |  |     const Status& status, const boost::optional<string>& backfilled_until,  | 
1144  | 4.37k  |     const uint64_t number_rows_processed, const std::unordered_set<TableId>& failed_indexes) { | 
1145  | 4.37k  |   if (!status.ok()) { | 
1146  | 41  |     LOG(INFO) << "Failed to backfill the tablet " << yb::ToString(tablet_) << ": " << status  | 
1147  | 41  |               << "\nFailed_indexes are " << yb::ToString(failed_indexes);  | 
1148  | 41  |     backfill_table_->Done(status, failed_indexes);  | 
1149  | 41  |   }  | 
1150  |  |  | 
1151  | 4.37k  |   if (backfilled_until) { | 
1152  | 4.34k  |     auto s = UpdateBackfilledUntil(*backfilled_until, number_rows_processed);  | 
1153  | 4.34k  |     if (!s.ok()) { | 
1154  | 24  |       LOG(WARNING) << "Could not persist how far the tablet is done backfilling. " << s.ToString();  | 
1155  | 24  |       return;  | 
1156  | 24  |     }  | 
1157  | 4.34k  |   }  | 
1158  |  |  | 
1159  | 4.34k  |   LaunchNextChunkOrDone();  | 
1160  | 4.34k  | }  | 
1161  |  |  | 
1162  |  | Status BackfillTablet::UpdateBackfilledUntil(  | 
1163  | 4.34k  |     const string& backfilled_until, const uint64_t number_rows_processed) { | 
1164  | 4.34k  |   backfilled_until_ = backfilled_until;  | 
1165  | 4.34k  |   VLOG_WITH_PREFIX(2) << "Done backfilling the tablet " << yb::ToString(tablet_) << " until "  | 
1166  | 1  |                       << yb::ToString(backfilled_until_);  | 
1167  | 4.34k  |   { | 
1168  | 4.34k  |     auto l = tablet_->LockForWrite();  | 
1169  | 4.34k  |     for (const auto& idx_id : backfill_table_->indexes_to_build()) { | 
1170  | 4.34k  |       l.mutable_data()->pb.mutable_backfilled_until()->insert({idx_id, backfilled_until_}); | 
1171  | 4.34k  |     }  | 
1172  | 4.34k  |     RETURN_NOT_OK(backfill_table_->master()->catalog_manager()->sys_catalog()->Upsert(  | 
1173  | 4.34k  |         backfill_table_->leader_term(), tablet_));  | 
1174  | 4.32k  |     l.Commit();  | 
1175  | 4.32k  |   }  | 
1176  |  |  | 
1177  |  |   // This is the last chunk.  | 
1178  | 4.32k  |   if (backfilled_until_.empty()) { | 
1179  | 4.18k  |     LOG(INFO) << "Done backfilling the tablet " << yb::ToString(tablet_);  | 
1180  | 4.18k  |     done_.store(true, std::memory_order_release);  | 
1181  | 4.18k  |   }  | 
1182  | 4.32k  |   return backfill_table_->UpdateRowsProcessedForIndexTable(number_rows_processed);  | 
1183  | 4.34k  | }  | 
1184  |  |  | 
1185  |  | // -----------------------------------------------------------------------------------------------  | 
1186  |  | // GetSafeTimeForTablet  | 
1187  |  | // -----------------------------------------------------------------------------------------------  | 
1188  |  |  | 
1189  | 4.40k  | void GetSafeTimeForTablet::Launch() { | 
1190  | 4.40k  |   tablet_->table()->AddTask(shared_from_this());  | 
1191  | 4.40k  |   Status status = Run();  | 
1192  |  |   // Need to print this after Run() because that's where it picks the TS which description()  | 
1193  |  |   // needs.  | 
1194  | 4.40k  |   if (status.ok()) { | 
1195  | 4.40k  |     VLOG(3) << "Started GetSafeTimeForTablet : " << this->description();  | 
1196  | 4.40k  |   } else { | 
1197  | 0  |     LOG(WARNING) << Substitute("Failed to send GetSafeTime request for $0. ", | 
1198  | 0  |                                tablet_->ToString())  | 
1199  | 0  |                  << status;  | 
1200  | 0  |   }  | 
1201  | 4.40k  | }  | 
1202  |  |  | 
1203  | 4.40k  | bool GetSafeTimeForTablet::SendRequest(int attempt) { | 
1204  | 4.40k  |   VLOG(1) << __PRETTY_FUNCTION__;  | 
1205  | 4.40k  |   tserver::GetSafeTimeRequestPB req;  | 
1206  | 4.40k  |   req.set_dest_uuid(permanent_uuid());  | 
1207  | 4.40k  |   req.set_tablet_id(tablet_->tablet_id());  | 
1208  | 4.40k  |   auto now = backfill_table_->master()->clock()->Now().ToUint64();  | 
1209  | 4.40k  |   req.set_min_hybrid_time_for_backfill(min_cutoff_.ToUint64());  | 
1210  | 4.40k  |   req.set_propagated_hybrid_time(now);  | 
1211  |  |  | 
1212  | 4.40k  |   ts_admin_proxy_->GetSafeTimeAsync(req, &resp_, &rpc_, BindRpcCallback());  | 
1213  | 4.40k  |   VLOG(1) << "Send " << description() << " to " << permanent_uuid()  | 
1214  | 1  |           << " (attempt " << attempt << "):\n"  | 
1215  | 1  |           << req.DebugString();  | 
1216  | 4.40k  |   return true;  | 
1217  | 4.40k  | }  | 
1218  |  |  | 
1219  | 4.39k  | void GetSafeTimeForTablet::HandleResponse(int attempt) { | 
1220  | 4.39k  |   VLOG(1) << __PRETTY_FUNCTION__;  | 
1221  | 4.39k  |   Status status = Status::OK();  | 
1222  | 4.39k  |   if (resp_.has_error()) { | 
1223  | 3  |     status = StatusFromPB(resp_.error().status());  | 
1224  |  |  | 
1225  |  |     // Do not retry on a fatal error  | 
1226  | 3  |     switch (resp_.error().code()) { | 
1227  | 0  |       case TabletServerErrorPB::TABLET_NOT_FOUND:  | 
1228  | 0  |       case TabletServerErrorPB::MISMATCHED_SCHEMA:  | 
1229  | 0  |       case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:  | 
1230  | 0  |       case TabletServerErrorPB::OPERATION_NOT_SUPPORTED:  | 
1231  | 0  |         LOG(WARNING) << "TS " << permanent_uuid() << ": GetSafeTime failed for tablet "  | 
1232  | 0  |                      << tablet_->ToString() << " no further retry: " << status;  | 
1233  | 0  |         TransitionToFailedState(MonitoredTaskState::kRunning, status);  | 
1234  | 0  |         break;  | 
1235  | 3  |       default:  | 
1236  | 3  |         LOG(WARNING) << "TS " << permanent_uuid() << ": GetSafeTime failed for tablet "  | 
1237  | 3  |                      << tablet_->ToString() << ": " << status << " code " << resp_.error().code();  | 
1238  | 3  |         break;  | 
1239  | 3  |     }  | 
1240  | 4.39k  |   } else { | 
1241  | 4.39k  |     TransitionToCompleteState();  | 
1242  | 4.39k  |     VLOG(1) << "TS " << permanent_uuid() << ": GetSafeTime complete on tablet "  | 
1243  | 5  |             << tablet_->ToString();  | 
1244  | 4.39k  |   }  | 
1245  |  |  | 
1246  | 4.39k  |   server::UpdateClock(resp_, master_->clock());  | 
1247  | 4.39k  | }  | 
1248  |  |  | 
1249  | 4.40k  | void GetSafeTimeForTablet::UnregisterAsyncTaskCallback() { | 
1250  | 4.40k  |   Status status;  | 
1251  | 4.40k  |   HybridTime safe_time;  | 
1252  | 4.40k  |   if (resp_.has_error()) { | 
1253  | 0  |     status = StatusFromPB(resp_.error().status());  | 
1254  | 0  |     VLOG(3) << "GetSafeTime for " << tablet_->ToString() << " got an error. Returning "  | 
1255  | 0  |             << safe_time;  | 
1256  | 4.40k  |   } else if (state() != MonitoredTaskState::kComplete) { | 
1257  | 1  |     status = STATUS_FORMAT(InternalError, "$0 in state $1", description(), state());  | 
1258  | 4.39k  |   } else { | 
1259  | 4.39k  |     safe_time = HybridTime(resp_.safe_time());  | 
1260  | 4.39k  |     if (safe_time.is_special()) { | 
1261  | 0  |       LOG(ERROR) << "GetSafeTime for " << tablet_->ToString() << " got " << safe_time;  | 
1262  | 4.39k  |     } else { | 
1263  | 4.39k  |       VLOG(3) << "GetSafeTime for " << tablet_->ToString() << " got " << safe_time;  | 
1264  | 4.39k  |     }  | 
1265  | 4.39k  |   }  | 
1266  | 4.40k  |   WARN_NOT_OK(backfill_table_->UpdateSafeTime(status, safe_time),  | 
1267  | 4.40k  |     "Could not UpdateSafeTime");  | 
1268  | 4.40k  | }  | 
1269  |  |  | 
1270  | 4.40k  | TabletServerId GetSafeTimeForTablet::permanent_uuid() { | 
1271  | 4.40k  |   return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";  | 
1272  | 4.40k  | }  | 
1273  |  |  | 
1274  |  | BackfillChunk::BackfillChunk(std::shared_ptr<BackfillTablet> backfill_tablet,  | 
1275  |  |                              const std::string& start_key)  | 
1276  |  |     : RetryingTSRpcTask(backfill_tablet->master(),  | 
1277  |  |                         backfill_tablet->threadpool(),  | 
1278  |  |                         std::unique_ptr<TSPicker>(new PickLeaderReplica(backfill_tablet->tablet())),  | 
1279  |  |                         backfill_tablet->tablet()->table().get()),  | 
1280  |  |       indexes_being_backfilled_(backfill_tablet->indexes_to_build()),  | 
1281  |  |       backfill_tablet_(backfill_tablet),  | 
1282  |  |       start_key_(start_key),  | 
1283  |  |       requested_index_names_(RetrieveIndexNames(backfill_tablet->master()->catalog_manager_impl(),  | 
1284  | 4.52k  |                                                 indexes_being_backfilled_)) { | 
1285  | 4.52k  |   deadline_ = MonoTime::Max(); // Never time out.  | 
1286  | 4.52k  | }  | 
1287  |  |  | 
1288  |  | // -----------------------------------------------------------------------------------------------  | 
1289  |  | // BackfillChunk  | 
1290  |  | // -----------------------------------------------------------------------------------------------  | 
1291  | 4.52k  | void BackfillChunk::Launch() { | 
1292  | 4.52k  |   backfill_tablet_->tablet()->table()->AddTask(shared_from_this());  | 
1293  | 4.52k  |   Status status = Run();  | 
1294  | 4.52k  |   WARN_NOT_OK(  | 
1295  | 4.52k  |       status, Substitute(  | 
1296  | 4.52k  |                   "Failed to send backfill Chunk request for $0",  | 
1297  | 4.52k  |                   backfill_tablet_->tablet().get()->ToString()));  | 
1298  |  |  | 
1299  |  |   // Need to print this after Run() because that's where it picks the TS which description()  | 
1300  |  |   // needs.  | 
1301  | 4.52k  |   if (status.ok()) { | 
1302  | 4.52k  |     LOG(INFO) << "Started BackfillChunk : " << this->description();  | 
1303  | 4.52k  |   }  | 
1304  | 4.52k  | }  | 
1305  |  |  | 
1306  | 4.52k  | MonoTime BackfillChunk::ComputeDeadline() { | 
1307  | 4.52k  |   MonoTime timeout = MonoTime::Now();  | 
1308  | 4.52k  |   if (GetTableType() == TableType::PGSQL_TABLE_TYPE) { | 
1309  | 1.99k  |     timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_ysql_index_backfill_rpc_timeout_ms));  | 
1310  | 2.53k  |   } else { | 
1311  | 2.53k  |     DCHECK(GetTableType() == TableType::YQL_TABLE_TYPE);  | 
1312  | 2.53k  |     timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_index_backfill_rpc_timeout_ms));  | 
1313  | 2.53k  |   }  | 
1314  | 4.52k  |   return MonoTime::Earliest(timeout, deadline_);  | 
1315  | 4.52k  | }  | 
1316  |  |  | 
1317  | 6  | int BackfillChunk::num_max_retries() { | 
1318  | 6  |   return FLAGS_index_backfill_rpc_max_retries;  | 
1319  | 6  | }  | 
1320  |  |  | 
1321  | 0  | int BackfillChunk::max_delay_ms() { | 
1322  | 0  |   return FLAGS_index_backfill_rpc_max_delay_ms;  | 
1323  | 0  | }  | 
1324  |  |  | 
1325  | 13.4k  | std::string BackfillChunk::description() const { | 
1326  | 13.4k  |   return yb::Format("Backfilling indexes $0 for tablet $1 from key '$2'", | 
1327  | 13.4k  |                     requested_index_names_, tablet_id(),  | 
1328  | 13.4k  |                     b2a_hex(start_key_));  | 
1329  | 13.4k  | }  | 
1330  |  |  | 
1331  | 4.52k  | bool BackfillChunk::SendRequest(int attempt) { | 
1332  | 4.52k  |   VLOG(1) << __PRETTY_FUNCTION__;  | 
1333  | 4.52k  |   if (indexes_being_backfilled_.empty()) { | 
1334  | 0  |     TransitionToCompleteState();  | 
1335  | 0  |     return false;  | 
1336  | 0  |   }  | 
1337  |  |  | 
1338  | 4.52k  |   tserver::BackfillIndexRequestPB req;  | 
1339  | 4.52k  |   req.set_dest_uuid(permanent_uuid());  | 
1340  | 4.52k  |   req.set_tablet_id(backfill_tablet_->tablet()->tablet_id());  | 
1341  | 4.52k  |   req.set_read_at_hybrid_time(backfill_tablet_->read_time_for_backfill().ToUint64());  | 
1342  | 4.52k  |   req.set_schema_version(backfill_tablet_->schema_version());  | 
1343  | 4.52k  |   req.set_start_key(start_key_);  | 
1344  | 4.52k  |   req.set_indexed_table_id(backfill_tablet_->indexed_table_id());  | 
1345  | 4.52k  |   if (GetTableType() == TableType::PGSQL_TABLE_TYPE) { | 
1346  | 1.99k  |     req.set_namespace_name(backfill_tablet_->GetNamespaceName());  | 
1347  | 1.99k  |   }  | 
1348  | 4.52k  |   std::unordered_set<TableId> found_idxs;  | 
1349  | 4.56k  |   for (const IndexInfoPB& idx_info : backfill_tablet_->index_infos()) { | 
1350  | 4.56k  |     if (indexes_being_backfilled_.find(idx_info.table_id()) != indexes_being_backfilled_.end()) { | 
1351  | 4.56k  |       req.add_indexes()->CopyFrom(idx_info);  | 
1352  | 4.56k  |       found_idxs.insert(idx_info.table_id());  | 
1353  | 4.56k  |     }  | 
1354  | 4.56k  |   }  | 
1355  | 4.52k  |   if (found_idxs.size() != indexes_being_backfilled_.size()) { | 
1356  |  |     // We could not find the IndexInfoPB for all the requested indexes. This can happen  | 
1357  |  |     // if that index was deleted while the backfill was still going on.  | 
1358  |  |     // We are going to fail fast and mark that index as failed.  | 
1359  | 0  |     for (auto& idx : indexes_being_backfilled_) { | 
1360  | 0  |       if (found_idxs.find(idx) == found_idxs.end()) { | 
1361  | 0  |         VLOG_WITH_PREFIX(3) << "Marking " << idx << " as failed";  | 
1362  | 0  |         *resp_.add_failed_index_ids() = idx;  | 
1363  | 0  |       }  | 
1364  | 0  |     }  | 
1365  | 0  |     const string error_message("Could not find IndexInfoPB for some indexes"); | 
1366  | 0  |     resp_.mutable_error()->mutable_status()->set_code(AppStatusPB::NOT_FOUND);  | 
1367  | 0  |     resp_.mutable_error()->mutable_status()->set_message(error_message);  | 
1368  | 0  |     TransitionToFailedState(MonitoredTaskState::kRunning,  | 
1369  | 0  |                             STATUS(NotFound, error_message));  | 
1370  | 0  |     return false;  | 
1371  | 0  |   }  | 
1372  | 4.52k  |   req.set_propagated_hybrid_time(backfill_tablet_->master()->clock()->Now().ToUint64());  | 
1373  |  |  | 
1374  | 4.52k  |   ts_admin_proxy_->BackfillIndexAsync(req, &resp_, &rpc_, BindRpcCallback());  | 
1375  | 4.52k  |   VLOG(1) << "Send " << description() << " to " << permanent_uuid()  | 
1376  | 1  |           << " (attempt " << attempt << "):\n"  | 
1377  | 1  |           << req.DebugString();  | 
1378  | 4.52k  |   return true;  | 
1379  | 4.52k  | }  | 
1380  |  |  | 
1381  | 4.36k  | void BackfillChunk::HandleResponse(int attempt) { | 
1382  | 4.36k  |   VLOG(1) << __PRETTY_FUNCTION__ << " response is " << yb::ToString(resp_);  | 
1383  | 4.36k  |   Status status;  | 
1384  | 4.36k  |   if (resp_.has_error()) { | 
1385  | 38  |     status = StatusFromPB(resp_.error().status());  | 
1386  |  |  | 
1387  |  |     // Do not retry on a fatal error  | 
1388  | 38  |     switch (resp_.error().code()) { | 
1389  | 9  |       case TabletServerErrorPB::MISMATCHED_SCHEMA:  | 
1390  | 38  |       case TabletServerErrorPB::OPERATION_NOT_SUPPORTED:  | 
1391  | 38  |       case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:  | 
1392  | 38  |       case TabletServerErrorPB::TABLET_NOT_FOUND:  | 
1393  | 38  |         LOG(WARNING) << "TS " << permanent_uuid() << ": backfill failed for tablet "  | 
1394  | 38  |                      << backfill_tablet_->tablet()->ToString() << " no further retry: " << status  | 
1395  | 38  |                      << " response was " << yb::ToString(resp_);  | 
1396  | 38  |         TransitionToFailedState(MonitoredTaskState::kRunning, status);  | 
1397  | 38  |         break;  | 
1398  | 0  |       default:  | 
1399  | 0  |         LOG(WARNING) << "TS " << permanent_uuid() << ": backfill failed for tablet "  | 
1400  | 0  |                      << backfill_tablet_->tablet()->ToString() << ": " << status.ToString()  | 
1401  | 0  |                      << " code " << resp_.error().code();  | 
1402  | 0  |         break;  | 
1403  | 38  |     }  | 
1404  | 4.33k  |   } else { | 
1405  | 4.33k  |     TransitionToCompleteState();  | 
1406  | 4.33k  |     VLOG(1) << "TS " << permanent_uuid() << ": backfill complete on tablet "  | 
1407  | 5  |             << backfill_tablet_->tablet()->ToString();  | 
1408  | 4.33k  |   }  | 
1409  |  |  | 
1410  | 4.36k  |   server::UpdateClock(resp_, master_->clock());  | 
1411  | 4.36k  | }  | 
1412  |  |  | 
1413  | 4.45k  | void BackfillChunk::UnregisterAsyncTaskCallback() { | 
1414  | 4.45k  |   if (state() == MonitoredTaskState::kAborted) { | 
1415  | 84  |     VLOG(1) << " was aborted";  | 
1416  | 84  |     return;  | 
1417  | 84  |   }  | 
1418  |  |  | 
1419  | 4.37k  |   Status status;  | 
1420  | 4.37k  |   std::unordered_set<TableId> failed_indexes;  | 
1421  | 4.37k  |   if (resp_.has_error()) { | 
1422  | 38  |     status = StatusFromPB(resp_.error().status());  | 
1423  | 38  |     if (resp_.failed_index_ids_size() > 0) { | 
1424  | 58  |       for (int i = 0; i < resp_.failed_index_ids_size(); i++) {  | 
1425  | 29  |         VLOG(1) << " Added to failed index " << resp_.failed_index_ids(i);  | 
1426  | 29  |         failed_indexes.insert(resp_.failed_index_ids(i));  | 
1427  | 29  |       }  | 
1428  | 29  |     } else { | 
1429  |  |       // No specific index was marked as a failure. So consider all of them as failed.  | 
1430  | 9  |       failed_indexes = indexes_being_backfilled_;  | 
1431  | 9  |     }  | 
1432  | 4.33k  |   } else if (state() != MonitoredTaskState::kComplete) { | 
1433  |  |     // There is no response, so the error happened even before we could  | 
1434  |  |     // get a response. Mark all indexes as failed.  | 
1435  | 3  |     failed_indexes = indexes_being_backfilled_;  | 
1436  | 3  |     VLOG(3) << "Considering all indexes : "  | 
1437  | 0  |             << yb::ToString(indexes_being_backfilled_)  | 
1438  | 0  |             << " as failed.";  | 
1439  | 3  |     status = STATUS_FORMAT(InternalError, "$0 in state $1", description(), state());  | 
1440  | 3  |   }  | 
1441  |  |  | 
1442  | 4.37k  |   if (resp_.has_backfilled_until()) { | 
1443  | 4.35k  |     backfill_tablet_->Done(  | 
1444  | 4.35k  |         status, resp_.backfilled_until(), resp_.number_rows_processed(), failed_indexes);  | 
1445  | 4.35k  |   } else { | 
1446  | 20  |     backfill_tablet_->Done(status, boost::none, resp_.number_rows_processed(), failed_indexes);  | 
1447  | 20  |   }  | 
1448  | 4.37k  | }  | 
1449  |  |  | 
1450  | 4.56k  | TabletServerId BackfillChunk::permanent_uuid() { | 
1451  | 4.56k  |   return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";  | 
1452  | 4.56k  | }  | 
1453  |  |  | 
1454  |  | }  // namespace master  | 
1455  |  | }  // namespace yb  |