YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/tablet_server-test.cc
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include "yb/common/index.h"
34
#include "yb/common/partition.h"
35
#include "yb/common/ql_value.h"
36
#include "yb/common/wire_protocol.h"
37
38
#include "yb/consensus/log-test-base.h"
39
40
#include "yb/gutil/strings/escaping.h"
41
#include "yb/gutil/strings/substitute.h"
42
43
#include "yb/rpc/messenger.h"
44
#include "yb/rpc/rpc_controller.h"
45
#include "yb/rpc/rpc_test_util.h"
46
#include "yb/rpc/yb_rpc.h"
47
48
#include "yb/server/hybrid_clock.h"
49
#include "yb/server/server_base.pb.h"
50
#include "yb/server/server_base.proxy.h"
51
52
#include "yb/tablet/tablet.h"
53
#include "yb/tablet/tablet_metadata.h"
54
#include "yb/tablet/tablet_peer.h"
55
56
#include "yb/tserver/mini_tablet_server.h"
57
#include "yb/tserver/tablet_server-test-base.h"
58
#include "yb/tserver/tablet_server.h"
59
#include "yb/tserver/tablet_server_test_util.h"
60
#include "yb/tserver/ts_tablet_manager.h"
61
#include "yb/tserver/tserver_admin.proxy.h"
62
#include "yb/tserver/tserver_service.proxy.h"
63
64
#include "yb/util/crc.h"
65
#include "yb/util/curl_util.h"
66
#include "yb/util/metrics.h"
67
#include "yb/util/status_log.h"
68
69
using yb::consensus::RaftConfigPB;
70
using yb::consensus::RaftPeerPB;
71
using yb::rpc::Messenger;
72
using yb::rpc::MessengerBuilder;
73
using yb::rpc::RpcController;
74
using yb::server::Clock;
75
using yb::server::HybridClock;
76
using yb::tablet::Tablet;
77
using yb::tablet::TabletPeer;
78
using std::shared_ptr;
79
using std::string;
80
using strings::Substitute;
81
82
DEFINE_int32(single_threaded_insert_latency_bench_warmup_rows, 100,
83
             "Number of rows to insert in the warmup phase of the single threaded"
84
             " tablet server insert latency micro-benchmark");
85
86
DEFINE_int32(single_threaded_insert_latency_bench_insert_rows, 1000,
87
             "Number of rows to insert in the testing phase of the single threaded"
88
             " tablet server insert latency micro-benchmark");
