Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/asn1fields.py: 72%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Acknowledgment: Maxence Tury <maxence.tury@ssi.gouv.fr>
7"""
8Classes that implement ASN.1 data structures.
9"""
11import copy
13from functools import reduce
15from scapy.asn1.asn1 import (
16 ASN1_BIT_STRING,
17 ASN1_BOOLEAN,
18 ASN1_Class,
19 ASN1_Class_UNIVERSAL,
20 ASN1_Error,
21 ASN1_INTEGER,
22 ASN1_NULL,
23 ASN1_OID,
24 ASN1_Object,
25 ASN1_STRING,
26)
27from scapy.asn1.ber import (
28 BER_Decoding_Error,
29 BER_id_dec,
30 BER_tagging_dec,
31 BER_tagging_enc,
32)
33from scapy.base_classes import BasePacket
34from scapy.volatile import (
35 GeneralizedTime,
36 RandChoice,
37 RandInt,
38 RandNum,
39 RandOID,
40 RandString,
41 RandField,
42)
44from scapy import packet
46from typing import (
47 Any,
48 AnyStr,
49 Callable,
50 Dict,
51 Generic,
52 List,
53 Optional,
54 Tuple,
55 Type,
56 TypeVar,
57 Union,
58 cast,
59 TYPE_CHECKING,
60)
62if TYPE_CHECKING:
63 from scapy.asn1packet import ASN1_Packet
class ASN1F_badsequence(Exception):
    """
    Exception type caught by the dissection code of this module
    (for instance in ASN1F_field.extract_packet).
    """
class ASN1F_element:
    """
    Common base class of the ASN.1 field classes of this module.
    """
