Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/google/cloud/storage/_signing.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

190 statements  

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import base64 

17import binascii 

18import collections 

19import datetime 

20import hashlib 

21import json 

22 

23import http 

24import urllib 

25 

26import google.auth.credentials 

27 

28from google.auth import exceptions 

29from google.auth.transport import requests 

30from google.cloud import _helpers 

31from google.cloud.storage._helpers import _DEFAULT_UNIVERSE_DOMAIN 

32from google.cloud.storage._helpers import _NOW 

33from google.cloud.storage._helpers import _UTC 

34from google.cloud.storage.retry import DEFAULT_RETRY 

35 

36 

37# `google.cloud.storage._signing.NOW` is deprecated. 

38# Use `_NOW(_UTC)` instead. 

39NOW = datetime.datetime.utcnow 

40 

41SERVICE_ACCOUNT_URL = ( 

42 "https://googleapis.dev/python/google-api-core/latest/" 

43 "auth.html#setting-up-a-service-account" 

44) 

45 

46 

47def ensure_signed_credentials(credentials): 

48 """Raise AttributeError if the credentials are unsigned. 

49 

50 :type credentials: :class:`google.auth.credentials.Signing` 

51 :param credentials: The credentials used to create a private key 

52 for signing text. 

53 

54 :raises: :exc:`AttributeError` if credentials is not an instance 

55 of :class:`google.auth.credentials.Signing`. 

56 """ 

57 if not isinstance(credentials, google.auth.credentials.Signing): 

58 raise AttributeError( 

59 "you need a private key to sign credentials." 

60 "the credentials you are currently using {} " 

61 "just contains a token. see {} for more " 

62 "details.".format(type(credentials), SERVICE_ACCOUNT_URL) 

63 ) 

64 

65 

66def get_signed_query_params_v2(credentials, expiration, string_to_sign): 

67 """Gets query parameters for creating a signed URL. 

68 

69 :type credentials: :class:`google.auth.credentials.Signing` 

70 :param credentials: The credentials used to create a private key 

71 for signing text. 

72 

73 :type expiration: int or long 

74 :param expiration: When the signed URL should expire. 

75 

76 :type string_to_sign: str 

77 :param string_to_sign: The string to be signed by the credentials. 

78 

79 :raises: :exc:`AttributeError` if credentials is not an instance 

80 of :class:`google.auth.credentials.Signing`. 

81 

82 :rtype: dict 

83 :returns: Query parameters matching the signing credentials with a 

84 signed payload. 

85 """ 

86 ensure_signed_credentials(credentials) 

87 signature_bytes = credentials.sign_bytes(string_to_sign.encode("ascii")) 

88 signature = base64.b64encode(signature_bytes) 

89 service_account_name = credentials.signer_email 

90 return { 

91 "GoogleAccessId": service_account_name, 

92 "Expires": expiration, 

93 "Signature": signature, 

94 } 

95 

96 

97def get_expiration_seconds_v2(expiration): 

98 """Convert 'expiration' to a number of seconds in the future. 

99 

100 :type expiration: Union[Integer, datetime.datetime, datetime.timedelta] 

101 :param expiration: Point in time when the signed URL should expire. If 

102 a ``datetime`` instance is passed without an explicit 

103 ``tzinfo`` set, it will be assumed to be ``UTC``. 

104 

105 :raises: :exc:`TypeError` when expiration is not a valid type. 

106 

107 :rtype: int 

108 :returns: a timestamp as an absolute number of seconds since epoch. 

109 """ 

110 # If it's a timedelta, add it to `now` in UTC. 

111 if isinstance(expiration, datetime.timedelta): 

112 now = _NOW(_UTC) 

113 expiration = now + expiration 

114 

115 # If it's a datetime, convert to a timestamp. 

116 if isinstance(expiration, datetime.datetime): 

117 micros = _helpers._microseconds_from_datetime(expiration) 

118 expiration = micros // 10**6 

119 

120 if not isinstance(expiration, int): 

121 raise TypeError( 

122 "Expected an integer timestamp, datetime, or " 

123 "timedelta. Got %s" % type(expiration) 

124 ) 

125 return expiration 

126 

127 

128_EXPIRATION_TYPES = (int, datetime.datetime, datetime.timedelta) 

129 

130 

131def get_expiration_seconds_v4(expiration): 

