/Users/deen/code/yugabyte-db/src/yb/consensus/log_cache.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/consensus/log_cache.h" |
34 | | |
35 | | #include <algorithm> |
36 | | #include <map> |
37 | | #include <mutex> |
38 | | #include <vector> |
39 | | |
40 | | #include "yb/consensus/consensus_util.h" |
41 | | #include "yb/consensus/log.h" |
42 | | #include "yb/consensus/log_reader.h" |
43 | | #include "yb/consensus/opid_util.h" |
44 | | |
45 | | #include "yb/gutil/bind.h" |
46 | | #include "yb/gutil/map-util.h" |
47 | | #include "yb/gutil/strings/human_readable.h" |
48 | | |
49 | | #include "yb/util/flag_tags.h" |
50 | | #include "yb/util/format.h" |
51 | | #include "yb/util/locks.h" |
52 | | #include "yb/util/logging.h" |
53 | | #include "yb/util/mem_tracker.h" |
54 | | #include "yb/util/metrics.h" |
55 | | #include "yb/util/monotime.h" |
56 | | #include "yb/util/result.h" |
57 | | #include "yb/util/size_literals.h" |
58 | | #include "yb/util/status_format.h" |
59 | | |
using namespace std::literals;

// Per-tablet cap on cached consensus entries; once exceeded, the oldest
// unpinned entries are evicted (see EvictSomeUnlocked below).
DEFINE_int32(log_cache_size_limit_mb, 128,
             "The total per-tablet size of consensus entries which may be kept in memory. "
             "The log cache attempts to keep all entries which have not yet been replicated "
             "to all followers in memory, but if the total size of those entries exceeds "
             "this limit within an individual tablet, the oldest will be evicted.");
TAG_FLAG(log_cache_size_limit_mb, advanced);

// Server-wide cap, enforced through a shared parent memtracker
// (see GetServerMemTracker below).
DEFINE_int32(global_log_cache_size_limit_mb, 1024,
             "Server-wide version of 'log_cache_size_limit_mb'. The total memory used for "
             "caching log entries across all tablets is kept under this threshold.");
TAG_FLAG(global_log_cache_size_limit_mb, advanced);

// The effective global limit is min(global_log_cache_size_limit_mb,
// this percentage of the root memtracker's limit).
DEFINE_int32(global_log_cache_size_limit_percentage, 5,
             "The maximum percentage of root process memory that can be used for caching log "
             "entries across all tablets. Default is 5.");
TAG_FLAG(global_log_cache_size_limit_percentage, advanced);

// Test-only escape hatch: turns EvictSomeUnlocked into a no-op.
DEFINE_test_flag(bool, log_cache_skip_eviction, false,
                 "Don't evict log entries in tests.");

using strings::Substitute;

// Per-tablet metrics maintained by the append / evict / read paths below.
METRIC_DEFINE_gauge_int64(tablet, log_cache_num_ops, "Log Cache Operation Count",
                          yb::MetricUnit::kOperations,
                          "Number of operations in the log cache.");
METRIC_DEFINE_gauge_int64(tablet, log_cache_size, "Log Cache Memory Usage",
                          yb::MetricUnit::kBytes,
                          "Amount of memory in use for caching the local log.");
METRIC_DEFINE_counter(tablet, log_cache_disk_reads, "Log Cache Disk Reads",
                      yb::MetricUnit::kEntries,
                      "Amount of operations read from disk.");

