Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/asn1fields.py: 68%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Acknowledgment: Maxence Tury <maxence.tury@ssi.gouv.fr>
7"""
8Classes that implement ASN.1 data structures.
9"""
11import copy
13from functools import reduce
15from scapy.asn1.asn1 import (
16 ASN1_BIT_STRING,
17 ASN1_BOOLEAN,
18 ASN1_Class,
19 ASN1_Class_UNIVERSAL,
20 ASN1_Error,
21 ASN1_INTEGER,
22 ASN1_NULL,
23 ASN1_OID,
24 ASN1_Object,
25 ASN1_STRING,
26)
27from scapy.asn1.ber import (
28 BER_Decoding_Error,
29 BER_id_dec,
30 BER_tagging_dec,
31 BER_tagging_enc,
32)
33from scapy.base_classes import BasePacket
34from scapy.compat import raw
35from scapy.volatile import (
36 GeneralizedTime,
37 RandChoice,
38 RandInt,
39 RandNum,
40 RandOID,
41 RandString,
42 RandField,
43)
45from scapy import packet
47from typing import (
48 Any,
49 AnyStr,
50 Callable,
51 Dict,
52 Generic,
53 List,
54 Optional,
55 Tuple,
56 Type,
57 TypeVar,
58 Union,
59 cast,
60 TYPE_CHECKING,
61)
63if TYPE_CHECKING:
64 from scapy.asn1packet import ASN1_Packet
class ASN1F_badsequence(Exception):
    """
    Raised to signal that a sequence could not be dissected as expected.

    In this module it is caught by ``ASN1F_field.extract_packet`` (which then
    falls back to a raw payload), by ``ASN1F_SEQUENCE.m2i`` and by
    ``ASN1F_optional``.
    """
class ASN1F_element(object):
    """
    Common, behaviour-free base class of the ASN.1 field helpers
    (``ASN1F_field`` and ``ASN1F_optional``) defined in this module.
    """
