Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/decoder.py: 15%
991 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:37 +0000
1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import os
9from pyasn1 import debug
10from pyasn1 import error
11from pyasn1.codec.ber import eoo
12from pyasn1.codec.streaming import asSeekableStream
13from pyasn1.codec.streaming import isEndOfStream
14from pyasn1.codec.streaming import peekIntoStream
15from pyasn1.codec.streaming import readFromStream
16from pyasn1.compat import _MISSING
17from pyasn1.compat.integer import from_bytes
18from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null
19from pyasn1.error import PyAsn1Error
20from pyasn1.type import base
21from pyasn1.type import char
22from pyasn1.type import tag
23from pyasn1.type import tagmap
24from pyasn1.type import univ
25from pyasn1.type import useful
# Public names exported by ``from <this module> import *``.
__all__ = ['StreamingDecoder', 'Decoder', 'decode']

# Logger registered with pyasn1's debug facility under the DEBUG_DECODER
# flag.  The decoders below test it for truthiness (``if LOG:``) before
# formatting any log message.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)

# Short local alias for ``base.noValue``.
noValue = base.noValue

# Short local alias; used by the ``isinstance()`` checks in the decoders below.
SubstrateUnderrunError = error.SubstrateUnderrunError
class AbstractPayloadDecoder(object):
    """Common base of the payload decoders defined in this module.

    Both decoding entry points raise ``PyAsn1Error`` here; concrete
    decoders are expected to override the ones they support.
    """

    protoComponent = None

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a value of known (definite) length.

        The decoder is allowed to consume as many bytes as necessary.

        Raises:
            PyAsn1Error: always, in this base implementation.
        """
        # TODO: Seems more like a NotImplementedError?
        message = 'SingleItemDecoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode a value of indefinite length.

        The decoder is allowed to consume as many bytes as necessary.

        Raises:
            PyAsn1Error: always, in this base implementation.
        """
        # TODO: Seems more like a NotImplementedError?
        message = 'Indefinite length mode decoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    @staticmethod
    def _passAsn1Object(asn1Object, options):
        """Store `asn1Object` under ``'asn1Object'`` unless already present.

        The `options` mapping is modified in place and also returned.
        """
        options.setdefault('asn1Object', asn1Object)
        return options
class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
    """Base for decoders that build a value from a single run of octets."""

    @staticmethod
    def substrateCollector(asn1Object, substrate, length, options):
        """Relay whatever ``readFromStream`` produces for `length` octets.

        `asn1Object` is accepted for signature compatibility and not used.
        """
        reader = readFromStream(substrate, length, options)
        for piece in reader:
            yield piece

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Wrap `value` into the object handed back to the caller.

        * ``native`` option set -> `value` itself;
        * no `asn1Spec` -> clone of ``self.protoComponent`` tagged with `tagSet`;
        * `value` is ``noValue`` -> `asn1Spec` itself;
        * otherwise -> clone of `asn1Spec` initialised with `value`.
        """
        if options.get('native'):
            return value

        if asn1Spec is None:
            return self.protoComponent.clone(value, tagSet=tagSet)

        if value is noValue:
            return asn1Spec

        return asn1Spec.clone(value)
class RawPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder that hands the payload to `substrateFun` or to `decodeFun`."""

    protoComponent = univ.Any('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Generator: relay items from `substrateFun` if given, else `decodeFun`."""
        if not substrateFun:
            # No user hook: relay everything the nested decoder produces.
            for item in decodeFun(substrate, asn1Spec, tagSet, length, **options):
                yield item

            return

        target = self._createComponent(asn1Spec, tagSet, '', **options)

        for piece in substrateFun(target, substrate, length, options):
            yield piece

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Generator: like `valueDecoder`, but keeps decoding until the
        nested decoder produces ``eoo.endOfOctets``.
        """
        if substrateFun:
            target = self._createComponent(asn1Spec, tagSet, '', **options)

            for piece in substrateFun(target, substrate, length, options):
                yield piece

            return

        while True:
            items = decodeFun(
                substrate, asn1Spec, tagSet, length,
                allowEoo=True, **options)

            for item in items:
                # The end-of-octets sentinel finishes the generator and is
                # not passed on to the consumer.
                if item is eoo.endOfOctets:
                    return

                yield item


rawPayloadDecoder = RawPayloadDecoder()
class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder turning primitive content octets into a signed integer."""

    protoComponent = univ.Integer(0)

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Generator: yield underrun markers, then the decoded component.

        Raises:
            PyAsn1Error: if the outermost tag is not in simple format.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        # Underrun markers are passed to the consumer; the loop variable
        # holds the last item produced once the loop is over.
        for octets in readFromStream(substrate, length, options):
            if isinstance(octets, SubstrateUnderrunError):
                yield octets

        # Empty content decodes to zero.
        number = from_bytes(octets, signed=True) if octets else 0

        yield self._createComponent(asn1Spec, tagSet, number, **options)
class BooleanPayloadDecoder(IntegerPayloadDecoder):
    """Integer decoder variant that collapses the value to 0 or 1."""

    protoComponent = univ.Boolean(0)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Normalise `value` by truthiness before delegating to the parent."""
        flag = 1 if value else 0
        return IntegerPayloadDecoder._createComponent(
            self, asn1Spec, tagSet, flag, **options)
class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for BIT STRING in primitive or constructed form."""

    protoComponent = univ.BitString(())
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Generator: decode a definite-length BIT STRING.

        Yields underrun markers while reading and finally the component.

        Raises:
            PyAsn1Error: on empty substrate, a padding count above 7, or
                constructed form when ``supportConstructedForm`` is false.
        """
        if substrateFun:
            target = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for piece in substrateFun(target, substrate, length, options):
                yield piece

            return

        if not length:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        for chunk in isEndOfStream(substrate):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if chunk:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        isPrimitive = tagSet[0].tagFormat == tag.tagFormatSimple  # XXX what tag to check?

        if isPrimitive:
            # The first content octet is the count of unused trailing bits.
            for trailingBits in readFromStream(substrate, 1, options):
                if isinstance(trailingBits, SubstrateUnderrunError):
                    yield trailingBits

            trailingBits = ord(trailingBits)
            if trailingBits > 7:
                raise error.PyAsn1Error('Trailing bits overflow %s' % trailingBits)

            for chunk in readFromStream(substrate, length - 1, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            decoded = self.protoComponent.fromOctetString(
                chunk, internalFormat=True, padding=trailingBits)

            yield self._createComponent(asn1Spec, tagSet, decoded, **options)

            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error(
                'Constructed encoding form prohibited at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(null, internalFormat=True)

        startPosition = substrate.tell()

        while substrate.tell() - startPosition < length:
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=substrateFun,
                **options)

            for component in fragments:
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            # Each fragment starts with its own padding octet.
            trailingBits = oct2int(component[0])
            if trailingBits > 7:
                raise error.PyAsn1Error('Trailing bits overflow %s' % trailingBits)

            bitString = self.protoComponent.fromOctetString(
                component[1:], internalFormat=True,
                prepend=bitString, padding=trailingBits)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Generator: decode an indefinite-length (fragmented) BIT STRING.

        Fragments are accumulated until ``eoo.endOfOctets`` shows up.

        Raises:
            PyAsn1Error: if a fragment declares more than 7 padding bits.
        """
        if substrateFun:
            target = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for piece in substrateFun(target, substrate, length, options):
                yield piece

            return

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(null, internalFormat=True)

        while True:  # loop over fragments
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=substrateFun,
                allowEoo=True, **options)

            for component in fragments:
                if component is eoo.endOfOctets:
                    break

                if isinstance(component, SubstrateUnderrunError):
                    yield component

            if component is eoo.endOfOctets:
                break

            trailingBits = oct2int(component[0])
            if trailingBits > 7:
                raise error.PyAsn1Error('Trailing bits overflow %s' % trailingBits)

            bitString = self.protoComponent.fromOctetString(
                component[1:], internalFormat=True,
                prepend=bitString, padding=trailingBits)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)
