1# Copyright 2018 Donald Stufft and individual contributors 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7# http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14from typing import Tuple 
    15 
    16from nacl import exceptions as exc 
    17from nacl._sodium import ffi, lib 
    18from nacl.exceptions import ensure 
    19 
    20__all__ = [ 
    21    "crypto_kx_keypair", 
    22    "crypto_kx_client_session_keys", 
    23    "crypto_kx_server_session_keys", 
    24    "crypto_kx_PUBLIC_KEY_BYTES", 
    25    "crypto_kx_SECRET_KEY_BYTES", 
    26    "crypto_kx_SEED_BYTES", 
    27    "crypto_kx_SESSION_KEY_BYTES", 
    28] 
    29 
    30""" 
    31Implementations of client, server key exchange 
    32""" 
    33crypto_kx_PUBLIC_KEY_BYTES: int = lib.crypto_kx_publickeybytes() 
    34crypto_kx_SECRET_KEY_BYTES: int = lib.crypto_kx_secretkeybytes() 
    35crypto_kx_SEED_BYTES: int = lib.crypto_kx_seedbytes() 
    36crypto_kx_SESSION_KEY_BYTES: int = lib.crypto_kx_sessionkeybytes() 
    37 
    38 
    39def crypto_kx_keypair() -> Tuple[bytes, bytes]: 
    40    """ 
    41    Generate a key pair. 
    42    This is a duplicate crypto_box_keypair, but 
    43    is included for api consistency. 
    44    :return: (public_key, secret_key) 
    45    :rtype: (bytes, bytes) 
    46    """ 
    47    public_key = ffi.new("unsigned char[]", crypto_kx_PUBLIC_KEY_BYTES) 
    48    secret_key = ffi.new("unsigned char[]", crypto_kx_SECRET_KEY_BYTES) 
    49    res = lib.crypto_kx_keypair(public_key, secret_key) 
    50    ensure(res == 0, "Key generation failed.", raising=exc.CryptoError) 
    51 
    52    return ( 
    53        ffi.buffer(public_key, crypto_kx_PUBLIC_KEY_BYTES)[:], 
    54        ffi.buffer(secret_key, crypto_kx_SECRET_KEY_BYTES)[:], 
    55    ) 
    56 
    57 
    58def crypto_kx_seed_keypair(seed: bytes) -> Tuple[bytes, bytes]: 
    59    """ 
    60    Generate a key pair with a given seed. 
    61    This is functionally the same as crypto_box_seed_keypair, however 
    62    it uses the blake2b hash primitive instead of sha512. 
    63    It is included mainly for api consistency when using crypto_kx. 
    64    :param seed: random seed 
    65    :type seed: bytes 
    66    :return: (public_key, secret_key) 
    67    :rtype: (bytes, bytes) 
    68    """ 
    69    public_key = ffi.new("unsigned char[]", crypto_kx_PUBLIC_KEY_BYTES) 
    70    secret_key = ffi.new("unsigned char[]", crypto_kx_SECRET_KEY_BYTES) 
    71    ensure( 
    72        isinstance(seed, bytes) and len(seed) == crypto_kx_SEED_BYTES, 
    73        "Seed must be a {} byte long bytes sequence".format( 
    74            crypto_kx_SEED_BYTES 
    75        ), 
    76        raising=exc.TypeError, 
    77    ) 
    78    res = lib.crypto_kx_seed_keypair(public_key, secret_key, seed) 
    79    ensure(res == 0, "Key generation failed.", raising=exc.CryptoError) 
    80 
    81    return ( 
    82        ffi.buffer(public_key, crypto_kx_PUBLIC_KEY_BYTES)[:], 
    83        ffi.buffer(secret_key, crypto_kx_SECRET_KEY_BYTES)[:], 
    84    ) 
    85 
    86 
    87def crypto_kx_client_session_keys( 
    88    client_public_key: bytes, 
    89    client_secret_key: bytes, 
    90    server_public_key: bytes, 
    91) -> Tuple[bytes, bytes]: 
    92    """ 
    93    Generate session keys for the client. 
    94    :param client_public_key: 
    95    :type client_public_key: bytes 
    96    :param client_secret_key: 
    97    :type client_secret_key: bytes 
    98    :param server_public_key: 
    99    :type server_public_key: bytes 
    100    :return: (rx_key, tx_key) 
    101    :rtype: (bytes, bytes) 
    102    """ 
    103    ensure( 
    104        isinstance(client_public_key, bytes) 
    105        and len(client_public_key) == crypto_kx_PUBLIC_KEY_BYTES, 
    106        "Client public key must be a {} bytes long bytes sequence".format( 
    107            crypto_kx_PUBLIC_KEY_BYTES 
    108        ), 
    109        raising=exc.TypeError, 
    110    ) 
    111    ensure( 
    112        isinstance(client_secret_key, bytes) 
    113        and len(client_secret_key) == crypto_kx_SECRET_KEY_BYTES, 
    114        "Client secret key must be a {} bytes long bytes sequence".format( 
    115            crypto_kx_PUBLIC_KEY_BYTES 
    116        ), 
    117        raising=exc.TypeError, 
    118    ) 
    119    ensure( 
    120        isinstance(server_public_key, bytes) 
    121        and len(server_public_key) == crypto_kx_PUBLIC_KEY_BYTES, 
    122        "Server public key must be a {} bytes long bytes sequence".format( 
    123            crypto_kx_PUBLIC_KEY_BYTES 
    124        ), 
    125        raising=exc.TypeError, 
    126    ) 
    127 
    128    rx_key = ffi.new("unsigned char[]", crypto_kx_SESSION_KEY_BYTES) 
    129    tx_key = ffi.new("unsigned char[]", crypto_kx_SESSION_KEY_BYTES) 
    130    res = lib.crypto_kx_client_session_keys( 
    131        rx_key, tx_key, client_public_key, client_secret_key, server_public_key 
    132    ) 
    133    ensure( 
    134        res == 0, 
    135        "Client session key generation failed.", 
    136        raising=exc.CryptoError, 
    137    ) 
    138 
    139    return ( 
    140        ffi.buffer(rx_key, crypto_kx_SESSION_KEY_BYTES)[:], 
    141        ffi.buffer(tx_key, crypto_kx_SESSION_KEY_BYTES)[:], 
    142    ) 
    143 
    144 
    145def crypto_kx_server_session_keys( 
    146    server_public_key: bytes, 
    147    server_secret_key: bytes, 
    148    client_public_key: bytes, 
    149) -> Tuple[bytes, bytes]: 
    150    """ 
    151    Generate session keys for the server. 
    152    :param server_public_key: 
    153    :type server_public_key: bytes 
    154    :param server_secret_key: 
    155    :type server_secret_key: bytes 
    156    :param client_public_key: 
    157    :type client_public_key: bytes 
    158    :return: (rx_key, tx_key) 
    159    :rtype: (bytes, bytes) 
    160    """ 
    161    ensure( 
    162        isinstance(server_public_key, bytes) 
    163        and len(server_public_key) == crypto_kx_PUBLIC_KEY_BYTES, 
    164        "Server public key must be a {} bytes long bytes sequence".format( 
    165            crypto_kx_PUBLIC_KEY_BYTES 
    166        ), 
    167        raising=exc.TypeError, 
    168    ) 
    169    ensure( 
    170        isinstance(server_secret_key, bytes) 
    171        and len(server_secret_key) == crypto_kx_SECRET_KEY_BYTES, 
    172        "Server secret key must be a {} bytes long bytes sequence".format( 
    173            crypto_kx_PUBLIC_KEY_BYTES 
    174        ), 
    175        raising=exc.TypeError, 
    176    ) 
    177    ensure( 
    178        isinstance(client_public_key, bytes) 
    179        and len(client_public_key) == crypto_kx_PUBLIC_KEY_BYTES, 
    180        "Client public key must be a {} bytes long bytes sequence".format( 
    181            crypto_kx_PUBLIC_KEY_BYTES 
    182        ), 
    183        raising=exc.TypeError, 
    184    ) 
    185 
    186    rx_key = ffi.new("unsigned char[]", crypto_kx_SESSION_KEY_BYTES) 
    187    tx_key = ffi.new("unsigned char[]", crypto_kx_SESSION_KEY_BYTES) 
    188    res = lib.crypto_kx_server_session_keys( 
    189        rx_key, tx_key, server_public_key, server_secret_key, client_public_key 
    190    ) 
    191    ensure( 
    192        res == 0, 
    193        "Server session key generation failed.", 
    194        raising=exc.CryptoError, 
    195    ) 
    196 
    197    return ( 
    198        ffi.buffer(rx_key, crypto_kx_SESSION_KEY_BYTES)[:], 
    199        ffi.buffer(tx_key, crypto_kx_SESSION_KEY_BYTES)[:], 
    200    )