Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/decoder.py: 15%

1020 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import io 

8import os 

9import sys 

10 

11 

12from pyasn1 import debug 

13from pyasn1 import error 

14from pyasn1.codec.ber import eoo 

15from pyasn1.codec.streaming import asSeekableStream 

16from pyasn1.codec.streaming import isEndOfStream 

17from pyasn1.codec.streaming import peekIntoStream 

18from pyasn1.codec.streaming import readFromStream 

19from pyasn1.compat import _MISSING 

20from pyasn1.compat.integer import from_bytes 

21from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null 

22from pyasn1.error import PyAsn1Error 

23from pyasn1.type import base 

24from pyasn1.type import char 

25from pyasn1.type import tag 

26from pyasn1.type import tagmap 

27from pyasn1.type import univ 

28from pyasn1.type import useful 

29 

30__all__ = ['StreamingDecoder', 'Decoder', 'decode'] 

31 

32LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER) 

33 

34noValue = base.noValue 

35 

36SubstrateUnderrunError = error.SubstrateUnderrunError 

37 

38 

39class AbstractPayloadDecoder(object): 

40 protoComponent = None 

41 

42 def valueDecoder(self, substrate, asn1Spec, 

43 tagSet=None, length=None, state=None, 

44 decodeFun=None, substrateFun=None, 

45 **options): 

46 """Decode value with fixed byte length. 

47 

48 The decoder is allowed to consume as many bytes as necessary. 

49 """ 

50 raise error.PyAsn1Error('SingleItemDecoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

51 

52 def indefLenValueDecoder(self, substrate, asn1Spec, 

53 tagSet=None, length=None, state=None, 

54 decodeFun=None, substrateFun=None, 

55 **options): 

56 """Decode value with undefined length. 

57 

58 The decoder is allowed to consume as many bytes as necessary. 

59 """ 

60 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

61 

62 @staticmethod 

63 def _passAsn1Object(asn1Object, options): 

64 if 'asn1Object' not in options: 

65 options['asn1Object'] = asn1Object 

66 

67 return options 

68 

69 

70class AbstractSimplePayloadDecoder(AbstractPayloadDecoder): 

71 @staticmethod 

72 def substrateCollector(asn1Object, substrate, length, options): 

73 for chunk in readFromStream(substrate, length, options): 

74 yield chunk 

75 

76 def _createComponent(self, asn1Spec, tagSet, value, **options): 

77 if options.get('native'): 

78 return value 

79 elif asn1Spec is None: 

80 return self.protoComponent.clone(value, tagSet=tagSet) 

81 elif value is noValue: 

82 return asn1Spec 

83 else: 

84 return asn1Spec.clone(value) 

85 

86 

87class RawPayloadDecoder(AbstractSimplePayloadDecoder): 

88 protoComponent = univ.Any('') 

89 

90 def valueDecoder(self, substrate, asn1Spec, 

91 tagSet=None, length=None, state=None, 

92 decodeFun=None, substrateFun=None, 

93 **options): 

94 if substrateFun: 

95 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

96 

97 for chunk in substrateFun(asn1Object, substrate, length, options): 

98 yield chunk 

99 

100 return 

101 

102 for value in decodeFun(substrate, asn1Spec, tagSet, length, **options): 

103 yield value 

104 

105 def indefLenValueDecoder(self, substrate, asn1Spec, 

106 tagSet=None, length=None, state=None, 

107 decodeFun=None, substrateFun=None, 

108 **options): 

109 if substrateFun: 

110 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

111 

112 for chunk in substrateFun(asn1Object, substrate, length, options): 

113 yield chunk 

114 

115 return 

116 

117 while True: 

118 for value in decodeFun( 

119 substrate, asn1Spec, tagSet, length, 

120 allowEoo=True, **options): 

121 

122 if value is eoo.endOfOctets: 

123 return 

124 

125 yield value 

126 

127 

128rawPayloadDecoder = RawPayloadDecoder() 

129 

130 

131class IntegerPayloadDecoder(AbstractSimplePayloadDecoder): 

132 protoComponent = univ.Integer(0) 

133 

134 def valueDecoder(self, substrate, asn1Spec, 

135 tagSet=None, length=None, state=None, 

136 decodeFun=None, substrateFun=None, 

137 **options): 

138 

139 if tagSet[0].tagFormat != tag.tagFormatSimple: 

140 raise error.PyAsn1Error('Simple tag format expected') 

141 

142 for chunk in readFromStream(substrate, length, options): 

143 if isinstance(chunk, SubstrateUnderrunError): 

144 yield chunk 

145 

146 if chunk: 

147 value = from_bytes(chunk, signed=True) 

148 

149 else: 

150 value = 0 

151 

152 yield self._createComponent(asn1Spec, tagSet, value, **options) 

153 

154 

155class BooleanPayloadDecoder(IntegerPayloadDecoder): 

156 protoComponent = univ.Boolean(0) 

157 

158 def _createComponent(self, asn1Spec, tagSet, value, **options): 

159 return IntegerPayloadDecoder._createComponent( 

160 self, asn1Spec, tagSet, value and 1 or 0, **options) 

161 

162 

163class BitStringPayloadDecoder(AbstractSimplePayloadDecoder): 

164 protoComponent = univ.BitString(()) 

165 supportConstructedForm = True 

166 

167 def valueDecoder(self, substrate, asn1Spec, 

168 tagSet=None, length=None, state=None, 

169 decodeFun=None, substrateFun=None, 

170 **options): 

171 

172 if substrateFun: 

173 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

174 

175 for chunk in substrateFun(asn1Object, substrate, length, options): 

176 yield chunk 

177 

178 return 

179 

180 if not length: 

181 raise error.PyAsn1Error('Empty BIT STRING substrate') 

182 

183 for chunk in isEndOfStream(substrate): 

184 if isinstance(chunk, SubstrateUnderrunError): 

185 yield chunk 

186 

187 if chunk: 

188 raise error.PyAsn1Error('Empty BIT STRING substrate') 

189 

190 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

191 

192 for trailingBits in readFromStream(substrate, 1, options): 

193 if isinstance(trailingBits, SubstrateUnderrunError): 

194 yield trailingBits 

195 

196 trailingBits = ord(trailingBits) 

197 if trailingBits > 7: 

198 raise error.PyAsn1Error( 

199 'Trailing bits overflow %s' % trailingBits 

200 ) 

201 

202 for chunk in readFromStream(substrate, length - 1, options): 

203 if isinstance(chunk, SubstrateUnderrunError): 

204 yield chunk 

205 

206 value = self.protoComponent.fromOctetString( 

207 chunk, internalFormat=True, padding=trailingBits) 

208 

209 yield self._createComponent(asn1Spec, tagSet, value, **options) 

210 

211 return 

212 

213 if not self.supportConstructedForm: 

214 raise error.PyAsn1Error('Constructed encoding form prohibited ' 

215 'at %s' % self.__class__.__name__) 

216 

217 if LOG: 

218 LOG('assembling constructed serialization') 

219 

220 # All inner fragments are of the same type, treat them as octet string 

221 substrateFun = self.substrateCollector 

222 

223 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

224 

225 current_position = substrate.tell() 

226 

227 while substrate.tell() - current_position < length: 

228 for component in decodeFun( 

229 substrate, self.protoComponent, substrateFun=substrateFun, 

230 **options): 

231 if isinstance(component, SubstrateUnderrunError): 

232 yield component 

233 

234 trailingBits = oct2int(component[0]) 

235 if trailingBits > 7: 

236 raise error.PyAsn1Error( 

237 'Trailing bits overflow %s' % trailingBits 

238 ) 

239 

240 bitString = self.protoComponent.fromOctetString( 

241 component[1:], internalFormat=True, 

242 prepend=bitString, padding=trailingBits 

243 ) 

244 

245 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

246 

247 def indefLenValueDecoder(self, substrate, asn1Spec, 

248 tagSet=None, length=None, state=None, 

249 decodeFun=None, substrateFun=None, 

250 **options): 

251 

252 if substrateFun: 

253 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

254 

255 for chunk in substrateFun(asn1Object, substrate, length, options): 

256 yield chunk 

257 

258 return 

259 

260 # All inner fragments are of the same type, treat them as octet string 

261 substrateFun = self.substrateCollector 

262 

263 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

264 

265 while True: # loop over fragments 

266 

267 for component in decodeFun( 

268 substrate, self.protoComponent, substrateFun=substrateFun, 

269 allowEoo=True, **options): 

270 

271 if component is eoo.endOfOctets: 

272 break 

273 

274 if isinstance(component, SubstrateUnderrunError): 

275 yield component 

276 

277 if component is eoo.endOfOctets: 

278 break 

279 

280 trailingBits = oct2int(component[0]) 

281 if trailingBits > 7: 

282 raise error.PyAsn1Error( 

283 'Trailing bits overflow %s' % trailingBits 

284 ) 

285 

286 bitString = self.protoComponent.fromOctetString( 

287 component[1:], internalFormat=True, 

288 prepend=bitString, padding=trailingBits 

289 ) 

290 

291 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

292 

293 

294class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder): 

