/Users/deen/code/yugabyte-db/src/yb/yql/cql/cqlserver/system_query_cache.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | // SystemQueryCache proactively caches system queries to be used by |
16 | | // CQLProcessors. This helps with performance and availability, since masters |
17 | | // have reduced traffic when a connection is established. |
18 | | //-------------------------------------------------------------------------------------------------- |
19 | | |
#include "yb/yql/cql/cqlserver/system_query_cache.h"

#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <boost/optional.hpp>

#include "yb/common/ql_rowblock.h"

#include "yb/gutil/bind.h"

#include "yb/rpc/io_thread_pool.h"
#include "yb/rpc/scheduler.h"

#include "yb/util/async_util.h"
#include "yb/util/format.h"
#include "yb/util/monotime.h"
#include "yb/util/result.h"
#include "yb/util/string_util.h"

#include "yb/yql/cql/cqlserver/cql_processor.h"
#include "yb/yql/cql/cqlserver/cql_service.h"
#include "yb/yql/cql/ql/util/statement_params.h"
#include "yb/yql/cql/ql/util/statement_result.h"
46 | | |
// A keyspace-qualified table name (<keyspace>.<table>), as listed in the
// --cql_system_query_cache_tables flag.
struct QualifiedTable {
  std::string keyspace;
  std::string table;

  // Both names are taken by value and moved into the members, so a caller passing temporaries
  // pays for no additional string copy.
  QualifiedTable(std::string keyspace_, std::string table_)
      : keyspace(std::move(keyspace_)), table(std::move(table_)) {
  }
};
54 | | |
55 | | static bool parse_tables( |
56 | | const std::string& text, |
57 | 8.56k | std::vector<QualifiedTable>* tables) { |
58 | 8.56k | auto raw_tables = yb::StringSplit(text, ';'); |
59 | 8.56k | raw_tables.erase(std::remove_if(raw_tables.begin(), raw_tables.end(), |
60 | 0 | [](const std::string& s) { return s.length() == 0; }), raw_tables.end()); |
61 | | |
62 | 0 | for (const auto& raw : raw_tables) { |
63 | 0 | auto table_pair = yb::StringSplit(raw, '.'); |
64 | |
|
65 | 0 | if (table_pair.size() != 2) { |
66 | 0 | return false; |
67 | 0 | } |
68 | 0 | tables->push_back(QualifiedTable(table_pair[0], table_pair[1])); |
69 | 0 | } |
70 | | |
71 | 8.56k | return true; |
72 | 8.56k | } |
73 | | |
74 | 5.53k | static bool validate_tables(const char* flagname, const std::string& value) { |
75 | 5.53k | std::vector<QualifiedTable> tables; |
76 | | |
77 | 5.53k | if (parse_tables(value, &tables)) { |
78 | 5.53k | return true; |
79 | 5.53k | } |
80 | 0 | printf("Invalid value for --%s: %s\n", flagname, value.c_str()); |
81 | 0 | return false; |
82 | 0 | } |
83 | | |
// Refresh period for the cache. The SystemQueryCache constructor only schedules refreshes when
// this is positive, so a value <= 0 leaves the cache permanently empty.
DEFINE_int32(cql_update_system_query_cache_msecs, 0,
             "How often the system query cache should be updated. <= 0 disables caching.");
// Maximum age of the cache before SystemQueryCache::Lookup() stops serving from it. Lookup()
// skips the age check entirely when this is <= 0.
DEFINE_int32(cql_system_query_cache_stale_msecs, 60000,
             "Maximum permitted staleness for the system query cache. "
             "<= 0 permits infinite staleness.");
// Extra tables for which per-table system_schema queries are cached; consumed by
// SystemQueryCache::InitializeQueries() via parse_tables().
DEFINE_string(cql_system_query_cache_tables, "",
    "Tables to cache connection data for. Entries are semicolon-delimited, in the "
    "format <keyspace>.<table>.");
// Rejects malformed --cql_system_query_cache_tables values via validate_tables().
// NOTE(review): the attribute presumably silences an unused-variable warning for the object the
// validator macro declares -- confirm against the gflags headers in use.
__attribute__((unused))
DEFINE_validator(cql_system_query_cache_tables, &validate_tables);
// When false, SystemQueryCache::RefreshCache() does not store results that contain zero rows.
DEFINE_bool(cql_system_query_cache_empty_responses, true,
            "Whether to cache empty responses from the master.");
