Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/chardet/pipeline/orchestrator.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

132 statements  

1"""Pipeline orchestrator — runs all detection stages in sequence. 

2 

3Note: ``from __future__ import annotations`` is intentionally omitted because 

4this module is compiled with mypyc, which does not support PEP 563 string 

5annotations. 

6""" 

7 

8import warnings 

9 

10from chardet._utils import DEFAULT_MAX_BYTES 

11from chardet.enums import EncodingEra 

12from chardet.pipeline import ( 

13 _NONE_RESULT, 

14 DETERMINISTIC_CONFIDENCE, 

15 HIGH_BYTES, 

16 DetectionResult, 

17 PipelineContext, 

18) 

19from chardet.pipeline.ascii import detect_ascii 

20from chardet.pipeline.binary import is_binary 

21from chardet.pipeline.bom import detect_bom 

22from chardet.pipeline.escape import detect_escape_encoding 

23from chardet.pipeline.language import fill_languages 

24from chardet.pipeline.magic import detect_magic 

25from chardet.pipeline.markup import detect_markup_charset, promote_markup_superset 

26from chardet.pipeline.postprocess import postprocess_results 

27from chardet.pipeline.statistical import score_candidates 

28from chardet.pipeline.structural import ( 

29 compute_lead_byte_diversity, 

30 compute_multibyte_byte_coverage, 

31 compute_structural_score, 

32) 

33from chardet.pipeline.utf8 import detect_utf8 

34from chardet.pipeline.utf1632 import detect_utf1632_patterns 

35from chardet.pipeline.validity import filter_by_validity 

36from chardet.registry import EncodingInfo, get_candidates 

37 

# Canned result for input classified as binary: no text encoding, reported
# with deterministic confidence and the generic binary MIME type.
_BINARY_RESULT = DetectionResult(
    None,
    DETERMINISTIC_CONFIDENCE,
    None,
    "application/octet-stream",
)

# A CJK structural score at or above this value is considered confident
# enough to rank candidates with combined structural + statistical scoring
# instead of statistical scoring alone.
_STRUCTURAL_CONFIDENCE_THRESHOLD = 0.85

# Upper bound on the number of bytes fed to statistical bigram scoring.
# Bigram models converge quickly, so 16 KB discriminates between all
# language models (single-byte and multi-byte) without wasting work on
# large files. Experimentally verified: 0 real accuracy losses across
# 835 test files at this threshold.
_STAT_SCORE_MAX_BYTES = 16 * 1024

54 

55 

def _make_fallback_or_none(
    encoding: str,
    allowed: frozenset[str],
    param_name: str,
) -> list[DetectionResult]:
    """Build the single-element fallback result list for *encoding*.

    When *encoding* is a member of *allowed*, a low-confidence (0.10) result
    naming it is returned. Otherwise a ``UserWarning`` mentioning
    *param_name* is emitted and the ``encoding=None`` sentinel result is
    returned instead.

    The warning uses ``stacklevel=5`` so that it is attributed to the public
    caller, given the call chain
    detect() -> run_pipeline() -> _run_pipeline_core() -> _make_fallback_or_none().
    """
    if encoding in allowed:
        return [DetectionResult(encoding=encoding, confidence=0.10, language=None)]

    message = (
        f"{param_name} {encoding!r} is excluded by "
        f"include_encodings/exclude_encodings; returning encoding=None"
    )
    # warn() must be called from this frame for stacklevel=5 to stay correct.
    warnings.warn(message, UserWarning, stacklevel=5)
    return [_NONE_RESULT]

75 

76 

77# Minimum structural score (valid multi-byte sequences / lead bytes) required 

78# to keep a CJK multi-byte candidate. Below this threshold the encoding is 

79# eliminated as a false positive (e.g. Shift_JIS matching Latin data where 

80# scattered high bytes look like lead bytes but rarely form valid pairs). 

81_CJK_MIN_MB_RATIO = 0.05 

