Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyasn1/codec/ber/decoder.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1070 statements  

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import io 

8import os 

9import sys 

10import warnings 

11 

12from pyasn1 import debug 

13from pyasn1 import error 

14from pyasn1.codec.ber import eoo 

15from pyasn1.codec.streaming import asSeekableStream 

16from pyasn1.codec.streaming import isEndOfStream 

17from pyasn1.codec.streaming import peekIntoStream 

18from pyasn1.codec.streaming import readFromStream 

19from pyasn1.compat import _MISSING 

20from pyasn1.error import PyAsn1Error 

21from pyasn1.type import base 

22from pyasn1.type import char 

23from pyasn1.type import tag 

24from pyasn1.type import tagmap 

25from pyasn1.type import univ 

26from pyasn1.type import useful 

27 

28__all__ = ['StreamingDecoder', 'Decoder', 'decode'] 

29 

30LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER) 

31 

32noValue = base.noValue 

33 

34SubstrateUnderrunError = error.SubstrateUnderrunError 

35 

36# Maximum number of continuation octets (high-bit set) allowed per OID arc. 

37# 20 octets allows up to 140-bit integers, supporting UUID-based OIDs 

38MAX_OID_ARC_CONTINUATION_OCTETS = 20 

39 

40 

41class AbstractPayloadDecoder(object): 

42 protoComponent = None 

43 

44 def valueDecoder(self, substrate, asn1Spec, 

45 tagSet=None, length=None, state=None, 

46 decodeFun=None, substrateFun=None, 

47 **options): 

48 """Decode value with fixed byte length. 

49 

50 The decoder is allowed to consume as many bytes as necessary. 

51 """ 

52 raise error.PyAsn1Error('SingleItemDecoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

53 

54 def indefLenValueDecoder(self, substrate, asn1Spec, 

55 tagSet=None, length=None, state=None, 

56 decodeFun=None, substrateFun=None, 

57 **options): 

58 """Decode value with undefined length. 

59 

60 The decoder is allowed to consume as many bytes as necessary. 

61 """ 

62 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

63 

64 @staticmethod 

65 def _passAsn1Object(asn1Object, options): 

66 if 'asn1Object' not in options: 

67 options['asn1Object'] = asn1Object 

68 

69 return options 

70 

71 

72class AbstractSimplePayloadDecoder(AbstractPayloadDecoder): 

73 @staticmethod 

74 def substrateCollector(asn1Object, substrate, length, options): 

75 for chunk in readFromStream(substrate, length, options): 

76 yield chunk 

77 

78 def _createComponent(self, asn1Spec, tagSet, value, **options): 

79 if options.get('native'): 

80 return value 

81 elif asn1Spec is None: 

82 return self.protoComponent.clone(value, tagSet=tagSet) 

83 elif value is noValue: 

84 return asn1Spec 

85 else: 

86 return asn1Spec.clone(value) 

87 

88 

89class RawPayloadDecoder(AbstractSimplePayloadDecoder): 

90 protoComponent = univ.Any('') 

91 

92 def valueDecoder(self, substrate, asn1Spec, 

93 tagSet=None, length=None, state=None, 

94 decodeFun=None, substrateFun=None, 

95 **options): 

96 if substrateFun: 

97 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

98 

99 for chunk in substrateFun(asn1Object, substrate, length, options): 

100 yield chunk 

101 

102 return 

103 

104 for value in decodeFun(substrate, asn1Spec, tagSet, length, **options): 

105 yield value 

106 

107 def indefLenValueDecoder(self, substrate, asn1Spec, 

108 tagSet=None, length=None, state=None, 

109 decodeFun=None, substrateFun=None, 

110 **options): 

111 if substrateFun: 

112 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

113 

114 for chunk in substrateFun(asn1Object, substrate, length, options): 

115 yield chunk 

116 

117 return 

118 

119 while True: 

120 for value in decodeFun( 

121 substrate, asn1Spec, tagSet, length, 

122 allowEoo=True, **options): 

123 

124 if value is eoo.endOfOctets: 

125 return 

126 

127 yield value 

128 

129 

130rawPayloadDecoder = RawPayloadDecoder() 

131 

132 

133class IntegerPayloadDecoder(AbstractSimplePayloadDecoder): 

134 protoComponent = univ.Integer(0) 

135 

136 def valueDecoder(self, substrate, asn1Spec, 

137 tagSet=None, length=None, state=None, 

138 decodeFun=None, substrateFun=None, 

139 **options): 

140 

141 if tagSet[0].tagFormat != tag.tagFormatSimple: 

142 raise error.PyAsn1Error('Simple tag format expected') 

143 

144 for chunk in readFromStream(substrate, length, options): 

145 if isinstance(chunk, SubstrateUnderrunError): 

146 yield chunk 

147 

148 if chunk: 

149 value = int.from_bytes(bytes(chunk), 'big', signed=True) 

150 

151 else: 

152 value = 0 

153 

154 yield self._createComponent(asn1Spec, tagSet, value, **options) 

155 

156 

157class BooleanPayloadDecoder(IntegerPayloadDecoder): 

158 protoComponent = univ.Boolean(0) 

159 

160 def _createComponent(self, asn1Spec, tagSet, value, **options): 

161 return IntegerPayloadDecoder._createComponent( 

162 self, asn1Spec, tagSet, value and 1 or 0, **options) 

163 

164 

165class BitStringPayloadDecoder(AbstractSimplePayloadDecoder): 

166 protoComponent = univ.BitString(()) 

167 supportConstructedForm = True 

168 

169 def valueDecoder(self, substrate, asn1Spec, 

170 tagSet=None, length=None, state=None, 

171 decodeFun=None, substrateFun=None, 

172 **options): 

173 

174 if substrateFun: 

175 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

176 

177 for chunk in substrateFun(asn1Object, substrate, length, options): 

178 yield chunk 

179 

180 return 

181 

182 if not length: 

183 raise error.PyAsn1Error('Empty BIT STRING substrate') 

184 

185 for chunk in isEndOfStream(substrate): 

186 if isinstance(chunk, SubstrateUnderrunError): 

187 yield chunk 

188 

189 if chunk: 

190 raise error.PyAsn1Error('Empty BIT STRING substrate') 

191 

192 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

193 

194 for trailingBits in readFromStream(substrate, 1, options): 

195 if isinstance(trailingBits, SubstrateUnderrunError): 

196 yield trailingBits 

197 

198 trailingBits = ord(trailingBits) 

199 if trailingBits > 7: 

200 raise error.PyAsn1Error( 

201 'Trailing bits overflow %s' % trailingBits 

202 ) 

203 

204 for chunk in readFromStream(substrate, length - 1, options): 

205 if isinstance(chunk, SubstrateUnderrunError): 

206 yield chunk 

207 

208 value = self.protoComponent.fromOctetString( 

209 chunk, internalFormat=True, padding=trailingBits) 

210 

211 yield self._createComponent(asn1Spec, tagSet, value, **options) 

212 

213 return 

214 

215 if not self.supportConstructedForm: 

216 raise error.PyAsn1Error('Constructed encoding form prohibited ' 

217 'at %s' % self.__class__.__name__) 

218 

219 if LOG: 

220 LOG('assembling constructed serialization') 

221 

222 # All inner fragments are of the same type, treat them as octet string 

223 substrateFun = self.substrateCollector 

224 

225 bitString = self.protoComponent.fromOctetString(b'', internalFormat=True) 

226 

227 current_position = substrate.tell() 

228 

229 while substrate.tell() - current_position < length: 

230 for component in decodeFun( 

231 substrate, self.protoComponent, substrateFun=substrateFun, 

232 **options): 

233 if isinstance(component, SubstrateUnderrunError): 

234 yield component 

235 

236 trailingBits = component[0] 

237 if trailingBits > 7: 

238 raise error.PyAsn1Error( 

239 'Trailing bits overflow %s' % trailingBits 

240 ) 

241 

242 bitString = self.protoComponent.fromOctetString( 

243 component[1:], internalFormat=True, 

244 prepend=bitString, padding=trailingBits 

245 ) 

246 

247 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

248 

249 def indefLenValueDecoder(self, substrate, asn1Spec, 

250 tagSet=None, length=None, state=None, 

251 decodeFun=None, substrateFun=None, 

252 **options): 

253 

254 if substrateFun: 

255 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

256 

257 for chunk in substrateFun(asn1Object, substrate, length, options): 

258 yield chunk 

259 

260 return 

261 

262 # All inner fragments are of the same type, treat them as octet string 

263 substrateFun = self.substrateCollector 

264 

265 bitString = self.protoComponent.fromOctetString(b'', internalFormat=True) 

266 

267 while True: # loop over fragments 

268 

269 for component in decodeFun( 

270 substrate, self.protoComponent, substrateFun=substrateFun, 

271 allowEoo=True, **options): 

272 

273 if component is eoo.endOfOctets: 

274 break 

275 

276 if isinstance(component, SubstrateUnderrunError): 

277 yield component 

278 

279 if component is eoo.endOfOctets: 

280 break 

281 

282 trailingBits = component[0] 

283 if trailingBits > 7: 

284 raise error.PyAsn1Error( 

285 'Trailing bits overflow %s' % trailingBits 

286 ) 

287 

288 bitString = self.protoComponent.fromOctetString( 

289 component[1:], internalFormat=True, 

290 prepend=bitString, padding=trailingBits 

291 ) 

292 

293 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

294 

295 

296class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder): 

