import logging
from os import PathLike
from typing import Any, BinaryIO, List, Optional, Set

from .cd import (
    coherence_ratio,
    encoding_languages,
    mb_encoding_languages,
    merge_coherence_ratios,
)
from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
from .md import mess_ratio
from .models import CharsetMatch, CharsetMatches
from .utils import (
    any_specified_encoding,
    cut_sequence_chunks,
    iana_name,
    identify_sig_or_bom,
    is_cp_similar,
    is_multi_byte_encoding,
    should_strip_sig_or_bom,
)

# Will most likely be controversial
# logging.addLevelName(TRACE, "TRACE")
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)


def from_bytes(
    sequences: bytes,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
) -> CharsetMatches:
44 """
45 Given a raw bytes sequence, return the best possibles charset usable to render str objects.
46 If there is no results, it is a strong indicator that the source is binary/not text.
47 By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
48 And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.
50 The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
51 but never take it for granted. Can improve the performance.
53 You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
54 purpose.
56 This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
57 By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
58 toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
59 Custom logging format and handler can be set manually.
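
    A minimal usage sketch (the exact result depends on the input bytes):

        >>> from charset_normalizer import from_bytes
        >>> str(from_bytes("héllo wörld".encode("utf_8")).best())  # doctest: +SKIP
        'héllo wörld'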
60 """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. Use this flag for debugging purposes. "
            "Limited list of encodings allowed: %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. Use this flag for debugging purposes. "
            "Limited list of encodings excluded: %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []
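
    # Shrink the probing plan when the payload does not fit steps * chunk_size.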
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []
    tested_but_soft_failure: List[str] = []

    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")
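
    # Candidates are tried in priority order (declared encoding, BOM/SIG match,
    # ascii, utf_8) before the full IANA-supported list.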
    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it requires a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue
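
        # For very large single-byte payloads, only the first 500 kB is decoded
        # here; everything past the first 50 kB is re-verified further below
        # once chunk probing passes.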
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was considered unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appears that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )
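
        # Tolerate mess in up to a quarter of the probed chunks (minimum 2)
        # before giving up on this code page.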
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests pass
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue
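
        # The mean mess across probed chunks decides soft failure; ascii, utf_8
        # and the declared encoding are still kept around as fallback matches.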
        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when it's about ASCII
        # Most of the time it's not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )
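
        # Shortcut: a clean match on the declared/ascii/utf_8 encoding, or on
        # the encoding announced by a BOM/SIG, is returned immediately.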
        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results


def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
) -> CharsetMatches:
511 """
512 Same thing than the function from_bytes but using a file pointer that is already ready.
513 Will not close the file pointer.
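
    A minimal usage sketch ("data.bin" being a hypothetical file):

        >>> with open("data.bin", "rb") as fp:  # doctest: +SKIP
        ...     best_guess = from_fp(fp).best()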
514 """
    return from_bytes(
        fp.read(),
        steps,
        chunk_size,
        threshold,
        cp_isolation,
        cp_exclusion,
        preemptive_behaviour,
        explain,
        language_threshold,
    )


def from_path(
    path: "PathLike[Any]",
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
) -> CharsetMatches:
539 """
540 Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
541 Can raise IOError.
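
    A minimal usage sketch ("./my_document.txt" being a hypothetical path):

        >>> best_guess = from_path("./my_document.txt").best()  # doctest: +SKIP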
542 """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
        )
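

if __name__ == "__main__":
    # Minimal demonstration sketch (illustrative only, not part of the public
    # API): detect the encoding of this very source file and print candidates.
    for match in from_path(__file__):
        print(match.encoding, match.chaos)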