1# dialects/mysql/reflection.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7from __future__ import annotations
8
9import re
10from typing import Any
11from typing import Callable
12from typing import Dict
13from typing import List
14from typing import Optional
15from typing import overload
16from typing import Sequence
17from typing import Tuple
18from typing import TYPE_CHECKING
19from typing import Union
20
21from .enumerated import ENUM
22from .enumerated import SET
23from .types import DATETIME
24from .types import TIME
25from .types import TIMESTAMP
26from ... import types as sqltypes
27from ... import util
28from ...util.typing import Literal
29
30if TYPE_CHECKING:
31 from .base import MySQLDialect
32 from .base import MySQLIdentifierPreparer
33 from ...engine.interfaces import ReflectedColumn
34
35
class ReflectedState:
    """Stores raw information about a SHOW CREATE TABLE statement.

    A fresh instance is an empty container: every collection starts out
    empty and ``table_name`` / ``charset`` start out as ``None``.
    """

    # Character set associated with the reflected statement, if any.
    charset: Optional[str]

    def __init__(self) -> None:
        # Initialise ``charset`` explicitly so the attribute can always be
        # read, even before anything has assigned it.
        self.charset = None
        # One entry per reflected column definition.
        self.columns: List[ReflectedColumn] = []
        # Table-level options, keyed by option name.
        self.table_options: Dict[str, str] = {}
        self.table_name: Optional[str] = None
        # Raw specs for KEY lines, FOREIGN KEY constraints and CHECK
        # constraints respectively.
        self.keys: List[Dict[str, Any]] = []
        self.fk_constraints: List[Dict[str, Any]] = []
        self.ck_constraints: List[Dict[str, Any]] = []
