1"""Extract PDF text preserving the layout of the source PDF"""
2
3from itertools import groupby
4from math import ceil
5from pathlib import Path
6from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, TypedDict
7
8from ..._utils import logger_warning
9from .. import LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS
10from ._font import Font
11from ._text_state_manager import TextStateManager
12from ._text_state_params import TextStateParams
13
14
class BTGroup(TypedDict):
    """
    One line of text rendered inside a single BT/ET operator pair.

    When several text show operations draw on the same line, their text is
    merged into one BTGroup.

    Keys:
        tx: x coordinate of the first character in the group
        ty: y coordinate of the first character in the group
        font_size: nominal font size
        font_height: effective font height
        text: rendered text of the whole group
        displaced_tx: x coordinate of the last character in the group
        flip_sort: -1 if the page is upside down, else 1

    """

    # position of the first character
    tx: float
    ty: float
    # font metrics
    font_size: float
    font_height: float
    # rendered content and the x coordinate where it ends
    text: str
    displaced_tx: float
    # sort direction multiplier applied to ty
    flip_sort: Literal[-1, 1]
38
39
def bt_group(tj_op: TextStateParams, rendered_text: str, dispaced_tx: float) -> BTGroup:
    """
    Build a BTGroup for a run of rendered text.

    Position and font metrics are copied from ``tj_op``; the text and the
    ending x coordinate are supplied by the caller.

    Args:
        tj_op (TextStateParams): text show operation that starts the group
        rendered_text (str): text rendered by the group
        dispaced_tx (float): x coordinate of last character in the group

    Returns:
        BTGroup: dict holding the start position, font metrics, text, ending
        x coordinate and sort direction of the group.

    """
    # an upside-down page sorts in the opposite vertical direction
    sort_direction = -1 if tj_op.flip_vertical else 1
    group: BTGroup = {
        "tx": tj_op.tx,
        "ty": tj_op.ty,
        "font_size": tj_op.font_size,
        "font_height": tj_op.font_height,
        "text": rendered_text,
        "displaced_tx": dispaced_tx,
        "flip_sort": sort_direction,
    }
    return group
