Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/storage/_signing.py: 17%

180 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:17 +0000

1# Copyright 2017 Google LLC 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16import base64 

17import binascii 

18import collections 

19import datetime 

20import hashlib 

21import json 

22 

23import http 

24import urllib 

25 

26import google.auth.credentials 

27 

28from google.auth import exceptions 

29from google.auth.transport import requests 

30from google.cloud import _helpers 

31 

32 

# Clock indirection: tests replace this callable to pin "now".
NOW = datetime.datetime.utcnow

# Help link embedded in the error raised for credentials that cannot sign.
SERVICE_ACCOUNT_URL = (
    "https://googleapis.dev/python/google-api-core/latest/auth.html"
    "#setting-up-a-service-account"
)

39 

40 

def ensure_signed_credentials(credentials):
    """Raise AttributeError if the credentials are not able to sign.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: The credentials used to create a private key
                        for signing text.

    :raises: :exc:`AttributeError` if credentials is not an instance
            of :class:`google.auth.credentials.Signing`.
    """
    if isinstance(credentials, google.auth.credentials.Signing):
        return

    # Adjacent literals are concatenated verbatim, so every fragment that is
    # followed by another one ends with a space to keep sentences apart.
    raise AttributeError(
        "you need a private key to sign credentials. "
        "the credentials you are currently using {} "
        "just contains a token. see {} for more "
        "details.".format(type(credentials), SERVICE_ACCOUNT_URL)
    )

58 

59 

def get_signed_query_params_v2(credentials, expiration, string_to_sign):
    """Sign ``string_to_sign`` and build the V2 signed-URL query parameters.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: The credentials used to create a private key
                        for signing text.

    :type expiration: int or long
    :param expiration: When the signed URL should expire.

    :type string_to_sign: str
    :param string_to_sign: The string to be signed by the credentials.
                           It is encoded as ASCII before signing.

    :raises: :exc:`AttributeError` if credentials is not an instance
            of :class:`google.auth.credentials.Signing`.

    :rtype: dict
    :returns: Mapping with the ``GoogleAccessId``, ``Expires`` and
              ``Signature`` entries; the signature is base64-encoded.
    """
    ensure_signed_credentials(credentials)

    payload = string_to_sign.encode("ascii")
    encoded_signature = base64.b64encode(credentials.sign_bytes(payload))

    return dict(
        GoogleAccessId=credentials.signer_email,
        Expires=expiration,
        Signature=encoded_signature,
    )

89 

90 

def get_expiration_seconds_v2(expiration):
    """Normalize ``expiration`` to an integer timestamp.

    :type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
    :param expiration: Point in time when the signed URL should expire. If
                       a ``datetime`` instance is passed without an explicit
                       ``tzinfo`` set, it will be assumed to be ``UTC``.
                       A ``timedelta`` is added to the current time; an
                       integer is returned unchanged.

    :raises: :exc:`TypeError` when expiration is not a valid type.

    :rtype: int
    :returns: a timestamp as an absolute number of seconds since epoch.
    """
    # A relative offset first becomes an absolute, UTC-tagged datetime ...
    if isinstance(expiration, datetime.timedelta):
        expiration = NOW().replace(tzinfo=_helpers.UTC) + expiration

    # ... and any datetime is then reduced to whole seconds.
    if isinstance(expiration, datetime.datetime):
        expiration = _helpers._microseconds_from_datetime(expiration) // 10**6

    if isinstance(expiration, int):
        return expiration

    raise TypeError(
        "Expected an integer timestamp, datetime, or "
        "timedelta. Got %s" % type(expiration)
    )

120 

121 

# Types accepted as the ``expiration`` argument of the V4 helper below.
_EXPIRATION_TYPES = (int, datetime.datetime, datetime.timedelta)


