Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/dask/dataframe/io/csv.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

340 statements  

1from __future__ import annotations 

2 

3import os 

4from collections.abc import Mapping 

5from io import BytesIO 

6from warnings import catch_warnings, simplefilter, warn 

7 

8try: 

9 import psutil 

10except ImportError: 

11 psutil = None # type: ignore 

12 

13import numpy as np 

14import pandas as pd 

15from fsspec.compression import compr 

16from fsspec.core import get_fs_token_paths 

17from fsspec.core import open as open_file 

18from fsspec.core import open_files 

19from fsspec.utils import infer_compression 

20from pandas.api.types import ( 

21 CategoricalDtype, 

22 is_datetime64_any_dtype, 

23 is_float_dtype, 

24 is_integer_dtype, 

25 is_object_dtype, 

26) 

27 

28from dask.base import tokenize 

29from dask.bytes import read_bytes 

30from dask.core import flatten 

31from dask.dataframe.backends import dataframe_creation_dispatch 

32from dask.dataframe.io.io import from_map 

33from dask.dataframe.io.utils import DataFrameIOFunction 

34from dask.dataframe.utils import clear_known_categories 

35from dask.delayed import delayed 

36from dask.utils import asciitable, parse_bytes 

37 

38 

39class CSVFunctionWrapper(DataFrameIOFunction): 

40 """ 

41 CSV Function-Wrapper Class 

42 Reads CSV data from disk to produce a partition (given a key). 

43 """ 

44 

45 def __init__( 

46 self, 

47 full_columns, 

48 columns, 

49 colname, 

50 head, 

51 header, 

52 reader, 

53 dtypes, 

54 enforce, 

55 kwargs, 

56 ): 

57 self.full_columns = full_columns 

58 self._columns = columns 

59 self.colname = colname 

60 self.head = head 

61 self.header = header 

62 self.reader = reader 

63 self.dtypes = dtypes 

64 self.enforce = enforce 

65 self.kwargs = kwargs 

66 

67 @property 

68 def columns(self): 

69 if self._columns is None: 

70 return self.full_columns 

71 if self.colname: 

72 return self._columns + [self.colname] 

73 return self._columns 

74 

75 def project_columns(self, columns): 

76 """Return a new CSVFunctionWrapper object with 

77 a sub-column projection. 

78 """ 

79 # Make sure columns is ordered correctly 

80 columns = [c for c in self.head.columns if c in columns] 

81 if columns == self.columns: 

82 return self 

83 if self.colname and self.colname not in columns: 

84 # when path-as-column is on, we must keep it at IO 

85 # whatever the selection 

86 head = self.head[columns + [self.colname]] 

87 else: 

88 head = self.head[columns] 

89 return CSVFunctionWrapper( 

90 self.full_columns, 

91 columns, 

92 self.colname, 

93 head, 

94 self.header, 

95 self.reader, 

96 {c: self.dtypes[c] for c in columns}, 

97 self.enforce, 

98 self.kwargs, 

99 ) 

100 

101 def __call__(self, part): 

102 # Part will be a 3-element tuple 

103 block, path, is_first, is_last = part 

104 

105 # Construct `path_info` 

106 if path is not None: 

107 path_info = ( 

108 self.colname, 

109 path, 

110 sorted(list(self.head[self.colname].cat.categories)), 

111 ) 

112 else: 

113 path_info = None 

114 

115 # Deal with arguments that are special 

116 # for the first block of each file 

117 write_header = False 

118 rest_kwargs = self.kwargs.copy() 

119 if not is_first: 

120 if rest_kwargs.get("names", None) is None: 

121 write_header = True 

122 rest_kwargs.pop("skiprows", None) 

123 if rest_kwargs.get("header", 0) is not None: 

124 rest_kwargs.pop("header", None) 

125 if not is_last: 

126 rest_kwargs.pop("skipfooter", None) 

127 

128 # Deal with column projection 

129 columns = self.full_columns 

130 project_after_read = False 

131 if self._columns is not None: 

132 if self.kwargs: 

133 # To be safe, if any kwargs are defined, avoid 

134 # changing `usecols` here. Instead, we can just 

135 # select columns after the read 

136 project_after_read = True 

137 else: 

138 columns = self._columns 

139 rest_kwargs["usecols"] = columns 

140 

141 # Call `pandas_read_text` 

142 df = pandas_read_text( 

143 self.reader, 

144 block, 

145 self.header, 

146 rest_kwargs, 

147 self.dtypes, 

148 columns, 

149 write_header, 

150 self.enforce, 

151 path_info, 

152 ) 

