YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/tserver/pg_table_cache.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/pg_table_cache.h"
15
16
#include <unordered_map>
17
18
#include "yb/client/client.h"
19
#include "yb/client/table.h"
20
21
#include "yb/gutil/thread_annotations.h"
22
23
#include "yb/tserver/pg_client.pb.h"
24
25
#include "yb/util/scope_exit.h"
26
27
namespace yb {
28
namespace tserver {
29
30
namespace {
31
32
struct CacheEntry {
33
  std::promise<Result<client::YBTablePtr>> promise;
34
  std::shared_future<Result<client::YBTablePtr>> future;
35
  master::GetTableSchemaResponsePB info;
36
  PgTablePartitionsPB partitions;
37
38
36.0k
  CacheEntry() : future(promise.get_future()) {
39
36.0k
  }
40
};
41
42
} // namespace
43
44
class PgTableCache::Impl {
45
 public:
46
  explicit Impl(std::shared_future<client::YBClient*> client_future)
47
9.25k
      : client_future_(client_future) {}
48
49
  CHECKED_STATUS GetInfo(
50
      const TableId& table_id,
51
      master::GetTableSchemaResponsePB* info,
52
64.5k
      PgTablePartitionsPB* partitions) {
53
64.5k
    auto entry = GetEntry(table_id);
54
64.5k
    RETURN_NOT_OK(entry->future.get());
55
64.5k
    *info = entry->info;
56
64.5k
    *partitions = entry->partitions;
57
64.5k
    return Status::OK();
58
64.5k
  }
59
60
1.19M
  Result<client::YBTablePtr> Get(const TableId& table_id) {
61
1.19M
    return GetEntry(table_id)->future.get();
62
1.19M
  }
63
64
1.92k
  void Invalidate(const TableId& table_id) {
65
1.92k
    std::lock_guard<std::mutex> lock(mutex_);
66
1.92k
    cache_.erase(table_id);
67
1.92k
  }
68
69
551
  void InvalidateAll(CoarseTimePoint invalidation_time) {
70
551
    std::lock_guard<std::mutex> lock(mutex_);
71
551
    if (last_cache_invalidation_ > invalidation_time) {
72
66
      return;
73
66
    }
74
485
    last_cache_invalidation_ = CoarseMonoClock::now();
75
485
    cache_.clear();
76
485
  }
77
78
 private:
79
36.0k
  client::YBClient& client() {
80
36.0k
    return *client_future_.get();
81
36.0k
  }
82
83
1.25M
  std::shared_ptr<CacheEntry> GetEntry(const TableId& table_id) {
84
1.25M
    auto p = DoGetEntry(table_id);
85
1.25M
    if (p.second) {
86
36.0k
      LoadEntry(table_id, p.first.get());
87
36.0k
    }
88
1.25M
    return p.first;
89
1.25M
  }
90
91
1.25M
  std::pair<std::shared_ptr<CacheEntry>, bool> DoGetEntry(const TableId& table_id) {
92
1.25M
    std::lock_guard<std::mutex> lock(mutex_);
93
1.25M
    auto it = cache_.find(table_id);
94
1.25M
    if (it != cache_.end()) {
95
1.21M
      return std::make_pair(it->second, false);
96
1.21M
    }
97
35.9k
    it = cache_.emplace(table_id, std::make_shared<CacheEntry>()).first;
98
35.9k
    return std::make_pair(it->second, true);
99
35.9k
  }
100
101
  CHECKED_STATUS OpenTable(
102
36.0k
      const TableId& table_id, client::YBTablePtr* table, master::GetTableSchemaResponsePB* info) {
103
36.0k
    RETURN_NOT_OK(client().OpenTable(table_id, table, info));
104
36.0k
    RSTATUS_DCHECK(
105
35.9k
        (**table).table_type() == client::YBTableType::PGSQL_TABLE_TYPE, RuntimeError,
106
35.9k
        "Wrong table type");
107
35.9k
    return Status::OK();
108
36.0k
  }
109
110
36.0k
  void LoadEntry(const TableId& table_id, CacheEntry* entry) {
111
36.0k
    client::YBTablePtr table;
112
36.0k
    bool finished = false;
113
36.0k
    auto se = ScopeExit([entry, &finished] {
114
36.0k
      if (finished) {
115
36.0k
        return;
116
36.0k
      }
117
0
      entry->promise.set_value(STATUS(InternalError, "Unexpected return"));
118
0
    });
119
36.0k
    const auto status = OpenTable(table_id, &table, &entry->info);
120
36.0k
    if (!status.ok()) {
121
43
      Invalidate(table_id);
122
43
      entry->promise.set_value(status);
123
43
      finished = true;
124
43
      return;
125
43
    }
126
35.9k
    const auto partitions = table->GetVersionedPartitions();
127
35.9k
    entry->partitions.set_version(partitions->version);
128
41.6k
    for (const auto& key : partitions->keys) {
129
41.6k
      *entry->partitions.mutable_keys()->Add() = key;
130
41.6k
    }
131
132
35.9k
    entry->promise.set_value(table);
133
35.9k
    finished = true;
134
35.9k
  }
135
136
  std::shared_future<client::YBClient*> client_future_;
137
  std::mutex mutex_;
138
  std::unordered_map<TableId, std::shared_ptr<CacheEntry>> cache_ GUARDED_BY(mutex_);
139
  CoarseTimePoint last_cache_invalidation_ GUARDED_BY(mutex_);
140
};
141
142
PgTableCache::PgTableCache(std::shared_future<client::YBClient*> client_future)
143
9.25k
    : impl_(new Impl(std::move(client_future))) {
144
9.25k
}
145
146
0
PgTableCache::~PgTableCache() {
147
0
}
148
149
Status PgTableCache::GetInfo(
150
    const TableId& table_id,
151
    master::GetTableSchemaResponsePB* info,
152
64.6k
    PgTablePartitionsPB* partitions) {
153
64.6k
  return impl_->GetInfo(table_id, info, partitions);
154
64.6k
}
155
156
1.19M
Result<client::YBTablePtr> PgTableCache::Get(const TableId& table_id) {
157
1.19M
  return impl_->Get(table_id);
158
1.19M
}
159
160
1.88k
void PgTableCache::Invalidate(const TableId& table_id) {
161
1.88k
  impl_->Invalidate(table_id);
162
1.88k
}
163
164
551
void PgTableCache::InvalidateAll(CoarseTimePoint invalidation_time) {
165
551
  impl_->InvalidateAll(invalidation_time);
166
551
}
167
168
}  // namespace tserver
169
}  // namespace yb