Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/decoder.py: 16%
817 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:25 +0000
1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
5# License: http://snmplabs.com/pyasn1/license.html
6#
7from pyasn1 import debug
8from pyasn1 import error
9from pyasn1.codec.ber import eoo
10from pyasn1.compat.integer import from_bytes
11from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null
12from pyasn1.type import base
13from pyasn1.type import char
14from pyasn1.type import tag
15from pyasn1.type import tagmap
16from pyasn1.type import univ
17from pyasn1.type import useful
# Public names of this module.
__all__ = ['decode']

# Debug logger registered for this module under the DEBUG_DECODER flag; the
# decoders below test its truthiness (``if LOG:``) before calling it.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)

# Local alias of the ``noValue`` object defined in ``pyasn1.type.base``.
noValue = base.noValue
class AbstractDecoder(object):
    """Common base of the value decoders defined in this module.

    Both decoding entry points raise ``PyAsn1Error``; concrete decoders
    override the ones they support.
    """
    protoComponent = None

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Definite-length decoding hook; always raises ``PyAsn1Error`` here."""
        message = 'Decoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Indefinite-length decoding hook; always raises ``PyAsn1Error`` here."""
        message = 'Indefinite length mode decoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)
class AbstractSimpleDecoder(AbstractDecoder):
    """Helpers shared by the decoders that derive from this class."""

    @staticmethod
    def substrateCollector(asn1Object, substrate, length):
        """Split *substrate* at *length*; *asn1Object* is not used.

        Returns a ``(first length items, remainder)`` pair.
        """
        collected = substrate[:length]
        remainder = substrate[length:]
        return collected, remainder

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Wrap a decoded *value* into the object handed back to the caller.

        * with a truthy ``native`` option the bare *value* is returned;
        * without *asn1Spec* the class prototype is cloned with *value*
          and *tagSet*;
        * when *value* is ``noValue`` the *asn1Spec* itself is returned;
        * otherwise *asn1Spec* is cloned with *value*.
        """
        if options.get('native'):
            return value

        if asn1Spec is None:
            return self.protoComponent.clone(value, tagSet=tagSet)

        if value is noValue:
            return asn1Spec

        return asn1Spec.clone(value)
class ExplicitTagDecoder(AbstractSimpleDecoder):
    """Decode the value carried inside an explicit tag container."""
    protoComponent = univ.Any('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length explicitly tagged value.

        Returns ``(value, tail)`` where *tail* is the substrate past
        *length*. When *substrateFun* is given, its result is returned
        instead.
        """
        if substrateFun:
            return substrateFun(
                self._createComponent(asn1Spec, tagSet, '', **options),
                substrate, length
            )

        head, tail = substrate[:length], substrate[length:]

        value, trailing = decodeFun(head, asn1Spec, tagSet, length, **options)

        # Report leftover octets only when there actually are some; the
        # message claims payload is being lost.
        if LOG and trailing:
            LOG('explicit tag container carries %d octets of trailing payload '
                '(will be lost!): %s' % (len(trailing), debug.hexdump(trailing)))

        return value, tail

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length explicitly tagged value.

        The inner value must be followed by an end-of-octets marker,
        otherwise ``PyAsn1Error`` is raised.
        """
        if substrateFun:
            return substrateFun(
                self._createComponent(asn1Spec, tagSet, '', **options),
                substrate, length
            )

        value, substrate = decodeFun(substrate, asn1Spec, tagSet, length, **options)

        eooMarker, substrate = decodeFun(substrate, allowEoo=True, **options)

        if eooMarker is eoo.endOfOctets:
            return value, substrate

        raise error.PyAsn1Error('Missing end-of-octets terminator')


# Module-level instance of the explicit tag decoder.
explicitTagDecoder = ExplicitTagDecoder()
class IntegerDecoder(AbstractSimpleDecoder):
    """Decode INTEGER values from their two's complement octets."""
    protoComponent = univ.Integer(0)

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Return ``(component, tail)`` for a primitive integer encoding.

        Raises ``PyAsn1Error`` when the tag is not in simple format.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        payload = substrate[:length]
        remainder = substrate[length:]

        # An empty payload is decoded as zero
        number = from_bytes(payload, signed=True) if payload else 0

        return self._createComponent(asn1Spec, tagSet, number, **options), remainder
