Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/chardet/pipeline/structural.py: 7%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

203 statements  

"""Stage 2b: Multi-byte structural probing.

Computes how well byte patterns in the data match the expected multi-byte
structure for a given encoding. Used after byte-validity filtering (Stage 2a)
to further rank multi-byte encoding candidates.

Note: ``from __future__ import annotations`` is intentionally omitted because
this module is compiled with mypyc, which does not support PEP 563 string
annotations.
"""

11 

12from collections.abc import Callable 

13 

14from chardet.pipeline import HIGH_BYTES, PipelineContext 

15from chardet.registry import EncodingInfo 

16 

17# --------------------------------------------------------------------------- 

18# Per-encoding single-pass analyzers 

19# 

20# Each function walks the data once, computing three metrics simultaneously: 

21# - pair_ratio: valid multi-byte pairs / lead bytes (structural score) 

22# - mb_bytes: count of non-ASCII bytes in valid multi-byte sequences 

23# - lead_diversity: count of distinct lead byte values in valid pairs 

24# 

25# These are kept as separate functions (rather than a single parameterized 

26# analyzer) so that mypyc can inline the byte-range constants into each 

27# function's tight loop. 

28# --------------------------------------------------------------------------- 

29 

30 

31def _analyze_shift_jis( 

32 data: bytes, 

33) -> tuple[float, int, int]: 

34 """Single-pass Shift_JIS / CP932 structural analysis. 

35 

36 Lead bytes: 0x81-0x9F, 0xE0-0xEF 

37 Trail bytes: 0x40-0x7E, 0x80-0xFC 

38 

39 Returns (pair_ratio, mb_bytes, lead_diversity). 

40 """ 

41 lead_count = 0 

42 valid_count = 0 

43 mb = 0 

44 leads: set[int] = set() 

45 i = 0 

46 length = len(data) 

47 while i < length: 

48 b = data[i] 

49 if (0x81 <= b <= 0x9F) or (0xE0 <= b <= 0xEF): 

50 lead_count += 1 

51 if i + 1 < length: 

52 trail = data[i + 1] 

53 if (0x40 <= trail <= 0x7E) or (0x80 <= trail <= 0xFC): 

54 valid_count += 1 

55 leads.add(b) 

56 # Lead is always > 0x7F; trail may or may not be 

57 mb += 1 

58 if trail > 0x7F: 

59 mb += 1 

60 i += 2 

61 continue 

62 i += 1 

63 else: 

64 i += 1 

65 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

66 return ratio, mb, len(leads) 

67 

68 

69def _analyze_euc_jp( 

70 data: bytes, 

71) -> tuple[float, int, int]: 

72 """Single-pass EUC-JP structural analysis. 

73 

74 Two-byte: Lead 0xA1-0xFE, Trail 0xA1-0xFE 

75 SS2 (half-width katakana): 0x8E + 0xA1-0xDF 

76 SS3 (JIS X 0212): 0x8F + 0xA1-0xFE + 0xA1-0xFE 

77 

78 Returns (pair_ratio, mb_bytes, lead_diversity). 

79 """ 

80 lead_count = 0 

81 valid_count = 0 

82 mb = 0 

83 leads: set[int] = set() 

84 i = 0 

85 length = len(data) 

86 while i < length: 

87 b = data[i] 

88 if b == 0x8E: 

89 # SS2 sequence 

90 lead_count += 1 

91 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xDF: 

92 valid_count += 1 

93 leads.add(b) 

94 mb += 2 

95 i += 2 

96 continue 

97 i += 1 

98 elif b == 0x8F: 

99 # SS3 sequence 

100 lead_count += 1 

101 if ( 

102 i + 2 < length 

103 and 0xA1 <= data[i + 1] <= 0xFE 

104 and 0xA1 <= data[i + 2] <= 0xFE 

105 ): 

106 valid_count += 1 

107 leads.add(b) 

108 mb += 3 

109 i += 3 

110 continue 

111 i += 1 

112 elif 0xA1 <= b <= 0xFE: 

113 lead_count += 1 

114 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE: 

115 valid_count += 1 

116 leads.add(b) 

117 mb += 2 

118 i += 2 

119 continue 

120 i += 1 

121 else: 

122 i += 1 

123 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

124 return ratio, mb, len(leads) 

125 

126 

127def _analyze_euc_kr( 

128 data: bytes, 

129) -> tuple[float, int, int]: 

130 """Single-pass EUC-KR / CP949 structural analysis. 

131 

132 Lead 0xA1-0xFE; Trail 0xA1-0xFE 

133 

134 Returns (pair_ratio, mb_bytes, lead_diversity). 

135 """ 

