1from __future__ import annotations
2
3from collections import defaultdict
4from typing import TYPE_CHECKING
5import warnings
6
7import numpy as np
8
9from pandas._libs import (
10 lib,
11 parsers,
12)
13from pandas.compat._optional import import_optional_dependency
14from pandas.errors import DtypeWarning
15from pandas.util._exceptions import find_stack_level
16
17from pandas.core.dtypes.common import pandas_dtype
18from pandas.core.dtypes.concat import (
19 concat_compat,
20 union_categoricals,
21)
22from pandas.core.dtypes.dtypes import CategoricalDtype
23
24from pandas.core.indexes.api import ensure_index_from_sequences
25
26from pandas.io.common import (
27 dedup_names,
28 is_potential_multi_index,
29)
30from pandas.io.parsers.base_parser import (
31 ParserBase,
32 ParserError,
33 is_index_col,
34)
35
36if TYPE_CHECKING:
37 from collections.abc import (
38 Hashable,
39 Mapping,
40 Sequence,
41 )
42
43 from pandas._typing import (
44 ArrayLike,
45 DtypeArg,
46 DtypeObj,
47 ReadCsvBuffer,
48 )
49
50 from pandas import (
51 Index,
52 MultiIndex,
53 )
54
55
class CParserWrapper(ParserBase):
    """
    ``ParserBase`` subclass that delegates parsing to ``parsers.TextReader``.

    ``__init__`` builds the ``TextReader`` from the source and keyword options
    and works out the column names; ``read`` pulls data from that reader and
    returns it as ``(index, column names, column data)``.
    """

    # Whether ``read`` goes through ``read_low_memory`` + ``_concatenate_chunks``.
    low_memory: bool
    # The underlying reader, created in ``__init__``.
    _reader: parsers.TextReader

    def __init__(self, src: ReadCsvBuffer[str], **kwds) -> None:
        """
        Configure and create the underlying ``parsers.TextReader``.

        Parameters
        ----------
        src : ReadCsvBuffer[str]
            Source object handed to ``parsers.TextReader`` as-is.
        **kwds
            Parser options. The mapping is passed to ``ParserBase.__init__``
            and stored on ``self.kwds``; an adjusted copy is forwarded to
            ``parsers.TextReader``.
        """
        super().__init__(kwds)
        self.kwds = kwds
        # Work on a copy: keys are popped/overwritten below and ``self.kwds``
        # should keep the options as they were passed in.
        kwds = kwds.copy()

        # ``low_memory`` is consumed here and therefore not forwarded to TextReader.
        self.low_memory = kwds.pop("low_memory", False)

        # #2442
        # error: Cannot determine type of 'index_col'
        kwds["allow_leading_cols"] = (
            self.index_col is not False  # type: ignore[has-type]
        )

        # GH20529, validate usecol arg before TextReader
        kwds["usecols"] = self.usecols

        # Have to pass int, would break tests using TextReader directly otherwise :(
        kwds["on_bad_lines"] = self.on_bad_lines.value

        # These options are dropped so they are not forwarded to TextReader.
        # NOTE(review): presumably they are handled when the source is opened
        # -- confirm against the caller.
        for key in (
            "storage_options",
            "encoding",
            "memory_map",
            "compression",
        ):
            kwds.pop(key, None)

        # Normalise the dtype spec to dtype objects before it reaches the reader.
        kwds["dtype"] = ensure_dtype_objs(kwds.get("dtype", None))
        # A missing or ``lib.no_default`` backend falls back to "numpy".
        if "dtype_backend" not in kwds or kwds["dtype_backend"] is lib.no_default:
            kwds["dtype_backend"] = "numpy"
        if kwds["dtype_backend"] == "pyarrow":
            # Fail here loudly instead of in cython after reading
            import_optional_dependency("pyarrow")
        self._reader = parsers.TextReader(src, **kwds)

        self.unnamed_cols = self._reader.unnamed_cols

        # NOTE(review): despite its name, this flag is True when ``self.names``
        # is None at this point; it may be replaced by
        # ``_extract_multi_indexer_columns`` below.
        # error: Cannot determine type of 'names'
        passed_names = self.names is None  # type: ignore[has-type]

        if self._reader.header is None:
            self.names = None
        else:
            # error: Cannot determine type of 'names'
            # error: Cannot determine type of 'index_names'
            (
                self.names,  # type: ignore[has-type]
                self.index_names,
                self.col_names,
                passed_names,
            ) = self._extract_multi_indexer_columns(
                self._reader.header,
                self.index_names,  # type: ignore[has-type]
                passed_names,
            )

        # With no names available, fall back to positional integer labels,
        # one per column reported by the reader (``table_width``).
        # error: Cannot determine type of 'names'
        if self.names is None:  # type: ignore[has-type]
            self.names = list(range(self._reader.table_width))

        # gh-9755
        #
        # need to set orig_names here first
        # so that proper indexing can be done
        # with _set_noconvert_columns
        #
        # once names has been filtered, we will
        # then set orig_names again to names
        # error: Cannot determine type of 'names'
        self.orig_names = self.names[:]  # type: ignore[has-type]

        if self.usecols:
            usecols = self._evaluate_usecols(self.usecols, self.orig_names)

            # GH 14671
            # assert for mypy, orig_names is List or None, None would error in issubset
            assert self.orig_names is not None
            if self.usecols_dtype == "string" and not set(usecols).issubset(
                self.orig_names
            ):
                self._validate_usecols_names(usecols, self.orig_names)

            # More names than selected columns: keep only the names whose
            # position or label appears in ``usecols``.
            # error: Cannot determine type of 'names'
            if len(self.names) > len(usecols):  # type: ignore[has-type]
                # error: Cannot determine type of 'names'
                self.names = [  # type: ignore[has-type]
                    n
                    # error: Cannot determine type of 'names'
                    for i, n in enumerate(self.names)  # type: ignore[has-type]
                    if (i in usecols or n in usecols)
                ]

            # Fewer names than selected columns: let the validator report it.
            # error: Cannot determine type of 'names'
            if len(self.names) < len(usecols):  # type: ignore[has-type]
                # error: Cannot determine type of 'names'
                self._validate_usecols_names(
                    usecols,
                    self.names,  # type: ignore[has-type]
                )

        # error: Cannot determine type of 'names'
        self._validate_parse_dates_presence(self.names)  # type: ignore[has-type]
        # Must run while ``orig_names`` still holds the unfiltered names (see
        # gh-9755 note above); it maps ``names`` back to positions in it.
        self._set_noconvert_columns()

        # error: Cannot determine type of 'names'
        self.orig_names = self.names  # type: ignore[has-type]

        if not self._has_complex_date_col:
            # No leading columns reported by the reader but an index column
            # was requested: split the index names out of ``names``.
            # error: Cannot determine type of 'index_col'
            if self._reader.leading_cols == 0 and is_index_col(
                self.index_col  # type: ignore[has-type]
            ):
                self._name_processed = True
                (
                    index_names,
                    # error: Cannot determine type of 'names'
                    self.names,  # type: ignore[has-type]
                    self.index_col,
                ) = self._clean_index_names(
                    # error: Cannot determine type of 'names'
                    self.names,  # type: ignore[has-type]
                    # error: Cannot determine type of 'index_col'
                    self.index_col,  # type: ignore[has-type]
                )

                # Only fill in index names that were not already set.
                if self.index_names is None:
                    self.index_names = index_names

            # Without a header (and with ``passed_names`` falsy) the index
            # levels are left unnamed.
            if self._reader.header is None and not passed_names:
                assert self.index_names is not None
                self.index_names = [None] * len(self.index_names)

        # The index is implicit when the reader reports leading columns.
        self._implicit_index = self._reader.leading_cols > 0

    def close(self) -> None:
        """
        Close the underlying reader.

        A ``ValueError`` raised by the reader's ``close`` is deliberately
        ignored (best-effort cleanup).
        """
        # close handles opened by C parser
        try:
            self._reader.close()
        except ValueError:
            pass

    def _set_noconvert_columns(self) -> None:
        """
        Set the columns that should not undergo dtype conversions.

        Currently, any column that is involved with date parsing will not
        undergo such conversions.
        """
        assert self.orig_names is not None
        # error: Cannot determine type of 'names'

        # much faster than using orig_names.index(x) xref GH#44106
        names_dict = {x: i for i, x in enumerate(self.orig_names)}
        # Position of every current name within ``orig_names``.
        col_indices = [names_dict[x] for x in self.names]  # type: ignore[has-type]
        # error: Cannot determine type of 'names'
        noconvert_columns = self._set_noconvert_dtype_columns(
            col_indices,
            self.names,  # type: ignore[has-type]
        )
        # Register each selected column with the reader.
        for col in noconvert_columns:
            self._reader.set_noconvert(col)

    def read(
        self,
        nrows: int | None = None,
    ) -> tuple[
        Index | MultiIndex | None,
        Sequence[Hashable] | MultiIndex,
        Mapping[Hashable, ArrayLike],
    ]:
        """
        Read data from the underlying reader and assemble the result.

        Parameters
        ----------
        nrows : int or None, default None
            Forwarded to the reader's ``read_low_memory`` / ``read``.
            NOTE(review): presumably a row limit -- confirm against TextReader.

        Returns
        -------
        tuple
            ``(index, column_names, data)`` where ``data`` maps column names
            to arrays. If the reader is already exhausted on the first read,
            the tuple is built from ``_get_empty_meta`` instead.

        Raises
        ------
        StopIteration
            Re-raised when the reader is exhausted and this is not the first
            read; the reader is closed before re-raising.
        NotImplementedError
            If the reader reports leading columns and
            ``_has_complex_date_col`` is true.
        ParserError
            If ``index_col`` is truthy and its length differs from the
            reader's ``leading_cols``.
        """
        index: Index | MultiIndex | None
        column_names: Sequence[Hashable] | MultiIndex
        try:
            if self.low_memory:
                chunks = self._reader.read_low_memory(nrows)
                # destructive to chunks
                data = _concatenate_chunks(chunks)

            else:
                data = self._reader.read(nrows)
        except StopIteration:
            if self._first_chunk:
                # Exhausted on the very first read: return the empty
                # index/columns/data produced by ``_get_empty_meta``.
                self._first_chunk = False
                names = dedup_names(
                    self.orig_names,
                    is_potential_multi_index(self.orig_names, self.index_col),
                )
                index, columns, col_dict = self._get_empty_meta(
                    names,
                    dtype=self.dtype,
                )
                columns = self._maybe_make_multi_index_columns(columns, self.col_names)

                if self.usecols is not None:
                    columns = self._filter_usecols(columns)

                # Drop entries for columns that were filtered out above.
                col_dict = {k: v for k, v in col_dict.items() if k in columns}

                return index, columns, col_dict

            else:
                self.close()
                raise

        # Done with first read, next time raise StopIteration
        self._first_chunk = False

        # error: Cannot determine type of 'names'
        names = self.names  # type: ignore[has-type]

        if self._reader.leading_cols:
            if self._has_complex_date_col:
                raise NotImplementedError("file structure not yet supported")

            # implicit index, no index names
            arrays = []

            if self.index_col and self._reader.leading_cols != len(self.index_col):
                raise ParserError(
                    "Could not construct index. Requested to use "
                    f"{len(self.index_col)} number of columns, but "
                    f"{self._reader.leading_cols} left to parse."
                )

            # Remove the index columns from ``data``: by position when no
            # ``index_col`` is set, otherwise by the keys listed in it.
            for i in range(self._reader.leading_cols):
                if self.index_col is None:
                    values = data.pop(i)
                else:
                    values = data.pop(self.index_col[i])

                values = self._maybe_parse_dates(values, i, try_parse_dates=True)
                arrays.append(values)

            index = ensure_index_from_sequences(arrays)

            if self.usecols is not None:
                names = self._filter_usecols(names)

            names = dedup_names(names, is_potential_multi_index(names, self.index_col))

            # rename dict keys
            # The remaining columns are ordered by key and paired positionally
            # with ``names``.
            data_tups = sorted(data.items())
            data = {k: v for k, (i, v) in zip(names, data_tups)}

            column_names, date_data = self._do_date_conversions(names, data)

            # maybe create a mi on the columns
            column_names = self._maybe_make_multi_index_columns(
                column_names, self.col_names
            )

        else:
            # rename dict keys
            data_tups = sorted(data.items())

            # ugh, mutation

            # assert for mypy, orig_names is List or None, None would error in list(...)
            assert self.orig_names is not None
            names = list(self.orig_names)
            names = dedup_names(names, is_potential_multi_index(names, self.index_col))

            if self.usecols is not None:
                names = self._filter_usecols(names)

            # columns as list
            alldata = [x[1] for x in data_tups]
            # The length check is only performed when no ``usecols`` is set.
            if self.usecols is None:
                self._check_data_length(names, alldata)

            data = {k: v for k, (i, v) in zip(names, data_tups)}

            names, date_data = self._do_date_conversions(names, data)
            index, column_names = self._make_index(date_data, alldata, names)

        return index, column_names, date_data

    def _filter_usecols(self, names: Sequence[Hashable]) -> Sequence[Hashable]:
        """
        Restrict ``names`` to the entries selected by ``self.usecols``.

        ``self.usecols`` is evaluated against ``names`` first. Filtering only
        happens when that result is not None and its length differs from
        ``len(names)``; a name is kept when either its position or the name
        itself is contained in the evaluated ``usecols``. Otherwise ``names``
        is returned unchanged.
        """
        # hackish
        usecols = self._evaluate_usecols(self.usecols, names)
        if usecols is not None and len(names) != len(usecols):
            names = [
                name for i, name in enumerate(names) if i in usecols or name in usecols
            ]
        return names

    def _maybe_parse_dates(self, values, index: int, try_parse_dates: bool = True):
        """
        Run the date converter on ``values`` if requested for this column.

        Conversion via ``self._date_conv`` happens only when ``try_parse_dates``
        is true and ``self._should_parse_dates(index)`` is true; the matching
        entry of ``self.index_names`` (or None when there are no index names)
        is passed as ``col``. Otherwise ``values`` is returned untouched.
        """
        if try_parse_dates and self._should_parse_dates(index):
            values = self._date_conv(
                values,
                col=self.index_names[index] if self.index_names is not None else None,
            )
        return values
