Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/protobuf/internal/well_known_types.py: 29%
291 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1# Protocol Buffers - Google's data interchange format
2# Copyright 2008 Google Inc. All rights reserved.
3#
4# Use of this source code is governed by a BSD-style
5# license that can be found in the LICENSE file or at
6# https://developers.google.com/open-source/licenses/bsd
8"""Contains well known classes.
10This files defines well known classes which need extra maintenance including:
11 - Any
12 - Duration
13 - FieldMask
14 - Struct
15 - Timestamp
16"""
18__author__ = 'jieluo@google.com (Jie Luo)'
20import calendar
21import collections.abc
22import datetime
24from google.protobuf.internal import field_mask
26FieldMask = field_mask.FieldMask
28_TIMESTAMPFOMAT = '%Y-%m-%dT%H:%M:%S'
29_NANOS_PER_SECOND = 1000000000
30_NANOS_PER_MILLISECOND = 1000000
31_NANOS_PER_MICROSECOND = 1000
32_MILLIS_PER_SECOND = 1000
33_MICROS_PER_SECOND = 1000000
34_SECONDS_PER_DAY = 24 * 3600
35_DURATION_SECONDS_MAX = 315576000000
37_EPOCH_DATETIME_NAIVE = datetime.datetime(1970, 1, 1, tzinfo=None)
38_EPOCH_DATETIME_AWARE = _EPOCH_DATETIME_NAIVE.replace(
39 tzinfo=datetime.timezone.utc
40)
43class Any(object):
44 """Class for Any Message type."""
46 __slots__ = ()
48 def Pack(self, msg, type_url_prefix='type.googleapis.com/',
49 deterministic=None):
50 """Packs the specified message into current Any message."""
51 if len(type_url_prefix) < 1 or type_url_prefix[-1] != '/':
52 self.type_url = '%s/%s' % (type_url_prefix, msg.DESCRIPTOR.full_name)
53 else:
54 self.type_url = '%s%s' % (type_url_prefix, msg.DESCRIPTOR.full_name)
55 self.value = msg.SerializeToString(deterministic=deterministic)
57 def Unpack(self, msg):
58 """Unpacks the current Any message into specified message."""
59 descriptor = msg.DESCRIPTOR
60 if not self.Is(descriptor):
61 return False
62 msg.ParseFromString(self.value)
63 return True
65 def TypeName(self):
66 """Returns the protobuf type name of the inner message."""
67 # Only last part is to be used: b/25630112
68 return self.type_url.split('/')[-1]
70 def Is(self, descriptor):
71 """Checks if this Any represents the given protobuf type."""
72 return '/' in self.type_url and self.TypeName() == descriptor.full_name
75class Timestamp(object):
76 """Class for Timestamp message type."""
78 __slots__ = ()
80 def ToJsonString(self):
81 """Converts Timestamp to RFC 3339 date string format.
83 Returns:
84 A string converted from timestamp. The string is always Z-normalized
85 and uses 3, 6 or 9 fractional digits as required to represent the
86 exact time. Example of the return format: '1972-01-01T10:00:20.021Z'
87 """
88 nanos = self.nanos % _NANOS_PER_SECOND
89 total_sec = self.seconds + (self.nanos - nanos) // _NANOS_PER_SECOND
90 seconds = total_sec % _SECONDS_PER_DAY
91 days = (total_sec - seconds) // _SECONDS_PER_DAY
92 dt = datetime.datetime(1970, 1, 1) + datetime.timedelta(days, seconds)
94 result = dt.isoformat()
95 if (nanos % 1e9) == 0:
96 # If there are 0 fractional digits, the fractional
97 # point '.' should be omitted when serializing.
98 return result + 'Z'
99 if (nanos % 1e6) == 0:
100 # Serialize 3 fractional digits.
101 return result + '.%03dZ' % (nanos / 1e6)
102 if (nanos % 1e3) == 0:
103 # Serialize 6 fractional digits.
104 return result + '.%06dZ' % (nanos / 1e3)
105 # Serialize 9 fractional digits.
106 return result + '.%09dZ' % nanos
108 def FromJsonString(self, value):
109 """Parse a RFC 3339 date string format to Timestamp.
111 Args:
112 value: A date string. Any fractional digits (or none) and any offset are
113 accepted as long as they fit into nano-seconds precision.
114 Example of accepted format: '1972-01-01T10:00:20.021-05:00'
116 Raises:
117 ValueError: On parsing problems.
118 """
119 if not isinstance(value, str):
120 raise ValueError('Timestamp JSON value not a string: {!r}'.format(value))
121 timezone_offset = value.find('Z')
122 if timezone_offset == -1:
123 timezone_offset = value.find('+')
124 if timezone_offset == -1:
125 timezone_offset = value.rfind('-')
126 if timezone_offset == -1:
127 raise ValueError(
128 'Failed to parse timestamp: missing valid timezone offset.')
129 time_value = value[0:timezone_offset]
130 # Parse datetime and nanos.
131 point_position = time_value.find('.')
132 if point_position == -1:
133 second_value = time_value
134 nano_value = ''
135 else:
136 second_value = time_value[:point_position]
137 nano_value = time_value[point_position + 1:]
138 if 't' in second_value:
139 raise ValueError(
140 'time data \'{0}\' does not match format \'%Y-%m-%dT%H:%M:%S\', '
141 'lowercase \'t\' is not accepted'.format(second_value))
142 date_object = datetime.datetime.strptime(second_value, _TIMESTAMPFOMAT)
143 td = date_object - datetime.datetime(1970, 1, 1)
144 seconds = td.seconds + td.days * _SECONDS_PER_DAY
145 if len(nano_value) > 9:
146 raise ValueError(
147 'Failed to parse Timestamp: nanos {0} more than '
148 '9 fractional digits.'.format(nano_value))
149 if nano_value:
150 nanos = round(float('0.' + nano_value) * 1e9)
151 else:
152 nanos = 0
153 # Parse timezone offsets.
154 if value[timezone_offset] == 'Z':
155 if len(value) != timezone_offset + 1:
156 raise ValueError('Failed to parse timestamp: invalid trailing'
157 ' data {0}.'.format(value))
158 else:
159 timezone = value[timezone_offset:]
160 pos = timezone.find(':')
161 if pos == -1:
162 raise ValueError(
163 'Invalid timezone offset value: {0}.'.format(timezone))
164 if timezone[0] == '+':
165 seconds -= (int(timezone[1:pos])*60+int(timezone[pos+1:]))*60
166 else:
167 seconds += (int(timezone[1:pos])*60+int(timezone[pos+1:]))*60
168 # Set seconds and nanos
169 self.seconds = int(seconds)
170 self.nanos = int(nanos)
172 def GetCurrentTime(self):
173 """Get the current UTC into Timestamp."""
174 self.FromDatetime(datetime.datetime.utcnow())
176 def ToNanoseconds(self):
177 """Converts Timestamp to nanoseconds since epoch."""
178 return self.seconds * _NANOS_PER_SECOND + self.nanos
180 def ToMicroseconds(self):
181 """Converts Timestamp to microseconds since epoch."""
182 return (self.seconds * _MICROS_PER_SECOND +
183 self.nanos // _NANOS_PER_MICROSECOND)
185 def ToMilliseconds(self):
186 """Converts Timestamp to milliseconds since epoch."""
187 return (self.seconds * _MILLIS_PER_SECOND +
188 self.nanos // _NANOS_PER_MILLISECOND)
190 def ToSeconds(self):
191 """Converts Timestamp to seconds since epoch."""
192 return self.seconds
194 def FromNanoseconds(self, nanos):
195 """Converts nanoseconds since epoch to Timestamp."""
196 self.seconds = nanos // _NANOS_PER_SECOND
197 self.nanos = nanos % _NANOS_PER_SECOND
199 def FromMicroseconds(self, micros):
200 """Converts microseconds since epoch to Timestamp."""
201 self.seconds = micros // _MICROS_PER_SECOND
202 self.nanos = (micros % _MICROS_PER_SECOND) * _NANOS_PER_MICROSECOND
204 def FromMilliseconds(self, millis):
205 """Converts milliseconds since epoch to Timestamp."""
206 self.seconds = millis // _MILLIS_PER_SECOND
207 self.nanos = (millis % _MILLIS_PER_SECOND) * _NANOS_PER_MILLISECOND
209 def FromSeconds(self, seconds):
210 """Converts seconds since epoch to Timestamp."""
211 self.seconds = seconds
212 self.nanos = 0
214 def ToDatetime(self, tzinfo=None):
215 """Converts Timestamp to a datetime.
217 Args:
218 tzinfo: A datetime.tzinfo subclass; defaults to None.
220 Returns:
221 If tzinfo is None, returns a timezone-naive UTC datetime (with no timezone
222 information, i.e. not aware that it's UTC).
224 Otherwise, returns a timezone-aware datetime in the input timezone.
225 """
226 # Using datetime.fromtimestamp for this would avoid constructing an extra
227 # timedelta object and possibly an extra datetime. Unfortuantely, that has
228 # the disadvantage of not handling the full precision (on all platforms, see
229 # https://github.com/python/cpython/issues/109849) or full range (on some
230 # platforms, see https://github.com/python/cpython/issues/110042) of
231 # datetime.
232 delta = datetime.timedelta(
233 seconds=self.seconds,
234 microseconds=_RoundTowardZero(self.nanos, _NANOS_PER_MICROSECOND),
235 )
236 if tzinfo is None:
237 return _EPOCH_DATETIME_NAIVE + delta
238 else:
239 # Note the tz conversion has to come after the timedelta arithmetic.
240 return (_EPOCH_DATETIME_AWARE + delta).astimezone(tzinfo)
242 def FromDatetime(self, dt):
243 """Converts datetime to Timestamp.
245 Args:
246 dt: A datetime. If it's timezone-naive, it's assumed to be in UTC.
247 """
248 # Using this guide: http://wiki.python.org/moin/WorkingWithTime
249 # And this conversion guide: http://docs.python.org/library/time.html
251 # Turn the date parameter into a tuple (struct_time) that can then be
252 # manipulated into a long value of seconds. During the conversion from
253 # struct_time to long, the source date in UTC, and so it follows that the
254 # correct transformation is calendar.timegm()
255 self.seconds = calendar.timegm(dt.utctimetuple())
256 self.nanos = dt.microsecond * _NANOS_PER_MICROSECOND
259class Duration(object):
260 """Class for Duration message type."""
262 __slots__ = ()
264 def ToJsonString(self):
265 """Converts Duration to string format.
267 Returns:
268 A string converted from self. The string format will contains
269 3, 6, or 9 fractional digits depending on the precision required to
270 represent the exact Duration value. For example: "1s", "1.010s",
271 "1.000000100s", "-3.100s"
272 """
273 _CheckDurationValid(self.seconds, self.nanos)
274 if self.seconds < 0 or self.nanos < 0:
275 result = '-'
276 seconds = - self.seconds + int((0 - self.nanos) // 1e9)
277 nanos = (0 - self.nanos) % 1e9
278 else:
279 result = ''
280 seconds = self.seconds + int(self.nanos // 1e9)
281 nanos = self.nanos % 1e9
282 result += '%d' % seconds
283 if (nanos % 1e9) == 0:
284 # If there are 0 fractional digits, the fractional
285 # point '.' should be omitted when serializing.
286 return result + 's'
287 if (nanos % 1e6) == 0:
288 # Serialize 3 fractional digits.
289 return result + '.%03ds' % (nanos / 1e6)
290 if (nanos % 1e3) == 0:
291 # Serialize 6 fractional digits.
292 return result + '.%06ds' % (nanos / 1e3)
293 # Serialize 9 fractional digits.
294 return result + '.%09ds' % nanos
296 def FromJsonString(self, value):
297 """Converts a string to Duration.
299 Args:
300 value: A string to be converted. The string must end with 's'. Any
301 fractional digits (or none) are accepted as long as they fit into
302 precision. For example: "1s", "1.01s", "1.0000001s", "-3.100s
304 Raises:
305 ValueError: On parsing problems.
306 """
307 if not isinstance(value, str):
308 raise ValueError('Duration JSON value not a string: {!r}'.format(value))
309 if len(value) < 1 or value[-1] != 's':
310 raise ValueError(
311 'Duration must end with letter "s": {0}.'.format(value))
312 try:
313 pos = value.find('.')
314 if pos == -1:
315 seconds = int(value[:-1])
316 nanos = 0
317 else:
318 seconds = int(value[:pos])
319 if value[0] == '-':
320 nanos = int(round(float('-0{0}'.format(value[pos: -1])) *1e9))
321 else:
322 nanos = int(round(float('0{0}'.format(value[pos: -1])) *1e9))
323 _CheckDurationValid(seconds, nanos)
324 self.seconds = seconds
325 self.nanos = nanos
326 except ValueError as e:
327 raise ValueError(
328 'Couldn\'t parse duration: {0} : {1}.'.format(value, e))
330 def ToNanoseconds(self):
331 """Converts a Duration to nanoseconds."""
332 return self.seconds * _NANOS_PER_SECOND + self.nanos
334 def ToMicroseconds(self):
335 """Converts a Duration to microseconds."""
336 micros = _RoundTowardZero(self.nanos, _NANOS_PER_MICROSECOND)
337 return self.seconds * _MICROS_PER_SECOND + micros
339 def ToMilliseconds(self):
340 """Converts a Duration to milliseconds."""
341 millis = _RoundTowardZero(self.nanos, _NANOS_PER_MILLISECOND)
342 return self.seconds * _MILLIS_PER_SECOND + millis
344 def ToSeconds(self):
345 """Converts a Duration to seconds."""
346 return self.seconds
348 def FromNanoseconds(self, nanos):
349 """Converts nanoseconds to Duration."""
350 self._NormalizeDuration(nanos // _NANOS_PER_SECOND,
351 nanos % _NANOS_PER_SECOND)
353 def FromMicroseconds(self, micros):
354 """Converts microseconds to Duration."""
355 self._NormalizeDuration(
356 micros // _MICROS_PER_SECOND,
357 (micros % _MICROS_PER_SECOND) * _NANOS_PER_MICROSECOND)
359 def FromMilliseconds(self, millis):
360 """Converts milliseconds to Duration."""
361 self._NormalizeDuration(
362 millis // _MILLIS_PER_SECOND,
363 (millis % _MILLIS_PER_SECOND) * _NANOS_PER_MILLISECOND)
365 def FromSeconds(self, seconds):
366 """Converts seconds to Duration."""
367 self.seconds = seconds
368 self.nanos = 0
370 def ToTimedelta(self):
371 """Converts Duration to timedelta."""
372 return datetime.timedelta(
373 seconds=self.seconds, microseconds=_RoundTowardZero(
374 self.nanos, _NANOS_PER_MICROSECOND))
376 def FromTimedelta(self, td):
377 """Converts timedelta to Duration."""
378 self._NormalizeDuration(td.seconds + td.days * _SECONDS_PER_DAY,
379 td.microseconds * _NANOS_PER_MICROSECOND)
381 def _NormalizeDuration(self, seconds, nanos):
382 """Set Duration by seconds and nanos."""
383 # Force nanos to be negative if the duration is negative.
384 if seconds < 0 and nanos > 0:
385 seconds += 1
386 nanos -= _NANOS_PER_SECOND
387 self.seconds = seconds
388 self.nanos = nanos
391def _CheckDurationValid(seconds, nanos):
392 if seconds < -_DURATION_SECONDS_MAX or seconds > _DURATION_SECONDS_MAX:
393 raise ValueError(
394 'Duration is not valid: Seconds {0} must be in range '
395 '[-315576000000, 315576000000].'.format(seconds))
396 if nanos <= -_NANOS_PER_SECOND or nanos >= _NANOS_PER_SECOND:
397 raise ValueError(
398 'Duration is not valid: Nanos {0} must be in range '
399 '[-999999999, 999999999].'.format(nanos))
400 if (nanos < 0 and seconds > 0) or (nanos > 0 and seconds < 0):
401 raise ValueError(
402 'Duration is not valid: Sign mismatch.')
405def _RoundTowardZero(value, divider):
406 """Truncates the remainder part after division."""
407 # For some languages, the sign of the remainder is implementation
408 # dependent if any of the operands is negative. Here we enforce
409 # "rounded toward zero" semantics. For example, for (-5) / 2 an
410 # implementation may give -3 as the result with the remainder being
411 # 1. This function ensures we always return -2 (closer to zero).
412 result = value // divider
413 remainder = value % divider
414 if result < 0 and remainder > 0:
415 return result + 1
416 else:
417 return result
420def _SetStructValue(struct_value, value):
421 if value is None:
422 struct_value.null_value = 0
423 elif isinstance(value, bool):
424 # Note: this check must come before the number check because in Python
425 # True and False are also considered numbers.
426 struct_value.bool_value = value
427 elif isinstance(value, str):
428 struct_value.string_value = value
429 elif isinstance(value, (int, float)):
430 struct_value.number_value = value
431 elif isinstance(value, (dict, Struct)):
432 struct_value.struct_value.Clear()
433 struct_value.struct_value.update(value)
434 elif isinstance(value, (list, tuple, ListValue)):
435 struct_value.list_value.Clear()
436 struct_value.list_value.extend(value)
437 else:
438 raise ValueError('Unexpected type')
441def _GetStructValue(struct_value):
442 which = struct_value.WhichOneof('kind')
443 if which == 'struct_value':
444 return struct_value.struct_value
445 elif which == 'null_value':
446 return None
447 elif which == 'number_value':
448 return struct_value.number_value
449 elif which == 'string_value':
450 return struct_value.string_value
451 elif which == 'bool_value':
452 return struct_value.bool_value
453 elif which == 'list_value':
454 return struct_value.list_value
455 elif which is None:
456 raise ValueError('Value not set')
459class Struct(object):
460 """Class for Struct message type."""
462 __slots__ = ()
464 def __getitem__(self, key):
465 return _GetStructValue(self.fields[key])
467 def __contains__(self, item):
468 return item in self.fields
470 def __setitem__(self, key, value):
471 _SetStructValue(self.fields[key], value)
473 def __delitem__(self, key):
474 del self.fields[key]
476 def __len__(self):
477 return len(self.fields)
479 def __iter__(self):
480 return iter(self.fields)
482 def keys(self): # pylint: disable=invalid-name
483 return self.fields.keys()
485 def values(self): # pylint: disable=invalid-name
486 return [self[key] for key in self]
488 def items(self): # pylint: disable=invalid-name
489 return [(key, self[key]) for key in self]
491 def get_or_create_list(self, key):
492 """Returns a list for this key, creating if it didn't exist already."""
493 if not self.fields[key].HasField('list_value'):
494 # Clear will mark list_value modified which will indeed create a list.
495 self.fields[key].list_value.Clear()
496 return self.fields[key].list_value
498 def get_or_create_struct(self, key):
499 """Returns a struct for this key, creating if it didn't exist already."""
500 if not self.fields[key].HasField('struct_value'):
501 # Clear will mark struct_value modified which will indeed create a struct.
502 self.fields[key].struct_value.Clear()
503 return self.fields[key].struct_value
505 def update(self, dictionary): # pylint: disable=invalid-name
506 for key, value in dictionary.items():
507 _SetStructValue(self.fields[key], value)
509collections.abc.MutableMapping.register(Struct)
512class ListValue(object):
513 """Class for ListValue message type."""
515 __slots__ = ()
517 def __len__(self):
518 return len(self.values)
520 def append(self, value):
521 _SetStructValue(self.values.add(), value)
523 def extend(self, elem_seq):
524 for value in elem_seq:
525 self.append(value)
527 def __getitem__(self, index):
528 """Retrieves item by the specified index."""
529 return _GetStructValue(self.values.__getitem__(index))
531 def __setitem__(self, index, value):
532 _SetStructValue(self.values.__getitem__(index), value)
534 def __delitem__(self, key):
535 del self.values[key]
537 def items(self):
538 for i in range(len(self)):
539 yield self[i]
541 def add_struct(self):
542 """Appends and returns a struct value as the next value in the list."""
543 struct_value = self.values.add().struct_value
544 # Clear will mark struct_value modified which will indeed create a struct.
545 struct_value.Clear()
546 return struct_value
548 def add_list(self):
549 """Appends and returns a list value as the next value in the list."""
550 list_value = self.values.add().list_value
551 # Clear will mark list_value modified which will indeed create a list.
552 list_value.Clear()
553 return list_value
555collections.abc.MutableSequence.register(ListValue)
558# LINT.IfChange(wktbases)
559WKTBASES = {
560 'google.protobuf.Any': Any,
561 'google.protobuf.Duration': Duration,
562 'google.protobuf.FieldMask': FieldMask,
563 'google.protobuf.ListValue': ListValue,
564 'google.protobuf.Struct': Struct,
565 'google.protobuf.Timestamp': Timestamp,
566}
567# LINT.ThenChange(//depot/google.protobuf/compiler/python/pyi_generator.cc:wktbases)