Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/charset_normalizer/api.py: 8%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

297 statements  

1from __future__ import annotations 

2 

3import logging 

4from os import PathLike 

5from typing import BinaryIO 

6 

7from .cd import ( 

8 coherence_ratio, 

9 encoding_languages, 

10 mb_encoding_languages, 

11 merge_coherence_ratios, 

12) 

13from .constant import ( 

14 IANA_SUPPORTED, 

15 IANA_SUPPORTED_SIMILAR, 

16 TOO_BIG_SEQUENCE, 

17 TOO_SMALL_SEQUENCE, 

18 TRACE, 

19) 

20from .md import mess_ratio 

21from .models import CharsetMatch, CharsetMatches 

22from .utils import ( 

23 any_specified_encoding, 

24 cut_sequence_chunks, 

25 iana_name, 

26 identify_sig_or_bom, 

27 is_multi_byte_encoding, 

28 should_strip_sig_or_bom, 

29) 

30 

# Module-level logger plus an optional StreamHandler that from_bytes attaches
# only while `explain=True` is in effect.
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)

# Build a reordered copy of IANA_SUPPORTED with every multibyte codec ahead of
# the single-byte ones. Probing multibyte codecs first lets the
# mb_definitive_match shortcut in from_bytes trigger sooner for genuine CJK
# payloads, while costing almost nothing otherwise: a multibyte decoder raises
# UnicodeDecodeError nearly immediately on single-byte data.
_mb_supported: list[str] = []
_sb_supported: list[str] = []

for _supported_enc in IANA_SUPPORTED:
    _is_mb: bool = False
    try:
        _is_mb = is_multi_byte_encoding(_supported_enc)
    except ImportError:
        # Codec without an importable IncrementalDecoder: treat as single-byte.
        _is_mb = False
    (_mb_supported if _is_mb else _sb_supported).append(_supported_enc)

IANA_SUPPORTED_MB_FIRST: list[str] = _mb_supported + _sb_supported

55 

56 

