Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/chardet/pipeline/structural.py: 6%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

278 statements  

1"""Stage 2b: Multi-byte structural probing. 

2 

3Computes how well byte patterns in the data match the expected multi-byte 

4structure for a given encoding. Used after byte-validity filtering (Stage 2a) 

5to further rank multi-byte encoding candidates. 

6 

7Note: ``from __future__ import annotations`` is intentionally omitted because 

8this module is compiled with mypyc, which does not support PEP 563 string 

9annotations. 

10""" 

11 

12from collections.abc import Callable 

13 

14from chardet.pipeline import HIGH_BYTES, PipelineContext 

15from chardet.registry import EncodingInfo 

16 

17# --------------------------------------------------------------------------- 

18# Per-encoding single-pass analyzers 

19# 

20# Each function walks the data once, computing three metrics simultaneously: 

21# - pair_ratio: valid multi-byte pairs / lead bytes (structural score) 

22# - mb_bytes: count of non-ASCII bytes in valid multi-byte sequences 

23# - lead_diversity: count of distinct lead byte values in valid pairs 

24# 

25# These are kept as separate functions (rather than a single parameterized 

26# analyzer) so that mypyc can inline the byte-range constants into each 

27# function's tight loop. 

28# --------------------------------------------------------------------------- 

29 

30 

31def _analyze_shift_jis( 

32 data: bytes, 

33) -> tuple[float, int, int]: 

34 """Single-pass Shift_JIS structural analysis. 

35 

36 Lead bytes: 0x81-0x9F, 0xE0-0xEF 

37 Trail bytes: 0x40-0x7E, 0x80-0xFC 

38 

39 Returns (pair_ratio, mb_bytes, lead_diversity). 

40 """ 

41 lead_count = 0 

42 valid_count = 0 

43 mb = 0 

44 leads: set[int] = set() 

45 i = 0 

46 length = len(data) 

47 while i < length: 

48 b = data[i] 

49 if (0x81 <= b <= 0x9F) or (0xE0 <= b <= 0xEF): 

50 lead_count += 1 

51 if i + 1 < length: 

52 trail = data[i + 1] 

53 if (0x40 <= trail <= 0x7E) or (0x80 <= trail <= 0xFC): 

54 valid_count += 1 

55 leads.add(b) 

56 # Lead is always > 0x7F; trail may or may not be 

57 mb += 1 

58 if trail > 0x7F: 

59 mb += 1 

60 i += 2 

61 continue 

62 i += 1 

63 else: 

64 i += 1 

65 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

66 return ratio, mb, len(leads) 

67 

68 

69def _analyze_cp932( 

70 data: bytes, 

71) -> tuple[float, int, int]: 

72 """Single-pass CP932 structural analysis. 

73 

74 Lead bytes: 0x81-0x9F, 0xE0-0xFC 

75 Trail bytes: 0x40-0x7E, 0x80-0xFC 

76 

77 Extends Shift_JIS by raising the lead byte ceiling from 0xEF to 0xFC, 

78 covering IBM vendor-defined characters (NEC-selected, IBM extensions). 

79 

80 Returns (pair_ratio, mb_bytes, lead_diversity). 

81 """ 

82 lead_count = 0 

83 valid_count = 0 

84 mb = 0 

85 leads: set[int] = set() 

86 i = 0 

87 length = len(data) 

88 while i < length: 

89 b = data[i] 

90 if (0x81 <= b <= 0x9F) or (0xE0 <= b <= 0xFC): 

91 lead_count += 1 

92 if i + 1 < length: 

93 trail = data[i + 1] 

94 if (0x40 <= trail <= 0x7E) or (0x80 <= trail <= 0xFC): 

95 valid_count += 1 

96 leads.add(b) 

97 # Lead is always > 0x7F; trail may or may not be 

98 mb += 1 

99 if trail > 0x7F: 

100 mb += 1 

101 i += 2 

102 continue 

103 i += 1 

104 else: 

105 i += 1 

106 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

107 return ratio, mb, len(leads) 

108 

109 

110def _analyze_euc_jp( 

111 data: bytes, 

112) -> tuple[float, int, int]: 

113 """Single-pass EUC-JP structural analysis. 

114 

115 Two-byte: Lead 0xA1-0xFE, Trail 0xA1-0xFE 

116 SS2 (half-width katakana): 0x8E + 0xA1-0xDF 

117 SS3 (JIS X 0212): 0x8F + 0xA1-0xFE + 0xA1-0xFE 

118 

119 Returns (pair_ratio, mb_bytes, lead_diversity). 

120 """ 

