Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/reflection.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

312 statements  

1# dialects/mysql/reflection.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7from __future__ import annotations 

8 

9import re 

10from typing import Any 

11from typing import Callable 

12from typing import Dict 

13from typing import List 

14from typing import Optional 

15from typing import overload 

16from typing import Sequence 

17from typing import Tuple 

18from typing import TYPE_CHECKING 

19from typing import Union 

20 

21from .enumerated import ENUM 

22from .enumerated import SET 

23from .types import DATETIME 

24from .types import TIME 

25from .types import TIMESTAMP 

26from ... import types as sqltypes 

27from ... import util 

28from ...util.typing import Literal 

29 

30if TYPE_CHECKING: 

31 from .base import MySQLDialect 

32 from .base import MySQLIdentifierPreparer 

33 from ...engine.interfaces import ReflectedColumn 

34 

35 

class ReflectedState:
    """Stores raw information about a SHOW CREATE TABLE statement.

    Instances are plain accumulators: the parser creates one, assigns
    ``charset`` and then fills the collections while walking the
    statement line by line.
    """

    # character set supplied by the caller of the parser, if any
    charset: Optional[str]

    def __init__(self) -> None:
        # Give ``charset`` a value up front so that reading it on a freshly
        # constructed state returns None instead of raising AttributeError.
        self.charset = None
        # one dictionary per reflected column, in statement order
        self.columns: List[ReflectedColumn] = []
        # table-level options keyed by "<dialect name>_<option>"
        self.table_options: Dict[str, str] = {}
        # name taken from the leading CREATE line, when it could be parsed
        self.table_name: Optional[str] = None
        # KEY / INDEX specifications
        self.keys: List[Dict[str, Any]] = []
        # FOREIGN KEY constraint specifications
        self.fk_constraints: List[Dict[str, Any]] = []
        # CHECK constraint specifications
        self.ck_constraints: List[Dict[str, Any]] = []

48 

49 

50class MySQLTableDefinitionParser: 

51 """Parses the results of a SHOW CREATE TABLE statement.""" 

52 

53 def __init__( 

54 self, dialect: MySQLDialect, preparer: MySQLIdentifierPreparer 

55 ): 

56 self.dialect = dialect 

57 self.preparer = preparer 

58 self._prep_regexes() 

59 

60 def parse( 

61 self, show_create: str, charset: Optional[str] 

62 ) -> ReflectedState: 

63 state = ReflectedState() 

64 state.charset = charset 

65 for line in re.split(r"\r?\n", show_create): 

66 if line.startswith(" " + self.preparer.initial_quote): 

67 self._parse_column(line, state) 

68 # a regular table options line 

69 elif line.startswith(") "): 

70 self._parse_table_options(line, state) 

71 # an ANSI-mode table options line 

72 elif line == ")": 

73 pass 

74 elif line.startswith("CREATE "): 

75 self._parse_table_name(line, state) 

76 elif "PARTITION" in line: 

77 self._parse_partition_options(line, state) 

78 # Not present in real reflection, but may be if 

79 # loading from a file. 

80 elif not line: 

81 pass 

82 else: 

83 type_, spec = self._parse_constraints(line) 

84 if type_ is None: 

85 util.warn("Unknown schema content: %r" % line) 

86 elif type_ == "key": 

87 state.keys.append(spec) # type: ignore[arg-type] 

88 elif type_ == "fk_constraint": 

89 state.fk_constraints.append(spec) # type: ignore[arg-type] 

90 elif type_ == "ck_constraint": 

91 state.ck_constraints.append(spec) # type: ignore[arg-type] 

92 else: 

93 pass 

94 return state 

95 

96 def _check_view(self, sql: str) -> bool: 

97 return bool(self._re_is_view.match(sql)) 

98 

99 def _parse_constraints(self, line: str) -> Union[ 

100 Tuple[None, str], 

101 Tuple[Literal["partition"], str], 

102 Tuple[ 

103 Literal["ck_constraint", "fk_constraint", "key"], Dict[str, str] 

104 ], 

105 ]: 

106 """Parse a KEY or CONSTRAINT line. 

107 

108 :param line: A line of SHOW CREATE TABLE output 

109 """ 

