Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/api_core/iam.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

151 statements  

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14"""Non-API-specific IAM policy definitions 

15 

16For allowed roles / permissions, see: 

17https://cloud.google.com/iam/docs/understanding-roles 

18 

19Example usage: 

20 

21.. code-block:: python 

22 

23 # ``get_iam_policy`` returns a :class:'~google.api_core.iam.Policy`. 

24 policy = resource.get_iam_policy(requested_policy_version=3) 

25 

26 phred = "user:phred@example.com" 

27 admin_group = "group:admins@groups.example.com" 

28 account = "serviceAccount:account-1234@accounts.example.com" 

29 

30 policy.version = 3 

31 policy.bindings = [ 

32 { 

33 "role": "roles/owner", 

34 "members": {phred, admin_group, account} 

35 }, 

36 { 

37 "role": "roles/editor", 

38 "members": {"allAuthenticatedUsers"} 

39 }, 

40 { 

41 "role": "roles/viewer", 

42 "members": {"allUsers"} 

43 "condition": { 

44 "title": "request_time", 

45 "description": "Requests made before 2021-01-01T00:00:00Z", 

46 "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")" 

47 } 

48 } 

49 ] 

50 

51 resource.set_iam_policy(policy) 

52""" 

53 

54import collections 

55import collections.abc 

56import operator 

57import warnings 

58 

59# Generic IAM roles 

60 

61OWNER_ROLE = "roles/owner" 

62"""Generic role implying all rights to an object.""" 

63 

64EDITOR_ROLE = "roles/editor" 

65"""Generic role implying rights to modify an object.""" 

66 

67VIEWER_ROLE = "roles/viewer" 

68"""Generic role implying rights to access an object.""" 

69 

70_ASSIGNMENT_DEPRECATED_MSG = """\ 

71Assigning to '{}' is deprecated. Use the `policy.bindings` property to modify bindings instead.""" 

72 

73_DICT_ACCESS_MSG = """\ 

74Dict access is not supported on policies with version > 1 or with conditional bindings.""" 

75 

76 

77class InvalidOperationException(Exception): 

78 """Raised when trying to use Policy class as a dict.""" 

79 

80 pass 

81 

82 

83class Policy(collections.abc.MutableMapping): 

84 """IAM Policy 

85 

86 Args: 

87 etag (Optional[str]): ETag used to identify a unique of the policy 

88 version (Optional[int]): The syntax schema version of the policy. 

89 

90 Note: 

91 Using conditions in bindings requires the policy's version to be set 

92 to `3` or greater, depending on the versions that are currently supported. 

93 

94 Accessing the policy using dict operations will raise InvalidOperationException 

95 when the policy's version is set to 3. 

96 

97 Use the policy.bindings getter/setter to retrieve and modify the policy's bindings. 

98 

99 See: 

100 IAM Policy https://cloud.google.com/iam/reference/rest/v1/Policy 

101 Policy versions https://cloud.google.com/iam/docs/policies#versions 

102 Conditions overview https://cloud.google.com/iam/docs/conditions-overview. 

103 """ 

104 

105 _OWNER_ROLES = (OWNER_ROLE,) 

106 """Roles mapped onto our ``owners`` attribute.""" 

107 

108 _EDITOR_ROLES = (EDITOR_ROLE,) 

109 """Roles mapped onto our ``editors`` attribute.""" 

110 

111 _VIEWER_ROLES = (VIEWER_ROLE,) 

112 """Roles mapped onto our ``viewers`` attribute.""" 

113 

114 def __init__(self, etag=None, version=None): 

115 self.etag = etag 

116 self.version = version 

117 self._bindings = [] 

118 

119 def __iter__(self): 

120 self.__check_version__() 

121 # Exclude bindings with no members 

122 return (binding["role"] for binding in self._bindings if binding["members"]) 

123 

124 def __len__(self): 

125 self.__check_version__() 

126 # Exclude bindings with no members 

127 return len(list(self.__iter__())) 

128 

129 def __getitem__(self, key): 

130 self.__check_version__() 

131 for b in self._bindings: 

132 if b["role"] == key: 

133 return b["members"] 

134 # If the binding does not yet exist, create one 

135 # NOTE: This will create bindings with no members 

136 # which are ignored by __iter__ and __len__ 

137 new_binding = {"role": key, "members": set()} 

138 self._bindings.append(new_binding) 

139 return new_binding["members"] 

