Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/chardet/pipeline/orchestrator.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

216 statements  

1"""Pipeline orchestrator — runs all detection stages in sequence. 

2 

3Note: ``from __future__ import annotations`` is intentionally omitted because 

4this module is compiled with mypyc, which does not support PEP 563 string 

5annotations. 

6""" 

7 

8import warnings 

9 

10from chardet._utils import DEFAULT_MAX_BYTES 

11from chardet.enums import EncodingEra 

12from chardet.models import ( 

13 BigramProfile, 

14 has_model_variants, 

15 infer_language, 

16 score_best_language, 

17) 

18from chardet.pipeline import ( 

19 _NONE_RESULT, 

20 DETERMINISTIC_CONFIDENCE, 

21 HIGH_BYTES, 

22 DetectionResult, 

23 PipelineContext, 

24) 

25from chardet.pipeline.ascii import detect_ascii 

26from chardet.pipeline.binary import is_binary 

27from chardet.pipeline.bom import detect_bom 

28from chardet.pipeline.confusion import resolve_confusion_groups 

29from chardet.pipeline.escape import detect_escape_encoding 

30from chardet.pipeline.magic import detect_magic 

31from chardet.pipeline.markup import detect_markup_charset 

32from chardet.pipeline.statistical import score_candidates 

33from chardet.pipeline.structural import ( 

34 compute_lead_byte_diversity, 

35 compute_multibyte_byte_coverage, 

36 compute_structural_score, 

37) 

38from chardet.pipeline.utf8 import detect_utf8 

39from chardet.pipeline.utf1632 import detect_utf1632_patterns 

40from chardet.pipeline.validity import filter_by_validity 

41from chardet.registry import REGISTRY, EncodingInfo, get_candidates 

42 

# Deterministic result returned for inputs classified as binary data:
# no text encoding, octet-stream MIME type.
_BINARY_RESULT = DetectionResult(
    encoding=None,
    confidence=DETERMINISTIC_CONFIDENCE,
    language=None,
    mime_type="application/octet-stream",
)
# Threshold at which a CJK structural score is confident enough to trigger
# combined structural+statistical ranking rather than purely statistical.
_STRUCTURAL_CONFIDENCE_THRESHOLD = 0.85

# Maximum bytes used for statistical bigram scoring. Bigram models
# converge quickly — 16 KB is sufficient for discrimination across all
# language models (single-byte and multi-byte alike) while avoiding
# unnecessary work on large files. Experimentally verified: 0 real
# accuracy losses across 835 test files at this threshold.
_STAT_SCORE_MAX_BYTES = 16384

59 

# Common Western Latin encodings that share the iso-8859-1 character
# repertoire for the byte values where iso-8859-10 is indistinguishable.
# Used as swap targets when demoting iso-8859-10 — we prefer these over
# iso-8859-10, but do not want to accidentally promote an unrelated encoding
# (e.g. windows-1254).
# NOTE: spellings follow the registry's canonical names ("iso8859-1",
# "cp1252"), matching the `encoding` field of DetectionResult.
_COMMON_LATIN_ENCODINGS: frozenset[str] = frozenset(
    {
        "iso8859-1",
        "iso8859-15",
        "cp1252",
    }
)

72 

# Bytes where iso-8859-10 decodes to a different character than iso-8859-1.
# All values are >= 0xA1, so ASCII bytes can never distinguish the two.
# Computed programmatically via:
# {b for b in range(0x80, 0x100)
#  if bytes([b]).decode('iso-8859-10') != bytes([b]).decode('iso-8859-1')}
_ISO_8859_10_DISTINGUISHING: frozenset[int] = frozenset(
    {
        0xA1, 0xA2, 0xA3, 0xA4, 0xA5, 0xA6, 0xA8, 0xA9,
        0xAA, 0xAB, 0xAC, 0xAE, 0xAF, 0xB1, 0xB2, 0xB3,
        0xB4, 0xB5, 0xB6, 0xB8, 0xB9, 0xBA, 0xBB, 0xBC,
        0xBD, 0xBE, 0xBF, 0xC0, 0xC7, 0xC8, 0xCA, 0xCC,
        0xD1, 0xD2, 0xD7, 0xD9, 0xE0, 0xE7, 0xE8, 0xEA,
        0xEC, 0xF1, 0xF2, 0xF7, 0xF9, 0xFF,
    }
)

