Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyasn1/codec/ber/decoder.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1078 statements  

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import io 

8import os 

9import sys 

10import warnings 

11 

12from pyasn1 import debug 

13from pyasn1 import error 

14from pyasn1.codec.ber import eoo 

15from pyasn1.codec.streaming import asSeekableStream 

16from pyasn1.codec.streaming import isEndOfStream 

17from pyasn1.codec.streaming import peekIntoStream 

18from pyasn1.codec.streaming import readFromStream 

19from pyasn1.compat import _MISSING 

20from pyasn1.error import PyAsn1Error 

21from pyasn1.type import base 

22from pyasn1.type import char 

23from pyasn1.type import tag 

24from pyasn1.type import tagmap 

25from pyasn1.type import univ 

26from pyasn1.type import useful 

27 

# Public names exported by ``from <module> import *``.
__all__ = ['StreamingDecoder', 'Decoder', 'decode']

# Debug logger handle for this module.  Every use below is guarded by
# ``if LOG:``, so a falsy value simply disables the trace output.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)

# Local alias for the "no value" sentinel of the type system.
noValue = base.noValue

# Local alias; the decoders below test for and relay instances of this
# class when a stream helper yields one.
SubstrateUnderrunError = error.SubstrateUnderrunError

# Maximum number of continuation octets (high-bit set) allowed per OID arc.
# 20 octets allows up to 140-bit integers, supporting UUID-based OIDs
MAX_OID_ARC_CONTINUATION_OCTETS = 20
# NOTE(review): not referenced in the part of the file visible here;
# presumably caps how deeply constructed values may nest -- confirm
# against the code that uses it.
MAX_NESTING_DEPTH = 100

# Maximum number of bytes in a BER length field (8 bytes = up to 2^64-1)
MAX_LENGTH_OCTETS = 8

43 

44 

class AbstractPayloadDecoder(object):
    """Common interface of the per-type payload decoders.

    Subclasses override :meth:`valueDecoder` and/or
    :meth:`indefLenValueDecoder`; the versions here only raise
    ``PyAsn1Error`` to signal that the mode is not implemented.
    """

    # Prototype ASN.1 object; subclasses replace it with a concrete instance.
    protoComponent = None

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a value whose byte length is known.

        The decoder is allowed to consume as many bytes as necessary.

        Raises:
            PyAsn1Error: always -- this base implementation is a stub.
        """
        message = 'SingleItemDecoder not implemented for %s' % (tagSet,)
        # TODO: a NotImplementedError would arguably describe this better.
        raise error.PyAsn1Error(message)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode a value whose length is not known in advance.

        The decoder is allowed to consume as many bytes as necessary.

        Raises:
            PyAsn1Error: always -- this base implementation is a stub.
        """
        message = 'Indefinite length mode decoder not implemented for %s' % (tagSet,)
        # TODO: a NotImplementedError would arguably describe this better.
        raise error.PyAsn1Error(message)

    @staticmethod
    def _passAsn1Object(asn1Object, options):
        """Record ``asn1Object`` in ``options`` unless one is already set.

        The mapping is updated in place and the same object is returned.
        """
        options.setdefault('asn1Object', asn1Object)
        return options

74 

75 

