Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/cloud/storage/_signing.py: 17%
180 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:17 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:17 +0000
1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import base64
17import binascii
18import collections
19import datetime
20import hashlib
21import json
23import http
24import urllib
26import google.auth.credentials
28from google.auth import exceptions
29from google.auth.transport import requests
30from google.cloud import _helpers
# Clock hook: the current time is always read through ``NOW`` so that tests
# can replace it with a deterministic callable.
NOW = datetime.datetime.utcnow

# Help page quoted in the error raised for credentials that cannot sign.
SERVICE_ACCOUNT_URL = (
    "https://googleapis.dev/python/google-api-core/latest/auth.html"
    "#setting-up-a-service-account"
)
def ensure_signed_credentials(credentials):
    """Raise AttributeError if the credentials are unsigned.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: The credentials used to create a private key
                        for signing text.

    :raises: :exc:`AttributeError` if credentials is not an instance
            of :class:`google.auth.credentials.Signing`.
    """
    if not isinstance(credentials, google.auth.credentials.Signing):
        # The fragments below are joined by implicit string concatenation, so
        # each sentence must carry its own trailing space.
        raise AttributeError(
            "you need a private key to sign credentials. "
            "the credentials you are currently using {} "
            "just contains a token. see {} for more "
            "details.".format(type(credentials), SERVICE_ACCOUNT_URL)
        )
def get_signed_query_params_v2(credentials, expiration, string_to_sign):
    """Build the query parameters that authenticate a V2 signed URL.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: Credentials able to sign bytes; their
                        ``signer_email`` becomes the ``GoogleAccessId``.

    :type expiration: int or long
    :param expiration: When the signed URL should expire.

    :type string_to_sign: str
    :param string_to_sign: The text to sign; it is encoded as ASCII before
                           being handed to the credentials.

    :raises: :exc:`AttributeError` if credentials is not an instance
            of :class:`google.auth.credentials.Signing`.

    :rtype: dict
    :returns: Mapping with the keys ``GoogleAccessId``, ``Expires`` and
              ``Signature`` (the base64-encoded signature bytes).
    """
    ensure_signed_credentials(credentials)

    encoded_signature = base64.b64encode(
        credentials.sign_bytes(string_to_sign.encode("ascii"))
    )
    return {
        "GoogleAccessId": credentials.signer_email,
        "Expires": expiration,
        "Signature": encoded_signature,
    }
def get_expiration_seconds_v2(expiration):
    """Normalize ``expiration`` to an absolute epoch timestamp in seconds.

    :type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
    :param expiration: Point in time when the signed URL should expire. If
                       a ``datetime`` instance is passed without an explicit
                       ``tzinfo`` set, it will be assumed to be ``UTC``.

    :raises: :exc:`TypeError` when expiration is not a valid type.

    :rtype: int
    :returns: a timestamp as an absolute number of seconds since epoch.
    """
    # A relative offset is first anchored to the current time, tagged as UTC.
    if isinstance(expiration, datetime.timedelta):
        expiration = NOW().replace(tzinfo=_helpers.UTC) + expiration

    # An absolute moment is reduced to whole seconds since the epoch.
    if isinstance(expiration, datetime.datetime):
        expiration = _helpers._microseconds_from_datetime(expiration) // 1_000_000

    if isinstance(expiration, int):
        return expiration

    raise TypeError(
        "Expected an integer timestamp, datetime, or "
        "timedelta. Got %s" % type(expiration)
    )
# Types accepted for the ``expiration`` argument of the V4 helper below.
_EXPIRATION_TYPES = (int, datetime.datetime, datetime.timedelta)