132 """Convert 'expiration' to a number of seconds offset from the current time. 

133 

134 :type expiration: Union[Integer, datetime.datetime, datetime.timedelta] 

135 :param expiration: Point in time when the signed URL should expire. If 

136 a ``datetime`` instance is passed without an explicit 

137 ``tzinfo`` set, it will be assumed to be ``UTC``. 

138 

139 :raises: :exc:`TypeError` when expiration is not a valid type. 

140 :raises: :exc:`ValueError` when expiration is too large. 

141 :rtype: Integer 

142 :returns: seconds in the future when the signed URL will expire 

143 """ 

144 if not isinstance(expiration, _EXPIRATION_TYPES): 

145 raise TypeError( 

146 "Expected an integer timestamp, datetime, or " 

147 "timedelta. Got %s" % type(expiration) 

148 ) 

149 

150 now = _NOW(_UTC) 

151 

152 if isinstance(expiration, int): 

153 seconds = expiration 

154 

155 if isinstance(expiration, datetime.datetime): 

156 if expiration.tzinfo is None: 

157 expiration = expiration.replace(tzinfo=_helpers.UTC) 

158 expiration = expiration - now 

159 

160 if isinstance(expiration, datetime.timedelta): 

161 seconds = int(expiration.total_seconds()) 

162 

163 if seconds > SEVEN_DAYS: 

164 raise ValueError(f"Max allowed expiration interval is seven days {SEVEN_DAYS}") 

165 

166 return seconds 

167 

168 

169def get_canonical_headers(headers): 

170 """Canonicalize headers for signing. 

171 

172 See: 

173 https://cloud.google.com/storage/docs/access-control/signed-urls#about-canonical-extension-headers 

174 

175 :type headers: Union[dict|List(Tuple(str,str))] 

176 :param headers: 

177 (Optional) Additional HTTP headers to be included as part of the 

178 signed URLs. See: 

179 https://cloud.google.com/storage/docs/xml-api/reference-headers 

180 Requests using the signed URL *must* pass the specified header 

181 (name and value) with each request for the URL. 

182 

183 :rtype: str 

184 :returns: List of headers, normalized / sortted per the URL refernced above. 

185 """ 

186 if headers is None: 

187 headers = [] 

188 elif isinstance(headers, dict): 

189 headers = list(headers.items()) 

190 

191 if not headers: 

192 return [], [] 

193 

194 normalized = collections.defaultdict(list) 

195 for key, val in headers: 

196 key = key.lower().strip() 

197 val = " ".join(val.split()) 

198 normalized[key].append(val) 

199 

200 ordered_headers = sorted((key, ",".join(val)) for key, val in normalized.items()) 

201 

202 canonical_headers = ["{}:{}".format(*item) for item in ordered_headers] 

203 return canonical_headers, ordered_headers 

204 

205 

206_Canonical = collections.namedtuple( 

207 "_Canonical", ["method", "resource", "query_parameters", "headers"] 

208) 

209 

210 

211def canonicalize_v2(method, resource, query_parameters, headers): 

212 """Canonicalize method, resource per the V2 spec. 

213 

214 :type method: str 

215 :param method: The HTTP verb that will be used when requesting the URL. 

216 Defaults to ``'GET'``. If method is ``'RESUMABLE'`` then the 

217 signature will additionally contain the `x-goog-resumable` 

218 header, and the method changed to POST. See the signed URL 

219 docs regarding this flow: 

220 https://cloud.google.com/storage/docs/access-control/signed-urls 

221 

222 :type resource: str 

223 :param resource: A pointer to a specific resource 

224 (typically, ``/bucket-name/path/to/blob.txt``). 

225 

226 :type query_parameters: dict 

227 :param query_parameters: 

228 (Optional) Additional query parameters to be included as part of the 

229 signed URLs. See: 

230 https://cloud.google.com/storage/docs/xml-api/reference-headers#query 

231 

232 :type headers: Union[dict|List(Tuple(str,str))] 

233 :param headers: 

234 (Optional) Additional HTTP headers to be included as part of the 

235 signed URLs. See: 

236 https://cloud.google.com/storage/docs/xml-api/reference-headers 

237 Requests using the signed URL *must* pass the specified header 

238 (name and value) with each request for the URL. 

239 

240 :rtype: :class:_Canonical 

241 :returns: Canonical method, resource, query_parameters, and headers. 

242 """ 

243 headers, _ = get_canonical_headers(headers) 

244 

245 if method == "RESUMABLE": 

246 method = "POST" 

247 headers.append("x-goog-resumable:start") 

248 

249 if query_parameters is None: 

250 return _Canonical(method, resource, [], headers) 