class BooleanDecoder(IntegerDecoder):
    """Decode BOOLEAN values; any truthy integer becomes 1, anything else 0."""
    protoComponent = univ.Boolean(0)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Normalize *value* to 0/1 before delegating to ``IntegerDecoder``."""
        flag = 1 if value else 0
        return IntegerDecoder._createComponent(
            self, asn1Spec, tagSet, flag, **options)
class BitStringDecoder(AbstractSimpleDecoder):
    """Decode BIT STRING values in primitive and constructed forms."""
    protoComponent = univ.BitString(())
    supportConstructedForm = True

    def _appendFragment(self, bitString, fragment):
        """Append one inner fragment of a constructed encoding to *bitString*.

        *fragment* is the raw content of an inner fragment: one octet with
        the count of trailing (padding) bits followed by the payload.

        Raises ``PyAsn1Error`` on an empty fragment or when the padding
        count exceeds 7.
        """
        # A zero-length fragment has no padding octet to read; report it as
        # a decoding error instead of leaking IndexError.
        if not fragment:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        trailingBits = oct2int(fragment[0])
        if trailingBits > 7:
            raise error.PyAsn1Error(
                'Trailing bits overflow %s' % trailingBits
            )

        return self.protoComponent.fromOctetString(
            fragment[1:], internalFormat=True,
            prepend=bitString, padding=trailingBits
        )

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length BIT STRING.

        Returns ``(component, tail)``. When *substrateFun* is given, its
        result is returned instead. Raises ``PyAsn1Error`` on empty
        substrate, on a padding count above 7, or when the constructed
        form is used while ``supportConstructedForm`` is false.
        """
        head, tail = substrate[:length], substrate[length:]

        if substrateFun:
            return substrateFun(self._createComponent(
                asn1Spec, tagSet, noValue, **options), substrate, length)

        if not head:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?

            trailingBits = oct2int(head[0])
            if trailingBits > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % trailingBits
                )

            value = self.protoComponent.fromOctetString(
                head[1:], internalFormat=True, padding=trailingBits)

            return self._createComponent(asn1Spec, tagSet, value, **options), tail

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited '
                                    'at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(null, internalFormat=True)

        while head:
            component, head = decodeFun(head, self.protoComponent,
                                        substrateFun=substrateFun, **options)

            bitString = self._appendFragment(bitString, component)

        return self._createComponent(asn1Spec, tagSet, bitString, **options), tail

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length (constructed) BIT STRING.

        Fragments are collected until an end-of-octets marker is seen;
        ``SubstrateUnderrunError`` is raised when the substrate ends first.
        """
        if substrateFun:
            return substrateFun(self._createComponent(asn1Spec, tagSet, noValue, **options), substrate, length)

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(null, internalFormat=True)

        while substrate:
            component, substrate = decodeFun(substrate, self.protoComponent,
                                             substrateFun=substrateFun,
                                             allowEoo=True, **options)
            if component is eoo.endOfOctets:
                break

            bitString = self._appendFragment(bitString, component)

        else:
            raise error.SubstrateUnderrunError('No EOO seen before substrate ends')

        return self._createComponent(asn1Spec, tagSet, bitString, **options), substrate
class OctetStringDecoder(AbstractSimpleDecoder):
    """Decode OCTET STRING values in primitive and constructed forms."""
    protoComponent = univ.OctetString('')
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length OCTET STRING into ``(component, tail)``.

        When *substrateFun* is given, its result is returned instead.
        Raises ``PyAsn1Error`` when the constructed form is used while
        ``supportConstructedForm`` is false.
        """
        payload = substrate[:length]
        remainder = substrate[length:]

        if substrateFun:
            placeholder = self._createComponent(asn1Spec, tagSet, noValue, **options)
            return substrateFun(placeholder, substrate, length)

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?
            return self._createComponent(asn1Spec, tagSet, payload, **options), remainder

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # Inner fragments all share one type, so collect them as raw octets
        collector = self.substrateCollector

        assembled = null

        while payload:
            fragment, payload = decodeFun(
                payload, self.protoComponent,
                substrateFun=collector, **options
            )
            assembled += fragment

        return self._createComponent(asn1Spec, tagSet, assembled, **options), remainder

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length (constructed) OCTET STRING.

        A *substrateFun* other than this decoder's own collector
        short-circuits decoding. Raises ``SubstrateUnderrunError`` when
        the substrate ends before an end-of-octets marker is seen.
        """
        if substrateFun and substrateFun is not self.substrateCollector:
            placeholder = self._createComponent(asn1Spec, tagSet, noValue, **options)
            return substrateFun(placeholder, substrate, length)

        # Inner fragments all share one type, so collect them as raw octets
        collector = self.substrateCollector

        assembled = null

        while substrate:
            fragment, substrate = decodeFun(
                substrate, self.protoComponent,
                substrateFun=collector,
                allowEoo=True, **options
            )
            if fragment is eoo.endOfOctets:
                break

            assembled += fragment

        else:
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
            )

        return self._createComponent(asn1Spec, tagSet, assembled, **options), substrate
class NullDecoder(AbstractSimpleDecoder):
    """Decode NULL values, which must carry no content octets."""
    protoComponent = univ.Null('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Return ``(component, tail)``.

        Raises ``PyAsn1Error`` when the tag is not in simple format or
        when any content octets are present.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        payload = substrate[:length]
        remainder = substrate[length:]

        # The component is built before the payload check, as any failure
        # while creating it takes precedence.
        nullValue = self._createComponent(asn1Spec, tagSet, '', **options)

        if payload:
            raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)

        return nullValue, remainder
class ObjectIdentifierDecoder(AbstractSimpleDecoder):
    """Decode OBJECT IDENTIFIER values into a tuple of arcs."""
    protoComponent = univ.ObjectIdentifier(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Return ``(component, tail)`` for a primitive OID encoding.

        Raises ``PyAsn1Error`` on a non-simple tag, empty content or a
        sub-identifier starting with octet 0x80, and
        ``SubstrateUnderrunError`` when a multi-octet sub-identifier is
        cut short.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        head, tail = substrate[:length], substrate[length:]
        if not head:
            raise error.PyAsn1Error('Empty substrate')

        octets = octs2ints(head)
        total = len(octets)

        arcs = ()
        pos = 0
        while pos < total:
            octet = octets[pos]
            pos += 1

            if octet == 128:
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')

            if octet < 128:
                arcs += (octet,)
                continue

            # Sub-identifier spans several octets: seven payload bits each,
            # high bit set on every octet but the last
            accumulated = 0
            while octet >= 128:
                accumulated = (accumulated << 7) + (octet & 0x7F)
                if pos >= total:
                    raise error.SubstrateUnderrunError(
                        'Short substrate for sub-OID past %s' % (arcs,)
                    )
                octet = octets[pos]
                pos += 1
            arcs += ((accumulated << 7) + octet,)

        # The first sub-identifier packs the two leading arcs
        first = arcs[0]
        if 0 <= first <= 39:
            arcs = (0,) + arcs
        elif 40 <= first <= 79:
            arcs = (1, first - 40) + arcs[1:]
        elif first >= 80:
            arcs = (2, first - 80) + arcs[1:]
        else:
            raise error.PyAsn1Error('Malformed first OID octet: %s' % octets[0])

        return self._createComponent(asn1Spec, tagSet, arcs, **options), tail
class RealDecoder(AbstractSimpleDecoder):
    """Decode REAL values in binary, special (infinite) and character forms."""
    protoComponent = univ.Real()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Return ``(component, tail)`` for a primitive REAL encoding.

        The value handed to ``_createComponent`` is ``0.0`` for empty
        content, a ``(mantissa, base, exponent)`` tuple for binary and NR1
        encodings, ``'inf'``/``'-inf'`` for infinities and a float for
        NR2/NR3 encodings.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        head, tail = substrate[:length], substrate[length:]

        if not head:
            return self._createComponent(asn1Spec, tagSet, 0.0, **options), tail

        firstOctet = oct2int(head[0])
        body = head[1:]

        if firstOctet & 0x80:  # binary encoding
            if not body:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding binary encoded REAL')

            exponentLength = (firstOctet & 0x03) + 1

            if exponentLength == 4:
                # the exponent length is carried by the next octet
                exponentLength = oct2int(body[0])
                body = body[1:]

            exponentOctets, body = body[:exponentLength], body[exponentLength:]

            if not exponentOctets or not body:
                raise error.PyAsn1Error('Real exponent screwed')

            # sign-extend the exponent from its leading octet
            exponent = -1 if oct2int(exponentOctets[0]) & 0x80 else 0

            for octet in exponentOctets:
                exponent <<= 8
                exponent |= oct2int(octet)

            baseBits = (firstOctet >> 4) & 0x03

            if baseBits > 2:
                raise error.PyAsn1Error('Illegal Real base')

            if baseBits == 1:  # encbase = 8
                exponent *= 3

            elif baseBits == 2:  # encbase = 16
                exponent *= 4

            mantissa = 0

            for octet in body:
                mantissa <<= 8
                mantissa |= oct2int(octet)

            if firstOctet & 0x40:  # sign bit
                mantissa = -mantissa

            scaleFactor = (firstOctet >> 2) & 0x03
            mantissa *= 2 ** scaleFactor
            value = (mantissa, 2, exponent)

        elif firstOctet & 0x40:  # infinite value
            if LOG:
                LOG('decoding infinite REAL')

            value = '-inf' if firstOctet & 0x01 else 'inf'

        elif firstOctet & 0xc0 == 0:  # character encoding
            if not body:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding character encoded REAL')

            representation = firstOctet & 0x3

            try:
                if representation == 0x1:  # NR1
                    value = (int(body), 10, 0)

                elif representation in (0x2, 0x3):  # NR2, NR3
                    value = float(body)

                else:
                    raise error.SubstrateUnderrunError(
                        'Unknown NR (tag %s)' % firstOctet
                    )

            except ValueError:
                raise error.SubstrateUnderrunError(
                    'Bad character Real syntax'
                )

        else:
            raise error.SubstrateUnderrunError(
                'Unknown encoding (tag %s)' % firstOctet
            )

        return self._createComponent(asn1Spec, tagSet, value, **options), tail
class AbstractConstructedDecoder(AbstractDecoder):
    """Base class of the constructed type decoders defined in this module."""
    # Prototype ASN.1 object; None here, concrete decoders set their own.
    protoComponent = None
class UniversalConstructedTypeDecoder(AbstractConstructedDecoder):
    """Decode SEQUENCE/SET and SEQUENCE OF/SET OF containers.

    Subclasses supply ``protoRecordComponent`` and
    ``protoSequenceComponent``, the prototypes used when no ``asn1Spec``
    is given and the container kind has to be guessed.
    """
    protoRecordComponent = None
    protoSequenceComponent = None

    def _getComponentTagMap(self, asn1Object, idx):
        raise NotImplementedError()

    def _getComponentPositionByType(self, asn1Object, tagSet, idx):
        raise NotImplementedError()

    def _decodeComponents(self, substrate, tagSet=None, decodeFun=None, **options):
        """Decode components without a schema and guess the container kind.

        Components are decoded until the substrate is exhausted or an
        end-of-octets marker is returned. Returns ``(asn1Object, rest)``.
        """
        components = []
        componentTypes = set()

        while substrate:
            component, substrate = decodeFun(substrate, **options)
            if component is eoo.endOfOctets:
                break

            components.append(component)
            componentTypes.add(component.tagSet)

        # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF
        # The heuristics is:
        # * 1+ components of different types -> likely SEQUENCE/SET
        # * otherwise -> likely SEQUENCE OF/SET OF
        if len(componentTypes) > 1:
            protoComponent = self.protoRecordComponent

        else:
            protoComponent = self.protoSequenceComponent

        asn1Object = protoComponent.clone(
            # construct tagSet from base tag from prototype ASN.1 object
            # and additional tags recovered from the substrate
            tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
        )

        if LOG:
            LOG('guessed %r container type (pass `asn1Spec` to guide the '
                'decoder)' % asn1Object)

        for idx, component in enumerate(components):
            asn1Object.setComponentByPosition(
                idx, component,
                verifyConstraints=False,
                matchTags=False, matchConstraints=False
            )

        return asn1Object, substrate

    def _decodeOpenTypes(self, asn1Object, namedTypes, decodeFun, options,
                         allowEoo=False):
        """Re-decode open type components of *asn1Object* in place.

        Runs only when the caller passed an ``openTypes`` map or a truthy
        ``decodeOpenTypes`` option. For every named type that declares an
        open type, the governing component's value selects the concrete
        type, first from the user map and then from the type's own map;
        unresolved components are left as they are.

        With *allowEoo* set, inner decoding is done with ``allowEoo=True``
        and a scalar component that decodes to the end-of-octets marker is
        not stored (the indefinite-length behaviour).
        """
        openTypes = options.get('openTypes', {})

        if LOG:
            LOG('user-specified open types map:')

            for k, v in openTypes.items():
                LOG('%s -> %r' % (k, v))

        if not (openTypes or options.get('decodeOpenTypes', False)):
            return

        decodeOptions = dict(options, allowEoo=True) if allowEoo else options

        for idx, namedType in enumerate(namedTypes.namedTypes):
            if not namedType.openType:
                continue

            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                continue

            governingValue = asn1Object.getComponentByName(
                namedType.openType.name
            )

            try:
                openType = openTypes[governingValue]

            except KeyError:

                if LOG:
                    LOG('default open types map of component '
                        '"%s.%s" governed by component "%s.%s"'
                        ':' % (asn1Object.__class__.__name__,
                               namedType.name,
                               asn1Object.__class__.__name__,
                               namedType.openType.name))

                    for k, v in namedType.openType.items():
                        LOG('%s -> %r' % (k, v))

                try:
                    openType = namedType.openType[governingValue]

                except KeyError:
                    if LOG:
                        LOG('failed to resolve open type by governing '
                            'value %r' % (governingValue,))
                    continue

            if LOG:
                LOG('resolved open type %r by governing '
                    'value %r' % (openType, governingValue))

            containerValue = asn1Object.getComponentByPosition(idx)

            if containerValue.typeId in (
                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                for pos, containerElement in enumerate(
                        containerValue):

                    component, rest = decodeFun(
                        containerValue[pos].asOctets(),
                        asn1Spec=openType, **decodeOptions
                    )

                    containerValue[pos] = component

            else:
                component, rest = decodeFun(
                    asn1Object.getComponentByPosition(idx).asOctets(),
                    asn1Spec=openType, **decodeOptions
                )

                if allowEoo and component is eoo.endOfOctets:
                    continue

                asn1Object.setComponentByPosition(idx, component)

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length constructed value.

        Returns ``(asn1Object, tail)`` where *tail* is the substrate past
        *length*. When *substrateFun* is given, its result is returned
        instead. Raises ``PyAsn1Error`` on a non-constructed tag, on
        surplus components and on missing required components.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        head, tail = substrate[:length], substrate[length:]

        if substrateFun is not None:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            return substrateFun(asn1Object, substrate, length)

        if asn1Spec is None:
            asn1Object, trailing = self._decodeComponents(
                head, tagSet=tagSet, decodeFun=decodeFun, **options
            )

            if trailing:
                if LOG:
                    LOG('Unused trailing %d octets encountered: %s' % (
                        len(trailing), debug.hexdump(trailing)))

            return asn1Object, tail

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Spec.componentType

            isSetType = asn1Spec.typeId == univ.Set.typeId
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()
            idx = 0
            while head:
                if not namedTypes:
                    componentType = None

                elif isSetType:
                    componentType = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            componentType = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            componentType = namedTypes.getTagMapNearPosition(idx)

                        else:
                            componentType = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Spec,)
                        )

                component, head = decodeFun(head, componentType, **options)

                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:
                    self._decodeOpenTypes(
                        asn1Object, namedTypes, decodeFun, options)

            else:
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise inconsistency

        else:
            # asn1Object is already a cleared clone of asn1Spec at this point
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while head:
                component, head = decodeFun(head, componentType, **options)
                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        return asn1Object, tail

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length constructed value.

        Components are read until an end-of-octets marker is seen. Returns
        ``(asn1Object, rest)``. When *substrateFun* is given, its result is
        returned instead. Raises ``SubstrateUnderrunError`` when the
        substrate ends without the marker and ``PyAsn1Error`` on a
        non-constructed tag, surplus components or missing required
        components.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        if substrateFun is not None:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            return substrateFun(asn1Object, substrate, length)

        if asn1Spec is None:
            return self._decodeComponents(
                substrate, tagSet=tagSet, decodeFun=decodeFun,
                **dict(options, allowEoo=True)
            )

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Object.componentType

            isSetType = asn1Object.typeId == univ.Set.typeId
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()
            idx = 0
            while substrate:
                # Past the last declared component no type guidance is
                # given; this is also where the end-of-octets marker is
                # normally read.
                if len(namedTypes) <= idx:
                    componentType = None

                elif isSetType:
                    componentType = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            componentType = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            componentType = namedTypes.getTagMapNearPosition(idx)

                        else:
                            componentType = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Object,)
                        )

                component, substrate = decodeFun(substrate, componentType, allowEoo=True, **options)
                if component is eoo.endOfOctets:
                    break

                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    else:
                        # A real component past the declared ones would
                        # otherwise leak IndexError from the lookup below.
                        try:
                            namedType = namedTypes[idx]

                        except IndexError:
                            raise error.PyAsn1Error(
                                'Excessive components decoded at %r' % (asn1Object,)
                            )

                        if namedType.isOptional or namedType.isDefaulted:
                            idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            else:
                raise error.SubstrateUnderrunError(
                    'No EOO seen before substrate ends'
                )

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error('ASN.1 object %s has uninitialized components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:
                    self._decodeOpenTypes(
                        asn1Object, namedTypes, decodeFun, options,
                        allowEoo=True)

            else:
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise inconsistency

        else:
            # asn1Object is already a cleared clone of asn1Spec at this point
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while substrate:
                component, substrate = decodeFun(substrate, componentType, allowEoo=True, **options)

                if component is eoo.endOfOctets:
                    break

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

            else:
                raise error.SubstrateUnderrunError(
                    'No EOO seen before substrate ends'
                )

        return asn1Object, substrate
class SequenceOrSequenceOfDecoder(UniversalConstructedTypeDecoder):
    """Constructed decoder using ``univ.Sequence``/``univ.SequenceOf`` prototypes.

    The two prototypes are what the base class picks from when it has to
    guess the container kind without an ``asn1Spec``.
    """
    protoRecordComponent = univ.Sequence()
    protoSequenceComponent = univ.SequenceOf()
class SequenceDecoder(SequenceOrSequenceOfDecoder):
    """Sequence decoder whose prototype component is ``univ.Sequence()``."""
    protoComponent = univ.Sequence()
class SequenceOfDecoder(SequenceOrSequenceOfDecoder):
    """Sequence decoder whose prototype component is ``univ.SequenceOf()``."""
    protoComponent = univ.SequenceOf()
class SetOrSetOfDecoder(UniversalConstructedTypeDecoder):
    """Constructed decoder using ``univ.Set``/``univ.SetOf`` prototypes.

    The two prototypes are what the base class picks from when it has to
    guess the container kind without an ``asn1Spec``.
    """
    protoRecordComponent = univ.Set()
    protoSequenceComponent = univ.SetOf()
class SetDecoder(SetOrSetOfDecoder):
    """Set decoder whose prototype component is ``univ.Set()``."""
    protoComponent = univ.Set()
class SetOfDecoder(SetOrSetOfDecoder):
    """Set decoder whose prototype component is ``univ.SetOf()``."""
    protoComponent = univ.SetOf()
class ChoiceDecoder(AbstractConstructedDecoder):
    """Decode CHOICE values, either explicitly tagged or untagged."""
    protoComponent = univ.Choice()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length CHOICE into ``(asn1Object, tail)``.

        When *substrateFun* is given, its result is returned instead.
        """
        head, tail = substrate[:length], substrate[length:]

        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            return substrateFun(asn1Object, substrate, length)

        if asn1Object.tagSet == tagSet:
            if LOG:
                LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))

            # the CHOICE's own tag is already consumed: decode the inner
            # value from scratch
            resumeArgs = ()

        else:
            if LOG:
                LOG('decoding %s as untagged CHOICE' % (tagSet,))

            # the tag seen belongs to the inner value: resume decoding
            # with the header information already gathered
            resumeArgs = (tagSet, length, state)

        component, head = decodeFun(
            head, asn1Object.componentTagMap, *resumeArgs, **options
        )

        effectiveTagSet = component.effectiveTagSet

        if LOG:
            LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))

        asn1Object.setComponentByType(
            effectiveTagSet, component,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False,
            innerFlag=False
        )

        return asn1Object, tail

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length CHOICE into ``(asn1Object, rest)``.

        An explicitly tagged CHOICE must be followed by an end-of-octets
        marker, otherwise ``PyAsn1Error`` is raised.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            return substrateFun(asn1Object, substrate, length)

        isExplicitlyTagged = asn1Object.tagSet == tagSet

        if isExplicitlyTagged:
            if LOG:
                LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))

            component, substrate = decodeFun(
                substrate, asn1Object.componentType.tagMapUnique, **options
            )

            # eat up EOO marker
            eooMarker, substrate = decodeFun(
                substrate, allowEoo=True, **options
            )

            if eooMarker is not eoo.endOfOctets:
                raise error.PyAsn1Error('No EOO seen before substrate ends')

        else:
            if LOG:
                LOG('decoding %s as untagged CHOICE' % (tagSet,))

            component, substrate = decodeFun(
                substrate, asn1Object.componentType.tagMapUnique,
                tagSet, length, state, **options
            )

        effectiveTagSet = component.effectiveTagSet

        if LOG:
            LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))

        asn1Object.setComponentByType(
            effectiveTagSet, component,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False,
            innerFlag=False
        )

        return asn1Object, substrate
