Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/asn1fields.py: 72%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

490 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5# Acknowledgment: Maxence Tury <maxence.tury@ssi.gouv.fr> 

6 

7""" 

8Classes that implement ASN.1 data structures. 

9""" 

10 

11import copy 

12 

13from functools import reduce 

14 

15from scapy.asn1.asn1 import ( 

16 ASN1_BIT_STRING, 

17 ASN1_BOOLEAN, 

18 ASN1_Class, 

19 ASN1_Class_UNIVERSAL, 

20 ASN1_Error, 

21 ASN1_INTEGER, 

22 ASN1_NULL, 

23 ASN1_OID, 

24 ASN1_Object, 

25 ASN1_STRING, 

26) 

27from scapy.asn1.ber import ( 

28 BER_Decoding_Error, 

29 BER_id_dec, 

30 BER_tagging_dec, 

31 BER_tagging_enc, 

32) 

33from scapy.base_classes import BasePacket 

34from scapy.volatile import ( 

35 GeneralizedTime, 

36 RandChoice, 

37 RandInt, 

38 RandNum, 

39 RandOID, 

40 RandString, 

41 RandField, 

42) 

43 

44from scapy import packet 

45 

46from typing import ( 

47 Any, 

48 AnyStr, 

49 Callable, 

50 Dict, 

51 Generic, 

52 List, 

53 Optional, 

54 Tuple, 

55 Type, 

56 TypeVar, 

57 Union, 

58 cast, 

59 TYPE_CHECKING, 

60) 

61 

62if TYPE_CHECKING: 

63 from scapy.asn1packet import ASN1_Packet 

64 

65 

66class ASN1F_badsequence(Exception): 

67 pass 

68 

69 

70class ASN1F_element(object): 

71 pass 

72 

73 

74########################## 

75# Basic ASN1 Field # 

76########################## 

77 

78_I = TypeVar('_I') # Internal storage 

79_A = TypeVar('_A') # ASN.1 object 

80 

81 

82class ASN1F_field(ASN1F_element, Generic[_I, _A]): 

83 holds_packets = 0 

84 islist = 0 

85 ASN1_tag = ASN1_Class_UNIVERSAL.ANY 

86 context = ASN1_Class_UNIVERSAL # type: Type[ASN1_Class] 

87 

88 def __init__(self, 

89 name, # type: str 

90 default, # type: Optional[_A] 

91 context=None, # type: Optional[Type[ASN1_Class]] 

92 implicit_tag=None, # type: Optional[int] 

93 explicit_tag=None, # type: Optional[int] 

94 flexible_tag=False, # type: Optional[bool] 

95 size_len=None, # type: Optional[int] 

96 ): 

97 # type: (...) -> None 

98 if context is not None: 

99 self.context = context 

100 self.name = name 

101 if default is None: 

102 self.default = default # type: Optional[_A] 

103 elif isinstance(default, ASN1_NULL): 

104 self.default = default # type: ignore 

105 else: 

106 self.default = self.ASN1_tag.asn1_object(default) # type: ignore 

107 self.size_len = size_len 

108 self.flexible_tag = flexible_tag 

109 if (implicit_tag is not None) and (explicit_tag is not None): 

110 err_msg = "field cannot be both implicitly and explicitly tagged" 

111 raise ASN1_Error(err_msg) 

112 self.implicit_tag = implicit_tag and int(implicit_tag) 

113 self.explicit_tag = explicit_tag and int(explicit_tag) 

114 # network_tag gets useful for ASN1F_CHOICE 

115 self.network_tag = int(implicit_tag or explicit_tag or self.ASN1_tag) 

116 self.owners = [] # type: List[Type[ASN1_Packet]] 

117 

118 def register_owner(self, cls): 

119 # type: (Type[ASN1_Packet]) -> None 

120 self.owners.append(cls) 

121 

122 def i2repr(self, pkt, x): 

123 # type: (ASN1_Packet, _I) -> str 

124 return repr(x) 

125 

126 def i2h(self, pkt, x): 

127 # type: (ASN1_Packet, _I) -> Any 

128 return x 

129 

130 def m2i(self, pkt, s): 

131 # type: (ASN1_Packet, bytes) -> Tuple[_A, bytes] 

132 """ 

133 The good thing about safedec is that it may still decode ASN1 

134 even if there is a mismatch between the expected tag (self.ASN1_tag) 

135 and the actual tag; the decoded ASN1 object will simply be put 

136 into an ASN1_BADTAG object. However, safedec prevents the raising of 

137 exceptions needed for ASN1F_optional processing. 

138 Thus we use 'flexible_tag', which should be False with ASN1F_optional. 

139 

140 Regarding other fields, we might need to know whether encoding went 

141 as expected or not. Noticeably, input methods from cert.py expect 

142 certain exceptions to be raised. Hence default flexible_tag is False. 

143 """ 

144 diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag, 

145 implicit_tag=self.implicit_tag, 

146 explicit_tag=self.explicit_tag, 

147 safe=self.flexible_tag, 

148 _fname=self.name) 

149 if diff_tag is not None: 

150 # this implies that flexible_tag was True 

151 if self.implicit_tag is not None: 