251 

252 normalized_qp = sorted( 

253 (key.lower(), value and value.strip() or "") 

254 for key, value in query_parameters.items() 

255 ) 

256 encoded_qp = urllib.parse.urlencode(normalized_qp) 

257 canonical_resource = f"{resource}?{encoded_qp}" 

258 return _Canonical(method, canonical_resource, normalized_qp, headers) 

259 

260 

261def generate_signed_url_v2( 

262 credentials, 

263 resource, 

264 expiration, 

265 api_access_endpoint="", 

266 method="GET", 

267 content_md5=None, 

268 content_type=None, 

269 response_type=None, 

270 response_disposition=None, 

271 generation=None, 

272 headers=None, 

273 query_parameters=None, 

274 service_account_email=None, 

275 access_token=None, 

276 universe_domain=None, 

277): 

278 """Generate a V2 signed URL to provide query-string auth'n to a resource. 

279 

280 .. note:: 

281 

282 Assumes ``credentials`` implements the 

283 :class:`google.auth.credentials.Signing` interface. Also assumes 

284 ``credentials`` has a ``signer_email`` property which 

285 identifies the credentials. 

286 

287 .. note:: 

288 

289 If you are on Google Compute Engine, you can't generate a signed URL. 

290 If you'd like to be able to generate a signed URL from GCE, you can use a 

291 standard service account from a JSON file rather than a GCE service account. 

292 

293 See headers [reference](https://cloud.google.com/storage/docs/reference-headers) 

294 for more details on optional arguments. 

295 

296 :type credentials: :class:`google.auth.credentials.Signing` 

297 :param credentials: Credentials object with an associated private key to 

298 sign text. 

299 

300 :type resource: str 

301 :param resource: A pointer to a specific resource 

302 (typically, ``/bucket-name/path/to/blob.txt``). 

303 Caller should have already URL-encoded the value. 

304 

305 :type expiration: Union[Integer, datetime.datetime, datetime.timedelta] 

306 :param expiration: Point in time when the signed URL should expire. If 

307 a ``datetime`` instance is passed without an explicit 

308 ``tzinfo`` set, it will be assumed to be ``UTC``. 

309 

310 :type api_access_endpoint: str 

311 :param api_access_endpoint: (Optional) URI base. Defaults to empty string. 

312 

313 :type method: str 

314 :param method: The HTTP verb that will be used when requesting the URL. 

315 Defaults to ``'GET'``. If method is ``'RESUMABLE'`` then the 

316 signature will additionally contain the `x-goog-resumable` 

317 header, and the method changed to POST. See the signed URL 

318 docs regarding this flow: 

319 https://cloud.google.com/storage/docs/access-control/signed-urls 

320 

321 

322 :type content_md5: str 

323 :param content_md5: (Optional) The MD5 hash of the object referenced by 

324 ``resource``. 

325 

326 :type content_type: str 

327 :param content_type: (Optional) The content type of the object referenced 

328 by ``resource``. 

329 

330 :type response_type: str 

331 :param response_type: (Optional) Content type of responses to requests for 

332 the signed URL. Ignored if content_type is set on 

333 object/blob metadata. 

334 

335 :type response_disposition: str 

336 :param response_disposition: (Optional) Content disposition of responses to 

337 requests for the signed URL. 

338 

339 :type generation: str 

340 :param generation: (Optional) A value that indicates which generation of 

341 the resource to fetch. 

342 

343 :type headers: Union[dict|List(Tuple(str,str))] 

344 :param headers: 

345 (Optional) Additional HTTP headers to be included as part of the 

346 signed URLs. See: 

347 https://cloud.google.com/storage/docs/xml-api/reference-headers 

348 Requests using the signed URL *must* pass the specified header 

349 (name and value) with each request for the URL. 

350 

351 :type service_account_email: str 

352 :param service_account_email: (Optional) E-mail address of the service account. 

353 

354 :type access_token: str 

355 :param access_token: (Optional) Access token for a service account. 

356 

357 :type query_parameters: dict 

358 :param query_parameters: 

359 (Optional) Additional query parameters to be included as part of the 

360 signed URLs. See: 

361 https://cloud.google.com/storage/docs/xml-api/reference-headers#query 

362 

363 :raises: :exc:`TypeError` when expiration is not a valid type. 

364 :raises: :exc:`AttributeError` if credentials is not an instance 

365 of :class:`google.auth.credentials.Signing`. 

366 

367 :rtype: str 

368 :returns: A signed URL you can use to access the resource 

369 until expiration. 

370 """ 

