1from __future__ import annotations
2
3from contextlib import contextmanager
4import os
5from pathlib import Path
6import tempfile
7from typing import (
8 IO,
9 TYPE_CHECKING,
10 Any,
11)
12import uuid
13
14from pandas._config import using_copy_on_write
15
16from pandas.compat import PYPY
17from pandas.errors import ChainedAssignmentError
18
19from pandas import set_option
20
21from pandas.io.common import get_handle
22
23if TYPE_CHECKING:
24 from collections.abc import Generator
25
26 from pandas._typing import (
27 BaseBuffer,
28 CompressionOptions,
29 FilePath,
30 )
31
32
@contextmanager
def decompress_file(
    path: FilePath | BaseBuffer, compression: CompressionOptions
) -> Generator[IO[bytes], None, None]:
    """
    Open a (possibly compressed) file in binary read mode and yield its handle.

    Parameters
    ----------
    path : str, path object or buffer
        Where the data is read from. Passed straight to ``get_handle``.

    compression : {'gzip', 'bz2', 'zip', 'xz', 'zstd', None}
        Name of the decompression to use.

    Yields
    ------
    file object
        The ``handle`` attribute of the object returned by ``get_handle``,
        opened with mode ``"rb"`` and ``is_text=False``.
    """
    io_handles = get_handle(path, "rb", compression=compression, is_text=False)
    # Leaving the ``with`` block releases whatever ``get_handle`` opened.
    with io_handles:
        yield io_handles.handle
54
55
@contextmanager
def set_timezone(tz: str) -> Generator[None, None, None]:
    """
    Context manager for temporarily setting a timezone.

    The ``TZ`` environment variable is set to ``tz`` while the block runs.
    On exit it is put back to its previous value, or removed again if it
    was not set before. ``time.tzset()`` is called after every change so
    that the ``time`` module picks up the new (or restored) setting.

    Parameters
    ----------
    tz : str
        A string representing a valid timezone.

    Examples
    --------
    >>> from datetime import datetime
    >>> from dateutil.tz import tzlocal
    >>> tzlocal().tzname(datetime(2021, 1, 1))  # doctest: +SKIP
    'IST'

    >>> with set_timezone('US/Eastern'):
    ...     tzlocal().tzname(datetime(2021, 1, 1))
    ...
    'EST'
    """
    import time

    def setTZ(tz) -> None:
        if tz is None:
            # ``None`` means "TZ was not set": make sure it is absent.
            os.environ.pop("TZ", None)
        else:
            os.environ["TZ"] = tz
        # Re-read TZ in both branches. Without this, removing the variable
        # would leave the previously applied zone active in the ``time``
        # module. ``tzset`` is not available on every platform.
        if hasattr(time, "tzset"):
            time.tzset()

    orig_tz = os.environ.get("TZ")
    setTZ(tz)
    try:
        yield
    finally:
        setTZ(orig_tz)
96
97
@contextmanager
def ensure_clean(
    filename=None, return_filelike: bool = False, **kwargs: Any
) -> Generator[Any, None, None]:
    """
    Gets a temporary path and agrees to remove on close.

    This implementation does not use tempfile.mkstemp to avoid having a file handle.
    If the code using the returned path wants to delete the file itself, windows
    requires that no program has a file handle to it.

    Parameters
    ----------
    filename : str (optional)
        suffix of the created file; it is appended to a random UUID to form
        the file name inside the system temporary directory.
    return_filelike : bool (default False)
        if True, returns a file-like which is *always* cleaned. Necessary for
        savefig and other functions which want to append extensions.
    **kwargs
        Additional keywords are passed to open(). They are only used when
        ``return_filelike`` is True. ``mode`` defaults to ``"w+b"`` and, for
        text modes, ``encoding`` defaults to ``"utf-8"``.

    Yields
    ------
    str or file-like
        The path as a string, or the opened file object when
        ``return_filelike`` is True.
    """
    folder = Path(tempfile.gettempdir())

    if filename is None:
        filename = ""
    filename = str(uuid.uuid4()) + filename
    path = folder / filename

    path.touch()

    handle_or_str: str | IO = str(path)
    try:
        # Opening happens inside the ``try`` so that a failing ``open()``
        # (e.g. an invalid mode) still removes the file touched above.
        encoding = kwargs.pop("encoding", None)
        if return_filelike:
            kwargs.setdefault("mode", "w+b")
            if encoding is None and "b" not in kwargs["mode"]:
                encoding = "utf-8"
            handle_or_str = open(path, encoding=encoding, **kwargs)

        yield handle_or_str
    finally:
        if not isinstance(handle_or_str, str):
            handle_or_str.close()
        # The caller may already have deleted the file itself.
        if path.is_file():
            path.unlink()