153 if project_after_read: 

154 return df[self.columns] 

155 return df 

156 

157 

158def pandas_read_text( 

159 reader, 

160 b, 

161 header, 

162 kwargs, 

163 dtypes=None, 

164 columns=None, 

165 write_header=True, 

166 enforce=False, 

167 path=None, 

168): 

169 """Convert a block of bytes to a Pandas DataFrame 

170 

171 Parameters 

172 ---------- 

173 reader : callable 

174 ``pd.read_csv`` or ``pd.read_table``. 

175 b : bytestring 

176 The content to be parsed with ``reader`` 

177 header : bytestring 

178 An optional header to prepend to ``b`` 

179 kwargs : dict 

180 A dictionary of keyword arguments to be passed to ``reader`` 

181 dtypes : dict 

182 dtypes to assign to columns 

183 path : tuple 

184 A tuple containing path column name, path to file, and an ordered list of paths. 

185 

186 See Also 

187 -------- 

188 dask.dataframe.csv.read_pandas_from_bytes 

189 """ 

190 bio = BytesIO() 

191 if write_header and not b.startswith(header.rstrip()): 

192 bio.write(header) 

193 bio.write(b) 

194 bio.seek(0) 

195 df = reader(bio, **kwargs) 

196 if dtypes: 

197 coerce_dtypes(df, dtypes) 

198 

199 if enforce and columns and (list(df.columns) != list(columns)): 

200 raise ValueError("Columns do not match", df.columns, columns) 

201 if path: 

202 colname, path, paths = path 

203 code = paths.index(path) 

204 df = df.assign( 

205 **{colname: pd.Categorical.from_codes(np.full(len(df), code), paths)} 

206 ) 

207 return df 

208 

209 

210def coerce_dtypes(df, dtypes): 

211 """Coerce dataframe to dtypes safely 

212 

213 Operates in place 

214 

215 Parameters 

216 ---------- 

217 df: Pandas DataFrame 

218 dtypes: dict like {'x': float} 

219 """ 

220 bad_dtypes = [] 

221 bad_dates = [] 

222 errors = [] 

223 for c in df.columns: 

224 if c in dtypes and df.dtypes[c] != dtypes[c]: 

225 actual = df.dtypes[c] 

226 desired = dtypes[c] 

227 if is_float_dtype(actual) and is_integer_dtype(desired): 

228 bad_dtypes.append((c, actual, desired)) 

229 elif is_object_dtype(actual) and is_datetime64_any_dtype(desired): 

230 # This can only occur when parse_dates is specified, but an 

231 # invalid date is encountered. Pandas then silently falls back 

232 # to object dtype. Since `object_array.astype(datetime)` will 

233 # silently overflow, error here and report. 

234 bad_dates.append(c) 

235 else: 

236 try: 

237 df[c] = df[c].astype(dtypes[c]) 

238 except Exception as e: 

239 bad_dtypes.append((c, actual, desired)) 

240 errors.append((c, e)) 

241 

242 if bad_dtypes: 

243 if errors: 

244 ex = "\n".join( 

245 f"- {c}\n {e!r}" for c, e in sorted(errors, key=lambda x: str(x[0])) 

246 ) 

247 exceptions = ( 

248 "The following columns also raised exceptions on " 

249 "conversion:\n\n%s\n\n" 

250 ) % ex 

251 extra = "" 

252 else: 

253 exceptions = "" 

254 # All mismatches are int->float, also suggest `assume_missing=True` 

255 extra = ( 

256 "\n\nAlternatively, provide `assume_missing=True` " 

257 "to interpret\n" 

258 "all unspecified integer columns as floats." 

259 ) 

260 

261 bad_dtypes = sorted(bad_dtypes, key=lambda x: str(x[0])) 

262 table = asciitable(["Column", "Found", "Expected"], bad_dtypes) 

263 dtype_kw = "dtype={%s}" % ",\n ".join( 

264 f"{k!r}: '{v}'" for (k, v, _) in bad_dtypes 

265 ) 

266 

267 dtype_msg = ( 

268 "{table}\n\n" 

269 "{exceptions}" 

270 "Usually this is due to dask's dtype inference failing, and\n" 

271 "*may* be fixed by specifying dtypes manually by adding:\n\n" 

272 "{dtype_kw}\n\n" 

273 "to the call to `read_csv`/`read_table`." 

274 "{extra}" 

275 ).format(table=table, exceptions=exceptions, dtype_kw=dtype_kw, extra=extra) 

276 else: 

277 dtype_msg = None 

278 

279 if bad_dates: 

280 also = " also " if bad_dtypes else " " 

