YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/ysql_upgrade.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//--------------------------------------------------------------------------------------------------
14
15
#include "yb/yql/pgwrapper/ysql_upgrade.h"
16
17
#include <regex>
18
19
#include <boost/algorithm/string.hpp>
20
21
#include "server/catalog/pg_yb_migration_d.h"
22
23
#include "yb/util/env_util.h"
24
#include "yb/util/format.h"
25
#include "yb/util/path_util.h"
26
#include "yb/util/pg_util.h"
27
28
namespace yb {
29
namespace pgwrapper {
30
31
namespace {
32
33
// Major version from which a database is treated as already having the catalog version
// migration applied (compared against the detected version in YsqlUpgradeHelper::Upgrade).
constexpr int kCatalogVersionMigrationNumber = 1;

// Path components joined to locate the directory that holds the migration scripts.
// Declared constexpr so that the pointers themselves cannot be reassigned.
constexpr const char* kStaticDataParentDir = "share";
constexpr const char* kMigrationsDir = "ysql_migrations";
37
38
0
std::ostream& operator<<(std::ostream& os, const Version& v) {
39
0
  os << v.first << "." << v.second;
40
0
  return os;
41
0
}
42
43
// Runs "SELECT COUNT(*)" against table_name, optionally restricted by where_clause, and returns
// the count. Fails with InternalError unless the query yields exactly one row.
Result<int64_t> SelectCountStar(PGConn* pgconn,
                                const std::string& table_name,
                                const std::string& where_clause = "") {
  // Only emit the WHERE keyword when there is an actual condition to apply.
  std::string filter;
  if (!where_clause.empty()) {
    filter = Format(" WHERE $0", where_clause);
  }

  const auto sql = Format("SELECT COUNT(*) FROM $0$1", table_name, filter);
  auto rows = VERIFY_RESULT(pgconn->Fetch(sql));
  SCHECK(PQntuples(rows.get()) == 1, InternalError,
         Format("Query $0 was expected to return a single row", sql));
  return pgwrapper::GetInt64(rows.get(), 0, 0);
}
54
55
0
// Checks whether pg_class holds exactly one relation named table_name in the pg_catalog
// namespace.
Result<bool> SystemTableExists(PGConn* pgconn, const std::string& table_name) {
  const auto matching_rows = VERIFY_RESULT(SelectCountStar(
      pgconn,
      "pg_class",
      Format("relname = '$0' AND relnamespace = 'pg_catalog'::regnamespace", table_name)));
  return matching_rows == 1;
}
60
61
// Verify that system table exists and is not empty.
62
0
Result<bool> SystemTableHasRows(PGConn* pgconn, const std::string& table_name) {
63
0
  if (!VERIFY_RESULT(SystemTableExists(pgconn, table_name)))
64
0
    return false;
65
0
  return VERIFY_RESULT(SelectCountStar(pgconn, table_name)) > 0;
66
0
}
67
68
0
// Returns true when pg_proc contains at least one function named function_name.
//
// A function name may map to several pg_proc rows (one per overload), so any positive count
// means the function exists; requiring exactly one row would report overloaded functions as
// missing.
// NOTE(review): the lookup is by name only and is not restricted to a namespace, so a
// same-named function in any schema satisfies it -- confirm that this is intended.
Result<bool> FunctionExists(PGConn* pgconn, const std::string& function_name) {
  const auto where_clause = Format("proname = '$0'", function_name);
  return VERIFY_RESULT(SelectCountStar(pgconn, "pg_proc", where_clause)) > 0;
}
72
73
0
// Prefixes query with a "SET LOCAL yb_non_ddl_txn_for_sys_tables_allowed TO true" statement.
// Used in this file for the INSERTs into pg_catalog.pg_yb_migration.
std::string WrapSystemDml(const std::string& query) {
  std::string wrapped("SET LOCAL yb_non_ddl_txn_for_sys_tables_allowed TO true;\n");
  wrapped += query;
  return wrapped;
}
76
77
// Analyze pg_catalog state of a database to determine a current major version of a catalog state
78
// by checking presence of catalog changing features released before the migrations feature landed.
79
// 0 means that no migrations were applied yet.
80
0
Result<int> GetMajorVersionFromSystemCatalogState(PGConn* pgconn) {
81
0
  int major_version = 0;
82
83
  // Helper macro removing boilerplate.
84
0
#define INCREMENT_VERSION_OR_RETURN_IT(oneliner_with_result) \
85
0
    if (VERIFY_RESULT(oneliner_with_result)) { \
86
0
      ++major_version; \
87
0
    } else { \
88
0
      return major_version; \
89
0
    }
90
91
  // V1: #3979 introducing pg_yb_catalog_version table.
92
0
  INCREMENT_VERSION_OR_RETURN_IT(SystemTableHasRows(pgconn, "pg_yb_catalog_version"))
93
94
  // V2: #4525 which creates pg_tablegroup.
95
0
  INCREMENT_VERSION_OR_RETURN_IT(SystemTableExists(pgconn, "pg_tablegroup"))
96
97
  // V3: #5478 installing pg_stat_statements.
98
0
  INCREMENT_VERSION_OR_RETURN_IT(SystemTableExists(pgconn, "pg_stat_statements"))
99
100
  // V4: #5408 introducing a bunch of JSONB functions.
101
0
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "jsonb_path_query"))
102
103
  // V5: #6509 introducing yb_getrusage and yb_mem_usage* functions.
104
0
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_getrusage"))
105
106
  // V6: #7879 introducing yb_servers function.
107
0
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_servers"))
108
109
  // V7: #8719 introducing yb_hash_code function.
110
0
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_hash_code"))
111
112
  // V8: #7850 introducing ybgin access method.
113
0
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "ybginhandler"))
114
115
0
  return major_version;