class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OCTET STRING in primitive or constructed form."""

    protoComponent = univ.OctetString('')
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Generator: decode a definite-length OCTET STRING.

        Raises:
            PyAsn1Error: on constructed form when ``supportConstructedForm``
                is false.
        """
        if substrateFun:
            target = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for piece in substrateFun(target, substrate, length, options):
                yield piece

            return

        isPrimitive = tagSet[0].tagFormat == tag.tagFormatSimple  # XXX what tag to check?

        if isPrimitive:
            for chunk in readFromStream(substrate, length, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            yield self._createComponent(asn1Spec, tagSet, chunk, **options)

            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error(
                'Constructed encoding form prohibited at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        payload = null

        startPosition = substrate.tell()

        while substrate.tell() - startPosition < length:
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=substrateFun,
                **options)

            for component in fragments:
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            # The loop variable now holds the fragment's octets.
            payload += component

        yield self._createComponent(asn1Spec, tagSet, payload, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Generator: decode an indefinite-length (fragmented) OCTET STRING.

        A `substrateFun` equal to this decoder's own ``substrateCollector``
        is not treated as a user hook; fragments are concatenated instead.
        """
        isUserHook = substrateFun and substrateFun is not self.substrateCollector

        if isUserHook:
            target = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for piece in substrateFun(target, substrate, length, options):
                yield piece

            return

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        payload = null

        while True:  # loop over fragments
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=substrateFun,
                allowEoo=True, **options)

            for component in fragments:
                if isinstance(component, SubstrateUnderrunError):
                    yield component

                if component is eoo.endOfOctets:
                    break

            if component is eoo.endOfOctets:
                break

            payload += component

        yield self._createComponent(asn1Spec, tagSet, payload, **options)
class NullPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for NULL, which must carry no content octets."""

    protoComponent = univ.Null('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Generator: yield underrun markers, then the NULL component.

        Raises:
            PyAsn1Error: if the tag is not in simple format or any content
                octets were read.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for octets in readFromStream(substrate, length, options):
            if isinstance(octets, SubstrateUnderrunError):
                yield octets

        # The component is built before the content check, as in the
        # original statement order.
        nullObject = self._createComponent(asn1Spec, tagSet, '', **options)

        if octets:
            raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)

        yield nullObject
class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OBJECT IDENTIFIER content octets."""

    protoComponent = univ.ObjectIdentifier(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Generator: yield underrun markers, then the decoded OID component.

        Sub-identifiers are base-128 numbers whose continuation octets have
        the high bit set.  The first sub-identifier packs the two leading
        arcs and is unpacked at the end.

        Raises:
            PyAsn1Error: if the tag is not in simple format, the content is
                empty, or a sub-identifier starts with octet 0x80.
            SubstrateUnderrunError: if the content ends in the middle of a
                multi-octet sub-identifier.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            raise error.PyAsn1Error('Empty substrate')

        chunk = octs2ints(chunk)

        # Collect sub-identifiers in a list; growing a tuple by repeated
        # concatenation copies it every time.
        arcs = []
        index = 0
        substrateLen = len(chunk)
        while index < substrateLen:
            subId = chunk[index]
            index += 1

            if subId < 128:
                arcs.append(subId)

            elif subId == 128:
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')

            else:
                # Construct subid from a number of octets
                nextSubId = subId
                subId = 0
                while nextSubId >= 128:
                    subId = (subId << 7) + (nextSubId & 0x7F)
                    if index >= substrateLen:
                        raise error.SubstrateUnderrunError(
                            'Short substrate for sub-OID past %s' % (tuple(arcs),)
                        )
                    nextSubId = chunk[index]
                    index += 1
                arcs.append((subId << 7) + nextSubId)

        # Decode two leading arcs.  Sub-identifiers are never negative, so
        # these three ranges cover every possible value.
        first = arcs[0]
        if first <= 39:
            leading = (0, first)
        elif first <= 79:
            leading = (1, first - 40)
        else:
            leading = (2, first - 80)

        oid = leading + tuple(arcs[1:])

        yield self._createComponent(asn1Spec, tagSet, oid, **options)
class RealPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for REAL in binary, special-value or character encoding."""

    protoComponent = univ.Real()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Generator: yield underrun markers, then the decoded REAL component.

        The first content octet selects the encoding: bit 8 set means
        binary, otherwise bit 7 set means an infinity, otherwise a decimal
        character form.  Empty content decodes to ``0.0``.

        Raises:
            PyAsn1Error: on a non-simple tag or malformed binary encoding.
            SubstrateUnderrunError: on an unknown or unparsable character
                form.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
            return

        fo = oct2int(chunk[0])
        body = chunk[1:]

        if fo & 0x80:  # binary encoding
            if not body:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding binary encoded REAL')

            # Exponent length is 1..3 octets, or given by the next octet.
            expLen = (fo & 0x03) + 1

            if expLen == 4:
                expLen = oct2int(body[0])
                body = body[1:]

            expOctets = body[:expLen]
            mantissaOctets = body[expLen:]

            if not expOctets or not mantissaOctets:
                raise error.PyAsn1Error('Real exponent screwed')

            # Seed with -1 when the sign bit is set so that the shifts
            # below produce a negative two's-complement value.
            exponent = -1 if oct2int(expOctets[0]) & 0x80 else 0

            for pos in range(len(expOctets)):  # exponent
                exponent = (exponent << 8) | oct2int(expOctets[pos])

            baseBits = (fo >> 4) & 0x03  # base bits

            if baseBits > 2:
                raise error.PyAsn1Error('Illegal Real base')

            if baseBits == 1:  # encbase = 8
                exponent *= 3

            elif baseBits == 2:  # encbase = 16
                exponent *= 4

            mantissa = 0

            for pos in range(len(mantissaOctets)):  # value
                mantissa = (mantissa << 8) | oct2int(mantissaOctets[pos])

            if fo & 0x40:  # sign bit
                mantissa = -mantissa

            scale = (fo >> 2) & 0x03  # scale bits
            mantissa *= 2 ** scale
            value = (mantissa, 2, exponent)

        elif fo & 0x40:  # infinite value
            if LOG:
                LOG('decoding infinite REAL')

            value = '-inf' if fo & 0x01 else 'inf'

        elif fo & 0xc0 == 0:  # character encoding
            if not body:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding character encoded REAL')

            form = fo & 0x3

            try:
                if form == 0x1:  # NR1
                    value = (int(body), 10, 0)

                elif form in (0x2, 0x3):  # NR2, NR3
                    value = float(body)

                else:
                    raise error.SubstrateUnderrunError(
                        'Unknown NR (tag %s)' % fo
                    )

            except ValueError:
                raise error.SubstrateUnderrunError(
                    'Bad character Real syntax'
                )

        else:
            raise error.SubstrateUnderrunError(
                'Unknown encoding (tag %s)' % fo
            )

        yield self._createComponent(asn1Spec, tagSet, value, **options)
class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
    """Base class of the decoders for constructed types."""

    # No prototype at this level; subclasses in this module assign one.
    protoComponent = None
class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder):
    """Shared decoding logic for container types.

    All decoding methods are generators.  While nested decoding is in
    progress they pass ``SubstrateUnderrunError`` instances to the consumer;
    the last object they yield is the populated container.  The code relies
    on loop variables keeping the last item after a ``for`` loop ends.
    """

    # Prototype cloned by the schemaless path when the decoded components
    # carry more than one distinct tag set.
    protoRecordComponent = None
    # Prototype cloned by the schemaless path otherwise.
    protoSequenceComponent = None

    def _getComponentTagMap(self, asn1Object, idx):
        raise NotImplementedError()

    def _getComponentPositionByType(self, asn1Object, tagSet, idx):
        raise NotImplementedError()

    def _decodeComponentsSchemaless(
            self, substrate, tagSet=None, decodeFun=None,
            length=None, **options):
        """Decode components without a schema and guess the container type.

        A `length` of -1 means "read until ``eoo.endOfOctets``"; otherwise
        components are read until `length` octets past the starting
        position have been consumed.
        """

        asn1Object = None

        components = []
        componentTypes = set()

        original_position = substrate.tell()

        while length == -1 or substrate.tell() < original_position + length:
            for component in decodeFun(substrate, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            # `component` is the last item produced by the nested decoder.
            if length == -1 and component is eoo.endOfOctets:
                break

            components.append(component)
            componentTypes.add(component.tagSet)

        # Now we have to guess whether it is SEQUENCE/SET or SEQUENCE OF/SET OF.
        # The heuristic is:
        # * components with more than one distinct tag set -> likely SEQUENCE/SET
        # * otherwise -> likely SEQUENCE OF/SET OF
        if len(componentTypes) > 1:
            protoComponent = self.protoRecordComponent

        else:
            protoComponent = self.protoSequenceComponent

        asn1Object = protoComponent.clone(
            # construct tagSet from base tag from prototype ASN.1 object
            # and additional tags recovered from the substrate
            tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
        )

        if LOG:
            LOG('guessed %r container type (pass `asn1Spec` to guide the '
                'decoder)' % asn1Object)

        for idx, component in enumerate(components):
            asn1Object.setComponentByPosition(
                idx, component,
                verifyConstraints=False,
                matchTags=False, matchConstraints=False
            )

        yield asn1Object

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length constructed value.

        Three paths: hand the substrate to `substrateFun` if given; guess
        the container type if `asn1Spec` is None; otherwise decode under
        the guidance of `asn1Spec`.

        Raises:
            PyAsn1Error: on a non-constructed tag, more components than the
                spec has, or missing required components.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        original_position = substrate.tell()

        if substrateFun:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # No single prototype available: pass both candidates as a tuple.
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if asn1Spec is None:
            for asn1Object in self._decodeComponentsSchemaless(
                    substrate, tagSet=tagSet, decodeFun=decodeFun,
                    length=length, **options):
                if isinstance(asn1Object, SubstrateUnderrunError):
                    yield asn1Object

            if substrate.tell() < original_position + length:
                # Leftover octets are only read (and reported) when logging
                # is enabled.
                if LOG:
                    for trailing in readFromStream(substrate, context=options):
                        if isinstance(trailing, SubstrateUnderrunError):
                            yield trailing

                    LOG('Unused trailing %d octets encountered: %s' % (
                        len(trailing), debug.hexdump(trailing)))

            yield asn1Object

            return

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        options = self._passAsn1Object(asn1Object, options)

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Spec.componentType

            isSetType = asn1Spec.typeId == univ.Set.typeId
            # Positions map one-to-one onto the spec only for a SEQUENCE
            # without optional or defaulted fields.
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()
            idx = 0
            while substrate.tell() - original_position < length:
                # Pick the spec (or tag map) the next component is decoded against.
                if not namedTypes:
                    componentType = None

                elif isSetType:
                    componentType = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            componentType = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            componentType = namedTypes.getTagMapNearPosition(idx)

                        else:
                            componentType = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Spec,)
                        )

                for component in decodeFun(substrate, componentType, **options):
                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                # Re-derive the position from the decoded component's tags
                # when it cannot be taken from the running counter.
                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('user-specified open types map:')

                        for k, v in openTypes.items():
                            LOG('%s -> %r' % (k, v))

                    if openTypes or options.get('decodeOpenTypes', False):

                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # The user-supplied map is consulted first, then
                            # the map attached to the schema.
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                if LOG:
                                    LOG('default open types map of component '
                                        '"%s.%s" governed by component "%s.%s"'
                                        ':' % (asn1Object.__class__.__name__,
                                               namedType.name,
                                               asn1Object.__class__.__name__,
                                               namedType.openType.name))

                                    for k, v in namedType.openType.items():
                                        LOG('%s -> %r' % (k, v))

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                # Re-decode every element's octets against
                                # the resolved type and store it back.
                                for pos, containerElement in enumerate(
                                        containerValue):

                                    stream = asSeekableStream(containerValue[pos].asOctets())

                                    for component in decodeFun(stream, asn1Spec=openType, **options):
                                        if isinstance(component, SubstrateUnderrunError):
                                            yield component

                                    containerValue[pos] = component

                            else:
                                stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())

                                for component in decodeFun(stream, asn1Spec=openType, **options):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                asn1Object.setComponentByPosition(idx, component)

            else:
                # `isInconsistent` is raised directly when truthy.
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise inconsistency

        else:
            # Any other type ID: every element shares `asn1Spec.componentType`.
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while substrate.tell() - original_position < length:
                for component in decodeFun(substrate, componentType, **options):
                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        yield asn1Object

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length constructed value.

        Same three paths as `valueDecoder`, but components are read until
        the nested decoder produces ``eoo.endOfOctets`` instead of until a
        byte count is reached.

        Raises:
            PyAsn1Error: on a non-constructed tag, more components than the
                spec has, or missing required components.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        if substrateFun is not None:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # No single prototype available: pass both candidates as a tuple.
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if asn1Spec is None:
            for asn1Object in self._decodeComponentsSchemaless(
                    substrate, tagSet=tagSet, decodeFun=decodeFun,
                    length=length, **dict(options, allowEoo=True)):
                if isinstance(asn1Object, SubstrateUnderrunError):
                    yield asn1Object

            yield asn1Object

            return

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        options = self._passAsn1Object(asn1Object, options)

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Object.componentType

            isSetType = asn1Object.typeId == univ.Set.typeId
            # Positions map one-to-one onto the spec only for a SEQUENCE
            # without optional or defaulted fields.
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()

            idx = 0

            while True:  # loop over components
                # From here on `asn1Spec` is rebound to the spec of the
                # next component.
                if len(namedTypes) <= idx:
                    asn1Spec = None

                elif isSetType:
                    asn1Spec = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            asn1Spec = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            asn1Spec = namedTypes.getTagMapNearPosition(idx)

                        else:
                            asn1Spec = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Object,)
                        )

                for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options):

                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                    if component is eoo.endOfOctets:
                        break

                # The sentinel ends the component loop as well.
                if component is eoo.endOfOctets:
                    break

                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('user-specified open types map:')

                        for k, v in openTypes.items():
                            LOG('%s -> %r' % (k, v))

                    if openTypes or options.get('decodeOpenTypes', False):

                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # The user-supplied map is consulted first, then
                            # the map attached to the schema.
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                if LOG:
                                    LOG('default open types map of component '
                                        '"%s.%s" governed by component "%s.%s"'
                                        ':' % (asn1Object.__class__.__name__,
                                               namedType.name,
                                               asn1Object.__class__.__name__,
                                               namedType.openType.name))

                                    for k, v in namedType.openType.items():
                                        LOG('%s -> %r' % (k, v))

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                # Re-decode every element's octets against
                                # the resolved type and store it back.
                                for pos, containerElement in enumerate(
                                        containerValue):

                                    stream = asSeekableStream(containerValue[pos].asOctets())

                                    for component in decodeFun(stream, asn1Spec=openType,
                                                               **dict(options, allowEoo=True)):
                                        if isinstance(component, SubstrateUnderrunError):
                                            yield component

                                        if component is eoo.endOfOctets:
                                            break

                                    containerValue[pos] = component

                            else:
                                stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
                                for component in decodeFun(stream, asn1Spec=openType,
                                                           **dict(options, allowEoo=True)):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                    if component is eoo.endOfOctets:
                                        break

                                asn1Object.setComponentByPosition(idx, component)

            else:
                # `isInconsistent` is raised directly when truthy.
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise inconsistency

        else:
            # Any other type ID: every element shares `asn1Spec.componentType`.
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while True:

                for component in decodeFun(
                        substrate, componentType, allowEoo=True, **options):

                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                    if component is eoo.endOfOctets:
                        break

                if component is eoo.endOfOctets:
                    break

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        yield asn1Object
class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed decoder with SEQUENCE and SEQUENCE OF prototypes."""
    protoRecordComponent = univ.Sequence()
    protoSequenceComponent = univ.SequenceOf()


class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """Variant whose prototype component is a ``univ.Sequence``."""
    protoComponent = univ.Sequence()


class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """Variant whose prototype component is a ``univ.SequenceOf``."""
    protoComponent = univ.SequenceOf()
class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed decoder with SET and SET OF prototypes."""
    protoRecordComponent = univ.Set()
    protoSequenceComponent = univ.SetOf()


