1# Copyright 2017 Google LLC 
    2# 
    3# Licensed under the Apache License, Version 2.0 (the "License"); 
    4# you may not use this file except in compliance with the License. 
    5# You may obtain a copy of the License at 
    6# 
    7#     http://www.apache.org/licenses/LICENSE-2.0 
    8# 
    9# Unless required by applicable law or agreed to in writing, software 
    10# distributed under the License is distributed on an "AS IS" BASIS, 
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
    12# See the License for the specific language governing permissions and 
    13# limitations under the License. 
    14 
    15"""Helpers for :mod:`datetime`.""" 
    16 
    17import calendar 
    18import datetime 
    19import re 
    20 
    21from google.protobuf import timestamp_pb2 
    22 
    23 
# Timezone-aware datetime for the unix epoch (1970-01-01T00:00:00 UTC).
_UTC_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
# strftime format: whole seconds, microsecond fraction, literal "Z" suffix.
_RFC3339_MICROS = "%Y-%m-%dT%H:%M:%S.%fZ"
# strptime/strftime format: whole seconds only, no fraction and no zone suffix.
_RFC3339_NO_FRACTION = "%Y-%m-%dT%H:%M:%S"
# datetime.strptime cannot handle nanosecond precision, so stamps are parsed
# with this regex instead.  Group ``no_fraction`` holds the whole-second part
# (parseable with ``_RFC3339_NO_FRACTION``); the optional group ``nanos`` holds
# one to nine fractional digits; a literal "Z" must follow.
_RFC3339_NANOS = re.compile(
    r"""
    (?P<no_fraction>
        \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}  # YYYY-MM-DDTHH:MM:SS
    )
    (                                        # Optional decimal part
     \.                                      # decimal point
     (?P<nanos>\d{1,9})                      # nanoseconds, maybe truncated
    )?
    Z                                        # Zulu
""",
    re.VERBOSE,
)
    41 
    42 
    43def utcnow(): 
    44    """A :meth:`datetime.datetime.utcnow()` alias to allow mocking in tests.""" 
    45    return datetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None) 
    46 
    47 
    48def to_milliseconds(value): 
    49    """Convert a zone-aware datetime to milliseconds since the unix epoch. 
    50 
    51    Args: 
    52        value (datetime.datetime): The datetime to covert. 
    53 
    54    Returns: 
    55        int: Milliseconds since the unix epoch. 
    56    """ 
    57    micros = to_microseconds(value) 
    58    return micros // 1000 
    59 
    60 
    61def from_microseconds(value): 
    62    """Convert timestamp in microseconds since the unix epoch to datetime. 
    63 
    64    Args: 
    65        value (float): The timestamp to convert, in microseconds. 
    66 
    67    Returns: 
    68        datetime.datetime: The datetime object equivalent to the timestamp in 
    69            UTC. 
    70    """ 
    71    return _UTC_EPOCH + datetime.timedelta(microseconds=value) 
    72 
    73 
    74def to_microseconds(value): 
    75    """Convert a datetime to microseconds since the unix epoch. 
    76 
    77    Args: 
    78        value (datetime.datetime): The datetime to covert. 
    79 
    80    Returns: 
    81        int: Microseconds since the unix epoch. 
    82    """ 
    83    if not value.tzinfo: 
    84        value = value.replace(tzinfo=datetime.timezone.utc) 
    85    # Regardless of what timezone is on the value, convert it to UTC. 
    86    value = value.astimezone(datetime.timezone.utc) 
    87    # Convert the datetime to a microsecond timestamp. 
    88    return int(calendar.timegm(value.timetuple()) * 1e6) + value.microsecond 
    89 
    90 
    91def from_iso8601_date(value): 
    92    """Convert a ISO8601 date string to a date. 
    93 
    94    Args: 
    95        value (str): The ISO8601 date string. 
    96 
    97    Returns: 
    98        datetime.date: A date equivalent to the date string. 
    99    """ 
    100    return datetime.datetime.strptime(value, "%Y-%m-%d").date() 
    101 
    102 
    103def from_iso8601_time(value): 
    104    """Convert a zoneless ISO8601 time string to a time. 
    105 
    106    Args: 
    107        value (str): The ISO8601 time string. 
    108 
    109    Returns: 
    110        datetime.time: A time equivalent to the time string. 
    111    """ 
    112    return datetime.datetime.strptime(value, "%H:%M:%S").time() 
    113 
    114 
    115def from_rfc3339(value): 
    116    """Convert an RFC3339-format timestamp to a native datetime. 
    117 
    118    Supported formats include those without fractional seconds, or with 
    119    any fraction up to nanosecond precision. 
    120 
    121    .. note:: 
    122        Python datetimes do not support nanosecond precision; this function 
    123        therefore truncates such values to microseconds. 
    124 
    125    Args: 
    126        value (str): The RFC3339 string to convert. 
    127 
    128    Returns: 
    129        datetime.datetime: The datetime object equivalent to the timestamp 
    130        in UTC. 
    131 
    132    Raises: 
    133        ValueError: If the timestamp does not match the RFC3339 
    134            regular expression. 
    135    """ 
    136    with_nanos = _RFC3339_NANOS.match(value) 
    137 
    138    if with_nanos is None: 
    139        raise ValueError( 
    140            "Timestamp: {!r}, does not match pattern: {!r}".format( 
    141                value, _RFC3339_NANOS.pattern 
    142            ) 
    143        ) 
    144 
    145    bare_seconds = datetime.datetime.strptime( 
    146        with_nanos.group("no_fraction"), _RFC3339_NO_FRACTION 
    147    ) 
    148    fraction = with_nanos.group("nanos") 
    149 
    150    if fraction is None: 
    151        micros = 0 
    152    else: 
    153        scale = 9 - len(fraction) 
    154        nanos = int(fraction) * (10**scale) 
    155        micros = nanos // 1000 
    156 
    157    return bare_seconds.replace(microsecond=micros, tzinfo=datetime.timezone.utc) 
    158 
    159 