116
0
}
117
118
// Create a pg_yb_migration if it doesn't exist yet.
119
// Returns true if the table was created or false if it was present already.
120
0
Result<bool> CreateMigrationTableIfNotExist(PGConn* pgconn) {
121
0
  if (VERIFY_RESULT(SystemTableExists(pgconn, "pg_yb_migration"))) {
122
0
    LOG(INFO) << "pg_yb_migration table is present";
123
0
    return false;
124
0
  }
125
126
0
  const std::string query_str(
127
0
      "CREATE TABLE pg_catalog.pg_yb_migration ("
128
0
      "  major        int    NOT NULL,"
129
0
      "  minor        int    NOT NULL,"
130
0
      "  name         name   NOT NULL,"
131
0
      "  time_applied bigint"
132
0
      ") WITH (table_oid = $0, row_type_oid = $1);");
133
0
  RETURN_NOT_OK(pgconn->ExecuteFormat(query_str,
134
0
                                      YBMigrationRelationId,
135
0
                                      YBMigrationRelation_Rowtype_Id));
136
0
  LOG(INFO) << "pg_yb_migration table was created";
137
0
  return true;
138
0
}
139
140
// Determine a YSQL version of a given database and make sure it's recorded in pg_yb_migration.
141
// Creates a pg_yb_migration if it doesn't yet exist.
142
0
Result<Version> DetermineAndSetVersion(PGConn* pgconn) {
143
0
  bool table_created = VERIFY_RESULT(CreateMigrationTableIfNotExist(pgconn));
144
145
  // If pg_yb_migration was present before and has values, that's our version.
146
0
  if (!table_created) {
147
0
    const std::string query_str(
148
0
        "SELECT major, minor FROM pg_catalog.pg_yb_migration"
149
0
        "  ORDER BY major DESC, minor DESC"
150
0
        "  LIMIT 1");
151
0
    pgwrapper::PGResultPtr res = VERIFY_RESULT(pgconn->Fetch(query_str));
152
0
    if (PQntuples(res.get()) == 1) {
153
0
      int major_version = VERIFY_RESULT(pgwrapper::GetInt32(res.get(), 0, 0));
154
0
      int minor_version = VERIFY_RESULT(pgwrapper::GetInt32(res.get(), 0, 1));
155
0
      Version ver(major_version, minor_version);
156
0
      LOG(INFO) << "Version is " << ver;
157
0
      return ver;
158
0
    }
159
0
  }
160
161
0
  int major_version = VERIFY_RESULT(GetMajorVersionFromSystemCatalogState(pgconn));
162
0
  const std::string query_str(
163
0
      "INSERT INTO pg_catalog.pg_yb_migration (major, minor, name, time_applied)"
164
0
      "  VALUES ($0, 0, '<baseline>', NULL);");
165
0
  RETURN_NOT_OK(pgconn->ExecuteFormat(WrapSystemDml(query_str), major_version));
166
167
0
  Version ver(major_version, 0);
168
0
  LOG(INFO) << "Inserted a version " << ver;
169
0
  return ver;
170
0
}
171
172
0
bool IsNonSqlFile(const std::string& filename) {
173
0
  return !boost::algorithm::iends_with(filename, ".sql");
174
0
}
175
176
} // anonymous namespace
177
178
YsqlUpgradeHelper::YsqlUpgradeHelper(const HostPort& ysql_proxy_addr,
179
                                     uint64_t ysql_auth_key,
180
                                     uint32_t heartbeat_interval_ms)
181
    : ysql_proxy_addr_(ysql_proxy_addr),
182
      ysql_auth_key_(ysql_auth_key),
