YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/yql/pggate/pg_dml_write.cc
Line
Count
Source (jump to first uncovered line)
1
//--------------------------------------------------------------------------------------------------
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//--------------------------------------------------------------------------------------------------
15
16
#include "yb/yql/pggate/pg_dml_write.h"
17
18
#include "yb/client/yb_op.h"
19
20
#include "yb/gutil/casts.h"
21
22
namespace yb {
23
namespace pggate {
24
25
using std::make_shared;
26
using std::shared_ptr;
27
using std::string;
28
using namespace std::literals;  // NOLINT
29
30
using client::YBClient;
31
using client::YBSession;
32
using client::YBMetaDataCache;
33
using client::YBTable;
34
using client::YBTableName;
35
using client::YBPgsqlWriteOp;
36
37
// TODO(neil) This should be derived from a gflag.
// File-local session timeout of 60 seconds (the `60s` literal comes from std::literals above).
// NOTE(review): not referenced anywhere in the visible portion of this file, and not declared
// const — confirm whether it is still needed.
38
static MonoDelta kSessionTimeout = 60s;
39
40
//--------------------------------------------------------------------------------------------------
41
// PgDmlWrite
42
//--------------------------------------------------------------------------------------------------
43
44
// Constructs a DML write statement.
//   pg_session        - session handle; moved into the PgDml base class.
//   table_id          - id of the table to write to; forwarded to the PgDml base class.
//   is_single_row_txn - stored in is_single_row_txn_; its negation is later passed to the
//                       PgsqlWriteOp constructor in AllocWriteRequest().
// No request is allocated here; that happens in Prepare().
PgDmlWrite::PgDmlWrite(PgSession::ScopedRefPtr pg_session,
45
                       const PgObjectId& table_id,
46
                       const bool is_single_row_txn)
47
2.12M
    : PgDml(std::move(pg_session), table_id), is_single_row_txn_(is_single_row_txn) {
48
2.12M
}
49
50
2.12M
// Destructor with an empty body: no explicit cleanup is performed here.
PgDmlWrite::~PgDmlWrite() {
51
2.12M
}
52
53
2.12M
// Prepares the statement for binding and execution:
//   1. loads the table for table_id_ through the session and uses it for both target_ and bind_;
//   2. allocates the write request and the doc operator (AllocWriteRequest);
//   3. allocates bind slots for the table columns (PrepareColumns).
// Returns Status::OK() once all three steps have run. VERIFY_RESULT is a project macro that is
// expected to return the error to the caller if LoadTable() fails.
Status PgDmlWrite::Prepare() {
54
  // Setup descriptors for target and bind columns.
55
2.12M
  target_ = bind_ = PgTable(VERIFY_RESULT(pg_session_->LoadTable(table_id_)));
56
57
  // Allocate either INSERT, UPDATE, DELETE, or TRUNCATE_COLOCATED request.
  // Must run after target_ is set: AllocWriteRequest() reads target_->schema_version().
58
2.12M
  AllocWriteRequest();
59
2.12M
  PrepareColumns();
60
2.12M
  return Status::OK();
61
2.12M
}
62
63
2.12M
// Calls AllocPrimaryBindPB() on every column of target_, passing the current write request.
// Expects write_req_ to have been created already (Prepare() calls AllocWriteRequest() first).
void PgDmlWrite::PrepareColumns() {
64
  // Because DocDB API requires that primary columns must be listed in their created-order,
65
  // the slots for primary column bind expressions are allocated here in correct order.
66
11.1M
  for (auto& col : target_.columns()) {
67
11.1M
    col.AllocPrimaryBindPB(write_req_.get());
68
11.1M
  }
69
2.12M
}
70
71
2.12M
// Drops primary-key bind slots of the write request that never received a value.
//
// When ybctid_bind_ is not set, every entry of partition_column_values and range_column_values
// that has no matching key in expr_binds_ is erased from the request. When ybctid_bind_ is set,
// both lists are cleared entirely.
//
// Returns InvalidArgument if at least one slot was erased and bind_table_ is false; otherwise
// returns Status::OK().
Status PgDmlWrite::DeleteEmptyPrimaryBinds() {
72
  // Iterate primary-key columns and remove the binds without values.
73
2.12M
  bool missing_primary_key = false;
74
75
  // Either ybctid or primary key must be present.
76
2.12M
  if (!ybctid_bind_) {
77
    // Remove empty binds from partition list.
78
351k
    auto partition_iter = write_req_->mutable_partition_column_values()->begin();
79
508k
    while (partition_iter != write_req_->mutable_partition_column_values()->end()) {
80
156k
      // expr_binds_ is looked up by the address of the expression PB in the request.
      if (expr_binds_.find(&*partition_iter) == expr_binds_.end()) {
81
0
        missing_primary_key = true;
82
0
        // Continue from the iterator returned by erase(); no increment on this path.
        partition_iter = write_req_->mutable_partition_column_values()->erase(partition_iter);
83
156k
      } else {
84
156k
        partition_iter++;
85
156k
      }
86
156k
    }
87
88
    // Remove empty binds from range list.
89
351k
    auto range_iter = write_req_->mutable_range_column_values()->begin();
90
1.22M
    while (range_iter != write_req_->mutable_range_column_values()->end()) {
91
875k
      if (expr_binds_.find(&*range_iter) == expr_binds_.end()) {
92
25
        missing_primary_key = true;
93
25
        // Same pattern as the partition list: erase() supplies the next iterator.
        range_iter = write_req_->mutable_range_column_values()->erase(range_iter);
94
875k
      } else {
95
875k
        range_iter++;
96
875k
      }
97
875k
    }
98
1.77M
  } else {
    // A ybctid bind is present, so the per-column key values are dropped from the request.
99
1.77M
    write_req_->clear_partition_column_values();
100
1.77M
    write_req_->clear_range_column_values();
101
1.77M
  }
102
103
  // Check for missing key.  This is okay when binding the whole table (for colocated truncate).
104
2.12M
  if (missing_primary_key && !bind_table_) {
105
0
    return STATUS(InvalidArgument, "Primary key must be fully specified for modifying table");
106
0
  }
107
108
2.12M
  return Status::OK();
109
2.12M
}
110
111
2.12M
// Executes the prepared write statement.
//
// Steps, in order: drop unbound primary-key slots, refresh bind and assign protobufs, sanity-check
// the ybctid value (if any), initialize the doc operator, publish column references into the
// request, and finally run the doc operator.
//
//   force_non_bufferable - passed through unchanged to doc_op_->Execute().
//
// Returns Status::OK() on success. RETURN_NOT_OK / VERIFY_RESULT are project macros that are
// expected to return the failing status to the caller. A ybctid value that is not a binary value
// trips a CHECK rather than returning a status.
Status PgDmlWrite::Exec(bool force_non_bufferable) {
112
113
  // Delete allocated binds that are not associated with a value.
114
  // YBClient interface forces us to allocate binds for primary key columns in their indexing
115
  // order, so we have to allocate these binds before associating them with values. When the values
116
  // are not assigned, these allocated binds must be deleted.
117
2.12M
  RETURN_NOT_OK(DeleteEmptyPrimaryBinds());
118
119
  // First update protobuf with new bind values.
120
2.12M
  RETURN_NOT_OK(UpdateBindPBs());
121
2.12M
  RETURN_NOT_OK(UpdateAssignPBs());
122
123
2.12M
  if (write_req_->has_ybctid_column_value()) {
124
1.77M
    PgsqlExpressionPB *exprpb = write_req_->mutable_ybctid_column_value();
125
18.4E
    // Invariant check: the ybctid expression must carry a constant binary value.
    CHECK(exprpb->has_value() && exprpb->value().has_binary_value())
126
18.4E
      << "YBCTID must be of BINARY datatype";
127
1.77M
  }
128
129
  // Initialize doc operator.
130
2.12M
  RETURN_NOT_OK(doc_op_->ExecuteInit(nullptr));
131
132
  // Set column references in protobuf.
133
2.12M
  ColRefsToPB();
134
  // Compatibility: set column ids as expected by legacy nodes
135
2.12M
  ColumnRefsToPB(write_req_->mutable_column_refs());
136
137
  // Execute the statement. If the request has been sent, get the result and handle any rows
138
  // returned.
139
4.25M
  if (VERIFY_RESULT(doc_op_->Execute(force_non_bufferable)) == RequestSent::kTrue) {
140
6.39k
    RETURN_NOT_OK(doc_op_->GetResult(&rowsets_));
141
142
    // Save the number of rows affected by the op.
143
4.08k
    rows_affected_count_ = VERIFY_RESULT(doc_op_->GetRowsAffectedCount());
144
4.08k
  }
  // When Execute() reports anything other than RequestSent::kTrue, rowsets_ and
  // rows_affected_count_ are left untouched by this call.
145
146
2.12M
  return Status::OK();
147
2.12M
}
148
149
103k
// Forwards write_time to the underlying write doc operator.
//
//   write_time - hybrid time handed to PgDocWriteOp::SetWriteTime().
//
// Fails the SCHECK with a RuntimeError ("expected doc_op_ to be initialized") when doc_op_ is
// null; otherwise returns Status::OK().
Status PgDmlWrite::SetWriteTime(const HybridTime& write_time) {
150
103k
  SCHECK(doc_op_.get() != nullptr, RuntimeError, "expected doc_op_ to be initialized");
151
103k
  // doc_op_ is created as a PgDocWriteOp in AllocWriteRequest(), hence the downcast.
  down_cast<PgDocWriteOp*>(doc_op_.get())->SetWriteTime(write_time);
152
103k
  return Status::OK();
153
103k
}
154
155
2.12M
// Creates the PgsqlWriteOp, exposes its embedded request as write_req_, fills in the request
// header fields, and builds the PgDocWriteOp stored in doc_op_.
//
// Reads target_->schema_version(), so target_ must already be loaded (Prepare() does this before
// calling here).
void PgDmlWrite::AllocWriteRequest() {
156
2.12M
  // The constructor argument is the negation of is_single_row_txn_; its exact meaning is defined
  // by PgsqlWriteOp, which is not visible in this file.
  auto write_op = std::make_shared<PgsqlWriteOp>(!is_single_row_txn_);
157
158
2.12M
  // Aliasing shared_ptr: write_req_ points at the request inside write_op while sharing
  // ownership of write_op itself.
  write_req_ = std::shared_ptr<PgsqlWriteRequestPB>(write_op, &write_op->write_request());
159
2.12M
  write_req_->set_stmt_type(stmt_type());
160
2.12M
  write_req_->set_client(YQL_CLIENT_PGSQL);
161
2.12M
  write_req_->set_table_id(table_id_.GetYBTableId());
162
2.12M
  write_req_->set_schema_version(target_->schema_version());
163
2.12M
  // The statement id is the address of the request object, converted to an integer.
  write_req_->set_stmt_id(reinterpret_cast<uint64_t>(write_req_.get()));
164
165
2.12M
  // write_op is moved into the doc operator; write_req_ still shares ownership of it.
  doc_op_ = std::make_shared<PgDocWriteOp>(pg_session_, &target_, table_id_, std::move(write_op));
166
2.12M
}
167
168
6.24M
// Returns the bind expression PB that `col` allocates in the current write request.
// Delegates entirely to PgColumn::AllocBindPB().
PgsqlExpressionPB *PgDmlWrite::AllocColumnBindPB(PgColumn *col) {
169
6.24M
  return col->AllocBindPB(write_req_.get());
170
6.24M
}
171
172
259k
// Returns the assignment expression PB that `col` allocates in the current write request.
// Delegates entirely to PgColumn::AllocAssignPB().
PgsqlExpressionPB *PgDmlWrite::AllocColumnAssignPB(PgColumn *col) {
173
259k
  return col->AllocAssignPB(write_req_.get());
174
259k
}
175
176
0
// Appends a new entry to the request's `targets` list and returns a pointer to it.
PgsqlExpressionPB *PgDmlWrite::AllocTargetPB() {
177
0
  return write_req_->add_targets();
178
0
}
179
180
0
// Not supported for write statements: reaching this function is reported through LOG(FATAL).
// NOTE(review): LOG(FATAL) is expected to terminate the process, so the `return nullptr` below
// presumably exists only to satisfy the function signature — confirm.
PgsqlExpressionPB *PgDmlWrite::AllocQualPB() {
181
0
  LOG(FATAL) << "Pure virtual function is being called";
182
0
  return nullptr;
183
0
}
184
185
259k
// Appends a new entry to the request's `col_refs` list and returns a pointer to it.
PgsqlColRefPB *PgDmlWrite::AllocColRefPB() {
186
259k
  return write_req_->add_col_refs();
187
259k
}
188
189
2.12M
// Removes every entry from the request's `col_refs` list.
void PgDmlWrite::ClearColRefPBs() {
190
2.12M
  write_req_->clear_col_refs();
191
2.12M
}
192
193
}  // namespace pggate
194
}  // namespace yb