/Users/deen/code/yugabyte-db/src/yb/tablet/write_query.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/write_query.h" |
15 | | |
16 | | #include "yb/client/client.h" |
17 | | #include "yb/client/error.h" |
18 | | #include "yb/client/meta_data_cache.h" |
19 | | #include "yb/client/session.h" |
20 | | #include "yb/client/table.h" |
21 | | #include "yb/client/transaction.h" |
22 | | #include "yb/client/yb_op.h" |
23 | | |
24 | | #include "yb/common/index.h" |
25 | | #include "yb/common/row_mark.h" |
26 | | #include "yb/common/schema.h" |
27 | | |
28 | | #include "yb/docdb/conflict_resolution.h" |
29 | | #include "yb/docdb/cql_operation.h" |
30 | | #include "yb/docdb/doc_write_batch.h" |
31 | | #include "yb/docdb/pgsql_operation.h" |
32 | | #include "yb/docdb/redis_operation.h" |
33 | | |
34 | | #include "yb/tablet/tablet_metadata.h" |
35 | | #include "yb/tablet/operations/write_operation.h" |
36 | | #include "yb/tablet/tablet.h" |
37 | | #include "yb/tablet/tablet_metrics.h" |
38 | | #include "yb/tablet/transaction_participant.h" |
39 | | #include "yb/tablet/write_query_context.h" |
40 | | |
41 | | #include "yb/tserver/tserver.pb.h" |
42 | | |
43 | | #include "yb/util/logging.h" |
44 | | #include "yb/util/metrics.h" |
45 | | #include "yb/util/trace.h" |
46 | | |
47 | | using namespace std::placeholders; |
48 | | |
49 | | namespace yb { |
50 | | namespace tablet { |
51 | | |
52 | | namespace { |
53 | | |
54 | | // Fills out_request from the request received from the client: copies the write batch when |
55 | | // present, the client id / request id fields when a request id is set, the batch index, and the |
56 | | // external hybrid time when present. |
57 | 1.59M | void SetupKeyValueBatch(const tserver::WriteRequestPB& client_request, WritePB* out_request) { |
58 | 1.59M | out_request->set_unused_tablet_id(""); // Backward compatibility. |
59 | 1.59M | auto& out_write_batch = *out_request->mutable_write_batch(); |
60 | 1.59M | if (client_request.has_write_batch()) { |
61 | 405k | out_write_batch = client_request.write_batch(); |
62 | 405k | } |
// Set unconditionally, including when the client sent no write batch.
63 | 1.59M | out_write_batch.set_deprecated_may_have_metadata(true); |
// The client ids and min running request id are only copied together with the request id.
64 | 1.59M | if (client_request.has_request_id()) { |
65 | 1.31M | out_request->set_client_id1(client_request.client_id1()); |
66 | 1.31M | out_request->set_client_id2(client_request.client_id2()); |
67 | 1.31M | out_request->set_request_id(client_request.request_id()); |
68 | 1.31M | out_request->set_min_running_request_id(client_request.min_running_request_id()); |
69 | 1.31M | } |
70 | 1.59M | out_request->set_batch_idx(client_request.batch_idx()); |
71 | | // Actually, in production code, we could check for external hybrid time only when there are |
72 | | // no ql, pgsql, redis operations. |
73 | | // But in CDCServiceTest we have ql write batch with external time. |
74 | 1.59M | if (client_request.has_external_hybrid_time()) { |
75 | 1.18k | out_request->set_external_hybrid_time(client_request.external_hybrid_time()); |
76 | 1.18k | } |
77 | 1.59M | } |
78 | | |
79 | | } // namespace |
80 | | |
// Execution flavor of a query. Stored by InitExecute() and read by ExecuteDone() to pick the
// matching *ExecuteDone handler.
81 | | enum class WriteQuery::ExecuteMode { |
82 | | kSimple, |
83 | | kRedis, |
84 | | kCql, |
85 | | kPgsql, |
86 | | }; |
87 | | |
// Creates a query that owns a fresh WriteOperation for the given tablet. start_time_ is captured
// here and read back in Finished() to compute the duration reported to the tablet metrics.
88 | | WriteQuery::WriteQuery( |
89 | | int64_t term, |
90 | | CoarseTimePoint deadline, |
91 | | WriteQueryContext* context, |
92 | | Tablet* tablet, |
93 | | tserver::WriteResponsePB* response, |
94 | | docdb::OperationKind kind) |
95 | | : operation_(std::make_unique<WriteOperation>(tablet)), |
96 | | term_(term), deadline_(deadline), |
97 | | context_(context), |
98 | | response_(response), |
99 | | kind_(kind), |
100 | 1.74M | start_time_(CoarseMonoClock::Now()) { |
101 | 1.74M | } |
102 | | |
// Returns the mutable request of the owned operation. Dereferences operation_, so it must not be
// called after PrepareSubmit() has moved the operation out of this query.
103 | 6.45M | WritePB& WriteQuery::request() { |
104 | 6.45M | return *operation_->mutable_request(); |
105 | 6.45M | } |
106 | | |
// Installs the completion callback on the operation and hands the operation over to the caller
// (operation_ is moved out). The callback captures raw pointers to the operation and to this
// query; when invoked it takes ownership of the query, forwards the status to Finished(), and the
// query is deleted when the callback returns.
107 | 1.64M | std::unique_ptr<WriteOperation> WriteQuery::PrepareSubmit() { |
108 | 1.64M | operation_->set_completion_callback( |
109 | 1.64M | [operation = operation_.get(), query = this](const Status& status) { |
110 | 1.64M | std::unique_ptr<WriteQuery> query_holder(query); |
111 | 1.64M | query->Finished(operation, status); |
112 | 1.64M | }); |
113 | 1.64M | return std::move(operation_); |
114 | 1.64M | } |
115 | | |
// Last step of query processing: either submits the operation through the context or completes
// the query via Cancel(). Takes ownership of this object; the query is deleted when this function
// returns unless ownership is passed on through self.release() on the submit path.
//
// status: result of the execution phase. A non-OK status cancels the query with that status.
116 | 1.74M | void WriteQuery::DoStartSynchronization(const Status& status) { |
117 | 1.74M | std::unique_ptr<WriteQuery> self(this); |
118 | | // Move submit_token_ so it is released after this function. |
119 | 1.74M | ScopedRWOperation submit_token(std::move(submit_token_)); |
120 | | // If a restart read is required, then we return this fact to caller and don't perform the write |
121 | | // operation. |
122 | 1.74M | if (status.ok() && restart_read_ht_.is_valid()) { |
123 | 0 | auto restart_time = response()->mutable_restart_read_time(); |
124 | 0 | restart_time->set_read_ht(restart_read_ht_.ToUint64()); |
125 | 0 | auto local_limit = context_->ReportReadRestart(); |
126 | 0 | if (!local_limit.ok()) { |
127 | 0 | Cancel(local_limit.status()); |
128 | 0 | return; |
129 | 0 | } |
130 | 0 | restart_time->set_deprecated_max_of_read_time_and_local_limit_ht(local_limit->ToUint64()); |
131 | 0 | restart_time->set_local_limit_ht(local_limit->ToUint64()); |
132 | | // Global limit is ignored by caller, so we don't set it. |
// The restart request travels in the response, so the query itself completes with OK.
133 | 0 | Cancel(Status::OK()); |
134 | 0 | return; |
135 | 0 | } |
136 | | |
137 | 1.74M | if (!status.ok()) { |
138 | 107k | Cancel(status); |
139 | 107k | return; |
140 | 107k | } |
141 | | |
// release() gives up ownership here; the completion callback installed by PrepareSubmit()
// becomes responsible for deleting the query.
142 | 1.64M | context_->Submit(self.release()->PrepareSubmit(), term_); |
143 | 1.64M | } |
144 | | |
// Drops the locks held by this query. Called from Complete().
145 | 1.75M | void WriteQuery::Release() { |
146 | | // Free DocDB multi-level locks. |
147 | 1.75M | docdb_locks_.Reset(); |
148 | 1.75M | } |
149 | | |
// Empty body: no explicit teardown is performed here.
150 | 1.75M | WriteQuery::~WriteQuery() { |
151 | 1.75M | } |
152 | | |
// Remembers a pointer to a caller-owned request. The request is not copied, so it has to stay
// alive for as long as client_request_ is used. Also derives read_time_ from the request.
// Immediate read restarts are allowed only when read_time_ evaluates to false
// (presumably: the request did not specify a read time - confirm against ReadHybridTime).
153 | 1.61M | void WriteQuery::set_client_request(std::reference_wrapper<const tserver::WriteRequestPB> req) { |
154 | 1.61M | client_request_ = &req.get(); |
155 | 1.61M | read_time_ = ReadHybridTime::FromReadTimePB(req.get()); |
156 | 1.61M | allow_immediate_read_restart_ = !read_time_; |
157 | 1.61M | } |
158 | | |
// Same as the reference_wrapper overload, but additionally takes ownership of the request by
// storing it in client_request_holder_. The pointer recorded by the first call stays valid
// because moving the unique_ptr does not relocate the request object.
159 | 0 | void WriteQuery::set_client_request(std::unique_ptr<tserver::WriteRequestPB> req) { |
160 | 0 | set_client_request(*req); |
161 | 0 | client_request_holder_ = std::move(req); |
162 | 0 | } |
163 | | |
// Completion path for an operation that was submitted; invoked from the completion callback
// installed by PrepareSubmit(). operation_ is expected to be empty at this point (it was moved
// out by PrepareSubmit()), otherwise a DFATAL is logged.
//
// operation: the submitted operation, used to reach the tablet metrics.
// status: final status of the operation; forwarded to Complete().
164 | 1.64M | void WriteQuery::Finished(WriteOperation* operation, const Status& status) { |
165 | 625 | LOG_IF(DFATAL, operation_) << "Finished not submitted operation: " << status; |
166 | | |
// Only successful writes contribute to the duration metric, and only if the tablet has metrics.
167 | 1.64M | if (status.ok()) { |
168 | 1.62M | TabletMetrics* metrics = operation->tablet()->metrics(); |
169 | 1.62M | if (metrics) { |
170 | 1.62M | auto op_duration_usec = MonoDelta(CoarseMonoClock::now() - start_time_).ToMicroseconds(); |
171 | 1.62M | metrics->write_op_duration_client_propagated_consistency->Increment(op_duration_usec); |
172 | 1.62M | } |
173 | 1.62M | } |
174 | | |
175 | 1.64M | Complete(status); |
176 | 1.64M | } |
177 | | |
// Completion path for a query whose operation was never submitted. operation_ is expected to be
// still owned by the query; a DFATAL is logged if it is not. Forwards status to Complete().
178 | 107k | void WriteQuery::Cancel(const Status& status) { |
179 | 69 | LOG_IF(DFATAL, !operation_) << "Cancelled submitted operation: " << status; |
180 | | |
181 | 107k | Complete(status); |
182 | 107k | } |
183 | | |
// Common tail of Finished() and Cancel(): releases the locks first, then invokes callback_ with
// the final status if a callback is set.
184 | 1.75M | void WriteQuery::Complete(const Status& status) { |
185 | 1.75M | Release(); |
186 | | |
187 | 1.75M | if (callback_) { |
188 | 1.65M | callback_(status); |
189 | 1.65M | } |
190 | 1.75M | } |
191 | | |
// Called when the execution phase has finished with the given status. Resets
// scoped_read_operation_ (acquired in InitExecute()) and dispatches to the handler that matches
// execute_mode_. An execute_mode_ value outside the enum is fatal.
192 | 1.74M | void WriteQuery::ExecuteDone(const Status& status) { |
193 | 1.74M | scoped_read_operation_.Reset(); |
194 | 1.74M | switch (execute_mode_) { |
195 | 140k | case ExecuteMode::kSimple: |
196 | 140k | SimpleExecuteDone(status); |
197 | 140k | return; |
198 | 61.5k | case ExecuteMode::kRedis: |
199 | 61.5k | RedisExecuteDone(status); |
200 | 61.5k | return; |
201 | 1.27M | case ExecuteMode::kCql: |
202 | 1.27M | CqlExecuteDone(status); |
203 | 1.27M | return; |
204 | 261k | case ExecuteMode::kPgsql: |
205 | 261k | PgsqlExecuteDone(status); |
206 | 261k | return; |
207 | 0 | } |
208 | 0 | FATAL_INVALID_ENUM_VALUE(ExecuteMode, execute_mode_); |
209 | 0 | } |
210 | | |
// Decides how the write is going to be executed and prepares the query for it.
//
// With a client request: allocates the operation request, fills it via SetupKeyValueBatch() and
// delegates to the first non-empty batch kind, checked in the order Redis, QL, PGSQL. A request
// that only carries a write batch together with an external hybrid time needs no execution.
// Without a client request: the already present operation request is executed in simple mode if
// its write batch has read pairs.
//
// Returns true when there is something to execute, false when execution should be skipped (see
// Execute(), which then goes straight to StartSynchronization), or an error status. An empty
// write is reported as InvalidArgument after logging a DFATAL.
211 | 1.74M | Result<bool> WriteQuery::PrepareExecute() { |
212 | 1.74M | if (client_request_) { |
213 | 1.60M | auto* request = operation().AllocateRequest(); |
214 | 1.60M | SetupKeyValueBatch(*client_request_, request); |
215 | | |
216 | 1.60M | if (!client_request_->redis_write_batch().empty()) { |
217 | 61.5k | return RedisPrepareExecute(); |
218 | 61.5k | } |
219 | | |
220 | 1.54M | if (!client_request_->ql_write_batch().empty()) { |
221 | 1.27M | return CqlPrepareExecute(); |
222 | 1.27M | } |
223 | | |
224 | 261k | if (!client_request_->pgsql_write_batch().empty()) { |
225 | 260k | return PgsqlPrepareExecute(); |
226 | 260k | } |
227 | | |
228 | 1.28k | if (client_request_->has_write_batch() && client_request_->has_external_hybrid_time()) { |
229 | 0 | return false; |
230 | 0 | } |
231 | 140k | } else { |
232 | 140k | const auto* request = operation().request(); |
233 | 140k | if (request && request->has_write_batch() && !request->write_batch().read_pairs().empty()) { |
234 | 140k | return SimplePrepareExecute(); |
235 | 140k | } |
236 | 1.35k | } |
237 | | |
238 | | // Empty write should not happen, but we could handle it. |
239 | | // Just report it as error in release mode. |
240 | 1.35k | LOG(DFATAL) << "Empty write: " << AsString(client_request_) << ", " << AsString(request()); |
241 | | |
242 | 1.35k | return STATUS(InvalidArgument, "Empty write"); |
243 | 1.35k | } |
244 | | |
// Acquires a non-abortable scoped RW operation on the tablet and, on success, records the execute
// mode that ExecuteDone() later dispatches on. Returns the failure status of the scoped operation
// if it could not be acquired; execute_mode_ is left untouched in that case.
245 | 1.74M | CHECKED_STATUS WriteQuery::InitExecute(ExecuteMode mode) { |
246 | 1.74M | scoped_read_operation_ = tablet().CreateNonAbortableScopedRWOperation(); |
247 | 1.74M | if (!scoped_read_operation_.ok()) { |
248 | 7 | return MoveStatus(scoped_read_operation_); |
249 | 7 | } |
250 | 1.74M | execute_mode_ = mode; |
251 | 1.74M | return Status::OK(); |
252 | 1.74M | } |
253 | | |
// Prepares a Redis write: switches to Redis execute mode and appends one RedisWriteOperation to
// doc_ops_ for every request in the Redis write batch. Returns true on success, or the error from
// InitExecute().
254 | 61.5k | Result<bool> WriteQuery::RedisPrepareExecute() { |
255 | 61.5k | RETURN_NOT_OK(InitExecute(ExecuteMode::kRedis)); |
256 | | |
257 | | // Since we take exclusive locks, it's okay to use Now as the read TS for writes. |
258 | 61.5k | const auto& redis_write_batch = client_request_->redis_write_batch(); |
259 | | |
260 | 61.5k | doc_ops_.reserve(redis_write_batch.size()); |
261 | 61.5k | for (const auto& redis_request : redis_write_batch) { |
262 | 61.5k | doc_ops_.emplace_back(new docdb::RedisWriteOperation(redis_request)); |
263 | 61.5k | } |
264 | | |
265 | 61.5k | return true; |
266 | 61.5k | } |
267 | | |
// Prepares a simple write: only switches to simple execute mode, no doc operations are created.
// Returns true on success, or the error from InitExecute().
268 | 140k | Result<bool> WriteQuery::SimplePrepareExecute() { |
269 | 140k | RETURN_NOT_OK(InitExecute(ExecuteMode::kSimple)); |
270 | 140k | return true; |
271 | 140k | } |
272 | | |
// Prepares a QL write. A response entry is added for every request in the QL write batch. A
// request whose schema version is not compatible with the primary table only gets its response
// marked with YQL_STATUS_SCHEMA_VERSION_MISMATCH; every other request gets a QLWriteOperation
// appended to doc_ops_.
//
// Returns true when at least one operation was created, false when none was, or an error from
// InitExecute(), CreateTransactionOperationContext() or QLWriteOperation::Init().
273 | 1.27M | Result<bool> WriteQuery::CqlPrepareExecute() { |
274 | 1.27M | RETURN_NOT_OK(InitExecute(ExecuteMode::kCql)); |
275 | | |
276 | 1.27M | auto& metadata = *tablet().metadata(); |
277 | 18.4E | DVLOG(2) << "Schema version for " << metadata.table_name() << ": " << metadata.schema_version(); |
278 | | |
279 | 1.27M | const auto& ql_write_batch = client_request_->ql_write_batch(); |
280 | | |
281 | 1.27M | doc_ops_.reserve(ql_write_batch.size()); |
282 | | |
// One transaction operation context is shared by all operations of the batch.
283 | 1.27M | auto txn_op_ctx = VERIFY_RESULT(tablet().CreateTransactionOperationContext( |
284 | 1.27M | request().write_batch().transaction(), |
285 | 1.27M | /* is_ysql_catalog_table */ false, |
286 | 1.27M | &request().write_batch().subtransaction())); |
287 | 1.27M | auto table_info = metadata.primary_table_info(); |
288 | 3.38M | for (const auto& req : ql_write_batch) { |
289 | 3.38M | QLResponsePB* resp = response_->add_ql_response_batch(); |
290 | 3.38M | if (!IsSchemaVersionCompatible( |
291 | 3.38M | table_info->schema_version, req.schema_version(), |
292 | 1.54k | req.is_compatible_with_previous_version())) { |
293 | 18.4E | DVLOG(1) << " On " << table_info->table_name |
294 | 18.4E | << " Setting status for write as YQL_STATUS_SCHEMA_VERSION_MISMATCH tserver's: " |
295 | 18.4E | << table_info->schema_version << " vs req's : " << req.schema_version() |
296 | 18.4E | << " is req compatible with prev version: " |
297 | 18.4E | << req.is_compatible_with_previous_version() << " for " << AsString(req); |
298 | 1.54k | resp->set_status(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH); |
299 | 1.54k | resp->set_error_message(Format( |
300 | 1.54k | "schema version mismatch for table $0: expected $1, got $2 (compt with prev: $3)", |
301 | 1.54k | table_info->table_id, |
302 | 1.54k | table_info->schema_version, req.schema_version(), |
303 | 1.54k | req.is_compatible_with_previous_version())); |
304 | 3.38M | } else { |
305 | 6.20k | DVLOG(3) << "Version matches : " << table_info->schema_version << " for " |
306 | 6.20k | << AsString(req); |
// NOTE(review): the two-argument shared_ptr construction below looks like the aliasing
// constructor (a Schema pointer kept alive through table_info) - confirm the type of table_info.
307 | 3.38M | auto write_op = std::make_unique<docdb::QLWriteOperation>( |
308 | 3.38M | req, std::shared_ptr<Schema>(table_info, table_info->schema.get()), |
309 | 3.38M | *table_info->index_map, tablet().unique_index_key_schema(), |
310 | 3.38M | txn_op_ctx); |
311 | 3.38M | RETURN_NOT_OK(write_op->Init(resp)); |
312 | 3.38M | doc_ops_.emplace_back(std::move(write_op)); |
313 | 3.38M | } |
314 | 3.38M | } |
315 | | |
316 | | // All operations have the wrong schema version. |
317 | 1.27M | if (doc_ops_.empty()) { |
318 | 1.28k | return false; |
319 | 1.28k | } |
320 | | |
321 | 1.27M | return true; |
322 | 1.27M | } |
323 | | |
// Prepares a PGSQL write. A response entry is added for every request in the PGSQL write batch.
// Per request:
//  - PGSQL_TRUNCATE_COLOCATED on a non-colocated tablet is skipped with a warning;
//  - a schema version different from the one of the target table only marks the response with
//    PGSQL_STATUS_SCHEMA_VERSION_MISMATCH;
//  - otherwise a PgsqlWriteOperation is appended to doc_ops_.
//
// Returns true when at least one operation was created, false when none was, or an error from
// InitExecute(), GetTableInfo(), CreateTransactionOperationContext() or
// PgsqlWriteOperation::Init().
324 | 260k | Result<bool> WriteQuery::PgsqlPrepareExecute() { |
325 | 260k | RETURN_NOT_OK(InitExecute(ExecuteMode::kPgsql)); |
326 | | |
327 | 260k | const auto& pgsql_write_batch = client_request_->pgsql_write_batch(); |
328 | | |
329 | 260k | doc_ops_.reserve(pgsql_write_batch.size()); |
330 | | |
// Filled in lazily, when the first operation is accepted (see the doc_ops_.empty() check below).
331 | 260k | TransactionOperationContext txn_op_ctx; |
332 | | |
333 | 260k | auto& metadata = *tablet().metadata(); |
334 | 260k | bool colocated = metadata.colocated(); |
335 | | |
336 | 3.11M | for (const auto& req : pgsql_write_batch) { |
337 | 3.11M | PgsqlResponsePB* resp = response_->add_pgsql_response_batch(); |
338 | | // Table-level tombstones should not be requested for non-colocated tables. |
339 | 3.11M | if ((req.stmt_type() == PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED) && !colocated) { |
340 | 0 | LOG(WARNING) << "cannot create table-level tombstone for a non-colocated table"; |
341 | 0 | resp->set_skipped(true); |
342 | 0 | continue; |
343 | 0 | } |
344 | 3.11M | const std::shared_ptr<tablet::TableInfo> table_info = |
345 | 3.11M | VERIFY_RESULT(metadata.GetTableInfo(req.table_id())); |
346 | 3.11M | if (table_info->schema_version != req.schema_version()) { |
347 | 5 | resp->set_status(PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH); |
348 | 5 | resp->set_error_message( |
349 | 5 | Format("schema version mismatch for table $0: expected $1, got $2", |
350 | 5 | table_info->table_id, |
351 | 5 | table_info->schema_version, |
352 | 5 | req.schema_version())); |
353 | 3.11M | } else { |
354 | 3.11M | if (doc_ops_.empty()) { |
355 | | // Use the value of is_ysql_catalog_table from the first operation in the batch. |
356 | 258k | txn_op_ctx = VERIFY_RESULT(tablet().CreateTransactionOperationContext( |
357 | 258k | request().write_batch().transaction(), |
358 | 258k | table_info->schema->table_properties().is_ysql_catalog_table(), |
359 | 258k | &request().write_batch().subtransaction())); |
360 | 258k | } |
361 | 3.11M | auto write_op = std::make_unique<docdb::PgsqlWriteOperation>( |
362 | 3.11M | req, *table_info->schema, txn_op_ctx); |
363 | 3.11M | RETURN_NOT_OK(write_op->Init(resp)); |
364 | 3.11M | doc_ops_.emplace_back(std::move(write_op)); |
365 | 3.11M | } |
366 | 3.11M | } |
367 | | |
368 | | // All operations have wrong schema version. |
369 | 260k | if (doc_ops_.empty()) { |
370 | 5 | return false; |
371 | 5 | } |
372 | | |
373 | 260k | return true; |
374 | 260k | } |
375 | | |
// Entry point for running a query; takes ownership of it.
//  - If PrepareExecute() fails, the query goes to StartSynchronization with the error.
//  - If PrepareExecute() reports that nothing has to be executed, the query goes to
//    StartSynchronization with OK.
//  - Otherwise ownership is parked in query->self_ and DoExecute() is started. A failure returned
//    by DoExecute() is reported through ExecuteDone(); on success the completion is expected to
//    be reported by the code DoExecute() started.
376 | 1.74M | void WriteQuery::Execute(std::unique_ptr<WriteQuery> query) { |
377 | 1.74M | auto prepare_result = query->PrepareExecute(); |
378 | 1.74M | if (!prepare_result.ok()) { |
379 | 7 | StartSynchronization(std::move(query), prepare_result.status()); |
380 | 7 | return; |
381 | 7 | } |
382 | | |
383 | 1.74M | if (!prepare_result.get()) { |
384 | 1.28k | StartSynchronization(std::move(query), Status::OK()); |
385 | 1.28k | return; |
386 | 1.28k | } |
387 | | |
// Grab the raw pointer first: after the move below, query no longer points at the object.
388 | 1.74M | auto* query_ptr = query.get(); |
389 | 1.74M | query_ptr->self_ = std::move(query); |
390 | 1.74M | auto status = query_ptr->DoExecute(); |
391 | 1.74M | if (!status.ok()) { |
392 | 46 | query_ptr->ExecuteDone(status); |
393 | 46 | } |
394 | 1.74M | } |
395 | | |
// Runs the execution phase of a prepared query:
//  1. Reads the isolation level and row mark type from the write batch and prepares the doc
//     write operations (the result, stored in prepare_result_, carries the lock batch and the
//     need_read_snapshot flag used later).
//  2. If transactions are not enabled on the tablet or the table is not treated as
//     transactional, completes execution right away.
//  3. For NON_TRANSACTIONAL isolation, resolves operation conflicts and continues in
//     NonTransactionalConflictsResolved().
//  4. Otherwise resolves transaction conflicts and continues in TransactionalConflictsResolved().
//
// Returns a non-OK status only for failures detected synchronously in this function; the caller
// (Execute()) reports those through ExecuteDone(). Failures delivered to the conflict resolution
// callbacks are passed to ExecuteDone() by the callbacks themselves.
// NOTE(review): the callbacks capture this as a raw pointer; the query is presumably kept alive
// by self_, which Execute() sets before calling this function - confirm.
396 | 1.73M | CHECKED_STATUS WriteQuery::DoExecute() { |
397 | 1.73M | auto& write_batch = *request().mutable_write_batch(); |
398 | 1.73M | isolation_level_ = VERIFY_RESULT(tablet().GetIsolationLevelFromPB(write_batch)); |
399 | 1.73M | const RowMarkType row_mark_type = GetRowMarkTypeFromPB(write_batch); |
400 | 1.73M | const auto& metadata = *tablet().metadata(); |
401 | | |
// force_txn_path_ is set by AdjustYsqlQueryTransactionality().
402 | 1.73M | const bool transactional_table = metadata.schema()->table_properties().is_transactional() || |
403 | 1.18M | force_txn_path_; |
404 | | |
// Only logged; execution continues even when the check fails in a non-fatal build.
405 | 1.73M | if (!transactional_table && isolation_level_ != IsolationLevel::NON_TRANSACTIONAL) { |
406 | 0 | YB_LOG_EVERY_N_SECS(DFATAL, 30) |
407 | 0 | << "An attempt to perform a transactional operation on a non-transactional table: " |
408 | 0 | << operation_->ToString(); |
409 | 0 | } |
410 | | |
411 | 1.73M | docdb::PartialRangeKeyIntents partial_range_key_intents(metadata.UsePartialRangeKeyIntents()); |
412 | 1.73M | prepare_result_ = VERIFY_RESULT(docdb::PrepareDocWriteOperation( |
413 | 1.73M | doc_ops_, write_batch.read_pairs(), tablet().metrics()->write_lock_latency, |
414 | 1.73M | isolation_level_, kind(), row_mark_type, transactional_table, |
415 | 1.73M | deadline(), partial_range_key_intents, tablet().shared_lock_manager())); |
416 | | |
417 | 1.73M | auto* transaction_participant = tablet().transaction_participant(); |
418 | 1.73M | if (transaction_participant) { |
419 | 768k | request_scope_ = RequestScope(transaction_participant); |
420 | 768k | } |
421 | | |
422 | 1.73M | if (!tablet().txns_enabled() || !transactional_table) { |
423 | 1.13M | CompleteExecute(); |
424 | 1.13M | return Status::OK(); |
425 | 1.13M | } |
426 | | |
427 | 602k | if (isolation_level_ == IsolationLevel::NON_TRANSACTIONAL) { |
// The same clock reading is passed to conflict resolution and captured by the callback, so the
// callback can tell whether the resolved time differs from it.
428 | 58.2k | auto now = tablet().clock()->Now(); |
429 | 58.2k | docdb::ResolveOperationConflicts( |
430 | 58.2k | doc_ops_, now, tablet().doc_db(), partial_range_key_intents, |
431 | 58.2k | transaction_participant, tablet().metrics()->transaction_conflicts.get(), |
432 | 58.3k | [this, now](const Result<HybridTime>& result) { |
433 | 58.3k | if (!result.ok()) { |
434 | 3 | ExecuteDone(result.status()); |
435 | 3 | TRACE("InvokeCallback"); |
436 | 3 | return; |
437 | 3 | } |
438 | 58.3k | NonTransactionalConflictsResolved(now, *result); |
439 | 58.3k | TRACE("NonTransactionalConflictsResolved"); |
440 | 58.3k | }); |
441 | 58.2k | return Status::OK(); |
442 | 58.2k | } |
443 | | |
// For serializable isolation with a read snapshot, every doc path the operations lock is added
// to the write batch as a read pair, and the wait policy is set to WAIT_ERROR.
444 | 544k | if (isolation_level_ == IsolationLevel::SERIALIZABLE_ISOLATION && |
445 | 192k | prepare_result_.need_read_snapshot) { |
446 | 30.0k | boost::container::small_vector<RefCntPrefix, 16> paths; |
447 | 30.1k | for (const auto& doc_op : doc_ops_) { |
448 | 30.1k | paths.clear(); |
449 | 30.1k | IsolationLevel ignored_isolation_level; |
450 | 30.1k | RETURN_NOT_OK(doc_op->GetDocPaths( |
451 | 30.1k | docdb::GetDocPathsMode::kLock, &paths, &ignored_isolation_level)); |
452 | 30.1k | for (const auto& path : paths) { |
453 | 30.1k | auto key = path.as_slice(); |
454 | 30.1k | auto* pair = write_batch.mutable_read_pairs()->Add(); |
455 | 30.1k | pair->set_key(key.data(), key.size()); |
456 | | // Empty values are disallowed by docdb. |
457 | | // https://github.com/YugaByte/yugabyte-db/issues/736 |
458 | 30.1k | pair->set_value(std::string(1, docdb::ValueTypeAsChar::kNullLow)); |
459 | 30.1k | write_batch.set_wait_policy(WAIT_ERROR); |
460 | 30.1k | } |
461 | 30.1k | } |
462 | 30.0k | } |
463 | | |
// HybridTime::kMax is passed when no read time has been set on the query.
464 | 544k | docdb::ResolveTransactionConflicts( |
465 | 544k | doc_ops_, write_batch, tablet().clock()->Now(), |
466 | 299k | read_time_ ? read_time_.read : HybridTime::kMax, |
467 | 544k | tablet().doc_db(), partial_range_key_intents, |
468 | 544k | transaction_participant, tablet().metrics()->transaction_conflicts.get(), |
469 | 547k | [this](const Result<HybridTime>& result) { |
470 | 547k | if (!result.ok()) { |
471 | 100k | ExecuteDone(result.status()); |
472 | 100k | TRACE("ExecuteDone"); |
473 | 100k | return; |
474 | 100k | } |
475 | 446k | TransactionalConflictsResolved(); |
476 | 446k | TRACE("TransactionalConflictsResolved"); |
477 | 446k | }); |
478 | | |
479 | 544k | return Status::OK(); |
480 | 544k | } |
481 | | |
// Continuation of DoExecute() for NON_TRANSACTIONAL isolation once operation conflicts are
// resolved.
//
// now: the clock reading that was passed to conflict resolution.
// result: the time returned by conflict resolution; when it differs from now, the tablet clock
//         is updated with it before execution is completed.
482 | 58.2k | void WriteQuery::NonTransactionalConflictsResolved(HybridTime now, HybridTime result) { |
483 | 58.2k | if (now != result) { |
484 | 3 | tablet().clock()->Update(result); |
485 | 3 | } |
486 | | |
487 | 58.2k | CompleteExecute(); |
488 | 58.2k | } |
489 | | |
// Continuation of DoExecute() once transaction conflicts are resolved. Wraps
// DoTransactionalConflictsResolved(): a failure there is logged as DFATAL and reported through
// ExecuteDone(). On success nothing more is done here, because the wrapped function has already
// called CompleteExecute().
490 | 446k | void WriteQuery::TransactionalConflictsResolved() { |
491 | 446k | auto status = DoTransactionalConflictsResolved(); |
492 | 446k | if (!status.ok()) { |
493 | 0 | LOG(DFATAL) << status; |
494 | 0 | ExecuteDone(status); |
495 | 0 | } |
496 | 446k | } |
497 | | |
// Makes sure a read time is available and then completes execution.
//  - When read_time_ is not set, it is built from the range that starts at the tablet safe time
//    (obtained with RequireLease::kTrue) and ends at the second element of clock NowRange().
//  - When read_time_ is set, the operation needs a read snapshot and the isolation level is
//    serializable, the request is rejected with InvalidArgument.
// Returns an error without calling CompleteExecute() if the safe time cannot be obtained or the
// request is rejected.
498 | 446k | CHECKED_STATUS WriteQuery::DoTransactionalConflictsResolved() { |
499 | 446k | if (!read_time_) { |
500 | 220k | auto safe_time = VERIFY_RESULT(tablet().SafeTime(RequireLease::kTrue)); |
501 | 220k | read_time_ = ReadHybridTime::FromHybridTimeRange( |
502 | 220k | {safe_time, tablet().clock()->NowRange().second}); |
503 | 225k | } else if (prepare_result_.need_read_snapshot && |
504 | 139k | isolation_level_ == IsolationLevel::SERIALIZABLE_ISOLATION) { |
505 | 0 | return STATUS_FORMAT( |
506 | 0 | InvalidArgument, |
507 | 0 | "Read time should NOT be specified for serializable isolation level: $0", |
508 | 0 | read_time_); |
509 | 0 | } |
510 | | |
511 | 446k | CompleteExecute(); |
512 | 446k | return Status::OK(); |
513 | 446k | } |
514 | | |
// Runs DoCompleteExecute() and reports its status, OK or not, through ExecuteDone().
515 | 1.64M | void WriteQuery::CompleteExecute() { |
516 | 1.64M | ExecuteDone(DoCompleteExecute()); |
517 | 1.64M | } |
518 | | |
// Assembles the doc write batch for the prepared operations into the request.
//
// If assembling reports that a read restart is needed (restart_read_ht_ becomes valid) and
// immediate restarts are allowed, the read time is moved to the restart time, the write pairs and
// operation responses produced so far are cleared, and assembling is retried. Otherwise the loop
// ends after the first pass.
//
// On return with OK, either restart_read_ht_ is valid (the restart is left for the caller to
// report; the locks stay in prepare_result_), or the lock batch has been moved into docdb_locks_.
// Returns an error if the read operation cannot be created, assembling fails, or the safe time
// cannot be obtained.
519 | 1.64M | CHECKED_STATUS WriteQuery::DoCompleteExecute() { |
520 | 1.64M | auto read_op = prepare_result_.need_read_snapshot |
521 | 1.64M | ? VERIFY_RESULT(ScopedReadOperation::Create(&tablet(), RequireLease::kTrue, read_time_)) |
522 | 1.41M | : ScopedReadOperation(); |
523 | | // Actual read hybrid time used for read-modify-write operation. |
524 | 1.64M | auto real_read_time = prepare_result_.need_read_snapshot |
525 | 227k | ? read_op.read_time() |
526 | | // When need_read_snapshot is false, this time is used only to write TTL field of record. |
527 | 1.41M | : ReadHybridTime::SingleTime(tablet().clock()->Now()); |
528 | | |
529 | | // We expect all read operations for this transaction to be done in AssembleDocWriteBatch. Once |
530 | | // read_op goes out of scope, the read point is deregistered. |
531 | 1.64M | bool local_limit_updated = false; |
532 | | |
533 | | // The loop below may be executed multiple times only for serializable isolation or |
534 | | // when read_time was not yet picked for snapshot isolation. |
535 | | // In all other cases it is executed only once. |
536 | 1.64M | auto init_marker_behavior = tablet().table_type() == TableType::REDIS_TABLE_TYPE |
537 | 61.5k | ? docdb::InitMarkerBehavior::kRequired |
538 | 1.58M | : docdb::InitMarkerBehavior::kOptional; |
539 | 1.64M | for (;;) { |
540 | 1.64M | RETURN_NOT_OK(docdb::AssembleDocWriteBatch( |
541 | 1.64M | doc_ops_, deadline(), real_read_time, tablet().doc_db(), |
542 | 1.64M | request().mutable_write_batch(), init_marker_behavior, |
543 | 1.64M | tablet().monotonic_counter(), &restart_read_ht_, |
544 | 1.64M | tablet().metadata()->table_name())); |
545 | | |
546 | | // For serializable isolation we don't fix read time, so could do read restart locally, |
547 | | // instead of failing whole transaction. |
548 | 1.64M | if (!restart_read_ht_.is_valid() || !allow_immediate_read_restart_) { |
549 | 1.63M | break; |
550 | 1.63M | } |
551 | | |
// Retry with the restart time as the new read point. The local limit is capped by the safe time
// only on the first restart.
552 | 2.61k | real_read_time.read = restart_read_ht_; |
553 | 2.61k | if (!local_limit_updated) { |
554 | 0 | local_limit_updated = true; |
555 | 0 | real_read_time.local_limit = std::min( |
556 | 0 | real_read_time.local_limit, VERIFY_RESULT(tablet().SafeTime(RequireLease::kTrue))); |
557 | 0 | } |
558 | | |
// Reset everything the failed pass produced before assembling again.
559 | 2.61k | restart_read_ht_ = HybridTime(); |
560 | | |
561 | 2.61k | request().mutable_write_batch()->clear_write_pairs(); |
562 | | |
563 | 0 | for (auto& doc_op : doc_ops_) { |
564 | 0 | doc_op->ClearResponse(); |
565 | 0 | } |
566 | 2.61k | } |
567 | | |
// Report the read time that was actually used, but only for transactional writes whose read
// time could be restarted locally and only when there is a response to fill in.
568 | 1.64M | if (allow_immediate_read_restart_ && |
569 | 1.24M | isolation_level_ != IsolationLevel::NON_TRANSACTIONAL && |
570 | 110k | response_) { |
571 | 110k | real_read_time.ToPB(response_->mutable_used_read_time()); |
572 | 110k | } |
573 | | |
574 | 1.64M | if (restart_read_ht_.is_valid()) { |
575 | 0 | return Status::OK(); |
576 | 0 | } |
577 | | |
578 | 1.64M | docdb_locks_ = std::move(prepare_result_.lock_batch); |
579 | | |
580 | 1.64M | return Status::OK(); |
581 | 1.64M | } |
582 | | |
// Returns the tablet of the owned operation. Dereferences operation_, so it must not be called
// after PrepareSubmit() has moved the operation out of this query.
583 | 30.8M | Tablet& WriteQuery::tablet() const { |
584 | 30.8M | return *operation_->tablet(); |
585 | 30.8M | } |
586 | | |
// Sets force_txn_path_, which makes DoExecute() treat the table as transactional regardless of
// its table properties. The flag is enabled only when ysql_batch_size is greater than zero and
// the tablet is the sys catalog.
587 | 1.48M | void WriteQuery::AdjustYsqlQueryTransactionality(size_t ysql_batch_size) { |
588 | 1.48M | force_txn_path_ = ysql_batch_size > 0 && tablet().is_sys_catalog(); |
589 | 1.48M | } |
590 | | |
// ExecuteDone() handler for Redis mode. On failure, or when a read restart is required, hands the
// query (moved out of self_) to StartSynchronization with the given status. Otherwise moves the
// response of every Redis operation into the response batch and then continues to
// StartSynchronization with OK.
591 | 61.5k | void WriteQuery::RedisExecuteDone(const Status& status) { |
592 | 61.5k | if (!status.ok() || restart_read_ht().is_valid()) { |
593 | 0 | StartSynchronization(std::move(self_), status); |
594 | 0 | return; |
595 | 0 | } |
596 | 61.5k | for (auto& doc_op : doc_ops_) { |
597 | 61.5k | auto* redis_write_operation = down_cast<docdb::RedisWriteOperation*>(doc_op.get()); |
598 | 61.5k | response_->add_redis_response_batch()->Swap(&redis_write_operation->response()); |
599 | 61.5k | } |
600 | | |
601 | 61.5k | StartSynchronization(std::move(self_), Status::OK()); |
602 | 61.5k | } |
603 | | |
// ExecuteDone() handler for QL mode. The read restart check comes first: when a restart is
// required the query goes to StartSynchronization with OK and the given status is not looked at.
// Otherwise a successful execution continues with the index updates, and a failed one completes
// the batch with the error.
604 | 1.27M | void WriteQuery::CqlExecuteDone(const Status& status) { |
605 | 1.27M | if (restart_read_ht().is_valid()) { |
606 | 0 | StartSynchronization(std::move(self_), Status::OK()); |
607 | 0 | return; |
608 | 0 | } |
609 | | |
610 | 1.27M | if (status.ok()) { |
611 | 1.22M | UpdateQLIndexes(); |
612 | 50.8k | } else { |
613 | 50.8k | CompleteQLWriteBatch(status); |
614 | 50.8k | } |
615 | 1.27M | } |
616 | | |
// Finishes a QL write batch and hands the query (moved out of self_) to StartSynchronization.
// A non-OK status is forwarded as is. On success every QL operation is post-processed first:
//  - on a unique index tablet, an INSERT whose response reports applied == false is turned into a
//    YQL_STATUS_USAGE_ERROR with a duplicate value message;
//  - otherwise an operation that produced a rowblock is kept in ql_write_ops_.
// Ownership of every operation is taken out of doc_ops_ (via release()), so operations that are
// not kept are destroyed at the end of their loop iteration and doc_ops_ is left holding null
// entries.
617 | 1.27M | void WriteQuery::CompleteQLWriteBatch(const Status& status) { |
618 | 1.27M | if (!status.ok()) { |
619 | 49.5k | StartSynchronization(std::move(self_), status); |
620 | 49.5k | return; |
621 | 49.5k | } |
622 | | |
623 | 1.22M | bool is_unique_index = tablet().metadata()->is_unique_index(); |
624 | | |
625 | 3.34M | for (auto& doc_op : doc_ops_) { |
626 | 3.34M | std::unique_ptr<docdb::QLWriteOperation> ql_write_op( |
627 | 3.34M | down_cast<docdb::QLWriteOperation*>(doc_op.release())); |
628 | 3.34M | if (is_unique_index && |
629 | 2.95k | ql_write_op->request().type() == QLWriteRequestPB::QL_STMT_INSERT && |
630 | 2.33k | ql_write_op->response()->has_applied() && !ql_write_op->response()->applied()) { |
631 | | // If this is an insert into a unique index and it fails to apply, report duplicate value err. |
632 | 451 | ql_write_op->response()->set_status(QLResponsePB::YQL_STATUS_USAGE_ERROR); |
633 | 451 | ql_write_op->response()->set_error_message( |
634 | 451 | Format("Duplicate value disallowed by unique index $0", |
635 | 451 | tablet().metadata()->table_name())); |
636 | 0 | DVLOG(1) << "Could not apply the given operation " << AsString(ql_write_op->request()) |
637 | 0 | << " due to " << AsString(ql_write_op->response()); |
638 | 3.34M | } else if (ql_write_op->rowblock() != nullptr) { |
639 | | // If the QL write op returns a rowblock, move the op to the transaction state to return the |
640 | | // rows data as a sidecar after the transaction completes. |
641 | 272 | ql_write_ops_.emplace_back(std::move(ql_write_op)); |
642 | 272 | } |
643 | 3.34M | } |
644 | | |
645 | 1.22M | StartSynchronization(std::move(self_), Status::OK()); |
646 | 1.22M | } |
647 | | |
// Applies the index requests produced by the QL write operations through a client session and
// flushes them asynchronously; processing continues in UpdateQLIndexesFlushed(). When no
// operation has index requests, no session is created and the batch is completed right away.
//
// The client, the session and, if needed, the child transaction are created lazily by the first
// operation that has index requests. Later operations are only checked (debug builds) to carry
// the same child transaction data as the first one.
//
// Any failure while setting things up sends the query (moved out of self_) to
// StartSynchronization with the error and returns early.
648 | 1.22M | void WriteQuery::UpdateQLIndexes() { |
649 | 1.22M | client::YBClient* client = nullptr; |
650 | 1.22M | client::YBSessionPtr session; |
651 | 1.22M | client::YBTransactionPtr txn; |
652 | 1.22M | IndexOps index_ops; |
653 | 1.22M | const ChildTransactionDataPB* child_transaction_data = nullptr; |
654 | 3.33M | for (auto& doc_op : doc_ops_) { |
655 | 3.33M | auto* write_op = down_cast<docdb::QLWriteOperation*>(doc_op.get()); |
656 | 3.33M | if (write_op->index_requests()->empty()) { |
657 | 3.32M | continue; |
658 | 3.32M | } |
// First operation with index requests: set up the client, the session and the transaction.
659 | 16.2k | if (!client) { |
660 | 15.8k | client = &tablet().client(); |
661 | 15.8k | session = std::make_shared<client::YBSession>(client); |
662 | 15.8k | session->SetDeadline(deadline()); |
663 | 15.8k | if (write_op->request().has_child_transaction_data()) { |
664 | 14.6k | child_transaction_data = &write_op->request().child_transaction_data(); |
665 | 14.6k | if (!tablet().transaction_manager()) { |
666 | 0 | StartSynchronization( |
667 | 0 | std::move(self_), |
668 | 0 | STATUS(Corruption, "Transaction manager is not present for index update")); |
669 | 0 | return; |
670 | 0 | } |
671 | 14.6k | auto child_data = client::ChildTransactionData::FromPB( |
672 | 14.6k | write_op->request().child_transaction_data()); |
673 | 14.6k | if (!child_data.ok()) { |
674 | 0 | StartSynchronization(std::move(self_), child_data.status()); |
675 | 0 | return; |
676 | 0 | } |
677 | 14.6k | txn = std::make_shared<client::YBTransaction>(tablet().transaction_manager(), *child_data); |
678 | 14.6k | session->SetTransaction(txn); |
679 | 1.23k | } else { |
680 | 1.23k | child_transaction_data = nullptr; |
681 | 1.23k | } |
682 | 436 | } else if (write_op->request().has_child_transaction_data()) { |
683 | 4 | DCHECK_ONLY_NOTNULL(child_transaction_data); |
684 | 4 | DCHECK_EQ(child_transaction_data->ShortDebugString(), |
685 | 4 | write_op->request().child_transaction_data().ShortDebugString()); |
686 | 432 | } else { |
// The streamed message dereferences the pointer, which is expected to be evaluated only when the
// check fails, that is when the pointer is not null.
687 | 18.4E | DCHECK(child_transaction_data == nullptr) << |
688 | 18.4E | "Value: " << child_transaction_data->ShortDebugString(); |
689 | 432 | } |
690 | | |
691 | | // Apply the write ops to update the index |
692 | 38.4k | for (auto& pair : *write_op->index_requests()) { |
693 | 38.4k | client::YBTablePtr index_table; |
694 | 38.4k | bool cache_used_ignored = false; |
695 | 38.4k | auto metadata_cache = tablet().YBMetaDataCache(); |
696 | 38.4k | if (!metadata_cache) { |
697 | 0 | StartSynchronization( |
698 | 0 | std::move(self_), |
699 | 0 | STATUS(Corruption, "Table metadata cache is not present for index update")); |
700 | 0 | return; |
701 | 0 | } |
702 | | // TODO create async version of GetTable. |
703 | | // It is ok to have sync call here, because we use cache and it should not take too long. |
704 | 38.4k | auto status = metadata_cache->GetTable(pair.first->table_id(), &index_table, |
705 | 38.4k | &cache_used_ignored); |
706 | 38.4k | if (!status.ok()) { |
707 | 0 | StartSynchronization(std::move(self_), status); |
708 | 0 | return; |
709 | 0 | } |
710 | 38.4k | std::shared_ptr<client::YBqlWriteOp> index_op(index_table->NewQLWrite()); |
// Swap moves the prepared index request into the new op and leaves the initial request of the
// op in pair.second; MergeFrom then merges those initial fields back in.
// NOTE(review): presumably this preserves fields pre-populated by NewQLWrite - confirm.
711 | 38.4k | index_op->mutable_request()->Swap(&pair.second); |
712 | 38.4k | index_op->mutable_request()->MergeFrom(pair.second); |
713 | 38.4k | session->Apply(index_op); |
714 | 38.4k | index_ops.emplace_back(std::move(index_op), write_op); |
715 | 38.4k | } |
716 | 16.2k | } |
717 | | |
// No session means that no operation had index requests.
718 | 1.22M | if (!session) { |
719 | 1.20M | CompleteQLWriteBatch(Status::OK()); |
720 | 1.20M | return; |
721 | 1.20M | } |
722 | | |
// The bound callback holds this as a raw pointer, plus the session, the transaction and the
// index ops by value.
723 | 18.3k | session->FlushAsync(std::bind( |
724 | 18.3k | &WriteQuery::UpdateQLIndexesFlushed, this, session, txn, std::move(index_ops), _1)); |
725 | 18.3k | } |
726 | | |
// Completion callback for the asynchronous flush of the index write ops (it is the function bound
// into session->FlushAsync() by the index-update code in this file).
//
// Parameters:
//   session      - the session the index ops were applied to. It is not referenced in the body;
//                  NOTE(review): presumably it is bound only to keep the session alive - confirm.
//   txn          - child transaction used for the index writes; may be null, in which case no
//                  child transaction result is produced.
//   index_ops    - pairs of (index write op, originating write operation) whose responses are
//                  inspected below.
//   flush_status - result of the flush: an overall status plus a list of per-operation errors.
//
// Ownership: self_ is moved into the local `query` for the duration of the call. Every early
// return leaves self_ empty and lets `query` go out of scope; only the success path moves it back
// before calling CompleteQLWriteBatch().
// NOTE(review): this presumably destroys the WriteQuery on the failure paths - confirm that self_
// owns `this`.
727 | | void WriteQuery::UpdateQLIndexesFlushed( |
728 | | const client::YBSessionPtr& session, const client::YBTransactionPtr& txn, |
729 | 15.8k | const IndexOps& index_ops, client::FlushStatus* flush_status) { |
730 | 15.8k | std::unique_ptr<WriteQuery> query(std::move(self_)); |
731 | | |
732 | 15.8k | const auto& status = flush_status->status; |
733 | 15.8k | if (PREDICT_FALSE(!status.ok())) { |
734 | | // When any error occurs during the dispatching of YBOperation, YBSession saves the error and |
735 | | // returns IOError. When that happens, retrieve the errors and discard the IOError. |
736 | 112 | if (status.IsIOError()) { |
// The loop body returns on its first iteration, so it acts as "if the error list is non-empty,
// report its first entry". An empty list falls through to Cancel(status) below.
737 | 112 | for (const auto& error : flush_status->errors) { |
738 | | // return just the first error seen. |
739 | 112 | Cancel(error->status()); |
740 | 112 | return; |
741 | 112 | } |
742 | 112 | } |
// Either the failure is not an IOError, or it is an IOError with no per-operation errors.
743 | 0 | Cancel(status); |
744 | 0 | return; |
745 | 15.7k | } |
746 | | |
// Finish the child transaction once, before touching any response; the same result is copied
// into every response in the loop below.
747 | 15.7k | ChildTransactionResultPB child_result; |
748 | 15.7k | if (txn) { |
749 | 14.5k | auto finish_result = txn->FinishChild(); |
750 | 14.5k | if (!finish_result.ok()) { |
// `query` points at this same object (it was moved from self_ above), so this is the same call
// as the unqualified Cancel() used in the flush-failure branch.
751 | 0 | query->Cancel(finish_result.status()); |
752 | 0 | return; |
753 | 0 | } |
754 | 14.5k | child_result = std::move(*finish_result); |
755 | 14.5k | } |
756 | | |
757 | | // Check the responses of the index write ops. |
758 | 38.3k | for (const auto& pair : index_ops) { |
759 | 38.3k | std::shared_ptr<client::YBqlWriteOp> index_op = pair.first; |
760 | 38.3k | auto* response = pair.second->response(); |
761 | 38.3k | DCHECK_ONLY_NOTNULL(response); |
762 | 38.3k | auto* index_response = index_op->mutable_response(); |
763 | | |
// A non-OK index response overwrites the status and error message on the originating write
// operation's response. Several index ops can refer to the same write operation, in which case
// the last failing one in iteration order is the one that remains. The error message is moved
// out of the index response, not copied.
764 | 38.3k | if (index_response->status() != QLResponsePB::YQL_STATUS_OK) { |
765 | 0 | DVLOG(1) << "Got status " << index_response->status() << " for " << AsString(index_op); |
766 | 441 | response->set_status(index_response->status()); |
767 | 441 | response->set_error_message(std::move(*index_response->mutable_error_message())); |
768 | 441 | } |
769 | 38.3k | if (txn) { |
770 | 35.4k | *response->mutable_child_transaction_result() = child_result; |
771 | 35.4k | } |
772 | 38.3k | } |
773 | | |
// Success: hand ownership back to self_ before continuing with the write batch.
774 | 15.7k | self_ = std::move(query); |
775 | 15.7k | CompleteQLWriteBatch(Status::OK()); |
776 | 15.7k | } |
777 | | |
// Continues a PGSQL write once execution has produced `status`.
// NOTE(review): presumably invoked as the completion callback of doc-op execution - the caller is
// not visible in this part of the file.
//
// If `status` is an error, or restart_read_ht_ is valid, the query goes straight to
// StartSynchronization() with `status` and the doc ops are left untouched. Otherwise every entry of
// doc_ops_ is transferred into pgsql_write_ops_ and synchronization starts with an OK status.
// In both cases self_ is moved into StartSynchronization().
778 | 261k | void WriteQuery::PgsqlExecuteDone(const Status& status) { |
779 | 261k | if (!status.ok() || restart_read_ht_.is_valid()) { |
780 | 23.0k | StartSynchronization(std::move(self_), status); |
781 | 23.0k | return; |
782 | 23.0k | } |
783 | | |
// Each doc op gives up its pointer via release(); the pointer is down-cast to
// docdb::PgsqlWriteOperation and re-owned by a new unique_ptr stored in pgsql_write_ops_.
// The entries remain in doc_ops_ but no longer own anything after this loop.
784 | 3.09M | for (auto& doc_op : doc_ops_) { |
785 | | // We'll need to return the number of rows inserted, updated, or deleted by each operation. |
786 | 3.09M | std::unique_ptr<docdb::PgsqlWriteOperation> pgsql_write_op( |
787 | 3.09M | down_cast<docdb::PgsqlWriteOperation*>(doc_op.release())); |
788 | 3.09M | pgsql_write_ops_.emplace_back(std::move(pgsql_write_op)); |
789 | 3.09M | } |
790 | | |
791 | 238k | StartSynchronization(std::move(self_), Status::OK()); |
792 | 238k | } |
793 | | |
// Forwards `status` unchanged to StartSynchronization(), moving self_ into that call. Unlike
// PgsqlExecuteDone() it does no post-processing of the doc ops.
794 | 140k | void WriteQuery::SimpleExecuteDone(const Status& status) { |
795 | 140k | StartSynchronization(std::move(self_), status); |
796 | 140k | } |
797 | | |
798 | | } // namespace tablet |
799 | | } // namespace yb |