1"""Pipeline orchestrator — runs all detection stages in sequence.
2
3Note: ``from __future__ import annotations`` is intentionally omitted because
4this module is compiled with mypyc, which does not support PEP 563 string
5annotations.
6"""
7
8import warnings
9
10from chardet._utils import DEFAULT_MAX_BYTES
11from chardet.enums import EncodingEra
12from chardet.pipeline import (
13 _NONE_RESULT,
14 DETERMINISTIC_CONFIDENCE,
15 HIGH_BYTES,
16 DetectionResult,
17 PipelineContext,
18)
19from chardet.pipeline.ascii import detect_ascii
20from chardet.pipeline.binary import is_binary
21from chardet.pipeline.bom import detect_bom
22from chardet.pipeline.escape import detect_escape_encoding
23from chardet.pipeline.language import fill_languages
24from chardet.pipeline.magic import detect_magic
25from chardet.pipeline.markup import detect_markup_charset, promote_markup_superset
26from chardet.pipeline.postprocess import postprocess_results
27from chardet.pipeline.statistical import score_candidates
28from chardet.pipeline.structural import (
29 compute_lead_byte_diversity,
30 compute_multibyte_byte_coverage,
31 compute_structural_score,
32)
33from chardet.pipeline.utf8 import detect_utf8
34from chardet.pipeline.utf1632 import detect_utf1632_patterns
35from chardet.pipeline.validity import filter_by_validity
36from chardet.registry import EncodingInfo, get_candidates
37
# Result returned by the binary-detection stage of the pipeline: no text
# encoding, reported with DETERMINISTIC_CONFIDENCE and the generic binary
# MIME type.
_BINARY_RESULT = DetectionResult(
    encoding=None,
    confidence=DETERMINISTIC_CONFIDENCE,
    language=None,
    mime_type="application/octet-stream",
)
# Threshold at which a CJK structural score is confident enough to trigger
# combined structural+statistical ranking rather than purely statistical.
# The best structural score among surviving multi-byte candidates is
# compared against this value with ``>=``.
_STRUCTURAL_CONFIDENCE_THRESHOLD = 0.85

# Maximum bytes used for statistical bigram scoring.  Bigram models
# converge quickly — 16 KB is sufficient for discrimination across all
# language models (single-byte and multi-byte alike) while avoiding
# unnecessary work on large files.  Experimentally verified: 0 real
# accuracy losses across 835 test files at this threshold.
# Only the data handed to the statistical scorer is sliced to this length.
_STAT_SCORE_MAX_BYTES = 16384
54
55
def _make_fallback_or_none(
    encoding: str,
    allowed: frozenset[str],
    param_name: str,
) -> list[DetectionResult]:
    """Build the single-element fallback result list for *encoding*.

    When *encoding* is in *allowed*, the list holds one low-confidence
    (0.10) result naming it.  Otherwise a ``UserWarning`` mentioning
    *param_name* is issued and the list holds ``_NONE_RESULT`` instead.

    The warning uses ``stacklevel=5`` so that it is attributed to the public
    caller, assuming the call chain
    detect() -> run_pipeline() -> _run_pipeline_core() -> _make_fallback_or_none().
    The ``warnings.warn`` call must therefore stay directly in this function.

    :param encoding: Name of the fallback encoding requested by the caller.
    :param allowed: Names of the encodings that survived the filters.
    :param param_name: Name of the public parameter that supplied *encoding*;
        used only in the warning text.
    :returns: A list containing exactly one :class:`DetectionResult`.
    """
    if encoding in allowed:
        return [DetectionResult(encoding=encoding, confidence=0.10, language=None)]

    message = (
        f"{param_name} {encoding!r} is excluded by "
        f"include_encodings/exclude_encodings; returning encoding=None"
    )
    warnings.warn(message, UserWarning, stacklevel=5)
    return [_NONE_RESULT]
75
76
# --- Thresholds consumed by _gate_cjk_candidates() -------------------------

