1""" 
    2<Module Name> 
    3  dsa.py 
    4 
    5<Author> 
    6  Santiago Torres-Arias <santiago@nyu.edu> 
    7 
    8<Started> 
    9  Nov 15, 2017 
    10 
    11<Copyright> 
    12  See LICENSE for licensing information. 
    13 
    14<Purpose> 
    15  DSA-specific handling routines for signature verification and key parsing 
    16""" 
    17 
    18import binascii 
    19 
# Tracks whether the optional pyca/cryptography dependency could be imported.
# The functions below that need the library check this flag before using it.
CRYPTO = True
# Message used when cryptography is required but not available.
NO_CRYPTO_MSG = "DSA key support for GPG requires the cryptography library"
try:
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat import backends
    from cryptography.hazmat.primitives.asymmetric import dsa
    from cryptography.hazmat.primitives.asymmetric import utils as dsautils
except ImportError:
    # Keep this module importable without cryptography; just record that the
    # library is missing.
    CRYPTO = False
    29 
    30# ruff: noqa: E402 
    31from securesystemslib import exceptions 
    32from securesystemslib._gpg import util as gpg_util 
    33from securesystemslib._gpg.exceptions import PacketParsingError 
    34 
    35 
    36def create_pubkey(pubkey_info): 
    37    """ 
    38    <Purpose> 
    39      Create and return a DSAPublicKey object from the passed pubkey_info 
    40      using pyca/cryptography. 
    41 
    42    <Arguments> 
    43      pubkey_info: 
    44              The DSA pubkey dict. 
    45 
    46    <Exceptions> 
    47      securesystemslib.exceptions.UnsupportedLibraryError if 
    48        the cryptography module is not available 
    49 
    50    <Returns> 
    51      A cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey based on the 
    52      passed pubkey_info. 
    53 
    54    """ 
    55    if not CRYPTO:  # pragma: no cover 
    56        raise exceptions.UnsupportedLibraryError(NO_CRYPTO_MSG) 
    57 
    58    y = int(pubkey_info["keyval"]["public"]["y"], 16) 
    59    g = int(pubkey_info["keyval"]["public"]["g"], 16) 
    60    p = int(pubkey_info["keyval"]["public"]["p"], 16) 
    61    q = int(pubkey_info["keyval"]["public"]["q"], 16) 
    62    parameter_numbers = dsa.DSAParameterNumbers(p, q, g) 
    63    pubkey = dsa.DSAPublicNumbers(y, parameter_numbers).public_key( 
    64        backends.default_backend() 
    65    ) 
    66 
    67    return pubkey 
    68 
    69 
    70def get_pubkey_params(data): 
    71    """ 
    72    <Purpose> 
    73      Parse the public-key parameters as multi-precision-integers. 
    74 
    75    <Arguments> 
    76      data: 
    77             the RFC4880-encoded public key parameters data buffer as described 
    78             in the fifth paragraph of section 5.5.2. 
    79 
    80    <Exceptions> 
    81      securesystemslib._gpg.exceptions.PacketParsingError: 
    82             if the public key parameters are malformed 
    83 
    84    <Side Effects> 
    85      None. 
    86 
    87    <Returns> 
    88      A DSA public key dict. 
    89 
    90    """ 
    91    ptr = 0 
    92 
    93    prime_p_length = gpg_util.get_mpi_length(data[ptr : ptr + 2]) 
    94    ptr += 2 
    95    prime_p = data[ptr : ptr + prime_p_length] 
    96    if len(prime_p) != prime_p_length:  # pragma: no cover 
    97        raise PacketParsingError("This MPI was truncated!") 
    98    ptr += prime_p_length 
    99 
    100    group_order_q_length = gpg_util.get_mpi_length(data[ptr : ptr + 2]) 
    101    ptr += 2 
    102    group_order_q = data[ptr : ptr + group_order_q_length] 
    103    if len(group_order_q) != group_order_q_length:  # pragma: no cover 
    104        raise PacketParsingError("This MPI has been truncated!") 
    105    ptr += group_order_q_length 
    106 
    107    generator_length = gpg_util.get_mpi_length(data[ptr : ptr + 2]) 
    108    ptr += 2 
    109    generator = data[ptr : ptr + generator_length] 
    110    if len(generator) != generator_length:  # pragma: no cover 
    111        raise PacketParsingError("This MPI has been truncated!") 
    112    ptr += generator_length 
    113 
    114    value_y_length = gpg_util.get_mpi_length(data[ptr : ptr + 2]) 
    115    ptr += 2 
    116    value_y = data[ptr : ptr + value_y_length] 
    117    if len(value_y) != value_y_length:  # pragma: no cover 
    118        raise PacketParsingError("This MPI has been truncated!") 
    119 
    120    return { 
    121        "y": binascii.hexlify(value_y).decode("ascii"), 
    122        "p": binascii.hexlify(prime_p).decode("ascii"), 
    123        "g": binascii.hexlify(generator).decode("ascii"), 
    124        "q": binascii.hexlify(group_order_q).decode("ascii"), 
    125    } 
    126 
    127 
    128def get_signature_params(data): 
    129    """ 
    130    <Purpose> 
    131      Parse the signature parameters as multi-precision-integers. 
    132 
    133    <Arguments> 
    134      data: 
    135             the RFC4880-encoded signature data buffer as described 
    136             in the fourth paragraph of section 5.2.2 
    137 
    138    <Exceptions> 
    139      securesystemslib._gpg.exceptions.PacketParsingError: 
    140             if the public key parameters are malformed 
    141 
    142      securesystemslib.exceptions.UnsupportedLibraryError: 
    143             if the cryptography module is not available 
    144 
    145    <Side Effects> 
    146      None. 
    147 
    148    <Returns> 
    149      The decoded signature buffer 
    150    """ 
    151    if not CRYPTO:  # pragma: no cover 
    152        return exceptions.UnsupportedLibraryError(NO_CRYPTO_MSG) 
    153 
    154    ptr = 0 
    155    r_length = gpg_util.get_mpi_length(data[ptr : ptr + 2]) 
    156    ptr += 2 
    157    r = data[ptr : ptr + r_length] 
    158    if len(r) != r_length:  # pragma: no cover 
    159        raise PacketParsingError("r-value truncated in signature") 
    160    ptr += r_length 
    161 
    162    s_length = gpg_util.get_mpi_length(data[ptr : ptr + 2]) 
    163    ptr += 2 
    164    s = data[ptr : ptr + s_length] 
    165    if len(s) != s_length:  # pragma: no cover 
    166        raise PacketParsingError("s-value truncated in signature") 
    167 
    168    s = int(binascii.hexlify(s), 16) 
    169    r = int(binascii.hexlify(r), 16) 
    170 
    171    signature = dsautils.encode_dss_signature(r, s) 
    172 
    173    return signature 
    174 
    175 
    176def verify_signature(signature_object, pubkey_info, content, hash_algorithm_id): 
    177    """ 
    178    <Purpose> 
    179      Verify the passed signature against the passed content with the passed 
    180      DSA public key using pyca/cryptography. 
    181 
    182    <Arguments> 
    183      signature_object: 
    184              A signature dict. 
    185 
    186      pubkey_info: 
    187              The DSA public key dict. 
    188 
    189      hash_algorithm_id: 
    190              one of SHA1, SHA256, SHA512 (see securesystemslib._gpg.constants) 
    191              used to verify the signature 
    192              NOTE: Overrides any hash algorithm specification in "pubkey_info"'s 
    193              "hashes" or "method" fields. 
    194 
    195      content: 
    196              The signed bytes against which the signature is verified 
    197 
    198    <Exceptions> 
    199      securesystemslib.exceptions.UnsupportedLibraryError if: 
    200        the cryptography module is not available 
    201 
    202      ValueError: 
    203        if the passed hash_algorithm_id is not supported (see 
    204        securesystemslib._gpg.util.get_hashing_class) 
    205 
    206    <Returns> 
    207      True if signature verification passes and False otherwise 
    208 
    209    """ 
    210    if not CRYPTO:  # pragma: no cover 
    211        raise exceptions.UnsupportedLibraryError(NO_CRYPTO_MSG) 
    212 
    213    hasher = gpg_util.get_hashing_class(hash_algorithm_id) 
    214 
    215    pubkey_object = create_pubkey(pubkey_info) 
    216 
    217    digest = gpg_util.hash_object( 
    218        binascii.unhexlify(signature_object["other_headers"]), hasher(), content 
    219    ) 
    220 
    221    try: 
    222        pubkey_object.verify( 
    223            binascii.unhexlify(signature_object["signature"]), 
    224            digest, 
    225            dsautils.Prehashed(hasher()), 
    226        ) 
    227        return True 
    228    except InvalidSignature: 
    229        return False