297 protoComponent = univ.OctetString('') 

298 supportConstructedForm = True 

299 

300 def valueDecoder(self, substrate, asn1Spec, 

301 tagSet=None, length=None, state=None, 

302 decodeFun=None, substrateFun=None, 

303 **options): 

304 if substrateFun: 

305 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

306 

307 for chunk in substrateFun(asn1Object, substrate, length, options): 

308 yield chunk 

309 

310 return 

311 

312 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

313 for chunk in readFromStream(substrate, length, options): 

314 if isinstance(chunk, SubstrateUnderrunError): 

315 yield chunk 

316 

317 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

318 

319 return 

320 

321 if not self.supportConstructedForm: 

322 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__) 

323 

324 if LOG: 

325 LOG('assembling constructed serialization') 

326 

327 # All inner fragments are of the same type, treat them as octet string 

328 substrateFun = self.substrateCollector 

329 

330 header = b'' 

331 

332 original_position = substrate.tell() 

333 # head = popSubstream(substrate, length) 

334 while substrate.tell() - original_position < length: 

335 for component in decodeFun( 

336 substrate, self.protoComponent, substrateFun=substrateFun, 

337 **options): 

338 if isinstance(component, SubstrateUnderrunError): 

339 yield component 

340 

341 header += component 

342 

343 yield self._createComponent(asn1Spec, tagSet, header, **options) 

344 

345 def indefLenValueDecoder(self, substrate, asn1Spec, 

346 tagSet=None, length=None, state=None, 

347 decodeFun=None, substrateFun=None, 

348 **options): 

349 if substrateFun and substrateFun is not self.substrateCollector: 

350 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

351 

352 for chunk in substrateFun(asn1Object, substrate, length, options): 

353 yield chunk 

354 

355 return 

356 

357 # All inner fragments are of the same type, treat them as octet string 

358 substrateFun = self.substrateCollector 

359 

360 header = b'' 

361 

362 while True: # loop over fragments 

363 

364 for component in decodeFun( 

365 substrate, self.protoComponent, substrateFun=substrateFun, 

366 allowEoo=True, **options): 

367 

368 if isinstance(component, SubstrateUnderrunError): 

369 yield component 

370 

371 if component is eoo.endOfOctets: 

372 break 

373 

374 if component is eoo.endOfOctets: 

375 break 

376 

377 header += component 

378 

379 yield self._createComponent(asn1Spec, tagSet, header, **options) 

380 

381 

382class NullPayloadDecoder(AbstractSimplePayloadDecoder): 

383 protoComponent = univ.Null('') 

384 

385 def valueDecoder(self, substrate, asn1Spec, 

386 tagSet=None, length=None, state=None, 

387 decodeFun=None, substrateFun=None, 

388 **options): 

389 

390 if tagSet[0].tagFormat != tag.tagFormatSimple: 

391 raise error.PyAsn1Error('Simple tag format expected') 

392 

393 for chunk in readFromStream(substrate, length, options): 

394 if isinstance(chunk, SubstrateUnderrunError): 

395 yield chunk 

396 

397 component = self._createComponent(asn1Spec, tagSet, '', **options) 

398 

399 if chunk: 

400 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) 

401 

402 yield component 

403 

404 

405class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder): 

406 protoComponent = univ.ObjectIdentifier(()) 

407 

408 def valueDecoder(self, substrate, asn1Spec, 

409 tagSet=None, length=None, state=None, 

410 decodeFun=None, substrateFun=None, 

411 **options): 

412 if tagSet[0].tagFormat != tag.tagFormatSimple: 

413 raise error.PyAsn1Error('Simple tag format expected') 

414 

415 for chunk in readFromStream(substrate, length, options): 

416 if isinstance(chunk, SubstrateUnderrunError): 

417 yield chunk 

418 

419 if not chunk: 

420 raise error.PyAsn1Error('Empty substrate') 

421 

422 oid = () 

423 index = 0 

424 substrateLen = len(chunk) 

425 while index < substrateLen: 

426 subId = chunk[index] 

427 index += 1 

428 if subId < 128: 

429 oid += (subId,) 

430 elif subId > 128: 

431 # Construct subid from a number of octets 

432 nextSubId = subId 

433 subId = 0 

434 continuationOctetCount = 0 

435 while nextSubId >= 128: 

436 continuationOctetCount += 1 

437 if continuationOctetCount > MAX_OID_ARC_CONTINUATION_OCTETS: 

438 raise error.PyAsn1Error( 

439 'OID arc exceeds maximum continuation octets limit (%d) ' 

440 'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, index) 

441 ) 

442 subId = (subId << 7) + (nextSubId & 0x7F) 

443 if index >= substrateLen: 

444 raise error.SubstrateUnderrunError( 

445 'Short substrate for sub-OID past %s' % (oid,) 

446 ) 

447 nextSubId = chunk[index] 

448 index += 1 

449 oid += ((subId << 7) + nextSubId,) 

450 elif subId == 128: 

451 # ASN.1 spec forbids leading zeros (0x80) in OID 

452 # encoding, tolerating it opens a vulnerability. See 

453 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 

454 # page 7 

455 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding') 

456 

457 # Decode two leading arcs 

458 if 0 <= oid[0] <= 39: 

459 oid = (0,) + oid 

460 elif 40 <= oid[0] <= 79: 

461 oid = (1, oid[0] - 40) + oid[1:] 

462 elif oid[0] >= 80: 

463 oid = (2, oid[0] - 80) + oid[1:] 

464 else: 

465 raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0]) 

466 

467 yield self._createComponent(asn1Spec, tagSet, oid, **options) 

468 

469 

470class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder): 

471 protoComponent = univ.RelativeOID(()) 

472 

473 def valueDecoder(self, substrate, asn1Spec, 

474 tagSet=None, length=None, state=None, 

475 decodeFun=None, substrateFun=None, 

476 **options): 

477 if tagSet[0].tagFormat != tag.tagFormatSimple: 

478 raise error.PyAsn1Error('Simple tag format expected') 

479 

480 for chunk in readFromStream(substrate, length, options): 

481 if isinstance(chunk, SubstrateUnderrunError): 

482 yield chunk 

483 

484 if not chunk: 

485 raise error.PyAsn1Error('Empty substrate') 

486 

487 reloid = () 

488 index = 0 

489 substrateLen = len(chunk) 

490 while index < substrateLen: 

491 subId = chunk[index] 

492 index += 1 

493 if subId < 128: 

494 reloid += (subId,) 

495 elif subId > 128: 

496 # Construct subid from a number of octets 

497 nextSubId = subId 

498 subId = 0 

499 continuationOctetCount = 0 

500 while nextSubId >= 128: 

501 continuationOctetCount += 1 

502 if continuationOctetCount > MAX_OID_ARC_CONTINUATION_OCTETS: 

503 raise error.PyAsn1Error( 

504 'RELATIVE-OID arc exceeds maximum continuation octets limit (%d) ' 

505 'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, index) 

506 ) 

507 subId = (subId << 7) + (nextSubId & 0x7F) 

508 if index >= substrateLen: 

509 raise error.SubstrateUnderrunError( 

510 'Short substrate for sub-OID past %s' % (reloid,) 

511 ) 

512 nextSubId = chunk[index] 

513 index += 1 

514 reloid += ((subId << 7) + nextSubId,) 

515 elif subId == 128: 

516 # ASN.1 spec forbids leading zeros (0x80) in OID 

517 # encoding, tolerating it opens a vulnerability. See 

518 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 

519 # page 7 

520 raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding') 

521 

522 yield self._createComponent(asn1Spec, tagSet, reloid, **options) 

523 

524 

525class RealPayloadDecoder(AbstractSimplePayloadDecoder): 

526 protoComponent = univ.Real() 

527 

528 def valueDecoder(self, substrate, asn1Spec, 

529 tagSet=None, length=None, state=None, 

530 decodeFun=None, substrateFun=None, 

531 **options): 

532 if tagSet[0].tagFormat != tag.tagFormatSimple: 

533 raise error.PyAsn1Error('Simple tag format expected') 

534 

535 for chunk in readFromStream(substrate, length, options): 

536 if isinstance(chunk, SubstrateUnderrunError): 

537 yield chunk 

538 

539 if not chunk: 

540 yield self._createComponent(asn1Spec, tagSet, 0.0, **options) 

541 return 

542 

543 fo = chunk[0] 

544 chunk = chunk[1:] 

545 if fo & 0x80: # binary encoding 

546 if not chunk: 

547 raise error.PyAsn1Error("Incomplete floating-point value") 

548 

549 if LOG: 

550 LOG('decoding binary encoded REAL') 

