YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/master/permissions_manager.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#include "yb/master/permissions_manager.h"
15
16
#include <mutex>
17
18
#include "yb/gutil/casts.h"
19
#include "yb/gutil/strings/substitute.h"
20
21
#include "yb/master/catalog_manager-internal.h"
22
#include "yb/master/master_dcl.pb.h"
23
#include "yb/master/master_ddl.pb.h"
24
#include "yb/master/scoped_leader_shared_lock-internal.h"
25
#include "yb/master/sys_catalog.h"
26
#include "yb/master/sys_catalog_constants.h"
27
28
#include "yb/util/crypt.h"
29
#include "yb/util/shared_lock.h"
30
#include "yb/util/status_format.h"
31
#include "yb/util/status_log.h"
32
#include "yb/util/trace.h"
33
34
using std::shared_ptr;
35
36
using yb::util::kBcryptHashSize;
37
using yb::util::bcrypt_hashpw;
38
using strings::Substitute;
39
40
DECLARE_bool(ycql_cache_login_info);
41
42
// TODO: remove direct references to member fields in CatalogManager from here.
43
44
namespace yb {
45
namespace master {
46
47
namespace {
48
49
// Helper class to abort mutations at the end of a scope.
50
template<class PersistentDataEntryPB>
51
class ScopedMutation {
52
 public:
53
  explicit ScopedMutation(PersistentDataEntryPB* cow_object)
54
12.7k
      : cow_object_(DCHECK_NOTNULL(cow_object)) {
55
12.7k
    cow_object->mutable_metadata()->StartMutation();
56
12.7k
  }
57
58
4.72k
  void Commit() {
59
4.72k
    cow_object_->mutable_metadata()->CommitMutation();
60
4.72k
    committed_ = true;
61
4.72k
  }
62
63
  // Abort the mutation if it wasn't committed.
64
12.7k
  ~ScopedMutation() {
65
12.7k
    if (PREDICT_FALSE(!committed_)) {
66
7.98k
      cow_object_->mutable_metadata()->AbortMutation();
67
7.98k
    }
68
12.7k
}
69
70
 private:
71
  PersistentDataEntryPB* cow_object_;
72
  bool committed_ = false;
73
};
74
75
}  // anonymous namespace
76
77
78
PermissionsManager::PermissionsManager(CatalogManager* catalog_manager)
79
    : security_config_(nullptr),
80
8.07k
      catalog_manager_(catalog_manager) {
81
8.07k
  CHECK_NOTNULL(catalog_manager);
82
8.07k
}
83
84
3.00k
Status PermissionsManager::PrepareDefaultRoles(int64_t term) {
85
3.00k
  LockGuard lock(mutex_);
86
3.00k
  if (FindPtrOrNull(roles_map_, kDefaultCassandraUsername) != nullptr) {
87
845
    LOG(INFO) << "Role " << kDefaultCassandraUsername
88
845
              << " already created, skipping initialization";
89
845
    return Status::OK();
90
845
  }
91
92
2.16k
  char hash[kBcryptHashSize];
93
  // TODO: refactor interface to be more c++ like...
94
2.16k
  int ret = bcrypt_hashpw(kDefaultCassandraPassword, hash);
95
2.16k
  if (ret != 0) {
96
4
    return STATUS_SUBSTITUTE(IllegalState, "Could not hash password, reason: $0", ret);
97
4
  }
98
99
  // Create in memory object.
100
2.15k
  Status s = CreateRoleUnlocked(kDefaultCassandraUsername, std::string(hash, kBcryptHashSize),
101
2.15k
                                true, true, term, false /* Don't increment the roles version */);
102
2.15k
  if (PREDICT_TRUE(s.ok())) {
103
2.15k
    LOG(INFO) << "Created role: " << kDefaultCassandraUsername;
104
2.15k
  }
105
106
2.15k
  return s;
107
2.16k
}
108
109
template<class RespClass>
110
Status PermissionsManager::GrantPermissions(
111
    const RoleName& role_name,
112
    const std::string& canonical_resource,
113
    const std::string& resource_name,
114
    const NamespaceName& keyspace,
115
    const std::vector<PermissionType>& permissions,
116
    const ResourceType resource_type,
117
1.89k
    RespClass* resp) {
118
1.89k
  LockGuard lock(mutex_);
119
120
1.89k
  scoped_refptr<RoleInfo> rp;
121
1.89k
  rp = FindPtrOrNull(roles_map_, role_name);
122
1.89k
  if (rp == nullptr) {
123
0
    const Status s = STATUS_SUBSTITUTE(NotFound, "Role $0 was not found", role_name);
124
0
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
125
0
  }
126
127
1.89k
  RETURN_NOT_OK(IncrementRolesVersionUnlocked());
128
129
1.89k
  {
130
1.89k
    SysRoleEntryPB* metadata;
131
1.89k
    ScopedMutation <RoleInfo> role_info_mutation(rp.get());
132
1.89k
    metadata = &rp->mutable_metadata()->mutable_dirty()->pb;
133
134
1.89k
    ResourcePermissionsPB* current_resource = metadata->add_resources();
135
136
1.89k
    current_resource->set_canonical_resource(canonical_resource);
137
1.89k
    current_resource->set_resource_type(resource_type);
138
1.89k
    current_resource->set_resource_name(resource_name);
139
1.89k
    current_resource->set_namespace_name(keyspace);
140
141
8.88k
    for (const auto& permission : permissions) {
142
8.88k
      if (permission == PermissionType::DESCRIBE_PERMISSION &&
143
8.88k
          
resource_type != ResourceType::ROLE0
&&
144
8.88k
          
resource_type != ResourceType::ALL_ROLES0
) {
145
        // Describe permission should only be granted to the role resource.
146
0
        continue;
147
0
      }
148
8.88k
      current_resource->add_permissions(permission);
149
8.88k
    }
150
1.89k
    Status s = catalog_manager_->sys_catalog_->Upsert(catalog_manager_->leader_ready_term(), rp);
151
1.89k
    if (!s.ok()) {
152
0
      s = s.CloneAndPrepend(Substitute(
153
0
          "An error occurred while updating permissions in sys-catalog: $0", s.ToString()));
154
0
      LOG(WARNING) << s;
155
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
156
0
    }
157
1.89k
    TRACE("Wrote Permission to sys-catalog");
158
1.89k
    role_info_mutation.Commit();
159
160
1.89k
    BuildResourcePermissionsUnlocked();
161
1.89k
  }
162
0
  return Status::OK();
163
1.89k
}
yb::Status yb::master::PermissionsManager::GrantPermissions<yb::master::CreateTableResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<yb::PermissionType, std::__1::allocator<yb::PermissionType> > const&, yb::ResourceType, yb::master::CreateTableResponsePB*)
Line
Count
Source
117
242
    RespClass* resp) {
118
242
  LockGuard lock(mutex_);
119
120
242
  scoped_refptr<RoleInfo> rp;
121
242
  rp = FindPtrOrNull(roles_map_, role_name);
122
242
  if (rp == nullptr) {
123
0
    const Status s = STATUS_SUBSTITUTE(NotFound, "Role $0 was not found", role_name);
124
0
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
125
0
  }
126
127
242
  RETURN_NOT_OK(IncrementRolesVersionUnlocked());
128
129
242
  {
130
242
    SysRoleEntryPB* metadata;
131
242
    ScopedMutation <RoleInfo> role_info_mutation(rp.get());
132
242
    metadata = &rp->mutable_metadata()->mutable_dirty()->pb;
133
134
242
    ResourcePermissionsPB* current_resource = metadata->add_resources();
135
136
242
    current_resource->set_canonical_resource(canonical_resource);
137
242
    current_resource->set_resource_type(resource_type);
138
242
    current_resource->set_resource_name(resource_name);
139
242
    current_resource->set_namespace_name(keyspace);
140
141
1.21k
    for (const auto& permission : permissions) {
142
1.21k
      if (permission == PermissionType::DESCRIBE_PERMISSION &&
143
1.21k
          
resource_type != ResourceType::ROLE0
&&
144
1.21k
          
resource_type != ResourceType::ALL_ROLES0
) {
145
        // Describe permission should only be granted to the role resource.
146
0
        continue;
147
0
      }
148
1.21k
      current_resource->add_permissions(permission);
149
1.21k
    }
150
242
    Status s = catalog_manager_->sys_catalog_->Upsert(catalog_manager_->leader_ready_term(), rp);
151
242
    if (!s.ok()) {
152
0
      s = s.CloneAndPrepend(Substitute(
153
0
          "An error occurred while updating permissions in sys-catalog: $0", s.ToString()));
154
0
      LOG(WARNING) << s;
155
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
156
0
    }
157
242
    TRACE("Wrote Permission to sys-catalog");
158
242
    role_info_mutation.Commit();
159
160
242
    BuildResourcePermissionsUnlocked();
161
242
  }
162
0
  return Status::OK();
163
242
}
yb::Status yb::master::PermissionsManager::GrantPermissions<yb::master::CreateNamespaceResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<yb::PermissionType, std::__1::allocator<yb::PermissionType> > const&, yb::ResourceType, yb::master::CreateNamespaceResponsePB*)
Line
Count
Source
117
904
    RespClass* resp) {
118
904
  LockGuard lock(mutex_);
119
120
904
  scoped_refptr<RoleInfo> rp;
121
904
  rp = FindPtrOrNull(roles_map_, role_name);
122
904
  if (rp == nullptr) {
123
0
    const Status s = STATUS_SUBSTITUTE(NotFound, "Role $0 was not found", role_name);
124
0
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
125
0
  }
126
127
904
  RETURN_NOT_OK(IncrementRolesVersionUnlocked());
128
129
904
  {
130
904
    SysRoleEntryPB* metadata;
131
904
    ScopedMutation <RoleInfo> role_info_mutation(rp.get());
132
904
    metadata = &rp->mutable_metadata()->mutable_dirty()->pb;
133
134
904
    ResourcePermissionsPB* current_resource = metadata->add_resources();
135
136
904
    current_resource->set_canonical_resource(canonical_resource);
137
904
    current_resource->set_resource_type(resource_type);
138
904
    current_resource->set_resource_name(resource_name);
139
904
    current_resource->set_namespace_name(keyspace);
140
141
5.42k
    for (const auto& permission : permissions) {
142
5.42k
      if (permission == PermissionType::DESCRIBE_PERMISSION &&
143
5.42k
          
resource_type != ResourceType::ROLE0
&&
144
5.42k
          
resource_type != ResourceType::ALL_ROLES0
) {
145
        // Describe permission should only be granted to the role resource.
146
0
        continue;
147
0
      }
148
5.42k
      current_resource->add_permissions(permission);
149
5.42k
    }
150
904
    Status s = catalog_manager_->sys_catalog_->Upsert(catalog_manager_->leader_ready_term(), rp);
151
904
    if (!s.ok()) {
152
0
      s = s.CloneAndPrepend(Substitute(
153
0
          "An error occurred while updating permissions in sys-catalog: $0", s.ToString()));
154
0
      LOG(WARNING) << s;
155
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
156
0
    }
157
904
    TRACE("Wrote Permission to sys-catalog");
158
904
    role_info_mutation.Commit();
159
160
904
    BuildResourcePermissionsUnlocked();
161
904
  }
162
0
  return Status::OK();
163
904
}
yb::Status yb::master::PermissionsManager::GrantPermissions<yb::master::CreateRoleResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::vector<yb::PermissionType, std::__1::allocator<yb::PermissionType> > const&, yb::ResourceType, yb::master::CreateRoleResponsePB*)
Line
Count
Source
117
751
    RespClass* resp) {
118
751
  LockGuard lock(mutex_);
119
120
751
  scoped_refptr<RoleInfo> rp;
121
751
  rp = FindPtrOrNull(roles_map_, role_name);
122
751
  if (rp == nullptr) {
123
0
    const Status s = STATUS_SUBSTITUTE(NotFound, "Role $0 was not found", role_name);
124
0
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
125
0
  }
126
127
751
  RETURN_NOT_OK(IncrementRolesVersionUnlocked());
128
129
751
  {
130
751
    SysRoleEntryPB* metadata;
131
751
    ScopedMutation <RoleInfo> role_info_mutation(rp.get());
132
751
    metadata = &rp->mutable_metadata()->mutable_dirty()->pb;
133
134
751
    ResourcePermissionsPB* current_resource = metadata->add_resources();
135
136
751
    current_resource->set_canonical_resource(canonical_resource);
137
751
    current_resource->set_resource_type(resource_type);
138
751
    current_resource->set_resource_name(resource_name);
139
751
    current_resource->set_namespace_name(keyspace);
140
141
2.25k
    for (const auto& permission : permissions) {
142
2.25k
      if (permission == PermissionType::DESCRIBE_PERMISSION &&
143
2.25k
          
resource_type != ResourceType::ROLE0
&&
144
2.25k
          
resource_type != ResourceType::ALL_ROLES0
) {
145
        // Describe permission should only be granted to the role resource.
146
0
        continue;
147
0
      }
148
2.25k
      current_resource->add_permissions(permission);
149
2.25k
    }
150
751
    Status s = catalog_manager_->sys_catalog_->Upsert(catalog_manager_->leader_ready_term(), rp);
151
751
    if (!s.ok()) {
152
0
      s = s.CloneAndPrepend(Substitute(
153
0
          "An error occurred while updating permissions in sys-catalog: $0", s.ToString()));
154
0
      LOG(WARNING) << s;
155
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
156
0
    }
157
751
    TRACE("Wrote Permission to sys-catalog");
158
751
    role_info_mutation.Commit();
159
160
751
    BuildResourcePermissionsUnlocked();
161
751
  }
162
0
  return Status::OK();
163
751
}
164
165
// TODO: get rid of explicit instantiations.
166
template Status PermissionsManager::GrantPermissions<CreateTableResponsePB>(
167
    const RoleName& role_name,
168
    const std::string& canonical_resource,
169
    const std::string& resource_name,
170
    const NamespaceName& keyspace,
171
    const std::vector<PermissionType>& permissions,
172
    const ResourceType resource_type,
173
    CreateTableResponsePB* resp);
