Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/charset_normalizer/api.py: 7%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

214 statements  

1from __future__ import annotations 

2 

3import logging 

4from os import PathLike 

5from typing import BinaryIO 

6 

7from .cd import ( 

8 coherence_ratio, 

9 encoding_languages, 

10 mb_encoding_languages, 

11 merge_coherence_ratios, 

12) 

13from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE 

14from .md import mess_ratio 

15from .models import CharsetMatch, CharsetMatches 

16from .utils import ( 

17 any_specified_encoding, 

18 cut_sequence_chunks, 

19 iana_name, 

20 identify_sig_or_bom, 

21 is_cp_similar, 

22 is_multi_byte_encoding, 

23 should_strip_sig_or_bom, 

24) 

25 

# Package-wide logger. The library itself attaches no handler here (a
# NullHandler is expected to be set elsewhere); `explain_handler` below is
# attached/detached dynamically by `from_bytes` when its `explain` flag is
# True, so debug output only appears for the duration of an explained call.
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)

31 

32 

def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given
    sequence. And will give up a particular code page after 20% of measured mess. Those criteria are customizable
    at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.

    :param sequences: Raw payload to analyze.
    :param steps: Number of chunks probed across the payload.
    :param chunk_size: Size (bytes) of each probed chunk.
    :param threshold: Mean mess ratio at or above which an encoding is rejected.
    :param cp_isolation: Debug aid — only test encodings in this list.
    :param cp_exclusion: Debug aid — never test encodings in this list.
    :param preemptive_behaviour: Honor an encoding declared inside the payload itself.
    :param explain: Temporarily attach a StreamHandler and raise logger verbosity.
    :param language_threshold: Forwarded to the coherence (language) detection per chunk.
    :param enable_fallback: Allow ascii/utf_8/declared-encoding fallback matches when nothing passes.
    :raises TypeError: If *sequences* is neither ``bytes`` nor ``bytearray``.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    if explain:
        # Remember the caller's level so every exit path can restore it.
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    # Empty input: nothing to probe — report utf_8 with a perfect (0.0) score.
    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    # Normalize the isolation/exclusion lists to IANA names; empty list means
    # "no restriction" for the membership tests in the main loop below.
    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Small payloads cannot honor the requested sampling plan: collapse to a
    # single chunk covering the whole sequence.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    # Shrink chunk_size when the payload cannot supply `steps` full chunks.
    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tried first, in order: declared-in-payload, BOM/SIG match,
    # ascii, utf_8 — before the full IANA_SUPPORTED sweep.
    prioritized_encodings: list[str] = []

    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []  # could not decode at all
    tested_but_soft_failure: list[str] = []  # decoded, but mess ratio too high

    # Last-resort candidates kept aside in case nothing passes probing.
    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    # Main detection loop: one pass per candidate encoding.
    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        # Prioritized encodings also appear in IANA_SUPPORTED; never re-test.
        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        # Bare utf_16/utf_32 require a BOM; their LE/BE variants are swept
        # separately via IANA_SUPPORTED.
        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # Cheap hard-failure gate: attempt a full (or, for very large
        # payloads, 500kB-prefix) decode before any chunk analysis.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip encodings too similar to one that already soft-failed.
        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            # NOTE: `encoding_soft_failed` is the match found by the loop
            # above; this branch is only reachable when that loop broke.
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        # Chunk start offsets across the payload (skipping a leading BOM/SIG).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        # Fewer decoded characters than input bytes implies at least one
        # multi-byte character was actually used.
        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Abandon the encoding once a quarter of the chunks (minimum 2)
        # exceed the mess threshold.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        # Mess-detection (MD) probing over the sampled chunks.
        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                # Stop early when the give-up budget is exhausted, or when a
                # kept BOM/SIG makes further chunk probing unnecessary.
                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        # Coherence-detection (CD): infer plausible language(s).
        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        # For very large payloads, only cache the decoded str for the most
        # useful candidates to limit memory pressure.
        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

            # Once all the "must-try" encodings have been tested, pick the
            # best low-mess candidate without sweeping the full IANA list.
            if (
                len(early_stop_results)
                and (specified_encoding is None or specified_encoding in tested)
                and "ascii" in tested
                and "utf_8" in tested
            ):
                probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    probable_result.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)

                return CharsetMatches([probable_result])

        # A passing encoding that also matches the detected BOM/SIG wins
        # outright.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    # Nothing passed probing: fall back on the reserved candidates, preferring
    # the declared encoding, then utf_8, then ascii.
    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results

543 

544 

def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Identical to from_bytes, but reads the payload from an already-opened
    binary file pointer. The file pointer is read to exhaustion and is NOT
    closed by this function.
    """
    payload = fp.read()
    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

573 

574 

def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Identical to from_bytes, with one extra step: the given file path is
    opened and read in binary mode. Can raise IOError.
    """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

604 

605 

def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
    Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
    are disabled to be stricter around ASCII-compatible but unlikely to be a string.
    """
    # Every dispatch target takes the same tuning knobs; build them once.
    detection_kwargs = {
        "steps": steps,
        "chunk_size": chunk_size,
        "threshold": threshold,
        "cp_isolation": cp_isolation,
        "cp_exclusion": cp_exclusion,
        "preemptive_behaviour": preemptive_behaviour,
        "explain": explain,
        "language_threshold": language_threshold,
        "enable_fallback": enable_fallback,
    }

    # Route by input kind: filesystem path, raw payload, or open file pointer.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **detection_kwargs)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **detection_kwargs)
    else:
        guesses = from_fp(fp_or_path_or_payload, **detection_kwargs)

    # No plausible charset at all is the binary indicator.
    return not guesses