1# Copyright 2020 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
"""Utilities for mutual TLS."""
    16 
    17from google.auth import exceptions 
    18from google.auth.transport import _mtls_helper 
    19 
    20 
    21def has_default_client_cert_source(): 
    22    """Check if default client SSL credentials exists on the device. 
    23 
    24    Returns: 
    25        bool: indicating if the default client cert source exists. 
    26    """ 
    27    if ( 
    28        _mtls_helper._check_config_path(_mtls_helper.CONTEXT_AWARE_METADATA_PATH) 
    29        is not None 
    30    ): 
    31        return True 
    32    if ( 
    33        _mtls_helper._check_config_path( 
    34            _mtls_helper.CERTIFICATE_CONFIGURATION_DEFAULT_PATH 
    35        ) 
    36        is not None 
    37    ): 
    38        return True 
    39    return False 
    40 
    41 
    42def default_client_cert_source(): 
    43    """Get a callback which returns the default client SSL credentials. 
    44 
    45    Returns: 
    46        Callable[[], [bytes, bytes]]: A callback which returns the default 
    47            client certificate bytes and private key bytes, both in PEM format. 
    48 
    49    Raises: 
    50        google.auth.exceptions.DefaultClientCertSourceError: If the default 
    51            client SSL credentials don't exist or are malformed. 
    52    """ 
    53    if not has_default_client_cert_source(): 
    54        raise exceptions.MutualTLSChannelError( 
    55            "Default client cert source doesn't exist" 
    56        ) 
    57 
    58    def callback(): 
    59        try: 
    60            _, cert_bytes, key_bytes = _mtls_helper.get_client_cert_and_key() 
    61        except (OSError, RuntimeError, ValueError) as caught_exc: 
    62            new_exc = exceptions.MutualTLSChannelError(caught_exc) 
    63            raise new_exc from caught_exc 
    64 
    65        return cert_bytes, key_bytes 
    66 
    67    return callback 
    68 
    69 
    70def default_client_encrypted_cert_source(cert_path, key_path): 
    71    """Get a callback which returns the default encrpyted client SSL credentials. 
    72 
    73    Args: 
    74        cert_path (str): The cert file path. The default client certificate will 
    75            be written to this file when the returned callback is called. 
    76        key_path (str): The key file path. The default encrypted client key will 
    77            be written to this file when the returned callback is called. 
    78 
    79    Returns: 
    80        Callable[[], [str, str, bytes]]: A callback which generates the default 
    81            client certificate, encrpyted private key and passphrase. It writes 
    82            the certificate and private key into the cert_path and key_path, and 
    83            returns the cert_path, key_path and passphrase bytes. 
    84 
    85    Raises: 
    86        google.auth.exceptions.DefaultClientCertSourceError: If any problem 
    87            occurs when loading or saving the client certificate and key. 
    88    """ 
    89    if not has_default_client_cert_source(): 
    90        raise exceptions.MutualTLSChannelError( 
    91            "Default client encrypted cert source doesn't exist" 
    92        ) 
    93 
    94    def callback(): 
    95        try: 
    96            ( 
    97                _, 
    98                cert_bytes, 
    99                key_bytes, 
    100                passphrase_bytes, 
    101            ) = _mtls_helper.get_client_ssl_credentials(generate_encrypted_key=True) 
    102            with open(cert_path, "wb") as cert_file: 
    103                cert_file.write(cert_bytes) 
    104            with open(key_path, "wb") as key_file: 
    105                key_file.write(key_bytes) 
    106        except (exceptions.ClientCertError, OSError) as caught_exc: 
    107            new_exc = exceptions.MutualTLSChannelError(caught_exc) 
    108            raise new_exc from caught_exc 
    109 
    110        return cert_path, key_path, passphrase_bytes 
    111 
    112    return callback