1from __future__ import annotations
2
3from collections import (
4 abc,
5 defaultdict,
6)
7import csv
8from io import StringIO
9import re
10import sys
11from typing import (
12 IO,
13 TYPE_CHECKING,
14 DefaultDict,
15 Hashable,
16 Iterator,
17 List,
18 Literal,
19 Mapping,
20 Sequence,
21 cast,
22)
23
24import numpy as np
25
26from pandas._libs import lib
27from pandas._typing import (
28 ArrayLike,
29 ReadCsvBuffer,
30 Scalar,
31)
32from pandas.errors import (
33 EmptyDataError,
34 ParserError,
35)
36
37from pandas.core.dtypes.common import is_integer
38from pandas.core.dtypes.inference import is_dict_like
39
40from pandas.io.common import (
41 dedup_names,
42 is_potential_multi_index,
43)
44from pandas.io.parsers.base_parser import (
45 ParserBase,
46 parser_defaults,
47)
48
49if TYPE_CHECKING:
50 from pandas import (
51 Index,
52 MultiIndex,
53 )
54
55# BOM character (byte order mark)
56# This exists at the beginning of a file to indicate endianness
57# of a file (stream). Unfortunately, this marker screws up parsing,
58# so we need to remove it if we see it.
59_BOM = "\ufeff"
60
61
62class PythonParser(ParserBase):
63 def __init__(self, f: ReadCsvBuffer[str] | list, **kwds) -> None:
64 """
65 Workhorse function for processing nested list into DataFrame
66 """
67 super().__init__(kwds)
68
69 self.data: Iterator[str] | None = None
70 self.buf: list = []
71 self.pos = 0
72 self.line_pos = 0
73
74 self.skiprows = kwds["skiprows"]
75
76 if callable(self.skiprows):
77 self.skipfunc = self.skiprows
78 else:
79 self.skipfunc = lambda x: x in self.skiprows
80
81 self.skipfooter = _validate_skipfooter_arg(kwds["skipfooter"])
82 self.delimiter = kwds["delimiter"]
83
84 self.quotechar = kwds["quotechar"]
85 if isinstance(self.quotechar, str):
86 self.quotechar = str(self.quotechar)
87
88 self.escapechar = kwds["escapechar"]
89 self.doublequote = kwds["doublequote"]
90 self.skipinitialspace = kwds["skipinitialspace"]
91 self.lineterminator = kwds["lineterminator"]
92 self.quoting = kwds["quoting"]
93 self.skip_blank_lines = kwds["skip_blank_lines"]
94
95 self.names_passed = kwds["names"] or None
96
97 self.has_index_names = False
98 if "has_index_names" in kwds:
99 self.has_index_names = kwds["has_index_names"]
100
101 self.verbose = kwds["verbose"]
102
103 self.thousands = kwds["thousands"]
104 self.decimal = kwds["decimal"]
105
106 self.comment = kwds["comment"]
107
108 # Set self.data to something that can read lines.
109 if isinstance(f, list):
110 # read_excel: f is a list
111 self.data = cast(Iterator[str], f)
112 else:
113 assert hasattr(f, "readline")
114 self._make_reader(f)
115
116 # Get columns in two steps: infer from data, then
117 # infer column indices from self.usecols if it is specified.
118 self._col_indices: list[int] | None = None
119 columns: list[list[Scalar | None]]
120 (
121 columns,
122 self.num_original_columns,
123 self.unnamed_cols,
124 ) = self._infer_columns()
125
126 # Now self.columns has the set of columns that we will process.
127 # The original set is stored in self.original_columns.
128 # error: Cannot determine type of 'index_names'
129 (
130 self.columns,
131 self.index_names,
132 self.col_names,
133 _,
134 ) = self._extract_multi_indexer_columns(
135 columns,
136 self.index_names, # type: ignore[has-type]
137 )
138
139 # get popped off for index
140 self.orig_names: list[Hashable] = list(self.columns)
141
142 # needs to be cleaned/refactored
143 # multiple date column thing turning into a real spaghetti factory
144
145 if not self._has_complex_date_col:
146 (index_names, self.orig_names, self.columns) = self._get_index_name(
147 self.columns
148 )
149 self._name_processed = True
150 if self.index_names is None:
151 self.index_names = index_names
152
153 if self._col_indices is None:
154 self._col_indices = list(range(len(self.columns)))
155
156 self._parse_date_cols = self._validate_parse_dates_presence(self.columns)
157 no_thousands_columns: set[int] | None = None
158 if self.parse_dates:
159 no_thousands_columns = self._set_noconvert_dtype_columns(
160 self._col_indices, self.columns
161 )
162 self._no_thousands_columns = no_thousands_columns
163
164 if len(self.decimal) != 1:
165 raise ValueError("Only length-1 decimal markers supported")
166
167 decimal = re.escape(self.decimal)
168 if self.thousands is None:
169 regex = rf"^[\-\+]?[0-9]*({decimal}[0-9]*)?([0-9]?(E|e)\-?[0-9]+)?$"
170 else:
171 thousands = re.escape(self.thousands)
172 regex = (
173 rf"^[\-\+]?([0-9]+{thousands}|[0-9])*({decimal}[0-9]*)?"
174 rf"([0-9]?(E|e)\-?[0-9]+)?$"
175 )
176 self.num = re.compile(regex)
177
178 def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]) -> None:
179 sep = self.delimiter
180
181 if sep is None or len(sep) == 1:
182 if self.lineterminator:
183 raise ValueError(
184 "Custom line terminators not supported in python parser (yet)"
185 )
186
187 class MyDialect(csv.Dialect):
188 delimiter = self.delimiter
189 quotechar = self.quotechar
190 escapechar = self.escapechar
191 doublequote = self.doublequote
192 skipinitialspace = self.skipinitialspace
193 quoting = self.quoting
194 lineterminator = "\n"
195
196 dia = MyDialect
197
198 if sep is not None:
199 dia.delimiter = sep
200 else:
201 # attempt to sniff the delimiter from the first valid line,
202 # i.e. no comment line and not in skiprows
203 line = f.readline()
204 lines = self._check_comments([[line]])[0]
205 while self.skipfunc(self.pos) or not lines:
206 self.pos += 1
207 line = f.readline()
208 lines = self._check_comments([[line]])[0]
209 lines_str = cast(List[str], lines)
210
211 # since `line` was a string, lines will be a list containing
212 # only a single string
213 line = lines_str[0]
214
215 self.pos += 1
216 self.line_pos += 1
217 sniffed = csv.Sniffer().sniff(line)
218 dia.delimiter = sniffed.delimiter
219
220 # Note: encoding is irrelevant here
221 line_rdr = csv.reader(StringIO(line), dialect=dia)
222 self.buf.extend(list(line_rdr))
223
224 # Note: encoding is irrelevant here
225 reader = csv.reader(f, dialect=dia, strict=True)
226
227 else:
228
229 def _read():
230 line = f.readline()
231 pat = re.compile(sep)
232
233 yield pat.split(line.strip())
234
235 for line in f:
236 yield pat.split(line.strip())
237
238 reader = _read()
239
240 # error: Incompatible types in assignment (expression has type "_reader",
241 # variable has type "Union[IO[Any], RawIOBase, BufferedIOBase, TextIOBase,
242 # TextIOWrapper, mmap, None]")
243 self.data = reader # type: ignore[assignment]
244
245 def read(
246 self, rows: int | None = None
247 ) -> tuple[
248 Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
249 ]:
250 try:
251 content = self._get_lines(rows)
252 except StopIteration:
253 if self._first_chunk:
254 content = []
255 else:
256 self.close()
257 raise
258
259 # done with first read, next time raise StopIteration
260 self._first_chunk = False
261
262 columns: Sequence[Hashable] = list(self.orig_names)
263 if not len(content): # pragma: no cover
264 # DataFrame with the right metadata, even though it's length 0
265 # error: Cannot determine type of 'index_col'
266 names = dedup_names(
267 self.orig_names,
268 is_potential_multi_index(
269 self.orig_names,
270 self.index_col, # type: ignore[has-type]
271 ),
272 )
273 # error: Cannot determine type of 'index_col'
274 index, columns, col_dict = self._get_empty_meta(
275 names,
276 self.index_col, # type: ignore[has-type]
277 self.index_names,
278 self.dtype,
279 )
280 conv_columns = self._maybe_make_multi_index_columns(columns, self.col_names)
281 return index, conv_columns, col_dict
282
283 # handle new style for names in index
284 count_empty_content_vals = count_empty_vals(content[0])
285 indexnamerow = None
286 if self.has_index_names and count_empty_content_vals == len(columns):
287 indexnamerow = content[0]
288 content = content[1:]
289
290 alldata = self._rows_to_cols(content)
291 data, columns = self._exclude_implicit_index(alldata)
292
293 conv_data = self._convert_data(data)
294 columns, conv_data = self._do_date_conversions(columns, conv_data)
295
296 index, result_columns = self._make_index(
297 conv_data, alldata, columns, indexnamerow
298 )
299
300 return index, result_columns, conv_data
301
302 def _exclude_implicit_index(
303 self,
304 alldata: list[np.ndarray],
305 ) -> tuple[Mapping[Hashable, np.ndarray], Sequence[Hashable]]:
306 # error: Cannot determine type of 'index_col'
307 names = dedup_names(
308 self.orig_names,
309 is_potential_multi_index(
310 self.orig_names,
311 self.index_col, # type: ignore[has-type]
312 ),
313 )
314
315 offset = 0
316 if self._implicit_index:
317 # error: Cannot determine type of 'index_col'
318 offset = len(self.index_col) # type: ignore[has-type]
319
320 len_alldata = len(alldata)
321 self._check_data_length(names, alldata)
322
323 return {
324 name: alldata[i + offset] for i, name in enumerate(names) if i < len_alldata
325 }, names
326
327 # legacy
328 def get_chunk(
329 self, size: int | None = None
330 ) -> tuple[
331 Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
332 ]:
333 if size is None:
334 # error: "PythonParser" has no attribute "chunksize"
335 size = self.chunksize # type: ignore[attr-defined]
336 return self.read(rows=size)
337
338 def _convert_data(
339 self,
340 data: Mapping[Hashable, np.ndarray],
341 ) -> Mapping[Hashable, ArrayLike]:
342 # apply converters
343 clean_conv = self._clean_mapping(self.converters)
344 clean_dtypes = self._clean_mapping(self.dtype)
345
346 # Apply NA values.
347 clean_na_values = {}
348 clean_na_fvalues = {}
349
350 if isinstance(self.na_values, dict):
351 for col in self.na_values:
352 na_value = self.na_values[col]
353 na_fvalue = self.na_fvalues[col]
354
355 if isinstance(col, int) and col not in self.orig_names:
356 col = self.orig_names[col]
357
358 clean_na_values[col] = na_value
359 clean_na_fvalues[col] = na_fvalue
360 else:
361 clean_na_values = self.na_values
362 clean_na_fvalues = self.na_fvalues
363
364 return self._convert_to_ndarrays(
365 data,
366 clean_na_values,
367 clean_na_fvalues,
368 self.verbose,
369 clean_conv,
370 clean_dtypes,
371 )
372
373 def _infer_columns(
374 self,
375 ) -> tuple[list[list[Scalar | None]], int, set[Scalar | None]]:
376 names = self.names
377 num_original_columns = 0
378 clear_buffer = True
379 unnamed_cols: set[Scalar | None] = set()
380 self._header_line = None
381
382 if self.header is not None:
383 header = self.header
384
385 if isinstance(header, (list, tuple, np.ndarray)):
386 have_mi_columns = len(header) > 1
387 # we have a mi columns, so read an extra line
388 if have_mi_columns:
389 header = list(header) + [header[-1] + 1]
390 else:
391 have_mi_columns = False
392 header = [header]
393
394 columns: list[list[Scalar | None]] = []
395 for level, hr in enumerate(header):
396 try:
397 line = self._buffered_line()
398
399 while self.line_pos <= hr:
400 line = self._next_line()
401
402 except StopIteration as err:
403 if 0 < self.line_pos <= hr and (
404 not have_mi_columns or hr != header[-1]
405 ):
406 # If no rows we want to raise a different message and if
407 # we have mi columns, the last line is not part of the header
408 joi = list(map(str, header[:-1] if have_mi_columns else header))
409 msg = f"[{','.join(joi)}], len of {len(joi)}, "
410 raise ValueError(
411 f"Passed header={msg}"
412 f"but only {self.line_pos} lines in file"
413 ) from err
414
415 # We have an empty file, so check
416 # if columns are provided. That will
417 # serve as the 'line' for parsing
418 if have_mi_columns and hr > 0:
419 if clear_buffer:
420 self._clear_buffer()
421 columns.append([None] * len(columns[-1]))
422 return columns, num_original_columns, unnamed_cols
423
424 if not self.names:
425 raise EmptyDataError("No columns to parse from file") from err
426
427 line = self.names[:]
428
429 this_columns: list[Scalar | None] = []
430 this_unnamed_cols = []
431
432 for i, c in enumerate(line):
433 if c == "":
434 if have_mi_columns:
435 col_name = f"Unnamed: {i}_level_{level}"
436 else:
437 col_name = f"Unnamed: {i}"
438
439 this_unnamed_cols.append(i)
440 this_columns.append(col_name)
441 else:
442 this_columns.append(c)
443
444 if not have_mi_columns:
445 counts: DefaultDict = defaultdict(int)
446 # Ensure that regular columns are used before unnamed ones
447 # to keep given names and mangle unnamed columns
448 col_loop_order = [
449 i
450 for i in range(len(this_columns))
451 if i not in this_unnamed_cols
452 ] + this_unnamed_cols
453
454 # TODO: Use pandas.io.common.dedup_names instead (see #50371)
455 for i in col_loop_order:
456 col = this_columns[i]
457 old_col = col
458 cur_count = counts[col]
459
460 if cur_count > 0:
461 while cur_count > 0:
462 counts[old_col] = cur_count + 1
463 col = f"{old_col}.{cur_count}"
464 if col in this_columns:
465 cur_count += 1
466 else:
467 cur_count = counts[col]
468
469 if (
470 self.dtype is not None
471 and is_dict_like(self.dtype)
472 and self.dtype.get(old_col) is not None
473 and self.dtype.get(col) is None
474 ):
475 self.dtype.update({col: self.dtype.get(old_col)})
476 this_columns[i] = col
477 counts[col] = cur_count + 1
478 elif have_mi_columns:
479 # if we have grabbed an extra line, but its not in our
480 # format so save in the buffer, and create an blank extra
481 # line for the rest of the parsing code
482 if hr == header[-1]:
483 lc = len(this_columns)
484 # error: Cannot determine type of 'index_col'
485 sic = self.index_col # type: ignore[has-type]
486 ic = len(sic) if sic is not None else 0
487 unnamed_count = len(this_unnamed_cols)
488
489 # if wrong number of blanks or no index, not our format
490 if (lc != unnamed_count and lc - ic > unnamed_count) or ic == 0:
491 clear_buffer = False
492 this_columns = [None] * lc
493 self.buf = [self.buf[-1]]
494
495 columns.append(this_columns)
496 unnamed_cols.update({this_columns[i] for i in this_unnamed_cols})
497
498 if len(columns) == 1:
499 num_original_columns = len(this_columns)
500
501 if clear_buffer:
502 self._clear_buffer()
503
504 first_line: list[Scalar] | None
505 if names is not None:
506 # Read first row after header to check if data are longer
507 try:
508 first_line = self._next_line()
509 except StopIteration:
510 first_line = None
511
512 len_first_data_row = 0 if first_line is None else len(first_line)
513
514 if len(names) > len(columns[0]) and len(names) > len_first_data_row:
515 raise ValueError(
516 "Number of passed names did not match "
517 "number of header fields in the file"
518 )
519 if len(columns) > 1:
520 raise TypeError("Cannot pass names with multi-index columns")
521
522 if self.usecols is not None:
523 # Set _use_cols. We don't store columns because they are
524 # overwritten.
525 self._handle_usecols(columns, names, num_original_columns)
526 else:
527 num_original_columns = len(names)
528 if self._col_indices is not None and len(names) != len(
529 self._col_indices
530 ):
531 columns = [[names[i] for i in sorted(self._col_indices)]]
532 else:
533 columns = [names]
534 else:
535 columns = self._handle_usecols(
536 columns, columns[0], num_original_columns
537 )
538 else:
539 try:
540 line = self._buffered_line()
541
542 except StopIteration as err:
543 if not names:
544 raise EmptyDataError("No columns to parse from file") from err
545
546 line = names[:]
547
548 # Store line, otherwise it is lost for guessing the index
549 self._header_line = line
550 ncols = len(line)
551 num_original_columns = ncols
552
553 if not names:
554 columns = [list(range(ncols))]
555 columns = self._handle_usecols(
556 columns, columns[0], num_original_columns
557 )
558 else:
559 if self.usecols is None or len(names) >= num_original_columns:
560 columns = self._handle_usecols([names], names, num_original_columns)
561 num_original_columns = len(names)
562 else:
563 if not callable(self.usecols) and len(names) != len(self.usecols):
564 raise ValueError(
565 "Number of passed names did not match number of "
566 "header fields in the file"
567 )
568 # Ignore output but set used columns.
569 self._handle_usecols([names], names, ncols)
570 columns = [names]
571 num_original_columns = ncols
572
573 return columns, num_original_columns, unnamed_cols
574
575 def _handle_usecols(
576 self,
577 columns: list[list[Scalar | None]],
578 usecols_key: list[Scalar | None],
579 num_original_columns: int,
580 ) -> list[list[Scalar | None]]:
581 """
582 Sets self._col_indices
583
584 usecols_key is used if there are string usecols.
585 """
586 col_indices: set[int] | list[int]
587 if self.usecols is not None:
588 if callable(self.usecols):
589 col_indices = self._evaluate_usecols(self.usecols, usecols_key)
590 elif any(isinstance(u, str) for u in self.usecols):
591 if len(columns) > 1:
592 raise ValueError(
593 "If using multiple headers, usecols must be integers."
594 )
595 col_indices = []
596
597 for col in self.usecols:
598 if isinstance(col, str):
599 try:
600 col_indices.append(usecols_key.index(col))
601 except ValueError:
602 self._validate_usecols_names(self.usecols, usecols_key)
603 else:
604 col_indices.append(col)
605 else:
606 missing_usecols = [
607 col for col in self.usecols if col >= num_original_columns
608 ]
609 if missing_usecols:
610 raise ParserError(
611 "Defining usecols without of bounds indices is not allowed. "
612 f"{missing_usecols} are out of bounds.",
613 )
614 col_indices = self.usecols
615
616 columns = [
617 [n for i, n in enumerate(column) if i in col_indices]
618 for column in columns
619 ]
620 self._col_indices = sorted(col_indices)
621 return columns
622
623 def _buffered_line(self) -> list[Scalar]:
624 """
625 Return a line from buffer, filling buffer if required.
626 """
627 if len(self.buf) > 0:
628 return self.buf[0]
629 else:
630 return self._next_line()
631
632 def _check_for_bom(self, first_row: list[Scalar]) -> list[Scalar]:
633 """
634 Checks whether the file begins with the BOM character.
635 If it does, remove it. In addition, if there is quoting
636 in the field subsequent to the BOM, remove it as well
637 because it technically takes place at the beginning of
638 the name, not the middle of it.
639 """
640 # first_row will be a list, so we need to check
641 # that that list is not empty before proceeding.
642 if not first_row:
643 return first_row
644
645 # The first element of this row is the one that could have the
646 # BOM that we want to remove. Check that the first element is a
647 # string before proceeding.
648 if not isinstance(first_row[0], str):
649 return first_row
650
651 # Check that the string is not empty, as that would
652 # obviously not have a BOM at the start of it.
653 if not first_row[0]:
654 return first_row
655
656 # Since the string is non-empty, check that it does
657 # in fact begin with a BOM.
658 first_elt = first_row[0][0]
659 if first_elt != _BOM:
660 return first_row
661
662 first_row_bom = first_row[0]
663 new_row: str
664
665 if len(first_row_bom) > 1 and first_row_bom[1] == self.quotechar:
666 start = 2
667 quote = first_row_bom[1]
668 end = first_row_bom[2:].index(quote) + 2
669
670 # Extract the data between the quotation marks
671 new_row = first_row_bom[start:end]
672
673 # Extract any remaining data after the second
674 # quotation mark.
675 if len(first_row_bom) > end + 1:
676 new_row += first_row_bom[end + 1 :]
677
678 else:
679 # No quotation so just remove BOM from first element
680 new_row = first_row_bom[1:]
681
682 new_row_list: list[Scalar] = [new_row]
683 return new_row_list + first_row[1:]
684
685 def _is_line_empty(self, line: list[Scalar]) -> bool:
686 """
687 Check if a line is empty or not.
688
689 Parameters
690 ----------
691 line : str, array-like
692 The line of data to check.
693
694 Returns
695 -------
696 boolean : Whether or not the line is empty.
697 """
698 return not line or all(not x for x in line)
699
700 def _next_line(self) -> list[Scalar]:
701 if isinstance(self.data, list):
702 while self.skipfunc(self.pos):
703 if self.pos >= len(self.data):
704 break
705 self.pos += 1
706
707 while True:
708 try:
709 line = self._check_comments([self.data[self.pos]])[0]
710 self.pos += 1
711 # either uncommented or blank to begin with
712 if not self.skip_blank_lines and (
713 self._is_line_empty(self.data[self.pos - 1]) or line
714 ):
715 break
716 if self.skip_blank_lines:
717 ret = self._remove_empty_lines([line])
718 if ret:
719 line = ret[0]
720 break
721 except IndexError:
722 raise StopIteration
723 else:
724 while self.skipfunc(self.pos):
725 self.pos += 1
726 # assert for mypy, data is Iterator[str] or None, would error in next
727 assert self.data is not None
728 next(self.data)
729
730 while True:
731 orig_line = self._next_iter_line(row_num=self.pos + 1)
732 self.pos += 1
733
734 if orig_line is not None:
735 line = self._check_comments([orig_line])[0]
736
737 if self.skip_blank_lines:
738 ret = self._remove_empty_lines([line])
739
740 if ret:
741 line = ret[0]
742 break
743 elif self._is_line_empty(orig_line) or line:
744 break
745
746 # This was the first line of the file,
747 # which could contain the BOM at the
748 # beginning of it.
749 if self.pos == 1:
750 line = self._check_for_bom(line)
751
752 self.line_pos += 1
753 self.buf.append(line)
754 return line
755
756 def _alert_malformed(self, msg: str, row_num: int) -> None:
757 """
758 Alert a user about a malformed row, depending on value of
759 `self.on_bad_lines` enum.
760
761 If `self.on_bad_lines` is ERROR, the alert will be `ParserError`.
762 If `self.on_bad_lines` is WARN, the alert will be printed out.
763
764 Parameters
765 ----------
766 msg: str
767 The error message to display.
768 row_num: int
769 The row number where the parsing error occurred.
770 Because this row number is displayed, we 1-index,
771 even though we 0-index internally.
772 """
773 if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
774 raise ParserError(msg)
775 if self.on_bad_lines == self.BadLineHandleMethod.WARN:
776 base = f"Skipping line {row_num}: "
777 sys.stderr.write(base + msg + "\n")
778
779 def _next_iter_line(self, row_num: int) -> list[Scalar] | None:
780 """
781 Wrapper around iterating through `self.data` (CSV source).
782
783 When a CSV error is raised, we check for specific
784 error messages that allow us to customize the
785 error message displayed to the user.
786
787 Parameters
788 ----------
789 row_num: int
790 The row number of the line being parsed.
791 """
792 try:
793 # assert for mypy, data is Iterator[str] or None, would error in next
794 assert self.data is not None
795 line = next(self.data)
796 # for mypy
797 assert isinstance(line, list)
798 return line
799 except csv.Error as e:
800 if self.on_bad_lines in (
801 self.BadLineHandleMethod.ERROR,
802 self.BadLineHandleMethod.WARN,
803 ):
804 msg = str(e)
805
806 if "NULL byte" in msg or "line contains NUL" in msg:
807 msg = (
808 "NULL byte detected. This byte "
809 "cannot be processed in Python's "
810 "native csv library at the moment, "
811 "so please pass in engine='c' instead"
812 )
813
814 if self.skipfooter > 0:
815 reason = (
816 "Error could possibly be due to "
817 "parsing errors in the skipped footer rows "
818 "(the skipfooter keyword is only applied "
819 "after Python's csv library has parsed "
820 "all rows)."
821 )
822 msg += ". " + reason
823
824 self._alert_malformed(msg, row_num)
825 return None
826
827 def _check_comments(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
828 if self.comment is None:
829 return lines
830 ret = []
831 for line in lines:
832 rl = []
833 for x in line:
834 if (
835 not isinstance(x, str)
836 or self.comment not in x
837 or x in self.na_values
838 ):
839 rl.append(x)
840 else:
841 x = x[: x.find(self.comment)]
842 if len(x) > 0:
843 rl.append(x)
844 break
845 ret.append(rl)
846 return ret
847
848 def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
849 """
850 Iterate through the lines and remove any that are
851 either empty or contain only one whitespace value
852
853 Parameters
854 ----------
855 lines : list of list of Scalars
856 The array of lines that we are to filter.
857
858 Returns
859 -------
860 filtered_lines : list of list of Scalars
861 The same array of lines with the "empty" ones removed.
862 """
863 ret = []
864 for line in lines:
865 # Remove empty lines and lines with only one whitespace value
866 if (
867 len(line) > 1
868 or len(line) == 1
869 and (not isinstance(line[0], str) or line[0].strip())
870 ):
871 ret.append(line)
872 return ret
873
874 def _check_thousands(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
875 if self.thousands is None:
876 return lines
877
878 return self._search_replace_num_columns(
879 lines=lines, search=self.thousands, replace=""
880 )
881
882 def _search_replace_num_columns(
883 self, lines: list[list[Scalar]], search: str, replace: str
884 ) -> list[list[Scalar]]:
885 ret = []
886 for line in lines:
887 rl = []
888 for i, x in enumerate(line):
889 if (
890 not isinstance(x, str)
891 or search not in x
892 or (self._no_thousands_columns and i in self._no_thousands_columns)
893 or not self.num.search(x.strip())
894 ):
895 rl.append(x)
896 else:
897 rl.append(x.replace(search, replace))
898 ret.append(rl)
899 return ret
900
901 def _check_decimal(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
902 if self.decimal == parser_defaults["decimal"]:
903 return lines
904
905 return self._search_replace_num_columns(
906 lines=lines, search=self.decimal, replace="."
907 )
908
909 def _clear_buffer(self) -> None:
910 self.buf = []
911
912 _implicit_index = False
913
914 def _get_index_name(
915 self, columns: Sequence[Hashable]
916 ) -> tuple[Sequence[Hashable] | None, list[Hashable], list[Hashable]]:
917 """
918 Try several cases to get lines:
919
920 0) There are headers on row 0 and row 1 and their
921 total summed lengths equals the length of the next line.
922 Treat row 0 as columns and row 1 as indices
923 1) Look for implicit index: there are more columns
924 on row 1 than row 0. If this is true, assume that row
925 1 lists index columns and row 0 lists normal columns.
926 2) Get index from the columns if it was listed.
927 """
928 orig_names = list(columns)
929 columns = list(columns)
930
931 line: list[Scalar] | None
932 if self._header_line is not None:
933 line = self._header_line
934 else:
935 try:
936 line = self._next_line()
937 except StopIteration:
938 line = None
939
940 next_line: list[Scalar] | None
941 try:
942 next_line = self._next_line()
943 except StopIteration:
944 next_line = None
945
946 # implicitly index_col=0 b/c 1 fewer column names
947 implicit_first_cols = 0
948 if line is not None:
949 # leave it 0, #2442
950 # Case 1
951 # error: Cannot determine type of 'index_col'
952 index_col = self.index_col # type: ignore[has-type]
953 if index_col is not False:
954 implicit_first_cols = len(line) - self.num_original_columns
955
956 # Case 0
957 if (
958 next_line is not None
959 and self.header is not None
960 and index_col is not False
961 ):
962 if len(next_line) == len(line) + self.num_original_columns:
963 # column and index names on diff rows
964 self.index_col = list(range(len(line)))
965 self.buf = self.buf[1:]
966
967 for c in reversed(line):
968 columns.insert(0, c)
969
970 # Update list of original names to include all indices.
971 orig_names = list(columns)
972 self.num_original_columns = len(columns)
973 return line, orig_names, columns
974
975 if implicit_first_cols > 0:
976 # Case 1
977 self._implicit_index = True
978 if self.index_col is None:
979 self.index_col = list(range(implicit_first_cols))
980
981 index_name = None
982
983 else:
984 # Case 2
985 (index_name, _, self.index_col) = self._clean_index_names(
986 columns, self.index_col
987 )
988
989 return index_name, orig_names, columns
990
991 def _rows_to_cols(self, content: list[list[Scalar]]) -> list[np.ndarray]:
992 col_len = self.num_original_columns
993
994 if self._implicit_index:
995 col_len += len(self.index_col)
996
997 max_len = max(len(row) for row in content)
998
999 # Check that there are no rows with too many
1000 # elements in their row (rows with too few
1001 # elements are padded with NaN).
1002 # error: Non-overlapping identity check (left operand type: "List[int]",
1003 # right operand type: "Literal[False]")
1004 if (
1005 max_len > col_len
1006 and self.index_col is not False # type: ignore[comparison-overlap]
1007 and self.usecols is None
1008 ):
1009 footers = self.skipfooter if self.skipfooter else 0
1010 bad_lines = []
1011
1012 iter_content = enumerate(content)
1013 content_len = len(content)
1014 content = []
1015
1016 for i, _content in iter_content:
1017 actual_len = len(_content)
1018
1019 if actual_len > col_len:
1020 if callable(self.on_bad_lines):
1021 new_l = self.on_bad_lines(_content)
1022 if new_l is not None:
1023 content.append(new_l)
1024 elif self.on_bad_lines in (
1025 self.BadLineHandleMethod.ERROR,
1026 self.BadLineHandleMethod.WARN,
1027 ):
1028 row_num = self.pos - (content_len - i + footers)
1029 bad_lines.append((row_num, actual_len))
1030
1031 if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
1032 break
1033 else:
1034 content.append(_content)
1035
1036 for row_num, actual_len in bad_lines:
1037 msg = (
1038 f"Expected {col_len} fields in line {row_num + 1}, saw "
1039 f"{actual_len}"
1040 )
1041 if (
1042 self.delimiter
1043 and len(self.delimiter) > 1
1044 and self.quoting != csv.QUOTE_NONE
1045 ):
1046 # see gh-13374
1047 reason = (
1048 "Error could possibly be due to quotes being "
1049 "ignored when a multi-char delimiter is used."
1050 )
1051 msg += ". " + reason
1052
1053 self._alert_malformed(msg, row_num + 1)
1054
1055 # see gh-13320
1056 zipped_content = list(lib.to_object_array(content, min_width=col_len).T)
1057
1058 if self.usecols:
1059 assert self._col_indices is not None
1060 col_indices = self._col_indices
1061
1062 if self._implicit_index:
1063 zipped_content = [
1064 a
1065 for i, a in enumerate(zipped_content)
1066 if (
1067 i < len(self.index_col)
1068 or i - len(self.index_col) in col_indices
1069 )
1070 ]
1071 else:
1072 zipped_content = [
1073 a for i, a in enumerate(zipped_content) if i in col_indices
1074 ]
1075 return zipped_content
1076
1077 def _get_lines(self, rows: int | None = None) -> list[list[Scalar]]:
1078 lines = self.buf
1079 new_rows = None
1080
1081 # already fetched some number
1082 if rows is not None:
1083 # we already have the lines in the buffer
1084 if len(self.buf) >= rows:
1085 new_rows, self.buf = self.buf[:rows], self.buf[rows:]
1086
1087 # need some lines
1088 else:
1089 rows -= len(self.buf)
1090
1091 if new_rows is None:
1092 if isinstance(self.data, list):
1093 if self.pos > len(self.data):
1094 raise StopIteration
1095 if rows is None:
1096 new_rows = self.data[self.pos :]
1097 new_pos = len(self.data)
1098 else:
1099 new_rows = self.data[self.pos : self.pos + rows]
1100 new_pos = self.pos + rows
1101
1102 new_rows = self._remove_skipped_rows(new_rows)
1103 lines.extend(new_rows)
1104 self.pos = new_pos
1105
1106 else:
1107 new_rows = []
1108 try:
1109 if rows is not None:
1110 rows_to_skip = 0
1111 if self.skiprows is not None and self.pos is not None:
1112 # Only read additional rows if pos is in skiprows
1113 rows_to_skip = len(
1114 set(self.skiprows) - set(range(self.pos))
1115 )
1116
1117 for _ in range(rows + rows_to_skip):
1118 # assert for mypy, data is Iterator[str] or None, would
1119 # error in next
1120 assert self.data is not None
1121 new_rows.append(next(self.data))
1122
1123 len_new_rows = len(new_rows)
1124 new_rows = self._remove_skipped_rows(new_rows)
1125 lines.extend(new_rows)
1126 else:
1127 rows = 0
1128
1129 while True:
1130 new_row = self._next_iter_line(row_num=self.pos + rows + 1)
1131 rows += 1
1132
1133 if new_row is not None:
1134 new_rows.append(new_row)
1135 len_new_rows = len(new_rows)
1136
1137 except StopIteration:
1138 len_new_rows = len(new_rows)
1139 new_rows = self._remove_skipped_rows(new_rows)
1140 lines.extend(new_rows)
1141 if len(lines) == 0:
1142 raise
1143 self.pos += len_new_rows
1144
1145 self.buf = []
1146 else:
1147 lines = new_rows
1148
1149 if self.skipfooter:
1150 lines = lines[: -self.skipfooter]
1151
1152 lines = self._check_comments(lines)
1153 if self.skip_blank_lines:
1154 lines = self._remove_empty_lines(lines)
1155 lines = self._check_thousands(lines)
1156 return self._check_decimal(lines)
1157
1158 def _remove_skipped_rows(self, new_rows: list[list[Scalar]]) -> list[list[Scalar]]:
1159 if self.skiprows:
1160 return [
1161 row for i, row in enumerate(new_rows) if not self.skipfunc(i + self.pos)
1162 ]
1163 return new_rows
1164
1165
1166class FixedWidthReader(abc.Iterator):
1167 """
1168 A reader of fixed-width lines.
1169 """
1170
1171 def __init__(
1172 self,
1173 f: IO[str] | ReadCsvBuffer[str],
1174 colspecs: list[tuple[int, int]] | Literal["infer"],
1175 delimiter: str | None,
1176 comment: str | None,
1177 skiprows: set[int] | None = None,
1178 infer_nrows: int = 100,
1179 ) -> None:
1180 self.f = f
1181 self.buffer: Iterator | None = None
1182 self.delimiter = "\r\n" + delimiter if delimiter else "\n\r\t "
1183 self.comment = comment
1184 if colspecs == "infer":
1185 self.colspecs = self.detect_colspecs(
1186 infer_nrows=infer_nrows, skiprows=skiprows
1187 )
1188 else:
1189 self.colspecs = colspecs
1190
1191 if not isinstance(self.colspecs, (tuple, list)):
1192 raise TypeError(
1193 "column specifications must be a list or tuple, "
1194 f"input was a {type(colspecs).__name__}"
1195 )
1196
1197 for colspec in self.colspecs:
1198 if not (
1199 isinstance(colspec, (tuple, list))
1200 and len(colspec) == 2
1201 and isinstance(colspec[0], (int, np.integer, type(None)))
1202 and isinstance(colspec[1], (int, np.integer, type(None)))
1203 ):
1204 raise TypeError(
1205 "Each column specification must be "
1206 "2 element tuple or list of integers"
1207 )
1208
1209 def get_rows(self, infer_nrows: int, skiprows: set[int] | None = None) -> list[str]:
1210 """
1211 Read rows from self.f, skipping as specified.
1212
1213 We distinguish buffer_rows (the first <= infer_nrows
1214 lines) from the rows returned to detect_colspecs
1215 because it's simpler to leave the other locations
1216 with skiprows logic alone than to modify them to
1217 deal with the fact we skipped some rows here as
1218 well.
1219
1220 Parameters
1221 ----------
1222 infer_nrows : int
1223 Number of rows to read from self.f, not counting
1224 rows that are skipped.
1225 skiprows: set, optional
1226 Indices of rows to skip.
1227
1228 Returns
1229 -------
1230 detect_rows : list of str
1231 A list containing the rows to read.
1232
1233 """
1234 if skiprows is None:
1235 skiprows = set()
1236 buffer_rows = []
1237 detect_rows = []
1238 for i, row in enumerate(self.f):
1239 if i not in skiprows:
1240 detect_rows.append(row)
1241 buffer_rows.append(row)
1242 if len(detect_rows) >= infer_nrows:
1243 break
1244 self.buffer = iter(buffer_rows)
1245 return detect_rows
1246
1247 def detect_colspecs(
1248 self, infer_nrows: int = 100, skiprows: set[int] | None = None
1249 ) -> list[tuple[int, int]]:
1250 # Regex escape the delimiters
1251 delimiters = "".join([rf"\{x}" for x in self.delimiter])
1252 pattern = re.compile(f"([^{delimiters}]+)")
1253 rows = self.get_rows(infer_nrows, skiprows)
1254 if not rows:
1255 raise EmptyDataError("No rows from which to infer column width")
1256 max_len = max(map(len, rows))
1257 mask = np.zeros(max_len + 1, dtype=int)
1258 if self.comment is not None:
1259 rows = [row.partition(self.comment)[0] for row in rows]
1260 for row in rows:
1261 for m in pattern.finditer(row):
1262 mask[m.start() : m.end()] = 1
1263 shifted = np.roll(mask, 1)
1264 shifted[0] = 0
1265 edges = np.where((mask ^ shifted) == 1)[0]
1266 edge_pairs = list(zip(edges[::2], edges[1::2]))
1267 return edge_pairs
1268
1269 def __next__(self) -> list[str]:
1270 # Argument 1 to "next" has incompatible type "Union[IO[str],
1271 # ReadCsvBuffer[str]]"; expected "SupportsNext[str]"
1272 if self.buffer is not None:
1273 try:
1274 line = next(self.buffer)
1275 except StopIteration:
1276 self.buffer = None
1277 line = next(self.f) # type: ignore[arg-type]
1278 else:
1279 line = next(self.f) # type: ignore[arg-type]
1280 # Note: 'colspecs' is a sequence of half-open intervals.
1281 return [line[from_:to].strip(self.delimiter) for (from_, to) in self.colspecs]
1282
1283
1284class FixedWidthFieldParser(PythonParser):
1285 """
1286 Specialization that Converts fixed-width fields into DataFrames.
1287 See PythonParser for details.
1288 """
1289
1290 def __init__(self, f: ReadCsvBuffer[str], **kwds) -> None:
1291 # Support iterators, convert to a list.
1292 self.colspecs = kwds.pop("colspecs")
1293 self.infer_nrows = kwds.pop("infer_nrows")
1294 PythonParser.__init__(self, f, **kwds)
1295
1296 def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]) -> None:
1297 self.data = FixedWidthReader(
1298 f,
1299 self.colspecs,
1300 self.delimiter,
1301 self.comment,
1302 self.skiprows,
1303 self.infer_nrows,
1304 )
1305
1306 def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
1307 """
1308 Returns the list of lines without the empty ones. With fixed-width
1309 fields, empty lines become arrays of empty strings.
1310
1311 See PythonParser._remove_empty_lines.
1312 """
1313 return [
1314 line
1315 for line in lines
1316 if any(not isinstance(e, str) or e.strip() for e in line)
1317 ]
1318
1319
1320def count_empty_vals(vals) -> int:
1321 return sum(1 for v in vals if v == "" or v is None)
1322
1323
1324def _validate_skipfooter_arg(skipfooter: int) -> int:
1325 """
1326 Validate the 'skipfooter' parameter.
1327
1328 Checks whether 'skipfooter' is a non-negative integer.
1329 Raises a ValueError if that is not the case.
1330
1331 Parameters
1332 ----------
1333 skipfooter : non-negative integer
1334 The number of rows to skip at the end of the file.
1335
1336 Returns
1337 -------
1338 validated_skipfooter : non-negative integer
1339 The original input if the validation succeeds.
1340
1341 Raises
1342 ------
1343 ValueError : 'skipfooter' was not a non-negative integer.
1344 """
1345 if not is_integer(skipfooter):
1346 raise ValueError("skipfooter must be an integer")
1347
1348 if skipfooter < 0:
1349 raise ValueError("skipfooter cannot be negative")
1350
1351 return skipfooter