/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_dml_write.cc
Line | Count | Source (jump to first uncovered line) |
1 | | //-------------------------------------------------------------------------------------------------- |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | //-------------------------------------------------------------------------------------------------- |
15 | | |
16 | | #include "yb/yql/pggate/pg_dml_write.h" |
17 | | |
18 | | #include "yb/client/yb_op.h" |
19 | | |
20 | | #include "yb/gutil/casts.h" |
21 | | |
22 | | namespace yb { |
23 | | namespace pggate { |
24 | | |
25 | | using std::make_shared; |
26 | | using std::shared_ptr; |
27 | | using std::string; |
28 | | using namespace std::literals; // NOLINT |
29 | | |
30 | | using client::YBSession; |
31 | | using client::YBMetaDataCache; |
32 | | using client::YBTable; |
33 | | using client::YBTableName; |
34 | | using client::YBPgsqlWriteOp; |
35 | | |
36 | | // TODO(neil) This should be derived from a GFLAGS. |
37 | | static MonoDelta kSessionTimeout = 60s; |
38 | | |
39 | | //-------------------------------------------------------------------------------------------------- |
40 | | // PgDmlWrite |
41 | | //-------------------------------------------------------------------------------------------------- |
42 | | |
43 | | PgDmlWrite::PgDmlWrite(PgSession::ScopedRefPtr pg_session, |
44 | | const PgObjectId& table_id, |
45 | | const bool is_single_row_txn) |
46 | 7.20M | : PgDml(std::move(pg_session), table_id), is_single_row_txn_(is_single_row_txn) { |
47 | 7.20M | } |
48 | | |
49 | 7.20M | PgDmlWrite::~PgDmlWrite() { |
50 | 7.20M | } |
51 | | |
52 | 7.20M | Status PgDmlWrite::Prepare() { |
53 | | // Setup descriptors for target and bind columns. |
54 | 7.20M | target_ = bind_ = PgTable(VERIFY_RESULT(pg_session_->LoadTable(table_id_))); |
55 | | |
56 | | // Allocate either INSERT, UPDATE, DELETE, or TRUNCATE_COLOCATED request. |
57 | 0 | AllocWriteRequest(); |
58 | 7.20M | PrepareColumns(); |
59 | 7.20M | return Status::OK(); |
60 | 7.20M | } |
61 | | |
62 | 7.20M | void PgDmlWrite::PrepareColumns() { |
63 | | // Because DocDB API requires that primary columns must be listed in their created-order, |
64 | | // the slots for primary column bind expressions are allocated here in correct order. |
65 | 37.9M | for (auto& col : target_.columns()) { |
66 | 37.9M | col.AllocPrimaryBindPB(write_req_.get()); |
67 | 37.9M | } |
68 | 7.20M | } |
69 | | |
70 | 7.20M | Status PgDmlWrite::DeleteEmptyPrimaryBinds() { |
71 | | // Iterate primary-key columns and remove the binds without values. |
72 | 7.20M | bool missing_primary_key = false; |
73 | | |
74 | | // Either ybctid or primary key must be present. |
75 | 7.20M | if (!ybctid_bind_) { |
76 | | // Remove empty binds from partition list. |
77 | 1.34M | auto partition_iter = write_req_->mutable_partition_column_values()->begin(); |
78 | 2.02M | while (partition_iter != write_req_->mutable_partition_column_values()->end()) { |
79 | 677k | if (expr_binds_.find(&*partition_iter) == expr_binds_.end()) { |
80 | 0 | missing_primary_key = true; |
81 | 0 | partition_iter = write_req_->mutable_partition_column_values()->erase(partition_iter); |
82 | 677k | } else { |
83 | 677k | partition_iter++; |
84 | 677k | } |
85 | 677k | } |
86 | | |
87 | | // Remove empty binds from range list. |
88 | 1.34M | auto range_iter = write_req_->mutable_range_column_values()->begin(); |
89 | 4.79M | while (range_iter != write_req_->mutable_range_column_values()->end()) { |
90 | 3.45M | if (expr_binds_.find(&*range_iter) == expr_binds_.end()) { |
91 | 113 | missing_primary_key = true; |
92 | 113 | range_iter = write_req_->mutable_range_column_values()->erase(range_iter); |
93 | 3.45M | } else { |
94 | 3.45M | range_iter++; |
95 | 3.45M | } |
96 | 3.45M | } |
97 | 5.86M | } else { |
98 | 5.86M | write_req_->clear_partition_column_values(); |
99 | 5.86M | write_req_->clear_range_column_values(); |
100 | 5.86M | } |
101 | | |
102 | | // Check for missing key. This is okay when binding the whole table (for colocated truncate). |
103 | 7.20M | if (missing_primary_key && !bind_table_90 ) { |
104 | 0 | return STATUS(InvalidArgument, "Primary key must be fully specified for modifying table"); |
105 | 0 | } |
106 | | |
107 | 7.20M | return Status::OK(); |
108 | 7.20M | } |
109 | | |
110 | 7.20M | Status PgDmlWrite::Exec(bool force_non_bufferable) { |
111 | | |
112 | | // Delete allocated binds that are not associated with a value. |
113 | | // YBClient interface enforce us to allocate binds for primary key columns in their indexing |
114 | | // order, so we have to allocate these binds before associating them with values. When the values |
115 | | // are not assigned, these allocated binds must be deleted. |
116 | 7.20M | RETURN_NOT_OK(DeleteEmptyPrimaryBinds()); |
117 | | |
118 | | // First update protobuf with new bind values. |
119 | 7.20M | RETURN_NOT_OK(UpdateBindPBs()); |
120 | 7.20M | RETURN_NOT_OK(UpdateAssignPBs()); |
121 | | |
122 | 7.20M | if (write_req_->has_ybctid_column_value()) { |
123 | 5.86M | PgsqlExpressionPB *exprpb = write_req_->mutable_ybctid_column_value(); |
124 | 18.4E | CHECK(exprpb->has_value() && exprpb->value().has_binary_value()) |
125 | 18.4E | << "YBCTID must be of BINARY datatype"; |
126 | 5.86M | } |
127 | | |
128 | | // Initialize doc operator. |
129 | 7.20M | RETURN_NOT_OK(doc_op_->ExecuteInit(nullptr)); |
130 | | |
131 | | // Set column references in protobuf. |
132 | 7.20M | ColRefsToPB(); |
133 | | // Compatibility: set column ids as expected by legacy nodes |
134 | 7.20M | ColumnRefsToPB(write_req_->mutable_column_refs()); |
135 | | |
136 | | // Execute the statement. If the request has been sent, get the result and handle any rows |
137 | | // returned. |
138 | 14.4M | if (VERIFY_RESULT7.20M (doc_op_->Execute(force_non_bufferable)) == RequestSent::kTrue) { |
139 | 56.9k | RETURN_NOT_OK(doc_op_->GetResult(&rowsets_)); |
140 | | |
141 | | // Save the number of rows affected by the op. |
142 | 53.4k | rows_affected_count_ = VERIFY_RESULT(doc_op_->GetRowsAffectedCount()); |
143 | 53.4k | } |
144 | | |
145 | 7.20M | return Status::OK(); |
146 | 7.20M | } |
147 | | |
148 | 371k | Status PgDmlWrite::SetWriteTime(const HybridTime& write_time) { |
149 | 371k | SCHECK(doc_op_.get() != nullptr, RuntimeError, "expected doc_op_ to be initialized"); |
150 | 371k | down_cast<PgDocWriteOp*>(doc_op_.get())->SetWriteTime(write_time); |
151 | 371k | return Status::OK(); |
152 | 371k | } |
153 | | |
154 | 7.20M | void PgDmlWrite::AllocWriteRequest() { |
155 | 7.20M | auto write_op = std::make_shared<PgsqlWriteOp>(!is_single_row_txn_); |
156 | | |
157 | 7.20M | write_req_ = std::shared_ptr<PgsqlWriteRequestPB>(write_op, &write_op->write_request()); |
158 | 7.20M | write_req_->set_stmt_type(stmt_type()); |
159 | 7.20M | write_req_->set_client(YQL_CLIENT_PGSQL); |
160 | 7.20M | write_req_->set_table_id(table_id_.GetYbTableId()); |
161 | 7.20M | write_req_->set_schema_version(target_->schema_version()); |
162 | 7.20M | write_req_->set_stmt_id(reinterpret_cast<uint64_t>(write_req_.get())); |
163 | | |
164 | 7.20M | doc_op_ = std::make_shared<PgDocWriteOp>(pg_session_, &target_, std::move(write_op)); |
165 | 7.20M | } |
166 | | |
167 | 20.5M | PgsqlExpressionPB *PgDmlWrite::AllocColumnBindPB(PgColumn *col) { |
168 | 20.5M | return col->AllocBindPB(write_req_.get()); |
169 | 20.5M | } |
170 | | |
171 | 983k | PgsqlExpressionPB *PgDmlWrite::AllocColumnAssignPB(PgColumn *col) { |
172 | 983k | return col->AllocAssignPB(write_req_.get()); |
173 | 983k | } |
174 | | |
175 | 23 | PgsqlExpressionPB *PgDmlWrite::AllocTargetPB() { |
176 | 23 | return write_req_->add_targets(); |
177 | 23 | } |
178 | | |
179 | 0 | PgsqlExpressionPB *PgDmlWrite::AllocQualPB() { |
180 | 0 | LOG(FATAL) << "Pure virtual function is being called"; |
181 | 0 | return nullptr; |
182 | 0 | } |
183 | | |
184 | 983k | PgsqlColRefPB *PgDmlWrite::AllocColRefPB() { |
185 | 983k | return write_req_->add_col_refs(); |
186 | 983k | } |
187 | | |
188 | 7.20M | void PgDmlWrite::ClearColRefPBs() { |
189 | 7.20M | write_req_->clear_col_refs(); |
190 | 7.20M | } |
191 | | |
192 | | } // namespace pggate |
193 | | } // namespace yb |