def get_expiration_seconds_v4(expiration):
    """Normalize ``expiration`` to a number of seconds from the current time.

    :type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
    :param expiration: Point in time when the signed URL should expire. If
                       a ``datetime`` instance is passed without an explicit
                       ``tzinfo`` set, it will be assumed to be ``UTC``.
                       An integer is used as-is as the number of seconds.

    :raises: :exc:`TypeError` when expiration is not a valid type.
    :raises: :exc:`ValueError` when expiration is too large.
    :rtype: Integer
    :returns: seconds in the future when the signed URL will expire
    """
    if not isinstance(expiration, _EXPIRATION_TYPES):
        raise TypeError(
            "Expected an integer timestamp, datetime, or "
            "timedelta. Got %s" % type(expiration)
        )

    current_time = NOW().replace(tzinfo=_helpers.UTC)

    if isinstance(expiration, int):
        seconds = expiration
    else:
        if isinstance(expiration, datetime.datetime):
            # Naive datetimes are tagged as UTC before measuring the distance
            # from the current time.
            if expiration.tzinfo is None:
                expiration = expiration.replace(tzinfo=_helpers.UTC)
            offset = expiration - current_time
        else:
            offset = expiration
        seconds = int(offset.total_seconds())

    if seconds > SEVEN_DAYS:
        raise ValueError(f"Max allowed expiration interval is seven days {SEVEN_DAYS}")

    return seconds
def get_canonical_headers(headers):
    """Canonicalize headers for signing.

    See:
    https://cloud.google.com/storage/docs/access-control/signed-urls#about-canonical-extension-headers

    :type headers: Union[dict|List(Tuple(str,str))]
    :param headers:
        (Optional) Additional HTTP headers to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers
        Requests using the signed URL *must* pass the specified header
        (name and value) with each request for the URL.

    :rtype: Tuple[List[str], List[Tuple[str, str]]]
    :returns: A pair ``(canonical_headers, ordered_headers)``. The second
              item holds ``(name, value)`` tuples sorted by name, where names
              are lower-cased and stripped, whitespace runs inside values are
              collapsed to one space, and values of repeated names are joined
              with commas. The first item holds the same entries rendered as
              ``"name:value"`` strings. Both lists are empty when no headers
              are given.
    """
    if headers is None:
        return [], []

    pairs = list(headers.items()) if isinstance(headers, dict) else headers
    if not pairs:
        return [], []

    # Group values by normalized name so repeated headers can be merged.
    grouped = collections.defaultdict(list)
    for name, value in pairs:
        clean_name = name.lower().strip()
        clean_value = " ".join(value.split())
        grouped[clean_name].append(clean_value)

    ordered_headers = sorted(
        (name, ",".join(values)) for name, values in grouped.items()
    )
    canonical_headers = [f"{name}:{value}" for name, value in ordered_headers]
    return canonical_headers, ordered_headers
202_Canonical = collections.namedtuple(
203 "_Canonical", ["method", "resource", "query_parameters", "headers"]
204)
def canonicalize_v2(method, resource, query_parameters, headers):
    """Canonicalize method, resource, query parameters and headers (V2 spec).

    :type method: str
    :param method: The HTTP verb that will be used when requesting the URL.
                   If method is ``'RESUMABLE'`` then the canonical method
                   becomes ``POST`` and an ``x-goog-resumable:start`` header
                   is appended to the canonical headers. See the signed URL
                   docs regarding this flow:
                   https://cloud.google.com/storage/docs/access-control/signed-urls

    :type resource: str
    :param resource: A pointer to a specific resource
                     (typically, ``/bucket-name/path/to/blob.txt``).

    :type query_parameters: dict
    :param query_parameters:
        (Optional) Additional query parameters to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers#query

    :type headers: Union[dict|List(Tuple(str,str))]
    :param headers:
        (Optional) Additional HTTP headers to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers
        Requests using the signed URL *must* pass the specified header
        (name and value) with each request for the URL.

    :rtype: :class:_Canonical
    :returns: Canonical method, resource, query_parameters, and headers.
              When query parameters are given, the canonical resource has
              the url-encoded, sorted parameters appended after a ``?``.
    """
    canonical_headers, _ = get_canonical_headers(headers)

    if method == "RESUMABLE":
        method = "POST"
        canonical_headers.append("x-goog-resumable:start")

    if query_parameters is None:
        return _Canonical(method, resource, [], canonical_headers)

    # Keys are lower-cased; falsy values (``None``, ``""``) become "".
    normalized_qp = []
    for key, value in query_parameters.items():
        normalized_qp.append((key.lower(), value.strip() if value else ""))
    normalized_qp.sort()

    canonical_resource = "{}?{}".format(
        resource, urllib.parse.urlencode(normalized_qp)
    )
    return _Canonical(method, canonical_resource, normalized_qp, canonical_headers)
