Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pdfplumber/pdf.py: 72%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

120 statements  

1import itertools 

2import logging 

3import pathlib 

4from io import BufferedReader, BytesIO 

5from types import TracebackType 

6from typing import Any, Dict, Generator, List, Literal, Optional, Tuple, Type, Union 

7 

8from pdfminer.layout import LAParams 

9from pdfminer.pdfdocument import PDFDocument 

10from pdfminer.pdfinterp import PDFResourceManager 

11from pdfminer.pdfpage import PDFPage 

12from pdfminer.pdfparser import PDFParser 

13 

14from ._typing import T_num, T_obj_list 

15from .container import Container 

16from .page import Page 

17from .repair import T_repair_setting, _repair 

18from .structure import PDFStructTree, StructTreeMissing 

19from .utils import resolve_and_decode 

20from .utils.exceptions import PdfminerException 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25class PDF(Container): 

26 cached_properties: List[str] = Container.cached_properties + ["_pages"] 

27 

28 def __init__( 

29 self, 

30 stream: Union[BufferedReader, BytesIO], 

31 stream_is_external: bool = False, 

32 path: Optional[pathlib.Path] = None, 

33 pages: Optional[Union[List[int], Tuple[int]]] = None, 

34 laparams: Optional[Dict[str, Any]] = None, 

35 password: Optional[str] = None, 

36 strict_metadata: bool = False, 

37 unicode_norm: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None, 

38 raise_unicode_errors: bool = True, 

39 ): 

40 self.stream = stream 

41 self.stream_is_external = stream_is_external 

42 self.path = path 

43 self.pages_to_parse = pages 

44 self.laparams = None if laparams is None else LAParams(**laparams) 

45 self.password = password 

46 self.unicode_norm = unicode_norm 

47 self.raise_unicode_errors = raise_unicode_errors 

48 

49 try: 

50 self.doc = PDFDocument(PDFParser(stream), password=password or "") 

51 except Exception as e: 

52 raise PdfminerException(e) 

53 self.rsrcmgr = PDFResourceManager() 

54 self.metadata = {} 

55 

56 for info in self.doc.info: 

57 self.metadata.update(info) 

58 for k, v in self.metadata.items(): 

59 try: 

60 self.metadata[k] = resolve_and_decode(v) 

61 except Exception as e: # pragma: nocover 

62 if strict_metadata: 

63 # Raise an exception since unable to resolve the metadata value. 

64 raise 

65 # This metadata value could not be parsed. Instead of failing the PDF 

66 # read, treat it as a warning only if `strict_metadata=False`. 

67 logger.warning( 

68 f'[WARNING] Metadata key "{k}" could not be parsed due to ' 

69 f"exception: {str(e)}" 

70 ) 

71 

72 @classmethod 

73 def open( 

74 cls, 

75 path_or_fp: Union[str, pathlib.Path, BufferedReader, BytesIO], 

76 pages: Optional[Union[List[int], Tuple[int]]] = None, 

77 laparams: Optional[Dict[str, Any]] = None, 

78 password: Optional[str] = None, 

79 strict_metadata: bool = False, 

80 unicode_norm: Optional[Literal["NFC", "NFKC", "NFD", "NFKD"]] = None, 

81 repair: bool = False, 

82 gs_path: Optional[Union[str, pathlib.Path]] = None, 

83 repair_setting: T_repair_setting = "default", 

84 raise_unicode_errors: bool = True, 

85 ) -> "PDF": 

86 

87 stream: Union[BufferedReader, BytesIO] 

88 

89 if repair: 

90 stream = _repair( 

91 path_or_fp, password=password, gs_path=gs_path, setting=repair_setting 

92 ) 

93 stream_is_external = False 

94 # Although the original file has a path, 

95 # the repaired version does not 

96 path = None 

97 elif isinstance(path_or_fp, (str, pathlib.Path)): 

98 stream = open(path_or_fp, "rb") 

99 stream_is_external = False 

100 path = pathlib.Path(path_or_fp) 

101 else: 

102 stream = path_or_fp 

103 stream_is_external = True 

104 path = None 

105 

106 try: 

107 return cls( 

108 stream, 

109 path=path, 

110 pages=pages, 

111 laparams=laparams, 

112 password=password, 

113 strict_metadata=strict_metadata, 

114 unicode_norm=unicode_norm, 

115 stream_is_external=stream_is_external, 

116 raise_unicode_errors=raise_unicode_errors, 

117 ) 

118 

119 except PdfminerException: 

120 if not stream_is_external: 

121 stream.close() 

122 raise 

123 

124 def close(self) -> None: 

125 self.flush_cache() 

126 

127 for page in self.pages: 

128 page.close() 

129 

130 if not self.stream_is_external: 

131 self.stream.close() 

132 

133 def __enter__(self) -> "PDF": 

134 return self 

135 

136 def __exit__( 

137 self, 

138 t: Optional[Type[BaseException]], 

139 value: Optional[BaseException], 

140 traceback: Optional[TracebackType], 

141 ) -> None: 

142 self.close() 

143 

144 @property 

145 def pages(self) -> List[Page]: 

146 if hasattr(self, "_pages"): 

147 return self._pages 

148 

149 doctop: T_num = 0 

150 pp = self.pages_to_parse 

151 self._pages: List[Page] = [] 

152 

153 def iter_pages() -> Generator[PDFPage, None, None]: 

154 gen = PDFPage.create_pages(self.doc) 

155 while True: 

156 try: 

157 yield next(gen) 

158 except StopIteration: 

159 break 

160 except Exception as e: 

161 raise PdfminerException(e) 

162 

163 for i, page in enumerate(iter_pages()): 

164 page_number = i + 1 

165 if pp is not None and page_number not in pp: 

166 continue 

167 p = Page(self, page, page_number=page_number, initial_doctop=doctop) 

168 self._pages.append(p) 

169 doctop += p.height 

170 return self._pages 

171 

172 @property 

173 def objects(self) -> Dict[str, T_obj_list]: 

174 if hasattr(self, "_objects"): 

175 return self._objects 

176 all_objects: Dict[str, T_obj_list] = {} 

177 for p in self.pages: 

178 for kind in p.objects.keys(): 

179 all_objects[kind] = all_objects.get(kind, []) + p.objects[kind] 

180 self._objects: Dict[str, T_obj_list] = all_objects 

181 return self._objects 

182 

183 @property 

184 def annots(self) -> List[Dict[str, Any]]: 

185 gen = (p.annots for p in self.pages) 

186 return list(itertools.chain(*gen)) 

187 

188 @property 

189 def hyperlinks(self) -> List[Dict[str, Any]]: 

190 gen = (p.hyperlinks for p in self.pages) 

191 return list(itertools.chain(*gen)) 

192 

193 @property 

194 def structure_tree(self) -> List[Dict[str, Any]]: 

195 """Return the structure tree for the document.""" 

196 try: 

197 return [elem.to_dict() for elem in PDFStructTree(self)] 

198 except StructTreeMissing: 

199 return [] 

200 

201 def to_dict(self, object_types: Optional[List[str]] = None) -> Dict[str, Any]: 

202 return { 

203 "metadata": self.metadata, 

204 "pages": [page.to_dict(object_types) for page in self.pages], 

205 }