1from __future__ import annotations 
    2 
    3import logging 
    4from os import PathLike 
    5from typing import BinaryIO 
    6 
    7from .cd import ( 
    8    coherence_ratio, 
    9    encoding_languages, 
    10    mb_encoding_languages, 
    11    merge_coherence_ratios, 
    12) 
    13from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE 
    14from .md import mess_ratio 
    15from .models import CharsetMatch, CharsetMatches 
    16from .utils import ( 
    17    any_specified_encoding, 
    18    cut_sequence_chunks, 
    19    iana_name, 
    20    identify_sig_or_bom, 
    21    is_cp_similar, 
    22    is_multi_byte_encoding, 
    23    should_strip_sig_or_bom, 
    24) 
    25 
# Package-level logger. A StreamHandler with a timestamped format is prepared
# up-front but only attached while a caller runs with explain=True; the
# library otherwise stays silent (NullHandler convention).
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)
    31 
    32 
def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possibles charset usable to render str objects.
    If there is no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512o each to assess the mess and coherence of a given sequence.
    And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
    but never take it for granted. Can improve the performance.

    You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    Custom logging format and handler can be set manually.
    """

    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    # 'explain' temporarily attaches the verbose StreamHandler and drops the
    # logger to TRACE; the saved level is restored on every exit path below.
    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    # Empty payload: nothing to probe; answer utf_8 by convention.
    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:  # Defensive: ensure exit path clean handler
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    # Normalize the user-supplied allow-list of code pages to IANA names.
    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    # Normalize the user-supplied deny-list of code pages to IANA names.
    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Small payloads cannot honor the requested sampling plan: fall back to a
    # single chunk spanning the whole content.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    # Shrink chunk_size so that 'steps' non-overlapping chunks still fit.
    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tried first, in order: declared encoding (if preemptive),
    # BOM/SIG-detected encoding, then ascii and utf_8.
    prioritized_encodings: list[str] = []

    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    # Bookkeeping across the candidate loop.
    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []  # could not decode at all
    tested_but_soft_failure: list[str] = []  # decoded but judged too messy

    # Last-resort candidates kept aside in case no encoding passes probing.
    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    # Main probing loop: prioritized candidates first, then every supported
    # IANA encoding. Each candidate is decoded, chaos-scored, then
    # language-scored; unsuitable ones are skipped via 'continue'.
    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        # BOM-dependent encodings are only meaningful with their marker; the
        # LE/BE sub-variants are probed separately via IANA_SUPPORTED.
        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana in {"utf_7"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # First decode attempt. For huge payloads with single-byte decoders,
        # only the first 500kB is decoded here (lazy loading); a tail check
        # happens later once the chunk probing passes.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip code pages near-identical to one already soft-rejected.
        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        # Chunk start offsets, skipping the BOM/SIG bytes when present.
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        # A decoded length shorter than the byte length proves at least one
        # multi-byte character was consumed.
        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Abandon the candidate once a quarter of the chunks (min 2) exceed
        # the mess threshold.
        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure: bool = False

        md_chunks: list[str] = []
        md_ratios: list[float] = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        explain is True and 1 <= len(cp_isolation) <= 2,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except (
            UnicodeDecodeError
        ) as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # We might want to check the sequence again with the whole content
        # Only if initial MD tests passes
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        # Candidate languages for this code page, used to focus the
        # coherence (language) detection below.
        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when its about ASCII
        # Most of the time its not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        # Candidate survived both probes; keep it. The decoded payload is
        # only retained when it was fully decoded above (not lazily cut).
        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # If md says nothing to worry about, then... stop immediately!
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                if explain:  # Defensive: ensure exit path clean handler
                    logger.removeHandler(explain_handler)
                    logger.setLevel(previous_logger_level)
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

        # Once every prioritized candidate has been probed, a low-chaos early
        # candidate is good enough: return the best of them right away.
        if (
            len(early_stop_results)
            and (specified_encoding is None or specified_encoding in tested)
            and "ascii" in tested
            and "utf_8" in tested
        ):
            probable_result: CharsetMatch = early_stop_results.best()  # type: ignore[assignment]
            logger.debug(
                "Encoding detection: %s is most likely the one.",
                probable_result.encoding,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)

            return CharsetMatches([probable_result])

        # A passing candidate that matches the detected BOM/SIG is definitive.
        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:  # Defensive: ensure exit path clean handler
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    # No candidate passed: fall back on the specified/utf_8/ascii entries
    # that were set aside during soft failures, in that order of preference.
    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results
    543 
    544 
    545def from_fp( 
    546    fp: BinaryIO, 
    547    steps: int = 5, 
    548    chunk_size: int = 512, 
    549    threshold: float = 0.20, 
    550    cp_isolation: list[str] | None = None, 
    551    cp_exclusion: list[str] | None = None, 
    552    preemptive_behaviour: bool = True, 
    553    explain: bool = False, 
    554    language_threshold: float = 0.1, 
    555    enable_fallback: bool = True, 
    556) -> CharsetMatches: 
    557    """ 
    558    Same thing than the function from_bytes but using a file pointer that is already ready. 
    559    Will not close the file pointer. 
    560    """ 
    561    return from_bytes( 
    562        fp.read(), 
    563        steps, 
    564        chunk_size, 
    565        threshold, 
    566        cp_isolation, 
    567        cp_exclusion, 
    568        preemptive_behaviour, 
    569        explain, 
    570        language_threshold, 
    571        enable_fallback, 
    572    ) 
    573 
    574 
    575def from_path( 
    576    path: str | bytes | PathLike,  # type: ignore[type-arg] 
    577    steps: int = 5, 
    578    chunk_size: int = 512, 
    579    threshold: float = 0.20, 
    580    cp_isolation: list[str] | None = None, 
    581    cp_exclusion: list[str] | None = None, 
    582    preemptive_behaviour: bool = True, 
    583    explain: bool = False, 
    584    language_threshold: float = 0.1, 
    585    enable_fallback: bool = True, 
    586) -> CharsetMatches: 
    587    """ 
    588    Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode. 
    589    Can raise IOError. 
    590    """ 
    591    with open(path, "rb") as fp: 
    592        return from_fp( 
    593            fp, 
    594            steps, 
    595            chunk_size, 
    596            threshold, 
    597            cp_isolation, 
    598            cp_exclusion, 
    599            preemptive_behaviour, 
    600            explain, 
    601            language_threshold, 
    602            enable_fallback, 
    603        ) 
    604 
    605 
    606def is_binary( 
    607    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,  # type: ignore[type-arg] 
    608    steps: int = 5, 
    609    chunk_size: int = 512, 
    610    threshold: float = 0.20, 
    611    cp_isolation: list[str] | None = None, 
    612    cp_exclusion: list[str] | None = None, 
    613    preemptive_behaviour: bool = True, 
    614    explain: bool = False, 
    615    language_threshold: float = 0.1, 
    616    enable_fallback: bool = False, 
    617) -> bool: 
    618    """ 
    619    Detect if the given input (file, bytes, or path) points to a binary file. aka. not a string. 
    620    Based on the same main heuristic algorithms and default kwargs at the sole exception that fallbacks match 
    621    are disabled to be stricter around ASCII-compatible but unlikely to be a string. 
    622    """ 
    623    if isinstance(fp_or_path_or_payload, (str, PathLike)): 
    624        guesses = from_path( 
    625            fp_or_path_or_payload, 
    626            steps=steps, 
    627            chunk_size=chunk_size, 
    628            threshold=threshold, 
    629            cp_isolation=cp_isolation, 
    630            cp_exclusion=cp_exclusion, 
    631            preemptive_behaviour=preemptive_behaviour, 
    632            explain=explain, 
    633            language_threshold=language_threshold, 
    634            enable_fallback=enable_fallback, 
    635        ) 
    636    elif isinstance( 
    637        fp_or_path_or_payload, 
    638        ( 
    639            bytes, 
    640            bytearray, 
    641        ), 
    642    ): 
    643        guesses = from_bytes( 
    644            fp_or_path_or_payload, 
    645            steps=steps, 
    646            chunk_size=chunk_size, 
    647            threshold=threshold, 
    648            cp_isolation=cp_isolation, 
    649            cp_exclusion=cp_exclusion, 
    650            preemptive_behaviour=preemptive_behaviour, 
    651            explain=explain, 
    652            language_threshold=language_threshold, 
    653            enable_fallback=enable_fallback, 
    654        ) 
    655    else: 
    656        guesses = from_fp( 
    657            fp_or_path_or_payload, 
    658            steps=steps, 
    659            chunk_size=chunk_size, 
    660            threshold=threshold, 
    661            cp_isolation=cp_isolation, 
    662            cp_exclusion=cp_exclusion, 
    663            preemptive_behaviour=preemptive_behaviour, 
    664            explain=explain, 
    665            language_threshold=language_threshold, 
    666            enable_fallback=enable_fallback, 
    667        ) 
    668 
    669    return not guesses