YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/docdb/rocksdb_writer.h
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_DOCDB_ROCKSDB_WRITER_H
15
#define YB_DOCDB_ROCKSDB_WRITER_H
16
17
#include "yb/common/doc_hybrid_time.h"
18
#include "yb/common/hybrid_time.h"
19
#include "yb/common/transaction.h"
20
21
#include "yb/docdb/docdb.h"
22
#include "yb/docdb/docdb.fwd.h"
23
#include "yb/docdb/docdb_fwd.h"
24
#include "yb/docdb/intent.h"
25
26
#include "yb/rocksdb/write_batch.h"
27
28
namespace yb {
29
namespace docdb {
30
31
class NonTransactionalWriter : public rocksdb::DirectWriter {
32
 public:
33
  NonTransactionalWriter(
34
    std::reference_wrapper<const KeyValueWriteBatchPB> put_batch, HybridTime hybrid_time);
35
36
  bool Empty() const;
37
38
  CHECKED_STATUS Apply(rocksdb::DirectWriteHandler* handler) override;
39
40
 private:
41
  const docdb::KeyValueWriteBatchPB& put_batch_;
42
  HybridTime hybrid_time_;
43
};
44
45
// Buffer for encoding DocHybridTime
46
class DocHybridTimeBuffer {
47
 public:
48
  DocHybridTimeBuffer();
49
50
265M
  Slice EncodeWithValueType(const DocHybridTime& doc_ht) {
51
265M
    auto end = doc_ht.EncodedInDocDbFormat(buffer_.data() + 1);
52
265M
    return Slice(buffer_.data(), end);
53
265M
  }
54
55
265M
  Slice EncodeWithValueType(HybridTime ht, IntraTxnWriteId write_id) {
56
265M
    return EncodeWithValueType(DocHybridTime(ht, write_id));
57
265M
  }
58
 private:
59
  std::array<char, 1 + kMaxBytesPerEncodedHybridTime> buffer_;
60
};
61
62
class TransactionalWriter : public rocksdb::DirectWriter {
63
 public:
64
  TransactionalWriter(
65
      std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch,
66
      HybridTime hybrid_time,
67
      const TransactionId& transaction_id,
68
      IsolationLevel isolation_level,
69
      PartialRangeKeyIntents partial_range_key_intents,
70
      const Slice& replicated_batches_state,
71
      IntraTxnWriteId intra_txn_write_id);
72
73
  CHECKED_STATUS Apply(rocksdb::DirectWriteHandler* handler) override;
74
75
2.31M
  IntraTxnWriteId intra_txn_write_id() const {
76
2.31M
    return intra_txn_write_id_;
77
2.31M
  }
78
79
1.67M
  void SetMetadataToStore(const TransactionMetadataPB* value) {
80
1.67M
    metadata_to_store_ = value;
81
1.67M
  }
82
83
  CHECKED_STATUS operator()(
84
      IntentStrength intent_strength, FullDocKey, Slice value_slice, KeyBytes* key,
85
      LastKey last_key);
86
87
 private:
88
  CHECKED_STATUS Finish();
89
  CHECKED_STATUS AddWeakIntent(
90
      const std::pair<KeyBuffer, IntentTypeSet>& intent_and_types,
91
      const std::array<Slice, 2>& value,
92
      DocHybridTimeBuffer* doc_ht_buffer);
93
94
  const docdb::KeyValueWriteBatchPB& put_batch_;
95
  HybridTime hybrid_time_;
96
  TransactionId transaction_id_;
97
  IsolationLevel isolation_level_;
98
  PartialRangeKeyIntents partial_range_key_intents_;
99
  Slice replicated_batches_state_;
100
  IntraTxnWriteId intra_txn_write_id_;
101
  IntraTxnWriteId write_id_ = 0;
102
  const TransactionMetadataPB* metadata_to_store_ = nullptr;
103
104
  // TODO(dtxn) weak & strong intent in one batch.
105
  // TODO(dtxn) extract part of code knowing about intents structure to lower level.
106
  // Handler is initialized in Apply method, and not used after apply returns.
107
  rocksdb::DirectWriteHandler* handler_;
108
  RowMarkType row_mark_;
109
  SubTransactionId subtransaction_id_;
110
  IntentTypeSet strong_intent_types_;
111
  std::unordered_map<KeyBuffer, IntentTypeSet, ByteBufferHash> weak_intents_;
112
};
113
114
// Base class used by IntentsWriter to handle found intents.
115
class IntentsWriterContext {
116
 public:
117
  explicit IntentsWriterContext(const TransactionId& transaction_id);
118
119
2.97M
  virtual ~IntentsWriterContext() = default;
120
121
  // Called at the start of iteration. Passed key of the first found entry, if present.
122
1.67M
  virtual void Start(const boost::optional<Slice>& first_key) {}
123
124
  // Called on every reverse index entry.
125
  // key - entry key.
126
  // value - entry value.
127
  // metadata - whether entry is metadata entry or not.
128
  // Returns true if we should interrupt iteration, false otherwise.
129
  virtual Result<bool> Entry(
130
      const Slice& key, const Slice& value, bool metadata,
131
      rocksdb::DirectWriteHandler* handler) = 0;
132
133
  virtual void Complete(rocksdb::DirectWriteHandler* handler) = 0;
134
135
132M
  const TransactionId& transaction_id() const {
136
132M
    return transaction_id_;
137
132M
  }
138
139
2.97M
  ApplyTransactionState& apply_state() {
140
2.97M
    return apply_state_;
141
2.97M
  }
142
143
149M
  bool reached_records_limit() const {
144
149M
    return left_records_ <= 0;
145
149M
  }
146
147
197M
  void RegisterRecord() {
148
197M
    --left_records_;
149
197M
  }
150
151
 protected:
152
  void SetApplyState(
153
820
      const Slice& key, IntraTxnWriteId write_id, const AbortedSubTransactionSet& aborted) {
154
820
    apply_state_.key = key.ToBuffer();
155
820
    apply_state_.write_id = write_id;
156
820
    apply_state_.aborted = aborted;
157
820
  }
158
159
 private:
160
  TransactionId transaction_id_;
161
  ApplyTransactionState apply_state_;
162
  int64_t left_records_;
163
};
164
165
class IntentsWriter : public rocksdb::DirectWriter {
166
 public:
167
  IntentsWriter(const Slice& start_key,
168
                rocksdb::DB* intents_db,
169
                IntentsWriterContext* context);
170
171
  CHECKED_STATUS Apply(rocksdb::DirectWriteHandler* handler) override;
172
173
 private:
174
  Slice start_key_;
175
  rocksdb::DB* intents_db_;
176
  IntentsWriterContext& context_;
177
  KeyBytes txn_reverse_index_prefix_;
178
  Slice reverse_index_upperbound_;
179
  BoundedRocksDbIterator reverse_index_iter_;
180
};
181
182
class ApplyIntentsContext : public IntentsWriterContext {
183
 public:
184
  ApplyIntentsContext(
185
      const TransactionId& transaction_id,
186
      const ApplyTransactionState* apply_state,
187
      const AbortedSubTransactionSet& aborted,
188
      HybridTime commit_ht,
189
      HybridTime log_ht,
190
      const KeyBounds* key_bounds,
191
      rocksdb::DB* intents_db);
192
193
  void Start(const boost::optional<Slice>& first_key) override;
194
195
  Result<bool> Entry(
196
      const Slice& key, const Slice& value, bool metadata,
197
      rocksdb::DirectWriteHandler* handler) override;
198
199
  void Complete(rocksdb::DirectWriteHandler* handler) override;
200
201
 private:
202
  Result<bool> StoreApplyState(const Slice& key, rocksdb::DirectWriteHandler* handler);
203
204
  const ApplyTransactionState* apply_state_;
205
  const AbortedSubTransactionSet& aborted_;
206
  HybridTime commit_ht_;
207
  HybridTime log_ht_;
208
  IntraTxnWriteId write_id_;
209
  const KeyBounds* key_bounds_;
210
  BoundedRocksDbIterator intent_iter_;
211
};
212
213
class RemoveIntentsContext : public IntentsWriterContext {
214
 public:
215
  explicit RemoveIntentsContext(const TransactionId& transaction_id);
216
217
  Result<bool> Entry(
218
      const Slice& key, const Slice& value, bool metadata,
219
      rocksdb::DirectWriteHandler* handler) override;
220
221
  void Complete(rocksdb::DirectWriteHandler* handler) override;
222
 private:
223
};
224
225
} // namespace docdb
226
} // namespace yb
227
228
#endif // YB_DOCDB_ROCKSDB_WRITER_H