Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/cer/encoder.py: 32%
139 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:16 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:16 +0000
1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
5# License: http://snmplabs.com/pyasn1/license.html
6#
7from pyasn1 import error
8from pyasn1.codec.ber import encoder
9from pyasn1.compat.octets import str2octs, null
10from pyasn1.type import univ
11from pyasn1.type import useful
# Public API of this module: only the ready-made `encode` callable is exported.
__all__ = ['encode']
class BooleanEncoder(encoder.IntegerEncoder):
    """CER encoder for BOOLEAN values.

    A value equal to zero is encoded as the single octet 0; any other
    value is encoded as the single octet 255.
    """

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the one-octet payload for `value` plus two False flags.

        `asn1Spec`, `encodeFun` and `options` are accepted for interface
        compatibility and are not used here.
        """
        # NOTE(review): the two trailing flags are presumably
        # "is constructed" / "is octets" as in the base BER encoder -- confirm.
        payload = (0,) if value == 0 else (255,)
        return payload, False, False
class RealEncoder(encoder.RealEncoder):
    """CER flavour of the REAL encoder."""

    def _chooseEncBase(self, value):
        """Delegate encoding-base selection to `_dropFloatingPoint`.

        `value` must unpack into exactly three items, which are forwarded
        positionally (named m, b, e in the original code -- presumably
        mantissa, base and exponent).
        """
        mantissa, base, exponent = value
        return self._dropFloatingPoint(mantissa, base, exponent)
31# specialized GeneralStringEncoder here
class TimeEncoderMixIn(object):
    """Mix-in that validates and normalises time strings for CER.

    `encodeValue` checks the CER restrictions listed below, strips
    meaningless trailing fraction zeros, and then hands the value over to
    `encoder.OctetStringEncoder.encodeValue`.

    CER encoding constraints enforced here:

    - minutes are mandatory, seconds are optional
    - sub-seconds must NOT be zero / no meaningless (trailing) zeros
    - no hanging fraction dot
    - time in UTC (Z)
    - only dot is allowed for fractions
    """

    # Character codes compared against the items returned by asNumbers()
    Z_CHAR = ord('Z')
    PLUS_CHAR = ord('+')
    MINUS_CHAR = ord('-')
    COMMA_CHAR = ord(',')
    DOT_CHAR = ord('.')
    ZERO_CHAR = ord('0')

    # Exclusive bounds on the length of the normalised time string
    MIN_LENGTH = 12
    MAX_LENGTH = 19

    @classmethod
    def _stripFractionZeros(cls, numbers):
        """Return a list copy of `numbers` without trailing fraction zeros.

        `numbers` is a sequence of character codes that contains a dot and
        ends with 'Z'. Only the zeros directly preceding the final 'Z' are
        removed; zeros inside the fraction (as in '.099') are significant
        and are kept. If nothing but the dot remains of the fraction, the
        dot is removed as well.
        """
        numbers = list(numbers)

        zPos = len(numbers) - 1  # index of the trailing 'Z'
        cut = zPos

        while cut and numbers[cut - 1] == cls.ZERO_CHAR:
            cut -= 1

        # drop hanging dot
        if cut and numbers[cut - 1] == cls.DOT_CHAR:
            cut -= 1

        del numbers[cut:zPos]

        return numbers

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Validate, normalise and encode a time value.

        Parameters
        ----------
        value:
            Time value; if `asn1Spec` is given, `asn1Spec.clone(value)` is
            used instead. The resulting object must offer `asNumbers()`
            and `clone()`.
        asn1Spec:
            Optional schema object, or None.
        encodeFun:
            Passed through to the octet string encoder.
        options:
            Encoder options; `maxChunkSize` is forced to 1000.

        Returns
        -------
        Whatever `encoder.OctetStringEncoder.encodeValue` returns.

        Raises
        ------
        ~pyasn1.error.PyAsn1Error
            If the value is empty, is not UTC ('Z'-terminated, no '+'/'-'),
            contains a comma, or violates the length bounds.
        """
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        numbers = value.asNumbers()

        if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers:
            raise error.PyAsn1Error('Must be UTC time: %r' % value)

        # An empty value has no last character to inspect; report it as a
        # regular encoding error rather than letting IndexError escape.
        if not len(numbers) or numbers[-1] != self.Z_CHAR:
            raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value)

        if self.COMMA_CHAR in numbers:
            raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)

        if self.DOT_CHAR in numbers:
            normalized = self._stripFractionZeros(numbers)

            # Re-create the value only when normalisation removed something
            if len(normalized) != len(numbers):
                value = value.clone(normalized)

            numbers = normalized

        if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH:
            raise error.PyAsn1Error('Length constraint violated: %r' % value)

        options.update(maxChunkSize=1000)

        return encoder.OctetStringEncoder.encodeValue(
            self, value, asn1Spec, encodeFun, **options
        )
