from __future__ import annotations

import logging
from os import PathLike
from typing import BinaryIO

from .cd import (
    coherence_ratio,
    encoding_languages,
    mb_encoding_languages,
    merge_coherence_ratios,
)
from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
from .md import mess_ratio
from .models import CharsetMatch, CharsetMatches
from .utils import (
    any_specified_encoding,
    cut_sequence_chunks,
    iana_name,
    identify_sig_or_bom,
    is_cp_similar,
    is_multi_byte_encoding,
    should_strip_sig_or_bom,
)

logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)


def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
45 """
46 Given a raw bytes sequence, return the best possibles charset usable to render str objects.
47 If there is no results, it is a strong indicator that the source is binary/not text.
48 By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
49 And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.
50
51 The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
52 but never take it for granted. Can improve the performance.
53
54 You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
55 purpose.
56
57 This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
58 By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
59 toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
60 Custom logging format and handler can be set manually.
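
    Example (an illustrative sketch; real results depend on the payload, hence the doctest skip):

        >>> payload = "Всеки човек има право на образование.".encode("cp1251")
        >>> best_guess = from_bytes(payload).best()
        >>> best_guess.encoding  # doctest: +SKIP
        'cp1251'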
61 """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. Use this flag for debugging purposes. "
            "Limited list of encodings allowed: %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. Use this flag for debugging purposes. "
            "Limited list of encodings excluded: %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "Overriding steps (%i) and chunk_size (%i) as the given content (%i byte(s)) does not fit those parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

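    # Payload-size edge cases: very small payloads yield less reliable statistics,
    # and very large payloads are (partially) decoded lazily to keep things fast.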
    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: list[str] = []

    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

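    # A BOM/SIG prefix, when present, is a strong hint: prioritize its encoding.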
    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

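    # Probe the prioritized candidates first, then every other IANA-supported codec.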
    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it requires a BOM. Will try its LE/BE sub-encoders instead.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without a BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was considered unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

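        # Offsets at which each probed chunk starts; the BOM/SIG is skipped when present.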
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi-byte encoding table and it appears that at least one character "
                "was encoded using n bytes.",
                encoding_iana,
            )

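        # Tolerate up to a quarter of the chunks (but no fewer than 2) exceeding
        # the mess threshold before giving up on this code page.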
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except UnicodeDecodeError as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests pass
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%.",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when it's about ASCII.
        # Most of the time it's not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

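        # Trusted candidates (the declared encoding, ascii or utf_8) with a very
        # low mess ratio may end the search early.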
        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

            if (
                len(early_stop_results)
                and (specified_encoding is None or specified_encoding in tested)
                and "ascii" in tested
                and "utf_8" in tested
            ):
                probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    probable_result.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)

                return CharsetMatches([probable_result])

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG at "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif fallback_u8 is not None:
            # Prefer utf_8 over ascii whenever a utf_8 fallback exists.
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results


def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
557 """
558 Same thing than the function from_bytes but using a file pointer that is already ready.
559 Will not close the file pointer.
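
    Example (an illustrative sketch; "./sample.txt" is a hypothetical file):

        >>> with open("./sample.txt", "rb") as fp:  # doctest: +SKIP
        ...     results = from_fp(fp)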
560 """
561 return from_bytes(
562 fp.read(),
563 steps,
564 chunk_size,
565 threshold,
566 cp_isolation,
567 cp_exclusion,
568 preemptive_behaviour,
569 explain,
570 language_threshold,
571 enable_fallback,
572 )


def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
587 """
588 Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
589 Can raise IOError.
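
    Example (an illustrative sketch; "./sample.txt" is a hypothetical file):

        >>> best_guess = from_path("./sample.txt").best()  # doctest: +SKIP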
590 """
591 with open(path, "rb") as fp:
592 return from_fp(
593 fp,
594 steps,
595 chunk_size,
596 threshold,
597 cp_isolation,
598 cp_exclusion,
599 preemptive_behaviour,
600 explain,
601 language_threshold,
602 enable_fallback,
603 )


def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
618 """
619 Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
620 Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
621 are disabled to be stricter around ASCII-compatible but unlikely to be a string.
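
    Example (an illustrative sketch; actual verdicts depend on the heuristics, hence the doctest skips):

        >>> is_binary(bytes(8))  # doctest: +SKIP
        True
        >>> is_binary(b"hello world")  # doctest: +SKIP
        False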
622 """
623 if isinstance(fp_or_path_or_payload, (str, PathLike)):
624 guesses = from_path(
625 fp_or_path_or_payload,
626 steps=steps,
627 chunk_size=chunk_size,
628 threshold=threshold,
629 cp_isolation=cp_isolation,
630 cp_exclusion=cp_exclusion,
631 preemptive_behaviour=preemptive_behaviour,
632 explain=explain,
633 language_threshold=language_threshold,
634 enable_fallback=enable_fallback,
635 )
636 elif isinstance(
637 fp_or_path_or_payload,
638 (
639 bytes,
640 bytearray,
641 ),
642 ):
643 guesses = from_bytes(
644 fp_or_path_or_payload,
645 steps=steps,
646 chunk_size=chunk_size,
647 threshold=threshold,
648 cp_isolation=cp_isolation,
649 cp_exclusion=cp_exclusion,
650 preemptive_behaviour=preemptive_behaviour,
651 explain=explain,
652 language_threshold=language_threshold,
653 enable_fallback=enable_fallback,
654 )
655 else:
656 guesses = from_fp(
657 fp_or_path_or_payload,
658 steps=steps,
659 chunk_size=chunk_size,
660 threshold=threshold,
661 cp_isolation=cp_isolation,
662 cp_exclusion=cp_exclusion,
663 preemptive_behaviour=preemptive_behaviour,
664 explain=explain,
665 language_threshold=language_threshold,
666 enable_fallback=enable_fallback,
667 )
668
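    # An empty result set means no plausible text interpretation was found.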
    return not guesses