YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/system_query_cache.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
// SystemQueryCache proactively caches system queries to be used by
16
// CQLProcessors.  This helps with performance and availability, since masters
17
// have reduced traffic when a connection is established.
18
//--------------------------------------------------------------------------------------------------
19
20
#include "yb/yql/cql/cqlserver/system_query_cache.h"
21
22
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <unordered_map>
#include <utility>
26
27
#include <boost/optional.hpp>
28
29
#include "yb/common/ql_rowblock.h"
30
31
#include "yb/gutil/bind.h"
32
33
#include "yb/rpc/io_thread_pool.h"
34
#include "yb/rpc/scheduler.h"
35
36
#include "yb/util/async_util.h"
37
#include "yb/util/format.h"
38
#include "yb/util/monotime.h"
39
#include "yb/util/result.h"
40
#include "yb/util/string_util.h"
41
42
#include "yb/yql/cql/cqlserver/cql_processor.h"
43
#include "yb/yql/cql/cqlserver/cql_service.h"
44
#include "yb/yql/cql/ql/util/statement_params.h"
45
#include "yb/yql/cql/ql/util/statement_result.h"
46
47
// A fully qualified table name: a keyspace plus a table inside it. One instance is produced for
// each "<keyspace>.<table>" entry of --cql_system_query_cache_tables.
struct QualifiedTable {
  std::string keyspace;
  std::string table;

  // Both names are taken by value and moved into place, so a caller passing temporaries pays
  // for no extra string copy; a caller passing lvalues keeps its own strings intact.
  QualifiedTable(std::string keyspace_, std::string table_)
      : keyspace(std::move(keyspace_)), table(std::move(table_)) {
  }
};
54
55
static bool parse_tables(
56
    const std::string& text,
57
8.56k
    std::vector<QualifiedTable>* tables) {
58
8.56k
  auto raw_tables = yb::StringSplit(text, ';');
59
8.56k
  raw_tables.erase(std::remove_if(raw_tables.begin(), raw_tables.end(),
60
0
      [](const std::string& s) { return s.length() == 0; }), raw_tables.end());
61
62
0
  for (const auto& raw : raw_tables) {
63
0
    auto table_pair = yb::StringSplit(raw, '.');
64
65
0
    if (table_pair.size() != 2) {
66
0
      return false;
67
0
    }
68
0
    tables->push_back(QualifiedTable(table_pair[0], table_pair[1]));
69
0
  }
70
71
8.56k
  return true;
72
8.56k
}
73
74
5.53k
static bool validate_tables(const char* flagname, const std::string& value) {
75
5.53k
  std::vector<QualifiedTable> tables;
76
77
5.53k
  if (parse_tables(value, &tables)) {
78
5.53k
    return true;
79
5.53k
  }
80
0
  printf("Invalid value for --%s: %s\n", flagname, value.c_str());
81
0
  return false;
82
0
}
83
84
// Refresh period of the cache, in milliseconds. A non-positive value means no refresh is ever
// scheduled.
DEFINE_int32(cql_update_system_query_cache_msecs, 0,
             "How often the system query cache should be updated. <= 0 disables caching.");

// Age, in milliseconds, beyond which cached results are no longer served.
DEFINE_int32(cql_system_query_cache_stale_msecs, 60000,
             "Maximum permitted staleness for the system query cache. "
             "<= 0 permits infinite staleness.");

// Extra user tables whose schema queries should be cached alongside the built-in system queries.
DEFINE_string(cql_system_query_cache_tables, "",
              "Tables to cache connection data for. Entries are semicolon-delimited, in the "
              "format <keyspace>.<table>.");

// Reject malformed table lists when the flag is set, via validate_tables().
__attribute__((unused))
DEFINE_validator(cql_system_query_cache_tables, &validate_tables);

// When false, queries that return no rows are left out of the cache.
DEFINE_bool(cql_system_query_cache_empty_responses, true,
            "Whether to cache empty responses from the master.");