48
49
class MySQLTableDefinitionParser:
    """Parses the results of a SHOW CREATE TABLE statement.

    The statement is processed one line at a time.  Column, key and
    constraint lines are recognised by a two-space indent, which is the
    layout :meth:`_describe_to_create` produces for its synthesized DDL;
    :meth:`parse` and the regular expressions built in
    :meth:`_prep_regexes` must all agree on that indent.
    """

    def __init__(
        self, dialect: MySQLDialect, preparer: MySQLIdentifierPreparer
    ):
        self.dialect = dialect
        self.preparer = preparer
        self._prep_regexes()

    def parse(
        self, show_create: str, charset: Optional[str]
    ) -> ReflectedState:
        """Parse SHOW CREATE TABLE text into a :class:`ReflectedState`.

        :param show_create: The full statement text; lines may be
         separated by ``\\n`` or ``\\r\\n``.
        :param charset: Stored unchanged on the returned state.
        :return: The populated :class:`ReflectedState`.
        """
        state = ReflectedState()
        state.charset = charset

        # Column lines are indented by two spaces and begin with a quoted
        # identifier, matching what _describe_to_create() emits.
        column_prefix = "  " + self.preparer.initial_quote

        for line in re.split(r"\r?\n", show_create):
            if line.startswith(column_prefix):
                self._parse_column(line, state)
            # a regular table options line
            elif line.startswith(") "):
                self._parse_table_options(line, state)
            # an ANSI-mode table options line
            elif line == ")":
                pass
            elif line.startswith("CREATE "):
                self._parse_table_name(line, state)
            elif "PARTITION" in line:
                self._parse_partition_options(line, state)
            # Not present in real reflection, but may be if
            # loading from a file.
            elif not line:
                pass
            else:
                type_, spec = self._parse_constraints(line)
                if type_ is None:
                    util.warn("Unknown schema content: %r" % line)
                elif type_ == "key":
                    state.keys.append(spec)  # type: ignore[arg-type]
                elif type_ == "fk_constraint":
                    state.fk_constraints.append(spec)  # type: ignore[arg-type]  # noqa: E501
                elif type_ == "ck_constraint":
                    state.ck_constraints.append(spec)  # type: ignore[arg-type]  # noqa: E501
                else:
                    # "partition" results are deliberately ignored here
                    pass
        return state

    def _check_view(self, sql: str) -> bool:
        """Return True if ``sql`` starts like a CREATE ... VIEW statement."""
        return bool(self._re_is_view.match(sql))

    def _parse_constraints(self, line: str) -> Union[
        Tuple[None, str],
        Tuple[Literal["partition"], str],
        Tuple[
            Literal["ck_constraint", "fk_constraint", "key"], Dict[str, str]
        ],
    ]:
        """Parse a KEY or CONSTRAINT line.

        :param line: A line of SHOW CREATE TABLE output
        :return: A 2-tuple of ``(kind, payload)``.  ``kind`` is ``"key"``,
         ``"fk_constraint"`` or ``"ck_constraint"`` with the regex group
         dictionary as payload, ``"partition"`` with the raw line, or
         ``None`` with the raw line when nothing matched.
        """

        # KEY
        m = self._re_key.match(line)
        if m:
            spec = m.groupdict()
            # convert columns into name, length pairs
            # NOTE: we may want to consider SHOW INDEX as the
            # format of indexes in MySQL becomes more complex
            spec["columns"] = self._parse_keyexprs(spec["columns"])
            if spec["version_sql"]:
                # a parser named inside a /*!NNNNN ... */ version comment
                # takes precedence over one matched outside of it
                m2 = self._re_key_version_sql.match(spec["version_sql"])
                if m2 and m2.groupdict()["parser"]:
                    spec["parser"] = m2.groupdict()["parser"]
            if spec["parser"]:
                spec["parser"] = self.preparer.unformat_identifiers(
                    spec["parser"]
                )[0]
            return "key", spec

        # FOREIGN KEY CONSTRAINT
        m = self._re_fk_constraint.match(line)
        if m:
            spec = m.groupdict()
            spec["table"] = self.preparer.unformat_identifiers(spec["table"])
            # only the column names are kept; length / ordering are dropped
            spec["local"] = [c[0] for c in self._parse_keyexprs(spec["local"])]
            spec["foreign"] = [
                c[0] for c in self._parse_keyexprs(spec["foreign"])
            ]
            return "fk_constraint", spec

        # CHECK constraint
        m = self._re_ck_constraint.match(line)
        if m:
            spec = m.groupdict()
            return "ck_constraint", spec

        # PARTITION and SUBPARTITION
        m = self._re_partition.match(line)
        if m:
            # Punt!
            return "partition", line

        # No match.
        return (None, line)

    def _parse_table_name(self, line: str, state: ReflectedState) -> None:
        """Extract the table name.

        ``state.table_name`` is left untouched when the line does not
        match the CREATE TABLE pattern.

        :param line: The first line of SHOW CREATE TABLE
        :param state: Receives the unescaped name as ``table_name``.
        """

        regex, cleanup = self._pr_name
        m = regex.match(line)
        if m:
            state.table_name = cleanup(m.group("name"))

    def _parse_table_options(self, line: str, state: ReflectedState) -> None:
        """Build a dictionary of all reflected table-level options.

        Each registered option regex is searched in turn; a match is
        recorded under the lower-cased directive and then removed from
        the remaining text so later patterns cannot match inside it.

        :param line: The final line of SHOW CREATE TABLE output.
        :param state: Receives the options in ``table_options``, keyed as
         ``<dialect name>_<directive>``.
        """

        options = {}

        if line and line != ")":
            rest_of_line = line
            for regex, cleanup in self._pr_options:
                m = regex.search(rest_of_line)
                if not m:
                    continue
                directive, value = m.group("directive"), m.group("val")
                if cleanup:
                    value = cleanup(value)
                options[directive.lower()] = value
                rest_of_line = regex.sub("", rest_of_line)

        # these directives are parsed (so their text is consumed above)
        # but are not reported as table options
        for nope in ("auto_increment", "data directory", "index directory"):
            options.pop(nope, None)

        for opt, val in options.items():
            state.table_options["%s_%s" % (self.dialect.name, opt)] = val

    def _parse_partition_options(
        self, line: str, state: ReflectedState
    ) -> None:
        """Record the partitioning information carried by one line.

        Only the first matching partition-related option regex is applied.
        PARTITION / SUBPARTITION definition lines are accumulated into a
        single comma-separated string per kind; the remaining partition
        directives are stored as individual options.

        :param line: A line of SHOW CREATE TABLE output containing
         ``PARTITION``.
        :param state: Receives the result in ``table_options``.
        """
        options = {}

        # drop any leading run of "(" and " " characters
        new_line = line.lstrip("( ")

        for regex, cleanup in self._pr_options:
            m = regex.search(new_line)
            if not m or "PARTITION" not in regex.pattern:
                continue

            directive = m.group("directive").lower()
            is_subpartition = directive == "subpartition"

            if directive == "partition" or is_subpartition:
                new_line = new_line.replace(") */", "")
                new_line = new_line.replace(",", "")
                if is_subpartition and new_line.endswith(")"):
                    new_line = new_line[:-1]
                if self.dialect.name == "mariadb" and new_line.endswith(")"):
                    if (
                        "MAXVALUE" in new_line
                        or "MINVALUE" in new_line
                        or "ENGINE" in new_line
                    ):
                        # final line of MariaDB partition endswith ")"
                        new_line = new_line[:-1]

                defs = "%s_%s_definitions" % (self.dialect.name, directive)
                options[defs] = new_line

            else:
                directive = directive.replace(" ", "_")
                value = m.group("val")
                if cleanup:
                    value = cleanup(value)
                options[directive] = value
            break

        part_def = "%s_partition_definitions" % (self.dialect.name)
        subpart_def = "%s_subpartition_definitions" % (self.dialect.name)
        for opt, val in options.items():
            if opt == part_def or opt == subpart_def:
                # builds a string of definitions
                if opt not in state.table_options:
                    state.table_options[opt] = val
                else:
                    state.table_options[opt] = "%s, %s" % (
                        state.table_options[opt],
                        val,
                    )
            else:
                state.table_options["%s_%s" % (self.dialect.name, opt)] = val

    def _parse_column(self, line: str, state: ReflectedState) -> None:
        """Extract column details.

        Falls back to a 'minimal support' variant if full parse fails.
        A warning is emitted when only the fallback matched, when nothing
        matched (the line is then skipped), or when the type name is not
        present in the dialect's ``ischema_names``.

        :param line: Any column-bearing line from SHOW CREATE TABLE
        :param state: Receives the column dictionary in ``columns``.
        """

        spec = None
        m = self._re_column.match(line)
        if m:
            spec = m.groupdict()
            spec["full"] = True
        else:
            m = self._re_column_loose.match(line)
            if m:
                spec = m.groupdict()
                spec["full"] = False
        if not spec:
            util.warn("Unknown column definition %r" % line)
            return
        if not spec["full"]:
            util.warn("Incomplete reflection of column definition %r" % line)

        name, type_, args = spec["name"], spec["coltype"], spec["arg"]

        try:
            col_type = self.dialect.ischema_names[type_]
        except KeyError:
            util.warn(
                "Did not recognize type '%s' of column '%s'" % (type_, name)
            )
            col_type = sqltypes.NullType

        # Column type positional arguments eg. varchar(32)
        if args is None or args == "":
            type_args = []
        elif args[0] == "'" and args[-1] == "'":
            type_args = self._re_csv_str.findall(args)
        else:
            type_args = [int(v) for v in self._re_csv_int.findall(args)]

        # Column type keyword options
        type_kw = {}

        if issubclass(col_type, (DATETIME, TIME, TIMESTAMP)):
            # the single numeric argument of these types is passed as "fsp"
            if type_args:
                type_kw["fsp"] = type_args.pop(0)

        for kw in ("unsigned", "zerofill"):
            if spec.get(kw, False):
                type_kw[kw] = True
        for kw in ("charset", "collate"):
            if spec.get(kw, False):
                type_kw[kw] = spec[kw]
        if issubclass(col_type, (ENUM, SET)):
            type_args = _strip_values(type_args)

            if issubclass(col_type, SET) and "" in type_args:
                type_kw["retrieve_as_bitwise"] = True

        type_instance = col_type(*type_args, **type_kw)

        col_kw: Dict[str, Any] = {}

        # NOT NULL
        col_kw["nullable"] = True
        # this can be "NULL" in the case of TIMESTAMP
        if spec.get("notnull", False) == "NOT NULL":
            col_kw["nullable"] = False
        # For generated columns, the nullability is marked in a different place
        if spec.get("notnull_generated", False) == "NOT NULL":
            col_kw["nullable"] = False

        # AUTO_INCREMENT
        if spec.get("autoincr", False):
            col_kw["autoincrement"] = True
        elif issubclass(col_type, sqltypes.Integer):
            col_kw["autoincrement"] = False

        # DEFAULT
        default = spec.get("default", None)

        if default == "NULL":
            # eliminates the need to deal with this later.
            default = None

        comment = spec.get("comment", None)

        if comment is not None:
            comment = cleanup_text(comment)

        sqltext = spec.get("generated")
        if sqltext is not None:
            computed = dict(sqltext=sqltext)
            persisted = spec.get("persistence")
            if persisted is not None:
                computed["persisted"] = persisted == "STORED"
            col_kw["computed"] = computed

        col_d = dict(
            name=name, type=type_instance, default=default, comment=comment
        )
        col_d.update(col_kw)
        state.columns.append(col_d)  # type: ignore[arg-type]

    def _describe_to_create(
        self,
        table_name: str,
        columns: Sequence[Tuple[str, str, str, str, str, str]],
    ) -> str:
        """Re-format DESCRIBE output as a SHOW CREATE TABLE string.

        DESCRIBE is a much simpler reflection and is sufficient for
        reflecting views for runtime use.  This method formats DDL
        for columns only- keys are omitted.

        Every column line is emitted with a two-space indent, which is the
        layout :meth:`parse` looks for.

        :param table_name: Name placed, quoted, in the CREATE TABLE line.
        :param columns: A sequence of DESCRIBE or SHOW COLUMNS 6-tuples.
          SHOW FULL COLUMNS FROM rows must be rearranged for use with
          this function.
        :return: The synthesized CREATE TABLE text.
        """

        buffer = []
        for row in columns:
            # index 3 of each row is not used
            (name, col_type, nullable, default, extra) = (
                row[i] for i in (0, 1, 2, 4, 5)
            )

            # the leading " " element plus the " " join separator yields
            # the two-space indent of a column line
            line = [" "]
            line.append(self.preparer.quote_identifier(name))
            line.append(col_type)
            # NOTE(review): this is a plain truthiness test, so any
            # non-empty string counts as nullable -- confirm that callers
            # pass a normalised value here
            if not nullable:
                line.append("NOT NULL")
            if default:
                if "auto_increment" in default:
                    pass
                elif col_type.startswith("timestamp") and default.startswith(
                    "C"
                ):
                    line.append("DEFAULT")
                    line.append(default)
                elif default == "NULL":
                    line.append("DEFAULT")
                    line.append(default)
                else:
                    line.append("DEFAULT")
                    line.append("'%s'" % default.replace("'", "''"))
            if extra:
                line.append(extra)

            buffer.append(" ".join(line))

        return "".join(
            [
                (
                    "CREATE TABLE %s (\n"
                    % self.preparer.quote_identifier(table_name)
                ),
                ",\n".join(buffer),
                "\n) ",
            ]
        )

    def _parse_keyexprs(
        self, identifiers: str
    ) -> List[Tuple[str, Optional[int], str]]:
        """Unpack '"col"(2),"col" ASC'-ish strings into components.

        :return: A list of ``(column name, length or None, modifier)``
         tuples; the modifier is an empty string when none was given.
        """

        return [
            (colname, int(length) if length else None, modifiers)
            for colname, length, modifiers in self._re_keyexprs.findall(
                identifiers
            )
        ]

    def _prep_regexes(self) -> None:
        """Pre-compile regular expressions.

        The column, key and constraint patterns all begin with a literal
        two-space indent; this has to stay in step with the prefix test in
        :meth:`parse` and the output of :meth:`_describe_to_create`.
        """

        self._pr_options: List[
            Tuple[re.Pattern[Any], Optional[Callable[[str], str]]]
        ] = []

        _final = self.preparer.final_quote

        # regex-escaped opening quote, closing quote and escaped closing
        # quote, interpolated into the patterns below
        quotes = dict(
            zip(
                ("iq", "fq", "esc_fq"),
                [
                    re.escape(s)
                    for s in (
                        self.preparer.initial_quote,
                        _final,
                        self.preparer._escape_identifier(_final),
                    )
                ],
            )
        )

        self._pr_name = _pr_compile(
            r"^CREATE (?:\w+ +)?TABLE +"
            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +\($" % quotes,
            self.preparer._unescape_identifier,
        )

        self._re_is_view = _re_compile(r"^CREATE(?! TABLE)(\s.*)?\sVIEW")

        # `col`,`col2`(32),`col3`(15) DESC
        #
        self._re_keyexprs = _re_compile(
            r"(?:"
            r"(?:%(iq)s((?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)"
            r"(?:\((\d+)\))?(?: +(ASC|DESC))?(?=\,|$))+" % quotes
        )

        # 'foo' or 'foo','bar' or 'fo,o','ba''a''r'
        self._re_csv_str = _re_compile(r"\x27(?:\x27\x27|[^\x27])*\x27")

        # 123 or 123,456
        self._re_csv_int = _re_compile(r"\d+")

        # `colname` <type> [type opts]
        #  (NOT NULL | NULL)
        #   DEFAULT ('value' | CURRENT_TIMESTAMP...)
        #   COMMENT 'comment'
        #  COLUMN_FORMAT (FIXED|DYNAMIC|DEFAULT)
        #  STORAGE (DISK|MEMORY)
        self._re_column = _re_compile(
            r"  "
            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
            r"(?P<coltype>\w+)"
            r"(?:\((?P<arg>(?:\d+|\d+,\d+|"
            r"(?:'(?:''|[^'])*',?)+))\))?"
            r"(?: +(?P<unsigned>UNSIGNED))?"
            r"(?: +(?P<zerofill>ZEROFILL))?"
            r"(?: +CHARACTER SET +(?P<charset>[\w_]+))?"
            r"(?: +COLLATE +(?P<collate>[\w_]+))?"
            r"(?: +(?P<notnull>(?:NOT )?NULL))?"
            r"(?: +DEFAULT +(?P<default>"
            r"(?:NULL|'(?:''|[^'])*'|\(.+?\)|[\-\w\.\(\)]+"
            r"(?: +ON UPDATE [\-\w\.\(\)]+)?)"
            r"))?"
            r"(?: +(?:GENERATED ALWAYS)? ?AS +(?P<generated>\("
            r".*\))? ?(?P<persistence>VIRTUAL|STORED)?"
            r"(?: +(?P<notnull_generated>(?:NOT )?NULL))?"
            r")?"
            r"(?: +(?P<autoincr>AUTO_INCREMENT))?"
            r"(?: +COMMENT +'(?P<comment>(?:''|[^'])*)')?"
            r"(?: +COLUMN_FORMAT +(?P<colfmt>\w+))?"
            r"(?: +STORAGE +(?P<storage>\w+))?"
            r"(?: +(?P<extra>.*))?"
            r",?$" % quotes
        )

        # Fallback, try to parse as little as possible
        self._re_column_loose = _re_compile(
            r"  "
            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
            r"(?P<coltype>\w+)"
            r"(?:\((?P<arg>(?:\d+|\d+,\d+|\x27(?:\x27\x27|[^\x27])+\x27))\))?"
            r".*?(?P<notnull>(?:NOT )NULL)?" % quotes
        )

        # (PRIMARY|UNIQUE|FULLTEXT|SPATIAL) INDEX `name` (USING (BTREE|HASH))?
        # (`col` (ASC|DESC)?, `col` (ASC|DESC)?)
        # KEY_BLOCK_SIZE size | WITH PARSER name  /*!50100 WITH PARSER name */
        self._re_key = _re_compile(
            r"  "
            r"(?:(?P<type>\S+) )?KEY"
            r"(?: +%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)?"
            r"(?: +USING +(?P<using_pre>\S+))?"
            r" +\((?P<columns>.+?)\)"
            r"(?: +USING +(?P<using_post>\S+))?"
            r"(?: +KEY_BLOCK_SIZE *[ =]? *(?P<keyblock>\S+))?"
            r"(?: +WITH PARSER +(?P<parser>\S+))?"
            r"(?: +COMMENT +(?P<comment>(\x27\x27|\x27([^\x27])*?\x27)+))?"
            r"(?: +/\*(?P<version_sql>.+)\*/ *)?"
            r",?$" % quotes
        )

        # https://forums.mysql.com/read.php?20,567102,567111#msg-567111
        # It means if the MySQL version >= \d+, execute what's in the comment
        self._re_key_version_sql = _re_compile(
            r"\!\d+ " r"(?: *WITH PARSER +(?P<parser>\S+) *)?"
        )

        # CONSTRAINT `name` FOREIGN KEY (`local_col`)
        # REFERENCES `remote` (`remote_col`)
        # MATCH FULL | MATCH PARTIAL | MATCH SIMPLE
        # ON DELETE CASCADE ON UPDATE RESTRICT
        #
        # unique constraints come back as KEYs
        kw = quotes.copy()
        kw["on"] = "RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT"
        self._re_fk_constraint = _re_compile(
            r"  "
            r"CONSTRAINT +"
            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
            r"FOREIGN KEY +"
            r"\((?P<local>[^\)]+?)\) REFERENCES +"
            r"(?P<table>%(iq)s[^%(fq)s]+%(fq)s"
            r"(?:\.%(iq)s[^%(fq)s]+%(fq)s)?) +"
            r"\((?P<foreign>(?:%(iq)s[^%(fq)s]+%(fq)s(?: *, *)?)+)\)"
            r"(?: +(?P<match>MATCH \w+))?"
            r"(?: +ON DELETE (?P<ondelete>%(on)s))?"
            r"(?: +ON UPDATE (?P<onupdate>%(on)s))?" % kw
        )

        # CONSTRAINT `CONSTRAINT_1` CHECK (`x` > 5)'
        # testing on MariaDB 10.2 shows that the CHECK constraint
        # is returned on a line by itself, so to match without worrying
        # about parenthesis in the expression we go to the end of the line
        self._re_ck_constraint = _re_compile(
            r"  "
            r"CONSTRAINT +"
            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
            r"CHECK +"
            r"\((?P<sqltext>.+)\),?" % kw
        )

        # PARTITION
        #
        # punt!
        self._re_partition = _re_compile(r"(?:.*)(?:SUB)?PARTITION(?:.*)")

        # Table-level options (COLLATE, ENGINE, etc.)
        # Do the string options first, since they have quoted
        # strings we need to get rid of.
        for option in _options_of_type_string:
            self._add_option_string(option)

        for option in (
            "ENGINE",
            "TYPE",
            "AUTO_INCREMENT",
            "AVG_ROW_LENGTH",
            "CHARACTER SET",
            "DEFAULT CHARSET",
            "CHECKSUM",
            "COLLATE",
            "DELAY_KEY_WRITE",
            "INSERT_METHOD",
            "MAX_ROWS",
            "MIN_ROWS",
            "PACK_KEYS",
            "ROW_FORMAT",
            "KEY_BLOCK_SIZE",
            "STATS_SAMPLE_PAGES",
        ):
            self._add_option_word(option)

        for option in (
            "PARTITION BY",
            "SUBPARTITION BY",
            "PARTITIONS",
            "SUBPARTITIONS",
            "PARTITION",
            "SUBPARTITION",
        ):
            self._add_partition_option_word(option)

        self._add_option_regex("UNION", r"\([^\)]+\)")
        self._add_option_regex("TABLESPACE", r".*? STORAGE DISK")
        # NOTE(review): there is no whitespace token between the
        # RAID_CHUNKS value and RAID_CHUNKSIZE, so this pattern looks
        # unable to match space-separated options -- confirm before
        # relying on it.
        self._add_option_regex(
            "RAID_TYPE",
            r"\w+\s+RAID_CHUNKS\s*\=\s*\w+RAID_CHUNKSIZE\s*=\s*\w+",
        )

    # separator between a directive and its value: an optional "=" with
    # optional surrounding whitespace, or whitespace alone
    _optional_equals = r"(?:\s*(?:=\s*)|\s+)"

    def _add_option_string(self, directive: str) -> None:
        """Register an option whose value is a single-quoted string.

        The captured value is passed through ``cleanup_text``.
        """
        regex = r"(?P<directive>%s)%s" r"'(?P<val>(?:[^']|'')*?)'(?!')" % (
            re.escape(directive),
            self._optional_equals,
        )
        self._pr_options.append(_pr_compile(regex, cleanup_text))

    def _add_option_word(self, directive: str) -> None:
        """Register an option whose value is a single ``\\w+`` word."""
        regex = r"(?P<directive>%s)%s" r"(?P<val>\w+)" % (
            re.escape(directive),
            self._optional_equals,
        )
        self._pr_options.append(_pr_compile(regex))

    def _add_partition_option_word(self, directive: str) -> None:
        """Register a partition-related option.

        ``PARTITION BY`` / ``SUBPARTITION BY`` capture the rest of the
        line, ``PARTITIONS`` / ``SUBPARTITIONS`` capture a number, and any
        other directive is matched as a standalone word without a value
        group.
        """
        if directive == "PARTITION BY" or directive == "SUBPARTITION BY":
            regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\w+.*)" % (
                re.escape(directive),
                self._optional_equals,
            )
        elif directive == "SUBPARTITIONS" or directive == "PARTITIONS":
            regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\d+)" % (
                re.escape(directive),
                self._optional_equals,
            )
        else:
            regex = r"(?<!\S)(?P<directive>%s)(?!\S)" % (re.escape(directive),)
        self._pr_options.append(_pr_compile(regex))

    def _add_option_regex(self, directive: str, regex: str) -> None:
        """Register an option whose value matches the given ``regex``."""
        regex = r"(?P<directive>%s)%s" r"(?P<val>%s)" % (
            re.escape(directive),
            self._optional_equals,
            regex,
        )
        self._pr_options.append(_pr_compile(regex))
