Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pymysql/_auth.py: 16%
140 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:28 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:28 +0000
1"""
2Implements auth methods
3"""
4from .err import OperationalError
7try:
8 from cryptography.hazmat.backends import default_backend
9 from cryptography.hazmat.primitives import serialization, hashes
10 from cryptography.hazmat.primitives.asymmetric import padding
12 _have_cryptography = True
13except ImportError:
14 _have_cryptography = False
16from functools import partial
17import hashlib
# When true, the auth handlers in this module print progress messages.
DEBUG = False

# Number of leading scramble/salt bytes the helpers below actually use.
SCRAMBLE_LENGTH = 20

# Factory for SHA-1 hash objects; any argument is forwarded to hashlib.new
# as the initial data to hash.
sha1_new = partial(hashlib.new, "sha1")
25# mysql_native_password
26# https://dev.mysql.com/doc/internals/en/secure-password-authentication.html#packet-Authentication::Native41
def scramble_native_password(password, message):
    """Build the client response for the mysql_native_password plugin.

    The response is ``SHA1(password) XOR SHA1(salt + SHA1(SHA1(password)))``
    where ``salt`` is the first ``SCRAMBLE_LENGTH`` bytes of *message*.

    :param password: Password bytes; a falsy value produces ``b""``.
    :param message: Scramble bytes received from the server.
    :return: The scrambled password as ``bytes``.
    """
    if password:
        password_hash = sha1_new(password).digest()
        double_hash = sha1_new(password_hash).digest()

        # Seed the hash with the salt, then append the double hash.
        salted = sha1_new(message[:SCRAMBLE_LENGTH])
        salted.update(double_hash)

        return _my_crypt(salted.digest(), password_hash)

    return b""
def _my_crypt(message1, message2):
    """XOR each byte of *message1* with the byte of *message2* at the same index.

    :param message1: Bytes-like data; it determines the length of the result.
    :param message2: Indexable sequence of byte values, at least as long as
        *message1* (a shorter one raises ``IndexError``).
    :return: The XOR-ed data as ``bytes``.
    """
    # message2 is indexed explicitly (not zipped) so that a too-short
    # message2 still raises instead of silently truncating the result.
    return bytes(
        byte ^ message2[pos] for pos, byte in enumerate(bytearray(message1))
    )
52# MariaDB's client_ed25519-plugin
53# https://mariadb.com/kb/en/library/connection/#client_ed25519-plugin
# Holds the ``nacl.bindings`` module once _init_nacl() has succeeded;
# stays False until then.
_nacl_bindings = False


def _init_nacl():
    """Import ``nacl.bindings`` and publish it as the module-level ``_nacl_bindings``.

    :raises RuntimeError: If the ``nacl`` package cannot be imported.
    """
    global _nacl_bindings
    try:
        from nacl import bindings as loaded_bindings
    except ImportError:
        raise RuntimeError(
            "'pynacl' package is required for ed25519_password auth method"
        )
    else:
        _nacl_bindings = loaded_bindings
def _scalar_clamp(s32):
    """Clamp a 32-byte scalar.

    Clears the three lowest bits of byte 0, clears the highest bit of
    byte 31 and sets its second-highest bit; bytes 1..30 are copied as is.

    :param s32: Bytes-like scalar with at least 32 bytes.
    :return: The clamped 32-byte scalar as ``bytes``.
    """
    raw = bytearray(s32)
    first = bytes([raw[0] & 0xF8])
    last = bytes([(raw[31] & 0x7F) | 0x40])
    middle = bytes(s32[1:31])
    return b"".join((first, middle, last))