class AnyDecoder(AbstractSimpleDecoder):
    """Decode ANY values, keeping the raw serialization as the value."""
    protoComponent = univ.Any()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length ANY into ``(component, tail)``.

        For an untagged ANY the header octets are recovered from
        ``options['fullSubstrate']`` and become part of the value. When
        *substrateFun* is given, its result is returned instead.
        """
        if asn1Spec is None:
            isUntagged = True

        elif asn1Spec.__class__ is tagmap.TagMap:
            isUntagged = tagSet not in asn1Spec.tagMap

        else:
            isUntagged = tagSet != asn1Spec.tagSet

        if isUntagged:
            fullSubstrate = options['fullSubstrate']

            # untagged Any container, recover inner header substrate
            length += len(fullSubstrate) - len(substrate)
            substrate = fullSubstrate

            if LOG:
                LOG('decoding as untagged ANY, substrate %s' % debug.hexdump(substrate))

        if substrateFun:
            return substrateFun(self._createComponent(asn1Spec, tagSet, noValue, **options),
                                substrate, length)

        head, tail = substrate[:length], substrate[length:]

        return self._createComponent(asn1Spec, tagSet, head, **options), tail

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length ANY.

        Inner fragments are concatenated until an end-of-octets marker is
        seen; ``SubstrateUnderrunError`` is raised when the substrate ends
        first. A *substrateFun* other than this decoder's own collector
        short-circuits decoding.
        """
        if asn1Spec is None:
            isTagged = False

        elif asn1Spec.__class__ is tagmap.TagMap:
            isTagged = tagSet in asn1Spec.tagMap

        else:
            isTagged = tagSet == asn1Spec.tagSet

        if isTagged:
            # tagged Any type -- consume header substrate
            header = null

            if LOG:
                LOG('decoding as tagged ANY')

        else:
            fullSubstrate = options['fullSubstrate']

            # untagged Any, recover header substrate. A plain
            # ``[:-len(substrate)]`` would slice to nothing when substrate
            # is empty (``[:-0]``), dropping the whole header.
            if substrate:
                header = fullSubstrate[:-len(substrate)]
            else:
                header = fullSubstrate

            if LOG:
                LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(header))

        # Any components do not inherit initial tag
        asn1Spec = self.protoComponent

        if substrateFun and substrateFun is not self.substrateCollector:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
            return substrateFun(asn1Object, header + substrate, length + len(header))

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        while substrate:
            component, substrate = decodeFun(substrate, asn1Spec,
                                             substrateFun=substrateFun,
                                             allowEoo=True, **options)
            if component is eoo.endOfOctets:
                break

            header += component

        else:
            raise error.SubstrateUnderrunError(
                'No EOO seen before substrate ends'
            )

        # NOTE(review): substrateFun was rebound to self.substrateCollector
        # above, so this condition looks always true and the raw octets are
        # returned; the else branch appears unreachable. Left unchanged
        # because callers may rely on the current return value.
        if substrateFun:
            return header, substrate

        else:
            return self._createComponent(asn1Spec, tagSet, header, **options), substrate
