Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/api_core/iam.py: 38%
144 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:17 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:17 +0000
1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Non-API-specific IAM policy definitions
16For allowed roles / permissions, see:
17https://cloud.google.com/iam/docs/understanding-roles
19Example usage:
21.. code-block:: python
23 # ``get_iam_policy`` returns a :class:`~google.api_core.iam.Policy`.
24 policy = resource.get_iam_policy(requested_policy_version=3)
26 phred = "user:phred@example.com"
27 admin_group = "group:admins@groups.example.com"
28 account = "serviceAccount:account-1234@accounts.example.com"
30 policy.version = 3
31 policy.bindings = [
32 {
33 "role": "roles/owner",
34 "members": {phred, admin_group, account}
35 },
36 {
37 "role": "roles/editor",
38 "members": {"allAuthenticatedUsers"}
39 },
40 {
41 "role": "roles/viewer",
42 "members": {"allUsers"},
43 "condition": {
44 "title": "request_time",
45 "description": "Requests made before 2021-01-01T00:00:00Z",
46 "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
47 }
48 }
49 ]
51 resource.set_iam_policy(policy)
52"""
54import collections
55import collections.abc
56import operator
57import warnings
# Generic IAM roles, plus the messages used by the legacy dict-style API.

OWNER_ROLE = "roles/owner"
"""Generic role implying all rights to an object."""

EDITOR_ROLE = "roles/editor"
"""Generic role implying rights to modify an object."""

VIEWER_ROLE = "roles/viewer"
"""Generic role implying rights to access an object."""

# Template for the DeprecationWarning emitted by the legacy role setters;
# the single ``{}`` placeholder receives the name of the assigned property.
_ASSIGNMENT_DEPRECATED_MSG = (
    "Assigning to '{}' is deprecated. "
    "Use the `policy.bindings` property to modify bindings instead."
)

# Message carried by InvalidOperationException when dict-style access is refused.
_DICT_ACCESS_MSG = (
    "Dict access is not supported on policies "
    "with version > 1 or with conditional bindings."
)
class InvalidOperationException(Exception):
    """Error raised when a policy is used as a dict but does not support it.

    Carries no state beyond the message passed to :class:`Exception`.
    """
class Policy(collections.abc.MutableMapping):
    """IAM Policy

    Args:
        etag (Optional[str]): ETag of the policy; stored as-is and emitted
            by :meth:`to_api_repr`.
        version (Optional[int]): The syntax schema version of the policy.

    Note:
        Using conditions in bindings requires the policy's version to be set
        to `3` or greater, depending on the versions that are currently supported.

        Accessing the policy using dict operations will raise
        InvalidOperationException when the policy's version is greater than 1
        or when any binding carries a condition.

        Use the policy.bindings getter/setter to retrieve and modify the policy's bindings.

    See:
        IAM Policy https://cloud.google.com/iam/reference/rest/v1/Policy
        Policy versions https://cloud.google.com/iam/docs/policies#versions
        Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
    """

    _OWNER_ROLES = (OWNER_ROLE,)
    """Roles mapped onto our ``owners`` attribute."""

    _EDITOR_ROLES = (EDITOR_ROLE,)
    """Roles mapped onto our ``editors`` attribute."""

    _VIEWER_ROLES = (VIEWER_ROLE,)
    """Roles mapped onto our ``viewers`` attribute."""

    def __init__(self, etag=None, version=None):
        self.etag = etag
        self.version = version
        self._bindings = []

    def __iter__(self):
        """Iterate over the roles of all bindings that have at least one member.

        Raises:
            InvalidOperationException: if dict access is not supported
                (see :meth:`__check_version__`).
        """
        # The check runs eagerly (this is not a generator function), so the
        # error surfaces on ``iter(policy)`` rather than on the first ``next``.
        self.__check_version__()
        # Exclude bindings with no members. ``.get`` keeps this consistent
        # with ``to_api_repr`` for bindings that lack a "members" key.
        return (
            binding["role"] for binding in self._bindings if binding.get("members")
        )

    def __len__(self):
        """Return the number of bindings that have at least one member.

        Raises:
            InvalidOperationException: if dict access is not supported.
        """
        self.__check_version__()
        # Count lazily instead of building a throwaway list.
        return sum(1 for _ in self)

    def __getitem__(self, key):
        """Return the member set bound to role ``key``.

        A missing role does not raise ``KeyError``: an empty binding is
        created and its (mutable) member set is returned, so that
        ``policy[role].add(member)`` works on a fresh policy.

        Raises:
            InvalidOperationException: if dict access is not supported.
        """
        self.__check_version__()
        for b in self._bindings:
            if b["role"] == key:
                return b["members"]
        # If the binding does not yet exist, create one
        # NOTE: This will create bindings with no members
        # which are ignored by __iter__ and __len__
        new_binding = {"role": key, "members": set()}
        self._bindings.append(new_binding)
        return new_binding["members"]

    def __setitem__(self, key, value):
        """Replace the members bound to role ``key`` with ``set(value)``.

        Raises:
            InvalidOperationException: if dict access is not supported.
        """
        self.__check_version__()
        value = set(value)
        for binding in self._bindings:
            if binding["role"] == key:
                binding["members"] = value
                return
        self._bindings.append({"role": key, "members": value})

    def __delitem__(self, key):
        """Remove the first binding whose role is ``key``.

        Raises:
            InvalidOperationException: if dict access is not supported.
            KeyError: if no binding has role ``key``.
        """
        self.__check_version__()
        for b in self._bindings:
            if b["role"] == key:
                # Returning immediately means the list is never iterated
                # again after being mutated.
                self._bindings.remove(b)
                return
        raise KeyError(key)

    def __check_version__(self):
        """Raise InvalidOperationException if version is greater than 1 or policy contains conditions."""
        raise_version = self.version is not None and self.version > 1

        if raise_version or self._contains_conditions():
            raise InvalidOperationException(_DICT_ACCESS_MSG)

    def _contains_conditions(self):
        """Return True if any binding has a non-``None`` "condition" entry."""
        return any(b.get("condition") is not None for b in self._bindings)

    @property
    def bindings(self):
        """The policy's list of bindings.

        A binding is specified by a dictionary with keys:

        * role (str): Role that is assigned to `members`.

        * members (:obj:`set` of str): Specifies the identities associated to this binding.

        * condition (:obj:`dict` of str:str): Specifies a condition under which this binding will apply.

          * title (str): Title for the condition.

          * description (:obj:str, optional): Description of the condition.

          * expression: A CEL expression.

        Type:
           :obj:`list` of :obj:`dict`

        See:
           Policy versions https://cloud.google.com/iam/docs/policies#versions
           Conditions overview https://cloud.google.com/iam/docs/conditions-overview.

        Example:

        .. code-block:: python

           USER = "user:phred@example.com"
           ADMIN_GROUP = "group:admins@groups.example.com"
           SERVICE_ACCOUNT = "serviceAccount:account-1234@accounts.example.com"
           CONDITION = {
               "title": "request_time",
               "description": "Requests made before 2021-01-01T00:00:00Z", # Optional
               "expression": 'request.time < timestamp("2021-01-01T00:00:00Z")'
           }

           # Set policy's version to 3 before setting bindings containing conditions.
           policy.version = 3

           policy.bindings = [
               {
                   "role": "roles/viewer",
                   "members": {USER, ADMIN_GROUP, SERVICE_ACCOUNT},
                   "condition": CONDITION
               },
               ...
           ]
        """
        return self._bindings

    @bindings.setter
    def bindings(self, bindings):
        # Stored by reference, without copying or validation.
        self._bindings = bindings

    def _legacy_members(self, roles):
        """Return the union of the members bound to any of ``roles``.

        Shared implementation of the deprecated ``owners`` / ``editors`` /
        ``viewers`` getters.

        NOTE: ``self.get`` goes through ``__getitem__``, so a role that has
        no binding yet gets an empty binding appended as a side effect (such
        bindings are skipped by ``__iter__``, ``__len__`` and ``to_api_repr``).

        Raises:
            InvalidOperationException: if dict access is not supported.
        """
        members = set()
        for role in roles:
            members.update(self.get(role, ()))
        return frozenset(members)

    @property
    def owners(self):
        """Legacy access to owner role.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED:  use `policy.bindings` to access bindings instead.
        """
        return self._legacy_members(self._OWNER_ROLES)

    @owners.setter
    def owners(self, value):
        """Update owners.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED:  use `policy.bindings` to modify bindings instead.
        """
        # stacklevel=2 attributes the warning to the code doing the assignment.
        warnings.warn(
            _ASSIGNMENT_DEPRECATED_MSG.format("owners"),
            DeprecationWarning,
            stacklevel=2,
        )
        self[OWNER_ROLE] = value

    @property
    def editors(self):
        """Legacy access to editor role.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED:  use `policy.bindings` to access bindings instead.
        """
        return self._legacy_members(self._EDITOR_ROLES)

    @editors.setter
    def editors(self, value):
        """Update editors.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED:  use `policy.bindings` to modify bindings instead.
        """
        # stacklevel=2 attributes the warning to the code doing the assignment.
        warnings.warn(
            _ASSIGNMENT_DEPRECATED_MSG.format("editors"),
            DeprecationWarning,
            stacklevel=2,
        )
        self[EDITOR_ROLE] = value

    @property
    def viewers(self):
        """Legacy access to viewer role.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED:  use `policy.bindings` to access bindings instead.
        """
        return self._legacy_members(self._VIEWER_ROLES)

    @viewers.setter
    def viewers(self, value):
        """Update viewers.

        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.

        DEPRECATED:  use `policy.bindings` to modify bindings instead.
        """
        # stacklevel=2 attributes the warning to the code doing the assignment.
        warnings.warn(
            _ASSIGNMENT_DEPRECATED_MSG.format("viewers"),
            DeprecationWarning,
            stacklevel=2,
        )
        self[VIEWER_ROLE] = value

    @staticmethod
    def user(email):
        """Factory method for a user member.

        Args:
            email (str): E-mail for this particular user.

        Returns:
            str: A member string corresponding to the given user.
        """
        return "user:%s" % (email,)

    @staticmethod
    def service_account(email):
        """Factory method for a service account member.

        Args:
            email (str): E-mail for this particular service account.

        Returns:
            str: A member string corresponding to the given service account.
        """
        return "serviceAccount:%s" % (email,)

    @staticmethod
    def group(email):
        """Factory method for a group member.

        Args:
            email (str): An id or e-mail for this particular group.

        Returns:
            str: A member string corresponding to the given group.
        """
        return "group:%s" % (email,)

    @staticmethod
    def domain(domain):
        """Factory method for a domain member.

        Args:
            domain (str): The domain for this member.

        Returns:
            str: A member string corresponding to the given domain.
        """
        return "domain:%s" % (domain,)

    @staticmethod
    def all_users():
        """Factory method for a member representing all users.

        Returns:
            str: A member string representing all users.
        """
        return "allUsers"

    @staticmethod
    def authenticated_users():
        """Factory method for a member representing all authenticated users.

        Returns:
            str: A member string representing all authenticated users.
        """
        return "allAuthenticatedUsers"

    @classmethod
    def from_api_repr(cls, resource):
        """Factory:  create a policy from a JSON resource.

        The ``resource`` mapping is not modified: every binding is shallow-
        copied and its "members" converted to a new :obj:`set`. Other values
        of a binding (e.g. "condition") are shared with ``resource``.

        Args:
            resource (dict): policy resource returned by ``getIamPolicy`` API.

        Returns:
            :class:`Policy`: the parsed policy
        """
        version = resource.get("version")
        etag = resource.get("etag")
        policy = cls(etag, version)
        # ``or ()`` also tolerates an explicit ``"bindings": None``.
        policy.bindings = [
            dict(binding, members=set(binding.get("members", ())))
            for binding in resource.get("bindings") or ()
        ]
        return policy

    def to_api_repr(self):
        """Render a JSON policy resource.

        "etag" and "version" are included only when not ``None``. Bindings
        without members are dropped; the remaining ones are emitted sorted by
        role, each with its members sorted, and "bindings" is omitted
        entirely when nothing remains.

        Returns:
            dict: a resource to be passed to the ``setIamPolicy`` API.
        """
        resource = {}

        if self.etag is not None:
            resource["etag"] = self.etag

        if self.version is not None:
            resource["version"] = self.version

        bindings = []
        # ``or ()`` keeps a ``None`` assigned through the setter harmless.
        for binding in self._bindings or ():
            members = binding.get("members")
            if not members:
                continue
            new_binding = {"role": binding["role"], "members": sorted(members)}
            condition = binding.get("condition")
            if condition:
                new_binding["condition"] = condition
            bindings.append(new_binding)

        if bindings:
            # Sort bindings by role
            resource["bindings"] = sorted(bindings, key=operator.itemgetter("role"))

        return resource