1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import io
8import os
9import sys
10
11
12from pyasn1 import debug
13from pyasn1 import error
14from pyasn1.codec.ber import eoo
15from pyasn1.codec.streaming import asSeekableStream
16from pyasn1.codec.streaming import isEndOfStream
17from pyasn1.codec.streaming import peekIntoStream
18from pyasn1.codec.streaming import readFromStream
19from pyasn1.compat import _MISSING
20from pyasn1.compat.integer import from_bytes
21from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null
22from pyasn1.error import PyAsn1Error
23from pyasn1.type import base
24from pyasn1.type import char
25from pyasn1.type import tag
26from pyasn1.type import tagmap
27from pyasn1.type import univ
28from pyasn1.type import useful
29
30__all__ = ['StreamingDecoder', 'Decoder', 'decode']
31
32LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)
33
34noValue = base.noValue
35
36SubstrateUnderrunError = error.SubstrateUnderrunError
37
38
39class AbstractPayloadDecoder(object):
40 protoComponent = None
41
42 def valueDecoder(self, substrate, asn1Spec,
43 tagSet=None, length=None, state=None,
44 decodeFun=None, substrateFun=None,
45 **options):
46 """Decode value with fixed byte length.
47
48 The decoder is allowed to consume as many bytes as necessary.
49 """
50 raise error.PyAsn1Error('SingleItemDecoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError?
51
52 def indefLenValueDecoder(self, substrate, asn1Spec,
53 tagSet=None, length=None, state=None,
54 decodeFun=None, substrateFun=None,
55 **options):
56 """Decode value with undefined length.
57
58 The decoder is allowed to consume as many bytes as necessary.
59 """
60 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError?
61
62 @staticmethod
63 def _passAsn1Object(asn1Object, options):
64 if 'asn1Object' not in options:
65 options['asn1Object'] = asn1Object
66
67 return options
68
69
70class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
71 @staticmethod
72 def substrateCollector(asn1Object, substrate, length, options):
73 for chunk in readFromStream(substrate, length, options):
74 yield chunk
75
76 def _createComponent(self, asn1Spec, tagSet, value, **options):
77 if options.get('native'):
78 return value
79 elif asn1Spec is None:
80 return self.protoComponent.clone(value, tagSet=tagSet)
81 elif value is noValue:
82 return asn1Spec
83 else:
84 return asn1Spec.clone(value)
85
86
87class RawPayloadDecoder(AbstractSimplePayloadDecoder):
88 protoComponent = univ.Any('')
89
90 def valueDecoder(self, substrate, asn1Spec,
91 tagSet=None, length=None, state=None,
92 decodeFun=None, substrateFun=None,
93 **options):
94 if substrateFun:
95 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
96
97 for chunk in substrateFun(asn1Object, substrate, length, options):
98 yield chunk
99
100 return
101
102 for value in decodeFun(substrate, asn1Spec, tagSet, length, **options):
103 yield value
104
105 def indefLenValueDecoder(self, substrate, asn1Spec,
106 tagSet=None, length=None, state=None,
107 decodeFun=None, substrateFun=None,
108 **options):
109 if substrateFun:
110 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
111
112 for chunk in substrateFun(asn1Object, substrate, length, options):
113 yield chunk
114
115 return
116
117 while True:
118 for value in decodeFun(
119 substrate, asn1Spec, tagSet, length,
120 allowEoo=True, **options):
121
122 if value is eoo.endOfOctets:
123 return
124
125 yield value
126
127
128rawPayloadDecoder = RawPayloadDecoder()
129
130
131class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
132 protoComponent = univ.Integer(0)
133
134 def valueDecoder(self, substrate, asn1Spec,
135 tagSet=None, length=None, state=None,
136 decodeFun=None, substrateFun=None,
137 **options):
138
139 if tagSet[0].tagFormat != tag.tagFormatSimple:
140 raise error.PyAsn1Error('Simple tag format expected')
141
142 for chunk in readFromStream(substrate, length, options):
143 if isinstance(chunk, SubstrateUnderrunError):
144 yield chunk
145
146 if chunk:
147 value = from_bytes(chunk, signed=True)
148
149 else:
150 value = 0
151
152 yield self._createComponent(asn1Spec, tagSet, value, **options)
153
154
155class BooleanPayloadDecoder(IntegerPayloadDecoder):
156 protoComponent = univ.Boolean(0)
157
158 def _createComponent(self, asn1Spec, tagSet, value, **options):
159 return IntegerPayloadDecoder._createComponent(
160 self, asn1Spec, tagSet, value and 1 or 0, **options)
161
162
163class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
164 protoComponent = univ.BitString(())
165 supportConstructedForm = True
166
167 def valueDecoder(self, substrate, asn1Spec,
168 tagSet=None, length=None, state=None,
169 decodeFun=None, substrateFun=None,
170 **options):
171
172 if substrateFun:
173 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
174
175 for chunk in substrateFun(asn1Object, substrate, length, options):
176 yield chunk
177
178 return
179
180 if not length:
181 raise error.PyAsn1Error('Empty BIT STRING substrate')
182
183 for chunk in isEndOfStream(substrate):
184 if isinstance(chunk, SubstrateUnderrunError):
185 yield chunk
186
187 if chunk:
188 raise error.PyAsn1Error('Empty BIT STRING substrate')
189
190 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check?
191
192 for trailingBits in readFromStream(substrate, 1, options):
193 if isinstance(trailingBits, SubstrateUnderrunError):
194 yield trailingBits
195
196 trailingBits = ord(trailingBits)
197 if trailingBits > 7:
198 raise error.PyAsn1Error(
199 'Trailing bits overflow %s' % trailingBits
200 )
201
202 for chunk in readFromStream(substrate, length - 1, options):
203 if isinstance(chunk, SubstrateUnderrunError):
204 yield chunk
205
206 value = self.protoComponent.fromOctetString(
207 chunk, internalFormat=True, padding=trailingBits)
208
209 yield self._createComponent(asn1Spec, tagSet, value, **options)
210
211 return
212
213 if not self.supportConstructedForm:
214 raise error.PyAsn1Error('Constructed encoding form prohibited '
215 'at %s' % self.__class__.__name__)
216
217 if LOG:
218 LOG('assembling constructed serialization')
219
220 # All inner fragments are of the same type, treat them as octet string
221 substrateFun = self.substrateCollector
222
223 bitString = self.protoComponent.fromOctetString(null, internalFormat=True)
224
225 current_position = substrate.tell()
226
227 while substrate.tell() - current_position < length:
228 for component in decodeFun(
229 substrate, self.protoComponent, substrateFun=substrateFun,
230 **options):
231 if isinstance(component, SubstrateUnderrunError):
232 yield component
233
234 trailingBits = oct2int(component[0])
235 if trailingBits > 7:
236 raise error.PyAsn1Error(
237 'Trailing bits overflow %s' % trailingBits
238 )
239
240 bitString = self.protoComponent.fromOctetString(
241 component[1:], internalFormat=True,
242 prepend=bitString, padding=trailingBits
243 )
244
245 yield self._createComponent(asn1Spec, tagSet, bitString, **options)
246
247 def indefLenValueDecoder(self, substrate, asn1Spec,
248 tagSet=None, length=None, state=None,
249 decodeFun=None, substrateFun=None,
250 **options):
251
252 if substrateFun:
253 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
254
255 for chunk in substrateFun(asn1Object, substrate, length, options):
256 yield chunk
257
258 return
259
260 # All inner fragments are of the same type, treat them as octet string
261 substrateFun = self.substrateCollector
262
263 bitString = self.protoComponent.fromOctetString(null, internalFormat=True)
264
265 while True: # loop over fragments
266
267 for component in decodeFun(
268 substrate, self.protoComponent, substrateFun=substrateFun,
269 allowEoo=True, **options):
270
271 if component is eoo.endOfOctets:
272 break
273
274 if isinstance(component, SubstrateUnderrunError):
275 yield component
276
277 if component is eoo.endOfOctets:
278 break
279
280 trailingBits = oct2int(component[0])
281 if trailingBits > 7:
282 raise error.PyAsn1Error(
283 'Trailing bits overflow %s' % trailingBits
284 )
285
286 bitString = self.protoComponent.fromOctetString(
287 component[1:], internalFormat=True,
288 prepend=bitString, padding=trailingBits
289 )
290
291 yield self._createComponent(asn1Spec, tagSet, bitString, **options)
292
293
294class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
295 protoComponent = univ.OctetString('')
296 supportConstructedForm = True
297
298 def valueDecoder(self, substrate, asn1Spec,
299 tagSet=None, length=None, state=None,
300 decodeFun=None, substrateFun=None,
301 **options):
302 if substrateFun:
303 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
304
305 for chunk in substrateFun(asn1Object, substrate, length, options):
306 yield chunk
307
308 return
309
310 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check?
311 for chunk in readFromStream(substrate, length, options):
312 if isinstance(chunk, SubstrateUnderrunError):
313 yield chunk
314
315 yield self._createComponent(asn1Spec, tagSet, chunk, **options)
316
317 return
318
319 if not self.supportConstructedForm:
320 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)
321
322 if LOG:
323 LOG('assembling constructed serialization')
324
325 # All inner fragments are of the same type, treat them as octet string
326 substrateFun = self.substrateCollector
327
328 header = null
329
330 original_position = substrate.tell()
331 # head = popSubstream(substrate, length)
332 while substrate.tell() - original_position < length:
333 for component in decodeFun(
334 substrate, self.protoComponent, substrateFun=substrateFun,
335 **options):
336 if isinstance(component, SubstrateUnderrunError):
337 yield component
338
339 header += component
340
341 yield self._createComponent(asn1Spec, tagSet, header, **options)
342
343 def indefLenValueDecoder(self, substrate, asn1Spec,
344 tagSet=None, length=None, state=None,
345 decodeFun=None, substrateFun=None,
346 **options):
347 if substrateFun and substrateFun is not self.substrateCollector:
348 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
349
350 for chunk in substrateFun(asn1Object, substrate, length, options):
351 yield chunk
352
353 return
354
355 # All inner fragments are of the same type, treat them as octet string
356 substrateFun = self.substrateCollector
357
358 header = null
359
360 while True: # loop over fragments
361
362 for component in decodeFun(
363 substrate, self.protoComponent, substrateFun=substrateFun,
364 allowEoo=True, **options):
365
366 if isinstance(component, SubstrateUnderrunError):
367 yield component
368
369 if component is eoo.endOfOctets:
370 break
371
372 if component is eoo.endOfOctets:
373 break
374
375 header += component
376
377 yield self._createComponent(asn1Spec, tagSet, header, **options)
378
379
380class NullPayloadDecoder(AbstractSimplePayloadDecoder):
381 protoComponent = univ.Null('')
382
383 def valueDecoder(self, substrate, asn1Spec,
384 tagSet=None, length=None, state=None,
385 decodeFun=None, substrateFun=None,
386 **options):
387
388 if tagSet[0].tagFormat != tag.tagFormatSimple:
389 raise error.PyAsn1Error('Simple tag format expected')
390
391 for chunk in readFromStream(substrate, length, options):
392 if isinstance(chunk, SubstrateUnderrunError):
393 yield chunk
394
395 component = self._createComponent(asn1Spec, tagSet, '', **options)
396
397 if chunk:
398 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)
399
400 yield component
401
402
403class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
404 protoComponent = univ.ObjectIdentifier(())
405
406 def valueDecoder(self, substrate, asn1Spec,
407 tagSet=None, length=None, state=None,
408 decodeFun=None, substrateFun=None,
409 **options):
410 if tagSet[0].tagFormat != tag.tagFormatSimple:
411 raise error.PyAsn1Error('Simple tag format expected')
412
413 for chunk in readFromStream(substrate, length, options):
414 if isinstance(chunk, SubstrateUnderrunError):
415 yield chunk
416
417 if not chunk:
418 raise error.PyAsn1Error('Empty substrate')
419
420 chunk = octs2ints(chunk)
421
422 oid = ()
423 index = 0
424 substrateLen = len(chunk)
425 while index < substrateLen:
426 subId = chunk[index]
427 index += 1
428 if subId < 128:
429 oid += (subId,)
430 elif subId > 128:
431 # Construct subid from a number of octets
432 nextSubId = subId
433 subId = 0
434 while nextSubId >= 128:
435 subId = (subId << 7) + (nextSubId & 0x7F)
436 if index >= substrateLen:
437 raise error.SubstrateUnderrunError(
438 'Short substrate for sub-OID past %s' % (oid,)
439 )
440 nextSubId = chunk[index]
441 index += 1
442 oid += ((subId << 7) + nextSubId,)
443 elif subId == 128:
444 # ASN.1 spec forbids leading zeros (0x80) in OID
445 # encoding, tolerating it opens a vulnerability. See
446 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
447 # page 7
448 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')
449
450 # Decode two leading arcs
451 if 0 <= oid[0] <= 39:
452 oid = (0,) + oid
453 elif 40 <= oid[0] <= 79:
454 oid = (1, oid[0] - 40) + oid[1:]
455 elif oid[0] >= 80:
456 oid = (2, oid[0] - 80) + oid[1:]
457 else:
458 raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0])
459
460 yield self._createComponent(asn1Spec, tagSet, oid, **options)
461
462
463class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
464 protoComponent = univ.RelativeOID(())
465
466 def valueDecoder(self, substrate, asn1Spec,
467 tagSet=None, length=None, state=None,
468 decodeFun=None, substrateFun=None,
469 **options):
470 if tagSet[0].tagFormat != tag.tagFormatSimple:
471 raise error.PyAsn1Error('Simple tag format expected')
472
473 for chunk in readFromStream(substrate, length, options):
474 if isinstance(chunk, SubstrateUnderrunError):
475 yield chunk
476
477 if not chunk:
478 raise error.PyAsn1Error('Empty substrate')
479
480 chunk = octs2ints(chunk)
481
482 reloid = ()
483 index = 0
484 substrateLen = len(chunk)
485 while index < substrateLen:
486 subId = chunk[index]
487 index += 1
488 if subId < 128:
489 reloid += (subId,)
490 elif subId > 128:
491 # Construct subid from a number of octets
492 nextSubId = subId
493 subId = 0
494 while nextSubId >= 128:
495 subId = (subId << 7) + (nextSubId & 0x7F)
496 if index >= substrateLen:
497 raise error.SubstrateUnderrunError(
498 'Short substrate for sub-OID past %s' % (reloid,)
499 )
500 nextSubId = chunk[index]
501 index += 1
502 reloid += ((subId << 7) + nextSubId,)
503 elif subId == 128:
504 # ASN.1 spec forbids leading zeros (0x80) in OID
505 # encoding, tolerating it opens a vulnerability. See
506 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
507 # page 7
508 raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')
509
510 yield self._createComponent(asn1Spec, tagSet, reloid, **options)
511
512
513class RealPayloadDecoder(AbstractSimplePayloadDecoder):
514 protoComponent = univ.Real()
515
516 def valueDecoder(self, substrate, asn1Spec,
517 tagSet=None, length=None, state=None,
518 decodeFun=None, substrateFun=None,
519 **options):
520 if tagSet[0].tagFormat != tag.tagFormatSimple:
521 raise error.PyAsn1Error('Simple tag format expected')
522
523 for chunk in readFromStream(substrate, length, options):
524 if isinstance(chunk, SubstrateUnderrunError):
525 yield chunk
526
527 if not chunk:
528 yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
529 return
530
531 fo = oct2int(chunk[0])
532 chunk = chunk[1:]
533 if fo & 0x80: # binary encoding
534 if not chunk:
535 raise error.PyAsn1Error("Incomplete floating-point value")
536
537 if LOG:
538 LOG('decoding binary encoded REAL')
539
540 n = (fo & 0x03) + 1
541
542 if n == 4:
543 n = oct2int(chunk[0])
544 chunk = chunk[1:]
545
546 eo, chunk = chunk[:n], chunk[n:]
547
548 if not eo or not chunk:
549 raise error.PyAsn1Error('Real exponent screwed')
550
551 e = oct2int(eo[0]) & 0x80 and -1 or 0
552
553 while eo: # exponent
554 e <<= 8
555 e |= oct2int(eo[0])
556 eo = eo[1:]
557
558 b = fo >> 4 & 0x03 # base bits
559
560 if b > 2:
561 raise error.PyAsn1Error('Illegal Real base')
562
563 if b == 1: # encbase = 8
564 e *= 3
565
566 elif b == 2: # encbase = 16
567 e *= 4
568 p = 0
569
570 while chunk: # value
571 p <<= 8
572 p |= oct2int(chunk[0])
573 chunk = chunk[1:]
574
575 if fo & 0x40: # sign bit
576 p = -p
577
578 sf = fo >> 2 & 0x03 # scale bits
579 p *= 2 ** sf
580 value = (p, 2, e)
581
582 elif fo & 0x40: # infinite value
583 if LOG:
584 LOG('decoding infinite REAL')
585
586 value = fo & 0x01 and '-inf' or 'inf'
587
588 elif fo & 0xc0 == 0: # character encoding
589 if not chunk:
590 raise error.PyAsn1Error("Incomplete floating-point value")
591
592 if LOG:
593 LOG('decoding character encoded REAL')
594
595 try:
596 if fo & 0x3 == 0x1: # NR1
597 value = (int(chunk), 10, 0)
598
599 elif fo & 0x3 == 0x2: # NR2
600 value = float(chunk)
601
602 elif fo & 0x3 == 0x3: # NR3
603 value = float(chunk)
604
605 else:
606 raise error.SubstrateUnderrunError(
607 'Unknown NR (tag %s)' % fo
608 )
609
610 except ValueError:
611 raise error.SubstrateUnderrunError(
612 'Bad character Real syntax'
613 )
614
615 else:
616 raise error.SubstrateUnderrunError(
617 'Unknown encoding (tag %s)' % fo
618 )
619
620 yield self._createComponent(asn1Spec, tagSet, value, **options)
621
622
623class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
624 protoComponent = None
625
626
627class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder):
628 protoRecordComponent = None
629 protoSequenceComponent = None
630
631 def _getComponentTagMap(self, asn1Object, idx):
632 raise NotImplementedError()
633
634 def _getComponentPositionByType(self, asn1Object, tagSet, idx):
635 raise NotImplementedError()
636
637 def _decodeComponentsSchemaless(
638 self, substrate, tagSet=None, decodeFun=None,
639 length=None, **options):
640
641 asn1Object = None
642
643 components = []
644 componentTypes = set()
645
646 original_position = substrate.tell()
647
648 while length == -1 or substrate.tell() < original_position + length:
649 for component in decodeFun(substrate, **options):
650 if isinstance(component, SubstrateUnderrunError):
651 yield component
652
653 if length == -1 and component is eoo.endOfOctets:
654 break
655
656 components.append(component)
657 componentTypes.add(component.tagSet)
658
659 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF
660 # The heuristics is:
661 # * 1+ components of different types -> likely SEQUENCE/SET
662 # * otherwise -> likely SEQUENCE OF/SET OF
663 if len(componentTypes) > 1:
664 protoComponent = self.protoRecordComponent
665
666 else:
667 protoComponent = self.protoSequenceComponent
668
669 asn1Object = protoComponent.clone(
670 # construct tagSet from base tag from prototype ASN.1 object
671 # and additional tags recovered from the substrate
672 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
673 )
674
675 if LOG:
676 LOG('guessed %r container type (pass `asn1Spec` to guide the '
677 'decoder)' % asn1Object)
678
679 for idx, component in enumerate(components):
680 asn1Object.setComponentByPosition(
681 idx, component,
682 verifyConstraints=False,
683 matchTags=False, matchConstraints=False
684 )
685
686 yield asn1Object
687
688 def valueDecoder(self, substrate, asn1Spec,
689 tagSet=None, length=None, state=None,
690 decodeFun=None, substrateFun=None,
691 **options):
692 if tagSet[0].tagFormat != tag.tagFormatConstructed:
693 raise error.PyAsn1Error('Constructed tag format expected')
694
695 original_position = substrate.tell()
696
697 if substrateFun:
698 if asn1Spec is not None:
699 asn1Object = asn1Spec.clone()
700
701 elif self.protoComponent is not None:
702 asn1Object = self.protoComponent.clone(tagSet=tagSet)
703
704 else:
705 asn1Object = self.protoRecordComponent, self.protoSequenceComponent
706
707 for chunk in substrateFun(asn1Object, substrate, length, options):
708 yield chunk
709
710 return
711
712 if asn1Spec is None:
713 for asn1Object in self._decodeComponentsSchemaless(
714 substrate, tagSet=tagSet, decodeFun=decodeFun,
715 length=length, **options):
716 if isinstance(asn1Object, SubstrateUnderrunError):
717 yield asn1Object
718
719 if substrate.tell() < original_position + length:
720 if LOG:
721 for trailing in readFromStream(substrate, context=options):
722 if isinstance(trailing, SubstrateUnderrunError):
723 yield trailing
724
725 LOG('Unused trailing %d octets encountered: %s' % (
726 len(trailing), debug.hexdump(trailing)))
727
728 yield asn1Object
729
730 return
731
732 asn1Object = asn1Spec.clone()
733 asn1Object.clear()
734
735 options = self._passAsn1Object(asn1Object, options)
736
737 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):
738
739 namedTypes = asn1Spec.componentType
740
741 isSetType = asn1Spec.typeId == univ.Set.typeId
742 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault
743
744 if LOG:
745 LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
746 not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
747 asn1Spec))
748
749 seenIndices = set()
750 idx = 0
751 while substrate.tell() - original_position < length:
752 if not namedTypes:
753 componentType = None
754
755 elif isSetType:
756 componentType = namedTypes.tagMapUnique
757
758 else:
759 try:
760 if isDeterministic:
761 componentType = namedTypes[idx].asn1Object
762
763 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
764 componentType = namedTypes.getTagMapNearPosition(idx)
765
766 else:
767 componentType = namedTypes[idx].asn1Object
768
769 except IndexError:
770 raise error.PyAsn1Error(
771 'Excessive components decoded at %r' % (asn1Spec,)
772 )
773
774 for component in decodeFun(substrate, componentType, **options):
775 if isinstance(component, SubstrateUnderrunError):
776 yield component
777
778 if not isDeterministic and namedTypes:
779 if isSetType:
780 idx = namedTypes.getPositionByType(component.effectiveTagSet)
781
782 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
783 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)
784
785 asn1Object.setComponentByPosition(
786 idx, component,
787 verifyConstraints=False,
788 matchTags=False, matchConstraints=False
789 )
790
791 seenIndices.add(idx)
792 idx += 1
793
794 if LOG:
795 LOG('seen component indices %s' % seenIndices)
796
797 if namedTypes:
798 if not namedTypes.requiredComponents.issubset(seenIndices):
799 raise error.PyAsn1Error(
800 'ASN.1 object %s has uninitialized '
801 'components' % asn1Object.__class__.__name__)
802
803 if namedTypes.hasOpenTypes:
804
805 openTypes = options.get('openTypes', {})
806
807 if LOG:
808 LOG('user-specified open types map:')
809
810 for k, v in openTypes.items():
811 LOG('%s -> %r' % (k, v))
812
813 if openTypes or options.get('decodeOpenTypes', False):
814
815 for idx, namedType in enumerate(namedTypes.namedTypes):
816 if not namedType.openType:
817 continue
818
819 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
820 continue
821
822 governingValue = asn1Object.getComponentByName(
823 namedType.openType.name
824 )
825
826 try:
827 openType = openTypes[governingValue]
828
829 except KeyError:
830
831 if LOG:
832 LOG('default open types map of component '
833 '"%s.%s" governed by component "%s.%s"'
834 ':' % (asn1Object.__class__.__name__,
835 namedType.name,
836 asn1Object.__class__.__name__,
837 namedType.openType.name))
838
839 for k, v in namedType.openType.items():
840 LOG('%s -> %r' % (k, v))
841
842 try:
843 openType = namedType.openType[governingValue]
844
845 except KeyError:
846 if LOG:
847 LOG('failed to resolve open type by governing '
848 'value %r' % (governingValue,))
849 continue
850
851 if LOG:
852 LOG('resolved open type %r by governing '
853 'value %r' % (openType, governingValue))
854
855 containerValue = asn1Object.getComponentByPosition(idx)
856
857 if containerValue.typeId in (
858 univ.SetOf.typeId, univ.SequenceOf.typeId):
859
860 for pos, containerElement in enumerate(
861 containerValue):
862
863 stream = asSeekableStream(containerValue[pos].asOctets())
864
865 for component in decodeFun(stream, asn1Spec=openType, **options):
866 if isinstance(component, SubstrateUnderrunError):
867 yield component
868
869 containerValue[pos] = component
870
871 else:
872 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
873
874 for component in decodeFun(stream, asn1Spec=openType, **options):
875 if isinstance(component, SubstrateUnderrunError):
876 yield component
877
878 asn1Object.setComponentByPosition(idx, component)
879
880 else:
881 inconsistency = asn1Object.isInconsistent
882 if inconsistency:
883 raise inconsistency
884
885 else:
886 componentType = asn1Spec.componentType
887
888 if LOG:
889 LOG('decoding type %r chosen by given `asn1Spec`' % componentType)
890
891 idx = 0
892
893 while substrate.tell() - original_position < length:
894 for component in decodeFun(substrate, componentType, **options):
895 if isinstance(component, SubstrateUnderrunError):
896 yield component
897
898 asn1Object.setComponentByPosition(
899 idx, component,
900 verifyConstraints=False,
901 matchTags=False, matchConstraints=False
902 )
903
904 idx += 1
905
906 yield asn1Object
907
908 def indefLenValueDecoder(self, substrate, asn1Spec,
909 tagSet=None, length=None, state=None,
910 decodeFun=None, substrateFun=None,
911 **options):
912 if tagSet[0].tagFormat != tag.tagFormatConstructed:
913 raise error.PyAsn1Error('Constructed tag format expected')
914
915 if substrateFun is not None:
916 if asn1Spec is not None:
917 asn1Object = asn1Spec.clone()
918
919 elif self.protoComponent is not None:
920 asn1Object = self.protoComponent.clone(tagSet=tagSet)
921
922 else:
923 asn1Object = self.protoRecordComponent, self.protoSequenceComponent
924
925 for chunk in substrateFun(asn1Object, substrate, length, options):
926 yield chunk
927
928 return
929
930 if asn1Spec is None:
931 for asn1Object in self._decodeComponentsSchemaless(
932 substrate, tagSet=tagSet, decodeFun=decodeFun,
933 length=length, **dict(options, allowEoo=True)):
934 if isinstance(asn1Object, SubstrateUnderrunError):
935 yield asn1Object
936
937 yield asn1Object
938
939 return
940
941 asn1Object = asn1Spec.clone()
942 asn1Object.clear()
943
944 options = self._passAsn1Object(asn1Object, options)
945
946 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):
947
948 namedTypes = asn1Object.componentType
949
950 isSetType = asn1Object.typeId == univ.Set.typeId
951 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault
952
953 if LOG:
954 LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
955 not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
956 asn1Spec))
957
958 seenIndices = set()
959
960 idx = 0
961
962 while True: # loop over components
963 if len(namedTypes) <= idx:
964 asn1Spec = None
965
966 elif isSetType:
967 asn1Spec = namedTypes.tagMapUnique
968
969 else:
970 try:
971 if isDeterministic:
972 asn1Spec = namedTypes[idx].asn1Object
973
974 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
975 asn1Spec = namedTypes.getTagMapNearPosition(idx)
976
977 else:
978 asn1Spec = namedTypes[idx].asn1Object
979
980 except IndexError:
981 raise error.PyAsn1Error(
982 'Excessive components decoded at %r' % (asn1Object,)
983 )
984
985 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options):
986
987 if isinstance(component, SubstrateUnderrunError):
988 yield component
989
990 if component is eoo.endOfOctets:
991 break
992
993 if component is eoo.endOfOctets:
994 break
995
996 if not isDeterministic and namedTypes:
997 if isSetType:
998 idx = namedTypes.getPositionByType(component.effectiveTagSet)
999
1000 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
1001 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)
1002
1003 asn1Object.setComponentByPosition(
1004 idx, component,
1005 verifyConstraints=False,
1006 matchTags=False, matchConstraints=False
1007 )
1008
1009 seenIndices.add(idx)
1010 idx += 1
1011
1012 if LOG:
1013 LOG('seen component indices %s' % seenIndices)
1014
1015 if namedTypes:
1016 if not namedTypes.requiredComponents.issubset(seenIndices):
1017 raise error.PyAsn1Error(
1018 'ASN.1 object %s has uninitialized '
1019 'components' % asn1Object.__class__.__name__)
1020
1021 if namedTypes.hasOpenTypes:
1022
1023 openTypes = options.get('openTypes', {})
1024
1025 if LOG:
1026 LOG('user-specified open types map:')
1027
1028 for k, v in openTypes.items():
1029 LOG('%s -> %r' % (k, v))
1030
1031 if openTypes or options.get('decodeOpenTypes', False):
1032
1033 for idx, namedType in enumerate(namedTypes.namedTypes):
1034 if not namedType.openType:
1035 continue
1036
1037 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
1038 continue
1039
1040 governingValue = asn1Object.getComponentByName(
1041 namedType.openType.name
1042 )
1043
1044 try:
1045 openType = openTypes[governingValue]
1046
1047 except KeyError:
1048
1049 if LOG:
1050 LOG('default open types map of component '
1051 '"%s.%s" governed by component "%s.%s"'
1052 ':' % (asn1Object.__class__.__name__,
1053 namedType.name,
1054 asn1Object.__class__.__name__,
1055 namedType.openType.name))
1056
1057 for k, v in namedType.openType.items():
1058 LOG('%s -> %r' % (k, v))
1059
1060 try:
1061 openType = namedType.openType[governingValue]
1062
1063 except KeyError:
1064 if LOG:
1065 LOG('failed to resolve open type by governing '
1066 'value %r' % (governingValue,))
1067 continue
1068
1069 if LOG:
1070 LOG('resolved open type %r by governing '
1071 'value %r' % (openType, governingValue))
1072
1073 containerValue = asn1Object.getComponentByPosition(idx)
1074
1075 if containerValue.typeId in (
1076 univ.SetOf.typeId, univ.SequenceOf.typeId):
1077
1078 for pos, containerElement in enumerate(
1079 containerValue):
1080
1081 stream = asSeekableStream(containerValue[pos].asOctets())
1082
1083 for component in decodeFun(stream, asn1Spec=openType,
1084 **dict(options, allowEoo=True)):
1085 if isinstance(component, SubstrateUnderrunError):
1086 yield component
1087
1088 if component is eoo.endOfOctets:
1089 break
1090
1091 containerValue[pos] = component
1092
1093 else:
1094 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
1095 for component in decodeFun(stream, asn1Spec=openType,
1096 **dict(options, allowEoo=True)):
1097 if isinstance(component, SubstrateUnderrunError):
1098 yield component
1099
1100 if component is eoo.endOfOctets:
1101 break
1102
1103 asn1Object.setComponentByPosition(idx, component)
1104
1105 else:
1106 inconsistency = asn1Object.isInconsistent
1107 if inconsistency:
1108 raise inconsistency
1109
1110 else:
1111 componentType = asn1Spec.componentType
1112
1113 if LOG:
1114 LOG('decoding type %r chosen by given `asn1Spec`' % componentType)
1115
1116 idx = 0
1117
1118 while True:
1119
1120 for component in decodeFun(
1121 substrate, componentType, allowEoo=True, **options):
1122
1123 if isinstance(component, SubstrateUnderrunError):
1124 yield component
1125
1126 if component is eoo.endOfOctets:
1127 break
1128
1129 if component is eoo.endOfOctets:
1130 break
1131
1132 asn1Object.setComponentByPosition(
1133 idx, component,
1134 verifyConstraints=False,
1135 matchTags=False, matchConstraints=False
1136 )
1137
1138 idx += 1
1139
1140 yield asn1Object
1141
1142
1143class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
1144 protoRecordComponent = univ.Sequence()
1145 protoSequenceComponent = univ.SequenceOf()
1146
1147
1148class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
1149 protoComponent = univ.Sequence()
1150
1151
1152class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
1153 protoComponent = univ.SequenceOf()
1154
1155
1156class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
1157 protoRecordComponent = univ.Set()
1158 protoSequenceComponent = univ.SetOf()
1159
1160
1161class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
1162 protoComponent = univ.Set()
1163
1164
1165class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
1166 protoComponent = univ.SetOf()
1167
1168
1169class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
1170 protoComponent = univ.Choice()
1171
1172 def valueDecoder(self, substrate, asn1Spec,
1173 tagSet=None, length=None, state=None,
1174 decodeFun=None, substrateFun=None,
1175 **options):
1176 if asn1Spec is None:
1177 asn1Object = self.protoComponent.clone(tagSet=tagSet)
1178
1179 else:
1180 asn1Object = asn1Spec.clone()
1181
1182 if substrateFun:
1183 for chunk in substrateFun(asn1Object, substrate, length, options):
1184 yield chunk
1185
1186 return
1187
1188 options = self._passAsn1Object(asn1Object, options)
1189
1190 if asn1Object.tagSet == tagSet:
1191 if LOG:
1192 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))
1193
1194 for component in decodeFun(
1195 substrate, asn1Object.componentTagMap, **options):
1196 if isinstance(component, SubstrateUnderrunError):
1197 yield component
1198
1199 else:
1200 if LOG:
1201 LOG('decoding %s as untagged CHOICE' % (tagSet,))
1202
1203 for component in decodeFun(
1204 substrate, asn1Object.componentTagMap, tagSet, length,
1205 state, **options):
1206 if isinstance(component, SubstrateUnderrunError):
1207 yield component
1208
1209 effectiveTagSet = component.effectiveTagSet
1210
1211 if LOG:
1212 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))
1213
1214 asn1Object.setComponentByType(
1215 effectiveTagSet, component,
1216 verifyConstraints=False,
1217 matchTags=False, matchConstraints=False,
1218 innerFlag=False
1219 )
1220
1221 yield asn1Object
1222
1223 def indefLenValueDecoder(self, substrate, asn1Spec,
1224 tagSet=None, length=None, state=None,
1225 decodeFun=None, substrateFun=None,
1226 **options):
1227 if asn1Spec is None:
1228 asn1Object = self.protoComponent.clone(tagSet=tagSet)
1229
1230 else:
1231 asn1Object = asn1Spec.clone()
1232
1233 if substrateFun:
1234 for chunk in substrateFun(asn1Object, substrate, length, options):
1235 yield chunk
1236
1237 return
1238
1239 options = self._passAsn1Object(asn1Object, options)
1240
1241 isTagged = asn1Object.tagSet == tagSet
1242
1243 if LOG:
1244 LOG('decoding %s as %stagged CHOICE' % (
1245 tagSet, isTagged and 'explicitly ' or 'un'))
1246
1247 while True:
1248
1249 if isTagged:
1250 iterator = decodeFun(
1251 substrate, asn1Object.componentType.tagMapUnique,
1252 **dict(options, allowEoo=True))
1253
1254 else:
1255 iterator = decodeFun(
1256 substrate, asn1Object.componentType.tagMapUnique,
1257 tagSet, length, state, **dict(options, allowEoo=True))
1258
1259 for component in iterator:
1260
1261 if isinstance(component, SubstrateUnderrunError):
1262 yield component
1263
1264 if component is eoo.endOfOctets:
1265 break
1266
1267 effectiveTagSet = component.effectiveTagSet
1268
1269 if LOG:
1270 LOG('decoded component %s, effective tag set '
1271 '%s' % (component, effectiveTagSet))
1272
1273 asn1Object.setComponentByType(
1274 effectiveTagSet, component,
1275 verifyConstraints=False,
1276 matchTags=False, matchConstraints=False,
1277 innerFlag=False
1278 )
1279
1280 if not isTagged:
1281 break
1282
1283 if not isTagged or component is eoo.endOfOctets:
1284 break
1285
1286 yield asn1Object
1287
1288
1289class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
1290 protoComponent = univ.Any()
1291
1292 def valueDecoder(self, substrate, asn1Spec,
1293 tagSet=None, length=None, state=None,
1294 decodeFun=None, substrateFun=None,
1295 **options):
1296 if asn1Spec is None:
1297 isUntagged = True
1298
1299 elif asn1Spec.__class__ is tagmap.TagMap:
1300 isUntagged = tagSet not in asn1Spec.tagMap
1301
1302 else:
1303 isUntagged = tagSet != asn1Spec.tagSet
1304
1305 if isUntagged:
1306 fullPosition = substrate.markedPosition
1307 currentPosition = substrate.tell()
1308
1309 substrate.seek(fullPosition, os.SEEK_SET)
1310 length += currentPosition - fullPosition
1311
1312 if LOG:
1313 for chunk in peekIntoStream(substrate, length):
1314 if isinstance(chunk, SubstrateUnderrunError):
1315 yield chunk
1316 LOG('decoding as untagged ANY, substrate '
1317 '%s' % debug.hexdump(chunk))
1318
1319 if substrateFun:
1320 for chunk in substrateFun(
1321 self._createComponent(asn1Spec, tagSet, noValue, **options),
1322 substrate, length, options):
1323 yield chunk
1324
1325 return
1326
1327 for chunk in readFromStream(substrate, length, options):
1328 if isinstance(chunk, SubstrateUnderrunError):
1329 yield chunk
1330
1331 yield self._createComponent(asn1Spec, tagSet, chunk, **options)
1332
1333 def indefLenValueDecoder(self, substrate, asn1Spec,
1334 tagSet=None, length=None, state=None,
1335 decodeFun=None, substrateFun=None,
1336 **options):
1337 if asn1Spec is None:
1338 isTagged = False
1339
1340 elif asn1Spec.__class__ is tagmap.TagMap:
1341 isTagged = tagSet in asn1Spec.tagMap
1342
1343 else:
1344 isTagged = tagSet == asn1Spec.tagSet
1345
1346 if isTagged:
1347 # tagged Any type -- consume header substrate
1348 chunk = null
1349
1350 if LOG:
1351 LOG('decoding as tagged ANY')
1352
1353 else:
1354 # TODO: Seems not to be tested
1355 fullPosition = substrate.markedPosition
1356 currentPosition = substrate.tell()
1357
1358 substrate.seek(fullPosition, os.SEEK_SET)
1359 for chunk in readFromStream(substrate, currentPosition - fullPosition, options):
1360 if isinstance(chunk, SubstrateUnderrunError):
1361 yield chunk
1362
1363 if LOG:
1364 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk))
1365
1366 # Any components do not inherit initial tag
1367 asn1Spec = self.protoComponent
1368
1369 if substrateFun and substrateFun is not self.substrateCollector:
1370 asn1Object = self._createComponent(
1371 asn1Spec, tagSet, noValue, **options)
1372
1373 for chunk in substrateFun(
1374 asn1Object, chunk + substrate, length + len(chunk), options):
1375 yield chunk
1376
1377 return
1378
1379 if LOG:
1380 LOG('assembling constructed serialization')
1381
1382 # All inner fragments are of the same type, treat them as octet string
1383 substrateFun = self.substrateCollector
1384
1385 while True: # loop over fragments
1386
1387 for component in decodeFun(
1388 substrate, asn1Spec, substrateFun=substrateFun,
1389 allowEoo=True, **options):
1390
1391 if isinstance(component, SubstrateUnderrunError):
1392 yield component
1393
1394 if component is eoo.endOfOctets:
1395 break
1396
1397 if component is eoo.endOfOctets:
1398 break
1399
1400 chunk += component
1401
1402 if substrateFun:
1403 yield chunk # TODO: Weird
1404
1405 else:
1406 yield self._createComponent(asn1Spec, tagSet, chunk, **options)
1407
1408
1409# character string types
1410class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
1411 protoComponent = char.UTF8String()
1412
1413
1414class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
1415 protoComponent = char.NumericString()
1416
1417
1418class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
1419 protoComponent = char.PrintableString()
1420
1421
1422class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
1423 protoComponent = char.TeletexString()
1424
1425
1426class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
1427 protoComponent = char.VideotexString()
1428
1429
1430class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
1431 protoComponent = char.IA5String()
1432
1433
1434class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
1435 protoComponent = char.GraphicString()
1436
1437
1438class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
1439 protoComponent = char.VisibleString()
1440
1441
1442class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
1443 protoComponent = char.GeneralString()
1444
1445
1446class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
1447 protoComponent = char.UniversalString()
1448
1449
1450class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
1451 protoComponent = char.BMPString()
1452
1453
1454# "useful" types
1455class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
1456 protoComponent = useful.ObjectDescriptor()
1457
1458
1459class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
1460 protoComponent = useful.GeneralizedTime()
1461
1462
1463class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
1464 protoComponent = useful.UTCTime()
1465
1466
1467TAG_MAP = {
1468 univ.Integer.tagSet: IntegerPayloadDecoder(),
1469 univ.Boolean.tagSet: BooleanPayloadDecoder(),
1470 univ.BitString.tagSet: BitStringPayloadDecoder(),
1471 univ.OctetString.tagSet: OctetStringPayloadDecoder(),
1472 univ.Null.tagSet: NullPayloadDecoder(),
1473 univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
1474 univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
1475 univ.Enumerated.tagSet: IntegerPayloadDecoder(),
1476 univ.Real.tagSet: RealPayloadDecoder(),
1477 univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf
1478 univ.Set.tagSet: SetOrSetOfPayloadDecoder(), # conflicts with SetOf
1479 univ.Choice.tagSet: ChoicePayloadDecoder(), # conflicts with Any
1480 # character string types
1481 char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
1482 char.NumericString.tagSet: NumericStringPayloadDecoder(),
1483 char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
1484 char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
1485 char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
1486 char.IA5String.tagSet: IA5StringPayloadDecoder(),
1487 char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
1488 char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
1489 char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
1490 char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
1491 char.BMPString.tagSet: BMPStringPayloadDecoder(),
1492 # useful types
1493 useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
1494 useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
1495 useful.UTCTime.tagSet: UTCTimePayloadDecoder()
1496}
1497
1498# Type-to-codec map for ambiguous ASN.1 types
1499TYPE_MAP = {
1500 univ.Set.typeId: SetPayloadDecoder(),
1501 univ.SetOf.typeId: SetOfPayloadDecoder(),
1502 univ.Sequence.typeId: SequencePayloadDecoder(),
1503 univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
1504 univ.Choice.typeId: ChoicePayloadDecoder(),
1505 univ.Any.typeId: AnyPayloadDecoder()
1506}
1507
1508# deprecated aliases, https://github.com/pyasn1/pyasn1/issues/9
1509tagMap = TAG_MAP
1510typeMap = TYPE_MAP
1511
1512# Put in non-ambiguous types for faster codec lookup
1513for typeDecoder in TAG_MAP.values():
1514 if typeDecoder.protoComponent is not None:
1515 typeId = typeDecoder.protoComponent.__class__.typeId
1516 if typeId is not None and typeId not in TYPE_MAP:
1517 TYPE_MAP[typeId] = typeDecoder
1518
1519
1520(stDecodeTag,
1521 stDecodeLength,
1522 stGetValueDecoder,
1523 stGetValueDecoderByAsn1Spec,
1524 stGetValueDecoderByTag,
1525 stTryAsExplicitTag,
1526 stDecodeValue,
1527 stDumpRawValue,
1528 stErrorCondition,
1529 stStop) = [x for x in range(10)]
1530
1531
1532EOO_SENTINEL = ints2octs((0, 0))
1533
1534
1535class SingleItemDecoder(object):
1536 defaultErrorState = stErrorCondition
1537 #defaultErrorState = stDumpRawValue
1538 defaultRawDecoder = AnyPayloadDecoder()
1539
1540 supportIndefLength = True
1541
1542 TAG_MAP = TAG_MAP
1543 TYPE_MAP = TYPE_MAP
1544
1545 def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
1546 self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
1547 self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP
1548
1549 # Tag & TagSet objects caches
1550 self._tagCache = {}
1551 self._tagSetCache = {}
1552
1553 def __call__(self, substrate, asn1Spec=None,
1554 tagSet=None, length=None, state=stDecodeTag,
1555 decodeFun=None, substrateFun=None,
1556 **options):
1557
1558 allowEoo = options.pop('allowEoo', False)
1559
1560 if LOG:
1561 LOG('decoder called at scope %s with state %d, working with up '
1562 'to %s octets of substrate: '
1563 '%s' % (debug.scope, state, length, substrate))
1564
1565 # Look for end-of-octets sentinel
1566 if allowEoo and self.supportIndefLength:
1567
1568 for eoo_candidate in readFromStream(substrate, 2, options):
1569 if isinstance(eoo_candidate, SubstrateUnderrunError):
1570 yield eoo_candidate
1571
1572 if eoo_candidate == EOO_SENTINEL:
1573 if LOG:
1574 LOG('end-of-octets sentinel found')
1575 yield eoo.endOfOctets
1576 return
1577
1578 else:
1579 substrate.seek(-2, os.SEEK_CUR)
1580
1581 tagMap = self._tagMap
1582 typeMap = self._typeMap
1583 tagCache = self._tagCache
1584 tagSetCache = self._tagSetCache
1585
1586 value = noValue
1587
1588 substrate.markedPosition = substrate.tell()
1589
1590 while state is not stStop:
1591
1592 if state is stDecodeTag:
1593 # Decode tag
1594 isShortTag = True
1595
1596 for firstByte in readFromStream(substrate, 1, options):
1597 if isinstance(firstByte, SubstrateUnderrunError):
1598 yield firstByte
1599
1600 firstOctet = ord(firstByte)
1601
1602 try:
1603 lastTag = tagCache[firstOctet]
1604
1605 except KeyError:
1606 integerTag = firstOctet
1607 tagClass = integerTag & 0xC0
1608 tagFormat = integerTag & 0x20
1609 tagId = integerTag & 0x1F
1610
1611 if tagId == 0x1F:
1612 isShortTag = False
1613 lengthOctetIdx = 0
1614 tagId = 0
1615
1616 while True:
1617 for integerByte in readFromStream(substrate, 1, options):
1618 if isinstance(integerByte, SubstrateUnderrunError):
1619 yield integerByte
1620
1621 if not integerByte:
1622 raise error.SubstrateUnderrunError(
1623 'Short octet stream on long tag decoding'
1624 )
1625
1626 integerTag = ord(integerByte)
1627 lengthOctetIdx += 1
1628 tagId <<= 7
1629 tagId |= (integerTag & 0x7F)
1630
1631 if not integerTag & 0x80:
1632 break
1633
1634 lastTag = tag.Tag(
1635 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
1636 )
1637
1638 if isShortTag:
1639 # cache short tags
1640 tagCache[firstOctet] = lastTag
1641
1642 if tagSet is None:
1643 if isShortTag:
1644 try:
1645 tagSet = tagSetCache[firstOctet]
1646
1647 except KeyError:
1648 # base tag not recovered
1649 tagSet = tag.TagSet((), lastTag)
1650 tagSetCache[firstOctet] = tagSet
1651 else:
1652 tagSet = tag.TagSet((), lastTag)
1653
1654 else:
1655 tagSet = lastTag + tagSet
1656
1657 state = stDecodeLength
1658
1659 if LOG:
1660 LOG('tag decoded into %s, decoding length' % tagSet)
1661
1662 if state is stDecodeLength:
1663 # Decode length
1664 for firstOctet in readFromStream(substrate, 1, options):
1665 if isinstance(firstOctet, SubstrateUnderrunError):
1666 yield firstOctet
1667
1668 firstOctet = ord(firstOctet)
1669
1670 if firstOctet < 128:
1671 length = firstOctet
1672
1673 elif firstOctet > 128:
1674 size = firstOctet & 0x7F
1675 # encoded in size bytes
1676 for encodedLength in readFromStream(substrate, size, options):
1677 if isinstance(encodedLength, SubstrateUnderrunError):
1678 yield encodedLength
1679 encodedLength = list(encodedLength)
1680 # missing check on maximum size, which shouldn't be a
1681 # problem, we can handle more than is possible
1682 if len(encodedLength) != size:
1683 raise error.SubstrateUnderrunError(
1684 '%s<%s at %s' % (size, len(encodedLength), tagSet)
1685 )
1686
1687 length = 0
1688 for lengthOctet in encodedLength:
1689 length <<= 8
1690 length |= oct2int(lengthOctet)
1691 size += 1
1692
1693 else: # 128 means indefinite
1694 length = -1
1695
1696 if length == -1 and not self.supportIndefLength:
1697 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')
1698
1699 state = stGetValueDecoder
1700
1701 if LOG:
1702 LOG('value length decoded into %d' % length)
1703
1704 if state is stGetValueDecoder:
1705 if asn1Spec is None:
1706 state = stGetValueDecoderByTag
1707
1708 else:
1709 state = stGetValueDecoderByAsn1Spec
1710 #
1711 # There're two ways of creating subtypes in ASN.1 what influences
1712 # decoder operation. These methods are:
1713 # 1) Either base types used in or no IMPLICIT tagging has been
1714 # applied on subtyping.
1715 # 2) Subtype syntax drops base type information (by means of
1716 # IMPLICIT tagging.
1717 # The first case allows for complete tag recovery from substrate
1718 # while the second one requires original ASN.1 type spec for
1719 # decoding.
1720 #
1721 # In either case a set of tags (tagSet) is coming from substrate
1722 # in an incremental, tag-by-tag fashion (this is the case of
1723 # EXPLICIT tag which is most basic). Outermost tag comes first
1724 # from the wire.
1725 #
1726 if state is stGetValueDecoderByTag:
1727 try:
1728 concreteDecoder = tagMap[tagSet]
1729
1730 except KeyError:
1731 concreteDecoder = None
1732
1733 if concreteDecoder:
1734 state = stDecodeValue
1735
1736 else:
1737 try:
1738 concreteDecoder = tagMap[tagSet[:1]]
1739
1740 except KeyError:
1741 concreteDecoder = None
1742
1743 if concreteDecoder:
1744 state = stDecodeValue
1745 else:
1746 state = stTryAsExplicitTag
1747
1748 if LOG:
1749 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
1750 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)
1751
1752 if state is stGetValueDecoderByAsn1Spec:
1753
1754 if asn1Spec.__class__ is tagmap.TagMap:
1755 try:
1756 chosenSpec = asn1Spec[tagSet]
1757
1758 except KeyError:
1759 chosenSpec = None
1760
1761 if LOG:
1762 LOG('candidate ASN.1 spec is a map of:')
1763
1764 for firstOctet, v in asn1Spec.presentTypes.items():
1765 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
1766
1767 if asn1Spec.skipTypes:
1768 LOG('but neither of: ')
1769 for firstOctet, v in asn1Spec.skipTypes.items():
1770 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
1771 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))
1772
1773 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
1774 chosenSpec = asn1Spec
1775 if LOG:
1776 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)
1777
1778 else:
1779 chosenSpec = None
1780
1781 if chosenSpec is not None:
1782 try:
1783 # ambiguous type or just faster codec lookup
1784 concreteDecoder = typeMap[chosenSpec.typeId]
1785
1786 if LOG:
1787 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))
1788
1789 except KeyError:
1790 # use base type for codec lookup to recover untagged types
1791 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
1792 try:
1793 # base type or tagged subtype
1794 concreteDecoder = tagMap[baseTagSet]
1795
1796 if LOG:
1797 LOG('value decoder chosen by base %s' % (baseTagSet,))
1798
1799 except KeyError:
1800 concreteDecoder = None
1801
1802 if concreteDecoder:
1803 asn1Spec = chosenSpec
1804 state = stDecodeValue
1805
1806 else:
1807 state = stTryAsExplicitTag
1808
1809 else:
1810 concreteDecoder = None
1811 state = stTryAsExplicitTag
1812
1813 if LOG:
1814 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
1815 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)
1816
1817 if state is stDecodeValue:
1818 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this
1819 def substrateFun(asn1Object, _substrate, _length, _options):
1820 """Legacy hack to keep the recursiveFlag=False option supported.
1821
1822 The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
1823 of the old recursiveFlag=False option. Users should pass their callback instead of using
1824 recursiveFlag.
1825 """
1826 yield asn1Object
1827
1828 original_position = substrate.tell()
1829
1830 if length == -1: # indef length
1831 for value in concreteDecoder.indefLenValueDecoder(
1832 substrate, asn1Spec,
1833 tagSet, length, stGetValueDecoder,
1834 self, substrateFun, **options):
1835 if isinstance(value, SubstrateUnderrunError):
1836 yield value
1837
1838 else:
1839 for value in concreteDecoder.valueDecoder(
1840 substrate, asn1Spec,
1841 tagSet, length, stGetValueDecoder,
1842 self, substrateFun, **options):
1843 if isinstance(value, SubstrateUnderrunError):
1844 yield value
1845
1846 bytesRead = substrate.tell() - original_position
1847 if not substrateFun and bytesRead != length:
1848 raise PyAsn1Error(
1849 "Read %s bytes instead of expected %s." % (bytesRead, length))
1850 elif substrateFun and bytesRead > length:
1851 # custom substrateFun may be used for partial decoding, reading less is expected there
1852 raise PyAsn1Error(
1853 "Read %s bytes are more than expected %s." % (bytesRead, length))
1854
1855 if LOG:
1856 LOG('codec %s yields type %s, value:\n%s\n...' % (
1857 concreteDecoder.__class__.__name__, value.__class__.__name__,
1858 isinstance(value, base.Asn1Item) and value.prettyPrint() or value))
1859
1860 state = stStop
1861 break
1862
1863 if state is stTryAsExplicitTag:
1864 if (tagSet and
1865 tagSet[0].tagFormat == tag.tagFormatConstructed and
1866 tagSet[0].tagClass != tag.tagClassUniversal):
1867 # Assume explicit tagging
1868 concreteDecoder = rawPayloadDecoder
1869 state = stDecodeValue
1870
1871 else:
1872 concreteDecoder = None
1873 state = self.defaultErrorState
1874
1875 if LOG:
1876 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))
1877
1878 if state is stDumpRawValue:
1879 concreteDecoder = self.defaultRawDecoder
1880
1881 if LOG:
1882 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)
1883
1884 state = stDecodeValue
1885
1886 if state is stErrorCondition:
1887 raise error.PyAsn1Error(
1888 '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
1889 )
1890
1891 if LOG:
1892 debug.scope.pop()
1893 LOG('decoder left scope %s, call completed' % debug.scope)
1894
1895 yield value
1896
1897
1898class StreamingDecoder(object):
1899 """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects.
1900
1901 On each iteration, consume whatever BER/CER/DER serialization is
1902 available in the `substrate` stream-like object and turns it into
1903 one or more, possibly nested, ASN.1 objects.
1904
1905 Parameters
1906 ----------
1907 substrate: :py:class:`file`, :py:class:`io.BytesIO`
1908 BER/CER/DER serialization in form of a byte stream
1909
1910 Keyword Args
1911 ------------
1912 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
1913 A pyasn1 type object to act as a template guiding the decoder.
1914 Depending on the ASN.1 structure being decoded, `asn1Spec` may
1915 or may not be required. One of the reasons why `asn1Spec` may
1916 me required is that ASN.1 structure is encoded in the *IMPLICIT*
1917 tagging mode.
1918
1919 Yields
1920 ------
1921 : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
1922 Decoded ASN.1 object (possibly, nested) or
1923 :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
1924 insufficient BER/CER/DER serialization on input to fully recover ASN.1
1925 objects from it.
1926
1927 In the latter case the caller is advised to ensure some more data in
1928 the input stream, then call the iterator again. The decoder will resume
1929 the decoding process using the newly arrived data.
1930
1931 The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError`
1932 object might hold a reference to the partially populated ASN.1 object
1933 being reconstructed.
1934
1935 Raises
1936 ------
1937 ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
1938 `PyAsn1Error` on deserialization error, `EndOfStreamError` on
1939 premature stream closure.
1940
1941 Examples
1942 --------
1943 Decode BER serialisation without ASN.1 schema
1944
1945 .. code-block:: pycon
1946
1947 >>> stream = io.BytesIO(
1948 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
1949 >>>
1950 >>> for asn1Object in StreamingDecoder(stream):
1951 ... print(asn1Object)
1952 >>>
1953 SequenceOf:
1954 1 2 3
1955
1956 Decode BER serialisation with ASN.1 schema
1957
1958 .. code-block:: pycon
1959
1960 >>> stream = io.BytesIO(
1961 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
1962 >>>
1963 >>> schema = SequenceOf(componentType=Integer())
1964 >>>
1965 >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
1966 >>> for asn1Object in decoder:
1967 ... print(asn1Object)
1968 >>>
1969 SequenceOf:
1970 1 2 3
1971 """
1972
1973 SINGLE_ITEM_DECODER = SingleItemDecoder
1974
1975 def __init__(self, substrate, asn1Spec=None, **options):
1976 self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
1977 self._substrate = asSeekableStream(substrate)
1978 self._asn1Spec = asn1Spec
1979 self._options = options
1980
1981 def __iter__(self):
1982 while True:
1983 for asn1Object in self._singleItemDecoder(
1984 self._substrate, self._asn1Spec, **self._options):
1985 yield asn1Object
1986
1987 for chunk in isEndOfStream(self._substrate):
1988 if isinstance(chunk, SubstrateUnderrunError):
1989 yield
1990
1991 break
1992
1993 if chunk:
1994 break
1995
1996
1997class Decoder(object):
1998 """Create a BER decoder object.
1999
2000 Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object.
2001 """
2002 STREAMING_DECODER = StreamingDecoder
2003
2004 @classmethod
2005 def __call__(cls, substrate, asn1Spec=None, **options):
2006 """Turns BER/CER/DER octet stream into an ASN.1 object.
2007
2008 Takes BER/CER/DER octet-stream in form of :py:class:`bytes` (Python 3)
2009 or :py:class:`str` (Python 2) and decode it into an ASN.1 object
2010 (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
2011 may be a scalar or an arbitrary nested structure.
2012
2013 Parameters
2014 ----------
2015 substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
2016 BER/CER/DER octet-stream to parse
2017
2018 Keyword Args
2019 ------------
2020 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
2021 A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
2022 derivative) to act as a template guiding the decoder.
2023 Depending on the ASN.1 structure being decoded, `asn1Spec` may or
2024 may not be required. Most common reason for it to require is that
2025 ASN.1 structure is encoded in *IMPLICIT* tagging mode.
2026
2027 substrateFun: :py:class:`Union[
2028 Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
2029 Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
2030 Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
2031 Generator[Union[pyasn1.type.base.PyAsn1Item,
2032 pyasn1.error.SubstrateUnderrunError],
2033 None, None]]
2034 ]`
2035 User callback meant to generalize special use cases like non-recursive or
2036 partial decoding. A 3-arg non-streaming variant is supported for backwards
2037 compatiblilty in addition to the newer 4-arg streaming variant.
2038 The callback will receive the uninitialized object recovered from substrate
2039 as 1st argument, the uninterpreted payload as 2nd argument, and the length
2040 of the uninterpreted payload as 3rd argument. The streaming variant will
2041 additionally receive the decode(..., **options) kwargs as 4th argument.
2042 The non-streaming variant shall return an object that will be propagated
2043 as decode() return value as 1st item, and the remainig payload for further
2044 decode passes as 2nd item.
2045 The streaming variant shall yield an object that will be propagated as
2046 decode() return value, and leave the remaining payload in the stream.
2047
2048 Returns
2049 -------
2050 : :py:class:`tuple`
2051 A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object
2052 recovered from BER/CER/DER substrate and the unprocessed trailing
2053 portion of the `substrate` (may be empty)
2054
2055 Raises
2056 ------
2057 : :py:class:`~pyasn1.error.PyAsn1Error`
2058 :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient
2059 input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error.
2060
2061 Examples
2062 --------
2063 Decode BER/CER/DER serialisation without ASN.1 schema
2064
2065 .. code-block:: pycon
2066
2067 >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
2068 >>> str(s)
2069 SequenceOf:
2070 1 2 3
2071
2072 Decode BER/CER/DER serialisation with ASN.1 schema
2073
2074 .. code-block:: pycon
2075
2076 >>> seq = SequenceOf(componentType=Integer())
2077 >>> s, unprocessed = decode(
2078 b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
2079 >>> str(s)
2080 SequenceOf:
2081 1 2 3
2082
2083 """
2084 substrate = asSeekableStream(substrate)
2085
2086 if "substrateFun" in options:
2087 origSubstrateFun = options["substrateFun"]
2088
2089 def substrateFunWrapper(asn1Object, substrate, length, options=None):
2090 """Support both 0.4 and 0.5 style APIs.
2091
2092 substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible,
2093 we first try if we received a streaming user callback. If that fails,we assume we've received a
2094 non-streaming v0.4 user callback and convert it for streaming on the fly
2095 """
2096 try:
2097 substrate_gen = origSubstrateFun(asn1Object, substrate, length, options)
2098 except TypeError:
2099 _type, _value, traceback = sys.exc_info()
2100 if traceback.tb_next:
2101 # Traceback depth > 1 means TypeError from inside user provided function
2102 raise
2103 # invariant maintained at Decoder.__call__ entry
2104 assert isinstance(substrate, io.BytesIO) # nosec assert_used
2105 substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length)
2106 for value in substrate_gen:
2107 yield value
2108
2109 options["substrateFun"] = substrateFunWrapper
2110
2111 streamingDecoder = cls.STREAMING_DECODER(
2112 substrate, asn1Spec, **options)
2113
2114 for asn1Object in streamingDecoder:
2115 if isinstance(asn1Object, SubstrateUnderrunError):
2116 raise error.SubstrateUnderrunError('Short substrate on input')
2117
2118 try:
2119 tail = next(readFromStream(substrate))
2120
2121 except error.EndOfStreamError:
2122 tail = null
2123
2124 return asn1Object, tail
2125
2126 @staticmethod
2127 def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length):
2128 substrate_bytes = substrate.read()
2129 if length == -1:
2130 length = len(substrate_bytes)
2131 value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length)
2132 nbytes = substrate.write(nextSubstrate)
2133 substrate.truncate()
2134 substrate.seek(-nbytes, os.SEEK_CUR)
2135 yield value
2136
2137#: Turns BER octet stream into an ASN.1 object.
2138#:
2139#: Takes BER octet-stream and decode it into an ASN.1 object
2140#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
2141#: may be a scalar or an arbitrary nested structure.
2142#:
2143#: Parameters
2144#: ----------
2145#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
2146#: BER octet-stream
2147#:
2148#: Keyword Args
2149#: ------------
2150#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
2151#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
2152#: being decoded, *asn1Spec* may or may not be required. Most common reason for
2153#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode.
2154#:
2155#: Returns
2156#: -------
2157#: : :py:class:`tuple`
2158#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
2159#: and the unprocessed trailing portion of the *substrate* (may be empty)
2160#:
2161#: Raises
2162#: ------
2163#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
2164#: On decoding errors
2165#:
2166#: Notes
2167#: -----
2168#: This function is deprecated. Please use :py:class:`Decoder` or
2169#: :py:class:`StreamingDecoder` class instance.
2170#:
2171#: Examples
2172#: --------
2173#: Decode BER serialisation without ASN.1 schema
2174#:
2175#: .. code-block:: pycon
2176#:
2177#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
2178#: >>> str(s)
2179#: SequenceOf:
2180#: 1 2 3
2181#:
2182#: Decode BER serialisation with ASN.1 schema
2183#:
2184#: .. code-block:: pycon
2185#:
2186#: >>> seq = SequenceOf(componentType=Integer())
2187#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
2188#: >>> str(s)
2189#: SequenceOf:
2190#: 1 2 3
2191#:
2192decode = Decoder()