110 

111 # KEY 

112 m = self._re_key.match(line) 

113 if m: 

114 spec = m.groupdict() 

115 # convert columns into name, length pairs 

116 # NOTE: we may want to consider SHOW INDEX as the 

117 # format of indexes in MySQL becomes more complex 

118 spec["columns"] = self._parse_keyexprs(spec["columns"]) 

119 if spec["version_sql"]: 

120 m2 = self._re_key_version_sql.match(spec["version_sql"]) 

121 if m2 and m2.groupdict()["parser"]: 

122 spec["parser"] = m2.groupdict()["parser"] 

123 if spec["parser"]: 

124 spec["parser"] = self.preparer.unformat_identifiers( 

125 spec["parser"] 

126 )[0] 

127 return "key", spec 

128 

129 # FOREIGN KEY CONSTRAINT 

130 m = self._re_fk_constraint.match(line) 

131 if m: 

132 spec = m.groupdict() 

133 spec["table"] = self.preparer.unformat_identifiers(spec["table"]) 

134 spec["local"] = [c[0] for c in self._parse_keyexprs(spec["local"])] 

135 spec["foreign"] = [ 

136 c[0] for c in self._parse_keyexprs(spec["foreign"]) 

137 ] 

138 return "fk_constraint", spec 

139 

140 # CHECK constraint 

141 m = self._re_ck_constraint.match(line) 

142 if m: 

143 spec = m.groupdict() 

144 return "ck_constraint", spec 

145 

146 # PARTITION and SUBPARTITION 

147 m = self._re_partition.match(line) 

148 if m: 

149 # Punt! 

150 return "partition", line 

151 

152 # No match. 

153 return (None, line) 

154 

155 def _parse_table_name(self, line: str, state: ReflectedState) -> None: 

156 """Extract the table name. 

157 

158 :param line: The first line of SHOW CREATE TABLE 

159 """ 

160 

161 regex, cleanup = self._pr_name 

162 m = regex.match(line) 

163 if m: 

164 state.table_name = cleanup(m.group("name")) 

165 

166 def _parse_table_options(self, line: str, state: ReflectedState) -> None: 

167 """Build a dictionary of all reflected table-level options. 

168 

169 :param line: The final line of SHOW CREATE TABLE output. 

170 """ 

171 

172 options = {} 

173 

174 if line and line != ")": 

175 rest_of_line = line 

176 for regex, cleanup in self._pr_options: 

177 m = regex.search(rest_of_line) 

178 if not m: 

179 continue 

180 directive, value = m.group("directive"), m.group("val") 

181 if cleanup: 

182 value = cleanup(value) 

183 options[directive.lower()] = value 

184 rest_of_line = regex.sub("", rest_of_line) 

185 

186 for nope in ("auto_increment", "data directory", "index directory"): 

187 options.pop(nope, None) 

188 

189 for opt, val in options.items(): 

190 state.table_options["%s_%s" % (self.dialect.name, opt)] = val 

191 

192 def _parse_partition_options( 

193 self, line: str, state: ReflectedState 

194 ) -> None: 

195 options = {} 

196 new_line = line[:] 

197 

198 while new_line.startswith("(") or new_line.startswith(" "): 

199 new_line = new_line[1:] 

200 

201 for regex, cleanup in self._pr_options: 

202 m = regex.search(new_line) 

203 if not m or "PARTITION" not in regex.pattern: 

204 continue 

205 

206 directive = m.group("directive") 

207 directive = directive.lower() 

208 is_subpartition = directive == "subpartition" 

209 

210 if directive == "partition" or is_subpartition: 

211 new_line = new_line.replace(") */", "") 

212 new_line = new_line.replace(",", "") 

213 if is_subpartition and new_line.endswith(")"): 

214 new_line = new_line[:-1] 

215 if self.dialect.name == "mariadb" and new_line.endswith(")"): 

216 if ( 

217 "MAXVALUE" in new_line 

218 or "MINVALUE" in new_line 

219 or "ENGINE" in new_line 

220 ): 

221 # final line of MariaDB partition endswith ")" 

222 new_line = new_line[:-1] 

223 

224 defs = "%s_%s_definitions" % (self.dialect.name, directive) 