89
90
DECLARE_int32(scanner_batch_size_rows);
91
DECLARE_int32(metrics_retirement_age_ms);
92
DECLARE_string(block_manager);
93
DECLARE_string(rpc_bind_addresses);
94
DECLARE_bool(disable_clock_sync_error);
95
96
// Declare these metrics prototypes for simpler unit testing of their behavior.
97
METRIC_DECLARE_counter(rows_inserted);
98
METRIC_DECLARE_counter(rows_updated);
99
METRIC_DECLARE_counter(rows_deleted);
100
101
namespace yb {
102
namespace tserver {
103
104
class TabletServerTest : public TabletServerTestBase {
105
 public:
106
  explicit TabletServerTest(TableType table_type = YQL_TABLE_TYPE)
107
17
      : TabletServerTestBase(table_type) {}
108
109
  // Starts the tablet server, override to start it later.
110
17
  void SetUp() override {
111
17
    TabletServerTestBase::SetUp();
112
17
    StartTabletServer();
113
17
  }
114
115
  CHECKED_STATUS CallDeleteTablet(const std::string& uuid,
116
                    const char* tablet_id,
117
2
                    tablet::TabletDataState state) {
118
2
    DeleteTabletRequestPB req;
119
2
    DeleteTabletResponsePB resp;
120
2
    RpcController rpc;
121
122
2
    req.set_dest_uuid(uuid);
123
2
    req.set_tablet_id(tablet_id);
124
2
    req.set_delete_type(state);
125
126
    // Send the call
127
2
    {
128
2
      SCOPED_TRACE(req.DebugString());
129
2
      RETURN_NOT_OK(admin_proxy_->DeleteTablet(req, &resp, &rpc));
130
2
      SCOPED_TRACE(resp.DebugString());
131
2
      if (resp.has_error()) {
132
1
        auto status = StatusFromPB(resp.error().status());
133
1
        RETURN_NOT_OK(status);
134
1
      }
135
2
    }
136
1
    return Status::OK();
137
2
  }
138
};
139
140
1
TEST_F(TabletServerTest, TestPingServer) {
141
  // Ping the server.
142
1
  server::PingRequestPB req;
143
1
  server::PingResponsePB resp;
144
1
  RpcController controller;
145
1
  ASSERT_OK(generic_proxy_->Ping(req, &resp, &controller));
146
1
}
147
148
1
TEST_F(TabletServerTest, TestServerClock) {
149
1
  server::ServerClockRequestPB req;
150
1
  server::ServerClockResponsePB resp;
151
1
  RpcController controller;
152
153
1
  ASSERT_OK(generic_proxy_->ServerClock(req, &resp, &controller));
154
1
  ASSERT_GT(mini_server_->server()->clock()->Now().ToUint64(), resp.hybrid_time());
155
1
}
156
157
1
TEST_F(TabletServerTest, TestSetFlagsAndCheckWebPages) {
158
1
  server::GenericServiceProxy proxy(
159
1
      proxy_cache_.get(), HostPort::FromBoundEndpoint(mini_server_->bound_rpc_addr()));
160
161
1
  server::SetFlagRequestPB req;
162
1
  server::SetFlagResponsePB resp;
163
164
  // Set an invalid flag.
165
1
  {
166
1
    RpcController controller;
167
1
    req.set_flag("foo");
168
1
    req.set_value("bar");
169
1
    ASSERT_OK(proxy.SetFlag(req, &resp, &controller));
170
1
    SCOPED_TRACE(resp.DebugString());
171
1
    EXPECT_EQ(server::SetFlagResponsePB::NO_SUCH_FLAG, resp.result());
172
1
    EXPECT_TRUE(resp.msg().empty());
173
1
  }
174
175
  // Set a valid flag to a valid value.
176
1
  {
177
1
    int32_t old_val = FLAGS_metrics_retirement_age_ms;
178
1
    RpcController controller;
179
1
    req.set_flag("metrics_retirement_age_ms");
180
1
    req.set_value("12345");
181
1
    ASSERT_OK(proxy.SetFlag(req, &resp, &controller));
182
1
    SCOPED_TRACE(resp.DebugString());
183
1
    EXPECT_EQ(server::SetFlagResponsePB::SUCCESS, resp.result());
184
1
    EXPECT_EQ(resp.msg(), "metrics_retirement_age_ms set to 12345\n");
185
1
    EXPECT_EQ(Substitute("$0", old_val), resp.old_value());
186
1
    EXPECT_EQ(12345, FLAGS_metrics_retirement_age_ms);
187
1
  }
188
189
  // Set a valid flag to an invalid value.
190
1
  {
191
1
    RpcController controller;
192
1
    req.set_flag("metrics_retirement_age_ms");
193
1
    req.set_value("foo");
194
1
    ASSERT_OK(proxy.SetFlag(req, &resp, &controller));
195
1
    SCOPED_TRACE(resp.DebugString());
196
1
    EXPECT_EQ(server::SetFlagResponsePB::BAD_VALUE, resp.result());
197
1
    EXPECT_EQ(resp.msg(), "Unable to set flag: bad value");
198
1
    EXPECT_EQ(12345, FLAGS_metrics_retirement_age_ms);
199
1
  }
200
201
  // Try setting a flag which isn't runtime-modifiable
202
1
  {
203
1
    RpcController controller;
204
1
    req.set_flag("tablet_do_dup_key_checks");
205
1
    req.set_value("true");
206
1
    ASSERT_OK(proxy.SetFlag(req, &resp, &controller));
207
1
    SCOPED_TRACE(resp.DebugString());
208
1
    EXPECT_EQ(server::SetFlagResponsePB::NOT_SAFE, resp.result());
209
1
  }
210
211
  // Try again, but with the force flag.
212
1
  {
213
1
    RpcController controller;
214
1
    req.set_flag("tablet_do_dup_key_checks");
215
1
    req.set_value("true");
216
1
    req.set_force(true);
217
1
    ASSERT_OK(proxy.SetFlag(req, &resp, &controller));
218
1
    SCOPED_TRACE(resp.DebugString());
219
1
    EXPECT_EQ(server::SetFlagResponsePB::SUCCESS, resp.result());
220
1
  }
221
222
1
  EasyCurl c;
223
1
  faststring buf;
224
1
  string addr = yb::ToString(mini_server_->bound_http_addr());
225
226
  // Tablets page should list tablet.
227
1
  ASSERT_OK(c.FetchURL(Substitute("http://$0/tablets", addr),
228
1
                       &buf));
229
1
  ASSERT_STR_CONTAINS(buf.ToString(), kTabletId);
230
231
  // Tablet page should include the schema.
232
1
  ASSERT_OK(c.FetchURL(Substitute("http://$0/tablet?id=$1", addr, kTabletId),
233
1
                       &buf));
234
1
  ASSERT_STR_CONTAINS(buf.ToString(), "<th>key</th>");
235
1
  ASSERT_STR_CONTAINS(buf.ToString(), "<td>string NULLABLE NOT A PARTITION KEY</td>");
236
237
1
  ASSERT_OK(c.FetchURL(Substitute("http://$0/tablet-consensus-status?id=$1",
238
1
                       addr, kTabletId), &buf));
239
1
  ASSERT_STR_CONTAINS(buf.ToString(), kTabletId);
240
241
  // Test fetching metrics.
242
  // Fetching metrics has the side effect of retiring metrics, but not in a single pass.
243
  // So, we check a couple of times in a loop -- thus, if we had a bug where one of these
244
  // metrics was accidentally un-referenced too early, we'd cause it to get retired.
245
  // If the metrics survive several passes of fetching, then we are pretty sure they will
246
  // stick around properly for the whole lifetime of the server.
247
1
  FLAGS_metrics_retirement_age_ms = 0;
248
4
  for (int i = 0; i < 3; i++) {
249
3
    SCOPED_TRACE(i);
250
3
    ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/jsonmetricz", addr, kTabletId),
251
3
                                &buf));
252
253
    // Check that the tablet entry shows up.
254
3
    ASSERT_STR_CONTAINS(buf.ToString(), "\"type\": \"tablet\"");
255
3
    ASSERT_STR_CONTAINS(buf.ToString(), "\"id\": \"test-tablet\"");
256
257
    // Check entity attributes.
258
3
    ASSERT_STR_CONTAINS(buf.ToString(), "\"table_name\": \"test-table\"");
259
260
    // Check for the existence of some particular metrics for which we've had early-retirement
261
    // bugs in the past.
262
3
    ASSERT_STR_CONTAINS(buf.ToString(), "hybrid_clock_hybrid_time");
263
3
    ASSERT_STR_CONTAINS(buf.ToString(), "threads_started");
264
#ifdef TCMALLOC_ENABLED
265
    ASSERT_STR_CONTAINS(buf.ToString(), "tcmalloc_max_total_thread_cache_bytes");
266
#endif
267
3
    ASSERT_STR_CONTAINS(buf.ToString(), "glog_info_messages");
268
3
  }
269
270
  // Smoke-test the tracing infrastructure.
271
1
  ASSERT_OK(c.FetchURL(
272
1
                Substitute("http://$0/tracing/json/get_buffer_percent_full", addr, kTabletId),
273
1
                &buf));
274
1
  ASSERT_EQ(buf.ToString(), "0");
275
276
1
  string enable_req_json = "{\"categoryFilter\":\"*\", \"useContinuousTracing\": \"true\","
277
1
    " \"useSampling\": \"false\"}";
278
1
  string req_b64;
279
1
  Base64Escape(enable_req_json, &req_b64);
280
281
1
  ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/begin_recording?$1",
282
1
                                         addr,
283
1
                                         req_b64), &buf));
