1"""Extract PDF text preserving the layout of the source PDF"""
2
3from collections.abc import Iterator
4from itertools import groupby
5from math import ceil
6from pathlib import Path
7from typing import Any, Literal, Optional, TypedDict
8
9from ..._font import Font
10from ..._utils import logger_warning
11from .. import LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS
12from ._text_state_manager import TextStateManager
13from ._text_state_params import TextStateParams
14
# Upper bound on the number of space characters inserted in one go when
# horizontal gaps are converted to padding; larger values are clamped to this
# limit and a warning is logged.
WHITESPACE_LIMIT = 10_000
# Upper bound on the number of blank lines inserted between two rendered lines
# when vertical spacing is reproduced; larger values are clamped to this limit
# and a warning is logged.
NEWLINE_LIMIT = 1_000
17
18
class BTGroup(TypedDict):
    """
    Dict describing a line of text rendered within a BT/ET operator pair.
    If multiple text show operations render text on the same line, the text
    will be combined into a single BTGroup dict.

    Instances are built by bt_group() from a TextStateParams instance plus the
    combined text and the final displaced x coordinate.

    Keys:
        tx: x coordinate of first character in BTGroup
        ty: y coordinate of first character in BTGroup
        font_size: nominal font size
        font_height: effective font height
        text: rendered text
        displaced_tx: x coordinate of last character in BTGroup
        flip_sort: -1 if page is upside down, else 1
    """

    # x coordinate of the first character in the group
    tx: float
    # y coordinate of the first character in the group
    ty: float
    # nominal font size
    font_size: float
    # effective font height; used as the tolerance when comparing y offsets
    font_height: float
    # rendered text of the whole group
    text: str
    # x coordinate reached after the last character in the group
    displaced_tx: float
    # multiplied with ty when sorting/grouping so flipped pages order correctly
    flip_sort: Literal[-1, 1]
42
43
def bt_group(tj_op: TextStateParams, rendered_text: str, displaced_tx: float) -> BTGroup:
    """
    Build a BTGroup from a text show operation and its accumulated text.

    Position and font metrics are copied from ``tj_op``; the text and the final
    displaced x coordinate are supplied by the caller.

    Args:
        tj_op (TextStateParams): text show operation that starts the group
        rendered_text (str): rendered text of the whole group
        displaced_tx (float): x coordinate of last character in BTGroup

    Returns:
        BTGroup: dict describing the rendered line of text

    """
    # flip_sort is -1 for vertically flipped text so sorting by
    # ty * flip_sort keeps lines in reading order.
    if tj_op.flip_vertical:
        sort_direction: Literal[-1, 1] = -1
    else:
        sort_direction = 1

    group = BTGroup(
        tx=tj_op.tx,
        ty=tj_op.ty,
        font_size=tj_op.font_size,
        font_height=tj_op.font_height,
        text=rendered_text,
        displaced_tx=displaced_tx,
        flip_sort=sort_direction,
    )
    return group
