Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyasn1/codec/ber/decoder.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1061 statements  

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import io 

8import os 

9import sys 

10import warnings 

11 

12from pyasn1 import debug 

13from pyasn1 import error 

14from pyasn1.codec.ber import eoo 

15from pyasn1.codec.streaming import asSeekableStream 

16from pyasn1.codec.streaming import isEndOfStream 

17from pyasn1.codec.streaming import peekIntoStream 

18from pyasn1.codec.streaming import readFromStream 

19from pyasn1.compat import _MISSING 

20from pyasn1.error import PyAsn1Error 

21from pyasn1.type import base 

22from pyasn1.type import char 

23from pyasn1.type import tag 

24from pyasn1.type import tagmap 

25from pyasn1.type import univ 

26from pyasn1.type import useful 

27 

# Names exported by a star-import of this module.
__all__ = ['StreamingDecoder', 'Decoder', 'decode']

# Debug logger handle registered under the decoder flag; callers in this
# module guard every use with `if LOG:`.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER)

# Module-level alias of `base.noValue`; compared by identity below.
noValue = base.noValue

# Module-level alias of `error.SubstrateUnderrunError`; used in the
# `isinstance` checks throughout the decoders.
SubstrateUnderrunError = error.SubstrateUnderrunError

35 

36 

class AbstractPayloadDecoder(object):
    """Common base of the payload decoders defined in this module.

    Both decoding entry points raise ``PyAsn1Error`` here; concrete
    decoders are expected to override the one(s) they support.
    """

    protoComponent = None

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a value whose byte length is known.

        The decoder is allowed to consume as many bytes as necessary.
        This base implementation always raises ``PyAsn1Error``.
        """
        # TODO: Seems more like an NotImplementedError?
        message = 'SingleItemDecoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode a value whose length is undefined.

        The decoder is allowed to consume as many bytes as necessary.
        This base implementation always raises ``PyAsn1Error``.
        """
        # TODO: Seems more like an NotImplementedError?
        message = 'Indefinite length mode decoder not implemented for %s' % (tagSet,)
        raise error.PyAsn1Error(message)

    @staticmethod
    def _passAsn1Object(asn1Object, options):
        """Store `asn1Object` under the 'asn1Object' key unless already set.

        The `options` mapping is modified in place and returned.
        """
        options.setdefault('asn1Object', asn1Object)
        return options

66 

67 