152 self.implicit_tag = diff_tag 

153 elif self.explicit_tag is not None: 

154 self.explicit_tag = diff_tag 

155 codec = self.ASN1_tag.get_codec(pkt.ASN1_codec) 

156 if self.flexible_tag: 

157 return codec.safedec(s, context=self.context) # type: ignore 

158 else: 

159 return codec.dec(s, context=self.context) # type: ignore 

160 

161 def i2m(self, pkt, x): 

162 # type: (ASN1_Packet, Union[bytes, _I, _A]) -> bytes 

163 if x is None: 

164 return b"" 

165 if isinstance(x, ASN1_Object): 

166 if (self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY or 

167 x.tag == ASN1_Class_UNIVERSAL.RAW or 

168 x.tag == ASN1_Class_UNIVERSAL.ERROR or 

169 self.ASN1_tag == x.tag): 

170 s = x.enc(pkt.ASN1_codec) 

171 else: 

172 raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name)) # noqa: E501 

173 else: 

174 s = self.ASN1_tag.get_codec(pkt.ASN1_codec).enc(x, size_len=self.size_len) 

175 return BER_tagging_enc(s, 

176 implicit_tag=self.implicit_tag, 

177 explicit_tag=self.explicit_tag) 

178 

179 def any2i(self, pkt, x): 

180 # type: (ASN1_Packet, Any) -> _I 

181 return cast(_I, x) 

182 

183 def extract_packet(self, 

184 cls, # type: Type[ASN1_Packet] 

185 s, # type: bytes 

186 _underlayer=None # type: Optional[ASN1_Packet] 

187 ): 

188 # type: (...) -> Tuple[ASN1_Packet, bytes] 

189 try: 

190 c = cls(s, _underlayer=_underlayer) 

191 except ASN1F_badsequence: 

192 c = packet.Raw(s, _underlayer=_underlayer) # type: ignore 

193 cpad = c.getlayer(packet.Raw) 

194 s = b"" 

195 if cpad is not None: 

196 s = cpad.load 

197 if cpad.underlayer: 

198 del cpad.underlayer.payload 

199 return c, s 

200 

201 def build(self, pkt): 

202 # type: (ASN1_Packet) -> bytes 

203 return self.i2m(pkt, getattr(pkt, self.name)) 

204 

205 def dissect(self, pkt, s): 

206 # type: (ASN1_Packet, bytes) -> bytes 

207 v, s = self.m2i(pkt, s) 

208 self.set_val(pkt, v) 

209 return s 

210 

211 def do_copy(self, x): 

212 # type: (Any) -> Any 

213 if isinstance(x, list): 

214 x = x[:] 

215 for i in range(len(x)): 

216 if isinstance(x[i], BasePacket): 

217 x[i] = x[i].copy() 

218 return x 

219 if hasattr(x, "copy"): 

220 return x.copy() 

221 return x 

222 

223 def set_val(self, pkt, val): 

224 # type: (ASN1_Packet, Any) -> None 

225 setattr(pkt, self.name, val) 

226 

227 def is_empty(self, pkt): 

228 # type: (ASN1_Packet) -> bool 

229 return getattr(pkt, self.name) is None 

230 

231 def get_fields_list(self): 

232 # type: () -> List[ASN1F_field[Any, Any]] 

233 return [self] 

234 

235 def __str__(self): 

236 # type: () -> str 

237 return repr(self) 

238 

239 def randval(self): 

240 # type: () -> RandField[_I] 

241 return cast(RandField[_I], RandInt()) 

242 

243 def copy(self): 

244 # type: () -> ASN1F_field[_I, _A] 

245 return copy.copy(self) 

246 

247 

248############################ 

249# Simple ASN1 Fields # 

250############################ 

251 

252class ASN1F_BOOLEAN(ASN1F_field[bool, ASN1_BOOLEAN]): 

253 ASN1_tag = ASN1_Class_UNIVERSAL.BOOLEAN 

254 

255 def randval(self): 

256 # type: () -> RandChoice 

257 return RandChoice(True, False) 

258 

259 

260class ASN1F_INTEGER(ASN1F_field[int, ASN1_INTEGER]): 

261 ASN1_tag = ASN1_Class_UNIVERSAL.INTEGER 

262 

263 def randval(self): 

264 # type: () -> RandNum 

265 return RandNum(-2**64, 2**64 - 1) 

266 

267 

268class ASN1F_enum_INTEGER(ASN1F_INTEGER): 

269 def __init__(self, 

270 name, # type: str 

271 default, # type: ASN1_INTEGER 

272 enum, # type: Dict[int, str] 

273 context=None, # type: Optional[Any] 

274 implicit_tag=None, # type: Optional[Any] 

275 explicit_tag=None, # type: Optional[Any] 

276 ): 

277 # type: (...) -> None 

278 super(ASN1F_enum_INTEGER, self).__init__( 

279 name, default, context=context, 

280 implicit_tag=implicit_tag, 

281 explicit_tag=explicit_tag 

282 ) 

283 i2s = self.i2s = {} # type: Dict[int, str] 

284 s2i = self.s2i = {} # type: Dict[str, int] 

285 if isinstance(enum, list): 