551 

552 n = (fo & 0x03) + 1 

553 

554 if n == 4: 

555 n = chunk[0] 

556 chunk = chunk[1:] 

557 

558 eo, chunk = chunk[:n], chunk[n:] 

559 

560 if not eo or not chunk: 

561 raise error.PyAsn1Error('Real exponent screwed') 

562 

563 e = eo[0] & 0x80 and -1 or 0 

564 

565 while eo: # exponent 

566 e <<= 8 

567 e |= eo[0] 

568 eo = eo[1:] 

569 

570 b = fo >> 4 & 0x03 # base bits 

571 

572 if b > 2: 

573 raise error.PyAsn1Error('Illegal Real base') 

574 

575 if b == 1: # encbase = 8 

576 e *= 3 

577 

578 elif b == 2: # encbase = 16 

579 e *= 4 

580 p = 0 

581 

582 while chunk: # value 

583 p <<= 8 

584 p |= chunk[0] 

585 chunk = chunk[1:] 

586 

587 if fo & 0x40: # sign bit 

588 p = -p 

589 

590 sf = fo >> 2 & 0x03 # scale bits 

591 p *= 2 ** sf 

592 value = (p, 2, e) 

593 

594 elif fo & 0x40: # infinite value 

595 if LOG: 

596 LOG('decoding infinite REAL') 

597 

598 value = fo & 0x01 and '-inf' or 'inf' 

599 

600 elif fo & 0xc0 == 0: # character encoding 

601 if not chunk: 

602 raise error.PyAsn1Error("Incomplete floating-point value") 

603 

604 if LOG: 

605 LOG('decoding character encoded REAL') 

606 

607 try: 

608 if fo & 0x3 == 0x1: # NR1 

609 value = (int(chunk), 10, 0) 

610 

611 elif fo & 0x3 == 0x2: # NR2 

612 value = float(chunk) 

613 

614 elif fo & 0x3 == 0x3: # NR3 

615 value = float(chunk) 

616 

617 else: 

618 raise error.SubstrateUnderrunError( 

619 'Unknown NR (tag %s)' % fo 

620 ) 

621 

622 except ValueError: 

623 raise error.SubstrateUnderrunError( 

624 'Bad character Real syntax' 

625 ) 

626 

627 else: 

628 raise error.SubstrateUnderrunError( 

629 'Unknown encoding (tag %s)' % fo 

630 ) 

631 

632 yield self._createComponent(asn1Spec, tagSet, value, **options) 

633 

634 

635class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder): 

636 protoComponent = None 

637 

638 

639class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder): 

640 protoRecordComponent = None 

641 protoSequenceComponent = None 

642 

643 def _getComponentTagMap(self, asn1Object, idx): 

644 raise NotImplementedError 

645 

646 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 

647 raise NotImplementedError 

648 

649 def _decodeComponentsSchemaless( 

650 self, substrate, tagSet=None, decodeFun=None, 

651 length=None, **options): 

652 

653 asn1Object = None 

654 

655 components = [] 

656 componentTypes = set() 

657 

658 original_position = substrate.tell() 

659 

660 while length == -1 or substrate.tell() < original_position + length: 

661 for component in decodeFun(substrate, **options): 

662 if isinstance(component, SubstrateUnderrunError): 

663 yield component 

664 

665 if length == -1 and component is eoo.endOfOctets: 

666 break 

667 

668 components.append(component) 

669 componentTypes.add(component.tagSet) 

670 

671 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF 

672 # The heuristics is: 

673 # * 1+ components of different types -> likely SEQUENCE/SET 

674 # * otherwise -> likely SEQUENCE OF/SET OF 

675 if len(componentTypes) > 1: 

676 protoComponent = self.protoRecordComponent 

677 

678 else: 

679 protoComponent = self.protoSequenceComponent 

680 

681 asn1Object = protoComponent.clone( 

682 # construct tagSet from base tag from prototype ASN.1 object 

683 # and additional tags recovered from the substrate 

684 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags) 

685 ) 

686 

687 if LOG: 

688 LOG('guessed %r container type (pass `asn1Spec` to guide the ' 

689 'decoder)' % asn1Object) 

690 

691 for idx, component in enumerate(components): 

692 asn1Object.setComponentByPosition( 

693 idx, component, 

694 verifyConstraints=False, 

695 matchTags=False, matchConstraints=False 

696 ) 

697 

698 yield asn1Object 

699 

700 def valueDecoder(self, substrate, asn1Spec, 

701 tagSet=None, length=None, state=None, 

702 decodeFun=None, substrateFun=None, 

703 **options): 

704 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

705 raise error.PyAsn1Error('Constructed tag format expected') 

706 

707 original_position = substrate.tell() 

708 

709 if substrateFun: 

710 if asn1Spec is not None: 

711 asn1Object = asn1Spec.clone() 

712 

713 elif self.protoComponent is not None: 

714 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

715 

716 else: 

717 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

718 

719 for chunk in substrateFun(asn1Object, substrate, length, options): 

720 yield chunk 

721 

722 return 

723 

724 if asn1Spec is None: 

725 for asn1Object in self._decodeComponentsSchemaless( 

726 substrate, tagSet=tagSet, decodeFun=decodeFun, 

727 length=length, **options): 

728 if isinstance(asn1Object, SubstrateUnderrunError): 

729 yield asn1Object 

730 

731 if substrate.tell() < original_position + length: 

732 if LOG: 

733 for trailing in readFromStream(substrate, context=options): 

734 if isinstance(trailing, SubstrateUnderrunError): 

735 yield trailing 

736 

737 LOG('Unused trailing %d octets encountered: %s' % ( 

738 len(trailing), debug.hexdump(trailing))) 

739 

740 yield asn1Object 

741 

742 return 

743 

744 asn1Object = asn1Spec.clone() 

745 asn1Object.clear() 

746 

747 options = self._passAsn1Object(asn1Object, options) 

748 

749 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

750 

751 namedTypes = asn1Spec.componentType 

752 

753 isSetType = asn1Spec.typeId == univ.Set.typeId 

754 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

755 

756 if LOG: 

757 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

758 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

759 asn1Spec)) 

760 

761 seenIndices = set() 

762 idx = 0 

763 while substrate.tell() - original_position < length: 

764 if not namedTypes: 

765 componentType = None 

766 

767 elif isSetType: 

768 componentType = namedTypes.tagMapUnique 

769 

770 else: 

771 try: 

772 if isDeterministic: 

773 componentType = namedTypes[idx].asn1Object 

774 

775 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

776 componentType = namedTypes.getTagMapNearPosition(idx) 

777 

778 else: 

779 componentType = namedTypes[idx].asn1Object 

780 

781 except IndexError: 

782 raise error.PyAsn1Error( 

783 'Excessive components decoded at %r' % (asn1Spec,) 

784 ) 

785 

786 for component in decodeFun(substrate, componentType, **options): 

787 if isinstance(component, SubstrateUnderrunError): 

788 yield component 

789 

790 if not isDeterministic and namedTypes: 

791 if isSetType: 

792 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

793 

794 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

795 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

796 

797 asn1Object.setComponentByPosition( 

798 idx, component, 

799 verifyConstraints=False, 

800 matchTags=False, matchConstraints=False 

801 ) 

802 

803 seenIndices.add(idx) 

804 idx += 1 

805 

806 if LOG: 

807 LOG('seen component indices %s' % seenIndices) 

808 

809 if namedTypes: 

810 if not namedTypes.requiredComponents.issubset(seenIndices): 

811 raise error.PyAsn1Error( 

812 'ASN.1 object %s has uninitialized ' 

813 'components' % asn1Object.__class__.__name__) 

814 

815 if namedTypes.hasOpenTypes: 

816 

817 openTypes = options.get('openTypes', {}) 

818 

819 if LOG: 

820 LOG('user-specified open types map:') 

821 

822 for k, v in openTypes.items(): 

823 LOG('%s -> %r' % (k, v)) 

824 

825 if openTypes or options.get('decodeOpenTypes', False): 

826 

827 for idx, namedType in enumerate(namedTypes.namedTypes): 

828 if not namedType.openType: 

829 continue 

830 

831 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

832 continue 

833 

834 governingValue = asn1Object.getComponentByName( 

835 namedType.openType.name 

836 ) 

837 

838 try: 

839 openType = openTypes[governingValue] 

840 

841 except KeyError: 

842 

843 if LOG: 

844 LOG('default open types map of component ' 

845 '"%s.%s" governed by component "%s.%s"' 

846 ':' % (asn1Object.__class__.__name__, 

847 namedType.name, 

848 asn1Object.__class__.__name__, 

849 namedType.openType.name)) 

850 

851 for k, v in namedType.openType.items(): 

852 LOG('%s -> %r' % (k, v)) 

853 

854 try: 

855 openType = namedType.openType[governingValue] 

856 

857 except KeyError: 

858 if LOG: 

859 LOG('failed to resolve open type by governing ' 

860 'value %r' % (governingValue,)) 

861 continue 

862 

863 if LOG: 

