Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/encoder.py: 15%
453 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:16 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:16 +0000
1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com>
5# License: http://snmplabs.com/pyasn1/license.html
6#
7import sys
9from pyasn1 import debug
10from pyasn1 import error
11from pyasn1.codec.ber import eoo
12from pyasn1.compat.integer import to_bytes
13from pyasn1.compat.octets import (int2oct, oct2int, ints2octs, null,
14 str2octs, isOctetsType)
15from pyasn1.type import char
16from pyasn1.type import tag
17from pyasn1.type import univ
18from pyasn1.type import useful
# Only the ready-made encoder callable is part of the public API.
__all__ = ["encode"]

# Debug logger handle; it is tested for truthiness before every log call
# in this module, so logging costs nothing when it is falsy.
LOG = debug.registerLoggee(
    __name__,
    flags=debug.DEBUG_ENCODER
)
class AbstractItemEncoder(object):
    """Common tag/length framing shared by all per-type encoders.

    Subclasses implement `encodeValue()`, which must return a 3-tuple
    ``(substrate, isConstructed, isOctets)``; `encode()` then wraps that
    payload into one tag+length header per tag in the tag set.
    """

    # Whether `encodeLength()` may emit the indefinite-length marker
    supportIndefLenMode = True

    # An outcome of otherwise legit call `encodeFun(eoo.endOfOctets)`
    eooIntegerSubstrate = (0, 0)
    eooOctetsSubstrate = ints2octs(eooIntegerSubstrate)

    # noinspection PyMethodMayBeStatic
    def encodeTag(self, singleTag, isConstructed):
        """Return the identifier octets for `singleTag` as a tuple of ints."""
        tagClass, tagFormat, tagId = singleTag

        leadOctet = tagClass | tagFormat
        if isConstructed:
            leadOctet |= tag.tagFormatConstructed

        # Tag IDs below 31 fit into the leading octet
        if tagId < 31:
            return (leadOctet | tagId,)

        # Otherwise emit base-128 groups, all but the last flagged with 0x80
        trailer = (tagId & 0x7f,)
        remainder = tagId >> 7

        while remainder:
            trailer = (0x80 | (remainder & 0x7f),) + trailer
            remainder >>= 7

        return (leadOctet | 0x1F,) + trailer

    def encodeLength(self, length, defMode):
        """Return the length octets for `length` as a tuple of ints.

        Raises `error.PyAsn1Error` when more than 126 length octets
        would be required.
        """
        if not defMode and self.supportIndefLenMode:
            return (0x80,)

        # Short form: single octet
        if length < 0x80:
            return (length,)

        # Long form: big-endian octets prefixed with their count
        lengthOctets = ()
        while length:
            lengthOctets = (length & 0xff,) + lengthOctets
            length >>= 8

        octetCount = len(lengthOctets)
        if octetCount > 126:
            raise error.PyAsn1Error('Length octets overflow (%d)' % octetCount)

        return (0x80 | octetCount,) + lengthOctets

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Produce the payload; concrete encoders must override this."""
        raise error.PyAsn1Error('Not implemented')

    def encode(self, value, asn1Spec=None, encodeFun=None, **options):
        """Encode `value` (optionally guided by `asn1Spec`) into octets.

        The payload comes from `encodeValue()`; a header is then added
        for every tag in ``tagSet.superTags``, in iteration order.
        """
        tagSet = value.tagSet if asn1Spec is None else asn1Spec.tagSet

        # untagged item?
        if not tagSet:
            payload = self.encodeValue(value, asn1Spec, encodeFun, **options)
            substrate, isConstructed, isOctets = payload
            return substrate

        defMode = options.get('defMode', True)

        substrate = null

        for position, singleTag in enumerate(tagSet.superTags):

            effectiveDefMode = defMode

            # base tag?
            if position == 0:
                try:
                    substrate, isConstructed, isOctets = self.encodeValue(
                        value, asn1Spec, encodeFun, **options
                    )

                except error.PyAsn1Error:
                    raise error.PyAsn1Error(
                        'Error encoding %r: %s' % (value, sys.exc_info()[1]))

                if LOG:
                    LOG('encoded %svalue %s into %s' % (
                        'constructed ' if isConstructed else '', value, substrate
                    ))

                # Empty constructed payloads are dropped on request
                if isConstructed and not substrate and options.get('ifNotEmpty', False):
                    return substrate

                if not isConstructed:
                    effectiveDefMode = True

                    if LOG:
                        LOG('overridden encoding mode into definitive for primitive type')

            header = self.encodeTag(singleTag, isConstructed)

            if LOG:
                LOG('encoded %stag %s into %s' % (
                    'constructed ' if isConstructed else '',
                    singleTag, debug.hexdump(ints2octs(header))))

            header += self.encodeLength(len(substrate), effectiveDefMode)

            if LOG:
                LOG('encoded %s octets (tag + payload) into %s' % (
                    len(substrate), debug.hexdump(ints2octs(header))))

            # The payload is either octets or a tuple of ints; keep the
            # header and the end-of-octets marker in the same representation
            if isOctets:
                substrate = ints2octs(header) + substrate
                terminator = self.eooOctetsSubstrate

            else:
                substrate = header + substrate
                terminator = self.eooIntegerSubstrate

            if not effectiveDefMode:
                substrate += terminator

        if not isOctets:
            substrate = ints2octs(substrate)

        return substrate
class EndOfOctetsEncoder(AbstractItemEncoder):
    """Encoder for the end-of-octets marker: always an empty payload."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        # (substrate, isConstructed, isOctets)
        payload = null
        return payload, False, True