655
656
# Table option directives whose values are single-quoted strings.
# MySQLTableDefinitionParser._prep_regexes() registers each of these with
# _add_option_string(), ahead of the word-valued options.
_options_of_type_string = (
    "COMMENT",
    "DATA DIRECTORY",
    "INDEX DIRECTORY",
    "PASSWORD",
    "CONNECTION",
)
664
665
@overload
def _pr_compile(
    regex: str, cleanup: Callable[[str], str]
) -> Tuple[re.Pattern[Any], Callable[[str], str]]: ...


@overload
def _pr_compile(
    regex: str, cleanup: None = None
) -> Tuple[re.Pattern[Any], None]: ...


def _pr_compile(
    regex: str, cleanup: Optional[Callable[[str], str]] = None
) -> Tuple[re.Pattern[Any], Optional[Callable[[str], str]]]:
    """Pair a compiled pattern with an optional value clean-up callable.

    :param regex: Pattern source, compiled via ``_re_compile``.
    :param cleanup: Returned unchanged as the second tuple member; may be
     ``None``.
    """

    pattern = _re_compile(regex)
    return pattern, cleanup
684
685
def _re_compile(regex: str) -> re.Pattern[Any]:
    """Compile ``regex`` with the IGNORECASE and UNICODE flags set."""

    flags = re.IGNORECASE | re.UNICODE
    return re.compile(regex, flags)