371 expiration_stamp = get_expiration_seconds_v2(expiration) 

372 

373 canonical = canonicalize_v2(method, resource, query_parameters, headers) 

374 

375 # Generate the string to sign. 

376 elements_to_sign = [ 

377 canonical.method, 

378 content_md5 or "", 

379 content_type or "", 

380 str(expiration_stamp), 

381 ] 

382 elements_to_sign.extend(canonical.headers) 

383 elements_to_sign.append(canonical.resource) 

384 string_to_sign = "\n".join(elements_to_sign) 

385 

386 # If you are on Google Compute Engine, you can't generate a signed URL. 

387 # See https://github.com/googleapis/google-cloud-python/issues/922 

388 # Set the right query parameters. 

389 if access_token and service_account_email: 

390 signature = _sign_message( 

391 string_to_sign, access_token, service_account_email, universe_domain 

392 ) 

393 signed_query_params = { 

394 "GoogleAccessId": service_account_email, 

395 "Expires": expiration_stamp, 

396 "Signature": signature, 

397 } 

398 else: 

399 signed_query_params = get_signed_query_params_v2( 

400 credentials, expiration_stamp, string_to_sign 

401 ) 

402 

403 if response_type is not None: 

404 signed_query_params["response-content-type"] = response_type 

405 if response_disposition is not None: 

406 signed_query_params["response-content-disposition"] = response_disposition 

407 if generation is not None: 

408 signed_query_params["generation"] = generation 

409 

410 signed_query_params.update(canonical.query_parameters) 

411 sorted_signed_query_params = sorted(signed_query_params.items()) 

412 

413 # Return the built URL. 

414 return "{endpoint}{resource}?{querystring}".format( 

415 endpoint=api_access_endpoint, 

416 resource=resource, 

417 querystring=urllib.parse.urlencode(sorted_signed_query_params), 

418 ) 

419 

420 

421SEVEN_DAYS = 7 * 24 * 60 * 60 # max age for V4 signed URLs. 

422DEFAULT_ENDPOINT = "https://storage.googleapis.com" 

423 

424 

425def generate_signed_url_v4( 

426 credentials, 

427 resource, 

428 expiration, 

429 api_access_endpoint=DEFAULT_ENDPOINT, 

430 method="GET", 

431 content_md5=None, 

432 content_type=None, 

433 response_type=None, 

434 response_disposition=None, 

435 generation=None, 

436 headers=None, 

437 query_parameters=None, 

438 service_account_email=None, 

439 access_token=None, 

440 universe_domain=None, 

441 _request_timestamp=None, # for testing only 

442): 