1174# character string types
1175class UTF8StringDecoder(OctetStringDecoder):
1176 protoComponent = char.UTF8String()
1179class NumericStringDecoder(OctetStringDecoder):
1180 protoComponent = char.NumericString()
1183class PrintableStringDecoder(OctetStringDecoder):
1184 protoComponent = char.PrintableString()
1187class TeletexStringDecoder(OctetStringDecoder):
1188 protoComponent = char.TeletexString()
1191class VideotexStringDecoder(OctetStringDecoder):
1192 protoComponent = char.VideotexString()
1195class IA5StringDecoder(OctetStringDecoder):
1196 protoComponent = char.IA5String()
class GraphicStringDecoder(OctetStringDecoder):
    """BER decoder for ASN.1 GraphicString values.

    All decoding logic is inherited from OctetStringDecoder; only the
    prototype component differs.
    """

    protoComponent = char.GraphicString()
class VisibleStringDecoder(OctetStringDecoder):
    """BER decoder for ASN.1 VisibleString values.

    All decoding logic is inherited from OctetStringDecoder; only the
    prototype component differs.
    """

    protoComponent = char.VisibleString()
class GeneralStringDecoder(OctetStringDecoder):
    """BER decoder for ASN.1 GeneralString values.

    All decoding logic is inherited from OctetStringDecoder; only the
    prototype component differs.
    """

    protoComponent = char.GeneralString()