class AbstractSimplePayloadDecoder(AbstractPayloadDecoder):
    """Base for decoders of non-constructed (simple) ASN.1 types."""

    @staticmethod
    def substrateCollector(asn1Object, substrate, length, options):
        """Yield everything `readFromStream` produces for `length` octets.

        `asn1Object` is accepted for signature compatibility and not used.
        """
        yield from readFromStream(substrate, length, options)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Wrap a decoded `value` into the object handed back to the caller.

        Returns `value` itself when the 'native' option is truthy; otherwise
        a clone of `self.protoComponent` (no `asn1Spec` given), `asn1Spec`
        itself (`value` is `noValue`), or a clone of `asn1Spec` carrying
        `value`.
        """
        if options.get('native'):
            return value

        if asn1Spec is None:
            return self.protoComponent.clone(value, tagSet=tagSet)

        if value is noValue:
            return asn1Spec

        return asn1Spec.clone(value)

83 

84 

class RawPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder that delegates to `substrateFun` or back to `decodeFun`."""

    protoComponent = univ.Any('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield the results of `substrateFun` if given, else of `decodeFun`."""
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        yield from decodeFun(substrate, asn1Spec, tagSet, length, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Yield decoded items until the end-of-octets marker is seen.

        With `substrateFun` given, its results are yielded instead.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, '', **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        while True:
            items = decodeFun(
                substrate, asn1Spec, tagSet, length,
                allowEoo=True, **options)

            for item in items:
                # The end-of-octets sentinel terminates the whole generator.
                if item is eoo.endOfOctets:
                    return

                yield item


rawPayloadDecoder = RawPayloadDecoder()

127 

128 

class IntegerPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for INTEGER payloads (big-endian, two's complement)."""

    protoComponent = univ.Integer(0)

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Read `length` octets and yield the integer they encode.

        An empty payload decodes to 0. Raises ``PyAsn1Error`` when the
        outermost tag is not in simple format.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        # Underrun markers are passed up to the caller; the last item
        # produced by the stream reader is the payload itself.
        for octets in readFromStream(substrate, length, options):
            if isinstance(octets, SubstrateUnderrunError):
                yield octets

        number = int.from_bytes(bytes(octets), 'big', signed=True) if octets else 0

        yield self._createComponent(asn1Spec, tagSet, number, **options)

151 

152 

class BooleanPayloadDecoder(IntegerPayloadDecoder):
    """Decoder for BOOLEAN payloads, reusing the INTEGER octet parsing."""

    protoComponent = univ.Boolean(0)

    def _createComponent(self, asn1Spec, tagSet, value, **options):
        """Collapse any truthy `value` to 1 and any falsy one to 0."""
        normalized = 1 if value else 0
        return IntegerPayloadDecoder._createComponent(
            self, asn1Spec, tagSet, normalized, **options)

159 

160 

class BitStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for BIT STRING payloads in primitive or constructed form."""

    protoComponent = univ.BitString(())
    supportConstructedForm = True

    def _appendFragment(self, bitString, fragment):
        """Return `bitString` extended by one raw inner `fragment`.

        The first octet of `fragment` is the count of unused trailing bits,
        the remainder is the bit data. Raises ``PyAsn1Error`` when the
        fragment is empty (previously an unhandled ``IndexError``) or when
        the trailing-bits count exceeds 7.
        """
        if not fragment:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        trailingBits = fragment[0]
        if trailingBits > 7:
            raise error.PyAsn1Error(
                'Trailing bits overflow %s' % trailingBits
            )

        return self.protoComponent.fromOctetString(
            fragment[1:], internalFormat=True,
            prepend=bitString, padding=trailingBits
        )

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length BIT STRING.

        With `substrateFun` given, its results are yielded instead of a
        decoded value. Raises ``PyAsn1Error`` on an empty payload, on a
        trailing-bits count above 7, on an empty inner fragment, or when
        the constructed form is used but `supportConstructedForm` is false.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        if not length:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        for chunk in isEndOfStream(substrate):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if chunk:
            raise error.PyAsn1Error('Empty BIT STRING substrate')

        if tagSet[0].tagFormat == tag.tagFormatSimple:  # XXX what tag to check?

            # The leading octet carries the number of unused trailing bits.
            for trailingBits in readFromStream(substrate, 1, options):
                if isinstance(trailingBits, SubstrateUnderrunError):
                    yield trailingBits

            trailingBits = ord(trailingBits)
            if trailingBits > 7:
                raise error.PyAsn1Error(
                    'Trailing bits overflow %s' % trailingBits
                )

            for chunk in readFromStream(substrate, length - 1, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            value = self.protoComponent.fromOctetString(
                chunk, internalFormat=True, padding=trailingBits)

            yield self._createComponent(asn1Spec, tagSet, value, **options)

            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited '
                                    'at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)

        current_position = substrate.tell()

        while substrate.tell() - current_position < length:
            for component in decodeFun(
                    substrate, self.protoComponent, substrateFun=substrateFun,
                    **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

            bitString = self._appendFragment(bitString, component)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length BIT STRING fragment by fragment.

        Fragments are consumed until the end-of-octets marker is decoded.
        With `substrateFun` given, its results are yielded instead.
        Raises ``PyAsn1Error`` on an empty fragment or a trailing-bits
        count above 7.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)

            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        bitString = self.protoComponent.fromOctetString(b'', internalFormat=True)

        while True:  # loop over fragments

            for component in decodeFun(
                    substrate, self.protoComponent, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if component is eoo.endOfOctets:
                    break

                if isinstance(component, SubstrateUnderrunError):
                    yield component

            # The inner `break` only leaves the for-loop; leave the
            # fragment loop as well once end-of-octets was seen.
            if component is eoo.endOfOctets:
                break

            bitString = self._appendFragment(bitString, component)

        yield self._createComponent(asn1Spec, tagSet, bitString, **options)

290 

291 

class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OCTET STRING payloads in primitive or constructed form."""

    protoComponent = univ.OctetString('')
    supportConstructedForm = True

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length OCTET STRING.

        With `substrateFun` given, its results are yielded instead of a
        decoded value. Raises ``PyAsn1Error`` when the constructed form is
        used but `supportConstructedForm` is false.
        """
        if substrateFun:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        isPrimitive = tagSet[0].tagFormat == tag.tagFormatSimple  # XXX what tag to check?

        if isPrimitive:
            for payload in readFromStream(substrate, length, options):
                if isinstance(payload, SubstrateUnderrunError):
                    yield payload

            yield self._createComponent(asn1Spec, tagSet, payload, **options)
            return

        if not self.supportConstructedForm:
            raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__)

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        collector = self.substrateCollector

        assembled = b''

        start = substrate.tell()
        end = start + length

        while substrate.tell() < end:
            for fragment in decodeFun(
                    substrate, self.protoComponent, substrateFun=collector,
                    **options):
                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

            # Plain `+` is kept on purpose: it also accepts fragments that
            # are not raw bytes but implement reflected addition.
            assembled = assembled + fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length OCTET STRING fragment by fragment.

        A `substrateFun` other than this decoder's own collector takes over
        and its results are yielded instead. Otherwise fragments are
        concatenated until the end-of-octets marker is decoded.
        """
        delegate = substrateFun and substrateFun is not self.substrateCollector

        if delegate:
            asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options)
            yield from substrateFun(asn1Object, substrate, length, options)
            return

        # All inner fragments are of the same type, treat them as octet string
        collector = self.substrateCollector

        assembled = b''

        while True:  # loop over fragments

            for fragment in decodeFun(
                    substrate, self.protoComponent, substrateFun=collector,
                    allowEoo=True, **options):

                if isinstance(fragment, SubstrateUnderrunError):
                    yield fragment

                if fragment is eoo.endOfOctets:
                    break

            # The `break` above only exits the for-loop.
            if fragment is eoo.endOfOctets:
                break

            assembled = assembled + fragment

        yield self._createComponent(asn1Spec, tagSet, assembled, **options)

376 

377 

class NullPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for NULL, whose payload must be empty."""

    protoComponent = univ.Null('')

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Yield a Null component, rejecting any non-empty payload.

        Raises ``PyAsn1Error`` when the outermost tag is not in simple
        format or when payload octets are present.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for payload in readFromStream(substrate, length, options):
            if isinstance(payload, SubstrateUnderrunError):
                yield payload

        # The component is built before the payload check, as in the
        # original statement order.
        nullObject = self._createComponent(asn1Spec, tagSet, '', **options)

        if payload:
            raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length)

        yield nullObject

399 

400 

class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for OBJECT IDENTIFIER payloads."""

    protoComponent = univ.ObjectIdentifier(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode base-128 sub-identifiers and yield the OID as a tuple value.

        Raises ``PyAsn1Error`` on a non-simple tag format, an empty payload
        or a sub-identifier starting with octet 0x80, and
        ``SubstrateUnderrunError`` when the payload ends in the middle of a
        multi-octet sub-identifier.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            raise error.PyAsn1Error('Empty substrate')

        # Collected in a list: growing a tuple with `+=` copies it on every
        # arc, which is quadratic in the number of arcs.
        arcs = []
        index = 0
        substrateLen = len(chunk)
        while index < substrateLen:
            subId = chunk[index]
            index += 1
            if subId < 128:
                arcs.append(subId)
            elif subId > 128:
                # Construct subid from a number of octets
                nextSubId = subId
                subId = 0
                while nextSubId >= 128:
                    subId = (subId << 7) + (nextSubId & 0x7F)
                    if index >= substrateLen:
                        raise error.SubstrateUnderrunError(
                            'Short substrate for sub-OID past %s' % (tuple(arcs),)
                        )
                    nextSubId = chunk[index]
                    index += 1
                arcs.append((subId << 7) + nextSubId)
            else:
                # subId == 128
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding')

        # Decode two leading arcs. The first sub-identifier is never
        # negative, so these three ranges cover every possible value.
        first = arcs[0]
        rest = tuple(arcs[1:])
        if first <= 39:
            oid = (0, first) + rest
        elif first <= 79:
            oid = (1, first - 40) + rest
        else:
            oid = (2, first - 80) + rest

        yield self._createComponent(asn1Spec, tagSet, oid, **options)

457 

458 

class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for RELATIVE-OID payloads."""

    protoComponent = univ.RelativeOID(())

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode base-128 sub-identifiers and yield them as a tuple value.

        Raises ``PyAsn1Error`` on a non-simple tag format, an empty payload
        or a sub-identifier starting with octet 0x80, and
        ``SubstrateUnderrunError`` when the payload ends in the middle of a
        multi-octet sub-identifier.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            raise error.PyAsn1Error('Empty substrate')

        # Collected in a list: growing a tuple with `+=` copies it on every
        # arc, which is quadratic in the number of arcs.
        arcs = []
        index = 0
        substrateLen = len(chunk)
        while index < substrateLen:
            subId = chunk[index]
            index += 1
            if subId < 128:
                arcs.append(subId)
            elif subId > 128:
                # Construct subid from a number of octets
                nextSubId = subId
                subId = 0
                while nextSubId >= 128:
                    subId = (subId << 7) + (nextSubId & 0x7F)
                    if index >= substrateLen:
                        raise error.SubstrateUnderrunError(
                            'Short substrate for sub-OID past %s' % (tuple(arcs),)
                        )
                    nextSubId = chunk[index]
                    index += 1
                arcs.append((subId << 7) + nextSubId)
            else:
                # subId == 128
                # ASN.1 spec forbids leading zeros (0x80) in OID
                # encoding, tolerating it opens a vulnerability. See
                # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
                # page 7
                raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')

        reloid = tuple(arcs)

        yield self._createComponent(asn1Spec, tagSet, reloid, **options)

505 

506 

class RealPayloadDecoder(AbstractSimplePayloadDecoder):
    """Decoder for REAL payloads (binary, infinite and character forms)."""

    protoComponent = univ.Real()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a REAL payload and yield the resulting component.

        An empty payload decodes to 0.0. The first octet selects the
        encoding: bit 0x80 set means binary, otherwise bit 0x40 set means
        an infinity, otherwise a character (NR1/NR2/NR3) form.
        """
        if tagSet[0].tagFormat != tag.tagFormatSimple:
            raise error.PyAsn1Error('Simple tag format expected')

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        if not chunk:
            yield self._createComponent(asn1Spec, tagSet, 0.0, **options)
            return

        fo = chunk[0]
        chunk = chunk[1:]

        if fo & 0x80:  # binary encoding
            if not chunk:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding binary encoded REAL')

            # Low two bits give the exponent length; the value 4 means the
            # length is carried in the next octet instead.
            n = (fo & 0x03) + 1

            if n == 4:
                n = chunk[0]
                chunk = chunk[1:]

            eo = chunk[:n]
            chunk = chunk[n:]

            if not eo or not chunk:
                raise error.PyAsn1Error('Real exponent screwed')

            # Exponent: start from -1 when the sign bit is set so the
            # accumulated value comes out negative.
            e = -1 if eo[0] & 0x80 else 0

            for octet in eo:
                e = (e << 8) | octet

            b = fo >> 4 & 0x03  # base bits

            if b > 2:
                raise error.PyAsn1Error('Illegal Real base')

            if b == 1:  # encbase = 8
                e *= 3

            elif b == 2:  # encbase = 16
                e *= 4

            # Mantissa: remaining octets, most significant first.
            p = 0

            for octet in chunk:
                p = (p << 8) | octet

            if fo & 0x40:  # sign bit
                p = -p

            sf = fo >> 2 & 0x03  # scale bits
            p *= 2 ** sf
            value = (p, 2, e)

        elif fo & 0x40:  # infinite value
            if LOG:
                LOG('decoding infinite REAL')

            value = '-inf' if fo & 0x01 else 'inf'

        elif fo & 0xc0 == 0:  # character encoding
            if not chunk:
                raise error.PyAsn1Error("Incomplete floating-point value")

            if LOG:
                LOG('decoding character encoded REAL')

            nr = fo & 0x3

            try:
                if nr == 0x1:  # NR1
                    value = (int(chunk), 10, 0)

                elif nr in (0x2, 0x3):  # NR2 / NR3
                    value = float(chunk)

                else:
                    raise error.SubstrateUnderrunError(
                        'Unknown NR (tag %s)' % fo
                    )

            except ValueError:
                raise error.SubstrateUnderrunError(
                    'Bad character Real syntax'
                )

        else:
            raise error.SubstrateUnderrunError(
                'Unknown encoding (tag %s)' % fo
            )

        yield self._createComponent(asn1Spec, tagSet, value, **options)

615 

616 

class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder):
    """Base class for the constructed-type payload decoders.

    Adds no behaviour of its own; it only restates `protoComponent` as None.
    """
    protoComponent = None

619 

620 

621class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder): 

622 protoRecordComponent = None 

623 protoSequenceComponent = None 

624 

625 def _getComponentTagMap(self, asn1Object, idx): 

626 raise NotImplementedError 

627 

628 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 

629 raise NotImplementedError 

630 

631 def _decodeComponentsSchemaless( 

632 self, substrate, tagSet=None, decodeFun=None, 

633 length=None, **options): 

634 

635 asn1Object = None 

636 

637 components = [] 

638 componentTypes = set() 

639 

640 original_position = substrate.tell() 

641 

642 while length == -1 or substrate.tell() < original_position + length: 

643 for component in decodeFun(substrate, **options): 

644 if isinstance(component, SubstrateUnderrunError): 

645 yield component 

646 

647 if length == -1 and component is eoo.endOfOctets: 

648 break 

649 

650 components.append(component) 

651 componentTypes.add(component.tagSet) 

652 

653 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF 

654 # The heuristics is: 

655 # * 1+ components of different types -> likely SEQUENCE/SET 

656 # * otherwise -> likely SEQUENCE OF/SET OF 

657 if len(componentTypes) > 1: 

658 protoComponent = self.protoRecordComponent 

659 

660 else: 

661 protoComponent = self.protoSequenceComponent 

662 

663 asn1Object = protoComponent.clone( 

664 # construct tagSet from base tag from prototype ASN.1 object 

665 # and additional tags recovered from the substrate 

666 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags) 

667 ) 