225 options[defs] = new_line 

226 

227 else: 

228 directive = directive.replace(" ", "_") 

229 value = m.group("val") 

230 if cleanup: 

231 value = cleanup(value) 

232 options[directive] = value 

233 break 

234 

235 for opt, val in options.items(): 

236 part_def = "%s_partition_definitions" % (self.dialect.name) 

237 subpart_def = "%s_subpartition_definitions" % (self.dialect.name) 

238 if opt == part_def or opt == subpart_def: 

239 # builds a string of definitions 

240 if opt not in state.table_options: 

241 state.table_options[opt] = val 

242 else: 

243 state.table_options[opt] = "%s, %s" % ( 

244 state.table_options[opt], 

245 val, 

246 ) 

247 else: 

248 state.table_options["%s_%s" % (self.dialect.name, opt)] = val 

249 

250 def _parse_column(self, line: str, state: ReflectedState) -> None: 

251 """Extract column details. 

252 

253 Falls back to a 'minimal support' variant if full parse fails. 

254 

255 :param line: Any column-bearing line from SHOW CREATE TABLE 

256 """ 

257 

258 spec = None 

259 m = self._re_column.match(line) 

260 if m: 

261 spec = m.groupdict() 

262 spec["full"] = True 

263 else: 

264 m = self._re_column_loose.match(line) 

265 if m: 

266 spec = m.groupdict() 

267 spec["full"] = False 

268 if not spec: 

269 util.warn("Unknown column definition %r" % line) 

270 return 

271 if not spec["full"]: 

272 util.warn("Incomplete reflection of column definition %r" % line) 

273 

274 name, type_, args = spec["name"], spec["coltype"], spec["arg"] 

275 

276 try: 

277 col_type = self.dialect.ischema_names[type_] 

278 except KeyError: 

279 util.warn( 

280 "Did not recognize type '%s' of column '%s'" % (type_, name) 

281 ) 

282 col_type = sqltypes.NullType 

283 

284 # Column type positional arguments eg. varchar(32) 

285 if args is None or args == "": 

286 type_args = [] 

287 elif args[0] == "'" and args[-1] == "'": 

288 type_args = self._re_csv_str.findall(args) 

289 else: 

290 type_args = [int(v) for v in self._re_csv_int.findall(args)] 

291 

292 # Column type keyword options 

293 type_kw = {} 

294 

295 if issubclass(col_type, (DATETIME, TIME, TIMESTAMP)): 

296 if type_args: 

297 type_kw["fsp"] = type_args.pop(0) 

298 

299 for kw in ("unsigned", "zerofill"): 

300 if spec.get(kw, False): 

301 type_kw[kw] = True 

302 for kw in ("charset", "collate"): 

303 if spec.get(kw, False): 

304 type_kw[kw] = spec[kw] 

305 if issubclass(col_type, (ENUM, SET)): 

306 type_args = _strip_values(type_args) 

307 

308 if issubclass(col_type, SET) and "" in type_args: 

309 type_kw["retrieve_as_bitwise"] = True 

310 

311 type_instance = col_type(*type_args, **type_kw) 

312 

313 col_kw: Dict[str, Any] = {} 

314 

315 # NOT NULL 

316 col_kw["nullable"] = True 

317 # this can be "NULL" in the case of TIMESTAMP 

318 if spec.get("notnull", False) == "NOT NULL": 

319 col_kw["nullable"] = False 

320 # For generated columns, the nullability is marked in a different place 

321 if spec.get("notnull_generated", False) == "NOT NULL": 

322 col_kw["nullable"] = False 

323 

324 # AUTO_INCREMENT 

325 if spec.get("autoincr", False): 

326 col_kw["autoincrement"] = True 

327 elif issubclass(col_type, sqltypes.Integer): 

328 col_kw["autoincrement"] = False 

329 

330 # DEFAULT 

331 default = spec.get("default", None) 

332 

333 if default == "NULL": 

334 # eliminates the need to deal with this later. 

335 default = None 

336 

337 comment = spec.get("comment", None) 

338 

339 if comment is not None: 

340 comment = cleanup_text(comment) 

341 

342 sqltext = spec.get("generated") 