60
61
def _bt_groups_from_tj_ops(
    tj_ops: List[TextStateParams], strip_rotated: bool
) -> List[BTGroup]:
    """
    Combine the text show operations of one BT/ET pair into BTGroups.

    A new group is started whenever the text moves to a new line or jumps
    backwards horizontally; otherwise the text is appended to the current
    group, padded with spaces inferred from the horizontal gap.

    Args:
        tj_ops: text show operations collected since the matching BT
        strip_rotated: skip operations flagged as rotated

    Returns:
        list of BTGroup dicts; empty when tj_ops is empty or nothing usable
        was rendered.

    """
    groups: List[BTGroup] = []
    if not tj_ops:
        return groups
    _text = ""
    bt_idx = 0  # idx of first tj in the current bt group
    last_displaced_tx = tj_ops[bt_idx].displaced_tx
    last_ty = tj_ops[bt_idx].ty
    for _idx, _tj in enumerate(tj_ops):
        if strip_rotated and _tj.rotated:
            continue
        if not _tj.font.interpretable:  # reported by text_show_operations()
            continue
        # if the y position of the text moved by more than the font height,
        # assume the text is on a new line and start a new group
        if abs(_tj.ty - last_ty) > _tj.font_height:
            if _text.strip():
                groups.append(bt_group(tj_ops[bt_idx], _text, last_displaced_tx))
            bt_idx = _idx
            _text = ""

        # if the x position of the text is left of the last x position by more
        # than LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS space widths, assume the text
        # order should be flipped and start a new group
        if (
            last_displaced_tx - _tj.tx
            > _tj.space_tx * LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS
        ):
            if _text.strip():
                groups.append(bt_group(tj_ops[bt_idx], _text, last_displaced_tx))
            bt_idx = _idx
            last_displaced_tx = _tj.displaced_tx
            _text = ""

        # calculate excess x translation based on ending tx of previous Tj.
        # multiply by bool (_idx != bt_idx) to ensure spaces aren't double
        # applied to the first tj of a BTGroup in fixed_width_page().
        excess_tx = round(_tj.tx - last_displaced_tx, 3) * (_idx != bt_idx)
        # space_tx could be 0 if either Tz or font_size was 0 for this _tj.
        spaces = int(excess_tx // _tj.space_tx) if _tj.space_tx else 0
        new_text = f'{" " * spaces}{_tj.txt}'

        last_ty = _tj.ty
        _text = f"{_text}{new_text}"
        last_displaced_tx = _tj.displaced_tx
    if _text:
        groups.append(bt_group(tj_ops[bt_idx], _text, last_displaced_tx))
    return groups


def recurs_to_target_op(
    ops: Iterator[Tuple[List[Any], bytes]],
    text_state_mgr: TextStateManager,
    end_target: Literal[b"Q", b"ET"],
    fonts: Dict[str, Font],
    strip_rotated: bool = True,
) -> Tuple[List[BTGroup], List[TextStateParams]]:
    """
    Recurse operators between BT/ET and/or q/Q operators managing the transform
    stack and capturing text positioning and rendering data.

    The same ``ops`` iterator is shared by every recursion level, so each
    level consumes operators up to (and including) its own end target.

    Args:
        ops: iterator of operators in content stream
        text_state_mgr: a TextStateManager instance
        end_target: Either b"Q" (ends b"q" op) or b"ET" (ends b"BT" op)
        fonts: font dictionary as returned by PageObject._layout_mode_fonts()
        strip_rotated: drop text flagged as rotated when building BTGroups.
            Defaults to True.

    Returns:
        tuple: list of BTGroup dicts + list of TextStateParams dataclass instances.

    """
    # 1 entry per line of text rendered within each BT/ET operation.
    bt_groups: List[BTGroup] = []

    # 1 entry per text show operator (Tj/TJ/'/")
    tj_ops: List[TextStateParams] = []

    if end_target == b"Q":
        # add new q level. cm's added at this level will be popped at next b'Q'
        text_state_mgr.add_q()

    for operands, op in ops:
        # The loop is broken by the end target, or exits normally when there are no more ops.
        if op == end_target:
            if op == b"Q":
                text_state_mgr.remove_q()
            if op == b"ET":
                bt_groups.extend(_bt_groups_from_tj_ops(tj_ops, strip_rotated))
                # Reset on every ET, including text objects that showed no
                # text: each BT starts from a fresh text matrix, so positioning
                # operators from an empty BT/ET pair must not carry over.
                text_state_mgr.reset_tm()
            break
        if op == b"q":
            bts, tjs = recurs_to_target_op(
                ops, text_state_mgr, b"Q", fonts, strip_rotated
            )
            bt_groups.extend(bts)
            tj_ops.extend(tjs)
        elif op == b"cm":
            text_state_mgr.add_cm(*operands)
        elif op == b"BT":
            bts, tjs = recurs_to_target_op(
                ops, text_state_mgr, b"ET", fonts, strip_rotated
            )
            bt_groups.extend(bts)
            tj_ops.extend(tjs)
        elif op == b"Tj":
            tj_ops.append(text_state_mgr.text_state_params(operands[0]))
        elif op == b"TJ":
            _tj = text_state_mgr.text_state_params()
            for tj_op in operands[0]:
                if isinstance(tj_op, bytes):
                    _tj = text_state_mgr.text_state_params(tj_op)
                    tj_ops.append(_tj)
                else:
                    # numeric array entries adjust the position of what follows
                    text_state_mgr.add_trm(_tj.displacement_matrix(TD_offset=tj_op))
        elif op == b"'":
            # move to the next line (by the current leading), then show text
            text_state_mgr.reset_trm()
            text_state_mgr.add_tm([0, -text_state_mgr.TL])
            tj_ops.append(text_state_mgr.text_state_params(operands[0]))
        elif op == b'"':
            # set word and char spacing, move to the next line, then show text
            text_state_mgr.reset_trm()
            text_state_mgr.set_state_param(b"Tw", operands[0])
            text_state_mgr.set_state_param(b"Tc", operands[1])
            text_state_mgr.add_tm([0, -text_state_mgr.TL])
            tj_ops.append(text_state_mgr.text_state_params(operands[2]))
        elif op in (b"Td", b"Tm", b"TD", b"T*"):
            text_state_mgr.reset_trm()
            if op == b"Tm":
                # Tm replaces the text matrix rather than adding to it
                text_state_mgr.reset_tm()
            elif op == b"TD":
                # TD also sets the leading to the negated y offset
                text_state_mgr.set_state_param(b"TL", -operands[1])
            elif op == b"T*":
                operands = [0, -text_state_mgr.TL]
            text_state_mgr.add_tm(operands)
        elif op == b"Tf":
            text_state_mgr.set_font(fonts[operands[0]], operands[1])
        else:  # handle Tc, Tw, Tz, TL, and Ts operators
            text_state_mgr.set_state_param(op, operands)
    else:
        # ops ran out before the end target was seen
        logger_warning(
            f"Unbalanced target operations, expected {end_target!r}.",
            __name__,
        )
    return bt_groups, tj_ops
205
206
def y_coordinate_groups(
    bt_groups: List[BTGroup], debug_path: Optional[Path] = None
) -> Dict[int, List[BTGroup]]:
    """
    Group text operations by rendered y coordinate, i.e. the line number.

    Groups are keyed by ``int(ty * flip_sort)``. Only adjacent items are
    grouped, so ``bt_groups`` must already be ordered by that key, as the
    output of text_show_operations() is. Consecutive lines closer together
    than the smaller of their font heights are merged, unless both have text
    starting at the same integer x position.

    Args:
        bt_groups: list of dicts as returned by text_show_operations()
        debug_path (Path, optional): Path to a directory for saving debug output.

    Returns:
        Dict[int, List[BTGroup]]: dict of lists of text rendered by each BT operator
        keyed by y coordinate. Each list is sorted by tx. Empty when
        ``bt_groups`` is empty.

    """
    ty_groups = {
        ty: sorted(grp, key=lambda x: x["tx"])
        for ty, grp in groupby(
            bt_groups, key=lambda bt_grp: int(bt_grp["ty"] * bt_grp["flip_sort"])
        )
    }
    # combine groups whose y coordinates differ by less than the effective font height
    # (accounts for mixed fonts and other minor oddities)
    if ty_groups:  # nothing to merge (and no first key to read) for empty input
        ty_keys = list(ty_groups)
        last_ty = ty_keys[0]
        last_txs = {int(_t["tx"]) for _t in ty_groups[last_ty] if _t["text"].strip()}
        for ty in ty_keys[1:]:
            fsz = min(ty_groups[_y][0]["font_height"] for _y in (ty, last_ty))
            txs = {int(_t["tx"]) for _t in ty_groups[ty] if _t["text"].strip()}
            # prevent merge if both groups are rendering in the same x position.
            no_text_overlap = not (txs & last_txs)
            offset_less_than_font_height = abs(ty - last_ty) < fsz
            if no_text_overlap and offset_less_than_font_height:
                ty_groups[last_ty] = sorted(
                    ty_groups.pop(ty) + ty_groups[last_ty], key=lambda x: x["tx"]
                )
                last_txs |= txs
            else:
                last_ty = ty
                last_txs = txs
    if debug_path:  # pragma: no cover
        import json  # noqa: PLC0415

        debug_path.joinpath("bt_groups.json").write_text(
            json.dumps(ty_groups, indent=2, default=str), "utf-8"
        )
    return ty_groups
253
254
def text_show_operations(
    ops: Iterator[Tuple[List[Any], bytes]],
    fonts: Dict[str, Font],
    strip_rotated: bool = True,
    debug_path: Optional[Path] = None,
) -> List[BTGroup]:
    """
    Collect the text rendered by every BT/ET operator pair on a page.

    The result is left-aligned (the smallest tx becomes 0) and ordered by
    descending ``ty * flip_sort``; groups sharing that value are ordered by
    ascending tx.

    Args:
        ops (Iterator[Tuple[List, bytes]]): iterator of operators in content stream
        fonts (Dict[str, Font]): font dictionary
        strip_rotated: Removes text if rotated w.r.t. to the page. Defaults to True.
        debug_path (Path, optional): Path to a directory for saving debug output.

    Returns:
        List[BTGroup]: list of dicts of text rendered by each BT operator

    """
    manager = TextStateManager()  # transformation stack manager
    found_groups: List[BTGroup] = []  # one dict per rendered line
    found_tjs: List[TextStateParams] = []  # one entry per text show operator
    for operands, op in ops:
        if op == b"Tf":
            manager.set_font(fonts[operands[0]], operands[1])
        elif op not in (b"BT", b"q"):
            # Tc, Tw, Tz, TL and Ts update the text state; per the original
            # note, all other operators are ignored here.
            manager.set_state_param(op, operands)
        else:
            closing_op: Literal[b"Q", b"ET"] = b"Q" if op == b"q" else b"ET"
            nested_groups, nested_tjs = recurs_to_target_op(
                ops, manager, closing_op, fonts, strip_rotated
            )
            found_groups += nested_groups
            found_tjs += nested_tjs

    if any(tj.rotated for tj in found_tjs):
        rotated_message = (
            "Rotated text discovered. Output will be incomplete."
            if strip_rotated
            else "Rotated text discovered. Layout will be degraded."
        )
        logger_warning(rotated_message, __name__)
    if any(not tj.font.interpretable for tj in found_tjs):
        logger_warning(
            "PDF contains an uninterpretable font. Output will be incomplete.", __name__
        )

    # left align the data, i.e. decrement all tx values by min(tx)
    min_x = min((grp["tx"] for grp in found_groups), default=0.0)
    top_down = sorted(
        found_groups, key=lambda x: (x["ty"] * x["flip_sort"], -x["tx"]), reverse=True
    )
    aligned: List[BTGroup] = []
    for ogrp in top_down:
        shifted = dict(
            ogrp, tx=ogrp["tx"] - min_x, displaced_tx=ogrp["displaced_tx"] - min_x
        )
        aligned.append(shifted)  # type: ignore[arg-type]

    if debug_path:  # pragma: no cover
        import json  # noqa: PLC0415

        bts_dump = json.dumps(aligned, indent=2, default=str)
        debug_path.joinpath("bts.json").write_text(bts_dump, "utf-8")
        tjs_dump = json.dumps(
            found_tjs, indent=2, default=lambda x: getattr(x, "to_dict", str)(x)
        )
        debug_path.joinpath("tjs.json").write_text(tjs_dump, "utf-8")
    return aligned
325
326
def fixed_char_width(bt_groups: List[BTGroup], scale_weight: float = 1.25) -> float:
    """
    Calculate average character width weighted by the length of the rendered
    text in each sample for conversion to fixed-width layout.

    Each sample's weight is ``len(text) * scale_weight``. Samples with zero
    weight (empty text) cannot contribute a width and are skipped instead of
    causing a division by zero.

    Args:
        bt_groups (List[BTGroup]): List of dicts of text rendered by each
            BT operator
        scale_weight (float): multiplier applied to every sample's text
            length; a larger value yields a smaller character width.
            Defaults to 1.25.

    Returns:
        float: fixed character width, or 0.0 when no sample carries any
        weight (empty ``bt_groups``, only empty text, or a zero scale_weight).

    """
    weighted_width_total = 0.0
    weight_total = 0.0
    for _bt in bt_groups:
        weight = len(_bt["text"]) * scale_weight
        if not weight:
            continue
        char_width = (_bt["displaced_tx"] - _bt["tx"]) / weight
        weighted_width_total += char_width * weight
        weight_total += weight
    if not weight_total:
        return 0.0
    return weighted_width_total / weight_total
345
346
def fixed_width_page(
    ty_groups: Dict[int, List[BTGroup]], char_width: float, space_vertically: bool, font_height_weight: float
) -> str:
    """
    Generate page text from text operations grouped by rendered y coordinate.

    Each group is placed at column ``tx // char_width``. Padding is only
    inserted when the group starts to the right of where the previous group
    on the line ended. Characters with code points 14-31 are replaced by
    spaces.

    Args:
        ty_groups: dict of text show ops as returned by y_coordinate_groups()
        char_width: fixed character width. A value of 0 disables horizontal
            padding instead of raising ZeroDivisionError.
        space_vertically: include blank lines inferred from y distance + font height.
        font_height_weight: multiplier for font height when calculating blank lines.
            No blank lines are inserted when the weighted font height is 0.

    Returns:
        str: page text in a fixed width format that closely adheres to the rendered
        layout in the source pdf.

    """
    # code points 14-31 become spaces; 0-13 (tab, newline, ...) are kept as is
    control_to_space = {code: " " for code in range(14, 32)}
    lines: List[str] = []
    last_y_coord = 0
    for y_coord, line_data in ty_groups.items():
        if space_vertically and lines:
            line_height = line_data[0]["font_height"] * font_height_weight
            # a zero line height gives no usable scale for the vertical gap
            blank_lines = (
                int(abs(y_coord - last_y_coord) / line_height) - 1
                if line_height
                else 0
            )
            lines.extend([""] * blank_lines)
        line = ""
        last_disp = 0.0
        for bt_op in line_data:
            # char_width is 0 when no text has any width; fall back to column 0
            offset = int(bt_op["tx"] // char_width) if char_width else 0
            # the bool factor zeroes the padding when this group starts at or
            # before the end of the previous one
            spaces = (offset - len(line)) * (ceil(last_disp) < int(bt_op["tx"]))
            line = f"{line}{' ' * spaces}{bt_op['text']}"
            last_disp = bt_op["displaced_tx"]
        # leading blank lines are dropped; later ones are kept until the final filter
        if line.strip() or lines:
            lines.append(line.translate(control_to_space))
        last_y_coord = y_coord
    return "\n".join(ln.rstrip() for ln in lines if space_vertically or ln.strip())