1"""Use zoneinfo timezones"""
2
3from __future__ import annotations
4
5import copy
6import copyreg
7import functools
8from datetime import datetime, tzinfo
9from io import StringIO
10from typing import TYPE_CHECKING, Optional
11
12from dateutil.rrule import rrule, rruleset
13from dateutil.tz import tzical
14from dateutil.tz.tz import _tzicalvtz
15
16from icalendar.compatibility import zoneinfo
17from icalendar.tools import is_date, to_datetime
18
19from .provider import TZProvider
20
21if TYPE_CHECKING:
22 from icalendar import prop
23 from icalendar.cal import Timezone
24 from icalendar.prop import vDDDTypes
25
26
class ZONEINFO(TZProvider):
    """Timezone provider for icalendar that is backed by zoneinfo."""

    # Identifier of this provider.
    name = "zoneinfo"
    # UTC timezone object shared by localize_utc() and fix_rrule_until().
    utc = zoneinfo.ZoneInfo("UTC")
    # Timezone keys reported by zoneinfo, collected once at class creation.
    _available_timezones = zoneinfo.available_timezones()

    def localize(self, dt: datetime, tz: zoneinfo.ZoneInfo) -> datetime:
        """Return ``dt`` with ``tz`` set as its tzinfo."""
        return dt.replace(tzinfo=tz)

    def localize_utc(self, dt: datetime) -> datetime:
        """Return ``dt`` in UTC.

        A value that already carries a timezone is converted to UTC; any
        other value gets UTC attached through :meth:`localize`.
        """
        has_timezone = getattr(dt, "tzinfo", False) and dt.tzinfo is not None
        if not has_timezone:
            return self.localize(dt, self.utc)
        return dt.astimezone(self.utc)

    def timezone(self, name: str) -> Optional[tzinfo]:
        """Look up the timezone called ``name``; return None if not found."""
        try:
            return zoneinfo.ZoneInfo(name)
        except (zoneinfo.ZoneInfoNotFoundError, ValueError):
            # ZoneInfoNotFoundError: zoneinfo has no timezone for this key.
            # ValueError: ZoneInfo keys may not be absolute paths,
            # got: /Europe/CUSTOM
            return None

    def knows_timezone_id(self, tzid: str) -> bool:
        """Tell whether ``tzid`` is among the timezone keys zoneinfo reports."""
        known_ids = self._available_timezones
        return tzid in known_ids

    def fix_rrule_until(self, rrule: rrule, ical_rrule: prop.vRecur) -> None:
        """Give ``rrule`` an end if ``ical_rrule`` has neither UNTIL nor COUNT.

        Rules that specify UNTIL or COUNT are left untouched.
        """
        if not {"UNTIL", "COUNT"}.isdisjoint(ical_rrule.keys()):
            return
        # zoneinfo does not know any transition dates after 2038
        rrule._until = datetime(2038, 12, 31, tzinfo=self.utc)  # noqa: SLF001

    def create_timezone(self, tz: Timezone.Timezone) -> tzinfo:
        """Create a timezone from the given timezone component.

        If the first attempt raises ValueError, a deep copy of ``tz`` is
        cleaned up and the creation is tried once more on that copy; ``tz``
        itself is not modified.
        """
        try:
            return self._create_timezone(tz)
        except ValueError:
            # We might have a custom component in there.
            # see https://github.com/python/cpython/issues/120217
            cleaned = copy.deepcopy(tz)
            for component in cleaned.walk():
                custom_keys = [
                    key
                    for key in component.keys()
                    if key.lower().startswith("x-")
                ]
                for key in custom_keys:
                    component.pop(key)
            for child in cleaned.subcomponents:
                dtstart: vDDDTypes = child.get("DTSTART")
                if not dtstart:
                    continue
                if is_date(dtstart.dt):
                    # ValueError: Unsupported DTSTART param in VTIMEZONE:
                    # VALUE=DATE
                    child.DTSTART = to_datetime(dtstart.dt)
            # Retry inside the handler so a second failure stays chained to
            # the first one.
            return self._create_timezone(cleaned)

    def _create_timezone(self, tz: Timezone.Timezone) -> tzinfo:
        """Serialize ``tz`` and let dateutil's tzical build the timezone.

        This may fail; :meth:`create_timezone` handles ValueError from here.
        """
        ical_text = tz.to_ical().decode("UTF-8", "replace")
        return tzical(StringIO(ical_text)).get()

    def uses_pytz(self) -> bool:
        """Whether this provider uses pytz (it does not)."""
        return False

    def uses_zoneinfo(self) -> bool:
        """Whether this provider uses zoneinfo (it does)."""
        return True
95
96
def pickle_tzicalvtz(tzicalvtz: _tzicalvtz):
    """Reduce a dateutil ``_tzicalvtz`` so that it can be pickled.

    The timezones we create come from dateutil.tzical, so they need this.
    Returns the class and the ``(tzid, comps)`` arguments to rebuild it.
    """
    constructor_args = (tzicalvtz._tzid, tzicalvtz._comps)  # noqa: SLF001
    return _tzicalvtz, constructor_args


# Use the function above as the reduction function for _tzicalvtz objects.
copyreg.pickle(_tzicalvtz, pickle_tzicalvtz)
103
104
def pickle_rrule_with_cache(self: rrule):
    """Reduce an rrule for pickling, including rrules that cache.

    The keyword collection is mainly copied from rrule.replace.
    Returns a zero-argument callable that rebuilds the rule, plus ``()``.
    """
    # Gather the stored settings in the same order rrule.replace uses.
    kwargs = {
        field: getattr(self, f"_{field}")
        for field in ("interval", "count", "dtstart", "freq", "until", "wkst")
    }
    # Only record whether caching was on, not the cache itself.
    kwargs["cache"] = self._cache is not None
    # Values the rule was originally created with take precedence.
    kwargs.update(self._original_rule)
    freq = kwargs.pop("freq")
    # from https://stackoverflow.com/a/64915638/1320237
    rebuild = functools.partial(rrule, freq, **kwargs)
    return rebuild, ()


# Use the function above as the reduction function for rrule objects.
copyreg.pickle(rrule, pickle_rrule_with_cache)
125
126
def pickle_rruleset_with_cache(rs: rruleset):
    """Reduce an rruleset for pickling.

    Returns the rebuild function together with the rules, dates and
    exclusions stored on ``rs`` and a flag telling whether it caches.
    """
    state = (
        rs._rrule,  # noqa: SLF001
        rs._rdate,  # noqa: SLF001
        rs._exrule,  # noqa: SLF001
        rs._exdate,  # noqa: SLF001
        rs._cache is not None,  # noqa: SLF001
    )
    return unpickle_rruleset_with_cache, state


def unpickle_rruleset_with_cache(rrule, rdate, exrule, exdate, cache):
    """Rebuild an rruleset from the state produced when pickling it."""
    restored = rruleset(cache)
    # Pair each adder method with the entries it has to take in.
    for add, entries in (
        (restored.rrule, rrule),
        (restored.rdate, rdate),
        (restored.exrule, exrule),
        (restored.exdate, exdate),
    ):
        for entry in entries:
            add(entry)
    return restored


# Use pickle_rruleset_with_cache as the reduction function for rrulesets.
copyreg.pickle(rruleset, pickle_rruleset_with_cache)
153
154__all__ = ["ZONEINFO"]