YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/tserver/pg_table_cache.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/tserver/pg_table_cache.h"
15
16
#include <unordered_map>
17
18
#include "yb/client/client.h"
19
#include "yb/client/table.h"
20
21
#include "yb/gutil/thread_annotations.h"
22
23
#include "yb/tserver/pg_client.pb.h"
24
25
#include "yb/util/scope_exit.h"
26
27
namespace yb {
28
namespace tserver {
29
30
namespace {
31
32
struct CacheEntry {
33
  std::promise<Result<client::YBTablePtr>> promise;
34
  std::shared_future<Result<client::YBTablePtr>> future;
35
  master::GetTableSchemaResponsePB info;
36
  PgTablePartitionsPB partitions;
37
38
80.3k
  CacheEntry() : future(promise.get_future()) {
39
80.3k
  }
40
};
41
42
} // namespace
43
44
// Implementation behind PgTableCache.
//
// Holds a mutex-protected map from table id to a shared CacheEntry. The caller that inserts a new
// entry is the one that loads it; every other caller asking for the same id blocks on the entry's
// shared future until the load result has been published.
class PgTableCache::Impl {
 public:
  // client_future provides the YBClient used to open tables. It is only waited on when a table
  // actually has to be opened.
  explicit Impl(std::shared_future<client::YBClient*> client_future)
      : client_future_(std::move(client_future)) {}

  // Copies the cached schema response and partitions of table_id into *info and *partitions.
  // If loading the table failed, returns that error and leaves both outputs untouched.
  CHECKED_STATUS GetInfo(
      const TableId& table_id,
      master::GetTableSchemaResponsePB* info,
      PgTablePartitionsPB* partitions) {
    auto entry = GetEntry(table_id);
    RETURN_NOT_OK(entry->future.get());
    *info = entry->info;
    *partitions = entry->partitions;
    return Status::OK();
  }

  // Returns the opened table for table_id (loading it on first use), or the load error.
  Result<client::YBTablePtr> Get(const TableId& table_id) {
    return GetEntry(table_id)->future.get();
  }

  // Removes the entry for table_id from the cache, if present. Callers that already hold the
  // entry keep it alive through their shared_ptr.
  void Invalidate(const TableId& table_id) {
    std::lock_guard<std::mutex> lock(mutex_);
    cache_.erase(table_id);
  }

  // Removes all entries from the cache. The request is ignored when a full invalidation has
  // already been performed after invalidation_time.
  void InvalidateAll(CoarseTimePoint invalidation_time) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (last_cache_invalidation_ > invalidation_time) {
      return;
    }
    last_cache_invalidation_ = CoarseMonoClock::now();
    cache_.clear();
  }

 private:
  // Blocks until the client future is ready and returns the client.
  client::YBClient& client() {
    return *client_future_.get();
  }

  // Returns the entry for table_id. When this call created the entry, it also performs the load
  // (outside of the mutex) before returning.
  std::shared_ptr<CacheEntry> GetEntry(const TableId& table_id) {
    auto p = DoGetEntry(table_id);
    if (p.second) {
      LoadEntry(table_id, p.first.get());
    }
    return p.first;
  }

  // Looks up table_id under the mutex, inserting an empty entry when it is missing.
  // The bool is true iff the entry was inserted by this call.
  std::pair<std::shared_ptr<CacheEntry>, bool> DoGetEntry(const TableId& table_id) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = cache_.find(table_id);
    if (it != cache_.end()) {
      return std::make_pair(it->second, false);
    }
    it = cache_.emplace(table_id, std::make_shared<CacheEntry>()).first;
    return std::make_pair(it->second, true);
  }

  // Removes the cache slot of table_id only if it still refers to `entry`.
  //
  // Erasing purely by id would also evict an unrelated entry that another thread installed for
  // the same id after the cache had been invalidated while `entry` was still loading.
  void EraseIfCurrent(const TableId& table_id, const CacheEntry* entry) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = cache_.find(table_id);
    if (it != cache_.end() && it->second.get() == entry) {
      cache_.erase(it);
    }
  }

  // Opens table_id through the client, storing the table in *table and the schema response in
  // *info. Fails if the client cannot open the table or the table is not a PGSQL table.
  CHECKED_STATUS OpenTable(
      const TableId& table_id, client::YBTablePtr* table, master::GetTableSchemaResponsePB* info) {
    RETURN_NOT_OK(client().OpenTable(table_id, table, info));
    RSTATUS_DCHECK(
        (**table).table_type() == client::YBTableType::PGSQL_TABLE_TYPE, RuntimeError,
        "Wrong table type");
    return Status::OK();
  }

  // Loads the table for `entry` and fulfills its promise exactly once, with either the opened
  // table or the failure status. A failed entry is dropped from the cache so that a later
  // request retries the load.
  void LoadEntry(const TableId& table_id, CacheEntry* entry) {
    client::YBTablePtr table;
    bool finished = false;
    // Safety net: if this function is left without having published a result, publish an error
    // so that waiters on the future are not blocked forever.
    auto se = ScopeExit([entry, &finished] {
      if (finished) {
        return;
      }
      entry->promise.set_value(STATUS(InternalError, "Unexpected return"));
    });
    const auto status = OpenTable(table_id, &table, &entry->info);
    if (!status.ok()) {
      // Drop the failed entry before publishing the error, but never a newer entry for this id.
      EraseIfCurrent(table_id, entry);
      entry->promise.set_value(status);
      finished = true;
      return;
    }
    // Fill in the partitions before fulfilling the promise: readers access them only after the
    // future has become ready.
    const auto partitions = table->GetVersionedPartitions();
    entry->partitions.set_version(partitions->version);
    for (const auto& key : partitions->keys) {
      *entry->partitions.mutable_keys()->Add() = key;
    }

    entry->promise.set_value(table);
    finished = true;
  }

  std::shared_future<client::YBClient*> client_future_;
  std::mutex mutex_;
  std::unordered_map<TableId, std::shared_ptr<CacheEntry>> cache_ GUARDED_BY(mutex_);
  // Time of the last InvalidateAll call that actually cleared the cache.
  CoarseTimePoint last_cache_invalidation_ GUARDED_BY(mutex_);
};
141
142
// Creates the cache; client_future is handed over to the implementation object.
PgTableCache::PgTableCache(std::shared_future<client::YBClient*> client_future)
    : impl_(new Impl(std::move(client_future))) {}
145
146
182
// Defined out of line, in the translation unit where Impl is a complete type.
PgTableCache::~PgTableCache() = default;
148
149
// Forwards to Impl::GetInfo, which fills *info and *partitions for table_id.
Status PgTableCache::GetInfo(
    const TableId& table_id, master::GetTableSchemaResponsePB* info,
    PgTablePartitionsPB* partitions) {
  return impl_->GetInfo(table_id, info, partitions);
}
155
156
3.56M
// Forwards to Impl::Get, which returns the opened table for table_id or the load error.
Result<client::YBTablePtr> PgTableCache::Get(const TableId& table_id) {
  return impl_->Get(table_id);
}
159
160
8.52k
void PgTableCache::Invalidate(const TableId& table_id) {
161
8.52k
  impl_->Invalidate(table_id);
162
8.52k
}
163
164
1.04k
// Forwards to Impl::InvalidateAll, passing along the time the invalidation was requested for.
void PgTableCache::InvalidateAll(CoarseTimePoint invalidation_time) {
  impl_->InvalidateAll(invalidation_time);
}
167
168
}  // namespace tserver
169
}  // namespace yb