1# Copyright 2020 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#      http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Helper functions for getting mTLS cert and key.""" 
    16 
    17import json 
    18import logging 
    19from os import environ, path 
    20import re 
    21import subprocess 
    22 
    23from google.auth import exceptions 
    24 
    25CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json" 
    26CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json" 
    27_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG" 
    28_CERT_PROVIDER_COMMAND = "cert_provider_command" 
    29_CERT_REGEX = re.compile( 
    30    b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL 
    31) 
    32 
    33# support various format of key files, e.g. 
    34# "-----BEGIN PRIVATE KEY-----...", 
    35# "-----BEGIN EC PRIVATE KEY-----...", 
    36# "-----BEGIN RSA PRIVATE KEY-----..." 
    37# "-----BEGIN ENCRYPTED PRIVATE KEY-----" 
    38_KEY_REGEX = re.compile( 
    39    b"-----BEGIN [A-Z ]*PRIVATE KEY-----.+-----END [A-Z ]*PRIVATE KEY-----\r?\n?", 
    40    re.DOTALL, 
    41) 
    42 
    43_LOGGER = logging.getLogger(__name__) 
    44 
    45 
    46_PASSPHRASE_REGEX = re.compile( 
    47    b"-----BEGIN PASSPHRASE-----(.+)-----END PASSPHRASE-----", re.DOTALL 
    48) 
    49 
    50 
    51def _check_config_path(config_path): 
    52    """Checks for config file path. If it exists, returns the absolute path with user expansion; 
    53    otherwise returns None. 
    54 
    55    Args: 
    56        config_path (str): The config file path for either context_aware_metadata.json or certificate_config.json for example 
    57 
    58    Returns: 
    59        str: absolute path if exists and None otherwise. 
    60    """ 
    61    config_path = path.expanduser(config_path) 
    62    if not path.exists(config_path): 
    63        _LOGGER.debug("%s is not found.", config_path) 
    64        return None 
    65    return config_path 
    66 
    67 
    68def _load_json_file(path): 
    69    """Reads and loads JSON from the given path. Used to read both X509 workload certificate and 
    70    secure connect configurations. 
    71 
    72    Args: 
    73        path (str): the path to read from. 
    74 
    75    Returns: 
    76        Dict[str, str]: The JSON stored at the file. 
    77 
    78    Raises: 
    79        google.auth.exceptions.ClientCertError: If failed to parse the file as JSON. 
    80    """ 
    81    try: 
    82        with open(path) as f: 
    83            json_data = json.load(f) 
    84    except ValueError as caught_exc: 
    85        new_exc = exceptions.ClientCertError(caught_exc) 
    86        raise new_exc from caught_exc 
    87 
    88    return json_data 
    89 
    90 
    91def _get_workload_cert_and_key(certificate_config_path=None): 
    92    """Read the workload identity cert and key files specified in the certificate config provided. 
    93    If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG" 
    94    first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json". 
    95 
    96    Args: 
    97        certificate_config_path (string): The certificate config path. If no path is provided, 
    98        the environment variable will be checked first, then the well known gcloud location. 
    99 
    100    Returns: 
    101        Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key 
    102            bytes in PEM format. 
    103 
    104    Raises: 
    105        google.auth.exceptions.ClientCertError: if problems occurs when retrieving 
    106        the certificate or key information. 
    107    """ 
    108 
    109    cert_path, key_path = _get_workload_cert_and_key_paths(certificate_config_path) 
    110 
    111    if cert_path is None and key_path is None: 
    112        return None, None 
    113 
    114    return _read_cert_and_key_files(cert_path, key_path) 
    115 
    116 
    117def _get_cert_config_path(certificate_config_path=None): 
    118    """Get the certificate configuration path based on the following order: 
    119 
    120    1: Explicit override, if set 
    121    2: Environment variable, if set 
    122    3: Well-known location 
    123 
    124    Returns "None" if the selected config file does not exist. 
    125 
    126    Args: 
    127        certificate_config_path (string): The certificate config path. If provided, the well known 
    128        location and environment variable will be ignored. 
    129 
    130    Returns: 
    131        The absolute path of the certificate config file, and None if the file does not exist. 
    132    """ 
    133 
    134    if certificate_config_path is None: 
    135        env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None) 
    136        if env_path is not None and env_path != "": 
    137            certificate_config_path = env_path 
    138        else: 
    139            certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH 
    140 
    141    certificate_config_path = path.expanduser(certificate_config_path) 
    142    if not path.exists(certificate_config_path): 
    143        return None 
    144    return certificate_config_path 
    145 
    146 
    147def _get_workload_cert_and_key_paths(config_path): 
    148    absolute_path = _get_cert_config_path(config_path) 
    149    if absolute_path is None: 
    150        return None, None 
    151 
    152    data = _load_json_file(absolute_path) 
    153 
    154    if "cert_configs" not in data: 
    155        raise exceptions.ClientCertError( 
    156            'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format( 
    157                absolute_path 
    158            ) 
    159        ) 
    160    cert_configs = data["cert_configs"] 
    161 
    162    if "workload" not in cert_configs: 
    163        raise exceptions.ClientCertError( 
    164            'Certificate config file {} is in an invalid format, a "workload" cert config is expected'.format( 
    165                absolute_path 
    166            ) 
    167        ) 
    168    workload = cert_configs["workload"] 
    169 
    170    if "cert_path" not in workload: 
    171        raise exceptions.ClientCertError( 
    172            'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config'.format( 
    173                absolute_path 
    174            ) 
    175        ) 
    176    cert_path = workload["cert_path"] 
    177 
    178    if "key_path" not in workload: 
    179        raise exceptions.ClientCertError( 
    180            'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config'.format( 
    181                absolute_path 
    182            ) 
    183        ) 
    184    key_path = workload["key_path"] 
    185 
    186    return cert_path, key_path 
    187 
    188 
    189def _read_cert_and_key_files(cert_path, key_path): 
    190    cert_data = _read_cert_file(cert_path) 
    191    key_data = _read_key_file(key_path) 
    192 
    193    return cert_data, key_data 
    194 
    195 
    196def _read_cert_file(cert_path): 
    197    with open(cert_path, "rb") as cert_file: 
    198        cert_data = cert_file.read() 
    199 
    200    cert_match = re.findall(_CERT_REGEX, cert_data) 
    201    if len(cert_match) != 1: 
    202        raise exceptions.ClientCertError( 
    203            "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format( 
    204                cert_path 
    205            ) 
    206        ) 
    207    return cert_match[0] 
    208 
    209 
    210def _read_key_file(key_path): 
    211    with open(key_path, "rb") as key_file: 
    212        key_data = key_file.read() 
    213 
    214    key_match = re.findall(_KEY_REGEX, key_data) 
    215    if len(key_match) != 1: 
    216        raise exceptions.ClientCertError( 
    217            "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format( 
    218                key_path 
    219            ) 
    220        ) 
    221 
    222    return key_match[0] 
    223 
    224 
    225def _run_cert_provider_command(command, expect_encrypted_key=False): 
    226    """Run the provided command, and return client side mTLS cert, key and 
    227    passphrase. 
    228 
    229    Args: 
    230        command (List[str]): cert provider command. 
    231        expect_encrypted_key (bool): If encrypted private key is expected. 
    232 
    233    Returns: 
    234        Tuple[bytes, bytes, bytes]: client certificate bytes in PEM format, key 
    235            bytes in PEM format and passphrase bytes. 
    236 
    237    Raises: 
    238        google.auth.exceptions.ClientCertError: if problems occurs when running 
    239            the cert provider command or generating cert, key and passphrase. 
    240    """ 
    241    try: 
    242        process = subprocess.Popen( 
    243            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE 
    244        ) 
    245        stdout, stderr = process.communicate() 
    246    except OSError as caught_exc: 
    247        new_exc = exceptions.ClientCertError(caught_exc) 
    248        raise new_exc from caught_exc 
    249 
    250    # Check cert provider command execution error. 
    251    if process.returncode != 0: 
    252        raise exceptions.ClientCertError( 
    253            "Cert provider command returns non-zero status code %s" % process.returncode 
    254        ) 
    255 
    256    # Extract certificate (chain), key and passphrase. 
    257    cert_match = re.findall(_CERT_REGEX, stdout) 
    258    if len(cert_match) != 1: 
    259        raise exceptions.ClientCertError("Client SSL certificate is missing or invalid") 
    260    key_match = re.findall(_KEY_REGEX, stdout) 
    261    if len(key_match) != 1: 
    262        raise exceptions.ClientCertError("Client SSL key is missing or invalid") 
    263    passphrase_match = re.findall(_PASSPHRASE_REGEX, stdout) 
    264 
    265    if expect_encrypted_key: 
    266        if len(passphrase_match) != 1: 
    267            raise exceptions.ClientCertError("Passphrase is missing or invalid") 
    268        if b"ENCRYPTED" not in key_match[0]: 
    269            raise exceptions.ClientCertError("Encrypted private key is expected") 
    270        return cert_match[0], key_match[0], passphrase_match[0].strip() 
    271 
    272    if b"ENCRYPTED" in key_match[0]: 
    273        raise exceptions.ClientCertError("Encrypted private key is not expected") 
    274    if len(passphrase_match) > 0: 
    275        raise exceptions.ClientCertError("Passphrase is not expected") 
    276    return cert_match[0], key_match[0], None 
    277 
    278 
    279def get_client_ssl_credentials( 
    280    generate_encrypted_key=False, 
    281    context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH, 
    282    certificate_config_path=CERTIFICATE_CONFIGURATION_DEFAULT_PATH, 
    283): 
    284    """Returns the client side certificate, private key and passphrase. 
    285 
    286    We look for certificates and keys with the following order of priority: 
    287        1. Certificate and key specified by certificate_config.json. 
    288               Currently, only X.509 workload certificates are supported. 
    289        2. Certificate and key specified by context aware metadata (i.e. SecureConnect). 
    290 
    291    Args: 
    292        generate_encrypted_key (bool): If set to True, encrypted private key 
    293            and passphrase will be generated; otherwise, unencrypted private key 
    294            will be generated and passphrase will be None. This option only 
    295            affects keys obtained via context_aware_metadata.json. 
    296        context_aware_metadata_path (str): The context_aware_metadata.json file path. 
    297        certificate_config_path (str): The certificate_config.json file path. 
    298 
    299    Returns: 
    300        Tuple[bool, bytes, bytes, bytes]: 
    301            A boolean indicating if cert, key and passphrase are obtained, the 
    302            cert bytes and key bytes both in PEM format, and passphrase bytes. 
    303 
    304    Raises: 
    305        google.auth.exceptions.ClientCertError: if problems occurs when getting 
    306            the cert, key and passphrase. 
    307    """ 
    308 
    309    # 1. Check for certificate config json. 
    310    cert_config_path = _check_config_path(certificate_config_path) 
    311    if cert_config_path: 
    312        # Attempt to retrieve X.509 Workload cert and key. 
    313        cert, key = _get_workload_cert_and_key(cert_config_path) 
    314        if cert and key: 
    315            return True, cert, key, None 
    316 
    317    # 2. Check for context aware metadata json 
    318    metadata_path = _check_config_path(context_aware_metadata_path) 
    319 
    320    if metadata_path: 
    321        metadata_json = _load_json_file(metadata_path) 
    322 
    323        if _CERT_PROVIDER_COMMAND not in metadata_json: 
    324            raise exceptions.ClientCertError("Cert provider command is not found") 
    325 
    326        command = metadata_json[_CERT_PROVIDER_COMMAND] 
    327 
    328        if generate_encrypted_key and "--with_passphrase" not in command: 
    329            command.append("--with_passphrase") 
    330 
    331        # Execute the command. 
    332        cert, key, passphrase = _run_cert_provider_command( 
    333            command, expect_encrypted_key=generate_encrypted_key 
    334        ) 
    335        return True, cert, key, passphrase 
    336 
    337    return False, None, None, None 
    338 
    339 
    340def get_client_cert_and_key(client_cert_callback=None): 
    341    """Returns the client side certificate and private key. The function first 
    342    tries to get certificate and key from client_cert_callback; if the callback 
    343    is None or doesn't provide certificate and key, the function tries application 
    344    default SSL credentials. 
    345 
    346    Args: 
    347        client_cert_callback (Optional[Callable[[], (bytes, bytes)]]): An 
    348            optional callback which returns client certificate bytes and private 
    349            key bytes both in PEM format. 
    350 
    351    Returns: 
    352        Tuple[bool, bytes, bytes]: 
    353            A boolean indicating if cert and key are obtained, the cert bytes 
    354            and key bytes both in PEM format. 
    355 
    356    Raises: 
    357        google.auth.exceptions.ClientCertError: if problems occurs when getting 
    358            the cert and key. 
    359    """ 
    360    if client_cert_callback: 
    361        cert, key = client_cert_callback() 
    362        return True, cert, key 
    363 
    364    has_cert, cert, key, _ = get_client_ssl_credentials(generate_encrypted_key=False) 
    365    return has_cert, cert, key 
    366 
    367 
    368def decrypt_private_key(key, passphrase): 
    369    """A helper function to decrypt the private key with the given passphrase. 
    370    google-auth library doesn't support passphrase protected private key for 
    371    mutual TLS channel. This helper function can be used to decrypt the 
    372    passphrase protected private key in order to estalish mutual TLS channel. 
    373 
    374    For example, if you have a function which produces client cert, passphrase 
    375    protected private key and passphrase, you can convert it to a client cert 
    376    callback function accepted by google-auth:: 
    377 
    378        from google.auth.transport import _mtls_helper 
    379 
    380        def your_client_cert_function(): 
    381            return cert, encrypted_key, passphrase 
    382 
    383        # callback accepted by google-auth for mutual TLS channel. 
    384        def client_cert_callback(): 
    385            cert, encrypted_key, passphrase = your_client_cert_function() 
    386            decrypted_key = _mtls_helper.decrypt_private_key(encrypted_key, 
    387                passphrase) 
    388            return cert, decrypted_key 
    389 
    390    Args: 
    391        key (bytes): The private key bytes in PEM format. 
    392        passphrase (bytes): The passphrase bytes. 
    393 
    394    Returns: 
    395        bytes: The decrypted private key in PEM format. 
    396 
    397    Raises: 
    398        ImportError: If pyOpenSSL is not installed. 
    399        OpenSSL.crypto.Error: If there is any problem decrypting the private key. 
    400    """ 
    401    from OpenSSL import crypto 
    402 
    403    # First convert encrypted_key_bytes to PKey object 
    404    pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key, passphrase=passphrase) 
    405 
    406    # Then dump the decrypted key bytes 
    407    return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)