Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyasn1/codec/cer/encoder.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

149 statements  

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import warnings 

8 

9from pyasn1 import error 

10from pyasn1.codec.ber import encoder 

11from pyasn1.type import univ 

12from pyasn1.type import useful 

13 

14__all__ = ['Encoder', 'encode'] 

15 

16 

17class BooleanEncoder(encoder.IntegerEncoder): 

18 def encodeValue(self, value, asn1Spec, encodeFun, **options): 

19 if value == 0: 

20 substrate = (0,) 

21 else: 

22 substrate = (255,) 

23 return substrate, False, False 

24 

25 

26class RealEncoder(encoder.RealEncoder): 

27 def _chooseEncBase(self, value): 

28 m, b, e = value 

29 return self._dropFloatingPoint(m, b, e) 

30 

31 

32# specialized GeneralStringEncoder here 

33 

34class TimeEncoderMixIn(object): 

35 Z_CHAR = ord('Z') 

36 PLUS_CHAR = ord('+') 

37 MINUS_CHAR = ord('-') 

38 COMMA_CHAR = ord(',') 

39 DOT_CHAR = ord('.') 

40 ZERO_CHAR = ord('0') 

41 

42 MIN_LENGTH = 12 

43 MAX_LENGTH = 19 

44 

45 def encodeValue(self, value, asn1Spec, encodeFun, **options): 

46 # CER encoding constraints: 

47 # - minutes are mandatory, seconds are optional 

48 # - sub-seconds must NOT be zero / no meaningless zeros 

49 # - no hanging fraction dot 

50 # - time in UTC (Z) 

51 # - only dot is allowed for fractions 

52 

53 if asn1Spec is not None: 

54 value = asn1Spec.clone(value) 

55 

56 numbers = value.asNumbers() 

57 

58 if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers: 

59 raise error.PyAsn1Error('Must be UTC time: %r' % value) 

60 

61 if numbers[-1] != self.Z_CHAR: 

62 raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value) 

63 

64 if self.COMMA_CHAR in numbers: 

65 raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value) 

66 

67 if self.DOT_CHAR in numbers: 

68 

69 isModified = False 

70 

71 numbers = list(numbers) 

72 

73 searchIndex = min(numbers.index(self.DOT_CHAR) + 4, len(numbers) - 1) 

74 

75 while numbers[searchIndex] != self.DOT_CHAR: 

76 if numbers[searchIndex] == self.ZERO_CHAR: 

77 del numbers[searchIndex] 

78 isModified = True 

79 

80 searchIndex -= 1 

81 

82 searchIndex += 1 

83 

84 if searchIndex < len(numbers): 

85 if numbers[searchIndex] == self.Z_CHAR: 

86 # drop hanging comma 

87 del numbers[searchIndex - 1] 

88 isModified = True 

89 

90 if isModified: 

91 value = value.clone(numbers) 

92 

93 if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH: 

94 raise error.PyAsn1Error('Length constraint violated: %r' % value) 

95 

96 options.update(maxChunkSize=1000) 

97 

98 return encoder.OctetStringEncoder.encodeValue( 

99 self, value, asn1Spec, encodeFun, **options 

100 ) 

101 

102 

103class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder): 

104 MIN_LENGTH = 12 

105 MAX_LENGTH = 20 

106 

107 

108class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder): 

109 MIN_LENGTH = 10 

110 MAX_LENGTH = 14 

111 

112 

113class SetOfEncoder(encoder.SequenceOfEncoder): 

114 def encodeValue(self, value, asn1Spec, encodeFun, **options): 

115 chunks = self._encodeComponents( 

116 value, asn1Spec, encodeFun, **options) 

117 

118 # sort by serialised and padded components 

119 if len(chunks) > 1: 

120 zero = b'\x00' 

121 maxLen = max(map(len, chunks)) 

122 paddedChunks = [ 

123 (x.ljust(maxLen, zero), x) for x in chunks 

124 ] 

125 paddedChunks.sort(key=lambda x: x[0]) 

126 

127 chunks = [x[1] for x in paddedChunks] 

128 

129 return b''.join(chunks), True, True 

130 

131 

132class SequenceOfEncoder(encoder.SequenceOfEncoder): 

133 def encodeValue(self, value, asn1Spec, encodeFun, **options): 

134 

135 if options.get('ifNotEmpty', False) and not len(value): 

136 return b'', True, True 

137 

138 chunks = self._encodeComponents( 

139 value, asn1Spec, encodeFun, **options) 

140 

141 return b''.join(chunks), True, True 

142 

143 

144class SetEncoder(encoder.SequenceEncoder): 

145 @staticmethod 

146 def _componentSortKey(componentAndType): 

147 """Sort SET components by tag 

148 

149 Sort regardless of the Choice value (static sort) 

150 """ 

151 component, asn1Spec = componentAndType 

152 

153 if asn1Spec is None: 

154 asn1Spec = component 

155 

156 if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet: 

157 if asn1Spec.tagSet: 

158 return asn1Spec.tagSet 

159 else: 

160 return asn1Spec.componentType.minTagSet 

161 else: 

162 return asn1Spec.tagSet 

163 

164 def encodeValue(self, value, asn1Spec, encodeFun, **options): 

165 

166 substrate = b'' 

167 

168 comps = [] 

169 compsMap = {} 

170 

171 if asn1Spec is None: 

172 # instance of ASN.1 schema 

173 inconsistency = value.isInconsistent 