75##########################
76# Basic ASN1 Field #
77##########################
# Type parameters of ASN1F_field[_I, _A]:
_I = TypeVar('_I')  # Internal storage (value kept on the packet)
_A = TypeVar('_A')  # ASN.1 object (type of the decoded/default object)
class ASN1F_field(ASN1F_element, Generic[_I, _A]):
    """
    Base class of all ASN.1 fields.

    A field is bound to an attribute ``name`` of its owning packet and knows
    how to decode (``m2i`` / ``dissect``) and encode (``i2m`` / ``build``)
    the value stored there, optionally wrapped in an implicit or explicit
    tag. Subclasses usually only override ``ASN1_tag`` and ``randval``.
    """
    holds_packets = 0
    islist = 0
    ASN1_tag = ASN1_Class_UNIVERSAL.ANY
    context = ASN1_Class_UNIVERSAL  # type: Type[ASN1_Class]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[_A]
                 context=None,  # type: Optional[Type[ASN1_Class]]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 flexible_tag=False,  # type: Optional[bool]
                 size_len=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param name: name of the packet attribute handled by this field
        :param default: default value; anything other than None or an
            ASN1_NULL instance is wrapped with ``ASN1_tag.asn1_object``
        :param context: overrides the class-level ``context`` when given
        :param implicit_tag: implicit tag (exclusive with explicit_tag)
        :param explicit_tag: explicit tag (exclusive with implicit_tag)
        :param flexible_tag: decode with the "safe" variants (see m2i)
        :param size_len: forwarded to the codec ``enc`` call in i2m
        :raises ASN1_Error: if both tags are provided
        """
        if context is not None:
            self.context = context
        self.name = name
        if default is None or isinstance(default, ASN1_NULL):
            # Stored untouched: nothing to wrap.
            self.default = default  # type: Optional[_A]
        else:
            self.default = self.ASN1_tag.asn1_object(default)  # type: ignore
        self.size_len = size_len
        self.flexible_tag = flexible_tag
        both_tagged = implicit_tag is not None and explicit_tag is not None
        if both_tagged:
            raise ASN1_Error(
                "field cannot be both implicitly and explicitly tagged"
            )
        self.implicit_tag = implicit_tag and int(implicit_tag)
        self.explicit_tag = explicit_tag and int(explicit_tag)
        # network_tag gets useful for ASN1F_CHOICE
        self.network_tag = int(implicit_tag or explicit_tag or self.ASN1_tag)
        self.owners = []  # type: List[Type[ASN1_Packet]]

    def register_owner(self, cls):
        # type: (Type[ASN1_Packet]) -> None
        """Record ``cls`` in the list of owners of this field."""
        self.owners.append(cls)

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, _I) -> str
        """Return the ``repr`` of the internal value ``x``."""
        return repr(x)

    def i2h(self, pkt, x):
        # type: (ASN1_Packet, _I) -> Any
        """Return ``x`` unchanged."""
        return x

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[_A, bytes]
        """
        Decode one object from ``s``; return it with the remaining bytes.

        The good thing about safedec is that it may still decode ASN1
        even if there is a mismatch between the expected tag (self.ASN1_tag)
        and the actual tag; the decoded ASN1 object will simply be put
        into an ASN1_BADTAG object. However, safedec prevents the raising of
        exceptions needed for ASN1F_optional processing.
        Thus we use 'flexible_tag', which should be False with ASN1F_optional.

        Regarding other fields, we might need to know whether encoding went
        as expected or not. Noticeably, input methods from cert.py expect
        certain exceptions to be raised. Hence default flexible_tag is False.
        """
        diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag,
                                      _fname=self.name)
        if diff_tag is not None:
            # this implies that flexible_tag was True: remember the tag
            # actually seen in place of the configured one
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        decode = codec.safedec if self.flexible_tag else codec.dec
        return decode(s, context=self.context)  # type: ignore

    def i2m(self, pkt, x):
        # type: (ASN1_Packet, Union[bytes, _I, _A]) -> bytes
        """
        Encode ``x`` with the packet codec and apply this field's tagging.

        None encodes to an empty byte string (without tagging).

        :raises ASN1_Error: if ``x`` is an ASN1_Object whose tag is not
            accepted by this field
        """
        if x is None:
            return b""
        if not isinstance(x, ASN1_Object):
            codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
            s = codec.enc(x, size_len=self.size_len)
        else:
            tag_accepted = (
                self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY or
                x.tag == ASN1_Class_UNIVERSAL.RAW or
                x.tag == ASN1_Class_UNIVERSAL.ERROR or
                self.ASN1_tag == x.tag
            )
            if not tag_accepted:
                raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name))  # noqa: E501
            s = x.enc(pkt.ASN1_codec)
        return BER_tagging_enc(s,
                               implicit_tag=self.implicit_tag,
                               explicit_tag=self.explicit_tag)

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> _I
        """Return ``x`` unchanged (only cast for the type checker)."""
        return cast(_I, x)

    def extract_packet(self,
                       cls,  # type: Type[ASN1_Packet]
                       s,  # type: bytes
                       _underlayer=None  # type: Optional[ASN1_Packet]
                       ):
        # type: (...) -> Tuple[ASN1_Packet, bytes]
        """
        Instantiate ``cls`` from ``s`` and split off the undissected tail.

        If ``cls`` raises ASN1F_badsequence, a ``packet.Raw`` is built
        instead. When the result contains a Raw layer, its load is returned
        as the remainder and that layer is detached from its underlayer
        (if it has one).
        """
        try:
            c = cls(s, _underlayer=_underlayer)
        except ASN1F_badsequence:
            c = packet.Raw(s, _underlayer=_underlayer)  # type: ignore
        cpad = c.getlayer(packet.Raw)
        if cpad is None:
            return c, b""
        remain = cpad.load
        if cpad.underlayer:
            del cpad.underlayer.payload
        return c, remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Encode the value currently stored on ``pkt`` for this field."""
        value = getattr(pkt, self.name)
        return self.i2m(pkt, value)

    def dissect(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> bytes
        """Decode from ``s``, store the value on ``pkt``, return the rest."""
        value, remain = self.m2i(pkt, s)
        self.set_val(pkt, value)
        return remain

    def do_copy(self, x):
        # type: (Any) -> Any
        """
        Copy a field value.

        Lists are duplicated, with contained BasePacket instances copied
        individually; other objects exposing ``copy`` are copied through
        it; anything else is returned as is.
        """
        if isinstance(x, list):
            return [
                item.copy() if isinstance(item, BasePacket) else item
                for item in x
            ]
        if hasattr(x, "copy"):
            return x.copy()
        return x

    def set_val(self, pkt, val):
        # type: (ASN1_Packet, Any) -> None
        """Store ``val`` on ``pkt`` under this field's name."""
        setattr(pkt, self.name, val)

    def is_empty(self, pkt):
        # type: (ASN1_Packet) -> bool
        """Tell whether the value stored on ``pkt`` is None."""
        return getattr(pkt, self.name) is None

    def get_fields_list(self):
        # type: () -> List[ASN1F_field[Any, Any]]
        """Return the list of fields held by this one: itself."""
        return [self]

    def __str__(self):
        # type: () -> str
        return repr(self)

    def randval(self):
        # type: () -> RandField[_I]
        """Return a random value generator (a RandInt by default)."""
        return cast(RandField[_I], RandInt())

    def copy(self):
        # type: () -> ASN1F_field[_I, _A]
        """Return a shallow copy of the field."""
        return copy.copy(self)
249############################
250# Simple ASN1 Fields #
251############################
class ASN1F_BOOLEAN(ASN1F_field[bool, ASN1_BOOLEAN]):
    """Field tagged as an ASN.1 universal BOOLEAN."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BOOLEAN

    def randval(self):
        # type: () -> RandChoice
        """Return a random choice between True and False."""
        return RandChoice(True, False)
class ASN1F_INTEGER(ASN1F_field[int, ASN1_INTEGER]):
    """Field tagged as an ASN.1 universal INTEGER."""
    ASN1_tag = ASN1_Class_UNIVERSAL.INTEGER

    def randval(self):
        # type: () -> RandNum
        """Return a random number generator over [-2**64, 2**64 - 1]."""
        return RandNum(-2**64, 2**64 - 1)
class ASN1F_enum_INTEGER(ASN1F_INTEGER):
    """
    INTEGER field whose values carry symbolic names.

    ``i2s`` maps integer values to names and ``s2i`` maps names back to
    integer values; names are accepted by ``i2m`` and shown by ``i2repr``.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: ASN1_INTEGER
                 enum,  # type: Dict[int, str]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[Any]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param enum: either a list of names (indexed by position) or a
            mapping; if any key of the mapping is a string, the mapping is
            taken to go from names to values instead of values to names
        """
        super(ASN1F_enum_INTEGER, self).__init__(
            name, default, context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )
        self.i2s = {}  # type: Dict[int, str]
        self.s2i = {}  # type: Dict[str, int]
        keys = range(len(enum)) if isinstance(enum, list) else list(enum)
        # "forward" is filled as key -> value, "backward" as value -> key.
        if any(isinstance(key, str) for key in keys):
            forward, backward = self.s2i, self.i2s  # type: ignore
        else:
            forward, backward = self.i2s, self.s2i  # type: ignore
        for key in keys:
            mapped = enum[key]
            forward[key] = mapped
            backward[mapped] = key

    def i2m(self,
            pkt,  # type: ASN1_Packet
            s,  # type: Union[bytes, str, int, ASN1_INTEGER]
            ):
        # type: (...) -> bytes
        """Encode ``s``, first resolving it through ``s2i`` if it is a str."""
        vs = self.s2i[s] if isinstance(s, str) else s
        return super(ASN1F_enum_INTEGER, self).i2m(pkt, vs)

    def i2repr(self,
               pkt,  # type: ASN1_Packet
               x,  # type: Union[str, int]
               ):
        # type: (...) -> str
        """Return ``repr(x)``, prefixed by the quoted name when known."""
        if isinstance(x, ASN1_INTEGER):
            label = self.i2s.get(x.val)
            if label:
                return "'%s' %s" % (label, repr(x))
        return repr(x)
class ASN1F_BIT_STRING(ASN1F_field[str, ASN1_BIT_STRING]):
    """Field tagged as an ASN.1 universal BIT STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[Union[ASN1_BIT_STRING, AnyStr]]
                 default_readable=True,  # type: bool
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param default: default value; bytes/str defaults are wrapped into
            an ASN1_BIT_STRING, anything else is stored as given
        :param default_readable: passed as ``readable`` to ASN1_BIT_STRING
            when wrapping a bytes/str default
        """
        # The parent receives None so that it does not wrap the default.
        super(ASN1F_BIT_STRING, self).__init__(
            name, None, context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )
        if isinstance(default, (bytes, str)):
            default = ASN1_BIT_STRING(default,  # type: ignore
                                      readable=default_readable)
        self.default = default

    def randval(self):
        # type: () -> RandString
        """Return a random string generator of random size in [0, 1000]."""
        return RandString(RandNum(0, 1000))
class ASN1F_STRING(ASN1F_field[str, ASN1_STRING]):
    """Field tagged with ``ASN1_Class_UNIVERSAL.STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.STRING

    def randval(self):
        # type: () -> RandString
        """Return a random string generator of random size in [0, 1000]."""
        return RandString(RandNum(0, 1000))
class ASN1F_NULL(ASN1F_INTEGER):
    """ASN1F_INTEGER variant tagged as an ASN.1 universal NULL."""
    ASN1_tag = ASN1_Class_UNIVERSAL.NULL
class ASN1F_OID(ASN1F_field[str, ASN1_OID]):
    """Field tagged with ``ASN1_Class_UNIVERSAL.OID``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.OID

    def randval(self):
        # type: () -> RandOID
        """Return a random OID generator."""
        return RandOID()
class ASN1F_ENUMERATED(ASN1F_enum_INTEGER):
    """ASN1F_enum_INTEGER variant tagged as an ASN.1 universal ENUMERATED."""
    ASN1_tag = ASN1_Class_UNIVERSAL.ENUMERATED
class ASN1F_UTF8_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.UTF8_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UTF8_STRING
class ASN1F_NUMERIC_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.NUMERIC_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING
class ASN1F_PRINTABLE_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.PRINTABLE_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
class ASN1F_T61_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.T61_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.T61_STRING
class ASN1F_VIDEOTEX_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.VIDEOTEX_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING
class ASN1F_IA5_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.IA5_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.IA5_STRING
class ASN1F_GENERAL_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.GENERAL_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.GENERAL_STRING
class ASN1F_UTC_TIME(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.UTC_TIME``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME

    def randval(self):  # type: ignore
        # type: () -> GeneralizedTime
        """Return a GeneralizedTime volatile value."""
        return GeneralizedTime()
class ASN1F_GENERALIZED_TIME(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.GENERALIZED_TIME``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME

    def randval(self):  # type: ignore
        # type: () -> GeneralizedTime
        """Return a GeneralizedTime volatile value."""
        return GeneralizedTime()
class ASN1F_ISO646_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.ISO646_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.ISO646_STRING
class ASN1F_UNIVERSAL_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.UNIVERSAL_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UNIVERSAL_STRING
class ASN1F_BMP_STRING(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.BMP_STRING``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BMP_STRING
class ASN1F_SEQUENCE(ASN1F_field[List[Any], List[Any]]):
    """
    SEQUENCE of heterogeneous fields, given positionally to the constructor.

    The nested fields store their values directly on the owning packet.
    """
    # Here is how you could decode a SEQUENCE
    # with an unknown, private high-tag prefix :
    # class PrivSeq(ASN1_Packet):
    #     ASN1_codec = ASN1_Codecs.BER
    #     ASN1_root = ASN1F_SEQUENCE(
    #                       <asn1 field #0>,
    #                       ...
    #                       <asn1 field #N>,
    #                       explicit_tag=0,
    #                       flexible_tag=True)
    # Because we use flexible_tag, the value of the explicit_tag does not matter.  # noqa: E501
    ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
    holds_packets = 1

    def __init__(self, *seq, **kwargs):
        # type: (*Any, **Any) -> None
        """
        :param seq: the nested fields, in order
        :param kwargs: forwarded to ASN1F_field.__init__
        """
        defaults = [field.default for field in seq]
        super(ASN1F_SEQUENCE, self).__init__(
            "dummy_seq_name", defaults, **kwargs
        )
        self.seq = seq
        self.islist = len(seq) > 1

    def __repr__(self):
        # type: () -> str
        return "<%s%r>" % (self.__class__.__name__, self.seq)

    def is_empty(self, pkt):
        # type: (ASN1_Packet) -> bool
        """Tell whether every nested field is empty on ``pkt``."""
        for field in self.seq:
            if not field.is_empty(pkt):
                return False
        return True

    def get_fields_list(self):
        # type: () -> List[ASN1F_field[Any, Any]]
        """Return the concatenated field lists of the nested fields."""
        fields = []  # type: List[ASN1F_field[Any, Any]]
        for field in self.seq:
            fields = fields + field.get_fields_list()
        return fields

    def m2i(self, pkt, s):
        # type: (Any, bytes) -> Tuple[Any, bytes]
        """
        ASN1F_SEQUENCE behaves transparently, with nested ASN1_objects being
        dissected one by one. Because we use obj.dissect (see loop below)
        instead of obj.m2i (as we trust dissect to do the appropriate set_vals)
        we do not directly retrieve the list of nested objects.
        Thus m2i returns an empty list (along with the proper remainder).
        It is discarded by dissect() and should not be missed elsewhere.
        """
        diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag,
                                      _fname=pkt.name)
        if diff_tag is not None:
            # Remember the tag actually seen in place of the configured one.
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        _, s, remain = codec.check_type_check_len(s)
        if len(s) > 0:
            for obj in self.seq:
                try:
                    s = obj.dissect(pkt, s)
                except ASN1F_badsequence:
                    break
            # Everything inside the sequence must have been consumed.
            if len(s) > 0:
                raise BER_Decoding_Error("unexpected remainder", remaining=s)
        else:
            # Empty content: every nested field is reset to None.
            for obj in self.seq:
                obj.set_val(pkt, None)
        return [], remain

    def dissect(self, pkt, s):
        # type: (Any, bytes) -> bytes
        """Dissect the nested fields and return the bytes after the sequence."""
        remain = self.m2i(pkt, s)[1]
        return remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Concatenate the nested fields' encodings and wrap them."""
        content = b""
        for field in self.seq:
            content = content + field.build(pkt)
        return super(ASN1F_SEQUENCE, self).i2m(pkt, content)
class ASN1F_SET(ASN1F_SEQUENCE):
    """ASN1F_SEQUENCE variant tagged with ``ASN1_Class_UNIVERSAL.SET``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.SET
# Kinds of element descriptions accepted as ``cls`` by ASN1F_SEQUENCE_OF.
_SEQ_T = Union['ASN1_Packet', Type[ASN1F_field[Any, Any]], 'ASN1F_PACKET']
class ASN1F_SEQUENCE_OF(ASN1F_field[List[_SEQ_T],
                                    List[ASN1_Object[Any]]]):
    """
    Two types are allowed as cls: ASN1_Packet, ASN1F_field

    The field holds a list of elements that are all decoded the same way:
    through ``cls.m2i`` when ``cls`` is an ASN1F_field class, or through
    ``extract_packet`` otherwise.
    """
    ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
    islist = 1

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 cls,  # type: _SEQ_T
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[Any]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param cls: ASN1F_field subclass, or an object exposing
            ``ASN1_root`` / any callable (treated as a packet class)
        :raises ValueError: if ``cls`` is none of the above
        """
        if isinstance(cls, type) and issubclass(cls, ASN1F_field):
            self.fld = cls
            # A throw-away field instance decodes each element.
            self._extract_packet = lambda s, pkt: self.fld(
                self.name, b"").m2i(pkt, s)
            self.holds_packets = 0
        elif hasattr(cls, "ASN1_root") or callable(cls):
            self.cls = cast("Type[ASN1_Packet]", cls)
            self._extract_packet = lambda s, pkt: self.extract_packet(
                self.cls, s, _underlayer=pkt)
            self.holds_packets = 1
        else:
            raise ValueError("cls should be an ASN1_Packet or ASN1_field")
        # The parent receives None so that it does not wrap the default.
        super(ASN1F_SEQUENCE_OF, self).__init__(
            name, None, context=context,
            implicit_tag=implicit_tag, explicit_tag=explicit_tag
        )
        self.default = default

    def is_empty(self,
                 pkt,  # type: ASN1_Packet
                 ):
        # type: (...) -> bool
        """Tell whether the value stored on ``pkt`` is None."""
        return ASN1F_field.is_empty(self, pkt)

    def m2i(self,
            pkt,  # type: ASN1_Packet
            s,  # type: bytes
            ):
        # type: (...) -> Tuple[List[Any], bytes]
        """
        Decode every element of the sequence.

        :returns: the list of (truthy) decoded elements and the bytes
            following the sequence
        """
        # _fname is passed as the other fields of this module do.
        diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag,
                                      _fname=self.name)
        if diff_tag is not None:
            # Remember the tag actually seen in place of the configured one.
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        _, s, remain = codec.check_type_check_len(s)
        lst = []
        # The loop only ends once the sequence content is exhausted, so no
        # "unexpected remainder" check is needed afterwards.
        while s:
            c, s = self._extract_packet(s, pkt)  # type: ignore
            if c:
                lst.append(c)
        return lst, remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Encode the list stored on ``pkt`` (None encodes as empty)."""
        val = getattr(pkt, self.name)
        if isinstance(val, ASN1_Object) and \
                val.tag == ASN1_Class_UNIVERSAL.RAW:
            # RAW objects are handed to i2m untouched.
            s = cast(Union[List[_SEQ_T], bytes], val)
        elif val is None:
            s = b""
        else:
            s = b"".join(raw(i) for i in val)
        return self.i2m(pkt, s)

    def randval(self):
        # type: () -> Any
        """Return a fuzzed packet or a random element value."""
        if self.holds_packets:
            return packet.fuzz(self.cls())
        else:
            return self.fld(self.name, b"").randval()

    def __repr__(self):
        # type: () -> str
        return "<%s %s>" % (self.__class__.__name__, self.name)
class ASN1F_SET_OF(ASN1F_SEQUENCE_OF):
    """ASN1F_SEQUENCE_OF variant tagged with ``ASN1_Class_UNIVERSAL.SET``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.SET
