1"""Pipeline orchestrator — runs all detection stages in sequence."""
2
3from __future__ import annotations
4
5from chardet._utils import DEFAULT_MAX_BYTES
6from chardet.enums import EncodingEra
7from chardet.models import (
8 BigramProfile,
9 has_model_variants,
10 infer_language,
11 score_best_language,
12)
13from chardet.pipeline import (
14 DETERMINISTIC_CONFIDENCE,
15 HIGH_BYTES,
16 DetectionResult,
17 PipelineContext,
18)
19from chardet.pipeline.ascii import detect_ascii
20from chardet.pipeline.binary import is_binary
21from chardet.pipeline.bom import detect_bom
22from chardet.pipeline.confusion import resolve_confusion_groups
23from chardet.pipeline.escape import detect_escape_encoding
24from chardet.pipeline.markup import detect_markup_charset
25from chardet.pipeline.statistical import score_candidates
26from chardet.pipeline.structural import (
27 compute_lead_byte_diversity,
28 compute_multibyte_byte_coverage,
29 compute_structural_score,
30)
31from chardet.pipeline.utf8 import detect_utf8
32from chardet.pipeline.utf1632 import detect_utf1632_patterns
33from chardet.pipeline.validity import filter_by_validity
34from chardet.registry import REGISTRY, EncodingInfo, get_candidates
35
# Sentinel for binary (non-text) input: no encoding is reported, and the
# confidence is deterministic because the binary classification is treated
# as definitive by the pipeline.
_BINARY_RESULT = DetectionResult(
    encoding=None, confidence=DETERMINISTIC_CONFIDENCE, language=None
)
# UTF-8 is the default encoding for empty input, matching web standards
# (HTML5 default encoding is UTF-8).
_EMPTY_RESULT = DetectionResult(encoding="utf-8", confidence=0.10, language=None)
# windows-1252 is the most common single-byte encoding on the web; browsers
# (per the WHATWG Encoding Standard) treat HTTP/1.1's historical ISO-8859-1
# default as windows-1252. Used when no encoding can be determined.
_FALLBACK_RESULT = DetectionResult(encoding="cp1252", confidence=0.10, language=None)
# Threshold at which a CJK structural score is confident enough to trigger
# combined structural+statistical ranking rather than purely statistical.
_STRUCTURAL_CONFIDENCE_THRESHOLD = 0.85
48
49# Common Western Latin encodings that share the iso-8859-1 character
50# repertoire for the byte values where iso-8859-10 is indistinguishable.
51# Used as swap targets when demoting iso-8859-10 — we prefer these over
52# iso-8859-10, but do not want to accidentally promote an unrelated encoding
53# (e.g. windows-1254).
54_COMMON_LATIN_ENCODINGS: frozenset[str] = frozenset(
55 {
56 "iso8859-1",
57 "iso8859-15",
58 "cp1252",
59 }
60)
61
62# Bytes where iso-8859-10 decodes to a different character than iso-8859-1.
63# Computed programmatically via:
64# {b for b in range(0x80, 0x100)
65# if bytes([b]).decode('iso-8859-10') != bytes([b]).decode('iso-8859-1')}
66_ISO_8859_10_DISTINGUISHING: frozenset[int] = frozenset(
67 {
68 0xA1,
69 0xA2,
70 0xA3,
71 0xA4,
72 0xA5,
73 0xA6,
74 0xA8,
75 0xA9,
76 0xAA,
77 0xAB,
78 0xAC,
79 0xAE,
80 0xAF,
81 0xB1,
82 0xB2,
83 0xB3,
84 0xB4,
85 0xB5,
86 0xB6,
87 0xB8,
88 0xB9,
89 0xBA,
90 0xBB,
91 0xBC,
92 0xBD,
93 0xBE,
94 0xBF,
95 0xC0,
96 0xC7,
97 0xC8,
98 0xCA,
99 0xCC,
100 0xD1,
101 0xD2,
102 0xD7,
103 0xD9,
104 0xE0,
105 0xE7,
106 0xE8,
107 0xEA,
108 0xEC,
109 0xF1,
110 0xF2,
111 0xF7,
112 0xF9,
113 0xFF,
114 }
115)
116
117# Bytes where iso-8859-14 decodes to a different character than iso-8859-1.
118# Computed programmatically via:
119# {b for b in range(0x80, 0x100)
120# if bytes([b]).decode('iso-8859-14') != bytes([b]).decode('iso-8859-1')}
121_ISO_8859_14_DISTINGUISHING: frozenset[int] = frozenset(
122 {
123 0xA1,
124 0xA2,
125 0xA4,
126 0xA5,
127 0xA6,
128 0xA8,
129 0xAA,
130 0xAB,
131 0xAC,
132 0xAF,
133 0xB0,
134 0xB1,
135 0xB2,
136 0xB3,
137 0xB4,
138 0xB5,
139 0xB7,
140 0xB8,
141 0xB9,
142 0xBA,
143 0xBB,
144 0xBC,
145 0xBD,
146 0xBE,
147 0xBF,
148 0xD0,
149 0xD7,
150 0xDE,
151 0xF0,
152 0xF7,
153 0xFE,
154 }
155)
156
157# Bytes where windows-1254 has Turkish-specific characters that differ from
158# windows-1252. Windows-1254 differs from windows-1252 at 8 byte positions.
159# Two (0x8E, 0x9E) are undefined in Windows-1254 but defined in Windows-1252;
160# these are excluded here because undefined bytes are not useful for
161# identifying Turkish text. The remaining six positions map to
162# Turkish-specific letters and are the primary distinguishing signal.
163_WINDOWS_1254_DISTINGUISHING: frozenset[int] = frozenset(
164 {0xD0, 0xDD, 0xDE, 0xF0, 0xFD, 0xFE}
165)
166
# Encodings that are often false positives when their distinguishing bytes
# are absent. Keyed by encoding name -> frozenset of byte values where
# that encoding differs from iso-8859-1 (or windows-1252 in the case of
# windows-1254). Consumed by _should_demote: a candidate whose data
# contains none of its distinguishing bytes carries no byte-level evidence.
_DEMOTION_CANDIDATES: dict[str, frozenset[int]] = {
    "iso8859-10": _ISO_8859_10_DISTINGUISHING,
    "iso8859-14": _ISO_8859_14_DISTINGUISHING,
    "cp1254": _WINDOWS_1254_DISTINGUISHING,
}
176
177# Bytes where KOI8-T maps to Tajik-specific Cyrillic letters but KOI8-R
178# maps to box-drawing characters. Presence of any of these bytes is strong
179# evidence for KOI8-T over KOI8-R.
180_KOI8_T_DISTINGUISHING: frozenset[int] = frozenset(
181 {0x80, 0x81, 0x83, 0x8A, 0x8C, 0x8D, 0x8E, 0x90, 0xA1, 0xA2, 0xA5, 0xB5}
182)
183
184
def _should_demote(encoding: str, data: bytes) -> bool:
    """Return True if *encoding* is a demotion candidate with no supporting bytes.

    Looks up the byte values that decode differently under *encoding* than
    under iso-8859-1 and reports whether *data* avoids all of them. When it
    does, the data is equally valid under both encodings, so there is no
    byte-level evidence favouring the niche candidate.
    """
    distinguishing = _DEMOTION_CANDIDATES.get(encoding)
    if distinguishing is None:
        # Not a known demotion candidate -> never demote.
        return False
    # Every distinguishing byte value is > 0x7F, so testing the raw bytes
    # against the set directly is equivalent to filtering non-ASCII first.
    return distinguishing.isdisjoint(data)
