Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/charset_normalizer/api.py: 7%

import logging
from os import PathLike
from typing import BinaryIO, List, Optional, Set, Union

from .cd import (
    coherence_ratio,
    encoding_languages,
    mb_encoding_languages,
    merge_coherence_ratios,
)
from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
from .md import mess_ratio
from .models import CharsetMatch, CharsetMatches
from .utils import (
    any_specified_encoding,
    cut_sequence_chunks,
    iana_name,
    identify_sig_or_bom,
    is_cp_similar,
    is_multi_byte_encoding,
    should_strip_sig_or_bom,
)

# Will most likely be controversial
# logging.addLevelName(TRACE, "TRACE")
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)
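# Note: explain_handler is attached to the logger only for the duration of a
# call made with explain=True; otherwise the library ships only a NullHandler,
# as the from_bytes docstring below mentions.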

def from_bytes(
    sequences: Union[bytes, bytearray],
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:

    """
    Given a raw bytes sequence, return the best possible charset matches usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given
    sequence, and will give up on a particular code page after 20% of measured mess. Those criteria are customizable
    at will.

    The preemptive behaviour DOES NOT replace the traditional detection workflow; it prioritizes a particular code
    page but never takes it for granted. It can improve performance.

    You may want to focus your attention on some code pages and/or exclude others; use cp_isolation and cp_exclusion
    for that purpose.

    This function will strip the SIG from the payload/sequence every time except for UTF-16 and UTF-32.
    By default the library does not set up any handler other than the NullHandler. If you set the 'explain'
    toggle to True, it will alter the logger configuration by adding a StreamHandler that is suitable for debugging.
    A custom logging format and handler can be set manually.
    """
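    # A minimal usage sketch (hedged; `payload` is a hypothetical variable):
    #
    #     from charset_normalizer import from_bytes
    #     payload = "Всеки човек има право на образование.".encode("cp1251")
    #     best_guess = from_bytes(payload).best()
    #     if best_guess is not None:
    #         print(best_guess.encoding, str(best_guess))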

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. Use this flag for debugging purposes. "
            "Limited list of encodings allowed: %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. Use this flag for debugging purposes. "
            "Limited list of encodings excluded: %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []
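    # For example (a hedged sketch; the alias strings are illustrative):
    #     from_bytes(payload, cp_isolation=["latin_1", "cp1252"])
    # Both lists pass through iana_name() above, so common aliases such as
    # "latin-1" or "windows-1252" are normalized before filtering.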

    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)
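    # At this point (steps, chunk_size) is guaranteed to fit the payload:
    # small inputs collapse to a single full-length chunk, and larger ones get
    # chunk_size shrunk so that `steps` evenly spaced samples fit the sequence.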

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )
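    # "Preemptive behaviour": any_specified_encoding() scans the payload for an
    # embedded declaration (e.g. an XML prolog or HTML meta charset). A match is
    # merely tried first; it is still verified by the probing loop below.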

    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []
    tested_but_soft_failure: List[str] = []

    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it requires a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue
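        # Note on lazy decoding (above): for very large payloads with a
        # single-byte decoder, only the first 500,000 bytes are decoded
        # eagerly; the tail is re-verified after chunk probing (see the
        # final lookup further below).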

        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was considered unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appears that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []
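        # Early-stop policy for the probing loop below: a code page is
        # abandoned once a quarter of its chunks (never fewer than 2) exceed
        # the mess threshold.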

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except UnicodeDecodeError as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if the initial MD tests pass
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when it's about ASCII
        # Most of the time it's not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )
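        # Two-stage design: mess detection (md) above filters out decodings
        # that produce gibberish; coherence detection (cd) then scores the
        # survivors against the languages each code page is expected to carry.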

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])
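    # If nothing survived probing, fall back in order of trust: the declared
    # encoding first, then utf_8, then ascii.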

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results

def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same as from_bytes, but works on a file pointer that is already open and readable.
    Will not close the file pointer.
    """
    return from_bytes(
        fp.read(),
        steps,
        chunk_size,
        threshold,
        cp_isolation,
        cp_exclusion,
        preemptive_behaviour,
        explain,
        language_threshold,
        enable_fallback,
    )

def from_path(
    path: Union[str, bytes, PathLike],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same as from_bytes, with one extra step: open and read the given file path in binary mode.
    Can raise IOError.
    """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )
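# A minimal end-to-end sketch (hedged; "./my_subtitle.srt" is a hypothetical
# path):
#
#     from charset_normalizer import from_path
#     best = from_path("./my_subtitle.srt").best()
#     if best is not None:
#         print(best.encoding)   # e.g. "cp1252"
#         print(str(best))       # the decoded content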

def is_binary(
    fp_or_path_or_payload: Union[PathLike, str, BinaryIO, bytes],  # type: ignore[type-arg]
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Detect whether the given input (file pointer, bytes, or path) points to binary content, i.e. not text.
    It relies on the same main heuristic algorithms and default kwargs, with the sole exception that fallback
    matches are disabled, so it is stricter with ASCII-compatible payloads that are unlikely to be text.
    """

    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(
            fp_or_path_or_payload,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(
            fp_or_path_or_payload,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )
    else:
        guesses = from_fp(
            fp_or_path_or_payload,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

    return not guesses
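# A quick sketch of is_binary usage (hedged; outcomes marked as "likely"):
#
#     from charset_normalizer import is_binary
#     print(is_binary(b"\x00\xff\x00\xff"))     # likely True: NUL-laden bytes
#     print(is_binary("hello world".encode()))  # likely False: plain ASCII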