1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Acknowledgment: Maxence Tury <maxence.tury@ssi.gouv.fr>
6
7"""
8Classes that implement ASN.1 data structures.
9"""
10
11import copy
12
13from functools import reduce
14
15from scapy.asn1.asn1 import (
16 ASN1_BIT_STRING,
17 ASN1_BOOLEAN,
18 ASN1_Class,
19 ASN1_Class_UNIVERSAL,
20 ASN1_Error,
21 ASN1_INTEGER,
22 ASN1_NULL,
23 ASN1_OID,
24 ASN1_Object,
25 ASN1_STRING,
26)
27from scapy.asn1.ber import (
28 BER_Decoding_Error,
29 BER_id_dec,
30 BER_tagging_dec,
31 BER_tagging_enc,
32)
33from scapy.base_classes import BasePacket
34from scapy.compat import raw
35from scapy.volatile import (
36 GeneralizedTime,
37 RandChoice,
38 RandInt,
39 RandNum,
40 RandOID,
41 RandString,
42 RandField,
43)
44
45from scapy import packet
46
47from typing import (
48 Any,
49 AnyStr,
50 Callable,
51 Dict,
52 Generic,
53 List,
54 Optional,
55 Tuple,
56 Type,
57 TypeVar,
58 Union,
59 cast,
60 TYPE_CHECKING,
61)
62
63if TYPE_CHECKING:
64 from scapy.asn1packet import ASN1_Packet
65
66
class ASN1F_badsequence(Exception):
    """Exception type that the ASN.1 field dissectors of this module catch."""
69
70
class ASN1F_element(object):
    """Common base class of ASN1F_field and ASN1F_optional."""