281 cols = "\n".join("- %s" % c for c in bad_dates) 

282 date_msg = ( 

283 "The following columns{also}failed to properly parse as dates:\n\n" 

284 "{cols}\n\n" 

285 "This is usually due to an invalid value in that column. To\n" 

286 "diagnose and fix it's recommended to drop these columns from the\n" 

287 "`parse_dates` keyword, and manually convert them to dates later\n" 

288 "using `dd.to_datetime`." 

289 ).format(also=also, cols=cols) 

290 else: 

291 date_msg = None 

292 

293 if bad_dtypes or bad_dates: 

294 rule = "\n\n%s\n\n" % ("-" * 61) 

295 msg = "Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n\n%s" % ( 

296 rule.join(filter(None, [dtype_msg, date_msg])) 

297 ) 

298 raise ValueError(msg) 

299 

300 

301def text_blocks_to_pandas( 

302 reader, 

303 block_lists, 

304 header, 

305 head, 

306 kwargs, 

307 enforce=False, 

308 specified_dtypes=None, 

309 path=None, 

310 blocksize=None, 

311 urlpath=None, 

312): 

313 """Convert blocks of bytes to a dask.dataframe 

314 

315 This accepts a list of lists of values of bytes where each list corresponds 

316 to one file, and the value of bytes concatenate to comprise the entire 

317 file, in order. 

318 

319 Parameters 

320 ---------- 

321 reader : callable 

322 ``pd.read_csv`` or ``pd.read_table``. 

323 block_lists : list of lists of delayed values of bytes 

324 The lists of bytestrings where each list corresponds to one logical file 

325 header : bytestring 

326 The header, found at the front of the first file, to be prepended to 

327 all blocks 

328 head : pd.DataFrame 

329 An example Pandas DataFrame to be used for metadata. 

330 kwargs : dict 

331 Keyword arguments to pass down to ``reader`` 

332 path : tuple, optional 

333 A tuple containing column name for path and the path_converter if provided 

334 

335 Returns 

336 ------- 

337 A dask.dataframe 

338 """ 

339 dtypes = head.dtypes.to_dict() 

340 # dtypes contains only instances of CategoricalDtype, which causes issues 

341 # in coerce_dtypes for non-uniform categories across partitions. 

342 # We will modify `dtype` (which is inferred) to 

343 # 1. contain instances of CategoricalDtypes for user-provided types 

344 # 2. contain 'category' for data inferred types 

345 categoricals = head.select_dtypes(include=["category"]).columns 

346 

347 if isinstance(specified_dtypes, Mapping): 

348 known_categoricals = [ 

349 k 

350 for k in categoricals 

351 if isinstance(specified_dtypes.get(k), CategoricalDtype) 

352 and specified_dtypes.get(k).categories is not None 

353 ] 

354 unknown_categoricals = categoricals.difference(known_categoricals) 

355 else: 

356 unknown_categoricals = categoricals 

357 

358 # Fixup the dtypes 

359 for k in unknown_categoricals: 

360 dtypes[k] = "category" 

361 

362 columns = list(head.columns) 

363 

364 blocks = tuple(flatten(block_lists)) 

365 # Create mask of first blocks from nested block_lists 

366 is_first = tuple(block_mask(block_lists)) 

367 is_last = tuple(block_mask_last(block_lists)) 

368 

369 if path: 

370 colname, path_converter = path 

371 paths = [b[1].path for b in blocks] 

372 if path_converter: 

373 paths = [path_converter(p) for p in paths] 

374 head = head.assign( 

375 **{ 

376 colname: pd.Categorical.from_codes( 

377 np.zeros(len(head), dtype=int), set(paths) 

378 ) 

379 } 

380 ) 

381 path = (colname, paths) 

382 

383 if len(unknown_categoricals): 

384 head = clear_known_categories(head, cols=unknown_categoricals) 

385 

386 # Define parts 

387 parts = [] 

388 colname, paths = path or (None, None) 

389 for i in range(len(blocks)): 

390 parts.append([blocks[i], paths[i] if paths else None, is_first[i], is_last[i]]) 

391 

392 # Construct the output collection with from_map 

393 return from_map( 

394 CSVFunctionWrapper( 

395 columns, 

396 None, 

397 colname, 

398 head, 

399 header, 

400 reader, 

401 dtypes, 

402 enforce, 

403 kwargs, 

404 ), 

405 parts, 

406 meta=head, 

407 label="read-csv", 

408 token=tokenize(reader, urlpath, columns, enforce, head, blocksize), 

409 enforce_metadata=False, 

410 produces_tasks=True, 

411 ) 