140 

141 def __setitem__(self, key, value): 

142 self.__check_version__() 

143 value = set(value) 

144 for binding in self._bindings: 

145 if binding["role"] == key: 

146 binding["members"] = value 

147 return 

148 self._bindings.append({"role": key, "members": value}) 

149 

150 def __delitem__(self, key): 

151 self.__check_version__() 

152 for b in self._bindings: 

153 if b["role"] == key: 

154 self._bindings.remove(b) 

155 return 

156 raise KeyError(key) 

157 

158 def __check_version__(self): 

159 """Raise InvalidOperationException if version is greater than 1 or policy contains conditions.""" 

160 raise_version = self.version is not None and self.version > 1 

161 

162 if raise_version or self._contains_conditions(): 

163 raise InvalidOperationException(_DICT_ACCESS_MSG) 

164 

165 def _contains_conditions(self): 

166 for b in self._bindings: 

167 if b.get("condition") is not None: 

168 return True 

169 return False 

170 

171 @property 

172 def bindings(self): 

173 """The policy's list of bindings. 

174 

175 A binding is specified by a dictionary with keys: 

176 

177 * role (str): Role that is assigned to `members`. 

178 

179 * members (:obj:`set` of str): Specifies the identities associated to this binding. 

180 

181 * condition (:obj:`dict` of str:str): Specifies a condition under which this binding will apply. 

182 

183 * title (str): Title for the condition. 

184 

185 * description (:obj:str, optional): Description of the condition. 

186 

187 * expression: A CEL expression. 

188 

189 Type: 

190 :obj:`list` of :obj:`dict` 

191 

192 See: 

193 Policy versions https://cloud.google.com/iam/docs/policies#versions 

194 Conditions overview https://cloud.google.com/iam/docs/conditions-overview. 

195 

196 Example: 

197 

198 .. code-block:: python 

199 

200 USER = "user:phred@example.com" 

201 ADMIN_GROUP = "group:admins@groups.example.com" 

202 SERVICE_ACCOUNT = "serviceAccount:account-1234@accounts.example.com" 

203 CONDITION = { 

204 "title": "request_time", 

205 "description": "Requests made before 2021-01-01T00:00:00Z", # Optional 

206 "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")" 

207 } 

208 

209 # Set policy's version to 3 before setting bindings containing conditions. 

210 policy.version = 3 

211 

212 policy.bindings = [ 

213 { 

214 "role": "roles/viewer", 

215 "members": {USER, ADMIN_GROUP, SERVICE_ACCOUNT}, 

216 "condition": CONDITION 

217 }, 

218 ... 

219 ] 

220 """ 

221 return self._bindings 

222 

223 @bindings.setter 

224 def bindings(self, bindings): 

225 self._bindings = bindings 

226 

227 @property 

228 def owners(self): 

229 """Legacy access to owner role. 

230 

231 Raise InvalidOperationException if version is greater than 1 or policy contains conditions. 

232 

233 DEPRECATED: use `policy.bindings` to access bindings instead. 

234 """ 

235 result = set() 

236 for role in self._OWNER_ROLES: 

237 for member in self.get(role, ()): 

238 result.add(member) 

239 return frozenset(result) 

240 

241 @owners.setter 

242 def owners(self, value): 

243 """Update owners. 

244 

245 Raise InvalidOperationException if version is greater than 1 or policy contains conditions. 

246 

247 DEPRECATED: use `policy.bindings` to access bindings instead. 

248 """ 

249 warnings.warn( 

250 _ASSIGNMENT_DEPRECATED_MSG.format("owners", OWNER_ROLE), DeprecationWarning 

251 ) 

252 self[OWNER_ROLE] = value 

253 

254 @property 

255 def editors(self): 

256 """Legacy access to editor role. 

257 

258 Raise InvalidOperationException if version is greater than 1 or policy contains conditions. 

259 

260 DEPRECATED: use `policy.bindings` to access bindings instead. 

261 """ 

262 result = set() 

263 for role in self._EDITOR_ROLES: 

264 for member in self.get(role, ()): 

265 result.add(member) 

266 return frozenset(result) 

267 

268 @editors.setter 

269 def editors(self, value): 

270 """Update editors. 

271 

272 Raise InvalidOperationException if version is greater than 1 or policy contains conditions. 

273 

274 DEPRECATED: use `policy.bindings` to modify bindings instead. 

275 """ 

