Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/google/protobuf/internal/well_known_types.py: 29%

291 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1# Protocol Buffers - Google's data interchange format 

2# Copyright 2008 Google Inc. All rights reserved. 

3# 

4# Use of this source code is governed by a BSD-style 

5# license that can be found in the LICENSE file or at 

6# https://developers.google.com/open-source/licenses/bsd 

7 

8"""Contains well known classes. 

9 

10This files defines well known classes which need extra maintenance including: 

11 - Any 

12 - Duration 

13 - FieldMask 

14 - Struct 

15 - Timestamp 

16""" 

17 

18__author__ = 'jieluo@google.com (Jie Luo)' 

19 

20import calendar 

21import collections.abc 

22import datetime 

23 

24from google.protobuf.internal import field_mask 

25 

26FieldMask = field_mask.FieldMask 

27 

28_TIMESTAMPFOMAT = '%Y-%m-%dT%H:%M:%S' 

29_NANOS_PER_SECOND = 1000000000 

30_NANOS_PER_MILLISECOND = 1000000 

31_NANOS_PER_MICROSECOND = 1000 

32_MILLIS_PER_SECOND = 1000 

33_MICROS_PER_SECOND = 1000000 

34_SECONDS_PER_DAY = 24 * 3600 

35_DURATION_SECONDS_MAX = 315576000000 

36 

37_EPOCH_DATETIME_NAIVE = datetime.datetime(1970, 1, 1, tzinfo=None) 

38_EPOCH_DATETIME_AWARE = _EPOCH_DATETIME_NAIVE.replace( 

39 tzinfo=datetime.timezone.utc 

40) 

41 

42 

43class Any(object): 

44 """Class for Any Message type.""" 

45 

46 __slots__ = () 

47 

48 def Pack(self, msg, type_url_prefix='type.googleapis.com/', 

49 deterministic=None): 

50 """Packs the specified message into current Any message.""" 

51 if len(type_url_prefix) < 1 or type_url_prefix[-1] != '/': 

52 self.type_url = '%s/%s' % (type_url_prefix, msg.DESCRIPTOR.full_name) 

53 else: 

54 self.type_url = '%s%s' % (type_url_prefix, msg.DESCRIPTOR.full_name) 

55 self.value = msg.SerializeToString(deterministic=deterministic) 

56 

57 def Unpack(self, msg): 

58 """Unpacks the current Any message into specified message.""" 

59 descriptor = msg.DESCRIPTOR 

60 if not self.Is(descriptor): 

61 return False 

62 msg.ParseFromString(self.value) 

63 return True 

64 

65 def TypeName(self): 

66 """Returns the protobuf type name of the inner message.""" 

67 # Only last part is to be used: b/25630112 

68 return self.type_url.split('/')[-1] 

69 

70 def Is(self, descriptor): 

71 """Checks if this Any represents the given protobuf type.""" 

72 return '/' in self.type_url and self.TypeName() == descriptor.full_name 

73 

74 

75class Timestamp(object): 

76 """Class for Timestamp message type.""" 

77 

78 __slots__ = () 

79 

80 def ToJsonString(self): 

81 """Converts Timestamp to RFC 3339 date string format. 

82 

83 Returns: 

84 A string converted from timestamp. The string is always Z-normalized 

85 and uses 3, 6 or 9 fractional digits as required to represent the 

86 exact time. Example of the return format: '1972-01-01T10:00:20.021Z' 

87 """ 

88 nanos = self.nanos % _NANOS_PER_SECOND 

89 total_sec = self.seconds + (self.nanos - nanos) // _NANOS_PER_SECOND 

90 seconds = total_sec % _SECONDS_PER_DAY 

91 days = (total_sec - seconds) // _SECONDS_PER_DAY 

92 dt = datetime.datetime(1970, 1, 1) + datetime.timedelta(days, seconds) 

93 

94 result = dt.isoformat() 

95 if (nanos % 1e9) == 0: 

