Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/pandas/_testing/contexts.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

105 statements  

1from __future__ import annotations 

2 

3from contextlib import contextmanager 

4import os 

5from pathlib import Path 

6import tempfile 

7from typing import ( 

8 IO, 

9 TYPE_CHECKING, 

10 Any, 

11) 

12import uuid 

13 

14from pandas._config import using_copy_on_write 

15 

16from pandas.compat import PYPY 

17from pandas.errors import ChainedAssignmentError 

18 

19from pandas import set_option 

20 

21from pandas.io.common import get_handle 

22 

23if TYPE_CHECKING: 

24 from collections.abc import Generator 

25 

26 from pandas._typing import ( 

27 BaseBuffer, 

28 CompressionOptions, 

29 FilePath, 

30 ) 

31 

32 

33@contextmanager 

34def decompress_file( 

35 path: FilePath | BaseBuffer, compression: CompressionOptions 

36) -> Generator[IO[bytes], None, None]: 

37 """ 

38 Open a compressed file and return a file object. 

39 

40 Parameters 

41 ---------- 

42 path : str 

43 The path where the file is read from. 

44 

45 compression : {'gzip', 'bz2', 'zip', 'xz', 'zstd', None} 

46 Name of the decompression to use 

47 

48 Returns 

49 ------- 

50 file object 

51 """ 

52 with get_handle(path, "rb", compression=compression, is_text=False) as handle: 

53 yield handle.handle 

54 

55 

56@contextmanager 

57def set_timezone(tz: str) -> Generator[None, None, None]: 

58 """ 

59 Context manager for temporarily setting a timezone. 

60 

61 Parameters 

62 ---------- 

63 tz : str 

64 A string representing a valid timezone. 

65 

66 Examples 

67 -------- 

68 >>> from datetime import datetime 

69 >>> from dateutil.tz import tzlocal 

70 >>> tzlocal().tzname(datetime(2021, 1, 1)) # doctest: +SKIP 

71 'IST' 

72 

73 >>> with set_timezone('US/Eastern'): 

74 ... tzlocal().tzname(datetime(2021, 1, 1)) 

75 ... 

76 'EST' 

77 """ 

78 import time 

79 

80 def setTZ(tz) -> None: 

81 if tz is None: 

82 try: 

83 del os.environ["TZ"] 

84 except KeyError: 

85 pass 

86 else: 

87 os.environ["TZ"] = tz 

88 time.tzset() 

89 

90 orig_tz = os.environ.get("TZ") 

91 setTZ(tz) 

92 try: 

93 yield 

94 finally: 

95 setTZ(orig_tz) 

96 

97 

98@contextmanager 

99def ensure_clean( 

100 filename=None, return_filelike: bool = False, **kwargs: Any 

101) -> Generator[Any, None, None]: 

102 """ 

103 Gets a temporary path and agrees to remove on close. 

104 

105 This implementation does not use tempfile.mkstemp to avoid having a file handle. 

106 If the code using the returned path wants to delete the file itself, windows 

107 requires that no program has a file handle to it. 

108 

109 Parameters 

110 ---------- 

111 filename : str (optional) 

112 suffix of the created file. 

113 return_filelike : bool (default False) 

114 if True, returns a file-like which is *always* cleaned. Necessary for 

115 savefig and other functions which want to append extensions. 

116 **kwargs 

117 Additional keywords are passed to open(). 

118 

119 """ 

120 folder = Path(tempfile.gettempdir()) 

121 

122 if filename is None: 

123 filename = "" 

124 filename = str(uuid.uuid4()) + filename 

125 path = folder / filename 

126 

127 path.touch() 

128 

129 handle_or_str: str | IO = str(path) 

130 encoding = kwargs.pop("encoding", None) 

131 if return_filelike: 

132 kwargs.setdefault("mode", "w+b") 

133 if encoding is None and "b" not in kwargs["mode"]: 

134 encoding = "utf-8" 

135 handle_or_str = open(path, encoding=encoding, **kwargs) 

136 

137 try: 

138 yield handle_or_str 

139 finally: 

140 if not isinstance(handle_or_str, str): 

141 handle_or_str.close() 

142 if path.is_file(): 

143 path.unlink() 

144 

145 

146@contextmanager 

147def with_csv_dialect(name: str, **kwargs) -> Generator[None, None, None]: 

148 """ 

149 Context manager to temporarily register a CSV dialect for parsing CSV. 

150 

151 Parameters 

152 ---------- 

153 name : str 

154 The name of the dialect. 

155 kwargs : mapping 

156 The parameters for the dialect. 

157 

158 Raises 

159 ------ 

160 ValueError : the name of the dialect conflicts with a builtin one. 

161 

162 See Also 

163 -------- 

164 csv : Python's CSV library. 

165 """ 

166 import csv 

167 

168 _BUILTIN_DIALECTS = {"excel", "excel-tab", "unix"} 

169 

170 if name in _BUILTIN_DIALECTS: 

171 raise ValueError("Cannot override builtin dialect.") 

172 

173 csv.register_dialect(name, **kwargs) 

174 try: 

175 yield 

176 finally: 

177 csv.unregister_dialect(name) 

178 

179 

180@contextmanager 

181def use_numexpr(use, min_elements=None) -> Generator[None, None, None]: 

182 from pandas.core.computation import expressions as expr 

183 

184 if min_elements is None: 

185 min_elements = expr._MIN_ELEMENTS 

186 

187 olduse = expr.USE_NUMEXPR 

188 oldmin = expr._MIN_ELEMENTS 

189 set_option("compute.use_numexpr", use) 

190 expr._MIN_ELEMENTS = min_elements 

191 try: 

192 yield 

193 finally: 

194 expr._MIN_ELEMENTS = oldmin 

195 set_option("compute.use_numexpr", olduse) 

196 

197 

198def raises_chained_assignment_error(warn=True, extra_warnings=(), extra_match=()): 

199 from pandas._testing import assert_produces_warning 

200 

201 if not warn: 

202 from contextlib import nullcontext 

203 

204 return nullcontext() 

205 

206 if PYPY and not extra_warnings: 

207 from contextlib import nullcontext 

208 

209 return nullcontext() 

210 elif PYPY and extra_warnings: 

211 return assert_produces_warning( 

212 extra_warnings, 

213 match="|".join(extra_match), 

214 ) 

215 else: 

216 if using_copy_on_write(): 

217 warning = ChainedAssignmentError 

218 match = ( 

219 "A value is trying to be set on a copy of a DataFrame or Series " 

220 "through chained assignment" 

221 ) 

222 else: 

223 warning = FutureWarning # type: ignore[assignment] 

224 # TODO update match 

225 match = "ChainedAssignmentError" 

226 if extra_warnings: 

227 warning = (warning, *extra_warnings) # type: ignore[assignment] 

228 return assert_produces_warning( 

229 warning, 

230 match="|".join((match, *extra_match)), 

231 ) 

232 

233 

234def assert_cow_warning(warn=True, match=None, **kwargs): 

235 """ 

236 Assert that a warning is raised in the CoW warning mode. 

237 

238 Parameters 

239 ---------- 

240 warn : bool, default True 

241 By default, check that a warning is raised. Can be turned off by passing False. 

242 match : str 

243 The warning message to match against, if different from the default. 

244 kwargs 

245 Passed through to assert_produces_warning 

246 """ 

247 from pandas._testing import assert_produces_warning 

248 

249 if not warn: 

250 from contextlib import nullcontext 

251 

252 return nullcontext() 

253 

254 if not match: 

255 match = "Setting a value on a view" 

256 

257 return assert_produces_warning(FutureWarning, match=match, **kwargs)