1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import io
8import os
9import sys
10import warnings
11
12from pyasn1 import debug
13from pyasn1 import error
14from pyasn1.codec.ber import eoo
15from pyasn1.codec.streaming import asSeekableStream
16from pyasn1.codec.streaming import isEndOfStream
17from pyasn1.codec.streaming import peekIntoStream
18from pyasn1.codec.streaming import readFromStream
19from pyasn1.compat import _MISSING
20from pyasn1.error import PyAsn1Error
21from pyasn1.type import base
22from pyasn1.type import char
23from pyasn1.type import tag
24from pyasn1.type import tagmap
25from pyasn1.type import univ
26from pyasn1.type import useful
27
# Public names exported by a star-import of this module.
__all__ = ['StreamingDecoder', 'Decoder', 'decode']

# Debug logger for the decoder; call sites in this module guard every use
# with `if LOG:` before building the message.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)

# Local alias for the "no value" sentinel defined in `pyasn1.type.base`.
noValue = base.noValue

# Local alias; payload decoders below re-yield instances of this class
# that they receive from the stream helpers and from nested decoders.
SubstrateUnderrunError = error.SubstrateUnderrunError
35
36
class AbstractPayloadDecoder(object):
    """Common interface of the per-type payload decoders.

    Both decoding entry points raise here; concrete decoders are
    expected to override the ones they support.
    """
    protoComponent = None

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a value whose length is known up front.

        The decoder is allowed to consume as many bytes as necessary.

        Raises:
            PyAsn1Error: always, in this base implementation.
        """
        # TODO: Seems more like an NotImplementedError?
        message = 'SingleItemDecoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode a value encoded with indefinite length.

        The decoder is allowed to consume as many bytes as necessary.

        Raises:
            PyAsn1Error: always, in this base implementation.
        """
        # TODO: Seems more like an NotImplementedError?
        message = 'Indefinite length mode decoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    @staticmethod
    def _passAsn1Object(asn1Object, options):
        """Store *asn1Object* under ``'asn1Object'`` unless already present.

        Mutates and returns the very same *options* mapping.
        """
        options.setdefault('asn1Object', asn1Object)
        return options