class BooleanEncoder(AbstractItemEncoder):
    """Encoder producing a single payload octet: 1 for truthy, 0 for falsy."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        # Payload is a tuple of ints, hence isOctets=False
        payload = (1,) if value else (0,)
        return payload, False, False
class IntegerEncoder(AbstractItemEncoder):
    """Encoder for integer values as signed octets.

    Zero is special-cased: it is emitted as a single zero octet, or as an
    empty payload when `supportCompactZero` is set.
    """

    supportIndefLenMode = False
    supportCompactZero = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if value == 0:
            compactZero = self.supportCompactZero

            if LOG:
                LOG('encoding %spayload for zero INTEGER' % (
                    'no ' if compactZero else ''
                ))

            # de-facto way to encode zero
            zeroPayload = () if compactZero else (0,)
            return zeroPayload, False, False

        signedOctets = to_bytes(int(value), signed=True)
        return signedOctets, False, True
class BitStringEncoder(AbstractItemEncoder):
    """Encoder for bit strings, optionally split into chunks.

    Without a `maxChunkSize` option (or when the value fits into one
    chunk) a primitive payload is produced: one octet holding the number
    of padding bits, followed by the left-aligned bits. Otherwise slices
    of the value are encoded through `encodeFun` and concatenated into a
    constructed payload.
    """

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            # TODO: try to avoid ASN.1 schema instantiation
            value = asn1Spec.clone(value)

        bitCount = len(value)
        spareBits = bitCount % 8

        # Left-align the bits on an octet boundary
        alignedValue = value << (8 - spareBits) if spareBits else value

        maxChunkSize = options.get('maxChunkSize', 0)

        if not maxChunkSize or len(alignedValue) <= maxChunkSize * 8:
            body = alignedValue.asOctets()
            padding = len(body) * 8 - bitCount
            return int2oct(padding) + body, False, True

        if LOG:
            LOG('encoding into up to %s-octet chunks' % maxChunkSize)

        baseTag = value.tagSet.baseTag

        # strip off explicit tags
        innerTagSet = tag.TagSet(baseTag, baseTag) if baseTag else tag.TagSet()

        alignedValue = alignedValue.clone(tagSet=innerTagSet)

        chunkBits = maxChunkSize * 8
        encoded = null
        upper = 0

        while upper < bitCount:
            lower = upper
            upper = min(lower + chunkBits, bitCount)
            encoded += encodeFun(alignedValue[lower:upper], asn1Spec, **options)

        return encoded, True, True
class OctetStringEncoder(AbstractItemEncoder):
    """Encoder for octet-string-like values, optionally split into chunks.

    Accepts an ASN.1 value object (no `asn1Spec`), a non-octets Python
    value plus `asn1Spec`, or ready-made octets plus `asn1Spec`.
    """

    def encodeValue(self, value, asn1Spec, encodeFun, **options):

        # Which object (if any) serves as the prototype for inner chunks
        if asn1Spec is None:
            prototype = value
            octets = value.asOctets()

        elif not isOctetsType(value):
            prototype = asn1Spec
            octets = asn1Spec.clone(value).asOctets()

        else:
            prototype = None
            octets = value

        maxChunkSize = options.get('maxChunkSize', 0)

        if not maxChunkSize or len(octets) <= maxChunkSize:
            return octets, False, True

        if LOG:
            LOG('encoding into up to %s-octet chunks' % maxChunkSize)

        # strip off explicit tags for inner chunks
        if prototype is not None:
            baseTag = prototype.tagSet.baseTag

            if baseTag:
                innerTagSet = tag.TagSet(baseTag, baseTag)

            else:
                innerTagSet = tag.TagSet()

            asn1Spec = prototype.clone(tagSet=innerTagSet)

        encoded = null
        offset = 0

        # Slices are taken from `value` itself; stop on the first empty one
        while True:
            piece = value[offset:offset + maxChunkSize]
            if not piece:
                break

            encoded += encodeFun(piece, asn1Spec, **options)
            offset += maxChunkSize

        return encoded, True, True
class NullEncoder(AbstractItemEncoder):
    """Encoder for values that carry no payload at all."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        # (substrate, isConstructed, isOctets)
        payload = null
        return payload, False, True