284
1
  ASSERT_EQ(buf.ToString(), "");
285
1
  ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/end_recording", addr),
286
1
                       &buf));
287
1
  ASSERT_STR_CONTAINS(buf.ToString(), "__metadata");
288
1
  ASSERT_OK(c.FetchURL(Substitute("http://$0/tracing/json/categories", addr),
289
1
                       &buf));
290
1
  ASSERT_STR_CONTAINS(buf.ToString(), "\"rpc\"");
291
292
  // Smoke test the pprof contention profiler handler.
293
1
  ASSERT_OK(c.FetchURL(Substitute("http://$0/pprof/contention?seconds=1", addr),
294
1
                       &buf));
295
1
  ASSERT_STR_CONTAINS(buf.ToString(), "Discarded samples = 0");
296
#if defined(__linux__)
297
  // The executable name appears as part of the dump of /proc/self/maps, which
298
  // only exists on Linux.
299
  ASSERT_STR_CONTAINS(buf.ToString(), "tablet_server-test");
300
#endif
301
1
}
302
303
1
TEST_F(TabletServerTest, TestInsert) {
304
1
  WriteRequestPB req;
305
306
1
  req.set_tablet_id(kTabletId);
307
308
1
  WriteResponsePB resp;
309
1
  RpcController controller;
310
311
1
  std::shared_ptr<TabletPeer> tablet;
312
1
  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
313
1
  scoped_refptr<Counter> rows_inserted =
314
1
      METRIC_rows_inserted.Instantiate(tablet->tablet()->GetTabletMetricsEntity());
315
1
  ASSERT_EQ(0, rows_inserted->value());
316
1
  tablet.reset();
317
318
  // Send an empty request.
319
  // This should succeed and do nothing.
320
1
  {
321
1
    controller.Reset();
322
1
    SCOPED_TRACE(req.DebugString());
323
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
324
1
    SCOPED_TRACE(resp.DebugString());
325
1
    ASSERT_FALSE(resp.has_error());
326
1
  }
327
328
  // Send an actual row insert.
329
1
  {
330
1
    controller.Reset();
331
1
    AddTestRowInsert(1234, 5678, "hello world via RPC", &req);
332
1
    SCOPED_TRACE(req.DebugString());
333
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
334
1
    SCOPED_TRACE(resp.DebugString());
335
1
    ASSERT_FALSE(resp.has_error());
336
1
    req.clear_ql_write_batch();
337
1
  }
338
339
  // Send a batch with multiple rows, one of which is a duplicate of
340
  // the above insert. The response must carry no top-level error, and the original row is kept.
341
1
  {
342
1
    controller.Reset();
343
1
    AddTestRowInsert(1, 1, "ceci n'est pas une dupe", &req);
344
1
    AddTestRowInsert(2, 1, "also not a dupe key", &req);
345
1
    AddTestRowInsert(1234, 1, "I am a duplicate key", &req);
346
1
    SCOPED_TRACE(req.DebugString());
347
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
348
1
    SCOPED_TRACE(resp.DebugString());
349
2
    ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
350
1
  }
351
352
  // get the clock's current hybrid_time
353
1
  HybridTime now_before = mini_server_->server()->clock()->Now();
354
355
1
  rows_inserted = nullptr;
356
1
  ASSERT_OK(ShutdownAndRebuildTablet());
357
1
  VerifyRows(schema_, { KeyValue(1, 1), KeyValue(2, 1), KeyValue(1234, 5678) });
358
359
  // get the clock's hybrid_time after replay
360
1
  HybridTime now_after = mini_server_->server()->clock()->Now();
361
362
  // make sure 'now_after' is greater than or equal to 'now_before'
363
1
  ASSERT_GE(now_after.value(), now_before.value());
364
1
}
365
366
1
TEST_F(TabletServerTest, TestExternalConsistencyModes_ClientPropagated) {
367
1
  WriteRequestPB req;
368
1
  req.set_tablet_id(kTabletId);
369
1
  WriteResponsePB resp;
370
1
  RpcController controller;
371
372
1
  std::shared_ptr<TabletPeer> tablet;
373
1
  ASSERT_TRUE(
374
1
      mini_server_->server()->tablet_manager()->LookupTablet(kTabletId,
375
1
                                                             &tablet));
376
  // get the current time
377
1
  HybridTime current = mini_server_->server()->clock()->Now();
378
  // advance current to some time in the future. we do 5 secs to make
379
  // sure this hybrid_time will still be in the future when it reaches the
380
  // server.
381
1
  current = HybridClock::HybridTimeFromMicroseconds(
382
1
      HybridClock::GetPhysicalValueMicros(current) + 5000000);
383
384
1
  AddTestRowInsert(1234, 5678, "hello world via RPC", &req);
385
386
1
  req.set_propagated_hybrid_time(current.ToUint64());
387
1
  SCOPED_TRACE(req.DebugString());
388
1
  ASSERT_OK(proxy_->Write(req, &resp, &controller));
389
1
  SCOPED_TRACE(resp.DebugString());
390
1
  ASSERT_FALSE(resp.has_error());
391
392
  // make sure the server returned a write hybrid_time where only
393
  // the logical value was increased since it should have updated
394
  // its clock with the client's value.
395
1
  HybridTime write_hybrid_time(resp.propagated_hybrid_time());
396
397
1
  ASSERT_EQ(HybridClock::GetPhysicalValueMicros(current),
398
1
            HybridClock::GetPhysicalValueMicros(write_hybrid_time));
399
400
1
  ASSERT_LE(HybridClock::GetLogicalValue(current) + 1,
401
1
            HybridClock::GetLogicalValue(write_hybrid_time));
402
1
}
403
404
1
TEST_F(TabletServerTest, TestInsertAndMutate) {
405
406
1
  std::shared_ptr<TabletPeer> tablet;
407
1
  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
408
1
  tablet.reset();
409
410
1
  RpcController controller;
411
412
1
  {
413
1
    WriteRequestPB req;
414
1
    WriteResponsePB resp;
415
1
    req.set_tablet_id(kTabletId);
416
417
1
    AddTestRowInsert(1, 1, "original1", &req);
418
1
    AddTestRowInsert(2, 2, "original2", &req);
419
1
    AddTestRowInsert(3, 3, "original3", &req);
420
1
    SCOPED_TRACE(req.DebugString());
421
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
422
1
    SCOPED_TRACE(resp.DebugString());
423
2
    ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
424
1
    controller.Reset();
425
1
  }
426
427
  // Try and mutate the rows inserted above
428
1
  {
429
1
    WriteRequestPB req;
430
1
    WriteResponsePB resp;
431
1
    req.set_tablet_id(kTabletId);
432
433
1
    AddTestRowUpdate(1, 2, "mutation1", &req);
434
1
    AddTestRowUpdate(2, 3, "mutation2", &req);
435
1
    AddTestRowUpdate(3, 4, "mutation3", &req);
436
1
    SCOPED_TRACE(req.DebugString());
437
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
438
1
    SCOPED_TRACE(resp.DebugString());
439
2
    ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
440
1
    controller.Reset();
441
1
  }
442
443
  // Try and mutate a non existent row key (should not get an error)
444
1
  {
445
1
    WriteRequestPB req;
446
1
    WriteResponsePB resp;
447
1
    req.set_tablet_id(kTabletId);
448
449
1
    AddTestRowUpdate(1234, 2, "mutated", &req);
450
1
    SCOPED_TRACE(req.DebugString());
451
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
452
1
    SCOPED_TRACE(resp.DebugString());
453
2
    ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
454
1
    controller.Reset();
455
1
  }
456
457
  // Try and delete 1 row
458
1
  {
459
1
    WriteRequestPB req;
460
1
    WriteResponsePB resp;
461
1
    req.set_tablet_id(kTabletId);
462
463
1
    AddTestRowDelete(1, &req);
464
1
    SCOPED_TRACE(req.DebugString());
465
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
466
1
    SCOPED_TRACE(resp.DebugString());
467
2
    ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
468
1
    controller.Reset();
469
1
  }
470
471
  // Now try and mutate a row we just deleted, we should not get an error
472
1
  {
473
1
    WriteRequestPB req;
474
1
    WriteResponsePB resp;
475
1
    req.set_tablet_id(kTabletId);
476
477
1
    AddTestRowUpdate(1, 2, "mutated1", &req);
478
1
    SCOPED_TRACE(req.DebugString());
479
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
480
1
    SCOPED_TRACE(resp.DebugString());
481
2
    ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
482
1
    controller.Reset();
483
1
  }
484
485
  // At this point, four rows are expected: the updates above also (re)created rows 1 and 1234.
486
1
  VerifyRows(schema_, {KeyValue(1, 2), KeyValue(2, 3), KeyValue(3, 4), KeyValue(1234, 2)});
487
488
  // Do a mixed operation (some insert, update, and delete, some of which fail)
489
1
  {
490
1
    WriteRequestPB req;
491
1
    WriteResponsePB resp;
492
1
    req.set_tablet_id(kTabletId);
493
494
    // op 0: Mutate row 1 (deleted earlier, then re-created by the update above). This should not fail.
495
1
    AddTestRowUpdate(1, 3, "mutate_should_not_fail", &req);
496
    // op 1: Insert a new row 4 (succeeds)
497
1
    AddTestRowInsert(4, 4, "new row 4", &req);
498
    // op 2: Delete a non-existent row 5 (should fail)
499
1
    AddTestRowDelete(5, &req);
500
    // op 3: Insert a new row 6 (succeeds)
501
1
    AddTestRowInsert(6, 6, "new row 6", &req);
502
503
1
    SCOPED_TRACE(req.DebugString());
504
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
505
1
    SCOPED_TRACE(resp.DebugString());
506
2
    ASSERT_FALSE(resp.has_error()) << resp.ShortDebugString();
507
1
    controller.Reset();
508
1
  }
509
510
  // get the clock's current hybrid_time
511
1
  HybridTime now_before = mini_server_->server()->clock()->Now();
512
513
1
  ASSERT_NO_FATALS(WARN_NOT_OK(ShutdownAndRebuildTablet(), "Shutdown failed: "));
514
1
  VerifyRows(schema_,
515
1
      { KeyValue(1, 3), KeyValue(2, 3), KeyValue(3, 4), KeyValue(4, 4), KeyValue(6, 6),
516
1
        KeyValue(1234, 2) });
517
518
  // get the clock's hybrid_time after replay
519
1
  HybridTime now_after = mini_server_->server()->clock()->Now();
520
521
  // make sure 'now_after' is greater than or equal to 'now_before'
522
1
  ASSERT_GE(now_after.value(), now_before.value());
523
1
}
524
525
// Test that a write whose schema version does not match the tablet's schema version
526
// is answered with YQL_STATUS_SCHEMA_VERSION_MISMATCH rather than a response-level error.
527
1
TEST_F(TabletServerTest, TestInvalidWriteRequest_BadSchema) {
528
1
  SchemaBuilder schema_builder(schema_);
529
1
  ASSERT_OK(schema_builder.AddColumn("col_doesnt_exist", INT32));
530
1
  Schema bad_schema_with_ids = schema_builder.Build();
531
1
  Schema bad_schema = schema_builder.BuildWithoutIds();
532
533
  // Send a row insert with a mismatched schema version
534
1
  {
535
1
    WriteRequestPB req;
536
1
    WriteResponsePB resp;
537
1
    RpcController controller;
538
539
1
    req.set_tablet_id(kTabletId);
540
541
1
    AddTestRowInsert(1234, 5678, "hello world via RPC", &req);
542
1
    req.mutable_ql_write_batch(0)->set_schema_version(1);
543
544
1
    SCOPED_TRACE(req.DebugString());
545
1
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
546
1
    SCOPED_TRACE(resp.DebugString());
547
1
    ASSERT_FALSE(resp.has_error());
548
1
    ASSERT_EQ(QLResponsePB::YQL_STATUS_SCHEMA_VERSION_MISMATCH, resp.ql_response_batch(0).status());
549
1
  }
550
1
}
551
552
1
TEST_F(TabletServerTest, TestClientGetsErrorBackWhenRecoveryFailed) {
553
1
  ASSERT_NO_FATALS(InsertTestRowsRemote(0, 1, 7));
554
555
1
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
556
557
  // Save the log path before shutting down the tablet (and destroying
558
  // the tablet peer).
559
1
  string log_path = tablet_peer_->log()->ActiveSegmentForTests()->path();
560
1
  auto idx = tablet_peer_->log()->ActiveSegmentForTests()->first_entry_offset() + 300;
561
562
1
  ShutdownTablet();
563
1
  ASSERT_OK(log::CorruptLogFile(env_.get(), log_path, log::FLIP_BYTE, idx));
564
565
1
  ASSERT_FALSE(ShutdownAndRebuildTablet().ok());
566
567
  // Connect to it.
568
1
  CreateTsClientProxies(HostPort::FromBoundEndpoint(mini_server_->bound_rpc_addr()),
569
1
                        proxy_cache_.get(),
570
1
                        &proxy_, &admin_proxy_, &consensus_proxy_, &generic_proxy_);
571
572
1
  WriteRequestPB req;
573
1
  req.set_tablet_id(kTabletId);
574
575
1
  WriteResponsePB resp;
576
1
  rpc::RpcController controller;
577
578
  // We're expecting the write to fail.
579
1
  ASSERT_OK(DCHECK_NOTNULL(proxy_.get())->Write(req, &resp, &controller));
580
1
  ASSERT_EQ(TabletServerErrorPB::TABLET_NOT_RUNNING, resp.error().code());
581
1
  ASSERT_STR_CONTAINS(resp.error().status().message(), "Tablet not RUNNING: FAILED");
582
1
}
583
584
1
TEST_F(TabletServerTest, TestCreateTablet_TabletExists) {
585
1
  CreateTabletRequestPB req;
586
1
  CreateTabletResponsePB resp;
587
1
  RpcController rpc;
588
589
1
  req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid());