def get_expiration_seconds_v4(expiration):
    """Convert ``expiration`` to a number of seconds from the current time.

    :type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
    :param expiration: When the signed URL should expire. An integer is
                       used as-is as the number of seconds; a ``timedelta``
                       is truncated to whole seconds; a ``datetime`` is
                       measured against the current time. If a ``datetime``
                       instance is passed without an explicit ``tzinfo``
                       set, it will be assumed to be ``UTC``.

    :raises: :exc:`TypeError` when expiration is not a valid type.
    :raises: :exc:`ValueError` when expiration is too large.
    :rtype: Integer
    :returns: seconds in the future when the signed URL will expire
    """
    if not isinstance(expiration, _EXPIRATION_TYPES):
        raise TypeError(
            "Expected an integer timestamp, datetime, or "
            "timedelta. Got %s" % type(expiration)
        )

    now = NOW().replace(tzinfo=_helpers.UTC)

    # Reduce an absolute datetime to an offset from "now".
    if isinstance(expiration, datetime.datetime):
        if expiration.tzinfo is None:
            expiration = expiration.replace(tzinfo=_helpers.UTC)
        expiration = expiration - now

    # At this point only an offset (timedelta) or a plain integer remains.
    if isinstance(expiration, datetime.timedelta):
        seconds = int(expiration.total_seconds())
    else:
        seconds = expiration

    if seconds > SEVEN_DAYS:
        raise ValueError(f"Max allowed expiration interval is seven days {SEVEN_DAYS}")

    return seconds

163 

164 

def get_canonical_headers(headers):
    """Canonicalize headers for signing.

    See:
    https://cloud.google.com/storage/docs/access-control/signed-urls#about-canonical-extension-headers

    Header names are lower-cased and stripped, runs of whitespace inside a
    value collapse to a single space, and values that share a name are
    joined with commas in the order they were given.

    :type headers: Union[dict|List(Tuple(str,str))]
    :param headers:
        (Optional) Additional HTTP headers to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers
        Requests using the signed URL *must* pass the specified header
        (name and value) with each request for the URL.

    :rtype: Tuple[List[str], List[Tuple[str, str]]]
    :returns: A pair ``(canonical_headers, ordered_headers)``: the
              ``"name:value"`` strings and the matching ``(name, value)``
              tuples, both sorted by normalized name. Two empty lists are
              returned when no headers are given.
    """
    if headers is None:
        pairs = []
    elif isinstance(headers, dict):
        pairs = list(headers.items())
    else:
        pairs = headers

    if not pairs:
        return [], []

    grouped = collections.defaultdict(list)
    for name, value in pairs:
        grouped[name.lower().strip()].append(" ".join(value.split()))

    ordered_headers = sorted(
        (name, ",".join(values)) for name, values in grouped.items()
    )
    canonical_headers = [f"{name}:{value}" for name, value in ordered_headers]
    return canonical_headers, ordered_headers

200 

201 

202_Canonical = collections.namedtuple( 

203 "_Canonical", ["method", "resource", "query_parameters", "headers"] 

204) 

205 

206 

def canonicalize_v2(method, resource, query_parameters, headers):
    """Canonicalize method, resource, query parameters and headers (V2).

    :type method: str
    :param method: The HTTP verb that will be used when requesting the URL.
                   If method is ``'RESUMABLE'`` then the canonical headers
                   additionally contain the `x-goog-resumable` header, and
                   the method is changed to POST. See the signed URL docs
                   regarding this flow:
                   https://cloud.google.com/storage/docs/access-control/signed-urls

    :type resource: str
    :param resource: A pointer to a specific resource
                     (typically, ``/bucket-name/path/to/blob.txt``).

    :type query_parameters: dict
    :param query_parameters:
        (Optional) Additional query parameters to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers#query
        Names are lower-cased, values are stripped, and empty values become
        the empty string.

    :type headers: Union[dict|List(Tuple(str,str))]
    :param headers:
        (Optional) Additional HTTP headers to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers
        Requests using the signed URL *must* pass the specified header
        (name and value) with each request for the URL.

    :rtype: :class:_Canonical
    :returns: Canonical method, resource, query_parameters, and headers.
              When query parameters are given, the resource carries them
              as an encoded query string.
    """
    canonical_headers, _ = get_canonical_headers(headers)

    # "RESUMABLE" is a pseudo-verb: sign a POST plus the marker header.
    if method == "RESUMABLE":
        method = "POST"
        canonical_headers.append("x-goog-resumable:start")

    if query_parameters is None:
        return _Canonical(method, resource, [], canonical_headers)

    normalized_qp = []
    for name, value in query_parameters.items():
        lowered = name.lower()
        stripped = value.strip() if value else ""
        normalized_qp.append((lowered, stripped or ""))
    normalized_qp.sort()

    query_string = urllib.parse.urlencode(normalized_qp)
    return _Canonical(
        method, f"{resource}?{query_string}", normalized_qp, canonical_headers
    )