144
145
@contextmanager
def with_csv_dialect(name: str, **kwargs) -> Generator[None, None, None]:
    """
    Register a CSV dialect for the duration of the block, then remove it.

    Parameters
    ----------
    name : str
        The name under which the dialect is registered.
    kwargs : mapping
        The dialect parameters, forwarded to ``csv.register_dialect``.

    Raises
    ------
    ValueError : the name of the dialect conflicts with a builtin one.

    See Also
    --------
    csv : Python's CSV library.
    """
    import csv

    # Names that may not be replaced by a temporary dialect.
    reserved_names = frozenset({"excel", "excel-tab", "unix"})
    clashes_with_builtin = name in reserved_names
    if clashes_with_builtin:
        raise ValueError("Cannot override builtin dialect.")

    csv.register_dialect(name, **kwargs)
    try:
        yield
    finally:
        # Unregister even when the body raised.
        csv.unregister_dialect(name)
178
179
@contextmanager
def use_numexpr(use, min_elements=None) -> Generator[None, None, None]:
    """
    Temporarily set the ``compute.use_numexpr`` option and ``_MIN_ELEMENTS``.

    Parameters
    ----------
    use
        Value assigned to the ``compute.use_numexpr`` option inside the block.
    min_elements : optional
        Value assigned to ``expressions._MIN_ELEMENTS`` inside the block.
        When None, the current value is kept.
    """
    from pandas.core.computation import expressions as expr

    # Remember the current state so it can be put back on exit.
    previous_use, previous_min = expr.USE_NUMEXPR, expr._MIN_ELEMENTS
    requested_min = previous_min if min_elements is None else min_elements

    set_option("compute.use_numexpr", use)
    expr._MIN_ELEMENTS = requested_min
    try:
        yield
    finally:
        expr._MIN_ELEMENTS = previous_min
        set_option("compute.use_numexpr", previous_use)
196
197
def raises_chained_assignment_error(warn=True, extra_warnings=(), extra_match=()):
    """
    Return a context manager checking for the chained-assignment warning.

    Parameters
    ----------
    warn : bool, default True
        When False, a ``nullcontext`` is returned and nothing is checked.
    extra_warnings : tuple of warning classes, default ()
        Additional warning classes that are expected alongside the main one.
    extra_match : tuple of str, default ()
        Additional message patterns; all patterns are joined with ``"|"``.

    Returns
    -------
    context manager
        ``nullcontext()`` or the result of ``assert_produces_warning``.
    """
    from contextlib import nullcontext

    from pandas._testing import assert_produces_warning

    if not warn:
        return nullcontext()

    if PYPY:
        # On PyPy only the extra warnings (if any) are asserted.
        if not extra_warnings:
            return nullcontext()
        return assert_produces_warning(
            extra_warnings,
            match="|".join(extra_match),
        )

    if using_copy_on_write():
        expected = ChainedAssignmentError
        pattern = (
            "A value is trying to be set on a copy of a DataFrame or Series "
            "through chained assignment"
        )
    else:
        expected = FutureWarning  # type: ignore[assignment]
        # TODO update match
        pattern = "ChainedAssignmentError"

    if extra_warnings:
        expected = (expected, *extra_warnings)  # type: ignore[assignment]

    return assert_produces_warning(
        expected,
        match="|".join((pattern, *extra_match)),
    )
232
233
def assert_cow_warning(warn=True, match=None, **kwargs):
    """
    Build a context manager asserting that a CoW ``FutureWarning`` is raised.

    Parameters
    ----------
    warn : bool, default True
        By default, check that a warning is raised. Passing False returns a
        ``nullcontext`` so that nothing is checked.
    match : str
        The warning message to match against. Any falsy value selects the
        default message ``"Setting a value on a view"``.
    kwargs
        Passed through to assert_produces_warning
    """
    from contextlib import nullcontext

    from pandas._testing import assert_produces_warning

    if warn:
        pattern = match or "Setting a value on a view"
        return assert_produces_warning(FutureWarning, match=pattern, **kwargs)

    return nullcontext()