class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
    """Shared helpers for decoders of non-constructed (simple) types."""

    @staticmethod
    def substrateCollector(asn1Object, substrate, length, options):
        """Relay everything ``readFromStream`` yields for ``length`` octets.

        ``asn1Object`` is accepted for signature compatibility with other
        ``substrateFun`` callables and is not used.
        """
        yield from readFromStream(substrate, length, options)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Wrap ``value`` into the object handed back to the caller.

        * with a truthy ``native`` option the raw ``value`` is returned;
        * without ``asn1Spec`` the class prototype is cloned with ``value``
          and ``tagSet``;
        * with ``asn1Spec`` and ``value is noValue`` the spec itself is
          returned;
        * otherwise ``asn1Spec`` is cloned with ``value``.
        """
        if options.get('native'):
            return value

        if asn1Spec is None:
            return self.protoComponent.clone(value, tagSet=tagSet)

        if value is noValue:
            return asn1Spec

        return asn1Spec.clone(value)

91 

92 

class RawPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder that hands the payload to ``substrateFun`` or ``decodeFun``."""

    protoComponent = univ.Any('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Definite-length mode: relay items from the delegate generator."""
        if not substrateFun:
            yield from decodeFun(substrate, asn1Spec, tagSet, length, **options)
            return

        placeholder = self._createComponent(asn1Spec, tagSet, '', **options)
        yield from substrateFun(placeholder, substrate, length, options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Indefinite-length mode: relay items until end-of-octets is seen."""
        if substrateFun:
            placeholder = self._createComponent(asn1Spec, tagSet, '', **options)
            yield from substrateFun(placeholder, substrate, length, options)
            return

        # Keep invoking the inner decoder; the end-of-octets sentinel is the
        # only way out of this loop.
        while True:
            items = decodeFun(
                substrate, asn1Spec, tagSet, length,
                allowEoo=True, **options)

            for item in items:
                if item is eoo.endOfOctets:
                    return

                yield item


rawPayloadDecoder = RawPayloadDecoder()

135 

136 

class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for INTEGER payloads (big-endian two's complement)."""

    protoComponent = univ.Integer(0)

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Read ``length`` octets and yield the decoded integer component.

        ``SubstrateUnderrunError`` objects produced by ``readFromStream``
        are relayed to the caller; the last item it produces is taken as
        the payload.

        Raises:
            PyAsn1Error: if the outermost tag is not in simple format.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for octets in readFromStream(substrate, length, options):
            if isinstance(octets, SubstrateUnderrunError):
                yield octets

        # An empty payload decodes to zero.
        number = int.from_bytes(bytes(octets), 'big', signed=True) if octets else 0

        yield self._createComponent(asn1Spec, tagSet, number, **options)

159 

160 

class BooleanPayloadDecoder(IntegerPayloadDecoder):
    """Decoder for BOOLEAN payloads, built on the INTEGER decoder."""

    protoComponent = univ.Boolean(0)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Collapse ``value`` to 1 or 0 before building the component."""
        normalized = 1 if value else 0
        return IntegerPayloadDecoder._createComponent(
            self, asn1Spec, tagSet, normalized, **options)

167 

168 

class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for BIT STRING payloads in primitive and constructed form."""

    protoComponent = univ.BitString(())
    # Subclasses may set this to False to reject the constructed encoding.
    supportConstructedForm = True

    def _appendFragment(self, bitString, fragment):
        """Validate one constructed-form fragment and append it.

        ``fragment`` is the raw content of an inner BIT STRING: the first
        octet is the count of unused trailing bits, the rest is bit data.

        Returns:
            ``bitString`` extended with the fragment's bits, in the
            internal format used by ``fromOctetString``.

        Raises:
            PyAsn1Error: if the fragment is empty or declares more than
                seven trailing bits.
        """
        if not fragment:
            # Without this guard ``fragment[0]`` would escape as a bare
            # IndexError on a zero-length fragment.
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        trailingBits = fragment[0]
        if trailingBits > 7:
            raise error.PyAsn1Error(
                'Trailing bits overflow %s' % trailingBits
            )

        return self.protoComponent.fromOctetString(
            fragment[1:], internalFormat=True,
            prepend=bitString, padding=trailingBits
        )

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length BIT STRING.

        Yields relayed ``SubstrateUnderrunError`` objects and finally the
        decoded component (or whatever ``substrateFun`` yields when given).

        Raises:
            PyAsn1Error: on empty substrate, an invalid trailing-bits
                octet, or constructed form when it is prohibited.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if not length:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        for chunk in isEndOfStream(substrate):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if chunk:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?

            # Primitive form: one octet of trailing-bit count, then the bits.
            for trailingBits in readFromStream(substrate, 1, options):
                if isinstance(trailingBits, SubstrateUnderrunError):
                    yield trailingBits

            trailingBits = ord(trailingBits)
            if trailingBits > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % trailingBits
                )

            for chunk in readFromStream(substrate, length - 1, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            value = self.protoComponent.fromOctetString(
                chunk, internalFormat=True, padding=trailingBits)

            yield self._createComponent(asn1Spec, tagSet, value, **options)

            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited '
                                    'at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)

        current_position = substrate.tell()

        while substrate.tell() - current_position < length:
            for component in decodeFun(
                    substrate, self.protoComponent, substrateFun=substrateFun,
                    **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            # The last item produced by the inner decoder is the fragment.
            bitString = self._appendFragment(bitString, component)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length (constructed) BIT STRING.

        Fragments are read until the end-of-octets sentinel is returned by
        the inner decoder.

        Raises:
            PyAsn1Error: if a fragment is empty or has an invalid
                trailing-bits octet.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)

        while True:  # loop over fragments

            for component in decodeFun(
                    substrate, self.protoComponent, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if component is eoo.endOfOctets:
                    break

                if isinstance(component, SubstrateUnderrunError):
                    yield component

            if component is eoo.endOfOctets:
                break

            bitString = self._appendFragment(bitString, component)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)

298 

299 

class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OCTET STRING payloads in primitive and constructed form."""

    protoComponent = univ.OctetString('')
    # Subclasses may set this to False to reject the constructed encoding.
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length OCTET STRING.

        Yields relayed ``SubstrateUnderrunError`` objects and finally the
        decoded component (or whatever ``substrateFun`` yields when given).

        Raises:
            PyAsn1Error: if the value is constructed and constructed form
                is prohibited for this decoder.
        """
        if substrateFun:
            placeholder = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(placeholder, substrate, length, options)
            return

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?
            # Primitive form: the payload is the value.
            for octets in readFromStream(substrate, length, options):
                if isinstance(octets, SubstrateUnderrunError):
                    yield octets

            yield self._createComponent(asn1Spec, tagSet, octets, **options)
            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # Inner fragments share the outer type, so collect them as raw octets.
        collector = self.substrateCollector

        assembled = b''
        startOffset = substrate.tell()

        while substrate.tell() - startOffset < length:
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=collector,
                **options)

            for fragment in fragments:
                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

            # The last item produced by the inner decoder is the fragment.
            assembled += fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length (constructed) OCTET STRING.

        A caller-supplied ``substrateFun`` takes over unless it is this
        class's own ``substrateCollector``; in that case the fragments are
        assembled here until the end-of-octets sentinel is seen.
        """
        if substrateFun and substrateFun is not self.substrateCollector:
            placeholder = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(placeholder, substrate, length, options)
            return

        # Inner fragments share the outer type, so collect them as raw octets.
        collector = self.substrateCollector

        assembled = b''

        while True:  # one pass per fragment
            fragments = decodeFun(
                substrate, self.protoComponent, substrateFun=collector,
                allowEoo=True, **options)

            for fragment in fragments:
                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

                if fragment is eoo.endOfOctets:
                    break

            if fragment is eoo.endOfOctets:
                break

            assembled += fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)

384 

385 

class NullPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for NULL, whose payload must be empty."""

    protoComponent = univ.Null('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Consume the payload and yield the NULL component.

        Raises:
            PyAsn1Error: if the tag is not in simple format or the payload
                is not empty.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for payload in readFromStream(substrate, length, options):
            if isinstance(payload, SubstrateUnderrunError):
                yield payload

        # The component is built before the payload check, as in the
        # original statement order.
        nullObject = self._createComponent(asn1Spec, tagSet, '', **options)

        if payload:
            raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)

        yield nullObject

407 

408 

class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OBJECT IDENTIFIER payloads."""

    protoComponent = univ.ObjectIdentifier(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode the base-128 encoded arcs and yield the OID component.

        Yields relayed ``SubstrateUnderrunError`` objects and finally the
        decoded component.

        Raises:
            PyAsn1Error: non-simple tag format, empty payload, an arc
                starting with 0x80, or an arc with more than
                ``MAX_OID_ARC_CONTINUATION_OCTETS`` continuation octets.
            SubstrateUnderrunError: the payload ends in the middle of a
                multi-octet arc.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            raise error.PyAsn1Error('Empty substrate')

        # Arcs are collected in a list; repeated tuple concatenation would
        # copy the whole result for every arc.
        arcs = []
        index = 0
        substrateLen = len(chunk)
        while index < substrateLen:
            subId = chunk[index]
            index += 1
            if subId < 128:
                arcs.append(subId)
            elif subId == 128:
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')
            else:
                # Construct subid from a number of octets
                nextSubId = subId
                subId = 0
                continuationOctetCount = 0
                while nextSubId >= 128:
                    continuationOctetCount += 1
                    if continuationOctetCount > MAX_OID_ARC_CONTINUATION_OCTETS:
                        raise error.PyAsn1Error(
                            'OID arc exceeds maximum continuation octets limit (%d) '
                            'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, index)
                        )
                    subId = (subId << 7) + (nextSubId & 0x7F)
                    if index >= substrateLen:
                        raise error.SubstrateUnderrunError(
                            'Short substrate for sub-OID past %s' % (tuple(arcs),)
                        )
                    nextSubId = chunk[index]
                    index += 1
                arcs.append((subId << 7) + nextSubId)

        # Decode two leading arcs.  The first encoded sub-identifier is
        # never negative, so these three ranges cover every case.
        firstArc = arcs[0]
        if firstArc < 40:
            leading = (0, firstArc)
        elif firstArc < 80:
            leading = (1, firstArc - 40)
        else:
            leading = (2, firstArc - 80)

        oid = leading + tuple(arcs[1:])

        yield self._createComponent(asn1Spec, tagSet, oid, **options)

472 

473 

class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for RELATIVE-OID payloads."""

    protoComponent = univ.RelativeOID(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode the base-128 encoded arcs and yield the component.

        Raises:
            PyAsn1Error: non-simple tag format, empty payload, an arc
                starting with 0x80, or an arc with more than
                ``MAX_OID_ARC_CONTINUATION_OCTETS`` continuation octets.
            SubstrateUnderrunError: the payload ends in the middle of a
                multi-octet arc.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for payload in readFromStream(substrate, length, options):
            if isinstance(payload, SubstrateUnderrunError):
                yield payload

        if not payload:
            raise error.PyAsn1Error('Empty substrate')

        arcs = []
        cursor = 0
        total = len(payload)

        while cursor < total:
            octet = payload[cursor]
            cursor += 1

            if octet == 128:
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')

            if octet < 128:
                # Single-octet arc.
                arcs.append(octet)
                continue

            # Multi-octet arc: accumulate seven bits per continuation octet
            # until an octet without the high bit terminates it.
            accumulated = 0
            continuationCount = 0
            while octet >= 128:
                continuationCount += 1
                if continuationCount > MAX_OID_ARC_CONTINUATION_OCTETS:
                    raise error.PyAsn1Error(
                        'RELATIVE-OID arc exceeds maximum continuation octets limit (%d) '
                        'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, cursor)
                    )
                accumulated = (accumulated << 7) + (octet & 0x7F)
                if cursor >= total:
                    raise error.SubstrateUnderrunError(
                        'Short substrate for sub-OID past %s' % (tuple(arcs),)
                    )
                octet = payload[cursor]
                cursor += 1

            arcs.append((accumulated << 7) + octet)

        yield self._createComponent(asn1Spec, tagSet, tuple(arcs), **options)

527 

528 

class RealPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for REAL payloads (binary, special and character forms)."""

    protoComponent = univ.Real()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a REAL payload and yield the resulting component.

        The first octet selects the form: bit 0x80 set means binary, bit
        0x40 set (with 0x80 clear) means an infinity, both clear means a
        character (decimal text) encoding.  An empty payload is 0.0.

        Yields relayed ``SubstrateUnderrunError`` objects and finally the
        decoded component.

        Raises:
            PyAsn1Error: non-simple tag format, truncated value, missing
                exponent or mantissa, or an illegal base.
            SubstrateUnderrunError: unknown NR form or unparsable
                character encoding.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
            return

        fo = chunk[0]
        chunk = chunk[1:]
        if fo & 0x80:  # binary encoding
            if not chunk:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding binary encoded REAL')

            # Exponent length: 1..3 octets, or given by the next octet.
            n = (fo & 0x03) + 1

            if n == 4:
                n = chunk[0]
                chunk = chunk[1:]

            eo, chunk = chunk[:n], chunk[n:]

            if not eo or not chunk:
                raise error.PyAsn1Error('Real exponent screwed')

            # Exponent is a big-endian two's-complement integer.
            e = int.from_bytes(bytes(eo), 'big', signed=True)

            b = fo >> 4 & 0x03  # base bits

            if b > 2:
                raise error.PyAsn1Error('Illegal Real base')

            # Bases 8 and 16 are folded into a base-2 exponent.
            if b == 1:  # encbase = 8
                e *= 3

            elif b == 2:  # encbase = 16
                e *= 4

            # Mantissa is an unsigned big-endian integer; one conversion
            # avoids re-slicing the buffer for every octet.
            p = int.from_bytes(bytes(chunk), 'big')

            if fo & 0x40:  # sign bit
                p = -p

            sf = fo >> 2 & 0x03  # scale bits
            p *= 2 ** sf
            value = (p, 2, e)

        elif fo & 0x40:  # infinite value
            if LOG:
                LOG('decoding infinite REAL')

            value = '-inf' if fo & 0x01 else 'inf'

        else:  # character encoding (bits 0x80 and 0x40 are both clear)
            if not chunk:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding character encoded REAL')

            nr = fo & 0x3

            try:
                if nr == 0x1:  # NR1
                    value = (int(chunk), 10, 0)

                elif nr in (0x2, 0x3):  # NR2, NR3
                    value = float(chunk)

                else:
                    raise error.SubstrateUnderrunError(
                        'Unknown NR (tag %s)' % fo
                    )

            except ValueError:
                raise error.SubstrateUnderrunError(
                    'Bad character Real syntax'
                )

        yield self._createComponent(asn1Spec, tagSet, value, **options)

637 

638 

class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
    """Base class for decoders of constructed types.

    Adds nothing to ``AbstractPayloadDecoder`` beyond restating the
    ``protoComponent`` default.
    """
    # Prototype ASN.1 object; None unless a subclass sets one.
    protoComponent = None

641 

642 

643class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder): 

644 protoRecordComponent = None 

645 protoSequenceComponent = None 

646 

647 def _getComponentTagMap(self, asn1Object, idx): 

648 raise NotImplementedError 

649 

650 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 

651 raise NotImplementedError 

652 

def _decodeComponentsSchemaless(
        self, substrate, tagSet=None, decodeFun=None,
        length=None, **options):
    """Decode inner components without a schema and guess the container.

    Components are read until ``length`` octets are consumed or, when
    ``length`` is -1, until the end-of-octets sentinel is returned.
    Yields relayed ``SubstrateUnderrunError`` objects and finally the
    populated container.
    """
    decoded = []
    seenTagSets = set()

    startOffset = substrate.tell()
    indefinite = length == -1

    while indefinite or substrate.tell() < startOffset + length:
        for component in decodeFun(substrate, **options):
            if isinstance(component, SubstrateUnderrunError):
                yield component

        # The last item produced by the inner decoder is the component.
        if indefinite and component is eoo.endOfOctets:
            break

        decoded.append(component)
        seenTagSets.add(component.tagSet)

    # Guess between SEQUENCE/SET and SEQUENCE OF/SET OF:
    # * components of more than one type -> likely SEQUENCE/SET
    # * otherwise -> likely SEQUENCE OF/SET OF
    prototype = (self.protoRecordComponent if len(seenTagSets) > 1
                 else self.protoSequenceComponent)

    # Base tag comes from the prototype, outer tags from the substrate.
    guessedTagSet = tag.TagSet(prototype.tagSet.baseTag, *tagSet.superTags)
    container = prototype.clone(tagSet=guessedTagSet)

    if LOG:
        LOG('guessed %r container type (pass `asn1Spec` to guide the '
            'decoder)' % container)

    for position, item in enumerate(decoded):
        container.setComponentByPosition(
            position, item,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False
        )

    yield container

703 

def valueDecoder(self, substrate, asn1Spec,
                 tagSet=None, length=None, state=None,
                 decodeFun=None, substrateFun=None,
                 **options):
    """Decode a definite-length constructed value.

    This is a generator.  Throughout the body the same pattern recurs:
    an inner generator is drained, any ``SubstrateUnderrunError`` object
    it produces is relayed to the caller, and the last item it produced
    is then used as the decoded result.

    Three paths exist:

    * ``substrateFun`` given -- the payload is handed to it untouched;
    * ``asn1Spec`` is None -- components are decoded schemaless and the
      container type is guessed;
    * ``asn1Spec`` given -- components are decoded against the spec,
      either as a SEQUENCE/SET (named types) or as a homogeneous
      container.

    Yields:
        Relayed ``SubstrateUnderrunError`` objects and finally the
        populated ASN.1 object (or whatever ``substrateFun`` yields).

    Raises:
        PyAsn1Error: tag is not constructed, more components than the
            spec allows, required components missing, or the resulting
            object reports itself inconsistent.
    """
    if tagSet[0].tagFormat != tag.tagFormatConstructed:
        raise error.PyAsn1Error('Constructed tag format expected')

    # Remember where the payload starts; consumption is measured from here.
    original_position = substrate.tell()

    if substrateFun:
        # Pick the object passed to substrateFun: a clone of the spec, a
        # clone of the prototype, or the pair of candidate prototypes.
        if asn1Spec is not None:
            asn1Object = asn1Spec.clone()

        elif self.protoComponent is not None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = self.protoRecordComponent, self.protoSequenceComponent

        for chunk in substrateFun(asn1Object, substrate, length, options):
            yield chunk

        return

    if asn1Spec is None:
        for asn1Object in self._decodeComponentsSchemaless(
                substrate, tagSet=tagSet, decodeFun=decodeFun,
                length=length, **options):
            if isinstance(asn1Object, SubstrateUnderrunError):
                yield asn1Object

        # Leftover octets are only read (and reported) when debug
        # logging is enabled.
        if substrate.tell() < original_position + length:
            if LOG:
                for trailing in readFromStream(substrate, context=options):
                    if isinstance(trailing, SubstrateUnderrunError):
                        yield trailing

                LOG('Unused trailing %d octets encountered: %s' % (
                    len(trailing), debug.hexdump(trailing)))

        yield asn1Object

        return

    # Schema-guided path: start from an emptied clone of the spec.
    asn1Object = asn1Spec.clone()
    asn1Object.clear()

    # Make the object under construction available to nested decoders
    # through the 'asn1Object' option (unless one is already set).
    options = self._passAsn1Object(asn1Object, options)

    if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId):

        namedTypes = asn1Spec.componentType

        isSetType = asn1Spec.typeId == univ.Set.typeId
        # Deterministic: component at position idx is always field idx.
        isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault

        if LOG:
            LOG('decoding %sdeterministic %s type %r chosen by type ID' % (
                not isDeterministic and 'non-' or '', isSetType and 'SET' or '',
                asn1Spec))

        seenIndices = set()
        idx = 0
        while substrate.tell() - original_position < length:
            # Choose what to decode the next component against.
            if not namedTypes:
                componentType = None

            elif isSetType:
                componentType = namedTypes.tagMapUnique

            else:
                try:
                    if isDeterministic:
                        componentType = namedTypes[idx].asn1Object

                    elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                        componentType = namedTypes.getTagMapNearPosition(idx)

                    else:
                        componentType = namedTypes[idx].asn1Object

                except IndexError:
                    # idx ran past the declared fields.
                    raise error.PyAsn1Error(
                        'Excessive components decoded at %r' % (asn1Spec,)
                    )

            for component in decodeFun(substrate, componentType, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            # Work out which field the decoded component belongs to.
            if not isDeterministic and namedTypes:
                if isSetType:
                    idx = namedTypes.getPositionByType(component.effectiveTagSet)

                elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted:
                    idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx)

            asn1Object.setComponentByPosition(
                idx, component,
                verifyConstraints=False,
                matchTags=False, matchConstraints=False
            )

            seenIndices.add(idx)
            idx += 1

        if LOG:
            LOG('seen component indices %s' % seenIndices)

        if namedTypes:
            if not namedTypes.requiredComponents.issubset(seenIndices):
                raise error.PyAsn1Error(
                    'ASN.1 object %s has uninitialized '
                    'components' % asn1Object.__class__.__name__)

            if namedTypes.hasOpenTypes:

                openTypes = options.get('openTypes', {})

                if LOG:
                    LOG('user-specified open types map:')

                    for k, v in openTypes.items():
                        LOG('%s -> %r' % (k, v))

                # Open types are resolved only on request: a non-empty
                # 'openTypes' map or a truthy 'decodeOpenTypes' option.
                if openTypes or options.get('decodeOpenTypes', False):

                    for idx, namedType in enumerate(namedTypes.namedTypes):
                        if not namedType.openType:
                            continue

                        if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue:
                            continue

                        # Value of the field that selects the open type.
                        governingValue = asn1Object.getComponentByName(
                            namedType.openType.name
                        )

                        # The caller's map takes precedence; fall back to
                        # the map attached to the field itself.
                        try:
                            openType = openTypes[governingValue]

                        except KeyError:

                            if LOG:
                                LOG('default open types map of component '
                                    '"%s.%s" governed by component "%s.%s"'
                                    ':' % (asn1Object.__class__.__name__,
                                           namedType.name,
                                           asn1Object.__class__.__name__,
                                           namedType.openType.name))

                                for k, v in namedType.openType.items():
                                    LOG('%s -> %r' % (k, v))

                            try:
                                openType = namedType.openType[governingValue]

                            except KeyError:
                                if LOG:
                                    LOG('failed to resolve open type by governing '
                                        'value %r' % (governingValue,))
                                continue

                        if LOG:
                            LOG('resolved open type %r by governing '
                                'value %r' % (openType, governingValue))

                        containerValue = asn1Object.getComponentByPosition(idx)

                        if containerValue.typeId in (
                                univ.SetOf.typeId, univ.SequenceOf.typeId):

                            # Re-decode every element of the container
                            # against the resolved type, in place.
                            for pos, containerElement in enumerate(
                                    containerValue):

                                stream = asSeekableStream(containerValue[pos].asOctets())

                                for component in decodeFun(stream, asn1Spec=openType, **options):
                                    if isinstance(component, SubstrateUnderrunError):
                                        yield component

                                containerValue[pos] = component

                        else:
                            # Single value: re-decode its octets against
                            # the resolved type and store the result.
                            stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets())

                            for component in decodeFun(stream, asn1Spec=openType, **options):
                                if isinstance(component, SubstrateUnderrunError):
                                    yield component

                            asn1Object.setComponentByPosition(idx, component)

        else:
            # No named types declared: rely on the object's own
            # consistency report.
            inconsistency = asn1Object.isInconsistent
            if inconsistency:
                raise error.PyAsn1Error(
                    f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent")

    else:
        # Not a SEQUENCE/SET: every element is decoded against the
        # spec's single component type.
        componentType = asn1Spec.componentType

        if LOG:
            LOG('decoding type %r chosen by given `asn1Spec`' % componentType)

        idx = 0

        while substrate.tell() - original_position < length:
            for component in decodeFun(substrate, componentType, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            asn1Object.setComponentByPosition(
                idx, component,
                verifyConstraints=False,
                matchTags=False, matchConstraints=False
            )

            idx += 1

    yield asn1Object

924 

925 def indefLenValueDecoder(self, substrate, asn1Spec, 

926 tagSet=None, length=None, state=None, 

927 decodeFun=None, substrateFun=None, 

928 **options): 

929 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

930 raise error.PyAsn1Error('Constructed tag format expected') 

931 

932 if substrateFun is not None: 

933 if asn1Spec is not None: 

934 asn1Object = asn1Spec.clone() 

935 

936 elif self.protoComponent is not None: 

937 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

938 

939 else: 

940 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

941 

942 for chunk in substrateFun(asn1Object, substrate, length, options): 

943 yield chunk 

944 

945 return 

946 

947 if asn1Spec is None: 

948 for asn1Object in self._decodeComponentsSchemaless( 

949 substrate, tagSet=tagSet, decodeFun=decodeFun, 

950 length=length, **dict(options, allowEoo=True)): 

951 if isinstance(asn1Object, SubstrateUnderrunError): 

952 yield asn1Object 

953 

954 yield asn1Object 

955 

956 return 

957 

958 asn1Object = asn1Spec.clone() 

959 asn1Object.clear() 

960 

961 options = self._passAsn1Object(asn1Object, options) 

962 

963 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

964 

965 namedTypes = asn1Object.componentType 

966 

967 isSetType = asn1Object.typeId == univ.Set.typeId 

968 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

969 

970 if LOG: 

971 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

972 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

973 asn1Spec)) 

974 

975 seenIndices = set() 

976 

977 idx = 0 

978 

979 while True: # loop over components 

980 if len(namedTypes) <= idx: 

981 asn1Spec = None 

982 

983 elif isSetType: 

984 asn1Spec = namedTypes.tagMapUnique 

985 

986 else: 

987 try: 

988 if isDeterministic: 

989 asn1Spec = namedTypes[idx].asn1Object 

990 

991 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

992 asn1Spec = namedTypes.getTagMapNearPosition(idx) 

993 

994 else: 

995 asn1Spec = namedTypes[idx].asn1Object 

996 

997 except IndexError: 

998 raise error.PyAsn1Error( 

999 'Excessive components decoded at %r' % (asn1Object,) 

1000 ) 

1001 

1002 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options): 

1003 

1004 if isinstance(component, SubstrateUnderrunError): 

1005 yield component 

1006 

1007 if component is eoo.endOfOctets: 

1008 break 

1009 

1010 if component is eoo.endOfOctets: 

1011 break 

1012 

1013 if not isDeterministic and namedTypes: 

1014 if isSetType: 

1015 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

1016 

1017 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

1018 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

1019 

1020 asn1Object.setComponentByPosition( 

1021 idx, component, 

1022 verifyConstraints=False, 

1023 matchTags=False, matchConstraints=False 

1024 ) 

1025 

1026 seenIndices.add(idx) 

1027 idx += 1 

1028 

1029 if LOG: 

1030 LOG('seen component indices %s' % seenIndices) 

1031 

1032 if namedTypes: 

1033 if not namedTypes.requiredComponents.issubset(seenIndices): 

1034 raise error.PyAsn1Error( 

1035 'ASN.1 object %s has uninitialized ' 

1036 'components' % asn1Object.__class__.__name__) 

1037 

1038 if namedTypes.hasOpenTypes: 

1039 

1040 openTypes = options.get('openTypes', {}) 

1041 

1042 if LOG: 

1043 LOG('user-specified open types map:') 

1044 

1045 for k, v in openTypes.items(): 

1046 LOG('%s -> %r' % (k, v)) 

1047 

1048 if openTypes or options.get('decodeOpenTypes', False): 

1049 

1050 for idx, namedType in enumerate(namedTypes.namedTypes): 

1051 if not namedType.openType: 

1052 continue 

1053 

1054 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

1055 continue 

1056 

1057 governingValue = asn1Object.getComponentByName( 

1058 namedType.openType.name 

1059 ) 

1060 

1061 try: 

1062 openType = openTypes[governingValue] 

1063 

1064 except KeyError: 

1065 

1066 if LOG: 

1067 LOG('default open types map of component ' 

1068 '"%s.%s" governed by component "%s.%s"' 

1069 ':' % (asn1Object.__class__.__name__, 

1070 namedType.name, 

1071 asn1Object.__class__.__name__, 

1072 namedType.openType.name)) 

1073 

1074 for k, v in namedType.openType.items(): 

1075 LOG('%s -> %r' % (k, v)) 

1076 

1077 try: 

1078 openType = namedType.openType[governingValue] 

1079 

1080 except KeyError: 

1081 if LOG: 

1082 LOG('failed to resolve open type by governing ' 

1083 'value %r' % (governingValue,)) 

1084 continue 

1085 

1086 if LOG: 

1087 LOG('resolved open type %r by governing ' 

1088 'value %r' % (openType, governingValue)) 

1089 

1090 containerValue = asn1Object.getComponentByPosition(idx) 

1091 

1092 if containerValue.typeId in ( 

1093 univ.SetOf.typeId, univ.SequenceOf.typeId): 

1094 

1095 for pos, containerElement in enumerate( 

1096 containerValue): 

1097 

1098 stream = asSeekableStream(containerValue[pos].asOctets()) 

1099 

1100 for component in decodeFun(stream, asn1Spec=openType, 

1101 **dict(options, allowEoo=True)): 

1102 if isinstance(component, SubstrateUnderrunError): 

1103 yield component 

1104 

1105 if component is eoo.endOfOctets: 

1106 break 

1107 

1108 containerValue[pos] = component 

1109 

1110 else: 

1111 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

1112 for component in decodeFun(stream, asn1Spec=openType, 

1113 **dict(options, allowEoo=True)): 

1114 if isinstance(component, SubstrateUnderrunError): 

1115 yield component 

1116 

1117 if component is eoo.endOfOctets: 

1118 break 

1119 

1120 asn1Object.setComponentByPosition(idx, component) 

1121 

1122 else: 

1123 inconsistency = asn1Object.isInconsistent 

1124 if inconsistency: 

1125 raise error.PyAsn1Error( 

1126 f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent") 

1127 

1128 else: 

1129 componentType = asn1Spec.componentType 

1130 

1131 if LOG: 

1132 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

1133 

1134 idx = 0 

1135 

1136 while True: 

1137 

1138 for component in decodeFun( 

1139 substrate, componentType, allowEoo=True, **options): 

1140 

1141 if isinstance(component, SubstrateUnderrunError): 

1142 yield component 

1143 

1144 if component is eoo.endOfOctets: 

1145 break 

1146 

1147 if component is eoo.endOfOctets: 

1148 break 

1149 

1150 asn1Object.setComponentByPosition( 

1151 idx, component, 

1152 verifyConstraints=False, 

1153 matchTags=False, matchConstraints=False 

1154 ) 

1155 

1156 idx += 1 

1157 

1158 yield asn1Object 

1159 

1160 

class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed-payload decoder configured with SEQUENCE prototypes.

    All decoding logic is inherited from `ConstructedPayloadDecoderBase`;
    this class only supplies the two prototype objects below.
    """
    # Prototype for the record-like flavour (SEQUENCE).
    # NOTE(review): presumably consulted by the base class when the concrete
    # flavour has to be guessed from the substrate -- confirm in the base.
    protoRecordComponent = univ.Sequence()
    # Prototype for the homogeneous flavour (SEQUENCE OF).
    protoSequenceComponent = univ.SequenceOf()

1164 

1165 

class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """Payload decoder whose prototype component is `univ.Sequence`."""
    protoComponent = univ.Sequence()

1168 

1169 

class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """Payload decoder whose prototype component is `univ.SequenceOf`."""
    protoComponent = univ.SequenceOf()

1172 

1173 

class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed-payload decoder configured with SET prototypes.

    All decoding logic is inherited from `ConstructedPayloadDecoderBase`;
    this class only supplies the two prototype objects below.
    """
    # Prototype for the record-like flavour (SET).
    # NOTE(review): presumably consulted by the base class when the concrete
    # flavour has to be guessed from the substrate -- confirm in the base.
    protoRecordComponent = univ.Set()
    # Prototype for the homogeneous flavour (SET OF).
    protoSequenceComponent = univ.SetOf()

1177 

1178 

class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
    """Payload decoder whose prototype component is `univ.Set`."""
    protoComponent = univ.Set()

1181 

1182 

class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
    """Payload decoder whose prototype component is `univ.SetOf`."""
    protoComponent = univ.SetOf()

1185 

1186 

class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
    """Payload decoder for the ASN.1 CHOICE type.

    Both decoding methods are generators. They may yield any number of
    `SubstrateUnderrunError` objects (the stream ran short of data) and
    finally yield the populated CHOICE object.
    """
    protoComponent = univ.Choice()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length CHOICE.

        Parameters
        ----------
        substrate:
            Stream to read the serialization from.
        asn1Spec:
            CHOICE template to clone, or `None` to clone `protoComponent`
            re-tagged with `tagSet`.
        tagSet, length, state:
            Tag set, payload length and decoder state already recovered by
            the caller; forwarded to `decodeFun` for an untagged CHOICE.
        decodeFun:
            Callable (generator) used to decode the inner component.
        substrateFun:
            Optional user callback; when given, decoding is delegated to it
            entirely and whatever it yields is passed through.

        Yields
        ------
        `SubstrateUnderrunError` objects while input is insufficient, then
        the decoded CHOICE object.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(asn1Object, options)

        if asn1Object.tagSet == tagSet:
            if LOG:
                LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))

            # The outer tag belongs to the CHOICE itself, so the inner
            # component is decoded from scratch (fresh tag/length).
            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        else:
            if LOG:
                LOG('decoding %s as untagged CHOICE' % (tagSet,))

            # The tag already read belongs to the inner component, so the
            # recovered tagSet/length/state are handed over to decodeFun.
            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, tagSet, length,
                    state, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        # After the loop `component` holds the last item produced by
        # decodeFun, i.e. the decoded value.
        effectiveTagSet = component.effectiveTagSet

        if LOG:
            LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))

        asn1Object.setComponentByType(
            effectiveTagSet, component,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False,
            innerFlag=False
        )

        yield asn1Object

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length CHOICE.

        Parameters are the same as for `valueDecoder`. For an explicitly
        tagged CHOICE, components are decoded repeatedly until the
        end-of-octets marker is met; for an untagged CHOICE a single
        component is decoded.

        Yields
        ------
        `SubstrateUnderrunError` objects while input is insufficient, then
        the decoded CHOICE object.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(asn1Object, options)

        isTagged = asn1Object.tagSet == tagSet

        if LOG:
            LOG('decoding %s as %stagged CHOICE' % (
                tagSet, isTagged and 'explicitly ' or 'un'))

        while True:

            if isTagged:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    **dict(options, allowEoo=True))

            else:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    tagSet, length, state, **dict(options, allowEoo=True))

            for component in iterator:

                if isinstance(component, SubstrateUnderrunError):
                    yield component
                    # An underrun marker is not a decoded value: once the
                    # caller resumes us, go back to the iterator instead of
                    # treating the error object as a CHOICE component.
                    continue

                if component is eoo.endOfOctets:
                    break

                effectiveTagSet = component.effectiveTagSet

                if LOG:
                    LOG('decoded component %s, effective tag set '
                        '%s' % (component, effectiveTagSet))

                asn1Object.setComponentByType(
                    effectiveTagSet, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False,
                    innerFlag=False
                )

                if not isTagged:
                    break

            # Untagged CHOICE carries exactly one component; a tagged one is
            # terminated by the end-of-octets marker.
            if not isTagged or component is eoo.endOfOctets:
                break

        yield asn1Object

1305 

1306 

class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for the ASN.1 ANY type.

    Both decoding methods are generators that may yield
    `SubstrateUnderrunError` objects before yielding the final result.
    """
    protoComponent = univ.Any()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length ANY value.

        When the tag met in the substrate does not match `asn1Spec` (or no
        spec is given), the value is treated as untagged: the stream is
        rewound to `substrate.markedPosition` and `length` is extended by
        the number of bytes rewound, so those bytes become part of the
        payload that is read.

        Yields
        ------
        `SubstrateUnderrunError` objects while input is insufficient, then
        either whatever `substrateFun` yields or the component built by
        `self._createComponent` from the bytes read.
        """
        if asn1Spec is None:
            isUntagged = True

        elif asn1Spec.__class__ is tagmap.TagMap:
            isUntagged = tagSet not in asn1Spec.tagMap

        else:
            isUntagged = tagSet != asn1Spec.tagSet

        if isUntagged:
            fullPosition = substrate.markedPosition
            currentPosition = substrate.tell()

            # Rewind to the marked position and widen the payload by the
            # distance rewound.
            substrate.seek(fullPosition, os.SEEK_SET)
            length += currentPosition - fullPosition

            if LOG:
                for chunk in peekIntoStream(substrate, length):
                    if isinstance(chunk, SubstrateUnderrunError):
                        yield chunk
                LOG('decoding as untagged ANY, substrate '
                    '%s' % debug.hexdump(chunk))

        if substrateFun:
            for chunk in substrateFun(
                    self._createComponent(asn1Spec, tagSet, noValue, **options),
                    substrate, length, options):
                yield chunk

            return

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        # `chunk` is the last item produced by readFromStream.
        yield self._createComponent(asn1Spec, tagSet, chunk, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length ANY value.

        Inner fragments are decoded one by one with `decodeFun` (using
        `self.substrateCollector` as the substrate callback) and
        concatenated until the end-of-octets marker is met. For an untagged
        value, the bytes between `substrate.markedPosition` and the current
        position are re-read and prepended to the result.

        Yields
        ------
        `SubstrateUnderrunError` objects while input is insufficient, then
        the accumulated serialization (or whatever a user-supplied
        `substrateFun` yields).
        """
        if asn1Spec is None:
            isTagged = False

        elif asn1Spec.__class__ is tagmap.TagMap:
            isTagged = tagSet in asn1Spec.tagMap

        else:
            isTagged = tagSet == asn1Spec.tagSet

        if isTagged:
            # tagged Any type -- consume header substrate
            chunk = b''

            if LOG:
                LOG('decoding as tagged ANY')

        else:
            # TODO: Seems not to be tested
            fullPosition = substrate.markedPosition
            currentPosition = substrate.tell()

            # Re-read the bytes between the marked position and where the
            # stream currently stands; they seed the accumulated result.
            substrate.seek(fullPosition, os.SEEK_SET)
            for chunk in readFromStream(substrate, currentPosition - fullPosition, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            if LOG:
                LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk))

        # Any components do not inherit initial tag
        asn1Spec = self.protoComponent

        if substrateFun and substrateFun is not self.substrateCollector:
            asn1Object = self._createComponent(
                asn1Spec, tagSet, noValue, **options)

            # NOTE(review): `chunk` is bytes while `substrate` is used as a
            # seekable stream above, and `length` is expected to be -1 on
            # this code path -- confirm that `chunk + substrate` and
            # `length + len(chunk)` are valid for the substrate types in use.
            for chunk in substrateFun(
                    asn1Object, chunk + substrate, length + len(chunk), options):
                yield chunk

            return

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        while True:  # loop over fragments

            for component in decodeFun(
                    substrate, asn1Spec, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if isinstance(component, SubstrateUnderrunError):
                    yield component

                if component is eoo.endOfOctets:
                    break

            if component is eoo.endOfOctets:
                break

            chunk += component

        # `substrateFun` was unconditionally rebound to the collector above.
        # NOTE(review): the `else` branch therefore looks unreachable unless
        # `self.substrateCollector` is falsy -- confirm.
        if substrateFun:
            yield chunk  # TODO: Weird

        else:
            yield self._createComponent(asn1Spec, tagSet, chunk, **options)

1425 

1426 

1427# character string types 

class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.UTF8String` objects."""
    protoComponent = char.UTF8String()

1430 

1431 

class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.NumericString` objects."""
    protoComponent = char.NumericString()

1434 

1435 

class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.PrintableString` objects."""
    protoComponent = char.PrintableString()

1438 

1439 

class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.TeletexString` objects."""
    protoComponent = char.TeletexString()

1442 

1443 

class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.VideotexString` objects."""
    protoComponent = char.VideotexString()

1446 

1447 

class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.IA5String` objects."""
    protoComponent = char.IA5String()

1450 

1451 

class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.GraphicString` objects."""
    protoComponent = char.GraphicString()

1454 

1455 

class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.VisibleString` objects."""
    protoComponent = char.VisibleString()

1458 

1459 

class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.GeneralString` objects."""
    protoComponent = char.GeneralString()

1462 

1463 

class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.UniversalString` objects."""
    protoComponent = char.UniversalString()

1466 

1467 

class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `char.BMPString` objects."""
    protoComponent = char.BMPString()

1470 

1471 

1472# "useful" types 

class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `useful.ObjectDescriptor` objects."""
    protoComponent = useful.ObjectDescriptor()

1475 

1476 

class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `useful.GeneralizedTime` objects."""
    protoComponent = useful.GeneralizedTime()

1479 

1480 

class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string based decoder producing `useful.UTCTime` objects."""
    protoComponent = useful.UTCTime()

1483 

1484 

# Tag-to-codec map: keys are the `tagSet` of each supported ASN.1 type,
# values are the payload decoder instances used for that tag set.
# ENUMERATED shares the INTEGER decoder; the entries flagged as "conflicts"
# are the ambiguous ones for which the tag alone does not identify the type.
TAG_MAP = {
    univ.Integer.tagSet: IntegerPayloadDecoder(),
    univ.Boolean.tagSet: BooleanPayloadDecoder(),
    univ.BitString.tagSet: BitStringPayloadDecoder(),
    univ.OctetString.tagSet: OctetStringPayloadDecoder(),
    univ.Null.tagSet: NullPayloadDecoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
    univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
    univ.Enumerated.tagSet: IntegerPayloadDecoder(),
    univ.Real.tagSet: RealPayloadDecoder(),
    univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SetOrSetOfPayloadDecoder(),  # conflicts with SetOf
    univ.Choice.tagSet: ChoicePayloadDecoder(),  # conflicts with Any
    # character string types
    char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
    char.NumericString.tagSet: NumericStringPayloadDecoder(),
    char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
    char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
    char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
    char.IA5String.tagSet: IA5StringPayloadDecoder(),
    char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
    char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
    char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
    char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
    char.BMPString.tagSet: BMPStringPayloadDecoder(),
    # useful types
    useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
    useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
    useful.UTCTime.tagSet: UTCTimePayloadDecoder()
}

1515 

# Type-to-codec map for ambiguous ASN.1 types: keyed by `typeId`, so that a
# decoder can be picked from the ASN.1 spec when the tag alone is not enough.
TYPE_MAP = {
    univ.Set.typeId: SetPayloadDecoder(),
    univ.SetOf.typeId: SetOfPayloadDecoder(),
    univ.Sequence.typeId: SequencePayloadDecoder(),
    univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
    univ.Choice.typeId: ChoicePayloadDecoder(),
    univ.Any.typeId: AnyPayloadDecoder()
}

# Register the non-ambiguous types as well for faster codec lookup; entries
# already present in TYPE_MAP (the ambiguous ones above) are never replaced.
for typeDecoder in TAG_MAP.values():
    if typeDecoder.protoComponent is None:
        continue

    typeId = typeDecoder.protoComponent.__class__.typeId
    if typeId is not None:
        TYPE_MAP.setdefault(typeId, typeDecoder)

1532 

1533 

# States of the single-item decoder state machine, numbered 0..9 in the
# order they are listed here.
(
    stDecodeTag,
    stDecodeLength,
    stGetValueDecoder,
    stGetValueDecoderByAsn1Spec,
    stGetValueDecoderByTag,
    stTryAsExplicitTag,
    stDecodeValue,
    stDumpRawValue,
    stErrorCondition,
    stStop,
) = range(10)

1544 

1545 

# Two zero octets: the byte pattern compared against when looking for the
# end-of-octets marker.
EOO_SENTINEL = b'\x00\x00'

1547 

1548 

class SingleItemDecoder(object):
    """Decode a single (possibly nested) ASN.1 item from a byte stream.

    The instance is a callable generator implementing a small state machine
    (tag -> length -> decoder selection -> value). While the stream is short
    of data it yields `SubstrateUnderrunError` objects; the last item it
    yields is the decoded value (or `eoo.endOfOctets` when the end-of-octets
    sentinel is allowed and found).
    """
    # State entered when no decoder can be chosen for a tag
    defaultErrorState = stErrorCondition
    #defaultErrorState = stDumpRawValue
    # Decoder used in the `stDumpRawValue` state
    defaultRawDecoder = AnyPayloadDecoder()

    # When False, indefinite-length encoding is rejected
    supportIndefLength = True

    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
        """Set up codec maps and per-instance tag caches.

        Parameters
        ----------
        tagMap:
            Tag-set-to-decoder map; defaults to the class-level `TAG_MAP`.
        typeMap:
            Type-ID-to-decoder map; defaults to the class-level `TYPE_MAP`.
        **ignored:
            Any other keyword arguments are accepted and discarded.
        """
        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP

        # Tag & TagSet objects caches
        self._tagCache = {}
        self._tagSetCache = {}

    def __call__(self, substrate, asn1Spec=None,
                 tagSet=None, length=None, state=stDecodeTag,
                 decodeFun=None, substrateFun=None,
                 **options):
        """Decode one item from `substrate`.

        Parameters
        ----------
        substrate:
            Seekable stream holding the serialization.
        asn1Spec:
            Optional ASN.1 type (or `TagMap`) guiding the decoder.
        tagSet, length, state:
            Allow a caller to resume decoding with a tag set / length
            already recovered and at a given state of the state machine.
        decodeFun:
            Accepted for signature compatibility; not used here.
        substrateFun:
            Optional user callback passed on to the payload decoder.
        **options:
            Decoder options. `allowEoo` and `_nestingLevel` are consumed /
            maintained here; the rest is forwarded.

        Yields
        ------
        `SubstrateUnderrunError` objects while input is insufficient, then
        the decoded value.

        Raises
        ------
        PyAsn1Error
            On excessive nesting, oversized length field, unsupported
            indefinite length, a tag that cannot be decoded, or when the
            payload decoder consumed an unexpected number of bytes.
        """

        _nestingLevel = options.get('_nestingLevel', 0)

        if _nestingLevel > MAX_NESTING_DEPTH:
            raise error.PyAsn1Error(
                'ASN.1 structure nesting depth exceeds limit (%d)' % MAX_NESTING_DEPTH
            )

        # Every nested call sees a level one greater than its parent
        options['_nestingLevel'] = _nestingLevel + 1

        allowEoo = options.pop('allowEoo', False)

        if LOG:
            LOG('decoder called at scope %s with state %d, working with up '
                'to %s octets of substrate: '
                '%s' % (debug.scope, state, length, substrate))

        # Look for end-of-octets sentinel
        if allowEoo and self.supportIndefLength:

            for eoo_candidate in readFromStream(substrate, 2, options):
                if isinstance(eoo_candidate, SubstrateUnderrunError):
                    yield eoo_candidate

            if eoo_candidate == EOO_SENTINEL:
                if LOG:
                    LOG('end-of-octets sentinel found')
                yield eoo.endOfOctets
                return

            else:
                # Not a sentinel: un-read the two octets just consumed
                substrate.seek(-2, os.SEEK_CUR)

        tagMap = self._tagMap
        typeMap = self._typeMap
        tagCache = self._tagCache
        tagSetCache = self._tagSetCache

        value = noValue

        # Remember where this item starts; payload decoders may rewind here
        substrate.markedPosition = substrate.tell()

        while state is not stStop:

            if state is stDecodeTag:
                # Decode tag
                isShortTag = True

                for firstByte in readFromStream(substrate, 1, options):
                    if isinstance(firstByte, SubstrateUnderrunError):
                        yield firstByte

                firstOctet = ord(firstByte)

                try:
                    lastTag = tagCache[firstOctet]

                except KeyError:
                    integerTag = firstOctet
                    tagClass = integerTag & 0xC0
                    tagFormat = integerTag & 0x20
                    tagId = integerTag & 0x1F

                    if tagId == 0x1F:
                        # Long form: tag number follows in base-128 octets,
                        # high bit set on all but the last one
                        isShortTag = False
                        tagId = 0

                        while True:
                            for integerByte in readFromStream(substrate, 1, options):
                                if isinstance(integerByte, SubstrateUnderrunError):
                                    yield integerByte

                            if not integerByte:
                                raise error.SubstrateUnderrunError(
                                    'Short octet stream on long tag decoding'
                                )

                            integerTag = ord(integerByte)
                            tagId <<= 7
                            tagId |= (integerTag & 0x7F)

                            if not integerTag & 0x80:
                                break

                    lastTag = tag.Tag(
                        tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                    )

                    if isShortTag:
                        # cache short tags
                        tagCache[firstOctet] = lastTag

                if tagSet is None:
                    if isShortTag:
                        try:
                            tagSet = tagSetCache[firstOctet]

                        except KeyError:
                            # base tag not recovered
                            tagSet = tag.TagSet((), lastTag)
                            tagSetCache[firstOctet] = tagSet
                    else:
                        tagSet = tag.TagSet((), lastTag)

                else:
                    tagSet = lastTag + tagSet

                state = stDecodeLength

                if LOG:
                    LOG('tag decoded into %s, decoding length' % tagSet)

            if state is stDecodeLength:
                # Decode length
                for firstOctet in readFromStream(substrate, 1, options):
                    if isinstance(firstOctet, SubstrateUnderrunError):
                        yield firstOctet

                firstOctet = ord(firstOctet)

                if firstOctet < 128:
                    length = firstOctet

                elif firstOctet > 128:
                    size = firstOctet & 0x7F

                    if size > MAX_LENGTH_OCTETS:
                        raise error.PyAsn1Error(
                            'BER length field size %d exceeds limit (%d)' % (
                                size, MAX_LENGTH_OCTETS)
                        )

                    # encoded in size bytes
                    for encodedLength in readFromStream(substrate, size, options):
                        if isinstance(encodedLength, SubstrateUnderrunError):
                            yield encodedLength

                    if len(encodedLength) != size:
                        raise error.SubstrateUnderrunError(
                            '%s<%s at %s' % (size, len(encodedLength), tagSet)
                        )

                    # Length octets form a big-endian unsigned integer
                    length = int.from_bytes(encodedLength, 'big')

                else:  # 128 means indefinite
                    length = -1

                if length == -1 and not self.supportIndefLength:
                    raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')

                state = stGetValueDecoder

                if LOG:
                    LOG('value length decoded into %d' % length)

            if state is stGetValueDecoder:
                if asn1Spec is None:
                    state = stGetValueDecoderByTag

                else:
                    state = stGetValueDecoderByAsn1Spec
            #
            # There're two ways of creating subtypes in ASN.1 what influences
            # decoder operation. These methods are:
            # 1) Either base types used in or no IMPLICIT tagging has been
            #    applied on subtyping.
            # 2) Subtype syntax drops base type information (by means of
            #    IMPLICIT tagging.
            # The first case allows for complete tag recovery from substrate
            # while the second one requires original ASN.1 type spec for
            # decoding.
            #
            # In either case a set of tags (tagSet) is coming from substrate
            # in an incremental, tag-by-tag fashion (this is the case of
            # EXPLICIT tag which is most basic). Outermost tag comes first
            # from the wire.
            #
            if state is stGetValueDecoderByTag:
                try:
                    concreteDecoder = tagMap[tagSet]

                except KeyError:
                    concreteDecoder = None

                if concreteDecoder:
                    state = stDecodeValue

                else:
                    # Retry with the first tag of the set only
                    try:
                        concreteDecoder = tagMap[tagSet[:1]]

                    except KeyError:
                        concreteDecoder = None

                    if concreteDecoder:
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)

            if state is stGetValueDecoderByAsn1Spec:

                if asn1Spec.__class__ is tagmap.TagMap:
                    try:
                        chosenSpec = asn1Spec[tagSet]

                    except KeyError:
                        chosenSpec = None

                    if LOG:
                        LOG('candidate ASN.1 spec is a map of:')

                        for firstOctet, v in asn1Spec.presentTypes.items():
                            LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))

                        if asn1Spec.skipTypes:
                            LOG('but neither of: ')
                            for firstOctet, v in asn1Spec.skipTypes.items():
                                LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
                        LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))

                elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
                    chosenSpec = asn1Spec
                    if LOG:
                        LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)

                else:
                    chosenSpec = None

                if chosenSpec is not None:
                    try:
                        # ambiguous type or just faster codec lookup
                        concreteDecoder = typeMap[chosenSpec.typeId]

                        if LOG:
                            LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))

                    except KeyError:
                        # use base type for codec lookup to recover untagged types
                        baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
                        try:
                            # base type or tagged subtype
                            concreteDecoder = tagMap[baseTagSet]

                            if LOG:
                                LOG('value decoder chosen by base %s' % (baseTagSet,))

                        except KeyError:
                            concreteDecoder = None

                    if concreteDecoder:
                        asn1Spec = chosenSpec
                        state = stDecodeValue

                    else:
                        state = stTryAsExplicitTag

                else:
                    concreteDecoder = None
                    state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)

            if state is stDecodeValue:
                if not options.get('recursiveFlag', True) and not substrateFun:  # deprecate this
                    def substrateFun(asn1Object, _substrate, _length, _options):
                        """Legacy hack to keep the recursiveFlag=False option supported.

                        The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
                        of the old recursiveFlag=False option. Users should pass their callback instead of using
                        recursiveFlag.
                        """
                        yield asn1Object

                original_position = substrate.tell()

                if length == -1:  # indef length
                    for value in concreteDecoder.indefLenValueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                else:
                    for value in concreteDecoder.valueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                    # Definite length: the payload decoder must have consumed
                    # exactly the announced number of octets
                    bytesRead = substrate.tell() - original_position
                    if not substrateFun and bytesRead != length:
                        raise PyAsn1Error(
                            "Read %s bytes instead of expected %s." % (bytesRead, length))
                    elif substrateFun and bytesRead > length:
                        # custom substrateFun may be used for partial decoding, reading less is expected there
                        raise PyAsn1Error(
                            "Read %s bytes are more than expected %s." % (bytesRead, length))

                if LOG:
                    LOG('codec %s yields type %s, value:\n%s\n...' % (
                        concreteDecoder.__class__.__name__, value.__class__.__name__,
                        isinstance(value, base.Asn1Item) and value.prettyPrint() or value))

                state = stStop
                break

            if state is stTryAsExplicitTag:
                if (tagSet and
                        tagSet[0].tagFormat == tag.tagFormatConstructed and
                        tagSet[0].tagClass != tag.tagClassUniversal):
                    # Assume explicit tagging
                    concreteDecoder = rawPayloadDecoder
                    state = stDecodeValue

                else:
                    concreteDecoder = None
                    state = self.defaultErrorState

                if LOG:
                    LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))

            if state is stDumpRawValue:
                concreteDecoder = self.defaultRawDecoder

                if LOG:
                    LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)

                state = stDecodeValue

            if state is stErrorCondition:
                raise error.PyAsn1Error(
                    '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
                )

        if LOG:
            debug.scope.pop()
            LOG('decoder left scope %s, call completed' % debug.scope)

        yield value

1924 

1925 

class StreamingDecoder(object):
    """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects.

    On each iteration, consume whatever BER/CER/DER serialization is
    available in the `substrate` stream-like object and turns it into
    one or more, possibly nested, ASN.1 objects.

    Parameters
    ----------
    substrate: :py:class:`file`, :py:class:`io.BytesIO`
        BER/CER/DER serialization in form of a byte stream

    Keyword Args
    ------------
    asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
        A pyasn1 type object to act as a template guiding the decoder.
        Depending on the ASN.1 structure being decoded, `asn1Spec` may
        or may not be required. One of the reasons why `asn1Spec` may
        be required is that ASN.1 structure is encoded in the *IMPLICIT*
        tagging mode.

    Yields
    ------
    : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
        Decoded ASN.1 object (possibly, nested) or
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
        insufficient BER/CER/DER serialization on input to fully recover ASN.1
        objects from it.

        In the latter case the caller is advised to ensure some more data in
        the input stream, then call the iterator again. The decoder will resume
        the decoding process using the newly arrived data.

        The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError`
        object might hold a reference to the partially populated ASN.1 object
        being reconstructed.

    Raises
    ------
    ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
        `PyAsn1Error` on deserialization error, `EndOfStreamError` on
        premature stream closure.

    Examples
    --------
    Decode BER serialisation without ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> for asn1Object in StreamingDecoder(stream):
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3

    Decode BER serialisation with ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> schema = SequenceOf(componentType=Integer())
        >>>
        >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
        >>> for asn1Object in decoder:
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3
    """

    # Class used to decode each individual top-level item
    SINGLE_ITEM_DECODER = SingleItemDecoder

    def __init__(self, substrate, asn1Spec=None, **options):
        """Wrap `substrate` into a seekable stream and remember the options.

        `options` are passed both to the `SINGLE_ITEM_DECODER` constructor
        and to every decoding call made while iterating.
        """
        self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
        self._substrate = asSeekableStream(substrate)
        self._asn1Spec = asn1Spec
        self._options = options

    def __iter__(self):
        """Yield decoded objects (or underrun markers) until end of stream."""
        while True:
            for asn1Object in self._singleItemDecoder(
                    self._substrate, self._asn1Spec, **self._options):
                yield asn1Object

            # Only the first item produced by the end-of-stream probe is
            # looked at.
            for chunk in isEndOfStream(self._substrate):
                if isinstance(chunk, SubstrateUnderrunError):
                    # Hand the underrun marker itself to the caller, as the
                    # class contract promises, rather than a bare None.
                    yield chunk

                break

            if chunk:
                break

2023 

2024 

class Decoder(object):
    """Create a BER decoder object.

    Parse a BER/CER/DER octet-stream into one, possibly nested, ASN.1 object.
    """
    STREAMING_DECODER = StreamingDecoder

    @classmethod
    def __call__(cls, substrate, asn1Spec=None, **options):
        """Turn a BER/CER/DER octet stream into an ASN.1 object.

        Takes a BER/CER/DER octet-stream in the form of :py:class:`bytes`
        and decodes it into an ASN.1 object
        (e.g. a :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
        may be a scalar or an arbitrarily nested structure.

        Parameters
        ----------
        substrate: :py:class:`bytes`
            BER/CER/DER octet-stream to parse

        Keyword Args
        ------------
        asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
            A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
            derivative) to act as a template guiding the decoder.
            Depending on the ASN.1 structure being decoded, `asn1Spec` may or
            may not be required. The most common reason for it to be required
            is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.

        substrateFun: :py:class:`Union[
                Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
                         Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
                Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
                         Generator[Union[pyasn1.type.base.PyAsn1Item,
                                         pyasn1.error.SubstrateUnderrunError],
                                   None, None]]
            ]`
            User callback meant to generalize special use cases like
            non-recursive or partial decoding. A 3-arg non-streaming variant
            is supported for backwards compatibility in addition to the newer
            4-arg streaming variant.
            The callback receives the uninitialized object recovered from the
            substrate as 1st argument, the uninterpreted payload as 2nd
            argument, and the length of the uninterpreted payload as 3rd
            argument. The streaming variant additionally receives the
            decode(..., **options) kwargs as 4th argument.
            The non-streaming variant shall return, as 1st item, an object
            that will be propagated as the decode() return value and, as 2nd
            item, the remaining payload for further decode passes.
            The streaming variant shall yield an object that will be
            propagated as the decode() return value, and leave the remaining
            payload in the stream.

        Returns
        -------
        : :py:class:`tuple`
            A tuple of the :py:class:`~pyasn1.type.base.PyAsn1Item` object
            recovered from the BER/CER/DER substrate and the unprocessed
            trailing portion of the `substrate` (may be empty)

        Raises
        ------
        : :py:class:`~pyasn1.error.PyAsn1Error`
            :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient
            input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error.

        Examples
        --------
        Decode BER/CER/DER serialisation without ASN.1 schema

        .. code-block:: pycon

           >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
           >>> str(s)
           SequenceOf:
            1 2 3

        Decode BER/CER/DER serialisation with ASN.1 schema

        .. code-block:: pycon

           >>> seq = SequenceOf(componentType=Integer())
           >>> s, unprocessed = decode(
                b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
           >>> str(s)
           SequenceOf:
            1 2 3

        """
        substrate = asSeekableStream(substrate)

        if "substrateFun" in options:
            userCallback = options["substrateFun"]

            def substrateFunWrapper(asn1Object, substrate, length, options=None):
                """Accept both 0.4-style and 0.5-style user callbacks.

                The substrateFun API changed in 0.5 to work with streaming
                decoders. The callback is first invoked the streaming way
                (four arguments); if that call itself is rejected with a
                TypeError, the callback is treated as a non-streaming v0.4
                one and adapted for streaming on the fly.
                """
                try:
                    produced = userCallback(asn1Object, substrate, length, options)
                except TypeError as callError:
                    # A traceback deeper than this frame means the TypeError
                    # was raised from inside the user provided function.
                    if callError.__traceback__.tb_next:
                        raise
                    # invariant maintained at Decoder.__call__ entry
                    assert isinstance(substrate, io.BytesIO)  # nosec assert_used
                    produced = Decoder._callSubstrateFunV4asV5(
                        userCallback, asn1Object, substrate, length)
                for item in produced:
                    yield item

            options["substrateFun"] = substrateFunWrapper

        streamingDecoder = cls.STREAMING_DECODER(substrate, asn1Spec, **options)

        # Only the first item produced by the streaming decoder is consumed.
        for decoded in streamingDecoder:
            if isinstance(decoded, SubstrateUnderrunError):
                raise error.SubstrateUnderrunError('Short substrate on input')

            try:
                remainder = next(readFromStream(substrate))
            except error.EndOfStreamError:
                remainder = b''

            return decoded, remainder

    @staticmethod
    def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length):
        """Drive a 3-argument (v0.4 style) callback as a generator.

        Everything left in *substrate* is read and passed to
        *substrateFunV4*; a *length* of -1 is replaced by the number of
        bytes read. The payload the callback returns as its second item is
        written back at the current stream position, the stream is truncated
        there, and the position is moved back to the start of what was just
        written. The callback's first item is then yielded.
        """
        payload = substrate.read()
        effectiveLength = len(payload) if length == -1 else length
        value, leftover = substrateFunV4(asn1Object, payload, effectiveLength)
        written = substrate.write(leftover)
        substrate.truncate()
        substrate.seek(-written, os.SEEK_CUR)
        yield value

2163 

2164#: Turns BER octet stream into an ASN.1 object. 

2165#: 

2166#: Takes a BER octet-stream and decodes it into an ASN.1 object 

2167#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

2168#: may be a scalar or an arbitrary nested structure. 

2169#: 

2170#: Parameters 

2171#: ---------- 

2172#: substrate: :py:class:`bytes` 

2173#: BER octet-stream 

2174#: 

2175#: Keyword Args 

2176#: ------------ 

2177#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 

2178#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure 

2179#: being decoded, *asn1Spec* may or may not be required. The most common reason for 

2180#: it to be required is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

2181#: 

2182#: Returns 

2183#: ------- 

2184#: : :py:class:`tuple` 

2185#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

2186#: and the unprocessed trailing portion of the *substrate* (may be empty) 

2187#: 

2188#: Raises 

2189#: ------ 

2190#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError 

2191#: On decoding errors 

2192#: 

2193#: Notes 

2194#: ----- 

2195#: This function is deprecated. Please use :py:class:`Decoder` or 

2196#: :py:class:`StreamingDecoder` class instance. 

2197#: 

2198#: Examples 

2199#: -------- 

2200#: Decode BER serialisation without ASN.1 schema 

2201#: 

2202#: .. code-block:: pycon 

2203#: 

2204#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2205#: >>> str(s) 

2206#: SequenceOf: 

2207#: 1 2 3 

2208#: 

2209#: Decode BER serialisation with ASN.1 schema 

2210#: 

2211#: .. code-block:: pycon 

2212#: 

2213#: >>> seq = SequenceOf(componentType=Integer()) 

2214#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2215#: >>> str(s) 

2216#: SequenceOf: 

2217#: 1 2 3 

2218#: 

2219decode = Decoder() 

2220 

def __getattr__(attr: str):
    """Serve deprecated module attribute names (module-level ``__getattr__``).

    ``tagMap`` and ``typeMap`` are looked up in this module's globals under
    their replacement names ``TAG_MAP`` and ``TYPE_MAP`` after a
    :py:class:`DeprecationWarning` is issued with ``stacklevel=2``.

    Parameters
    ----------
    attr: :py:class:`str`
        Name of the module attribute being requested.

    Returns
    -------
    : object
        The module global registered under the replacement name.

    Raises
    ------
    AttributeError
        For any name other than the two deprecated aliases. The message
        follows the usual ``module '...' has no attribute '...'`` wording
        instead of carrying only the bare attribute name.
    """
    if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr):
        warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning, stacklevel=2)
        return globals()[newAttr]
    raise AttributeError(f"module {__name__!r} has no attribute {attr!r}")