1from __future__ import annotations
2
3import logging
4from os import PathLike
5from typing import BinaryIO
6
7from .cd import (
8 coherence_ratio,
9 encoding_languages,
10 mb_encoding_languages,
11 merge_coherence_ratios,
12)
13from .constant import (
14 IANA_SUPPORTED,
15 IANA_SUPPORTED_SIMILAR,
16 TOO_BIG_SEQUENCE,
17 TOO_SMALL_SEQUENCE,
18 TRACE,
19)
20from .md import mess_ratio
21from .models import CharsetMatch, CharsetMatches
22from .utils import (
23 any_specified_encoding,
24 cut_sequence_chunks,
25 iana_name,
26 identify_sig_or_bom,
27 is_multi_byte_encoding,
28 should_strip_sig_or_bom,
29)
30
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)

# Pre-computed reordering of IANA_SUPPORTED: multibyte codecs first, then
# single-byte ones. Genuine CJK payloads can then trigger the
# mb_definitive_match optimization earlier and skip every single-byte
# encoding. Multibyte decoders hard-fail (UnicodeDecodeError) almost
# immediately on single-byte data, so probing them first is nearly free
# for non-CJK content.
_mb_supported: list[str] = []
_sb_supported: list[str] = []

for _supported_enc in IANA_SUPPORTED:
    # An encoding without an importable IncrementalDecoder is treated as
    # single-byte (ImportError covers ModuleNotFoundError as well).
    try:
        _enc_is_multibyte = is_multi_byte_encoding(_supported_enc)
    except ImportError:
        _enc_is_multibyte = False
    (_mb_supported if _enc_is_multibyte else _sb_supported).append(_supported_enc)

