1"""Pipeline orchestrator — runs all detection stages in sequence.
2
3Note: ``from __future__ import annotations`` is intentionally omitted because
4this module is compiled with mypyc, which does not support PEP 563 string
5annotations.
6"""
7
8import warnings
9
10from chardet._utils import DEFAULT_MAX_BYTES
11from chardet.enums import EncodingEra
12from chardet.models import (
13 BigramProfile,
14 has_model_variants,
15 infer_language,
16 score_best_language,
17)
18from chardet.pipeline import (
19 _NONE_RESULT,
20 DETERMINISTIC_CONFIDENCE,
21 HIGH_BYTES,
22 DetectionResult,
23 PipelineContext,
24)
25from chardet.pipeline.ascii import detect_ascii
26from chardet.pipeline.binary import is_binary
27from chardet.pipeline.bom import detect_bom
28from chardet.pipeline.confusion import resolve_confusion_groups
29from chardet.pipeline.escape import detect_escape_encoding
30from chardet.pipeline.magic import detect_magic
31from chardet.pipeline.markup import detect_markup_charset
32from chardet.pipeline.statistical import score_candidates
33from chardet.pipeline.structural import (
34 compute_lead_byte_diversity,
35 compute_multibyte_byte_coverage,
36 compute_structural_score,
37)
38from chardet.pipeline.utf8 import detect_utf8
39from chardet.pipeline.utf1632 import detect_utf1632_patterns
40from chardet.pipeline.validity import filter_by_validity
41from chardet.registry import REGISTRY, EncodingInfo, get_candidates
42
# Sentinel result returned for data classified as binary (Stage 0).
# Binary classification is deterministic, hence the fixed confidence.
_BINARY_RESULT: DetectionResult = DetectionResult(
    encoding=None,
    confidence=DETERMINISTIC_CONFIDENCE,
    language=None,
    mime_type="application/octet-stream",
)
# Threshold at which a CJK structural score is confident enough to trigger
# combined structural+statistical ranking rather than purely statistical.
_STRUCTURAL_CONFIDENCE_THRESHOLD: float = 0.85

