1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import io
8import os
9import sys
10import warnings
11
12from pyasn1 import debug
13from pyasn1 import error
14from pyasn1.codec.ber import eoo
15from pyasn1.codec.streaming import asSeekableStream
16from pyasn1.codec.streaming import isEndOfStream
17from pyasn1.codec.streaming import peekIntoStream
18from pyasn1.codec.streaming import readFromStream
19from pyasn1.compat import _MISSING
20from pyasn1.error import PyAsn1Error
21from pyasn1.type import base
22from pyasn1.type import char
23from pyasn1.type import tag
24from pyasn1.type import tagmap
25from pyasn1.type import univ
26from pyasn1.type import useful
27
28__all__ = ['StreamingDecoder', 'Decoder', 'decode']
29
30LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)
31
32noValue = base.noValue
33
34SubstrateUnderrunError = error.SubstrateUnderrunError
35
36# Maximum number of continuation octets (high-bit set) allowed per OID arc.
37# 20 octets allows up to 140-bit integers, supporting UUID-based OIDs
38MAX_OID_ARC_CONTINUATION_OCTETS = 20
39MAX_NESTING_DEPTH = 100
40
41# Maximum number of bytes in a BER length field (8 bytes = up to 2^64-1)
42MAX_LENGTH_OCTETS = 8
43
44
45class AbstractPayloadDecoder(object):
46 protoComponent = None
47
48 def valueDecoder(self, substrate, asn1Spec,
49 tagSet=None, length=None, state=None,
50 decodeFun=None, substrateFun=None,
51 **options):
52 """Decode value with fixed byte length.
53
54 The decoder is allowed to consume as many bytes as necessary.
55 """
56 raise error.PyAsn1Error('SingleItemDecoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError?
57
58 def indefLenValueDecoder(self, substrate, asn1Spec,
59 tagSet=None, length=None, state=None,
60 decodeFun=None, substrateFun=None,
61 **options):
62 """Decode value with undefined length.
63
64 The decoder is allowed to consume as many bytes as necessary.
65 """
66 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError?
67
68 @staticmethod
69 def _passAsn1Object(asn1Object, options):
70 if 'asn1Object' not in options:
71 options['asn1Object'] = asn1Object
72
73 return options
74
75
76class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
77 @staticmethod
78 def substrateCollector(asn1Object, substrate, length, options):
79 for chunk in readFromStream(substrate, length, options):
80 yield chunk
81
82 def _createComponent(self, asn1Spec, tagSet, value, **options):
83 if options.get('native'):
84 return value
85 elif asn1Spec is None:
86 return self.protoComponent.clone(value, tagSet=tagSet)
87 elif value is noValue:
88 return asn1Spec
89 else:
90 return asn1Spec.clone(value)
91
92
93class RawPayloadDecoder(AbstractSimplePayloadDecoder):
94 protoComponent = univ.Any('')
95
96 def valueDecoder(self, substrate, asn1Spec,
97 tagSet=None, length=None, state=None,
98 decodeFun=None, substrateFun=None,
99 **options):
100 if substrateFun:
101 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
102
103 for chunk in substrateFun(asn1Object, substrate, length, options):
104 yield chunk
105
106 return
107
108 for value in decodeFun(substrate, asn1Spec, tagSet, length, **options):
109 yield value
110
111 def indefLenValueDecoder(self, substrate, asn1Spec,
112 tagSet=None, length=None, state=None,
113 decodeFun=None, substrateFun=None,
114 **options):
115 if substrateFun:
116 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
117
118 for chunk in substrateFun(asn1Object, substrate, length, options):
119 yield chunk
120
121 return
122
123 while True:
124 for value in decodeFun(
125 substrate, asn1Spec, tagSet, length,
126 allowEoo=True, **options):
127
128 if value is eoo.endOfOctets:
129 return
130
131 yield value
132
133
134rawPayloadDecoder = RawPayloadDecoder()
135
136
137class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
138 protoComponent = univ.Integer(0)
139
140 def valueDecoder(self, substrate, asn1Spec,
141 tagSet=None, length=None, state=None,
142 decodeFun=None, substrateFun=None,
143 **options):
144
145 if tagSet[0].tagFormat != tag.tagFormatSimple:
146 raise error.PyAsn1Error('Simple tag format expected')
147
148 for chunk in readFromStream(substrate, length, options):
149 if isinstance(chunk, SubstrateUnderrunError):
150 yield chunk
151
152 if chunk:
153 value = int.from_bytes(bytes(chunk), 'big', signed=True)
154
155 else:
156 value = 0
157
158 yield self._createComponent(asn1Spec, tagSet, value, **options)
159
160
161class BooleanPayloadDecoder(IntegerPayloadDecoder):
162 protoComponent = univ.Boolean(0)
163
164 def _createComponent(self, asn1Spec, tagSet, value, **options):
165 return IntegerPayloadDecoder._createComponent(
166 self, asn1Spec, tagSet, value and 1 or 0, **options)
167
168
169class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
170 protoComponent = univ.BitString(())
171 supportConstructedForm = True
172
173 def valueDecoder(self, substrate, asn1Spec,
174 tagSet=None, length=None, state=None,
175 decodeFun=None, substrateFun=None,
176 **options):
177
178 if substrateFun:
179 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
180
181 for chunk in substrateFun(asn1Object, substrate, length, options):
182 yield chunk
183
184 return
185
186 if not length:
187 raise error.PyAsn1Error('Empty BIT STRING substrate')
188
189 for chunk in isEndOfStream(substrate):
190 if isinstance(chunk, SubstrateUnderrunError):
191 yield chunk
192
193 if chunk:
194 raise error.PyAsn1Error('Empty BIT STRING substrate')
195
196 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check?
197
198 for trailingBits in readFromStream(substrate, 1, options):
199 if isinstance(trailingBits, SubstrateUnderrunError):
200 yield trailingBits
201
202 trailingBits = ord(trailingBits)
203 if trailingBits > 7:
204 raise error.PyAsn1Error(
205 'Trailing bits overflow %s' % trailingBits
206 )
207
208 for chunk in readFromStream(substrate, length - 1, options):
209 if isinstance(chunk, SubstrateUnderrunError):
210 yield chunk
211
212 value = self.protoComponent.fromOctetString(
213 chunk, internalFormat=True, padding=trailingBits)
214
215 yield self._createComponent(asn1Spec, tagSet, value, **options)
216
217 return
218
219 if not self.supportConstructedForm:
220 raise error.PyAsn1Error('Constructed encoding form prohibited '
221 'at %s' % self.__class__.__name__)
222
223 if LOG:
224 LOG('assembling constructed serialization')
225
226 # All inner fragments are of the same type, treat them as octet string
227 substrateFun = self.substrateCollector
228
229 bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)
230
231 current_position = substrate.tell()
232
233 while substrate.tell() - current_position < length:
234 for component in decodeFun(
235 substrate, self.protoComponent, substrateFun=substrateFun,
236 **options):
237 if isinstance(component, SubstrateUnderrunError):
238 yield component
239
240 trailingBits = component[0]
241 if trailingBits > 7:
242 raise error.PyAsn1Error(
243 'Trailing bits overflow %s' % trailingBits
244 )
245
246 bitString = self.protoComponent.fromOctetString(
247 component[1:], internalFormat=True,
248 prepend=bitString, padding=trailingBits
249 )
250
251 yield self._createComponent(asn1Spec, tagSet, bitString, **options)
252
253 def indefLenValueDecoder(self, substrate, asn1Spec,
254 tagSet=None, length=None, state=None,
255 decodeFun=None, substrateFun=None,
256 **options):
257
258 if substrateFun:
259 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
260
261 for chunk in substrateFun(asn1Object, substrate, length, options):
262 yield chunk
263
264 return
265
266 # All inner fragments are of the same type, treat them as octet string
267 substrateFun = self.substrateCollector
268
269 bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)
270
271 while True: # loop over fragments
272
273 for component in decodeFun(
274 substrate, self.protoComponent, substrateFun=substrateFun,
275 allowEoo=True, **options):
276
277 if component is eoo.endOfOctets:
278 break
279
280 if isinstance(component, SubstrateUnderrunError):
281 yield component
282
283 if component is eoo.endOfOctets:
284 break
285
286 trailingBits = component[0]
287 if trailingBits > 7:
288 raise error.PyAsn1Error(
289 'Trailing bits overflow %s' % trailingBits
290 )
291
292 bitString = self.protoComponent.fromOctetString(
293 component[1:], internalFormat=True,
294 prepend=bitString, padding=trailingBits
295 )
296
297 yield self._createComponent(asn1Spec, tagSet, bitString, **options)
298
299
300class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
301 protoComponent = univ.OctetString('')
302 supportConstructedForm = True
303
304 def valueDecoder(self, substrate, asn1Spec,
305 tagSet=None, length=None, state=None,
306 decodeFun=None, substrateFun=None,
307 **options):
308 if substrateFun:
309 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
310
311 for chunk in substrateFun(asn1Object, substrate, length, options):
312 yield chunk
313
314 return
315
316 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check?
317 for chunk in readFromStream(substrate, length, options):
318 if isinstance(chunk, SubstrateUnderrunError):
319 yield chunk
320
321 yield self._createComponent(asn1Spec, tagSet, chunk, **options)
322
323 return
324
325 if not self.supportConstructedForm:
326 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)
327
328 if LOG:
329 LOG('assembling constructed serialization')
330
331 # All inner fragments are of the same type, treat them as octet string
332 substrateFun = self.substrateCollector
333
334 header = b''
335
336 original_position = substrate.tell()
337 # head = popSubstream(substrate, length)
338 while substrate.tell() - original_position < length:
339 for component in decodeFun(
340 substrate, self.protoComponent, substrateFun=substrateFun,
341 **options):
342 if isinstance(component, SubstrateUnderrunError):
343 yield component
344
345 header += component
346
347 yield self._createComponent(asn1Spec, tagSet, header, **options)
348
349 def indefLenValueDecoder(self, substrate, asn1Spec,
350 tagSet=None, length=None, state=None,
351 decodeFun=None, substrateFun=None,
352 **options):
353 if substrateFun and substrateFun is not self.substrateCollector:
354 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
355
356 for chunk in substrateFun(asn1Object, substrate, length, options):
357 yield chunk
358
359 return
360
361 # All inner fragments are of the same type, treat them as octet string
362 substrateFun = self.substrateCollector
363
364 header = b''
365
366 while True: # loop over fragments
367
368 for component in decodeFun(
369 substrate, self.protoComponent, substrateFun=substrateFun,
370 allowEoo=True, **options):
371
372 if isinstance(component, SubstrateUnderrunError):
373 yield component
374
375 if component is eoo.endOfOctets:
376 break
377
378 if component is eoo.endOfOctets:
379 break
380
381 header += component
382
383 yield self._createComponent(asn1Spec, tagSet, header, **options)
384
385
386class NullPayloadDecoder(AbstractSimplePayloadDecoder):
387 protoComponent = univ.Null('')
388
389 def valueDecoder(self, substrate, asn1Spec,
390 tagSet=None, length=None, state=None,
391 decodeFun=None, substrateFun=None,
392 **options):
393
394 if tagSet[0].tagFormat != tag.tagFormatSimple:
395 raise error.PyAsn1Error('Simple tag format expected')
396
397 for chunk in readFromStream(substrate, length, options):
398 if isinstance(chunk, SubstrateUnderrunError):
399 yield chunk
400
401 component = self._createComponent(asn1Spec, tagSet, '', **options)
402
403 if chunk:
404 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)
405
406 yield component
407
408
409class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
410 protoComponent = univ.ObjectIdentifier(())
411
412 def valueDecoder(self, substrate, asn1Spec,
413 tagSet=None, length=None, state=None,
414 decodeFun=None, substrateFun=None,
415 **options):
416 if tagSet[0].tagFormat != tag.tagFormatSimple:
417 raise error.PyAsn1Error('Simple tag format expected')
418
419 for chunk in readFromStream(substrate, length, options):
420 if isinstance(chunk, SubstrateUnderrunError):
421 yield chunk
422
423 if not chunk:
424 raise error.PyAsn1Error('Empty substrate')
425
426 oid = ()
427 index = 0
428 substrateLen = len(chunk)
429 while index < substrateLen:
430 subId = chunk[index]
431 index += 1
432 if subId < 128:
433 oid += (subId,)
434 elif subId > 128:
435 # Construct subid from a number of octets
436 nextSubId = subId
437 subId = 0
438 continuationOctetCount = 0
439 while nextSubId >= 128:
440 continuationOctetCount += 1
441 if continuationOctetCount > MAX_OID_ARC_CONTINUATION_OCTETS:
442 raise error.PyAsn1Error(
443 'OID arc exceeds maximum continuation octets limit (%d) '
444 'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, index)
445 )
446 subId = (subId << 7) + (nextSubId & 0x7F)
447 if index >= substrateLen:
448 raise error.SubstrateUnderrunError(
449 'Short substrate for sub-OID past %s' % (oid,)
450 )
451 nextSubId = chunk[index]
452 index += 1
453 oid += ((subId << 7) + nextSubId,)
454 elif subId == 128:
455 # ASN.1 spec forbids leading zeros (0x80) in OID
456 # encoding, tolerating it opens a vulnerability. See
457 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
458 # page 7
459 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')
460
461 # Decode two leading arcs
462 if 0 <= oid[0] <= 39:
463 oid = (0,) + oid
464 elif 40 <= oid[0] <= 79:
465 oid = (1, oid[0] - 40) + oid[1:]
466 elif oid[0] >= 80:
467 oid = (2, oid[0] - 80) + oid[1:]
468 else:
469 raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0])
470
471 yield self._createComponent(asn1Spec, tagSet, oid, **options)
472
473
474class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
475 protoComponent = univ.RelativeOID(())
476
477 def valueDecoder(self, substrate, asn1Spec,
478 tagSet=None, length=None, state=None,
479 decodeFun=None, substrateFun=None,
480 **options):
481 if tagSet[0].tagFormat != tag.tagFormatSimple:
482 raise error.PyAsn1Error('Simple tag format expected')
483
484 for chunk in readFromStream(substrate, length, options):
485 if isinstance(chunk, SubstrateUnderrunError):
486 yield chunk
487
488 if not chunk:
489 raise error.PyAsn1Error('Empty substrate')
490
491 reloid = ()
492 index = 0
493 substrateLen = len(chunk)
494 while index < substrateLen:
495 subId = chunk[index]
496 index += 1
497 if subId < 128:
498 reloid += (subId,)
499 elif subId > 128:
500 # Construct subid from a number of octets
501 nextSubId = subId
502 subId = 0
503 continuationOctetCount = 0
504 while nextSubId >= 128:
505 continuationOctetCount += 1
506 if continuationOctetCount > MAX_OID_ARC_CONTINUATION_OCTETS:
507 raise error.PyAsn1Error(
508 'RELATIVE-OID arc exceeds maximum continuation octets limit (%d) '
509 'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, index)
510 )
511 subId = (subId << 7) + (nextSubId & 0x7F)
512 if index >= substrateLen:
513 raise error.SubstrateUnderrunError(
514 'Short substrate for sub-OID past %s' % (reloid,)
515 )
516 nextSubId = chunk[index]
517 index += 1
518 reloid += ((subId << 7) + nextSubId,)
519 elif subId == 128:
520 # ASN.1 spec forbids leading zeros (0x80) in OID
521 # encoding, tolerating it opens a vulnerability. See
522 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
523 # page 7
524 raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')
525
526 yield self._createComponent(asn1Spec, tagSet, reloid, **options)
527
528
529class RealPayloadDecoder(AbstractSimplePayloadDecoder):
530 protoComponent = univ.Real()
531
532 def valueDecoder(self, substrate, asn1Spec,
533 tagSet=None, length=None, state=None,
534 decodeFun=None, substrateFun=None,
535 **options):
536 if tagSet[0].tagFormat != tag.tagFormatSimple:
537 raise error.PyAsn1Error('Simple tag format expected')
538
539 for chunk in readFromStream(substrate, length, options):
540 if isinstance(chunk, SubstrateUnderrunError):
541 yield chunk
542
543 if not chunk:
544 yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
545 return
546
547 fo = chunk[0]
548 chunk = chunk[1:]
549 if fo & 0x80: # binary encoding
550 if not chunk:
551 raise error.PyAsn1Error("Incomplete floating-point value")
552
553 if LOG:
554 LOG('decoding binary encoded REAL')
555
556 n = (fo & 0x03) + 1
557
558 if n == 4:
559 n = chunk[0]
560 chunk = chunk[1:]
561
562 eo, chunk = chunk[:n], chunk[n:]
563
564 if not eo or not chunk:
565 raise error.PyAsn1Error('Real exponent screwed')
566
567 e = eo[0] & 0x80 and -1 or 0
568
569 while eo: # exponent
570 e <<= 8
571 e |= eo[0]
572 eo = eo[1:]
573
574 b = fo >> 4 & 0x03 # base bits
575
576 if b > 2:
577 raise error.PyAsn1Error('Illegal Real base')
578
579 if b == 1: # encbase = 8
580 e *= 3
581
582 elif b == 2: # encbase = 16
583 e *= 4
584 p = 0
585
586 while chunk: # value
587 p <<= 8
588 p |= chunk[0]
589 chunk = chunk[1:]
590
591 if fo & 0x40: # sign bit
592 p = -p
593
594 sf = fo >> 2 & 0x03 # scale bits
595 p *= 2 ** sf
596 value = (p, 2, e)
597
598 elif fo & 0x40: # infinite value
599 if LOG:
600 LOG('decoding infinite REAL')
601
602 value = fo & 0x01 and '-inf' or 'inf'
603
604 elif fo & 0xc0 == 0: # character encoding
605 if not chunk:
606 raise error.PyAsn1Error("Incomplete floating-point value")
607
608 if LOG:
609 LOG('decoding character encoded REAL')
610
611 try:
612 if fo & 0x3 == 0x1: # NR1
613 value = (int(chunk), 10, 0)
614
615 elif fo & 0x3 == 0x2: # NR2
616 value = float(chunk)
617
618 elif fo & 0x3 == 0x3: # NR3
619 value = float(chunk)
620
621 else:
622 raise error.SubstrateUnderrunError(
623 'Unknown NR (tag %s)' % fo
624 )
625
626 except ValueError:
627 raise error.SubstrateUnderrunError(
628 'Bad character Real syntax'
629 )
630
631 else:
632 raise error.SubstrateUnderrunError(
633 'Unknown encoding (tag %s)' % fo
634 )
635
636 yield self._createComponent(asn1Spec, tagSet, value, **options)
637
638
639class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
640 protoComponent = None
641
642
643class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder):
644 protoRecordComponent = None
645 protoSequenceComponent = None
646
647 def _getComponentTagMap(self, asn1Object, idx):
648 raise NotImplementedError
649
650 def _getComponentPositionByType(self, asn1Object, tagSet, idx):
651 raise NotImplementedError
652
653 def _decodeComponentsSchemaless(
654 self, substrate, tagSet=None, decodeFun=None,
655 length=None, **options):
656
657 asn1Object = None
658
659 components = []
660 componentTypes = set()
661
662 original_position = substrate.tell()
663
664 while length == -1 or substrate.tell() < original_position + length:
665 for component in decodeFun(substrate, **options):
666 if isinstance(component, SubstrateUnderrunError):
667 yield component
668
669 if length == -1 and component is eoo.endOfOctets:
670 break
671
672 components.append(component)
673 componentTypes.add(component.tagSet)
674
675 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF
676 # The heuristics is:
677 # * 1+ components of different types -> likely SEQUENCE/SET
678 # * otherwise -> likely SEQUENCE OF/SET OF
679 if len(componentTypes) > 1:
680 protoComponent = self.protoRecordComponent
681
682 else:
683 protoComponent = self.protoSequenceComponent
684
685 asn1Object = protoComponent.clone(
686 # construct tagSet from base tag from prototype ASN.1 object
687 # and additional tags recovered from the substrate
688 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
689 )
690
691 if LOG:
692 LOG('guessed %r container type (pass `asn1Spec` to guide the '
693 'decoder)' % asn1Object)
694
695 for idx, component in enumerate(components):
696 asn1Object.setComponentByPosition(
697 idx, component,
698 verifyConstraints=False,
699 matchTags=False, matchConstraints=False
700 )
701
702 yield asn1Object
703
704 def valueDecoder(self, substrate, asn1Spec,
705 tagSet=None, length=None, state=None,
706 decodeFun=None, substrateFun=None,
707 **options):
708 if tagSet[0].tagFormat != tag.tagFormatConstructed:
709 raise error.PyAsn1Error('Constructed tag format expected')
710
711 original_position = substrate.tell()
712
713 if substrateFun:
714 if asn1Spec is not None:
715 asn1Object = asn1Spec.clone()
716
717 elif self.protoComponent is not None:
718 asn1Object = self.protoComponent.clone(tagSet=tagSet)
719
720 else:
721 asn1Object = self.protoRecordComponent, self.protoSequenceComponent
722
723 for chunk in substrateFun(asn1Object, substrate, length, options):
724 yield chunk
725
726 return
727
728 if asn1Spec is None:
729 for asn1Object in self._decodeComponentsSchemaless(
730 substrate, tagSet=tagSet, decodeFun=decodeFun,
731 length=length, **options):
732 if isinstance(asn1Object, SubstrateUnderrunError):
733 yield asn1Object
734
735 if substrate.tell() < original_position + length:
736 if LOG:
737 for trailing in readFromStream(substrate, context=options):
738 if isinstance(trailing, SubstrateUnderrunError):
739 yield trailing
740
741 LOG('Unused trailing %d octets encountered: %s' % (
742 len(trailing), debug.hexdump(trailing)))
743
744 yield asn1Object
745
746 return
747
748 asn1Object = asn1Spec.clone()
749 asn1Object.clear()
750
751 options = self._passAsn1Object(asn1Object, options)
752
753 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):
754
755 namedTypes = asn1Spec.componentType
756
757 isSetType = asn1Spec.typeId == univ.Set.typeId
758 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault
759
760 if LOG:
761 LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
762 not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
763 asn1Spec))
764
765 seenIndices = set()
766 idx = 0
767 while substrate.tell() - original_position < length:
768 if not namedTypes:
769 componentType = None
770
771 elif isSetType:
772 componentType = namedTypes.tagMapUnique
773
774 else:
775 try:
776 if isDeterministic:
777 componentType = namedTypes[idx].asn1Object
778
779 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
780 componentType = namedTypes.getTagMapNearPosition(idx)
781
782 else:
783 componentType = namedTypes[idx].asn1Object
784
785 except IndexError:
786 raise error.PyAsn1Error(
787 'Excessive components decoded at %r' % (asn1Spec,)
788 )
789
790 for component in decodeFun(substrate, componentType, **options):
791 if isinstance(component, SubstrateUnderrunError):
792 yield component
793
794 if not isDeterministic and namedTypes:
795 if isSetType:
796 idx = namedTypes.getPositionByType(component.effectiveTagSet)
797
798 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
799 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)
800
801 asn1Object.setComponentByPosition(
802 idx, component,
803 verifyConstraints=False,
804 matchTags=False, matchConstraints=False
805 )
806
807 seenIndices.add(idx)
808 idx += 1
809
810 if LOG:
811 LOG('seen component indices %s' % seenIndices)
812
813 if namedTypes:
814 if not namedTypes.requiredComponents.issubset(seenIndices):
815 raise error.PyAsn1Error(
816 'ASN.1 object %s has uninitialized '
817 'components' % asn1Object.__class__.__name__)
818
819 if namedTypes.hasOpenTypes:
820
821 openTypes = options.get('openTypes', {})
822
823 if LOG:
824 LOG('user-specified open types map:')
825
826 for k, v in openTypes.items():
827 LOG('%s -> %r' % (k, v))
828
829 if openTypes or options.get('decodeOpenTypes', False):
830
831 for idx, namedType in enumerate(namedTypes.namedTypes):
832 if not namedType.openType:
833 continue
834
835 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
836 continue
837
838 governingValue = asn1Object.getComponentByName(
839 namedType.openType.name
840 )
841
842 try:
843 openType = openTypes[governingValue]
844
845 except KeyError:
846
847 if LOG:
848 LOG('default open types map of component '
849 '"%s.%s" governed by component "%s.%s"'
850 ':' % (asn1Object.__class__.__name__,
851 namedType.name,
852 asn1Object.__class__.__name__,
853 namedType.openType.name))
854
855 for k, v in namedType.openType.items():
856 LOG('%s -> %r' % (k, v))
857
858 try:
859 openType = namedType.openType[governingValue]
860
861 except KeyError:
862 if LOG:
863 LOG('failed to resolve open type by governing '
864 'value %r' % (governingValue,))
865 continue
866
867 if LOG:
868 LOG('resolved open type %r by governing '
869 'value %r' % (openType, governingValue))
870
871 containerValue = asn1Object.getComponentByPosition(idx)
872
873 if containerValue.typeId in (
874 univ.SetOf.typeId, univ.SequenceOf.typeId):
875
876 for pos, containerElement in enumerate(
877 containerValue):
878
879 stream = asSeekableStream(containerValue[pos].asOctets())
880
881 for component in decodeFun(stream, asn1Spec=openType, **options):
882 if isinstance(component, SubstrateUnderrunError):
883 yield component
884
885 containerValue[pos] = component
886
887 else:
888 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
889
890 for component in decodeFun(stream, asn1Spec=openType, **options):
891 if isinstance(component, SubstrateUnderrunError):
892 yield component
893
894 asn1Object.setComponentByPosition(idx, component)
895
896 else:
897 inconsistency = asn1Object.isInconsistent
898 if inconsistency:
899 raise error.PyAsn1Error(
900 f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")
901
902 else:
903 componentType = asn1Spec.componentType
904
905 if LOG:
906 LOG('decoding type %r chosen by given `asn1Spec`' % componentType)
907
908 idx = 0
909
910 while substrate.tell() - original_position < length:
911 for component in decodeFun(substrate, componentType, **options):
912 if isinstance(component, SubstrateUnderrunError):
913 yield component
914
915 asn1Object.setComponentByPosition(
916 idx, component,
917 verifyConstraints=False,
918 matchTags=False, matchConstraints=False
919 )
920
921 idx += 1
922
923 yield asn1Object
924
925 def indefLenValueDecoder(self, substrate, asn1Spec,
926 tagSet=None, length=None, state=None,
927 decodeFun=None, substrateFun=None,
928 **options):
929 if tagSet[0].tagFormat != tag.tagFormatConstructed:
930 raise error.PyAsn1Error('Constructed tag format expected')
931
932 if substrateFun is not None:
933 if asn1Spec is not None:
934 asn1Object = asn1Spec.clone()
935
936 elif self.protoComponent is not None:
937 asn1Object = self.protoComponent.clone(tagSet=tagSet)
938
939 else:
940 asn1Object = self.protoRecordComponent, self.protoSequenceComponent
941
942 for chunk in substrateFun(asn1Object, substrate, length, options):
943 yield chunk
944
945 return
946
947 if asn1Spec is None:
948 for asn1Object in self._decodeComponentsSchemaless(
949 substrate, tagSet=tagSet, decodeFun=decodeFun,
950 length=length, **dict(options, allowEoo=True)):
951 if isinstance(asn1Object, SubstrateUnderrunError):
952 yield asn1Object
953
954 yield asn1Object
955
956 return
957
958 asn1Object = asn1Spec.clone()
959 asn1Object.clear()
960
961 options = self._passAsn1Object(asn1Object, options)
962
963 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):
964
965 namedTypes = asn1Object.componentType
966
967 isSetType = asn1Object.typeId == univ.Set.typeId
968 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault
969
970 if LOG:
971 LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
972 not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
973 asn1Spec))
974
975 seenIndices = set()
976
977 idx = 0
978
979 while True: # loop over components
980 if len(namedTypes) <= idx:
981 asn1Spec = None
982
983 elif isSetType:
984 asn1Spec = namedTypes.tagMapUnique
985
986 else:
987 try:
988 if isDeterministic:
989 asn1Spec = namedTypes[idx].asn1Object
990
991 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
992 asn1Spec = namedTypes.getTagMapNearPosition(idx)
993
994 else:
995 asn1Spec = namedTypes[idx].asn1Object
996
997 except IndexError:
998 raise error.PyAsn1Error(
999 'Excessive components decoded at %r' % (asn1Object,)
1000 )
1001
1002 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options):
1003
1004 if isinstance(component, SubstrateUnderrunError):
1005 yield component
1006
1007 if component is eoo.endOfOctets:
1008 break
1009
1010 if component is eoo.endOfOctets:
1011 break
1012
1013 if not isDeterministic and namedTypes:
1014 if isSetType:
1015 idx = namedTypes.getPositionByType(component.effectiveTagSet)
1016
1017 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
1018 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)
1019
1020 asn1Object.setComponentByPosition(
1021 idx, component,
1022 verifyConstraints=False,
1023 matchTags=False, matchConstraints=False
1024 )
1025
1026 seenIndices.add(idx)
1027 idx += 1
1028
1029 if LOG:
1030 LOG('seen component indices %s' % seenIndices)
1031
1032 if namedTypes:
1033 if not namedTypes.requiredComponents.issubset(seenIndices):
1034 raise error.PyAsn1Error(
1035 'ASN.1 object %s has uninitialized '
1036 'components' % asn1Object.__class__.__name__)
1037
1038 if namedTypes.hasOpenTypes:
1039
1040 openTypes = options.get('openTypes', {})
1041
1042 if LOG:
1043 LOG('user-specified open types map:')
1044
1045 for k, v in openTypes.items():
1046 LOG('%s -> %r' % (k, v))
1047
1048 if openTypes or options.get('decodeOpenTypes', False):
1049
1050 for idx, namedType in enumerate(namedTypes.namedTypes):
1051 if not namedType.openType:
1052 continue
1053
1054 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
1055 continue
1056
1057 governingValue = asn1Object.getComponentByName(
1058 namedType.openType.name
1059 )
1060
1061 try:
1062 openType = openTypes[governingValue]
1063
1064 except KeyError:
1065
1066 if LOG:
1067 LOG('default open types map of component '
1068 '"%s.%s" governed by component "%s.%s"'
1069 ':' % (asn1Object.__class__.__name__,
1070 namedType.name,
1071 asn1Object.__class__.__name__,
1072 namedType.openType.name))
1073
1074 for k, v in namedType.openType.items():
1075 LOG('%s -> %r' % (k, v))
1076
1077 try:
1078 openType = namedType.openType[governingValue]
1079
1080 except KeyError:
1081 if LOG:
1082 LOG('failed to resolve open type by governing '
1083 'value %r' % (governingValue,))
1084 continue
1085
1086 if LOG:
1087 LOG('resolved open type %r by governing '
1088 'value %r' % (openType, governingValue))
1089
1090 containerValue = asn1Object.getComponentByPosition(idx)
1091
1092 if containerValue.typeId in (
1093 univ.SetOf.typeId, univ.SequenceOf.typeId):
1094
1095 for pos, containerElement in enumerate(
1096 containerValue):
1097
1098 stream = asSeekableStream(containerValue[pos].asOctets())
1099
1100 for component in decodeFun(stream, asn1Spec=openType,
1101 **dict(options, allowEoo=True)):
1102 if isinstance(component, SubstrateUnderrunError):
1103 yield component
1104
1105 if component is eoo.endOfOctets:
1106 break
1107
1108 containerValue[pos] = component
1109
1110 else:
1111 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
1112 for component in decodeFun(stream, asn1Spec=openType,
1113 **dict(options, allowEoo=True)):
1114 if isinstance(component, SubstrateUnderrunError):
1115 yield component
1116
1117 if component is eoo.endOfOctets:
1118 break
1119
1120 asn1Object.setComponentByPosition(idx, component)
1121
1122 else:
1123 inconsistency = asn1Object.isInconsistent
1124 if inconsistency:
1125 raise error.PyAsn1Error(
1126 f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")
1127
1128 else:
1129 componentType = asn1Spec.componentType
1130
1131 if LOG:
1132 LOG('decoding type %r chosen by given `asn1Spec`' % componentType)
1133
1134 idx = 0
1135
1136 while True:
1137
1138 for component in decodeFun(
1139 substrate, componentType, allowEoo=True, **options):
1140
1141 if isinstance(component, SubstrateUnderrunError):
1142 yield component
1143
1144 if component is eoo.endOfOctets:
1145 break
1146
1147 if component is eoo.endOfOctets:
1148 break
1149
1150 asn1Object.setComponentByPosition(
1151 idx, component,
1152 verifyConstraints=False,
1153 matchTags=False, matchConstraints=False
1154 )
1155
1156 idx += 1
1157
1158 yield asn1Object
1159
1160
1161class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
1162 protoRecordComponent = univ.Sequence()
1163 protoSequenceComponent = univ.SequenceOf()
1164
1165
1166class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
1167 protoComponent = univ.Sequence()
1168
1169
1170class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
1171 protoComponent = univ.SequenceOf()
1172
1173
1174class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
1175 protoRecordComponent = univ.Set()
1176 protoSequenceComponent = univ.SetOf()
1177
1178
1179class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
1180 protoComponent = univ.Set()
1181
1182
1183class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
1184 protoComponent = univ.SetOf()
1185
1186
1187class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
1188 protoComponent = univ.Choice()
1189
1190 def valueDecoder(self, substrate, asn1Spec,
1191 tagSet=None, length=None, state=None,
1192 decodeFun=None, substrateFun=None,
1193 **options):
1194 if asn1Spec is None:
1195 asn1Object = self.protoComponent.clone(tagSet=tagSet)
1196
1197 else:
1198 asn1Object = asn1Spec.clone()
1199
1200 if substrateFun:
1201 for chunk in substrateFun(asn1Object, substrate, length, options):
1202 yield chunk
1203
1204 return
1205
1206 options = self._passAsn1Object(asn1Object, options)
1207
1208 if asn1Object.tagSet == tagSet:
1209 if LOG:
1210 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))
1211
1212 for component in decodeFun(
1213 substrate, asn1Object.componentTagMap, **options):
1214 if isinstance(component, SubstrateUnderrunError):
1215 yield component
1216
1217 else:
1218 if LOG:
1219 LOG('decoding %s as untagged CHOICE' % (tagSet,))
1220
1221 for component in decodeFun(
1222 substrate, asn1Object.componentTagMap, tagSet, length,
1223 state, **options):
1224 if isinstance(component, SubstrateUnderrunError):
1225 yield component
1226
1227 effectiveTagSet = component.effectiveTagSet
1228
1229 if LOG:
1230 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))
1231
1232 asn1Object.setComponentByType(
1233 effectiveTagSet, component,
1234 verifyConstraints=False,
1235 matchTags=False, matchConstraints=False,
1236 innerFlag=False
1237 )
1238
1239 yield asn1Object
1240
1241 def indefLenValueDecoder(self, substrate, asn1Spec,
1242 tagSet=None, length=None, state=None,
1243 decodeFun=None, substrateFun=None,
1244 **options):
1245 if asn1Spec is None:
1246 asn1Object = self.protoComponent.clone(tagSet=tagSet)
1247
1248 else:
1249 asn1Object = asn1Spec.clone()
1250
1251 if substrateFun:
1252 for chunk in substrateFun(asn1Object, substrate, length, options):
1253 yield chunk
1254
1255 return
1256
1257 options = self._passAsn1Object(asn1Object, options)
1258
1259 isTagged = asn1Object.tagSet == tagSet
1260
1261 if LOG:
1262 LOG('decoding %s as %stagged CHOICE' % (
1263 tagSet, isTagged and 'explicitly ' or 'un'))
1264
1265 while True:
1266
1267 if isTagged:
1268 iterator = decodeFun(
1269 substrate, asn1Object.componentType.tagMapUnique,
1270 **dict(options, allowEoo=True))
1271
1272 else:
1273 iterator = decodeFun(
1274 substrate, asn1Object.componentType.tagMapUnique,
1275 tagSet, length, state, **dict(options, allowEoo=True))
1276
1277 for component in iterator:
1278
1279 if isinstance(component, SubstrateUnderrunError):
1280 yield component
1281
1282 if component is eoo.endOfOctets:
1283 break
1284
1285 effectiveTagSet = component.effectiveTagSet
1286
1287 if LOG:
1288 LOG('decoded component %s, effective tag set '
1289 '%s' % (component, effectiveTagSet))
1290
1291 asn1Object.setComponentByType(
1292 effectiveTagSet, component,
1293 verifyConstraints=False,
1294 matchTags=False, matchConstraints=False,
1295 innerFlag=False
1296 )
1297
1298 if not isTagged:
1299 break
1300
1301 if not isTagged or component is eoo.endOfOctets:
1302 break
1303
1304 yield asn1Object
1305
1306
1307class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
1308 protoComponent = univ.Any()
1309
1310 def valueDecoder(self, substrate, asn1Spec,
1311 tagSet=None, length=None, state=None,
1312 decodeFun=None, substrateFun=None,
1313 **options):
1314 if asn1Spec is None:
1315 isUntagged = True
1316
1317 elif asn1Spec.__class__ is tagmap.TagMap:
1318 isUntagged = tagSet not in asn1Spec.tagMap
1319
1320 else:
1321 isUntagged = tagSet != asn1Spec.tagSet
1322
1323 if isUntagged:
1324 fullPosition = substrate.markedPosition
1325 currentPosition = substrate.tell()
1326
1327 substrate.seek(fullPosition, os.SEEK_SET)
1328 length += currentPosition - fullPosition
1329
1330 if LOG:
1331 for chunk in peekIntoStream(substrate, length):
1332 if isinstance(chunk, SubstrateUnderrunError):
1333 yield chunk
1334 LOG('decoding as untagged ANY, substrate '
1335 '%s' % debug.hexdump(chunk))
1336
1337 if substrateFun:
1338 for chunk in substrateFun(
1339 self._createComponent(asn1Spec, tagSet, noValue, **options),
1340 substrate, length, options):
1341 yield chunk
1342
1343 return
1344
1345 for chunk in readFromStream(substrate, length, options):
1346 if isinstance(chunk, SubstrateUnderrunError):
1347 yield chunk
1348
1349 yield self._createComponent(asn1Spec, tagSet, chunk, **options)
1350
1351 def indefLenValueDecoder(self, substrate, asn1Spec,
1352 tagSet=None, length=None, state=None,
1353 decodeFun=None, substrateFun=None,
1354 **options):
1355 if asn1Spec is None:
1356 isTagged = False
1357
1358 elif asn1Spec.__class__ is tagmap.TagMap:
1359 isTagged = tagSet in asn1Spec.tagMap
1360
1361 else:
1362 isTagged = tagSet == asn1Spec.tagSet
1363
1364 if isTagged:
1365 # tagged Any type -- consume header substrate
1366 chunk = b''
1367
1368 if LOG:
1369 LOG('decoding as tagged ANY')
1370
1371 else:
1372 # TODO: Seems not to be tested
1373 fullPosition = substrate.markedPosition
1374 currentPosition = substrate.tell()
1375
1376 substrate.seek(fullPosition, os.SEEK_SET)
1377 for chunk in readFromStream(substrate, currentPosition - fullPosition, options):
1378 if isinstance(chunk, SubstrateUnderrunError):
1379 yield chunk
1380
1381 if LOG:
1382 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk))
1383
1384 # Any components do not inherit initial tag
1385 asn1Spec = self.protoComponent
1386
1387 if substrateFun and substrateFun is not self.substrateCollector:
1388 asn1Object = self._createComponent(
1389 asn1Spec, tagSet, noValue, **options)
1390
1391 for chunk in substrateFun(
1392 asn1Object, chunk + substrate, length + len(chunk), options):
1393 yield chunk
1394
1395 return
1396
1397 if LOG:
1398 LOG('assembling constructed serialization')
1399
1400 # All inner fragments are of the same type, treat them as octet string
1401 substrateFun = self.substrateCollector
1402
1403 while True: # loop over fragments
1404
1405 for component in decodeFun(
1406 substrate, asn1Spec, substrateFun=substrateFun,
1407 allowEoo=True, **options):
1408
1409 if isinstance(component, SubstrateUnderrunError):
1410 yield component
1411
1412 if component is eoo.endOfOctets:
1413 break
1414
1415 if component is eoo.endOfOctets:
1416 break
1417
1418 chunk += component
1419
1420 if substrateFun:
1421 yield chunk # TODO: Weird
1422
1423 else:
1424 yield self._createComponent(asn1Spec, tagSet, chunk, **options)
1425
1426
1427# character string types
1428class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
1429 protoComponent = char.UTF8String()
1430
1431
1432class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
1433 protoComponent = char.NumericString()
1434
1435
1436class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
1437 protoComponent = char.PrintableString()
1438
1439
1440class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
1441 protoComponent = char.TeletexString()
1442
1443
1444class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
1445 protoComponent = char.VideotexString()
1446
1447
1448class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
1449 protoComponent = char.IA5String()
1450
1451
1452class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
1453 protoComponent = char.GraphicString()
1454
1455
1456class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
1457 protoComponent = char.VisibleString()
1458
1459
1460class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
1461 protoComponent = char.GeneralString()
1462
1463
1464class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
1465 protoComponent = char.UniversalString()
1466
1467
1468class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
1469 protoComponent = char.BMPString()
1470
1471
1472# "useful" types
1473class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
1474 protoComponent = useful.ObjectDescriptor()
1475
1476
1477class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
1478 protoComponent = useful.GeneralizedTime()
1479
1480
1481class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
1482 protoComponent = useful.UTCTime()
1483
1484
1485TAG_MAP = {
1486 univ.Integer.tagSet: IntegerPayloadDecoder(),
1487 univ.Boolean.tagSet: BooleanPayloadDecoder(),
1488 univ.BitString.tagSet: BitStringPayloadDecoder(),
1489 univ.OctetString.tagSet: OctetStringPayloadDecoder(),
1490 univ.Null.tagSet: NullPayloadDecoder(),
1491 univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
1492 univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
1493 univ.Enumerated.tagSet: IntegerPayloadDecoder(),
1494 univ.Real.tagSet: RealPayloadDecoder(),
1495 univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf
1496 univ.Set.tagSet: SetOrSetOfPayloadDecoder(), # conflicts with SetOf
1497 univ.Choice.tagSet: ChoicePayloadDecoder(), # conflicts with Any
1498 # character string types
1499 char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
1500 char.NumericString.tagSet: NumericStringPayloadDecoder(),
1501 char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
1502 char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
1503 char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
1504 char.IA5String.tagSet: IA5StringPayloadDecoder(),
1505 char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
1506 char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
1507 char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
1508 char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
1509 char.BMPString.tagSet: BMPStringPayloadDecoder(),
1510 # useful types
1511 useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
1512 useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
1513 useful.UTCTime.tagSet: UTCTimePayloadDecoder()
1514}
1515
1516# Type-to-codec map for ambiguous ASN.1 types
1517TYPE_MAP = {
1518 univ.Set.typeId: SetPayloadDecoder(),
1519 univ.SetOf.typeId: SetOfPayloadDecoder(),
1520 univ.Sequence.typeId: SequencePayloadDecoder(),
1521 univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
1522 univ.Choice.typeId: ChoicePayloadDecoder(),
1523 univ.Any.typeId: AnyPayloadDecoder()
1524}
1525
1526# Put in non-ambiguous types for faster codec lookup
1527for typeDecoder in TAG_MAP.values():
1528 if typeDecoder.protoComponent is not None:
1529 typeId = typeDecoder.protoComponent.__class__.typeId
1530 if typeId is not None and typeId not in TYPE_MAP:
1531 TYPE_MAP[typeId] = typeDecoder
1532
1533
1534(stDecodeTag,
1535 stDecodeLength,
1536 stGetValueDecoder,
1537 stGetValueDecoderByAsn1Spec,
1538 stGetValueDecoderByTag,
1539 stTryAsExplicitTag,
1540 stDecodeValue,
1541 stDumpRawValue,
1542 stErrorCondition,
1543 stStop) = [x for x in range(10)]
1544
1545
1546EOO_SENTINEL = bytes((0, 0))
1547
1548
1549class SingleItemDecoder(object):
1550 defaultErrorState = stErrorCondition
1551 #defaultErrorState = stDumpRawValue
1552 defaultRawDecoder = AnyPayloadDecoder()
1553
1554 supportIndefLength = True
1555
1556 TAG_MAP = TAG_MAP
1557 TYPE_MAP = TYPE_MAP
1558
1559 def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
1560 self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
1561 self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP
1562
1563 # Tag & TagSet objects caches
1564 self._tagCache = {}
1565 self._tagSetCache = {}
1566
1567 def __call__(self, substrate, asn1Spec=None,
1568 tagSet=None, length=None, state=stDecodeTag,
1569 decodeFun=None, substrateFun=None,
1570 **options):
1571
1572 _nestingLevel = options.get('_nestingLevel', 0)
1573
1574 if _nestingLevel > MAX_NESTING_DEPTH:
1575 raise error.PyAsn1Error(
1576 'ASN.1 structure nesting depth exceeds limit (%d)' % MAX_NESTING_DEPTH
1577 )
1578
1579 options['_nestingLevel'] = _nestingLevel + 1
1580
1581 allowEoo = options.pop('allowEoo', False)
1582
1583 if LOG:
1584 LOG('decoder called at scope %s with state %d, working with up '
1585 'to %s octets of substrate: '
1586 '%s' % (debug.scope, state, length, substrate))
1587
1588 # Look for end-of-octets sentinel
1589 if allowEoo and self.supportIndefLength:
1590
1591 for eoo_candidate in readFromStream(substrate, 2, options):
1592 if isinstance(eoo_candidate, SubstrateUnderrunError):
1593 yield eoo_candidate
1594
1595 if eoo_candidate == EOO_SENTINEL:
1596 if LOG:
1597 LOG('end-of-octets sentinel found')
1598 yield eoo.endOfOctets
1599 return
1600
1601 else:
1602 substrate.seek(-2, os.SEEK_CUR)
1603
1604 tagMap = self._tagMap
1605 typeMap = self._typeMap
1606 tagCache = self._tagCache
1607 tagSetCache = self._tagSetCache
1608
1609 value = noValue
1610
1611 substrate.markedPosition = substrate.tell()
1612
1613 while state is not stStop:
1614
1615 if state is stDecodeTag:
1616 # Decode tag
1617 isShortTag = True
1618
1619 for firstByte in readFromStream(substrate, 1, options):
1620 if isinstance(firstByte, SubstrateUnderrunError):
1621 yield firstByte
1622
1623 firstOctet = ord(firstByte)
1624
1625 try:
1626 lastTag = tagCache[firstOctet]
1627
1628 except KeyError:
1629 integerTag = firstOctet
1630 tagClass = integerTag & 0xC0
1631 tagFormat = integerTag & 0x20
1632 tagId = integerTag & 0x1F
1633
1634 if tagId == 0x1F:
1635 isShortTag = False
1636 lengthOctetIdx = 0
1637 tagId = 0
1638
1639 while True:
1640 for integerByte in readFromStream(substrate, 1, options):
1641 if isinstance(integerByte, SubstrateUnderrunError):
1642 yield integerByte
1643
1644 if not integerByte:
1645 raise error.SubstrateUnderrunError(
1646 'Short octet stream on long tag decoding'
1647 )
1648
1649 integerTag = ord(integerByte)
1650 lengthOctetIdx += 1
1651 tagId <<= 7
1652 tagId |= (integerTag & 0x7F)
1653
1654 if not integerTag & 0x80:
1655 break
1656
1657 lastTag = tag.Tag(
1658 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
1659 )
1660
1661 if isShortTag:
1662 # cache short tags
1663 tagCache[firstOctet] = lastTag
1664
1665 if tagSet is None:
1666 if isShortTag:
1667 try:
1668 tagSet = tagSetCache[firstOctet]
1669
1670 except KeyError:
1671 # base tag not recovered
1672 tagSet = tag.TagSet((), lastTag)
1673 tagSetCache[firstOctet] = tagSet
1674 else:
1675 tagSet = tag.TagSet((), lastTag)
1676
1677 else:
1678 tagSet = lastTag + tagSet
1679
1680 state = stDecodeLength
1681
1682 if LOG:
1683 LOG('tag decoded into %s, decoding length' % tagSet)
1684
1685 if state is stDecodeLength:
1686 # Decode length
1687 for firstOctet in readFromStream(substrate, 1, options):
1688 if isinstance(firstOctet, SubstrateUnderrunError):
1689 yield firstOctet
1690
1691 firstOctet = ord(firstOctet)
1692
1693 if firstOctet < 128:
1694 length = firstOctet
1695
1696 elif firstOctet > 128:
1697 size = firstOctet & 0x7F
1698
1699 if size > MAX_LENGTH_OCTETS:
1700 raise error.PyAsn1Error(
1701 'BER length field size %d exceeds limit (%d)' % (
1702 size, MAX_LENGTH_OCTETS)
1703 )
1704
1705 # encoded in size bytes
1706 for encodedLength in readFromStream(substrate, size, options):
1707 if isinstance(encodedLength, SubstrateUnderrunError):
1708 yield encodedLength
1709 encodedLength = list(encodedLength)
1710 if len(encodedLength) != size:
1711 raise error.SubstrateUnderrunError(
1712 '%s<%s at %s' % (size, len(encodedLength), tagSet)
1713 )
1714
1715 length = 0
1716 for lengthOctet in encodedLength:
1717 length <<= 8
1718 length |= lengthOctet
1719 size += 1
1720
1721 else: # 128 means indefinite
1722 length = -1
1723
1724 if length == -1 and not self.supportIndefLength:
1725 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')
1726
1727 state = stGetValueDecoder
1728
1729 if LOG:
1730 LOG('value length decoded into %d' % length)
1731
1732 if state is stGetValueDecoder:
1733 if asn1Spec is None:
1734 state = stGetValueDecoderByTag
1735
1736 else:
1737 state = stGetValueDecoderByAsn1Spec
1738 #
1739 # There're two ways of creating subtypes in ASN.1 what influences
1740 # decoder operation. These methods are:
1741 # 1) Either base types used in or no IMPLICIT tagging has been
1742 # applied on subtyping.
1743 # 2) Subtype syntax drops base type information (by means of
1744 # IMPLICIT tagging.
1745 # The first case allows for complete tag recovery from substrate
1746 # while the second one requires original ASN.1 type spec for
1747 # decoding.
1748 #
1749 # In either case a set of tags (tagSet) is coming from substrate
1750 # in an incremental, tag-by-tag fashion (this is the case of
1751 # EXPLICIT tag which is most basic). Outermost tag comes first
1752 # from the wire.
1753 #
1754 if state is stGetValueDecoderByTag:
1755 try:
1756 concreteDecoder = tagMap[tagSet]
1757
1758 except KeyError:
1759 concreteDecoder = None
1760
1761 if concreteDecoder:
1762 state = stDecodeValue
1763
1764 else:
1765 try:
1766 concreteDecoder = tagMap[tagSet[:1]]
1767
1768 except KeyError:
1769 concreteDecoder = None
1770
1771 if concreteDecoder:
1772 state = stDecodeValue
1773 else:
1774 state = stTryAsExplicitTag
1775
1776 if LOG:
1777 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
1778 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)
1779
1780 if state is stGetValueDecoderByAsn1Spec:
1781
1782 if asn1Spec.__class__ is tagmap.TagMap:
1783 try:
1784 chosenSpec = asn1Spec[tagSet]
1785
1786 except KeyError:
1787 chosenSpec = None
1788
1789 if LOG:
1790 LOG('candidate ASN.1 spec is a map of:')
1791
1792 for firstOctet, v in asn1Spec.presentTypes.items():
1793 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
1794
1795 if asn1Spec.skipTypes:
1796 LOG('but neither of: ')
1797 for firstOctet, v in asn1Spec.skipTypes.items():
1798 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
1799 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))
1800
1801 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
1802 chosenSpec = asn1Spec
1803 if LOG:
1804 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)
1805
1806 else:
1807 chosenSpec = None
1808
1809 if chosenSpec is not None:
1810 try:
1811 # ambiguous type or just faster codec lookup
1812 concreteDecoder = typeMap[chosenSpec.typeId]
1813
1814 if LOG:
1815 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))
1816
1817 except KeyError:
1818 # use base type for codec lookup to recover untagged types
1819 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
1820 try:
1821 # base type or tagged subtype
1822 concreteDecoder = tagMap[baseTagSet]
1823
1824 if LOG:
1825 LOG('value decoder chosen by base %s' % (baseTagSet,))
1826
1827 except KeyError:
1828 concreteDecoder = None
1829
1830 if concreteDecoder:
1831 asn1Spec = chosenSpec
1832 state = stDecodeValue
1833
1834 else:
1835 state = stTryAsExplicitTag
1836
1837 else:
1838 concreteDecoder = None
1839 state = stTryAsExplicitTag
1840
1841 if LOG:
1842 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
1843 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)
1844
1845 if state is stDecodeValue:
1846 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this
1847 def substrateFun(asn1Object, _substrate, _length, _options):
1848 """Legacy hack to keep the recursiveFlag=False option supported.
1849
1850 The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
1851 of the old recursiveFlag=False option. Users should pass their callback instead of using
1852 recursiveFlag.
1853 """
1854 yield asn1Object
1855
1856 original_position = substrate.tell()
1857
1858 if length == -1: # indef length
1859 for value in concreteDecoder.indefLenValueDecoder(
1860 substrate, asn1Spec,
1861 tagSet, length, stGetValueDecoder,
1862 self, substrateFun, **options):
1863 if isinstance(value, SubstrateUnderrunError):
1864 yield value
1865
1866 else:
1867 for value in concreteDecoder.valueDecoder(
1868 substrate, asn1Spec,
1869 tagSet, length, stGetValueDecoder,
1870 self, substrateFun, **options):
1871 if isinstance(value, SubstrateUnderrunError):
1872 yield value
1873
1874 bytesRead = substrate.tell() - original_position
1875 if not substrateFun and bytesRead != length:
1876 raise PyAsn1Error(
1877 "Read %s bytes instead of expected %s." % (bytesRead, length))
1878 elif substrateFun and bytesRead > length:
1879 # custom substrateFun may be used for partial decoding, reading less is expected there
1880 raise PyAsn1Error(
1881 "Read %s bytes are more than expected %s." % (bytesRead, length))
1882
1883 if LOG:
1884 LOG('codec %s yields type %s, value:\n%s\n...' % (
1885 concreteDecoder.__class__.__name__, value.__class__.__name__,
1886 isinstance(value, base.Asn1Item) and value.prettyPrint() or value))
1887
1888 state = stStop
1889 break
1890
1891 if state is stTryAsExplicitTag:
1892 if (tagSet and
1893 tagSet[0].tagFormat == tag.tagFormatConstructed and
1894 tagSet[0].tagClass != tag.tagClassUniversal):
1895 # Assume explicit tagging
1896 concreteDecoder = rawPayloadDecoder
1897 state = stDecodeValue
1898
1899 else:
1900 concreteDecoder = None
1901 state = self.defaultErrorState
1902
1903 if LOG:
1904 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))
1905
1906 if state is stDumpRawValue:
1907 concreteDecoder = self.defaultRawDecoder
1908
1909 if LOG:
1910 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)
1911
1912 state = stDecodeValue
1913
1914 if state is stErrorCondition:
1915 raise error.PyAsn1Error(
1916 '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
1917 )
1918
1919 if LOG:
1920 debug.scope.pop()
1921 LOG('decoder left scope %s, call completed' % debug.scope)
1922
1923 yield value
1924
1925
1926class StreamingDecoder(object):
1927 """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects.
1928
1929 On each iteration, consume whatever BER/CER/DER serialization is
1930 available in the `substrate` stream-like object and turns it into
1931 one or more, possibly nested, ASN.1 objects.
1932
1933 Parameters
1934 ----------
1935 substrate: :py:class:`file`, :py:class:`io.BytesIO`
1936 BER/CER/DER serialization in form of a byte stream
1937
1938 Keyword Args
1939 ------------
1940 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
1941 A pyasn1 type object to act as a template guiding the decoder.
1942 Depending on the ASN.1 structure being decoded, `asn1Spec` may
1943 or may not be required. One of the reasons why `asn1Spec` may
1944 me required is that ASN.1 structure is encoded in the *IMPLICIT*
1945 tagging mode.
1946
1947 Yields
1948 ------
1949 : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
1950 Decoded ASN.1 object (possibly, nested) or
1951 :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
1952 insufficient BER/CER/DER serialization on input to fully recover ASN.1
1953 objects from it.
1954
1955 In the latter case the caller is advised to ensure some more data in
1956 the input stream, then call the iterator again. The decoder will resume
1957 the decoding process using the newly arrived data.
1958
1959 The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError`
1960 object might hold a reference to the partially populated ASN.1 object
1961 being reconstructed.
1962
1963 Raises
1964 ------
1965 ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
1966 `PyAsn1Error` on deserialization error, `EndOfStreamError` on
1967 premature stream closure.
1968
1969 Examples
1970 --------
1971 Decode BER serialisation without ASN.1 schema
1972
1973 .. code-block:: pycon
1974
1975 >>> stream = io.BytesIO(
1976 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
1977 >>>
1978 >>> for asn1Object in StreamingDecoder(stream):
1979 ... print(asn1Object)
1980 >>>
1981 SequenceOf:
1982 1 2 3
1983
1984 Decode BER serialisation with ASN.1 schema
1985
1986 .. code-block:: pycon
1987
1988 >>> stream = io.BytesIO(
1989 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
1990 >>>
1991 >>> schema = SequenceOf(componentType=Integer())
1992 >>>
1993 >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
1994 >>> for asn1Object in decoder:
1995 ... print(asn1Object)
1996 >>>
1997 SequenceOf:
1998 1 2 3
1999 """
2000
2001 SINGLE_ITEM_DECODER = SingleItemDecoder
2002
2003 def __init__(self, substrate, asn1Spec=None, **options):
2004 self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
2005 self._substrate = asSeekableStream(substrate)
2006 self._asn1Spec = asn1Spec
2007 self._options = options
2008
2009 def __iter__(self):
2010 while True:
2011 for asn1Object in self._singleItemDecoder(
2012 self._substrate, self._asn1Spec, **self._options):
2013 yield asn1Object
2014
2015 for chunk in isEndOfStream(self._substrate):
2016 if isinstance(chunk, SubstrateUnderrunError):
2017 yield
2018
2019 break
2020
2021 if chunk:
2022 break
2023
2024
2025class Decoder(object):
2026 """Create a BER decoder object.
2027
2028 Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object.
2029 """
2030 STREAMING_DECODER = StreamingDecoder
2031
2032 @classmethod
2033 def __call__(cls, substrate, asn1Spec=None, **options):
2034 """Turns BER/CER/DER octet stream into an ASN.1 object.
2035
2036 Takes BER/CER/DER octet-stream in form of :py:class:`bytes`
2037 and decode it into an ASN.1 object
2038 (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
2039 may be a scalar or an arbitrary nested structure.
2040
2041 Parameters
2042 ----------
2043 substrate: :py:class:`bytes`
2044 BER/CER/DER octet-stream to parse
2045
2046 Keyword Args
2047 ------------
2048 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
2049 A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
2050 derivative) to act as a template guiding the decoder.
2051 Depending on the ASN.1 structure being decoded, `asn1Spec` may or
2052 may not be required. Most common reason for it to require is that
2053 ASN.1 structure is encoded in *IMPLICIT* tagging mode.
2054
2055 substrateFun: :py:class:`Union[
2056 Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
2057 Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
2058 Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
2059 Generator[Union[pyasn1.type.base.PyAsn1Item,
2060 pyasn1.error.SubstrateUnderrunError],
2061 None, None]]
2062 ]`
2063 User callback meant to generalize special use cases like non-recursive or
2064 partial decoding. A 3-arg non-streaming variant is supported for backwards
2065 compatiblilty in addition to the newer 4-arg streaming variant.
2066 The callback will receive the uninitialized object recovered from substrate
2067 as 1st argument, the uninterpreted payload as 2nd argument, and the length
2068 of the uninterpreted payload as 3rd argument. The streaming variant will
2069 additionally receive the decode(..., **options) kwargs as 4th argument.
2070 The non-streaming variant shall return an object that will be propagated
2071 as decode() return value as 1st item, and the remainig payload for further
2072 decode passes as 2nd item.
2073 The streaming variant shall yield an object that will be propagated as
2074 decode() return value, and leave the remaining payload in the stream.
2075
2076 Returns
2077 -------
2078 : :py:class:`tuple`
2079 A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object
2080 recovered from BER/CER/DER substrate and the unprocessed trailing
2081 portion of the `substrate` (may be empty)
2082
2083 Raises
2084 ------
2085 : :py:class:`~pyasn1.error.PyAsn1Error`
2086 :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient
2087 input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error.
2088
2089 Examples
2090 --------
2091 Decode BER/CER/DER serialisation without ASN.1 schema
2092
2093 .. code-block:: pycon
2094
2095 >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
2096 >>> str(s)
2097 SequenceOf:
2098 1 2 3
2099
2100 Decode BER/CER/DER serialisation with ASN.1 schema
2101
2102 .. code-block:: pycon
2103
2104 >>> seq = SequenceOf(componentType=Integer())
2105 >>> s, unprocessed = decode(
2106 b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
2107 >>> str(s)
2108 SequenceOf:
2109 1 2 3
2110
2111 """
2112 substrate = asSeekableStream(substrate)
2113
2114 if "substrateFun" in options:
2115 origSubstrateFun = options["substrateFun"]
2116
2117 def substrateFunWrapper(asn1Object, substrate, length, options=None):
2118 """Support both 0.4 and 0.5 style APIs.
2119
2120 substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible,
2121 we first try if we received a streaming user callback. If that fails,we assume we've received a
2122 non-streaming v0.4 user callback and convert it for streaming on the fly
2123 """
2124 try:
2125 substrate_gen = origSubstrateFun(asn1Object, substrate, length, options)
2126 except TypeError as _value:
2127 if _value.__traceback__.tb_next:
2128 # Traceback depth > 1 means TypeError from inside user provided function
2129 raise
2130 # invariant maintained at Decoder.__call__ entry
2131 assert isinstance(substrate, io.BytesIO) # nosec assert_used
2132 substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length)
2133 for value in substrate_gen:
2134 yield value
2135
2136 options["substrateFun"] = substrateFunWrapper
2137
2138 streamingDecoder = cls.STREAMING_DECODER(
2139 substrate, asn1Spec, **options)
2140
2141 for asn1Object in streamingDecoder:
2142 if isinstance(asn1Object, SubstrateUnderrunError):
2143 raise error.SubstrateUnderrunError('Short substrate on input')
2144
2145 try:
2146 tail = next(readFromStream(substrate))
2147
2148 except error.EndOfStreamError:
2149 tail = b''
2150
2151 return asn1Object, tail
2152
2153 @staticmethod
2154 def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length):
2155 substrate_bytes = substrate.read()
2156 if length == -1:
2157 length = len(substrate_bytes)
2158 value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length)
2159 nbytes = substrate.write(nextSubstrate)
2160 substrate.truncate()
2161 substrate.seek(-nbytes, os.SEEK_CUR)
2162 yield value
2163
2164#: Turns BER octet stream into an ASN.1 object.
2165#:
2166#: Takes BER octet-stream and decode it into an ASN.1 object
2167#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
2168#: may be a scalar or an arbitrary nested structure.
2169#:
2170#: Parameters
2171#: ----------
2172#: substrate: :py:class:`bytes`
2173#: BER octet-stream
2174#:
2175#: Keyword Args
2176#: ------------
2177#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
2178#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
2179#: being decoded, *asn1Spec* may or may not be required. Most common reason for
2180#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode.
2181#:
2182#: Returns
2183#: -------
2184#: : :py:class:`tuple`
2185#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
2186#: and the unprocessed trailing portion of the *substrate* (may be empty)
2187#:
2188#: Raises
2189#: ------
2190#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
2191#: On decoding errors
2192#:
2193#: Notes
2194#: -----
2195#: This function is deprecated. Please use :py:class:`Decoder` or
2196#: :py:class:`StreamingDecoder` class instance.
2197#:
2198#: Examples
2199#: --------
2200#: Decode BER serialisation without ASN.1 schema
2201#:
2202#: .. code-block:: pycon
2203#:
2204#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
2205#: >>> str(s)
2206#: SequenceOf:
2207#: 1 2 3
2208#:
2209#: Decode BER serialisation with ASN.1 schema
2210#:
2211#: .. code-block:: pycon
2212#:
2213#: >>> seq = SequenceOf(componentType=Integer())
2214#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
2215#: >>> str(s)
2216#: SequenceOf:
2217#: 1 2 3
2218#:
2219decode = Decoder()
2220
2221def __getattr__(attr: str):
2222 if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
2223 warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning, stacklevel=2)
2224 return globals()[newAttr]
2225 raise AttributeError(attr)