class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
    """CER encoder for GeneralizedTime.

    Only overrides the (exclusive) length bounds checked by
    `TimeEncoderMixIn.encodeValue`.
    """

    # Normalised string must be longer than MIN_LENGTH ...
    MIN_LENGTH = 12
    # ... and shorter than MAX_LENGTH
    MAX_LENGTH = 20
class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
    """CER encoder for UTCTime.

    Only overrides the (exclusive) length bounds checked by
    `TimeEncoderMixIn.encodeValue`.
    """

    # Normalised string must be longer than MIN_LENGTH ...
    MIN_LENGTH = 10
    # ... and shorter than MAX_LENGTH
    MAX_LENGTH = 14
class SetOfEncoder(encoder.SequenceOfEncoder):
    """CER encoder for SET OF: encoded components are emitted in sorted order."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode all components, sort their encodings and concatenate them.

        Encodings are compared as if the shorter ones were right-padded
        with zero octets up to the length of the longest one; the padding
        is used for ordering only and is not part of the output.

        Returns the joined payload followed by two True flags.
        """
        encoded = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        # Nothing to order when there is at most one component
        if len(encoded) > 1:
            padOctet = str2octs('\x00')
            width = max(len(item) for item in encoded)

            # sorted() is stable, so equal padded keys keep their input order
            encoded = sorted(
                encoded, key=lambda item: item.ljust(width, padOctet)
            )

        return null.join(encoded), True, True
class SequenceOfEncoder(encoder.SequenceOfEncoder):
    """CER encoder for SEQUENCE OF that honours the `ifNotEmpty` option."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Concatenate the encodings of all components of `value`.

        When the `ifNotEmpty` option is truthy and `value` has no
        components, an empty payload is returned without encoding anything.

        Returns the payload followed by two True flags.
        """
        skipWhenEmpty = options.get('ifNotEmpty', False)

        if skipWhenEmpty and len(value) == 0:
            return null, True, True

        encoded = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        return null.join(encoded), True, True