343 if sqltext is not None: 

344 computed = dict(sqltext=sqltext) 

345 persisted = spec.get("persistence") 

346 if persisted is not None: 

347 computed["persisted"] = persisted == "STORED" 

348 col_kw["computed"] = computed 

349 

350 col_d = dict( 

351 name=name, type=type_instance, default=default, comment=comment 

352 ) 

353 col_d.update(col_kw) 

354 state.columns.append(col_d) # type: ignore[arg-type] 

355 

356 def _describe_to_create( 

357 self, 

358 table_name: str, 

359 columns: Sequence[Tuple[str, str, str, str, str, str]], 

360 ) -> str: 

361 """Re-format DESCRIBE output as a SHOW CREATE TABLE string. 

362 

363 DESCRIBE is a much simpler reflection and is sufficient for 

364 reflecting views for runtime use. This method formats DDL 

365 for columns only- keys are omitted. 

366 

367 :param columns: A sequence of DESCRIBE or SHOW COLUMNS 6-tuples. 

368 SHOW FULL COLUMNS FROM rows must be rearranged for use with 

369 this function. 

370 """ 

371 

372 buffer = [] 

373 for row in columns: 

374 (name, col_type, nullable, default, extra) = ( 

375 row[i] for i in (0, 1, 2, 4, 5) 

376 ) 

377 

378 line = [" "] 

379 line.append(self.preparer.quote_identifier(name)) 

380 line.append(col_type) 

381 if not nullable: 

382 line.append("NOT NULL") 

383 if default: 

384 if "auto_increment" in default: 

385 pass 

386 elif col_type.startswith("timestamp") and default.startswith( 

387 "C" 

388 ): 

389 line.append("DEFAULT") 

390 line.append(default) 

391 elif default == "NULL": 

392 line.append("DEFAULT") 

393 line.append(default) 

394 else: 

395 line.append("DEFAULT") 

396 line.append("'%s'" % default.replace("'", "''")) 

397 if extra: 

398 line.append(extra) 

399 

400 buffer.append(" ".join(line)) 

401 

402 return "".join( 

403 [ 

404 ( 

405 "CREATE TABLE %s (\n" 

406 % self.preparer.quote_identifier(table_name) 

407 ), 

408 ",\n".join(buffer), 

409 "\n) ", 

410 ] 

411 ) 

412 

413 def _parse_keyexprs( 

414 self, identifiers: str 

415 ) -> List[Tuple[str, Optional[int], str]]: 

416 """Unpack '"col"(2),"col" ASC'-ish strings into components.""" 

417 

418 return [ 

419 (colname, int(length) if length else None, modifiers) 

420 for colname, length, modifiers in self._re_keyexprs.findall( 

421 identifiers 

422 ) 

423 ] 

424 

425 def _prep_regexes(self) -> None: 

426 """Pre-compile regular expressions.""" 

427 

428 self._pr_options: List[ 

429 Tuple[re.Pattern[Any], Optional[Callable[[str], str]]] 

430 ] = [] 

431 

432 _final = self.preparer.final_quote 

433 

434 quotes = dict( 

435 zip( 

436 ("iq", "fq", "esc_fq"), 

437 [ 

438 re.escape(s) 

439 for s in ( 

440 self.preparer.initial_quote, 

441 _final, 

442 self.preparer._escape_identifier(_final), 

443 ) 

444 ], 

445 ) 

446 ) 

447 