def ed25519_password(password, scramble):
    """Sign the server's random scramble with Ed25519.

    Both the secret scalar and the public key are derived from *password*.
    The steps follow RFC 8032 section 5.1.6; the RFC symbol for each value
    is given in the comments.

    :param password: Password bytes used as key material.
    :param scramble: Scramble bytes (the message ``M``) to sign.
    :return: The signature ``R || S`` as bytes.
    :raises RuntimeError: If the ``nacl`` bindings cannot be loaded.
    """
    # The nacl bindings are loaded lazily on first use.
    if not _nacl_bindings:
        _init_nacl()
    lib = _nacl_bindings

    # h = SHA512(password)
    key_hash = hashlib.sha512(password).digest()

    # s = prune(first_half(h))
    secret_scalar = _scalar_clamp(key_hash[:32])

    # r = SHA512(second_half(h) || M), then reduced
    nonce_hash = hashlib.sha512(key_hash[32:] + scramble).digest()
    nonce = lib.crypto_core_ed25519_scalar_reduce(nonce_hash)

    # R = encoded point [r]B
    point_r = lib.crypto_scalarmult_ed25519_base_noclamp(nonce)

    # A = encoded point [s]B
    public_point = lib.crypto_scalarmult_ed25519_base_noclamp(secret_scalar)

    # k = SHA512(R || A || M), then reduced
    challenge_hash = hashlib.sha512(point_r + public_point + scramble).digest()
    challenge = lib.crypto_core_ed25519_scalar_reduce(challenge_hash)

    # S = (k * s + r) mod L
    product = lib.crypto_core_ed25519_scalar_mul(challenge, secret_scalar)
    sig_s = lib.crypto_core_ed25519_scalar_add(product, nonce)

    # signature = R || S
    return point_r + sig_s
115# sha256_password
def _roundtrip(conn, send_data):
    """Write one packet to the server and return the checked reply.

    :param conn: Connection object providing ``write_packet`` and
        ``_read_packet``.
    :param send_data: Payload handed to ``conn.write_packet``.
    :return: The packet read back, after ``check_error()`` has been called
        on it (presumably raising for an error packet -- confirm against
        the packet class).
    """
    conn.write_packet(send_data)
    reply = conn._read_packet()
    reply.check_error()
    return reply
def _xor_password(password, salt):
    """XOR *password* with a repeating key taken from *salt*.

    Only the first ``SCRAMBLE_LENGTH`` bytes of *salt* are used; the key
    wraps around when *password* is longer than the key.

    :param password: Password bytes to obfuscate.
    :param salt: Salt bytes received from the server.
    :return: The XOR-ed password as ``bytes``.
    """
    # Trailing NUL character will be added in Auth Switch Request.
    # See https://github.com/mysql/mysql-server/blob/7d10c82196c8e45554f27c00681474a9fb86d137/sql/auth/sha2_password.cc#L939-L945
    key = salt[:SCRAMBLE_LENGTH]
    plain = bytearray(password)
    key_len = len(key)
    return bytes(byte ^ key[pos % key_len] for pos, byte in enumerate(plain))
def sha2_rsa_encrypt(password, salt, public_key):
    """Encrypt the salted password with the server's RSA public key.

    Used for sha256_password and caching_sha2_password. The NUL-terminated
    password is XOR-ed with *salt* and then encrypted with RSA-OAEP
    (SHA-1 for both the hash and MGF1).

    :param password: Password bytes (without the trailing NUL).
    :param salt: Salt bytes received from the server.
    :param public_key: PEM-encoded RSA public key bytes.
    :return: The ciphertext returned by the key's ``encrypt`` method.
    :raises RuntimeError: If the ``cryptography`` package is unavailable.
    """
    if not _have_cryptography:
        raise RuntimeError(
            "'cryptography' package is required for sha256_password or"
            " caching_sha2_password auth methods"
        )

    plaintext = _xor_password(password + b"\0", salt)
    server_key = serialization.load_pem_public_key(public_key, default_backend())
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA1()),
        algorithm=hashes.SHA1(),
        label=None,
    )
    return server_key.encrypt(plaintext, oaep)
def sha256_password_auth(conn, pkt):
    """Run the sha256_password authentication exchange.

    When ``conn._secure`` is true the NUL-terminated password is sent as
    is. Otherwise the password is RSA-encrypted with the server's public
    key, which is requested from the server first if the connection does
    not already have one.

    :param conn: Connection object; reads ``password``, ``_secure`` and
        ``server_public_key`` and may set ``salt`` and ``server_public_key``.
    :param pkt: The packet that triggered this exchange.
    :return: The server's reply to the final packet sent.
    :raises OperationalError: If a password is set but no public key could
        be obtained.
    """
    if conn._secure:
        if DEBUG:
            print("sha256: Sending plain password")
        return _roundtrip(conn, conn.password + b"\0")

    if pkt.is_auth_switch_request():
        conn.salt = pkt.read_all()
        if not conn.server_public_key and conn.password:
            # Request server public key
            if DEBUG:
                print("sha256: Requesting server public key")
            pkt = _roundtrip(conn, b"\1")

    if pkt.is_extra_auth_data():
        # The first byte is skipped; the rest is kept as the public key.
        conn.server_public_key = pkt._data[1:]
        if DEBUG:
            print("Received public key:\n", conn.server_public_key.decode("ascii"))

    # An empty password is sent as an empty payload and needs no key.
    if not conn.password:
        return _roundtrip(conn, b"")

    if not conn.server_public_key:
        raise OperationalError("Couldn't receive server's public key")

    encrypted = sha2_rsa_encrypt(conn.password, conn.salt, conn.server_public_key)
    return _roundtrip(conn, encrypted)
