1"""Extract PDF text preserving the layout of the source PDF""" 
    2 
    3from collections.abc import Iterator 
    4from itertools import groupby 
    5from math import ceil 
    6from pathlib import Path 
    7from typing import Any, Literal, Optional, TypedDict 
    8 
    9from ..._utils import logger_warning 
    10from .. import LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS 
    11from ._font import Font 
    12from ._text_state_manager import TextStateManager 
    13from ._text_state_params import TextStateParams 
    14 
    15 
class BTGroup(TypedDict):
    """
    Dict describing a line of text rendered within a BT/ET operator pair.
    If multiple text show operations render text on the same line, the text
    will be combined into a single BTGroup dict.

    Keys:
        tx: x coordinate of first character in BTGroup
        ty: y coordinate of first character in BTGroup
        font_size: nominal font size
        font_height: effective font height
        text: rendered text
        displaced_tx: x coordinate of last character in BTGroup
        flip_sort: -1 if page is upside down, else 1. Used as a multiplier
            on ty when groups are sorted or keyed by y coordinate.
    """

    tx: float  # x position where the group's text starts
    ty: float  # y position of the group's line
    font_size: float  # nominal font size
    font_height: float  # effective font height, used for line-merge thresholds
    text: str  # rendered text of the whole group
    displaced_tx: float  # x position where the group's text ends
    flip_sort: Literal[-1, 1]  # sign applied to ty for ordering
    39 
    40 
    41def bt_group(tj_op: TextStateParams, rendered_text: str, dispaced_tx: float) -> BTGroup: 
    42    """ 
    43    BTGroup constructed from a TextStateParams instance, rendered text, and 
    44    displaced tx value. 
    45 
    46    Args: 
    47        tj_op (TextStateParams): TextStateParams instance 
    48        rendered_text (str): rendered text 
    49        dispaced_tx (float): x coordinate of last character in BTGroup 
    50 
    51    """ 
    52    return BTGroup( 
    53        tx=tj_op.tx, 
    54        ty=tj_op.ty, 
    55        font_size=tj_op.font_size, 
    56        font_height=tj_op.font_height, 
    57        text=rendered_text, 
    58        displaced_tx=dispaced_tx, 
    59        flip_sort=-1 if tj_op.flip_vertical else 1, 
    60    ) 
    61 
    62 
    63def recurs_to_target_op( 
    64    ops: Iterator[tuple[list[Any], bytes]], 
    65    text_state_mgr: TextStateManager, 
    66    end_target: Literal[b"Q", b"ET"], 
    67    fonts: dict[str, Font], 
    68    strip_rotated: bool = True, 
    69) -> tuple[list[BTGroup], list[TextStateParams]]: 
    70    """ 
    71    Recurse operators between BT/ET and/or q/Q operators managing the transform 
    72    stack and capturing text positioning and rendering data. 
    73 
    74    Args: 
    75        ops: iterator of operators in content stream 
    76        text_state_mgr: a TextStateManager instance 
    77        end_target: Either b"Q" (ends b"q" op) or b"ET" (ends b"BT" op) 
    78        fonts: font dictionary as returned by PageObject._layout_mode_fonts() 
    79 
    80    Returns: 
    81        tuple: list of BTGroup dicts + list of TextStateParams dataclass instances. 
    82 
    83    """ 
    84    # 1 entry per line of text rendered within each BT/ET operation. 
    85    bt_groups: list[BTGroup] = [] 
    86 
    87    # 1 entry per text show operator (Tj/TJ/'/") 
    88    tj_ops: list[TextStateParams] = [] 
    89 
    90    if end_target == b"Q": 
    91        # add new q level. cm's added at this level will be popped at next b'Q' 
    92        text_state_mgr.add_q() 
    93 
    94    for operands, op in ops: 
    95        # The loop is broken by the end target, or exits normally when there are no more ops. 
    96        if op == end_target: 
    97            if op == b"Q": 
    98                text_state_mgr.remove_q() 
    99            if op == b"ET": 
    100                if not tj_ops: 
    101                    return bt_groups, tj_ops 
    102                _text = "" 
    103                bt_idx = 0  # idx of first tj in this bt group 
    104                last_displaced_tx = tj_ops[bt_idx].displaced_tx 
    105                last_ty = tj_ops[bt_idx].ty 
    106                for _idx, _tj in enumerate( 
    107                    tj_ops 
    108                ):  # ... build text from new Tj operators 
    109                    if strip_rotated and _tj.rotated: 
    110                        continue 
    111                    if not _tj.font.interpretable:  # generates warning 
    112                        continue 
    113                    # if the y position of the text is greater than the font height, assume 
    114                    # the text is on a new line and start a new group 
    115                    if abs(_tj.ty - last_ty) > _tj.font_height: 
    116                        if _text.strip(): 
    117                            bt_groups.append( 
    118                                bt_group(tj_ops[bt_idx], _text, last_displaced_tx) 
    119                            ) 
    120                        bt_idx = _idx 
    121                        _text = "" 
    122 
    123                    # if the x position of the text is less than the last x position by 
    124                    # more than 5 spaces widths, assume the text order should be flipped 
    125                    # and start a new group 
    126                    if ( 
    127                        last_displaced_tx - _tj.tx 
    128                        > _tj.space_tx * LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS 
    129                    ): 
    130                        if _text.strip(): 
    131                            bt_groups.append( 
    132                                bt_group(tj_ops[bt_idx], _text, last_displaced_tx) 
    133                            ) 
    134                        bt_idx = _idx 
    135                        last_displaced_tx = _tj.displaced_tx 
    136                        _text = "" 
    137 
    138                    # calculate excess x translation based on ending tx of previous Tj. 
    139                    # multiply by bool (_idx != bt_idx) to ensure spaces aren't double 
    140                    # applied to the first tj of a BTGroup in fixed_width_page(). 
    141                    excess_tx = round(_tj.tx - last_displaced_tx, 3) * (_idx != bt_idx) 
    142                    # space_tx could be 0 if either Tz or font_size was 0 for this _tj. 
    143                    spaces = int(excess_tx // _tj.space_tx) if _tj.space_tx else 0 
    144                    new_text = f'{" " * spaces}{_tj.txt}' 
    145 
    146                    last_ty = _tj.ty 
    147                    _text = f"{_text}{new_text}" 
    148                    last_displaced_tx = _tj.displaced_tx 
    149                if _text: 
    150                    bt_groups.append(bt_group(tj_ops[bt_idx], _text, last_displaced_tx)) 
    151                text_state_mgr.reset_tm() 
    152            break 
    153        if op == b"q": 
    154            bts, tjs = recurs_to_target_op( 
    155                ops, text_state_mgr, b"Q", fonts, strip_rotated 
    156            ) 
    157            bt_groups.extend(bts) 
    158            tj_ops.extend(tjs) 
    159        elif op == b"cm": 
    160            text_state_mgr.add_cm(*operands) 
    161        elif op == b"BT": 
    162            bts, tjs = recurs_to_target_op( 
    163                ops, text_state_mgr, b"ET", fonts, strip_rotated 
    164            ) 
    165            bt_groups.extend(bts) 
    166            tj_ops.extend(tjs) 
    167        elif op == b"Tj": 
    168            tj_ops.append(text_state_mgr.text_state_params(operands[0])) 
    169        elif op == b"TJ": 
    170            _tj = text_state_mgr.text_state_params() 
    171            for tj_op in operands[0]: 
    172                if isinstance(tj_op, bytes): 
    173                    _tj = text_state_mgr.text_state_params(tj_op) 
    174                    tj_ops.append(_tj) 
    175                else: 
    176                    text_state_mgr.add_trm(_tj.displacement_matrix(TD_offset=tj_op)) 
    177        elif op == b"'": 
    178            text_state_mgr.reset_trm() 
    179            text_state_mgr.add_tm([0, -text_state_mgr.TL]) 
    180            tj_ops.append(text_state_mgr.text_state_params(operands[0])) 
    181        elif op == b'"': 
    182            text_state_mgr.reset_trm() 
    183            text_state_mgr.set_state_param(b"Tw", operands[0]) 
    184            text_state_mgr.set_state_param(b"Tc", operands[1]) 
    185            text_state_mgr.add_tm([0, -text_state_mgr.TL]) 
    186            tj_ops.append(text_state_mgr.text_state_params(operands[2])) 
    187        elif op in (b"Td", b"Tm", b"TD", b"T*"): 
    188            text_state_mgr.reset_trm() 
    189            if op == b"Tm": 
    190                text_state_mgr.reset_tm() 
    191            elif op == b"TD": 
    192                text_state_mgr.set_state_param(b"TL", -operands[1]) 
    193            elif op == b"T*": 
    194                operands = [0, -text_state_mgr.TL] 
    195            text_state_mgr.add_tm(operands) 
    196        elif op == b"Tf": 
    197            text_state_mgr.set_font(fonts[operands[0]], operands[1]) 
    198        else:  # handle Tc, Tw, Tz, TL, and Ts operators 
    199            text_state_mgr.set_state_param(op, operands) 
    200    else: 
    201        logger_warning( 
    202            f"Unbalanced target operations, expected {end_target!r}.", 
    203            __name__, 
    204        ) 
    205    return bt_groups, tj_ops 
    206 
    207 
    208def y_coordinate_groups( 
    209    bt_groups: list[BTGroup], debug_path: Optional[Path] = None 
    210) -> dict[int, list[BTGroup]]: 
    211    """ 
    212    Group text operations by rendered y coordinate, i.e. the line number. 
    213 
    214    Args: 
    215        bt_groups: list of dicts as returned by text_show_operations() 
    216        debug_path (Path, optional): Path to a directory for saving debug output. 
    217 
    218    Returns: 
    219        Dict[int, List[BTGroup]]: dict of lists of text rendered by each BT operator 
    220            keyed by y coordinate 
    221 
    222    """ 
    223    ty_groups = { 
    224        ty: sorted(grp, key=lambda x: x["tx"]) 
    225        for ty, grp in groupby( 
    226            bt_groups, key=lambda bt_grp: int(bt_grp["ty"] * bt_grp["flip_sort"]) 
    227        ) 
    228    } 
    229    # combine groups whose y coordinates differ by less than the effective font height 
    230    # (accounts for mixed fonts and other minor oddities) 
    231    last_ty = next(iter(ty_groups)) 
    232    last_txs = {int(_t["tx"]) for _t in ty_groups[last_ty] if _t["text"].strip()} 
    233    for ty in list(ty_groups)[1:]: 
    234        fsz = min(ty_groups[_y][0]["font_height"] for _y in (ty, last_ty)) 
    235        txs = {int(_t["tx"]) for _t in ty_groups[ty] if _t["text"].strip()} 
    236        # prevent merge if both groups are rendering in the same x position. 
    237        no_text_overlap = not (txs & last_txs) 
    238        offset_less_than_font_height = abs(ty - last_ty) < fsz 
    239        if no_text_overlap and offset_less_than_font_height: 
    240            ty_groups[last_ty] = sorted( 
    241                ty_groups.pop(ty) + ty_groups[last_ty], key=lambda x: x["tx"] 
    242            ) 
    243            last_txs |= txs 
    244        else: 
    245            last_ty = ty 
    246            last_txs = txs 
    247    if debug_path:  # pragma: no cover 
    248        import json  # noqa: PLC0415 
    249 
    250        debug_path.joinpath("bt_groups.json").write_text( 
    251            json.dumps(ty_groups, indent=2, default=str), "utf-8" 
    252        ) 
    253    return ty_groups 
    254 
    255 
    256def text_show_operations( 
    257    ops: Iterator[tuple[list[Any], bytes]], 
    258    fonts: dict[str, Font], 
    259    strip_rotated: bool = True, 
    260    debug_path: Optional[Path] = None, 
    261) -> list[BTGroup]: 
    262    """ 
    263    Extract text from BT/ET operator pairs. 
    264 
    265    Args: 
    266        ops (Iterator[Tuple[List, bytes]]): iterator of operators in content stream 
    267        fonts (Dict[str, Font]): font dictionary 
    268        strip_rotated: Removes text if rotated w.r.t. to the page. Defaults to True. 
    269        debug_path (Path, optional): Path to a directory for saving debug output. 
    270 
    271    Returns: 
    272        List[BTGroup]: list of dicts of text rendered by each BT operator 
    273 
    274    """ 
    275    state_mgr = TextStateManager()  # transformation stack manager 
    276    bt_groups: list[BTGroup] = []  # BT operator dict 
    277    tj_ops: list[TextStateParams] = []  # Tj/TJ operator data 
    278    for operands, op in ops: 
    279        if op in (b"BT", b"q"): 
    280            bts, tjs = recurs_to_target_op( 
    281                ops, state_mgr, b"ET" if op == b"BT" else b"Q", fonts, strip_rotated 
    282            ) 
    283            bt_groups.extend(bts) 
    284            tj_ops.extend(tjs) 
    285        elif op == b"Tf": 
    286            state_mgr.set_font(fonts[operands[0]], operands[1]) 
    287        else:  # set Tc, Tw, Tz, TL, and Ts if required. ignores all other ops 
    288            state_mgr.set_state_param(op, operands) 
    289 
    290    if any(tj.rotated for tj in tj_ops): 
    291        if strip_rotated: 
    292            logger_warning( 
    293                "Rotated text discovered. Output will be incomplete.", __name__ 
    294            ) 
    295        else: 
    296            logger_warning( 
    297                "Rotated text discovered. Layout will be degraded.", __name__ 
    298            ) 
    299    if not all(tj.font.interpretable for tj in tj_ops): 
    300        logger_warning( 
    301            "PDF contains an uninterpretable font. Output will be incomplete.", __name__ 
    302        ) 
    303 
    304    # left align the data, i.e. decrement all tx values by min(tx) 
    305    min_x = min((x["tx"] for x in bt_groups), default=0.0) 
    306    bt_groups = [ 
    307        dict(ogrp, tx=ogrp["tx"] - min_x, displaced_tx=ogrp["displaced_tx"] - min_x)  # type: ignore[misc] 
    308        for ogrp in sorted( 
    309            bt_groups, key=lambda x: (x["ty"] * x["flip_sort"], -x["tx"]), reverse=True 
    310        ) 
    311    ] 
    312 
    313    if debug_path:  # pragma: no cover 
    314        import json  # noqa: PLC0415 
    315 
    316        debug_path.joinpath("bts.json").write_text( 
    317            json.dumps(bt_groups, indent=2, default=str), "utf-8" 
    318        ) 
    319        debug_path.joinpath("tjs.json").write_text( 
    320            json.dumps( 
    321                tj_ops, indent=2, default=lambda x: getattr(x, "to_dict", str)(x) 
    322            ), 
    323            "utf-8", 
    324        ) 
    325    return bt_groups 
    326 
    327 
    328def fixed_char_width(bt_groups: list[BTGroup], scale_weight: float = 1.25) -> float: 
    329    """ 
    330    Calculate average character width weighted by the length of the rendered 
    331    text in each sample for conversion to fixed-width layout. 
    332 
    333    Args: 
    334        bt_groups (List[BTGroup]): List of dicts of text rendered by each 
    335            BT operator 
    336 
    337    Returns: 
    338        float: fixed character width 
    339 
    340    """ 
    341    char_widths = [] 
    342    for _bt in bt_groups: 
    343        _len = len(_bt["text"]) * scale_weight 
    344        char_widths.append(((_bt["displaced_tx"] - _bt["tx"]) / _len, _len)) 
    345    return sum(_w * _l for _w, _l in char_widths) / sum(_l for _, _l in char_widths) 
    346 
    347 
    348def fixed_width_page( 
    349    ty_groups: dict[int, list[BTGroup]], char_width: float, space_vertically: bool, font_height_weight: float 
    350) -> str: 
    351    """ 
    352    Generate page text from text operations grouped by rendered y coordinate. 
    353 
    354    Args: 
    355        ty_groups: dict of text show ops as returned by y_coordinate_groups() 
    356        char_width: fixed character width 
    357        space_vertically: include blank lines inferred from y distance + font height. 
    358        font_height_weight: multiplier for font height when calculating blank lines. 
    359 
    360    Returns: 
    361        str: page text in a fixed width format that closely adheres to the rendered 
    362            layout in the source pdf. 
    363 
    364    """ 
    365    lines: list[str] = [] 
    366    last_y_coord = 0 
    367    for y_coord, line_data in ty_groups.items(): 
    368        if space_vertically and lines: 
    369            fh = line_data[0]["font_height"] 
    370            blank_lines = 0 if fh == 0 else ( 
    371                int(abs(y_coord - last_y_coord) / (fh * font_height_weight)) - 1 
    372            ) 
    373            lines.extend([""] * blank_lines) 
    374        line = "" 
    375        last_disp = 0.0 
    376        for bt_op in line_data: 
    377            offset = int(bt_op["tx"] // char_width) 
    378            spaces = (offset - len(line)) * (ceil(last_disp) < int(bt_op["tx"])) 
    379            line = f"{line}{' ' * spaces}{bt_op['text']}" 
    380            last_disp = bt_op["displaced_tx"] 
    381        if line.strip() or lines: 
    382            lines.append( 
    383                "".join(c if ord(c) < 14 or ord(c) > 31 else " " for c in line) 
    384            ) 
    385        last_y_coord = y_coord 
    386    return "\n".join(ln.rstrip() for ln in lines if space_vertically or ln.strip())