1from __future__ import annotations
2
3import contextlib
4import inspect
5import os
6import re
7from typing import TYPE_CHECKING
8import warnings
9
10if TYPE_CHECKING:
11 from collections.abc import Generator
12 from types import FrameType
13
14
@contextlib.contextmanager
def rewrite_exception(old_name: str, new_name: str) -> Generator[None, None, None]:
    """
    Replace ``old_name`` with ``new_name`` in the message of an exception.

    Any ``Exception`` raised inside the ``with`` block has its message edited
    in place and is then re-raised; the exception type is left untouched.

    Parameters
    ----------
    old_name : str
        Text to look for in the exception message.
    new_name : str
        Text substituted for every occurrence of ``old_name``.

    Notes
    -----
    Only the first element of ``err.args`` is treated as the message, and it
    is converted with ``str`` before the replacement. Any further elements are
    carried over unchanged. An exception with empty ``args`` is re-raised
    without modification.
    """
    try:
        yield
    except Exception as err:
        if err.args:
            first, *rest = err.args
            err.args = (str(first).replace(old_name, new_name), *rest)
        # Bare ``raise`` keeps the original exception object and traceback.
        raise
32
33
def find_stack_level() -> int:
    """
    Find the first place in the stack that is not inside pandas
    (tests notwithstanding).

    Returns
    -------
    int
        Number of consecutive frames, starting with this function's own frame,
        whose source file lies inside the pandas package directory but outside
        its ``tests`` subdirectory.
    """

    import pandas as pd

    # Keep a trailing separator on both prefixes so that only files *inside*
    # these directories match. A bare ``startswith(pkg_dir)`` would also accept
    # sibling packages such as ``pandas_gbq`` that merely share the name prefix.
    pkg_prefix = os.path.join(os.path.dirname(pd.__file__), "")
    test_prefix = os.path.join(pkg_prefix, "tests", "")

    # https://stackoverflow.com/questions/17407119/python-inspect-stack-is-slow
    frame: FrameType | None = inspect.currentframe()
    try:
        n = 0
        while frame is not None:
            filename = inspect.getfile(frame)
            if not filename.startswith(pkg_prefix) or filename.startswith(test_prefix):
                # First frame outside pandas proper (or inside its test suite).
                break
            frame = frame.f_back
            n += 1
    finally:
        # See note in
        # https://docs.python.org/3/library/inspect.html#inspect.Traceback
        del frame
    return n
61
62
@contextlib.contextmanager
def rewrite_warning(
    target_message: str,
    target_category: type[Warning],
    new_message: str,
    new_category: type[Warning] | None = None,
) -> Generator[None, None, None]:
    """
    Re-emit warnings raised in the ``with`` block, rewriting the matching ones.

    Warnings are recorded while the block runs. Once the block has finished
    without raising, every recorded warning is emitted again with its original
    filename and line number; those that match the target get the new message
    and category instead of their own.

    Parameters
    ----------
    target_message : str
        Regular expression searched for in the text of each recorded warning.
    target_category : Warning
        Warning type to match. The comparison is by identity, so subclasses
        of this type are not treated as a match.
    new_message : str
        New warning message to emit.
    new_category : Warning or None, default None
        New warning type to emit. When None, will be the same as target_category.

    Notes
    -----
    If the block raises, the exception propagates and the warnings recorded
    up to that point are not re-emitted.
    """
    replacement_category = target_category if new_category is None else new_category

    with warnings.catch_warnings(record=True) as captured:
        yield

    if not captured:
        return

    pattern = re.compile(target_message)
    for entry in captured:
        if entry.category is target_category and pattern.search(str(entry.message)):
            emitted_message: Warning | str = new_message
            emitted_category = replacement_category
        else:
            # Not the target: pass the warning through unchanged.
            emitted_message = entry.message
            emitted_category = entry.category
        warnings.warn_explicit(
            message=emitted_message,
            category=emitted_category,
            filename=entry.filename,
            lineno=entry.lineno,
        )