YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/consensus/log_cache.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/log_cache.h"
34
35
#include <algorithm>
36
#include <map>
37
#include <mutex>
38
#include <vector>
39
40
#include "yb/consensus/consensus_util.h"
41
#include "yb/consensus/log.h"
42
#include "yb/consensus/log_reader.h"
43
#include "yb/consensus/opid_util.h"
44
45
#include "yb/gutil/bind.h"
46
#include "yb/gutil/map-util.h"
47
#include "yb/gutil/strings/human_readable.h"
48
49
#include "yb/util/flag_tags.h"
50
#include "yb/util/format.h"
51
#include "yb/util/locks.h"
52
#include "yb/util/logging.h"
53
#include "yb/util/mem_tracker.h"
54
#include "yb/util/metrics.h"
55
#include "yb/util/monotime.h"
56
#include "yb/util/result.h"
57
#include "yb/util/size_literals.h"
58
#include "yb/util/status_format.h"
59
60
using namespace std::literals;

// Per-tablet cap on the total size of consensus entries kept in memory.
DEFINE_int32(log_cache_size_limit_mb, 128,
             "The total per-tablet size of consensus entries which may be kept in memory. "
             "The log cache attempts to keep all entries which have not yet been replicated "
             "to all followers in memory, but if the total size of those entries exceeds "
             "this limit within an individual tablet, the oldest will be evicted.");
TAG_FLAG(log_cache_size_limit_mb, advanced);

// Server-wide absolute cap across all tablets, in megabytes.
DEFINE_int32(global_log_cache_size_limit_mb, 1024,
             "Server-wide version of 'log_cache_size_limit_mb'. The total memory used for "
             "caching log entries across all tablets is kept under this threshold.");
TAG_FLAG(global_log_cache_size_limit_mb, advanced);

// Server-wide cap expressed as a percentage of the root mem tracker's limit.
// The effective global cap is the minimum of this and the absolute MB flag
// (see LogCache::GetServerMemTracker).
DEFINE_int32(global_log_cache_size_limit_percentage, 5,
             "The maximum percentage of root process memory that can be used for caching log "
             "entries across all tablets. Default is 5.");
TAG_FLAG(global_log_cache_size_limit_percentage, advanced);

// Test-only escape hatch that disables eviction entirely.
DEFINE_test_flag(bool, log_cache_skip_eviction, false,
                 "Don't evict log entries in tests.");

using strings::Substitute;

// Per-tablet gauges/counters exported by the log cache.
METRIC_DEFINE_gauge_int64(tablet, log_cache_num_ops, "Log Cache Operation Count",
                          yb::MetricUnit::kOperations,
                          "Number of operations in the log cache.");
METRIC_DEFINE_gauge_int64(tablet, log_cache_size, "Log Cache Memory Usage",
                          yb::MetricUnit::kBytes,
                          "Amount of memory in use for caching the local log.");
METRIC_DEFINE_counter(tablet, log_cache_disk_reads, "Log Cache Disk Reads",
                      yb::MetricUnit::kEntries,
                      "Amount of operations read from disk.");

DECLARE_bool(get_changes_honor_deadline);

