YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/consensus/log_cache.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/consensus/log_cache.h"
34
35
#include <algorithm>
36
#include <map>
37
#include <mutex>
38
#include <vector>
39
40
#include "yb/consensus/consensus_util.h"
41
#include "yb/consensus/log.h"
42
#include "yb/consensus/log_reader.h"
43
#include "yb/consensus/opid_util.h"
44
45
#include "yb/gutil/bind.h"
46
#include "yb/gutil/map-util.h"
47
#include "yb/gutil/strings/human_readable.h"
48
49
#include "yb/util/flag_tags.h"
50
#include "yb/util/format.h"
51
#include "yb/util/locks.h"
52
#include "yb/util/logging.h"
53
#include "yb/util/mem_tracker.h"
54
#include "yb/util/metrics.h"
55
#include "yb/util/monotime.h"
56
#include "yb/util/result.h"
57
#include "yb/util/size_literals.h"
58
#include "yb/util/status_format.h"
59
60
using namespace std::literals;
61
62
DEFINE_int32(log_cache_size_limit_mb, 128,
63
             "The total per-tablet size of consensus entries which may be kept in memory. "
64
             "The log cache attempts to keep all entries which have not yet been replicated "
65
             "to all followers in memory, but if the total size of those entries exceeds "
66
             "this limit within an individual tablet, the oldest will be evicted.");
67
TAG_FLAG(log_cache_size_limit_mb, advanced);
68
69
DEFINE_int32(global_log_cache_size_limit_mb, 1024,
70
             "Server-wide version of 'log_cache_size_limit_mb'. The total memory used for "
71
             "caching log entries across all tablets is kept under this threshold.");
72
TAG_FLAG(global_log_cache_size_limit_mb, advanced);
73
74
DEFINE_int32(global_log_cache_size_limit_percentage, 5,
75
             "The maximum percentage of root process memory that can be used for caching log "
76
             "entries across all tablets. Default is 5.");
77
TAG_FLAG(global_log_cache_size_limit_percentage, advanced);
78
79
DEFINE_test_flag(bool, log_cache_skip_eviction, false,
80
                 "Don't evict log entries in tests.");
81
82
using strings::Substitute;
83
84
METRIC_DEFINE_gauge_int64(tablet, log_cache_num_ops, "Log Cache Operation Count",
85
                          yb::MetricUnit::kOperations,
86
                          "Number of operations in the log cache.");
87
METRIC_DEFINE_gauge_int64(tablet, log_cache_size, "Log Cache Memory Usage",
88
                          yb::MetricUnit::kBytes,
89
                          "Amount of memory in use for caching the local log.");
90
METRIC_DEFINE_counter(tablet, log_cache_disk_reads, "Log Cache Disk Reads",
91
                      yb::MetricUnit::kEntries,
92
                      "Amount of operations read from disk.");