def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    # When explaining, temporarily attach the module-level StreamHandler; it is
    # removed (and the previous level restored) on every return path below.
    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        # Normalize user-provided names to IANA form so membership tests below match.
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Small payload: sample it as a single whole chunk instead of `steps` slices.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: list[str] = []

    # Honor an in-band declaration (e.g. XML/HTML charset attribute) when allowed.
    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []
    soft_failure_skip: set[str] = set()
    success_fast_tracked: set[str] = set()

    # Cache for decoded payload deduplication: hash(decoded_payload) -> (mean_mess_ratio, cd_ratios_merged, passed)
    # When multiple encodings decode to the exact same string, we can skip the expensive
    # mess_ratio and coherence_ratio analysis and reuse the results from the first encoding.
    payload_result_cache: dict[int, tuple[float, list[tuple[str, float]], bool]] = {}

    # When a definitive result (chaos=0.0 and good coherence) is found after testing
    # the prioritized encodings (ascii, utf_8), we can significantly reduce the remaining
    # work. Encodings that target completely different language families (e.g., Cyrillic
    # when the definitive match is Latin) are skipped entirely.
    # Additionally, for same-family encodings that pass chaos probing, we reuse the
    # definitive match's coherence ratios instead of recomputing them — a major savings
    # since coherence_ratio accounts for ~30% of total time on slow Latin files.
    definitive_match_found: bool = False
    definitive_target_languages: set[str] = set()
    # After the definitive match fires, we cap the number of additional same-family
    # single-byte encodings that pass chaos probing. Once we've accumulated enough
    # good candidates (N), further same-family SB encodings are unlikely to produce
    # a better best() result and just waste mess_ratio + coherence_ratio time.
    # The first encoding to trigger the definitive match is NOT counted (it's already in).
    post_definitive_sb_success_count: int = 0
    POST_DEFINITIVE_SB_CAP: int = 7

    # When a non-UTF multibyte encoding passes chaos probing with significant multibyte
    # content (decoded length < 98% of raw length), skip all remaining single-byte encodings.
    # Rationale: multi-byte decoders (CJK) have strict byte-sequence validation — if they
    # decode without error AND pass chaos probing with substantial multibyte content, the
    # data is genuinely multibyte encoded. Single-byte encodings will always decode (every
    # byte maps to something) but waste time on mess_ratio before failing.
    # The 98% threshold prevents false triggers on files that happen to have a few valid
    # multibyte pairs (e.g., cp424/_ude_1.txt where big5 decodes with 99% ratio).
    mb_definitive_match_found: bool = False

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED_MB_FIRST:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        # Skip encodings similar to ones that already soft-failed (high mess ratio).
        # Checked BEFORE the expensive decode attempt.
        if encoding_iana in soft_failure_skip:
            logger.log(
                TRACE,
                "%s is deemed too similar to a code page that was already considered unsuited. Continuing!",
                encoding_iana,
            )
            continue

        # Skip encodings that were already fast-tracked from a similar successful encoding.
        if encoding_iana in success_fast_tracked:
            logger.log(
                TRACE,
                "Skipping %s: already fast-tracked from a similar successful encoding.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):  # Defensive:
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # When we've already found a definitive match (chaos=0.0 with good coherence)
        # after testing the prioritized encodings, skip encodings that target
        # completely different language families. This avoids running expensive
        # mess_ratio + coherence_ratio on clearly unrelated candidates (e.g., Cyrillic
        # when the definitive match is Latin-based).
        if definitive_match_found:
            if not is_multi_byte_decoder:
                enc_languages = set(encoding_languages(encoding_iana))
            else:
                enc_languages = set(mb_encoding_languages(encoding_iana))
            if not enc_languages.intersection(definitive_target_languages):
                logger.log(
                    TRACE,
                    "Skipping %s: definitive match already found, this encoding targets different languages (%s vs %s).",
                    encoding_iana,
                    enc_languages,
                    definitive_target_languages,
                )
                continue

        # After the definitive match, cap the number of additional same-family
        # single-byte encodings that pass chaos probing. This avoids testing the
        # tail of rare, low-value same-family encodings (mac_iceland, cp860, etc.)
        # that almost never change best() but each cost ~1-2ms of mess_ratio + coherence.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and post_definitive_sb_success_count >= POST_DEFINITIVE_SB_CAP
        ):
            logger.log(
                TRACE,
                "Skipping %s: already accumulated %d same-family results after definitive match (cap=%d).",
                encoding_iana,
                post_definitive_sb_success_count,
                POST_DEFINITIVE_SB_CAP,
            )
            continue

        # When a multibyte encoding with significant multibyte content has already
        # passed chaos probing, skip all single-byte encodings. They will either fail
        # chaos probing (wasting mess_ratio time) or produce inferior results.
        if mb_definitive_match_found and not is_multi_byte_decoder:
            logger.log(
                TRACE,
                "Skipping single-byte %s: multi-byte definitive match already found.",
                encoding_iana,
            )
            continue

        # Whole-payload decode attempt. For very large payloads and single-byte
        # decoders, only probe the first 500kB here (result discarded); the tail
        # is re-verified later in the "final lookup" block.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Chunk start offsets: skip the BOM/SIG prefix when one was detected.
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Payload-hash deduplication: if another encoding already decoded to the
        # exact same string, reuse its mess_ratio and coherence results entirely.
        # This is strictly more general than the old IANA_SUPPORTED_SIMILAR approach
        # because it catches ALL identical decoding, not just pre-mapped ones.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_hash: int = hash(decoded_payload)
            cached = payload_result_cache.get(payload_hash)
            if cached is not None:
                cached_mess, cached_cd, cached_passed = cached
                if cached_passed:
                    # The previous encoding with identical output passed chaos probing.
                    fast_match = CharsetMatch(
                        sequences,
                        encoding_iana,
                        cached_mess,
                        bom_or_sig_available,
                        cached_cd,
                        (
                            decoded_payload
                            if (
                                is_too_large_sequence is False
                                or encoding_iana
                                in [specified_encoding, "ascii", "utf_8"]
                            )
                            else None
                        ),
                        preemptive_declaration=specified_encoding,
                    )
                    results.append(fast_match)
                    success_fast_tracked.add(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-tracked (identical decoded payload to a prior encoding, chaos=%f %%).",
                        encoding_iana,
                        round(cached_mess * 100, ndigits=3),
                    )

                    # Mirror the early-stop logic of the regular (non-cached) path below.
                    if (
                        encoding_iana in [specified_encoding, "ascii", "utf_8"]
                        and cached_mess < 0.1
                    ):
                        if cached_mess == 0.0:
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                fast_match.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([fast_match])
                        early_stop_results.append(fast_match)

                        if (
                            len(early_stop_results)
                            and (specified_encoding is None or specified_encoding in tested)
                            and "ascii" in tested
                            and "utf_8" in tested
                        ):
                            probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                probable_result.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([probable_result])

                    continue
                else:
                    # The previous encoding with identical output failed chaos probing.
                    tested_but_soft_failure.append(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-skipped (identical decoded payload to a prior encoding that failed chaos probing).",
                        encoding_iana,
                    )
                    # Prepare fallbacks for special encodings even when skipped.
                    if enable_fallback and encoding_iana in [
                        "ascii",
                        "utf_8",
                        specified_encoding,
                        "utf_16",
                        "utf_32",
                    ]:
                        fallback_entry = CharsetMatch(
                            sequences,
                            encoding_iana,
                            threshold,
                            bom_or_sig_available,
                            [],
                            decoded_payload,
                            preemptive_declaration=specified_encoding,
                        )
                        if encoding_iana == specified_encoding:
                            fallback_specified = fallback_entry
                        elif encoding_iana == "ascii":
                            fallback_ascii = fallback_entry
                        else:
                            fallback_u8 = fallback_entry
                    continue

        # Give up on an encoding once a quarter of its chunks (min 2) exceed the
        # mess threshold.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure: bool = False

        md_chunks: list[str] = []
        md_ratios: list[float] = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            if encoding_iana in IANA_SUPPORTED_SIMILAR:
                soft_failure_skip.update(IANA_SUPPORTED_SIMILAR[encoding_iana])
            # Cache this soft-failure so identical decoding from other encodings
            # can be skipped immediately.
            if decoded_payload is not None and not is_multi_byte_decoder:
                payload_result_cache.setdefault(
                    hash(decoded_payload), (mean_mess_ratio, [], False)
                )
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # Run coherence detection on all chunks. We previously tried limiting to
        # 1-2 chunks for post-definitive encodings to save time, but this caused
        # coverage regressions by producing unrepresentative coherence scores.
        # The SB cap and language-family skip optimizations provide sufficient
        # speedup without sacrificing coherence accuracy.
        if encoding_iana != "ascii":
            # We shall skip the CD when its about ASCII
            # Most of the time its not relevant to run "language-detection" on it.
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)
        else:
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        # Cache the successful result for payload-hash deduplication.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_result_cache.setdefault(
                hash(decoded_payload),
                (mean_mess_ratio, cd_ratios_merged, True),
            )

        # Count post-definitive same-family SB successes for the early termination cap.
        # Only count low-mess encodings (< 2%) toward the cap. High-mess encodings are
        # marginal results that shouldn't prevent better-quality candidates from being
        # tested. For example, iso8859_4 (mess=0%) should not be skipped just because
        # 7 high-mess Latin encodings (cp1252 at 8%, etc.) were tried first.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and mean_mess_ratio < 0.02
        ):
            post_definitive_sb_success_count += 1

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

            if (
                len(early_stop_results)
                and (specified_encoding is None or specified_encoding in tested)
                and "ascii" in tested
                and "utf_8" in tested
            ):
                probable_result = early_stop_results.best()  # type: ignore[assignment]
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    probable_result.encoding,  # type: ignore[union-attr]
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)

                return CharsetMatches([probable_result])

        # Once we find a result with good coherence (>= 0.5) after testing the
        # prioritized encodings (ascii, utf_8), activate "definitive mode": skip
        # encodings that target completely different language families. This avoids
        # running expensive mess_ratio + coherence_ratio on clearly unrelated
        # candidates (e.g., Cyrillic encodings when the match is Latin-based).
        # We require coherence >= 0.5 to avoid false positives (e.g., cp1251 decoding
        # Hebrew text with 0.0 chaos but wrong language detection at coherence 0.33).
        if not definitive_match_found and not is_multi_byte_decoder:
            best_coherence = (
                max((v for _, v in cd_ratios_merged), default=0.0)
                if cd_ratios_merged
                else 0.0
            )
            if best_coherence >= 0.5 and "ascii" in tested and "utf_8" in tested:
                definitive_match_found = True
                definitive_target_languages.update(target_languages)
                logger.log(
                    TRACE,
                    "Definitive match found: %s (chaos=%.3f, coherence=%.2f). Encodings targeting different language families will be skipped.",
                    encoding_iana,
                    mean_mess_ratio,
                    best_coherence,
                )

        # When a non-UTF multibyte encoding passes chaos probing with significant
        # multibyte content (decoded < 98% of raw), activate mb_definitive_match.
        # This skips all remaining single-byte encodings which would either soft-fail
        # (running expensive mess_ratio for nothing) or produce inferior results.
        if (
            not mb_definitive_match_found
            and is_multi_byte_decoder
            and multi_byte_bonus
            and decoded_payload is not None
            and len(decoded_payload) < length * 0.98
            and encoding_iana
            not in {
                "utf_8",
                "utf_8_sig",
                "utf_16",
                "utf_16_be",
                "utf_16_le",
                "utf_32",
                "utf_32_be",
                "utf_32_le",
                "utf_7",
            }
            and "ascii" in tested
            and "utf_8" in tested
        ):
            mb_definitive_match_found = True
            logger.log(
                TRACE,
                "Multi-byte definitive match: %s (chaos=%.3f, decoded=%d/%d=%.1f%%). Single-byte encodings will be skipped.",
                encoding_iana,
                mean_mess_ratio,
                len(decoded_payload),
                length,
                len(decoded_payload) / length * 100,
            )

        # A BOM/SIG match that passed chaos probing is authoritative: stop here.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            # NOTE(review): the final `or (fallback_u8 is not None)` clause makes
            # the two preceding clauses redundant — any fallback_u8 wins, so the
            # fallback_ascii branch below is only reachable when fallback_u8 is
            # None. Confirm this matches intent before simplifying.
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results

848 

849 

def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but using a file pointer that is already ready.
    Will not close the file pointer.
    """
    payload = fp.read()
    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

878 

879 

def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
    Can raise IOError.
    """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

909 

910 

def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
    Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
    are disabled to be stricter around ASCII-compatible but unlikely to be a string.
    """
    # All three entry points share the exact same tuning knobs; build them once.
    detection_kwargs = {
        "steps": steps,
        "chunk_size": chunk_size,
        "threshold": threshold,
        "cp_isolation": cp_isolation,
        "cp_exclusion": cp_exclusion,
        "preemptive_behaviour": preemptive_behaviour,
        "explain": explain,
        "language_threshold": language_threshold,
        "enable_fallback": enable_fallback,
    }

    # Dispatch on the input kind: filesystem path, raw payload, or open file.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **detection_kwargs)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **detection_kwargs)
    else:
        guesses = from_fp(fp_or_path_or_payload, **detection_kwargs)

    # No plausible charset at all -> the payload is considered binary.
    return not guesses