412 

413 

414def block_mask(block_lists): 

415 """ 

416 Yields a flat iterable of booleans to mark the zeroth elements of the 

417 nested input ``block_lists`` in a flattened output. 

418 

419 >>> list(block_mask([[1, 2], [3, 4], [5]])) 

420 [True, False, True, False, True] 

421 """ 

422 for block in block_lists: 

423 if not block: 

424 continue 

425 yield True 

426 yield from (False for _ in block[1:]) 

427 

428 

429def block_mask_last(block_lists): 

430 """ 

431 Yields a flat iterable of booleans to mark the last element of the 

432 nested input ``block_lists`` in a flattened output. 

433 

434 >>> list(block_mask_last([[1, 2], [3, 4], [5]])) 

435 [False, True, False, True, True] 

436 """ 

437 for block in block_lists: 

438 if not block: 

439 continue 

440 yield from (False for _ in block[:-1]) 

441 yield True 

442 

443 

444def auto_blocksize(total_memory, cpu_count): 

445 memory_factor = 10 

446 blocksize = int(total_memory // cpu_count / memory_factor) 

447 return min(blocksize, int(64e6)) 

448 

449 

450def _infer_block_size(): 

451 default = 2**25 

452 if psutil is not None: 

453 with catch_warnings(): 

454 simplefilter("ignore", RuntimeWarning) 

455 mem = psutil.virtual_memory().total 

456 cpu = psutil.cpu_count() 

457 

458 if mem and cpu: 

459 return auto_blocksize(mem, cpu) 

460 

461 return default 

462 

463 

464# guess blocksize if psutil is installed or use acceptable default one if not 

465AUTO_BLOCKSIZE = _infer_block_size() 

466 

467 

468def read_pandas( 

469 reader, 

470 urlpath, 

471 blocksize="default", 

472 lineterminator=None, 

473 compression="infer", 

474 sample=256000, 

475 sample_rows=10, 

476 enforce=False, 

477 assume_missing=False, 

478 storage_options=None, 

479 include_path_column=False, 

480 **kwargs, 

481): 

482 reader_name = reader.__name__ 

483 if lineterminator is not None and len(lineterminator) == 1: 

484 kwargs["lineterminator"] = lineterminator 

485 else: 

486 lineterminator = "\n" 

487 if "encoding" in kwargs: 

488 b_lineterminator = lineterminator.encode(kwargs["encoding"]) 

489 empty_blob = "".encode(kwargs["encoding"]) 

490 if empty_blob: 

491 # This encoding starts with a Byte Order Mark (BOM), so strip that from the 

492 # start of the line terminator, since this value is not a full file. 

493 b_lineterminator = b_lineterminator[len(empty_blob) :] 

494 else: 

495 b_lineterminator = lineterminator.encode() 

496 if include_path_column and isinstance(include_path_column, bool): 

497 include_path_column = "path" 

498 if "index" in kwargs or ( 

499 "index_col" in kwargs and kwargs.get("index_col") is not False 

500 ): 

501 raise ValueError( 

502 "Keywords 'index' and 'index_col' not supported, except for " 

503 "'index_col=False'. Use dd.{reader_name}(...).set_index('my-index') instead" 

504 ) 

505 for kw in ["iterator", "chunksize"]: 

506 if kw in kwargs: 

507 raise ValueError(f"{kw} not supported for dd.{reader_name}") 

508 if kwargs.get("nrows", None): 

509 raise ValueError( 

510 "The 'nrows' keyword is not supported by " 

511 "`dd.{0}`. To achieve the same behavior, it's " 

512 "recommended to use `dd.{0}(...)." 

513 "head(n=nrows)`".format(reader_name) 

514 ) 

515 if isinstance(kwargs.get("skiprows"), int): 

516 lastskiprow = firstrow = kwargs.get("skiprows") 

517 elif kwargs.get("skiprows") is None: 

518 lastskiprow = firstrow = 0 

519 else: 

520 # When skiprows is a list, we expect more than max(skiprows) to 

521 # be included in the sample. This means that [0,2] will work well, 

522 # but [0, 440] might not work. 

523 skiprows = set(kwargs.get("skiprows")) 

524 lastskiprow = max(skiprows) 

525 # find the firstrow that is not skipped, for use as header 

526 firstrow = min(set(range(len(skiprows) + 1)) - set(skiprows)) 

527 if isinstance(kwargs.get("header"), list): 

528 raise TypeError(f"List of header rows not supported for dd.{reader_name}") 

529 if isinstance(kwargs.get("converters"), dict) and include_path_column: 

530 path_converter = kwargs.get("converters").get(include_path_column, None) 

531 else: 

532 path_converter = None 

533 

534 # If compression is "infer", inspect the (first) path suffix and 

535 # set the proper compression option if the suffix is recognized. 

536 if compression == "infer": 

537 # Translate the input urlpath to a simple path list 

538 paths = get_fs_token_paths(urlpath, mode="rb", storage_options=storage_options)[ 

539 2 

540 ] 

541 

542 # Check for at least one valid path 

543 if len(paths) == 0: 

544 raise OSError(f"{urlpath} resolved to no files") 

545 

546 # Infer compression from first path 

547 compression = infer_compression(paths[0]) 

548 

549 if blocksize == "default": 

550 blocksize = AUTO_BLOCKSIZE 

551 if isinstance(blocksize, str): 

552 blocksize = parse_bytes(blocksize) 

553 if blocksize and compression: 

554 # NONE of the compressions should use chunking 

555 warn( 

556 "Warning %s compression does not support breaking apart files\n" 

557 "Please ensure that each individual file can fit in memory and\n" 

558 "use the keyword ``blocksize=None to remove this message``\n" 

559 "Setting ``blocksize=None``" % compression 

560 ) 

561 blocksize = None 

562 if compression not in compr: 

563 raise NotImplementedError("Compression format %s not installed" % compression) 

564 if blocksize and sample and blocksize < sample and lastskiprow != 0: 

565 warn( 

566 "Unexpected behavior can result from passing skiprows when\n" 

567 "blocksize is smaller than sample size.\n" 

568 "Setting ``sample=blocksize``" 

569 ) 

570 sample = blocksize 

571 b_out = read_bytes( 

572 urlpath, 

573 delimiter=b_lineterminator, 

574 blocksize=blocksize, 

575 sample=sample, 

576 compression=compression, 

577 include_path=include_path_column, 

578 **(storage_options or {}), 

579 ) 

580 

581 if include_path_column: 

582 b_sample, values, paths = b_out 

583 path = (include_path_column, path_converter) 

584 else: 

585 b_sample, values = b_out 

586 path = None 

587 

588 if not isinstance(values[0], (tuple, list)): 

589 values = [values] 

590 # If we have not sampled, then use the first row of the first values 

591 # as a representative sample. 

592 if b_sample is False and len(values[0]): 

593 b_sample = values[0][0].compute() 

594 

595 # Get header row, and check that sample is long enough. If the file 

596 # contains a header row, we need at least 2 nonempty rows + the number of 

597 # rows to skip. 

598 names = kwargs.get("names", None) 

599 header = kwargs.get("header", "infer" if names is None else None) 

600 need = 1 if header is None else 2 

601 if isinstance(header, int): 

602 firstrow += header 

603 if kwargs.get("comment"): 

604 # if comment is provided, step through lines of b_sample and strip out comments 

605 parts = [] 

606 for part in b_sample.split(b_lineterminator): 

607 split_comment = part.decode().split(kwargs.get("comment")) 

608 if len(split_comment) > 1: 

609 # if line starts with comment, don't include that line in parts. 

610 if len(split_comment[0]) > 0: 

611 parts.append(split_comment[0].strip().encode()) 

612 else: 

613 parts.append(part) 

614 if len(parts) > need: 

615 break 

616 else: 

617 parts = b_sample.split( 

618 b_lineterminator, max(lastskiprow + need, firstrow + need) 

619 ) 

620 

621 # If the last partition is empty, don't count it 

622 nparts = 0 if not parts else len(parts) - int(not parts[-1]) 

623 

624 if sample is not False and nparts < lastskiprow + need and len(b_sample) >= sample: 

625 raise ValueError( 

626 "Sample is not large enough to include at least one " 

627 "row of data. Please increase the number of bytes " 

628 "in `sample` in the call to `read_csv`/`read_table`" 

629 ) 

630 

631 header = b"" if header is None else parts[firstrow] + b_lineterminator 

632 

633 # Use sample to infer dtypes and check for presence of include_path_column 

634 head_kwargs = kwargs.copy() 

635 head_kwargs.pop("skipfooter", None) 

636 if head_kwargs.get("engine") == "pyarrow": 

637 # Use c engine to infer since Arrow engine does not support nrows 

638 head_kwargs["engine"] = "c" 

639 try: 

640 head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs) 