174
175
template Status PermissionsManager::GrantPermissions<CreateNamespaceResponsePB>(
176
    const RoleName& role_name,
177
    const std::string& canonical_resource,
178
    const std::string& resource_name,
179
    const NamespaceName& keyspace,
180
    const std::vector<PermissionType>& permissions,
181
    const ResourceType resource_type,
182
    CreateNamespaceResponsePB* resp);
183
184
// Create a SysVersionInfo object to track the roles versions.
185
6.02k
Status PermissionsManager::IncrementRolesVersionUnlocked() {
186
  // Prepare write.
187
6.02k
  auto l = CHECK_NOTNULL(security_config_.get())->LockForWrite();
188
6.02k
  const uint64_t roles_version = l.mutable_data()->pb.security_config().roles_version();
189
6.02k
  if (roles_version == std::numeric_limits<uint64_t>::max()) {
190
0
    DFATAL_OR_RETURN_NOT_OK(
191
0
        STATUS_SUBSTITUTE(IllegalState,
192
0
                          "Roles version reached max allowable integer: $0", roles_version));
193
0
  }
194
6.02k
  l.mutable_data()->pb.mutable_security_config()->set_roles_version(roles_version + 1);
195
196
6.02k
  TRACE("Set CatalogManager's roles version");
197
198
  // Write to sys_catalog and in memory.
199
6.02k
  RETURN_NOT_OK(catalog_manager_->sys_catalog_->Upsert(
200
6.02k
      catalog_manager_->leader_ready_term(), security_config_));
201
202
6.02k
  l.Commit();
203
6.02k
  return Status::OK();
204
6.02k
}
205
206
template<class RespClass>
207
Status PermissionsManager::RemoveAllPermissionsForResourceUnlocked(
208
    const std::string& canonical_resource,
209
7.84k
    RespClass* resp) {
210
211
7.84k
  bool permissions_modified = false;
212
10.0k
  for (const auto& e : roles_map_) {
213
10.0k
    scoped_refptr<RoleInfo> rp = e.second;
214
10.0k
    ScopedMutation<RoleInfo> role_info_mutation(rp.get());
215
10.0k
    auto* resources = rp->mutable_metadata()->mutable_dirty()->pb.mutable_resources();
216
12.6k
    for (auto itr = resources->begin(); itr != resources->end(); 
itr++2.55k
) {
217
4.62k
      if (itr->canonical_resource() == canonical_resource) {
218
2.06k
        resources->erase(itr);
219
2.06k
        role_info_mutation.Commit();
220
2.06k
        permissions_modified = true;
221
2.06k
        break;
222
2.06k
      }
223
4.62k
    }
224
10.0k
  }
225
226
  // Increment the roles version and update the cache only if there was a modification to the
227
  // permissions.
228
7.84k
  if (permissions_modified) {
229
1.82k
    const Status s = IncrementRolesVersionUnlocked();
230
1.82k
    if (!s.ok()) {
231
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
232
0
    }
233
234
1.82k
    BuildResourcePermissionsUnlocked();
235
1.82k
  }
236
237
7.84k
  return Status::OK();
238
7.84k
}
yb::Status yb::master::PermissionsManager::RemoveAllPermissionsForResourceUnlocked<yb::master::DeleteTableResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteTableResponsePB*)
Line
Count
Source
209
5.58k
    RespClass* resp) {
210
211
5.58k
  bool permissions_modified = false;
212
5.88k
  for (const auto& e : roles_map_) {
213
5.88k
    scoped_refptr<RoleInfo> rp = e.second;
214
5.88k
    ScopedMutation<RoleInfo> role_info_mutation(rp.get());
215
5.88k
    auto* resources = rp->mutable_metadata()->mutable_dirty()->pb.mutable_resources();
216
6.73k
    for (auto itr = resources->begin(); itr != resources->end(); 
itr++849
) {
217
1.18k
      if (itr->canonical_resource() == canonical_resource) {
218
331
        resources->erase(itr);
219
331
        role_info_mutation.Commit();
220
331
        permissions_modified = true;
221
331
        break;
222
331
      }
223
1.18k
    }
224
5.88k
  }
225
226
  // Increment the roles version and update the cache only if there was a modification to the
227
  // permissions.
228
5.58k
  if (permissions_modified) {
229
235
    const Status s = IncrementRolesVersionUnlocked();
230
235
    if (!s.ok()) {
231
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
232
0
    }
233
234
235
    BuildResourcePermissionsUnlocked();
235
235
  }
236
237
5.58k
  return Status::OK();
238
5.58k
}
yb::Status yb::master::PermissionsManager::RemoveAllPermissionsForResourceUnlocked<yb::master::DeleteNamespaceResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteNamespaceResponsePB*)
Line
Count
Source
209
1.53k
    RespClass* resp) {
210
211
1.53k
  bool permissions_modified = false;
212
2.88k
  for (const auto& e : roles_map_) {
213
2.88k
    scoped_refptr<RoleInfo> rp = e.second;
214
2.88k
    ScopedMutation<RoleInfo> role_info_mutation(rp.get());
215
2.88k
    auto* resources = rp->mutable_metadata()->mutable_dirty()->pb.mutable_resources();
216
4.05k
    for (auto itr = resources->begin(); itr != resources->end(); 
itr++1.17k
) {
217
2.14k
      if (itr->canonical_resource() == canonical_resource) {
218
977
        resources->erase(itr);
219
977
        role_info_mutation.Commit();
220
977
        permissions_modified = true;
221
977
        break;
222
977
      }
223
2.14k
    }
224
2.88k
  }
225
226
  // Increment the roles version and update the cache only if there was a modification to the
227
  // permissions.
228
1.53k
  if (permissions_modified) {
229
872
    const Status s = IncrementRolesVersionUnlocked();
230
872
    if (!s.ok()) {
231
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
232
0
    }
233
234
872
    BuildResourcePermissionsUnlocked();
235
872
  }
236
237
1.53k
  return Status::OK();
238
1.53k
}
yb::Status yb::master::PermissionsManager::RemoveAllPermissionsForResourceUnlocked<yb::master::DeleteRoleResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteRoleResponsePB*)
Line
Count
Source
209
724
    RespClass* resp) {
210
211
724
  bool permissions_modified = false;
212
1.27k
  for (const auto& e : roles_map_) {
213
1.27k
    scoped_refptr<RoleInfo> rp = e.second;
214
1.27k
    ScopedMutation<RoleInfo> role_info_mutation(rp.get());
215
1.27k
    auto* resources = rp->mutable_metadata()->mutable_dirty()->pb.mutable_resources();
216
1.81k
    for (auto itr = resources->begin(); itr != resources->end(); 
itr++537
) {
217
1.29k
      if (itr->canonical_resource() == canonical_resource) {
218
756
        resources->erase(itr);
219
756
        role_info_mutation.Commit();
220
756
        permissions_modified = true;
221
756
        break;
222
756
      }
223
1.29k
    }
224
1.27k
  }
225
226
  // Increment the roles version and update the cache only if there was a modification to the
227
  // permissions.
228
724
  if (permissions_modified) {
229
717
    const Status s = IncrementRolesVersionUnlocked();
230
717
    if (!s.ok()) {
231
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
232
0
    }
233
234
717
    BuildResourcePermissionsUnlocked();
235
717
  }
236
237
724
  return Status::OK();
238
724
}
239
240
template<class RespClass>
241
Status PermissionsManager::RemoveAllPermissionsForResource(
242
    const std::string& canonical_resource,
243
7.12k
    RespClass* resp) {
244
7.12k
  LockGuard lock(mutex_);
245
7.12k
  return RemoveAllPermissionsForResourceUnlocked(canonical_resource, resp);
246
7.12k
}
yb::Status yb::master::PermissionsManager::RemoveAllPermissionsForResource<yb::master::DeleteTableResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteTableResponsePB*)
Line
Count
Source
243
5.58k
    RespClass* resp) {
244
5.58k
  LockGuard lock(mutex_);
245
5.58k
  return RemoveAllPermissionsForResourceUnlocked(canonical_resource, resp);
246
5.58k
}
yb::Status yb::master::PermissionsManager::RemoveAllPermissionsForResource<yb::master::DeleteNamespaceResponsePB>(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, yb::master::DeleteNamespaceResponsePB*)
Line
Count
Source
243
1.53k
    RespClass* resp) {
244
1.53k
  LockGuard lock(mutex_);
245
1.53k
  return RemoveAllPermissionsForResourceUnlocked(canonical_resource, resp);
246
1.53k
}
247
248
// TODO: get rid of the need for explicit instantations.
249
template
250
Status PermissionsManager::RemoveAllPermissionsForResource<DeleteTableResponsePB>(
251
    const std::string& canonical_resource,
252
    DeleteTableResponsePB* resp);