96 | | |
97 | | namespace yb { |
98 | | namespace cqlserver { |
99 | | |
100 | | using ql::RowsResult; |
101 | | using ql::ExecutedResult; |
102 | | |
// Statements that are always cached. SystemQueryCache::InitializeQueries() copies every entry
// into queries_, and SystemQueryCache::Lookup() matches on the exact statement text, which is why
// differently formatted variants of the same statement are listed separately.
//
// TODO: Possibly do a case-insensitive string comparison. This may be easier
// said than done, since capitalization does matter for comparisons in WHERE
// clauses, etc.
const char* SYSTEM_QUERIES[] = {
  "SELECT * FROM system.peers",
  "SELECT peer, rpc_address, schema_version FROM system.peers",
  "SELECT peer, data_center, rack, release_version, rpc_address FROM system.peers",
  "SELECT peer, data_center, rack, release_version, rpc_address, tokens FROM system.peers",
  "SELECT data_center, rack, release_version FROM system.local WHERE key='local'",
  ("SELECT data_center, rack, release_version, partitioner, tokens FROM "
   "system.local WHERE key='local'"),
  "SELECT keyspace_name, table_name, start_key, end_key, replica_addresses FROM system.partitions",

  "SELECT * FROM system.local WHERE key='local'",
  "SELECT schema_version FROM system.local WHERE key='local'",
  // The client evidently doesn't always have consistent formatting.
  "select * from system.local where key = 'local'",
  "select schema_version from system.local where key='local'",
  "SELECT * FROM system_schema.keyspaces",
  "SELECT * FROM system_schema.tables",
  "SELECT * FROM system_schema.views",
  "SELECT * FROM system_schema.columns",
  "SELECT * FROM system_schema.types",
  "SELECT * FROM system_schema.functions",
  "SELECT * FROM system_schema.aggregates",
  "SELECT * FROM system_schema.triggers",
  "SELECT * FROM system_schema.indexes",
};
131 | | |
132 | | SystemQueryCache::SystemQueryCache(cqlserver::CQLServiceImpl* service_impl) |
133 | 3.03k | : service_impl_(service_impl), stmt_params_() { |
134 | | |
135 | 3.03k | cache_ = std::make_unique<std::unordered_map<std::string, RowsResult::SharedPtr>>(); |
136 | 3.03k | last_updated_ = MonoTime::kMin; |
137 | 3.03k | InitializeQueries(); |
138 | | |
139 | 3.03k | pool_ = std::make_unique<yb::rpc::IoThreadPool>("system_query_cache_updater", 1); |
140 | 3.03k | scheduler_ = std::make_unique<yb::rpc::Scheduler>(&pool_->io_service()); |
141 | | |
142 | 3.03k | LOG(INFO) << "Created system query cache updater."; |
143 | | |
144 | 3.03k | if (FLAGS_cql_update_system_query_cache_msecs > 0) { |
145 | 3.03k | if (MonoDelta::FromMilliseconds(FLAGS_cql_system_query_cache_stale_msecs) < |
146 | 0 | MonoDelta::FromMilliseconds(FLAGS_cql_update_system_query_cache_msecs)) { |
147 | 0 | LOG(WARNING) << "Stale expiration shorter than update rate."; |
148 | 0 | } |
149 | | |
150 | 3.03k | ScheduleRefreshCache(false /* now */); |
151 | 0 | } else { |
152 | 0 | LOG(WARNING) << "System cache created with nonpositive timeout. Disabling scheduling"; |
153 | 0 | } |
154 | 3.03k | } |
155 | | |
156 | 0 | SystemQueryCache::~SystemQueryCache() { |
157 | 0 | if (pool_) { |
158 | 0 | scheduler_->Shutdown(); |
159 | 0 | pool_->Shutdown(); |
160 | 0 | pool_->Join(); |
161 | 0 | } |
162 | 0 | } |
163 | | |
164 | 3.03k | void SystemQueryCache::InitializeQueries() { |
165 | 60.6k | for (auto query : SYSTEM_QUERIES) { |
166 | 60.6k | queries_.push_back(query); |
167 | 60.6k | } |
168 | | |
169 | 3.03k | std::vector<QualifiedTable> table_pairs; |
170 | | // This should have been caught by the flag validator |
171 | 3.03k | if (!parse_tables(FLAGS_cql_system_query_cache_tables, &table_pairs)) { |
172 | 0 | return; |
173 | 0 | } |
174 | | |
175 | 3.03k | const char* formats[] = { |
176 | 3.03k | "SELECT * FROM system_schema.tables WHERE keyspace_name = '$0' AND table_name = '$1'", |
177 | 3.03k | "SELECT * FROM system_schema.columns WHERE keyspace_name = '$0' AND table_name = '$1'", |
178 | 3.03k | "SELECT * FROM system_schema.triggers WHERE keyspace_name = '$0' AND table_name = '$1'", |
179 | 3.03k | "SELECT * FROM system_schema.indexes WHERE keyspace_name = '$0' AND table_name = '$1'", |
180 | 3.03k | "SELECT * FROM system_schema.views WHERE keyspace_name = '$0' AND view_name = '$1'", |
181 | 3.03k | }; |
182 | | |
183 | 0 | for (auto pair : table_pairs) { |
184 | 0 | for (auto format : formats) { |
185 | 0 | queries_.push_back(yb::Format(format, pair.keyspace, pair.table)); |
186 | 0 | } |
187 | 0 | } |
188 | | |
189 | 3.03k | } |
190 | | |
191 | 148k | boost::optional<RowsResult::SharedPtr> SystemQueryCache::Lookup(const std::string& query) { |
192 | 148k | if (FLAGS_cql_system_query_cache_stale_msecs > 0 && |
193 | 148k | GetStaleness() > MonoDelta::FromMilliseconds(FLAGS_cql_system_query_cache_stale_msecs)) { |
194 | 66.4k | return boost::none; |
195 | 66.4k | } |
196 | 82.2k | const std::lock_guard<std::mutex> l(cache_mutex_); |
197 | | |
198 | 82.2k | const auto it = cache_->find(query); |
199 | 82.2k | if (it == cache_->end()) { |
200 | 73.0k | return boost::none; |
201 | 9.21k | } else { |
202 | 9.21k | return it->second; |
203 | 9.21k | } |
204 | 82.2k | } |
205 | | |
206 | 148k | MonoDelta SystemQueryCache::GetStaleness() { |
207 | 148k | const std::lock_guard<std::mutex> l(cache_mutex_); |
208 | 148k | return MonoTime::Now() - last_updated_; |
209 | 148k | } |
210 | | |
211 | 9.12k | void SystemQueryCache::RefreshCache() { |
212 | 0 | VLOG(1) << "Refreshing system query cache"; |
213 | 9.12k | auto new_cache = std::make_unique<std::unordered_map<std::string, RowsResult::SharedPtr>>(); |
214 | 181k | for (auto query : queries_) { |
215 | 181k | Status status; |
216 | 181k | ExecutedResult::SharedPtr result; |
217 | 181k | ExecuteSync(query, &status, &result); |
218 | | |
219 | 181k | if (status.ok()) { |
220 | 181k | auto rows_result = std::dynamic_pointer_cast<RowsResult>(result); |
221 | 181k | if (FLAGS_cql_system_query_cache_empty_responses || |
222 | 181k | rows_result->GetRowBlock()->row_count() > 0) { |
223 | 129k | (*new_cache)[query] = rows_result; |
224 | 52.3k | } else { |
225 | 52.3k | LOG(INFO) << "Skipping empty result for statement: " << query; |
226 | 52.3k | } |
227 | 54 | } else { |
228 | 54 | LOG(WARNING) << "Could not execute statement: " << query << "; status: " << status.ToString(); |
229 | | // We don't want to update the cache with no data; instead we'll let the |
230 | | // stale cache persist. |
231 | 54 | ScheduleRefreshCache(false /* now */); |
232 | 54 | return; |
233 | 54 | } |
234 | 181k | } |
235 | | |
236 | 9.07k | { |
237 | 9.07k | const std::lock_guard<std::mutex> l(cache_mutex_); |
238 | 9.07k | cache_ = std::move(new_cache); |
239 | 9.07k | last_updated_ = MonoTime::Now(); |
240 | 9.07k | } |
241 | | |
242 | 9.07k | ScheduleRefreshCache(false /* now */); |
243 | 9.07k | } |
244 | | |
245 | 12.1k | void SystemQueryCache::ScheduleRefreshCache(bool now) { |
246 | 12.1k | DCHECK(pool_); |
247 | 12.1k | DCHECK(scheduler_); |
248 | 0 | VLOG(1) << "Scheduling cache refresh"; |
249 | | |
250 | 9.12k | scheduler_->Schedule([this](const Status &s) { |
251 | 9.12k | if (!s.ok()) { |
252 | 0 | LOG(INFO) << "System cache updater scheduler was shutdown: " << s.ToString(); |
253 | 0 | return; |
254 | 0 | } |
255 | 9.12k | this->RefreshCache(); |
256 | 12.1k | }, std::chrono::milliseconds(now ? 0 : FLAGS_cql_update_system_query_cache_msecs)); |
257 | 12.1k | } |
258 | | |
259 | | void SystemQueryCache::ExecuteSync(const std::string& stmt, Status* status, |
260 | 181k | ExecutedResult::SharedPtr* result_ptr) { |
261 | 181k | const auto processor = service_impl_->GetProcessor(); |
262 | 181k | if (!processor.ok()) { |
263 | 0 | LOG(ERROR) << "Unable to get CQLProcessor for system query cache"; |
264 | 0 | *status = processor.status(); |
265 | 0 | return; |
266 | 0 | } |
267 | | |
268 | 181k | Synchronizer sync; |
269 | 181k | const auto callback = [](Synchronizer* sync, ExecutedResult::SharedPtr* result_ptr, |
270 | 181k | const Status& status, const ExecutedResult::SharedPtr& result) { |
271 | 181k | *result_ptr = result; |
272 | 181k | sync->StatusCB(status); |
273 | 181k | }; |
274 | | |
275 | 181k | (*processor)->RunAsync(stmt, stmt_params_, yb::Bind(+callback, &sync, result_ptr)); |
276 | 181k | *status = sync.Wait(); |
277 | 181k | (*processor)->Release(); |
278 | 181k | } |
279 | | |
280 | | } // namespace cqlserver |
281 | | } // namespace yb |