class ASN1F_IPADDRESS(ASN1F_STRING):
    """String field tagged with ``ASN1_Class_UNIVERSAL.IPADDRESS``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS
class ASN1F_TIME_TICKS(ASN1F_INTEGER):
    """Integer field tagged with ``ASN1_Class_UNIVERSAL.TIME_TICKS``."""
    ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS
621#############################
622# Complex ASN1 Fields #
623#############################
class ASN1F_optional(ASN1F_element):
    """
    Wrapper marking a field as optional.

    Decoding errors of the wrapped field are turned into a None value, and
    an empty field builds to nothing. Attributes that are not defined here
    are looked up on the wrapped field.
    """

    def __init__(self, field):
        # type: (ASN1F_field[Any, Any]) -> None
        # Decoding errors must be raised for the wrapper to detect absence.
        field.flexible_tag = False
        self._field = field

    def __getattr__(self, attr):
        # type: (str) -> Optional[Any]
        return getattr(self._field, attr)

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[Any, bytes]
        """Decode the wrapped field; on failure return None and ``s``."""
        try:
            decoded = self._field.m2i(pkt, s)
        except (ASN1_Error, ASN1F_badsequence, BER_Decoding_Error):
            # ASN1_Error may be raised by ASN1F_CHOICE
            return None, s
        return decoded

    def dissect(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> bytes
        """Dissect the wrapped field; on failure set None and keep ``s``."""
        try:
            remain = self._field.dissect(pkt, s)
        except (ASN1_Error, ASN1F_badsequence, BER_Decoding_Error):
            self._field.set_val(pkt, None)
            return s
        return remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Build the wrapped field, or nothing when it is empty."""
        return b"" if self._field.is_empty(pkt) else self._field.build(pkt)

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> Any
        """Delegate to the wrapped field."""
        wrapped = self._field
        return wrapped.any2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """Delegate to the wrapped field."""
        wrapped = self._field
        return wrapped.i2repr(pkt, x)