295 protoComponent = univ.OctetString('') 

296 supportConstructedForm = True 

297 

298 def valueDecoder(self, substrate, asn1Spec, 

299 tagSet=None, length=None, state=None, 

300 decodeFun=None, substrateFun=None, 

301 **options): 

302 if substrateFun: 

303 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

304 

305 for chunk in substrateFun(asn1Object, substrate, length, options): 

306 yield chunk 

307 

308 return 

309 

310 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

311 for chunk in readFromStream(substrate, length, options): 

312 if isinstance(chunk, SubstrateUnderrunError): 

313 yield chunk 

314 

315 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

316 

317 return 

318 

319 if not self.supportConstructedForm: 

320 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__) 

321 

322 if LOG: 

323 LOG('assembling constructed serialization') 

324 

325 # All inner fragments are of the same type, treat them as octet string 

326 substrateFun = self.substrateCollector 

327 

328 header = null 

329 

330 original_position = substrate.tell() 

331 # head = popSubstream(substrate, length) 

332 while substrate.tell() - original_position < length: 

333 for component in decodeFun( 

334 substrate, self.protoComponent, substrateFun=substrateFun, 

335 **options): 

336 if isinstance(component, SubstrateUnderrunError): 

337 yield component 

338 

339 header += component 

340 

341 yield self._createComponent(asn1Spec, tagSet, header, **options) 

342 

343 def indefLenValueDecoder(self, substrate, asn1Spec, 

344 tagSet=None, length=None, state=None, 

345 decodeFun=None, substrateFun=None, 

346 **options): 

347 if substrateFun and substrateFun is not self.substrateCollector: 

348 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

349 

350 for chunk in substrateFun(asn1Object, substrate, length, options): 

351 yield chunk 

352 

353 return 

354 

355 # All inner fragments are of the same type, treat them as octet string 

356 substrateFun = self.substrateCollector 

357 

358 header = null 

359 

360 while True: # loop over fragments 

361 

362 for component in decodeFun( 

363 substrate, self.protoComponent, substrateFun=substrateFun, 

364 allowEoo=True, **options): 

365 

366 if isinstance(component, SubstrateUnderrunError): 

367 yield component 

368 

369 if component is eoo.endOfOctets: 

370 break 

371 

372 if component is eoo.endOfOctets: 

373 break 

374 

375 header += component 

376 

377 yield self._createComponent(asn1Spec, tagSet, header, **options) 

378 

379 

380class NullPayloadDecoder(AbstractSimplePayloadDecoder): 

381 protoComponent = univ.Null('') 

382 

383 def valueDecoder(self, substrate, asn1Spec, 

384 tagSet=None, length=None, state=None, 

385 decodeFun=None, substrateFun=None, 

386 **options): 

387 

388 if tagSet[0].tagFormat != tag.tagFormatSimple: 

389 raise error.PyAsn1Error('Simple tag format expected') 

390 

391 for chunk in readFromStream(substrate, length, options): 

392 if isinstance(chunk, SubstrateUnderrunError): 

393 yield chunk 

394 

395 component = self._createComponent(asn1Spec, tagSet, '', **options) 

396 

397 if chunk: 

398 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) 

399 

400 yield component 

401 

402 

403class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder): 

404 protoComponent = univ.ObjectIdentifier(()) 

405 

406 def valueDecoder(self, substrate, asn1Spec, 

407 tagSet=None, length=None, state=None, 

408 decodeFun=None, substrateFun=None, 

409 **options): 

410 if tagSet[0].tagFormat != tag.tagFormatSimple: 

411 raise error.PyAsn1Error('Simple tag format expected') 

412 

413 for chunk in readFromStream(substrate, length, options): 

414 if isinstance(chunk, SubstrateUnderrunError): 

415 yield chunk 

416 

417 if not chunk: 

418 raise error.PyAsn1Error('Empty substrate') 

419 

420 chunk = octs2ints(chunk) 

421 

422 oid = () 

423 index = 0 

424 substrateLen = len(chunk) 

425 while index < substrateLen: 

426 subId = chunk[index] 

427 index += 1 

428 if subId < 128: 

429 oid += (subId,) 

430 elif subId > 128: 

431 # Construct subid from a number of octets 

432 nextSubId = subId 

433 subId = 0 

434 while nextSubId >= 128: 

435 subId = (subId << 7) + (nextSubId & 0x7F) 

436 if index >= substrateLen: 

437 raise error.SubstrateUnderrunError( 

438 'Short substrate for sub-OID past %s' % (oid,) 

439 ) 

440 nextSubId = chunk[index] 

441 index += 1 

442 oid += ((subId << 7) + nextSubId,) 

443 elif subId == 128: 

444 # ASN.1 spec forbids leading zeros (0x80) in OID 

445 # encoding, tolerating it opens a vulnerability. See 

446 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 

447 # page 7 

448 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding') 

449 

450 # Decode two leading arcs 

451 if 0 <= oid[0] <= 39: 

452 oid = (0,) + oid 

453 elif 40 <= oid[0] <= 79: 

454 oid = (1, oid[0] - 40) + oid[1:] 

455 elif oid[0] >= 80: 

456 oid = (2, oid[0] - 80) + oid[1:] 

457 else: 

458 raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0]) 

459 

460 yield self._createComponent(asn1Spec, tagSet, oid, **options) 

461 

462 

463class RealPayloadDecoder(AbstractSimplePayloadDecoder): 

464 protoComponent = univ.Real() 

465 

466 def valueDecoder(self, substrate, asn1Spec, 

467 tagSet=None, length=None, state=None, 

468 decodeFun=None, substrateFun=None, 

469 **options): 

470 if tagSet[0].tagFormat != tag.tagFormatSimple: 

471 raise error.PyAsn1Error('Simple tag format expected') 

472 

473 for chunk in readFromStream(substrate, length, options): 

474 if isinstance(chunk, SubstrateUnderrunError): 

475 yield chunk 

476 

477 if not chunk: 

478 yield self._createComponent(asn1Spec, tagSet, 0.0, **options) 

479 return 

480 

481 fo = oct2int(chunk[0]) 

482 chunk = chunk[1:] 

483 if fo & 0x80: # binary encoding 

484 if not chunk: 

485 raise error.PyAsn1Error("Incomplete floating-point value") 

486 

487 if LOG: 

488 LOG('decoding binary encoded REAL') 

489 

490 n = (fo & 0x03) + 1 

491 

492 if n == 4: 

493 n = oct2int(chunk[0]) 

494 chunk = chunk[1:] 

495 

496 eo, chunk = chunk[:n], chunk[n:] 

497 

498 if not eo or not chunk: 

499 raise error.PyAsn1Error('Real exponent screwed') 

500 

501 e = oct2int(eo[0]) & 0x80 and -1 or 0 

502 

503 while eo: # exponent 

504 e <<= 8 

505 e |= oct2int(eo[0]) 

506 eo = eo[1:] 

507 

508 b = fo >> 4 & 0x03 # base bits 

509 

510 if b > 2: 

511 raise error.PyAsn1Error('Illegal Real base') 

512 

513 if b == 1: # encbase = 8 

514 e *= 3 

515 

516 elif b == 2: # encbase = 16 

517 e *= 4 

518 p = 0 

519 

520 while chunk: # value 

521 p <<= 8 

522 p |= oct2int(chunk[0]) 

523 chunk = chunk[1:] 

524 

525 if fo & 0x40: # sign bit 

526 p = -p 

527 

528 sf = fo >> 2 & 0x03 # scale bits 

529 p *= 2 ** sf 

530 value = (p, 2, e) 

531 

532 elif fo & 0x40: # infinite value 

533 if LOG: 

534 LOG('decoding infinite REAL') 

535 

536 value = fo & 0x01 and '-inf' or 'inf' 

537 

538 elif fo & 0xc0 == 0: # character encoding 

539 if not chunk: 

540 raise error.PyAsn1Error("Incomplete floating-point value") 

541 

542 if LOG: 

543 LOG('decoding character encoded REAL') 

544 

545 try: 

546 if fo & 0x3 == 0x1: # NR1 

547 value = (int(chunk), 10, 0) 

548 

549 elif fo & 0x3 == 0x2: # NR2 

550 value = float(chunk) 

551 

552 elif fo & 0x3 == 0x3: # NR3 

