from __future__ import annotations

import importlib
import logging
import unicodedata
from codecs import IncrementalDecoder
from encodings.aliases import aliases
from functools import lru_cache
from re import findall
from typing import Generator

from _multibytecodec import (  # type: ignore[import-not-found,import]
    MultibyteIncrementalDecoder,
)

from .constant import (
    ENCODING_MARKS,
    IANA_SUPPORTED_SIMILAR,
    RE_POSSIBLE_ENCODING_INDICATION,
    UNICODE_RANGES_COMBINED,
    UNICODE_SECONDARY_RANGE_KEYWORD,
    UTF8_MAXIMAL_ALLOCATION,
)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_accentuated(character: str) -> bool:
    try:
        description: str = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False
    return (
        "WITH GRAVE" in description
        or "WITH ACUTE" in description
        or "WITH CEDILLA" in description
        or "WITH DIAERESIS" in description
        or "WITH CIRCUMFLEX" in description
        or "WITH TILDE" in description
        or "WITH MACRON" in description
        or "WITH RING ABOVE" in description
    )

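# Usage sketch for is_accentuated (illustrative; outputs assume a current
# Unicode character database):
#   >>> is_accentuated("é")  # LATIN SMALL LETTER E WITH ACUTE
#   True
#   >>> is_accentuated("e")  # plain LATIN SMALL LETTER E
#   False
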
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def remove_accent(character: str) -> str:
    decomposed: str = unicodedata.decomposition(character)
    if not decomposed:
        return character

    codes: list[str] = decomposed.split(" ")

    return chr(int(codes[0], 16))

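# Usage sketch for remove_accent: the first code point of the canonical
# decomposition is taken as the base letter (illustrative):
#   >>> remove_accent("é")  # decomposes to "0065 0301"
#   'e'
#   >>> remove_accent("a")  # no decomposition, returned unchanged
#   'a'
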
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def unicode_range(character: str) -> str | None:
    """
    Retrieve the official Unicode range name for a single character.
    """
    character_ord: int = ord(character)

    for range_name, ord_range in UNICODE_RANGES_COMBINED.items():
        if character_ord in ord_range:
            return range_name

    return None

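# Usage sketch for unicode_range (illustrative; the exact names come from
# UNICODE_RANGES_COMBINED and may vary between versions):
#   >>> unicode_range("é")
#   'Latin-1 Supplement'
#   >>> unicode_range("你")
#   'CJK Unified Ideographs'
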
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_latin(character: str) -> bool:
    try:
        description: str = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False
    return "LATIN" in description


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_punctuation(character: str) -> bool:
    character_category: str = unicodedata.category(character)

    if "P" in character_category:
        return True

    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Punctuation" in character_range

@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_symbol(character: str) -> bool:
    character_category: str = unicodedata.category(character)

    if "S" in character_category or "N" in character_category:
        return True

    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Forms" in character_range and character_category != "Lo"

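# Usage sketch for is_latin, is_punctuation, and is_symbol above
# (illustrative):
#   >>> is_latin("A"), is_latin("Я")
#   (True, False)
#   >>> is_punctuation("!")  # category Po
#   True
#   >>> is_symbol("+")  # category Sm
#   True
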
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_emoticon(character: str) -> bool:
    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Emoticons" in character_range or "Pictographs" in character_range


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_separator(character: str) -> bool:
    if character.isspace() or character in {"|", "+", "<", ">"}:
        return True

    character_category: str = unicodedata.category(character)

    return "Z" in character_category or character_category in {"Po", "Pd", "Pc"}

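# Usage sketch for is_emoticon and is_separator (illustrative; range names
# depend on UNICODE_RANGES_COMBINED):
#   >>> is_emoticon("😀")  # falls in the Emoticons block
#   True
#   >>> is_separator(" "), is_separator(","), is_separator("a")
#   (True, True, False)
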
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_case_variable(character: str) -> bool:
    return character.islower() != character.isupper()


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False

    return "CJK" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hiragana(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False

    return "HIRAGANA" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_katakana(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False

    return "KATAKANA" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hangul(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False

    return "HANGUL" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_thai(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False

    return "THAI" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False

    return "ARABIC" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic_isolated_form(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:  # Defensive: unicode database outdated?
        return False

    return "ARABIC" in character_name and "ISOLATED FORM" in character_name

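# Usage sketch for is_case_variable and the script predicates above; each
# checks the official character name, so results track the unicodedata
# version (illustrative):
#   >>> is_case_variable("a"), is_case_variable("?")
#   (True, False)
#   >>> is_cjk("你"), is_hiragana("ひ"), is_katakana("カ")
#   (True, True, True)
#   >>> is_hangul("한"), is_thai("ก"), is_arabic("ب")
#   (True, True, True)
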
@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
def is_unicode_range_secondary(range_name: str) -> bool:
    return any(keyword in range_name for keyword in UNICODE_SECONDARY_RANGE_KEYWORD)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_unprintable(character: str) -> bool:
    return (
        character.isspace() is False  # includes \n \t \r \v
        and character.isprintable() is False
        and character != "\x1a"  # Why? It's the ASCII substitute character.
        and character != "\ufeff"  # bug discovered in Python:
        # Zero Width No-Break Space, located in Arabic Presentation Forms-B
        # (Unicode 1.1), is not acknowledged as a space.
    )

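# Usage sketch for is_unprintable: whitespace and the two explicitly
# tolerated code points ("\x1a" and "\ufeff") are not counted (illustrative):
#   >>> is_unprintable("\x00"), is_unprintable("\n"), is_unprintable("a")
#   (True, False, False)
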
def any_specified_encoding(sequence: bytes, search_zone: int = 8192) -> str | None:
    """
    Extract any specified encoding from the first n bytes, using an ASCII-only decoder.
    """
    if not isinstance(sequence, bytes):
        raise TypeError("Expected a bytes sequence")

    seq_len: int = len(sequence)

    results: list[str] = findall(
        RE_POSSIBLE_ENCODING_INDICATION,
        sequence[: min(seq_len, search_zone)].decode("ascii", errors="ignore"),
    )

    if len(results) == 0:
        return None

    for specified_encoding in results:
        specified_encoding = specified_encoding.lower().replace("-", "_")

        encoding_alias: str
        encoding_iana: str

        for encoding_alias, encoding_iana in aliases.items():
            if encoding_alias == specified_encoding:
                return encoding_iana
            if encoding_iana == specified_encoding:
                return encoding_iana

    return None

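# Usage sketch for any_specified_encoding, assuming
# RE_POSSIBLE_ENCODING_INDICATION matches charset/encoding declarations
# (illustrative):
#   >>> any_specified_encoding(b'<?xml version="1.0" encoding="utf-8"?>')
#   'utf_8'
#   >>> any_specified_encoding(b"no declaration here") is None
#   True
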
@lru_cache(maxsize=128)
def is_multi_byte_encoding(name: str) -> bool:
    """
    Verify whether a specific encoding is multi-byte, based on its IANA name.
    """
    return name in {
        "utf_8",
        "utf_8_sig",
        "utf_16",
        "utf_16_be",
        "utf_16_le",
        "utf_32",
        "utf_32_le",
        "utf_32_be",
        "utf_7",
    } or issubclass(
        importlib.import_module(f"encodings.{name}").IncrementalDecoder,
        MultibyteIncrementalDecoder,
    )

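# Usage sketch for is_multi_byte_encoding (illustrative; names must be
# Python-normalized encoding names, see iana_name below):
#   >>> is_multi_byte_encoding("utf_8")
#   True
#   >>> is_multi_byte_encoding("latin_1")
#   False
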
def identify_sig_or_bom(sequence: bytes) -> tuple[str | None, bytes]:
    """
    Identify and extract a SIG/BOM from the given byte sequence.
    """

    for iana_encoding in ENCODING_MARKS:
        marks: bytes | list[bytes] = ENCODING_MARKS[iana_encoding]

        if isinstance(marks, bytes):
            marks = [marks]

        for mark in marks:
            if sequence.startswith(mark):
                return iana_encoding, mark

    return None, b""

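# Usage sketch for identify_sig_or_bom, assuming ENCODING_MARKS maps "utf_8"
# to the UTF-8 BOM b"\xef\xbb\xbf" (illustrative):
#   >>> identify_sig_or_bom(b"\xef\xbb\xbfhello")
#   ('utf_8', b'\xef\xbb\xbf')
#   >>> identify_sig_or_bom(b"hello")
#   (None, b'')
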
def should_strip_sig_or_bom(iana_encoding: str) -> bool:
    return iana_encoding not in {"utf_16", "utf_32"}


def iana_name(cp_name: str, strict: bool = True) -> str:
    """Returns the Python normalized encoding name (not the IANA official name)."""
    cp_name = cp_name.lower().replace("-", "_")

    encoding_alias: str
    encoding_iana: str

    for encoding_alias, encoding_iana in aliases.items():
        if cp_name in [encoding_alias, encoding_iana]:
            return encoding_iana

    if strict:
        raise ValueError(f"Unable to retrieve IANA for '{cp_name}'")

    return cp_name

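# Usage sketch for iana_name (illustrative):
#   >>> iana_name("UTF-8")
#   'utf_8'
#   >>> iana_name("not-a-codec", strict=False)
#   'not_a_codec'
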
def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
    if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
        return 0.0

    decoder_a = importlib.import_module(f"encodings.{iana_name_a}").IncrementalDecoder
    decoder_b = importlib.import_module(f"encodings.{iana_name_b}").IncrementalDecoder

    id_a: IncrementalDecoder = decoder_a(errors="ignore")
    id_b: IncrementalDecoder = decoder_b(errors="ignore")

    character_match_count: int = 0

    # Compare how both decoders map every possible single byte (0x00-0xFF).
    for i in range(256):
        to_be_decoded: bytes = bytes([i])
        if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
            character_match_count += 1

    return character_match_count / 256

def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
    """
    Determine if two code pages are at least 80% similar. The IANA_SUPPORTED_SIMILAR
    dict was generated using the function cp_similarity.
    """
    return (
        iana_name_a in IANA_SUPPORTED_SIMILAR
        and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
    )

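# Usage sketch for the similarity helpers above, assuming
# IANA_SUPPORTED_SIMILAR pairs cp1252 with latin_1 (illustrative):
#   >>> round(cp_similarity("cp1252", "latin_1"), 2)  # only 0x80-0x9F differ
#   0.88
#   >>> is_cp_similar("cp1252", "latin_1")
#   True
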
def set_logging_handler(
    name: str = "charset_normalizer",
    level: int = logging.INFO,
    format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
) -> None:
    logger = logging.getLogger(name)
    logger.setLevel(level)

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(handler)

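# Usage sketch for set_logging_handler (illustrative):
#   >>> set_logging_handler("charset_normalizer", logging.DEBUG)
#   # the named logger now writes formatted records to stderr
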
def cut_sequence_chunks(
    sequences: bytes,
    encoding_iana: str,
    offsets: range,
    chunk_size: int,
    bom_or_sig_available: bool,
    strip_sig_or_bom: bool,
    sig_payload: bytes,
    is_multi_byte_decoder: bool,
    decoded_payload: str | None = None,
) -> Generator[str, None, None]:
    if decoded_payload and is_multi_byte_decoder is False:
        for i in offsets:
            chunk = decoded_payload[i : i + chunk_size]
            if not chunk:
                break
            yield chunk
    else:
        for i in offsets:
            chunk_end = i + chunk_size
            if chunk_end > len(sequences) + 8:
                continue

            cut_sequence = sequences[i : i + chunk_size]

            if bom_or_sig_available and strip_sig_or_bom is False:
                cut_sequence = sig_payload + cut_sequence

            chunk = cut_sequence.decode(
                encoding_iana,
                errors="ignore" if is_multi_byte_decoder else "strict",
            )

            # multi-byte bad-cutting detector and adjustment:
            # not the cleanest way to perform that fix, but clever enough for now.
            if is_multi_byte_decoder and i > 0:
                chunk_partial_size_chk: int = min(chunk_size, 16)

                if (
                    decoded_payload
                    and chunk[:chunk_partial_size_chk] not in decoded_payload
                ):
                    # Walk the cut point back up to three bytes until the chunk
                    # re-aligns with the known good decoded payload.
                    for j in range(i, i - 4, -1):
                        cut_sequence = sequences[j:chunk_end]

                        if bom_or_sig_available and strip_sig_or_bom is False:
                            cut_sequence = sig_payload + cut_sequence

                        chunk = cut_sequence.decode(encoding_iana, errors="ignore")

                        if chunk[:chunk_partial_size_chk] in decoded_payload:
                            break

            yield chunk
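

# Usage sketch for cut_sequence_chunks with a multi-byte decoder and no
# BOM/SIG (illustrative):
#   >>> list(
#   ...     cut_sequence_chunks(
#   ...         b"hello world",
#   ...         "utf_8",
#   ...         range(0, 11, 5),
#   ...         5,
#   ...         False,
#   ...         False,
#   ...         b"",
#   ...         True,
#   ...     )
#   ... )
#   ['hello', ' worl', 'd']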