Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/confluent_kafka/serialization/__init__.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

80 statements  

1#!/usr/bin/env python 

2# -*- coding: utf-8 -*- 

3# 

4# Copyright 2020 Confluent Inc. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); 

7# you may not use this file except in compliance with the License. 

8# You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, 

14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

15# See the License for the specific language governing permissions and 

16# limitations under the License. 

17# 

18import struct as _struct 

19from enum import Enum 

20from typing import Any, List, Optional 

21 

22from confluent_kafka._types import HeadersType 

23from confluent_kafka.error import KafkaException 

24 

25__all__ = [ 

26 'Deserializer', 

27 'IntegerDeserializer', 

28 'IntegerSerializer', 

29 'DoubleDeserializer', 

30 'DoubleSerializer', 

31 'StringDeserializer', 

32 'StringSerializer', 

33 'MessageField', 

34 'SerializationContext', 

35 'SerializationError', 

36 'Serializer', 

37] 

38 

39 

40class MessageField(str, Enum): 

41 """ 

42 Enum like object for identifying Message fields. 

43 

44 Attributes: 

45 KEY (str): Message key 

46 

47 VALUE (str): Message value 

48 """ 

49 

50 NONE = 'none' 

51 KEY = 'key' 

52 VALUE = 'value' 

53 

54 def __str__(self) -> str: 

55 return str(self.value) 

56 

57 

58class SerializationContext(object): 

59 """ 

60 SerializationContext provides additional context to the 

61 serializer/deserializer about the data it's serializing/deserializing. 

62 

63 Args: 

64 topic (str): Topic data is being produce to or consumed from. 

65 

66 field (MessageField): Describes what part of the message is 

67 being serialized. 

68 

69 headers (list): List of message header tuples. Defaults to None. 

70 """ 

71 

72 def __init__(self, topic: str, field: MessageField, headers: Optional[HeadersType] = None) -> None: 

73 self.topic = topic 

74 self.field = field 

75 self.headers = headers 

76 

77 

78class SerializationError(KafkaException): 

79 """Generic error from serializer package""" 

80 

81 pass 

82 

83 

84class Serializer(object): 

85 """ 

86 Extensible class from which all Serializer implementations derive. 

87 Serializers instruct Kafka clients on how to convert Python objects 

88 to bytes. 

89 

90 See built-in implementations, listed below, for an example of how to 

91 extend this class. 

92 

93 Note: 

94 This class is not directly instantiable. The derived classes must be 

95 used instead. 

96 

97 The following implementations are provided by this module. 

98 

99 Note: 

100 Unless noted elsewhere all numeric types are signed and serialization 

101 is big-endian. 

102 

103 .. list-table:: 

104 :header-rows: 1 

105 

106 * - Name 

107 - Type 

108 - Binary Format 

109 * - DoubleSerializer 

110 - float 

111 - IEEE 764 binary64 

112 * - IntegerSerializer 

113 - int 

114 - int32 

115 * - StringSerializer 

116 - unicode 

117 - unicode(encoding) 

118 """ 

119 

120 __slots__: List[str] = [] 

121 

122 def __call__(self, obj: Any, ctx: Optional[SerializationContext] = None) -> Optional[bytes]: 

123 """ 

124 Converts obj to bytes. 

125 

126 Args: 

127 obj (object): object to be serialized 

128 

129 ctx (SerializationContext): Metadata pertaining to the serialization 

130 operation 

131 

132 Raises: 

133 SerializerError if an error occurs during serialization 

134 

135 Returns: 

136 bytes if obj is not None, otherwise None 

137 """ 

138 

139 raise NotImplementedError 

140 

141 

142class Deserializer(object): 

