1"""Stage 2b: Multi-byte structural probing.
2
3Computes how well byte patterns in the data match the expected multi-byte
4structure for a given encoding. Used after byte-validity filtering (Stage 2a)
5to further rank multi-byte encoding candidates.
6
7Note: ``from __future__ import annotations`` is intentionally omitted because
8this module is compiled with mypyc, which does not support PEP 563 string
9annotations.
10"""
11
12from collections.abc import Callable
13
14from chardet.pipeline import HIGH_BYTES, PipelineContext
15from chardet.registry import EncodingInfo
16
17# ---------------------------------------------------------------------------
18# Per-encoding single-pass analyzers
19#
20# Each function walks the data once, computing three metrics simultaneously:
21# - pair_ratio: valid multi-byte pairs / lead bytes (structural score)
22# - mb_bytes: count of non-ASCII bytes in valid multi-byte sequences
23# - lead_diversity: count of distinct lead byte values in valid pairs
24#
25# These are kept as separate functions (rather than a single parameterized
26# analyzer) so that mypyc can inline the byte-range constants into each
27# function's tight loop.
28# ---------------------------------------------------------------------------
29
30
31def _analyze_shift_jis(
32 data: bytes,
33) -> tuple[float, int, int]:
34 """Single-pass Shift_JIS structural analysis.
35
36 Lead bytes: 0x81-0x9F, 0xE0-0xEF
37 Trail bytes: 0x40-0x7E, 0x80-0xFC
38
39 Returns (pair_ratio, mb_bytes, lead_diversity).
40 """
41 lead_count = 0
42 valid_count = 0
43 mb = 0
44 leads: set[int] = set()
45 i = 0
46 length = len(data)
47 while i < length:
48 b = data[i]
49 if (0x81 <= b <= 0x9F) or (0xE0 <= b <= 0xEF):
50 lead_count += 1
51 if i + 1 < length:
52 trail = data[i + 1]
53 if (0x40 <= trail <= 0x7E) or (0x80 <= trail <= 0xFC):
54 valid_count += 1
55 leads.add(b)
56 # Lead is always > 0x7F; trail may or may not be
57 mb += 1
58 if trail > 0x7F:
59 mb += 1
60 i += 2
61 continue
62 i += 1
63 else:
64 i += 1
65 ratio = valid_count / lead_count if lead_count > 0 else 0.0
66 return ratio, mb, len(leads)
67
68
69def _analyze_cp932(
70 data: bytes,
71) -> tuple[float, int, int]:
72 """Single-pass CP932 structural analysis.
73
74 Lead bytes: 0x81-0x9F, 0xE0-0xFC
75 Trail bytes: 0x40-0x7E, 0x80-0xFC
76
77 Extends Shift_JIS by raising the lead byte ceiling from 0xEF to 0xFC,
78 covering IBM vendor-defined characters (NEC-selected, IBM extensions).
79
80 Returns (pair_ratio, mb_bytes, lead_diversity).
81 """
82 lead_count = 0
83 valid_count = 0
84 mb = 0
85 leads: set[int] = set()
86 i = 0
87 length = len(data)
88 while i < length:
89 b = data[i]
90 if (0x81 <= b <= 0x9F) or (0xE0 <= b <= 0xFC):
91 lead_count += 1
92 if i + 1 < length:
93 trail = data[i + 1]
94 if (0x40 <= trail <= 0x7E) or (0x80 <= trail <= 0xFC):
95 valid_count += 1
96 leads.add(b)
97 # Lead is always > 0x7F; trail may or may not be
98 mb += 1
99 if trail > 0x7F:
100 mb += 1
101 i += 2
102 continue
103 i += 1
104 else:
105 i += 1
106 ratio = valid_count / lead_count if lead_count > 0 else 0.0
107 return ratio, mb, len(leads)
108
109
110def _analyze_euc_jp(
111 data: bytes,
112) -> tuple[float, int, int]:
113 """Single-pass EUC-JP structural analysis.
114
115 Two-byte: Lead 0xA1-0xFE, Trail 0xA1-0xFE
116 SS2 (half-width katakana): 0x8E + 0xA1-0xDF
117 SS3 (JIS X 0212): 0x8F + 0xA1-0xFE + 0xA1-0xFE
118
119 Returns (pair_ratio, mb_bytes, lead_diversity).
120 """
121 lead_count = 0
122 valid_count = 0
123 mb = 0
124 leads: set[int] = set()
125 i = 0
126 length = len(data)
127 while i < length:
128 b = data[i]
129 if b == 0x8E:
130 # SS2 sequence
131 lead_count += 1
132 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xDF:
133 valid_count += 1
134 leads.add(b)
135 mb += 2
136 i += 2
137 continue
138 i += 1
139 elif b == 0x8F:
140 # SS3 sequence
141 lead_count += 1
142 if (
143 i + 2 < length
144 and 0xA1 <= data[i + 1] <= 0xFE
145 and 0xA1 <= data[i + 2] <= 0xFE
146 ):
147 valid_count += 1
148 leads.add(b)
149 mb += 3
150 i += 3
151 continue
152 i += 1
153 elif 0xA1 <= b <= 0xFE:
154 lead_count += 1
155 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE:
156 valid_count += 1
157 leads.add(b)
158 mb += 2
159 i += 2
160 continue
161 i += 1
162 else:
163 i += 1
164 ratio = valid_count / lead_count if lead_count > 0 else 0.0
165 return ratio, mb, len(leads)
166
167
168def _analyze_euc_kr(
169 data: bytes,
170) -> tuple[float, int, int]:
171 """Single-pass EUC-KR structural analysis.
172
173 Lead 0xA1-0xFE; Trail 0xA1-0xFE
174
175 Returns (pair_ratio, mb_bytes, lead_diversity).
176 """
177 lead_count = 0
178 valid_count = 0
179 mb = 0
180 leads: set[int] = set()
181 i = 0
182 length = len(data)
183 while i < length:
184 b = data[i]
185 if 0xA1 <= b <= 0xFE:
186 lead_count += 1
187 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE:
188 valid_count += 1
189 leads.add(b)
190 mb += 2
191 i += 2
192 continue
193 i += 1
194 else:
195 i += 1
196 ratio = valid_count / lead_count if lead_count > 0 else 0.0
197 return ratio, mb, len(leads)
198
199
200def _analyze_cp949(
201 data: bytes,
202) -> tuple[float, int, int]:
203 """Single-pass CP949 (Unified Hangul Code) structural analysis.
204
205 Lead bytes: 0x81-0xC8, 0xCA-0xFD
206 Trail bytes: 0x41-0x5A, 0x61-0x7A, 0x81-0xFE
207
208 Extends EUC-KR by lowering the lead byte floor from 0xA1 to 0x81 and
209 adding ASCII letter trail ranges plus 0x81-0xA0. 0xC9 is not a valid
210 UHC lead byte.
211
212 Returns (pair_ratio, mb_bytes, lead_diversity).
213 """
214 lead_count = 0
215 valid_count = 0
216 mb = 0
217 leads: set[int] = set()
218 i = 0
219 length = len(data)
220 while i < length:
221 b = data[i]
222 if (0x81 <= b <= 0xC8) or (0xCA <= b <= 0xFD):
223 lead_count += 1
224 if i + 1 < length:
225 trail = data[i + 1]
226 if (
227 (0x41 <= trail <= 0x5A)
228 or (0x61 <= trail <= 0x7A)
229 or (0x81 <= trail <= 0xFE)
230 ):
231 valid_count += 1
232 leads.add(b)
233 # Lead is always > 0x7F; trail may or may not be
234 mb += 1
235 if trail > 0x7F:
236 mb += 1
237 i += 2
238 continue
239 i += 1
240 else:
241 i += 1
242 ratio = valid_count / lead_count if lead_count > 0 else 0.0
243 return ratio, mb, len(leads)
244
245
246def _analyze_gb18030(
247 data: bytes,
248) -> tuple[float, int, int]:
249 """Single-pass GB18030 / GB2312 structural analysis.
250
251 Only counts strict GB2312 2-byte pairs (lead 0xA1-0xF7, trail 0xA1-0xFE)
252 and GB18030 4-byte sequences. The broader GBK extension range
253 (lead 0x81-0xFE, trail 0x40-0x7E / 0x80-0xFE) is intentionally excluded
254 because it is so permissive that unrelated single-byte data (EBCDIC, DOS
255 codepages, etc.) can score 1.0, leading to false positives.
256
257 Returns (pair_ratio, mb_bytes, lead_diversity).
258 """
259 lead_count = 0
260 valid_count = 0
261 mb = 0
262 leads: set[int] = set()
263 i = 0
264 length = len(data)
265 while i < length:
266 b = data[i]
267 if 0x81 <= b <= 0xFE:
268 lead_count += 1
269 # Try 4-byte first (byte2 in 0x30-0x39 distinguishes from 2-byte)
270 if (
271 i + 3 < length
272 and 0x30 <= data[i + 1] <= 0x39
273 and 0x81 <= data[i + 2] <= 0xFE
274 and 0x30 <= data[i + 3] <= 0x39
275 ):
276 valid_count += 1
277 leads.add(b)
278 mb += 2 # bytes 0 and 2 are non-ASCII
279 i += 4
280 continue
281 # 2-byte GB2312: Lead 0xA1-0xF7, Trail 0xA1-0xFE
282 if 0xA1 <= b <= 0xF7 and i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE:
283 valid_count += 1
284 leads.add(b)
285 mb += 2 # both bytes are > 0x7F
286 i += 2
287 continue
288 i += 1
289 else:
290 i += 1
291 ratio = valid_count / lead_count if lead_count > 0 else 0.0
292 return ratio, mb, len(leads)
293
294
295def _analyze_big5(
296 data: bytes,
297) -> tuple[float, int, int]:
298 """Single-pass Big5 structural analysis.
299
300 Lead 0xA1-0xF9; Trail 0x40-0x7E, 0xA1-0xFE
301
302 Returns (pair_ratio, mb_bytes, lead_diversity).
303 """
304 lead_count = 0
305 valid_count = 0
306 mb = 0
307 leads: set[int] = set()
308 i = 0
309 length = len(data)
310 while i < length:
311 b = data[i]
312 if 0xA1 <= b <= 0xF9:
313 lead_count += 1
314 if i + 1 < length:
315 trail = data[i + 1]
316 if (0x40 <= trail <= 0x7E) or (0xA1 <= trail <= 0xFE):
317 valid_count += 1
318 leads.add(b)
319 # Lead is always > 0x7F; trail may or may not be
320 mb += 1
321 if trail > 0x7F:
322 mb += 1
323 i += 2
324 continue
325 i += 1
326 else:
327 i += 1
328 ratio = valid_count / lead_count if lead_count > 0 else 0.0
329 return ratio, mb, len(leads)
330
331
332def _analyze_big5hkscs(
333 data: bytes,
334) -> tuple[float, int, int]:
335 """Single-pass Big5-HKSCS structural analysis.
336
337 Lead bytes: 0x87-0xFE
338 Trail bytes: 0x40-0x7E, 0xA1-0xFE
339
340 Extends Big5 by lowering the lead byte floor from 0xA1 to 0x87 and
341 raising the ceiling from 0xF9 to 0xFE. 0x7F and 0x80-0xA0 are not
342 valid Big5/HKSCS trail bytes.
343
344 Returns (pair_ratio, mb_bytes, lead_diversity).
345 """
346 lead_count = 0
347 valid_count = 0
348 mb = 0
349 leads: set[int] = set()
350 i = 0
351 length = len(data)
352 while i < length:
353 b = data[i]
354 if 0x87 <= b <= 0xFE:
355 lead_count += 1
356 if i + 1 < length:
357 trail = data[i + 1]
358 if (0x40 <= trail <= 0x7E) or (0xA1 <= trail <= 0xFE):
359 valid_count += 1
360 leads.add(b)
361 # Lead is always > 0x7F; trail may or may not be
362 mb += 1
363 if trail > 0x7F:
364 mb += 1
365 i += 2
366 continue
367 i += 1
368 else:
369 i += 1
370 ratio = valid_count / lead_count if lead_count > 0 else 0.0
371 return ratio, mb, len(leads)
372
373
374def _analyze_johab(
375 data: bytes,
376) -> tuple[float, int, int]:
377 """Single-pass Johab structural analysis.
378
379 Lead: 0x84-0xD3, 0xD8-0xDE, 0xE0-0xF9
380 Trail: 0x31-0x7E, 0x91-0xFE
381
382 Returns (pair_ratio, mb_bytes, lead_diversity).
383 """
384 lead_count = 0
385 valid_count = 0
386 mb = 0
387 leads: set[int] = set()
388 i = 0
389 length = len(data)
390 while i < length:
391 b = data[i]
392 if (0x84 <= b <= 0xD3) or (0xD8 <= b <= 0xDE) or (0xE0 <= b <= 0xF9):
393 lead_count += 1
394 if i + 1 < length:
395 trail = data[i + 1]
396 if (0x31 <= trail <= 0x7E) or (0x91 <= trail <= 0xFE):
397 valid_count += 1
398 leads.add(b)
399 if b > 0x7F:
400 mb += 1
401 if trail > 0x7F:
402 mb += 1
403 i += 2
404 continue
405 i += 1
406 else:
407 i += 1
408 ratio = valid_count / lead_count if lead_count > 0 else 0.0
409 return ratio, mb, len(leads)
410
411
412# ---------------------------------------------------------------------------
413# Dispatch table: encoding name -> analyzer function
414# ---------------------------------------------------------------------------
415
# Maps an encoding name to the analyzer that scores data for it.  Each
# analyzer takes the raw bytes and returns
# (pair_ratio, mb_bytes, lead_diversity).  _get_analysis() looks names up
# here and returns None for any name that has no entry.
# NOTE(review): _analyze_big5 is defined in this module but has no entry in
# this table -- only the wider "big5hkscs" analyzer is registered.  Confirm
# that this is intentional.
_ANALYZERS: dict[str, Callable[[bytes], tuple[float, int, int]]] = {
    "shift_jis_2004": _analyze_shift_jis,
    "cp932": _analyze_cp932,
    "euc_jis_2004": _analyze_euc_jp,
    "euc_kr": _analyze_euc_kr,
    "cp949": _analyze_cp949,
    "gb18030": _analyze_gb18030,
    "big5hkscs": _analyze_big5hkscs,
    "johab": _analyze_johab,
}
426
427
def _get_analysis(
    data: bytes, name: str, ctx: PipelineContext
) -> tuple[float, int, int] | None:
    """Look up the analysis for *name* in the cache, computing it on a miss.

    The cache is ``ctx.analysis_cache`` and is keyed by encoding name only.
    On a miss the analyzer registered for *name* in ``_ANALYZERS`` is run
    over *data* and its result is stored before being returned.  When no
    analyzer is registered, nothing is cached and ``None`` is returned.
    """
    outcome = ctx.analysis_cache.get(name)
    if outcome is None:
        probe = _ANALYZERS.get(name)
        if probe is not None:
            outcome = probe(data)
            ctx.analysis_cache[name] = outcome
    return outcome