668 

669 if LOG: 

670 LOG('guessed %r container type (pass `asn1Spec` to guide the ' 

671 'decoder)' % asn1Object) 

672 

673 for idx, component in enumerate(components): 

674 asn1Object.setComponentByPosition( 

675 idx, component, 

676 verifyConstraints=False, 

677 matchTags=False, matchConstraints=False 

678 ) 

679 

680 yield asn1Object 

681 

682 def valueDecoder(self, substrate, asn1Spec, 

683 tagSet=None, length=None, state=None, 

684 decodeFun=None, substrateFun=None, 

685 **options): 

686 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

687 raise error.PyAsn1Error('Constructed tag format expected') 

688 

689 original_position = substrate.tell() 

690 

691 if substrateFun: 

692 if asn1Spec is not None: 

693 asn1Object = asn1Spec.clone() 

694 

695 elif self.protoComponent is not None: 

696 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

697 

698 else: 

699 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

700 

701 for chunk in substrateFun(asn1Object, substrate, length, options): 

702 yield chunk 

703 

704 return 

705 

706 if asn1Spec is None: 

707 for asn1Object in self._decodeComponentsSchemaless( 

708 substrate, tagSet=tagSet, decodeFun=decodeFun, 

709 length=length, **options): 

710 if isinstance(asn1Object, SubstrateUnderrunError): 

711 yield asn1Object 

712 

713 if substrate.tell() < original_position + length: 

714 if LOG: 

715 for trailing in readFromStream(substrate, context=options): 

716 if isinstance(trailing, SubstrateUnderrunError): 

717 yield trailing 

718 

719 LOG('Unused trailing %d octets encountered: %s' % ( 

720 len(trailing), debug.hexdump(trailing))) 

721 

722 yield asn1Object 

723 

724 return 

725 

726 asn1Object = asn1Spec.clone() 

727 asn1Object.clear() 

728 

729 options = self._passAsn1Object(asn1Object, options) 

730 

731 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

732 

733 namedTypes = asn1Spec.componentType 

734 

735 isSetType = asn1Spec.typeId == univ.Set.typeId 

736 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

737 

738 if LOG: 

739 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

740 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

741 asn1Spec)) 

742 

743 seenIndices = set() 

744 idx = 0 

745 while substrate.tell() - original_position < length: 

746 if not namedTypes: 

747 componentType = None 

748 

749 elif isSetType: 

750 componentType = namedTypes.tagMapUnique 

751 

752 else: 

753 try: 

754 if isDeterministic: 

755 componentType = namedTypes[idx].asn1Object 

756 

757 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

758 componentType = namedTypes.getTagMapNearPosition(idx) 

759 

760 else: 

761 componentType = namedTypes[idx].asn1Object 

762 

763 except IndexError: 

764 raise error.PyAsn1Error( 

765 'Excessive components decoded at %r' % (asn1Spec,) 

766 ) 

767 

768 for component in decodeFun(substrate, componentType, **options): 

769 if isinstance(component, SubstrateUnderrunError): 

770 yield component 

771 

772 if not isDeterministic and namedTypes: 

773 if isSetType: 

774 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

775 

776 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

777 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

778 

779 asn1Object.setComponentByPosition( 

780 idx, component, 

781 verifyConstraints=False, 

782 matchTags=False, matchConstraints=False 

783 ) 

784 

785 seenIndices.add(idx) 

786 idx += 1 

787 

788 if LOG: 

789 LOG('seen component indices %s' % seenIndices) 

790 

791 if namedTypes: 

792 if not namedTypes.requiredComponents.issubset(seenIndices): 

793 raise error.PyAsn1Error( 

794 'ASN.1 object %s has uninitialized ' 

795 'components' % asn1Object.__class__.__name__) 

796 

797 if namedTypes.hasOpenTypes: 

798 

799 openTypes = options.get('openTypes', {}) 

800 

801 if LOG: 

802 LOG('user-specified open types map:') 

803 

804 for k, v in openTypes.items(): 

805 LOG('%s -> %r' % (k, v)) 

806 

807 if openTypes or options.get('decodeOpenTypes', False): 

808 

809 for idx, namedType in enumerate(namedTypes.namedTypes): 

810 if not namedType.openType: 

811 continue 

812 

813 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

814 continue 

815 

816 governingValue = asn1Object.getComponentByName( 

817 namedType.openType.name 

818 ) 

819 

820 try: 

821 openType = openTypes[governingValue] 

822 

823 except KeyError: 

824 

825 if LOG: 

826 LOG('default open types map of component ' 

827 '"%s.%s" governed by component "%s.%s"' 

828 ':' % (asn1Object.__class__.__name__, 

829 namedType.name, 

830 asn1Object.__class__.__name__, 

831 namedType.openType.name)) 

832 

833 for k, v in namedType.openType.items(): 

834 LOG('%s -> %r' % (k, v)) 

835 

836 try: 

837 openType = namedType.openType[governingValue] 

838 

839 except KeyError: 

840 if LOG: 

841 LOG('failed to resolve open type by governing ' 

842 'value %r' % (governingValue,)) 

843 continue 

844 

845 if LOG: 

846 LOG('resolved open type %r by governing ' 

847 'value %r' % (openType, governingValue)) 

848 

849 containerValue = asn1Object.getComponentByPosition(idx) 

850 

851 if containerValue.typeId in ( 

852 univ.SetOf.typeId, univ.SequenceOf.typeId): 

853 

854 for pos, containerElement in enumerate( 

855 containerValue): 

856 

857 stream = asSeekableStream(containerValue[pos].asOctets()) 

858 

859 for component in decodeFun(stream, asn1Spec=openType, **options): 

860 if isinstance(component, SubstrateUnderrunError): 

861 yield component 

862 

863 containerValue[pos] = component 

864 

865 else: 

866 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

867 

868 for component in decodeFun(stream, asn1Spec=openType, **options): 

869 if isinstance(component, SubstrateUnderrunError): 

870 yield component 

871 

872 asn1Object.setComponentByPosition(idx, component) 

873 

874 else: 

875 inconsistency = asn1Object.isInconsistent 

876 if inconsistency: 

