1#
2# This file is part of pyasn1 software.
3#
4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com>
5# License: https://pyasn1.readthedocs.io/en/latest/license.html
6#
7import sys
8import warnings
9
10from pyasn1 import debug
11from pyasn1 import error
12from pyasn1.codec.ber import eoo
13from pyasn1.compat import _MISSING
14from pyasn1.compat.integer import to_bytes
15from pyasn1.type import char
16from pyasn1.type import tag
17from pyasn1.type import univ
18from pyasn1.type import useful
19
# Names exported by a star-import of this module.
__all__ = ['Encoder', 'encode']

# Handle returned by `debug.registerLoggee` for this module; the encoders
# below test it for truthiness before formatting any log message.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_ENCODER)
23
24
class AbstractItemEncoder(object):
    """Base codec framing an encoded value with tag and length octets.

    Concrete encoders override `encodeValue`; `encode` then puts one
    tag + length header in front of the payload for every tag of the tag
    set, each later tag wrapping the result of the previous one.
    """

    # Encoders that clear this flag always emit a definite length
    supportIndefLenMode = True

    # An outcome of otherwise legit call `encodeFun(eoo.endOfOctets)`
    eooIntegerSubstrate = (0, 0)
    eooOctetsSubstrate = bytes(eooIntegerSubstrate)

    # noinspection PyMethodMayBeStatic
    def encodeTag(self, singleTag, isConstructed):
        """Return the identifier octets of `singleTag` as a tuple of ints."""
        tagClass, tagFormat, tagId = singleTag

        leadOctet = tagClass | tagFormat
        if isConstructed:
            leadOctet |= tag.tagFormatConstructed

        # small tag numbers share the leading octet
        if tagId < 31:
            return (leadOctet | tagId,)

        # otherwise spill the number into 7-bit groups, collected least
        # significant first; all groups but the lowest carry the high bit
        groups = [tagId & 0x7f]
        tagId >>= 7

        while tagId:
            groups.append(0x80 | (tagId & 0x7f))
            tagId >>= 7

        return (leadOctet | 0x1F,) + tuple(reversed(groups))

    def encodeLength(self, length, defMode):
        """Return the length octets for `length` as a tuple of ints.

        A lone 0x80 is returned when indefinite mode is requested and
        supported by this codec.
        """
        if not defMode and self.supportIndefLenMode:
            return (0x80,)

        # short form: the length itself
        if length < 0x80:
            return (length,)

        # long form: big-endian length octets preceded by their count
        sizeOctets = ()
        while length:
            sizeOctets = (length & 0xff,) + sizeOctets
            length >>= 8

        octetCount = len(sizeOctets)
        if octetCount > 126:
            raise error.PyAsn1Error('Length octets overflow (%d)' % octetCount)

        return (0x80 | octetCount,) + sizeOctets

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Produce a `(substrate, isConstructed, isOctets)` triple.

        Must be overridden; this base implementation always raises.
        """
        raise error.PyAsn1Error('Not implemented')

    def encode(self, value, asn1Spec=None, encodeFun=None, **options):
        """Encode `value` and frame it with its tag(s) and length(s).

        The tag set is taken from `asn1Spec` when given, from `value`
        otherwise. For an empty tag set the payload returned by
        `encodeValue` is handed back as is.
        """
        tagSet = value.tagSet if asn1Spec is None else asn1Spec.tagSet

        # untagged item?
        if not tagSet:
            payload, _, _ = self.encodeValue(
                value, asn1Spec, encodeFun, **options)
            return payload

        defMode = options.get('defMode', True)

        substrate = b''

        for idx, singleTag in enumerate(tagSet.superTags):

            defModeOverride = defMode

            # base tag?
            if idx == 0:
                try:
                    substrate, isConstructed, isOctets = self.encodeValue(
                        value, asn1Spec, encodeFun, **options)

                except error.PyAsn1Error as exc:
                    raise error.PyAsn1Error(
                        'Error encoding %r: %s' % (value, exc))

                if LOG:
                    constructedNote = 'constructed ' if isConstructed else ''
                    LOG('encoded %svalue %s into %s' % (
                        constructedNote, value, substrate))

                if not substrate and isConstructed and options.get('ifNotEmpty', False):
                    return substrate

                # primitive payloads are always framed with a definite length
                if not isConstructed:
                    defModeOverride = True

                    if LOG:
                        LOG('overridden encoding mode into definitive for primitive type')

            header = self.encodeTag(singleTag, isConstructed)

            if LOG:
                constructedNote = 'constructed ' if isConstructed else ''
                LOG('encoded %stag %s into %s' % (
                    constructedNote, singleTag, debug.hexdump(bytes(header))))

            header += self.encodeLength(len(substrate), defModeOverride)

            if LOG:
                LOG('encoded %s octets (tag + payload) into %s' % (
                    len(substrate), debug.hexdump(bytes(header))))

            # keep header and end-of-octets marker in the same
            # representation (bytes or tuple of ints) as the payload
            if isOctets:
                header = bytes(header)
                terminator = self.eooOctetsSubstrate
            else:
                terminator = self.eooIntegerSubstrate

            substrate = header + substrate

            if not defModeOverride:
                substrate += terminator

        if not isOctets:
            substrate = bytes(substrate)

        return substrate
152
153
class EndOfOctetsEncoder(AbstractItemEncoder):
    """Codec for the end-of-octets marker: there is no payload to emit."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return an empty, primitive, bytes-typed payload."""
        payload = b''
        isConstructed, isOctets = False, True
        return payload, isConstructed, isOctets
