Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/serialization/__init__.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

80 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18import struct as _struct 

19from enum import Enum 

20from typing import Any, List, Optional 

21 

22from confluent_kafka.error import KafkaException 

23from confluent_kafka._types import HeadersType 

24 

# Names exported by ``from confluent_kafka.serialization import *``.
__all__ = [
    'Deserializer',
    'IntegerDeserializer',
    'IntegerSerializer',
    'DoubleDeserializer',
    'DoubleSerializer',
    'StringDeserializer',
    'StringSerializer',
    'MessageField',
    'SerializationContext',
    'SerializationError',
    'Serializer',
]

36 

37 

class MessageField(str, Enum):
    """
    Enum-like object identifying which field of a Message is being handled.

    Members also subclass ``str``, so each one compares equal to its
    plain string value (e.g. ``MessageField.KEY == 'key'``).

    Attributes:
        NONE (str): The value ``'none'``; presumably used when no specific
            message field applies (confirm against callers).

        KEY (str): Message key

        VALUE (str): Message value
    """

    NONE = 'none'
    KEY = 'key'
    VALUE = 'value'

    def __str__(self) -> str:
        # Render the member as its bare value, e.g. 'key'.
        raw = self.value
        return str(raw)

54 

55 

class SerializationContext:
    """
    Carries metadata about the data being processed so that a
    serializer/deserializer can take it into account.

    Args:
        topic (str): Topic the data is being produced to or consumed from.

        field (MessageField): Describes which part of the message is
            being serialized or deserialized.

        headers (list): List of message header tuples. Defaults to None.
    """

    def __init__(self, topic: str, field: MessageField, headers: Optional[HeadersType] = None) -> None:
        # Values are stored exactly as given; nothing is validated or copied.
        self.topic, self.field, self.headers = topic, field, headers

74 

75 

class SerializationError(KafkaException):
    """Generic error raised by the serialization package."""

79 

80 