IANA_SUPPORTED_MB_FIRST: list[str] = _mb_supported + _sb_supported
55
56
def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.

    :param sequences: Raw payload to analyze.
    :param steps: Number of chunks sampled across the payload.
    :param chunk_size: Size (bytes) of each sampled chunk.
    :param threshold: Maximum mean "mess" ratio tolerated before an encoding is rejected.
    :param cp_isolation: Restrict the probing to this list of encodings only.
    :param cp_exclusion: Never probe these encodings.
    :param preemptive_behaviour: Honor an encoding declared inside the payload (priority boost only).
    :param explain: Attach a debug StreamHandler for the duration of the call.
    :param language_threshold: Minimum ratio for a language to be retained by coherence detection.
    :param enable_fallback: Allow ascii/utf_8/specified fallback matches when nothing passes.
    :raises TypeError: If sequences is not bytes or bytearray.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Shrink steps/chunk_size to fit small payloads.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: list[str] = []

    # Encoding declared inside the payload (e.g. XML/HTML declaration), if any.
    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []
    soft_failure_skip: set[str] = set()
    success_fast_tracked: set[str] = set()

    # Cache for decoded payload deduplication: hash(decoded_payload) -> (mean_mess_ratio, cd_ratios_merged, passed)
    # When multiple encodings decode to the exact same string, we can skip the expensive
    # mess_ratio and coherence_ratio analysis and reuse the results from the first encoding.
    payload_result_cache: dict[int, tuple[float, list[tuple[str, float]], bool]] = {}

    # When a definitive result (chaos=0.0 and good coherence) is found after testing
    # the prioritized encodings (ascii, utf_8), we can significantly reduce the remaining
    # work. Encodings that target completely different language families (e.g., Cyrillic
    # when the definitive match is Latin) are skipped entirely.
    # Additionally, for same-family encodings that pass chaos probing, we reuse the
    # definitive match's coherence ratios instead of recomputing them — a major savings
    # since coherence_ratio accounts for ~30% of total time on slow Latin files.
    definitive_match_found: bool = False
    definitive_target_languages: set[str] = set()
    # After the definitive match fires, we cap the number of additional same-family
    # single-byte encodings that pass chaos probing. Once we've accumulated enough
    # good candidates (N), further same-family SB encodings are unlikely to produce
    # a better best() result and just waste mess_ratio + coherence_ratio time.
    # The first encoding to trigger the definitive match is NOT counted (it's already in).
    post_definitive_sb_success_count: int = 0
    POST_DEFINITIVE_SB_CAP: int = 7

    # When a non-UTF multibyte encoding passes chaos probing with significant multibyte
    # content (decoded length < 98% of raw length), skip all remaining single-byte encodings.
    # Rationale: multi-byte decoders (CJK) have strict byte-sequence validation — if they
    # decode without error AND pass chaos probing with substantial multibyte content, the
    # data is genuinely multibyte encoded. Single-byte encodings will always decode (every
    # byte maps to something) but waste time on mess_ratio before failing.
    # The 98% threshold prevents false triggers on files that happen to have a few valid
    # multibyte pairs (e.g., cp424/_ude_1.txt where big5 decodes with 99% ratio).
    mb_definitive_match_found: bool = False

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED_MB_FIRST:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        # Skip encodings similar to ones that already soft-failed (high mess ratio).
        # Checked BEFORE the expensive decode attempt.
        if encoding_iana in soft_failure_skip:
            logger.log(
                TRACE,
                "%s is deemed too similar to a code page that was already considered unsuited. Continuing!",
                encoding_iana,
            )
            continue

        # Skip encodings that were already fast-tracked from a similar successful encoding.
        if encoding_iana in success_fast_tracked:
            logger.log(
                TRACE,
                "Skipping %s: already fast-tracked from a similar successful encoding.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):  # Defensive:
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # When we've already found a definitive match (chaos=0.0 with good coherence)
        # after testing the prioritized encodings, skip encodings that target
        # completely different language families. This avoids running expensive
        # mess_ratio + coherence_ratio on clearly unrelated candidates (e.g., Cyrillic
        # when the definitive match is Latin-based).
        if definitive_match_found:
            if not is_multi_byte_decoder:
                enc_languages = set(encoding_languages(encoding_iana))
            else:
                enc_languages = set(mb_encoding_languages(encoding_iana))
            if not enc_languages.intersection(definitive_target_languages):
                logger.log(
                    TRACE,
                    "Skipping %s: definitive match already found, this encoding targets different languages (%s vs %s).",
                    encoding_iana,
                    enc_languages,
                    definitive_target_languages,
                )
                continue

        # After the definitive match, cap the number of additional same-family
        # single-byte encodings that pass chaos probing. This avoids testing the
        # tail of rare, low-value same-family encodings (mac_iceland, cp860, etc.)
        # that almost never change best() but each cost ~1-2ms of mess_ratio + coherence.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and post_definitive_sb_success_count >= POST_DEFINITIVE_SB_CAP
        ):
            logger.log(
                TRACE,
                "Skipping %s: already accumulated %d same-family results after definitive match (cap=%d).",
                encoding_iana,
                post_definitive_sb_success_count,
                POST_DEFINITIVE_SB_CAP,
            )
            continue

        # When a multibyte encoding with significant multibyte content has already
        # passed chaos probing, skip all single-byte encodings. They will either fail
        # chaos probing (wasting mess_ratio time) or produce inferior results.
        if mb_definitive_match_found and not is_multi_byte_decoder:
            logger.log(
                TRACE,
                "Skipping single-byte %s: multi-byte definitive match already found.",
                encoding_iana,
            )
            continue

        # First full/partial decode attempt. For huge payloads with single-byte
        # decoders, only the first 500kB is decoded here; the tail is re-checked
        # later (lazy str loading).
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Byte offsets at which each probed chunk starts (evenly spread).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Payload-hash deduplication: if another encoding already decoded to the
        # exact same string, reuse its mess_ratio and coherence results entirely.
        # This is strictly more general than the old IANA_SUPPORTED_SIMILAR approach
        # because it catches ALL identical decoding, not just pre-mapped ones.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_hash: int = hash(decoded_payload)
            cached = payload_result_cache.get(payload_hash)
            if cached is not None:
                cached_mess, cached_cd, cached_passed = cached
                if cached_passed:
                    # The previous encoding with identical output passed chaos probing.
                    fast_match = CharsetMatch(
                        sequences,
                        encoding_iana,
                        cached_mess,
                        bom_or_sig_available,
                        cached_cd,
                        (
                            decoded_payload
                            if (
                                is_too_large_sequence is False
                                or encoding_iana
                                in [specified_encoding, "ascii", "utf_8"]
                            )
                            else None
                        ),
                        preemptive_declaration=specified_encoding,
                    )
                    results.append(fast_match)
                    success_fast_tracked.add(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-tracked (identical decoded payload to a prior encoding, chaos=%f %%).",
                        encoding_iana,
                        round(cached_mess * 100, ndigits=3),
                    )

                    # Mirror the early-stop logic of the regular (non-cached) path.
                    if (
                        encoding_iana in [specified_encoding, "ascii", "utf_8"]
                        and cached_mess < 0.1
                    ):
                        if cached_mess == 0.0:
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                fast_match.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([fast_match])
                        early_stop_results.append(fast_match)

                        if (
                            len(early_stop_results)
                            and (specified_encoding is None or specified_encoding in tested)
                            and "ascii" in tested
                            and "utf_8" in tested
                        ):
                            probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                probable_result.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([probable_result])

                    continue
                else:
                    # The previous encoding with identical output failed chaos probing.
                    tested_but_soft_failure.append(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-skipped (identical decoded payload to a prior encoding that failed chaos probing).",
                        encoding_iana,
                    )
                    # Prepare fallbacks for special encodings even when skipped.
                    if enable_fallback and encoding_iana in [
                        "ascii",
                        "utf_8",
                        specified_encoding,
                        "utf_16",
                        "utf_32",
                    ]:
                        fallback_entry = CharsetMatch(
                            sequences,
                            encoding_iana,
                            threshold,
                            bom_or_sig_available,
                            [],
                            decoded_payload,
                            preemptive_declaration=specified_encoding,
                        )
                        if encoding_iana == specified_encoding:
                            fallback_specified = fallback_entry
                        elif encoding_iana == "ascii":
                            fallback_ascii = fallback_entry
                        else:
                            fallback_u8 = fallback_entry
                    continue

        # Number of chunk-level soft failures tolerated before giving up early.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios: list[float] = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            if encoding_iana in IANA_SUPPORTED_SIMILAR:
                soft_failure_skip.update(IANA_SUPPORTED_SIMILAR[encoding_iana])
            # Cache this soft-failure so identical decoding from other encodings
            # can be skipped immediately.
            if decoded_payload is not None and not is_multi_byte_decoder:
                payload_result_cache.setdefault(
                    hash(decoded_payload), (mean_mess_ratio, [], False)
                )
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # Run coherence detection on all chunks. We previously tried limiting to
        # 1-2 chunks for post-definitive encodings to save time, but this caused
        # coverage regressions by producing unrepresentative coherence scores.
        # The SB cap and language-family skip optimizations provide sufficient
        # speedup without sacrificing coherence accuracy.
        if encoding_iana != "ascii":
            # We shall skip the CD when its about ASCII
            # Most of the time its not relevant to run "language-detection" on it.
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)
        else:
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        # Cache the successful result for payload-hash deduplication.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_result_cache.setdefault(
                hash(decoded_payload),
                (mean_mess_ratio, cd_ratios_merged, True),
            )

        # Count post-definitive same-family SB successes for the early termination cap.
        # Only count low-mess encodings (< 2%) toward the cap. High-mess encodings are
        # marginal results that shouldn't prevent better-quality candidates from being
        # tested. For example, iso8859_4 (mess=0%) should not be skipped just because
        # 7 high-mess Latin encodings (cp1252 at 8%, etc.) were tried first.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and mean_mess_ratio < 0.02
        ):
            post_definitive_sb_success_count += 1

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

            if (
                len(early_stop_results)
                and (specified_encoding is None or specified_encoding in tested)
                and "ascii" in tested
                and "utf_8" in tested
            ):
                probable_result = early_stop_results.best()  # type: ignore[assignment]
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    probable_result.encoding,  # type: ignore[union-attr]
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)

                return CharsetMatches([probable_result])

        # Once we find a result with good coherence (>= 0.5) after testing the
        # prioritized encodings (ascii, utf_8), activate "definitive mode": skip
        # encodings that target completely different language families. This avoids
        # running expensive mess_ratio + coherence_ratio on clearly unrelated
        # candidates (e.g., Cyrillic encodings when the match is Latin-based).
        # We require coherence >= 0.5 to avoid false positives (e.g., cp1251 decoding
        # Hebrew text with 0.0 chaos but wrong language detection at coherence 0.33).
        if not definitive_match_found and not is_multi_byte_decoder:
            best_coherence = (
                max((v for _, v in cd_ratios_merged), default=0.0)
                if cd_ratios_merged
                else 0.0
            )
            if best_coherence >= 0.5 and "ascii" in tested and "utf_8" in tested:
                definitive_match_found = True
                definitive_target_languages.update(target_languages)
                logger.log(
                    TRACE,
                    "Definitive match found: %s (chaos=%.3f, coherence=%.2f). Encodings targeting different language families will be skipped.",
                    encoding_iana,
                    mean_mess_ratio,
                    best_coherence,
                )

        # When a non-UTF multibyte encoding passes chaos probing with significant
        # multibyte content (decoded < 98% of raw), activate mb_definitive_match.
        # This skips all remaining single-byte encodings which would either soft-fail
        # (running expensive mess_ratio for nothing) or produce inferior results.
        if (
            not mb_definitive_match_found
            and is_multi_byte_decoder
            and multi_byte_bonus
            and decoded_payload is not None
            and len(decoded_payload) < length * 0.98
            and encoding_iana
            not in {
                "utf_8",
                "utf_8_sig",
                "utf_16",
                "utf_16_be",
                "utf_16_le",
                "utf_32",
                "utf_32_be",
                "utf_32_le",
                "utf_7",
            }
            and "ascii" in tested
            and "utf_8" in tested
        ):
            mb_definitive_match_found = True
            logger.log(
                TRACE,
                "Multi-byte definitive match: %s (chaos=%.3f, decoded=%d/%d=%.1f%%). Single-byte encodings will be skipped.",
                encoding_iana,
                mean_mess_ratio,
                len(decoded_payload),
                length,
                len(decoded_payload) / length * 100,
            )

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    # Nothing survived probing: fall back to specified/utf_8/ascii if allowed.
    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results
848
849
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but using a file pointer that is already ready.
    Will not close the file pointer.
    """
    # Drain the file object and delegate every tuning knob untouched.
    return from_bytes(
        fp.read(),
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
878
879
def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
    Can raise IOError.
    """
    # Open in binary mode; decoding is the detector's job, not the reader's.
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
909
910
def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
    Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
    are disabled to be stricter around ASCII-compatible but unlikely to be a string.
    """
    # All three entry points share the exact same tuning knobs; build them once.
    detection_kwargs = dict(
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

    # Dispatch on the input flavor: filesystem path, raw payload, or open file.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **detection_kwargs)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **detection_kwargs)
    else:
        guesses = from_fp(fp_or_path_or_payload, **detection_kwargs)

    # No plausible charset at all means the content is not text.
    return not guesses