class SetEncoder(encoder.SequenceEncoder):
    """CER encoder for SET: components are emitted ordered by tag."""

    @staticmethod
    def _componentSortKey(componentAndType):
        """Sort SET components by tag

        Sort regardless of the Choice value (static sort)

        `componentAndType` is a `(component, asn1Spec)` pair; when
        `asn1Spec` is None the component itself is used as the schema.
        """
        component, asn1Spec = componentAndType

        if asn1Spec is None:
            asn1Spec = component

        # An untagged CHOICE has no tag of its own: order it by the smallest
        # tag among its alternatives.
        if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet:
            return asn1Spec.componentType.minTagSet

        return asn1Spec.tagSet

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode the components of a SET in tag order.

        Parameters
        ----------
        value:
            Either an ASN.1 value object (when `asn1Spec` is None) or a
            mapping of component names to Python values (when `asn1Spec`
            is given).
        asn1Spec:
            Optional schema object, or None.
        encodeFun:
            Callable used to encode each individual component.
        options:
            Encoder options; `ifNotEmpty` is set from the optionality of
            each named component before it is encoded.

        Returns
        -------
        The concatenated component encodings followed by two True flags.

        Raises
        ------
        ~pyasn1.error.PyAsn1Error
            If a non-optional component named by the schema is missing
            from `value`.
        Whatever `value.isInconsistent` evaluates to, when it is truthy.
        """
        # (component, component schema, named type or None) triples.  The
        # named type travels with its component instead of being looked up
        # by id(): distinct fields may hold the very same Python object.
        comps = []

        if asn1Spec is None:
            # instance of ASN.1 schema
            inconsistency = value.isInconsistent
            if inconsistency:
                raise inconsistency

            namedTypes = value.componentType

            for idx, component in enumerate(value.values()):
                namedType = None

                if namedTypes:
                    namedType = namedTypes[idx]

                    if namedType.isOptional and not component.isValue:
                        continue

                    if namedType.isDefaulted and component == namedType.asn1Object:
                        continue

                comps.append((component, asn1Spec, namedType))

        else:
            # bare Python value + ASN.1 schema
            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):

                # Absent optional components are skipped; this must be
                # checked before the lookup, which fails for absent names.
                if namedType.isOptional and namedType.name not in value:
                    continue

                try:
                    component = value[namedType.name]

                except KeyError:
                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value))

                if namedType.isDefaulted and component == namedType.asn1Object:
                    continue

                comps.append((component, asn1Spec[idx], namedType))

        comps.sort(key=lambda item: self._componentSortKey(item[:2]))

        chunks = []

        for comp, compType, namedType in comps:
            if namedType:
                options.update(ifNotEmpty=namedType.isOptional)

            chunk = encodeFun(comp, compType, **options)

            # wrap open type blob if needed
            if namedType and namedType.openType:
                wrapType = namedType.asn1Object
                if wrapType.tagSet and not wrapType.isSameTypeWith(comp):
                    chunk = encodeFun(chunk, wrapType, **options)

            chunks.append(chunk)

        return null.join(chunks), True, True
class SequenceEncoder(encoder.SequenceEncoder):
    """CER encoder for SEQUENCE; differs from the base only in one flag."""

    # NOTE(review): presumably makes the base encoder leave out optional
    # components that are empty -- confirm against the BER SequenceEncoder.
    omitEmptyOptionals = True
# Tag-keyed codec table: start from a copy of the BER table and override
# the entries whose CER encoding differs.
tagMap = encoder.tagMap.copy()
tagMap[univ.Boolean.tagSet] = BooleanEncoder()
tagMap[univ.Real.tagSet] = RealEncoder()
tagMap[useful.GeneralizedTime.tagSet] = GeneralizedTimeEncoder()
tagMap[useful.UTCTime.tagSet] = UTCTimeEncoder()
# Sequence & Set have same tags as SequenceOf & SetOf
tagMap[univ.SetOf.tagSet] = SetOfEncoder()
# NOTE(review): this key is a typeId inside a tagSet-keyed map; it looks
# like it may have been meant to be a tagSet -- kept as is, confirm intent.
tagMap[univ.Sequence.typeId] = SequenceEncoder()
# Type-id-keyed codec table: start from a copy of the BER table and override
# the entries whose CER encoding differs.
typeMap = encoder.typeMap.copy()
typeMap[univ.Boolean.typeId] = BooleanEncoder()
typeMap[univ.Real.typeId] = RealEncoder()
typeMap[useful.GeneralizedTime.typeId] = GeneralizedTimeEncoder()
typeMap[useful.UTCTime.typeId] = UTCTimeEncoder()
# Sequence & Set have same tags as SequenceOf & SetOf
typeMap[univ.Set.typeId] = SetEncoder()
typeMap[univ.SetOf.typeId] = SetOfEncoder()
typeMap[univ.Sequence.typeId] = SequenceEncoder()
typeMap[univ.SequenceOf.typeId] = SequenceOfEncoder()
class Encoder(encoder.Encoder):
    """Top-level CER encoder: the BER encoder with two settings overridden."""

    # NOTE(review): presumably False selects indefinite-length encoding of
    # constructed types -- confirm against encoder.Encoder.
    fixedDefLengthMode = False

    # NOTE(review): presumably the maximum size, in octets, of a single
    # chunk of a chunked string encoding -- confirm against encoder.Encoder.
    fixedChunkSize = 1000
#: Turns ASN.1 object into CER octet stream.
#:
#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative),
#: walks all its components recursively and produces a CER octet stream.
#:
#: Parameters
#: ----------
#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     A Python or pyasn1 object to encode. If a Python object is given, the
#:     `asn1Spec` parameter is required to guide the encoding process.
#:
#: Keyword Args
#: ------------
#: asn1Spec:
#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:
#: Returns
#: -------
#: : :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     Given ASN.1 object encoded into CER octet-stream
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error
#:     On encoding errors
#:
#: Examples
#: --------
#: Encode Python value into CER with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> encode([1, 2, 3], asn1Spec=seq)
#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
#:
#: Encode ASN.1 value object into CER
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> seq.extend([1, 2, 3])
#:    >>> encode(seq)
#:    b'0\x80\x02\x01\x01\x02\x01\x02\x02\x01\x03\x00\x00'
#:
encode = Encoder(tagMap, typeMap)
313# EncoderFactory queries class instance and builds a map of tags -> encoders