Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/decoder.py: 15%
1020 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import io
8import os
9import sys
12from pyasn1 import debug
13from pyasn1 import error
14from pyasn1.codec.ber import eoo
15from pyasn1.codec.streaming import asSeekableStream
16from pyasn1.codec.streaming import isEndOfStream
17from pyasn1.codec.streaming import peekIntoStream
18from pyasn1.codec.streaming import readFromStream
19from pyasn1.compat import _MISSING
20from pyasn1.compat.integer import from_bytes
21from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null
22from pyasn1.error import PyAsn1Error
23from pyasn1.type import base
24from pyasn1.type import char
25from pyasn1.type import tag
26from pyasn1.type import tagmap
27from pyasn1.type import univ
28from pyasn1.type import useful
# Public names exported by this module.
__all__ = ['StreamingDecoder', 'Decoder', 'decode']

# Debug logger registered for this module under the DEBUG_DECODER flag.
# Code in this module guards every logging call with `if LOG:`.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)

# Module-level shortcuts for names used throughout the payload decoders.
noValue = base.noValue

SubstrateUnderrunError = error.SubstrateUnderrunError
class AbstractPayloadDecoder(object):
    """Common interface of the payload decoders defined in this module.

    Both decoding entry points raise `PyAsn1Error` here; concrete decoders
    override the ones they support.
    """
    protoComponent = None

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode value with fixed byte length.

        The decoder is allowed to consume as many bytes as necessary.
        """
        # TODO: Seems more like an NotImplementedError?
        message = 'SingleItemDecoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode value with undefined length.

        The decoder is allowed to consume as many bytes as necessary.
        """
        # TODO: Seems more like an NotImplementedError?
        message = 'Indefinite length mode decoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    @staticmethod
    def _passAsn1Object(asn1Object, options):
        """Store `asn1Object` under 'asn1Object' unless already present.

        The `options` mapping is modified in place and also returned.
        """
        options.setdefault('asn1Object', asn1Object)
        return options
class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
    """Shared helpers for decoders of non-constructed (simple) ASN.1 types."""

    @staticmethod
    def substrateCollector(asn1Object, substrate, length, options):
        """Relay whatever `readFromStream` yields for `length` octets.

        `asn1Object` is accepted for signature compatibility with other
        `substrateFun` callables but is not used.
        """
        for piece in readFromStream(substrate, length, options):
            yield piece

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Wrap a decoded `value` into the object handed back to the caller.

        * with a truthy `native` option the raw `value` is returned;
        * without `asn1Spec`, `protoComponent` is cloned with `value` and
          `tagSet`;
        * with `asn1Spec` and `value is noValue`, `asn1Spec` itself is
          returned;
        * otherwise `asn1Spec` is cloned with `value`.
        """
        if options.get('native'):
            return value

        if asn1Spec is None:
            return self.protoComponent.clone(value, tagSet=tagSet)

        if value is noValue:
            return asn1Spec

        return asn1Spec.clone(value)
class RawPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder that defers to `substrateFun` or to `decodeFun` unchanged."""
    protoComponent = univ.Any('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield the items produced for a definite-length value.

        With `substrateFun` set, its output is relayed; otherwise the output
        of `decodeFun(substrate, asn1Spec, tagSet, length, **options)` is.
        """
        if substrateFun:
            placeholder = self._createComponent(asn1Spec, tagSet, '', **options)
            producer = substrateFun(placeholder, substrate, length, options)

        else:
            producer = decodeFun(substrate, asn1Spec, tagSet, length, **options)

        for item in producer:
            yield item

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Yield decoded items until the end-of-octets sentinel is seen.

        With `substrateFun` set, its output is relayed instead.
        """
        if substrateFun:
            placeholder = self._createComponent(asn1Spec, tagSet, '', **options)

            for item in substrateFun(placeholder, substrate, length, options):
                yield item

            return

        while True:
            decoded = decodeFun(
                substrate, asn1Spec, tagSet, length,
                allowEoo=True, **options)

            for item in decoded:
                # The sentinel terminates the whole generator, not just
                # this inner pass.
                if item is eoo.endOfOctets:
                    return

                yield item


# Module-level instance of the raw decoder.
rawPayloadDecoder = RawPayloadDecoder()
class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for INTEGER payloads."""
    protoComponent = univ.Integer(0)

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Read `length` octets and yield them as a signed integer component.

        An empty payload decodes to 0.  `SubstrateUnderrunError` objects
        produced by `readFromStream` are yielded through to the caller.

        Raises `PyAsn1Error` if the first tag is not in simple format.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for octets in readFromStream(substrate, length, options):
            if isinstance(octets, SubstrateUnderrunError):
                yield octets

        # `octets` now holds the last item produced by readFromStream
        number = from_bytes(octets, signed=True) if octets else 0

        yield self._createComponent(asn1Spec, tagSet, number, **options)
class BooleanPayloadDecoder(IntegerPayloadDecoder):
    """Decoder for BOOLEAN payloads, built on the INTEGER decoder."""
    protoComponent = univ.Boolean(0)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Normalise any truthy `value` to 1 and any falsy one to 0."""
        normalised = 1 if value else 0

        return IntegerPayloadDecoder._createComponent(
            self, asn1Spec, tagSet, normalised, **options)
class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for BIT STRING payloads in primitive and constructed form."""
    protoComponent = univ.BitString(())
    supportConstructedForm = True

    def _appendFragment(self, bitString, fragment):
        """Return `bitString` extended with one raw fragment.

        The first octet of `fragment` is the count of unused trailing bits,
        the remaining octets carry the bits themselves.

        Raises `PyAsn1Error` when the fragment is empty (it then lacks the
        mandatory leading octet) or when the trailing bits count exceeds 7.
        """
        if not fragment:
            # Without this guard `fragment[0]` would surface as a bare
            # IndexError on malformed input.
            raise error.PyAsn1Error('Empty BIT STRING fragment')

        trailingBits = oct2int(fragment[0])
        if trailingBits > 7:
            raise error.PyAsn1Error(
                'Trailing bits overflow %s' % trailingBits
            )

        return self.protoComponent.fromOctetString(
            fragment[1:], internalFormat=True,
            prepend=bitString, padding=trailingBits
        )

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length BIT STRING.

        With `substrateFun` set, its output is relayed.  Otherwise the
        primitive form is read directly, and the constructed form is
        assembled from inner fragments until `length` octets are consumed.
        `SubstrateUnderrunError` objects are yielded through to the caller.

        Raises `PyAsn1Error` on empty substrate, on a trailing bits count
        above 7, on an empty inner fragment, or when the constructed form is
        used while `supportConstructedForm` is false.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if not length:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        for chunk in isEndOfStream(substrate):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if chunk:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?

            # leading octet: number of unused bits in the last octet
            for trailingBits in readFromStream(substrate, 1, options):
                if isinstance(trailingBits, SubstrateUnderrunError):
                    yield trailingBits

            trailingBits = ord(trailingBits)
            if trailingBits > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % trailingBits
                )

            for chunk in readFromStream(substrate, length - 1, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            value = self.protoComponent.fromOctetString(
                chunk, internalFormat=True, padding=trailingBits)

            yield self._createComponent(asn1Spec, tagSet, value, **options)

            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited '
                                    'at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(null, internalFormat=True)

        current_position = substrate.tell()

        while substrate.tell() - current_position < length:
            for component in decodeFun(
                    substrate, self.protoComponent, substrateFun=substrateFun,
                    **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            bitString = self._appendFragment(bitString, component)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length (constructed) BIT STRING.

        Fragments are collected until the end-of-octets sentinel is
        returned by `decodeFun`.  With `substrateFun` set, its output is
        relayed instead.

        Raises `PyAsn1Error` on an empty inner fragment or a trailing bits
        count above 7.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(null, internalFormat=True)

        while True:  # loop over fragments

            for component in decodeFun(
                    substrate, self.protoComponent, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if component is eoo.endOfOctets:
                    break

                if isinstance(component, SubstrateUnderrunError):
                    yield component

            if component is eoo.endOfOctets:
                break

            bitString = self._appendFragment(bitString, component)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)
class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OCTET STRING payloads in primitive and constructed form."""
    protoComponent = univ.OctetString('')
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length OCTET STRING.

        With `substrateFun` set, its output is relayed.  Otherwise the
        primitive form is read in one go, and the constructed form is
        concatenated from inner fragments until `length` octets are consumed.

        Raises `PyAsn1Error` when the constructed form is used while
        `supportConstructedForm` is false.
        """
        if substrateFun:
            placeholder = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(placeholder, substrate, length, options):
                yield chunk

            return

        isPrimitive = tagSet[0].tagFormat == tag.tagFormatSimple  # XXX what tag to check?

        if isPrimitive:
            for payload in readFromStream(substrate, length, options):
                if isinstance(payload, SubstrateUnderrunError):
                    yield payload

            yield self._createComponent(asn1Spec, tagSet, payload, **options)

            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # Inner fragments share the outer type; collect them as raw octets
        collectRaw = self.substrateCollector

        assembled = null

        startOffset = substrate.tell()

        while substrate.tell() - startOffset < length:
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=collectRaw,
                **options)

            for fragment in fragments:
                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

            assembled += fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length (constructed) OCTET STRING.

        Fragments are concatenated until `decodeFun` returns the
        end-of-octets sentinel.  A `substrateFun` other than this class's
        own `substrateCollector` takes over the decoding entirely.
        """
        if substrateFun and substrateFun is not self.substrateCollector:
            placeholder = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(placeholder, substrate, length, options):
                yield chunk

            return

        # Inner fragments share the outer type; collect them as raw octets
        collectRaw = self.substrateCollector

        assembled = null

        while True:  # one pass per fragment

            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=collectRaw,
                allowEoo=True, **options)

            for fragment in fragments:

                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

                if fragment is eoo.endOfOctets:
                    break

            if fragment is eoo.endOfOctets:
                break

            assembled += fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)
class NullPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for NULL payloads."""
    protoComponent = univ.Null('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield a Null component after checking the payload is empty.

        Raises `PyAsn1Error` if the first tag is not in simple format or if
        any payload octets were read.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for payload in readFromStream(substrate, length, options):
            if isinstance(payload, SubstrateUnderrunError):
                yield payload

        # The component is built before the payload check, as a failure in
        # construction must take precedence over the payload error.
        nullValue = self._createComponent(asn1Spec, tagSet, '', **options)

        if payload:
            raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)

        yield nullValue
class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OBJECT IDENTIFIER payloads."""
    protoComponent = univ.ObjectIdentifier(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Read `length` octets and yield the OID they encode.

        Each sub-identifier is stored in base-128 with the high bit set on
        every octet except the last; the first sub-identifier is then split
        into the two leading arcs.

        Raises `PyAsn1Error` on a non-simple tag, an empty payload or a
        sub-identifier starting with 0x80, and `SubstrateUnderrunError` when
        a multi-octet sub-identifier is cut short.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            raise error.PyAsn1Error('Empty substrate')

        octets = octs2ints(chunk)
        total = len(octets)

        arcs = ()
        pos = 0

        while pos < total:
            octet = octets[pos]
            pos += 1

            if octet == 128:
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')

            if octet < 128:
                # single-octet sub-identifier
                arcs += (octet,)
                continue

            # Multi-octet sub-identifier: fold 7 bits per continuation octet
            accumulated = 0
            while octet >= 128:
                accumulated = (accumulated << 7) + (octet & 0x7F)
                if pos >= total:
                    raise error.SubstrateUnderrunError(
                        'Short substrate for sub-OID past %s' % (arcs,)
                    )
                octet = octets[pos]
                pos += 1

            arcs += ((accumulated << 7) + octet,)

        # Decode two leading arcs
        first = arcs[0]

        if 0 <= first <= 39:
            arcs = (0,) + arcs

        elif 40 <= first <= 79:
            arcs = (1, first - 40) + arcs[1:]

        elif first >= 80:
            arcs = (2, first - 80) + arcs[1:]

        else:
            raise error.PyAsn1Error('Malformed first OID octet: %s' % octets[0])

        yield self._createComponent(asn1Spec, tagSet, arcs, **options)
class RealPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for REAL payloads (binary, special and character forms)."""
    protoComponent = univ.Real()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Read `length` octets and yield the REAL value they encode.

        The first octet (`fo`) selects the encoding:

        * bit 0x80 set: binary form, decoded to a `(mantissa, 2, exponent)`
          tuple;
        * bit 0x40 set: the string 'inf' or '-inf';
        * both clear: character form, decoded with `int()` (NR1) or
          `float()` (NR2/NR3).

        An empty payload decodes to 0.0.

        Raises `PyAsn1Error` on a non-simple tag, truncated content or an
        illegal base, and `SubstrateUnderrunError` for an unknown NR form or
        unparsable character content.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
            return

        # first octet carries the encoding flags, the rest is content
        fo = oct2int(chunk[0])
        chunk = chunk[1:]
        if fo & 0x80:  # binary encoding
            if not chunk:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding binary encoded REAL')

            # low two bits give the exponent length minus one
            n = (fo & 0x03) + 1

            if n == 4:
                # exponent length is given by the next content octet
                n = oct2int(chunk[0])
                chunk = chunk[1:]

            eo, chunk = chunk[:n], chunk[n:]

            if not eo or not chunk:
                raise error.PyAsn1Error('Real exponent screwed')

            # seed with -1 when the top bit is set so the shifts below
            # produce a negative (two's complement) exponent
            e = oct2int(eo[0]) & 0x80 and -1 or 0

            while eo:  # exponent
                e <<= 8
                e |= oct2int(eo[0])
                eo = eo[1:]

            b = fo >> 4 & 0x03  # base bits

            if b > 2:
                raise error.PyAsn1Error('Illegal Real base')

            # rescale the exponent so the result is always expressed in base 2
            if b == 1:  # encbase = 8
                e *= 3

            elif b == 2:  # encbase = 16
                e *= 4
            p = 0

            while chunk:  # value
                p <<= 8
                p |= oct2int(chunk[0])
                chunk = chunk[1:]

            if fo & 0x40:  # sign bit
                p = -p

            sf = fo >> 2 & 0x03  # scale bits
            p *= 2 ** sf
            value = (p, 2, e)

        elif fo & 0x40:  # infinite value
            if LOG:
                LOG('decoding infinite REAL')

            value = fo & 0x01 and '-inf' or 'inf'

        elif fo & 0xc0 == 0:  # character encoding
            if not chunk:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding character encoded REAL')

            try:
                if fo & 0x3 == 0x1:  # NR1
                    value = (int(chunk), 10, 0)

                elif fo & 0x3 == 0x2:  # NR2
                    value = float(chunk)

                elif fo & 0x3 == 0x3:  # NR3
                    value = float(chunk)

                else:
                    raise error.SubstrateUnderrunError(
                        'Unknown NR (tag %s)' % fo
                    )

            except ValueError:
                raise error.SubstrateUnderrunError(
                    'Bad character Real syntax'
                )

        else:
            # NOTE(review): the three tests above appear to cover every
            # value of `fo`, so this branch looks unreachable - confirm.
            raise error.SubstrateUnderrunError(
                'Unknown encoding (tag %s)' % fo
            )

        yield self._createComponent(asn1Spec, tagSet, value, **options)
class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
    """Base class for decoders of constructed ASN.1 types."""
    protoComponent = None
class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder):
    """Decoder logic shared by SEQUENCE / SET / SEQUENCE OF / SET OF.

    Subclasses supply `protoRecordComponent` and `protoSequenceComponent`,
    the prototypes used when no `asn1Spec` is given.
    """
    # prototype used when schema-less components have different tag sets
    protoRecordComponent = None
    # prototype used when schema-less components all share one tag set
    protoSequenceComponent = None

    def _getComponentTagMap(self, asn1Object, idx):
        raise NotImplementedError()

    def _getComponentPositionByType(self, asn1Object, tagSet, idx):
        raise NotImplementedError()

    def _decodeComponentsSchemaless(
            self, substrate, tagSet=None, decodeFun=None,
            length=None, **options):
        """Decode components without a schema and guess the container type.

        Components are decoded until `length` octets are consumed or, when
        `length == -1`, until the end-of-octets sentinel is seen.
        `SubstrateUnderrunError` objects are yielded through; the final item
        yielded is the populated container.
        """

        asn1Object = None

        components = []
        componentTypes = set()

        original_position = substrate.tell()

        while length == -1 or substrate.tell() < original_position + length:
            for component in decodeFun(substrate, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            if length == -1 and component is eoo.endOfOctets:
                break

            components.append(component)
            componentTypes.add(component.tagSet)

        # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF
        # The heuristics is:
        # * 1+ components of different types -> likely SEQUENCE/SET
        # * otherwise -> likely SEQUENCE OF/SET OF
        if len(componentTypes) > 1:
            protoComponent = self.protoRecordComponent

        else:
            protoComponent = self.protoSequenceComponent

        asn1Object = protoComponent.clone(
            # construct tagSet from base tag from prototype ASN.1 object
            # and additional tags recovered from the substrate
            tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
        )

        if LOG:
            LOG('guessed %r container type (pass `asn1Spec` to guide the '
                'decoder)' % asn1Object)

        for idx, component in enumerate(components):
            asn1Object.setComponentByPosition(
                idx, component,
                verifyConstraints=False,
                matchTags=False, matchConstraints=False
            )

        yield asn1Object

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length constructed value.

        Three modes, tried in order:

        * `substrateFun` given: its output is relayed and nothing is decoded;
        * no `asn1Spec`: components are decoded schema-less and the container
          type is guessed;
        * otherwise components are decoded against `asn1Spec`, either as a
          SEQUENCE/SET with named components or as a homogeneous container.

        `SubstrateUnderrunError` objects are yielded through; the final item
        yielded is the decoded object.

        Raises `PyAsn1Error` on a non-constructed tag, on more components
        than the schema has, or when required components are missing.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        original_position = substrate.tell()

        if substrateFun:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # no single prototype known: hand over both candidates
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if asn1Spec is None:
            for asn1Object in self._decodeComponentsSchemaless(
                    substrate, tagSet=tagSet, decodeFun=decodeFun,
                    length=length, **options):
                if isinstance(asn1Object, SubstrateUnderrunError):
                    yield asn1Object

            # leftover octets are only read (and reported) when debugging
            if substrate.tell() < original_position + length:
                if LOG:
                    for trailing in readFromStream(substrate, context=options):
                        if isinstance(trailing, SubstrateUnderrunError):
                            yield trailing

                    LOG('Unused trailing %d octets encountered: %s' % (
                        len(trailing), debug.hexdump(trailing)))

            yield asn1Object

            return

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        options = self._passAsn1Object(asn1Object, options)

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Spec.componentType

            isSetType = asn1Spec.typeId == univ.Set.typeId
            # deterministic: component position alone identifies its type
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()
            idx = 0
            while substrate.tell() - original_position < length:
                # pick the spec (or tag map) to decode the next component with
                if not namedTypes:
                    componentType = None

                elif isSetType:
                    componentType = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            componentType = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            componentType = namedTypes.getTagMapNearPosition(idx)

                        else:
                            componentType = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Spec,)
                        )

                for component in decodeFun(substrate, componentType, **options):
                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                # map the decoded component back to its schema position
                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('user-specified open types map:')

                        for k, v in openTypes.items():
                            LOG('%s -> %r' % (k, v))

                    if openTypes or options.get('decodeOpenTypes', False):

                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            # value of the sibling component that selects
                            # the concrete type of this open type field
                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # the caller-supplied map takes precedence over
                            # the map attached to the schema
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                if LOG:
                                    LOG('default open types map of component '
                                        '"%s.%s" governed by component "%s.%s"'
                                        ':' % (asn1Object.__class__.__name__,
                                               namedType.name,
                                               asn1Object.__class__.__name__,
                                               namedType.openType.name))

                                    for k, v in namedType.openType.items():
                                        LOG('%s -> %r' % (k, v))

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            # re-decode the raw octets of the open type
                            # field(s) against the resolved type
                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                for pos, containerElement in enumerate(
                                        containerValue):

                                    stream = asSeekableStream(containerValue[pos].asOctets())

                                    for component in decodeFun(stream, asn1Spec=openType, **options):
                                        if isinstance(component, SubstrateUnderrunError):
                                            yield component

                                    # NOTE(review): nesting reconstructed from
                                    # a flattened listing - confirm this
                                    # assignment sits after the loop.
                                    containerValue[pos] = component

                            else:
                                stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())

                                for component in decodeFun(stream, asn1Spec=openType, **options):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                # NOTE(review): same nesting caveat as above.
                                asn1Object.setComponentByPosition(idx, component)

            # NOTE(review): assumed to pair with `if namedTypes:` - the
            # flattened listing does not show which `if` this belongs to.
            else:
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise inconsistency

        else:
            # homogeneous container: every component uses the same spec
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while substrate.tell() - original_position < length:
                for component in decodeFun(substrate, componentType, **options):
                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        yield asn1Object

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length constructed value.

        Same three modes as `valueDecoder`, but components are read until
        `decodeFun` returns the end-of-octets sentinel instead of until a
        byte count is reached.

        Raises `PyAsn1Error` on a non-constructed tag, on more components
        than the schema has, or when required components are missing.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        if substrateFun is not None:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # no single prototype known: hand over both candidates
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if asn1Spec is None:
            for asn1Object in self._decodeComponentsSchemaless(
                    substrate, tagSet=tagSet, decodeFun=decodeFun,
                    length=length, **dict(options, allowEoo=True)):
                if isinstance(asn1Object, SubstrateUnderrunError):
                    yield asn1Object

            yield asn1Object

            return

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        options = self._passAsn1Object(asn1Object, options)

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Object.componentType

            isSetType = asn1Object.typeId == univ.Set.typeId
            # deterministic: component position alone identifies its type
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()

            idx = 0

            while True:  # loop over components
                # NOTE: `asn1Spec` is rebound here to the spec of the next
                # component; `asn1Object` keeps the container.
                if len(namedTypes) <= idx:
                    asn1Spec = None

                elif isSetType:
                    asn1Spec = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            asn1Spec = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            asn1Spec = namedTypes.getTagMapNearPosition(idx)

                        else:
                            asn1Spec = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Object,)
                        )

                for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options):

                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                    if component is eoo.endOfOctets:
                        break

                # the sentinel ends the component loop as well
                if component is eoo.endOfOctets:
                    break

                # map the decoded component back to its schema position
                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('user-specified open types map:')

                        for k, v in openTypes.items():
                            LOG('%s -> %r' % (k, v))

                    if openTypes or options.get('decodeOpenTypes', False):

                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            # value of the sibling component that selects
                            # the concrete type of this open type field
                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # the caller-supplied map takes precedence over
                            # the map attached to the schema
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                if LOG:
                                    LOG('default open types map of component '
                                        '"%s.%s" governed by component "%s.%s"'
                                        ':' % (asn1Object.__class__.__name__,
                                               namedType.name,
                                               asn1Object.__class__.__name__,
                                               namedType.openType.name))

                                    for k, v in namedType.openType.items():
                                        LOG('%s -> %r' % (k, v))

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            # re-decode the raw octets of the open type
                            # field(s) against the resolved type
                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                for pos, containerElement in enumerate(
                                        containerValue):

                                    stream = asSeekableStream(containerValue[pos].asOctets())

                                    for component in decodeFun(stream, asn1Spec=openType,
                                                               **dict(options, allowEoo=True)):
                                        if isinstance(component, SubstrateUnderrunError):
                                            yield component

                                        if component is eoo.endOfOctets:
                                            break

                                        # NOTE(review): nesting reconstructed
                                        # from a flattened listing - placed
                                        # inside the loop so the sentinel is
                                        # never stored; confirm.
                                        containerValue[pos] = component

                            else:
                                stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
                                for component in decodeFun(stream, asn1Spec=openType,
                                                           **dict(options, allowEoo=True)):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                    if component is eoo.endOfOctets:
                                        break

                                    # NOTE(review): same nesting caveat as
                                    # above.
                                    asn1Object.setComponentByPosition(idx, component)

            # NOTE(review): assumed to pair with `if namedTypes:` - the
            # flattened listing does not show which `if` this belongs to.
            else:
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise inconsistency

        else:
            # homogeneous container: every component uses the same spec
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while True:

                for component in decodeFun(
                        substrate, componentType, allowEoo=True, **options):

                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                    if component is eoo.endOfOctets:
                        break

                if component is eoo.endOfOctets:
                    break

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        yield asn1Object
class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed decoder whose schema-less guesses are Sequence/SequenceOf."""
    protoRecordComponent = univ.Sequence()
    protoSequenceComponent = univ.SequenceOf()
class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """Sequence/SequenceOf decoder with a `univ.Sequence` prototype."""
    protoComponent = univ.Sequence()
class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """Sequence/SequenceOf decoder with a `univ.SequenceOf` prototype."""
    protoComponent = univ.SequenceOf()
class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed decoder whose schema-less guesses are Set/SetOf."""
    protoRecordComponent = univ.Set()
    protoSequenceComponent = univ.SetOf()
class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
    """Set/SetOf decoder with a `univ.Set` prototype."""
    protoComponent = univ.Set()
class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
    """Set/SetOf decoder with a `univ.SetOf` prototype."""
    protoComponent = univ.SetOf()
class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
    """Decoder for CHOICE values, explicitly tagged or untagged."""
    protoComponent = univ.Choice()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length CHOICE and yield the populated object.

        When the CHOICE's own tag set equals `tagSet` the inner component is
        decoded from scratch; otherwise decoding continues with the already
        parsed `tagSet`, `length` and `state`.  With `substrateFun` set, its
        output is relayed instead.
        """
        if asn1Spec is None:
            choice = self.protoComponent.clone(tagSet=tagSet)

        else:
            choice = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(choice, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(choice, options)

        if choice.tagSet == tagSet:
            if LOG:
                LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))

            alternatives = decodeFun(
                substrate, choice.componentTagMap, **options)

        else:
            if LOG:
                LOG('decoding %s as untagged CHOICE' % (tagSet,))

            alternatives = decodeFun(
                substrate, choice.componentTagMap, tagSet, length,
                state, **options)

        for component in alternatives:
            if isinstance(component, SubstrateUnderrunError):
                yield component

        # `component` is the last item the decoder produced
        effectiveTagSet = component.effectiveTagSet

        if LOG:
            LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))

        choice.setComponentByType(
            effectiveTagSet, component,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False,
            innerFlag=False
        )

        yield choice

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length CHOICE and yield the populated object.

        An explicitly tagged CHOICE keeps decoding until the end-of-octets
        sentinel is seen; an untagged one stops after its first component.
        With `substrateFun` set, its output is relayed instead.
        """
        if asn1Spec is None:
            choice = self.protoComponent.clone(tagSet=tagSet)

        else:
            choice = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(choice, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(choice, options)

        isTagged = choice.tagSet == tagSet

        if LOG:
            LOG('decoding %s as %stagged CHOICE' % (
                tagSet, isTagged and 'explicitly ' or 'un'))

        # An untagged CHOICE resumes with the header that was already parsed
        resumeArgs = () if isTagged else (tagSet, length, state)

        while True:

            decoded = decodeFun(
                substrate, choice.componentType.tagMapUnique,
                *resumeArgs, **dict(options, allowEoo=True))

            for component in decoded:

                if isinstance(component, SubstrateUnderrunError):
                    yield component

                if component is eoo.endOfOctets:
                    break

                effectiveTagSet = component.effectiveTagSet

                if LOG:
                    LOG('decoded component %s, effective tag set '
                        '%s' % (component, effectiveTagSet))

                choice.setComponentByType(
                    effectiveTagSet, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False,
                    innerFlag=False
                )

                if not isTagged:
                    break

            if not isTagged or component is eoo.endOfOctets:
                break

        yield choice
class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for the ASN.1 ANY type.

    Both decoding methods are generators that re-yield every
    `SubstrateUnderrunError` coming from the stream helpers and yield the
    decoding result last.  When the incoming tag does not match the spec
    ("untagged" ANY), the already consumed header octets are made part of
    the value again.
    """

    protoComponent = univ.Any()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length ANY value."""
        if asn1Spec is None:
            isUntagged = True

        elif asn1Spec.__class__ is tagmap.TagMap:
            isUntagged = tagSet not in asn1Spec.tagMap

        else:
            isUntagged = tagSet != asn1Spec.tagSet

        if isUntagged:
            fullPosition = substrate.markedPosition
            currentPosition = substrate.tell()

            # Rewind to where the header started so that the header octets
            # become part of the value, and widen the length accordingly.
            substrate.seek(fullPosition, os.SEEK_SET)
            length += currentPosition - fullPosition

            if LOG:
                for chunk in peekIntoStream(substrate, length):
                    if isinstance(chunk, SubstrateUnderrunError):
                        yield chunk
                LOG('decoding as untagged ANY, substrate '
                    '%s' % debug.hexdump(chunk))

        if substrateFun:
            for chunk in substrateFun(
                    self._createComponent(asn1Spec, tagSet, noValue, **options),
                    substrate, length, options):
                yield chunk

            return

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        yield self._createComponent(asn1Spec, tagSet, chunk, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length ANY value."""
        if asn1Spec is None:
            isTagged = False

        elif asn1Spec.__class__ is tagmap.TagMap:
            isTagged = tagSet in asn1Spec.tagMap

        else:
            isTagged = tagSet == asn1Spec.tagSet

        if isTagged:
            # tagged Any type -- consume header substrate
            chunk = null

            if LOG:
                LOG('decoding as tagged ANY')

        else:
            # TODO: Seems not to be tested
            fullPosition = substrate.markedPosition
            currentPosition = substrate.tell()

            # Re-read the header octets; they are prepended to the value.
            substrate.seek(fullPosition, os.SEEK_SET)
            for chunk in readFromStream(substrate, currentPosition - fullPosition, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            if LOG:
                LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk))

        # Any components do not inherit initial tag
        asn1Spec = self.protoComponent

        if substrateFun and substrateFun is not self.substrateCollector:
            asn1Object = self._createComponent(
                asn1Spec, tagSet, noValue, **options)

            # `substrate` is a stream, so the header bytes cannot be
            # concatenated with it (bytes + stream raises TypeError).
            # Rewind the stream over the header instead, so the callback
            # sees header + payload exactly as in `valueDecoder`.
            headerLength = len(chunk)
            if headerLength:
                substrate.seek(-headerLength, os.SEEK_CUR)

            # NOTE(review): `length` is presumably -1 on this path (the
            # indefinite-length marker); the historical
            # `length + len(header)` value is still passed on unchanged.
            for chunk in substrateFun(
                    asn1Object, substrate, length + headerLength, options):
                yield chunk

            return

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        while True:  # loop over fragments

            for component in decodeFun(
                    substrate, asn1Spec, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if isinstance(component, SubstrateUnderrunError):
                    yield component

                if component is eoo.endOfOctets:
                    break

            if component is eoo.endOfOctets:
                break

            chunk += component

        # NOTE(review): `substrateFun` was rebound to the collector above,
        # so the `else` branch looks unreachable unless the collector is
        # falsy; kept as-is because callers may rely on the raw octets.
        if substrateFun:
            yield chunk  # TODO: Weird

        else:
            yield self._createComponent(asn1Spec, tagSet, chunk, **options)
1359# character string types
class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.UTF8String`."""

    protoComponent = char.UTF8String()
class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.NumericString`."""

    protoComponent = char.NumericString()
class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.PrintableString`."""

    protoComponent = char.PrintableString()
class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.TeletexString`."""

    protoComponent = char.TeletexString()
class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.VideotexString`."""

    protoComponent = char.VideotexString()
class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.IA5String`."""

    protoComponent = char.IA5String()
class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.GraphicString`."""

    protoComponent = char.GraphicString()
class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.VisibleString`."""

    protoComponent = char.VisibleString()
class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.GeneralString`."""

    protoComponent = char.GeneralString()
class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.UniversalString`."""

    protoComponent = char.UniversalString()
class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `char.BMPString`."""

    protoComponent = char.BMPString()
1404# "useful" types
class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `useful.ObjectDescriptor`."""

    protoComponent = useful.ObjectDescriptor()
class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `useful.GeneralizedTime`."""

    protoComponent = useful.GeneralizedTime()
class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder whose prototype is `useful.UTCTime`."""

    protoComponent = useful.UTCTime()
# Tag-to-codec map: selects a payload decoder by the tag set recovered
# from the substrate.  Several ASN.1 types share a tag set, hence the
# "conflicts with" remarks; those are disambiguated through TYPE_MAP.
TAG_MAP = {
    # universal types
    univ.Integer.tagSet: IntegerPayloadDecoder(),
    univ.Boolean.tagSet: BooleanPayloadDecoder(),
    univ.BitString.tagSet: BitStringPayloadDecoder(),
    univ.OctetString.tagSet: OctetStringPayloadDecoder(),
    univ.Null.tagSet: NullPayloadDecoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
    univ.Enumerated.tagSet: IntegerPayloadDecoder(),
    univ.Real.tagSet: RealPayloadDecoder(),
    univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SetOrSetOfPayloadDecoder(),  # conflicts with SetOf
    univ.Choice.tagSet: ChoicePayloadDecoder(),  # conflicts with Any

    # character string types
    char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
    char.NumericString.tagSet: NumericStringPayloadDecoder(),
    char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
    char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
    char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
    char.IA5String.tagSet: IA5StringPayloadDecoder(),
    char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
    char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
    char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
    char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
    char.BMPString.tagSet: BMPStringPayloadDecoder(),

    # useful types
    useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
    useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
    useful.UTCTime.tagSet: UTCTimePayloadDecoder()
}
# Type-to-codec map for ambiguous ASN.1 types
TYPE_MAP = {
    univ.Set.typeId: SetPayloadDecoder(),
    univ.SetOf.typeId: SetOfPayloadDecoder(),
    univ.Sequence.typeId: SequencePayloadDecoder(),
    univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
    univ.Choice.typeId: ChoicePayloadDecoder(),
    univ.Any.typeId: AnyPayloadDecoder()
}

# deprecated aliases, https://github.com/pyasn1/pyasn1/issues/9
tagMap = TAG_MAP
typeMap = TYPE_MAP

# Put in non-ambiguous types for faster codec lookup.  Entries already
# present in TYPE_MAP win, so the ambiguous-type codecs above are never
# overwritten.
for typeDecoder in TAG_MAP.values():
    if typeDecoder.protoComponent is None:
        continue

    typeId = typeDecoder.protoComponent.__class__.typeId
    if typeId is not None:
        TYPE_MAP.setdefault(typeId, typeDecoder)
# States of the decoder state machine, numbered 0 through 9 in this order.
(
    stDecodeTag,
    stDecodeLength,
    stGetValueDecoder,
    stGetValueDecoderByAsn1Spec,
    stGetValueDecoderByTag,
    stTryAsExplicitTag,
    stDecodeValue,
    stDumpRawValue,
    stErrorCondition,
    stStop,
) = range(10)
# Two zero octets: the serialized end-of-octets marker that the decoder
# compares the next two stream octets against.
EOO_SENTINEL = ints2octs((0, 0))
class SingleItemDecoder(object):
    """State machine recovering a single ASN.1 object from a byte stream.

    Calling an instance returns a generator.  The generator yields
    `SubstrateUnderrunError` objects whenever the stream helpers report
    insufficient input and yields the decoded value last (or
    `eoo.endOfOctets` when `allowEoo` is set and the end-of-octets
    sentinel is found).

    Codecs are looked up in `TAG_MAP` (by tag set) and `TYPE_MAP` (by type
    ID) unless replacement maps are given to the constructor.
    """

    defaultErrorState = stErrorCondition
    # Alternative error state: stDumpRawValue, which decodes the value
    # with `defaultRawDecoder` instead of raising.
    defaultRawDecoder = AnyPayloadDecoder()

    supportIndefLength = True

    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
        """Set up codec maps and per-instance tag caches.

        Extra keyword arguments are accepted and ignored.
        """
        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP

        # Tag & TagSet objects caches
        self._tagCache = {}
        self._tagSetCache = {}

    def __call__(self, substrate, asn1Spec=None,
                 tagSet=None, length=None, state=stDecodeTag,
                 decodeFun=None, substrateFun=None,
                 **options):
        """Decode one item from `substrate`, optionally guided by `asn1Spec`.

        `tagSet`, `length` and `state` let a payload decoder re-enter the
        state machine after the tag and/or length have already been
        consumed.  The `allowEoo` option is popped from `options`; every
        other option is forwarded to the stream helpers and codecs.

        Raises `PyAsn1Error` (or a subclass) when no codec can be found,
        when indefinite length is not supported, or when a codec does not
        consume exactly the announced number of octets.
        """
        allowEoo = options.pop('allowEoo', False)

        if LOG:
            LOG('decoder called at scope %s with state %d, working with up '
                'to %s octets of substrate: '
                '%s' % (debug.scope, state, length, substrate))

        # Look for end-of-octets sentinel
        if allowEoo and self.supportIndefLength:

            for eoo_candidate in readFromStream(substrate, 2, options):
                if isinstance(eoo_candidate, SubstrateUnderrunError):
                    yield eoo_candidate

            if eoo_candidate == EOO_SENTINEL:
                if LOG:
                    LOG('end-of-octets sentinel found')
                yield eoo.endOfOctets
                return

            else:
                # Not a sentinel: un-read the two octets just consumed.
                substrate.seek(-2, os.SEEK_CUR)

        tagMap = self._tagMap
        typeMap = self._typeMap
        tagCache = self._tagCache
        tagSetCache = self._tagSetCache

        value = noValue

        # Remember where this item starts; the ANY decoder rewinds to it.
        substrate.markedPosition = substrate.tell()

        # States are plain integers, so they are compared by value.  The
        # branches are independent `if`s on purpose: one loop pass may
        # fall through several consecutive states.
        while state != stStop:

            if state == stDecodeTag:
                # Decode tag
                isShortTag = True

                for firstByte in readFromStream(substrate, 1, options):
                    if isinstance(firstByte, SubstrateUnderrunError):
                        yield firstByte

                firstOctet = ord(firstByte)

                try:
                    lastTag = tagCache[firstOctet]

                except KeyError:
                    integerTag = firstOctet
                    tagClass = integerTag & 0xC0
                    tagFormat = integerTag & 0x20
                    tagId = integerTag & 0x1F

                    if tagId == 0x1F:
                        # Long form: tag number spans the following
                        # octets, 7 bits each, high bit = "more follows".
                        isShortTag = False
                        tagId = 0

                        while True:
                            for integerByte in readFromStream(substrate, 1, options):
                                if isinstance(integerByte, SubstrateUnderrunError):
                                    yield integerByte

                            if not integerByte:
                                raise error.SubstrateUnderrunError(
                                    'Short octet stream on long tag decoding'
                                )

                            integerTag = ord(integerByte)
                            tagId <<= 7
                            tagId |= (integerTag & 0x7F)

                            if not integerTag & 0x80:
                                break

                    lastTag = tag.Tag(
                        tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                    )

                    if isShortTag:
                        # cache short tags
                        tagCache[firstOctet] = lastTag

                if tagSet is None:
                    if isShortTag:
                        try:
                            tagSet = tagSetCache[firstOctet]

                        except KeyError:
                            # base tag not recovered
                            tagSet = tag.TagSet((), lastTag)
                            tagSetCache[firstOctet] = tagSet
                    else:
                        tagSet = tag.TagSet((), lastTag)

                else:
                    tagSet = lastTag + tagSet

                state = stDecodeLength

                if LOG:
                    LOG('tag decoded into %s, decoding length' % tagSet)

            if state == stDecodeLength:
                # Decode length
                for firstOctet in readFromStream(substrate, 1, options):
                    if isinstance(firstOctet, SubstrateUnderrunError):
                        yield firstOctet

                firstOctet = ord(firstOctet)

                if firstOctet < 128:
                    length = firstOctet

                elif firstOctet > 128:
                    size = firstOctet & 0x7F
                    # encoded in size bytes
                    for encodedLength in readFromStream(substrate, size, options):
                        if isinstance(encodedLength, SubstrateUnderrunError):
                            yield encodedLength
                    encodedLength = list(encodedLength)
                    # missing check on maximum size, which shouldn't be a
                    # problem, we can handle more than is possible
                    if len(encodedLength) != size:
                        raise error.SubstrateUnderrunError(
                            '%s<%s at %s' % (size, len(encodedLength), tagSet)
                        )

                    length = 0
                    for lengthOctet in encodedLength:
                        length <<= 8
                        length |= oct2int(lengthOctet)

                else:  # 128 means indefinite
                    length = -1

                if length == -1 and not self.supportIndefLength:
                    raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')

                state = stGetValueDecoder

                if LOG:
                    LOG('value length decoded into %d' % length)

            if state == stGetValueDecoder:
                if asn1Spec is None:
                    state = stGetValueDecoderByTag

                else:
                    state = stGetValueDecoderByAsn1Spec
            #
            # There're two ways of creating subtypes in ASN.1 what influences
            # decoder operation. These methods are:
            # 1) Either base types used in or no IMPLICIT tagging has been
            #    applied on subtyping.
            # 2) Subtype syntax drops base type information (by means of
            #    IMPLICIT tagging.
            # The first case allows for complete tag recovery from substrate
            # while the second one requires original ASN.1 type spec for
            # decoding.
            #
            # In either case a set of tags (tagSet) is coming from substrate
            # in an incremental, tag-by-tag fashion (this is the case of
            # EXPLICIT tag which is most basic). Outermost tag comes first
            # from the wire.
            #
            if state == stGetValueDecoderByTag:
                try:
                    concreteDecoder = tagMap[tagSet]

                except KeyError:
                    concreteDecoder = None

                if concreteDecoder:
                    state = stDecodeValue

                else:
                    try:
                        concreteDecoder = tagMap[tagSet[:1]]

                    except KeyError:
                        concreteDecoder = None

                    if concreteDecoder:
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)

            if state == stGetValueDecoderByAsn1Spec:

                if asn1Spec.__class__ is tagmap.TagMap:
                    try:
                        chosenSpec = asn1Spec[tagSet]

                    except KeyError:
                        chosenSpec = None

                    if LOG:
                        LOG('candidate ASN.1 spec is a map of:')

                        for mapKey, v in asn1Spec.presentTypes.items():
                            LOG(' %s -> %s' % (mapKey, v.__class__.__name__))

                        if asn1Spec.skipTypes:
                            LOG('but neither of: ')
                            for mapKey, v in asn1Spec.skipTypes.items():
                                LOG(' %s -> %s' % (mapKey, v.__class__.__name__))
                        LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))

                elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
                    chosenSpec = asn1Spec
                    if LOG:
                        LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)

                else:
                    chosenSpec = None

                if chosenSpec is not None:
                    try:
                        # ambiguous type or just faster codec lookup
                        concreteDecoder = typeMap[chosenSpec.typeId]

                        if LOG:
                            LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))

                    except KeyError:
                        # use base type for codec lookup to recover untagged types
                        baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
                        try:
                            # base type or tagged subtype
                            concreteDecoder = tagMap[baseTagSet]

                            if LOG:
                                LOG('value decoder chosen by base %s' % (baseTagSet,))

                        except KeyError:
                            concreteDecoder = None

                    if concreteDecoder:
                        asn1Spec = chosenSpec
                        state = stDecodeValue

                    else:
                        state = stTryAsExplicitTag

                else:
                    concreteDecoder = None
                    state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state == stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)

            if state == stDecodeValue:
                if not options.get('recursiveFlag', True) and not substrateFun:  # deprecate this
                    def substrateFun(asn1Object, _substrate, _length, _options):
                        """Legacy hack to keep the recursiveFlag=False option supported.

                        The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
                        of the old recursiveFlag=False option. Users should pass their callback instead of using
                        recursiveFlag.
                        """
                        yield asn1Object

                original_position = substrate.tell()

                if length == -1:  # indef length
                    for value in concreteDecoder.indefLenValueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                else:
                    for value in concreteDecoder.valueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                    # Only a definite length can be verified against the
                    # number of octets the codec actually consumed.
                    bytesRead = substrate.tell() - original_position
                    if not substrateFun and bytesRead != length:
                        raise PyAsn1Error(
                            "Read %s bytes instead of expected %s." % (bytesRead, length))
                    elif substrateFun and bytesRead > length:
                        # custom substrateFun may be used for partial decoding, reading less is expected there
                        raise PyAsn1Error(
                            "Read %s bytes are more than expected %s." % (bytesRead, length))

                if LOG:
                    LOG('codec %s yields type %s, value:\n%s\n...' % (
                        concreteDecoder.__class__.__name__, value.__class__.__name__,
                        isinstance(value, base.Asn1Item) and value.prettyPrint() or value))

                state = stStop
                break

            if state == stTryAsExplicitTag:
                if (tagSet and
                        tagSet[0].tagFormat == tag.tagFormatConstructed and
                        tagSet[0].tagClass != tag.tagClassUniversal):
                    # Assume explicit tagging
                    concreteDecoder = rawPayloadDecoder
                    state = stDecodeValue

                else:
                    concreteDecoder = None
                    state = self.defaultErrorState

                if LOG:
                    LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state == stDecodeValue and 'value' or 'as failure'))

            if state == stDumpRawValue:
                concreteDecoder = self.defaultRawDecoder

                if LOG:
                    LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)

                state = stDecodeValue

            if state == stErrorCondition:
                raise error.PyAsn1Error(
                    '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
                )

        if LOG:
            debug.scope.pop()
            LOG('decoder left scope %s, call completed' % debug.scope)

        yield value
class StreamingDecoder(object):
    """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects.

    On each iteration, consume whatever BER/CER/DER serialization is
    available in the `substrate` stream-like object and turns it into
    one or more, possibly nested, ASN.1 objects.

    Parameters
    ----------
    substrate: :py:class:`file`, :py:class:`io.BytesIO`
        BER/CER/DER serialization in form of a byte stream

    Keyword Args
    ------------
    asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
        A pyasn1 type object to act as a template guiding the decoder.
        Depending on the ASN.1 structure being decoded, `asn1Spec` may
        or may not be required. One of the reasons why `asn1Spec` may
        be required is that ASN.1 structure is encoded in the *IMPLICIT*
        tagging mode.

    Yields
    ------
    : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
        Decoded ASN.1 object (possibly, nested) or
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
        insufficient BER/CER/DER serialization on input to fully recover ASN.1
        objects from it.

        In the latter case the caller is advised to ensure some more data in
        the input stream, then call the iterator again. The decoder will resume
        the decoding process using the newly arrived data.

        The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError`
        object might hold a reference to the partially populated ASN.1 object
        being reconstructed.

    Raises
    ------
    ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
        `PyAsn1Error` on deserialization error, `EndOfStreamError` on
        premature stream closure.

    Examples
    --------
    Decode BER serialisation without ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> for asn1Object in StreamingDecoder(stream):
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3

    Decode BER serialisation with ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> schema = SequenceOf(componentType=Integer())
        >>>
        >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
        >>> for asn1Object in decoder:
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3
    """

    SINGLE_ITEM_DECODER = SingleItemDecoder

    def __init__(self, substrate, asn1Spec=None, **options):
        """Wrap `substrate` into a seekable stream and build the item decoder.

        `options` are passed both to the `SINGLE_ITEM_DECODER` constructor
        and to every decoding call.
        """
        self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
        self._substrate = asSeekableStream(substrate)
        self._asn1Spec = asn1Spec
        self._options = options

    def __iter__(self):
        """Yield decoded objects (and underrun markers) until end of stream."""
        while True:
            for asn1Object in self._singleItemDecoder(
                    self._substrate, self._asn1Spec, **self._options):
                yield asn1Object

            for chunk in isEndOfStream(self._substrate):
                if isinstance(chunk, SubstrateUnderrunError):
                    # Hand the underrun marker itself to the caller (not a
                    # bare None) and keep polling after being resumed, so
                    # the error object is never mistaken for a truthy
                    # end-of-stream flag.
                    yield chunk
                    continue

                break

            # A truthy result means the stream is exhausted.
            if chunk:
                break
class Decoder(object):
    """Create a BER decoder object.

    Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object.
    """

    STREAMING_DECODER = StreamingDecoder

    @classmethod
    def __call__(cls, substrate, asn1Spec=None, **options):
        """Turns BER/CER/DER octet stream into an ASN.1 object.

        Takes BER/CER/DER octet-stream in form of :py:class:`bytes` (Python 3)
        or :py:class:`str` (Python 2) and decode it into an ASN.1 object
        (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
        may be a scalar or an arbitrary nested structure.

        Parameters
        ----------
        substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
            BER/CER/DER octet-stream to parse

        Keyword Args
        ------------
        asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
            A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
            derivative) to act as a template guiding the decoder.
            Depending on the ASN.1 structure being decoded, `asn1Spec` may or
            may not be required. Most common reason for it to require is that
            ASN.1 structure is encoded in *IMPLICIT* tagging mode.

        substrateFun: :py:class:`Union[
                Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
                         Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
                Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
                         Generator[Union[pyasn1.type.base.PyAsn1Item,
                                         pyasn1.error.SubstrateUnderrunError],
                                   None, None]]
            ]`
            User callback meant to generalize special use cases like non-recursive or
            partial decoding. A 3-arg non-streaming variant is supported for backwards
            compatibility in addition to the newer 4-arg streaming variant.
            The callback will receive the uninitialized object recovered from substrate
            as 1st argument, the uninterpreted payload as 2nd argument, and the length
            of the uninterpreted payload as 3rd argument. The streaming variant will
            additionally receive the decode(..., **options) kwargs as 4th argument.
            The non-streaming variant shall return an object that will be propagated
            as decode() return value as 1st item, and the remaining payload for further
            decode passes as 2nd item.
            The streaming variant shall yield an object that will be propagated as
            decode() return value, and leave the remaining payload in the stream.

        Returns
        -------
        : :py:class:`tuple`
            A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object
            recovered from BER/CER/DER substrate and the unprocessed trailing
            portion of the `substrate` (may be empty)

        Raises
        ------
        : :py:class:`~pyasn1.error.PyAsn1Error`
            :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient
            input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error.

        Examples
        --------
        Decode BER/CER/DER serialisation without ASN.1 schema

        .. code-block:: pycon

           >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
           >>> str(s)
           SequenceOf:
            1 2 3

        Decode BER/CER/DER serialisation with ASN.1 schema

        .. code-block:: pycon

           >>> seq = SequenceOf(componentType=Integer())
           >>> s, unprocessed = decode(
               b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
           >>> str(s)
           SequenceOf:
            1 2 3

        """
        substrate = asSeekableStream(substrate)

        if "substrateFun" in options:
            userSubstrateFun = options["substrateFun"]

            def substrateFunWrapper(asn1Object, substrate, length, options=None):
                """Support both 0.4 and 0.5 style APIs.

                substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible,
                we first try if we received a streaming user callback. If that fails, we assume we've received a
                non-streaming v0.4 user callback and convert it for streaming on the fly
                """
                try:
                    results = userSubstrateFun(asn1Object, substrate, length, options)

                except TypeError:
                    tb = sys.exc_info()[2]
                    if tb.tb_next:
                        # Traceback depth > 1 means TypeError from inside user provided function
                        raise

                    # invariant maintained at Decoder.__call__ entry
                    assert isinstance(substrate, io.BytesIO)  # nosec assert_used
                    results = Decoder._callSubstrateFunV4asV5(
                        userSubstrateFun, asn1Object, substrate, length)

                for item in results:
                    yield item

            options["substrateFun"] = substrateFunWrapper

        streamingDecoder = cls.STREAMING_DECODER(
            substrate, asn1Spec, **options)

        # Only the first decoded item is returned; an underrun marker means
        # the one-shot input was too short.
        for asn1Object in streamingDecoder:
            if isinstance(asn1Object, SubstrateUnderrunError):
                raise error.SubstrateUnderrunError('Short substrate on input')

            try:
                tail = next(readFromStream(substrate))

            except error.EndOfStreamError:
                tail = null

            return asn1Object, tail

    @staticmethod
    def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length):
        """Run a 3-argument (v0.4 style) callback as a streaming generator.

        Reads the rest of `substrate`, calls the callback on those bytes,
        writes the remainder it returns back to the stream, leaves the
        stream positioned at the start of that remainder and yields the
        value the callback returned.
        """
        remaining = substrate.read()
        if length == -1:
            length = len(remaining)

        value, nextSubstrate = substrateFunV4(asn1Object, remaining, length)

        written = substrate.write(nextSubstrate)
        substrate.truncate()
        substrate.seek(-written, os.SEEK_CUR)

        yield value
#: Turns BER octet stream into an ASN.1 object.
#:
#: Takes BER octet-stream and decodes it into an ASN.1 object
#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
#: may be a scalar or an arbitrary nested structure.
#:
#: Parameters
#: ----------
#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     BER octet-stream
#:
#: Keyword Args
#: ------------
#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:     A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
#:     being decoded, *asn1Spec* may or may not be required. Most common reason for
#:     it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode.
#:
#: Returns
#: -------
#: : :py:class:`tuple`
#:     A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     and the unprocessed trailing portion of the *substrate* (may be empty)
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
#:     On decoding errors
#:
#: Notes
#: -----
#: This function is deprecated. Please use :py:class:`Decoder` or
#: :py:class:`StreamingDecoder` class instance.
#:
#: Examples
#: --------
#: Decode BER serialisation without ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
#: Decode BER serialisation with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
decode = Decoder()