Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/charset_normalizer/api.py: 7%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

214 statements  

1from __future__ import annotations 

2 

3import logging 

4from os import PathLike 

5from typing import BinaryIO 

6 

7from .cd import ( 

8 coherence_ratio, 

9 encoding_languages, 

10 mb_encoding_languages, 

11 merge_coherence_ratios, 

12) 

13from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE 

14from .md import mess_ratio 

15from .models import CharsetMatch, CharsetMatches 

16from .utils import ( 

17 any_specified_encoding, 

18 cut_sequence_chunks, 

19 iana_name, 

20 identify_sig_or_bom, 

21 is_cp_similar, 

22 is_multi_byte_encoding, 

23 should_strip_sig_or_bom, 

24) 

25 

# Package-level logger. The library itself stays silent (NullHandler is
# expected to be installed elsewhere); `explain_handler` below is only
# attached temporarily by from_bytes() when it is called with explain=True.
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)

31 

32 

def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    # Temporarily attach the debug StreamHandler; every return path below
    # removes it again and restores the previous logger level.
    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    # Fast path: an empty payload has nothing to probe — report utf_8 with
    # zero measured chaos.
    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    # Normalize the user allow-list to IANA names; [] afterwards means
    # "no isolation filter".
    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    # Same normalization for the deny-list.
    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # If the payload is smaller than steps*chunk_size, sample it as a single
    # chunk covering the whole content.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    # Shrink chunk_size so `steps` chunks still fit inside the payload.
    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings probed before the generic IANA_SUPPORTED sweep.
    prioritized_encodings: list[str] = []

    # Preemptive behaviour: honor an encoding declared inside the payload
    # itself (e.g. an XML/HTML charset mark) as a priority candidate only.
    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    # Bookkeeping across the candidate loop.
    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    # A BOM/SIG is a strong hint: probe that encoding first.
    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    # Main candidate loop: priority list first, then every supported encoding.
    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        # utf_16/utf_32 without a BOM are covered by their LE/BE sub-codecs.
        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # First full decode attempt. For very large payloads and single-byte
        # codecs only the first 500kB is decoded eagerly (lazy str loading);
        # the tail is re-checked further below.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip code pages that are near-identical to one that already
        # soft-failed — they would fail the same way.
        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        # Chunk offsets over the payload (skipping the BOM/SIG when present).
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        # Fewer decoded chars than input bytes implies at least one
        # multi-byte character was actually used.
        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Give up on a candidate once a quarter of its chunks (min. 2)
        # exceed the mess threshold.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        # Mess-detection (MD) pass: measure chaos per decoded chunk.
        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        # Soft failure: decodable, but too messy to be plausible text.
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    False,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        # Coherence-detection (CD) pass: which languages this code page can
        # plausibly carry, then per-chunk language coherence.
        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        # For very large payloads the decoded str is only kept for the
        # priority encodings, to bound memory usage.
        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

        # Early exit once all priority candidates have been probed and at
        # least one low-mess match was collected.
        if (
            len(early_stop_results)
            and (specified_encoding is None or specified_encoding in tested)
            and "ascii" in tested
            and "utf_8" in tested
        ):
            probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
            logger.debug(
                "Encoding detection: %s is most likely the one.",
                probable_result.encoding,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)

            return CharsetMatches([probable_result])

        # A candidate that matches the detected BOM/SIG wins outright.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    # Nothing passed probing: fall back on specified > utf_8 > ascii.
    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            # NOTE(review): this final clause makes the two conditions above
            # redundant — any non-None fallback_u8 is taken. Confirm intent.
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results

542 

543 

def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but using a file pointer that is already ready.
    Will not close the file pointer.
    """
    # Drain the (already opened) binary stream and delegate everything else
    # to from_bytes, forwarding each tuning knob by name.
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

572 

573 

def from_path(
    path: str | bytes | PathLike,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
    Can raise IOError.
    """
    # Open in binary mode (the detector works on raw bytes); the file object
    # is closed by the context manager, per from_fp's "won't close" contract.
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

603 

604 

def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string.
    Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match
    are disabled to be stricter around ASCII-compatible but unlikely to be a string.

    :param fp_or_path_or_payload: A filesystem path, a raw bytes payload, or an opened binary file pointer.
    :return: True when no charset could plausibly decode the input as text.
    """
    # All three dispatch targets share the exact same tuning knobs; collect
    # them once instead of repeating the nine keyword arguments per branch.
    detection_kwargs: dict = dict(
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )

    # Dispatch on input kind: path-like -> open & read, raw payload -> direct,
    # anything else is assumed to be a readable binary file pointer.
    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **detection_kwargs)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **detection_kwargs)
    else:
        guesses = from_fp(fp_or_path_or_payload, **detection_kwargs)

    # No plausible charset at all => binary content.
    return not guesses