/Users/deen/code/yugabyte-db/src/yb/yql/pggate/util/pg_doc_data.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | //-------------------------------------------------------------------------------------------------- |
15 | | |
16 | | #include "yb/yql/pggate/util/pg_doc_data.h" |
17 | | |
18 | | #include "yb/client/client.h" |
19 | | |
20 | | #include "yb/common/ql_value.h" |
21 | | |
22 | | #include "yb/util/format.h" |
23 | | #include "yb/util/status_format.h" |
24 | | |
25 | | namespace yb { |
26 | | namespace pggate { |
27 | | |
28 | 656M | Status WriteColumn(const QLValuePB& col_value, faststring *buffer) { |
29 | | // Serialize col_value into buffer: first a one-byte header (null flag set when the value is null), then, for non-null values only, the payload. |
30 | 656M | bool has_data = true; |
31 | 656M | PgWireDataHeader col_header; |
32 | 656M | if (QLValue::IsNull(col_value)) { |
33 | 63.7M | col_header.set_null(); |
34 | 63.7M | has_data = false; |
35 | 63.7M | } |
36 | 656M | PgWire::WriteUint8(col_header.ToUint8(), buffer); |
37 | | |
38 | 656M | if (!has_data) { |
39 | 63.7M | return Status::OK(); |
40 | 63.7M | } |
41 | | |
42 | 592M | switch (col_value.value_case()) { |
43 | 0 | case InternalType::VALUE_NOT_SET: |
44 | 0 | break; |
45 | 111M | case InternalType::kBoolValue: |
46 | 111M | PgWire::WriteBool(col_value.bool_value(), buffer); |
47 | 111M | break; |
48 | 62.6M | case InternalType::kInt8Value: |
49 | 62.6M | PgWire::WriteInt8(col_value.int8_value(), buffer); |
50 | 62.6M | break; |
51 | 37.6M | case InternalType::kInt16Value: |
52 | 37.6M | PgWire::WriteInt16(col_value.int16_value(), buffer); |
53 | 37.6M | break; |
54 | 140M | case InternalType::kInt32Value: |
55 | 140M | PgWire::WriteInt32(col_value.int32_value(), buffer); |
56 | 140M | break; |
57 | 891k | case InternalType::kInt64Value: |
58 | 891k | PgWire::WriteInt64(col_value.int64_value(), buffer); |
59 | 891k | break; |
60 | 148M | case InternalType::kUint32Value: |
61 | 148M | PgWire::WriteUint32(col_value.uint32_value(), buffer); |
62 | 148M | break; |
63 | 24 | case InternalType::kUint64Value: |
64 | 24 | PgWire::WriteUint64(col_value.uint64_value(), buffer); |
65 | 24 | break; |
66 | 2.27M | case InternalType::kFloatValue: |
67 | 2.27M | PgWire::WriteFloat(col_value.float_value(), buffer); |
68 | 2.27M | break; |
69 | 303k | case InternalType::kDoubleValue: |
70 | 303k | PgWire::WriteDouble(col_value.double_value(), buffer); |
71 | 303k | break; |
72 | 38.9M | case InternalType::kStringValue: |
73 | 38.9M | PgWire::WriteText(col_value.string_value(), buffer); |
74 | 38.9M | break; |
75 | 51.3M | case InternalType::kBinaryValue: |
76 | 51.3M | PgWire::WriteBinary(col_value.binary_value(), buffer); |
77 | 51.3M | break; |
78 | 8.30k | case InternalType::kDecimalValue: |
79 | | // The decimal is passed through in its serialized YB Decimal form via WriteText; decoding will be done in pg_expr.cc |
80 | 8.30k | PgWire::WriteText(col_value.decimal_value(), buffer); |
81 | 8.30k | break; |
82 | 0 | case InternalType::kVirtualValue: |
83 | | // The database is expected to return an actual value, not a virtual one; this case falls through to the NotSupported return below. |
84 | 0 | case InternalType::kTimestampValue: |
85 | 0 | case InternalType::kDateValue: // Not used for PG storage |
86 | 0 | case InternalType::kTimeValue: // Not used for PG storage |
87 | 0 | case InternalType::kVarintValue: |
88 | 0 | case InternalType::kInetaddressValue: |
89 | 0 | case InternalType::kJsonbValue: |
90 | 0 | case InternalType::kUuidValue: |
91 | 0 | case InternalType::kTimeuuidValue: |
92 | | // PgGate does not support these datatypes yet, so report NotSupported together with the offending value_case(). |
93 | 0 | return STATUS_FORMAT(NotSupported, |
94 | 0 | "Unexpected data was read from database: col_value.type()=$0", col_value.value_case()); |
95 | | |
96 | 0 | case InternalType::kListValue: |
97 | 0 | case InternalType::kMapValue: |
98 | 0 | case InternalType::kSetValue: |
99 | 0 | case InternalType::kFrozenValue: |
100 | | // Postgres has no such datatypes, so reading one back is reported as Corruption. |
101 | 0 | return STATUS_FORMAT(Corruption, |
102 | 0 | "Unexpected data was read from database: col_value.type()=$0", col_value.value_case()); |
103 | 0 | case InternalType::kGinNullValue: |
104 | 0 | PgWire::WriteUint8(col_value.gin_null_value(), buffer); |
105 | 592M | } |
106 | | |
107 | 593M | return Status::OK(); |
108 | 592M | } |
109 | | |
110 | | //-------------------------------------------------------------------------------------------------- |
111 | | // Read Tuple Routine in DocDB Format (wire_protocol). |
112 | | //-------------------------------------------------------------------------------------------------- |
113 | | |
114 | 3.22M | void PgDocData::LoadCache(const rpc::SidecarPtr& cache, int64_t *total_row_count, Slice *cursor) { |
115 | | // Set up the buffer to read the next set of tuples: require that the previous cursor is fully consumed, then delegate to the Slice overload. |
116 | 3.22M | CHECK(cursor->empty()) << "Existing cache is not yet fully read"; |
117 | 3.22M | auto *sidecar = cache.get(); |
118 | 3.22M | LoadCache(Slice(sidecar[0], sidecar[1]), total_row_count, cursor); |
119 | 3.22M | } |
120 | | |
121 | 3.22M | void PgDocData::LoadCache(const Slice& cache, int64_t *total_row_count, Slice *cursor) { |
122 | 3.22M | *cursor = cache; |
123 | | |
124 | | // Read this set's row count from the front of the cursor, overwrite *total_row_count with it, then advance the cursor past the bytes ReadNumber consumed. |
125 | 3.22M | int64_t this_count; |
126 | 3.22M | size_t read_size = ReadNumber(cursor, &this_count); |
127 | 3.22M | *total_row_count = this_count; |
128 | 3.22M | cursor->remove_prefix(read_size); |
129 | 3.22M | } |
130 | | |
131 | 650M | PgWireDataHeader PgDocData::ReadDataHeader(Slice *cursor) { |
132 | | // Read the uint8_t header (it carries the NULL flag), advance the cursor by the size ReadNumber reports, and return it wrapped as a PgWireDataHeader. |
133 | 650M | uint8_t header_data; |
134 | 650M | size_t read_size = ReadNumber(cursor, &header_data); |
135 | 650M | cursor->remove_prefix(read_size); |
136 | | |
137 | 650M | return PgWireDataHeader(header_data); |
138 | 650M | } |
139 | | |
140 | | } // namespace pggate |
141 | | } // namespace yb |