96 # If there are 0 fractional digits, the fractional 

97 # point '.' should be omitted when serializing. 

98 return result + 'Z' 

99 if (nanos % 1e6) == 0: 

100 # Serialize 3 fractional digits. 

101 return result + '.%03dZ' % (nanos / 1e6) 

102 if (nanos % 1e3) == 0: 

103 # Serialize 6 fractional digits. 

104 return result + '.%06dZ' % (nanos / 1e3) 

105 # Serialize 9 fractional digits. 

106 return result + '.%09dZ' % nanos 

107 

108 def FromJsonString(self, value): 

109 """Parse a RFC 3339 date string format to Timestamp. 

110 

111 Args: 

112 value: A date string. Any fractional digits (or none) and any offset are 

113 accepted as long as they fit into nano-seconds precision. 

114 Example of accepted format: '1972-01-01T10:00:20.021-05:00' 

115 

116 Raises: 

117 ValueError: On parsing problems. 

118 """ 

119 if not isinstance(value, str): 

120 raise ValueError('Timestamp JSON value not a string: {!r}'.format(value)) 

121 timezone_offset = value.find('Z') 

122 if timezone_offset == -1: 

123 timezone_offset = value.find('+') 

124 if timezone_offset == -1: 

125 timezone_offset = value.rfind('-') 

126 if timezone_offset == -1: 

127 raise ValueError( 

128 'Failed to parse timestamp: missing valid timezone offset.') 

129 time_value = value[0:timezone_offset] 

130 # Parse datetime and nanos. 

131 point_position = time_value.find('.') 

132 if point_position == -1: 

133 second_value = time_value 

134 nano_value = '' 

135 else: 

136 second_value = time_value[:point_position] 

137 nano_value = time_value[point_position + 1:] 

138 if 't' in second_value: 

139 raise ValueError( 

140 'time data \'{0}\' does not match format \'%Y-%m-%dT%H:%M:%S\', ' 

141 'lowercase \'t\' is not accepted'.format(second_value)) 

142 date_object = datetime.datetime.strptime(second_value, _TIMESTAMPFOMAT) 

143 td = date_object - datetime.datetime(1970, 1, 1) 

144 seconds = td.seconds + td.days * _SECONDS_PER_DAY 

145 if len(nano_value) > 9: 

146 raise ValueError( 

147 'Failed to parse Timestamp: nanos {0} more than ' 

148 '9 fractional digits.'.format(nano_value)) 

149 if nano_value: 

150 nanos = round(float('0.' + nano_value) * 1e9) 

151 else: 

152 nanos = 0 

153 # Parse timezone offsets. 

154 if value[timezone_offset] == 'Z': 

155 if len(value) != timezone_offset + 1: 

156 raise ValueError('Failed to parse timestamp: invalid trailing' 

157 ' data {0}.'.format(value)) 

158 else: 

159 timezone = value[timezone_offset:] 

160 pos = timezone.find(':') 

161 if pos == -1: 

162 raise ValueError( 

163 'Invalid timezone offset value: {0}.'.format(timezone)) 

164 if timezone[0] == '+': 

165 seconds -= (int(timezone[1:pos])*60+int(timezone[pos+1:]))*60 

166 else: 

167 seconds += (int(timezone[1:pos])*60+int(timezone[pos+1:]))*60 

168 # Set seconds and nanos 

169 self.seconds = int(seconds) 

170 self.nanos = int(nanos) 

171 

172 def GetCurrentTime(self): 

173 """Get the current UTC into Timestamp.""" 

174 self.FromDatetime(datetime.datetime.utcnow()) 

175 

176 def ToNanoseconds(self): 

177 """Converts Timestamp to nanoseconds since epoch.""" 

178 return self.seconds * _NANOS_PER_SECOND + self.nanos 

179 

180 def ToMicroseconds(self): 

181 """Converts Timestamp to microseconds since epoch.""" 

