1from __future__ import annotations
2
3from contextlib import contextmanager
4import os
5from pathlib import Path
6import tempfile
7from typing import (
8 IO,
9 Any,
10 Generator,
11)
12import uuid
13
14from pandas._typing import (
15 BaseBuffer,
16 CompressionOptions,
17 FilePath,
18)
19from pandas.compat import PYPY
20from pandas.errors import ChainedAssignmentError
21
22from pandas import set_option
23
24from pandas.io.common import get_handle
25
26
27@contextmanager
28def decompress_file(
29 path: FilePath | BaseBuffer, compression: CompressionOptions
30) -> Generator[IO[bytes], None, None]:
31 """
32 Open a compressed file and return a file object.
33
34 Parameters
35 ----------
36 path : str
37 The path where the file is read from.
38
39 compression : {'gzip', 'bz2', 'zip', 'xz', 'zstd', None}
40 Name of the decompression to use
41
42 Returns
43 -------
44 file object
45 """
46 with get_handle(path, "rb", compression=compression, is_text=False) as handle:
47 yield handle.handle
48
49
50@contextmanager
51def set_timezone(tz: str) -> Generator[None, None, None]:
52 """
53 Context manager for temporarily setting a timezone.
54
55 Parameters
56 ----------
57 tz : str
58 A string representing a valid timezone.
59
60 Examples
61 --------
62 >>> from datetime import datetime
63 >>> from dateutil.tz import tzlocal
64 >>> tzlocal().tzname(datetime(2021, 1, 1)) # doctest: +SKIP
65 'IST'
66
67 >>> with set_timezone('US/Eastern'):
68 ... tzlocal().tzname(datetime(2021, 1, 1))
69 ...
70 'EST'
71 """
72 import time
73
74 def setTZ(tz) -> None:
75 if tz is None:
76 try:
77 del os.environ["TZ"]
78 except KeyError:
79 pass
80 else:
81 os.environ["TZ"] = tz
82 time.tzset()
83
84 orig_tz = os.environ.get("TZ")
85 setTZ(tz)
86 try:
87 yield
88 finally:
89 setTZ(orig_tz)
90
91
92@contextmanager
93def ensure_clean(
94 filename=None, return_filelike: bool = False, **kwargs: Any
95) -> Generator[Any, None, None]:
96 """
97 Gets a temporary path and agrees to remove on close.
98
99 This implementation does not use tempfile.mkstemp to avoid having a file handle.
100 If the code using the returned path wants to delete the file itself, windows
101 requires that no program has a file handle to it.
102
103 Parameters
104 ----------
105 filename : str (optional)
106 suffix of the created file.
107 return_filelike : bool (default False)
108 if True, returns a file-like which is *always* cleaned. Necessary for
109 savefig and other functions which want to append extensions.
110 **kwargs
111 Additional keywords are passed to open().
112
113 """
114 folder = Path(tempfile.gettempdir())
115
116 if filename is None:
117 filename = ""
118 filename = str(uuid.uuid4()) + filename
119 path = folder / filename
120
121 path.touch()
122
123 handle_or_str: str | IO = str(path)
124 if return_filelike:
125 kwargs.setdefault("mode", "w+b")
126 handle_or_str = open(path, **kwargs)
127
128 try:
129 yield handle_or_str
130 finally:
131 if not isinstance(handle_or_str, str):
132 handle_or_str.close()
133 if path.is_file():
134 path.unlink()
135
136
137@contextmanager
138def ensure_safe_environment_variables() -> Generator[None, None, None]:
139 """
140 Get a context manager to safely set environment variables
141
142 All changes will be undone on close, hence environment variables set
143 within this contextmanager will neither persist nor change global state.
144 """
145 saved_environ = dict(os.environ)
146 try:
147 yield
148 finally:
149 os.environ.clear()
150 os.environ.update(saved_environ)
151
152
153@contextmanager
154def with_csv_dialect(name, **kwargs) -> Generator[None, None, None]:
155 """
156 Context manager to temporarily register a CSV dialect for parsing CSV.
157
158 Parameters
159 ----------
160 name : str
161 The name of the dialect.
162 kwargs : mapping
163 The parameters for the dialect.
164
165 Raises
166 ------
167 ValueError : the name of the dialect conflicts with a builtin one.
168
169 See Also
170 --------
171 csv : Python's CSV library.
172 """
173 import csv
174
175 _BUILTIN_DIALECTS = {"excel", "excel-tab", "unix"}
176
177 if name in _BUILTIN_DIALECTS:
178 raise ValueError("Cannot override builtin dialect.")
179
180 csv.register_dialect(name, **kwargs)
181 try:
182 yield
183 finally:
184 csv.unregister_dialect(name)
185
186
187@contextmanager
188def use_numexpr(use, min_elements=None) -> Generator[None, None, None]:
189 from pandas.core.computation import expressions as expr
190
191 if min_elements is None:
192 min_elements = expr._MIN_ELEMENTS
193
194 olduse = expr.USE_NUMEXPR
195 oldmin = expr._MIN_ELEMENTS
196 set_option("compute.use_numexpr", use)
197 expr._MIN_ELEMENTS = min_elements
198 try:
199 yield
200 finally:
201 expr._MIN_ELEMENTS = oldmin
202 set_option("compute.use_numexpr", olduse)
203
204
205def raises_chained_assignment_error():
206 if PYPY:
207 from contextlib import nullcontext
208
209 return nullcontext()
210 else:
211 from pandas._testing import assert_produces_warning
212
213 return assert_produces_warning(
214 ChainedAssignmentError,
215 match=(
216 "A value is trying to be set on a copy of a DataFrame or Series "
217 "through chained assignment"
218 ),
219 )