864 LOG('resolved open type %r by governing ' 

865 'value %r' % (openType, governingValue)) 

866 

867 containerValue = asn1Object.getComponentByPosition(idx) 

868 

869 if containerValue.typeId in ( 

870 univ.SetOf.typeId, univ.SequenceOf.typeId): 

871 

872 for pos, containerElement in enumerate( 

873 containerValue): 

874 

875 stream = asSeekableStream(containerValue[pos].asOctets()) 

876 

877 for component in decodeFun(stream, asn1Spec=openType, **options): 

878 if isinstance(component, SubstrateUnderrunError): 

879 yield component 

880 

881 containerValue[pos] = component 

882 

883 else: 

884 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

885 

886 for component in decodeFun(stream, asn1Spec=openType, **options): 

887 if isinstance(component, SubstrateUnderrunError): 

888 yield component 

889 

890 asn1Object.setComponentByPosition(idx, component) 

891 

892 else: 

893 inconsistency = asn1Object.isInconsistent 

894 if inconsistency: 

895 raise error.PyAsn1Error( 

896 f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent") 

897 

898 else: 

899 componentType = asn1Spec.componentType 

900 

901 if LOG: 

902 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

903 

904 idx = 0 

905 

906 while substrate.tell() - original_position < length: 

907 for component in decodeFun(substrate, componentType, **options): 

908 if isinstance(component, SubstrateUnderrunError): 

909 yield component 

910 

911 asn1Object.setComponentByPosition( 

912 idx, component, 

913 verifyConstraints=False, 

914 matchTags=False, matchConstraints=False 

915 ) 

916 

917 idx += 1 

918 

919 yield asn1Object 

920 

921 def indefLenValueDecoder(self, substrate, asn1Spec, 

922 tagSet=None, length=None, state=None, 

923 decodeFun=None, substrateFun=None, 

924 **options): 

925 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

926 raise error.PyAsn1Error('Constructed tag format expected') 

927 

928 if substrateFun is not None: 

929 if asn1Spec is not None: 

930 asn1Object = asn1Spec.clone() 

931 

932 elif self.protoComponent is not None: 

933 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

934 

935 else: 

936 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

937 

938 for chunk in substrateFun(asn1Object, substrate, length, options): 

939 yield chunk 

940 

941 return 

942 

943 if asn1Spec is None: 

944 for asn1Object in self._decodeComponentsSchemaless( 

945 substrate, tagSet=tagSet, decodeFun=decodeFun, 

946 length=length, **dict(options, allowEoo=True)): 

947 if isinstance(asn1Object, SubstrateUnderrunError): 

948 yield asn1Object 

949 

950 yield asn1Object 

951 

952 return 

953 

954 asn1Object = asn1Spec.clone() 

955 asn1Object.clear() 

956 

957 options = self._passAsn1Object(asn1Object, options) 

958 

959 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

960 

961 namedTypes = asn1Object.componentType 

962 

963 isSetType = asn1Object.typeId == univ.Set.typeId 

964 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

965 

966 if LOG: 

967 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

968 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

969 asn1Spec)) 

970 

971 seenIndices = set() 

972 

973 idx = 0 

974 

975 while True: # loop over components 

976 if len(namedTypes) <= idx: 

977 asn1Spec = None 

978 

979 elif isSetType: 

980 asn1Spec = namedTypes.tagMapUnique 

981 

982 else: 

983 try: 

984 if isDeterministic: 

985 asn1Spec = namedTypes[idx].asn1Object 

986 

987 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

988 asn1Spec = namedTypes.getTagMapNearPosition(idx) 

989 

990 else: 

991 asn1Spec = namedTypes[idx].asn1Object 

992 

993 except IndexError: 

994 raise error.PyAsn1Error( 

995 'Excessive components decoded at %r' % (asn1Object,) 

996 ) 

997 

998 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options): 

999 

1000 if isinstance(component, SubstrateUnderrunError): 

1001 yield component 

1002 

1003 if component is eoo.endOfOctets: 

1004 break 

1005 

1006 if component is eoo.endOfOctets: 

1007 break 

1008 

1009 if not isDeterministic and namedTypes: 

1010 if isSetType: 

1011 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

1012 

1013 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

1014 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

1015 

1016 asn1Object.setComponentByPosition( 

1017 idx, component, 

1018 verifyConstraints=False, 

1019 matchTags=False, matchConstraints=False 

1020 ) 

1021 

1022 seenIndices.add(idx) 

1023 idx += 1 

1024 

1025 if LOG: 

1026 LOG('seen component indices %s' % seenIndices) 

1027 

1028 if namedTypes: 

1029 if not namedTypes.requiredComponents.issubset(seenIndices): 

1030 raise error.PyAsn1Error( 

1031 'ASN.1 object %s has uninitialized ' 

1032 'components' % asn1Object.__class__.__name__) 

1033 

1034 if namedTypes.hasOpenTypes: 

1035 

1036 openTypes = options.get('openTypes', {}) 

1037 

1038 if LOG: 

1039 LOG('user-specified open types map:') 

1040 

1041 for k, v in openTypes.items(): 

1042 LOG('%s -> %r' % (k, v)) 

1043 

1044 if openTypes or options.get('decodeOpenTypes', False): 

1045 

1046 for idx, namedType in enumerate(namedTypes.namedTypes): 

1047 if not namedType.openType: 

1048 continue 

1049 

1050 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

1051 continue 

1052 

1053 governingValue = asn1Object.getComponentByName( 

1054 namedType.openType.name 

1055 ) 

1056 

1057 try: 

1058 openType = openTypes[governingValue] 

1059 

1060 except KeyError: 

1061 

1062 if LOG: 

1063 LOG('default open types map of component ' 

1064 '"%s.%s" governed by component "%s.%s"' 

1065 ':' % (asn1Object.__class__.__name__, 

1066 namedType.name, 

1067 asn1Object.__class__.__name__, 

1068 namedType.openType.name)) 

1069 

1070 for k, v in namedType.openType.items(): 

1071 LOG('%s -> %r' % (k, v)) 

1072 

1073 try: 

1074 openType = namedType.openType[governingValue] 

1075 

1076 except KeyError: 

1077 if LOG: 

1078 LOG('failed to resolve open type by governing ' 

1079 'value %r' % (governingValue,)) 

1080 continue 

1081 

1082 if LOG: 

1083 LOG('resolved open type %r by governing ' 

1084 'value %r' % (openType, governingValue)) 

1085 

1086 containerValue = asn1Object.getComponentByPosition(idx) 

1087 

1088 if containerValue.typeId in ( 

1089 univ.SetOf.typeId, univ.SequenceOf.typeId): 

1090 

1091 for pos, containerElement in enumerate( 

1092 containerValue): 

1093 

1094 stream = asSeekableStream(containerValue[pos].asOctets()) 

1095 

1096 for component in decodeFun(stream, asn1Spec=openType, 

1097 **dict(options, allowEoo=True)): 

1098 if isinstance(component, SubstrateUnderrunError): 

1099 yield component 

1100 

1101 if component is eoo.endOfOctets: 

1102 break 

1103 

1104 containerValue[pos] = component 

1105 

1106 else: 

1107 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

1108 for component in decodeFun(stream, asn1Spec=openType, 

1109 **dict(options, allowEoo=True)): 

1110 if isinstance(component, SubstrateUnderrunError): 

1111 yield component 

1112 

1113 if component is eoo.endOfOctets: 

1114 break 

1115 

1116 asn1Object.setComponentByPosition(idx, component) 

1117 

1118 else: 

1119 inconsistency = asn1Object.isInconsistent 

1120 if inconsistency: 

1121 raise error.PyAsn1Error( 

1122 f"ASN.1 object {asn1Object.__class__.__name__} is inconsistent") 

1123 

1124 else: 

1125 componentType = asn1Spec.componentType 

1126 

1127 if LOG: 

1128 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

1129 

1130 idx = 0 

1131 

1132 while True: 

1133 

1134 for component in decodeFun( 

1135 substrate, componentType, allowEoo=True, **options): 

1136 

1137 if isinstance(component, SubstrateUnderrunError): 

1138 yield component 

1139 

1140 if component is eoo.endOfOctets: 

1141 break 

1142 

1143 if component is eoo.endOfOctets: 

1144 break 

1145 

1146 asn1Object.setComponentByPosition( 

1147 idx, component, 

1148 verifyConstraints=False, 

1149 matchTags=False, matchConstraints=False 

1150 ) 

1151 

1152 idx += 1 

1153 

1154 yield asn1Object 

1155 

1156 

1157class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1158 protoRecordComponent = univ.Sequence() 

1159 protoSequenceComponent = univ.SequenceOf() 

1160 

1161 

1162class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1163 protoComponent = univ.Sequence() 

1164 

1165 

1166class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1167 protoComponent = univ.SequenceOf() 

1168 

1169 

1170class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1171 protoRecordComponent = univ.Set() 

1172 protoSequenceComponent = univ.SetOf() 

1173 

1174 

1175class SetPayloadDecoder(SetOrSetOfPayloadDecoder): 