590
1
  req.set_table_id("testtb");
591
1
  req.set_tablet_id(kTabletId);
592
1
  req.set_table_name("testtb");
593
1
  req.mutable_config()->CopyFrom(mini_server_->CreateLocalConfig());
594
595
1
  Schema schema = SchemaBuilder(schema_).Build();
596
1
  SchemaToPB(schema, req.mutable_schema());
597
598
  // Send the call
599
1
  {
600
1
    SCOPED_TRACE(req.DebugString());
601
1
    ASSERT_OK(admin_proxy_->CreateTablet(req, &resp, &rpc));
602
1
    SCOPED_TRACE(resp.DebugString());
603
1
    ASSERT_TRUE(resp.has_error());
604
1
    ASSERT_EQ(TabletServerErrorPB::TABLET_ALREADY_EXISTS, resp.error().code());
605
1
  }
606
1
}
607
608
1
TEST_F(TabletServerTest, TestDeleteTablet) {
609
1
  std::shared_ptr<TabletPeer> tablet;
610
611
  // Verify that the tablet exists
612
1
  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
613
614
  // Put some data in the tablet. We flush and insert more rows to ensure that
615
  // there is data both in the MRS and on disk.
616
1
  ASSERT_NO_FATALS(InsertTestRowsRemote(0, 1, 1));
617
1
  ASSERT_OK(tablet_peer_->tablet()->Flush(tablet::FlushMode::kSync));
618
1
  ASSERT_NO_FATALS(InsertTestRowsRemote(0, 2, 1));
619
620
  // Drop any local references to the tablet from within this test,
621
  // so that when we delete it on the server, it's not held alive
622
  // by the test code.
623
1
  tablet_peer_.reset();
624
1
  tablet.reset();
625
626
1
  ASSERT_OK(CallDeleteTablet(mini_server_->server()->fs_manager()->uuid(),
627
1
                             kTabletId,
628
1
                             tablet::TABLET_DATA_DELETED));
629
630
  // Verify that the tablet is removed from the tablet map
631
1
  ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
632
633
  // Verify that fetching metrics doesn't crash. Regression test for KUDU-638.
634
1
  EasyCurl c;
635
1
  faststring buf;
636
1
  ASSERT_OK(c.FetchURL(strings::Substitute("http://$0/jsonmetricz",
637
1
                                           AsString(mini_server_->bound_http_addr())),
638
1
                                           &buf));
639
640
  // Verify that after restarting the TS, the tablet is still not in the tablet manager.
641
  // This ensures that the on-disk metadata got removed.
642
1
  Status s = ShutdownAndRebuildTablet();
643
2
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
644
1
  ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
645
1
}
646
647
1
TEST_F(TabletServerTest, TestDeleteTablet_TabletNotCreated) {
648
1
  Status s = CallDeleteTablet(mini_server_->server()->fs_manager()->uuid(),
649
1
                              "NotPresentTabletId",
650
1
                              tablet::TABLET_DATA_DELETED);
651
2
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
652
1
}
653
654
// Test that with concurrent requests to delete the same tablet, one wins and
655
// the other fails, with no assertion failures. Regression test for KUDU-345.
656
1
TEST_F(TabletServerTest, TestConcurrentDeleteTablet) {
657
  // Verify that the tablet exists
658
1
  std::shared_ptr<TabletPeer> tablet;
659
1
  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
660
661
1
  static const int kNumDeletes = 2;
662
1
  RpcController rpcs[kNumDeletes];
663
1
  DeleteTabletResponsePB responses[kNumDeletes];
664
1
  CountDownLatch latch(kNumDeletes);
665
666
1
  DeleteTabletRequestPB req;
667
1
  req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid());