def generate_signed_url_v2(
    credentials,
    resource,
    expiration,
    api_access_endpoint="",
    method="GET",
    content_md5=None,
    content_type=None,
    response_type=None,
    response_disposition=None,
    generation=None,
    headers=None,
    query_parameters=None,
    service_account_email=None,
    access_token=None,
):
    """Generate a V2 signed URL to provide query-string auth'n to a resource.

    .. note::

        Assumes ``credentials`` implements the
        :class:`google.auth.credentials.Signing` interface. Also assumes
        ``credentials`` has a ``signer_email`` property which
        identifies the credentials.

    .. note::

        If you are on Google Compute Engine, you can't generate a signed URL.
        If you'd like to be able to generate a signed URL from GCE, you can use a
        standard service account from a JSON file rather than a GCE service account.

    See headers [reference](https://cloud.google.com/storage/docs/reference-headers)
    for more details on optional arguments.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: Credentials object with an associated private key to
                        sign text. Only used for signing when
                        ``access_token`` and ``service_account_email`` are
                        not both provided.

    :type resource: str
    :param resource: A pointer to a specific resource
                     (typically, ``/bucket-name/path/to/blob.txt``).
                     Caller should have already URL-encoded the value.

    :type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
    :param expiration: Point in time when the signed URL should expire. If
                       a ``datetime`` instance is passed without an explicit
                       ``tzinfo`` set, it will be assumed to be ``UTC``.

    :type api_access_endpoint: str
    :param api_access_endpoint: (Optional) URI base. Defaults to empty string.

    :type method: str
    :param method: The HTTP verb that will be used when requesting the URL.
                   Defaults to ``'GET'``. If method is ``'RESUMABLE'`` then the
                   signature will additionally contain the `x-goog-resumable`
                   header, and the method changed to POST. See the signed URL
                   docs regarding this flow:
                   https://cloud.google.com/storage/docs/access-control/signed-urls

    :type content_md5: str
    :param content_md5: (Optional) The MD5 hash of the object referenced by
                        ``resource``.

    :type content_type: str
    :param content_type: (Optional) The content type of the object referenced
                         by ``resource``.

    :type response_type: str
    :param response_type: (Optional) Content type of responses to requests for
                          the signed URL. Ignored if content_type is set on
                          object/blob metadata.

    :type response_disposition: str
    :param response_disposition: (Optional) Content disposition of responses to
                                 requests for the signed URL.

    :type generation: str
    :param generation: (Optional) A value that indicates which generation of
                       the resource to fetch.

    :type headers: Union[dict|List(Tuple(str,str))]
    :param headers:
        (Optional) Additional HTTP headers to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers
        Requests using the signed URL *must* pass the specified header
        (name and value) with each request for the URL.

    :type query_parameters: dict
    :param query_parameters:
        (Optional) Additional query parameters to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers#query

    :type service_account_email: str
    :param service_account_email: (Optional) E-mail address of the service account.

    :type access_token: str
    :param access_token: (Optional) Access token for a service account.

    :raises: :exc:`TypeError` when expiration is not a valid type.
    :raises: :exc:`AttributeError` if credentials is not an instance
            of :class:`google.auth.credentials.Signing`.

    :rtype: str
    :returns: A signed URL you can use to access the resource
              until expiration.
    """
    expiration_stamp = get_expiration_seconds_v2(expiration)
    canonical = canonicalize_v2(method, resource, query_parameters, headers)

    # The string to sign is newline-separated: verb, MD5, content type,
    # expiration, every canonical header, and finally the canonical resource.
    string_to_sign = "\n".join(
        [
            canonical.method,
            content_md5 or "",
            content_type or "",
            str(expiration_stamp),
            *canonical.headers,
            canonical.resource,
        ]
    )

    # If you are on Google Compute Engine, you can't generate a signed URL.
    # See https://github.com/googleapis/google-cloud-python/issues/922
    # With both a token and an e-mail the remote signing helper is used;
    # otherwise the credentials sign locally.
    if access_token and service_account_email:
        signed_query_params = {
            "GoogleAccessId": service_account_email,
            "Expires": expiration_stamp,
            "Signature": _sign_message(
                string_to_sign, access_token, service_account_email
            ),
        }
    else:
        signed_query_params = get_signed_query_params_v2(
            credentials, expiration_stamp, string_to_sign
        )

    optional_params = (
        ("response-content-type", response_type),
        ("response-content-disposition", response_disposition),
        ("generation", generation),
    )
    for param_name, param_value in optional_params:
        if param_value is not None:
            signed_query_params[param_name] = param_value

    signed_query_params.update(canonical.query_parameters)
    querystring = urllib.parse.urlencode(sorted(signed_query_params.items()))

    # Return the built URL.
    return f"{api_access_endpoint}{resource}?{querystring}"