93
94
DECLARE_bool(get_changes_honor_deadline);
95
96
namespace yb {
97
namespace consensus {
98
99
namespace {
100
101
const std::string kParentMemTrackerId = "log_cache"s;
102
103
}
104
105
typedef vector<const ReplicateMsg*>::const_iterator MsgIter;
106
107
// Builds a per-tablet log cache: wires up the memory trackers (global parent,
// per-tablet child) and seeds the cache with a sentinel op at index 0.
LogCache::LogCache(const scoped_refptr<MetricEntity>& metric_entity,
                   const log::LogPtr& log,
                   const MemTrackerPtr& server_tracker,
                   const string& local_uuid,
                   const string& tablet_id)
  : log_(log),
    local_uuid_(local_uuid),
    tablet_id_(tablet_id),
    next_sequential_op_index_(0),
    min_pinned_op_index_(0),
    metrics_(metric_entity) {

  const int64_t per_tablet_limit_bytes = FLAGS_log_cache_size_limit_mb * 1_MB;

  // The parent tracker enforces the server-wide limit; it hangs off the root
  // tracker so the limit is truly global (created once, reused afterwards).
  parent_tracker_ = GetServerMemTracker(server_tracker);

  // The child tracker enforces the per-tablet limit underneath the parent.
  tracker_ = MemTracker::CreateTracker(
      per_tablet_limit_bytes, Format("$0-$1", kParentMemTrackerId, tablet_id), parent_tracker_,
      AddToParent::kTrue, CreateMetrics::kFalse);
  tracker_->SetMetricEntity(metric_entity, kParentMemTrackerId);

  // Seed index 0 with a sentinel message so lookups/evictions never have to
  // special-case an empty cache.
  auto zero_op = std::make_shared<ReplicateMsg>();
  *zero_op->mutable_id() = MinimumOpId();
  InsertOrDie(&cache_, 0, { zero_op, zero_op->SpaceUsed() });
}
136
137
100k
// Returns (creating on first use) the server-wide "log_cache" mem tracker whose
// limit is the lesser of the absolute flag limit and the configured percentage
// of the root tracker's limit.
//
// BUG FIX: the original CHECK message passed "Current value: $0" as a *second
// argument* to Substitute (a stray comma split the string literal), so the
// actual flag value was never rendered in the failure message. The two pieces
// are now concatenated into a single format string.
MemTrackerPtr LogCache::GetServerMemTracker(const MemTrackerPtr& server_tracker) {
  CHECK(FLAGS_global_log_cache_size_limit_percentage > 0 &&
        FLAGS_global_log_cache_size_limit_percentage <= 100)
    << Substitute("Flag FLAGS_global_log_cache_size_limit_percentage must be between 0 and 100. "
                  "Current value: $0",
                  FLAGS_global_log_cache_size_limit_percentage);

  // Effective global limit = min(absolute MB limit, percentage of root limit).
  int64_t global_max_ops_size_bytes = FLAGS_global_log_cache_size_limit_mb * 1_MB;
  int64_t root_mem_limit = MemTracker::GetRootTracker()->limit();
  global_max_ops_size_bytes = std::min(
      global_max_ops_size_bytes,
      root_mem_limit * FLAGS_global_log_cache_size_limit_percentage / 100);
  return MemTracker::FindOrCreateTracker(
      global_max_ops_size_bytes, kParentMemTrackerId, server_tracker);
}
152
153
47.8k
// Gives back every byte still charged to the tracker, drops all cached
// entries, then detaches the per-tablet tracker from its parent.
LogCache::~LogCache() {
  const auto outstanding_bytes = tracker_->consumption();
  tracker_->Release(outstanding_bytes);
  cache_.clear();

  tracker_->UnregisterFromParent();
}
159
160
88.7k
void LogCache::Init(const OpIdPB& preceding_op) {
161
88.7k
  std::lock_guard<simple_spinlock> l(lock_);
162
0
  CHECK_EQ(cache_.size(), 1) << "Cache should have only our special '0' op";
163
88.7k
  next_sequential_op_index_ = preceding_op.index() + 1;
164
88.7k
  min_pinned_op_index_ = next_sequential_op_index_;
165
88.7k
}
166
167
7.15M
// Inserts a batch of replicate messages into the cache, returning the total
// memory the batch requires and the index of its last op.
//
// Precondition: msgs is non-empty (callers guard this; front()/back() are
// dereferenced unconditionally).
// If the batch does not start at next_sequential_op_index_, it must start at or
// before it — this is the leader-change "overwrite" path, and any cached ops in
// the overlapped range are removed first.
LogCache::PrepareAppendResult LogCache::PrepareAppendOperations(const ReplicateMsgs& msgs) {
  // SpaceUsed is relatively expensive, so do calculations outside the lock
  PrepareAppendResult result;
  std::vector<CacheEntry> entries_to_insert;
  entries_to_insert.reserve(msgs.size());
  for (const auto& msg : msgs) {
    CacheEntry e = { msg, static_cast<int64_t>(msg->SpaceUsedLong()) };
    result.mem_required += e.mem_usage;
    entries_to_insert.emplace_back(std::move(e));
  }

  int64_t first_idx_in_batch = msgs.front()->id().index();
  result.last_idx_in_batch = msgs.back()->id().index();

  std::unique_lock<simple_spinlock> lock(lock_);
  // If we're not appending a consecutive op we're likely overwriting and need to replace operations
  // in the cache.
  if (first_idx_in_batch != next_sequential_op_index_) {
    // If the index is not consecutive then it must be lower than or equal to the last index, i.e.
    // we're overwriting.
    CHECK_LE(first_idx_in_batch, next_sequential_op_index_);

    // Now remove the overwritten operations.
    // Some indexes in the range may already have been evicted, hence the
    // find() rather than an unconditional erase.
    for (int64_t i = first_idx_in_batch; i < next_sequential_op_index_; ++i) {
      auto it = cache_.find(i);
      if (it != cache_.end()) {
        AccountForMessageRemovalUnlocked(it->second);
        cache_.erase(it);
      }
    }
  }

  // Insert the new entries; EmplaceOrDie CHECKs that the slots are now free.
  for (auto& e : entries_to_insert) {
    auto index = e.msg->id().index();
    EmplaceOrDie(&cache_, index, std::move(e));
    next_sequential_op_index_ = index + 1;
  }

  return result;
}
207
208
// Stages `msgs` in the cache, then hands the batch to the log for asynchronous
// durability. `callback` fires (via LogCallback) once the log write completes.
// Returns a non-OK status only if the async append could not be submitted.
Status LogCache::AppendOperations(const ReplicateMsgs& msgs, const yb::OpId& committed_op_id,
                                  RestartSafeCoarseTimePoint batch_mono_time,
                                  const StatusCallback& callback) {
  // Empty batches still go to the log (e.g. to advance the committed op id);
  // only non-empty ones touch the cache.
  PrepareAppendResult staged;
  if (!msgs.empty()) {
    staged = PrepareAppendOperations(msgs);
  }

  auto append_status = log_->AsyncAppendReplicates(
    msgs, committed_op_id, batch_mono_time,
    Bind(&LogCache::LogCallback, Unretained(this), staged.last_idx_in_batch, callback));

  if (!append_status.ok()) {
    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Couldn't append to log: " << append_status;
    return append_status;
  }

  // Only account for the batch once the log has accepted it.
  metrics_.size->IncrementBy(staged.mem_required);
  metrics_.num_ops->IncrementBy(msgs.size());

  return Status::OK();
}
230
231
void LogCache::LogCallback(int64_t last_idx_in_batch,
232
                           const StatusCallback& user_callback,
233
13.1M
                           const Status& log_status) {
234
13.1M
  if (log_status.ok()) {
235
13.1M
    std::lock_guard<simple_spinlock> l(lock_);
236
13.1M
    if (min_pinned_op_index_ <= last_idx_in_batch) {
237
84
      VLOG_WITH_PREFIX_UNLOCKED(1) << "Updating pinned index to " << (last_idx_in_batch + 1);
238
7.15M
      min_pinned_op_index_ = last_idx_in_batch + 1;
239
7.15M
    }
240
13.1M
  }
241
13.1M
  user_callback.Run(log_status);
242
13.1M
}
243
244
302
// Returns the smallest replicate index known to the log: the minimum across
// finalized segments, or the active segment's minimum if none are finalized.
int64_t LogCache::earliest_op_index() const {
  const auto min_in_closed_segments = log_->GetLogReader()->GetMinReplicateIndex();
  // -1 means no finalized log files yet; fall back to the active log.
  return min_in_closed_segments == -1 ? log_->GetMinReplicateIndex() : min_in_closed_segments;
}
251
252
15.1M
// True iff an op with this index has already been handed to the cache, i.e.
// the index precedes the next sequential index we expect.
bool LogCache::HasOpBeenWritten(int64_t index) const {
  std::lock_guard<simple_spinlock> l(lock_);
  return next_sequential_op_index_ > index;
}
256
257
25.4M
// Resolves an op index to its full OpId (term + index). Tries the in-memory
// cache first, then falls back to the on-disk log reader.
Result<yb::OpId> LogCache::LookupOpId(int64_t op_index) const {
  // Fast path: the op may still be resident in the cache.
  {
    std::lock_guard<simple_spinlock> l(lock_);

    // We sometimes try to look up OpIds that have never been written on
    // the local node. In that case, don't try to read the op from the
    // log reader, since it might actually race against the writing of the op.
    if (op_index >= next_sequential_op_index_) {
      return STATUS(Incomplete, Substitute("Op with index $0 is ahead of the local log "
                                          "(next sequential op: $1)",
                                          op_index, next_sequential_op_index_));
    }
    auto entry_it = cache_.find(op_index);
    if (entry_it != cache_.end()) {
      return yb::OpId::FromPB(entry_it->second.msg->id());
    }
  }

  // Cache miss: the op was evicted — consult the on-disk log (outside the lock).
  return log_->GetLogReader()->LookupOpId(op_index);
}
279
280
namespace {
281
282
// Calculate the total byte size that will be used on the wire to replicate this message as part of
283
// a consensus update request. This accounts for the length delimiting and tagging of the message.
284
5.20M
int64_t TotalByteSizeForMessage(const ReplicateMsg& msg) {
285
5.20M
  auto msg_size = google::protobuf::internal::WireFormatLite::LengthDelimitedSize(
286
5.20M
    msg.ByteSize());
287
5.20M
  msg_size += 1; // for the type tag
288
5.20M
  return msg_size;
289
5.20M
}
290
291
} // anonymous namespace
292
293
14
// Convenience overload: read with no explicit upper bound on the op index.
Result<ReadOpsResult> LogCache::ReadOps(int64_t after_op_index, size_t max_size_bytes) {
  constexpr int64_t kNoUpperBound = 0;  // to_op_index == 0 means "unbounded"
  return ReadOps(after_op_index, kNoUpperBound, max_size_bytes);
}
296
297
// Reads ops (after_op_index, to_op_index] (or up to the end of the local log
// when to_op_index <= 0), stopping early once max_size_bytes of wire-format
// payload has been gathered or `deadline` passes. Ops resident in the cache
// are returned directly; gaps are filled by reading from disk — the spinlock
// is dropped around the disk read and re-taken afterwards.
//
// Returns the messages plus the OpId preceding the range, or Incomplete if
// after_op_index is ahead of the local log (via LookupOpId).
Result<ReadOpsResult> LogCache::ReadOps(int64_t after_op_index,
                                        int64_t to_op_index,
                                        size_t max_size_bytes,
                                        CoarseTimePoint deadline) {
  DCHECK_GE(after_op_index, 0);

  VLOG_WITH_PREFIX_UNLOCKED(4) << "ReadOps, after_op_index: " << after_op_index
                               << ", to_op_index: " << to_op_index
                               << ", max_size_bytes: " << max_size_bytes;
  ReadOpsResult result;
  int64_t starting_op_segment_seq_num;
  result.preceding_op = VERIFY_RESULT(LookupOpId(after_op_index));

  std::unique_lock<simple_spinlock> l(lock_);
  int64_t next_index = after_op_index + 1;
  // Exclusive upper bound; always clamped to what has actually been written.
  int64_t to_index = to_op_index > 0
      ? std::min(to_op_index + 1, next_sequential_op_index_)
      : next_sequential_op_index_;

  // Remove the deadline if the GetChanges deadline feature is disabled.
  if (!ANNOTATE_UNPROTECTED_READ(FLAGS_get_changes_honor_deadline)) {
    deadline = CoarseTimePoint::max();
  }

  // Return as many operations as we can, up to the limit.
  int64_t remaining_space = max_size_bytes;
  while (remaining_space >= 0 && next_index < to_index) {
    // Stop reading if a deadline was specified and the deadline has been exceeded.
    if (deadline != CoarseTimePoint::max() && CoarseMonoClock::Now() >= deadline) {
      break;
    }

    // If the messages the peer needs haven't been loaded into the queue yet, load them.
    MessageCache::const_iterator iter = cache_.lower_bound(next_index);
    if (iter == cache_.end() || iter->first != next_index) {
      // Cache miss at next_index: read the missing run [next_index, up_to] from disk.
      int64_t up_to;
      if (iter == cache_.end()) {
        // Read all the way to the current op.
        up_to = to_index - 1;
      } else {
        // Read up to the next entry that's in the cache or to_index whichever is lesser.
        up_to = std::min(iter->first - 1, to_index - 1);
      }

      // Disk I/O must not happen under the spinlock.
      l.unlock();

      ReplicateMsgs raw_replicate_ptrs;
      RETURN_NOT_OK_PREPEND(
          log_->GetLogReader()->ReadReplicatesInRange(
              next_index, up_to, remaining_space, &raw_replicate_ptrs, &starting_op_segment_seq_num,
              &result.header_schema, &(result.header_schema_version), deadline),
          Substitute("Failed to read ops $0..$1", next_index, up_to));

      // If the read identified a starting segment but no schema yet, pull the
      // schema from that segment's header.
      if ((starting_op_segment_seq_num != -1) && !result.header_schema.IsInitialized()) {
        scoped_refptr<log::ReadableLogSegment> segment =
            log_->GetLogReader()->GetSegmentBySequenceNumber(starting_op_segment_seq_num);

        if (segment != nullptr && segment->header().has_unused_schema()) {
          result.header_schema.CopyFrom(segment->header().unused_schema());
          result.header_schema_version = segment->header().unused_schema_version();
        }
      }

      metrics_.disk_reads->IncrementBy(raw_replicate_ptrs.size());
      LOG_WITH_PREFIX_UNLOCKED(INFO)
          << "Successfully read " << raw_replicate_ptrs.size() << " ops from disk.";
      l.lock();

      for (auto& msg : raw_replicate_ptrs) {
        CHECK_EQ(next_index, msg->id().index());

        auto current_message_size = TotalByteSizeForMessage(*msg);
        remaining_space -= current_message_size;
        // Always return at least one message, even if it overflows the budget.
        if (remaining_space < 0 && !result.messages.empty()) {
          break;
        }
        result.messages.push_back(msg);
        if (msg->op_type() == consensus::OperationType::CHANGE_METADATA_OP) {
          result.header_schema.CopyFrom(msg->change_metadata_request().schema());
          result.header_schema_version = msg->change_metadata_request().schema_version();
        }
        result.read_from_disk_size += current_message_size;
        next_index++;
      }
    } else {
      // Cache hit at next_index: serve from memory.
      starting_op_segment_seq_num = VERIFY_RESULT(log_->GetLogReader()->LookupHeader(next_index));

      if ((starting_op_segment_seq_num != -1)) {
        scoped_refptr<log::ReadableLogSegment> segment =
            log_->GetLogReader()->GetSegmentBySequenceNumber(starting_op_segment_seq_num);
        if (segment != nullptr && segment->header().has_unused_schema()) {
          result.header_schema.CopyFrom(segment->header().unused_schema());
          result.header_schema_version = segment->header().unused_schema_version();
        }
      }
      // Pull contiguous messages from the cache until the size limit is achieved.
      for (; iter != cache_.end(); ++iter) {
        if (to_op_index > 0 && next_index > to_op_index) {
          break;
        }
        const ReplicateMsgPtr& msg = iter->second.msg;
        int64_t index = msg->id().index();
        if (index != next_index) {
          continue;
        }

        auto current_message_size = TotalByteSizeForMessage(*msg);
        remaining_space -= current_message_size;
        // Always return at least one message, even if it overflows the budget.
        if (remaining_space < 0 && !result.messages.empty()) {
          break;
        }

        result.messages.push_back(msg);
        if (msg->op_type() == consensus::OperationType::CHANGE_METADATA_OP) {
          result.header_schema.CopyFrom(msg->change_metadata_request().schema());
          result.header_schema_version = msg->change_metadata_request().schema_version();
        }
        next_index++;
      }
    }
  }
  // A negative budget means we stopped because of size, so more ops remain.
  result.have_more_messages = remaining_space < 0;
  return result;
}
421
422
23.3M
// Thread-safe entry point for eviction: grabs the lock and delegates to
// EvictSomeUnlocked(). Returns the number of bytes evicted.
size_t LogCache::EvictThroughOp(int64_t index, int64_t bytes_to_evict) {
  std::lock_guard<simple_spinlock> guard(lock_);
  return EvictSomeUnlocked(index, bytes_to_evict);
}
426
427
23.3M
// Evicts cache entries in ascending index order, stopping at the first entry
// whose index exceeds stop_after_index, reaches min_pinned_op_index_ (pinned
// entries are never evicted), or once bytes_to_evict bytes have been freed.
// The sentinel op at index 0 is always retained. Caller must hold lock_.
// Returns the number of bytes actually evicted.
size_t LogCache::EvictSomeUnlocked(int64_t stop_after_index, int64_t bytes_to_evict) {
  DCHECK(lock_.is_locked());
  VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting log cache index <= "
                      << stop_after_index
                      << " or " << HumanReadableNumBytes::ToString(bytes_to_evict)
                      << ": before state: " << ToStringUnlocked();

  // Test hook: allow tests to disable eviction entirely.
  if (ANNOTATE_UNPROTECTED_READ(FLAGS_TEST_log_cache_skip_eviction)) {
    return 0;
  }

  int64_t bytes_evicted = 0;
  for (auto iter = cache_.begin(); iter != cache_.end();) {
    const CacheEntry& entry = iter->second;
    const ReplicateMsgPtr& msg = entry.msg;
    VLOG_WITH_PREFIX_UNLOCKED(2) << "considering for eviction: " << msg->id();
    int64_t msg_index = msg->id().index();
    if (msg_index == 0) {
      // Always keep our special '0' op.
      ++iter;
      continue;
    }

    // Stop at the caller's bound or at the first pinned (not-yet-durable) op.
    if (msg_index > stop_after_index || msg_index >= min_pinned_op_index_) {
      break;
    }

    VLOG_WITH_PREFIX_UNLOCKED(2) << "Evicting cache. Removing: " << msg->id();
    AccountForMessageRemovalUnlocked(entry);
    bytes_evicted += entry.mem_usage;
    // Post-increment keeps the iterator valid across the erase.
    cache_.erase(iter++);

    if (bytes_evicted >= bytes_to_evict) {
      break;
    }
  }
  VLOG_WITH_PREFIX_UNLOCKED(1) << "Evicting log cache: after state: " << ToStringUnlocked();

  return bytes_evicted;
}
467
468
0
// Flushes the log's on-disk index; thin pass-through to the underlying log.
Status LogCache::FlushIndex() {
  return log_->FlushIndex();
}
471
472
7.90M
void LogCache::AccountForMessageRemovalUnlocked(const CacheEntry& entry) {
473
7.90M
  if (entry.tracked) {
474
1.63M
    tracker_->Release(entry.mem_usage);
475
1.63M
  }
476
7.90M
  metrics_.size->DecrementBy(entry.mem_usage);
477
7.90M
  metrics_.num_ops->Decrement();
478
7.90M
}
479
480
7
// Bytes currently charged to this tablet's log-cache mem tracker.
int64_t LogCache::BytesUsed() const {
  return tracker_->consumption();
}
483
484
3
// Test helper: scans backwards from max_allowed_index in windows of kStepSize
// ops and returns the most recent op of the requested type, or NotFound.
Result<OpId> LogCache::TEST_GetLastOpIdWithType(int64_t max_allowed_index, OperationType op_type) {
  constexpr int kStepSize = 20;
  for (auto end = max_allowed_index; end > 0; end -= kStepSize) {
    auto result = VERIFY_RESULT(ReadOps(
        std::max<int64_t>(0, end - kStepSize), end, std::numeric_limits<int>::max()));
    // Walk the window newest-to-oldest so the latest match wins.
    for (auto rit = result.messages.rbegin(); rit != result.messages.rend(); ++rit) {
      if ((**rit).op_type() == op_type) {
        return OpId::FromPB((**rit).id());
      }
    }
  }
  return STATUS_FORMAT(NotFound, "Operation of type $0 not found before $1",
                       OperationType_Name(op_type), max_allowed_index);
}
499
500
94
// Thread-safe wrapper around StatsStringUnlocked().
string LogCache::StatsString() const {
  std::lock_guard<simple_spinlock> lock(lock_);
  return StatsStringUnlocked();
}
504
505
97
// Renders a one-line summary of the cache's metric gauges.
string LogCache::StatsStringUnlocked() const {
  const auto num_ops = metrics_.num_ops->value();
  const auto num_bytes = metrics_.size->value();
  const auto disk_reads = metrics_.disk_reads->value();
  return Substitute("LogCacheStats(num_ops=$0, bytes=$1, disk_reads=$2)",
                    num_ops, num_bytes, disk_reads);
}
511
512
1
// Thread-safe wrapper around ToStringUnlocked().
std::string LogCache::ToString() const {
  std::lock_guard<simple_spinlock> lock(lock_);
  return ToStringUnlocked();
}
516
517
3
// Combines the current pin position with the metric summary into one line.
std::string LogCache::ToStringUnlocked() const {
  return Substitute("Pinned index: $0, $1", min_pinned_op_index_, StatsStringUnlocked());
}
522
523
1.77k
// Log-message prefix identifying this tablet and peer.
std::string LogCache::LogPrefixUnlocked() const {
  return MakeTabletLogPrefix(tablet_id_, local_uuid_);
}
526
527
0
void LogCache::DumpToLog() const {
528
0
  vector<string> strings;
529
0
  DumpToStrings(&strings);
530
0
  for (const string& s : strings) {
531
0
    LOG_WITH_PREFIX_UNLOCKED(INFO) << s;
532
0
  }
533
0
}
534
535
0
// Appends a human-readable dump of the cache — summary line, then one line per
// cached message — to *lines.
void LogCache::DumpToStrings(vector<string>* lines) const {
  std::lock_guard<simple_spinlock> lock(lock_);
  lines->push_back(ToStringUnlocked());
  lines->push_back("Messages:");
  int counter = 0;
  for (const auto& entry : cache_) {
    const auto& msg = entry.second.msg;
    lines->push_back(
      Substitute("Message[$0] $1.$2 : REPLICATE. Type: $3, Size: $4",
                 counter++, msg->id().term(), msg->id().index(),
                 OperationType_Name(msg->op_type()),
                 msg->ByteSize()));
  }
}
549
550
47
// Writes an HTML table describing every cached message to `out` (used by the
// web UI debug pages).
void LogCache::DumpToHtml(std::ostream& out) const {
  using std::endl;

  std::lock_guard<simple_spinlock> lock(lock_);
  out << "<h3>Messages:</h3>" << endl;
  out << "<table>" << endl;
  out << "<tr><th>Entry</th><th>OpId</th><th>Type</th><th>Size</th><th>Status</th></tr>" << endl;

  int row = 0;
  for (const auto& entry : cache_) {
    const auto& msg = entry.second.msg;
    out << Substitute("<tr><th>$0</th><th>$1.$2</th><td>REPLICATE $3</td>"
                      "<td>$4</td><td>$5</td></tr>",
                      row++, msg->id().term(), msg->id().index(),
                      OperationType_Name(msg->op_type()),
                      msg->ByteSize(), msg->id().ShortDebugString()) << endl;
  }
  out << "</table>";
}
569
570
7.03M
// Charges the mem tracker for cache entries matching the given OpIds (term and
// index must both match — a term mismatch means the entry was overwritten).
// If consuming the required memory would exceed the tracker's limit, the
// memory is force-consumed anyway and eviction is attempted to compensate.
void LogCache::TrackOperationsMemory(const OpIds& op_ids) {
  if (op_ids.empty()) {
    return;
  }

  std::lock_guard<simple_spinlock> lock(lock_);

  size_t mem_required = 0;
  for (const auto& op_id : op_ids) {
    auto it = cache_.find(op_id.index);
    // Only track entries still present with the same term; `tracked` tells
    // AccountForMessageRemovalUnlocked to release these bytes later.
    if (it != cache_.end() && it->second.msg->id().term() == op_id.term) {
      mem_required += it->second.mem_usage;
      it->second.tracked = true;
    }
  }

  if (mem_required == 0) {
    return;
  }

  // Try to consume the memory. If it can't be consumed, we may need to evict.
  if (!tracker_->TryConsume(mem_required)) {
    auto spare = tracker_->SpareCapacity();
    auto need_to_free = mem_required - spare;
    VLOG_WITH_PREFIX_UNLOCKED(1)
        << "Memory limit would be exceeded trying to append "
        << HumanReadableNumBytes::ToString(mem_required)
        << " to log cache (available="
        << HumanReadableNumBytes::ToString(spare)
        << "): attempting to evict some operations...";

    // Consume unconditionally (may temporarily exceed the limit), then evict
    // unpinned entries to bring usage back down.
    tracker_->Consume(mem_required);

    // TODO: we should also try to evict from other tablets - probably better to evict really old
    // ops from another tablet than evict recent ops from this one.
    EvictSomeUnlocked(min_pinned_op_index_, need_to_free);
  }
}
608
609
7
// Current number of ops in the cache, as reported by the metric gauge.
int64_t LogCache::num_cached_ops() const {
  return metrics_.num_ops->value();
}
612
613
// Expands `x(...)` into an initializer that instantiates the corresponding
// METRIC_log_cache_<x> metric on the given entity.
#define INSTANTIATE_METRIC(x, ...) \
  x(BOOST_PP_CAT(METRIC_log_cache_, x).Instantiate(metric_entity, ## __VA_ARGS__))
// Instantiates the num_ops/size gauges (initial value 0) and the disk_reads
// counter for this tablet's metric entity.
LogCache::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_entity)
  : INSTANTIATE_METRIC(num_ops, 0),
    INSTANTIATE_METRIC(size, 0),
    INSTANTIATE_METRIC(disk_reads) {
}
#undef INSTANTIATE_METRIC
621
622
} // namespace consensus
623
} // namespace yb