1"""Confusion group resolution for similar single-byte encodings.
2
3At runtime, loads pre-computed distinguishing byte maps from confusion.bin
4and uses them to resolve statistical scoring ties between similar encodings.
5
6Build-time computation (``compute_confusion_groups``, ``compute_distinguishing_maps``,
7``serialize_confusion_data``) lives in ``scripts/confusion_training.py``.
8"""
9
10from __future__ import annotations
11
12import functools
13import importlib.resources
14import struct
15import warnings
16
17from chardet.models import (
18 NON_ASCII_BIGRAM_WEIGHT,
19 BigramProfile,
20 get_enc_index,
21 score_with_profile,
22)
23from chardet.pipeline import DetectionResult
24from chardet.registry import lookup_encoding
25
# Type alias for the distinguishing map structure:
# Maps (enc_a, enc_b) -> (distinguishing_byte_set, {byte_val: (cat_a, cat_b)})
# where cat_a/cat_b are two-letter Unicode general category names (e.g. "Lu")
# giving the category of byte_val decoded under enc_a and enc_b respectively.
DistinguishingMaps = dict[
    tuple[str, str],
    tuple[frozenset[int], dict[int, tuple[str, str]]],
]
32
33# uint8 -> Unicode general category, inverse of the mapping in
34# scripts/confusion_training.py used at serialization time.
35_INT_TO_CATEGORY: dict[int, str] = {
36 0: "Lu",
37 1: "Ll",
38 2: "Lt",
39 3: "Lm",
40 4: "Lo",
41 5: "Mn",
42 6: "Mc",
43 7: "Me",
44 8: "Nd",
45 9: "Nl",
46 10: "No",
47 11: "Pc",
48 12: "Pd",
49 13: "Ps",
50 14: "Pe",
51 15: "Pi",
52 16: "Pf",
53 17: "Po",
54 18: "Sm",
55 19: "Sc",
56 20: "Sk",
57 21: "So",
58 22: "Zs",
59 23: "Zl",
60 24: "Zp",
61 25: "Cc",
62 26: "Cf",
63 27: "Cs",
64 28: "Co",
65 29: "Cn",
66}
67
68# Inverse mapping for serialization — used by scripts/confusion_training.py.
69_CATEGORY_TO_INT: dict[str, int] = {v: k for k, v in _INT_TO_CATEGORY.items()}
70
71
def deserialize_confusion_data_from_bytes(data: bytes) -> DistinguishingMaps:
    """Parse the binary confusion-group format into a lookup table.

    Wire format (all integers big-endian): a uint16 pair count, then for each
    pair two length-prefixed UTF-8 encoding names, a uint8 entry count, and
    that many ``(byte_val, cat_a, cat_b)`` uint8 triples.

    :param data: The raw binary content of a confusion.bin file.
    :returns: A :data:`DistinguishingMaps` dictionary keyed by encoding pairs.
    """

    def read_name(at: int) -> tuple[str, int]:
        # A name is a uint8 length followed by that many UTF-8 bytes;
        # returns the decoded name and the cursor past it.
        (length,) = struct.unpack_from("!B", data, at)
        at += 1
        return data[at : at + length].decode("utf-8"), at + length

    maps: DistinguishingMaps = {}
    (pair_count,) = struct.unpack_from("!H", data, 0)
    cursor = 2

    for _ in range(pair_count):
        name_a, cursor = read_name(cursor)
        name_b, cursor = read_name(cursor)

        (entry_count,) = struct.unpack_from("!B", data, cursor)
        cursor += 1

        cat_pairs: dict[int, tuple[str, str]] = {}
        for _ in range(entry_count):
            byte_val, raw_a, raw_b = struct.unpack_from("!BBB", data, cursor)
            cursor += 3
            # Unrecognized category codes degrade to "Cn" (unassigned).
            cat_pairs[byte_val] = (
                _INT_TO_CATEGORY.get(raw_a, "Cn"),
                _INT_TO_CATEGORY.get(raw_b, "Cn"),
            )
        # The distinguishing byte set is exactly the set of entry keys.
        maps[(name_a, name_b)] = (frozenset(cat_pairs), cat_pairs)

    return maps
110
111
@functools.cache
def load_confusion_data() -> DistinguishingMaps:
    """Load confusion group data from the bundled confusion.bin file.

    :returns: A :data:`DistinguishingMaps` dictionary keyed by encoding pairs.
    """
    resource = importlib.resources.files("chardet.models").joinpath("confusion.bin")
    payload = resource.read_bytes()
    if not payload:
        # Best-effort degradation: a missing/empty data file disables the
        # feature rather than crashing detection.
        warnings.warn(
            "chardet confusion.bin is empty — confusion resolution disabled; "
            "reinstall chardet to fix",
            RuntimeWarning,
            stacklevel=2,
        )
        return {}
    try:
        parsed = deserialize_confusion_data_from_bytes(payload)
    except (struct.error, UnicodeDecodeError) as e:
        msg = f"corrupt confusion.bin: {e}"
        raise ValueError(msg) from e
    # Normalize keys to canonical codec names so pipeline output matches.
    return {
        (lookup_encoding(a) or a, lookup_encoding(b) or b): entry
        for (a, b), entry in parsed.items()
    }
