Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/cer/encoder.py: 32%

139 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:16 +0000

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> 

5# License: http://snmplabs.com/pyasn1/license.html 

6# 

7from pyasn1 import error 

8from pyasn1.codec.ber import encoder 

9from pyasn1.compat.octets import str2octs, null 

10from pyasn1.type import univ 

11from pyasn1.type import useful 

12 

13__all__ = ['encode'] 

14 

15 

class BooleanEncoder(encoder.IntegerEncoder):
    """CER flavour of the BOOLEAN encoder.

    Unlike the inherited integer encoding, the content is always exactly
    one octet: 0 for a value equal to zero, 255 for anything else.
    """

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the single content octet for a boolean *value*.

        ``asn1Spec``, ``encodeFun`` and ``options`` are accepted for
        interface compatibility and are not consulted.

        Returns a 3-tuple: a one-element tuple holding the content octet
        as an integer, followed by two ``False`` flags.
        NOTE(review): the flags presumably mean "is constructed" and
        "substrate is already octets" as in the base BER encoder --
        confirm against pyasn1.codec.ber.encoder.
        """
        octet = 0 if value == 0 else 255
        return (octet,), False, False

23 

24 

class RealEncoder(encoder.RealEncoder):
    """CER flavour of the REAL encoder."""

    def _chooseEncBase(self, value):
        """Pick the encoding base for a ``(mantissa, base, exponent)`` triple.

        The triple is handed straight to the inherited
        ``_dropFloatingPoint`` helper and its result is returned as is.
        """
        mantissa, base, exponent = value
        return self._dropFloatingPoint(mantissa, base, exponent)

29 

30 

31# specialized GeneralStringEncoder here 

32 

class TimeEncoderMixIn(object):
    """Validation and normalisation shared by the CER time encoders.

    Sub-classes combine this mix-in with the BER octet string encoder and
    override ``MIN_LENGTH`` / ``MAX_LENGTH`` (both are *exclusive* bounds
    on the number of characters in the normalised time string).
    """

    # Character codes the checks below look for in the time string.
    Z_CHAR = ord('Z')
    PLUS_CHAR = ord('+')
    MINUS_CHAR = ord('-')
    COMMA_CHAR = ord(',')
    DOT_CHAR = ord('.')
    ZERO_CHAR = ord('0')

    MIN_LENGTH = 12
    MAX_LENGTH = 19

    @classmethod
    def _stripFractionZeros(cls, numbers):
        """Drop meaningless trailing zeros from the fractional part.

        *numbers* is a sequence of character codes that contains a dot
        and ends with the time zone designator (a single character).

        Only zeros at the very end of the fraction are removed; zeros
        that precede a non-zero digit (as in ``.099``) are significant
        and are kept. If nothing remains of the fraction, the dot is
        removed as well.

        Returns a ``(numbers, isModified)`` pair where ``numbers`` is
        always a new list.
        """
        numbers = list(numbers)

        dotIndex = numbers.index(cls.DOT_CHAR)

        # last fraction digit sits right before the trailing designator
        lastIndex = len(numbers) - 2

        isModified = False

        while lastIndex > dotIndex and numbers[lastIndex] == cls.ZERO_CHAR:
            del numbers[lastIndex]
            lastIndex -= 1
            isModified = True

        if lastIndex == dotIndex:
            # drop hanging dot
            del numbers[dotIndex]
            isModified = True

        return numbers, isModified

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Check CER time constraints, normalise and encode *value*.

        CER encoding constraints:

        - minutes are mandatory, seconds are optional
        - sub-seconds must NOT be zero / no meaningless zeros
        - no hanging fraction dot
        - time in UTC (Z)
        - only dot is allowed for fractions

        Raises
        ------
        ~pyasn1.error.PyAsn1Error
            If the time carries a UTC offset, lacks the trailing "Z"
            (including the empty string), uses a comma for fractions or
            violates the length bounds.
        """
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        numbers = value.asNumbers()

        if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers:
            raise error.PyAsn1Error('Must be UTC time: %r' % value)

        # an empty value has no last character to inspect
        if not numbers or numbers[-1] != self.Z_CHAR:
            raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value)

        if self.COMMA_CHAR in numbers:
            raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)

        if self.DOT_CHAR in numbers:
            numbers, isModified = self._stripFractionZeros(numbers)

            if isModified:
                value = value.clone(numbers)

        if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH:
            raise error.PyAsn1Error('Length constraint violated: %r' % value)

        options.update(maxChunkSize=1000)

        return encoder.OctetStringEncoder.encodeValue(
            self, value, asn1Spec, encodeFun, **options
        )