class ObjectIdentifierEncoder(AbstractItemEncoder):
    """Encoder for object identifiers.

    The first two arcs are folded into one sub-identifier; every
    sub-identifier is then written as base-128 groups with the 0x80 bit
    set on all but the last group.
    """

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        arcs = value.asTuple()

        # Build the first pair
        try:
            first = arcs[0]
            second = arcs[1]

        except IndexError:
            raise error.PyAsn1Error('Short OID %s' % (value,))

        secondIsSmall = 0 <= second <= 39

        if secondIsSmall and first == 1:
            leading = second + 40

        elif secondIsSmall and first == 0:
            leading = second

        elif first == 2:
            # no upper bound is applied to the second arc under root 2
            leading = second + 80

        else:
            raise error.PyAsn1Error('Impossible first/second arcs at %s' % (value,))

        subIds = (leading,) + arcs[2:]

        encoded = []

        # Cycle through subIds
        for subId in subIds:
            if subId < 0:
                raise error.PyAsn1Error('Negative OID arc %s at %s' % (subId, value))

            if subId <= 127:
                # Optimize for the common case
                encoded.append(subId)
                continue

            # Pack large Sub-Object IDs
            groups = [subId & 0x7f]
            subId >>= 7

            while subId:
                groups.insert(0, 0x80 | (subId & 0x7f))
                subId >>= 7

            # Add packed Sub-Object ID to resulted Object ID
            encoded.extend(groups)

        return tuple(encoded), False, False