73
74
75##########################
76# Basic ASN1 Field #
77##########################
78
# Type parameters of the generic ASN1F_field class.
_I = TypeVar('_I')  # Internal storage
_A = TypeVar('_A')  # ASN.1 object
81
82
class ASN1F_field(ASN1F_element, Generic[_I, _A]):
    """
    Generic ASN.1 field, bound by ``name`` to an attribute of an ASN1_Packet.

    Encoding and decoding are delegated to the codec that ``ASN1_tag``
    provides for the packet's ``ASN1_codec``; an optional implicit or
    explicit tag is applied around the encoded value.
    """
    holds_packets = 0
    islist = 0
    ASN1_tag = ASN1_Class_UNIVERSAL.ANY
    context = ASN1_Class_UNIVERSAL  # type: Type[ASN1_Class]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[_A]
                 context=None,  # type: Optional[Type[ASN1_Class]]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 flexible_tag=False,  # type: Optional[bool]
                 size_len=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param name: name of the packet attribute this field is bound to
        :param default: default value; wrapped through ASN1_tag.asn1_object
            unless it is None or an ASN1_NULL instance
        :param context: overrides the class-level ``context`` when given
        :param implicit_tag: implicit tag (exclusive with explicit_tag)
        :param explicit_tag: explicit tag (exclusive with implicit_tag)
        :param flexible_tag: decode with the codec's safedec instead of dec
        :param size_len: forwarded to the codec when encoding raw values
        :raises ASN1_Error: if both implicit_tag and explicit_tag are given
        """
        if context is not None:
            self.context = context
        self.name = name
        if default is None or isinstance(default, ASN1_NULL):
            # None and ASN1_NULL defaults are stored untouched
            self.default = default  # type: Optional[_A]
        else:
            self.default = self.ASN1_tag.asn1_object(default)  # type: ignore
        self.size_len = size_len
        self.flexible_tag = flexible_tag
        doubly_tagged = implicit_tag is not None and explicit_tag is not None
        if doubly_tagged:
            raise ASN1_Error(
                "field cannot be both implicitly and explicitly tagged"
            )
        self.implicit_tag = implicit_tag and int(implicit_tag)
        self.explicit_tag = explicit_tag and int(explicit_tag)
        # network_tag gets useful for ASN1F_CHOICE
        self.network_tag = int(implicit_tag or explicit_tag or self.ASN1_tag)
        self.owners = []  # type: List[Type[ASN1_Packet]]

    def register_owner(self, cls):
        # type: (Type[ASN1_Packet]) -> None
        """Append ``cls`` to the list of classes owning this field."""
        self.owners.append(cls)

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, _I) -> str
        """Return the repr() of the internal value."""
        return repr(x)

    def i2h(self, pkt, x):
        # type: (ASN1_Packet, _I) -> Any
        """Return the internal value unchanged."""
        return x

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[_A, bytes]
        """
        Decode the field from ``s``; return (ASN.1 object, remaining bytes).

        With safedec, decoding may still succeed when the actual tag does
        not match the expected one (self.ASN1_tag): the decoded object is
        simply put into an ASN1_BADTAG object. However, safedec prevents the
        raising of the exceptions that ASN1F_optional processing relies on.
        Hence 'flexible_tag', which should be False with ASN1F_optional.

        Other callers may need to know whether decoding went as expected;
        noticeably, input methods from cert.py expect certain exceptions to
        be raised. This is why flexible_tag defaults to False.
        """
        diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag,
                                      _fname=self.name)
        if diff_tag is not None:
            # this implies that flexible_tag was True:
            # adopt the tag that was actually seen
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        decode = codec.safedec if self.flexible_tag else codec.dec
        return decode(s, context=self.context)  # type: ignore

    def i2m(self, pkt, x):
        # type: (ASN1_Packet, Union[bytes, _I, _A]) -> bytes
        """
        Encode ``x`` and wrap the result with the implicit/explicit tag.

        None encodes to an empty byte string (without any tagging).

        :raises ASN1_Error: if ``x`` is an ASN1_Object whose tag is not
            compatible with this field
        """
        if x is None:
            return b""
        if not isinstance(x, ASN1_Object):
            # raw value: let the codec of this field encode it
            codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
            s = codec.enc(x, size_len=self.size_len)
        elif (self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY or
                x.tag == ASN1_Class_UNIVERSAL.RAW or
                x.tag == ASN1_Class_UNIVERSAL.ERROR or
                self.ASN1_tag == x.tag):
            s = x.enc(pkt.ASN1_codec)
        else:
            raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name))  # noqa: E501
        return BER_tagging_enc(s,
                               implicit_tag=self.implicit_tag,
                               explicit_tag=self.explicit_tag)

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> _I
        """Return ``x`` unchanged (only cast for the type checker)."""
        return cast(_I, x)

    def extract_packet(self,
                       cls,  # type: Type[ASN1_Packet]
                       s,  # type: bytes
                       _underlayer=None  # type: Optional[ASN1_Packet]
                       ):
        # type: (...) -> Tuple[ASN1_Packet, bytes]
        """
        Instantiate ``cls`` from ``s``; return (packet, remaining bytes).

        A packet.Raw is built instead when ``cls`` raises ASN1F_badsequence.
        The load of the Raw layer found in the result (if any) is returned
        as the remainder, and that layer is detached from its underlayer.
        """
        try:
            c = cls(s, _underlayer=_underlayer)
        except ASN1F_badsequence:
            c = packet.Raw(s, _underlayer=_underlayer)  # type: ignore
        cpad = c.getlayer(packet.Raw)
        if cpad is None:
            return c, b""
        remain = cpad.load
        if cpad.underlayer:
            del cpad.underlayer.payload
        return c, remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Encode the value that ``pkt`` currently holds for this field."""
        return self.i2m(pkt, getattr(pkt, self.name))

    def dissect(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> bytes
        """Decode from ``s``, store the value in ``pkt``, return the rest."""
        value, remain = self.m2i(pkt, s)
        self.set_val(pkt, value)
        return remain

    def do_copy(self, x):
        # type: (Any) -> Any
        """
        Copy a field value.

        Lists are shallow-copied, with the BasePacket items they contain
        copied individually; other values are copied through their own
        copy() method when they have one, and returned as is otherwise.
        """
        if isinstance(x, list):
            return [
                item.copy() if isinstance(item, BasePacket) else item
                for item in x
            ]
        if hasattr(x, "copy"):
            return x.copy()
        return x

    def set_val(self, pkt, val):
        # type: (ASN1_Packet, Any) -> None
        """Store ``val`` in ``pkt`` under the name of this field."""
        setattr(pkt, self.name, val)

    def is_empty(self, pkt):
        # type: (ASN1_Packet) -> bool
        """Tell whether ``pkt`` holds None for this field."""
        return getattr(pkt, self.name) is None

    def get_fields_list(self):
        # type: () -> List[ASN1F_field[Any, Any]]
        """Return a list containing only this field."""
        return [self]

    def __str__(self):
        # type: () -> str
        return repr(self)

    def randval(self):
        # type: () -> RandField[_I]
        """Return a RandInt() volatile value."""
        return cast(RandField[_I], RandInt())

    def copy(self):
        # type: () -> ASN1F_field[_I, _A]
        """Return a shallow copy of this field."""
        return copy.copy(self)
247
248
249############################
250# Simple ASN1 Fields #
251############################
252
class ASN1F_BOOLEAN(ASN1F_field[bool, ASN1_BOOLEAN]):
    """Field whose ASN1_tag is ASN1_Class_UNIVERSAL.BOOLEAN."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BOOLEAN

    def randval(self):
        # type: () -> RandChoice
        # Volatile value choosing between True and False.
        return RandChoice(True, False)
259
260
class ASN1F_INTEGER(ASN1F_field[int, ASN1_INTEGER]):
    """Field whose ASN1_tag is ASN1_Class_UNIVERSAL.INTEGER."""
    ASN1_tag = ASN1_Class_UNIVERSAL.INTEGER

    def randval(self):
        # type: () -> RandNum
        """Return a RandNum spanning -2**64 .. 2**64 - 1."""
        bound = 2 ** 64
        return RandNum(-bound, bound - 1)
267
268
class ASN1F_enum_INTEGER(ASN1F_INTEGER):
    """
    Integer field with a mapping between values and names.

    The mapping is kept in both directions: ``i2s`` and ``s2i``.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: ASN1_INTEGER
                 enum,  # type: Dict[int, str]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[Any]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param enum: either a list (indexes are used as keys) or a mapping.
            If any key is a str, the direction of the mapping is inverted,
            i.e. keys fill ``s2i`` and values fill ``i2s``.
        """
        super(ASN1F_enum_INTEGER, self).__init__(
            name, default, context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )
        self.i2s = {}  # type: Dict[int, str]
        self.s2i = {}  # type: Dict[str, int]
        keys = range(len(enum)) if isinstance(enum, list) else list(enum)
        if any(isinstance(key, str) for key in keys):
            # string keys: the enum goes from names to values
            by_key, by_value = self.s2i, self.i2s  # type: ignore
        else:
            by_key, by_value = self.i2s, self.s2i  # type: ignore
        for key in keys:
            by_key[key] = enum[key]
            by_value[enum[key]] = key

    def i2m(self,
            pkt,  # type: ASN1_Packet
            s,  # type: Union[bytes, str, int, ASN1_INTEGER]
            ):
        # type: (...) -> bytes
        """Encode ``s``, after resolving it through ``s2i`` if it is a str."""
        vs = self.s2i[s] if isinstance(s, str) else s
        return super(ASN1F_enum_INTEGER, self).i2m(pkt, vs)

    def i2repr(self,
               pkt,  # type: ASN1_Packet
               x,  # type: Union[str, int]
               ):
        # type: (...) -> str
        """Prefix repr(x) with the quoted name of the value when known."""
        if isinstance(x, ASN1_INTEGER):
            label = self.i2s.get(x.val)
            if label:
                return "'%s' %s" % (label, repr(x))
        return repr(x)
317
318
class ASN1F_BIT_STRING(ASN1F_field[str, ASN1_BIT_STRING]):
    """Field whose ASN1_tag is ASN1_Class_UNIVERSAL.BIT_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[Union[ASN1_BIT_STRING, AnyStr]]
                 default_readable=True,  # type: bool
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param default: bytes and str defaults are wrapped into an
            ASN1_BIT_STRING; any other value is stored as is
        :param default_readable: passed as ``readable`` to ASN1_BIT_STRING
            when the default gets wrapped
        """
        # The parent is given no default: it is set below instead.
        super(ASN1F_BIT_STRING, self).__init__(
            name, None, context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )
        if isinstance(default, (bytes, str)):
            default = ASN1_BIT_STRING(default,  # type: ignore
                                      readable=default_readable)
        self.default = default  # type: ignore

    def randval(self):
        # type: () -> RandString
        """Return a RandString whose size is RandNum(0, 1000)."""
        size = RandNum(0, 1000)
        return RandString(size)
345
346
class ASN1F_STRING(ASN1F_field[str, ASN1_STRING]):
    """Field whose ASN1_tag is ASN1_Class_UNIVERSAL.STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.STRING

    def randval(self):
        # type: () -> RandString
        # Volatile string whose size is itself a RandNum(0, 1000).
        return RandString(RandNum(0, 1000))
353
354
class ASN1F_NULL(ASN1F_INTEGER):
    """ASN1F_INTEGER subclass whose ASN1_tag is ASN1_Class_UNIVERSAL.NULL."""
    ASN1_tag = ASN1_Class_UNIVERSAL.NULL
357
358
class ASN1F_OID(ASN1F_field[str, ASN1_OID]):
    """Field whose ASN1_tag is ASN1_Class_UNIVERSAL.OID."""
    ASN1_tag = ASN1_Class_UNIVERSAL.OID

    def randval(self):
        # type: () -> RandOID
        # Volatile value built from RandOID with its default arguments.
        return RandOID()
365
366
class ASN1F_ENUMERATED(ASN1F_enum_INTEGER):
    """ASN1F_enum_INTEGER whose ASN1_tag is UNIVERSAL ENUMERATED."""
    ASN1_tag = ASN1_Class_UNIVERSAL.ENUMERATED
369
370
class ASN1F_UTF8_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL UTF8_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UTF8_STRING
373
374
class ASN1F_NUMERIC_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL NUMERIC_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING
377
378
class ASN1F_PRINTABLE_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL PRINTABLE_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
381
382
class ASN1F_T61_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL T61_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.T61_STRING
385
386
class ASN1F_VIDEOTEX_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL VIDEOTEX_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING
389
390
class ASN1F_IA5_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL IA5_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.IA5_STRING
393
394
class ASN1F_GENERAL_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL GENERAL_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.GENERAL_STRING
397
398
class ASN1F_UTC_TIME(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL UTC_TIME."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME

    def randval(self):  # type: ignore
        # type: () -> GeneralizedTime
        # Uses the GeneralizedTime volatile value with default arguments.
        return GeneralizedTime()
405
406
class ASN1F_GENERALIZED_TIME(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL GENERALIZED_TIME."""
    ASN1_tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME

    def randval(self):  # type: ignore
        # type: () -> GeneralizedTime
        # Uses the GeneralizedTime volatile value with default arguments.
        return GeneralizedTime()
413
414
class ASN1F_ISO646_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL ISO646_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.ISO646_STRING
417
418
class ASN1F_UNIVERSAL_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL UNIVERSAL_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.UNIVERSAL_STRING
421
422
class ASN1F_BMP_STRING(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is UNIVERSAL BMP_STRING."""
    ASN1_tag = ASN1_Class_UNIVERSAL.BMP_STRING
425
426
class ASN1F_SEQUENCE(ASN1F_field[List[Any], List[Any]]):
    # Here is how you could decode a SEQUENCE
    # with an unknown, private high-tag prefix :
    # class PrivSeq(ASN1_Packet):
    #     ASN1_codec = ASN1_Codecs.BER
    #     ASN1_root = ASN1F_SEQUENCE(
    #                       <asn1 field #0>,
    #                       ...
    #                       <asn1 field #N>,
    #                       explicit_tag=0,
    #                       flexible_tag=True)
    # Because we use flexible_tag, the value of the explicit_tag does not matter.  # noqa: E501
    ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
    holds_packets = 1

    def __init__(self, *seq, **kwargs):
        # type: (*Any, **Any) -> None
        """
        :param seq: the nested fields, in order
        :param kwargs: forwarded to ASN1F_field.__init__
        """
        nested_defaults = [field.default for field in seq]
        super(ASN1F_SEQUENCE, self).__init__(
            "dummy_seq_name", nested_defaults, **kwargs
        )
        self.seq = seq
        self.islist = len(seq) > 1

    def __repr__(self):
        # type: () -> str
        return "<%s%r>" % (self.__class__.__name__, self.seq)

    def is_empty(self, pkt):
        # type: (ASN1_Packet) -> bool
        """Tell whether every nested field is empty in ``pkt``."""
        for field in self.seq:
            if not field.is_empty(pkt):
                return False
        return True

    def get_fields_list(self):
        # type: () -> List[ASN1F_field[Any, Any]]
        """Return the concatenated field lists of the nested fields."""
        fields = []  # type: List[ASN1F_field[Any, Any]]
        for field in self.seq:
            fields = fields + field.get_fields_list()
        return fields

    def m2i(self, pkt, s):
        # type: (Any, bytes) -> Tuple[Any, bytes]
        """
        Dissect the nested fields one by one, directly into ``pkt``.

        ASN1F_SEQUENCE behaves transparently: as obj.dissect is used rather
        than obj.m2i (dissect is trusted to do the appropriate set_vals),
        the list of nested objects is not retrieved here.
        Thus m2i returns an empty list (along with the proper remainder).
        It is discarded by dissect() and should not be missed elsewhere.
        """
        diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag,
                                      _fname=pkt.name)
        if diff_tag is not None:
            # adopt the tag that was actually seen
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        _, s, remain = codec.check_type_check_len(s)
        if not len(s):
            # empty content: every nested field is reset to None
            for obj in self.seq:
                obj.set_val(pkt, None)
            return [], remain
        for obj in self.seq:
            try:
                s = obj.dissect(pkt, s)
            except ASN1F_badsequence:
                break
        if len(s) > 0:
            raise BER_Decoding_Error("unexpected remainder", remaining=s)
        return [], remain

    def dissect(self, pkt, s):
        # type: (Any, bytes) -> bytes
        """Dissect through m2i and return the remaining bytes only."""
        return self.m2i(pkt, s)[1]

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Build the nested fields in order and encode their concatenation."""
        content = b""
        for field in self.seq:
            content = content + field.build(pkt)
        return super(ASN1F_SEQUENCE, self).i2m(pkt, content)
510
511
class ASN1F_SET(ASN1F_SEQUENCE):
    """ASN1F_SEQUENCE whose ASN1_tag is ASN1_Class_UNIVERSAL.SET."""
    ASN1_tag = ASN1_Class_UNIVERSAL.SET
514
515
# Type of the `cls` argument of ASN1F_SEQUENCE_OF.
_SEQ_T = Union[
    'ASN1_Packet',
    Type[ASN1F_field[Any, Any]],
    'ASN1F_PACKET',
    ASN1F_field[Any, Any],
]
522
523
class ASN1F_SEQUENCE_OF(ASN1F_field[List[_SEQ_T],
                                    List[ASN1_Object[Any]]]):
    """
    Field holding a list of elements that are all decoded the same way.

    Two types are allowed as cls: ASN1_Packet, ASN1F_field
    """
    ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
    islist = 1

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 cls,  # type: _SEQ_T
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[Any]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param cls: how elements are decoded: an ASN1F_field class (an
            instance named ``name`` is created from it), an ASN1F_field
            instance, or a packet class / callable
        :raises ValueError: if ``cls`` is none of the above
        """
        if isinstance(cls, type) and issubclass(cls, ASN1F_field) or \
                isinstance(cls, ASN1F_field):
            if isinstance(cls, type):
                self.fld = cls(name, b"")
            else:
                self.fld = cls
            self._extract_packet = lambda s, pkt: self.fld.m2i(pkt, s)
            self.holds_packets = 0
        elif hasattr(cls, "ASN1_root") or callable(cls):
            self.cls = cast("Type[ASN1_Packet]", cls)
            self._extract_packet = lambda s, pkt: self.extract_packet(
                self.cls, s, _underlayer=pkt)
            self.holds_packets = 1
        else:
            raise ValueError("cls should be an ASN1_Packet or ASN1_field")
        # The parent is given no default: it is stored as is below.
        super(ASN1F_SEQUENCE_OF, self).__init__(
            name, None, context=context,
            implicit_tag=implicit_tag, explicit_tag=explicit_tag
        )
        self.default = default

    def is_empty(self,
                 pkt,  # type: ASN1_Packet
                 ):
        # type: (...) -> bool
        """Tell whether ``pkt`` holds None for this field."""
        return ASN1F_field.is_empty(self, pkt)

    def m2i(self,
            pkt,  # type: ASN1_Packet
            s,  # type: bytes
            ):
        # type: (...) -> Tuple[List[Any], bytes]
        """
        Decode the elements; return (list of elements, remaining bytes).

        Elements that evaluate to False are not added to the list.

        :raises BER_Decoding_Error: if decoding an element does not consume
            any data, as the content could then never be exhausted
        """
        diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag)
        if diff_tag is not None:
            # adopt the tag that was actually seen
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
        _, s, remain = codec.check_type_check_len(s)
        lst = []
        while s:
            c, rest = self._extract_packet(s, pkt)  # type: ignore
            if len(rest) >= len(s):
                # No progress was made (e.g. extract_packet handed the whole
                # input back as a Raw load): bail out rather than loop
                # forever on the same bytes.
                raise BER_Decoding_Error(
                    "ASN1F_SEQUENCE_OF: element decoding consumed no data",
                    remaining=s
                )
            s = rest
            if c:
                lst.append(c)
        # The loop only ends once the content is exhausted, hence there is
        # no remainder to check for here.
        return lst, remain

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """
        Encode the value held by ``pkt``.

        An ASN1_Object tagged RAW is handed to i2m as is, None is encoded
        as an empty content, and any other value is iterated over, with the
        raw() forms of its items being concatenated.
        """
        val = getattr(pkt, self.name)
        if isinstance(val, ASN1_Object) and \
                val.tag == ASN1_Class_UNIVERSAL.RAW:
            s = cast(Union[List[_SEQ_T], bytes], val)
        elif val is None:
            s = b""
        else:
            s = b"".join(raw(i) for i in val)
        return self.i2m(pkt, s)

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, _I) -> str
        """
        Represent the list: through ASN1F_field.i2repr when it holds
        packets, else through the i2repr of the element field.
        """
        if self.holds_packets:
            return super(ASN1F_SEQUENCE_OF, self).i2repr(pkt, x)  # type: ignore
        elif x is None:
            return "[]"
        else:
            return "[%s]" % ", ".join(
                self.fld.i2repr(pkt, elt) for elt in x  # type: ignore
            )

    def randval(self):
        # type: () -> Any
        """Return a fuzzed packet, or the randval of the element field."""
        if self.holds_packets:
            return packet.fuzz(self.cls())
        else:
            return self.fld.randval()

    def __repr__(self):
        # type: () -> str
        return "<%s %s>" % (self.__class__.__name__, self.name)
