Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pymysql/connections.py: 27%

833 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:28 +0000

1# Python implementation of the MySQL client-server protocol 

2# http://dev.mysql.com/doc/internals/en/client-server-protocol.html 

3# Error codes: 

4# https://dev.mysql.com/doc/refman/5.5/en/error-handling.html 

5import errno 

6import os 

7import socket 

8import struct 

9import sys 

10import traceback 

11import warnings 

12 

13from . import _auth 

14 

15from .charset import charset_by_name, charset_by_id 

16from .constants import CLIENT, COMMAND, CR, ER, FIELD_TYPE, SERVER_STATUS 

17from . import converters 

18from .cursors import Cursor 

19from .optionfile import Parser 

20from .protocol import ( 

21 dump_packet, 

22 MysqlPacket, 

23 FieldDescriptorPacket, 

24 OKPacketWrapper, 

25 EOFPacketWrapper, 

26 LoadLocalPacketWrapper, 

27) 

28from . import err, VERSION_STRING 

29 

# Probe for the optional ``ssl`` module. When it is missing, ``ssl`` is bound
# to None and SSL_ENABLED is False so later code can test the flag instead of
# re-importing.
try:
    import ssl

    SSL_ENABLED = True
except ImportError:
    ssl = None
    SSL_ENABLED = False

# Determine the default login name from the operating system account.
# ``getpass`` is deleted afterwards so it does not stay in the module namespace.
try:
    import getpass

    DEFAULT_USER = getpass.getuser()
    del getpass
except (ImportError, KeyError):
    # KeyError occurs when there's no entry in OS database for a current user.
    DEFAULT_USER = None

# When True, several code paths in this module print diagnostics and dump
# packets.
DEBUG = False

# Set of FIELD_TYPE codes grouped as "text" types.
# NOTE(review): presumably consulted when decoding result columns; the
# consumer is not visible in this part of the file.
TEXT_TYPES = {
    FIELD_TYPE.BIT,
    FIELD_TYPE.BLOB,
    FIELD_TYPE.LONG_BLOB,
    FIELD_TYPE.MEDIUM_BLOB,
    FIELD_TYPE.STRING,
    FIELD_TYPE.TINY_BLOB,
    FIELD_TYPE.VAR_STRING,
    FIELD_TYPE.VARCHAR,
    FIELD_TYPE.GEOMETRY,
}


# Charset used when the caller (and the option file) specify none.
DEFAULT_CHARSET = "utf8mb4"

# Largest payload of a single protocol packet: the length field is 3 bytes.
MAX_PACKET_LEN = 2**24 - 1

65 

66 

67def _pack_int24(n): 

68 return struct.pack("<I", n)[:3] 

69 

70 

71# https://dev.mysql.com/doc/internals/en/integer.html#packet-Protocol::LengthEncodedInteger 

72def _lenenc_int(i): 

73 if i < 0: 

74 raise ValueError( 

75 "Encoding %d is less than 0 - no representation in LengthEncodedInteger" % i 

76 ) 

77 elif i < 0xFB: 

78 return bytes([i]) 

79 elif i < (1 << 16): 

80 return b"\xfc" + struct.pack("<H", i) 

81 elif i < (1 << 24): 

82 return b"\xfd" + struct.pack("<I", i)[:3] 

83 elif i < (1 << 64): 

84 return b"\xfe" + struct.pack("<Q", i) 

85 else: 

86 raise ValueError( 

87 "Encoding %x is larger than %x - no representation in LengthEncodedInteger" 

88 % (i, (1 << 64)) 

89 ) 

90 

91 

92class Connection: 

93 """ 

94 Representation of a socket with a mysql server. 

95 

96 The proper way to get an instance of this class is to call 

97 connect(). 

98 

99 Establish a connection to the MySQL database. Accepts several 

100 arguments: 

101 

102 :param host: Host where the database server is located. 

103 :param user: Username to log in as. 

104 :param password: Password to use. 

105 :param database: Database to use, None to not use a particular one. 

106 :param port: MySQL port to use, default is usually OK. (default: 3306) 

107 :param bind_address: When the client has multiple network interfaces, specify 

108 the interface from which to connect to the host. Argument can be 

109 a hostname or an IP address. 

110 :param unix_socket: Use a unix socket rather than TCP/IP. 

111 :param read_timeout: The timeout for reading from the connection in seconds. 

112 (default: None - no timeout) 

113 :param write_timeout: The timeout for writing to the connection in seconds. 

114 (default: None - no timeout) 

115 :param str charset: Charset to use. 

116 :param str collation: Collation name to use. 

117 :param sql_mode: Default SQL_MODE to use. 

118 :param read_default_file: 

119 Specifies my.cnf file to read these parameters from under the [client] section. 

120 :param conv: 

121 Conversion dictionary to use instead of the default one. 

122 This is used to provide custom marshalling and unmarshalling of types. 

123 See converters. 

124 :param use_unicode: 

125 Whether or not to default to unicode strings. 

126 This option defaults to true. 

127 :param client_flag: Custom flags to send to MySQL. Find potential values in constants.CLIENT. 

128 :param cursorclass: Custom cursor class to use. 

129 :param init_command: Initial SQL statement to run when connection is established. 

130 :param connect_timeout: The timeout for connecting to the database in seconds. 

131 (default: 10, min: 1, max: 31536000) 

132 :param ssl: A dict of arguments similar to mysql_ssl_set()'s parameters or an ssl.SSLContext. 

133 :param ssl_ca: Path to the file that contains a PEM-formatted CA certificate. 

134 :param ssl_cert: Path to the file that contains a PEM-formatted client certificate. 

135 :param ssl_disabled: A boolean value that disables usage of TLS. 

136 :param ssl_key: Path to the file that contains a PEM-formatted private key for 

137 the client certificate. 

138 :param ssl_verify_cert: Set to true to check the server certificate's validity. 

139 :param ssl_verify_identity: Set to true to check the server's identity. 

140 :param read_default_group: Group to read from in the configuration file. 

141 :param autocommit: Autocommit mode. None means use server default. (default: False) 

142 :param local_infile: Boolean to enable the use of LOAD DATA LOCAL command. (default: False) 

143 :param max_allowed_packet: Max size of packet sent to server in bytes. (default: 16MB) 

144 Only used to limit the size of "LOAD LOCAL INFILE" data packets to less than the default (16KB).

145 :param defer_connect: Don't explicitly connect on construction - wait for connect call. 

146 (default: False) 

147 :param auth_plugin_map: A dict of plugin names to a class that processes that plugin. 

148 The class will take the Connection object as the argument to the constructor. 

149 The class needs an authenticate method taking an authentication packet as 

150 an argument. For the dialog plugin, a prompt(echo, prompt) method can be used 

151 (if no authenticate method) for returning a string from the user. (experimental) 

152 :param server_public_key: SHA256 authentication plugin public key value. (default: None) 

153 :param binary_prefix: Add _binary prefix on bytes and bytearray. (default: False) 

154 :param compress: Not supported. 

155 :param named_pipe: Not supported. 

156 :param db: **DEPRECATED** Alias for database. 

157 :param passwd: **DEPRECATED** Alias for password. 

158 

159 See `Connection <https://www.python.org/dev/peps/pep-0249/#connection-objects>`_ in the 

160 specification. 

161 """ 

162 

163 _sock = None 

