1from __future__ import annotations
2
3import importlib
4import logging
5import unicodedata
6from codecs import IncrementalDecoder
7from encodings.aliases import aliases
8from functools import lru_cache
9from re import findall
10from typing import Generator
11
12from _multibytecodec import ( # type: ignore[import-not-found,import]
13 MultibyteIncrementalDecoder,
14)
15
16from .constant import (
17 ENCODING_MARKS,
18 IANA_SUPPORTED_SIMILAR,
19 RE_POSSIBLE_ENCODING_INDICATION,
20 UNICODE_RANGES_COMBINED,
21 UNICODE_SECONDARY_RANGE_KEYWORD,
22 UTF8_MAXIMAL_ALLOCATION,
23 COMMON_CJK_CHARACTERS,
24)
25
26
27@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
28def is_accentuated(character: str) -> bool:
29 try:
30 description: str = unicodedata.name(character)
31 except ValueError: # Defensive: unicode database outdated?
32 return False
33 return (
34 "WITH GRAVE" in description
35 or "WITH ACUTE" in description
36 or "WITH CEDILLA" in description
37 or "WITH DIAERESIS" in description
38 or "WITH CIRCUMFLEX" in description
39 or "WITH TILDE" in description
40 or "WITH MACRON" in description
41 or "WITH RING ABOVE" in description
42 )
43
44
45@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
46def remove_accent(character: str) -> str:
47 decomposed: str = unicodedata.decomposition(character)
48 if not decomposed:
49 return character
50
51 codes: list[str] = decomposed.split(" ")
52
53 return chr(int(codes[0], 16))
54
55
56@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
57def unicode_range(character: str) -> str | None:
58 """
59 Retrieve the Unicode range official name from a single character.
60 """
61 character_ord: int = ord(character)
62
63 for range_name, ord_range in UNICODE_RANGES_COMBINED.items():
64 if character_ord in ord_range:
65 return range_name
66
67 return None
68
69
70@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
71def is_latin(character: str) -> bool:
72 try:
73 description: str = unicodedata.name(character)
74 except ValueError: # Defensive: unicode database outdated?
75 return False
76 return "LATIN" in description
77
78
79@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
80def is_punctuation(character: str) -> bool:
81 character_category: str = unicodedata.category(character)
82
83 if "P" in character_category:
84 return True
85
86 character_range: str | None = unicode_range(character)
87
88 if character_range is None:
89 return False
90
91 return "Punctuation" in character_range
92
93
94@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
95def is_symbol(character: str) -> bool:
96 character_category: str = unicodedata.category(character)
97
98 if "S" in character_category or "N" in character_category:
99 return True
100
101 character_range: str | None = unicode_range(character)
102
103 if character_range is None:
104 return False
105
106 return "Forms" in character_range and character_category != "Lo"
107
108
109@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
110def is_emoticon(character: str) -> bool:
111 character_range: str | None = unicode_range(character)
112
113 if character_range is None:
114 return False
115
116 return "Emoticons" in character_range or "Pictographs" in character_range
117
118
119@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
120def is_separator(character: str) -> bool:
121 if character.isspace() or character in {"|", "+", "<", ">"}:
122 return True
123
124 character_category: str = unicodedata.category(character)
125
126 return "Z" in character_category or character_category in {"Po", "Pd", "Pc"}
127
128
129@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
130def is_case_variable(character: str) -> bool:
131 return character.islower() != character.isupper()
132
133
134@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
135def is_cjk(character: str) -> bool:
136 try:
137 character_name = unicodedata.name(character)
138 except ValueError: # Defensive: unicode database outdated?
139 return False
140
141 return "CJK" in character_name
142
143
144@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
145def is_hiragana(character: str) -> bool:
146 try:
147 character_name = unicodedata.name(character)
148 except ValueError: # Defensive: unicode database outdated?
149 return False
150
151 return "HIRAGANA" in character_name
152
153
154@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
155def is_katakana(character: str) -> bool:
156 try:
157 character_name = unicodedata.name(character)
158 except ValueError: # Defensive: unicode database outdated?
159 return False
160
161 return "KATAKANA" in character_name
162
163
164@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
165def is_hangul(character: str) -> bool:
166 try:
167 character_name = unicodedata.name(character)
168 except ValueError: # Defensive: unicode database outdated?
169 return False
170
171 return "HANGUL" in character_name
172
173
174@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
175def is_thai(character: str) -> bool:
176 try:
177 character_name = unicodedata.name(character)
178 except ValueError: # Defensive: unicode database outdated?
179 return False
180
181 return "THAI" in character_name
182
183
184@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
185def is_arabic(character: str) -> bool:
186 try:
187 character_name = unicodedata.name(character)
188 except ValueError: # Defensive: unicode database outdated?
189 return False
190
191 return "ARABIC" in character_name
192
193
194@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
195def is_arabic_isolated_form(character: str) -> bool:
196 try:
197 character_name = unicodedata.name(character)
198 except ValueError: # Defensive: unicode database outdated?
199 return False
200
201 return "ARABIC" in character_name and "ISOLATED FORM" in character_name
202
203
204@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
205def is_cjk_uncommon(character: str) -> bool:
206 return character not in COMMON_CJK_CHARACTERS
207
208
209@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
210def is_unicode_range_secondary(range_name: str) -> bool:
211 return any(keyword in range_name for keyword in UNICODE_SECONDARY_RANGE_KEYWORD)
212
213
214@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
215def is_unprintable(character: str) -> bool:
216 return (
217 character.isspace() is False # includes \n \t \r \v
218 and character.isprintable() is False
219 and character != "\x1a" # Why? Its the ASCII substitute character.
220 and character != "\ufeff" # bug discovered in Python,
221 # Zero Width No-Break Space located in Arabic Presentation Forms-B, Unicode 1.1 not acknowledged as space.
222 )
223
224
225def any_specified_encoding(sequence: bytes, search_zone: int = 8192) -> str | None:
226 """
227 Extract using ASCII-only decoder any specified encoding in the first n-bytes.
228 """
229 if not isinstance(sequence, bytes):
230 raise TypeError
231
232 seq_len: int = len(sequence)
233
234 results: list[str] = findall(
235 RE_POSSIBLE_ENCODING_INDICATION,
236 sequence[: min(seq_len, search_zone)].decode("ascii", errors="ignore"),
237 )
238
239 if len(results) == 0:
240 return None
241
242 for specified_encoding in results:
243 specified_encoding = specified_encoding.lower().replace("-", "_")
244
245 encoding_alias: str
246 encoding_iana: str
247
248 for encoding_alias, encoding_iana in aliases.items():
249 if encoding_alias == specified_encoding:
250 return encoding_iana
251 if encoding_iana == specified_encoding:
252 return encoding_iana
253
254 return None
255
256
257@lru_cache(maxsize=128)
258def is_multi_byte_encoding(name: str) -> bool:
259 """
260 Verify is a specific encoding is a multi byte one based on it IANA name
261 """
262 return name in {
263 "utf_8",
264 "utf_8_sig",
265 "utf_16",
266 "utf_16_be",
267 "utf_16_le",
268 "utf_32",
269 "utf_32_le",
270 "utf_32_be",
271 "utf_7",
272 } or issubclass(
273 importlib.import_module(f"encodings.{name}").IncrementalDecoder,
274 MultibyteIncrementalDecoder,
275 )
276
277
278def identify_sig_or_bom(sequence: bytes) -> tuple[str | None, bytes]:
279 """
280 Identify and extract SIG/BOM in given sequence.
281 """
282
283 for iana_encoding in ENCODING_MARKS:
284 marks: bytes | list[bytes] = ENCODING_MARKS[iana_encoding]
285
286 if isinstance(marks, bytes):
287 marks = [marks]
288
289 for mark in marks:
290 if sequence.startswith(mark):
291 return iana_encoding, mark
292
293 return None, b""
294
295
296def should_strip_sig_or_bom(iana_encoding: str) -> bool:
297 return iana_encoding not in {"utf_16", "utf_32"}
298
299
300def iana_name(cp_name: str, strict: bool = True) -> str:
301 """Returns the Python normalized encoding name (Not the IANA official name)."""
302 cp_name = cp_name.lower().replace("-", "_")
303
304 encoding_alias: str
305 encoding_iana: str
306
307 for encoding_alias, encoding_iana in aliases.items():
308 if cp_name in [encoding_alias, encoding_iana]:
309 return encoding_iana
310
311 if strict:
312 raise ValueError(f"Unable to retrieve IANA for '{cp_name}'")
313
314 return cp_name
315
316
317def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
318 if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
319 return 0.0
320
321 decoder_a = importlib.import_module(f"encodings.{iana_name_a}").IncrementalDecoder
322 decoder_b = importlib.import_module(f"encodings.{iana_name_b}").IncrementalDecoder
323
324 id_a: IncrementalDecoder = decoder_a(errors="ignore")
325 id_b: IncrementalDecoder = decoder_b(errors="ignore")
326
327 character_match_count: int = 0
328
329 for i in range(255):
330 to_be_decoded: bytes = bytes([i])
331 if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
332 character_match_count += 1
333
334 return character_match_count / 254
335
336
337def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
338 """
339 Determine if two code page are at least 80% similar. IANA_SUPPORTED_SIMILAR dict was generated using
340 the function cp_similarity.
341 """
342 return (
343 iana_name_a in IANA_SUPPORTED_SIMILAR
344 and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
345 )
346
347
348def set_logging_handler(
349 name: str = "charset_normalizer",
350 level: int = logging.INFO,
351 format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
352) -> None:
353 logger = logging.getLogger(name)
354 logger.setLevel(level)
355
356 handler = logging.StreamHandler()
357 handler.setFormatter(logging.Formatter(format_string))
358 logger.addHandler(handler)
359
360
361def cut_sequence_chunks(
362 sequences: bytes,
363 encoding_iana: str,
364 offsets: range,
365 chunk_size: int,
366 bom_or_sig_available: bool,
367 strip_sig_or_bom: bool,
368 sig_payload: bytes,
369 is_multi_byte_decoder: bool,
370 decoded_payload: str | None = None,
371) -> Generator[str, None, None]:
372 if decoded_payload and is_multi_byte_decoder is False:
373 for i in offsets:
374 chunk = decoded_payload[i : i + chunk_size]
375 if not chunk:
376 break
377 yield chunk
378 else:
379 for i in offsets:
380 chunk_end = i + chunk_size
381 if chunk_end > len(sequences) + 8:
382 continue
383
384 cut_sequence = sequences[i : i + chunk_size]
385
386 if bom_or_sig_available and strip_sig_or_bom is False:
387 cut_sequence = sig_payload + cut_sequence
388
389 chunk = cut_sequence.decode(
390 encoding_iana,
391 errors="ignore" if is_multi_byte_decoder else "strict",
392 )
393
394 # multi-byte bad cutting detector and adjustment
395 # not the cleanest way to perform that fix but clever enough for now.
396 if is_multi_byte_decoder and i > 0:
397 chunk_partial_size_chk: int = min(chunk_size, 16)
398
399 if (
400 decoded_payload
401 and chunk[:chunk_partial_size_chk] not in decoded_payload
402 ):
403 for j in range(i, i - 4, -1):
404 cut_sequence = sequences[j:chunk_end]
405
406 if bom_or_sig_available and strip_sig_or_bom is False:
407 cut_sequence = sig_payload + cut_sequence
408
409 chunk = cut_sequence.decode(encoding_iana, errors="ignore")
410
411 if chunk[:chunk_partial_size_chk] in decoded_payload:
412 break
413
414 yield chunk