157
158
class BooleanEncoder(AbstractItemEncoder):
    """Codec turning a truth value into a single content octet."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return `(1,)` for a truthy `value`, `(0,)` otherwise."""
        payload = (1,) if value else (0,)
        return payload, False, False
164
165
class IntegerEncoder(AbstractItemEncoder):
    """Codec for integer-like values (also registered for ENUMERATED)."""

    supportIndefLenMode = False
    # When set, zero is encoded with no content octets at all
    supportCompactZero = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the content octets of `value`.

        Zero is special-cased; anything else is delegated to
        `to_bytes(int(value), signed=True)`.
        """
        if value == 0:
            compactZero = self.supportCompactZero

            if LOG:
                LOG('encoding %spayload for zero INTEGER' % (
                    'no ' if compactZero else ''
                ))

            # de-facto way to encode zero
            zeroPayload = () if compactZero else (0,)
            return zeroPayload, False, False

        return to_bytes(int(value), signed=True), False, True
184
185
class BitStringEncoder(AbstractItemEncoder):
    """Codec for bit strings, in one piece or split into chunks."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the bit string payload.

        Without `maxChunkSize` (or when the value fits into one chunk) the
        payload is the unused-bits count followed by the octets. Otherwise
        every chunk is encoded on its own through `encodeFun` and the
        concatenation is reported as constructed.
        """
        if asn1Spec is not None:
            # TODO: try to avoid ASN.1 schema instantiation
            value = asn1Spec.clone(value)

        bitCount = len(value)
        danglingBits = bitCount % 8

        # left-align the bits on an octet boundary
        alignedValue = value << (8 - danglingBits) if danglingBits else value

        maxChunkSize = options.get('maxChunkSize', 0)
        if not maxChunkSize or len(alignedValue) <= maxChunkSize * 8:
            octets = alignedValue.asOctets()
            unusedBits = len(octets) * 8 - bitCount
            return bytes((unusedBits,)) + octets, False, True

        if LOG:
            LOG('encoding into up to %s-octet chunks' % maxChunkSize)

        baseTag = value.tagSet.baseTag

        # strip off explicit tags
        tagSet = tag.TagSet(baseTag, baseTag) if baseTag else tag.TagSet()

        alignedValue = alignedValue.clone(tagSet=tagSet)

        chunkBits = maxChunkSize * 8
        pieces = []
        stop = 0
        while stop < bitCount:
            start, stop = stop, min(stop + chunkBits, bitCount)
            pieces.append(
                encodeFun(alignedValue[start:stop], asn1Spec, **options))

        return b''.join(pieces), True, True
225
226
class OctetStringEncoder(AbstractItemEncoder):
    """Codec for octet strings and the string types registered with it."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the octets of `value`, chunked when `maxChunkSize` demands.

        A payload not longer than `maxChunkSize` (or any payload when the
        option is unset) is returned as primitive. Otherwise `value` is
        sliced and every slice is encoded through `encodeFun`, the
        concatenation being reported as constructed.
        """
        if asn1Spec is None:
            substrate = value.asOctets()
        elif isinstance(value, bytes):
            substrate = value
        else:
            substrate = asn1Spec.clone(value).asOctets()

        maxChunkSize = options.get('maxChunkSize', 0)

        if not maxChunkSize or len(substrate) <= maxChunkSize:
            return substrate, False, True

        if LOG:
            LOG('encoding into up to %s-octet chunks' % maxChunkSize)

        # strip off explicit tags for inner chunks: pick the object the
        # chunk schema is derived from (none for raw bytes with a schema)
        if asn1Spec is None:
            template = value
        elif not isinstance(value, bytes):
            template = asn1Spec
        else:
            template = None

        if template is not None:
            baseTag = template.tagSet.baseTag

            # strip off explicit tags
            tagSet = tag.TagSet(baseTag, baseTag) if baseTag else tag.TagSet()

            asn1Spec = template.clone(tagSet=tagSet)

        pieces = []
        pos = 0

        while chunk := value[pos:pos + maxChunkSize]:
            pieces.append(encodeFun(chunk, asn1Spec, **options))
            pos += maxChunkSize

        return b''.join(pieces), True, True
286
287
class NullEncoder(AbstractItemEncoder):
    """Codec for NULL: the value has no content octets."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return an empty, primitive, bytes-typed payload."""
        payload = b''
        isConstructed, isOctets = False, True
        return payload, isConstructed, isOctets