143 """ 

144 Extensible class from which all Deserializer implementations derive. 

145 Deserializers instruct Kafka clients on how to convert bytes to objects. 

146 

147 See built-in implementations, listed below, for an example of how to 

148 extend this class. 

149 

150 Note: 

151 This class is not directly instantiable. The derived classes must be 

152 used instead. 

153 

154 The following implementations are provided by this module. 

155 

156 Note: 

157 Unless noted elsewhere all numeric types are signed and 

158 serialization is big-endian. 

159 

160 .. list-table:: 

161 :header-rows: 1 

162 

163 * - Name 

164 - Type 

165 - Binary Format 

166 * - DoubleDeserializer 

167 - float 

168 - IEEE 764 binary64 

169 * - IntegerDeserializer 

170 - int 

171 - int32 

172 * - StringDeserializer 

173 - unicode 

174 - unicode(encoding) 

175 """ 

176 

177 __slots__: List[str] = [] 

178 

179 def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Any: 

180 """ 

181 Convert bytes to object 

182 

183 Args: 

184 value (bytes): bytes to be deserialized 

185 

186 ctx (SerializationContext): Metadata pertaining to the serialization 

187 operation 

188 

189 Raises: 

190 SerializerError if an error occurs during deserialization 

191 

192 Returns: 

193 object if data is not None, otherwise None 

194 """ 

195 

196 raise NotImplementedError 

197 

198 

199class DoubleSerializer(Serializer): 

200 """ 

201 Serializes float to IEEE 764 binary64. 

202 

203 See Also: 

204 `DoubleSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleSerializer.html>`_ 

205 

206 """ # noqa: E501 

207 

208 def __call__(self, obj: Optional[float], ctx: Optional[SerializationContext] = None) -> Optional[bytes]: 

209 """ 

210 Args: 

211 obj (object): object to be serialized 

212 

213 ctx (SerializationContext): Metadata pertaining to the serialization 

214 operation 

215 

216 Note: 

217 None objects are represented as Kafka Null. 

218 

219 Raises: 

220 SerializerError if an error occurs during serialization. 

221 

222 Returns: 

223 IEEE 764 binary64 bytes if obj is not None, otherwise None 

224 """ 

225 

226 if obj is None: 

227 return None 

228 

229 try: 

230 return _struct.pack('>d', obj) 

231 except _struct.error as e: 

232 raise SerializationError(str(e)) 

233 

234 

235class DoubleDeserializer(Deserializer): 

236 """ 

237 Deserializes float to IEEE 764 binary64. 

238 

239 See Also: 

240 `DoubleDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/DoubleDeserializer.html>`_ 

241 """ # noqa: E501 

242 

243 def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[float]: 

244 """ 

245 Deserializes float from IEEE 764 binary64 bytes. 

246 

247 Args: 

248 value (bytes): bytes to be deserialized 

249 

250 ctx (SerializationContext): Metadata pertaining to the serialization 

251 operation 

252 

253 Raises: 

254 SerializerError if an error occurs during deserialization. 

255 

256 Returns: 

257 float if data is not None, otherwise None 

258 """ 

259 

260 if value is None: 

261 return None 

262 

263 try: 

264 return _struct.unpack('>d', value)[0] 

265 except _struct.error as e: 

266 raise SerializationError(str(e)) 

267 

268 

269class IntegerSerializer(Serializer): 

270 """ 

271 Serializes int to int32 bytes. 

272 

273 See Also: 

274 `IntegerSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerSerializer.html>`_ 

275 """ # noqa: E501 

276 

277 def __call__(self, obj: Optional[int], ctx: Optional[SerializationContext] = None) -> Optional[bytes]: 

278 """ 

279 Serializes int as int32 bytes. 

280 

281 Args: 

282 obj (object): object to be serialized 

283 

284 ctx (SerializationContext): Metadata pertaining to the serialization 

285 operation 

286 

287 Note: 

288 None objects are represented as Kafka Null. 

289 

290 Raises: 

291 SerializerError if an error occurs during serialization 

292 

293 Returns: 

294 int32 bytes if obj is not None, else None 

295 """ 

296 

297 if obj is None: 

298 return None 

299 

300 try: 

301 return _struct.pack('>i', obj) 