121 lead_count = 0 

122 valid_count = 0 

123 mb = 0 

124 leads: set[int] = set() 

125 i = 0 

126 length = len(data) 

127 while i < length: 

128 b = data[i] 

129 if b == 0x8E: 

130 # SS2 sequence 

131 lead_count += 1 

132 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xDF: 

133 valid_count += 1 

134 leads.add(b) 

135 mb += 2 

136 i += 2 

137 continue 

138 i += 1 

139 elif b == 0x8F: 

140 # SS3 sequence 

141 lead_count += 1 

142 if ( 

143 i + 2 < length 

144 and 0xA1 <= data[i + 1] <= 0xFE 

145 and 0xA1 <= data[i + 2] <= 0xFE 

146 ): 

147 valid_count += 1 

148 leads.add(b) 

149 mb += 3 

150 i += 3 

151 continue 

152 i += 1 

153 elif 0xA1 <= b <= 0xFE: 

154 lead_count += 1 

155 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE: 

156 valid_count += 1 

157 leads.add(b) 

158 mb += 2 

159 i += 2 

160 continue 

161 i += 1 

162 else: 

163 i += 1 

164 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

165 return ratio, mb, len(leads) 

166 

167 

168def _analyze_euc_kr( 

169 data: bytes, 

170) -> tuple[float, int, int]: 

171 """Single-pass EUC-KR structural analysis. 

172 

173 Lead 0xA1-0xFE; Trail 0xA1-0xFE 

174 

175 Returns (pair_ratio, mb_bytes, lead_diversity). 

176 """ 

177 lead_count = 0 

178 valid_count = 0 

179 mb = 0 

180 leads: set[int] = set() 

181 i = 0 

182 length = len(data) 

183 while i < length: 

184 b = data[i] 

185 if 0xA1 <= b <= 0xFE: 

186 lead_count += 1 

187 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE: 

188 valid_count += 1 

189 leads.add(b) 

190 mb += 2 

191 i += 2 

192 continue 

193 i += 1 

194 else: 

195 i += 1 

196 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

197 return ratio, mb, len(leads) 

198 

199 

200def _analyze_cp949( 

201 data: bytes, 

202) -> tuple[float, int, int]: 

203 """Single-pass CP949 (Unified Hangul Code) structural analysis. 

204 

205 Lead bytes: 0x81-0xC8, 0xCA-0xFD 

206 Trail bytes: 0x41-0x5A, 0x61-0x7A, 0x81-0xFE 

207 

208 Extends EUC-KR by lowering the lead byte floor from 0xA1 to 0x81 and 

209 adding ASCII letter trail ranges plus 0x81-0xA0. 0xC9 is not a valid 

210 UHC lead byte. 

211 

212 Returns (pair_ratio, mb_bytes, lead_diversity). 

213 """ 

214 lead_count = 0 

215 valid_count = 0 

216 mb = 0 

217 leads: set[int] = set() 

218 i = 0 

219 length = len(data) 

220 while i < length: 

221 b = data[i] 

222 if (0x81 <= b <= 0xC8) or (0xCA <= b <= 0xFD): 

223 lead_count += 1 

224 if i + 1 < length: 

225 trail = data[i + 1] 

226 if ( 

227 (0x41 <= trail <= 0x5A) 

228 or (0x61 <= trail <= 0x7A) 

229 or (0x81 <= trail <= 0xFE) 

230 ): 

231 valid_count += 1 

232 leads.add(b) 

233 # Lead is always > 0x7F; trail may or may not be 

234 mb += 1 

235 if trail > 0x7F: 

236 mb += 1 

237 i += 2 

238 continue 

239 i += 1 

240 else: 

241 i += 1 

242 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

243 return ratio, mb, len(leads) 

244 

245 

246def _analyze_gb18030( 

247 data: bytes, 

248) -> tuple[float, int, int]: 

249 """Single-pass GB18030 / GB2312 structural analysis. 

250 

251 Only counts strict GB2312 2-byte pairs (lead 0xA1-0xF7, trail 0xA1-0xFE) 

252 and GB18030 4-byte sequences. The broader GBK extension range 

253 (lead 0x81-0xFE, trail 0x40-0x7E / 0x80-0xFE) is intentionally excluded 

254 because it is so permissive that unrelated single-byte data (EBCDIC, DOS 

255 codepages, etc.) can score 1.0, leading to false positives. 

256 

257 Returns (pair_ratio, mb_bytes, lead_diversity). 

258 """ 

259 lead_count = 0 

260 valid_count = 0 

261 mb = 0 

