from __future__ import annotations

from collections import (
    abc,
    defaultdict,
)
from collections.abc import (
    Hashable,
    Iterator,
    Mapping,
    Sequence,
)
import csv
from io import StringIO
import re
from typing import (
    IO,
    TYPE_CHECKING,
    DefaultDict,
    Literal,
    cast,
)
import warnings

import numpy as np

from pandas._libs import lib
from pandas.errors import (
    EmptyDataError,
    ParserError,
    ParserWarning,
)
from pandas.util._decorators import cache_readonly
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.common import (
    is_bool_dtype,
    is_integer,
    is_numeric_dtype,
)
from pandas.core.dtypes.inference import is_dict_like

from pandas.io.common import (
    dedup_names,
    is_potential_multi_index,
)
from pandas.io.parsers.base_parser import (
    ParserBase,
    parser_defaults,
)

if TYPE_CHECKING:
    from pandas._typing import (
        ArrayLike,
        ReadCsvBuffer,
        Scalar,
    )

    from pandas import (
        Index,
        MultiIndex,
    )

# BOM character (byte order mark)
# This exists at the beginning of a file to indicate endianness
# of a file (stream). Unfortunately, this marker screws up parsing,
# so we need to remove it if we see it.
_BOM = "\ufeff"


class PythonParser(ParserBase):
    _no_thousands_columns: set[int]

    def __init__(self, f: ReadCsvBuffer[str] | list, **kwds) -> None:
        """
        Workhorse function for processing nested list into DataFrame
        """
        super().__init__(kwds)

        self.data: Iterator[str] | None = None
        self.buf: list = []
        self.pos = 0
        self.line_pos = 0

        self.skiprows = kwds["skiprows"]

        if callable(self.skiprows):
            self.skipfunc = self.skiprows
        else:
            self.skipfunc = lambda x: x in self.skiprows

        self.skipfooter = _validate_skipfooter_arg(kwds["skipfooter"])
        self.delimiter = kwds["delimiter"]

        self.quotechar = kwds["quotechar"]
        if isinstance(self.quotechar, str):
            self.quotechar = str(self.quotechar)

        self.escapechar = kwds["escapechar"]
        self.doublequote = kwds["doublequote"]
        self.skipinitialspace = kwds["skipinitialspace"]
        self.lineterminator = kwds["lineterminator"]
        self.quoting = kwds["quoting"]
        self.skip_blank_lines = kwds["skip_blank_lines"]

        self.has_index_names = False
        if "has_index_names" in kwds:
            self.has_index_names = kwds["has_index_names"]

        self.verbose = kwds["verbose"]

        self.thousands = kwds["thousands"]
        self.decimal = kwds["decimal"]

        self.comment = kwds["comment"]

        # Set self.data to something that can read lines.
        if isinstance(f, list):
            # read_excel: f is a list
            self.data = cast(Iterator[str], f)
        else:
            assert hasattr(f, "readline")
            self.data = self._make_reader(f)

        # Get columns in two steps: infer from data, then
        # infer column indices from self.usecols if it is specified.
        self._col_indices: list[int] | None = None
        columns: list[list[Scalar | None]]
        (
            columns,
            self.num_original_columns,
            self.unnamed_cols,
        ) = self._infer_columns()

        # Now self.columns has the set of columns that we will process.
        # The original set is stored in self.original_columns.
        # error: Cannot determine type of 'index_names'
        (
            self.columns,
            self.index_names,
            self.col_names,
            _,
        ) = self._extract_multi_indexer_columns(
            columns,
            self.index_names,  # type: ignore[has-type]
        )

        # get popped off for index
        self.orig_names: list[Hashable] = list(self.columns)

        # TODO: needs to be cleaned up/refactored; the handling of multiple
        # date columns has turned into a real spaghetti factory.

        if not self._has_complex_date_col:
            (index_names, self.orig_names, self.columns) = self._get_index_name()
            self._name_processed = True
            if self.index_names is None:
                self.index_names = index_names

        if self._col_indices is None:
            self._col_indices = list(range(len(self.columns)))

        self._parse_date_cols = self._validate_parse_dates_presence(self.columns)
        self._no_thousands_columns = self._set_no_thousand_columns()

        if len(self.decimal) != 1:
            raise ValueError("Only length-1 decimal markers supported")

    @cache_readonly
    def num(self) -> re.Pattern:
        decimal = re.escape(self.decimal)
        if self.thousands is None:
            regex = rf"^[\-\+]?[0-9]*({decimal}[0-9]*)?([0-9]?(E|e)\-?[0-9]+)?$"
        else:
            thousands = re.escape(self.thousands)
            regex = (
                rf"^[\-\+]?([0-9]+{thousands}|[0-9])*({decimal}[0-9]*)?"
                rf"([0-9]?(E|e)\-?[0-9]+)?$"
            )
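        # Illustrative: with decimal="." and thousands=",", the compiled
        # pattern matches numeric-looking fields such as "1,234.56" and
        # "-2.5e-3" but rejects "12.3.4", so only fields that look numeric
        # have their separators rewritten by _search_replace_num_columns.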
        return re.compile(regex)

    def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]):
        sep = self.delimiter

        if sep is None or len(sep) == 1:
            if self.lineterminator:
                raise ValueError(
                    "Custom line terminators not supported in python parser (yet)"
                )

            class MyDialect(csv.Dialect):
                delimiter = self.delimiter
                quotechar = self.quotechar
                escapechar = self.escapechar
                doublequote = self.doublequote
                skipinitialspace = self.skipinitialspace
                quoting = self.quoting
                lineterminator = "\n"

            dia = MyDialect

            if sep is not None:
                dia.delimiter = sep
            else:
                # attempt to sniff the delimiter from the first valid line,
                # i.e. no comment line and not in skiprows
                line = f.readline()
                lines = self._check_comments([[line]])[0]
                while self.skipfunc(self.pos) or not lines:
                    self.pos += 1
                    line = f.readline()
                    lines = self._check_comments([[line]])[0]
                lines_str = cast(list[str], lines)

                # since `line` was a string, lines will be a list containing
                # only a single string
                line = lines_str[0]

                self.pos += 1
                self.line_pos += 1
                sniffed = csv.Sniffer().sniff(line)
                dia.delimiter = sniffed.delimiter

                # Note: encoding is irrelevant here
                line_rdr = csv.reader(StringIO(line), dialect=dia)
                self.buf.extend(list(line_rdr))

            # Note: encoding is irrelevant here
            reader = csv.reader(f, dialect=dia, strict=True)

        else:

            def _read():
                line = f.readline()
                pat = re.compile(sep)

                yield pat.split(line.strip())

                for line in f:
                    yield pat.split(line.strip())

            reader = _read()
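            # Illustrative: with a regex separator such as sep=r"\s+", each
            # stripped line is split via re.split, so "a  b\tc" yields
            # ["a", "b", "c"].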

        return reader

    def read(
        self, rows: int | None = None
    ) -> tuple[
        Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
    ]:
        try:
            content = self._get_lines(rows)
        except StopIteration:
            if self._first_chunk:
                content = []
            else:
                self.close()
                raise

        # done with first read, next time raise StopIteration
        self._first_chunk = False

        columns: Sequence[Hashable] = list(self.orig_names)
        if not len(content):  # pragma: no cover
            # DataFrame with the right metadata, even though it's length 0
            # error: Cannot determine type of 'index_col'
            names = dedup_names(
                self.orig_names,
                is_potential_multi_index(
                    self.orig_names,
                    self.index_col,  # type: ignore[has-type]
                ),
            )
            index, columns, col_dict = self._get_empty_meta(
                names,
                self.dtype,
            )
            conv_columns = self._maybe_make_multi_index_columns(columns, self.col_names)
            return index, conv_columns, col_dict

        # handle new style for names in index
        count_empty_content_vals = count_empty_vals(content[0])
        indexnamerow = None
        if self.has_index_names and count_empty_content_vals == len(columns):
            indexnamerow = content[0]
            content = content[1:]

        alldata = self._rows_to_cols(content)
        data, columns = self._exclude_implicit_index(alldata)

        conv_data = self._convert_data(data)
        columns, conv_data = self._do_date_conversions(columns, conv_data)

        index, result_columns = self._make_index(
            conv_data, alldata, columns, indexnamerow
        )

        return index, result_columns, conv_data

    def _exclude_implicit_index(
        self,
        alldata: list[np.ndarray],
    ) -> tuple[Mapping[Hashable, np.ndarray], Sequence[Hashable]]:
        # error: Cannot determine type of 'index_col'
        names = dedup_names(
            self.orig_names,
            is_potential_multi_index(
                self.orig_names,
                self.index_col,  # type: ignore[has-type]
            ),
        )

        offset = 0
        if self._implicit_index:
            # error: Cannot determine type of 'index_col'
            offset = len(self.index_col)  # type: ignore[has-type]

        len_alldata = len(alldata)
        self._check_data_length(names, alldata)

        return {
            name: alldata[i + offset] for i, name in enumerate(names) if i < len_alldata
        }, names

    # legacy
    def get_chunk(
        self, size: int | None = None
    ) -> tuple[
        Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
    ]:
        if size is None:
            # error: "PythonParser" has no attribute "chunksize"
            size = self.chunksize  # type: ignore[attr-defined]
        return self.read(rows=size)

    def _convert_data(
        self,
        data: Mapping[Hashable, np.ndarray],
    ) -> Mapping[Hashable, ArrayLike]:
        # apply converters
        clean_conv = self._clean_mapping(self.converters)
        clean_dtypes = self._clean_mapping(self.dtype)

        # Apply NA values.
        clean_na_values = {}
        clean_na_fvalues = {}

        if isinstance(self.na_values, dict):
            for col in self.na_values:
                na_value = self.na_values[col]
                na_fvalue = self.na_fvalues[col]

                if isinstance(col, int) and col not in self.orig_names:
                    col = self.orig_names[col]

                clean_na_values[col] = na_value
                clean_na_fvalues[col] = na_fvalue
        else:
            clean_na_values = self.na_values
            clean_na_fvalues = self.na_fvalues

        return self._convert_to_ndarrays(
            data,
            clean_na_values,
            clean_na_fvalues,
            self.verbose,
            clean_conv,
            clean_dtypes,
        )

    @cache_readonly
    def _have_mi_columns(self) -> bool:
        if self.header is None:
            return False

        header = self.header
        if isinstance(header, (list, tuple, np.ndarray)):
            return len(header) > 1
        else:
            return False

    def _infer_columns(
        self,
    ) -> tuple[list[list[Scalar | None]], int, set[Scalar | None]]:
        names = self.names
        num_original_columns = 0
        clear_buffer = True
        unnamed_cols: set[Scalar | None] = set()

        if self.header is not None:
            header = self.header
            have_mi_columns = self._have_mi_columns

            if isinstance(header, (list, tuple, np.ndarray)):
                # we have MI columns, so read an extra line
                if have_mi_columns:
                    header = list(header) + [header[-1] + 1]
            else:
                header = [header]

            columns: list[list[Scalar | None]] = []
            for level, hr in enumerate(header):
                try:
                    line = self._buffered_line()

                    while self.line_pos <= hr:
                        line = self._next_line()

                except StopIteration as err:
                    if 0 < self.line_pos <= hr and (
                        not have_mi_columns or hr != header[-1]
                    ):
                        # If there are no rows, we want to raise a different
                        # message; with MI columns, the last line is not part
                        # of the header.
                        joi = list(map(str, header[:-1] if have_mi_columns else header))
                        msg = f"[{','.join(joi)}], len of {len(joi)}, "
                        raise ValueError(
                            f"Passed header={msg}"
                            f"but only {self.line_pos} lines in file"
                        ) from err

                    # We have an empty file, so check
                    # if columns are provided. That will
                    # serve as the 'line' for parsing
                    if have_mi_columns and hr > 0:
                        if clear_buffer:
                            self._clear_buffer()
                        columns.append([None] * len(columns[-1]))
                        return columns, num_original_columns, unnamed_cols

                    if not self.names:
                        raise EmptyDataError("No columns to parse from file") from err

                    line = self.names[:]

                this_columns: list[Scalar | None] = []
                this_unnamed_cols = []

                for i, c in enumerate(line):
                    if c == "":
                        if have_mi_columns:
                            col_name = f"Unnamed: {i}_level_{level}"
                        else:
                            col_name = f"Unnamed: {i}"

                        this_unnamed_cols.append(i)
                        this_columns.append(col_name)
                    else:
                        this_columns.append(c)

                if not have_mi_columns:
                    counts: DefaultDict = defaultdict(int)
                    # Ensure that regular columns are used before unnamed ones
                    # to keep given names and mangle unnamed columns
                    col_loop_order = [
                        i
                        for i in range(len(this_columns))
                        if i not in this_unnamed_cols
                    ] + this_unnamed_cols

                    # TODO: Use pandas.io.common.dedup_names instead (see #50371)
                    for i in col_loop_order:
                        col = this_columns[i]
                        old_col = col
                        cur_count = counts[col]

                        if cur_count > 0:
                            while cur_count > 0:
                                counts[old_col] = cur_count + 1
                                col = f"{old_col}.{cur_count}"
                                if col in this_columns:
                                    cur_count += 1
                                else:
                                    cur_count = counts[col]

                            if (
                                self.dtype is not None
                                and is_dict_like(self.dtype)
                                and self.dtype.get(old_col) is not None
                                and self.dtype.get(col) is None
                            ):
                                self.dtype.update({col: self.dtype.get(old_col)})
                        this_columns[i] = col
                        counts[col] = cur_count + 1
                elif have_mi_columns:
                    # If we have grabbed an extra line, but it's not in our
                    # format, save it in the buffer and create a blank extra
                    # line for the rest of the parsing code.
                    if hr == header[-1]:
                        lc = len(this_columns)
                        # error: Cannot determine type of 'index_col'
                        sic = self.index_col  # type: ignore[has-type]
                        ic = len(sic) if sic is not None else 0
                        unnamed_count = len(this_unnamed_cols)

                        # if wrong number of blanks or no index, not our format
                        if (lc != unnamed_count and lc - ic > unnamed_count) or ic == 0:
                            clear_buffer = False
                            this_columns = [None] * lc
                            self.buf = [self.buf[-1]]

                columns.append(this_columns)
                unnamed_cols.update({this_columns[i] for i in this_unnamed_cols})

                if len(columns) == 1:
                    num_original_columns = len(this_columns)

            if clear_buffer:
                self._clear_buffer()

            first_line: list[Scalar] | None
            if names is not None:
                # Read first row after header to check if data are longer
                try:
                    first_line = self._next_line()
                except StopIteration:
                    first_line = None

                len_first_data_row = 0 if first_line is None else len(first_line)

                if len(names) > len(columns[0]) and len(names) > len_first_data_row:
                    raise ValueError(
                        "Number of passed names did not match "
                        "number of header fields in the file"
                    )
                if len(columns) > 1:
                    raise TypeError("Cannot pass names with multi-index columns")

                if self.usecols is not None:
                    # Set _use_cols. We don't store columns because they are
                    # overwritten.
                    self._handle_usecols(columns, names, num_original_columns)
                else:
                    num_original_columns = len(names)
                    if self._col_indices is not None and len(names) != len(
                        self._col_indices
                    ):
                        columns = [[names[i] for i in sorted(self._col_indices)]]
                    else:
                        columns = [names]
            else:
                columns = self._handle_usecols(
                    columns, columns[0], num_original_columns
                )
        else:
            ncols = len(self._header_line)
            num_original_columns = ncols

            if not names:
                columns = [list(range(ncols))]
                columns = self._handle_usecols(columns, columns[0], ncols)
            elif self.usecols is None or len(names) >= ncols:
                columns = self._handle_usecols([names], names, ncols)
                num_original_columns = len(names)
            elif not callable(self.usecols) and len(names) != len(self.usecols):
                raise ValueError(
                    "Number of passed names did not match number of "
                    "header fields in the file"
                )
            else:
                # Ignore output but set used columns.
                columns = [names]
                self._handle_usecols(columns, columns[0], ncols)

        return columns, num_original_columns, unnamed_cols

    @cache_readonly
    def _header_line(self):
        # Store line for reuse in _get_index_name
        if self.header is not None:
            return None

        try:
            line = self._buffered_line()
        except StopIteration as err:
            if not self.names:
                raise EmptyDataError("No columns to parse from file") from err

            line = self.names[:]
        return line

    def _handle_usecols(
        self,
        columns: list[list[Scalar | None]],
        usecols_key: list[Scalar | None],
        num_original_columns: int,
    ) -> list[list[Scalar | None]]:
        """
        Sets self._col_indices

        usecols_key is used if there are string usecols.
        """
        col_indices: set[int] | list[int]
        if self.usecols is not None:
            if callable(self.usecols):
                col_indices = self._evaluate_usecols(self.usecols, usecols_key)
            elif any(isinstance(u, str) for u in self.usecols):
                if len(columns) > 1:
                    raise ValueError(
                        "If using multiple headers, usecols must be integers."
                    )
                col_indices = []

                for col in self.usecols:
                    if isinstance(col, str):
                        try:
                            col_indices.append(usecols_key.index(col))
                        except ValueError:
                            self._validate_usecols_names(self.usecols, usecols_key)
                    else:
                        col_indices.append(col)
            else:
                missing_usecols = [
                    col for col in self.usecols if col >= num_original_columns
                ]
                if missing_usecols:
                    raise ParserError(
                        "Defining usecols with out-of-bounds indices is not allowed. "
                        f"{missing_usecols} are out-of-bounds.",
                    )
                col_indices = self.usecols

            columns = [
                [n for i, n in enumerate(column) if i in col_indices]
                for column in columns
            ]
            self._col_indices = sorted(col_indices)
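        # Illustrative: with usecols={"b", "c"} and a header row
        # ["a", "b", "c"], the string names resolve to indices 1 and 2,
        # columns becomes [["b", "c"]], and self._col_indices is [1, 2].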
        return columns

    def _buffered_line(self) -> list[Scalar]:
        """
        Return a line from buffer, filling buffer if required.
        """
        if len(self.buf) > 0:
            return self.buf[0]
        else:
            return self._next_line()

    def _check_for_bom(self, first_row: list[Scalar]) -> list[Scalar]:
        """
        Checks whether the file begins with the BOM character.
        If it does, remove it. In addition, if there is quoting
        in the field subsequent to the BOM, remove it as well
        because it technically takes place at the beginning of
        the name, not the middle of it.
        """
        # first_row will be a list, so we need to check
        # that the list is not empty before proceeding.
        if not first_row:
            return first_row

        # The first element of this row is the one that could have the
        # BOM that we want to remove. Check that the first element is a
        # string before proceeding.
        if not isinstance(first_row[0], str):
            return first_row

        # Check that the string is not empty, as that would
        # obviously not have a BOM at the start of it.
        if not first_row[0]:
            return first_row

        # Since the string is non-empty, check that it does
        # in fact begin with a BOM.
        first_elt = first_row[0][0]
        if first_elt != _BOM:
            return first_row

        first_row_bom = first_row[0]
        new_row: str

        if len(first_row_bom) > 1 and first_row_bom[1] == self.quotechar:
            start = 2
            quote = first_row_bom[1]
            end = first_row_bom[2:].index(quote) + 2

            # Extract the data between the quotation marks
            new_row = first_row_bom[start:end]

            # Extract any remaining data after the second
            # quotation mark.
            if len(first_row_bom) > end + 1:
                new_row += first_row_bom[end + 1 :]

        else:
            # No quotation so just remove BOM from first element
            new_row = first_row_bom[1:]

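        # Illustrative: a first field of '\ufeff"name"' (BOM followed by a
        # quoted token) is reduced to 'name'; an unquoted '\ufeffname'
        # likewise becomes 'name'.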
        new_row_list: list[Scalar] = [new_row]
        return new_row_list + first_row[1:]

    def _is_line_empty(self, line: list[Scalar]) -> bool:
        """
        Check if a line is empty or not.

        Parameters
        ----------
        line : str, array-like
            The line of data to check.

        Returns
        -------
        boolean : Whether or not the line is empty.
        """
        return not line or all(not x for x in line)

    def _next_line(self) -> list[Scalar]:
        if isinstance(self.data, list):
            while self.skipfunc(self.pos):
                if self.pos >= len(self.data):
                    break
                self.pos += 1

            while True:
                try:
                    line = self._check_comments([self.data[self.pos]])[0]
                    self.pos += 1
                    # either uncommented or blank to begin with
                    if not self.skip_blank_lines and (
                        self._is_line_empty(self.data[self.pos - 1]) or line
                    ):
                        break
                    if self.skip_blank_lines:
                        ret = self._remove_empty_lines([line])
                        if ret:
                            line = ret[0]
                            break
                except IndexError:
                    raise StopIteration
        else:
            while self.skipfunc(self.pos):
                self.pos += 1
                # assert for mypy, data is Iterator[str] or None, would error in next
                assert self.data is not None
                next(self.data)

            while True:
                orig_line = self._next_iter_line(row_num=self.pos + 1)
                self.pos += 1

                if orig_line is not None:
                    line = self._check_comments([orig_line])[0]

                    if self.skip_blank_lines:
                        ret = self._remove_empty_lines([line])

                        if ret:
                            line = ret[0]
                            break
                    elif self._is_line_empty(orig_line) or line:
                        break

        # This was the first line of the file,
        # which could contain the BOM at the
        # beginning of it.
        if self.pos == 1:
            line = self._check_for_bom(line)

        self.line_pos += 1
        self.buf.append(line)
        return line

    def _alert_malformed(self, msg: str, row_num: int) -> None:
        """
        Alert a user about a malformed row, depending on value of
        `self.on_bad_lines` enum.

        If `self.on_bad_lines` is ERROR, the alert will be `ParserError`.
        If `self.on_bad_lines` is WARN, the alert will be printed out.

        Parameters
        ----------
        msg: str
            The error message to display.
        row_num: int
            The row number where the parsing error occurred.
            Because this row number is displayed, we 1-index,
            even though we 0-index internally.
        """
        if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
            raise ParserError(msg)
        if self.on_bad_lines == self.BadLineHandleMethod.WARN:
            warnings.warn(
                f"Skipping line {row_num}: {msg}\n",
                ParserWarning,
                stacklevel=find_stack_level(),
            )

    def _next_iter_line(self, row_num: int) -> list[Scalar] | None:
        """
        Wrapper around iterating through `self.data` (CSV source).

        When a CSV error is raised, we check for specific
        error messages that allow us to customize the
        error message displayed to the user.

        Parameters
        ----------
        row_num: int
            The row number of the line being parsed.
        """
        try:
            # assert for mypy, data is Iterator[str] or None, would error in next
            assert self.data is not None
            line = next(self.data)
            # for mypy
            assert isinstance(line, list)
            return line
        except csv.Error as e:
            if self.on_bad_lines in (
                self.BadLineHandleMethod.ERROR,
                self.BadLineHandleMethod.WARN,
            ):
                msg = str(e)

                if "NULL byte" in msg or "line contains NUL" in msg:
                    msg = (
                        "NULL byte detected. This byte "
                        "cannot be processed in Python's "
                        "native csv library at the moment, "
                        "so please pass in engine='c' instead"
                    )

                if self.skipfooter > 0:
                    reason = (
                        "Error could possibly be due to "
                        "parsing errors in the skipped footer rows "
                        "(the skipfooter keyword is only applied "
                        "after Python's csv library has parsed "
                        "all rows)."
                    )
                    msg += ". " + reason

                self._alert_malformed(msg, row_num)
            return None

    def _check_comments(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.comment is None:
            return lines
        ret = []
        for line in lines:
            rl = []
            for x in line:
                if (
                    not isinstance(x, str)
                    or self.comment not in x
                    or x in self.na_values
                ):
                    rl.append(x)
                else:
                    x = x[: x.find(self.comment)]
                    if len(x) > 0:
                        rl.append(x)
                    break
            ret.append(rl)
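        # Illustrative: with comment="#",
        #   _check_comments([["1", "2 # note", "3"]]) -> [["1", "2 "]]
        # everything from the comment character onward is dropped, including
        # any later fields in the row.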
        return ret

    def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        """
        Iterate through the lines and remove any that are
        either empty or contain only one whitespace value

        Parameters
        ----------
        lines : list of list of Scalars
            The array of lines that we are to filter.

        Returns
        -------
        filtered_lines : list of list of Scalars
            The same array of lines with the "empty" ones removed.
        """
        # Remove empty lines and lines with only one whitespace value
        ret = [
            line
            for line in lines
            if (
                len(line) > 1
                or len(line) == 1
                and (not isinstance(line[0], str) or line[0].strip())
            )
        ]
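        # Illustrative: [["a"], [" "], [], ["b", ""]] filters down to
        # [["a"], ["b", ""]]; only zero-field rows and single-field
        # whitespace rows are dropped.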
        return ret

    def _check_thousands(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.thousands is None:
            return lines

        return self._search_replace_num_columns(
            lines=lines, search=self.thousands, replace=""
        )

    def _search_replace_num_columns(
        self, lines: list[list[Scalar]], search: str, replace: str
    ) -> list[list[Scalar]]:
        ret = []
        for line in lines:
            rl = []
            for i, x in enumerate(line):
                if (
                    not isinstance(x, str)
                    or search not in x
                    or i in self._no_thousands_columns
                    or not self.num.search(x.strip())
                ):
                    rl.append(x)
                else:
                    rl.append(x.replace(search, replace))
            ret.append(rl)
        return ret

    def _check_decimal(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.decimal == parser_defaults["decimal"]:
            return lines

        return self._search_replace_num_columns(
            lines=lines, search=self.decimal, replace="."
        )

    def _clear_buffer(self) -> None:
        self.buf = []

    def _get_index_name(
        self,
    ) -> tuple[Sequence[Hashable] | None, list[Hashable], list[Hashable]]:
        """
        Try several cases to get lines:

        0) There are headers on row 0 and row 1 and their
        total summed lengths equals the length of the next line.
        Treat row 0 as columns and row 1 as indices
        1) Look for implicit index: there are more columns
        on row 1 than row 0. If this is true, assume that row
        1 lists index columns and row 0 lists normal columns.
        2) Get index from the columns if it was listed.
        """
        columns: Sequence[Hashable] = self.orig_names
        orig_names = list(columns)
        columns = list(columns)

        line: list[Scalar] | None
        if self._header_line is not None:
            line = self._header_line
        else:
            try:
                line = self._next_line()
            except StopIteration:
                line = None

        next_line: list[Scalar] | None
        try:
            next_line = self._next_line()
        except StopIteration:
            next_line = None

        # implicitly index_col=0 b/c 1 fewer column names
        implicit_first_cols = 0
        if line is not None:
            # leave it 0, #2442
            # Case 1
            # error: Cannot determine type of 'index_col'
            index_col = self.index_col  # type: ignore[has-type]
            if index_col is not False:
                implicit_first_cols = len(line) - self.num_original_columns

            # Case 0
            if (
                next_line is not None
                and self.header is not None
                and index_col is not False
            ):
                if len(next_line) == len(line) + self.num_original_columns:
                    # column and index names on diff rows
                    self.index_col = list(range(len(line)))
                    self.buf = self.buf[1:]

                    for c in reversed(line):
                        columns.insert(0, c)

                    # Update list of original names to include all indices.
                    orig_names = list(columns)
                    self.num_original_columns = len(columns)
                    return line, orig_names, columns

        if implicit_first_cols > 0:
            # Case 1
            self._implicit_index = True
            if self.index_col is None:
                self.index_col = list(range(implicit_first_cols))

            index_name = None

        else:
            # Case 2
            (index_name, _, self.index_col) = self._clean_index_names(
                columns, self.index_col
            )

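        # Illustrative (Case 1): a header row with two names followed by data
        # rows of three fields leaves one unnamed leading field, which is
        # treated as an implicit index, i.e. index_col=[0].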
        return index_name, orig_names, columns

    def _rows_to_cols(self, content: list[list[Scalar]]) -> list[np.ndarray]:
        col_len = self.num_original_columns

        if self._implicit_index:
            col_len += len(self.index_col)

        max_len = max(len(row) for row in content)

        # Check that there are no rows with too many
        # elements in their row (rows with too few
        # elements are padded with NaN).
        # error: Non-overlapping identity check (left operand type: "List[int]",
        # right operand type: "Literal[False]")
        if (
            max_len > col_len
            and self.index_col is not False  # type: ignore[comparison-overlap]
            and self.usecols is None
        ):
            footers = self.skipfooter if self.skipfooter else 0
            bad_lines = []

            iter_content = enumerate(content)
            content_len = len(content)
            content = []

            for i, _content in iter_content:
                actual_len = len(_content)

                if actual_len > col_len:
                    if callable(self.on_bad_lines):
                        new_l = self.on_bad_lines(_content)
                        if new_l is not None:
                            content.append(new_l)
                    elif self.on_bad_lines in (
                        self.BadLineHandleMethod.ERROR,
                        self.BadLineHandleMethod.WARN,
                    ):
                        row_num = self.pos - (content_len - i + footers)
                        bad_lines.append((row_num, actual_len))

                        if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
                            break
                else:
                    content.append(_content)

            for row_num, actual_len in bad_lines:
                msg = (
                    f"Expected {col_len} fields in line {row_num + 1}, saw "
                    f"{actual_len}"
                )
                if (
                    self.delimiter
                    and len(self.delimiter) > 1
                    and self.quoting != csv.QUOTE_NONE
                ):
                    # see gh-13374
                    reason = (
                        "Error could possibly be due to quotes being "
                        "ignored when a multi-char delimiter is used."
                    )
                    msg += ". " + reason

                self._alert_malformed(msg, row_num + 1)

        # see gh-13320
        zipped_content = list(lib.to_object_array(content, min_width=col_len).T)
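        # Illustrative: content [[1, 2], [3]] with col_len=2 transposes into
        # two object columns, one per field position; the short row's missing
        # trailing field is padded out and treated as missing downstream.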

        if self.usecols:
            assert self._col_indices is not None
            col_indices = self._col_indices

            if self._implicit_index:
                zipped_content = [
                    a
                    for i, a in enumerate(zipped_content)
                    if (
                        i < len(self.index_col)
                        or i - len(self.index_col) in col_indices
                    )
                ]
            else:
                zipped_content = [
                    a for i, a in enumerate(zipped_content) if i in col_indices
                ]
        return zipped_content

    def _get_lines(self, rows: int | None = None) -> list[list[Scalar]]:
        lines = self.buf
        new_rows = None

        # already fetched some number
        if rows is not None:
            # we already have the lines in the buffer
            if len(self.buf) >= rows:
                new_rows, self.buf = self.buf[:rows], self.buf[rows:]

            # need some lines
            else:
                rows -= len(self.buf)

        if new_rows is None:
            if isinstance(self.data, list):
                if self.pos > len(self.data):
                    raise StopIteration
                if rows is None:
                    new_rows = self.data[self.pos :]
                    new_pos = len(self.data)
                else:
                    new_rows = self.data[self.pos : self.pos + rows]
                    new_pos = self.pos + rows

                new_rows = self._remove_skipped_rows(new_rows)
                lines.extend(new_rows)
                self.pos = new_pos

            else:
                new_rows = []
                try:
                    if rows is not None:
                        row_index = 0
                        row_ct = 0
                        offset = self.pos if self.pos is not None else 0
                        while row_ct < rows:
                            # assert for mypy, data is Iterator[str] or None, would
                            # error in next
                            assert self.data is not None
                            new_row = next(self.data)
                            if not self.skipfunc(offset + row_index):
                                row_ct += 1
                            row_index += 1
                            new_rows.append(new_row)

                        len_new_rows = len(new_rows)
                        new_rows = self._remove_skipped_rows(new_rows)
                        lines.extend(new_rows)
                    else:
                        rows = 0

                        while True:
                            next_row = self._next_iter_line(row_num=self.pos + rows + 1)
                            rows += 1

                            if next_row is not None:
                                new_rows.append(next_row)
                        len_new_rows = len(new_rows)

                except StopIteration:
                    len_new_rows = len(new_rows)
                    new_rows = self._remove_skipped_rows(new_rows)
                    lines.extend(new_rows)
                    if len(lines) == 0:
                        raise
                self.pos += len_new_rows

            self.buf = []
        else:
            lines = new_rows

        if self.skipfooter:
            lines = lines[: -self.skipfooter]

        lines = self._check_comments(lines)
        if self.skip_blank_lines:
            lines = self._remove_empty_lines(lines)
        lines = self._check_thousands(lines)
        return self._check_decimal(lines)

    def _remove_skipped_rows(self, new_rows: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.skiprows:
            return [
                row for i, row in enumerate(new_rows) if not self.skipfunc(i + self.pos)
            ]
        return new_rows

    def _set_no_thousand_columns(self) -> set[int]:
        no_thousands_columns: set[int] = set()
        if self.columns and self.parse_dates:
            assert self._col_indices is not None
            no_thousands_columns = self._set_noconvert_dtype_columns(
                self._col_indices, self.columns
            )
        if self.columns and self.dtype:
            assert self._col_indices is not None
            for i, col in zip(self._col_indices, self.columns):
                if not isinstance(self.dtype, dict) and not is_numeric_dtype(
                    self.dtype
                ):
                    no_thousands_columns.add(i)
                if (
                    isinstance(self.dtype, dict)
                    and col in self.dtype
                    and (
                        not is_numeric_dtype(self.dtype[col])
                        or is_bool_dtype(self.dtype[col])
                    )
                ):
                    no_thousands_columns.add(i)
        return no_thousands_columns


class FixedWidthReader(abc.Iterator):
    """
    A reader of fixed-width lines.
    """

    def __init__(
        self,
        f: IO[str] | ReadCsvBuffer[str],
        colspecs: list[tuple[int, int]] | Literal["infer"],
        delimiter: str | None,
        comment: str | None,
        skiprows: set[int] | None = None,
        infer_nrows: int = 100,
    ) -> None:
        self.f = f
        self.buffer: Iterator | None = None
        self.delimiter = "\r\n" + delimiter if delimiter else "\n\r\t "
        self.comment = comment
        if colspecs == "infer":
            self.colspecs = self.detect_colspecs(
                infer_nrows=infer_nrows, skiprows=skiprows
            )
        else:
            self.colspecs = colspecs

        if not isinstance(self.colspecs, (tuple, list)):
            raise TypeError(
                "column specifications must be a list or tuple, "
                f"input was a {type(colspecs).__name__}"
            )

        for colspec in self.colspecs:
            if not (
                isinstance(colspec, (tuple, list))
                and len(colspec) == 2
                and isinstance(colspec[0], (int, np.integer, type(None)))
                and isinstance(colspec[1], (int, np.integer, type(None)))
            ):
                raise TypeError(
                    "Each column specification must be "
                    "2 element tuple or list of integers"
                )

    def get_rows(self, infer_nrows: int, skiprows: set[int] | None = None) -> list[str]:
        """
        Read rows from self.f, skipping as specified.

        We distinguish buffer_rows (the first <= infer_nrows
        lines) from the rows returned to detect_colspecs:
        buffering every row, skipped or not, lets the skiprows
        logic in the other code paths stay unchanged, since it
        never needs to know that some rows were already skipped
        here.

        Parameters
        ----------
        infer_nrows : int
            Number of rows to read from self.f, not counting
            rows that are skipped.
        skiprows: set, optional
            Indices of rows to skip.

        Returns
        -------
        detect_rows : list of str
            A list containing the rows to read.

        """
        if skiprows is None:
            skiprows = set()
        buffer_rows = []
        detect_rows = []
        for i, row in enumerate(self.f):
            if i not in skiprows:
                detect_rows.append(row)
            buffer_rows.append(row)
            if len(detect_rows) >= infer_nrows:
                break
        self.buffer = iter(buffer_rows)
        return detect_rows

    def detect_colspecs(
        self, infer_nrows: int = 100, skiprows: set[int] | None = None
    ) -> list[tuple[int, int]]:
        # Regex escape the delimiters
        delimiters = "".join([rf"\{x}" for x in self.delimiter])
        pattern = re.compile(f"([^{delimiters}]+)")
        rows = self.get_rows(infer_nrows, skiprows)
        if not rows:
            raise EmptyDataError("No rows from which to infer column width")
        max_len = max(map(len, rows))
        mask = np.zeros(max_len + 1, dtype=int)
        if self.comment is not None:
            rows = [row.partition(self.comment)[0] for row in rows]
        for row in rows:
            for m in pattern.finditer(row):
                mask[m.start() : m.end()] = 1
        shifted = np.roll(mask, 1)
        shifted[0] = 0
        edges = np.where((mask ^ shifted) == 1)[0]
        edge_pairs = list(zip(edges[::2], edges[1::2]))
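        # Illustrative: for the single row "  name  age", the mask marks
        # positions 2-5 and 8-10, so the detected edge_pairs are
        # [(2, 6), (8, 11)] (half-open intervals).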
        return edge_pairs

    def __next__(self) -> list[str]:
        # Argument 1 to "next" has incompatible type "Union[IO[str],
        # ReadCsvBuffer[str]]"; expected "SupportsNext[str]"
        if self.buffer is not None:
            try:
                line = next(self.buffer)
            except StopIteration:
                self.buffer = None
                line = next(self.f)  # type: ignore[arg-type]
        else:
            line = next(self.f)  # type: ignore[arg-type]
        # Note: 'colspecs' is a sequence of half-open intervals.
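        # Illustrative: colspecs [(0, 5), (5, 10)] split "alpha beta " into
        # ["alpha", "beta"], stripping delimiter characters from each slice.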
        return [line[from_:to].strip(self.delimiter) for (from_, to) in self.colspecs]


class FixedWidthFieldParser(PythonParser):
    """
    Specialization that converts fixed-width fields into DataFrames.
    See PythonParser for details.
    """

    def __init__(self, f: ReadCsvBuffer[str], **kwds) -> None:
        # Support iterators, convert to a list.
        self.colspecs = kwds.pop("colspecs")
        self.infer_nrows = kwds.pop("infer_nrows")
        PythonParser.__init__(self, f, **kwds)

    def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]) -> FixedWidthReader:
        return FixedWidthReader(
            f,
            self.colspecs,
            self.delimiter,
            self.comment,
            self.skiprows,
            self.infer_nrows,
        )

    def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        """
        Returns the list of lines without the empty ones. With fixed-width
        fields, empty lines become arrays of empty strings.

        See PythonParser._remove_empty_lines.
        """
        return [
            line
            for line in lines
            if any(not isinstance(e, str) or e.strip() for e in line)
        ]


def count_empty_vals(vals) -> int:
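    # Illustrative: count_empty_vals(["", None, "x"]) == 2; the empty string
    # and None both count as empty.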
    return sum(1 for v in vals if v == "" or v is None)


def _validate_skipfooter_arg(skipfooter: int) -> int:
    """
    Validate the 'skipfooter' parameter.

    Checks whether 'skipfooter' is a non-negative integer.
    Raises a ValueError if that is not the case.

    Parameters
    ----------
    skipfooter : non-negative integer
        The number of rows to skip at the end of the file.

    Returns
    -------
    validated_skipfooter : non-negative integer
        The original input if the validation succeeds.

    Raises
    ------
    ValueError : 'skipfooter' was not a non-negative integer.
    """
    if not is_integer(skipfooter):
        raise ValueError("skipfooter must be an integer")

    if skipfooter < 0:
        raise ValueError("skipfooter cannot be negative")

    # Incompatible return value type (got "Union[int, integer[Any]]", expected "int")
    return skipfooter  # type: ignore[return-value]