from __future__ import annotations

import os
from collections.abc import Mapping
from io import BytesIO
from warnings import catch_warnings, simplefilter, warn

try:
    import psutil
except ImportError:
    psutil = None  # type: ignore

import numpy as np
import pandas as pd
from fsspec.compression import compr
from fsspec.core import get_fs_token_paths
from fsspec.core import open as open_file
from fsspec.core import open_files
from fsspec.utils import infer_compression
from pandas.api.types import (
    CategoricalDtype,
    is_datetime64_any_dtype,
    is_float_dtype,
    is_integer_dtype,
    is_object_dtype,
)

from dask.base import tokenize
from dask.bytes import read_bytes
from dask.core import flatten
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.io.io import from_map
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.dataframe.utils import clear_known_categories
from dask.delayed import delayed
from dask.utils import asciitable, parse_bytes

class CSVFunctionWrapper(DataFrameIOFunction):
    """
    CSV Function-Wrapper Class
    Reads CSV data from disk to produce a partition (given a key).
    """

    def __init__(
        self,
        full_columns,
        columns,
        colname,
        head,
        header,
        reader,
        dtypes,
        enforce,
        kwargs,
    ):
        self.full_columns = full_columns
        self._columns = columns
        self.colname = colname
        self.head = head
        self.header = header
        self.reader = reader
        self.dtypes = dtypes
        self.enforce = enforce
        self.kwargs = kwargs

    @property
    def columns(self):
        if self._columns is None:
            return self.full_columns
        if self.colname:
            return self._columns + [self.colname]
        return self._columns

    def project_columns(self, columns):
        """Return a new CSVFunctionWrapper object with
        a sub-column projection.
        """
        # Make sure columns is ordered correctly
        columns = [c for c in self.head.columns if c in columns]
        if columns == self.columns:
            return self
        if self.colname and self.colname not in columns:
            # When path-as-column is enabled, the path column must be
            # kept at IO time regardless of the selection
            head = self.head[columns + [self.colname]]
        else:
            head = self.head[columns]
        return CSVFunctionWrapper(
            self.full_columns,
            columns,
            self.colname,
            head,
            self.header,
            self.reader,
            {c: self.dtypes[c] for c in columns},
            self.enforce,
            self.kwargs,
        )

    def __call__(self, part):
        # `part` is a 4-element tuple: (block, path, is_first, is_last)
        block, path, is_first, is_last = part

        # Construct `path_info`
        if path is not None:
            path_info = (
                self.colname,
                path,
                sorted(list(self.head[self.colname].cat.categories)),
            )
        else:
            path_info = None

        # Deal with arguments that are special
        # for the first block of each file
        write_header = False
        rest_kwargs = self.kwargs.copy()
        if not is_first:
            if rest_kwargs.get("names", None) is None:
                write_header = True
            rest_kwargs.pop("skiprows", None)
            if rest_kwargs.get("header", 0) is not None:
                rest_kwargs.pop("header", None)
        if not is_last:
            rest_kwargs.pop("skipfooter", None)

        # Deal with column projection
        columns = self.full_columns
        project_after_read = False
        if self._columns is not None:
            if self.kwargs:
                # To be safe, if any kwargs are defined, avoid
                # changing `usecols` here. Instead, we can just
                # select columns after the read
                project_after_read = True
            else:
                columns = self._columns
                rest_kwargs["usecols"] = columns

        # Call `pandas_read_text`
        df = pandas_read_text(
            self.reader,
            block,
            self.header,
            rest_kwargs,
            self.dtypes,
            columns,
            write_header,
            self.enforce,
            path_info,
        )
        if project_after_read:
            return df[self.columns]
        return df
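
# Hedged sketch of how the wrapper above is invoked (the literal bytes and the
# ``head`` sample DataFrame are made up for illustration): each task receives
# one ``(block, path, is_first, is_last)`` tuple and returns a pandas partition.
#
#     fn = CSVFunctionWrapper(["a", "b"], None, None, head, b"a,b\n",
#                             pd.read_csv, head.dtypes.to_dict(), False, {})
#     fn((b"a,b\n1,2\n", None, True, True))   # first block keeps its own header
#     fn((b"3,4\n", None, False, True))       # later blocks get ``header`` prepended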


def pandas_read_text(
    reader,
    b,
    header,
    kwargs,
    dtypes=None,
    columns=None,
    write_header=True,
    enforce=False,
    path=None,
):
    """Convert a block of bytes to a Pandas DataFrame

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    b : bytestring
        The content to be parsed with ``reader``
    header : bytestring
        An optional header to prepend to ``b``
    kwargs : dict
        A dictionary of keyword arguments to be passed to ``reader``
    dtypes : dict
        dtypes to assign to columns
    path : tuple
        A tuple containing path column name, path to file, and an ordered list of paths.

    See Also
    --------
    dask.dataframe.csv.read_pandas_from_bytes
    """
    bio = BytesIO()
    if write_header and not b.startswith(header.rstrip()):
        bio.write(header)
    bio.write(b)
    bio.seek(0)
    df = reader(bio, **kwargs)
    if dtypes:
        coerce_dtypes(df, dtypes)

    if enforce and columns and (list(df.columns) != list(columns)):
        raise ValueError("Columns do not match", df.columns, columns)
    if path:
        colname, path, paths = path
        code = paths.index(path)
        df = df.assign(
            **{colname: pd.Categorical.from_codes(np.full(len(df), code), paths)}
        )
    return df
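
# A minimal illustration of ``pandas_read_text`` (hedged sketch; the bytes and
# header are invented). Because the block does not start with the header, the
# header is prepended before parsing:
#
#     pandas_read_text(pd.read_csv, b"1,2\n3,4\n", b"a,b\n", {})
#     #    a  b
#     # 0  1  2
#     # 1  3  4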


def coerce_dtypes(df, dtypes):
    """Coerce dataframe to dtypes safely

    Operates in place

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}
    """
    bad_dtypes = []
    bad_dates = []
    errors = []
    for c in df.columns:
        if c in dtypes and df.dtypes[c] != dtypes[c]:
            actual = df.dtypes[c]
            desired = dtypes[c]
            if is_float_dtype(actual) and is_integer_dtype(desired):
                bad_dtypes.append((c, actual, desired))
            elif is_object_dtype(actual) and is_datetime64_any_dtype(desired):
                # This can only occur when parse_dates is specified, but an
                # invalid date is encountered. Pandas then silently falls back
                # to object dtype. Since `object_array.astype(datetime)` will
                # silently overflow, error here and report.
                bad_dates.append(c)
            else:
                try:
                    df[c] = df[c].astype(dtypes[c])
                except Exception as e:
                    bad_dtypes.append((c, actual, desired))
                    errors.append((c, e))

    if bad_dtypes:
        if errors:
            ex = "\n".join(
                f"- {c}\n  {e!r}" for c, e in sorted(errors, key=lambda x: str(x[0]))
            )
            exceptions = (
                "The following columns also raised exceptions on "
                "conversion:\n\n%s\n\n"
            ) % ex
            extra = ""
        else:
            exceptions = ""
            # All mismatches are int->float, also suggest `assume_missing=True`
            extra = (
                "\n\nAlternatively, provide `assume_missing=True` "
                "to interpret\n"
                "all unspecified integer columns as floats."
            )

        bad_dtypes = sorted(bad_dtypes, key=lambda x: str(x[0]))
        table = asciitable(["Column", "Found", "Expected"], bad_dtypes)
        dtype_kw = "dtype={%s}" % ",\n       ".join(
            f"{k!r}: '{v}'" for (k, v, _) in bad_dtypes
        )

        dtype_msg = (
            "{table}\n\n"
            "{exceptions}"
            "Usually this is due to dask's dtype inference failing, and\n"
            "*may* be fixed by specifying dtypes manually by adding:\n\n"
            "{dtype_kw}\n\n"
            "to the call to `read_csv`/`read_table`."
            "{extra}"
        ).format(table=table, exceptions=exceptions, dtype_kw=dtype_kw, extra=extra)
    else:
        dtype_msg = None

    if bad_dates:
        also = " also " if bad_dtypes else " "
        cols = "\n".join("- %s" % c for c in bad_dates)
        date_msg = (
            "The following columns{also}failed to properly parse as dates:\n\n"
            "{cols}\n\n"
            "This is usually due to an invalid value in that column. To\n"
            "diagnose and fix it, it's recommended to drop these columns from the\n"
            "`parse_dates` keyword, and manually convert them to dates later\n"
            "using `dd.to_datetime`."
        ).format(also=also, cols=cols)
    else:
        date_msg = None

    if bad_dtypes or bad_dates:
        rule = "\n\n%s\n\n" % ("-" * 61)
        msg = "Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n\n%s" % (
            rule.join(filter(None, [dtype_msg, date_msg]))
        )
        raise ValueError(msg)
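
# Hedged sketch of the behaviour above (the frame and dtypes are invented): a
# column inferred as float because of a missing value, but declared as integer,
# is reported rather than silently cast.
#
#     df = pd.DataFrame({"x": [1.0, None]})        # float64 because of the NaN
#     coerce_dtypes(df, {"x": np.dtype("int64")})  # raises ValueError suggesting
#                                                  # an explicit dtype or assume_missing=True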


def text_blocks_to_pandas(
    reader,
    block_lists,
    header,
    head,
    kwargs,
    enforce=False,
    specified_dtypes=None,
    path=None,
    blocksize=None,
    urlpath=None,
):
    """Convert blocks of bytes to a dask.dataframe

    This accepts a list of lists of values of bytes where each list corresponds
    to one file, and the values of bytes concatenate to comprise the entire
    file, in order.

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    block_lists : list of lists of delayed values of bytes
        The lists of bytestrings where each list corresponds to one logical file
    header : bytestring
        The header, found at the front of the first file, to be prepended to
        all blocks
    head : pd.DataFrame
        An example Pandas DataFrame to be used for metadata.
    kwargs : dict
        Keyword arguments to pass down to ``reader``
    path : tuple, optional
        A tuple containing column name for path and the path_converter if provided

    Returns
    -------
    A dask.dataframe
    """
    dtypes = head.dtypes.to_dict()
    # dtypes contains only instances of CategoricalDtype, which causes issues
    # in coerce_dtypes for non-uniform categories across partitions.
    # We will modify `dtype` (which is inferred) to
    # 1. contain instances of CategoricalDtypes for user-provided types
    # 2. contain 'category' for data inferred types
    categoricals = head.select_dtypes(include=["category"]).columns

    if isinstance(specified_dtypes, Mapping):
        known_categoricals = [
            k
            for k in categoricals
            if isinstance(specified_dtypes.get(k), CategoricalDtype)
            and specified_dtypes.get(k).categories is not None
        ]
        unknown_categoricals = categoricals.difference(known_categoricals)
    else:
        unknown_categoricals = categoricals

    # Fixup the dtypes
    for k in unknown_categoricals:
        dtypes[k] = "category"

    columns = list(head.columns)

    blocks = tuple(flatten(block_lists))
    # Create mask of first blocks from nested block_lists
    is_first = tuple(block_mask(block_lists))
    is_last = tuple(block_mask_last(block_lists))

    if path:
        colname, path_converter = path
        paths = [b[1].path for b in blocks]
        if path_converter:
            paths = [path_converter(p) for p in paths]
        head = head.assign(
            **{
                colname: pd.Categorical.from_codes(
                    np.zeros(len(head), dtype=int), set(paths)
                )
            }
        )
        path = (colname, paths)

    if len(unknown_categoricals):
        head = clear_known_categories(head, cols=unknown_categoricals)

    # Define parts
    parts = []
    colname, paths = path or (None, None)
    for i in range(len(blocks)):
        parts.append([blocks[i], paths[i] if paths else None, is_first[i], is_last[i]])

    # Construct the output collection with from_map
    return from_map(
        CSVFunctionWrapper(
            columns,
            None,
            colname,
            head,
            header,
            reader,
            dtypes,
            enforce,
            kwargs,
        ),
        parts,
        meta=head,
        label="read-csv",
        token=tokenize(reader, urlpath, columns, enforce, head, blocksize),
        enforce_metadata=False,
        produces_tasks=True,
    )
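
# Each entry of ``parts`` above has the form ``[block, path, is_first, is_last]``.
# For two files split into two blocks each, the list would look roughly like the
# sketch below (block objects and file names are placeholders):
#
#     [[<block-0a>, "f0.csv", True, False],
#      [<block-0b>, "f0.csv", False, True],
#      [<block-1a>, "f1.csv", True, False],
#      [<block-1b>, "f1.csv", False, True]]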


def block_mask(block_lists):
    """
    Yields a flat iterable of booleans to mark the zeroth elements of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask([[1, 2], [3, 4], [5]]))
    [True, False, True, False, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield True
        yield from (False for _ in block[1:])


def block_mask_last(block_lists):
    """
    Yields a flat iterable of booleans to mark the last element of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask_last([[1, 2], [3, 4], [5]]))
    [False, True, False, True, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield from (False for _ in block[:-1])
        yield True


def auto_blocksize(total_memory, cpu_count):
    memory_factor = 10
    blocksize = int(total_memory // cpu_count / memory_factor)
    return min(blocksize, int(64e6))
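
# Worked example: 16 GB of memory and 8 cores gives 16e9 // 8 / 10 = 200 MB,
# which is then capped at the 64 MB ceiling; a 2 GB / 4 core machine gets
# 2e9 // 4 / 10 = 50 MB blocks, below the cap.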


def _infer_block_size():
    default = 2**25
    if psutil is not None:
        with catch_warnings():
            simplefilter("ignore", RuntimeWarning)
            mem = psutil.virtual_memory().total
            cpu = psutil.cpu_count()

        if mem and cpu:
            return auto_blocksize(mem, cpu)

    return default


# Guess the blocksize if psutil is installed, or fall back to an acceptable default
AUTO_BLOCKSIZE = _infer_block_size()


def read_pandas(
    reader,
    urlpath,
    blocksize="default",
    lineterminator=None,
    compression="infer",
    sample=256000,
    sample_rows=10,
    enforce=False,
    assume_missing=False,
    storage_options=None,
    include_path_column=False,
    **kwargs,
):
    reader_name = reader.__name__
    if lineterminator is not None and len(lineterminator) == 1:
        kwargs["lineterminator"] = lineterminator
    else:
        lineterminator = "\n"
    if "encoding" in kwargs:
        b_lineterminator = lineterminator.encode(kwargs["encoding"])
        empty_blob = "".encode(kwargs["encoding"])
        if empty_blob:
            # This encoding starts with a Byte Order Mark (BOM), so strip that from the
            # start of the line terminator, since this value is not a full file.
            b_lineterminator = b_lineterminator[len(empty_blob) :]
    else:
        b_lineterminator = lineterminator.encode()
    if include_path_column and isinstance(include_path_column, bool):
        include_path_column = "path"
    if "index" in kwargs or (
        "index_col" in kwargs and kwargs.get("index_col") is not False
    ):
        raise ValueError(
            "Keywords 'index' and 'index_col' not supported, except for "
            f"'index_col=False'. Use dd.{reader_name}(...).set_index('my-index') instead"
        )
    for kw in ["iterator", "chunksize"]:
        if kw in kwargs:
            raise ValueError(f"{kw} not supported for dd.{reader_name}")
    if kwargs.get("nrows", None):
        raise ValueError(
            "The 'nrows' keyword is not supported by "
            "`dd.{0}`. To achieve the same behavior, it's "
            "recommended to use `dd.{0}(...)."
            "head(n=nrows)`".format(reader_name)
        )
    if isinstance(kwargs.get("skiprows"), int):
        lastskiprow = firstrow = kwargs.get("skiprows")
    elif kwargs.get("skiprows") is None:
        lastskiprow = firstrow = 0
    else:
        # When skiprows is a list, we expect more than max(skiprows) to
        # be included in the sample. This means that [0,2] will work well,
        # but [0, 440] might not work.
        skiprows = set(kwargs.get("skiprows"))
        lastskiprow = max(skiprows)
        # find the firstrow that is not skipped, for use as header
        firstrow = min(set(range(len(skiprows) + 1)) - set(skiprows))
    if isinstance(kwargs.get("header"), list):
        raise TypeError(f"List of header rows not supported for dd.{reader_name}")
    if isinstance(kwargs.get("converters"), dict) and include_path_column:
        path_converter = kwargs.get("converters").get(include_path_column, None)
    else:
        path_converter = None

    # If compression is "infer", inspect the (first) path suffix and
    # set the proper compression option if the suffix is recognized.
    if compression == "infer":
        # Translate the input urlpath to a simple path list
        paths = get_fs_token_paths(urlpath, mode="rb", storage_options=storage_options)[
            2
        ]

        # Check for at least one valid path
        if len(paths) == 0:
            raise OSError(f"{urlpath} resolved to no files")

        # Infer compression from first path
        compression = infer_compression(paths[0])

    if blocksize == "default":
        blocksize = AUTO_BLOCKSIZE
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)
    if blocksize and compression:
        # NONE of the compressions should use chunking
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None`` to remove this message\n"
            "Setting ``blocksize=None``" % compression
        )
        blocksize = None
    if compression not in compr:
        raise NotImplementedError("Compression format %s not installed" % compression)
    if blocksize and sample and blocksize < sample and lastskiprow != 0:
        warn(
            "Unexpected behavior can result from passing skiprows when\n"
            "blocksize is smaller than sample size.\n"
            "Setting ``sample=blocksize``"
        )
        sample = blocksize
    b_out = read_bytes(
        urlpath,
        delimiter=b_lineterminator,
        blocksize=blocksize,
        sample=sample,
        compression=compression,
        include_path=include_path_column,
        **(storage_options or {}),
    )

    if include_path_column:
        b_sample, values, paths = b_out
        path = (include_path_column, path_converter)
    else:
        b_sample, values = b_out
        path = None

    if not isinstance(values[0], (tuple, list)):
        values = [values]
    # If we have not sampled, then use the first row of the first values
    # as a representative sample.
    if b_sample is False and len(values[0]):
        b_sample = values[0][0].compute()

    # Get header row, and check that sample is long enough. If the file
    # contains a header row, we need at least 2 nonempty rows + the number of
    # rows to skip.
    names = kwargs.get("names", None)
    header = kwargs.get("header", "infer" if names is None else None)
    need = 1 if header is None else 2
    if isinstance(header, int):
        firstrow += header
    if kwargs.get("comment"):
        # if comment is provided, step through lines of b_sample and strip out comments
        parts = []
        for part in b_sample.split(b_lineterminator):
            split_comment = part.decode().split(kwargs.get("comment"))
            if len(split_comment) > 1:
                # if line starts with comment, don't include that line in parts.
                if len(split_comment[0]) > 0:
                    parts.append(split_comment[0].strip().encode())
            else:
                parts.append(part)
            if len(parts) > need:
                break
    else:
        parts = b_sample.split(
            b_lineterminator, max(lastskiprow + need, firstrow + need)
        )

    # If the last partition is empty, don't count it
    nparts = 0 if not parts else len(parts) - int(not parts[-1])

    if sample is not False and nparts < lastskiprow + need and len(b_sample) >= sample:
        raise ValueError(
            "Sample is not large enough to include at least one "
            "row of data. Please increase the number of bytes "
            "in `sample` in the call to `read_csv`/`read_table`"
        )

    header = b"" if header is None else parts[firstrow] + b_lineterminator

    # Use sample to infer dtypes and check for presence of include_path_column
    head_kwargs = kwargs.copy()
    head_kwargs.pop("skipfooter", None)
    if head_kwargs.get("engine") == "pyarrow":
        # Use the C engine to infer, since the Arrow engine does not support nrows
        head_kwargs["engine"] = "c"
    try:
        head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)
    except pd.errors.ParserError as e:
        if "EOF" in str(e):
            raise ValueError(
                "EOF encountered while reading header. \n"
                "Pass argument `sample_rows` and make sure the value of `sample` "
                "is large enough to accommodate that many rows of data"
            ) from e
        raise
    if include_path_column and (include_path_column in head.columns):
        raise ValueError(
            "Files already contain the column name: %s, so the "
            "path column cannot use this name. Please set "
            "`include_path_column` to a unique name." % include_path_column
        )

    specified_dtypes = kwargs.get("dtype", {})
    if specified_dtypes is None:
        specified_dtypes = {}
    # If specified_dtypes is a single type, then all columns were specified
    if assume_missing and isinstance(specified_dtypes, dict):
        # Convert all non-specified integer columns to floats
        for c in head.columns:
            if is_integer_dtype(head[c].dtype) and c not in specified_dtypes:
                head[c] = head[c].astype(float)

    values = [[list(dsk.dask.values()) for dsk in block] for block in values]

    return text_blocks_to_pandas(
        reader,
        values,
        header,
        head,
        kwargs,
        enforce=enforce,
        specified_dtypes=specified_dtypes,
        path=path,
        blocksize=blocksize,
        urlpath=urlpath,
    )


READ_DOC_TEMPLATE = """
Read {file_type} files into a Dask.DataFrame

This parallelizes the :func:`pandas.{reader}` function in the following ways:

- It supports loading many files at once using globstrings:

    >>> df = dd.{reader}('myfiles.*.csv')  # doctest: +SKIP

- In some cases it can break up large files:

    >>> df = dd.{reader}('largefile.csv', blocksize=25e6)  # 25MB chunks  # doctest: +SKIP

- It can read CSV files from external resources (e.g. S3, HDFS) by
  providing a URL:

    >>> df = dd.{reader}('s3://bucket/myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs:///myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs://namenode.example.com/myfiles.*.csv')  # doctest: +SKIP

Internally ``dd.{reader}`` uses :func:`pandas.{reader}` and supports many of the
same keyword arguments with the same performance guarantees. See the docstring
for :func:`pandas.{reader}` for more information on available keyword arguments.

Parameters
----------
urlpath : string or list
    Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
    to read from alternative filesystems. To read from multiple files you
    can pass a globstring or a list of paths, with the caveat that they
    must all have the same protocol.
blocksize : str, int or None, optional
    Number of bytes by which to cut up larger files. Default value is computed
    based on available physical memory and the number of cores, up to a maximum
    of 64MB. Can be a number like ``64000000`` or a string like ``"64MB"``. If
    ``None``, a single block is used for each file.
sample : int, optional
    Number of bytes to use when determining dtypes
assume_missing : bool, optional
    If True, all integer columns that aren't specified in ``dtype`` are assumed
    to contain missing values, and are converted to floats. Default is False.
storage_options : dict, optional
    Extra options that make sense for a particular storage connection, e.g.
    host, port, username, password, etc.
include_path_column : bool or str, optional
    Whether or not to include the path to each particular file. If True a new
    column is added to the dataframe called ``path``. If str, sets new column
    name. Default is False.
**kwargs
    Extra keyword arguments to forward to :func:`pandas.{reader}`.

Notes
-----
Dask dataframe tries to infer the ``dtype`` of each column by reading a sample
from the start of the file (or of the first file if it's a glob). Usually this
works fine, but if the ``dtype`` is different later in the file (or in other
files) this can cause issues. For example, if all the rows in the sample had
integer dtypes, but later on there was a ``NaN``, then this would error at
compute time. To fix this, you have a few options:

- Provide explicit dtypes for the offending columns using the ``dtype``
  keyword. This is the recommended solution.

- Use the ``assume_missing`` keyword to assume that all columns inferred as
  integers contain missing values, and convert them to floats.

- Increase the size of the sample using the ``sample`` keyword.

It should also be noted that this function may fail if a {file_type} file
includes quoted strings that contain the line terminator. To get around this
you can specify ``blocksize=None`` to not split files into multiple partitions,
at the cost of reduced parallelism.
"""


def make_reader(reader, reader_name, file_type):
    def read(
        urlpath,
        blocksize="default",
        lineterminator=None,
        compression="infer",
        sample=256000,
        sample_rows=10,
        enforce=False,
        assume_missing=False,
        storage_options=None,
        include_path_column=False,
        **kwargs,
    ):
        return read_pandas(
            reader,
            urlpath,
            blocksize=blocksize,
            lineterminator=lineterminator,
            compression=compression,
            sample=sample,
            sample_rows=sample_rows,
            enforce=enforce,
            assume_missing=assume_missing,
            storage_options=storage_options,
            include_path_column=include_path_column,
            **kwargs,
        )

    read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name, file_type=file_type)
    read.__name__ = reader_name
    return read


read_csv = dataframe_creation_dispatch.register_inplace(
    backend="pandas",
    name="read_csv",
)(make_reader(pd.read_csv, "read_csv", "CSV"))


read_table = make_reader(pd.read_table, "read_table", "delimited")
read_fwf = make_reader(pd.read_fwf, "read_fwf", "fixed-width")
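
# Hedged usage sketch for the readers defined above (file names and dtypes are
# made up; the keywords shown are the documented ones):
#
#     import dask.dataframe as dd
#     df = dd.read_csv("data-*.csv", blocksize="16MB", dtype={"amount": "float64"})
#     df = dd.read_csv("logs.csv", include_path_column=True, blocksize=None)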


def _write_csv(df, fil, *, depend_on=None, **kwargs):
    with fil as f:
        df.to_csv(f, **kwargs)
    return os.path.normpath(fil.path)
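
# Note: ``depend_on`` is not used inside ``_write_csv``; it exists only so that,
# in single-file mode below, each append task receives the previous task's result
# as an argument and therefore runs strictly after it, e.g.
# ``value = to_csv_chunk(d, append_file, depend_on=value, **kwargs)``.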


def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    """
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument. The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths)  # doctest: +SKIP

    You can also provide a directory name:

    >>> df.to_csv('/path/to/data')  # doctest: +SKIP

    The files will be numbered 0, 1, 2, (and so on) suffixed with '.part':

    ::

        /path/to/data/0.part
        /path/to/data/1.part

    Parameters
    ----------
    df : dask.DataFrame
        Data to save
    filename : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to save to remote filesystems.
    single_file : bool, default False
        Whether to save everything into a single CSV file. Under the
        single file mode, each partition is appended at the end of the
        specified CSV file.
    encoding : string, default 'utf-8'
        A string representing the encoding to use in the output file.
    mode : str, default 'wt'
        Python file mode. The default is 'wt' (equivalent to 'w'), for writing
        a new file or overwriting an existing file in text mode. 'a'
        (or 'at') will append to an existing file in text mode or
        create a new file if it does not already exist. See :py:func:`open`.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions. Not
        supported when ``single_file`` is True.
    compression : string, optional
        A string representing the compression to use in the output file,
        allowed values are 'gzip', 'bz2', 'xz',
        only used when the first argument is a filename.
    compute : bool, default True
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    storage_options : dict
        Parameters passed on to the backend filesystem class.
    header_first_partition_only : bool, default None
        If set to True, only write the header row in the first output
        file. By default, headers are written to all partitions under
        the multiple file mode (``single_file`` is False) and written
        only once under the single file mode (``single_file`` is True).
        It must be True under the single file mode.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    kwargs : dict, optional
        Additional parameters to pass to :meth:`pandas.DataFrame.to_csv`.

    Returns
    -------
    The names of the files written if they were computed right away.
    If not, the delayed tasks associated with writing the files.

    Raises
    ------
    ValueError
        If ``header_first_partition_only`` is set to False or
        ``name_function`` is specified when ``single_file`` is True.

    See Also
    --------
    fsspec.open_files
    """
    if single_file and name_function is not None:
        raise ValueError("name_function is not supported under the single file mode")
    if header_first_partition_only is None:
        header_first_partition_only = single_file
    elif not header_first_partition_only and single_file:
        raise ValueError(
            "header_first_partition_only cannot be False in the single file mode."
        )
    file_options = dict(
        compression=compression,
        encoding=encoding,
        newline="",
        **(storage_options or {}),
    )
    to_csv_chunk = delayed(_write_csv, pure=False)
    dfs = df.to_delayed()
    if single_file:
        first_file = open_file(filename, mode=mode, **file_options)
        value = to_csv_chunk(dfs[0], first_file, **kwargs)
        append_mode = mode if "a" in mode else mode + "a"
        append_mode = append_mode.replace("w", "").replace("x", "")
        append_file = open_file(filename, mode=append_mode, **file_options)
        kwargs["header"] = False
        for d in dfs[1:]:
            value = to_csv_chunk(d, append_file, depend_on=value, **kwargs)
        values = [value]
        files = [first_file]
    else:
        files = open_files(
            filename,
            mode=mode,
            name_function=name_function,
            num=df.npartitions,
            **file_options,
        )
        values = [to_csv_chunk(dfs[0], files[0], **kwargs)]
        if header_first_partition_only:
            kwargs["header"] = False
        values.extend(
            [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs[1:], files[1:])]
        )
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()

        if scheduler is not None:
            warn(
                "The 'scheduler' keyword argument for `to_csv()` is deprecated and "
                "will be removed in a future version. "
                "Please use the `compute_kwargs` argument instead. "
                f"For example, df.to_csv(..., compute_kwargs={{'scheduler': '{scheduler}'}})",
                FutureWarning,
            )

        if (
            scheduler is not None
            and compute_kwargs.get("scheduler") is not None
            and compute_kwargs.get("scheduler") != scheduler
        ):
            raise ValueError(
                f"Differing values for 'scheduler' have been passed in.\n"
                f"scheduler argument: {scheduler}\n"
                f"via compute_kwargs: {compute_kwargs.get('scheduler')}"
            )

        if scheduler is not None and compute_kwargs.get("scheduler") is None:
            compute_kwargs["scheduler"] = scheduler

        import dask

        return list(dask.compute(*values, **compute_kwargs))
    else:
        return values


from dask.dataframe.core import _Frame

_Frame.to_csv.__doc__ = to_csv.__doc__