# Kinds of alternatives accepted positionally by ASN1F_CHOICE.
_CHOICE_T = Union['ASN1_Packet', Type[ASN1F_field[Any, Any]], 'ASN1F_PACKET']
class ASN1F_CHOICE(ASN1F_field[_CHOICE_T, ASN1_Object[Any]]):
    """
    Multiple types are allowed: ASN1_Packet, ASN1F_field and ASN1F_PACKET(),
    See layers/x509.py for examples.
    Other ASN1F_field instances than ASN1F_PACKET instances must not be used.

    ``choices`` maps a network tag to the alternative to use for it, and
    ``pktchoices`` maps ``hash(cls)`` of ASN1F_PACKET alternatives to their
    (implicit_tag, explicit_tag) pair.
    """
    holds_packets = 1
    ASN1_tag = ASN1_Class_UNIVERSAL.ANY

    def __init__(self, name, default, *args, **kwargs):
        # type: (str, Any, *_CHOICE_T, **Any) -> None
        """
        :param args: the possible alternatives
        :param kwargs: only ``context`` and ``explicit_tag`` are used
        :raises ASN1_Error: if ``implicit_tag`` is passed, or if an
            alternative exposes neither ``ASN1_root`` nor ``ASN1_tag``
        """
        if "implicit_tag" in kwargs:
            raise ASN1_Error(
                "ASN1F_CHOICE has been called with an implicit_tag"
            )
        self.implicit_tag = None
        # Both are stored on the instance even when absent (i.e. None).
        self.context = kwargs.get("context")  # type: ignore
        self.explicit_tag = kwargs.get("explicit_tag")
        super(ASN1F_CHOICE, self).__init__(
            name, None, context=self.context,
            explicit_tag=self.explicit_tag
        )
        self.default = default
        self.current_choice = None
        self.choices = {}  # type: Dict[int, _CHOICE_T]
        self.pktchoices = {}
        for alt in args:
            if hasattr(alt, "ASN1_root"):
                # should be ASN1_Packet
                alt = cast('ASN1_Packet', alt)
                root = alt.ASN1_root
                if hasattr(root, "choices"):
                    # ASN1F_CHOICE recursion
                    self.choices.update(cast(ASN1F_CHOICE, root).choices)
                else:
                    self.choices[root.network_tag] = alt
            elif hasattr(alt, "ASN1_tag"):
                if isinstance(alt, type):
                    # should be ASN1F_field class
                    self.choices[int(alt.ASN1_tag)] = alt
                else:
                    # should be ASN1F_field instance
                    self.choices[alt.network_tag] = alt
                    self.pktchoices[hash(alt.cls)] = (alt.implicit_tag, alt.explicit_tag)  # noqa: E501
            else:
                raise ASN1_Error("ASN1F_CHOICE: no tag found for one field")

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[ASN1_Object[Any], bytes]
        """
        First we have to retrieve the appropriate choice.
        Then we extract the field/packet, according to this choice.

        :raises ASN1_Error: on empty input, or on an unknown tag while
            ``flexible_tag`` is false
        """
        if len(s) == 0:
            raise ASN1_Error("ASN1F_CHOICE: got empty string")
        _, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                               explicit_tag=self.explicit_tag)
        tag, _ = BER_id_dec(s)
        if tag in self.choices:
            choice = self.choices[tag]
        elif self.flexible_tag:
            choice = ASN1F_field
        else:
            raise ASN1_Error(
                "ASN1F_CHOICE: unexpected field in '%s' "
                "(tag %s not in possible tags %s)" % (
                    self.name, tag, list(self.choices.keys())
                )
            )
        if hasattr(choice, "ASN1_root"):
            # we don't want to import ASN1_Packet in this module...
            return self.extract_packet(choice, s, _underlayer=pkt)  # type: ignore
        if isinstance(choice, type):
            return choice(self.name, b"").m2i(pkt, s)
        # XXX check properly if this is an ASN1F_PACKET
        return choice.m2i(pkt, s)

    def i2m(self, pkt, x):
        # type: (ASN1_Packet, Any) -> bytes
        """
        Encode ``x`` with ``raw``, re-apply the tags registered for its
        type (if any), then apply this field's explicit tag.
        """
        if x is None:
            s = b""
        else:
            s = raw(x)
            key = hash(type(x))
            if key in self.pktchoices:
                imp, exp = self.pktchoices[key]
                s = BER_tagging_enc(s,
                                    implicit_tag=imp,
                                    explicit_tag=exp)
        return BER_tagging_enc(s, explicit_tag=self.explicit_tag)

    def randval(self):
        # type: () -> RandChoice
        """Return a random choice among random values of each alternative."""
        candidates = []
        for alt in self.choices.values():
            if hasattr(alt, "ASN1_root"):
                # should be ASN1_Packet class
                candidates.append(packet.fuzz(alt()))  # type: ignore
            elif hasattr(alt, "ASN1_tag"):
                if isinstance(alt, type):
                    # should be (basic) ASN1F_field class
                    candidates.append(alt("dummy", None).randval())
                else:
                    # should be ASN1F_PACKET instance
                    candidates.append(alt.randval())
        return RandChoice(*candidates)
