1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import io
8import os
9import sys
10import warnings
11
12from pyasn1 import debug
13from pyasn1 import error
14from pyasn1.codec.ber import eoo
15from pyasn1.codec.streaming import asSeekableStream
16from pyasn1.codec.streaming import isEndOfStream
17from pyasn1.codec.streaming import peekIntoStream
18from pyasn1.codec.streaming import readFromStream
19from pyasn1.compat import _MISSING
20from pyasn1.error import PyAsn1Error
21from pyasn1.type import base
22from pyasn1.type import char
23from pyasn1.type import tag
24from pyasn1.type import tagmap
25from pyasn1.type import univ
26from pyasn1.type import useful
27
# Public names exported by a star-import of this module.
__all__ = ['StreamingDecoder', 'Decoder', 'decode']

# Debug logger for this module, registered with the decoder debug flag.
# Every call site below guards its use with `if LOG:`.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)

# Local alias of the sentinel defined in `pyasn1.type.base`.
noValue = base.noValue

# Local alias; used below in `isinstance` checks on items read from the
# substrate.
SubstrateUnderrunError = error.SubstrateUnderrunError

# Maximum number of continuation octets (high-bit set) allowed per OID arc.
# 20 octets allows up to 140-bit integers, supporting UUID-based OIDs
MAX_OID_ARC_CONTINUATION_OCTETS = 20
39
40
class AbstractPayloadDecoder(object):
    """Root of the payload decoder hierarchy in this module.

    Both decoding entry points raise `PyAsn1Error` at this level.
    """

    # Prototype ASN.1 object; unset here.
    protoComponent = None

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode value with fixed byte length.

        The decoder is allowed to consume as many bytes as necessary.

        Always raises `PyAsn1Error` in this base implementation.
        """
        # TODO: Seems more like an NotImplementedError?
        message = 'SingleItemDecoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode value with undefined length.

        The decoder is allowed to consume as many bytes as necessary.

        Always raises `PyAsn1Error` in this base implementation.
        """
        # TODO: Seems more like an NotImplementedError?
        message = 'Indefinite length mode decoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    @staticmethod
    def _passAsn1Object(asn1Object, options):
        """Store `asn1Object` in `options` unless one is already there.

        The mapping is modified in place and also returned; an existing
        'asn1Object' entry is never overwritten.
        """
        options.setdefault('asn1Object', asn1Object)
        return options
70
71
class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
    """Helpers shared by the simple-type payload decoders below."""

    @staticmethod
    def substrateCollector(asn1Object, substrate, length, options):
        """Yield everything `readFromStream` produces for `length` octets.

        `asn1Object` is accepted for signature compatibility and not used.
        """
        yield from readFromStream(substrate, length, options)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Turn a decoded `value` into the object handed to the caller.

        * truthy `native` option: `value` is returned untouched;
        * no `asn1Spec`: a clone of `protoComponent` with `value`/`tagSet`;
        * `value` is `noValue`: `asn1Spec` itself;
        * otherwise: `asn1Spec.clone(value)`.
        """
        if options.get('native'):
            return value

        if asn1Spec is None:
            return self.protoComponent.clone(value, tagSet=tagSet)

        if value is noValue:
            return asn1Spec

        return asn1Spec.clone(value)
87
88
class RawPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder that forwards the work to `substrateFun` or `decodeFun`."""

    protoComponent = univ.Any('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield items for a definite-length value.

        With `substrateFun` set, yields whatever it produces for a freshly
        created component; otherwise yields everything from `decodeFun`.
        """
        if not substrateFun:
            yield from decodeFun(substrate, asn1Spec, tagSet, length, **options)
            return

        asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
        yield from substrateFun(asn1Object, substrate, length, options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Yield items for an indefinite-length value.

        With `substrateFun` set, behaves like `valueDecoder`. Otherwise
        keeps calling `decodeFun` (with `allowEoo=True`) and yields its
        items until the end-of-octets marker shows up.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        while True:
            items = decodeFun(
                substrate, asn1Spec, tagSet, length,
                allowEoo=True, **options)

            for item in items:
                if item is eoo.endOfOctets:
                    return

                yield item


# Module-level instance of the raw decoder.
rawPayloadDecoder = RawPayloadDecoder()
131
132
class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for INTEGER payloads (big-endian, signed)."""

    protoComponent = univ.Integer(0)

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Read `length` octets and yield the integer they encode.

        `SubstrateUnderrunError` items coming from the stream reader are
        yielded as-is. An empty payload decodes to 0.

        Raises `PyAsn1Error` if the tag is not in simple format.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for octets in readFromStream(substrate, length, options):
            if isinstance(octets, SubstrateUnderrunError):
                yield octets

        # `octets` is the last item the reader produced, i.e. the payload.
        number = int.from_bytes(bytes(octets), 'big', signed=True) if octets else 0

        yield self._createComponent(asn1Spec, tagSet, number, **options)
155
156
class BooleanPayloadDecoder(IntegerPayloadDecoder):
    """Decoder for BOOLEAN payloads; reuses the INTEGER payload reader."""

    protoComponent = univ.Boolean(0)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Collapse `value` to 1 (truthy) or 0 (falsy) before wrapping it."""
        flag = 1 if value else 0
        return IntegerPayloadDecoder._createComponent(
            self, asn1Spec, tagSet, flag, **options)
163
164
class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for BIT STRING payloads, primitive or constructed."""

    protoComponent = univ.BitString(())
    # When false, constructed serialization is rejected by `valueDecoder`.
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield the BIT STRING decoded from a definite-length value.

        `SubstrateUnderrunError` items produced by the stream helpers are
        yielded as they arrive; the decoded component is yielded last.

        Raises `PyAsn1Error` on empty substrate, on a padding count above 7,
        or when constructed form is met but not supported.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        if not length:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        for atEnd in isEndOfStream(substrate):
            if isinstance(atEnd, SubstrateUnderrunError):
                yield atEnd

        if atEnd:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?
            # Primitive form: one octet with the padding bit count, then
            # the payload itself.
            for padOctet in readFromStream(substrate, 1, options):
                if isinstance(padOctet, SubstrateUnderrunError):
                    yield padOctet

            padding = ord(padOctet)
            if padding > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % padding
                )

            for payload in readFromStream(substrate, length - 1, options):
                if isinstance(payload, SubstrateUnderrunError):
                    yield payload

            bits = self.protoComponent.fromOctetString(
                payload, internalFormat=True, padding=padding)

            yield self._createComponent(asn1Spec, tagSet, bits, **options)
            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited '
                                    'at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        collector = self.substrateCollector

        bits = self.protoComponent.fromOctetString(b'', internalFormat=True)

        startPosition = substrate.tell()

        while substrate.tell() - startPosition < length:
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=collector,
                **options)

            for fragment in fragments:
                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

            # `fragment` is now the last item produced: raw fragment octets
            # whose first octet is the padding bit count.
            padding = fragment[0]
            if padding > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % padding
                )

            bits = self.protoComponent.fromOctetString(
                fragment[1:], internalFormat=True,
                prepend=bits, padding=padding
            )

        yield self._createComponent(asn1Spec, tagSet, bits, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Yield the BIT STRING assembled from indefinite-length fragments.

        Fragments are decoded one by one until the end-of-octets marker;
        `SubstrateUnderrunError` items are yielded as they arrive.

        Raises `PyAsn1Error` when a fragment reports more than 7 padding bits.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        # All inner fragments are of the same type, treat them as octet string
        collector = self.substrateCollector

        bits = self.protoComponent.fromOctetString(b'', internalFormat=True)

        while True:  # one pass per fragment
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=collector,
                allowEoo=True, **options)

            for fragment in fragments:
                if fragment is eoo.endOfOctets:
                    break

                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

            if fragment is eoo.endOfOctets:
                break

            padding = fragment[0]
            if padding > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % padding
                )

            bits = self.protoComponent.fromOctetString(
                fragment[1:], internalFormat=True,
                prepend=bits, padding=padding
            )

        yield self._createComponent(asn1Spec, tagSet, bits, **options)
