1from __future__ import annotations
2
3from collections import defaultdict
4from typing import (
5 TYPE_CHECKING,
6 Hashable,
7 Mapping,
8 Sequence,
9)
10import warnings
11
12import numpy as np
13
14from pandas._libs import (
15 lib,
16 parsers,
17)
18from pandas._typing import (
19 ArrayLike,
20 DtypeArg,
21 DtypeObj,
22 ReadCsvBuffer,
23)
24from pandas.compat._optional import import_optional_dependency
25from pandas.errors import DtypeWarning
26from pandas.util._exceptions import find_stack_level
27
28from pandas.core.dtypes.common import (
29 is_categorical_dtype,
30 pandas_dtype,
31)
32from pandas.core.dtypes.concat import (
33 concat_compat,
34 union_categoricals,
35)
36
37from pandas.core.indexes.api import ensure_index_from_sequences
38
39from pandas.io.common import (
40 dedup_names,
41 is_potential_multi_index,
42)
43from pandas.io.parsers.base_parser import (
44 ParserBase,
45 ParserError,
46 is_index_col,
47)
48
49if TYPE_CHECKING:
50 from pandas import (
51 Index,
52 MultiIndex,
53 )
54
55
56class CParserWrapper(ParserBase):
57 low_memory: bool
58 _reader: parsers.TextReader
59
60 def __init__(self, src: ReadCsvBuffer[str], **kwds) -> None:
61 super().__init__(kwds)
62 self.kwds = kwds
63 kwds = kwds.copy()
64
65 self.low_memory = kwds.pop("low_memory", False)
66
67 # #2442
68 # error: Cannot determine type of 'index_col'
69 kwds["allow_leading_cols"] = (
70 self.index_col is not False # type: ignore[has-type]
71 )
72
73 # GH20529, validate usecol arg before TextReader
74 kwds["usecols"] = self.usecols
75
76 # Have to pass int, would break tests using TextReader directly otherwise :(
77 kwds["on_bad_lines"] = self.on_bad_lines.value
78
79 for key in (
80 "storage_options",
81 "encoding",
82 "memory_map",
83 "compression",
84 ):
85 kwds.pop(key, None)
86
87 kwds["dtype"] = ensure_dtype_objs(kwds.get("dtype", None))
88 if "dtype_backend" not in kwds or kwds["dtype_backend"] is lib.no_default:
89 kwds["dtype_backend"] = "numpy"
90 if kwds["dtype_backend"] == "pyarrow":
91 # Fail here loudly instead of in cython after reading
92 import_optional_dependency("pyarrow")
93 self._reader = parsers.TextReader(src, **kwds)
94
95 self.unnamed_cols = self._reader.unnamed_cols
96
97 # error: Cannot determine type of 'names'
98 passed_names = self.names is None # type: ignore[has-type]
99
100 if self._reader.header is None:
101 self.names = None
102 else:
103 # error: Cannot determine type of 'names'
104 # error: Cannot determine type of 'index_names'
105 (
106 self.names, # type: ignore[has-type]
107 self.index_names,
108 self.col_names,
109 passed_names,
110 ) = self._extract_multi_indexer_columns(
111 self._reader.header,
112 self.index_names, # type: ignore[has-type]
113 passed_names,
114 )
115
116 # error: Cannot determine type of 'names'
117 if self.names is None: # type: ignore[has-type]
118 self.names = list(range(self._reader.table_width))
119
120 # gh-9755
121 #
122 # need to set orig_names here first
123 # so that proper indexing can be done
124 # with _set_noconvert_columns
125 #
126 # once names has been filtered, we will
127 # then set orig_names again to names
128 # error: Cannot determine type of 'names'
129 self.orig_names = self.names[:] # type: ignore[has-type]
130
131 if self.usecols:
132 usecols = self._evaluate_usecols(self.usecols, self.orig_names)
133
134 # GH 14671
135 # assert for mypy, orig_names is List or None, None would error in issubset
136 assert self.orig_names is not None
137 if self.usecols_dtype == "string" and not set(usecols).issubset(
138 self.orig_names
139 ):
140 self._validate_usecols_names(usecols, self.orig_names)
141
142 # error: Cannot determine type of 'names'
143 if len(self.names) > len(usecols): # type: ignore[has-type]
144 # error: Cannot determine type of 'names'
145 self.names = [ # type: ignore[has-type]
146 n
147 # error: Cannot determine type of 'names'
148 for i, n in enumerate(self.names) # type: ignore[has-type]
149 if (i in usecols or n in usecols)
150 ]
151
152 # error: Cannot determine type of 'names'
153 if len(self.names) < len(usecols): # type: ignore[has-type]
154 # error: Cannot determine type of 'names'
155 self._validate_usecols_names(
156 usecols,
157 self.names, # type: ignore[has-type]
158 )
159
160 # error: Cannot determine type of 'names'
161 self._validate_parse_dates_presence(self.names) # type: ignore[has-type]
162 self._set_noconvert_columns()
163
164 # error: Cannot determine type of 'names'
165 self.orig_names = self.names # type: ignore[has-type]
166
167 if not self._has_complex_date_col:
168 # error: Cannot determine type of 'index_col'
169 if self._reader.leading_cols == 0 and is_index_col(
170 self.index_col # type: ignore[has-type]
171 ):
172 self._name_processed = True
173 (
174 index_names,
175 # error: Cannot determine type of 'names'
176 self.names, # type: ignore[has-type]
177 self.index_col,
178 ) = self._clean_index_names(
179 # error: Cannot determine type of 'names'
180 self.names, # type: ignore[has-type]
181 # error: Cannot determine type of 'index_col'
182 self.index_col, # type: ignore[has-type]
183 )
184
185 if self.index_names is None:
186 self.index_names = index_names
187
188 if self._reader.header is None and not passed_names:
189 assert self.index_names is not None
190 self.index_names = [None] * len(self.index_names)
191
192 self._implicit_index = self._reader.leading_cols > 0
193
194 def close(self) -> None:
195 # close handles opened by C parser
196 try:
197 self._reader.close()
198 except ValueError:
199 pass
200
201 def _set_noconvert_columns(self) -> None:
202 """
203 Set the columns that should not undergo dtype conversions.
204
205 Currently, any column that is involved with date parsing will not
206 undergo such conversions.
207 """
208 assert self.orig_names is not None
209 # error: Cannot determine type of 'names'
210
211 # much faster than using orig_names.index(x) xref GH#44106
212 names_dict = {x: i for i, x in enumerate(self.orig_names)}
213 col_indices = [names_dict[x] for x in self.names] # type: ignore[has-type]
214 # error: Cannot determine type of 'names'
215 noconvert_columns = self._set_noconvert_dtype_columns(
216 col_indices,
217 self.names, # type: ignore[has-type]
218 )
219 for col in noconvert_columns:
220 self._reader.set_noconvert(col)
221
222 def read(
223 self,
224 nrows: int | None = None,
225 ) -> tuple[
226 Index | MultiIndex | None,
227 Sequence[Hashable] | MultiIndex,
228 Mapping[Hashable, ArrayLike],
229 ]:
230 index: Index | MultiIndex | None
231 column_names: Sequence[Hashable] | MultiIndex
232 try:
233 if self.low_memory:
234 chunks = self._reader.read_low_memory(nrows)
235 # destructive to chunks
236 data = _concatenate_chunks(chunks)
237
238 else:
239 data = self._reader.read(nrows)
240 except StopIteration:
241 if self._first_chunk:
242 self._first_chunk = False
243 names = dedup_names(
244 self.orig_names,
245 is_potential_multi_index(self.orig_names, self.index_col),
246 )
247 index, columns, col_dict = self._get_empty_meta(
248 names,
249 self.index_col,
250 self.index_names,
251 dtype=self.kwds.get("dtype"),
252 )
253 columns = self._maybe_make_multi_index_columns(columns, self.col_names)
254
255 if self.usecols is not None:
256 columns = self._filter_usecols(columns)
257
258 col_dict = {k: v for k, v in col_dict.items() if k in columns}
259
260 return index, columns, col_dict
261
262 else:
263 self.close()
264 raise
265
266 # Done with first read, next time raise StopIteration
267 self._first_chunk = False
268
269 # error: Cannot determine type of 'names'
270 names = self.names # type: ignore[has-type]
271
272 if self._reader.leading_cols:
273 if self._has_complex_date_col:
274 raise NotImplementedError("file structure not yet supported")
275
276 # implicit index, no index names
277 arrays = []
278
279 if self.index_col and self._reader.leading_cols != len(self.index_col):
280 raise ParserError(
281 "Could not construct index. Requested to use "
282 f"{len(self.index_col)} number of columns, but "
283 f"{self._reader.leading_cols} left to parse."
284 )
285
286 for i in range(self._reader.leading_cols):
287 if self.index_col is None:
288 values = data.pop(i)
289 else:
290 values = data.pop(self.index_col[i])
291
292 values = self._maybe_parse_dates(values, i, try_parse_dates=True)
293 arrays.append(values)
294
295 index = ensure_index_from_sequences(arrays)
296
297 if self.usecols is not None:
298 names = self._filter_usecols(names)
299
300 names = dedup_names(names, is_potential_multi_index(names, self.index_col))
301
302 # rename dict keys
303 data_tups = sorted(data.items())
304 data = {k: v for k, (i, v) in zip(names, data_tups)}
305
306 column_names, date_data = self._do_date_conversions(names, data)
307
308 # maybe create a mi on the columns
309 column_names = self._maybe_make_multi_index_columns(
310 column_names, self.col_names
311 )
312
313 else:
314 # rename dict keys
315 data_tups = sorted(data.items())
316
317 # ugh, mutation
318
319 # assert for mypy, orig_names is List or None, None would error in list(...)
320 assert self.orig_names is not None
321 names = list(self.orig_names)
322 names = dedup_names(names, is_potential_multi_index(names, self.index_col))
323
324 if self.usecols is not None:
325 names = self._filter_usecols(names)
326
327 # columns as list
328 alldata = [x[1] for x in data_tups]
329 if self.usecols is None:
330 self._check_data_length(names, alldata)
331
332 data = {k: v for k, (i, v) in zip(names, data_tups)}
333
334 names, date_data = self._do_date_conversions(names, data)
335 index, column_names = self._make_index(date_data, alldata, names)
336
337 return index, column_names, date_data
338
339 def _filter_usecols(self, names: Sequence[Hashable]) -> Sequence[Hashable]:
340 # hackish
341 usecols = self._evaluate_usecols(self.usecols, names)
342 if usecols is not None and len(names) != len(usecols):
343 names = [
344 name for i, name in enumerate(names) if i in usecols or name in usecols
345 ]
346 return names
347
348 def _get_index_names(self):
349 names = list(self._reader.header[0])
350 idx_names = None
351
352 if self._reader.leading_cols == 0 and self.index_col is not None:
353 (idx_names, names, self.index_col) = self._clean_index_names(
354 names, self.index_col
355 )
356
357 return names, idx_names
358
359 def _maybe_parse_dates(self, values, index: int, try_parse_dates: bool = True):
360 if try_parse_dates and self._should_parse_dates(index):
361 values = self._date_conv(
362 values,
363 col=self.index_names[index] if self.index_names is not None else None,
364 )
365 return values
366
367
368def _concatenate_chunks(chunks: list[dict[int, ArrayLike]]) -> dict:
369 """
370 Concatenate chunks of data read with low_memory=True.
371
372 The tricky part is handling Categoricals, where different chunks
373 may have different inferred categories.
374 """
375 names = list(chunks[0].keys())
376 warning_columns = []
377
378 result: dict = {}
379 for name in names:
380 arrs = [chunk.pop(name) for chunk in chunks]
381 # Check each arr for consistent types.
382 dtypes = {a.dtype for a in arrs}
383 non_cat_dtypes = {x for x in dtypes if not is_categorical_dtype(x)}
384
385 dtype = dtypes.pop()
386 if is_categorical_dtype(dtype):
387 result[name] = union_categoricals(arrs, sort_categories=False)
388 else:
389 result[name] = concat_compat(arrs)
390 if len(non_cat_dtypes) > 1 and result[name].dtype == np.dtype(object):
391 warning_columns.append(str(name))
392
393 if warning_columns:
394 warning_names = ",".join(warning_columns)
395 warning_message = " ".join(
396 [
397 f"Columns ({warning_names}) have mixed types. "
398 f"Specify dtype option on import or set low_memory=False."
399 ]
400 )
401 warnings.warn(warning_message, DtypeWarning, stacklevel=find_stack_level())
402 return result
403
404
405def ensure_dtype_objs(
406 dtype: DtypeArg | dict[Hashable, DtypeArg] | None
407) -> DtypeObj | dict[Hashable, DtypeObj] | None:
408 """
409 Ensure we have either None, a dtype object, or a dictionary mapping to
410 dtype objects.
411 """
412 if isinstance(dtype, defaultdict):
413 # "None" not callable [misc]
414 default_dtype = pandas_dtype(dtype.default_factory()) # type: ignore[misc]
415 dtype_converted: defaultdict = defaultdict(lambda: default_dtype)
416 for key in dtype.keys():
417 dtype_converted[key] = pandas_dtype(dtype[key])
418 return dtype_converted
419 elif isinstance(dtype, dict):
420 return {k: pandas_dtype(dtype[k]) for k in dtype}
421 elif dtype is not None:
422 return pandas_dtype(dtype)
423 return dtype