YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/backfill_index.cc
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied.  See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/master/backfill_index.h"

#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>

#include <algorithm>
#include <bitset>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include <boost/optional.hpp>
#include <boost/preprocessor/cat.hpp>
#include <glog/logging.h>

#include "yb/common/partial_row.h"
#include "yb/common/partition.h"
#include "yb/common/wire_protocol.h"

#include "yb/docdb/doc_rowwise_iterator.h"

#include "yb/gutil/atomicops.h"
#include "yb/gutil/callback.h"
#include "yb/gutil/casts.h"
#include "yb/gutil/map-util.h"
#include "yb/gutil/mathlimits.h"
#include "yb/gutil/ref_counted.h"
#include "yb/gutil/stl_util.h"
#include "yb/gutil/strings/escaping.h"
#include "yb/gutil/strings/join.h"
#include "yb/gutil/strings/substitute.h"
#include "yb/gutil/sysinfo.h"

#include "yb/master/master_fwd.h"
#include "yb/master/async_rpc_tasks.h"
#include "yb/master/catalog_manager.h"
#include "yb/master/master.h"
#include "yb/master/master_ddl.pb.h"
#include "yb/master/sys_catalog.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_metadata.h"
#include "yb/tablet/tablet_peer.h"

#include "yb/tserver/tserver_admin.proxy.h"

#include "yb/util/flag_tags.h"
#include "yb/util/format.h"
#include "yb/util/math_util.h"
#include "yb/util/monotime.h"
#include "yb/util/random_util.h"
#include "yb/util/result.h"
#include "yb/util/status_format.h"
#include "yb/util/status_log.h"
#include "yb/util/threadpool.h"
#include "yb/util/trace.h"
#include "yb/util/uuid.h"

DEFINE_int32(ysql_index_backfill_rpc_timeout_ms, 60 * 1000, // 1 min.
             "Timeout used by the master when attempting to backfill a YSQL tablet during index "
             "creation.");
TAG_FLAG(ysql_index_backfill_rpc_timeout_ms, advanced);
TAG_FLAG(ysql_index_backfill_rpc_timeout_ms, runtime);

DEFINE_int32(index_backfill_rpc_timeout_ms, 1 * 30 * 1000, // 30 sec.
             "Timeout used by the master when attempting to backfill a tablet "
             "during index creation.");
TAG_FLAG(index_backfill_rpc_timeout_ms, advanced);
TAG_FLAG(index_backfill_rpc_timeout_ms, runtime);

DEFINE_int32(index_backfill_rpc_max_retries, 150,
             "Number of times to retry backfilling a tablet chunk "
             "during index creation.");
TAG_FLAG(index_backfill_rpc_max_retries, advanced);
TAG_FLAG(index_backfill_rpc_max_retries, runtime);

DEFINE_int32(index_backfill_rpc_max_delay_ms, 10 * 60 * 1000, // 10 min.
             "Maximum delay before retrying a backfill tablet chunk request "
             "during index creation.");
TAG_FLAG(index_backfill_rpc_max_delay_ms, advanced);
TAG_FLAG(index_backfill_rpc_max_delay_ms, runtime);

DEFINE_int32(index_backfill_wait_for_alter_table_completion_ms, 100,
             "Delay before retrying to see if an in-progress alter table has "
             "completed, during index backfill.");
TAG_FLAG(index_backfill_wait_for_alter_table_completion_ms, advanced);
TAG_FLAG(index_backfill_wait_for_alter_table_completion_ms, runtime);

DEFINE_bool(defer_index_backfill, false,
            "Defer index backfill so that backfills can be performed as a batch later on.");
TAG_FLAG(defer_index_backfill, advanced);
TAG_FLAG(defer_index_backfill, runtime);

DEFINE_test_flag(int32, slowdown_backfill_alter_table_rpcs_ms, 0,
    "Slows down the send alter table rpc's so that the master may be stopped between "
    "different phases.");

DEFINE_test_flag(
    int32, slowdown_backfill_job_deletion_ms, 0,
    "Slows down backfill job deletion so that backfill job can be read by test.");

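// The gflags above control timeouts, retries, and deferral behavior for index backfill.
// Flags tagged 'runtime' are intended to be changeable without a master restart, and the
// TEST_* flags only add artificial delays for tests.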
namespace yb {
namespace master {

using namespace std::literals;
using server::MonitoredTaskState;
using strings::Substitute;
using tserver::TabletServerErrorPB;

namespace {

// Peek into the pg_index table to get an index (boolean) status from the YSQL perspective.
Result<bool> GetPgIndexStatus(
    CatalogManager* catalog_manager,
    const TableId& idx_id,
    const std::string& status_col_name) {
  const auto pg_index_id =
      GetPgsqlTableId(VERIFY_RESULT(GetPgsqlDatabaseOid(idx_id)), kPgIndexTableOid);

  const tablet::Tablet* catalog_tablet =
      catalog_manager->tablet_peer()->tablet();
  const Schema& pg_index_schema =
      *VERIFY_RESULT(catalog_tablet->metadata()->GetTableInfo(pg_index_id))->schema;

  Schema projection;
  RETURN_NOT_OK(pg_index_schema.CreateProjectionByNames({"indexrelid", status_col_name},
                                                        &projection,
                                                        pg_index_schema.num_key_columns()));

  const auto indexrelid_col_id = VERIFY_RESULT(projection.ColumnIdByName("indexrelid")).rep();
  const auto status_col_id     = VERIFY_RESULT(projection.ColumnIdByName(status_col_name)).rep();

  const auto idx_oid = VERIFY_RESULT(GetPgsqlTableOid(idx_id));

  auto iter = VERIFY_RESULT(catalog_tablet->NewRowIterator(projection.CopyWithoutColumnIds(),
                                                           {} /* read_hybrid_time */,
                                                           pg_index_id));

  // Filtering by 'indexrelid' == idx_oid.
  {
    auto doc_iter = down_cast<docdb::DocRowwiseIterator*>(iter.get());
    PgsqlConditionPB cond;
    cond.add_operands()->set_column_id(indexrelid_col_id);
    cond.set_op(QL_OP_EQUAL);
    cond.add_operands()->mutable_value()->set_uint32_value(idx_oid);
    const std::vector<docdb::PrimitiveValue> empty_key_components;
    docdb::DocPgsqlScanSpec spec(projection,
                                 rocksdb::kDefaultQueryId,
                                 empty_key_components,
                                 empty_key_components,
                                 &cond,
                                 boost::none /* hash_code */,
                                 boost::none /* max_hash_code */,
                                 nullptr /* where_expr */);
    RETURN_NOT_OK(doc_iter->Init(spec));
  }

  // Expecting one row at most.
  QLTableRow row;
  if (VERIFY_RESULT(iter->HasNext())) {
    RETURN_NOT_OK(iter->NextRow(&row));
    return row.GetColumn(status_col_id)->bool_value();
  }

  // For practical purposes, an absent index is the same as having a false status column value.
  return false;
}

// Before advancing index permissions, we need to make sure the Postgres side has advanced
// sufficiently - that the state tracked in pg_index hasn't fallen more than one step behind
// the desired permission.
Result<bool> ShouldProceedWithPgsqlIndexPermissionUpdate(
    CatalogManager* catalog_manager,
    const TableId& idx_id,
    IndexPermissions new_perm) {
  // TODO(alex, jason): Add the appropriate cases for dropping index path
  switch (new_perm) {
    case INDEX_PERM_WRITE_AND_DELETE: {
      auto live = VERIFY_RESULT(GetPgIndexStatus(catalog_manager, idx_id, "indislive"));
      if (!live) {
        VLOG(1) << "Index " << idx_id << " is not yet live, skipping permission update";
      }
      return live;
    }
    case INDEX_PERM_DO_BACKFILL: {
      auto ready = VERIFY_RESULT(GetPgIndexStatus(catalog_manager, idx_id, "indisready"));
      if (!ready) {
        VLOG(1) << "Index " << idx_id << " is not yet ready, skipping permission update";
      }
      return ready;
    }
    default:
      // No need to wait for anything
      return true;
  }
}

} // namespace

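// Snapshots the current schema, schema version, and index list into the fully_applied_* fields
// before the table moves to a new version, so that the last schema known to be applied everywhere
// remains available while the alter is still propagating.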
void MultiStageAlterTable::CopySchemaDetailsToFullyApplied(SysTablesEntryPB* pb) {
  VLOG(4) << "Setting fully_applied_schema_version to " << pb->version();
  pb->mutable_fully_applied_schema()->CopyFrom(pb->schema());
  pb->set_fully_applied_schema_version(pb->version());
  pb->mutable_fully_applied_indexes()->CopyFrom(pb->indexes());
  if (pb->has_index_info()) {
    pb->mutable_fully_applied_index_info()->CopyFrom(pb->index_info());
  }
}

Status MultiStageAlterTable::ClearFullyAppliedAndUpdateState(
    CatalogManager* catalog_manager,
    const scoped_refptr<TableInfo>& table,
    boost::optional<uint32_t> expected_version,
    bool update_state_to_running) {
  auto l = table->LockForWrite();
  uint32_t current_version = l->pb.version();
  if (expected_version && *expected_version != current_version) {
    return STATUS(AlreadyPresent, "Table has already moved to a different version.");
  }
  l.mutable_data()->pb.clear_fully_applied_schema();
  l.mutable_data()->pb.clear_fully_applied_schema_version();
  l.mutable_data()->pb.clear_fully_applied_indexes();
  l.mutable_data()->pb.clear_fully_applied_index_info();
  auto new_state = update_state_to_running ? SysTablesEntryPB::RUNNING : SysTablesEntryPB::ALTERING;
  l.mutable_data()->set_state(new_state, Format("Current schema version=$0", current_version));

  Status s = catalog_manager->sys_catalog_->Upsert(catalog_manager->leader_ready_term(), table);
  if (!s.ok()) {
    LOG(WARNING) << "An error occurred while updating sys-tables: " << s.ToString()
                 << ". This master may not be the leader anymore.";
    return s;
  }

  l.Commit();
  LOG(INFO) << table->ToString() << " - Alter table completed version=" << current_version
            << ", state: " << SysTablesEntryPB::State_Name(new_state);
  return Status::OK();
}

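// Applies the permission changes in perm_mapping to the matching IndexInfoPB entries of the
// indexed table while holding its write lock, bumps the schema version, persists the change to
// the sys catalog, and returns whether anything was actually updated.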
Result<bool> MultiStageAlterTable::UpdateIndexPermission(
    CatalogManager* catalog_manager,
    const scoped_refptr<TableInfo>& indexed_table,
    const std::unordered_map<TableId, IndexPermissions>& perm_mapping,
    boost::optional<uint32_t> current_version) {
  TRACE(__func__);
  DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table);
  if (FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms > 0) {
    TRACE("Sleeping for $0 ms", FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms);
    DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table) << " sleeping for "
             << FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms
             << "ms BEFORE updating the index permission to " << ToString(perm_mapping);
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms));
    DVLOG(3) << __PRETTY_FUNCTION__ << " Done Sleeping";
    TRACE("Done Sleeping");
  }

  bool permissions_updated = false;
  {
    TRACE("Locking indexed table");
    auto l = indexed_table->LockForWrite();
    auto& indexed_table_data = *l.mutable_data();
    auto& indexed_table_pb = indexed_table_data.pb;
    if (current_version && *current_version != indexed_table_pb.version()) {
      LOG(INFO) << "The table schema version "
                << "seems to have already been updated to " << indexed_table_pb.version()
                << " We wanted to do this update at " << *current_version;
      return STATUS_SUBSTITUTE(
          AlreadyPresent, "Schema was already updated to $0 before we got to it (expected $1).",
          indexed_table_pb.version(), *current_version);
    }

    CopySchemaDetailsToFullyApplied(&indexed_table_pb);
    bool is_pgsql = indexed_table_pb.table_type() == TableType::PGSQL_TABLE_TYPE;
    for (int i = 0; i < indexed_table_pb.indexes_size(); i++) {
      IndexInfoPB* idx_pb = indexed_table_pb.mutable_indexes(i);
      auto& idx_table_id = idx_pb->table_id();
      if (perm_mapping.find(idx_table_id) != perm_mapping.end()) {
        const auto new_perm = perm_mapping.at(idx_table_id);
        if (idx_pb->index_permissions() >= new_perm) {
          LOG(WARNING) << "Index " << idx_pb->table_id() << " on table "
                       << indexed_table->ToString() << " has index_permission "
                       << IndexPermissions_Name(idx_pb->index_permissions()) << " already past "
                       << IndexPermissions_Name(new_perm) << ". Will not update it";
          continue;
        }
        // TODO(alex, amit): Non-OK status here should be converted to TryAgain,
        //                   which should be handled on an upper level.
        if (is_pgsql && !VERIFY_RESULT(ShouldProceedWithPgsqlIndexPermissionUpdate(catalog_manager,
                                                                                   idx_table_id,
                                                                                   new_perm))) {
          continue;
        }
        idx_pb->set_index_permissions(new_perm);
        permissions_updated = true;
      }
    }

    if (permissions_updated) {
      indexed_table_pb.set_version(indexed_table_pb.version() + 1);
      indexed_table_pb.set_updates_only_index_permissions(true);
    } else {
      VLOG(1) << "Index permissions update skipped, leaving schema_version at "
              << indexed_table_pb.version();
    }
    indexed_table_data.set_state(
        SysTablesEntryPB::ALTERING,
        Format("Update index permission version=$0 ts=$1",
               indexed_table_pb.version(), LocalTimeAsString()));

    // Update sys-catalog with the new indexed table info.
    TRACE("Updating indexed table metadata on disk");
    RETURN_NOT_OK(catalog_manager->sys_catalog_->Upsert(
        catalog_manager->leader_ready_term(), indexed_table));

    // Update the in-memory state.
    TRACE("Committing in-memory state");
    l.Commit();
  }
  if (PREDICT_FALSE(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms > 0)) {
    TRACE("Sleeping for $0 ms",
          FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms);
    DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table) << " sleeping for "
             << FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms
             << "ms AFTER updating the index permission to " << ToString(perm_mapping);
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_alter_table_rpcs_ms));
    DVLOG(3) << __PRETTY_FUNCTION__ << " Done Sleeping";
    TRACE("Done Sleeping");
  }
  return permissions_updated;
}

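// Transitions the indexed table into the backfilling phase: clears fully_applied state (keeping
// the table in ALTERING so a master failover can resume), then creates and launches a
// BackfillTable job for the given indexes.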
Status MultiStageAlterTable::StartBackfillingData(
    CatalogManager* catalog_manager,
    const scoped_refptr<TableInfo>& indexed_table,
    const std::vector<IndexInfoPB>& idx_infos,
    boost::optional<uint32_t> current_version) {
  // We leave the table state as ALTERING so that a master failover can resume the backfill.
  RETURN_NOT_OK(ClearFullyAppliedAndUpdateState(
      catalog_manager, indexed_table, current_version, /* change_state to RUNNING */ false));

  RETURN_NOT_OK(indexed_table->SetIsBackfilling());

  TRACE("Starting backfill process");
  VLOG(0) << __func__ << " starting backfill on " << indexed_table->ToString() << " for "
          << yb::ToString(idx_infos);

  auto ns_info = catalog_manager->FindNamespaceById(indexed_table->namespace_id());
  RETURN_NOT_OK_PREPEND(ns_info, "Unable to get namespace info for backfill");

  auto backfill_table = std::make_shared<BackfillTable>(
      catalog_manager->master_, catalog_manager->AsyncTaskPool(), indexed_table, idx_infos,
      *ns_info);
  backfill_table->Launch();
  return Status::OK();
}

// Returns true if the given IndexPermissions value is a transient state.
// Returns false if it is a state the index can remain in, viz. READ_WRITE_AND_DELETE.
// INDEX_UNUSED is considered transient because the index still needs to be deleted.
bool IsTransientState(IndexPermissions perm) {
  return perm != INDEX_PERM_READ_WRITE_AND_DELETE && perm != INDEX_PERM_NOT_USED;
}

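// Index creation walks DELETE_ONLY -> WRITE_AND_DELETE -> DO_BACKFILL (and then
// READ_WRITE_AND_DELETE once backfill succeeds); index removal walks
// WRITE_AND_DELETE_WHILE_REMOVING -> DELETE_ONLY_WHILE_REMOVING -> INDEX_UNUSED.
// NextPermission returns the next step on those paths and CHECK-fails on terminal states.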
IndexPermissions NextPermission(IndexPermissions perm) {
  switch (perm) {
    case INDEX_PERM_DELETE_ONLY:
      return INDEX_PERM_WRITE_AND_DELETE;
    case INDEX_PERM_WRITE_AND_DELETE:
      return INDEX_PERM_DO_BACKFILL;
    case INDEX_PERM_DO_BACKFILL:
      CHECK(false) << "Not expected to be here.";
      return INDEX_PERM_DELETE_ONLY;
    case INDEX_PERM_READ_WRITE_AND_DELETE:
      CHECK(false) << "Not expected to be here.";
      return INDEX_PERM_DELETE_ONLY;
    case INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING:
      return INDEX_PERM_DELETE_ONLY_WHILE_REMOVING;
    case INDEX_PERM_DELETE_ONLY_WHILE_REMOVING:
      return INDEX_PERM_INDEX_UNUSED;
    case INDEX_PERM_INDEX_UNUSED:
    case INDEX_PERM_NOT_USED:
      CHECK(false) << "Not expected to be here.";
      return INDEX_PERM_DELETE_ONLY;
  }
  CHECK(false) << "Not expected to be here.";
  return INDEX_PERM_DELETE_ONLY;
}

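// Examines the index permissions on the indexed table at current_version and decides the next
// action: advance permissions by one step (YCQL only), start or resume a backfill job, delete an
// INDEX_UNUSED index, or simply clear fully_applied state when nothing is pending.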
Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary(
    CatalogManager* catalog_manager, const scoped_refptr<TableInfo>& indexed_table,
    uint32_t current_version, bool respect_backfill_deferrals) {
  DVLOG(3) << __PRETTY_FUNCTION__ << " " << yb::ToString(*indexed_table);

  const bool is_ysql_table = (indexed_table->GetTableType() == TableType::PGSQL_TABLE_TYPE);
  const bool defer_backfill = !is_ysql_table && GetAtomicFlag(&FLAGS_defer_index_backfill);
  const bool is_backfilling = indexed_table->IsBackfilling();

  std::unordered_map<TableId, IndexPermissions> indexes_to_update;
  vector<IndexInfoPB> indexes_to_backfill;
  vector<IndexInfoPB> deferred_indexes;
  vector<IndexInfoPB> indexes_to_delete;
  {
    TRACE("Locking indexed table");
    VLOG(1) << ("Locking indexed table");
    auto l = indexed_table->LockForRead();
    VLOG(1) << ("Locked indexed table");
    if (current_version != l->pb.version()) {
      LOG(WARNING) << "Somebody launched the next version before we got to it.";
      return Status::OK();
    }

    // Attempt to find an index that requires us to just launch the next state (i.e. not backfill)
    for (int i = 0; i < l->pb.indexes_size(); i++) {
      const IndexInfoPB& idx_pb = l->pb.indexes(i);
      if (!idx_pb.has_index_permissions()) {
        continue;
      }
      if (idx_pb.index_permissions() == INDEX_PERM_DO_BACKFILL) {
        if (respect_backfill_deferrals && (defer_backfill || idx_pb.is_backfill_deferred())) {
          LOG(INFO) << "Deferring index-backfill for " << idx_pb.table_id();
          deferred_indexes.emplace_back(idx_pb);
        } else {
          indexes_to_backfill.emplace_back(idx_pb);
        }
      } else if (idx_pb.index_permissions() == INDEX_PERM_INDEX_UNUSED) {
        indexes_to_delete.emplace_back(idx_pb);
      // For YSQL, there should never be indexes to update from master side because postgres drives
      // permission changes.
      } else if (idx_pb.index_permissions() != INDEX_PERM_READ_WRITE_AND_DELETE && !is_ysql_table) {
        indexes_to_update.emplace(idx_pb.table_id(), NextPermission(idx_pb.index_permissions()));
      }
    }

    // TODO(#6218): Do we really not want to continue backfill
    // across master failovers for YSQL?
    if (!is_ysql_table && !is_backfilling && l.data().pb.backfill_jobs_size() > 0) {
      // If a backfill job was started for a set of indexes and then the leader
      // fails over, we should be careful that we are restarting the backfill job
      // with the same set of indexes.
      // A new index could have been added since the time the last backfill job started on
      // the old master. The safe time calculated for the earlier set of indexes may not be
      // valid for the new index(es) to use.
      DCHECK(l.data().pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 "
                                                        "outstanding backfill job.";
      const BackfillJobPB& backfill_job = l.data().pb.backfill_jobs(0);
      VLOG(3) << "Found an in-progress backfill-job " << yb::ToString(backfill_job);
      // Do not allow for any other indexes to piggy back with this backfill.
      indexes_to_backfill.clear();
      deferred_indexes.clear();
      for (int i = 0; i < backfill_job.indexes_size(); i++) {
        const IndexInfoPB& idx_pb = backfill_job.indexes(i);
        indexes_to_backfill.push_back(idx_pb);
      }
    }
  }

  if (indexes_to_update.empty() &&
      indexes_to_delete.empty() &&
      (is_backfilling || indexes_to_backfill.empty())) {
    TRACE("Not necessary to launch next version");
    VLOG(1) << "Not necessary to launch next version";
    return ClearFullyAppliedAndUpdateState(
        catalog_manager, indexed_table, current_version, /* change state to RUNNING */ true);
  }

  // For YSQL online schema migration of indexes, instead of master driving the schema changes,
  // postgres will drive it.  Postgres will use three of the DocDB index permissions:
  //
  // - INDEX_PERM_WRITE_AND_DELETE (set from the start)
  // - INDEX_PERM_READ_WRITE_AND_DELETE (set by master)
  // - INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING (set by master)
  //
  // This changes how we treat indexes_to_foo:
  //
  // - indexes_to_update should always be empty because we never want master to set index
  //   permissions.
  // - indexes_to_delete is impossible to be nonempty, and, in the future, when we do use
  //   INDEX_PERM_INDEX_UNUSED, we want to use some other delete trigger that makes sure no
  //   transactions are left using the index.  Prepare for that by doing nothing when nonempty.
  // - indexes_to_backfill is impossible to be nonempty, but, in the future, we want to set
  //   INDEX_PERM_DO_BACKFILL so that backfill resumes on master leader changes.  Prepare for that
  //   by handling indexes_to_backfill like for YCQL.
  //
  // TODO(jason): when using INDEX_PERM_DO_BACKFILL, update this comment (issue #6218).

  if (!indexes_to_update.empty()) {
    VLOG(1) << "Updating index permissions for " << yb::ToString(indexes_to_update) << " on "
            << indexed_table->ToString();
    Result<bool> permissions_updated =
        VERIFY_RESULT(UpdateIndexPermission(catalog_manager, indexed_table, indexes_to_update,
                                            current_version));

    if (!permissions_updated.ok()) {
      LOG(WARNING) << "Could not update index permissions."
                   << " Possible that the master-leader has changed, or a race "
                   << "with another thread trying to launch next version: "
                   << permissions_updated.ToString();
    }

    if (permissions_updated.ok() && *permissions_updated) {
      VLOG(1) << "Sending alter table request with updated permissions";
      RETURN_NOT_OK(catalog_manager->SendAlterTableRequest(indexed_table));
      return Status::OK();
    }
  }

  if (!indexes_to_delete.empty()) {
    const auto& index_info_to_update = indexes_to_delete[0];
    VLOG(3) << "Deleting the index and the entry in the indexed table for "
            << yb::ToString(index_info_to_update);
    DeleteTableRequestPB req;
    DeleteTableResponsePB resp;
    req.mutable_table()->set_table_id(index_info_to_update.table_id());
    req.set_is_index_table(true);
    RETURN_NOT_OK(catalog_manager->DeleteTableInternal(&req, &resp, nullptr));
    return Status::OK();
  }

  if (!indexes_to_backfill.empty()) {
    VLOG(3) << "Backfilling " << yb::ToString(indexes_to_backfill)
            << (deferred_indexes.empty()
                 ? ""
                 : yb::Format(" along with deferred indexes $0",
                              yb::ToString(deferred_indexes)));
    for (auto& deferred_idx : deferred_indexes) {
      indexes_to_backfill.emplace_back(deferred_idx);
    }
    WARN_NOT_OK(
        StartBackfillingData(
            catalog_manager, indexed_table.get(), indexes_to_backfill, current_version),
        yb::Format("Could not launch backfill for $0", indexed_table->ToString()));
  }

  return Status::OK();
}

// -----------------------------------------------------------------------------------------------
// BackfillTableJob
// -----------------------------------------------------------------------------------------------
std::string BackfillTableJob::description() const {
  const std::shared_ptr<BackfillTable> retain_bt = backfill_table_;
  auto curr_state = state();
  if (!IsStateTerminal(curr_state) && retain_bt) {
    return retain_bt->description();
  } else if (curr_state == MonitoredTaskState::kFailed) {
    return Format("Backfilling $0 Failed", requested_index_names_);
  } else if (curr_state == MonitoredTaskState::kAborted) {
    return Format("Backfilling $0 Aborted", requested_index_names_);
  } else {
    DCHECK(curr_state == MonitoredTaskState::kComplete);
    return Format("Backfilling $0 Done", requested_index_names_);
  }
}

MonitoredTaskState BackfillTableJob::AbortAndReturnPrevState(const Status& status) {
  auto old_state = state();
  while (!IsStateTerminal(old_state)) {
    if (state_.compare_exchange_strong(old_state,
                                       MonitoredTaskState::kAborted)) {
      return old_state;
    }
    old_state = state();
  }
  return old_state;
}

void BackfillTableJob::SetState(MonitoredTaskState new_state) {
  auto old_state = state();
  if (!IsStateTerminal(old_state)) {
    if (state_.compare_exchange_strong(old_state, new_state) && IsStateTerminal(new_state)) {
      MarkDone();
    }
  }
}
// -----------------------------------------------------------------------------------------------
// BackfillTable
// -----------------------------------------------------------------------------------------------

namespace {

std::unordered_set<TableId> IndexIdsFromInfos(const std::vector<IndexInfoPB>& indexes) {
  std::unordered_set<TableId> idx_ids;
  for (const auto& idx_info : indexes) {
    idx_ids.insert(idx_info.table_id());
  }
  return idx_ids;
}

std::string RetrieveIndexNames(CatalogManager* mgr,
                               const std::unordered_set<std::string>& index_ids) {
  std::ostringstream out;
  out << "{ ";
  bool first = true;
  for (const auto& index_id : index_ids) {
    const auto table_info = mgr->GetTableInfo(index_id);
    if (!table_info) {
      LOG(WARNING) << "No table info can be found with index table id " << index_id;
      continue;
    }
    if (!first) {
      out << ", ";
    }
    first = false;

    out << table_info->name();
  }
  out << " }";
  return out.str();
}

}  // namespace

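// BackfillTable coordinates a backfill job for one indexed table: it persists the job in the
// sys catalog, picks a safe read time across all tablets (GetSafeTimeForTablet), fans out one
// BackfillTablet per tablet, and finally flips index permissions when every tablet reports back.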
BackfillTable::BackfillTable(
    Master* master, ThreadPool* callback_pool, const scoped_refptr<TableInfo>& indexed_table,
    std::vector<IndexInfoPB> indexes, const scoped_refptr<NamespaceInfo>& ns_info)
    : master_(master),
      callback_pool_(callback_pool),
      indexed_table_(indexed_table),
      index_infos_(indexes),
      requested_index_ids_(IndexIdsFromInfos(indexes)),
      requested_index_names_(RetrieveIndexNames(
          master->catalog_manager_impl(), requested_index_ids_)),
      ns_info_(ns_info) {
  auto l = indexed_table_->LockForRead();
  schema_version_ = indexed_table_->metadata().state().pb.version();
  leader_term_ = master_->catalog_manager()->leader_ready_term();
  if (l.data().pb.backfill_jobs_size() > 0) {
    number_rows_processed_.store(l.data().pb.backfill_jobs(0).num_rows_processed());
  } else {
    number_rows_processed_.store(0);
  }

  const auto& pb = indexed_table_->metadata().state().pb;
  if (pb.backfill_jobs_size() > 0 && pb.backfill_jobs(0).has_backfilling_timestamp() &&
      read_time_for_backfill_.FromUint64(pb.backfill_jobs(0).backfilling_timestamp()).ok()) {
    DCHECK(pb.backfill_jobs_size() == 1) << "Expect only 1 outstanding backfill job";
    DCHECK(implicit_cast<size_t>(pb.backfill_jobs(0).indexes_size()) == index_infos_.size())
        << "Expect to use the same set of indexes.";
    timestamp_chosen_.store(true, std::memory_order_release);
    VLOG_WITH_PREFIX(1) << "Will be using " << read_time_for_backfill_
                        << " for backfill";
  } else {
    read_time_for_backfill_ = HybridTime::kInvalid;
    timestamp_chosen_.store(false, std::memory_order_release);
  }
  done_.store(false, std::memory_order_release);
}

const std::unordered_set<TableId> BackfillTable::indexes_to_build() const {
  std::unordered_set<TableId> indexes_to_build;
  {
    auto l = indexed_table_->LockForRead();
    const auto& indexed_table_pb = l.data().pb;
    if (indexed_table_pb.backfill_jobs_size() == 0) {
      // Some other task already marked the backfill job as done.
      return {};
    }
    DCHECK(indexed_table_pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 "
                                                          "outstanding backfill job.";
    for (const auto& kv_pair : indexed_table_pb.backfill_jobs(0).backfill_state()) {
      if (kv_pair.second == BackfillJobPB::IN_PROGRESS) {
        indexes_to_build.insert(kv_pair.first);
      }
    }
  }
  return indexes_to_build;
}

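// Registers the monitored job, persists a BackfillJobPB (index list plus per-index IN_PROGRESS
// state) on the indexed table if one is not already recorded, and then either computes the safe
// read time or, on resume, goes straight to launching the per-tablet backfills.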
void BackfillTable::Launch() {
  backfill_job_ = std::make_shared<BackfillTableJob>(shared_from_this());
  backfill_job_->SetState(MonitoredTaskState::kRunning);
  master_->catalog_manager_impl()->jobs_tracker_->AddTask(backfill_job_);

  {
    auto l = indexed_table_->LockForWrite();
    if (l.data().pb.backfill_jobs_size() == 0) {
      auto* backfill_job = l.mutable_data()->pb.add_backfill_jobs();
      for (const auto& idx_info : index_infos_) {
        backfill_job->add_indexes()->CopyFrom(idx_info);
        backfill_job->mutable_backfill_state()->insert(
            {idx_info.table_id(), BackfillJobPB::IN_PROGRESS});
      }
      auto s = master_->catalog_manager_impl()->sys_catalog_->Upsert(
              leader_term(), indexed_table_);
      if (!s.ok()) {
        LOG(WARNING) << "Failed to persist backfill jobs. Abandoning launch. " << s;
        return;
      }
      l.Commit();
    }
  }
  if (!timestamp_chosen_.load(std::memory_order_acquire)) {
    LaunchComputeSafeTimeForRead();
  } else {
    LaunchBackfill();
  }
}

void BackfillTable::LaunchComputeSafeTimeForRead() {
  auto tablets = indexed_table_->GetTablets();

  num_tablets_.store(tablets.size(), std::memory_order_release);
  tablets_pending_.store(tablets.size(), std::memory_order_release);
  auto min_cutoff = master()->clock()->Now();
  for (const scoped_refptr<TabletInfo>& tablet : tablets) {
    auto get_safetime = std::make_shared<GetSafeTimeForTablet>(
        shared_from_this(), tablet, min_cutoff);
    get_safetime->Launch();
  }
}

std::string BackfillTable::LogPrefix() const {
  return Format("Backfill Index Table(s) $0 ", requested_index_names_);
}

std::string BackfillTable::description() const {
  auto num_pending = tablets_pending_.load(std::memory_order_acquire);
  auto num_tablets = num_tablets_.load(std::memory_order_acquire);
  return Format(
      "Backfill Index Table(s) $0 : $1", requested_index_names_,
      (timestamp_chosen()
           ? (done() ? Format("Backfill $0/$1 tablets done", num_pending, num_tablets)
                     : Format(
                           "Backfilling $0/$1 tablets with $2 rows done", num_pending, num_tablets,
                           number_rows_processed_.load()))
           : Format("Waiting to GetSafeTime from $0/$1 tablets", num_pending, num_tablets)));
}

const std::string BackfillTable::GetNamespaceName() const {
  return ns_info_->name();
}

Status BackfillTable::UpdateRowsProcessedForIndexTable(const uint64_t number_rows_processed) {
  auto l = indexed_table_->LockForWrite();

  if (l.data().pb.backfill_jobs_size() == 0) {
    // Some other task already marked the backfill job as done.
    return Status::OK();
  }

  // This is consistent with logic assuming that we have only one backfill job in queue
  // We might in the future change this to a for loop to account for multiple backfill jobs
  number_rows_processed_.fetch_add(number_rows_processed);
  auto* indexed_table_pb = l.mutable_data()->pb.mutable_backfill_jobs(0);
  indexed_table_pb->set_num_rows_processed(number_rows_processed_.load());
  VLOG(2) << "Updated backfill task to having processed " << number_rows_processed
          << " more rows. Total rows processed is: " << number_rows_processed_;

  RETURN_NOT_OK(
      master_->catalog_manager_impl()->sys_catalog_->Upsert(leader_term(), indexed_table_));
  l.Commit();
  return Status::OK();
}

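// Called once per tablet as GetSafeTime responses arrive: folds each tablet's safe time into
// read_time_for_backfill_ (taking the maximum), and when the last response comes in, persists
// the chosen timestamp and launches the actual backfill.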
Status BackfillTable::UpdateSafeTime(const Status& s, HybridTime ht) {
  if (!s.ok()) {
    // Move on to ABORTED permission.
    LOG_WITH_PREFIX(ERROR)
        << "Failed backfill. Could not compute safe time for "
        << yb::ToString(indexed_table_) << " " << s;
    if (!timestamp_chosen_.exchange(true)) {
      RETURN_NOT_OK_PREPEND(
          MarkAllIndexesAsFailed(), "Failed to mark backfill as failed. Abandoning.");
    }
    return Status::OK();
  }

  // Need to guard this.
  HybridTime read_timestamp;
  {
    std::lock_guard<simple_spinlock> l(mutex_);
    VLOG(2) << "Updating read_time_for_backfill_ to max{ "
            << read_time_for_backfill_.ToString() << ", " << ht.ToString()
            << " }.";
    read_time_for_backfill_.MakeAtLeast(ht);
    read_timestamp = read_time_for_backfill_;
  }

  // If OK then move on to doing backfill.
  if (!timestamp_chosen() && --tablets_pending_ == 0) {
    LOG_WITH_PREFIX(INFO) << "Completed fetching SafeTime for the table "
                          << yb::ToString(indexed_table_) << " will be using "
                          << read_timestamp.ToString();
    {
      auto l = indexed_table_->LockForWrite();
      DCHECK_EQ(l.mutable_data()->pb.backfill_jobs_size(), 1);
      auto* backfill_job = l.mutable_data()->pb.mutable_backfill_jobs(0);
      backfill_job->set_backfilling_timestamp(read_timestamp.ToUint64());
      RETURN_NOT_OK_PREPEND(
          master_->catalog_manager_impl()->sys_catalog_->Upsert(
              leader_term(), indexed_table_),
          "Failed to persist backfilling timestamp. Abandoning.");
      l.Commit();
    }
    VLOG_WITH_PREFIX(2) << "Saved " << read_timestamp
                        << " as backfilling_timestamp";
    timestamp_chosen_.store(true, std::memory_order_release);
    LaunchBackfill();
  }
  return Status::OK();
}

void BackfillTable::LaunchBackfill() {
  VLOG_WITH_PREFIX(1) << "launching backfill with timestamp: "
                      << read_time_for_backfill_;
  auto tablets = indexed_table_->GetTablets();

  num_tablets_.store(tablets.size(), std::memory_order_release);
  tablets_pending_.store(tablets.size(), std::memory_order_release);
  for (const scoped_refptr<TabletInfo>& tablet : tablets) {
    auto backfill_tablet = std::make_shared<BackfillTablet>(shared_from_this(), tablet);
    backfill_tablet->Launch();
  }
}

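// Called by each BackfillTablet when it finishes (or fails). Failures mark the affected indexes
// as FAILED; when the last tablet succeeds, all remaining indexes are marked SUCCESS and the
// index permissions are updated accordingly.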
void BackfillTable::Done(const Status& s, const std::unordered_set<TableId>& failed_indexes) {
  if (!s.ok()) {
    LOG_WITH_PREFIX(ERROR) << "failed to backfill the index: " << yb::ToString(failed_indexes)
                           << " due to " << s;
    WARN_NOT_OK(
        MarkIndexesAsFailed(failed_indexes, s.message().ToBuffer()),
        "Couldn't mark indexes as failed");
    CheckIfDone();
    return;
  }

  // If OK then move on to READ permissions.
  if (!done() && --tablets_pending_ == 0) {
    LOG_WITH_PREFIX(INFO) << "Completed backfilling the index table.";
    done_.store(true, std::memory_order_release);
    WARN_NOT_OK(MarkAllIndexesAsSuccess(), "Failed to complete backfill.");
    WARN_NOT_OK(UpdateIndexPermissionsForIndexes(), "Failed to complete backfill.");
  } else {
    VLOG_WITH_PREFIX(1) << "Still backfilling " << tablets_pending_ << " more tablets.";
  }
}

Status BackfillTable::MarkIndexesAsFailed(
    const std::unordered_set<TableId>& failed_indexes, const string& message) {
  return MarkIndexesAsDesired(failed_indexes, BackfillJobPB::FAILED, message);
}

Status BackfillTable::MarkAllIndexesAsFailed() {
  return MarkIndexesAsDesired(indexes_to_build(), BackfillJobPB::FAILED, "failed");
}

Status BackfillTable::MarkAllIndexesAsSuccess() {
  return MarkIndexesAsDesired(indexes_to_build(), BackfillJobPB::SUCCESS, "");
}

Status BackfillTable::MarkIndexesAsDesired(
    const std::unordered_set<TableId>& index_ids_set, BackfillJobPB_State state,
    const string message) {
  VLOG_WITH_PREFIX(3) << "Marking " << yb::ToString(index_ids_set)
                      << " as " << BackfillJobPB_State_Name(state)
                      << " due to " << message;
  if (!index_ids_set.empty()) {
    auto l = indexed_table_->LockForWrite();
    auto& indexed_table_pb = l.mutable_data()->pb;
    DCHECK_LE(indexed_table_pb.backfill_jobs_size(), 1) << "For now we only expect to have up to 1 "
                                                           "outstanding backfill job.";
    if (indexed_table_pb.backfill_jobs_size() == 0) {
      // Some other task already marked the backfill job as done.
      return Status::OK();
    }
    auto* backfill_state_pb = indexed_table_pb.mutable_backfill_jobs(0)->mutable_backfill_state();
    for (const auto& idx_id : index_ids_set) {
      backfill_state_pb->at(idx_id) = state;
      VLOG(2) << "Marking index " << idx_id << " as " << BackfillJobPB_State_Name(state);
    }
    for (int i = 0; i < indexed_table_pb.indexes_size(); i++) {
      IndexInfoPB* idx_pb = indexed_table_pb.mutable_indexes(i);
      if (index_ids_set.find(idx_pb->table_id()) != index_ids_set.end()) {
        // Should this also move to the BackfillJob instead?
        if (!message.empty()) {
          idx_pb->set_backfill_error_message(message);
        } else {
          idx_pb->clear_backfill_error_message();
        }
        idx_pb->clear_is_backfill_deferred();
      }
    }
    RETURN_NOT_OK(
        master_->catalog_manager_impl()->sys_catalog_->Upsert(leader_term(), indexed_table_));
    l.Commit();
  }
  return Status::OK();
}

void BackfillTable::CheckIfDone() {
  if (indexes_to_build().empty()) {
    done_.store(true, std::memory_order_release);
    WARN_NOT_OK(
        UpdateIndexPermissionsForIndexes(), "Could not update index permissions after backfill");
  }
}

Status BackfillTable::UpdateIndexPermissionsForIndexes() {
  std::unordered_map<TableId, IndexPermissions> permissions_to_set;
  bool all_success = true;
  {
    auto l = indexed_table_->LockForRead();
    const auto& indexed_table_pb = l.data().pb;
    if (indexed_table_pb.backfill_jobs_size() == 0) {
      // Some other task already marked the backfill job as done.
      return Status::OK();
    }
    DCHECK(indexed_table_pb.backfill_jobs_size() == 1) << "For now we only expect to have up to 1 "
                                                          "outstanding backfill job.";
    for (const auto& kv_pair : indexed_table_pb.backfill_jobs(0).backfill_state()) {
      VLOG(2) << "Reading backfill_state for " << kv_pair.first << " as "
              << BackfillJobPB_State_Name(kv_pair.second);
      DCHECK_NE(kv_pair.second, BackfillJobPB::IN_PROGRESS)
          << __func__ << " is expected to be only called after all indexes are done.";
      const bool success = (kv_pair.second == BackfillJobPB::SUCCESS);
      all_success &= success;
      permissions_to_set.emplace(
          kv_pair.first,
          success ? INDEX_PERM_READ_WRITE_AND_DELETE : INDEX_PERM_WRITE_AND_DELETE_WHILE_REMOVING);
    }
  }

  for (const auto& kv_pair : permissions_to_set) {
    if (kv_pair.second == INDEX_PERM_READ_WRITE_AND_DELETE) {
      RETURN_NOT_OK(AllowCompactionsToGCDeleteMarkers(kv_pair.first));
    }
  }

  RETURN_NOT_OK_PREPEND(
      MultiStageAlterTable::UpdateIndexPermission(
          master_->catalog_manager_impl(), indexed_table_, permissions_to_set, boost::none),
      "Could not update permissions after backfill. "
      "Possible that the master-leader has changed.");
  backfill_job_->SetState(
      all_success ? MonitoredTaskState::kComplete : MonitoredTaskState::kFailed);
  RETURN_NOT_OK(ClearCheckpointStateInTablets());
  indexed_table_->ClearIsBackfilling();

  VLOG(1) << "Sending alter table requests to the Indexed table";
  RETURN_NOT_OK(master_->catalog_manager_impl()->SendAlterTableRequest(indexed_table_));
  VLOG(1) << "DONE Sending alter table requests to the Indexed table";

  LOG(INFO) << "Done backfill on " << indexed_table_->ToString() << " setting permissions to "
            << yb::ToString(permissions_to_set);
  return Status::OK();
}

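// Removes the per-tablet backfilled_until checkpoints and then drops the backfill job from the
// indexed table, persisting both steps to the sys catalog.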
Status BackfillTable::ClearCheckpointStateInTablets() {
  auto tablets = indexed_table_->GetTablets();
  std::vector<TabletInfo*> tablet_ptrs;
  for (scoped_refptr<TabletInfo>& tablet : tablets) {
    tablet_ptrs.push_back(tablet.get());
    tablet->mutable_metadata()->StartMutation();
    auto& pb = tablet->mutable_metadata()->mutable_dirty()->pb;
    for (const auto& idx : requested_index_ids_) {
      pb.mutable_backfilled_until()->erase(idx);
    }
  }
  RETURN_NOT_OK_PREPEND(
      master()->catalog_manager()->sys_catalog()->Upsert(leader_term(), tablet_ptrs),
      "Could not persist that the table is done backfilling.");
  for (scoped_refptr<TabletInfo>& tablet : tablets) {
    VLOG(2) << "Done backfilling the table. " << yb::ToString(tablet)
            << " clearing backfilled_until";
    tablet->mutable_metadata()->CommitMutation();
  }

  if (FLAGS_TEST_slowdown_backfill_job_deletion_ms > 0) {
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_backfill_job_deletion_ms));
  }

  {
    auto l = indexed_table_->LockForWrite();
    DCHECK_LE(l.data().pb.backfill_jobs_size(), 1) << "For now we only expect to have up to 1 "
                                                       "outstanding backfill job.";
    l.mutable_data()->pb.clear_backfill_jobs();
    RETURN_NOT_OK_PREPEND(master_->catalog_manager_impl()->sys_catalog_->Upsert(
                              leader_term(), indexed_table_),
                          "Could not clear backfilling timestamp.");
    l.Commit();
  }
  VLOG_WITH_PREFIX(2) << "Cleared backfilling timestamp.";
  return Status::OK();
}

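// Once an index reaches READ_WRITE_AND_DELETE, clear retain_delete_markers on its schema so
// compactions may garbage-collect delete markers again, then notify the index's tablets via
// AsyncBackfillDone RPCs.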
Status BackfillTable::AllowCompactionsToGCDeleteMarkers(
    const TableId &index_table_id) {
  DVLOG(3) << __PRETTY_FUNCTION__;
  auto res = master_->catalog_manager()->FindTableById(index_table_id);
  if (!res && res.status().IsNotFound()) {
    LOG(ERROR) << "Index " << index_table_id << " was not found."
               << " This is ok in case somebody issued a delete index. : " << res.ToString();
    return Status::OK();
  }
  scoped_refptr<TableInfo> index_table_info = VERIFY_RESULT_PREPEND(std::move(res),
      Format("Could not find the index table $0", index_table_id));

  // Add a sleep here to wait until the Table is fully created.
  bool is_ready = false;
  bool first_run = true;
  do {
    if (!first_run) {
      YB_LOG_EVERY_N_SECS(INFO, 1) << "Waiting for the previous alter table to "
                                      "complete on the index table "
                                   << index_table_id;
      SleepFor(
          MonoDelta::FromMilliseconds(FLAGS_index_backfill_wait_for_alter_table_completion_ms));
    }
    first_run = false;
    {
      VLOG(2) << __func__ << ": Trying to lock index table for Read";
      auto l = index_table_info->LockForRead();
      auto state = l->pb.state();
      if (state != SysTablesEntryPB::RUNNING && state != SysTablesEntryPB::ALTERING) {
        LOG(ERROR) << "Index " << index_table_id << " is in state "
                   << SysTablesEntryPB_State_Name(state) << " : cannot enable compactions on it";
        // Treating it as success so that we can proceed with updating other indexes.
        return Status::OK();
      }
      is_ready = state == SysTablesEntryPB::RUNNING;
    }
    VLOG(2) << __func__ << ": Unlocked index table for Read";
  } while (!is_ready);
  {
    TRACE("Locking index table");
    VLOG(2) << __func__ << ": Trying to lock index table for Write";
    auto l = index_table_info->LockForWrite();
    VLOG(2) << __func__ << ": locked index table for Write";
    l.mutable_data()->pb.mutable_schema()->mutable_table_properties()
        ->set_retain_delete_markers(false);

    // Update sys-catalog with the new indexed table info.
    TRACE("Updating index table metadata on disk");
    RETURN_NOT_OK_PREPEND(
        master_->catalog_manager_impl()->sys_catalog_->Upsert(
            leader_term(), index_table_info),
        yb::Format(
            "Could not update index_table_info for $0 to enable compactions.",
            index_table_id));

    // Update the in-memory state.
    TRACE("Committing in-memory state");
    l.Commit();
  }
  VLOG(2) << __func__ << ": Unlocked index table for Write";
  VLOG(1) << "Sending backfill done requests to the Index table";
  RETURN_NOT_OK(SendRpcToAllowCompactionsToGCDeleteMarkers(index_table_info));
  VLOG(1) << "DONE Sending backfill done requests to the Index table";
  return Status::OK();
}

Status BackfillTable::SendRpcToAllowCompactionsToGCDeleteMarkers(
    const scoped_refptr<TableInfo> &table) {
  auto tablets = table->GetTablets();

  for (const scoped_refptr<TabletInfo>& tablet : tablets) {
    RETURN_NOT_OK(SendRpcToAllowCompactionsToGCDeleteMarkers(tablet, table->id()));
  }
  return Status::OK();
}

Status BackfillTable::SendRpcToAllowCompactionsToGCDeleteMarkers(
    const scoped_refptr<TabletInfo> &tablet, const std::string &table_id) {
  auto call = std::make_shared<AsyncBackfillDone>(master_, callback_pool_, tablet, table_id);
  tablet->table()->AddTask(call);
  RETURN_NOT_OK_PREPEND(
      master_->catalog_manager()->ScheduleTask(call),
      "Failed to send backfill done request");
  return Status::OK();
}

// -----------------------------------------------------------------------------------------------
// BackfillTablet
// -----------------------------------------------------------------------------------------------
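// BackfillTablet drives the backfill of a single tablet in chunks (BackfillChunk), persisting
// backfilled_until after every chunk so that the work can resume from the last checkpoint after
// a failure or master failover.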
BackfillTablet::BackfillTablet(
1094
    std::shared_ptr<BackfillTable> backfill_table, const scoped_refptr<TabletInfo>& tablet)
1095
4.39k
    : backfill_table_(backfill_table), tablet_(tablet) {
1096
4.39k
  const auto& index_ids = backfill_table->indexes_to_build();
1097
4.39k
  {
1098
4.39k
    auto l = tablet_->LockForRead();
1099
4.39k
    const auto& pb = tablet_->metadata().state().pb;
1100
4.39k
    Partition::FromPB(pb.partition(), &partition_);
1101
    // calculate backfilled_until_ as the largest key which all (active) indexes have backfilled.
1102
4.42k
    for (const TableId& idx_id : index_ids) {
1103
4.42k
      if (pb.backfilled_until().find(idx_id) != pb.backfilled_until().end()) {
1104
0
        auto key = pb.backfilled_until().at(idx_id);
1105
0
        if (backfilled_until_.empty() || key.compare(backfilled_until_) < 0) {
1106
0
          VLOG(2) << "Updating backfilled_until_ as " << key;
1107
0
          backfilled_until_ = key;
1108
0
          done_.store(backfilled_until_.empty(), std::memory_order_release);
1109
0
        }
1110
0
      }
1111
4.42k
    }
1112
4.39k
  }
1113
4.39k
  if (!backfilled_until_.empty()) {
1114
0
    VLOG_WITH_PREFIX(1) << " resuming backfill from "
1115
0
                        << yb::ToString(backfilled_until_);
1116
4.39k
  } else if (done()) {
1117
0
    VLOG_WITH_PREFIX(1) << " backfill already done.";
1118
4.39k
  } else {
1119
4.39k
    
VLOG_WITH_PREFIX1
(1) << " beginning backfill from "
1120
1
                        << "<start-of-the-tablet>";
1121
4.39k
  }
1122
4.39k
}
1123
1124
3
std::string BackfillTablet::LogPrefix() const {
1125
3
  return Format("Backfill Index(es) $0 for tablet $1 ",
1126
3
                yb::ToString(backfill_table_->indexes_to_build()),
1127
3
                tablet_->id());
1128
3
}
1129
1130
8.73k
void BackfillTablet::LaunchNextChunkOrDone() {
1131
8.73k
  if (done()) {
1132
4.18k
    
VLOG_WITH_PREFIX0
(1) << "is done"0
;
1133
4.18k
    backfill_table_->Done(Status::OK(), /* failed_indexes */ {});
1134
4.54k
  } else if (!backfill_table_->done()) {
1135
4.52k
    
VLOG_WITH_PREFIX1
(2) << "Launching next chunk from " << backfilled_until_1
;
1136
4.52k
    auto chunk = std::make_shared<BackfillChunk>(shared_from_this(),
1137
4.52k
                                                 backfilled_until_);
1138
4.52k
    chunk->Launch();
1139
4.52k
  }
1140
8.73k
}
1141
1142
void BackfillTablet::Done(
1143
    const Status& status, const boost::optional<string>& backfilled_until,
1144
4.37k
    const uint64_t number_rows_processed, const std::unordered_set<TableId>& failed_indexes) {
1145
4.37k
  if (!status.ok()) {
1146
41
    LOG(INFO) << "Failed to backfill the tablet " << yb::ToString(tablet_) << ": " << status
1147
41
              << "\nFailed_indexes are " << yb::ToString(failed_indexes);
1148
41
    backfill_table_->Done(status, failed_indexes);
1149
41
  }
1150
1151
4.37k
  if (backfilled_until) {
1152
4.34k
    auto s = UpdateBackfilledUntil(*backfilled_until, number_rows_processed);
1153
4.34k
    if (!s.ok()) {
1154
24
      LOG(WARNING) << "Could not persist how far the tablet is done backfilling. " << s.ToString();
1155
24
      return;
1156
24
    }
1157
4.34k
  }
1158
1159
4.34k
  LaunchNextChunkOrDone();
1160
4.34k
}
1161
1162
Status BackfillTablet::UpdateBackfilledUntil(
1163
4.34k
    const string& backfilled_until, const uint64_t number_rows_processed) {
1164
4.34k
  backfilled_until_ = backfilled_until;
1165
4.34k
  
VLOG_WITH_PREFIX1
(2) << "Done backfilling the tablet " << yb::ToString(tablet_) << " until "
1166
1
                      << yb::ToString(backfilled_until_);
1167
4.34k
  {
1168
4.34k
    auto l = tablet_->LockForWrite();
1169
4.34k
    for (const auto& idx_id : backfill_table_->indexes_to_build()) {
1170
4.34k
      l.mutable_data()->pb.mutable_backfilled_until()->insert({idx_id, backfilled_until_});
1171
4.34k
    }
1172
4.34k
    RETURN_NOT_OK(backfill_table_->master()->catalog_manager()->sys_catalog()->Upsert(
1173
4.34k
        backfill_table_->leader_term(), tablet_));
1174
4.32k
    l.Commit();
1175
4.32k
  }
1176
1177
  // This is the last chunk.
1178
4.32k
  if (backfilled_until_.empty()) {
1179
4.18k
    LOG(INFO) << "Done backfilling the tablet " << yb::ToString(tablet_);
1180
4.18k
    done_.store(true, std::memory_order_release);
1181
4.18k
  }
1182
4.32k
  return backfill_table_->UpdateRowsProcessedForIndexTable(number_rows_processed);
1183
4.34k
}
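
UpdateBackfilledUntil persists the new checkpoint under every index being built and treats an empty key as the end-of-tablet marker, which is what LaunchNextChunkOrDone keys off to decide between scheduling another BackfillChunk and declaring the tablet done. A rough sketch of that checkpoint-driven loop, with ProcessChunk as a hypothetical stand-in for the chunk RPC (toy key space, not the real RPC):

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical chunk processor: consumes rows starting at resume_from and returns
// the next resume key, or "" once the end of the (toy) key space is reached.
std::string ProcessChunk(const std::vector<std::string>& keys,
                         const std::string& resume_from, std::size_t chunk_size) {
  std::size_t start = 0;
  while (start < keys.size() && !resume_from.empty() && keys[start] < resume_from) {
    ++start;
  }
  const std::size_t end = start + chunk_size;
  for (std::size_t i = start; i < end && i < keys.size(); ++i) {
    std::cout << "backfilling " << keys[i] << std::endl;
  }
  return end < keys.size() ? keys[end] : std::string();  // "" == last chunk.
}

int main() {
  const std::vector<std::string> keys = {"k1", "k2", "k3", "k4", "k5"};
  std::string resume_from;  // Start from the beginning of the tablet.
  do {
    resume_from = ProcessChunk(keys, resume_from, /* chunk_size */ 2);
    // In the real code this is where the checkpoint would be persisted (the Upsert above).
  } while (!resume_from.empty());
  std::cout << "tablet done" << std::endl;
  return 0;
}

The sketch uses the same empty-string convention: a non-empty return value is both the persisted checkpoint and the next chunk's start key, and an empty return value ends the loop.
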
1184
1185
// -----------------------------------------------------------------------------------------------
1186
// GetSafeTimeForTablet
1187
// -----------------------------------------------------------------------------------------------
1188
1189
4.40k
void GetSafeTimeForTablet::Launch() {
1190
4.40k
  tablet_->table()->AddTask(shared_from_this());
1191
4.40k
  Status status = Run();
1192
  // Need to print this after Run() because that's where it picks the TS which description()
1193
  // needs.
1194
4.40k
  if (status.ok()) {
1195
4.40k
    VLOG(3) << "Started GetSafeTimeForTablet : " << this->description();
1196
4.40k
  } else {
1197
0
    LOG(WARNING) << Substitute("Failed to send GetSafeTime request for $0. ",
1198
0
                               tablet_->ToString())
1199
0
                 << status;
1200
0
  }
1201
4.40k
}
1202
1203
4.40k
bool GetSafeTimeForTablet::SendRequest(int attempt) {
1204
4.40k
  VLOG(1) << __PRETTY_FUNCTION__;
1205
4.40k
  tserver::GetSafeTimeRequestPB req;
1206
4.40k
  req.set_dest_uuid(permanent_uuid());
1207
4.40k
  req.set_tablet_id(tablet_->tablet_id());
1208
4.40k
  auto now = backfill_table_->master()->clock()->Now().ToUint64();
1209
4.40k
  req.set_min_hybrid_time_for_backfill(min_cutoff_.ToUint64());
1210
4.40k
  req.set_propagated_hybrid_time(now);
1211
1212
4.40k
  ts_admin_proxy_->GetSafeTimeAsync(req, &resp_, &rpc_, BindRpcCallback());
1213
4.40k
  VLOG(1) << "Send " << description() << " to " << permanent_uuid()
1214
1
          << " (attempt " << attempt << "):\n"
1215
1
          << req.DebugString();
1216
4.40k
  return true;
1217
4.40k
}
1218
1219
4.39k
void GetSafeTimeForTablet::HandleResponse(int attempt) {
1220
4.39k
  VLOG(1) << __PRETTY_FUNCTION__;
1221
4.39k
  Status status = Status::OK();
1222
4.39k
  if (resp_.has_error()) {
1223
3
    status = StatusFromPB(resp_.error().status());
1224
1225
    // Do not retry on a fatal error
1226
3
    switch (resp_.error().code()) {
1227
0
      case TabletServerErrorPB::TABLET_NOT_FOUND:
1228
0
      case TabletServerErrorPB::MISMATCHED_SCHEMA:
1229
0
      case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:
1230
0
      case TabletServerErrorPB::OPERATION_NOT_SUPPORTED:
1231
0
        LOG(WARNING) << "TS " << permanent_uuid() << ": GetSafeTime failed for tablet "
1232
0
                     << tablet_->ToString() << " no further retry: " << status;
1233
0
        TransitionToFailedState(MonitoredTaskState::kRunning, status);
1234
0
        break;
1235
3
      default:
1236
3
        LOG(WARNING) << "TS " << permanent_uuid() << ": GetSafeTime failed for tablet "
1237
3
                     << tablet_->ToString() << ": " << status << " code " << resp_.error().code();
1238
3
        break;
1239
3
    }
1240
4.39k
  } else {
1241
4.39k
    TransitionToCompleteState();
1242
4.39k
    VLOG(1) << "TS " << permanent_uuid() << ": GetSafeTime complete on tablet "
1243
5
            << tablet_->ToString();
1244
4.39k
  }
1245
1246
4.39k
  server::UpdateClock(resp_, master_->clock());
1247
4.39k
}
1248
1249
4.40k
void GetSafeTimeForTablet::UnregisterAsyncTaskCallback() {
1250
4.40k
  Status status;
1251
4.40k
  HybridTime safe_time;
1252
4.40k
  if (resp_.has_error()) {
1253
0
    status = StatusFromPB(resp_.error().status());
1254
0
    VLOG(3) << "GetSafeTime for " << tablet_->ToString() << " got an error. Returning "
1255
0
            << safe_time;
1256
4.40k
  } else if (state() != MonitoredTaskState::kComplete) {
1257
1
    status = STATUS_FORMAT(InternalError, "$0 in state $1", description(), state());
1258
4.39k
  } else {
1259
4.39k
    safe_time = HybridTime(resp_.safe_time());
1260
4.39k
    if (safe_time.is_special()) {
1261
0
      LOG(ERROR) << "GetSafeTime for " << tablet_->ToString() << " got " << safe_time;
1262
4.39k
    } else {
1263
4.39k
      VLOG(3) << "GetSafeTime for " << tablet_->ToString() << " got " << safe_time;
1264
4.39k
    }
1265
4.39k
  }
1266
4.40k
  WARN_NOT_OK(backfill_table_->UpdateSafeTime(status, safe_time),
1267
4.40k
    "Could not UpdateSafeTime");
1268
4.40k
}
1269
1270
4.40k
TabletServerId GetSafeTimeForTablet::permanent_uuid() {
1271
4.40k
  return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
1272
4.40k
}
1273
1274
BackfillChunk::BackfillChunk(std::shared_ptr<BackfillTablet> backfill_tablet,
1275
                             const std::string& start_key)
1276
    : RetryingTSRpcTask(backfill_tablet->master(),
1277
                        backfill_tablet->threadpool(),
1278
                        std::unique_ptr<TSPicker>(new PickLeaderReplica(backfill_tablet->tablet())),
1279
                        backfill_tablet->tablet()->table().get()),
1280
      indexes_being_backfilled_(backfill_tablet->indexes_to_build()),
1281
      backfill_tablet_(backfill_tablet),
1282
      start_key_(start_key),
1283
      requested_index_names_(RetrieveIndexNames(backfill_tablet->master()->catalog_manager_impl(),
1284
4.52k
                                                indexes_being_backfilled_)) {
1285
4.52k
  deadline_ = MonoTime::Max(); // Never time out.
1286
4.52k
}
1287
1288
// -----------------------------------------------------------------------------------------------
1289
// BackfillChunk
1290
// -----------------------------------------------------------------------------------------------
1291
4.52k
void BackfillChunk::Launch() {
1292
4.52k
  backfill_tablet_->tablet()->table()->AddTask(shared_from_this());
1293
4.52k
  Status status = Run();
1294
4.52k
  WARN_NOT_OK(
1295
4.52k
      status, Substitute(
1296
4.52k
                  "Failed to send backfill Chunk request for $0",
1297
4.52k
                  backfill_tablet_->tablet().get()->ToString()));
1298
1299
  // Need to print this after Run() because that's where it picks the TS which description()
1300
  // needs.
1301
4.52k
  if (status.ok()) {
1302
4.52k
    LOG(INFO) << "Started BackfillChunk : " << this->description();
1303
4.52k
  }
1304
4.52k
}
1305
1306
4.52k
MonoTime BackfillChunk::ComputeDeadline() {
1307
4.52k
  MonoTime timeout = MonoTime::Now();
1308
4.52k
  if (GetTableType() == TableType::PGSQL_TABLE_TYPE) {
1309
1.99k
    timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_ysql_index_backfill_rpc_timeout_ms));
1310
2.53k
  } else {
1311
2.53k
    DCHECK(GetTableType() == TableType::YQL_TABLE_TYPE);
1312
2.53k
    timeout.AddDelta(MonoDelta::FromMilliseconds(FLAGS_index_backfill_rpc_timeout_ms));
1313
2.53k
  }
1314
4.52k
  return MonoTime::Earliest(timeout, deadline_);
1315
4.52k
}
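
ComputeDeadline picks the per-attempt timeout from the YSQL or YCQL backfill RPC timeout flag and then clamps it to the task-wide deadline_, which the BackfillChunk constructor set to MonoTime::Max() so only the per-attempt timeout ever bites. A rough std::chrono equivalent of that clamping; the constants mirror the flag defaults and the names are illustrative, not YugabyteDB APIs:

#include <algorithm>
#include <chrono>
#include <iostream>

using Clock = std::chrono::steady_clock;

enum class TableType { kYsql, kYcql };

// Illustrative stand-ins for the gflags above (values copied from their defaults).
constexpr std::chrono::milliseconds kYsqlBackfillRpcTimeout{60 * 1000};
constexpr std::chrono::milliseconds kYcqlBackfillRpcTimeout{30 * 1000};

// Sketch of ComputeDeadline: per-attempt timeout chosen by table type, never
// extending past the overall task deadline.
Clock::time_point ComputeDeadline(TableType type, Clock::time_point overall_deadline) {
  const auto timeout =
      (type == TableType::kYsql) ? kYsqlBackfillRpcTimeout : kYcqlBackfillRpcTimeout;
  const Clock::time_point attempt_deadline =
      Clock::now() + std::chrono::duration_cast<Clock::duration>(timeout);
  return std::min(attempt_deadline, overall_deadline);
}

int main() {
  // deadline_ = MonoTime::Max() in the constructor corresponds to time_point::max() here,
  // so only the per-attempt timeout ever applies.
  const auto deadline = ComputeDeadline(TableType::kYsql, Clock::time_point::max());
  std::cout << "attempt deadline is "
            << std::chrono::duration_cast<std::chrono::seconds>(deadline - Clock::now()).count()
            << "s away" << std::endl;
  return 0;
}

Clamping against an overall deadline lets the same helper serve both "retry indefinitely" tasks (deadline_ = max) and tasks with a hard stop.
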
1316
1317
6
int BackfillChunk::num_max_retries() {
1318
6
  return FLAGS_index_backfill_rpc_max_retries;
1319
6
}
1320
1321
0
int BackfillChunk::max_delay_ms() {
1322
0
  return FLAGS_index_backfill_rpc_max_delay_ms;
1323
0
}
1324
1325
13.4k
std::string BackfillChunk::description() const {
1326
13.4k
  return yb::Format("Backfilling indexes $0 for tablet $1 from key '$2'",
1327
13.4k
                    requested_index_names_, tablet_id(),
1328
13.4k
                    b2a_hex(start_key_));
1329
13.4k
}
1330
1331
4.52k
bool BackfillChunk::SendRequest(int attempt) {
1332
4.52k
  VLOG(1) << __PRETTY_FUNCTION__;
1333
4.52k
  if (indexes_being_backfilled_.empty()) {
1334
0
    TransitionToCompleteState();
1335
0
    return false;
1336
0
  }
1337
1338
4.52k
  tserver::BackfillIndexRequestPB req;
1339
4.52k
  req.set_dest_uuid(permanent_uuid());
1340
4.52k
  req.set_tablet_id(backfill_tablet_->tablet()->tablet_id());
1341
4.52k
  req.set_read_at_hybrid_time(backfill_tablet_->read_time_for_backfill().ToUint64());
1342
4.52k
  req.set_schema_version(backfill_tablet_->schema_version());
1343
4.52k
  req.set_start_key(start_key_);
1344
4.52k
  req.set_indexed_table_id(backfill_tablet_->indexed_table_id());
1345
4.52k
  if (GetTableType() == TableType::PGSQL_TABLE_TYPE) {
1346
1.99k
    req.set_namespace_name(backfill_tablet_->GetNamespaceName());
1347
1.99k
  }
1348
4.52k
  std::unordered_set<TableId> found_idxs;
1349
4.56k
  for (const IndexInfoPB& idx_info : backfill_tablet_->index_infos()) {
1350
4.56k
    if (indexes_being_backfilled_.find(idx_info.table_id()) != indexes_being_backfilled_.end()) {
1351
4.56k
      req.add_indexes()->CopyFrom(idx_info);
1352
4.56k
      found_idxs.insert(idx_info.table_id());
1353
4.56k
    }
1354
4.56k
  }
1355
4.52k
  if (found_idxs.size() != indexes_being_backfilled_.size()) {
1356
    // We could not find the IndexInfoPB for all the requested indexes. This can happen
1357
    // if an index was deleted while the backfill was still in progress.
1358
    // We are going to fail fast and mark any such index as failed.
1359
0
    for (auto& idx : indexes_being_backfilled_) {
1360
0
      if (found_idxs.find(idx) == found_idxs.end()) {
1361
0
        VLOG_WITH_PREFIX(3) << "Marking " << idx << " as failed";
1362
0
        *resp_.add_failed_index_ids() = idx;
1363
0
      }
1364
0
    }
1365
0
    const string error_message("Could not find IndexInfoPB for some indexes");
1366
0
    resp_.mutable_error()->mutable_status()->set_code(AppStatusPB::NOT_FOUND);
1367
0
    resp_.mutable_error()->mutable_status()->set_message(error_message);
1368
0
    TransitionToFailedState(MonitoredTaskState::kRunning,
1369
0
                            STATUS(NotFound, error_message));
1370
0
    return false;
1371
0
  }
1372
4.52k
  req.set_propagated_hybrid_time(backfill_tablet_->master()->clock()->Now().ToUint64());
1373
1374
4.52k
  ts_admin_proxy_->BackfillIndexAsync(req, &resp_, &rpc_, BindRpcCallback());
1375
4.52k
  VLOG(1) << "Send " << description() << " to " << permanent_uuid()
1376
1
          << " (attempt " << attempt << "):\n"
1377
1
          << req.DebugString();
1378
4.52k
  return true;
1379
4.52k
}
1380
1381
4.36k
void BackfillChunk::HandleResponse(int attempt) {
1382
4.36k
  VLOG(1) << __PRETTY_FUNCTION__ << " response is " << yb::ToString(resp_);
1383
4.36k
  Status status;
1384
4.36k
  if (resp_.has_error()) {
1385
38
    status = StatusFromPB(resp_.error().status());
1386
1387
    // Do not retry on a fatal error
1388
38
    switch (resp_.error().code()) {
1389
9
      case TabletServerErrorPB::MISMATCHED_SCHEMA:
1390
38
      case TabletServerErrorPB::OPERATION_NOT_SUPPORTED:
1391
38
      case TabletServerErrorPB::TABLET_HAS_A_NEWER_SCHEMA:
1392
38
      case TabletServerErrorPB::TABLET_NOT_FOUND:
1393
38
        LOG(WARNING) << "TS " << permanent_uuid() << ": backfill failed for tablet "
1394
38
                     << backfill_tablet_->tablet()->ToString() << " no further retry: " << status
1395
38
                     << " response was " << yb::ToString(resp_);
1396
38
        TransitionToFailedState(MonitoredTaskState::kRunning, status);
1397
38
        break;
1398
0
      default:
1399
0
        LOG(WARNING) << "TS " << permanent_uuid() << ": backfill failed for tablet "
1400
0
                     << backfill_tablet_->tablet()->ToString() << ": " << status.ToString()
1401
0
                     << " code " << resp_.error().code();
1402
0
        break;
1403
38
    }
1404
4.33k
  } else {
1405
4.33k
    TransitionToCompleteState();
1406
4.33k
    VLOG(1) << "TS " << permanent_uuid() << ": backfill complete on tablet "
1407
5
            << backfill_tablet_->tablet()->ToString();
1408
4.33k
  }
1409
1410
4.36k
  server::UpdateClock(resp_, master_->clock());
1411
4.36k
}
1412
1413
4.45k
void BackfillChunk::UnregisterAsyncTaskCallback() {
1414
4.45k
  if (state() == MonitoredTaskState::kAborted) {
1415
84
    VLOG(1) << " was aborted";
1416
84
    return;
1417
84
  }
1418
1419
4.37k
  Status status;
1420
4.37k
  std::unordered_set<TableId> failed_indexes;
1421
4.37k
  if (resp_.has_error()) {
1422
38
    status = StatusFromPB(resp_.error().status());
1423
38
    if (resp_.failed_index_ids_size() > 0) {
1424
58
      for (int i = 0; i < resp_.failed_index_ids_size(); i++) {
1425
29
        VLOG(1) << " Added to failed index " << resp_.failed_index_ids(i);
1426
29
        failed_indexes.insert(resp_.failed_index_ids(i));
1427
29
      }
1428
29
    } else {
1429
      // No specific index was marked as a failure. So consider all of them as failed.
1430
9
      failed_indexes = indexes_being_backfilled_;
1431
9
    }
1432
4.33k
  } else if (state() != MonitoredTaskState::kComplete) {
1433
    // There is no response, so the error happened even before we could
1434
    // get a response. Mark all indexes as failed.
1435
3
    failed_indexes = indexes_being_backfilled_;
1436
3
    VLOG(3) << "Considering all indexes : "
1437
0
            << yb::ToString(indexes_being_backfilled_)
1438
0
            << " as failed.";
1439
3
    status = STATUS_FORMAT(InternalError, "$0 in state $1", description(), state());
1440
3
  }
1441
1442
4.37k
  if (resp_.has_backfilled_until()) {
1443
4.35k
    backfill_tablet_->Done(
1444
4.35k
        status, resp_.backfilled_until(), resp_.number_rows_processed(), failed_indexes);
1445
4.35k
  } else {
1446
20
    backfill_tablet_->Done(status, boost::none, resp_.number_rows_processed(), failed_indexes);
1447
20
  }
1448
4.37k
}
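
UnregisterAsyncTaskCallback reduces the chunk's outcome to the set of indexes reported as failed: explicit failed_index_ids from the tablet server win, an error without specifics blames every index in the chunk, and a task that never reached the complete state also blames every index. A small pure-function sketch of that decision using plain standard-library types in place of the protobuf and task-state machinery (hypothetical names):

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Hypothetical, simplified view of the RPC outcome.
struct ChunkOutcome {
  bool has_error = false;
  std::vector<std::string> failed_index_ids;  // May be empty even on error.
  bool completed = false;                     // Did the task reach the complete state?
};

// Mirrors the branching in UnregisterAsyncTaskCallback: which indexes get reported
// as failed for this chunk.
std::set<std::string> FailedIndexes(const ChunkOutcome& outcome,
                                    const std::set<std::string>& indexes_being_backfilled) {
  if (outcome.has_error) {
    if (!outcome.failed_index_ids.empty()) {
      return {outcome.failed_index_ids.begin(), outcome.failed_index_ids.end()};
    }
    return indexes_being_backfilled;  // Error without specifics: blame them all.
  }
  if (!outcome.completed) {
    return indexes_being_backfilled;  // No usable response at all: blame them all.
  }
  return {};  // Success: nothing failed.
}

int main() {
  const std::set<std::string> building = {"idx_a", "idx_b"};
  const ChunkOutcome partial{true, {"idx_b"}, false};
  for (const auto& idx : FailedIndexes(partial, building)) {
    std::cout << idx << " failed" << std::endl;  // Prints only idx_b.
  }
  return 0;
}
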
1449
1450
4.56k
TabletServerId BackfillChunk::permanent_uuid() {
1451
4.56k
  return target_ts_desc_ != nullptr ? target_ts_desc_->permanent_uuid() : "";
1452
4.56k
}
1453
1454
}  // namespace master
1455
}  // namespace yb