690
691
def _strip_values(values: Sequence[str]) -> List[str]:
    """Return ``values`` with enclosing quotes removed.

    A value that begins with ``'`` or ``"`` loses its first and last
    character, and doubled occurrences of that quote character inside it
    are collapsed to one.  All other values pass through unchanged.
    """

    def _unquote(value: str) -> str:
        quote = value[:1]
        if quote == "'" or quote == '"':
            # strip enclosing quotes and unquote interior
            return value[1:-1].replace(quote * 2, quote)
        return value

    return [_unquote(value) for value in values]
701
702
def cleanup_text(raw_text: str) -> str:
    """Undo the escaping of a reflected string value.

    Backslash sequences listed in ``_control_char_map`` are replaced by
    the characters they stand for, then doubled single quotes are
    collapsed to a single quote.
    """
    if "\\" not in raw_text:
        # nothing backslash-escaped; only the quote doubling can apply
        return raw_text.replace("''", "'")

    unescaped = _control_char_regexp.sub(
        lambda match: _control_char_map[match.group(0)],
        raw_text,
    )
    return unescaped.replace("''", "'")
711
712
# Two-character backslash sequences recognised by cleanup_text(), mapped
# to the single character each one stands for.
_control_char_map = {
    "\\\\": "\\",
    "\\0": "\0",
    "\\a": "\a",
    "\\b": "\b",
    "\\t": "\t",
    "\\n": "\n",
    "\\v": "\v",
    "\\f": "\f",
    "\\r": "\r",
    # '\\e':'\e',
}

# Alternation of every key above, each escaped so that it matches
# literally.
_control_char_regexp = re.compile(
    "|".join(map(re.escape, _control_char_map))
)