64
65
def _bt_groups_from_tj_ops(
    tj_ops: list[TextStateParams], strip_rotated: bool
) -> list[BTGroup]:
    """
    Combine the text show operations of one BT/ET pair into per-line BTGroups.

    Args:
        tj_ops: text show operations collected between BT and ET, in stream order
        strip_rotated: skip operations flagged as rotated

    Returns:
        list of BTGroup dicts, one per detected line / text run

    """
    groups: list[BTGroup] = []
    if not tj_ops:
        return groups

    _text = ""
    bt_idx = 0  # idx of first tj in this bt group
    last_displaced_tx = tj_ops[bt_idx].displaced_tx
    last_ty = tj_ops[bt_idx].ty
    for _idx, _tj in enumerate(tj_ops):  # ... build text from new Tj operators
        if strip_rotated and _tj.rotated:
            continue
        if not _tj.font.interpretable:  # warning is logged by text_show_operations()
            continue
        # if the y position of the text is greater than the font height, assume
        # the text is on a new line and start a new group
        if abs(_tj.ty - last_ty) > _tj.font_height:
            if _text.strip():
                groups.append(bt_group(tj_ops[bt_idx], _text, last_displaced_tx))
            bt_idx = _idx
            _text = ""

        # if the x position of the text is less than the last x position by
        # more than 5 spaces widths, assume the text order should be flipped
        # and start a new group
        if (
            last_displaced_tx - _tj.tx
            > _tj.space_tx * LAYOUT_NEW_BT_GROUP_SPACE_WIDTHS
        ):
            if _text.strip():
                groups.append(bt_group(tj_ops[bt_idx], _text, last_displaced_tx))
            bt_idx = _idx
            last_displaced_tx = _tj.displaced_tx
            _text = ""

        # calculate excess x translation based on ending tx of previous Tj.
        # multiply by bool (_idx != bt_idx) to ensure spaces aren't double
        # applied to the first tj of a BTGroup in fixed_width_page().
        excess_tx = round(_tj.tx - last_displaced_tx, 3) * (_idx != bt_idx)
        # space_tx could be 0 if either Tz or font_size was 0 for this _tj.
        spaces = int(excess_tx // _tj.space_tx) if _tj.space_tx else 0
        if spaces > WHITESPACE_LIMIT:
            logger_warning(
                "Limiting excessive whitespace from %(actual)d to %(limit)d characters.",
                actual=spaces, limit=WHITESPACE_LIMIT, source=__name__
            )
            spaces = WHITESPACE_LIMIT
        new_text = f'{" " * spaces}{_tj.text}'

        last_ty = _tj.ty
        _text = f"{_text}{new_text}"
        last_displaced_tx = _tj.displaced_tx
    if _text:
        groups.append(bt_group(tj_ops[bt_idx], _text, last_displaced_tx))
    return groups


def recurse_to_target_op(
    ops: Iterator[tuple[list[Any], bytes]],
    text_state_mgr: TextStateManager,
    end_target: Literal[b"Q", b"ET"],
    fonts: dict[str, Font],
    strip_rotated: bool = True,
) -> tuple[list[BTGroup], list[TextStateParams]]:
    """
    Recurse operators between BT/ET and/or q/Q operators managing the transform
    stack and capturing text positioning and rendering data.

    ``ops`` is consumed until ``end_target`` is found; every nested ``q`` or
    ``BT`` operator triggers a recursive call on the same iterator. If the
    iterator is exhausted first, a warning about unbalanced operations is
    logged and whatever was collected so far is returned.

    Args:
        ops: iterator of operators in content stream
        text_state_mgr: a TextStateManager instance
        end_target: Either b"Q" (ends b"q" op) or b"ET" (ends b"BT" op)
        fonts: font dictionary as returned by PageObject._layout_mode_fonts()
        strip_rotated: skip text show operations flagged as rotated when
            building BTGroups. Defaults to True.

    Returns:
        tuple: list of BTGroup dicts + list of TextStateParams dataclass instances.

    """
    # 1 entry per line of text rendered within each BT/ET operation.
    bt_groups: list[BTGroup] = []

    # 1 entry per text show operator (Tj/TJ/'/")
    tj_ops: list[TextStateParams] = []

    if end_target == b"Q":
        # add new q level. cm's added at this level will be popped at next b'Q'
        text_state_mgr.add_q()

    for operands, op in ops:
        # The loop is broken by the end target, or exits normally when there are no more ops.
        if op == end_target:
            if op == b"Q":
                text_state_mgr.remove_q()
            if op == b"ET":
                bt_groups.extend(_bt_groups_from_tj_ops(tj_ops, strip_rotated))
                # Reset the text matrix on every ET, including text objects
                # that rendered nothing: a text object always starts from a
                # fresh text matrix, so Td/Tm operators from an empty BT/ET
                # pair must not carry over into the next one.
                text_state_mgr.reset_tm()
            break
        if op == b"q":
            bts, tjs = recurse_to_target_op(
                ops, text_state_mgr, b"Q", fonts, strip_rotated
            )
            bt_groups.extend(bts)
            tj_ops.extend(tjs)
        elif op == b"cm":
            text_state_mgr.add_cm(*operands)
        elif op == b"BT":
            bts, tjs = recurse_to_target_op(
                ops, text_state_mgr, b"ET", fonts, strip_rotated
            )
            bt_groups.extend(bts)
            tj_ops.extend(tjs)
        elif op == b"Tj":
            tj_ops.append(text_state_mgr.text_state_params(operands[0]))
        elif op == b"TJ":
            _tj = text_state_mgr.text_state_params()
            for tj_op in operands[0]:
                if isinstance(tj_op, bytes):
                    _tj = text_state_mgr.text_state_params(tj_op)
                    tj_ops.append(_tj)
                else:
                    # non-bytes array entries are positioning adjustments
                    text_state_mgr.add_trm(_tj.displacement_matrix(td_offset=tj_op))
        elif op == b"'":
            text_state_mgr.reset_trm()
            text_state_mgr.add_tm([0, -text_state_mgr.TL])
            tj_ops.append(text_state_mgr.text_state_params(operands[0]))
        elif op == b'"':
            text_state_mgr.reset_trm()
            text_state_mgr.set_state_param(b"Tw", operands[0])
            text_state_mgr.set_state_param(b"Tc", operands[1])
            text_state_mgr.add_tm([0, -text_state_mgr.TL])
            tj_ops.append(text_state_mgr.text_state_params(operands[2]))
        elif op in (b"Td", b"Tm", b"TD", b"T*"):
            text_state_mgr.reset_trm()
            if op == b"Tm":
                text_state_mgr.reset_tm()
            elif op == b"TD":
                text_state_mgr.set_state_param(b"TL", -operands[1])
            elif op == b"T*":
                operands = [0, -text_state_mgr.TL]
            text_state_mgr.add_tm(operands)
        elif op == b"Tf":
            text_state_mgr.set_font(fonts[operands[0]], operands[1])
        else:  # handle Tc, Tw, Tz, TL, and Ts operators
            text_state_mgr.set_state_param(op, operands)
    else:
        logger_warning(
            "Unbalanced target operations, expected %(end_target)r.",
            source=__name__,
            end_target=end_target,
        )
    return bt_groups, tj_ops
216
217
def y_coordinate_groups(
    bt_groups: list[BTGroup], debug_path: Optional[Path] = None
) -> dict[int, list[BTGroup]]:
    """
    Group text operations by rendered y coordinate, i.e. the line number.

    ``bt_groups`` must already be ordered so that entries sharing the same
    integer y coordinate are adjacent (as returned by text_show_operations()),
    because grouping is done with itertools.groupby. An empty input yields an
    empty dict.

    Args:
        bt_groups: list of dicts as returned by text_show_operations()
        debug_path (Path, optional): Path to a directory for saving debug output.

    Returns:
        Dict[int, List[BTGroup]]: dict of lists of text rendered by each BT operator
            keyed by y coordinate

    """
    ty_groups = {
        ty: sorted(grp, key=lambda x: x["tx"])
        for ty, grp in groupby(
            bt_groups, key=lambda bt_grp: int(bt_grp["ty"] * bt_grp["flip_sort"])
        )
    }
    # Nothing to merge for an empty page; without this guard next() below
    # would raise StopIteration.
    if ty_groups:
        # combine groups whose y coordinates differ by less than the effective font height
        # (accounts for mixed fonts and other minor oddities)
        last_ty = next(iter(ty_groups))
        last_txs = {int(_t["tx"]) for _t in ty_groups[last_ty] if _t["text"].strip()}
        # iterate over a snapshot of the keys because merged groups are popped
        for ty in list(ty_groups)[1:]:
            fsz = min(ty_groups[_y][0]["font_height"] for _y in (ty, last_ty))
            txs = {int(_t["tx"]) for _t in ty_groups[ty] if _t["text"].strip()}
            # prevent merge if both groups are rendering in the same x position.
            no_text_overlap = not (txs & last_txs)
            offset_less_than_font_height = abs(ty - last_ty) < fsz
            if no_text_overlap and offset_less_than_font_height:
                ty_groups[last_ty] = sorted(
                    ty_groups.pop(ty) + ty_groups[last_ty], key=lambda x: x["tx"]
                )
                last_txs |= txs
            else:
                last_ty = ty
                last_txs = txs
    if debug_path:  # pragma: no cover
        import json  # noqa: PLC0415

        debug_path.joinpath("bt_groups.json").write_text(
            json.dumps(ty_groups, indent=2, default=str), "utf-8"
        )
    return ty_groups
264
265
def text_show_operations(
    ops: Iterator[tuple[list[Any], bytes]],
    fonts: dict[str, Font],
    strip_rotated: bool = True,
    debug_path: Optional[Path] = None,
) -> list[BTGroup]:
    """
    Extract text from BT/ET operator pairs.

    Walks the content stream, delegating every ``BT`` or ``q`` operator to
    recurse_to_target_op(), then left-aligns and sorts the collected BTGroups.

    Args:
        ops (Iterator[Tuple[List, bytes]]): iterator of operators in content stream
        fonts (Dict[str, Font]): font dictionary
        strip_rotated: Removes text if rotated w.r.t. to the page. Defaults to True.
        debug_path (Path, optional): Path to a directory for saving debug output.

    Returns:
        List[BTGroup]: list of dicts of text rendered by each BT operator

    """
    manager = TextStateManager()  # transformation stack manager
    groups: list[BTGroup] = []  # one dict per rendered line of text
    show_ops: list[TextStateParams] = []  # Tj/TJ operator data
    for operands, op in ops:
        if op == b"Tf":
            manager.set_font(fonts[operands[0]], operands[1])
            continue
        if op not in (b"BT", b"q"):
            # set Tc, Tw, Tz, TL, and Ts if required. ignores all other ops
            manager.set_state_param(op, operands)
            continue
        closing_op: Literal[b"Q", b"ET"] = b"ET" if op == b"BT" else b"Q"
        # the recursive call consumes the shared iterator up to closing_op
        nested_groups, nested_ops = recurse_to_target_op(
            ops, manager, closing_op, fonts, strip_rotated
        )
        groups.extend(nested_groups)
        show_ops.extend(nested_ops)

    if any(tj.rotated for tj in show_ops):
        rotation_message = (
            "Rotated text discovered. Output will be incomplete."
            if strip_rotated
            else "Rotated text discovered. Layout will be degraded."
        )
        logger_warning(rotation_message, source=__name__)
    if any(not tj.font.interpretable for tj in show_ops):
        logger_warning(
            "PDF contains an uninterpretable font. Output will be incomplete.", source=__name__
        )

    # left align the data, i.e. decrement all tx values by min(tx)
    left_edge = min((grp["tx"] for grp in groups), default=0.0)
    ordered = sorted(
        groups, key=lambda grp: (grp["ty"] * grp["flip_sort"], -grp["tx"]), reverse=True
    )
    groups = [
        dict(grp, tx=grp["tx"] - left_edge, displaced_tx=grp["displaced_tx"] - left_edge)  # type: ignore[misc]
        for grp in ordered
    ]

    if debug_path:  # pragma: no cover
        import json  # noqa: PLC0415

        bts_dump = json.dumps(groups, indent=2, default=str)
        debug_path.joinpath("bts.json").write_text(bts_dump, "utf-8")
        tjs_dump = json.dumps(
            show_ops, indent=2, default=lambda x: getattr(x, "to_dict", str)(x)
        )
        debug_path.joinpath("tjs.json").write_text(tjs_dump, "utf-8")
    return groups
336
337
def fixed_char_width(bt_groups: list[BTGroup], scale_weight: float = 1.25) -> float:
    """
    Calculate average character width weighted by the length of the rendered
    text in each sample for conversion to fixed-width layout.

    Groups whose text is empty carry zero weight and are ignored instead of
    causing a division by zero.

    Args:
        bt_groups (List[BTGroup]): List of dicts of text rendered by each
            BT operator
        scale_weight (float): multiplier applied to each text length before it
            is used as divisor and weight. Defaults to 1.25.

    Returns:
        float: fixed character width

    Raises:
        ZeroDivisionError: if no group contributes a non-zero weight, e.g.
            ``bt_groups`` is empty.

    """
    char_widths = []
    for _bt in bt_groups:
        _len = len(_bt["text"]) * scale_weight
        if not _len:
            # zero-weight sample: contributes nothing to the weighted mean
            continue
        char_widths.append(((_bt["displaced_tx"] - _bt["tx"]) / _len, _len))
    return sum(_w * _l for _w, _l in char_widths) / sum(_l for _, _l in char_widths)
356
357
def fixed_width_page(
    ty_groups: dict[int, list[BTGroup]], char_width: float, space_vertically: bool, font_height_weight: float
) -> str:
    """
    Render y-grouped text operations as fixed-width page text.

    Each entry of ``ty_groups`` becomes one output line; every BTGroup is
    placed at the column derived from its x coordinate and ``char_width``.

    Args:
        ty_groups: dict of text show ops as returned by y_coordinate_groups()
        char_width: fixed character width
        space_vertically: include blank lines inferred from y distance + font height.
        font_height_weight: multiplier for font height when calculating blank lines.

    Returns:
        str: page text in a fixed width format that closely adheres to the rendered
            layout in the source pdf.

    """
    # characters with code points 14..31 are replaced by a space in the output
    control_char_map = str.maketrans({code: " " for code in range(14, 32)})
    rendered: list[str] = []
    prev_y = 0
    for y_coord, bt_ops in ty_groups.items():
        if space_vertically and rendered:
            font_height = bt_ops[0]["font_height"]
            if font_height == 0:
                gap = 0
            else:
                gap = int(abs(y_coord - prev_y) / (font_height * font_height_weight)) - 1
            if gap > NEWLINE_LIMIT:
                logger_warning(
                    "Limiting excessive newlines from %(actual)d to %(limit)d.",
                    actual=gap, limit=NEWLINE_LIMIT, source=__name__
                )
                gap = NEWLINE_LIMIT
            # a non-positive gap adds nothing
            rendered.extend([""] * gap)

        pieces: list[str] = []  # fragments of the current line, joined once at the end
        width = 0  # number of characters accumulated in pieces
        prev_end = 0.0
        for bt_op in bt_ops:
            start_x = bt_op["tx"]
            pad = int(start_x // char_width) - width
            # pad only if this group starts to the right of where the previous one ended
            if pad > 0 and ceil(prev_end) < int(start_x):
                if pad > WHITESPACE_LIMIT:
                    logger_warning(
                        "Limiting excessive whitespace from %(actual)d to %(limit)d characters.",
                        actual=pad, limit=WHITESPACE_LIMIT, source=__name__
                    )
                    pad = WHITESPACE_LIMIT
                pieces.append(" " * pad)
                width += pad

            cleaned = bt_op["text"].translate(control_char_map)
            pieces.append(cleaned)
            width += len(cleaned)
            prev_end = bt_op["displaced_tx"]

        line = "".join(pieces).rstrip()
        # blank lines are kept only when spacing vertically and output has started
        if line.strip() or (space_vertically and rendered):
            rendered.append(line)

        prev_y = y_coord

    return "\n".join(rendered)