164 _auth_plugin_name = "" 

165 _closed = False 

166 _secure = False 

167 

def __init__(
    self,
    *,
    user=None,  # The first four arguments are based on DB-API 2.0 recommendation.
    password="",
    host=None,
    database=None,
    unix_socket=None,
    port=0,
    charset="",
    collation=None,
    sql_mode=None,
    read_default_file=None,
    conv=None,
    use_unicode=True,
    client_flag=0,
    cursorclass=Cursor,
    init_command=None,
    connect_timeout=10,
    read_default_group=None,
    autocommit=False,
    local_infile=False,
    max_allowed_packet=16 * 1024 * 1024,
    defer_connect=False,
    auth_plugin_map=None,
    read_timeout=None,
    write_timeout=None,
    bind_address=None,
    binary_prefix=False,
    program_name=None,
    server_public_key=None,
    ssl=None,
    ssl_ca=None,
    ssl_cert=None,
    ssl_disabled=None,
    ssl_key=None,
    ssl_verify_cert=None,
    ssl_verify_identity=None,
    compress=None,  # not supported
    named_pipe=None,  # not supported
    passwd=None,  # deprecated
    db=None,  # deprecated
):
    """Store the connection settings and, unless deferred, connect.

    All arguments are keyword-only; see the class docstring for their
    meaning. Settings left empty by the caller may be filled in from an
    option file when ``read_default_file`` or ``read_default_group`` is
    given.

    :raise NotImplementedError: If ``compress`` or ``named_pipe`` is truthy,
        or TLS is requested but the ``ssl`` module could not be imported.
    :raise ValueError: If the port is not an ``int`` or a timeout is out
        of range.
    """
    # Deprecated aliases only apply when the preferred argument is unset.
    if db is not None and database is None:
        # We will raise warning in 2022 or later.
        # See https://github.com/PyMySQL/PyMySQL/issues/939
        # warnings.warn("'db' is deprecated, use 'database'", DeprecationWarning, 3)
        database = db
    if passwd is not None and not password:
        # We will raise warning in 2022 or later.
        # See https://github.com/PyMySQL/PyMySQL/issues/939
        # warnings.warn(
        #    "'passwd' is deprecated, use 'password'", DeprecationWarning, 3
        # )
        password = passwd

    if compress or named_pipe:
        raise NotImplementedError(
            "compress and named_pipe arguments are not supported"
        )

    self._local_infile = bool(local_infile)
    if self._local_infile:
        client_flag |= CLIENT.LOCAL_FILES

    # A group without a file falls back to a platform-specific default path.
    if read_default_group and not read_default_file:
        if sys.platform.startswith("win"):
            read_default_file = "c:\\my.ini"
        else:
            read_default_file = "/etc/my.cnf"

    if read_default_file:
        if not read_default_group:
            read_default_group = "client"

        cfg = Parser()
        cfg.read(os.path.expanduser(read_default_file))

        def _config(key, arg):
            # A truthy explicit argument wins; otherwise try the option
            # file and fall back to the (falsy) argument on any failure.
            if arg:
                return arg
            try:
                return cfg.get(read_default_group, key)
            except Exception:
                return arg

        user = _config("user", user)
        password = _config("password", password)
        host = _config("host", host)
        database = _config("database", database)
        unix_socket = _config("socket", unix_socket)
        port = int(_config("port", port))
        bind_address = _config("bind-address", bind_address)
        charset = _config("default-character-set", charset)
        if not ssl:
            ssl = {}
        # Only a dict can be completed from the file; any other object
        # passed as ``ssl`` is left as given.
        if isinstance(ssl, dict):
            for key in ["ca", "capath", "cert", "key", "cipher"]:
                value = _config("ssl-" + key, ssl.get(key))
                if value:
                    ssl[key] = value

    self.ssl = False
    if not ssl_disabled:
        # Any of the individual ssl_* arguments replaces the ``ssl``
        # argument (and option-file values) with a freshly built dict.
        if ssl_ca or ssl_cert or ssl_key or ssl_verify_cert or ssl_verify_identity:
            ssl = {
                "ca": ssl_ca,
                "check_hostname": bool(ssl_verify_identity),
                "verify_mode": ssl_verify_cert
                if ssl_verify_cert is not None
                else False,
            }
            if ssl_cert is not None:
                ssl["cert"] = ssl_cert
            if ssl_key is not None:
                ssl["key"] = ssl_key
        if ssl:
            if not SSL_ENABLED:
                raise NotImplementedError("ssl module not found")
            self.ssl = True
            client_flag |= CLIENT.SSL
            self.ctx = self._create_ssl_ctx(ssl)

    self.host = host or "localhost"
    self.port = port or 3306
    if type(self.port) is not int:
        raise ValueError("port should be of type int")
    self.user = user or DEFAULT_USER
    # The password is kept as bytes; str values are encoded as latin1.
    self.password = password or b""
    if isinstance(self.password, str):
        self.password = self.password.encode("latin1")
    self.db = database
    self.unix_socket = unix_socket
    self.bind_address = bind_address
    if not (0 < connect_timeout <= 31536000):
        raise ValueError("connect_timeout should be >0 and <=31536000")
    self.connect_timeout = connect_timeout or None
    if read_timeout is not None and read_timeout <= 0:
        raise ValueError("read_timeout should be > 0")
    self._read_timeout = read_timeout
    if write_timeout is not None and write_timeout <= 0:
        raise ValueError("write_timeout should be > 0")
    self._write_timeout = write_timeout

    self.charset = charset or DEFAULT_CHARSET
    self.collation = collation
    self.use_unicode = use_unicode

    # Python codec name that corresponds to the MySQL charset.
    self.encoding = charset_by_name(self.charset).encoding

    client_flag |= CLIENT.CAPABILITIES
    if self.db:
        client_flag |= CLIENT.CONNECT_WITH_DB

    self.client_flag = client_flag

    self.cursorclass = cursorclass

    self._result = None
    self._affected_rows = 0
    self.host_info = "Not connected"

    # specified autocommit mode. None means use server default.
    self.autocommit_mode = autocommit

    if conv is None:
        conv = converters.conversions

    # Need for MySQLdb compatibility.
    # Integer keys go to ``decoders``; every other key goes to ``encoders``.
    self.encoders = {k: v for (k, v) in conv.items() if type(k) is not int}
    self.decoders = {k: v for (k, v) in conv.items() if type(k) is int}
    self.sql_mode = sql_mode
    self.init_command = init_command
    self.max_allowed_packet = max_allowed_packet
    self._auth_plugin_map = auth_plugin_map or {}
    self._binary_prefix = binary_prefix
    self.server_public_key = server_public_key

    # Connection attributes; sent during authentication when the server
    # advertises support for them.
    self._connect_attrs = {
        "_client_name": "pymysql",
        "_client_version": VERSION_STRING,
        "_pid": str(os.getpid()),
    }

    if program_name:
        self._connect_attrs["program_name"] = program_name

    if defer_connect:
        self._sock = None
    else:
        self.connect()

359 

360 def __enter__(self): 

361 return self 

362 

363 def __exit__(self, *exc_info): 

364 del exc_info 

365 self.close() 

366 