class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
    """Variant whose prototype component is a ``univ.Set``."""
    protoComponent = univ.Set()


class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
    """Variant whose prototype component is a ``univ.SetOf``."""
    protoComponent = univ.SetOf()
class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
    """Decoder for CHOICE: decodes one alternative and stores it by tag set.

    Both methods are generators that pass ``SubstrateUnderrunError``
    instances to the consumer and finally yield the CHOICE object.
    """
    protoComponent = univ.Choice()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length CHOICE value."""
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(asn1Object, options)

        if asn1Object.tagSet == tagSet:
            # The CHOICE's own tags match the ones seen: the alternative is
            # decoded from the substrate as a fresh nested value.
            if LOG:
                LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))

            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        else:
            # Otherwise the already-read tag set, length and state are
            # forwarded to the nested decoder.
            if LOG:
                LOG('decoding %s as untagged CHOICE' % (tagSet,))

            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, tagSet, length,
                    state, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        # `component` is the last item produced by whichever loop ran above.
        effectiveTagSet = component.effectiveTagSet

        if LOG:
            LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))

        asn1Object.setComponentByType(
            effectiveTagSet, component,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False,
            innerFlag=False
        )

        yield asn1Object

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length CHOICE value."""
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(asn1Object, options)

        isTagged = asn1Object.tagSet == tagSet

        if LOG:
            LOG('decoding %s as %stagged CHOICE' % (
                tagSet, isTagged and 'explicitly ' or 'un'))

        while True:

            if isTagged:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    **dict(options, allowEoo=True))

            else:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    tagSet, length, state, **dict(options, allowEoo=True))

            for component in iterator:

                if isinstance(component, SubstrateUnderrunError):
                    yield component
                    # NOTE(review): with the indentation reconstructed here,
                    # control falls through to the attribute access below
                    # after an underrun marker has been yielded -- confirm
                    # against the upstream file that this is intended.

                if component is eoo.endOfOctets:
                    break

                effectiveTagSet = component.effectiveTagSet

                if LOG:
                    LOG('decoded component %s, effective tag set '
                        '%s' % (component, effectiveTagSet))

                asn1Object.setComponentByType(
                    effectiveTagSet, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False,
                    innerFlag=False
                )

                # The untagged form stops after the first alternative.
                if not isTagged:
                    break

            # The tagged form keeps looping until the end-of-octets
            # sentinel is seen.
            if not isTagged or component is eoo.endOfOctets:
                break

        yield asn1Object