448 self._pr_name = _pr_compile( 

449 r"^CREATE (?:\w+ +)?TABLE +" 

450 r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +\($" % quotes, 

451 self.preparer._unescape_identifier, 

452 ) 

453 

454 self._re_is_view = _re_compile(r"^CREATE(?! TABLE)(\s.*)?\sVIEW") 

455 

456 # `col`,`col2`(32),`col3`(15) DESC 

457 # 

458 self._re_keyexprs = _re_compile( 

459 r"(?:" 

460 r"(?:%(iq)s((?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)" 

461 r"(?:\((\d+)\))?(?: +(ASC|DESC))?(?=\,|$))+" % quotes 

462 ) 

463 

464 # 'foo' or 'foo','bar' or 'fo,o','ba''a''r' 

465 self._re_csv_str = _re_compile(r"\x27(?:\x27\x27|[^\x27])*\x27") 

466 

467 # 123 or 123,456 

468 self._re_csv_int = _re_compile(r"\d+") 

469 

470 # `colname` <type> [type opts] 

471 # (NOT NULL | NULL) 

472 # DEFAULT ('value' | CURRENT_TIMESTAMP...) 

473 # COMMENT 'comment' 

474 # COLUMN_FORMAT (FIXED|DYNAMIC|DEFAULT) 

475 # STORAGE (DISK|MEMORY) 

476 self._re_column = _re_compile( 

477 r" " 

478 r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +" 

479 r"(?P<coltype>\w+)" 

480 r"(?:\((?P<arg>(?:\d+|\d+,\d+|" 

481 r"(?:'(?:''|[^'])*',?)+))\))?" 

482 r"(?: +(?P<unsigned>UNSIGNED))?" 

483 r"(?: +(?P<zerofill>ZEROFILL))?" 

484 r"(?: +CHARACTER SET +(?P<charset>[\w_]+))?" 

485 r"(?: +COLLATE +(?P<collate>[\w_]+))?" 

486 r"(?: +(?P<notnull>(?:NOT )?NULL))?" 

487 r"(?: +DEFAULT +(?P<default>" 

488 r"(?:NULL|'(?:''|[^'])*'|\(.+?\)|[\-\w\.\(\)]+" 

489 r"(?: +ON UPDATE [\-\w\.\(\)]+)?)" 

490 r"))?" 

491 r"(?: +(?:GENERATED ALWAYS)? ?AS +(?P<generated>\(" 

492 r".*\))? ?(?P<persistence>VIRTUAL|STORED)?" 

493 r"(?: +(?P<notnull_generated>(?:NOT )?NULL))?" 

494 r")?" 

495 r"(?: +(?P<autoincr>AUTO_INCREMENT))?" 

496 r"(?: +COMMENT +'(?P<comment>(?:''|[^'])*)')?" 

497 r"(?: +COLUMN_FORMAT +(?P<colfmt>\w+))?" 

498 r"(?: +STORAGE +(?P<storage>\w+))?" 

499 r"(?: +(?P<extra>.*))?" 

500 r",?$" % quotes 

501 ) 

502 

503 # Fallback, try to parse as little as possible 

504 self._re_column_loose = _re_compile( 

505 r" " 

506 r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +" 

507 r"(?P<coltype>\w+)" 

508 r"(?:\((?P<arg>(?:\d+|\d+,\d+|\x27(?:\x27\x27|[^\x27])+\x27))\))?" 

509 r".*?(?P<notnull>(?:NOT )NULL)?" % quotes 

510 ) 

511 

512 # (PRIMARY|UNIQUE|FULLTEXT|SPATIAL) INDEX `name` (USING (BTREE|HASH))? 

513 # (`col` (ASC|DESC)?, `col` (ASC|DESC)?) 

514 # KEY_BLOCK_SIZE size | WITH PARSER name /*!50100 WITH PARSER name */ 

515 self._re_key = _re_compile( 

516 r" " 

517 r"(?:(?P<type>\S+) )?KEY" 

518 r"(?: +%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)?" 

519 r"(?: +USING +(?P<using_pre>\S+))?" 

520 r" +\((?P<columns>.+?)\)" 

521 r"(?: +USING +(?P<using_post>\S+))?" 

522 r"(?: +KEY_BLOCK_SIZE *[ =]? *(?P<keyblock>\S+))?" 

523 r"(?: +WITH PARSER +(?P<parser>\S+))?" 

524 r"(?: +COMMENT +(?P<comment>(\x27\x27|\x27([^\x27])*?\x27)+))?" 

525 r"(?: +/\*(?P<version_sql>.+)\*/ *)?" 

526 r",?$" % quotes 

527 ) 

528 

529 # https://forums.mysql.com/read.php?20,567102,567111#msg-567111 

530 # It means if the MySQL version >= \d+, execute what's in the comment 

531 self._re_key_version_sql = _re_compile( 

532 r"\!\d+ " r"(?: *WITH PARSER +(?P<parser>\S+) *)?" 

533 ) 

534 

535 # CONSTRAINT `name` FOREIGN KEY (`local_col`) 

536 # REFERENCES `remote` (`remote_col`) 

537 # MATCH FULL | MATCH PARTIAL | MATCH SIMPLE 

538 # ON DELETE CASCADE ON UPDATE RESTRICT 

539 # 

540 # unique constraints come back as KEYs 

541 kw = quotes.copy() 

542 kw["on"] = "RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT" 

543 self._re_fk_constraint = _re_compile( 

544 r" " 

545 r"CONSTRAINT +" 

546 r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +" 

547 r"FOREIGN KEY +" 

548 r"\((?P<local>[^\)]+?)\) REFERENCES +" 

549 r"(?P<table>%(iq)s[^%(fq)s]+%(fq)s" 

550 r"(?:\.%(iq)s[^%(fq)s]+%(fq)s)?) +" 

551 r"\((?P<foreign>(?:%(iq)s[^%(fq)s]+%(fq)s(?: *, *)?)+)\)" 

552 r"(?: +(?P<match>MATCH \w+))?" 

553 r"(?: +ON DELETE (?P<ondelete>%(on)s))?" 

554 r"(?: +ON UPDATE (?P<onupdate>%(on)s))?" % kw 

555 ) 

556 

557 # CONSTRAINT `CONSTRAINT_1` CHECK (`x` > 5)' 

558 # testing on MariaDB 10.2 shows that the CHECK constraint 

559 # is returned on a line by itself, so to match without worrying 

560 # about parenthesis in the expression we go to the end of the line 

561 self._re_ck_constraint = _re_compile( 

562 r" " 

563 r"CONSTRAINT +" 

564 r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +" 

565 r"CHECK +" 

566 r"\((?P<sqltext>.+)\),?" % kw 

567 ) 

568 

569 # PARTITION 

570 # 

571 # punt! 

572 self._re_partition = _re_compile(r"(?:.*)(?:SUB)?PARTITION(?:.*)") 

573 

574 # Table-level options (COLLATE, ENGINE, etc.) 

575 # Do the string options first, since they have quoted 

576 # strings we need to get rid of. 

577 for option in _options_of_type_string: 

578 self._add_option_string(option) 

579 

580 for option in ( 

581 "ENGINE", 

582 "TYPE", 

583 "AUTO_INCREMENT", 

584 "AVG_ROW_LENGTH", 

585 "CHARACTER SET", 

586 "DEFAULT CHARSET", 

587 "CHECKSUM", 

588 "COLLATE", 

589 "DELAY_KEY_WRITE", 

590 "INSERT_METHOD", 

591 "MAX_ROWS", 

592 "MIN_ROWS", 

593 "PACK_KEYS", 

594 "ROW_FORMAT", 

595 "KEY_BLOCK_SIZE", 

596 "STATS_SAMPLE_PAGES", 

597 ): 

598 self._add_option_word(option) 

599 

600 for option in ( 

601 "PARTITION BY", 

602 "SUBPARTITION BY", 

603 "PARTITIONS", 

604 "SUBPARTITIONS", 

605 "PARTITION", 

606 "SUBPARTITION", 

607 ): 

608 self._add_partition_option_word(option) 

609 

610 self._add_option_regex("UNION", r"\([^\)]+\)") 

611 self._add_option_regex("TABLESPACE", r".*? STORAGE DISK") 

612 self._add_option_regex( 

613 "RAID_TYPE", 

614 r"\w+\s+RAID_CHUNKS\s*\=\s*\w+RAID_CHUNKSIZE\s*=\s*\w+", 

615 ) 

616 

    # Separator between an option name and its value: either an "=" with
    # optional surrounding whitespace, or plain whitespace.  Interpolated
    # into the option patterns built by the _add_option_* methods.
    _optional_equals = r"(?:\s*(?:=\s*)|\s+)"

618 

619 def _add_option_string(self, directive: str) -> None: 

620 regex = r"(?P<directive>%s)%s" r"'(?P<val>(?:[^']|'')*?)'(?!')" % ( 

621 re.escape(directive), 

622 self._optional_equals, 

623 ) 

624 self._pr_options.append(_pr_compile(regex, cleanup_text)) 

625 

626 def _add_option_word(self, directive: str) -> None: 

627 regex = r"(?P<directive>%s)%s" r"(?P<val>\w+)" % ( 

628 re.escape(directive), 

629 self._optional_equals, 

630 ) 

631 self._pr_options.append(_pr_compile(regex)) 

632 

633 def _add_partition_option_word(self, directive: str) -> None: 

634 if directive == "PARTITION BY" or directive == "SUBPARTITION BY": 

635 regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\w+.*)" % ( 

