from __future__ import annotations

import logging
from os import PathLike
from typing import BinaryIO

from .cd import (
    coherence_ratio,
    encoding_languages,
    mb_encoding_languages,
    merge_coherence_ratios,
)
from .constant import (
    IANA_SUPPORTED,
    IANA_SUPPORTED_SIMILAR,
    TOO_BIG_SEQUENCE,
    TOO_SMALL_SEQUENCE,
    TRACE,
)
from .md import mess_ratio
from .models import CharsetMatch, CharsetMatches
from .utils import (
    any_specified_encoding,
    cut_sequence_chunks,
    iana_name,
    identify_sig_or_bom,
    is_multi_byte_encoding,
    should_strip_sig_or_bom,
)

logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)

# Pre-compute a reordered encoding list: multi-byte first, then single-byte.
# This allows the mb_definitive_match optimization to fire earlier, skipping
# all single-byte encodings for genuine CJK content. Multi-byte codecs
# hard-fail (UnicodeDecodeError) on single-byte data almost instantly, so
# testing them first costs negligible time for non-CJK files.
_mb_supported: list[str] = []
_sb_supported: list[str] = []

for _supported_enc in IANA_SUPPORTED:
    try:
        if is_multi_byte_encoding(_supported_enc):
            _mb_supported.append(_supported_enc)
        else:
            _sb_supported.append(_supported_enc)
    except ImportError:
        _sb_supported.append(_supported_enc)

IANA_SUPPORTED_MB_FIRST: list[str] = _mb_supported + _sb_supported
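# Illustrative invariant of the resulting order (assuming both names are present
# in IANA_SUPPORTED on the running interpreter; "big5" is multi-byte, "cp1252"
# is single-byte):
#     IANA_SUPPORTED_MB_FIRST.index("big5") < IANA_SUPPORTED_MB_FIRST.index("cp1252")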


def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possible charsets usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given sequence,
    and will give up on a particular code page past 20% of measured mess. Those criteria are customizable at will.

    The preemptive behaviour DOES NOT replace the traditional detection workflow; it prioritizes a particular code page
    but never takes it for granted. It can improve performance.

    You may want to focus your attention on some code pages and/or exclude others; use cp_isolation and cp_exclusion
    for that purpose.

    This function will strip the SIG/BOM from the payload/sequence every time except for UTF-16 and UTF-32.
    By default the library does not set up any handler other than the NullHandler. If you set the 'explain' toggle
    to True, it will alter the logger configuration to add a StreamHandler suitable for debugging.
    A custom logging format and handler can be set manually.
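
    Illustrative usage (a minimal sketch; the Cyrillic sample is an arbitrary
    example payload, not a fixture of this project)::

        from charset_normalizer import from_bytes

        payload = "Всеки човек има право на образование.".encode("cp1251")
        best_guess = from_bytes(payload).best()
        if best_guess is not None:
            print(best_guess.encoding, str(best_guess))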
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. Use this flag for debugging purposes. "
            "Limited list of encodings allowed: %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. Use this flag for debugging purposes. "
            "Limited list of encodings excluded: %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "Overriding steps (%i) and chunk_size (%i) as the content (%i byte(s) given) does not fit those parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: list[str] = []

    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []
    soft_failure_skip: set[str] = set()
    success_fast_tracked: set[str] = set()

    # Cache for decoded-payload deduplication: hash(decoded_payload) -> (mean_mess_ratio, cd_ratios_merged, passed).
    # When multiple encodings decode to the exact same string, we can skip the expensive
    # mess_ratio and coherence_ratio analysis and reuse the results from the first encoding.
    payload_result_cache: dict[int, tuple[float, list[tuple[str, float]], bool]] = {}
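    # Illustrative entry (hypothetical values): if latin_1 decoded the payload to
    # "héllo wörld" with 0% mess and a merged English coherence of 0.6, a later
    # cp1252 decode yielding the identical string would reuse:
    #     payload_result_cache[hash("héllo wörld")] == (0.0, [("English", 0.6)], True)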

    # When a definitive result (chaos=0.0 and good coherence) is found after testing
    # the prioritized encodings (ascii, utf_8), we can significantly reduce the remaining
    # work: encodings that target completely different language families (e.g., Cyrillic
    # when the definitive match is Latin) are skipped entirely.
    definitive_match_found: bool = False
    definitive_target_languages: set[str] = set()
    # After the definitive match fires, we cap the number of additional same-family
    # single-byte encodings that pass chaos probing. Once we've accumulated enough
    # good candidates (N), further same-family SB encodings are unlikely to produce
    # a better best() result and just waste mess_ratio + coherence_ratio time.
    # The first encoding to trigger the definitive match is NOT counted (it's already in).
    post_definitive_sb_success_count: int = 0
    POST_DEFINITIVE_SB_CAP: int = 7

    # When a non-UTF multibyte encoding passes chaos probing with significant multibyte
    # content (decoded length < 98% of raw length), skip all remaining single-byte encodings.
    # Rationale: multi-byte decoders (CJK) have strict byte-sequence validation; if they
    # decode without error AND pass chaos probing with substantial multibyte content, the
    # data is genuinely multibyte encoded. Single-byte encodings will always decode (every
    # byte maps to something) but waste time on mess_ratio before failing.
    # The 98% threshold prevents false triggers on files that happen to have a few valid
    # multibyte pairs (e.g., cp424/_ude_1.txt where big5 decodes with a 99% ratio).
    mb_definitive_match_found: bool = False
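    # Worked example (hypothetical sizes): a 10_000-byte payload decoding to
    # 6_500 characters (65%, well under 98%) would arm this skip, while one
    # decoding to 9_900 characters (99%) would not.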

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED_MB_FIRST:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it requires a BOM. Will try its LE/BE sub-encoders instead.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without a BOM/SIG.",
                encoding_iana,
            )
            continue

        # Skip encodings similar to ones that already soft-failed (high mess ratio).
        # Checked BEFORE the expensive decode attempt.
        if encoding_iana in soft_failure_skip:
            logger.log(
                TRACE,
                "%s is deemed too similar to a code page that was already considered unsuited. Continuing!",
                encoding_iana,
            )
            continue

        # Skip encodings that were already fast-tracked from a similar successful encoding.
        if encoding_iana in success_fast_tracked:
            logger.log(
                TRACE,
                "Skipping %s: already fast-tracked from a similar successful encoding.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):  # Defensive:
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder.",
                encoding_iana,
            )
            continue

        # When we've already found a definitive match (chaos=0.0 with good coherence)
        # after testing the prioritized encodings, skip encodings that target
        # completely different language families. This avoids running expensive
        # mess_ratio + coherence_ratio on clearly unrelated candidates (e.g., Cyrillic
        # when the definitive match is Latin-based).
        if definitive_match_found:
            if not is_multi_byte_decoder:
                enc_languages = set(encoding_languages(encoding_iana))
            else:
                enc_languages = set(mb_encoding_languages(encoding_iana))
            if not enc_languages.intersection(definitive_target_languages):
                logger.log(
                    TRACE,
                    "Skipping %s: definitive match already found, this encoding targets different languages (%s vs %s).",
                    encoding_iana,
                    enc_languages,
                    definitive_target_languages,
                )
                continue

        # After the definitive match, cap the number of additional same-family
        # single-byte encodings that pass chaos probing. This avoids testing the
        # tail of rare, low-value same-family encodings (mac_iceland, cp860, etc.)
        # that almost never change best() but each cost ~1-2 ms of mess_ratio + coherence.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and post_definitive_sb_success_count >= POST_DEFINITIVE_SB_CAP
        ):
            logger.log(
                TRACE,
                "Skipping %s: already accumulated %d same-family results after definitive match (cap=%d).",
                encoding_iana,
                post_definitive_sb_success_count,
                POST_DEFINITIVE_SB_CAP,
            )
            continue

        # When a multibyte encoding with significant multibyte content has already
        # passed chaos probing, skip all single-byte encodings. They would either fail
        # chaos probing (wasting mess_ratio time) or produce inferior results.
        if mb_definitive_match_found and not is_multi_byte_decoder:
            logger.log(
                TRACE,
                "Skipping single-byte %s: multi-byte definitive match already found.",
                encoding_iana,
            )
            continue

        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                # The UTF-7 BOM is encoded in modified Base64, whose byte boundary
                # can overlap with the next character. Stripping raw SIG bytes
                # before decoding may leave stray bytes that decode as garbage.
                # Decode the full sequence and remove the leading BOM char instead.
                # see https://github.com/jawah/charset_normalizer/issues/718
                # and https://github.com/jawah/charset_normalizer/issues/716
                if encoding_iana == "utf_7" and bom_or_sig_available:
                    decoded_payload = str(
                        sequences,
                        encoding=encoding_iana,
                    )
                    if decoded_payload and decoded_payload[0] == "\ufeff":
                        decoded_payload = decoded_payload[1:]
                else:
                    decoded_payload = str(
                        (
                            sequences
                            if strip_sig_or_bom is False
                            else sequences[len(sig_payload) :]
                        ),
                        encoding=encoding_iana,
                    )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )
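        # Illustrative (hypothetical numbers): with length=5120, steps=5 and no
        # BOM, r_ == range(0, 5120, 1024), i.e. chunk offsets 0, 1024, 2048,
        # 3072 and 4096.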

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi-byte encoding table and it appears that at least one character "
                "was encoded using multiple bytes.",
                encoding_iana,
            )

        # Payload-hash deduplication: if another encoding already decoded to the
        # exact same string, reuse its mess_ratio and coherence results entirely.
        # This is strictly more general than the old IANA_SUPPORTED_SIMILAR approach
        # because it catches ALL identical decodings, not just pre-mapped ones.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_hash: int = hash(decoded_payload)
            cached = payload_result_cache.get(payload_hash)
            if cached is not None:
                cached_mess, cached_cd, cached_passed = cached
                if cached_passed:
                    # The previous encoding with identical output passed chaos probing.
                    fast_match = CharsetMatch(
                        sequences,
                        encoding_iana,
                        cached_mess,
                        bom_or_sig_available,
                        cached_cd,
                        (
                            decoded_payload
                            if (
                                is_too_large_sequence is False
                                or encoding_iana
                                in [specified_encoding, "ascii", "utf_8"]
                            )
                            else None
                        ),
                        preemptive_declaration=specified_encoding,
                    )
                    results.append(fast_match)
                    success_fast_tracked.add(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-tracked (identical decoded payload to a prior encoding, chaos=%f %%).",
                        encoding_iana,
                        round(cached_mess * 100, ndigits=3),
                    )

                    if (
                        encoding_iana in [specified_encoding, "ascii", "utf_8"]
                        and cached_mess < 0.1
                    ):
                        if cached_mess == 0.0:
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                fast_match.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([fast_match])

                        early_stop_results.append(fast_match)

                        if (
                            len(early_stop_results)
                            and (specified_encoding is None or specified_encoding in tested)
                            and "ascii" in tested
                            and "utf_8" in tested
                        ):
                            probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                probable_result.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([probable_result])

                    continue
                else:
                    # The previous encoding with identical output failed chaos probing.
                    tested_but_soft_failure.append(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-skipped (identical decoded payload to a prior encoding that failed chaos probing).",
                        encoding_iana,
                    )
                    # Prepare fallbacks for special encodings even when skipped.
                    if enable_fallback and encoding_iana in [
                        "ascii",
                        "utf_8",
                        specified_encoding,
                        "utf_16",
                        "utf_32",
                    ]:
                        fallback_entry = CharsetMatch(
                            sequences,
                            encoding_iana,
                            threshold,
                            bom_or_sig_available,
                            [],
                            decoded_payload,
                            preemptive_declaration=specified_encoding,
                        )
                        if encoding_iana == specified_encoding:
                            fallback_specified = fallback_entry
                        elif encoding_iana == "ascii":
                            fallback_ascii = fallback_entry
                        else:
                            fallback_u8 = fallback_entry
                    continue

        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
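        # Worked example (hypothetical numbers): with a 5-offset r_, int(5 / 4) == 1,
        # so the floor of 2 applies and probing is abandoned once 2 chunks exceed
        # the mess threshold.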
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True
        # We might want to check the sequence again with the whole content,
        # but only if the initial MD tests pass.
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            if encoding_iana in IANA_SUPPORTED_SIMILAR:
                soft_failure_skip.update(IANA_SUPPORTED_SIMILAR[encoding_iana])
            # Cache this soft failure so identical decodings from other encodings
            # can be skipped immediately.
            if decoded_payload is not None and not is_multi_byte_decoder:
                payload_result_cache.setdefault(
                    hash(decoded_payload), (mean_mess_ratio, [], False)
                )
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%.",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # Run coherence detection on all chunks. We previously tried limiting it to
        # 1-2 chunks for post-definitive encodings to save time, but that caused
        # coverage regressions by producing unrepresentative coherence scores.
        # The SB cap and language-family skip optimizations provide sufficient
        # speedup without sacrificing coherence accuracy.
        if encoding_iana != "ascii":
            # We shall skip the CD when it's about ASCII.
            # Most of the time it's not relevant to run "language detection" on it.
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)
        else:
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        # Cache the successful result for payload-hash deduplication.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_result_cache.setdefault(
                hash(decoded_payload),
                (mean_mess_ratio, cd_ratios_merged, True),
            )

        # Count post-definitive same-family SB successes toward the early-termination cap.
        # Only count low-mess encodings (< 2%); high-mess encodings are
        # marginal results that shouldn't prevent better-quality candidates from being
        # tested. For example, iso8859_4 (mess=0%) should not be skipped just because
        # 7 high-mess Latin encodings (cp1252 at 8%, etc.) were tried first.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and mean_mess_ratio < 0.02
        ):
            post_definitive_sb_success_count += 1

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

            if (
                len(early_stop_results)
                and (specified_encoding is None or specified_encoding in tested)
                and "ascii" in tested
                and "utf_8" in tested
            ):
                probable_result = early_stop_results.best()  # type: ignore[assignment]
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    probable_result.encoding,  # type: ignore[union-attr]
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)

                return CharsetMatches([probable_result])

        # Once we find a result with good coherence (>= 0.5) after testing the
        # prioritized encodings (ascii, utf_8), activate "definitive mode": skip
        # encodings that target completely different language families. This avoids
        # running expensive mess_ratio + coherence_ratio on clearly unrelated
        # candidates (e.g., Cyrillic encodings when the match is Latin-based).
        # We require coherence >= 0.5 to avoid false positives (e.g., cp1251 decoding
        # Hebrew text with 0.0 chaos but wrong language detection at coherence 0.33).
        if not definitive_match_found and not is_multi_byte_decoder:
            best_coherence = (
                max((v for _, v in cd_ratios_merged), default=0.0)
                if cd_ratios_merged
                else 0.0
            )
            if best_coherence >= 0.5 and "ascii" in tested and "utf_8" in tested:
                definitive_match_found = True
                definitive_target_languages.update(target_languages)
                logger.log(
                    TRACE,
                    "Definitive match found: %s (chaos=%.3f, coherence=%.2f). Encodings targeting different language families will be skipped.",
                    encoding_iana,
                    mean_mess_ratio,
                    best_coherence,
                )

        # When a non-UTF multibyte encoding passes chaos probing with significant
        # multibyte content (decoded < 98% of raw), activate mb_definitive_match.
        # This skips all remaining single-byte encodings, which would either soft-fail
        # (running expensive mess_ratio for nothing) or produce inferior results.
        if (
            not mb_definitive_match_found
            and is_multi_byte_decoder
            and multi_byte_bonus
            and decoded_payload is not None
            and len(decoded_payload) < length * 0.98
            and encoding_iana
            not in {
                "utf_8",
                "utf_8_sig",
                "utf_16",
                "utf_16_be",
                "utf_16_le",
                "utf_32",
                "utf_32_be",
                "utf_32_le",
                "utf_7",
            }
            and "ascii" in tested
            and "utf_8" in tested
        ):
            mb_definitive_match_found = True
            logger.log(
                TRACE,
                "Multi-byte definitive match: %s (chaos=%.3f, decoded=%d/%d=%.1f%%). Single-byte encodings will be skipped.",
                encoding_iana,
                mean_mess_ratio,
                len(decoded_payload),
                length,
                len(decoded_payload) / length * 100,
            )

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results


def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same as from_bytes but using a file pointer that is already open and readable.
    Will not close the file pointer.
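
    Illustrative usage (a minimal sketch; "sample.txt" is a hypothetical file)::

        with open("sample.txt", "rb") as fp:
            best_guess = from_fp(fp).best()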
    """
    return from_bytes(
        fp.read(),
        steps,
        chunk_size,
        threshold,
        cp_isolation,
        cp_exclusion,
        preemptive_behaviour,
        explain,
        language_threshold,
        enable_fallback,
    )


def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same as from_bytes but with one extra step: opening and reading the given file path in binary mode.
    Can raise IOError.
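
    Illustrative usage (a minimal sketch; the path is hypothetical)::

        best_guess = from_path("./legacy_export.csv").best()
        encoding = best_guess.encoding if best_guess is not None else None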
    """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )


def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect whether the given input (file, bytes, or path) points to binary content, i.e. not text.
    Based on the same main heuristic algorithms and default kwargs, with the sole exception that fallback matches
    are disabled to be stricter toward content that is ASCII-compatible but unlikely to be text.
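
    Illustrative usage (a minimal sketch; the byte literals are arbitrary examples
    and the annotated results are the likely, not guaranteed, outcomes)::

        is_binary(b"\x00\xff\x00\xff\x00")  # -> True: null/control bytes, no plausible text decoding
        is_binary(b"hello world")           # -> False: plain ASCII text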
    """
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(
            fp_or_path_or_payload,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
    elif isinstance(
        fp_or_path_or_payload,
        (
            bytes,
            bytearray,
        ),
    ):
        guesses = from_bytes(
            fp_or_path_or_payload,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
    else:
        guesses = from_fp(
            fp_or_path_or_payload,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

    return not guesses