255 

256 

def generate_signed_url_v2(
    credentials,
    resource,
    expiration,
    api_access_endpoint="",
    method="GET",
    content_md5=None,
    content_type=None,
    response_type=None,
    response_disposition=None,
    generation=None,
    headers=None,
    query_parameters=None,
    service_account_email=None,
    access_token=None,
):
    """Generate a V2 signed URL to provide query-string auth'n to a resource.

    .. note::

        Assumes ``credentials`` implements the
        :class:`google.auth.credentials.Signing` interface. Also assumes
        ``credentials`` has a ``signer_email`` property which
        identifies the credentials.

    .. note::

        If you are on Google Compute Engine, you can't generate a signed URL.
        If you'd like to be able to generate a signed URL from GCE, you can use a
        standard service account from a JSON file rather than a GCE service account.

    See headers [reference](https://cloud.google.com/storage/docs/reference-headers)
    for more details on optional arguments.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: Credentials object with an associated private key to
                        sign text. Not used for signing when both
                        ``service_account_email`` and ``access_token`` are
                        given.

    :type resource: str
    :param resource: A pointer to a specific resource
                     (typically, ``/bucket-name/path/to/blob.txt``).
                     Caller should have already URL-encoded the value.

    :type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
    :param expiration: Point in time when the signed URL should expire. If
                       a ``datetime`` instance is passed without an explicit
                       ``tzinfo`` set, it will be assumed to be ``UTC``.

    :type api_access_endpoint: str
    :param api_access_endpoint: (Optional) URI base. Defaults to empty string.

    :type method: str
    :param method: The HTTP verb that will be used when requesting the URL.
                   Defaults to ``'GET'``. If method is ``'RESUMABLE'`` then the
                   signature will additionally contain the `x-goog-resumable`
                   header, and the method changed to POST. See the signed URL
                   docs regarding this flow:
                   https://cloud.google.com/storage/docs/access-control/signed-urls

    :type content_md5: str
    :param content_md5: (Optional) The MD5 hash of the object referenced by
                        ``resource``.

    :type content_type: str
    :param content_type: (Optional) The content type of the object referenced
                         by ``resource``.

    :type response_type: str
    :param response_type: (Optional) Content type of responses to requests for
                          the signed URL. Ignored if content_type is set on
                          object/blob metadata.

    :type response_disposition: str
    :param response_disposition: (Optional) Content disposition of responses to
                                 requests for the signed URL.

    :type generation: str
    :param generation: (Optional) A value that indicates which generation of
                       the resource to fetch.

    :type headers: Union[dict|List(Tuple(str,str))]
    :param headers:
        (Optional) Additional HTTP headers to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers
        Requests using the signed URL *must* pass the specified header
        (name and value) with each request for the URL.

    :type query_parameters: dict
    :param query_parameters:
        (Optional) Additional query parameters to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers#query

    :type service_account_email: str
    :param service_account_email: (Optional) E-mail address of the service account.

    :type access_token: str
    :param access_token: (Optional) Access token for a service account.

    :raises: :exc:`TypeError` when expiration is not a valid type.
    :raises: :exc:`AttributeError` if credentials is not an instance
            of :class:`google.auth.credentials.Signing`.

    :rtype: str
    :returns: A signed URL you can use to access the resource
              until expiration.
    """
    expiration_stamp = get_expiration_seconds_v2(expiration)
    canonical = canonicalize_v2(method, resource, query_parameters, headers)

    # The string to sign is the newline-joined sequence of these fields.
    string_to_sign = "\n".join(
        [
            canonical.method,
            content_md5 or "",
            content_type or "",
            str(expiration_stamp),
            *canonical.headers,
            canonical.resource,
        ]
    )

    # If you are on Google Compute Engine, you can't generate a signed URL.
    # See https://github.com/googleapis/google-cloud-python/issues/922
    # With both a token and an e-mail the message is signed remotely instead
    # of with the credentials' own key.
    if access_token and service_account_email:
        remote_signature = _sign_message(
            string_to_sign, access_token, service_account_email
        )
        signed_query_params = {
            "GoogleAccessId": service_account_email,
            "Expires": expiration_stamp,
            "Signature": remote_signature,
        }
    else:
        signed_query_params = get_signed_query_params_v2(
            credentials, expiration_stamp, string_to_sign
        )

    optional_params = (
        ("response-content-type", response_type),
        ("response-content-disposition", response_disposition),
        ("generation", generation),
    )
    for name, value in optional_params:
        if value is not None:
            signed_query_params[name] = value

    signed_query_params.update(canonical.query_parameters)
    querystring = urllib.parse.urlencode(sorted(signed_query_params.items()))

    return f"{api_access_endpoint}{resource}?{querystring}"