def _create_ssl_ctx(self, sslp):
    """Build an ``ssl.SSLContext`` from *sslp*.

    :param sslp: Either a ready ``ssl.SSLContext`` (returned unchanged) or
        a dict that may contain the keys ``ca``, ``capath``,
        ``check_hostname``, ``verify_mode``, ``cert``, ``key`` and
        ``cipher``.
    :return: The context to use for wrapping the socket.
    """
    if isinstance(sslp, ssl.SSLContext):
        return sslp
    ca = sslp.get("ca")
    capath = sslp.get("capath")
    # Without any CA material, verification defaults to off below.
    hasnoca = ca is None and capath is None
    ctx = ssl.create_default_context(cafile=ca, capath=capath)
    # check_hostname is assigned before verify_mode on purpose: the ssl
    # module refuses CERT_NONE while hostname checking is still enabled.
    # NOTE(review): a dict with CA material, check_hostname left truthy and
    # verify_mode false would presumably hit that refusal - confirm.
    ctx.check_hostname = not hasnoca and sslp.get("check_hostname", True)
    verify_mode_value = sslp.get("verify_mode")
    if verify_mode_value is None:
        ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
    elif isinstance(verify_mode_value, bool):
        ctx.verify_mode = ssl.CERT_REQUIRED if verify_mode_value else ssl.CERT_NONE
    else:
        # String spellings are matched case-insensitively.
        if isinstance(verify_mode_value, str):
            verify_mode_value = verify_mode_value.lower()
        if verify_mode_value in ("none", "0", "false", "no"):
            ctx.verify_mode = ssl.CERT_NONE
        elif verify_mode_value == "optional":
            ctx.verify_mode = ssl.CERT_OPTIONAL
        elif verify_mode_value in ("required", "1", "true", "yes"):
            ctx.verify_mode = ssl.CERT_REQUIRED
        else:
            # Unrecognized value: same default as when it is missing.
            ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
    if "cert" in sslp:
        ctx.load_cert_chain(sslp["cert"], keyfile=sslp.get("key"))
    if "cipher" in sslp:
        ctx.set_ciphers(sslp["cipher"])
    # Disable the SSLv2 and SSLv3 protocol versions.
    ctx.options |= ssl.OP_NO_SSLv2
    ctx.options |= ssl.OP_NO_SSLv3
    return ctx

398 

def close(self):
    """
    Send the quit message and close the socket.

    See `Connection.close() <https://www.python.org/dev/peps/pep-0249/#Connection.close>`_
    in the specification.

    :raise Error: If the connection is already closed.
    """
    if self._closed:
        raise err.Error("Already closed")
    self._closed = True
    if self._sock is not None:
        quit_packet = struct.pack("<iB", 1, COMMAND.COM_QUIT)
        try:
            self._write_bytes(quit_packet)
        except Exception:
            # Sending QUIT is best effort; the socket is dropped regardless.
            pass
        finally:
            self._force_close()

420 

@property
def open(self):
    """Return True if the connection is open."""
    has_no_socket = self._sock is None
    return not has_no_socket

425 

426 def _force_close(self): 

427 """Close connection without QUIT message.""" 

428 if self._sock: 

429 try: 

430 self._sock.close() 

431 except: # noqa 

432 pass 

433 self._sock = None 

434 self._rfile = None 

435 

436 __del__ = _force_close 

437 

def autocommit(self, value):
    """Set the autocommit mode, talking to the server only when needed.

    *value* is normalized with ``bool()`` and stored in
    ``autocommit_mode``. The normalized value is then compared with the
    mode reported by the server status flags, so truthy or falsy
    non-bool values (``1``, ``2``, ``"on"``, ``None``) no longer trigger
    a redundant ``SET AUTOCOMMIT`` round trip.

    :param value: Desired mode; interpreted by truthiness.
    """
    self.autocommit_mode = bool(value)
    if self.autocommit_mode != self.get_autocommit():
        self._send_autocommit_mode()

443 

def get_autocommit(self):
    """Return True if the stored server status has the autocommit flag set."""
    autocommit_bit = self.server_status & SERVER_STATUS.SERVER_STATUS_AUTOCOMMIT
    return bool(autocommit_bit)

446 

def _read_ok_packet(self):
    """Read one packet that must be an OK packet.

    The server status carried by the packet is stored on the connection.

    :return: The packet wrapped in ``OKPacketWrapper``.
    :raise OperationalError: If the packet is not an OK packet.
    """
    packet = self._read_packet()
    if packet.is_ok_packet():
        ok = OKPacketWrapper(packet)
        self.server_status = ok.server_status
        return ok
    raise err.OperationalError(
        CR.CR_COMMANDS_OUT_OF_SYNC,
        "Command Out of Sync",
    )

457 

def _send_autocommit_mode(self):
    """Send ``SET AUTOCOMMIT`` with the stored mode and await the OK packet."""
    mode_literal = self.escape(self.autocommit_mode)
    statement = "SET AUTOCOMMIT = %s" % mode_literal
    self._execute_command(COMMAND.COM_QUERY, statement)
    self._read_ok_packet()

464 

def begin(self):
    """Start a transaction by sending ``BEGIN`` and awaiting the OK packet."""
    statement = "BEGIN"
    self._execute_command(COMMAND.COM_QUERY, statement)
    self._read_ok_packet()

469 

def commit(self):
    """
    Commit the current transaction.

    Sends ``COMMIT`` and waits for the OK packet.

    See `Connection.commit() <https://www.python.org/dev/peps/pep-0249/#commit>`_
    in the specification.
    """
    statement = "COMMIT"
    self._execute_command(COMMAND.COM_QUERY, statement)
    self._read_ok_packet()

479 

def rollback(self):
    """
    Undo the current transaction.

    Sends ``ROLLBACK`` and waits for the OK packet.

    See `Connection.rollback() <https://www.python.org/dev/peps/pep-0249/#rollback>`_
    in the specification.
    """
    statement = "ROLLBACK"
    self._execute_command(COMMAND.COM_QUERY, statement)
    self._read_ok_packet()

489 

def show_warnings(self):
    """Run ``SHOW WARNINGS`` and return the rows of its result."""
    self._execute_command(COMMAND.COM_QUERY, "SHOW WARNINGS")
    warning_result = MySQLResult(self)
    warning_result.read()
    return warning_result.rows

496 

def select_db(self, db):
    """
    Switch the connection to another database.

    Sends ``COM_INIT_DB`` and waits for the OK packet.

    :param db: The name of the db.
    """
    command = COMMAND.COM_INIT_DB
    self._execute_command(command, db)
    self._read_ok_packet()

505 

def escape(self, obj, mapping=None):
    """Return *obj* rendered as an SQL literal.

    ``str`` values are escaped and single-quoted, bytes-like values are
    quoted (with a ``_binary`` prefix when the connection is configured
    for it), and everything else is delegated to ``converters.escape_item``.

    Non-standard, for internal use; do not use this in your applications.
    """
    if isinstance(obj, str):
        escaped = self.escape_string(obj)
        return "'" + escaped + "'"
    if isinstance(obj, (bytes, bytearray)):
        quoted = self._quote_bytes(obj)
        return "_binary" + quoted if self._binary_prefix else quoted
    return converters.escape_item(obj, self.charset, mapping=mapping)

519 

def literal(self, obj):
    """Escape *obj* using this connection's encoder mapping.

    Non-standard, for internal use; do not use this in your applications.
    """
    encoder_map = self.encoders
    return self.escape(obj, encoder_map)

