YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/yql/pgwrapper/ysql_upgrade.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//--------------------------------------------------------------------------------------------------
14
15
#include "yb/yql/pgwrapper/ysql_upgrade.h"
16
17
#include <regex>
18
19
#include <boost/algorithm/string.hpp>
20
21
#include "server/catalog/pg_yb_migration_d.h"
22
23
#include "yb/util/env_util.h"
24
#include "yb/util/format.h"
25
#include "yb/util/path_util.h"
26
#include "yb/util/pg_util.h"
27
28
namespace yb {
29
namespace pgwrapper {
30
31
namespace {
32
33
// Databases whose major version is at least this value are treated as already having the
// catalog version migration applied (see YsqlUpgradeHelper::Upgrade).
constexpr int kCatalogVersionMigrationNumber = 1;

// Path components joined under the installation root to locate the migration scripts.
// The pointers themselves are constant so they cannot be reseated at runtime.
constexpr const char* kStaticDataParentDir = "share";
constexpr const char* kMigrationsDir = "ysql_migrations";
37
38
458
std::ostream& operator<<(std::ostream& os, const Version& v) {
39
458
  os << v.first << "." << v.second;
40
458
  return os;
41
458
}
42
43
// Runs "SELECT COUNT(*) FROM <table_name>", optionally restricted by " WHERE <where_clause>",
// and returns the value of the first column of the single resulting row.
// Fails with InternalError if the query does not produce exactly one row.
Result<int64_t> SelectCountStar(PGConn* pgconn,
                                const std::string& table_name,
                                const std::string& where_clause = "") {
  // An empty clause means "count everything", so no WHERE suffix is appended.
  std::string where_suffix;
  if (where_clause != "") {
    where_suffix = Format(" WHERE $0", where_clause);
  }

  auto sql = Format("SELECT COUNT(*) FROM $0$1", table_name, where_suffix);
  auto rows = VERIFY_RESULT(pgconn->Fetch(sql));
  SCHECK(PQntuples(rows.get()) == 1, InternalError,
         Format("Query $0 was expected to return a single row", sql));
  return pgwrapper::GetInt64(rows.get(), 0, 0);
}
54
55
39
// Returns true when pg_class holds exactly one relation called table_name in the pg_catalog
// namespace.
Result<bool> SystemTableExists(PGConn* pgconn, const std::string& table_name) {
  const auto filter = Format(
      "relname = '$0' AND relnamespace = 'pg_catalog'::regnamespace", table_name);
  const int64_t matches = VERIFY_RESULT(SelectCountStar(pgconn, "pg_class", filter));
  return matches == 1;
}
60
61
// Verify that system table exists and is not empty.
62
13
Result<bool> SystemTableHasRows(PGConn* pgconn, const std::string& table_name) {
63
13
  if (!VERIFY_RESULT(SystemTableExists(pgconn, table_name)))
64
12
    return false;
65
1
  return VERIFY_RESULT(SelectCountStar(pgconn, table_name)) > 0;
66
1
}
67
68
5
// Returns true when pg_proc contains at least one function called function_name in the
// pg_catalog namespace.
//
// pg_proc may hold several rows with the same proname (overloads), so any positive count means
// the function exists. The lookup is limited to pg_catalog, like SystemTableExists, so that an
// unrelated function of the same name in another schema does not affect the answer.
Result<bool> FunctionExists(PGConn* pgconn, const std::string& function_name) {
  auto where_clause = Format("proname = '$0' AND pronamespace = 'pg_catalog'::regnamespace",
                             function_name);
  return VERIFY_RESULT(SelectCountStar(pgconn, "pg_proc", where_clause)) > 0;
}
72
73
226
// Returns the query prefixed with a statement that sets yb_non_ddl_txn_for_sys_tables_allowed
// to true via SET LOCAL, followed by a newline.
std::string WrapSystemDml(const std::string& query) {
  std::string wrapped("SET LOCAL yb_non_ddl_txn_for_sys_tables_allowed TO true;\n");
  wrapped += query;
  return wrapped;
}
76
77
// Analyze pg_catalog state of a database to determine a current major version of a catalog state
78
// by checking presence of catalog changing features released before the migrations feature landed.
79
// 0 means that no migrations were applied yet.
80
13
Result<int> GetMajorVersionFromSystemCatalogState(PGConn* pgconn) {
81
13
  int major_version = 0;
82
83
  // Helper macro removing boilerplate.
84
13
#define INCREMENT_VERSION_OR_RETURN_IT(oneliner_with_result) \
85
20
    if (VERIFY_RESULT(oneliner_with_result)) { \
86
8
      ++major_version; \
87
12
    } else { \
88
12
      return major_version; \
89
12
    }
90
91
  // V1: #3979 introducing pg_yb_catalog_version table.
92
13
  INCREMENT_VERSION_OR_RETURN_IT(SystemTableHasRows(pgconn, "pg_yb_catalog_version"))
93
94
  // V2: #4525 which creates pg_tablegroup.
95
1
  INCREMENT_VERSION_OR_RETURN_IT(SystemTableExists(pgconn, "pg_tablegroup"))
96
97
  // V3: #5478 installing pg_stat_statements.
98
1
  INCREMENT_VERSION_OR_RETURN_IT(SystemTableExists(pgconn, "pg_stat_statements"))
99
100
  // V4: #5408 introducing a bunch of JSONB functions.
101
1
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "jsonb_path_query"))
102
103
  // V5: #6509 introducing yb_getrusage and yb_mem_usage* functions.
104
1
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_getrusage"))
105
106
  // V6: #7879 introducing yb_servers function.
107
1
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_servers"))
108
109
  // V7: #8719 introducing yb_hash_code function.
110
1
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "yb_hash_code"))
111
112
  // V8: #7850 introducing ybgin access method.
