1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Non-API-specific IAM policy definitions
15
16For allowed roles / permissions, see:
17https://cloud.google.com/iam/docs/understanding-roles
18
19Example usage:
20
21.. code-block:: python
22
23 # ``get_iam_policy`` returns a :class:'~google.api_core.iam.Policy`.
24 policy = resource.get_iam_policy(requested_policy_version=3)
25
26 phred = "user:phred@example.com"
27 admin_group = "group:admins@groups.example.com"
28 account = "serviceAccount:account-1234@accounts.example.com"
29
30 policy.version = 3
31 policy.bindings = [
32 {
33 "role": "roles/owner",
34 "members": {phred, admin_group, account}
35 },
36 {
37 "role": "roles/editor",
38 "members": {"allAuthenticatedUsers"}
39 },
40 {
41 "role": "roles/viewer",
42 "members": {"allUsers"}
43 "condition": {
44 "title": "request_time",
45 "description": "Requests made before 2021-01-01T00:00:00Z",
46 "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
47 }
48 }
49 ]
50
51 resource.set_iam_policy(policy)
52"""
53
54import collections
55import collections.abc
56import operator
57import warnings
58
59# Generic IAM roles
60
61OWNER_ROLE = "roles/owner"
62"""Generic role implying all rights to an object."""
63
64EDITOR_ROLE = "roles/editor"
65"""Generic role implying rights to modify an object."""
66
67VIEWER_ROLE = "roles/viewer"
68"""Generic role implying rights to access an object."""
69
70_ASSIGNMENT_DEPRECATED_MSG = """\
71Assigning to '{}' is deprecated. Use the `policy.bindings` property to modify bindings instead."""
72
73_DICT_ACCESS_MSG = """\
74Dict access is not supported on policies with version > 1 or with conditional bindings."""
75
76
77class InvalidOperationException(Exception):
78 """Raised when trying to use Policy class as a dict."""
79
80 pass
81
82
83class Policy(collections.abc.MutableMapping):
84 """IAM Policy
85
86 Args:
87 etag (Optional[str]): ETag used to identify a unique of the policy
88 version (Optional[int]): The syntax schema version of the policy.
89
90 Note:
91 Using conditions in bindings requires the policy's version to be set
92 to `3` or greater, depending on the versions that are currently supported.
93
94 Accessing the policy using dict operations will raise InvalidOperationException
95 when the policy's version is set to 3.
96
97 Use the policy.bindings getter/setter to retrieve and modify the policy's bindings.
98
99 See:
100 IAM Policy https://cloud.google.com/iam/reference/rest/v1/Policy
101 Policy versions https://cloud.google.com/iam/docs/policies#versions
102 Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
103 """
104
105 _OWNER_ROLES = (OWNER_ROLE,)
106 """Roles mapped onto our ``owners`` attribute."""
107
108 _EDITOR_ROLES = (EDITOR_ROLE,)
109 """Roles mapped onto our ``editors`` attribute."""
110
111 _VIEWER_ROLES = (VIEWER_ROLE,)
112 """Roles mapped onto our ``viewers`` attribute."""
113
114 def __init__(self, etag=None, version=None):
115 self.etag = etag
116 self.version = version
117 self._bindings = []
118
119 def __iter__(self):
120 self.__check_version__()
121 # Exclude bindings with no members
122 return (binding["role"] for binding in self._bindings if binding["members"])
123
124 def __len__(self):
125 self.__check_version__()
126 # Exclude bindings with no members
127 return len(list(self.__iter__()))
128
129 def __getitem__(self, key):
130 self.__check_version__()
131 for b in self._bindings:
132 if b["role"] == key:
133 return b["members"]
134 # If the binding does not yet exist, create one
135 # NOTE: This will create bindings with no members
136 # which are ignored by __iter__ and __len__
137 new_binding = {"role": key, "members": set()}
138 self._bindings.append(new_binding)
139 return new_binding["members"]
140
141 def __setitem__(self, key, value):
142 self.__check_version__()
143 value = set(value)
144 for binding in self._bindings:
145 if binding["role"] == key:
146 binding["members"] = value
147 return
148 self._bindings.append({"role": key, "members": value})
149
150 def __delitem__(self, key):
151 self.__check_version__()
152 for b in self._bindings:
153 if b["role"] == key:
154 self._bindings.remove(b)
155 return
156 raise KeyError(key)
157
158 def __check_version__(self):
159 """Raise InvalidOperationException if version is greater than 1 or policy contains conditions."""
160 raise_version = self.version is not None and self.version > 1
161
162 if raise_version or self._contains_conditions():
163 raise InvalidOperationException(_DICT_ACCESS_MSG)
164
165 def _contains_conditions(self):
166 for b in self._bindings:
167 if b.get("condition") is not None:
168 return True
169 return False
170
171 @property
172 def bindings(self):
173 """The policy's list of bindings.
174
175 A binding is specified by a dictionary with keys:
176
177 * role (str): Role that is assigned to `members`.
178
179 * members (:obj:`set` of str): Specifies the identities associated to this binding.
180
181 * condition (:obj:`dict` of str:str): Specifies a condition under which this binding will apply.
182
183 * title (str): Title for the condition.
184
185 * description (:obj:str, optional): Description of the condition.
186
187 * expression: A CEL expression.
188
189 Type:
190 :obj:`list` of :obj:`dict`
191
192 See:
193 Policy versions https://cloud.google.com/iam/docs/policies#versions
194 Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
195
196 Example:
197
198 .. code-block:: python
199
200 USER = "user:phred@example.com"
201 ADMIN_GROUP = "group:admins@groups.example.com"
202 SERVICE_ACCOUNT = "serviceAccount:account-1234@accounts.example.com"
203 CONDITION = {
204 "title": "request_time",
205 "description": "Requests made before 2021-01-01T00:00:00Z", # Optional
206 "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
207 }
208
209 # Set policy's version to 3 before setting bindings containing conditions.
210 policy.version = 3
211
212 policy.bindings = [
213 {
214 "role": "roles/viewer",
215 "members": {USER, ADMIN_GROUP, SERVICE_ACCOUNT},
216 "condition": CONDITION
217 },
218 ...
219 ]
220 """
221 return self._bindings
222
223 @bindings.setter
224 def bindings(self, bindings):
225 self._bindings = bindings
226
227 @property
228 def owners(self):
229 """Legacy access to owner role.
230
231 Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
232
233 DEPRECATED: use `policy.bindings` to access bindings instead.
234 """
235 result = set()
236 for role in self._OWNER_ROLES:
237 for member in self.get(role, ()):
238 result.add(member)
239 return frozenset(result)
240
241 @owners.setter
242 def owners(self, value):
243 """Update owners.
244
245 Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
246
247 DEPRECATED: use `policy.bindings` to access bindings instead.
248 """
249 warnings.warn(
250 _ASSIGNMENT_DEPRECATED_MSG.format("owners", OWNER_ROLE), DeprecationWarning
251 )
252 self[OWNER_ROLE] = value
253
254 @property
255 def editors(self):
256 """Legacy access to editor role.
257
258 Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
259
260 DEPRECATED: use `policy.bindings` to access bindings instead.
261 """
262 result = set()
263 for role in self._EDITOR_ROLES:
264 for member in self.get(role, ()):
265 result.add(member)
266 return frozenset(result)
267
268 @editors.setter
269 def editors(self, value):
270 """Update editors.
271
272 Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
273
274 DEPRECATED: use `policy.bindings` to modify bindings instead.
275 """
276 warnings.warn(
277 _ASSIGNMENT_DEPRECATED_MSG.format("editors", EDITOR_ROLE),
278 DeprecationWarning,
279 )
280 self[EDITOR_ROLE] = value
281
282 @property
283 def viewers(self):
284 """Legacy access to viewer role.
285
286 Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
287
288 DEPRECATED: use `policy.bindings` to modify bindings instead.
289 """
290 result = set()
291 for role in self._VIEWER_ROLES:
292 for member in self.get(role, ()):
293 result.add(member)
294 return frozenset(result)
295
296 @viewers.setter
297 def viewers(self, value):
298 """Update viewers.
299
300 Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
301
302 DEPRECATED: use `policy.bindings` to modify bindings instead.
303 """
304 warnings.warn(
305 _ASSIGNMENT_DEPRECATED_MSG.format("viewers", VIEWER_ROLE),
306 DeprecationWarning,
307 )
308 self[VIEWER_ROLE] = value
309
310 @staticmethod
311 def user(email):
312 """Factory method for a user member.
313
314 Args:
315 email (str): E-mail for this particular user.
316
317 Returns:
318 str: A member string corresponding to the given user.
319 """
320 return "user:%s" % (email,)
321
322 @staticmethod
323 def service_account(email):
324 """Factory method for a service account member.
325
326 Args:
327 email (str): E-mail for this particular service account.
328
329 Returns:
330 str: A member string corresponding to the given service account.
331
332 """
333 return "serviceAccount:%s" % (email,)
334
335 @staticmethod
336 def group(email):
337 """Factory method for a group member.
338
339 Args:
340 email (str): An id or e-mail for this particular group.
341
342 Returns:
343 str: A member string corresponding to the given group.
344 """
345 return "group:%s" % (email,)
346
347 @staticmethod
348 def domain(domain):
349 """Factory method for a domain member.
350
351 Args:
352 domain (str): The domain for this member.
353
354 Returns:
355 str: A member string corresponding to the given domain.
356 """
357 return "domain:%s" % (domain,)
358
359 @staticmethod
360 def all_users():
361 """Factory method for a member representing all users.
362
363 Returns:
364 str: A member string representing all users.
365 """
366 return "allUsers"
367
368 @staticmethod
369 def authenticated_users():
370 """Factory method for a member representing all authenticated users.
371
372 Returns:
373 str: A member string representing all authenticated users.
374 """
375 return "allAuthenticatedUsers"
376
377 @classmethod
378 def from_api_repr(cls, resource):
379 """Factory: create a policy from a JSON resource.
380
381 Args:
382 resource (dict): policy resource returned by ``getIamPolicy`` API.
383
384 Returns:
385 :class:`Policy`: the parsed policy
386 """
387 version = resource.get("version")
388 etag = resource.get("etag")
389 policy = cls(etag, version)
390 policy.bindings = resource.get("bindings", [])
391
392 for binding in policy.bindings:
393 binding["members"] = set(binding.get("members", ()))
394
395 return policy
396
397 def to_api_repr(self):
398 """Render a JSON policy resource.
399
400 Returns:
401 dict: a resource to be passed to the ``setIamPolicy`` API.
402 """
403 resource = {}
404
405 if self.etag is not None:
406 resource["etag"] = self.etag
407
408 if self.version is not None:
409 resource["version"] = self.version
410
411 if self._bindings and len(self._bindings) > 0:
412 bindings = []
413 for binding in self._bindings:
414 members = binding.get("members")
415 if members:
416 new_binding = {"role": binding["role"], "members": sorted(members)}
417 condition = binding.get("condition")
418 if condition:
419 new_binding["condition"] = condition
420 bindings.append(new_binding)
421
422 if bindings:
423 # Sort bindings by role
424 key = operator.itemgetter("role")
425 resource["bindings"] = sorted(bindings, key=key)
426
427 return resource