Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/charset_normalizer/api.py: 7%

195 statements · coverage.py v7.2.7, created at 2023-06-06 06:03 +0000

import logging
from os import PathLike
from typing import Any, BinaryIO, List, Optional, Set

from .cd import (
    coherence_ratio,
    encoding_languages,
    mb_encoding_languages,
    merge_coherence_ratios,
)
from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
from .md import mess_ratio
from .models import CharsetMatch, CharsetMatches
from .utils import (
    any_specified_encoding,
    cut_sequence_chunks,
    iana_name,
    identify_sig_or_bom,
    is_cp_similar,
    is_multi_byte_encoding,
    should_strip_sig_or_bom,
)

# Will most likely be controversial
# logging.addLevelName(TRACE, "TRACE")
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)
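
# NOTE: explain_handler is attached to the "charset_normalizer" logger only
# while from_bytes() runs with explain=True, and it is removed again before
# that function returns, so importing this module leaves logging untouched.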


def from_bytes(
    sequences: bytes,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possible charsets usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given sequence,
    and will give up on a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behaviour DOES NOT replace the traditional detection workflow; it prioritizes a particular code page
    but never takes it for granted. It can improve performance.

    If you want to focus on some code pages and/or exclude others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG from the payload/sequence every time except for UTF-16 and UTF-32.
    By default the library does not set up any handler other than the NullHandler. If you set the 'explain'
    toggle to True, it will alter the logger configuration to add a StreamHandler suitable for debugging.
    A custom logging format and handler can be set manually.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

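    # Normalize the caller-provided code page filters (cp_isolation and
    # cp_exclusion) to their IANA names; an empty list disables that filter.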

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []
    tested_but_soft_failure: List[str] = []

    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

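    # Probe candidates in priority order (declared encoding, BOM/SIG encoding,
    # ascii, utf_8) before falling back to every other IANA-supported encoding.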

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []

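        # Decode the payload chunk by chunk and score each chunk's mess
        # (chaos) ratio; stop early once too many chunks exceed the threshold.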

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content,
        # but only if the initial MD tests pass.
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

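        # Average the per-chunk mess ratios; a mean at or above the threshold
        # marks this encoding as a soft failure, kept only as a potential
        # ascii/utf_8/declared-encoding fallback.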

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

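        # Coherence detection: determine which languages this code page can
        # plausibly encode, then score the decoded chunks against them.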

        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when it's about ASCII.
        # Most of the time it's not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )

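        # Early exits: a clean enough match (mess below 10%) for the declared
        # encoding, ascii, or utf_8, or a match for the detected BOM/SIG
        # encoding, short-circuits the remaining scan.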

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results

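# --- Usage sketch (illustrative, not part of the library) ---------------------
# A minimal example of driving the entry point above. The sample bytes and the
# cp1251 guess are hypothetical; only from_bytes(), .best(), .encoding and
# str() come from the public API shown in this file.
#
#     from charset_normalizer import from_bytes
#
#     payload = "Обичам Python".encode("cp1251")
#     best_guess = from_bytes(payload).best()
#     if best_guess is not None:
#         print(best_guess.encoding)  # plausibly "cp1251" or a similar code page
#         print(str(best_guess))      # the payload decoded with the best guess
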

def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
) -> CharsetMatches:
    """
    Same as the from_bytes function but using a file pointer that is already open and readable.
    Will not close the file pointer.
    """
    return from_bytes(
        fp.read(),
        steps,
        chunk_size,
        threshold,
        cp_isolation,
        cp_exclusion,
        preemptive_behaviour,
        explain,
        language_threshold,
    )


def from_path(
    path: "PathLike[Any]",
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
) -> CharsetMatches:
    """
    Same as the from_bytes function but with one extra step: opening and reading the given file path in binary mode.
    Can raise IOError.
    """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
        )
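

# --- Usage sketch (illustrative, not part of the library) ---------------------
# A hedged example of the file-based entry point; "some_file.txt" is a
# hypothetical path, and explain=True simply enables the TRACE StreamHandler
# configured at the top of this module.
#
#     from charset_normalizer import from_path
#
#     matches = from_path("some_file.txt", explain=True)
#     best_guess = matches.best()
#     if best_guess is None:
#         print("Probably binary / not text")
#     else:
#         print(best_guess.encoding, str(best_guess)[:80])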