66
67
class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
    """Base for decoders of primitive (non-container) ASN.1 types."""

    @staticmethod
    def substrateCollector(asn1Object, substrate, length, options):
        """Yield whatever `readFromStream` produces for *length* octets.

        *asn1Object* is accepted for signature compatibility and unused.
        """
        yield from readFromStream(substrate, length, options)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Wrap a decoded *value* into the object handed back to the caller.

        With the ``native`` option set, *value* is returned untouched.
        Without a spec, the class prototype is cloned with *value* and
        *tagSet*. With a spec, the spec itself is returned for `noValue`,
        otherwise a clone of the spec holding *value*.
        """
        if options.get('native'):
            return value

        if asn1Spec is None:
            return self.protoComponent.clone(value, tagSet=tagSet)

        if value is noValue:
            return asn1Spec

        return asn1Spec.clone(value)
83
84
class RawPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder that hands the payload to `substrateFun` or back to `decodeFun`."""
    protoComponent = univ.Any('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Definite-length form: delegate to *substrateFun* or *decodeFun*."""
        if substrateFun:
            container = self._createComponent(asn1Spec, tagSet, '', **options)
            yield from substrateFun(container, substrate, length, options)
            return

        yield from decodeFun(substrate, asn1Spec, tagSet, length, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Indefinite-length form: yield items until end-of-octets is seen."""
        if substrateFun:
            container = self._createComponent(asn1Spec, tagSet, '', **options)
            yield from substrateFun(container, substrate, length, options)
            return

        # Keep invoking the nested decoder; the end-of-octets sentinel
        # terminates this generator without being yielded.
        while True:
            items = decodeFun(
                substrate, asn1Spec, tagSet, length,
                allowEoo=True, **options)

            for item in items:
                if item is eoo.endOfOctets:
                    return

                yield item
124
125
# Module-level instance of RawPayloadDecoder.
rawPayloadDecoder = RawPayloadDecoder()
127
128
class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder producing INTEGER values."""
    protoComponent = univ.Integer(0)

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Read *length* octets and decode them as a signed big-endian integer.

        An empty payload decodes to 0.

        Raises:
            PyAsn1Error: if the outermost tag is not in simple format.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        # Under-run markers are passed through to the caller; the item
        # left in `octets` once the loop finishes is used as the payload.
        for octets in readFromStream(substrate, length, options):
            if isinstance(octets, SubstrateUnderrunError):
                yield octets

        number = int.from_bytes(bytes(octets), 'big', signed=True) if octets else 0

        yield self._createComponent(asn1Spec, tagSet, number, **options)
151
152
class BooleanPayloadDecoder(IntegerPayloadDecoder):
    """Payload decoder producing BOOLEAN values."""
    protoComponent = univ.Boolean(0)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Collapse any truthy *value* to 1 and any falsy one to 0."""
        normalized = 1 if value else 0

        return IntegerPayloadDecoder._createComponent(
            self, asn1Spec, tagSet, normalized, **options)
159
160
class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for BIT STRING in primitive and constructed forms."""
    protoComponent = univ.BitString(())
    # When False, a constructed (fragmented) encoding is rejected.
    supportConstructedForm = True

    def _appendFragment(self, bitString, fragment):
        """Return *bitString* extended by one raw fragment.

        *fragment* holds the fragment's content octets: the first octet
        is the count of unused trailing bits, the rest is bit data.

        Raises:
            PyAsn1Error: if the fragment is empty or declares more than
                seven trailing bits.
        """
        if not fragment:
            # Without this guard `fragment[0]` leaks a bare IndexError
            # for a zero-length inner fragment.
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        trailingBits = fragment[0]
        if trailingBits > 7:
            raise error.PyAsn1Error(
                'Trailing bits overflow %s' % trailingBits
            )

        return self.protoComponent.fromOctetString(
            fragment[1:], internalFormat=True,
            prepend=bitString, padding=trailingBits
        )

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length BIT STRING.

        If *substrateFun* is given the payload is handed to it and
        nothing else is decoded. Otherwise a simple-format tag is decoded
        as a single primitive value, and any other tag format is decoded
        fragment by fragment through *decodeFun*.

        Raises:
            PyAsn1Error: on empty payload, invalid trailing-bits count,
                or a constructed encoding when it is not supported.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if not length:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        for chunk in isEndOfStream(substrate):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if chunk:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?

            # First octet: number of unused bits in the final octet
            for trailingBits in readFromStream(substrate, 1, options):
                if isinstance(trailingBits, SubstrateUnderrunError):
                    yield trailingBits

            trailingBits = ord(trailingBits)
            if trailingBits > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % trailingBits
                )

            for chunk in readFromStream(substrate, length - 1, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            value = self.protoComponent.fromOctetString(
                chunk, internalFormat=True, padding=trailingBits)

            yield self._createComponent(asn1Spec, tagSet, value, **options)

            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited '
                                    'at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)

        current_position = substrate.tell()

        while substrate.tell() - current_position < length:
            for component in decodeFun(
                    substrate, self.protoComponent, substrateFun=substrateFun,
                    **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            bitString = self._appendFragment(bitString, component)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length BIT STRING fragment by fragment.

        Fragments are read through *decodeFun* until the end-of-octets
        sentinel is returned.

        Raises:
            PyAsn1Error: on an empty fragment or an invalid
                trailing-bits count.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)

        while True:  # loop over fragments

            for component in decodeFun(
                    substrate, self.protoComponent, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if component is eoo.endOfOctets:
                    break

                if isinstance(component, SubstrateUnderrunError):
                    yield component

            if component is eoo.endOfOctets:
                break

            bitString = self._appendFragment(bitString, component)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)
290
291
class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for OCTET STRING in primitive and constructed forms."""
    protoComponent = univ.OctetString('')
    # When False, a constructed (fragmented) encoding is rejected.
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length OCTET STRING.

        A simple-format tag yields the payload as one value; any other
        tag format is assembled from fragments read via *decodeFun*.

        Raises:
            PyAsn1Error: if a constructed encoding is met while it is
                not supported.
        """
        if substrateFun:
            target = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(target, substrate, length, options)
            return

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?
            # The item left in `payload` after the loop is the content
            for payload in readFromStream(substrate, length, options):
                if isinstance(payload, SubstrateUnderrunError):
                    yield payload

            yield self._createComponent(asn1Spec, tagSet, payload, **options)

            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # Inner fragments are collected as raw octets
        substrateFun = self.substrateCollector

        assembled = b''

        start = substrate.tell()

        while substrate.tell() - start < length:
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=substrateFun,
                **options)

            for fragment in fragments:
                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

            assembled += fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length OCTET STRING fragment by fragment.

        A caller-supplied *substrateFun* is honoured unless it is this
        class's own `substrateCollector`.
        """
        if substrateFun and substrateFun is not self.substrateCollector:
            target = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(target, substrate, length, options)
            return

        # Inner fragments are collected as raw octets
        substrateFun = self.substrateCollector

        assembled = b''

        while True:  # one pass per fragment

            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=substrateFun,
                allowEoo=True, **options)

            for fragment in fragments:

                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

                if fragment is eoo.endOfOctets:
                    break

            if fragment is eoo.endOfOctets:
                break

            assembled += fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)
376
377
class NullPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for NULL, which must carry no content octets."""
    protoComponent = univ.Null('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Consume *length* octets and yield a NULL object.

        Raises:
            PyAsn1Error: if the tag is not in simple format or the
                payload is not empty.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for payload in readFromStream(substrate, length, options):
            if isinstance(payload, SubstrateUnderrunError):
                yield payload

        # The result object is built before the payload is validated.
        nullObject = self._createComponent(asn1Spec, tagSet, '', **options)

        if payload:
            raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)

        yield nullObject
399
400
class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for OBJECT IDENTIFIER."""
    protoComponent = univ.ObjectIdentifier(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode base-128 sub-identifiers and split the two leading arcs.

        Raises:
            PyAsn1Error: on a non-simple tag, empty payload, or a
                sub-identifier starting with octet 0x80.
            SubstrateUnderrunError: if the payload ends in the middle of
                a multi-octet sub-identifier.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            raise error.PyAsn1Error('Empty substrate')

        # Accumulate in a list: growing a tuple one element at a time
        # copies it on every step, i.e. quadratic in the number of arcs.
        arcs = []
        index = 0
        substrateLen = len(chunk)
        while index < substrateLen:
            subId = chunk[index]
            index += 1
            if subId < 128:
                arcs.append(subId)
            elif subId > 128:
                # Construct subid from a number of octets
                nextSubId = subId
                subId = 0
                while nextSubId >= 128:
                    subId = (subId << 7) + (nextSubId & 0x7F)
                    if index >= substrateLen:
                        raise error.SubstrateUnderrunError(
                            'Short substrate for sub-OID past %s' % (tuple(arcs),)
                        )
                    nextSubId = chunk[index]
                    index += 1
                arcs.append((subId << 7) + nextSubId)
            elif subId == 128:
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')

        # Decode two leading arcs, which share the first sub-identifier
        first = arcs[0]
        if 0 <= first <= 39:
            oid = (0,) + tuple(arcs)
        elif 40 <= first <= 79:
            oid = (1, first - 40) + tuple(arcs[1:])
        elif first >= 80:
            oid = (2, first - 80) + tuple(arcs[1:])
        else:
            raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0])

        yield self._createComponent(asn1Spec, tagSet, oid, **options)
457
458
class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for RELATIVE-OID."""
    protoComponent = univ.RelativeOID(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode base-128 sub-identifiers into a tuple of arcs.

        Raises:
            PyAsn1Error: on a non-simple tag, empty payload, or a
                sub-identifier starting with octet 0x80.
            SubstrateUnderrunError: if the payload ends in the middle of
                a multi-octet sub-identifier.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            raise error.PyAsn1Error('Empty substrate')

        # Accumulate in a list: growing a tuple one element at a time
        # copies it on every step, i.e. quadratic in the number of arcs.
        arcs = []
        index = 0
        substrateLen = len(chunk)
        while index < substrateLen:
            subId = chunk[index]
            index += 1
            if subId < 128:
                arcs.append(subId)
            elif subId > 128:
                # Construct subid from a number of octets
                nextSubId = subId
                subId = 0
                while nextSubId >= 128:
                    subId = (subId << 7) + (nextSubId & 0x7F)
                    if index >= substrateLen:
                        raise error.SubstrateUnderrunError(
                            'Short substrate for sub-OID past %s' % (tuple(arcs),)
                        )
                    nextSubId = chunk[index]
                    index += 1
                arcs.append((subId << 7) + nextSubId)
            elif subId == 128:
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')

        reloid = tuple(arcs)

        yield self._createComponent(asn1Spec, tagSet, reloid, **options)
505
506
class RealPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for REAL (binary, special and character forms)."""
    protoComponent = univ.Real()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a REAL payload.

        An empty payload decodes to 0.0. Otherwise the first octet
        selects the form: bit 8 set is binary (yielding a
        ``(mantissa, 2, exponent)`` triple), bit 7 set is an infinity,
        and both clear is a character (decimal text) encoding.

        Raises:
            PyAsn1Error: on a non-simple tag, truncated content, a bad
                exponent or an illegal base.
            SubstrateUnderrunError: on an unknown NR form, unparsable
                decimal text or an unknown encoding.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
            return

        fo = chunk[0]
        chunk = chunk[1:]
        if fo & 0x80:  # binary encoding
            if not chunk:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding binary encoded REAL')

            # Exponent length: 1..3 octets, or given by the next octet
            n = (fo & 0x03) + 1

            if n == 4:
                n = chunk[0]
                chunk = chunk[1:]

            eo, chunk = chunk[:n], chunk[n:]

            if not eo or not chunk:
                raise error.PyAsn1Error('Real exponent screwed')

            # Exponent is a two's complement big-endian integer
            e = int.from_bytes(bytes(eo), 'big', signed=True)

            b = fo >> 4 & 0x03  # base bits

            if b > 2:
                raise error.PyAsn1Error('Illegal Real base')

            if b == 1:  # encbase = 8
                e *= 3

            elif b == 2:  # encbase = 16
                e *= 4

            # Mantissa magnitude is an unsigned big-endian integer
            p = int.from_bytes(bytes(chunk), 'big')

            if fo & 0x40:  # sign bit
                p = -p

            sf = fo >> 2 & 0x03  # scale bits
            p *= 2 ** sf
            value = (p, 2, e)

        elif fo & 0x40:  # infinite value
            if LOG:
                LOG('decoding infinite REAL')

            value = '-inf' if fo & 0x01 else 'inf'

        elif fo & 0xc0 == 0:  # character encoding
            if not chunk:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding character encoded REAL')

            try:
                if fo & 0x3 == 0x1:  # NR1
                    value = (int(chunk), 10, 0)

                elif fo & 0x3 == 0x2:  # NR2
                    value = float(chunk)

                elif fo & 0x3 == 0x3:  # NR3
                    value = float(chunk)

                else:
                    raise error.SubstrateUnderrunError(
                        'Unknown NR (tag %s)' % fo
                    )

            except ValueError:
                raise error.SubstrateUnderrunError(
                    'Bad character Real syntax'
                )

        else:
            raise error.SubstrateUnderrunError(
                'Unknown encoding (tag %s)' % fo
            )

        yield self._createComponent(asn1Spec, tagSet, value, **options)
615
616
class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
    """Marker base class for decoders of constructed (container) types."""
    # No default prototype; subclasses may set one.
    protoComponent = None
619
620
class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder):
    """Shared decoding logic for SEQUENCE/SET and SEQUENCE OF/SET OF.

    Components are obtained one at a time from `decodeFun`. Throughout
    this class, `SubstrateUnderrunError` items produced by nested
    generators are re-yielded to the caller, and the item left in the
    loop variable after such a loop finishes is used as the decoded value.
    """
    # Prototype cloned by the schemaless path when components differ in tag
    protoRecordComponent = None
    # Prototype cloned by the schemaless path otherwise
    protoSequenceComponent = None

    def _getComponentTagMap(self, asn1Object, idx):
        raise NotImplementedError

    def _getComponentPositionByType(self, asn1Object, tagSet, idx):
        raise NotImplementedError

    def _decodeComponentsSchemaless(
            self, substrate, tagSet=None, decodeFun=None,
            length=None, **options):
        """Decode components without a spec and guess the container type.

        With ``length == -1`` components are read until end-of-octets;
        otherwise until *length* octets past the starting position have
        been consumed. The final item yielded is the populated container.
        """

        asn1Object = None

        components = []
        componentTypes = set()

        original_position = substrate.tell()

        while length == -1 or substrate.tell() < original_position + length:
            for component in decodeFun(substrate, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            if length == -1 and component is eoo.endOfOctets:
                break

            components.append(component)
            componentTypes.add(component.tagSet)

        # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF
        # The heuristics is:
        # * 1+ components of different types -> likely SEQUENCE/SET
        # * otherwise -> likely SEQUENCE OF/SET OF
        if len(componentTypes) > 1:
            protoComponent = self.protoRecordComponent

        else:
            protoComponent = self.protoSequenceComponent

        asn1Object = protoComponent.clone(
            # construct tagSet from base tag from prototype ASN.1 object
            # and additional tags recovered from the substrate
            tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags)
        )

        if LOG:
            LOG('guessed %r container type (pass `asn1Spec` to guide the '
                'decoder)' % asn1Object)

        for idx, component in enumerate(components):
            asn1Object.setComponentByPosition(
                idx, component,
                verifyConstraints=False,
                matchTags=False, matchConstraints=False
            )

        yield asn1Object

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length constructed value.

        Three paths: hand the payload to *substrateFun* if given; guess
        the container type when *asn1Spec* is None; otherwise decode
        against *asn1Spec* (SEQUENCE/SET by named types, anything else by
        its single component type). The final item yielded is the
        populated container.

        Raises:
            PyAsn1Error: on a non-constructed tag, excessive components,
                missing required components, or an inconsistent object.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        original_position = substrate.tell()

        if substrateFun:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # No single prototype: pass both candidates as a pair
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if asn1Spec is None:
            for asn1Object in self._decodeComponentsSchemaless(
                    substrate, tagSet=tagSet, decodeFun=decodeFun,
                    length=length, **options):
                if isinstance(asn1Object, SubstrateUnderrunError):
                    yield asn1Object

            if substrate.tell() < original_position + length:
                # Leftover octets are only read (and reported) when
                # debug logging is enabled.
                if LOG:
                    for trailing in readFromStream(substrate, context=options):
                        if isinstance(trailing, SubstrateUnderrunError):
                            yield trailing

                    LOG('Unused trailing %d octets encountered: %s' % (
                        len(trailing), debug.hexdump(trailing)))

            yield asn1Object

            return

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        options = self._passAsn1Object(asn1Object, options)

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Spec.componentType

            isSetType = asn1Spec.typeId == univ.Set.typeId
            # Positional decoding is only possible for a SEQUENCE with
            # no OPTIONAL/DEFAULT components.
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()
            idx = 0
            while substrate.tell() - original_position < length:
                # Pick the spec (or tag map) for the next component
                if not namedTypes:
                    componentType = None

                elif isSetType:
                    componentType = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            componentType = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            componentType = namedTypes.getTagMapNearPosition(idx)

                        else:
                            componentType = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Spec,)
                        )

                for component in decodeFun(substrate, componentType, **options):
                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                # Re-locate the position from the decoded component's tags
                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('user-specified open types map:')

                        for k, v in openTypes.items():
                            LOG('%s -> %r' % (k, v))

                    # Open types are resolved only when a map is supplied
                    # or the `decodeOpenTypes` option is set.
                    if openTypes or options.get('decodeOpenTypes', False):

                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # User-supplied map first, then the map
                            # attached to the named type.
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                if LOG:
                                    LOG('default open types map of component '
                                        '"%s.%s" governed by component "%s.%s"'
                                        ':' % (asn1Object.__class__.__name__,
                                               namedType.name,
                                               asn1Object.__class__.__name__,
                                               namedType.openType.name))

                                    for k, v in namedType.openType.items():
                                        LOG('%s -> %r' % (k, v))

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                # Re-decode every element in place
                                for pos, containerElement in enumerate(
                                        containerValue):

                                    stream = asSeekableStream(containerValue[pos].asOctets())

                                    for component in decodeFun(stream, asn1Spec=openType, **options):
                                        if isinstance(component, SubstrateUnderrunError):
                                            yield component

                                    containerValue[pos] = component

                            else:
                                stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())

                                for component in decodeFun(stream, asn1Spec=openType, **options):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                asn1Object.setComponentByPosition(idx, component)

            else:
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise error.PyAsn1Error(
                        f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")

        else:
            # Not SEQUENCE/SET: every element shares one component type
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while substrate.tell() - original_position < length:
                for component in decodeFun(substrate, componentType, **options):
                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        yield asn1Object

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length constructed value.

        Same three paths as `valueDecoder`, except that components are
        read until `decodeFun` returns the end-of-octets sentinel.
        The final item yielded is the populated container.

        Raises:
            PyAsn1Error: on a non-constructed tag, excessive components,
                missing required components, or an inconsistent object.
        """
        if tagSet[0].tagFormat != tag.tagFormatConstructed:
            raise error.PyAsn1Error('Constructed tag format expected')

        if substrateFun is not None:
            if asn1Spec is not None:
                asn1Object = asn1Spec.clone()

            elif self.protoComponent is not None:
                asn1Object = self.protoComponent.clone(tagSet=tagSet)

            else:
                # No single prototype: pass both candidates as a pair
                asn1Object = self.protoRecordComponent, self.protoSequenceComponent

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if asn1Spec is None:
            for asn1Object in self._decodeComponentsSchemaless(
                    substrate, tagSet=tagSet, decodeFun=decodeFun,
                    length=length, **dict(options, allowEoo=True)):
                if isinstance(asn1Object, SubstrateUnderrunError):
                    yield asn1Object

            yield asn1Object

            return

        asn1Object = asn1Spec.clone()
        asn1Object.clear()

        options = self._passAsn1Object(asn1Object, options)

        if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

            namedTypes = asn1Object.componentType

            isSetType = asn1Object.typeId == univ.Set.typeId
            isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

            if LOG:
                LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                    not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                    asn1Spec))

            seenIndices = set()

            idx = 0

            while True:  # loop over components
                # NOTE: `asn1Spec` is rebound here to the spec (or tag
                # map) of the next component.
                if len(namedTypes) <= idx:
                    asn1Spec = None

                elif isSetType:
                    asn1Spec = namedTypes.tagMapUnique

                else:
                    try:
                        if isDeterministic:
                            asn1Spec = namedTypes[idx].asn1Object

                        elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                            asn1Spec = namedTypes.getTagMapNearPosition(idx)

                        else:
                            asn1Spec = namedTypes[idx].asn1Object

                    except IndexError:
                        raise error.PyAsn1Error(
                            'Excessive components decoded at %r' % (asn1Object,)
                        )

                for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options):

                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                    if component is eoo.endOfOctets:
                        break

                if component is eoo.endOfOctets:
                    break

                # Re-locate the position from the decoded component's tags
                if not isDeterministic and namedTypes:
                    if isSetType:
                        idx = namedTypes.getPositionByType(component.effectiveTagSet)

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                seenIndices.add(idx)
                idx += 1

            if LOG:
                LOG('seen component indices %s' % seenIndices)

            if namedTypes:
                if not namedTypes.requiredComponents.issubset(seenIndices):
                    raise error.PyAsn1Error(
                        'ASN.1 object %s has uninitialized '
                        'components' % asn1Object.__class__.__name__)

                if namedTypes.hasOpenTypes:

                    openTypes = options.get('openTypes', {})

                    if LOG:
                        LOG('user-specified open types map:')

                        for k, v in openTypes.items():
                            LOG('%s -> %r' % (k, v))

                    # Open types are resolved only when a map is supplied
                    # or the `decodeOpenTypes` option is set.
                    if openTypes or options.get('decodeOpenTypes', False):

                        for idx, namedType in enumerate(namedTypes.namedTypes):
                            if not namedType.openType:
                                continue

                            if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                                continue

                            governingValue = asn1Object.getComponentByName(
                                namedType.openType.name
                            )

                            # User-supplied map first, then the map
                            # attached to the named type.
                            try:
                                openType = openTypes[governingValue]

                            except KeyError:

                                if LOG:
                                    LOG('default open types map of component '
                                        '"%s.%s" governed by component "%s.%s"'
                                        ':' % (asn1Object.__class__.__name__,
                                               namedType.name,
                                               asn1Object.__class__.__name__,
                                               namedType.openType.name))

                                    for k, v in namedType.openType.items():
                                        LOG('%s -> %r' % (k, v))

                                try:
                                    openType = namedType.openType[governingValue]

                                except KeyError:
                                    if LOG:
                                        LOG('failed to resolve open type by governing '
                                            'value %r' % (governingValue,))
                                    continue

                            if LOG:
                                LOG('resolved open type %r by governing '
                                    'value %r' % (openType, governingValue))

                            containerValue = asn1Object.getComponentByPosition(idx)

                            if containerValue.typeId in (
                                    univ.SetOf.typeId, univ.SequenceOf.typeId):

                                # Re-decode every element in place
                                for pos, containerElement in enumerate(
                                        containerValue):

                                    stream = asSeekableStream(containerValue[pos].asOctets())

                                    for component in decodeFun(stream, asn1Spec=openType,
                                                               **dict(options, allowEoo=True)):
                                        if isinstance(component, SubstrateUnderrunError):
                                            yield component

                                        if component is eoo.endOfOctets:
                                            break

                                    containerValue[pos] = component

                            else:
                                stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())
                                for component in decodeFun(stream, asn1Spec=openType,
                                                           **dict(options, allowEoo=True)):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                    if component is eoo.endOfOctets:
                                        break

                                asn1Object.setComponentByPosition(idx, component)

            else:
                inconsistency = asn1Object.isInconsistent
                if inconsistency:
                    raise error.PyAsn1Error(
                        f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")

        else:
            # Not SEQUENCE/SET: every element shares one component type
            componentType = asn1Spec.componentType

            if LOG:
                LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

            idx = 0

            while True:

                for component in decodeFun(
                        substrate, componentType, allowEoo=True, **options):

                    if isinstance(component, SubstrateUnderrunError):
                        yield component

                    if component is eoo.endOfOctets:
                        break

                if component is eoo.endOfOctets:
                    break

                asn1Object.setComponentByPosition(
                    idx, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False
                )

                idx += 1

        yield asn1Object
1137
1138
class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed decoder whose schemaless guess is SEQUENCE or SEQUENCE OF."""
    # Cloned when decoded components differ in tag set
    protoRecordComponent = univ.Sequence()
    # Cloned when all decoded components share one tag set
    protoSequenceComponent = univ.SequenceOf()
1142
1143
class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """Variant with a fixed SEQUENCE prototype in `protoComponent`."""
    protoComponent = univ.Sequence()
1146
1147
class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """SEQUENCE OF decoder; registered in ``TYPE_MAP`` under ``univ.SequenceOf.typeId``."""
    protoComponent = univ.SequenceOf()
1150
1151
class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed decoder carrying both a SET and a SET OF prototype.

    Registered in ``TAG_MAP`` under ``univ.Set.tagSet``, a tag that is shared
    with SET OF.
    """
    # NOTE(review): presumably consulted by ConstructedPayloadDecoderBase when
    # it has to pick a container type without an asn1Spec -- confirm there.
    protoRecordComponent = univ.Set()
    protoSequenceComponent = univ.SetOf()
1155
1156
class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
    """SET decoder; registered in ``TYPE_MAP`` under ``univ.Set.typeId``."""
    protoComponent = univ.Set()
1159
1160
class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
    """SET OF decoder; registered in ``TYPE_MAP`` under ``univ.SetOf.typeId``."""
    protoComponent = univ.SetOf()
1163
1164
class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
    """Payload decoder for ASN.1 CHOICE values.

    Both entry points are generators: they relay any
    ``SubstrateUnderrunError`` object produced by the nested decoder and
    finally yield the CHOICE object with the decoded alternative stored in it.
    """
    protoComponent = univ.Choice()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length CHOICE.

        When `substrateFun` is given, the fresh (empty) CHOICE object is
        handed to it and whatever it yields is relayed instead of decoding.
        Otherwise the inner component is decoded through `decodeFun` against
        the CHOICE's ``componentTagMap`` and stored by its effective tag set.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(asn1Object, options)

        if asn1Object.tagSet == tagSet:
            if LOG:
                LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))

            # The outer tag belongs to the CHOICE itself, so the inner
            # component is decoded from scratch (new tag, new length).
            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        else:
            if LOG:
                LOG('decoding %s as untagged CHOICE' % (tagSet,))

            # The tag and length already read belong to the chosen
            # alternative; pass them on so decoding resumes at `state`.
            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, tagSet, length,
                    state, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        # `component` is the last item produced by decodeFun, i.e. the value
        effectiveTagSet = component.effectiveTagSet

        if LOG:
            LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))

        asn1Object.setComponentByType(
            effectiveTagSet, component,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False,
            innerFlag=False
        )

        yield asn1Object

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length CHOICE.

        Same contract as :meth:`valueDecoder`. For an explicitly tagged
        CHOICE the loop keeps decoding until the end-of-octets marker is
        returned by `decodeFun`; for an untagged CHOICE a single component
        is decoded.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(asn1Object, options)

        isTagged = asn1Object.tagSet == tagSet

        if LOG:
            LOG('decoding %s as %stagged CHOICE' % (
                tagSet, isTagged and 'explicitly ' or 'un'))

        while True:

            if isTagged:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    **dict(options, allowEoo=True))

            else:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    tagSet, length, state, **dict(options, allowEoo=True))

            for component in iterator:

                if isinstance(component, SubstrateUnderrunError):
                    yield component
                    # An underrun marker is not a decoded component: once the
                    # consumer resumes us, keep pulling from the nested
                    # decoder instead of reading .effectiveTagSet off the
                    # error object (which would raise AttributeError).
                    continue

                if component is eoo.endOfOctets:
                    break

                effectiveTagSet = component.effectiveTagSet

                if LOG:
                    LOG('decoded component %s, effective tag set '
                        '%s' % (component, effectiveTagSet))

                asn1Object.setComponentByType(
                    effectiveTagSet, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False,
                    innerFlag=False
                )

                if not isTagged:
                    break

            if not isTagged or component is eoo.endOfOctets:
                break

        yield asn1Object