1176 protoComponent = univ.Set() 

1177 

1178 

1179class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder): 

1180 protoComponent = univ.SetOf() 

1181 

1182 

1183class ChoicePayloadDecoder(ConstructedPayloadDecoderBase): 

1184 protoComponent = univ.Choice() 

1185 

1186 def valueDecoder(self, substrate, asn1Spec, 

1187 tagSet=None, length=None, state=None, 

1188 decodeFun=None, substrateFun=None, 

1189 **options): 

1190 if asn1Spec is None: 

1191 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1192 

1193 else: 

1194 asn1Object = asn1Spec.clone() 

1195 

1196 if substrateFun: 

1197 for chunk in substrateFun(asn1Object, substrate, length, options): 

1198 yield chunk 

1199 

1200 return 

1201 

1202 options = self._passAsn1Object(asn1Object, options) 

1203 

1204 if asn1Object.tagSet == tagSet: 

1205 if LOG: 

1206 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 

1207 

1208 for component in decodeFun( 

1209 substrate, asn1Object.componentTagMap, **options): 

1210 if isinstance(component, SubstrateUnderrunError): 

1211 yield component 

1212 

1213 else: 

1214 if LOG: 

1215 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 

1216 

1217 for component in decodeFun( 

1218 substrate, asn1Object.componentTagMap, tagSet, length, 

1219 state, **options): 

1220 if isinstance(component, SubstrateUnderrunError): 

1221 yield component 

1222 

1223 effectiveTagSet = component.effectiveTagSet 

1224 

1225 if LOG: 

1226 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 

1227 

1228 asn1Object.setComponentByType( 

1229 effectiveTagSet, component, 

1230 verifyConstraints=False, 

1231 matchTags=False, matchConstraints=False, 

1232 innerFlag=False 

1233 ) 

1234 

1235 yield asn1Object 

1236 

1237 def indefLenValueDecoder(self, substrate, asn1Spec, 

1238 tagSet=None, length=None, state=None, 

1239 decodeFun=None, substrateFun=None, 

1240 **options): 

1241 if asn1Spec is None: 

1242 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1243 

1244 else: 

1245 asn1Object = asn1Spec.clone() 

1246 

1247 if substrateFun: 

1248 for chunk in substrateFun(asn1Object, substrate, length, options): 

1249 yield chunk 

1250 

1251 return 

1252 

1253 options = self._passAsn1Object(asn1Object, options) 

1254 

1255 isTagged = asn1Object.tagSet == tagSet 

1256 

1257 if LOG: 

1258 LOG('decoding %s as %stagged CHOICE' % ( 

1259 tagSet, isTagged and 'explicitly ' or 'un')) 

1260 

1261 while True: 

1262 

1263 if isTagged: 

1264 iterator = decodeFun( 

1265 substrate, asn1Object.componentType.tagMapUnique, 

1266 **dict(options, allowEoo=True)) 

1267 

1268 else: 

1269 iterator = decodeFun( 

1270 substrate, asn1Object.componentType.tagMapUnique, 

1271 tagSet, length, state, **dict(options, allowEoo=True)) 

1272 

1273 for component in iterator: 

1274 

1275 if isinstance(component, SubstrateUnderrunError): 

1276 yield component 

1277 

1278 if component is eoo.endOfOctets: 

1279 break 

1280 

1281 effectiveTagSet = component.effectiveTagSet 

1282 

1283 if LOG: 

1284 LOG('decoded component %s, effective tag set ' 

1285 '%s' % (component, effectiveTagSet)) 

1286 

1287 asn1Object.setComponentByType( 

1288 effectiveTagSet, component, 

1289 verifyConstraints=False, 

1290 matchTags=False, matchConstraints=False, 

1291 innerFlag=False 

1292 ) 

1293 

1294 if not isTagged: 

1295 break 

1296 

1297 if not isTagged or component is eoo.endOfOctets: 

1298 break 

1299 

1300 yield asn1Object 

1301 

1302 

1303class AnyPayloadDecoder(AbstractSimplePayloadDecoder): 

1304 protoComponent = univ.Any() 

1305 

1306 def valueDecoder(self, substrate, asn1Spec, 

1307 tagSet=None, length=None, state=None, 

1308 decodeFun=None, substrateFun=None, 

1309 **options): 

1310 if asn1Spec is None: 

1311 isUntagged = True 

1312 

1313 elif asn1Spec.__class__ is tagmap.TagMap: 

1314 isUntagged = tagSet not in asn1Spec.tagMap 

1315 

1316 else: 

1317 isUntagged = tagSet != asn1Spec.tagSet 

1318 

1319 if isUntagged: 

1320 fullPosition = substrate.markedPosition 

1321 currentPosition = substrate.tell() 

1322 

1323 substrate.seek(fullPosition, os.SEEK_SET) 

1324 length += currentPosition - fullPosition 

1325 

1326 if LOG: 

1327 for chunk in peekIntoStream(substrate, length): 

1328 if isinstance(chunk, SubstrateUnderrunError): 

1329 yield chunk 

1330 LOG('decoding as untagged ANY, substrate ' 

1331 '%s' % debug.hexdump(chunk)) 

1332 

1333 if substrateFun: 

1334 for chunk in substrateFun( 

1335 self._createComponent(asn1Spec, tagSet, noValue, **options), 

1336 substrate, length, options): 

1337 yield chunk 

1338 

1339 return 

1340 

1341 for chunk in readFromStream(substrate, length, options): 

1342 if isinstance(chunk, SubstrateUnderrunError): 

1343 yield chunk 

1344 

1345 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1346 

1347 def indefLenValueDecoder(self, substrate, asn1Spec, 

1348 tagSet=None, length=None, state=None, 

1349 decodeFun=None, substrateFun=None, 

1350 **options): 

1351 if asn1Spec is None: 

1352 isTagged = False 

1353 

1354 elif asn1Spec.__class__ is tagmap.TagMap: 

1355 isTagged = tagSet in asn1Spec.tagMap 

1356 

1357 else: 

1358 isTagged = tagSet == asn1Spec.tagSet 

1359 

1360 if isTagged: 

1361 # tagged Any type -- consume header substrate 

1362 chunk = b'' 

1363 

1364 if LOG: 

1365 LOG('decoding as tagged ANY') 

1366 

1367 else: 

1368 # TODO: Seems not to be tested 

1369 fullPosition = substrate.markedPosition 

1370 currentPosition = substrate.tell() 

1371 

1372 substrate.seek(fullPosition, os.SEEK_SET) 

1373 for chunk in readFromStream(substrate, currentPosition - fullPosition, options): 

1374 if isinstance(chunk, SubstrateUnderrunError): 

1375 yield chunk 

1376 

1377 if LOG: 

1378 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk)) 

1379 

1380 # Any components do not inherit initial tag 

1381 asn1Spec = self.protoComponent 

1382 

1383 if substrateFun and substrateFun is not self.substrateCollector: 

1384 asn1Object = self._createComponent( 

1385 asn1Spec, tagSet, noValue, **options) 

1386 

1387 for chunk in substrateFun( 

1388 asn1Object, chunk + substrate, length + len(chunk), options): 

1389 yield chunk 

1390 

1391 return 

1392 

1393 if LOG: 

1394 LOG('assembling constructed serialization') 

1395 

1396 # All inner fragments are of the same type, treat them as octet string 

1397 substrateFun = self.substrateCollector 

1398 

1399 while True: # loop over fragments 

1400 

1401 for component in decodeFun( 

1402 substrate, asn1Spec, substrateFun=substrateFun, 

1403 allowEoo=True, **options): 

1404 

1405 if isinstance(component, SubstrateUnderrunError): 

1406 yield component 

1407 

1408 if component is eoo.endOfOctets: 

1409 break 

1410 

1411 if component is eoo.endOfOctets: 

1412 break 

1413 

1414 chunk += component 

1415 

1416 if substrateFun: 

1417 yield chunk # TODO: Weird 

1418 

1419 else: 

1420 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1421 

1422 

1423# character string types 

1424class UTF8StringPayloadDecoder(OctetStringPayloadDecoder): 

1425 protoComponent = char.UTF8String() 

1426 

1427 

1428class NumericStringPayloadDecoder(OctetStringPayloadDecoder): 

1429 protoComponent = char.NumericString() 

1430 

1431 

1432class PrintableStringPayloadDecoder(OctetStringPayloadDecoder): 

1433 protoComponent = char.PrintableString() 

1434 

1435 

1436class TeletexStringPayloadDecoder(OctetStringPayloadDecoder): 

1437 protoComponent = char.TeletexString() 

1438 

1439 

1440class VideotexStringPayloadDecoder(OctetStringPayloadDecoder): 

1441 protoComponent = char.VideotexString() 

1442 

1443 

1444class IA5StringPayloadDecoder(OctetStringPayloadDecoder): 

1445 protoComponent = char.IA5String() 

1446 

1447 

1448class GraphicStringPayloadDecoder(OctetStringPayloadDecoder): 

1449 protoComponent = char.GraphicString() 

1450 

