1# Copyright 2017 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""RSA verifier and signer that use the ``cryptography`` library. 
    16 
    17This is a much faster implementation than the default (in 
    18``google.auth.crypt._python_rsa``), which depends on the pure-Python 
    19``rsa`` library. 
    20""" 
    21 
    22import cryptography.exceptions 
    23from cryptography.hazmat import backends 
    24from cryptography.hazmat.primitives import hashes 
    25from cryptography.hazmat.primitives import serialization 
    26from cryptography.hazmat.primitives.asymmetric import padding 
    27import cryptography.x509 
    28 
    29from google.auth import _helpers 
    30from google.auth.crypt import base 
    31 
# PEM header whose presence in the input makes RSAVerifier.from_string parse
# the data as an X.509 certificate instead of a bare public key.
_CERTIFICATE_MARKER = b"-----BEGIN CERTIFICATE-----"
# Module-level backend, padding, and hash objects, created once at import time
# and shared by the key-loading, signing, and verification calls below.
_BACKEND = backends.default_backend()
_PADDING = padding.PKCS1v15()
_SHA256 = hashes.SHA256()
    36 
    37 
    38class RSAVerifier(base.Verifier): 
    39    """Verifies RSA cryptographic signatures using public keys. 
    40 
    41    Args: 
    42        public_key ( 
    43                cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey): 
    44            The public key used to verify signatures. 
    45    """ 
    46 
    47    def __init__(self, public_key): 
    48        self._pubkey = public_key 
    49 
    50    @_helpers.copy_docstring(base.Verifier) 
    51    def verify(self, message, signature): 
    52        message = _helpers.to_bytes(message) 
    53        try: 
    54            self._pubkey.verify(signature, message, _PADDING, _SHA256) 
    55            return True 
    56        except (ValueError, cryptography.exceptions.InvalidSignature): 
    57            return False 
    58 
    59    @classmethod 
    60    def from_string(cls, public_key): 
    61        """Construct an Verifier instance from a public key or public 
    62        certificate string. 
    63 
    64        Args: 
    65            public_key (Union[str, bytes]): The public key in PEM format or the 
    66                x509 public key certificate. 
    67 
    68        Returns: 
    69            Verifier: The constructed verifier. 
    70 
    71        Raises: 
    72            ValueError: If the public key can't be parsed. 
    73        """ 
    74        public_key_data = _helpers.to_bytes(public_key) 
    75 
    76        if _CERTIFICATE_MARKER in public_key_data: 
    77            cert = cryptography.x509.load_pem_x509_certificate( 
    78                public_key_data, _BACKEND 
    79            ) 
    80            pubkey = cert.public_key() 
    81 
    82        else: 
    83            pubkey = serialization.load_pem_public_key(public_key_data, _BACKEND) 
    84 
    85        return cls(pubkey) 
    86 
    87 
    88class RSASigner(base.Signer, base.FromServiceAccountMixin): 
    89    """Signs messages with an RSA private key. 
    90 
    91    Args: 
    92        private_key ( 
    93                cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey): 
    94            The private key to sign with. 
    95        key_id (str): Optional key ID used to identify this private key. This 
    96            can be useful to associate the private key with its associated 
    97            public key or certificate. 
    98    """ 
    99 
    100    def __init__(self, private_key, key_id=None): 
    101        self._key = private_key 
    102        self._key_id = key_id 
    103 
    104    @property  # type: ignore 
    105    @_helpers.copy_docstring(base.Signer) 
    106    def key_id(self): 
    107        return self._key_id 
    108 
    109    @_helpers.copy_docstring(base.Signer) 
    110    def sign(self, message): 
    111        message = _helpers.to_bytes(message) 
    112        return self._key.sign(message, _PADDING, _SHA256) 
    113 
    114    @classmethod 
    115    def from_string(cls, key, key_id=None): 
    116        """Construct a RSASigner from a private key in PEM format. 
    117 
    118        Args: 
    119            key (Union[bytes, str]): Private key in PEM format. 
    120            key_id (str): An optional key id used to identify the private key. 
    121 
    122        Returns: 
    123            google.auth.crypt._cryptography_rsa.RSASigner: The 
    124            constructed signer. 
    125 
    126        Raises: 
    127            ValueError: If ``key`` is not ``bytes`` or ``str`` (unicode). 
    128            UnicodeDecodeError: If ``key`` is ``bytes`` but cannot be decoded 
    129                into a UTF-8 ``str``. 
    130            ValueError: If ``cryptography`` "Could not deserialize key data." 
    131        """ 
    132        key = _helpers.to_bytes(key) 
    133        private_key = serialization.load_pem_private_key( 
    134            key, password=None, backend=_BACKEND 
    135        ) 
    136        return cls(private_key, key_id=key_id) 
    137 
    138    def __getstate__(self): 
    139        """Pickle helper that serializes the _key attribute.""" 
    140        state = self.__dict__.copy() 
    141        state["_key"] = self._key.private_bytes( 
    142            encoding=serialization.Encoding.PEM, 
    143            format=serialization.PrivateFormat.PKCS8, 
    144            encryption_algorithm=serialization.NoEncryption(), 
    145        ) 
    146        return state 
    147 
    148    def __setstate__(self, state): 
    149        """Pickle helper that deserializes the _key attribute.""" 
    150        state["_key"] = serialization.load_pem_private_key(state["_key"], None) 
    151        self.__dict__.update(state)