/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/common/index.h" |
34 | | #include "yb/common/partition.h" |
35 | | #include "yb/common/ql_value.h" |
36 | | #include "yb/common/wire_protocol.h" |
37 | | |
38 | | #include "yb/consensus/log-test-base.h" |
39 | | |
40 | | #include "yb/gutil/strings/escaping.h" |
41 | | #include "yb/gutil/strings/substitute.h" |
42 | | |
43 | | #include "yb/rpc/messenger.h" |
44 | | #include "yb/rpc/rpc_controller.h" |
45 | | #include "yb/rpc/rpc_test_util.h" |
46 | | #include "yb/rpc/yb_rpc.h" |
47 | | |
48 | | #include "yb/server/hybrid_clock.h" |
49 | | #include "yb/server/server_base.pb.h" |
50 | | #include "yb/server/server_base.proxy.h" |
51 | | |
52 | | #include "yb/tablet/tablet.h" |
53 | | #include "yb/tablet/tablet_metadata.h" |
54 | | #include "yb/tablet/tablet_peer.h" |
55 | | |
56 | | #include "yb/tserver/mini_tablet_server.h" |
57 | | #include "yb/tserver/tablet_server-test-base.h" |
58 | | #include "yb/tserver/tablet_server.h" |
59 | | #include "yb/tserver/tablet_server_test_util.h" |
60 | | #include "yb/tserver/ts_tablet_manager.h" |
61 | | #include "yb/tserver/tserver_admin.proxy.h" |
62 | | #include "yb/tserver/tserver_service.proxy.h" |
63 | | |
64 | | #include "yb/util/crc.h" |
65 | | #include "yb/util/curl_util.h" |
66 | | #include "yb/util/metrics.h" |
67 | | #include "yb/util/status_log.h" |
68 | | |
69 | | using yb::consensus::RaftConfigPB; |
70 | | using yb::consensus::RaftPeerPB; |
71 | | using yb::rpc::Messenger; |
72 | | using yb::rpc::MessengerBuilder; |
73 | | using yb::rpc::RpcController; |
74 | | using yb::server::Clock; |
75 | | using yb::server::HybridClock; |
76 | | using yb::tablet::Tablet; |
77 | | using yb::tablet::TabletPeer; |
78 | | using std::shared_ptr; |
79 | | using std::string; |
80 | | using strings::Substitute; |
81 | | |
82 | | DEFINE_int32(single_threaded_insert_latency_bench_warmup_rows, 100, |
83 | | "Number of rows to insert in the warmup phase of the single threaded" |
84 | | " tablet server insert latency micro-benchmark"); |
85 | | |
86 | | DEFINE_int32(single_threaded_insert_latency_bench_insert_rows, 1000, |
87 | | "Number of rows to insert in the testing phase of the single threaded" |
88 | | " tablet server insert latency micro-benchmark"); |
89 | | |
90 | | DECLARE_int32(scanner_batch_size_rows); |
91 | | DECLARE_int32(metrics_retirement_age_ms); |
92 | | DECLARE_string(block_manager); |
93 | | DECLARE_string(rpc_bind_addresses); |
94 | | DECLARE_bool(disable_clock_sync_error); |
95 | | |
96 | | // Declare these metrics prototypes for simpler unit testing of their behavior. |
97 | | METRIC_DECLARE_counter(rows_inserted); |
98 | | METRIC_DECLARE_counter(rows_updated); |
99 | | METRIC_DECLARE_counter(rows_deleted); |
100 | | |
101 | | namespace yb { |
102 | | namespace tserver { |
103 | | |
104 | | class TabletServerTest : public TabletServerTestBase { |
105 | | public: |
106 | | explicit TabletServerTest(TableType table_type = YQL_TABLE_TYPE) |
107 | 17 | : TabletServerTestBase(table_type) {} |
108 | | |
109 | | // Starts the tablet server, override to start it later. |
110 | 17 | void SetUp() override { |
111 | 17 | TabletServerTestBase::SetUp(); |
112 | 17 | StartTabletServer(); |
113 | 17 | } |
114 | | |
115 | | CHECKED_STATUS CallDeleteTablet(const std::string& uuid, |
116 | | const char* tablet_id, |
117 | 2 | tablet::TabletDataState state) { |
118 | 2 | DeleteTabletRequestPB req; |
119 | 2 | DeleteTabletResponsePB resp; |
120 | 2 | RpcController rpc; |
121 | | |
122 | 2 | req.set_dest_uuid(uuid); |
123 | 2 | req.set_tablet_id(tablet_id); |
124 | 2 | req.set_delete_type(state); |
125 | | |
126 | | // Send the call |
127 | 2 | { |
128 | 2 | SCOPED_TRACE(req.DebugString()); |
129 | 2 | RETURN_NOT_OK(admin_proxy_->DeleteTablet(req, &resp, &rpc)); |
130 | 2 | SCOPED_TRACE(resp.DebugString()); |
131 | 2 | if (resp.has_error()) { |
132 | 1 | auto status = StatusFromPB(resp.error().status()); |
133 | 1 | RETURN_NOT_OK(status); |
134 | 1 | } |
135 | 2 | } |
136 | 1 | return Status::OK(); |
137 | 2 | } |
138 | | }; |
139 | | |
140 | 1 | TEST_F(TabletServerTest, TestPingServer) { |
141 | | // Ping the server. |
142 | 1 | server::PingRequestPB req; |
143 | 1 | server::PingResponsePB resp; |
144 | 1 | RpcController controller; |
145 | 1 | ASSERT_OK(generic_proxy_->Ping(req, &resp, &controller)); |
146 | 1 | } |
147 | | |
148 | 1 | TEST_F(TabletServerTest, TestServerClock) { |
149 | 1 | server::ServerClockRequestPB req; |
150 | 1 | server::ServerClockResponsePB resp; |
151 | 1 | RpcController controller; |
152 | | |
153 | 1 | ASSERT_OK(generic_proxy_->ServerClock(req, &resp, &controller)); |
154 | 1 | ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(), resp.hybrid_time()); |
155 | 1 | } |
156 | | |
157 | 1 | TEST_F(TabletServerTest, TestSetFlagsAndCheckWebPages) { |
158 | 1 | server::GenericServiceProxy proxy( |
159 | 1 | proxy_cache_.get(), HostPort::FromBoundEndpoint(mini_server_->bound_rpc_addr())); |
160 | | |
161 | 1 | server::SetFlagRequestPB req; |
162 | 1 | server::SetFlagResponsePB resp; |
163 | | |
164 | | // Set an invalid flag. |
165 | 1 | { |
166 | 1 | RpcController controller; |
167 | 1 | req.set_flag("foo"); |
168 | 1 | req.set_value("bar"); |
169 | 1 | ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
170 | 1 | SCOPED_TRACE(resp.DebugString()); |
171 | 1 | EXPECT_EQ(server::SetFlagResponsePB::NO_SUCH_FLAG, resp.result()); |
172 | 1 | EXPECT_TRUE(resp.msg().empty()); |
173 | 1 | } |
174 | | |
175 | | // Set a valid flag to a valid value. |
176 | 1 | { |
177 | 1 | int32_t old_val = FLAGS_metrics_retirement_age_ms; |
178 | 1 | RpcController controller; |
179 | 1 | req.set_flag("metrics_retirement_age_ms"); |
180 | 1 | req.set_value("12345"); |
181 | 1 | ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
182 | 1 | SCOPED_TRACE(resp.DebugString()); |
183 | 1 | EXPECT_EQ(server::SetFlagResponsePB::SUCCESS, resp.result()); |
184 | 1 | EXPECT_EQ(resp.msg(), "metrics_retirement_age_ms set to 12345\n"); |
185 | 1 | EXPECT_EQ(Substitute("$0", old_val), resp.old_value()); |
186 | 1 | EXPECT_EQ(12345, FLAGS_metrics_retirement_age_ms); |
187 | 1 | } |
188 | | |
189 | | // Set a valid flag to an invalid value. |
190 | 1 | { |
191 | 1 | RpcController controller; |
192 | 1 | req.set_flag("metrics_retirement_age_ms"); |
193 | 1 | req.set_value("foo"); |
194 | 1 | ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
195 | 1 | SCOPED_TRACE(resp.DebugString()); |
196 | 1 | EXPECT_EQ(server::SetFlagResponsePB::BAD_VALUE, resp.result()); |
197 | 1 | EXPECT_EQ(resp.msg(), "Unable to set flag: bad value"); |
198 | 1 | EXPECT_EQ(12345, FLAGS_metrics_retirement_age_ms); |
199 | 1 | } |
200 | | |
201 | | // Try setting a flag which isn't runtime-modifiable |
202 | 1 | { |
203 | 1 | RpcController controller; |
204 | 1 | req.set_flag("tablet_do_dup_key_checks"); |
205 | 1 | req.set_value("true"); |
206 | 1 | ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
207 | 1 | SCOPED_TRACE(resp.DebugString()); |
208 | 1 | EXPECT_EQ(server::SetFlagResponsePB::NOT_SAFE, resp.result()); |
209 | 1 | } |
210 | | |
211 | | // Try again, but with the force flag. |
212 | 1 | { |
213 | 1 | RpcController controller; |
214 | 1 | req.set_flag("tablet_do_dup_key_checks"); |
215 | 1 | req.set_value("true"); |
216 | 1 | req.set_force(true); |
217 | 1 | ASSERT_OK(proxy.SetFlag(req, &resp, &controller)); |
218 | 1 | SCOPED_TRACE(resp.DebugString()); |
219 | 1 | EXPECT_EQ(server::SetFlagResponsePB::SUCCESS, resp.result()); |
220 | 1 | } |
221 | | |
222 | 1 | EasyCurl c; |
223 | 1 | faststring buf; |
224 | 1 | string addr = yb::ToString(mini_server_->bound_http_addr()); |
225 | | |
226 | | // Tablets page should list tablet. |
227 | 1 | ASSERT_OK(c.FetchURL(Substitute("http://$0/tablets", addr), |
228 | 1 | &buf)); |
229 | 1 | ASSERT_STR_CONTAINS(buf.ToString(), kTabletId); |
230 | | |
231 | | // Tablet page should include the schema. |
232 | 1 | ASSERT_OK(c.FetchURL(Substitute("http://$0/tablet?id=$1", addr, kTabletId), |
233 | 1 | &buf)); |
234 | 1 | ASSERT_STR_CONTAINS(buf.ToString(), "<th>key</th>"); |
235 | 1 | ASSERT_STR_CONTAINS(buf.ToString(), "<td>string NULLABLE NOT A PARTITION KEY</td>"); |
236 | | |
237 | 1 | ASSERT_OK(c.FetchURL(Substitute("http://$0/tablet-consensus-status?id=$1", |
238 | 1 | addr, kTabletId), &buf)); |
239 | 1 | ASSERT_STR_CONTAINS(buf.ToString(), kTabletId); |
240 | | |
241 | | // Test fetching metrics. |
242 | | // Fetching metrics has the side effect of retiring metrics, but not in a single pass. |
243 | | // So, we check a couple of times in a loop -- thus, if we had a bug where one of these |
244 | | // metrics was accidentally un-referenced too early, we'd cause it to get retired. |
245 | | // If the metrics survive several passes of fetching, then we are pretty sure they will |
246 | | // stick around properly for the whole lifetime of the server. |
247 | 1 | FLAGS_metrics_retirement_age_ms = 0; |
248 | 4 | for (int i = 0; i < 3; i++) { |
249 | 3 | SCOPED_TRACE(i); |
250 | 3 | ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/jsonmetricz", addr, kTabletId), |
251 | 3 | &buf)); |
252 | | |
253 | | // Check that the tablet entry shows up. |
254 | 3 | ASSERT_STR_CONTAINS(buf.ToString(), "\"type\": \"tablet\""); |
255 | 3 | ASSERT_STR_CONTAINS(buf.ToString(), "\"id\": \"test-tablet\""); |
256 | | |
257 | | // Check entity attributes. |
258 | 3 | ASSERT_STR_CONTAINS(buf.ToString(), "\"table_name\": \"test-table\""); |
259 | | |
260 | | // Check for the existence of some particular metrics for which we've had early-retirement |
261 | | // bugs in the past. |
262 | 3 | ASSERT_STR_CONTAINS(buf.ToString(), "hybrid_clock_hybrid_time"); |
263 | 3 | ASSERT_STR_CONTAINS(buf.ToString(), "threads_started"); |
264 | | #ifdef TCMALLOC_ENABLED |
265 | | ASSERT_STR_CONTAINS(buf.ToString(), "tcmalloc_max_total_thread_cache_bytes"); |
266 | | #endif |
267 | 3 | ASSERT_STR_CONTAINS(buf.ToString(), "glog_info_messages"); |
268 | 3 | } |
269 | | |
270 | | // Smoke-test the tracing infrastructure. |
271 | 1 | ASSERT_OK(c.FetchURL( |
272 | 1 | Substitute("http://$0/tracing/json/get_buffer_percent_full", addr, kTabletId), |
273 | 1 | &buf)); |
274 | 1 | ASSERT_EQ(buf.ToString(), "0"); |
275 | | |
276 | 1 | string enable_req_json = "{\"categoryFilter\":\"*\", \"useContinuousTracing\": \"true\"," |
277 | 1 | " \"useSampling\": \"false\"}"; |
278 | 1 | string req_b64; |
279 | 1 | Base64Escape(enable_req_json, &req_b64); |
280 | | |
281 | 1 | ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/begin_recording?$1", |
282 | 1 | addr, |
283 | 1 | req_b64), &buf)); |
284 | 1 | ASSERT_EQ(buf.ToString(), ""); |
285 | 1 | ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/end_recording", addr), |
286 | 1 | &buf)); |
287 | 1 | ASSERT_STR_CONTAINS(buf.ToString(), "__metadata"); |
288 | 1 | ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/categories", addr), |
289 | 1 | &buf)); |
290 | 1 | ASSERT_STR_CONTAINS(buf.ToString(), "\"rpc\""); |
291 | | |
292 | | // Smoke test the pprof contention profiler handler. |
293 | 1 | ASSERT_OK(c.FetchURL(Substitute("http://$0/pprof/contention?seconds=1", addr), |
294 | 1 | &buf)); |
295 | 1 | ASSERT_STR_CONTAINS(buf.ToString(), "Discarded samples = 0"); |
296 | | #if defined(__linux__) |
297 | | // The executable name appears as part of the dump of /proc/self/maps, which |
298 | | // only exists on Linux. |
299 | | ASSERT_STR_CONTAINS(buf.ToString(), "tablet_server-test"); |
300 | | #endif |
301 | 1 | } |
302 | | |
303 | 1 | TEST_F(TabletServerTest, TestInsert) { |
304 | 1 | WriteRequestPB req; |
305 | | |
306 | 1 | req.set_tablet_id(kTabletId); |
307 | | |
308 | 1 | WriteResponsePB resp; |
309 | 1 | RpcController controller; |
310 | | |
311 | 1 | std::shared_ptr<TabletPeer> tablet; |
312 | 1 | ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
313 | 1 | scoped_refptr<Counter> rows_inserted = |
314 | 1 | METRIC_rows_inserted.Instantiate(tablet->tablet()->GetTabletMetricsEntity()); |
315 | 1 | ASSERT_EQ(0, rows_inserted->value()); |
316 | 1 | tablet.reset(); |
317 | | |
318 | | // Send an empty request. |
319 | | // This should succeed and do nothing. |
320 | 1 | { |
321 | 1 | controller.Reset(); |
322 | 1 | SCOPED_TRACE(req.DebugString()); |
323 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
324 | 1 | SCOPED_TRACE(resp.DebugString()); |
325 | 1 | ASSERT_FALSE(resp.has_error()); |
326 | 1 | } |
327 | | |
328 | | // Send an actual row insert. |
329 | 1 | { |
330 | 1 | controller.Reset(); |
331 | 1 | AddTestRowInsert(1234, 5678, "hello world via RPC", &req); |
332 | 1 | SCOPED_TRACE(req.DebugString()); |
333 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
334 | 1 | SCOPED_TRACE(resp.DebugString()); |
335 | 1 | ASSERT_FALSE(resp.has_error()); |
336 | 1 | req.clear_ql_write_batch(); |
337 | 1 | } |
338 | | |
339 | | // Send a batch with multiple rows, one of which is a duplicate of |
340 | | // the above insert. This should generate one error into per_row_errors. |
341 | 1 | { |
342 | 1 | controller.Reset(); |
343 | 1 | AddTestRowInsert(1, 1, "ceci n'est pas une dupe", &req); |
344 | 1 | AddTestRowInsert(2, 1, "also not a dupe key", &req); |
345 | 1 | AddTestRowInsert(1234, 1, "I am a duplicate key", &req); |
346 | 1 | SCOPED_TRACE(req.DebugString()); |
347 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
348 | 1 | SCOPED_TRACE(resp.DebugString()); |
349 | 2 | ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString(); |
350 | 1 | } |
351 | | |
352 | | // get the clock's current hybrid_time |
353 | 1 | HybridTime now_before = mini_server_->server()->clock()->Now(); |
354 | | |
355 | 1 | rows_inserted = nullptr; |
356 | 1 | ASSERT_OK(ShutdownAndRebuildTablet()); |
357 | 1 | VerifyRows(schema_, { KeyValue(1, 1), KeyValue(2, 1), KeyValue(1234, 5678) }); |
358 | | |
359 | | // get the clock's hybrid_time after replay |
360 | 1 | HybridTime now_after = mini_server_->server()->clock()->Now(); |
361 | | |
362 | | // make sure 'now_after' is greater than or equal to 'now_before' |
363 | 1 | ASSERT_GE(now_after.value(), now_before.value()); |
364 | 1 | } |
365 | | |
366 | 1 | TEST_F(TabletServerTest, TestExternalConsistencyModes_ClientPropagated) { |
367 | 1 | WriteRequestPB req; |
368 | 1 | req.set_tablet_id(kTabletId); |
369 | 1 | WriteResponsePB resp; |
370 | 1 | RpcController controller; |
371 | | |
372 | 1 | std::shared_ptr<TabletPeer> tablet; |
373 | 1 | ASSERT_TRUE( |
374 | 1 | mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, |
375 | 1 | &tablet)); |
376 | | // get the current time |
377 | 1 | HybridTime current = mini_server_->server()->clock()->Now(); |
378 | | // advance current to some time in the future. we do 5 secs to make |
379 | | // sure this hybrid_time will still be in the future when it reaches the |
380 | | // server. |
381 | 1 | current = HybridClock::HybridTimeFromMicroseconds( |
382 | 1 | HybridClock::GetPhysicalValueMicros(current) + 5000000); |
383 | | |
384 | 1 | AddTestRowInsert(1234, 5678, "hello world via RPC", &req); |
385 | | |
386 | 1 | req.set_propagated_hybrid_time(current.ToUint64()); |
387 | 1 | SCOPED_TRACE(req.DebugString()); |
388 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
389 | 1 | SCOPED_TRACE(resp.DebugString()); |
390 | 1 | ASSERT_FALSE(resp.has_error()); |
391 | | |
392 | | // make sure the server returned a write hybrid_time where only |
393 | | // the logical value was increased since he should have updated |
394 | | // its clock with the client's value. |
395 | 1 | HybridTime write_hybrid_time(resp.propagated_hybrid_time()); |
396 | | |
397 | 1 | ASSERT_EQ(HybridClock::GetPhysicalValueMicros(current), |
398 | 1 | HybridClock::GetPhysicalValueMicros(write_hybrid_time)); |
399 | | |
400 | 1 | ASSERT_LE(HybridClock::GetLogicalValue(current) + 1, |
401 | 1 | HybridClock::GetLogicalValue(write_hybrid_time)); |
402 | 1 | } |
403 | | |
404 | 1 | TEST_F(TabletServerTest, TestInsertAndMutate) { |
405 | | |
406 | 1 | std::shared_ptr<TabletPeer> tablet; |
407 | 1 | ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
408 | 1 | tablet.reset(); |
409 | | |
410 | 1 | RpcController controller; |
411 | | |
412 | 1 | { |
413 | 1 | WriteRequestPB req; |
414 | 1 | WriteResponsePB resp; |
415 | 1 | req.set_tablet_id(kTabletId); |
416 | | |
417 | 1 | AddTestRowInsert(1, 1, "original1", &req); |
418 | 1 | AddTestRowInsert(2, 2, "original2", &req); |
419 | 1 | AddTestRowInsert(3, 3, "original3", &req); |
420 | 1 | SCOPED_TRACE(req.DebugString()); |
421 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
422 | 1 | SCOPED_TRACE(resp.DebugString()); |
423 | 2 | ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString(); |
424 | 1 | controller.Reset(); |
425 | 1 | } |
426 | | |
427 | | // Try and mutate the rows inserted above |
428 | 1 | { |
429 | 1 | WriteRequestPB req; |
430 | 1 | WriteResponsePB resp; |
431 | 1 | req.set_tablet_id(kTabletId); |
432 | | |
433 | 1 | AddTestRowUpdate(1, 2, "mutation1", &req); |
434 | 1 | AddTestRowUpdate(2, 3, "mutation2", &req); |
435 | 1 | AddTestRowUpdate(3, 4, "mutation3", &req); |
436 | 1 | SCOPED_TRACE(req.DebugString()); |
437 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
438 | 1 | SCOPED_TRACE(resp.DebugString()); |
439 | 2 | ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString(); |
440 | 1 | controller.Reset(); |
441 | 1 | } |
442 | | |
443 | | // Try and mutate a non existent row key (should not get an error) |
444 | 1 | { |
445 | 1 | WriteRequestPB req; |
446 | 1 | WriteResponsePB resp; |
447 | 1 | req.set_tablet_id(kTabletId); |
448 | | |
449 | 1 | AddTestRowUpdate(1234, 2, "mutated", &req); |
450 | 1 | SCOPED_TRACE(req.DebugString()); |
451 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
452 | 1 | SCOPED_TRACE(resp.DebugString()); |
453 | 2 | ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString(); |
454 | 1 | controller.Reset(); |
455 | 1 | } |
456 | | |
457 | | // Try and delete 1 row |
458 | 1 | { |
459 | 1 | WriteRequestPB req; |
460 | 1 | WriteResponsePB resp; |
461 | 1 | req.set_tablet_id(kTabletId); |
462 | | |
463 | 1 | AddTestRowDelete(1, &req); |
464 | 1 | SCOPED_TRACE(req.DebugString()); |
465 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
466 | 1 | SCOPED_TRACE(resp.DebugString()); |
467 | 2 | ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString(); |
468 | 1 | controller.Reset(); |
469 | 1 | } |
470 | | |
471 | | // Now try and mutate a row we just deleted, we should not get an error |
472 | 1 | { |
473 | 1 | WriteRequestPB req; |
474 | 1 | WriteResponsePB resp; |
475 | 1 | req.set_tablet_id(kTabletId); |
476 | | |
477 | 1 | AddTestRowUpdate(1, 2, "mutated1", &req); |
478 | 1 | SCOPED_TRACE(req.DebugString()); |
479 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
480 | 1 | SCOPED_TRACE(resp.DebugString()); |
481 | 2 | ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString(); |
482 | 1 | controller.Reset(); |
483 | 1 | } |
484 | | |
485 | | // At this point, we have two left. |
486 | 1 | VerifyRows(schema_, {KeyValue(1, 2), KeyValue(2, 3), KeyValue(3, 4), KeyValue(1234, 2)}); |
487 | | |
488 | | // Do a mixed operation (some insert, update, and delete, some of which fail) |
489 | 1 | { |
490 | 1 | WriteRequestPB req; |
491 | 1 | WriteResponsePB resp; |
492 | 1 | req.set_tablet_id(kTabletId); |
493 | | |
494 | | // op 0: Mutate row 1, which doesn't exist. This should not fail. |
495 | 1 | AddTestRowUpdate(1, 3, "mutate_should_not_fail", &req); |
496 | | // op 1: Insert a new row 4 (succeeds) |
497 | 1 | AddTestRowInsert(4, 4, "new row 4", &req); |
498 | | // op 2: Delete a non-existent row 5 (should fail) |
499 | 1 | AddTestRowDelete(5, &req); |
500 | | // op 3: Insert a new row 6 (succeeds) |
501 | 1 | AddTestRowInsert(6, 6, "new row 6", &req); |
502 | | |
503 | 1 | SCOPED_TRACE(req.DebugString()); |
504 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
505 | 1 | SCOPED_TRACE(resp.DebugString()); |
506 | 2 | ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString(); |
507 | 1 | controller.Reset(); |
508 | 1 | } |
509 | | |
510 | | // get the clock's current hybrid_time |
511 | 1 | HybridTime now_before = mini_server_->server()->clock()->Now(); |
512 | | |
513 | 1 | ASSERT_NO_FATALS(WARN_NOT_OK(ShutdownAndRebuildTablet(), "Shutdown failed: ")); |
514 | 1 | VerifyRows(schema_, |
515 | 1 | { KeyValue(1, 3), KeyValue(2, 3), KeyValue(3, 4), KeyValue(4, 4), KeyValue(6, 6), |
516 | 1 | KeyValue(1234, 2) }); |
517 | | |
518 | | // get the clock's hybrid_time after replay |
519 | 1 | HybridTime now_after = mini_server_->server()->clock()->Now(); |
520 | | |
521 | | // make sure 'now_after' is greater that or equal to 'now_before' |
522 | 1 | ASSERT_GE(now_after.value(), now_before.value()); |
523 | 1 | } |
524 | | |
525 | | // Test that passing a schema with fields not present in the tablet schema |
526 | | // throws an exception. |
527 | 1 | TEST_F(TabletServerTest, TestInvalidWriteRequest_BadSchema) { |
528 | 1 | SchemaBuilder schema_builder(schema_); |
529 | 1 | ASSERT_OK(schema_builder.AddColumn("col_doesnt_exist", INT32)); |
530 | 1 | Schema bad_schema_with_ids = schema_builder.Build(); |
531 | 1 | Schema bad_schema = schema_builder.BuildWithoutIds(); |
532 | | |
533 | | // Send a row insert with an extra column |
534 | 1 | { |
535 | 1 | WriteRequestPB req; |
536 | 1 | WriteResponsePB resp; |
537 | 1 | RpcController controller; |
538 | | |
539 | 1 | req.set_tablet_id(kTabletId); |
540 | | |
541 | 1 | AddTestRowInsert(1234, 5678, "hello world via RPC", &req); |
542 | 1 | req.mutable_ql_write_batch(0)->set_schema_version(1); |
543 | | |
544 | 1 | SCOPED_TRACE(req.DebugString()); |
545 | 1 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
546 | 1 | SCOPED_TRACE(resp.DebugString()); |
547 | 1 | ASSERT_FALSE(resp.has_error()); |
548 | 1 | ASSERT_EQ(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH, resp.ql_response_batch(0).status()); |
549 | 1 | } |
550 | 1 | } |
551 | | |
552 | 1 | TEST_F(TabletServerTest, TestClientGetsErrorBackWhenRecoveryFailed) { |
553 | 1 | ASSERT_NO_FATALS(InsertTestRowsRemote(0, 1, 7)); |
554 | | |
555 | 1 | ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync)); |
556 | | |
557 | | // Save the log path before shutting down the tablet (and destroying |
558 | | // the tablet peer). |
559 | 1 | string log_path = tablet_peer_->log()->ActiveSegmentForTests()->path(); |
560 | 1 | auto idx = tablet_peer_->log()->ActiveSegmentForTests()->first_entry_offset() + 300; |
561 | | |
562 | 1 | ShutdownTablet(); |
563 | 1 | ASSERT_OK(log::CorruptLogFile(env_.get(), log_path, log::FLIP_BYTE, idx)); |
564 | | |
565 | 1 | ASSERT_FALSE(ShutdownAndRebuildTablet().ok()); |
566 | | |
567 | | // Connect to it. |
568 | 1 | CreateTsClientProxies(HostPort::FromBoundEndpoint(mini_server_->bound_rpc_addr()), |
569 | 1 | proxy_cache_.get(), |
570 | 1 | &proxy_, &admin_proxy_, &consensus_proxy_, &generic_proxy_); |
571 | | |
572 | 1 | WriteRequestPB req; |
573 | 1 | req.set_tablet_id(kTabletId); |
574 | | |
575 | 1 | WriteResponsePB resp; |
576 | 1 | rpc::RpcController controller; |
577 | | |
578 | | // We're expecting the write to fail. |
579 | 1 | ASSERT_OK(DCHECK_NOTNULL(proxy_.get())->Write(req, &resp, &controller)); |
580 | 1 | ASSERT_EQ(TabletServerErrorPB::TABLET_NOT_RUNNING, resp.error().code()); |
581 | 1 | ASSERT_STR_CONTAINS(resp.error().status().message(), "Tablet not RUNNING: FAILED"); |
582 | 1 | } |
583 | | |
584 | 1 | TEST_F(TabletServerTest, TestCreateTablet_TabletExists) { |
585 | 1 | CreateTabletRequestPB req; |
586 | 1 | CreateTabletResponsePB resp; |
587 | 1 | RpcController rpc; |
588 | | |
589 | 1 | req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
590 | 1 | req.set_table_id("testtb"); |
591 | 1 | req.set_tablet_id(kTabletId); |
592 | 1 | req.set_table_name("testtb"); |
593 | 1 | req.mutable_config()->CopyFrom(mini_server_->CreateLocalConfig()); |
594 | | |
595 | 1 | Schema schema = SchemaBuilder(schema_).Build(); |
596 | 1 | SchemaToPB(schema, req.mutable_schema()); |
597 | | |
598 | | // Send the call |
599 | 1 | { |
600 | 1 | SCOPED_TRACE(req.DebugString()); |
601 | 1 | ASSERT_OK(admin_proxy_->CreateTablet(req, &resp, &rpc)); |
602 | 1 | SCOPED_TRACE(resp.DebugString()); |
603 | 1 | ASSERT_TRUE(resp.has_error()); |
604 | 1 | ASSERT_EQ(TabletServerErrorPB::TABLET_ALREADY_EXISTS, resp.error().code()); |
605 | 1 | } |
606 | 1 | } |
607 | | |
608 | 1 | TEST_F(TabletServerTest, TestDeleteTablet) { |
609 | 1 | std::shared_ptr<TabletPeer> tablet; |
610 | | |
611 | | // Verify that the tablet exists |
612 | 1 | ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
613 | | |
614 | | // Put some data in the tablet. We flush and insert more rows to ensure that |
615 | | // there is data both in the MRS and on disk. |
616 | 1 | ASSERT_NO_FATALS(InsertTestRowsRemote(0, 1, 1)); |
617 | 1 | ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync)); |
618 | 1 | ASSERT_NO_FATALS(InsertTestRowsRemote(0, 2, 1)); |
619 | | |
620 | | // Drop any local references to the tablet from within this test, |
621 | | // so that when we delete it on the server, it's not held alive |
622 | | // by the test code. |
623 | 1 | tablet_peer_.reset(); |
624 | 1 | tablet.reset(); |
625 | | |
626 | 1 | ASSERT_OK(CallDeleteTablet(mini_server_->server()->fs_manager()->uuid(), |
627 | 1 | kTabletId, |
628 | 1 | tablet::TABLET_DATA_DELETED)); |
629 | | |
630 | | // Verify that the tablet is removed from the tablet map |
631 | 1 | ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
632 | | |
633 | | // Verify that fetching metrics doesn't crash. Regression test for KUDU-638. |
634 | 1 | EasyCurl c; |
635 | 1 | faststring buf; |
636 | 1 | ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/jsonmetricz", |
637 | 1 | AsString(mini_server_->bound_http_addr())), |
638 | 1 | &buf)); |
639 | | |
640 | | // Verify that after restarting the TS, the tablet is still not in the tablet manager. |
641 | | // This ensures that the on-disk metadata got removed. |
642 | 1 | Status s = ShutdownAndRebuildTablet(); |
643 | 2 | ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
644 | 1 | ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
645 | 1 | } |
646 | | |
647 | 1 | TEST_F(TabletServerTest, TestDeleteTablet_TabletNotCreated) { |
648 | 1 | Status s = CallDeleteTablet(mini_server_->server()->fs_manager()->uuid(), |
649 | 1 | "NotPresentTabletId", |
650 | 1 | tablet::TABLET_DATA_DELETED); |
651 | 2 | ASSERT_TRUE(s.IsNotFound()) << s.ToString(); |
652 | 1 | } |
653 | | |
654 | | // Test that with concurrent requests to delete the same tablet, one wins and |
655 | | // the other fails, with no assertion failures. Regression test for KUDU-345. |
656 | 1 | TEST_F(TabletServerTest, TestConcurrentDeleteTablet) { |
657 | | // Verify that the tablet exists |
658 | 1 | std::shared_ptr<TabletPeer> tablet; |
659 | 1 | ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
660 | | |
661 | 1 | static const int kNumDeletes = 2; |
662 | 1 | RpcController rpcs[kNumDeletes]; |
663 | 1 | DeleteTabletResponsePB responses[kNumDeletes]; |
664 | 1 | CountDownLatch latch(kNumDeletes); |
665 | | |
666 | 1 | DeleteTabletRequestPB req; |
667 | 1 | req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid()); |
668 | 1 | req.set_tablet_id(kTabletId); |
669 | 1 | req.set_delete_type(tablet::TABLET_DATA_DELETED); |
670 | | |
671 | 3 | for (int i = 0; i < kNumDeletes; i++) { |
672 | 2 | SCOPED_TRACE(req.DebugString()); |
673 | 2 | admin_proxy_->DeleteTabletAsync( |
674 | 2 | req, &responses[i], &rpcs[i], [&latch]() { latch.CountDown(); }); |
675 | 2 | } |
676 | 1 | latch.Wait(); |
677 | | |
678 | 1 | int num_success = 0; |
679 | 3 | for (int i = 0; i < kNumDeletes; i++) { |
680 | 2 | ASSERT_TRUE(rpcs[i].finished()); |
681 | 2 | LOG(INFO) << "STATUS " << i << ": " << rpcs[i].status().ToString(); |
682 | 2 | LOG(INFO) << "RESPONSE " << i << ": " << responses[i].DebugString(); |
683 | 2 | if (!responses[i].has_error()) { |
684 | 1 | num_success++; |
685 | 1 | } |
686 | 2 | } |
687 | | |
688 | | // Verify that the tablet is removed from the tablet map |
689 | 1 | ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet)); |
690 | 1 | ASSERT_EQ(1, num_success); |
691 | 1 | } |
692 | | |
693 | 1 | TEST_F(TabletServerTest, TestInsertLatencyMicroBenchmark) { |
694 | 1 | METRIC_DEFINE_entity(test); |
695 | 1 | METRIC_DEFINE_coarse_histogram(test, insert_latency, |
696 | 1 | "Insert Latency", |
697 | 1 | MetricUnit::kMicroseconds, |
698 | 1 | "TabletServer single threaded insert latency."); |
699 | | |
700 | 1 | scoped_refptr<Histogram> histogram = METRIC_insert_latency.Instantiate(ts_test_metric_entity_); |
701 | | |
702 | 1 | auto warmup = AllowSlowTests() ? |
703 | 1 | FLAGS_single_threaded_insert_latency_bench_warmup_rows : 10; |
704 | | |
705 | 11 | for (int i = 0; i < warmup; i++) { |
706 | 10 | InsertTestRowsRemote(0, i, 1); |
707 | 10 | } |
708 | | |
709 | 1 | auto max_rows = AllowSlowTests() ? |
710 | 1 | FLAGS_single_threaded_insert_latency_bench_insert_rows : 100; |
711 | | |
712 | 1 | MonoTime start = MonoTime::Now(); |
713 | | |
714 | 101 | for (int i = warmup; i < warmup + max_rows; i++) { |
715 | 100 | MonoTime before = MonoTime::Now(); |
716 | 100 | InsertTestRowsRemote(0, i, 1); |
717 | 100 | MonoTime after = MonoTime::Now(); |
718 | 100 | MonoDelta delta = after.GetDeltaSince(before); |
719 | 100 | histogram->Increment(delta.ToMicroseconds()); |
720 | 100 | } |
721 | | |
722 | 1 | MonoTime end = MonoTime::Now(); |
723 | 1 | double throughput = ((max_rows - warmup) * 1.0) / end.GetDeltaSince(start).ToSeconds(); |
724 | | |
725 | | // Generate the JSON. |
726 | 1 | std::stringstream out; |
727 | 1 | JsonWriter writer(&out, JsonWriter::PRETTY); |
728 | 1 | ASSERT_OK(histogram->WriteAsJson(&writer, MetricJsonOptions())); |
729 | | |
730 | 1 | LOG(INFO) << "Throughput: " << throughput << " rows/sec."; |
731 | 1 | LOG(INFO) << out.str(); |
732 | 1 | } |
733 | | |
734 | | // Simple test to ensure we can destroy an RpcServer in different states of |
735 | | // initialization before Start()ing it. |
736 | 1 | TEST_F(TabletServerTest, TestRpcServerCreateDestroy) { |
737 | 1 | server::RpcServerOptions opts; |
738 | 1 | { |
739 | 1 | server::RpcServer server1( |
740 | 1 | "server1", opts, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>()); |
741 | 1 | } |
742 | 1 | { |
743 | 1 | MessengerBuilder mb("foo"); |
744 | 1 | auto messenger = rpc::CreateAutoShutdownMessengerHolder(ASSERT_RESULT(mb.Build())); |
745 | 1 | { |
746 | 1 | server::RpcServer server2( |
747 | 1 | "server2", opts, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>()); |
748 | 1 | ASSERT_OK(server2.Init(messenger.get())); |
749 | 1 | } |
750 | 1 | } |
751 | 1 | } |
752 | | |
753 | | // Simple test to ensure we can create RpcServer with different bind address options. |
754 | 1 | TEST_F(TabletServerTest, TestRpcServerRPCFlag) { |
755 | 1 | FLAGS_rpc_bind_addresses = "0.0.0.0:2000"; |
756 | 1 | server::RpcServerOptions opts; |
757 | 1 | ServerRegistrationPB reg; |
758 | 1 | auto tbo = ASSERT_RESULT(TabletServerOptions::CreateTabletServerOptions()); |
759 | 1 | MessengerBuilder mb("foo"); |
760 | 1 | auto messenger = CreateAutoShutdownMessengerHolder(ASSERT_RESULT(mb.Build())); |
761 | | |
762 | 1 | server::RpcServer server1( |
763 | 1 | "server1", opts, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>()); |
764 | 1 | ASSERT_OK(server1.Init(messenger.get())); |
765 | | |
766 | 1 | FLAGS_rpc_bind_addresses = "0.0.0.0:2000,0.0.0.1:2001"; |
767 | 1 | server::RpcServerOptions opts2; |
768 | 1 | server::RpcServer server2( |
769 | 1 | "server2", opts2, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>()); |
770 | 1 | ASSERT_OK(server2.Init(messenger.get())); |
771 | | |
772 | 1 | FLAGS_rpc_bind_addresses = "10.20.30.40:2017"; |
773 | 1 | server::RpcServerOptions opts3; |
774 | 1 | server::RpcServer server3( |
775 | 1 | "server3", opts3, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>()); |
776 | 1 | ASSERT_OK(server3.Init(messenger.get())); |
777 | | |
778 | 1 | reg.Clear(); |
779 | 1 | tbo.fs_opts.data_paths = { GetTestPath("fake-ts") }; |
780 | 1 | tbo.rpc_opts = opts3; |
781 | 1 | TabletServer server(tbo); |
782 | | |
783 | 1 | ASSERT_NO_FATALS(WARN_NOT_OK(server.Init(), "Ignore")); |
784 | | // This call will fail for http binding, but this test is for rpc. |
785 | 1 | ASSERT_NO_FATALS(WARN_NOT_OK(server.GetRegistration(®), "Ignore")); |
786 | 1 | ASSERT_EQ(1, reg.private_rpc_addresses().size()); |
787 | 1 | ASSERT_EQ("10.20.30.40", reg.private_rpc_addresses(0).host()); |
788 | 1 | ASSERT_EQ(2017, reg.private_rpc_addresses(0).port()); |
789 | | |
790 | 1 | reg.Clear(); |
791 | 1 | FLAGS_rpc_bind_addresses = "10.20.30.40:2017,20.30.40.50:2018"; |
792 | 1 | server::RpcServerOptions opts4; |
793 | 1 | tbo.rpc_opts = opts4; |
794 | 1 | TabletServer tserver2(tbo); |
795 | 1 | ASSERT_NO_FATALS(WARN_NOT_OK(tserver2.Init(), "Ignore")); |
796 | | // This call will fail for http binding, but this test is for rpc. |
797 | 1 | ASSERT_NO_FATALS(WARN_NOT_OK(tserver2.GetRegistration(®), "Ignore")); |
798 | 1 | ASSERT_EQ(2, reg.private_rpc_addresses().size()); |
799 | 1 | } |
800 | | |
801 | | // We are not checking if a row is out of bounds in YB because we are using a completely different |
802 | | // hash-based partitioning scheme, so this test is now actually testing that there is no error |
803 | | // returned from the server. If we introduce such range checking, this test could be enhanced to |
804 | | // test for it. |
805 | 1 | TEST_F(TabletServerTest, TestWriteOutOfBounds) { |
806 | 1 | const char *tabletId = "TestWriteOutOfBoundsTablet"; |
807 | 1 | Schema schema = SchemaBuilder(schema_).Build(); |
808 | | |
809 | 1 | PartitionSchema partition_schema; |
810 | 1 | CHECK_OK(PartitionSchema::FromPB(PartitionSchemaPB(), schema, &partition_schema)); |
811 | | |
812 | 1 | Partition partition; |
813 | 1 | auto table_info = std::make_shared<tablet::TableInfo>( |
814 | 1 | "TestWriteOutOfBoundsTable", "test_ns", tabletId, YQL_TABLE_TYPE, schema, IndexMap(), |
815 | 1 | boost::none /* index_info */, 0 /* schema_version */, partition_schema); |
816 | 1 | ASSERT_OK(mini_server_->server()->tablet_manager()->CreateNewTablet( |
817 | 1 | table_info, tabletId, partition, mini_server_->CreateLocalConfig())); |
818 | | |
819 | 1 | ASSERT_OK(WaitForTabletRunning(tabletId)); |
820 | | |
821 | 1 | WriteRequestPB req; |
822 | 1 | WriteResponsePB resp; |
823 | 1 | RpcController controller; |
824 | 1 | req.set_tablet_id(tabletId); |
825 | | |
826 | 2 | for (auto op : { QLWriteRequestPB::QL_STMT_INSERT, QLWriteRequestPB::QL_STMT_UPDATE }) { |
827 | 2 | AddTestRow(20, 1, "1", op, &req); |
828 | 2 | ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
829 | 2 | SCOPED_TRACE(resp.DebugString()); |
830 | 2 | ASSERT_FALSE(resp.has_error()); |
831 | 2 | req.clear_ql_write_batch(); |
832 | 2 | controller.Reset(); |
833 | 2 | } |
834 | 1 | } |
835 | | |
836 | | namespace { |
837 | | |
838 | 2 | void CalcTestRowChecksum(uint64_t *out, int32_t key, uint8_t string_field_defined = true) { |
839 | 2 | QLValue value; |
840 | | |
841 | 2 | string strval = strings::Substitute("original$0", key); |
842 | 2 | std::string buffer; |
843 | 2 | uint32_t index = 0; |
844 | 2 | buffer.append(pointer_cast<const char*>(&index), sizeof(index)); |
845 | 2 | value.set_int32_value(key); |
846 | 2 | value.value().AppendToString(&buffer); |
847 | | |
848 | 2 | index = 1; |
849 | 2 | buffer.append(pointer_cast<const char*>(&index), sizeof(index)); |
850 | 2 | value.value().AppendToString(&buffer); |
851 | | |
852 | 2 | index = 2; |
853 | 2 | buffer.append(pointer_cast<const char*>(&index), sizeof(index)); |
854 | 2 | buffer.append(pointer_cast<const char*>(&string_field_defined), sizeof(string_field_defined)); |
855 | 2 | if (string_field_defined) { |
856 | 1 | value.set_string_value(strval); |
857 | 1 | value.value().AppendToString(&buffer); |
858 | 1 | } |
859 | | |
860 | 2 | crc::Crc* crc = crc::GetCrc32cInstance(); |
861 | 2 | crc->Compute(buffer.c_str(), buffer.size(), out, nullptr); |
862 | 2 | } |
863 | | |
864 | | } // namespace |
865 | | |
866 | | // Simple test to check that our checksum scans work as expected. |
867 | 1 | TEST_F(TabletServerTest, TestChecksumScan) { |
868 | 1 | uint64_t total_crc = 0; |
869 | | |
870 | 1 | ChecksumRequestPB req; |
871 | 1 | req.set_tablet_id(kTabletId); |
872 | 1 | ChecksumResponsePB resp; |
873 | 1 | RpcController controller; |
874 | 1 | ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
875 | | |
876 | | // No rows. |
877 | 1 | ASSERT_EQ(total_crc, resp.checksum()); |
878 | | |
879 | | // First row. |
880 | 1 | int32_t key = 1; |
881 | 1 | InsertTestRowsRemote(0, key, 1); |
882 | 1 | controller.Reset(); |
883 | 1 | ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
884 | 1 | CalcTestRowChecksum(&total_crc, key); |
885 | 1 | uint64_t first_crc = total_crc; // Cache first record checksum. |
886 | | |
887 | 2 | ASSERT_FALSE(resp.has_error()) << resp.error().DebugString(); |
888 | 1 | ASSERT_EQ(total_crc, resp.checksum()); |
889 | | |
890 | | // Second row (null string field). |
891 | 1 | key = 2; |
892 | 1 | InsertTestRowsRemote(0, key, 1, 1, nullptr, kTabletId, nullptr, nullptr, false); |
893 | 1 | controller.Reset(); |
894 | 1 | ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
895 | 1 | CalcTestRowChecksum(&total_crc, key, false); |
896 | | |
897 | 2 | ASSERT_FALSE(resp.has_error()) << resp.error().DebugString(); |
898 | 1 | ASSERT_EQ(total_crc, resp.checksum()); |
899 | | |
900 | | // Finally, delete row 2, so we're back to the row 1 checksum. |
901 | 1 | ASSERT_NO_FATALS(DeleteTestRowsRemote(key, 1)); |
902 | 1 | FLAGS_scanner_batch_size_rows = 100; |
903 | 1 | controller.Reset(); |
904 | 1 | ASSERT_OK(proxy_->Checksum(req, &resp, &controller)); |
905 | 1 | ASSERT_NE(total_crc, resp.checksum()); |
906 | 1 | ASSERT_EQ(first_crc, resp.checksum()); |
907 | 1 | } |
908 | | |
909 | | } // namespace tserver |
910 | | } // namespace yb |