1451 

1452class VisibleStringPayloadDecoder(OctetStringPayloadDecoder): 

1453 protoComponent = char.VisibleString() 

1454 

1455 

1456class GeneralStringPayloadDecoder(OctetStringPayloadDecoder): 

1457 protoComponent = char.GeneralString() 

1458 

1459 

1460class UniversalStringPayloadDecoder(OctetStringPayloadDecoder): 

1461 protoComponent = char.UniversalString() 

1462 

1463 

1464class BMPStringPayloadDecoder(OctetStringPayloadDecoder): 

1465 protoComponent = char.BMPString() 

1466 

1467 

1468# "useful" types 

1469class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder): 

1470 protoComponent = useful.ObjectDescriptor() 

1471 

1472 

1473class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder): 

1474 protoComponent = useful.GeneralizedTime() 

1475 

1476 

1477class UTCTimePayloadDecoder(OctetStringPayloadDecoder): 

1478 protoComponent = useful.UTCTime() 

1479 

1480 

1481TAG_MAP = { 

1482 univ.Integer.tagSet: IntegerPayloadDecoder(), 

1483 univ.Boolean.tagSet: BooleanPayloadDecoder(), 

1484 univ.BitString.tagSet: BitStringPayloadDecoder(), 

1485 univ.OctetString.tagSet: OctetStringPayloadDecoder(), 

1486 univ.Null.tagSet: NullPayloadDecoder(), 

1487 univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(), 

1488 univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(), 

1489 univ.Enumerated.tagSet: IntegerPayloadDecoder(), 

1490 univ.Real.tagSet: RealPayloadDecoder(), 

1491 univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf 

1492 univ.Set.tagSet: SetOrSetOfPayloadDecoder(), # conflicts with SetOf 

1493 univ.Choice.tagSet: ChoicePayloadDecoder(), # conflicts with Any 

1494 # character string types 

1495 char.UTF8String.tagSet: UTF8StringPayloadDecoder(), 

1496 char.NumericString.tagSet: NumericStringPayloadDecoder(), 

1497 char.PrintableString.tagSet: PrintableStringPayloadDecoder(), 

1498 char.TeletexString.tagSet: TeletexStringPayloadDecoder(), 

1499 char.VideotexString.tagSet: VideotexStringPayloadDecoder(), 

1500 char.IA5String.tagSet: IA5StringPayloadDecoder(), 

1501 char.GraphicString.tagSet: GraphicStringPayloadDecoder(), 

1502 char.VisibleString.tagSet: VisibleStringPayloadDecoder(), 

1503 char.GeneralString.tagSet: GeneralStringPayloadDecoder(), 

1504 char.UniversalString.tagSet: UniversalStringPayloadDecoder(), 

1505 char.BMPString.tagSet: BMPStringPayloadDecoder(), 

1506 # useful types 

1507 useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(), 

1508 useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(), 

1509 useful.UTCTime.tagSet: UTCTimePayloadDecoder() 

1510} 

1511 

1512# Type-to-codec map for ambiguous ASN.1 types 

1513TYPE_MAP = { 

1514 univ.Set.typeId: SetPayloadDecoder(), 

1515 univ.SetOf.typeId: SetOfPayloadDecoder(), 

1516 univ.Sequence.typeId: SequencePayloadDecoder(), 

1517 univ.SequenceOf.typeId: SequenceOfPayloadDecoder(), 

1518 univ.Choice.typeId: ChoicePayloadDecoder(), 

1519 univ.Any.typeId: AnyPayloadDecoder() 

1520} 

1521 

1522# Put in non-ambiguous types for faster codec lookup 

1523for typeDecoder in TAG_MAP.values(): 

1524 if typeDecoder.protoComponent is not None: 

1525 typeId = typeDecoder.protoComponent.__class__.typeId 

1526 if typeId is not None and typeId not in TYPE_MAP: 

1527 TYPE_MAP[typeId] = typeDecoder 

1528 

1529 

1530(stDecodeTag, 

1531 stDecodeLength, 

1532 stGetValueDecoder, 

1533 stGetValueDecoderByAsn1Spec, 

1534 stGetValueDecoderByTag, 

1535 stTryAsExplicitTag, 

1536 stDecodeValue, 

1537 stDumpRawValue, 

1538 stErrorCondition, 

1539 stStop) = [x for x in range(10)] 

1540 

1541 

1542EOO_SENTINEL = bytes((0, 0)) 

1543 

1544 

1545class SingleItemDecoder(object): 

1546 defaultErrorState = stErrorCondition 

1547 #defaultErrorState = stDumpRawValue 

1548 defaultRawDecoder = AnyPayloadDecoder() 

1549 

1550 supportIndefLength = True 

1551 

1552 TAG_MAP = TAG_MAP 

1553 TYPE_MAP = TYPE_MAP 

1554 

1555 def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored): 

1556 self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP 

1557 self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP 

1558 

1559 # Tag & TagSet objects caches 

1560 self._tagCache = {} 

1561 self._tagSetCache = {} 

1562 

1563 def __call__(self, substrate, asn1Spec=None, 

1564 tagSet=None, length=None, state=stDecodeTag, 

1565 decodeFun=None, substrateFun=None, 

1566 **options): 

1567 

1568 allowEoo = options.pop('allowEoo', False) 

1569 

1570 if LOG: 

1571 LOG('decoder called at scope %s with state %d, working with up ' 

1572 'to %s octets of substrate: ' 

1573 '%s' % (debug.scope, state, length, substrate)) 

1574 

1575 # Look for end-of-octets sentinel 

1576 if allowEoo and self.supportIndefLength: 

1577 

1578 for eoo_candidate in readFromStream(substrate, 2, options): 

1579 if isinstance(eoo_candidate, SubstrateUnderrunError): 

1580 yield eoo_candidate 

1581 

1582 if eoo_candidate == EOO_SENTINEL: 

1583 if LOG: 

1584 LOG('end-of-octets sentinel found') 

1585 yield eoo.endOfOctets 

1586 return 

1587 

1588 else: 

1589 substrate.seek(-2, os.SEEK_CUR) 

1590 

1591 tagMap = self._tagMap 

1592 typeMap = self._typeMap 

1593 tagCache = self._tagCache 

1594 tagSetCache = self._tagSetCache 

1595 

1596 value = noValue 

1597 

1598 substrate.markedPosition = substrate.tell() 

1599 

1600 while state is not stStop: 

1601 

1602 if state is stDecodeTag: 

1603 # Decode tag 

1604 isShortTag = True 

1605 

1606 for firstByte in readFromStream(substrate, 1, options): 

1607 if isinstance(firstByte, SubstrateUnderrunError): 

1608 yield firstByte 

1609 

1610 firstOctet = ord(firstByte) 

1611 

1612 try: 

1613 lastTag = tagCache[firstOctet] 

1614 

1615 except KeyError: 

1616 integerTag = firstOctet 

1617 tagClass = integerTag & 0xC0 

1618 tagFormat = integerTag & 0x20 

1619 tagId = integerTag & 0x1F 

1620 

1621 if tagId == 0x1F: 

1622 isShortTag = False 

1623 lengthOctetIdx = 0 

1624 tagId = 0 

1625 

1626 while True: 

1627 for integerByte in readFromStream(substrate, 1, options): 

1628 if isinstance(integerByte, SubstrateUnderrunError): 

1629 yield integerByte 

1630 

1631 if not integerByte: 

1632 raise error.SubstrateUnderrunError( 

1633 'Short octet stream on long tag decoding' 

1634 ) 

1635 

1636 integerTag = ord(integerByte) 

1637 lengthOctetIdx += 1 

1638 tagId <<= 7 

1639 tagId |= (integerTag & 0x7F) 

1640 

1641 if not integerTag & 0x80: 

1642 break 

1643 

1644 lastTag = tag.Tag( 

1645 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId 

1646 ) 

1647 

1648 if isShortTag: 

1649 # cache short tags 

1650 tagCache[firstOctet] = lastTag 

1651 

1652 if tagSet is None: 

1653 if isShortTag: 

1654 try: 

1655 tagSet = tagSetCache[firstOctet] 

1656 

1657 except KeyError: 

1658 # base tag not recovered 

1659 tagSet = tag.TagSet((), lastTag) 

1660 tagSetCache[firstOctet] = tagSet 

1661 else: 

1662 tagSet = tag.TagSet((), lastTag) 

1663 

1664 else: 

1665 tagSet = lastTag + tagSet 

1666 

1667 state = stDecodeLength 

1668 

1669 if LOG: 

1670 LOG('tag decoded into %s, decoding length' % tagSet) 

1671 

1672 if state is stDecodeLength: 

1673 # Decode length 

1674 for firstOctet in readFromStream(substrate, 1, options): 

1675 if isinstance(firstOctet, SubstrateUnderrunError): 

1676 yield firstOctet 

1677 

1678 firstOctet = ord(firstOctet) 

1679 

1680 if firstOctet < 128: 

1681 length = firstOctet 

1682 

1683 elif firstOctet > 128: 