100 

101 

class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
    """CER encoder for GeneralizedTime values.

    The mix-in's length check is strict on both sides, so accepted time
    strings are longer than ``MIN_LENGTH`` and shorter than ``MAX_LENGTH``.
    """

    MIN_LENGTH = 12
    MAX_LENGTH = 20

105 

106 

class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
    """CER encoder for UTCTime values.

    The mix-in's length check is strict on both sides, so accepted time
    strings are longer than ``MIN_LENGTH`` and shorter than ``MAX_LENGTH``.
    """

    MIN_LENGTH = 10
    MAX_LENGTH = 14

110 

111 

class SetOfEncoder(encoder.SequenceOfEncoder):
    """CER encoder for SET OF: components are emitted in sorted order."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode every component, then order the encodings.

        Encodings are compared as if each were right-padded with zero
        octets up to the length of the longest one; the padding is used
        for comparison only and never emitted.

        Returns the concatenated encodings followed by two ``True`` flags.
        """
        encoded = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        # nothing to order when there are fewer than two components
        if len(encoded) > 1:
            padOctet = str2octs('\x00')
            width = max(len(item) for item in encoded)

            encoded = sorted(
                encoded, key=lambda item: item.ljust(width, padOctet)
            )

        return null.join(encoded), True, True

129 

130 

class SequenceOfEncoder(encoder.SequenceOfEncoder):
    """CER encoder for SEQUENCE OF values."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Concatenate the encodings of all components of *value*.

        When the ``ifNotEmpty`` option is truthy and *value* has no
        components, an empty substrate is returned without encoding
        anything.

        Returns the substrate followed by two ``True`` flags.
        """
        skipWhenEmpty = options.get('ifNotEmpty', False)

        if skipWhenEmpty and not len(value):
            return null, True, True

        encoded = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        return null.join(encoded), True, True

141 

142 