# Maximum bytes used for statistical bigram scoring. Bigram models
# converge quickly — 16 KB is sufficient for discrimination across all
# language models (single-byte and multi-byte alike) while avoiding
# unnecessary work on large files. Experimentally verified: 0 real
# accuracy losses across 835 test files at this threshold.
_STAT_SCORE_MAX_BYTES: int = 16384
59
60# Common Western Latin encodings that share the iso-8859-1 character
61# repertoire for the byte values where iso-8859-10 is indistinguishable.
62# Used as swap targets when demoting iso-8859-10 — we prefer these over
63# iso-8859-10, but do not want to accidentally promote an unrelated encoding
64# (e.g. windows-1254).
65_COMMON_LATIN_ENCODINGS: frozenset[str] = frozenset(
66 {
67 "iso8859-1",
68 "iso8859-15",
69 "cp1252",
70 }
71)
72
73# Bytes where iso-8859-10 decodes to a different character than iso-8859-1.
74# Computed programmatically via:
75# {b for b in range(0x80, 0x100)
76# if bytes([b]).decode('iso-8859-10') != bytes([b]).decode('iso-8859-1')}
77_ISO_8859_10_DISTINGUISHING: frozenset[int] = frozenset(
78 {
79 0xA1,
80 0xA2,
81 0xA3,
82 0xA4,
83 0xA5,
84 0xA6,
85 0xA8,
86 0xA9,
87 0xAA,
88 0xAB,
89 0xAC,
90 0xAE,
91 0xAF,
92 0xB1,
93 0xB2,
94 0xB3,
95 0xB4,
96 0xB5,
97 0xB6,
98 0xB8,
99 0xB9,
100 0xBA,
101 0xBB,
102 0xBC,
103 0xBD,
104 0xBE,
105 0xBF,
106 0xC0,
107 0xC7,
108 0xC8,
109 0xCA,
110 0xCC,
111 0xD1,
112 0xD2,
113 0xD7,
114 0xD9,
115 0xE0,
116 0xE7,
117 0xE8,
118 0xEA,
119 0xEC,
120 0xF1,
121 0xF2,
122 0xF7,
123 0xF9,
124 0xFF,
125 }
126)
127
128# Bytes where iso-8859-14 decodes to a different character than iso-8859-1.
129# Computed programmatically via:
130# {b for b in range(0x80, 0x100)
131# if bytes([b]).decode('iso-8859-14') != bytes([b]).decode('iso-8859-1')}
132_ISO_8859_14_DISTINGUISHING: frozenset[int] = frozenset(
133 {
134 0xA1,
135 0xA2,
136 0xA4,
137 0xA5,
138 0xA6,
139 0xA8,
140 0xAA,
141 0xAB,
142 0xAC,
143 0xAF,
144 0xB0,
145 0xB1,
146 0xB2,
147 0xB3,
148 0xB4,
149 0xB5,
150 0xB7,
151 0xB8,
152 0xB9,
153 0xBA,
154 0xBB,
155 0xBC,
156 0xBD,
157 0xBE,
158 0xBF,
159 0xD0,
160 0xD7,
161 0xDE,
162 0xF0,
163 0xF7,
164 0xFE,
165 }
166)
167
168# Bytes where windows-1254 has Turkish-specific characters that differ from
169# windows-1252. Windows-1254 differs from windows-1252 at 8 byte positions.
170# Two (0x8E, 0x9E) are undefined in Windows-1254 but defined in Windows-1252;
171# these are excluded here because undefined bytes are not useful for
172# identifying Turkish text. The remaining six positions map to
173# Turkish-specific letters and are the primary distinguishing signal.
174_WINDOWS_1254_DISTINGUISHING: frozenset[int] = frozenset(
175 {0xD0, 0xDD, 0xDE, 0xF0, 0xFD, 0xFE}
176)
177
178# Encodings that are often false positives when their distinguishing bytes
179# are absent. Keyed by encoding name -> frozenset of byte values where
180# that encoding differs from iso-8859-1 (or windows-1252 in the case of
181# windows-1254).
182# Bytes where HP-Roman8 maps to lowercase accented letters but ISO-8859-1
183# maps to uppercase letters. Real HP-Roman8 text (from HP-UX terminals)
184# contains these bytes; data misdetected as HP-Roman8 typically does not.
185# {b for b in range(0x80, 0x100)
186# if (unicodedata.category(bytes([b]).decode('hp-roman8')) == 'Ll'
187# and unicodedata.category(bytes([b]).decode('iso-8859-1')) == 'Lu')}
188_HP_ROMAN8_DISTINGUISHING: frozenset[int] = frozenset(
189 {
190 0xC0,
191 0xC1,
192 0xC2,
193 0xC3,
194 0xC4,
195 0xC5,
196 0xC6,
197 0xC7,
198 0xC8,
199 0xC9,
200 0xCA,
201 0xCB,
202 0xCC,
203 0xCD,
204 0xCE,
205 0xCF,
206 0xD1,
207 0xD4,
208 0xD5,
209 0xD6,
210 0xD9,
211 0xDD,
212 0xDE,
213 }
214)
215
# Demotion lookup consulted by _should_demote(): encoding name -> the byte
# values where that encoding decodes differently from iso-8859-1 (from
# windows-1252 in the case of cp1254). An encoding listed here is demoted
# when the data contains none of its distinguishing bytes.
_DEMOTION_CANDIDATES: dict[str, frozenset[int]] = {
    "iso8859-10": _ISO_8859_10_DISTINGUISHING,
    "iso8859-14": _ISO_8859_14_DISTINGUISHING,
    "cp1254": _WINDOWS_1254_DISTINGUISHING,
    "hp-roman8": _HP_ROMAN8_DISTINGUISHING,
}
222
223# Bytes where KOI8-T maps to Tajik-specific Cyrillic letters but KOI8-R
224# maps to box-drawing characters. Presence of any of these bytes is strong
225# evidence for KOI8-T over KOI8-R.
226_KOI8_T_DISTINGUISHING: frozenset[int] = frozenset(
227 {0x80, 0x81, 0x83, 0x8A, 0x8C, 0x8D, 0x8E, 0x90, 0xA1, 0xA2, 0xA5, 0xB5}
228)
229
230
231# Markup charset declarations that commonly refer to a Windows superset
232# encoding rather than the strict standard encoding. Japanese web content
233# almost universally declares "Shift_JIS" but actually uses CP932 extensions;
234# similarly, Korean web content declares "EUC-KR" but uses CP949/UHC.
235# When the declared encoding resolves to the base (left), we check whether
236# the superset (right) is a better structural match.
237_MARKUP_SUPERSET_PROMOTIONS: dict[str, str] = {
238 "shift_jis_2004": "cp932",
239 "euc_kr": "cp949",
240}
241
242
def _try_promote_markup_superset(
    data: bytes,
    markup_result: DetectionResult,
    allowed: frozenset[str],
) -> DetectionResult:
    """Swap a markup-declared encoding for its superset when the evidence supports it.

    The declared encoding must have a known superset, the superset must be
    allowed and able to decode *data* strictly, and the superset's
    structural score must beat the declared encoding's. When all of that
    holds, a new result carrying the superset name is returned; otherwise
    the original *markup_result* comes back unchanged.
    """
    declared = markup_result.encoding
    if declared is None:
        return markup_result
    superset = _MARKUP_SUPERSET_PROMOTIONS.get(declared)
    if superset is None or superset not in allowed:
        return markup_result
    superset_info = REGISTRY[superset]
    # The superset must decode the data cleanly before it can be considered.
    try:
        data.decode(superset, errors="strict")
    except (UnicodeDecodeError, LookupError):
        return markup_result
    # Promote only when the superset is structurally the better fit.
    ctx = PipelineContext()
    declared_score = compute_structural_score(data, REGISTRY[declared], ctx)
    if compute_structural_score(data, superset_info, ctx) > declared_score:
        return DetectionResult(
            superset,
            markup_result.confidence,
            markup_result.language,
            markup_result.mime_type,
        )
    return markup_result
