######################## BEGIN LICENSE BLOCK ########################
# The Original Code is Mozilla Universal charset detector code.
#
# The Initial Developer of the Original Code is
# Netscape Communications Corporation.
# Portions created by the Initial Developer are Copyright (C) 2001
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
#   Mark Pilgrim - port to Python
#   Shy Shalom - original C code
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see
# <https://www.gnu.org/licenses/>.
######################### END LICENSE BLOCK #########################
"""
Module containing the UniversalDetector detector class, which is the primary
class a user of ``chardet`` should use.

:author: Mark Pilgrim (initial port to Python)
:author: Shy Shalom (original C code)
:author: Dan Blanchard (major refactoring for 3.0)
:author: Ian Cordasco
"""

import codecs
import logging
import re
from typing import Optional, Union

from .charsetgroupprober import CharSetGroupProber
from .charsetprober import CharSetProber
from .enums import EncodingEra, InputState, LanguageFilter, ProbingState
from .escprober import EscCharSetProber
from .mbcsgroupprober import MBCSGroupProber
from .metadata.charsets import get_charset, is_unicode_encoding
from .resultdict import ResultDict
from .sbcsgroupprober import ISO_WIN_MAP, SBCSGroupProber
from .utf1632prober import UTF1632Prober


class UniversalDetector:
    """
    The ``UniversalDetector`` class underlies the ``chardet.detect`` function
    and coordinates all of the different charset probers.

    To get a ``dict`` containing an encoding and its confidence, you can simply
    run:

    .. code::

            u = UniversalDetector()
            u.feed(some_bytes)
            u.close()
            detected = u.result

    """

    MINIMUM_THRESHOLD = 0.20
    HIGH_BYTE_DETECTOR = re.compile(b"[\x80-\xff]")
    ESC_DETECTOR = re.compile(b"(\033|~{)")
    # Threshold for "very close" confidence scores where era preference applies
    VERY_CLOSE_THRESHOLD = 0.005  # 0.5%
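    # An alternative encoding wins on era preference only when its confidence
    # is within VERY_CLOSE_THRESHOLD (0.5%) of the current winner's; see
    # _apply_encoding_heuristics() below.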

    # Map ISO encodings to their Windows equivalents (imported from sbcsgroupprober)
    ISO_WIN_MAP = ISO_WIN_MAP

    # Based on https://encoding.spec.whatwg.org/#names-and-labels
    # Maps legacy encoding names to their modern/superset equivalents.
    # Uses Python's canonical codec names (case-insensitive).
    LEGACY_MAP = {
        "ascii": "Windows-1252",  # ASCII is a subset of Windows-1252
        "euc-kr": "CP949",  # EUC-KR extended by CP949 (aka Windows-949)
        "iso-8859-1": "Windows-1252",  # Latin-1 extended by Windows-1252
        "iso-8859-2": "Windows-1250",  # Central European
        "iso-8859-5": "Windows-1251",  # Cyrillic
        "iso-8859-6": "Windows-1256",  # Arabic
        "iso-8859-7": "Windows-1253",  # Greek
        "iso-8859-8": "Windows-1255",  # Hebrew
        "iso-8859-9": "Windows-1254",  # Turkish
        "iso-8859-11": "CP874",  # Thai, extended by CP874 (aka Windows-874)
        "iso-8859-13": "Windows-1257",  # Baltic
        "tis-620": "CP874",  # Thai, equivalent to Windows-874
    }
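    # For example, a best match of "iso-8859-1" is reported as "Windows-1252"
    # when should_rename_legacy is enabled (the default for
    # EncodingEra.MODERN_WEB); see feed() and close().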

    def __init__(
        self,
        lang_filter: LanguageFilter = LanguageFilter.ALL,
        should_rename_legacy: Optional[bool] = None,
        encoding_era: EncodingEra = EncodingEra.MODERN_WEB,
        max_bytes: int = 200_000,
    ) -> None:
        self._esc_charset_prober: Optional[EscCharSetProber] = None
        self._utf1632_prober: Optional[UTF1632Prober] = None
        self._charset_probers: list[CharSetProber] = []
        self.result: ResultDict = {
            "encoding": None,
            "confidence": 0.0,
            "language": None,
        }
        self.done = False
        self._got_data = False
        self._input_state = InputState.PURE_ASCII
        self._last_char = b""
        self.lang_filter = lang_filter
        self.logger = logging.getLogger(__name__)
        if should_rename_legacy is None:
            should_rename_legacy = encoding_era == EncodingEra.MODERN_WEB
        self.should_rename_legacy = should_rename_legacy
        self.encoding_era = encoding_era
        self._total_bytes_fed = 0
        self.max_bytes = max_bytes
        self.reset()

    @property
    def input_state(self) -> int:
        return self._input_state

    @property
    def has_win_bytes(self) -> bool:
        """Check if Windows-specific bytes were detected by the SBCS prober."""
        for prober in self._charset_probers:
            if isinstance(prober, SBCSGroupProber):
                return prober._has_win_bytes
        return False

    @property
    def charset_probers(self) -> list[CharSetProber]:
        return self._charset_probers

    @property
    def nested_probers(self) -> list[CharSetProber]:
        """Get a flat list of all nested charset probers."""
        nested: list[CharSetProber] = []
        for prober in self._charset_probers:
            if isinstance(prober, CharSetGroupProber):
                nested.extend(getattr(prober, "probers", []))
            else:
                nested.append(prober)
        return nested

    @property
    def active_probers(self) -> list[CharSetProber]:
        """Get a flat list of all nested charset probers that are active
        (truthy and not in the NOT_ME state)."""
        return [prober for prober in self.nested_probers if prober and prober.active]

    def _apply_encoding_heuristics(
        self, charset_name: str, confidence: float, winning_prober: CharSetProber
    ) -> tuple[str, float]:
        """
        Apply heuristic adjustments to the winning encoding based on:
        1. Encoding era preferences (prefer newer/Unicode encodings)
        2. Mac/Windows/ISO byte pattern disambiguation

        Returns: (adjusted_charset_name, adjusted_confidence)
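
        For example, when two candidate encodings score within
        ``VERY_CLOSE_THRESHOLD`` of each other, the one from the newer encoding
        era (or, within the same era, the Unicode one) is preferred.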
        """
        lower_charset_name = charset_name.lower()

        # Compare the winner against the other probers in a single pass.
        # Only consider top-level probers (group probers like SBCS, MBCS, etc.);
        # do NOT look inside group probers - they handle their own disambiguation.
        current_charset = get_charset(lower_charset_name)
        current_era = current_charset.encoding_era.value
        current_is_unicode = is_unicode_encoding(lower_charset_name)

        for prober in self._charset_probers:
            if not prober or not prober.active or prober == winning_prober:
                continue

            alt_charset_name = (prober.charset_name or "").lower()
            if not alt_charset_name:  # Skip probers without a charset name
                continue

            alt_confidence = prober.get_confidence()
            alt_charset = get_charset(alt_charset_name)
            alt_era = alt_charset.encoding_era.value
            alt_is_unicode = is_unicode_encoding(alt_charset_name)

            should_prefer_alt = False
            if alt_era < current_era:
                # Alternative has a better (lower-numbered) era
                should_prefer_alt = True
            elif alt_era == current_era and alt_is_unicode and not current_is_unicode:
                # Both are from the same era, but the alternative is Unicode
                should_prefer_alt = True

            # If the alternative should be preferred and is very close in confidence
            if should_prefer_alt and alt_confidence >= confidence * (
                1 - self.VERY_CLOSE_THRESHOLD
            ):
                charset_name = alt_charset_name
                lower_charset_name = charset_name
                confidence = alt_confidence
                current_era = alt_era
                current_is_unicode = alt_is_unicode
                self.logger.debug(
                    "Era preference: %s (era %s, unicode=%s) preferred over prior winner",
                    alt_charset_name,
                    alt_era,
                    alt_is_unicode,
                )

        # Single-byte encoding heuristics are handled in SBCSGroupProber, so no
        # additional heuristics are needed here at the UniversalDetector level.

        return charset_name, confidence

    def _get_utf8_prober(self) -> Optional[CharSetProber]:
        """
        Get the UTF-8 prober from the charset probers.
        Returns None if not found.
        """
        for prober in self.nested_probers:
            if prober.charset_name and "utf-8" in prober.charset_name.lower():
                return prober
        return None

    def reset(self) -> None:
        """
        Reset the UniversalDetector and all of its probers back to their
        initial states. This is called by ``__init__``, so you only need to
        call this directly in between analyses of different documents.
        """
        self.result = {"encoding": None, "confidence": 0.0, "language": None}
        self.done = False
        self._got_data = False
        self._input_state = InputState.PURE_ASCII
        self._last_char = b""
        self._total_bytes_fed = 0
        if self._esc_charset_prober:
            self._esc_charset_prober.reset()
        if self._utf1632_prober:
            self._utf1632_prober.reset()
        for prober in self._charset_probers:
            prober.reset()

    def feed(self, byte_str: Union[bytes, bytearray]) -> None:
        """
        Takes a chunk of a document and feeds it through all of the relevant
        charset probers.

        After calling ``feed``, you can check the value of the ``done``
        attribute to see if you need to continue feeding the
        ``UniversalDetector`` more data, or if it has made a prediction
        (in the ``result`` attribute).

        .. note::
            You should always call ``close`` when you're done feeding in your
            document if ``done`` is not already ``True``.
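
        A typical incremental-detection loop looks like this (``chunks`` here
        stands in for any iterable of ``bytes``):

        .. code::

                detector = UniversalDetector()
                for chunk in chunks:
                    detector.feed(chunk)
                    if detector.done:
                        break
                detector.close()
                print(detector.result)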
        """
        if self.done:
            return

        if not byte_str:
            return

        if not isinstance(byte_str, bytearray):
            byte_str = bytearray(byte_str)

        # First check for known BOMs, since these are guaranteed to be correct
        if not self._got_data:
            # If the data starts with BOM, we know it is UTF
            if byte_str.startswith(codecs.BOM_UTF8):
                # EF BB BF  UTF-8 with BOM
                self.result = {
                    "encoding": "UTF-8-SIG",
                    "confidence": 1.0,
                    "language": "",
                }
            elif byte_str.startswith((codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE)):
                # FF FE 00 00  UTF-32, little-endian BOM
                # 00 00 FE FF  UTF-32, big-endian BOM
                self.result = {"encoding": "UTF-32", "confidence": 1.0, "language": ""}
            elif byte_str.startswith((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)):
                # FF FE  UTF-16, little endian BOM
                # FE FF  UTF-16, big endian BOM
                self.result = {"encoding": "UTF-16", "confidence": 1.0, "language": ""}
            else:
                # Binary file detection - check for excessive null bytes early.
                # But UTF-16/32 contain null bytes too, so check for those
                # patterns first.

                # Check for BOM-less UTF-16/32 patterns (alternating nulls)
                # UTF-32LE: XX 00 00 00 pattern (every 4th byte is null)
                # UTF-32BE: 00 00 00 XX pattern (first 3 of every 4 bytes are null)
                # UTF-16LE: XX 00 pattern (every other byte, at odd offsets, is null)
                # UTF-16BE: 00 XX pattern (every other byte, at even offsets, is null)
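                # e.g. ASCII text such as b"Hi" is b"H\x00i\x00" in UTF-16LE
                # and b"\x00H\x00i" in UTF-16BE.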
                looks_like_utf16_32 = False

                # Use a larger sample for better pattern detection
                sample_size = min(len(byte_str), 200)
                if sample_size >= 50:
                    sample = byte_str[:sample_size]

                    # Count nulls in even and odd positions (for UTF-16 detection)
                    even_nulls = sum(
                        1 for i in range(0, sample_size, 2) if sample[i] == 0
                    )
                    odd_nulls = sum(
                        1 for i in range(1, sample_size, 2) if sample[i] == 0
                    )

                    # Check for UTF-32 patterns (more nulls in groups of 4)
                    # For UTF-32LE: positions 1, 2, 3 of every 4 bytes may be null
                    # For UTF-32BE: positions 0, 1, 2 of every 4 bytes may be null
                    if sample_size >= 100:
                        mod1_nulls = sum(
                            1 for i in range(1, sample_size, 4) if sample[i] == 0
                        )
                        mod2_nulls = sum(
                            1 for i in range(2, sample_size, 4) if sample[i] == 0
                        )
                        mod3_nulls = sum(
                            1 for i in range(3, sample_size, 4) if sample[i] == 0
                        )

                        # Strong UTF-32 signal: a consistent null pattern in at
                        # least 2 of the 3 positions
                        utf32_nulls = [mod1_nulls, mod2_nulls, mod3_nulls]
                        if sum(n > sample_size // 8 for n in utf32_nulls) >= 2:
                            looks_like_utf16_32 = True

                    # UTF-16 detection: significant nulls in even OR odd positions.
                    # The threshold is sample_size // 16, i.e. nulls in more than
                    # ~12% of the even (or odd) positions of a 200-byte sample.
                    utf16_threshold = sample_size // 16
                    if even_nulls > utf16_threshold or odd_nulls > utf16_threshold:
                        looks_like_utf16_32 = True

                if not looks_like_utf16_32:
                    # Sample the first 8KB to detect binary files
                    check_size = min(len(byte_str), 8192)
                    null_count = byte_str[:check_size].count(0)

                    if null_count > check_size * 0.1:  # >10% null bytes
                        # Likely a binary file, not text
                        self.result = {
                            "encoding": None,
                            "confidence": 0.0,
                            "language": "",
                        }
                        self.done = True
                        return

            self._got_data = True
            if self.result["encoding"] is not None:
                self.done = True
                return

        # If none of those matched and we've only seen ASCII so far, check
        # for high bytes and escape sequences
        if self._input_state == InputState.PURE_ASCII:
            if self.HIGH_BYTE_DETECTOR.search(byte_str):
                self._input_state = InputState.HIGH_BYTE
            elif (
                self._input_state == InputState.PURE_ASCII
                and self.ESC_DETECTOR.search(self._last_char + byte_str)
            ):
                self._input_state = InputState.ESC_ASCII

        self._last_char = byte_str[-1:]

        # Track the total number of bytes processed
        self._total_bytes_fed += len(byte_str)

        # Stop probing once enough data has been processed.
        # Don't set done=True here; let close() finalize the result.
        if self._total_bytes_fed > self.max_bytes:
            return

        # Next we will look to see if the data appears to be either a UTF-16 or
        # UTF-32 encoding
        if not self._utf1632_prober:
            self._utf1632_prober = UTF1632Prober()

        if self._utf1632_prober.state == ProbingState.DETECTING:
            if self._utf1632_prober.feed(byte_str) == ProbingState.FOUND_IT:
                self.result = {
                    "encoding": self._utf1632_prober.charset_name,
                    "confidence": self._utf1632_prober.get_confidence(),
                    "language": "",
                }
                self.done = True
                return

        # If we've seen escape sequences, use the EscCharSetProber, which
        # uses a simple state machine to check for known escape sequences in
        # HZ and ISO-2022 encodings, since those are the only encodings that
        # use such sequences.
        if self._input_state == InputState.ESC_ASCII:
            if not self._esc_charset_prober:
                self._esc_charset_prober = EscCharSetProber(self.lang_filter)
            if self._esc_charset_prober.feed(byte_str) == ProbingState.FOUND_IT:
                self.result = {
                    "encoding": self._esc_charset_prober.charset_name,
                    "confidence": self._esc_charset_prober.get_confidence(),
                    "language": self._esc_charset_prober.language,
                }
                self.done = True
        # If we've seen high bytes (i.e., those with values greater than 127),
        # we need to do more complicated checks using all our multi-byte and
        # single-byte probers that are left. The single-byte probers
        # use character bigram distributions to determine the encoding, whereas
        # the multi-byte probers use a combination of character unigram and
        # bigram distributions.
        elif self._input_state == InputState.HIGH_BYTE:
            if not self._charset_probers:
                self._charset_probers = [
                    MBCSGroupProber(
                        lang_filter=self.lang_filter, encoding_era=self.encoding_era
                    )
                ]
                # If we're checking non-CJK encodings, use the single-byte probers too
                if self.lang_filter & LanguageFilter.NON_CJK:
                    self._charset_probers.append(
                        SBCSGroupProber(
                            encoding_era=self.encoding_era, lang_filter=self.lang_filter
                        )
                    )
            for prober in self._charset_probers:
                if prober.feed(byte_str) == ProbingState.FOUND_IT:
                    charset_name = prober.charset_name
                    # Rename legacy encodings if requested
                    if self.should_rename_legacy:
                        charset_name = self.LEGACY_MAP.get(
                            (charset_name or "").lower(), charset_name
                        )
                    self.result = {
                        "encoding": charset_name,
                        "confidence": prober.get_confidence(),
                        "language": prober.language,
                    }
                    self.done = True
                    break

    def close(self) -> ResultDict:
        """
        Stop analyzing the current document and come up with a final
        prediction.

        :returns: The ``result`` attribute, a ``dict`` with the keys
                  `encoding`, `confidence`, and `language`.
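
        For example (``data`` here stands in for any ``bytes`` input):

        .. code::

                detector.feed(data)
                result = detector.close()
                # result is a dict like
                # {"encoding": "utf-8", "confidence": 0.99, "language": ""}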
        """
        # Don't bother with checks if we're already done
        if self.done:
            return self.result
        self.done = True

        if not self._got_data:
            self.logger.debug("no data received!")

        # Default to ASCII if it is all we've seen so far
        elif self._input_state == InputState.PURE_ASCII:
            self.result = {"encoding": "ascii", "confidence": 1.0, "language": ""}

        # Check if escape prober found anything
        elif self._input_state == InputState.ESC_ASCII:
            if self._esc_charset_prober:
                charset_name = self._esc_charset_prober.charset_name
                if charset_name:
                    self.result = {
                        "encoding": charset_name,
                        "confidence": self._esc_charset_prober.get_confidence(),
                        "language": self._esc_charset_prober.language,
                    }
                else:
                    # ESC prober didn't identify a specific encoding.
                    # Since input is pure ASCII + ESC, default to UTF-8.
                    self.result = {
                        "encoding": "utf-8",
                        "confidence": 1.0,
                        "language": "",
                    }

        # If we have seen non-ASCII, return the best that met MINIMUM_THRESHOLD
        elif self._input_state == InputState.HIGH_BYTE:
            prober_confidence = None
            max_prober_confidence = 0.0
            max_prober = None
            for prober in self._charset_probers:
                if not prober:
                    continue
                prober_confidence = prober.get_confidence()
                if prober_confidence > max_prober_confidence:
                    max_prober_confidence = prober_confidence
                    max_prober = prober
            if max_prober and (max_prober_confidence > self.MINIMUM_THRESHOLD):
                charset_name = max_prober.charset_name
                assert charset_name is not None
                lower_charset_name = charset_name.lower()
                confidence = max_prober.get_confidence()

                # Find the actual winning nested prober (max_prober might be a
                # group prober)
                winning_nested_prober = None
                for prober in self.nested_probers:
                    if (
                        prober
                        and prober.active
                        and prober.charset_name
                        and prober.charset_name.lower() == lower_charset_name
                        and abs(prober.get_confidence() - confidence) < 0.0001
                    ):
                        winning_nested_prober = prober
                        break

                # Apply heuristic adjustments in a single pass over active probers
                charset_name, confidence = self._apply_encoding_heuristics(
                    charset_name, confidence, winning_nested_prober or max_prober
                )
                # Rename legacy encodings with superset encodings if asked
                if self.should_rename_legacy:
                    charset_name = self.LEGACY_MAP.get(
                        (charset_name or "").lower(), charset_name
                    )
                self.result = {
                    "encoding": charset_name,
                    "confidence": confidence,
                    "language": max_prober.language,
                }
            else:
                # Default to UTF-8 if no encoding met the threshold AND the
                # UTF-8 prober hasn't determined this is NOT UTF-8.
                # UTF-8 is now the most common encoding on the web and a
                # superset of ASCII.
                utf8_prober = self._get_utf8_prober()
                if utf8_prober and utf8_prober.active:
                    # The UTF-8 prober didn't rule it out, so default to UTF-8
                    self.result = {
                        "encoding": utf8_prober.charset_name,
                        "confidence": utf8_prober.get_confidence(),
                        "language": utf8_prober.language,
                    }
                else:
                    # UTF-8 was ruled out, so return None
                    self.result = {
                        "encoding": None,
                        "confidence": 0.0,
                        "language": None,
                    }

            # Log all prober confidences if none met MINIMUM_THRESHOLD
            if self.logger.getEffectiveLevel() <= logging.DEBUG:
                if self.result["encoding"] is None:
                    self.logger.debug("no probers hit minimum threshold")
                    for prober in self.nested_probers:
                        if not prober:
                            continue
                        self.logger.debug(
                            "%s %s confidence = %s",
                            prober.charset_name,
                            prober.language,
                            prober.get_confidence(),
                        )
        return self.result
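

if __name__ == "__main__":
    # Minimal manual check (illustrative only). Because this module uses
    # relative imports, run it as a module rather than as a script, e.g.:
    #     python -m chardet.universaldetector somefile.txt
    import sys

    for _path in sys.argv[1:]:
        _detector = UniversalDetector()
        with open(_path, "rb") as _fp:
            for _chunk in _fp:
                _detector.feed(_chunk)
                if _detector.done:
                    break
        print(_path, _detector.close())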