140
141
142# Unicode general category preference scores for voting resolution.
143# Higher scores indicate more linguistically meaningful characters.
144_CATEGORY_PREFERENCE: dict[str, int] = {
145 "Lu": 10,
146 "Ll": 10,
147 "Lt": 10,
148 "Lm": 9,
149 "Lo": 9,
150 "Nd": 8,
151 "Nl": 7,
152 "No": 7,
153 "Pc": 6,
154 "Pd": 6,
155 "Ps": 6,
156 "Pe": 6,
157 "Pi": 6,
158 "Pf": 6,
159 "Po": 6,
160 "Sc": 5,
161 "Sm": 5,
162 "Sk": 4,
163 "So": 4,
164 "Zs": 3,
165 "Zl": 3,
166 "Zp": 3,
167 "Cf": 2,
168 "Cc": 1,
169 "Co": 1,
170 "Cs": 0,
171 "Cn": 0,
172 "Mn": 5,
173 "Mc": 5,
174 "Me": 5,
175}
176
177
def resolve_by_category_voting(
    data: bytes,
    enc_a: str,
    enc_b: str,
    diff_bytes: frozenset[int],
    categories: dict[int, tuple[str, str]],
) -> str | None:
    """Resolve between two encodings using Unicode category preferences.

    For each distinct distinguishing byte present in *data*, the difference
    in category preference between its interpretation under *enc_a* and
    *enc_b* is accumulated into a signed margin; the sign of the total
    margin picks the winner.

    :param data: The raw byte data to examine.
    :param enc_a: First encoding name.
    :param enc_b: Second encoding name.
    :param diff_bytes: Byte values where the two encodings differ.
    :param categories: Mapping of byte value to ``(cat_a, cat_b)`` Unicode
        general category pairs.
    :returns: The winning encoding name, or ``None`` if tied or no
        distinguishing byte appears in *data*.
    """
    present = diff_bytes.intersection(data)
    if not present:
        return None

    margin = 0
    for byte_val in present:
        cat_a, cat_b = categories[byte_val]
        margin += _CATEGORY_PREFERENCE.get(cat_a, 0) - _CATEGORY_PREFERENCE.get(cat_b, 0)

    if margin > 0:
        return enc_a
    if margin < 0:
        return enc_b
    return None
218
219
def _best_variant_score(
    profile: BigramProfile,
    index: dict[str, list[tuple[str | None, bytearray, str]]],
    enc: str,
) -> float:
    """Return the best bigram score across all language variants for *enc*.

    Yields 0.0 when *enc* has no variants in *index*.
    """
    entries = index.get(enc)
    if not entries:
        return 0.0
    scores = [score_with_profile(profile, model, model_key) for _, model, model_key in entries]
    return max(scores)
233
234
def resolve_by_bigram_rescore(
    data: bytes,
    enc_a: str,
    enc_b: str,
    diff_bytes: frozenset[int],
) -> str | None:
    """Resolve between two encodings by re-scoring only distinguishing bigrams.

    Builds a focused bigram profile restricted to bigrams that contain at
    least one distinguishing byte, then compares each encoding's best
    language-model score against that profile.

    :param data: The raw byte data to examine.
    :param enc_a: First encoding name.
    :param enc_b: Second encoding name.
    :param diff_bytes: Byte values where the two encodings differ.
    :returns: The winning encoding name, or ``None`` if tied.
    """
    if len(data) < 2:
        return None

    weighted: dict[int, int] = {}
    for first, second in zip(data, data[1:]):
        # Only bigrams touching a distinguishing byte are informative here.
        if first in diff_bytes or second in diff_bytes:
            bigram = (first << 8) | second
            bump = NON_ASCII_BIGRAM_WEIGHT if (first > 0x7F or second > 0x7F) else 1
            weighted[bigram] = weighted.get(bigram, 0) + bump

    if not weighted:
        return None

    focused = BigramProfile.from_weighted_freq(weighted)

    variant_index = get_enc_index()
    score_a = _best_variant_score(focused, variant_index, enc_a)
    score_b = _best_variant_score(focused, variant_index, enc_b)

    if score_a == score_b:
        return None
    return enc_a if score_a > score_b else enc_b
280
281
282def _find_pair_key(
283 maps: DistinguishingMaps,
284 enc_a: str,
285 enc_b: str,
286) -> tuple[str, str] | None:
287 """Find the canonical key for a pair of encodings in the confusion maps."""
288 if (enc_a, enc_b) in maps:
289 return (enc_a, enc_b)
290 if (enc_b, enc_a) in maps:
291 return (enc_b, enc_a)
292 return None
293
294
def resolve_confusion_groups(
    data: bytes,
    results: list[DetectionResult],
) -> list[DetectionResult]:
    """Resolve confusion between similar encodings in the top results.

    When the top two results form a known confusion pair, a winner is chosen
    by bigram re-scoring, falling back to Unicode-category voting when the
    re-scoring is inconclusive; bigram re-scoring thus takes precedence.

    :param data: The raw byte data to examine.
    :param results: Detection results sorted by confidence descending.
    :returns: A reordered list of :class:`DetectionResult` with the winner first.
    """
    if len(results) < 2:
        return results

    leader, runner_up = results[0], results[1]
    if leader.encoding is None or runner_up.encoding is None:
        return results

    confusion_maps = load_confusion_data()
    pair_key = _find_pair_key(confusion_maps, leader.encoding, runner_up.encoding)
    if pair_key is None:
        return results

    enc_a, enc_b = pair_key
    diff_bytes, categories = confusion_maps[pair_key]

    # Both resolvers are pure, so category voting only needs to run when the
    # bigram re-score cannot decide.
    winner = resolve_by_bigram_rescore(data, enc_a, enc_b, diff_bytes)
    if winner is None:
        winner = resolve_by_category_voting(data, enc_a, enc_b, diff_bytes, categories)

    if winner is None or winner == leader.encoding:
        return results

    # Promote the runner-up; everything after the top two keeps its order.
    return [runner_up, leader, *results[2:]]