526 

def escape_string(self, s):
    """Escape *s* for use inside a single-quoted SQL string literal.

    When the server status has NO_BACKSLASH_ESCAPES set, only single
    quotes are doubled; otherwise ``converters.escape_string`` is used.
    """
    no_backslash = (
        self.server_status & SERVER_STATUS.SERVER_STATUS_NO_BACKSLASH_ESCAPES
    )
    if not no_backslash:
        return converters.escape_string(s)
    return s.replace("'", "''")

531 

def _quote_bytes(self, s):
    """Return bytes-like *s* as a quoted SQL literal (a ``str``).

    When the server status has NO_BACKSLASH_ESCAPES set, single quotes
    are doubled and the bytes are decoded as ASCII with
    ``surrogateescape``; otherwise ``converters.escape_bytes`` is used.
    """
    no_backslash = (
        self.server_status & SERVER_STATUS.SERVER_STATUS_NO_BACKSLASH_ESCAPES
    )
    if not no_backslash:
        return converters.escape_bytes(s)
    doubled = s.replace(b"'", b"''")
    return "'{}'".format(doubled.decode("ascii", "surrogateescape"))

538 

def cursor(self, cursor=None):
    """
    Create a new cursor bound to this connection.

    :param cursor: The type of cursor to create. A falsy value means use
        the connection's ``cursorclass``.
    :type cursor: :py:class:`Cursor`, :py:class:`SSCursor`, :py:class:`DictCursor`,
        or :py:class:`SSDictCursor`.
    """
    factory = cursor if cursor else self.cursorclass
    return factory(self)

550 

551 # The following methods are INTERNAL USE ONLY (called from Cursor) 

def query(self, sql, unbuffered=False):
    """Send *sql* as a ``COM_QUERY`` and read its result.

    ``str`` input is encoded with the connection encoding using
    ``surrogateescape``.

    :return: The affected-row count, which is also stored on the connection.
    """
    if isinstance(sql, str):
        sql = sql.encode(self.encoding, "surrogateescape")
    self._execute_command(COMMAND.COM_QUERY, sql)
    affected = self._read_query_result(unbuffered=unbuffered)
    self._affected_rows = affected
    return affected

560 

def next_result(self, unbuffered=False):
    """Read the next query result and return its affected-row count.

    The count is also stored on the connection.
    """
    affected = self._read_query_result(unbuffered=unbuffered)
    self._affected_rows = affected
    return affected

564 

def affected_rows(self):
    """Return the affected-row count stored by the last result read."""
    count = self._affected_rows
    return count

567 

def kill(self, thread_id):
    """Send ``COM_PROCESS_KILL`` for *thread_id* and return the OK packet.

    The id is packed as an unsigned 32-bit little-endian integer.
    """
    payload = struct.pack("<I", thread_id)
    self._execute_command(COMMAND.COM_PROCESS_KILL, payload)
    return self._read_ok_packet()

572 

def ping(self, reconnect=True):
    """
    Check if the server is alive.

    :param reconnect: If the connection is closed, reconnect.
    :type reconnect: boolean

    :raise Error: If the connection is closed and reconnect=False.
    """
    if self._sock is None:
        if not reconnect:
            raise err.Error("Already closed")
        self.connect()
        # Freshly connected: a failing ping must not reconnect again.
        reconnect = False
    try:
        self._execute_command(COMMAND.COM_PING, "")
        self._read_ok_packet()
    except Exception:
        if not reconnect:
            raise
        # One reconnect attempt, then a final ping without retry.
        self.connect()
        self.ping(False)

597 

def set_charset(self, charset):
    """Deprecated. Use set_character_set() instead."""
    # Kept for compatibility with old PyMySQL; MySQLdb uses the name
    # set_character_set(), which this simply forwards to.
    self.set_character_set(charset)

605 

def set_character_set(self, charset, collation=None):
    """
    Set charset (and collation).

    Send "SET NAMES charset [COLLATE collation]" query.
    Update Connection.encoding based on charset.
    """
    # Looking the charset up first makes sure it is supported.
    encoding = charset_by_name(charset).encoding

    query = f"SET NAMES {charset}"
    if collation:
        query += f" COLLATE {collation}"
    self._execute_command(COMMAND.COM_QUERY, query)
    self._read_packet()

    # Attributes are updated only after the server answered.
    self.charset = charset
    self.encoding = encoding
    self.collation = collation

625 

def connect(self, sock=None):
    """Open the connection, authenticate and apply session settings.

    :param sock: Optional already-connected socket. When given, no new
        socket is created and no timeout or socket option is changed on it.
    :raise OperationalError: If an ``OSError`` occurs at any step; the
        original exception and a formatted traceback are attached to it.
    """
    self._closed = False
    try:
        if sock is None:
            if self.unix_socket:
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                sock.settimeout(self.connect_timeout)
                sock.connect(self.unix_socket)
                self.host_info = "Localhost via UNIX socket"
                # A unix socket is flagged as a secure transport.
                self._secure = True
                if DEBUG:
                    print("connected using unix_socket")
            else:
                kwargs = {}
                if self.bind_address is not None:
                    kwargs["source_address"] = (self.bind_address, 0)
                # Retry when the connect call is interrupted (EINTR).
                while True:
                    try:
                        sock = socket.create_connection(
                            (self.host, self.port), self.connect_timeout, **kwargs
                        )
                        break
                    except OSError as e:
                        if e.errno == errno.EINTR:
                            continue
                        raise
                self.host_info = "socket %s:%d" % (self.host, self.port)
                if DEBUG:
                    print("connected using socket")
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # The connect timeout no longer applies; the read and write
            # helpers set their own timeout before each operation.
            sock.settimeout(None)

        self._sock = sock
        self._rfile = sock.makefile("rb")
        self._next_seq_id = 0

        self._get_server_information()
        self._request_authentication()

        # Send "SET NAMES" query on init for:
        # - Ensure charset (and collation) is set to the server.
        #   - collation_id in handshake packet may be ignored.
        # - If collation is not specified, we don't know what is server's
        #   default collation for the charset. For example, default collation
        #   of utf8mb4 is:
        #   - MySQL 5.7, MariaDB 10.x: utf8mb4_general_ci
        #   - MySQL 8.0: utf8mb4_0900_ai_ci
        #
        # Reference:
        # - https://github.com/PyMySQL/PyMySQL/issues/1092
        # - https://github.com/wagtail/wagtail/issues/9477
        # - https://zenn.dev/methane/articles/2023-mysql-collation (Japanese)
        self.set_character_set(self.charset, self.collation)

        if self.sql_mode is not None:
            c = self.cursor()
            c.execute("SET sql_mode=%s", (self.sql_mode,))
            c.close()

        if self.init_command is not None:
            c = self.cursor()
            c.execute(self.init_command)
            c.close()

        if self.autocommit_mode is not None:
            self.autocommit(self.autocommit_mode)
    except BaseException as e:
        # Any failure, including KeyboardInterrupt, releases the socket.
        # NOTE(review): self._sock is not reset here, so it may still
        # reference the closed socket - confirm this is intended.
        self._rfile = None
        if sock is not None:
            try:
                sock.close()
            except:  # noqa
                pass

        if isinstance(e, (OSError, IOError)):
            exc = err.OperationalError(
                CR.CR_CONN_HOST_ERROR,
                f"Can't connect to MySQL server on {self.host!r} ({e})",
            )
            # Keep original exception and traceback to investigate error.
            exc.original_exception = e
            exc.traceback = traceback.format_exc()
            if DEBUG:
                print(exc.traceback)
            raise exc

        # If e is neither DatabaseError or IOError, It's a bug.
        # But raising AssertionError hides original error.
        # So just reraise it.
        raise