443 """Generate a V4 signed URL to provide query-string auth'n to a resource. 

444 

445 .. note:: 

446 

447 Assumes ``credentials`` implements the 

448 :class:`google.auth.credentials.Signing` interface. Also assumes 

449 ``credentials`` has a ``signer_email`` property which 

450 identifies the credentials. 

451 

452 .. note:: 

453 

454 If you are on Google Compute Engine, you can't generate a signed URL. 

455 If you'd like to be able to generate a signed URL from GCE,you can use a 

456 standard service account from a JSON file rather than a GCE service account. 

457 

458 See headers [reference](https://cloud.google.com/storage/docs/reference-headers) 

459 for more details on optional arguments. 

460 

461 :type credentials: :class:`google.auth.credentials.Signing` 

462 :param credentials: Credentials object with an associated private key to 

463 sign text. That credentials must provide signer_email 

464 only if service_account_email and access_token are not 

465 passed. 

466 

467 :type resource: str 

468 :param resource: A pointer to a specific resource 

469 (typically, ``/bucket-name/path/to/blob.txt``). 

470 Caller should have already URL-encoded the value. 

471 

472 :type expiration: Union[Integer, datetime.datetime, datetime.timedelta] 

473 :param expiration: Point in time when the signed URL should expire. If 

474 a ``datetime`` instance is passed without an explicit 

475 ``tzinfo`` set, it will be assumed to be ``UTC``. 

476 

477 :type api_access_endpoint: str 

478 :param api_access_endpoint: URI base. Defaults to 

479 "https://storage.googleapis.com/" 

480 

481 :type method: str 

482 :param method: The HTTP verb that will be used when requesting the URL. 

483 Defaults to ``'GET'``. If method is ``'RESUMABLE'`` then the 

484 signature will additionally contain the `x-goog-resumable` 

485 header, and the method changed to POST. See the signed URL 

486 docs regarding this flow: 

487 https://cloud.google.com/storage/docs/access-control/signed-urls 

488 

489 

490 :type content_md5: str 

491 :param content_md5: (Optional) The MD5 hash of the object referenced by 

492 ``resource``. 

493 

494 :type content_type: str 

495 :param content_type: (Optional) The content type of the object referenced 

496 by ``resource``. 

497 

498 :type response_type: str 

499 :param response_type: (Optional) Content type of responses to requests for 

500 the signed URL. Ignored if content_type is set on 

501 object/blob metadata. 

502 

503 :type response_disposition: str 

504 :param response_disposition: (Optional) Content disposition of responses to 

505 requests for the signed URL. 

506 

507 :type generation: str 

508 :param generation: (Optional) A value that indicates which generation of 

509 the resource to fetch. 

510 

511 :type headers: dict 

512 :param headers: 

513 (Optional) Additional HTTP headers to be included as part of the 

514 signed URLs. See: 

515 https://cloud.google.com/storage/docs/xml-api/reference-headers 

516 Requests using the signed URL *must* pass the specified header 

517 (name and value) with each request for the URL. 

518 

519 :type query_parameters: dict 

520 :param query_parameters: 

521 (Optional) Additional query parameters to be included as part of the 

522 signed URLs. See: 

523 https://cloud.google.com/storage/docs/xml-api/reference-headers#query 

524 

525 :type service_account_email: str 

526 :param service_account_email: (Optional) E-mail address of the service account. 

527 

528 :type access_token: str 

529 :param access_token: (Optional) Access token for a service account. 

530 

531 :raises: :exc:`TypeError` when expiration is not a valid type. 

532 :raises: :exc:`AttributeError` if credentials is not an instance 

533 of :class:`google.auth.credentials.Signing`. 

534 

535 :rtype: str 

536 :returns: A signed URL you can use to access the resource 

537 until expiration. 

538 """ 

539 expiration_seconds = get_expiration_seconds_v4(expiration) 

540 

541 if _request_timestamp is None: 

542 request_timestamp, datestamp = get_v4_now_dtstamps() 

543 else: 

544 request_timestamp = _request_timestamp 

545 datestamp = _request_timestamp[:8] 

546 

547 # If you are on Google Compute Engine, you can't generate a signed URL. 

548 # See https://github.com/googleapis/google-cloud-python/issues/922 

549 client_email = service_account_email 

550 if not access_token or not service_account_email: 

551 ensure_signed_credentials(credentials) 

552 client_email = credentials.signer_email 

553 

554 credential_scope = f"{datestamp}/auto/storage/goog4_request" 

555 credential = f"{client_email}/{credential_scope}" 

556 

557 if headers is None: 

558 headers = {} 

559 

560 if content_type is not None: 

561 headers["Content-Type"] = content_type 

562 

563 if content_md5 is not None: 

564 headers["Content-MD5"] = content_md5 

565 

566 header_names = [key.lower() for key in headers] 

567 if "host" not in header_names: 

568 headers["Host"] = urllib.parse.urlparse(api_access_endpoint).netloc 

569 

570 if method.upper() == "RESUMABLE": 

571 method = "POST" 

572 headers["x-goog-resumable"] = "start" 

573 

574 canonical_headers, ordered_headers = get_canonical_headers(headers) 

575 canonical_header_string = ( 

576 "\n".join(canonical_headers) + "\n" 

577 ) # Yes, Virginia, the extra newline is part of the spec. 

578 signed_headers = ";".join([key for key, _ in ordered_headers]) 

579 

580 if query_parameters is None: 

581 query_parameters = {} 

582 else: 

583 query_parameters = {key: value or "" for key, value in query_parameters.items()} 

584 

585 query_parameters["X-Goog-Algorithm"] = "GOOG4-RSA-SHA256" 

586 query_parameters["X-Goog-Credential"] = credential 

587 query_parameters["X-Goog-Date"] = request_timestamp 

588 query_parameters["X-Goog-Expires"] = expiration_seconds 

589 query_parameters["X-Goog-SignedHeaders"] = signed_headers 

590 

591 if response_type is not None: 

592 query_parameters["response-content-type"] = response_type 

593 

594 if response_disposition is not None: 

595 query_parameters["response-content-disposition"] = response_disposition 

596 

597 if generation is not None: 

598 query_parameters["generation"] = generation 

599 

600 canonical_query_string = _url_encode(query_parameters) 

601 