# Minimum structural score (valid multi-byte sequences / lead bytes) required
# to keep a CJK multi-byte candidate.  Below this threshold the encoding is
# eliminated as a false positive (e.g. Shift_JIS matching Latin data where
# scattered high bytes look like lead bytes but rarely form valid pairs).
_CJK_MIN_MB_RATIO = 0.05
# Minimum number of non-ASCII bytes required for a CJK candidate to survive
# gating.  Very short inputs are validated by the other gates (structural
# pair ratio, byte coverage) and by coverage-aware boosting in statistical
# scoring — so we keep this threshold low to let even 1-character CJK
# inputs compete.  A candidate is dropped when the count is strictly below
# this value.
_CJK_MIN_NON_ASCII = 2
# Minimum ratio of non-ASCII bytes that must participate in valid multi-byte
# sequences for a CJK candidate to survive gating.  Genuine CJK text has
# nearly all non-ASCII bytes in valid pairs (coverage >= 0.95); Latin text
# with scattered high bytes has many orphan bytes (coverage often < 0.5).
# The lowest true-positive coverage in the test suite is ~0.39 (a CP932 HTML
# file with many half-width katakana).
_CJK_MIN_BYTE_COVERAGE = 0.35
# Minimum number of distinct lead byte values for a CJK candidate to
# survive gating.  Genuine CJK text uses a wide range of lead bytes;
# European false positives cluster in a narrow band.  Only applied when
# there are enough non-ASCII bytes to expect diversity (see
# _CJK_DIVERSITY_MIN_NON_ASCII).
_CJK_MIN_LEAD_DIVERSITY = 4
# Minimum non-ASCII byte count before applying the lead diversity gate.
# Very small files (e.g. 8 non-ASCII bytes) may genuinely have low
# diversity even for real CJK text (e.g. repeated katakana).  The diversity
# gate is skipped entirely when the count is below this value.
_CJK_DIVERSITY_MIN_NON_ASCII = 16
105
106
def _gate_cjk_candidates(
    data: bytes,
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> tuple[EncodingInfo, ...]:
    """Drop multi-byte (CJK) candidates whose byte structure is unconvincing.

    Candidates that are not multi-byte always pass through untouched.  Every
    multi-byte candidate has to clear the gates below, evaluated in this
    order; failing any one of them eliminates the candidate:

    1. **Structural pair ratio** (valid_pairs / lead_bytes) must be
       >= ``_CJK_MIN_MB_RATIO``.  Catches data with many orphan lead bytes.

    2. **Non-ASCII byte count**: the number of bytes of *data* that appear
       in ``HIGH_BYTES`` must be >= ``_CJK_MIN_NON_ASCII``.  With too few
       high bytes a perfect structural score can arise by accident.

    3. **Byte coverage** (non-ASCII bytes inside valid multi-byte sequences /
       all non-ASCII bytes) must be >= ``_CJK_MIN_BYTE_COVERAGE``.  Latin
       text leaves many high bytes outside any valid pair; genuine CJK text
       accounts for nearly all of them.

    4. **Lead byte diversity**: the number of distinct lead byte values must
       be >= ``_CJK_MIN_LEAD_DIVERSITY``.  This gate is only evaluated when
       the non-ASCII byte count is >= ``_CJK_DIVERSITY_MIN_NON_ASCII``;
       smaller inputs skip it.

    Side effects on *ctx*: ``ctx.mb_scores`` receives the structural score of
    every multi-byte candidate examined (including eliminated ones) for reuse
    in Stage 2b, ``ctx.non_ascii_count`` is filled in lazily the first time
    gate 2 is reached, and ``ctx.mb_coverage`` receives the coverage of every
    candidate that reaches gate 3.

    :returns: The surviving candidates, in their original order.
    """
    survivors: list[EncodingInfo] = []
    for candidate in valid_candidates:
        if not candidate.is_multibyte:
            # Single-byte encodings are never gated here.
            survivors.append(candidate)
            continue

        # Gate 1: structural pair ratio (cached even when the gate fails).
        pair_ratio = compute_structural_score(data, candidate, ctx)
        ctx.mb_scores[candidate.name] = pair_ratio
        if pair_ratio < _CJK_MIN_MB_RATIO:
            continue

        # Gate 2: enough high bytes to trust the score.  The count does not
        # depend on the candidate, so it is computed once and kept on ctx.
        if ctx.non_ascii_count is None:
            without_high_bytes = data.translate(None, HIGH_BYTES)
            ctx.non_ascii_count = len(data) - len(without_high_bytes)
        if ctx.non_ascii_count < _CJK_MIN_NON_ASCII:
            continue

        # Gate 3: most high bytes must belong to valid multi-byte sequences.
        coverage = compute_multibyte_byte_coverage(
            data, candidate, ctx, non_ascii_count=ctx.non_ascii_count
        )
        ctx.mb_coverage[candidate.name] = coverage
        if coverage < _CJK_MIN_BYTE_COVERAGE:
            continue

        # Gate 4: lead byte diversity, skipped for very small samples.
        if (
            ctx.non_ascii_count >= _CJK_DIVERSITY_MIN_NON_ASCII
            and compute_lead_byte_diversity(data, candidate, ctx)
            < _CJK_MIN_LEAD_DIVERSITY
        ):
            continue

        survivors.append(candidate)
    return tuple(survivors)
159
160
def _score_structural_candidates(
    data: bytes,
    structural_scores: list[tuple[str, float]],
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> list[DetectionResult]:
    """Rank structurally plausible CJK candidates with statistical bigrams.

    Structural scores alone cannot separate multi-byte encodings that all
    validate equally well (e.g. euc-jp vs big5 for Japanese data), so the
    multi-byte candidates named in *structural_scores* are scored
    statistically, together with every single-byte candidate, so the caller
    can compare CJK and single-byte confidence directly.  Only the first
    ``_STAT_SCORE_MAX_BYTES`` bytes of *data* are scored.

    A result whose encoding has cached byte coverage >= 0.95 in
    ``ctx.mb_coverage`` has its confidence multiplied by ``1 + coverage``:
    when nearly all non-ASCII bytes form valid multi-byte pairs, that
    structural evidence should outweigh single-byte bigram models that may
    score higher on small samples.

    Note: boosted confidence values may exceed 1.0 and are meant only for
    relative ranking among candidates; ``run_pipeline`` caps them at 1.0
    before returning to callers.

    :param data: Input bytes (already truncated by the caller).
    :param structural_scores: ``(encoding name, structural score)`` pairs;
        their order determines the order in which multi-byte candidates are
        handed to the scorer.
    :param valid_candidates: All candidates still in play.
    :param ctx: Pipeline context; only ``ctx.mb_coverage`` is read here.
    :returns: Results sorted by confidence, highest first.
    """
    # Partition the candidates in a single pass.
    multibyte_by_name: dict[str, EncodingInfo] = {}
    single_byte: list[EncodingInfo] = []
    for enc in valid_candidates:
        if enc.is_multibyte:
            multibyte_by_name[enc.name] = enc
        else:
            single_byte.append(enc)

    # Multi-byte candidates go first, in structural-score order.
    ordered_multibyte = [
        multibyte_by_name[name]
        for name, _score in structural_scores
        if name in multibyte_by_name
    ]
    to_score = (*ordered_multibyte, *single_byte)
    scored = list(score_candidates(data[:_STAT_SCORE_MAX_BYTES], to_score))

    # Boost multi-byte candidates with high byte coverage.
    ranked: list[DetectionResult] = []
    for result in scored:
        coverage = 0.0
        if result.encoding:
            coverage = ctx.mb_coverage.get(result.encoding, 0.0)
        if coverage >= 0.95:
            result = DetectionResult(
                result.encoding,
                result.confidence * (1 + coverage),
                result.language,
                result.mime_type,
            )
        ranked.append(result)

    # sorted() is stable, so ties keep the scorer's relative order.
    return sorted(ranked, key=lambda res: res.confidence, reverse=True)
209
210
def _with_default_mime(result: DetectionResult) -> DetectionResult:
    """Return *result* with a ``mime_type`` filled in when it has none.

    A result that already carries a MIME type is returned unchanged (the
    same object).  Otherwise a new result is built whose MIME type is
    ``text/plain`` when an encoding was detected and
    ``application/octet-stream`` when ``encoding`` is ``None``.
    """
    if result.mime_type is not None:
        return result

    if result.encoding is None:
        fallback_mime = "application/octet-stream"
    else:
        fallback_mime = "text/plain"
    return DetectionResult(
        result.encoding, result.confidence, result.language, fallback_mime
    )
217
218
def _run_pipeline_core(  # noqa: PLR0913
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
    *,
    include_encodings: frozenset[str] | None = None,
    exclude_encodings: frozenset[str] | None = None,
    no_match_encoding: str = "cp1252",
    empty_input_encoding: str = "utf-8",
) -> list[DetectionResult]:
    """Core pipeline logic.  Returns list of results sorted by confidence.

    The stages run in a fixed order and the function returns as soon as one
    of them produces a usable result:

    1. Empty input -> fallback built from *empty_input_encoding*.
    2. BOM, UTF-16/32 null-byte patterns and escape-sequence encodings; each
       result is only accepted when its encoding is in the allowed set.
    3. Magic-number detection; a hit is returned without consulting the
       allowed set.
    4. Binary detection, skipped when the UTF-8 or ASCII precheck matched;
       also returned without consulting the allowed set.
    5. Markup charset declaration, then ASCII, then UTF-8 (each gated by the
       allowed set).
    6. Byte-validity filtering, CJK gating, then structural and/or
       statistical scoring, followed by ``postprocess_results``.

    Whenever no candidate (or no scored result) remains in step 6, the
    fallback built from *no_match_encoding* is returned.  Both fallbacks
    degrade to ``_NONE_RESULT`` with a warning when the requested fallback
    encoding is not in the allowed set (see ``_make_fallback_or_none``).

    :param data: The raw byte data to analyze; only ``data[:max_bytes]`` is
        examined.
    :param encoding_era: Passed to ``get_candidates`` to select candidates.
    :param max_bytes: Maximum number of bytes to process.
    :param include_encodings: Passed to ``get_candidates``.
    :param exclude_encodings: Passed to ``get_candidates``.
    :param no_match_encoding: Fallback encoding when no candidate survives.
    :param empty_input_encoding: Fallback encoding for empty input.
    :returns: A non-empty list of :class:`DetectionResult`.  Confidence
        values produced by the structural path may exceed 1.0.
    """
    ctx = PipelineContext()
    # Everything below operates on the truncated view only.
    data = data[:max_bytes]

    # Build candidate set once — used for both early-exit gating and
    # statistical scoring.  The set incorporates encoding_era, include, and
    # exclude filters so all pipeline stages are gated consistently.
    candidates = get_candidates(encoding_era, include_encodings, exclude_encodings)
    allowed: frozenset[str] = frozenset(enc.name for enc in candidates)

    # Note: an input truncated to nothing (e.g. max_bytes == 0) is treated
    # exactly like empty input.
    if not data:
        return _make_fallback_or_none(
            empty_input_encoding, allowed, "empty_input_encoding"
        )

    # Stage 1a: BOM detection (runs first — BOMs are definitive and
    # UTF-16/32 data looks binary due to null bytes)
    bom_result = detect_bom(data)
    if bom_result is not None and bom_result.encoding in allowed:
        return [bom_result]

    # Stage 1a+: UTF-16/32 null-byte pattern detection (for files without
    # BOMs — must run before binary detection since these encodings contain
    # many null bytes that would trigger the binary check)
    utf1632_result = detect_utf1632_patterns(data)
    if utf1632_result is not None and utf1632_result.encoding in allowed:
        return [utf1632_result]

    # Escape-sequence encodings (ISO-2022, HZ-GB-2312, UTF-7): must run
    # before binary detection (ESC is a control byte) and before ASCII
    # detection (HZ-GB-2312 uses only printable ASCII plus tildes).
    escape_result = detect_escape_encoding(data)
    if (
        escape_result is not None
        and escape_result.encoding is not None
        and escape_result.encoding in allowed
    ):
        return [escape_result]

    # Magic number detection for known binary formats — runs before
    # UTF-8/ASCII prechecks to avoid unnecessary analysis on binary data.
    # A hit is returned as-is; it is not checked against ``allowed``.
    magic_result = detect_magic(data)
    if magic_result is not None:
        return [magic_result]

    # Pre-check UTF-8 to prevent false binary classification.  Valid UTF-8
    # with multi-byte sequences can contain control bytes (e.g. ESC for ANSI
    # codes) that would otherwise exceed the binary threshold.  We compute
    # the result now but return it at the normal pipeline position (after
    # markup) so that explicit charset declarations still take precedence.
    utf8_precheck = detect_utf8(data)

    # Pre-check ASCII to prevent false binary classification.  ASCII text
    # with null byte separators (e.g. find -print0 output) would exceed the
    # binary threshold due to the null bytes.  Like the UTF-8 precheck, we
    # compute the result now but return it at the normal position (after
    # markup) so explicit charset declarations still take precedence.
    ascii_precheck = detect_ascii(data)

    # Stage 0: Binary detection (skip when data is valid UTF-8 or ASCII)
    # Binary detection (encoding=None) is NOT gated by filters.
    if (
        utf8_precheck is None
        and ascii_precheck is None
        and is_binary(data, max_bytes=max_bytes)
    ):
        return [_BINARY_RESULT]

    # Stage 1b: Markup charset extraction (before ASCII/UTF-8 so explicit
    # declarations like <?xml encoding="iso-8859-1"?> are honoured even
    # when the bytes happen to be pure ASCII or valid UTF-8).
    markup_result = detect_markup_charset(data)
    if markup_result is not None and markup_result.encoding in allowed:
        markup_result = promote_markup_superset(data, markup_result, allowed)
        return [markup_result]

    # Stage 1c: ASCII (use pre-computed result)
    if ascii_precheck is not None and ascii_precheck.encoding in allowed:
        return [ascii_precheck]

    # Stage 1d: UTF-8 structural validation (use pre-computed result)
    if utf8_precheck is not None and utf8_precheck.encoding in allowed:
        return [utf8_precheck]

    # Stage 2a: Byte validity filtering
    valid_candidates = filter_by_validity(data, candidates)

    if not valid_candidates:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Gate: eliminate CJK multi-byte candidates that lack genuine
    # multi-byte structure.  Cache structural scores for Stage 2b.
    valid_candidates = _gate_cjk_candidates(data, valid_candidates, ctx)

    if not valid_candidates:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Stage 2b: Structural probing for multi-byte encodings
    # Reuse scores already computed during the CJK gate above.
    structural_scores: list[tuple[str, float]] = []
    for enc in valid_candidates:
        if enc.is_multibyte:
            score = ctx.mb_scores.get(enc.name)
            if score is None:  # pragma: no cover - gate always populates cache
                score = compute_structural_score(data, enc, ctx)
            if score > 0.0:
                structural_scores.append((enc.name, score))

    # If a multi-byte encoding scored very high, score all candidates
    # (CJK + single-byte) statistically.
    if structural_scores:
        structural_scores.sort(key=lambda x: x[1], reverse=True)
        _, best_score = structural_scores[0]
        if best_score >= _STRUCTURAL_CONFIDENCE_THRESHOLD:
            results = _score_structural_candidates(
                data, structural_scores, valid_candidates, ctx
            )
            # An empty result list falls through to plain statistical
            # scoring below.
            if results:
                return postprocess_results(data, results)

    # Stage 3: Statistical scoring for all remaining candidates.
    # Bigram models converge quickly and don't benefit from scanning
    # beyond 16 KB — cap the data to avoid unnecessary work on large files.
    stat_data = data[:_STAT_SCORE_MAX_BYTES]
    results = list(score_candidates(stat_data, tuple(valid_candidates)))
    if not results:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Post-processing receives the max_bytes-truncated data, not the
    # 16 KB statistical slice.
    return postprocess_results(data, results)
358
359
def run_pipeline(  # noqa: PLR0913
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
    *,
    include_encodings: frozenset[str] | None = None,
    exclude_encodings: frozenset[str] | None = None,
    no_match_encoding: str = "cp1252",
    empty_input_encoding: str = "utf-8",
) -> list[DetectionResult]:
    """Run the full detection pipeline.

    Runs the core detection stages, then fills in languages, defaults any
    missing MIME type, and caps confidence values at 1.0.

    :param data: The raw byte data to analyze.
    :param encoding_era: Filter candidates to a specific era of encodings.
    :param max_bytes: Maximum number of bytes to process.
    :param include_encodings: If not ``None``, only return these encodings.
    :param exclude_encodings: If not ``None``, never return these encodings.
    :param no_match_encoding: Encoding returned when no candidate survives.
    :param empty_input_encoding: Encoding returned for empty input.
    :returns: A list of :class:`DetectionResult` sorted by confidence descending.
    :raises RuntimeError: If the pipeline produced no result at all.
    """
    core_results = _run_pipeline_core(
        data,
        encoding_era,
        max_bytes,
        include_encodings=include_encodings,
        exclude_encodings=exclude_encodings,
        no_match_encoding=no_match_encoding,
        empty_input_encoding=empty_input_encoding,
    )
    with_languages = fill_languages(data, core_results)
    completed = [_with_default_mime(item) for item in with_languages]
    if not completed:  # pragma: no cover
        msg = "pipeline must always return at least one result"
        raise RuntimeError(msg)

    # Cap confidence at 1.0 at the public API boundary.  Internal stages may
    # boost confidence above 1.0 for ranking purposes (e.g. the CJK
    # byte-coverage boost), but callers expect a probability-like value.
    # Results at or below 1.0 are passed through as the same objects.
    capped: list[DetectionResult] = []
    for item in completed:
        if item.confidence > 1.0:
            capped.append(
                DetectionResult(item.encoding, 1.0, item.language, item.mime_type)
            )
        else:
            capped.append(item)
    return capped