1""" 
    2<Module Name> 
    3  util.py 
    4 
    5<Author> 
    6  Santiago Torres-Arias <santiago@nyu.edu> 
    7 
    8<Started> 
    9  Nov 15, 2017 
    10 
    11<Copyright> 
    12  See LICENSE for licensing information. 
    13 
    14<Purpose> 
    15  general-purpose utilities for binary data handling and pgp data parsing 
    16""" 
    17 
    18# ruff: noqa: PLR2004 
# (disables "Magic value used in comparison", like on line 150)
    20 
    21import binascii 
    22import logging 
    23import struct 
    24 
    25CRYPTO = True 
    26NO_CRYPTO_MSG = "gpg.utils requires the cryptography library" 
    27try: 
    28    from cryptography.hazmat import backends 
    29    from cryptography.hazmat.primitives import hashes as hashing 
    30except ImportError: 
    31    CRYPTO = False 
    32 
    33# ruff: noqa: E402 
    34from securesystemslib import exceptions 
    35from securesystemslib._gpg import constants 
    36from securesystemslib._gpg.exceptions import PacketParsingError 
    37 
    38log = logging.getLogger(__name__) 
    39 
    40 
    41def get_mpi_length(data): 
    42    """ 
    43    <Purpose> 
    44      parses an MPI (Multi-Precision Integer) buffer and returns the appropriate 
    45      length. This is mostly done to perform bitwise to byte-wise conversion. 
    46 
    47      See RFC4880 section 3.2. Multiprecision Integers for details. 
    48 
    49    <Arguments> 
    50      data: The MPI data 
    51 
    52    <Exceptions> 
    53      None 
    54 
    55    <Side Effects> 
    56      None 
    57 
    58    <Returns> 
    59      The length of the MPI contained at the beginning of this data buffer. 
    60    """ 
    61    bitlength = int(struct.unpack(">H", data)[0]) 
    62    # Notice the /8 at the end, this length is the bitlength, not the length of 
    63    # the data in bytes (as len reports it) 
    64    return int((bitlength - 1) / 8) + 1 
    65 
    66 
    67def hash_object(headers, algorithm, content): 
    68    """ 
    69    <Purpose> 
    70      Hash data prior to signature verification in conformance of the RFC4880 
    71      openPGP standard. 
    72 
    73    <Arguments> 
    74      headers: the additional OpenPGP headers as populated from 
    75      gpg_generate_signature 
    76 
    77      algorithm: The hash algorithm object defined by the cryptography.io hashes 
    78      module 
    79 
    80      content: the signed content 
    81 
    82    <Exceptions> 
    83      securesystemslib.exceptions.UnsupportedLibraryError if: 
    84        the cryptography module is unavailable 
    85 
    86    <Side Effects> 
    87      None 
    88 
    89    <Returns> 
    90      The RFC4880-compliant hashed buffer 
    91    """ 
    92    if not CRYPTO:  # pragma: no cover 
    93        raise exceptions.UnsupportedLibraryError(NO_CRYPTO_MSG) 
    94 
    95    # As per RFC4880 Section 5.2.4., we need to hash the content, 
    96    # signature headers and add a very opinionated trailing header 
    97    hasher = hashing.Hash(algorithm, backend=backends.default_backend()) 
    98    hasher.update(content) 
    99    hasher.update(headers) 
    100    hasher.update(b"\x04\xff") 
    101    hasher.update(struct.pack(">I", len(headers))) 
    102 
    103    return hasher.finalize() 
    104 
    105 
    106def parse_packet_header(data, expected_type=None):  # noqa: PLR0912 
    107    """ 
    108    <Purpose> 
    109      Parse out packet type and header and body lengths from an RFC4880 packet. 
    110 
    111    <Arguments> 
    112      data: 
    113              An RFC4880 packet as described in section 4.2 of the rfc. 
    114 
    115      expected_type: (optional) 
    116              Used to error out if the packet does not have the expected 
    117              type. See securesystemslib._gpg.constants.PACKET_TYPE_* for 
    118              available types. 
    119 
    120    <Exceptions> 
    121      securesystemslib._gpg.exceptions.PacketParsingError 
    122              If the new format packet length encodes a partial body length 
    123              If the old format packet length encodes an indeterminate length 
    124              If header or body length could not be determined 
    125              If the expected_type was passed and does not match the packet type 
    126 
    127      IndexError 
    128              If the passed data is incomplete 
    129 
    130    <Side Effects> 
    131      None. 
    132 
    133    <Returns> 
    134      A tuple of packet type, header length, body length and packet length. 
    135      (see  RFC4880 4.3. for the list of available packet types) 
    136 
    137    """ 
    138    data = bytearray(data) 
    139    header_len = None 
    140    body_len = None 
    141 
    142    # If Bit 6 of 1st octet is set we parse a New Format Packet Length, and 
    143    # an Old Format Packet Lengths otherwise 
    144    if data[0] & 0b01000000: 
    145        # In new format packet lengths the packet type is encoded in Bits 5-0 of 
    146        # the 1st octet of the packet 
    147        packet_type = data[0] & 0b00111111 
    148 
    149        # The rest of the packet header is the body length header, which may 
    150        # consist of one, two or five octets. To disambiguate the RFC, the first 
    151        # octet of the body length header is the second octet of the packet. 
    152        if data[1] < 192: 
    153            header_len = 2 
    154            body_len = data[1] 
    155 
    156        elif data[1] >= 192 and data[1] <= 223: 
    157            header_len = 3 
    158            body_len = (data[1] - 192 << 8) + data[2] + 192 
    159 
    160        elif data[1] >= 224 and data[1] < 255: 
    161            raise PacketParsingError( 
    162                "New length format packets of partial body lengths are not supported" 
    163            ) 
    164 
    165        elif data[1] == 255: 
    166            header_len = 6 
    167            body_len = data[2] << 24 | data[3] << 16 | data[4] << 8 | data[5] 
    168 
    169        else:  # pragma: no cover 
    170            # Unreachable: octet must be between 0 and 255 
    171            raise PacketParsingError("Invalid new length") 
    172 
    173    else: 
    174        # In old format packet lengths the packet type is encoded in Bits 5-2 of 
    175        # the 1st octet and the length type in Bits 1-0 
    176        packet_type = (data[0] & 0b00111100) >> 2 
    177        length_type = data[0] & 0b00000011 
    178 
    179        # The body length is encoded using one, two, or four octets, starting 
    180        # with the second octet of the packet 
    181        if length_type == 0: 
    182            body_len = data[1] 
    183            header_len = 2 
    184 
    185        elif length_type == 1: 
    186            header_len = 3 
    187            body_len = struct.unpack(">H", data[1:header_len])[0] 
    188 
    189        elif length_type == 2: 
    190            header_len = 5 
    191            body_len = struct.unpack(">I", data[1:header_len])[0] 
    192 
    193        elif length_type == 3: 
    194            raise PacketParsingError( 
    195                "Old length format packets of indeterminate length are not supported" 
    196            ) 
    197 
    198        else:  # pragma: no cover (unreachable) 
    199            # Unreachable: bits 1-0 must be one of 0 to 3 
    200            raise PacketParsingError("Invalid old length") 
    201 
    202    if header_len is None or body_len is None:  # pragma: no cover 
    203        # Unreachable: One of above must have assigned lengths or raised error 
    204        raise PacketParsingError("Could not determine packet length") 
    205 
    206    if expected_type is not None and packet_type != expected_type: 
    207        raise PacketParsingError( 
    208            f"Expected packet {expected_type}, but got {packet_type} instead!" 
    209        ) 
    210 
    211    return packet_type, header_len, body_len, header_len + body_len 
    212 
    213 
    214def compute_keyid(pubkey_packet_data): 
    215    """ 
    216    <Purpose> 
    217      compute a keyid from an RFC4880 public-key buffer 
    218 
    219    <Arguments> 
    220      pubkey_packet_data: the public-key packet buffer 
    221 
    222    <Exceptions> 
    223      securesystemslib.exceptions.UnsupportedLibraryError if: 
    224        the cryptography module is unavailable 
    225 
    226    <Side Effects> 
    227      None 
    228 
    229    <Returns> 
    230      The RFC4880-compliant hashed buffer 
    231    """ 
    232    if not CRYPTO:  # pragma: no cover 
    233        raise exceptions.UnsupportedLibraryError(NO_CRYPTO_MSG) 
    234 
    235    hasher = hashing.Hash( 
    236        hashing.SHA1(),  # noqa: S303 
    237        backend=backends.default_backend(), 
    238    ) 
    239    hasher.update(b"\x99") 
    240    hasher.update(struct.pack(">H", len(pubkey_packet_data))) 
    241    hasher.update(bytes(pubkey_packet_data)) 
    242    return binascii.hexlify(hasher.finalize()).decode("ascii") 
    243 
    244 
    245def parse_subpacket_header(data): 
    246    """Parse out subpacket header as per RFC4880 5.2.3.1. Signature Subpacket 
    247    Specification.""" 
    248    # NOTE: Although the RFC does not state it explicitly, the length encoded 
    249    # in the header must be greater equal 1, as it includes the mandatory 
    250    # subpacket type octet. 
    251    # Hence, passed bytearrays like [0] or [255, 0, 0, 0, 0], which encode a 
    252    # subpacket length 0  are invalid. 
    253    # The caller has to deal with the resulting IndexError. 
    254    if data[0] < 192: 
    255        length_len = 1 
    256        length = data[0] 
    257 
    258    elif data[0] >= 192 and data[0] < 255: 
    259        length_len = 2 
    260        length = (data[0] - 192 << 8) + (data[1] + 192) 
    261 
    262    elif data[0] == 255: 
    263        length_len = 5 
    264        length = struct.unpack(">I", data[1:length_len])[0] 
    265 
    266    else:  # pragma: no cover (unreachable) 
    267        raise PacketParsingError("Invalid subpacket header") 
    268 
    269    return data[length_len], length_len + 1, length - 1, length_len + length 
    270 
    271 
    272def parse_subpackets(data): 
    273    """ 
    274    <Purpose> 
    275      parse the subpackets fields 
    276 
    277    <Arguments> 
    278      data: the unparsed subpacketoctets 
    279 
    280    <Exceptions> 
    281      IndexErrorif the subpackets octets are incomplete or malformed 
    282 
    283    <Side Effects> 
    284      None 
    285 
    286    <Returns> 
    287      A list of tuples with like: 
    288          [ (packet_type, data), 
    289            (packet_type, data), 
    290            ... 
    291          ] 
    292    """ 
    293    parsed_subpackets = [] 
    294    position = 0 
    295 
    296    while position < len(data): 
    297        subpacket_type, header_len, _, subpacket_len = parse_subpacket_header( 
    298            data[position:] 
    299        ) 
    300 
    301        payload = data[position + header_len : position + subpacket_len] 
    302        parsed_subpackets.append((subpacket_type, payload)) 
    303 
    304        position += subpacket_len 
    305 
    306    return parsed_subpackets 
    307 
    308 
    309def get_hashing_class(hash_algorithm_id): 
    310    """ 
    311    <Purpose> 
    312      Return a pyca/cryptography hashing class reference for the passed RFC4880 
    313      hash algorithm ID. 
    314 
    315    <Arguments> 
    316      hash_algorithm_id: 
    317              one of SHA1, SHA256, SHA512 (see securesystemslib._gpg.constants) 
    318 
    319    <Exceptions> 
    320      ValueError 
    321              if the passed hash_algorithm_id is not supported. 
    322 
    323    <Returns> 
    324      A pyca/cryptography hashing class 
    325 
    326    """ 
    327    supported_hashing_algorithms = [ 
    328        constants.SHA1, 
    329        constants.SHA256, 
    330        constants.SHA512, 
    331    ] 
    332    corresponding_hashing_classes = [ 
    333        hashing.SHA1, 
    334        hashing.SHA256, 
    335        hashing.SHA512, 
    336    ] 
    337 
    338    # Map supported hash algorithm ids to corresponding hashing classes 
    339    hashing_class = dict( 
    340        zip(supported_hashing_algorithms, corresponding_hashing_classes) 
    341    ) 
    342 
    343    try: 
    344        return hashing_class[hash_algorithm_id] 
    345 
    346    except KeyError: 
    347        raise ValueError( 
    348            f"Hash algorithm '{hash_algorithm_id}' not supported, " 
    349            f"must be one of '{supported_hashing_algorithms}' " 
    350            "(see RFC4880 9.4. Hash Algorithms)." 
    351        )