Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/kex_curve25519.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

82 statements  

1import binascii 

2import hashlib 

3 

4from cryptography.exceptions import UnsupportedAlgorithm 

5from cryptography.hazmat.primitives import constant_time, serialization 

6from cryptography.hazmat.primitives.asymmetric.x25519 import ( 

7 X25519PrivateKey, 

8 X25519PublicKey, 

9) 

10 

11from paramiko.message import Message 

12from paramiko.common import byte_chr 

13from paramiko.ssh_exception import SSHException 

14 

15 

# Packet type numbers used by this key exchange: the client's INIT message
# and the server's REPLY message.
_MSG_KEXECDH_INIT = 30
_MSG_KEXECDH_REPLY = 31

# The same packet types passed through byte_chr, ready to be written as the
# leading byte of an outgoing Message.
c_MSG_KEXECDH_INIT = byte_chr(_MSG_KEXECDH_INIT)
c_MSG_KEXECDH_REPLY = byte_chr(_MSG_KEXECDH_REPLY)

18 

19 

class KexCurve25519:
    """Key exchange handler using X25519 with a SHA-256 exchange hash.

    The object is driven by the transport passed to the constructor:
    ``start_kex`` generates an ephemeral private key and, on the client
    side, sends the INIT packet; ``parse_next`` then handles the peer's INIT
    packet (server mode) or REPLY packet (client mode).
    """

    hash_algo = hashlib.sha256

    def __init__(self, transport):
        """Remember *transport*; no key is generated until ``start_kex``."""
        self.transport = transport
        # Ephemeral X25519 private key, created by start_kex().
        self.key = None

    @classmethod
    def is_available(cls):
        """Return True if an X25519 private key can be generated.

        Returns False when key generation raises ``UnsupportedAlgorithm``.
        """
        try:
            X25519PrivateKey.generate()
        except UnsupportedAlgorithm:
            return False
        else:
            return True

    @staticmethod
    def _secret_to_int(secret):
        """Interpret the raw shared-secret bytes as a big-endian integer."""
        return int.from_bytes(secret, "big")

    @staticmethod
    def _load_peer_key(peer_key_bytes):
        """Build the peer's public key object from bytes read off the wire.

        :raises SSHException:
            if the bytes are rejected as an X25519 public key, so malformed
            peer input fails the same way as the other checks in this class
            instead of leaking a bare ``ValueError``.
        """
        try:
            return X25519PublicKey.from_public_bytes(peer_key_bytes)
        except ValueError as e:
            raise SSHException(
                "peer's curve25519 public value is malformed"
            ) from e

    def _public_key_bytes(self):
        """Return our ephemeral public key in raw encoding."""
        return self.key.public_key().public_bytes(
            serialization.Encoding.Raw, serialization.PublicFormat.Raw
        )

    def _perform_exchange(self, peer_key):
        """Return the shared secret computed from our key and *peer_key*.

        :raises SSHException: if the resulting secret is 32 zero bytes.
        """
        secret = self.key.exchange(peer_key)
        # Compared in constant time; an all-zero result is rejected.
        if constant_time.bytes_eq(secret, b"\x00" * 32):
            raise SSHException(
                "peer's curve25519 public value has wrong order"
            )
        return secret

    def start_kex(self):
        """Generate the ephemeral key and begin the exchange.

        In server mode this only registers the INIT packet as the next one
        expected. Otherwise it sends INIT carrying our raw public key and
        registers REPLY as the next expected packet.
        """
        self.key = X25519PrivateKey.generate()
        if self.transport.server_mode:
            self.transport._expect_packet(_MSG_KEXECDH_INIT)
            return

        m = Message()
        m.add_byte(c_MSG_KEXECDH_INIT)
        m.add_string(self._public_key_bytes())
        self.transport._send_message(m)
        self.transport._expect_packet(_MSG_KEXECDH_REPLY)

    def parse_next(self, ptype, m):
        """Dispatch an incoming key-exchange packet to its handler.

        :param ptype: numeric packet type.
        :param m: the received message.
        :raises SSHException:
            if *ptype* is not the packet this side handles in its mode.
        """
        if self.transport.server_mode and (ptype == _MSG_KEXECDH_INIT):
            return self._parse_kexecdh_init(m)
        elif not self.transport.server_mode and (ptype == _MSG_KEXECDH_REPLY):
            return self._parse_kexecdh_reply(m)
        raise SSHException(
            "KexCurve25519 asked to handle packet type {:d}".format(ptype)
        )

    def _parse_kexecdh_init(self, m):
        """Handle the peer's INIT packet (server mode).

        Computes the shared secret and exchange hash, hands both to the
        transport, signs the hash with the server key, sends the REPLY
        packet and then calls the transport's ``_activate_outbound``.
        """
        peer_key_bytes = m.get_string()
        peer_key = self._load_peer_key(peer_key_bytes)
        K = self._secret_to_int(self._perform_exchange(peer_key))
        # compute exchange hash
        hm = Message()
        hm.add(
            self.transport.remote_version,
            self.transport.local_version,
            self.transport.remote_kex_init,
            self.transport.local_kex_init,
        )
        server_key_bytes = self.transport.get_server_key().asbytes()
        exchange_key_bytes = self._public_key_bytes()
        hm.add_string(server_key_bytes)
        # The peer's public value is hashed before ours on this side.
        hm.add_string(peer_key_bytes)
        hm.add_string(exchange_key_bytes)
        hm.add_mpint(K)
        H = self.hash_algo(hm.asbytes()).digest()
        self.transport._set_K_H(K, H)
        sig = self.transport.get_server_key().sign_ssh_data(
            H, self.transport.host_key_type
        )
        # construct reply
        m = Message()
        m.add_byte(c_MSG_KEXECDH_REPLY)
        m.add_string(server_key_bytes)
        m.add_string(exchange_key_bytes)
        m.add_string(sig)
        self.transport._send_message(m)
        self.transport._activate_outbound()

    def _parse_kexecdh_reply(self, m):
        """Handle the peer's REPLY packet (client mode).

        Reads the peer's host key, public value and signature, computes the
        shared secret and exchange hash, hands both to the transport, asks
        the transport to verify the host key signature and then calls the
        transport's ``_activate_outbound``.
        """
        peer_host_key_bytes = m.get_string()
        peer_key_bytes = m.get_string()
        sig = m.get_binary()

        peer_key = self._load_peer_key(peer_key_bytes)

        K = self._secret_to_int(self._perform_exchange(peer_key))
        # compute exchange hash and verify signature
        hm = Message()
        hm.add(
            self.transport.local_version,
            self.transport.remote_version,
            self.transport.local_kex_init,
            self.transport.remote_kex_init,
        )
        hm.add_string(peer_host_key_bytes)
        # Our public value is hashed before the peer's on this side.
        hm.add_string(self._public_key_bytes())
        hm.add_string(peer_key_bytes)
        hm.add_mpint(K)
        self.transport._set_K_H(K, self.hash_algo(hm.asbytes()).digest())
        self.transport._verify_key(peer_host_key_bytes, sig)
        self.transport._activate_outbound()