197
198
# Minimum structural score (valid multi-byte sequences / lead bytes) required
# to keep a CJK multi-byte candidate. Below this threshold the encoding is
# eliminated as a false positive (e.g. Shift_JIS matching Latin data where
# scattered high bytes look like lead bytes but rarely form valid pairs).
_CJK_MIN_MB_RATIO: float = 0.05
# Minimum number of non-ASCII bytes required for a CJK candidate to survive
# gating. Very short inputs are validated by the other gates (structural
# pair ratio, byte coverage) and by coverage-aware boosting in statistical
# scoring — so we keep this threshold low to let even 1-character CJK
# inputs compete.
_CJK_MIN_NON_ASCII: int = 2
# Minimum ratio of non-ASCII bytes that must participate in valid multi-byte
# sequences for a CJK candidate to survive gating. Genuine CJK text has
# nearly all non-ASCII bytes in valid pairs (coverage >= 0.95); Latin text
# with scattered high bytes has many orphan bytes (coverage often < 0.5).
# The lowest true-positive coverage in the test suite is ~0.39 (a CP932 HTML
# file with many half-width katakana), hence the generous 0.35 floor.
_CJK_MIN_BYTE_COVERAGE: float = 0.35
# Minimum number of distinct lead byte values for a CJK candidate to
# survive gating. Genuine CJK text uses a wide range of lead bytes;
# European false positives cluster in a narrow band. Only applied when
# there are enough non-ASCII bytes to expect diversity (see
# _CJK_DIVERSITY_MIN_NON_ASCII).
_CJK_MIN_LEAD_DIVERSITY: int = 4
# Minimum non-ASCII byte count before applying the lead diversity gate.
# Very small files (e.g. 8 non-ASCII bytes) may genuinely have low
# diversity even for real CJK text (e.g. repeated katakana).
_CJK_DIVERSITY_MIN_NON_ASCII: int = 16
227
228
def _gate_cjk_candidates(
    data: bytes,
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> tuple[EncodingInfo, ...]:
    """Drop CJK multi-byte candidates that lack genuine multi-byte structure.

    Each multi-byte candidate must clear four gates, in order:

    1. Structural pair ratio (valid_pairs / lead_bytes) >=
       ``_CJK_MIN_MB_RATIO`` — rejects files full of orphan lead bytes.
    2. At least ``_CJK_MIN_NON_ASCII`` bytes > 0x7F — tiny files with a
       handful of high bytes can form perfect pairs by accident and score
       1.0 structurally.
    3. Byte coverage (non-ASCII bytes inside valid multi-byte sequences /
       all non-ASCII bytes) >= ``_CJK_MIN_BYTE_COVERAGE`` — Latin text
       leaves many high bytes outside any pair; genuine CJK does not.
    4. Lead byte diversity >= ``_CJK_MIN_LEAD_DIVERSITY`` distinct lead
       values — genuine CJK draws from a wide lead-byte repertoire while
       European false positives cluster narrowly (e.g. 0xC0-0xDF). Skipped
       below ``_CJK_DIVERSITY_MIN_NON_ASCII`` non-ASCII bytes.

    Single-byte candidates pass through untouched. Structural scores and
    coverages are cached in ``ctx`` for reuse in Stage 2b.
    """
    survivors: list[EncodingInfo] = []
    for candidate in valid_candidates:
        if not candidate.is_multibyte:
            survivors.append(candidate)
            continue
        structural = compute_structural_score(data, candidate, ctx)
        ctx.mb_scores[candidate.name] = structural
        if structural < _CJK_MIN_MB_RATIO:
            # No multi-byte structure at all -> eliminate.
            continue
        if ctx.non_ascii_count is None:
            # Lazily count bytes > 0x7F, once per pipeline run.
            ctx.non_ascii_count = len(data) - len(data.translate(None, HIGH_BYTES))
        if ctx.non_ascii_count < _CJK_MIN_NON_ASCII:
            # Too few high bytes to trust any structural score.
            continue
        coverage = compute_multibyte_byte_coverage(
            data, candidate, ctx, non_ascii_count=ctx.non_ascii_count
        )
        ctx.mb_coverage[candidate.name] = coverage
        if coverage < _CJK_MIN_BYTE_COVERAGE:
            # Most high bytes are orphans -> not CJK.
            continue
        if (
            ctx.non_ascii_count >= _CJK_DIVERSITY_MIN_NON_ASCII
            and compute_lead_byte_diversity(data, candidate, ctx)
            < _CJK_MIN_LEAD_DIVERSITY
        ):
            # Too few distinct lead bytes -> not CJK.
            continue
        survivors.append(candidate)
    return tuple(survivors)
281
282
def _score_structural_candidates(
    data: bytes,
    structural_scores: list[tuple[str, float]],
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> list[DetectionResult]:
    """Rank structurally-valid CJK candidates with statistical bigram scoring.

    Statistical scoring separates CJK encodings that tie structurally
    (e.g. euc-jp vs big5 on Japanese data). Single-byte candidates are
    scored alongside so the caller can weigh CJK against single-byte
    confidence.

    Multi-byte candidates whose byte coverage is >= 0.95 receive a
    confidence boost proportional to coverage: when nearly every non-ASCII
    byte sits in a valid multi-byte pair, the structural evidence should
    outrank single-byte bigram models that can score well on small samples.

    Note: boosted confidences may exceed 1.0 and serve only for relative
    ranking; ``run_pipeline`` clamps the published values to [0.0, 1.0].
    """
    multibyte_by_name: dict[str, EncodingInfo] = {
        e.name: e for e in valid_candidates if e.is_multibyte
    }
    ordered_multibyte = tuple(
        multibyte_by_name[name]
        for name, _score in structural_scores
        if name in multibyte_by_name
    )
    single_byte = tuple(e for e in valid_candidates if not e.is_multibyte)

    def boost(result: DetectionResult) -> DetectionResult:
        # Coverage-based boost applies only to high-coverage multi-byte hits.
        coverage = (
            ctx.mb_coverage.get(result.encoding, 0.0) if result.encoding else 0.0
        )
        if coverage < 0.95:
            return result
        return DetectionResult(
            encoding=result.encoding,
            confidence=result.confidence * (1 + coverage),
            language=result.language,
        )

    ranked = [
        boost(r) for r in score_candidates(data, (*ordered_multibyte, *single_byte))
    ]
    ranked.sort(key=lambda res: res.confidence, reverse=True)
    return ranked
331
332
333def _demote_niche_latin(
334 data: bytes,
335 results: list[DetectionResult],
336) -> list[DetectionResult]:
337 """Demote niche Latin encodings when no distinguishing bytes are present.
338
339 Some bigram models (e.g. iso-8859-10, iso-8859-14, windows-1254) can win
340 on data that contains only bytes shared with common Western Latin
341 encodings. When there is no byte-level evidence for the winning
342 encoding, promote the first common Western Latin candidate to the top and
343 push the demoted encoding to last.
344 """
345 if (
346 len(results) > 1
347 and results[0].encoding is not None
348 and _should_demote(results[0].encoding, data)
349 ):
350 demoted_encoding = results[0].encoding
351 for r in results[1:]:
352 if r.encoding in _COMMON_LATIN_ENCODINGS:
353 others = [
354 x for x in results if x.encoding != demoted_encoding and x is not r
355 ]
356 demoted_entries = [x for x in results if x.encoding == demoted_encoding]
357 return [r, *others, *demoted_entries]
358 return results
359
360
361def _promote_koi8t(
362 data: bytes,
363 results: list[DetectionResult],
364) -> list[DetectionResult]:
365 """Promote KOI8-T over KOI8-R when Tajik-specific bytes are present.
366
367 KOI8-T and KOI8-R share the entire 0xC0-0xFF Cyrillic letter block,
368 making statistical discrimination difficult. However, KOI8-T maps 12
369 bytes in 0x80-0xBF to Tajik-specific Cyrillic letters where KOI8-R has
370 box-drawing characters. If any of these bytes appear, KOI8-T is the
371 better match.
372 """
373 if not results or results[0].encoding != "koi8-r":
374 return results
375 # Check if KOI8-T is anywhere in the results
376 koi8t_idx = next((i for i, r in enumerate(results) if r.encoding == "koi8-t"), None)
377 if koi8t_idx is None:
378 return results
379 # Check for Tajik-specific bytes
380 if any(b in _KOI8_T_DISTINGUISHING for b in data if b > 0x7F):
381 koi8t_result = results[koi8t_idx]
382 others = [r for i, r in enumerate(results) if i != koi8t_idx]
383 return [koi8t_result, *others]
384 return results
385
386
387# Maximum bytes of data used for language scoring in _fill_language.
388# Language bigrams converge quickly — 2 KB is sufficient for discrimination
389# across all language models while keeping Tier 3 (language-model scoring) fast.
390_LANG_SCORE_MAX_BYTES = 2048
391
392
393def _to_utf8(data: bytes, encoding: str) -> bytes | None:
394 """Decode data from encoding and re-encode as UTF-8 for language scoring.
395
396 Returns None if the encoding is unknown. For UTF-8, returns data as-is.
397 Uses ``errors="ignore"`` because the data already passed byte-validity
398 filtering for the detected encoding; any residual invalid bytes are
399 irrelevant for language scoring.
400 """
401 if encoding == "utf-8":
402 return data
403 try:
404 return data.decode(encoding, errors="ignore").encode(
405 "utf-8", errors="surrogatepass"
406 )
407 except (LookupError, TypeError):
408 return None
409
410
def _fill_language(
    data: bytes, results: list[DetectionResult]
) -> list[DetectionResult]:
    """Fill in language for results missing it.

    Tier 1: single-language encodings via hardcoded map (instant).
    Tier 2: multi-language encodings via statistical bigram scoring (lazy).
    Tier 3: decode to UTF-8, score against UTF-8 language models (universal fallback).

    :param data: Raw bytes (already truncated by the caller for speed).
    :param results: Detection results, some possibly lacking a language.
    :returns: A new list with languages filled in where one could be inferred.
    """
    filled: list[DetectionResult] = []
    profile: BigramProfile | None = None
    # Cached Tier-3 profile of *data* itself. Valid only for results whose
    # encoding is "utf-8" (where _to_utf8 returns data unchanged).
    utf8_profile: BigramProfile | None = None
    for result in results:
        if result.language is None and result.encoding is not None:
            # Tier 1: single-language encoding
            lang = infer_language(result.encoding)
            # Tier 2: statistical scoring for multi-language encodings
            if lang is None and data and has_model_variants(result.encoding):
                if profile is None:
                    profile = BigramProfile(data)
                _, lang = score_best_language(data, result.encoding, profile=profile)
            # Tier 3: decode to UTF-8, score against UTF-8 language models
            if lang is None and data and has_model_variants("utf-8"):
                utf8_data = _to_utf8(data, result.encoding)
                if utf8_data:
                    if result.encoding == "utf-8":
                        # utf8_data is *data* unchanged, so this profile can
                        # be cached and reused across utf-8 results. Fix: the
                        # cache was previously also written by the non-utf-8
                        # branch, so a later utf-8 result could reuse a
                        # profile built from another encoding's transcoded
                        # bytes. The cache is now written only here.
                        if utf8_profile is None:
                            utf8_profile = BigramProfile(utf8_data)
                        tier3_profile = utf8_profile
                    else:
                        # Transcoded bytes differ per encoding -> always
                        # build a fresh profile for non-utf-8 results.
                        tier3_profile = BigramProfile(utf8_data)
                    _, lang = score_best_language(
                        utf8_data, "utf-8", profile=tier3_profile
                    )
            if lang is not None:
                filled.append(
                    DetectionResult(
                        encoding=result.encoding,
                        confidence=result.confidence,
                        language=lang,
                    )
                )
                continue
        filled.append(result)
    return filled
452
453
def _postprocess_results(
    data: bytes,
    results: list[DetectionResult],
) -> list[DetectionResult]:
    """Run the post-scoring adjustments in their fixed order.

    Confusion-group resolution runs first, then niche-Latin demotion,
    then KOI8-T promotion.
    """
    return _promote_koi8t(
        data, _demote_niche_latin(data, resolve_confusion_groups(data, results))
    )
462
463
def _run_pipeline_core(
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
) -> list[DetectionResult]:
    """Core pipeline logic. Returns list of results sorted by confidence."""
    ctx = PipelineContext()
    sample = data[:max_bytes]

    if not sample:
        return [_EMPTY_RESULT]

    # Stage 1a: BOM detection. Runs first: a BOM is definitive, and
    # BOM-carrying UTF-16/32 data would otherwise look binary (null bytes).
    bom = detect_bom(sample)
    if bom is not None:
        return [bom]

    # Stage 1a+: BOM-less UTF-16/32 via null-byte patterns. Must precede
    # binary detection because these encodings contain many null bytes that
    # would trip the binary check.
    utf1632 = detect_utf1632_patterns(sample)
    if utf1632 is not None:
        return [utf1632]

    # Escape-sequence encodings (ISO-2022, HZ-GB-2312, UTF-7). Must precede
    # binary detection (ESC is a control byte) and ASCII detection
    # (HZ-GB-2312 uses only printable ASCII plus tildes). The hit is gated
    # on encoding_era so deprecated encodings like UTF-7 (disabled by
    # browsers since ~2020 as an XSS vector) only surface when the caller's
    # era filter includes them.
    escape = detect_escape_encoding(sample)
    if escape is not None and escape.encoding is not None:
        escape_info = REGISTRY.get(escape.encoding)
        if escape_info is None or encoding_era & escape_info.era:
            return [escape]

    # Pre-compute UTF-8 so valid multi-byte UTF-8 containing control bytes
    # (e.g. ESC for ANSI codes) is not misclassified as binary. The result
    # is returned later, at its normal position after markup, so explicit
    # charset declarations still take precedence.
    utf8_precheck = detect_utf8(sample)

    # Stage 0: Binary detection (skipped for valid multi-byte UTF-8).
    if utf8_precheck is None and is_binary(sample, max_bytes=max_bytes):
        return [_BINARY_RESULT]

    # Stage 1b: Markup charset extraction, before ASCII/UTF-8 so explicit
    # declarations like <?xml encoding="iso-8859-1"?> are honoured even
    # when the bytes happen to be pure ASCII or valid UTF-8.
    markup = detect_markup_charset(sample)
    if markup is not None:
        return [markup]

    # Stage 1c: ASCII
    ascii_hit = detect_ascii(sample)
    if ascii_hit is not None:
        return [ascii_hit]

    # Stage 1d: UTF-8 structural validation (pre-computed above).
    if utf8_precheck is not None:
        return [utf8_precheck]

    # Stage 2a: Byte validity filtering.
    survivors = filter_by_validity(sample, get_candidates(encoding_era))
    if not survivors:
        return [_FALLBACK_RESULT]

    # Gate out CJK multi-byte candidates lacking genuine multi-byte
    # structure; structural scores are cached in ctx for Stage 2b.
    survivors = _gate_cjk_candidates(sample, survivors, ctx)
    if not survivors:
        return [_FALLBACK_RESULT]

    # Stage 2b: Structural probing for multi-byte encodings, reusing the
    # scores already computed by the CJK gate.
    structural_scores: list[tuple[str, float]] = []
    for enc in survivors:
        if not enc.is_multibyte:
            continue
        score = ctx.mb_scores.get(enc.name)
        if score is None:  # pragma: no cover - gate always populates cache
            score = compute_structural_score(sample, enc, ctx)
        if score > 0.0:
            structural_scores.append((enc.name, score))

    # A very high structural score triggers combined ranking of all
    # candidates (CJK + single-byte) via statistical scoring.
    if structural_scores:
        structural_scores.sort(key=lambda pair: pair[1], reverse=True)
        if structural_scores[0][1] >= _STRUCTURAL_CONFIDENCE_THRESHOLD:
            ranked = _score_structural_candidates(
                sample, structural_scores, survivors, ctx
            )
            return _postprocess_results(sample, ranked)

    # Stage 3: Statistical scoring for all remaining candidates.
    ranked = list(score_candidates(sample, tuple(survivors)))
    if not ranked:
        return [_FALLBACK_RESULT]

    return _postprocess_results(sample, ranked)
570
571
def run_pipeline(
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
) -> list[DetectionResult]:
    """Run the full detection pipeline.

    :param data: The raw byte data to analyze.
    :param encoding_era: Filter candidates to a specific era of encodings.
    :param max_bytes: Maximum number of bytes to process.
    :returns: A list of :class:`DetectionResult` sorted by confidence
        descending, with every confidence clamped to [0.0, 1.0].
    :raises RuntimeError: If the pipeline produced no results (internal
        invariant violation; should be unreachable).
    """
    results = _run_pipeline_core(data, encoding_era, max_bytes)
    # Language scoring uses only the first 2 KB — bigrams converge quickly
    # and this keeps Tier 3 (language-model scoring) fast even on large inputs.
    results = _fill_language(data[:_LANG_SCORE_MAX_BYTES], results)
    if not results:  # pragma: no cover
        msg = "pipeline must always return at least one result"
        raise RuntimeError(msg)
    # Clamp confidence to [0.0, 1.0] at the public API boundary. Internal
    # stages may boost confidence above 1.0 for ranking purposes (e.g.
    # CJK byte-coverage boost), but callers expect a probability-like
    # value. Fix: enforce the lower bound too — previously only values
    # above 1.0 were capped, contradicting the documented range.
    return [
        DetectionResult(
            r.encoding, min(max(r.confidence, 0.0), 1.0), r.language
        )
        if not 0.0 <= r.confidence <= 1.0
        else r
        for r in results
    ]