602 lowercased_headers = dict(ordered_headers) 

603 

604 if "x-goog-content-sha256" in lowercased_headers: 

605 payload = lowercased_headers["x-goog-content-sha256"] 

606 else: 

607 payload = "UNSIGNED-PAYLOAD" 

608 

609 canonical_elements = [ 

610 method, 

611 resource, 

612 canonical_query_string, 

613 canonical_header_string, 

614 signed_headers, 

615 payload, 

616 ] 

617 canonical_request = "\n".join(canonical_elements) 

618 

619 canonical_request_hash = hashlib.sha256( 

620 canonical_request.encode("ascii") 

621 ).hexdigest() 

622 

623 string_elements = [ 

624 "GOOG4-RSA-SHA256", 

625 request_timestamp, 

626 credential_scope, 

627 canonical_request_hash, 

628 ] 

629 string_to_sign = "\n".join(string_elements) 

630 

631 if access_token and service_account_email: 

632 signature = _sign_message( 

633 string_to_sign, access_token, service_account_email, universe_domain 

634 ) 

635 signature_bytes = base64.b64decode(signature) 

636 signature = binascii.hexlify(signature_bytes).decode("ascii") 

637 else: 

638 signature_bytes = credentials.sign_bytes(string_to_sign.encode("ascii")) 

639 signature = binascii.hexlify(signature_bytes).decode("ascii") 

640 

641 return "{}{}?{}&X-Goog-Signature={}".format( 

642 api_access_endpoint, resource, canonical_query_string, signature 

643 ) 

644 

645 

646def get_v4_now_dtstamps(): 

647 """Get current timestamp and datestamp in V4 valid format. 

648 

649 :rtype: str, str 

650 :returns: Current timestamp, datestamp. 

651 """ 

652 now = _NOW(_UTC).replace(tzinfo=None) 

653 timestamp = now.strftime("%Y%m%dT%H%M%SZ") 

654 datestamp = now.date().strftime("%Y%m%d") 

655 return timestamp, datestamp 

656 

657 

658def _sign_message( 

659 message, 

660 access_token, 

661 service_account_email, 

662 universe_domain=_DEFAULT_UNIVERSE_DOMAIN, 

663): 

664 """Signs a message. 

665 

666 :type message: str 

667 :param message: The message to be signed. 

668 

669 :type access_token: str 

670 :param access_token: Access token for a service account. 

671 

672 

673 :type service_account_email: str 

674 :param service_account_email: E-mail address of the service account. 

675 

676 :raises: :exc:`TransportError` if an `access_token` is unauthorized. 

677 

678 :rtype: str 

679 :returns: The signature of the message. 

680 

681 """ 

682 message = _helpers._to_bytes(message) 

683 

684 method = "POST" 

685 url = f"https://iamcredentials.{universe_domain}/v1/projects/-/serviceAccounts/{service_account_email}:signBlob?alt=json" 

686 headers = { 

687 "Authorization": "Bearer " + access_token, 

688 "Content-type": "application/json", 

689 } 

690 body = json.dumps({"payload": base64.b64encode(message).decode("utf-8")}) 

691 request = requests.Request() 

692 

693 def retriable_request(): 

694 response = request(url=url, method=method, body=body, headers=headers) 

695 return response 

696 

697 # Apply the default retry object to the signBlob call. 

698 retry = DEFAULT_RETRY 

699 call = retry(retriable_request) 

700 response = call() 

701 

702 if response.status != http.client.OK: 

703 raise exceptions.TransportError( 

704 f"Error calling the IAM signBytes API: {response.data}" 

705 ) 

706 

707 data = json.loads(response.data.decode("utf-8")) 

708 return data["signedBlob"] 

709 

710 

711def _url_encode(query_params): 

712 """Encode query params into URL. 

713 

714 :type query_params: dict 

715 :param query_params: Query params to be encoded. 

716 

717 :rtype: str 

718 :returns: URL encoded query params. 

719 """ 

720 params = [ 

721 f"{_quote_param(name)}={_quote_param(value)}" 

722 for name, value in query_params.items() 

723 ] 

724 

725 return "&".join(sorted(params)) 

726 

727 

728def _quote_param(param): 

729 """Quote query param. 

730 

731 :type param: Any 

732 :param param: Query param to be encoded. 

733 

734 :rtype: str 

735 :returns: URL encoded query param. 

736 """ 

737 if not isinstance(param, bytes): 

738 param = str(param) 

739 return urllib.parse.quote(param, safe="~")