553 value = float(chunk) 

554 

555 else: 

556 raise error.SubstrateUnderrunError( 

557 'Unknown NR (tag %s)' % fo 

558 ) 

559 

560 except ValueError: 

561 raise error.SubstrateUnderrunError( 

562 'Bad character Real syntax' 

563 ) 

564 

565 else: 

566 raise error.SubstrateUnderrunError( 

567 'Unknown encoding (tag %s)' % fo 

568 ) 

569 

570 yield self._createComponent(asn1Spec, tagSet, value, **options) 

571 

572 

573class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder): 

574 protoComponent = None 

575 

576 

577class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder): 

578 protoRecordComponent = None 

579 protoSequenceComponent = None 

580 

581 def _getComponentTagMap(self, asn1Object, idx): 

582 raise NotImplementedError() 

583 

584 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 

585 raise NotImplementedError() 

586 

587 def _decodeComponentsSchemaless( 

588 self, substrate, tagSet=None, decodeFun=None, 

589 length=None, **options): 

590 

591 asn1Object = None 

592 

593 components = [] 

594 componentTypes = set() 

595 

596 original_position = substrate.tell() 

597 

598 while length == -1 or substrate.tell() < original_position + length: 

599 for component in decodeFun(substrate, **options): 

600 if isinstance(component, SubstrateUnderrunError): 

601 yield component 

602 

603 if length == -1 and component is eoo.endOfOctets: 

604 break 

605 

606 components.append(component) 

607 componentTypes.add(component.tagSet) 

608 

609 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF 

610 # The heuristics is: 

611 # * 1+ components of different types -> likely SEQUENCE/SET 

612 # * otherwise -> likely SEQUENCE OF/SET OF 

613 if len(componentTypes) > 1: 

614 protoComponent = self.protoRecordComponent 

615 

616 else: 

617 protoComponent = self.protoSequenceComponent 

618 

619 asn1Object = protoComponent.clone( 

620 # construct tagSet from base tag from prototype ASN.1 object 

621 # and additional tags recovered from the substrate 

622 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags) 

623 ) 

624 

625 if LOG: 

626 LOG('guessed %r container type (pass `asn1Spec` to guide the ' 

627 'decoder)' % asn1Object) 

628 

629 for idx, component in enumerate(components): 

630 asn1Object.setComponentByPosition( 

631 idx, component, 

632 verifyConstraints=False, 

633 matchTags=False, matchConstraints=False 

634 ) 

635 

636 yield asn1Object 

637 

638 def valueDecoder(self, substrate, asn1Spec, 

639 tagSet=None, length=None, state=None, 

640 decodeFun=None, substrateFun=None, 

641 **options): 

642 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

643 raise error.PyAsn1Error('Constructed tag format expected') 

644 

645 original_position = substrate.tell() 

646 

647 if substrateFun: 

648 if asn1Spec is not None: 

649 asn1Object = asn1Spec.clone() 

650 

651 elif self.protoComponent is not None: 

652 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

653 

654 else: 

655 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

656 

657 for chunk in substrateFun(asn1Object, substrate, length, options): 

658 yield chunk 

659 

660 return 

661 

662 if asn1Spec is None: 

663 for asn1Object in self._decodeComponentsSchemaless( 

664 substrate, tagSet=tagSet, decodeFun=decodeFun, 

665 length=length, **options): 

666 if isinstance(asn1Object, SubstrateUnderrunError): 

667 yield asn1Object 

668 

669 if substrate.tell() < original_position + length: 

670 if LOG: 

671 for trailing in readFromStream(substrate, context=options): 

672 if isinstance(trailing, SubstrateUnderrunError): 

673 yield trailing 

674 

675 LOG('Unused trailing %d octets encountered: %s' % ( 

676 len(trailing), debug.hexdump(trailing))) 

677 

678 yield asn1Object 

679 

680 return 

681 

682 asn1Object = asn1Spec.clone() 

683 asn1Object.clear() 

684 

685 options = self._passAsn1Object(asn1Object, options) 

686 

687 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

688 

689 namedTypes = asn1Spec.componentType 

690 

691 isSetType = asn1Spec.typeId == univ.Set.typeId 

692 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

693 

694 if LOG: 

695 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

696 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

697 asn1Spec)) 

698 

699 seenIndices = set() 

700 idx = 0 

701 while substrate.tell() - original_position < length: 

702 if not namedTypes: 

703 componentType = None 

704 

705 elif isSetType: 

706 componentType = namedTypes.tagMapUnique 

707 

708 else: 

709 try: 

710 if isDeterministic: 

711 componentType = namedTypes[idx].asn1Object 

712 

713 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

714 componentType = namedTypes.getTagMapNearPosition(idx) 

715 

716 else: 

717 componentType = namedTypes[idx].asn1Object 

718 

719 except IndexError: 

720 raise error.PyAsn1Error( 

721 'Excessive components decoded at %r' % (asn1Spec,) 

722 ) 

723 

724 for component in decodeFun(substrate, componentType, **options): 

725 if isinstance(component, SubstrateUnderrunError): 

726 yield component 

727 

728 if not isDeterministic and namedTypes: 

729 if isSetType: 

730 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

731 

732 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

733 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

734 

735 asn1Object.setComponentByPosition( 

736 idx, component, 

737 verifyConstraints=False, 

738 matchTags=False, matchConstraints=False 

739 ) 

740 

741 seenIndices.add(idx) 

742 idx += 1 

743 

744 if LOG: 

745 LOG('seen component indices %s' % seenIndices) 

746 

747 if namedTypes: 

748 if not namedTypes.requiredComponents.issubset(seenIndices): 

749 raise error.PyAsn1Error( 

750 'ASN.1 object %s has uninitialized ' 

751 'components' % asn1Object.__class__.__name__) 

752 

753 if namedTypes.hasOpenTypes: 

754 

755 openTypes = options.get('openTypes', {}) 

756 

757 if LOG: 

758 LOG('user-specified open types map:') 

759 

760 for k, v in openTypes.items(): 

761 LOG('%s -> %r' % (k, v)) 

762 

763 if openTypes or options.get('decodeOpenTypes', False): 

764 

765 for idx, namedType in enumerate(namedTypes.namedTypes): 

766 if not namedType.openType: 

767 continue 

768 

769 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

770 continue 

771 

772 governingValue = asn1Object.getComponentByName( 

773 namedType.openType.name 

774 ) 

775 

776 try: 

777 openType = openTypes[governingValue] 

778 

779 except KeyError: 

780 

781 if LOG: 

782 LOG('default open types map of component ' 

783 '"%s.%s" governed by component "%s.%s"' 

784 ':' % (asn1Object.__class__.__name__, 

785 namedType.name, 

786 asn1Object.__class__.__name__, 

787 namedType.openType.name)) 

788 

789 for k, v in namedType.openType.items(): 

790 LOG('%s -> %r' % (k, v)) 

791 

792 try: 

793 openType = namedType.openType[governingValue] 

794 

795 except KeyError: 

796 if LOG: 

797 LOG('failed to resolve open type by governing ' 

798 'value %r' % (governingValue,)) 

799 continue 

800 

801 if LOG: 

802 LOG('resolved open type %r by governing ' 

803 'value %r' % (openType, governingValue)) 

804 

805 containerValue = asn1Object.getComponentByPosition(idx) 

806 

807 if containerValue.typeId in ( 

808 univ.SetOf.typeId, univ.SequenceOf.typeId): 

809 

810 for pos, containerElement in enumerate( 

811 containerValue): 

812 

813 stream = asSeekableStream(containerValue[pos].asOctets()) 

814 

815 for component in decodeFun(stream, asn1Spec=openType, **options): 

816 if isinstance(component, SubstrateUnderrunError): 

817 yield component 

818 

819 containerValue[pos] = component 

820 

821 else: 

822 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

823 

824 for component in decodeFun(stream, asn1Spec=openType, **options): 

825 if isinstance(component, SubstrateUnderrunError): 

826 yield component 

827 

828 asn1Object.setComponentByPosition(idx, component) 

829 

830 else: 

831 inconsistency = asn1Object.isInconsistent 

832 if inconsistency: 

833 raise inconsistency 

834 

835 else: 

836 componentType = asn1Spec.componentType 

837 

838 if LOG: 

839 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

840 

841 idx = 0 

842 

843 while substrate.tell() - original_position < length: 

844 for component in decodeFun(substrate, componentType, **options): 

845 if isinstance(component, SubstrateUnderrunError): 

846 yield component 

847 

848 asn1Object.setComponentByPosition( 

849 idx, component, 

850 verifyConstraints=False, 

851 matchTags=False, matchConstraints=False 

852 ) 

853 