294
295
class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OCTET STRING payloads, primitive or constructed."""

    protoComponent = univ.OctetString('')
    # When false, constructed serialization is rejected by `valueDecoder`.
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield the OCTET STRING decoded from a definite-length value.

        `SubstrateUnderrunError` items are yielded as they arrive; the
        decoded component is yielded last.

        Raises `PyAsn1Error` when constructed form is met but not supported.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?
            for octets in readFromStream(substrate, length, options):
                if isinstance(octets, SubstrateUnderrunError):
                    yield octets

            yield self._createComponent(asn1Spec, tagSet, octets, **options)
            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        collector = self.substrateCollector

        assembled = b''

        startPosition = substrate.tell()

        while substrate.tell() - startPosition < length:
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=collector,
                **options)

            for fragment in fragments:
                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

            # The last item produced is the fragment payload.
            assembled += fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Yield the OCTET STRING assembled from indefinite-length fragments.

        A caller-supplied `substrateFun` takes over unless it is this
        class's own `substrateCollector`, in which case the fragments are
        decoded here until the end-of-octets marker.
        """
        if substrateFun and substrateFun is not self.substrateCollector:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        # All inner fragments are of the same type, treat them as octet string
        collector = self.substrateCollector

        assembled = b''

        while True:  # one pass per fragment
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=collector,
                allowEoo=True, **options)

            for fragment in fragments:
                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

                if fragment is eoo.endOfOctets:
                    break

            if fragment is eoo.endOfOctets:
                break

            assembled += fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)
380
381
class NullPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for NULL payloads, which must carry no octets."""

    protoComponent = univ.Null('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield a NULL component after checking the payload is empty.

        `SubstrateUnderrunError` items from the reader are yielded as-is.

        Raises `PyAsn1Error` on a non-simple tag or a non-empty payload.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for payload in readFromStream(substrate, length, options):
            if isinstance(payload, SubstrateUnderrunError):
                yield payload

        # The component is built before the payload check, as before.
        nullObject = self._createComponent(asn1Spec, tagSet, '', **options)

        if payload:
            raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)

        yield nullObject
403
404
class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OBJECT IDENTIFIER payloads."""

    protoComponent = univ.ObjectIdentifier(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield the OBJECT IDENTIFIER decoded from `length` octets.

        `SubstrateUnderrunError` items from the stream reader are yielded
        as-is; the decoded component is yielded last.

        Raises:
            PyAsn1Error: non-simple tag, empty payload, a 0x80 octet that
                starts an arc, or an arc with more than
                `MAX_OID_ARC_CONTINUATION_OCTETS` continuation octets.
            SubstrateUnderrunError: payload ends in the middle of an arc.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            raise error.PyAsn1Error('Empty substrate')

        # Arcs are collected in a list (linear) rather than by repeated
        # tuple concatenation (quadratic in the number of arcs).
        arcs = []
        index = 0
        substrateLen = len(chunk)
        while index < substrateLen:
            subId = chunk[index]
            index += 1
            if subId < 128:
                arcs.append(subId)
            elif subId > 128:
                # Construct subid from a number of octets
                nextSubId = subId
                subId = 0
                continuationOctetCount = 0
                while nextSubId >= 128:
                    continuationOctetCount += 1
                    if continuationOctetCount > MAX_OID_ARC_CONTINUATION_OCTETS:
                        raise error.PyAsn1Error(
                            'OID arc exceeds maximum continuation octets limit (%d) '
                            'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, index)
                        )
                    subId = (subId << 7) + (nextSubId & 0x7F)
                    if index >= substrateLen:
                        raise error.SubstrateUnderrunError(
                            'Short substrate for sub-OID past %s' % (tuple(arcs),)
                        )
                    nextSubId = chunk[index]
                    index += 1
                arcs.append((subId << 7) + nextSubId)
            else:
                # subId == 128
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')

        # Decode two leading arcs. `arcs` is never empty here: the payload
        # is non-empty and every loop pass either appends an arc or raises.
        # The first arc is a non-negative integer, so the three branches
        # below are exhaustive (the former "malformed first octet" branch
        # could never be reached).
        first = arcs[0]
        if first <= 39:
            oid = (0,) + tuple(arcs)
        elif first <= 79:
            oid = (1, first - 40) + tuple(arcs[1:])
        else:
            oid = (2, first - 80) + tuple(arcs[1:])

        yield self._createComponent(asn1Spec, tagSet, oid, **options)
468
469
class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for RELATIVE-OID payloads."""

    protoComponent = univ.RelativeOID(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield the RELATIVE-OID decoded from `length` octets.

        `SubstrateUnderrunError` items from the stream reader are yielded
        as-is; the decoded component is yielded last.

        Raises `PyAsn1Error` on a non-simple tag, an empty payload, a 0x80
        octet starting an arc, or too many continuation octets in one arc;
        raises `SubstrateUnderrunError` when the payload ends mid-arc.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for payload in readFromStream(substrate, length, options):
            if isinstance(payload, SubstrateUnderrunError):
                yield payload

        if not payload:
            raise error.PyAsn1Error('Empty substrate')

        arcs = []
        cursor = 0
        total = len(payload)

        while cursor < total:
            octet = payload[cursor]
            cursor += 1

            if octet == 128:
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')

            if octet < 128:
                # Single-octet arc.
                arcs.append(octet)
                continue

            # Multi-octet arc: fold in 7 bits per continuation octet until
            # an octet with the high bit clear terminates the arc.
            arc = 0
            continuationCount = 0
            while octet >= 128:
                continuationCount += 1
                if continuationCount > MAX_OID_ARC_CONTINUATION_OCTETS:
                    raise error.PyAsn1Error(
                        'RELATIVE-OID arc exceeds maximum continuation octets limit (%d) '
                        'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, cursor)
                    )
                arc = (arc << 7) + (octet & 0x7F)
                if cursor >= total:
                    raise error.SubstrateUnderrunError(
                        'Short substrate for sub-OID past %s' % (tuple(arcs),)
                    )
                octet = payload[cursor]
                cursor += 1

            arcs.append((arc << 7) + octet)

        yield self._createComponent(asn1Spec, tagSet, tuple(arcs), **options)
523
524
class RealPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for REAL payloads (binary, special and character forms)."""

    protoComponent = univ.Real()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield the REAL value decoded from `length` octets.

        The first octet selects the form: high bit set means binary,
        bit 0x40 means an infinity, both clear means character encoding.
        An empty payload decodes to 0.0.

        Raises `PyAsn1Error` on a non-simple tag or malformed binary form,
        and `SubstrateUnderrunError` on bad character form or encoding.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for payload in readFromStream(substrate, length, options):
            if isinstance(payload, SubstrateUnderrunError):
                yield payload

        if not payload:
            yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
            return

        firstOctet = payload[0]
        body = payload[1:]

        if firstOctet & 0x80:  # binary encoding
            if not body:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding binary encoded REAL')

            exponentLength = (firstOctet & 0x03) + 1

            if exponentLength == 4:
                # Exponent length is carried in the next octet.
                exponentLength = body[0]
                body = body[1:]

            exponentOctets = body[:exponentLength]
            body = body[exponentLength:]

            if not exponentOctets or not body:
                raise error.PyAsn1Error('Real exponent screwed')

            # Seed with -1 when the top bit is set, so the folded result
            # comes out negative.
            exponent = -1 if exponentOctets[0] & 0x80 else 0

            for octet in exponentOctets:
                exponent = (exponent << 8) | octet

            baseBits = (firstOctet >> 4) & 0x03

            if baseBits > 2:
                raise error.PyAsn1Error('Illegal Real base')

            # Rescale the exponent to base 2: x1 for base 2, x3 for base 8,
            # x4 for base 16.
            exponent *= (1, 3, 4)[baseBits]

            mantissa = 0

            for octet in body:
                mantissa = (mantissa << 8) | octet

            if firstOctet & 0x40:  # sign bit
                mantissa = -mantissa

            scaleBits = (firstOctet >> 2) & 0x03
            mantissa *= 2 ** scaleBits
            value = (mantissa, 2, exponent)

        elif firstOctet & 0x40:  # infinite value
            if LOG:
                LOG('decoding infinite REAL')

            value = '-inf' if firstOctet & 0x01 else 'inf'

        elif (firstOctet & 0xc0) == 0:  # character encoding
            if not body:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding character encoded REAL')

            form = firstOctet & 0x3

            try:
                if form == 0x1:  # NR1
                    value = (int(body), 10, 0)

                elif form in (0x2, 0x3):  # NR2, NR3
                    value = float(body)

                else:
                    raise error.SubstrateUnderrunError(
                        'Unknown NR (tag %s)' % firstOctet
                    )

            except ValueError:
                raise error.SubstrateUnderrunError(
                    'Bad character Real syntax'
                )

        else:
            raise error.SubstrateUnderrunError(
                'Unknown encoding (tag %s)' % firstOctet
            )

        yield self._createComponent(asn1Spec, tagSet, value, **options)
633
634
class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
    """Base class of the constructed payload decoders."""

    # Prototype ASN.1 object; unset at this level.
    protoComponent = None
637
638
class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder):
    """Shared decoding logic for constructed (container) payloads.

    Subclasses provide the prototype objects below.
    """

    # Prototype cloned by schemaless decoding when the decoded components
    # carry more than one distinct tag set.
    protoRecordComponent = None
    # Prototype cloned by schemaless decoding otherwise.
    protoSequenceComponent = None

    def _getComponentTagMap(self, asn1Object, idx):
        """Not implemented at this level; always raises `NotImplementedError`."""
        raise NotImplementedError

    def _getComponentPositionByType(self, asn1Object, tagSet, idx):
        """Not implemented at this level; always raises `NotImplementedError`."""
        raise NotImplementedError
