YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/docdb/rocksdb_writer.h
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_DOCDB_ROCKSDB_WRITER_H
15
#define YB_DOCDB_ROCKSDB_WRITER_H
16
17
#include "yb/common/doc_hybrid_time.h"
18
#include "yb/common/hybrid_time.h"
19
#include "yb/common/transaction.h"
20
21
#include "yb/docdb/docdb.h"
22
#include "yb/docdb/docdb.fwd.h"
23
#include "yb/docdb/docdb_fwd.h"
24
#include "yb/docdb/intent.h"
25
26
#include "yb/rocksdb/write_batch.h"
27
28
namespace yb {
29
namespace docdb {
30
31
class NonTransactionalWriter : public rocksdb::DirectWriter {
32
 public:
33
  NonTransactionalWriter(
34
    std::reference_wrapper<const KeyValueWriteBatchPB> put_batch, HybridTime hybrid_time);
35
36
  bool Empty() const;
37
38
  CHECKED_STATUS Apply(rocksdb::DirectWriteHandler* handler) override;
39
40
 private:
41
  const docdb::KeyValueWriteBatchPB& put_batch_;
42
  HybridTime hybrid_time_;
43
};
44
45
// Buffer for encoding DocHybridTime
46
class DocHybridTimeBuffer {
47
 public:
48
  DocHybridTimeBuffer();
49
50
84.3M
  Slice EncodeWithValueType(const DocHybridTime& doc_ht) {
51
84.3M
    auto end = doc_ht.EncodedInDocDbFormat(buffer_.data() + 1);
52
84.3M
    return Slice(buffer_.data(), end);
53
84.3M
  }
54
55
84.3M
  Slice EncodeWithValueType(HybridTime ht, IntraTxnWriteId write_id) {
56
84.3M
    return EncodeWithValueType(DocHybridTime(ht, write_id));
57
84.3M
  }
58
 private:
59
  std::array<char, 1 + kMaxBytesPerEncodedHybridTime> buffer_;
60
};
61
62
class TransactionalWriter : public rocksdb::DirectWriter {
63
 public:
64
  TransactionalWriter(
65
      std::reference_wrapper<const docdb::KeyValueWriteBatchPB> put_batch,
66
      HybridTime hybrid_time,
67
      const TransactionId& transaction_id,
68
      IsolationLevel isolation_level,
69
      PartialRangeKeyIntents partial_range_key_intents,
70
      const Slice& replicated_batches_state,
71
      IntraTxnWriteId intra_txn_write_id);
72
73
  CHECKED_STATUS Apply(rocksdb::DirectWriteHandler* handler) override;
74
75
1.26M
  IntraTxnWriteId intra_txn_write_id() const {
76
1.26M
    return intra_txn_write_id_;
77
1.26M
  }
78
79
968k
  void SetMetadataToStore(const TransactionMetadataPB* value) {
80
968k
    metadata_to_store_ = value;
81
968k
  }
82
83
  CHECKED_STATUS operator()(
84
      IntentStrength intent_strength, FullDocKey, Slice value_slice, KeyBytes* key,
85
      LastKey last_key);
86
87
 private:
88
  CHECKED_STATUS Finish();
89
  CHECKED_STATUS AddWeakIntent(
90
      const std::pair<KeyBuffer, IntentTypeSet>& intent_and_types,
91
      const std::array<Slice, 2>& value,
92
      DocHybridTimeBuffer* doc_ht_buffer);
93
94
  const docdb::KeyValueWriteBatchPB& put_batch_;
95
  HybridTime hybrid_time_;
96
  TransactionId transaction_id_;
97
  IsolationLevel isolation_level_;
98
  PartialRangeKeyIntents partial_range_key_intents_;
99
  Slice replicated_batches_state_;
100
  IntraTxnWriteId intra_txn_write_id_;
101
  IntraTxnWriteId write_id_ = 0;
102
  const TransactionMetadataPB* metadata_to_store_ = nullptr;
103
104
  // TODO(dtxn) weak & strong intent in one batch.
105
  // TODO(dtxn) extract part of code knowing about intents structure to lower level.
106
  // Handler is initialized in Apply method, and not used after apply returns.
107
  rocksdb::DirectWriteHandler* handler_;
108
  RowMarkType row_mark_;
109
  SubTransactionId subtransaction_id_;
110
  IntentTypeSet strong_intent_types_;
111
  std::unordered_map<KeyBuffer, IntentTypeSet, ByteBufferHash> weak_intents_;
112
};
113
114
// Base class used by IntentsWriter to handle found intents.
115
class IntentsWriterContext {
116
 public:
117
  explicit IntentsWriterContext(const TransactionId& transaction_id);
118
119
1.70M
  virtual ~IntentsWriterContext() = default;
120
121
  // Called at the start of iteration. Passed key of the first found entry, if present.
122
968k
  virtual void Start(const boost::optional<Slice>& first_key) {}
123
124
  // Called on every reverse index entry.
125
  // key - entry key.
126
  // value - entry value.
127
  // metadata - whether entry is metadata entry or not.
128
  // Returns true if we should interrupt iteration, false otherwise.
129
  virtual Result<bool> Entry(
130
      const Slice& key, const Slice& value, bool metadata,
131
      rocksdb::DirectWriteHandler* handler) = 0;
132
133
  virtual void Complete(rocksdb::DirectWriteHandler* handler) = 0;
134
135
44.6M
  const TransactionId& transaction_id() const {
136
44.6M
    return transaction_id_;
137
44.6M
  }
138
139
1.70M
  ApplyTransactionState& apply_state() {
140
1.70M
    return apply_state_;
141
1.70M
  }
142
143
54.8M
  bool reached_records_limit() const {
144
54.8M
    return left_records_ <= 0;
145
54.8M
  }
146
147
73.2M
  void RegisterRecord() {
148
73.2M
    --left_records_;
149
73.2M
  }
150
151
 protected:
152
  void SetApplyState(
153
41
      const Slice& key, IntraTxnWriteId write_id, const AbortedSubTransactionSet& aborted) {
154
41
    apply_state_.key = key.ToBuffer();
155
41
    apply_state_.write_id = write_id;
156
41
    apply_state_.aborted = aborted;
157
41
  }
158
159
 private:
160
  TransactionId transaction_id_;
161
  ApplyTransactionState apply_state_;
162
  int64_t left_records_;
163
};
164
165
class IntentsWriter : public rocksdb::DirectWriter {
166
 public:
167
  IntentsWriter(const Slice& start_key,
168
                rocksdb::DB* intents_db,
169
                IntentsWriterContext* context);
170
171
  CHECKED_STATUS Apply(rocksdb::DirectWriteHandler* handler) override;
172
173
 private:
174
  Slice start_key_;
175
  rocksdb::DB* intents_db_;
176
  IntentsWriterContext& context_;
177
};
178
179
class ApplyIntentsContext : public IntentsWriterContext {
180
 public:
181
  ApplyIntentsContext(
182
      const TransactionId& transaction_id,
183
      const ApplyTransactionState* apply_state,
184
      const AbortedSubTransactionSet& aborted,
185
      HybridTime commit_ht,
186
      HybridTime log_ht,
187
      const KeyBounds* key_bounds,
188
      rocksdb::DB* intents_db);
189
190
  void Start(const boost::optional<Slice>& first_key) override;
191
192
  Result<bool> Entry(
193
      const Slice& key, const Slice& value, bool metadata,
194
      rocksdb::DirectWriteHandler* handler) override;
195
196
  void Complete(rocksdb::DirectWriteHandler* handler) override;
197
198
 private:
199
  Result<bool> StoreApplyState(const Slice& key, rocksdb::DirectWriteHandler* handler);
200
201
  const ApplyTransactionState* apply_state_;
202
  const AbortedSubTransactionSet& aborted_;
203
  HybridTime commit_ht_;
204
  HybridTime log_ht_;
205
  IntraTxnWriteId write_id_;
206
  const KeyBounds* key_bounds_;
207
  BoundedRocksDbIterator intent_iter_;
208
};
209
210
class RemoveIntentsContext : public IntentsWriterContext {
211
 public:
212
  explicit RemoveIntentsContext(const TransactionId& transaction_id);
213
214
  Result<bool> Entry(
215
      const Slice& key, const Slice& value, bool metadata,
216
      rocksdb::DirectWriteHandler* handler) override;
217
218
  void Complete(rocksdb::DirectWriteHandler* handler) override;
219
 private:
220
};
221
222
} // namespace docdb
223
} // namespace yb
224
225
#endif // YB_DOCDB_ROCKSDB_WRITER_H