YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/common/partition.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/common/partition.h"
34
35
#include <algorithm>
36
#include <set>
37
38
#include <glog/logging.h>
39
40
#include "yb/common/common.pb.h"
41
#include "yb/common/crc16.h"
42
#include "yb/common/key_encoder.h"
43
#include "yb/common/partial_row.h"
44
#include "yb/common/pgsql_protocol.pb.h"
45
#include "yb/common/ql_value.h"
46
#include "yb/common/row.h"
47
#include "yb/common/schema.h"
48
49
#include "yb/docdb/doc_key.h"
50
51
#include "yb/gutil/hash/hash.h"
52
#include "yb/gutil/map-util.h"
53
#include "yb/gutil/strings/join.h"
54
#include "yb/gutil/strings/substitute.h"
55
56
#include "yb/util/status_format.h"
57
#include "yb/util/yb_partition.h"
58
59
#include "yb/yql/redis/redisserver/redis_constants.h"
60
61
namespace yb {
62
63
using std::set;
64
using std::string;
65
using std::vector;
66
67
using google::protobuf::RepeatedPtrField;
68
using strings::Substitute;
69
70
// The encoded size of a hash bucket in a partition key.
71
static const size_t kEncodedBucketSize = sizeof(uint32_t);
72
73
5.91k
Slice Partition::range_key_start() const {
74
5.91k
  return range_key(partition_key_start());
75
5.91k
}
76
77
5.91k
Slice Partition::range_key_end() const {
78
5.91k
  return range_key(partition_key_end());
79
5.91k
}
80
81
11.8k
Slice Partition::range_key(const string& partition_key) const {
82
11.8k
  size_t hash_size = kEncodedBucketSize * hash_buckets().size();
83
11.8k
  if (partition_key.size() > hash_size) {
84
0
    Slice s = Slice(partition_key);
85
0
    s.remove_prefix(hash_size);
86
0
    return s;
87
11.8k
  } else {
88
11.8k
    return Slice();
89
11.8k
  }
90
11.8k
}
91
92
882k
void Partition::ToPB(PartitionPB* pb) const {
93
882k
  pb->Clear();
94
882k
  pb->mutable_hash_buckets()->Reserve(narrow_cast<int>(hash_buckets_.size()));
95
0
  for (int32_t bucket : hash_buckets()) {
96
0
    pb->add_hash_buckets(bucket);
97
0
  }
98
882k
  pb->set_partition_key_start(partition_key_start());
99
882k
  pb->set_partition_key_end(partition_key_end());
100
882k
}
101
102
157k
void Partition::FromPB(const PartitionPB& pb, Partition* partition) {
103
157k
  partition->hash_buckets_.clear();
104
157k
  partition->hash_buckets_.reserve(pb.hash_buckets_size());
105
0
  for (int32_t hash_bucket : pb.hash_buckets()) {
106
0
    partition->hash_buckets_.push_back(hash_bucket);
107
0
  }
108
109
157k
  partition->partition_key_start_ = pb.partition_key_start();
110
157k
  partition->partition_key_end_ = pb.partition_key_end();
111
157k
}
112
113
8.20k
std::string Partition::ToString() const {
114
8.20k
  return Format(
115
8.20k
      "{ partition_key_start: $0 partition_key_end: $1 hash_buckets: $2 }",
116
8.20k
      Slice(partition_key_start_).ToDebugString(),
117
8.20k
      Slice(partition_key_end_).ToDebugString(),
118
8.20k
      hash_buckets_);
119
8.20k
}
120
121
namespace {
122
// Extracts the column IDs from a protobuf repeated field of column identifiers.
123
Status ExtractColumnIds(const RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>& identifiers,
124
                        const Schema& schema,
125
791
                        vector<ColumnId>* column_ids) {
126
791
    column_ids->reserve(identifiers.size());
127
1.21k
    for (PartitionSchemaPB_ColumnIdentifierPB identifier : identifiers) {
128
1.21k
      switch (identifier.identifier_case()) {
129
1.09k
        case PartitionSchemaPB_ColumnIdentifierPB::kId: {
130
1.09k
          ColumnId column_id(identifier.id());
131
1.09k
          if (schema.find_column_by_id(column_id) == Schema::kColumnNotFound) {
132
0
            return STATUS(InvalidArgument, "unknown column id", identifier.DebugString());
133
0
          }
134
1.09k
          column_ids->push_back(column_id);
135
1.09k
          continue;
136
1.09k
        }
137
122
        case PartitionSchemaPB_ColumnIdentifierPB::kName: {
138
122
          auto column_idx = schema.find_column(identifier.name());
139
122
          if (column_idx == Schema::kColumnNotFound) {
140
0
            return STATUS(InvalidArgument, "unknown column", identifier.DebugString());
141
0
          }
142
122
          column_ids->push_back(schema.column_id(column_idx));
143
122
          continue;
144
122
        }
145
0
        default: return STATUS(InvalidArgument, "unknown column", identifier.DebugString());
146
1.21k
      }
147
1.21k
    }
148
792
    return Status::OK();
149
791
}
150
// Sets a repeated field of column identifiers to the provided column IDs.
151
void SetColumnIdentifiers(const vector<ColumnId>& column_ids,
152
140M
                          RepeatedPtrField<PartitionSchemaPB_ColumnIdentifierPB>* identifiers) {
153
140M
    identifiers->Reserve(narrow_cast<int>(column_ids.size()));
154
275M
    for (ColumnId column_id : column_ids) {
155
275M
      identifiers->Add()->set_id(column_id);
156
275M
    }
157
140M
}
158
159
} // namespace
160
161
Status PartitionSchema::FromPB(const PartitionSchemaPB& pb,
162
                               const Schema& schema,
163
761k
                               PartitionSchema* partition_schema) {
164
761k
  if (pb.hash_bucket_schemas_size() > 0) {
165
    // Maybe dead code. Leave it here for now just in case we use it.
166
0
    return KuduFromPB(pb, schema, partition_schema);
167
0
  }
168
169
  // TODO(neil) Fix GitHub bug #5832.
170
  // SCHECK(!pb.has_hash_schema() && !pb.has_range_schema(), IllegalState,
171
  //        "Table definition does not specify partition schema");
172
173
  // TODO(neil) We should allow a schema definition that has both hash and range partitioning in
174
  // the PK without forcing users to use a secondary index, which is a lot slower. However, its
175
  // specification needs to be well-defined and discussed.
176
  //
177
  // One example of an interpretation:
178
  // - Use range-partition schema to SPLIT by TIME, so that users can archive old data away.
179
  // - Use hash-partition schema for performance purposes.
180
761k
  SCHECK(!pb.has_hash_schema() || !pb.has_range_schema() || pb.range_schema().splits_size() == 0,
181
761k
         Corruption, "Table schema that has both hash and range partition is not yet supported");
182
183
  // Initialize partition schema.
184
761k
  partition_schema->Clear();
185
186
  // YugaByte hash partition.
187
761k
  if (pb.has_hash_schema()) {
188
171k
    switch (pb.hash_schema()) {
189
145k
      case PartitionSchemaPB::MULTI_COLUMN_HASH_SCHEMA:
190
282
        VLOG(3) << "Using multi-column hash value for partitioning";
191
145k
        partition_schema->hash_schema_ = YBHashSchema::kMultiColumnHash;
192
145k
        return Status::OK();
193
194
3.64k
      case PartitionSchemaPB::REDIS_HASH_SCHEMA:
195
5
        VLOG(3) << "Using redis hash schema for partitioning";
196
3.64k
        partition_schema->hash_schema_ = YBHashSchema::kRedisHash;
197
3.64k
        return Status::OK();
198
199
21.2k
      case PartitionSchemaPB::PGSQL_HASH_SCHEMA:
200
79
        VLOG(3) << "Using pgsql hash schema for partitioning";
201
21.2k
        partition_schema->hash_schema_ = YBHashSchema::kPgsqlHash;
202
21.2k
        return Status::OK();
203
590k
    }
204
590k
  }
205
206
  // YugaByte range partition whose schema also defines split_rows.
207
590k
  if (pb.has_range_schema()) {
208
792
    const PartitionSchemaPB_RangeSchemaPB& range_pb = pb.range_schema();
209
792
    RETURN_NOT_OK(ExtractColumnIds(range_pb.columns(), schema,
210
792
                                   &partition_schema->range_schema_.column_ids));
211
792
    partition_schema->range_schema_.splits.reserve(range_pb.splits_size());
212
222
    for (const auto& split : range_pb.splits()) {
213
222
      partition_schema->range_schema_.splits.emplace_back(split.column_bounds());
214
222
    }
215
216
589k
  } else {
217
    // Currently, the system table schema does not define a partitioning method (see GitHub
218
    // NOTE: Each system table uses only one tablet.
219
1.79M
    for (size_t column_idx = 0; column_idx < schema.num_key_columns(); column_idx++) {
220
1.20M
      partition_schema->range_schema_.column_ids.push_back(schema.column_id(column_idx));
221
1.20M
    }
222
589k
  }
223
224
  // Done processing.
225
590k
  return Status::OK();
226
590k
}
227
228
140M
void PartitionSchema::ToPB(PartitionSchemaPB* pb) const {
229
140M
  if (hash_bucket_schemas_.size() > 0) {
230
    // Maybe dead code. Leave it here for now just in case we use it.
231
0
    return KuduToPB(pb);
232
0
  }
233
234
  // Initialize protobuf.
235
140M
  pb->Clear();
236
237
  // Hash partitioning schema.
238
140M
  if (IsHashPartitioning()) {
239
348k
    switch (*hash_schema_) {
240
292k
      case YBHashSchema::kMultiColumnHash:
241
292k
        pb->set_hash_schema(PartitionSchemaPB::MULTI_COLUMN_HASH_SCHEMA);
242
292k
        break;
243
6.89k
      case YBHashSchema::kRedisHash:
244
6.89k
        pb->set_hash_schema(PartitionSchemaPB::REDIS_HASH_SCHEMA);
245
6.89k
        break;
246
48.8k
      case YBHashSchema::kPgsqlHash:
247
48.8k
        pb->set_hash_schema(PartitionSchemaPB::PGSQL_HASH_SCHEMA);
248
48.8k
        break;
249
140M
    }
250
140M
  }
251
252
  // Range partitioning schema.
253
140M
  if (IsRangePartitioning()) {
254
140M
    SetColumnIdentifiers(range_schema_.column_ids, pb->mutable_range_schema()->mutable_columns());
255
770
    for (const auto& split : range_schema_.splits) {
256
770
      pb->mutable_range_schema()->add_splits()->set_column_bounds(split.column_bounds);
257
770
    }
258
140M
  }
259
140M
}
260
261
Status PartitionSchema::KuduFromPB(const PartitionSchemaPB& pb,
262
                                   const Schema& schema,
263
0
                                   PartitionSchema* partition_schema) {
264
  // The following is Kudu's original partitioning code and should not be used for YBTable.
265
  // - Don't modify the following code. Leave it as is.
266
  // - Current system tables in master might still be using this.
267
  // - If this code is deleted, Kudu's original test needs to be updated.
268
0
  partition_schema->Clear();
269
270
0
  for (const PartitionSchemaPB_HashBucketSchemaPB& hash_bucket_pb : pb.hash_bucket_schemas()) {
271
0
    HashBucketSchema hash_bucket;
272
0
    RETURN_NOT_OK(ExtractColumnIds(hash_bucket_pb.columns(), schema, &hash_bucket.column_ids));
273
274
    // Hashing is column-order dependent, so sort the column_ids to ensure that
275
    // hash components with the same columns hash consistently. This is
276
    // important when deserializing a user-supplied partition schema during
277
    // table creation; after that the columns should remain in sorted order.
278
0
    std::sort(hash_bucket.column_ids.begin(), hash_bucket.column_ids.end());
279
280
0
    hash_bucket.seed = hash_bucket_pb.seed();
281
0
    hash_bucket.num_buckets = hash_bucket_pb.num_buckets();
282
0
    partition_schema->hash_bucket_schemas_.push_back(hash_bucket);
283
0
  }
284
285
0
  if (pb.has_range_schema()) {
286
0
    const PartitionSchemaPB_RangeSchemaPB& range_pb = pb.range_schema();
287
0
    RETURN_NOT_OK(ExtractColumnIds(range_pb.columns(), schema,
288
0
                                   &partition_schema->range_schema_.column_ids));
289
0
  } else {
290
    // Fill in the default range partition (PK columns).
291
    // like the sorting above, this should only happen during table creation
292
    // while deserializing the user-provided partition schema.
293
0
    for (size_t column_idx = 0; column_idx < schema.num_key_columns(); column_idx++) {
294
0
      partition_schema->range_schema_.column_ids.push_back(schema.column_id(column_idx));
295
0
    }
296
0
  }
297
298
0
  return partition_schema->Validate(schema);
299
0
}
300
301
0
void PartitionSchema::KuduToPB(PartitionSchemaPB* pb) const {
302
  // The following is Kudu's original partitioning code and should not be used for YBTable.
303
  // - Don't modify the following code. Leave it as is.
304
  // - Current system tables in master might still be using this.
305
  // - If this code is deleted, Kudu's original test needs to be updated.
306
0
  pb->Clear();
307
308
0
  pb->mutable_hash_bucket_schemas()->Reserve(narrow_cast<int>(hash_bucket_schemas_.size()));
309
0
  for (const HashBucketSchema& hash_bucket : hash_bucket_schemas_) {
310
0
    PartitionSchemaPB_HashBucketSchemaPB* hash_bucket_pb = pb->add_hash_bucket_schemas();
311
0
    SetColumnIdentifiers(hash_bucket.column_ids, hash_bucket_pb->mutable_columns());
312
0
    hash_bucket_pb->set_num_buckets(hash_bucket.num_buckets);
313
0
    hash_bucket_pb->set_seed(hash_bucket.seed);
314
0
  }
315
316
0
  SetColumnIdentifiers(range_schema_.column_ids, pb->mutable_range_schema()->mutable_columns());
317
0
}
318
319
11
Status PartitionSchema::EncodeRedisKey(const YBPartialRow& row, string* buf) const {
320
11
  CHECK_EQ(row.schema()->num_hash_key_columns(), 1);
321
11
  ConstContiguousRow cont_row(row.schema(), row.row_data_);
322
11
  return EncodeRedisKey(cont_row, buf);
323
11
}
324
325
11
Status PartitionSchema::EncodeRedisKey(const ConstContiguousRow& row, string* buf) const {
326
11
  auto slice = reinterpret_cast<const Slice*>(row.cell_ptr(0));
327
11
  return EncodeRedisKey(*slice, buf);
328
11
}
329
330
208k
Status PartitionSchema::EncodeRedisKey(const Slice& slice, string* buf) const {
331
208k
  size_t i = 0;
332
3.16M
  for (i = 0; i < slice.size(); i++) {
333
2.95M
    if (slice.data()[i] == '{') break;
334
2.95M
  }
335
336
208k
  for (size_t j = i + 1; j < slice.size(); j++) {
337
34
    if (slice.data()[j] == '}') {
338
9
      if (j - i > 1) {
339
6
        *buf = EncodeMultiColumnHashValue(
340
6
            crc16(&slice.data()[i + 1], j - i - 1) % kRedisClusterSlots);
341
6
        return Status::OK();
342
6
      }
343
      // We only search up to the first '}' character following the first '{' character.
344
3
      break;
345
3
    }
346
34
  }
347
348
208k
  *buf = EncodeMultiColumnHashValue(crc16(slice.data(), slice.size()) % kRedisClusterSlots);
349
208k
  return Status::OK();
350
208k
}
351
352
Status PartitionSchema::EncodeKey(const RepeatedPtrField<QLExpressionPB>& hash_col_values,
353
6.75M
                                  string* buf) const {
354
6.75M
  if (!hash_schema_) {
355
0
    return Status::OK();
356
0
  }
357
358
6.75M
  switch (*hash_schema_) {
359
6.70M
    case YBHashSchema::kMultiColumnHash: {
360
6.70M
      string tmp;
361
6.78M
      for (const auto &col_expr_pb : hash_col_values) {
362
6.78M
        AppendToKey(col_expr_pb.value(), &tmp);
363
6.78M
      }
364
6.70M
      const uint16_t hash_value = YBPartition::HashColumnCompoundValue(tmp);
365
6.70M
      *buf = EncodeMultiColumnHashValue(hash_value);
366
6.70M
      return Status::OK();
367
0
    }
368
0
    case YBHashSchema::kPgsqlHash:
369
0
      DLOG(FATAL) << "Illegal code path. PGSQL hash cannot be computed from CQL expression";
370
0
      break;
371
0
    case YBHashSchema::kRedisHash:
372
0
      DLOG(FATAL) << "Illegal code path. REDIS hash cannot be computed from CQL expression";
373
0
      break;
374
0
  }
375
376
0
  return STATUS(InvalidArgument, "Unsupported Partition Schema Type.");
377
0
}
378
379
Status PartitionSchema::EncodeKey(const RepeatedPtrField<PgsqlExpressionPB>& hash_col_values,
380
3.17M
                                  string* buf) const {
381
3.17M
  if (!hash_schema_) {
382
0
    return Status::OK();
383
0
  }
384
385
3.17M
  switch (*hash_schema_) {
386
3.17M
    case YBHashSchema::kPgsqlHash: {
387
      // TODO(neil) Discussion is needed. PGSQL hash should be done appropriately.
388
      // For now, let's not do anything. Just borrow the multi-column hashing code.
389
3.17M
      string tmp;
390
3.27M
      for (const auto &col_expr_pb : hash_col_values) {
391
3.27M
        AppendToKey(col_expr_pb.value(), &tmp);
392
3.27M
      }
393
3.17M
      const uint16_t hash_value = YBPartition::HashColumnCompoundValue(tmp);
394
3.17M
      *buf = EncodeMultiColumnHashValue(hash_value);
395
3.17M
      return Status::OK();
396
0
    }
397
398
0
    case YBHashSchema::kMultiColumnHash:
399
0
      DLOG(FATAL) << "Illegal code path. CQL hash cannot be computed from PGSQL expression";
400
0
      break;
401
402
0
    case YBHashSchema::kRedisHash:
403
0
      DLOG(FATAL) << "Illegal code path. REDIS hash cannot be computed from PGSQL expression";
404
0
      break;
405
0
  }
406
407
0
  return STATUS(InvalidArgument, "Unsupported Partition Schema Type.");
408
0
}
409
410
0
Status PartitionSchema::EncodeKey(const YBPartialRow& row, string* buf) const {
411
412
0
  if (hash_schema_) {
413
0
    switch (*hash_schema_) {
414
0
      case YBHashSchema::kPgsqlHash:
415
        // TODO(neil) Discussion is needed. PGSQL hash should be done appropriately.
416
        // For now, let's not do anything. Just borrow the multi-column hashing code.
417
0
        FALLTHROUGH_INTENDED;
418
0
      case YBHashSchema::kMultiColumnHash:
419
0
        return EncodeColumns(row, buf);
420
0
      case YBHashSchema::kRedisHash:
421
0
        return EncodeRedisKey(row, buf);
422
0
    }
423
0
  }
424
425
0
  const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
426
427
0
  for (const HashBucketSchema& hash_bucket_schema : hash_bucket_schemas_) {
428
0
    int32_t bucket;
429
0
    RETURN_NOT_OK(BucketForRow(row, hash_bucket_schema, &bucket));
430
0
    hash_encoder.Encode(&bucket, buf);
431
0
  }
432
433
0
  return EncodeColumns(row, range_schema_.column_ids, buf);
434
0
}
435
436
0
Status PartitionSchema::EncodeKey(const ConstContiguousRow& row, string* buf) const {
437
0
  if (hash_schema_) {
438
0
    switch (*hash_schema_) {
439
0
      case YBHashSchema::kRedisHash:
440
0
        LOG(FATAL) << "Invalid hash schema kRedisHash passed to EncodeKey";
441
0
      case YBHashSchema::kPgsqlHash:
442
        // TODO(neil) Discussion is needed. PGSQL hash should be done appropriately.
443
        // For now, let's not do anything. Just borrow the multi-column hashing code.
444
0
        FALLTHROUGH_INTENDED;
445
0
      case YBHashSchema::kMultiColumnHash:
446
0
        return EncodeColumns(row, buf);
447
0
    }
448
0
  }
449
450
0
  const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
451
0
  for (const HashBucketSchema& hash_bucket_schema : hash_bucket_schemas_) {
452
0
    int32_t bucket;
453
0
    RETURN_NOT_OK(BucketForRow(row, hash_bucket_schema, &bucket));
454
0
    hash_encoder.Encode(&bucket, buf);
455
0
  }
456
457
0
  return EncodeColumns(row, range_schema_.column_ids, buf);
458
0
}
459
460
12.2M
string PartitionSchema::EncodeMultiColumnHashValue(uint16_t hash_value) {
461
12.2M
  char value_bytes[kPartitionKeySize];
462
12.2M
  value_bytes[0] = hash_value >> 8;
463
12.2M
  value_bytes[1] = hash_value & 0xff;
464
12.2M
  return string(value_bytes, kPartitionKeySize);
465
12.2M
}
466
467
20.4M
uint16_t PartitionSchema::DecodeMultiColumnHashValue(const string& partition_key) {
468
20.4M
  DCHECK_EQ(partition_key.size(), kPartitionKeySize);
469
20.4M
  const uint8_t *bytes = reinterpret_cast<const uint8_t *>(partition_key.data());
470
20.4M
  return (bytes[0] << 8) | bytes[1];
471
20.4M
}
472
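// Illustrative sketch (not part of the original file): the two functions above form a
// big-endian round trip between a 16-bit hash value and a 2-byte partition key
// (kPartitionKeySize is assumed to be 2, matching the two bytes written above):
//
//   const std::string key = PartitionSchema::EncodeMultiColumnHashValue(0xABCD);
//   // key[0] == '\xAB', key[1] == '\xCD'
//   DCHECK_EQ(0xABCD, PartitionSchema::DecodeMultiColumnHashValue(key));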
473
string PartitionSchema::GetEncodedKeyPrefix(
474
27
    const string& partition_key, const PartitionSchemaPB& partition_schema) {
475
27
  if (partition_schema.has_hash_schema()) {
476
27
    const auto doc_key_hash = PartitionSchema::DecodeMultiColumnHashValue(partition_key);
477
27
    docdb::KeyBytes split_encoded_key_bytes;
478
27
    docdb::DocKeyEncoderAfterTableIdStep(&split_encoded_key_bytes)
479
27
      .Hash(doc_key_hash, std::vector<docdb::PrimitiveValue>());
480
27
    return split_encoded_key_bytes.ToStringBuffer();
481
27
  }
482
0
  return partition_key;
483
0
}
484
485
Status PartitionSchema::IsValidHashPartitionRange(const string& partition_key_start,
486
0
                                                  const string& partition_key_end) {
487
0
  if (!IsValidHashPartitionKeyBound(partition_key_start) ||
488
0
      !IsValidHashPartitionKeyBound(partition_key_end)) {
489
0
    return STATUS(InvalidArgument, "Passed in partition keys are not hash partitions.");
490
0
  }
491
0
  if (partition_key_start.empty() || partition_key_end.empty()) {
492
    // We consider the empty string an open bound (0 for start, 0xFFFF for end), so this is a valid
493
    // range.
494
0
    return Status::OK();
495
0
  }
496
0
  if (PartitionSchema::DecodeMultiColumnHashValue(partition_key_start) >=
497
0
      PartitionSchema::DecodeMultiColumnHashValue(partition_key_end)) {
498
0
    return STATUS(InvalidArgument,
499
0
                  Format("Invalid arguments for partition_key_start: $0 and partition_key_end: $1",
500
0
                  Slice(partition_key_start).ToDebugHexString(),
501
0
                  Slice(partition_key_end).ToDebugHexString()));
502
0
  }
503
0
  return Status::OK();
504
0
}
505
506
0
bool PartitionSchema::IsValidHashPartitionKeyBound(const string& partition_key) {
507
0
  return partition_key.empty() || partition_key.size() == kPartitionKeySize;
508
0
}
509
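// Illustrative sketch (not part of the original file), assuming kPartitionKeySize == 2
// as in EncodeMultiColumnHashValue():
//
//   PartitionSchema::IsValidHashPartitionKeyBound("");      // true: open bound
//   PartitionSchema::IsValidHashPartitionKeyBound(
//       PartitionSchema::EncodeMultiColumnHashValue(42));   // true: exactly 2 bytes
//   PartitionSchema::IsValidHashPartitionKeyBound("abc");   // false: wrong size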
510
1.66k
Status PartitionSchema::CreateRangePartitions(std::vector<Partition>* partitions) const {
511
  // Create the start range keys.
512
  // NOTE: When converting FromPB to a partition schema, we already error-check, so we don't need
513
  // to error-check its content again here.
514
1.66k
  partitions->clear();
515
1.66k
  string start_key;
516
15
  for (const auto& split : range_schema_.splits) {
517
15
    Partition partition;
518
15
    partition.partition_key_start_.append(start_key);
519
15
    partition.partition_key_end_.append(split.column_bounds);
520
15
    partitions->push_back(partition);
521
15
    start_key = split.column_bounds;
522
15
  }
523
524
  // Add the final partition
525
1.66k
  Partition partition;
526
1.66k
  partition.partition_key_start_.append(start_key);
527
1.66k
  partitions->push_back(partition);
528
1.66k
  return Status::OK();
529
1.66k
}
530
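// Illustrative sketch (not part of the original file): with range_schema_.splits equal
// to ["b", "d"], the loop above yields three partitions that together cover the whole
// range key space, with the empty string acting as an open bound:
//
//   ["", "b")    ["b", "d")    ["d", "")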
531
Status PartitionSchema::CreateHashPartitions(int32_t num_tablets,
532
                                             vector<Partition> *partitions,
533
31.6k
                                             int32_t max_partition_key) const {
534
31.6k
  DCHECK_GT(max_partition_key, 0);
535
31.6k
  DCHECK_LE(max_partition_key, kMaxPartitionKey);
536
537
31.6k
  if (max_partition_key <= 0 || max_partition_key > kMaxPartitionKey) {
538
0
    return STATUS_SUBSTITUTE(InvalidArgument, "max_partition_key $0 should be in ($1, $2].",
539
0
                             0, kMaxPartitionKey);
540
0
  }
541
542
31.6k
  LOG(INFO) << "Creating partitions with num_tablets: " << num_tablets;
543
544
  // TODO: Maybe also add an upper bound?
545
31.6k
  if (num_tablets <= 0) {
546
2
    return STATUS_SUBSTITUTE(InvalidArgument, "num_tablets should be greater than 0. Client "
547
2
                             "would need to wait for master leader get heartbeats from tserver.");
548
2
  }
549
31.6k
  if (num_tablets > 0xffff) {
550
0
    return STATUS_SUBSTITUTE(InvalidArgument, "Too many tablets requested: $0", num_tablets);
551
0
  }
552
553
  // Allocate the partitions.
554
31.6k
  partitions->resize(num_tablets);
555
31.6k
  const uint16_t partition_interval = max_partition_key / num_tablets;
556
557
31.6k
  uint16_t pstart;
558
31.6k
  uint16_t pend = 0;
559
87.5k
  for (int partition_index = 0; partition_index < num_tablets; partition_index++) {
560
55.9k
    pstart = pend;
561
55.9k
    pend = (partition_index + 1) * partition_interval;
562
563
    // For the first tablet, the start key is open-ended:
564
55.9k
    if (partition_index != 0) {
565
24.3k
      (*partitions)[partition_index].partition_key_start_ = EncodeMultiColumnHashValue(pstart);
566
24.3k
    }
567
568
55.9k
    if (partition_index < num_tablets - 1) {
569
24.3k
      (*partitions)[partition_index].partition_key_end_ = EncodeMultiColumnHashValue(pend);
570
24.3k
    }
571
55.9k
  }
572
573
31.6k
  return Status::OK();
574
31.6k
}
575
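// Illustrative sketch (not part of the original file), assuming kRedisClusterSlots ==
// 16384 as in Redis Cluster: CreateHashPartitions(2, &partitions, kRedisClusterSlots)
// gives partition_interval = 8192 and produces
//
//   tablet 0: ["", EncodeMultiColumnHashValue(8192))    // open start bound
//   tablet 1: [EncodeMultiColumnHashValue(8192), "")    // open end bound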
576
33.2k
Status PartitionSchema::CreatePartitions(int32_t num_tablets, vector<Partition> *partitions) const {
577
33.2k
  SCHECK(!hash_schema_ || !IsRangePartitioning(), IllegalState,
578
33.2k
         "Schema containing both hash and range partitioning is not yet supported");
579
580
33.2k
  if (!IsHashPartitioning() && !IsRangePartitioning()) {
581
    // The partitioning method is not defined. This bug is filed as GitHub issue #5832.
582
    // For compatibility reasons, we create tablets using the HASH schema option. However, we
583
    // should have created them using the RANGE schema option.
584
2.30k
    return CreateHashPartitions(num_tablets, partitions);
585
2.30k
  }
586
587
30.9k
  if (IsHashPartitioning()) {
588
29.2k
    switch (*hash_schema_) {
589
1.33k
      case YBHashSchema::kPgsqlHash:
590
        // TODO(neil) After a discussion, PGSQL hash should be done appropriately.
591
        // For now, let's not doing anything. Just borrow the multi column hash.
592
1.33k
        FALLTHROUGH_INTENDED;
593
29.1k
      case YBHashSchema::kMultiColumnHash: {
594
        // Use the given number of tablets to create partitions and ignore the other schema
595
        // options in the request.
596
29.1k
        RETURN_NOT_OK(CreateHashPartitions(num_tablets, partitions));
597
29.1k
        break;
598
29.1k
      }
599
107
      case YBHashSchema::kRedisHash: {
600
107
        RETURN_NOT_OK(CreateHashPartitions(num_tablets, partitions, kRedisClusterSlots));
601
107
        break;
602
30.9k
      }
603
29.2k
    }
604
29.2k
  }
605
606
30.9k
  if (IsRangePartitioning()) {
607
1.66k
    RETURN_NOT_OK(CreateRangePartitions(partitions));
608
1.66k
  }
609
610
30.9k
  return Status::OK();
611
30.9k
}
612
613
Status PartitionSchema::CreatePartitions(const vector<YBPartialRow>& split_rows,
614
                                         const Schema& schema,
615
5.90k
                                         vector<Partition>* partitions) const {
616
5.90k
  const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
617
618
  // Create a partition per hash bucket combination.
619
5.90k
  *partitions = vector<Partition>(1);
620
0
  for (const HashBucketSchema& bucket_schema : hash_bucket_schemas_) {
621
0
    vector<Partition> new_partitions;
622
    // For each of the partitions created so far, replicate it
623
    // by the number of buckets in the next hash bucketing component
624
0
    for (const Partition& base_partition : *partitions) {
625
0
      for (int32_t bucket = 0; bucket < bucket_schema.num_buckets; bucket++) {
626
0
        Partition partition = base_partition;
627
0
        partition.hash_buckets_.push_back(bucket);
628
0
        hash_encoder.Encode(&bucket, &partition.partition_key_start_);
629
0
        hash_encoder.Encode(&bucket, &partition.partition_key_end_);
630
0
        new_partitions.push_back(partition);
631
0
      }
632
0
    }
633
0
    partitions->swap(new_partitions);
634
0
  }
635
636
5.90k
  std::unordered_set<size_t> range_column_idxs;
637
11.2k
  for (ColumnId column_id : range_schema_.column_ids) {
638
11.2k
    int column_idx = schema.find_column_by_id(column_id);
639
11.2k
    if (column_idx == Schema::kColumnNotFound) {
640
0
      return STATUS_FORMAT(
641
0
          InvalidArgument, "Range partition column ID $0 not found in table schema", column_id);
642
0
    }
643
11.2k
    if (!InsertIfNotPresent(&range_column_idxs, column_idx)) {
644
0
      return STATUS(InvalidArgument, "Duplicate column in range partition",
645
0
                                     schema.column(column_idx).name());
646
0
    }
647
11.2k
  }
648
649
  // Create the start range keys.
650
5.90k
  set<string> start_keys;
651
5.90k
  string start_key;
652
0
  for (const YBPartialRow& row : split_rows) {
653
0
    int column_count = 0;
654
0
    for (size_t column_idx = 0; column_idx < schema.num_columns(); column_idx++) {
655
0
      const ColumnSchema& column = schema.column(column_idx);
656
0
      if (row.IsColumnSet(column_idx)) {
657
0
        if (ContainsKey(range_column_idxs, column_idx)) {
658
0
          column_count++;
659
0
        } else {
660
0
          return STATUS(InvalidArgument, "Split rows may only contain values for "
661
0
                                         "range partitioned columns", column.name());
662
0
        }
663
0
      }
664
0
    }
665
666
    // Check for an empty split row.
667
0
    if (column_count == 0) {
668
0
    return STATUS(InvalidArgument, "Split rows must contain a value for at "
669
0
                                   "least one range partition column");
670
0
    }
671
672
0
    start_key.clear();
673
0
    RETURN_NOT_OK(EncodeColumns(row, range_schema_.column_ids, &start_key));
674
675
    // Check for a duplicate split row.
676
0
    if (!InsertIfNotPresent(&start_keys, start_key)) {
677
0
      return STATUS(InvalidArgument, "Duplicate split row", row.ToString());
678
0
    }
679
0
  }
680
681
  // Create a partition per range and hash bucket combination.
682
5.90k
  vector<Partition> new_partitions;
683
5.90k
  for (const Partition& base_partition : *partitions) {
684
5.90k
    start_key.clear();
685
686
0
    for (const string& end_key : start_keys) {
687
0
      Partition partition = base_partition;
688
0
      partition.partition_key_start_.append(start_key);
689
0
      partition.partition_key_end_.append(end_key);
690
0
      new_partitions.push_back(partition);
691
0
      start_key = end_key;
692
0
    }
693
694
    // Add the final range.
695
5.90k
    Partition partition = base_partition;
696
5.90k
    partition.partition_key_start_.append(start_key);
697
5.90k
    new_partitions.push_back(partition);
698
5.90k
  }
699
5.90k
  partitions->swap(new_partitions);
700
701
  // Note: the following discussion and logic only takes effect when the table's
702
  // partition schema includes at least one hash bucket component.
703
  //
704
  // At this point, we have the full set of partitions built up, but each
705
  // partition only covers a finite slice of the partition key-space. Some
706
  // operations involving partitions are easier (pruning, client meta cache) if
707
  // it can be assumed that the partition keyspace does not have holes.
708
  //
709
  // In order to 'fill in' the partition key space, the absolute first and last
710
  // partitions are extended to cover the rest of the lower and upper partition
711
  // range by clearing the start and end partition key, respectively.
712
  //
713
  // When the table has two or more hash components, there will be gaps in
714
  // between partitions at the boundaries of the component ranges. Similar to
715
  // the absolute start and end case, these holes are filled by clearing the
716
  // partition key beginning at the hash component. For a concrete example,
717
  // see PartitionTest::TestCreatePartitions.
718
5.90k
  for (Partition& partition : *partitions) {
719
5.90k
    if (partition.range_key_start().empty()) {
720
5.90k
      for (size_t i = partition.hash_buckets().size(); i > 0;) {
721
0
        --i;
722
0
        if (partition.hash_buckets()[i] != 0) {
723
0
          break;
724
0
        }
725
0
        partition.partition_key_start_.erase(kEncodedBucketSize * i);
726
0
      }
727
5.90k
    }
728
5.90k
    if (partition.range_key_end().empty()) {
729
5.90k
      for (size_t i = partition.hash_buckets().size(); i > 0;) {
730
0
        --i;
731
0
        partition.partition_key_end_.erase(kEncodedBucketSize * i);
732
0
        int32_t hash_bucket = partition.hash_buckets()[i] + 1;
733
0
        if (hash_bucket != hash_bucket_schemas_[i].num_buckets) {
734
0
          hash_encoder.Encode(&hash_bucket, &partition.partition_key_end_);
735
0
          break;
736
0
        }
737
0
      }
738
5.90k
    }
739
5.90k
  }
740
741
5.90k
  return Status::OK();
742
5.90k
}
743
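// Illustrative sketch (not part of the original file): with two hash components of two
// buckets each, the hole-filling loop above makes adjacent partitions meet at component
// boundaries. For the partition with hash buckets (0, 1), the end key is truncated and
// then extended with bucket 1 of the *first* component, so
//
//   partition (0, 1): end key   == encoding of bucket 1   (one 4-byte component)
//   partition (1, 0): start key == encoding of bucket 1   (after truncation)
//
// i.e. the keyspace hole between (0, 1) and (1, 0) is closed.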
744
template<typename Row>
745
Status PartitionSchema::PartitionContainsRowImpl(const Partition& partition,
746
                                                 const Row& row,
747
0
                                                 bool* contains) const {
748
0
  CHECK_EQ(partition.hash_buckets().size(), hash_bucket_schemas_.size());
749
0
  for (size_t i = 0; i < hash_bucket_schemas_.size(); i++) {
750
0
    const HashBucketSchema& hash_bucket_schema = hash_bucket_schemas_[i];
751
0
    int32_t bucket;
752
0
    RETURN_NOT_OK(BucketForRow(row, hash_bucket_schema, &bucket));
753
754
0
    if (bucket != partition.hash_buckets()[i]) {
755
0
      *contains = false;
756
0
      return Status::OK();
757
0
    }
758
0
  }
759
760
0
  string partition_key;
761
0
  if (hash_schema_) {
762
0
    switch (*hash_schema_) {
763
0
      case YBHashSchema::kPgsqlHash:
764
        // TODO(neil) Discussion is needed. PGSQL hash should be done appropriately.
765
        // For now, let's not do anything. Just borrow the multi-column hashing code.
766
0
        FALLTHROUGH_INTENDED;
767
0
      case YBHashSchema::kMultiColumnHash:
768
0
        RETURN_NOT_OK(EncodeColumns(row, &partition_key));
769
0
        break;
770
0
      case YBHashSchema::kRedisHash:
771
0
        RETURN_NOT_OK(EncodeRedisKey(row, &partition_key));
772
0
        break;
773
0
    }
774
0
  }
775
776
  // If all of the hash buckets match, then the row is contained in the
777
  // partition if the row is gte the lower bound; and if there is no upper
778
  // bound, or the row is lt the upper bound.
779
0
  *contains = (Slice(partition_key).compare(partition.range_key_start()) >= 0)
780
0
           && (partition.range_key_end().empty()
781
0
                || Slice(partition_key).compare(partition.range_key_end()) < 0);
782
783
0
  return Status::OK();
784
0
}
Unexecuted instantiation: _ZNK2yb15PartitionSchema24PartitionContainsRowImplINS_12YBPartialRowEEENS_6StatusERKNS_9PartitionERKT_Pb
Unexecuted instantiation: _ZNK2yb15PartitionSchema24PartitionContainsRowImplINS_18ConstContiguousRowEEENS_6StatusERKNS_9PartitionERKT_Pb
785
786
Status PartitionSchema::PartitionContainsRow(const Partition& partition,
787
                                             const YBPartialRow& row,
788
0
                                             bool* contains) const {
789
0
  return PartitionContainsRowImpl(partition, row, contains);
790
0
}
791
792
Status PartitionSchema::PartitionContainsRow(const Partition& partition,
793
                                             const ConstContiguousRow& row,
794
0
                                             bool* contains) const {
795
0
  return PartitionContainsRowImpl(partition, row, contains);
796
0
}
797
798
Status PartitionSchema::DecodeRangeKey(Slice* encoded_key,
799
                                       YBPartialRow* row,
800
4
                                       Arena* arena) const {
801
4
  ContiguousRow cont_row(row->schema(), row->row_data_);
802
8
  for (size_t i = 0; i < range_schema_.column_ids.size(); i++) {
803
804
4
    if (encoded_key->empty()) {
805
      // This can happen when decoding partition start and end keys, since they
806
      // are truncated to simulate absolute upper and lower bounds.
807
4
      continue;
808
4
    }
809
810
0
    int32_t column_idx = row->schema()->find_column_by_id(range_schema_.column_ids[i]);
811
0
    const ColumnSchema& column = row->schema()->column(column_idx);
812
0
    const KeyEncoder<faststring>& key_encoder = GetKeyEncoder<faststring>(column.type_info());
813
0
    bool is_last = i == (range_schema_.column_ids.size() - 1);
814
815
    // Decode the column.
816
0
    RETURN_NOT_OK_PREPEND(key_encoder.Decode(encoded_key,
817
0
                                             is_last,
818
0
                                             arena,
819
0
                                             cont_row.mutable_cell_ptr(column_idx)),
820
0
                          Substitute("Error decoding partition key range component '$0'",
821
0
                                     column.name()));
822
    // Mark the column as set.
823
0
    BitmapSet(row->isset_bitmap_, column_idx);
824
0
  }
825
4
  if (!encoded_key->empty()) {
826
0
    return STATUS(InvalidArgument, "unable to fully decode partition key range components");
827
0
  }
828
4
  return Status::OK();
829
4
}
830
831
// Decodes a slice of a partition key into the buckets. The slice is modified to
832
// remove the hash components.
833
Status PartitionSchema::DecodeHashBuckets(Slice* encoded_key,
834
0
                                          vector<int32_t>* buckets) const {
835
0
  size_t hash_components_size = kEncodedBucketSize * hash_bucket_schemas_.size();
836
0
  if (encoded_key->size() < hash_components_size) {
837
0
    return STATUS(InvalidArgument,
838
0
        Substitute("expected encoded hash key to be at least $0 bytes (only found $1)",
839
0
                   hash_components_size, encoded_key->size()));
840
0
  }
841
0
  for (const auto& schema : hash_bucket_schemas_) {
842
0
    (void) schema; // quiet unused variable warning
843
0
    uint32_t big_endian;
844
0
    memcpy(&big_endian, encoded_key->data(), sizeof(uint32_t));
845
0
    buckets->push_back(BigEndian::ToHost32(big_endian));
846
0
    encoded_key->remove_prefix(sizeof(uint32_t));
847
0
  }
848
849
0
  return Status::OK();
850
0
}
851
852
string PartitionSchema::RangePartitionDebugString(const Partition& partition,
853
259
                                                  const Schema& schema) const {
854
259
  CHECK(!schema.num_hash_key_columns());
855
259
  std::string s;
856
259
  s.append("range: [");
857
259
  if (partition.partition_key_start().empty()) {
858
206
    s.append("<start>");
859
53
  } else {
860
53
    s.append(docdb::DocKey::DebugSliceToString(partition.partition_key_start()));
861
53
  }
862
259
  s.append(", ");
863
259
  if (partition.partition_key_end().empty()) {
864
206
    s.append("<end>");
865
53
  } else {
866
53
    s.append(docdb::DocKey::DebugSliceToString(partition.partition_key_end()));
867
53
  }
868
259
  s.append(")");
869
259
  return s;
870
259
}
871
872
string PartitionSchema::PartitionDebugString(const Partition& partition,
873
79.0k
                                             const Schema& schema) const {
874
875
79.0k
  if (schema.num_hash_key_columns() == 0) {
876
231
    return RangePartitionDebugString(partition, schema);
877
231
  }
878
879
78.8k
  string s;
880
78.8k
  if (hash_schema_) {
881
78.4k
    switch (*hash_schema_) {
882
3.18k
      case YBHashSchema::kRedisHash: FALLTHROUGH_INTENDED;
883
13.3k
      case YBHashSchema::kPgsqlHash: FALLTHROUGH_INTENDED;
884
78.1k
      case YBHashSchema::kMultiColumnHash: {
885
78.1k
        const string& pstart = partition.partition_key_start();
886
78.1k
        const string& pend = partition.partition_key_end();
887
68.6k
        uint16_t hash_start = !pstart.empty() ? DecodeMultiColumnHashValue(pstart) : 0;
888
68.8k
        uint16_t hash_end = !pend.empty() ? DecodeMultiColumnHashValue(pend) : UINT16_MAX;
889
78.1k
        s.append(Substitute("hash_split: [0x$0, 0x$1)",
890
78.1k
                            Uint16ToHexString(hash_start), Uint16ToHexString(hash_end)));
891
78.1k
        return s;
892
408
      }
893
78.4k
    }
894
78.4k
  }
895
896
408
  if (!partition.hash_buckets().empty()) {
897
0
    vector<string> components;
898
0
    for (int32_t bucket : partition.hash_buckets()) {
899
0
      components.push_back(Substitute("$0", bucket));
900
0
    }
901
0
    s.append("hash buckets: (");
902
0
    s.append(JoinStrings(components, ", "));
903
0
    if (!range_schema_.column_ids.empty()) {
904
0
      s.append("), ");
905
0
    } else {
906
0
      s.append(")");
907
0
    }
908
0
  }
909
910
408
  if (!range_schema_.column_ids.empty()) {
911
0
    Arena arena(1024, 128 * 1024);
912
0
    YBPartialRow start_row(&schema);
913
0
    YBPartialRow end_row(&schema);
914
915
0
    s.append("range: [");
916
917
0
    vector<string> start_components;
918
0
    Slice encoded_range_key_start = partition.range_key_start();
919
0
    Status status;
920
0
    status = DecodeRangeKey(&encoded_range_key_start, &start_row, &arena);
921
0
    if (status.ok()) {
922
0
      AppendRangeDebugStringComponentsOrString(start_row, "<start>", &start_components);
923
0
      s.append(JoinStrings(start_components, ", "));
924
0
    } else {
925
0
      s.append(Substitute("<decode-error: $0>", status.ToString()));
926
0
    }
927
0
    s.append(", ");
928
929
0
    vector<string> end_components;
930
0
    Slice encoded_range_key_end = partition.range_key_end();
931
0
    status = DecodeRangeKey(&encoded_range_key_end, &end_row, &arena);
932
0
    if (status.ok()) {
933
0
      AppendRangeDebugStringComponentsOrString(end_row, "<end>", &end_components);
934
0
      s.append(JoinStrings(end_components, ", "));
935
0
    } else {
936
0
      s.append(Substitute("<decode-error: $0>", status.ToString()));
937
0
    }
938
0
    s.append(")");
939
0
  }
940
941
408
  return s;
942
408
}
943
944
void PartitionSchema::AppendRangeDebugStringComponentsOrString(const YBPartialRow& row,
945
                                                               const GStringPiece default_string,
946
4
                                                               vector<string>* components) const {
947
4
  ConstContiguousRow const_row(row.schema(), row.row_data_);
948
949
4
  for (ColumnId column_id : range_schema_.column_ids) {
950
4
    string column;
951
4
    int32_t column_idx = row.schema()->find_column_by_id(column_id);
952
4
    if (column_idx == Schema::kColumnNotFound) {
953
0
      components->push_back("<unknown-column>");
954
0
      continue;
955
0
    }
956
4
    const ColumnSchema& column_schema = row.schema()->column(column_idx);
957
958
4
    if (!row.IsColumnSet(column_idx)) {
959
4
      components->push_back(default_string.as_string());
960
4
      break;
961
0
    } else {
962
0
      column_schema.DebugCellAppend(const_row.cell(column_idx), &column);
963
0
    }
964
965
0
    components->push_back(column);
966
0
  }
967
4
}
968
969
void PartitionSchema::AppendRangeDebugStringComponentsOrMin(const YBPartialRow& row,
970
0
                                                            vector<string>* components) const {
971
0
  ConstContiguousRow const_row(row.schema(), row.row_data_);
972
973
0
  for (ColumnId column_id : range_schema_.column_ids) {
974
0
    string column;
975
0
    int32_t column_idx = row.schema()->find_column_by_id(column_id);
976
0
    if (column_idx == Schema::kColumnNotFound) {
977
0
      components->push_back("<unknown-column>");
978
0
      continue;
979
0
    }
980
0
    const ColumnSchema& column_schema = row.schema()->column(column_idx);
981
982
0
    if (!row.IsColumnSet(column_idx)) {
983
0
      uint8_t min_value[kLargestTypeSize];
984
0
      column_schema.type_info()->CopyMinValue(&min_value);
985
0
      SimpleConstCell cell(&column_schema, &min_value);
986
0
      column_schema.DebugCellAppend(cell, &column);
987
0
    } else {
988
0
      column_schema.DebugCellAppend(const_row.cell(column_idx), &column);
989
0
    }
990
991
0
    components->push_back(column);
992
0
  }
993
0
}
994
995
0
string PartitionSchema::RowDebugString(const ConstContiguousRow& row) const {
996
0
  vector<string> components;
997
998
0
  for (const HashBucketSchema& hash_bucket_schema : hash_bucket_schemas_) {
999
0
    int32_t bucket;
1000
0
    Status s = BucketForRow(row, hash_bucket_schema, &bucket);
1001
0
    if (s.ok()) {
1002
0
      components.push_back(Substitute("bucket=$0", bucket));
1003
0
    } else {
1004
0
      components.push_back(Substitute("<bucket-error: $0>", s.ToString()));
1005
0
    }
1006
0
  }
1007
1008
0
  for (ColumnId column_id : range_schema_.column_ids) {
1009
0
    string column;
1010
0
    int32_t column_idx = row.schema()->find_column_by_id(column_id);
1011
0
    if (column_idx == Schema::kColumnNotFound) {
1012
0
      components.push_back("<unknown-column>");
1013
0
      break;
1014
0
    }
1015
0
    row.schema()->column(column_idx).DebugCellAppend(row.cell(column_idx), &column);
1016
0
    components.push_back(column);
1017
0
  }
1018
1019
0
  return JoinStrings(components, ", ");
1020
0
}
1021
1022
0
string PartitionSchema::RowDebugString(const YBPartialRow& row) const {
1023
0
  vector<string> components;
1024
1025
0
  for (const HashBucketSchema& hash_bucket_schema : hash_bucket_schemas_) {
1026
0
    int32_t bucket;
1027
0
    Status s = BucketForRow(row, hash_bucket_schema, &bucket);
1028
0
    if (s.ok()) {
1029
0
      components.push_back(Substitute("bucket=$0", bucket));
1030
0
    } else {
1031
0
      components.push_back(Substitute("<bucket-error: $0>", s.ToString()));
1032
0
    }
1033
0
  }
1034
1035
0
  AppendRangeDebugStringComponentsOrMin(row, &components);
1036
1037
0
  return JoinStrings(components, ", ");
1038
0
}
1039
1040
252
string PartitionSchema::PartitionKeyDebugString(const string& key, const Schema& schema) const {
1041
252
  Slice encoded_key = key;
1042
1043
252
  vector<string> components;
1044
1045
252
  if (hash_schema_) {
1046
252
    switch (*hash_schema_) {
1047
0
      case YBHashSchema::kRedisHash: FALLTHROUGH_INTENDED;
1048
252
      case YBHashSchema::kMultiColumnHash:
1049
252
        if (key.empty()) {
1050
230
          return "hash_code: NaN";
1051
22
        } else {
1052
22
          return Substitute("hash_code: $0", DecodeMultiColumnHashValue(key));
1053
22
        }
1054
0
      case YBHashSchema::kPgsqlHash:
1055
0
        return "Pgsql Hash";
1056
0
    }
1057
0
  }
1058
1059
0
  if (!hash_bucket_schemas_.empty()) {
1060
0
    vector<int32_t> buckets;
1061
0
    Status s = DecodeHashBuckets(&encoded_key, &buckets);
1062
0
    if (!s.ok()) {
1063
0
      return Substitute("<hash-decode-error: $0>", s.ToString());
1064
0
    }
1065
0
    for (int32_t bucket : buckets) {
1066
0
      components.push_back(Substitute("bucket=$0", bucket));
1067
0
    }
1068
0
  }
1069
1070
0
  if (!range_schema_.column_ids.empty()) {
1071
0
    Arena arena(1024, 128 * 1024);
1072
0
    YBPartialRow row(&schema);
1073
1074
0
    Status s = DecodeRangeKey(&encoded_key, &row, &arena);
1075
0
    if (!s.ok()) {
1076
0
      return Substitute("<range-decode-error: $0>", s.ToString());
1077
0
    }
1078
1079
0
    AppendRangeDebugStringComponentsOrMin(row, &components);
1080
0
  }
1081
1082
0
  return JoinStrings(components, ", ");
1083
0
}
1084
1085
namespace {
1086
// Converts a list of column IDs to a string with the column names separated by
1087
// a comma character.
1088
string ColumnIdsToColumnNames(const Schema& schema,
1089
0
                              const vector<ColumnId>& column_ids) {
1090
0
  vector<string> names;
1091
0
  for (ColumnId column_id : column_ids) {
1092
0
    names.push_back(schema.column(schema.find_column_by_id(column_id)).name());
1093
0
  }
1094
1095
0
  return JoinStrings(names, ", ");
1096
0
}
1097
} // namespace
1098
1099
0
string PartitionSchema::DebugString(const Schema& schema) const {
1100
0
  vector<string> component_types;
1101
1102
0
  if (hash_schema_) {
1103
0
    switch (*hash_schema_) {
1104
0
      case YBHashSchema::kRedisHash:
1105
0
        return "Redis Hash Partition";
1106
0
      case YBHashSchema::kMultiColumnHash: {
1107
0
        string component = "Multi Column Hash Partition. Partition columns: ";
1108
0
        const std::vector<ColumnSchema>& cols = schema.columns();
1109
0
        for (size_t idx = 0; idx < schema.num_hash_key_columns(); idx++) {
1110
0
          component.append(Substitute("$0($1)  ", cols[idx].name(), cols[idx].type_info()->name()));
1111
0
        }
1112
0
        component_types.push_back(component);
1113
0
        break;
1114
0
      }
1115
0
      case YBHashSchema::kPgsqlHash:
1116
0
        return "Pgsql Hash Partition";
1117
0
    }
1118
0
  }
1119
1120
0
  if (!hash_bucket_schemas_.empty()) {
1121
0
    vector<string> hash_components;
1122
0
    for (const HashBucketSchema& hash_bucket_schema : hash_bucket_schemas_) {
1123
0
      string component;
1124
0
      component.append(Substitute("(bucket count: $0", hash_bucket_schema.num_buckets));
1125
0
      if (hash_bucket_schema.seed != 0) {
1126
0
        component.append(Substitute(", seed: $0", hash_bucket_schema.seed));
1127
0
      }
1128
0
      component.append(Substitute(", columns: [$0])",
1129
0
                                  ColumnIdsToColumnNames(schema, hash_bucket_schema.column_ids)));
1130
0
      hash_components.push_back(component);
1131
0
    }
1132
0
    component_types.push_back(Substitute("hash bucket components: [$0]",
1133
0
                                         JoinStrings(hash_components, ", ")));
1134
0
  }
1135
1136
0
  if (!range_schema_.column_ids.empty()) {
1137
0
    component_types.push_back(Substitute("range columns: [$0]",
1138
0
                                         ColumnIdsToColumnNames(schema, range_schema_.column_ids)));
1139
0
  }
1140
0
  return JoinStrings(component_types, ", ");
1141
0
}
1142
1143
0
bool PartitionSchema::Equals(const PartitionSchema& other) const {
1144
0
  if (this == &other) return true;
1145
1146
  // Check whether both partition schemas are using a hash-based scheme.
1147
0
  if ((hash_schema_ != other.hash_schema_) ||
1148
0
      (hash_schema_ && other.hash_schema_ && *hash_schema_ != *other.hash_schema_)) {
1149
0
    return false;
1150
0
  }
1151
1152
  // Compare range component.
1153
0
  if (range_schema_.column_ids != other.range_schema_.column_ids) return false;
1154
1155
  // Compare hash bucket components.
1156
0
  if (hash_bucket_schemas_.size() != other.hash_bucket_schemas_.size()) return false;
1157
0
  for (size_t i = 0; i < hash_bucket_schemas_.size(); i++) {
1158
0
    if (hash_bucket_schemas_[i].seed != other.hash_bucket_schemas_[i].seed) return false;
1159
0
    if (hash_bucket_schemas_[i].num_buckets
1160
0
        != other.hash_bucket_schemas_[i].num_buckets) return false;
1161
0
    if (hash_bucket_schemas_[i].column_ids
1162
0
        != other.hash_bucket_schemas_[i].column_ids) return false;
1163
0
  }
1164
1165
0
  return true;
1166
0
}
1167
1168
// Encodes the specified primary key columns of the supplied row into the buffer.
1169
Status PartitionSchema::EncodeColumns(const ConstContiguousRow& row,
1170
                                      const vector<ColumnId>& column_ids,
1171
0
                                      string* buf) {
1172
0
  for (size_t i = 0; i < column_ids.size(); i++) {
1173
0
    ColumnId column_id = column_ids[i];
1174
0
    int32_t column_idx = row.schema()->find_column_by_id(column_id);
1175
0
    const TypeInfo* type = row.schema()->column(column_idx).type_info();
1176
0
    GetKeyEncoder<string>(type).Encode(row.cell_ptr(column_idx), i + 1 == column_ids.size(), buf);
1177
0
  }
1178
0
  return Status::OK();
1179
0
}
1180
1181
// Encodes the specified primary key columns of the supplied row into the buffer.
1182
Status PartitionSchema::EncodeColumns(const YBPartialRow& row,
1183
                                      const vector<ColumnId>& column_ids,
1184
0
                                      string* buf) {
1185
0
  for (size_t i = 0; i < column_ids.size(); i++) {
1186
0
    int32_t column_idx = row.schema()->find_column_by_id(column_ids[i]);
1187
0
    CHECK(column_idx != Schema::kColumnNotFound);
1188
0
    const TypeInfo* type_info = row.schema()->column(column_idx).type_info();
1189
0
    const KeyEncoder<string>& encoder = GetKeyEncoder<string>(type_info);
1190
1191
0
    if (PREDICT_FALSE(!row.IsColumnSet(column_idx))) {
1192
0
      uint8_t min_value[kLargestTypeSize];
1193
0
      type_info->CopyMinValue(min_value);
1194
0
      encoder.Encode(min_value, i + 1 == column_ids.size(), buf);
1195
0
    } else {
1196
0
      ContiguousRow cont_row(row.schema(), row.row_data_);
1197
0
      encoder.Encode(cont_row.cell_ptr(column_idx), i + 1 == column_ids.size(), buf);
1198
0
    }
1199
0
  }
1200
0
  return Status::OK();
1201
0
}
1202
1203
0
uint16_t PartitionSchema::HashColumnCompoundValue(const string& compound) {
1204
  // In the future, if you wish to change the hashing behavior, you must introduce a new hashing
1205
  // method for your newly-created tables.  Existing tables must continue to use their hashing
1206
  // methods that were defined by their PartitionSchema.
1207
1208
  // At the moment, Jenkins' hash is the only method we are using. In the future, we'll keep this
1209
  // as the default hashing behavior. The constant 'kseed' cannot be changed, as it'd yield a
  // different
1210
  // hashing result.
1211
0
  static const int kseed = 97;
1212
0
  const uint64_t hash_value = Hash64StringWithSeed(compound, kseed);
1213
1214
  // Convert the 64-bit hash value to 16 bit integer.
1215
0
  const uint64_t h1 = hash_value >> 48;
1216
0
  const uint64_t h2 = 3 * (hash_value >> 32);
1217
0
  const uint64_t h3 = 5 * (hash_value >> 16);
1218
0
  const uint64_t h4 = 7 * (hash_value & 0xffff);
1219
1220
0
  return (h1 ^ h2 ^ h3 ^ h4) & 0xffff;
1221
0
}
1222
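// Illustrative worked example (not part of the original file) for
// hash_value = 0x1111222233334444:
//
//   h1 = hash_value >> 48              ->  0x1111
//   h2 = 3 * (hash_value >> 32)        ->  low 16 bits 0x6666
//   h3 = 5 * (hash_value >> 16)        ->  low 16 bits 0xFFFF
//   h4 = 7 * (hash_value & 0xffff)     ->  low 16 bits 0xDDDC
//   (h1 ^ h2 ^ h3 ^ h4) & 0xffff       ->  0x5554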
1223
// Encodes the hash columns of the supplied row into a 2-byte partition key.
1224
0
Status PartitionSchema::EncodeColumns(const ConstContiguousRow& row, string* buf) {
1225
0
  string tmp;
1226
0
  auto num_cols = row.schema()->num_hash_key_columns();
1227
0
  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
1228
0
    const TypeInfo* type = row.schema()->column(col_idx).type_info();
1229
0
    GetKeyEncoder<string>(type).Encode(row.cell_ptr(col_idx), col_idx + 1 == num_cols, &tmp);
1230
0
  }
1231
1232
0
  uint16_t hash_value = HashColumnCompoundValue(tmp);
1233
0
  *buf = EncodeMultiColumnHashValue(hash_value);
1234
0
  return Status::OK();
1235
0
}
1236
1237
// Encodes the hash columns of the supplied row into a 2-byte partition key.
1238
0
Status PartitionSchema::EncodeColumns(const YBPartialRow& row, string* buf) {
1239
0
  string tmp;
1240
0
  auto num_cols = row.schema()->num_hash_key_columns();
1241
0
  for (size_t col_idx = 0; col_idx < num_cols; col_idx++) {
1242
0
    const TypeInfo* type_info = row.schema()->column(col_idx).type_info();
1243
0
    const KeyEncoder<string>& encoder = GetKeyEncoder<string>(type_info);
1244
1245
0
    if (PREDICT_FALSE(!row.IsColumnSet(col_idx))) {
1246
0
      LOG(FATAL) << "Hash column must be specified: " << col_idx;
1247
0
    } else {
1248
0
      ContiguousRow cont_row(row.schema(), row.row_data_);
1249
0
      encoder.Encode(cont_row.cell_ptr(col_idx), col_idx + 1 == num_cols, &tmp);
1250
0
    }
1251
0
  }
1252
1253
0
  uint16_t hash_value = HashColumnCompoundValue(tmp);
1254
0
  *buf = EncodeMultiColumnHashValue(hash_value);
1255
0
  return Status::OK();
1256
0
}
1257
1258
template<typename Row>
1259
Status PartitionSchema::BucketForRow(const Row& row,
1260
                                     const HashBucketSchema& hash_bucket_schema,
1261
0
                                     int32_t* bucket) {
1262
0
  string buf;
1263
0
  RETURN_NOT_OK(EncodeColumns(row, hash_bucket_schema.column_ids, &buf));
1264
0
  uint16_t hash_value = HashColumnCompoundValue(buf);
1265
0
  *bucket = hash_value % static_cast<uint64_t>(hash_bucket_schema.num_buckets);
1266
0
  return Status::OK();
1267
0
}
Unexecuted instantiation: _ZN2yb15PartitionSchema12BucketForRowINS_12YBPartialRowEEENS_6StatusERKT_RKNS0_16HashBucketSchemaEPi
Unexecuted instantiation: _ZN2yb15PartitionSchema12BucketForRowINS_18ConstContiguousRowEEENS_6StatusERKT_RKNS0_16HashBucketSchemaEPi
1268
1269
//------------------------------------------------------------
1270
// Template instantiations: We instantiate all possible templates to avoid linker issues.
1271
// see: https://isocpp.org/wiki/faq/templates#separate-template-fn-defn-from-decl
1272
//------------------------------------------------------------
1273
1274
template
1275
Status PartitionSchema::BucketForRow(const YBPartialRow& row,
1276
                                     const HashBucketSchema& hash_bucket_schema,
1277
                                     int32_t* bucket);
1278
1279
template
1280
Status PartitionSchema::BucketForRow(const ConstContiguousRow& row,
1281
                                     const HashBucketSchema& hash_bucket_schema,
1282
                                     int32_t* bucket);
1283
1284
762k
void PartitionSchema::Clear() {
1285
762k
  hash_bucket_schemas_.clear();
1286
762k
  range_schema_.column_ids.clear();
1287
762k
  hash_schema_ = boost::none;
1288
762k
}
1289
1290
0
Status PartitionSchema::Validate(const Schema& schema) const {
1291
0
  set<ColumnId> hash_columns;
1292
0
  for (const PartitionSchema::HashBucketSchema& hash_schema : hash_bucket_schemas_) {
1293
0
    if (hash_schema.num_buckets < 2) {
1294
0
      return STATUS(InvalidArgument, "must have at least two hash buckets");
1295
0
    }
1296
1297
0
    if (hash_schema.column_ids.size() < 1) {
1298
0
      return STATUS(InvalidArgument, "must have at least one hash column");
1299
0
    }
1300
1301
0
    for (ColumnId hash_column : hash_schema.column_ids) {
1302
0
      if (!hash_columns.insert(hash_column).second) {
1303
0
        return STATUS(InvalidArgument, "hash bucket schema components must not "
1304
0
                                       "contain columns in common");
1305
0
      }
1306
0
      int32_t column_idx = schema.find_column_by_id(hash_column);
1307
0
      if (column_idx == Schema::kColumnNotFound) {
1308
0
        return STATUS(InvalidArgument, "must specify existing columns for hash "
1309
0
                                       "bucket partition components");
1310
0
      } else if (implicit_cast<size_t>(column_idx) >= schema.num_key_columns()) {
1311
0
        return STATUS(InvalidArgument, "must specify only primary key columns for "
1312
0
                                       "hash bucket partition components");
1313
0
      }
1314
0
    }
1315
0
  }
1316
1317
0
  for (ColumnId column_id : range_schema_.column_ids) {
1318
0
    int32_t column_idx = schema.find_column_by_id(column_id);
1319
0
    if (column_idx == Schema::kColumnNotFound) {
1320
0
      return STATUS(InvalidArgument, "must specify existing columns for range "
1321
0
                                     "partition component");
1322
0
    } else if (implicit_cast<size_t>(column_idx) >= schema.num_key_columns()) {
1323
0
      return STATUS(InvalidArgument, "must specify only primary key columns for "
1324
0
                                     "range partition component");
1325
0
    }
1326
0
  }
1327
1328
0
  return Status::OK();
1329
0
}
1330
1331
151M
bool PartitionSchema::IsHashPartitioning() const {
1332
151M
  return hash_schema_ != boost::none;
1333
151M
}
1334
1335
0
YBHashSchema PartitionSchema::hash_schema() const {
1336
0
  CHECK(hash_schema_);
1337
0
  return *hash_schema_;
1338
0
}
1339
1340
} // namespace yb