1from __future__ import annotations
2
3import logging
4from os import PathLike
5from typing import BinaryIO
6
7from .cd import (
8 coherence_ratio,
9 encoding_languages,
10 mb_encoding_languages,
11 merge_coherence_ratios,
12)
13from .constant import (
14 IANA_SUPPORTED,
15 IANA_SUPPORTED_SIMILAR,
16 TOO_BIG_SEQUENCE,
17 TOO_SMALL_SEQUENCE,
18 TRACE,
19)
20from .md import mess_ratio
21from .models import CharsetMatch, CharsetMatches
22from .utils import (
23 any_specified_encoding,
24 cut_sequence_chunks,
25 iana_name,
26 identify_sig_or_bom,
27 is_multi_byte_encoding,
28 should_strip_sig_or_bom,
29)
30
# Package-wide logger. It stays silent unless a handler is attached, either
# by the embedding application or by the 'explain' flag of from_bytes below.
logger = logging.getLogger("charset_normalizer")
# Pre-built StreamHandler used only while 'explain=True' is active inside
# from_bytes; it is attached on entry and removed on every exit path.
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)
36
37
def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.
    """

    # Fail fast on anything that is not a raw bytes-like payload.
    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    # 'explain' temporarily attaches the module-level StreamHandler and drops
    # the logger to TRACE; the previous level is restored on every exit path
    # below before returning.
    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    # Empty payload: nothing to probe, report utf_8 with zero chaos.
    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    # Normalize user-supplied code page names to their IANA form so the
    # membership tests against encoding_iana below are reliable.
    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Shrink the sampling plan when the payload cannot fill steps * chunk_size
    # bytes: fall back to a single chunk covering the whole content.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    # Size classification drives two behaviours: tiny payloads only get a
    # TRACE note, large ones switch to lazy/partial str decoding below.
    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tried before the generic IANA_SUPPORTED sweep. A declarative
    # mark (e.g. XML/HTML charset hint) found in the payload gets priority,
    # but is still fully verified like any other candidate.
    prioritized_encodings: list[str] = []

    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    # Bookkeeping for the candidate loop.
    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []
    # Encodings similar to one that already soft-failed; skipped pre-decode.
    soft_failure_skip: set[str] = set()

    # Last-resort matches kept in case no candidate passes the chaos probing.
    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    # A BOM/SIG, when present, also promotes its encoding to the front.
    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    # Main candidate loop: priority encodings first, then every IANA-supported
    # code page, each filtered, decode-tested, chaos-probed and (optionally)
    # language-probed.
    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        # utf_16/utf_32 without BOM are ambiguous; their LE/BE sub-encoders
        # are tested separately via IANA_SUPPORTED.
        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        # Skip encodings similar to ones that already soft-failed (high mess ratio).
        # Checked BEFORE the expensive decode attempt.
        if encoding_iana in soft_failure_skip:
            logger.log(
                TRACE,
                "%s is deemed too similar to a code page that was already considered unsuited. Continuing!",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # Cheap whole-payload decode test. For very large single-byte payloads
        # only the first 500 kB is decoded here; the tail is verified later.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Offsets of the chunks sampled across the payload (BOM skipped).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        # Fewer decoded chars than input bytes implies at least one multi-byte
        # character was actually consumed by this decoder.
        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Give up on a candidate once a quarter (min. 2) of its chunks exceed
        # the mess threshold.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        # Chunk-wise mess (chaos) measurement.
        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        # Soft failure: decodes, but looks too messy. Similar code pages are
        # blacklisted for the rest of the loop, and priority encodings are
        # kept aside as possible fallbacks.
        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            if encoding_iana in IANA_SUPPORTED_SIMILAR:
                soft_failure_skip.update(IANA_SUPPORTED_SIMILAR[encoding_iana])
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        # Coherence (language) detection on the surviving candidate.
        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            # The decoded str is only embedded when it is cheap (payload not
            # huge) or when this encoding is one of the prioritized ones.
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        # Early-stop shortcut: a near-perfect priority encoding can end the
        # whole detection immediately (chaos == 0) or as soon as the other
        # priority candidates have been covered.
        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

        if (
            len(early_stop_results)
            and (specified_encoding is None or specified_encoding in tested)
            and "ascii" in tested
            and "utf_8" in tested
        ):
            probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
            logger.debug(
                "Encoding detection: %s is most likely the one.",
                probable_result.encoding,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)

            return CharsetMatches([probable_result])

        # A candidate matching the detected BOM/SIG wins outright.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    # Nothing passed: fall back on the stashed specified/utf_8/ascii matches,
    # preferring the declared encoding, then utf_8, then ascii.
    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results
545
546
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but using a file pointer that is already ready.
    Will not close the file pointer.
    """
    # Drain the (already opened) stream once, then hand everything over to
    # from_bytes; every tuning knob is forwarded unchanged.
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
575
576
def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
    Can raise IOError.
    """
    # Binary mode is mandatory: detection operates on raw bytes, never on an
    # already-decoded text stream. The handle is closed by the context manager.
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
606
607
def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
    Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
    are disabled to be stricter around ASCII-compatible but unlikely to be a string.
    """
    # All three entry points share the exact same tuning knobs; build the
    # keyword mapping once instead of repeating it per branch.
    detector_options = {
        "steps": steps,
        "chunk_size": chunk_size,
        "threshold": threshold,
        "cp_isolation": cp_isolation,
        "cp_exclusion": cp_exclusion,
        "preemptive_behaviour": preemptive_behaviour,
        "explain": explain,
        "language_threshold": language_threshold,
        "enable_fallback": enable_fallback,
    }

    # Dispatch on the input kind: filesystem path, raw payload, or an open
    # binary stream (the remaining possibility).
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **detector_options)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **detector_options)
    else:
        guesses = from_fp(fp_or_path_or_payload, **detector_options)

    # No plausible charset at all => the content is considered binary.
    return not guesses