854 idx += 1 

855 

856 yield asn1Object 

857 

858 def indefLenValueDecoder(self, substrate, asn1Spec, 

859 tagSet=None, length=None, state=None, 

860 decodeFun=None, substrateFun=None, 

861 **options): 

862 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

863 raise error.PyAsn1Error('Constructed tag format expected') 

864 

865 if substrateFun is not None: 

866 if asn1Spec is not None: 

867 asn1Object = asn1Spec.clone() 

868 

869 elif self.protoComponent is not None: 

870 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

871 

872 else: 

873 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

874 

875 for chunk in substrateFun(asn1Object, substrate, length, options): 

876 yield chunk 

877 

878 return 

879 

880 if asn1Spec is None: 

881 for asn1Object in self._decodeComponentsSchemaless( 

882 substrate, tagSet=tagSet, decodeFun=decodeFun, 

883 length=length, **dict(options, allowEoo=True)): 

884 if isinstance(asn1Object, SubstrateUnderrunError): 

885 yield asn1Object 

886 

887 yield asn1Object 

888 

889 return 

890 

891 asn1Object = asn1Spec.clone() 

892 asn1Object.clear() 

893 

894 options = self._passAsn1Object(asn1Object, options) 

895 

896 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

897 

898 namedTypes = asn1Object.componentType 

899 

900 isSetType = asn1Object.typeId == univ.Set.typeId 

901 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

902 

903 if LOG: 

904 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

905 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

906 asn1Spec)) 

907 

908 seenIndices = set() 

909 

910 idx = 0 

911 

912 while True: # loop over components 

913 if len(namedTypes) <= idx: 

914 asn1Spec = None 

915 

916 elif isSetType: 

917 asn1Spec = namedTypes.tagMapUnique 

918 

919 else: 

920 try: 

921 if isDeterministic: 

922 asn1Spec = namedTypes[idx].asn1Object 

923 

924 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

925 asn1Spec = namedTypes.getTagMapNearPosition(idx) 

926 

927 else: 

928 asn1Spec = namedTypes[idx].asn1Object 

929 

930 except IndexError: 

931 raise error.PyAsn1Error( 

932 'Excessive components decoded at %r' % (asn1Object,) 

933 ) 

934 

935 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options): 

936 

937 if isinstance(component, SubstrateUnderrunError): 

938 yield component 

939 

940 if component is eoo.endOfOctets: 

941 break 

942 

943 if component is eoo.endOfOctets: 

944 break 

945 

946 if not isDeterministic and namedTypes: 

947 if isSetType: 

948 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

949 

950 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

951 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

952 

953 asn1Object.setComponentByPosition( 

954 idx, component, 

955 verifyConstraints=False, 

956 matchTags=False, matchConstraints=False 

957 ) 

958 

959 seenIndices.add(idx) 

960 idx += 1 

961 

962 if LOG: 

963 LOG('seen component indices %s' % seenIndices) 

964 

965 if namedTypes: 

966 if not namedTypes.requiredComponents.issubset(seenIndices): 

967 raise error.PyAsn1Error( 

968 'ASN.1 object %s has uninitialized ' 

969 'components' % asn1Object.__class__.__name__) 

970 

971 if namedTypes.hasOpenTypes: 

972 

973 openTypes = options.get('openTypes', {}) 

974 

975 if LOG: 

976 LOG('user-specified open types map:') 

977 

978 for k, v in openTypes.items(): 

979 LOG('%s -> %r' % (k, v)) 

980 

981 if openTypes or options.get('decodeOpenTypes', False): 

982 

983 for idx, namedType in enumerate(namedTypes.namedTypes): 

984 if not namedType.openType: 

985 continue 

986 

987 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

988 continue 

989 

990 governingValue = asn1Object.getComponentByName( 

991 namedType.openType.name 

992 ) 

993 

994 try: 

995 openType = openTypes[governingValue] 

996 

997 except KeyError: 

998 

999 if LOG: 

1000 LOG('default open types map of component ' 

1001 '"%s.%s" governed by component "%s.%s"' 

1002 ':' % (asn1Object.__class__.__name__, 

1003 namedType.name, 

1004 asn1Object.__class__.__name__, 

1005 namedType.openType.name)) 

1006 

1007 for k, v in namedType.openType.items(): 

1008 LOG('%s -> %r' % (k, v)) 

1009 

1010 try: 

1011 openType = namedType.openType[governingValue] 

1012 

1013 except KeyError: 

1014 if LOG: 

1015 LOG('failed to resolve open type by governing ' 

1016 'value %r' % (governingValue,)) 

1017 continue 

1018 

1019 if LOG: 

1020 LOG('resolved open type %r by governing ' 

1021 'value %r' % (openType, governingValue)) 

1022 

1023 containerValue = asn1Object.getComponentByPosition(idx) 

1024 

1025 if containerValue.typeId in ( 

1026 univ.SetOf.typeId, univ.SequenceOf.typeId): 

1027 

1028 for pos, containerElement in enumerate( 

1029 containerValue): 

1030 

1031 stream = asSeekableStream(containerValue[pos].asOctets()) 

1032 

1033 for component in decodeFun(stream, asn1Spec=openType, 

1034 **dict(options, allowEoo=True)): 

1035 if isinstance(component, SubstrateUnderrunError): 

1036 yield component 

1037 

1038 if component is eoo.endOfOctets: 

1039 break 

1040 

1041 containerValue[pos] = component 

1042 

1043 else: 

1044 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

1045 for component in decodeFun(stream, asn1Spec=openType, 

1046 **dict(options, allowEoo=True)): 

1047 if isinstance(component, SubstrateUnderrunError): 

1048 yield component 

1049 

1050 if component is eoo.endOfOctets: 

1051 break 

1052 

1053 asn1Object.setComponentByPosition(idx, component) 

1054 

1055 else: 

1056 inconsistency = asn1Object.isInconsistent 

1057 if inconsistency: 

1058 raise inconsistency 

1059 

1060 else: 

1061 componentType = asn1Spec.componentType 

1062 

1063 if LOG: 

1064 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

1065 

1066 idx = 0 

1067 

1068 while True: 

1069 

1070 for component in decodeFun( 

1071 substrate, componentType, allowEoo=True, **options): 

1072 

1073 if isinstance(component, SubstrateUnderrunError): 

1074 yield component 

1075 

1076 if component is eoo.endOfOctets: 

1077 break 

1078 

1079 if component is eoo.endOfOctets: 

1080 break 

1081 

1082 asn1Object.setComponentByPosition( 

1083 idx, component, 

1084 verifyConstraints=False, 

1085 matchTags=False, matchConstraints=False 

1086 ) 

1087 

1088 idx += 1 

1089 

1090 yield asn1Object 

1091 

1092 

1093class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1094 protoRecordComponent = univ.Sequence() 

1095 protoSequenceComponent = univ.SequenceOf() 

1096 

1097 

1098class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1099 protoComponent = univ.Sequence() 

1100 

1101 

1102class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1103 protoComponent = univ.SequenceOf() 

1104 

1105 

1106class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1107 protoRecordComponent = univ.Set() 

1108 protoSequenceComponent = univ.SetOf() 

1109 

1110 

1111class SetPayloadDecoder(SetOrSetOfPayloadDecoder): 

1112 protoComponent = univ.Set() 

1113 

1114 

1115class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder): 

1116 protoComponent = univ.SetOf() 

1117 

1118 

1119class ChoicePayloadDecoder(ConstructedPayloadDecoderBase): 

1120 protoComponent = univ.Choice() 

1121 

1122 def valueDecoder(self, substrate, asn1Spec, 

1123 tagSet=None, length=None, state=None, 

1124 decodeFun=None, substrateFun=None, 

1125 **options): 

1126 if asn1Spec is None: 

1127 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1128 

1129 else: 

1130 asn1Object = asn1Spec.clone() 

1131 

1132 if substrateFun: 

1133 for chunk in substrateFun(asn1Object, substrate, length, options): 

1134 yield chunk 

1135 

1136 return 

1137 

1138 options = self._passAsn1Object(asn1Object, options) 

1139 

1140 if asn1Object.tagSet == tagSet: 

1141 if LOG: 

1142 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 

1143 

1144 for component in decodeFun( 

1145 substrate, asn1Object.componentTagMap, **options): 

1146 if isinstance(component, SubstrateUnderrunError): 

1147 yield component 

1148 

1149 else: 

1150 if LOG: 

1151 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 

1152 

1153 for component in decodeFun( 

1154 substrate, asn1Object.componentTagMap, tagSet, length, 

1155 state, **options): 

1156 if isinstance(component, SubstrateUnderrunError): 

1157 yield component 

1158 