626
627
class ASN1F_SET_OF(ASN1F_SEQUENCE_OF):
    """ASN1F_SEQUENCE_OF whose ASN1_tag is ASN1_Class_UNIVERSAL.SET."""
    ASN1_tag = ASN1_Class_UNIVERSAL.SET
630
631
class ASN1F_IPADDRESS(ASN1F_STRING):
    """ASN1F_STRING whose ASN1_tag is ASN1_Class_UNIVERSAL.IPADDRESS."""
    ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS
634
635
class ASN1F_TIME_TICKS(ASN1F_INTEGER):
    """ASN1F_INTEGER whose ASN1_tag is ASN1_Class_UNIVERSAL.TIME_TICKS."""
    ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS
638
639
640#############################
641# Complex ASN1 Fields #
642#############################
643
class ASN1F_optional(ASN1F_element):
    """
    Wrapper marking a field as optional.

    Decoding errors of the wrapped field are turned into a None value
    (leaving the input untouched), and nothing is built when the wrapped
    field is empty. Attributes that are not defined here are looked up on
    the wrapped field.
    """

    def __init__(self, field):
        # type: (ASN1F_field[Any, Any]) -> None
        # Exceptions are needed to detect an absent field (see the
        # docstring of ASN1F_field.m2i), hence flexible_tag is forced off.
        field.flexible_tag = False
        self._field = field

    def __getattr__(self, attr):
        # type: (str) -> Optional[Any]
        """
        Delegate the lookup of unknown attributes to the wrapped field.

        :raises AttributeError: for ``_field`` itself when it is not set yet
        """
        if attr == "_field":
            # __getattr__ is only called when the regular lookup failed, so
            # getting here means that _field is not set (e.g. on an instance
            # created by copy or pickle without __init__ being run).
            # Reading self._field below would then recurse endlessly.
            raise AttributeError(attr)
        return getattr(self._field, attr)

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[Any, bytes]
        """Decode the wrapped field; return (None, s) if that fails."""
        try:
            return self._field.m2i(pkt, s)
        except (ASN1_Error, ASN1F_badsequence, BER_Decoding_Error):
            # ASN1_Error may be raised by ASN1F_CHOICE
            return None, s

    def dissect(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> bytes
        """
        Dissect the wrapped field. If that fails, the value is set to None
        in ``pkt`` and ``s`` is returned unconsumed.
        """
        try:
            return self._field.dissect(pkt, s)
        except (ASN1_Error, ASN1F_badsequence, BER_Decoding_Error):
            self._field.set_val(pkt, None)
            return s

    def build(self, pkt):
        # type: (ASN1_Packet) -> bytes
        """Build the wrapped field, or nothing at all when it is empty."""
        if self._field.is_empty(pkt):
            return b""
        return self._field.build(pkt)

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> Any
        """Forward to the any2i of the wrapped field."""
        return self._field.any2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """Forward to the i2repr of the wrapped field."""
        return self._field.i2repr(pkt, x)
683
684
# Type of the choices given as positional arguments to ASN1F_CHOICE.
_CHOICE_T = Union['ASN1_Packet', Type[ASN1F_field[Any, Any]], 'ASN1F_PACKET']
686
687
class ASN1F_CHOICE(ASN1F_field[_CHOICE_T, ASN1_Object[Any]]):
    """
    Multiple types are allowed: ASN1_Packet, ASN1F_field and ASN1F_PACKET(),
    See layers/x509.py for examples.
    Other ASN1F_field instances than ASN1F_PACKET instances must not be used.
    """
    holds_packets = 1
    ASN1_tag = ASN1_Class_UNIVERSAL.ANY

    def __init__(self, name, default, *args, **kwargs):
        # type: (str, Any, *_CHOICE_T, **Any) -> None
        """
        :param args: the possible choices, indexed internally by their tag
        :param kwargs: only ``context`` and ``explicit_tag`` are read
        :raises ASN1_Error: if ``implicit_tag`` is passed, or if no tag can
            be found for one of the choices
        """
        if "implicit_tag" in kwargs:
            raise ASN1_Error(
                "ASN1F_CHOICE has been called with an implicit_tag"
            )
        self.implicit_tag = None
        self.context = kwargs.get("context")
        self.explicit_tag = kwargs.get("explicit_tag")
        super(ASN1F_CHOICE, self).__init__(
            name, None, context=self.context,
            explicit_tag=self.explicit_tag
        )
        self.default = default
        self.current_choice = None
        self.choices = {}  # type: Dict[int, _CHOICE_T]
        self.pktchoices = {}
        for p in args:
            if hasattr(p, "ASN1_root"):
                # should be ASN1_Packet
                p = cast('ASN1_Packet', p)
                if not hasattr(p.ASN1_root, "choices"):
                    self.choices[p.ASN1_root.network_tag] = p
                    continue
                # ASN1F_CHOICE recursion: take over the nested choices
                root = cast(ASN1F_CHOICE, p.ASN1_root)
                for nested_tag, nested_choice in root.choices.items():
                    self.choices[nested_tag] = nested_choice
            elif hasattr(p, "ASN1_tag"):
                if isinstance(p, type):
                    # should be ASN1F_field class
                    self.choices[int(p.ASN1_tag)] = p
                else:
                    # should be ASN1F_field instance
                    self.choices[p.network_tag] = p
                    tags = (p.implicit_tag, p.explicit_tag)
                    self.pktchoices[hash(p.cls)] = tags
            else:
                raise ASN1_Error("ASN1F_CHOICE: no tag found for one field")

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[ASN1_Object[Any], bytes]
        """
        First we have to retrieve the appropriate choice.
        Then we extract the field/packet, according to this choice.

        :raises ASN1_Error: on empty input, or when the tag matches no
            choice while flexible_tag is not set
        """
        if len(s) == 0:
            raise ASN1_Error("ASN1F_CHOICE: got empty string")
        _, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag,
                               explicit_tag=self.explicit_tag)
        tag, _ = BER_id_dec(s)
        if tag in self.choices:
            choice = self.choices[tag]
        elif self.flexible_tag:
            choice = ASN1F_field
        else:
            raise ASN1_Error(
                "ASN1F_CHOICE: unexpected field in '%s' "
                "(tag %s not in possible tags %s)" % (
                    self.name, tag, list(self.choices.keys())
                )
            )
        if hasattr(choice, "ASN1_root"):
            # we don't want to import ASN1_Packet in this module...
            return self.extract_packet(choice, s, _underlayer=pkt)  # type: ignore
        if isinstance(choice, type):
            return choice(self.name, b"").m2i(pkt, s)
        # XXX check properly if this is an ASN1F_PACKET
        return choice.m2i(pkt, s)

    def i2m(self, pkt, x):
        # type: (ASN1_Packet, Any) -> bytes
        """
        Encode ``x`` through raw(), apply the tags registered for its type
        (if any), then the explicit tag of the CHOICE itself.
        """
        if x is None:
            s = b""
        else:
            s = raw(x)
            type_key = hash(type(x))
            if type_key in self.pktchoices:
                imp, exp = self.pktchoices[type_key]
                s = BER_tagging_enc(s,
                                    implicit_tag=imp,
                                    explicit_tag=exp)
        return BER_tagging_enc(s, explicit_tag=self.explicit_tag)

    def randval(self):
        # type: () -> RandChoice
        """Return a RandChoice over one random value per choice."""
        randchoices = []
        for p in self.choices.values():
            if hasattr(p, "ASN1_root"):
                # should be ASN1_Packet class
                candidate = packet.fuzz(p())  # type: ignore
            elif not hasattr(p, "ASN1_tag"):
                continue
            elif isinstance(p, type):
                # should be (basic) ASN1F_field class
                candidate = p("dummy", None).randval()
            else:
                # should be ASN1F_PACKET instance
                candidate = p.randval()
            randchoices.append(candidate)
        return RandChoice(*randchoices)
