YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/metrics_snapshotter.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/metrics_snapshotter.h"
15
16
#include <sys/statvfs.h>
17
18
#include <memory>
19
#include <vector>
20
#include <mutex>
21
#include <set>
22
23
#include <chrono>
24
#include <thread>
25
26
#ifdef __APPLE__
27
#include <mach/mach_init.h>
28
#include <mach/mach_error.h>
29
#include <mach/mach_host.h>
30
#include <mach/vm_map.h>
31
#else
32
#include <stdlib.h>
33
#include <stdio.h>
34
#include <string.h>
35
#endif
36
37
#include <boost/algorithm/string.hpp>
38
39
#include <rapidjson/document.h>
40
41
#include <glog/logging.h>
42
43
#include "yb/common/jsonb.h"
44
#include "yb/common/wire_protocol.h"
45
46
#include "yb/client/client.h"
47
#include "yb/client/error.h"
48
#include "yb/client/schema.h"
49
#include "yb/client/session.h"
50
#include "yb/client/table_handle.h"
51
#include "yb/client/yb_op.h"
52
#include "yb/client/yb_table_name.h"
53
54
#include "yb/gutil/ref_counted.h"
55
#include "yb/gutil/stringprintf.h"
56
#include "yb/gutil/strings/escaping.h"
57
#include "yb/gutil/strings/substitute.h"
58
59
#include "yb/master/master_defaults.h"
60
61
#include "yb/tablet/tablet.h"
62
#include "yb/tablet/tablet_peer.h"
63
#include "yb/tserver/tablet_server.h"
64
#include "yb/tserver/tablet_server_options.h"
65
#include "yb/tserver/ts_tablet_manager.h"
66
67
#include "yb/client/client_fwd.h"
68
#include "yb/gutil/macros.h"
69
70
#include "yb/util/bytes_formatter.h"
71
#include "yb/util/capabilities.h"
72
#include "yb/util/date_time.h"
73
#include "yb/util/decimal.h"
74
#include "yb/util/enums.h"
75
#include "yb/util/flag_tags.h"
76
#include "yb/util/logging.h"
77
#include "yb/util/mem_tracker.h"
78
#include "yb/util/metrics.h"
79
#include "yb/util/monotime.h"
80
#include "yb/util/net/net_util.h"
81
#include "yb/util/status.h"
82
#include "yb/util/status_log.h"
83
#include "yb/util/thread.h"
84
#include "yb/util/tsan_util.h"
85
#include "yb/util/varint.h"
86
87
using namespace std::literals;
88
89
DEFINE_int32(metrics_snapshotter_interval_ms, 30 * 1000,
90
             "Interval at which the metrics are snapshotted.");
91
TAG_FLAG(metrics_snapshotter_interval_ms, advanced);
92
93
DEFINE_string(metrics_snapshotter_tserver_metrics_whitelist,
94
    "handler_latency_yb_client_read_local_sum,handler_latency_yb_client_read_local_count",
95
    "Tserver metrics to record in native metrics storage.");
96
TAG_FLAG(metrics_snapshotter_tserver_metrics_whitelist, advanced);
97
98
DEFINE_string(metrics_snapshotter_table_metrics_whitelist,
99
    "rocksdb_sst_read_micros_sum,rocksdb_sst_read_micros_count",
100
    "Table metrics to record in native metrics storage.");
101
TAG_FLAG(metrics_snapshotter_table_metrics_whitelist, advanced);
102
103
constexpr int kTServerMetricsSnapshotterYbClientDefaultTimeoutMs =
104
  yb::RegularBuildVsSanitizers(5, 60) * 1000;
105
106
DEFINE_int32(tserver_metrics_snapshotter_yb_client_default_timeout_ms,
107
    kTServerMetricsSnapshotterYbClientDefaultTimeoutMs,
108
    "Default timeout for the YBClient embedded into the tablet server that is used "
109
    "by metrics snapshotter.");
110
TAG_FLAG(tserver_metrics_snapshotter_yb_client_default_timeout_ms, advanced);
111
112
DEFINE_uint64(metrics_snapshotter_ttl_ms, 7 * 24 * 60 * 60 * 1000 /* 1 week */,
113
             "Ttl for snapshotted metrics.");