1159 effectiveTagSet = component.effectiveTagSet 

1160 

1161 if LOG: 

1162 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 

1163 

1164 asn1Object.setComponentByType( 

1165 effectiveTagSet, component, 

1166 verifyConstraints=False, 

1167 matchTags=False, matchConstraints=False, 

1168 innerFlag=False 

1169 ) 

1170 

1171 yield asn1Object 

1172 

1173 def indefLenValueDecoder(self, substrate, asn1Spec, 

1174 tagSet=None, length=None, state=None, 

1175 decodeFun=None, substrateFun=None, 

1176 **options): 

1177 if asn1Spec is None: 

1178 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1179 

1180 else: 

1181 asn1Object = asn1Spec.clone() 

1182 

1183 if substrateFun: 

1184 for chunk in substrateFun(asn1Object, substrate, length, options): 

1185 yield chunk 

1186 

1187 return 

1188 

1189 options = self._passAsn1Object(asn1Object, options) 

1190 

1191 isTagged = asn1Object.tagSet == tagSet 

1192 

1193 if LOG: 

1194 LOG('decoding %s as %stagged CHOICE' % ( 

1195 tagSet, isTagged and 'explicitly ' or 'un')) 

1196 

1197 while True: 

1198 

1199 if isTagged: 

1200 iterator = decodeFun( 

1201 substrate, asn1Object.componentType.tagMapUnique, 

1202 **dict(options, allowEoo=True)) 

1203 

1204 else: 

1205 iterator = decodeFun( 

1206 substrate, asn1Object.componentType.tagMapUnique, 

1207 tagSet, length, state, **dict(options, allowEoo=True)) 

1208 

1209 for component in iterator: 

1210 

1211 if isinstance(component, SubstrateUnderrunError): 

1212 yield component 

1213 

1214 if component is eoo.endOfOctets: 

1215 break 

1216 

1217 effectiveTagSet = component.effectiveTagSet 

1218 

1219 if LOG: 

1220 LOG('decoded component %s, effective tag set ' 

1221 '%s' % (component, effectiveTagSet)) 

1222 

1223 asn1Object.setComponentByType( 

1224 effectiveTagSet, component, 

1225 verifyConstraints=False, 

1226 matchTags=False, matchConstraints=False, 

1227 innerFlag=False 

1228 ) 

1229 

1230 if not isTagged: 

1231 break 

1232 

1233 if not isTagged or component is eoo.endOfOctets: 

1234 break 

1235 

1236 yield asn1Object 

1237 

1238 

1239class AnyPayloadDecoder(AbstractSimplePayloadDecoder): 

1240 protoComponent = univ.Any() 

1241 

1242 def valueDecoder(self, substrate, asn1Spec, 

1243 tagSet=None, length=None, state=None, 

1244 decodeFun=None, substrateFun=None, 

1245 **options): 

1246 if asn1Spec is None: 

1247 isUntagged = True 

1248 

1249 elif asn1Spec.__class__ is tagmap.TagMap: 

1250 isUntagged = tagSet not in asn1Spec.tagMap 

1251 

1252 else: 

1253 isUntagged = tagSet != asn1Spec.tagSet 

1254 

1255 if isUntagged: 

1256 fullPosition = substrate.markedPosition 

1257 currentPosition = substrate.tell() 

1258 

1259 substrate.seek(fullPosition, os.SEEK_SET) 

1260 length += currentPosition - fullPosition 

1261 

1262 if LOG: 

1263 for chunk in peekIntoStream(substrate, length): 

1264 if isinstance(chunk, SubstrateUnderrunError): 

1265 yield chunk 

1266 LOG('decoding as untagged ANY, substrate ' 

1267 '%s' % debug.hexdump(chunk)) 

1268 

1269 if substrateFun: 

1270 for chunk in substrateFun( 

1271 self._createComponent(asn1Spec, tagSet, noValue, **options), 

1272 substrate, length, options): 

1273 yield chunk 

1274 

1275 return 

1276 

1277 for chunk in readFromStream(substrate, length, options): 

1278 if isinstance(chunk, SubstrateUnderrunError): 

1279 yield chunk 

1280 

1281 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1282 

1283 def indefLenValueDecoder(self, substrate, asn1Spec, 

1284 tagSet=None, length=None, state=None, 

1285 decodeFun=None, substrateFun=None, 

1286 **options): 

1287 if asn1Spec is None: 

1288 isTagged = False 

1289 

1290 elif asn1Spec.__class__ is tagmap.TagMap: 

1291 isTagged = tagSet in asn1Spec.tagMap 

1292 

1293 else: 

1294 isTagged = tagSet == asn1Spec.tagSet 

1295 

1296 if isTagged: 

1297 # tagged Any type -- consume header substrate 

1298 chunk = null 

1299 

1300 if LOG: 

1301 LOG('decoding as tagged ANY') 

1302 

1303 else: 

1304 # TODO: Seems not to be tested 

1305 fullPosition = substrate.markedPosition 

1306 currentPosition = substrate.tell() 

1307 

1308 substrate.seek(fullPosition, os.SEEK_SET) 

1309 for chunk in readFromStream(substrate, currentPosition - fullPosition, options): 

1310 if isinstance(chunk, SubstrateUnderrunError): 

1311 yield chunk 

1312 

1313 if LOG: 

1314 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk)) 

1315 

1316 # Any components do not inherit initial tag 

1317 asn1Spec = self.protoComponent 

1318 

1319 if substrateFun and substrateFun is not self.substrateCollector: 

1320 asn1Object = self._createComponent( 

1321 asn1Spec, tagSet, noValue, **options) 

1322 

1323 for chunk in substrateFun( 

1324 asn1Object, chunk + substrate, length + len(chunk), options): 

1325 yield chunk 

1326 

1327 return 

1328 

1329 if LOG: 

1330 LOG('assembling constructed serialization') 

1331 

1332 # All inner fragments are of the same type, treat them as octet string 

1333 substrateFun = self.substrateCollector 

1334 

1335 while True: # loop over fragments 

1336 

1337 for component in decodeFun( 

1338 substrate, asn1Spec, substrateFun=substrateFun, 

1339 allowEoo=True, **options): 

1340 

1341 if isinstance(component, SubstrateUnderrunError): 

1342 yield component 

1343 

1344 if component is eoo.endOfOctets: 

1345 break 

1346 

1347 if component is eoo.endOfOctets: 

1348 break 

1349 

1350 chunk += component 

1351 

1352 if substrateFun: 

1353 yield chunk # TODO: Weird 

1354 

1355 else: 

1356 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1357 

1358 

1359# character string types 

1360class UTF8StringPayloadDecoder(OctetStringPayloadDecoder): 

1361 protoComponent = char.UTF8String() 

1362 

1363 

1364class NumericStringPayloadDecoder(OctetStringPayloadDecoder): 

1365 protoComponent = char.NumericString() 

1366 

1367 

1368class PrintableStringPayloadDecoder(OctetStringPayloadDecoder): 

1369 protoComponent = char.PrintableString() 

1370 

1371 

1372class TeletexStringPayloadDecoder(OctetStringPayloadDecoder): 

1373 protoComponent = char.TeletexString() 

1374 

1375 

1376class VideotexStringPayloadDecoder(OctetStringPayloadDecoder): 

1377 protoComponent = char.VideotexString() 

1378 

1379 

1380class IA5StringPayloadDecoder(OctetStringPayloadDecoder): 

1381 protoComponent = char.IA5String() 

1382 

1383 

1384class GraphicStringPayloadDecoder(OctetStringPayloadDecoder): 

1385 protoComponent = char.GraphicString() 

1386 

1387 

1388class VisibleStringPayloadDecoder(OctetStringPayloadDecoder): 

1389 protoComponent = char.VisibleString() 

1390 

1391 

1392class GeneralStringPayloadDecoder(OctetStringPayloadDecoder): 

1393 protoComponent = char.GeneralString() 

1394 

1395 

1396class UniversalStringPayloadDecoder(OctetStringPayloadDecoder): 

1397 protoComponent = char.UniversalString() 

1398 

1399 

1400class BMPStringPayloadDecoder(OctetStringPayloadDecoder): 

1401 protoComponent = char.BMPString() 

1402 

1403 

1404# "useful" types 

1405class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder): 

1406 protoComponent = useful.ObjectDescriptor() 

1407 

1408 

1409class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder): 

1410 protoComponent = useful.GeneralizedTime() 

1411 

1412 

1413class UTCTimePayloadDecoder(OctetStringPayloadDecoder): 

1414 protoComponent = useful.UTCTime() 

1415 

1416 