class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for ASN.1 ANY values.

    Both entry points are generators that pass through
    `SubstrateUnderrunError` objects while input is incomplete and then
    yield the result.
    """
    protoComponent = univ.Any()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length ANY payload.

        When the tag read from the stream does not match `asn1Spec` (or no
        spec is given) the value is treated as untagged: the stream is
        rewound to `substrate.markedPosition` and `length` is extended by
        the rewound distance, so the header octets become part of the value.

        Yields
        ------
        `SubstrateUnderrunError` objects, then either whatever
        `substrateFun` produces or the component built from the octets read.
        """
        if asn1Spec is None:
            untagged = True

        elif asn1Spec.__class__ is tagmap.TagMap:
            untagged = tagSet not in asn1Spec.tagMap

        else:
            untagged = tagSet != asn1Spec.tagSet

        if untagged:
            headerStart = substrate.markedPosition
            payloadStart = substrate.tell()

            # Step back over the header and account for it in the length.
            substrate.seek(headerStart, os.SEEK_SET)
            length += payloadStart - headerStart

            if LOG:
                for peeked in peekIntoStream(substrate, length):
                    if isinstance(peeked, SubstrateUnderrunError):
                        yield peeked
                LOG('decoding as untagged ANY, substrate '
                    '%s' % debug.hexdump(peeked))

        if substrateFun:
            emptyComponent = self._createComponent(
                asn1Spec, tagSet, noValue, **options)

            for piece in substrateFun(
                    emptyComponent, substrate, length, options):
                yield piece

            return

        for octets in readFromStream(substrate, length, options):
            if isinstance(octets, SubstrateUnderrunError):
                yield octets

        # `octets` now holds the last item from readFromStream
        yield self._createComponent(asn1Spec, tagSet, octets, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length ANY payload.

        Inner fragments are decoded one by one through `decodeFun` (with
        `allowEoo=True`) and concatenated until the end-of-octets marker.
        For an untagged value the header octets are re-read from
        `substrate.markedPosition` and prepended to the result.

        Yields
        ------
        `SubstrateUnderrunError` objects, then either whatever a
        caller-supplied `substrateFun` produces or the assembled octets.
        """
        if asn1Spec is None:
            tagged = False

        elif asn1Spec.__class__ is tagmap.TagMap:
            tagged = tagSet in asn1Spec.tagMap

        else:
            tagged = tagSet == asn1Spec.tagSet

        if tagged:
            # tagged Any type -- consume header substrate
            assembled = null

            if LOG:
                LOG('decoding as tagged ANY')

        else:
            # TODO: Seems not to be tested
            headerStart = substrate.markedPosition
            payloadStart = substrate.tell()

            substrate.seek(headerStart, os.SEEK_SET)
            for assembled in readFromStream(
                    substrate, payloadStart - headerStart, options):
                if isinstance(assembled, SubstrateUnderrunError):
                    yield assembled

            if LOG:
                LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(assembled))

        # Any components do not inherit initial tag
        asn1Spec = self.protoComponent

        if substrateFun and substrateFun is not self.substrateCollector:
            emptyComponent = self._createComponent(
                asn1Spec, tagSet, noValue, **options)

            # NOTE(review): `assembled + substrate` adds octets to what
            # looks like a stream object here -- confirm this path works.
            for piece in substrateFun(
                    emptyComponent, assembled + substrate,
                    length + len(assembled), options):
                yield piece

            return

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        while True:  # loop over fragments

            for fragment in decodeFun(
                    substrate, asn1Spec, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

                if fragment is eoo.endOfOctets:
                    break

            if fragment is eoo.endOfOctets:
                break

            assembled += fragment

        if substrateFun:
            yield assembled  # TODO: Weird

        else:
            yield self._createComponent(asn1Spec, tagSet, assembled, **options)
1356# character string types
class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
    """UTF8String decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.UTF8String()
class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
    """NumericString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.NumericString()
class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
    """PrintableString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.PrintableString()
class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
    """TeletexString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.TeletexString()
class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
    """VideotexString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.VideotexString()
class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
    """IA5String decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.IA5String()
class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
    """GraphicString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.GraphicString()
class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
    """VisibleString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.VisibleString()
class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
    """GeneralString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.GeneralString()
class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
    """UniversalString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.UniversalString()
class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
    """BMPString decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = char.BMPString()
1401# "useful" types
class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
    """ObjectDescriptor decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = useful.ObjectDescriptor()
class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
    """GeneralizedTime decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = useful.GeneralizedTime()
class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
    """UTCTime decoder; reuses the inherited OCTET STRING decoding logic."""
    protoComponent = useful.UTCTime()
# Tag-to-codec map: selects a payload decoder by the tag set recovered
# from the substrate when no ASN.1 schema is guiding the decoder.
TAG_MAP = {
    univ.Integer.tagSet: IntegerPayloadDecoder(),
    univ.Boolean.tagSet: BooleanPayloadDecoder(),
    univ.BitString.tagSet: BitStringPayloadDecoder(),
    univ.OctetString.tagSet: OctetStringPayloadDecoder(),
    univ.Null.tagSet: NullPayloadDecoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
    univ.Enumerated.tagSet: IntegerPayloadDecoder(),
    univ.Real.tagSet: RealPayloadDecoder(),
    univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SetOrSetOfPayloadDecoder(),  # conflicts with SetOf
    univ.Choice.tagSet: ChoicePayloadDecoder(),  # conflicts with Any
    # character string types
    char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
    char.NumericString.tagSet: NumericStringPayloadDecoder(),
    char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
    char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
    char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
    char.IA5String.tagSet: IA5StringPayloadDecoder(),
    char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
    char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
    char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
    char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
    char.BMPString.tagSet: BMPStringPayloadDecoder(),
    # useful types
    useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
    useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
    useful.UTCTime.tagSet: UTCTimePayloadDecoder()
}
# Type-to-codec map for ambiguous ASN.1 types: these are looked up by the
# schema object's typeId because their tags alone do not identify them
# (see the "conflicts with" notes in TAG_MAP).
TYPE_MAP = {
    univ.Set.typeId: SetPayloadDecoder(),
    univ.SetOf.typeId: SetOfPayloadDecoder(),
    univ.Sequence.typeId: SequencePayloadDecoder(),
    univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
    univ.Choice.typeId: ChoicePayloadDecoder(),
    univ.Any.typeId: AnyPayloadDecoder()
}
# deprecated aliases, https://github.com/pyasn1/pyasn1/issues/9
tagMap = TAG_MAP
typeMap = TYPE_MAP

# Put in non-ambiguous types for faster codec lookup: every tag-mapped
# decoder whose prototype has a typeId is also registered under that
# typeId, unless TYPE_MAP already has an entry for it.
for typeDecoder in TAG_MAP.values():
    if typeDecoder.protoComponent is None:
        continue

    typeId = typeDecoder.protoComponent.__class__.typeId
    if typeId is not None:
        TYPE_MAP.setdefault(typeId, typeDecoder)
# States of the decoder state machine, numbered 0..9 in declaration order.
(stDecodeTag,
 stDecodeLength,
 stGetValueDecoder,
 stGetValueDecoderByAsn1Spec,
 stGetValueDecoderByTag,
 stTryAsExplicitTag,
 stDecodeValue,
 stDumpRawValue,
 stErrorCondition,
 stStop) = range(10)
# Two zero octets: compared against the next two octets of the stream to
# detect the end-of-octets marker of an indefinite-length encoding.
EOO_SENTINEL = ints2octs((0, 0))
class SingleItemDecoder(object):
    """State-machine decoder recovering one ASN.1 object from a BER stream.

    Calling an instance returns a generator. While the stream is short of
    data the generator yields `SubstrateUnderrunError` objects; its last
    item is the decoded value (or `eoo.endOfOctets` when `allowEoo` is set
    and the end-of-octets marker is found).
    """
    # State entered when no codec can be found for a tag. Switch to
    # stDumpRawValue to decode such values with `defaultRawDecoder`
    # instead of raising.
    defaultErrorState = stErrorCondition
    defaultRawDecoder = AnyPayloadDecoder()

    supportIndefLength = True

    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
        """Set up codec maps and the per-instance tag caches.

        Parameters
        ----------
        tagMap:
            Tag-to-codec map; defaults to the class-level `TAG_MAP`.
        typeMap:
            TypeId-to-codec map; defaults to the class-level `TYPE_MAP`.
        **ignored:
            Any other keyword arguments are accepted and discarded.
        """
        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP

        # Tag & TagSet objects caches
        self._tagCache = {}
        self._tagSetCache = {}

    @staticmethod
    def _nonRecursiveSubstrateFun(asn1Object, substrate, length, options):
        """Substrate callback backing the deprecated `recursiveFlag=False`.

        Follows the calling convention payload decoders use for
        `substrateFun` (four arguments, iterable result): it yields the
        still-empty `asn1Object` and leaves the payload unread in
        `substrate`.
        """
        yield asn1Object

    def __call__(self, substrate, asn1Spec=None,
                 tagSet=None, length=None, state=stDecodeTag,
                 decodeFun=None, substrateFun=None,
                 **options):
        """Decode a single ASN.1 item from `substrate`.

        Parameters
        ----------
        substrate:
            Seekable input stream.
        asn1Spec:
            Schema object (or a `tagmap.TagMap` of candidates) guiding
            codec selection; `None` selects codecs by tag alone.
        tagSet:
            Tags already recovered by an outer call; newly read tags are
            combined with it.
        length:
            Payload length already recovered by an outer call.
        state:
            State to start the machine in; lets a payload decoder resume
            past the tag/length stages by passing `tagSet` and `length`.
        decodeFun:
            Not used by this method; payload decoders receive `self` as
            their `decodeFun`.
        substrateFun:
            Optional callback handed down to payload decoders.
        **options:
            `allowEoo` is consumed here; the rest is forwarded to stream
            helpers and payload decoders.

        Yields
        ------
        `SubstrateUnderrunError` objects while input is insufficient, then
        the decoded value.

        Raises
        ------
        PyAsn1Error
            No codec matches the tag, indefinite length is not supported,
            or a payload decoder consumed an unexpected number of octets.
        SubstrateUnderrunError
            The stream ended inside a long tag or a long length field.
        """
        allowEoo = options.pop('allowEoo', False)

        if LOG:
            LOG('decoder called at scope %s with state %d, working with up '
                'to %s octets of substrate: '
                '%s' % (debug.scope, state, length, substrate))

        # Look for end-of-octets sentinel
        if allowEoo and self.supportIndefLength:

            for eoo_candidate in readFromStream(substrate, 2, options):
                if isinstance(eoo_candidate, SubstrateUnderrunError):
                    yield eoo_candidate

            if eoo_candidate == EOO_SENTINEL:
                if LOG:
                    LOG('end-of-octets sentinel found')
                yield eoo.endOfOctets
                return

            else:
                # Not a sentinel: un-read the two octets just inspected
                substrate.seek(-2, os.SEEK_CUR)

        tagMap = self._tagMap
        typeMap = self._typeMap
        tagCache = self._tagCache
        tagSetCache = self._tagSetCache

        value = noValue

        # Remember where this item's header starts; AnyPayloadDecoder
        # rewinds to this position for untagged values.
        substrate.markedPosition = substrate.tell()

        while state is not stStop:

            if state is stDecodeTag:
                # Decode tag
                isShortTag = True

                for firstByte in readFromStream(substrate, 1, options):
                    if isinstance(firstByte, SubstrateUnderrunError):
                        yield firstByte

                firstOctet = ord(firstByte)

                try:
                    lastTag = tagCache[firstOctet]

                except KeyError:
                    integerTag = firstOctet
                    tagClass = integerTag & 0xC0
                    tagFormat = integerTag & 0x20
                    tagId = integerTag & 0x1F

                    if tagId == 0x1F:
                        # Long form: the tag number follows in base-128,
                        # high bit set on every octet but the last.
                        isShortTag = False
                        tagId = 0

                        while True:
                            for integerByte in readFromStream(substrate, 1, options):
                                if isinstance(integerByte, SubstrateUnderrunError):
                                    yield integerByte

                            if not integerByte:
                                raise error.SubstrateUnderrunError(
                                    'Short octet stream on long tag decoding'
                                )

                            integerTag = ord(integerByte)
                            tagId <<= 7
                            tagId |= (integerTag & 0x7F)

                            if not integerTag & 0x80:
                                break

                    lastTag = tag.Tag(
                        tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                    )

                    if isShortTag:
                        # cache short tags
                        tagCache[firstOctet] = lastTag

                if tagSet is None:
                    if isShortTag:
                        try:
                            tagSet = tagSetCache[firstOctet]

                        except KeyError:
                            # base tag not recovered
                            tagSet = tag.TagSet((), lastTag)
                            tagSetCache[firstOctet] = tagSet
                    else:
                        tagSet = tag.TagSet((), lastTag)

                else:
                    tagSet = lastTag + tagSet

                state = stDecodeLength

                if LOG:
                    LOG('tag decoded into %s, decoding length' % tagSet)

            if state is stDecodeLength:
                # Decode length
                for firstOctet in readFromStream(substrate, 1, options):
                    if isinstance(firstOctet, SubstrateUnderrunError):
                        yield firstOctet

                firstOctet = ord(firstOctet)

                if firstOctet < 128:
                    length = firstOctet

                elif firstOctet > 128:
                    size = firstOctet & 0x7F
                    # encoded in size bytes
                    for encodedLength in readFromStream(substrate, size, options):
                        if isinstance(encodedLength, SubstrateUnderrunError):
                            yield encodedLength
                    encodedLength = list(encodedLength)
                    # missing check on maximum size, which shouldn't be a
                    # problem, we can handle more than is possible
                    if len(encodedLength) != size:
                        raise error.SubstrateUnderrunError(
                            '%s<%s at %s' % (size, len(encodedLength), tagSet)
                        )

                    length = 0
                    for lengthOctet in encodedLength:
                        length <<= 8
                        length |= oct2int(lengthOctet)

                else:  # 128 means indefinite
                    length = -1

                if length == -1 and not self.supportIndefLength:
                    raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')

                state = stGetValueDecoder

                if LOG:
                    LOG('value length decoded into %d' % length)

            if state is stGetValueDecoder:
                if asn1Spec is None:
                    state = stGetValueDecoderByTag

                else:
                    state = stGetValueDecoderByAsn1Spec
            #
            # There're two ways of creating subtypes in ASN.1 what influences
            # decoder operation. These methods are:
            # 1) Either base types used in or no IMPLICIT tagging has been
            #    applied on subtyping.
            # 2) Subtype syntax drops base type information (by means of
            #    IMPLICIT tagging.
            # The first case allows for complete tag recovery from substrate
            # while the second one requires original ASN.1 type spec for
            # decoding.
            #
            # In either case a set of tags (tagSet) is coming from substrate
            # in an incremental, tag-by-tag fashion (this is the case of
            # EXPLICIT tag which is most basic). Outermost tag comes first
            # from the wire.
            #
            if state is stGetValueDecoderByTag:
                try:
                    concreteDecoder = tagMap[tagSet]

                except KeyError:
                    concreteDecoder = None

                if concreteDecoder:
                    state = stDecodeValue

                else:
                    # Retry with a one-tag slice of the tag set
                    try:
                        concreteDecoder = tagMap[tagSet[:1]]

                    except KeyError:
                        concreteDecoder = None

                    if concreteDecoder:
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)

            if state is stGetValueDecoderByAsn1Spec:

                if asn1Spec.__class__ is tagmap.TagMap:
                    try:
                        chosenSpec = asn1Spec[tagSet]

                    except KeyError:
                        chosenSpec = None

                    if LOG:
                        LOG('candidate ASN.1 spec is a map of:')

                        for candidateTagSet, v in asn1Spec.presentTypes.items():
                            LOG(' %s -> %s' % (candidateTagSet, v.__class__.__name__))

                        if asn1Spec.skipTypes:
                            LOG('but neither of: ')
                            for candidateTagSet, v in asn1Spec.skipTypes.items():
                                LOG(' %s -> %s' % (candidateTagSet, v.__class__.__name__))
                        LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))

                elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
                    chosenSpec = asn1Spec
                    if LOG:
                        LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)

                else:
                    chosenSpec = None

                if chosenSpec is not None:
                    try:
                        # ambiguous type or just faster codec lookup
                        concreteDecoder = typeMap[chosenSpec.typeId]

                        if LOG:
                            LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))

                    except KeyError:
                        # use base type for codec lookup to recover untagged types
                        baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
                        try:
                            # base type or tagged subtype
                            concreteDecoder = tagMap[baseTagSet]

                            if LOG:
                                LOG('value decoder chosen by base %s' % (baseTagSet,))

                        except KeyError:
                            concreteDecoder = None

                    if concreteDecoder:
                        asn1Spec = chosenSpec
                        state = stDecodeValue

                    else:
                        state = stTryAsExplicitTag

                else:
                    concreteDecoder = None
                    state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)

            if state is stDecodeValue:
                if not options.get('recursiveFlag', True) and not substrateFun:  # deprecate this
                    # Payload decoders call substrateFun with four
                    # arguments and iterate over the result, so the hook
                    # has to be a generator with that signature.
                    substrateFun = self._nonRecursiveSubstrateFun

                original_position = substrate.tell()

                if length == -1:  # indef length
                    for value in concreteDecoder.indefLenValueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                else:
                    for value in concreteDecoder.valueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                    bytesRead = substrate.tell() - original_position

                    if substrateFun:
                        # A substrate callback may leave (part of) the
                        # payload unread, but must never run past it.
                        if bytesRead > length:
                            raise PyAsn1Error(
                                "Read %s bytes are more than expected %s." % (bytesRead, length))

                    elif bytesRead != length:
                        raise PyAsn1Error(
                            "Read %s bytes instead of expected %s." % (bytesRead, length))

                if LOG:
                    LOG('codec %s yields type %s, value:\n%s\n...' % (
                        concreteDecoder.__class__.__name__, value.__class__.__name__,
                        isinstance(value, base.Asn1Item) and value.prettyPrint() or value))

                state = stStop
                break

            if state is stTryAsExplicitTag:
                if (tagSet and
                        tagSet[0].tagFormat == tag.tagFormatConstructed and
                        tagSet[0].tagClass != tag.tagClassUniversal):
                    # Assume explicit tagging
                    # (rawPayloadDecoder is a module-level object defined
                    # outside this class)
                    concreteDecoder = rawPayloadDecoder
                    state = stDecodeValue

                else:
                    concreteDecoder = None
                    state = self.defaultErrorState

                if LOG:
                    LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))

            if state is stDumpRawValue:
                concreteDecoder = self.defaultRawDecoder

                if LOG:
                    LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)

                state = stDecodeValue

            if state is stErrorCondition:
                raise error.PyAsn1Error(
                    '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
                )

        if LOG:
            debug.scope.pop()
            LOG('decoder left scope %s, call completed' % debug.scope)

        yield value