286 keys = range(len(enum)) 

287 else: 

288 keys = list(enum) 

289 if any(isinstance(x, str) for x in keys): 

290 i2s, s2i = s2i, i2s # type: ignore 

291 for k in keys: 

292 i2s[k] = enum[k] 

293 s2i[enum[k]] = k 

294 

295 def i2m(self, 

296 pkt, # type: ASN1_Packet 

297 s, # type: Union[bytes, str, int, ASN1_INTEGER] 

298 ): 

299 # type: (...) -> bytes 

300 if not isinstance(s, str): 

301 vs = s 

302 else: 

303 vs = self.s2i[s] 

304 return super(ASN1F_enum_INTEGER, self).i2m(pkt, vs) 

305 

306 def i2repr(self, 

307 pkt, # type: ASN1_Packet 

308 x, # type: Union[str, int] 

309 ): 

310 # type: (...) -> str 

311 if x is not None and isinstance(x, ASN1_INTEGER): 

312 r = self.i2s.get(x.val) 

313 if r: 

314 return "'%s' %s" % (r, repr(x)) 

315 return repr(x) 

316 

317 

318class ASN1F_BIT_STRING(ASN1F_field[str, ASN1_BIT_STRING]): 

319 ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING 

320 

321 def __init__(self, 

322 name, # type: str 

323 default, # type: Optional[Union[ASN1_BIT_STRING, AnyStr]] 

324 default_readable=True, # type: bool 

325 context=None, # type: Optional[Any] 

326 implicit_tag=None, # type: Optional[int] 

327 explicit_tag=None, # type: Optional[int] 

328 ): 

329 # type: (...) -> None 

330 super(ASN1F_BIT_STRING, self).__init__( 

331 name, None, context=context, 

332 implicit_tag=implicit_tag, 

333 explicit_tag=explicit_tag 

334 ) 

335 if isinstance(default, (bytes, str)): 

336 self.default = ASN1_BIT_STRING(default, 

337 readable=default_readable) 

338 else: 

339 self.default = default 

340 

341 def randval(self): 

342 # type: () -> RandString 

343 return RandString(RandNum(0, 1000)) 

344 

345 

346class ASN1F_STRING(ASN1F_field[str, ASN1_STRING]): 

347 ASN1_tag = ASN1_Class_UNIVERSAL.STRING 

348 

349 def randval(self): 

350 # type: () -> RandString 

351 return RandString(RandNum(0, 1000)) 

352 

353 

354class ASN1F_NULL(ASN1F_INTEGER): 

355 ASN1_tag = ASN1_Class_UNIVERSAL.NULL 

356 

357 

358class ASN1F_OID(ASN1F_field[str, ASN1_OID]): 

359 ASN1_tag = ASN1_Class_UNIVERSAL.OID 

360 

361 def randval(self): 

362 # type: () -> RandOID 

363 return RandOID() 

364 

365 

366class ASN1F_ENUMERATED(ASN1F_enum_INTEGER): 

367 ASN1_tag = ASN1_Class_UNIVERSAL.ENUMERATED 

368 

369 

370class ASN1F_UTF8_STRING(ASN1F_STRING): 

371 ASN1_tag = ASN1_Class_UNIVERSAL.UTF8_STRING 

372 

373 

374class ASN1F_NUMERIC_STRING(ASN1F_STRING): 

375 ASN1_tag = ASN1_Class_UNIVERSAL.NUMERIC_STRING 

376 

377 

378class ASN1F_PRINTABLE_STRING(ASN1F_STRING): 

379 ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING 

380 

381 

382class ASN1F_T61_STRING(ASN1F_STRING): 

383 ASN1_tag = ASN1_Class_UNIVERSAL.T61_STRING 

384 

385 

386class ASN1F_VIDEOTEX_STRING(ASN1F_STRING): 

387 ASN1_tag = ASN1_Class_UNIVERSAL.VIDEOTEX_STRING 

388 

389 

390class ASN1F_IA5_STRING(ASN1F_STRING): 

391 ASN1_tag = ASN1_Class_UNIVERSAL.IA5_STRING 

392 

393 

394class ASN1F_GENERAL_STRING(ASN1F_STRING): 

395 ASN1_tag = ASN1_Class_UNIVERSAL.GENERAL_STRING 

396 

397 

398class ASN1F_UTC_TIME(ASN1F_STRING): 

399 ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME 

400 

401 def randval(self): # type: ignore 

402 # type: () -> GeneralizedTime 

403 return GeneralizedTime() 

404 

405 

406class ASN1F_GENERALIZED_TIME(ASN1F_STRING): 

407 ASN1_tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME 

408 

409 def randval(self): # type: ignore 

410 # type: () -> GeneralizedTime 

411 return GeneralizedTime() 

412 

413 

414class ASN1F_ISO646_STRING(ASN1F_STRING): 

415 ASN1_tag = ASN1_Class_UNIVERSAL.ISO646_STRING 

416 

417 

418class ASN1F_UNIVERSAL_STRING(ASN1F_STRING): 

419 ASN1_tag = ASN1_Class_UNIVERSAL.UNIVERSAL_STRING 

420 

421 

