1""" 
    2<Module Name> 
    3  functions.py 
    4 
    5<Author> 
    6  Santiago Torres-Arias <santiago@nyu.edu> 
    7 
    8<Started> 
    9  Nov 15, 2017 
    10 
    11<Copyright> 
    12  See LICENSE for licensing information. 
    13 
    14<Purpose> 
    15  publicly-usable functions for exporting public-keys, signing data and 
    16  verifying signatures. 
    17""" 
    18 
    19import logging 
    20import subprocess 
    21import time 
    22 
    23from securesystemslib import exceptions 
    24from securesystemslib._gpg.common import ( 
    25    get_pubkey_bundle, 
    26    parse_signature_packet, 
    27) 
    28from securesystemslib._gpg.constants import ( 
    29    FULLY_SUPPORTED_MIN_VERSION, 
    30    GPG_TIMEOUT, 
    31    NO_GPG_MSG, 
    32    SHA256, 
    33    gpg_export_pubkey_command, 
    34    gpg_sign_command, 
    35    have_gpg, 
    36) 
    37from securesystemslib._gpg.exceptions import KeyExpirationError 
    38from securesystemslib._gpg.handlers import SIGNATURE_HANDLERS 
    39from securesystemslib._gpg.rsa import CRYPTO 
    40 
    41log = logging.getLogger(__name__) 
    42 
    43NO_CRYPTO_MSG = "GPG support requires the cryptography library" 
    44 
    45 
    46def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT): 
    47    """ 
    48    <Purpose> 
    49      Calls the gpg command line utility to sign the passed content with the key 
    50      identified by the passed keyid from the gpg keyring at the passed homedir. 
    51 
    52      The executed base command is defined in 
    53      securesystemslib._gpg.constants.gpg_sign_command. 
    54 
    55      NOTE: On not fully supported versions of GPG, i.e. versions below 
    56      securesystemslib._gpg.constants.FULLY_SUPPORTED_MIN_VERSION the returned 
    57      signature does not contain the full keyid. As a work around, we export the 
    58      public key bundle identified by the short keyid to compute the full keyid 
    59      and add it to the returned signature. 
    60 
    61    <Arguments> 
    62      content: 
    63              The content to be signed. (bytes) 
    64 
    65      keyid: (optional) 
    66              The keyid of the gpg signing keyid. If not passed the default 
    67              key in the keyring is used. 
    68 
    69      homedir: (optional) 
    70              Path to the gpg keyring. If not passed the default keyring is used. 
    71 
    72      timeout (optional): 
    73              gpg command timeout in seconds. Default is 10. 
    74 
    75    <Exceptions> 
    76 
    77      ValueError: 
    78              If the gpg command failed to create a valid signature. 
    79 
    80      OSError: 
    81              If the gpg command is not present, or non-executable, 
    82              or returned a non-zero exit code 
    83 
    84      securesystemslib.exceptions.UnsupportedLibraryError: 
    85              If the gpg command is not available, or 
    86              the cryptography library is not installed. 
    87 
    88      securesystemslib._gpg.exceptions.KeyNotFoundError: 
    89              If the used gpg version is not fully supported 
    90              and no public key can be found for short keyid. 
    91 
    92    <Side Effects> 
    93      None. 
    94 
    95    <Returns> 
    96      A signature dict. 
    97 
    98    """ 
    99    if not have_gpg():  # pragma: no cover 
    100        raise exceptions.UnsupportedLibraryError(NO_GPG_MSG) 
    101 
    102    if not CRYPTO:  # pragma: no cover 
    103        raise exceptions.UnsupportedLibraryError(NO_CRYPTO_MSG) 
    104 
    105    keyarg = "" 
    106    if keyid: 
    107        keyarg = f"--local-user {keyid}" 
    108 
    109    homearg = "" 
    110    if homedir: 
    111        homearg = f"--homedir {homedir}".replace("\\", "/") 
    112 
    113    command = gpg_sign_command(keyarg=keyarg, homearg=homearg) 
    114 
    115    gpg_process = subprocess.run(  # noqa: S603 
    116        command, 
    117        input=content, 
    118        check=False, 
    119        capture_output=True, 
    120        timeout=timeout, 
    121    ) 
    122 
    123    # TODO: It's suggested to take a look at `--status-fd` for proper error 
    124    # reporting, as there is no clear distinction between the return codes 
    125    # https://lists.gnupg.org/pipermail/gnupg-devel/2005-December/022559.html 
    126    if gpg_process.returncode != 0: 
    127        raise OSError( 
    128            f"Command '{gpg_process.args}' returned " 
    129            f"non-zero exit status '{gpg_process.returncode}', " 
    130            f"stderr was:\n{gpg_process.stderr.decode()}." 
    131        ) 
    132 
    133    signature_data = gpg_process.stdout 
    134    signature = parse_signature_packet(signature_data) 
    135 
    136    # On GPG < 2.1 we cannot derive the full keyid from the signature data. 
    137    # Instead we try to compute the keyid from the public part of the signing 
    138    # key or its subkeys, identified by the short keyid. 
    139    # parse_signature_packet is guaranteed to return at least one of keyid or 
    140    # short_keyid. 
    141    # Exclude the following code from coverage for consistent coverage across 
    142    # test environments. 
    143    if not signature["keyid"]:  # pragma: no cover 
    144        log.warning( 
    145            "The created signature does not include the hashed subpacket" 
    146            " '33' (full keyid). You probably have a gpg version" 
    147            f" <{FULLY_SUPPORTED_MIN_VERSION}." 
    148            " We will export the public keys associated with the short keyid to" 
    149            " compute the full keyid." 
    150        ) 
    151 
    152        short_keyid = signature["short_keyid"] 
    153 
    154        # Export public key bundle (master key including with optional subkeys) 
    155        public_key_bundle = export_pubkey(short_keyid, homedir) 
    156 
    157        # Test if the short keyid matches the master key ... 
    158        master_key_full_keyid = public_key_bundle["keyid"] 
    159        if master_key_full_keyid.endswith(short_keyid.lower()): 
    160            signature["keyid"] = master_key_full_keyid 
    161 
    162        # ... or one of the subkeys, and add the full keyid to the signature dict. 
    163        else: 
    164            for sub_key_full_keyid in list(public_key_bundle.get("subkeys", {}).keys()): 
    165                if sub_key_full_keyid.endswith(short_keyid.lower()): 
    166                    signature["keyid"] = sub_key_full_keyid 
    167                    break 
    168 
    169    # If there is still no full keyid something went wrong 
    170    if not signature["keyid"]:  # pragma: no cover 
    171        raise ValueError( 
    172            f"Full keyid could not be determined for signature '{signature}'" 
    173        ) 
    174 
    175    # It is okay now to remove the optional short keyid to save space 
    176    signature.pop("short_keyid", None) 
    177 
    178    return signature 
    179 
    180 
    181def verify_signature(signature_object, pubkey_info, content): 
    182    """ 
    183    <Purpose> 
    184      Verifies the passed signature against the passed content using the 
    185      passed public key, or one of its subkeys, associated by the signature's 
    186      keyid. 
    187 
    188      The function selects the appropriate verification algorithm (rsa or dsa) 
    189      based on the "type" field in the passed public key object. 
    190 
    191    <Arguments> 
    192      signature_object: 
    193              A signature dict. 
    194 
    195      pubkey_info: 
    196              A public key dict. 
    197 
    198      content: 
    199              The content to be verified. (bytes) 
    200 
    201    <Exceptions> 
    202      securesystemslib._gpg.exceptions.KeyExpirationError: 
    203              if the passed public key has expired 
    204 
    205      securesystemslib.exceptions.UnsupportedLibraryError: 
    206              if the cryptography module is unavailable 
    207 
    208    <Side Effects> 
    209      None. 
    210 
    211    <Returns> 
    212      True if signature verification passes, False otherwise. 
    213 
    214    """ 
    215    if not CRYPTO:  # pragma: no cover 
    216        raise exceptions.UnsupportedLibraryError(NO_CRYPTO_MSG) 
    217 
    218    handler = SIGNATURE_HANDLERS[pubkey_info["type"]] 
    219    sig_keyid = signature_object["keyid"] 
    220 
    221    verification_key = pubkey_info 
    222 
    223    # If the keyid on the signature matches a subkey of the passed key, 
    224    # we use that subkey for verification instead of the master key. 
    225    if sig_keyid in list(pubkey_info.get("subkeys", {}).keys()): 
    226        verification_key = pubkey_info["subkeys"][sig_keyid] 
    227 
    228    creation_time = verification_key.get("creation_time") 
    229    validity_period = verification_key.get("validity_period") 
    230 
    231    if ( 
    232        creation_time 
    233        and validity_period 
    234        and creation_time + validity_period < time.time() 
    235    ): 
    236        raise KeyExpirationError(verification_key) 
    237 
    238    return handler.verify_signature(signature_object, verification_key, content, SHA256) 
    239 
    240 
    241def export_pubkey(keyid, homedir=None, timeout=GPG_TIMEOUT): 
    242    """Exports a public key from a GnuPG keyring. 
    243 
    244    Arguments: 
    245      keyid: An OpenPGP keyid.. 
    246      homedir (optional): A path to the GnuPG home directory. If not set the 
    247          default GnuPG home directory is used. 
    248      timeout (optional): gpg command timeout in seconds. Default is 10. 
    249 
    250    Raises: 
    251      UnsupportedLibraryError: The gpg command or pyca/cryptography are not 
    252          available. 
    253      KeyNotFoundError: No key or subkey was found for that keyid. 
    254 
    255    Side Effects: 
    256      Calls system gpg command in a subprocess. 
    257 
    258    Returns: 
    259      An OpenPGP public key dict. 
    260 
    261    """ 
    262    if not have_gpg():  # pragma: no cover 
    263        raise exceptions.UnsupportedLibraryError(NO_GPG_MSG) 
    264 
    265    if not CRYPTO:  # pragma: no cover 
    266        raise exceptions.UnsupportedLibraryError(NO_CRYPTO_MSG) 
    267 
    268    homearg = "" 
    269    if homedir: 
    270        homearg = f"--homedir {homedir}".replace("\\", "/") 
    271 
    272    # TODO: Consider adopting command error handling from `create_signature` 
    273    # above, e.g. in a common 'run gpg command' utility function 
    274    command = gpg_export_pubkey_command(keyid=keyid, homearg=homearg) 
    275    gpg_process = subprocess.run(  # noqa: S603 
    276        command, 
    277        capture_output=True, 
    278        timeout=timeout, 
    279        check=True, 
    280    ) 
    281 
    282    key_packet = gpg_process.stdout 
    283    key_bundle = get_pubkey_bundle(key_packet, keyid) 
    284 
    285    return key_bundle 
    286 
    287 
    288def export_pubkeys(keyids, homedir=None, timeout=GPG_TIMEOUT): 
    289    """Exports multiple public keys from a GnuPG keyring. 
    290 
    291    Arguments: 
    292      keyids: A list of OpenPGP keyids. 
    293      homedir (optional): A path to the GnuPG home directory. If not set the 
    294          default GnuPG home directory is used. 
    295      timeout (optional): gpg command timeout in seconds. Default is 10. 
    296 
    297    Raises: 
    298      TypeError: Keyids is not iterable. 
    299      ValueError: A Keyid is not a string. 
    300      UnsupportedLibraryError: The gpg command or pyca/cryptography are not 
    301          available. 
    302      KeyNotFoundError: No key or subkey was found for that keyid. 
    303 
    304    Side Effects: 
    305      Calls system gpg command in a subprocess. 
    306 
    307    Returns: 
    308      A dict of OpenPGP public key dicts as values, 
    309      and their keyids as dict keys. 
    310 
    311 
    312    """ 
    313    public_key_dict = {} 
    314    for gpg_keyid in keyids: 
    315        public_key = export_pubkey(gpg_keyid, homedir=homedir, timeout=timeout) 
    316        keyid = public_key["keyid"] 
    317        public_key_dict[keyid] = public_key 
    318 
    319    return public_key_dict