1417TAG_MAP = { 

1418 univ.Integer.tagSet: IntegerPayloadDecoder(), 

1419 univ.Boolean.tagSet: BooleanPayloadDecoder(), 

1420 univ.BitString.tagSet: BitStringPayloadDecoder(), 

1421 univ.OctetString.tagSet: OctetStringPayloadDecoder(), 

1422 univ.Null.tagSet: NullPayloadDecoder(), 

1423 univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(), 

1424 univ.Enumerated.tagSet: IntegerPayloadDecoder(), 

1425 univ.Real.tagSet: RealPayloadDecoder(), 

1426 univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf 

1427 univ.Set.tagSet: SetOrSetOfPayloadDecoder(), # conflicts with SetOf 

1428 univ.Choice.tagSet: ChoicePayloadDecoder(), # conflicts with Any 

1429 # character string types 

1430 char.UTF8String.tagSet: UTF8StringPayloadDecoder(), 

1431 char.NumericString.tagSet: NumericStringPayloadDecoder(), 

1432 char.PrintableString.tagSet: PrintableStringPayloadDecoder(), 

1433 char.TeletexString.tagSet: TeletexStringPayloadDecoder(), 

1434 char.VideotexString.tagSet: VideotexStringPayloadDecoder(), 

1435 char.IA5String.tagSet: IA5StringPayloadDecoder(), 

1436 char.GraphicString.tagSet: GraphicStringPayloadDecoder(), 

1437 char.VisibleString.tagSet: VisibleStringPayloadDecoder(), 

1438 char.GeneralString.tagSet: GeneralStringPayloadDecoder(), 

1439 char.UniversalString.tagSet: UniversalStringPayloadDecoder(), 

1440 char.BMPString.tagSet: BMPStringPayloadDecoder(), 

1441 # useful types 

1442 useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(), 

1443 useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(), 

1444 useful.UTCTime.tagSet: UTCTimePayloadDecoder() 

1445} 

1446 

1447# Type-to-codec map for ambiguous ASN.1 types 

1448TYPE_MAP = { 

1449 univ.Set.typeId: SetPayloadDecoder(), 

1450 univ.SetOf.typeId: SetOfPayloadDecoder(), 

1451 univ.Sequence.typeId: SequencePayloadDecoder(), 

1452 univ.SequenceOf.typeId: SequenceOfPayloadDecoder(), 

1453 univ.Choice.typeId: ChoicePayloadDecoder(), 

1454 univ.Any.typeId: AnyPayloadDecoder() 

1455} 

1456 

1457# deprecated aliases, https://github.com/pyasn1/pyasn1/issues/9 

1458tagMap = TAG_MAP 

1459typeMap = TYPE_MAP 

1460 

1461# Put in non-ambiguous types for faster codec lookup 

1462for typeDecoder in TAG_MAP.values(): 

1463 if typeDecoder.protoComponent is not None: 

1464 typeId = typeDecoder.protoComponent.__class__.typeId 

1465 if typeId is not None and typeId not in TYPE_MAP: 

1466 TYPE_MAP[typeId] = typeDecoder 

1467 

1468 

1469(stDecodeTag, 

1470 stDecodeLength, 

1471 stGetValueDecoder, 

1472 stGetValueDecoderByAsn1Spec, 

1473 stGetValueDecoderByTag, 

1474 stTryAsExplicitTag, 

1475 stDecodeValue, 

1476 stDumpRawValue, 

1477 stErrorCondition, 

1478 stStop) = [x for x in range(10)] 

1479 

1480 

1481EOO_SENTINEL = ints2octs((0, 0)) 

1482 

1483 

1484class SingleItemDecoder(object): 

1485 defaultErrorState = stErrorCondition 

1486 #defaultErrorState = stDumpRawValue 

1487 defaultRawDecoder = AnyPayloadDecoder() 

1488 

1489 supportIndefLength = True 

1490 

1491 TAG_MAP = TAG_MAP 

1492 TYPE_MAP = TYPE_MAP 

1493 

1494 def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored): 

1495 self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP 

1496 self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP 

1497 

1498 # Tag & TagSet objects caches 

1499 self._tagCache = {} 

1500 self._tagSetCache = {} 

1501 

1502 def __call__(self, substrate, asn1Spec=None, 

1503 tagSet=None, length=None, state=stDecodeTag, 

1504 decodeFun=None, substrateFun=None, 

1505 **options): 

1506 

1507 allowEoo = options.pop('allowEoo', False) 

1508 

1509 if LOG: 

1510 LOG('decoder called at scope %s with state %d, working with up ' 

1511 'to %s octets of substrate: ' 

1512 '%s' % (debug.scope, state, length, substrate)) 

1513 

1514 # Look for end-of-octets sentinel 

1515 if allowEoo and self.supportIndefLength: 

1516 

1517 for eoo_candidate in readFromStream(substrate, 2, options): 

1518 if isinstance(eoo_candidate, SubstrateUnderrunError): 

1519 yield eoo_candidate 

1520 

1521 if eoo_candidate == EOO_SENTINEL: 

1522 if LOG: 

1523 LOG('end-of-octets sentinel found') 

1524 yield eoo.endOfOctets 

1525 return 

1526 

1527 else: 

1528 substrate.seek(-2, os.SEEK_CUR) 

1529 

1530 tagMap = self._tagMap 

1531 typeMap = self._typeMap 

1532 tagCache = self._tagCache 

1533 tagSetCache = self._tagSetCache 

1534 

1535 value = noValue 

1536 

1537 substrate.markedPosition = substrate.tell() 

1538 

1539 while state is not stStop: 

1540 

1541 if state is stDecodeTag: 

1542 # Decode tag 

1543 isShortTag = True 

1544 

1545 for firstByte in readFromStream(substrate, 1, options): 

1546 if isinstance(firstByte, SubstrateUnderrunError): 

1547 yield firstByte 

1548 

1549 firstOctet = ord(firstByte) 

1550 

1551 try: 

1552 lastTag = tagCache[firstOctet] 

1553 

1554 except KeyError: 

1555 integerTag = firstOctet 

1556 tagClass = integerTag & 0xC0 

1557 tagFormat = integerTag & 0x20 

1558 tagId = integerTag & 0x1F 

1559 

1560 if tagId == 0x1F: 

1561 isShortTag = False 

1562 lengthOctetIdx = 0 

1563 tagId = 0 

1564 

1565 while True: 

1566 for integerByte in readFromStream(substrate, 1, options): 

1567 if isinstance(integerByte, SubstrateUnderrunError): 

1568 yield integerByte 

1569 

1570 if not integerByte: 

1571 raise error.SubstrateUnderrunError( 

1572 'Short octet stream on long tag decoding' 

1573 ) 

1574 

1575 integerTag = ord(integerByte) 

1576 lengthOctetIdx += 1 

1577 tagId <<= 7 

1578 tagId |= (integerTag & 0x7F) 

1579 

1580 if not integerTag & 0x80: 

1581 break 

1582 

1583 lastTag = tag.Tag( 

1584 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId 

1585 ) 

1586 

1587 if isShortTag: 

1588 # cache short tags 

1589 tagCache[firstOctet] = lastTag 

1590 

1591 if tagSet is None: 

1592 if isShortTag: 

1593 try: 

1594 tagSet = tagSetCache[firstOctet] 

1595 

1596 except KeyError: 

1597 # base tag not recovered 

1598 tagSet = tag.TagSet((), lastTag) 

1599 tagSetCache[firstOctet] = tagSet 

1600 else: 

1601 tagSet = tag.TagSet((), lastTag) 

1602 

1603 else: 

1604 tagSet = lastTag + tagSet 

1605 

1606 state = stDecodeLength 

1607 

1608 if LOG: 

1609 LOG('tag decoded into %s, decoding length' % tagSet) 

1610 

1611 if state is stDecodeLength: 

1612 # Decode length 

1613 for firstOctet in readFromStream(substrate, 1, options): 

1614 if isinstance(firstOctet, SubstrateUnderrunError): 

1615 yield firstOctet 

1616 

1617 firstOctet = ord(firstOctet) 

1618 

1619 if firstOctet < 128: 

1620 length = firstOctet 

1621 

1622 elif firstOctet > 128: 

1623 size = firstOctet & 0x7F 

1624 # encoded in size bytes 

1625 for encodedLength in readFromStream(substrate, size, options): 

1626 if isinstance(encodedLength, SubstrateUnderrunError): 

1627 yield encodedLength 

1628 encodedLength = list(encodedLength) 

1629 # missing check on maximum size, which shouldn't be a 

1630 # problem, we can handle more than is possible 

1631 if len(encodedLength) != size: 

1632 raise error.SubstrateUnderrunError( 

1633 '%s<%s at %s' % (size, len(encodedLength), tagSet) 

1634 ) 

1635 