class ASN1F_PACKET(ASN1F_field['ASN1_Packet', Optional['ASN1_Packet']]):
    """
    Field holding a nested packet of class ``cls``.

    ``next_cls_cb``, when given, is called with the owning packet at
    dissection time and may return another class to use instead of ``cls``.
    """
    holds_packets = 1

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[ASN1_Packet]
                 cls,  # type: Type[ASN1_Packet]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 next_cls_cb=None,  # type: Optional[Callable[[ASN1_Packet], Type[ASN1_Packet]]]  # noqa: E501
                 ):
        # type: (...) -> None
        self.cls = cls
        self.next_cls_cb = next_cls_cb
        # The parent receives None so that it does not wrap the default.
        super(ASN1F_PACKET, self).__init__(
            name, None, context=context,
            implicit_tag=implicit_tag, explicit_tag=explicit_tag
        )
        untagged = implicit_tag is None and explicit_tag is None
        if untagged and cls is not None:
            if cls.ASN1_root.ASN1_tag == ASN1_Class_UNIVERSAL.SEQUENCE:
                self.network_tag = 16 | 0x20  # 16 + CONSTRUCTED
        self.default = default

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[Any, bytes]
        """
        Decode the nested packet and return it with the remaining bytes.

        Returns ``(None, s)`` when nothing is left after tag removal.
        """
        cls = self.cls
        if self.next_cls_cb:
            cls = self.next_cls_cb(pkt) or self.cls
        if not hasattr(cls, "ASN1_root"):
            # A normal Packet (!= ASN1)
            return self.extract_packet(cls, s, _underlayer=pkt)
        diff_tag, s = BER_tagging_dec(s, hidden_tag=cls.ASN1_root.ASN1_tag,  # noqa: E501
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag,
                                      _fname=self.name)
        if diff_tag is not None:
            # Remember the tag actually seen in place of the configured one.
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        if s:
            return self.extract_packet(cls, s, _underlayer=pkt)
        return None, s

    def i2m(self,
            pkt,  # type: ASN1_Packet
            x  # type: Union[bytes, ASN1_Packet, None, ASN1_Object[Optional[ASN1_Packet]]]  # noqa: E501
            ):
        # type: (...) -> bytes
        """
        Encode ``x`` and apply this field's tagging.

        Packets without an ``ASN1_root`` attribute are returned as their
        raw bytes, without tagging.
        """
        if x is None:
            s = b""
        elif isinstance(x, bytes):
            s = x
        elif isinstance(x, ASN1_Object):
            s = raw(x.val) if x.val else b""
        else:
            s = raw(x)
            if not hasattr(x, "ASN1_root"):
                # A normal Packet (!= ASN1)
                return s
        return BER_tagging_enc(s,
                               implicit_tag=self.implicit_tag,
                               explicit_tag=self.explicit_tag)

    def any2i(self,
              pkt,  # type: ASN1_Packet
              x  # type: Union[bytes, ASN1_Packet, None, ASN1_Object[Optional[ASN1_Packet]]]  # noqa: E501
              ):
        # type: (...) -> 'ASN1_Packet'
        """Attach ``pkt`` as underlayer of ``x`` when ``x`` supports it."""
        if hasattr(x, "add_underlayer"):
            x.add_underlayer(pkt)  # type: ignore
        return super(ASN1F_PACKET, self).any2i(pkt, x)

    def randval(self):  # type: ignore
        # type: () -> ASN1_Packet
        """Return a fuzzed instance of ``cls``."""
        return packet.fuzz(self.cls())