302 except _struct.error as e: 

303 raise SerializationError(str(e)) 

304 

305 

306class IntegerDeserializer(Deserializer): 

307 """ 

308 Deserializes int to int32 bytes. 

309 

310 See Also: 

311 `IntegerDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/IntegerDeserializer.html>`_ 

312 """ # noqa: E501 

313 

314 def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[int]: 

315 """ 

316 Deserializes int from int32 bytes. 

317 

318 Args: 

319 value (bytes): bytes to be deserialized 

320 

321 ctx (SerializationContext): Metadata pertaining to the serialization 

322 operation 

323 

324 Raises: 

325 SerializerError if an error occurs during deserialization. 

326 

327 Returns: 

328 int if data is not None, otherwise None 

329 """ 

330 

331 if value is None: 

332 return None 

333 

334 try: 

335 return _struct.unpack('>i', value)[0] 

336 except _struct.error as e: 

337 raise SerializationError(str(e)) 

338 

339 

340class StringSerializer(Serializer): 

341 """ 

342 Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``. 

343 

344 Note: 

345 None objects are represented as Kafka Null. 

346 

347 Args: 

348 codec (str, optional): encoding scheme. Defaults to utf_8 

349 

350 See Also: 

351 `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_ 

352 

353 `StringSerializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringSerializer.html>`_ 

354 """ # noqa: E501 

355 

356 def __init__(self, codec: str = 'utf_8') -> None: 

357 self.codec = codec 

358 

359 def __call__(self, obj: Optional[str], ctx: Optional[SerializationContext] = None) -> Optional[bytes]: 

360 """ 

361 Serializes a str(py2:unicode) to bytes. 

362 

363 Compatibility Note: 

364 Python 2 str objects must be converted to unicode objects. 

365 Python 3 all str objects are already unicode objects. 

366 

367 Args: 

368 obj (object): object to be serialized 

369 

370 ctx (SerializationContext): Metadata pertaining to the serialization 

371 operation 

372 

373 Raises: 

374 SerializerError if an error occurs during serialization. 

375 

376 Returns: 

377 serialized bytes if obj is not None, otherwise None 

378 """ 

379 

380 if obj is None: 

381 return None 

382 

383 try: 

384 return obj.encode(self.codec) 

385 except _struct.error as e: 

386 raise SerializationError(str(e)) 

387 

388 

389class StringDeserializer(Deserializer): 

390 """ 

391 Deserializes a str(py2:unicode) from bytes. 

392 

393 Args: 

394 codec (str, optional): encoding scheme. Defaults to utf_8 

395 

396 See Also: 

397 `Supported encodings <https://docs.python.org/3/library/codecs.html#standard-encodings>`_ 

398 

399 `StringDeserializer Javadoc <https://docs.confluent.io/current/clients/javadocs/org/apache/kafka/common/serialization/StringDeserializer.html>`_ 

400 """ # noqa: E501 

401 

402 def __init__(self, codec: str = 'utf_8') -> None: 

403 self.codec = codec 

404 

405 def __call__(self, value: Optional[bytes], ctx: Optional[SerializationContext] = None) -> Optional[str]: 

406 """ 

407 Serializes unicode to bytes per the configured codec. Defaults to ``utf_8``. 

408 

409 Compatibility Note: 

410 Python 2 str objects must be converted to unicode objects by the 

411 application prior to using this serializer. 

412 

413 Python 3 all str objects are already unicode objects. 

414 

415 Args: 

416 value (bytes): bytes to be deserialized 

417 

418 ctx (SerializationContext): Metadata pertaining to the serialization 

419 operation 

420 

421 Raises: 

422 SerializerError if an error occurs during deserialization. 

423 

424 Returns: 

425 unicode if data is not None, otherwise None 

426 """ 

427 

428 if value is None: 

429 return None 

430 

431 try: 

432 return value.decode(self.codec) 

433 except _struct.error as e: 

434 raise SerializationError(str(e))