636 re.escape(directive), 

637 self._optional_equals, 

638 ) 

639 elif directive == "SUBPARTITIONS" or directive == "PARTITIONS": 

640 regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\d+)" % ( 

641 re.escape(directive), 

642 self._optional_equals, 

643 ) 

644 else: 

645 regex = r"(?<!\S)(?P<directive>%s)(?!\S)" % (re.escape(directive),) 

646 self._pr_options.append(_pr_compile(regex)) 

647 

648 def _add_option_regex(self, directive: str, regex: str) -> None: 

649 regex = r"(?P<directive>%s)%s" r"(?P<val>%s)" % ( 

650 re.escape(directive), 

651 self._optional_equals, 

652 regex, 

653 ) 

654 self._pr_options.append(_pr_compile(regex)) 

655 

656 

# Table-level options whose values are quoted strings.  The parser registers
# each of these with _add_option_string(), ahead of the word-valued options.
_options_of_type_string = (
    "COMMENT",
    "DATA DIRECTORY",
    "INDEX DIRECTORY",
    "PASSWORD",
    "CONNECTION",
)

664 

665 

@overload
def _pr_compile(
    regex: str, cleanup: Callable[[str], str]
) -> Tuple[re.Pattern[Any], Callable[[str], str]]: ...


@overload
def _pr_compile(
    regex: str, cleanup: None = None
) -> Tuple[re.Pattern[Any], None]: ...