183
0
      heartbeat_interval_ms_(heartbeat_interval_ms) {
184
0
}
185
186
0
// Locates the migrations directory, collects the *.sql files in it and indexes them by version
// in migration_filenames_map_; the highest version found is stored in latest_version_.
//
// Returns InternalError when the directory cannot be located, when it holds no migrations, when
// a file name does not match V<major>[.<minor>]__<digits>__<name>.sql in its entirety, or when
// two files declare the same version.
Status YsqlUpgradeHelper::AnalyzeMigrationFiles() {
  const std::string search_for_dir = JoinPathSegments(kStaticDataParentDir, kMigrationsDir);
  const std::string root_dir       = env_util::GetRootDir(search_for_dir);
  SCHECK(!root_dir.empty(), InternalError,
         "Executable path not found");
  migrations_dir_ =
      JoinPathSegments(root_dir, kStaticDataParentDir, kMigrationsDir);
  auto* env = Env::Default();
  SCHECK(env->DirExists(migrations_dir_), InternalError,
         "Migrations directory not found");

  migration_filenames_map_.clear();
  std::vector<std::string> migration_filenames;
  RETURN_NOT_OK(env->GetChildren(migrations_dir_, &migration_filenames));

  // Remove unrelated files.
  migration_filenames.erase(
      std::remove_if(migration_filenames.begin(), migration_filenames.end(), IsNonSqlFile),
      migration_filenames.end());

  SCHECK(!migration_filenames.empty(), InternalError,
         "No migrations found!");

  // Check that all migrations conform to the naming schema.
  // The whole filename has to match: a name that merely contains the pattern is rejected, which
  // also keeps unexpected characters out of the name that MigrateOnce() embeds into SQL.
  static const std::regex regex("V(\\d+)(\\.(\\d+))?__\\d+__[_0-9A-Za-z]+\\.sql");
  for (const auto& filename : migration_filenames) {
    std::smatch version_match;
    SCHECK(std::regex_match(filename, version_match, regex),
           InternalError,
           Format("Migration '$0' does not conform to the filename pattern", filename));

    // Group 1 is the major version, group 3 the optional minor version (0 when omitted).
    // NOTE(review): std::stoi throws for values that do not fit into int -- confirm whether
    // absurdly long version numbers need to be handled gracefully.
    const int major_version = std::stoi(version_match[1].str());
    const int minor_version =
        version_match[3].length() > 0 ? std::stoi(version_match[3].str()) : 0;
    const Version version{major_version, minor_version};

    // Two files with the same version would otherwise silently replace one another, with the
    // winner depending on the directory listing order.
    const auto inserted = migration_filenames_map_.emplace(version, filename);
    SCHECK(inserted.second, InternalError,
           Format("Migrations '$0' and '$1' declare the same version",
                  inserted.first->second, filename));
  }

  // The map is ordered by version, so its last entry carries the latest one. It is non-empty at
  // this point since at least one filename passed the checks above.
  latest_version_ = migration_filenames_map_.rbegin()->first;

  return Status::OK();
}
227
228
0
// Opens a connection to database_name as user "postgres" and switches the session into YSQL
// upgrade mode before handing it back.
Result<PGConn> YsqlUpgradeHelper::Connect(const std::string& database_name) {
  const auto socket_dir = PgDeriveSocketDir(ysql_proxy_addr_.host());
  const auto escaped_db_name = pgwrapper::PqEscapeLiteral(database_name);

  // Note that the plain password in the connection string will be sent over the wire, but since
  // it only goes over a unix-domain socket, there should be no eavesdropping/tampering issues.
  const std::string conn_str = Format(
      "user=$0 password=$1 host=$2 port=$3 dbname=$4",
      "postgres", ysql_auth_key_, socket_dir, ysql_proxy_addr_.port(), escaped_db_name);

  PGConn conn = VERIFY_RESULT(PGConn::Connect(conn_str));
  RETURN_NOT_OK(conn.Execute("SET ysql_upgrade_mode TO true;"));
  return conn;
}
246
247
0
// Brings every database up to the latest version defined by the migration files.
//
// First determines (and records) the current version of each database, keeping one open
// connection per database. Then repeatedly applies a single migration to whichever database is
// furthest behind, until even the lowest version has reached latest_version_.
Status YsqlUpgradeHelper::Upgrade() {
  RETURN_NOT_OK(AnalyzeMigrationFiles());
  LOG(INFO) << "Latest version defined in migrations is " << latest_version_;

  std::vector<DatabaseEntry> databases;

  {
    PGConn template1_conn = VERIFY_RESULT(Connect("template1"));

    // Template databases come first in the processing order.
    std::vector<std::string> db_names{"template1", "template0"};

    // Append all the remaining databases.
    {
      const std::string list_sql("SELECT datname FROM pg_database"
                                 "  WHERE datname NOT IN ('template0', 'template1');");
      pgwrapper::PGResultPtr listing = VERIFY_RESULT(template1_conn.Fetch(list_sql));
      const int num_rows = PQntuples(listing.get());
      for (int row = 0; row < num_rows; ++row) {
        db_names.emplace_back(VERIFY_RESULT(pgwrapper::GetString(listing.get(), row, 0)));
      }
    }

    for (const auto& name : db_names) {
      LOG(INFO) << "Determining a YSQL version for DB " << name;
      // template1 reuses the connection opened above; every other database gets a new one.
      PGConn conn = name == "template1" ? std::move(template1_conn)
                                         : VERIFY_RESULT(Connect(name));

      Version detected = VERIFY_RESULT(DetermineAndSetVersion(&conn));
      if (detected.first >= kCatalogVersionMigrationNumber) {
        catalog_version_migration_applied_ = true;
      }
      databases.emplace_back(name, std::move(conn), detected);
    }
  }

  // Orders database entries by their current version (third tuple element).
  const auto by_version = [](const DatabaseEntry& lhs, const DatabaseEntry& rhs) {
    return std::get<2>(lhs) < std::get<2>(rhs);
  };

  for (;;) {
    DatabaseEntry& lagging = *std::min_element(databases.begin(), databases.end(), by_version);
    const auto& lowest = std::get<2>(lagging);

    // Once the database that is furthest behind is current, all of them are.
    if (lowest >= latest_version_) {
      LOG(INFO) << "Minimum version is " << lowest
                << " which is latest";
      break;
    }

    LOG(INFO) << "Minimum version is " << lowest
              << " (database " << std::get<0>(lagging) << ")";

    RETURN_NOT_OK(MigrateOnce(&lagging));
  }

  return Status::OK();
}
303
304
0
// Applies the next pending migration to a single database.
//
// db_entry is a (name, connection, current version) tuple. The first migration whose version is
// strictly greater than the current one is read from migrations_dir_, executed over the entry's
// connection and recorded in pg_yb_migration; on success the version stored in db_entry is
// advanced to it.
// Returns InternalError when no newer migration exists; failures to read or apply the migration
// or to record the new version are returned with a descriptive prefix.
Status YsqlUpgradeHelper::MigrateOnce(DatabaseEntry* db_entry) {
  const std::string& db_name = std::get<0>(*db_entry);
  auto& pgconn = std::get<1>(*db_entry);
  const auto& version = std::get<2>(*db_entry);

  // The map is keyed and ordered by version, so upper_bound yields the first migration newer
  // than the current version directly, without scanning and copying every entry.
  const auto next_migration = migration_filenames_map_.upper_bound(version);
  SCHECK(next_migration != migration_filenames_map_.end(),
         InternalError,
         Format("Migration following $0.$1 is not found!", version.first, version.second));
  const auto& next_version = next_migration->first;
  const auto& next_migration_filename = next_migration->second;

  faststring migration_content;
  RETURN_NOT_OK_PREPEND(ReadFileToString(Env::Default(),
                                         JoinPathSegments(migrations_dir_, next_migration_filename),
                                         &migration_content),
                        Format("Failed to read migration '$0'", next_migration_filename));

  LOG(INFO) << db_name << ": applying migration '" << next_migration_filename << "'";

  // Note that underlying PQexec executes multiple statements transactionally, where our usual
  // ACID guarantees apply.
  // Migrations may override that using BEGIN/COMMIT statements - this will split a singular
  // implicit transaction onto several explicit ones.
  RETURN_NOT_OK_PREPEND(pgconn.Execute(migration_content.ToString(),
                                       false /* show_query_in_error */),
                        Format("Failed to apply migration '$0' to a database $1",
                               next_migration_filename,
                               db_name));

  // Wait for the new Catalog Version to be propagated to tserver through heartbeat.
  // This can only happen once, when the table is introduced in the first migration.
  // Sleep here isn't guaranteed to work (see #6238), failure to propagate a catalog version
  // would lead to Catalog Version Mismatch error fixed by retrial.
  if (!catalog_version_migration_applied_) {
    SleepFor(MonoDelta::FromMilliseconds(2 * heartbeat_interval_ms_));
    catalog_version_migration_applied_ = true;
  }

  // Record the applied migration together with the current time in milliseconds since epoch.
  RETURN_NOT_OK_PREPEND(
      pgconn.ExecuteFormat(
          WrapSystemDml(
              "INSERT INTO pg_catalog.pg_yb_migration (major, minor, name, time_applied) "
              "  VALUES ($0, $1, '$2', ROUND(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000));"),
          next_version.first, next_version.second, next_migration_filename),
      Format("Failed to bump pg_yb_migration to $0.$1 in database $2",
             next_version.first, next_version.second, db_name));
  std::get<2>(*db_entry) = next_version;
  LOG(INFO) << db_name << ": migration successfully applied, version bumped to " << next_version;

  return Status::OK();
}
360
361
}  // namespace pgwrapper
362
}  // namespace yb