422class ASN1F_BMP_STRING(ASN1F_STRING): 

423 ASN1_tag = ASN1_Class_UNIVERSAL.BMP_STRING 

424 

425 

426class ASN1F_SEQUENCE(ASN1F_field[List[Any], List[Any]]): 

427 # Here is how you could decode a SEQUENCE 

428 # with an unknown, private high-tag prefix : 

429 # class PrivSeq(ASN1_Packet): 

430 # ASN1_codec = ASN1_Codecs.BER 

431 # ASN1_root = ASN1F_SEQUENCE( 

432 # <asn1 field #0>, 

433 # ... 

434 # <asn1 field #N>, 

435 # explicit_tag=0, 

436 # flexible_tag=True) 

437 # Because we use flexible_tag, the value of the explicit_tag does not matter. # noqa: E501 

438 ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE 

439 holds_packets = 1 

440 

441 def __init__(self, *seq, **kwargs): 

442 # type: (*Any, **Any) -> None 

443 name = "dummy_seq_name" 

444 default = [field.default for field in seq] 

445 super(ASN1F_SEQUENCE, self).__init__( 

446 name, default, **kwargs 

447 ) 

448 self.seq = seq 

449 self.islist = len(seq) > 1 

450 

451 def __repr__(self): 

452 # type: () -> str 

453 return "<%s%r>" % (self.__class__.__name__, self.seq) 

454 

455 def is_empty(self, pkt): 

456 # type: (ASN1_Packet) -> bool 

457 return all(f.is_empty(pkt) for f in self.seq) 

458 

459 def get_fields_list(self): 

460 # type: () -> List[ASN1F_field[Any, Any]] 

461 return reduce(lambda x, y: x + y.get_fields_list(), 

462 self.seq, []) 

463 

464 def m2i(self, pkt, s): 

465 # type: (Any, bytes) -> Tuple[Any, bytes] 

466 """ 

467 ASN1F_SEQUENCE behaves transparently, with nested ASN1_objects being 

468 dissected one by one. Because we use obj.dissect (see loop below) 

469 instead of obj.m2i (as we trust dissect to do the appropriate set_vals) 

470 we do not directly retrieve the list of nested objects. 

471 Thus m2i returns an empty list (along with the proper remainder). 

472 It is discarded by dissect() and should not be missed elsewhere. 

473 """ 

474 diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag, 

475 implicit_tag=self.implicit_tag, 

476 explicit_tag=self.explicit_tag, 

477 safe=self.flexible_tag, 

478 _fname=pkt.name) 

479 if diff_tag is not None: 

480 if self.implicit_tag is not None: 

481 self.implicit_tag = diff_tag 

482 elif self.explicit_tag is not None: 

483 self.explicit_tag = diff_tag 

484 codec = self.ASN1_tag.get_codec(pkt.ASN1_codec) 

485 i, s, remain = codec.check_type_check_len(s) 

486 if len(s) == 0: 

487 for obj in self.seq: 

488 obj.set_val(pkt, None) 

489 else: 

490 for obj in self.seq: 

491 try: 

492 s = obj.dissect(pkt, s) 

493 except ASN1F_badsequence: 

494 break 

495 if len(s) > 0: 

496 raise BER_Decoding_Error("unexpected remainder", remaining=s) 

497 return [], remain 

498 

499 def dissect(self, pkt, s): 

500 # type: (Any, bytes) -> bytes 

501 _, x = self.m2i(pkt, s) 

502 return x 

503 

504 def build(self, pkt): 

505 # type: (ASN1_Packet) -> bytes 

506 s = reduce(lambda x, y: x + y.build(pkt), 

507 self.seq, b"") 

508 return super(ASN1F_SEQUENCE, self).i2m(pkt, s) 

509 

510 

511class ASN1F_SET(ASN1F_SEQUENCE): 

512 ASN1_tag = ASN1_Class_UNIVERSAL.SET 

513 

514 

515_SEQ_T = Union[ 

516 'ASN1_Packet', 

517 Type[ASN1F_field[Any, Any]], 

518 'ASN1F_PACKET', 

519 ASN1F_field[Any, Any], 

520] 

521 

522 

523class ASN1F_SEQUENCE_OF(ASN1F_field[List[_SEQ_T], 

524 List[ASN1_Object[Any]]]): 

525 """ 

526 Two types are allowed as cls: ASN1_Packet, ASN1F_field 

527 """ 

528 ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE 

529 islist = 1 

530 

531 def __init__(self, 

532 name, # type: str 

533 default, # type: Any 

534 cls, # type: _SEQ_T 

535 context=None, # type: Optional[Any] 

536 implicit_tag=None, # type: Optional[Any] 

537 explicit_tag=None, # type: Optional[Any] 

538 ): 

539 # type: (...) -> None 

540 if isinstance(cls, type) and issubclass(cls, ASN1F_field) or \ 

541 isinstance(cls, ASN1F_field): 

542 if isinstance(cls, type): 

543 self.fld = cls(name, b"") 

544 else: 

545 self.fld = cls 

546 self._extract_packet = lambda s, pkt: self.fld.m2i(pkt, s) 

547 self.holds_packets = 0 

548 elif hasattr(cls, "ASN1_root") or callable(cls): 