// Defined elsewhere; consulted in ReadOps to decide whether to honor the deadline.
DECLARE_bool(get_changes_honor_deadline);
95 | | |
96 | | namespace yb { |
97 | | namespace consensus { |
98 | | |
namespace {

// Id of the server-wide parent memtracker; also used as the prefix of the
// per-tablet child trackers created in the LogCache constructor.
const std::string kParentMemTrackerId = "log_cache"s;

}

// NOTE(review): this typedef appears unused within this file — candidate for removal.
typedef vector<const ReplicateMsg*>::const_iterator MsgIter;
106 | | |
// Constructs the per-tablet log cache: wires up the memtrackers (a shared
// server-wide parent plus a per-tablet child) and seeds the cache with a
// sentinel op at index 0.
LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
                   const log::LogPtr& log,
                   const MemTrackerPtr& server_tracker,
                   const string& local_uuid,
                   const string& tablet_id)
    : log_(log),
      local_uuid_(local_uuid),
      tablet_id_(tablet_id),
      next_sequential_op_index_(0),
      min_pinned_op_index_(0),
      metrics_(metric_entity) {

  // Per-tablet memory cap for cached operations.
  const int64_t max_ops_size_bytes = FLAGS_log_cache_size_limit_mb * 1_MB;

  // Set up (or reuse) a tracker with the global limit. It is parented directly to the root tracker
  // so that it's always global.
  parent_tracker_ = GetServerMemTracker(server_tracker);

  // And create a child tracker with the per-tablet limit.
  tracker_ = MemTracker::CreateTracker(
      max_ops_size_bytes, Format("$0-$1", kParentMemTrackerId, tablet_id), parent_tracker_,
      AddToParent::kTrue, CreateMetrics::kFalse);
  tracker_->SetMetricEntity(metric_entity, kParentMemTrackerId);

  // Put a fake message at index 0, since this simplifies a lot of our code paths elsewhere.
  // Note: it is inserted directly (bypassing metrics/tracker accounting) and is
  // never evicted (see the msg_index == 0 check in EvictSomeUnlocked).
  auto zero_op = std::make_shared<ReplicateMsg>();
  *zero_op->mutable_id() = MinimumOpId();
  InsertOrDie(&cache_, 0, { zero_op, zero_op->SpaceUsed() });
}
136 | | |
137 | 100k | MemTrackerPtr LogCache::GetServerMemTracker(const MemTrackerPtr& server_tracker) { |
138 | 18.4E | CHECK(FLAGS_global_log_cache_size_limit_percentage > 0 && |
139 | 18.4E | FLAGS_global_log_cache_size_limit_percentage <= 100) |
140 | 18.4E | << Substitute("Flag FLAGS_global_log_cache_size_limit_percentage must be between 0 and 100. ", |
141 | 18.4E | "Current value: $0", |
142 | 18.4E | FLAGS_global_log_cache_size_limit_percentage); |
143 | | |
144 | 100k | int64_t global_max_ops_size_bytes = FLAGS_global_log_cache_size_limit_mb * 1_MB; |
145 | 100k | int64_t root_mem_limit = MemTracker::GetRootTracker()->limit(); |
146 | 100k | global_max_ops_size_bytes = std::min( |
147 | 100k | global_max_ops_size_bytes, |
148 | 100k | root_mem_limit * FLAGS_global_log_cache_size_limit_percentage / 100); |
149 | 100k | return MemTracker::FindOrCreateTracker( |
150 | 100k | global_max_ops_size_bytes, kParentMemTrackerId, server_tracker); |
151 | 100k | } |
152 | | |
LogCache::~LogCache() {
  // Give back everything this tablet's tracker ever consumed before dropping
  // the cached entries themselves.
  tracker_->Release(tracker_->consumption());
  cache_.clear();

  // Detach from the shared server-wide parent tracker last, once our
  // consumption is back to zero.
  tracker_->UnregisterFromParent();
}
159 | | |
160 | 88.7k | void LogCache::Init(const OpIdPB& preceding_op) { |
161 | 88.7k | std::lock_guard<simple_spinlock> l(lock_); |
162 | 0 | CHECK_EQ(cache_.size(), 1) << "Cache should have only our special '0' op"; |
163 | 88.7k | next_sequential_op_index_ = preceding_op.index() + 1; |
164 | 88.7k | min_pinned_op_index_ = next_sequential_op_index_; |
165 | 88.7k | } |
166 | | |
// Inserts 'msgs' into the in-memory cache, replacing any previously-cached ops
// that the batch overwrites (leader change), and returns the memory the batch
// requires plus its last index. Precondition: msgs is non-empty (front()/back()
// are dereferenced unconditionally).
LogCache::PrepareAppendResult LogCache::PrepareAppendOperations(const ReplicateMsgs& msgs) {
  // SpaceUsed is relatively expensive, so do calculations outside the lock
  PrepareAppendResult result;
  std::vector<CacheEntry> entries_to_insert;
  entries_to_insert.reserve(msgs.size());
  for (const auto& msg : msgs) {
    CacheEntry e = { msg, static_cast<int64_t>(msg->SpaceUsedLong()) };
    result.mem_required += e.mem_usage;
    entries_to_insert.emplace_back(std::move(e));
  }

  int64_t first_idx_in_batch = msgs.front()->id().index();
  result.last_idx_in_batch = msgs.back()->id().index();

  std::unique_lock<simple_spinlock> lock(lock_);
  // If we're not appending a consecutive op we're likely overwriting and need to replace operations
  // in the cache.
  if (first_idx_in_batch != next_sequential_op_index_) {
    // If the index is not consecutive then it must be lower than or equal to the last index, i.e.
    // we're overwriting.
    CHECK_LE(first_idx_in_batch, next_sequential_op_index_);

    // Now remove the overwritten operations.
    // Some indexes may already be absent (evicted), hence the find().
    for (int64_t i = first_idx_in_batch; i < next_sequential_op_index_; ++i) {
      auto it = cache_.find(i);
      if (it != cache_.end()) {
        AccountForMessageRemovalUnlocked(it->second);
        cache_.erase(it);
      }
    }
  }

  // Insert the new entries and advance the next expected index as we go.
  for (auto& e : entries_to_insert) {
    auto index = e.msg->id().index();
    EmplaceOrDie(&cache_, index, std::move(e));
    next_sequential_op_index_ = index + 1;
  }

  return result;
}
207 | | |
// Caches 'msgs' and asynchronously appends them to the durable log. 'callback'
// is invoked (via LogCallback) once the log append completes.
Status LogCache::AppendOperations(const ReplicateMsgs& msgs, const yb::OpId& committed_op_id,
                                  RestartSafeCoarseTimePoint batch_mono_time,
                                  const StatusCallback& callback) {
  PrepareAppendResult prepare_result;
  if (!msgs.empty()) {
    prepare_result = PrepareAppendOperations(msgs);
  }

  // NOTE: Unretained(this) assumes the LogCache outlives the async log append —
  // presumably guaranteed by the owning queue's shutdown sequence; not visible here.
  Status log_status = log_->AsyncAppendReplicates(
      msgs, committed_op_id, batch_mono_time,
      Bind(&LogCache::LogCallback, Unretained(this), prepare_result.last_idx_in_batch, callback));

  if (!log_status.ok()) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Couldn't append to log: " << log_status;
    return log_status;
  }

  // Metrics are only bumped once the async append was successfully submitted.
  metrics_.size->IncrementBy(prepare_result.mem_required);
  metrics_.num_ops->IncrementBy(msgs.size());

  return Status::OK();
}
230 | | |
231 | | void LogCache::LogCallback(int64_t last_idx_in_batch, |
232 | | const StatusCallback& user_callback, |
233 | 13.1M | const Status& log_status) { |
234 | 13.1M | if (log_status.ok()) { |
235 | 13.1M | std::lock_guard<simple_spinlock> l(lock_); |
236 | 13.1M | if (min_pinned_op_index_ <= last_idx_in_batch) { |
237 | 84 | VLOG_WITH_PREFIX_UNLOCKED(1) << "Updating pinned index to " << (last_idx_in_batch + 1); |
238 | 7.15M | min_pinned_op_index_ = last_idx_in_batch + 1; |
239 | 7.15M | } |
240 | 13.1M | } |
241 | 13.1M | user_callback.Run(log_status); |
242 | 13.1M | } |
243 | | |
244 | 302 | int64_t LogCache::earliest_op_index() const { |
245 | 302 | auto ret = log_->GetLogReader()->GetMinReplicateIndex(); |
246 | 302 | if (ret == -1) { // No finalized log files yet. Query the active log. |
247 | 302 | ret = log_->GetMinReplicateIndex(); |
248 | 302 | } |
249 | 302 | return ret; |
250 | 302 | } |
251 | | |
252 | 15.1M | bool LogCache::HasOpBeenWritten(int64_t index) const { |
253 | 15.1M | std::lock_guard<simple_spinlock> l(lock_); |
254 | 15.1M | return index < next_sequential_op_index_; |
255 | 15.1M | } |
256 | | |
257 | 25.4M | Result<yb::OpId> LogCache::LookupOpId(int64_t op_index) const { |
258 | | // First check the log cache itself. |
259 | 25.4M | { |
260 | 25.4M | std::lock_guard<simple_spinlock> l(lock_); |
261 | | |
262 | | // We sometimes try to look up OpIds that have never been written on |
263 | | // the local node. In that case, don't try to read the op from the |
264 | | // log reader, since it might actually race against the writing of the op. |
265 | 25.4M | if (op_index >= next_sequential_op_index_) { |
266 | 4 | return STATUS(Incomplete, Substitute("Op with index $0 is ahead of the local log " |
267 | 4 | "(next sequential op: $1)", |
268 | 4 | op_index, next_sequential_op_index_)); |
269 | 4 | } |
270 | 25.4M | auto iter = cache_.find(op_index); |
271 | 25.4M | if (iter != cache_.end()) { |
272 | 10.7M | return yb::OpId::FromPB(iter->second.msg->id()); |
273 | 10.7M | } |
274 | 14.7M | } |
275 | | |
276 | | // If it misses, read from the log. |
277 | 14.7M | return log_->GetLogReader()->LookupOpId(op_index); |
278 | 14.7M | } |
279 | | |
280 | | namespace { |
281 | | |
282 | | // Calculate the total byte size that will be used on the wire to replicate this message as part of |
283 | | // a consensus update request. This accounts for the length delimiting and tagging of the message. |
284 | 5.20M | int64_t TotalByteSizeForMessage(const ReplicateMsg& msg) { |
285 | 5.20M | auto msg_size = google::protobuf::internal::WireFormatLite::LengthDelimitedSize( |
286 | 5.20M | msg.ByteSize()); |
287 | 5.20M | msg_size += 1; // for the type tag |
288 | 5.20M | return msg_size; |
289 | 5.20M | } |
290 | | |
291 | | } // anonymous namespace |
292 | | |
293 | 14 | Result<ReadOpsResult> LogCache::ReadOps(int64_t after_op_index, size_t max_size_bytes) { |
294 | 14 | return ReadOps(after_op_index, 0 /* to_op_index */, max_size_bytes); |
295 | 14 | } |
296 | | |
// Reads ops (after_op_index, to_op_index] (or up to the newest op when
// to_op_index <= 0), stopping early when max_size_bytes or the deadline is
// reached. Serves from the cache when possible; otherwise drops the lock and
// reads the missing range from disk. Also fills in the schema/version from the
// segment header or any CHANGE_METADATA_OP encountered.
Result<ReadOpsResult> LogCache::ReadOps(int64_t after_op_index,
                                        int64_t to_op_index,
                                        size_t max_size_bytes,
                                        CoarseTimePoint deadline) {
  DCHECK_GE(after_op_index, 0);

  VLOG_WITH_PREFIX_UNLOCKED(4) << "ReadOps, after_op_index: " << after_op_index
                               << ", to_op_index: " << to_op_index
                               << ", max_size_bytes: " << max_size_bytes;
  ReadOpsResult result;
  int64_t starting_op_segment_seq_num;
  // May return Incomplete if after_op_index is ahead of the local log.
  result.preceding_op = VERIFY_RESULT(LookupOpId(after_op_index));

  std::unique_lock<simple_spinlock> l(lock_);
  int64_t next_index = after_op_index + 1;
  // to_index is exclusive; clamp to what has actually been appended.
  int64_t to_index = to_op_index > 0
      ? std::min(to_op_index + 1, next_sequential_op_index_)
      : next_sequential_op_index_;

  // Remove the deadline if the GetChanges deadline feature is disabled.
  if (!ANNOTATE_UNPROTECTED_READ(FLAGS_get_changes_honor_deadline)) {
    deadline = CoarseTimePoint::max();
  }

  // Return as many operations as we can, up to the limit.
  int64_t remaining_space = max_size_bytes;
  while (remaining_space >= 0 && next_index < to_index) {
    // Stop reading if a deadline was specified and the deadline has been exceeded.
    if (deadline != CoarseTimePoint::max() && CoarseMonoClock::Now() >= deadline) {
      break;
    }

    // If the messages the peer needs haven't been loaded into the queue yet, load them.
    MessageCache::const_iterator iter = cache_.lower_bound(next_index);
    if (iter == cache_.end() || iter->first != next_index) {
      int64_t up_to;
      if (iter == cache_.end()) {
        // Read all the way to the current op.
        up_to = to_index - 1;
      } else {
        // Read up to the next entry that's in the cache or to_index whichever is lesser.
        up_to = std::min(iter->first - 1, to_index - 1);
      }

      // Drop the lock across the disk read; cache/indexes may change meanwhile,
      // which is why next_index/to_index were captured above.
      l.unlock();

      ReplicateMsgs raw_replicate_ptrs;
      RETURN_NOT_OK_PREPEND(
          log_->GetLogReader()->ReadReplicatesInRange(
              next_index, up_to, remaining_space, &raw_replicate_ptrs, &starting_op_segment_seq_num,
              &result.header_schema, &(result.header_schema_version), deadline),
          Substitute("Failed to read ops $0..$1", next_index, up_to));

      // Fall back to the segment header's schema when the read didn't yield one.
      if ((starting_op_segment_seq_num != -1) && !result.header_schema.IsInitialized()) {
        scoped_refptr<log::ReadableLogSegment> segment =
            log_->GetLogReader()->GetSegmentBySequenceNumber(starting_op_segment_seq_num);

        if (segment != nullptr && segment->header().has_unused_schema()) {
          result.header_schema.CopyFrom(segment->header().unused_schema());
          result.header_schema_version = segment->header().unused_schema_version();
        }
      }

      metrics_.disk_reads->IncrementBy(raw_replicate_ptrs.size());
      LOG_WITH_PREFIX_UNLOCKED(INFO)
          << "Successfully read " << raw_replicate_ptrs.size() << " ops from disk.";
      l.lock();

      for (auto& msg : raw_replicate_ptrs) {
        CHECK_EQ(next_index, msg->id().index());

        auto current_message_size = TotalByteSizeForMessage(*msg);
        remaining_space -= current_message_size;
        // Always return at least one message even if it busts the size budget.
        if (remaining_space < 0 && !result.messages.empty()) {
          break;
        }
        result.messages.push_back(msg);
        if (msg->op_type() == consensus::OperationType::CHANGE_METADATA_OP) {
          result.header_schema.CopyFrom(msg->change_metadata_request().schema());
          result.header_schema_version = msg->change_metadata_request().schema_version();
        }
        result.read_from_disk_size += current_message_size;
        next_index++;
      }
    } else {
      starting_op_segment_seq_num = VERIFY_RESULT(log_->GetLogReader()->LookupHeader(next_index));

      if ((starting_op_segment_seq_num != -1)) {
        scoped_refptr<log::ReadableLogSegment> segment =
            log_->GetLogReader()->GetSegmentBySequenceNumber(starting_op_segment_seq_num);
        if (segment != nullptr && segment->header().has_unused_schema()) {
          result.header_schema.CopyFrom(segment->header().unused_schema());
          result.header_schema_version = segment->header().unused_schema_version();
        }
      }
      // Pull contiguous messages from the cache until the size limit is achieved.
      for (; iter != cache_.end(); ++iter) {
        if (to_op_index > 0 && next_index > to_op_index) {
          break;
        }
        const ReplicateMsgPtr& msg = iter->second.msg;
        int64_t index = msg->id().index();
        if (index != next_index) {
          continue;
        }

        auto current_message_size = TotalByteSizeForMessage(*msg);
        remaining_space -= current_message_size;
        // Same "at least one message" rule as the disk path above.
        if (remaining_space < 0 && !result.messages.empty()) {
          break;
        }

        result.messages.push_back(msg);
        if (msg->op_type() == consensus::OperationType::CHANGE_METADATA_OP) {
          result.header_schema.CopyFrom(msg->change_metadata_request().schema());
          result.header_schema_version = msg->change_metadata_request().schema_version();
        }
        next_index++;
      }
    }
  }
  // Negative remaining space means we stopped due to the size budget, i.e.
  // there are more messages the caller hasn't seen yet.
  result.have_more_messages = remaining_space < 0;
  return result;
}
421 | | |
422 | 23.3M | size_t LogCache::EvictThroughOp(int64_t index, int64_t bytes_to_evict) { |
423 | 23.3M | std::lock_guard<simple_spinlock> lock(lock_); |
424 | 23.3M | return EvictSomeUnlocked(index, bytes_to_evict); |
425 | 23.3M | } |
426 | | |
// Evicts cached ops with index <= stop_after_index, stopping early once
// bytes_to_evict bytes have been freed or a pinned (not-yet-durable) op is
// reached. Returns the number of bytes evicted. Caller must hold lock_.
size_t LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evict) {
  DCHECK(lock_.is_locked());
  VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting log cache index <= "
                               << stop_after_index
                               << " or " << HumanReadableNumBytes::ToString(bytes_to_evict)
                               << ": before state: " << ToStringUnlocked();

  if (ANNOTATE_UNPROTECTED_READ(FLAGS_TEST_log_cache_skip_eviction)) {
    return 0;
  }

  int64_t bytes_evicted = 0;
  // Map iteration is in ascending index order, so we always evict oldest-first.
  for (auto iter = cache_.begin(); iter != cache_.end();) {
    const CacheEntry& entry = iter->second;
    const ReplicateMsgPtr& msg = entry.msg;
    VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->id();
    int64_t msg_index = msg->id().index();
    if (msg_index == 0) {
      // Always keep our special '0' op.
      ++iter;
      continue;
    }

    // Stop at the first op past the requested index or still pinned
    // (pinned ops haven't been durably appended yet).
    if (msg_index > stop_after_index || msg_index >= min_pinned_op_index_) {
      break;
    }

    VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting cache. Removing: " << msg->id();
    AccountForMessageRemovalUnlocked(entry);
    bytes_evicted += entry.mem_usage;
    // Post-increment keeps the iterator valid across the erase.
    cache_.erase(iter++);

    if (bytes_evicted >= bytes_to_evict) {
      break;
    }
  }
  VLOG_WITH_PREFIX_UNLOCKED(1) << "Evicting log cache: after state: " << ToStringUnlocked();

  return bytes_evicted;
}
467 | | |
// Flushes the underlying log's index to durable storage. Thin pass-through.
Status LogCache::FlushIndex() {
  return log_->FlushIndex();
}
471 | | |
// Updates memtracker and metrics to reflect the removal of 'entry' from the
// cache. Caller must hold lock_. Only entries marked 'tracked' (see
// TrackOperationsMemory) ever consumed tracker memory, so only those release it.
void LogCache::AccountForMessageRemovalUnlocked(const CacheEntry& entry) {
  if (entry.tracked) {
    tracker_->Release(entry.mem_usage);
  }
  metrics_.size->DecrementBy(entry.mem_usage);
  metrics_.num_ops->Decrement();
}
479 | | |
// Bytes of tracked memory currently consumed by this tablet's log cache.
int64_t LogCache::BytesUsed() const {
  return tracker_->consumption();
}
483 | | |
484 | 3 | Result<OpId> LogCache::TEST_GetLastOpIdWithType(int64_t max_allowed_index, OperationType op_type) { |
485 | 3 | constexpr int kStepSize = 20; |
486 | 6 | for (auto end = max_allowed_index; end > 0; end -= kStepSize) { |
487 | 3 | auto result = VERIFY_RESULT(ReadOps( |
488 | 3 | std::max<int64_t>(0, end - kStepSize), end, std::numeric_limits<int>::max())); |
489 | 6 | for (auto it = result.messages.end(); it != result.messages.begin();) { |
490 | 3 | --it; |
491 | 3 | if ((**it).op_type() == op_type) { |
492 | 0 | return OpId::FromPB((**it).id()); |
493 | 0 | } |
494 | 3 | } |
495 | 3 | } |
496 | 3 | return STATUS_FORMAT(NotFound, "Operation of type $0 not found before $1", |
497 | 3 | OperationType_Name(op_type), max_allowed_index); |
498 | 3 | } |
499 | | |
500 | 94 | string LogCache::StatsString() const { |
501 | 94 | std::lock_guard<simple_spinlock> lock(lock_); |
502 | 94 | return StatsStringUnlocked(); |
503 | 94 | } |
504 | | |
505 | 97 | string LogCache::StatsStringUnlocked() const { |
506 | 97 | return Substitute("LogCacheStats(num_ops=$0, bytes=$1, disk_reads=$2)", |
507 | 97 | metrics_.num_ops->value(), |
508 | 97 | metrics_.size->value(), |
509 | 97 | metrics_.disk_reads->value()); |
510 | 97 | } |
511 | | |
512 | 1 | std::string LogCache::ToString() const { |
513 | 1 | std::lock_guard<simple_spinlock> lock(lock_); |
514 | 1 | return ToStringUnlocked(); |
515 | 1 | } |
516 | | |
517 | 3 | std::string LogCache::ToStringUnlocked() const { |
518 | 3 | return Substitute("Pinned index: $0, $1", |
519 | 3 | min_pinned_op_index_, |
520 | 3 | StatsStringUnlocked()); |
521 | 3 | } |
522 | | |
// Log-message prefix identifying this tablet/peer. Safe without lock_ since
// tablet_id_ and local_uuid_ are set once in the constructor.
std::string LogCache::LogPrefixUnlocked() const {
  return MakeTabletLogPrefix(tablet_id_, local_uuid_);
}
526 | | |
527 | 0 | void LogCache::DumpToLog() const { |
528 | 0 | vector<string> strings; |
529 | 0 | DumpToStrings(&strings); |
530 | 0 | for (const string& s : strings) { |
531 | 0 | LOG_WITH_PREFIX_UNLOCKED(INFO) << s; |
532 | 0 | } |
533 | 0 | } |
534 | | |
535 | 0 | void LogCache::DumpToStrings(vector<string>* lines) const { |
536 | 0 | std::lock_guard<simple_spinlock> lock(lock_); |
537 | 0 | int counter = 0; |
538 | 0 | lines->push_back(ToStringUnlocked()); |
539 | 0 | lines->push_back("Messages:"); |
540 | 0 | for (const auto& entry : cache_) { |
541 | 0 | const ReplicateMsgPtr msg = entry.second.msg; |
542 | 0 | lines->push_back( |
543 | 0 | Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4", |
544 | 0 | counter++, msg->id().term(), msg->id().index(), |
545 | 0 | OperationType_Name(msg->op_type()), |
546 | 0 | msg->ByteSize())); |
547 | 0 | } |
548 | 0 | } |
549 | | |
550 | 47 | void LogCache::DumpToHtml(std::ostream& out) const { |
551 | 47 | using std::endl; |
552 | | |
553 | 47 | std::lock_guard<simple_spinlock> lock(lock_); |
554 | 47 | out << "<h3>Messages:</h3>" << endl; |
555 | 47 | out << "<table>" << endl; |
556 | 47 | out << "<tr><th>Entry</th><th>OpId</th><th>Type</th><th>Size</th><th>Status</th></tr>" << endl; |
557 | | |
558 | 47 | int counter = 0; |
559 | 59 | for (const auto& entry : cache_) { |
560 | 59 | const ReplicateMsgPtr msg = entry.second.msg; |
561 | 59 | out << Substitute("<tr><th>$0</th><th>$1.$2</th><td>REPLICATE $3</td>" |
562 | 59 | "<td>$4</td><td>$5</td></tr>", |
563 | 59 | counter++, msg->id().term(), msg->id().index(), |
564 | 59 | OperationType_Name(msg->op_type()), |
565 | 59 | msg->ByteSize(), msg->id().ShortDebugString()) << endl; |
566 | 59 | } |
567 | 47 | out << "</table>"; |
568 | 47 | } |
569 | | |
// Charges the memtracker for the given ops (marking their cache entries
// 'tracked' so AccountForMessageRemovalUnlocked releases the memory later).
// If the tracker is over its limit, forces the consumption anyway and evicts
// unpinned entries to compensate.
void LogCache::TrackOperationsMemory(const OpIds& op_ids) {
  if (op_ids.empty()) {
    return;
  }

  std::lock_guard<simple_spinlock> lock(lock_);

  // Only count ops still present in the cache under the same term — an op may
  // have been evicted or overwritten (different term) since it was appended.
  size_t mem_required = 0;
  for (const auto& op_id : op_ids) {
    auto it = cache_.find(op_id.index);
    if (it != cache_.end() && it->second.msg->id().term() == op_id.term) {
      mem_required += it->second.mem_usage;
      it->second.tracked = true;
    }
  }

  if (mem_required == 0) {
    return;
  }

  // Try to consume the memory. If it can't be consumed, we may need to evict.
  if (!tracker_->TryConsume(mem_required)) {
    auto spare = tracker_->SpareCapacity();
    auto need_to_free = mem_required - spare;
    VLOG_WITH_PREFIX_UNLOCKED(1)
        << "Memory limit would be exceeded trying to append "
        << HumanReadableNumBytes::ToString(mem_required)
        << " to log cache (available="
        << HumanReadableNumBytes::ToString(spare)
        << "): attempting to evict some operations...";

    // Consume unconditionally (possibly taking the tracker over its limit),
    // then evict unpinned entries to bring usage back down.
    tracker_->Consume(mem_required);

    // TODO: we should also try to evict from other tablets - probably better to evict really old
    // ops from another tablet than evict recent ops from this one.
    EvictSomeUnlocked(min_pinned_op_index_, need_to_free);
  }
}
608 | | |
// Current number of cached operations, as reported by the num_ops gauge.
int64_t LogCache::num_cached_ops() const {
  return metrics_.num_ops->value();
}
612 | | |
// Expands to e.g. num_ops(METRIC_log_cache_num_ops.Instantiate(metric_entity, 0)),
// binding each Metrics member to its METRIC_DEFINE_* declaration near the top
// of this file. Gauges get an initial value; the counter takes no extra args.
#define INSTANTIATE_METRIC(x, ...) \
  x(BOOST_PP_CAT(METRIC_log_cache_, x).Instantiate(metric_entity, ## __VA_ARGS__))
LogCache::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
  : INSTANTIATE_METRIC(num_ops, 0),
    INSTANTIATE_METRIC(size, 0),
    INSTANTIATE_METRIC(disk_reads) {
}
#undef INSTANTIATE_METRIC
621 | | |
622 | | } // namespace consensus |
623 | | } // namespace yb |