1283
1284
class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for ASN.1 ANY values.

    An untagged ANY keeps its complete TLV (header included), so both entry
    points rewind the stream to ``substrate.markedPosition`` in that case.
    Both are generators relaying ``SubstrateUnderrunError`` objects before
    yielding the result.
    """
    protoComponent = univ.Any()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length ANY into a component built from raw octets."""
        isUntagged = True

        if asn1Spec is not None:
            if asn1Spec.__class__ is tagmap.TagMap:
                isUntagged = tagSet not in asn1Spec.tagMap

            else:
                isUntagged = tagSet != asn1Spec.tagSet

        if isUntagged:
            # Rewind to where this TLV started and widen the length so the
            # header octets become part of the payload.
            tlvStart = substrate.markedPosition
            payloadStart = substrate.tell()

            substrate.seek(tlvStart, os.SEEK_SET)
            length += payloadStart - tlvStart

            if LOG:
                for chunk in peekIntoStream(substrate, length):
                    if isinstance(chunk, SubstrateUnderrunError):
                        yield chunk
                LOG('decoding as untagged ANY, substrate '
                    '%s' % debug.hexdump(chunk))

        if substrateFun:
            emptyComponent = self._createComponent(
                asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(emptyComponent, substrate, length, options)
            return

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        # the last item produced by readFromStream is the payload itself
        yield self._createComponent(asn1Spec, tagSet, chunk, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length ANY by concatenating its fragments."""
        isTagged = False

        if asn1Spec is not None:
            if asn1Spec.__class__ is tagmap.TagMap:
                isTagged = tagSet in asn1Spec.tagMap

            else:
                isTagged = tagSet == asn1Spec.tagSet

        if isTagged:
            # tagged Any type -- consume header substrate
            assembled = b''

            if LOG:
                LOG('decoding as tagged ANY')

        else:
            # TODO: Seems not to be tested
            tlvStart = substrate.markedPosition
            payloadStart = substrate.tell()

            substrate.seek(tlvStart, os.SEEK_SET)
            headerSize = payloadStart - tlvStart
            for assembled in readFromStream(substrate, headerSize, options):
                if isinstance(assembled, SubstrateUnderrunError):
                    yield assembled

            if LOG:
                LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(assembled))

        # Any components do not inherit initial tag
        asn1Spec = self.protoComponent

        if substrateFun and substrateFun is not self.substrateCollector:
            emptyComponent = self._createComponent(
                asn1Spec, tagSet, noValue, **options)

            # NOTE(review): this adds the header octets to the stream object
            # itself; kept exactly as the original expression -- confirm intent.
            yield from substrateFun(
                emptyComponent, assembled + substrate,
                length + len(assembled), options)
            return

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        while True:  # loop over fragments

            for fragment in decodeFun(
                    substrate, asn1Spec, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

                if fragment is eoo.endOfOctets:
                    break

            if fragment is eoo.endOfOctets:
                break

            assembled += fragment

        if substrateFun:
            yield assembled  # TODO: Weird

        else:
            yield self._createComponent(asn1Spec, tagSet, assembled, **options)
1403
1404
1405# character string types
class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.UTF8String()`` prototype."""
    protoComponent = char.UTF8String()
1408
1409
class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.NumericString()`` prototype."""
    protoComponent = char.NumericString()
1412
1413
class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.PrintableString()`` prototype."""
    protoComponent = char.PrintableString()
1416
1417
class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.TeletexString()`` prototype."""
    protoComponent = char.TeletexString()
1420
1421
class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.VideotexString()`` prototype."""
    protoComponent = char.VideotexString()
1424
1425
class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.IA5String()`` prototype."""
    protoComponent = char.IA5String()
1428
1429
class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.GraphicString()`` prototype."""
    protoComponent = char.GraphicString()
1432
1433
class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.VisibleString()`` prototype."""
    protoComponent = char.VisibleString()
1436
1437
class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.GeneralString()`` prototype."""
    protoComponent = char.GeneralString()
1440
1441
class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.UniversalString()`` prototype."""
    protoComponent = char.UniversalString()
1444
1445
class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``char.BMPString()`` prototype."""
    protoComponent = char.BMPString()
1448
1449
1450# "useful" types
class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``useful.ObjectDescriptor()`` prototype."""
    protoComponent = useful.ObjectDescriptor()
1453
1454
class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``useful.GeneralizedTime()`` prototype."""
    protoComponent = useful.GeneralizedTime()
1457
1458
class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder with a ``useful.UTCTime()`` prototype."""
    protoComponent = useful.UTCTime()
1461
1462
# Tag-set-to-codec map: one payload decoder instance per base tag set.
# SingleItemDecoder consults it when no asn1Spec is given, and as a fallback
# keyed by the spec's base tag. Insertion order matters to the TYPE_MAP fill
# loop that follows: the first decoder seen for a given typeId is the one kept.
TAG_MAP = {
    univ.Integer.tagSet: IntegerPayloadDecoder(),
    univ.Boolean.tagSet: BooleanPayloadDecoder(),
    univ.BitString.tagSet: BitStringPayloadDecoder(),
    univ.OctetString.tagSet: OctetStringPayloadDecoder(),
    univ.Null.tagSet: NullPayloadDecoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
    univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
    univ.Enumerated.tagSet: IntegerPayloadDecoder(),
    univ.Real.tagSet: RealPayloadDecoder(),
    univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SetOrSetOfPayloadDecoder(),  # conflicts with SetOf
    univ.Choice.tagSet: ChoicePayloadDecoder(),  # conflicts with Any
    # character string types
    char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
    char.NumericString.tagSet: NumericStringPayloadDecoder(),
    char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
    char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
    char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
    char.IA5String.tagSet: IA5StringPayloadDecoder(),
    char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
    char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
    char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
    char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
    char.BMPString.tagSet: BMPStringPayloadDecoder(),
    # useful types
    useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
    useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
    useful.UTCTime.tagSet: UTCTimePayloadDecoder()
}
1493
# Type-to-codec map for ambiguous ASN.1 types, i.e. those that cannot be
# told apart by tag alone and are therefore selected by ``typeId``
TYPE_MAP = {
    univ.Set.typeId: SetPayloadDecoder(),
    univ.SetOf.typeId: SetOfPayloadDecoder(),
    univ.Sequence.typeId: SequencePayloadDecoder(),
    univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
    univ.Choice.typeId: ChoicePayloadDecoder(),
    univ.Any.typeId: AnyPayloadDecoder()
}

# Put in non-ambiguous types for faster codec lookup; entries already
# present (the ambiguous ones above, or an earlier decoder sharing the same
# typeId) are never overwritten
for typeDecoder in TAG_MAP.values():
    if typeDecoder.protoComponent is None:
        continue

    typeId = typeDecoder.protoComponent.__class__.typeId
    if typeId is not None:
        TYPE_MAP.setdefault(typeId, typeDecoder)
1510
1511
# States of the SingleItemDecoder state machine, numbered 0 through 9 in
# the order listed
(
    stDecodeTag,
    stDecodeLength,
    stGetValueDecoder,
    stGetValueDecoderByAsn1Spec,
    stGetValueDecoderByTag,
    stTryAsExplicitTag,
    stDecodeValue,
    stDumpRawValue,
    stErrorCondition,
    stStop,
) = range(10)
1522
1523
# Two zero octets: the end-of-octets marker closing an indefinite-length value
EOO_SENTINEL = b'\x00\x00'
1525
1526
class SingleItemDecoder(object):
    """Decode a single BER TLV (tag, length, value) from a seekable stream.

    Instances are generator-style callables implemented as a state machine
    over the module-level ``st*`` constants. Payload decoders receive the
    instance itself as their ``decodeFun`` so nested items are decoded
    recursively.
    """
    # State entered when no codec can be found for a tag. With
    # stErrorCondition a PyAsn1Error is raised; stDumpRawValue would hand
    # the value to `defaultRawDecoder` instead.
    defaultErrorState = stErrorCondition
    #defaultErrorState = stDumpRawValue
    defaultRawDecoder = AnyPayloadDecoder()

    # When False, indefinite-length encodings are rejected and the
    # end-of-octets sentinel is never looked for.
    supportIndefLength = True

    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
        """Set up codec maps and per-instance tag caches.

        `tagMap` / `typeMap` replace the class-level ``TAG_MAP`` /
        ``TYPE_MAP`` when given; any other keyword is accepted and ignored.
        """
        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP

        # Tag & TagSet objects caches
        self._tagCache = {}
        self._tagSetCache = {}

    def __call__(self, substrate, asn1Spec=None,
                 tagSet=None, length=None, state=stDecodeTag,
                 decodeFun=None, substrateFun=None,
                 **options):
        """Run the decoding state machine over `substrate`.

        Generator. Relays every ``SubstrateUnderrunError`` object produced
        while reading the stream or by the chosen payload decoder. When the
        ``allowEoo`` option is set and the next two octets are the
        end-of-octets sentinel, yields ``eoo.endOfOctets`` and stops.
        Otherwise the last item yielded is the decoded value.

        `tagSet`, `length` and `state` let a payload decoder re-enter the
        machine midway with a header that has already been read.

        Raises ``PyAsn1Error`` when the tag matches no codec or `asn1Spec`,
        when indefinite length is met but unsupported, or when a
        definite-length payload decoder consumed an unexpected number of
        octets; raises ``SubstrateUnderrunError`` on a truncated long tag
        or length field.
        """

        allowEoo = options.pop('allowEoo', False)

        if LOG:
            LOG('decoder called at scope %s with state %d, working with up '
                'to %s octets of substrate: '
                '%s' % (debug.scope, state, length, substrate))

        # Look for end-of-octets sentinel
        if allowEoo and self.supportIndefLength:

            for eoo_candidate in readFromStream(substrate, 2, options):
                if isinstance(eoo_candidate, SubstrateUnderrunError):
                    yield eoo_candidate

            if eoo_candidate == EOO_SENTINEL:
                if LOG:
                    LOG('end-of-octets sentinel found')
                yield eoo.endOfOctets
                return

            else:
                # not a sentinel: un-read the two octets just consumed
                substrate.seek(-2, os.SEEK_CUR)

        tagMap = self._tagMap
        typeMap = self._typeMap
        tagCache = self._tagCache
        tagSetCache = self._tagSetCache

        value = noValue

        # Remember where this item starts; AnyPayloadDecoder seeks back here
        # to capture the header of an untagged ANY.
        substrate.markedPosition = substrate.tell()

        while state is not stStop:

            if state is stDecodeTag:
                # Decode tag
                isShortTag = True

                for firstByte in readFromStream(substrate, 1, options):
                    if isinstance(firstByte, SubstrateUnderrunError):
                        yield firstByte

                firstOctet = ord(firstByte)

                try:
                    lastTag = tagCache[firstOctet]

                except KeyError:
                    integerTag = firstOctet
                    tagClass = integerTag & 0xC0
                    tagFormat = integerTag & 0x20
                    tagId = integerTag & 0x1F

                    if tagId == 0x1F:
                        # long form: tag number follows in base-128 octets,
                        # high bit set on all but the last one
                        isShortTag = False
                        lengthOctetIdx = 0
                        tagId = 0

                        while True:
                            for integerByte in readFromStream(substrate, 1, options):
                                if isinstance(integerByte, SubstrateUnderrunError):
                                    yield integerByte

                            if not integerByte:
                                raise error.SubstrateUnderrunError(
                                    'Short octet stream on long tag decoding'
                                )

                            integerTag = ord(integerByte)
                            lengthOctetIdx += 1
                            tagId <<= 7
                            tagId |= (integerTag & 0x7F)

                            if not integerTag & 0x80:
                                break

                    lastTag = tag.Tag(
                        tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                    )

                    if isShortTag:
                        # cache short tags
                        tagCache[firstOctet] = lastTag

                if tagSet is None:
                    if isShortTag:
                        try:
                            tagSet = tagSetCache[firstOctet]

                        except KeyError:
                            # base tag not recovered
                            tagSet = tag.TagSet((), lastTag)
                            tagSetCache[firstOctet] = tagSet
                    else:
                        tagSet = tag.TagSet((), lastTag)

                else:
                    # re-entered with outer tag(s) already collected
                    tagSet = lastTag + tagSet

                state = stDecodeLength

                if LOG:
                    LOG('tag decoded into %s, decoding length' % tagSet)

            if state is stDecodeLength:
                # Decode length
                for firstOctet in readFromStream(substrate, 1, options):
                    if isinstance(firstOctet, SubstrateUnderrunError):
                        yield firstOctet

                firstOctet = ord(firstOctet)

                if firstOctet < 128:
                    # short form: the octet is the length
                    length = firstOctet

                elif firstOctet > 128:
                    # long form: low 7 bits give the number of length octets
                    size = firstOctet & 0x7F
                    # encoded in size bytes
                    for encodedLength in readFromStream(substrate, size, options):
                        if isinstance(encodedLength, SubstrateUnderrunError):
                            yield encodedLength
                    encodedLength = list(encodedLength)
                    # missing check on maximum size, which shouldn't be a
                    # problem, we can handle more than is possible
                    if len(encodedLength) != size:
                        raise error.SubstrateUnderrunError(
                            '%s<%s at %s' % (size, len(encodedLength), tagSet)
                        )

                    length = 0
                    for lengthOctet in encodedLength:
                        length <<= 8
                        length |= lengthOctet
                    size += 1

                else:  # 128 means indefinite
                    length = -1

                if length == -1 and not self.supportIndefLength:
                    raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')

                state = stGetValueDecoder

                if LOG:
                    LOG('value length decoded into %d' % length)

            if state is stGetValueDecoder:
                if asn1Spec is None:
                    state = stGetValueDecoderByTag

                else:
                    state = stGetValueDecoderByAsn1Spec
            #
            # There're two ways of creating subtypes in ASN.1 what influences
            # decoder operation. These methods are:
            # 1) Either base types used in or no IMPLICIT tagging has been
            #    applied on subtyping.
            # 2) Subtype syntax drops base type information (by means of
            #    IMPLICIT tagging).
            # The first case allows for complete tag recovery from substrate
            # while the second one requires original ASN.1 type spec for
            # decoding.
            #
            # In either case a set of tags (tagSet) is coming from substrate
            # in an incremental, tag-by-tag fashion (this is the case of
            # EXPLICIT tag which is most basic). Outermost tag comes first
            # from the wire.
            #
            if state is stGetValueDecoderByTag:
                try:
                    concreteDecoder = tagMap[tagSet]

                except KeyError:
                    concreteDecoder = None

                if concreteDecoder:
                    state = stDecodeValue

                else:
                    # retry with a one-tag slice of the collected tag set
                    try:
                        concreteDecoder = tagMap[tagSet[:1]]

                    except KeyError:
                        concreteDecoder = None

                    if concreteDecoder:
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)

            if state is stGetValueDecoderByAsn1Spec:

                if asn1Spec.__class__ is tagmap.TagMap:
                    try:
                        chosenSpec = asn1Spec[tagSet]

                    except KeyError:
                        chosenSpec = None

                    if LOG:
                        LOG('candidate ASN.1 spec is a map of:')

                        for firstOctet, v in asn1Spec.presentTypes.items():
                            LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))

                        if asn1Spec.skipTypes:
                            LOG('but neither of: ')
                            for firstOctet, v in asn1Spec.skipTypes.items():
                                LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
                        LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))

                elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
                    chosenSpec = asn1Spec
                    if LOG:
                        LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)

                else:
                    chosenSpec = None

                if chosenSpec is not None:
                    try:
                        # ambiguous type or just faster codec lookup
                        concreteDecoder = typeMap[chosenSpec.typeId]

                        if LOG:
                            LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))

                    except KeyError:
                        # use base type for codec lookup to recover untagged types
                        baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
                        try:
                            # base type or tagged subtype
                            concreteDecoder = tagMap[baseTagSet]

                            if LOG:
                                LOG('value decoder chosen by base %s' % (baseTagSet,))

                        except KeyError:
                            concreteDecoder = None

                    if concreteDecoder:
                        asn1Spec = chosenSpec
                        state = stDecodeValue

                    else:
                        state = stTryAsExplicitTag

                else:
                    concreteDecoder = None
                    state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)

            if state is stDecodeValue:
                if not options.get('recursiveFlag', True) and not substrateFun:  # deprecate this
                    def substrateFun(asn1Object, _substrate, _length, _options):
                        """Legacy hack to keep the recursiveFlag=False option supported.

                        The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
                        of the old recursiveFlag=False option. Users should pass their callback instead of using
                        recursiveFlag.
                        """
                        yield asn1Object

                original_position = substrate.tell()

                if length == -1:  # indef length
                    for value in concreteDecoder.indefLenValueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                else:
                    for value in concreteDecoder.valueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                    # a definite-length payload must account for exactly
                    # `length` octets of the stream
                    bytesRead = substrate.tell() - original_position
                    if not substrateFun and bytesRead != length:
                        raise PyAsn1Error(
                            "Read %s bytes instead of expected %s." % (bytesRead, length))
                    elif substrateFun and bytesRead > length:
                        # custom substrateFun may be used for partial decoding, reading less is expected there
                        raise PyAsn1Error(
                            "Read %s bytes are more than expected %s." % (bytesRead, length))

                if LOG:
                    LOG('codec %s yields type %s, value:\n%s\n...' % (
                        concreteDecoder.__class__.__name__, value.__class__.__name__,
                        isinstance(value, base.Asn1Item) and value.prettyPrint() or value))

                state = stStop
                break

            if state is stTryAsExplicitTag:
                if (tagSet and
                        tagSet[0].tagFormat == tag.tagFormatConstructed and
                        tagSet[0].tagClass != tag.tagClassUniversal):
                    # Assume explicit tagging
                    concreteDecoder = rawPayloadDecoder
                    state = stDecodeValue

                else:
                    concreteDecoder = None
                    state = self.defaultErrorState

                if LOG:
                    LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))

            if state is stDumpRawValue:
                concreteDecoder = self.defaultRawDecoder

                if LOG:
                    LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)

                state = stDecodeValue

            if state is stErrorCondition:
                raise error.PyAsn1Error(
                    '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
                )

        if LOG:
            debug.scope.pop()
            LOG('decoder left scope %s, call completed' % debug.scope)

        yield value
1888
1889
class StreamingDecoder(object):
    """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects.

    On each iteration, consume whatever BER/CER/DER serialization is
    available in the `substrate` stream-like object and turns it into
    one or more, possibly nested, ASN.1 objects.

    Parameters
    ----------
    substrate: :py:class:`file`, :py:class:`io.BytesIO`
        BER/CER/DER serialization in form of a byte stream

    Keyword Args
    ------------
    asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
        A pyasn1 type object to act as a template guiding the decoder.
        Depending on the ASN.1 structure being decoded, `asn1Spec` may
        or may not be required. One of the reasons why `asn1Spec` may
        be required is that ASN.1 structure is encoded in the *IMPLICIT*
        tagging mode.

    Yields
    ------
    : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
        Decoded ASN.1 object (possibly, nested) or
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
        insufficient BER/CER/DER serialization on input to fully recover ASN.1
        objects from it.

        In the latter case the caller is advised to ensure some more data in
        the input stream, then call the iterator again. The decoder will resume
        the decoding process using the newly arrived data.

        The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError`
        object might hold a reference to the partially populated ASN.1 object
        being reconstructed.

    Raises
    ------
    ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
        `PyAsn1Error` on deserialization error, `EndOfStreamError` on
        premature stream closure.

    Examples
    --------
    Decode BER serialisation without ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> for asn1Object in StreamingDecoder(stream):
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3

    Decode BER serialisation with ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> schema = SequenceOf(componentType=Integer())
        >>>
        >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
        >>> for asn1Object in decoder:
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3
    """

    # Class used to decode each top-level item; subclasses may override it.
    SINGLE_ITEM_DECODER = SingleItemDecoder

    def __init__(self, substrate, asn1Spec=None, **options):
        """Wrap `substrate` as a seekable stream and build the item decoder.

        `options` are passed both to the ``SINGLE_ITEM_DECODER`` constructor
        and to every decoding call made while iterating.
        """
        self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
        self._substrate = asSeekableStream(substrate)
        self._asn1Spec = asn1Spec
        self._options = options

    def __iter__(self):
        """Yield everything the item decoder produces until end of stream."""
        while True:
            for asn1Object in self._singleItemDecoder(
                    self._substrate, self._asn1Spec, **self._options):
                yield asn1Object

            # Only the first item produced by isEndOfStream() is consulted.
            # NOTE(review): leaving the loop this way abandons that generator,
            # so anything it does after its first yield never runs -- verify
            # against isEndOfStream in pyasn1.codec.streaming.
            for chunk in isEndOfStream(self._substrate):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield

                break

            # a truthy answer ends the iteration
            if chunk:
                break
1987
1988
class Decoder(object):
    """Create a BER decoder object.

    Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object.
    """
    # Streaming decoder class doing the actual work; subclasses may override it.
    STREAMING_DECODER = StreamingDecoder

    @classmethod
    def __call__(cls, substrate, asn1Spec=None, **options):
        """Turns BER/CER/DER octet stream into an ASN.1 object.

        Takes BER/CER/DER octet-stream in form of :py:class:`bytes`
        and decode it into an ASN.1 object
        (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
        may be a scalar or an arbitrary nested structure.

        Parameters
        ----------
        substrate: :py:class:`bytes`
            BER/CER/DER octet-stream to parse

        Keyword Args
        ------------
        asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
            A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
            derivative) to act as a template guiding the decoder.
            Depending on the ASN.1 structure being decoded, `asn1Spec` may or
            may not be required. Most common reason for it to require is that
            ASN.1 structure is encoded in *IMPLICIT* tagging mode.

        substrateFun: :py:class:`Union[
                Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
                         Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
                Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
                         Generator[Union[pyasn1.type.base.PyAsn1Item,
                                         pyasn1.error.SubstrateUnderrunError],
                                   None, None]]
            ]`
            User callback meant to generalize special use cases like non-recursive or
            partial decoding. A 3-arg non-streaming variant is supported for backwards
            compatibility in addition to the newer 4-arg streaming variant.
            The callback will receive the uninitialized object recovered from substrate
            as 1st argument, the uninterpreted payload as 2nd argument, and the length
            of the uninterpreted payload as 3rd argument. The streaming variant will
            additionally receive the decode(..., **options) kwargs as 4th argument.
            The non-streaming variant shall return an object that will be propagated
            as decode() return value as 1st item, and the remaining payload for further
            decode passes as 2nd item.
            The streaming variant shall yield an object that will be propagated as
            decode() return value, and leave the remaining payload in the stream.

        Returns
        -------
        : :py:class:`tuple`
            A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object
            recovered from BER/CER/DER substrate and the unprocessed trailing
            portion of the `substrate` (may be empty)

        Raises
        ------
        : :py:class:`~pyasn1.error.PyAsn1Error`
            :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient
            input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error.

        Examples
        --------
        Decode BER/CER/DER serialisation without ASN.1 schema

        .. code-block:: pycon

           >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
           >>> str(s)
           SequenceOf:
            1 2 3

        Decode BER/CER/DER serialisation with ASN.1 schema

        .. code-block:: pycon

           >>> seq = SequenceOf(componentType=Integer())
           >>> s, unprocessed = decode(
               b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
           >>> str(s)
           SequenceOf:
            1 2 3

        """
        substrate = asSeekableStream(substrate)

        if "substrateFun" in options:
            origSubstrateFun = options["substrateFun"]

            def substrateFunWrapper(asn1Object, substrate, length, options=None):
                """Support both 0.4 and 0.5 style APIs.

                substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible,
                we first try if we received a streaming user callback. If that fails,we assume we've received a
                non-streaming v0.4 user callback and convert it for streaming on the fly
                """
                try:
                    substrate_gen = origSubstrateFun(asn1Object, substrate, length, options)
                except TypeError as _value:
                    if _value.__traceback__.tb_next:
                        # Traceback depth > 1 means TypeError from inside user provided function
                        raise
                    # invariant maintained at Decoder.__call__ entry
                    assert isinstance(substrate, io.BytesIO)  # nosec assert_used
                    substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length)
                for value in substrate_gen:
                    yield value

            options["substrateFun"] = substrateFunWrapper

        streamingDecoder = cls.STREAMING_DECODER(
            substrate, asn1Spec, **options)

        # Only the first item is consumed: either it signals an underrun
        # (raised as an exception here) or it is the decoded object.
        for asn1Object in streamingDecoder:
            if isinstance(asn1Object, SubstrateUnderrunError):
                raise error.SubstrateUnderrunError('Short substrate on input')

            try:
                # whatever is left in the stream is returned as the tail
                tail = next(readFromStream(substrate))

            except error.EndOfStreamError:
                tail = b''

            return asn1Object, tail

    @staticmethod
    def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length):
        """Adapt a 3-argument (bytes-based) substrateFun to the generator API.

        Reads the rest of `substrate` and passes it to `substrateFunV4` as
        bytes. The remainder the callback returns is written back at the
        current stream position, the stream is truncated there and the
        position moved back to the start of that remainder. Yields the
        callback's value.
        """
        substrate_bytes = substrate.read()
        if length == -1:
            # indefinite length: offer everything that was read
            length = len(substrate_bytes)
        value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length)
        nbytes = substrate.write(nextSubstrate)
        substrate.truncate()
        substrate.seek(-nbytes, os.SEEK_CUR)
        yield value
2127
2128#: Turns BER octet stream into an ASN.1 object.
2129#:
#: Takes a BER octet-stream and decodes it into an ASN.1 object
#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
#: may be a scalar or an arbitrary nested structure.
2133#:
2134#: Parameters
2135#: ----------
2136#: substrate: :py:class:`bytes`
2137#: BER octet-stream
2138#:
2139#: Keyword Args
2140#: ------------
2141#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:     A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
#:     being decoded, *asn1Spec* may or may not be required. The most common reason for
#:     it to be required is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.
2145#:
2146#: Returns
2147#: -------
2148#: : :py:class:`tuple`
2149#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
2150#: and the unprocessed trailing portion of the *substrate* (may be empty)
2151#:
2152#: Raises
2153#: ------
2154#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
2155#: On decoding errors
2156#:
2157#: Notes
2158#: -----
2159#: This function is deprecated. Please use :py:class:`Decoder` or
2160#: :py:class:`StreamingDecoder` class instance.
2161#:
2162#: Examples
2163#: --------
2164#: Decode BER serialisation without ASN.1 schema
2165#:
2166#: .. code-block:: pycon
2167#:
2168#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
2169#: >>> str(s)
2170#: SequenceOf:
2171#: 1 2 3
2172#:
2173#: Decode BER serialisation with ASN.1 schema
2174#:
2175#: .. code-block:: pycon
2176#:
2177#: >>> seq = SequenceOf(componentType=Integer())
2178#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
2179#: >>> str(s)
2180#: SequenceOf:
2181#: 1 2 3
2182#:
decode = Decoder()  # module-level callable; Decoder.__call__ builds a fresh STREAMING_DECODER per call
2184
def __getattr__(attr: str):
    """Serve deprecated module attributes (module-level ``__getattr__`` hook).

    ``tagMap`` and ``typeMap`` resolve to ``TAG_MAP`` and ``TYPE_MAP`` and
    emit a ``DeprecationWarning``; any other name raises ``AttributeError``.
    """
    if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
        # stacklevel=2 attributes the warning to the code that accessed the
        # deprecated name instead of to this hook, so the report points at
        # the line that actually needs updating.
        warnings.warn(
            f"{attr} is deprecated. Please use {newAttr} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return globals()[newAttr]
    raise AttributeError(attr)