class UniversalStringDecoder(OctetStringDecoder):
    """BER decoder for ASN.1 UniversalString values.

    All decoding logic is inherited from OctetStringDecoder; only the
    prototype component differs.
    """

    protoComponent = char.UniversalString()
class BMPStringDecoder(OctetStringDecoder):
    """BER decoder for ASN.1 BMPString values.

    All decoding logic is inherited from OctetStringDecoder; only the
    prototype component differs.
    """

    protoComponent = char.BMPString()
1219# "useful" types
class ObjectDescriptorDecoder(OctetStringDecoder):
    """BER decoder for ASN.1 ObjectDescriptor values.

    All decoding logic is inherited from OctetStringDecoder; only the
    prototype component differs.
    """

    protoComponent = useful.ObjectDescriptor()
class GeneralizedTimeDecoder(OctetStringDecoder):
    """BER decoder for ASN.1 GeneralizedTime values.

    All decoding logic is inherited from OctetStringDecoder; only the
    prototype component differs.
    """

    protoComponent = useful.GeneralizedTime()
class UTCTimeDecoder(OctetStringDecoder):
    """BER decoder for ASN.1 UTCTime values.

    All decoding logic is inherited from OctetStringDecoder; only the
    prototype component differs.
    """

    protoComponent = useful.UTCTime()