127 

# Bytes where iso-8859-14 decodes to a different character than iso-8859-1.
# All values are >= 0xA1, so ASCII bytes can never distinguish the two.
# Computed programmatically via:
# {b for b in range(0x80, 0x100)
#  if bytes([b]).decode('iso-8859-14') != bytes([b]).decode('iso-8859-1')}
_ISO_8859_14_DISTINGUISHING: frozenset[int] = frozenset(
    {
        0xA1, 0xA2, 0xA4, 0xA5, 0xA6, 0xA8, 0xAA, 0xAB,
        0xAC, 0xAF, 0xB0, 0xB1, 0xB2, 0xB3, 0xB4, 0xB5,
        0xB7, 0xB8, 0xB9, 0xBA, 0xBB, 0xBC, 0xBD, 0xBE,
        0xBF, 0xD0, 0xD7, 0xDE, 0xF0, 0xF7, 0xFE,
    }
)

167 

# Bytes where windows-1254 has Turkish-specific characters that differ from
# windows-1252. Windows-1254 differs from windows-1252 at 8 byte positions.
# Two (0x8E, 0x9E) are undefined in Windows-1254 but defined in Windows-1252;
# these are excluded here because undefined bytes are not useful for
# identifying Turkish text. The remaining six positions map to
# Turkish-specific letters and are the primary distinguishing signal.
_WINDOWS_1254_DISTINGUISHING: frozenset[int] = frozenset(
    {0xD0, 0xDD, 0xDE, 0xF0, 0xFD, 0xFE}
)

177 

# The following sets feed _DEMOTION_CANDIDATES (defined below): encodings
# that are often false positives when their distinguishing bytes are
# absent. Keyed by encoding name -> frozenset of byte values where
# that encoding differs from iso-8859-1 (or windows-1252 in the case of
# windows-1254).
#
# Bytes where HP-Roman8 maps to lowercase accented letters but ISO-8859-1
# maps to uppercase letters. Real HP-Roman8 text (from HP-UX terminals)
# contains these bytes; data misdetected as HP-Roman8 typically does not.
# Computed programmatically via:
# {b for b in range(0x80, 0x100)
#  if (unicodedata.category(bytes([b]).decode('hp-roman8')) == 'Ll'
#      and unicodedata.category(bytes([b]).decode('iso-8859-1')) == 'Lu')}
_HP_ROMAN8_DISTINGUISHING: frozenset[int] = frozenset(
    {
        0xC0, 0xC1, 0xC2, 0xC3, 0xC4, 0xC5, 0xC6, 0xC7,
        0xC8, 0xC9, 0xCA, 0xCB, 0xCC, 0xCD, 0xCE, 0xCF,
        0xD1, 0xD4, 0xD5, 0xD6, 0xD9, 0xDD, 0xDE,
    }
)

215 

# Demotion table consumed by _should_demote: encoding name -> byte values
# that distinguish it from the common Western Latin repertoire. When none
# of an encoding's distinguishing bytes appear in the data, the encoding is
# demoted in favor of a common Western Latin candidate.
_DEMOTION_CANDIDATES: dict[str, frozenset[int]] = {
    "iso8859-10": _ISO_8859_10_DISTINGUISHING,
    "iso8859-14": _ISO_8859_14_DISTINGUISHING,
    "cp1254": _WINDOWS_1254_DISTINGUISHING,
    "hp-roman8": _HP_ROMAN8_DISTINGUISHING,
}

222 

# Bytes where KOI8-T maps to Tajik-specific Cyrillic letters but KOI8-R
# maps to box-drawing characters. Presence of any of these bytes is strong
# evidence for KOI8-T over KOI8-R (see _promote_koi8t below).
_KOI8_T_DISTINGUISHING: frozenset[int] = frozenset(
    {0x80, 0x81, 0x83, 0x8A, 0x8C, 0x8D, 0x8E, 0x90, 0xA1, 0xA2, 0xA5, 0xB5}
)

229 

230 