549 self.cls = cast("Type[ASN1_Packet]", cls) 

550 self._extract_packet = lambda s, pkt: self.extract_packet( 

551 self.cls, s, _underlayer=pkt) 

552 self.holds_packets = 1 

553 else: 

554 raise ValueError("cls should be an ASN1_Packet or ASN1_field") 

555 super(ASN1F_SEQUENCE_OF, self).__init__( 

556 name, None, context=context, 

557 implicit_tag=implicit_tag, explicit_tag=explicit_tag 

558 ) 

559 self.default = default 

560 

561 def is_empty(self, 

562 pkt, # type: ASN1_Packet 

563 ): 

564 # type: (...) -> bool 

565 return ASN1F_field.is_empty(self, pkt) 

566 

567 def m2i(self, 

568 pkt, # type: ASN1_Packet 

569 s, # type: bytes 

570 ): 

571 # type: (...) -> Tuple[List[Any], bytes] 

572 diff_tag, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag, 

573 implicit_tag=self.implicit_tag, 

574 explicit_tag=self.explicit_tag, 

575 safe=self.flexible_tag) 

576 if diff_tag is not None: 

577 if self.implicit_tag is not None: 

578 self.implicit_tag = diff_tag 

579 elif self.explicit_tag is not None: 

580 self.explicit_tag = diff_tag 

581 codec = self.ASN1_tag.get_codec(pkt.ASN1_codec) 

582 i, s, remain = codec.check_type_check_len(s) 

583 lst = [] 

584 while s: 

585 c, s = self._extract_packet(s, pkt) # type: ignore 

586 if c: 

587 lst.append(c) 

588 if len(s) > 0: 

589 raise BER_Decoding_Error("unexpected remainder", remaining=s) 

590 return lst, remain 

591 

592 def build(self, pkt): 

593 # type: (ASN1_Packet) -> bytes 

594 val = getattr(pkt, self.name) 

595 if isinstance(val, ASN1_Object) and \ 

596 val.tag == ASN1_Class_UNIVERSAL.RAW: 

597 s = cast(Union[List[_SEQ_T], bytes], val) 

598 elif val is None: 

599 s = b"" 

600 else: 

601 s = b"".join(bytes(i) for i in val) 

602 return self.i2m(pkt, s) 

603 

604 def i2repr(self, pkt, x): 

605 # type: (ASN1_Packet, _I) -> str 

606 if self.holds_packets: 

607 return super(ASN1F_SEQUENCE_OF, self).i2repr(pkt, x) # type: ignore 

608 elif x is None: 

609 return "[]" 

610 else: 

611 return "[%s]" % ", ".join( 

612 self.fld.i2repr(pkt, x) for x in x # type: ignore 

613 ) 

614 

615 def randval(self): 

616 # type: () -> Any 

617 if self.holds_packets: 

618 return packet.fuzz(self.cls()) 

619 else: 

620 return self.fld.randval() 

621 

622 def __repr__(self): 

623 # type: () -> str 

624 return "<%s %s>" % (self.__class__.__name__, self.name) 

625 

626 

627class ASN1F_SET_OF(ASN1F_SEQUENCE_OF): 

628 ASN1_tag = ASN1_Class_UNIVERSAL.SET 

629 

630 

631class ASN1F_IPADDRESS(ASN1F_STRING): 

632 ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS 

633 

634 

635class ASN1F_TIME_TICKS(ASN1F_INTEGER): 

636 ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS 

637 

638 

639############################# 

640# Complex ASN1 Fields # 

641############################# 

642 

643class ASN1F_optional(ASN1F_element): 

644 """ 

645 ASN.1 field that is optional. 

646 """ 

647 def __init__(self, field): 

648 # type: (ASN1F_field[Any, Any]) -> None 

649 field.flexible_tag = False 

650 self._field = field 

651 

652 def __getattr__(self, attr): 

653 # type: (str) -> Optional[Any] 

654 return getattr(self._field, attr) 

655 

656 def m2i(self, pkt, s): 

657 # type: (ASN1_Packet, bytes) -> Tuple[Any, bytes] 

658 try: 

659 return self._field.m2i(pkt, s) 

660 except (ASN1_Error, ASN1F_badsequence, BER_Decoding_Error): 

661 # ASN1_Error may be raised by ASN1F_CHOICE 

662 return None, s 

663 

664 def dissect(self, pkt, s): 

665 # type: (ASN1_Packet, bytes) -> bytes 

666 try: 

667 return self._field.dissect(pkt, s) 

668 except (ASN1_Error, ASN1F_badsequence, BER_Decoding_Error): 

669 self._field.set_val(pkt, None) 

670 return s 

671 

672 def build(self, pkt): 

673 # type: (ASN1_Packet) -> bytes 

674 if self._field.is_empty(pkt): 

675 return b"" 

676 return self._field.build(pkt) 

677 

678 def any2i(self, pkt, x): 

679 # type: (ASN1_Packet, Any) -> Any 

680 return self._field.any2i(pkt, x) 

681 

682 def i2repr(self, pkt, x): 

683 # type: (ASN1_Packet, Any) -> str 

684 return self._field.i2repr(pkt, x) 

