Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pdfplumber/table.py: 20%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import itertools
2from dataclasses import dataclass
3from operator import itemgetter
4from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Type, Union
6from . import utils
7from ._typing import T_bbox, T_num, T_obj, T_obj_iter, T_obj_list, T_point
# Default tolerances for snapping and joining edges; used as the defaults of
# `snap_edges`, `join_edge_group`, and the matching `TableSettings` fields.
DEFAULT_SNAP_TOLERANCE = 3
DEFAULT_JOIN_TOLERANCE = 3
# Default minimum number of aligned words needed before `words_to_edges_v` /
# `words_to_edges_h` infer an edge from text.
DEFAULT_MIN_WORDS_VERTICAL = 3
DEFAULT_MIN_WORDS_HORIZONTAL = 1

# Maps an intersection point to the edges that meet there, keyed by
# orientation ("v" / "h").
T_intersections = Dict[T_point, Dict[str, T_obj_list]]
# Table settings may be supplied as a `TableSettings` instance or a plain dict.
T_table_settings = Union["TableSettings", Dict[str, Any]]
17if TYPE_CHECKING: # pragma: nocover
18 from .page import Page
def snap_edges(
    edges: T_obj_list,
    x_tolerance: T_num = DEFAULT_SNAP_TOLERANCE,
    y_tolerance: T_num = DEFAULT_SNAP_TOLERANCE,
) -> T_obj_list:
    """
    Snap near-aligned edges onto a shared coordinate.

    Vertical edges are snapped on their "x0" attribute using `x_tolerance`,
    horizontal edges on their "top" attribute using `y_tolerance`. The
    snapping itself is delegated to `utils.snap_objects`.

    Returns the snapped vertical edges followed by the snapped horizontal
    edges.
    """
    vertical: T_obj_list = []
    horizontal: T_obj_list = []
    buckets: Dict[str, T_obj_list] = {"v": vertical, "h": horizontal}
    for edge in edges:
        # Any orientation other than "v" / "h" raises KeyError here.
        buckets[edge["orientation"]].append(edge)

    return utils.snap_objects(vertical, "x0", x_tolerance) + utils.snap_objects(
        horizontal, "top", y_tolerance
    )
def join_edge_group(
    edges: T_obj_iter, orientation: str, tolerance: T_num = DEFAULT_JOIN_TOLERANCE
) -> T_obj_list:
    """
    Given edges along the same infinite line, join those that overlap or
    are within `tolerance` of one another.

    :param edges: Iterable of edge objects sharing one line.
    :param orientation: "h" (extents measured on x0/x1) or "v" (extents
        measured on top/bottom).
    :param tolerance: Largest gap between two edges that still gets bridged.
    :returns: The joined edges, ordered by their starting coordinate. An
        empty input yields an empty list.
    :raises ValueError: If `orientation` is neither "v" nor "h".
    """
    if orientation == "h":
        min_prop, max_prop = "x0", "x1"
    elif orientation == "v":
        min_prop, max_prop = "top", "bottom"
    else:
        raise ValueError("Orientation must be 'v' or 'h'")

    sorted_edges = sorted(edges, key=itemgetter(min_prop))
    if not sorted_edges:
        # Nothing to join; previously this case raised IndexError below.
        return []

    joined = [sorted_edges[0]]
    for e in sorted_edges[1:]:
        last = joined[-1]
        if e[min_prop] <= (last[max_prop] + tolerance):
            if e[max_prop] > last[max_prop]:
                # Extend current edge to new extremity
                joined[-1] = utils.resize_object(last, max_prop, e[max_prop])
        else:
            # Edge is separate from previous edges
            joined.append(e)

    return joined