641 except pd.errors.ParserError as e: 

642 if "EOF" in str(e): 

643 raise ValueError( 

644 "EOF encountered while reading header. \n" 

645 "Pass argument `sample_rows` and make sure the value of `sample` " 

646 "is large enough to accommodate that many rows of data" 

647 ) from e 

648 raise 

649 if include_path_column and (include_path_column in head.columns): 

650 raise ValueError( 

651 "Files already contain the column name: %s, so the " 

652 "path column cannot use this name. Please set " 

653 "`include_path_column` to a unique name." % include_path_column 

654 ) 

655 

656 specified_dtypes = kwargs.get("dtype", {}) 

657 if specified_dtypes is None: 

658 specified_dtypes = {} 

659 # If specified_dtypes is a single type, then all columns were specified 

660 if assume_missing and isinstance(specified_dtypes, dict): 

661 # Convert all non-specified integer columns to floats 

662 for c in head.columns: 

663 if is_integer_dtype(head[c].dtype) and c not in specified_dtypes: 

664 head[c] = head[c].astype(float) 

665 

666 values = [[list(dsk.dask.values()) for dsk in block] for block in values] 

667 

668 return text_blocks_to_pandas( 

669 reader, 

670 values, 

671 header, 

672 head, 

673 kwargs, 

674 enforce=enforce, 

675 specified_dtypes=specified_dtypes, 

676 path=path, 

677 blocksize=blocksize, 

678 urlpath=urlpath, 

679 ) 