685 

686 

687class ASN1F_omit(ASN1F_field[None, None]): 

688 """ 

689 ASN.1 field that is not specified. This is simply omitted on the network. 

690 This is different from ASN1F_NULL which has a network representation. 

691 """ 

692 def m2i(self, pkt, s): 

693 # type: (ASN1_Packet, bytes) -> Tuple[None, bytes] 

694 return None, s 

695 

696 def i2m(self, pkt, x): 

697 # type: (ASN1_Packet, Optional[bytes]) -> bytes 

698 return b"" 

699 

700 

701_CHOICE_T = Union['ASN1_Packet', Type[ASN1F_field[Any, Any]], 'ASN1F_PACKET'] 

702 

703 

704class ASN1F_CHOICE(ASN1F_field[_CHOICE_T, ASN1_Object[Any]]): 

705 """ 

706 Multiple types are allowed: ASN1_Packet, ASN1F_field and ASN1F_PACKET(), 

707 See layers/x509.py for examples. 

708 Other ASN1F_field instances than ASN1F_PACKET instances must not be used. 

709 """ 

710 holds_packets = 1 

711 ASN1_tag = ASN1_Class_UNIVERSAL.ANY 

712 

713 def __init__(self, name, default, *args, **kwargs): 

714 # type: (str, Any, *_CHOICE_T, **Any) -> None 

715 if "implicit_tag" in kwargs: 

716 err_msg = "ASN1F_CHOICE has been called with an implicit_tag" 

717 raise ASN1_Error(err_msg) 

718 self.implicit_tag = None 

719 for kwarg in ["context", "explicit_tag"]: 

720 setattr(self, kwarg, kwargs.get(kwarg)) 

721 super(ASN1F_CHOICE, self).__init__( 

722 name, None, context=self.context, 

723 explicit_tag=self.explicit_tag 

724 ) 

725 self.default = default 

726 self.current_choice = None 

727 self.choices = {} # type: Dict[int, _CHOICE_T] 

728 self.pktchoices = {} 

729 for p in args: 

730 if hasattr(p, "ASN1_root"): 

731 p = cast('ASN1_Packet', p) 

732 # should be ASN1_Packet 

733 if hasattr(p.ASN1_root, "choices"): 

734 root = cast(ASN1F_CHOICE, p.ASN1_root) 

735 for k, v in root.choices.items(): 

736 # ASN1F_CHOICE recursion 

737 self.choices[k] = v 

738 else: 

739 self.choices[p.ASN1_root.network_tag] = p 

740 elif hasattr(p, "ASN1_tag"): 

741 if isinstance(p, type): 

742 # should be ASN1F_field class 

743 self.choices[int(p.ASN1_tag)] = p 

744 else: 

745 # should be ASN1F_field instance 

746 self.choices[p.network_tag] = p 

747 self.pktchoices[hash(p.cls)] = (p.implicit_tag, p.explicit_tag) # noqa: E501 

748 else: 

749 raise ASN1_Error("ASN1F_CHOICE: no tag found for one field") 

750 

751 def m2i(self, pkt, s): 

752 # type: (ASN1_Packet, bytes) -> Tuple[ASN1_Object[Any], bytes] 

753 """ 

754 First we have to retrieve the appropriate choice. 

755 Then we extract the field/packet, according to this choice. 

756 """ 

757 if len(s) == 0: 

758 raise ASN1_Error("ASN1F_CHOICE: got empty string") 

759 _, s = BER_tagging_dec(s, hidden_tag=self.ASN1_tag, 

760 explicit_tag=self.explicit_tag) 

761 tag, _ = BER_id_dec(s) 

762 if tag in self.choices: 

763 choice = self.choices[tag] 

764 else: 

765 if self.flexible_tag: 

766 choice = ASN1F_field 

767 else: 

768 raise ASN1_Error( 

769 "ASN1F_CHOICE: unexpected field in '%s' " 

770 "(tag %s not in possible tags %s)" % ( 

771 self.name, tag, list(self.choices.keys()) 

772 ) 

773 ) 

774 if hasattr(choice, "ASN1_root"): 

775 # we don't want to import ASN1_Packet in this module... 

776 return self.extract_packet(choice, s, _underlayer=pkt) # type: ignore 

777 elif isinstance(choice, type): 

778 return choice(self.name, b"").m2i(pkt, s) 

779 else: 

780 # XXX check properly if this is an ASN1F_PACKET 

781 return choice.m2i(pkt, s) 

782 

783 def i2m(self, pkt, x): 

784 # type: (ASN1_Packet, Any) -> bytes 

785 if x is None: 

786 s = b"" 

787 else: 

788 s = bytes(x) 

789 if hash(type(x)) in self.pktchoices: 

790 imp, exp = self.pktchoices[hash(type(x))] 

791 s = BER_tagging_enc(s, 

792 implicit_tag=imp, 

793 explicit_tag=exp) 

794 return BER_tagging_enc(s, explicit_tag=self.explicit_tag) 

795 

796 def randval(self): 

797 # type: () -> RandChoice 

798 randchoices = [] 

799 for p in self.choices.values(): 

800 if hasattr(p, "ASN1_root"): 