namespace yb {
namespace consensus {

namespace {

// Name of the shared server-wide parent mem tracker; also reused as the
// metric-entity name on each per-tablet child tracker.
const std::string kParentMemTrackerId = "log_cache"s;

}
104
105
// Iterator over a read-only sequence of raw ReplicateMsg pointers.
// Modernized from `typedef` to a `using` alias (same name, same type).
using MsgIter = vector<const ReplicateMsg*>::const_iterator;
107
// Constructs the per-tablet log cache.
//
// Sets up a two-level mem-tracker hierarchy: a shared server-wide
// "log_cache" parent (see GetServerMemTracker) and a per-tablet child capped
// by --log_cache_size_limit_mb. Also seeds the cache with a sentinel op at
// index 0 so the rest of the code never has to handle an empty map.
LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
                   const log::LogPtr& log,
                   const MemTrackerPtr& server_tracker,
                   const string& local_uuid,
                   const string& tablet_id)
  : log_(log),
    local_uuid_(local_uuid),
    tablet_id_(tablet_id),
    next_sequential_op_index_(0),
    min_pinned_op_index_(0),
    metrics_(metric_entity) {

  const int64_t max_ops_size_bytes = FLAGS_log_cache_size_limit_mb * 1_MB;

  // Set up (or reuse) a tracker with the global limit. It is parented directly to the root tracker
  // so that it's always global.
  parent_tracker_ = GetServerMemTracker(server_tracker);

  // And create a child tracker with the per-tablet limit.
  tracker_ = MemTracker::CreateTracker(
      max_ops_size_bytes, Format("$0-$1", kParentMemTrackerId, tablet_id), parent_tracker_,
      AddToParent::kTrue, CreateMetrics::kFalse);
  tracker_->SetMetricEntity(metric_entity, kParentMemTrackerId);

  // Put a fake message at index 0, since this simplifies a lot of our code paths elsewhere.
  auto zero_op = std::make_shared<ReplicateMsg>();
  *zero_op->mutable_id() = MinimumOpId();
  InsertOrDie(&cache_, 0, { zero_op, zero_op->SpaceUsed() });
}
136
137
167k
// Returns (creating on first use) the server-wide "log_cache" mem tracker,
// parented under 'server_tracker'. Its limit is the smaller of
// --global_log_cache_size_limit_mb and --global_log_cache_size_limit_percentage
// percent of the root tracker's limit.
MemTrackerPtr LogCache::GetServerMemTracker(const MemTrackerPtr& server_tracker) {
  CHECK(FLAGS_global_log_cache_size_limit_percentage > 0 &&
        FLAGS_global_log_cache_size_limit_percentage <= 100)
    // BUG FIX: these two literals were previously separate Substitute()
    // arguments (a stray comma between them), so the format string had no $0
    // placeholder and the flag's actual value never appeared in the message.
    // They are now concatenated into a single format string.
    << Substitute("Flag FLAGS_global_log_cache_size_limit_percentage must be between 0 and 100. "
                  "Current value: $0",
                  FLAGS_global_log_cache_size_limit_percentage);

  int64_t global_max_ops_size_bytes = FLAGS_global_log_cache_size_limit_mb * 1_MB;
  int64_t root_mem_limit = MemTracker::GetRootTracker()->limit();
  // Apply the percentage-of-root cap on top of the absolute cap.
  global_max_ops_size_bytes = std::min(
      global_max_ops_size_bytes,
      root_mem_limit * FLAGS_global_log_cache_size_limit_percentage / 100);
  return MemTracker::FindOrCreateTracker(
      global_max_ops_size_bytes, kParentMemTrackerId, server_tracker);
}
152
153
75.6k
LogCache::~LogCache() {
  // Release everything we ever consumed from the tracker before dropping the
  // entries themselves, then detach the per-tablet tracker from its parent.
  tracker_->Release(tracker_->consumption());
  cache_.clear();

  tracker_->UnregisterFromParent();
}
159
160
150k
// Positions the cache to start appending immediately after 'preceding_op'.
// Must be called before any appends; at this point the cache must still hold
// only the sentinel '0' entry inserted by the constructor.
void LogCache::Init(const OpIdPB& preceding_op) {
  std::lock_guard<simple_spinlock> l(lock_);
  CHECK_EQ(cache_.size(), 1) << "Cache should have only our special '0' op";
  next_sequential_op_index_ = preceding_op.index() + 1;
  min_pinned_op_index_ = next_sequential_op_index_;
}
166
167
13.2M
// Sizes the batch and inserts it into the cache, evicting any ops it
// overwrites. 'msgs' must be non-empty (front()/back() are dereferenced).
// Note: the entries are inserted untracked; their memory is consumed from the
// tracker later via TrackOperationsMemory.
LogCache::PrepareAppendResult LogCache::PrepareAppendOperations(const ReplicateMsgs& msgs) {
  // SpaceUsed is relatively expensive, so do calculations outside the lock
  PrepareAppendResult result;
  std::vector<CacheEntry> entries_to_insert;
  entries_to_insert.reserve(msgs.size());
  for (const auto& msg : msgs) {
    CacheEntry e = { msg, static_cast<int64_t>(msg->SpaceUsedLong()) };
    result.mem_required += e.mem_usage;
    entries_to_insert.emplace_back(std::move(e));
  }

  int64_t first_idx_in_batch = msgs.front()->id().index();
  result.last_idx_in_batch = msgs.back()->id().index();

  std::unique_lock<simple_spinlock> lock(lock_);
  // If we're not appending a consecutive op we're likely overwriting and need to replace operations
  // in the cache.
  if (first_idx_in_batch != next_sequential_op_index_) {
    // If the index is not consecutive then it must be lower than or equal to the last index, i.e.
    // we're overwriting.
    CHECK_LE(first_idx_in_batch, next_sequential_op_index_);

    // Now remove the overwritten operations.
    for (int64_t i = first_idx_in_batch; i < next_sequential_op_index_; ++i) {
      auto it = cache_.find(i);
      if (it != cache_.end()) {
        // Fix up tracker/metric accounting for the entry being replaced.
        AccountForMessageRemovalUnlocked(it->second);
        cache_.erase(it);
      }
    }
  }

  for (auto& e : entries_to_insert) {
    auto index = e.msg->id().index();
    EmplaceOrDie(&cache_, index, std::move(e));
    next_sequential_op_index_ = index + 1;
  }

  return result;
}
207
208
// Inserts 'msgs' into the cache (if non-empty) and hands them to the log for
// asynchronous durability. 'callback' is invoked — via LogCallback, which
// also advances the pinned index — once the log append completes. Returns a
// bad status if the async append could not even be started.
Status LogCache::AppendOperations(const ReplicateMsgs& msgs, const yb::OpId& committed_op_id,
                                  RestartSafeCoarseTimePoint batch_mono_time,
                                  const StatusCallback& callback) {
  PrepareAppendResult prepare_result;
  if (!msgs.empty()) {
    prepare_result = PrepareAppendOperations(msgs);
  }

  Status log_status = log_->AsyncAppendReplicates(
    msgs, committed_op_id, batch_mono_time,
    Bind(&LogCache::LogCallback, Unretained(this), prepare_result.last_idx_in_batch, callback));

  if (!log_status.ok()) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Couldn't append to log: " << log_status;
    return log_status;
  }