648
649 def _decodeComponentsSchemaless(
650 self, substrate, tagSet=None, decodeFun=None,
651 length=None, **options):
652
653 asn1Object = None
654
655 components = []
656 componentTypes = set()
657
658 original_position = substrate.tell()
659
660 while length == -1 or substrate.tell() < original_position + length:
661 for component in decodeFun(substrate, **options):
662 if isinstance(component, SubstrateUnderrunError):
663 yield component
664
665 if length == -1 and component is eoo.endOfOctets:
666 break
667
668 components.append(component)
669 componentTypes.add(component.tagSet)
670
671 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF
672 # The heuristics is:
673 # * 1+ components of different types -> likely SEQUENCE/SET
674 # * otherwise -> likely SEQUENCE OF/SET OF
675 if len(componentTypes) > 1:
676 protoComponent = self.protoRecordComponent
677
678 else:
679 protoComponent = self.protoSequenceComponent
680
681 asn1Object = protoComponent.clone(
682 # construct tagSet from base tag from prototype ASN.1 object
683 # and additional tags recovered from the substrate
684 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
685 )
686
687 if LOG:
688 LOG('guessed %r container type (pass `asn1Spec` to guide the '
689 'decoder)' % asn1Object)
690
691 for idx, component in enumerate(components):
692 asn1Object.setComponentByPosition(
693 idx, component,
694 verifyConstraints=False,
695 matchTags=False, matchConstraints=False
696 )
697
698 yield asn1Object
699
    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length constructed value.

        Yields any `SubstrateUnderrunError` items produced by the helpers
        it drives, then the decoded container object.

        Three paths are taken, in this order:

        * `substrateFun` given: its output is yielded and nothing else
          is decoded here;
        * no `asn1Spec`: components are decoded schemalessly;
        * `asn1Spec` given: components are decoded against it, either as
          a Sequence/Set (named components) or as a homogeneous container.

        Raises `PyAsn1Error` on a non-constructed tag, on more components
        than the spec defines, on missing required components, or when a
        spec without named types ends up inconsistent.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        # All "are we done" checks below are relative to this offset.
        original_position = substrate.tell()

        if substrateFun:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # No single prototype known: hand over both candidates.
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if asn1Spec is None:
            for asn1Object in self._decodeComponentsSchemaless(
                    substrate, tagSet=tagSet, decodeFun=decodeFun,
                    length=length, **options):
                if isinstance(asn1Object, SubstrateUnderrunError):
                    yield asn1Object

            # `asn1Object` is now the last item produced above.
            if substrate.tell() < original_position + length:
                # NOTE(review): trailing octets are only read (and so
                # consumed) when debug logging is enabled -- confirm this
                # is intended.
                if LOG:
                    for trailing in readFromStream(substrate, context=options):
                        if isinstance(trailing, SubstrateUnderrunError):
                            yield trailing

                    LOG('Unused trailing %d octets encountered: %s' % (
                        len(trailing), debug.hexdump(trailing)))

            yield asn1Object

            return

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        # Stores `asn1Object` under options['asn1Object'] unless that key
        # is already present.
        options = self._passAsn1Object(asn1Object, options)

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Spec.componentType

            isSetType = asn1Spec.typeId == univ.Set.typeId
            # Deterministic: component `idx` in the substrate is component
            # `idx` of the spec (not a Set, no optional/defaulted fields).
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()
            idx = 0
            while substrate.tell() - original_position < length:
                # Pick the spec (or tag map) to decode the next component with.
                if not namedTypes:
                    componentType = None

                elif isSetType:
                    componentType = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            componentType = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            componentType = namedTypes.getTagMapNearPosition(idx)

                        else:
                            componentType = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Spec,)
                        )

                for component in decodeFun(substrate, componentType, **options):
                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                # `component` is the last item produced by `decodeFun`.
                # For non-deterministic layouts, work out which position it
                # belongs to from its tag set.
                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('user-specified open types map:')

                        for k, v in openTypes.items():
                            LOG('%s -> %r' % (k, v))

                    # Open types are only resolved when the caller supplied
                    # a map or asked for it via `decodeOpenTypes`.
                    if openTypes or options.get('decodeOpenTypes', False):

                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            # Value of the sibling component that selects
                            # the concrete type of this open-type field.
                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # The user-supplied map takes precedence; the
                            # map attached to the spec is the fallback.
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                if LOG:
                                    LOG('default open types map of component '
                                        '"%s.%s" governed by component "%s.%s"'
                                        ':' % (asn1Object.__class__.__name__,
                                               namedType.name,
                                               asn1Object.__class__.__name__,
                                               namedType.openType.name))

                                    for k, v in namedType.openType.items():
                                        LOG('%s -> %r' % (k, v))

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    # Unresolved: leave the component as decoded.
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                # Re-decode every element against the
                                # resolved type and store it back in place.
                                for pos, containerElement in enumerate(
                                        containerValue):

                                    stream = asSeekableStream(containerValue[pos].asOctets())

                                    for component in decodeFun(stream, asn1Spec=openType, **options):
                                        if isinstance(component, SubstrateUnderrunError):
                                            yield component

                                    containerValue[pos] = component

                            else:
                                stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())

                                for component in decodeFun(stream, asn1Spec=openType, **options):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                asn1Object.setComponentByPosition(idx, component)

            else:
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise error.PyAsn1Error(
                        f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")

        else:
            # Homogeneous container: every element uses the same spec.
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while substrate.tell() - original_position < length:
                for component in decodeFun(substrate, componentType, **options):
                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        yield asn1Object
920
    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length constructed value.

        Yields any `SubstrateUnderrunError` items produced by the helpers
        it drives, then the decoded container object. Components are read
        until the end-of-octets marker is decoded.

        Three paths are taken, in this order:

        * `substrateFun` is not None: its output is yielded and nothing
          else is decoded here;
        * no `asn1Spec`: components are decoded schemalessly;
        * `asn1Spec` given: components are decoded against it, either as
          a Sequence/Set (named components) or as a homogeneous container.

        Raises `PyAsn1Error` on a non-constructed tag, on missing required
        components, or when a spec without named types ends up inconsistent.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        if substrateFun is not None:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # No single prototype known: hand over both candidates.
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if asn1Spec is None:
            for asn1Object in self._decodeComponentsSchemaless(
                    substrate, tagSet=tagSet, decodeFun=decodeFun,
                    length=length, **dict(options, allowEoo=True)):
                if isinstance(asn1Object, SubstrateUnderrunError):
                    yield asn1Object

            # `asn1Object` is now the last item produced above.
            yield asn1Object

            return

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        # Stores `asn1Object` under options['asn1Object'] unless that key
        # is already present.
        options = self._passAsn1Object(asn1Object, options)

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Object.componentType

            isSetType = asn1Object.typeId == univ.Set.typeId
            # Deterministic: component `idx` in the substrate is component
            # `idx` of the spec (not a Set, no optional/defaulted fields).
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()

            idx = 0

            while True:  # loop over components
                # NOTE: `asn1Spec` is rebound here to the spec (or tag map)
                # of the next component; the container spec is not needed
                # again on this branch.
                if len(namedTypes) <= idx:
                    asn1Spec = None

                elif isSetType:
                    asn1Spec = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            asn1Spec = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            asn1Spec = namedTypes.getTagMapNearPosition(idx)

                        else:
                            asn1Spec = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Object,)
                        )

                for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options):

                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                    if component is eoo.endOfOctets:
                        break

                # End-of-octets marker terminates the container.
                if component is eoo.endOfOctets:
                    break

                # For non-deterministic layouts, work out which position
                # the component belongs to from its tag set.
                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('user-specified open types map:')

                        for k, v in openTypes.items():
                            LOG('%s -> %r' % (k, v))

                    # Open types are only resolved when the caller supplied
                    # a map or asked for it via `decodeOpenTypes`.
                    if openTypes or options.get('decodeOpenTypes', False):

                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            # Value of the sibling component that selects
                            # the concrete type of this open-type field.
                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # The user-supplied map takes precedence; the
                            # map attached to the spec is the fallback.
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                if LOG:
                                    LOG('default open types map of component '
                                        '"%s.%s" governed by component "%s.%s"'
                                        ':' % (asn1Object.__class__.__name__,
                                               namedType.name,
                                               asn1Object.__class__.__name__,
                                               namedType.openType.name))

                                    for k, v in namedType.openType.items():
                                        LOG('%s -> %r' % (k, v))

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    # Unresolved: leave the component as decoded.
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                # Re-decode every element against the
                                # resolved type and store it back in place.
                                for pos, containerElement in enumerate(
                                        containerValue):

                                    stream = asSeekableStream(containerValue[pos].asOctets())

                                    for component in decodeFun(stream, asn1Spec=openType,
                                                               **dict(options, allowEoo=True)):
                                        if isinstance(component, SubstrateUnderrunError):
                                            yield component

                                        if component is eoo.endOfOctets:
                                            break

                                        containerValue[pos] = component

                            else:
                                stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
                                for component in decodeFun(stream, asn1Spec=openType,
                                                           **dict(options, allowEoo=True)):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                    if component is eoo.endOfOctets:
                                        break

                                    asn1Object.setComponentByPosition(idx, component)

            else:
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise error.PyAsn1Error(
                        f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")

        else:
            # Homogeneous container: every element uses the same spec.
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while True:

                for component in decodeFun(
                        substrate, componentType, allowEoo=True, **options):

                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                    if component is eoo.endOfOctets:
                        break

                # End-of-octets marker terminates the container.
                if component is eoo.endOfOctets:
                    break

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        yield asn1Object
1155
1156
class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed-type decoder covering both SEQUENCE and SEQUENCE OF.

    `TAG_MAP` registers an instance of this class under
    `univ.Sequence.tagSet`, a tag that it notes conflicts with SequenceOf,
    hence the two prototypes below.
    """
    protoSequenceComponent = univ.SequenceOf()
    protoRecordComponent = univ.Sequence()
1160
1161
class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """SEQUENCE decoder; `TYPE_MAP` registers an instance under `univ.Sequence.typeId`."""
    protoComponent = univ.Sequence()
1164
1165
class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """SEQUENCE OF decoder; `TYPE_MAP` registers an instance under `univ.SequenceOf.typeId`."""
    protoComponent = univ.SequenceOf()
1168
1169
class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed-type decoder covering both SET and SET OF.

    `TAG_MAP` registers an instance of this class under `univ.Set.tagSet`,
    a tag that it notes conflicts with SetOf, hence the two prototypes
    below.
    """
    protoSequenceComponent = univ.SetOf()
    protoRecordComponent = univ.Set()
1173
1174
class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
    """SET decoder; `TYPE_MAP` registers an instance under `univ.Set.typeId`."""
    protoComponent = univ.Set()
1177
1178
class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
    """SET OF decoder; `TYPE_MAP` registers an instance under `univ.SetOf.typeId`."""
    protoComponent = univ.SetOf()
1181
1182
class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
    """Payload decoder for the ASN.1 CHOICE type.

    Both entry points are generators: they pass along every
    `SubstrateUnderrunError` object produced by the nested decoder and
    finally yield the CHOICE object with the decoded alternative assigned.
    """
    protoComponent = univ.Choice()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length CHOICE value.

        Parameters
        ----------
        substrate:
            Stream handed on to `decodeFun` or `substrateFun`.
        asn1Spec:
            CHOICE template to clone; when `None`, `protoComponent` is
            cloned with the incoming `tagSet` instead.
        tagSet, length, state:
            Tag set, length and decoder state of the item being decoded;
            forwarded to `decodeFun` only in the untagged case.
        decodeFun:
            Generator function used to decode the chosen alternative.
        substrateFun:
            Optional callback; when given, it receives the still empty
            CHOICE object and its output is yielded instead of decoding.

        Yields
        ------
        `SubstrateUnderrunError` objects coming from `decodeFun`, then the
        populated CHOICE object.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            # The caller wants the raw payload: delegate and stop here
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        # NOTE(review): _passAsn1Object is defined on a base class outside
        # this chunk; it returns the options used for nested decoding
        options = self._passAsn1Object(asn1Object, options)

        if asn1Object.tagSet == tagSet:
            if LOG:
                LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))

            # The tag just read is the CHOICE's own tag: the alternative is
            # decoded as a fresh item from the substrate
            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        else:
            if LOG:
                LOG('decoding %s as untagged CHOICE' % (tagSet,))

            # The tag just read belongs to the alternative itself: pass the
            # already decoded tag set, length and state along
            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, tagSet, length,
                    state, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        # `component` still holds the last item produced by decodeFun
        effectiveTagSet = component.effectiveTagSet

        if LOG:
            LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))

        asn1Object.setComponentByType(
            effectiveTagSet, component,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False,
            innerFlag=False
        )

        yield asn1Object

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length CHOICE value.

        Takes the same parameters as `valueDecoder`. Nested items are
        decoded with `allowEoo=True`. For an explicitly tagged CHOICE the
        loop keeps decoding until the end-of-octets marker is returned; an
        untagged CHOICE stops after its single alternative.

        Yields
        ------
        `SubstrateUnderrunError` objects coming from `decodeFun`, then the
        populated CHOICE object.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            # The caller wants the raw payload: delegate and stop here
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        # NOTE(review): _passAsn1Object is defined on a base class outside
        # this chunk; it returns the options used for nested decoding
        options = self._passAsn1Object(asn1Object, options)

        isTagged = asn1Object.tagSet == tagSet

        if LOG:
            LOG('decoding %s as %stagged CHOICE' % (
                tagSet, isTagged and 'explicitly ' or 'un'))

        while True:

            if isTagged:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    **dict(options, allowEoo=True))

            else:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    tagSet, length, state, **dict(options, allowEoo=True))

            for component in iterator:

                if isinstance(component, SubstrateUnderrunError):
                    yield component

                if component is eoo.endOfOctets:
                    break

                effectiveTagSet = component.effectiveTagSet

                if LOG:
                    LOG('decoded component %s, effective tag set '
                        '%s' % (component, effectiveTagSet))

                asn1Object.setComponentByType(
                    effectiveTagSet, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False,
                    innerFlag=False
                )

                if not isTagged:
                    break

            # Untagged: one alternative only. Tagged: loop again until the
            # end-of-octets marker has been seen.
            if not isTagged or component is eoo.endOfOctets:
                break

        yield asn1Object