# Markup charset declarations that commonly refer to a Windows superset
# encoding rather than the strict standard encoding. Japanese web content
# almost universally declares "Shift_JIS" but actually uses CP932 extensions;
# similarly, Korean web content declares "EUC-KR" but uses CP949/UHC.
# When the declared encoding resolves to the base (left), we check whether
# the superset (right) is a better structural match.
# Keys/values use the registry's canonical encoding names.
_MARKUP_SUPERSET_PROMOTIONS: dict[str, str] = {
    "shift_jis_2004": "cp932",
    "euc_kr": "cp949",
}

241 

242 

def _try_promote_markup_superset(
    data: bytes,
    markup_result: DetectionResult,
    allowed: frozenset[str],
) -> DetectionResult:
    """Swap a markup-declared base encoding for its Windows superset when warranted.

    The superset must be an allowed candidate, must strictly decode the
    data, and must achieve a strictly higher structural score than the
    declared base. In every other case *markup_result* is returned
    unchanged.
    """
    declared = markup_result.encoding
    if declared is None:
        return markup_result
    candidate = _MARKUP_SUPERSET_PROMOTIONS.get(declared)
    if candidate is None:
        return markup_result
    if candidate not in allowed:
        return markup_result
    candidate_info = REGISTRY[candidate]
    # The superset must be able to decode the data without errors.
    try:
        data.decode(candidate, errors="strict")
    except (UnicodeDecodeError, LookupError):
        return markup_result
    # Promote only when the superset is a strictly better structural match.
    scratch = PipelineContext()
    declared_score = compute_structural_score(data, REGISTRY[declared], scratch)
    candidate_score = compute_structural_score(data, candidate_info, scratch)
    if candidate_score <= declared_score:
        return markup_result
    return DetectionResult(
        candidate,
        markup_result.confidence,
        markup_result.language,
        markup_result.mime_type,
    )

277 

278 

def _make_fallback_or_none(
    encoding: str,
    allowed: frozenset[str],
    param_name: str,
) -> list[DetectionResult]:
    """Return a low-confidence result for *encoding*, or ``encoding=None`` if filtered out.

    ``stacklevel=5`` targets the public caller:
    detect() -> run_pipeline() -> _run_pipeline_core() -> _make_fallback_or_none().
    """
    if encoding in allowed:
        return [DetectionResult(encoding=encoding, confidence=0.10, language=None)]
    warnings.warn(
        f"{param_name} {encoding!r} is excluded by "
        f"include_encodings/exclude_encodings; returning encoding=None",
        UserWarning,
        stacklevel=5,
    )
    return [_NONE_RESULT]

298 

299 

def _should_demote(encoding: str, data: bytes) -> bool:
    """Return True if encoding is a demotion candidate with no distinguishing bytes.

    Scans the non-ASCII bytes of *data* for any value that decodes
    differently under *encoding* than under iso-8859-1. When no such byte
    exists, the data is equally valid under both repertoires and there is
    no byte-level evidence favoring the candidate encoding.
    """
    distinguishing = _DEMOTION_CANDIDATES.get(encoding)
    if distinguishing is None:
        return False
    for byte in data:
        if byte > 0x7F and byte in distinguishing:
            return False
    return True

312 

313 

# --- Thresholds for _gate_cjk_candidates (empirically tuned) -------------
# Minimum structural score (valid multi-byte sequences / lead bytes) required
# to keep a CJK multi-byte candidate. Below this threshold the encoding is
# eliminated as a false positive (e.g. Shift_JIS matching Latin data where
# scattered high bytes look like lead bytes but rarely form valid pairs).
_CJK_MIN_MB_RATIO = 0.05
# Minimum number of non-ASCII bytes required for a CJK candidate to survive
# gating. Very short inputs are validated by the other gates (structural
# pair ratio, byte coverage) and by coverage-aware boosting in statistical
# scoring — so we keep this threshold low to let even 1-character CJK
# inputs compete.
_CJK_MIN_NON_ASCII = 2
# Minimum ratio of non-ASCII bytes that must participate in valid multi-byte
# sequences for a CJK candidate to survive gating. Genuine CJK text has
# nearly all non-ASCII bytes in valid pairs (coverage >= 0.95); Latin text
# with scattered high bytes has many orphan bytes (coverage often < 0.5).
# The lowest true-positive coverage in the test suite is ~0.39 (a CP932 HTML
# file with many half-width katakana).
_CJK_MIN_BYTE_COVERAGE = 0.35
# Minimum number of distinct lead byte values for a CJK candidate to
# survive gating. Genuine CJK text uses a wide range of lead bytes;
# European false positives cluster in a narrow band. Only applied when
# there are enough non-ASCII bytes to expect diversity (see
# _CJK_DIVERSITY_MIN_NON_ASCII).
_CJK_MIN_LEAD_DIVERSITY = 4
# Minimum non-ASCII byte count before applying the lead diversity gate.
# Very small files (e.g. 8 non-ASCII bytes) may genuinely have low
# diversity even for real CJK text (e.g. repeated katakana).
_CJK_DIVERSITY_MIN_NON_ASCII = 16

