Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pandas/io/excel/_odfreader.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

140 statements  

1from __future__ import annotations 

2 

3from typing import ( 

4 TYPE_CHECKING, 

5 cast, 

6) 

7 

8import numpy as np 

9 

10from pandas._typing import ( 

11 FilePath, 

12 ReadBuffer, 

13 Scalar, 

14 StorageOptions, 

15) 

16from pandas.compat._optional import import_optional_dependency 

17from pandas.util._decorators import doc 

18 

19import pandas as pd 

20from pandas.core.shared_docs import _shared_docs 

21 

22from pandas.io.excel._base import BaseExcelReader 

23 

24if TYPE_CHECKING: 

25 from odf.opendocument import OpenDocument 

26 

27 from pandas._libs.tslibs.nattype import NaTType 

28 

29 

30@doc(storage_options=_shared_docs["storage_options"]) 

31class ODFReader(BaseExcelReader["OpenDocument"]): 

32 def __init__( 

33 self, 

34 filepath_or_buffer: FilePath | ReadBuffer[bytes], 

35 storage_options: StorageOptions | None = None, 

36 engine_kwargs: dict | None = None, 

37 ) -> None: 

38 """ 

39 Read tables out of OpenDocument formatted files. 

40 

41 Parameters 

42 ---------- 

43 filepath_or_buffer : str, path to be parsed or 

44 an open readable stream. 

45 {storage_options} 

46 engine_kwargs : dict, optional 

47 Arbitrary keyword arguments passed to excel engine. 

48 """ 

49 import_optional_dependency("odf") 

50 super().__init__( 

51 filepath_or_buffer, 

52 storage_options=storage_options, 

53 engine_kwargs=engine_kwargs, 

54 ) 

55 

56 @property 

57 def _workbook_class(self) -> type[OpenDocument]: 

58 from odf.opendocument import OpenDocument 

59 

60 return OpenDocument 

61 

62 def load_workbook( 

63 self, filepath_or_buffer: FilePath | ReadBuffer[bytes], engine_kwargs 

64 ) -> OpenDocument: 

65 from odf.opendocument import load 

66 

67 return load(filepath_or_buffer, **engine_kwargs) 

68 

69 @property 

70 def empty_value(self) -> str: 

71 """Property for compat with other readers.""" 

72 return "" 

73 

74 @property 

75 def sheet_names(self) -> list[str]: 

76 """Return a list of sheet names present in the document""" 

77 from odf.table import Table 

78 

79 tables = self.book.getElementsByType(Table) 

80 return [t.getAttribute("name") for t in tables] 

81 

82 def get_sheet_by_index(self, index: int): 

83 from odf.table import Table 

84 

85 self.raise_if_bad_sheet_by_index(index) 

86 tables = self.book.getElementsByType(Table) 

87 return tables[index] 

88 

89 def get_sheet_by_name(self, name: str): 

90 from odf.table import Table 

91 

92 self.raise_if_bad_sheet_by_name(name) 

93 tables = self.book.getElementsByType(Table) 

94 

95 for table in tables: 

96 if table.getAttribute("name") == name: 

97 return table 

98 

99 self.close() 

100 raise ValueError(f"sheet {name} not found") 

101 

102 def get_sheet_data( 

103 self, sheet, file_rows_needed: int | None = None 

104 ) -> list[list[Scalar | NaTType]]: 

105 """ 

106 Parse an ODF Table into a list of lists 

107 """ 

108 from odf.table import ( 

109 CoveredTableCell, 

110 TableCell, 

111 TableRow, 

112 ) 

113 

114 covered_cell_name = CoveredTableCell().qname 

115 table_cell_name = TableCell().qname 

116 cell_names = {covered_cell_name, table_cell_name} 

117 

118 sheet_rows = sheet.getElementsByType(TableRow) 

119 empty_rows = 0 

120 max_row_len = 0 

121 

122 table: list[list[Scalar | NaTType]] = [] 

123 

124 for sheet_row in sheet_rows: 

125 sheet_cells = [ 

126 x 

127 for x in sheet_row.childNodes 

128 if hasattr(x, "qname") and x.qname in cell_names 

129 ] 

130 empty_cells = 0 

131 table_row: list[Scalar | NaTType] = [] 

132 

133 for sheet_cell in sheet_cells: 

