/Users/deen/code/yugabyte-db/src/yb/tablet/write_query.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/write_query.h" |
15 | | |
16 | | #include "yb/client/client.h" |
17 | | #include "yb/client/error.h" |
18 | | #include "yb/client/meta_data_cache.h" |
19 | | #include "yb/client/session.h" |
20 | | #include "yb/client/table.h" |
21 | | #include "yb/client/transaction.h" |
22 | | #include "yb/client/yb_op.h" |
23 | | |
24 | | #include "yb/common/index.h" |
25 | | #include "yb/common/row_mark.h" |
26 | | #include "yb/common/schema.h" |
27 | | |
28 | | #include "yb/docdb/conflict_resolution.h" |
29 | | #include "yb/docdb/cql_operation.h" |
30 | | #include "yb/docdb/doc_write_batch.h" |
31 | | #include "yb/docdb/pgsql_operation.h" |
32 | | #include "yb/docdb/redis_operation.h" |
33 | | |
34 | | #include "yb/tablet/tablet_metadata.h" |
35 | | #include "yb/tablet/operations/write_operation.h" |
36 | | #include "yb/tablet/tablet.h" |
37 | | #include "yb/tablet/tablet_metrics.h" |
38 | | #include "yb/tablet/transaction_participant.h" |
39 | | #include "yb/tablet/write_query_context.h" |
40 | | |
41 | | #include "yb/tserver/tserver.pb.h" |
42 | | |
43 | | #include "yb/util/logging.h" |
44 | | #include "yb/util/metrics.h" |
45 | | #include "yb/util/trace.h" |
46 | | |
47 | | using namespace std::placeholders; |
48 | | |
49 | | namespace yb { |
50 | | namespace tablet { |
51 | | |
52 | | namespace { |
53 | | |
54 | | // Separate Redis / QL / row operations write batches from write_request in preparation for the |
55 | | // write transaction. Leave just the tablet id behind. Return Redis / QL / row operations, etc. |
56 | | // in batch_request. |
57 | 2.93M | void SetupKeyValueBatch(const tserver::WriteRequestPB& client_request, WritePB* out_request) { |
58 | 2.93M | out_request->set_unused_tablet_id(""); // Backward compatibility. |
59 | 2.93M | auto& out_write_batch = *out_request->mutable_write_batch(); |
60 | 2.93M | if (client_request.has_write_batch()) { |
61 | 673k | out_write_batch = client_request.write_batch(); |
62 | 673k | } |
63 | 2.93M | out_write_batch.set_deprecated_may_have_metadata(true); |
64 | 2.93M | if (client_request.has_request_id()) { |
65 | 2.52M | out_request->set_client_id1(client_request.client_id1()); |
66 | 2.52M | out_request->set_client_id2(client_request.client_id2()); |
67 | 2.52M | out_request->set_request_id(client_request.request_id()); |
68 | 2.52M | out_request->set_min_running_request_id(client_request.min_running_request_id()); |
69 | 2.52M | } |
70 | 2.93M | out_request->set_batch_idx(client_request.batch_idx()); |
71 | | // Actually, in production code, we could check for external hybrid time only when there are |
72 | | // no ql, pgsql, redis operations. |
73 | | // But in CDCServiceTest we have ql write batch with external time. |
74 | 2.93M | if (client_request.has_external_hybrid_time()) { |
75 | 1.87k | out_request->set_external_hybrid_time(client_request.external_hybrid_time()); |
76 | 1.87k | } |
77 | 2.93M | } |
78 | | |
79 | | } // namespace |
80 | | |
81 | | enum class WriteQuery::ExecuteMode { |
82 | | kSimple, |
83 | | kRedis, |
84 | | kCql, |
85 | | kPgsql, |
86 | | }; |
87 | | |
88 | | WriteQuery::WriteQuery( |
89 | | int64_t term, |
90 | | CoarseTimePoint deadline, |
91 | | WriteQueryContext* context, |
92 | | Tablet* tablet, |
93 | | tserver::WriteResponsePB* response, |
94 | | docdb::OperationKind kind) |
95 | | : operation_(std::make_unique<WriteOperation>(tablet)), |
96 | | term_(term), deadline_(deadline), |
97 | | context_(context), |
98 | | response_(response), |
99 | | kind_(kind), |
100 | 3.26M | start_time_(CoarseMonoClock::Now()) { |
101 | 3.26M | } |
102 | | |
103 | 11.9M | WritePB& WriteQuery::request() { |
104 | 11.9M | return *operation_->mutable_request(); |
105 | 11.9M | } |
106 | | |
107 | 3.11M | std::unique_ptr<WriteOperation> WriteQuery::PrepareSubmit() { |
108 | 3.11M | operation_->set_completion_callback( |
109 | 3.11M | [operation = operation_.get(), query = this](const Status& status) { |
110 | 3.11M | std::unique_ptr<WriteQuery> query_holder(query); |
111 | 3.11M | query->Finished(operation, status); |
112 | 3.11M | }); |
113 | 3.11M | return std::move(operation_); |
114 | 3.11M | } |
115 | | |
116 | 3.26M | void WriteQuery::DoStartSynchronization(const Status& status) { |
117 | 3.26M | std::unique_ptr<WriteQuery> self(this); |
118 | | // Move submit_token_ so it is released after this function. |
119 | 3.26M | ScopedRWOperation submit_token(std::move(submit_token_)); |
120 | | // If a restart read is required, then we return this fact to caller and don't perform the write |
121 | | // operation. |
122 | 3.26M | if (status.ok() && restart_read_ht_.is_valid()3.11M ) { |
123 | 0 | auto restart_time = response()->mutable_restart_read_time(); |
124 | 0 | restart_time->set_read_ht(restart_read_ht_.ToUint64()); |
125 | 0 | auto local_limit = context_->ReportReadRestart(); |
126 | 0 | if (!local_limit.ok()) { |
127 | 0 | Cancel(local_limit.status()); |
128 | 0 | return; |
129 | 0 | } |
130 | 0 | restart_time->set_deprecated_max_of_read_time_and_local_limit_ht(local_limit->ToUint64()); |
131 | 0 | restart_time->set_local_limit_ht(local_limit->ToUint64()); |
132 | | // Global limit is ignored by caller, so we don't set it. |
133 | 0 | Cancel(Status::OK()); |
134 | 0 | return; |
135 | 0 | } |
136 | | |
137 | 3.26M | if (!status.ok()) { |
138 | 146k | Cancel(status); |
139 | 146k | return; |
140 | 146k | } |
141 | | |
142 | 3.11M | context_->Submit(self.release()->PrepareSubmit(), term_); |
143 | 3.11M | } |
144 | | |
145 | 3.26M | void WriteQuery::Release() { |
146 | | // Free DocDB multi-level locks. |
147 | 3.26M | docdb_locks_.Reset(); |
148 | 3.26M | } |
149 | | |
150 | 3.26M | WriteQuery::~WriteQuery() { |
151 | 3.26M | } |
152 | | |
153 | 2.95M | void WriteQuery::set_client_request(std::reference_wrapper<const tserver::WriteRequestPB> req) { |
154 | 2.95M | client_request_ = &req.get(); |
155 | 2.95M | read_time_ = ReadHybridTime::FromReadTimePB(req.get()); |
156 | 2.95M | allow_immediate_read_restart_ = !read_time_; |
157 | 2.95M | } |
158 | | |
159 | 0 | void WriteQuery::set_client_request(std::unique_ptr<tserver::WriteRequestPB> req) { |
160 | 0 | set_client_request(*req); |
161 | 0 | client_request_holder_ = std::move(req); |
162 | 0 | } |
163 | | |
164 | 3.11M | void WriteQuery::Finished(WriteOperation* operation, const Status& status) { |
165 | 3.11M | LOG_IF(DFATAL, operation_) << "Finished not submitted operation: " << status1.31k ; |
166 | | |
167 | 3.11M | if (status.ok()) { |
168 | 3.08M | TabletMetrics* metrics = operation->tablet()->metrics(); |
169 | 3.08M | if (metrics) { |
170 | 3.08M | auto op_duration_usec = MonoDelta(CoarseMonoClock::now() - start_time_).ToMicroseconds(); |
171 | 3.08M | metrics->write_op_duration_client_propagated_consistency->Increment(op_duration_usec); |
172 | 3.08M | } |
173 | 3.08M | } |
174 | | |
175 | 3.11M | Complete(status); |
176 | 3.11M | } |
177 | | |
178 | 146k | void WriteQuery::Cancel(const Status& status) { |
179 | 146k | LOG_IF(DFATAL, !operation_) << "Cancelled submitted operation: " << status75 ; |
180 | | |
181 | 146k | Complete(status); |
182 | 146k | } |
183 | | |
184 | 3.26M | void WriteQuery::Complete(const Status& status) { |
185 | 3.26M | Release(); |
186 | | |
187 | 3.26M | if (callback_) { |
188 | 3.16M | callback_(status); |
189 | 3.16M | } |
190 | 3.26M | } |
191 | | |
192 | 3.24M | void WriteQuery::ExecuteDone(const Status& status) { |
193 | 3.24M | scoped_read_operation_.Reset(); |
194 | 3.24M | switch (execute_mode_) { |
195 | 314k | case ExecuteMode::kSimple: |
196 | 314k | SimpleExecuteDone(status); |
197 | 314k | return; |
198 | 123k | case ExecuteMode::kRedis: |
199 | 123k | RedisExecuteDone(status); |
200 | 123k | return; |
201 | 2.11M | case ExecuteMode::kCql: |
202 | 2.11M | CqlExecuteDone(status); |
203 | 2.11M | return; |
204 | 695k | case ExecuteMode::kPgsql: |
205 | 695k | PgsqlExecuteDone(status); |
206 | 695k | return; |
207 | 3.24M | } |
208 | 0 | FATAL_INVALID_ENUM_VALUE(ExecuteMode, execute_mode_); |
209 | 0 | } |
210 | | |
211 | 3.24M | Result<bool> WriteQuery::PrepareExecute() { |
212 | 3.24M | if (client_request_) { |
213 | 2.93M | auto* request = operation().AllocateRequest(); |
214 | 2.93M | SetupKeyValueBatch(*client_request_, request); |
215 | | |
216 | 2.93M | if (!client_request_->redis_write_batch().empty()) { |
217 | 123k | return RedisPrepareExecute(); |
218 | 123k | } |
219 | | |
220 | 2.81M | if (!client_request_->ql_write_batch().empty()) { |
221 | 2.11M | return CqlPrepareExecute(); |
222 | 2.11M | } |
223 | | |
224 | 695k | if (!client_request_->pgsql_write_batch().empty()) { |
225 | 692k | return PgsqlPrepareExecute(); |
226 | 692k | } |
227 | | |
228 | 2.94k | if (client_request_->has_write_batch() && client_request_->has_external_hybrid_time()0 ) { |
229 | 0 | return false; |
230 | 0 | } |
231 | 314k | } else { |
232 | 314k | const auto* request = operation().request(); |
233 | 314k | if (request && request->has_write_batch()314k && !request->write_batch().read_pairs().empty()314k ) { |
234 | 314k | return SimplePrepareExecute(); |
235 | 314k | } |
236 | 314k | } |
237 | | |
238 | | // Empty write should not happen, but we could handle it. |
239 | | // Just report it as error in release mode. |
240 | 3.30k | LOG(DFATAL) << "Empty write: " << AsString(client_request_) << ", " << AsString(request()); |
241 | | |
242 | 3.30k | return STATUS(InvalidArgument, "Empty write"); |
243 | 3.24M | } |
244 | | |
245 | 3.24M | CHECKED_STATUS WriteQuery::InitExecute(ExecuteMode mode) { |
246 | 3.24M | scoped_read_operation_ = tablet().CreateNonAbortableScopedRWOperation(); |
247 | 3.24M | if (!scoped_read_operation_.ok()) { |
248 | 99 | return MoveStatus(scoped_read_operation_); |
249 | 99 | } |
250 | 3.24M | execute_mode_ = mode; |
251 | 3.24M | return Status::OK(); |
252 | 3.24M | } |
253 | | |
254 | 123k | Result<bool> WriteQuery::RedisPrepareExecute() { |
255 | 123k | RETURN_NOT_OK(InitExecute(ExecuteMode::kRedis)); |
256 | | |
257 | | // Since we take exclusive locks, it's okay to use Now as the read TS for writes. |
258 | 123k | const auto& redis_write_batch = client_request_->redis_write_batch(); |
259 | | |
260 | 123k | doc_ops_.reserve(redis_write_batch.size()); |
261 | 123k | for (const auto& redis_request : redis_write_batch) { |
262 | 123k | doc_ops_.emplace_back(new docdb::RedisWriteOperation(redis_request)); |
263 | 123k | } |
264 | | |
265 | 123k | return true; |
266 | 123k | } |
267 | | |
268 | 314k | Result<bool> WriteQuery::SimplePrepareExecute() { |
269 | 314k | RETURN_NOT_OK(InitExecute(ExecuteMode::kSimple)); |
270 | 314k | return true; |
271 | 314k | } |
272 | | |
273 | 2.11M | Result<bool> WriteQuery::CqlPrepareExecute() { |
274 | 2.11M | RETURN_NOT_OK(InitExecute(ExecuteMode::kCql)); |
275 | | |
276 | 2.11M | auto& metadata = *tablet().metadata(); |
277 | 18.4E | DVLOG(2) << "Schema version for " << metadata.table_name() << ": " << metadata.schema_version(); |
278 | | |
279 | 2.11M | const auto& ql_write_batch = client_request_->ql_write_batch(); |
280 | | |
281 | 2.11M | doc_ops_.reserve(ql_write_batch.size()); |
282 | | |
283 | 2.11M | auto txn_op_ctx = VERIFY_RESULT(tablet().CreateTransactionOperationContext( |
284 | 2.11M | request().write_batch().transaction(), |
285 | 2.11M | /* is_ysql_catalog_table */ false, |
286 | 2.11M | &request().write_batch().subtransaction())); |
287 | 0 | auto table_info = metadata.primary_table_info(); |
288 | 4.51M | for (const auto& req : ql_write_batch) { |
289 | 4.51M | QLResponsePB* resp = response_->add_ql_response_batch(); |
290 | 4.51M | if (!IsSchemaVersionCompatible( |
291 | 4.51M | table_info->schema_version, req.schema_version(), |
292 | 4.51M | req.is_compatible_with_previous_version())) { |
293 | 18.4E | DVLOG(1) << " On " << table_info->table_name |
294 | 18.4E | << " Setting status for write as YQL_STATUS_SCHEMA_VERSION_MISMATCH tserver's: " |
295 | 18.4E | << table_info->schema_version << " vs req's : " << req.schema_version() |
296 | 18.4E | << " is req compatible with prev version: " |
297 | 18.4E | << req.is_compatible_with_previous_version() << " for " << AsString(req); |
298 | 1.55k | resp->set_status(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH); |
299 | 1.55k | resp->set_error_message(Format( |
300 | 1.55k | "schema version mismatch for table $0: expected $1, got $2 (compt with prev: $3)", |
301 | 1.55k | table_info->table_id, |
302 | 1.55k | table_info->schema_version, req.schema_version(), |
303 | 1.55k | req.is_compatible_with_previous_version())); |
304 | 4.51M | } else { |
305 | 4.51M | DVLOG(3) << "Version matches : " << table_info->schema_version << " for " |
306 | 3.27k | << AsString(req); |
307 | 4.51M | auto write_op = std::make_unique<docdb::QLWriteOperation>( |
308 | 4.51M | req, std::shared_ptr<Schema>(table_info, table_info->schema.get()), |
309 | 4.51M | *table_info->index_map, tablet().unique_index_key_schema(), |
310 | 4.51M | txn_op_ctx); |
311 | 4.51M | RETURN_NOT_OK(write_op->Init(resp)); |
312 | 4.51M | doc_ops_.emplace_back(std::move(write_op)); |
313 | 4.51M | } |
314 | 4.51M | } |
315 | | |
316 | | // All operations has wrong schema version |
317 | 2.11M | if (doc_ops_.empty()) { |
318 | 1.27k | return false; |
319 | 1.27k | } |
320 | | |
321 | 2.11M | return true; |
322 | 2.11M | } |
323 | | |
324 | 694k | Result<bool> WriteQuery::PgsqlPrepareExecute() { |
325 | 694k | RETURN_NOT_OK(InitExecute(ExecuteMode::kPgsql)); |
326 | | |
327 | 694k | const auto& pgsql_write_batch = client_request_->pgsql_write_batch(); |
328 | | |
329 | 694k | doc_ops_.reserve(pgsql_write_batch.size()); |
330 | | |
331 | 694k | TransactionOperationContext txn_op_ctx; |
332 | | |
333 | 694k | auto& metadata = *tablet().metadata(); |
334 | 694k | bool colocated = metadata.colocated(); |
335 | | |
336 | 12.8M | for (const auto& req : pgsql_write_batch) { |
337 | 12.8M | PgsqlResponsePB* resp = response_->add_pgsql_response_batch(); |
338 | | // Table-level tombstones should not be requested for non-colocated tables. |
339 | 12.8M | if ((req.stmt_type() == PgsqlWriteRequestPB::PGSQL_TRUNCATE_COLOCATED) && !colocated90 ) { |
340 | 0 | LOG(WARNING) << "cannot create table-level tombstone for a non-colocated table"; |
341 | 0 | resp->set_skipped(true); |
342 | 0 | continue; |
343 | 0 | } |
344 | 12.8M | const std::shared_ptr<tablet::TableInfo> table_info = |
345 | 12.8M | VERIFY_RESULT(metadata.GetTableInfo(req.table_id())); |
346 | 12.8M | if (table_info->schema_version != req.schema_version()) { |
347 | 22 | resp->set_status(PgsqlResponsePB::PGSQL_STATUS_SCHEMA_VERSION_MISMATCH); |
348 | 22 | resp->set_error_message( |
349 | 22 | Format("schema version mismatch for table $0: expected $1, got $2", |
350 | 22 | table_info->table_id, |
351 | 22 | table_info->schema_version, |
352 | 22 | req.schema_version())); |
353 | 12.8M | } else { |
354 | 12.8M | if (doc_ops_.empty()) { |
355 | | // Use the value of is_ysql_catalog_table from the first operation in the batch. |
356 | 693k | txn_op_ctx = VERIFY_RESULT(tablet().CreateTransactionOperationContext( |
357 | 693k | request().write_batch().transaction(), |
358 | 693k | table_info->schema->table_properties().is_ysql_catalog_table(), |
359 | 693k | &request().write_batch().subtransaction())); |
360 | 693k | } |
361 | 12.8M | auto write_op = std::make_unique<docdb::PgsqlWriteOperation>( |
362 | 12.8M | req, *table_info->schema, txn_op_ctx); |
363 | 12.8M | RETURN_NOT_OK(write_op->Init(resp)); |
364 | 12.8M | doc_ops_.emplace_back(std::move(write_op)); |
365 | 12.8M | } |
366 | 12.8M | } |
367 | | |
368 | | // All operations have wrong schema version. |
369 | 694k | if (doc_ops_.empty()) { |
370 | 22 | return false; |
371 | 22 | } |
372 | | |
373 | 694k | return true; |
374 | 694k | } |
375 | | |
376 | 3.24M | void WriteQuery::Execute(std::unique_ptr<WriteQuery> query) { |
377 | 3.24M | auto prepare_result = query->PrepareExecute(); |
378 | 3.24M | if (!prepare_result.ok()) { |
379 | 99 | StartSynchronization(std::move(query), prepare_result.status()); |
380 | 99 | return; |
381 | 99 | } |
382 | | |
383 | 3.24M | if (!prepare_result.get()) { |
384 | 1.29k | StartSynchronization(std::move(query), Status::OK()); |
385 | 1.29k | return; |
386 | 1.29k | } |
387 | | |
388 | 3.24M | auto* query_ptr = query.get(); |
389 | 3.24M | query_ptr->self_ = std::move(query); |
390 | 3.24M | auto status = query_ptr->DoExecute(); |
391 | 3.24M | if (!status.ok()) { |
392 | 711 | query_ptr->ExecuteDone(status); |
393 | 711 | } |
394 | 3.24M | } |
395 | | |
396 | 3.24M | CHECKED_STATUS WriteQuery::DoExecute() { |
397 | 3.24M | auto& write_batch = *request().mutable_write_batch(); |
398 | 3.24M | isolation_level_ = VERIFY_RESULT3.24M (3.24M tablet().GetIsolationLevelFromPB(write_batch)); |
399 | 0 | const RowMarkType row_mark_type = GetRowMarkTypeFromPB(write_batch); |
400 | 3.24M | const auto& metadata = *tablet().metadata(); |
401 | | |
402 | 3.24M | const bool transactional_table = metadata.schema()->table_properties().is_transactional() || |
403 | 3.24M | force_txn_path_2.27M ; |
404 | | |
405 | 3.24M | if (!transactional_table && isolation_level_ != IsolationLevel::NON_TRANSACTIONAL2.09M ) { |
406 | 0 | YB_LOG_EVERY_N_SECS(DFATAL, 30) |
407 | 0 | << "An attempt to perform a transactional operation on a non-transactional table: " |
408 | 0 | << operation_->ToString(); |
409 | 0 | } |
410 | | |
411 | 3.24M | docdb::PartialRangeKeyIntents partial_range_key_intents(metadata.UsePartialRangeKeyIntents()); |
412 | 3.24M | prepare_result_ = VERIFY_RESULT3.24M (3.24M docdb::PrepareDocWriteOperation( |
413 | 0 | doc_ops_, write_batch.read_pairs(), tablet().metrics()->write_lock_latency, |
414 | 0 | isolation_level_, kind(), row_mark_type, transactional_table, |
415 | 0 | deadline(), partial_range_key_intents, tablet().shared_lock_manager())); |
416 | | |
417 | 0 | auto* transaction_participant = tablet().transaction_participant(); |
418 | 3.24M | if (transaction_participant) { |
419 | 1.39M | request_scope_ = RequestScope(transaction_participant); |
420 | 1.39M | } |
421 | | |
422 | 3.24M | if (!tablet().txns_enabled() || !transactional_table3.09M ) { |
423 | 2.13M | CompleteExecute(); |
424 | 2.13M | return Status::OK(); |
425 | 2.13M | } |
426 | | |
427 | 1.10M | if (isolation_level_ == IsolationLevel::NON_TRANSACTIONAL) { |
428 | 122k | auto now = tablet().clock()->Now(); |
429 | 122k | docdb::ResolveOperationConflicts( |
430 | 122k | doc_ops_, now, tablet().doc_db(), partial_range_key_intents, |
431 | 122k | transaction_participant, tablet().metrics()->transaction_conflicts.get(), |
432 | 122k | [this, now](const Result<HybridTime>& result) { |
433 | 122k | if (!result.ok()) { |
434 | 3 | ExecuteDone(result.status()); |
435 | 3 | TRACE("InvokeCallback"); |
436 | 3 | return; |
437 | 3 | } |
438 | 122k | NonTransactionalConflictsResolved(now, *result); |
439 | 122k | TRACE("NonTransactionalConflictsResolved"); |
440 | 122k | }); |
441 | 122k | return Status::OK(); |
442 | 122k | } |
443 | | |
444 | 983k | if (isolation_level_ == IsolationLevel::SERIALIZABLE_ISOLATION && |
445 | 983k | prepare_result_.need_read_snapshot437k ) { |
446 | 72.8k | boost::container::small_vector<RefCntPrefix, 16> paths; |
447 | 73.0k | for (const auto& doc_op : doc_ops_) { |
448 | 73.0k | paths.clear(); |
449 | 73.0k | IsolationLevel ignored_isolation_level; |
450 | 73.0k | RETURN_NOT_OK(doc_op->GetDocPaths( |
451 | 73.0k | docdb::GetDocPathsMode::kLock, &paths, &ignored_isolation_level)); |
452 | 73.0k | for (const auto& path : paths)73.0k { |
453 | 73.0k | auto key = path.as_slice(); |
454 | 73.0k | auto* pair = write_batch.mutable_read_pairs()->Add(); |
455 | 73.0k | pair->set_key(key.data(), key.size()); |
456 | | // Empty values are disallowed by docdb. |
457 | | // https://github.com/YugaByte/yugabyte-db/issues/736 |
458 | 73.0k | pair->set_value(std::string(1, docdb::ValueTypeAsChar::kNullLow)); |
459 | 73.0k | write_batch.set_wait_policy(WAIT_ERROR); |
460 | 73.0k | } |
461 | 73.0k | } |
462 | 72.8k | } |
463 | | |
464 | 983k | docdb::ResolveTransactionConflicts( |
465 | 983k | doc_ops_, write_batch, tablet().clock()->Now(), |
466 | 983k | read_time_ ? read_time_.read495k : HybridTime::kMax488k , |
467 | 983k | tablet().doc_db(), partial_range_key_intents, |
468 | 983k | transaction_participant, tablet().metrics()->transaction_conflicts.get(), |
469 | 987k | [this](const Result<HybridTime>& result) { |
470 | 987k | if (!result.ok()) { |
471 | 129k | ExecuteDone(result.status()); |
472 | 129k | TRACE("ExecuteDone"); |
473 | 129k | return; |
474 | 129k | } |
475 | 858k | TransactionalConflictsResolved(); |
476 | 858k | TRACE("TransactionalConflictsResolved"); |
477 | 858k | }); |
478 | | |
479 | 983k | return Status::OK(); |
480 | 983k | } |
481 | | |
482 | 122k | void WriteQuery::NonTransactionalConflictsResolved(HybridTime now, HybridTime result) { |
483 | 122k | if (now != result) { |
484 | 10 | tablet().clock()->Update(result); |
485 | 10 | } |
486 | | |
487 | 122k | CompleteExecute(); |
488 | 122k | } |
489 | | |
490 | 858k | void WriteQuery::TransactionalConflictsResolved() { |
491 | 858k | auto status = DoTransactionalConflictsResolved(); |
492 | 858k | if (!status.ok()) { |
493 | 0 | LOG(DFATAL) << status; |
494 | 0 | ExecuteDone(status); |
495 | 0 | } |
496 | 858k | } |
497 | | |
498 | 858k | CHECKED_STATUS WriteQuery::DoTransactionalConflictsResolved() { |
499 | 858k | if (!read_time_) { |
500 | 427k | auto safe_time = VERIFY_RESULT(tablet().SafeTime(RequireLease::kTrue)); |
501 | 0 | read_time_ = ReadHybridTime::FromHybridTimeRange( |
502 | 427k | {safe_time, tablet().clock()->NowRange().second}); |
503 | 430k | } else if (prepare_result_.need_read_snapshot && |
504 | 430k | isolation_level_ == IsolationLevel::SERIALIZABLE_ISOLATION359k ) { |
505 | 0 | return STATUS_FORMAT( |
506 | 0 | InvalidArgument, |
507 | 0 | "Read time should NOT be specified for serializable isolation level: $0", |
508 | 0 | read_time_); |
509 | 0 | } |
510 | | |
511 | 858k | CompleteExecute(); |
512 | 858k | return Status::OK(); |
513 | 858k | } |
514 | | |
515 | 3.11M | void WriteQuery::CompleteExecute() { |
516 | 3.11M | ExecuteDone(DoCompleteExecute()); |
517 | 3.11M | } |
518 | | |
519 | 3.11M | CHECKED_STATUS WriteQuery::DoCompleteExecute() { |
520 | 3.11M | auto read_op = prepare_result_.need_read_snapshot |
521 | 3.11M | ? VERIFY_RESULT(ScopedReadOperation::Create(&tablet(), RequireLease::kTrue, read_time_)) |
522 | 3.11M | : ScopedReadOperation()2.51M ; |
523 | | // Actual read hybrid time used for read-modify-write operation. |
524 | 3.11M | auto real_read_time = prepare_result_.need_read_snapshot |
525 | 3.11M | ? read_op.read_time()596k |
526 | | // When need_read_snapshot is false, this time is used only to write TTL field of record. |
527 | 3.11M | : ReadHybridTime::SingleTime(tablet().clock()->Now())2.52M ; |
528 | | |
529 | | // We expect all read operations for this transaction to be done in AssembleDocWriteBatch. Once |
530 | | // read_txn goes out of scope, the read point is deregistered. |
531 | 3.11M | bool local_limit_updated = false; |
532 | | |
533 | | // This loop may be executed multiple times multiple times only for serializable isolation or |
534 | | // when read_time was not yet picked for snapshot isolation. |
535 | | // In all other cases it is executed only once. |
536 | 3.11M | auto init_marker_behavior = tablet().table_type() == TableType::REDIS_TABLE_TYPE |
537 | 3.11M | ? docdb::InitMarkerBehavior::kRequired123k |
538 | 3.11M | : docdb::InitMarkerBehavior::kOptional2.99M ; |
539 | 3.11M | for (;;) { |
540 | 3.11M | RETURN_NOT_OK(docdb::AssembleDocWriteBatch( |
541 | 3.11M | doc_ops_, deadline(), real_read_time, tablet().doc_db(), |
542 | 3.11M | request().mutable_write_batch(), init_marker_behavior, |
543 | 3.11M | tablet().monotonic_counter(), &restart_read_ht_, |
544 | 3.11M | tablet().metadata()->table_name())); |
545 | | |
546 | | // For serializable isolation we don't fix read time, so could do read restart locally, |
547 | | // instead of failing whole transaction. |
548 | 3.11M | if (!restart_read_ht_.is_valid() || !allow_immediate_read_restart_0 ) { |
549 | 3.11M | break; |
550 | 3.11M | } |
551 | | |
552 | 2.62k | real_read_time.read = restart_read_ht_; |
553 | 2.62k | if (!local_limit_updated) { |
554 | 0 | local_limit_updated = true; |
555 | 0 | real_read_time.local_limit = std::min( |
556 | 0 | real_read_time.local_limit, VERIFY_RESULT(tablet().SafeTime(RequireLease::kTrue))); |
557 | 0 | } |
558 | | |
559 | 2.62k | restart_read_ht_ = HybridTime(); |
560 | | |
561 | 2.62k | request().mutable_write_batch()->clear_write_pairs(); |
562 | | |
563 | 2.62k | for (auto& doc_op : doc_ops_) { |
564 | 0 | doc_op->ClearResponse(); |
565 | 0 | } |
566 | 2.62k | } |
567 | | |
568 | 3.11M | if (allow_immediate_read_restart_ && |
569 | 3.11M | isolation_level_ != IsolationLevel::NON_TRANSACTIONAL2.27M && |
570 | 3.11M | response_175k ) { |
571 | 175k | real_read_time.ToPB(response_->mutable_used_read_time()); |
572 | 175k | } |
573 | | |
574 | 3.11M | if (restart_read_ht_.is_valid()) { |
575 | 0 | return Status::OK(); |
576 | 0 | } |
577 | | |
578 | 3.11M | docdb_locks_ = std::move(prepare_result_.lock_batch); |
579 | | |
580 | 3.11M | return Status::OK(); |
581 | 3.11M | } |
582 | | |
583 | 55.7M | Tablet& WriteQuery::tablet() const { |
584 | 55.7M | return *operation_->tablet(); |
585 | 55.7M | } |
586 | | |
587 | 2.87M | void WriteQuery::AdjustYsqlQueryTransactionality(size_t ysql_batch_size) { |
588 | 2.87M | force_txn_path_ = ysql_batch_size > 0 && tablet().is_sys_catalog()1.00M ; |
589 | 2.87M | } |
590 | | |
591 | 123k | void WriteQuery::RedisExecuteDone(const Status& status) { |
592 | 123k | if (!status.ok() || restart_read_ht().is_valid()) { |
593 | 0 | StartSynchronization(std::move(self_), status); |
594 | 0 | return; |
595 | 0 | } |
596 | 123k | for (auto& doc_op : doc_ops_)123k { |
597 | 123k | auto* redis_write_operation = down_cast<docdb::RedisWriteOperation*>(doc_op.get()); |
598 | 123k | response_->add_redis_response_batch()->Swap(&redis_write_operation->response()); |
599 | 123k | } |
600 | | |
601 | 123k | StartSynchronization(std::move(self_), Status::OK()); |
602 | 123k | } |
603 | | |
604 | 2.11M | void WriteQuery::CqlExecuteDone(const Status& status) { |
605 | 2.11M | if (restart_read_ht().is_valid()) { |
606 | 0 | StartSynchronization(std::move(self_), Status::OK()); |
607 | 0 | return; |
608 | 0 | } |
609 | | |
610 | 2.11M | if (status.ok()) { |
611 | 2.08M | UpdateQLIndexes(); |
612 | 2.08M | } else { |
613 | 26.9k | CompleteQLWriteBatch(status); |
614 | 26.9k | } |
615 | 2.11M | } |
616 | | |
617 | 2.11M | void WriteQuery::CompleteQLWriteBatch(const Status& status) { |
618 | 2.11M | if (!status.ok()) { |
619 | 24.8k | StartSynchronization(std::move(self_), status); |
620 | 24.8k | return; |
621 | 24.8k | } |
622 | | |
623 | 2.08M | bool is_unique_index = tablet().metadata()->is_unique_index(); |
624 | | |
625 | 4.49M | for (auto& doc_op : doc_ops_) { |
626 | 4.49M | std::unique_ptr<docdb::QLWriteOperation> ql_write_op( |
627 | 4.49M | down_cast<docdb::QLWriteOperation*>(doc_op.release())); |
628 | 4.49M | if (is_unique_index && |
629 | 4.49M | ql_write_op->request().type() == QLWriteRequestPB::QL_STMT_INSERT2.90k && |
630 | 4.49M | ql_write_op->response()->has_applied()2.30k && !ql_write_op->response()->applied()451 ) { |
631 | | // If this is an insert into a unique index and it fails to apply, report duplicate value err. |
632 | 451 | ql_write_op->response()->set_status(QLResponsePB::YQL_STATUS_USAGE_ERROR); |
633 | 451 | ql_write_op->response()->set_error_message( |
634 | 451 | Format("Duplicate value disallowed by unique index $0", |
635 | 451 | tablet().metadata()->table_name())); |
636 | 451 | DVLOG(1) << "Could not apply the given operation " << AsString(ql_write_op->request()) |
637 | 0 | << " due to " << AsString(ql_write_op->response()); |
638 | 4.49M | } else if (ql_write_op->rowblock() != nullptr) { |
639 | | // If the QL write op returns a rowblock, move the op to the transaction state to return the |
640 | | // rows data as a sidecar after the transaction completes. |
641 | 273 | ql_write_ops_.emplace_back(std::move(ql_write_op)); |
642 | 273 | } |
643 | 4.49M | } |
644 | | |
645 | 2.08M | StartSynchronization(std::move(self_), Status::OK()); |
646 | 2.08M | } |
647 | | |
648 | 2.08M | void WriteQuery::UpdateQLIndexes() { |
649 | 2.08M | client::YBClient* client = nullptr; |
650 | 2.08M | client::YBSessionPtr session; |
651 | 2.08M | client::YBTransactionPtr txn; |
652 | 2.08M | IndexOps index_ops; |
653 | 2.08M | const ChildTransactionDataPB* child_transaction_data = nullptr; |
654 | 4.48M | for (auto& doc_op : doc_ops_) { |
655 | 4.48M | auto* write_op = down_cast<docdb::QLWriteOperation*>(doc_op.get()); |
656 | 4.48M | if (write_op->index_requests()->empty()) { |
657 | 4.47M | continue; |
658 | 4.47M | } |
659 | 15.1k | if (14.0k !client14.0k ) { |
660 | 15.1k | client = &tablet().client(); |
661 | 15.1k | session = std::make_shared<client::YBSession>(client); |
662 | 15.1k | session->SetDeadline(deadline()); |
663 | 15.1k | if (write_op->request().has_child_transaction_data()) { |
664 | 13.8k | child_transaction_data = &write_op->request().child_transaction_data(); |
665 | 13.8k | if (!tablet().transaction_manager()) { |
666 | 0 | StartSynchronization( |
667 | 0 | std::move(self_), |
668 | 0 | STATUS(Corruption, "Transaction manager is not present for index update")); |
669 | 0 | return; |
670 | 0 | } |
671 | 13.8k | auto child_data = client::ChildTransactionData::FromPB( |
672 | 13.8k | write_op->request().child_transaction_data()); |
673 | 13.8k | if (!child_data.ok()) { |
674 | 0 | StartSynchronization(std::move(self_), child_data.status()); |
675 | 0 | return; |
676 | 0 | } |
677 | 13.8k | txn = std::make_shared<client::YBTransaction>(tablet().transaction_manager(), *child_data); |
678 | 13.8k | session->SetTransaction(txn); |
679 | 13.8k | } else { |
680 | 1.24k | child_transaction_data = nullptr; |
681 | 1.24k | } |
682 | 18.4E | } else if (write_op->request().has_child_transaction_data()) { |
683 | 4 | DCHECK_ONLY_NOTNULL(child_transaction_data); |
684 | 4 | DCHECK_EQ(child_transaction_data->ShortDebugString(), |
685 | 4 | write_op->request().child_transaction_data().ShortDebugString()); |
686 | 18.4E | } else { |
687 | 18.4E | DCHECK(child_transaction_data == nullptr) << |
688 | 18.4E | "Value: " << child_transaction_data->ShortDebugString(); |
689 | 18.4E | } |
690 | | |
691 | | // Apply the write ops to update the index |
692 | 36.2k | for (auto& pair : *write_op->index_requests())14.0k { |
693 | 36.2k | client::YBTablePtr index_table; |
694 | 36.2k | bool cache_used_ignored = false; |
695 | 36.2k | auto metadata_cache = tablet().YBMetaDataCache(); |
696 | 36.2k | if (!metadata_cache) { |
697 | 0 | StartSynchronization( |
698 | 0 | std::move(self_), |
699 | 0 | STATUS(Corruption, "Table metadata cache is not present for index update")); |
700 | 0 | return; |
701 | 0 | } |
702 | | // TODO create async version of GetTable. |
703 | | // It is ok to have sync call here, because we use cache and it should not take too long. |
704 | 36.2k | auto status = metadata_cache->GetTable(pair.first->table_id(), &index_table, |
705 | 36.2k | &cache_used_ignored); |
706 | 36.2k | if (!status.ok()) { |
707 | 0 | StartSynchronization(std::move(self_), status); |
708 | 0 | return; |
709 | 0 | } |
710 | 36.2k | std::shared_ptr<client::YBqlWriteOp> index_op(index_table->NewQLWrite()); |
711 | 36.2k | index_op->mutable_request()->Swap(&pair.second); |
712 | 36.2k | index_op->mutable_request()->MergeFrom(pair.second); |
713 | 36.2k | session->Apply(index_op); |
714 | 36.2k | index_ops.emplace_back(std::move(index_op), write_op); |
715 | 36.2k | } |
716 | 14.0k | } |
717 | | |
718 | 2.08M | if (!session) { |
719 | 2.06M | CompleteQLWriteBatch(Status::OK()); |
720 | 2.06M | return; |
721 | 2.06M | } |
722 | | |
723 | 18.8k | session->FlushAsync(std::bind( |
724 | 18.8k | &WriteQuery::UpdateQLIndexesFlushed, this, session, txn, std::move(index_ops), _1)); |
725 | 18.8k | } |
726 | | |
// Callback invoked after the index-update operations applied to `session` have been flushed.
// Propagates any flush error, finishes the child transaction (if one was used), copies the
// per-index-op status/error and the child transaction result into the originating write ops'
// responses, and then completes the QL write batch.
// NOTE(review): `session` is not referenced in the body — presumably it is passed to keep the
// session alive for the duration of the flush callback; confirm against the FlushAsync caller.
void WriteQuery::UpdateQLIndexesFlushed(
    const client::YBSessionPtr& session, const client::YBTransactionPtr& txn,
    const IndexOps& index_ops, client::FlushStatus* flush_status) {
  // Take back ownership of this WriteQuery from self_. If we bail out early via Cancel(),
  // `query` is destroyed at return (destroying *this); on the success path ownership is
  // handed back to self_ just before CompleteQLWriteBatch below.
  std::unique_ptr<WriteQuery> query(std::move(self_));

  const auto& status = flush_status->status;
  if (PREDICT_FALSE(!status.ok())) {
    // When any error occurs during the dispatching of YBOperation, YBSession saves the error and
    // returns IOError. When it happens, retrieves the errors and discard the IOError.
    if (status.IsIOError()) {
      for (const auto& error : flush_status->errors) {
        // return just the first error seen.
        Cancel(error->status());
        return;
      }
    }
    Cancel(status);
    return;
  }

  // If the index updates ran under a child transaction, finish it and capture the result so it
  // can be returned to the parent transaction via the write ops' responses.
  ChildTransactionResultPB child_result;
  if (txn) {
    auto finish_result = txn->FinishChild();
    if (!finish_result.ok()) {
      query->Cancel(finish_result.status());
      return;
    }
    child_result = std::move(*finish_result);
  }

  // Check the responses of the index write ops.
  for (const auto& pair : index_ops) {
    std::shared_ptr<client::YBqlWriteOp> index_op = pair.first;
    auto* response = pair.second->response();
    DCHECK_ONLY_NOTNULL(response);
    auto* index_response = index_op->mutable_response();

    if (index_response->status() != QLResponsePB::YQL_STATUS_OK) {
      // Surface the index op's failure on the originating write op's response.
      DVLOG(1) << "Got status " << index_response->status() << " for " << AsString(index_op);
      response->set_status(index_response->status());
      response->set_error_message(std::move(*index_response->mutable_error_message()));
    }
    if (txn) {
      // Copied (not moved): several index ops may map to the same originating write op, and
      // each of their responses gets the child transaction result.
      *response->mutable_child_transaction_result() = child_result;
    }
  }

  // Restore self-ownership before continuing the normal completion path.
  self_ = std::move(query);
  CompleteQLWriteBatch(Status::OK());
}
777 | | |
778 | 695k | void WriteQuery::PgsqlExecuteDone(const Status& status) { |
779 | 695k | if (!status.ok() || restart_read_ht_.is_valid()647k ) { |
780 | 47.6k | StartSynchronization(std::move(self_), status); |
781 | 47.6k | return; |
782 | 47.6k | } |
783 | | |
784 | 12.8M | for (auto& doc_op : doc_ops_)647k { |
785 | | // We'll need to return the number of rows inserted, updated, or deleted by each operation. |
786 | 12.8M | std::unique_ptr<docdb::PgsqlWriteOperation> pgsql_write_op( |
787 | 12.8M | down_cast<docdb::PgsqlWriteOperation*>(doc_op.release())); |
788 | 12.8M | pgsql_write_ops_.emplace_back(std::move(pgsql_write_op)); |
789 | 12.8M | } |
790 | | |
791 | 647k | StartSynchronization(std::move(self_), Status::OK()); |
792 | 647k | } |
793 | | |
794 | 314k | void WriteQuery::SimpleExecuteDone(const Status& status) { |
795 | 314k | StartSynchronization(std::move(self_), status); |
796 | 314k | } |
797 | | |
798 | | } // namespace tablet |
799 | | } // namespace yb |