Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/charset_normalizer/api.py: 8%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

301 statements  

1from __future__ import annotations 

2 

3import logging 

4from os import PathLike 

5from typing import BinaryIO 

6 

7from .cd import ( 

8 coherence_ratio, 

9 encoding_languages, 

10 mb_encoding_languages, 

11 merge_coherence_ratios, 

12) 

13from .constant import ( 

14 IANA_SUPPORTED, 

15 IANA_SUPPORTED_SIMILAR, 

16 TOO_BIG_SEQUENCE, 

17 TOO_SMALL_SEQUENCE, 

18 TRACE, 

19) 

20from .md import mess_ratio 

21from .models import CharsetMatch, CharsetMatches 

22from .utils import ( 

23 any_specified_encoding, 

24 cut_sequence_chunks, 

25 iana_name, 

26 identify_sig_or_bom, 

27 is_multi_byte_encoding, 

28 should_strip_sig_or_bom, 

29) 

30 

logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)


def _partition_multibyte_first() -> list[str]:
    """Return IANA_SUPPORTED reordered so every multibyte codec precedes
    every single-byte one.

    Putting multibyte codecs first lets the mb_definitive_match optimization
    in from_bytes fire earlier, skipping all single-byte encodings for
    genuine CJK content. Multibyte codecs hard-fail (UnicodeDecodeError) on
    single-byte data almost instantly, so testing them first costs
    negligible time for non-CJK files.
    """
    multibyte: list[str] = []
    single_byte: list[str] = []

    for name in IANA_SUPPORTED:
        try:
            bucket = multibyte if is_multi_byte_encoding(name) else single_byte
        except ImportError:
            # Codec lookup failed; treat it as single-byte so it is still tested.
            bucket = single_byte
        bucket.append(name)

    return multibyte + single_byte


IANA_SUPPORTED_MB_FIRST: list[str] = _partition_multibyte_first()

55 

56 