680 

681 

682READ_DOC_TEMPLATE = """ 

683Read {file_type} files into a Dask.DataFrame 

684 

685This parallelizes the :func:`pandas.{reader}` function in the following ways: 

686 

687- It supports loading many files at once using globstrings: 

688 

689 >>> df = dd.{reader}('myfiles.*.csv') # doctest: +SKIP 

690 

691- In some cases it can break up large files: 

692 

693 >>> df = dd.{reader}('largefile.csv', blocksize=25e6) # 25MB chunks # doctest: +SKIP 

694 

695- It can read CSV files from external resources (e.g. S3, HDFS) by 

696 providing a URL: 

697 

698 >>> df = dd.{reader}('s3://bucket/myfiles.*.csv') # doctest: +SKIP 

699 >>> df = dd.{reader}('hdfs:///myfiles.*.csv') # doctest: +SKIP 

700 >>> df = dd.{reader}('hdfs://namenode.example.com/myfiles.*.csv') # doctest: +SKIP 

701 

702Internally ``dd.{reader}`` uses :func:`pandas.{reader}` and supports many of the 

703same keyword arguments with the same performance guarantees. See the docstring 

704for :func:`pandas.{reader}` for more information on available keyword arguments. 

705 

706Parameters 

707---------- 

708urlpath : string or list 

709 Absolute or relative filepath(s). Prefix with a protocol like ``s3://`` 

710 to read from alternative filesystems. To read from multiple files you 

711 can pass a globstring or a list of paths, with the caveat that they 

712 must all have the same protocol. 

713blocksize : str, int or None, optional 

714 Number of bytes by which to cut up larger files. Default value is computed 

715 based on available physical memory and the number of cores, up to a maximum 

716 of 64MB. Can be a number like ``64000000`` or a string like ``"64MB"``. If 

717 ``None``, a single block is used for each file. 

718sample : int, optional 

719 Number of bytes to use when determining dtypes 

720assume_missing : bool, optional 

721 If True, all integer columns that aren't specified in ``dtype`` are assumed 

722 to contain missing values, and are converted to floats. Default is False. 

723storage_options : dict, optional 

724 Extra options that make sense for a particular storage connection, e.g. 

725 host, port, username, password, etc. 

726include_path_column : bool or str, optional 

727 Whether or not to include the path to each particular file. If True a new 

728 column is added to the dataframe called ``path``. If str, sets new column 

729 name. Default is False. 

730**kwargs 

731 Extra keyword arguments to forward to :func:`pandas.{reader}`. 

732 

733Notes 

734----- 

735Dask dataframe tries to infer the ``dtype`` of each column by reading a sample 

736from the start of the file (or of the first file if it's a glob). Usually this 

737works fine, but if the ``dtype`` is different later in the file (or in other 

738files) this can cause issues. For example, if all the rows in the sample had 

739integer dtypes, but later on there was a ``NaN``, then this would error at 

740compute time. To fix this, you have a few options: 

741 

742- Provide explicit dtypes for the offending columns using the ``dtype`` 

743 keyword. This is the recommended solution. 

744 

745- Use the ``assume_missing`` keyword to assume that all columns inferred as 

746 integers contain missing values, and convert them to floats. 

747 

748- Increase the size of the sample using the ``sample`` keyword. 

749 

750It should also be noted that this function may fail if a {file_type} file 

751includes quoted strings that contain the line terminator. To get around this 

752you can specify ``blocksize=None`` to not split files into multiple partitions, 

753at the cost of reduced parallelism. 

754""" 