# Max age for V4 signed URLs, in seconds (604800).
SEVEN_DAYS = int(datetime.timedelta(days=7).total_seconds())

# Default URI base used when building V4 signed URLs.
DEFAULT_ENDPOINT = "https://storage.googleapis.com"
def generate_signed_url_v4(
    credentials,
    resource,
    expiration,
    api_access_endpoint=DEFAULT_ENDPOINT,
    method="GET",
    content_md5=None,
    content_type=None,
    response_type=None,
    response_disposition=None,
    generation=None,
    headers=None,
    query_parameters=None,
    service_account_email=None,
    access_token=None,
    _request_timestamp=None,  # for testing only
):
    """Generate a V4 signed URL to provide query-string auth'n to a resource.

    .. note::

        Assumes ``credentials`` implements the
        :class:`google.auth.credentials.Signing` interface. Also assumes
        ``credentials`` has a ``signer_email`` property which
        identifies the credentials.

    .. note::

        If you are on Google Compute Engine, you can't generate a signed URL.
        If you'd like to be able to generate a signed URL from GCE, you can use a
        standard service account from a JSON file rather than a GCE service account.

    See headers [reference](https://cloud.google.com/storage/docs/reference-headers)
    for more details on optional arguments.

    :type credentials: :class:`google.auth.credentials.Signing`
    :param credentials: Credentials object with an associated private key to
                        sign text. That credentials must provide signer_email
                        only if service_account_email and access_token are not
                        passed.

    :type resource: str
    :param resource: A pointer to a specific resource
                     (typically, ``/bucket-name/path/to/blob.txt``).
                     Caller should have already URL-encoded the value.

    :type expiration: Union[Integer, datetime.datetime, datetime.timedelta]
    :param expiration: Point in time when the signed URL should expire. If
                       a ``datetime`` instance is passed without an explicit
                       ``tzinfo`` set, it will be assumed to be ``UTC``.

    :type api_access_endpoint: str
    :param api_access_endpoint: (Optional) URI base. Defaults to
                                "https://storage.googleapis.com"

    :type method: str
    :param method: The HTTP verb that will be used when requesting the URL.
                   Defaults to ``'GET'``. If method is ``'RESUMABLE'`` then the
                   signature will additionally contain the `x-goog-resumable`
                   header, and the method changed to POST. See the signed URL
                   docs regarding this flow:
                   https://cloud.google.com/storage/docs/access-control/signed-urls

    :type content_md5: str
    :param content_md5: (Optional) The MD5 hash of the object referenced by
                        ``resource``. Signed as the ``Content-MD5`` header.

    :type content_type: str
    :param content_type: (Optional) The content type of the object referenced
                         by ``resource``. Signed as the ``Content-Type``
                         header.

    :type response_type: str
    :param response_type: (Optional) Content type of responses to requests for
                          the signed URL. Ignored if content_type is set on
                          object/blob metadata.

    :type response_disposition: str
    :param response_disposition: (Optional) Content disposition of responses to
                                 requests for the signed URL.

    :type generation: str
    :param generation: (Optional) A value that indicates which generation of
                       the resource to fetch.

    :type headers: dict
    :param headers:
        (Optional) Additional HTTP headers to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers
        Requests using the signed URL *must* pass the specified header
        (name and value) with each request for the URL.
        The mapping passed in is copied and never modified.

    :type query_parameters: dict
    :param query_parameters:
        (Optional) Additional query parameters to be included as part of the
        signed URLs.  See:
        https://cloud.google.com/storage/docs/xml-api/reference-headers#query

    :type service_account_email: str
    :param service_account_email: (Optional) E-mail address of the service account.

    :type access_token: str
    :param access_token: (Optional) Access token for a service account.

    :raises: :exc:`TypeError` when expiration is not a valid type.
    :raises: :exc:`ValueError` when expiration is too large.
    :raises: :exc:`AttributeError` if credentials is not an instance
            of :class:`google.auth.credentials.Signing`.

    :rtype: str
    :returns: A signed URL you can use to access the resource
              until expiration.
    """
    expiration_seconds = get_expiration_seconds_v4(expiration)

    if _request_timestamp is None:
        request_timestamp, datestamp = get_v4_now_dtstamps()
    else:
        request_timestamp = _request_timestamp
        # The datestamp is the leading ``YYYYMMDD`` part of the timestamp.
        datestamp = _request_timestamp[:8]

    # If you are on Google Compute Engine, you can't generate a signed URL.
    # See https://github.com/googleapis/google-cloud-python/issues/922
    client_email = service_account_email
    if not access_token or not service_account_email:
        ensure_signed_credentials(credentials)
        client_email = credentials.signer_email

    credential_scope = f"{datestamp}/auto/storage/goog4_request"
    credential = f"{client_email}/{credential_scope}"

    # Work on a private copy: the entries added below (Content-Type,
    # Content-MD5, Host, x-goog-resumable) must not leak into the caller's
    # mapping.
    headers = {} if headers is None else dict(headers)

    if content_type is not None:
        headers["Content-Type"] = content_type

    if content_md5 is not None:
        headers["Content-MD5"] = content_md5

    header_names = [key.lower() for key in headers]
    if "host" not in header_names:
        headers["Host"] = urllib.parse.urlparse(api_access_endpoint).netloc

    if method.upper() == "RESUMABLE":
        method = "POST"
        headers["x-goog-resumable"] = "start"

    canonical_headers, ordered_headers = get_canonical_headers(headers)
    canonical_header_string = (
        "\n".join(canonical_headers) + "\n"
    )  # Yes, Virginia, the extra newline is part of the spec.
    signed_headers = ";".join([key for key, _ in ordered_headers])

    # Build a fresh mapping so the caller's query parameters stay untouched;
    # falsy values are sent as empty strings.
    if query_parameters is None:
        query_parameters = {}
    else:
        query_parameters = {key: value or "" for key, value in query_parameters.items()}

    query_parameters["X-Goog-Algorithm"] = "GOOG4-RSA-SHA256"
    query_parameters["X-Goog-Credential"] = credential
    query_parameters["X-Goog-Date"] = request_timestamp
    query_parameters["X-Goog-Expires"] = expiration_seconds
    query_parameters["X-Goog-SignedHeaders"] = signed_headers

    if response_type is not None:
        query_parameters["response-content-type"] = response_type

    if response_disposition is not None:
        query_parameters["response-content-disposition"] = response_disposition

    if generation is not None:
        query_parameters["generation"] = generation

    canonical_query_string = _url_encode(query_parameters)

    lowercased_headers = dict(ordered_headers)

    # A caller-supplied payload hash is signed as-is; otherwise the payload
    # is declared unsigned.
    if "x-goog-content-sha256" in lowercased_headers:
        payload = lowercased_headers["x-goog-content-sha256"]
    else:
        payload = "UNSIGNED-PAYLOAD"

    canonical_elements = [
        method,
        resource,
        canonical_query_string,
        canonical_header_string,
        signed_headers,
        payload,
    ]
    canonical_request = "\n".join(canonical_elements)

    canonical_request_hash = hashlib.sha256(
        canonical_request.encode("ascii")
    ).hexdigest()

    string_elements = [
        "GOOG4-RSA-SHA256",
        request_timestamp,
        credential_scope,
        canonical_request_hash,
    ]
    string_to_sign = "\n".join(string_elements)

    if access_token and service_account_email:
        # The remote signer returns base64 text; the URL needs hex.
        signature = _sign_message(string_to_sign, access_token, service_account_email)
        signature_bytes = base64.b64decode(signature)
        signature = binascii.hexlify(signature_bytes).decode("ascii")
    else:
        signature_bytes = credentials.sign_bytes(string_to_sign.encode("ascii"))
        signature = binascii.hexlify(signature_bytes).decode("ascii")

    return "{}{}?{}&X-Goog-Signature={}".format(
        api_access_endpoint, resource, canonical_query_string, signature
    )