717 

def write_packet(self, payload):
    """Send *payload* as one MySQL packet.

    A 3-byte length and the current sequence id are prepended, and the
    sequence id is advanced (modulo 256) after the write.
    """
    # Internal note: when you build packet manually and calls _write_bytes()
    # directly, you should set self._next_seq_id properly.
    header = _pack_int24(len(payload)) + bytes([self._next_seq_id])
    data = header + payload
    if DEBUG:
        dump_packet(data)
    self._write_bytes(data)
    self._next_seq_id = (self._next_seq_id + 1) % 256

729 

def _read_packet(self, packet_type=MysqlPacket):
    """Read an entire "mysql packet" in its entirety from the network
    and return a MysqlPacket type that represents the results.

    Payloads split over several wire packets are concatenated before the
    packet object is built.

    :param packet_type: Class called with ``(payload_bytes, encoding)``
        to build the returned packet.
    :raise OperationalError: If the connection to the MySQL server is lost.
    :raise InternalError: If the packet sequence number is wrong.
    """
    buff = bytearray()
    while True:
        packet_header = self._read_bytes(4)
        # if DEBUG: dump_packet(packet_header)

        # Header layout: 3-byte little-endian payload length (read here as
        # a 16-bit low part and an 8-bit high part) plus the sequence id.
        btrl, btrh, packet_number = struct.unpack("<HBB", packet_header)
        bytes_to_read = btrl + (btrh << 16)
        if packet_number != self._next_seq_id:
            # Out of sync: the connection is dropped before raising.
            self._force_close()
            if packet_number == 0:
                # MariaDB sends error packet with seqno==0 when shutdown
                raise err.OperationalError(
                    CR.CR_SERVER_LOST,
                    "Lost connection to MySQL server during query",
                )
            raise err.InternalError(
                "Packet sequence number wrong - got %d expected %d"
                % (packet_number, self._next_seq_id)
            )
        self._next_seq_id = (self._next_seq_id + 1) % 256

        recv_data = self._read_bytes(bytes_to_read)
        if DEBUG:
            dump_packet(recv_data)
        buff += recv_data
        # https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
        # A payload of exactly 0xFFFFFF bytes means more data follows.
        if bytes_to_read == 0xFFFFFF:
            continue
        if bytes_to_read < MAX_PACKET_LEN:
            break

    packet = packet_type(bytes(buff), self.encoding)
    if packet.is_error_packet():
        # An error ends any unbuffered result that was still streaming.
        if self._result is not None and self._result.unbuffered_active is True:
            self._result.unbuffered_active = False
        packet.raise_for_error()
    return packet

774 

775 def _read_bytes(self, num_bytes): 

776 self._sock.settimeout(self._read_timeout) 

777 while True: 

778 try: 

779 data = self._rfile.read(num_bytes) 

780 break 

781 except OSError as e: 

782 if e.errno == errno.EINTR: 

783 continue 

784 self._force_close() 

785 raise err.OperationalError( 

786 CR.CR_SERVER_LOST, 

787 f"Lost connection to MySQL server during query ({e})", 

788 ) 

789 except BaseException: 

790 # Don't convert unknown exception to MySQLError. 

791 self._force_close() 

792 raise 

793 if len(data) < num_bytes: 

794 self._force_close() 

795 raise err.OperationalError( 

796 CR.CR_SERVER_LOST, "Lost connection to MySQL server during query" 

797 ) 

798 return data 

799 

800 def _write_bytes(self, data): 

801 self._sock.settimeout(self._write_timeout) 

802 try: 

803 self._sock.sendall(data) 

804 except OSError as e: 

805 self._force_close() 

806 raise err.OperationalError( 

807 CR.CR_SERVER_GONE_ERROR, f"MySQL server has gone away ({e!r})" 

808 ) 

809 

def _read_query_result(self, unbuffered=False):
    """Read the result of the command that was just sent.

    The result object is created before the ``try`` block so that the
    cleanup handler can always reference it. Previously a failure inside
    the constructor left ``result`` unbound and the handler raised an
    ``UnboundLocalError`` that hid the original exception.

    :param unbuffered: When true, only start an unbuffered query instead
        of reading the whole result.
    :return: The affected-row count of the result.
    """
    self._result = None
    result = MySQLResult(self)
    if unbuffered:
        try:
            result.init_unbuffered_query()
        except BaseException:
            # Detach the half-initialised result before re-raising.
            result.unbuffered_active = False
            result.connection = None
            raise
    else:
        result.read()
    self._result = result
    # Keep the last known status when the result carries none.
    if result.server_status is not None:
        self.server_status = result.server_status
    return result.affected_rows

827 

def insert_id(self):
    """Return ``insert_id`` of the last result, or 0 if there is none."""
    result = self._result
    return result.insert_id if result else 0

833 

def _execute_command(self, command, sql):
    """
    Send *command* with *sql* as its argument, split into packets as needed.

    :param command: Command byte placed at the start of the first packet.
    :param sql: Command argument; ``str`` is encoded with the connection
        encoding first.
    :raise InterfaceError: If the connection is closed.
    """
    if not self._sock:
        raise err.InterfaceError(0, "")

    # If the last query was unbuffered, make sure it finishes before
    # sending new commands
    if self._result is not None:
        if self._result.unbuffered_active:
            warnings.warn("Previous unbuffered result was left incomplete")
            self._result._finish_unbuffered_query()
        # Drain any further result sets that are still pending.
        while self._result.has_next:
            self.next_result()
        self._result = None

    if isinstance(sql, str):
        sql = sql.encode(self.encoding)

    packet_size = min(MAX_PACKET_LEN, len(sql) + 1)  # +1 is for command

    # tiny optimization: build first packet manually instead of
    # calling self.write_packet()
    # "<i" packs the size into 4 bytes; packet_size never exceeds
    # MAX_PACKET_LEN, so the 4th byte is 0 and serves as sequence id 0.
    prelude = struct.pack("<iB", packet_size, command)
    packet = prelude + sql[: packet_size - 1]
    self._write_bytes(packet)
    if DEBUG:
        dump_packet(packet)
    self._next_seq_id = 1

    if packet_size < MAX_PACKET_LEN:
        return

    # The first packet was full: send the rest in follow-up packets. A
    # final packet shorter than MAX_PACKET_LEN (possibly empty) marks the
    # end of the payload.
    sql = sql[packet_size - 1 :]
    while True:
        packet_size = min(MAX_PACKET_LEN, len(sql))
        self.write_packet(sql[:packet_size])
        sql = sql[packet_size:]
        if not sql and packet_size < MAX_PACKET_LEN:
            break

876 