755 

756 

757def make_reader(reader, reader_name, file_type): 

758 def read( 

759 urlpath, 

760 blocksize="default", 

761 lineterminator=None, 

762 compression="infer", 

763 sample=256000, 

764 sample_rows=10, 

765 enforce=False, 

766 assume_missing=False, 

767 storage_options=None, 

768 include_path_column=False, 

769 **kwargs, 

770 ): 

771 return read_pandas( 

772 reader, 

773 urlpath, 

774 blocksize=blocksize, 

775 lineterminator=lineterminator, 

776 compression=compression, 

777 sample=sample, 

778 sample_rows=sample_rows, 

779 enforce=enforce, 

780 assume_missing=assume_missing, 

781 storage_options=storage_options, 

782 include_path_column=include_path_column, 

783 **kwargs, 

784 ) 

785 

786 read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name, file_type=file_type) 

787 read.__name__ = reader_name 

788 return read 

789 

790 

791read_csv = dataframe_creation_dispatch.register_inplace( 

792 backend="pandas", 

793 name="read_csv", 

794)(make_reader(pd.read_csv, "read_csv", "CSV")) 

795 

796 

797read_table = make_reader(pd.read_table, "read_table", "delimited") 

798read_fwf = make_reader(pd.read_fwf, "read_fwf", "fixed-width") 

799 

800 

801def _write_csv(df, fil, *, depend_on=None, **kwargs): 

802 with fil as f: 

803 df.to_csv(f, **kwargs) 

804 return os.path.normpath(fil.path) 

805 

806 

807def to_csv( 

808 df, 

809 filename, 

810 single_file=False, 

811 encoding="utf-8", 

812 mode="wt", 

813 name_function=None, 

814 compression=None, 

815 compute=True, 

816 scheduler=None, 

817 storage_options=None, 

818 header_first_partition_only=None, 

819 compute_kwargs=None, 

820 **kwargs, 

821): 

822 """ 

823 Store Dask DataFrame to CSV files 

824 

825 One filename per partition will be created. You can specify the 

826 filenames in a variety of ways. 

827 

828 Use a globstring:: 

829 

830 >>> df.to_csv('/path/to/data/export-*.csv') # doctest: +SKIP 

831 

832 The * will be replaced by the increasing sequence 0, 1, 2, ... 

833 

834 :: 

835 

836 /path/to/data/export-0.csv 

837 /path/to/data/export-1.csv 

838 

839 Use a globstring and a ``name_function=`` keyword argument. The 

840 name_function function should expect an integer and produce a string. 

841 Strings produced by name_function must preserve the order of their 

842 respective partition indices. 

843 

844 >>> from datetime import date, timedelta 

845 >>> def name(i): 

846 ... return str(date(2015, 1, 1) + i * timedelta(days=1)) 

847 

848 >>> name(0) 

849 '2015-01-01' 

850 >>> name(15) 

851 '2015-01-16' 

852 

853 >>> df.to_csv('/path/to/data/export-*.csv', name_function=name) # doctest: +SKIP 

854 

855 :: 

856 

857 /path/to/data/export-2015-01-01.csv 

858 /path/to/data/export-2015-01-02.csv 

859 ... 

860 

861 You can also provide an explicit list of paths:: 

862 

863 >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...] # doctest: +SKIP 

864 >>> df.to_csv(paths) # doctest: +SKIP 

865 

866 You can also provide a directory name: 

867 

868 >>> df.to_csv('/path/to/data') # doctest: +SKIP 

869 

870 The files will be numbered 0, 1, 2, (and so on) suffixed with '.part': 

871 

872 :: 

873 

874 /path/to/data/0.part 

875 /path/to/data/1.part 

876 

877 Parameters 

878 ---------- 

879 df : dask.DataFrame 

880 Data to save 

881 filename : string or list 

882 Absolute or relative filepath(s). Prefix with a protocol like ``s3://`` 

883 to save to remote filesystems. 

884 single_file : bool, default False 

885 Whether to save everything into a single CSV file. Under the 

886 single file mode, each partition is appended at the end of the 

887 specified CSV file. 

888 encoding : string, default 'utf-8' 

889 A string representing the encoding to use in the output file. 

890 mode : str, default 'w' 

891 Python file mode. The default is 'w' (or 'wt'), for writing 

892 a new file or overwriting an existing file in text mode. 'a' 

893 (or 'at') will append to an existing file in text mode or 

894 create a new file if it does not already exist. See :py:func:`open`. 

895 name_function : callable, default None 

896 Function accepting an integer (partition index) and producing a 

897 string to replace the asterisk in the given filename globstring. 

898 Should preserve the lexicographic order of partitions. Not 

899 supported when ``single_file`` is True. 

900 compression : string, optional 

901 A string representing the compression to use in the output file, 

902 allowed values are 'gzip', 'bz2', 'xz', 

903 only used when the first argument is a filename. 

904 compute : bool, default True 

905 If True, immediately executes. If False, returns a set of delayed 

906 objects, which can be computed at a later time. 

907 storage_options : dict 

908 Parameters passed on to the backend filesystem class. 

909 header_first_partition_only : bool, default None 

910 If set to True, only write the header row in the first output 

911 file. By default, headers are written to all partitions under 

912 the multiple file mode (``single_file`` is False) and written 

913 only once under the single file mode (``single_file`` is True). 

914 It must be True under the single file mode. 

915 compute_kwargs : dict, optional 

916 Options to be passed in to the compute method 

917 kwargs : dict, optional 

918 Additional parameters to pass to :meth:`pandas.DataFrame.to_csv`. 

919 

920 Returns 

921 ------- 

922 The names of the file written if they were computed right away. 

923 If not, the delayed tasks associated with writing the files. 

924 

925 Raises 

926 ------ 

927 ValueError 

928 If ``header_first_partition_only`` is set to False or 

929 ``name_function`` is specified when ``single_file`` is True. 

930 

931 See Also 

932 -------- 

933 fsspec.open_files 

934 """ 