class StreamingDecoder(object):
    r"""Iterate over ASN.1 objects recovered from a BER/CER/DER byte stream.

    Each iteration consumes whatever serialization is currently available
    in the `substrate` stream-like object and turns it into one or more,
    possibly nested, ASN.1 objects.

    Parameters
    ----------
    substrate: :py:class:`file`, :py:class:`io.BytesIO`
        BER/CER/DER serialization in form of a byte stream

    Keyword Args
    ------------
    asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
        A pyasn1 type object to act as a template guiding the decoder.
        Depending on the ASN.1 structure being decoded, `asn1Spec` may
        or may not be required. One of the reasons why `asn1Spec` may
        be required is that the ASN.1 structure is encoded in the
        *IMPLICIT* tagging mode.

    Yields
    ------
    : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
        Decoded ASN.1 object (possibly, nested) or
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
        insufficient BER/CER/DER serialization on input to fully recover
        ASN.1 objects from it.

        In the latter case the caller is advised to ensure some more data
        in the input stream, then call the iterator again. The decoder
        will resume the decoding process using the newly arrived data.

        The `context` property of
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object might hold
        a reference to the partially populated ASN.1 object being
        reconstructed.

    Raises
    ------
    ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
        `PyAsn1Error` on deserialization error, `EndOfStreamError` on
        premature stream closure.

    Examples
    --------
    Decode BER serialisation without ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> for asn1Object in StreamingDecoder(stream):
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3

    Decode BER serialisation with ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> schema = SequenceOf(componentType=Integer())
        >>>
        >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
        >>> for asn1Object in decoder:
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3
    """

    SINGLE_ITEM_DECODER = SingleItemDecoder

    def __init__(self, substrate, asn1Spec=None, **options):
        """Wrap `substrate` into a seekable stream and build the item decoder.

        `options` are used both to construct the single-item decoder and
        on every decoding call.
        """
        self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
        self._substrate = asSeekableStream(substrate)
        self._asn1Spec = asn1Spec
        self._options = options

    def __iter__(self):
        """Yield everything the item decoder produces until end of stream."""
        while True:
            for decoded in self._singleItemDecoder(
                    self._substrate, self._asn1Spec, **self._options):
                yield decoded

            # Only the first answer from isEndOfStream is looked at.
            for atEnd in isEndOfStream(self._substrate):
                if isinstance(atEnd, SubstrateUnderrunError):
                    # NOTE: a bare yield -- the caller receives None here
                    yield

                break

            if atEnd:
                break