113
1
  INCREMENT_VERSION_OR_RETURN_IT(FunctionExists(pgconn, "ybginhandler"))
114
115
1
  return major_version;
116
1
}
117
118
// Create a pg_yb_migration if it doesn't exist yet.
119
// Returns true if the table was created or false if it was present already.
120
24
Result<bool> CreateMigrationTableIfNotExist(PGConn* pgconn) {
121
24
  if (VERIFY_RESULT(SystemTableExists(pgconn, "pg_yb_migration"))) {
122
12
    LOG(INFO) << "pg_yb_migration table is present";
123
12
    return false;
124
12
  }
125
126
12
  const std::string query_str(
127
12
      "CREATE TABLE pg_catalog.pg_yb_migration ("
128
12
      "  major        int    NOT NULL,"
129
12
      "  minor        int    NOT NULL,"
130
12
      "  name         name   NOT NULL,"
131
12
      "  time_applied bigint"
132
12
      ") WITH (table_oid = $0, row_type_oid = $1);");
133
12
  RETURN_NOT_OK(pgconn->ExecuteFormat(query_str,
134
12
                                      YBMigrationRelationId,
135
12
                                      YBMigrationRelation_Rowtype_Id));
136
12
  LOG(INFO) << "pg_yb_migration table was created";
137
12
  return true;
138
12
}
139
140
// Determine a YSQL version of a given database and make sure it's recorded in pg_yb_migration.
141
// Creates a pg_yb_migration if it doesn't yet exist.
142
24
Result<Version> DetermineAndSetVersion(PGConn* pgconn) {
143
24
  bool table_created = VERIFY_RESULT(CreateMigrationTableIfNotExist(pgconn));
144
145
  // If pg_yb_migration was present before and has values, that's our version.
146
24
  if (!table_created) {
147
12
    const std::string query_str(
148
12
        "SELECT major, minor FROM pg_catalog.pg_yb_migration"
149
12
        "  ORDER BY major DESC, minor DESC"
150
12
        "  LIMIT 1");
151
12
    pgwrapper::PGResultPtr res = VERIFY_RESULT(pgconn->Fetch(query_str));
152
12
    if (PQntuples(res.get()) == 1) {
153
11
      int major_version = VERIFY_RESULT(pgwrapper::GetInt32(res.get(), 0, 0));
154
11
      int minor_version = VERIFY_RESULT(pgwrapper::GetInt32(res.get(), 0, 1));
155
0
      Version ver(major_version, minor_version);
156
11
      LOG(INFO) << "Version is " << ver;
157
11
      return ver;
158
11
    }
159
12
  }
160
161
13
  int major_version = VERIFY_RESULT(GetMajorVersionFromSystemCatalogState(pgconn));
162
0
  const std::string query_str(
163
13
      "INSERT INTO pg_catalog.pg_yb_migration (major, minor, name, time_applied)"
164
13
      "  VALUES ($0, 0, '<baseline>', NULL);");
165
13
  RETURN_NOT_OK(pgconn->ExecuteFormat(WrapSystemDml(query_str), major_version));
166
167
13
  Version ver(major_version, 0);
168
13
  LOG(INFO) << "Inserted a version " << ver;
169
13
  return ver;
170
13
}
171
172
80
bool IsNonSqlFile(const std::string& filename) {
173
80
  return !boost::algorithm::iends_with(filename, ".sql");
174
80
}
175
176
} // anonymous namespace
177
178
// Stores the YSQL proxy address, the auth key used as the connection password and the heartbeat
// interval (in milliseconds) for later use. Nothing else is done here.
YsqlUpgradeHelper::YsqlUpgradeHelper(
    const HostPort& ysql_proxy_addr, uint64_t ysql_auth_key, uint32_t heartbeat_interval_ms)
    : ysql_proxy_addr_(ysql_proxy_addr),
      ysql_auth_key_(ysql_auth_key),
      heartbeat_interval_ms_(heartbeat_interval_ms) {}