293
294
class ObjectIdentifierEncoder(AbstractItemEncoder):
    """Codec for OBJECT IDENTIFIER values."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the OID content octets as a tuple of ints.

        The first two arcs are merged into one sub-identifier; every
        sub-identifier is then written in 7-bit groups, most significant
        group first, all groups but the last carrying the high bit.

        Raises `error.PyAsn1Error` for OIDs shorter than two arcs, for an
        impossible first/second arc combination and for negative arcs.
        """
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        arcs = value.asTuple()

        # Build the first pair
        try:
            first, second = arcs[0], arcs[1]

        except IndexError:
            raise error.PyAsn1Error('Short OID %s' % (value,))

        secondIsSmall = 0 <= second <= 39

        if first == 2:
            # any second arc is accepted under the top arc 2
            leadArc = second + 80

        elif first == 1 and secondIsSmall:
            leadArc = second + 40

        elif first == 0 and secondIsSmall:
            leadArc = second

        else:
            raise error.PyAsn1Error('Impossible first/second arcs at %s' % (value,))

        octets = []

        # Cycle through subIds
        for arc in (leadArc,) + arcs[2:]:
            if arc < 0:
                raise error.PyAsn1Error('Negative OID arc %s at %s' % (arc, value))

            if arc <= 127:
                # Optimize for the common case
                octets.append(arc)
                continue

            # Pack large Sub-Object IDs, lowest group first
            groups = [arc & 0x7f]
            arc >>= 7

            while arc:
                groups.append(0x80 | (arc & 0x7f))
                arc >>= 7

            # Add packed Sub-Object ID to resulted Object ID
            octets.extend(reversed(groups))

        return tuple(octets), False, False
352
353
class RelativeOIDEncoder(AbstractItemEncoder):
    """Codec for RELATIVE-OID values."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the RELATIVE-OID content octets as a tuple of ints.

        Every arc is written in 7-bit groups, most significant group
        first, all groups but the last carrying the high bit.

        Raises `error.PyAsn1Error` for negative arcs.
        """
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        octets = []

        # Cycle through subIds
        for arc in value.asTuple():
            if arc < 0:
                raise error.PyAsn1Error('Negative RELATIVE-OID arc %s at %s' % (arc, value))

            if arc <= 127:
                # Optimize for the common case
                octets.append(arc)
                continue

            # Pack large Sub-Object IDs, lowest group first
            groups = [arc & 0x7f]
            arc >>= 7

            while arc:
                groups.append(0x80 | (arc & 0x7f))
                arc >>= 7

            # Add packed Sub-Object ID to resulted RELATIVE-OID
            octets.extend(reversed(groups))

        return tuple(octets), False, False