668
1
  req.set_tablet_id(kTabletId);
669
1
  req.set_delete_type(tablet::TABLET_DATA_DELETED);
670
671
3
  for (int i = 0; i < kNumDeletes; i++) {
672
2
    SCOPED_TRACE(req.DebugString());
673
2
    admin_proxy_->DeleteTabletAsync(
674
2
        req, &responses[i], &rpcs[i], [&latch]() { latch.CountDown(); });
675
2
  }
676
1
  latch.Wait();
677
678
1
  int num_success = 0;
679
3
  for (int i = 0; i < kNumDeletes; i++) {
680
2
    ASSERT_TRUE(rpcs[i].finished());
681
2
    LOG(INFO) << "STATUS " << i << ": " << rpcs[i].status().ToString();
682
2
    LOG(INFO) << "RESPONSE " << i << ": " << responses[i].DebugString();
683
2
    if (!responses[i].has_error()) {
684
1
      num_success++;
685
1
    }
686
2
  }
687
688
  // Verify that the tablet is removed from the tablet map
689
1
  ASSERT_FALSE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId, &tablet));
690
1
  ASSERT_EQ(1, num_success);
691
1
}
692
693
1
TEST_F(TabletServerTest, TestInsertLatencyMicroBenchmark) {  // Single-threaded insert latency micro-benchmark.
694
1
  METRIC_DEFINE_entity(test);
695
1
  METRIC_DEFINE_coarse_histogram(test, insert_latency,
696
1
                          "Insert Latency",
697
1
                          MetricUnit::kMicroseconds,
698
1
                          "TabletServer single threaded insert latency.");
699
700
1
  scoped_refptr<Histogram> histogram = METRIC_insert_latency.Instantiate(ts_test_metric_entity_);
701
702
1
  auto warmup = AllowSlowTests() ?  // Fewer rows unless slow tests are allowed.
703
1
      FLAGS_single_threaded_insert_latency_bench_warmup_rows : 10;
704
705
11
  for (int i = 0; i < warmup; i++) {  // Untimed warmup phase.
706
10
    InsertTestRowsRemote(0, i, 1);
707
10
  }
708
709
1
  auto max_rows = AllowSlowTests() ?
710
1
      FLAGS_single_threaded_insert_latency_bench_insert_rows : 100;
711
712
1
  MonoTime start = MonoTime::Now();
713
714
101
  for (int i = warmup; i < warmup + max_rows; i++) {  // Timed phase: exactly max_rows iterations.
715
100
    MonoTime before = MonoTime::Now();
716
100
    InsertTestRowsRemote(0, i, 1);
717
100
    MonoTime after = MonoTime::Now();
718
100
    MonoDelta delta = after.GetDeltaSince(before);
719
100
    histogram->Increment(delta.ToMicroseconds());  // One latency sample (in microseconds) per insert.
720
100
  }
721
722
1
  MonoTime end = MonoTime::Now();
723
1
  double throughput = (max_rows * 1.0) / end.GetDeltaSince(start).ToSeconds();  // Timed loop ran max_rows inserts.
724
725
  // Generate the JSON.
726
1
  std::stringstream out;
727
1
  JsonWriter writer(&out, JsonWriter::PRETTY);
728
1
  ASSERT_OK(histogram->WriteAsJson(&writer, MetricJsonOptions()));
729
730
1
  LOG(INFO) << "Throughput: " << throughput << " rows/sec.";
731
1
  LOG(INFO) << out.str();
732
1
}
733
734
// Simple test to ensure we can destroy an RpcServer in different states of
735
// initialization before Start()ing it.
736
1
TEST_F(TabletServerTest, TestRpcServerCreateDestroy) {
737
1
  server::RpcServerOptions opts;
738
1
  {
739
1
    server::RpcServer server1(
740
1
        "server1", opts, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>());
741
1
  }
742
1
  {
743
1
    MessengerBuilder mb("foo");
744
1
    auto messenger = rpc::CreateAutoShutdownMessengerHolder(ASSERT_RESULT(mb.Build()));
745
1
    {
746
1
      server::RpcServer server2(
747
1
          "server2", opts, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>());
748
1
      ASSERT_OK(server2.Init(messenger.get()));
749
1
    }
750
1
  }
751
1
}
752
753
// Simple test to ensure we can create RpcServer with different bind address options.
754
1
TEST_F(TabletServerTest, TestRpcServerRPCFlag) {
755
1
  FLAGS_rpc_bind_addresses = "0.0.0.0:2000";
756
1
  server::RpcServerOptions opts;
757
1
  ServerRegistrationPB reg;
758
1
  auto tbo = ASSERT_RESULT(TabletServerOptions::CreateTabletServerOptions());
759
1
  MessengerBuilder mb("foo");
760
1
  auto messenger = CreateAutoShutdownMessengerHolder(ASSERT_RESULT(mb.Build()));
761
762
1
  server::RpcServer server1(
763
1
      "server1", opts, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>());
764
1
  ASSERT_OK(server1.Init(messenger.get()));
765
766
1
  FLAGS_rpc_bind_addresses = "0.0.0.0:2000,0.0.0.1:2001";
767
1
  server::RpcServerOptions opts2;
768
1
  server::RpcServer server2(
769
1
      "server2", opts2, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>());
770
1
  ASSERT_OK(server2.Init(messenger.get()));
771
772
1
  FLAGS_rpc_bind_addresses = "10.20.30.40:2017";
773
1
  server::RpcServerOptions opts3;
774
1
  server::RpcServer server3(
775
1
      "server3", opts3, rpc::CreateConnectionContextFactory<rpc::YBInboundConnectionContext>());
776
1
  ASSERT_OK(server3.Init(messenger.get()));
777
778
1
  reg.Clear();
779
1
  tbo.fs_opts.data_paths = { GetTestPath("fake-ts") };
780
1
  tbo.rpc_opts = opts3;
781
1
  TabletServer server(tbo);
782
783
1
  ASSERT_NO_FATALS(WARN_NOT_OK(server.Init(), "Ignore"));
784
  // This call will fail for http binding, but this test is for rpc.
785
1
  ASSERT_NO_FATALS(WARN_NOT_OK(server.GetRegistration(&reg), "Ignore"));
786
1
  ASSERT_EQ(1, reg.private_rpc_addresses().size());
787
1
  ASSERT_EQ("10.20.30.40", reg.private_rpc_addresses(0).host());
788
1
  ASSERT_EQ(2017, reg.private_rpc_addresses(0).port());
789
790
1
  reg.Clear();
791
1
  FLAGS_rpc_bind_addresses = "10.20.30.40:2017,20.30.40.50:2018";
792
1
  server::RpcServerOptions opts4;
793
1
  tbo.rpc_opts = opts4;
794
1
  TabletServer tserver2(tbo);
795
1
  ASSERT_NO_FATALS(WARN_NOT_OK(tserver2.Init(), "Ignore"));
796
  // This call will fail for http binding, but this test is for rpc.
797
1
  ASSERT_NO_FATALS(WARN_NOT_OK(tserver2.GetRegistration(&reg), "Ignore"));
798
1
  ASSERT_EQ(2, reg.private_rpc_addresses().size());
799
1
}
800
801
// We are not checking if a row is out of bounds in YB because we are using a completely different
802
// hash-based partitioning scheme, so this test is now actually testing that there is no error
803
// returned from the server. If we introduce such range checking, this test could be enhanced to
804
// test for it.
805
1
TEST_F(TabletServerTest, TestWriteOutOfBounds) {
806
1
  const char *tabletId = "TestWriteOutOfBoundsTablet";
807
1
  Schema schema = SchemaBuilder(schema_).Build();
808
809
1
  PartitionSchema partition_schema;
810
1
  CHECK_OK(PartitionSchema::FromPB(PartitionSchemaPB(), schema, &partition_schema));
811
812
1
  Partition partition;
813
1
  auto table_info = std::make_shared<tablet::TableInfo>(
814
1
      "TestWriteOutOfBoundsTable", "test_ns", tabletId, YQL_TABLE_TYPE, schema, IndexMap(),
815
1
      boost::none /* index_info */, 0 /* schema_version */, partition_schema);
816
1
  ASSERT_OK(mini_server_->server()->tablet_manager()->CreateNewTablet(
817
1
      table_info, tabletId, partition, mini_server_->CreateLocalConfig()));
818
819
1
  ASSERT_OK(WaitForTabletRunning(tabletId));
820
821
1
  WriteRequestPB req;
822
1
  WriteResponsePB resp;
823
1
  RpcController controller;
824
1
  req.set_tablet_id(tabletId);
825
826
2
  for (auto op : { QLWriteRequestPB::QL_STMT_INSERT, QLWriteRequestPB::QL_STMT_UPDATE }) {
827
2
    AddTestRow(20, 1, "1", op, &req);
828
2
    ASSERT_OK(proxy_->Write(req, &resp, &controller));
829
2
    SCOPED_TRACE(resp.DebugString());
830
2
    ASSERT_FALSE(resp.has_error());
831
2
    req.clear_ql_write_batch();
832
2
    controller.Reset();
833
2
  }
834
1
}
835
836
namespace {
837
838
2
void CalcTestRowChecksum(uint64_t *out, int32_t key, uint8_t string_field_defined = true) {
839
2
  QLValue value;
840
841
2
  string strval = strings::Substitute("original$0", key);
842
2
  std::string buffer;
843
2
  uint32_t index = 0;
844
2
  buffer.append(pointer_cast<const char*>(&index), sizeof(index));
845
2
  value.set_int32_value(key);
846
2
  value.value().AppendToString(&buffer);
847
848
2
  index = 1;
849
2
  buffer.append(pointer_cast<const char*>(&index), sizeof(index));
850
2
  value.value().AppendToString(&buffer);
851
852
2
  index = 2;
853
2
  buffer.append(pointer_cast<const char*>(&index), sizeof(index));
854
2
  buffer.append(pointer_cast<const char*>(&string_field_defined), sizeof(string_field_defined));
855
2
  if (string_field_defined) {
856
1
    value.set_string_value(strval);
857
1
    value.value().AppendToString(&buffer);
858
1
  }
859
860
2
  crc::Crc* crc = crc::GetCrc32cInstance();
861
2
  crc->Compute(buffer.c_str(), buffer.size(), out, nullptr);
862
2
}
863
864
} // namespace
865
866
// Simple test to check that our checksum scans work as expected.
867
1
TEST_F(TabletServerTest, TestChecksumScan) {
868
1
  uint64_t total_crc = 0;
869
870
1
  ChecksumRequestPB req;
871
1
  req.set_tablet_id(kTabletId);
872
1
  ChecksumResponsePB resp;
873
1
  RpcController controller;
874
1
  ASSERT_OK(proxy_->Checksum(req, &resp, &controller));
875
876
  // No rows.
877
1
  ASSERT_EQ(total_crc, resp.checksum());
878
879
  // First row.
880
1
  int32_t key = 1;
881
1
  InsertTestRowsRemote(0, key, 1);
882
1
  controller.Reset();
883
1
  ASSERT_OK(proxy_->Checksum(req, &resp, &controller));
884
1
  CalcTestRowChecksum(&total_crc, key);
885
1
  uint64_t first_crc = total_crc; // Cache first record checksum.
886
887
2
  ASSERT_FALSE(resp.has_error()) << resp.error().DebugString();
888
1
  ASSERT_EQ(total_crc, resp.checksum());
889
890
  // Second row (null string field).
891
1
  key = 2;
892
1
  InsertTestRowsRemote(0, key, 1, 1, nullptr, kTabletId, nullptr, nullptr, false);
893
1
  controller.Reset();
894
1
  ASSERT_OK(proxy_->Checksum(req, &resp, &controller));
895
1
  CalcTestRowChecksum(&total_crc, key, false);
896
897
2
  ASSERT_FALSE(resp.has_error()) << resp.error().DebugString();
898
1
  ASSERT_EQ(total_crc, resp.checksum());
899
900
  // Finally, delete row 2, so we're back to the row 1 checksum.
901
1
  ASSERT_NO_FATALS(DeleteTestRowsRemote(key, 1));
902
1
  FLAGS_scanner_batch_size_rows = 100;
903
1
  controller.Reset();
904
1
  ASSERT_OK(proxy_->Checksum(req, &resp, &controller));
905
1
  ASSERT_NE(total_crc, resp.checksum());
906
1
  ASSERT_EQ(first_crc, resp.checksum());
907
1
}
908
909
} // namespace tserver
910
} // namespace yb