136 lead_count = 0 

137 valid_count = 0 

138 mb = 0 

139 leads: set[int] = set() 

140 i = 0 

141 length = len(data) 

142 while i < length: 

143 b = data[i] 

144 if 0xA1 <= b <= 0xFE: 

145 lead_count += 1 

146 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE: 

147 valid_count += 1 

148 leads.add(b) 

149 mb += 2 

150 i += 2 

151 continue 

152 i += 1 

153 else: 

154 i += 1 

155 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

156 return ratio, mb, len(leads) 

157 

158 

159def _analyze_gb18030( 

160 data: bytes, 

161) -> tuple[float, int, int]: 

162 """Single-pass GB18030 / GB2312 structural analysis. 

163 

164 Only counts strict GB2312 2-byte pairs (lead 0xA1-0xF7, trail 0xA1-0xFE) 

165 and GB18030 4-byte sequences. The broader GBK extension range 

166 (lead 0x81-0xFE, trail 0x40-0x7E / 0x80-0xFE) is intentionally excluded 

167 because it is so permissive that unrelated single-byte data (EBCDIC, DOS 

168 codepages, etc.) can score 1.0, leading to false positives. 

169 

170 Returns (pair_ratio, mb_bytes, lead_diversity). 

171 """ 

172 lead_count = 0 

173 valid_count = 0 

174 mb = 0 

175 leads: set[int] = set() 

176 i = 0 

177 length = len(data) 

178 while i < length: 

179 b = data[i] 

180 if 0x81 <= b <= 0xFE: 

181 lead_count += 1 

182 # Try 4-byte first (byte2 in 0x30-0x39 distinguishes from 2-byte) 

183 if ( 

184 i + 3 < length 

185 and 0x30 <= data[i + 1] <= 0x39 

186 and 0x81 <= data[i + 2] <= 0xFE 

187 and 0x30 <= data[i + 3] <= 0x39 

188 ): 

189 valid_count += 1 

190 leads.add(b) 

191 mb += 2 # bytes 0 and 2 are non-ASCII 

192 i += 4 

193 continue 

194 # 2-byte GB2312: Lead 0xA1-0xF7, Trail 0xA1-0xFE 

195 if 0xA1 <= b <= 0xF7 and i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE: 

196 valid_count += 1 

197 leads.add(b) 

198 mb += 2 # both bytes are > 0x7F 

199 i += 2 

200 continue 

201 i += 1 

202 else: 

203 i += 1 

204 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

205 return ratio, mb, len(leads) 

206 

207 

208def _analyze_big5( 

209 data: bytes, 

210) -> tuple[float, int, int]: 

211 """Single-pass Big5 structural analysis. 

212 

213 Lead 0xA1-0xF9; Trail 0x40-0x7E, 0xA1-0xFE 

214 

215 Returns (pair_ratio, mb_bytes, lead_diversity). 

216 """ 

217 lead_count = 0 

218 valid_count = 0 

219 mb = 0 

220 leads: set[int] = set() 

221 i = 0 

222 length = len(data) 

223 while i < length: 

224 b = data[i] 

225 if 0xA1 <= b <= 0xF9: 

226 lead_count += 1 

227 if i + 1 < length: 

228 trail = data[i + 1] 

229 if (0x40 <= trail <= 0x7E) or (0xA1 <= trail <= 0xFE): 

230 valid_count += 1 

231 leads.add(b) 

232 # Lead is always > 0x7F; trail may or may not be 

233 mb += 1 

234 if trail > 0x7F: 

235 mb += 1 

236 i += 2 

237 continue 

238 i += 1 

239 else: 

240 i += 1 

241 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

242 return ratio, mb, len(leads) 

243 

244 

245def _analyze_johab( 

246 data: bytes, 

247) -> tuple[float, int, int]: 

248 """Single-pass Johab structural analysis. 

249 

250 Lead: 0x84-0xD3, 0xD8-0xDE, 0xE0-0xF9 

251 Trail: 0x31-0x7E, 0x91-0xFE 

252 

253 Returns (pair_ratio, mb_bytes, lead_diversity). 

254 """ 

255 lead_count = 0 

256 valid_count = 0 

257 mb = 0 

258 leads: set[int] = set() 

259 i = 0 

260 length = len(data) 

261 while i < length: 

262 b = data[i] 

263 if (0x84 <= b <= 0xD3) or (0xD8 <= b <= 0xDE) or (0xE0 <= b <= 0xF9): 

264 lead_count += 1 