class RealEncoder(AbstractItemEncoder):
    """Encoder for real values given as (mantissa, base, exponent).

    Infinities map to single special octets, a zero mantissa to an empty
    payload, base 10 to a character form and base 2 to a binary form
    whose encoding base is 2, 8 or 16.
    """

    supportIndefLenMode = 0
    binEncBase = 2  # set to None to choose encoding base automatically

    @staticmethod
    def _dropFloatingPoint(m, encbase, e):
        """Return (sign, integer mantissa, base, exponent) for `encbase`."""
        mantissaSign = -1 if m < 0 else 1
        exponentSign = -1 if e < 0 else 1

        m *= mantissaSign

        # Bits per digit for the non-binary bases; base 2 needs no rescaling
        if encbase == 8:
            digitBits = 3

        elif encbase == 16:
            digitBits = 4

        else:
            digitBits = None

        if digitBits is not None:
            m *= 2 ** (abs(e) % digitBits * exponentSign)
            e = abs(e) // digitBits * exponentSign

        # Scale the mantissa up until it has no fractional part left
        while int(m) != m:
            m *= encbase
            e -= 1

        return mantissaSign, int(m), encbase, e

    def _chooseEncBase(self, value):
        """Pick the encoding base and return the normalized components.

        A base of 2, 8 or 16 set on the value wins, then the one set on
        this encoder; otherwise all three are tried and the candidate
        with the smallest absolute exponent is kept.
        """
        m, b, e = value
        candidates = (2, 8, 16)

        if value.binEncBase in candidates:
            return self._dropFloatingPoint(m, value.binEncBase, e)

        if self.binEncBase in candidates:
            return self._dropFloatingPoint(m, self.binEncBase, e)

        # auto choosing base 2/8/16
        sign = 1
        bestBase = 2
        bestMantissa = m
        bestExponent = float('inf')

        for base in candidates:
            sign, candMantissa, candBase, candExponent = self._dropFloatingPoint(m, base, e)

            smaller = abs(candExponent) < abs(bestExponent)
            tieBreak = (abs(candExponent) == abs(bestExponent) and
                        candMantissa < bestMantissa)

            if smaller or tieBreak:
                bestExponent = candExponent
                bestMantissa = int(candMantissa)
                bestBase = candBase

        if LOG:
            LOG('automatically chosen REAL encoding base %s, sign %s, mantissa %s, '
                'exponent %s' % (bestBase, sign, bestMantissa, bestExponent))

        return sign, bestMantissa, bestBase, bestExponent

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        if value.isPlusInf:
            return (0x40,), False, False

        if value.isMinusInf:
            return (0x41,), False, False

        m, b, e = value

        if not m:
            return null, False, True

        if b == 10:
            if LOG:
                LOG('encoding REAL into character form')

            exponentSign = '+' if e == 0 else ''
            return str2octs('\x03%dE%s%d' % (m, exponentSign, e)), False, True

        if b != 2:
            raise error.PyAsn1Error('Prohibited Real base %s' % b)

        firstOctet = 0x80  # binary encoding
        ms, m, encbase, e = self._chooseEncBase(value)

        if ms < 0:  # mantissa sign
            firstOctet |= 0x40  # sign bit

        # exponent & mantissa normalization: digit width and base flag
        if encbase == 2:
            digitBits, baseFlag = 1, 0

        elif encbase == 8:
            digitBits, baseFlag = 3, 0x10

        else:  # encbase = 16
            digitBits, baseFlag = 4, 0x20

        digitMask = (1 << digitBits) - 1

        while m & digitMask == 0:
            m >>= digitBits
            e += 1

        firstOctet |= baseFlag

        scaleFactor = 0

        while m & 0x1 == 0:
            m >>= 1
            scaleFactor += 1

        if scaleFactor > 3:
            raise error.PyAsn1Error('Scale factor overflow')  # bug if raised

        firstOctet |= scaleFactor << 2

        # Exponent as big-endian two's complement octets
        if e == 0 or e == -1:
            exponentOctets = int2oct(e & 0xff)

        else:
            exponentOctets = null

            while e not in (0, -1):
                exponentOctets = int2oct(e & 0xff) + exponentOctets
                e >>= 8

            # Add a sign octet when the top bit contradicts the sign
            if e == 0 and exponentOctets and oct2int(exponentOctets[0]) & 0x80:
                exponentOctets = int2oct(0) + exponentOctets

            if e == -1 and exponentOctets and not (oct2int(exponentOctets[0]) & 0x80):
                exponentOctets = int2oct(0xff) + exponentOctets

        exponentLen = len(exponentOctets)

        if exponentLen > 0xff:
            raise error.PyAsn1Error('Real exponent overflow')

        if 1 <= exponentLen <= 3:
            # lengths 1..3 are stored as 0..2 in the low two bits
            firstOctet |= exponentLen - 1

        else:
            # otherwise an explicit length octet precedes the exponent
            firstOctet |= 3
            exponentOctets = int2oct(exponentLen & 0xff) + exponentOctets

        mantissaOctets = null

        while m:
            mantissaOctets = int2oct(m & 0xff) + mantissaOctets
            m >>= 8

        return int2oct(firstOctet) + exponentOctets + mantissaOctets, False, True