def merge_edges(
    edges: T_obj_list,
    snap_x_tolerance: T_num,
    snap_y_tolerance: T_num,
    join_x_tolerance: T_num,
    join_y_tolerance: T_num,
) -> T_obj_list:
    """
    Merge a list of edges into a more "seamless" list.

    Edges are first snapped with `snap_edges` (skipped when neither snap
    tolerance is positive), then grouped by orientation and position, and
    each group is joined with `join_edge_group`.
    """

    def group_key(edge: T_obj) -> Tuple[str, T_num]:
        # Horizontal edges group on their "top"; everything else is treated
        # as vertical and groups on "x0".
        if edge["orientation"] == "h":
            return ("h", edge["top"])
        return ("v", edge["x0"])

    if snap_x_tolerance > 0 or snap_y_tolerance > 0:
        edges = snap_edges(edges, snap_x_tolerance, snap_y_tolerance)

    merged: T_obj_list = []
    ordered = sorted(edges, key=group_key)
    for (orientation, _position), group in itertools.groupby(ordered, key=group_key):
        # Horizontal edges extend along x, so they use the x join tolerance.
        tolerance = join_x_tolerance if orientation == "h" else join_y_tolerance
        merged.extend(join_edge_group(group, orientation, tolerance))
    return merged
def words_to_edges_h(
    words: T_obj_list, word_threshold: int = DEFAULT_MIN_WORDS_HORIZONTAL
) -> T_obj_list:
    """
    Find (imaginary) horizontal lines that connect the tops
    of at least `word_threshold` words.

    Every qualifying cluster of words contributes two edges: one at the
    top and one at the bottom of the cluster's rect. All edges span from
    the smallest "x0" to the largest "x1" across the qualifying clusters.
    """
    clusters = utils.cluster_objects(words, itemgetter("top"), 1)
    rects = [
        utils.objects_to_rect(cluster)
        for cluster in clusters
        if len(cluster) >= word_threshold
    ]
    if not rects:
        return []

    left = min(rect["x0"] for rect in rects)
    right = max(rect["x1"] for rect in rects)

    def h_edge(y: T_num) -> T_obj:
        return {
            "x0": left,
            "x1": right,
            "top": y,
            "bottom": y,
            "width": right - left,
            "orientation": "h",
        }

    edges = []
    for rect in rects:
        # Top of text
        edges.append(h_edge(rect["top"]))
        # The 'bottom' line is added for each row as well. Some of these are
        # redundant with the next row's 'top' line, but this catches the
        # last row of every table.
        edges.append(h_edge(rect["bottom"]))
    return edges
def words_to_edges_v(
    words: T_obj_list, word_threshold: int = DEFAULT_MIN_WORDS_VERTICAL
) -> T_obj_list:
    """
    Find (imaginary) vertical lines that connect the left, right, or
    center of at least `word_threshold` words.

    One edge is produced at the "x0" of each retained cluster bbox, plus a
    closing edge at the largest "x1". All edges span from the smallest "top"
    to the largest "bottom" of the retained bboxes.
    """

    def midpoint(word: T_obj) -> T_num:
        return float(word["x0"] + word["x1"]) / 2

    # Words sharing a left edge, a right edge, or a centerpoint
    left_aligned = utils.cluster_objects(words, itemgetter("x0"), 1)
    right_aligned = utils.cluster_objects(words, itemgetter("x1"), 1)
    centered = utils.cluster_objects(words, midpoint, 1)

    # Largest clusters first, so they win when bboxes overlap below
    candidates = sorted(
        left_aligned + right_aligned + centered, key=lambda cluster: -len(cluster)
    )
    bboxes = [
        utils.objects_to_bbox(cluster)
        for cluster in candidates
        if len(cluster) >= word_threshold
    ]

    # Keep a bbox only if it overlaps none of those already kept
    kept: List[T_bbox] = []
    for bbox in bboxes:
        if not any(utils.get_bbox_overlap(bbox, other) for other in kept):
            kept.append(bbox)

    if not kept:
        return []

    rects = sorted(map(utils.bbox_to_rect, kept), key=itemgetter("x0"))
    right = max(rect["x1"] for rect in rects)
    top = min(rect["top"] for rect in rects)
    bottom = max(rect["bottom"] for rect in rects)

    def v_edge(x: T_num) -> T_obj:
        return {
            "x0": x,
            "x1": x,
            "top": top,
            "bottom": bottom,
            "height": bottom - top,
            "orientation": "v",
        }

    # One edge per rect's left side, then a final edge at the far right
    return [v_edge(rect["x0"]) for rect in rects] + [v_edge(right)]