353
354
def _concatenate_chunks(chunks: list[dict[int, ArrayLike]]) -> dict:
    """
    Concatenate chunks of data read with low_memory=True.

    The tricky part is handling Categoricals, where different chunks
    may have different inferred categories.

    Parameters
    ----------
    chunks : list of dict
        One mapping per chunk. The column keys are taken from the first
        chunk and every chunk must contain them. The mappings are consumed
        in place: each column is popped from every chunk as it is combined.

    Returns
    -------
    dict
        Maps each column key of the first chunk to the concatenation of that
        column across all chunks. An empty ``chunks`` list gives ``{}``.

    Warns
    -----
    DtypeWarning
        If a column had more than one non-categorical dtype across chunks and
        the concatenated result ended up with object dtype.
    """
    if not chunks:
        # Nothing to combine; avoid indexing into an empty list.
        return {}

    warning_columns = []
    result: dict = {}

    # Snapshot the keys first: the chunk dicts are mutated inside the loop.
    for name in list(chunks[0]):
        arrs = [chunk.pop(name) for chunk in chunks]
        # Check each arr for consistent types.
        dtypes = {a.dtype for a in arrs}
        non_cat_dtypes = {x for x in dtypes if not isinstance(x, CategoricalDtype)}

        dtype = dtypes.pop()
        if isinstance(dtype, CategoricalDtype):
            # Chunks may have inferred different categories; merge them while
            # keeping the categories in order of appearance.
            result[name] = union_categoricals(arrs, sort_categories=False)
        else:
            result[name] = concat_compat(arrs)
            if len(non_cat_dtypes) > 1 and result[name].dtype == np.dtype(object):
                warning_columns.append(str(name))

    if warning_columns:
        warning_names = ",".join(warning_columns)
        warning_message = (
            f"Columns ({warning_names}) have mixed types. "
            "Specify dtype option on import or set low_memory=False."
        )
        warnings.warn(warning_message, DtypeWarning, stacklevel=find_stack_level())
    return result