385
386
class RealEncoder(AbstractItemEncoder):
    """Codec for REAL values.

    Infinities get a dedicated single octet, a zero mantissa yields an
    empty payload, base 10 values are written in character form and
    base 2 values in binary form (first octet + exponent + mantissa).
    """

    supportIndefLenMode = False
    binEncBase = 2  # set to None to choose encoding base automatically

    @staticmethod
    def _dropFloatingPoint(m, encbase, e):
        """Rescale mantissa `m` into an integer for encoding base `encbase`.

        Returns a `(mantissa sign, integer mantissa, encbase, exponent)`
        tuple; the sign is 1 or -1 and the returned mantissa is the
        absolute value.
        """
        ms, es = 1, 1
        if m < 0:
            ms = -1  # mantissa sign

        if e < 0:
            es = -1  # exponent sign

        # work on the absolute value of the mantissa
        m *= ms

        # For bases 8 and 16 the exponent is re-expressed in the target
        # base (divided by 3 or 4, rounding towards zero); the remainder
        # is folded into the mantissa as a power of two.
        if encbase == 8:
            m *= 2 ** (abs(e) % 3 * es)
            e = abs(e) // 3 * es

        elif encbase == 16:
            m *= 2 ** (abs(e) % 4 * es)
            e = abs(e) // 4 * es

        # keep shifting the point to the right until the mantissa is integral
        while True:
            if int(m) != m:
                m *= encbase
                e -= 1
                continue
            break

        return ms, int(m), encbase, e

    def _chooseEncBase(self, value):
        """Pick the binary encoding base and return the rescaled value.

        The base requested by `value.binEncBase` wins, then the one
        configured on this codec; if neither is one of 2, 8 or 16, all
        three are tried. Returns `(sign, mantissa, encbase, exponent)`.
        """
        m, b, e = value
        encBase = [2, 8, 16]
        if value.binEncBase in encBase:
            return self._dropFloatingPoint(m, value.binEncBase, e)

        elif self.binEncBase in encBase:
            return self._dropFloatingPoint(m, self.binEncBase, e)

        # auto choosing base 2/8/16
        mantissa = [m, m, m]
        exponent = [e, e, e]
        sign = 1
        encbase = 2
        # start from an infinite exponent so that the first candidate wins
        e = float('inf')

        for i in range(3):
            (sign,
             mantissa[i],
             encBase[i],
             exponent[i]) = self._dropFloatingPoint(mantissa[i], encBase[i], exponent[i])

            # prefer the smaller exponent magnitude, then the smaller mantissa
            if abs(exponent[i]) < abs(e) or (abs(exponent[i]) == abs(e) and mantissa[i] < m):
                e = exponent[i]
                m = int(mantissa[i])
                encbase = encBase[i]

        if LOG:
            LOG('automatically chosen REAL encoding base %s, sign %s, mantissa %s, '
                'exponent %s' % (encbase, sign, m, e))

        return sign, m, encbase, e

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the REAL content octets.

        Raises `error.PyAsn1Error` when the value's base is neither 10
        nor 2, or when the scale factor or exponent cannot be encoded.
        """
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        if value.isPlusInf:
            return (0x40,), False, False

        if value.isMinusInf:
            return (0x41,), False, False

        # the value unpacks into mantissa, base and exponent
        m, b, e = value

        # zero mantissa: no content octets
        if not m:
            return b'', False, True

        if b == 10:
            if LOG:
                LOG('encoding REAL into character form')

            # leading 0x03 octet, then "<mantissa>E<exponent>"; a zero
            # exponent gets an explicit plus sign
            return b'\x03%dE%s%d' % (m, e == 0 and b'+' or b'', e), False, True

        elif b == 2:
            fo = 0x80  # binary encoding
            ms, m, encbase, e = self._chooseEncBase(value)

            if ms < 0:  # mantissa sign
                fo |= 0x40  # sign bit

            # exponent & mantissa normalization
            if encbase == 2:
                while m & 0x1 == 0:
                    m >>= 1
                    e += 1

            elif encbase == 8:
                while m & 0x7 == 0:
                    m >>= 3
                    e += 1
                # base 8 marker in the first octet
                fo |= 0x10

            else:  # encbase = 16
                while m & 0xf == 0:
                    m >>= 4
                    e += 1
                # base 16 marker in the first octet
                fo |= 0x20

            sf = 0  # scale factor

            # remaining trailing zero bits of the mantissa go into the scale factor
            while m & 0x1 == 0:
                m >>= 1
                sf += 1

            if sf > 3:
                raise error.PyAsn1Error('Scale factor overflow')  # bug if raised

            fo |= sf << 2
            eo = b''
            if e == 0 or e == -1:
                eo = bytes((e & 0xff,))

            else:
                # big-endian exponent octets; the shift stops at 0 for a
                # non-negative exponent and at -1 for a negative one
                while e not in (0, -1):
                    eo = bytes((e & 0xff,)) + eo
                    e >>= 8

                # pad so that the top bit of the first octet matches the sign
                if e == 0 and eo and eo[0] & 0x80:
                    eo = bytes((0,)) + eo

                if e == -1 and eo and not (eo[0] & 0x80):
                    eo = bytes((0xff,)) + eo

            n = len(eo)
            if n > 0xff:
                raise error.PyAsn1Error('Real exponent overflow')

            # the two low bits of the first octet tell the exponent length;
            # for four octets or more the length is put in front of the exponent
            if n == 1:
                pass

            elif n == 2:
                fo |= 1

            elif n == 3:
                fo |= 2

            else:
                fo |= 3
                eo = bytes((n & 0xff,)) + eo

            po = b''

            # big-endian mantissa octets
            while m:
                po = bytes((m & 0xff,)) + po
                m >>= 8

            substrate = bytes((fo,)) + eo + po

            return substrate, False, True

        else:
            raise error.PyAsn1Error('Prohibited Real base %s' % b)
552
553
class SequenceEncoder(AbstractItemEncoder):
    """Codec for constructed values made of named components.

    Works either on an ASN.1 value object alone (`asn1Spec` is None) or
    on a value indexed by component name and guided by `asn1Spec`.
    """

    # Default for the `omitEmptyOptionals` option
    omitEmptyOptionals = False

    # TODO: handling three flavors of input is too much -- split over codecs

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the concatenated encodings of the components.

        OPTIONAL components without a value and DEFAULT components equal
        to their default are left out. Components of an open type are
        wrapped into the declared type unless they already are of it.

        Raises `error.PyAsn1Error` for an inconsistent value object or
        when a component name cannot be found in `value`.
        """

        substrate = b''

        omitEmptyOptionals = options.get(
            'omitEmptyOptionals', self.omitEmptyOptionals)

        if LOG:
            LOG('%sencoding empty OPTIONAL components' % (
                omitEmptyOptionals and 'not ' or ''))

        if asn1Spec is None:
            # instance of ASN.1 schema
            inconsistency = value.isInconsistent
            if inconsistency:
                raise error.PyAsn1Error(
                    f"ASN.1 object {value.__class__.__name__} is inconsistent")

            namedTypes = value.componentType

            for idx, component in enumerate(value.values()):
                if namedTypes:
                    namedType = namedTypes[idx]

                    if namedType.isOptional and not component.isValue:
                        if LOG:
                            LOG('not encoding OPTIONAL component %r' % (namedType,))
                        continue

                    if namedType.isDefaulted and component == namedType.asn1Object:
                        if LOG:
                            LOG('not encoding DEFAULT component %r' % (namedType,))
                        continue

                    # `options` is this call's own keyword dict; the flag
                    # set here is passed on to the `encodeFun` calls below
                    if omitEmptyOptionals:
                        options.update(ifNotEmpty=namedType.isOptional)

                # wrap open type blob if needed
                if namedTypes and namedType.openType:

                    wrapType = namedType.asn1Object

                    if wrapType.typeId in (
                            univ.SetOf.typeId, univ.SequenceOf.typeId):

                        # hand the wrapper type over through `wrapType`
                        substrate += encodeFun(
                            component, asn1Spec,
                            **dict(options, wrapType=wrapType.componentType))

                    else:
                        chunk = encodeFun(component, asn1Spec, **options)

                        if wrapType.isSameTypeWith(component):
                            substrate += chunk

                        else:
                            # encode the already encoded chunk once more,
                            # this time as a value of the wrapper type
                            substrate += encodeFun(chunk, wrapType, **options)

                            if LOG:
                                LOG('wrapped with wrap type %r' % (wrapType,))

                else:
                    substrate += encodeFun(component, asn1Spec, **options)

        else:
            # bare Python value + ASN.1 schema
            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):

                try:
                    component = value[namedType.name]

                except KeyError:
                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (
                        namedType.name, value))

                # NOTE(review): for a plain mapping a missing key already
                # raised above, so this OPTIONAL check looks unreachable
                # in that case -- confirm this is intended
                if namedType.isOptional and namedType.name not in value:
                    if LOG:
                        LOG('not encoding OPTIONAL component %r' % (namedType,))
                    continue

                if namedType.isDefaulted and component == namedType.asn1Object:
                    if LOG:
                        LOG('not encoding DEFAULT component %r' % (namedType,))
                    continue

                if omitEmptyOptionals:
                    options.update(ifNotEmpty=namedType.isOptional)

                componentSpec = namedType.asn1Object

                # wrap open type blob if needed
                if namedType.openType:

                    if componentSpec.typeId in (
                            univ.SetOf.typeId, univ.SequenceOf.typeId):

                        substrate += encodeFun(
                            component, componentSpec,
                            **dict(options, wrapType=componentSpec.componentType))

                    else:
                        chunk = encodeFun(component, componentSpec, **options)

                        if componentSpec.isSameTypeWith(component):
                            substrate += chunk

                        else:
                            substrate += encodeFun(chunk, componentSpec, **options)

                            if LOG:
                                LOG('wrapped with wrap type %r' % (componentSpec,))

                else:
                    substrate += encodeFun(component, componentSpec, **options)

        return substrate, True, True