def edges_to_intersections(
    edges: T_obj_list, x_tolerance: T_num = 1, y_tolerance: T_num = 1
) -> T_intersections:
    """
    Given a list of edges, return the points at which they intersect
    within the given tolerances.

    :param edges: Edge objects; each is classified by its "orientation"
        ("v" or "h"). Edges with any other orientation are ignored.
    :param x_tolerance: Slack allowed when testing whether a vertical edge's
        "x0" falls within a horizontal edge's x0..x1 span.
    :param y_tolerance: Slack allowed when testing whether a horizontal
        edge's "top" falls within a vertical edge's top..bottom span.
    :returns: A dict keyed by `(x0 of vertical edge, top of horizontal edge)`
        whose values hold the "v" and "h" edges meeting at that point.
    """
    intersections: T_intersections = {}
    v_edges = sorted(
        (e for e in edges if e["orientation"] == "v"), key=itemgetter("x0", "top")
    )
    # Sorted once up front; the order does not depend on the vertical edge
    # being examined, so there is no need to re-sort inside the loop.
    h_edges = sorted(
        (e for e in edges if e["orientation"] == "h"), key=itemgetter("top", "x0")
    )
    for v in v_edges:
        for h in h_edges:
            if (
                (v["top"] <= (h["top"] + y_tolerance))
                and (v["bottom"] >= (h["top"] - y_tolerance))
                and (v["x0"] >= (h["x0"] - x_tolerance))
                and (v["x0"] <= (h["x1"] + x_tolerance))
            ):
                vertex = (v["x0"], h["top"])
                if vertex not in intersections:
                    intersections[vertex] = {"v": [], "h": []}
                intersections[vertex]["v"].append(v)
                intersections[vertex]["h"].append(h)
    return intersections
def intersections_to_cells(intersections: T_intersections) -> List[T_bbox]:
    """
    Given a collection of points (`intersections`), return all rectangular
    "cells" that those points describe.

    `intersections` should be a dictionary with (x0, top) tuples as keys,
    and, as values, dicts holding the "v" and "h" edge objects that touch
    the intersection.
    """

    def bboxes_at(point: T_point, orientation: str) -> Set[T_bbox]:
        return {utils.obj_to_bbox(e) for e in intersections[point][orientation]}

    def edge_connects(p1: T_point, p2: T_point) -> bool:
        # Two points are connected when they are aligned on an axis and at
        # least one edge of the matching orientation touches both of them.
        if p1[0] == p2[0]:
            if bboxes_at(p1, "v") & bboxes_at(p2, "v"):
                return True
        if p1[1] == p2[1]:
            if bboxes_at(p1, "h") & bboxes_at(p2, "h"):
                return True
        return False

    points = sorted(intersections.keys())

    def smallest_cell_from(index: int) -> Optional[T_bbox]:
        origin = points[index]
        later = points[index + 1 :]
        # Candidates directly below and directly right of the origin; for
        # the last point both lists are empty and no cell is found.
        below = [p for p in later if p[0] == origin[0]]
        right = [p for p in later if p[1] == origin[1]]
        for below_pt in below:
            if not edge_connects(origin, below_pt):
                continue
            for right_pt in right:
                if not edge_connects(origin, right_pt):
                    continue
                corner = (right_pt[0], below_pt[1])
                if (
                    corner in intersections
                    and edge_connects(corner, right_pt)
                    and edge_connects(corner, below_pt)
                ):
                    return (origin[0], origin[1], corner[0], corner[1])
        return None

    found = (smallest_cell_from(i) for i in range(len(points)))
    return [cell for cell in found if cell is not None]