def _request_authentication(self):
    """Send the handshake response and complete authentication.

    Builds the response packet (client flags, charset, user, auth data,
    database, plugin name and connection attributes, each depending on
    the server capabilities), upgrades the socket to TLS first when
    enabled, then handles an auth-switch request or extra auth data.

    :raise ValueError: If no username is set.
    :raise OperationalError: On an auth switch that cannot be handled or
        extra auth data for an unexpected plugin.
    """
    # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse
    if int(self.server_version.split(".", 1)[0]) >= 5:
        self.client_flag |= CLIENT.MULTI_RESULTS

    if self.user is None:
        raise ValueError("Did not specify a username")

    charset_id = charset_by_name(self.charset).id
    if isinstance(self.user, str):
        self.user = self.user.encode(self.encoding)

    # Flags, max packet size, charset id and 23 bytes of zero padding.
    data_init = struct.pack(
        "<iIB23s", self.client_flag, MAX_PACKET_LEN, charset_id, b""
    )

    if self.ssl and self.server_capabilities & CLIENT.SSL:
        # Send the short SSL request first, then continue over TLS.
        self.write_packet(data_init)

        self._sock = self.ctx.wrap_socket(self._sock, server_hostname=self.host)
        self._rfile = self._sock.makefile("rb")
        self._secure = True

    data = data_init + self.user + b"\0"

    authresp = b""
    plugin_name = None

    if self._auth_plugin_name == "":
        plugin_name = b""
        authresp = _auth.scramble_native_password(self.password, self.salt)
    elif self._auth_plugin_name == "mysql_native_password":
        plugin_name = b"mysql_native_password"
        authresp = _auth.scramble_native_password(self.password, self.salt)
    elif self._auth_plugin_name == "caching_sha2_password":
        plugin_name = b"caching_sha2_password"
        if self.password:
            if DEBUG:
                print("caching_sha2: trying fast path")
            authresp = _auth.scramble_caching_sha2(self.password, self.salt)
        else:
            if DEBUG:
                print("caching_sha2: empty password")
    elif self._auth_plugin_name == "sha256_password":
        plugin_name = b"sha256_password"
        if self.ssl and self.server_capabilities & CLIENT.SSL:
            authresp = self.password + b"\0"
        elif self.password:
            authresp = b"\1"  # request public key
        else:
            authresp = b"\0"  # empty password

    if self.server_capabilities & CLIENT.PLUGIN_AUTH_LENENC_CLIENT_DATA:
        data += _lenenc_int(len(authresp)) + authresp
    elif self.server_capabilities & CLIENT.SECURE_CONNECTION:
        data += struct.pack("B", len(authresp)) + authresp
    else:  # pragma: no cover - not testing against servers without secure auth (>=5.0)
        data += authresp + b"\0"

    if self.db and self.server_capabilities & CLIENT.CONNECT_WITH_DB:
        if isinstance(self.db, str):
            self.db = self.db.encode(self.encoding)
        data += self.db + b"\0"

    if self.server_capabilities & CLIENT.PLUGIN_AUTH:
        data += (plugin_name or b"") + b"\0"

    if self.server_capabilities & CLIENT.CONNECT_ATTRS:
        # Each key and value is sent as a length-prefixed UTF-8 string.
        connect_attrs = b""
        for k, v in self._connect_attrs.items():
            k = k.encode("utf-8")
            connect_attrs += _lenenc_int(len(k)) + k
            v = v.encode("utf-8")
            connect_attrs += _lenenc_int(len(v)) + v
        data += _lenenc_int(len(connect_attrs)) + connect_attrs

    self.write_packet(data)
    auth_packet = self._read_packet()

    # if authentication method isn't accepted the first byte
    # will have the octet 254
    if auth_packet.is_auth_switch_request():
        if DEBUG:
            print("received auth switch")
        # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchRequest
        auth_packet.read_uint8()  # 0xfe packet identifier
        plugin_name = auth_packet.read_string()
        if (
            self.server_capabilities & CLIENT.PLUGIN_AUTH
            and plugin_name is not None
        ):
            auth_packet = self._process_auth(plugin_name, auth_packet)
        else:
            raise err.OperationalError("received unknown auth switch request")
    elif auth_packet.is_extra_auth_data():
        if DEBUG:
            print("received extra data")
        # https://dev.mysql.com/doc/internals/en/successful-authentication.html
        if self._auth_plugin_name == "caching_sha2_password":
            auth_packet = _auth.caching_sha2_password_auth(self, auth_packet)
        elif self._auth_plugin_name == "sha256_password":
            auth_packet = _auth.sha256_password_auth(self, auth_packet)
        else:
            # Interpolate the plugin name; exception constructors do not
            # apply %-formatting to their arguments.
            raise err.OperationalError(
                "Received extra packet for auth method %r"
                % (self._auth_plugin_name,)
            )

    if DEBUG:
        print("Succeed to auth")

986 

987 def _process_auth(self, plugin_name, auth_packet): 

988 handler = self._get_auth_plugin_handler(plugin_name) 

989 if handler: 

990 try: 

991 return handler.authenticate(auth_packet) 

992 except AttributeError: 

993 if plugin_name != b"dialog": 

994 raise err.OperationalError( 

995 CR.CR_AUTH_PLUGIN_CANNOT_LOAD, 

996 "Authentication plugin '%s'" 

997 " not loaded: - %r missing authenticate method" 

998 % (plugin_name, type(handler)), 

999 ) 

1000 if plugin_name == b"caching_sha2_password": 

1001 return _auth.caching_sha2_password_auth(self, auth_packet) 

1002 elif plugin_name == b"sha256_password": 

1003 return _auth.sha256_password_auth(self, auth_packet) 

1004 elif plugin_name == b"mysql_native_password": 

1005 data = _auth.scramble_native_password(self.password, auth_packet.read_all()) 

1006 elif plugin_name == b"client_ed25519": 

1007 data = _auth.ed25519_password(self.password, auth_packet.read_all()) 

1008 elif plugin_name == b"mysql_old_password": 

1009 data = ( 

1010 _auth.scramble_old_password(self.password, auth_packet.read_all()) 

1011 + b"\0" 

1012 ) 

1013 elif plugin_name == b"mysql_clear_password": 

1014 # https://dev.mysql.com/doc/internals/en/clear-text-authentication.html 

1015 data = self.password + b"\0" 

1016 elif plugin_name == b"dialog": 

1017 pkt = auth_packet 

1018 while True: 

1019 flag = pkt.read_uint8() 

1020 echo = (flag & 0x06) == 0x02 

1021 last = (flag & 0x01) == 0x01 

1022 prompt = pkt.read_all() 

1023 

1024 if prompt == b"Password: ": 

1025 self.write_packet(self.password + b"\0") 

1026 elif handler: 

1027 resp = "no response - TypeError within plugin.prompt method" 

1028 try: 

1029 resp = handler.prompt(echo, prompt) 

1030 self.write_packet(resp + b"\0") 

1031 except AttributeError: 

1032 raise err.OperationalError( 

1033 CR.CR_AUTH_PLUGIN_CANNOT_LOAD, 

1034 "Authentication plugin '%s'" 

1035 " not loaded: - %r missing prompt method" 

1036 % (plugin_name, handler), 

1037 ) 

1038 except TypeError: 

1039 raise err.OperationalError( 

1040 CR.CR_AUTH_PLUGIN_ERR, 

1041 "Authentication plugin '%s'" 

1042 " %r didn't respond with string. Returned '%r' to prompt %r" 

1043 % (plugin_name, handler, resp, prompt), 

1044 ) 

1045 else: 

