1"""Extract PDF text preserving the layout of the source PDF"""
2
3from collections.abc import Iterator
4from itertools import groupby
5from math import ceil
6from pathlib import Path
7from typing import Any, Literal, Optional, TypedDict
8
9from ..._utils import logger_warning
10from .. import LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS
11from ._font import Font
12from ._text_state_manager import TextStateManager
13from ._text_state_params import TextStateParams
14
15
class BTGroup(TypedDict):
    """
    One line of text rendered inside a single BT/ET operator pair.

    When several text show operations render on the same line, their text is
    merged into one BTGroup dict.

    Keys:
        tx: x coordinate of the first character in the group
        ty: y coordinate of the first character in the group
        font_size: nominal font size
        font_height: effective font height
        text: rendered text
        displaced_tx: x coordinate of the last character in the group
        flip_sort: -1 if page is upside down, else 1
    """

    # position of the first character of the group
    tx: float
    ty: float
    # font metrics in effect for the group
    font_size: float
    font_height: float
    # rendered content and the x coordinate where it ends
    text: str
    displaced_tx: float
    # sign applied to ty when ordering groups vertically
    flip_sort: Literal[-1, 1]
39
40
def bt_group(tj_op: TextStateParams, rendered_text: str, dispaced_tx: float) -> BTGroup:
    """
    Build a BTGroup dict from a TextStateParams instance, the rendered text,
    and the displaced tx value.

    Args:
        tj_op (TextStateParams): TextStateParams instance supplying tx, ty,
            font_size, font_height and flip_vertical
        rendered_text (str): rendered text
        dispaced_tx (float): x coordinate of last character in BTGroup

    Returns:
        BTGroup: dict combining the values read from tj_op with the supplied
        text and end coordinate.

    """
    return {
        "tx": tj_op.tx,
        "ty": tj_op.ty,
        "font_size": tj_op.font_size,
        "font_height": tj_op.font_height,
        "text": rendered_text,
        "displaced_tx": dispaced_tx,
        # -1 marks groups whose y ordering must be inverted
        "flip_sort": -1 if tj_op.flip_vertical else 1,
    }