class Decoder(object):
    """Create a BER decoder object.

    Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object.
    """
    STREAMING_DECODER = StreamingDecoder

    @classmethod
    def __call__(cls, substrate, asn1Spec=None, **options):
        r"""Turn a BER/CER/DER octet stream into an ASN.1 object.

        Takes BER/CER/DER octet-stream in form of :py:class:`bytes`
        (Python 3) or :py:class:`str` (Python 2) and decodes it into an
        ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item`
        derivative) which may be a scalar or an arbitrary nested structure.

        Parameters
        ----------
        substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
            BER/CER/DER octet-stream to parse

        Keyword Args
        ------------
        asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
            A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
            derivative) to act as a template guiding the decoder.
            Depending on the ASN.1 structure being decoded, `asn1Spec` may
            or may not be required. The most common reason for it to be
            required is that the ASN.1 structure is encoded in *IMPLICIT*
            tagging mode.

        Returns
        -------
        : :py:class:`tuple`
            A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object
            recovered from BER/CER/DER substrate and the unprocessed
            trailing portion of the `substrate` (may be empty)

        Raises
        ------
        : :py:class:`~pyasn1.error.PyAsn1Error`
            :py:class:`~pyasn1.error.SubstrateUnderrunError` on
            insufficient input or :py:class:`~pyasn1.error.PyAsn1Error`
            on decoding error.

        Examples
        --------
        Decode BER/CER/DER serialisation without ASN.1 schema

        .. code-block:: pycon

           >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
           >>> str(s)
           SequenceOf:
            1 2 3

        Decode BER/CER/DER serialisation with ASN.1 schema

        .. code-block:: pycon

           >>> seq = SequenceOf(componentType=Integer())
           >>> s, unprocessed = decode(
               b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
           >>> str(s)
           SequenceOf:
            1 2 3

        """
        stream = asSeekableStream(substrate)

        for asn1Object in cls.STREAMING_DECODER(stream, asn1Spec, **options):
            # The whole input is already at hand, so an underrun means
            # the serialization is truncated.
            if isinstance(asn1Object, SubstrateUnderrunError):
                raise error.SubstrateUnderrunError('Short substrate on input')

            # Hand back the first decoded object along with whatever is
            # left unread in the stream.
            try:
                remainder = next(readFromStream(stream))

            except error.EndOfStreamError:
                remainder = null

            return asn1Object, remainder
#: Turns BER octet stream into an ASN.1 object.
#:
#: Takes BER octet-stream and decodes it into an ASN.1 object
#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
#: may be a scalar or an arbitrary nested structure.
#:
#: Parameters
#: ----------
#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     BER octet-stream
#:
#: Keyword Args
#: ------------
#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:     A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
#:     being decoded, *asn1Spec* may or may not be required. The most common reason for
#:     it to be required is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.
#:
#: Returns
#: -------
#: : :py:class:`tuple`
#:     A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     and the unprocessed trailing portion of the *substrate* (may be empty)
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
#:     On decoding errors
#:
#: Notes
#: -----
#: This function is deprecated. Please use a :py:class:`Decoder` or
#: :py:class:`StreamingDecoder` class instance instead.
#:
#: Examples
#: --------
#: Decode BER serialisation without ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
#: Decode BER serialisation with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
decode = Decoder()