1046 raise err.OperationalError( 

1047 CR.CR_AUTH_PLUGIN_CANNOT_LOAD, 

1048 f"Authentication plugin '{plugin_name}' not configured", 

1049 ) 

1050 pkt = self._read_packet() 

1051 pkt.check_error() 

1052 if pkt.is_ok_packet() or last: 

1053 break 

1054 return pkt 

1055 else: 

1056 raise err.OperationalError( 

1057 CR.CR_AUTH_PLUGIN_CANNOT_LOAD, 

1058 "Authentication plugin '%s' not configured" % plugin_name, 

1059 ) 

1060 

1061 self.write_packet(data) 

1062 pkt = self._read_packet() 

1063 pkt.check_error() 

1064 return pkt 

1065 

1066 def _get_auth_plugin_handler(self, plugin_name): 

1067 plugin_class = self._auth_plugin_map.get(plugin_name) 

1068 if not plugin_class and isinstance(plugin_name, bytes): 

1069 plugin_class = self._auth_plugin_map.get(plugin_name.decode("ascii")) 

1070 if plugin_class: 

1071 try: 

1072 handler = plugin_class(self) 

1073 except TypeError: 

1074 raise err.OperationalError( 

1075 CR.CR_AUTH_PLUGIN_CANNOT_LOAD, 

1076 "Authentication plugin '%s'" 

1077 " not loaded: - %r cannot be constructed with connection object" 

1078 % (plugin_name, plugin_class), 

1079 ) 

1080 else: 

1081 handler = None 

1082 return handler 

1083 

1084 # _mysql support 

def thread_id(self):
    """Return the first element of ``self.server_thread_id``."""
    ident = self.server_thread_id[0]
    return ident

1087 

def character_set_name(self):
    """Return the ``charset`` attribute of this connection."""
    name = self.charset
    return name

1090 

def get_host_info(self):
    """Return the ``host_info`` attribute of this connection."""
    info = self.host_info
    return info

1093 

def get_proto_info(self):
    """Return the ``protocol_version`` attribute of this connection."""
    version = self.protocol_version
    return version

1096 

1097 def _get_server_information(self): 

1098 i = 0 

1099 packet = self._read_packet() 

1100 data = packet.get_all_data() 

1101 

1102 self.protocol_version = data[i] 

1103 i += 1 

1104 

1105 server_end = data.find(b"\0", i) 

1106 self.server_version = data[i:server_end].decode("latin1") 

1107 i = server_end + 1 

1108 

1109 self.server_thread_id = struct.unpack("<I", data[i : i + 4]) 

1110 i += 4 

1111 

1112 self.salt = data[i : i + 8] 

1113 i += 9 # 8 + 1(filler) 

1114 

1115 self.server_capabilities = struct.unpack("<H", data[i : i + 2])[0] 

1116 i += 2 

1117 

1118 if len(data) >= i + 6: 

1119 lang, stat, cap_h, salt_len = struct.unpack("<BHHB", data[i : i + 6]) 

1120 i += 6 

1121 # TODO: deprecate server_language and server_charset. 

1122 # mysqlclient-python doesn't provide it. 

1123 self.server_language = lang 

1124 try: 

1125 self.server_charset = charset_by_id(lang).name 

1126 except KeyError: 

1127 # unknown collation 

1128 self.server_charset = None 

1129 

1130 self.server_status = stat 

1131 if DEBUG: 

1132 print("server_status: %x" % stat) 

1133 

1134 self.server_capabilities |= cap_h << 16 

1135 if DEBUG: 

1136 print("salt_len:", salt_len) 

1137 salt_len = max(12, salt_len - 9) 

1138 

1139 # reserved 

1140 i += 10 

1141 

1142 if len(data) >= i + salt_len: 

1143 # salt_len includes auth_plugin_data_part_1 and filler 

1144 self.salt += data[i : i + salt_len] 

1145 i += salt_len 

1146 

1147 i += 1 

1148 # AUTH PLUGIN NAME may appear here. 

1149 if self.server_capabilities & CLIENT.PLUGIN_AUTH and len(data) >= i: 

1150 # Due to Bug#59453 the auth-plugin-name is missing the terminating 

1151 # NUL-char in versions prior to 5.5.10 and 5.6.2. 

1152 # ref: https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::Handshake 

1153 # didn't use version checks as mariadb is corrected and reports 

1154 # earlier than those two. 

1155 server_end = data.find(b"\0", i) 

1156 if server_end < 0: # pragma: no cover - very specific upstream bug 

1157 # not found \0 and last field so take it all 

1158 self._auth_plugin_name = data[i:].decode("utf-8") 

1159 else: 

1160 self._auth_plugin_name = data[i:server_end].decode("utf-8") 

1161 

def get_server_info(self):
    """Return the ``server_version`` attribute of this connection."""
    version = self.server_version
    return version

1164 

# Aliases: each exception class of the ``err`` module is bound to the same name here.
1165 Warning = err.Warning 

1166 Error = err.Error 

1167 InterfaceError = err.InterfaceError 

1168 DatabaseError = err.DatabaseError 

1169 DataError = err.DataError 

1170 OperationalError = err.OperationalError 

1171 IntegrityError = err.IntegrityError 

1172 InternalError = err.InternalError 

1173 ProgrammingError = err.ProgrammingError 

1174 NotSupportedError = err.NotSupportedError 

1175 

1176 