def cells_to_tables(cells: List[T_bbox]) -> List[List[T_bbox]]:
    """
    Given a list of bounding boxes (`cells`), return a list of tables that
    hold those cells most simply (and contiguously).

    Cells are grouped when they share at least one corner with a cell
    already in the group. Groups consisting of a single cell are dropped,
    and the remaining tables are ordered by their topmost-then-leftmost
    cell coordinate.
    """

    def corners_of(bbox: T_bbox) -> Tuple[T_point, T_point, T_point, T_point]:
        x0, top, x1, bottom = bbox
        return ((x0, top), (x0, bottom), (x1, top), (x1, bottom))

    pending = list(cells)
    group_corners: Set[T_point] = set()
    group_cells: List[T_bbox] = []
    tables = []

    while pending:
        size_before = len(group_cells)
        for cell in list(pending):
            corners = corners_of(cell)
            # An empty group accepts the first cell unconditionally; after
            # that a cell must touch the group on at least one corner.
            if not group_cells or any(c in group_corners for c in corners):
                group_corners.update(corners)
                group_cells.append(cell)
                pending.remove(cell)

        # A full pass that added nothing means the group is complete
        if len(group_cells) == size_before:
            tables.append(list(group_cells))
            group_corners.clear()
            group_cells.clear()

    # Store the group still in progress when the cells ran out
    if group_cells:
        tables.append(list(group_cells))

    # Top-to-bottom, then left-to-right, by each table's first coordinate
    ordered = sorted(tables, key=lambda t: min((c[1], c[0]) for c in t))
    return [table for table in ordered if len(table) > 1]
class CellGroup(object):
    """
    A sequence of cells (some of which may be `None`) together with the
    bounding box that encloses all of the non-empty ones.
    """

    def __init__(self, cells: List[Optional[T_bbox]]):
        self.cells = cells
        # Placeholder (falsy) entries do not contribute to the bbox
        present = [cell for cell in cells if cell]
        self.bbox = (
            min(cell[0] for cell in present),
            min(cell[1] for cell in present),
            max(cell[2] for cell in present),
            max(cell[3] for cell in present),
        )
class Row(CellGroup):
    """A `CellGroup` representing one row of a table (see `Table.rows`)."""
class Column(CellGroup):
    """A `CellGroup` representing one column of a table (see `Table.columns`)."""
class Table(object):
    """A table on a page, described by the bounding boxes of its cells."""

    def __init__(self, page: "Page", cells: List[T_bbox]):
        self.page = page
        self.cells = cells

    @property
    def bbox(self) -> T_bbox:
        """The smallest bounding box enclosing every cell of the table."""
        cells = self.cells
        return (
            min(cell[0] for cell in cells),
            min(cell[1] for cell in cells),
            max(cell[2] for cell in cells),
            max(cell[3] for cell in cells),
        )

    def _get_rows_or_cols(self, kind: Type[CellGroup]) -> List[CellGroup]:
        """
        Arrange the cells into groups of type `kind` (`Row` or `Column`).

        Each group has one slot per distinct position along the group's
        axis; slots with no matching cell hold `None`.
        """
        axis = 0 if kind is Row else 1
        antiaxis = int(not axis)

        # Order by top/x0 first, then by x0/top
        ordered = sorted(self.cells, key=itemgetter(antiaxis, axis))

        # Every distinct x0/top found among the cells, ascending
        positions = sorted({cell[axis] for cell in self.cells})

        groups = []
        for _, members in itertools.groupby(ordered, itemgetter(antiaxis)):
            by_position = {cell[axis]: cell for cell in members}
            groups.append(kind([by_position.get(pos) for pos in positions]))
        return groups

    @property
    def rows(self) -> List[CellGroup]:
        """The table's cells arranged as `Row` groups."""
        return self._get_rows_or_cols(Row)

    @property
    def columns(self) -> List[CellGroup]:
        """The table's cells arranged as `Column` groups."""
        return self._get_rows_or_cols(Column)

    def extract(self, **kwargs: Any) -> List[List[Optional[str]]]:
        """
        Return the table's text as a list of rows, each a list of cell
        values: `None` for a missing cell, "" for a cell containing no
        characters, otherwise the result of `utils.extract_text`.

        `kwargs` are forwarded to `utils.extract_text`. When "layout" is
        among them, the layout width, height and bbox are set from each cell.
        """

        def midpoint_in_bbox(char: T_obj, bbox: T_bbox) -> bool:
            v_mid = (char["top"] + char["bottom"]) / 2
            h_mid = (char["x0"] + char["x1"]) / 2
            x0, top, x1, bottom = bbox
            inside_h = (h_mid >= x0) and (h_mid < x1)
            inside_v = (v_mid >= top) and (v_mid < bottom)
            return bool(inside_h and inside_v)

        page_chars = self.page.chars
        table_text = []

        for row in self.rows:
            # Narrow to the row first so each cell scans fewer characters
            row_chars = [ch for ch in page_chars if midpoint_in_bbox(ch, row.bbox)]
            row_text: List[Optional[str]] = []

            for cell in row.cells:
                if cell is None:
                    row_text.append(None)
                    continue

                cell_chars = [ch for ch in row_chars if midpoint_in_bbox(ch, cell)]
                if not cell_chars:
                    row_text.append("")
                    continue

                if "layout" in kwargs:
                    kwargs["layout_width"] = cell[2] - cell[0]
                    kwargs["layout_height"] = cell[3] - cell[1]
                    kwargs["layout_bbox"] = cell
                row_text.append(utils.extract_text(cell_chars, **kwargs))

            table_text.append(row_text)

        return table_text