185
186
4
// Locates the migrations directory, lists the *.sql files in it and indexes them by version.
//
// On success migrations_dir_, migration_filenames_map_ and latest_version_ are populated.
// Returns InternalError when the root or migrations directory cannot be found, when there are no
// migrations, when a file name does not fully match V<major>[.<minor>]__<number>__<name>.sql, or
// when two files declare the same version. Errors from listing the directory are propagated.
Status YsqlUpgradeHelper::AnalyzeMigrationFiles() {
  const std::string search_for_dir = JoinPathSegments(kStaticDataParentDir, kMigrationsDir);
  const std::string root_dir       = env_util::GetRootDir(search_for_dir);
  SCHECK(!root_dir.empty(), InternalError,
         "Executable path not found");
  migrations_dir_ =
      JoinPathSegments(root_dir, kStaticDataParentDir, kMigrationsDir);
  auto* env = Env::Default();
  SCHECK(env->DirExists(migrations_dir_), InternalError,
         "Migrations directory not found");

  migration_filenames_map_.clear();
  std::vector<std::string> migration_filenames;
  RETURN_NOT_OK(env->GetChildren(migrations_dir_, &migration_filenames));

  // Remove unrelated files.
  migration_filenames.erase(
      std::remove_if(migration_filenames.begin(), migration_filenames.end(), IsNonSqlFile),
      migration_filenames.end());

  SCHECK(!migration_filenames.empty(), InternalError,
         "No migrations found!");

  // Check that all migrations conform to the naming schema.
  // regex_match (not regex_search) is used so that the whole file name has to match; otherwise a
  // name with arbitrary text around a valid-looking core would be accepted.
  static const std::regex regex("V(\\d+)(\\.(\\d+))?__\\d+__[_0-9A-Za-z]+\\.sql");
  std::smatch version_match;
  for (const auto& filename : migration_filenames) {
    SCHECK(std::regex_match(filename, version_match, regex),
           InternalError,
           Format("Migration '$0' does not conform to the filename pattern", filename));
    const int major_version = std::stoi(version_match[1]);
    // The minor part is optional and defaults to 0.
    const int minor_version = version_match[3].length() > 0 ? std::stoi(version_match[3]) : 0;
    const Version version{major_version, minor_version};

    // Two files with the same version would silently shadow each other, and which one wins would
    // depend on directory listing order. Treat that as an error instead.
    const auto insertion = migration_filenames_map_.emplace(version, filename);
    SCHECK(insertion.second,
           InternalError,
           Format("Migrations '$0' and '$1' declare the same version $2.$3",
                  insertion.first->second, filename, major_version, minor_version));
  }

  // The map is non-empty here: at least one file was found and every file was inserted.
  latest_version_ = std::prev(migration_filenames_map_.end())->first;

  return Status::OK();
}
227
228
24
// Opens a connection to database_name as user "postgres" through the socket directory derived
// from the proxy host, then sets ysql_upgrade_mode and force_global_transaction to true on it.
// Any connection or statement failure is returned as an error.
Result<PGConn> YsqlUpgradeHelper::Connect(const std::string& database_name) {
  // Construct connection string.  Note that the plain password in the connection string will be
  // sent over the wire, but since it only goes over a unix-domain socket, there should be no
  // eavesdropping/tampering issues.
  const auto socket_dir = PgDeriveSocketDir(ysql_proxy_addr_.host());
  const auto escaped_db_name = pgwrapper::PqEscapeLiteral(database_name);
  const std::string conn_str = Format("user=$0 password=$1 host=$2 port=$3 dbname=$4",
                                      "postgres",
                                      ysql_auth_key_,
                                      socket_dir,
                                      ysql_proxy_addr_.port(),
                                      escaped_db_name);

  PGConn conn = VERIFY_RESULT(PGConn::Connect(conn_str));

  RETURN_NOT_OK(conn.Execute("SET ysql_upgrade_mode TO true;"));

  // Force global transactions when running upgrade to avoid transactions being run as local.
  // This can be removed once #11731 is resolved.
  RETURN_NOT_OK(conn.Execute("SET force_global_transaction TO true;"));

  return conn;
}
250
251
4
// Upgrades every database to the latest version defined by the migration files.
//
// Phase 1 connects to each database (template1 and template0 first), determines its current
// version and keeps a (name, connection, version) entry per database.
// Phase 2 repeatedly picks the database with the lowest version and applies one migration to it,
// until the lowest version has reached the latest one.
// The first error encountered aborts the upgrade and is returned.
Status YsqlUpgradeHelper::Upgrade() {
  RETURN_NOT_OK(AnalyzeMigrationFiles());
  LOG(INFO) << "Latest version defined in migrations is " << latest_version_;

  std::vector<DatabaseEntry> databases;

  {
    PGConn template1_conn = VERIFY_RESULT(Connect("template1"));

    // Place template databases to be processed first.
    std::vector<std::string> db_names{"template1", "template0"};

    // Fetch databases list
    {
      const std::string list_sql("SELECT datname FROM pg_database"
                                 "  WHERE datname NOT IN ('template0', 'template1');");
      pgwrapper::PGResultPtr rows = VERIFY_RESULT(template1_conn.Fetch(list_sql));
      for (int row = 0; row < PQntuples(rows.get()); ++row) {
        db_names.emplace_back(VERIFY_RESULT(pgwrapper::GetString(rows.get(), row, 0)));
      }
    }

    for (const auto& db : db_names) {
      LOG(INFO) << "Determining a YSQL version for DB " << db;
      // The template1 connection opened above is reused rather than opening a second one.
      PGConn pgconn = db == "template1"
          ? std::move(template1_conn)
          : VERIFY_RESULT(Connect(db));

      Version db_version = VERIFY_RESULT(DetermineAndSetVersion(&pgconn));
      if (db_version.first >= kCatalogVersionMigrationNumber) {
        catalog_version_migration_applied_ = true;
      }
      databases.emplace_back(db, std::move(pgconn), db_version);
    }
  }

  // Orders database entries by their current version.
  const auto by_version = [](const DatabaseEntry& lhs, const DatabaseEntry& rhs) {
    return std::get<2>(lhs) < std::get<2>(rhs);
  };

  for (;;) {
    DatabaseEntry& lagging = *std::min_element(databases.begin(), databases.end(), by_version);

    const auto& min_version = std::get<2>(lagging);
    if (min_version >= latest_version_) {
      LOG(INFO) << "Minimum version is " << min_version
                << " which is latest";
      break;
    }

    LOG(INFO) << "Minimum version is " << min_version
              << " (database " << std::get<0>(lagging) << ")";

    RETURN_NOT_OK(MigrateOnce(&lagging));
  }

  return Status::OK();
}
307
308
213
// Applies the next pending migration to a single database.
//
// db_entry holds the database name, an open connection and the current version. The migration
// with the smallest version greater than the current one is read from migrations_dir_, executed,
// recorded in pg_yb_migration, and the version stored in db_entry is advanced to it.
// Returns InternalError if no later migration exists; read and execution failures are returned
// with a message naming the migration and/or database.
Status YsqlUpgradeHelper::MigrateOnce(DatabaseEntry* db_entry) {
  const std::string& db_name = std::get<0>(*db_entry);
  auto& pgconn = std::get<1>(*db_entry);
  const auto& version = std::get<2>(*db_entry);

  // The map is ordered by version, so upper_bound yields the first migration strictly newer than
  // the current version without scanning (and without copying) every entry.
  const auto next_migration = migration_filenames_map_.upper_bound(version);
  SCHECK(next_migration != migration_filenames_map_.end(),
         InternalError,
         Format("Migration following $0.$1 is not found!", version.first, version.second));
  const auto& next_version = next_migration->first;
  const auto& next_migration_filename = next_migration->second;

  faststring migration_content;
  RETURN_NOT_OK_PREPEND(ReadFileToString(Env::Default(),
                                         JoinPathSegments(migrations_dir_, next_migration_filename),
                                         &migration_content),
                        Format("Failed to read migration '$0'", next_migration_filename));

  LOG(INFO) << db_name << ": applying migration '" << next_migration_filename << "'";

  // Note that underlying PQexec executes multiple statements transactionally, where our usual ACID
  // guarantees apply.
  // Migrations may override that using BEGIN/COMMIT statements - this will split a singular
  // implicit transaction onto several explicit ones.
  RETURN_NOT_OK_PREPEND(pgconn.Execute(migration_content.ToString(),
                                       false /* show_query_in_error */),
                        Format("Failed to apply migration '$0' to a database $1",
                               next_migration_filename,
                               db_name));

  // Wait for the new Catalog Version to be propagated to tserver through heartbeat.
  // This can only happen once, when the table is introduced in the first migration.
  // Sleep here isn't guaranteed to work (see #6238), failure to propagate a catalog version
  // would lead to Catalog Version Mismatch error fixed by retrial.
  if (!catalog_version_migration_applied_) {
    SleepFor(MonoDelta::FromMilliseconds(2 * heartbeat_interval_ms_));
    catalog_version_migration_applied_ = true;
  }

  // Record the applied migration together with the time it was applied (epoch milliseconds).
  RETURN_NOT_OK_PREPEND(
      pgconn.ExecuteFormat(
          WrapSystemDml(
              "INSERT INTO pg_catalog.pg_yb_migration (major, minor, name, time_applied) "
              "  VALUES ($0, $1, '$2', ROUND(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000));"),
          next_version.first, next_version.second, next_migration_filename),
      Format("Failed to bump pg_yb_migration to $0.$1 in database $2",
             next_version.first, next_version.second, db_name));
  std::get<2>(*db_entry) = next_version;
  LOG(INFO) << db_name << ": migration successfully applied, version bumped to " << next_version;

  return Status::OK();
}
364
365
}  // namespace pgwrapper
366
}  // namespace yb