1######################## BEGIN LICENSE BLOCK ######################## 
    2# The Original Code is Mozilla Universal charset detector code. 
    3# 
    4# The Initial Developer of the Original Code is 
    5# Netscape Communications Corporation. 
    6# Portions created by the Initial Developer are Copyright (C) 2001 
    7# the Initial Developer. All Rights Reserved. 
    8# 
    9# Contributor(s): 
    10#   Mark Pilgrim - port to Python 
    11#   Shy Shalom - original C code 
    12#   Proofpoint, Inc. 
    13# 
    14# This library is free software; you can redistribute it and/or 
    15# modify it under the terms of the GNU Lesser General Public 
    16# License as published by the Free Software Foundation; either 
    17# version 2.1 of the License, or (at your option) any later version. 
    18# 
    19# This library is distributed in the hope that it will be useful, 
    20# but WITHOUT ANY WARRANTY; without even the implied warranty of 
    21# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
    22# Lesser General Public License for more details. 
    23# 
    24# You should have received a copy of the GNU Lesser General Public 
    25# License along with this library; if not, write to the Free Software 
    26# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 
    27# 02110-1301  USA 
    28######################### END LICENSE BLOCK ######################### 
    29 
    30from typing import Optional, Union 
    31 
    32from .chardistribution import CharDistributionAnalysis 
    33from .charsetprober import CharSetProber 
    34from .codingstatemachine import CodingStateMachine 
    35from .enums import LanguageFilter, MachineState, ProbingState 
    36 
    37 
    38class MultiByteCharSetProber(CharSetProber): 
    39    """ 
    40    MultiByteCharSetProber 
    41    """ 
    42 
    43    def __init__(self, lang_filter: LanguageFilter = LanguageFilter.NONE) -> None: 
    44        super().__init__(lang_filter=lang_filter) 
    45        self.distribution_analyzer: Optional[CharDistributionAnalysis] = None 
    46        self.coding_sm: Optional[CodingStateMachine] = None 
    47        self._last_char = bytearray(b"\0\0") 
    48 
    49    def reset(self) -> None: 
    50        super().reset() 
    51        if self.coding_sm: 
    52            self.coding_sm.reset() 
    53        if self.distribution_analyzer: 
    54            self.distribution_analyzer.reset() 
    55        self._last_char = bytearray(b"\0\0") 
    56 
    57    def feed(self, byte_str: Union[bytes, bytearray]) -> ProbingState: 
    58        assert self.coding_sm is not None 
    59        assert self.distribution_analyzer is not None 
    60 
    61        for i, byte in enumerate(byte_str): 
    62            coding_state = self.coding_sm.next_state(byte) 
    63            if coding_state == MachineState.ERROR: 
    64                self.logger.debug( 
    65                    "%s %s prober hit error at byte %s", 
    66                    self.charset_name, 
    67                    self.language, 
    68                    i, 
    69                ) 
    70                self._state = ProbingState.NOT_ME 
    71                break 
    72            if coding_state == MachineState.ITS_ME: 
    73                self._state = ProbingState.FOUND_IT 
    74                break 
    75            if coding_state == MachineState.START: 
    76                char_len = self.coding_sm.get_current_charlen() 
    77                if i == 0: 
    78                    self._last_char[1] = byte 
    79                    self.distribution_analyzer.feed(self._last_char, char_len) 
    80                else: 
    81                    self.distribution_analyzer.feed(byte_str[i - 1 : i + 1], char_len) 
    82 
    83        self._last_char[0] = byte_str[-1] 
    84 
    85        if self.state == ProbingState.DETECTING: 
    86            if self.distribution_analyzer.got_enough_data() and ( 
    87                self.get_confidence() > self.SHORTCUT_THRESHOLD 
    88            ): 
    89                self._state = ProbingState.FOUND_IT 
    90 
    91        return self.state 
    92 
    93    def get_confidence(self) -> float: 
    94        assert self.distribution_analyzer is not None 
    95        return self.distribution_analyzer.get_confidence()