262 leads: set[int] = set() 

263 i = 0 

264 length = len(data) 

265 while i < length: 

266 b = data[i] 

267 if 0x81 <= b <= 0xFE: 

268 lead_count += 1 

269 # Try 4-byte first (byte2 in 0x30-0x39 distinguishes from 2-byte) 

270 if ( 

271 i + 3 < length 

272 and 0x30 <= data[i + 1] <= 0x39 

273 and 0x81 <= data[i + 2] <= 0xFE 

274 and 0x30 <= data[i + 3] <= 0x39 

275 ): 

276 valid_count += 1 

277 leads.add(b) 

278 mb += 2 # bytes 0 and 2 are non-ASCII 

279 i += 4 

280 continue 

281 # 2-byte GB2312: Lead 0xA1-0xF7, Trail 0xA1-0xFE 

282 if 0xA1 <= b <= 0xF7 and i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE: 

283 valid_count += 1 

284 leads.add(b) 

285 mb += 2 # both bytes are > 0x7F 

286 i += 2 

287 continue 

288 i += 1 

289 else: 

290 i += 1 

291 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

292 return ratio, mb, len(leads) 

293 

294 

295def _analyze_big5( 

296 data: bytes, 

297) -> tuple[float, int, int]: 

298 """Single-pass Big5 structural analysis. 

299 

300 Lead 0xA1-0xF9; Trail 0x40-0x7E, 0xA1-0xFE 

301 

302 Returns (pair_ratio, mb_bytes, lead_diversity). 

303 """ 

304 lead_count = 0 

305 valid_count = 0 

306 mb = 0 

307 leads: set[int] = set() 

308 i = 0 

309 length = len(data) 

310 while i < length: 

311 b = data[i] 

312 if 0xA1 <= b <= 0xF9: 

313 lead_count += 1 

314 if i + 1 < length: 

315 trail = data[i + 1] 

316 if (0x40 <= trail <= 0x7E) or (0xA1 <= trail <= 0xFE): 

317 valid_count += 1 

318 leads.add(b) 

319 # Lead is always > 0x7F; trail may or may not be 

320 mb += 1 

321 if trail > 0x7F: 

322 mb += 1 

323 i += 2 

324 continue 

325 i += 1 

326 else: 

327 i += 1 

328 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

329 return ratio, mb, len(leads) 

330 

331 

332def _analyze_big5hkscs( 

333 data: bytes, 

334) -> tuple[float, int, int]: 

335 """Single-pass Big5-HKSCS structural analysis. 

336 

337 Lead bytes: 0x87-0xFE 

338 Trail bytes: 0x40-0x7E, 0xA1-0xFE 

339 

340 Extends Big5 by lowering the lead byte floor from 0xA1 to 0x87 and 

341 raising the ceiling from 0xF9 to 0xFE. 0x7F and 0x80-0xA0 are not 

342 valid Big5/HKSCS trail bytes. 

343 

344 Returns (pair_ratio, mb_bytes, lead_diversity). 

345 """ 

346 lead_count = 0 

347 valid_count = 0 

348 mb = 0 

349 leads: set[int] = set() 

350 i = 0 

351 length = len(data) 

352 while i < length: 

353 b = data[i] 

354 if 0x87 <= b <= 0xFE: 

355 lead_count += 1 

356 if i + 1 < length: 

357 trail = data[i + 1] 

358 if (0x40 <= trail <= 0x7E) or (0xA1 <= trail <= 0xFE): 

359 valid_count += 1 

360 leads.add(b) 

361 # Lead is always > 0x7F; trail may or may not be 

362 mb += 1 

363 if trail > 0x7F: 

364 mb += 1 

365 i += 2 

366 continue 

367 i += 1 

368 else: 

369 i += 1 

370 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

371 return ratio, mb, len(leads) 

372 

373 

374def _analyze_johab( 

375 data: bytes, 

376) -> tuple[float, int, int]: 

377 """Single-pass Johab structural analysis. 

378 

379 Lead: 0x84-0xD3, 0xD8-0xDE, 0xE0-0xF9 

380 Trail: 0x31-0x7E, 0x91-0xFE 

381 

382 Returns (pair_ratio, mb_bytes, lead_diversity). 

383 """ 

384 lead_count = 0 

385 valid_count = 0 

386 mb = 0 

387 leads: set[int] = set() 

388 i = 0 

389 length = len(data) 

390 while i < length: 

391 b = data[i] 

392 if (0x84 <= b <= 0xD3) or (0xD8 <= b <= 0xDE) or (0xE0 <= b <= 0xF9): 