390
391
def ensure_dtype_objs(
    dtype: DtypeArg | dict[Hashable, DtypeArg] | None
) -> DtypeObj | dict[Hashable, DtypeObj] | None:
    """
    Ensure we have either None, a dtype object, or a dictionary mapping to
    dtype objects.

    Parameters
    ----------
    dtype : dtype-like, dict of dtype-likes, or None
        A single dtype specification, a mapping from column key to dtype
        specification, or None.

    Returns
    -------
    None, dtype object, dict, or defaultdict
        * None is returned unchanged.
        * A ``defaultdict`` with a ``default_factory`` gives a new
          ``defaultdict`` whose stored values and default are both converted
          with ``pandas_dtype``.
        * Any other dict (including a ``defaultdict`` whose
          ``default_factory`` is None, which has no default to convert) gives
          a plain dict with converted values.
        * Anything else is converted with ``pandas_dtype``.
    """
    if dtype is None:
        return None

    if isinstance(dtype, defaultdict) and dtype.default_factory is not None:
        # Resolve the default once so every missing key shares one dtype object.
        default_dtype = pandas_dtype(dtype.default_factory())
        dtype_converted: defaultdict = defaultdict(lambda: default_dtype)
        for key, value in dtype.items():
            dtype_converted[key] = pandas_dtype(value)
        return dtype_converted

    if isinstance(dtype, dict):
        return {key: pandas_dtype(value) for key, value in dtype.items()}

    return pandas_dtype(dtype)