801 # should be ASN1_Packet class 

802 randchoices.append(packet.fuzz(p())) # type: ignore 

803 elif hasattr(p, "ASN1_tag"): 

804 if isinstance(p, type): 

805 # should be (basic) ASN1F_field class 

806 randchoices.append(p("dummy", None).randval()) 

807 else: 

808 # should be ASN1F_PACKET instance 

809 randchoices.append(p.randval()) 

810 return RandChoice(*randchoices) 

811 

812 

813class ASN1F_PACKET(ASN1F_field['ASN1_Packet', Optional['ASN1_Packet']]): 

814 holds_packets = 1 

815 

816 def __init__(self, 

817 name, # type: str 

818 default, # type: Optional[ASN1_Packet] 

819 cls, # type: Type[ASN1_Packet] 

820 context=None, # type: Optional[Any] 

821 implicit_tag=None, # type: Optional[int] 

822 explicit_tag=None, # type: Optional[int] 

823 next_cls_cb=None, # type: Optional[Callable[[ASN1_Packet], Type[ASN1_Packet]]] # noqa: E501 

824 ): 

825 # type: (...) -> None 

826 self.cls = cls 

827 self.next_cls_cb = next_cls_cb 

828 super(ASN1F_PACKET, self).__init__( 

829 name, None, context=context, 

830 implicit_tag=implicit_tag, explicit_tag=explicit_tag 

831 ) 

832 if implicit_tag is None and explicit_tag is None and cls is not None: 

833 if cls.ASN1_root.ASN1_tag == ASN1_Class_UNIVERSAL.SEQUENCE: 

834 self.network_tag = 16 | 0x20 # 16 + CONSTRUCTED 

835 self.default = default 

836 

837 def m2i(self, pkt, s): 

838 # type: (ASN1_Packet, bytes) -> Tuple[Any, bytes] 

839 if self.next_cls_cb: 

840 cls = self.next_cls_cb(pkt) or self.cls 

841 else: 

842 cls = self.cls 

843 if not hasattr(cls, "ASN1_root"): 

844 # A normal Packet (!= ASN1) 

845 return self.extract_packet(cls, s, _underlayer=pkt) 

846 diff_tag, s = BER_tagging_dec(s, hidden_tag=cls.ASN1_root.ASN1_tag, # noqa: E501 

847 implicit_tag=self.implicit_tag, 

848 explicit_tag=self.explicit_tag, 

849 safe=self.flexible_tag, 

850 _fname=self.name) 

851 if diff_tag is not None: 

852 if self.implicit_tag is not None: 

853 self.implicit_tag = diff_tag 

854 elif self.explicit_tag is not None: 

855 self.explicit_tag = diff_tag 

856 if not s: 

857 return None, s 

858 return self.extract_packet(cls, s, _underlayer=pkt) 

859 

860 def i2m(self, 

861 pkt, # type: ASN1_Packet 

862 x # type: Union[bytes, ASN1_Packet, None, ASN1_Object[Optional[ASN1_Packet]]] # noqa: E501 

863 ): 

864 # type: (...) -> bytes 

865 if x is None: 

866 s = b"" 

867 elif isinstance(x, bytes): 

868 s = x 

869 elif isinstance(x, ASN1_Object): 

870 if x.val: 

871 s = bytes(x.val) 

872 else: 

873 s = b"" 

874 else: 

875 s = bytes(x) 

876 if not hasattr(x, "ASN1_root"): 

877 # A normal Packet (!= ASN1) 

878 return s 

879 return BER_tagging_enc(s, 

880 implicit_tag=self.implicit_tag, 

881 explicit_tag=self.explicit_tag) 

882 

883 def any2i(self, 

884 pkt, # type: ASN1_Packet 

885 x # type: Union[bytes, ASN1_Packet, None, ASN1_Object[Optional[ASN1_Packet]]] # noqa: E501 

886 ): 

887 # type: (...) -> 'ASN1_Packet' 

888 if hasattr(x, "add_underlayer"): 

889 x.add_underlayer(pkt) # type: ignore 

890 return super(ASN1F_PACKET, self).any2i(pkt, x) 

891 

892 def randval(self): # type: ignore 

893 # type: () -> ASN1_Packet 

894 return packet.fuzz(self.cls()) 

895 

896 

897class ASN1F_BIT_STRING_ENCAPS(ASN1F_BIT_STRING): 

898 """ 

899 We may emulate simple string encapsulation with explicit_tag=0x04, 

900 but we need a specific class for bit strings because of unused bits, etc. 

901 """ 

902 ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING 

903 

904 def __init__(self, 

905 name, # type: str 

906 default, # type: Optional[ASN1_Packet] 

907 cls, # type: Type[ASN1_Packet] 

908 context=None, # type: Optional[Any] 

909 implicit_tag=None, # type: Optional[int] 

910 explicit_tag=None, # type: Optional[int] 

911 ): 

912 # type: (...) -> None 

913 self.cls = cls 

914 super(ASN1F_BIT_STRING_ENCAPS, self).__init__( # type: ignore 

915 name, 

916 default and bytes(default), 

917 context=context, 

918 implicit_tag=implicit_tag, 

919 explicit_tag=explicit_tag 

920 ) 