class SequenceEncoder(AbstractItemEncoder):
    """Encoder for record-like values with named components.

    Two kinds of input are handled: an ASN.1 value object (no `asn1Spec`),
    or a bare Python mapping guided by `asn1Spec`. Components are encoded
    through `encodeFun` and concatenated into a constructed payload.
    """

    omitEmptyOptionals = False

    # TODO: handling three flavors of input is too much -- split over codecs

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return ``(substrate, True, True)`` for all encoded components.

        OPTIONAL components without a value and DEFAULT components equal
        to their default are skipped. Raises `error.PyAsn1Error` when a
        required component is missing from a bare Python mapping, or the
        value's own inconsistency when an ASN.1 object reports one.
        """

        substrate = null

        omitEmptyOptionals = options.get(
            'omitEmptyOptionals', self.omitEmptyOptionals)

        if LOG:
            LOG('%sencoding empty OPTIONAL components' % (
                'not ' if omitEmptyOptionals else ''))

        if asn1Spec is None:
            # instance of ASN.1 schema
            inconsistency = value.isInconsistent
            if inconsistency:
                raise inconsistency

            namedTypes = value.componentType

            for idx, component in enumerate(value.values()):
                if namedTypes:
                    namedType = namedTypes[idx]

                    if namedType.isOptional and not component.isValue:
                        if LOG:
                            LOG('not encoding OPTIONAL component %r' % (namedType,))
                        continue

                    if namedType.isDefaulted and component == namedType.asn1Object:
                        if LOG:
                            LOG('not encoding DEFAULT component %r' % (namedType,))
                        continue

                    if omitEmptyOptionals:
                        options.update(ifNotEmpty=namedType.isOptional)

                # wrap open type blob if needed
                if namedTypes and namedType.openType:

                    wrapType = namedType.asn1Object

                    if wrapType.typeId in (
                            univ.SetOf.typeId, univ.SequenceOf.typeId):

                        substrate += encodeFun(
                            component, asn1Spec,
                            **dict(options, wrapType=wrapType.componentType))

                    else:
                        chunk = encodeFun(component, asn1Spec, **options)

                        if wrapType.isSameTypeWith(component):
                            substrate += chunk

                        else:
                            substrate += encodeFun(chunk, wrapType, **options)

                            if LOG:
                                LOG('wrapped with wrap type %r' % (wrapType,))

                else:
                    substrate += encodeFun(component, asn1Spec, **options)

        else:
            # bare Python value + ASN.1 schema
            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):

                componentName = namedType.name

                try:
                    component = value[componentName]

                except KeyError:
                    # A plain mapping raises here for an absent key. That
                    # is only an error for required components; an absent
                    # OPTIONAL one is simply left out of the encoding.
                    if namedType.isOptional:
                        if LOG:
                            LOG('not encoding OPTIONAL component %r' % (namedType,))
                        continue

                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (
                        componentName, value))

                # Containers that hand out a placeholder instead of raising
                # KeyError still report absence through `in`
                if namedType.isOptional and componentName not in value:
                    if LOG:
                        LOG('not encoding OPTIONAL component %r' % (namedType,))
                    continue

                if namedType.isDefaulted and component == namedType.asn1Object:
                    if LOG:
                        LOG('not encoding DEFAULT component %r' % (namedType,))
                    continue

                if omitEmptyOptionals:
                    options.update(ifNotEmpty=namedType.isOptional)

                componentSpec = namedType.asn1Object

                # wrap open type blob if needed
                if namedType.openType:

                    if componentSpec.typeId in (
                            univ.SetOf.typeId, univ.SequenceOf.typeId):

                        substrate += encodeFun(
                            component, componentSpec,
                            **dict(options, wrapType=componentSpec.componentType))

                    else:
                        chunk = encodeFun(component, componentSpec, **options)

                        if componentSpec.isSameTypeWith(component):
                            substrate += chunk

                        else:
                            substrate += encodeFun(chunk, componentSpec, **options)

                            if LOG:
                                LOG('wrapped with wrap type %r' % (componentSpec,))

                else:
                    substrate += encodeFun(component, componentSpec, **options)

        return substrate, True, True
class SequenceOfEncoder(AbstractItemEncoder):
    """Encoder for homogeneous collections of components."""

    def _encodeComponents(self, value, asn1Spec, encodeFun, **options):
        """Return a list holding the encoding of every item of `value`.

        When a `wrapType` option is given, items that are not of that
        type are encoded a second time using `wrapType` as the schema.
        """
        if asn1Spec is None:
            inconsistency = value.isInconsistent
            if inconsistency:
                raise inconsistency

            componentSpec = None

        else:
            componentSpec = asn1Spec.componentType

        # `wrapType` must not leak into the per-component calls
        wrapType = options.pop('wrapType', None)

        encodedItems = []

        for item in value:
            encodedItem = encodeFun(item, componentSpec, **options)

            needsWrapping = (wrapType is not None and
                             not wrapType.isSameTypeWith(item))

            if needsWrapping:
                # wrap encoded value with wrapper container (e.g. ANY)
                encodedItem = encodeFun(encodedItem, wrapType, **options)

                if LOG:
                    LOG('wrapped with wrap type %r' % (wrapType,))

            encodedItems.append(encodedItem)

        return encodedItems

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        encodedItems = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        payload = null.join(encodedItems)

        return payload, True, True
class ChoiceEncoder(AbstractItemEncoder):
    """Encoder delegating to the single chosen alternative."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is None:
            chosen = value.getComponent()
            chosenSpec = None

        else:
            # Exactly one alternative name must be present in the value
            present = []
            for namedType in asn1Spec.componentType.namedTypes:
                if namedType.name in value:
                    present.append(namedType.name)

            if len(present) != 1:
                quantity = 'Multiple ' if present else 'None '
                raise error.PyAsn1Error('%s components for Choice at %r' % (quantity, value))

            chosenName = present[0]

            chosen = value[chosenName]
            chosenSpec = asn1Spec[chosenName]

        return encodeFun(chosen, chosenSpec, **options), True, True