114
TAG_FLAG(metrics_snapshotter_ttl_ms, advanced);
115
116
DECLARE_int32(max_tables_metrics_breakdowns);
117
118
using std::shared_ptr;
119
using std::vector;
120
using strings::Substitute;
121
122
namespace yb {
123
124
using client::YBSession;
125
using client::YBTableName;
126
using client::YBqlOp;
127
128
namespace tserver {
129
130
// Most of the actual logic of the metrics snapshotter is inside this inner class,
131
// to avoid having too many dependencies from the header itself.
132
//
133
// This is basically the "PIMPL" pattern.
134
class MetricsSnapshotter::Thread {
 public:
  // Builds the snapshotter state for `server`; the background thread is not created until
  // Start() is called.
  Thread(const TabletServerOptions& opts, TabletServer* server);

  // Start() launches the background thread, Stop() signals it to exit and joins it.
  Status Start();
  Status Stop();

 private:
  // Body of the background thread.
  void RunThread();

  // Delay, in milliseconds, before the next snapshot attempt.
  int GetMillisUntilNextMetricsSnapshot() const;

  // Queues one metric row on `session`. `details`, when not null, is stored as JSONB.
  CHECKED_STATUS DoPrometheusMetricsSnapshot(
      const client::TableHandle& table, std::shared_ptr<YBSession> session,
      const std::string& entity_type, const std::string& entity_id,
      const std::string& metric_name, int64_t metric_val, const rapidjson::Document* details);

  // Collects all whitelisted metrics and writes them in a single session.
  CHECKED_STATUS DoMetricsSnapshot();

  // Flushes `session` and logs failures of the flush itself or of the individual `ops`.
  void FlushSession(const std::shared_ptr<YBSession>& session,
                    const std::vector<std::shared_ptr<YBqlOp>>& ops = {});
  void LogSessionErrors(const client::FlushStatus& flush_status);

  // Whether the caller is running on the snapshotter thread.
  bool IsCurrentThread() const;

  // Prefix used by the *_WITH_PREFIX logging macros.
  const std::string& LogPrefix() const { return log_prefix_; }

  // Reads the current cumulative CPU tick counters: {total, user, system}.
  Result<std::vector<uint64_t>> GetCpuUsage();

  // Server whose metrics are being collected.
  TabletServer* const server_;

  // Handle of the background thread; null until Start() succeeds and again after Stop().
  scoped_refptr<yb::Thread> thread_;

  // Lazily initialised YBClient used to write the snapshot rows.
  boost::optional<yb::client::AsyncClientInitialiser> async_client_init_;

  // Becomes true after the first snapshot attempt, whether or not it succeeded.
  bool has_metricssnapshotted_ = false;

  // Mutex/condition-variable pair used to wake the background thread, either to take a
  // snapshot early or to exit.
  Mutex mutex_;
  ConditionVariable cond_;

  // Guarded by mutex_.
  bool should_run_ = false;

  const std::string log_prefix_;

  // Parsed contents of FLAGS_metrics_snapshotter_tserver_metrics_whitelist.
  std::unordered_set<std::string> tserver_metrics_whitelist_;

  // Parsed contents of FLAGS_metrics_snapshotter_table_metrics_whitelist.
  std::unordered_set<std::string> table_metrics_whitelist_;

  // Baseline {total_ticks, user_ticks, system_ticks} used when computing CPU usage.
  std::vector<uint64_t> prev_ticks_ = {0, 0, 0};
  bool first_run_cpu_ticks_ = true;

  TabletServerOptions opts_;