# Tag-to-codec map: selects a value decoder by the tag set of an ASN.1 type
tagMap = {
    # universal scalar and constructed types
    univ.Integer.tagSet: IntegerDecoder(),
    univ.Boolean.tagSet: BooleanDecoder(),
    univ.BitString.tagSet: BitStringDecoder(),
    univ.OctetString.tagSet: OctetStringDecoder(),
    univ.Null.tagSet: NullDecoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(),
    univ.Enumerated.tagSet: IntegerDecoder(),
    univ.Real.tagSet: RealDecoder(),
    univ.Sequence.tagSet: SequenceOrSequenceOfDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SetOrSetOfDecoder(),  # conflicts with SetOf
    univ.Choice.tagSet: ChoiceDecoder(),  # conflicts with Any

    # character string types
    char.UTF8String.tagSet: UTF8StringDecoder(),
    char.NumericString.tagSet: NumericStringDecoder(),
    char.PrintableString.tagSet: PrintableStringDecoder(),
    char.TeletexString.tagSet: TeletexStringDecoder(),
    char.VideotexString.tagSet: VideotexStringDecoder(),
    char.IA5String.tagSet: IA5StringDecoder(),
    char.GraphicString.tagSet: GraphicStringDecoder(),
    char.VisibleString.tagSet: VisibleStringDecoder(),
    char.GeneralString.tagSet: GeneralStringDecoder(),
    char.UniversalString.tagSet: UniversalStringDecoder(),
    char.BMPString.tagSet: BMPStringDecoder(),

    # useful types
    useful.ObjectDescriptor.tagSet: ObjectDescriptorDecoder(),
    useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(),
    useful.UTCTime.tagSet: UTCTimeDecoder(),
}
# Type-to-codec map for ambiguous ASN.1 types: the decoder is selected by
# the typeId of the ASN.1 spec rather than by the tag set
typeMap = {
    univ.Set.typeId: SetDecoder(),
    univ.SetOf.typeId: SetOfDecoder(),
    univ.Sequence.typeId: SequenceDecoder(),
    univ.SequenceOf.typeId: SequenceOfDecoder(),
    univ.Choice.typeId: ChoiceDecoder(),
    univ.Any.typeId: AnyDecoder(),
}
# Register the non-ambiguous types as well, for faster codec lookup.
# Entries already present in typeMap are never overwritten.
for typeDecoder in tagMap.values():
    if typeDecoder.protoComponent is None:
        continue

    typeId = typeDecoder.protoComponent.__class__.typeId
    if typeId is not None:
        typeMap.setdefault(typeId, typeDecoder)
# States of the decoder state machine, numbered 0..9 in declaration order
(stDecodeTag,
 stDecodeLength,
 stGetValueDecoder,
 stGetValueDecoderByAsn1Spec,
 stGetValueDecoderByTag,
 stTryAsExplicitTag,
 stDecodeValue,
 stDumpRawValue,
 stErrorCondition,
 stStop) = range(10)