675
676
class SequenceOfEncoder(AbstractItemEncoder):
    """Codec for constructed values made of same-typed components."""

    def _encodeComponents(self, value, asn1Spec, encodeFun, **options):
        """Encode every component of `value` and return the list of chunks.

        With a schema given, components are encoded against its
        `componentType`. A `wrapType` option, when present, is consumed
        here: components not of that type get their encoding encoded once
        more as a value of `wrapType`.

        Raises `error.PyAsn1Error` for an inconsistent value object.
        """
        if asn1Spec is not None:
            asn1Spec = asn1Spec.componentType

        elif value.isInconsistent:
            raise error.PyAsn1Error(
                f"ASN.1 object {value.__class__.__name__} is inconsistent")

        wrapType = options.pop('wrapType', None)

        chunks = []

        for component in value:
            chunk = encodeFun(component, asn1Spec, **options)

            needsWrapping = (
                wrapType is not None
                and not wrapType.isSameTypeWith(component)
            )

            if needsWrapping:
                # wrap encoded value with wrapper container (e.g. ANY)
                chunk = encodeFun(chunk, wrapType, **options)

                if LOG:
                    LOG('wrapped with wrap type %r' % (wrapType,))

            chunks.append(chunk)

        return chunks

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the concatenated component encodings as constructed."""
        encodedComponents = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        return b''.join(encodedComponents), True, True
713
714
class ChoiceEncoder(AbstractItemEncoder):
    """Codec for CHOICE: encodes the one alternative that is present."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the encoding of the chosen component.

        Without a schema the component comes from `value.getComponent()`.
        With a schema exactly one of its component names must be found in
        `value`, otherwise `error.PyAsn1Error` is raised.
        """
        if asn1Spec is None:
            chosen = value.getComponent()
            return encodeFun(chosen, None, **options), True, True

        present = []
        for namedType in asn1Spec.componentType.namedTypes:
            if namedType.name in value:
                present.append(namedType.name)

        if len(present) != 1:
            raise error.PyAsn1Error('%s components for Choice at %r' % ('Multiple ' if present else 'None ', value))

        name = present[0]
        chosen = value[name]
        chosenSpec = asn1Spec[name]

        return encodeFun(chosen, chosenSpec, **options), True, True
731
732
class AnyEncoder(OctetStringEncoder):
    """Codec for ANY: the octets are passed through, never chunked."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the raw octets of `value`.

        The payload is reported as constructed exactly when indefinite
        length mode (`defMode` being false) is requested.
        """
        if asn1Spec is None:
            payload = value.asOctets()
        elif isinstance(value, bytes):
            payload = value
        else:
            payload = asn1Spec.clone(value).asOctets()

        isConstructed = not options.get('defMode', True)

        return payload, isConstructed, True
741
742
# Codec lookup by tag set
TAG_MAP = {
    eoo.endOfOctets.tagSet: EndOfOctetsEncoder(),
    univ.Boolean.tagSet: BooleanEncoder(),
    univ.Integer.tagSet: IntegerEncoder(),
    univ.BitString.tagSet: BitStringEncoder(),
    univ.OctetString.tagSet: OctetStringEncoder(),
    univ.Null.tagSet: NullEncoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
    univ.RelativeOID.tagSet: RelativeOIDEncoder(),
    univ.Enumerated.tagSet: IntegerEncoder(),
    univ.Real.tagSet: RealEncoder(),
    # Sequence & Set have same tags as SequenceOf & SetOf
    univ.SequenceOf.tagSet: SequenceOfEncoder(),
    univ.SetOf.tagSet: SequenceOfEncoder(),
    univ.Choice.tagSet: ChoiceEncoder(),
}

# The string-like types all use the octet string codec, each one being
# given an encoder instance of its own
TAG_MAP.update(
    (octetsType.tagSet, OctetStringEncoder())
    for octetsType in (
        # character string types
        char.UTF8String,
        char.NumericString,
        char.PrintableString,
        char.TeletexString,
        char.VideotexString,
        char.IA5String,
        char.GraphicString,
        char.VisibleString,
        char.GeneralString,
        char.UniversalString,
        char.BMPString,
        # useful types
        useful.ObjectDescriptor,
        useful.GeneralizedTime,
        useful.UTCTime,
    )
)
775
776# Put in ambiguous & non-ambiguous types for faster codec lookup
# Put in ambiguous & non-ambiguous types for faster codec lookup
TYPE_MAP = {
    univ.Boolean.typeId: BooleanEncoder(),
    univ.Integer.typeId: IntegerEncoder(),
    univ.BitString.typeId: BitStringEncoder(),
    univ.OctetString.typeId: OctetStringEncoder(),
    univ.Null.typeId: NullEncoder(),
    univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(),
    univ.RelativeOID.typeId: RelativeOIDEncoder(),
    univ.Enumerated.typeId: IntegerEncoder(),
    univ.Real.typeId: RealEncoder(),
    # Sequence & Set have same tags as SequenceOf & SetOf
    univ.Set.typeId: SequenceEncoder(),
    univ.SetOf.typeId: SequenceOfEncoder(),
    univ.Sequence.typeId: SequenceEncoder(),
    univ.SequenceOf.typeId: SequenceOfEncoder(),
    univ.Choice.typeId: ChoiceEncoder(),
    univ.Any.typeId: AnyEncoder(),
}

# The string-like types all use the octet string codec, each one being
# given an encoder instance of its own
TYPE_MAP.update(
    (octetsType.typeId, OctetStringEncoder())
    for octetsType in (
        # character string types
        char.UTF8String,
        char.NumericString,
        char.PrintableString,
        char.TeletexString,
        char.VideotexString,
        char.IA5String,
        char.GraphicString,
        char.VisibleString,
        char.GeneralString,
        char.UniversalString,
        char.BMPString,
        # useful types
        useful.ObjectDescriptor,
        useful.GeneralizedTime,
        useful.UTCTime,
    )
)
811
812
class SingleItemEncoder(object):
    """Select the codec for a value (or its schema) and run it.

    The codec is looked up by type ID first and, failing that, by the
    base tag of the tag set.
    """

    # When not None, these replace the `defMode` / `maxChunkSize` options
    # of every call
    fixedDefLengthMode = None
    fixedChunkSize = None

    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
        """Remember the codec maps, defaulting to the class-level ones.

        Any further keyword arguments are accepted and ignored.
        """
        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP

    def __call__(self, value, asn1Spec=None, **options):
        """Encode `value`, optionally guided by the `asn1Spec` schema.

        Returns whatever the selected codec's `encode` returns.

        Raises `error.PyAsn1Error` when `value` is no ASN.1 object and no
        schema is given, or when no codec is registered for the type.
        """
        try:
            if asn1Spec is None:
                typeId = value.typeId
            else:
                typeId = asn1Spec.typeId

        except AttributeError:
            raise error.PyAsn1Error('Value %r is not ASN.1 type instance '
                                    'and "asn1Spec" not given' % (value,))

        if LOG:
            # An explicit branch instead of `cond and a or b`: the latter
            # would go on to call `asn1Spec.prettyPrintType()` on None
            # whenever the value's own description happened to be falsy.
            if asn1Spec is None:
                typeDescription = value.prettyPrintType()
            else:
                typeDescription = asn1Spec.prettyPrintType()

            LOG('encoder called in %sdef mode, chunk size %s for type %s, '
                'value:\n%s' % ('' if options.get('defMode', True) else 'in',
                                options.get('maxChunkSize', 0),
                                typeDescription, value))

        if self.fixedDefLengthMode is not None:
            options.update(defMode=self.fixedDefLengthMode)

        if self.fixedChunkSize is not None:
            options.update(maxChunkSize=self.fixedChunkSize)

        # Only the lookup itself is guarded, so that a KeyError coming out
        # of logging is never mistaken for a missing type ID.
        try:
            concreteEncoder = self._typeMap[typeId]

        except KeyError:
            if asn1Spec is None:
                tagSet = value.tagSet
            else:
                tagSet = asn1Spec.tagSet

            # use base type for codec lookup to recover untagged types
            baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag)

            try:
                concreteEncoder = self._tagMap[baseTagSet]

            except KeyError:
                raise error.PyAsn1Error('No encoder for %r (%s)' % (value, tagSet))

            if LOG:
                LOG('using value codec %s chosen by tagSet '
                    '%s' % (concreteEncoder.__class__.__name__, tagSet))

        else:
            if LOG:
                LOG('using value codec %s chosen by type ID '
                    '%s' % (concreteEncoder.__class__.__name__, typeId))

        # the encoder passes itself on so that codecs can recurse
        substrate = concreteEncoder.encode(value, asn1Spec, self, **options)

        if LOG:
            LOG('codec %s built %s octets of substrate: %s\nencoder '
                'completed' % (concreteEncoder, len(substrate),
                               debug.hexdump(substrate)))

        return substrate
882
883
class Encoder(object):
    """Callable front end handing every call over to a single-item encoder."""

    # Class instantiated by `__init__` to do the actual encoding
    SINGLE_ITEM_ENCODER = SingleItemEncoder

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **options):
        """Create the underlying single-item encoder.

        The codec maps and any further options are passed on to it as
        keyword arguments.
        """
        itemEncoderArgs = dict(options, tagMap=tagMap, typeMap=typeMap)
        self._singleItemEncoder = self.SINGLE_ITEM_ENCODER(**itemEncoderArgs)

    def __call__(self, pyObject, asn1Spec=None, **options):
        """Encode `pyObject` and return what the single-item encoder returns."""
        encodeItem = self._singleItemEncoder
        return encodeItem(pyObject, asn1Spec=asn1Spec, **options)
895
896
#: Turns ASN.1 object into BER octet stream.
#:
#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#: walks all its components recursively and produces a BER octet stream.
#:
#: Parameters
#: ----------
#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     A Python or pyasn1 object to encode. If Python object is given, `asn1Spec`
#:     parameter is required to guide the encoding process.
#:
#: Keyword Args
#: ------------
#: asn1Spec:
#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:
#: defMode: :py:class:`bool`
#:     If :obj:`False`, produces indefinite length encoding
#:
#: maxChunkSize: :py:class:`int`
#:     Maximum chunk size in chunked encoding mode (0 denotes unlimited chunk size)
#:
#: Returns
#: -------
#: : :py:class:`bytes`
#:     Given ASN.1 object encoded into BER octet stream
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error
#:     On encoding errors
#:
#: Examples
#: --------
#: Encode Python value into BER with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> encode([1, 2, 3], asn1Spec=seq)
#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
#:
#: Encode ASN.1 value object into BER
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> seq.extend([1, 2, 3])
#:    >>> encode(seq)
#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
#:
encode = Encoder()
949
def __getattr__(attr: str):
    """Serve the deprecated module attributes `tagMap` and `typeMap`.

    Called for module attributes that are not found the regular way.
    The two legacy names resolve to `TAG_MAP` and `TYPE_MAP` after a
    `DeprecationWarning` has been issued.

    Raises `AttributeError` for any other name.
    """
    if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
        # stacklevel=2 attributes the warning to the code that accessed the
        # deprecated name instead of to this function
        warnings.warn(
            f"{attr} is deprecated. Please use {newAttr} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return globals()[newAttr]
    raise AttributeError(attr)