1import logging
2from os import PathLike
3from typing import BinaryIO, List, Optional, Set, Union
4
5from .cd import (
6 coherence_ratio,
7 encoding_languages,
8 mb_encoding_languages,
9 merge_coherence_ratios,
10)
11from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
12from .md import mess_ratio
13from .models import CharsetMatch, CharsetMatches
14from .utils import (
15 any_specified_encoding,
16 cut_sequence_chunks,
17 iana_name,
18 identify_sig_or_bom,
19 is_cp_similar,
20 is_multi_byte_encoding,
21 should_strip_sig_or_bom,
22)
23
# Giving the custom TRACE level a display name is deliberately left disabled,
# as doing so will most likely be controversial.
# logging.addLevelName(TRACE, "TRACE")

# Stream handler that from_bytes() attaches to the package logger for the
# duration of a call made with explain=True, and detaches afterwards.
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s | %(levelname)s | %(message)s")
)

# Package-wide logger shared by the detection functions of this module.
logger = logging.getLogger("charset_normalizer")
31
32
def from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possible charsets usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given
    sequence, and will give up a particular code page after 20% of measured mess. Those criteria are customizable
    at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritizes a particular code page
    but never takes it for granted. Can improve the performance.

    You may want to focus your attention on some code pages and/or not others, use cp_isolation and cp_exclusion for
    that purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    The handler is removed and the previous logger level is restored before this function returns or raises.
    Custom logging format and handler can be set manually.

    :param sequences: The payload to analyse.
    :param steps: Number of chunks sampled from the payload.
    :param chunk_size: Size in bytes of each sampled chunk.
    :param threshold: Mess ratio at (or above) which a code page is given up.
    :param cp_isolation: If given, only those code pages are tested.
    :param cp_exclusion: If given, those code pages are never tested.
    :param preemptive_behaviour: Look for an encoding declared inside the payload and test it first.
    :param explain: Temporarily attach ``explain_handler`` and lower the logger level to TRACE.
    :param language_threshold: Threshold forwarded to the coherence (language) detection.
    :param enable_fallback: Allow ascii / utf_8 / declared encoding to be returned as a last resort
        when no code page passed the mess probing.
    :return: The plausible matches, possibly empty.
    :raises TypeError: If ``sequences`` is neither ``bytes`` nor ``bytearray``.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    if not explain:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )

    previous_logger_level: int = logger.level
    logger.addHandler(explain_handler)
    logger.setLevel(TRACE)

    try:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )
    finally:
        # Single exit point for the logger clean-up: it runs on every return
        # path and also when detection raises, so the debugging handler and
        # the TRACE level never outlive the call. The level is restored to
        # exactly what it was, whatever the exit path.
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)


def _detect_from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int,
    chunk_size: int,
    threshold: float,
    cp_isolation: Optional[List[str]],
    cp_exclusion: Optional[List[str]],
    preemptive_behaviour: bool,
    explain: bool,
    language_threshold: float,
    enable_fallback: bool,
) -> CharsetMatches:
    """Run the detection workflow of from_bytes; logger setup and teardown are left to the caller."""
    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # The payload is too short for the requested sampling: probe it as one chunk.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = length < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = length >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tested before the generic IANA_SUPPORTED list, in this order:
    # declared encoding (if any), SIG/BOM encoding (if any), ascii, utf_8.
    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []
    tested_but_soft_failure: List[str] = []

    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        # Prioritized encodings also appear in IANA_SUPPORTED; test each once.
        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # First gate: the payload must decode strictly with this code page.
        # For large single-byte payloads only a leading slice is decoded here
        # (result discarded, decoded_payload stays None); the rest is checked
        # after the mess probing.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip code pages that are near-duplicates of one already rejected.
        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        # Start offsets of the chunks to probe, skipping the SIG/BOM if any.
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Number of too-messy chunks tolerated before giving up: a quarter of
        # the chunks, but never less than 2.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        # NOTE(review): the lazy probe above decodes up to 50e4 bytes while
        # this check starts at 50e3, so both ranges overlap; kept as-is.
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )

        # Short-circuit: a declared / ascii / utf_8 candidate with very low
        # mess is returned alone, without testing the remaining code pages.
        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            return CharsetMatches([results[encoding_iana]])

        # Short-circuit: the code page announced by the SIG/BOM passed probing.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        # Fallback precedence: declared encoding, then utf_8, then ascii.
        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif fallback_u8 is not None:
            # utf_8 wins over ascii whenever a utf_8 fallback exists.
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    return results
500
501
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Variant of from_bytes working on an already opened file object: whatever ``fp.read()`` returns is
    handed to from_bytes together with the unchanged detection settings.
    The file object is not closed by this function.
    """
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
530
531
def from_path(
    path: Union[str, bytes, PathLike],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Variant of from_bytes taking a file path: the file is opened in binary mode, passed to from_fp
    with the unchanged detection settings, and closed again.
    Can raise IOError.
    """
    with open(path, "rb") as handle:
        matches = from_fp(
            handle,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

    return matches
561
562
def is_binary(
    fp_or_path_or_payload: Union[PathLike, str, BinaryIO, bytes],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Tell whether the given input (path, raw payload or file object) looks binary, i.e. not a string.
    Relies on the same detection functions and default kwargs as the rest of this module, the only
    difference being that fallback matches are disabled by default, to be stricter about content
    that is ASCII-compatible yet unlikely to be a string.
    """
    # Pick the detection entry point matching the kind of input: a str or
    # PathLike is a path, bytes/bytearray is the payload itself, anything
    # else is treated as a file object.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        detect = from_path
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        detect = from_bytes
    else:
        detect = from_fp

    guesses = detect(
        fp_or_path_or_payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

    # No plausible charset at all means the content is considered binary.
    return not guesses