1"""Stage 2b: Multi-byte structural probing.
2
3Computes how well byte patterns in the data match the expected multi-byte
4structure for a given encoding. Used after byte-validity filtering (Stage 2a)
5to further rank multi-byte encoding candidates.
6
7Note: ``from __future__ import annotations`` is intentionally omitted because
8this module is compiled with mypyc, which does not support PEP 563 string
9annotations.
10"""
11
12from collections.abc import Callable
13
14from chardet.pipeline import HIGH_BYTES, PipelineContext
15from chardet.registry import EncodingInfo
16
17# ---------------------------------------------------------------------------
18# Per-encoding single-pass analyzers
19#
20# Each function walks the data once, computing three metrics simultaneously:
21# - pair_ratio: valid multi-byte pairs / lead bytes (structural score)
22# - mb_bytes: count of non-ASCII bytes in valid multi-byte sequences
23# - lead_diversity: count of distinct lead byte values in valid pairs
24#
25# These are kept as separate functions (rather than a single parameterized
26# analyzer) so that mypyc can inline the byte-range constants into each
27# function's tight loop.
28# ---------------------------------------------------------------------------
29
30
31def _analyze_shift_jis(
32 data: bytes,
33) -> tuple[float, int, int]:
34 """Single-pass Shift_JIS / CP932 structural analysis.
35
36 Lead bytes: 0x81-0x9F, 0xE0-0xEF
37 Trail bytes: 0x40-0x7E, 0x80-0xFC
38
39 Returns (pair_ratio, mb_bytes, lead_diversity).
40 """
41 lead_count = 0
42 valid_count = 0
43 mb = 0
44 leads: set[int] = set()
45 i = 0
46 length = len(data)
47 while i < length:
48 b = data[i]
49 if (0x81 <= b <= 0x9F) or (0xE0 <= b <= 0xEF):
50 lead_count += 1
51 if i + 1 < length:
52 trail = data[i + 1]
53 if (0x40 <= trail <= 0x7E) or (0x80 <= trail <= 0xFC):
54 valid_count += 1
55 leads.add(b)
56 # Lead is always > 0x7F; trail may or may not be
57 mb += 1
58 if trail > 0x7F:
59 mb += 1
60 i += 2
61 continue
62 i += 1
63 else:
64 i += 1
65 ratio = valid_count / lead_count if lead_count > 0 else 0.0
66 return ratio, mb, len(leads)
67
68
69def _analyze_euc_jp(
70 data: bytes,
71) -> tuple[float, int, int]:
72 """Single-pass EUC-JP structural analysis.
73
74 Two-byte: Lead 0xA1-0xFE, Trail 0xA1-0xFE
75 SS2 (half-width katakana): 0x8E + 0xA1-0xDF
76 SS3 (JIS X 0212): 0x8F + 0xA1-0xFE + 0xA1-0xFE
77
78 Returns (pair_ratio, mb_bytes, lead_diversity).
79 """
80 lead_count = 0
81 valid_count = 0
82 mb = 0
83 leads: set[int] = set()
84 i = 0
85 length = len(data)
86 while i < length:
87 b = data[i]
88 if b == 0x8E:
89 # SS2 sequence
90 lead_count += 1
91 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xDF:
92 valid_count += 1
93 leads.add(b)
94 mb += 2
95 i += 2
96 continue
97 i += 1
98 elif b == 0x8F:
99 # SS3 sequence
100 lead_count += 1
101 if (
102 i + 2 < length
103 and 0xA1 <= data[i + 1] <= 0xFE
104 and 0xA1 <= data[i + 2] <= 0xFE
105 ):
106 valid_count += 1
107 leads.add(b)
108 mb += 3
109 i += 3
110 continue
111 i += 1
112 elif 0xA1 <= b <= 0xFE:
113 lead_count += 1
114 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE:
115 valid_count += 1
116 leads.add(b)
117 mb += 2
118 i += 2
119 continue
120 i += 1
121 else:
122 i += 1
123 ratio = valid_count / lead_count if lead_count > 0 else 0.0
124 return ratio, mb, len(leads)
125
126
127def _analyze_euc_kr(
128 data: bytes,
129) -> tuple[float, int, int]:
130 """Single-pass EUC-KR / CP949 structural analysis.
131
132 Lead 0xA1-0xFE; Trail 0xA1-0xFE
133
134 Returns (pair_ratio, mb_bytes, lead_diversity).
135 """
136 lead_count = 0
137 valid_count = 0
138 mb = 0
139 leads: set[int] = set()
140 i = 0
141 length = len(data)
142 while i < length:
143 b = data[i]
144 if 0xA1 <= b <= 0xFE:
145 lead_count += 1
146 if i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE:
147 valid_count += 1
148 leads.add(b)
149 mb += 2
150 i += 2
151 continue
152 i += 1
153 else:
154 i += 1
155 ratio = valid_count / lead_count if lead_count > 0 else 0.0
156 return ratio, mb, len(leads)
157
158
159def _analyze_gb18030(
160 data: bytes,
161) -> tuple[float, int, int]:
162 """Single-pass GB18030 / GB2312 structural analysis.
163
164 Only counts strict GB2312 2-byte pairs (lead 0xA1-0xF7, trail 0xA1-0xFE)
165 and GB18030 4-byte sequences. The broader GBK extension range
166 (lead 0x81-0xFE, trail 0x40-0x7E / 0x80-0xFE) is intentionally excluded
167 because it is so permissive that unrelated single-byte data (EBCDIC, DOS
168 codepages, etc.) can score 1.0, leading to false positives.
169
170 Returns (pair_ratio, mb_bytes, lead_diversity).
171 """
172 lead_count = 0
173 valid_count = 0
174 mb = 0
175 leads: set[int] = set()
176 i = 0
177 length = len(data)
178 while i < length:
179 b = data[i]
180 if 0x81 <= b <= 0xFE:
181 lead_count += 1
182 # Try 4-byte first (byte2 in 0x30-0x39 distinguishes from 2-byte)
183 if (
184 i + 3 < length
185 and 0x30 <= data[i + 1] <= 0x39
186 and 0x81 <= data[i + 2] <= 0xFE
187 and 0x30 <= data[i + 3] <= 0x39
188 ):
189 valid_count += 1
190 leads.add(b)
191 mb += 2 # bytes 0 and 2 are non-ASCII
192 i += 4
193 continue
194 # 2-byte GB2312: Lead 0xA1-0xF7, Trail 0xA1-0xFE
195 if 0xA1 <= b <= 0xF7 and i + 1 < length and 0xA1 <= data[i + 1] <= 0xFE:
196 valid_count += 1
197 leads.add(b)
198 mb += 2 # both bytes are > 0x7F
199 i += 2
200 continue
201 i += 1
202 else:
203 i += 1
204 ratio = valid_count / lead_count if lead_count > 0 else 0.0
205 return ratio, mb, len(leads)
206
207
208def _analyze_big5(
209 data: bytes,
210) -> tuple[float, int, int]:
211 """Single-pass Big5 structural analysis.
212
213 Lead 0xA1-0xF9; Trail 0x40-0x7E, 0xA1-0xFE
214
215 Returns (pair_ratio, mb_bytes, lead_diversity).
216 """
217 lead_count = 0
218 valid_count = 0
219 mb = 0
220 leads: set[int] = set()
221 i = 0
222 length = len(data)
223 while i < length:
224 b = data[i]
225 if 0xA1 <= b <= 0xF9:
226 lead_count += 1
227 if i + 1 < length:
228 trail = data[i + 1]
229 if (0x40 <= trail <= 0x7E) or (0xA1 <= trail <= 0xFE):
230 valid_count += 1
231 leads.add(b)
232 # Lead is always > 0x7F; trail may or may not be
233 mb += 1
234 if trail > 0x7F:
235 mb += 1
236 i += 2
237 continue
238 i += 1
239 else:
240 i += 1
241 ratio = valid_count / lead_count if lead_count > 0 else 0.0
242 return ratio, mb, len(leads)
243
244
245def _analyze_johab(
246 data: bytes,
247) -> tuple[float, int, int]:
248 """Single-pass Johab structural analysis.
249
250 Lead: 0x84-0xD3, 0xD8-0xDE, 0xE0-0xF9
251 Trail: 0x31-0x7E, 0x91-0xFE
252
253 Returns (pair_ratio, mb_bytes, lead_diversity).
254 """
255 lead_count = 0
256 valid_count = 0
257 mb = 0
258 leads: set[int] = set()
259 i = 0
260 length = len(data)
261 while i < length:
262 b = data[i]
263 if (0x84 <= b <= 0xD3) or (0xD8 <= b <= 0xDE) or (0xE0 <= b <= 0xF9):
264 lead_count += 1
265 if i + 1 < length:
266 trail = data[i + 1]
267 if (0x31 <= trail <= 0x7E) or (0x91 <= trail <= 0xFE):
268 valid_count += 1
269 leads.add(b)
270 if b > 0x7F:
271 mb += 1
272 if trail > 0x7F:
273 mb += 1
274 i += 2
275 continue
276 i += 1
277 else:
278 i += 1
279 ratio = valid_count / lead_count if lead_count > 0 else 0.0
280 return ratio, mb, len(leads)
281
282
283# ---------------------------------------------------------------------------
284# Dispatch table: encoding name -> analyzer function
285# ---------------------------------------------------------------------------
286
# Keys are the Python codec names used by the rest of the pipeline
# (see ``EncodingInfo.name``). Related codecs intentionally share one
# analyzer when their structural byte ranges are close enough for probing
# purposes — TODO confirm against the registry's codec naming.
_ANALYZERS: dict[str, Callable[[bytes], tuple[float, int, int]]] = {
    "shift_jis_2004": _analyze_shift_jis,
    "cp932": _analyze_shift_jis,
    "euc_jis_2004": _analyze_euc_jp,
    "euc_kr": _analyze_euc_kr,
    "cp949": _analyze_euc_kr,
    "gb18030": _analyze_gb18030,
    "big5hkscs": _analyze_big5,
    "johab": _analyze_johab,
}
297
298
def _get_analysis(
    data: bytes, name: str, ctx: PipelineContext
) -> tuple[float, int, int] | None:
    """Return the (possibly cached) analysis for *name*, or None if unknown."""
    hit = ctx.analysis_cache.get(name)
    if hit is not None:
        return hit
    analyzer = _ANALYZERS.get(name)
    if analyzer is None:
        return None
    computed = analyzer(data)
    ctx.analysis_cache[name] = computed
    return computed