412 

413 

SEVEN_DAYS = 7 * 24 * 60 * 60  # max age for V4 signed URLs.
DEFAULT_ENDPOINT = "https://storage.googleapis.com"


def generate_signed_url_v4(
    credentials,
    resource,
    expiration,
    api_access_endpoint=DEFAULT_ENDPOINT,
    method="GET",
    content_md5=None,
    content_type=None,
    response_type=None,
    response_disposition=None,
    generation=None,
    headers=None,
    query_parameters=None,
    service_account_email=None,
    access_token=None,
    _request_timestamp=None,  # for testing only
):
    """Generate a V4 signed URL to provide query-string auth'n to a resource.

    .. note::

        Assumes ``credentials`` implements the
        :class:`google.auth.credentials.Signing` interface. Also assumes
        ``credentials`` has a ``signer_email`` property which
        identifies the credentials.

    .. note::

        If you are on Google Compute Engine, you can't generate a signed URL.
        If you'd like to be able to generate a signed URL from GCE, you can use a
        standard service account from a JSON file rather than a GCE service account.

    See headers [reference](https://cloud.google.com/storage/docs/reference-headers)
    for more details on optional arguments.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: Credentials object with an associated private key to
                        sign text. That credentials must provide signer_email
                        only if service_account_email and access_token are not
                        passed.

    :type resource: str
    :param resource: A pointer to a specific resource
                     (typically, ``/bucket-name/path/to/blob.txt``).
                     Caller should have already URL-encoded the value.

    :type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
    :param expiration: Point in time when the signed URL should expire. If
                       a ``datetime`` instance is passed without an explicit
                       ``tzinfo`` set, it will be assumed to be ``UTC``.

    :type api_access_endpoint: str
    :param api_access_endpoint: (Optional) URI base. Defaults to
                                "https://storage.googleapis.com"

    :type method: str
    :param method: The HTTP verb that will be used when requesting the URL.
                   Defaults to ``'GET'``. If method is ``'RESUMABLE'`` then the
                   signature will additionally contain the `x-goog-resumable`
                   header, and the method changed to POST. See the signed URL
                   docs regarding this flow:
                   https://cloud.google.com/storage/docs/access-control/signed-urls

    :type content_md5: str
    :param content_md5: (Optional) The MD5 hash of the object referenced by
                        ``resource``.

    :type content_type: str
    :param content_type: (Optional) The content type of the object referenced
                         by ``resource``.

    :type response_type: str
    :param response_type: (Optional) Content type of responses to requests for
                          the signed URL. Ignored if content_type is set on
                          object/blob metadata.

    :type response_disposition: str
    :param response_disposition: (Optional) Content disposition of responses to
                                 requests for the signed URL.

    :type generation: str
    :param generation: (Optional) A value that indicates which generation of
                       the resource to fetch.

    :type headers: dict
    :param headers:
        (Optional) Additional HTTP headers to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers
        Requests using the signed URL *must* pass the specified header
        (name and value) with each request for the URL.
        The mapping passed in is copied and is never modified.

    :type query_parameters: dict
    :param query_parameters:
        (Optional) Additional query parameters to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers#query

    :type service_account_email: str
    :param service_account_email: (Optional) E-mail address of the service account.

    :type access_token: str
    :param access_token: (Optional) Access token for a service account.

    :raises: :exc:`TypeError` when expiration is not a valid type.
    :raises: :exc:`ValueError` when expiration is too large.
    :raises: :exc:`AttributeError` if credentials is not an instance
            of :class:`google.auth.credentials.Signing`.

    :rtype: str
    :returns: A signed URL you can use to access the resource
              until expiration.
    """
    expiration_seconds = get_expiration_seconds_v4(expiration)

    if _request_timestamp is None:
        request_timestamp, datestamp = get_v4_now_dtstamps()
    else:
        request_timestamp = _request_timestamp
        # The datestamp is the leading YYYYMMDD part of the timestamp.
        datestamp = _request_timestamp[:8]

    # If you are on Google Compute Engine, you can't generate a signed URL.
    # See https://github.com/googleapis/google-cloud-python/issues/922
    client_email = service_account_email
    if not access_token or not service_account_email:
        ensure_signed_credentials(credentials)
        client_email = credentials.signer_email

    credential_scope = f"{datestamp}/auto/storage/goog4_request"
    credential = f"{client_email}/{credential_scope}"

    # Work on a private copy: the entries added below (Content-Type,
    # Content-MD5, Host, x-goog-resumable) must not leak into the caller's
    # dictionary, which may be reused for other requests.
    headers = {} if headers is None else dict(headers)

    if content_type is not None:
        headers["Content-Type"] = content_type

    if content_md5 is not None:
        headers["Content-MD5"] = content_md5

    header_names = [key.lower() for key in headers]
    if "host" not in header_names:
        headers["Host"] = urllib.parse.urlparse(api_access_endpoint).netloc

    if method.upper() == "RESUMABLE":
        method = "POST"
        headers["x-goog-resumable"] = "start"

    canonical_headers, ordered_headers = get_canonical_headers(headers)
    canonical_header_string = (
        "\n".join(canonical_headers) + "\n"
    )  # Yes, Virginia, the extra newline is part of the spec.
    signed_headers = ";".join([key for key, _ in ordered_headers])

    # Build a fresh mapping so the caller's query parameters stay untouched;
    # empty values are normalized to the empty string.
    if query_parameters is None:
        query_parameters = {}
    else:
        query_parameters = {key: value or "" for key, value in query_parameters.items()}

    query_parameters["X-Goog-Algorithm"] = "GOOG4-RSA-SHA256"
    query_parameters["X-Goog-Credential"] = credential
    query_parameters["X-Goog-Date"] = request_timestamp
    query_parameters["X-Goog-Expires"] = expiration_seconds
    query_parameters["X-Goog-SignedHeaders"] = signed_headers

    if response_type is not None:
        query_parameters["response-content-type"] = response_type

    if response_disposition is not None:
        query_parameters["response-content-disposition"] = response_disposition

    if generation is not None:
        query_parameters["generation"] = generation

    canonical_query_string = _url_encode(query_parameters)

    lowercased_headers = dict(ordered_headers)

    # A caller-provided content hash header is used as the payload element;
    # otherwise the payload is declared unsigned.
    if "x-goog-content-sha256" in lowercased_headers:
        payload = lowercased_headers["x-goog-content-sha256"]
    else:
        payload = "UNSIGNED-PAYLOAD"

    canonical_elements = [
        method,
        resource,
        canonical_query_string,
        canonical_header_string,
        signed_headers,
        payload,
    ]
    canonical_request = "\n".join(canonical_elements)

    canonical_request_hash = hashlib.sha256(
        canonical_request.encode("ascii")
    ).hexdigest()

    string_elements = [
        "GOOG4-RSA-SHA256",
        request_timestamp,
        credential_scope,
        canonical_request_hash,
    ]
    string_to_sign = "\n".join(string_elements)

    if access_token and service_account_email:
        # The remote signature arrives base64-encoded; the URL needs hex.
        signature = _sign_message(string_to_sign, access_token, service_account_email)
        signature_bytes = base64.b64decode(signature)
    else:
        signature_bytes = credentials.sign_bytes(string_to_sign.encode("ascii"))
    signature = binascii.hexlify(signature_bytes).decode("ascii")

    return "{}{}?{}&X-Goog-Signature={}".format(
        api_access_endpoint, resource, canonical_query_string, signature
    )