82# Minimum number of non-ASCII bytes required for a CJK candidate to survive 

83# gating. Very short inputs are validated by the other gates (structural 

84# pair ratio, byte coverage) and by coverage-aware boosting in statistical 

85# scoring — so we keep this threshold low to let even 1-character CJK 

86# inputs compete. 

87_CJK_MIN_NON_ASCII = 2 

88# Minimum ratio of non-ASCII bytes that must participate in valid multi-byte 

89# sequences for a CJK candidate to survive gating. Genuine CJK text has 

90# nearly all non-ASCII bytes in valid pairs (coverage >= 0.95); Latin text 

91# with scattered high bytes has many orphan bytes (coverage often < 0.5). 

92# The lowest true-positive coverage in the test suite is ~0.39 (a CP932 HTML 

93# file with many half-width katakana). 

94_CJK_MIN_BYTE_COVERAGE = 0.35 

95# Minimum number of distinct lead byte values for a CJK candidate to 

96# survive gating. Genuine CJK text uses a wide range of lead bytes; 

97# European false positives cluster in a narrow band. Only applied when 

98# there are enough non-ASCII bytes to expect diversity (see 

99# _CJK_DIVERSITY_MIN_NON_ASCII). 

100_CJK_MIN_LEAD_DIVERSITY = 4 

101# Minimum non-ASCII byte count before applying the lead diversity gate. 

102# Very small files (e.g. 8 non-ASCII bytes) may genuinely have low 

103# diversity even for real CJK text (e.g. repeated katakana). 

104_CJK_DIVERSITY_MIN_NON_ASCII = 16 

105 

106 