265 if i + 1 < length: 

266 trail = data[i + 1] 

267 if (0x31 <= trail <= 0x7E) or (0x91 <= trail <= 0xFE): 

268 valid_count += 1 

269 leads.add(b) 

270 if b > 0x7F: 

271 mb += 1 

272 if trail > 0x7F: 

273 mb += 1 

274 i += 2 

275 continue 

276 i += 1 

277 else: 

278 i += 1 

279 ratio = valid_count / lead_count if lead_count > 0 else 0.0 

280 return ratio, mb, len(leads) 

281 

282 

283# --------------------------------------------------------------------------- 

284# Dispatch table: encoding name -> analyzer function 

285# --------------------------------------------------------------------------- 

286 

# Maps a registry encoding name to its structural analyzer.  Encodings that
# share one byte layout (e.g. Shift_JIS variants and CP932) share an analyzer.
_ANALYZERS: dict[str, Callable[[bytes], tuple[float, int, int]]] = dict(
    shift_jis_2004=_analyze_shift_jis,
    cp932=_analyze_shift_jis,
    euc_jis_2004=_analyze_euc_jp,
    euc_kr=_analyze_euc_kr,
    cp949=_analyze_euc_kr,
    gb18030=_analyze_gb18030,
    big5hkscs=_analyze_big5,
    johab=_analyze_johab,
)

297 

298 

def _get_analysis(
    data: bytes, name: str, ctx: PipelineContext
) -> tuple[float, int, int] | None:
    """Return the cached analysis for *name*, computing and caching on a miss.

    Returns ``None`` when no analyzer is registered for *name*.
    """
    result = ctx.analysis_cache.get(name)
    if result is None:
        analyzer = _ANALYZERS.get(name)
        if analyzer is None:
            return None
        result = analyzer(data)
        ctx.analysis_cache[name] = result
    return result

312 

313 

314# --------------------------------------------------------------------------- 

315# Public API 

316# --------------------------------------------------------------------------- 

317 

318 

def compute_structural_score(
    data: bytes, encoding_info: EncodingInfo, ctx: PipelineContext
) -> float:
    """Return 0.0--1.0 indicating how well *data* matches the encoding's structure.

    For single-byte encodings, always returns 0.0. For empty data, always
    returns 0.0.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :returns: A structural fit score between 0.0 and 1.0.
    """
    if not data or not encoding_info.is_multibyte:
        return 0.0

    analysis = _get_analysis(data, encoding_info.name, ctx)
    # First tuple element is the pair_ratio structural score.
    return 0.0 if analysis is None else analysis[0]

340 

341 

def compute_multibyte_byte_coverage(
    data: bytes,
    encoding_info: EncodingInfo,
    ctx: PipelineContext,
    non_ascii_count: int | None = None,
) -> float:
    """Ratio of non-ASCII bytes that participate in valid multi-byte sequences.

    Genuine CJK text pairs nearly every non-ASCII byte into a valid
    multi-byte sequence (coverage close to 1.0), whereas Latin text with
    scattered high bytes leaves many orphan bytes (coverage well below 1.0).

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :param non_ascii_count: Pre-computed count of non-ASCII bytes, or ``None``
        to compute it from *data*.
    :returns: A coverage ratio between 0.0 and 1.0.
    """
    if not data or not encoding_info.is_multibyte:
        return 0.0

    analysis = _get_analysis(data, encoding_info.name, ctx)
    if analysis is None:
        return 0.0

    if non_ascii_count is None:
        # Count high bytes by deleting them and measuring the length drop.
        non_ascii_count = len(data) - len(data.translate(None, HIGH_BYTES))
    if non_ascii_count == 0:
        return 0.0

    # analysis[1] is mb_bytes: non-ASCII bytes inside valid sequences.
    return analysis[1] / non_ascii_count

379 

380 

def compute_lead_byte_diversity(
    data: bytes, encoding_info: EncodingInfo, ctx: PipelineContext
) -> int:
    """Count distinct lead byte values in valid multi-byte pairs.

    Genuine CJK text draws lead bytes from across the encoding's full
    repertoire, while European text that falsely matches a CJK structural
    scorer clusters its lead bytes in a narrow band.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :returns: The number of distinct lead byte values found.
    """
    if not data or not encoding_info.is_multibyte:
        return 0
    analysis = _get_analysis(data, encoding_info.name, ctx)
    if analysis is None:
        # Unknown encoding: report maximum diversity so callers never
        # gate a candidate on a metric we could not compute.
        return 256
    # Third tuple element is the lead_diversity count.
    return analysis[2]