1636 length = 0 

1637 for lengthOctet in encodedLength: 

1638 length <<= 8 

1639 length |= oct2int(lengthOctet) 

1640 size += 1 

1641 

1642 else: # 128 means indefinite 

1643 length = -1 

1644 

1645 if length == -1 and not self.supportIndefLength: 

1646 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec') 

1647 

1648 state = stGetValueDecoder 

1649 

1650 if LOG: 

1651 LOG('value length decoded into %d' % length) 

1652 

1653 if state is stGetValueDecoder: 

1654 if asn1Spec is None: 

1655 state = stGetValueDecoderByTag 

1656 

1657 else: 

1658 state = stGetValueDecoderByAsn1Spec 

1659 # 

1660 # There're two ways of creating subtypes in ASN.1 what influences 

1661 # decoder operation. These methods are: 

1662 # 1) Either base types used in or no IMPLICIT tagging has been 

1663 # applied on subtyping. 

1664 # 2) Subtype syntax drops base type information (by means of 

1665 # IMPLICIT tagging. 

1666 # The first case allows for complete tag recovery from substrate 

1667 # while the second one requires original ASN.1 type spec for 

1668 # decoding. 

1669 # 

1670 # In either case a set of tags (tagSet) is coming from substrate 

1671 # in an incremental, tag-by-tag fashion (this is the case of 

1672 # EXPLICIT tag which is most basic). Outermost tag comes first 

1673 # from the wire. 

1674 # 

1675 if state is stGetValueDecoderByTag: 

1676 try: 

1677 concreteDecoder = tagMap[tagSet] 

1678 

1679 except KeyError: 

1680 concreteDecoder = None 

1681 

1682 if concreteDecoder: 

1683 state = stDecodeValue 

1684 

1685 else: 

1686 try: 

1687 concreteDecoder = tagMap[tagSet[:1]] 

1688 

1689 except KeyError: 

1690 concreteDecoder = None 

1691 

1692 if concreteDecoder: 

1693 state = stDecodeValue 

1694 else: 

1695 state = stTryAsExplicitTag 

1696 

1697 if LOG: 

1698 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1699 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__) 

1700 

1701 if state is stGetValueDecoderByAsn1Spec: 

1702 

1703 if asn1Spec.__class__ is tagmap.TagMap: 

1704 try: 

1705 chosenSpec = asn1Spec[tagSet] 

1706 

1707 except KeyError: 

1708 chosenSpec = None 

1709 

1710 if LOG: 

1711 LOG('candidate ASN.1 spec is a map of:') 

1712 

1713 for firstOctet, v in asn1Spec.presentTypes.items(): 

1714 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1715 

1716 if asn1Spec.skipTypes: 

1717 LOG('but neither of: ') 

1718 for firstOctet, v in asn1Spec.skipTypes.items(): 

1719 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1720 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet)) 

1721 

1722 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap: 

1723 chosenSpec = asn1Spec 

1724 if LOG: 

1725 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) 

1726 

1727 else: 

1728 chosenSpec = None 

1729 

1730 if chosenSpec is not None: 

1731 try: 

1732 # ambiguous type or just faster codec lookup 

1733 concreteDecoder = typeMap[chosenSpec.typeId] 

1734 

1735 if LOG: 

1736 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,)) 

1737 

1738 except KeyError: 

1739 # use base type for codec lookup to recover untagged types 

1740 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag) 

1741 try: 

1742 # base type or tagged subtype 

1743 concreteDecoder = tagMap[baseTagSet] 

1744 

1745 if LOG: 

1746 LOG('value decoder chosen by base %s' % (baseTagSet,)) 

1747 

1748 except KeyError: 

1749 concreteDecoder = None 

1750 

1751 if concreteDecoder: 

1752 asn1Spec = chosenSpec 

1753 state = stDecodeValue 

1754 

1755 else: 

1756 state = stTryAsExplicitTag 

1757 

1758 else: 

1759 concreteDecoder = None 

1760 state = stTryAsExplicitTag 

1761 

1762 if LOG: 

1763 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1764 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__) 

1765 

1766 if state is stDecodeValue: 

1767 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this 

1768 def substrateFun(asn1Object, _substrate, _length, _options): 

1769 """Legacy hack to keep the recursiveFlag=False option supported. 

1770 

1771 The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization 

1772 of the old recursiveFlag=False option. Users should pass their callback instead of using 

1773 recursiveFlag. 

1774 """ 

1775 yield asn1Object 

1776 

1777 original_position = substrate.tell() 

1778 

1779 if length == -1: # indef length 

1780 for value in concreteDecoder.indefLenValueDecoder( 

1781 substrate, asn1Spec, 

1782 tagSet, length, stGetValueDecoder, 

1783 self, substrateFun, **options): 

1784 if isinstance(value, SubstrateUnderrunError): 

1785 yield value 

1786 

1787 else: 

1788 for value in concreteDecoder.valueDecoder( 

1789 substrate, asn1Spec, 

1790 tagSet, length, stGetValueDecoder, 

1791 self, substrateFun, **options): 

1792 if isinstance(value, SubstrateUnderrunError): 

1793 yield value 

1794 

1795 bytesRead = substrate.tell() - original_position 

1796 if not substrateFun and bytesRead != length: 

1797 raise PyAsn1Error( 

1798 "Read %s bytes instead of expected %s." % (bytesRead, length)) 

1799 elif substrateFun and bytesRead > length: 

1800 # custom substrateFun may be used for partial decoding, reading less is expected there 

1801 raise PyAsn1Error( 

1802 "Read %s bytes are more than expected %s." % (bytesRead, length)) 

1803 

1804 if LOG: 

1805 LOG('codec %s yields type %s, value:\n%s\n...' % ( 

1806 concreteDecoder.__class__.__name__, value.__class__.__name__, 

1807 isinstance(value, base.Asn1Item) and value.prettyPrint() or value)) 

1808 

1809 state = stStop 

1810 break 

1811 

1812 if state is stTryAsExplicitTag: 

1813 if (tagSet and 

1814 tagSet[0].tagFormat == tag.tagFormatConstructed and 

1815 tagSet[0].tagClass != tag.tagClassUniversal): 

1816 # Assume explicit tagging 

1817 concreteDecoder = rawPayloadDecoder 

1818 state = stDecodeValue 

1819 

1820 else: 

1821 concreteDecoder = None 

1822 state = self.defaultErrorState 

1823 

1824 if LOG: 

1825 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure')) 

1826 

1827 if state is stDumpRawValue: 

1828 concreteDecoder = self.defaultRawDecoder 

1829 

1830 if LOG: 

1831 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) 

1832 

1833 state = stDecodeValue 

1834 

1835 if state is stErrorCondition: 

1836 raise error.PyAsn1Error( 

1837 '%s not in asn1Spec: %r' % (tagSet, asn1Spec) 

1838 ) 

1839 

1840 if LOG: 

1841 debug.scope.pop() 

1842 LOG('decoder left scope %s, call completed' % debug.scope) 

1843 

1844 yield value 

1845 

1846 

1847class StreamingDecoder(object): 

1848 """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects. 

1849 

1850 On each iteration, consume whatever BER/CER/DER serialization is 

1851 available in the `substrate` stream-like object and turns it into 

1852 one or more, possibly nested, ASN.1 objects. 

1853 

1854 Parameters 

1855 ---------- 

1856 substrate: :py:class:`file`, :py:class:`io.BytesIO` 

1857 BER/CER/DER serialization in form of a byte stream 

1858 

1859 Keyword Args 

1860 ------------ 

1861 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

1862 A pyasn1 type object to act as a template guiding the decoder. 

1863 Depending on the ASN.1 structure being decoded, `asn1Spec` may 

1864 or may not be required. One of the reasons why `asn1Spec` may 

1865 me required is that ASN.1 structure is encoded in the *IMPLICIT* 

1866 tagging mode. 

1867 

1868 Yields 

1869 ------ 

1870 : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1871 Decoded ASN.1 object (possibly, nested) or 

1872 :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating 

1873 insufficient BER/CER/DER serialization on input to fully recover ASN.1 

1874 objects from it. 

1875  

1876 In the latter case the caller is advised to ensure some more data in 

1877 the input stream, then call the iterator again. The decoder will resume 

1878 the decoding process using the newly arrived data. 

1879 

1880 The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1881 object might hold a reference to the partially populated ASN.1 object 

1882 being reconstructed. 

1883 

1884 Raises 

1885 ------ 

1886 ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError 

1887 `PyAsn1Error` on deserialization error, `EndOfStreamError` on 

1888 premature stream closure. 

1889 

1890 Examples 

1891 -------- 

1892 Decode BER serialisation without ASN.1 schema 

1893 

1894 .. code-block:: pycon 

1895 

1896 >>> stream = io.BytesIO( 

1897 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1898 >>> 

1899 >>> for asn1Object in StreamingDecoder(stream): 

1900 ... print(asn1Object) 

1901 >>> 

1902 SequenceOf: 

1903 1 2 3 

1904 

1905 Decode BER serialisation with ASN.1 schema 

1906 

1907 .. code-block:: pycon 

1908 

1909 >>> stream = io.BytesIO( 

1910 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1911 >>> 

1912 >>> schema = SequenceOf(componentType=Integer()) 

1913 >>> 

1914 >>> decoder = StreamingDecoder(stream, asn1Spec=schema) 

1915 >>> for asn1Object in decoder: 

1916 ... print(asn1Object) 

1917 >>> 

1918 SequenceOf: 

1919 1 2 3 

1920 """ 

