YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/table/plain_table_key_coding.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#ifndef ROCKSDB_LITE
22
#include "yb/rocksdb/table/plain_table_key_coding.h"
23
24
#include <algorithm>
25
#include <string>
26
#include "yb/rocksdb/db/dbformat.h"
27
#include "yb/rocksdb/table/plain_table_reader.h"
28
#include "yb/rocksdb/table/plain_table_factory.h"
29
#include "yb/rocksdb/util/file_reader_writer.h"
30
31
namespace rocksdb {
32
33
// Kind of size field stored in a control byte of the prefix encoding.
// EncodeSize() shifts the value into the two high bits of the control byte and
// DecodeSize() extracts it again, so the numeric values must stay stable.
enum PlainTableEntryType : unsigned char {
  // The size field describes a complete user key.
  kFullKey = 0,
  // The size field is the length of the prefix shared with the previous key.
  kPrefixFromPreviousKey = 1,
  // The size field describes the key bytes that follow the shared prefix.
  kKeySuffix = 2,
};
38
39
namespace {
40
41
// Control byte:
42
// First two bits indicate type of entry
43
// Other bits are inlined sizes. If all bits are 1 (0x3F), overflow bytes
44
// are used. key_size-0x3F will be encoded as a varint32 after this byte.
45
46
const unsigned char kSizeInlineLimit = 0x3F;
47
48
// Return 0 for error
49
size_t EncodeSize(PlainTableEntryType type, uint32_t key_size,
50
376
                  char* out_buffer) {
51
376
  out_buffer[0] = type << 6;
52
53
376
  if (key_size < static_cast<uint32_t>(kSizeInlineLimit)) {
54
    // size inlined
55
374
    out_buffer[0] |= static_cast<char>(key_size);
56
374
    return 1;
57
374
  } else {
58
2
    out_buffer[0] |= kSizeInlineLimit;
59
2
    char* ptr = EncodeVarint32(out_buffer + 1, key_size - kSizeInlineLimit);
60
2
    return ptr - out_buffer;
61
2
  }
62
376
}
63
}  // namespace
64
65
// Fill bytes_read with number of bytes read.
66
inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
67
                                               PlainTableEntryType* entry_type,
68
                                               uint32_t* key_size,
69
996
                                               uint32_t* bytes_read) {
70
996
  Slice next_byte_slice;
71
996
  bool success = file_reader_.Read(start_offset, 1, &next_byte_slice);
72
996
  if (!success) {
73
0
    return file_reader_.status();
74
0
  }
75
996
  *entry_type = static_cast<PlainTableEntryType>(
76
996
      (static_cast<unsigned char>(next_byte_slice[0]) & ~kSizeInlineLimit) >>
77
996
      6);
78
996
  char inline_key_size = next_byte_slice[0] & kSizeInlineLimit;
79
996
  if (inline_key_size < kSizeInlineLimit) {
80
992
    *key_size = inline_key_size;
81
992
    *bytes_read = 1;
82
992
    return Status::OK();
83
992
  } else {
84
4
    uint32_t extra_size;
85
4
    uint32_t tmp_bytes_read;
86
4
    success = file_reader_.ReadVarint32(start_offset + 1, &extra_size,
87
4
                                        &tmp_bytes_read);
88
4
    if (!success) {
89
0
      return file_reader_.status();
90
0
    }
91
4
    assert(tmp_bytes_read > 0);
92
0
    *key_size = kSizeInlineLimit + extra_size;
93
4
    *bytes_read = tmp_bytes_read + 1;
94
4
    return Status::OK();
95
4
  }
96
996
}
97
98
// Appends the encoded form of internal key `key` to `file`.
//
// key                  - internal key; must parse with ParseInternalKey().
// file                 - destination writer.
// offset               - advanced by the number of bytes appended to `file`.
// meta_bytes_buf,
// meta_bytes_buf_size  - when the key has sequence 0 and type kTypeValue, the
//                        8-byte trailer is not written; instead the one-byte
//                        PlainTableFactory::kValueTypeSeqId0 marker is stored
//                        at meta_bytes_buf[*meta_bytes_buf_size] and the size
//                        is incremented. (Presumably the caller flushes this
//                        buffer together with the value size - confirm.)
//
// Returns Corruption if `key` is not a valid internal key, or the first
// failing status from file->Append().
Status PlainTableKeyEncoder::AppendKey(const Slice& key,
                                       WritableFileWriter* file,
                                       uint64_t* offset, char* meta_bytes_buf,
                                       size_t* meta_bytes_buf_size) {
  ParsedInternalKey parsed_key;
  if (!ParseInternalKey(key, &parsed_key)) {
    return STATUS(Corruption, Slice());
  }

  // Portion of the internal key that still has to be written out.
  Slice key_to_write = key;
  const uint32_t user_key_size = static_cast<uint32_t>(key.size() - 8);

  if (encoding_type_ == kPlain) {
    if (fixed_user_key_len_ == kPlainTableVariableLength) {
      // Variable-length keys are preceded by their length as a varint32.
      char varint_buf[5];
      char* const varint_end = EncodeVarint32(varint_buf, user_key_size);
      assert(varint_end <= varint_buf + sizeof(varint_buf));
      const auto varint_len = varint_end - varint_buf;
      RETURN_NOT_OK(file->Append(Slice(varint_buf, varint_len)));
      *offset += varint_len;
    }
  } else {
    assert(encoding_type_ == kPrefix);
    // Room for two size fields produced by EncodeSize().
    char size_bytes[12];
    size_t size_bytes_len = 0;

    const Slice prefix =
        prefix_extractor_->Transform(Slice(key.data(), user_key_size));
    const bool write_full_key =
        key_count_for_prefix_ == 0 || prefix != pre_prefix_.GetKey() ||
        key_count_for_prefix_ % index_sparseness_ == 0;

    if (write_full_key) {
      key_count_for_prefix_ = 1;
      pre_prefix_.SetKey(prefix);
      size_bytes_len += EncodeSize(kFullKey, user_key_size, size_bytes);
    } else {
      ++key_count_for_prefix_;
      const uint32_t prefix_len =
          static_cast<uint32_t>(pre_prefix_.GetKey().size());
      if (key_count_for_prefix_ == 2) {
        // The second key within a prefix also records the prefix length.
        size_bytes_len += EncodeSize(kPrefixFromPreviousKey, prefix_len,
                                     size_bytes + size_bytes_len);
      }
      size_bytes_len += EncodeSize(kKeySuffix, user_key_size - prefix_len,
                                   size_bytes + size_bytes_len);
      // Only the part after the shared prefix is written for this key.
      key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
    }

    RETURN_NOT_OK(file->Append(Slice(size_bytes, size_bytes_len)));
    *offset += size_bytes_len;
  }

  const bool is_seq_id_zero_value =
      parsed_key.sequence == 0 && parsed_key.type == kTypeValue;
  if (!is_seq_id_zero_value) {
    // Regular case: write the remaining key bytes including the trailer.
    RETURN_NOT_OK(file->Append(key_to_write));
    *offset += key_to_write.size();
    return Status::OK();
  }

  // Value with sequence 0: drop the 8-byte trailer and record the one-byte
  // marker in the meta buffer instead of issuing a separate append for it.
  const size_t bytes_without_trailer = key_to_write.size() - 8;
  RETURN_NOT_OK(
      file->Append(Slice(key_to_write.data(), bytes_without_trailer)));
  *offset += bytes_without_trailer;
  meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
  *meta_bytes_buf_size += 1;
  return Status::OK();
}
181
182
// Returns a slice of `len` bytes that views the part of `buffer` holding file
// offset `file_offset`. The position inside the buffer is computed relative to
// buffer->buf_start_offset; no bounds check against the buffer is made here.
Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,
                                          uint32_t len) {
  assert(file_offset + len <= file_info_->data_end_offset);
  const uint32_t offset_in_buffer = file_offset - buffer->buf_start_offset;
  const char* const first = buffer->buf.get() + offset_in_buffer;
  return Slice(first, len);
}
188
189
bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
190
4.72M
                                       Slice* out) {
191
4.72M
  const uint32_t kPrefetchSize = 256u;
192
193
  // Try to read from buffers.
194
5.70M
  for (uint32_t i = 0; i < num_buf_; 
i++976k
) {
195
5.31M
    Buffer* buffer = buffers_[num_buf_ - 1 - i].get();
196
5.31M
    if (file_offset >= buffer->buf_start_offset &&
197
5.31M
        
file_offset + len <= buffer->buf_start_offset + buffer->buf_len4.93M
) {
198
4.33M
      *out = GetFromBuffer(buffer, file_offset, len);
199
4.33M
      return true;
200
4.33M
    }
201
5.31M
  }
202
203
392k
  Buffer* new_buffer;
204
  // Data needed is not in any of the buffer. Allocate a new buffer.
205
392k
  if (num_buf_ < buffers_.size()) {
206
    // Add a new buffer
207
30.2k
    new_buffer = new Buffer();
208
30.2k
    buffers_[num_buf_++].reset(new_buffer);
209
362k
  } else {
210
    // Now simply replace the last buffer. Can improve the placement policy
211
    // if needed.
212
362k
    new_buffer = buffers_[num_buf_ - 1].get();
213
362k
  }
214
215
392k
  assert(file_offset + len <= file_info_->data_end_offset);
216
0
  uint32_t size_to_read = std::min(file_info_->data_end_offset - file_offset,
217
392k
                                   std::max(kPrefetchSize, len));
218
392k
  if (size_to_read > new_buffer->buf_capacity) {
219
43.9k
    new_buffer->buf.reset(new char[size_to_read]);
220
43.9k
    new_buffer->buf_capacity = size_to_read;
221
43.9k
    new_buffer->buf_len = 0;
222
43.9k
  }
223
392k
  Slice read_result;
224
392k
  Status s = file_info_->file->Read(file_offset, size_to_read, &read_result,
225
392k
                                    new_buffer->buf.get());
226
392k
  if (!s.ok()) {
227
0
    status_ = s;
228
0
    return false;
229
0
  }
230
392k
  new_buffer->buf_start_offset = file_offset;
231
392k
  new_buffer->buf_len = size_to_read;
232
392k
  *out = GetFromBuffer(new_buffer, file_offset, len);
233
392k
  return true;
234
392k
}
235
236
inline bool PlainTableFileReader::ReadVarint32(uint32_t offset, uint32_t* out,
237
7.89M
                                               uint32_t* bytes_read) {
238
7.89M
  if (file_info_->is_mmap_mode) {
239
5.76M
    const char* start = file_info_->file_data.cdata() + offset;
240
5.76M
    const char* limit = file_info_->file_data.cdata() + file_info_->data_end_offset;
241
5.76M
    const char* key_ptr = GetVarint32Ptr(start, limit, out);
242
5.76M
    assert(key_ptr != nullptr);
243
0
    *bytes_read = static_cast<uint32_t>(key_ptr - start);
244
5.76M
    return true;
245
5.76M
  } else {
246
2.13M
    return ReadVarint32NonMmap(offset, out, bytes_read);
247
2.13M
  }
248
7.89M
}
249
250
bool PlainTableFileReader::ReadVarint32NonMmap(uint32_t offset, uint32_t* out,
251
2.13M
                                               uint32_t* bytes_read) {
252
2.13M
  const char* start;
253
2.13M
  const char* limit;
254
2.13M
  const uint32_t kMaxVarInt32Size = 6u;
255
2.13M
  uint32_t bytes_to_read =
256
2.13M
      std::min(file_info_->data_end_offset - offset, kMaxVarInt32Size);
257
2.13M
  Slice bytes;
258
2.13M
  if (!Read(offset, bytes_to_read, &bytes)) {
259
0
    return false;
260
0
  }
261
2.13M
  start = bytes.cdata();
262
2.13M
  limit = bytes.cend();
263
264
2.13M
  const char* key_ptr = GetVarint32Ptr(start, limit, out);
265
2.13M
  *bytes_read =
266
2.13M
      (key_ptr != nullptr) ? 
static_cast<uint32_t>(key_ptr - start)2.13M
:
02.38k
;
267
2.13M
  return true;
268
2.13M
}
269
270
// Reads the key stored at file_offset whose user-key part is user_key_size
// bytes long and fills *parsed_key.
//
// If the byte following the user key equals
// PlainTableFactory::kValueTypeSeqId0, the key is a value with sequence 0
// stored without the 8-byte trailer: *internal_key_valid is set to false,
// *internal_key is left untouched and user_key_size + 1 is added to
// *bytes_read. Otherwise the full internal key (user key + 8 bytes) is read
// into *internal_key, *internal_key_valid is set to true and
// user_key_size + 8 is added to *bytes_read.
//
// Returns the file reader's status on read failure, or Corruption when the
// internal key cannot be parsed.
Status PlainTableKeyDecoder::ReadInternalKey(
    uint32_t file_offset, uint32_t user_key_size, ParsedInternalKey* parsed_key,
    uint32_t* bytes_read, bool* internal_key_valid, Slice* internal_key) {
  // Read the user key plus one more byte to inspect the marker position.
  Slice key_and_marker;
  if (!file_reader_.Read(file_offset, user_key_size + 1, &key_and_marker)) {
    return file_reader_.status();
  }

  const bool is_seq_id_zero_value =
      key_and_marker.cdata()[user_key_size] ==
      PlainTableFactory::kValueTypeSeqId0;
  if (is_seq_id_zero_value) {
    // Special encoding for the row with seqID=0.
    parsed_key->user_key = Slice(key_and_marker.data(), user_key_size);
    parsed_key->sequence = 0;
    parsed_key->type = kTypeValue;
    *bytes_read += user_key_size + 1;
    *internal_key_valid = false;
    return Status::OK();
  }

  if (!file_reader_.Read(file_offset, user_key_size + 8, internal_key)) {
    return file_reader_.status();
  }
  *internal_key_valid = true;
  if (!ParseInternalKey(*internal_key, parsed_key)) {
    return STATUS(Corruption,
        Slice("Incorrect value type found when reading the next key"));
  }
  *bytes_read += user_key_size + 8;
  return Status::OK();
}
299
300
// Decodes the key stored at start_offset using the plain encoding.
//
// parsed_key    - receives the parsed key.
// internal_key  - optional; when non-null receives the internal key.
// bytes_read    - for variable-length keys it is set to the size of the length
//                 prefix and then advanced past the key; for fixed-length keys
//                 it is only advanced, so the caller must have initialized it.
// seekable      - not used by this encoding.
//
// Returns the file reader's status on read failure and Corruption when the
// key length or the key itself cannot be decoded.
Status PlainTableKeyDecoder::NextPlainEncodingKey(uint32_t start_offset,
                                                  ParsedInternalKey* parsed_key,
                                                  Slice* internal_key,
                                                  uint32_t* bytes_read,
                                                  bool* seekable) {
  uint32_t user_key_size = 0;
  if (fixed_user_key_len_ != kPlainTableVariableLength) {
    user_key_size = fixed_user_key_len_;
  } else {
    uint32_t tmp_size = 0;
    uint32_t tmp_read = 0;
    bool success =
        file_reader_.ReadVarint32(start_offset, &tmp_size, &tmp_read);
    if (!success) {
      return file_reader_.status();
    }
    if (tmp_read == 0) {
      // The length prefix is cut off; tmp_size is not meaningful here, so
      // fail instead of reading a key of arbitrary size.
      return STATUS(Corruption, "Unexpected EOF when reading size of the key");
    }
    user_key_size = tmp_size;
    *bytes_read = tmp_read;
  }

  // Dummy initial value; ReadInternalKey() always sets it on success.
  bool decoded_internal_key_valid = true;
  Slice decoded_internal_key;
  Status s = ReadInternalKey(start_offset + *bytes_read, user_key_size,
                             parsed_key, bytes_read,
                             &decoded_internal_key_valid,
                             &decoded_internal_key);
  if (!s.ok()) {
    return s;
  }

  if (!file_reader_.file_info()->is_mmap_mode) {
    // Copy the key into cur_key_ and hand out slices that refer to the copy
    // rather than to the reader's buffer.
    cur_key_.SetInternalKey(*parsed_key);
    parsed_key->user_key = Slice(cur_key_.GetKey().data(), user_key_size);
    if (internal_key != nullptr) {
      *internal_key = cur_key_.GetKey();
    }
  } else if (internal_key != nullptr) {
    if (decoded_internal_key_valid) {
      *internal_key = decoded_internal_key;
    } else {
      // The trailer was not stored in the file, so the internal key has to be
      // rebuilt in cur_key_.
      cur_key_.SetInternalKey(*parsed_key);
      *internal_key = cur_key_.GetKey();
    }
  }
  return Status::OK();
}
347
348
// Decodes the key stored at start_offset using the prefix encoding.
//
// A key is made of one or two size fields: a full key, a suffix that reuses
// the prefix remembered in prefix_len_ / saved_user_key_, or a prefix-length
// field followed by a suffix. The loop runs a second time only after a
// prefix-length field.
//
// parsed_key    - receives the parsed key.
// internal_key  - optional; when non-null receives the internal key.
// bytes_read    - advanced by every byte consumed; must be initialized by the
//                 caller.
// seekable      - optional; set to false for entries that depend on a
//                 previous key.
//
// Returns the first failing status, or Corruption for a truncated size field
// or an unknown entry type.
Status PlainTableKeyDecoder::NextPrefixEncodingKey(
    uint32_t start_offset, ParsedInternalKey* parsed_key, Slice* internal_key,
    uint32_t* bytes_read, bool* seekable) {
  PlainTableEntryType entry_type = PlainTableEntryType::kFullKey;
  bool expect_suffix = false;

  do {
    uint32_t size = 0;
    // Dummy initial value; ReadInternalKey() always sets it on success.
    bool decoded_internal_key_valid = true;
    uint32_t size_field_bytes = 0;
    RETURN_NOT_OK(DecodeSize(start_offset + *bytes_read, &entry_type, &size,
                             &size_field_bytes));
    if (size_field_bytes == 0) {
      return STATUS(Corruption, "Unexpected EOF when reading size of the key");
    }
    *bytes_read += size_field_bytes;

    switch (entry_type) {
      case kFullKey: {
        expect_suffix = false;
        Slice decoded_internal_key;
        RETURN_NOT_OK(ReadInternalKey(start_offset + *bytes_read, size,
                                      parsed_key, bytes_read,
                                      &decoded_internal_key_valid,
                                      &decoded_internal_key));
        const bool mmap_mode = file_reader_.file_info()->is_mmap_mode;
        const bool need_key_copy =
            !mmap_mode ||
            (internal_key != nullptr && !decoded_internal_key_valid);
        if (need_key_copy) {
          // In non-mmap mode, always need to make a copy of keys returned to
          // users, because after reading value for the key, the key might
          // be invalid.
          cur_key_.SetInternalKey(*parsed_key);
          saved_user_key_ = cur_key_.GetKey();
          if (!mmap_mode) {
            parsed_key->user_key = Slice(cur_key_.GetKey().data(), size);
          }
          if (internal_key != nullptr) {
            *internal_key = cur_key_.GetKey();
          }
        } else {
          if (internal_key != nullptr) {
            *internal_key = decoded_internal_key;
          }
          saved_user_key_ = parsed_key->user_key;
        }
        break;
      }
      case kPrefixFromPreviousKey: {
        if (seekable != nullptr) {
          *seekable = false;
        }
        prefix_len_ = size;
        assert(prefix_extractor_ == nullptr ||
               prefix_extractor_->Transform(saved_user_key_).size() ==
                   prefix_len_);
        // The suffix's size field follows; go around once more.
        expect_suffix = true;
        break;
      }
      case kKeySuffix: {
        expect_suffix = false;
        if (seekable != nullptr) {
          *seekable = false;
        }

        Slice suffix_internal_key;
        RETURN_NOT_OK(ReadInternalKey(start_offset + *bytes_read, size,
                                      parsed_key, bytes_read,
                                      &decoded_internal_key_valid,
                                      &suffix_internal_key));
        if (!file_reader_.file_info()->is_mmap_mode) {
          // In non-mmap mode, we need to make a copy of keys returned to
          // users, because after reading value for the key, the key might
          // be invalid.
          // saved_user_key_ points to cur_key_, so the prefix is copied to a
          // separate string BEFORE cur_key_ is touched, and the current key
          // is then rebuilt in cur_key_ from that prefix plus the suffix.
          std::string prefix_copy =
              Slice(saved_user_key_.data(), prefix_len_).ToString();
          cur_key_.Reserve(prefix_len_ + size);
          cur_key_.SetInternalKey(prefix_copy, *parsed_key);
          parsed_key->user_key =
              Slice(cur_key_.GetKey().data(), prefix_len_ + size);
          saved_user_key_ = cur_key_.GetKey();
        } else {
          cur_key_.Reserve(prefix_len_ + size);
          cur_key_.SetInternalKey(Slice(saved_user_key_.data(), prefix_len_),
                                  *parsed_key);
        }
        parsed_key->user_key = ExtractUserKey(cur_key_.GetKey());
        if (internal_key != nullptr) {
          *internal_key = cur_key_.GetKey();
        }
        break;
      }
      default:
        return STATUS(Corruption, "Un-identified size flag.");
    }
  } while (expect_suffix);  // Another round if suffix is expected.

  return Status::OK();
}
457
458
// Decodes the key at start_offset together with the value that follows it.
//
// The key is decoded by NextKeyNoValue(). The value is stored as a varint32
// length followed by that many bytes; *value is pointed at those bytes and
// *bytes_read ends up as the total size of key, value length and value.
//
// Returns the status of the key decoding or of the file reader on failure,
// and Corruption when the value length cannot be decoded.
Status PlainTableKeyDecoder::NextKey(uint32_t start_offset,
                                     ParsedInternalKey* parsed_key,
                                     Slice* internal_key, Slice* value,
                                     uint32_t* bytes_read, bool* seekable) {
  assert(value != nullptr);
  Status key_status = NextKeyNoValue(start_offset, parsed_key, internal_key,
                                     bytes_read, seekable);
  if (!key_status.ok()) {
    return key_status;
  }

  assert(bytes_read != nullptr);
  uint32_t value_size;
  uint32_t value_size_bytes;
  if (!file_reader_.ReadVarint32(start_offset + *bytes_read, &value_size,
                                 &value_size_bytes)) {
    return file_reader_.status();
  }
  if (value_size_bytes == 0) {
    return STATUS(Corruption,
        "Unexpected EOF when reading the next value's size.");
  }
  *bytes_read += value_size_bytes;

  if (!file_reader_.Read(start_offset + *bytes_read, value_size, value)) {
    return file_reader_.status();
  }
  *bytes_read += value_size;
  return key_status;
}
487
488
// Decodes the key at start_offset without reading its value.
//
// Resets *bytes_read to 0 and, when `seekable` is non-null, *seekable to true,
// then dispatches on encoding_type_ to the plain or prefix decoder, which
// update both outputs.
//
// Returns the status of the selected decoder.
Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
                                            ParsedInternalKey* parsed_key,
                                            Slice* internal_key,
                                            uint32_t* bytes_read,
                                            bool* seekable) {
  *bytes_read = 0;
  if (seekable != nullptr) {
    *seekable = true;
  }

  if (encoding_type_ == kPlain) {
    return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
                                bytes_read, seekable);
  }

  assert(encoding_type_ == kPrefix);
  return NextPrefixEncodingKey(start_offset, parsed_key, internal_key,
                               bytes_read, seekable);
}
507
508
}  // namespace rocksdb
509
#endif  // ROCKSDB_LITE