441
442
443# ---------------------------------------------------------------------------
444# Public API
445# ---------------------------------------------------------------------------
446
447
def compute_structural_score(
    data: bytes, encoding_info: EncodingInfo, ctx: PipelineContext
) -> float:
    """Return the structural fit of *data* for the given encoding.

    The score is the ``pair_ratio`` component of the encoding's analysis:
    valid multi-byte sequences divided by lead bytes seen.  It is 0.0 when
    *data* is empty, when the encoding is not flagged as multi-byte, or when
    no analyzer is available for the encoding's name.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :returns: A structural fit score between 0.0 and 1.0.
    """
    if data and encoding_info.is_multibyte:
        analysis = _get_analysis(data, encoding_info.name, ctx)
        if analysis is not None:
            pair_ratio, _, _ = analysis
            return pair_ratio
    return 0.0
469
470
def compute_multibyte_byte_coverage(
    data: bytes,
    encoding_info: EncodingInfo,
    ctx: PipelineContext,
    non_ascii_count: int | None = None,
) -> float:
    """Return the share of non-ASCII bytes that sit inside valid sequences.

    Real CJK text pairs almost every non-ASCII byte into a valid multi-byte
    sequence, giving a coverage near 1.0.  Latin text with scattered high
    bytes leaves many of them orphaned, giving a much lower coverage.

    The result is 0.0 when *data* is empty, when the encoding is not flagged
    as multi-byte, when no analyzer is available for the encoding's name, or
    when the non-ASCII byte count is zero.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :param non_ascii_count: Pre-computed count of non-ASCII bytes, or ``None``
        to compute from *data*.
    :returns: A coverage ratio between 0.0 and 1.0.
    """
    if not (data and encoding_info.is_multibyte):
        return 0.0

    analysis = _get_analysis(data, encoding_info.name, ctx)
    if analysis is None:
        return 0.0

    if non_ascii_count is None:
        # Deleting every byte listed in HIGH_BYTES and comparing lengths
        # counts those bytes without a Python-level loop.
        total_high = len(data) - len(data.translate(None, HIGH_BYTES))
    else:
        total_high = non_ascii_count

    if total_high == 0:
        return 0.0

    _, mb_bytes, _ = analysis
    return mb_bytes / total_high
508
509
def compute_lead_byte_diversity(
    data: bytes, encoding_info: EncodingInfo, ctx: PipelineContext
) -> int:
    """Return how many distinct lead byte values start valid sequences.

    Real CJK text draws lead bytes from across the encoding's repertoire,
    whereas European text that happens to satisfy a CJK structural scorer
    clusters its lead bytes in a narrow band.

    The result is 0 when *data* is empty or the encoding is not flagged as
    multi-byte.  When no analyzer is available for the encoding's name the
    result is 256, so that a caller using this value as a gate does not
    reject an encoding that simply could not be analyzed.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :returns: The number of distinct lead byte values found.
    """
    if data and encoding_info.is_multibyte:
        analysis = _get_analysis(data, encoding_info.name, ctx)
        # Unknown encoding -- don't gate
        return 256 if analysis is None else analysis[2]
    return 0