/Users/deen/code/yugabyte-db/src/yb/client/table_handle.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/client/table_handle.h" |
15 | | |
16 | | #include "yb/client/client.h" |
17 | | #include "yb/client/error.h" |
18 | | #include "yb/client/schema.h" |
19 | | #include "yb/client/session.h" |
20 | | #include "yb/client/table.h" |
21 | | #include "yb/client/table_creator.h" |
22 | | #include "yb/client/yb_op.h" |
23 | | |
24 | | #include "yb/common/partition.h" |
25 | | #include "yb/common/ql_type.h" |
26 | | #include "yb/common/schema.h" |
27 | | |
28 | | #include "yb/master/master_client.pb.h" |
29 | | |
30 | | #include "yb/util/format.h" |
31 | | #include "yb/util/result.h" |
32 | | #include "yb/util/status_format.h" |
33 | | |
34 | | using namespace std::literals; // NOLINT |
35 | | |
36 | | namespace yb { |
37 | | namespace client { |
38 | | |
39 | | Status TableHandle::Create(const YBTableName& table_name, |
40 | | int num_tablets, |
41 | | YBClient* client, |
42 | | YBSchemaBuilder* builder, |
43 | 14 | IndexInfoPB* index_info) { |
44 | 14 | YBSchema schema; |
45 | 14 | RETURN_NOT_OK(builder->Build(&schema)); |
46 | 14 | return Create(table_name, num_tablets, schema, client, index_info); |
47 | 14 | } |
48 | | |
49 | | Status TableHandle::Create(const YBTableName& table_name, |
50 | | int num_tablets, |
51 | | const YBSchema& schema, |
52 | | YBClient* client, |
53 | 81 | IndexInfoPB* index_info) { |
54 | 81 | std::unique_ptr<YBTableCreator> table_creator(client->NewTableCreator()); |
55 | 81 | table_creator->table_name(table_name) |
56 | 81 | .schema(&schema) |
57 | 81 | .num_tablets(num_tablets); |
58 | | |
59 | | // Setup Index properties. |
60 | 81 | if (index_info) { |
61 | 0 | table_creator->indexed_table_id(index_info->indexed_table_id()) |
62 | 0 | .is_local_index(index_info->is_local()) |
63 | 0 | .is_unique_index(index_info->is_unique()) |
64 | 0 | .mutable_index_info()->CopyFrom(*index_info); |
65 | 0 | } |
66 | | |
67 | 81 | RETURN_NOT_OK(table_creator->Create()); |
68 | 81 | return Open(table_name, client); |
69 | 81 | } |
70 | | |
71 | 515 | Status TableHandle::Open(const YBTableName& table_name, YBClient* client) { |
72 | 515 | RETURN_NOT_OK(client->OpenTable(table_name, &table_)); |
73 | | |
74 | 514 | client_ = client; |
75 | 514 | auto schema = table_->schema(); |
76 | 2.33k | for (size_t i = 0; i < schema.num_columns(); ++i1.81k ) { |
77 | 1.81k | yb::ColumnId col_id = yb::ColumnId(schema.ColumnId(i)); |
78 | 1.81k | column_ids_.emplace(schema.Column(i).name(), col_id); |
79 | 1.81k | column_types_.emplace(col_id, schema.Column(i).type()); |
80 | 1.81k | } |
81 | | |
82 | 514 | return Status::OK(); |
83 | 515 | } |
84 | | |
85 | 0 | Status TableHandle::Reopen() { |
86 | 0 | return Open(name(), client_); |
87 | 0 | } |
88 | | |
89 | 218 | const YBTableName& TableHandle::name() const { |
90 | 218 | return table_->name(); |
91 | 218 | } |
92 | | |
93 | 3.44M | const YBSchema& TableHandle::schema() const { |
94 | 3.44M | return table_->schema(); |
95 | 3.44M | } |
96 | | |
97 | 22 | std::vector<std::string> TableHandle::AllColumnNames() const { |
98 | 22 | std::vector<std::string> result; |
99 | 22 | result.reserve(table_->schema().columns().size()); |
100 | 64 | for (const auto& column : table_->schema().columns()) { |
101 | 64 | result.push_back(column.name()); |
102 | 64 | } |
103 | 22 | return result; |
104 | 22 | } |
105 | | |
106 | | namespace { |
107 | | |
108 | | template<class T> |
109 | 1.91M | auto SetupRequest(const T& op, const YBSchema& schema) { |
110 | 1.91M | auto* req = op->mutable_request(); |
111 | 1.91M | req->set_client(YQL_CLIENT_CQL); |
112 | 1.91M | req->set_request_id(0); |
113 | 1.91M | req->set_query_id(reinterpret_cast<int64_t>(op.get())); |
114 | 1.91M | req->set_schema_version(schema.version()); |
115 | 1.91M | req->set_is_compatible_with_previous_version(schema.is_compatible_with_previous_version()); |
116 | 1.91M | return req; |
117 | 1.91M | } table_handle.cc:auto yb::client::(anonymous namespace)::SetupRequest<std::__1::shared_ptr<yb::client::YBqlWriteOp> >(std::__1::shared_ptr<yb::client::YBqlWriteOp> const&, yb::client::YBSchema const&) Line | Count | Source | 109 | 1.91M | auto SetupRequest(const T& op, const YBSchema& schema) { | 110 | 1.91M | auto* req = op->mutable_request(); | 111 | 1.91M | req->set_client(YQL_CLIENT_CQL); | 112 | 1.91M | req->set_request_id(0); | 113 | 1.91M | req->set_query_id(reinterpret_cast<int64_t>(op.get())); | 114 | 1.91M | req->set_schema_version(schema.version()); | 115 | 1.91M | req->set_is_compatible_with_previous_version(schema.is_compatible_with_previous_version()); | 116 | 1.91M | return req; | 117 | 1.91M | } |
table_handle.cc:auto yb::client::(anonymous namespace)::SetupRequest<std::__1::shared_ptr<yb::client::YBqlReadOp> >(std::__1::shared_ptr<yb::client::YBqlReadOp> const&, yb::client::YBSchema const&) Line | Count | Source | 109 | 960 | auto SetupRequest(const T& op, const YBSchema& schema) { | 110 | 960 | auto* req = op->mutable_request(); | 111 | 960 | req->set_client(YQL_CLIENT_CQL); | 112 | 960 | req->set_request_id(0); | 113 | 960 | req->set_query_id(reinterpret_cast<int64_t>(op.get())); | 114 | 960 | req->set_schema_version(schema.version()); | 115 | 960 | req->set_is_compatible_with_previous_version(schema.is_compatible_with_previous_version()); | 116 | 960 | return req; | 117 | 960 | } |
|
118 | | |
119 | | } // namespace |
120 | | |
121 | 1.91M | std::shared_ptr<YBqlWriteOp> TableHandle::NewWriteOp(QLWriteRequestPB::QLStmtType type) const { |
122 | 1.91M | auto op = std::make_shared<YBqlWriteOp>(table_); |
123 | 1.91M | auto* req = SetupRequest(op, table_->schema()); |
124 | 1.91M | req->set_type(type); |
125 | 1.91M | return op; |
126 | 1.91M | } |
127 | | |
128 | 960 | std::shared_ptr<YBqlReadOp> TableHandle::NewReadOp() const { |
129 | 960 | std::shared_ptr<YBqlReadOp> op(table_->NewQLRead()); |
130 | 960 | SetupRequest(op, table_->schema()); |
131 | 960 | return op; |
132 | 960 | } |
133 | | |
134 | 3.72M | QLValuePB* TableHandle::PrepareColumn(QLWriteRequestPB* req, const string& column_name) const { |
135 | 3.72M | return QLPrepareColumn(req, ColumnId(column_name)); |
136 | 3.72M | } |
137 | | |
138 | | #define TABLE_HANDLE_TYPE_DEFINITIONS_IMPL(name, lname, type) \ |
139 | | void TableHandle::PP_CAT3(Add, name, ColumnValue)( \ |
140 | 3.73M | QLWriteRequestPB* req, const std::string &column_name, type value) const { \ |
141 | 3.73M | PrepareColumn(req, column_name)->PP_CAT3(set_, lname, _value)(value); \ |
142 | 3.73M | } \ yb::client::TableHandle::AddInt8ColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, signed char) const Line | Count | Source | 140 | 1.62k | QLWriteRequestPB* req, const std::string &column_name, type value) const { \ | 141 | 1.62k | PrepareColumn(req, column_name)->PP_CAT3(set_, lname, _value)(value); \ | 142 | 1.62k | } \ |
yb::client::TableHandle::AddInt16ColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, short) const Line | Count | Source | 140 | 1.62k | QLWriteRequestPB* req, const std::string &column_name, type value) const { \ | 141 | 1.62k | PrepareColumn(req, column_name)->PP_CAT3(set_, lname, _value)(value); \ | 142 | 1.62k | } \ |
yb::client::TableHandle::AddInt32ColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int) const Line | Count | Source | 140 | 1.74M | QLWriteRequestPB* req, const std::string &column_name, type value) const { \ | 141 | 1.74M | PrepareColumn(req, column_name)->PP_CAT3(set_, lname, _value)(value); \ | 142 | 1.74M | } \ |
yb::client::TableHandle::AddInt64ColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, long long) const Line | Count | Source | 140 | 168k | QLWriteRequestPB* req, const std::string &column_name, type value) const { \ | 141 | 168k | PrepareColumn(req, column_name)->PP_CAT3(set_, lname, _value)(value); \ | 142 | 168k | } \ |
yb::client::TableHandle::AddStringColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Line | Count | Source | 140 | 1.72M | QLWriteRequestPB* req, const std::string &column_name, type value) const { \ | 141 | 1.72M | PrepareColumn(req, column_name)->PP_CAT3(set_, lname, _value)(value); \ | 142 | 1.72M | } \ |
yb::client::TableHandle::AddBinaryColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Line | Count | Source | 140 | 1.62k | QLWriteRequestPB* req, const std::string &column_name, type value) const { \ | 141 | 1.62k | PrepareColumn(req, column_name)->PP_CAT3(set_, lname, _value)(value); \ | 142 | 1.62k | } \ |
yb::client::TableHandle::AddBoolColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, bool) const Line | Count | Source | 140 | 85.0k | QLWriteRequestPB* req, const std::string &column_name, type value) const { \ | 141 | 85.0k | PrepareColumn(req, column_name)->PP_CAT3(set_, lname, _value)(value); \ | 142 | 85.0k | } \ |
Unexecuted instantiation: yb::client::TableHandle::AddFloatColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, float) const Unexecuted instantiation: yb::client::TableHandle::AddDoubleColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, double) const Unexecuted instantiation: yb::client::TableHandle::AddJsonbColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Unexecuted instantiation: yb::client::TableHandle::AddTimestampColumnValue(yb::QLWriteRequestPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, long long) const |
143 | | \ |
144 | | void TableHandle::PP_CAT3(Set, name, Condition)( \ |
145 | | QLConditionPB* const condition, const string& column_name, const QLOperator op, \ |
146 | 1 | type value) const { \ |
147 | 1 | PrepareCondition(condition, column_name, op)->PP_CAT3(set_, lname, _value)(value); \ |
148 | 1 | } \ Unexecuted instantiation: yb::client::TableHandle::SetInt8Condition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, signed char) const Unexecuted instantiation: yb::client::TableHandle::SetInt16Condition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, short) const yb::client::TableHandle::SetInt32Condition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, int) const Line | Count | Source | 146 | 1 | type value) const { \ | 147 | 1 | PrepareCondition(condition, column_name, op)->PP_CAT3(set_, lname, _value)(value); \ | 148 | 1 | } \ |
Unexecuted instantiation: yb::client::TableHandle::SetInt64Condition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, long long) const Unexecuted instantiation: yb::client::TableHandle::SetStringCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Unexecuted instantiation: yb::client::TableHandle::SetBinaryCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Unexecuted instantiation: yb::client::TableHandle::SetBoolCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, bool) const Unexecuted instantiation: yb::client::TableHandle::SetFloatCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, float) const Unexecuted instantiation: yb::client::TableHandle::SetDoubleCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, double) const Unexecuted instantiation: yb::client::TableHandle::SetJsonbCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Unexecuted instantiation: yb::client::TableHandle::SetTimestampCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, long long) const |
149 | | \ |
150 | | void TableHandle::PP_CAT3(Add, name, Condition)( \ |
151 | | QLConditionPB* const condition, const string& column_name, const QLOperator op, \ |
152 | 0 | type value) const { \ |
153 | 0 | PP_CAT3(Set, name, Condition)( \ |
154 | 0 | condition->add_operands()->mutable_condition(), column_name, op, value); \ |
155 | 0 | } \ Unexecuted instantiation: yb::client::TableHandle::AddInt8Condition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, signed char) const Unexecuted instantiation: yb::client::TableHandle::AddInt16Condition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, short) const Unexecuted instantiation: yb::client::TableHandle::AddInt32Condition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, int) const Unexecuted instantiation: yb::client::TableHandle::AddInt64Condition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, long long) const Unexecuted instantiation: yb::client::TableHandle::AddStringCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Unexecuted instantiation: yb::client::TableHandle::AddBinaryCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Unexecuted instantiation: yb::client::TableHandle::AddBoolCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, bool) const Unexecuted instantiation: yb::client::TableHandle::AddFloatCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, float) const Unexecuted instantiation: yb::client::TableHandle::AddDoubleCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, double) const Unexecuted instantiation: yb::client::TableHandle::AddJsonbCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) const Unexecuted instantiation: yb::client::TableHandle::AddTimestampCondition(yb::QLConditionPB*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::QLOperator, long long) const |
156 | | |
157 | | #define TABLE_HANDLE_TYPE_DEFINITIONS(i, data, entry) TABLE_HANDLE_TYPE_DEFINITIONS_IMPL entry |
158 | | |
159 | | BOOST_PP_SEQ_FOR_EACH(TABLE_HANDLE_TYPE_DEFINITIONS, ~, QL_PROTOCOL_TYPES); |
160 | | |
161 | 0 | void TableHandle::SetColumn(QLColumnValuePB* column_value, const string& column_name) const { |
162 | 0 | column_value->set_column_id(ColumnId(column_name)); |
163 | 0 | } |
164 | | |
165 | | QLValuePB* TableHandle::PrepareCondition( |
166 | 1 | QLConditionPB* const condition, const string& column_name, const QLOperator op) const { |
167 | 1 | return QLPrepareCondition(condition, ColumnId(column_name), op); |
168 | 1 | } |
169 | | |
170 | 0 | void TableHandle::AddCondition(QLConditionPB* const condition, const QLOperator op) const { |
171 | 0 | condition->add_operands()->mutable_condition()->set_op(op); |
172 | 0 | } |
173 | | |
174 | 894 | void TableHandle::AddColumns(const std::vector<std::string>& columns, QLReadRequestPB* req) const { |
175 | 894 | QLRSRowDescPB* rsrow_desc = req->mutable_rsrow_desc(); |
176 | 1.62k | for (const auto& column : columns) { |
177 | 1.62k | auto id = ColumnId(column); |
178 | 1.62k | req->add_selected_exprs()->set_column_id(id); |
179 | 1.62k | req->mutable_column_refs()->add_ids(id); |
180 | | |
181 | 1.62k | QLRSColDescPB* rscol_desc = rsrow_desc->add_rscol_descs(); |
182 | 1.62k | rscol_desc->set_name(column); |
183 | 1.62k | ColumnType(column)->ToQLTypePB(rscol_desc->mutable_ql_type()); |
184 | 1.62k | } |
185 | 894 | } |
186 | | |
187 | 218 | TableIteratorOptions::TableIteratorOptions() {} |
188 | | |
189 | 218 | TableIterator::TableIterator() : table_(nullptr) {} |
190 | | |
191 | | #define REPORT_AND_RETURN_IF_NOT_OK(expr) \ |
192 | 1.01k | do { \ |
193 | 1.01k | auto&& status = (expr); \ |
194 | 1.01k | if (!status.ok()) { HandleError(MoveStatus(status)); return; }0 \ |
195 | 1.01k | } while (false) \ |
196 | | |
197 | | #define REPORT_AND_RETURN_FALSE_IF_NOT_OK(expr) \ |
198 | | do { \ |
199 | | auto&& status = (expr); \ |
200 | | if (!status.ok()) { HandleError(MoveStatus(status)); return false; } \ |
201 | | } while (false) \ |
202 | | |
203 | | TableIterator::TableIterator(const TableHandle* table, const TableIteratorOptions& options) |
204 | 218 | : table_(table), error_handler_(options.error_handler) { |
205 | 218 | auto client = table->client(); |
206 | | |
207 | 218 | session_ = client->NewSession(); |
208 | | |
209 | 218 | google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets; |
210 | 218 | REPORT_AND_RETURN_IF_NOT_OK(client->GetTablets( |
211 | 218 | table->name(), /* max_tablets = */ 0, &tablets, /* partition_list_version =*/nullptr)); |
212 | 218 | if (tablets.size() == 0) { |
213 | 0 | table_ = nullptr; |
214 | 0 | return; |
215 | 0 | } |
216 | 218 | ops_.reserve(tablets.size()); |
217 | 218 | partition_key_ends_.reserve(tablets.size()); |
218 | | |
219 | 278 | for (const auto& tablet : tablets) { |
220 | 278 | if (!options.tablet.empty() && options.tablet != tablet.tablet_id()0 ) { |
221 | 0 | continue; |
222 | 0 | } |
223 | 278 | auto op = table->NewReadOp(); |
224 | 278 | auto req = op->mutable_request(); |
225 | 278 | op->set_yb_consistency_level(options.consistency); |
226 | | |
227 | 278 | const auto& key_start = tablet.partition().partition_key_start(); |
228 | 278 | if (!key_start.empty()) { |
229 | 60 | req->set_hash_code(PartitionSchema::DecodeMultiColumnHashValue(key_start)); |
230 | 60 | } |
231 | | |
232 | 278 | if (options.filter) { |
233 | 0 | options.filter(*table_, req->mutable_where_expr()->mutable_condition()); |
234 | 278 | } else { |
235 | 278 | req->set_return_paging_state(true); |
236 | 278 | req->set_limit(128); |
237 | 278 | } |
238 | 278 | if (options.read_time) { |
239 | 6 | op->SetReadTime(options.read_time); |
240 | 6 | } |
241 | 278 | table_->AddColumns(*options.columns, req); |
242 | 278 | ops_.push_back(std::move(op)); |
243 | 278 | partition_key_ends_.push_back(tablet.partition().partition_key_end()); |
244 | 278 | } |
245 | | |
246 | 218 | if (ExecuteOps()) { |
247 | 218 | Move(); |
248 | 218 | } |
249 | 218 | } |
250 | | |
251 | 218 | bool TableIterator::ExecuteOps() { |
252 | 218 | constexpr size_t kMaxConcurrentOps = 5; |
253 | 218 | const size_t new_executed_ops = std::min(ops_.size(), executed_ops_ + kMaxConcurrentOps); |
254 | 496 | for (size_t i = executed_ops_; i != new_executed_ops; ++i278 ) { |
255 | 278 | session_->Apply(ops_[i]); |
256 | 278 | } |
257 | | |
258 | 218 | if (!IsFlushStatusOkOrHandleErrors(session_->FlushAndGetOpsErrors())) { |
259 | 0 | return false; |
260 | 0 | } |
261 | | |
262 | 496 | for (size_t i = executed_ops_; 218 i != new_executed_ops; ++i278 ) { |
263 | 278 | const auto& op = ops_[i]; |
264 | 278 | if (QLResponsePB::YQL_STATUS_OK != op->response().status()) { |
265 | 0 | HandleError(STATUS_FORMAT(RuntimeError, "Error for $0: $1", *op, op->response())); |
266 | 0 | } |
267 | 278 | } |
268 | | |
269 | 218 | executed_ops_ = new_executed_ops; |
270 | 218 | return true; |
271 | 218 | } |
272 | | |
273 | 69.4k | bool TableIterator::Equals(const TableIterator& rhs) const { |
274 | 69.4k | return table_ == rhs.table_; |
275 | 69.4k | } |
276 | | |
277 | 69.1k | TableIterator& TableIterator::operator++() { |
278 | 69.1k | ++row_index_; |
279 | 69.1k | Move(); |
280 | 69.1k | return *this; |
281 | 69.1k | } |
282 | | |
283 | 68.9k | const QLRow& TableIterator::operator*() const { |
284 | 68.9k | return current_block_->rows()[row_index_]; |
285 | 68.9k | } |
286 | | |
287 | 69.4k | void TableIterator::Move() { |
288 | 70.2k | while (!current_block_ || row_index_ == current_block_->rows().size()69.9k ) { |
289 | 1.01k | if (current_block_) { |
290 | 796 | if (paging_state_) { |
291 | 518 | auto& op = ops_[ops_index_]; |
292 | 518 | *op->mutable_request()->mutable_paging_state() = *paging_state_; |
293 | 518 | session_->Apply(op); |
294 | 518 | if (!IsFlushStatusOkOrHandleErrors(session_->FlushAndGetOpsErrors())) { |
295 | 0 | return; |
296 | 0 | } |
297 | 518 | if (QLResponsePB::YQL_STATUS_OK != op->response().status()) { |
298 | 0 | HandleError(STATUS_FORMAT(RuntimeError, "Error for $0: $1", *op, op->response())); |
299 | 0 | } |
300 | 518 | } else { |
301 | 278 | ++ops_index_; |
302 | 278 | if (ops_index_ >= executed_ops_ && executed_ops_ < ops_.size()218 ) { |
303 | 0 | if (!ExecuteOps()) { |
304 | | // Error occurred. exit out early. |
305 | 0 | return; |
306 | 0 | } |
307 | 0 | } |
308 | 278 | } |
309 | 796 | } |
310 | 1.01k | if (ops_index_ == ops_.size()) { |
311 | 218 | table_ = nullptr; |
312 | 218 | return; |
313 | 218 | } |
314 | 796 | auto& op = *ops_[ops_index_]; |
315 | 796 | auto next_block = op.MakeRowBlock(); |
316 | 796 | REPORT_AND_RETURN_IF_NOT_OK(next_block); |
317 | 796 | current_block_ = std::move(*next_block); |
318 | 796 | paging_state_ = op.response().has_paging_state() ? &op.response().paging_state()578 : nullptr218 ; |
319 | 796 | if (ops_index_ < partition_key_ends_.size() - 1 && paging_state_407 && |
320 | 796 | paging_state_->next_partition_key() >= partition_key_ends_[ops_index_]407 ) { |
321 | 60 | paging_state_ = nullptr; |
322 | 60 | } |
323 | 796 | row_index_ = 0; |
324 | | |
325 | 796 | VLOG(4) << "New block: " << yb::ToString(current_block_->rows()) |
326 | 0 | << ", paging: " << yb::ToString(paging_state_); |
327 | 796 | } |
328 | 69.4k | } |
329 | | |
330 | 736 | bool TableIterator::IsFlushStatusOkOrHandleErrors(FlushStatus flush_status) { |
331 | 736 | if (flush_status.status.ok()) { |
332 | 736 | return true; |
333 | 736 | } |
334 | 0 | HandleError(flush_status.status); |
335 | 0 | if (!error_handler_) { |
336 | 0 | for (const auto& error : flush_status.errors) { |
337 | 0 | LOG(ERROR) << "Failed operation: " << error->failed_op().ToString() |
338 | 0 | << ", status: " << error->status(); |
339 | 0 | } |
340 | 0 | } |
341 | 0 | return false; |
342 | 736 | } |
343 | | |
344 | 0 | void TableIterator::HandleError(const Status& status) { |
345 | 0 | if (error_handler_) { |
346 | 0 | error_handler_(status); |
347 | 0 | } else { |
348 | 0 | LOG(FATAL) << "Failed: " << status; |
349 | 0 | } |
350 | | // Makes this iterator == end(). |
351 | 0 | table_ = nullptr; |
352 | 0 | } |
353 | | |
354 | | template <> |
355 | | void FilterBetweenImpl<int32_t>::operator()( |
356 | 0 | const TableHandle& table, QLConditionPB* condition) const { |
357 | 0 | condition->set_op(QL_OP_AND); |
358 | 0 | table.AddInt32Condition( |
359 | 0 | condition, column_, lower_inclusive_ ? QL_OP_GREATER_THAN_EQUAL : QL_OP_GREATER_THAN, |
360 | 0 | lower_bound_); |
361 | 0 | table.AddInt32Condition( |
362 | 0 | condition, column_, upper_inclusive_ ? QL_OP_LESS_THAN_EQUAL : QL_OP_LESS_THAN, upper_bound_); |
363 | 0 | } |
364 | | |
365 | | template <> |
366 | | void FilterBetweenImpl<std::string>::operator()( |
367 | 0 | const TableHandle& table, QLConditionPB* condition) const { |
368 | 0 | condition->set_op(QL_OP_AND); |
369 | 0 | table.AddStringCondition( |
370 | 0 | condition, column_, lower_inclusive_ ? QL_OP_GREATER_THAN_EQUAL : QL_OP_GREATER_THAN, |
371 | 0 | lower_bound_); |
372 | 0 | table.AddStringCondition( |
373 | 0 | condition, column_, upper_inclusive_ ? QL_OP_LESS_THAN_EQUAL : QL_OP_LESS_THAN, upper_bound_); |
374 | 0 | } |
375 | | |
376 | 0 | void FilterGreater::operator()(const TableHandle& table, QLConditionPB* condition) const { |
377 | 0 | table.SetInt32Condition( |
378 | 0 | condition, column_, inclusive_ ? QL_OP_GREATER_THAN_EQUAL : QL_OP_GREATER_THAN, bound_); |
379 | 0 | } |
380 | | |
381 | 0 | void FilterLess::operator()(const TableHandle& table, QLConditionPB* condition) const { |
382 | 0 | table.SetInt32Condition( |
383 | 0 | condition, column_, inclusive_ ? QL_OP_LESS_THAN_EQUAL : QL_OP_LESS_THAN, bound_); |
384 | 0 | } |
385 | | |
386 | | template <> |
387 | | void FilterEqualImpl<std::string>::operator()( |
388 | 0 | const TableHandle& table, QLConditionPB* condition) const { |
389 | 0 | table.SetBinaryCondition(condition, column_, QL_OP_EQUAL, t_); |
390 | 0 | } |
391 | | |
392 | | } // namespace client |
393 | | } // namespace yb |