  DISALLOW_COPY_AND_ASSIGN(Thread);
};
197
198
////////////////////////////////////////////////////////////
199
// MetricsSnapshotter
200
////////////////////////////////////////////////////////////
201
202
// Creates the implementation object; no thread is running until Start() is called.
MetricsSnapshotter::MetricsSnapshotter(const TabletServerOptions& opts, TabletServer* server)
    : thread_(new Thread(opts, server)) {}
205
0
// Stops the snapshotter thread on destruction; a failure to stop is only logged.
MetricsSnapshotter::~MetricsSnapshotter() {
  const Status stop_status = Stop();
  WARN_NOT_OK(stop_status, "Unable to stop metrics snapshotter thread");
}
208
209
1
// Forwards to the implementation object, which launches the background thread.
Status MetricsSnapshotter::Start() {
  Status start_status = thread_->Start();
  return start_status;
}
212
1
// Forwards to the implementation object, which signals the background thread and joins it.
Status MetricsSnapshotter::Stop() {
  Status stop_status = thread_->Stop();
  return stop_status;
}
215
216
////////////////////////////////////////////////////////////
217
// MetricsSnapshotter::Thread
218
////////////////////////////////////////////////////////////
219
220
2
// Splits a comma-separated flag value into the set of its tokens.
//
// Whitespace around each token is stripped and empty tokens are dropped, so "a, b" yields
// {"a", "b"} and an empty string yields an empty set (rather than a set holding "").
static std::unordered_set<std::string> CSVToSet(const std::string& s) {
  static const char kWhitespace[] = " \t\r\n";

  std::unordered_set<std::string> tokens;
  std::string::size_type begin = 0;
  while (begin <= s.size()) {
    // [begin, end) is the raw token, delimited by the next comma or the end of the string.
    std::string::size_type end = s.find(',', begin);
    if (end == std::string::npos) {
      end = s.size();
    }

    const std::string::size_type first = s.find_first_not_of(kWhitespace, begin);
    if (first != std::string::npos && first < end) {
      // s[first] is not whitespace, so the backward search is guaranteed to stop at >= first.
      const std::string::size_type last = s.find_last_not_of(kWhitespace, end - 1);
      tokens.emplace(s, first, last - first + 1);
    }

    begin = end + 1;
  }
  return tokens;
}
225
226
// Captures the server and options, parses both metric whitelists from their flags and
// prepares (but does not start) the asynchronous YBClient initialiser.
MetricsSnapshotter::Thread::Thread(const TabletServerOptions& opts, TabletServer* server)
    : server_(server),
      cond_(&mutex_),
      log_prefix_(Format("P $0: ", server_->permanent_uuid())),
      opts_(opts) {
  VLOG_WITH_PREFIX(1) << "Initializing metrics snapshotter thread";

  // The whitelists are read once here; later changes to the flags are not picked up.
  tserver_metrics_whitelist_ = CSVToSet(FLAGS_metrics_snapshotter_tserver_metrics_whitelist);
  table_metrics_whitelist_ = CSVToSet(FLAGS_metrics_snapshotter_table_metrics_whitelist);

  // The flag is expressed in milliseconds; the value handed over is in whole seconds.
  const auto client_timeout_secs =
      FLAGS_tserver_metrics_snapshotter_yb_client_default_timeout_ms / 1000;
  async_client_init_.emplace(
      "tserver_metrics_snapshotter_client",
      0 /* num_reactors */,
      client_timeout_secs,
      "" /* tserver_uuid */,
      &server->options(),
      server->metric_entity(),
      server->mem_tracker(),
      server->messenger());
}
243
244
2
int MetricsSnapshotter::Thread::GetMillisUntilNextMetricsSnapshot() const {
245
  // When we first start up, snapshot immediately.
246
2
  if (!has_metricssnapshotted_) {
247
1
    return 0;
248
1
  }
249
250
1
  return FLAGS_metrics_snapshotter_interval_ms;
251
2
}
252
253
0
// Logs the per-operation errors collected by a failed flush. At most the first ten errors
// are printed, followed by a count of the ones that were left out.
void MetricsSnapshotter::Thread::LogSessionErrors(const client::FlushStatus& flush_status) {
  constexpr size_t kMaxErrorsToLog = 10;

  const auto& failed = flush_status.errors;
  const size_t total = failed.size();

  LOG_WITH_PREFIX(INFO) << total << " failed ops. First few errors follow";

  size_t logged = 0;
  for (auto it = failed.begin(); it != failed.end() && logged < kMaxErrorsToLog;
       ++it, ++logged) {
    const auto& error = *it;
    LOG_WITH_PREFIX(INFO) << "Op " << error->failed_op().ToString()
                          << " had status " << error->status().ToString();
  }

  if (total > kMaxErrorsToLog) {
    LOG_WITH_PREFIX(INFO) << (total - kMaxErrorsToLog) << " failed ops skipped.";
  }
}
274
275
void MetricsSnapshotter::Thread::FlushSession(const std::shared_ptr<YBSession>& session,
276
0
                       const std::vector<std::shared_ptr<YBqlOp>>& ops) {
277
0
  auto flush_status = session->FlushAndGetOpsErrors();
278
0
  if (PREDICT_FALSE(!flush_status.status.ok())) {
279
0
    LogSessionErrors(flush_status);
280
0
    return;
281
0
  }
282
283
0
  for (auto& op : ops) {
284
0
    if (QLResponsePB::YQL_STATUS_OK != op->response().status()) {
285
0
      LOG_WITH_PREFIX(WARNING) << "Status: " <<
286
0
        QLResponsePB::QLStatus_Name(op->response().status());
287
0
    }
288
0
  }
289
0
}
290
291
// Builds one INSERT for the metrics table and applies it to `session` (without flushing).
//
// The row is keyed by this server's permanent uuid (hash column) followed by entity type,
// entity id, metric name and the current timestamp (range columns). `metric_val` goes into
// the "value" column; when `details` is not null it is serialised as JSONB into "details".
// The row's TTL comes from FLAGS_metrics_snapshotter_ttl_ms.
//
// Returns a non-OK status only when `details` cannot be converted to JSONB.
Status MetricsSnapshotter::Thread::DoPrometheusMetricsSnapshot(const client::TableHandle& table,
    shared_ptr<YBSession> session, const std::string& entity_type, const std::string& entity_id,
    const std::string& metric_name, int64_t metric_val,
    const rapidjson::Document* details = nullptr) {
  const auto insert_op = table.NewWriteOp(QLWriteRequestPB::QL_STMT_INSERT);
  auto* const request = insert_op->mutable_request();

  // Key columns: the order of these calls determines which column each value is bound to.
  QLAddStringHashValue(request, server_->permanent_uuid());
  QLAddStringRangeValue(request, entity_type);
  QLAddStringRangeValue(request, entity_id);
  QLAddStringRangeValue(request, metric_name);
  const auto now = DateTime::TimestampNow().ToInt64();
  QLAddTimestampRangeValue(request, now);

  // Regular columns.
  table.AddInt64ColumnValue(request, "value", metric_val);
  if (details) {
    common::Jsonb serialized_details;
    RETURN_NOT_OK(serialized_details.FromRapidJson(*details));
    table.AddJsonbColumnValue(request, "details", serialized_details.MoveSerializedJsonb());
  }

  request->set_ttl(FLAGS_metrics_snapshotter_ttl_ms);
  session->Apply(insert_op);
  return Status::OK();
}
314
315
1
// Takes one snapshot: gathers every whitelisted metric and writes the rows to the metrics
// snapshots table through a fresh session, which is flushed once at the end.
//
// Recorded, in order:
//   1. server-level metrics named in the tserver whitelist;
//   2. "node_up" (constant 1), if whitelisted;
//   3. "disk_usage": total/free bytes of every distinct filesystem under the data and WAL
//      paths, if whitelisted;
//   4. "cpu_usage": user and system share of the CPU ticks elapsed since the stored
//      baseline, if whitelisted;
//   5. per-table metrics named in the table whitelist.
//
// Must be called on the snapshotter thread. Returns the first error from opening the table
// or from building a row; rows applied before such an error are not flushed.
Status MetricsSnapshotter::Thread::DoMetricsSnapshot() {
  CHECK(IsCurrentThread());

  auto yb_client = async_client_init_->client();
  shared_ptr<YBSession> session = yb_client->NewSession();
  session->SetTimeout(15s);

  const YBTableName kTableName(
      YQL_DATABASE_CQL, master::kSystemNamespaceName, kMetricsSnapshotsTableName);

  client::TableHandle table;
  RETURN_NOT_OK(table.Open(kTableName, yb_client));

  // Dump the metric registry into in-memory maps; a failure here is logged, and whatever
  // was collected is still processed.
  NMSWriter::EntityMetricsMap table_metrics;
  NMSWriter::MetricsMap server_metrics;
  NMSWriter nmswriter{&table_metrics, &server_metrics};
  auto opt = MetricPrometheusOptions();
  opt.max_tables_metrics_breakdowns = FLAGS_max_tables_metrics_breakdowns;
  WARN_NOT_OK(
      server_->metric_registry()->WriteForPrometheus(&nmswriter, opt),
      "Couldn't write metrics for native metrics storage");

  for (const auto& kv : server_metrics) {
    if (tserver_metrics_whitelist_.find(kv.first) != tserver_metrics_whitelist_.end()) {
      RETURN_NOT_OK(DoPrometheusMetricsSnapshot(table, session, "tserver",
            server_->permanent_uuid(), kv.first, kv.second));
    }
  }

  if (tserver_metrics_whitelist_.find("node_up") != tserver_metrics_whitelist_.end()) {
    RETURN_NOT_OK(DoPrometheusMetricsSnapshot(table, session, "tserver",
                                              server_->permanent_uuid(), "node_up",
                                              1));
  }

  if (tserver_metrics_whitelist_.find("disk_usage") != tserver_metrics_whitelist_.end()) {
    struct statvfs stat;
    // Filesystem ids already reported, so several paths on one filesystem count once.
    std::set<uint64_t> fs_ids;
    std::vector<std::string> all_data_paths = opts_.fs_opts.data_paths;
    all_data_paths.insert(
        all_data_paths.end(), opts_.fs_opts.wal_paths.begin(), opts_.fs_opts.wal_paths.end());
    for (const auto& path : all_data_paths) {
      // Paths that cannot be stat'ed are skipped silently.
      if (statvfs(path.c_str(), &stat) == 0 && fs_ids.insert(stat.f_fsid).second) {
        const uint64_t num_frags = static_cast<uint64_t>(stat.f_blocks);
        const uint64_t frag_size = static_cast<uint64_t>(stat.f_frsize);
        const uint64_t free_blocks = static_cast<uint64_t>(stat.f_bfree);
        const uint64_t total_disk = num_frags * frag_size;
        const uint64_t free_disk = free_blocks * frag_size;
        RETURN_NOT_OK(DoPrometheusMetricsSnapshot(table, session, "table",
                                                  server_->permanent_uuid(), "total_disk",
                                                  total_disk));
        RETURN_NOT_OK(DoPrometheusMetricsSnapshot(table, session, "table",
                                                  server_->permanent_uuid(), "free_disk",
                                                  free_disk));
      }
    }
  }

  if (tserver_metrics_whitelist_.find("cpu_usage") != tserver_metrics_whitelist_.end()) {
    // GetCpuUsage() reports a failed read as zero counters, so "every counter is non-zero"
    // is the success test.
    const auto all_ticks_positive = [](const vector<uint64_t>& ticks) {
      return std::all_of(ticks.begin(), ticks.end(), [](uint64_t v) { return v != 0; });
    };

    // Holds {total_ticks, user_ticks, system_ticks}.
    auto cur_ticks = CHECK_RESULT(GetCpuUsage());
    bool get_cpu_success = all_ticks_positive(cur_ticks);
    if (get_cpu_success && first_run_cpu_ticks_) {
      // First successful read: store it as the baseline, then sample again after a short
      // pause so that there is a non-empty interval to measure.
      prev_ticks_ = cur_ticks;
      first_run_cpu_ticks_ = false;
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
      cur_ticks = CHECK_RESULT(GetCpuUsage());
      get_cpu_success = all_ticks_positive(cur_ticks);
    }

    if (get_cpu_success) {
      // NOTE(review): prev_ticks_ is only assigned on the first run, so the deltas below
      // cover the time since that baseline rather than since the previous snapshot —
      // confirm this is intended.
      //
      // The counters are unsigned: compare before subtracting, otherwise a counter that
      // went backwards would wrap around to a huge delta instead of being rejected.
      const bool ticks_advanced = cur_ticks[0] > prev_ticks_[0] &&
                                  cur_ticks[1] >= prev_ticks_[1] &&
                                  cur_ticks[2] >= prev_ticks_[2];
      if (!ticks_advanced) {
        YB_LOG_EVERY_N_SECS(ERROR, 120) << Format("Failed to calculate CPU usage - "
                                                 "invalid CPU ticks: previous=$0, current=$1.",
                                                 prev_ticks_, cur_ticks);
      } else {
        const uint64_t total_ticks = cur_ticks[0] - prev_ticks_[0];
        const uint64_t user_ticks = cur_ticks[1] - prev_ticks_[1];
        const uint64_t system_ticks = cur_ticks[2] - prev_ticks_[2];
        const double cpu_usage_user = static_cast<double>(user_ticks) / total_ticks;
        const double cpu_usage_system = static_cast<double>(system_ticks) / total_ticks;

        // The value column is type bigint, so store the ratio scaled by 1e6 there and the
        // real value in details.
        rapidjson::Document details;
        details.SetObject();
        details.AddMember("value", cpu_usage_user, details.GetAllocator());
        RETURN_NOT_OK(DoPrometheusMetricsSnapshot(table, session, "table",
                        server_->permanent_uuid(), "cpu_usage_user",
                        static_cast<int64_t>(1000000 * cpu_usage_user),
                        &details));

        details.RemoveAllMembers();
        details.AddMember("value", cpu_usage_system, details.GetAllocator());
        RETURN_NOT_OK(DoPrometheusMetricsSnapshot(table, session, "table",
                        server_->permanent_uuid(), "cpu_usage_system",
                        static_cast<int64_t>(1000000 * cpu_usage_system),
                        &details));
      }
    } else {
      YB_LOG_EVERY_N_SECS(WARNING, 120) << Format("Failed to retrieve cpu ticks. Got "
                                                  "[total_ticks, user-ticks, system_ticks]=$0.",
                                                  cur_ticks);
    }
  }

  for (const auto& kv : table_metrics) {
    for (const auto& vkv : kv.second) {
      if (table_metrics_whitelist_.find(vkv.first) != table_metrics_whitelist_.end()) {
        RETURN_NOT_OK(DoPrometheusMetricsSnapshot(table, session, "table", kv.first, vkv.first,
              vkv.second));
      }
    }
  }

  FlushSession(session);
  return Status::OK();
}
430
431
0
// Reads the cumulative CPU tick counters of the host.
//
// Returns {total_ticks, user_ticks, system_ticks}. On macOS the values come from
// host_statistics(HOST_CPU_LOAD_INFO); elsewhere from the first four fields of the aggregate
// "cpu" line of /proc/stat (user, nice, system, idle), with nice counted as user time.
//
// Failures are logged (rate limited) and reported as all-zero counters rather than as an
// error status: the caller in this file unwraps the result with CHECK_RESULT and treats
// zero counters as "could not read".
Result<vector<uint64_t>> MetricsSnapshotter::Thread::GetCpuUsage() {
  uint64_t total_ticks = 0, total_user_ticks = 0, total_system_ticks = 0;
#ifdef __APPLE__
  host_cpu_load_info_data_t cpuinfo;
  mach_msg_type_number_t count = HOST_CPU_LOAD_INFO_COUNT;
  const kern_return_t kr = host_statistics(
      mach_host_self(), HOST_CPU_LOAD_INFO, reinterpret_cast<host_info_t>(&cpuinfo), &count);
  if (kr == KERN_SUCCESS) {
    for (int i = 0; i < CPU_STATE_MAX; i++) {
      total_ticks += cpuinfo.cpu_ticks[i];
    }
    total_user_ticks = cpuinfo.cpu_ticks[CPU_STATE_USER];
    total_system_ticks = cpuinfo.cpu_ticks[CPU_STATE_SYSTEM];
  } else {
    // Mach calls report failure through their return code, not through errno.
    YB_LOG_EVERY_N_SECS(WARNING, 120) << "Couldn't get CPU ticks, host_statistics failed "
                                      << "with error: " << mach_error_string(kr);
  }
#else
  FILE* file = fopen("/proc/stat", "r");
  if (!file) {
    YB_LOG_EVERY_N_SECS(WARNING, 120) << "Could not get CPU ticks: failed to open /proc/stat "
                                      << "with errno: " << strerror(errno);
  } else {
    uint64_t user_ticks = 0, user_nice_ticks = 0, system_ticks = 0, idle_ticks = 0;
    const int scanned = fscanf(file, "cpu %" SCNu64 " %" SCNu64 " %" SCNu64 " %" SCNu64,
        &user_ticks, &user_nice_ticks, &system_ticks, &idle_ticks);
    // Capture errno before fclose() gets a chance to overwrite it.
    const int scan_errno = errno;

    // Close on every path, not only after a successful scan, so the handle never leaks.
    if (fclose(file)) {
      YB_LOG_EVERY_N_SECS(WARNING, 120) << "Failed to close /proc/stat with errno: "
                                        << strerror(errno);
    }

    if (scanned <= 0) {
      YB_LOG_EVERY_N_SECS(WARNING, 120) << Format("Failed to scan /proc/stat for cpu ticks "
                                                 "with error code=$0 and errno=$1.",
                                                 scanned, scan_errno);
    } else if (scanned != 4) {
      YB_LOG_EVERY_N_SECS(WARNING, 120) << Format("Failed to scan /proc/stat for cpu ticks. "
                                                 "Expected 4 inputs but got $0.", scanned);
    } else {
      total_ticks = user_ticks + user_nice_ticks + system_ticks + idle_ticks;
      total_user_ticks = user_ticks + user_nice_ticks;
      total_system_ticks = system_ticks;
    }
  }
#endif
  vector<uint64_t> ret = {total_ticks, total_user_ticks, total_system_ticks};
  return ret;
}
475
476
1
void MetricsSnapshotter::Thread::RunThread() {
477
1
  CHECK(IsCurrentThread());
478
1
  
VLOG_WITH_PREFIX0
(1) << "Metrics snapshot thread starting"0
;
479
480
2
  while (true) {
481
2
    MonoTime next_metrics_snapshot = MonoTime::Now();
482
2
    next_metrics_snapshot.AddDelta(
483
2
        MonoDelta::FromMilliseconds(GetMillisUntilNextMetricsSnapshot()));
484
485
    // Wait for either the snapshot interval to elapse, or for the signal to shut down.
486
2
    {
487
2
      MutexLock l(mutex_);
488
3
      while (true) {
489
3
        MonoDelta remaining = next_metrics_snapshot.GetDeltaSince(MonoTime::Now());
490
3
        if (remaining.ToMilliseconds() <= 0 ||
491
3
            
!should_run_2
) {
492
2
          break;
493
2
        }
494
1
        cond_.TimedWait(remaining);
495
1
      }
496
497
2
      if (!should_run_) {
498
1
        
VLOG_WITH_PREFIX0
(1) << "Metris snapshot thread finished"0
;
499
1
        return;
500
1
      }
501
2
    }
502
503
1
    Status s = DoMetricsSnapshot();
504
1
    if (!s.ok()) {
505
1
      LOG_WITH_PREFIX(WARNING) << "Failed to snapshot metrics, code=" << s;
506
1
    }
507
1
    has_metricssnapshotted_ = true;
508
1
  }
509
1
}
510
511
2
bool MetricsSnapshotter::Thread::IsCurrentThread() const {
512
2
  return thread_.get() == yb::Thread::current_thread();
513
2
}
514
515
1
// Starts the asynchronous client initialisation, marks the snapshotter as running and
// launches the background thread. Must not be called while a thread is already running.
Status MetricsSnapshotter::Thread::Start() {
  CHECK(thread_ == nullptr);

  async_client_init_->Start();

  // Set before the thread exists, so RunThread() always observes it as true on entry.
  should_run_ = true;
  Status create_status = yb::Thread::Create(
      "metrics_snapshotter", "metrics_snapshot",
      &MetricsSnapshotter::Thread::RunThread, this, &thread_);
  return create_status;
}
524
525
1
// Shuts down the client initialiser, asks the background thread to exit and joins it.
// Does nothing when no thread is running. Returns the join error, if any; in that case
// thread_ is left untouched.
Status MetricsSnapshotter::Thread::Stop() {
  if (thread_ == nullptr) {
    return Status::OK();
  }

  async_client_init_->Shutdown();

  {
    // Flip the flag and signal under the mutex so the waiting thread cannot miss it.
    MutexLock lock(mutex_);
    should_run_ = false;
    cond_.Signal();
  }

  ThreadJoiner joiner(thread_.get());
  RETURN_NOT_OK(joiner.Join());
  thread_ = nullptr;
  return Status::OK();
}
541
542
} // namespace tserver
543
} // namespace yb