1684 size = firstOctet & 0x7F 

1685 # encoded in size bytes 

1686 for encodedLength in readFromStream(substrate, size, options): 

1687 if isinstance(encodedLength, SubstrateUnderrunError): 

1688 yield encodedLength 

1689 encodedLength = list(encodedLength) 

1690 # missing check on maximum size, which shouldn't be a 

1691 # problem, we can handle more than is possible 

1692 if len(encodedLength) != size: 

1693 raise error.SubstrateUnderrunError( 

1694 '%s<%s at %s' % (size, len(encodedLength), tagSet) 

1695 ) 

1696 

1697 length = 0 

1698 for lengthOctet in encodedLength: 

1699 length <<= 8 

1700 length |= lengthOctet 

1701 size += 1 

1702 

1703 else: # 128 means indefinite 

1704 length = -1 

1705 

1706 if length == -1 and not self.supportIndefLength: 

1707 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec') 

1708 

1709 state = stGetValueDecoder 

1710 

1711 if LOG: 

1712 LOG('value length decoded into %d' % length) 

1713 

1714 if state is stGetValueDecoder: 

1715 if asn1Spec is None: 

1716 state = stGetValueDecoderByTag 

1717 

1718 else: 

1719 state = stGetValueDecoderByAsn1Spec 

1720 # 

1721 # There're two ways of creating subtypes in ASN.1 what influences 

1722 # decoder operation. These methods are: 

1723 # 1) Either base types used in or no IMPLICIT tagging has been 

1724 # applied on subtyping. 

1725 # 2) Subtype syntax drops base type information (by means of 

1726 # IMPLICIT tagging. 

1727 # The first case allows for complete tag recovery from substrate 

1728 # while the second one requires original ASN.1 type spec for 

1729 # decoding. 

1730 # 

1731 # In either case a set of tags (tagSet) is coming from substrate 

1732 # in an incremental, tag-by-tag fashion (this is the case of 

1733 # EXPLICIT tag which is most basic). Outermost tag comes first 

1734 # from the wire. 

1735 # 

1736 if state is stGetValueDecoderByTag: 

1737 try: 

1738 concreteDecoder = tagMap[tagSet] 

1739 

1740 except KeyError: 

1741 concreteDecoder = None 

1742 

1743 if concreteDecoder: 

1744 state = stDecodeValue 

1745 

1746 else: 

1747 try: 

1748 concreteDecoder = tagMap[tagSet[:1]] 

1749 

1750 except KeyError: 

1751 concreteDecoder = None 

1752 

1753 if concreteDecoder: 

1754 state = stDecodeValue 

1755 else: 

1756 state = stTryAsExplicitTag 

1757 

1758 if LOG: 

1759 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1760 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__) 

1761 

1762 if state is stGetValueDecoderByAsn1Spec: 

1763 

1764 if asn1Spec.__class__ is tagmap.TagMap: 

1765 try: 

1766 chosenSpec = asn1Spec[tagSet] 

1767 

1768 except KeyError: 

1769 chosenSpec = None 

1770 

1771 if LOG: 

1772 LOG('candidate ASN.1 spec is a map of:') 

1773 

1774 for firstOctet, v in asn1Spec.presentTypes.items(): 

1775 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1776 

1777 if asn1Spec.skipTypes: 

1778 LOG('but neither of: ') 

1779 for firstOctet, v in asn1Spec.skipTypes.items(): 

1780 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1781 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet)) 

1782 

1783 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap: 

1784 chosenSpec = asn1Spec 

1785 if LOG: 

1786 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) 

1787 

1788 else: 

1789 chosenSpec = None 

1790 

1791 if chosenSpec is not None: 

1792 try: 

1793 # ambiguous type or just faster codec lookup 

1794 concreteDecoder = typeMap[chosenSpec.typeId] 

1795 

1796 if LOG: 

1797 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,)) 

1798 

1799 except KeyError: 

1800 # use base type for codec lookup to recover untagged types 

1801 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag) 

1802 try: 

1803 # base type or tagged subtype 

1804 concreteDecoder = tagMap[baseTagSet] 

1805 

1806 if LOG: 

1807 LOG('value decoder chosen by base %s' % (baseTagSet,)) 

1808 

1809 except KeyError: 

1810 concreteDecoder = None 

1811 

1812 if concreteDecoder: 

1813 asn1Spec = chosenSpec 

1814 state = stDecodeValue 

1815 

1816 else: 

1817 state = stTryAsExplicitTag 

1818 

1819 else: 

1820 concreteDecoder = None 

1821 state = stTryAsExplicitTag 

1822 

1823 if LOG: 

1824 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1825 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__) 

1826 

1827 if state is stDecodeValue: 

1828 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this 

1829 def substrateFun(asn1Object, _substrate, _length, _options): 

1830 """Legacy hack to keep the recursiveFlag=False option supported. 

1831 

1832 The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization 

1833 of the old recursiveFlag=False option. Users should pass their callback instead of using 

1834 recursiveFlag. 

1835 """ 

1836 yield asn1Object 

1837 

1838 original_position = substrate.tell() 

1839 

1840 if length == -1: # indef length 

1841 for value in concreteDecoder.indefLenValueDecoder( 

1842 substrate, asn1Spec, 

1843 tagSet, length, stGetValueDecoder, 

1844 self, substrateFun, **options): 

1845 if isinstance(value, SubstrateUnderrunError): 

1846 yield value 

1847 

1848 else: 

1849 for value in concreteDecoder.valueDecoder( 

1850 substrate, asn1Spec, 

1851 tagSet, length, stGetValueDecoder, 

1852 self, substrateFun, **options): 

1853 if isinstance(value, SubstrateUnderrunError): 

1854 yield value 

1855 

1856 bytesRead = substrate.tell() - original_position 

1857 if not substrateFun and bytesRead != length: 

1858 raise PyAsn1Error( 

1859 "Read %s bytes instead of expected %s." % (bytesRead, length)) 

1860 elif substrateFun and bytesRead > length: 

1861 # custom substrateFun may be used for partial decoding, reading less is expected there 

1862 raise PyAsn1Error( 

1863 "Read %s bytes are more than expected %s." % (bytesRead, length)) 

1864 

1865 if LOG: 

1866 LOG('codec %s yields type %s, value:\n%s\n...' % ( 

1867 concreteDecoder.__class__.__name__, value.__class__.__name__, 

1868 isinstance(value, base.Asn1Item) and value.prettyPrint() or value)) 

1869 

1870 state = stStop 

1871 break 

1872 

1873 if state is stTryAsExplicitTag: 

1874 if (tagSet and 

1875 tagSet[0].tagFormat == tag.tagFormatConstructed and 

1876 tagSet[0].tagClass != tag.tagClassUniversal): 

1877 # Assume explicit tagging 

1878 concreteDecoder = rawPayloadDecoder 

1879 state = stDecodeValue 

1880 

1881 else: 

1882 concreteDecoder = None 

1883 state = self.defaultErrorState 

1884 

1885 if LOG: 

1886 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure')) 

1887 

1888 if state is stDumpRawValue: 

1889 concreteDecoder = self.defaultRawDecoder 

1890 

1891 if LOG: 

1892 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) 

1893 

1894 state = stDecodeValue 

1895 

1896 if state is stErrorCondition: 

1897 raise error.PyAsn1Error( 

1898 '%s not in asn1Spec: %r' % (tagSet, asn1Spec) 

1899 ) 

1900 

1901 if LOG: 

1902 debug.scope.pop() 

1903 LOG('decoder left scope %s, call completed' % debug.scope) 

1904 

1905 yield value 

1906 

1907 

1908class StreamingDecoder(object): 

1909 """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects. 

1910 

1911 On each iteration, consume whatever BER/CER/DER serialization is 

1912 available in the `substrate` stream-like object and turns it into 

1913 one or more, possibly nested, ASN.1 objects. 

1914 

1915 Parameters 

1916 ---------- 

1917 substrate: :py:class:`file`, :py:class:`io.BytesIO` 

1918 BER/CER/DER serialization in form of a byte stream 

1919 

1920 Keyword Args 

1921 ------------ 

1922 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

1923 A pyasn1 type object to act as a template guiding the decoder. 

1924 Depending on the ASN.1 structure being decoded, `asn1Spec` may 

1925 or may not be required. One of the reasons why `asn1Spec` may 

1926 me required is that ASN.1 structure is encoded in the *IMPLICIT* 

1927 tagging mode. 

1928 

1929 Yields 

1930 ------ 

1931 : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1932 Decoded ASN.1 object (possibly, nested) or 

1933 :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating 

1934 insufficient BER/CER/DER serialization on input to fully recover ASN.1 

1935 objects from it. 

1936 

1937 In the latter case the caller is advised to ensure some more data in 

1938 the input stream, then call the iterator again. The decoder will resume 

1939 the decoding process using the newly arrived data. 

1940 

1941 The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1942 object might hold a reference to the partially populated ASN.1 object 

1943 being reconstructed. 

1944 

1945 Raises 

1946 ------ 

1947 ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError 

1948 `PyAsn1Error` on deserialization error, `EndOfStreamError` on 

1949 premature stream closure. 

1950 

1951 Examples 

1952 -------- 

1953 Decode BER serialisation without ASN.1 schema 

1954 

1955 .. code-block:: pycon 

1956 

1957 >>> stream = io.BytesIO( 

1958 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1959 >>> 

1960 >>> for asn1Object in StreamingDecoder(stream): 

1961 ... print(asn1Object) 

1962 >>> 

1963 SequenceOf: 

1964 1 2 3 

1965 

1966 Decode BER serialisation with ASN.1 schema 

1967 

1968 .. code-block:: pycon 

1969 

1970 >>> stream = io.BytesIO( 

1971 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1972 >>> 

1973 >>> schema = SequenceOf(componentType=Integer()) 

1974 >>> 

1975 >>> decoder = StreamingDecoder(stream, asn1Spec=schema) 

1976 >>> for asn1Object in decoder: 

1977 ... print(asn1Object) 

1978 >>> 

1979 SequenceOf: 

1980 1 2 3 

1981 """ 