class Serializer:
    """
    Base class from which all Serializer implementations derive.
    Serializers tell Kafka clients how to convert Python objects to bytes.

    See the built-in implementations, listed below, for examples of how to
    extend this class.

    Note:
        This base class is not meant to be used directly: calling an
        instance raises ``NotImplementedError``. Use a derived class.

    The following implementations are provided by this module.

    Note:
        Unless noted elsewhere all numeric types are signed and serialization
        is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleSerializer
          - float
          - IEEE 754 binary64
        * - IntegerSerializer
          - int
          - int32
        * - StringSerializer
          - unicode
          - unicode(encoding)
    """

    # The base class declares no instance attributes of its own.
    __slots__: List[str] = []

    def __call__(self, obj: Any, ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Converts obj to bytes. Subclasses must override this method.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation

        Raises:
            SerializationError: if an error occurs during serialization
                (raised by implementations).

            NotImplementedError: always, in this base implementation.

        Returns:
            bytes if obj is not None, otherwise None
        """

        raise NotImplementedError

137 

138 

class Deserializer:
    """
    Base class from which all Deserializer implementations derive.
    Deserializers tell Kafka clients how to convert bytes to objects.

    See the built-in implementations, listed below, for examples of how to
    extend this class.

    Note:
        This base class is not meant to be used directly: calling an
        instance raises ``NotImplementedError``. Use a derived class.

    The following implementations are provided by this module.

    Note:
        Unless noted elsewhere all numeric types are signed and
        serialization is big-endian.

    .. list-table::
        :header-rows: 1

        * - Name
          - Type
          - Binary Format
        * - DoubleDeserializer
          - float
          - IEEE 754 binary64
        * - IntegerDeserializer
          - int
          - int32
        * - StringDeserializer
          - unicode
          - unicode(encoding)
    """

    # The base class declares no instance attributes of its own.
    __slots__: List[str] = []

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Any:
        """
        Converts bytes to an object. Subclasses must override this method.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation

        Raises:
            SerializationError: if an error occurs during deserialization
                (raised by implementations).

            NotImplementedError: always, in this base implementation.

        Returns:
            object if data is not None, otherwise None
        """

        raise NotImplementedError

194 

195 

class DoubleSerializer(Serializer):
    """
    Serializes float to big-endian IEEE 754 binary64.

    See Also:
        `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_

    """  # noqa: E501

    def __call__(self, obj: Optional[float], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes float as IEEE 754 binary64 bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError: if ``struct.pack`` rejects the value.

        Returns:
            IEEE 754 binary64 bytes if obj is not None, otherwise None
        """

        if obj is not None:
            try:
                # '>d' = big-endian 8-byte double.
                packed = _struct.pack('>d', obj)
            except _struct.error as err:
                raise SerializationError(str(err))
            return packed

        return None

230 

231 

class DoubleDeserializer(Deserializer):
    """
    Deserializes float from big-endian IEEE 754 binary64.

    See Also:
        `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[float]:
        """
        Deserializes float from IEEE 754 binary64 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if ``struct.unpack`` rejects the buffer
                (for example, when it is not exactly 8 bytes long).

        Returns:
            float if data is not None, otherwise None
        """

        if value is not None:
            try:
                # '>d' yields a 1-tuple holding the decoded double.
                (number,) = _struct.unpack('>d', value)
            except _struct.error as err:
                raise SerializationError(str(err))
            return number

        return None

264 

265 

class IntegerSerializer(Serializer):
    """
    Serializes int to big-endian int32 bytes.

    See Also:
        `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_
    """  # noqa: E501

    def __call__(self, obj: Optional[int], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes int as int32 bytes.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Note:
            None objects are represented as Kafka Null.

        Raises:
            SerializationError: if ``struct.pack`` rejects the value
                (for example, when it does not fit in a signed int32).

        Returns:
            int32 bytes if obj is not None, else None
        """

        if obj is not None:
            try:
                # '>i' = big-endian signed 4-byte integer.
                packed = _struct.pack('>i', obj)
            except _struct.error as err:
                raise SerializationError(str(err))
            return packed

        return None

301 

302 

class IntegerDeserializer(Deserializer):
    """
    Deserializes int from big-endian int32 bytes.

    See Also:
        `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_
    """  # noqa: E501

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[int]:
        """
        Deserializes int from int32 bytes.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if ``struct.unpack`` rejects the buffer
                (for example, when it is not exactly 4 bytes long).

        Returns:
            int if data is not None, otherwise None
        """

        if value is not None:
            try:
                # '>i' yields a 1-tuple holding the decoded integer.
                (number,) = _struct.unpack('>i', value)
            except _struct.error as err:
                raise SerializationError(str(err))
            return number

        return None

335 

336 

class StringSerializer(Serializer):
    """
    Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``.

    Note:
        None objects are represented as Kafka Null.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_
    """  # noqa: E501

    def __init__(self, codec: str = 'utf_8') -> None:
        self.codec = codec

    def __call__(self, obj: Optional[str], ctx: Optional[SerializationContext] = None) -> Optional[bytes]:
        """
        Serializes a str to bytes using the configured codec.

        Args:
            obj (object): object to be serialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this serializer.

        Raises:
            SerializationError: if the string cannot be encoded with the
                configured codec, or the codec name is unknown.

        Returns:
            serialized bytes if obj is not None, otherwise None
        """

        if obj is None:
            return None

        try:
            return obj.encode(self.codec)
        except (UnicodeError, LookupError) as e:
            # str.encode signals failures with UnicodeError (bad characters)
            # or LookupError (unknown codec); it never raises struct.error,
            # so catching that here let raw codec errors escape unwrapped.
            raise SerializationError(str(e)) from e

384 

385 

class StringDeserializer(Deserializer):
    """
    Deserializes a str from bytes per the configured codec. Defaults to ``utf_8``.

    Args:
        codec (str, optional): encoding scheme. Defaults to utf_8

    See Also:
        `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_

        `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_
    """  # noqa: E501

    def __init__(self, codec: str = 'utf_8') -> None:
        self.codec = codec

    def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[str]:
        """
        Deserializes bytes to a str using the configured codec.

        Args:
            value (bytes): bytes to be deserialized

            ctx (SerializationContext): Metadata pertaining to the
                serialization operation. Not used by this deserializer.

        Raises:
            SerializationError: if the bytes cannot be decoded with the
                configured codec, or the codec name is unknown.

        Returns:
            unicode if data is not None, otherwise None
        """

        if value is None:
            return None

        try:
            return value.decode(self.codec)
        except (UnicodeError, LookupError) as e:
            # bytes.decode signals failures with UnicodeError (invalid byte
            # sequence) or LookupError (unknown codec); it never raises
            # struct.error, so catching that here let raw codec errors
            # escape unwrapped.
            raise SerializationError(str(e)) from e