1from __future__ import annotations
2
3from typing import (
4 TYPE_CHECKING,
5 cast,
6)
7
8import numpy as np
9
10from pandas._typing import (
11 FilePath,
12 ReadBuffer,
13 Scalar,
14 StorageOptions,
15)
16from pandas.compat._optional import import_optional_dependency
17from pandas.util._decorators import doc
18
19import pandas as pd
20from pandas.core.shared_docs import _shared_docs
21
22from pandas.io.excel._base import BaseExcelReader
23
24if TYPE_CHECKING:
25 from odf.opendocument import OpenDocument
26
27 from pandas._libs.tslibs.nattype import NaTType
28
29
30@doc(storage_options=_shared_docs["storage_options"])
31class ODFReader(BaseExcelReader["OpenDocument"]):
32 def __init__(
33 self,
34 filepath_or_buffer: FilePath | ReadBuffer[bytes],
35 storage_options: StorageOptions | None = None,
36 engine_kwargs: dict | None = None,
37 ) -> None:
38 """
39 Read tables out of OpenDocument formatted files.
40
41 Parameters
42 ----------
43 filepath_or_buffer : str, path to be parsed or
44 an open readable stream.
45 {storage_options}
46 engine_kwargs : dict, optional
47 Arbitrary keyword arguments passed to excel engine.
48 """
49 import_optional_dependency("odf")
50 super().__init__(
51 filepath_or_buffer,
52 storage_options=storage_options,
53 engine_kwargs=engine_kwargs,
54 )
55
56 @property
57 def _workbook_class(self) -> type[OpenDocument]:
58 from odf.opendocument import OpenDocument
59
60 return OpenDocument
61
62 def load_workbook(
63 self, filepath_or_buffer: FilePath | ReadBuffer[bytes], engine_kwargs
64 ) -> OpenDocument:
65 from odf.opendocument import load
66
67 return load(filepath_or_buffer, **engine_kwargs)
68
69 @property
70 def empty_value(self) -> str:
71 """Property for compat with other readers."""
72 return ""
73
74 @property
75 def sheet_names(self) -> list[str]:
76 """Return a list of sheet names present in the document"""
77 from odf.table import Table
78
79 tables = self.book.getElementsByType(Table)
80 return [t.getAttribute("name") for t in tables]
81
82 def get_sheet_by_index(self, index: int):
83 from odf.table import Table
84
85 self.raise_if_bad_sheet_by_index(index)
86 tables = self.book.getElementsByType(Table)
87 return tables[index]
88
89 def get_sheet_by_name(self, name: str):
90 from odf.table import Table
91
92 self.raise_if_bad_sheet_by_name(name)
93 tables = self.book.getElementsByType(Table)
94
95 for table in tables:
96 if table.getAttribute("name") == name:
97 return table
98
99 self.close()
100 raise ValueError(f"sheet {name} not found")
101
102 def get_sheet_data(
103 self, sheet, file_rows_needed: int | None = None
104 ) -> list[list[Scalar | NaTType]]:
105 """
106 Parse an ODF Table into a list of lists
107 """
108 from odf.table import (
109 CoveredTableCell,
110 TableCell,
111 TableRow,
112 )
113
114 covered_cell_name = CoveredTableCell().qname
115 table_cell_name = TableCell().qname
116 cell_names = {covered_cell_name, table_cell_name}
117
118 sheet_rows = sheet.getElementsByType(TableRow)
119 empty_rows = 0
120 max_row_len = 0
121
122 table: list[list[Scalar | NaTType]] = []
123
124 for sheet_row in sheet_rows:
125 sheet_cells = [
126 x
127 for x in sheet_row.childNodes
128 if hasattr(x, "qname") and x.qname in cell_names
129 ]
130 empty_cells = 0
131 table_row: list[Scalar | NaTType] = []
132
133 for sheet_cell in sheet_cells:
134 if sheet_cell.qname == table_cell_name:
135 value = self._get_cell_value(sheet_cell)
136 else:
137 value = self.empty_value
138
139 column_repeat = self._get_column_repeat(sheet_cell)
140
141 # Queue up empty values, writing only if content succeeds them
142 if value == self.empty_value:
143 empty_cells += column_repeat
144 else:
145 table_row.extend([self.empty_value] * empty_cells)
146 empty_cells = 0
147 table_row.extend([value] * column_repeat)
148
149 if max_row_len < len(table_row):
150 max_row_len = len(table_row)
151
152 row_repeat = self._get_row_repeat(sheet_row)
153 if len(table_row) == 0:
154 empty_rows += row_repeat
155 else:
156 # add blank rows to our table
157 table.extend([[self.empty_value]] * empty_rows)
158 empty_rows = 0
159 table.extend(table_row for _ in range(row_repeat))
160 if file_rows_needed is not None and len(table) >= file_rows_needed:
161 break
162
163 # Make our table square
164 for row in table:
165 if len(row) < max_row_len:
166 row.extend([self.empty_value] * (max_row_len - len(row)))
167
168 return table
169
170 def _get_row_repeat(self, row) -> int:
171 """
172 Return number of times this row was repeated
173 Repeating an empty row appeared to be a common way
174 of representing sparse rows in the table.
175 """
176 from odf.namespaces import TABLENS
177
178 return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1))
179
180 def _get_column_repeat(self, cell) -> int:
181 from odf.namespaces import TABLENS
182
183 return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1))
184
185 def _get_cell_value(self, cell) -> Scalar | NaTType:
186 from odf.namespaces import OFFICENS
187
188 if str(cell) == "#N/A":
189 return np.nan
190
191 cell_type = cell.attributes.get((OFFICENS, "value-type"))
192 if cell_type == "boolean":
193 if str(cell) == "TRUE":
194 return True
195 return False
196 if cell_type is None:
197 return self.empty_value
198 elif cell_type == "float":
199 # GH5394
200 cell_value = float(cell.attributes.get((OFFICENS, "value")))
201 val = int(cell_value)
202 if val == cell_value:
203 return val
204 return cell_value
205 elif cell_type == "percentage":
206 cell_value = cell.attributes.get((OFFICENS, "value"))
207 return float(cell_value)
208 elif cell_type == "string":
209 return self._get_cell_string_value(cell)
210 elif cell_type == "currency":
211 cell_value = cell.attributes.get((OFFICENS, "value"))
212 return float(cell_value)
213 elif cell_type == "date":
214 cell_value = cell.attributes.get((OFFICENS, "date-value"))
215 return pd.Timestamp(cell_value)
216 elif cell_type == "time":
217 stamp = pd.Timestamp(str(cell))
218 # cast needed here because Scalar doesn't include datetime.time
219 return cast(Scalar, stamp.time())
220 else:
221 self.close()
222 raise ValueError(f"Unrecognized type {cell_type}")
223
224 def _get_cell_string_value(self, cell) -> str:
225 """
226 Find and decode OpenDocument text:s tags that represent
227 a run length encoded sequence of space characters.
228 """
229 from odf.element import Element
230 from odf.namespaces import TEXTNS
231 from odf.office import Annotation
232 from odf.text import S
233
234 office_annotation = Annotation().qname
235 text_s = S().qname
236
237 value = []
238
239 for fragment in cell.childNodes:
240 if isinstance(fragment, Element):
241 if fragment.qname == text_s:
242 spaces = int(fragment.attributes.get((TEXTNS, "c"), 1))
243 value.append(" " * spaces)
244 elif fragment.qname == office_annotation:
245 continue
246 else:
247 # recursive impl needed in case of nested fragments
248 # with multiple spaces
249 # https://github.com/pandas-dev/pandas/pull/36175#discussion_r484639704
250 value.append(self._get_cell_string_value(fragment))
251 else:
252 value.append(str(fragment).strip("\n"))
253 return "".join(value)