182 return (self.seconds * _MICROS_PER_SECOND + 

183 self.nanos // _NANOS_PER_MICROSECOND) 

184 

185 def ToMilliseconds(self): 

186 """Converts Timestamp to milliseconds since epoch.""" 

187 return (self.seconds * _MILLIS_PER_SECOND + 

188 self.nanos // _NANOS_PER_MILLISECOND) 

189 

190 def ToSeconds(self): 

191 """Converts Timestamp to seconds since epoch.""" 

192 return self.seconds 

193 

194 def FromNanoseconds(self, nanos): 

195 """Converts nanoseconds since epoch to Timestamp.""" 

196 self.seconds = nanos // _NANOS_PER_SECOND 

197 self.nanos = nanos % _NANOS_PER_SECOND 

198 

199 def FromMicroseconds(self, micros): 

200 """Converts microseconds since epoch to Timestamp.""" 

201 self.seconds = micros // _MICROS_PER_SECOND 

202 self.nanos = (micros % _MICROS_PER_SECOND) * _NANOS_PER_MICROSECOND 

203 

204 def FromMilliseconds(self, millis): 

205 """Converts milliseconds since epoch to Timestamp.""" 

206 self.seconds = millis // _MILLIS_PER_SECOND 

207 self.nanos = (millis % _MILLIS_PER_SECOND) * _NANOS_PER_MILLISECOND 

208 

209 def FromSeconds(self, seconds): 

210 """Converts seconds since epoch to Timestamp.""" 

211 self.seconds = seconds 

212 self.nanos = 0 

213 

214 def ToDatetime(self, tzinfo=None): 

215 """Converts Timestamp to a datetime. 

216 

217 Args: 

218 tzinfo: A datetime.tzinfo subclass; defaults to None. 

219 

220 Returns: 

221 If tzinfo is None, returns a timezone-naive UTC datetime (with no timezone 

222 information, i.e. not aware that it's UTC). 

223 

224 Otherwise, returns a timezone-aware datetime in the input timezone. 

225 """ 

226 # Using datetime.fromtimestamp for this would avoid constructing an extra 

227 # timedelta object and possibly an extra datetime. Unfortuantely, that has 

228 # the disadvantage of not handling the full precision (on all platforms, see 

229 # https://github.com/python/cpython/issues/109849) or full range (on some 

230 # platforms, see https://github.com/python/cpython/issues/110042) of 

231 # datetime. 

232 delta = datetime.timedelta( 

233 seconds=self.seconds, 

234 microseconds=_RoundTowardZero(self.nanos, _NANOS_PER_MICROSECOND), 

235 ) 

236 if tzinfo is None: 

237 return _EPOCH_DATETIME_NAIVE + delta 

238 else: 

239 # Note the tz conversion has to come after the timedelta arithmetic. 

240 return (_EPOCH_DATETIME_AWARE + delta).astimezone(tzinfo) 

241 

242 def FromDatetime(self, dt): 

243 """Converts datetime to Timestamp. 

244 

245 Args: 

246 dt: A datetime. If it's timezone-naive, it's assumed to be in UTC. 

247 """ 

248 # Using this guide: http://wiki.python.org/moin/WorkingWithTime 

249 # And this conversion guide: http://docs.python.org/library/time.html 

250 

251 # Turn the date parameter into a tuple (struct_time) that can then be 

252 # manipulated into a long value of seconds. During the conversion from 

253 # struct_time to long, the source date in UTC, and so it follows that the 

254 # correct transformation is calendar.timegm() 

255 self.seconds = calendar.timegm(dt.utctimetuple()) 

256 self.nanos = dt.microsecond * _NANOS_PER_MICROSECOND 

257 

258 

259class Duration(object): 

260 """Class for Duration message type.""" 

261 

262 __slots__ = () 

263 

264 def ToJsonString(self): 

265 """Converts Duration to string format. 

266 

267 Returns: 

268 A string converted from self. The string format will contains 

269 3, 6, or 9 fractional digits depending on the precision required to 

270 represent the exact Duration value. For example: "1s", "1.010s", 

271 "1.000000100s", "-3.100s" 

272 """ 

273 _CheckDurationValid(self.seconds, self.nanos) 

274 if self.seconds < 0 or self.nanos < 0: 

275 result = '-' 

276 seconds = - self.seconds + int((0 - self.nanos) // 1e9) 

277 nanos = (0 - self.nanos) % 1e9 

278 else: 

279 result = '' 

280 seconds = self.seconds + int(self.nanos // 1e9) 

281 nanos = self.nanos % 1e9 

282 result += '%d' % seconds 

283 if (nanos % 1e9) == 0: 

284 # If there are 0 fractional digits, the fractional 

285 # point '.' should be omitted when serializing. 

286 return result + 's' 

287 if (nanos % 1e6) == 0: 

288 # Serialize 3 fractional digits. 

289 return result + '.%03ds' % (nanos / 1e6) 

290 if (nanos % 1e3) == 0: 

291 # Serialize 6 fractional digits. 

292 return result + '.%06ds' % (nanos / 1e3) 

293 # Serialize 9 fractional digits. 

294 return result + '.%09ds' % nanos 

295 

296 def FromJsonString(self, value): 

297 """Converts a string to Duration. 

298 

299 Args: 

300 value: A string to be converted. The string must end with 's'. Any 

301 fractional digits (or none) are accepted as long as they fit into 

302 precision. For example: "1s", "1.01s", "1.0000001s", "-3.100s 

303 

304 Raises: 

305 ValueError: On parsing problems. 

306 """ 

307 if not isinstance(value, str): 

308 raise ValueError('Duration JSON value not a string: {!r}'.format(value)) 

309 if len(value) < 1 or value[-1] != 's': 

310 raise ValueError( 

311 'Duration must end with letter "s": {0}.'.format(value)) 

312 try: 

313 pos = value.find('.') 

314 if pos == -1: 

315 seconds = int(value[:-1]) 

316 nanos = 0 

317 else: 

318 seconds = int(value[:pos]) 

319 if value[0] == '-': 

320 nanos = int(round(float('-0{0}'.format(value[pos: -1])) *1e9)) 

321 else: 

322 nanos = int(round(float('0{0}'.format(value[pos: -1])) *1e9)) 

323 _CheckDurationValid(seconds, nanos) 

324 self.seconds = seconds 

325 self.nanos = nanos 

326 except ValueError as e: 

327 raise ValueError( 

328 'Couldn\'t parse duration: {0} : {1}.'.format(value, e)) 

329 

330 def ToNanoseconds(self): 

331 """Converts a Duration to nanoseconds.""" 

332 return self.seconds * _NANOS_PER_SECOND + self.nanos 

333 

334 def ToMicroseconds(self): 

335 """Converts a Duration to microseconds.""" 

336 micros = _RoundTowardZero(self.nanos, _NANOS_PER_MICROSECOND) 

337 return self.seconds * _MICROS_PER_SECOND + micros 

338 

339 def ToMilliseconds(self): 

340 """Converts a Duration to milliseconds.""" 

341 millis = _RoundTowardZero(self.nanos, _NANOS_PER_MILLISECOND) 

342 return self.seconds * _MILLIS_PER_SECOND + millis 

343 

344 def ToSeconds(self): 

345 """Converts a Duration to seconds.""" 

346 return self.seconds 

347 

348 def FromNanoseconds(self, nanos): 

349 """Converts nanoseconds to Duration.""" 

350 self._NormalizeDuration(nanos // _NANOS_PER_SECOND, 

351 nanos % _NANOS_PER_SECOND) 

352 

353 def FromMicroseconds(self, micros): 

354 """Converts microseconds to Duration.""" 

355 self._NormalizeDuration( 

356 micros // _MICROS_PER_SECOND, 

357 (micros % _MICROS_PER_SECOND) * _NANOS_PER_MICROSECOND) 

358 

359 def FromMilliseconds(self, millis): 

360 """Converts milliseconds to Duration.""" 

361 self._NormalizeDuration( 

362 millis // _MILLIS_PER_SECOND, 

363 (millis % _MILLIS_PER_SECOND) * _NANOS_PER_MILLISECOND) 

364 

365 def FromSeconds(self, seconds): 

366 """Converts seconds to Duration.""" 

367 self.seconds = seconds 

368 self.nanos = 0 

369 

370 def ToTimedelta(self): 

371 """Converts Duration to timedelta.""" 

372 return datetime.timedelta( 

373 seconds=self.seconds, microseconds=_RoundTowardZero( 

374 self.nanos, _NANOS_PER_MICROSECOND)) 

375 

376 def FromTimedelta(self, td): 

377 """Converts timedelta to Duration.""" 

378 self._NormalizeDuration(td.seconds + td.days * _SECONDS_PER_DAY, 

379 td.microseconds * _NANOS_PER_MICROSECOND) 

380 

381 def _NormalizeDuration(self, seconds, nanos): 

382 """Set Duration by seconds and nanos.""" 

383 # Force nanos to be negative if the duration is negative. 

384 if seconds < 0 and nanos > 0: 

385 seconds += 1 

386 nanos -= _NANOS_PER_SECOND 

387 self.seconds = seconds 

388 self.nanos = nanos 

389 

390 

391def _CheckDurationValid(seconds, nanos): 

392 if seconds < -_DURATION_SECONDS_MAX or seconds > _DURATION_SECONDS_MAX: 

393 raise ValueError( 

394 'Duration is not valid: Seconds {0} must be in range ' 

395 '[-315576000000, 315576000000].'.format(seconds)) 

396 if nanos <= -_NANOS_PER_SECOND or nanos >= _NANOS_PER_SECOND: 

397 raise ValueError( 

398 'Duration is not valid: Nanos {0} must be in range ' 

399 '[-999999999, 999999999].'.format(nanos)) 

400 if (nanos < 0 and seconds > 0) or (nanos > 0 and seconds < 0): 

401 raise ValueError( 

402 'Duration is not valid: Sign mismatch.') 

403 

404 

405def _RoundTowardZero(value, divider): 

406 """Truncates the remainder part after division.""" 

407 # For some languages, the sign of the remainder is implementation 

408 # dependent if any of the operands is negative. Here we enforce 

409 # "rounded toward zero" semantics. For example, for (-5) / 2 an 

410 # implementation may give -3 as the result with the remainder being 

411 # 1. This function ensures we always return -2 (closer to zero). 

412 result = value // divider 

413 remainder = value % divider 

414 if result < 0 and remainder > 0: 

415 return result + 1 

416 else: 

417 return result 

418 

419 

420def _SetStructValue(struct_value, value): 

421 if value is None: 

422 struct_value.null_value = 0 

423 elif isinstance(value, bool): 

424 # Note: this check must come before the number check because in Python 

425 # True and False are also considered numbers. 

426 struct_value.bool_value = value 

427 elif isinstance(value, str): 

428 struct_value.string_value = value 

429 elif isinstance(value, (int, float)): 

430 struct_value.number_value = value 

431 elif isinstance(value, (dict, Struct)): 

432 struct_value.struct_value.Clear() 

433 struct_value.struct_value.update(value) 

434 elif isinstance(value, (list, tuple, ListValue)): 

435 struct_value.list_value.Clear() 

436 struct_value.list_value.extend(value) 

437 else: 

438 raise ValueError('Unexpected type') 

439 

440 

441def _GetStructValue(struct_value): 

442 which = struct_value.WhichOneof('kind') 

443 if which == 'struct_value': 

444 return struct_value.struct_value 

445 elif which == 'null_value': 

446 return None 

447 elif which == 'number_value': 

448 return struct_value.number_value 

449 elif which == 'string_value': 

450 return struct_value.string_value 

451 elif which == 'bool_value': 

452 return struct_value.bool_value 

453 elif which == 'list_value': 

454 return struct_value.list_value 

455 elif which is None: 

456 raise ValueError('Value not set') 

457 

458 

459class Struct(object): 

460 """Class for Struct message type.""" 

461 

462 __slots__ = () 

463 

464 def __getitem__(self, key): 

465 return _GetStructValue(self.fields[key]) 

466 

467 def __contains__(self, item): 

468 return item in self.fields 

469 

470 def __setitem__(self, key, value): 

471 _SetStructValue(self.fields[key], value) 

472 

473 def __delitem__(self, key): 

474 del self.fields[key] 

475 

476 def __len__(self): 

477 return len(self.fields) 

478 

479 def __iter__(self): 

480 return iter(self.fields) 

481 

482 def keys(self): # pylint: disable=invalid-name 

483 return self.fields.keys() 

484 

485 def values(self): # pylint: disable=invalid-name 

486 return [self[key] for key in self] 

487 

488 def items(self): # pylint: disable=invalid-name 

489 return [(key, self[key]) for key in self] 

490 

491 def get_or_create_list(self, key): 

492 """Returns a list for this key, creating if it didn't exist already.""" 

493 if not self.fields[key].HasField('list_value'): 

494 # Clear will mark list_value modified which will indeed create a list. 

495 self.fields[key].list_value.Clear() 

496 return self.fields[key].list_value 

497 

498 def get_or_create_struct(self, key): 

499 """Returns a struct for this key, creating if it didn't exist already.""" 

500 if not self.fields[key].HasField('struct_value'): 

501 # Clear will mark struct_value modified which will indeed create a struct. 

502 self.fields[key].struct_value.Clear() 

503 return self.fields[key].struct_value 

504 

505 def update(self, dictionary): # pylint: disable=invalid-name 

506 for key, value in dictionary.items(): 

507 _SetStructValue(self.fields[key], value) 

508 

509collections.abc.MutableMapping.register(Struct) 

510 

511 

512class ListValue(object): 

513 """Class for ListValue message type.""" 

514 

515 __slots__ = () 

516 

517 def __len__(self): 

518 return len(self.values) 

519 

520 def append(self, value): 

521 _SetStructValue(self.values.add(), value) 

522 

523 def extend(self, elem_seq): 

524 for value in elem_seq: 

525 self.append(value) 

526 

527 def __getitem__(self, index): 

528 """Retrieves item by the specified index.""" 

529 return _GetStructValue(self.values.__getitem__(index)) 

530 

531 def __setitem__(self, index, value): 

532 _SetStructValue(self.values.__getitem__(index), value) 

533 

534 def __delitem__(self, key): 

535 del self.values[key] 

536 

537 def items(self): 

538 for i in range(len(self)): 

539 yield self[i] 

540 

541 def add_struct(self): 

542 """Appends and returns a struct value as the next value in the list.""" 

543 struct_value = self.values.add().struct_value 

544 # Clear will mark struct_value modified which will indeed create a struct. 

545 struct_value.Clear() 

546 return struct_value 

547 

548 def add_list(self): 

549 """Appends and returns a list value as the next value in the list.""" 

550 list_value = self.values.add().list_value 

551 # Clear will mark list_value modified which will indeed create a list. 

552 list_value.Clear() 

553 return list_value 

554 

555collections.abc.MutableSequence.register(ListValue) 

556 

557 

558# LINT.IfChange(wktbases) 

559WKTBASES = { 

560 'google.protobuf.Any': Any, 

561 'google.protobuf.Duration': Duration, 

562 'google.protobuf.FieldMask': FieldMask, 

563 'google.protobuf.ListValue': ListValue, 

564 'google.protobuf.Struct': Struct, 

565 'google.protobuf.Timestamp': Timestamp, 

566} 

567# LINT.ThenChange(//depot/google.protobuf/compiler/python/pyi_generator.cc:wktbases)