921 

922 def m2i(self, pkt, s): # type: ignore 

923 # type: (ASN1_Packet, bytes) -> Tuple[Optional[ASN1_Packet], bytes] 

924 bit_string, remain = super(ASN1F_BIT_STRING_ENCAPS, self).m2i(pkt, s) 

925 if len(bit_string.val) % 8 != 0: 

926 raise BER_Decoding_Error("wrong bit string", remaining=s) 

927 if bit_string.val_readable: 

928 p, s = self.extract_packet(self.cls, bit_string.val_readable, 

929 _underlayer=pkt) 

930 else: 

931 return None, bit_string.val_readable 

932 if len(s) > 0: 

933 raise BER_Decoding_Error("unexpected remainder", remaining=s) 

934 return p, remain 

935 

936 def i2m(self, pkt, x): # type: ignore 

937 # type: (ASN1_Packet, Optional[ASN1_BIT_STRING]) -> bytes 

938 if not isinstance(x, ASN1_BIT_STRING): 

939 x = ASN1_BIT_STRING( 

940 b"" if x is None else bytes(x), # type: ignore 

941 readable=True, 

942 ) 

943 return super(ASN1F_BIT_STRING_ENCAPS, self).i2m(pkt, x) 

944 

945 

946class ASN1F_FLAGS(ASN1F_BIT_STRING): 

947 def __init__(self, 

948 name, # type: str 

949 default, # type: Optional[str] 

950 mapping, # type: List[str] 

951 context=None, # type: Optional[Any] 

952 implicit_tag=None, # type: Optional[int] 

953 explicit_tag=None, # type: Optional[Any] 

954 ): 

955 # type: (...) -> None 

956 self.mapping = mapping 

957 super(ASN1F_FLAGS, self).__init__( 

958 name, default, 

959 default_readable=False, 

960 context=context, 

961 implicit_tag=implicit_tag, 

962 explicit_tag=explicit_tag 

963 ) 

964 

965 def any2i(self, pkt, x): 

966 # type: (ASN1_Packet, Any) -> str 

967 if isinstance(x, str): 

968 if any(y not in ["0", "1"] for y in x): 

969 # resolve the flags 

970 value = ["0"] * len(self.mapping) 

971 for i in x.split("+"): 

972 value[self.mapping.index(i)] = "1" 

973 x = "".join(value) 

974 x = ASN1_BIT_STRING(x) 

975 return super(ASN1F_FLAGS, self).any2i(pkt, x) 

976 

977 def get_flags(self, pkt): 

978 # type: (ASN1_Packet) -> List[str] 

979 fbytes = getattr(pkt, self.name).val 

980 return [self.mapping[i] for i, positional in enumerate(fbytes) 

981 if positional == '1' and i < len(self.mapping)] 

982 

983 def i2repr(self, pkt, x): 

984 # type: (ASN1_Packet, Any) -> str 

985 if x is not None: 

986 pretty_s = ", ".join(self.get_flags(pkt)) 

987 return pretty_s + " " + repr(x) 

988 return repr(x) 

989 

990 

991class ASN1F_STRING_PacketField(ASN1F_STRING): 

992 """ 

993 ASN1F_STRING that holds packets. 

994 """ 

995 holds_packets = 1 

996 

997 def i2m(self, pkt, val): 

998 # type: (ASN1_Packet, Any) -> bytes 

999 if hasattr(val, "ASN1_root"): 

1000 val = ASN1_STRING(bytes(val)) 

1001 return super(ASN1F_STRING_PacketField, self).i2m(pkt, val) 

1002 

1003 def any2i(self, pkt, x): 

1004 # type: (ASN1_Packet, Any) -> Any 

1005 if hasattr(x, "add_underlayer"): 

1006 x.add_underlayer(pkt) 

1007 return super(ASN1F_STRING_PacketField, self).any2i(pkt, x) 

1008 

1009 

1010class ASN1F_STRING_ENCAPS(ASN1F_STRING_PacketField): 

1011 """ 

1012 ASN1F_STRING that encapsulates a single ASN1 packet. 

1013 """ 

1014 

1015 def __init__(self, 

1016 name, # type: str 

1017 default, # type: Optional[ASN1_Packet] 

1018 cls, # type: Type[ASN1_Packet] 

1019 context=None, # type: Optional[Any] 

1020 implicit_tag=None, # type: Optional[int] 

1021 explicit_tag=None, # type: Optional[int] 

1022 ): 

1023 # type: (...) -> None 

1024 self.cls = cls 

1025 super(ASN1F_STRING_ENCAPS, self).__init__( 

1026 name, 

1027 default and bytes(default), # type: ignore 

1028 context=context, 

1029 implicit_tag=implicit_tag, 

1030 explicit_tag=explicit_tag 

1031 ) 

1032 

1033 def m2i(self, pkt, s): # type: ignore 

1034 # type: (ASN1_Packet, bytes) -> Tuple[ASN1_Packet, bytes] 

1035 val = super(ASN1F_STRING_ENCAPS, self).m2i(pkt, s) 

1036 return self.cls(val[0].val, _underlayer=pkt), val[1]