634 

635 

def get_v4_now_dtstamps():
    """Format the current time as the V4 timestamp and datestamp strings.

    :rtype: str, str
    :returns: Current timestamp (``YYYYMMDDTHHMMSSZ``) and
              datestamp (``YYYYMMDD``).
    """
    current = NOW()
    return (
        current.strftime("%Y%m%dT%H%M%SZ"),
        current.date().strftime("%Y%m%d"),
    )

646 

647 

def _sign_message(message, access_token, service_account_email):
    """Sign a message by calling the IAM credentials ``signBlob`` endpoint.

    :type message: str
    :param message: The message to be signed.

    :type access_token: str
    :param access_token: Access token for a service account. Sent as a
                         bearer token in the ``Authorization`` header.

    :type service_account_email: str
    :param service_account_email: E-mail address of the service account.

    :raises: :exc:`TransportError` if the response status is not OK, for
             example when the `access_token` is unauthorized.

    :rtype: str
    :returns: The signature of the message, taken from the ``signedBlob``
              field of the JSON response.
    """
    message = _helpers._to_bytes(message)

    method = "POST"
    url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:signBlob?alt=json".format(
        service_account_email
    )
    headers = {
        "Authorization": "Bearer " + access_token,
        "Content-type": "application/json",
    }
    # The payload travels as base64 text inside the JSON request body.
    body = json.dumps({"payload": base64.b64encode(message).decode("utf-8")})

    request = requests.Request()
    response = request(url=url, method=method, body=body, headers=headers)

    # ``HTTPStatus`` is exported by the ``http`` package itself, whereas
    # ``http.client`` is a submodule that a bare ``import http`` does not
    # load; comparing against ``HTTPStatus.OK`` avoids depending on some
    # other module having imported it first.
    if response.status != http.HTTPStatus.OK:
        raise exceptions.TransportError(
            f"Error calling the IAM signBytes API: {response.data}"
        )

    data = json.loads(response.data.decode("utf-8"))
    return data["signedBlob"]

690 

691 

def _url_encode(query_params):
    """Encode query params into a sorted, ``&``-joined query string.

    :type query_params: dict
    :param query_params: Query params to be encoded.

    :rtype: str
    :returns: URL encoded query params. The ``name=value`` pairs are sorted
              after quoting.
    """
    encoded_pairs = sorted(
        f"{_quote_param(name)}={_quote_param(value)}"
        for name, value in query_params.items()
    )
    return "&".join(encoded_pairs)

707 

708 

709def _quote_param(param): 

710 """Quote query param. 

711 

712 :type param: Any 

713 :param param: Query param to be encoded. 

714 

715 :rtype: str 

716 :returns: URL encoded query param. 

717 """ 

718 if not isinstance(param, bytes): 

719 param = str(param) 

720 return urllib.parse.quote(param, safe="~")