877 raise error.PyAsn1Error( 

878 f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent") 

879 

880 else: 

881 componentType = asn1Spec.componentType 

882 

883 if LOG: 

884 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

885 

886 idx = 0 

887 

888 while substrate.tell() - original_position < length: 

889 for component in decodeFun(substrate, componentType, **options): 

890 if isinstance(component, SubstrateUnderrunError): 

891 yield component 

892 

893 asn1Object.setComponentByPosition( 

894 idx, component, 

895 verifyConstraints=False, 

896 matchTags=False, matchConstraints=False 

897 ) 

898 

899 idx += 1 

900 

901 yield asn1Object 

902 

903 def indefLenValueDecoder(self, substrate, asn1Spec, 

904 tagSet=None, length=None, state=None, 

905 decodeFun=None, substrateFun=None, 

906 **options): 

907 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

908 raise error.PyAsn1Error('Constructed tag format expected') 

909 

910 if substrateFun is not None: 

911 if asn1Spec is not None: 

912 asn1Object = asn1Spec.clone() 

913 

914 elif self.protoComponent is not None: 

915 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

916 

917 else: 

918 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

919 

920 for chunk in substrateFun(asn1Object, substrate, length, options): 

921 yield chunk 

922 

923 return 

924 

925 if asn1Spec is None: 

926 for asn1Object in self._decodeComponentsSchemaless( 

927 substrate, tagSet=tagSet, decodeFun=decodeFun, 

928 length=length, **dict(options, allowEoo=True)): 

929 if isinstance(asn1Object, SubstrateUnderrunError): 

930 yield asn1Object 

931 

932 yield asn1Object 

933 

934 return 

935 

936 asn1Object = asn1Spec.clone() 

937 asn1Object.clear() 

938 

939 options = self._passAsn1Object(asn1Object, options) 

940 

941 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

942 

943 namedTypes = asn1Object.componentType 

944 

945 isSetType = asn1Object.typeId == univ.Set.typeId 

946 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

947 

948 if LOG: 

949 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

950 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

951 asn1Spec)) 

952 

953 seenIndices = set() 

954 

955 idx = 0 

956 

957 while True: # loop over components 

958 if len(namedTypes) <= idx: 

959 asn1Spec = None 

960 

961 elif isSetType: 

962 asn1Spec = namedTypes.tagMapUnique 

963 

964 else: 

965 try: 

966 if isDeterministic: 

967 asn1Spec = namedTypes[idx].asn1Object 

968 

969 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

970 asn1Spec = namedTypes.getTagMapNearPosition(idx) 

971 

972 else: 

973 asn1Spec = namedTypes[idx].asn1Object 

974 

975 except IndexError: 

976 raise error.PyAsn1Error( 

977 'Excessive components decoded at %r' % (asn1Object,) 

978 ) 

979 

980 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options): 

981 

982 if isinstance(component, SubstrateUnderrunError): 

983 yield component 

984 

985 if component is eoo.endOfOctets: 

986 break 

987 

988 if component is eoo.endOfOctets: 

989 break 

990 

991 if not isDeterministic and namedTypes: 

992 if isSetType: 

993 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

994 

995 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

996 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

997 

998 asn1Object.setComponentByPosition( 

999 idx, component, 

1000 verifyConstraints=False, 

1001 matchTags=False, matchConstraints=False 

1002 ) 

1003 

1004 seenIndices.add(idx) 

1005 idx += 1 

1006 

1007 if LOG: 

1008 LOG('seen component indices %s' % seenIndices) 

1009 

1010 if namedTypes: 

1011 if not namedTypes.requiredComponents.issubset(seenIndices): 

1012 raise error.PyAsn1Error( 

1013 'ASN.1 object %s has uninitialized ' 

1014 'components' % asn1Object.__class__.__name__) 

1015 

1016 if namedTypes.hasOpenTypes: 

1017 

1018 openTypes = options.get('openTypes', {}) 

1019 

1020 if LOG: 

1021 LOG('user-specified open types map:') 

1022 

1023 for k, v in openTypes.items(): 

1024 LOG('%s -> %r' % (k, v)) 

1025 

1026 if openTypes or options.get('decodeOpenTypes', False): 

1027 

1028 for idx, namedType in enumerate(namedTypes.namedTypes): 

1029 if not namedType.openType: 

1030 continue 

1031 

1032 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

1033 continue 

1034 

1035 governingValue = asn1Object.getComponentByName( 

1036 namedType.openType.name 

1037 ) 

1038 

1039 try: 

1040 openType = openTypes[governingValue] 

1041 

1042 except KeyError: 

1043 

1044 if LOG: 

1045 LOG('default open types map of component ' 

1046 '"%s.%s" governed by component "%s.%s"' 

1047 ':' % (asn1Object.__class__.__name__, 

1048 namedType.name, 

1049 asn1Object.__class__.__name__, 

1050 namedType.openType.name)) 

1051 

1052 for k, v in namedType.openType.items(): 

1053 LOG('%s -> %r' % (k, v)) 

1054 

1055 try: 

1056 openType = namedType.openType[governingValue] 

1057 

1058 except KeyError: 

1059 if LOG: 

1060 LOG('failed to resolve open type by governing ' 

1061 'value %r' % (governingValue,)) 

1062 continue 

1063 

1064 if LOG: 

1065 LOG('resolved open type %r by governing ' 

1066 'value %r' % (openType, governingValue)) 

1067 

1068 containerValue = asn1Object.getComponentByPosition(idx) 

1069 

1070 if containerValue.typeId in ( 

1071 univ.SetOf.typeId, univ.SequenceOf.typeId): 

1072 

1073 for pos, containerElement in enumerate( 

1074 containerValue): 

1075 

1076 stream = asSeekableStream(containerValue[pos].asOctets()) 

1077 

1078 for component in decodeFun(stream, asn1Spec=openType, 

1079 **dict(options, allowEoo=True)): 

1080 if isinstance(component, SubstrateUnderrunError): 

1081 yield component 

1082 

1083 if component is eoo.endOfOctets: 

1084 break 

1085 

1086 containerValue[pos] = component 

1087 

1088 else: 

1089 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

1090 for component in decodeFun(stream, asn1Spec=openType, 

1091 **dict(options, allowEoo=True)): 

1092 if isinstance(component, SubstrateUnderrunError): 

1093 yield component 

1094 

1095 if component is eoo.endOfOctets: 

1096 break 

1097 

1098 asn1Object.setComponentByPosition(idx, component) 

1099 

1100 else: 

1101 inconsistency = asn1Object.isInconsistent 

1102 if inconsistency: 