795
796
class ASN1F_PACKET(ASN1F_field['ASN1_Packet', Optional['ASN1_Packet']]):
    """
    Field holding a packet of class ``cls``.

    ``cls`` is usually an ASN1_Packet class; classes without an
    ``ASN1_root`` attribute ("normal" packets) are dissected and built
    without any ASN.1 tagging.
    """
    holds_packets = 1

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[ASN1_Packet]
                 cls,  # type: Type[ASN1_Packet]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 next_cls_cb=None,  # type: Optional[Callable[[ASN1_Packet], Type[ASN1_Packet]]]  # noqa: E501
                 ):
        # type: (...) -> None
        """
        :param cls: class of the packet held by the field
        :param next_cls_cb: optional callback, called with the parent packet
            when dissecting; the class it returns is used instead of ``cls``
            (unless it evaluates to False)
        """
        self.cls = cls
        self.next_cls_cb = next_cls_cb
        # The parent is given no default: it is stored as is below.
        super(ASN1F_PACKET, self).__init__(
            name, None, context=context,
            implicit_tag=implicit_tag, explicit_tag=explicit_tag
        )
        # m2i and i2m accept classes without ASN1_root (normal packets), so
        # the constructor must not assume that the attribute exists either.
        root = getattr(cls, "ASN1_root", None)
        if implicit_tag is None and explicit_tag is None and root is not None:
            if root.ASN1_tag == ASN1_Class_UNIVERSAL.SEQUENCE:
                self.network_tag = 16 | 0x20  # 16 + CONSTRUCTED
        self.default = default

    def m2i(self, pkt, s):
        # type: (ASN1_Packet, bytes) -> Tuple[Any, bytes]
        """
        Dissect a packet from ``s``; return (packet, remaining bytes).

        (None, s) is returned when nothing is left once the tag has been
        processed.
        """
        if self.next_cls_cb:
            cls = self.next_cls_cb(pkt) or self.cls
        else:
            cls = self.cls
        if not hasattr(cls, "ASN1_root"):
            # A normal Packet (!= ASN1)
            return self.extract_packet(cls, s, _underlayer=pkt)
        diff_tag, s = BER_tagging_dec(s, hidden_tag=cls.ASN1_root.ASN1_tag,  # noqa: E501
                                      implicit_tag=self.implicit_tag,
                                      explicit_tag=self.explicit_tag,
                                      safe=self.flexible_tag,
                                      _fname=self.name)
        if diff_tag is not None:
            # adopt the tag that was actually seen
            if self.implicit_tag is not None:
                self.implicit_tag = diff_tag
            elif self.explicit_tag is not None:
                self.explicit_tag = diff_tag
        if not s:
            return None, s
        return self.extract_packet(cls, s, _underlayer=pkt)

    def i2m(self,
            pkt,  # type: ASN1_Packet
            x  # type: Union[bytes, ASN1_Packet, None, ASN1_Object[Optional[ASN1_Packet]]]  # noqa: E501
            ):
        # type: (...) -> bytes
        """
        Encode ``x`` and apply the implicit/explicit tag.

        None, bytes, ASN1_Object and packet values are accepted. Packets
        that have no ``ASN1_root`` attribute are returned in their raw()
        form, without tagging.
        """
        if x is None:
            s = b""
        elif isinstance(x, bytes):
            s = x
        elif isinstance(x, ASN1_Object):
            if x.val:
                s = raw(x.val)
            else:
                s = b""
        else:
            s = raw(x)
            if not hasattr(x, "ASN1_root"):
                # A normal Packet (!= ASN1)
                return s
        return BER_tagging_enc(s,
                               implicit_tag=self.implicit_tag,
                               explicit_tag=self.explicit_tag)

    def any2i(self,
              pkt,  # type: ASN1_Packet
              x  # type: Union[bytes, ASN1_Packet, None, ASN1_Object[Optional[ASN1_Packet]]]  # noqa: E501
              ):
        # type: (...) -> 'ASN1_Packet'
        """Register ``pkt`` as underlayer of ``x`` when ``x`` supports it."""
        if hasattr(x, "add_underlayer"):
            x.add_underlayer(pkt)  # type: ignore
        return super(ASN1F_PACKET, self).any2i(pkt, x)

    def randval(self):  # type: ignore
        # type: () -> ASN1_Packet
        """Return a fuzzed instance of ``cls``."""
        return packet.fuzz(self.cls())