277
278
def _make_fallback_or_none(
    encoding: str,
    allowed: frozenset[str],
    param_name: str,
) -> list[DetectionResult]:
    """Return a low-confidence result for *encoding*, or ``encoding=None`` if filtered out.

    When the fallback encoding itself has been excluded by the caller's
    filters, a ``UserWarning`` is emitted and the generic "no result"
    sentinel is returned instead. ``stacklevel=5`` targets the public
    caller:
    detect() -> run_pipeline() -> _run_pipeline_core() -> _make_fallback_or_none().
    """
    if encoding in allowed:
        return [DetectionResult(encoding=encoding, confidence=0.10, language=None)]
    warnings.warn(
        f"{param_name} {encoding!r} is excluded by "
        f"include_encodings/exclude_encodings; returning encoding=None",
        UserWarning,
        stacklevel=5,
    )
    return [_NONE_RESULT]
298
299
def _should_demote(encoding: str, data: bytes) -> bool:
    """Return True if *encoding* is a demotion candidate lacking distinguishing bytes.

    Looks up the set of byte values that decode differently under
    *encoding* vs iso-8859-1. If every non-ASCII byte in *data* falls
    outside that set, the data is equally valid under both encodings, so
    there is no byte-level evidence favouring the candidate.
    """
    distinguishing = _DEMOTION_CANDIDATES.get(encoding)
    if distinguishing is None:
        return False
    return all(b not in distinguishing for b in data if b > 0x7F)