393 lead_count += 1 

394 if i + 1 < length: 

395 trail = data[i + 1] 

396 if (0x31 <= trail <= 0x7E) or (0x91 <= trail <= 0xFE): 

397 valid_count += 1 

398 leads.add(b) 

399 if b > 0x7F: 

400 mb += 1 

401 if trail > 0x7F: 

402 mb += 1 

403 i += 2 

404 continue 

405 i += 1 

406 else: 

407 i += 1 

408 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

409 return ratio, mb, len(leads) 

410 

411 

412# --------------------------------------------------------------------------- 

413# Dispatch table: encoding name -> analyzer function 

414# --------------------------------------------------------------------------- 

415 

# Dispatch table: encoding name -> analyzer function.
# Fix: _analyze_big5 was defined above but never registered, so "big5"
# candidates received no structural analysis (score 0.0, diversity 256).
_ANALYZERS: dict[str, Callable[[bytes], tuple[float, int, int]]] = {
    "shift_jis_2004": _analyze_shift_jis,
    "cp932": _analyze_cp932,
    "euc_jis_2004": _analyze_euc_jp,
    "euc_kr": _analyze_euc_kr,
    "cp949": _analyze_cp949,
    "gb18030": _analyze_gb18030,
    "big5": _analyze_big5,
    "big5hkscs": _analyze_big5hkscs,
    "johab": _analyze_johab,
}

426 

427 

def _get_analysis(
    data: bytes, name: str, ctx: PipelineContext
) -> tuple[float, int, int] | None:
    """Return the cached analysis for *name*, computing and caching on a miss.

    Returns ``None`` when no analyzer is registered for *name*.
    """
    result = ctx.analysis_cache.get(name)
    if result is None:
        analyzer = _ANALYZERS.get(name)
        if analyzer is None:
            return None
        result = analyzer(data)
        ctx.analysis_cache[name] = result
    return result

441 

442 

443# --------------------------------------------------------------------------- 

444# Public API 

445# --------------------------------------------------------------------------- 

446 

447 

def compute_structural_score(
    data: bytes, encoding_info: EncodingInfo, ctx: PipelineContext
) -> float:
    """Return 0.0--1.0 indicating how well *data* matches the encoding's structure.

    Single-byte encodings and empty data always score 0.0, as do multi-byte
    encodings with no registered analyzer.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :returns: A structural fit score between 0.0 and 1.0.
    """
    if not data or not encoding_info.is_multibyte:
        return 0.0
    analysis = _get_analysis(data, encoding_info.name, ctx)
    # First tuple element is the valid-pair ratio.
    return analysis[0] if analysis is not None else 0.0

469 

470 

def compute_multibyte_byte_coverage(
    data: bytes,
    encoding_info: EncodingInfo,
    ctx: PipelineContext,
    non_ascii_count: int | None = None,
) -> float:
    """Ratio of non-ASCII bytes that participate in valid multi-byte sequences.

    Genuine CJK text has nearly all non-ASCII bytes paired into valid
    multi-byte sequences (coverage close to 1.0), while Latin text with
    scattered high bytes has many orphan bytes (coverage well below 1.0).

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :param non_ascii_count: Pre-computed count of non-ASCII bytes, or ``None``
        to compute from *data*.
    :returns: A coverage ratio between 0.0 and 1.0.
    """
    if not data or not encoding_info.is_multibyte:
        return 0.0

    analysis = _get_analysis(data, encoding_info.name, ctx)
    if analysis is None:
        return 0.0

    if non_ascii_count is None:
        # Count high bytes by deleting them and measuring the shrinkage.
        non_ascii_count = len(data) - len(data.translate(None, HIGH_BYTES))
    if non_ascii_count == 0:
        return 0.0

    # Second tuple element is the count of bytes inside valid sequences.
    return analysis[1] / non_ascii_count

508 

509 

def compute_lead_byte_diversity(
    data: bytes, encoding_info: EncodingInfo, ctx: PipelineContext
) -> int:
    """Count distinct lead byte values in valid multi-byte pairs.

    Genuine CJK text draws lead bytes from across the encoding's full
    repertoire, whereas European text that accidentally matches a CJK
    structural scorer clusters its lead bytes in a narrow band.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :returns: The number of distinct lead byte values found.
    """
    if not data or not encoding_info.is_multibyte:
        return 0
    analysis = _get_analysis(data, encoding_info.name, ctx)
    if analysis is None:
        # No analyzer registered: report the maximum so callers don't gate.
        return 256
    # Third tuple element is the distinct-lead-byte count.
    return analysis[2]