class SetEncoder(encoder.SequenceEncoder):
    """CER encoder for SET: components are emitted ordered by tag."""

    @staticmethod
    def _componentSortKey(componentAndType):
        """Sort SET components by tag

        Sort regardless of the Choice value (static sort)

        *componentAndType* is a ``(component, asn1Spec)`` pair; when the
        spec is ``None`` the component itself supplies the type info.
        An untagged CHOICE sorts by the minimal tag set of its
        alternatives, everything else by its own tag set.
        """
        component, asn1Spec = componentAndType

        if asn1Spec is None:
            asn1Spec = component

        if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet:
            return asn1Spec.componentType.minTagSet

        return asn1Spec.tagSet

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode the components of *value* in tag order.

        With ``asn1Spec`` being ``None``, *value* is an ASN.1 object that
        carries its own schema; otherwise *value* is a mapping-like Python
        object looked up by the component names of ``asn1Spec``.

        Optional components without a value and defaulted components
        equal to their default are left out.

        Returns the concatenated substrate followed by two ``True`` flags.

        Raises
        ------
        ~pyasn1.error.PyAsn1Error
            If a non-optional component name is missing from *value*
            (schema-guided mode). In schema-less mode, whatever
            ``value.isInconsistent`` evaluates to is raised when truthy.
        """
        # Each entry is (component, component spec, named type or None).
        # The named type travels with the component instead of living in
        # an id()-keyed side table: two components may be the very same
        # object (e.g. equal small integers), which made such a table
        # hand out the wrong named type.
        comps = []

        if asn1Spec is None:
            # instance of ASN.1 schema
            inconsistency = value.isInconsistent
            if inconsistency:
                raise inconsistency

            namedTypes = value.componentType

            for idx, component in enumerate(value.values()):
                namedType = None

                if namedTypes:
                    namedType = namedTypes[idx]

                    if namedType.isOptional and not component.isValue:
                        continue

                    if namedType.isDefaulted and component == namedType.asn1Object:
                        continue

                comps.append((component, asn1Spec, namedType))

        else:
            # bare Python value + ASN.1 schema
            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):

                # must precede the lookup, otherwise a missing optional
                # component is reported as an error instead of skipped
                if namedType.isOptional and namedType.name not in value:
                    continue

                try:
                    component = value[namedType.name]

                except KeyError:
                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value))

                if namedType.isDefaulted and component == namedType.asn1Object:
                    continue

                comps.append((component, asn1Spec[idx], namedType))

        # the sort key only looks at the (component, spec) part
        comps.sort(key=lambda item: self._componentSortKey(item[:2]))

        substrate = null

        for comp, compType, namedType in comps:
            if namedType:
                options.update(ifNotEmpty=namedType.isOptional)

            chunk = encodeFun(comp, compType, **options)

            # wrap open type blob if needed
            if namedType and namedType.openType:
                wrapType = namedType.asn1Object
                if wrapType.tagSet and not wrapType.isSameTypeWith(comp):
                    chunk = encodeFun(chunk, wrapType, **options)

            substrate += chunk

        return substrate, True, True

231 

232 

class SequenceEncoder(encoder.SequenceEncoder):
    """CER encoder for SEQUENCE values.

    Differs from the inherited BER encoder only in the flag below.
    NOTE(review): the flag is consumed by the base class, which is not
    visible here; presumably it makes empty OPTIONAL components vanish
    from the encoding -- confirm against pyasn1.codec.ber.encoder.
    """

    omitEmptyOptionals = True

235 

236 

# Tag-keyed codec lookup table: start from a copy of the BER table and
# override the entries that need CER-specific behaviour.
tagMap = encoder.tagMap.copy()
tagMap[univ.Boolean.tagSet] = BooleanEncoder()
tagMap[univ.Real.tagSet] = RealEncoder()
tagMap[useful.GeneralizedTime.tagSet] = GeneralizedTimeEncoder()
tagMap[useful.UTCTime.tagSet] = UTCTimeEncoder()
# Sequence & Set have same tags as SequenceOf & SetOf
tagMap[univ.SetOf.tagSet] = SetOfEncoder()
# NOTE(review): this key is a typeId while every other key in this table
# is a tagSet -- confirm whether `univ.Sequence.tagSet` was intended.
tagMap[univ.Sequence.typeId] = SequenceEncoder()

247 

# Type-id-keyed codec lookup table: start from a copy of the BER table
# and override the entries that need CER-specific behaviour.
typeMap = encoder.typeMap.copy()
typeMap[univ.Boolean.typeId] = BooleanEncoder()
typeMap[univ.Real.typeId] = RealEncoder()
typeMap[useful.GeneralizedTime.typeId] = GeneralizedTimeEncoder()
typeMap[useful.UTCTime.typeId] = UTCTimeEncoder()
# Sequence & Set have same tags as SequenceOf & SetOf
typeMap[univ.Set.typeId] = SetEncoder()
typeMap[univ.SetOf.typeId] = SetOfEncoder()
typeMap[univ.Sequence.typeId] = SequenceEncoder()
typeMap[univ.SequenceOf.typeId] = SequenceOfEncoder()

260 

261 

class Encoder(encoder.Encoder):
    """Top-level CER encoder: the BER encoder with two settings pinned.

    NOTE(review): both attributes are interpreted by the base class,
    which is not visible here; presumably they select indefinite-length
    encoding and cap string chunks at 1000 octets -- confirm against
    pyasn1.codec.ber.encoder.Encoder.
    """

    fixedDefLengthMode = False
    fixedChunkSize = 1000

265 

#: Turns ASN.1 object into CER octet stream.
#:
#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#: walks all its components recursively and produces a CER octet stream.
#:
#: Parameters
#: ----------
#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     A Python or pyasn1 object to encode. If Python object is given, `asn1Spec`
#:     parameter is required to guide the encoding process.
#:
#: Keyword Args
#: ------------
#: asn1Spec:
#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:
#: Returns
#: -------
#: : :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     Given ASN.1 object encoded into CER octet-stream
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error
#:     On encoding errors
#:
#: Examples
#: --------
#: Encode Python value into CER with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> encode([1, 2, 3], asn1Spec=seq)
#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
#:
#: Encode ASN.1 value object into CER
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> seq.extend([1, 2, 3])
#:    >>> encode(seq)
#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
#:
encode = Encoder(tagMap, typeMap)

312 

313# EncoderFactory queries class instance and builds a map of tags -> encoders