YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/ent/src/yb/tserver/twodc_write_implementations.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
13
#include <deque>
14
15
#include "yb/common/transaction.h"
16
17
#include "yb/docdb/docdb.h"
18
#include "yb/docdb/key_bytes.h"
19
20
#include "yb/tserver/twodc_write_interface.h"
21
#include "yb/tserver/tserver.pb.h"
22
23
#include "yb/cdc/cdc_service.pb.h"
24
25
#include "yb/util/size_literals.h"
26
#include "yb/util/flag_tags.h"
27
#include "yb/util/flags.h"
28
29
#include "yb/common/hybrid_time.h"
30
31
DEFINE_int32(cdc_max_apply_batch_num_records, 1024, "Max CDC write request batch num records. If"
32
                                                    " set to 0, there is no max num records, which"
33
                                                    " means batches will be limited only by size.");
34
TAG_FLAG(cdc_max_apply_batch_num_records, runtime);
35
36
DEFINE_int32(cdc_max_apply_batch_size_bytes, 0, "Max CDC write request batch size in kb. If 0,"
37
                                                " default to consensus_max_batch_size_bytes.");
38
TAG_FLAG(cdc_max_apply_batch_size_bytes, runtime);
39
40
DEFINE_test_flag(bool, twodc_write_hybrid_time, false,
41
                 "Override external_hybrid_time with initialHybridTimeValue for testing.");