class ASN1F_BIT_STRING_ENCAPS(ASN1F_BIT_STRING):
    """
    We may emulate simple string encapsulation with explicit_tag=0x04,
    but we need a specific class for bit strings because of unused bits, etc.

    The content of the bit string is dissected as a packet of class ``cls``.
    """
    ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[ASN1_Packet]
                 cls,  # type: Type[ASN1_Packet]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param default: default packet; a truthy default is stored as its
            raw bytes
        :param cls: class used to dissect the bit string content
        """
        self.cls = cls
        super(ASN1F_BIT_STRING_ENCAPS, self).__init__(  # type: ignore
            name,
            default and raw(default),
            context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )

    def m2i(self, pkt, s):  # type: ignore
        # type: (ASN1_Packet, bytes) -> Tuple[Optional[ASN1_Packet], bytes]
        """
        Decode the bit string, then dissect its content with ``cls``.

        :returns: the encapsulated packet (None if the bit string is
            empty) and the bytes following the bit string
        :raises BER_Decoding_Error: if the bit count is not a multiple of
            8, or if ``cls`` leaves part of the content undissected
        """
        bit_string, remain = super(ASN1F_BIT_STRING_ENCAPS, self).m2i(pkt, s)
        if len(bit_string.val) % 8 != 0:
            raise BER_Decoding_Error("wrong bit string", remaining=s)
        if not bit_string.val_readable:
            # Nothing is encapsulated. The bytes located after the bit
            # string must still be handed back to the caller, otherwise
            # the fields that follow would be silently dropped.
            return None, remain
        p, s = self.extract_packet(self.cls, bit_string.val_readable,
                                   _underlayer=pkt)
        if len(s) > 0:
            raise BER_Decoding_Error("unexpected remainder", remaining=s)
        return p, remain

    def i2m(self, pkt, x):  # type: ignore
        # type: (ASN1_Packet, Optional[ASN1_BIT_STRING]) -> bytes
        """Encode ``x``, wrapping it into an ASN1_BIT_STRING if needed."""
        if not isinstance(x, ASN1_BIT_STRING):
            x = ASN1_BIT_STRING(
                b"" if x is None else bytes(x),  # type: ignore
                readable=True,
            )
        return super(ASN1F_BIT_STRING_ENCAPS, self).i2m(pkt, x)
class ASN1F_FLAGS(ASN1F_BIT_STRING):
    """
    BIT STRING field whose bits are named through ``mapping``: the name at
    index ``i`` of ``mapping`` designates the bit at position ``i``.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[str]
                 mapping,  # type: List[str]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        self.mapping = mapping
        super(ASN1F_FLAGS, self).__init__(
            name, default,
            default_readable=False,
            context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """
        Convert a str into an ASN1_BIT_STRING.

        A str made only of "0"/"1" is used as is; any other str is read as
        "+"-separated flag names taken from ``mapping``.
        """
        if isinstance(x, str):
            if set(x) - {"0", "1"}:
                # resolve the flags
                bits = ["0"] * len(self.mapping)
                for flag in x.split("+"):
                    bits[self.mapping.index(flag)] = "1"
                x = "".join(bits)
            x = ASN1_BIT_STRING(x)
        return super(ASN1F_FLAGS, self).any2i(pkt, x)

    def get_flags(self, pkt):
        # type: (ASN1_Packet) -> List[str]
        """Return the names of the bits set to '1' in the value on ``pkt``."""
        fbytes = getattr(pkt, self.name).val
        names = []
        for idx, positional in enumerate(fbytes):
            # Bits beyond the end of the mapping have no name.
            if positional == '1' and idx < len(self.mapping):
                names.append(self.mapping[idx])
        return names

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """Return ``repr(x)``, prefixed by the flag names set on ``pkt``."""
        if x is None:
            return repr(x)
        pretty_s = ", ".join(self.get_flags(pkt))
        return pretty_s + " " + repr(x)