YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tablet/local_tablet_writer.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tablet/local_tablet_writer.h"
15
16
#include "yb/common/ql_protocol_util.h"
17
#include "yb/common/ql_value.h"
18
19
#include "yb/gutil/casts.h"
20
#include "yb/gutil/singleton.h"
21
22
#include "yb/tablet/operations/write_operation.h"
23
#include "yb/tablet/tablet.h"
24
#include "yb/tablet/tablet_metadata.h"
25
#include "yb/tablet/write_query.h"
26
27
#include "yb/tserver/tserver.pb.h"
28
29
#include "yb/util/status_format.h"
30
#include "yb/util/status_log.h"
31
32
namespace yb {
33
namespace tablet {
34
35
namespace {
36
37
// Hands out the indexes used to build OpIds for write operations. The indexes must always be
// increasing, so the value lives in an atomic and every caller receives a distinct number.
class AutoIncrementingCounter {
 public:
  AutoIncrementingCounter() = default;

  // Returns the current index and advances the counter by one in a single atomic step.
  // The first value handed out is 1.
  int64_t GetAndIncrement() { return next_index_++; }

 private:
  std::atomic<int64_t> next_index_{1};
};
45
46
} // namespace
47
48
// Creates a writer for `tablet`. The pointer is stored as given, and the request and response
// protobufs used by the write calls are allocated once here.
LocalTabletWriter::LocalTabletWriter(Tablet* tablet)
    : tablet_(tablet),
      req_(std::make_unique<tserver::WriteRequestPB>()),
      resp_(std::make_unique<tserver::WriteResponsePB>()) {}
52
53
205
// Defaulted destructor, defined out of line in this translation unit.
// NOTE(review): presumably kept here (rather than in the header) so the smart-pointer members
// are destroyed where the protobuf types are complete -- confirm against the header.
LocalTabletWriter::~LocalTabletWriter() = default;
54
55
// Perform a write against the local tablet.
56
// Returns a bad Status if the applied operation had a per-row error.
57
95.6k
Status LocalTabletWriter::Write(QLWriteRequestPB* request) {
58
95.6k
  Batch batch;
59
95.6k
  batch.Add()->Swap(request);
60
95.6k
  return WriteBatch(&batch);
61
95.6k
}
62
63
96.0k
// Applies every request in `batch` to the local tablet as one write and blocks until the
// result is available.
//
// Each request is stamped with the tablet's current schema version and gets its hash code set.
// The batch contents are then swapped into the internal request, so `*batch` does not keep
// them after this call.
//
// Returns the Status delivered through write_promise_, which Submit() fulfils.
Status LocalTabletWriter::WriteBatch(Batch* batch) {
  req_->Clear();
  // resp_ is reused for every write, just like req_. Reset it as well so that Submit() only
  // inspects the results of this write instead of rescanning (and possibly re-reporting a
  // failure from) results left behind by earlier calls.
  // NOTE(review): assumes the write path appends to resp_->ql_response_batch() rather than
  // overwriting it -- confirm.
  resp_->Clear();

  for (auto& req : *batch) {
    req.set_schema_version(tablet_->metadata()->schema_version());
    QLSetHashCode(&req);
  }
  req_->mutable_ql_write_batch()->Swap(batch);

  auto query = std::make_unique<WriteQuery>(
      OpId::kUnknownTerm, CoarseTimePoint::max() /* deadline */, this,
      tablet_, resp_.get());
  query->set_client_request(*req_);

  // A std::promise can only be fulfilled once, so start every write with a fresh one.
  write_promise_ = std::promise<Status>();
  tablet_->AcquireLocksAndPerformDocOperations(std::move(query));

  return write_promise_.get_future().get();
}
80
81
96.0k
void LocalTabletWriter::Submit(std::unique_ptr<Operation> operation, int64_t term) {
82
96.0k
  auto state = down_cast<WriteOperation*>(operation.get());
83
96.0k
  OpId op_id(term, Singleton<AutoIncrementingCounter>::get()->GetAndIncrement());
84
85
96.0k
  auto hybrid_time = tablet_->mvcc_manager()->AddLeaderPending(op_id);
86
96.0k
  state->set_hybrid_time(hybrid_time);
87
88
  // Create a "fake" OpId and set it in the Operation for anchoring.
89
96.0k
  state->set_op_id(op_id);
90
91
96.0k
  CHECK_OK(tablet_->ApplyRowOperations(state));
92
93
96.0k
  tablet_->mvcc_manager()->Replicated(hybrid_time, op_id);
94
95
96.0k
  state->CompleteWithStatus(Status::OK());
96
97
  // Return the status of first failed op.
98
96.0k
  int op_idx = 0;
99
387M
  for (const auto& result : resp_->ql_response_batch()) {
100
387M
    if (result.status() != QLResponsePB::YQL_STATUS_OK) {
101
0
      write_promise_.set_value(STATUS_FORMAT(
102
0
          RuntimeError, "Op $0 failed: $1 ($2)", op_idx, result.error_message(),
103
0
          QLResponsePB_QLStatus_Name(result.status())));
104
0
    }
105
387M
    op_idx++;
106
387M
  }
107
108
96.0k
  write_promise_.set_value(Status::OK());
109
96.0k
}
110
111
0
// Always returns a default-constructed HybridTime; no state of the writer is read or changed.
// NOTE(review): presumably a placeholder implementation because read restarts are not expected
// for this local writer -- confirm against the interface this overrides.
Result<HybridTime> LocalTabletWriter::ReportReadRestart() {
  return HybridTime{};
}
114
115
}  // namespace tablet
116
}  // namespace yb