174 if inconsistency: 

175 raise error.PyAsn1Error( 

176 f"ASN.1 object {value.__class__.__name__} is inconsistent") 

177 

178 namedTypes = value.componentType 

179 

180 for idx, component in enumerate(value.values()): 

181 if namedTypes: 

182 namedType = namedTypes[idx] 

183 

184 if namedType.isOptional and not component.isValue: 

185 continue 

186 

187 if namedType.isDefaulted and component == namedType.asn1Object: 

188 continue 

189 

190 compsMap[id(component)] = namedType 

191 

192 else: 

193 compsMap[id(component)] = None 

194 

195 comps.append((component, asn1Spec)) 

196 

197 else: 

198 # bare Python value + ASN.1 schema 

199 for idx, namedType in enumerate(asn1Spec.componentType.namedTypes): 

200 

201 try: 

202 component = value[namedType.name] 

203 

204 except KeyError: 

205 raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value)) 

206 

207 if namedType.isOptional and namedType.name not in value: 

208 continue 

209 

210 if namedType.isDefaulted and component == namedType.asn1Object: 

211 continue 

212 

213 compsMap[id(component)] = namedType 

214 comps.append((component, asn1Spec[idx])) 

215 

216 for comp, compType in sorted(comps, key=self._componentSortKey): 

217 namedType = compsMap[id(comp)] 

218 

219 if namedType: 

220 options.update(ifNotEmpty=namedType.isOptional) 

221 

222 chunk = encodeFun(comp, compType, **options) 

223 

224 # wrap open type blob if needed 

225 if namedType and namedType.openType: 

226 wrapType = namedType.asn1Object 

227 if wrapType.tagSet and not wrapType.isSameTypeWith(comp): 

228 chunk = encodeFun(chunk, wrapType, **options) 

229 

230 substrate += chunk 

231 

232 return substrate, True, True 

233 

234 

235class SequenceEncoder(encoder.SequenceEncoder): 

236 omitEmptyOptionals = True 

237 

238 

239TAG_MAP = encoder.TAG_MAP.copy() 

240 

241TAG_MAP.update({ 

242 univ.Boolean.tagSet: BooleanEncoder(), 

243 univ.Real.tagSet: RealEncoder(), 

244 useful.GeneralizedTime.tagSet: GeneralizedTimeEncoder(), 

245 useful.UTCTime.tagSet: UTCTimeEncoder(), 

246 # Sequence & Set have same tags as SequenceOf & SetOf 

247 univ.SetOf.tagSet: SetOfEncoder(), 

248 univ.Sequence.typeId: SequenceEncoder() 

249}) 

250 

251TYPE_MAP = encoder.TYPE_MAP.copy() 

252 

253TYPE_MAP.update({ 

254 univ.Boolean.typeId: BooleanEncoder(), 

255 univ.Real.typeId: RealEncoder(), 

256 useful.GeneralizedTime.typeId: GeneralizedTimeEncoder(), 

257 useful.UTCTime.typeId: UTCTimeEncoder(), 

258 # Sequence & Set have same tags as SequenceOf & SetOf 

259 univ.Set.typeId: SetEncoder(), 

260 univ.SetOf.typeId: SetOfEncoder(), 

261 univ.Sequence.typeId: SequenceEncoder(), 

262 univ.SequenceOf.typeId: SequenceOfEncoder() 

263}) 

264 

265 

266class SingleItemEncoder(encoder.SingleItemEncoder): 

267 fixedDefLengthMode = False 

268 fixedChunkSize = 1000 

269 

270 TAG_MAP = TAG_MAP 

271 TYPE_MAP = TYPE_MAP 

272 

273 

274class Encoder(encoder.Encoder): 

275 SINGLE_ITEM_ENCODER = SingleItemEncoder 

276 

277 

278#: Turns ASN.1 object into CER octet stream. 

279#: 

280#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

281#: walks all its components recursively and produces a CER octet stream. 

282#: 

283#: Parameters 

284#: ---------- 

285#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

286#: A Python or pyasn1 object to encode. If Python object is given, `asnSpec` 

287#: parameter is required to guide the encoding process. 

288#: 

289#: Keyword Args 

290#: ------------ 

291#: asn1Spec: 

292#: Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 

293#: 

294#: Returns 

295#: ------- 

296#: : :py:class:`bytes` 

297#: Given ASN.1 object encoded into BER octet-stream 

298#: 

299#: Raises 

300#: ------ 

301#: ~pyasn1.error.PyAsn1Error 

302#: On encoding errors 

303#: 

304#: Examples 

305#: -------- 

306#: Encode Python value into CER with ASN.1 schema 

307#: 

308#: .. code-block:: pycon 

309#: 

310#: >>> seq = SequenceOf(componentType=Integer()) 

311#: >>> encode([1, 2, 3], asn1Spec=seq) 

312#: b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00' 

313#: 

314#: Encode ASN.1 value object into CER 

315#: 

316#: .. code-block:: pycon 

317#: 

318#: >>> seq = SequenceOf(componentType=Integer()) 

319#: >>> seq.extend([1, 2, 3]) 

320#: >>> encode(seq) 

321#: b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00' 

322#: 

323encode = Encoder() 

324 

325# EncoderFactory queries class instance and builds a map of tags -> encoders 

326 

327def __getattr__(attr: str): 

328 if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr): 

329 warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning) 

330 return globals()[newAttr] 

331 raise AttributeError(attr)