YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/master/sys_catalog_writer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/sys_catalog_writer.h"
15
16
#include "yb/common/pgsql_protocol.pb.h"
17
#include "yb/common/ql_expr.h"
18
#include "yb/common/ql_protocol_util.h"
19
20
#include "yb/docdb/doc_ql_scanspec.h"
21
#include "yb/docdb/doc_rowwise_iterator.h"
22
23
#include "yb/gutil/casts.h"
24
25
#include "yb/master/sys_catalog_constants.h"
26
27
#include "yb/tablet/tablet.h"
28
29
#include "yb/tserver/tserver.pb.h"
30
31
#include "yb/util/pb_util.h"
32
#include "yb/util/status_format.h"
33
34
namespace yb {
35
namespace master {
36
37
namespace {
38
39
475k
// Stores the bytes referenced by binary_value (cdata() / size()) as the binary value of expr_pb's
// value field. expr_pb is dereferenced unconditionally, so it must not be null.
void SetBinaryValue(const Slice& binary_value, QLExpressionPB* expr_pb) {
40
475k
  expr_pb->mutable_value()->set_binary_value(binary_value.cdata(), binary_value.size());
41
475k
}
42
43
239k
// Stores int8_value as the int8 value of expr_pb's value field. expr_pb is dereferenced
// unconditionally, so it must not be null.
void SetInt8Value(const int8_t int8_value, QLExpressionPB* expr_pb) {
44
239k
  expr_pb->mutable_value()->set_int8_value(int8_value);
45
239k
}
46
47
// Looks up column_name in schema_with_ids and writes the resulting column id into col_pb.
// Returns OK on success; a failed ColumnIdByName lookup is handed back to the caller through
// VERIFY_RESULT and col_pb is left untouched in that case.
CHECKED_STATUS SetColumnId(
48
236k
    const Schema& schema_with_ids, const std::string& column_name, QLColumnValuePB* col_pb) {
49
236k
  auto column_id = VERIFY_RESULT(schema_with_ids.ColumnIdByName(column_name));
50
236k
  col_pb->set_column_id(column_id.rep());
51
236k
  return Status::OK();
52
236k
}
53
54
} // namespace
55
56
1.15M
// Returns true when op_type is QL_STMT_INSERT or QL_STMT_UPDATE, and false for every other
// statement type.
bool IsWrite(QLWriteRequestPB::QLStmtType op_type) {
57
1.15M
  return op_type == QLWriteRequestPB::QL_STMT_INSERT || op_type == QLWriteRequestPB::QL_STMT_UPDATE;
58
1.15M
}
59
60
// Builds a writer that accumulates operations into a freshly allocated, empty
// tserver::WriteRequestPB. schema_with_ids and leader_term are only stored here; they are used by
// other members.
// NOTE(review): whether schema_with_ids_ is held by reference or by copy depends on the member
// declaration in the header, which is not visible here - confirm lifetime requirements there.
SysCatalogWriter::SysCatalogWriter(const Schema& schema_with_ids, int64_t leader_term)
61
    : schema_with_ids_(schema_with_ids), req_(std::make_unique<tserver::WriteRequestPB>()),
62
261k
      leader_term_(leader_term) {
63
261k
}
64
65
261k
// Defaulted destructor, defined in this translation unit.
// NOTE(review): presumably kept out of the header so the std::unique_ptr member can be destroyed
// where tserver::WriteRequestPB is a complete type - confirm against the header.
SysCatalogWriter::~SysCatalogWriter() = default;
66
67
// Queues one sys catalog mutation for the entry identified by (type, item_id).
//
// For insert/update statements (see IsWrite) prev_pb and new_pb are compared first; when they are
// equal nothing is added to the request and OK is returned. In every other case a new entry is
// appended to the request's QL write batch and filled from new_pb via FillSysCatalogWriteRequest,
// whose status is returned.
Status SysCatalogWriter::DoMutateItem(
68
    int8_t type,
69
    const std::string& item_id,
70
    const google::protobuf::Message& prev_pb,
71
    const google::protobuf::Message& new_pb,
72
336k
    QLWriteRequestPB::QLStmtType op_type) {
73
336k
  const bool is_write = IsWrite(op_type);
74
75
336k
  if (is_write) {
76
334k
    std::string diff;
77
78
334k
    // The textual diff is only requested when verbose level 2 logging is on; otherwise nullptr is
    // passed and diff stays empty.
    if (pb_util::ArePBsEqual(prev_pb, new_pb, VLOG_IS_ON(2) ? &diff : nullptr)) {
79
0
      VLOG(2) << "Skipping empty update for item " << item_id;
80
      // Short-circuit empty updates.
81
97.8k
      return Status::OK();
82
97.8k
    }
83
84
18.4E
    VLOG(2) << "Updating item " << item_id << " in catalog: " << diff;
85
236k
  }
86
87
239k
  // Non-write statements skip the equality check above and are always queued.
  return FillSysCatalogWriteRequest(
88
239k
      type, item_id, new_pb, op_type, schema_with_ids_, req_->add_ql_write_batch());
89
336k
}
90
91
// Appends one PGSQL write (PGSQL_UPSERT when is_upsert is true, PGSQL_INSERT otherwise) to the
// request, copying the values of source_row into a row of the table target_table_id at
// target_schema_version.
//
// Range key values are read from source_row using source_schema's column ids and appended in
// column order; a missing range key value yields a Corruption status. Remaining columns are
// copied only when source_row has a value for them.
//
// NOTE(review): the non-key loop uses the same index i for source_schema and target_schema, so it
// assumes both schemas list their columns in the same order - confirm with callers.
// NOTE(review): when Corruption is returned, the partially filled write has already been added to
// the request batch.
Status SysCatalogWriter::InsertPgsqlTableRow(const Schema& source_schema,
92
                                             const QLTableRow& source_row,
93
                                             const TableId& target_table_id,
94
                                             const Schema& target_schema,
95
                                             const uint32_t target_schema_version,
96
984k
                                             bool is_upsert) {
97
984k
  PgsqlWriteRequestPB* pgsql_write = req_->add_pgsql_write_batch();
98
99
984k
  pgsql_write->set_client(YQL_CLIENT_PGSQL);
100
984k
  if (is_upsert) {
101
984k
    pgsql_write->set_stmt_type(PgsqlWriteRequestPB::PGSQL_UPSERT);
102
0
  } else {
103
0
    pgsql_write->set_stmt_type(PgsqlWriteRequestPB::PGSQL_INSERT);
104
0
  }
105
984k
  pgsql_write->set_table_id(target_table_id);
106
984k
  pgsql_write->set_schema_version(target_schema_version);
107
108
  // Postgres sys catalog table is non-partitioned. So there should be no hash column.
109
984k
  DCHECK_EQ(source_schema.num_hash_key_columns(), 0);
110
3.73M
  // With no hash columns, indexes [0, num_range_key_columns) are the range key columns.
  for (size_t i = 0; i < source_schema.num_range_key_columns(); i++) {
111
2.74M
    const auto& value = source_row.GetValue(source_schema.column_id(i));
112
2.74M
    if (value) {
113
2.74M
      pgsql_write->add_range_column_values()->mutable_value()->CopyFrom(*value);
114
0
    } else {
115
0
      return STATUS_FORMAT(Corruption, "Range value of column id $0 missing for table $1",
116
0
                           source_schema.column_id(i), target_table_id);
117
0
    }
118
2.74M
  }
119
6.56M
  // Non-key columns: looked up by the source column id, written under the target column id at the
  // same index. Columns without a value in source_row are skipped without error.
  for (size_t i = source_schema.num_range_key_columns(); i < source_schema.num_columns(); i++) {
120
5.58M
    const auto& value = source_row.GetValue(source_schema.column_id(i));
121
5.58M
    if (value) {
122
5.58M
      PgsqlColumnValuePB* column_value = pgsql_write->add_column_values();
123
5.58M
      column_value->set_column_id(target_schema.column_id(i));
124
5.58M
      column_value->mutable_expr()->mutable_value()->CopyFrom(*value);
125
5.58M
    }
126
5.58M
  }
127
128
984k
  return Status::OK();
129
984k
}
130
131
// Fills req with a sys catalog operation of kind op_type for the entry keyed by (type, item_id).
//
// For insert/update statements (see IsWrite) the already-serialized payload in data is stored as
// the binary value of the metadata column, whose id is resolved by name in schema_with_ids; a
// failed lookup is returned to the caller. For other statement types data is not used. In all
// cases the statement type is set and the two range key values are appended in this order: entry
// type (int8), then entry id (binary).
Status FillSysCatalogWriteRequest(
132
    int8_t type, const std::string& item_id, const Slice& data,
133
239k
    QLWriteRequestPB::QLStmtType op_type, const Schema& schema_with_ids, QLWriteRequestPB* req) {
134
239k
  if (IsWrite(op_type)) {
135
    // Add the metadata column.
136
236k
    QLColumnValuePB* metadata = req->add_column_values();
137
236k
    RETURN_NOT_OK(SetColumnId(schema_with_ids, kSysCatalogTableColMetadata, metadata));
138
236k
    SetBinaryValue(data, metadata->mutable_expr());
139
236k
  }
140
141
239k
  req->set_type(op_type);
142
143
  // Add column type.
144
239k
  SetInt8Value(type, req->add_range_column_values());
145
146
  // Add column id.
147
239k
  SetBinaryValue(item_id, req->add_range_column_values());
148
149
239k
  return Status::OK();
150
239k
}
151
152
// Protobuf-message convenience overload of FillSysCatalogWriteRequest.
//
// For insert/update statements (see IsWrite) new_pb is serialized into a local buffer and the
// resulting bytes are passed on as the metadata payload. For other statement types new_pb is not
// used and an empty Slice is passed instead. The returned status is that of the Slice overload.
Status FillSysCatalogWriteRequest(
153
    int8_t type, const std::string& item_id, const google::protobuf::Message& new_pb,
154
239k
    QLWriteRequestPB::QLStmtType op_type, const Schema& schema_with_ids, QLWriteRequestPB* req) {
155
239k
  // NOTE(review): the Slice overload called below sets the same statement type again.
  req->set_type(op_type);
156
157
239k
  if (IsWrite(op_type)) {
158
236k
    faststring metadata_buf;
159
160
236k
    pb_util::SerializeToString(new_pb, &metadata_buf);
161
162
236k
    // The Slice points into metadata_buf, which stays alive for the whole duration of this call.
    return FillSysCatalogWriteRequest(
163
236k
        type, item_id, Slice(metadata_buf.data(), metadata_buf.size()), op_type, schema_with_ids,
164
236k
        req);
165
236k
  }
166
167
2.28k
  return FillSysCatalogWriteRequest(type, item_id, Slice(), op_type, schema_with_ids, req);
168
2.28k
}
169
170
// Enumerates the sys catalog entries of kind entry_type stored in tablet, invoking callback for
// each one (see the DocRowwiseIterator overload for the per-row contract).
//
// A row iterator is created over schema.CopyWithoutColumnIds() with ReadHybridTime::Max(), an
// empty table id, CoarseTimePoint::max() as the deadline and AllowBootstrappingState::kTrue.
// Failure to create the iterator is returned to the caller.
Status EnumerateSysCatalog(
171
    tablet::Tablet* tablet, const Schema& schema, int8_t entry_type,
172
26.2k
    const EnumerationCallback& callback) {
173
26.2k
  auto iter = VERIFY_RESULT(tablet->NewRowIterator(
174
26.2k
      schema.CopyWithoutColumnIds(), ReadHybridTime::Max(), /* table_id= */ "",
175
26.2k
      CoarseTimePoint::max(), tablet::AllowBootstrappingState::kTrue));
176
177
26.2k
  // NOTE(review): the down_cast assumes NewRowIterator returned a docdb::DocRowwiseIterator -
  // confirm. The original schema (not the copy without column ids) is what is forwarded.
  return EnumerateSysCatalog(
178
26.2k
      down_cast<docdb::DocRowwiseIterator*>(iter.get()), schema, entry_type, callback);
179
26.2k
}
180
181
// Scans doc_iter for sys catalog rows whose type column equals entry_type and invokes
// callback(entry id, metadata) for each row, both passed as binary values.
//
// Returns early with the corresponding status when: one of the type / id / metadata columns is
// not found by name in schema, the iterator fails to initialize or advance, a column value cannot
// be read from a row, a row carries a different entry type or a non-binary metadata value
// (Corruption), or callback returns a non-OK status. Rows already handed to callback before the
// failure are not revisited. Returns OK once the iterator is exhausted.
Status EnumerateSysCatalog(
182
    docdb::DocRowwiseIterator* doc_iter, const Schema& schema, int8_t entry_type,
183
26.2k
    const EnumerationCallback& callback) {
184
26.2k
  const auto type_col_idx = VERIFY_RESULT(schema.ColumnIndexByName(kSysCatalogTableColType));
185
26.2k
  const auto entry_id_col_idx = VERIFY_RESULT(schema.ColumnIndexByName(kSysCatalogTableColId));
186
26.2k
  const auto metadata_col_idx = VERIFY_RESULT(schema.ColumnIndexByName(
187
26.2k
      kSysCatalogTableColMetadata));
188
189
26.2k
  // Scan condition: an AND with a single operand, "type column == entry_type".
  QLConditionPB cond;
190
26.2k
  cond.set_op(QL_OP_AND);
191
26.2k
  QLAddInt8Condition(&cond, schema.column_id(type_col_idx), QL_OP_EQUAL, entry_type);
192
26.2k
  const std::vector<docdb::PrimitiveValue> empty_hash_components;
193
26.2k
  docdb::DocQLScanSpec spec(
194
26.2k
      schema, boost::none /* hash_code */, boost::none /* max_hash_code */,
195
26.2k
      empty_hash_components, &cond, nullptr /* if_req */, rocksdb::kDefaultQueryId);
196
26.2k
  RETURN_NOT_OK(doc_iter->Init(spec));
197
198
26.2k
  // These objects are declared once and reused for every row of the scan.
  QLTableRow value_map;
199
26.2k
  QLValue found_entry_type, entry_id, metadata;
200
223k
  while (VERIFY_RESULT(doc_iter->HasNext())) {
201
223k
    RETURN_NOT_OK(doc_iter->NextRow(&value_map));
202
223k
    RETURN_NOT_OK(value_map.GetValue(schema.column_id(type_col_idx), &found_entry_type));
203
223k
    // Re-check the type of each returned row rather than relying on the scan condition alone.
    SCHECK_EQ(found_entry_type.int8_value(), entry_type, Corruption, "Found wrong entry type");
204
223k
    RETURN_NOT_OK(value_map.GetValue(schema.column_id(entry_id_col_idx), &entry_id));
205
223k
    RETURN_NOT_OK(value_map.GetValue(schema.column_id(metadata_col_idx), &metadata));
206
223k
    SCHECK_EQ(metadata.type(), InternalType::kBinaryValue, Corruption,
207
223k
              "System catalog snapshot is corrupted, or is built using different build type");
208
223k
    RETURN_NOT_OK(callback(entry_id.binary_value(), metadata.binary_value()));
209
223k
  }
210
211
26.2k
  return Status::OK();
212
26.2k
}
213
214
} // namespace master
215
} // namespace yb