def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.

    Raises TypeError when `sequences` is neither bytes nor bytearray.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    if explain:
        # Remember the caller's logger level so every exit path can restore it.
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        # Normalize user-supplied names to IANA form so membership tests match.
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Small payloads: sample everything in a single chunk.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    # Shrink chunks so the requested number of steps still fits the payload.
    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: list[str] = []

    # Preemptive detection: honor an encoding declared inside the payload
    # (e.g. XML prolog, HTML meta) by testing it first — never by trusting it.
    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []
    soft_failure_skip: set[str] = set()
    success_fast_tracked: set[str] = set()

    # Cache for decoded payload deduplication: hash(decoded_payload) -> (mean_mess_ratio, cd_ratios_merged, passed)
    # When multiple encodings decode to the exact same string, we can skip the expensive
    # mess_ratio and coherence_ratio analysis and reuse the results from the first encoding.
    payload_result_cache: dict[int, tuple[float, list[tuple[str, float]], bool]] = {}

    # When a definitive result (chaos=0.0 and good coherence) is found after testing
    # the prioritized encodings (ascii, utf_8), we can significantly reduce the remaining
    # work. Encodings that target completely different language families (e.g., Cyrillic
    # when the definitive match is Latin) are skipped entirely.
    # Additionally, for same-family encodings that pass chaos probing, we reuse the
    # definitive match's coherence ratios instead of recomputing them — a major savings
    # since coherence_ratio accounts for ~30% of total time on slow Latin files.
    definitive_match_found: bool = False
    definitive_target_languages: set[str] = set()
    # After the definitive match fires, we cap the number of additional same-family
    # single-byte encodings that pass chaos probing. Once we've accumulated enough
    # good candidates (N), further same-family SB encodings are unlikely to produce
    # a better best() result and just waste mess_ratio + coherence_ratio time.
    # The first encoding to trigger the definitive match is NOT counted (it's already in).
    post_definitive_sb_success_count: int = 0
    POST_DEFINITIVE_SB_CAP: int = 7

    # When a non-UTF multibyte encoding passes chaos probing with significant multibyte
    # content (decoded length < 98% of raw length), skip all remaining single-byte encodings.
    # Rationale: multi-byte decoders (CJK) have strict byte-sequence validation — if they
    # decode without error AND pass chaos probing with substantial multibyte content, the
    # data is genuinely multibyte encoded. Single-byte encodings will always decode (every
    # byte maps to something) but waste time on mess_ratio before failing.
    # The 98% threshold prevents false triggers on files that happen to have a few valid
    # multibyte pairs (e.g., cp424/_ude_1.txt where big5 decodes with 99% ratio).
    mb_definitive_match_found: bool = False

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED_MB_FIRST:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        # Skip encodings similar to ones that already soft-failed (high mess ratio).
        # Checked BEFORE the expensive decode attempt.
        if encoding_iana in soft_failure_skip:
            logger.log(
                TRACE,
                "%s is deemed too similar to a code page that was already considered unsuited. Continuing!",
                encoding_iana,
            )
            continue

        # Skip encodings that were already fast-tracked from a similar successful encoding.
        if encoding_iana in success_fast_tracked:
            logger.log(
                TRACE,
                "Skipping %s: already fast-tracked from a similar successful encoding.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):  # Defensive:
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # When we've already found a definitive match (chaos=0.0 with good coherence)
        # after testing the prioritized encodings, skip encodings that target
        # completely different language families. This avoids running expensive
        # mess_ratio + coherence_ratio on clearly unrelated candidates (e.g., Cyrillic
        # when the definitive match is Latin-based).
        if definitive_match_found:
            if not is_multi_byte_decoder:
                enc_languages = set(encoding_languages(encoding_iana))
            else:
                enc_languages = set(mb_encoding_languages(encoding_iana))
            if not enc_languages.intersection(definitive_target_languages):
                logger.log(
                    TRACE,
                    "Skipping %s: definitive match already found, this encoding targets different languages (%s vs %s).",
                    encoding_iana,
                    enc_languages,
                    definitive_target_languages,
                )
                continue

        # After the definitive match, cap the number of additional same-family
        # single-byte encodings that pass chaos probing. This avoids testing the
        # tail of rare, low-value same-family encodings (mac_iceland, cp860, etc.)
        # that almost never change best() but each cost ~1-2ms of mess_ratio + coherence.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and post_definitive_sb_success_count >= POST_DEFINITIVE_SB_CAP
        ):
            logger.log(
                TRACE,
                "Skipping %s: already accumulated %d same-family results after definitive match (cap=%d).",
                encoding_iana,
                post_definitive_sb_success_count,
                POST_DEFINITIVE_SB_CAP,
            )
            continue

        # When a multibyte encoding with significant multibyte content has already
        # passed chaos probing, skip all single-byte encodings. They will either fail
        # chaos probing (wasting mess_ratio time) or produce inferior results.
        if mb_definitive_match_found and not is_multi_byte_decoder:
            logger.log(
                TRACE,
                "Skipping single-byte %s: multi-byte definitive match already found.",
                encoding_iana,
            )
            continue

        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                # Large payload + single-byte codec: only validate a 500kB prefix
                # here; the tail is verified later (lazy decoding).
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                # UTF-7 BOM is encoded in modified Base64 whose byte boundary
                # can overlap with the next character. Stripping raw SIG bytes
                # before decoding may leave stray bytes that decode as garbage.
                # Decode the full sequence and remove the leading BOM char instead.
                # see https://github.com/jawah/charset_normalizer/issues/718
                # and https://github.com/jawah/charset_normalizer/issues/716
                if encoding_iana == "utf_7" and bom_or_sig_available:
                    decoded_payload = str(
                        sequences,
                        encoding=encoding_iana,
                    )
                    if decoded_payload and decoded_payload[0] == "\ufeff":
                        decoded_payload = decoded_payload[1:]
                else:
                    decoded_payload = str(
                        (
                            sequences
                            if strip_sig_or_bom is False
                            else sequences[len(sig_payload) :]
                        ),
                        encoding=encoding_iana,
                    )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Offsets of the chunk starts used for sampling (skips the BOM if present).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Payload-hash deduplication: if another encoding already decoded to the
        # exact same string, reuse its mess_ratio and coherence results entirely.
        # This is strictly more general than the old IANA_SUPPORTED_SIMILAR approach
        # because it catches ALL identical decoding, not just pre-mapped ones.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_hash: int = hash(decoded_payload)
            cached = payload_result_cache.get(payload_hash)
            if cached is not None:
                cached_mess, cached_cd, cached_passed = cached
                if cached_passed:
                    # The previous encoding with identical output passed chaos probing.
                    fast_match = CharsetMatch(
                        sequences,
                        encoding_iana,
                        cached_mess,
                        bom_or_sig_available,
                        cached_cd,
                        (
                            decoded_payload
                            if (
                                is_too_large_sequence is False
                                or encoding_iana
                                in [specified_encoding, "ascii", "utf_8"]
                            )
                            else None
                        ),
                        preemptive_declaration=specified_encoding,
                    )
                    results.append(fast_match)
                    success_fast_tracked.add(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-tracked (identical decoded payload to a prior encoding, chaos=%f %%).",
                        encoding_iana,
                        round(cached_mess * 100, ndigits=3),
                    )

                    # Mirror the regular early-stop logic for prioritized encodings.
                    if (
                        encoding_iana in [specified_encoding, "ascii", "utf_8"]
                        and cached_mess < 0.1
                    ):
                        if cached_mess == 0.0:
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                fast_match.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([fast_match])
                        early_stop_results.append(fast_match)

                        if (
                            len(early_stop_results)
                            and (specified_encoding is None or specified_encoding in tested)
                            and "ascii" in tested
                            and "utf_8" in tested
                        ):
                            probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
                            logger.debug(
                                "Encoding detection: %s is most likely the one.",
                                probable_result.encoding,
                            )
                            if explain:
                                logger.removeHandler(explain_handler)
                                logger.setLevel(previous_logger_level)
                            return CharsetMatches([probable_result])

                    continue
                else:
                    # The previous encoding with identical output failed chaos probing.
                    tested_but_soft_failure.append(encoding_iana)
                    logger.log(
                        TRACE,
                        "%s fast-skipped (identical decoded payload to a prior encoding that failed chaos probing).",
                        encoding_iana,
                    )
                    # Prepare fallbacks for special encodings even when skipped.
                    if enable_fallback and encoding_iana in [
                        "ascii",
                        "utf_8",
                        specified_encoding,
                        "utf_16",
                        "utf_32",
                    ]:
                        fallback_entry = CharsetMatch(
                            sequences,
                            encoding_iana,
                            threshold,
                            bom_or_sig_available,
                            [],
                            decoded_payload,
                            preemptive_declaration=specified_encoding,
                        )
                        if encoding_iana == specified_encoding:
                            fallback_specified = fallback_entry
                        elif encoding_iana == "ascii":
                            fallback_ascii = fallback_entry
                        else:
                            fallback_u8 = fallback_entry
                    continue

        # Give up on an encoding once a quarter of its chunks (min 2) exceed the
        # mess threshold.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            if encoding_iana in IANA_SUPPORTED_SIMILAR:
                soft_failure_skip.update(IANA_SUPPORTED_SIMILAR[encoding_iana])
            # Cache this soft-failure so identical decoding from other encodings
            # can be skipped immediately.
            if decoded_payload is not None and not is_multi_byte_decoder:
                payload_result_cache.setdefault(
                    hash(decoded_payload), (mean_mess_ratio, [], False)
                )
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # Run coherence detection on all chunks. We previously tried limiting to
        # 1-2 chunks for post-definitive encodings to save time, but this caused
        # coverage regressions by producing unrepresentative coherence scores.
        # The SB cap and language-family skip optimizations provide sufficient
        # speedup without sacrificing coherence accuracy.
        if encoding_iana != "ascii":
            # We shall skip the CD when its about ASCII
            # Most of the time its not relevant to run "language-detection" on it.
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)
        else:
            cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        # Cache the successful result for payload-hash deduplication.
        if decoded_payload is not None and not is_multi_byte_decoder:
            payload_result_cache.setdefault(
                hash(decoded_payload),
                (mean_mess_ratio, cd_ratios_merged, True),
            )

        # Count post-definitive same-family SB successes for the early termination cap.
        # Only count low-mess encodings (< 2%) toward the cap. High-mess encodings are
        # marginal results that shouldn't prevent better-quality candidates from being
        # tested. For example, iso8859_4 (mess=0%) should not be skipped just because
        # 7 high-mess Latin encodings (cp1252 at 8%, etc.) were tried first.
        if (
            definitive_match_found
            and not is_multi_byte_decoder
            and mean_mess_ratio < 0.02
        ):
            post_definitive_sb_success_count += 1

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

            if (
                len(early_stop_results)
                and (specified_encoding is None or specified_encoding in tested)
                and "ascii" in tested
                and "utf_8" in tested
            ):
                probable_result = early_stop_results.best()  # type: ignore[assignment]
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    probable_result.encoding,  # type: ignore[union-attr]
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)

                return CharsetMatches([probable_result])

        # Once we find a result with good coherence (>= 0.5) after testing the
        # prioritized encodings (ascii, utf_8), activate "definitive mode": skip
        # encodings that target completely different language families. This avoids
        # running expensive mess_ratio + coherence_ratio on clearly unrelated
        # candidates (e.g., Cyrillic encodings when the match is Latin-based).
        # We require coherence >= 0.5 to avoid false positives (e.g., cp1251 decoding
        # Hebrew text with 0.0 chaos but wrong language detection at coherence 0.33).
        if not definitive_match_found and not is_multi_byte_decoder:
            best_coherence = (
                max((v for _, v in cd_ratios_merged), default=0.0)
                if cd_ratios_merged
                else 0.0
            )
            if best_coherence >= 0.5 and "ascii" in tested and "utf_8" in tested:
                definitive_match_found = True
                definitive_target_languages.update(target_languages)
                logger.log(
                    TRACE,
                    "Definitive match found: %s (chaos=%.3f, coherence=%.2f). Encodings targeting different language families will be skipped.",
                    encoding_iana,
                    mean_mess_ratio,
                    best_coherence,
                )

        # When a non-UTF multibyte encoding passes chaos probing with significant
        # multibyte content (decoded < 98% of raw), activate mb_definitive_match.
        # This skips all remaining single-byte encodings which would either soft-fail
        # (running expensive mess_ratio for nothing) or produce inferior results.
        if (
            not mb_definitive_match_found
            and is_multi_byte_decoder
            and multi_byte_bonus
            and decoded_payload is not None
            and len(decoded_payload) < length * 0.98
            and encoding_iana
            not in {
                "utf_8",
                "utf_8_sig",
                "utf_16",
                "utf_16_be",
                "utf_16_le",
                "utf_32",
                "utf_32_be",
                "utf_32_le",
                "utf_7",
            }
            and "ascii" in tested
            and "utf_8" in tested
        ):
            mb_definitive_match_found = True
            logger.log(
                TRACE,
                "Multi-byte definitive match: %s (chaos=%.3f, decoded=%d/%d=%.1f%%). Single-byte encodings will be skipped.",
                encoding_iana,
                mean_mess_ratio,
                len(decoded_payload),
                length,
                len(decoded_payload) / length * 100,
            )

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results

862 

863 

def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but using a file pointer that is already ready.
    Will not close the file pointer.
    """
    # Consume the remainder of the stream, then delegate the whole analysis.
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

892 

893 

def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
    Can raise IOError.
    """
    # Binary mode is mandatory: decoding is precisely what we are detecting.
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

923 

924 

def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
    Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
    are disabled to be stricter around ASCII-compatible but unlikely to be a string.
    """
    # Every entry point receives the exact same tuning knobs; build them once.
    detection_kwargs = dict(
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

    # Dispatch on the payload's concrete type: filesystem path, raw bytes,
    # or (by elimination) an already-open binary file object.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **detection_kwargs)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **detection_kwargs)
    else:
        guesses = from_fp(fp_or_path_or_payload, **detection_kwargs)

    # No plausible charset at all means the content is not text.
    return not guesses