/Users/deen/code/yugabyte-db/src/yb/yql/pggate/util/pg_doc_data.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | //-------------------------------------------------------------------------------------------------- |
15 | | |
16 | | #include "yb/yql/pggate/util/pg_doc_data.h" |
17 | | |
18 | | #include "yb/client/client.h" |
19 | | |
20 | | #include "yb/common/ql_value.h" |
21 | | |
22 | | #include "yb/util/format.h" |
23 | | #include "yb/util/status_format.h" |
24 | | |
25 | | namespace yb { |
26 | | namespace pggate { |
27 | | |
28 | 228M | Status WriteColumn(const QLValuePB& col_value, faststring *buffer) { |
29 | | // Write data header. |
30 | 228M | bool has_data = true; |
31 | 228M | PgWireDataHeader col_header; |
32 | 228M | if (QLValue::IsNull(col_value)) { |
33 | 20.8M | col_header.set_null(); |
34 | 20.8M | has_data = false; |
35 | 20.8M | } |
36 | 228M | PgWire::WriteUint8(col_header.ToUint8(), buffer); |
37 | | |
38 | 228M | if (!has_data) { |
39 | 20.8M | return Status::OK(); |
40 | 20.8M | } |
41 | | |
42 | 207M | switch (col_value.value_case()) { |
43 | 0 | case InternalType::VALUE_NOT_SET: |
44 | 0 | break; |
45 | 36.6M | case InternalType::kBoolValue: |
46 | 36.6M | PgWire::WriteBool(col_value.bool_value(), buffer); |
47 | 36.6M | break; |
48 | 21.1M | case InternalType::kInt8Value: |
49 | 21.1M | PgWire::WriteInt8(col_value.int8_value(), buffer); |
50 | 21.1M | break; |
51 | 12.2M | case InternalType::kInt16Value: |
52 | 12.2M | PgWire::WriteInt16(col_value.int16_value(), buffer); |
53 | 12.2M | break; |
54 | 53.6M | case InternalType::kInt32Value: |
55 | 53.6M | PgWire::WriteInt32(col_value.int32_value(), buffer); |
56 | 53.6M | break; |
57 | 532k | case InternalType::kInt64Value: |
58 | 532k | PgWire::WriteInt64(col_value.int64_value(), buffer); |
59 | 532k | break; |
60 | 49.8M | case InternalType::kUint32Value: |
61 | 49.8M | PgWire::WriteUint32(col_value.uint32_value(), buffer); |
62 | 49.8M | break; |
63 | 0 | case InternalType::kUint64Value: |
64 | 0 | PgWire::WriteUint64(col_value.uint64_value(), buffer); |
65 | 0 | break; |
66 | 694k | case InternalType::kFloatValue: |
67 | 694k | PgWire::WriteFloat(col_value.float_value(), buffer); |
68 | 694k | break; |
69 | 60.8k | case InternalType::kDoubleValue: |
70 | 60.8k | PgWire::WriteDouble(col_value.double_value(), buffer); |
71 | 60.8k | break; |
72 | 13.9M | case InternalType::kStringValue: |
73 | 13.9M | PgWire::WriteText(col_value.string_value(), buffer); |
74 | 13.9M | break; |
75 | 19.1M | case InternalType::kBinaryValue: |
76 | 19.1M | PgWire::WriteBinary(col_value.binary_value(), buffer); |
77 | 19.1M | break; |
78 | 8.15k | case InternalType::kDecimalValue: |
79 | | // Passing a serialized form of YB Decimal, decoding will be done in pg_expr.cc |
80 | 8.15k | PgWire::WriteText(col_value.decimal_value(), buffer); |
81 | 8.15k | break; |
82 | 0 | case InternalType::kVirtualValue: |
83 | | // Expecting database to return an actual value and not a virtual one. |
84 | 0 | case InternalType::kTimestampValue: |
85 | 0 | case InternalType::kDateValue: // Not used for PG storage |
86 | 0 | case InternalType::kTimeValue: // Not used for PG storage |
87 | 0 | case InternalType::kVarintValue: |
88 | 0 | case InternalType::kInetaddressValue: |
89 | 0 | case InternalType::kJsonbValue: |
90 | 0 | case InternalType::kUuidValue: |
91 | 0 | case InternalType::kTimeuuidValue: |
92 | | // PgGate has not supported these datatypes yet. |
93 | 0 | return STATUS_FORMAT(NotSupported, |
94 | 0 | "Unexpected data was read from database: col_value.type()=$0", col_value.value_case()); |
95 | |
|
96 | 0 | case InternalType::kListValue: |
97 | 0 | case InternalType::kMapValue: |
98 | 0 | case InternalType::kSetValue: |
99 | 0 | case InternalType::kFrozenValue: |
100 | | // Postgres does not have these datatypes. |
101 | 0 | return STATUS_FORMAT(Corruption, |
102 | 0 | "Unexpected data was read from database: col_value.type()=$0", col_value.value_case()); |
103 | 0 | case InternalType::kGinNullValue: |
104 | 0 | PgWire::WriteUint8(col_value.gin_null_value(), buffer); |
105 | 207M | } |
106 | | |
107 | 207M | return Status::OK(); |
108 | 207M | } |
109 | | |
110 | | //-------------------------------------------------------------------------------------------------- |
111 | | // Read Tuple Routine in DocDB Format (wire_protocol). |
112 | | //-------------------------------------------------------------------------------------------------- |
113 | | |
114 | 1.36M | void PgDocData::LoadCache(const rpc::SidecarPtr& cache, int64_t *total_row_count, Slice *cursor) { |
115 | | // Setup the buffer to read the next set of tuples. |
116 | 63 | CHECK(cursor->empty()) << "Existing cache is not yet fully read"; |
117 | 1.36M | auto *sidecar = cache.get(); |
118 | 1.36M | LoadCache(Slice(sidecar[0], sidecar[1]), total_row_count, cursor); |
119 | 1.36M | } |
120 | | |
121 | 1.36M | void PgDocData::LoadCache(const Slice& cache, int64_t *total_row_count, Slice *cursor) { |
122 | 1.36M | *cursor = cache; |
123 | | |
124 | | // Read the number row_count in this set. |
125 | 1.36M | int64_t this_count; |
126 | 1.36M | size_t read_size = ReadNumber(cursor, &this_count); |
127 | 1.36M | *total_row_count = this_count; |
128 | 1.36M | cursor->remove_prefix(read_size); |
129 | 1.36M | } |
130 | | |
131 | 228M | PgWireDataHeader PgDocData::ReadDataHeader(Slice *cursor) { |
132 | | // Read for NULL value. |
133 | 228M | uint8_t header_data; |
134 | 228M | size_t read_size = ReadNumber(cursor, &header_data); |
135 | 228M | cursor->remove_prefix(read_size); |
136 | | |
137 | 228M | return PgWireDataHeader(header_data); |
138 | 228M | } |
139 | | |
140 | | } // namespace pggate |
141 | | } // namespace yb |