Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/charset_normalizer/api.py: 8%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

212 statements  

1from __future__ import annotations 

2 

3import logging 

4from os import PathLike 

5from typing import BinaryIO 

6 

7from .cd import ( 

8 coherence_ratio, 

9 encoding_languages, 

10 mb_encoding_languages, 

11 merge_coherence_ratios, 

12) 

13from .constant import ( 

14 IANA_SUPPORTED, 

15 IANA_SUPPORTED_SIMILAR, 

16 TOO_BIG_SEQUENCE, 

17 TOO_SMALL_SEQUENCE, 

18 TRACE, 

19) 

20from .md import mess_ratio 

21from .models import CharsetMatch, CharsetMatches 

22from .utils import ( 

23 any_specified_encoding, 

24 cut_sequence_chunks, 

25 iana_name, 

26 identify_sig_or_bom, 

27 is_multi_byte_encoding, 

28 should_strip_sig_or_bom, 

29) 

30 

# Package-wide logger; from_bytes() attaches/detaches `explain_handler` around
# each call when explain=True, so no handler is installed at import time.
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)

36 

37 

def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.

    :param sequences: Raw payload to analyze; only bytes/bytearray accepted.
    :param steps: Number of chunks sampled across the payload.
    :param chunk_size: Size (in bytes) of each sampled chunk.
    :param threshold: Maximum tolerated mean "mess" ratio before a code page is discarded.
    :param cp_isolation: If set, restrict detection to these code pages only.
    :param cp_exclusion: If set, never test these code pages.
    :param preemptive_behaviour: Honor an encoding declaration found inside the payload.
    :param explain: Temporarily attach a TRACE-level StreamHandler for debugging.
    :param language_threshold: Minimum coherence ratio for language detection.
    :param enable_fallback: Allow ascii/utf_8/declared-encoding fallback matches when nothing passes.
    :raises TypeError: When `sequences` is neither bytes nor bytearray.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    # When explain=True, install the debug handler and lower the level to TRACE.
    # Every return path below must undo this (see the "Defensive" comments).
    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    # Empty payload: nothing to measure; utf_8 is assumed by convention.
    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    # Normalize user-supplied code page names to IANA form so the membership
    # tests in the candidate loop compare like with like. Rebound to [] when
    # unset because len(cp_isolation) is also read later (mess_ratio debug arg).
    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Payload smaller than the sampling plan: analyze it as one single chunk.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    # Shrink chunk_size when the payload cannot supply `steps` full chunks.
    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tried before the generic IANA_SUPPORTED sweep, in order of trust.
    prioritized_encodings: list[str] = []

    # Declarative mark embedded in the payload itself (only when preemptive
    # behaviour is enabled).
    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    # Bookkeeping: which candidates were attempted and how they failed.
    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []  # strict decode raised -> impossible
    tested_but_soft_failure: list[str] = []  # decoded, but mess ratio too high
    soft_failure_skip: set[str] = set()  # encodings similar to a soft failure

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    # Low-mess (but non-zero) priority matches collected for a possible early exit.
    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        # Priority list and IANA_SUPPORTED may overlap; test each name once.
        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        # Skip encodings similar to ones that already soft-failed (high mess ratio).
        # Checked BEFORE the expensive decode attempt.
        if encoding_iana in soft_failure_skip:
            logger.log(
                TRACE,
                "%s is deemed too similar to a code page that was already considered unsuited. Continuing!",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # Eligibility check: strict-decode the payload. For huge payloads with a
        # single-byte decoder only a 500 KB prefix is decoded here (lazy mode);
        # the tail is re-verified further below.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Chunk start offsets spread evenly across the payload (BOM skipped).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        # A decoded text shorter than its byte length proves at least one
        # multi-byte character was consumed.
        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Give up on a candidate after a quarter of its chunks (min. 2) exceed
        # the mess threshold.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        # Verbose mess debugging only when explaining a narrow isolation set.
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                # A kept BOM already identifies the encoding: one chunk suffices.
                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            # Also blacklist visually-similar code pages for the rest of the loop.
            if encoding_iana in IANA_SUPPORTED_SIMILAR:
                soft_failure_skip.update(IANA_SUPPORTED_SIMILAR[encoding_iana])
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            # For very large payloads keep the decoded text only for the
            # priority encodings; others are re-decoded on demand.
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

            # Early exit once every priority encoding has been examined and at
            # least one low-mess match exists.
            if (
                len(early_stop_results)
                and (specified_encoding is None or specified_encoding in tested)
                and "ascii" in tested
                and "utf_8" in tested
            ):
                probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    probable_result.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)

                return CharsetMatches([probable_result])

        # A candidate matching the payload's BOM/SIG is decisive.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    # Nothing passed: fall back to the soft-failed priority matches, preferring
    # the declared encoding, then utf_8, then ascii.
    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        # NOTE(review): the final "fallback_u8 is not None" clause subsumes the
        # two clauses before it — net effect is simply "prefer utf_8 whenever a
        # utf_8 fallback exists". Confirm whether the first two clauses are
        # intentionally kept for documentation value only.
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results

545 

546 

def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Detect the charset of the remaining content of an open binary stream.

    Thin wrapper over from_bytes: reads the stream to its end and forwards every
    tuning parameter unchanged. The file pointer is left open (and consumed).
    """
    payload = fp.read()
    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

575 

576 

def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Detect the charset of the file at the given path.

    Opens the file in binary mode and delegates to from_fp with every tuning
    parameter forwarded unchanged. May raise IOError/OSError from open().
    """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

606 

607 

def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.

    Runs the same heuristic pipeline as the from_* functions, except fallback
    matches are disabled by default so ASCII-compatible payloads that are
    unlikely to be text count as binary. Returns True when no plausible
    charset was found.
    """
    # All three entry points share the exact same tuning parameters; build the
    # keyword set once and only dispatch on the payload's concrete type.
    detection_kwargs = dict(
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        # Filesystem path.
        guesses = from_path(fp_or_path_or_payload, **detection_kwargs)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        # In-memory payload.
        guesses = from_bytes(fp_or_path_or_payload, **detection_kwargs)
    else:
        # Anything else is treated as an open binary stream.
        guesses = from_fp(fp_or_path_or_payload, **detection_kwargs)

    return not guesses