# Accepted values for `TableSettings.vertical_strategy` / `horizontal_strategy`.
TABLE_STRATEGIES = ["lines", "lines_strict", "text", "explicit"]
# `TableSettings` fields that `__post_init__` rejects when negative.
NON_NEGATIVE_SETTINGS = [
    "snap_tolerance",
    "snap_x_tolerance",
    "snap_y_tolerance",
    "join_tolerance",
    "join_x_tolerance",
    "join_y_tolerance",
    "edge_min_length",
    "edge_min_length_prefilter",
    "min_words_vertical",
    "min_words_horizontal",
    "intersection_tolerance",
    "intersection_x_tolerance",
    "intersection_y_tolerance",
]
class UnsetFloat(float):
    """A `float` subclass whose instances can serve as a distinct "unset" marker."""


# Sentinel default for per-axis tolerance settings; it is recognised by
# identity (`is UNSET`), so an explicit 0 is not mistaken for "unset".
UNSET = UnsetFloat(0)
@dataclass
class TableSettings:
    """
    Settings controlling table detection.

    Per-axis tolerances left at `UNSET` fall back to their general
    counterpart (e.g. `snap_x_tolerance` falls back to `snap_tolerance`).
    """

    vertical_strategy: str = "lines"
    horizontal_strategy: str = "lines"
    explicit_vertical_lines: Optional[List[Union[T_obj, T_num]]] = None
    explicit_horizontal_lines: Optional[List[Union[T_obj, T_num]]] = None
    snap_tolerance: T_num = DEFAULT_SNAP_TOLERANCE
    snap_x_tolerance: T_num = UNSET
    snap_y_tolerance: T_num = UNSET
    join_tolerance: T_num = DEFAULT_JOIN_TOLERANCE
    join_x_tolerance: T_num = UNSET
    join_y_tolerance: T_num = UNSET
    edge_min_length: T_num = 3
    edge_min_length_prefilter: T_num = 1
    min_words_vertical: int = DEFAULT_MIN_WORDS_VERTICAL
    min_words_horizontal: int = DEFAULT_MIN_WORDS_HORIZONTAL
    intersection_tolerance: T_num = 3
    intersection_x_tolerance: T_num = UNSET
    intersection_y_tolerance: T_num = UNSET
    text_settings: Optional[Dict[str, Any]] = None

    def __post_init__(self) -> None:
        """Validate the settings and fill in derived values.

        * Rejects negative values for every setting in
          ``NON_NEGATIVE_SETTINGS``.
        * Rejects strategies not listed in ``TABLE_STRATEGIES``.
        * Ensures ``text_settings`` has ``x_tolerance`` and ``y_tolerance``,
          taking them from a legacy ``tolerance`` key (or 3) when absent,
          and drops the ``tolerance`` key.
        * Replaces per-axis tolerances still set to ``UNSET`` with their
          general counterpart.

        :raises ValueError: When a setting is negative or a strategy is not
            recognised.
        """

        for setting in NON_NEGATIVE_SETTINGS:
            if (getattr(self, setting) or 0) < 0:
                raise ValueError(f"Table setting '{setting}' cannot be negative")

        for orientation in ["horizontal", "vertical"]:
            strategy = getattr(self, orientation + "_strategy")
            if strategy not in TABLE_STRATEGIES:
                options = ",".join(TABLE_STRATEGIES)
                raise ValueError(
                    f"{orientation}_strategy must be one of {{{options}}}"
                )

        # Work on a private copy so that the dict passed in by the caller
        # is not modified by the normalisation below.
        self.text_settings = dict(self.text_settings or {})

        # This next section is for backwards compatibility
        for attr in ["x_tolerance", "y_tolerance"]:
            if attr not in self.text_settings:
                self.text_settings[attr] = self.text_settings.get("tolerance", 3)

        if "tolerance" in self.text_settings:
            del self.text_settings["tolerance"]
        # End of that section

        for attr, fallback in [
            ("snap_x_tolerance", "snap_tolerance"),
            ("snap_y_tolerance", "snap_tolerance"),
            ("join_x_tolerance", "join_tolerance"),
            ("join_y_tolerance", "join_tolerance"),
            ("intersection_x_tolerance", "intersection_tolerance"),
            ("intersection_y_tolerance", "intersection_tolerance"),
        ]:
            # Identity check: an explicit 0 must not trigger the fallback
            if getattr(self, attr) is UNSET:
                setattr(self, attr, getattr(self, fallback))

    @classmethod
    def resolve(cls, settings: Optional[T_table_settings]) -> "TableSettings":
        """
        Turn `settings` into a `TableSettings` instance.

        `None` yields the defaults and an existing instance is returned
        as-is. For a dict, keys starting with "text_" are collected (prefix
        removed) into `text_settings`, and the rest are passed to the
        constructor as keyword arguments.

        :raises ValueError: When `settings` is none of the above.
        """
        if settings is None:
            return cls()
        elif isinstance(settings, cls):
            return settings
        elif isinstance(settings, dict):
            core_settings = {}
            text_settings = {}
            for k, v in settings.items():
                if k[:5] == "text_":
                    text_settings[k[5:]] = v
                else:
                    core_settings[k] = v
            core_settings["text_settings"] = text_settings
            return cls(**core_settings)
        else:
            raise ValueError(f"Cannot resolve settings: {settings}")