312
313
314# ---------------------------------------------------------------------------
315# Public API
316# ---------------------------------------------------------------------------
317
318
def compute_structural_score(
    data: bytes, encoding_info: EncodingInfo, ctx: PipelineContext
) -> float:
    """Return 0.0--1.0 indicating how well *data* matches the encoding's structure.

    Single-byte encodings and empty data always score 0.0.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :returns: A structural fit score between 0.0 and 1.0.
    """
    if not data or not encoding_info.is_multibyte:
        return 0.0
    analysis = _get_analysis(data, encoding_info.name, ctx)
    if analysis is None:
        return 0.0
    pair_ratio, _mb_bytes, _diversity = analysis
    return pair_ratio
340
341
def compute_multibyte_byte_coverage(
    data: bytes,
    encoding_info: EncodingInfo,
    ctx: PipelineContext,
    non_ascii_count: int | None = None,
) -> float:
    """Ratio of non-ASCII bytes that participate in valid multi-byte sequences.

    Genuine CJK text has nearly all non-ASCII bytes paired into valid
    multi-byte sequences (coverage close to 1.0), while Latin text with
    scattered high bytes has many orphan bytes (coverage well below 1.0).

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :param non_ascii_count: Pre-computed count of non-ASCII bytes, or ``None``
        to compute from *data*.
    :returns: A coverage ratio between 0.0 and 1.0.
    """
    if not data or not encoding_info.is_multibyte:
        return 0.0
    analysis = _get_analysis(data, encoding_info.name, ctx)
    if analysis is None:
        return 0.0
    mb_bytes = analysis[1]
    if non_ascii_count is None:
        # Count bytes > 0x7F by stripping them and comparing lengths.
        non_ascii_count = len(data) - len(data.translate(None, HIGH_BYTES))
    if non_ascii_count == 0:
        return 0.0
    return mb_bytes / non_ascii_count
379
380
def compute_lead_byte_diversity(
    data: bytes, encoding_info: EncodingInfo, ctx: PipelineContext
) -> int:
    """Count distinct lead byte values in valid multi-byte pairs.

    Genuine CJK text uses lead bytes from across the encoding's full
    repertoire. European text falsely matching a CJK structural scorer
    clusters lead bytes in a narrow band.

    :param data: The raw byte data to analyze.
    :param encoding_info: Metadata for the encoding to probe.
    :param ctx: Pipeline context for caching analysis results.
    :returns: The number of distinct lead byte values found.
    """
    if not data or not encoding_info.is_multibyte:
        return 0
    analysis = _get_analysis(data, encoding_info.name, ctx)
    if analysis is None:
        # No analyzer for this encoding: report maximal diversity so
        # callers never gate on it.
        return 256
    return analysis[2]  # lead_diversity