def get_v4_now_dtstamps():
    """Return the current time formatted for V4 signing.

    The time is read once through ``NOW`` so both values describe the same
    instant.

    :rtype: str, str
    :returns: Current timestamp (``YYYYMMDDTHHMMSSZ``) and datestamp
              (``YYYYMMDD``).
    """
    current = NOW()
    return (
        current.strftime("%Y%m%dT%H%M%SZ"),
        current.date().strftime("%Y%m%d"),
    )
def _sign_message(message, access_token, service_account_email):
    """Sign a message by POSTing it to the IAM credentials ``signBlob`` endpoint.

    :type message: str
    :param message: The message to be signed.

    :type access_token: str
    :param access_token: Access token for a service account; sent as a
                         ``Bearer`` authorization header.

    :type service_account_email: str
    :param service_account_email: E-mail address of the service account.

    :raises: :exc:`TransportError` if the response status is not OK.

    :rtype: str
    :returns: The ``signedBlob`` value from the JSON response.
    """
    message_bytes = _helpers._to_bytes(message)

    sign_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:signBlob?alt=json".format(
        service_account_email
    )
    request_headers = {
        "Authorization": "Bearer " + access_token,
        "Content-type": "application/json",
    }
    # The endpoint takes the payload as base64 text inside a JSON document.
    request_body = json.dumps(
        {"payload": base64.b64encode(message_bytes).decode("utf-8")}
    )

    transport = requests.Request()
    response = transport(
        url=sign_url, method="POST", body=request_body, headers=request_headers
    )

    if response.status != http.client.OK:
        raise exceptions.TransportError(
            f"Error calling the IAM signBytes API: {response.data}"
        )

    return json.loads(response.data.decode("utf-8"))["signedBlob"]
def _url_encode(query_params):
    """Encode query params into a sorted URL query string.

    :type query_params: dict
    :param query_params: Query params to be encoded.

    :rtype: str
    :returns: ``name=value`` pairs, each side quoted, sorted as whole strings
              and joined with ``&``.
    """
    encoded_pairs = sorted(
        "{}={}".format(_quote_param(name), _quote_param(value))
        for name, value in query_params.items()
    )
    return "&".join(encoded_pairs)
709def _quote_param(param):
710 """Quote query param.
712 :type param: Any
713 :param param: Query param to be encoded.
715 :rtype: str
716 :returns: URL encoded query param.
717 """
718 if not isinstance(param, bytes):
719 param = str(param)
720 return urllib.parse.quote(param, safe="~")