Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/kex_curve25519.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import binascii
2import hashlib
4from cryptography.exceptions import UnsupportedAlgorithm
5from cryptography.hazmat.primitives import constant_time, serialization
6from cryptography.hazmat.primitives.asymmetric.x25519 import (
7 X25519PrivateKey,
8 X25519PublicKey,
9)
11from paramiko.message import Message
12from paramiko.common import byte_chr
13from paramiko.ssh_exception import SSHException
# Packet type numbers used by this key exchange: INIT is 30, REPLY is 31.
_MSG_KEXECDH_INIT = 30
_MSG_KEXECDH_REPLY = 31

# byte_chr() forms of the same two numbers; these are what gets passed to
# Message.add_byte() when a packet is built.
c_MSG_KEXECDH_INIT = byte_chr(30)
c_MSG_KEXECDH_REPLY = byte_chr(31)
class KexCurve25519:
    """
    Curve25519 (X25519) Diffie-Hellman key exchange bound to a transport.

    One instance drives a single exchange. Depending on
    ``transport.server_mode`` it either waits for the peer's
    ``KEXECDH_INIT`` packet and answers it, or sends ``KEXECDH_INIT`` itself
    and processes the peer's ``KEXECDH_REPLY``. The exchange hash is computed
    with `hash_algo` (SHA-256).
    """

    hash_algo = hashlib.sha256

    def __init__(self, transport):
        """
        :param transport:
            object providing the version / KEXINIT state, host key access and
            the packet send/expect hooks this class calls.
        """
        self.transport = transport
        # Ephemeral X25519 private key; populated by `start_kex`.
        self.key = None

    @classmethod
    def is_available(cls):
        """
        Report whether X25519 keys can be generated in this environment.

        :returns:
            ``False`` if generating a key raises `UnsupportedAlgorithm`,
            ``True`` otherwise.
        """
        try:
            X25519PrivateKey.generate()
        except UnsupportedAlgorithm:
            return False
        return True

    @staticmethod
    def _load_peer_key(peer_key_bytes):
        """
        Build an `X25519PublicKey` from bytes received from the peer.

        The bytes come straight off the wire, so a malformed value (for
        example one of the wrong length) is reported as `SSHException`, the
        same exception type `_perform_exchange` uses for bad peer values,
        instead of leaking a bare `ValueError`.

        :raises SSHException: if the bytes are not a valid public key.
        """
        try:
            return X25519PublicKey.from_public_bytes(peer_key_bytes)
        except ValueError as e:
            raise SSHException(
                "peer's curve25519 public value is malformed"
            ) from e

    def _local_public_bytes(self):
        """Return our ephemeral public key in raw encoding/format."""
        return self.key.public_key().public_bytes(
            serialization.Encoding.Raw, serialization.PublicFormat.Raw
        )

    @staticmethod
    def _secret_to_int(secret):
        """Interpret the shared-secret bytes as a big-endian integer."""
        return int(binascii.hexlify(secret), 16)

    def _perform_exchange(self, peer_key):
        """
        Compute the shared secret between our private key and ``peer_key``.

        :returns: the raw shared secret bytes.
        :raises SSHException: if the shared secret is 32 zero bytes.
        """
        secret = self.key.exchange(peer_key)
        # Constant-time comparison against the all-zero secret.
        if constant_time.bytes_eq(secret, b"\x00" * 32):
            raise SSHException(
                "peer's curve25519 public value has wrong order"
            )
        return secret

    def start_kex(self):
        """
        Begin the exchange by generating a fresh ephemeral key.

        In server mode this only registers ``KEXECDH_INIT`` as the expected
        packet. Otherwise it sends ``KEXECDH_INIT`` carrying our public key
        and registers ``KEXECDH_REPLY`` as the expected packet.
        """
        self.key = X25519PrivateKey.generate()
        if self.transport.server_mode:
            self.transport._expect_packet(_MSG_KEXECDH_INIT)
            return

        m = Message()
        m.add_byte(c_MSG_KEXECDH_INIT)
        m.add_string(self._local_public_bytes())
        self.transport._send_message(m)
        self.transport._expect_packet(_MSG_KEXECDH_REPLY)

    def parse_next(self, ptype, m):
        """
        Dispatch an incoming key-exchange packet to the matching handler.

        :param ptype: numeric packet type.
        :param m: the received message.
        :raises SSHException:
            if ``ptype`` is not the packet this side handles in its current
            mode (INIT in server mode, REPLY otherwise).
        """
        if self.transport.server_mode and (ptype == _MSG_KEXECDH_INIT):
            return self._parse_kexecdh_init(m)
        elif not self.transport.server_mode and (ptype == _MSG_KEXECDH_REPLY):
            return self._parse_kexecdh_reply(m)
        raise SSHException(
            "KexCurve25519 asked to handle packet type {:d}".format(ptype)
        )

    def _parse_kexecdh_init(self, m):
        """
        Handle ``KEXECDH_INIT`` (reached only in server mode).

        Derives the shared secret from the peer's public key, computes the
        exchange hash, signs it with the server key and sends
        ``KEXECDH_REPLY``.

        :raises SSHException: if the peer's public key is unusable.
        """
        peer_key_bytes = m.get_string()
        peer_key = self._load_peer_key(peer_key_bytes)
        K = self._secret_to_int(self._perform_exchange(peer_key))
        # compute exchange hash; in server mode the remote side is the
        # client, so remote values are added before local ones
        hm = Message()
        hm.add(
            self.transport.remote_version,
            self.transport.local_version,
            self.transport.remote_kex_init,
            self.transport.local_kex_init,
        )
        server_key_bytes = self.transport.get_server_key().asbytes()
        exchange_key_bytes = self._local_public_bytes()
        hm.add_string(server_key_bytes)
        hm.add_string(peer_key_bytes)
        hm.add_string(exchange_key_bytes)
        hm.add_mpint(K)
        H = self.hash_algo(hm.asbytes()).digest()
        self.transport._set_K_H(K, H)
        sig = self.transport.get_server_key().sign_ssh_data(
            H, self.transport.host_key_type
        )
        # construct reply
        m = Message()
        m.add_byte(c_MSG_KEXECDH_REPLY)
        m.add_string(server_key_bytes)
        m.add_string(exchange_key_bytes)
        m.add_string(sig)
        self.transport._send_message(m)
        self.transport._activate_outbound()

    def _parse_kexecdh_reply(self, m):
        """
        Handle ``KEXECDH_REPLY`` (reached only when not in server mode).

        Reads the peer's host key, public key and signature, derives the
        shared secret, computes the exchange hash, and passes the host key
        and signature to ``transport._verify_key``.

        :raises SSHException: if the peer's public key is unusable.
        """
        peer_host_key_bytes = m.get_string()
        peer_key_bytes = m.get_string()
        sig = m.get_binary()

        peer_key = self._load_peer_key(peer_key_bytes)

        K = self._secret_to_int(self._perform_exchange(peer_key))
        # compute exchange hash and verify signature; here the local side
        # is the client, so local values are added before remote ones
        hm = Message()
        hm.add(
            self.transport.local_version,
            self.transport.remote_version,
            self.transport.local_kex_init,
            self.transport.remote_kex_init,
        )
        hm.add_string(peer_host_key_bytes)
        hm.add_string(self._local_public_bytes())
        hm.add_string(peer_key_bytes)
        hm.add_mpint(K)
        self.transport._set_K_H(K, self.hash_algo(hm.asbytes()).digest())
        self.transport._verify_key(peer_host_key_bytes, sig)
        self.transport._activate_outbound()