class Decoder(object):
    """Callable BER decoder driving a tag/length/value state machine.

    An instance reads the tag and length octets from the substrate, picks
    a concrete value decoder (by tag set or by the supplied ASN.1 spec)
    and delegates decoding of the value octets to it.
    """

    # State entered when no codec could be chosen for the decoded tag.
    # Setting this to `stDumpRawValue` instead makes the decoder fall back
    # to `defaultRawDecoder` rather than raising an error.
    defaultErrorState = stErrorCondition
    defaultRawDecoder = AnyDecoder()
    supportIndefLength = True

    def __init__(self, tagMap, typeMap=None):
        """Set up codec lookup maps and tag caches.

        Parameters
        ----------
        tagMap: mapping
            Maps tag sets to concrete value decoders.
        typeMap: mapping or None
            Maps ASN.1 type IDs to concrete value decoders. When omitted,
            a fresh empty mapping is used for this instance.
        """
        if typeMap is None:
            typeMap = {}

        self.__tagMap = tagMap
        self.__typeMap = typeMap
        # Tag & TagSet objects caches
        self.__tagCache = {}
        self.__tagSetCache = {}
        self.__eooSentinel = ints2octs((0, 0))

    def __call__(self, substrate, asn1Spec=None,
                 tagSet=None, length=None, state=stDecodeTag,
                 decodeFun=None, substrateFun=None,
                 **options):
        """Decode one ASN.1 value from the head of `substrate`.

        Parameters
        ----------
        substrate:
            Octet stream to decode.
        asn1Spec:
            Optional ASN.1 type object (or tag map) guiding codec selection.
        tagSet:
            Tags decoded so far by an outer call; the newly decoded tag
            is added to it.
        length:
            Value length; overwritten once the length octets are decoded.
        state:
            State the state machine starts in.
        decodeFun:
            Not used by this method; the decoder passes itself to the
            concrete value decoders instead.
        substrateFun:
            Optional callable handed over to the concrete value decoder.
        options:
            `allowEoo` is consumed here, `recursiveFlag` is consulted and
            `fullSubstrate` is set; everything is forwarded to the
            concrete value decoder.

        Returns
        -------
        : tuple
            The decoded value and the remaining substrate.

        Raises
        ------
        ~pyasn1.error.SubstrateUnderrunError
            When the substrate ends prematurely.
        ~pyasn1.error.PyAsn1Error
            When no codec matches, or indefinite length encoding is used
            but not supported.
        """

        if LOG:
            LOG('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (
                debug.scope, state, len(substrate), debug.hexdump(substrate)))

        allowEoo = options.pop('allowEoo', False)

        # Look for end-of-octets sentinel
        if allowEoo and self.supportIndefLength:
            if substrate[:2] == self.__eooSentinel:
                if LOG:
                    LOG('end-of-octets sentinel found')
                return eoo.endOfOctets, substrate[2:]

        value = noValue

        tagMap = self.__tagMap
        typeMap = self.__typeMap
        tagCache = self.__tagCache
        tagSetCache = self.__tagSetCache

        # Kept intact so that value decoders can recover the header octets
        fullSubstrate = substrate

        # States are plain integers, so they are compared by value
        while state != stStop:

            if state == stDecodeTag:
                if not substrate:
                    raise error.SubstrateUnderrunError(
                        'Short octet stream on tag decoding'
                    )

                # Decode tag
                isShortTag = True
                firstOctet = substrate[0]
                substrate = substrate[1:]

                try:
                    lastTag = tagCache[firstOctet]

                except KeyError:
                    integerTag = oct2int(firstOctet)
                    tagClass = integerTag & 0xC0
                    tagFormat = integerTag & 0x20
                    tagId = integerTag & 0x1F

                    if tagId == 0x1F:
                        # Long form: tag ID continues in the following
                        # octets, 7 bits each, high bit set on all but
                        # the last one
                        isShortTag = False
                        lengthOctetIdx = 0
                        tagId = 0

                        try:
                            while True:
                                integerTag = oct2int(substrate[lengthOctetIdx])
                                lengthOctetIdx += 1
                                tagId <<= 7
                                tagId |= (integerTag & 0x7F)
                                if not integerTag & 0x80:
                                    break

                            substrate = substrate[lengthOctetIdx:]

                        except IndexError:
                            raise error.SubstrateUnderrunError(
                                'Short octet stream on long tag decoding'
                            )

                    lastTag = tag.Tag(
                        tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                    )

                    if isShortTag:
                        # cache short tags
                        tagCache[firstOctet] = lastTag

                if tagSet is None:
                    if isShortTag:
                        try:
                            tagSet = tagSetCache[firstOctet]

                        except KeyError:
                            # base tag not recovered
                            tagSet = tag.TagSet((), lastTag)
                            tagSetCache[firstOctet] = tagSet
                    else:
                        tagSet = tag.TagSet((), lastTag)

                else:
                    tagSet = lastTag + tagSet

                state = stDecodeLength

                if LOG:
                    LOG('tag decoded into %s, decoding length' % tagSet)

            if state == stDecodeLength:
                # Decode length
                if not substrate:
                    raise error.SubstrateUnderrunError(
                        'Short octet stream on length decoding'
                    )

                firstOctet = oct2int(substrate[0])

                if firstOctet < 128:
                    # short form: the octet is the length itself
                    size = 1
                    length = firstOctet

                elif firstOctet > 128:
                    # long form: low 7 bits give the number of length octets
                    size = firstOctet & 0x7F
                    # encoded in size bytes
                    encodedLength = octs2ints(substrate[1:size + 1])
                    # missing check on maximum size, which shouldn't be a
                    # problem, we can handle more than is possible
                    if len(encodedLength) != size:
                        raise error.SubstrateUnderrunError(
                            '%s<%s at %s' % (size, len(encodedLength), tagSet)
                        )

                    length = 0
                    for lengthOctet in encodedLength:
                        length <<= 8
                        length |= lengthOctet
                    size += 1

                else:
                    # firstOctet == 128: indefinite length
                    size = 1
                    length = -1

                substrate = substrate[size:]

                if length == -1:
                    if not self.supportIndefLength:
                        raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')

                else:
                    if len(substrate) < length:
                        raise error.SubstrateUnderrunError('%d-octet short' % (length - len(substrate)))

                state = stGetValueDecoder

                if LOG:
                    LOG('value length decoded into %d, payload substrate is: %s' % (
                        length, debug.hexdump(length == -1 and substrate or substrate[:length])))

            if state == stGetValueDecoder:
                if asn1Spec is None:
                    state = stGetValueDecoderByTag

                else:
                    state = stGetValueDecoderByAsn1Spec
            #
            # There are two ways of creating subtypes in ASN.1, and they
            # influence how the decoder operates:
            # 1) Either base types are used as-is, or no IMPLICIT tagging
            #    has been applied on subtyping.
            # 2) The subtype syntax drops base type information (by means
            #    of IMPLICIT tagging).
            # The first case allows for complete tag recovery from substrate
            # while the second one requires original ASN.1 type spec for
            # decoding.
            #
            # In either case a set of tags (tagSet) is coming from substrate
            # in an incremental, tag-by-tag fashion (this is the case of
            # EXPLICIT tag which is most basic). Outermost tag comes first
            # from the wire.
            #
            if state == stGetValueDecoderByTag:
                try:
                    concreteDecoder = tagMap[tagSet]

                except KeyError:
                    concreteDecoder = None

                if concreteDecoder:
                    state = stDecodeValue

                else:
                    # retry the lookup with a one-element slice of the tag set
                    try:
                        concreteDecoder = tagMap[tagSet[:1]]

                    except KeyError:
                        concreteDecoder = None

                    if concreteDecoder:
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by a built-in type, decoding %s' % (
                        concreteDecoder and concreteDecoder.__class__.__name__ or "<none>",
                        state == stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)

            if state == stGetValueDecoderByAsn1Spec:

                if asn1Spec.__class__ is tagmap.TagMap:
                    try:
                        chosenSpec = asn1Spec[tagSet]

                    except KeyError:
                        chosenSpec = None

                    if LOG:
                        LOG('candidate ASN.1 spec is a map of:')

                        for firstOctet, v in asn1Spec.presentTypes.items():
                            LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))

                        if asn1Spec.skipTypes:
                            LOG('but neither of: ')
                            for firstOctet, v in asn1Spec.skipTypes.items():
                                LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
                        LOG('new candidate ASN.1 spec is %s, chosen by %s' % (
                            chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))

                elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
                    chosenSpec = asn1Spec
                    if LOG:
                        LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)

                else:
                    chosenSpec = None

                if chosenSpec is not None:
                    try:
                        # ambiguous type or just faster codec lookup
                        concreteDecoder = typeMap[chosenSpec.typeId]

                        if LOG:
                            LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))

                    except KeyError:
                        # use base type for codec lookup to recover untagged types
                        baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
                        try:
                            # base type or tagged subtype
                            concreteDecoder = tagMap[baseTagSet]

                            if LOG:
                                LOG('value decoder chosen by base %s' % (baseTagSet,))

                        except KeyError:
                            concreteDecoder = None

                    if concreteDecoder:
                        asn1Spec = chosenSpec
                        state = stDecodeValue

                    else:
                        state = stTryAsExplicitTag

                else:
                    concreteDecoder = None
                    state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by ASN.1 spec, decoding %s' % (
                        state == stDecodeValue and concreteDecoder.__class__.__name__ or "<none>",
                        state == stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)

            if state == stDecodeValue:
                if not options.get('recursiveFlag', True) and not substrateFun:  # deprecate this
                    substrateFun = lambda a, b, c: (a, b[:c])

                options.update(fullSubstrate=fullSubstrate)

                if length == -1:  # indef length
                    value, substrate = concreteDecoder.indefLenValueDecoder(
                        substrate, asn1Spec,
                        tagSet, length, stGetValueDecoder,
                        self, substrateFun,
                        **options
                    )

                else:
                    value, substrate = concreteDecoder.valueDecoder(
                        substrate, asn1Spec,
                        tagSet, length, stGetValueDecoder,
                        self, substrateFun,
                        **options
                    )

                if LOG:
                    LOG('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (
                        concreteDecoder.__class__.__name__, value.__class__.__name__,
                        isinstance(value, base.Asn1Item) and value.prettyPrint() or value,
                        substrate and debug.hexdump(substrate) or '<none>'))

                state = stStop
                break

            if state == stTryAsExplicitTag:
                if (tagSet and
                        tagSet[0].tagFormat == tag.tagFormatConstructed and
                        tagSet[0].tagClass != tag.tagClassUniversal):
                    # Assume explicit tagging
                    concreteDecoder = explicitTagDecoder
                    state = stDecodeValue

                else:
                    concreteDecoder = None
                    state = self.defaultErrorState

                if LOG:
                    LOG('codec %s chosen, decoding %s' % (
                        concreteDecoder and concreteDecoder.__class__.__name__ or "<none>",
                        state == stDecodeValue and 'value' or 'as failure'))

            if state == stDumpRawValue:
                concreteDecoder = self.defaultRawDecoder

                if LOG:
                    LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)

                state = stDecodeValue

            if state == stErrorCondition:
                raise error.PyAsn1Error(
                    '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
                )

        if LOG:
            debug.scope.pop()
            LOG('decoder left scope %s, call completed' % debug.scope)

        return value, substrate
#: Turns a BER octet stream into an ASN.1 object.
#:
#: Takes a BER octet stream and decodes it into an ASN.1 object
#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
#: may be a scalar or an arbitrarily nested structure.
#:
#: Parameters
#: ----------
#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     BER octet stream
#:
#: Keyword Args
#: ------------
#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:     A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
#:     being decoded, *asn1Spec* may or may not be required. The most common reason for
#:     it to be required is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.
#:
#: Returns
#: -------
#: : :py:class:`tuple`
#:     A tuple of the pyasn1 object recovered from the BER substrate
#:     (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     and the unprocessed trailing portion of the *substrate* (may be empty)
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
#:     On decoding errors
#:
#: Examples
#: --------
#: Decode BER serialisation without ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
#: Decode BER serialisation with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
decode = Decoder(tagMap, typeMap)
1681# XXX
1682# non-recursive decoding; return position rather than substrate