def _pr_compile(
    regex: str, cleanup: Optional[Callable[[str], str]] = None
) -> Tuple[re.Pattern[Any], Optional[Callable[[str], str]]]:
    """Pair a compiled pattern with an optional value post-processor.

    :param regex: pattern source, compiled via ``_re_compile``
    :param cleanup: callable applied by the parser to captured values, or
     ``None`` when values are used as captured
    :return: the ``(pattern, cleanup)`` 2-tuple
    """

    compiled = _re_compile(regex)
    return compiled, cleanup

684 

685 

686def _re_compile(regex: str) -> re.Pattern[Any]: 

687 """Compile a string to regex, I and UNICODE.""" 

688 

689 return re.compile(regex, re.I | re.UNICODE) 

690 

691 

692def _strip_values(values: Sequence[str]) -> List[str]: 

693 "Strip reflected values quotes" 

694 strip_values: List[str] = [] 

695 for a in values: 

696 if a[0:1] == '"' or a[0:1] == "'": 

697 # strip enclosing quotes and unquote interior 

698 a = a[1:-1].replace(a[0] * 2, a[0]) 

699 strip_values.append(a) 

700 return strip_values 

701 

702 

def cleanup_text(raw_text: str) -> str:
    """Decode backslash escapes and doubled single quotes in ``raw_text``.

    Escape sequences listed in ``_control_char_map`` are replaced first
    (only when the text contains a backslash at all); afterwards every
    ``''`` pair is collapsed to a single ``'``.
    """
    text = raw_text
    if "\\" in text:
        text = _control_char_regexp.sub(
            lambda found: _control_char_map[found[0]],  # type: ignore[index]
            text,
        )
    return text.replace("''", "'")

711 

712 

713_control_char_map = { 

714 "\\\\": "\\", 

715 "\\0": "\0", 

716 "\\a": "\a", 

717 "\\b": "\b", 

718 "\\t": "\t", 

719 "\\n": "\n", 

720 "\\v": "\v", 

721 "\\f": "\f", 

722 "\\r": "\r", 

723 # '\\e':'\e', 

724} 

725_control_char_regexp = re.compile( 

726 "|".join(re.escape(k) for k in _control_char_map) 

727)