class TableFinder(object):
    """
    Given a PDF page, find plausible table structures.

    Largely borrowed from Anssi Nurminen's master's thesis:
    http://dspace.cc.tut.fi/dpub/bitstream/handle/123456789/21520/Nurminen.pdf?sequence=3

    ... and inspired by Tabula:
    https://github.com/tabulapdf/tabula-extractor/issues/16
    """

    def __init__(self, page: "Page", settings: Optional[T_table_settings] = None):
        """
        Run the full pipeline on `page`: edges -> intersections -> cells ->
        tables. Each intermediate result is kept as an attribute.
        """
        self.page = page
        self.settings = TableSettings.resolve(settings)
        self.edges = self.get_edges()
        self.intersections = edges_to_intersections(
            self.edges,
            self.settings.intersection_x_tolerance,
            self.settings.intersection_y_tolerance,
        )
        self.cells = intersections_to_cells(self.intersections)
        self.tables = [
            Table(self.page, cell_group) for cell_group in cells_to_tables(self.cells)
        ]

    def get_edges(self) -> T_obj_list:
        """
        Collect the edges used for table detection, according to the
        configured vertical and horizontal strategies, then merge them and
        drop those shorter than `edge_min_length`.

        Explicit lines are always added to the edges of their orientation,
        whatever the strategy.

        :raises ValueError: When a strategy is "explicit" but fewer than two
            explicit lines were given for that orientation (including none
            at all).
        """
        settings = self.settings

        for orientation in ["vertical", "horizontal"]:
            strategy = getattr(settings, orientation + "_strategy")
            if strategy == "explicit":
                lines = getattr(settings, "explicit_" + orientation + "_lines")
                # `lines` defaults to None; treat that like an empty list so
                # the caller gets the ValueError below, not a TypeError.
                if len(lines or []) < 2:
                    raise ValueError(
                        f"If {orientation}_strategy == 'explicit', "
                        f"explicit_{orientation}_lines "
                        f"must be specified as a list/tuple of two or more "
                        f"floats/ints."
                    )

        v_strat = settings.vertical_strategy
        h_strat = settings.horizontal_strategy

        # Words are only needed (and only extracted) for the "text" strategy
        if v_strat == "text" or h_strat == "text":
            words = self.page.extract_words(**(settings.text_settings or {}))

        v_explicit = []
        for desc in settings.explicit_vertical_lines or []:
            if isinstance(desc, dict):
                # An object contributes only its vertical edges
                for e in utils.obj_to_edges(desc):
                    if e["orientation"] == "v":
                        v_explicit.append(e)
            else:
                # A bare number is an x position spanning the page's height
                v_explicit.append(
                    {
                        "x0": desc,
                        "x1": desc,
                        "top": self.page.bbox[1],
                        "bottom": self.page.bbox[3],
                        "height": self.page.bbox[3] - self.page.bbox[1],
                        "orientation": "v",
                    }
                )

        if v_strat == "lines":
            v_base = utils.filter_edges(
                self.page.edges, "v", min_length=settings.edge_min_length_prefilter
            )
        elif v_strat == "lines_strict":
            v_base = utils.filter_edges(
                self.page.edges,
                "v",
                edge_type="line",
                min_length=settings.edge_min_length_prefilter,
            )
        elif v_strat == "text":
            v_base = words_to_edges_v(words, word_threshold=settings.min_words_vertical)
        elif v_strat == "explicit":
            v_base = []

        v = v_base + v_explicit

        h_explicit = []
        for desc in settings.explicit_horizontal_lines or []:
            if isinstance(desc, dict):
                # An object contributes only its horizontal edges
                for e in utils.obj_to_edges(desc):
                    if e["orientation"] == "h":
                        h_explicit.append(e)
            else:
                # A bare number is a y position spanning the page's width
                h_explicit.append(
                    {
                        "x0": self.page.bbox[0],
                        "x1": self.page.bbox[2],
                        "width": self.page.bbox[2] - self.page.bbox[0],
                        "top": desc,
                        "bottom": desc,
                        "orientation": "h",
                    }
                )

        if h_strat == "lines":
            h_base = utils.filter_edges(
                self.page.edges, "h", min_length=settings.edge_min_length_prefilter
            )
        elif h_strat == "lines_strict":
            h_base = utils.filter_edges(
                self.page.edges,
                "h",
                edge_type="line",
                min_length=settings.edge_min_length_prefilter,
            )
        elif h_strat == "text":
            h_base = words_to_edges_h(
                words, word_threshold=settings.min_words_horizontal
            )
        elif h_strat == "explicit":
            h_base = []

        h = h_base + h_explicit

        edges = list(v) + list(h)

        edges = merge_edges(
            edges,
            snap_x_tolerance=settings.snap_x_tolerance,
            snap_y_tolerance=settings.snap_y_tolerance,
            join_x_tolerance=settings.join_x_tolerance,
            join_y_tolerance=settings.join_y_tolerance,
        )

        return utils.filter_edges(edges, min_length=settings.edge_min_length)