61
62
def _bt_groups_from_tj_ops(
    tj_ops: list[TextStateParams], strip_rotated: bool
) -> list[BTGroup]:
    """
    Merge the text show operations gathered before an ET operator into
    BTGroups, one per rendered line of text.

    Args:
        tj_ops: non-empty list of text show operations in stream order
        strip_rotated: when True, operations flagged as rotated are skipped

    Returns:
        list of BTGroup dicts built from the operations that were not skipped

    """
    groups: list[BTGroup] = []
    line_text = ""
    first_idx = 0  # idx of the tj that opens the group being built
    prev_end_tx = tj_ops[first_idx].displaced_tx
    prev_ty = tj_ops[first_idx].ty
    for idx, tj in enumerate(tj_ops):
        # rotated text (when stripping) and uninterpretable fonts are skipped
        if (strip_rotated and tj.rotated) or not tj.font.interpretable:
            continue

        # a y move larger than the font height is treated as a new line of
        # text, which starts a new group
        if abs(tj.ty - prev_ty) > tj.font_height:
            if line_text.strip():
                groups.append(bt_group(tj_ops[first_idx], line_text, prev_end_tx))
            first_idx = idx
            line_text = ""

        # an x position that falls behind the previous end position by more
        # than LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS space widths also starts a
        # new group
        backward_jump = prev_end_tx - tj.tx
        if backward_jump > tj.space_tx * LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS:
            if line_text.strip():
                groups.append(bt_group(tj_ops[first_idx], line_text, prev_end_tx))
            first_idx = idx
            prev_end_tx = tj.displaced_tx
            line_text = ""

        # excess x translation measured from the end of the previous Tj. The
        # bool factor zeroes it for the first tj of a group so the leading
        # spaces aren't applied a second time in fixed_width_page().
        gap = round(tj.tx - prev_end_tx, 3) * (idx != first_idx)
        # space_tx could be 0 if either Tz or font_size was 0 for this tj.
        pad = int(gap // tj.space_tx) if tj.space_tx else 0

        line_text = f'{line_text}{" " * pad}{tj.txt}'
        prev_ty = tj.ty
        prev_end_tx = tj.displaced_tx
    if line_text:
        groups.append(bt_group(tj_ops[first_idx], line_text, prev_end_tx))
    return groups


def recurs_to_target_op(
    ops: Iterator[tuple[list[Any], bytes]],
    text_state_mgr: TextStateManager,
    end_target: Literal[b"Q", b"ET"],
    fonts: dict[str, Font],
    strip_rotated: bool = True,
) -> tuple[list[BTGroup], list[TextStateParams]]:
    """
    Walk the operators up to the matching ET or Q operator, recursing into
    nested BT/ET and q/Q pairs, while maintaining the transform stack and
    collecting text positioning and rendering data.

    Args:
        ops: iterator of operators in content stream; it is shared with the
            nested calls, so each call consumes the operators it handles
        text_state_mgr: a TextStateManager instance
        end_target: Either b"Q" (ends b"q" op) or b"ET" (ends b"BT" op)
        fonts: font dictionary as returned by PageObject._layout_mode_fonts()
        strip_rotated: when True, rotated text is left out of the BTGroups

    Returns:
        tuple: list of BTGroup dicts + list of TextStateParams dataclass instances.

    """
    # 1 entry per line of text rendered within each BT/ET operation.
    bt_groups: list[BTGroup] = []

    # 1 entry per text show operator (Tj/TJ/'/")
    tj_ops: list[TextStateParams] = []

    if end_target == b"Q":
        # add new q level. cm's added at this level will be popped at next b'Q'
        text_state_mgr.add_q()

    for operands, op in ops:
        # The loop is broken by the end target, or exits normally when there
        # are no more ops.
        if op == end_target:
            if op == b"Q":
                text_state_mgr.remove_q()
            elif op == b"ET":
                if not tj_ops:
                    return bt_groups, tj_ops
                bt_groups.extend(_bt_groups_from_tj_ops(tj_ops, strip_rotated))
                text_state_mgr.reset_tm()
            break

        if op in (b"q", b"BT"):
            # nested pair: recurse until its own closing operator
            nested_end: Literal[b"Q", b"ET"] = b"Q" if op == b"q" else b"ET"
            nested_bts, nested_tjs = recurs_to_target_op(
                ops, text_state_mgr, nested_end, fonts, strip_rotated
            )
            bt_groups.extend(nested_bts)
            tj_ops.extend(nested_tjs)
        elif op == b"cm":
            text_state_mgr.add_cm(*operands)
        elif op == b"Tj":
            tj_ops.append(text_state_mgr.text_state_params(operands[0]))
        elif op == b"TJ":
            current_tj = text_state_mgr.text_state_params()
            for element in operands[0]:
                if isinstance(element, bytes):
                    current_tj = text_state_mgr.text_state_params(element)
                    tj_ops.append(current_tj)
                else:
                    # non-bytes elements are positioning offsets
                    text_state_mgr.add_trm(
                        current_tj.displacement_matrix(TD_offset=element)
                    )
        elif op == b"'":
            text_state_mgr.reset_trm()
            text_state_mgr.add_tm([0, -text_state_mgr.TL])
            tj_ops.append(text_state_mgr.text_state_params(operands[0]))
        elif op == b'"':
            text_state_mgr.reset_trm()
            text_state_mgr.set_state_param(b"Tw", operands[0])
            text_state_mgr.set_state_param(b"Tc", operands[1])
            text_state_mgr.add_tm([0, -text_state_mgr.TL])
            tj_ops.append(text_state_mgr.text_state_params(operands[2]))
        elif op in (b"Td", b"Tm", b"TD", b"T*"):
            text_state_mgr.reset_trm()
            tm_operands = operands
            if op == b"Tm":
                text_state_mgr.reset_tm()
            elif op == b"TD":
                text_state_mgr.set_state_param(b"TL", -operands[1])
            elif op == b"T*":
                tm_operands = [0, -text_state_mgr.TL]
            text_state_mgr.add_tm(tm_operands)
        elif op == b"Tf":
            text_state_mgr.set_font(fonts[operands[0]], operands[1])
        else:  # handle Tc, Tw, Tz, TL, and Ts operators
            text_state_mgr.set_state_param(op, operands)
    else:
        # ops ran out before the closing operator was seen
        logger_warning(
            f"Unbalanced target operations, expected {end_target!r}.",
            __name__,
        )
    return bt_groups, tj_ops
206
207
def y_coordinate_groups(
    bt_groups: list[BTGroup], debug_path: Optional[Path] = None
) -> dict[int, list[BTGroup]]:
    """
    Group text operations by rendered y coordinate, i.e. the line number.

    Groups are keyed by ``int(ty * flip_sort)`` in order of first appearance
    and each line is sorted by ``tx``. Consecutive lines whose keys differ by
    less than the smaller of their font heights are then merged, unless both
    contain text starting at the same integer x position.

    Args:
        bt_groups: list of dicts as returned by text_show_operations(); the
            merge step compares each line with the one before it, so the
            list is expected to be ordered by y coordinate. An empty list
            yields an empty dict.
        debug_path (Path, optional): Path to a directory for saving debug output.

    Returns:
        Dict[int, List[BTGroup]]: dict of lists of text rendered by each BT operator
        keyed by y coordinate

    """
    # Accumulate per key instead of relying on adjacency, so that groups
    # sharing a key are never overwritten when they are not contiguous.
    ty_groups: dict[int, list[BTGroup]] = {}
    for bt_grp in bt_groups:
        y_key = int(bt_grp["ty"] * bt_grp["flip_sort"])
        ty_groups.setdefault(y_key, []).append(bt_grp)
    for line in ty_groups.values():
        line.sort(key=lambda x: x["tx"])

    # combine groups whose y coordinates differ by less than the effective font height
    # (accounts for mixed fonts and other minor oddities)
    y_keys = list(ty_groups)
    if y_keys:  # nothing to merge for empty input
        last_ty = y_keys[0]
        last_txs = {int(_t["tx"]) for _t in ty_groups[last_ty] if _t["text"].strip()}
        for ty in y_keys[1:]:
            fsz = min(ty_groups[_y][0]["font_height"] for _y in (ty, last_ty))
            txs = {int(_t["tx"]) for _t in ty_groups[ty] if _t["text"].strip()}
            # prevent merge if both groups are rendering in the same x position.
            no_text_overlap = not (txs & last_txs)
            offset_less_than_font_height = abs(ty - last_ty) < fsz
            if no_text_overlap and offset_less_than_font_height:
                ty_groups[last_ty] = sorted(
                    ty_groups.pop(ty) + ty_groups[last_ty], key=lambda x: x["tx"]
                )
                last_txs |= txs
            else:
                last_ty = ty
                last_txs = txs
    if debug_path:  # pragma: no cover
        import json  # noqa: PLC0415

        debug_path.joinpath("bt_groups.json").write_text(
            json.dumps(ty_groups, indent=2, default=str), "utf-8"
        )
    return ty_groups
254
255
def text_show_operations(
    ops: Iterator[tuple[list[Any], bytes]],
    fonts: dict[str, Font],
    strip_rotated: bool = True,
    debug_path: Optional[Path] = None,
) -> list[BTGroup]:
    """
    Extract text from BT/ET operator pairs.

    Args:
        ops (Iterator[Tuple[List, bytes]]): iterator of operators in content stream
        fonts (Dict[str, Font]): font dictionary
        strip_rotated: Removes text if rotated w.r.t. to the page. Defaults to True.
        debug_path (Path, optional): Path to a directory for saving debug output.

    Returns:
        List[BTGroup]: list of dicts of text rendered by each BT operator,
        shifted so the smallest tx is 0 and ordered by descending
        ``ty * flip_sort``.

    """
    state_mgr = TextStateManager()  # transformation stack manager
    bt_groups: list[BTGroup] = []  # BT operator dict
    tj_ops: list[TextStateParams] = []  # Tj/TJ operator data
    for operands, op in ops:
        if op == b"BT" or op == b"q":
            # the recursive call consumes ops up to the matching end operator
            end_target: Literal[b"Q", b"ET"] = b"ET" if op == b"BT" else b"Q"
            found_bts, found_tjs = recurs_to_target_op(
                ops, state_mgr, end_target, fonts, strip_rotated
            )
            bt_groups.extend(found_bts)
            tj_ops.extend(found_tjs)
        elif op == b"Tf":
            state_mgr.set_font(fonts[operands[0]], operands[1])
        else:  # set Tc, Tw, Tz, TL, and Ts if required. ignores all other ops
            state_mgr.set_state_param(op, operands)

    if any(tj.rotated for tj in tj_ops):
        rotated_msg = (
            "Rotated text discovered. Output will be incomplete."
            if strip_rotated
            else "Rotated text discovered. Layout will be degraded."
        )
        logger_warning(rotated_msg, __name__)
    if not all(tj.font.interpretable for tj in tj_ops):
        logger_warning(
            "PDF contains an uninterpretable font. Output will be incomplete.", __name__
        )

    # left align the data, i.e. decrement all tx values by min(tx)
    min_x = min((x["tx"] for x in bt_groups), default=0.0)
    ordered = sorted(
        bt_groups, key=lambda x: (x["ty"] * x["flip_sort"], -x["tx"]), reverse=True
    )
    bt_groups = [
        dict(ogrp, tx=ogrp["tx"] - min_x, displaced_tx=ogrp["displaced_tx"] - min_x)  # type: ignore[misc]
        for ogrp in ordered
    ]

    if debug_path:  # pragma: no cover
        import json  # noqa: PLC0415

        bts_dump = json.dumps(bt_groups, indent=2, default=str)
        debug_path.joinpath("bts.json").write_text(bts_dump, "utf-8")
        tjs_dump = json.dumps(
            tj_ops, indent=2, default=lambda x: getattr(x, "to_dict", str)(x)
        )
        debug_path.joinpath("tjs.json").write_text(tjs_dump, "utf-8")
    return bt_groups
326
327
def fixed_char_width(bt_groups: list[BTGroup], scale_weight: float = 1.25) -> float:
    """
    Calculate average character width weighted by the length of the rendered
    text in each sample for conversion to fixed-width layout.

    Args:
        bt_groups (List[BTGroup]): List of dicts of text rendered by each
            BT operator
        scale_weight (float): multiplier applied to each group's character
            count; the count times this value is used both as the divisor of
            the group's rendered width and as the group's averaging weight.

    Returns:
        float: fixed character width, or 0.0 when there is nothing to measure
        (no groups, only empty text, or a zero scale_weight).

    """
    samples: list[tuple[float, float]] = []
    for _bt in bt_groups:
        weight = len(_bt["text"]) * scale_weight
        if not weight:
            # empty text (or zero scale_weight) carries no width information
            # and would otherwise divide by zero
            continue
        samples.append(((_bt["displaced_tx"] - _bt["tx"]) / weight, weight))
    total_weight = sum(_l for _, _l in samples)
    if not total_weight:
        return 0.0
    return sum(_w * _l for _w, _l in samples) / total_weight
346
347
def fixed_width_page(
    ty_groups: dict[int, list[BTGroup]], char_width: float, space_vertically: bool, font_height_weight: float
) -> str:
    """
    Generate page text from text operations grouped by rendered y coordinate.

    Args:
        ty_groups: dict of text show ops as returned by y_coordinate_groups()
        char_width: fixed character width. When 0, no horizontal padding can
            be derived and the text of each line is simply concatenated.
        space_vertically: include blank lines inferred from y distance + font height.
        font_height_weight: multiplier for font height when calculating blank lines.
            When 0, no blank lines are inserted.

    Returns:
        str: page text in a fixed width format that closely adheres to the rendered
        layout in the source pdf.

    """
    # characters with ordinals 14..31 are rendered as spaces
    ctrl_to_space = dict.fromkeys(range(14, 32), " ")
    lines: list[str] = []
    last_y_coord = 0
    for y_coord, line_data in ty_groups.items():
        if space_vertically and lines:
            fh = line_data[0]["font_height"]
            # a zero font height or weight gives no usable line pitch, so no
            # blank lines are inferred (and no division by zero occurs)
            blank_lines = 0 if fh == 0 or font_height_weight == 0 else (
                int(abs(y_coord - last_y_coord) / (fh * font_height_weight)) - 1
            )
            # a negative count extends by nothing
            lines.extend([""] * blank_lines)
        line = ""
        last_disp = 0.0
        for bt_op in line_data:
            # column implied by the x position; 0 when the width is unknown
            offset = int(bt_op["tx"] // char_width) if char_width else 0
            # pad only when this group starts after the previous one ended
            spaces = (offset - len(line)) * (ceil(last_disp) < int(bt_op["tx"]))
            line = f"{line}{' ' * spaces}{bt_op['text']}"
            last_disp = bt_op["displaced_tx"]
        # leading blank lines are dropped; later ones are kept for spacing
        if line.strip() or lines:
            lines.append(line.translate(ctrl_to_space))
            last_y_coord = y_coord
    return "\n".join(ln.rstrip() for ln in lines if space_vertically or ln.strip())