42
43
DECLARE_uint64(consensus_max_batch_size_bytes);
44
45
namespace yb {
46
47
using namespace yb::size_literals;
48
49
namespace tserver {
50
namespace enterprise {
51
52
// Feeds every key/value pair in `pairs` to docdb::CombineExternalIntents and stores the key and
// value it produces in one new entry appended to `out`.
//
// transaction_state - supplies the transaction id and the tablet list; the list must contain
//                     exactly one entry, which is parsed as a hex-encoded Uuid.
// pairs             - the intent records to combine.
// out               - receives exactly one additional KeyValuePairPB on success.
//
// Returns a non-OK status when the transaction id cannot be decoded, when the tablet list does not
// have exactly one entry (InvalidArgument), or when that entry is not a valid hex Uuid. Nothing is
// appended to `out` in any of those cases.
CHECKED_STATUS CombineExternalIntents(
    const tablet::TransactionStatePB& transaction_state,
    const google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>& pairs,
    google::protobuf::RepeatedPtrField<docdb::KeyValuePairPB> *out) {
  using CdcPairs = google::protobuf::RepeatedPtrField<cdc::KeyValuePairPB>;

  // Adapter handed to docdb: hands out the input pairs one at a time through Next() and routes
  // the combined key/value it is given into a single output protobuf entry.
  class IntentsSource : public docdb::ExternalIntentsProvider {
   public:
    IntentsSource(const Uuid& tablet, const CdcPairs* input, docdb::KeyValuePairPB* sink)
        : tablet_(tablet), input_(*input), sink_(sink) {}

    void SetKey(const Slice& slice) override {
      sink_->set_key(slice.cdata(), slice.size());
    }

    void SetValue(const Slice& slice) override {
      sink_->set_value(slice.cdata(), slice.size());
    }

    const Uuid& InvolvedTablet() override {
      return tablet_;
    }

    // Yields the next (key, binary value) pair, or boost::none once the input is exhausted.
    boost::optional<std::pair<Slice, Slice>> Next() override {
      if (cursor_ < input_.size()) {
        const auto& current = input_[cursor_++];
        return std::pair<Slice, Slice>(current.key(), current.value().binary_value());
      }
      return boost::none;
    }

   private:
    Uuid tablet_;
    const CdcPairs& input_;
    docdb::KeyValuePairPB* sink_;
    int cursor_ = 0;
  };

  // All validation happens before `out` is touched.
  auto transaction_id =
      VERIFY_RESULT(FullyDecodeTransactionId(transaction_state.transaction_id()));
  SCHECK_EQ(transaction_state.tablets().size(), 1, InvalidArgument, "Wrong tablets number");
  Uuid involved_tablet;
  RETURN_NOT_OK(involved_tablet.FromHexString(transaction_state.tablets()[0]));

  docdb::KeyValuePairPB* combined_pair = out->Add();
  IntentsSource source(involved_tablet, &pairs, combined_pair);
  docdb::CombineExternalIntents(transaction_id, &source);
  return Status::OK();
}
105
106
// Translates one CDC record into entries of `write_batch`. Three cases, checked in this order:
//   1. APPLY operation: appends an apply_external_transactions entry carrying the record's
//      transaction id and commit hybrid time.
//   2. Record with a transaction state: delegates to CombineExternalIntents, which appends the
//      record's changes to write_pairs as one combined entry.
//   3. Anything else: copies every change into its own write pair, stamped with the record's time
//      (or kInitialHybridTimeValue when the TEST_twodc_write_hybrid_time flag is set).
//
// Returns OK for cases 1 and 3, and whatever CombineExternalIntents returns for case 2.
CHECKED_STATUS AddRecord(
    const cdc::CDCRecordPB& record,
    docdb::KeyValueWriteBatchPB* write_batch) {
  const bool is_apply = record.operation() == cdc::CDCRecordPB::APPLY;
  if (is_apply) {
    auto* applied = write_batch->mutable_apply_external_transactions()->Add();
    const auto& txn_state = record.transaction_state();
    applied->set_transaction_id(txn_state.transaction_id());
    applied->set_commit_hybrid_time(txn_state.commit_hybrid_time());
    return Status::OK();
  }

  if (!record.has_transaction_state()) {
    auto* destination = write_batch->mutable_write_pairs();
    for (const auto& change : record.changes()) {
      auto* entry = destination->Add();
      entry->set_key(change.key());
      entry->set_value(change.value().binary_value());
      if (PREDICT_FALSE(FLAGS_TEST_twodc_write_hybrid_time)) {
        // Used only for testing external hybrid time.
        entry->set_external_hybrid_time(yb::kInitialHybridTimeValue);
      } else {
        entry->set_external_hybrid_time(record.time());
      }
    }
    return Status::OK();
  }

  // Transactional, non-APPLY record: its changes are intents that get combined into one pair.
  return CombineExternalIntents(
      record.transaction_state(), record.changes(), write_batch->mutable_write_pairs());
}
135
136
// The BatchedWriteImplementation strategy batches together multiple records per WriteRequestPB.
137
// Max number of records in a request is cdc_max_apply_batch_num_records, and max size of a request
138
// is cdc_max_apply_batch_size_bytes. Batches are not sent by opid order, since a GetChangesResponse
139
// can contain interleaved records to multiple tablets. Rather, we send batches to each tablet
140
// in order for that tablet, before moving on to the next tablet.
141
class BatchedWriteImplementation : public TwoDCWriteInterface {
142
0
  ~BatchedWriteImplementation() = default;
143
144
0
  Status ProcessRecord(const std::string& tablet_id, const cdc::CDCRecordPB& record) override {
145
0
    auto it = records_.find(tablet_id);
146
0
    if (it == records_.end()) {
147
0
      it = records_.emplace(tablet_id, std::deque<std::unique_ptr<WriteRequestPB>>()).first;
148
0
    }
149
150
0
    auto& queue = it->second;
151
152
0
    auto max_batch_records = FLAGS_cdc_max_apply_batch_num_records != 0 ?
153
0
        FLAGS_cdc_max_apply_batch_num_records : std::numeric_limits<uint32_t>::max();
154
0
    auto max_batch_size = FLAGS_cdc_max_apply_batch_size_bytes != 0 ?
155
0
        FLAGS_cdc_max_apply_batch_size_bytes : FLAGS_consensus_max_batch_size_bytes;
156
157
0
    if (queue.empty() ||
158
0
        implicit_cast<size_t>(queue.back()->write_batch().write_pairs_size())
159
0
            >= max_batch_records ||
160
0
        queue.back()->ByteSizeLong() >= max_batch_size) {
161
      // Create a new batch.
162
0
      auto req = std::make_unique<WriteRequestPB>();
163
0
      req->set_tablet_id(tablet_id);
164
0
      req->set_external_hybrid_time(record.time());
165
0
      queue.push_back(std::move(req));
166
0
    }
167
0
    auto* write_request = queue.back().get();
168
169
0
    return AddRecord(record, write_request->mutable_write_batch());
170
0
  }
171
172
0
  std::unique_ptr<WriteRequestPB> GetNextWriteRequest() override {
173
0
    if (records_.empty()) {
174
0
      return nullptr;
175
0
    }
176
0
    auto& queue = records_.begin()->second;
177
0
    auto next_req = std::move(queue.front());
178
0
    queue.pop_front();
179
0
    if (queue.empty()) {
180
0
      records_.erase(next_req->tablet_id());
181
0
    }
182
0
    return next_req;
183
0
  }
184
185
 private:
186
  std::map<std::string, std::deque<std::unique_ptr<WriteRequestPB>>> records_;
187
};
188
189
0
void ResetWriteInterface(std::unique_ptr<TwoDCWriteInterface>* write_strategy) {
190
0
  write_strategy->reset(new BatchedWriteImplementation());
191
0
}
192
193
} // namespace enterprise
194
} // namespace tserver
195
} // namespace yb