1921 

1922 SINGLE_ITEM_DECODER = SingleItemDecoder 

1923 

1924 def __init__(self, substrate, asn1Spec=None, **options): 

1925 self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options) 

1926 self._substrate = asSeekableStream(substrate) 

1927 self._asn1Spec = asn1Spec 

1928 self._options = options 

1929 

1930 def __iter__(self): 

1931 while True: 

1932 for asn1Object in self._singleItemDecoder( 

1933 self._substrate, self._asn1Spec, **self._options): 

1934 yield asn1Object 

1935 

1936 for chunk in isEndOfStream(self._substrate): 

1937 if isinstance(chunk, SubstrateUnderrunError): 

1938 yield 

1939 

1940 break 

1941 

1942 if chunk: 

1943 break 

1944 

1945 

1946class Decoder(object): 

1947 """Create a BER decoder object. 

1948 

1949 Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object. 

1950 """ 

1951 STREAMING_DECODER = StreamingDecoder 

1952 

1953 @classmethod 

1954 def __call__(cls, substrate, asn1Spec=None, **options): 

1955 """Turns BER/CER/DER octet stream into an ASN.1 object. 

1956 

1957 Takes BER/CER/DER octet-stream in form of :py:class:`bytes` (Python 3) 

1958 or :py:class:`str` (Python 2) and decode it into an ASN.1 object 

1959 (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

1960 may be a scalar or an arbitrary nested structure. 

1961 

1962 Parameters 

1963 ---------- 

1964 substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

1965 BER/CER/DER octet-stream to parse 

1966 

1967 Keyword Args 

1968 ------------ 

1969 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

1970 A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item` 

1971 derivative) to act as a template guiding the decoder. 

1972 Depending on the ASN.1 structure being decoded, `asn1Spec` may or 

1973 may not be required. Most common reason for it to require is that 

1974 ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

1975 

1976 substrateFun: :py:class:`Union[ 

1977 Callable[[pyasn1.type.base.PyAsn1Item, bytes, int], 

1978 Tuple[pyasn1.type.base.PyAsn1Item, bytes]], 

1979 Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict], 

1980 Generator[Union[pyasn1.type.base.PyAsn1Item, 

1981 pyasn1.error.SubstrateUnderrunError], 

1982 None, None]] 

1983 ]` 

1984 User callback meant to generalize special use cases like non-recursive or 

1985 partial decoding. A 3-arg non-streaming variant is supported for backwards 

1986 compatiblilty in addition to the newer 4-arg streaming variant. 

1987 The callback will receive the uninitialized object recovered from substrate 

1988 as 1st argument, the uninterpreted payload as 2nd argument, and the length 

1989 of the uninterpreted payload as 3rd argument. The streaming variant will 

1990 additionally receive the decode(..., **options) kwargs as 4th argument. 

1991 The non-streaming variant shall return an object that will be propagated 

1992 as decode() return value as 1st item, and the remainig payload for further 

1993 decode passes as 2nd item. 

1994 The streaming variant shall yield an object that will be propagated as 

1995 decode() return value, and leave the remaining payload in the stream. 

1996 

1997 Returns 

1998 ------- 

1999 : :py:class:`tuple` 

2000 A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object 

2001 recovered from BER/CER/DER substrate and the unprocessed trailing 

2002 portion of the `substrate` (may be empty) 

2003 

2004 Raises 

2005 ------ 

2006 : :py:class:`~pyasn1.error.PyAsn1Error` 

2007 :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient 

2008 input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error. 

2009 

2010 Examples 

2011 -------- 

2012 Decode BER/CER/DER serialisation without ASN.1 schema 

2013 

2014 .. code-block:: pycon 

2015 

2016 >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2017 >>> str(s) 

2018 SequenceOf: 

2019 1 2 3 

2020 

2021 Decode BER/CER/DER serialisation with ASN.1 schema 

2022 

2023 .. code-block:: pycon 

2024 

2025 >>> seq = SequenceOf(componentType=Integer()) 

2026 >>> s, unprocessed = decode( 

2027 b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2028 >>> str(s) 

2029 SequenceOf: 

2030 1 2 3 

2031 

2032 """ 

2033 substrate = asSeekableStream(substrate) 

2034 

2035 if "substrateFun" in options: 

2036 origSubstrateFun = options["substrateFun"] 

2037 

2038 def substrateFunWrapper(asn1Object, substrate, length, options=None): 

2039 """Support both 0.4 and 0.5 style APIs. 

2040 

2041 substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible, 

2042 we first try if we received a streaming user callback. If that fails,we assume we've received a 

2043 non-streaming v0.4 user callback and convert it for streaming on the fly 

2044 """ 

2045 try: 

2046 substrate_gen = origSubstrateFun(asn1Object, substrate, length, options) 

2047 except TypeError: 

2048 _type, _value, traceback = sys.exc_info() 

2049 if traceback.tb_next: 

2050 # Traceback depth > 1 means TypeError from inside user provided function 

2051 raise 

2052 # invariant maintained at Decoder.__call__ entry 

2053 assert isinstance(substrate, io.BytesIO) # nosec assert_used 

2054 substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length) 

2055 for value in substrate_gen: 

2056 yield value 

2057 

2058 options["substrateFun"] = substrateFunWrapper 

2059 

2060 streamingDecoder = cls.STREAMING_DECODER( 

2061 substrate, asn1Spec, **options) 

2062 

2063 for asn1Object in streamingDecoder: 

2064 if isinstance(asn1Object, SubstrateUnderrunError): 

2065 raise error.SubstrateUnderrunError('Short substrate on input') 

2066 

2067 try: 

2068 tail = next(readFromStream(substrate)) 

2069 

2070 except error.EndOfStreamError: 

2071 tail = null 

2072 

2073 return asn1Object, tail 

2074 

2075 @staticmethod 

2076 def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length): 

2077 substrate_bytes = substrate.read() 

2078 if length == -1: 

2079 length = len(substrate_bytes) 

2080 value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length) 

2081 nbytes = substrate.write(nextSubstrate) 

2082 substrate.truncate() 

2083 substrate.seek(-nbytes, os.SEEK_CUR) 

2084 yield value 

2085 

2086#: Turns BER octet stream into an ASN.1 object. 

2087#: 

2088#: Takes BER octet-stream and decode it into an ASN.1 object 

2089#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

2090#: may be a scalar or an arbitrary nested structure. 

2091#: 

2092#: Parameters 

2093#: ---------- 

2094#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

2095#: BER octet-stream 

2096#: 

2097#: Keyword Args 

2098#: ------------ 

2099#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 

2100#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure 

2101#: being decoded, *asn1Spec* may or may not be required. Most common reason for 

2102#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

2103#: 

2104#: Returns 

2105#: ------- 

2106#: : :py:class:`tuple` 

2107#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

2108#: and the unprocessed trailing portion of the *substrate* (may be empty) 

2109#: 

2110#: Raises 

2111#: ------ 

2112#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError 

2113#: On decoding errors 

2114#: 

2115#: Notes 

2116#: ----- 

2117#: This function is deprecated. Please use :py:class:`Decoder` or 

2118#: :py:class:`StreamingDecoder` class instance. 

2119#: 

2120#: Examples 

2121#: -------- 

2122#: Decode BER serialisation without ASN.1 schema 

2123#: 

2124#: .. code-block:: pycon 

2125#: 

2126#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2127#: >>> str(s) 

2128#: SequenceOf: 

2129#: 1 2 3 

2130#: 

2131#: Decode BER serialisation with ASN.1 schema 

2132#: 

2133#: .. code-block:: pycon 

2134#: 

2135#: >>> seq = SequenceOf(componentType=Integer()) 

2136#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2137#: >>> str(s) 

2138#: SequenceOf: 

2139#: 1 2 3 

2140#: 

2141decode = Decoder()