96
97
namespace yb {
98
namespace cqlserver {
99
100
using ql::RowsResult;
101
using ql::ExecutedResult;
102
103
// Statements whose results are cached. Cache lookups are exact, case-sensitive string matches on
// the statement text, so every spelling that should hit the cache has to be listed separately.
//
// TODO: Possibly do a case-insensitive string comparison.  This may be easier said than done,
// since capitalization does matter for comparisons in WHERE clauses, etc.
const char* SYSTEM_QUERIES[] = {
  // system.peers
  "SELECT * FROM system.peers",
  "SELECT peer, rpc_address, schema_version FROM system.peers",
  "SELECT peer, data_center, rack, release_version, rpc_address FROM system.peers",
  "SELECT peer, data_center, rack, release_version, rpc_address, tokens FROM system.peers",

  // system.local / system.partitions
  "SELECT data_center, rack, release_version FROM system.local WHERE key='local'",
  "SELECT data_center, rack, release_version, partitioner, tokens FROM "
      "system.local WHERE key='local'",
  "SELECT keyspace_name, table_name, start_key, end_key, replica_addresses FROM system.partitions",

  "SELECT * FROM system.local WHERE key='local'",
  "SELECT schema_version FROM system.local WHERE key='local'",
  // Same queries with different casing and spacing; clients are not consistent about formatting.
  "select * from system.local where key = 'local'",
  "select schema_version from system.local where key='local'",

  // system_schema.*
  "SELECT * FROM system_schema.keyspaces",
  "SELECT * FROM system_schema.tables",
  "SELECT * FROM system_schema.views",
  "SELECT * FROM system_schema.columns",
  "SELECT * FROM system_schema.types",
  "SELECT * FROM system_schema.functions",
  "SELECT * FROM system_schema.aggregates",
  "SELECT * FROM system_schema.triggers",
  "SELECT * FROM system_schema.indexes",
};
131
132
// Builds an empty cache, the list of statements to cache, and the thread pool / scheduler used to
// refresh it. A first refresh is scheduled only when --cql_update_system_query_cache_msecs is
// positive; otherwise the cache is never populated.
//
// `service_impl` is stored as a raw pointer and used later to obtain CQL processors.
SystemQueryCache::SystemQueryCache(cqlserver::CQLServiceImpl* service_impl)
  : service_impl_(service_impl), stmt_params_() {

  cache_ = std::make_unique<std::unordered_map<std::string, RowsResult::SharedPtr>>();
  // Start out maximally stale so nothing is served before the first successful refresh.
  last_updated_ = MonoTime::kMin;
  InitializeQueries();

  pool_ = std::make_unique<yb::rpc::IoThreadPool>("system_query_cache_updater", 1);
  scheduler_ = std::make_unique<yb::rpc::Scheduler>(&pool_->io_service());

  LOG(INFO) << "Created system query cache updater.";

  if (FLAGS_cql_update_system_query_cache_msecs > 0) {
    // A non-positive staleness limit means "never stale" (see Lookup), so the two intervals are
    // only comparable when a limit is actually in force. Without the first check this warning
    // would fire spuriously for the unlimited-staleness configuration.
    if (FLAGS_cql_system_query_cache_stale_msecs > 0 &&
        FLAGS_cql_system_query_cache_stale_msecs < FLAGS_cql_update_system_query_cache_msecs) {
      LOG(WARNING) << "Stale expiration shorter than update rate.";
    }

    ScheduleRefreshCache(false /* now */);
  } else {
    LOG(WARNING) << "System cache created with nonpositive timeout. Disabling scheduling";
  }
}
155
156
0
// Tears down the refresh machinery: the scheduler is shut down first, then the thread pool is
// shut down and joined. Nothing is done if the pool was never created.
SystemQueryCache::~SystemQueryCache() {
  if (!pool_) {
    return;
  }

  scheduler_->Shutdown();
  pool_->Shutdown();
  pool_->Join();
}
163
164
3.03k
void SystemQueryCache::InitializeQueries() {
165
60.6k
  for (auto query : SYSTEM_QUERIES) {
166
60.6k
    queries_.push_back(query);
167
60.6k
  }
168
169
3.03k
  std::vector<QualifiedTable> table_pairs;
170
  // This should have been caught by the flag validator
171
3.03k
  if (!parse_tables(FLAGS_cql_system_query_cache_tables, &table_pairs)) {
172
0
    return;
173
0
  }
174
175
3.03k
  const char* formats[] = {
176
3.03k
    "SELECT * FROM system_schema.tables WHERE keyspace_name = '$0' AND table_name = '$1'",
177
3.03k
    "SELECT * FROM system_schema.columns WHERE keyspace_name = '$0' AND table_name = '$1'",
178
3.03k
    "SELECT * FROM system_schema.triggers WHERE keyspace_name = '$0' AND table_name = '$1'",
179
3.03k
    "SELECT * FROM system_schema.indexes WHERE keyspace_name = '$0' AND table_name = '$1'",
180
3.03k
    "SELECT * FROM system_schema.views WHERE keyspace_name = '$0' AND view_name = '$1'",
181
3.03k
  };
182
183
0
  for (auto pair : table_pairs) {
184
0
    for (auto format : formats) {
185
0
      queries_.push_back(yb::Format(format, pair.keyspace, pair.table));
186
0
    }
187
0
  }
188
189
3.03k
}
190
191
148k
// Returns the cached result for `query`, matched by exact statement text.
//
// Returns boost::none when a staleness limit is configured
// (--cql_system_query_cache_stale_msecs > 0) and the cache is older than that limit, or when the
// statement has no entry in the cache.
boost::optional<RowsResult::SharedPtr> SystemQueryCache::Lookup(const std::string& query) {
  if (FLAGS_cql_system_query_cache_stale_msecs > 0) {
    const auto limit = MonoDelta::FromMilliseconds(FLAGS_cql_system_query_cache_stale_msecs);
    // GetStaleness() takes and releases cache_mutex_ on its own, so it must be called before the
    // guard below is acquired.
    if (GetStaleness() > limit) {
      return boost::none;
    }
  }

  const std::lock_guard<std::mutex> guard(cache_mutex_);

  const auto entry = cache_->find(query);
  if (entry != cache_->end()) {
    return entry->second;
  }
  return boost::none;
}
205
206
148k
// Returns how much time has passed since the cache contents were last replaced. last_updated_ is
// read under cache_mutex_.
MonoDelta SystemQueryCache::GetStaleness() {
  const std::lock_guard<std::mutex> guard(cache_mutex_);
  const auto current_time = MonoTime::Now();
  return current_time - last_updated_;
}
210
211
9.12k
void SystemQueryCache::RefreshCache() {
212
0
  VLOG(1) << "Refreshing system query cache";
213
9.12k
  auto new_cache = std::make_unique<std::unordered_map<std::string, RowsResult::SharedPtr>>();
214
181k
  for (auto query : queries_) {
215
181k
    Status status;
216
181k
    ExecutedResult::SharedPtr result;
217
181k
    ExecuteSync(query, &status, &result);
218
219
181k
    if (status.ok()) {
220
181k
      auto rows_result = std::dynamic_pointer_cast<RowsResult>(result);
221
181k
      if (FLAGS_cql_system_query_cache_empty_responses ||
222
181k
          rows_result->GetRowBlock()->row_count() > 0) {
223
129k
        (*new_cache)[query] = rows_result;
224
52.3k
      } else {
225
52.3k
        LOG(INFO) << "Skipping empty result for statement: " << query;
226
52.3k
      }
227
54
    } else {
228
54
      LOG(WARNING) << "Could not execute statement: " << query << "; status: " << status.ToString();
229
      // We don't want to update the cache with no data; instead we'll let the
230
      // stale cache persist.
231
54
      ScheduleRefreshCache(false /* now */);
232
54
      return;
233
54
    }
234
181k
  }
235
236
9.07k
  {
237
9.07k
    const std::lock_guard<std::mutex> l(cache_mutex_);
238
9.07k
    cache_ = std::move(new_cache);
239
9.07k
    last_updated_ = MonoTime::Now();
240
9.07k
  }
241
242
9.07k
  ScheduleRefreshCache(false /* now */);
243
9.07k
}
244
245
12.1k
void SystemQueryCache::ScheduleRefreshCache(bool now) {
246
12.1k
  DCHECK(pool_);
247
12.1k
  DCHECK(scheduler_);
248
0
  VLOG(1) << "Scheduling cache refresh";
249
250
9.12k
  scheduler_->Schedule([this](const Status &s) {
251
9.12k
      if (!s.ok()) {
252
0
        LOG(INFO) << "System cache updater scheduler was shutdown: " << s.ToString();
253
0
        return;
254
0
      }
255
9.12k
      this->RefreshCache();
256
12.1k
      }, std::chrono::milliseconds(now ? 0 : FLAGS_cql_update_system_query_cache_msecs));
257
12.1k
}
258
259
void SystemQueryCache::ExecuteSync(const std::string& stmt, Status* status,
260
181k
    ExecutedResult::SharedPtr* result_ptr) {
261
181k
  const auto processor = service_impl_->GetProcessor();
262
181k
  if (!processor.ok()) {
263
0
    LOG(ERROR) << "Unable to get CQLProcessor for system query cache";
264
0
    *status = processor.status();
265
0
    return;
266
0
  }
267
268
181k
  Synchronizer sync;
269
181k
  const auto callback = [](Synchronizer* sync, ExecutedResult::SharedPtr* result_ptr,
270
181k
      const Status& status, const ExecutedResult::SharedPtr& result) {
271
181k
    *result_ptr = result;
272
181k
    sync->StatusCB(status);
273
181k
  };
274
275
181k
  (*processor)->RunAsync(stmt, stmt_params_, yb::Bind(+callback, &sync, result_ptr));
276
181k
  *status = sync.Wait();
277
181k
  (*processor)->Release();
278
181k
}
279
280
} // namespace cqlserver
281
} // namespace yb