/Users/deen/code/yugabyte-db/src/yb/tablet/local_tablet_writer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | |
14 | | #include "yb/tablet/local_tablet_writer.h" |
15 | | |
16 | | #include "yb/common/ql_protocol_util.h" |
17 | | #include "yb/common/ql_value.h" |
18 | | |
19 | | #include "yb/gutil/casts.h" |
20 | | #include "yb/gutil/singleton.h" |
21 | | |
22 | | #include "yb/tablet/operations/write_operation.h" |
23 | | #include "yb/tablet/tablet.h" |
24 | | #include "yb/tablet/tablet_metadata.h" |
25 | | #include "yb/tablet/write_query.h" |
26 | | |
27 | | #include "yb/tserver/tserver.pb.h" |
28 | | |
29 | | #include "yb/util/status_format.h" |
30 | | #include "yb/util/status_log.h" |
31 | | |
32 | | namespace yb { |
33 | | namespace tablet { |
34 | | |
35 | | namespace { |
36 | | |
37 | | // This is used for providing OpIds to write operations, which must always be increasing. |
38 | | class AutoIncrementingCounter { |
39 | | public: |
40 | 54 | AutoIncrementingCounter() : next_index_(1) {} |
41 | 96.1k | int64_t GetAndIncrement() { return next_index_.fetch_add(1); } |
42 | | private: |
43 | | std::atomic<int64_t> next_index_; |
44 | | }; |
45 | | |
46 | | } // namespace |
47 | | |
48 | | LocalTabletWriter::LocalTabletWriter(Tablet* tablet) |
49 | | : tablet_(tablet), req_(std::make_unique<tserver::WriteRequestPB>()), |
50 | 205 | resp_(std::make_unique<tserver::WriteResponsePB>()) { |
51 | 205 | } |
52 | | |
53 | 205 | LocalTabletWriter::~LocalTabletWriter() = default; |
54 | | |
55 | | // Perform a write against the local tablet. |
56 | | // Returns a bad Status if the applied operation had a per-row error. |
57 | 95.6k | Status LocalTabletWriter::Write(QLWriteRequestPB* request) { |
58 | 95.6k | Batch batch; |
59 | 95.6k | batch.Add()->Swap(request); |
60 | 95.6k | return WriteBatch(&batch); |
61 | 95.6k | } |
62 | | |
63 | 96.1k | Status LocalTabletWriter::WriteBatch(Batch* batch) { |
64 | 96.1k | req_->Clear(); |
65 | 114k | for (auto& req : *batch) { |
66 | 114k | req.set_schema_version(tablet_->metadata()->schema_version()); |
67 | 114k | QLSetHashCode(&req); |
68 | 114k | } |
69 | 96.1k | req_->mutable_ql_write_batch()->Swap(batch); |
70 | | |
71 | 96.1k | auto query = std::make_unique<WriteQuery>( |
72 | 96.1k | OpId::kUnknownTerm, CoarseTimePoint::max() /* deadline */, this, |
73 | 96.1k | tablet_, resp_.get()); |
74 | 96.1k | query->set_client_request(*req_); |
75 | 96.1k | write_promise_ = std::promise<Status>(); |
76 | 96.1k | tablet_->AcquireLocksAndPerformDocOperations(std::move(query)); |
77 | | |
78 | 96.1k | return write_promise_.get_future().get(); |
79 | 96.1k | } |
80 | | |
81 | 96.1k | void LocalTabletWriter::Submit(std::unique_ptr<Operation> operation, int64_t term) { |
82 | 96.1k | auto state = down_cast<WriteOperation*>(operation.get()); |
83 | 96.1k | OpId op_id(term, Singleton<AutoIncrementingCounter>::get()->GetAndIncrement()); |
84 | | |
85 | 96.1k | auto hybrid_time = tablet_->mvcc_manager()->AddLeaderPending(op_id); |
86 | 96.1k | state->set_hybrid_time(hybrid_time); |
87 | | |
88 | | // Create a "fake" OpId and set it in the Operation for anchoring. |
89 | 96.1k | state->set_op_id(op_id); |
90 | | |
91 | 96.1k | CHECK_OK(tablet_->ApplyRowOperations(state)); |
92 | | |
93 | 96.1k | tablet_->mvcc_manager()->Replicated(hybrid_time, op_id); |
94 | | |
95 | 96.1k | state->CompleteWithStatus(Status::OK()); |
96 | | |
97 | | // Return the status of first failed op. |
98 | 96.1k | int op_idx = 0; |
99 | 387M | for (const auto& result : resp_->ql_response_batch()) { |
100 | 387M | if (result.status() != QLResponsePB::YQL_STATUS_OK) { |
101 | 0 | write_promise_.set_value(STATUS_FORMAT( |
102 | 0 | RuntimeError, "Op $0 failed: $1 ($2)", op_idx, result.error_message(), |
103 | 0 | QLResponsePB_QLStatus_Name(result.status()))); |
104 | 0 | } |
105 | 387M | op_idx++; |
106 | 387M | } |
107 | | |
108 | 96.1k | write_promise_.set_value(Status::OK()); |
109 | 96.1k | } |
110 | | |
111 | 0 | Result<HybridTime> LocalTabletWriter::ReportReadRestart() { |
112 | 0 | return HybridTime(); |
113 | 0 | } |
114 | | |
115 | | } // namespace tablet |
116 | | } // namespace yb |