134 if sheet_cell.qname == table_cell_name: 

135 value = self._get_cell_value(sheet_cell) 

136 else: 

137 value = self.empty_value 

138 

139 column_repeat = self._get_column_repeat(sheet_cell) 

140 

141 # Queue up empty values, writing only if content succeeds them 

142 if value == self.empty_value: 

143 empty_cells += column_repeat 

144 else: 

145 table_row.extend([self.empty_value] * empty_cells) 

146 empty_cells = 0 

147 table_row.extend([value] * column_repeat) 

148 

149 if max_row_len < len(table_row): 

150 max_row_len = len(table_row) 

151 

152 row_repeat = self._get_row_repeat(sheet_row) 

153 if len(table_row) == 0: 

154 empty_rows += row_repeat 

155 else: 

156 # add blank rows to our table 

157 table.extend([[self.empty_value]] * empty_rows) 

158 empty_rows = 0 

159 table.extend(table_row for _ in range(row_repeat)) 

160 if file_rows_needed is not None and len(table) >= file_rows_needed: 

161 break 

162 

163 # Make our table square 

164 for row in table: 

165 if len(row) < max_row_len: 

166 row.extend([self.empty_value] * (max_row_len - len(row))) 

167 

168 return table 

169 

170 def _get_row_repeat(self, row) -> int: 

171 """ 

172 Return number of times this row was repeated 

173 Repeating an empty row appeared to be a common way 

174 of representing sparse rows in the table. 

175 """ 

176 from odf.namespaces import TABLENS 

177 

178 return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1)) 

179 

180 def _get_column_repeat(self, cell) -> int: 

181 from odf.namespaces import TABLENS 

182 

183 return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1)) 

184 

185 def _get_cell_value(self, cell) -> Scalar | NaTType: 

186 from odf.namespaces import OFFICENS 

187 

188 if str(cell) == "#N/A": 

189 return np.nan 

190 

191 cell_type = cell.attributes.get((OFFICENS, "value-type")) 

192 if cell_type == "boolean": 

193 if str(cell) == "TRUE": 

194 return True 

195 return False 

196 if cell_type is None: 

197 return self.empty_value 

198 elif cell_type == "float": 

199 # GH5394 

200 cell_value = float(cell.attributes.get((OFFICENS, "value"))) 

201 val = int(cell_value) 

202 if val == cell_value: 

203 return val 

204 return cell_value 

205 elif cell_type == "percentage": 

206 cell_value = cell.attributes.get((OFFICENS, "value")) 

207 return float(cell_value) 

208 elif cell_type == "string": 

209 return self._get_cell_string_value(cell) 

210 elif cell_type == "currency": 

211 cell_value = cell.attributes.get((OFFICENS, "value")) 

212 return float(cell_value) 

213 elif cell_type == "date": 

214 cell_value = cell.attributes.get((OFFICENS, "date-value")) 

215 return pd.Timestamp(cell_value) 

216 elif cell_type == "time": 

217 stamp = pd.Timestamp(str(cell)) 

218 # cast needed here because Scalar doesn't include datetime.time 

219 return cast(Scalar, stamp.time()) 

220 else: 

221 self.close() 

222 raise ValueError(f"Unrecognized type {cell_type}") 

223 

224 def _get_cell_string_value(self, cell) -> str: 

225 """ 

226 Find and decode OpenDocument text:s tags that represent 

227 a run length encoded sequence of space characters. 

228 """ 

229 from odf.element import Element 

230 from odf.namespaces import TEXTNS 

231 from odf.office import Annotation 

232 from odf.text import S 

233 

234 office_annotation = Annotation().qname 

235 text_s = S().qname 

236 

237 value = [] 

238 

239 for fragment in cell.childNodes: 

240 if isinstance(fragment, Element): 

241 if fragment.qname == text_s: 

242 spaces = int(fragment.attributes.get((TEXTNS, "c"), 1)) 

243 value.append(" " * spaces) 

244 elif fragment.qname == office_annotation: 

245 continue 

246 else: 

247 # recursive impl needed in case of nested fragments 

248 # with multiple spaces 

249 # https://github.com/pandas-dev/pandas/pull/36175#discussion_r484639704 

250 value.append(self._get_cell_string_value(fragment)) 

251 else: 

252 value.append(str(fragment).strip("\n")) 

253 return "".join(value)