  // Metrics are only updated once the async append was successfully queued.
  metrics_.size->IncrementBy(prepare_result.mem_required);
  metrics_.num_ops->IncrementBy(msgs.size());

  return Status::OK();
}
230
231
// Invoked by the log when an async append finishes. On success, advances the
// pinned index past the batch (entries up to last_idx_in_batch become
// eligible for eviction), then forwards the status to the user's callback.
void LogCache::LogCallback(int64_t last_idx_in_batch,
                           const StatusCallback& user_callback,
                           const Status& log_status) {
  if (log_status.ok()) {
    std::lock_guard<simple_spinlock> l(lock_);
    if (min_pinned_op_index_ <= last_idx_in_batch) {
      VLOG_WITH_PREFIX_UNLOCKED(1) << "Updating pinned index to " << (last_idx_in_batch + 1);
      min_pinned_op_index_ = last_idx_in_batch + 1;
    }
  }
  // Run the user callback outside the lock.
  user_callback.Run(log_status);
}
243
244
606
int64_t LogCache::earliest_op_index() const {
245
606
  auto ret = log_->GetLogReader()->GetMinReplicateIndex();
246
606
  if (ret == -1) { // No finalized log files yet.  Query the active log.
247
606
    ret = log_->GetMinReplicateIndex();
248
606
  }
249
606
  return ret;
250
606
}
251
252
34.7M
bool LogCache::HasOpBeenWritten(int64_t index) const {
253
34.7M
  std::lock_guard<simple_spinlock> l(lock_);
254
34.7M
  return index < next_sequential_op_index_;
255
34.7M
}
256
257
60.3M
// Resolves the full OpId (term + index) for 'op_index'. Consults the
// in-memory cache first and falls back to the on-disk log reader on a miss.
// Returns Incomplete when the index has not been written locally yet.
Result<yb::OpId> LogCache::LookupOpId(int64_t op_index) const {
  // First check the log cache itself.
  {
    std::lock_guard<simple_spinlock> l(lock_);

    // We sometimes try to look up OpIds that have never been written on
    // the local node. In that case, don't try to read the op from the
    // log reader, since it might actually race against the writing of the op.
    if (op_index >= next_sequential_op_index_) {
      return STATUS(Incomplete, Substitute("Op with index $0 is ahead of the local log "
                                          "(next sequential op: $1)",
                                          op_index, next_sequential_op_index_));
    }
    auto iter = cache_.find(op_index);
    if (iter != cache_.end()) {
      return yb::OpId::FromPB(iter->second.msg->id());
    }
  }

  // If it misses, read from the log. Note: the lock is released first.
  return log_->GetLogReader()->LookupOpId(op_index);
}
279
280
namespace {

// Calculate the total byte size that will be used on the wire to replicate this message as part of
// a consensus update request. This accounts for the length delimiting and tagging of the message.
int64_t TotalByteSizeForMessage(const ReplicateMsg& msg) {
  // Length-delimited protobuf field size: varint length prefix + payload.
  auto msg_size = google::protobuf::internal::WireFormatLite::LengthDelimitedSize(
    msg.ByteSize());
  msg_size += 1; // for the type tag
  return msg_size;
}

} // anonymous namespace
292
293
14
// Convenience overload: read with no upper bound on the op index.
Result<ReadOpsResult> LogCache::ReadOps(int64_t after_op_index, size_t max_size_bytes) {
  constexpr int64_t kNoToOpIndex = 0;  // 0 disables the to_op_index bound
  return ReadOps(after_op_index, kNoToOpIndex, max_size_bytes);
}
296
297
// Reads ops starting strictly after 'after_op_index', optionally bounded by
// 'to_op_index' (inclusive; 0 means unbounded), stopping once roughly
// 'max_size_bytes' of messages have been collected or 'deadline' has passed.
//
// Serves contiguous runs from the in-memory cache and falls back to reading
// the on-disk log for ranges that have been evicted; the spinlock is dropped
// for the duration of each disk read and re-acquired afterwards.
// result.preceding_op is the OpId at 'after_op_index' itself.
Result<ReadOpsResult> LogCache::ReadOps(int64_t after_op_index,
                                        int64_t to_op_index,
                                        size_t max_size_bytes,
                                        CoarseTimePoint deadline) {
  DCHECK_GE(after_op_index, 0);

  VLOG_WITH_PREFIX_UNLOCKED(4) << "ReadOps, after_op_index: " << after_op_index
                               << ", to_op_index: " << to_op_index
                               << ", max_size_bytes: " << max_size_bytes;
  ReadOpsResult result;
  int64_t starting_op_segment_seq_num;
  result.preceding_op = VERIFY_RESULT(LookupOpId(after_op_index));

  std::unique_lock<simple_spinlock> l(lock_);
  int64_t next_index = after_op_index + 1;
  // Exclusive upper bound on the indexes to read.
  int64_t to_index = to_op_index > 0
      ? std::min(to_op_index + 1, next_sequential_op_index_)
      : next_sequential_op_index_;

  // Remove the deadline if the GetChanges deadline feature is disabled.
  if (!ANNOTATE_UNPROTECTED_READ(FLAGS_get_changes_honor_deadline)) {
    deadline = CoarseTimePoint::max();
  }

  // Return as many operations as we can, up to the limit.
  int64_t remaining_space = max_size_bytes;
  while (remaining_space >= 0 && next_index < to_index) {
    // Stop reading if a deadline was specified and the deadline has been exceeded.
    if (deadline != CoarseTimePoint::max() && CoarseMonoClock::Now() >= deadline) {
      break;
    }

    // If the messages the peer needs haven't been loaded into the queue yet, load them.
    MessageCache::const_iterator iter = cache_.lower_bound(next_index);
    if (iter == cache_.end() || iter->first != next_index) {
      // Cache miss for next_index: read the gap [next_index, up_to] from disk.
      int64_t up_to;
      if (iter == cache_.end()) {
        // Read all the way to the current op.
        up_to = to_index - 1;
      } else {
        // Read up to the next entry that's in the cache or to_index whichever is lesser.
        up_to = std::min(iter->first - 1, to_index - 1);
      }

      // Drop the spinlock across the (slow) disk read.
      l.unlock();

      ReplicateMsgs raw_replicate_ptrs;
      RETURN_NOT_OK_PREPEND(
          log_->GetLogReader()->ReadReplicatesInRange(
              next_index, up_to, remaining_space, &raw_replicate_ptrs, &starting_op_segment_seq_num,
              &result.header_schema, &(result.header_schema_version), deadline),
          Substitute("Failed to read ops $0..$1", next_index, up_to));

      // If the read didn't populate the header schema, try the segment header.
      if ((starting_op_segment_seq_num != -1) && !result.header_schema.IsInitialized()) {
        scoped_refptr<log::ReadableLogSegment> segment =
            log_->GetLogReader()->GetSegmentBySequenceNumber(starting_op_segment_seq_num);

        if (segment != nullptr && segment->header().has_unused_schema()) {
          result.header_schema.CopyFrom(segment->header().unused_schema());
          result.header_schema_version = segment->header().unused_schema_version();
        }
      }

      metrics_.disk_reads->IncrementBy(raw_replicate_ptrs.size());
      LOG_WITH_PREFIX_UNLOCKED(INFO)
          << "Successfully read " << raw_replicate_ptrs.size() << " ops from disk.";
      l.lock();

      for (auto& msg : raw_replicate_ptrs) {
        CHECK_EQ(next_index, msg->id().index());

        auto current_message_size = TotalByteSizeForMessage(*msg);
        remaining_space -= current_message_size;
        // Always return at least one message even if it busts the size limit.
        if (remaining_space < 0 && !result.messages.empty()) {
          break;
        }
        result.messages.push_back(msg);
        if (msg->op_type() == consensus::OperationType::CHANGE_METADATA_OP) {
          result.header_schema.CopyFrom(msg->change_metadata_request().schema());
          result.header_schema_version = msg->change_metadata_request().schema_version();
        }
        result.read_from_disk_size += current_message_size;
        next_index++;
      }
    } else {
      // Cache hit path.
      starting_op_segment_seq_num = VERIFY_RESULT(log_->GetLogReader()->LookupHeader(next_index));

      if ((starting_op_segment_seq_num != -1)) {
        scoped_refptr<log::ReadableLogSegment> segment =
            log_->GetLogReader()->GetSegmentBySequenceNumber(starting_op_segment_seq_num);
        if (segment != nullptr && segment->header().has_unused_schema()) {
          result.header_schema.CopyFrom(segment->header().unused_schema());
          result.header_schema_version = segment->header().unused_schema_version();
        }
      }
      // Pull contiguous messages from the cache until the size limit is achieved.
      for (; iter != cache_.end(); ++iter) {
        if (to_op_index > 0 && next_index > to_op_index) {
          break;
        }
        const ReplicateMsgPtr& msg = iter->second.msg;
        int64_t index = msg->id().index();
        if (index != next_index) {
          continue;
        }

        auto current_message_size = TotalByteSizeForMessage(*msg);
        remaining_space -= current_message_size;
        // Always return at least one message even if it busts the size limit.
        if (remaining_space < 0 && !result.messages.empty()) {
          break;
        }

        result.messages.push_back(msg);
        if (msg->op_type() == consensus::OperationType::CHANGE_METADATA_OP) {
          result.header_schema.CopyFrom(msg->change_metadata_request().schema());
          result.header_schema_version = msg->change_metadata_request().schema_version();
        }
        next_index++;
      }
    }
  }
  result.have_more_messages = HaveMoreMessages(remaining_space < 0);
  return result;
}
421
422
49.7M
size_t LogCache::EvictThroughOp(int64_t index, int64_t bytes_to_evict) {
423
49.7M
  std::lock_guard<simple_spinlock> lock(lock_);
424
49.7M
  return EvictSomeUnlocked(index, bytes_to_evict);
425
49.7M
}
426
427
49.7M
// Evicts cached entries with index <= stop_after_index until either
// 'bytes_to_evict' bytes have been freed or no further entry is evictable.
// The sentinel '0' op and any entry at or above min_pinned_op_index_ are
// never evicted. Returns the number of bytes freed. Caller must hold lock_.
size_t LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evict) {
  DCHECK(lock_.is_locked());
  VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting log cache index <= "
                      << stop_after_index
                      << " or " << HumanReadableNumBytes::ToString(bytes_to_evict)
                      << ": before state: " << ToStringUnlocked();

  // Test-only: eviction can be disabled entirely.
  if (ANNOTATE_UNPROTECTED_READ(FLAGS_TEST_log_cache_skip_eviction)) {
    return 0;
  }

  int64_t bytes_evicted = 0;
  // Entries are iterated in index order; stop at the first non-evictable one.
  for (auto iter = cache_.begin(); iter != cache_.end();) {
    const CacheEntry& entry = iter->second;
    const ReplicateMsgPtr& msg = entry.msg;
    VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->id();
    int64_t msg_index = msg->id().index();
    if (msg_index == 0) {
      // Always keep our special '0' op.
      ++iter;
      continue;
    }

    if (msg_index > stop_after_index || msg_index >= min_pinned_op_index_) {
      break;
    }

    VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting cache. Removing: " << msg->id();
    AccountForMessageRemovalUnlocked(entry);
    bytes_evicted += entry.mem_usage;
    // Post-increment keeps the iterator valid across the erase.
    cache_.erase(iter++);

    if (bytes_evicted >= bytes_to_evict) {
      break;
    }
  }
  VLOG_WITH_PREFIX_UNLOCKED(1) << "Evicting log cache: after state: " << ToStringUnlocked();

  return bytes_evicted;
}
467
468
0
// Flushes the log's index; purely delegates to the underlying log.
Status LogCache::FlushIndex() {
  return log_->FlushIndex();
}
471
472
14.5M
// Adjusts accounting when 'entry' is removed from the cache: releases its
// memory from the tracker — but only if it was marked tracked by
// TrackOperationsMemory — and decrements the size/op-count metrics.
// Caller must hold lock_.
void LogCache::AccountForMessageRemovalUnlocked(const CacheEntry& entry) {
  if (entry.tracked) {
    tracker_->Release(entry.mem_usage);
  }
  metrics_.size->DecrementBy(entry.mem_usage);
  metrics_.num_ops->Decrement();
}
479
480
7
int64_t LogCache::BytesUsed() const {
481
7
  return tracker_->consumption();
482
7
}
483
484
3
// Test helper: scans backwards from 'max_allowed_index' in windows of
// kStepSize ops and returns the id of the most recent op of 'op_type',
// or NotFound if no such op exists at or below max_allowed_index.
Result<OpId> LogCache::TEST_GetLastOpIdWithType(int64_t max_allowed_index, OperationType op_type) {
  constexpr int kStepSize = 20;
  for (auto end = max_allowed_index; end > 0; end -= kStepSize) {
    auto result = VERIFY_RESULT(ReadOps(
        std::max<int64_t>(0, end - kStepSize), end, std::numeric_limits<int>::max()));
    // Walk each window back-to-front so the latest matching op wins.
    for (auto it = result.messages.end(); it != result.messages.begin();) {
      --it;
      if ((**it).op_type() == op_type) {
        return OpId::FromPB((**it).id());
      }
    }
  }
  return STATUS_FORMAT(NotFound, "Operation of type $0 not found before $1",
                       OperationType_Name(op_type), max_allowed_index);
}
499
500
76
// Thread-safe wrapper around StatsStringUnlocked().
string LogCache::StatsString() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  return StatsStringUnlocked();
}
504
505
81
// Renders the current metric values as a one-line summary.
// Caller must hold lock_ (or otherwise guarantee consistency).
string LogCache::StatsStringUnlocked() const {
  return Substitute("LogCacheStats(num_ops=$0, bytes=$1, disk_reads=$2)",
                    metrics_.num_ops->value(),
                    metrics_.size->value(),
                    metrics_.disk_reads->value());
}
511
512
1
std::string LogCache::ToString() const {
513
1
  std::lock_guard<simple_spinlock> lock(lock_);
514
1
  return ToStringUnlocked();
515
1
}
516
517
5
// Renders the pinned index plus the stats summary. Caller must hold lock_.
std::string LogCache::ToStringUnlocked() const {
  return Substitute("Pinned index: $0, $1",
                    min_pinned_op_index_,
                    StatsStringUnlocked());
}
522
523
2.31k
// Log-message prefix identifying this tablet and local peer.
std::string LogCache::LogPrefixUnlocked() const {
  return MakeTabletLogPrefix(tablet_id_, local_uuid_);
}
526
527
0
void LogCache::DumpToLog() const {
528
0
  vector<string> strings;
529
0
  DumpToStrings(&strings);
530
0
  for (const string& s : strings) {
531
0
    LOG_WITH_PREFIX_UNLOCKED(INFO) << s;
532
0
  }
533
0
}
534
535
0
// Appends a human-readable dump of the cache to 'lines': one state line
// followed by one line per cached message.
void LogCache::DumpToStrings(vector<string>* lines) const {
  std::lock_guard<simple_spinlock> lock(lock_);
  int counter = 0;
  lines->push_back(ToStringUnlocked());
  lines->push_back("Messages:");
  for (const auto& entry : cache_) {
    const ReplicateMsgPtr msg = entry.second.msg;
    lines->push_back(
      Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
                 counter++, msg->id().term(), msg->id().index(),
                 OperationType_Name(msg->op_type()),
                 msg->ByteSize()));
  }
}
549
550
38
// Writes the cache contents to 'out' as an HTML table (one row per cached
// message), for the debug web UI.
void LogCache::DumpToHtml(std::ostream& out) const {
  using std::endl;

  std::lock_guard<simple_spinlock> lock(lock_);
  out << "<h3>Messages:</h3>" << endl;
  out << "<table>" << endl;
  out << "<tr><th>Entry</th><th>OpId</th><th>Type</th><th>Size</th><th>Status</th></tr>" << endl;

  int counter = 0;
  for (const auto& entry : cache_) {
    const ReplicateMsgPtr msg = entry.second.msg;
    out << Substitute("<tr><th>$0</th><th>$1.$2</th><td>REPLICATE $3</td>"
                      "<td>$4</td><td>$5</td></tr>",
                      counter++, msg->id().term(), msg->id().index(),
                      OperationType_Name(msg->op_type()),
                      msg->ByteSize(), msg->id().ShortDebugString()) << endl;
  }
  out << "</table>";
}
569
570
13.0M
// Marks the cache entries matching 'op_ids' as tracked and consumes their
// memory from the per-tablet tracker. If the tracker cannot accommodate the
// required memory, it is force-consumed anyway and eviction of unpinned ops
// is attempted to bring consumption back down.
void LogCache::TrackOperationsMemory(const OpIds& op_ids) {
  if (op_ids.empty()) {
    return;
  }

  std::lock_guard<simple_spinlock> lock(lock_);

  size_t mem_required = 0;
  for (const auto& op_id : op_ids) {
    auto it = cache_.find(op_id.index);
    // Only track entries still present with a matching term — the entry may
    // have been evicted or overwritten by an op from a different term.
    if (it != cache_.end() && it->second.msg->id().term() == op_id.term) {
      mem_required += it->second.mem_usage;
      it->second.tracked = true;
    }
  }

  if (mem_required == 0) {
    return;
  }

  // Try to consume the memory. If it can't be consumed, we may need to evict.
  if (!tracker_->TryConsume(mem_required)) {
    auto spare = tracker_->SpareCapacity();
    auto need_to_free = mem_required - spare;
    VLOG_WITH_PREFIX_UNLOCKED(1)
        << "Memory limit would be exceeded trying to append "
        << HumanReadableNumBytes::ToString(mem_required)
        << " to log cache (available="
        << HumanReadableNumBytes::ToString(spare)
        << "): attempting to evict some operations...";

    // Consume unconditionally; eviction below compensates for the overage.
    tracker_->Consume(mem_required);

    // TODO: we should also try to evict from other tablets - probably better to evict really old
    // ops from another tablet than evict recent ops from this one.
    EvictSomeUnlocked(min_pinned_op_index_, need_to_free);
  }
}
608
609
7
int64_t LogCache::num_cached_ops() const {
610
7
  return metrics_.num_ops->value();
611
7
}
612
613
// Instantiates each metric declared by the METRIC_DEFINE_* statements at the
// top of this file on 'metric_entity'. Gauges start at 0.
#define INSTANTIATE_METRIC(x, ...) \
  x(BOOST_PP_CAT(METRIC_log_cache_, x).Instantiate(metric_entity, ## __VA_ARGS__))
LogCache::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
  : INSTANTIATE_METRIC(num_ops, 0),
    INSTANTIATE_METRIC(size, 0),
    INSTANTIATE_METRIC(disk_reads) {
}
#undef INSTANTIATE_METRIC
621
622
} // namespace consensus
623
} // namespace yb