Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_vendor/msgpack/ext.py: 37%

81 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1# coding: utf-8 

2from collections import namedtuple 

3import datetime 

4import sys 

5import struct 

6 

7 

8PY2 = sys.version_info[0] == 2 

9 

10if PY2: 

11 int_types = (int, long) 

12 _utc = None 

13else: 

14 int_types = int 

15 try: 

16 _utc = datetime.timezone.utc 

17 except AttributeError: 

18 _utc = datetime.timezone(datetime.timedelta(0)) 

19 

20 

21class ExtType(namedtuple("ExtType", "code data")): 

22 """ExtType represents ext type in msgpack.""" 

23 

24 def __new__(cls, code, data): 

25 if not isinstance(code, int): 

26 raise TypeError("code must be int") 

27 if not isinstance(data, bytes): 

28 raise TypeError("data must be bytes") 

29 if not 0 <= code <= 127: 

30 raise ValueError("code must be 0~127") 

31 return super(ExtType, cls).__new__(cls, code, data) 

32 

33 

34class Timestamp(object): 

35 """Timestamp represents the Timestamp extension type in msgpack. 

36 

37 When built with Cython, msgpack uses C methods to pack and unpack `Timestamp`. When using pure-Python 

38 msgpack, :func:`to_bytes` and :func:`from_bytes` are used to pack and unpack `Timestamp`. 

39 

40 This class is immutable: Do not override seconds and nanoseconds. 

41 """ 

42 

43 __slots__ = ["seconds", "nanoseconds"] 

44 

45 def __init__(self, seconds, nanoseconds=0): 

46 """Initialize a Timestamp object. 

47 

48 :param int seconds: 

49 Number of seconds since the UNIX epoch (00:00:00 UTC Jan 1 1970, minus leap seconds). 

50 May be negative. 

51 

52 :param int nanoseconds: 

53 Number of nanoseconds to add to `seconds` to get fractional time. 

54 Maximum is 999_999_999. Default is 0. 

55 

56 Note: Negative times (before the UNIX epoch) are represented as negative seconds + positive ns. 

57 """ 

58 if not isinstance(seconds, int_types): 

59 raise TypeError("seconds must be an integer") 

60 if not isinstance(nanoseconds, int_types): 

61 raise TypeError("nanoseconds must be an integer") 

62 if not (0 <= nanoseconds < 10**9): 

63 raise ValueError( 

64 "nanoseconds must be a non-negative integer less than 999999999." 

65 ) 

66 self.seconds = seconds 

67 self.nanoseconds = nanoseconds 

68 

69 def __repr__(self): 

70 """String representation of Timestamp.""" 

71 return "Timestamp(seconds={0}, nanoseconds={1})".format( 

72 self.seconds, self.nanoseconds 

73 ) 

74 

75 def __eq__(self, other): 

76 """Check for equality with another Timestamp object""" 

77 if type(other) is self.__class__: 

78 return ( 

79 self.seconds == other.seconds and self.nanoseconds == other.nanoseconds 

80 ) 

81 return False 

82 

83 def __ne__(self, other): 

84 """not-equals method (see :func:`__eq__()`)""" 

85 return not self.__eq__(other) 

86 

87 def __hash__(self): 

88 return hash((self.seconds, self.nanoseconds)) 

89 

90 @staticmethod 

91 def from_bytes(b): 

92 """Unpack bytes into a `Timestamp` object. 

93 

94 Used for pure-Python msgpack unpacking. 

95 

96 :param b: Payload from msgpack ext message with code -1 

97 :type b: bytes 

98 

99 :returns: Timestamp object unpacked from msgpack ext payload 

100 :rtype: Timestamp 

101 """ 

102 if len(b) == 4: 

103 seconds = struct.unpack("!L", b)[0] 

104 nanoseconds = 0 

105 elif len(b) == 8: 

106 data64 = struct.unpack("!Q", b)[0] 

107 seconds = data64 & 0x00000003FFFFFFFF 

108 nanoseconds = data64 >> 34 

109 elif len(b) == 12: 

110 nanoseconds, seconds = struct.unpack("!Iq", b) 

111 else: 

112 raise ValueError( 

113 "Timestamp type can only be created from 32, 64, or 96-bit byte objects" 

114 ) 

115 return Timestamp(seconds, nanoseconds) 

116 

117 def to_bytes(self): 

118 """Pack this Timestamp object into bytes. 

119 

120 Used for pure-Python msgpack packing. 

121 

122 :returns data: Payload for EXT message with code -1 (timestamp type) 

123 :rtype: bytes 

124 """ 

125 if (self.seconds >> 34) == 0: # seconds is non-negative and fits in 34 bits 

126 data64 = self.nanoseconds << 34 | self.seconds 

127 if data64 & 0xFFFFFFFF00000000 == 0: 

128 # nanoseconds is zero and seconds < 2**32, so timestamp 32 

129 data = struct.pack("!L", data64) 

130 else: 

131 # timestamp 64 

132 data = struct.pack("!Q", data64) 

133 else: 

134 # timestamp 96 

135 data = struct.pack("!Iq", self.nanoseconds, self.seconds) 

136 return data 

137 

138 @staticmethod 

139 def from_unix(unix_sec): 

140 """Create a Timestamp from posix timestamp in seconds. 

141 

142 :param unix_float: Posix timestamp in seconds. 

143 :type unix_float: int or float. 

144 """ 

145 seconds = int(unix_sec // 1) 

146 nanoseconds = int((unix_sec % 1) * 10**9) 

147 return Timestamp(seconds, nanoseconds) 

148 

149 def to_unix(self): 

150 """Get the timestamp as a floating-point value. 

151 

152 :returns: posix timestamp 

153 :rtype: float 

154 """ 

155 return self.seconds + self.nanoseconds / 1e9 

156 

157 @staticmethod 

158 def from_unix_nano(unix_ns): 

159 """Create a Timestamp from posix timestamp in nanoseconds. 

160 

161 :param int unix_ns: Posix timestamp in nanoseconds. 

162 :rtype: Timestamp 

163 """ 

164 return Timestamp(*divmod(unix_ns, 10**9)) 

165 

166 def to_unix_nano(self): 

167 """Get the timestamp as a unixtime in nanoseconds. 

168 

169 :returns: posix timestamp in nanoseconds 

170 :rtype: int 

171 """ 

172 return self.seconds * 10**9 + self.nanoseconds 

173 

174 def to_datetime(self): 

175 """Get the timestamp as a UTC datetime. 

176 

177 Python 2 is not supported. 

178 

179 :rtype: datetime. 

180 """ 

181 return datetime.datetime.fromtimestamp(0, _utc) + datetime.timedelta( 

182 seconds=self.to_unix() 

183 ) 

184 

185 @staticmethod 

186 def from_datetime(dt): 

187 """Create a Timestamp from datetime with tzinfo. 

188 

189 Python 2 is not supported. 

190 

191 :rtype: Timestamp 

192 """ 

193 return Timestamp.from_unix(dt.timestamp())