276 warnings.warn( 

277 _ASSIGNMENT_DEPRECATED_MSG.format("editors", EDITOR_ROLE), 

278 DeprecationWarning, 

279 ) 

280 self[EDITOR_ROLE] = value 

281 

282 @property 

283 def viewers(self): 

284 """Legacy access to viewer role. 

285 

286 Raise InvalidOperationException if version is greater than 1 or policy contains conditions. 

287 

288 DEPRECATED: use `policy.bindings` to modify bindings instead. 

289 """ 

290 result = set() 

291 for role in self._VIEWER_ROLES: 

292 for member in self.get(role, ()): 

293 result.add(member) 

294 return frozenset(result) 

295 

296 @viewers.setter 

297 def viewers(self, value): 

298 """Update viewers. 

299 

300 Raise InvalidOperationException if version is greater than 1 or policy contains conditions. 

301 

302 DEPRECATED: use `policy.bindings` to modify bindings instead. 

303 """ 

304 warnings.warn( 

305 _ASSIGNMENT_DEPRECATED_MSG.format("viewers", VIEWER_ROLE), 

306 DeprecationWarning, 

307 ) 

308 self[VIEWER_ROLE] = value 

309 

310 @staticmethod 

311 def user(email): 

312 """Factory method for a user member. 

313 

314 Args: 

315 email (str): E-mail for this particular user. 

316 

317 Returns: 

318 str: A member string corresponding to the given user. 

319 """ 

320 return "user:%s" % (email,) 

321 

322 @staticmethod 

323 def service_account(email): 

324 """Factory method for a service account member. 

325 

326 Args: 

327 email (str): E-mail for this particular service account. 

328 

329 Returns: 

330 str: A member string corresponding to the given service account. 

331 

332 """ 

333 return "serviceAccount:%s" % (email,) 

334 

335 @staticmethod 

336 def group(email): 

337 """Factory method for a group member. 

338 

339 Args: 

340 email (str): An id or e-mail for this particular group. 

341 

342 Returns: 

343 str: A member string corresponding to the given group. 

344 """ 

345 return "group:%s" % (email,) 

346 

347 @staticmethod 

348 def domain(domain): 

349 """Factory method for a domain member. 

350 

351 Args: 

352 domain (str): The domain for this member. 

353 

354 Returns: 

355 str: A member string corresponding to the given domain. 

356 """ 

357 return "domain:%s" % (domain,) 

358 

359 @staticmethod 

360 def all_users(): 

361 """Factory method for a member representing all users. 

362 

363 Returns: 

364 str: A member string representing all users. 

365 """ 

366 return "allUsers" 

367 

368 @staticmethod 

369 def authenticated_users(): 

370 """Factory method for a member representing all authenticated users. 

371 

372 Returns: 

373 str: A member string representing all authenticated users. 

374 """ 

375 return "allAuthenticatedUsers" 

376 

377 @classmethod 

378 def from_api_repr(cls, resource): 

379 """Factory: create a policy from a JSON resource. 

380 

381 Args: 

382 resource (dict): policy resource returned by ``getIamPolicy`` API. 

383 

384 Returns: 

385 :class:`Policy`: the parsed policy 

386 """ 

387 version = resource.get("version") 

388 etag = resource.get("etag") 

389 policy = cls(etag, version) 

390 policy.bindings = resource.get("bindings", []) 

391 

392 for binding in policy.bindings: 

393 binding["members"] = set(binding.get("members", ())) 

394 

395 return policy 

396 

397 def to_api_repr(self): 

398 """Render a JSON policy resource. 

399 

400 Returns: 

401 dict: a resource to be passed to the ``setIamPolicy`` API. 

402 """ 

403 resource = {} 

404 

405 if self.etag is not None: 

406 resource["etag"] = self.etag 

407 

408 if self.version is not None: 

409 resource["version"] = self.version 

410 

411 if self._bindings and len(self._bindings) > 0: 

412 bindings = [] 

413 for binding in self._bindings: 

414 members = binding.get("members") 

415 if members: 

416 new_binding = {"role": binding["role"], "members": sorted(members)} 

417 condition = binding.get("condition") 

418 if condition: 

419 new_binding["condition"] = condition 

420 bindings.append(new_binding) 

421 

422 if bindings: 

423 # Sort bindings by role 

424 key = operator.itemgetter("role") 

425 resource["bindings"] = sorted(bindings, key=key) 

426 

427 return resource