/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/ysql_upgrade.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | //-------------------------------------------------------------------------------------------------- |
14 | | |
15 | | #include "yb/yql/pgwrapper/ysql_upgrade.h" |
16 | | |
17 | | #include <regex> |
18 | | |
19 | | #include <boost/algorithm/string.hpp> |
20 | | |
21 | | #include "server/catalog/pg_yb_migration_d.h" |
22 | | |
23 | | #include "yb/util/env_util.h" |
24 | | #include "yb/util/format.h" |
25 | | #include "yb/util/path_util.h" |
26 | | #include "yb/util/pg_util.h" |
27 | | |
28 | | namespace yb { |
29 | | namespace pgwrapper { |
30 | | |
31 | | namespace { |
32 | | |
// Major version of the migration that introduces pg_yb_catalog_version. Databases at or above it
// already have the catalog version table, so no propagation wait is needed for them.
constexpr int kCatalogVersionMigrationNumber = 1;

// Migrations live in <root>/share/ysql_migrations. The pointers are const themselves (not only
// their pointees) so these globals cannot be reassigned.
constexpr const char* kStaticDataParentDir = "share";
constexpr const char* kMigrationsDir = "ysql_migrations";
37 | | |
38 | 458 | std::ostream& operator<<(std::ostream& os, const Version& v) { |
39 | 458 | os << v.first << "." << v.second; |
40 | 458 | return os; |
41 | 458 | } |
42 | | |
43 | | Result<int64_t> SelectCountStar(PGConn* pgconn, |
44 | | const std::string& table_name, |
45 | 45 | const std::string& where_clause = "") { |
46 | 45 | auto query_str = Format("SELECT COUNT(*) FROM $0$1", |
47 | 45 | table_name, |
48 | 45 | where_clause == "" ? ""1 : Format(" WHERE $0", where_clause)44 ); |
49 | 45 | auto res = VERIFY_RESULT(pgconn->Fetch(query_str)); |
50 | 45 | SCHECK(PQntuples(res.get()) == 1, InternalError, |
51 | 45 | Format("Query $0 was expected to return a single row", query_str)); |
52 | 45 | return pgwrapper::GetInt64(res.get(), 0, 0); |
53 | 45 | } |
54 | | |
55 | 39 | Result<bool> SystemTableExists(PGConn* pgconn, const std::string& table_name) { |
56 | 39 | auto where_clause = Format("relname = '$0' AND relnamespace = 'pg_catalog'::regnamespace", |
57 | 39 | table_name); |
58 | 39 | return VERIFY_RESULT(SelectCountStar(pgconn, "pg_class", where_clause)) == 1; |
59 | 39 | } |
60 | | |
61 | | // Verify that system table exists and is not empty. |
62 | 13 | Result<bool> SystemTableHasRows(PGConn* pgconn, const std::string& table_name) { |
63 | 13 | if (!VERIFY_RESULT(SystemTableExists(pgconn, table_name))) |
64 | 12 | return false; |
65 | 1 | return VERIFY_RESULT(SelectCountStar(pgconn, table_name)) > 0; |
66 | 1 | } |
67 | | |
68 | 5 | Result<bool> FunctionExists(PGConn* pgconn, const std::string& function_name) { |
69 | 5 | auto where_clause = Format("proname = '$0'", function_name); |
70 | 5 | return VERIFY_RESULT(SelectCountStar(pgconn, "pg_proc", where_clause)) == 1; |
71 | 5 | } |
72 | | |
// Prepends a `SET LOCAL yb_non_ddl_txn_for_sys_tables_allowed TO true;` statement (plus a
// newline) to `query`, so the DML that follows is allowed to modify system tables.
std::string WrapSystemDml(const std::string& query) {
  std::string wrapped = "SET LOCAL yb_non_ddl_txn_for_sys_tables_allowed TO true;\n";
  wrapped += query;
  return wrapped;
}
76 | | |
77 | | // Analyze pg_catalog state of a database to determine a current major version of a catalog state |
78 | | // by checking presence of catalog changing features released before the migrations feature landed. |
79 | | // 0 means that no migrations were applied yet. |
80 | 13 | Result<int> GetMajorVersionFromSystemCatalogState(PGConn* pgconn) { |
81 | 13 | int major_version = 0; |
82 | | |
83 | | // Helper macro removing boilerplate. |
84 | 13 | #define INCREMENT_VERSION_OR_RETURN_IT(oneliner_with_result) \ |
85 | 20 | if (VERIFY_RESULT(oneliner_with_result)) { \ |
86 | 8 | ++major_version; \ |
87 | 12 | } else { \ |
88 | 12 | return major_version; \ |
89 | 12 | } |
90 | | |
91 | | // V1: #3979 introducing pg_yb_catalog_version table. |
92 | 13 | INCREMENT_VERSION_OR_RETURN_IT(SystemTableHasRows(pgconn, "pg_yb_catalog_version")) |
93 | | |
94 | | // V2: #4525 which creates pg_tablegroup. |
95 | 1 | INCREMENT_VERSION_OR_RETURN_IT(SystemTableExists(pgconn, "pg_tablegroup")) |
96 | | |
97 | | // V3: #5478 installing pg_stat_statements. |
98 | 1 | INCREMENT_VERSION_OR_RETURN_IT(SystemTableExists(pgconn, "pg_stat_statements")) |
99 | | |
100 | | // V4: #5408 introducing a bunch of JSONB functions. |
101 | 1 | INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "jsonb_path_query")) |
102 | | |
103 | | // V5: #6509 introducing yb_getrusage and yb_mem_usage* functions. |
104 | 1 | INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_getrusage")) |
105 | | |
106 | | // V6: #7879 introducing yb_servers function. |
107 | 1 | INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_servers")) |
108 | | |
109 | | // V7: #8719 introducing yb_hash_code function. |
110 | 1 | INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_hash_code")) |
111 | | |
112 | | // V8: #7850 introducing ybgin access method. |
113 | 1 | INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "ybginhandler")) |
114 | | |
115 | 1 | return major_version; |
116 | 1 | } |
117 | | |
118 | | // Create a pg_yb_migration if it doesn't exist yet. |
119 | | // Returns true if the table was created or false if it was present already. |
120 | 24 | Result<bool> CreateMigrationTableIfNotExist(PGConn* pgconn) { |
121 | 24 | if (VERIFY_RESULT(SystemTableExists(pgconn, "pg_yb_migration"))) { |
122 | 12 | LOG(INFO) << "pg_yb_migration table is present"; |
123 | 12 | return false; |
124 | 12 | } |
125 | | |
126 | 12 | const std::string query_str( |
127 | 12 | "CREATE TABLE pg_catalog.pg_yb_migration (" |
128 | 12 | " major int NOT NULL," |
129 | 12 | " minor int NOT NULL," |
130 | 12 | " name name NOT NULL," |
131 | 12 | " time_applied bigint" |
132 | 12 | ") WITH (table_oid = $0, row_type_oid = $1);"); |
133 | 12 | RETURN_NOT_OK(pgconn->ExecuteFormat(query_str, |
134 | 12 | YBMigrationRelationId, |
135 | 12 | YBMigrationRelation_Rowtype_Id)); |
136 | 12 | LOG(INFO) << "pg_yb_migration table was created"; |
137 | 12 | return true; |
138 | 12 | } |
139 | | |
140 | | // Determine a YSQL version of a given database and make sure it's recorded in pg_yb_migration. |
141 | | // Creates a pg_yb_migration if it doesn't yet exist. |
142 | 24 | Result<Version> DetermineAndSetVersion(PGConn* pgconn) { |
143 | 24 | bool table_created = VERIFY_RESULT(CreateMigrationTableIfNotExist(pgconn)); |
144 | | |
145 | | // If pg_yb_migration was present before and has values, that's our version. |
146 | 24 | if (!table_created) { |
147 | 12 | const std::string query_str( |
148 | 12 | "SELECT major, minor FROM pg_catalog.pg_yb_migration" |
149 | 12 | " ORDER BY major DESC, minor DESC" |
150 | 12 | " LIMIT 1"); |
151 | 12 | pgwrapper::PGResultPtr res = VERIFY_RESULT(pgconn->Fetch(query_str)); |
152 | 12 | if (PQntuples(res.get()) == 1) { |
153 | 11 | int major_version = VERIFY_RESULT(pgwrapper::GetInt32(res.get(), 0, 0)); |
154 | 11 | int minor_version = VERIFY_RESULT(pgwrapper::GetInt32(res.get(), 0, 1)); |
155 | 0 | Version ver(major_version, minor_version); |
156 | 11 | LOG(INFO) << "Version is " << ver; |
157 | 11 | return ver; |
158 | 11 | } |
159 | 12 | } |
160 | | |
161 | 13 | int major_version = VERIFY_RESULT(GetMajorVersionFromSystemCatalogState(pgconn)); |
162 | 0 | const std::string query_str( |
163 | 13 | "INSERT INTO pg_catalog.pg_yb_migration (major, minor, name, time_applied)" |
164 | 13 | " VALUES ($0, 0, '<baseline>', NULL);"); |
165 | 13 | RETURN_NOT_OK(pgconn->ExecuteFormat(WrapSystemDml(query_str), major_version)); |
166 | | |
167 | 13 | Version ver(major_version, 0); |
168 | 13 | LOG(INFO) << "Inserted a version " << ver; |
169 | 13 | return ver; |
170 | 13 | } |
171 | | |
172 | 80 | bool IsNonSqlFile(const std::string& filename) { |
173 | 80 | return !boost::algorithm::iends_with(filename, ".sql"); |
174 | 80 | } |
175 | | |
176 | | } // anonymous namespace |
177 | | |
178 | | YsqlUpgradeHelper::YsqlUpgradeHelper(const HostPort& ysql_proxy_addr, |
179 | | uint64_t ysql_auth_key, |
180 | | uint32_t heartbeat_interval_ms) |
181 | | : ysql_proxy_addr_(ysql_proxy_addr), |
182 | | ysql_auth_key_(ysql_auth_key), |
183 | 4 | heartbeat_interval_ms_(heartbeat_interval_ms) { |
184 | 4 | } |
185 | | |
186 | 4 | Status YsqlUpgradeHelper::AnalyzeMigrationFiles() { |
187 | 4 | const std::string search_for_dir = JoinPathSegments(kStaticDataParentDir, kMigrationsDir); |
188 | 4 | const std::string root_dir = env_util::GetRootDir(search_for_dir); |
189 | 4 | SCHECK(root_dir != "", InternalError, |
190 | 4 | "Executable path not found"); |
191 | 4 | migrations_dir_ = |
192 | 4 | JoinPathSegments(root_dir, kStaticDataParentDir, kMigrationsDir); |
193 | 4 | auto* env = Env::Default(); |
194 | 4 | SCHECK(env->DirExists(migrations_dir_), InternalError, |
195 | 4 | "Migrations directory not found"); |
196 | | |
197 | 4 | migration_filenames_map_.clear(); |
198 | 4 | std::vector<std::string> migration_filenames; |
199 | 4 | RETURN_NOT_OK(env->GetChildren(migrations_dir_, &migration_filenames)); |
200 | | |
201 | | // Remove unrelated files. |
202 | 4 | migration_filenames.erase( |
203 | 4 | std::remove_if(migration_filenames.begin(), migration_filenames.end(), IsNonSqlFile), |
204 | 4 | migration_filenames.end()); |
205 | | |
206 | 4 | SCHECK(migration_filenames.size() > 0, InternalError, |
207 | 4 | "No migrations found!"); |
208 | | |
209 | | // Check that all migrations conform to the naming schema. |
210 | 4 | static const std::regex regex("V(\\d+)(\\.(\\d+))?__\\d+__[_0-9A-Za-z]+\\.sql"); |
211 | 4 | std::smatch version_match; |
212 | 72 | for (size_t i = 0; i < migration_filenames.size(); ++i68 ) { |
213 | 68 | const auto& filename = migration_filenames[i]; |
214 | 68 | SCHECK(std::regex_search(filename.begin(), filename.end(), version_match, regex), |
215 | 68 | InternalError, |
216 | 68 | Format("Migration '$0' does not conform to the filename pattern", filename)); |
217 | 68 | int major_version = std::stoi(version_match[1]); |
218 | 68 | int minor_version = version_match[3].length() > 0 ? std::stoi(version_match[3])0 : 0; |
219 | 68 | Version version{major_version, minor_version}; |
220 | 68 | migration_filenames_map_[version] = filename; |
221 | 68 | } |
222 | | |
223 | 4 | latest_version_ = std::prev(migration_filenames_map_.end())->first; |
224 | | |
225 | 4 | return Status::OK(); |
226 | 4 | } |
227 | | |
228 | 24 | Result<PGConn> YsqlUpgradeHelper::Connect(const std::string& database_name) { |
229 | | // Construct connection string. Note that the plain password in the connection string will be |
230 | | // sent over the wire, but since it only goes over a unix-domain socket, there should be no |
231 | | // eavesdropping/tampering issues. |
232 | 24 | const std::string conn_str = Format( |
233 | 24 | "user=$0 password=$1 host=$2 port=$3 dbname=$4", |
234 | 24 | "postgres", |
235 | 24 | ysql_auth_key_, |
236 | 24 | PgDeriveSocketDir(ysql_proxy_addr_.host()), |
237 | 24 | ysql_proxy_addr_.port(), |
238 | 24 | pgwrapper::PqEscapeLiteral(database_name)); |
239 | | |
240 | 24 | PGConn pgconn = VERIFY_RESULT(PGConn::Connect(conn_str)); |
241 | | |
242 | 24 | RETURN_NOT_OK(pgconn.Execute("SET ysql_upgrade_mode TO true;")); |
243 | | |
244 | | // Force global transactions when running upgrade to avoid transactions being run as local. |
245 | | // This can be removed once #11731 is resolved. |
246 | 24 | RETURN_NOT_OK(pgconn.Execute("SET force_global_transaction TO true;")); |
247 | | |
248 | 24 | return pgconn; |
249 | 24 | } |
250 | | |
251 | 4 | Status YsqlUpgradeHelper::Upgrade() { |
252 | 4 | RETURN_NOT_OK(AnalyzeMigrationFiles()); |
253 | 4 | LOG(INFO) << "Latest version defined in migrations is " << latest_version_; |
254 | | |
255 | 4 | std::vector<DatabaseEntry> databases; |
256 | | |
257 | 4 | { |
258 | 4 | PGConn t1_pgconn = VERIFY_RESULT(Connect("template1")); |
259 | | |
260 | | // Place template databases to be processed first. |
261 | 0 | std::vector<std::string> db_names{"template1", "template0"}; |
262 | | |
263 | | // Fetch databases list |
264 | 4 | { |
265 | 4 | const std::string query_str("SELECT datname FROM pg_database" |
266 | 4 | " WHERE datname NOT IN ('template0', 'template1');"); |
267 | 4 | pgwrapper::PGResultPtr res = VERIFY_RESULT(t1_pgconn.Fetch(query_str)); |
268 | 20 | for (int i = 0; i < PQntuples(res.get()); i++16 ) { |
269 | 16 | db_names.emplace_back(VERIFY_RESULT(pgwrapper::GetString(res.get(), i, 0))); |
270 | 16 | } |
271 | 4 | } |
272 | | |
273 | 24 | for (const auto& db : db_names)4 { |
274 | 24 | LOG(INFO) << "Determining a YSQL version for DB " << db; |
275 | 24 | PGConn pgconn = db == "template1" ? std::move(t1_pgconn)4 : VERIFY_RESULT(Connect(db)); |
276 | | |
277 | 24 | Version current_version = VERIFY_RESULT(DetermineAndSetVersion(&pgconn)); |
278 | 24 | if (current_version.first >= kCatalogVersionMigrationNumber) { |
279 | 12 | catalog_version_migration_applied_ = true; |
280 | 12 | } |
281 | 24 | databases.emplace_back(db, std::move(pgconn), current_version); |
282 | 24 | } |
283 | 4 | } |
284 | | |
285 | 217 | while (4 true) { |
286 | 217 | DatabaseEntry* min_version_entry = |
287 | 217 | &*std::min_element(databases.begin(), databases.end(), |
288 | 1.08k | [](const DatabaseEntry& db1, const DatabaseEntry& db2) { |
289 | 1.08k | return std::get<2>(db1) < std::get<2>(db2); |
290 | 1.08k | }); |
291 | | |
292 | 217 | auto& min_version = std::get<2>(*min_version_entry); |
293 | 217 | if (min_version >= latest_version_) { |
294 | 4 | LOG(INFO) << "Minimum version is " << min_version |
295 | 4 | << " which is latest"; |
296 | 4 | break; |
297 | 4 | } |
298 | | |
299 | 213 | LOG(INFO) << "Minimum version is " << min_version |
300 | 213 | << " (database " << std::get<0>(*min_version_entry) << ")"; |
301 | | |
302 | 213 | RETURN_NOT_OK(MigrateOnce(min_version_entry)); |
303 | 213 | } |
304 | | |
305 | 4 | return Status::OK(); |
306 | 4 | } |
307 | | |
308 | 213 | Status YsqlUpgradeHelper::MigrateOnce(DatabaseEntry* db_entry) { |
309 | 213 | const std::string& db_name = std::get<0>(*db_entry); |
310 | 213 | auto& pgconn = std::get<1>(*db_entry); |
311 | 213 | const auto& version = std::get<2>(*db_entry); |
312 | | |
313 | 213 | const auto& next_migration = std::find_if(migration_filenames_map_.begin(), |
314 | 213 | migration_filenames_map_.end(), |
315 | 1.95k | [version](const std::pair<Version, std::string>& e) { |
316 | 1.95k | return e.first > version; |
317 | 1.95k | }); |
318 | 213 | SCHECK(next_migration != migration_filenames_map_.end(), |
319 | 213 | InternalError, |
320 | 213 | Format("Migration following $0.$1 is not found!", version.first, version.second)); |
321 | 213 | const auto& next_version = next_migration->first; |
322 | 213 | const auto& next_migration_filename = next_migration->second; |
323 | | |
324 | 213 | faststring migration_content; |
325 | 213 | RETURN_NOT_OK_PREPEND(ReadFileToString(Env::Default(), |
326 | 213 | JoinPathSegments(migrations_dir_, next_migration_filename), |
327 | 213 | &migration_content), |
328 | 213 | Format("Failed to read migration '$0'", next_migration_filename)); |
329 | | |
330 | 213 | LOG(INFO) << db_name << ": applying migration '" << next_migration_filename << "'"; |
331 | | |
332 | | // Note that underlying PQexec executes mutiple statements transactionally, where our usual ACID |
333 | | // guarantees apply. |
334 | | // Migrations may override that using BEGIN/COMMIT statements - this will split a singular |
335 | | // implicit transaction onto several explicit ones. |
336 | 213 | RETURN_NOT_OK_PREPEND(pgconn.Execute(migration_content.ToString(), |
337 | 213 | false /* show_query_in_error */), |
338 | 213 | Format("Failed to apply migration '$0' to a database $1", |
339 | 213 | next_migration_filename, |
340 | 213 | db_name)); |
341 | | |
342 | | // Wait for the new Catalog Version to be propagated to tserver through heartbeat. |
343 | | // This can only happen once, when the table is introduced in the first migration. |
344 | | // Sleep here isn't guaranteed to work (see #6238), failure to propagate a catalog version |
345 | | // would lead to Catalog Version Mismatch error fixed by retrial. |
346 | 213 | if (!catalog_version_migration_applied_) { |
347 | 2 | SleepFor(MonoDelta::FromMilliseconds(2 * heartbeat_interval_ms_)); |
348 | 2 | catalog_version_migration_applied_ = true; |
349 | 2 | } |
350 | | |
351 | 213 | RETURN_NOT_OK_PREPEND( |
352 | 213 | pgconn.ExecuteFormat( |
353 | 213 | WrapSystemDml( |
354 | 213 | "INSERT INTO pg_catalog.pg_yb_migration (major, minor, name, time_applied) " |
355 | 213 | " VALUES ($0, $1, '$2', ROUND(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000));"), |
356 | 213 | next_version.first, next_version.second, next_migration_filename), |
357 | 213 | Format("Failed to bump pg_yb_migration to $0.$1 in database $2", |
358 | 213 | next_version.first, next_version.second, db_name)); |
359 | 213 | std::get<2>(*db_entry) = next_version; |
360 | 213 | LOG(INFO) << db_name << ": migration successfully applied, version bumped to " << next_version; |
361 | | |
362 | 213 | return Status::OK(); |
363 | 213 | } |
364 | | |
365 | | } // namespace pgwrapper |
366 | | } // namespace yb |