1301
1302
class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for the ASN.1 ANY type.

    Both entry points are generators that pass along the
    `SubstrateUnderrunError` objects produced by the stream readers and
    yield the decoded result last.
    """
    protoComponent = univ.Any()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length ANY value.

        Parameters
        ----------
        substrate:
            Seekable stream; its `markedPosition` attribute must hold the
            offset at which the current item's header starts.
        asn1Spec:
            `None`, a `tagmap.TagMap` or an ASN.1 type object; used to
            decide whether the tag just read belongs to the ANY value.
        tagSet, length:
            Tag set and payload length of the item being decoded.
        substrateFun:
            Optional callback; when given, it receives an empty ANY
            component plus the stream and its output is yielded instead.

        Yields
        ------
        `SubstrateUnderrunError` objects from the stream readers, then the
        ANY component built from the octets read.
        """
        if asn1Spec is None:
            isUntagged = True

        elif asn1Spec.__class__ is tagmap.TagMap:
            isUntagged = tagSet not in asn1Spec.tagMap

        else:
            isUntagged = tagSet != asn1Spec.tagSet

        if isUntagged:
            # The header is part of the ANY value: rewind to where the item
            # started and extend the length by the size of the header
            fullPosition = substrate.markedPosition
            currentPosition = substrate.tell()

            substrate.seek(fullPosition, os.SEEK_SET)
            length += currentPosition - fullPosition

            if LOG:
                for chunk in peekIntoStream(substrate, length):
                    if isinstance(chunk, SubstrateUnderrunError):
                        yield chunk
                LOG('decoding as untagged ANY, substrate '
                    '%s' % debug.hexdump(chunk))

        if substrateFun:
            for chunk in substrateFun(
                    self._createComponent(asn1Spec, tagSet, noValue, **options),
                    substrate, length, options):
                yield chunk

            return

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        # `chunk` holds the last item produced by readFromStream
        yield self._createComponent(asn1Spec, tagSet, chunk, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length ANY value.

        Takes the same parameters as `valueDecoder`; `decodeFun` is used to
        decode the inner fragments with `allowEoo=True` until the
        end-of-octets marker is returned.

        A user supplied `substrateFun` receives the stream itself,
        positioned at the start of the item header when the value is
        untagged, together with the unchanged `length`.

        Yields
        ------
        `SubstrateUnderrunError` objects from nested readers, then the
        assembled result.
        """
        if asn1Spec is None:
            isTagged = False

        elif asn1Spec.__class__ is tagmap.TagMap:
            isTagged = tagSet in asn1Spec.tagMap

        else:
            isTagged = tagSet == asn1Spec.tagSet

        if isTagged:
            # tagged Any type -- consume header substrate
            chunk = b''

            if LOG:
                LOG('decoding as tagged ANY')

        else:
            # TODO: Seems not to be tested
            fullPosition = substrate.markedPosition
            currentPosition = substrate.tell()

            # Re-read the header octets so they become part of the value
            substrate.seek(fullPosition, os.SEEK_SET)
            for chunk in readFromStream(substrate, currentPosition - fullPosition, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            if LOG:
                LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk))

        # Any components do not inherit initial tag
        asn1Spec = self.protoComponent

        if substrateFun and substrateFun is not self.substrateCollector:
            asn1Object = self._createComponent(
                asn1Spec, tagSet, noValue, **options)

            # The callback expects a stream, and bytes cannot be
            # concatenated with one. Step back over the header octets just
            # read so the callback sees them, and hand over the stream.
            if chunk:
                substrate.seek(-len(chunk), os.SEEK_CUR)

            # `length` is left untouched: the caller passes the indefinite
            # marker here, which must not have the header size added to it
            for chunk in substrateFun(
                    asn1Object, substrate, length, options):
                yield chunk

            return

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        while True:  # loop over fragments

            for component in decodeFun(
                    substrate, asn1Spec, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if isinstance(component, SubstrateUnderrunError):
                    yield component

                if component is eoo.endOfOctets:
                    break

            if component is eoo.endOfOctets:
                break

            chunk += component

        # substrateFun was rebound to self.substrateCollector above
        if substrateFun:
            yield chunk  # TODO: Weird

        else:
            yield self._createComponent(asn1Spec, tagSet, chunk, **options)
1421
1422
1423# character string types
class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.UTF8String()`."""
    protoComponent = char.UTF8String()
1426
1427
class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.NumericString()`."""
    protoComponent = char.NumericString()
1430
1431
class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.PrintableString()`."""
    protoComponent = char.PrintableString()
1434
1435
class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.TeletexString()`."""
    protoComponent = char.TeletexString()
1438
1439
class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.VideotexString()`."""
    protoComponent = char.VideotexString()
1442
1443
class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.IA5String()`."""
    protoComponent = char.IA5String()
1446
1447
class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.GraphicString()`."""
    protoComponent = char.GraphicString()
1450
1451
class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.VisibleString()`."""
    protoComponent = char.VisibleString()
1454
1455
class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.GeneralString()`."""
    protoComponent = char.GeneralString()
1458
1459
class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.UniversalString()`."""
    protoComponent = char.UniversalString()
1462
1463
class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `char.BMPString()`."""
    protoComponent = char.BMPString()
1466
1467
1468# "useful" types
class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `useful.ObjectDescriptor()`."""
    protoComponent = useful.ObjectDescriptor()
1471
1472
class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `useful.GeneralizedTime()`."""
    protoComponent = useful.GeneralizedTime()
1475
1476
class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
    """OctetStringPayloadDecoder subclass whose prototype is `useful.UTCTime()`."""
    protoComponent = useful.UTCTime()
1479
1480
# Tag-to-codec map: looks up a payload decoder instance by the tag set
# recovered from the substrate. Entries are added group by group, in the
# order the population of TYPE_MAP below iterates over them.
TAG_MAP = {}

# built-in types
TAG_MAP.update({
    univ.Integer.tagSet: IntegerPayloadDecoder(),
    univ.Boolean.tagSet: BooleanPayloadDecoder(),
    univ.BitString.tagSet: BitStringPayloadDecoder(),
    univ.OctetString.tagSet: OctetStringPayloadDecoder(),
    univ.Null.tagSet: NullPayloadDecoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
    univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
    univ.Enumerated.tagSet: IntegerPayloadDecoder(),
    univ.Real.tagSet: RealPayloadDecoder(),
    univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SetOrSetOfPayloadDecoder(),  # conflicts with SetOf
    univ.Choice.tagSet: ChoicePayloadDecoder(),  # conflicts with Any
})

# character string types
TAG_MAP.update({
    char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
    char.NumericString.tagSet: NumericStringPayloadDecoder(),
    char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
    char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
    char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
    char.IA5String.tagSet: IA5StringPayloadDecoder(),
    char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
    char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
    char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
    char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
    char.BMPString.tagSet: BMPStringPayloadDecoder(),
})

# useful types
TAG_MAP.update({
    useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
    useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
    useful.UTCTime.tagSet: UTCTimePayloadDecoder(),
})
1511
# Type-to-codec map: payload decoders looked up by ASN.1 type ID. The
# entries spelled out here are the types whose tags are ambiguous.
TYPE_MAP = {
    univ.Set.typeId: SetPayloadDecoder(),
    univ.SetOf.typeId: SetOfPayloadDecoder(),
    univ.Sequence.typeId: SequencePayloadDecoder(),
    univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
    univ.Choice.typeId: ChoicePayloadDecoder(),
    univ.Any.typeId: AnyPayloadDecoder()
}

# Add the non-ambiguous types as well so codec lookup by type ID is faster;
# entries that are already present are never overwritten
for typeDecoder in TAG_MAP.values():
    if typeDecoder.protoComponent is None:
        continue

    typeId = typeDecoder.protoComponent.__class__.typeId
    if typeId is not None:
        TYPE_MAP.setdefault(typeId, typeDecoder)
1528
1529
# States of the SingleItemDecoder state machine, numbered 0 through 9 in
# the order they are listed
(stDecodeTag,
 stDecodeLength,
 stGetValueDecoder,
 stGetValueDecoderByAsn1Spec,
 stGetValueDecoderByTag,
 stTryAsExplicitTag,
 stDecodeValue,
 stDumpRawValue,
 stErrorCondition,
 stStop) = range(10)
1540
1541
# Two zero octets: the end-of-octets marker closing an indefinite-length value
EOO_SENTINEL = b'\x00\x00'
1543
1544
class SingleItemDecoder(object):
    """Generator-based decoder of a single tag-length-value item.

    Calling an instance returns a generator that steps through a small
    state machine (tag, length, codec lookup, value) over a seekable
    `substrate` stream. `SubstrateUnderrunError` objects produced by the
    stream readers and nested decoders are yielded as they occur; the
    decoded value (or `eoo.endOfOctets`) is yielded last.
    """
    # State entered when no codec can be found for the item
    defaultErrorState = stErrorCondition
    #defaultErrorState = stDumpRawValue
    # Codec used by the stDumpRawValue state
    defaultRawDecoder = AnyPayloadDecoder()

    # When false, indefinite length encoding is rejected and the
    # end-of-octets check is skipped
    supportIndefLength = True

    # Default lookup tables; both can be overridden per instance
    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
        """Set up the codec lookup tables and empty tag caches.

        `tagMap` and `typeMap` replace the class-level `TAG_MAP` and
        `TYPE_MAP` when given; any other keyword argument is ignored.
        """
        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP

        # Tag & TagSet objects caches
        self._tagCache = {}
        self._tagSetCache = {}

    def __call__(self, substrate, asn1Spec=None,
                 tagSet=None, length=None, state=stDecodeTag,
                 decodeFun=None, substrateFun=None,
                 **options):
        """Decode one item from `substrate`.

        Parameters
        ----------
        substrate:
            Seekable stream to read from; a `markedPosition` attribute is
            set on it to the offset at which decoding starts.
        asn1Spec:
            `None`, a `tagmap.TagMap`, or an ASN.1 type object guiding the
            codec lookup.
        tagSet:
            Tags collected so far; a newly decoded tag is combined with it.
        length:
            Length of the value, when already known by the caller.
        state:
            State the machine starts in.
        decodeFun:
            Not referenced in this method; `self` is what gets passed to
            the payload decoders as their decode function.
        substrateFun:
            Optional callback forwarded to the payload decoder.
        options:
            `allowEoo` is removed and consumed here, `recursiveFlag` is
            read here; everything is forwarded to readers and codecs.

        Yields
        ------
        `SubstrateUnderrunError` objects, then the decoded value or
        `eoo.endOfOctets`.

        Raises
        ------
        PyAsn1Error
            On unsupported indefinite length, on no matching codec, or when
            a definite-length codec consumed an unexpected number of octets.
        SubstrateUnderrunError
            On a truncated long tag or length field.
        """

        allowEoo = options.pop('allowEoo', False)

        if LOG:
            LOG('decoder called at scope %s with state %d, working with up '
                'to %s octets of substrate: '
                '%s' % (debug.scope, state, length, substrate))

        # Look for end-of-octets sentinel
        if allowEoo and self.supportIndefLength:

            for eoo_candidate in readFromStream(substrate, 2, options):
                if isinstance(eoo_candidate, SubstrateUnderrunError):
                    yield eoo_candidate

            if eoo_candidate == EOO_SENTINEL:
                if LOG:
                    LOG('end-of-octets sentinel found')
                yield eoo.endOfOctets
                return

            else:
                # Not the sentinel: step back so the two octets are decoded
                # as a regular tag and length
                substrate.seek(-2, os.SEEK_CUR)

        tagMap = self._tagMap
        typeMap = self._typeMap
        tagCache = self._tagCache
        tagSetCache = self._tagSetCache

        value = noValue

        # Remember where this item starts; AnyPayloadDecoder seeks back to
        # this offset to recover the header octets
        substrate.markedPosition = substrate.tell()

        while state is not stStop:

            if state is stDecodeTag:
                # Decode tag
                isShortTag = True

                for firstByte in readFromStream(substrate, 1, options):
                    if isinstance(firstByte, SubstrateUnderrunError):
                        yield firstByte

                firstOctet = ord(firstByte)

                try:
                    lastTag = tagCache[firstOctet]

                except KeyError:
                    integerTag = firstOctet
                    tagClass = integerTag & 0xC0
                    tagFormat = integerTag & 0x20
                    tagId = integerTag & 0x1F

                    if tagId == 0x1F:
                        # All five low bits set: the tag number follows in
                        # further octets, seven bits each, with the high bit
                        # set on every octet but the last
                        isShortTag = False
                        # NOTE: lengthOctetIdx is incremented below but not
                        # read anywhere in this method
                        lengthOctetIdx = 0
                        tagId = 0

                        while True:
                            for integerByte in readFromStream(substrate, 1, options):
                                if isinstance(integerByte, SubstrateUnderrunError):
                                    yield integerByte

                            if not integerByte:
                                raise error.SubstrateUnderrunError(
                                    'Short octet stream on long tag decoding'
                                )

                            integerTag = ord(integerByte)
                            lengthOctetIdx += 1
                            tagId <<= 7
                            tagId |= (integerTag & 0x7F)

                            if not integerTag & 0x80:
                                break

                    lastTag = tag.Tag(
                        tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                    )

                    if isShortTag:
                        # cache short tags
                        tagCache[firstOctet] = lastTag

                if tagSet is None:
                    if isShortTag:
                        try:
                            tagSet = tagSetCache[firstOctet]

                        except KeyError:
                            # base tag not recovered
                            tagSet = tag.TagSet((), lastTag)
                            tagSetCache[firstOctet] = tagSet
                    else:
                        tagSet = tag.TagSet((), lastTag)

                else:
                    tagSet = lastTag + tagSet

                state = stDecodeLength

                if LOG:
                    LOG('tag decoded into %s, decoding length' % tagSet)

            if state is stDecodeLength:
                # Decode length
                for firstOctet in readFromStream(substrate, 1, options):
                    if isinstance(firstOctet, SubstrateUnderrunError):
                        yield firstOctet

                firstOctet = ord(firstOctet)

                if firstOctet < 128:
                    # short form: the octet is the length itself
                    length = firstOctet

                elif firstOctet > 128:
                    # long form: low seven bits give the number of length octets
                    size = firstOctet & 0x7F
                    # encoded in size bytes
                    for encodedLength in readFromStream(substrate, size, options):
                        if isinstance(encodedLength, SubstrateUnderrunError):
                            yield encodedLength
                    encodedLength = list(encodedLength)
                    # missing check on maximum size, which shouldn't be a
                    # problem, we can handle more than is possible
                    if len(encodedLength) != size:
                        raise error.SubstrateUnderrunError(
                            '%s<%s at %s' % (size, len(encodedLength), tagSet)
                        )

                    length = 0
                    for lengthOctet in encodedLength:
                        length <<= 8
                        length |= lengthOctet
                    # NOTE: size is not read again after this point
                    size += 1

                else:  # 128 means indefinite
                    length = -1

                if length == -1 and not self.supportIndefLength:
                    raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')

                state = stGetValueDecoder

                if LOG:
                    LOG('value length decoded into %d' % length)

            if state is stGetValueDecoder:
                if asn1Spec is None:
                    state = stGetValueDecoderByTag

                else:
                    state = stGetValueDecoderByAsn1Spec
            #
            # There're two ways of creating subtypes in ASN.1 what influences
            # decoder operation. These methods are:
            # 1) Either base types used in or no IMPLICIT tagging has been
            #    applied on subtyping.
            # 2) Subtype syntax drops base type information (by means of
            #    IMPLICIT tagging.
            # The first case allows for complete tag recovery from substrate
            # while the second one requires original ASN.1 type spec for
            # decoding.
            #
            # In either case a set of tags (tagSet) is coming from substrate
            # in an incremental, tag-by-tag fashion (this is the case of
            # EXPLICIT tag which is most basic). Outermost tag comes first
            # from the wire.
            #
            if state is stGetValueDecoderByTag:
                # Try the complete tag set first, then its first tag alone
                try:
                    concreteDecoder = tagMap[tagSet]

                except KeyError:
                    concreteDecoder = None

                if concreteDecoder:
                    state = stDecodeValue

                else:
                    try:
                        concreteDecoder = tagMap[tagSet[:1]]

                    except KeyError:
                        concreteDecoder = None

                    if concreteDecoder:
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)

            if state is stGetValueDecoderByAsn1Spec:

                if asn1Spec.__class__ is tagmap.TagMap:
                    try:
                        chosenSpec = asn1Spec[tagSet]

                    except KeyError:
                        chosenSpec = None

                    if LOG:
                        LOG('candidate ASN.1 spec is a map of:')

                        for firstOctet, v in asn1Spec.presentTypes.items():
                            LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))

                        if asn1Spec.skipTypes:
                            LOG('but neither of: ')
                            for firstOctet, v in asn1Spec.skipTypes.items():
                                LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
                        LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))

                elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
                    chosenSpec = asn1Spec
                    if LOG:
                        LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)

                else:
                    chosenSpec = None

                if chosenSpec is not None:
                    try:
                        # ambiguous type or just faster codec lookup
                        concreteDecoder = typeMap[chosenSpec.typeId]

                        if LOG:
                            LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))

                    except KeyError:
                        # use base type for codec lookup to recover untagged types
                        baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
                        try:
                            # base type or tagged subtype
                            concreteDecoder = tagMap[baseTagSet]

                            if LOG:
                                LOG('value decoder chosen by base %s' % (baseTagSet,))

                        except KeyError:
                            concreteDecoder = None

                    if concreteDecoder:
                        asn1Spec = chosenSpec
                        state = stDecodeValue

                    else:
                        state = stTryAsExplicitTag

                else:
                    concreteDecoder = None
                    state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)

            if state is stDecodeValue:
                if not options.get('recursiveFlag', True) and not substrateFun:  # deprecate this
                    def substrateFun(asn1Object, _substrate, _length, _options):
                        """Legacy hack to keep the recursiveFlag=False option supported.

                        The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
                        of the old recursiveFlag=False option. Users should pass their callback instead of using
                        recursiveFlag.
                        """
                        yield asn1Object

                original_position = substrate.tell()

                # The payload decoder gets this decoder (`self`) as its
                # decode function and stGetValueDecoder as the state
                if length == -1:  # indef length
                    for value in concreteDecoder.indefLenValueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                else:
                    for value in concreteDecoder.valueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                    # Definite length only: the codec must have consumed
                    # exactly the announced number of octets
                    bytesRead = substrate.tell() - original_position
                    if not substrateFun and bytesRead != length:
                        raise PyAsn1Error(
                            "Read %s bytes instead of expected %s." % (bytesRead, length))
                    elif substrateFun and bytesRead > length:
                        # custom substrateFun may be used for partial decoding, reading less is expected there
                        raise PyAsn1Error(
                            "Read %s bytes are more than expected %s." % (bytesRead, length))

                if LOG:
                    LOG('codec %s yields type %s, value:\n%s\n...' % (
                        concreteDecoder.__class__.__name__, value.__class__.__name__,
                        isinstance(value, base.Asn1Item) and value.prettyPrint() or value))

                state = stStop
                break

            if state is stTryAsExplicitTag:
                if (tagSet and
                        tagSet[0].tagFormat == tag.tagFormatConstructed and
                        tagSet[0].tagClass != tag.tagClassUniversal):
                    # Assume explicit tagging
                    # NOTE(review): rawPayloadDecoder is a module-level name
                    # defined outside this chunk
                    concreteDecoder = rawPayloadDecoder
                    state = stDecodeValue

                else:
                    concreteDecoder = None
                    state = self.defaultErrorState

                if LOG:
                    LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))

            if state is stDumpRawValue:
                concreteDecoder = self.defaultRawDecoder

                if LOG:
                    LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)

                state = stDecodeValue

            if state is stErrorCondition:
                raise error.PyAsn1Error(
                    '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
                )

        if LOG:
            debug.scope.pop()
            LOG('decoder left scope %s, call completed' % debug.scope)

        yield value
1906
1907
class StreamingDecoder(object):
    """Iterate over ASN.1 objects recovered from a BER/CER/DER byte stream.

    Every step of the iterator consumes whatever BER/CER/DER serialization
    is currently available in the `substrate` stream-like object and turns
    it into one or more, possibly nested, ASN.1 objects.

    Parameters
    ----------
    substrate: :py:class:`file`, :py:class:`io.BytesIO`
        Byte stream holding the BER/CER/DER serialization

    Keyword Args
    ------------
    asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
        A pyasn1 type object used as a template guiding the decoder.
        Whether `asn1Spec` is needed depends on the ASN.1 structure being
        decoded. One of the reasons why `asn1Spec` may be required is that
        the ASN.1 structure is encoded in the *IMPLICIT* tagging mode.

    Yields
    ------
    : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
        Either a decoded ASN.1 object (possibly nested) or a
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object signalling
        that the input does not yet hold enough BER/CER/DER serialization
        to fully recover ASN.1 objects from it.

        In the latter case the caller is advised to make more data
        available in the input stream and then call the iterator again.
        The decoder resumes the decoding process using the newly arrived
        data.

        The `context` property of the
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object might hold
        a reference to the partially populated ASN.1 object being
        reconstructed.

    Raises
    ------
    ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
        `PyAsn1Error` on deserialization error, `EndOfStreamError` on
        premature stream closure.

    Examples
    --------
    Decode BER serialisation without ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> for asn1Object in StreamingDecoder(stream):
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3

    Decode BER serialisation with ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> schema = SequenceOf(componentType=Integer())
        >>>
        >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
        >>> for asn1Object in decoder:
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3
    """

    SINGLE_ITEM_DECODER = SingleItemDecoder

    def __init__(self, substrate, asn1Spec=None, **options):
        # The options configure the item decoder and are also replayed on
        # every decoding call made by __iter__
        self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
        self._substrate = asSeekableStream(substrate)
        self._asn1Spec = asn1Spec
        self._options = options

    def __iter__(self):
        while True:
            # Relay everything the single-item decoder produces
            for decodedItem in self._singleItemDecoder(
                    self._substrate, self._asn1Spec, **self._options):
                yield decodedItem

            # Only the first answer of the end-of-stream probe is looked at
            for streamEnded in isEndOfStream(self._substrate):
                if isinstance(streamEnded, SubstrateUnderrunError):
                    yield

                break

            if streamEnded:
                return
2005
2006
class Decoder(object):
    """Create a BER decoder object.

    Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object.
    """
    # Streaming decoder class that does the actual work
    STREAMING_DECODER = StreamingDecoder

    @classmethod
    def __call__(cls, substrate, asn1Spec=None, **options):
        """Turns BER/CER/DER octet stream into an ASN.1 object.

        Takes BER/CER/DER octet-stream in form of :py:class:`bytes`
        and decodes it into an ASN.1 object
        (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
        may be a scalar or an arbitrary nested structure.

        Parameters
        ----------
        substrate: :py:class:`bytes`
            BER/CER/DER octet-stream to parse

        Keyword Args
        ------------
        asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
            A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
            derivative) to act as a template guiding the decoder.
            Depending on the ASN.1 structure being decoded, `asn1Spec` may or
            may not be required. The most common reason for it to be required
            is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.

        substrateFun: :py:class:`Union[
                Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
                         Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
                Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
                         Generator[Union[pyasn1.type.base.PyAsn1Item,
                                         pyasn1.error.SubstrateUnderrunError],
                                   None, None]]
            ]`
            User callback meant to generalize special use cases like non-recursive or
            partial decoding. A 3-arg non-streaming variant is supported for backwards
            compatibility in addition to the newer 4-arg streaming variant.
            The callback will receive the uninitialized object recovered from substrate
            as 1st argument, the uninterpreted payload as 2nd argument, and the length
            of the uninterpreted payload as 3rd argument. The streaming variant will
            additionally receive the decode(..., **options) kwargs as 4th argument.
            The non-streaming variant shall return an object that will be propagated
            as decode() return value as 1st item, and the remaining payload for further
            decode passes as 2nd item.
            The streaming variant shall yield an object that will be propagated as
            decode() return value, and leave the remaining payload in the stream.

        Returns
        -------
        : :py:class:`tuple`
            A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object
            recovered from BER/CER/DER substrate and the unprocessed trailing
            portion of the `substrate` (may be empty)

        Raises
        ------
        : :py:class:`~pyasn1.error.PyAsn1Error`
            :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient
            input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error.

        Examples
        --------
        Decode BER/CER/DER serialisation without ASN.1 schema

        .. code-block:: pycon

           >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
           >>> str(s)
           SequenceOf:
            1 2 3

        Decode BER/CER/DER serialisation with ASN.1 schema

        .. code-block:: pycon

           >>> seq = SequenceOf(componentType=Integer())
           >>> s, unprocessed = decode(
               b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
           >>> str(s)
           SequenceOf:
            1 2 3

        """
        substrate = asSeekableStream(substrate)

        if "substrateFun" in options:
            origSubstrateFun = options["substrateFun"]

            def substrateFunWrapper(asn1Object, substrate, length, options=None):
                """Support both 0.4 and 0.5 style APIs.

                substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible,
                we first try if we received a streaming user callback. If that fails,we assume we've received a
                non-streaming v0.4 user callback and convert it for streaming on the fly
                """
                try:
                    substrate_gen = origSubstrateFun(asn1Object, substrate, length, options)
                except TypeError as _value:
                    if _value.__traceback__.tb_next:
                        # Traceback depth > 1 means TypeError from inside user provided function
                        raise
                    # invariant maintained at Decoder.__call__ entry
                    assert isinstance(substrate, io.BytesIO)  # nosec assert_used
                    substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length)
                for value in substrate_gen:
                    yield value

            # `options` is this call's own keyword dict, so the user's
            # callback is replaced by the wrapper for this call only
            options["substrateFun"] = substrateFunWrapper

        streamingDecoder = cls.STREAMING_DECODER(
            substrate, asn1Spec, **options)

        # Returns on the first object that is not an underrun marker
        for asn1Object in streamingDecoder:
            if isinstance(asn1Object, SubstrateUnderrunError):
                raise error.SubstrateUnderrunError('Short substrate on input')

            # Whatever is left in the stream is handed back as the tail
            try:
                tail = next(readFromStream(substrate))

            except error.EndOfStreamError:
                tail = b''

            return asn1Object, tail

    @staticmethod
    def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length):
        """Run a 3-arg (v0.4 style) `substrateFun` against a stream.

        Reads the rest of `substrate` into bytes and calls `substrateFunV4`
        with them; a `length` of -1 is replaced by the number of bytes read.
        The remainder returned by the callback is written back into the
        stream, the stream is truncated after it and the position is moved
        back to its start. The callback's value is then yielded.
        """
        substrate_bytes = substrate.read()
        if length == -1:
            length = len(substrate_bytes)
        value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length)
        nbytes = substrate.write(nextSubstrate)
        substrate.truncate()
        substrate.seek(-nbytes, os.SEEK_CUR)
        yield value
2145
2146#: Turns BER octet stream into an ASN.1 object.
2147#:
#: Takes a BER octet-stream and decodes it into an ASN.1 object
2149#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
2150#: may be a scalar or an arbitrary nested structure.
2151#:
2152#: Parameters
2153#: ----------
2154#: substrate: :py:class:`bytes`
2155#: BER octet-stream
2156#:
2157#: Keyword Args
2158#: ------------
2159#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
2160#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
#: being decoded, *asn1Spec* may or may not be required. The most common reason for
#: it to be required is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.
2163#:
2164#: Returns
2165#: -------
2166#: : :py:class:`tuple`
2167#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
2168#: and the unprocessed trailing portion of the *substrate* (may be empty)
2169#:
2170#: Raises
2171#: ------
2172#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
2173#: On decoding errors
2174#:
2175#: Notes
2176#: -----
2177#: This function is deprecated. Please use :py:class:`Decoder` or
2178#: :py:class:`StreamingDecoder` class instance.
2179#:
2180#: Examples
2181#: --------
2182#: Decode BER serialisation without ASN.1 schema
2183#:
2184#: .. code-block:: pycon
2185#:
2186#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
2187#: >>> str(s)
2188#: SequenceOf:
2189#: 1 2 3
2190#:
2191#: Decode BER serialisation with ASN.1 schema
2192#:
2193#: .. code-block:: pycon
2194#:
2195#: >>> seq = SequenceOf(componentType=Integer())
2196#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
2197#: >>> str(s)
2198#: SequenceOf:
2199#: 1 2 3
2200#:
decode = Decoder()  # ready-made Decoder instance; calling it runs Decoder.__call__
2202
def __getattr__(attr: str):
    """Resolve deprecated module-level names (module ``__getattr__`` hook).

    ``tagMap`` and ``typeMap`` are served from ``TAG_MAP`` and ``TYPE_MAP``
    respectively, with a `DeprecationWarning`.

    Parameters
    ----------
    attr: :py:class:`str`
        Name that was not found among the module globals

    Returns
    -------
    : object
        The module global the deprecated name maps to

    Raises
    ------
    AttributeError
        For any name other than the deprecated aliases
    """
    if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
        # stacklevel=2 attributes the warning to the code that accessed the
        # deprecated name instead of to this module
        warnings.warn(
            f"{attr} is deprecated. Please use {newAttr} instead.",
            DeprecationWarning, stacklevel=2)
        return globals()[newAttr]
    raise AttributeError(f"module {__name__!r} has no attribute {attr!r}")