1""" 
    2<Module Name> 
    3  common.py 
    4 
    5<Author> 
    6  Santiago Torres-Arias <santiago@nyu.edu> 
    7 
    8<Started> 
    9  Nov 15, 2017 
    10 
    11<Copyright> 
    12  See LICENSE for licensing information. 
    13 
    14<Purpose> 
    15  Provides algorithm-agnostic gpg public key and signature parsing functions. 
    16  The functions select the appropriate functions for each algorithm and 
    17  call them. 
    18 
    19""" 
    20 
    21import binascii 
    22import collections 
    23import logging 
    24import struct 
    25 
    26from securesystemslib._gpg import util as gpg_util 
    27from securesystemslib._gpg.constants import ( 
    28    FULL_KEYID_SUBPACKET, 
    29    GPG_HASH_ALGORITHM_STRING, 
    30    KEY_EXPIRATION_SUBPACKET, 
    31    PACKET_TYPE_PRIMARY_KEY, 
    32    PACKET_TYPE_SIGNATURE, 
    33    PACKET_TYPE_SUB_KEY, 
    34    PACKET_TYPE_USER_ATTR, 
    35    PACKET_TYPE_USER_ID, 
    36    PARTIAL_KEYID_SUBPACKET, 
    37    PRIMARY_USERID_SUBPACKET, 
    38    SHA1, 
    39    SHA256, 
    40    SHA512, 
    41    SIG_CREATION_SUBPACKET, 
    42    SIGNATURE_TYPE_BINARY, 
    43    SIGNATURE_TYPE_CERTIFICATES, 
    44    SIGNATURE_TYPE_SUB_KEY_BINDING, 
    45    SUPPORTED_PUBKEY_PACKET_VERSIONS, 
    46    SUPPORTED_SIGNATURE_PACKET_VERSIONS, 
    47) 
    48from securesystemslib._gpg.exceptions import ( 
    49    KeyNotFoundError, 
    50    PacketParsingError, 
    51    PacketVersionNotSupportedError, 
    52    SignatureAlgorithmNotSupportedError, 
    53) 
    54from securesystemslib._gpg.handlers import ( 
    55    SIGNATURE_HANDLERS, 
    56    SUPPORTED_SIGNATURE_ALGORITHMS, 
    57) 
    58 
    59log = logging.getLogger(__name__) 
    60 
    61 
    62def parse_pubkey_payload(data): 
    63    """ 
    64    <Purpose> 
    65      Parse the passed public-key packet (payload only) and construct a 
    66      public key dictionary. 
    67 
    68    <Arguments> 
    69      data: 
    70            An RFC4880 public key packet payload as described in section 5.5.2. 
    71            (version 4) of the RFC. 
    72 
    73            NOTE: The payload can be parsed from a full key packet (header + 
    74            payload) by using securesystemslib._gpg.util.parse_packet_header. 
    75 
    76            WARNING: this doesn't support armored pubkey packets, so use with 
    77            care. pubkey packets are a little bit more complicated than the 
    78            signature ones 
    79 
    80    <Exceptions> 
    81      ValueError 
    82            If the passed public key data is empty. 
    83 
    84      securesystemslib._gpg.exceptions.PacketVersionNotSupportedError 
    85            If the packet version does not match 
    86            securesystemslib._gpg.constants.SUPPORTED_PUBKEY_PACKET_VERSIONS 
    87 
    88      securesystemslib._gpg.exceptions.SignatureAlgorithmNotSupportedError 
    89            If the signature algorithm does not match one of 
    90            securesystemslib._gpg.constants.SUPPORTED_SIGNATURE_ALGORITHMS 
    91 
    92    <Side Effects> 
    93      None. 
    94 
    95    <Returns> 
    96      A public key dict. 
    97 
    98    """ 
    99    if not data: 
    100        raise ValueError("Could not parse empty pubkey payload.") 
    101 
    102    ptr = 0 
    103    keyinfo = {} 
    104    version_number = data[ptr] 
    105    ptr += 1 
    106    if version_number not in SUPPORTED_PUBKEY_PACKET_VERSIONS: 
    107        raise PacketVersionNotSupportedError( 
    108            f"Pubkey packet version '{version_number}' not supported, " 
    109            f"must be one of {SUPPORTED_PUBKEY_PACKET_VERSIONS}" 
    110        ) 
    111 
    112    # NOTE: Uncomment this line to decode the time of creation 
    113    time_of_creation = struct.unpack(">I", data[ptr : ptr + 4]) 
    114    ptr += 4 
    115 
    116    algorithm = data[ptr] 
    117 
    118    ptr += 1 
    119 
    120    # TODO: Should we only export keys with signing capabilities? 
    121    # Section 5.5.2 of RFC4880 describes a public-key algorithm octet with one 
    122    # of the values described in section 9.1 that could be used to determine the 
    123    # capabilities. However, in case of RSA subkeys this field doesn't seem to 
    124    # correctly encode the capabilities. It always has the value 1, i.e. 
    125    # RSA (Encrypt or Sign). 
    126    # For RSA public keys we would have to parse the subkey's signature created 
    127    # with the master key, for the signature's key flags subpacket, identified 
    128    # by the value 27 (see section 5.2.3.1.) containing a list of binary flags 
    129    # as described in section 5.2.3.21. 
    130    if algorithm not in SUPPORTED_SIGNATURE_ALGORITHMS: 
    131        raise SignatureAlgorithmNotSupportedError( 
    132            f"Signature algorithm '{algorithm}' not " 
    133            "supported, please verify that your gpg configuration is creating " 
    134            "either DSA, RSA, or EdDSA signatures (see RFC4880 9.1. Public-Key " 
    135            "Algorithms)." 
    136        ) 
    137 
    138    keyinfo["type"] = SUPPORTED_SIGNATURE_ALGORITHMS[algorithm]["type"] 
    139    keyinfo["method"] = SUPPORTED_SIGNATURE_ALGORITHMS[algorithm]["method"] 
    140    handler = SIGNATURE_HANDLERS[keyinfo["type"]] 
    141    keyinfo["keyid"] = gpg_util.compute_keyid(data) 
    142    key_params = handler.get_pubkey_params(data[ptr:]) 
    143 
    144    return { 
    145        "method": keyinfo["method"], 
    146        "type": keyinfo["type"], 
    147        "hashes": [GPG_HASH_ALGORITHM_STRING], 
    148        "creation_time": time_of_creation[0], 
    149        "keyid": keyinfo["keyid"], 
    150        "keyval": {"private": "", "public": key_params}, 
    151    } 
    152 
    153 
    154def parse_pubkey_bundle(data): 
    155    """ 
    156    <Purpose> 
    157      Parse packets from passed gpg public key data, associating self-signatures 
    158      with the packets they correspond to, based on the structure of V4 keys 
    159      defined in RFC4880 12.1 Key Structures. 
    160 
    161      The returned raw key bundle may be used to further enrich the master key, 
    162      with certified information (e.g. key expiration date) taken from 
    163      self-signatures, and/or to verify that the parsed subkeys are bound to the 
    164      primary key via signatures. 
    165 
    166    <Arguments> 
    167      data: 
    168            Public key data as written to stdout by gpg_export_pubkey_command. 
    169 
    170    <Exceptions> 
    171      securesystemslib._gpg.exceptions.PacketParsingError 
    172            If data is empty. 
    173            If data cannot be parsed. 
    174 
    175    <Side Effects> 
    176      None. 
    177 
    178    <Returns> 
    179      A raw public key bundle where self-signatures are associated with their 
    180      corresponding packets. See `key_bundle` for details. 
    181 
    182    """ 
    183    if not data: 
    184        raise PacketParsingError("Cannot parse keys from empty gpg data.") 
    185 
    186    # Temporary data structure to hold parsed gpg packets 
    187    key_bundle = { 
    188        PACKET_TYPE_PRIMARY_KEY: {"key": {}, "packet": None, "signatures": []}, 
    189        PACKET_TYPE_USER_ID: collections.OrderedDict(), 
    190        PACKET_TYPE_USER_ATTR: collections.OrderedDict(), 
    191        PACKET_TYPE_SUB_KEY: collections.OrderedDict(), 
    192    } 
    193 
    194    # Iterate over gpg data and parse out packets of different types 
    195    position = 0 
    196    while position < len(data): 
    197        try: 
    198            ( 
    199                packet_type, 
    200                header_len, 
    201                body_len, 
    202                packet_length, 
    203            ) = gpg_util.parse_packet_header(data[position:]) 
    204 
    205            packet = data[position : position + packet_length] 
    206            payload = packet[header_len:] 
    207            # The first (and only the first) packet in the bundle must be the master 
    208            # key.  See RFC4880 12.1 Key Structures, V4 version keys 
    209            # TODO: Do we need additional key structure assertions? e.g. 
    210            # - there must be least one User ID packet, or 
    211            # - order and type of signatures, or 
    212            # - disallow duplicate packets 
    213            if ( 
    214                packet_type != PACKET_TYPE_PRIMARY_KEY 
    215                and not key_bundle[PACKET_TYPE_PRIMARY_KEY]["key"] 
    216            ): 
    217                raise PacketParsingError( 
    218                    "First packet must be a primary key " 
    219                    f"('{PACKET_TYPE_PRIMARY_KEY}'), got '{packet_type}'." 
    220                ) 
    221 
    222            if ( 
    223                packet_type == PACKET_TYPE_PRIMARY_KEY 
    224                and key_bundle[PACKET_TYPE_PRIMARY_KEY]["key"] 
    225            ): 
    226                raise PacketParsingError("Unexpected primary key.") 
    227 
    228            # Fully parse master key to fail early, e.g. if key is malformed 
    229            # or not supported, but also retain original packet for subkey binding 
    230            # signature verification 
    231            if packet_type == PACKET_TYPE_PRIMARY_KEY: 
    232                key_bundle[PACKET_TYPE_PRIMARY_KEY] = { 
    233                    "key": parse_pubkey_payload(bytearray(payload)), 
    234                    "packet": packet, 
    235                    "signatures": [], 
    236                } 
    237 
    238            # Other non-signature packets in the key bundle include User IDs and User 
    239            # Attributes, required to verify primary key certificates, and subkey 
    240            # packets. For each packet we create a new ordered dictionary entry. We 
    241            # use a dictionary to aggregate signatures by packet below, 
    242            # and it must be ordered because each signature packet belongs to the 
    243            # most recently parsed packet of a type. 
    244            elif packet_type in { 
    245                PACKET_TYPE_USER_ID, 
    246                PACKET_TYPE_USER_ATTR, 
    247                PACKET_TYPE_SUB_KEY, 
    248            }: 
    249                key_bundle[packet_type][packet] = { 
    250                    "header_len": header_len, 
    251                    "body_len": body_len, 
    252                    "signatures": [], 
    253                } 
    254 
    255            # The remaining relevant packets are signatures, required to bind subkeys 
    256            # to the primary key, or to gather additional information about the 
    257            # primary key, e.g. expiration date. 
    258            # A signature corresponds to the most recently parsed packet of a type, 
    259            # where the type is given by the availability of respective packets. 
    260            # We test availability and assign accordingly as per the order of packet 
    261            # types defined in RFC4880 12.1 (bottom-up). 
    262            elif packet_type == PACKET_TYPE_SIGNATURE: 
    263                for _type in [ 
    264                    PACKET_TYPE_SUB_KEY, 
    265                    PACKET_TYPE_USER_ATTR, 
    266                    PACKET_TYPE_USER_ID, 
    267                ]: 
    268                    if key_bundle[_type]: 
    269                        # Add to most recently added packet's 
    270                        # signatures of matching type 
    271                        key_bundle[_type][next(reversed(key_bundle[_type]))][ 
    272                            "signatures" 
    273                        ].append(packet) 
    274                        break 
    275 
    276                else: 
    277                    # If no packets are available for any of above types (yet), the 
    278                    # signature belongs to the primary key 
    279                    key_bundle[PACKET_TYPE_PRIMARY_KEY]["signatures"].append(packet) 
    280 
    281            else: 
    282                packets_list = [ 
    283                    PACKET_TYPE_PRIMARY_KEY, 
    284                    PACKET_TYPE_USER_ID, 
    285                    PACKET_TYPE_USER_ATTR, 
    286                    PACKET_TYPE_SUB_KEY, 
    287                    PACKET_TYPE_SIGNATURE, 
    288                ] 
    289                log.info( 
    290                    f"Ignoring gpg key packet '{packet_type}', " 
    291                    "we only handle packets of " 
    292                    f"types '{packets_list}' (see RFC4880 4.3. Packet Tags)." 
    293                ) 
    294 
    295        # Both errors might be raised in parse_packet_header and in this loop 
    296        except (PacketParsingError, IndexError) as e: 
    297            raise PacketParsingError( 
    298                f"Invalid public key data at position {position}: {e}." 
    299            ) 
    300 
    301        # Go to next packet 
    302        position += packet_length 
    303 
    304    return key_bundle 
    305 
    306 
    307def _assign_certified_key_info(bundle): 
    308    """ 
    309    <Purpose> 
    310      Helper function to verify User ID certificates corresponding to a gpg 
    311      master key, in order to enrich the master key with additional information 
    312      (e.g. expiration dates). The enriched master key is returned. 
    313 
    314      NOTE: Currently we only consider User ID certificates. We can do the same 
    315      for User Attribute certificates by iterating over 
    316      bundle[PACKET_TYPE_USER_ATTR] instead of bundle[PACKET_TYPE_USER_ID], and 
    317      replacing the signed_content constant '\xb4'  with '\xd1' (see RFC4880 
    318      section 5.2.4. paragraph 4). 
    319 
    320    <Arguments> 
    321      bundle: 
    322            GPG key bundle as parsed in parse_pubkey_bundle(). 
    323 
    324    <Exceptions> 
    325      None. 
    326 
    327    <Side Effects> 
    328      None. 
    329 
    330    <Returns> 
    331      A public key dict. 
    332 
    333    """ 
    334    # Create handler shortcut 
    335    handler = SIGNATURE_HANDLERS[bundle[PACKET_TYPE_PRIMARY_KEY]["key"]["type"]] 
    336 
    337    is_primary_user = False 
    338    validity_period = None 
    339    sig_creation_time = None 
    340 
    341    # Verify User ID signatures to gather information about primary key 
    342    # (see Notes about certification signatures in RFC 4880 5.2.3.3.) 
    343    for user_id_packet, packet_data in bundle[PACKET_TYPE_USER_ID].items(): 
    344        # Construct signed content (see RFC4880 section 5.2.4. paragraph 4) 
    345        signed_content = ( 
    346            bundle[PACKET_TYPE_PRIMARY_KEY]["packet"] 
    347            + b"\xb4\x00\x00\x00" 
    348            + user_id_packet[1:] 
    349        ) 
    350        for signature_packet in packet_data["signatures"]: 
    351            try: 
    352                signature = parse_signature_packet( 
    353                    signature_packet, 
    354                    supported_hash_algorithms={SHA1, SHA256, SHA512}, 
    355                    supported_signature_types=SIGNATURE_TYPE_CERTIFICATES, 
    356                    include_info=True, 
    357                ) 
    358                # verify_signature requires a "keyid" even if it is short. 
    359                # (see parse_signature_packet for more information about keyids) 
    360                signature["keyid"] = signature["keyid"] or signature["short_keyid"] 
    361 
    362            # TODO: Revise exception taxonomy: 
    363            # It's okay to ignore some exceptions (unsupported algorithms etc.) but 
    364            # we should blow up if a signature is malformed (missing subpackets). 
    365            except Exception as e: 
    366                log.info(e) 
    367                continue 
    368 
    369            if not bundle[PACKET_TYPE_PRIMARY_KEY]["key"]["keyid"].endswith( 
    370                signature["keyid"] 
    371            ): 
    372                log.info( 
    373                    "Ignoring User ID certificate issued by '{}'.".format( 
    374                        signature["keyid"] 
    375                    ) 
    376                ) 
    377                continue 
    378 
    379            is_valid = handler.verify_signature( 
    380                signature, 
    381                bundle[PACKET_TYPE_PRIMARY_KEY]["key"], 
    382                signed_content, 
    383                signature["info"]["hash_algorithm"], 
    384            ) 
    385 
    386            if not is_valid: 
    387                log.info( 
    388                    "Ignoring invalid User ID self-certificate issued by '{}'.".format( 
    389                        signature["keyid"] 
    390                    ) 
    391                ) 
    392                continue 
    393 
    394            # If the signature is valid, we try to extract subpackets relevant to 
    395            # the primary key, i.e. expiration time. 
    396            # NOTE: There might be multiple User IDs per primary key and multiple 
    397            # certificates per User ID. RFC4880 5.2.3.19. and last paragraph of 
    398            # 5.2.3.3. provides some suggestions about ambiguity, but delegates the 
    399            # responsibility to the implementer. 
    400 
    401            # Ambiguity resolution scheme: 
    402            # We take the key expiration time from the most recent certificate, i.e. 
    403            # the certificate with the highest signature creation time. Additionally, 
    404            # we prioritize certificates with primary user id flag set True. Note 
    405            # that, if the ultimately prioritized certificate does not have a key 
    406            # expiration time subpacket, we don't assign one, even if there were 
    407            # certificates of lower priority carrying that subpacket. 
    408            tmp_validity_period = signature["info"]["subpackets"].get( 
    409                KEY_EXPIRATION_SUBPACKET 
    410            ) 
    411 
    412            # No key expiration time, go to next certificate 
    413            if tmp_validity_period is None: 
    414                continue 
    415 
    416            # Create shortcut to mandatory pre-parsed creation time subpacket 
    417            tmp_sig_creation_time = signature["info"]["creation_time"] 
    418 
    419            tmp_is_primary_user = signature["info"]["subpackets"].get( 
    420                PRIMARY_USERID_SUBPACKET 
    421            ) 
    422 
    423            if tmp_is_primary_user is not None: 
    424                tmp_is_primary_user = bool(tmp_is_primary_user[0]) 
    425 
    426            # If we already have a primary user certified expiration date and this 
    427            # is none, we don't consider it, and go to next certificate 
    428            if is_primary_user and not tmp_is_primary_user: 
    429                continue 
    430 
    431            if not sig_creation_time or sig_creation_time < tmp_sig_creation_time: 
    432                # This is the most recent certificate that has a validity_period and 
    433                # doesn't have lower priority in regard to the primary user id flag. We 
    434                # accept it the keys validty_period, until we get a newer value from 
    435                # a certificate with higher priority. 
    436                validity_period = struct.unpack(">I", tmp_validity_period)[0] 
    437                # We also keep track of the used certificate's primary user id flag and 
    438                # the signature creation time, for prioritization. 
    439                is_primary_user = tmp_is_primary_user 
    440                sig_creation_time = tmp_sig_creation_time 
    441 
    442    if validity_period is not None: 
    443        bundle[PACKET_TYPE_PRIMARY_KEY]["key"]["validity_period"] = validity_period 
    444 
    445    return bundle[PACKET_TYPE_PRIMARY_KEY]["key"] 
    446 
    447 
    448def _get_verified_subkeys(bundle): 
    449    """ 
    450    <Purpose> 
    451      Helper function to verify the subkey binding signature for all subkeys in 
    452      the passed bundle in order to enrich subkeys with additional information 
    453      (e.g. expiration dates). Only valid (i.e. parsable) subkeys that are 
    454      verifiably bound to the the master key of the bundle are returned. All 
    455      other subkeys are discarded. 
    456 
    457    <Arguments> 
    458      bundle: 
    459            GPG key bundle as parsed in parse_pubkey_bundle(). 
    460 
    461    <Exceptions> 
    462      None. 
    463 
    464    <Side Effects> 
    465      None. 
    466 
    467    <Returns> 
    468      A dict of public keys dicts with keyids as dict keys. 
    469 
    470    """ 
    471    # Create handler shortcut 
    472    handler = SIGNATURE_HANDLERS[bundle[PACKET_TYPE_PRIMARY_KEY]["key"]["type"]] 
    473 
    474    # Verify subkey binding signatures and only keep verified keys 
    475    # See notes about subkey binding signature in RFC4880 5.2.3.3 
    476    verified_subkeys = {} 
    477    for subkey_packet, packet_data in bundle[PACKET_TYPE_SUB_KEY].items(): 
    478        try: 
    479            # Parse subkey if possible and skip if invalid (e.g. not-supported) 
    480            subkey = parse_pubkey_payload( 
    481                bytearray(subkey_packet[-packet_data["body_len"] :]) 
    482            ) 
    483 
    484        # TODO: Revise exception taxonomy 
    485        except Exception as e: 
    486            log.info(e) 
    487            continue 
    488 
    489        # Construct signed content (see RFC4880 section 5.2.4. paragraph 3) 
    490        signed_content = ( 
    491            bundle[PACKET_TYPE_PRIMARY_KEY]["packet"] + b"\x99" + subkey_packet[1:] 
    492        ) 
    493 
    494        # Filter sub key binding signature from other signatures, e.g. subkey 
    495        # binding revocation signatures 
    496        key_binding_signatures = [] 
    497        for signature_packet in packet_data["signatures"]: 
    498            try: 
    499                signature = parse_signature_packet( 
    500                    signature_packet, 
    501                    supported_hash_algorithms={SHA1, SHA256, SHA512}, 
    502                    supported_signature_types={SIGNATURE_TYPE_SUB_KEY_BINDING}, 
    503                    include_info=True, 
    504                ) 
    505                # verify_signature requires a "keyid" even if it is short. 
    506                # (see parse_signature_packet for more information about keyids) 
    507                signature["keyid"] = signature["keyid"] or signature["short_keyid"] 
    508                key_binding_signatures.append(signature) 
    509 
    510            # TODO: Revise exception taxonomy 
    511            except Exception as e: 
    512                log.info(e) 
    513                continue 
    514        # NOTE: As per the V4 key structure diagram in RFC4880 section 12.1., a 
    515        # subkey must be followed by exactly one Primary-Key-Binding-Signature. 
    516        # Based on inspection of real-world keys and other parts of the RFC (e.g. 
    517        # the paragraph below the diagram and paragraph 0x18: Subkey Binding 
    518        # Signature in section 5.2.1.) the mandated signature is actually a 
    519        # *subkey binding signature*, which in case of a signing subkey, must have 
    520        # an *embedded primary key binding signature*. 
    521        if len(key_binding_signatures) != 1: 
    522            log.info( 
    523                "Ignoring subkey '{}' due to wrong amount of key binding " 
    524                "signatures ({}), must be exactly 1.".format( 
    525                    subkey["keyid"], len(key_binding_signatures) 
    526                ) 
    527            ) 
    528            continue 
    529        is_valid = handler.verify_signature( 
    530            signature, 
    531            bundle[PACKET_TYPE_PRIMARY_KEY]["key"], 
    532            signed_content, 
    533            signature["info"]["hash_algorithm"], 
    534        ) 
    535 
    536        if not is_valid: 
    537            log.info( 
    538                "Ignoring subkey '{}' due to invalid key binding signature.".format( 
    539                    subkey["keyid"] 
    540                ) 
    541            ) 
    542            continue 
    543 
    544        # If the signature is valid, we may also extract relevant information from 
    545        # its "info" field (e.g. subkey expiration date) and assign to it to the 
    546        # subkey here 
    547        validity_period = signature["info"]["subpackets"].get(KEY_EXPIRATION_SUBPACKET) 
    548        if validity_period is not None: 
    549            subkey["validity_period"] = struct.unpack(">I", validity_period)[0] 
    550 
    551        verified_subkeys[subkey["keyid"]] = subkey 
    552 
    553    return verified_subkeys 
    554 
    555 
    556def get_pubkey_bundle(data, keyid): 
    557    """ 
    558    <Purpose> 
    559      Call function to extract and verify master key and subkeys from the passed 
    560      gpg key data, where either the master key or one of the subkeys matches the 
    561      passed keyid. 
    562 
    563      NOTE: 
    564      - If the keyid matches one of the subkeys, a warning is issued to notify 
    565        the user about potential privilege escalation 
    566      - Subkeys with invalid key binding signatures are discarded 
    567 
    568    <Arguments> 
    569      data: 
    570            Public key data as written to stdout by 
    571            securesystemslib._gpg.constants.gpg_export_pubkey_command. 
    572 
    573      keyid: 
    574            The keyid of the master key or one of its subkeys expected to be 
    575            contained in the passed gpg data. 
    576 
    577    <Exceptions> 
    578      securesystemslib._gpg.exceptions.PacketParsingError 
    579            If the key data could not be parsed 
    580 
    581      securesystemslib._gpg.exceptions.KeyNotFoundError 
    582            If the passed data is empty. 
    583            If no master key or subkeys could be found that matches the passed 
    584            keyid. 
    585 
    586 
    587    <Side Effects> 
    588      None. 
    589 
    590    <Returns> 
    591      A public key dict with optional subkeys. 
    592 
    593    """ 
    594    if not data: 
    595        raise KeyNotFoundError( 
    596            f"Could not find gpg key '{keyid}' in empty exported key data." 
    597        ) 
    598 
    599    # Parse out master key and subkeys (enriched and verified via certificates 
    600    # and binding signatures) 
    601    raw_key_bundle = parse_pubkey_bundle(data) 
    602    master_public_key = _assign_certified_key_info(raw_key_bundle) 
    603    sub_public_keys = _get_verified_subkeys(raw_key_bundle) 
    604 
    605    # Since GPG returns all pubkeys associated with a keyid (master key and 
    606    # subkeys) we check which key matches the passed keyid. 
    607    # If the matching key is a subkey, we warn the user because we return 
    608    # the whole bundle (master plus all subkeys) and not only the subkey. 
    609    # If no matching key is found we raise a KeyNotFoundError. 
    610    for idx, public_key in enumerate( 
    611        [master_public_key] + list(sub_public_keys.values()) 
    612    ): 
    613        if public_key and public_key["keyid"].endswith(keyid.lower()): 
    614            if idx > 1: 
    615                log.debug( 
    616                    "Exporting master key '{}' including subkeys '{}' for" 
    617                    " passed keyid '{}'.".format( 
    618                        master_public_key["keyid"], 
    619                        ", ".join(list(sub_public_keys.keys())), 
    620                        keyid, 
    621                    ) 
    622                ) 
    623            break 
    624 
    625    else: 
    626        raise KeyNotFoundError( 
    627            f"Could not find gpg key '{keyid}' in exported key data." 
    628        ) 
    629 
    630    # Add subkeys dictionary to master pubkey "subkeys" field if subkeys exist 
    631    if sub_public_keys: 
    632        master_public_key["subkeys"] = sub_public_keys 
    633 
    634    return master_public_key 
    635 
    636 
    637# ruff: noqa: PLR0912, PLR0915 
    638def parse_signature_packet( 
    639    data, 
    640    supported_signature_types=None, 
    641    supported_hash_algorithms=None, 
    642    include_info=False, 
    643): 
    644    """ 
    645    <Purpose> 
    646      Parse the signature information on an RFC4880-encoded binary signature data 
    647      buffer. 
    648 
    649      NOTE: Older gpg versions (< FULLY_SUPPORTED_MIN_VERSION) might only 
    650      reveal the partial key id. It is the callers responsibility to determine 
    651      the full keyid based on the partial keyid, e.g. by exporting the related 
    652      public and replacing the partial keyid with the full keyid. 
    653 
    654    <Arguments> 
    655      data: 
    656             the RFC4880-encoded binary signature data buffer as described in 
    657             section 5.2 (and 5.2.3.1). 
    658      supported_signature_types: (optional) 
    659            a set of supported signature_types, the signature packet may be 
    660            (see securesystemslib._gpg.constants for available types). If None is 
    661            specified the signature packet must be of type SIGNATURE_TYPE_BINARY. 
    662      supported_hash_algorithms: (optional) 
    663            a set of supported hash algorithm ids, the signature packet 
    664            may use. Available ids are SHA1, SHA256, SHA512 (see 
    665            securesystemslib._gpg.constants). If None is specified, the signature 
    666            packet must use SHA256. 
    667      include_info: (optional) 
    668            a boolean that indicates whether an opaque dictionary should be 
    669            added to the returned signature under the key "info". Default is 
    670            False. 
    671 
    672    <Exceptions> 
    673      ValueError: if the signature packet is not supported or the data is 
    674        malformed 
    675      IndexError: if the signature packet is incomplete 
    676 
    677    <Side Effects> 
    678      None. 
    679 
    680    <Returns> 
    681      A signature dict with the following special characteristics: 
    682       - The "keyid" field is an empty string if it cannot be determined 
    683       - The "short_keyid" is not added if it cannot be determined 
    684       - At least one of non-empty "keyid" or "short_keyid" are part of the 
    685         signature 
    686 
    687    """ 
    688    if not supported_signature_types: 
    689        supported_signature_types = {SIGNATURE_TYPE_BINARY} 
    690 
    691    if not supported_hash_algorithms: 
    692        supported_hash_algorithms = {SHA256} 
    693 
    694    _, header_len, _, packet_len = gpg_util.parse_packet_header( 
    695        data, PACKET_TYPE_SIGNATURE 
    696    ) 
    697 
    698    data = bytearray(data[header_len:packet_len]) 
    699 
    700    ptr = 0 
    701 
    702    # we get the version number, which we also expect to be v4, or we bail 
    703    # FIXME: support v3 type signatures (which I haven't seen in the wild) 
    704    version_number = data[ptr] 
    705    ptr += 1 
    706    if version_number not in SUPPORTED_SIGNATURE_PACKET_VERSIONS: 
    707        raise ValueError( 
    708            f"Signature version '{version_number}' not supported, " 
    709            f"must be one of {SUPPORTED_SIGNATURE_PACKET_VERSIONS}." 
    710        ) 
    711 
    712    # Per default we only parse "signatures of a binary document". Other types 
    713    # may be allowed by passing type constants via `supported_signature_types`. 
    714    # Types include revocation signatures, key binding signatures, persona 
    715    # certifications, etc. (see RFC 4880 section 5.2.1.). 
    716    signature_type = data[ptr] 
    717    ptr += 1 
    718 
    719    if signature_type not in supported_signature_types: 
    720        raise ValueError( 
    721            f"Signature type '{signature_type}' not supported, " 
    722            f"must be one of {supported_signature_types} " 
    723            "(see RFC4880 5.2.1. Signature Types)." 
    724        ) 
    725 
    726    signature_algorithm = data[ptr] 
    727    ptr += 1 
    728 
    729    if signature_algorithm not in SUPPORTED_SIGNATURE_ALGORITHMS: 
    730        raise ValueError( 
    731            f"Signature algorithm '{signature_algorithm}' not " 
    732            "supported, please verify that your gpg configuration is creating " 
    733            "either DSA, RSA, or EdDSA signatures (see RFC4880 9.1. Public-Key " 
    734            "Algorithms)." 
    735        ) 
    736 
    737    key_type = SUPPORTED_SIGNATURE_ALGORITHMS[signature_algorithm]["type"] 
    738    handler = SIGNATURE_HANDLERS[key_type] 
    739 
    740    hash_algorithm = data[ptr] 
    741    ptr += 1 
    742 
    743    if hash_algorithm not in supported_hash_algorithms: 
    744        raise ValueError( 
    745            f"Hash algorithm '{hash_algorithm}' not supported, " 
    746            f"must be one of {supported_hash_algorithms} " 
    747            "(see RFC4880 9.4. Hash Algorithms)." 
    748        ) 
    749 
    750    # Obtain the hashed octets 
    751    hashed_octet_count = struct.unpack(">H", data[ptr : ptr + 2])[0] 
    752    ptr += 2 
    753    hashed_subpackets = data[ptr : ptr + hashed_octet_count] 
    754    hashed_subpacket_info = gpg_util.parse_subpackets(hashed_subpackets) 
    755 
    756    if len(hashed_subpackets) != hashed_octet_count:  # pragma: no cover 
    757        raise ValueError( 
    758            "Signature packet contains an unexpected amount of hashed octets" 
    759        ) 
    760 
    761    ptr += hashed_octet_count 
    762    other_headers_ptr = ptr 
    763 
    764    unhashed_octet_count = struct.unpack(">H", data[ptr : ptr + 2])[0] 
    765    ptr += 2 
    766 
    767    unhashed_subpackets = data[ptr : ptr + unhashed_octet_count] 
    768    unhashed_subpacket_info = gpg_util.parse_subpackets(unhashed_subpackets) 
    769 
    770    ptr += unhashed_octet_count 
    771 
    772    # Use the info dict to return further signature information that may be 
    773    # needed for intermediate processing, but does not have to be on the eventual 
    774    # signature datastructure 
    775    info = { 
    776        "signature_type": signature_type, 
    777        "hash_algorithm": hash_algorithm, 
    778        "creation_time": None, 
    779        "subpackets": {}, 
    780    } 
    781 
    782    keyid = "" 
    783    short_keyid = "" 
    784 
    785    # Parse "Issuer" (short keyid) and "Issuer Fingerprint" (full keyid) type 
    786    # subpackets 
    787    # Strategy: Loop over all unhashed and hashed subpackets (in that order!) and 
    788    # store only the last of a type. Due to the order in the loop, hashed 
    789    # subpackets are prioritized over unhashed subpackets (see NOTEs below). 
    790 
    791    # NOTE: A subpacket may be found either in the hashed or unhashed subpacket 
    792    # sections of a signature. If a subpacket is not hashed, then the information 
    793    # in it cannot be considered definitive because it is not part of the 
    794    # signature proper. (see RFC4880 5.2.3.2.) 
    795    # NOTE: Signatures may contain conflicting information in subpackets. In most 
    796    # cases, an implementation SHOULD use the last subpacket, but MAY use any 
    797    # conflict resolution scheme that makes more sense. (see RFC4880 5.2.4.1.) 
    798    for idx, subpacket_tuple in enumerate( 
    799        unhashed_subpacket_info + hashed_subpacket_info 
    800    ): 
    801        # The idx indicates if the info is from the unhashed (first) or 
    802        # hashed (second) of the above concatenated lists 
    803        is_hashed = idx >= len(unhashed_subpacket_info) 
    804        subpacket_type, subpacket_data = subpacket_tuple 
    805 
    806        # Warn if expiration subpacket is not hashed 
    807        if subpacket_type == KEY_EXPIRATION_SUBPACKET: 
    808            if not is_hashed: 
    809                log.warning( 
    810                    "Expiration subpacket not hashed, gpg client possibly " 
    811                    "exporting a weakly configured key." 
    812                ) 
    813 
    814        # Full keyids are only available in newer signatures 
    815        # (see RFC4880 and rfc4880bis-06 5.2.3.1.) 
    816        if subpacket_type == FULL_KEYID_SUBPACKET:  # pragma: no cover 
    817            # Exclude from coverage for consistent results across test envs 
    818            # NOTE: The first byte of the subpacket payload is a version number 
    819            # (see rfc4880bis-06 5.2.3.28.) 
    820            keyid = binascii.hexlify(subpacket_data[1:]).decode("ascii") 
    821 
    822        # We also return the short keyid, because the full might not be available 
    823        if subpacket_type == PARTIAL_KEYID_SUBPACKET: 
    824            short_keyid = binascii.hexlify(subpacket_data).decode("ascii") 
    825 
    826        if subpacket_type == SIG_CREATION_SUBPACKET: 
    827            info["creation_time"] = struct.unpack(">I", subpacket_data)[0] 
    828 
    829        info["subpackets"][subpacket_type] = subpacket_data 
    830 
    831    # Fail if there is no keyid at all (this should not happen) 
    832    if not (keyid or short_keyid):  # pragma: no cover 
    833        raise ValueError( 
    834            "This signature packet seems to be corrupted. It does " 
    835            "not have an 'Issuer' or 'Issuer Fingerprint' subpacket (see RFC4880 " 
    836            "and rfc4880bis-06 5.2.3.1. Signature Subpacket Specification)." 
    837        ) 
    838 
    839    # Fail if keyid and short keyid are specified but don't match 
    840    if keyid and not keyid.endswith(short_keyid):  # pragma: no cover 
    841        raise ValueError( 
    842            "This signature packet seems to be corrupted. The key ID " 
    843            f"'{short_keyid}' of the 'Issuer' subpacket must match the " 
    844            f"lower 64 bits of the fingerprint '{keyid}' of the 'Issuer " 
    845            "Fingerprint' subpacket (see RFC4880 and rfc4880bis-06 5.2.3.28. " 
    846            "Issuer Fingerprint)." 
    847        ) 
    848 
    849    if not info["creation_time"]:  # pragma: no cover 
    850        raise ValueError( 
    851            "This signature packet seems to be corrupted. It does " 
    852            "not have a 'Signature Creation Time' subpacket (see RFC4880 5.2.3.4 " 
    853            "Signature Creation Time)." 
    854        ) 
    855 
    856    # Uncomment this variable to obtain the left-hash-bits information (used for 
    857    # early rejection) 
    858    # left_hash_bits = struct.unpack(">H", data[ptr:ptr+2])[0] 
    859    ptr += 2 
    860 
    861    # Finally, fetch the actual signature (as opposed to signature metadata). 
    862    signature = handler.get_signature_params(data[ptr:]) 
    863 
    864    signature_data = { 
    865        "keyid": keyid, 
    866        "other_headers": binascii.hexlify(data[:other_headers_ptr]).decode("ascii"), 
    867        "signature": binascii.hexlify(signature).decode("ascii"), 
    868    } 
    869 
    870    if short_keyid:  # pragma: no branch 
    871        signature_data["short_keyid"] = short_keyid 
    872 
    873    if include_info: 
    874        signature_data["info"] = info 
    875 
    876    return signature_data