253
254
template
255
Status PermissionsManager::RemoveAllPermissionsForResource<DeleteNamespaceResponsePB>(
256
    const std::string& canonical_resource,
257
    DeleteNamespaceResponsePB* resp);
258
259
Status PermissionsManager::CreateRoleUnlocked(
260
    const std::string& role_name,
261
    const std::string& salted_hash,
262
    const bool login,
263
    const bool superuser,
264
    int64_t term,
265
2.90k
    const bool increment_roles_version) {
266
  // Create Entry.
267
2.90k
  SysRoleEntryPB role_entry;
268
2.90k
  role_entry.set_role(role_name);
269
2.90k
  role_entry.set_can_login(login);
270
2.90k
  role_entry.set_is_superuser(superuser);
271
2.90k
  if (salted_hash.size() != 0) {
272
2.86k
    role_entry.set_salted_hash(salted_hash);
273
2.86k
  }
274
275
2.90k
  if (increment_roles_version) {
276
    // Increment roles version.
277
751
    RETURN_NOT_OK(IncrementRolesVersionUnlocked());
278
751
  }
279
280
  // Create in memory object.
281
2.90k
  scoped_refptr<RoleInfo> role = new RoleInfo(role_name);
282
283
  // Prepare write.
284
2.90k
  {
285
2.90k
    auto l = role->LockForWrite();
286
2.90k
    l.mutable_data()->pb = std::move(role_entry);
287
288
2.90k
    roles_map_[role_name] = role;
289
2.90k
    TRACE("Inserted new role info into CatalogManager maps");
290
291
    // Write to sys_catalog and in memory.
292
2.90k
    RETURN_NOT_OK(catalog_manager_->sys_catalog_->Upsert(term, role));
293
294
2.90k
    l.Commit();
295
2.90k
  }
296
0
  BuildRecursiveRolesUnlocked();
297
2.90k
  return Status::OK();
298
2.90k
}
299
300
Status PermissionsManager::CreateRole(
301
    const CreateRoleRequestPB* req,
302
    CreateRoleResponsePB* resp,
303
757
    rpc::RpcContext* rpc) {
304
305
757
  LOG(INFO) << "CreateRole from " << RequestorString(rpc) << ": " << req->DebugString();
306
307
757
  Status s;
308
757
  {
309
757
    TRACE("Acquired lock");
310
757
    LockGuard lock(mutex_);
311
    // Only a SUPERUSER role can create another SUPERUSER role. In Apache Cassandra this gets
312
    // checked before the existence of the new role.
313
757
    if (req->superuser()) {
314
52
      scoped_refptr<RoleInfo> creator_role = FindPtrOrNull(roles_map_, req->creator_role_name());
315
52
      if (creator_role == nullptr) {
316
0
        s = STATUS_SUBSTITUTE(NotFound, "role $0 does not exist", req->creator_role_name());
317
0
        return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
318
0
      }
319
320
52
      auto clr = creator_role->LockForRead();
321
52
      if (!clr->pb.is_superuser()) {
322
6
        s = STATUS(NotAuthorized, "Only superusers can create a role with superuser status");
323
6
        return SetupError(resp->mutable_error(), MasterErrorPB::NOT_AUTHORIZED, s);
324
6
      }
325
52
    }
326
751
    if (FindPtrOrNull(roles_map_, req->name()) != nullptr) {
327
0
      s = STATUS_SUBSTITUTE(AlreadyPresent, "Role $0 already exists", req->name());
328
0
      return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_ALREADY_PRESENT, s);
329
0
    }
330
751
    s = CreateRoleUnlocked(
331
751
        req->name(), req->salted_hash(), req->login(), req->superuser(),
332
751
        catalog_manager_->leader_ready_term());
333
751
  }