1982 

1983 SINGLE_ITEM_DECODER = SingleItemDecoder 

1984 

1985 def __init__(self, substrate, asn1Spec=None, **options): 

1986 self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options) 

1987 self._substrate = asSeekableStream(substrate) 

1988 self._asn1Spec = asn1Spec 

1989 self._options = options 

1990 

1991 def __iter__(self): 

1992 while True: 

1993 for asn1Object in self._singleItemDecoder( 

1994 self._substrate, self._asn1Spec, **self._options): 

1995 yield asn1Object 

1996 

1997 for chunk in isEndOfStream(self._substrate): 

1998 if isinstance(chunk, SubstrateUnderrunError): 

1999 yield 

2000 

2001 break 

2002 

2003 if chunk: 

2004 break 

2005 

2006 

2007class Decoder(object): 

2008 """Create a BER decoder object. 

2009 

2010 Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object. 

2011 """ 

2012 STREAMING_DECODER = StreamingDecoder 

2013 

2014 @classmethod 

2015 def __call__(cls, substrate, asn1Spec=None, **options): 

2016 """Turns BER/CER/DER octet stream into an ASN.1 object. 

2017 

2018 Takes BER/CER/DER octet-stream in form of :py:class:`bytes` 

2019 and decode it into an ASN.1 object 

2020 (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

2021 may be a scalar or an arbitrary nested structure. 

2022 

2023 Parameters 

2024 ---------- 

2025 substrate: :py:class:`bytes` 

2026 BER/CER/DER octet-stream to parse 

2027 

2028 Keyword Args 

2029 ------------ 

2030 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

2031 A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item` 

2032 derivative) to act as a template guiding the decoder. 

2033 Depending on the ASN.1 structure being decoded, `asn1Spec` may or 

2034 may not be required. Most common reason for it to require is that 

2035 ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

2036 

2037 substrateFun: :py:class:`Union[ 

2038 Callable[[pyasn1.type.base.PyAsn1Item, bytes, int], 

2039 Tuple[pyasn1.type.base.PyAsn1Item, bytes]], 

2040 Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict], 

2041 Generator[Union[pyasn1.type.base.PyAsn1Item, 

2042 pyasn1.error.SubstrateUnderrunError], 

2043 None, None]] 

2044 ]` 

2045 User callback meant to generalize special use cases like non-recursive or 

2046 partial decoding. A 3-arg non-streaming variant is supported for backwards 

2047 compatiblilty in addition to the newer 4-arg streaming variant. 

2048 The callback will receive the uninitialized object recovered from substrate 

2049 as 1st argument, the uninterpreted payload as 2nd argument, and the length 

2050 of the uninterpreted payload as 3rd argument. The streaming variant will 

2051 additionally receive the decode(..., **options) kwargs as 4th argument. 

2052 The non-streaming variant shall return an object that will be propagated 

2053 as decode() return value as 1st item, and the remainig payload for further 

2054 decode passes as 2nd item. 

2055 The streaming variant shall yield an object that will be propagated as 

2056 decode() return value, and leave the remaining payload in the stream. 

2057 

2058 Returns 

2059 ------- 

2060 : :py:class:`tuple` 

2061 A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object 

2062 recovered from BER/CER/DER substrate and the unprocessed trailing 

2063 portion of the `substrate` (may be empty) 

2064 

2065 Raises 

2066 ------ 

2067 : :py:class:`~pyasn1.error.PyAsn1Error` 

2068 :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient 

2069 input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error. 

2070 

2071 Examples 

2072 -------- 

2073 Decode BER/CER/DER serialisation without ASN.1 schema 

2074 

2075 .. code-block:: pycon 

2076 

2077 >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2078 >>> str(s) 

2079 SequenceOf: 

2080 1 2 3 

2081 

2082 Decode BER/CER/DER serialisation with ASN.1 schema 

2083 

2084 .. code-block:: pycon 

2085 

2086 >>> seq = SequenceOf(componentType=Integer()) 

2087 >>> s, unprocessed = decode( 

2088 b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2089 >>> str(s) 

2090 SequenceOf: 

2091 1 2 3 

2092 

2093 """ 

2094 substrate = asSeekableStream(substrate) 

2095 

2096 if "substrateFun" in options: 

2097 origSubstrateFun = options["substrateFun"] 

2098 

2099 def substrateFunWrapper(asn1Object, substrate, length, options=None): 

2100 """Support both 0.4 and 0.5 style APIs. 

2101 

2102 substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible, 

2103 we first try if we received a streaming user callback. If that fails,we assume we've received a 

2104 non-streaming v0.4 user callback and convert it for streaming on the fly 

2105 """ 

2106 try: 

2107 substrate_gen = origSubstrateFun(asn1Object, substrate, length, options) 

2108 except TypeError as _value: 

2109 if _value.__traceback__.tb_next: 

2110 # Traceback depth > 1 means TypeError from inside user provided function 

2111 raise 

2112 # invariant maintained at Decoder.__call__ entry 

2113 assert isinstance(substrate, io.BytesIO) # nosec assert_used 

2114 substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length) 

2115 for value in substrate_gen: 

2116 yield value 

2117 

2118 options["substrateFun"] = substrateFunWrapper 

2119 

2120 streamingDecoder = cls.STREAMING_DECODER( 

2121 substrate, asn1Spec, **options) 

2122 

2123 for asn1Object in streamingDecoder: 

2124 if isinstance(asn1Object, SubstrateUnderrunError): 

2125 raise error.SubstrateUnderrunError('Short substrate on input') 

2126 

2127 try: 

2128 tail = next(readFromStream(substrate)) 

2129 

2130 except error.EndOfStreamError: 

2131 tail = b'' 

2132 

2133 return asn1Object, tail 

2134 

2135 @staticmethod 

2136 def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length): 

2137 substrate_bytes = substrate.read() 

2138 if length == -1: 

2139 length = len(substrate_bytes) 

2140 value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length) 

2141 nbytes = substrate.write(nextSubstrate) 

2142 substrate.truncate() 

2143 substrate.seek(-nbytes, os.SEEK_CUR) 

2144 yield value 

2145 

2146#: Turns BER octet stream into an ASN.1 object. 

2147#: 

2148#: Takes BER octet-stream and decode it into an ASN.1 object 

2149#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

2150#: may be a scalar or an arbitrary nested structure. 

2151#: 

2152#: Parameters 

2153#: ---------- 

2154#: substrate: :py:class:`bytes` 

2155#: BER octet-stream 

2156#: 

2157#: Keyword Args 

2158#: ------------ 

2159#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 

2160#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure 

2161#: being decoded, *asn1Spec* may or may not be required. Most common reason for 

2162#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

2163#: 

2164#: Returns 

2165#: ------- 

2166#: : :py:class:`tuple` 

2167#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

2168#: and the unprocessed trailing portion of the *substrate* (may be empty) 

2169#: 

2170#: Raises 

2171#: ------ 

2172#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError 

2173#: On decoding errors 

2174#: 

2175#: Notes 

2176#: ----- 

2177#: This function is deprecated. Please use :py:class:`Decoder` or 

2178#: :py:class:`StreamingDecoder` class instance. 

2179#: 

2180#: Examples 

2181#: -------- 

2182#: Decode BER serialisation without ASN.1 schema 

2183#: 

2184#: .. code-block:: pycon 

2185#: 

2186#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2187#: >>> str(s) 

2188#: SequenceOf: 

2189#: 1 2 3 

2190#: 

2191#: Decode BER serialisation with ASN.1 schema 

2192#: 

2193#: .. code-block:: pycon 

2194#: 

2195#: >>> seq = SequenceOf(componentType=Integer()) 

2196#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2197#: >>> str(s) 

2198#: SequenceOf: 

2199#: 1 2 3 

2200#: 

2201decode = Decoder() 

2202 

2203def __getattr__(attr: str): 

2204 if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr): 

2205 warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning) 

2206 return globals()[newAttr] 

2207 raise AttributeError(attr)