class AnyEncoder(OctetStringEncoder):
    """Encoder passing the value's octets through without chunking."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is None:
            octets = value.asOctets()

        elif isOctetsType(value):
            octets = value

        else:
            octets = asn1Spec.clone(value).asOctets()

        # Reported as constructed exactly when indefinite length mode is on
        isConstructed = not options.get('defMode', True)

        return octets, isConstructed, True
# Encoder lookup by tag set. Every key gets its own encoder instance.
tagMap = dict(
    (asn1Item.tagSet, encoderClass())
    for asn1Item, encoderClass in (
        (eoo.endOfOctets, EndOfOctetsEncoder),
        (univ.Boolean, BooleanEncoder),
        (univ.Integer, IntegerEncoder),
        (univ.BitString, BitStringEncoder),
        (univ.OctetString, OctetStringEncoder),
        (univ.Null, NullEncoder),
        (univ.ObjectIdentifier, ObjectIdentifierEncoder),
        (univ.Enumerated, IntegerEncoder),
        (univ.Real, RealEncoder),
        # Sequence & Set have same tags as SequenceOf & SetOf
        (univ.SequenceOf, SequenceOfEncoder),
        (univ.SetOf, SequenceOfEncoder),
        (univ.Choice, ChoiceEncoder),
        # character string types
        (char.UTF8String, OctetStringEncoder),
        (char.NumericString, OctetStringEncoder),
        (char.PrintableString, OctetStringEncoder),
        (char.TeletexString, OctetStringEncoder),
        (char.VideotexString, OctetStringEncoder),
        (char.IA5String, OctetStringEncoder),
        (char.GraphicString, OctetStringEncoder),
        (char.VisibleString, OctetStringEncoder),
        (char.GeneralString, OctetStringEncoder),
        (char.UniversalString, OctetStringEncoder),
        (char.BMPString, OctetStringEncoder),
        # useful types
        (useful.ObjectDescriptor, OctetStringEncoder),
        (useful.GeneralizedTime, OctetStringEncoder),
        (useful.UTCTime, OctetStringEncoder),
    )
)
# Put in ambiguous & non-ambiguous types for faster codec lookup.
# Every key gets its own encoder instance.
typeMap = dict(
    (asn1Type.typeId, encoderClass())
    for asn1Type, encoderClass in (
        (univ.Boolean, BooleanEncoder),
        (univ.Integer, IntegerEncoder),
        (univ.BitString, BitStringEncoder),
        (univ.OctetString, OctetStringEncoder),
        (univ.Null, NullEncoder),
        (univ.ObjectIdentifier, ObjectIdentifierEncoder),
        (univ.Enumerated, IntegerEncoder),
        (univ.Real, RealEncoder),
        # Sequence & Set have same tags as SequenceOf & SetOf
        (univ.Set, SequenceEncoder),
        (univ.SetOf, SequenceOfEncoder),
        (univ.Sequence, SequenceEncoder),
        (univ.SequenceOf, SequenceOfEncoder),
        (univ.Choice, ChoiceEncoder),
        (univ.Any, AnyEncoder),
        # character string types
        (char.UTF8String, OctetStringEncoder),
        (char.NumericString, OctetStringEncoder),
        (char.PrintableString, OctetStringEncoder),
        (char.TeletexString, OctetStringEncoder),
        (char.VideotexString, OctetStringEncoder),
        (char.IA5String, OctetStringEncoder),
        (char.GraphicString, OctetStringEncoder),
        (char.VisibleString, OctetStringEncoder),
        (char.GeneralString, OctetStringEncoder),
        (char.UniversalString, OctetStringEncoder),
        (char.BMPString, OctetStringEncoder),
        # useful types
        (useful.ObjectDescriptor, OctetStringEncoder),
        (useful.GeneralizedTime, OctetStringEncoder),
        (useful.UTCTime, OctetStringEncoder),
    )
)
class Encoder(object):
    """Callable that picks a per-type encoder and runs it.

    The concrete encoder is looked up by the value's (or schema's)
    `typeId` first and, failing that, by its base tag. Subclasses may pin
    `fixedDefLengthMode` / `fixedChunkSize` to override the corresponding
    `defMode` / `maxChunkSize` options on every call.
    """

    fixedDefLengthMode = None
    fixedChunkSize = None

    def __init__(self, tagMap, typeMap=None):
        """Remember the lookup tables.

        `tagMap` maps tag sets to encoders and `typeMap` maps type IDs to
        encoders. Omitting `typeMap` gives this instance its own empty
        mapping instead of one dict shared by all instances.
        """
        self.__tagMap = tagMap
        self.__typeMap = {} if typeMap is None else typeMap

    def __call__(self, value, asn1Spec=None, **options):
        """Encode `value`, optionally guided by `asn1Spec`, into octets.

        Raises `error.PyAsn1Error` when `value` is not an ASN.1 object
        and no `asn1Spec` is given, or when no encoder is registered for
        the type.
        """
        try:
            if asn1Spec is None:
                typeId = value.typeId
            else:
                typeId = asn1Spec.typeId

        except AttributeError:
            raise error.PyAsn1Error('Value %r is not ASN.1 type instance '
                                    'and "asn1Spec" not given' % (value,))

        # Apply the pinned settings first so that the log line below
        # reports the mode and chunk size that will actually be used
        if self.fixedDefLengthMode is not None:
            options.update(defMode=self.fixedDefLengthMode)

        if self.fixedChunkSize is not None:
            options.update(maxChunkSize=self.fixedChunkSize)

        if LOG:
            # Conditional expression rather than and/or: an empty
            # description must not fall through to a `None` schema
            if asn1Spec is None:
                typeDescription = value.prettyPrintType()
            else:
                typeDescription = asn1Spec.prettyPrintType()

            LOG('encoder called in %sdef mode, chunk size %s for '
                'type %s, value:\n%s' % (
                    '' if options.get('defMode', True) else 'in',
                    options.get('maxChunkSize', 0),
                    typeDescription, value))

        # Keep the guarded region down to the lookup itself so that a
        # KeyError raised while logging is not mistaken for a missing codec
        try:
            concreteEncoder = self.__typeMap[typeId]

        except KeyError:
            if asn1Spec is None:
                tagSet = value.tagSet
            else:
                tagSet = asn1Spec.tagSet

            # use base type for codec lookup to recover untagged types
            baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag)

            try:
                concreteEncoder = self.__tagMap[baseTagSet]

            except KeyError:
                raise error.PyAsn1Error('No encoder for %r (%s)' % (value, tagSet))

            if LOG:
                LOG('using value codec %s chosen by tagSet %s' % (
                    concreteEncoder.__class__.__name__, tagSet))

        else:
            if LOG:
                LOG('using value codec %s chosen by type ID %s' % (
                    concreteEncoder.__class__.__name__, typeId))

        substrate = concreteEncoder.encode(value, asn1Spec, self, **options)

        if LOG:
            LOG('codec %s built %s octets of substrate: %s\nencoder completed' % (
                concreteEncoder, len(substrate), debug.hexdump(substrate)))

        return substrate
#: Turns ASN.1 object into BER octet stream.
#:
#: Accepts any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative),
#: visits its components recursively and returns the resulting BER octet stream.
#:
#: Parameters
#: ----------
#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     The object to encode. When a plain Python object is passed, the
#:     `asn1Spec` parameter is required to guide the encoding process.
#:
#: Keyword Args
#: ------------
#: asn1Spec:
#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:
#: defMode: :py:class:`bool`
#:     If :obj:`False`, produces indefinite length encoding
#:
#: maxChunkSize: :py:class:`int`
#:     Maximum chunk size in chunked encoding mode (0 denotes unlimited chunk size)
#:
#: Returns
#: -------
#: : :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     The given ASN.1 object encoded into a BER octet stream
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error
#:     On encoding errors
#:
#: Examples
#: --------
#: Encode Python value into BER with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> encode([1, 2, 3], asn1Spec=seq)
#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
#:
#: Encode ASN.1 value object into BER
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> seq.extend([1, 2, 3])
#:    >>> encode(seq)
#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
#:
encode = Encoder(tagMap=tagMap, typeMap=typeMap)