class MySQLResult:
    """Result of one statement, read packet by packet from a connection.

    ``read`` fetches a complete result; ``init_unbuffered_query`` reads only
    the header so rows can be pulled one at a time with
    ``_read_rowdata_packet_unbuffered``.
    """

    def __init__(self, connection):
        """
        :type connection: Connection
        """
        self.connection = connection
        self.affected_rows = None
        self.insert_id = None
        self.server_status = None
        self.warning_count = 0
        self.message = None
        self.field_count = 0
        self.description = None
        self.rows = None
        self.has_next = None
        self.unbuffered_active = False

    def __del__(self):
        # Rows of an unbuffered query that were never fetched still have to
        # be read off the connection.
        if self.unbuffered_active:
            self._finish_unbuffered_query()

    def read(self):
        """Read a complete result and drop the connection reference."""
        try:
            first_packet = self.connection._read_packet()

            if first_packet.is_ok_packet():
                self._read_ok_packet(first_packet)
            elif first_packet.is_load_local_packet():
                self._read_load_local_packet(first_packet)
            else:
                self._read_result_packet(first_packet)
        finally:
            self.connection = None

    def init_unbuffered_query(self):
        """
        Read the header of a result without fetching its rows.

        ``unbuffered_active`` becomes true only once a result set header and
        its column descriptions were read. If reading fails earlier there
        are no rows to drain, so ``__del__`` must not try to read them.

        :raise OperationalError: If the connection to the MySQL server is lost.
        :raise InternalError:
        """
        first_packet = self.connection._read_packet()

        if first_packet.is_ok_packet():
            try:
                self._read_ok_packet(first_packet)
            finally:
                self.connection = None
        elif first_packet.is_load_local_packet():
            try:
                self._read_load_local_packet(first_packet)
            finally:
                self.connection = None
        else:
            self.field_count = first_packet.read_length_encoded_integer()
            self._get_descriptions()
            # Row packets are pending from here on.
            self.unbuffered_active = True

            # Apparently, MySQLdb picks this number because it's the maximum
            # value of a 64bit unsigned integer. Since we're emulating MySQLdb,
            # we set it to this instead of None, which would be preferred.
            self.affected_rows = 18446744073709551615

    def _read_ok_packet(self, first_packet):
        """Copy the fields of an OK packet onto this result."""
        ok_packet = OKPacketWrapper(first_packet)
        self.affected_rows = ok_packet.affected_rows
        self.insert_id = ok_packet.insert_id
        self.server_status = ok_packet.server_status
        self.warning_count = ok_packet.warning_count
        self.message = ok_packet.message
        self.has_next = ok_packet.has_next

    def _read_load_local_packet(self, first_packet):
        """Send the requested local file and read the closing OK packet.

        :raise RuntimeError: If the connection's ``_local_infile`` is false.
        :raise OperationalError: If the packet after the file is not OK.
        """
        if not self.connection._local_infile:
            raise RuntimeError(
                "**WARN**: Received LOAD_LOCAL packet but local_infile option is false."
            )
        load_packet = LoadLocalPacketWrapper(first_packet)
        sender = LoadLocalFile(load_packet.filename, self.connection)
        try:
            sender.send_data()
        except BaseException:
            # Deliberately catches everything: one more packet is read
            # before the original exception is re-raised.
            self.connection._read_packet()  # skip ok packet
            raise

        ok_packet = self.connection._read_packet()
        if (
            not ok_packet.is_ok_packet()
        ):  # pragma: no cover - upstream induced protocol error
            raise err.OperationalError(
                CR.CR_COMMANDS_OUT_OF_SYNC,
                "Commands Out of Sync",
            )
        self._read_ok_packet(ok_packet)

    def _check_packet_is_eof(self, packet):
        """Return True for an EOF packet, recording its warning/has_next info."""
        if not packet.is_eof_packet():
            return False
        # TODO: Support CLIENT.DEPRECATE_EOF
        # 1) Add DEPRECATE_EOF to CAPABILITIES
        # 2) Mask CAPABILITIES with server_capabilities
        # 3) if server_capabilities & CLIENT.DEPRECATE_EOF:
        #    use OKPacketWrapper instead of EOFPacketWrapper
        wp = EOFPacketWrapper(packet)
        self.warning_count = wp.warning_count
        self.has_next = wp.has_next
        return True

    def _read_result_packet(self, first_packet):
        """Read column descriptions and then all rows of a result set."""
        self.field_count = first_packet.read_length_encoded_integer()
        self._get_descriptions()
        self._read_rowdata_packet()

    def _read_rowdata_packet_unbuffered(self):
        """Read one row of an active unbuffered query.

        Returns the row, or ``None`` when no query is active or the EOF
        packet was reached.
        """
        # Check if in an active query
        if not self.unbuffered_active:
            return

        # EOF
        packet = self.connection._read_packet()
        if self._check_packet_is_eof(packet):
            self.unbuffered_active = False
            self.connection = None
            self.rows = None
            return

        row = self._read_row_from_packet(packet)
        self.affected_rows = 1
        self.rows = (row,)  # rows should tuple of row for MySQL-python compatibility.
        return row

    def _finish_unbuffered_query(self):
        """Read and discard packets until the EOF packet of the query."""
        # After much reading on the MySQL protocol, it appears that there is,
        # in fact, no way to stop MySQL from sending all the data after
        # executing a query, so we just spin, and wait for an EOF packet.
        while self.unbuffered_active:
            try:
                packet = self.connection._read_packet()
            except err.OperationalError as e:
                if e.args[0] in (
                    ER.QUERY_TIMEOUT,
                    ER.STATEMENT_TIMEOUT,
                ):
                    # if the query timed out we can simply ignore this error
                    self.unbuffered_active = False
                    self.connection = None
                    return

                raise

            if self._check_packet_is_eof(packet):
                self.unbuffered_active = False
                self.connection = None  # release reference to kill cyclic reference.

    def _read_rowdata_packet(self):
        """Read a rowdata packet for each data row in the result set."""
        rows = []
        while True:
            packet = self.connection._read_packet()
            if self._check_packet_is_eof(packet):
                self.connection = None  # release reference to kill cyclic reference.
                break
            rows.append(self._read_row_from_packet(packet))

        self.affected_rows = len(rows)
        self.rows = tuple(rows)

    def _read_row_from_packet(self, packet):
        """Decode one row packet into a tuple using ``self.converters``."""
        row = []
        for encoding, converter in self.converters:
            try:
                data = packet.read_length_coded_string()
            except IndexError:
                # No more columns in this row
                # See https://github.com/PyMySQL/PyMySQL/pull/434
                break
            if data is not None:
                if encoding is not None:
                    data = data.decode(encoding)
                if DEBUG:
                    print("DEBUG: DATA = ", data)
                if converter is not None:
                    data = converter(data)
            row.append(data)
        return tuple(row)

    def _get_descriptions(self):
        """Read a column descriptor packet for each column in the result."""
        self.fields = []
        self.converters = []
        use_unicode = self.connection.use_unicode
        conn_encoding = self.connection.encoding
        description = []

        for _ in range(self.field_count):
            field = self.connection._read_packet(FieldDescriptorPacket)
            self.fields.append(field)
            description.append(field.description())
            field_type = field.type_code
            if use_unicode:
                if field_type == FIELD_TYPE.JSON:
                    # When SELECT from JSON column: charset = binary
                    # When SELECT CAST(... AS JSON): charset = connection encoding
                    # This behavior is different from TEXT / BLOB.
                    # We should decode result by connection encoding regardless charsetnr.
                    # See https://github.com/PyMySQL/PyMySQL/issues/488
                    encoding = conn_encoding  # SELECT CAST(... AS JSON)
                elif field_type in TEXT_TYPES:
                    if field.charsetnr == 63:  # binary
                        # TEXTs with charset=binary means BINARY types.
                        encoding = None
                    else:
                        encoding = conn_encoding
                else:
                    # Integers, Dates and Times, and other basic data is encoded in ascii
                    encoding = "ascii"
            else:
                encoding = None
            converter = self.connection.decoders.get(field_type)
            if converter is converters.through:
                converter = None
            if DEBUG:
                print(f"DEBUG: field={field}, converter={converter}")
            self.converters.append((encoding, converter))

        eof_packet = self.connection._read_packet()
        assert eof_packet.is_eof_packet(), "Protocol error, expecting EOF"
        self.description = tuple(description)

1401 

1402 

class LoadLocalFile:
    """Sends the content of a local file over a connection in packets."""

    def __init__(self, filename, connection):
        self.connection = connection
        self.filename = filename

    def send_data(self):
        """Send data packets from the local file to the server"""
        link = self.connection
        if not link._sock:
            raise err.InterfaceError(0, "")

        try:
            with open(self.filename, "rb") as source:
                # 16KB is efficient enough
                chunk_size = min(link.max_allowed_packet, 16 * 1024)
                # An empty read marks the end of the file.
                for chunk in iter(lambda: source.read(chunk_size), b""):
                    link.write_packet(chunk)
        except OSError:
            raise err.OperationalError(
                ER.FILE_NOT_FOUND,
                f"Can't find file '{self.filename}'",
            )
        finally:
            # send the empty packet to signify we are done sending data
            if not link._closed:
                link.write_packet(b"")