1# 
    2# This file is part of pyasn1 software. 
    3# 
    4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 
    5# License: https://pyasn1.readthedocs.io/en/latest/license.html 
    6# 
    7import sys 
    8import warnings 
    9 
    10from pyasn1 import debug 
    11from pyasn1 import error 
    12from pyasn1.codec.ber import eoo 
    13from pyasn1.compat import _MISSING 
    14from pyasn1.compat.integer import to_bytes 
    15from pyasn1.type import char 
    16from pyasn1.type import tag 
    17from pyasn1.type import univ 
    18from pyasn1.type import useful 
    19 
    20__all__ = ['Encoder', 'encode'] 
    21 
    22LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_ENCODER) 
    23 
    24 
    25class AbstractItemEncoder(object): 
    26    supportIndefLenMode = True 
    27 
    28    # An outcome of otherwise legit call `encodeFun(eoo.endOfOctets)` 
    29    eooIntegerSubstrate = (0, 0) 
    30    eooOctetsSubstrate = bytes(eooIntegerSubstrate) 
    31 
    32    # noinspection PyMethodMayBeStatic 
    33    def encodeTag(self, singleTag, isConstructed): 
    34        tagClass, tagFormat, tagId = singleTag 
    35        encodedTag = tagClass | tagFormat 
    36        if isConstructed: 
    37            encodedTag |= tag.tagFormatConstructed 
    38 
    39        if tagId < 31: 
    40            return encodedTag | tagId, 
    41 
    42        else: 
    43            substrate = tagId & 0x7f, 
    44 
    45            tagId >>= 7 
    46 
    47            while tagId: 
    48                substrate = (0x80 | (tagId & 0x7f),) + substrate 
    49                tagId >>= 7 
    50 
    51            return (encodedTag | 0x1F,) + substrate 
    52 
    53    def encodeLength(self, length, defMode): 
    54        if not defMode and self.supportIndefLenMode: 
    55            return (0x80,) 
    56 
    57        if length < 0x80: 
    58            return length, 
    59 
    60        else: 
    61            substrate = () 
    62            while length: 
    63                substrate = (length & 0xff,) + substrate 
    64                length >>= 8 
    65 
    66            substrateLen = len(substrate) 
    67 
    68            if substrateLen > 126: 
    69                raise error.PyAsn1Error('Length octets overflow (%d)' % substrateLen) 
    70 
    71            return (0x80 | substrateLen,) + substrate 
    72 
    73    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    74        raise error.PyAsn1Error('Not implemented') 
    75 
    76    def encode(self, value, asn1Spec=None, encodeFun=None, **options): 
    77 
    78        if asn1Spec is None: 
    79            tagSet = value.tagSet 
    80        else: 
    81            tagSet = asn1Spec.tagSet 
    82 
    83        # untagged item? 
    84        if not tagSet: 
    85            substrate, isConstructed, isOctets = self.encodeValue( 
    86                value, asn1Spec, encodeFun, **options 
    87            ) 
    88            return substrate 
    89 
    90        defMode = options.get('defMode', True) 
    91 
    92        substrate = b'' 
    93 
    94        for idx, singleTag in enumerate(tagSet.superTags): 
    95 
    96            defModeOverride = defMode 
    97 
    98            # base tag? 
    99            if not idx: 
    100                try: 
    101                    substrate, isConstructed, isOctets = self.encodeValue( 
    102                        value, asn1Spec, encodeFun, **options 
    103                    ) 
    104 
    105                except error.PyAsn1Error as exc: 
    106                    raise error.PyAsn1Error( 
    107                        'Error encoding %r: %s' % (value, exc)) 
    108 
    109                if LOG: 
    110                    LOG('encoded %svalue %s into %s' % ( 
    111                        isConstructed and 'constructed ' or '', value, substrate 
    112                    )) 
    113 
    114                if not substrate and isConstructed and options.get('ifNotEmpty', False): 
    115                    return substrate 
    116 
    117                if not isConstructed: 
    118                    defModeOverride = True 
    119 
    120                    if LOG: 
    121                        LOG('overridden encoding mode into definitive for primitive type') 
    122 
    123            header = self.encodeTag(singleTag, isConstructed) 
    124 
    125            if LOG: 
    126                LOG('encoded %stag %s into %s' % ( 
    127                    isConstructed and 'constructed ' or '', 
    128                    singleTag, debug.hexdump(bytes(header)))) 
    129 
    130            header += self.encodeLength(len(substrate), defModeOverride) 
    131 
    132            if LOG: 
    133                LOG('encoded %s octets (tag + payload) into %s' % ( 
    134                    len(substrate), debug.hexdump(bytes(header)))) 
    135 
    136            if isOctets: 
    137                substrate = bytes(header) + substrate 
    138 
    139                if not defModeOverride: 
    140                    substrate += self.eooOctetsSubstrate 
    141 
    142            else: 
    143                substrate = header + substrate 
    144 
    145                if not defModeOverride: 
    146                    substrate += self.eooIntegerSubstrate 
    147 
    148        if not isOctets: 
    149            substrate = bytes(substrate) 
    150 
    151        return substrate 
    152 
    153 
    154class EndOfOctetsEncoder(AbstractItemEncoder): 
    155    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    156        return b'', False, True 
    157 
    158 
    159class BooleanEncoder(AbstractItemEncoder): 
    160    supportIndefLenMode = False 
    161 
    162    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    163        return value and (1,) or (0,), False, False 
    164 
    165 
    166class IntegerEncoder(AbstractItemEncoder): 
    167    supportIndefLenMode = False 
    168    supportCompactZero = False 
    169 
    170    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    171        if value == 0: 
    172            if LOG: 
    173                LOG('encoding %spayload for zero INTEGER' % ( 
    174                    self.supportCompactZero and 'no ' or '' 
    175                )) 
    176 
    177            # de-facto way to encode zero 
    178            if self.supportCompactZero: 
    179                return (), False, False 
    180            else: 
    181                return (0,), False, False 
    182 
    183        return to_bytes(int(value), signed=True), False, True 
    184 
    185 
    186class BitStringEncoder(AbstractItemEncoder): 
    187    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    188        if asn1Spec is not None: 
    189            # TODO: try to avoid ASN.1 schema instantiation 
    190            value = asn1Spec.clone(value) 
    191 
    192        valueLength = len(value) 
    193        if valueLength % 8: 
    194            alignedValue = value << (8 - valueLength % 8) 
    195        else: 
    196            alignedValue = value 
    197 
    198        maxChunkSize = options.get('maxChunkSize', 0) 
    199        if not maxChunkSize or len(alignedValue) <= maxChunkSize * 8: 
    200            substrate = alignedValue.asOctets() 
    201            return bytes((len(substrate) * 8 - valueLength,)) + substrate, False, True 
    202 
    203        if LOG: 
    204            LOG('encoding into up to %s-octet chunks' % maxChunkSize) 
    205 
    206        baseTag = value.tagSet.baseTag 
    207 
    208        # strip off explicit tags 
    209        if baseTag: 
    210            tagSet = tag.TagSet(baseTag, baseTag) 
    211 
    212        else: 
    213            tagSet = tag.TagSet() 
    214 
    215        alignedValue = alignedValue.clone(tagSet=tagSet) 
    216 
    217        stop = 0 
    218        substrate = b'' 
    219        while stop < valueLength: 
    220            start = stop 
    221            stop = min(start + maxChunkSize * 8, valueLength) 
    222            substrate += encodeFun(alignedValue[start:stop], asn1Spec, **options) 
    223 
    224        return substrate, True, True 
    225 
    226 
    227class OctetStringEncoder(AbstractItemEncoder): 
    228 
    229    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    230 
    231        if asn1Spec is None: 
    232            substrate = value.asOctets() 
    233 
    234        elif not isinstance(value, bytes): 
    235            substrate = asn1Spec.clone(value).asOctets() 
    236 
    237        else: 
    238            substrate = value 
    239 
    240        maxChunkSize = options.get('maxChunkSize', 0) 
    241 
    242        if not maxChunkSize or len(substrate) <= maxChunkSize: 
    243            return substrate, False, True 
    244 
    245        if LOG: 
    246            LOG('encoding into up to %s-octet chunks' % maxChunkSize) 
    247 
    248        # strip off explicit tags for inner chunks 
    249 
    250        if asn1Spec is None: 
    251            baseTag = value.tagSet.baseTag 
    252 
    253            # strip off explicit tags 
    254            if baseTag: 
    255                tagSet = tag.TagSet(baseTag, baseTag) 
    256 
    257            else: 
    258                tagSet = tag.TagSet() 
    259 
    260            asn1Spec = value.clone(tagSet=tagSet) 
    261 
    262        elif not isinstance(value, bytes): 
    263            baseTag = asn1Spec.tagSet.baseTag 
    264 
    265            # strip off explicit tags 
    266            if baseTag: 
    267                tagSet = tag.TagSet(baseTag, baseTag) 
    268 
    269            else: 
    270                tagSet = tag.TagSet() 
    271 
    272            asn1Spec = asn1Spec.clone(tagSet=tagSet) 
    273 
    274        pos = 0 
    275        substrate = b'' 
    276 
    277        while True: 
    278            chunk = value[pos:pos + maxChunkSize] 
    279            if not chunk: 
    280                break 
    281 
    282            substrate += encodeFun(chunk, asn1Spec, **options) 
    283            pos += maxChunkSize 
    284 
    285        return substrate, True, True 
    286 
    287 
    288class NullEncoder(AbstractItemEncoder): 
    289    supportIndefLenMode = False 
    290 
    291    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    292        return b'', False, True 
    293 
    294 
    295class ObjectIdentifierEncoder(AbstractItemEncoder): 
    296    supportIndefLenMode = False 
    297 
    298    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    299        if asn1Spec is not None: 
    300            value = asn1Spec.clone(value) 
    301 
    302        oid = value.asTuple() 
    303 
    304        # Build the first pair 
    305        try: 
    306            first = oid[0] 
    307            second = oid[1] 
    308 
    309        except IndexError: 
    310            raise error.PyAsn1Error('Short OID %s' % (value,)) 
    311 
    312        if 0 <= second <= 39: 
    313            if first == 1: 
    314                oid = (second + 40,) + oid[2:] 
    315            elif first == 0: 
    316                oid = (second,) + oid[2:] 
    317            elif first == 2: 
    318                oid = (second + 80,) + oid[2:] 
    319            else: 
    320                raise error.PyAsn1Error('Impossible first/second arcs at %s' % (value,)) 
    321 
    322        elif first == 2: 
    323            oid = (second + 80,) + oid[2:] 
    324 
    325        else: 
    326            raise error.PyAsn1Error('Impossible first/second arcs at %s' % (value,)) 
    327 
    328        octets = () 
    329 
    330        # Cycle through subIds 
    331        for subOid in oid: 
    332            if 0 <= subOid <= 127: 
    333                # Optimize for the common case 
    334                octets += (subOid,) 
    335 
    336            elif subOid > 127: 
    337                # Pack large Sub-Object IDs 
    338                res = (subOid & 0x7f,) 
    339                subOid >>= 7 
    340 
    341                while subOid: 
    342                    res = (0x80 | (subOid & 0x7f),) + res 
    343                    subOid >>= 7 
    344 
    345                # Add packed Sub-Object ID to resulted Object ID 
    346                octets += res 
    347 
    348            else: 
    349                raise error.PyAsn1Error('Negative OID arc %s at %s' % (subOid, value)) 
    350 
    351        return octets, False, False 
    352 
    353 
    354class RelativeOIDEncoder(AbstractItemEncoder): 
    355    supportIndefLenMode = False 
    356 
    357    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    358        if asn1Spec is not None: 
    359            value = asn1Spec.clone(value) 
    360 
    361        octets = () 
    362 
    363        # Cycle through subIds 
    364        for subOid in value.asTuple(): 
    365            if 0 <= subOid <= 127: 
    366                # Optimize for the common case 
    367                octets += (subOid,) 
    368 
    369            elif subOid > 127: 
    370                # Pack large Sub-Object IDs 
    371                res = (subOid & 0x7f,) 
    372                subOid >>= 7 
    373 
    374                while subOid: 
    375                    res = (0x80 | (subOid & 0x7f),) + res 
    376                    subOid >>= 7 
    377 
    378                # Add packed Sub-Object ID to resulted RELATIVE-OID 
    379                octets += res 
    380 
    381            else: 
    382                raise error.PyAsn1Error('Negative RELATIVE-OID arc %s at %s' % (subOid, value)) 
    383 
    384        return octets, False, False 
    385 
    386 
    387class RealEncoder(AbstractItemEncoder): 
    388    supportIndefLenMode = False 
    389    binEncBase = 2  # set to None to choose encoding base automatically 
    390 
    391    @staticmethod 
    392    def _dropFloatingPoint(m, encbase, e): 
    393        ms, es = 1, 1 
    394        if m < 0: 
    395            ms = -1  # mantissa sign 
    396 
    397        if e < 0: 
    398            es = -1  # exponent sign 
    399 
    400        m *= ms 
    401 
    402        if encbase == 8: 
    403            m *= 2 ** (abs(e) % 3 * es) 
    404            e = abs(e) // 3 * es 
    405 
    406        elif encbase == 16: 
    407            m *= 2 ** (abs(e) % 4 * es) 
    408            e = abs(e) // 4 * es 
    409 
    410        while True: 
    411            if int(m) != m: 
    412                m *= encbase 
    413                e -= 1 
    414                continue 
    415            break 
    416 
    417        return ms, int(m), encbase, e 
    418 
    419    def _chooseEncBase(self, value): 
    420        m, b, e = value 
    421        encBase = [2, 8, 16] 
    422        if value.binEncBase in encBase: 
    423            return self._dropFloatingPoint(m, value.binEncBase, e) 
    424 
    425        elif self.binEncBase in encBase: 
    426            return self._dropFloatingPoint(m, self.binEncBase, e) 
    427 
    428        # auto choosing base 2/8/16 
    429        mantissa = [m, m, m] 
    430        exponent = [e, e, e] 
    431        sign = 1 
    432        encbase = 2 
    433        e = float('inf') 
    434 
    435        for i in range(3): 
    436            (sign, 
    437             mantissa[i], 
    438             encBase[i], 
    439             exponent[i]) = self._dropFloatingPoint(mantissa[i], encBase[i], exponent[i]) 
    440 
    441            if abs(exponent[i]) < abs(e) or (abs(exponent[i]) == abs(e) and mantissa[i] < m): 
    442                e = exponent[i] 
    443                m = int(mantissa[i]) 
    444                encbase = encBase[i] 
    445 
    446        if LOG: 
    447            LOG('automatically chosen REAL encoding base %s, sign %s, mantissa %s, ' 
    448                'exponent %s' % (encbase, sign, m, e)) 
    449 
    450        return sign, m, encbase, e 
    451 
    452    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    453        if asn1Spec is not None: 
    454            value = asn1Spec.clone(value) 
    455 
    456        if value.isPlusInf: 
    457            return (0x40,), False, False 
    458 
    459        if value.isMinusInf: 
    460            return (0x41,), False, False 
    461 
    462        m, b, e = value 
    463 
    464        if not m: 
    465            return b'', False, True 
    466 
    467        if b == 10: 
    468            if LOG: 
    469                LOG('encoding REAL into character form') 
    470 
    471            return b'\x03%dE%s%d' % (m, e == 0 and b'+' or b'', e), False, True 
    472 
    473        elif b == 2: 
    474            fo = 0x80  # binary encoding 
    475            ms, m, encbase, e = self._chooseEncBase(value) 
    476 
    477            if ms < 0:  # mantissa sign 
    478                fo |= 0x40  # sign bit 
    479 
    480            # exponent & mantissa normalization 
    481            if encbase == 2: 
    482                while m & 0x1 == 0: 
    483                    m >>= 1 
    484                    e += 1 
    485 
    486            elif encbase == 8: 
    487                while m & 0x7 == 0: 
    488                    m >>= 3 
    489                    e += 1 
    490                fo |= 0x10 
    491 
    492            else:  # encbase = 16 
    493                while m & 0xf == 0: 
    494                    m >>= 4 
    495                    e += 1 
    496                fo |= 0x20 
    497 
    498            sf = 0  # scale factor 
    499 
    500            while m & 0x1 == 0: 
    501                m >>= 1 
    502                sf += 1 
    503 
    504            if sf > 3: 
    505                raise error.PyAsn1Error('Scale factor overflow')  # bug if raised 
    506 
    507            fo |= sf << 2 
    508            eo = b'' 
    509            if e == 0 or e == -1: 
    510                eo = bytes((e & 0xff,)) 
    511 
    512            else: 
    513                while e not in (0, -1): 
    514                    eo = bytes((e & 0xff,)) + eo 
    515                    e >>= 8 
    516 
    517                if e == 0 and eo and eo[0] & 0x80: 
    518                    eo = bytes((0,)) + eo 
    519 
    520                if e == -1 and eo and not (eo[0] & 0x80): 
    521                    eo = bytes((0xff,)) + eo 
    522 
    523            n = len(eo) 
    524            if n > 0xff: 
    525                raise error.PyAsn1Error('Real exponent overflow') 
    526 
    527            if n == 1: 
    528                pass 
    529 
    530            elif n == 2: 
    531                fo |= 1 
    532 
    533            elif n == 3: 
    534                fo |= 2 
    535 
    536            else: 
    537                fo |= 3 
    538                eo = bytes((n & 0xff,)) + eo 
    539 
    540            po = b'' 
    541 
    542            while m: 
    543                po = bytes((m & 0xff,)) + po 
    544                m >>= 8 
    545 
    546            substrate = bytes((fo,)) + eo + po 
    547 
    548            return substrate, False, True 
    549 
    550        else: 
    551            raise error.PyAsn1Error('Prohibited Real base %s' % b) 
    552 
    553 
    554class SequenceEncoder(AbstractItemEncoder): 
    555    omitEmptyOptionals = False 
    556 
    557    # TODO: handling three flavors of input is too much -- split over codecs 
    558 
    559    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    560 
    561        substrate = b'' 
    562 
    563        omitEmptyOptionals = options.get( 
    564            'omitEmptyOptionals', self.omitEmptyOptionals) 
    565 
    566        if LOG: 
    567            LOG('%sencoding empty OPTIONAL components' % ( 
    568                    omitEmptyOptionals and 'not ' or '')) 
    569 
    570        if asn1Spec is None: 
    571            # instance of ASN.1 schema 
    572            inconsistency = value.isInconsistent 
    573            if inconsistency: 
    574                raise error.PyAsn1Error( 
    575                    f"ASN.1 object {value.__class__.__name__} is inconsistent") 
    576 
    577            namedTypes = value.componentType 
    578 
    579            for idx, component in enumerate(value.values()): 
    580                if namedTypes: 
    581                    namedType = namedTypes[idx] 
    582 
    583                    if namedType.isOptional and not component.isValue: 
    584                        if LOG: 
    585                            LOG('not encoding OPTIONAL component %r' % (namedType,)) 
    586                        continue 
    587 
    588                    if namedType.isDefaulted and component == namedType.asn1Object: 
    589                        if LOG: 
    590                            LOG('not encoding DEFAULT component %r' % (namedType,)) 
    591                        continue 
    592 
    593                    if omitEmptyOptionals: 
    594                        options.update(ifNotEmpty=namedType.isOptional) 
    595 
    596                # wrap open type blob if needed 
    597                if namedTypes and namedType.openType: 
    598 
    599                    wrapType = namedType.asn1Object 
    600 
    601                    if wrapType.typeId in ( 
    602                            univ.SetOf.typeId, univ.SequenceOf.typeId): 
    603 
    604                        substrate += encodeFun( 
    605                                component, asn1Spec, 
    606                                **dict(options, wrapType=wrapType.componentType)) 
    607 
    608                    else: 
    609                        chunk = encodeFun(component, asn1Spec, **options) 
    610 
    611                        if wrapType.isSameTypeWith(component): 
    612                            substrate += chunk 
    613 
    614                        else: 
    615                            substrate += encodeFun(chunk, wrapType, **options) 
    616 
    617                            if LOG: 
    618                                LOG('wrapped with wrap type %r' % (wrapType,)) 
    619 
    620                else: 
    621                    substrate += encodeFun(component, asn1Spec, **options) 
    622 
    623        else: 
    624            # bare Python value + ASN.1 schema 
    625            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes): 
    626 
    627                try: 
    628                    component = value[namedType.name] 
    629 
    630                except KeyError: 
    631                    raise error.PyAsn1Error('Component name "%s" not found in %r' % ( 
    632                        namedType.name, value)) 
    633 
    634                if namedType.isOptional and namedType.name not in value: 
    635                    if LOG: 
    636                        LOG('not encoding OPTIONAL component %r' % (namedType,)) 
    637                    continue 
    638 
    639                if namedType.isDefaulted and component == namedType.asn1Object: 
    640                    if LOG: 
    641                        LOG('not encoding DEFAULT component %r' % (namedType,)) 
    642                    continue 
    643 
    644                if omitEmptyOptionals: 
    645                    options.update(ifNotEmpty=namedType.isOptional) 
    646 
    647                componentSpec = namedType.asn1Object 
    648 
    649                # wrap open type blob if needed 
    650                if namedType.openType: 
    651 
    652                    if componentSpec.typeId in ( 
    653                            univ.SetOf.typeId, univ.SequenceOf.typeId): 
    654 
    655                        substrate += encodeFun( 
    656                                component, componentSpec, 
    657                                **dict(options, wrapType=componentSpec.componentType)) 
    658 
    659                    else: 
    660                        chunk = encodeFun(component, componentSpec, **options) 
    661 
    662                        if componentSpec.isSameTypeWith(component): 
    663                            substrate += chunk 
    664 
    665                        else: 
    666                            substrate += encodeFun(chunk, componentSpec, **options) 
    667 
    668                            if LOG: 
    669                                LOG('wrapped with wrap type %r' % (componentSpec,)) 
    670 
    671                else: 
    672                    substrate += encodeFun(component, componentSpec, **options) 
    673 
    674        return substrate, True, True 
    675 
    676 
    677class SequenceOfEncoder(AbstractItemEncoder): 
    678    def _encodeComponents(self, value, asn1Spec, encodeFun, **options): 
    679 
    680        if asn1Spec is None: 
    681            inconsistency = value.isInconsistent 
    682            if inconsistency: 
    683                raise error.PyAsn1Error( 
    684                    f"ASN.1 object {value.__class__.__name__} is inconsistent") 
    685 
    686        else: 
    687            asn1Spec = asn1Spec.componentType 
    688 
    689        chunks = [] 
    690 
    691        wrapType = options.pop('wrapType', None) 
    692 
    693        for idx, component in enumerate(value): 
    694            chunk = encodeFun(component, asn1Spec, **options) 
    695 
    696            if (wrapType is not None and 
    697                    not wrapType.isSameTypeWith(component)): 
    698                # wrap encoded value with wrapper container (e.g. ANY) 
    699                chunk = encodeFun(chunk, wrapType, **options) 
    700 
    701                if LOG: 
    702                    LOG('wrapped with wrap type %r' % (wrapType,)) 
    703 
    704            chunks.append(chunk) 
    705 
    706        return chunks 
    707 
    708    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    709        chunks = self._encodeComponents( 
    710            value, asn1Spec, encodeFun, **options) 
    711 
    712        return b''.join(chunks), True, True 
    713 
    714 
    715class ChoiceEncoder(AbstractItemEncoder): 
    716    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    717        if asn1Spec is None: 
    718            component = value.getComponent() 
    719        else: 
    720            names = [namedType.name for namedType in asn1Spec.componentType.namedTypes 
    721                     if namedType.name in value] 
    722            if len(names) != 1: 
    723                raise error.PyAsn1Error('%s components for Choice at %r' % (len(names) and 'Multiple ' or 'None ', value)) 
    724 
    725            name = names[0] 
    726 
    727            component = value[name] 
    728            asn1Spec = asn1Spec[name] 
    729 
    730        return encodeFun(component, asn1Spec, **options), True, True 
    731 
    732 
    733class AnyEncoder(OctetStringEncoder): 
    734    def encodeValue(self, value, asn1Spec, encodeFun, **options): 
    735        if asn1Spec is None: 
    736            value = value.asOctets() 
    737        elif not isinstance(value, bytes): 
    738            value = asn1Spec.clone(value).asOctets() 
    739 
    740        return value, not options.get('defMode', True), True 
    741 
    742 
    743TAG_MAP = { 
    744    eoo.endOfOctets.tagSet: EndOfOctetsEncoder(), 
    745    univ.Boolean.tagSet: BooleanEncoder(), 
    746    univ.Integer.tagSet: IntegerEncoder(), 
    747    univ.BitString.tagSet: BitStringEncoder(), 
    748    univ.OctetString.tagSet: OctetStringEncoder(), 
    749    univ.Null.tagSet: NullEncoder(), 
    750    univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(), 
    751    univ.RelativeOID.tagSet: RelativeOIDEncoder(), 
    752    univ.Enumerated.tagSet: IntegerEncoder(), 
    753    univ.Real.tagSet: RealEncoder(), 
    754    # Sequence & Set have same tags as SequenceOf & SetOf 
    755    univ.SequenceOf.tagSet: SequenceOfEncoder(), 
    756    univ.SetOf.tagSet: SequenceOfEncoder(), 
    757    univ.Choice.tagSet: ChoiceEncoder(), 
    758    # character string types 
    759    char.UTF8String.tagSet: OctetStringEncoder(), 
    760    char.NumericString.tagSet: OctetStringEncoder(), 
    761    char.PrintableString.tagSet: OctetStringEncoder(), 
    762    char.TeletexString.tagSet: OctetStringEncoder(), 
    763    char.VideotexString.tagSet: OctetStringEncoder(), 
    764    char.IA5String.tagSet: OctetStringEncoder(), 
    765    char.GraphicString.tagSet: OctetStringEncoder(), 
    766    char.VisibleString.tagSet: OctetStringEncoder(), 
    767    char.GeneralString.tagSet: OctetStringEncoder(), 
    768    char.UniversalString.tagSet: OctetStringEncoder(), 
    769    char.BMPString.tagSet: OctetStringEncoder(), 
    770    # useful types 
    771    useful.ObjectDescriptor.tagSet: OctetStringEncoder(), 
    772    useful.GeneralizedTime.tagSet: OctetStringEncoder(), 
    773    useful.UTCTime.tagSet: OctetStringEncoder() 
    774} 
    775 
    776# Put in ambiguous & non-ambiguous types for faster codec lookup 
    777TYPE_MAP = { 
    778    univ.Boolean.typeId: BooleanEncoder(), 
    779    univ.Integer.typeId: IntegerEncoder(), 
    780    univ.BitString.typeId: BitStringEncoder(), 
    781    univ.OctetString.typeId: OctetStringEncoder(), 
    782    univ.Null.typeId: NullEncoder(), 
    783    univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(), 
    784    univ.RelativeOID.typeId: RelativeOIDEncoder(), 
    785    univ.Enumerated.typeId: IntegerEncoder(), 
    786    univ.Real.typeId: RealEncoder(), 
    787    # Sequence & Set have same tags as SequenceOf & SetOf 
    788    univ.Set.typeId: SequenceEncoder(), 
    789    univ.SetOf.typeId: SequenceOfEncoder(), 
    790    univ.Sequence.typeId: SequenceEncoder(), 
    791    univ.SequenceOf.typeId: SequenceOfEncoder(), 
    792    univ.Choice.typeId: ChoiceEncoder(), 
    793    univ.Any.typeId: AnyEncoder(), 
    794    # character string types 
    795    char.UTF8String.typeId: OctetStringEncoder(), 
    796    char.NumericString.typeId: OctetStringEncoder(), 
    797    char.PrintableString.typeId: OctetStringEncoder(), 
    798    char.TeletexString.typeId: OctetStringEncoder(), 
    799    char.VideotexString.typeId: OctetStringEncoder(), 
    800    char.IA5String.typeId: OctetStringEncoder(), 
    801    char.GraphicString.typeId: OctetStringEncoder(), 
    802    char.VisibleString.typeId: OctetStringEncoder(), 
    803    char.GeneralString.typeId: OctetStringEncoder(), 
    804    char.UniversalString.typeId: OctetStringEncoder(), 
    805    char.BMPString.typeId: OctetStringEncoder(), 
    806    # useful types 
    807    useful.ObjectDescriptor.typeId: OctetStringEncoder(), 
    808    useful.GeneralizedTime.typeId: OctetStringEncoder(), 
    809    useful.UTCTime.typeId: OctetStringEncoder() 
    810} 
    811 
    812 
    813class SingleItemEncoder(object): 
    814    fixedDefLengthMode = None 
    815    fixedChunkSize = None 
    816 
    817    TAG_MAP = TAG_MAP 
    818    TYPE_MAP = TYPE_MAP 
    819 
    820    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored): 
    821        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP 
    822        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP 
    823 
    824    def __call__(self, value, asn1Spec=None, **options): 
    825        try: 
    826            if asn1Spec is None: 
    827                typeId = value.typeId 
    828            else: 
    829                typeId = asn1Spec.typeId 
    830 
    831        except AttributeError: 
    832            raise error.PyAsn1Error('Value %r is not ASN.1 type instance ' 
    833                                    'and "asn1Spec" not given' % (value,)) 
    834 
    835        if LOG: 
    836            LOG('encoder called in %sdef mode, chunk size %s for type %s, ' 
    837                'value:\n%s' % (not options.get('defMode', True) and 'in' or '', 
    838                                options.get('maxChunkSize', 0), 
    839                                asn1Spec is None and value.prettyPrintType() or 
    840                                asn1Spec.prettyPrintType(), value)) 
    841 
    842        if self.fixedDefLengthMode is not None: 
    843            options.update(defMode=self.fixedDefLengthMode) 
    844 
    845        if self.fixedChunkSize is not None: 
    846            options.update(maxChunkSize=self.fixedChunkSize) 
    847 
    848        try: 
    849            concreteEncoder = self._typeMap[typeId] 
    850 
    851            if LOG: 
    852                LOG('using value codec %s chosen by type ID ' 
    853                    '%s' % (concreteEncoder.__class__.__name__, typeId)) 
    854 
    855        except KeyError: 
    856            if asn1Spec is None: 
    857                tagSet = value.tagSet 
    858            else: 
    859                tagSet = asn1Spec.tagSet 
    860 
    861            # use base type for codec lookup to recover untagged types 
    862            baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag) 
    863 
    864            try: 
    865                concreteEncoder = self._tagMap[baseTagSet] 
    866 
    867            except KeyError: 
    868                raise error.PyAsn1Error('No encoder for %r (%s)' % (value, tagSet)) 
    869 
    870            if LOG: 
    871                LOG('using value codec %s chosen by tagSet ' 
    872                    '%s' % (concreteEncoder.__class__.__name__, tagSet)) 
    873 
    874        substrate = concreteEncoder.encode(value, asn1Spec, self, **options) 
    875 
    876        if LOG: 
    877            LOG('codec %s built %s octets of substrate: %s\nencoder ' 
    878                'completed' % (concreteEncoder, len(substrate), 
    879                               debug.hexdump(substrate))) 
    880 
    881        return substrate 
    882 
    883 
    884class Encoder(object): 
    885    SINGLE_ITEM_ENCODER = SingleItemEncoder 
    886 
    887    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **options): 
    888        self._singleItemEncoder = self.SINGLE_ITEM_ENCODER( 
    889            tagMap=tagMap, typeMap=typeMap, **options 
    890        ) 
    891 
    892    def __call__(self, pyObject, asn1Spec=None, **options): 
    893        return self._singleItemEncoder( 
    894            pyObject, asn1Spec=asn1Spec, **options) 
    895 
    896 
    897#: Turns ASN.1 object into BER octet stream. 
    898#: 
    899#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 
    900#: walks all its components recursively and produces a BER octet stream. 
    901#: 
    902#: Parameters 
    903#: ---------- 
    904#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 
    905#:     A Python or pyasn1 object to encode. If Python object is given, `asnSpec` 
    906#:     parameter is required to guide the encoding process. 
    907#: 
    908#: Keyword Args 
    909#: ------------ 
    910#: asn1Spec: 
    911#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 
    912#: 
    913#: defMode: :py:class:`bool` 
    914#:     If :obj:`False`, produces indefinite length encoding 
    915#: 
    916#: maxChunkSize: :py:class:`int` 
    917#:     Maximum chunk size in chunked encoding mode (0 denotes unlimited chunk size) 
    918#: 
    919#: Returns 
    920#: ------- 
    921#: : :py:class:`bytes` 
    922#:     Given ASN.1 object encoded into BER octetstream 
    923#: 
    924#: Raises 
    925#: ------ 
    926#: ~pyasn1.error.PyAsn1Error 
    927#:     On encoding errors 
    928#: 
    929#: Examples 
    930#: -------- 
    931#: Encode Python value into BER with ASN.1 schema 
    932#: 
    933#: .. code-block:: pycon 
    934#: 
    935#:    >>> seq = SequenceOf(componentType=Integer()) 
    936#:    >>> encode([1, 2, 3], asn1Spec=seq) 
    937#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03' 
    938#: 
    939#: Encode ASN.1 value object into BER 
    940#: 
    941#: .. code-block:: pycon 
    942#: 
    943#:    >>> seq = SequenceOf(componentType=Integer()) 
    944#:    >>> seq.extend([1, 2, 3]) 
    945#:    >>> encode(seq) 
    946#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03' 
    947#: 
    948encode = Encoder() 
    949 
    950def __getattr__(attr: str): 
    951    if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr): 
    952        warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning) 
    953        return globals()[newAttr] 
    954    raise AttributeError(attr)