from __future__ import annotations

from collections import (
    abc,
    defaultdict,
)
from collections.abc import (
    Hashable,
    Iterator,
    Mapping,
    Sequence,
)
import csv
from io import StringIO
import re
from typing import (
    IO,
    TYPE_CHECKING,
    DefaultDict,
    Literal,
    cast,
)
import warnings

import numpy as np

from pandas._libs import lib
from pandas.errors import (
    EmptyDataError,
    ParserError,
    ParserWarning,
)
from pandas.util._decorators import cache_readonly
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.common import (
    is_bool_dtype,
    is_integer,
    is_numeric_dtype,
)
from pandas.core.dtypes.inference import is_dict_like

from pandas.io.common import (
    dedup_names,
    is_potential_multi_index,
)
from pandas.io.parsers.base_parser import (
    ParserBase,
    parser_defaults,
)

if TYPE_CHECKING:
    from pandas._typing import (
        ArrayLike,
        ReadCsvBuffer,
        Scalar,
    )

    from pandas import (
        Index,
        MultiIndex,
    )

# BOM character (byte order mark)
# This exists at the beginning of a file to indicate endianness
# of a file (stream). Unfortunately, this marker screws up parsing,
# so we need to remove it if we see it.
_BOM = "\ufeff"


class PythonParser(ParserBase):
    _no_thousands_columns: set[int]

    def __init__(self, f: ReadCsvBuffer[str] | list, **kwds) -> None:
        """
        Workhorse function for processing a nested list into a DataFrame.
        """
        super().__init__(kwds)

        self.data: Iterator[str] | None = None
        self.buf: list = []
        self.pos = 0
        self.line_pos = 0

        self.skiprows = kwds["skiprows"]

        if callable(self.skiprows):
            self.skipfunc = self.skiprows
        else:
            self.skipfunc = lambda x: x in self.skiprows

        self.skipfooter = _validate_skipfooter_arg(kwds["skipfooter"])
        self.delimiter = kwds["delimiter"]

        self.quotechar = kwds["quotechar"]
        if isinstance(self.quotechar, str):
            self.quotechar = str(self.quotechar)

        self.escapechar = kwds["escapechar"]
        self.doublequote = kwds["doublequote"]
        self.skipinitialspace = kwds["skipinitialspace"]
        self.lineterminator = kwds["lineterminator"]
        self.quoting = kwds["quoting"]
        self.skip_blank_lines = kwds["skip_blank_lines"]

        self.has_index_names = False
        if "has_index_names" in kwds:
            self.has_index_names = kwds["has_index_names"]

        self.verbose = kwds["verbose"]

        self.thousands = kwds["thousands"]
        self.decimal = kwds["decimal"]

        self.comment = kwds["comment"]

        # Set self.data to something that can read lines.
        if isinstance(f, list):
            # read_excel: f is a list
            self.data = cast(Iterator[str], f)
        else:
            assert hasattr(f, "readline")
            self.data = self._make_reader(f)

        # Get columns in two steps: infer from data, then
        # infer column indices from self.usecols if it is specified.
        self._col_indices: list[int] | None = None
        columns: list[list[Scalar | None]]
        (
            columns,
            self.num_original_columns,
            self.unnamed_cols,
        ) = self._infer_columns()

        # Now self.columns has the set of columns that we will process.
        # The original set is stored in self.original_columns.
        # error: Cannot determine type of 'index_names'
        (
            self.columns,
            self.index_names,
            self.col_names,
            _,
        ) = self._extract_multi_indexer_columns(
            columns,
            self.index_names,  # type: ignore[has-type]
        )

        # get popped off for index
        self.orig_names: list[Hashable] = list(self.columns)

        # needs to be cleaned/refactored
        # multiple date column thing turning into a real spaghetti factory

        if not self._has_complex_date_col:
            (index_names, self.orig_names, self.columns) = self._get_index_name()
            self._name_processed = True
            if self.index_names is None:
                self.index_names = index_names

        if self._col_indices is None:
            self._col_indices = list(range(len(self.columns)))

        self._parse_date_cols = self._validate_parse_dates_presence(self.columns)
        self._no_thousands_columns = self._set_no_thousand_columns()

        if len(self.decimal) != 1:
            raise ValueError("Only length-1 decimal markers supported")

    @cache_readonly
    def num(self) -> re.Pattern:
        decimal = re.escape(self.decimal)
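        # e.g. with decimal="." and thousands=",", the compiled pattern
        # matches strings like "1,234.56" or "-2.5e-3", so a matching
        # value can safely have its thousands separators stripped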
        if self.thousands is None:
            regex = rf"^[\-\+]?[0-9]*({decimal}[0-9]*)?([0-9]?(E|e)\-?[0-9]+)?$"
        else:
            thousands = re.escape(self.thousands)
            regex = (
                rf"^[\-\+]?([0-9]+{thousands}|[0-9])*({decimal}[0-9]*)?"
                rf"([0-9]?(E|e)\-?[0-9]+)?$"
            )
        return re.compile(regex)

    def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]):
        sep = self.delimiter

        if sep is None or len(sep) == 1:
            if self.lineterminator:
                raise ValueError(
                    "Custom line terminators not supported in python parser (yet)"
                )

            class MyDialect(csv.Dialect):
                delimiter = self.delimiter
                quotechar = self.quotechar
                escapechar = self.escapechar
                doublequote = self.doublequote
                skipinitialspace = self.skipinitialspace
                quoting = self.quoting
                lineterminator = "\n"

            dia = MyDialect

            if sep is not None:
                dia.delimiter = sep
            else:
                # attempt to sniff the delimiter from the first valid line,
                # i.e. no comment line and not in skiprows
                line = f.readline()
                lines = self._check_comments([[line]])[0]
                while self.skipfunc(self.pos) or not lines:
                    self.pos += 1
                    line = f.readline()
                    lines = self._check_comments([[line]])[0]
                lines_str = cast(list[str], lines)

                # since `line` was a string, lines will be a list containing
                # only a single string
                line = lines_str[0]

                self.pos += 1
                self.line_pos += 1
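                # e.g. for a line like "a;b;c\n" the sniffer would
                # typically infer ";" as the delimiter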
                sniffed = csv.Sniffer().sniff(line)
                dia.delimiter = sniffed.delimiter

            # Note: encoding is irrelevant here
            line_rdr = csv.reader(StringIO(line), dialect=dia)
            self.buf.extend(list(line_rdr))

            # Note: encoding is irrelevant here
            reader = csv.reader(f, dialect=dia, strict=True)

        else:

            def _read():
                line = f.readline()
                pat = re.compile(sep)
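                # e.g. with sep=r"\s+", "a  b\tc" splits into ["a", "b", "c"]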

                yield pat.split(line.strip())

                for line in f:
                    yield pat.split(line.strip())

            reader = _read()

        return reader

    def read(
        self, rows: int | None = None
    ) -> tuple[
        Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
    ]:
        try:
            content = self._get_lines(rows)
        except StopIteration:
            if self._first_chunk:
                content = []
            else:
                self.close()
                raise

        # done with first read, next time raise StopIteration
        self._first_chunk = False

        columns: Sequence[Hashable] = list(self.orig_names)
        if not len(content):  # pragma: no cover
            # DataFrame with the right metadata, even though it's length 0
            # error: Cannot determine type of 'index_col'
            names = dedup_names(
                self.orig_names,
                is_potential_multi_index(
                    self.orig_names,
                    self.index_col,  # type: ignore[has-type]
                ),
            )
            index, columns, col_dict = self._get_empty_meta(
                names,
                self.dtype,
            )
            conv_columns = self._maybe_make_multi_index_columns(columns, self.col_names)
            return index, conv_columns, col_dict

        # handle new style for names in index
        count_empty_content_vals = count_empty_vals(content[0])
        indexnamerow = None
        if self.has_index_names and count_empty_content_vals == len(columns):
            indexnamerow = content[0]
            content = content[1:]

        alldata = self._rows_to_cols(content)
        data, columns = self._exclude_implicit_index(alldata)

        conv_data = self._convert_data(data)
        columns, conv_data = self._do_date_conversions(columns, conv_data)

        index, result_columns = self._make_index(
            conv_data, alldata, columns, indexnamerow
        )

        return index, result_columns, conv_data

    def _exclude_implicit_index(
        self,
        alldata: list[np.ndarray],
    ) -> tuple[Mapping[Hashable, np.ndarray], Sequence[Hashable]]:
        # error: Cannot determine type of 'index_col'
        names = dedup_names(
            self.orig_names,
            is_potential_multi_index(
                self.orig_names,
                self.index_col,  # type: ignore[has-type]
            ),
        )

        offset = 0
        if self._implicit_index:
            # error: Cannot determine type of 'index_col'
            offset = len(self.index_col)  # type: ignore[has-type]

        len_alldata = len(alldata)
        self._check_data_length(names, alldata)

        return {
            name: alldata[i + offset] for i, name in enumerate(names) if i < len_alldata
        }, names

    # legacy
    def get_chunk(
        self, size: int | None = None
    ) -> tuple[
        Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
    ]:
        if size is None:
            # error: "PythonParser" has no attribute "chunksize"
            size = self.chunksize  # type: ignore[attr-defined]
        return self.read(rows=size)

    def _convert_data(
        self,
        data: Mapping[Hashable, np.ndarray],
    ) -> Mapping[Hashable, ArrayLike]:
        # apply converters
        clean_conv = self._clean_mapping(self.converters)
        clean_dtypes = self._clean_mapping(self.dtype)

        # Apply NA values.
        clean_na_values = {}
        clean_na_fvalues = {}

        if isinstance(self.na_values, dict):
            for col in self.na_values:
                na_value = self.na_values[col]
                na_fvalue = self.na_fvalues[col]

                if isinstance(col, int) and col not in self.orig_names:
                    col = self.orig_names[col]

                clean_na_values[col] = na_value
                clean_na_fvalues[col] = na_fvalue
        else:
            clean_na_values = self.na_values
            clean_na_fvalues = self.na_fvalues

        return self._convert_to_ndarrays(
            data,
            clean_na_values,
            clean_na_fvalues,
            self.verbose,
            clean_conv,
            clean_dtypes,
        )

    @cache_readonly
    def _have_mi_columns(self) -> bool:
        if self.header is None:
            return False

        header = self.header
        if isinstance(header, (list, tuple, np.ndarray)):
            return len(header) > 1
        else:
            return False

    def _infer_columns(
        self,
    ) -> tuple[list[list[Scalar | None]], int, set[Scalar | None]]:
        names = self.names
        num_original_columns = 0
        clear_buffer = True
        unnamed_cols: set[Scalar | None] = set()

        if self.header is not None:
            header = self.header
            have_mi_columns = self._have_mi_columns

            if isinstance(header, (list, tuple, np.ndarray)):
                # we have MI columns, so read an extra line
                if have_mi_columns:
                    header = list(header) + [header[-1] + 1]
            else:
                header = [header]

            columns: list[list[Scalar | None]] = []
            for level, hr in enumerate(header):
                try:
                    line = self._buffered_line()

                    while self.line_pos <= hr:
                        line = self._next_line()

                except StopIteration as err:
                    if 0 < self.line_pos <= hr and (
                        not have_mi_columns or hr != header[-1]
                    ):
                        # If there are no rows, we want to raise a different
                        # message; and if we have MI columns, the last line
                        # is not part of the header.
                        joi = list(map(str, header[:-1] if have_mi_columns else header))
                        msg = f"[{','.join(joi)}], len of {len(joi)}, "
                        raise ValueError(
                            f"Passed header={msg}"
                            f"but only {self.line_pos} lines in file"
                        ) from err

                    # We have an empty file, so check
                    # if columns are provided. If so, they will
                    # serve as the 'line' for parsing.
                    if have_mi_columns and hr > 0:
                        if clear_buffer:
                            self._clear_buffer()
                        columns.append([None] * len(columns[-1]))
                        return columns, num_original_columns, unnamed_cols

                    if not self.names:
                        raise EmptyDataError("No columns to parse from file") from err

                    line = self.names[:]

                this_columns: list[Scalar | None] = []
                this_unnamed_cols = []

                for i, c in enumerate(line):
                    if c == "":
                        if have_mi_columns:
                            col_name = f"Unnamed: {i}_level_{level}"
                        else:
                            col_name = f"Unnamed: {i}"

                        this_unnamed_cols.append(i)
                        this_columns.append(col_name)
                    else:
                        this_columns.append(c)

                if not have_mi_columns:
                    counts: DefaultDict = defaultdict(int)
                    # Ensure that regular columns are used before unnamed ones
                    # to keep given names and mangle unnamed columns
                    col_loop_order = [
                        i
                        for i in range(len(this_columns))
                        if i not in this_unnamed_cols
                    ] + this_unnamed_cols

                    # TODO: Use pandas.io.common.dedup_names instead (see #50371)
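                    # e.g. a header row with duplicated names ["a", "a", "a"]
                    # is mangled to ["a", "a.1", "a.2"]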
                    for i in col_loop_order:
                        col = this_columns[i]
                        old_col = col
                        cur_count = counts[col]

                        if cur_count > 0:
                            while cur_count > 0:
                                counts[old_col] = cur_count + 1
                                col = f"{old_col}.{cur_count}"
                                if col in this_columns:
                                    cur_count += 1
                                else:
                                    cur_count = counts[col]

                            if (
                                self.dtype is not None
                                and is_dict_like(self.dtype)
                                and self.dtype.get(old_col) is not None
                                and self.dtype.get(col) is None
                            ):
                                self.dtype.update({col: self.dtype.get(old_col)})
                        this_columns[i] = col
                        counts[col] = cur_count + 1
                elif have_mi_columns:
                    # if we have grabbed an extra line, but it's not in our
                    # format, save it in the buffer and create a blank extra
                    # line for the rest of the parsing code
                    if hr == header[-1]:
                        lc = len(this_columns)
                        # error: Cannot determine type of 'index_col'
                        sic = self.index_col  # type: ignore[has-type]
                        ic = len(sic) if sic is not None else 0
                        unnamed_count = len(this_unnamed_cols)

                        # if wrong number of blanks or no index, not our format
                        if (lc != unnamed_count and lc - ic > unnamed_count) or ic == 0:
                            clear_buffer = False
                            this_columns = [None] * lc
                            self.buf = [self.buf[-1]]

                columns.append(this_columns)
                unnamed_cols.update({this_columns[i] for i in this_unnamed_cols})

                if len(columns) == 1:
                    num_original_columns = len(this_columns)

            if clear_buffer:
                self._clear_buffer()

            first_line: list[Scalar] | None
            if names is not None:
                # Read first row after header to check if data are longer
                try:
                    first_line = self._next_line()
                except StopIteration:
                    first_line = None

                len_first_data_row = 0 if first_line is None else len(first_line)

                if len(names) > len(columns[0]) and len(names) > len_first_data_row:
                    raise ValueError(
                        "Number of passed names did not match "
                        "number of header fields in the file"
                    )
                if len(columns) > 1:
                    raise TypeError("Cannot pass names with multi-index columns")

                if self.usecols is not None:
                    # Set _use_cols. We don't store columns because they are
                    # overwritten.
                    self._handle_usecols(columns, names, num_original_columns)
                else:
                    num_original_columns = len(names)
                    if self._col_indices is not None and len(names) != len(
                        self._col_indices
                    ):
                        columns = [[names[i] for i in sorted(self._col_indices)]]
                    else:
                        columns = [names]
            else:
                columns = self._handle_usecols(
                    columns, columns[0], num_original_columns
                )
        else:
            ncols = len(self._header_line)
            num_original_columns = ncols

            if not names:
                columns = [list(range(ncols))]
                columns = self._handle_usecols(columns, columns[0], ncols)
            elif self.usecols is None or len(names) >= ncols:
                columns = self._handle_usecols([names], names, ncols)
                num_original_columns = len(names)
            elif not callable(self.usecols) and len(names) != len(self.usecols):
                raise ValueError(
                    "Number of passed names did not match number of "
                    "header fields in the file"
                )
            else:
                # Ignore output but set used columns.
                columns = [names]
                self._handle_usecols(columns, columns[0], ncols)

        return columns, num_original_columns, unnamed_cols

    @cache_readonly
    def _header_line(self):
        # Store line for reuse in _get_index_name
        if self.header is not None:
            return None

        try:
            line = self._buffered_line()
        except StopIteration as err:
            if not self.names:
                raise EmptyDataError("No columns to parse from file") from err

            line = self.names[:]
        return line

    def _handle_usecols(
        self,
        columns: list[list[Scalar | None]],
        usecols_key: list[Scalar | None],
        num_original_columns: int,
    ) -> list[list[Scalar | None]]:
        """
        Sets self._col_indices

        usecols_key is used if there are string usecols.
        """
        col_indices: set[int] | list[int]
        if self.usecols is not None:
            if callable(self.usecols):
                col_indices = self._evaluate_usecols(self.usecols, usecols_key)
            elif any(isinstance(u, str) for u in self.usecols):
                if len(columns) > 1:
                    raise ValueError(
                        "If using multiple headers, usecols must be integers."
                    )
                col_indices = []

                for col in self.usecols:
                    if isinstance(col, str):
                        try:
                            col_indices.append(usecols_key.index(col))
                        except ValueError:
                            self._validate_usecols_names(self.usecols, usecols_key)
                    else:
                        col_indices.append(col)
            else:
                missing_usecols = [
                    col for col in self.usecols if col >= num_original_columns
                ]
                if missing_usecols:
                    raise ParserError(
                        "Defining usecols with out-of-bounds indices is not allowed. "
                        f"{missing_usecols} are out-of-bounds.",
                    )
                col_indices = self.usecols

            columns = [
                [n for i, n in enumerate(column) if i in col_indices]
                for column in columns
            ]
            self._col_indices = sorted(col_indices)
        return columns

    def _buffered_line(self) -> list[Scalar]:
        """
        Return a line from buffer, filling buffer if required.
        """
        if len(self.buf) > 0:
            return self.buf[0]
        else:
            return self._next_line()

    def _check_for_bom(self, first_row: list[Scalar]) -> list[Scalar]:
        """
        Checks whether the file begins with the BOM character.
        If it does, remove it. In addition, if there is quoting
        in the field subsequent to the BOM, remove it as well
        because it technically takes place at the beginning of
        the name, not the middle of it.
        """
        # first_row will be a list, so we need to check
        # that the list is not empty before proceeding.
        if not first_row:
            return first_row

        # The first element of this row is the one that could have the
        # BOM that we want to remove. Check that the first element is a
        # string before proceeding.
        if not isinstance(first_row[0], str):
            return first_row

        # Check that the string is not empty, as that would
        # obviously not have a BOM at the start of it.
        if not first_row[0]:
            return first_row

        # Since the string is non-empty, check that it does
        # in fact begin with a BOM.
        first_elt = first_row[0][0]
        if first_elt != _BOM:
            return first_row

        first_row_bom = first_row[0]
        new_row: str

        if len(first_row_bom) > 1 and first_row_bom[1] == self.quotechar:
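            # e.g. '\ufeff"col"' becomes 'col': the BOM is dropped along
            # with the quotes that immediately follow it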
            start = 2
            quote = first_row_bom[1]
            end = first_row_bom[2:].index(quote) + 2

            # Extract the data between the quotation marks
            new_row = first_row_bom[start:end]

            # Extract any remaining data after the second
            # quotation mark.
            if len(first_row_bom) > end + 1:
                new_row += first_row_bom[end + 1 :]

        else:
            # No quotation, so just remove the BOM from the first element
            new_row = first_row_bom[1:]

        new_row_list: list[Scalar] = [new_row]
        return new_row_list + first_row[1:]

    def _is_line_empty(self, line: list[Scalar]) -> bool:
        """
        Check if a line is empty or not.

        Parameters
        ----------
        line : str, array-like
            The line of data to check.

        Returns
        -------
        boolean : Whether or not the line is empty.
        """
        return not line or all(not x for x in line)

    def _next_line(self) -> list[Scalar]:
        if isinstance(self.data, list):
            while self.skipfunc(self.pos):
                if self.pos >= len(self.data):
                    break
                self.pos += 1

            while True:
                try:
                    line = self._check_comments([self.data[self.pos]])[0]
                    self.pos += 1
                    # either uncommented or blank to begin with
                    if not self.skip_blank_lines and (
                        self._is_line_empty(self.data[self.pos - 1]) or line
                    ):
                        break
                    if self.skip_blank_lines:
                        ret = self._remove_empty_lines([line])
                        if ret:
                            line = ret[0]
                            break
                except IndexError:
                    raise StopIteration
        else:
            while self.skipfunc(self.pos):
                self.pos += 1
                # assert for mypy, data is Iterator[str] or None, would error in next
                assert self.data is not None
                next(self.data)

            while True:
                orig_line = self._next_iter_line(row_num=self.pos + 1)
                self.pos += 1

                if orig_line is not None:
                    line = self._check_comments([orig_line])[0]

                    if self.skip_blank_lines:
                        ret = self._remove_empty_lines([line])

                        if ret:
                            line = ret[0]
                            break
                    elif self._is_line_empty(orig_line) or line:
                        break

        # This was the first line of the file,
        # which could contain the BOM at the
        # beginning of it.
        if self.pos == 1:
            line = self._check_for_bom(line)

        self.line_pos += 1
        self.buf.append(line)
        return line

    def _alert_malformed(self, msg: str, row_num: int) -> None:
        """
        Alert a user about a malformed row, depending on value of
        `self.on_bad_lines` enum.

        If `self.on_bad_lines` is ERROR, the alert will be `ParserError`.
        If `self.on_bad_lines` is WARN, the alert will be printed out.

        Parameters
        ----------
        msg: str
            The error message to display.
        row_num: int
            The row number where the parsing error occurred.
            Because this row number is displayed, we 1-index,
            even though we 0-index internally.
        """
        if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
            raise ParserError(msg)
        if self.on_bad_lines == self.BadLineHandleMethod.WARN:
            warnings.warn(
                f"Skipping line {row_num}: {msg}\n",
                ParserWarning,
                stacklevel=find_stack_level(),
            )

    def _next_iter_line(self, row_num: int) -> list[Scalar] | None:
        """
        Wrapper around iterating through `self.data` (CSV source).

        When a CSV error is raised, we check for specific
        error messages that allow us to customize the
        error message displayed to the user.

        Parameters
        ----------
        row_num: int
            The row number of the line being parsed.
        """
        try:
            # assert for mypy, data is Iterator[str] or None, would error in next
            assert self.data is not None
            line = next(self.data)
            # for mypy
            assert isinstance(line, list)
            return line
        except csv.Error as e:
            if self.on_bad_lines in (
                self.BadLineHandleMethod.ERROR,
                self.BadLineHandleMethod.WARN,
            ):
                msg = str(e)

                if "NULL byte" in msg or "line contains NUL" in msg:
                    msg = (
                        "NULL byte detected. This byte "
                        "cannot be processed in Python's "
                        "native csv library at the moment, "
                        "so please pass in engine='c' instead"
                    )

                if self.skipfooter > 0:
                    reason = (
                        "Error could possibly be due to "
                        "parsing errors in the skipped footer rows "
                        "(the skipfooter keyword is only applied "
                        "after Python's csv library has parsed "
                        "all rows)."
                    )
                    msg += ". " + reason

                self._alert_malformed(msg, row_num)
            return None

    def _check_comments(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.comment is None:
            return lines
        ret = []
        for line in lines:
            rl = []
            for x in line:
                if (
                    not isinstance(x, str)
                    or self.comment not in x
                    or x in self.na_values
                ):
                    rl.append(x)
                else:
                    x = x[: x.find(self.comment)]
                    if len(x) > 0:
                        rl.append(x)
                    break
            ret.append(rl)
        return ret

    def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        """
        Iterate through the lines and remove any that are
        either empty or contain only one whitespace value

        Parameters
        ----------
        lines : list of list of Scalars
            The array of lines that we are to filter.

        Returns
        -------
        filtered_lines : list of list of Scalars
            The same array of lines with the "empty" ones removed.
        """
        # Remove empty lines and lines with only one whitespace value
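        # e.g. [["a", "b"], [""], [" "]] -> [["a", "b"]]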
        ret = [
            line
            for line in lines
            if (
                len(line) > 1
                or len(line) == 1
                and (not isinstance(line[0], str) or line[0].strip())
            )
        ]
        return ret

    def _check_thousands(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.thousands is None:
            return lines

        return self._search_replace_num_columns(
            lines=lines, search=self.thousands, replace=""
        )

    def _search_replace_num_columns(
        self, lines: list[list[Scalar]], search: str, replace: str
    ) -> list[list[Scalar]]:
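        # e.g. with search="," and replace="", "1,234" becomes "1234";
        # columns listed in self._no_thousands_columns are left untouched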
        ret = []
        for line in lines:
            rl = []
            for i, x in enumerate(line):
                if (
                    not isinstance(x, str)
                    or search not in x
                    or i in self._no_thousands_columns
                    or not self.num.search(x.strip())
                ):
                    rl.append(x)
                else:
                    rl.append(x.replace(search, replace))
            ret.append(rl)
        return ret

    def _check_decimal(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.decimal == parser_defaults["decimal"]:
            return lines

        return self._search_replace_num_columns(
            lines=lines, search=self.decimal, replace="."
        )

    def _clear_buffer(self) -> None:
        self.buf = []

    def _get_index_name(
        self,
    ) -> tuple[Sequence[Hashable] | None, list[Hashable], list[Hashable]]:
        """
        Try several cases to get lines:

        0) There are headers on row 0 and row 1 and their
           summed lengths equal the length of the next line.
           Treat row 0 as columns and row 1 as indices.
        1) Look for implicit index: there are more columns
           on row 1 than row 0. If this is true, assume that row
           1 lists index columns and row 0 lists normal columns.
        2) Get index from the columns if it was listed.
        """
        columns: Sequence[Hashable] = self.orig_names
        orig_names = list(columns)
        columns = list(columns)

        line: list[Scalar] | None
        if self._header_line is not None:
            line = self._header_line
        else:
            try:
                line = self._next_line()
            except StopIteration:
                line = None

        next_line: list[Scalar] | None
        try:
            next_line = self._next_line()
        except StopIteration:
            next_line = None

        # implicitly index_col=0 b/c 1 fewer column names
        implicit_first_cols = 0
        if line is not None:
            # leave it 0, #2442
            # Case 1
            # error: Cannot determine type of 'index_col'
            index_col = self.index_col  # type: ignore[has-type]
            if index_col is not False:
                implicit_first_cols = len(line) - self.num_original_columns

            # Case 0
            if (
                next_line is not None
                and self.header is not None
                and index_col is not False
            ):
                if len(next_line) == len(line) + self.num_original_columns:
                    # column and index names on diff rows
                    self.index_col = list(range(len(line)))
                    self.buf = self.buf[1:]

                    for c in reversed(line):
                        columns.insert(0, c)

                    # Update list of original names to include all indices.
                    orig_names = list(columns)
                    self.num_original_columns = len(columns)
                    return line, orig_names, columns

        if implicit_first_cols > 0:
            # Case 1
            self._implicit_index = True
            if self.index_col is None:
                self.index_col = list(range(implicit_first_cols))

            index_name = None

        else:
            # Case 2
            (index_name, _, self.index_col) = self._clean_index_names(
                columns, self.index_col
            )

        return index_name, orig_names, columns

    def _rows_to_cols(self, content: list[list[Scalar]]) -> list[np.ndarray]:
        col_len = self.num_original_columns

        if self._implicit_index:
            col_len += len(self.index_col)

        max_len = max(len(row) for row in content)

        # Check that there are no rows with too many
        # elements (rows with too few elements are
        # padded with NaN).
        # error: Non-overlapping identity check (left operand type: "List[int]",
        # right operand type: "Literal[False]")
        if (
            max_len > col_len
            and self.index_col is not False  # type: ignore[comparison-overlap]
            and self.usecols is None
        ):
            footers = self.skipfooter if self.skipfooter else 0
            bad_lines = []

            iter_content = enumerate(content)
            content_len = len(content)
            content = []

            for i, _content in iter_content:
                actual_len = len(_content)

                if actual_len > col_len:
                    if callable(self.on_bad_lines):
                        new_l = self.on_bad_lines(_content)
                        if new_l is not None:
                            content.append(new_l)
                    elif self.on_bad_lines in (
                        self.BadLineHandleMethod.ERROR,
                        self.BadLineHandleMethod.WARN,
                    ):
                        row_num = self.pos - (content_len - i + footers)
                        bad_lines.append((row_num, actual_len))

                        if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
                            break
                else:
                    content.append(_content)

            for row_num, actual_len in bad_lines:
                msg = (
                    f"Expected {col_len} fields in line {row_num + 1}, saw "
                    f"{actual_len}"
                )
                if (
                    self.delimiter
                    and len(self.delimiter) > 1
                    and self.quoting != csv.QUOTE_NONE
                ):
                    # see gh-13374
                    reason = (
                        "Error could possibly be due to quotes being "
                        "ignored when a multi-char delimiter is used."
                    )
                    msg += ". " + reason

                self._alert_malformed(msg, row_num + 1)

        # see gh-13320
        zipped_content = list(lib.to_object_array(content, min_width=col_len).T)
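        # e.g. rows [["a", 1], ["b", 2]] become two object-dtype columns,
        # ["a", "b"] and [1, 2]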

        if self.usecols:
            assert self._col_indices is not None
            col_indices = self._col_indices

            if self._implicit_index:
                zipped_content = [
                    a
                    for i, a in enumerate(zipped_content)
                    if (
                        i < len(self.index_col)
                        or i - len(self.index_col) in col_indices
                    )
                ]
            else:
                zipped_content = [
                    a for i, a in enumerate(zipped_content) if i in col_indices
                ]
        return zipped_content

    def _get_lines(self, rows: int | None = None) -> list[list[Scalar]]:
        lines = self.buf
        new_rows = None

        # already fetched some number
        if rows is not None:
            # we already have the lines in the buffer
            if len(self.buf) >= rows:
                new_rows, self.buf = self.buf[:rows], self.buf[rows:]

            # need some lines
            else:
                rows -= len(self.buf)

        if new_rows is None:
            if isinstance(self.data, list):
                if self.pos > len(self.data):
                    raise StopIteration
                if rows is None:
                    new_rows = self.data[self.pos :]
                    new_pos = len(self.data)
                else:
                    new_rows = self.data[self.pos : self.pos + rows]
                    new_pos = self.pos + rows

                new_rows = self._remove_skipped_rows(new_rows)
                lines.extend(new_rows)
                self.pos = new_pos

            else:
                new_rows = []
                try:
                    if rows is not None:
                        row_index = 0
                        row_ct = 0
                        offset = self.pos if self.pos is not None else 0
                        while row_ct < rows:
                            # assert for mypy, data is Iterator[str] or None, would
                            # error in next
                            assert self.data is not None
                            new_row = next(self.data)
                            if not self.skipfunc(offset + row_index):
                                row_ct += 1
                            row_index += 1
                            new_rows.append(new_row)

                        len_new_rows = len(new_rows)
                        new_rows = self._remove_skipped_rows(new_rows)
                        lines.extend(new_rows)
                    else:
                        rows = 0

                        while True:
                            next_row = self._next_iter_line(row_num=self.pos + rows + 1)
                            rows += 1

                            if next_row is not None:
                                new_rows.append(next_row)
                            len_new_rows = len(new_rows)

                except StopIteration:
                    len_new_rows = len(new_rows)
                    new_rows = self._remove_skipped_rows(new_rows)
                    lines.extend(new_rows)
                    if len(lines) == 0:
                        raise
                self.pos += len_new_rows

            self.buf = []
        else:
            lines = new_rows

        if self.skipfooter:
            lines = lines[: -self.skipfooter]

        lines = self._check_comments(lines)
        if self.skip_blank_lines:
            lines = self._remove_empty_lines(lines)
        lines = self._check_thousands(lines)
        return self._check_decimal(lines)

    def _remove_skipped_rows(self, new_rows: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.skiprows:
            return [
                row for i, row in enumerate(new_rows) if not self.skipfunc(i + self.pos)
            ]
        return new_rows

    def _set_no_thousand_columns(self) -> set[int]:
        no_thousands_columns: set[int] = set()
        if self.columns and self.parse_dates:
            assert self._col_indices is not None
            no_thousands_columns = self._set_noconvert_dtype_columns(
                self._col_indices, self.columns
            )
        if self.columns and self.dtype:
            assert self._col_indices is not None
            for i, col in zip(self._col_indices, self.columns):
                if not isinstance(self.dtype, dict) and not is_numeric_dtype(
                    self.dtype
                ):
                    no_thousands_columns.add(i)
                if (
                    isinstance(self.dtype, dict)
                    and col in self.dtype
                    and (
                        not is_numeric_dtype(self.dtype[col])
                        or is_bool_dtype(self.dtype[col])
                    )
                ):
                    no_thousands_columns.add(i)
        return no_thousands_columns


class FixedWidthReader(abc.Iterator):
    """
    A reader of fixed-width lines.
    """

    def __init__(
        self,
        f: IO[str] | ReadCsvBuffer[str],
        colspecs: list[tuple[int, int]] | Literal["infer"],
        delimiter: str | None,
        comment: str | None,
        skiprows: set[int] | None = None,
        infer_nrows: int = 100,
    ) -> None:
        self.f = f
        self.buffer: Iterator | None = None
        self.delimiter = "\r\n" + delimiter if delimiter else "\n\r\t "
        self.comment = comment
        if colspecs == "infer":
            self.colspecs = self.detect_colspecs(
                infer_nrows=infer_nrows, skiprows=skiprows
            )
        else:
            self.colspecs = colspecs

        if not isinstance(self.colspecs, (tuple, list)):
            raise TypeError(
                "column specifications must be a list or tuple, "
                f"input was a {type(colspecs).__name__}"
            )

        for colspec in self.colspecs:
            if not (
                isinstance(colspec, (tuple, list))
                and len(colspec) == 2
                and isinstance(colspec[0], (int, np.integer, type(None)))
                and isinstance(colspec[1], (int, np.integer, type(None)))
            ):
                raise TypeError(
                    "Each column specification must be "
                    "2 element tuple or list of integers"
                )

    def get_rows(self, infer_nrows: int, skiprows: set[int] | None = None) -> list[str]:
        """
        Read rows from self.f, skipping as specified.

        We distinguish buffer_rows (the first <= infer_nrows
        lines) from the rows returned to detect_colspecs
        because it is simpler to leave the skiprows logic
        in the other locations untouched than to adapt it
        to the rows already skipped here.

        Parameters
        ----------
        infer_nrows : int
            Number of rows to read from self.f, not counting
            rows that are skipped.
        skiprows: set, optional
            Indices of rows to skip.

        Returns
        -------
        detect_rows : list of str
            A list containing the rows to read.

        """
        if skiprows is None:
            skiprows = set()
        buffer_rows = []
        detect_rows = []
        for i, row in enumerate(self.f):
            if i not in skiprows:
                detect_rows.append(row)
            buffer_rows.append(row)
            if len(detect_rows) >= infer_nrows:
                break
        self.buffer = iter(buffer_rows)
        return detect_rows

    def detect_colspecs(
        self, infer_nrows: int = 100, skiprows: set[int] | None = None
    ) -> list[tuple[int, int]]:
        # Regex escape the delimiters
        delimiters = "".join([rf"\{x}" for x in self.delimiter])
        pattern = re.compile(f"([^{delimiters}]+)")
        rows = self.get_rows(infer_nrows, skiprows)
        if not rows:
            raise EmptyDataError("No rows from which to infer column width")
        max_len = max(map(len, rows))
        mask = np.zeros(max_len + 1, dtype=int)
        if self.comment is not None:
            rows = [row.partition(self.comment)[0] for row in rows]
        for row in rows:
            for m in pattern.finditer(row):
                mask[m.start() : m.end()] = 1
        shifted = np.roll(mask, 1)
        shifted[0] = 0
        edges = np.where((mask ^ shifted) == 1)[0]
        edge_pairs = list(zip(edges[::2], edges[1::2]))
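        # e.g. for the single row "aa  bb" the non-delimiter mask is
        # 1100110, so the detected colspecs are [(0, 2), (4, 6)]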
        return edge_pairs

    def __next__(self) -> list[str]:
        # Argument 1 to "next" has incompatible type "Union[IO[str],
        # ReadCsvBuffer[str]]"; expected "SupportsNext[str]"
        if self.buffer is not None:
            try:
                line = next(self.buffer)
            except StopIteration:
                self.buffer = None
                line = next(self.f)  # type: ignore[arg-type]
        else:
            line = next(self.f)  # type: ignore[arg-type]
        # Note: 'colspecs' is a sequence of half-open intervals.
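        # e.g. colspecs [(0, 2), (4, 6)] applied to "aa  bb\n" yield
        # ["aa", "bb"]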
        return [line[from_:to].strip(self.delimiter) for (from_, to) in self.colspecs]


class FixedWidthFieldParser(PythonParser):
    """
    Specialization that converts fixed-width fields into DataFrames.
    See PythonParser for details.
    """

    def __init__(self, f: ReadCsvBuffer[str], **kwds) -> None:
        # Support iterators, convert to a list.
        self.colspecs = kwds.pop("colspecs")
        self.infer_nrows = kwds.pop("infer_nrows")
        PythonParser.__init__(self, f, **kwds)

    def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]) -> FixedWidthReader:
        return FixedWidthReader(
            f,
            self.colspecs,
            self.delimiter,
            self.comment,
            self.skiprows,
            self.infer_nrows,
        )

    def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        """
        Returns the list of lines without the empty ones. With fixed-width
        fields, empty lines become arrays of empty strings.

        See PythonParser._remove_empty_lines.
        """
        return [
            line
            for line in lines
            if any(not isinstance(e, str) or e.strip() for e in line)
        ]


def count_empty_vals(vals) -> int:
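    # e.g. count_empty_vals(["", None, "x"]) == 2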
    return sum(1 for v in vals if v == "" or v is None)


def _validate_skipfooter_arg(skipfooter: int) -> int:
    """
    Validate the 'skipfooter' parameter.

    Checks whether 'skipfooter' is a non-negative integer.
    Raises a ValueError if that is not the case.

    Parameters
    ----------
    skipfooter : non-negative integer
        The number of rows to skip at the end of the file.

    Returns
    -------
    validated_skipfooter : non-negative integer
        The original input if the validation succeeds.

    Raises
    ------
    ValueError : 'skipfooter' was not a non-negative integer.
    """
    if not is_integer(skipfooter):
        raise ValueError("skipfooter must be an integer")

    if skipfooter < 0:
        raise ValueError("skipfooter cannot be negative")

    # Incompatible return value type (got "Union[int, integer[Any]]", expected "int")
    return skipfooter  # type: ignore[return-value]