1from __future__ import annotations
2
3from typing import (
4 TYPE_CHECKING,
5 cast,
6)
7
8import numpy as np
9
10from pandas._typing import (
11 FilePath,
12 ReadBuffer,
13 Scalar,
14 StorageOptions,
15)
16from pandas.compat._optional import import_optional_dependency
17from pandas.util._decorators import doc
18
19import pandas as pd
20from pandas.core.shared_docs import _shared_docs
21
22from pandas.io.excel._base import BaseExcelReader
23
24if TYPE_CHECKING:
25 from pandas._libs.tslibs.nattype import NaTType
26
27
@doc(storage_options=_shared_docs["storage_options"])
class ODFReader(BaseExcelReader):
    """
    Read tables out of OpenDocument formatted files.

    Parameters
    ----------
    filepath_or_buffer : str, path to be parsed or
        an open readable stream.
    {storage_options}
    """

    def __init__(
        self,
        filepath_or_buffer: FilePath | ReadBuffer[bytes],
        storage_options: StorageOptions = None,
    ) -> None:
        """Import-check the optional ``odf`` dependency and set up the base reader."""
        # The parameters are documented in the class docstring: ``@doc`` only
        # fills the storage_options placeholder of the object it decorates.
        import_optional_dependency("odf")
        super().__init__(filepath_or_buffer, storage_options=storage_options)

    @property
    def _workbook_class(self):
        """Return odfpy's ``OpenDocument`` class (imported lazily)."""
        from odf.opendocument import OpenDocument

        return OpenDocument

    def load_workbook(self, filepath_or_buffer: FilePath | ReadBuffer[bytes]):
        """Load ``filepath_or_buffer`` with ``odf.opendocument.load``."""
        from odf.opendocument import load

        return load(filepath_or_buffer)

    @property
    def empty_value(self) -> str:
        """Property for compat with other readers."""
        return ""

    @property
    def sheet_names(self) -> list[str]:
        """Return a list of sheet names present in the document."""
        from odf.table import Table

        tables = self.book.getElementsByType(Table)
        return [t.getAttribute("name") for t in tables]

    def get_sheet_by_index(self, index: int):
        """Return the table element at position ``index`` in the document."""
        from odf.table import Table

        # Inherited check; presumably raises for an out-of-range index.
        self.raise_if_bad_sheet_by_index(index)
        tables = self.book.getElementsByType(Table)
        return tables[index]

    def get_sheet_by_name(self, name: str):
        """
        Return the first table element whose ``name`` attribute equals ``name``.

        Raises
        ------
        ValueError
            If no table carries that name; the reader is closed first.
        """
        from odf.table import Table

        # Inherited check; presumably raises for an unknown sheet name.
        self.raise_if_bad_sheet_by_name(name)
        tables = self.book.getElementsByType(Table)

        for table in tables:
            if table.getAttribute("name") == name:
                return table

        self.close()
        raise ValueError(f"sheet {name} not found")

    def get_sheet_data(
        self, sheet, file_rows_needed: int | None = None
    ) -> list[list[Scalar | NaTType]]:
        """
        Parse an ODF Table into a list of lists.

        Parameters
        ----------
        sheet : odf table element
            Table whose ``TableRow`` elements are read.
        file_rows_needed : int, optional
            Stop reading once at least this many rows have been collected.

        Returns
        -------
        list of list
            One independent list per row, padded with ``empty_value`` to a
            common width. Covered cells read as ``empty_value``; empty cells
            at the end of a row and empty rows at the end of the table are
            dropped.
        """
        from odf.table import (
            CoveredTableCell,
            TableCell,
            TableRow,
        )

        covered_cell_name = CoveredTableCell().qname
        table_cell_name = TableCell().qname
        cell_names = {covered_cell_name, table_cell_name}

        empty_value = self.empty_value
        empty_rows = 0
        max_row_len = 0

        table: list[list[Scalar | NaTType]] = []

        for sheet_row in sheet.getElementsByType(TableRow):
            sheet_cells = [
                x
                for x in sheet_row.childNodes
                if hasattr(x, "qname") and x.qname in cell_names
            ]
            empty_cells = 0
            table_row: list[Scalar | NaTType] = []

            for sheet_cell in sheet_cells:
                if sheet_cell.qname == table_cell_name:
                    value = self._get_cell_value(sheet_cell)
                else:
                    value = empty_value

                column_repeat = self._get_column_repeat(sheet_cell)

                # Queue up empty values, writing only if content succeeds them
                if value == empty_value:
                    empty_cells += column_repeat
                else:
                    table_row.extend([empty_value] * empty_cells)
                    empty_cells = 0
                    table_row.extend([value] * column_repeat)

            max_row_len = max(max_row_len, len(table_row))

            row_repeat = self._get_row_repeat(sheet_row)
            if self._is_empty_row(sheet_row):
                # Queue up empty rows, writing only if content succeeds them
                empty_rows += row_repeat
            else:
                # Add the queued blank rows. Every row gets its own list so
                # that mutating one row later cannot change its duplicates.
                table.extend([empty_value] for _ in range(empty_rows))
                empty_rows = 0
                table.extend(list(table_row) for _ in range(row_repeat))
            if file_rows_needed is not None and len(table) >= file_rows_needed:
                break

        # Make our table square
        for row in table:
            missing = max_row_len - len(row)
            if missing > 0:
                row.extend([empty_value] * missing)

        return table

    def _get_row_repeat(self, row) -> int:
        """
        Return number of times this row was repeated
        Repeating an empty row appeared to be a common way
        of representing sparse rows in the table.
        """
        from odf.namespaces import TABLENS

        return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1))

    def _get_column_repeat(self, cell) -> int:
        """Return number of times this cell was repeated (1 when not stated)."""
        from odf.namespaces import TABLENS

        return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1))

    def _is_empty_row(self, row) -> bool:
        """
        Helper function to find empty rows: True when no child of ``row``
        has child nodes of its own.
        """
        return all(len(column.childNodes) == 0 for column in row.childNodes)

    @staticmethod
    def _parse_boolean(boolean_value: str | None, text: str) -> bool:
        """
        Decode a boolean cell.

        Parameters
        ----------
        boolean_value : str or None
            Content of the cell's ``office:boolean-value`` attribute, if any.
        text : str
            Display text of the cell.

        Returns
        -------
        bool
            The attribute decides when present, because the display text is
            locale dependent. Without the attribute, only the text "TRUE"
            reads as True.
        """
        if boolean_value is not None:
            return str(boolean_value).strip().lower() == "true"
        return text == "TRUE"

    @staticmethod
    def _normalize_float(cell_value: float) -> int | float:
        """
        Return integral floats as ``int`` (GH5394) and anything else unchanged.

        ``float.is_integer`` is False for inf and nan, so those values pass
        through instead of making ``int()`` raise.
        """
        if cell_value.is_integer():
            return int(cell_value)
        return cell_value

    def _get_cell_value(self, cell) -> Scalar | NaTType:
        """
        Convert a table cell into a Python value based on its
        ``office:value-type`` attribute.

        Raises
        ------
        ValueError
            If the value type is not recognized; the reader is closed first.
        """
        from odf.namespaces import OFFICENS

        text = str(cell)
        if text == "#N/A":
            return np.nan

        attributes = cell.attributes
        cell_type = attributes.get((OFFICENS, "value-type"))
        if cell_type is None:
            return self.empty_value
        elif cell_type == "boolean":
            return self._parse_boolean(
                attributes.get((OFFICENS, "boolean-value")), text
            )
        elif cell_type == "float":
            # GH5394
            return self._normalize_float(float(attributes.get((OFFICENS, "value"))))
        elif cell_type in ("percentage", "currency"):
            return float(attributes.get((OFFICENS, "value")))
        elif cell_type == "string":
            return self._get_cell_string_value(cell)
        elif cell_type == "date":
            return pd.Timestamp(attributes.get((OFFICENS, "date-value")))
        elif cell_type == "time":
            stamp = pd.Timestamp(text)
            # cast needed here because Scalar doesn't include datetime.time
            return cast(Scalar, stamp.time())
        else:
            self.close()
            raise ValueError(f"Unrecognized type {cell_type}")

    def _get_cell_string_value(self, cell) -> str:
        """
        Find and decode OpenDocument text:s tags that represent
        a run length encoded sequence of space characters.
        """
        from odf.element import Element
        from odf.namespaces import TEXTNS
        from odf.text import S

        text_s = S().qname

        pieces = []

        for fragment in cell.childNodes:
            if isinstance(fragment, Element):
                if fragment.qname == text_s:
                    spaces = int(fragment.attributes.get((TEXTNS, "c"), 1))
                    pieces.append(" " * spaces)
                else:
                    # recursive impl needed in case of nested fragments
                    # with multiple spaces
                    # https://github.com/pandas-dev/pandas/pull/36175#discussion_r484639704
                    pieces.append(self._get_cell_string_value(fragment))
            else:
                pieces.append(str(fragment).strip("\n"))
        return "".join(pieces)