334
751
  if (PREDICT_TRUE(s.ok())) {
335
751
    LOG(INFO) << "Created role: " << req->name();
336
751
    if (req->has_creator_role_name()) {
337
751
      RETURN_NOT_OK(GrantPermissions(req->creator_role_name(),
338
751
                                     get_canonical_role(req->name()),
339
751
                                     req->name() /* resource name */,
340
751
                                     "" /* keyspace name */,
341
751
                                     all_permissions_for_resource(ResourceType::ROLE),
342
751
                                     ResourceType::ROLE,
343
751
                                     resp));
344
751
    }
345
751
  }
346
751
  return s;
347
751
}
348
349
Status PermissionsManager::AlterRole(
350
    const AlterRoleRequestPB* req,
351
    AlterRoleResponsePB* resp,
352
58
    rpc::RpcContext* rpc) {
353
354
58
  VLOG
(1) << "AlterRole from " << RequestorString(rpc) << ": " << req->DebugString()0
;
355
356
58
  Status s;
357
358
58
  TRACE("Acquired lock");
359
58
  LockGuard lock(mutex_);
360
361
58
  auto role = FindPtrOrNull(roles_map_, req->name());
362
58
  if (role == nullptr) {
363
3
    s = STATUS_SUBSTITUTE(NotFound, "Role $0 does not exist", req->name());
364
3
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
365
3
  }
366
367
  // Increment roles version.
368
55
  RETURN_NOT_OK(IncrementRolesVersionUnlocked());
369
370
  // Modify the role.
371
55
  {
372
55
    auto l = role->LockForWrite();
373
374
    // If the role we are trying to alter is a SUPERUSER, and the request is trying to alter the
375
    // SUPERUSER field for that role, the role requesting the alter operation must be a SUPERUSER
376
    // too.
377
55
    if (req->has_superuser()) {
378
46
      auto current_role = FindPtrOrNull(roles_map_, req->current_role());
379
46
      if (current_role == nullptr) {
380
0
        s = STATUS_SUBSTITUTE(NotFound, "Internal error: role $0 does not exist",
381
0
                              req->current_role());
382
0
        return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
383
0
      }
384
385
      // Fix for https://github.com/yugabyte/yugabyte-db/issues/2505.
386
      // A role cannot modify its own superuser status, nor the superuser status of any role granted
387
      // to it directly or through inheritance. This check should happen before the next check that
388
      // verifies that the role requesting the modification is a superuser.
389
46
      if (l->pb.role() == req->current_role() || 
IsMemberOf(l->pb.role(), req->current_role())44
) {
390
4
        s = STATUS(NotAuthorized, "You aren't allowed to alter your own superuser status or that "
391
4
                                  "of a role granted to you");
392
4
        return SetupError(resp->mutable_error(), MasterErrorPB::NOT_AUTHORIZED, s);
393
4
      }
394
395
      // Don't allow a non-superuser role to modify the superuser status of another role.
396
42
      auto clr = current_role->LockForRead();
397
42
      if (!clr->pb.is_superuser()) {
398
6
        s = STATUS(NotAuthorized, "Only superusers are allowed to alter superuser status");
399
6
        return SetupError(resp->mutable_error(), MasterErrorPB::NOT_AUTHORIZED, s);
400
6
      }
401
42
    }
402
403
45
    if (req->has_login()) {
404
10
      l.mutable_data()->pb.set_can_login(req->login());
405
10
    }
406
45
    if (req->has_superuser()) {
407
36
      l.mutable_data()->pb.set_is_superuser(req->superuser());
408
36
    }
409
45
    if (req->has_salted_hash()) {
410
9
      l.mutable_data()->pb.set_salted_hash(req->salted_hash());
411
9
    }
412
413
45
    s = catalog_manager_->sys_catalog_->Upsert(catalog_manager_->leader_ready_term(), role);
414
45
    if (!s.ok()) {
415
0
      LOG(ERROR) << "Unable to alter role " << req->name() << ": " << s;
416
0
      return s;
417
0
    }
418
45
    l.Commit();
419
45
  }
420
0
  VLOG(1) << "Altered role with request: " << req->ShortDebugString();
421
422
45
  BuildResourcePermissionsUnlocked();
423
45
  return Status::OK();
424
45
}
425
426
Status PermissionsManager::DeleteRole(
427
    const DeleteRoleRequestPB* req,
428
    DeleteRoleResponsePB* resp,
429
730
    rpc::RpcContext* rpc) {
430
431
730
  LOG(INFO) << "Servicing DeleteRole request from " << RequestorString(rpc)
432
730
            << ": " << req->ShortDebugString();
433
730
  Status s;
434
435
730
  if (!req->has_name()) {
436
0
    s = STATUS(InvalidArgument, "No role name given", req->DebugString());
437
0
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
438
0
  }
439
440
730
  TRACE("Acquired lock");
441
730
  LockGuard lock(mutex_);
442
443
730
  auto role = FindPtrOrNull(roles_map_, req->name());
444
730
  if (role == nullptr) {
445
3
    s = STATUS_SUBSTITUTE(NotFound, "Role $0 does not exist", req->name());
446
3
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
447
3
  }
448
449
  // Increment roles version.
450
727
  RETURN_NOT_OK(IncrementRolesVersionUnlocked());
451
452
  // Find all the roles where req->name() is part of the member_of list since we will need to remove
453
  // the role we are deleting from those lists.
454
727
  auto direct_member_of = DirectMemberOf(req->name());
455
727
  for (const auto& role_name : direct_member_of) {
456
19
    auto role = FindPtrOrNull(roles_map_, role_name);
457
19
    if (role == nullptr) {
458
0
      continue;
459
0
    }
460
19
    role->mutable_metadata()->StartMutation();
461
19
    auto metadata = &role->mutable_metadata()->mutable_dirty()->pb;
462
463
    // Create a new list that contains all the original roles in member_of with the exception of
464
    // the role we are deleting.
465
19
    vector<string> member_of_new_list;
466
21
    for (const auto& member_of : metadata->member_of()) {
467
21
      if (member_of != req->name()) {
468
2
        member_of_new_list.push_back(member_of);
469
2
      }
470
21
    }
471
472
    // Remove the role we are deleting from the list member_of.
473
19
    metadata->clear_member_of();
474
19
    for (auto member_of : member_of_new_list) {
475
2
      metadata->add_member_of(std::move(member_of));
476
2
    }
477
478
    // Update sys-catalog with the new member_of list for this role.
479
19
    s = catalog_manager_->sys_catalog_->Upsert(catalog_manager_->leader_ready_term(), role);
480
19
    if (!s.ok()) {
481
0
      LOG(ERROR) << "Unable to remove role " << req->name()
482
0
                 << " from member_of list for role " << role_name;
483
0
      role->mutable_metadata()->AbortMutation();
484
19
    } else {
485
19
      role->mutable_metadata()->CommitMutation();
486
19
    }
487
19
  }
488
489
727
  {
490
727
    auto l = role->LockForWrite();
491
492
727
    if (l.mutable_data()->pb.is_superuser()) {
493
      // If the role we are trying to remove is a SUPERUSER, the role trying to remove it has to be
494
      // a SUPERUSER too.
495
55
      auto current_role = FindPtrOrNull(roles_map_, req->current_role());
496
55
      if (current_role == nullptr) {
497
0
        s = STATUS_SUBSTITUTE(NotFound, "Internal error: role $0 does not exist",
498
0
                              req->current_role());
499
0
        return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
500
0
      }
501
502
55
      auto clr = current_role->LockForRead();
503
55
      if (!clr->pb.is_superuser()) {
504
3
        s = STATUS(NotAuthorized, "Only superusers can drop a role with superuser status");
505
3
        return SetupError(resp->mutable_error(), MasterErrorPB::NOT_AUTHORIZED, s);
506
3
      }
507
55
    }
508
509
    // Write to sys_catalog and in memory.
510
724
    RETURN_NOT_OK(catalog_manager_->sys_catalog_->Delete(
511
724
        catalog_manager_->leader_ready_term(), role));
512
    // Remove it from the maps.
513
724
    if (roles_map_.erase(role->id()) < 1) {
514
0
      PANIC_RPC(rpc, "Could not remove role from map, role name=" + role->id());
515
0
    }
516
517
    // Update the in-memory state.
518
724
    TRACE("Committing in-memory state");
519
724
    l.Commit();
520
724
  }
521
0
  BuildRecursiveRolesUnlocked();
522
523
  // Remove all the permissions granted on the deleted role to any role. See DeleteTable() comment
524
  // for for more details.
525
724
  string canonical_resource = get_canonical_role(req->name());
526
724
  RETURN_NOT_OK(RemoveAllPermissionsForResourceUnlocked(canonical_resource, resp));
527
528
724
  LOG(INFO) << "Successfully deleted role " << role->ToString()
529
724
            << " per request from " << RequestorString(rpc);
530
531
724
  return Status::OK();
532
724
}
533
534
Status PermissionsManager::GrantRevokeRole(
535
    const GrantRevokeRoleRequestPB* req,
536
    GrantRevokeRoleResponsePB* resp,
537
52
    rpc::RpcContext* rpc) {
538
539
52
  LOG(INFO) << "Servicing " << (req->revoke() ? 
"RevokeRole"8
:
"GrantRole"44
)
540
52
            << " request from " << RequestorString(rpc) << ": " << req->ShortDebugString();
541
542
  // Cannot grant or revoke itself.
543
52
  if (req->granted_role() == req->recipient_role()) {
544
2
    if (req->revoke()) {
545
      // Ignore the request. This is what Apache Cassandra does.
546
1
      return Status::OK();
547
1
    }
548
1
    auto s = STATUS_SUBSTITUTE(InvalidArgument,
549
1
        "$0 is a member of $1", req->recipient_role(), req->granted_role());
550
1
    return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
551
2
  }
552
553
50
  Status s;
554
  // If the request is revoke, we need to create a new list of the roles req->recipient_role()
555
  // is member of in which we exclude req->granted_role().
556
50
  if (!req->has_granted_role() || !req->has_recipient_role()) {
557
0
    s = STATUS(InvalidArgument, "No role name given", req->DebugString());
558
0
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
559
0
  }
560
561
50
  {
562
50
    constexpr char role_not_found_msg_str[] = "$0 doesn't exist";
563
50
    TRACE("Acquired lock");
564
50
    LockGuard lock(mutex_);
565
566
50
    scoped_refptr<RoleInfo> granted_role;
567
50
    granted_role = FindPtrOrNull(roles_map_, req->granted_role());
568
50
    if (granted_role == nullptr) {
569
1
      s = STATUS_SUBSTITUTE(NotFound, role_not_found_msg_str, req->granted_role());
570
1
      return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
571
1
    }
572
573
49
    scoped_refptr<RoleInfo> recipient_role;
574
49
    recipient_role = FindPtrOrNull(roles_map_, req->recipient_role());
575
49
    if (recipient_role == nullptr) {
576
0
      s = STATUS_SUBSTITUTE(NotFound, role_not_found_msg_str, req->recipient_role());
577
0
      return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
578
0
    }
579
580
    // Both roles are present.
581
49
    SysRoleEntryPB* metadata;
582
49
    {
583
49
      ScopedMutation<RoleInfo> role_info_mutation(recipient_role.get());
584
49
      metadata = &recipient_role->mutable_metadata()->mutable_dirty()->pb;
585
586
      // When revoking a role, the granted role has to be a direct member of the recipient role,
587
      // but when granting a role, if the recipient role has been granted to the granted role either
588
      // directly or through inheritance, we will return an error.
589
49
      if (req->revoke()) {
590
7
        bool direct_member = false;
591
7
        vector<string> member_of_new_list;
592
10
        for (const auto& member_of : metadata->member_of()) {
593
10
          if (member_of == req->granted_role()) {
594
7
            direct_member = true;
595
7
          } else 
if (3
req->revoke()3
) {
596
3
            member_of_new_list.push_back(member_of);
597
3
          }
598
10
        }
599
600
7
        if (!direct_member) {
601
0
          s = STATUS_SUBSTITUTE(InvalidArgument, "$0 is not a member of $1",
602
0
                                req->recipient_role(), req->granted_role());
603
0
          return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
604
0
        }
605
606
7
        metadata->clear_member_of();
607
7
        for (auto member_of : member_of_new_list) {
608
3
          metadata->add_member_of(std::move(member_of));
609
3
        }
610
7
        s = catalog_manager_->sys_catalog_->Upsert(
611
7
            catalog_manager_->leader_ready_term(), recipient_role);
612
42
      } else {
613
        // Let's make sure that we don't have circular dependencies.
614
42
        if (IsMemberOf(req->granted_role(), req->recipient_role()) ||
615
42
            IsMemberOf(req->recipient_role(), req->granted_role()) ||
616
42
            
req->granted_role() == req->recipient_role()41
) {
617
1
          s = STATUS_SUBSTITUTE(InvalidArgument, "$0 is a member of $1",
618
1
                                req->recipient_role(), req->granted_role());
619
1
          return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
620
1
        }
621
41
        metadata->add_member_of(req->granted_role());
622
41
        s = catalog_manager_->sys_catalog_->Upsert(
623
41
            catalog_manager_->leader_ready_term(), recipient_role);
624
41
      }
625
48
      if (!s.ok()) {
626
0
        s = s.CloneAndPrepend(Substitute(
627
0
            "An error occurred while updating roles in sys-catalog: $0", s.ToString()));
628
0
        LOG(WARNING) << s.ToString();
629
0
        return CheckIfNoLongerLeaderAndSetupError(s, resp);
630
0
      }
631
632
48
      LOG(INFO) << "Modified 'member of' field of role " << recipient_role->id();
633
634
      // Increment roles version before commiting the mutation.
635
48
      RETURN_NOT_OK(IncrementRolesVersionUnlocked());
636
637
48
      role_info_mutation.Commit();
638
48
      BuildRecursiveRolesUnlocked();
639
48
    }
640
48
    TRACE("Wrote grant/revoke role to sys-catalog");
641
48
  }
642
643
0
  return Status::OK();
644
48
}
645
646
// Depth first search. We assume that there are no cycles in the graph of dependencies.
647
6.68k
void PermissionsManager::BuildRecursiveRolesUnlocked() {
648
6.68k
  recursive_granted_roles_.clear();
649
650
  // Build the first level member of map and find all the roles that have at least one member in its
651
  // member_of field.
652
8.71k
  for (const auto& e : roles_map_) {
653
8.71k
    const auto& role_name = e.first;
654
8.71k
    if (recursive_granted_roles_.find(role_name) == recursive_granted_roles_.end()) {
655
8.65k
      TraverseRole(role_name, nullptr);
656
8.65k
    }
657
8.71k
  }
658
659
6.68k
  BuildResourcePermissionsUnlocked();
660
6.68k
}
661
662
11.1k
void PermissionsManager::BuildResourcePermissionsUnlocked() {
663
11.1k
  shared_ptr<GetPermissionsResponsePB> response = std::make_shared<GetPermissionsResponsePB>();
664
665
18.6k
  for (const auto& e : recursive_granted_roles_) {
666
18.6k
    const auto& role_name = e.first;
667
    // Get a copy of this set.
668
18.6k
    auto granted_roles = e.second;
669
18.6k
    granted_roles.insert(role_name);
670
18.6k
    auto* role_permissions = response->add_role_permissions();
671
18.6k
    role_permissions->set_role(role_name);
672
18.6k
    const auto& rinfo = roles_map_[role_name];
673
18.6k
    {
674
18.6k
      auto l = rinfo->LockForRead();
675
18.6k
      role_permissions->set_salted_hash(l->pb.salted_hash());
676
18.6k
      role_permissions->set_can_login(l->pb.can_login());
677
18.6k
    }
678
679
    // No permissions on ALL ROLES and ALL KEYSPACES by default.
680
18.6k
    role_permissions->set_all_keyspaces_permissions(0);
681
18.6k
    role_permissions->set_all_roles_permissions(0);
682
683
19.1k
    for (const auto& granted_role : granted_roles) {
684
19.1k
      const auto& role_info = roles_map_[granted_role];
685
19.1k
      auto l = role_info->LockForRead();
686
19.1k
      const auto& pb = l->pb;
687
688
19.1k
      Permissions all_roles_permissions_bitset(role_permissions->all_roles_permissions());
689
19.1k
      Permissions all_keyspaces_permissions_bitset(role_permissions->all_keyspaces_permissions());
690
691
19.1k
      for (const auto& resource : pb.resources()) {
692
16.5k
        Permissions resource_permissions_bitset;
693
694
69.7k
        for (const auto& permission : resource.permissions()) {
695
69.7k
          if (resource.canonical_resource() == kRolesRoleResource) {
696
488
            all_roles_permissions_bitset.set(permission);
697
69.2k
          } else if (resource.canonical_resource() == kRolesDataResource) {
698
1.16k
            all_keyspaces_permissions_bitset.set(permission);
699
68.1k
          } else {
700
68.1k
            resource_permissions_bitset.set(permission);
701
68.1k
          }
702
69.7k
        }
703
704
16.5k
        if (resource.canonical_resource() != kRolesDataResource &&
705
16.5k
            
resource.canonical_resource() != kRolesRoleResource16.0k
) {
706
15.7k
          auto* resource_permissions = role_permissions->add_resource_permissions();
707
15.7k
          resource_permissions->set_canonical_resource(resource.canonical_resource());
708
15.7k
          resource_permissions->set_permissions(
709
15.7k
              narrow_cast<uint32_t>(resource_permissions_bitset.to_ullong()));
710
15.7k
          recursive_granted_permissions_[role_name][resource.canonical_resource()] =
711
15.7k
              resource_permissions_bitset.to_ullong();
712
15.7k
        }
713
16.5k
      }
714
715
19.1k
      role_permissions->set_all_keyspaces_permissions(
716
19.1k
          narrow_cast<uint32_t>(all_keyspaces_permissions_bitset.to_ullong()));
717
19.1k
      VLOG(2) << "Setting all_keyspaces_permissions to "
718
0
              << role_permissions->all_keyspaces_permissions()
719
0
              << " for role " << role_name;
720
721
19.1k
      role_permissions->set_all_roles_permissions(
722
19.1k
          narrow_cast<uint32_t>(all_roles_permissions_bitset.to_ullong()));
723
19.1k
      VLOG(2) << "Setting all_roles_permissions to "
724
0
              << role_permissions->all_roles_permissions()
725
0
              << " for role " << role_name;
726
727
      // TODO: since this gets checked first when enforcing permissions, there is no point in
728
      // populating the rest of the permissions. Furthermore, we should remove any specific
729
      // permissions in the map.
730
      // In other words, if all-roles_all_keyspaces_permissions is equal to superuser_permissions,
731
      // it should be the only field sent to the clients for that specific role.
732
19.1k
      if (pb.is_superuser()) {
733
11.4k
        Permissions superuser_bitset;
734
        // Set all the bits to 1.
735
11.4k
        superuser_bitset.set();
736
737
11.4k
        role_permissions->set_all_keyspaces_permissions(
738
11.4k
            narrow_cast<uint32_t>(superuser_bitset.to_ullong()));
739
11.4k
        role_permissions->set_all_roles_permissions(
740
11.4k
            narrow_cast<uint32_t>(superuser_bitset.to_ullong()));
741
11.4k
      }
742
19.1k
    }
743
18.6k
  }
744
745
11.1k
  auto config = CHECK_NOTNULL(security_config_.get())->LockForRead();
746
11.1k
  response->set_version(config->pb.security_config().roles_version());
747
11.1k
  permissions_cache_ = std::move(response);
748
11.1k
}
749
750
// Get all the permissions granted to resources.
751
Status PermissionsManager::GetPermissions(
752
    const GetPermissionsRequestPB* req,
753
    GetPermissionsResponsePB* resp,
754
116k
    rpc::RpcContext* rpc) {
755
116k
  std::shared_ptr<GetPermissionsResponsePB> permissions_cache;
756
116k
  {
757
116k
    LockGuard lock(mutex_);
758
116k
    if (!permissions_cache_) {
759
0
      BuildRecursiveRolesUnlocked();
760
0
      if (!permissions_cache_) {
761
0
        DFATAL_OR_RETURN_NOT_OK(STATUS(IllegalState, "Unable to build permissions cache"));
762
0
      }
763
0
    }
764
    // Create another reference so that the cache doesn't go away while we are using it.
765
116k
    permissions_cache = permissions_cache_;
766
116k
  }
767
768
0
  boost::optional<uint64_t> request_version;
769
116k
  if (req->has_if_version_greater_than()) {
770
115k
    request_version = req->if_version_greater_than();
771
115k
  }
772
773
116k
  if (request_version && 
permissions_cache->version() == *request_version115k
) {
774
107k
    resp->set_version(permissions_cache->version());
775
107k
    return Status::OK();
776
107k
  } else 
if (9.53k
request_version9.53k
&&
permissions_cache->version() < *request_version8.06k
) {
777
0
    LOG(WARNING) << "GetPermissionsRequestPB version is greater than master's version";
778
0
    Status s = STATUS_SUBSTITUTE(IllegalState,
779
0
        "GetPermissionsRequestPB version $0 is greater than master's version $1. "
780
0
        "Should call GetPermissions again",  *request_version,
781
0
        permissions_cache->version());
782
0
    return SetupError(resp->mutable_error(), MasterErrorPB::CONFIG_VERSION_MISMATCH, s);
783
0
  }
784
785
  // permisions_cache_->version() > req->if_version_greater_than() or
786
  // req->if_version_greather_than() is not set.
787
9.53k
  resp->CopyFrom(*permissions_cache);
788
9.53k
  return Status::OK();
789
116k
}
790
791
128
bool PermissionsManager::IsMemberOf(const RoleName& granted_role, const RoleName& role) {
792
128
  const auto& iter = recursive_granted_roles_.find(role);
793
128
  if (iter == recursive_granted_roles_.end()) {
794
    // No roles have been granted to role.
795
0
    return false;
796
0
  }
797
798
128
  const auto& granted_roles = iter->second;
799
128
  if (granted_roles.find(granted_role) == granted_roles.end()) {
800
    // granted_role has not been granted directly or through inheritance to role.
801
125
    return false;
802
125
  }
803
804
3
  return true;
805
128
}
806
807
Status PermissionsManager::GrantRevokePermission(
808
    const GrantRevokePermissionRequestPB* req,
809
    GrantRevokePermissionResponsePB* resp,
810
721
    rpc::RpcContext* rpc) {
811
721
  LOG(INFO) << (req->revoke() ? 
"Revoke"126
:
"Grant"595
) << " permission "
812
721
            << RequestorString(rpc) << ": " << req->ShortDebugString();
813
721
  RETURN_NOT_OK(catalog_manager_->CheckResource(req, resp));
814
815
721
  LockGuard lock(mutex_);
816
721
  TRACE("Acquired lock");
817
721
  Status s;
818
819
721
  if (req->resource_type() == ResourceType::ROLE) {
820
85
    scoped_refptr<RoleInfo> role;
821
85
    role = FindPtrOrNull(roles_map_, req->resource_name());
822
85
    if (role == nullptr) {
823
0
      s = STATUS_SUBSTITUTE(NotFound, "Resource <role $0> does not exist", req->role_name());
824
0
      return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
825
0
    }
826
85
  }
827
828
721
  scoped_refptr<RoleInfo> rp;
829
721
  rp = FindPtrOrNull(roles_map_, req->role_name());
830
721
  if (rp == nullptr) {
831
0
    s = STATUS_SUBSTITUTE(InvalidArgument, "Role $0 doesn't exist", req->role_name());
832
0
    return SetupError(resp->mutable_error(), MasterErrorPB::ROLE_NOT_FOUND, s);
833
0
  }
834
835
  // Increment roles version.
836
721
  RETURN_NOT_OK(IncrementRolesVersionUnlocked());
837
838
721
  SysRoleEntryPB* metadata;
839
721
  {
840
721
    ScopedMutation<RoleInfo> role_info_mutation(rp.get());
841
721
    metadata = &rp->mutable_metadata()->mutable_dirty()->pb;
842
843
721
    ResourcePermissionsPB* current_resource = nullptr;
844
721
    auto current_resource_iter = metadata->mutable_resources()->end();
845
721
    for (current_resource_iter = metadata->mutable_resources()->begin();
846
766
         current_resource_iter != metadata->mutable_resources()->end(); 
current_resource_iter++45
) {
847
390
      if (current_resource_iter->canonical_resource() == req->canonical_resource()) {
848
345
        break;
849
345
      }
850
390
    }
851
852
721
    if (current_resource_iter != metadata->mutable_resources()->end()) {
853
345
      current_resource = &(*current_resource_iter);
854
345
    }
855
856
721
    if (current_resource == nullptr) {
857
376
      if (req->revoke()) {
858
4
        return Status::OK();
859
4
      }
860
861
372
      current_resource = metadata->add_resources();
862
372
      current_resource_iter = std::prev(metadata->mutable_resources()->end());
863
864
372
      current_resource->set_canonical_resource(req->canonical_resource());
865
372
      current_resource->set_resource_type(req->resource_type());
866
372
      if (req->has_resource_name()) {
867
284
        current_resource->set_resource_name(req->resource_name());
868
284
      }
869
372
      if (req->has_namespace_()) {
870
234
        current_resource->set_namespace_name(req->namespace_().name());
871
234
      }
872
372
    }
873
874
717
    if (req->permission() != PermissionType::ALL_PERMISSION) {
875
649
      auto permission_iter = current_resource->permissions().end();
876
649
      for (permission_iter = current_resource->permissions().begin();
877
1.13k
           permission_iter != current_resource->permissions().end(); 
permission_iter++490
) {
878
605
        if (*permission_iter == req->permission()) {
879
115
          break;
880
115
        }
881
605
      }
882
883
      // Resource doesn't have the permission, and we got a GRANT request.
884
649
      if (permission_iter == current_resource->permissions().end() && 
!req->revoke()534
) {
885
        // Verify that the permission is supported by the resource.
886
534
        if (!valid_permission_for_resource(req->permission(), req->resource_type())) {
887
0
          s = STATUS_SUBSTITUTE(InvalidArgument, "Invalid permission $0 for resource type $1",
888
0
              req->permission(), ResourceType_Name(req->resource_type()));
889
          // This should never happen because invalid permissions get rejected in the analysis part.
890
          // So crash the process if in debug mode.
891
0
          DFATAL_OR_RETURN_NOT_OK(s);
892
0
          return SetupError(resp->mutable_error(), MasterErrorPB::INVALID_REQUEST, s);
893
0
        }
894
534
        current_resource->add_permissions(req->permission());
895
534
      } else 
if (115
permission_iter != current_resource->permissions().end()115
&&
req->revoke()115
) {
896
115
        current_resource->mutable_permissions()->erase(permission_iter);
897
115
      }
898
649
    } else {
899
      // ALL permissions.
900
      // TODO (Bristy) : Add different permissions for different resources based on role names.
901
      // For REVOKE ALL we clear all the permissions and do nothing else.
902
68
      current_resource->clear_permissions();
903
904
68
      if (!req->revoke()) {
905
308
        for (const auto& permission : all_permissions_for_resource(req->resource_type())) {
906
308
          current_resource->add_permissions(permission);
907
308
        }
908
61
      }
909
68
    }
910
911
    // If this resource doesn't have any more permissions, remove it.
912
717
    if (current_resource->permissions().empty()) {
913
60
      metadata->mutable_resources()->erase(current_resource_iter);
914
60
    }
915
916
717
    s = catalog_manager_->sys_catalog_->Upsert(catalog_manager_->leader_ready_term(), rp);
917
717
    if (!s.ok()) {
918
0
      s = s.CloneAndPrepend(Substitute(
919
0
          "An error occurred while updating permissions in sys-catalog: $0", s.ToString()));
920
0
      LOG(WARNING) << s.ToString();
921
0
      return CheckIfNoLongerLeaderAndSetupError(s, resp);
922
0
    }
923
717
    TRACE("Wrote Permission to sys-catalog");
924
925
717
    role_info_mutation.Commit();
926
717
  }
927
0
  LOG(INFO) << "Modified Permission for role " << rp->id();
928
717
  BuildResourcePermissionsUnlocked();
929
717
  return Status::OK();
930
717
}
931
932
7.71k
void PermissionsManager::GetAllRoles(std::vector<scoped_refptr<RoleInfo>>* roles) {
933
7.71k
  roles->clear();
934
7.71k
  SharedLock lock(mutex_);
935
14.1k
  for (const RoleInfoMap::value_type& e : roles_map_) {
936
14.1k
    roles->push_back(e.second);
937
14.1k
  }
938
7.71k
}
939
940
727
vector<string> PermissionsManager::DirectMemberOf(const RoleName& role) {
941
727
  vector<string> roles;
942
2.00k
  for (const auto& e : roles_map_) {
943
2.00k
    auto l = e.second->LockForRead();
944
2.00k
    const auto& pb = l->pb;
945
2.00k
    for (const auto& member_of : pb.member_of()) {
946
56
      if (member_of == role) {
947
19
        roles.push_back(pb.role());
948
        // No need to keep checking the rest of the members.
949
19
        break;
950
19
      }
951
56
    }
952
2.00k
  }
953
727
  return roles;
954
727
}
955
956
3.00k
void PermissionsManager::BuildRecursiveRoles() {
957
3.00k
  TRACE("Acquired lock");
958
3.00k
  LockGuard lock(mutex_);
959
3.00k
  BuildRecursiveRolesUnlocked();
960
3.00k
}
961
962
void PermissionsManager::TraverseRole(
963
8.75k
    const string& role_name, std::unordered_set<RoleName>* granted_roles) {
964
8.75k
  auto iter = recursive_granted_roles_.find(role_name);
965
  // This node has already been visited. So just add all the granted (directly or through
966
  // inheritance) roles to granted_roles.
967
8.75k
  if (iter != recursive_granted_roles_.end()) {
968
40
    if (granted_roles) {
969
40
      const auto& set = iter->second;
970
40
      granted_roles->insert(set.begin(), set.end());
971
40
    }
972
40
    return;
973
40
  }
974
975
8.71k
  const auto& role_info = roles_map_[role_name];
976
8.71k
  auto l = role_info->LockForRead();
977
8.71k
  const auto& pb = l->pb;
978
8.71k
  if (pb.member_of().size() == 0) {
979
8.63k
    recursive_granted_roles_.insert({role_name, {}});
980
8.63k
  } else {
981
99
    for (const auto& direct_granted_role : pb.member_of()) {
982
99
      recursive_granted_roles_[role_name].insert(direct_granted_role);
983
99
      TraverseRole(direct_granted_role, &recursive_granted_roles_[role_name]);
984
99
    }
985
87
    if (granted_roles) {
986
16
      const auto& set = recursive_granted_roles_[role_name];
987
16
      granted_roles->insert(set.begin(), set.end());
988
16
    }
989
87
  }
990
8.71k
}
991
992
void PermissionsManager::AddRoleUnlocked(
993
    const RoleName& role_name,
994
845
    scoped_refptr<RoleInfo> role_info) {
995
845
  CHECK
(roles_map_.count(role_name) == 0) << "Role already exists: " << role_name0
;
996
997
845
  roles_map_[role_name] = std::move(role_info);
998
845
}
999
1000
3.75k
void PermissionsManager::ClearRolesUnlocked() {
1001
3.75k
  roles_map_.clear();
1002
3.75k
}
1003
1004
3.01k
Status PermissionsManager::PrepareDefaultSecurityConfigUnlocked(int64_t term) {
1005
  // Set up default security config if not already present.
1006
3.01k
  if (!security_config_) {
1007
2.90k
    SysSecurityConfigEntryPB security_config;
1008
2.90k
    security_config.set_roles_version(0);
1009
1010
    // Create in memory object.
1011
2.90k
    security_config_ = new SysConfigInfo(kSecurityConfigType);
1012
1013
    // Prepare write.
1014
2.90k
    auto l = security_config_->LockForWrite();
1015
2.90k
    *l.mutable_data()->pb.mutable_security_config() = std::move(security_config);
1016
1017
    // Write to sys_catalog and in memory.
1018
2.90k
    RETURN_NOT_OK(catalog_manager_->sys_catalog_->Upsert(term, security_config_));
1019
2.90k
    l.Commit();
1020
2.90k
  }
1021
3.01k
  return Status::OK();
1022
3.01k
}
1023
1024
846
void PermissionsManager::SetSecurityConfigOnLoadUnlocked(SysConfigInfo* security_config) {
1025
846
  LOG_IF(WARNING, security_config_ != nullptr)
1026
762
      << "Multiple security configs found when loading sys catalog";
1027
846
  security_config_ = security_config;
1028
846
}
1029
1030
}  // namespace master
1031
}  // namespace yb