Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pandas/io/excel/_odfreader.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

141 statements  

1from __future__ import annotations 

2 

3from typing import ( 

4 TYPE_CHECKING, 

5 cast, 

6) 

7 

8import numpy as np 

9 

10from pandas._typing import ( 

11 FilePath, 

12 ReadBuffer, 

13 Scalar, 

14 StorageOptions, 

15) 

16from pandas.compat._optional import import_optional_dependency 

17from pandas.util._decorators import doc 

18 

19import pandas as pd 

20from pandas.core.shared_docs import _shared_docs 

21 

22from pandas.io.excel._base import BaseExcelReader 

23 

24if TYPE_CHECKING: 

25 from pandas._libs.tslibs.nattype import NaTType 

26 

27 

@doc(storage_options=_shared_docs["storage_options"])
class ODFReader(BaseExcelReader):
    """Excel-reader backend that parses OpenDocument spreadsheets via odfpy."""

    def __init__(
        self,
        filepath_or_buffer: FilePath | ReadBuffer[bytes],
        storage_options: StorageOptions = None,
    ) -> None:
        """
        Read tables out of OpenDocument formatted files.

        Parameters
        ----------
        filepath_or_buffer : str, path to be parsed or
            an open readable stream.
        {storage_options}
        """
        # Fail early with the standard pandas message if odfpy is missing.
        import_optional_dependency("odf")
        super().__init__(filepath_or_buffer, storage_options=storage_options)

    @property
    def _workbook_class(self):
        """Return the odfpy class that represents a loaded workbook."""
        from odf.opendocument import OpenDocument

        return OpenDocument

    def load_workbook(self, filepath_or_buffer: FilePath | ReadBuffer[bytes]):
        """Load ``filepath_or_buffer`` with ``odf.opendocument.load``."""
        from odf.opendocument import load

        return load(filepath_or_buffer)

    @property
    def empty_value(self) -> str:
        """Property for compat with other readers."""
        return ""

    @property
    def sheet_names(self) -> list[str]:
        """Return a list of sheet names present in the document"""
        from odf.table import Table

        tables = self.book.getElementsByType(Table)
        return [t.getAttribute("name") for t in tables]

    def get_sheet_by_index(self, index: int):
        """
        Return the table at position ``index`` in the document.

        ``raise_if_bad_sheet_by_index`` is called first to validate ``index``.
        """
        from odf.table import Table

        self.raise_if_bad_sheet_by_index(index)
        tables = self.book.getElementsByType(Table)
        return tables[index]

    def get_sheet_by_name(self, name: str):
        """
        Return the first table whose ``name`` attribute equals ``name``.

        Raises
        ------
        ValueError
            If no table with that name exists; the reader is closed first.
        """
        from odf.table import Table

        self.raise_if_bad_sheet_by_name(name)
        tables = self.book.getElementsByType(Table)

        for table in tables:
            if table.getAttribute("name") == name:
                return table

        self.close()
        raise ValueError(f"sheet {name} not found")

    def get_sheet_data(
        self, sheet, file_rows_needed: int | None = None
    ) -> list[list[Scalar | NaTType]]:
        """
        Parse an ODF Table into a list of lists

        Parameters
        ----------
        sheet : odf table element
            The table whose rows are parsed.
        file_rows_needed : int, optional
            Stop reading further sheet rows once at least this many rows
            have been collected.

        Returns
        -------
        list of list
            One list per row, every row padded with ``empty_value`` to the
            length of the longest parsed row. Trailing empty cells and
            trailing empty rows are not emitted. Every row is an independent
            list object.
        """
        from odf.table import (
            CoveredTableCell,
            TableCell,
            TableRow,
        )

        covered_cell_name = CoveredTableCell().qname
        table_cell_name = TableCell().qname
        cell_names = {covered_cell_name, table_cell_name}

        sheet_rows = sheet.getElementsByType(TableRow)
        empty_rows = 0
        max_row_len = 0

        table: list[list[Scalar | NaTType]] = []

        for sheet_row in sheet_rows:
            sheet_cells = [
                x
                for x in sheet_row.childNodes
                if hasattr(x, "qname") and x.qname in cell_names
            ]
            empty_cells = 0
            table_row: list[Scalar | NaTType] = []

            for sheet_cell in sheet_cells:
                if sheet_cell.qname == table_cell_name:
                    value = self._get_cell_value(sheet_cell)
                else:
                    # Covered cells carry no value of their own here.
                    value = self.empty_value

                column_repeat = self._get_column_repeat(sheet_cell)

                # Queue up empty values, writing only if content succeeds them
                if value == self.empty_value:
                    empty_cells += column_repeat
                else:
                    table_row.extend([self.empty_value] * empty_cells)
                    empty_cells = 0
                    table_row.extend([value] * column_repeat)

            if max_row_len < len(table_row):
                max_row_len = len(table_row)

            row_repeat = self._get_row_repeat(sheet_row)
            # A row only counts as blank when it yielded no values AND its
            # cells have no child nodes. Checking the XML children alone
            # would drop rows whose cells store their value purely in
            # attributes (no text child), silently losing data.
            if not table_row and self._is_empty_row(sheet_row):
                empty_rows += row_repeat
            else:
                # add blank rows to our table; each one is a fresh list so
                # that no two rows of the result share the same object
                table.extend([self.empty_value] for _ in range(empty_rows))
                empty_rows = 0
                # Repeated rows get independent copies for the same reason.
                table.extend(list(table_row) for _ in range(row_repeat))
            if file_rows_needed is not None and len(table) >= file_rows_needed:
                break

        # Make our table square
        for row in table:
            if len(row) < max_row_len:
                row.extend([self.empty_value] * (max_row_len - len(row)))

        return table

    def _get_row_repeat(self, row) -> int:
        """
        Return number of times this row was repeated
        Repeating an empty row appeared to be a common way
        of representing sparse rows in the table.
        """
        from odf.namespaces import TABLENS

        return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1))

    def _get_column_repeat(self, cell) -> int:
        """
        Return number of times this cell was repeated (1 when the
        ``number-columns-repeated`` attribute is absent).
        """
        from odf.namespaces import TABLENS

        return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1))

    def _is_empty_row(self, row) -> bool:
        """
        Helper function to find empty rows

        A row is reported empty when none of its child nodes has child
        nodes of its own.
        """
        return not any(len(column.childNodes) > 0 for column in row.childNodes)

    def _get_cell_value(self, cell) -> Scalar | NaTType:
        """
        Convert a table cell into a Python/pandas scalar.

        The ``office:value-type`` attribute selects the conversion; a cell
        without that attribute yields ``empty_value``. A cell whose text is
        ``#N/A`` yields ``np.nan`` regardless of its type.

        Raises
        ------
        ValueError
            If the value type is not recognized; the reader is closed first.
        """
        from odf.namespaces import OFFICENS

        if str(cell) == "#N/A":
            return np.nan

        cell_type = cell.attributes.get((OFFICENS, "value-type"))
        if cell_type == "boolean":
            # Prefer the machine-readable attribute: the displayed text
            # depends on the locale/format the file was saved with.
            boolean_attr = cell.attributes.get((OFFICENS, "boolean-value"))
            if boolean_attr is not None:
                return str(boolean_attr).strip().lower() == "true"
            # Fall back to the displayed text when the attribute is absent.
            return str(cell) == "TRUE"
        if cell_type is None:
            return self.empty_value
        elif cell_type == "float":
            # GH5394
            cell_value = float(cell.attributes.get((OFFICENS, "value")))
            # is_integer() is False for inf/nan, so those are returned as
            # floats instead of failing inside int().
            if cell_value.is_integer():
                return int(cell_value)
            return cell_value
        elif cell_type == "percentage":
            cell_value = cell.attributes.get((OFFICENS, "value"))
            return float(cell_value)
        elif cell_type == "string":
            return self._get_cell_string_value(cell)
        elif cell_type == "currency":
            cell_value = cell.attributes.get((OFFICENS, "value"))
            return float(cell_value)
        elif cell_type == "date":
            cell_value = cell.attributes.get((OFFICENS, "date-value"))
            return pd.Timestamp(cell_value)
        elif cell_type == "time":
            stamp = pd.Timestamp(str(cell))
            # cast needed here because Scalar doesn't include datetime.time
            return cast(Scalar, stamp.time())
        else:
            self.close()
            raise ValueError(f"Unrecognized type {cell_type}")

    def _get_cell_string_value(self, cell) -> str:
        """
        Find and decode OpenDocument text:s tags that represent
        a run length encoded sequence of space characters.

        Returns the concatenated text of ``cell``'s child nodes, with each
        ``text:s`` element expanded to the number of spaces given by its
        ``c`` attribute (1 when absent).
        """
        from odf.element import Element
        from odf.namespaces import TEXTNS
        from odf.text import S

        text_s = S().qname

        value = []

        for fragment in cell.childNodes:
            if isinstance(fragment, Element):
                if fragment.qname == text_s:
                    spaces = int(fragment.attributes.get((TEXTNS, "c"), 1))
                    value.append(" " * spaces)
                else:
                    # recursive impl needed in case of nested fragments
                    # with multiple spaces
                    # https://github.com/pandas-dev/pandas/pull/36175#discussion_r484639704
                    value.append(self._get_cell_string_value(fragment))
            else:
                value.append(str(fragment).strip("\n"))
        return "".join(value)