def _gate_cjk_candidates(
    data: bytes,
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> tuple[EncodingInfo, ...]:
    """Drop multi-byte candidates whose byte structure does not look multi-byte.

    Candidates that are not multi-byte always pass through. Each multi-byte
    candidate must clear the following gates, evaluated in this order; the
    first failure eliminates it:

    1. **Structural pair ratio** >= ``_CJK_MIN_MB_RATIO`` (rejects inputs
       with many orphan lead bytes).
    2. **Non-ASCII byte count** of the input >= ``_CJK_MIN_NON_ASCII``
       (too few high bytes to trust the structural score).
    3. **Byte coverage** (non-ASCII bytes inside valid multi-byte sequences
       over all non-ASCII bytes) >= ``_CJK_MIN_BYTE_COVERAGE``.
    4. **Lead byte diversity** >= ``_CJK_MIN_LEAD_DIVERSITY``, checked only
       when the input has at least ``_CJK_DIVERSITY_MIN_NON_ASCII``
       non-ASCII bytes.

    Side effects on *ctx*: the structural score of every multi-byte
    candidate is stored in ``ctx.mb_scores``; the byte coverage of those
    reaching gate 3 is stored in ``ctx.mb_coverage``; ``ctx.non_ascii_count``
    is filled in on first use if it was ``None``.

    :returns: The surviving candidates, in their original relative order.
    """
    survivors: list[EncodingInfo] = []
    for candidate in valid_candidates:
        if not candidate.is_multibyte:
            survivors.append(candidate)
            continue

        # Gate 1: structural pair ratio (always cached, pass or fail).
        pair_ratio = compute_structural_score(data, candidate, ctx)
        ctx.mb_scores[candidate.name] = pair_ratio
        if pair_ratio < _CJK_MIN_MB_RATIO:
            continue

        # Gate 2: enough high bytes. The count is computed once per input:
        # deleting every HIGH_BYTES member and comparing lengths gives the
        # number of bytes that were removed.
        if ctx.non_ascii_count is None:
            ctx.non_ascii_count = len(data) - len(data.translate(None, HIGH_BYTES))
        high_byte_count = ctx.non_ascii_count
        if high_byte_count < _CJK_MIN_NON_ASCII:
            continue

        # Gate 3: most high bytes must belong to valid multi-byte sequences.
        coverage = compute_multibyte_byte_coverage(
            data, candidate, ctx, non_ascii_count=high_byte_count
        )
        ctx.mb_coverage[candidate.name] = coverage
        if coverage < _CJK_MIN_BYTE_COVERAGE:
            continue

        # Gate 4: lead byte diversity, skipped for small inputs.
        if high_byte_count >= _CJK_DIVERSITY_MIN_NON_ASCII:
            distinct_leads = compute_lead_byte_diversity(data, candidate, ctx)
            if distinct_leads < _CJK_MIN_LEAD_DIVERSITY:
                continue

        survivors.append(candidate)
    return tuple(survivors)

159 

160 

def _score_structural_candidates(
    data: bytes,
    structural_scores: list[tuple[str, float]],
    valid_candidates: tuple[EncodingInfo, ...],
    ctx: PipelineContext,
) -> list[DetectionResult]:
    """Rank structurally plausible CJK candidates with statistical scoring.

    Several CJK encodings can score equally well structurally (e.g. euc-jp
    vs big5 for Japanese data), so statistical scoring is used to separate
    them. Single-byte candidates are scored in the same pass so the caller
    can compare CJK and single-byte confidence directly.

    The multi-byte candidates passed to the scorer are those named in
    *structural_scores*, in that list's order, followed by every single-byte
    candidate from *valid_candidates*. Only the first
    ``_STAT_SCORE_MAX_BYTES`` bytes of *data* are scored.

    A result whose encoding has a cached byte coverage in
    ``ctx.mb_coverage`` of at least 0.95 has its confidence multiplied by
    ``1 + coverage``. Boosted values may therefore exceed 1.0; they are
    meant for relative ranking only.

    :returns: Results sorted by confidence, highest first.
    """
    multibyte_by_name: dict[str, EncodingInfo] = {}
    single_byte: list[EncodingInfo] = []
    for candidate in valid_candidates:
        if candidate.is_multibyte:
            multibyte_by_name[candidate.name] = candidate
        else:
            single_byte.append(candidate)

    ranked_multibyte = [
        multibyte_by_name[name]
        for name, _score in structural_scores
        if name in multibyte_by_name
    ]

    sample = data[:_STAT_SCORE_MAX_BYTES]
    scored = list(score_candidates(sample, (*ranked_multibyte, *single_byte)))

    # Reward multi-byte candidates whose high bytes are almost all accounted
    # for by valid multi-byte sequences.
    adjusted: list[DetectionResult] = []
    for result in scored:
        coverage = 0.0
        if result.encoding:
            coverage = ctx.mb_coverage.get(result.encoding, 0.0)
        if coverage >= 0.95:
            result = DetectionResult(  # noqa: PLW2901
                result.encoding,
                result.confidence * (1 + coverage),
                result.language,
                result.mime_type,
            )
        adjusted.append(result)

    return sorted(adjusted, key=lambda item: item.confidence, reverse=True)

209 

210 

def _with_default_mime(result: DetectionResult) -> DetectionResult:
    """Return *result* with a ``mime_type`` guaranteed to be set.

    A result that already carries a MIME type is returned unchanged.
    Otherwise a copy is built with ``text/plain`` when an encoding was
    detected, or ``application/octet-stream`` when ``encoding`` is ``None``.
    """
    if result.mime_type is None:
        if result.encoding is None:
            fallback_mime = "application/octet-stream"
        else:
            fallback_mime = "text/plain"
        return DetectionResult(
            result.encoding, result.confidence, result.language, fallback_mime
        )
    return result

217 

218 

def _run_pipeline_core(  # noqa: PLR0913
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
    *,
    include_encodings: frozenset[str] | None = None,
    exclude_encodings: frozenset[str] | None = None,
    no_match_encoding: str = "cp1252",
    empty_input_encoding: str = "utf-8",
) -> list[DetectionResult]:
    """Run the detection stages in order and return the ranked results.

    Only the first *max_bytes* bytes of *data* are examined. The stages are
    tried in sequence and the first one that produces an answer wins:

    1. Empty input -> fallback to *empty_input_encoding*.
    2. BOM, BOM-less UTF-16/32 patterns, escape-sequence encodings.
    3. Magic numbers of known binary formats.
    4. Binary classification (skipped when the data is valid UTF-8/ASCII).
    5. Markup charset declaration, then ASCII, then UTF-8.
    6. Byte-validity filtering and CJK gating of the remaining candidates.
    7. Combined structural + statistical ranking when a multi-byte encoding
       scores at least ``_STRUCTURAL_CONFIDENCE_THRESHOLD`` structurally,
       otherwise purely statistical ranking.

    Encoding-bearing early results are honoured only if the encoding is in
    the candidate set derived from *encoding_era*, *include_encodings* and
    *exclude_encodings*. Magic-number and binary results are not gated by
    those filters. When no candidate survives, the fallback built from
    *no_match_encoding* is returned.
    """
    ctx = PipelineContext()
    data = data[:max_bytes]

    # One candidate set drives both the early-exit gating and the later
    # scoring stages, so era/include/exclude filters apply consistently.
    candidates = get_candidates(encoding_era, include_encodings, exclude_encodings)
    allowed: frozenset[str] = frozenset(enc.name for enc in candidates)

    if not data:
        return _make_fallback_or_none(
            empty_input_encoding, allowed, "empty_input_encoding"
        )

    # Early definitive detectors, in priority order. All of them must run
    # before binary detection:
    #   * BOM: definitive, and UTF-16/32 data looks binary (null bytes).
    #   * UTF-16/32 null-byte patterns: same reason, for BOM-less files.
    #   * Escape-sequence encodings (ISO-2022, HZ-GB-2312, UTF-7): ESC is a
    #     control byte, and HZ-GB-2312 is printable ASCII plus tildes, so
    #     this must also precede ASCII detection.
    for early_detector in (
        detect_bom,
        detect_utf1632_patterns,
        detect_escape_encoding,
    ):
        early_hit = early_detector(data)
        if (
            early_hit is not None
            and early_hit.encoding is not None
            and early_hit.encoding in allowed
        ):
            return [early_hit]

    # Known binary formats by magic number. Runs before the UTF-8/ASCII
    # prechecks to avoid pointless analysis of binary data.
    magic_hit = detect_magic(data)
    if magic_hit is not None:
        return [magic_hit]

    # Prechecks that guard against false binary classification. Valid UTF-8
    # may contain control bytes (e.g. ESC for ANSI codes) and ASCII may use
    # null separators (e.g. ``find -print0`` output); either would exceed
    # the binary threshold. The results are only *returned* later, after
    # markup detection, so explicit charset declarations keep precedence.
    utf8_hit = detect_utf8(data)
    ascii_hit = detect_ascii(data)
    is_known_text = utf8_hit is not None or ascii_hit is not None

    # Stage 0: binary detection. Not subject to the encoding filters.
    if not is_known_text and is_binary(data, max_bytes=max_bytes):
        return [_BINARY_RESULT]

    # Stage 1b: explicit markup charset (e.g. <?xml encoding="iso-8859-1"?>)
    # wins over ASCII/UTF-8 even if the bytes happen to be valid as either.
    markup_hit = detect_markup_charset(data)
    if markup_hit is not None and markup_hit.encoding in allowed:
        return [promote_markup_superset(data, markup_hit, allowed)]

    # Stage 1c: ASCII (precomputed above).
    if ascii_hit is not None and ascii_hit.encoding in allowed:
        return [ascii_hit]

    # Stage 1d: UTF-8 structural validation (precomputed above).
    if utf8_hit is not None and utf8_hit.encoding in allowed:
        return [utf8_hit]

    # Stage 2a: discard candidates the bytes cannot be valid in.
    survivors = filter_by_validity(data, candidates)
    if not survivors:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Gate out CJK multi-byte candidates without real multi-byte structure.
    # This also caches their structural scores in ctx for Stage 2b.
    survivors = _gate_cjk_candidates(data, survivors, ctx)
    if not survivors:
        return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

    # Stage 2b: collect positive structural scores of multi-byte survivors,
    # reusing the values cached by the gate.
    multibyte_scores: list[tuple[str, float]] = []
    for enc in survivors:
        if not enc.is_multibyte:
            continue
        structural = ctx.mb_scores.get(enc.name)
        if structural is None:  # pragma: no cover - gate always populates cache
            structural = compute_structural_score(data, enc, ctx)
        if structural > 0.0:
            multibyte_scores.append((enc.name, structural))

    # A very high structural score switches to combined ranking, in which
    # CJK and single-byte candidates are all scored statistically.
    if multibyte_scores:
        multibyte_scores.sort(key=lambda pair: pair[1], reverse=True)
        top_structural = multibyte_scores[0][1]
        if top_structural >= _STRUCTURAL_CONFIDENCE_THRESHOLD:
            combined = _score_structural_candidates(
                data, multibyte_scores, survivors, ctx
            )
            if combined:
                return postprocess_results(data, combined)

    # Stage 3: statistical scoring of everything left, on at most
    # _STAT_SCORE_MAX_BYTES bytes (bigram models gain nothing beyond that).
    scored = list(score_candidates(data[:_STAT_SCORE_MAX_BYTES], tuple(survivors)))
    if scored:
        return postprocess_results(data, scored)
    return _make_fallback_or_none(no_match_encoding, allowed, "no_match_encoding")

358 

359 

def run_pipeline(  # noqa: PLR0913
    data: bytes,
    encoding_era: EncodingEra,
    max_bytes: int = DEFAULT_MAX_BYTES,
    *,
    include_encodings: frozenset[str] | None = None,
    exclude_encodings: frozenset[str] | None = None,
    no_match_encoding: str = "cp1252",
    empty_input_encoding: str = "utf-8",
) -> list[DetectionResult]:
    """Run the full detection pipeline and finalize its results.

    The core pipeline output is post-processed in three steps: languages are
    filled in, a default MIME type is assigned to results lacking one, and
    any confidence above 1.0 is capped at 1.0.

    :param data: The raw byte data to analyze.
    :param encoding_era: Filter candidates to a specific era of encodings.
    :param max_bytes: Maximum number of bytes to process.
    :param include_encodings: If not ``None``, only return these encodings.
    :param exclude_encodings: If not ``None``, never return these encodings.
    :param no_match_encoding: Encoding returned when no candidate survives.
    :param empty_input_encoding: Encoding returned for empty input.
    :returns: A list of :class:`DetectionResult` sorted by confidence descending.
    :raises RuntimeError: If the pipeline produced no result at all.
    """
    core_results = _run_pipeline_core(
        data,
        encoding_era,
        max_bytes,
        include_encodings=include_encodings,
        exclude_encodings=exclude_encodings,
        no_match_encoding=no_match_encoding,
        empty_input_encoding=empty_input_encoding,
    )
    with_languages = fill_languages(data, core_results)
    finalized = [_with_default_mime(item) for item in with_languages]
    if not finalized:  # pragma: no cover
        msg = "pipeline must always return at least one result"
        raise RuntimeError(msg)

    # Internal stages may push confidence above 1.0 purely for ranking (e.g.
    # the CJK byte-coverage boost). Cap it at 1.0 at the public boundary,
    # leaving results that are already within range untouched.
    capped: list[DetectionResult] = []
    for item in finalized:
        if item.confidence > 1.0:
            capped.append(
                DetectionResult(
                    item.encoding, min(item.confidence, 1.0), item.language, item.mime_type
                )
            )
        else:
            capped.append(item)
    return capped