74##########################
75# Basic ASN1 Field #
76##########################
# Type variables used to parameterize ASN1F_field
_I = TypeVar("_I")  # type of the value as stored internally
_A = TypeVar("_A")  # type of the matching ASN.1 object
class ASN1F_field(ASN1F_element, Generic[_I, _A]):
    """
    Generic ASN.1 field.

    A field stores a name, a default value and its tagging options, and
    converts values between their encoded form (bytes) and the objects
    handled by the packet, through the codec attached to ``ASN1_tag``.
    """
    holds_packets = 0
    islist = 0
    ASN1_tag = ASN1_Class_UNIVERSAL.ANY
    context = ASN1_Class_UNIVERSAL  # type: Type[ASN1_Class]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[_A]
                 context=None,  # type: Optional[Type[ASN1_Class]]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 flexible_tag=False,  # type: Optional[bool]
                 size_len=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param name: name of the packet attribute handled by this field
        :param default: default value; unless it is None or an ASN1_NULL,
            it is wrapped into the ASN.1 object matching ``ASN1_tag``
        :param context: overrides the class-level ``context`` when given
        :param implicit_tag: implicit tag (exclusive with explicit_tag)
        :param explicit_tag: explicit tag (exclusive with implicit_tag)
        :param flexible_tag: decode through the codec's ``safedec``
        :param size_len: forwarded to the codec when encoding raw values
        :raises ASN1_Error: if both tags are provided
        """
        if context is not None:
            self.context = context
        self.name = name
        if default is None or isinstance(default, ASN1_NULL):
            # stored untouched
            self.default = default  # type: ignore
        else:
            self.default = self.ASN1_tag.asn1_object(default)  # type: ignore
        self.size_len = size_len
        self.flexible_tag = flexible_tag
        if implicit_tag is not None and explicit_tag is not None:
            raise ASN1_Error(
                "field cannot be both implicitly and explicitly tagged"
            )
        # falsy values (None, 0) are stored as they are
        self.implicit_tag = int(implicit_tag) if implicit_tag else implicit_tag
        self.explicit_tag = int(explicit_tag) if explicit_tag else explicit_tag
        # network_tag gets useful for ASN1F_CHOICE
        self.network_tag = int(implicit_tag or explicit_tag or self.ASN1_tag)
        self.owners = []  # type: List[Type[ASN1_Packet]]

    def register_owner(self, cls):
        # type: (Type[ASN1_Packet]) -> None
        """Record ``cls`` in the list of owners of this field."""
        self.owners.append(cls)

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, _I) -> str
        """Return the repr() of the value."""
        return repr(x)

    def i2h(self, pkt, x):
        # type: (ASN1_Packet, _I) -> Any
        """Return the value unchanged."""
        return x

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[_A, bytes]
        """
        Decode one object from ``s`` and return it with the remainder.

        When 'flexible_tag' is set, decoding goes through safedec, which
        may still decode ASN1 even if the expected tag (self.ASN1_tag) and
        the actual tag differ; the decoded ASN1 object is then simply put
        into an ASN1_BADTAG object. However, safedec prevents the raising
        of the exceptions needed for ASN1F_optional processing, hence
        'flexible_tag' should be False with ASN1F_optional.

        Regarding other fields, we might need to know whether decoding
        went as expected or not. Notably, input methods from cert.py
        expect certain exceptions to be raised. Hence the default value
        of flexible_tag is False.
        """
        new_tag, s = BER_tagging_dec(
            s,
            hidden_tag=self.ASN1_tag,
            implicit_tag=self.implicit_tag,
            explicit_tag=self.explicit_tag,
            safe=self.flexible_tag,
            _fname=self.name,
        )
        if new_tag is not None:
            # this implies that flexible_tag was True: remember the tag
            # actually found, in place of the one that was configured
            if self.implicit_tag is not None:
                self.implicit_tag = new_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = new_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        decode = codec.safedec if self.flexible_tag else codec.dec
        return decode(s, context=self.context)  # type: ignore

    def i2m(self, pkt, x):
        # type: (ASN1_Packet, Union[bytes, _I, _A]) -> bytes
        """
        Encode ``x`` and apply the implicit/explicit tagging of the field.

        None is encoded as an empty string. ASN1_Object instances encode
        themselves, provided that their tag is compatible with the field;
        any other value is handed to the codec of ``ASN1_tag``.

        :raises ASN1_Error: if an ASN1_Object with an unexpected tag is given
        """
        if x is None:
            return b""
        if not isinstance(x, ASN1_Object):
            codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
            s = codec.enc(x, size_len=self.size_len)
        elif (self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY or
                x.tag == ASN1_Class_UNIVERSAL.RAW or
                x.tag == ASN1_Class_UNIVERSAL.ERROR or
                self.ASN1_tag == x.tag):
            s = x.enc(pkt.ASN1_codec)
        else:
            raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name))  # noqa: E501
        return BER_tagging_enc(
            s,
            implicit_tag=self.implicit_tag,
            explicit_tag=self.explicit_tag,
        )

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> _I
        """Return the value unchanged (only cast for the type checker)."""
        return cast(_I, x)

    def extract_packet(self,
                       cls,  # type: Type[ASN1_Packet]
                       s,  # type: bytes
                       _underlayer=None  # type: Optional[ASN1_Packet]
                       ):
        # type: (...) -> Tuple[ASN1_Packet, bytes]
        """
        Dissect ``s`` as a ``cls`` packet, falling back to packet.Raw when
        ASN1F_badsequence is raised. The load of the Raw layer found in
        the result (if any) is returned as the remainder, and that layer
        is detached from its underlayer.
        """
        try:
            c = cls(s, _underlayer=_underlayer)
        except ASN1F_badsequence:
            c = packet.Raw(s, _underlayer=_underlayer)  # type: ignore
        pad = c.getlayer(packet.Raw)
        if pad is None:
            return c, b""
        remain = pad.load
        if pad.underlayer:
            del pad.underlayer.payload
        return c, remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Encode the value that ``pkt`` holds for this field."""
        value = getattr(pkt, self.name)
        return self.i2m(pkt, value)

    def dissect(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> bytes
        """Decode the field from ``s``, store it and return the remainder."""
        value, remain = self.m2i(pkt, s)
        self.set_val(pkt, value)
        return remain

    def do_copy(self, x):
        # type: (Any) -> Any
        """
        Copy a field value: lists are duplicated with their BasePacket
        items copied, objects having a ``copy`` method use it, and any
        other value is returned as is.
        """
        if isinstance(x, list):
            return [
                item.copy() if isinstance(item, BasePacket) else item
                for item in x
            ]
        if hasattr(x, "copy"):
            return x.copy()
        return x

    def set_val(self, pkt, val):
        # type: (ASN1_Packet, Any) -> None
        """Store ``val`` on ``pkt`` under the name of the field."""
        setattr(pkt, self.name, val)

    def is_empty(self, pkt):
        # type: (ASN1_Packet) -> bool
        """Tell whether ``pkt`` holds None for this field."""
        return getattr(pkt, self.name) is None

    def get_fields_list(self):
        # type: () -> List[ASN1F_field[Any, Any]]
        """Return a list containing only this field."""
        return [self]

    def __str__(self):
        # type: () -> str
        return repr(self)

    def randval(self):
        # type: () -> RandField[_I]
        """Return a RandInt as random value."""
        return cast(RandField[_I], RandInt())

    def copy(self):
        # type: () -> ASN1F_field[_I, _A]
        """Return a shallow copy of the field."""
        return copy.copy(self)
248############################
249# Simple ASN1 Fields #
250############################
class ASN1F_BOOLEAN(ASN1F_field[bool, ASN1_BOOLEAN]):
    """Field using the BOOLEAN universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BOOLEAN

    def randval(self):
        # type: () -> RandChoice
        """Return a random choice between True and False."""
        candidates = (True, False)
        return RandChoice(*candidates)
class ASN1F_INTEGER(ASN1F_field[int, ASN1_INTEGER]):
    """Field using the INTEGER universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.INTEGER

    def randval(self):
        # type: () -> RandNum
        """Return a random number between -2**64 and 2**64 - 1."""
        bound = 2**64
        return RandNum(-bound, bound - 1)
class ASN1F_enum_INTEGER(ASN1F_INTEGER):
    """
    Integer field whose values may also be referred to by name.

    Two lookup tables are built from ``enum``: ``i2s`` and ``s2i``.
    """
    def __init__(self,
                 name,  # type: str
                 default,  # type: ASN1_INTEGER
                 enum,  # type: Dict[int, str]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[Any]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param enum: either a list (indexed by position) or a dictionary.
            If any of its keys is a str, the mapping is taken as
            name -> number instead of number -> name.
        """
        super().__init__(
            name, default, context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )
        self.i2s = {}  # type: Dict[int, str]
        self.s2i = {}  # type: Dict[str, int]
        keys = range(len(enum)) if isinstance(enum, list) else list(enum)
        if any(isinstance(key, str) for key in keys):
            # the keys are the names: fill the tables the other way round
            forward, backward = self.s2i, self.i2s  # type: ignore
        else:
            forward, backward = self.i2s, self.s2i  # type: ignore
        for key in keys:
            value = enum[key]
            forward[key] = value
            backward[value] = key

    def i2m(self,
            pkt,  # type: ASN1_Packet
            s,  # type: Union[bytes, str, int, ASN1_INTEGER]
            ):
        # type: (...) -> bytes
        """Encode ``s``, first resolving it through ``s2i`` if it is a str."""
        value = self.s2i[s] if isinstance(s, str) else s
        return super().i2m(pkt, value)

    def i2repr(self,
               pkt,  # type: ASN1_Packet
               x,  # type: Union[str, int]
               ):
        # type: (...) -> str
        """Prefix the repr of a known ASN1_INTEGER value with its name."""
        if isinstance(x, ASN1_INTEGER):
            label = self.i2s.get(x.val)
            if label:
                return "'%s' %s" % (label, repr(x))
        return repr(x)
class ASN1F_BIT_STRING(ASN1F_field[str, ASN1_BIT_STRING]):
    """Field using the BIT_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[Union[ASN1_BIT_STRING, AnyStr]]
                 default_readable=True,  # type: bool
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param default: bytes and str defaults are wrapped into an
            ASN1_BIT_STRING; any other value is stored as is
        :param default_readable: ``readable`` argument of that wrapping
        """
        # The parent gets no default: it is set right below instead
        super().__init__(
            name, None, context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )
        if not isinstance(default, (bytes, str)):
            self.default = default
            return
        self.default = ASN1_BIT_STRING(default, readable=default_readable)

    def randval(self):
        # type: () -> RandString
        """Return a random string whose size is a RandNum(0, 1000)."""
        size = RandNum(0, 1000)
        return RandString(size)
class ASN1F_STRING(ASN1F_field[str, ASN1_STRING]):
    """Field using the STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.STRING

    def randval(self):
        # type: () -> RandString
        """Return a random string whose size is a RandNum(0, 1000)."""
        size = RandNum(0, 1000)
        return RandString(size)
class ASN1F_NULL(ASN1F_INTEGER):
    """ASN1F_INTEGER subclass using the NULL universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.NULL
class ASN1F_OID(ASN1F_field[str, ASN1_OID]):
    """Field using the OID universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.OID

    def randval(self):
        # type: () -> RandOID
        """Return a RandOID as random value."""
        random_oid = RandOID()
        return random_oid
class ASN1F_ENUMERATED(ASN1F_enum_INTEGER):
    """ASN1F_enum_INTEGER subclass using the ENUMERATED universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.ENUMERATED
class ASN1F_UTF8_STRING(ASN1F_STRING):
    """String field using the UTF8_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UTF8_STRING
class ASN1F_NUMERIC_STRING(ASN1F_STRING):
    """String field using the NUMERIC_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING
class ASN1F_PRINTABLE_STRING(ASN1F_STRING):
    """String field using the PRINTABLE_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
class ASN1F_T61_STRING(ASN1F_STRING):
    """String field using the T61_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.T61_STRING
class ASN1F_VIDEOTEX_STRING(ASN1F_STRING):
    """String field using the VIDEOTEX_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING
class ASN1F_IA5_STRING(ASN1F_STRING):
    """String field using the IA5_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.IA5_STRING
class ASN1F_GENERAL_STRING(ASN1F_STRING):
    """String field using the GENERAL_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.GENERAL_STRING
class ASN1F_UTC_TIME(ASN1F_STRING):
    """String field using the UTC_TIME universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME

    def randval(self):  # type: ignore
        # type: () -> GeneralizedTime
        """Return a GeneralizedTime volatile as random value."""
        random_time = GeneralizedTime()
        return random_time
class ASN1F_GENERALIZED_TIME(ASN1F_STRING):
    """String field using the GENERALIZED_TIME universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME

    def randval(self):  # type: ignore
        # type: () -> GeneralizedTime
        """Return a GeneralizedTime volatile as random value."""
        random_time = GeneralizedTime()
        return random_time
class ASN1F_ISO646_STRING(ASN1F_STRING):
    """String field using the ISO646_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.ISO646_STRING
class ASN1F_UNIVERSAL_STRING(ASN1F_STRING):
    """String field using the UNIVERSAL_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UNIVERSAL_STRING
class ASN1F_BMP_STRING(ASN1F_STRING):
    """String field using the BMP_STRING universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BMP_STRING
class ASN1F_SEQUENCE(ASN1F_field[List[Any], List[Any]]):
    """
    Field grouping several nested fields under a SEQUENCE tag.
    """
    # Here is how you could decode a SEQUENCE
    # with an unknown, private high-tag prefix :
    # class PrivSeq(ASN1_Packet):
    #     ASN1_codec = ASN1_Codecs.BER
    #     ASN1_root = ASN1F_SEQUENCE(
    #                       <asn1 field #0>,
    #                       ...
    #                       <asn1 field #N>,
    #                       explicit_tag=0,
    #                       flexible_tag=True)
    # Because we use flexible_tag, the value of the explicit_tag does not matter.  # noqa: E501
    ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
    holds_packets = 1

    def __init__(self, *seq, **kwargs):
        # type: (*Any, **Any) -> None
        """
        :param seq: the nested fields, in order
        :param kwargs: forwarded to ASN1F_field.__init__
        """
        defaults = [nested.default for nested in seq]
        super().__init__("dummy_seq_name", defaults, **kwargs)
        self.seq = seq
        self.islist = len(seq) > 1

    def __repr__(self):
        # type: () -> str
        return "<%s%r>" % (self.__class__.__name__, self.seq)

    def is_empty(self, pkt):
        # type: (ASN1_Packet) -> bool
        """Tell whether every nested field is empty in ``pkt``."""
        for nested in self.seq:
            if not nested.is_empty(pkt):
                return False
        return True

    def get_fields_list(self):
        # type: () -> List[ASN1F_field[Any, Any]]
        """Return the concatenated field lists of the nested fields."""
        return [
            fld
            for nested in self.seq
            for fld in nested.get_fields_list()
        ]

    def m2i(self, pkt, s):
        # type: (Any, bytes) -> Tuple[Any, bytes]
        """
        ASN1F_SEQUENCE behaves transparently: the nested fields are
        dissected one after the other. Because obj.dissect is used (see
        the loop below) rather than obj.m2i (dissect is trusted to do the
        appropriate set_vals), the list of nested objects is not
        retrieved directly.
        Thus m2i returns an empty list (along with the proper remainder).
        It is discarded by dissect() and should not be missed elsewhere.
        """
        new_tag, s = BER_tagging_dec(
            s,
            hidden_tag=self.ASN1_tag,
            implicit_tag=self.implicit_tag,
            explicit_tag=self.explicit_tag,
            safe=self.flexible_tag,
            _fname=pkt.name,
        )
        if new_tag is not None:
            # remember the tag that was actually found
            if self.implicit_tag is not None:
                self.implicit_tag = new_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = new_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        _, content, remain = codec.check_type_check_len(s)
        if len(content) == 0:
            # empty sequence: every nested field is reset
            for obj in self.seq:
                obj.set_val(pkt, None)
            return [], remain
        for obj in self.seq:
            try:
                content = obj.dissect(pkt, content)
            except ASN1F_badsequence:
                break
        if len(content) > 0:
            raise BER_Decoding_Error("unexpected remainder",
                                     remaining=content)
        return [], remain

    def dissect(self, pkt, s):
        # type: (Any, bytes) -> bytes
        """Dissect the nested fields and return the remainder."""
        remain = self.m2i(pkt, s)[1]
        return remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Encode the concatenation of the nested fields' builds."""
        content = b"".join(nested.build(pkt) for nested in self.seq)
        return super().i2m(pkt, content)
class ASN1F_SET(ASN1F_SEQUENCE):
    """ASN1F_SEQUENCE subclass using the SET universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.SET
# Types accepted as "cls" by ASN1F_SEQUENCE_OF
_SEQ_T = Union['ASN1_Packet',
               Type[ASN1F_field[Any, Any]],
               'ASN1F_PACKET',
               ASN1F_field[Any, Any]]
class ASN1F_SEQUENCE_OF(ASN1F_field[List[_SEQ_T],
                                    List[ASN1_Object[Any]]]):
    """
    Field holding a list of items that are all decoded the same way.

    Two types are allowed as cls: ASN1_Packet, ASN1F_field
    """
    ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
    islist = 1

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 cls,  # type: _SEQ_T
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[Any]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param cls: an ASN1F_field class or instance (items are decoded
            with its m2i), or an object that has an ASN1_root attribute
            or is callable (items are decoded with extract_packet)
        :raises ValueError: if ``cls`` is none of these
        """
        is_field_class = isinstance(cls, type) and issubclass(cls, ASN1F_field)
        if is_field_class or isinstance(cls, ASN1F_field):
            # a field class is instantiated, an instance is used as is
            self.fld = cls(name, b"") if isinstance(cls, type) else cls
            self._extract_packet = lambda s, pkt: self.fld.m2i(pkt, s)
            self.holds_packets = 0
        elif hasattr(cls, "ASN1_root") or callable(cls):
            self.cls = cast("Type[ASN1_Packet]", cls)
            self._extract_packet = lambda s, pkt: self.extract_packet(
                self.cls, s, _underlayer=pkt)
            self.holds_packets = 1
        else:
            raise ValueError("cls should be an ASN1_Packet or ASN1_field")
        # The default is stored as provided, hence None for the parent
        super().__init__(
            name, None, context=context,
            implicit_tag=implicit_tag, explicit_tag=explicit_tag
        )
        self.default = default

    def is_empty(self,
                 pkt,  # type: ASN1_Packet
                 ):
        # type: (...) -> bool
        """Tell whether ``pkt`` holds None for this field."""
        return ASN1F_field.is_empty(self, pkt)

    def m2i(self,
            pkt,  # type: ASN1_Packet
            s,  # type: bytes
            ):
        # type: (...) -> Tuple[List[Any], bytes]
        """
        Decode the items one after the other; return the list of the
        decoded items that are truthy, along with the remainder.
        """
        new_tag, s = BER_tagging_dec(
            s,
            hidden_tag=self.ASN1_tag,
            implicit_tag=self.implicit_tag,
            explicit_tag=self.explicit_tag,
            safe=self.flexible_tag,
        )
        if new_tag is not None:
            # remember the tag that was actually found
            if self.implicit_tag is not None:
                self.implicit_tag = new_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = new_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        _, content, remain = codec.check_type_check_len(s)
        items = []
        while content:
            item, content = self._extract_packet(content, pkt)  # type: ignore
            if item:
                items.append(item)
        if len(content) > 0:
            raise BER_Decoding_Error("unexpected remainder",
                                     remaining=content)
        return items, remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """
        Encode the value held by ``pkt``: a RAW ASN1_Object is passed to
        i2m as is, None gives an empty content, and any other value is
        taken as an iterable whose items are joined as bytes.
        """
        val = getattr(pkt, self.name)
        is_raw = (isinstance(val, ASN1_Object) and
                  val.tag == ASN1_Class_UNIVERSAL.RAW)
        if is_raw:
            s = cast(Union[List[_SEQ_T], bytes], val)
        elif val is None:
            s = b""
        else:
            s = b"".join(bytes(item) for item in val)
        return self.i2m(pkt, s)

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, _I) -> str
        """
        Use the default repr when holding packets; otherwise show the
        list of the items' i2repr ("[]" for None).
        """
        if self.holds_packets:
            return super().i2repr(pkt, x)  # type: ignore
        if x is None:
            return "[]"
        shown = [
            self.fld.i2repr(pkt, item) for item in x  # type: ignore
        ]
        return "[%s]" % ", ".join(shown)

    def randval(self):
        # type: () -> Any
        """Fuzz a new ``cls`` packet, or defer to the item field."""
        if not self.holds_packets:
            return self.fld.randval()
        return packet.fuzz(self.cls())

    def __repr__(self):
        # type: () -> str
        return "<%s %s>" % (self.__class__.__name__, self.name)
class ASN1F_SET_OF(ASN1F_SEQUENCE_OF):
    """ASN1F_SEQUENCE_OF subclass using the SET universal tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.SET
class ASN1F_IPADDRESS(ASN1F_STRING):
    """String field using the IPADDRESS tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS
class ASN1F_TIME_TICKS(ASN1F_INTEGER):
    """Integer field using the TIME_TICKS tag."""
    ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS
639#############################
640# Complex ASN1 Fields #
641#############################
class ASN1F_optional(ASN1F_element):
    """
    ASN.1 field that is optional.

    This wraps another field: decoding errors of the wrapped field are
    turned into a None value, and nothing is built when the wrapped field
    is empty. Attributes that are not defined here are looked up on the
    wrapped field.
    """
    def __init__(self, field):
        # type: (ASN1F_field[Any, Any]) -> None
        """
        :param field: the field to make optional. Its flexible_tag is
            forced to False, so that decoding errors get raised and can
            be caught here.
        """
        field.flexible_tag = False
        self._field = field

    def __getattr__(self, attr):
        # type: (str) -> Optional[Any]
        """
        Delegate unknown attributes to the wrapped field.

        :raises AttributeError: if ``_field`` itself is looked up, which
            only happens while it is not set yet
        """
        # __getattr__ is only called when the regular lookup failed. If
        # that happens for '_field' (instance created without __init__,
        # e.g. by copy.copy() or pickle probing for __setstate__), the
        # delegation below would recurse forever.
        if attr == "_field":
            raise AttributeError(attr)
        return getattr(self._field, attr)

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[Any, bytes]
        """
        Decode the wrapped field; on a decoding error, return None along
        with the untouched input.
        """
        try:
            return self._field.m2i(pkt, s)
        except (ASN1_Error, ASN1F_badsequence, BER_Decoding_Error):
            # ASN1_Error may be raised by ASN1F_CHOICE
            return None, s

    def dissect(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> bytes
        """
        Dissect the wrapped field; on a decoding error, set its value to
        None and return the untouched input.
        """
        try:
            return self._field.dissect(pkt, s)
        except (ASN1_Error, ASN1F_badsequence, BER_Decoding_Error):
            self._field.set_val(pkt, None)
            return s

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Build the wrapped field, or nothing if it is empty."""
        if self._field.is_empty(pkt):
            return b""
        return self._field.build(pkt)

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> Any
        """Forwarded to the wrapped field."""
        return self._field.any2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """Forwarded to the wrapped field."""
        return self._field.i2repr(pkt, x)
class ASN1F_omit(ASN1F_field[None, None]):
    """
    ASN.1 field that is not specified. This is simply omitted on the network.
    This is different from ASN1F_NULL which has a network representation.
    """
    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[None, bytes]
        """Consume nothing: return None and the input unchanged."""
        return (None, s)

    def i2m(self, pkt, x):
        # type: (ASN1_Packet, Optional[bytes]) -> bytes
        """Always encode to an empty string."""
        return bytes()
# Types accepted as choices by ASN1F_CHOICE
_CHOICE_T = Union[
    'ASN1_Packet',
    Type[ASN1F_field[Any, Any]],
    'ASN1F_PACKET',
]
class ASN1F_CHOICE(ASN1F_field[_CHOICE_T, ASN1_Object[Any]]):
    """
    Field whose content is one among several possible types, selected
    from the tag found on the network.

    Multiple types are allowed: ASN1_Packet, ASN1F_field and ASN1F_PACKET(),
    See layers/x509.py for examples.
    Other ASN1F_field instances than ASN1F_PACKET instances must not be used.
    """
    holds_packets = 1
    ASN1_tag = ASN1_Class_UNIVERSAL.ANY

    def __init__(self, name, default, *args, **kwargs):
        # type: (str, Any, *_CHOICE_T, **Any) -> None
        """
        :param args: the possible choices
        :param kwargs: only "context" and "explicit_tag" are read
        :raises ASN1_Error: if an implicit_tag is given, or if no tag can
            be found for one of the choices
        """
        if "implicit_tag" in kwargs:
            raise ASN1_Error(
                "ASN1F_CHOICE has been called with an implicit_tag"
            )
        self.implicit_tag = None
        self.context = kwargs.get("context")
        self.explicit_tag = kwargs.get("explicit_tag")
        super().__init__(
            name, None, context=self.context,
            explicit_tag=self.explicit_tag
        )
        self.default = default
        self.current_choice = None
        # network tag -> choice
        self.choices = {}  # type: Dict[int, _CHOICE_T]
        # hash of an ASN1F_PACKET's cls -> its (implicit, explicit) tags
        self.pktchoices = {}
        for p in args:
            if hasattr(p, "ASN1_root"):
                # should be ASN1_Packet
                p = cast('ASN1_Packet', p)
                root = p.ASN1_root
                if hasattr(root, "choices"):
                    # ASN1F_CHOICE recursion
                    self.choices.update(cast(ASN1F_CHOICE, root).choices)
                else:
                    self.choices[root.network_tag] = p
            elif not hasattr(p, "ASN1_tag"):
                raise ASN1_Error("ASN1F_CHOICE: no tag found for one field")
            elif isinstance(p, type):
                # should be ASN1F_field class
                self.choices[int(p.ASN1_tag)] = p
            else:
                # should be ASN1F_field instance
                self.choices[p.network_tag] = p
                self.pktchoices[hash(p.cls)] = (p.implicit_tag, p.explicit_tag)  # noqa: E501

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[ASN1_Object[Any], bytes]
        """
        First we have to retrieve the appropriate choice.
        Then we extract the field/packet, according to this choice.

        :raises ASN1_Error: on empty input, or if the tag matches none of
            the choices while flexible_tag is not set
        """
        if len(s) == 0:
            raise ASN1_Error("ASN1F_CHOICE: got empty string")
        _, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                               explicit_tag=self.explicit_tag)
        tag, _ = BER_id_dec(s)
        if tag in self.choices:
            choice = self.choices[tag]
        elif self.flexible_tag:
            choice = ASN1F_field
        else:
            raise ASN1_Error(
                "ASN1F_CHOICE: unexpected field in '%s' "
                "(tag %s not in possible tags %s)" % (
                    self.name, tag, list(self.choices.keys())
                )
            )
        if hasattr(choice, "ASN1_root"):
            # we don't want to import ASN1_Packet in this module...
            return self.extract_packet(choice, s, _underlayer=pkt)  # type: ignore
        if isinstance(choice, type):
            field = choice(self.name, b"")
            return field.m2i(pkt, s)
        # XXX check properly if this is an ASN1F_PACKET
        return choice.m2i(pkt, s)

    def i2m(self, pkt, x):
        # type: (ASN1_Packet, Any) -> bytes
        """
        Encode ``x`` as bytes, apply the tags registered for its type (if
        any), then the explicit tag of the choice itself.
        """
        if x is None:
            s = b""
        else:
            s = bytes(x)
            key = hash(type(x))
            if key in self.pktchoices:
                imp, exp = self.pktchoices[key]
                s = BER_tagging_enc(s,
                                    implicit_tag=imp,
                                    explicit_tag=exp)
        return BER_tagging_enc(s, explicit_tag=self.explicit_tag)

    def randval(self):
        # type: () -> RandChoice
        """Return a random choice among a random value of every choice."""
        candidates = []
        for p in self.choices.values():
            if hasattr(p, "ASN1_root"):
                # should be ASN1_Packet class
                candidates.append(packet.fuzz(p()))  # type: ignore
            elif not hasattr(p, "ASN1_tag"):
                continue
            elif isinstance(p, type):
                # should be (basic) ASN1F_field class
                candidates.append(p("dummy", None).randval())
            else:
                # should be ASN1F_PACKET instance
                candidates.append(p.randval())
        return RandChoice(*candidates)
class ASN1F_PACKET(ASN1F_field['ASN1_Packet', Optional['ASN1_Packet']]):
    """
    Field holding a packet of class ``cls``, which is either an
    ASN1_Packet (having an ASN1_root) or a normal Packet.
    """
    holds_packets = 1

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[ASN1_Packet]
                 cls,  # type: Type[ASN1_Packet]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 next_cls_cb=None,  # type: Optional[Callable[[ASN1_Packet], Type[ASN1_Packet]]]  # noqa: E501
                 ):
        # type: (...) -> None
        """
        :param cls: class of the held packet
        :param next_cls_cb: optional callback which is given the parent
            packet when dissecting, and returns the class to use
            (``cls`` is used if it returns a falsy value)
        """
        self.cls = cls
        self.next_cls_cb = next_cls_cb
        # The default is stored as provided, hence None for the parent
        super(ASN1F_PACKET, self).__init__(
            name, None, context=context,
            implicit_tag=implicit_tag, explicit_tag=explicit_tag
        )
        if implicit_tag is None and explicit_tag is None and cls is not None:
            # m2i and i2m accept normal (non ASN.1) packets, which have
            # no ASN1_root: do not fail on them here either.
            root = getattr(cls, "ASN1_root", None)
            if (root is not None and
                    root.ASN1_tag == ASN1_Class_UNIVERSAL.SEQUENCE):
                self.network_tag = 16 | 0x20  # 16 + CONSTRUCTED
        self.default = default

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[Any, bytes]
        """
        Dissect ``s`` as a packet and return it with the remainder.
        None is returned as packet if nothing is left once the tagging
        has been removed.
        """
        cls = self.cls
        if self.next_cls_cb:
            cls = self.next_cls_cb(pkt) or self.cls
        if not hasattr(cls, "ASN1_root"):
            # A normal Packet (!= ASN1)
            return self.extract_packet(cls, s, _underlayer=pkt)
        diff_tag, s = BER_tagging_dec(s, hidden_tag=cls.ASN1_root.ASN1_tag,  # noqa: E501
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag,
                                      _fname=self.name)
        if diff_tag is not None:
            # remember the tag that was actually found
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        if not s:
            return None, s
        return self.extract_packet(cls, s, _underlayer=pkt)

    def i2m(self,
            pkt,  # type: ASN1_Packet
            x  # type: Union[bytes, ASN1_Packet, None, ASN1_Object[Optional[ASN1_Packet]]]  # noqa: E501
            ):
        # type: (...) -> bytes
        """
        Encode ``x``. The tagging of the field is only applied to values
        that have an ASN1_root attribute.
        """
        if x is None:
            s = b""
        elif isinstance(x, bytes):
            s = x
        elif isinstance(x, ASN1_Object):
            s = bytes(x.val) if x.val else b""
        else:
            s = bytes(x)
        if not hasattr(x, "ASN1_root"):
            # A normal Packet (!= ASN1)
            return s
        return BER_tagging_enc(s,
                               implicit_tag=self.implicit_tag,
                               explicit_tag=self.explicit_tag)

    def any2i(self,
              pkt,  # type: ASN1_Packet
              x  # type: Union[bytes, ASN1_Packet, None, ASN1_Object[Optional[ASN1_Packet]]]  # noqa: E501
              ):
        # type: (...) -> 'ASN1_Packet'
        """Attach ``pkt`` as underlayer of ``x`` when it supports it."""
        if hasattr(x, "add_underlayer"):
            x.add_underlayer(pkt)  # type: ignore
        return super(ASN1F_PACKET, self).any2i(pkt, x)

    def randval(self):  # type: ignore
        # type: () -> ASN1_Packet
        """Return a fuzzed instance of ``cls``."""
        return packet.fuzz(self.cls())
class ASN1F_BIT_STRING_ENCAPS(ASN1F_BIT_STRING):
    """
    Bit string field encapsulating a packet of class ``cls``.

    We may emulate simple string encapsulation with explicit_tag=0x04,
    but we need a specific class for bit strings because of unused bits, etc.
    """
    ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[ASN1_Packet]
                 cls,  # type: Type[ASN1_Packet]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param default: default packet; when truthy, it is converted to
            bytes before being handed to ASN1F_BIT_STRING
        :param cls: class of the encapsulated packet
        """
        self.cls = cls
        super(ASN1F_BIT_STRING_ENCAPS, self).__init__(  # type: ignore
            name,
            default and bytes(default),
            context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )

    def m2i(self, pkt, s):  # type: ignore
        # type: (ASN1_Packet, bytes) -> Tuple[Optional[ASN1_Packet], bytes]
        """
        Decode the bit string, then dissect its content as a ``cls``
        packet. Return the packet (None if the bit string is empty) and
        what remains of ``s`` after the bit string.

        :raises BER_Decoding_Error: if the number of bits is not a
            multiple of 8, or if the packet does not use all the content
        """
        bit_string, remain = super(ASN1F_BIT_STRING_ENCAPS, self).m2i(pkt, s)
        if len(bit_string.val) % 8 != 0:
            raise BER_Decoding_Error("wrong bit string", remaining=s)
        if not bit_string.val_readable:
            # Empty bit string: there is no packet, but the bytes that
            # follow the field still have to be handed back.
            return None, remain
        p, leftover = self.extract_packet(self.cls, bit_string.val_readable,
                                          _underlayer=pkt)
        if len(leftover) > 0:
            raise BER_Decoding_Error("unexpected remainder",
                                     remaining=leftover)
        return p, remain

    def i2m(self, pkt, x):  # type: ignore
        # type: (ASN1_Packet, Optional[ASN1_BIT_STRING]) -> bytes
        """
        Encode ``x``, first wrapping it into a readable ASN1_BIT_STRING
        (empty for None) if it is not already an ASN1_BIT_STRING.
        """
        if not isinstance(x, ASN1_BIT_STRING):
            x = ASN1_BIT_STRING(
                b"" if x is None else bytes(x),  # type: ignore
                readable=True,
            )
        return super(ASN1F_BIT_STRING_ENCAPS, self).i2m(pkt, x)
class ASN1F_FLAGS(ASN1F_BIT_STRING):
    """
    Bit string field whose bits are named, in order, by ``mapping``.
    """
    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[str]
                 mapping,  # type: List[str]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param mapping: names of the flags; the index of a name is the
            position of its bit
        """
        self.mapping = mapping
        super().__init__(
            name, default,
            default_readable=False,
            context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """
        Turn a str into an ASN1_BIT_STRING. A str made only of "0" and
        "1" is used directly; any other str is read as flag names joined
        by "+". Other values are passed through.
        """
        if not isinstance(x, str):
            return super().any2i(pkt, x)
        if any(char not in ("0", "1") for char in x):
            # resolve the flags
            bits = ["0"] * len(self.mapping)
            for flag_name in x.split("+"):
                bits[self.mapping.index(flag_name)] = "1"
            x = "".join(bits)
        return super().any2i(pkt, ASN1_BIT_STRING(x))

    def get_flags(self, pkt):
        # type: (ASN1_Packet) -> List[str]
        """Return the names of the flags set in the value held by ``pkt``."""
        bits = getattr(pkt, self.name).val
        # zip() stops at the shorter input: bits without a name are ignored
        return [
            flag_name
            for bit, flag_name in zip(bits, self.mapping)
            if bit == '1'
        ]

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """Prefix the repr of ``x`` with the flags set in ``pkt``."""
        if x is None:
            return repr(x)
        flag_names = ", ".join(self.get_flags(pkt))
        return flag_names + " " + repr(x)
class ASN1F_STRING_PacketField(ASN1F_STRING):
    """
    ASN1F_STRING that holds packets.
    """
    holds_packets = 1

    def i2m(self, pkt, val):
        # type: (ASN1_Packet, Any) -> bytes
        """Encode ``val``; objects with an ASN1_root become an ASN1_STRING."""
        is_asn1_packet = hasattr(val, "ASN1_root")
        if is_asn1_packet:
            val = ASN1_STRING(bytes(val))
        return super().i2m(pkt, val)

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> Any
        """Attach ``pkt`` as underlayer of ``x`` when it supports it."""
        accepts_underlayer = hasattr(x, "add_underlayer")
        if accepts_underlayer:
            x.add_underlayer(pkt)
        return super().any2i(pkt, x)
class ASN1F_STRING_ENCAPS(ASN1F_STRING_PacketField):
    """
    ASN1F_STRING that encapsulates a single ASN1 packet.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[ASN1_Packet]
                 cls,  # type: Type[ASN1_Packet]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param default: default packet; when truthy, it is converted to
            bytes before being handed to the parent class
        :param cls: class of the encapsulated packet
        """
        self.cls = cls
        super().__init__(
            name,
            default and bytes(default),  # type: ignore
            context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )

    def m2i(self, pkt, s):  # type: ignore
        # type: (ASN1_Packet, bytes) -> Tuple[ASN1_Packet, bytes]
        """
        Decode the string, then build a ``cls`` packet from its value.
        Return that packet along with the remainder.
        """
        decoded = super().m2i(pkt, s)
        string_obj = decoded[0]
        remain = decoded[1]
        return self.cls(string_obj.val, _underlayer=pkt), remain