def scramble_caching_sha2(password, nonce):
    # (bytes, bytes) -> bytes
    """Scramble algorithm used in the caching_sha2_password fast path.

    XOR(SHA256(password), SHA256(SHA256(SHA256(password)), nonce))

    :param password: Password bytes; a falsy value produces ``b""``.
    :param nonce: Nonce bytes received from the server.
    :return: The 32-byte scramble, or ``b""`` when there is no password.
    """
    if not password:
        return b""

    password_hash = hashlib.sha256(password).digest()
    double_hash = hashlib.sha256(password_hash).digest()
    nonce_hash = hashlib.sha256(double_hash + nonce).digest()

    # Both operands are SHA-256 digests of equal length, so zip() pairs
    # every byte.
    return bytes(left ^ right for left, right in zip(password_hash, nonce_hash))
def caching_sha2_password_auth(conn, pkt):
    """Run the caching_sha2_password authentication exchange.

    The fast path sends a scramble of the password. If the server answers
    that full authentication is needed, the password is sent either as is
    (when ``conn._secure`` is true) or RSA-encrypted with the server's
    public key, which is requested first if the connection has none.

    :param conn: Connection object; reads ``password``, ``_secure``,
        ``salt`` and ``server_public_key`` and may set ``salt`` and
        ``server_public_key``.
    :param pkt: The packet that triggered this exchange.
    :return: The server's reply to the final packet of the exchange.
    :raises OperationalError: If the server sends a packet or status code
        this exchange does not expect.
    """
    # No password fast path
    if not conn.password:
        return _roundtrip(conn, b"")

    if pkt.is_auth_switch_request():
        # Try from fast auth
        if DEBUG:
            print("caching sha2: Trying fast path")
        conn.salt = pkt.read_all()
        scrambled = scramble_caching_sha2(conn.password, conn.salt)
        pkt = _roundtrip(conn, scrambled)
    # else: fast auth is tried in initial handshake

    if not pkt.is_extra_auth_data():
        raise OperationalError(
            "caching sha2: Unknown packet for fast auth: %s" % pkt._data[:1]
        )

    # magic numbers:
    # 2 - request public key
    # 3 - fast auth succeeded
    # 4 - need full auth

    pkt.advance(1)  # skip one byte before reading the status code
    n = pkt.read_uint8()

    if n == 3:
        if DEBUG:
            print("caching sha2: succeeded by fast path.")
        pkt = conn._read_packet()
        pkt.check_error()  # pkt must be OK packet
        return pkt

    if n != 4:
        raise OperationalError("caching sha2: Unknown result for fast auth: %s" % n)

    if DEBUG:
        print("caching sha2: Trying full auth...")

    if conn._secure:
        if DEBUG:
            print("caching sha2: Sending plain password via secure connection")
        return _roundtrip(conn, conn.password + b"\0")

    if not conn.server_public_key:
        pkt = _roundtrip(conn, b"\x02")  # Request public key
        if not pkt.is_extra_auth_data():
            raise OperationalError(
                "caching sha2: Unknown packet for public key: %s" % pkt._data[:1]
            )

        # The first byte is skipped; the rest is kept as the public key.
        conn.server_public_key = pkt._data[1:]
        if DEBUG:
            print(conn.server_public_key.decode("ascii"))

    data = sha2_rsa_encrypt(conn.password, conn.salt, conn.server_public_key)
    # Return the reply like every other exit path does, rather than
    # falling off the end and handing the caller None.
    return _roundtrip(conn, data)