1######################## BEGIN LICENSE BLOCK ########################
2# The Original Code is Mozilla Universal charset detector code.
3#
4# The Initial Developer of the Original Code is
5# Netscape Communications Corporation.
6# Portions created by the Initial Developer are Copyright (C) 2001
7# the Initial Developer. All Rights Reserved.
8#
9# Contributor(s):
10# Mark Pilgrim - port to Python
11# Shy Shalom - original C code
12# Proofpoint, Inc.
13#
14# This library is free software; you can redistribute it and/or
15# modify it under the terms of the GNU Lesser General Public
16# License as published by the Free Software Foundation; either
17# version 2.1 of the License, or (at your option) any later version.
18#
19# This library is distributed in the hope that it will be useful,
20# but WITHOUT ANY WARRANTY; without even the implied warranty of
21# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22# Lesser General Public License for more details.
23#
24# You should have received a copy of the GNU Lesser General Public
25# License along with this library; if not, see
26# <https://www.gnu.org/licenses/>.
27######################### END LICENSE BLOCK #########################
28
29from typing import Optional, Union
30
31from .chardistribution import CharDistributionAnalysis
32from .charsetprober import CharSetProber
33from .codingstatemachine import CodingStateMachine
34from .enums import EncodingEra, LanguageFilter, MachineState, ProbingState
35
36
class MultiByteCharSetProber(CharSetProber):
    """
    Prober that drives a coding state machine and a character
    distribution analyzer over the bytes it is fed.

    ``coding_sm`` and ``distribution_analyzer`` are initialised to ``None``
    here, while ``feed`` and ``get_confidence`` assert that they are set.
    Presumably concrete subclasses assign both before use -- confirm against
    the subclasses.
    """

    def __init__(
        self,
        lang_filter: LanguageFilter = LanguageFilter.ALL,
        encoding_era: EncodingEra = EncodingEra.ALL,
    ) -> None:
        """
        Initialise the prober with no state machine or analyzer attached.

        Both arguments are forwarded unchanged to the parent initialiser.
        """
        super().__init__(lang_filter=lang_filter, encoding_era=encoding_era)
        self.distribution_analyzer: Optional[CharDistributionAnalysis] = None
        self.coding_sm: Optional[CodingStateMachine] = None
        # Two-byte scratch buffer: slot 0 carries the final byte of the
        # previous chunk, slot 1 receives the first byte of the current one.
        self._last_char = bytearray(b"\0\0")

    def reset(self) -> None:
        """
        Reset the parent state, the attached state machine and analyzer
        (when present), and clear the carried-over byte buffer.
        """
        super().reset()
        if self.coding_sm:
            self.coding_sm.reset()
        if self.distribution_analyzer:
            self.distribution_analyzer.reset()
        self._last_char = bytearray(b"\0\0")

    def feed(self, byte_str: Union[bytes, bytearray]) -> ProbingState:
        """
        Run ``byte_str`` through the state machine and update the probing
        state.

        Each byte is passed to ``coding_sm.next_state``:

        * ``MachineState.ERROR`` sets the state to ``ProbingState.NOT_ME``
          and stops processing the chunk.
        * ``MachineState.ITS_ME`` sets the state to ``ProbingState.FOUND_IT``
          and stops processing the chunk.
        * ``MachineState.START`` hands a two-byte window ending at the
          current byte, together with the state machine's current character
          length, to the distribution analyzer.

        An empty ``byte_str`` is accepted: nothing is fed to the state
        machine and the carried-over byte is left unchanged.

        Returns the value of ``self.state`` after processing.
        """
        assert self.coding_sm is not None
        assert self.distribution_analyzer is not None

        for i, byte in enumerate(byte_str):
            coding_state = self.coding_sm.next_state(byte)
            if coding_state == MachineState.ERROR:
                self.logger.debug(
                    "%s %s prober hit error at byte %s",
                    self.charset_name,
                    self.language,
                    i,
                )
                self._state = ProbingState.NOT_ME
                break
            if coding_state == MachineState.ITS_ME:
                self._state = ProbingState.FOUND_IT
                break
            if coding_state == MachineState.START:
                char_len = self.coding_sm.get_current_charlen()
                if i == 0:
                    # The window straddles the chunk boundary: pair the byte
                    # saved from the previous call with this first byte.
                    self._last_char[1] = byte
                    self.distribution_analyzer.feed(self._last_char, char_len)
                else:
                    self.distribution_analyzer.feed(byte_str[i - 1 : i + 1], char_len)

        # Remember the final byte for the next call. An empty chunk has no
        # final byte; indexing it with [-1] would raise IndexError, so the
        # previously carried byte is kept instead.
        if byte_str:
            self._last_char[0] = byte_str[-1]

        if self.state == ProbingState.DETECTING:
            # Shortcut: stop detecting once the analyzer reports enough data
            # and the confidence exceeds SHORTCUT_THRESHOLD.
            if self.distribution_analyzer.got_enough_data() and (
                self.get_confidence() > self.SHORTCUT_THRESHOLD
            ):
                self._state = ProbingState.FOUND_IT

        return self.state

    def get_confidence(self) -> float:
        """Return the confidence reported by the distribution analyzer."""
        assert self.distribution_analyzer is not None
        return self.distribution_analyzer.get_confidence()