/Users/deen/code/yugabyte-db/src/yb/master/sys_catalog_writer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/master/sys_catalog_writer.h" |
15 | | |
16 | | #include "yb/common/pgsql_protocol.pb.h" |
17 | | #include "yb/common/ql_expr.h" |
18 | | #include "yb/common/ql_protocol_util.h" |
19 | | |
20 | | #include "yb/docdb/doc_ql_scanspec.h" |
21 | | #include "yb/docdb/doc_rowwise_iterator.h" |
22 | | |
23 | | #include "yb/gutil/casts.h" |
24 | | |
25 | | #include "yb/master/sys_catalog_constants.h" |
26 | | |
27 | | #include "yb/tablet/tablet.h" |
28 | | |
29 | | #include "yb/tserver/tserver.pb.h" |
30 | | |
31 | | #include "yb/util/pb_util.h" |
32 | | #include "yb/util/status_format.h" |
33 | | |
34 | | namespace yb { |
35 | | namespace master { |
36 | | |
37 | | namespace { |
38 | | |
39 | 475k | void SetBinaryValue(const Slice& binary_value, QLExpressionPB* expr_pb) { |
40 | 475k | expr_pb->mutable_value()->set_binary_value(binary_value.cdata(), binary_value.size()); |
41 | 475k | } |
42 | | |
43 | 239k | void SetInt8Value(const int8_t int8_value, QLExpressionPB* expr_pb) { |
44 | 239k | expr_pb->mutable_value()->set_int8_value(int8_value); |
45 | 239k | } |
46 | | |
47 | | CHECKED_STATUS SetColumnId( |
48 | 236k | const Schema& schema_with_ids, const std::string& column_name, QLColumnValuePB* col_pb) { |
49 | 236k | auto column_id = VERIFY_RESULT(schema_with_ids.ColumnIdByName(column_name)); |
50 | 236k | col_pb->set_column_id(column_id.rep()); |
51 | 236k | return Status::OK(); |
52 | 236k | } |
53 | | |
54 | | } // namespace |
55 | | |
56 | 1.15M | bool IsWrite(QLWriteRequestPB::QLStmtType op_type) { |
57 | 1.15M | return op_type == QLWriteRequestPB::QL_STMT_INSERT || op_type == QLWriteRequestPB::QL_STMT_UPDATE; |
58 | 1.15M | } |
59 | | |
60 | | SysCatalogWriter::SysCatalogWriter(const Schema& schema_with_ids, int64_t leader_term) |
61 | | : schema_with_ids_(schema_with_ids), req_(std::make_unique<tserver::WriteRequestPB>()), |
62 | 261k | leader_term_(leader_term) { |
63 | 261k | } |
64 | | |
65 | 261k | SysCatalogWriter::~SysCatalogWriter() = default; |
66 | | |
67 | | Status SysCatalogWriter::DoMutateItem( |
68 | | int8_t type, |
69 | | const std::string& item_id, |
70 | | const google::protobuf::Message& prev_pb, |
71 | | const google::protobuf::Message& new_pb, |
72 | 336k | QLWriteRequestPB::QLStmtType op_type) { |
73 | 336k | const bool is_write = IsWrite(op_type); |
74 | | |
75 | 336k | if (is_write) { |
76 | 334k | std::string diff; |
77 | | |
78 | 334k | if (pb_util::ArePBsEqual(prev_pb, new_pb, VLOG_IS_ON(2) ? &diff : nullptr)) { |
79 | 0 | VLOG(2) << "Skipping empty update for item " << item_id; |
80 | | // Short-circuit empty updates. |
81 | 97.8k | return Status::OK(); |
82 | 97.8k | } |
83 | | |
84 | 18.4E | VLOG(2) << "Updating item " << item_id << " in catalog: " << diff; |
85 | 236k | } |
86 | | |
87 | 239k | return FillSysCatalogWriteRequest( |
88 | 239k | type, item_id, new_pb, op_type, schema_with_ids_, req_->add_ql_write_batch()); |
89 | 336k | } |
90 | | |
91 | | Status SysCatalogWriter::InsertPgsqlTableRow(const Schema& source_schema, |
92 | | const QLTableRow& source_row, |
93 | | const TableId& target_table_id, |
94 | | const Schema& target_schema, |
95 | | const uint32_t target_schema_version, |
96 | 984k | bool is_upsert) { |
97 | 984k | PgsqlWriteRequestPB* pgsql_write = req_->add_pgsql_write_batch(); |
98 | | |
99 | 984k | pgsql_write->set_client(YQL_CLIENT_PGSQL); |
100 | 984k | if (is_upsert) { |
101 | 984k | pgsql_write->set_stmt_type(PgsqlWriteRequestPB::PGSQL_UPSERT); |
102 | 0 | } else { |
103 | 0 | pgsql_write->set_stmt_type(PgsqlWriteRequestPB::PGSQL_INSERT); |
104 | 0 | } |
105 | 984k | pgsql_write->set_table_id(target_table_id); |
106 | 984k | pgsql_write->set_schema_version(target_schema_version); |
107 | | |
108 | | // Postgres sys catalog table is non-partitioned. So there should be no hash column. |
109 | 984k | DCHECK_EQ(source_schema.num_hash_key_columns(), 0); |
110 | 3.73M | for (size_t i = 0; i < source_schema.num_range_key_columns(); i++) { |
111 | 2.74M | const auto& value = source_row.GetValue(source_schema.column_id(i)); |
112 | 2.74M | if (value) { |
113 | 2.74M | pgsql_write->add_range_column_values()->mutable_value()->CopyFrom(*value); |
114 | 0 | } else { |
115 | 0 | return STATUS_FORMAT(Corruption, "Range value of column id $0 missing for table $1", |
116 | 0 | source_schema.column_id(i), target_table_id); |
117 | 0 | } |
118 | 2.74M | } |
119 | 6.56M | for (size_t i = source_schema.num_range_key_columns(); i < source_schema.num_columns(); i++) { |
120 | 5.58M | const auto& value = source_row.GetValue(source_schema.column_id(i)); |
121 | 5.58M | if (value) { |
122 | 5.58M | PgsqlColumnValuePB* column_value = pgsql_write->add_column_values(); |
123 | 5.58M | column_value->set_column_id(target_schema.column_id(i)); |
124 | 5.58M | column_value->mutable_expr()->mutable_value()->CopyFrom(*value); |
125 | 5.58M | } |
126 | 5.58M | } |
127 | | |
128 | 984k | return Status::OK(); |
129 | 984k | } |
130 | | |
131 | | Status FillSysCatalogWriteRequest( |
132 | | int8_t type, const std::string& item_id, const Slice& data, |
133 | 239k | QLWriteRequestPB::QLStmtType op_type, const Schema& schema_with_ids, QLWriteRequestPB* req) { |
134 | 239k | if (IsWrite(op_type)) { |
135 | | // Add the metadata column. |
136 | 236k | QLColumnValuePB* metadata = req->add_column_values(); |
137 | 236k | RETURN_NOT_OK(SetColumnId(schema_with_ids, kSysCatalogTableColMetadata, metadata)); |
138 | 236k | SetBinaryValue(data, metadata->mutable_expr()); |
139 | 236k | } |
140 | | |
141 | 239k | req->set_type(op_type); |
142 | | |
143 | | // Add column type. |
144 | 239k | SetInt8Value(type, req->add_range_column_values()); |
145 | | |
146 | | // Add column id. |
147 | 239k | SetBinaryValue(item_id, req->add_range_column_values()); |
148 | | |
149 | 239k | return Status::OK(); |
150 | 239k | } |
151 | | |
152 | | Status FillSysCatalogWriteRequest( |
153 | | int8_t type, const std::string& item_id, const google::protobuf::Message& new_pb, |
154 | 239k | QLWriteRequestPB::QLStmtType op_type, const Schema& schema_with_ids, QLWriteRequestPB* req) { |
155 | 239k | req->set_type(op_type); |
156 | | |
157 | 239k | if (IsWrite(op_type)) { |
158 | 236k | faststring metadata_buf; |
159 | | |
160 | 236k | pb_util::SerializeToString(new_pb, &metadata_buf); |
161 | | |
162 | 236k | return FillSysCatalogWriteRequest( |
163 | 236k | type, item_id, Slice(metadata_buf.data(), metadata_buf.size()), op_type, schema_with_ids, |
164 | 236k | req); |
165 | 236k | } |
166 | | |
167 | 2.28k | return FillSysCatalogWriteRequest(type, item_id, Slice(), op_type, schema_with_ids, req); |
168 | 2.28k | } |
169 | | |
170 | | Status EnumerateSysCatalog( |
171 | | tablet::Tablet* tablet, const Schema& schema, int8_t entry_type, |
172 | 26.2k | const EnumerationCallback& callback) { |
173 | 26.2k | auto iter = VERIFY_RESULT(tablet->NewRowIterator( |
174 | 26.2k | schema.CopyWithoutColumnIds(), ReadHybridTime::Max(), /* table_id= */ "", |
175 | 26.2k | CoarseTimePoint::max(), tablet::AllowBootstrappingState::kTrue)); |
176 | | |
177 | 26.2k | return EnumerateSysCatalog( |
178 | 26.2k | down_cast<docdb::DocRowwiseIterator*>(iter.get()), schema, entry_type, callback); |
179 | 26.2k | } |
180 | | |
181 | | Status EnumerateSysCatalog( |
182 | | docdb::DocRowwiseIterator* doc_iter, const Schema& schema, int8_t entry_type, |
183 | 26.2k | const EnumerationCallback& callback) { |
184 | 26.2k | const auto type_col_idx = VERIFY_RESULT(schema.ColumnIndexByName(kSysCatalogTableColType)); |
185 | 26.2k | const auto entry_id_col_idx = VERIFY_RESULT(schema.ColumnIndexByName(kSysCatalogTableColId)); |
186 | 26.2k | const auto metadata_col_idx = VERIFY_RESULT(schema.ColumnIndexByName( |
187 | 26.2k | kSysCatalogTableColMetadata)); |
188 | | |
189 | 26.2k | QLConditionPB cond; |
190 | 26.2k | cond.set_op(QL_OP_AND); |
191 | 26.2k | QLAddInt8Condition(&cond, schema.column_id(type_col_idx), QL_OP_EQUAL, entry_type); |
192 | 26.2k | const std::vector<docdb::PrimitiveValue> empty_hash_components; |
193 | 26.2k | docdb::DocQLScanSpec spec( |
194 | 26.2k | schema, boost::none /* hash_code */, boost::none /* max_hash_code */, |
195 | 26.2k | empty_hash_components, &cond, nullptr /* if_req */, rocksdb::kDefaultQueryId); |
196 | 26.2k | RETURN_NOT_OK(doc_iter->Init(spec)); |
197 | | |
198 | 26.2k | QLTableRow value_map; |
199 | 26.2k | QLValue found_entry_type, entry_id, metadata; |
200 | 223k | while (VERIFY_RESULT(doc_iter->HasNext())) { |
201 | 223k | RETURN_NOT_OK(doc_iter->NextRow(&value_map)); |
202 | 223k | RETURN_NOT_OK(value_map.GetValue(schema.column_id(type_col_idx), &found_entry_type)); |
203 | 223k | SCHECK_EQ(found_entry_type.int8_value(), entry_type, Corruption, "Found wrong entry type"); |
204 | 223k | RETURN_NOT_OK(value_map.GetValue(schema.column_id(entry_id_col_idx), &entry_id)); |
205 | 223k | RETURN_NOT_OK(value_map.GetValue(schema.column_id(metadata_col_idx), &metadata)); |
206 | 223k | SCHECK_EQ(metadata.type(), InternalType::kBinaryValue, Corruption, |
207 | 223k | "System catalog snapshot is corrupted, or is built using different build type"); |
208 | 223k | RETURN_NOT_OK(callback(entry_id.binary_value(), metadata.binary_value())); |
209 | 223k | } |
210 | | |
211 | 26.2k | return Status::OK(); |
212 | 26.2k | } |
213 | | |
214 | | } // namespace master |
215 | | } // namespace yb |