342 

343 

def _gate_cjk_candidates(
    data: bytes,
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> tuple[EncodingInfo, ...]:
    """Drop CJK multi-byte candidates whose data shows no real multi-byte structure.

    Single-byte candidates always pass. Each multi-byte candidate must clear
    four gates, in order:

    1. Structural pair ratio (valid_pairs / lead_bytes) >= ``_CJK_MIN_MB_RATIO``
       — weeds out files full of orphan lead bytes.
    2. At least ``_CJK_MIN_NON_ASCII`` bytes above 0x7F — tiny inputs can
       accidentally form perfect pairs and score 1.0 structurally.
    3. Byte coverage (non-ASCII bytes inside valid sequences / all non-ASCII
       bytes) >= ``_CJK_MIN_BYTE_COVERAGE`` — genuine CJK text accounts for
       nearly every high byte, Latin text does not.
    4. Lead-byte diversity >= ``_CJK_MIN_LEAD_DIVERSITY`` (only checked once
       ``_CJK_DIVERSITY_MIN_NON_ASCII`` high bytes exist) — European false
       positives cluster their lead bytes in a narrow band.

    Structural scores and coverage values are cached on *ctx* for reuse in
    Stage 2b. Returns the surviving candidates as a tuple.
    """
    survivors: list[EncodingInfo] = []
    for candidate in valid_candidates:
        if not candidate.is_multibyte:
            survivors.append(candidate)
            continue
        # Gate 1: structural pair ratio (always cached, even when failing).
        structural = compute_structural_score(data, candidate, ctx)
        ctx.mb_scores[candidate.name] = structural
        if structural < _CJK_MIN_MB_RATIO:
            continue
        # Gate 2: enough high bytes to trust the score (computed lazily, once).
        if ctx.non_ascii_count is None:
            ctx.non_ascii_count = len(data) - len(data.translate(None, HIGH_BYTES))
        if ctx.non_ascii_count < _CJK_MIN_NON_ASCII:
            continue
        # Gate 3: fraction of high bytes consumed by valid multi-byte pairs.
        coverage = compute_multibyte_byte_coverage(
            data, candidate, ctx, non_ascii_count=ctx.non_ascii_count
        )
        ctx.mb_coverage[candidate.name] = coverage
        if coverage < _CJK_MIN_BYTE_COVERAGE:
            continue
        # Gate 4: lead-byte diversity, only meaningful on larger samples.
        if ctx.non_ascii_count >= _CJK_DIVERSITY_MIN_NON_ASCII:
            diversity = compute_lead_byte_diversity(data, candidate, ctx)
            if diversity < _CJK_MIN_LEAD_DIVERSITY:
                continue
        survivors.append(candidate)
    return tuple(survivors)

396 

397 

def _score_structural_candidates(
    data: bytes,
    structural_scores: list[tuple[str, float]],
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> list[DetectionResult]:
    """Rank structurally-valid CJK candidates (plus single-byte) via bigrams.

    Statistical scoring separates CJK encodings that tie structurally
    (e.g. euc-jp vs big5). Single-byte candidates are scored alongside so
    the caller can compare CJK vs single-byte confidence directly.

    Multi-byte candidates whose byte coverage (from ``ctx.mb_coverage``) is
    >= 0.95 get their confidence multiplied by ``1 + coverage``: when nearly
    every non-ASCII byte forms a valid pair, the structural evidence should
    outrank single-byte bigram models that can win on small samples.

    Boosted confidences may exceed 1.0; they are only used for relative
    ranking here — ``run_pipeline`` clamps to [0.0, 1.0] at the API boundary.
    """
    mb_by_name: dict[str, EncodingInfo] = {
        e.name: e for e in valid_candidates if e.is_multibyte
    }
    # Preserve the structural ranking order for the multi-byte entries.
    ordered_mb = [
        mb_by_name[name] for name, _score in structural_scores if name in mb_by_name
    ]
    single_byte = [e for e in valid_candidates if not e.is_multibyte]
    scored = score_candidates(
        data[:_STAT_SCORE_MAX_BYTES], tuple(ordered_mb + single_byte)
    )

    # Apply the coverage boost to high-coverage multi-byte candidates.
    ranked: list[DetectionResult] = []
    for res in scored:
        cov = 0.0
        if res.encoding is not None:
            cov = ctx.mb_coverage.get(res.encoding, 0.0)
        if cov >= 0.95:
            ranked.append(
                DetectionResult(
                    res.encoding,
                    res.confidence * (1 + cov),
                    res.language,
                    res.mime_type,
                )
            )
        else:
            ranked.append(res)
    ranked.sort(key=lambda item: item.confidence, reverse=True)
    return ranked

446 

447 

def _demote_niche_latin(
    data: bytes,
    results: list[DetectionResult],
) -> list[DetectionResult]:
    """Demote a niche Latin winner when the data carries no evidence for it.

    Bigram models for iso-8859-10, iso-8859-14, windows-1254, etc. can win
    on data whose bytes are all shared with the common Western Latin
    repertoire. When the top result is such an encoding and none of its
    distinguishing bytes appear, the first common Western Latin candidate
    is moved to the top (inheriting the old top confidence) and every entry
    for the demoted encoding is pushed to the end.
    """
    if len(results) < 2:
        return results
    top = results[0]
    if top.encoding is None or not _should_demote(top.encoding, data):
        return results
    loser = top.encoding
    # First common Western Latin candidate below the top, if any.
    replacement = next(
        (r for r in results[1:] if r.encoding in _COMMON_LATIN_ENCODINGS), None
    )
    if replacement is None:
        return results
    promoted = DetectionResult(
        replacement.encoding,
        top.confidence,
        replacement.language,
        replacement.mime_type,
    )
    middle = [r for r in results if r.encoding != loser and r is not replacement]
    tail = [r for r in results if r.encoding == loser]
    return [promoted, *middle, *tail]

478 

479 

def _promote_koi8t(
    data: bytes,
    results: list[DetectionResult],
) -> list[DetectionResult]:
    """Prefer KOI8-T over a winning KOI8-R when Tajik-specific bytes appear.

    The two encodings share the whole 0xC0-0xFF Cyrillic block, so bigram
    scoring cannot separate them. Twelve bytes in 0x80-0xBF are Tajik
    Cyrillic letters under KOI8-T but box-drawing characters under KOI8-R;
    any occurrence of one of them makes KOI8-T the better match, and it is
    promoted to the top with the current top confidence.
    """
    if not results:
        return results
    if results[0].encoding != "koi8-r":
        return results
    # Locate a KOI8-T entry anywhere in the list.
    koi8t_idx = None
    for i, res in enumerate(results):
        if res.encoding == "koi8-t":
            koi8t_idx = i
            break
    if koi8t_idx is None:
        return results
    # Look for Tajik-specific evidence among the high bytes.
    found_tajik = False
    for byte in data:
        if byte > 0x7F and byte in _KOI8_T_DISTINGUISHING:
            found_tajik = True
            break
    if not found_tajik:
        return results
    tajik = results[koi8t_idx]
    promoted = DetectionResult(
        tajik.encoding,
        results[0].confidence,
        tajik.language,
        tajik.mime_type,
    )
    remainder = results[:koi8t_idx] + results[koi8t_idx + 1 :]
    return [promoted, *remainder]

511 

512 

# Maximum bytes of data used for language scoring in _fill_metadata.
# (Comment previously referred to a "_fill_language" function that does not
# exist in this module.)
# Language bigrams converge quickly — 2 KB is sufficient for discrimination
# across all language models while keeping Tier 3 (language-model scoring) fast.
_LANG_SCORE_MAX_BYTES = 2048

517 

518 

519def _to_utf8(data: bytes, encoding: str) -> bytes | None: 

520 """Decode data from encoding and re-encode as UTF-8 for language scoring. 

521 

522 Returns None if the encoding is unknown. For UTF-8, returns data as-is. 

523 Uses ``errors="ignore"`` because the data already passed byte-validity 

524 filtering for the detected encoding; any residual invalid bytes are 

525 irrelevant for language scoring. 

526 """ 

527 if encoding == "utf-8": 

528 return data 

529 try: 

530 return data.decode(encoding, errors="ignore").encode( 

531 "utf-8", errors="surrogatepass" 

532 ) 

533 except (LookupError, TypeError): 

534 return None 

535 

536 

def _fill_metadata(
    data: bytes, results: list[DetectionResult]
) -> list[DetectionResult]:
    """Fill in language and mime_type for results missing them.

    **Language** (only for text results where ``encoding is not None``):

    Tier 1: single-language encodings via hardcoded map (instant).
    Tier 2: multi-language encodings via statistical bigram scoring (lazy).
    Tier 3: decode to UTF-8, score against UTF-8 language models (universal fallback).

    **MIME type**: text results default to ``"text/plain"``, binary results
    (``encoding is None``) default to ``"application/octet-stream"``.
    """
    filled: list[DetectionResult] = []
    # Tier 2 cache: profile over the raw bytes, shared by all results.
    profile: BigramProfile | None = None
    # Tier 3 cache: profile over the UTF-8 transcoding of *data* under one
    # specific source encoding. The transcoded bytes (and hence the bigrams)
    # differ per source encoding, so the cache is keyed by that encoding.
    # BUG FIX: the previous condition (`utf8_profile is None or
    # result.encoding != "utf-8"`) reused a profile built from a *different*
    # encoding's transcoding whenever a later result was utf-8.
    utf8_profile: BigramProfile | None = None
    utf8_profile_source: str | None = None
    for result in results:
        lang = result.language
        if lang is None and result.encoding is not None:
            # Tier 1: single-language encoding
            lang = infer_language(result.encoding)
            # Tier 2: statistical scoring for multi-language encodings
            if lang is None and data and has_model_variants(result.encoding):
                if profile is None:
                    profile = BigramProfile(data)
                _, lang = score_best_language(data, result.encoding, profile=profile)
            # Tier 3: decode to UTF-8, score against UTF-8 language models
            if lang is None and data and has_model_variants("utf-8"):
                utf8_data = _to_utf8(data, result.encoding)
                if utf8_data:
                    # Rebuild whenever the source encoding changes; reuse
                    # only when two results share the same encoding.
                    if utf8_profile is None or utf8_profile_source != result.encoding:
                        utf8_profile = BigramProfile(utf8_data)
                        utf8_profile_source = result.encoding
                    _, lang = score_best_language(
                        utf8_data, "utf-8", profile=utf8_profile
                    )

        mime = result.mime_type
        if mime is None:
            # Text results are plain text; encoding=None means binary.
            mime = (
                "text/plain"
                if result.encoding is not None
                else "application/octet-stream"
            )

        # Allocate a new result only when something actually changed.
        if lang != result.language or mime != result.mime_type:
            filled.append(
                DetectionResult(result.encoding, result.confidence, lang, mime)
            )
        else:
            filled.append(result)
    return filled

589 

590 

def _postprocess_results(
    data: bytes,
    results: list[DetectionResult],
) -> list[DetectionResult]:
    """Run the ordered post-processing passes over scored results.

    Confusion-group resolution runs first, then niche-Latin demotion,
    then KOI8-T promotion.
    """
    resolved = resolve_confusion_groups(data, results)
    demoted = _demote_niche_latin(data, resolved)
    return _promote_koi8t(data, demoted)

599 

600 

def _run_pipeline_core(  # noqa: PLR0913
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
    *,
    include_encodings: frozenset[str] | None = None,
    exclude_encodings: frozenset[str] | None = None,
    no_match_encoding: str = "cp1252",
    empty_input_encoding: str = "utf-8",
) -> list[DetectionResult]:
    """Core pipeline logic. Returns list of results sorted by confidence.

    Deterministic stages (BOM, UTF-16/32 patterns, escape sequences, magic
    numbers, markup declarations, ASCII/UTF-8 validation) run before the
    heuristic stages (validity filtering, CJK gating, structural probing,
    statistical scoring). Each stage's ordering constraint is explained at
    its call site below; reordering stages changes detection behavior.

    :param data: Raw bytes to analyze (truncated to *max_bytes*).
    :param encoding_era: Era filter for the candidate encoding set.
    :param max_bytes: Maximum number of bytes to inspect.
    :param include_encodings: If not ``None``, restrict candidates to these.
    :param exclude_encodings: If not ``None``, remove these candidates.
    :param no_match_encoding: Fallback encoding when no candidate survives.
    :param empty_input_encoding: Encoding reported for empty input.
    :returns: Non-empty list of results, best first.
    """
    ctx = PipelineContext()
    data = data[:max_bytes]

    # Build candidate set once — used for both early-exit gating and
    # statistical scoring. The set incorporates encoding_era, include, and
    # exclude filters so all pipeline stages are gated consistently.
    candidates = get_candidates(encoding_era, include_encodings, exclude_encodings)
    allowed: frozenset[str] = frozenset(enc.name for enc in candidates)

    if not data:
        return _make_fallback_or_none(
            empty_input_encoding, allowed, "empty_input_encoding"
        )

    # Stage 1a: BOM detection (runs first — BOMs are definitive and
    # UTF-16/32 data looks binary due to null bytes)
    bom_result = detect_bom(data)
    if bom_result is not None and bom_result.encoding in allowed:
        return [bom_result]

    # Stage 1a+: UTF-16/32 null-byte pattern detection (for files without
    # BOMs — must run before binary detection since these encodings contain
    # many null bytes that would trigger the binary check)
    utf1632_result = detect_utf1632_patterns(data)
    if utf1632_result is not None and utf1632_result.encoding in allowed:
        return [utf1632_result]

    # Escape-sequence encodings (ISO-2022, HZ-GB-2312, UTF-7): must run
    # before binary detection (ESC is a control byte) and before ASCII
    # detection (HZ-GB-2312 uses only printable ASCII plus tildes).
    escape_result = detect_escape_encoding(data)
    if (
        escape_result is not None
        and escape_result.encoding is not None
        and escape_result.encoding in allowed
    ):
        return [escape_result]

    # Magic number detection for known binary formats — runs before
    # UTF-8/ASCII prechecks to avoid unnecessary analysis on binary data.
    # Not gated by `allowed`: magic results carry no text encoding.
    magic_result = detect_magic(data)
    if magic_result is not None:
        return [magic_result]

    # Pre-check UTF-8 to prevent false binary classification. Valid UTF-8
    # with multi-byte sequences can contain control bytes (e.g. ESC for ANSI
    # codes) that would otherwise exceed the binary threshold. We compute
    # the result now but return it at the normal pipeline position (after
    # markup) so that explicit charset declarations still take precedence.
    utf8_precheck = detect_utf8(data)

    # Pre-check ASCII to prevent false binary classification. ASCII text
    # with null byte separators (e.g. find -print0 output) would exceed the
    # binary threshold due to the null bytes. Like the UTF-8 precheck, we
    # compute the result now but return it at the normal position (after
    # markup) so explicit charset declarations still take precedence.
    ascii_precheck = detect_ascii(data)

    # Stage 0: Binary detection (skip when data is valid UTF-8 or ASCII)
    # Binary detection (encoding=None) is NOT gated by filters.
    if (
        utf8_precheck is None
        and ascii_precheck is None
        and is_binary(data, max_bytes=max_bytes)
    ):
        return [_BINARY_RESULT]

    # Stage 1b: Markup charset extraction (before ASCII/UTF-8 so explicit
    # declarations like <?xml encoding="iso-8859-1"?> are honoured even
    # when the bytes happen to be pure ASCII or valid UTF-8).
    markup_result = detect_markup_charset(data)
    if markup_result is not None and markup_result.encoding in allowed:
        markup_result = _try_promote_markup_superset(data, markup_result, allowed)
        return [markup_result]

    # Stage 1c: ASCII (use pre-computed result)
    if ascii_precheck is not None and ascii_precheck.encoding in allowed:
        return [ascii_precheck]

    # Stage 1d: UTF-8 structural validation (use pre-computed result)
    if utf8_precheck is not None and utf8_precheck.encoding in allowed:
        return [utf8_precheck]

    # Stage 2a: Byte validity filtering
    valid_candidates = filter_by_validity(data, candidates)

    if not valid_candidates:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Gate: eliminate CJK multi-byte candidates that lack genuine
    # multi-byte structure. Cache structural scores for Stage 2b.
    valid_candidates = _gate_cjk_candidates(data, valid_candidates, ctx)

    if not valid_candidates:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Stage 2b: Structural probing for multi-byte encodings
    # Reuse scores already computed during the CJK gate above.
    structural_scores: list[tuple[str, float]] = []
    for enc in valid_candidates:
        if enc.is_multibyte:
            score = ctx.mb_scores.get(enc.name)
            if score is None:  # pragma: no cover - gate always populates cache
                score = compute_structural_score(data, enc, ctx)
            if score > 0.0:
                structural_scores.append((enc.name, score))

    # If a multi-byte encoding scored very high, score all candidates
    # (CJK + single-byte) statistically.
    if structural_scores:
        structural_scores.sort(key=lambda x: x[1], reverse=True)
        _, best_score = structural_scores[0]
        if best_score >= _STRUCTURAL_CONFIDENCE_THRESHOLD:
            results = _score_structural_candidates(
                data, structural_scores, valid_candidates, ctx
            )
            return _postprocess_results(data, results)

    # Stage 3: Statistical scoring for all remaining candidates.
    # Bigram models converge quickly and don't benefit from scanning
    # beyond 16 KB — cap the data to avoid unnecessary work on large files.
    stat_data = data[:_STAT_SCORE_MAX_BYTES]
    results = list(score_candidates(stat_data, tuple(valid_candidates)))
    if not results:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    return _postprocess_results(data, results)

739 

740 

def run_pipeline(  # noqa: PLR0913
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
    *,
    include_encodings: frozenset[str] | None = None,
    exclude_encodings: frozenset[str] | None = None,
    no_match_encoding: str = "cp1252",
    empty_input_encoding: str = "utf-8",
) -> list[DetectionResult]:
    """Run the full detection pipeline.

    :param data: The raw byte data to analyze.
    :param encoding_era: Filter candidates to a specific era of encodings.
    :param max_bytes: Maximum number of bytes to process.
    :param include_encodings: If not ``None``, only return these encodings.
    :param exclude_encodings: If not ``None``, never return these encodings.
    :param no_match_encoding: Encoding returned when no candidate survives.
    :param empty_input_encoding: Encoding returned for empty input.
    :returns: A list of :class:`DetectionResult` sorted by confidence descending.
    """
    raw_results = _run_pipeline_core(
        data,
        encoding_era,
        max_bytes,
        include_encodings=include_encodings,
        exclude_encodings=exclude_encodings,
        no_match_encoding=no_match_encoding,
        empty_input_encoding=empty_input_encoding,
    )
    # Only the first 2 KB feeds language scoring — bigram models converge
    # quickly, which keeps Tier 3 fast even on large inputs.
    enriched = _fill_metadata(data[:_LANG_SCORE_MAX_BYTES], raw_results)
    if not enriched:  # pragma: no cover
        msg = "pipeline must always return at least one result"
        raise RuntimeError(msg)
    # Internal stages may push confidence above 1.0 for ranking purposes
    # (e.g. the CJK byte-coverage boost); clamp to a probability-like range
    # here at the public API boundary.
    clamped: list[DetectionResult] = []
    for res in enriched:
        if res.confidence > 1.0:
            clamped.append(
                DetectionResult(res.encoding, 1.0, res.language, res.mime_type)
            )
        else:
            clamped.append(res)
    return clamped