935 if single_file and name_function is not None: 

936 raise ValueError("name_function is not supported under the single file mode") 

937 if header_first_partition_only is None: 

938 header_first_partition_only = single_file 

939 elif not header_first_partition_only and single_file: 

940 raise ValueError( 

941 "header_first_partition_only cannot be False in the single file mode." 

942 ) 

943 file_options = dict( 

944 compression=compression, 

945 encoding=encoding, 

946 newline="", 

947 **(storage_options or {}), 

948 ) 

949 to_csv_chunk = delayed(_write_csv, pure=False) 

950 dfs = df.to_delayed() 

951 if single_file: 

952 first_file = open_file(filename, mode=mode, **file_options) 

953 value = to_csv_chunk(dfs[0], first_file, **kwargs) 

954 append_mode = mode if "a" in mode else mode + "a" 

955 append_mode = append_mode.replace("w", "").replace("x", "") 

956 append_file = open_file(filename, mode=append_mode, **file_options) 

957 kwargs["header"] = False 

958 for d in dfs[1:]: 

959 value = to_csv_chunk(d, append_file, depend_on=value, **kwargs) 

960 values = [value] 

961 files = [first_file] 

962 else: 

963 files = open_files( 

964 filename, 

965 mode=mode, 

966 name_function=name_function, 

967 num=df.npartitions, 

968 **file_options, 

969 ) 

970 values = [to_csv_chunk(dfs[0], files[0], **kwargs)] 

971 if header_first_partition_only: 

972 kwargs["header"] = False 

973 values.extend( 

974 [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs[1:], files[1:])] 

975 ) 

976 if compute: 

977 if compute_kwargs is None: 

978 compute_kwargs = dict() 

979 

980 if scheduler is not None: 

981 warn( 

982 "The 'scheduler' keyword argument for `to_csv()` is deprecated and" 

983 "will be removed in a future version. " 

984 "Please use the `compute_kwargs` argument instead. " 

985 f"For example, df.to_csv(..., compute_kwargs={{scheduler: {scheduler}}})", 

986 FutureWarning, 

987 ) 

988 

989 if ( 

990 scheduler is not None 

991 and compute_kwargs.get("scheduler") is not None 

992 and compute_kwargs.get("scheduler") != scheduler 

993 ): 

994 raise ValueError( 

995 f"Differing values for 'scheduler' have been passed in.\n" 

996 f"scheduler argument: {scheduler}\n" 

997 f"via compute_kwargs: {compute_kwargs.get('scheduler')}" 

998 ) 

999 

1000 if scheduler is not None and compute_kwargs.get("scheduler") is None: 

1001 compute_kwargs["scheduler"] = scheduler 

1002 

1003 import dask 

1004 

1005 return list(dask.compute(*values, **compute_kwargs)) 

1006 else: 

1007 return values 

1008 

1009 

1010from dask.dataframe.core import _Frame 

1011 

1012_Frame.to_csv.__doc__ = to_csv.__doc__