1103 raise error.PyAsn1Error( 

1104 f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent") 

1105 

1106 else: 

1107 componentType = asn1Spec.componentType 

1108 

1109 if LOG: 

1110 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

1111 

1112 idx = 0 

1113 

1114 while True: 

1115 

1116 for component in decodeFun( 

1117 substrate, componentType, allowEoo=True, **options): 

1118 

1119 if isinstance(component, SubstrateUnderrunError): 

1120 yield component 

1121 

1122 if component is eoo.endOfOctets: 

1123 break 

1124 

1125 if component is eoo.endOfOctets: 

1126 break 

1127 

1128 asn1Object.setComponentByPosition( 

1129 idx, component, 

1130 verifyConstraints=False, 

1131 matchTags=False, matchConstraints=False 

1132 ) 

1133 

1134 idx += 1 

1135 

1136 yield asn1Object 

1137 

1138 

class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed-type decoder covering both SEQUENCE and SEQUENCE OF.

    Only supplies the two prototype objects; all decoding logic is
    inherited from ``ConstructedPayloadDecoderBase``.
    """
    # prototype for the record flavour (SEQUENCE)
    protoRecordComponent = univ.Sequence()
    # prototype for the homogeneous flavour (SEQUENCE OF)
    protoSequenceComponent = univ.SequenceOf()

1142 

1143 

class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """SEQUENCE decoder: inherits all logic, pins the prototype to ``univ.Sequence``."""
    protoComponent = univ.Sequence()

1146 

1147 

class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder):
    """SEQUENCE OF decoder: inherits all logic, pins the prototype to ``univ.SequenceOf``."""
    protoComponent = univ.SequenceOf()

1150 

1151 

class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase):
    """Constructed-type decoder covering both SET and SET OF.

    Only supplies the two prototype objects; all decoding logic is
    inherited from ``ConstructedPayloadDecoderBase``.
    """
    # prototype for the record flavour (SET)
    protoRecordComponent = univ.Set()
    # prototype for the homogeneous flavour (SET OF)
    protoSequenceComponent = univ.SetOf()

1155 

1156 

class SetPayloadDecoder(SetOrSetOfPayloadDecoder):
    """SET decoder: inherits all logic, pins the prototype to ``univ.Set``."""
    protoComponent = univ.Set()

1159 

1160 

class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder):
    """SET OF decoder: inherits all logic, pins the prototype to ``univ.SetOf``."""
    protoComponent = univ.SetOf()

1163 

1164 

class ChoicePayloadDecoder(ConstructedPayloadDecoderBase):
    """Payload decoder for ASN.1 CHOICE values.

    Both entry points are generators. They pass through any
    ``SubstrateUnderrunError`` object produced by the nested decoder and
    finally yield the populated CHOICE object.
    """
    protoComponent = univ.Choice()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length CHOICE.

        When `asn1Spec` is ``None`` the result is cloned from
        ``protoComponent`` with `tagSet`, otherwise from `asn1Spec`.
        If `substrateFun` is given, decoding is delegated to it entirely.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(asn1Object, options)

        if asn1Object.tagSet == tagSet:
            if LOG:
                LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,))

            # explicit tag already consumed: decode the inner TLV from scratch
            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        else:
            if LOG:
                LOG('decoding %s as untagged CHOICE' % (tagSet,))

            # the tag/length just read belong to the chosen alternative itself,
            # so resume the nested decoder with them
            for component in decodeFun(
                    substrate, asn1Object.componentTagMap, tagSet, length,
                    state, **options):
                if isinstance(component, SubstrateUnderrunError):
                    yield component

        # `component` is the last item produced by the nested decoder
        effectiveTagSet = component.effectiveTagSet

        if LOG:
            LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet))

        asn1Object.setComponentByType(
            effectiveTagSet, component,
            verifyConstraints=False,
            matchTags=False, matchConstraints=False,
            innerFlag=False
        )

        yield asn1Object

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length CHOICE.

        For an explicitly tagged CHOICE the loop keeps decoding until the
        end-of-octets marker is seen; for an untagged CHOICE a single
        component is decoded and the loop stops.
        """
        if asn1Spec is None:
            asn1Object = self.protoComponent.clone(tagSet=tagSet)

        else:
            asn1Object = asn1Spec.clone()

        if substrateFun:
            for chunk in substrateFun(asn1Object, substrate, length, options):
                yield chunk

            return

        options = self._passAsn1Object(asn1Object, options)

        isTagged = asn1Object.tagSet == tagSet

        if LOG:
            LOG('decoding %s as %stagged CHOICE' % (
                tagSet, 'explicitly ' if isTagged else 'un'))

        while True:

            if isTagged:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    **dict(options, allowEoo=True))

            else:
                iterator = decodeFun(
                    substrate, asn1Object.componentType.tagMapUnique,
                    tagSet, length, state, **dict(options, allowEoo=True))

            for component in iterator:

                if isinstance(component, SubstrateUnderrunError):
                    yield component
                    # An underrun marker is not a decoded component: once the
                    # caller resumes us, wait for the next item instead of
                    # treating the marker as a CHOICE alternative.
                    continue

                if component is eoo.endOfOctets:
                    break

                effectiveTagSet = component.effectiveTagSet

                if LOG:
                    LOG('decoded component %s, effective tag set '
                        '%s' % (component, effectiveTagSet))

                asn1Object.setComponentByType(
                    effectiveTagSet, component,
                    verifyConstraints=False,
                    matchTags=False, matchConstraints=False,
                    innerFlag=False
                )

                if not isTagged:
                    break

            if not isTagged or component is eoo.endOfOctets:
                break

        yield asn1Object

1283 

1284 

class AnyPayloadDecoder(AbstractSimplePayloadDecoder):
    """Payload decoder for ASN.1 ANY values.

    An ANY is "untagged" when the tag just read is not the one declared by
    `asn1Spec`; in that case the tag/length header is part of the value and
    the stream is rewound to ``substrate.markedPosition`` to include it.
    """
    protoComponent = univ.Any()

    def valueDecoder(self, substrate, asn1Spec,
                     tagSet=None, length=None, state=None,
                     decodeFun=None, substrateFun=None,
                     **options):
        """Decode a definite-length ANY.

        Yields any ``SubstrateUnderrunError`` produced while reading, then
        the component built from the raw octets (or whatever `substrateFun`
        yields, when one is given).
        """
        if asn1Spec is None:
            isUntagged = True

        elif asn1Spec.__class__ is tagmap.TagMap:
            isUntagged = tagSet not in asn1Spec.tagMap

        else:
            isUntagged = tagSet != asn1Spec.tagSet

        if isUntagged:
            fullPosition = substrate.markedPosition
            currentPosition = substrate.tell()

            # rewind over the header and account for it in the length
            substrate.seek(fullPosition, os.SEEK_SET)
            length += currentPosition - fullPosition

            if LOG:
                for chunk in peekIntoStream(substrate, length):
                    if isinstance(chunk, SubstrateUnderrunError):
                        yield chunk
                LOG('decoding as untagged ANY, substrate '
                    '%s' % debug.hexdump(chunk))

        if substrateFun:
            for chunk in substrateFun(
                    self._createComponent(asn1Spec, tagSet, noValue, **options),
                    substrate, length, options):
                yield chunk

            return

        for chunk in readFromStream(substrate, length, options):
            if isinstance(chunk, SubstrateUnderrunError):
                yield chunk

        yield self._createComponent(asn1Spec, tagSet, chunk, **options)

    def indefLenValueDecoder(self, substrate, asn1Spec,
                             tagSet=None, length=None, state=None,
                             decodeFun=None, substrateFun=None,
                             **options):
        """Decode an indefinite-length ANY.

        Inner fragments are collected with ``self.substrateCollector`` and
        concatenated until the end-of-octets marker. A user-supplied
        `substrateFun` (anything other than ``self.substrateCollector``)
        takes over decoding instead.
        """
        if asn1Spec is None:
            isTagged = False

        elif asn1Spec.__class__ is tagmap.TagMap:
            isTagged = tagSet in asn1Spec.tagMap

        else:
            isTagged = tagSet == asn1Spec.tagSet

        if isTagged:
            # tagged Any type -- consume header substrate
            chunk = b''

            if LOG:
                LOG('decoding as tagged ANY')

        else:
            # TODO: Seems not to be tested
            fullPosition = substrate.markedPosition
            currentPosition = substrate.tell()

            # re-read the header octets so they become part of the value
            substrate.seek(fullPosition, os.SEEK_SET)
            for chunk in readFromStream(substrate, currentPosition - fullPosition, options):
                if isinstance(chunk, SubstrateUnderrunError):
                    yield chunk

            if LOG:
                LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk))

        # Any components do not inherit initial tag
        asn1Spec = self.protoComponent

        if substrateFun and substrateFun is not self.substrateCollector:
            asn1Object = self._createComponent(
                asn1Spec, tagSet, noValue, **options)

            # The substrate is a stream, so the header octets cannot be
            # prepended with `+`. Rewind over them instead so the callback
            # reads header + payload from the stream, as in valueDecoder().
            headerLength = len(chunk)
            if headerLength:
                substrate.seek(-headerLength, os.SEEK_CUR)

            for chunk in substrateFun(
                    asn1Object, substrate, length + headerLength, options):
                yield chunk

            return

        if LOG:
            LOG('assembling constructed serialization')

        # All inner fragments are of the same type, treat them as octet string
        substrateFun = self.substrateCollector

        while True:  # loop over fragments

            for component in decodeFun(
                    substrate, asn1Spec, substrateFun=substrateFun,
                    allowEoo=True, **options):

                if isinstance(component, SubstrateUnderrunError):
                    yield component

                if component is eoo.endOfOctets:
                    break

            if component is eoo.endOfOctets:
                break

            chunk += component

        if substrateFun:
            yield chunk  # TODO: Weird

        else:
            yield self._createComponent(asn1Spec, tagSet, chunk, **options)

1403 

1404 

1405# character string types 

class UTF8StringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.UTF8String``."""
    protoComponent = char.UTF8String()

1408 

1409 

class NumericStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.NumericString``."""
    protoComponent = char.NumericString()

1412 

1413 

class PrintableStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.PrintableString``."""
    protoComponent = char.PrintableString()

1416 

1417 

class TeletexStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.TeletexString``."""
    protoComponent = char.TeletexString()

1420 

1421 

class VideotexStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.VideotexString``."""
    protoComponent = char.VideotexString()

1424 

1425 

class IA5StringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.IA5String``."""
    protoComponent = char.IA5String()

1428 

1429 

class GraphicStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.GraphicString``."""
    protoComponent = char.GraphicString()

1432 

1433 

class VisibleStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.VisibleString``."""
    protoComponent = char.VisibleString()

1436 

1437 

class GeneralStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.GeneralString``."""
    protoComponent = char.GeneralString()

1440 

1441 

class UniversalStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.UniversalString``."""
    protoComponent = char.UniversalString()

1444 

1445 

class BMPStringPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``char.BMPString``."""
    protoComponent = char.BMPString()

1448 

1449 

1450# "useful" types 

class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``useful.ObjectDescriptor``."""
    protoComponent = useful.ObjectDescriptor()

1453 

1454 

class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``useful.GeneralizedTime``."""
    protoComponent = useful.GeneralizedTime()

1457 

1458 

class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
    """Octet-string decoder whose prototype is ``useful.UTCTime``."""
    protoComponent = useful.UTCTime()

1461 

1462 

# Tag-to-codec map: picks a payload decoder from the tag set found on the wire
TAG_MAP = {
    # universal simple and constructed types
    univ.Integer.tagSet: IntegerPayloadDecoder(),
    univ.Boolean.tagSet: BooleanPayloadDecoder(),
    univ.BitString.tagSet: BitStringPayloadDecoder(),
    univ.OctetString.tagSet: OctetStringPayloadDecoder(),
    univ.Null.tagSet: NullPayloadDecoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
    univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
    univ.Enumerated.tagSet: IntegerPayloadDecoder(),
    univ.Real.tagSet: RealPayloadDecoder(),
    univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(),  # conflicts with SequenceOf
    univ.Set.tagSet: SetOrSetOfPayloadDecoder(),  # conflicts with SetOf
    univ.Choice.tagSet: ChoicePayloadDecoder(),  # conflicts with Any
    # character string types
    char.UTF8String.tagSet: UTF8StringPayloadDecoder(),
    char.NumericString.tagSet: NumericStringPayloadDecoder(),
    char.PrintableString.tagSet: PrintableStringPayloadDecoder(),
    char.TeletexString.tagSet: TeletexStringPayloadDecoder(),
    char.VideotexString.tagSet: VideotexStringPayloadDecoder(),
    char.IA5String.tagSet: IA5StringPayloadDecoder(),
    char.GraphicString.tagSet: GraphicStringPayloadDecoder(),
    char.VisibleString.tagSet: VisibleStringPayloadDecoder(),
    char.GeneralString.tagSet: GeneralStringPayloadDecoder(),
    char.UniversalString.tagSet: UniversalStringPayloadDecoder(),
    char.BMPString.tagSet: BMPStringPayloadDecoder(),
    # useful types
    useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(),
    useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(),
    useful.UTCTime.tagSet: UTCTimePayloadDecoder(),
}

# Type-to-codec map for ambiguous ASN.1 types
TYPE_MAP = {
    univ.Set.typeId: SetPayloadDecoder(),
    univ.SetOf.typeId: SetOfPayloadDecoder(),
    univ.Sequence.typeId: SequencePayloadDecoder(),
    univ.SequenceOf.typeId: SequenceOfPayloadDecoder(),
    univ.Choice.typeId: ChoicePayloadDecoder(),
    univ.Any.typeId: AnyPayloadDecoder(),
}

# Put in non-ambiguous types for faster codec lookup; entries already
# present in TYPE_MAP are never overwritten.
for typeDecoder in TAG_MAP.values():
    if typeDecoder.protoComponent is None:
        continue
    typeId = typeDecoder.protoComponent.__class__.typeId
    if typeId is not None:
        TYPE_MAP.setdefault(typeId, typeDecoder)


# Decoder state machine states, numbered 0..9 in declaration order
(stDecodeTag,
 stDecodeLength,
 stGetValueDecoder,
 stGetValueDecoderByAsn1Spec,
 stGetValueDecoderByTag,
 stTryAsExplicitTag,
 stDecodeValue,
 stDumpRawValue,
 stErrorCondition,
 stStop) = range(10)


# Two zero octets terminate an indefinite-length encoding
EOO_SENTINEL = b'\x00\x00'

1525 

1526 

class SingleItemDecoder(object):
    """Generator-based state machine decoding one BER TLV from a stream.

    Calling an instance yields zero or more ``SubstrateUnderrunError``
    objects (when the stream cannot supply enough octets yet) followed by
    the decoded value, or by ``eoo.endOfOctets`` when `allowEoo` is set and
    the end-of-octets sentinel is found.
    """
    # state entered when no codec can be chosen for the tag
    defaultErrorState = stErrorCondition
    #defaultErrorState = stDumpRawValue
    # codec used by the stDumpRawValue state
    defaultRawDecoder = AnyPayloadDecoder()

    # when False, indefinite-length encodings are rejected with PyAsn1Error
    supportIndefLength = True

    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
        """Set up codec maps, defaulting to the class-level TAG_MAP/TYPE_MAP.

        Any other keyword arguments are accepted and ignored.
        """
        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP

        # Tag & TagSet objects caches
        self._tagCache = {}
        self._tagSetCache = {}

    def __call__(self, substrate, asn1Spec=None,
                 tagSet=None, length=None, state=stDecodeTag,
                 decodeFun=None, substrateFun=None,
                 **options):
        """Decode a single item from `substrate`, starting in `state`.

        `tagSet` and `length` let a payload decoder re-enter the machine
        with an already-decoded header. The `allowEoo` option is popped
        from `options` and enables the end-of-octets check.

        Raises ``PyAsn1Error`` when the tag cannot be matched to a codec,
        when indefinite length is not supported, or when a definite-length
        payload decoder reads a different number of octets than announced.
        """

        allowEoo = options.pop('allowEoo', False)

        if LOG:
            LOG('decoder called at scope %s with state %d, working with up '
                'to %s octets of substrate: '
                '%s' % (debug.scope, state, length, substrate))

        # Look for end-of-octets sentinel
        if allowEoo and self.supportIndefLength:

            for eoo_candidate in readFromStream(substrate, 2, options):
                if isinstance(eoo_candidate, SubstrateUnderrunError):
                    yield eoo_candidate

            if eoo_candidate == EOO_SENTINEL:
                if LOG:
                    LOG('end-of-octets sentinel found')
                yield eoo.endOfOctets
                return

            else:
                # not a sentinel: put the two octets back
                substrate.seek(-2, os.SEEK_CUR)

        tagMap = self._tagMap
        typeMap = self._typeMap
        tagCache = self._tagCache
        tagSetCache = self._tagSetCache

        value = noValue

        # remember where this TLV starts; AnyPayloadDecoder rewinds to it
        substrate.markedPosition = substrate.tell()

        while state is not stStop:

            if state is stDecodeTag:
                # Decode tag
                isShortTag = True

                # after the loop, firstByte holds the last item read
                for firstByte in readFromStream(substrate, 1, options):
                    if isinstance(firstByte, SubstrateUnderrunError):
                        yield firstByte

                firstOctet = ord(firstByte)

                try:
                    lastTag = tagCache[firstOctet]

                except KeyError:
                    integerTag = firstOctet
                    tagClass = integerTag & 0xC0    # top two bits
                    tagFormat = integerTag & 0x20   # constructed bit
                    tagId = integerTag & 0x1F       # low five bits

                    if tagId == 0x1F:
                        # long-form tag: ID continues in following octets,
                        # seven bits each, high bit set on all but the last
                        isShortTag = False
                        lengthOctetIdx = 0
                        tagId = 0

                        while True:
                            for integerByte in readFromStream(substrate, 1, options):
                                if isinstance(integerByte, SubstrateUnderrunError):
                                    yield integerByte

                            if not integerByte:
                                raise error.SubstrateUnderrunError(
                                    'Short octet stream on long tag decoding'
                                )

                            integerTag = ord(integerByte)
                            lengthOctetIdx += 1
                            tagId <<= 7
                            tagId |= (integerTag & 0x7F)

                            if not integerTag & 0x80:
                                break

                    lastTag = tag.Tag(
                        tagClass=tagClass, tagFormat=tagFormat, tagId=tagId
                    )

                    if isShortTag:
                        # cache short tags
                        tagCache[firstOctet] = lastTag

                if tagSet is None:
                    if isShortTag:
                        try:
                            tagSet = tagSetCache[firstOctet]

                        except KeyError:
                            # base tag not recovered
                            tagSet = tag.TagSet((), lastTag)
                            tagSetCache[firstOctet] = tagSet
                    else:
                        tagSet = tag.TagSet((), lastTag)

                else:
                    # combine the new tag with the tag set collected so far
                    tagSet = lastTag + tagSet

                state = stDecodeLength

                if LOG:
                    LOG('tag decoded into %s, decoding length' % tagSet)

            if state is stDecodeLength:
                # Decode length
                for firstOctet in readFromStream(substrate, 1, options):
                    if isinstance(firstOctet, SubstrateUnderrunError):
                        yield firstOctet

                firstOctet = ord(firstOctet)

                if firstOctet < 128:
                    # short form: the octet is the length
                    length = firstOctet

                elif firstOctet > 128:
                    # long form: low seven bits give the number of length octets
                    size = firstOctet & 0x7F
                    # encoded in size bytes
                    for encodedLength in readFromStream(substrate, size, options):
                        if isinstance(encodedLength, SubstrateUnderrunError):
                            yield encodedLength
                    encodedLength = list(encodedLength)
                    # missing check on maximum size, which shouldn't be a
                    # problem, we can handle more than is possible
                    if len(encodedLength) != size:
                        raise error.SubstrateUnderrunError(
                            '%s<%s at %s' % (size, len(encodedLength), tagSet)
                        )

                    # big-endian accumulation of the length octets
                    length = 0
                    for lengthOctet in encodedLength:
                        length <<= 8
                        length |= lengthOctet
                    size += 1

                else:  # 128 means indefinite
                    length = -1

                if length == -1 and not self.supportIndefLength:
                    raise error.PyAsn1Error('Indefinite length encoding not supported by this codec')

                state = stGetValueDecoder

                if LOG:
                    LOG('value length decoded into %d' % length)

            if state is stGetValueDecoder:
                if asn1Spec is None:
                    state = stGetValueDecoderByTag

                else:
                    state = stGetValueDecoderByAsn1Spec
            #
            # There're two ways of creating subtypes in ASN.1 what influences
            # decoder operation. These methods are:
            # 1) Either base types used in or no IMPLICIT tagging has been
            #    applied on subtyping.
            # 2) Subtype syntax drops base type information (by means of
            #    IMPLICIT tagging.
            # The first case allows for complete tag recovery from substrate
            # while the second one requires original ASN.1 type spec for
            # decoding.
            #
            # In either case a set of tags (tagSet) is coming from substrate
            # in an incremental, tag-by-tag fashion (this is the case of
            # EXPLICIT tag which is most basic). Outermost tag comes first
            # from the wire.
            #
            if state is stGetValueDecoderByTag:
                try:
                    concreteDecoder = tagMap[tagSet]

                except KeyError:
                    concreteDecoder = None

                if concreteDecoder:
                    state = stDecodeValue

                else:
                    # retry the lookup with only the first tag of the set
                    try:
                        concreteDecoder = tagMap[tagSet[:1]]

                    except KeyError:
                        concreteDecoder = None

                    if concreteDecoder:
                        state = stDecodeValue
                    else:
                        state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__)

            if state is stGetValueDecoderByAsn1Spec:

                if asn1Spec.__class__ is tagmap.TagMap:
                    # spec is a map of candidates: pick one by tag set
                    try:
                        chosenSpec = asn1Spec[tagSet]

                    except KeyError:
                        chosenSpec = None

                    if LOG:
                        LOG('candidate ASN.1 spec is a map of:')

                        for firstOctet, v in asn1Spec.presentTypes.items():
                            LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))

                        if asn1Spec.skipTypes:
                            LOG('but neither of: ')
                            for firstOctet, v in asn1Spec.skipTypes.items():
                                LOG(' %s -> %s' % (firstOctet, v.__class__.__name__))
                        LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet))

                elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap:
                    chosenSpec = asn1Spec
                    if LOG:
                        LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__)

                else:
                    chosenSpec = None

                if chosenSpec is not None:
                    try:
                        # ambiguous type or just faster codec lookup
                        concreteDecoder = typeMap[chosenSpec.typeId]

                        if LOG:
                            LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,))

                    except KeyError:
                        # use base type for codec lookup to recover untagged types
                        baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag)
                        try:
                            # base type or tagged subtype
                            concreteDecoder = tagMap[baseTagSet]

                            if LOG:
                                LOG('value decoder chosen by base %s' % (baseTagSet,))

                        except KeyError:
                            concreteDecoder = None

                    if concreteDecoder:
                        asn1Spec = chosenSpec
                        state = stDecodeValue

                    else:
                        state = stTryAsExplicitTag

                else:
                    concreteDecoder = None
                    state = stTryAsExplicitTag

                if LOG:
                    LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag'))
                    debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__)

            if state is stDecodeValue:
                if not options.get('recursiveFlag', True) and not substrateFun:  # deprecate this
                    def substrateFun(asn1Object, _substrate, _length, _options):
                        """Legacy hack to keep the recursiveFlag=False option supported.

                        The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization
                        of the old recursiveFlag=False option. Users should pass their callback instead of using
                        recursiveFlag.
                        """
                        yield asn1Object

                original_position = substrate.tell()

                if length == -1:  # indef length
                    for value in concreteDecoder.indefLenValueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                else:
                    for value in concreteDecoder.valueDecoder(
                            substrate, asn1Spec,
                            tagSet, length, stGetValueDecoder,
                            self, substrateFun, **options):
                        if isinstance(value, SubstrateUnderrunError):
                            yield value

                    # definite length only: verify the codec consumed exactly
                    # the announced number of octets
                    bytesRead = substrate.tell() - original_position
                    if not substrateFun and bytesRead != length:
                        raise PyAsn1Error(
                            "Read %s bytes instead of expected %s." % (bytesRead, length))
                    elif substrateFun and bytesRead > length:
                        # custom substrateFun may be used for partial decoding, reading less is expected there
                        raise PyAsn1Error(
                            "Read %s bytes are more than expected %s." % (bytesRead, length))

                if LOG:
                    LOG('codec %s yields type %s, value:\n%s\n...' % (
                        concreteDecoder.__class__.__name__, value.__class__.__name__,
                        isinstance(value, base.Asn1Item) and value.prettyPrint() or value))

                state = stStop
                break

            if state is stTryAsExplicitTag:
                if (tagSet and
                        tagSet[0].tagFormat == tag.tagFormatConstructed and
                        tagSet[0].tagClass != tag.tagClassUniversal):
                    # Assume explicit tagging
                    concreteDecoder = rawPayloadDecoder
                    state = stDecodeValue

                else:
                    concreteDecoder = None
                    state = self.defaultErrorState

                if LOG:
                    LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure'))

            if state is stDumpRawValue:
                concreteDecoder = self.defaultRawDecoder

                if LOG:
                    LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__)

                state = stDecodeValue

            if state is stErrorCondition:
                raise error.PyAsn1Error(
                    '%s not in asn1Spec: %r' % (tagSet, asn1Spec)
                )

        if LOG:
            debug.scope.pop()
            LOG('decoder left scope %s, call completed' % debug.scope)

        yield value

1888 

1889 

class StreamingDecoder(object):
    """Iterate over ASN.1 objects decoded from a BER/CER/DER byte stream.

    Each iteration consumes whatever serialization is available in the
    `substrate` stream-like object and turns it into one or more, possibly
    nested, ASN.1 objects.

    Parameters
    ----------
    substrate: :py:class:`file`, :py:class:`io.BytesIO`
        BER/CER/DER serialization in form of a byte stream

    Keyword Args
    ------------
    asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
        A pyasn1 type object acting as a template that guides the decoder.
        Depending on the ASN.1 structure being decoded, `asn1Spec` may
        or may not be required. One reason it may be required is that
        the ASN.1 structure is encoded in the *IMPLICIT* tagging mode.

    Yields
    ------
    : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError`
        Decoded ASN.1 object (possibly nested) or a
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
        that the input holds too little serialization to fully recover
        ASN.1 objects from it.

        In the latter case the caller is advised to ensure some more data
        in the input stream, then call the iterator again. The decoder will
        resume the decoding process using the newly arrived data.

        The `context` property of the
        :py:class:`~pyasn1.error.SubstrateUnderrunError` object might hold
        a reference to the partially populated ASN.1 object being
        reconstructed.

    Raises
    ------
    ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError
        `PyAsn1Error` on deserialization error, `EndOfStreamError` on
        premature stream closure.

    Examples
    --------
    Decode BER serialisation without ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> for asn1Object in StreamingDecoder(stream):
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3

    Decode BER serialisation with ASN.1 schema

    .. code-block:: pycon

        >>> stream = io.BytesIO(
        ...    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
        >>>
        >>> schema = SequenceOf(componentType=Integer())
        >>>
        >>> decoder = StreamingDecoder(stream, asn1Spec=schema)
        >>> for asn1Object in decoder:
        ...     print(asn1Object)
        >>>
        SequenceOf:
         1 2 3
    """

    SINGLE_ITEM_DECODER = SingleItemDecoder

    def __init__(self, substrate, asn1Spec=None, **options):
        # The item decoder is built first, then the input is wrapped into a
        # seekable stream; `options` is forwarded to both the constructor
        # and every decode call.
        self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options)
        self._substrate = asSeekableStream(substrate)
        self._options = options
        self._asn1Spec = asn1Spec

    def __iter__(self):
        while True:
            # relay everything the single-item decoder produces
            yield from self._singleItemDecoder(
                self._substrate, self._asn1Spec, **self._options)

            # only the first item reported by isEndOfStream() is examined
            for chunk in isEndOfStream(self._substrate):
                if isinstance(chunk, SubstrateUnderrunError):
                    # note: a bare None is yielded here, not the error object
                    yield

                break

            if chunk:
                return

1987 

1988 

1989class Decoder(object): 

1990 """Create a BER decoder object. 

1991 

1992 Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object. 

1993 """ 

1994 STREAMING_DECODER = StreamingDecoder 

1995 

1996 @classmethod 

def __call__(cls, substrate, asn1Spec=None, **options):
    r"""Decode one ASN.1 object from a BER/CER/DER octet stream.

    The `substrate` is wrapped into a seekable stream and handed to the
    streaming decoder. The first ASN.1 object it produces
    (e.g. a :py:class:`~pyasn1.type.base.PyAsn1Item` derivative, which
    may be a scalar or an arbitrary nested structure) is returned together
    with whatever is left unread in the stream.

    Parameters
    ----------
    substrate: :py:class:`bytes`
        BER/CER/DER octet-stream to parse

    Keyword Args
    ------------
    asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item`
        A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item`
        derivative) to act as a template guiding the decoder.
        Depending on the ASN.1 structure being decoded, `asn1Spec` may or
        may not be required. The most common reason for it to be required
        is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.

    substrateFun: :py:class:`Union[
            Callable[[pyasn1.type.base.PyAsn1Item, bytes, int],
                     Tuple[pyasn1.type.base.PyAsn1Item, bytes]],
            Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict],
                     Generator[Union[pyasn1.type.base.PyAsn1Item,
                                     pyasn1.error.SubstrateUnderrunError],
                               None, None]]
        ]`
        User callback meant to generalize special use cases like
        non-recursive or partial decoding. A 3-arg non-streaming variant is
        supported for backwards compatibility in addition to the newer
        4-arg streaming variant.
        The callback will receive the uninitialized object recovered from
        substrate as 1st argument, the uninterpreted payload as 2nd
        argument, and the length of the uninterpreted payload as 3rd
        argument. The streaming variant will additionally receive the
        decode(..., \*\*options) kwargs as 4th argument.
        The non-streaming variant shall return an object that will be
        propagated as decode() return value as 1st item, and the remaining
        payload for further decode passes as 2nd item.
        The streaming variant shall yield an object that will be propagated
        as decode() return value, and leave the remaining payload in the
        stream.

    Returns
    -------
    : :py:class:`tuple`
        A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object
        recovered from BER/CER/DER substrate and the unprocessed trailing
        portion of the `substrate` (may be empty)

    Raises
    ------
    : :py:class:`~pyasn1.error.PyAsn1Error`
        :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient
        input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error.

    Examples
    --------
    Decode BER/CER/DER serialisation without ASN.1 schema

    .. code-block:: pycon

       >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
       >>> str(s)
       SequenceOf:
        1 2 3

    Decode BER/CER/DER serialisation with ASN.1 schema

    .. code-block:: pycon

       >>> seq = SequenceOf(componentType=Integer())
       >>> s, unprocessed = decode(
           b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
       >>> str(s)
       SequenceOf:
        1 2 3

    """
    substrate = asSeekableStream(substrate)

    if "substrateFun" in options:
        userSubstrateFun = options["substrateFun"]

        def substrateFunWrapper(asn1Object, substrate, length, options=None):
            """Accept user callbacks written for either the 0.4 or the 0.5 API.

            The substrateFun signature changed in 0.5 to suit streaming
            decoders. The callback is first invoked the streaming (4-arg)
            way; if that call itself is rejected with a TypeError, the
            callback is assumed to be a non-streaming v0.4 one and is
            adapted for streaming on the fly.
            """
            try:
                valueStream = userSubstrateFun(asn1Object, substrate, length, options)
            except TypeError as exc:
                # A traceback deeper than this frame means the TypeError came
                # from inside the user provided function, not from the call
                # signature mismatch we are probing for - let it propagate.
                # NOTE: the call above must stay in this frame for the depth
                # check to remain meaningful.
                if exc.__traceback__.tb_next:
                    raise
                # invariant maintained at Decoder.__call__ entry
                assert isinstance(substrate, io.BytesIO)  # nosec assert_used
                valueStream = Decoder._callSubstrateFunV4asV5(
                    userSubstrateFun, asn1Object, substrate, length)
            yield from valueStream

        options["substrateFun"] = substrateFunWrapper

    streamingDecoder = cls.STREAMING_DECODER(substrate, asn1Spec, **options)

    # Only the first item produced by the streaming decoder is consumed:
    # it is either an underrun marker (turned into an exception) or the
    # decoded object, which is returned along with the rest of the stream.
    for asn1Object in streamingDecoder:
        if isinstance(asn1Object, SubstrateUnderrunError):
            raise error.SubstrateUnderrunError('Short substrate on input')

        try:
            return asn1Object, next(readFromStream(substrate))

        except error.EndOfStreamError:
            return asn1Object, b''

2116 

2117 @staticmethod 

2118 def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length): 

2119 substrate_bytes = substrate.read() 

2120 if length == -1: 

2121 length = len(substrate_bytes) 

2122 value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length) 

2123 nbytes = substrate.write(nextSubstrate) 

2124 substrate.truncate() 

2125 substrate.seek(-nbytes, os.SEEK_CUR) 

2126 yield value 

2127 

#: Turns BER octet stream into an ASN.1 object.
#:
#: Takes a BER octet-stream and decodes it into an ASN.1 object
#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which
#: may be a scalar or an arbitrary nested structure.
#:
#: Parameters
#: ----------
#: substrate: :py:class:`bytes`
#:     BER octet-stream
#:
#: Keyword Args
#: ------------
#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:     A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure
#:     being decoded, *asn1Spec* may or may not be required. The most common reason for
#:     it to be required is that the ASN.1 structure is encoded in *IMPLICIT* tagging mode.
#:
#: Returns
#: -------
#: : :py:class:`tuple`
#:     A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     and the unprocessed trailing portion of the *substrate* (may be empty)
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError
#:     On decoding errors
#:
#: Notes
#: -----
#: This function is deprecated. Please use a :py:class:`Decoder` or
#: :py:class:`StreamingDecoder` class instance instead.
#:
#: Examples
#: --------
#: Decode BER serialisation without ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03')
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
#: Decode BER serialisation with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq)
#:    >>> str(s)
#:    SequenceOf:
#:     1 2 3
#:
decode = Decoder()

2184 

def __getattr__(attr: str):
    """Serve deprecated module attributes under their old names.

    Called for module attribute lookups that fail the normal way
    (module-level ``__getattr__``). The legacy names ``tagMap`` and
    ``typeMap`` are forwarded to the module globals ``TAG_MAP`` and
    ``TYPE_MAP`` after emitting a :py:class:`DeprecationWarning`.

    Parameters
    ----------
    attr: :py:class:`str`
        Name of the attribute being looked up on this module

    Returns
    -------
    :
        The object bound to the replacement global name

    Raises
    ------
    : :py:class:`AttributeError`
        If `attr` is not one of the deprecated aliases
    """
    deprecatedAliases = {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}

    if newAttr := deprecatedAliases.get(attr):
        # stacklevel=2 attributes the warning to the code that accessed the
        # deprecated name rather than to this module; otherwise the default
        # warning filters would hide it from the user who needs to see it.
        warnings.warn(
            f"{attr} is deprecated. Please use {newAttr} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return globals()[newAttr]

    raise AttributeError(f"module {__name__!r} has no attribute {attr!r}")