1# Copyright 2017 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Helpers for :mod:`datetime`.""" 
    16 
    17import calendar 
    18import datetime 
    19import re 
    20 
    21from google.protobuf import timestamp_pb2 
    22 
    23 
# Timezone-aware (UTC) datetime for timestamp 0, i.e. the unix epoch.
_UTC_EPOCH = datetime.datetime.fromtimestamp(0, datetime.timezone.utc)

# strftime format: date, time, microsecond fraction and a literal "Z" suffix.
_RFC3339_MICROS = "%Y-%m-%dT%H:%M:%S.%fZ"
# strftime/strptime format: date and whole-second time, no fraction, no zone.
_RFC3339_NO_FRACTION = "%Y-%m-%dT%H:%M:%S"
# datetime.strptime cannot handle nanosecond precision, so timestamps are
# parsed with this regex instead. It captures the whole-second part as
# ``no_fraction`` and an optional 1-9 digit fractional part as ``nanos``,
# and requires a literal "Z" after them.
_RFC3339_NANOS = re.compile(
    r"""
    (?P<no_fraction>
        \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}  # YYYY-MM-DDTHH:MM:SS
    )
    (                                        # Optional decimal part
     \.                                      # decimal point
     (?P<nanos>\d{1,9})                      # nanoseconds, maybe truncated
    )?
    Z                                        # Zulu
""",
    re.VERBOSE,
)
    42 
    43 
    44def _from_microseconds(value): 
    45    """Convert timestamp in microseconds since the unix epoch to datetime. 
    46 
    47    Args: 
    48        value (float): The timestamp to convert, in microseconds. 
    49 
    50    Returns: 
    51        datetime.datetime: The datetime object equivalent to the timestamp in 
    52            UTC. 
    53    """ 
    54    return _UTC_EPOCH + datetime.timedelta(microseconds=value) 
    55 
    56 
    57def _to_rfc3339(value, ignore_zone=True): 
    58    """Convert a datetime to an RFC3339 timestamp string. 
    59 
    60    Args: 
    61        value (datetime.datetime): 
    62            The datetime object to be converted to a string. 
    63        ignore_zone (bool): If True, then the timezone (if any) of the 
    64            datetime object is ignored and the datetime is treated as UTC. 
    65 
    66    Returns: 
    67        str: The RFC3339 formatted string representing the datetime. 
    68    """ 
    69    if not ignore_zone and value.tzinfo is not None: 
    70        # Convert to UTC and remove the time zone info. 
    71        value = value.replace(tzinfo=None) - value.utcoffset() 
    72 
    73    return value.strftime(_RFC3339_MICROS) 
    74 
    75 
    76class DatetimeWithNanoseconds(datetime.datetime): 
    77    """Track nanosecond in addition to normal datetime attrs. 
    78 
    79    Nanosecond can be passed only as a keyword argument. 
    80    """ 
    81 
    82    __slots__ = ("_nanosecond",) 
    83 
    84    # pylint: disable=arguments-differ 
    85    def __new__(cls, *args, **kw): 
    86        nanos = kw.pop("nanosecond", 0) 
    87        if nanos > 0: 
    88            if "microsecond" in kw: 
    89                raise TypeError("Specify only one of 'microsecond' or 'nanosecond'") 
    90            kw["microsecond"] = nanos // 1000 
    91        inst = datetime.datetime.__new__(cls, *args, **kw) 
    92        inst._nanosecond = nanos or 0 
    93        return inst 
    94 
    95    # pylint: disable=arguments-differ 
    96    def replace(self, *args, **kw): 
    97        """Return a date with the same value, except for those parameters given 
    98        new values by whichever keyword arguments are specified. For example, 
    99        if d == date(2002, 12, 31), then 
    100        d.replace(day=26) == date(2002, 12, 26). 
    101        NOTE: nanosecond and microsecond are mutually exclusive arguments. 
    102        """ 
    103 
    104        ms_provided = "microsecond" in kw 
    105        ns_provided = "nanosecond" in kw 
    106        provided_ns = kw.pop("nanosecond", 0) 
    107 
    108        prev_nanos = self.nanosecond 
    109 
    110        if ms_provided and ns_provided: 
    111            raise TypeError("Specify only one of 'microsecond' or 'nanosecond'") 
    112 
    113        if ns_provided: 
    114            # if nanos were provided, manipulate microsecond kw arg to super 
    115            kw["microsecond"] = provided_ns // 1000 
    116        inst = super().replace(*args, **kw) 
    117 
    118        if ms_provided: 
    119            # ms were provided, nanos are invalid, build from ms 
    120            inst._nanosecond = inst.microsecond * 1000 
    121        elif ns_provided: 
    122            # ns were provided, replace nanoseconds to match after calling super 
    123            inst._nanosecond = provided_ns 
    124        else: 
    125            # if neither ms or ns were provided, passthru previous nanos. 
    126            inst._nanosecond = prev_nanos 
    127 
    128        return inst 
    129 
    130    @property 
    131    def nanosecond(self): 
    132        """Read-only: nanosecond precision.""" 
    133        return self._nanosecond or self.microsecond * 1000 
    134 
    135    def rfc3339(self): 
    136        """Return an RFC3339-compliant timestamp. 
    137 
    138        Returns: 
    139            (str): Timestamp string according to RFC3339 spec. 
    140        """ 
    141        if self._nanosecond == 0: 
    142            return _to_rfc3339(self) 
    143        nanos = str(self._nanosecond).rjust(9, "0").rstrip("0") 
    144        return "{}.{}Z".format(self.strftime(_RFC3339_NO_FRACTION), nanos) 
    145 
    146    @classmethod 
    147    def from_rfc3339(cls, stamp): 
    148        """Parse RFC3339-compliant timestamp, preserving nanoseconds. 
    149 
    150        Args: 
    151            stamp (str): RFC3339 stamp, with up to nanosecond precision 
    152 
    153        Returns: 
    154            :class:`DatetimeWithNanoseconds`: 
    155                an instance matching the timestamp string 
    156 
    157        Raises: 
    158            ValueError: if `stamp` does not match the expected format 
    159        """ 
    160        with_nanos = _RFC3339_NANOS.match(stamp) 
    161        if with_nanos is None: 
    162            raise ValueError( 
    163                "Timestamp: {}, does not match pattern: {}".format( 
    164                    stamp, _RFC3339_NANOS.pattern 
    165                ) 
    166            ) 
    167        bare = datetime.datetime.strptime( 
    168            with_nanos.group("no_fraction"), _RFC3339_NO_FRACTION 
    169        ) 
    170        fraction = with_nanos.group("nanos") 
    171        if fraction is None: 
    172            nanos = 0 
    173        else: 
    174            scale = 9 - len(fraction) 
    175            nanos = int(fraction) * (10**scale) 
    176        return cls( 
    177            bare.year, 
    178            bare.month, 
    179            bare.day, 
    180            bare.hour, 
    181            bare.minute, 
    182            bare.second, 
    183            nanosecond=nanos, 
    184            tzinfo=datetime.timezone.utc, 
    185        ) 
    186 
    187    def timestamp_pb(self): 
    188        """Return a timestamp message. 
    189 
    190        Returns: 
    191            (:class:`~google.protobuf.timestamp_pb2.Timestamp`): Timestamp message 
    192        """ 
    193        inst = ( 
    194            self 
    195            if self.tzinfo is not None 
    196            else self.replace(tzinfo=datetime.timezone.utc) 
    197        ) 
    198        delta = inst - _UTC_EPOCH 
    199        seconds = int(delta.total_seconds()) 
    200        nanos = self._nanosecond or self.microsecond * 1000 
    201        return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) 
    202 
    203    @classmethod 
    204    def from_timestamp_pb(cls, stamp): 
    205        """Parse RFC3339-compliant timestamp, preserving nanoseconds. 
    206 
    207        Args: 
    208            stamp (:class:`~google.protobuf.timestamp_pb2.Timestamp`): timestamp message 
    209 
    210        Returns: 
    211            :class:`DatetimeWithNanoseconds`: 
    212                an instance matching the timestamp message 
    213        """ 
    214        microseconds = int(stamp.seconds * 1e6) 
    215        bare = _from_microseconds(microseconds) 
    216        return cls( 
    217            bare.year, 
    218            bare.month, 
    219            bare.day, 
    220            bare.hour, 
    221            bare.minute, 
    222            bare.second, 
    223            nanosecond=stamp.nanos, 
    224            tzinfo=datetime.timezone.utc, 
    225        )