879
880
class ASN1F_BIT_STRING_ENCAPS(ASN1F_BIT_STRING):
    """
    BIT STRING field that encapsulates a packet of class ``cls``.

    We may emulate simple string encapsulation with explicit_tag=0x04,
    but we need a specific class for bit strings because of unused bits, etc.
    """
    ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[ASN1_Packet]
                 cls,  # type: Type[ASN1_Packet]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param default: default packet; stored in its raw() form unless it
            evaluates to False
        :param cls: class of the encapsulated packet
        """
        self.cls = cls
        super(ASN1F_BIT_STRING_ENCAPS, self).__init__(  # type: ignore
            name,
            default and raw(default),
            context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )

    def m2i(self, pkt, s):  # type: ignore
        # type: (ASN1_Packet, bytes) -> Tuple[Optional[ASN1_Packet], bytes]
        """
        Decode the bit string, then the packet it encapsulates.

        :returns: (packet, remaining bytes); the packet is None when the
            bit string has no readable content
        :raises BER_Decoding_Error: if the number of bits is not a multiple
            of 8, or if the packet does not consume the whole bit string
        """
        bit_string, remain = super(ASN1F_BIT_STRING_ENCAPS, self).m2i(pkt, s)
        if len(bit_string.val) % 8 != 0:
            raise BER_Decoding_Error("wrong bit string", remaining=s)
        if not bit_string.val_readable:
            # Nothing is encapsulated. The bytes that follow the bit string
            # still belong to the caller, so they are handed back (rather
            # than the empty content of the bit string).
            return None, remain
        p, s = self.extract_packet(self.cls, bit_string.val_readable,
                                   _underlayer=pkt)
        if len(s) > 0:
            raise BER_Decoding_Error("unexpected remainder", remaining=s)
        return p, remain

    def i2m(self, pkt, x):  # type: ignore
        # type: (ASN1_Packet, Optional[ASN1_BIT_STRING]) -> bytes
        """
        Encode ``x``; values that are not an ASN1_BIT_STRING are first
        converted to bytes (None giving an empty string) and wrapped.
        """
        if not isinstance(x, ASN1_BIT_STRING):
            x = ASN1_BIT_STRING(
                b"" if x is None else bytes(x),  # type: ignore
                readable=True,
            )
        return super(ASN1F_BIT_STRING_ENCAPS, self).i2m(pkt, x)
928
929
class ASN1F_FLAGS(ASN1F_BIT_STRING):
    """
    Bit string field whose bits are named through ``mapping``: the name of
    the bit at position i is mapping[i].
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[str]
                 mapping,  # type: List[str]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[Any]
                 ):
        # type: (...) -> None
        """
        :param mapping: names of the flags, ordered by bit position
        """
        self.mapping = mapping
        super(ASN1F_FLAGS, self).__init__(
            name, default,
            default_readable=False,
            context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """
        Convert a str into an ASN1_BIT_STRING.

        A str made only of '0' and '1' is used as is; any other str is
        taken as '+'-separated flag names, which are resolved into a string
        of bits as long as the mapping.
        """
        if isinstance(x, str):
            if set(x) - {"0", "1"}:
                # resolve the flags
                bits = ["0"] * len(self.mapping)
                for flag in x.split("+"):
                    bits[self.mapping.index(flag)] = "1"
                x = "".join(bits)
            x = ASN1_BIT_STRING(x)
        return super(ASN1F_FLAGS, self).any2i(pkt, x)

    def get_flags(self, pkt):
        # type: (ASN1_Packet) -> List[str]
        """Return the names of the flags that are set in ``pkt``."""
        fbytes = getattr(pkt, self.name).val
        # zip() stops at the shortest input, so the bits located beyond the
        # end of the mapping are ignored
        return [flag for flag, positional in zip(self.mapping, fbytes)
                if positional == '1']

    def i2repr(self, pkt, x):
        # type: (ASN1_Packet, Any) -> str
        """Prefix repr(x) with the names of the flags set in ``pkt``."""
        if x is None:
            return repr(x)
        pretty_s = ", ".join(self.get_flags(pkt))
        return pretty_s + " " + repr(x)
973
974
class ASN1F_STRING_PacketField(ASN1F_STRING):
    """
    ASN1F_STRING that holds packets.
    """
    holds_packets = 1

    def i2m(self, pkt, val):
        # type: (ASN1_Packet, Any) -> bytes
        """
        Encode ``val``; values having an ``ASN1_root`` attribute are first
        turned into an ASN1_STRING made of their bytes.
        """
        is_asn1_packet = hasattr(val, "ASN1_root")
        encodable = ASN1_STRING(bytes(val)) if is_asn1_packet else val
        return super(ASN1F_STRING_PacketField, self).i2m(pkt, encodable)

    def any2i(self, pkt, x):
        # type: (ASN1_Packet, Any) -> Any
        """Register ``pkt`` as underlayer of ``x`` when ``x`` supports it."""
        if hasattr(x, "add_underlayer"):
            x.add_underlayer(pkt)
        return super(ASN1F_STRING_PacketField, self).any2i(pkt, x)
992
993
class ASN1F_STRING_ENCAPS(ASN1F_STRING_PacketField):
    """
    ASN1F_STRING that encapsulates a single ASN1 packet.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[ASN1_Packet]
                 cls,  # type: Type[ASN1_Packet]
                 context=None,  # type: Optional[Any]
                 implicit_tag=None,  # type: Optional[int]
                 explicit_tag=None,  # type: Optional[int]
                 ):
        # type: (...) -> None
        """
        :param default: default packet; stored as bytes unless it evaluates
            to False
        :param cls: class of the encapsulated packet
        """
        self.cls = cls
        raw_default = default and bytes(default)  # type: ignore
        super(ASN1F_STRING_ENCAPS, self).__init__(
            name,
            raw_default,
            context=context,
            implicit_tag=implicit_tag,
            explicit_tag=explicit_tag
        )

    def m2i(self, pkt, s):  # type: ignore
        # type: (ASN1_Packet, bytes) -> Tuple[ASN1_Packet, bytes]
        """
        Decode the string, then build a ``cls`` packet from its value.

        :returns: (packet, remaining bytes)
        """
        decoded, remain = super(ASN1F_STRING_ENCAPS, self).m2i(pkt, s)
        return self.cls(decoded.val, _underlayer=pkt), remain