312
313
# Minimum structural score (valid multi-byte sequences / lead bytes) required
# to keep a CJK multi-byte candidate. Below this threshold the encoding is
# eliminated as a false positive (e.g. Shift_JIS matching Latin data where
# scattered high bytes look like lead bytes but rarely form valid pairs).
_CJK_MIN_MB_RATIO: float = 0.05
# Minimum number of non-ASCII bytes required for a CJK candidate to survive
# gating. Very short inputs are validated by the other gates (structural
# pair ratio, byte coverage) and by coverage-aware boosting in statistical
# scoring — so we keep this threshold low to let even 1-character CJK
# inputs compete.
_CJK_MIN_NON_ASCII: int = 2
# Minimum ratio of non-ASCII bytes that must participate in valid multi-byte
# sequences for a CJK candidate to survive gating. Genuine CJK text has
# nearly all non-ASCII bytes in valid pairs (coverage >= 0.95); Latin text
# with scattered high bytes has many orphan bytes (coverage often < 0.5).
# The lowest true-positive coverage in the test suite is ~0.39 (a CP932 HTML
# file with many half-width katakana).
_CJK_MIN_BYTE_COVERAGE: float = 0.35
# Minimum number of distinct lead byte values for a CJK candidate to
# survive gating. Genuine CJK text uses a wide range of lead bytes;
# European false positives cluster in a narrow band. Only applied when
# there are enough non-ASCII bytes to expect diversity (see
# _CJK_DIVERSITY_MIN_NON_ASCII).
_CJK_MIN_LEAD_DIVERSITY: int = 4
# Minimum non-ASCII byte count before applying the lead diversity gate.
# Very small files (e.g. 8 non-ASCII bytes) may genuinely have low
# diversity even for real CJK text (e.g. repeated katakana).
_CJK_DIVERSITY_MIN_NON_ASCII: int = 16
342
343
def _gate_cjk_candidates(
    data: bytes,
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> tuple[EncodingInfo, ...]:
    """Drop CJK multi-byte candidates that lack genuine multi-byte structure.

    Each multi-byte candidate must pass four gates, applied in order:

    1. **Structural pair ratio** (valid_pairs / lead_bytes) must be
       >= ``_CJK_MIN_MB_RATIO`` — rejects files full of orphan lead bytes.
    2. **Minimum non-ASCII byte count**: at least ``_CJK_MIN_NON_ASCII``
       bytes > 0x7F. Tiny files with 1-5 high bytes can accidentally form
       perfect pairs and score 1.0 structurally.
    3. **Byte coverage** (non-ASCII bytes in valid multi-byte sequences /
       total non-ASCII bytes) must be >= ``_CJK_MIN_BYTE_COVERAGE``.
       Genuine CJK text has nearly every high byte accounted for; Latin
       text leaves many unconsumed.
    4. **Lead byte diversity**: at least ``_CJK_MIN_LEAD_DIVERSITY``
       distinct lead byte values in valid pairs — European false positives
       cluster in a narrow band (e.g. 0xC0-0xDF for accented Latin).

    Single-byte candidates pass through untouched. Structural scores are
    cached in ``ctx.mb_scores`` for reuse in Stage 2b.
    """
    survivors: list[EncodingInfo] = []
    for candidate in valid_candidates:
        if not candidate.is_multibyte:
            survivors.append(candidate)
            continue
        # Gate 1: structural pair ratio (score cached for Stage 2b).
        structure = compute_structural_score(data, candidate, ctx)
        ctx.mb_scores[candidate.name] = structure
        if structure < _CJK_MIN_MB_RATIO:
            continue
        # Gate 2: enough high bytes to trust the score (count computed lazily).
        if ctx.non_ascii_count is None:
            ctx.non_ascii_count = len(data) - len(data.translate(None, HIGH_BYTES))
        if ctx.non_ascii_count < _CJK_MIN_NON_ASCII:
            continue
        # Gate 3: byte coverage (cached for the statistical boost).
        coverage = compute_multibyte_byte_coverage(
            data, candidate, ctx, non_ascii_count=ctx.non_ascii_count
        )
        ctx.mb_coverage[candidate.name] = coverage
        if coverage < _CJK_MIN_BYTE_COVERAGE:
            continue
        # Gate 4: lead-byte diversity, only once there are enough high bytes.
        if (
            ctx.non_ascii_count >= _CJK_DIVERSITY_MIN_NON_ASCII
            and compute_lead_byte_diversity(data, candidate, ctx)
            < _CJK_MIN_LEAD_DIVERSITY
        ):
            continue
        survivors.append(candidate)
    return tuple(survivors)
396
397
def _score_structural_candidates(
    data: bytes,
    structural_scores: list[tuple[str, float]],
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> list[DetectionResult]:
    """Rank structurally-valid CJK candidates via statistical bigram scoring.

    Statistical scoring differentiates CJK encodings that tie structurally
    (e.g. euc-jp vs big5 on Japanese data). Single-byte candidates are
    scored alongside so the caller can compare CJK vs single-byte
    confidence directly.

    Multi-byte candidates whose byte coverage is >= 0.95 get their
    confidence scaled up by (1 + coverage): when nearly all non-ASCII
    bytes sit in valid multi-byte pairs, the structural evidence should
    outrank single-byte bigram models that can score higher on small
    samples.

    Note: boosted confidences may exceed 1.0; they are used only for
    relative ranking and are clamped to [0.0, 1.0] by ``run_pipeline``
    before reaching callers.
    """
    multibyte_by_name: dict[str, EncodingInfo] = {
        e.name: e for e in valid_candidates if e.is_multibyte
    }
    ranked_mb = tuple(
        multibyte_by_name[name]
        for name, _score in structural_scores
        if name in multibyte_by_name
    )
    single_byte = tuple(e for e in valid_candidates if not e.is_multibyte)
    scored = list(
        score_candidates(data[:_STAT_SCORE_MAX_BYTES], (*ranked_mb, *single_byte))
    )

    def maybe_boost(res: DetectionResult) -> DetectionResult:
        # Coverage below the 0.95 bar (or a non-text result) passes through.
        cov = ctx.mb_coverage.get(res.encoding, 0.0) if res.encoding else 0.0
        if cov < 0.95:
            return res
        return DetectionResult(
            res.encoding, res.confidence * (1 + cov), res.language, res.mime_type
        )

    reranked = [maybe_boost(r) for r in scored]
    reranked.sort(key=lambda item: item.confidence, reverse=True)
    return reranked
446
447
def _demote_niche_latin(
    data: bytes,
    results: list[DetectionResult],
) -> list[DetectionResult]:
    """Demote a niche Latin winner that has no distinguishing bytes in *data*.

    Bigram models for niche encodings (iso-8859-10, iso-8859-14,
    windows-1254, hp-roman8) can win on data made up entirely of bytes
    shared with common Western Latin encodings. With no byte-level
    evidence for the winner, the first common Western Latin candidate is
    promoted to the top (inheriting the winner's confidence) and all
    entries for the demoted encoding move to the end.
    """
    if len(results) < 2:
        return results
    winner = results[0]
    if winner.encoding is None or not _should_demote(winner.encoding, data):
        return results
    for candidate in results[1:]:
        if candidate.encoding not in _COMMON_LATIN_ENCODINGS:
            continue
        promoted = DetectionResult(
            candidate.encoding,
            winner.confidence,
            candidate.language,
            candidate.mime_type,
        )
        middle = [
            r
            for r in results
            if r.encoding != winner.encoding and r is not candidate
        ]
        demoted = [r for r in results if r.encoding == winner.encoding]
        return [promoted, *middle, *demoted]
    return results
478
479
def _promote_koi8t(
    data: bytes,
    results: list[DetectionResult],
) -> list[DetectionResult]:
    """Promote KOI8-T above a winning KOI8-R when Tajik-specific bytes appear.

    KOI8-T and KOI8-R share the whole 0xC0-0xFF Cyrillic letter block, so
    statistics struggle to separate them. KOI8-T, however, maps 12 bytes in
    0x80-0xBF to Tajik-specific Cyrillic letters where KOI8-R has
    box-drawing characters — any occurrence of those bytes marks KOI8-T as
    the better match.
    """
    if not results or results[0].encoding != "koi8-r":
        return results
    # Locate a KOI8-T entry anywhere in the ranking.
    koi8t_idx = None
    for idx, res in enumerate(results):
        if res.encoding == "koi8-t":
            koi8t_idx = idx
            break
    if koi8t_idx is None:
        return results
    # Without Tajik-specific evidence, leave the ranking alone.
    if not any(b in _KOI8_T_DISTINGUISHING for b in data if b > 0x7F):
        return results
    tajik = results[koi8t_idx]
    promoted = DetectionResult(
        tajik.encoding,
        results[0].confidence,
        tajik.language,
        tajik.mime_type,
    )
    rest = [r for i, r in enumerate(results) if i != koi8t_idx]
    return [promoted, *rest]
511
512
# Maximum bytes of data used for language scoring in _fill_metadata.
# Language bigrams converge quickly — 2 KB is sufficient for discrimination
# across all language models while keeping Tier 3 (language-model scoring) fast.
_LANG_SCORE_MAX_BYTES: int = 2048
517
518
519def _to_utf8(data: bytes, encoding: str) -> bytes | None:
520 """Decode data from encoding and re-encode as UTF-8 for language scoring.
521
522 Returns None if the encoding is unknown. For UTF-8, returns data as-is.
523 Uses ``errors="ignore"`` because the data already passed byte-validity
524 filtering for the detected encoding; any residual invalid bytes are
525 irrelevant for language scoring.
526 """
527 if encoding == "utf-8":
528 return data
529 try:
530 return data.decode(encoding, errors="ignore").encode(
531 "utf-8", errors="surrogatepass"
532 )
533 except (LookupError, TypeError):
534 return None
535
536
def _fill_metadata(
    data: bytes, results: list[DetectionResult]
) -> list[DetectionResult]:
    """Fill in language and mime_type for results missing them.

    **Language** (only for text results where ``encoding is not None``):

    Tier 1: single-language encodings via hardcoded map (instant).
    Tier 2: multi-language encodings via statistical bigram scoring (lazy).
    Tier 3: decode to UTF-8, score against UTF-8 language models (universal fallback).

    **MIME type**: text results default to ``"text/plain"``, binary results
    (``encoding is None``) default to ``"application/octet-stream"``.
    """
    filled: list[DetectionResult] = []
    profile: BigramProfile | None = None
    utf8_profile: BigramProfile | None = None
    # Encoding whose re-encoded bytes built utf8_profile. Tracking the
    # source fixes a stale-cache bug (the previous check only compared
    # result.encoding against "utf-8", so a profile built from another
    # encoding's decoded text could be wrongly reused for a later UTF-8
    # result) and lets consecutive results with the same encoding share
    # one profile instead of rebuilding it.
    utf8_profile_source: str | None = None
    for result in results:
        lang = result.language
        if lang is None and result.encoding is not None:
            # Tier 1: single-language encoding
            lang = infer_language(result.encoding)
            # Tier 2: statistical scoring for multi-language encodings
            if lang is None and data and has_model_variants(result.encoding):
                if profile is None:
                    profile = BigramProfile(data)
                _, lang = score_best_language(data, result.encoding, profile=profile)
            # Tier 3: decode to UTF-8, score against UTF-8 language models
            if lang is None and data and has_model_variants("utf-8"):
                utf8_data = _to_utf8(data, result.encoding)
                if utf8_data:
                    if utf8_profile is None or utf8_profile_source != result.encoding:
                        utf8_profile = BigramProfile(utf8_data)
                        utf8_profile_source = result.encoding
                    _, lang = score_best_language(
                        utf8_data, "utf-8", profile=utf8_profile
                    )

        mime = result.mime_type
        if mime is None:
            mime = (
                "text/plain"
                if result.encoding is not None
                else "application/octet-stream"
            )

        if lang != result.language or mime != result.mime_type:
            filled.append(
                DetectionResult(result.encoding, result.confidence, lang, mime)
            )
        else:
            filled.append(result)
    return filled
589
590
def _postprocess_results(
    data: bytes,
    results: list[DetectionResult],
) -> list[DetectionResult]:
    """Run the ranked results through the ordered post-processing passes."""
    resolved = resolve_confusion_groups(data, results)
    demoted = _demote_niche_latin(data, resolved)
    return _promote_koi8t(data, demoted)
599
600
def _run_pipeline_core(  # noqa: PLR0913
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
    *,
    include_encodings: frozenset[str] | None = None,
    exclude_encodings: frozenset[str] | None = None,
    no_match_encoding: str = "cp1252",
    empty_input_encoding: str = "utf-8",
) -> list[DetectionResult]:
    """Core pipeline logic. Returns list of results sorted by confidence.

    Deterministic early-exit stages run first — BOM, UTF-16/32 null-byte
    patterns, escape-sequence encodings, magic numbers, binary check (with
    UTF-8/ASCII prechecks), markup charset, ASCII, UTF-8 — and whichever
    fires first returns a single-result list. Data that survives them all
    goes through byte-validity filtering, CJK gating, structural probing,
    statistical scoring, and post-processing.

    :param data: Raw bytes to analyze (truncated to *max_bytes*).
    :param encoding_era: Restricts candidate encodings to an era.
    :param max_bytes: Maximum number of bytes inspected.
    :param include_encodings: If not ``None``, only these encodings compete.
    :param exclude_encodings: If not ``None``, these encodings are removed.
    :param no_match_encoding: Fallback returned when no candidate survives.
    :param empty_input_encoding: Result returned for empty input.
    :returns: Non-empty list of results, sorted by confidence descending.
    """
    ctx = PipelineContext()
    data = data[:max_bytes]

    # Build candidate set once — used for both early-exit gating and
    # statistical scoring. The set incorporates encoding_era, include, and
    # exclude filters so all pipeline stages are gated consistently.
    candidates = get_candidates(encoding_era, include_encodings, exclude_encodings)
    allowed: frozenset[str] = frozenset(enc.name for enc in candidates)

    if not data:
        return _make_fallback_or_none(
            empty_input_encoding, allowed, "empty_input_encoding"
        )

    # Stage 1a: BOM detection (runs first — BOMs are definitive and
    # UTF-16/32 data looks binary due to null bytes)
    bom_result = detect_bom(data)
    if bom_result is not None and bom_result.encoding in allowed:
        return [bom_result]

    # Stage 1a+: UTF-16/32 null-byte pattern detection (for files without
    # BOMs — must run before binary detection since these encodings contain
    # many null bytes that would trigger the binary check)
    utf1632_result = detect_utf1632_patterns(data)
    if utf1632_result is not None and utf1632_result.encoding in allowed:
        return [utf1632_result]

    # Escape-sequence encodings (ISO-2022, HZ-GB-2312, UTF-7): must run
    # before binary detection (ESC is a control byte) and before ASCII
    # detection (HZ-GB-2312 uses only printable ASCII plus tildes).
    escape_result = detect_escape_encoding(data)
    if (
        escape_result is not None
        and escape_result.encoding is not None
        and escape_result.encoding in allowed
    ):
        return [escape_result]

    # Magic number detection for known binary formats — runs before
    # UTF-8/ASCII prechecks to avoid unnecessary analysis on binary data.
    magic_result = detect_magic(data)
    if magic_result is not None:
        return [magic_result]

    # Pre-check UTF-8 to prevent false binary classification. Valid UTF-8
    # with multi-byte sequences can contain control bytes (e.g. ESC for ANSI
    # codes) that would otherwise exceed the binary threshold. We compute
    # the result now but return it at the normal pipeline position (after
    # markup) so that explicit charset declarations still take precedence.
    utf8_precheck = detect_utf8(data)

    # Pre-check ASCII to prevent false binary classification. ASCII text
    # with null byte separators (e.g. find -print0 output) would exceed the
    # binary threshold due to the null bytes. Like the UTF-8 precheck, we
    # compute the result now but return it at the normal position (after
    # markup) so explicit charset declarations still take precedence.
    ascii_precheck = detect_ascii(data)

    # Stage 0: Binary detection (skip when data is valid UTF-8 or ASCII)
    # Binary detection (encoding=None) is NOT gated by filters.
    if (
        utf8_precheck is None
        and ascii_precheck is None
        and is_binary(data, max_bytes=max_bytes)
    ):
        return [_BINARY_RESULT]

    # Stage 1b: Markup charset extraction (before ASCII/UTF-8 so explicit
    # declarations like <?xml encoding="iso-8859-1"?> are honoured even
    # when the bytes happen to be pure ASCII or valid UTF-8).
    markup_result = detect_markup_charset(data)
    if markup_result is not None and markup_result.encoding in allowed:
        markup_result = _try_promote_markup_superset(data, markup_result, allowed)
        return [markup_result]

    # Stage 1c: ASCII (use pre-computed result)
    if ascii_precheck is not None and ascii_precheck.encoding in allowed:
        return [ascii_precheck]

    # Stage 1d: UTF-8 structural validation (use pre-computed result)
    if utf8_precheck is not None and utf8_precheck.encoding in allowed:
        return [utf8_precheck]

    # Stage 2a: Byte validity filtering
    valid_candidates = filter_by_validity(data, candidates)

    if not valid_candidates:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Gate: eliminate CJK multi-byte candidates that lack genuine
    # multi-byte structure. Cache structural scores for Stage 2b.
    valid_candidates = _gate_cjk_candidates(data, valid_candidates, ctx)

    if not valid_candidates:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Stage 2b: Structural probing for multi-byte encodings
    # Reuse scores already computed during the CJK gate above.
    structural_scores: list[tuple[str, float]] = []
    for enc in valid_candidates:
        if enc.is_multibyte:
            score = ctx.mb_scores.get(enc.name)
            if score is None:  # pragma: no cover - gate always populates cache
                score = compute_structural_score(data, enc, ctx)
            if score > 0.0:
                structural_scores.append((enc.name, score))

    # If a multi-byte encoding scored very high, score all candidates
    # (CJK + single-byte) statistically.
    if structural_scores:
        structural_scores.sort(key=lambda x: x[1], reverse=True)
        _, best_score = structural_scores[0]
        if best_score >= _STRUCTURAL_CONFIDENCE_THRESHOLD:
            results = _score_structural_candidates(
                data, structural_scores, valid_candidates, ctx
            )
            return _postprocess_results(data, results)

    # Stage 3: Statistical scoring for all remaining candidates.
    # Bigram models converge quickly and don't benefit from scanning
    # beyond 16 KB — cap the data to avoid unnecessary work on large files.
    stat_data = data[:_STAT_SCORE_MAX_BYTES]
    results = list(score_candidates(stat_data, tuple(valid_candidates)))
    if not results:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    return _postprocess_results(data, results)
739
740
def run_pipeline(  # noqa: PLR0913
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
    *,
    include_encodings: frozenset[str] | None = None,
    exclude_encodings: frozenset[str] | None = None,
    no_match_encoding: str = "cp1252",
    empty_input_encoding: str = "utf-8",
) -> list[DetectionResult]:
    """Run the full detection pipeline.

    :param data: The raw byte data to analyze.
    :param encoding_era: Filter candidates to a specific era of encodings.
    :param max_bytes: Maximum number of bytes to process.
    :param include_encodings: If not ``None``, only return these encodings.
    :param exclude_encodings: If not ``None``, never return these encodings.
    :param no_match_encoding: Encoding returned when no candidate survives.
    :param empty_input_encoding: Encoding returned for empty input.
    :returns: A list of :class:`DetectionResult` sorted by confidence descending.
    """
    ranked = _run_pipeline_core(
        data,
        encoding_era,
        max_bytes,
        include_encodings=include_encodings,
        exclude_encodings=exclude_encodings,
        no_match_encoding=no_match_encoding,
        empty_input_encoding=empty_input_encoding,
    )
    # Only the first 2 KB feed language scoring — bigrams converge quickly,
    # keeping Tier 3 (language-model scoring) fast even on large inputs.
    ranked = _fill_metadata(data[:_LANG_SCORE_MAX_BYTES], ranked)
    if not ranked:  # pragma: no cover
        msg = "pipeline must always return at least one result"
        raise RuntimeError(msg)
    # Internal stages may push confidence above 1.0 for ranking purposes
    # (e.g. the CJK byte-coverage boost); clamp to a probability-like value
    # at the public API boundary.
    clamped: list[DetectionResult] = []
    for res in ranked:
        if res.confidence > 1.0:
            clamped.append(
                DetectionResult(res.encoding, 1.0, res.language, res.mime_type)
            )
        else:
            clamped.append(res)
    return clamped