# Deprecated name: ``from_rfc3339_nanos`` is now an alias of ``from_rfc3339``.
from_rfc3339_nanos = from_rfc3339
    161 
    162 
    163def to_rfc3339(value, ignore_zone=True): 
    164    """Convert a datetime to an RFC3339 timestamp string. 
    165 
    166    Args: 
    167        value (datetime.datetime): 
    168            The datetime object to be converted to a string. 
    169        ignore_zone (bool): If True, then the timezone (if any) of the 
    170            datetime object is ignored and the datetime is treated as UTC. 
    171 
    172    Returns: 
    173        str: The RFC3339 formatted string representing the datetime. 
    174    """ 
    175    if not ignore_zone and value.tzinfo is not None: 
    176        # Convert to UTC and remove the time zone info. 
    177        value = value.replace(tzinfo=None) - value.utcoffset() 
    178 
    179    return value.strftime(_RFC3339_MICROS) 
    180 
    181 
    182class DatetimeWithNanoseconds(datetime.datetime): 
    183    """Track nanosecond in addition to normal datetime attrs. 
    184 
    185    Nanosecond can be passed only as a keyword argument. 
    186    """ 
    187 
    188    __slots__ = ("_nanosecond",) 
    189 
    190    # pylint: disable=arguments-differ 
    191    def __new__(cls, *args, **kw): 
    192        nanos = kw.pop("nanosecond", 0) 
    193        if nanos > 0: 
    194            if "microsecond" in kw: 
    195                raise TypeError("Specify only one of 'microsecond' or 'nanosecond'") 
    196            kw["microsecond"] = nanos // 1000 
    197        inst = datetime.datetime.__new__(cls, *args, **kw) 
    198        inst._nanosecond = nanos or 0 
    199        return inst 
    200 
    201    # pylint: disable=arguments-differ 
    202 
    203    @property 
    204    def nanosecond(self): 
    205        """Read-only: nanosecond precision.""" 
    206        return self._nanosecond 
    207 
    208    def rfc3339(self): 
    209        """Return an RFC3339-compliant timestamp. 
    210 
    211        Returns: 
    212            (str): Timestamp string according to RFC3339 spec. 
    213        """ 
    214        if self._nanosecond == 0: 
    215            return to_rfc3339(self) 
    216        nanos = str(self._nanosecond).rjust(9, "0").rstrip("0") 
    217        return "{}.{}Z".format(self.strftime(_RFC3339_NO_FRACTION), nanos) 
    218 
    219    @classmethod 
    220    def from_rfc3339(cls, stamp): 
    221        """Parse RFC3339-compliant timestamp, preserving nanoseconds. 
    222 
    223        Args: 
    224            stamp (str): RFC3339 stamp, with up to nanosecond precision 
    225 
    226        Returns: 
    227            :class:`DatetimeWithNanoseconds`: 
    228                an instance matching the timestamp string 
    229 
    230        Raises: 
    231            ValueError: if `stamp` does not match the expected format 
    232        """ 
    233        with_nanos = _RFC3339_NANOS.match(stamp) 
    234        if with_nanos is None: 
    235            raise ValueError( 
    236                "Timestamp: {}, does not match pattern: {}".format( 
    237                    stamp, _RFC3339_NANOS.pattern 
    238                ) 
    239            ) 
    240        bare = datetime.datetime.strptime( 
    241            with_nanos.group("no_fraction"), _RFC3339_NO_FRACTION 
    242        ) 
    243        fraction = with_nanos.group("nanos") 
    244        if fraction is None: 
    245            nanos = 0 
    246        else: 
    247            scale = 9 - len(fraction) 
    248            nanos = int(fraction) * (10**scale) 
    249        return cls( 
    250            bare.year, 
    251            bare.month, 
    252            bare.day, 
    253            bare.hour, 
    254            bare.minute, 
    255            bare.second, 
    256            nanosecond=nanos, 
    257            tzinfo=datetime.timezone.utc, 
    258        ) 
    259 
    260    def timestamp_pb(self): 
    261        """Return a timestamp message. 
    262 
    263        Returns: 
    264            (:class:`~google.protobuf.timestamp_pb2.Timestamp`): Timestamp message 
    265        """ 
    266        inst = ( 
    267            self 
    268            if self.tzinfo is not None 
    269            else self.replace(tzinfo=datetime.timezone.utc) 
    270        ) 
    271        delta = inst - _UTC_EPOCH 
    272        seconds = int(delta.total_seconds()) 
    273        nanos = self._nanosecond or self.microsecond * 1000 
    274        return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) 
    275 
    276    @classmethod 
    277    def from_timestamp_pb(cls, stamp): 
    278        """Parse RFC3339-compliant timestamp, preserving nanoseconds. 
    279 
    280        Args: 
    281            stamp (:class:`~google.protobuf.timestamp_pb2.Timestamp`): timestamp message 
    282 
    283        Returns: 
    284            :class:`DatetimeWithNanoseconds`: 
    285                an instance matching the timestamp message 
    286        """ 
    287        microseconds = int(stamp.seconds * 1e6) 
    288        bare = from_microseconds(microseconds) 
    289        return cls( 
    290            bare.year, 
    291            bare.month, 
    292            bare.day, 
    293            bare.hour, 
    294            bare.minute, 
    295            bare.second, 
    296            nanosecond=stamp.nanos, 
    297            tzinfo=datetime.timezone.utc, 
    298        )