Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/decoder.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1052 statements  

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import io 

8import os 

9import sys 

10 

11 

12from pyasn1 import debug 

13from pyasn1 import error 

14from pyasn1.codec.ber import eoo 

15from pyasn1.codec.streaming import asSeekableStream 

16from pyasn1.codec.streaming import isEndOfStream 

17from pyasn1.codec.streaming import peekIntoStream 

18from pyasn1.codec.streaming import readFromStream 

19from pyasn1.compat import _MISSING 

20from pyasn1.compat.integer import from_bytes 

21from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null 

22from pyasn1.error import PyAsn1Error 

23from pyasn1.type import base 

24from pyasn1.type import char 

25from pyasn1.type import tag 

26from pyasn1.type import tagmap 

27from pyasn1.type import univ 

28from pyasn1.type import useful 

29 

30__all__ = ['StreamingDecoder', 'Decoder', 'decode'] 

31 

32LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER) 

33 

34noValue = base.noValue 

35 

36SubstrateUnderrunError = error.SubstrateUnderrunError 

37 

38 

39class AbstractPayloadDecoder(object): 

40 protoComponent = None 

41 

42 def valueDecoder(self, substrate, asn1Spec, 

43 tagSet=None, length=None, state=None, 

44 decodeFun=None, substrateFun=None, 

45 **options): 

46 """Decode value with fixed byte length. 

47 

48 The decoder is allowed to consume as many bytes as necessary. 

49 """ 

50 raise error.PyAsn1Error('SingleItemDecoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

51 

52 def indefLenValueDecoder(self, substrate, asn1Spec, 

53 tagSet=None, length=None, state=None, 

54 decodeFun=None, substrateFun=None, 

55 **options): 

56 """Decode value with undefined length. 

57 

58 The decoder is allowed to consume as many bytes as necessary. 

59 """ 

60 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

61 

62 @staticmethod 

63 def _passAsn1Object(asn1Object, options): 

64 if 'asn1Object' not in options: 

65 options['asn1Object'] = asn1Object 

66 

67 return options 

68 

69 

70class AbstractSimplePayloadDecoder(AbstractPayloadDecoder): 

71 @staticmethod 

72 def substrateCollector(asn1Object, substrate, length, options): 

73 for chunk in readFromStream(substrate, length, options): 

74 yield chunk 

75 

76 def _createComponent(self, asn1Spec, tagSet, value, **options): 

77 if options.get('native'): 

78 return value 

79 elif asn1Spec is None: 

80 return self.protoComponent.clone(value, tagSet=tagSet) 

81 elif value is noValue: 

82 return asn1Spec 

83 else: 

84 return asn1Spec.clone(value) 

85 

86 

87class RawPayloadDecoder(AbstractSimplePayloadDecoder): 

88 protoComponent = univ.Any('') 

89 

90 def valueDecoder(self, substrate, asn1Spec, 

91 tagSet=None, length=None, state=None, 

92 decodeFun=None, substrateFun=None, 

93 **options): 

94 if substrateFun: 

95 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

96 

97 for chunk in substrateFun(asn1Object, substrate, length, options): 

98 yield chunk 

99 

100 return 

101 

102 for value in decodeFun(substrate, asn1Spec, tagSet, length, **options): 

103 yield value 

104 

105 def indefLenValueDecoder(self, substrate, asn1Spec, 

106 tagSet=None, length=None, state=None, 

107 decodeFun=None, substrateFun=None, 

108 **options): 

109 if substrateFun: 

110 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

111 

112 for chunk in substrateFun(asn1Object, substrate, length, options): 

113 yield chunk 

114 

115 return 

116 

117 while True: 

118 for value in decodeFun( 

119 substrate, asn1Spec, tagSet, length, 

120 allowEoo=True, **options): 

121 

122 if value is eoo.endOfOctets: 

123 return 

124 

125 yield value 

126 

127 

128rawPayloadDecoder = RawPayloadDecoder() 

129 

130 

131class IntegerPayloadDecoder(AbstractSimplePayloadDecoder): 

132 protoComponent = univ.Integer(0) 

133 

134 def valueDecoder(self, substrate, asn1Spec, 

135 tagSet=None, length=None, state=None, 

136 decodeFun=None, substrateFun=None, 

137 **options): 

138 

139 if tagSet[0].tagFormat != tag.tagFormatSimple: 

140 raise error.PyAsn1Error('Simple tag format expected') 

141 

142 for chunk in readFromStream(substrate, length, options): 

143 if isinstance(chunk, SubstrateUnderrunError): 

144 yield chunk 

145 

146 if chunk: 

147 value = from_bytes(chunk, signed=True) 

148 

149 else: 

150 value = 0 

151 

152 yield self._createComponent(asn1Spec, tagSet, value, **options) 

153 

154 

155class BooleanPayloadDecoder(IntegerPayloadDecoder): 

156 protoComponent = univ.Boolean(0) 

157 

158 def _createComponent(self, asn1Spec, tagSet, value, **options): 

159 return IntegerPayloadDecoder._createComponent( 

160 self, asn1Spec, tagSet, value and 1 or 0, **options) 

161 

162 

163class BitStringPayloadDecoder(AbstractSimplePayloadDecoder): 

164 protoComponent = univ.BitString(()) 

165 supportConstructedForm = True 

166 

167 def valueDecoder(self, substrate, asn1Spec, 

168 tagSet=None, length=None, state=None, 

169 decodeFun=None, substrateFun=None, 

170 **options): 

171 

172 if substrateFun: 

173 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

174 

175 for chunk in substrateFun(asn1Object, substrate, length, options): 

176 yield chunk 

177 

178 return 

179 

180 if not length: 

181 raise error.PyAsn1Error('Empty BIT STRING substrate') 

182 

183 for chunk in isEndOfStream(substrate): 

184 if isinstance(chunk, SubstrateUnderrunError): 

185 yield chunk 

186 

187 if chunk: 

188 raise error.PyAsn1Error('Empty BIT STRING substrate') 

189 

190 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

191 

192 for trailingBits in readFromStream(substrate, 1, options): 

193 if isinstance(trailingBits, SubstrateUnderrunError): 

194 yield trailingBits 

195 

196 trailingBits = ord(trailingBits) 

197 if trailingBits > 7: 

198 raise error.PyAsn1Error( 

199 'Trailing bits overflow %s' % trailingBits 

200 ) 

201 

202 for chunk in readFromStream(substrate, length - 1, options): 

203 if isinstance(chunk, SubstrateUnderrunError): 

204 yield chunk 

205 

206 value = self.protoComponent.fromOctetString( 

207 chunk, internalFormat=True, padding=trailingBits) 

208 

209 yield self._createComponent(asn1Spec, tagSet, value, **options) 

210 

211 return 

212 

213 if not self.supportConstructedForm: 

214 raise error.PyAsn1Error('Constructed encoding form prohibited ' 

215 'at %s' % self.__class__.__name__) 

216 

217 if LOG: 

218 LOG('assembling constructed serialization') 

219 

220 # All inner fragments are of the same type, treat them as octet string 

221 substrateFun = self.substrateCollector 

222 

223 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

224 

225 current_position = substrate.tell() 

226 

227 while substrate.tell() - current_position < length: 

228 for component in decodeFun( 

229 substrate, self.protoComponent, substrateFun=substrateFun, 

230 **options): 

231 if isinstance(component, SubstrateUnderrunError): 

232 yield component 

233 

234 trailingBits = oct2int(component[0]) 

235 if trailingBits > 7: 

236 raise error.PyAsn1Error( 

237 'Trailing bits overflow %s' % trailingBits 

238 ) 

239 

240 bitString = self.protoComponent.fromOctetString( 

241 component[1:], internalFormat=True, 

242 prepend=bitString, padding=trailingBits 

243 ) 

244 

245 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

246 

247 def indefLenValueDecoder(self, substrate, asn1Spec, 

248 tagSet=None, length=None, state=None, 

249 decodeFun=None, substrateFun=None, 

250 **options): 

251 

252 if substrateFun: 

253 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

254 

255 for chunk in substrateFun(asn1Object, substrate, length, options): 

256 yield chunk 

257 

258 return 

259 

260 # All inner fragments are of the same type, treat them as octet string 

261 substrateFun = self.substrateCollector 

262 

263 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

264 

265 while True: # loop over fragments 

266 

267 for component in decodeFun( 

268 substrate, self.protoComponent, substrateFun=substrateFun, 

269 allowEoo=True, **options): 

270 

271 if component is eoo.endOfOctets: 

272 break 

273 

274 if isinstance(component, SubstrateUnderrunError): 

275 yield component 

276 

277 if component is eoo.endOfOctets: 

278 break 

279 

280 trailingBits = oct2int(component[0]) 

281 if trailingBits > 7: 

282 raise error.PyAsn1Error( 

283 'Trailing bits overflow %s' % trailingBits 

284 ) 

285 

286 bitString = self.protoComponent.fromOctetString( 

287 component[1:], internalFormat=True, 

288 prepend=bitString, padding=trailingBits 

289 ) 

290 

291 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

292 

293 

294class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder): 

295 protoComponent = univ.OctetString('') 

296 supportConstructedForm = True 

297 

298 def valueDecoder(self, substrate, asn1Spec, 

299 tagSet=None, length=None, state=None, 

300 decodeFun=None, substrateFun=None, 

301 **options): 

302 if substrateFun: 

303 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

304 

305 for chunk in substrateFun(asn1Object, substrate, length, options): 

306 yield chunk 

307 

308 return 

309 

310 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

311 for chunk in readFromStream(substrate, length, options): 

312 if isinstance(chunk, SubstrateUnderrunError): 

313 yield chunk 

314 

315 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

316 

317 return 

318 

319 if not self.supportConstructedForm: 

320 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__) 

321 

322 if LOG: 

323 LOG('assembling constructed serialization') 

324 

325 # All inner fragments are of the same type, treat them as octet string 

326 substrateFun = self.substrateCollector 

327 

328 header = null 

329 

330 original_position = substrate.tell() 

331 # head = popSubstream(substrate, length) 

332 while substrate.tell() - original_position < length: 

333 for component in decodeFun( 

334 substrate, self.protoComponent, substrateFun=substrateFun, 

335 **options): 

336 if isinstance(component, SubstrateUnderrunError): 

337 yield component 

338 

339 header += component 

340 

341 yield self._createComponent(asn1Spec, tagSet, header, **options) 

342 

343 def indefLenValueDecoder(self, substrate, asn1Spec, 

344 tagSet=None, length=None, state=None, 

345 decodeFun=None, substrateFun=None, 

346 **options): 

347 if substrateFun and substrateFun is not self.substrateCollector: 

348 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

349 

350 for chunk in substrateFun(asn1Object, substrate, length, options): 

351 yield chunk 

352 

353 return 

354 

355 # All inner fragments are of the same type, treat them as octet string 

356 substrateFun = self.substrateCollector 

357 

358 header = null 

359 

360 while True: # loop over fragments 

361 

362 for component in decodeFun( 

363 substrate, self.protoComponent, substrateFun=substrateFun, 

364 allowEoo=True, **options): 

365 

366 if isinstance(component, SubstrateUnderrunError): 

367 yield component 

368 

369 if component is eoo.endOfOctets: 

370 break 

371 

372 if component is eoo.endOfOctets: 

373 break 

374 

375 header += component 

376 

377 yield self._createComponent(asn1Spec, tagSet, header, **options) 

378 

379 

380class NullPayloadDecoder(AbstractSimplePayloadDecoder): 

381 protoComponent = univ.Null('') 

382 

383 def valueDecoder(self, substrate, asn1Spec, 

384 tagSet=None, length=None, state=None, 

385 decodeFun=None, substrateFun=None, 

386 **options): 

387 

388 if tagSet[0].tagFormat != tag.tagFormatSimple: 

389 raise error.PyAsn1Error('Simple tag format expected') 

390 

391 for chunk in readFromStream(substrate, length, options): 

392 if isinstance(chunk, SubstrateUnderrunError): 

393 yield chunk 

394 

395 component = self._createComponent(asn1Spec, tagSet, '', **options) 

396 

397 if chunk: 

398 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) 

399 

400 yield component 

401 

402 

403class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder): 

404 protoComponent = univ.ObjectIdentifier(()) 

405 

406 def valueDecoder(self, substrate, asn1Spec, 

407 tagSet=None, length=None, state=None, 

408 decodeFun=None, substrateFun=None, 

409 **options): 

410 if tagSet[0].tagFormat != tag.tagFormatSimple: 

411 raise error.PyAsn1Error('Simple tag format expected') 

412 

413 for chunk in readFromStream(substrate, length, options): 

414 if isinstance(chunk, SubstrateUnderrunError): 

415 yield chunk 

416 

417 if not chunk: 

418 raise error.PyAsn1Error('Empty substrate') 

419 

420 chunk = octs2ints(chunk) 

421 

422 oid = () 

423 index = 0 

424 substrateLen = len(chunk) 

425 while index < substrateLen: 

426 subId = chunk[index] 

427 index += 1 

428 if subId < 128: 

429 oid += (subId,) 

430 elif subId > 128: 

431 # Construct subid from a number of octets 

432 nextSubId = subId 

433 subId = 0 

434 while nextSubId >= 128: 

435 subId = (subId << 7) + (nextSubId & 0x7F) 

436 if index >= substrateLen: 

437 raise error.SubstrateUnderrunError( 

438 'Short substrate for sub-OID past %s' % (oid,) 

439 ) 

440 nextSubId = chunk[index] 

441 index += 1 

442 oid += ((subId << 7) + nextSubId,) 

443 elif subId == 128: 

444 # ASN.1 spec forbids leading zeros (0x80) in OID 

445 # encoding, tolerating it opens a vulnerability. See 

446 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 

447 # page 7 

448 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding') 

449 

450 # Decode two leading arcs 

451 if 0 <= oid[0] <= 39: 

452 oid = (0,) + oid 

453 elif 40 <= oid[0] <= 79: 

454 oid = (1, oid[0] - 40) + oid[1:] 

455 elif oid[0] >= 80: 

456 oid = (2, oid[0] - 80) + oid[1:] 

457 else: 

458 raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0]) 

459 

460 yield self._createComponent(asn1Spec, tagSet, oid, **options) 

461 

462 

463class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder): 

464 protoComponent = univ.RelativeOID(()) 

465 

466 def valueDecoder(self, substrate, asn1Spec, 

467 tagSet=None, length=None, state=None, 

468 decodeFun=None, substrateFun=None, 

469 **options): 

470 if tagSet[0].tagFormat != tag.tagFormatSimple: 

471 raise error.PyAsn1Error('Simple tag format expected') 

472 

473 for chunk in readFromStream(substrate, length, options): 

474 if isinstance(chunk, SubstrateUnderrunError): 

475 yield chunk 

476 

477 if not chunk: 

478 raise error.PyAsn1Error('Empty substrate') 

479 

480 chunk = octs2ints(chunk) 

481 

482 reloid = () 

483 index = 0 

484 substrateLen = len(chunk) 

485 while index < substrateLen: 

486 subId = chunk[index] 

487 index += 1 

488 if subId < 128: 

489 reloid += (subId,) 

490 elif subId > 128: 

491 # Construct subid from a number of octets 

492 nextSubId = subId 

493 subId = 0 

494 while nextSubId >= 128: 

495 subId = (subId << 7) + (nextSubId & 0x7F) 

496 if index >= substrateLen: 

497 raise error.SubstrateUnderrunError( 

498 'Short substrate for sub-OID past %s' % (reloid,) 

499 ) 

500 nextSubId = chunk[index] 

501 index += 1 

502 reloid += ((subId << 7) + nextSubId,) 

503 elif subId == 128: 

504 # ASN.1 spec forbids leading zeros (0x80) in OID 

505 # encoding, tolerating it opens a vulnerability. See 

506 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 

507 # page 7 

508 raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding') 

509 

510 yield self._createComponent(asn1Spec, tagSet, reloid, **options) 

511 

512 

513class RealPayloadDecoder(AbstractSimplePayloadDecoder): 

514 protoComponent = univ.Real() 

515 

516 def valueDecoder(self, substrate, asn1Spec, 

517 tagSet=None, length=None, state=None, 

518 decodeFun=None, substrateFun=None, 

519 **options): 

520 if tagSet[0].tagFormat != tag.tagFormatSimple: 

521 raise error.PyAsn1Error('Simple tag format expected') 

522 

523 for chunk in readFromStream(substrate, length, options): 

524 if isinstance(chunk, SubstrateUnderrunError): 

525 yield chunk 

526 

527 if not chunk: 

528 yield self._createComponent(asn1Spec, tagSet, 0.0, **options) 

529 return 

530 

531 fo = oct2int(chunk[0]) 

532 chunk = chunk[1:] 

533 if fo & 0x80: # binary encoding 

534 if not chunk: 

535 raise error.PyAsn1Error("Incomplete floating-point value") 

536 

537 if LOG: 

538 LOG('decoding binary encoded REAL') 

539 

540 n = (fo & 0x03) + 1 

541 

542 if n == 4: 

543 n = oct2int(chunk[0]) 

544 chunk = chunk[1:] 

545 

546 eo, chunk = chunk[:n], chunk[n:] 

547 

548 if not eo or not chunk: 

549 raise error.PyAsn1Error('Real exponent screwed') 

550 

551 e = oct2int(eo[0]) & 0x80 and -1 or 0 

552 

553 while eo: # exponent 

554 e <<= 8 

555 e |= oct2int(eo[0]) 

556 eo = eo[1:] 

557 

558 b = fo >> 4 & 0x03 # base bits 

559 

560 if b > 2: 

561 raise error.PyAsn1Error('Illegal Real base') 

562 

563 if b == 1: # encbase = 8 

564 e *= 3 

565 

566 elif b == 2: # encbase = 16 

567 e *= 4 

568 p = 0 

569 

570 while chunk: # value 

571 p <<= 8 

572 p |= oct2int(chunk[0]) 

573 chunk = chunk[1:] 

574 

575 if fo & 0x40: # sign bit 

576 p = -p 

577 

578 sf = fo >> 2 & 0x03 # scale bits 

579 p *= 2 ** sf 

580 value = (p, 2, e) 

581 

582 elif fo & 0x40: # infinite value 

583 if LOG: 

584 LOG('decoding infinite REAL') 

585 

586 value = fo & 0x01 and '-inf' or 'inf' 

587 

588 elif fo & 0xc0 == 0: # character encoding 

589 if not chunk: 

590 raise error.PyAsn1Error("Incomplete floating-point value") 

591 

592 if LOG: 

593 LOG('decoding character encoded REAL') 

594 

595 try: 

596 if fo & 0x3 == 0x1: # NR1 

597 value = (int(chunk), 10, 0) 

598 

599 elif fo & 0x3 == 0x2: # NR2 

600 value = float(chunk) 

601 

602 elif fo & 0x3 == 0x3: # NR3 

603 value = float(chunk) 

604 

605 else: 

606 raise error.SubstrateUnderrunError( 

607 'Unknown NR (tag %s)' % fo 

608 ) 

609 

610 except ValueError: 

611 raise error.SubstrateUnderrunError( 

612 'Bad character Real syntax' 

613 ) 

614 

615 else: 

616 raise error.SubstrateUnderrunError( 

617 'Unknown encoding (tag %s)' % fo 

618 ) 

619 

620 yield self._createComponent(asn1Spec, tagSet, value, **options) 

621 

622 

623class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder): 

624 protoComponent = None 

625 

626 

627class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder): 

628 protoRecordComponent = None 

629 protoSequenceComponent = None 

630 

631 def _getComponentTagMap(self, asn1Object, idx): 

632 raise NotImplementedError() 

633 

634 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 

635 raise NotImplementedError() 

636 

637 def _decodeComponentsSchemaless( 

638 self, substrate, tagSet=None, decodeFun=None, 

639 length=None, **options): 

640 

641 asn1Object = None 

642 

643 components = [] 

644 componentTypes = set() 

645 

646 original_position = substrate.tell() 

647 

648 while length == -1 or substrate.tell() < original_position + length: 

649 for component in decodeFun(substrate, **options): 

650 if isinstance(component, SubstrateUnderrunError): 

651 yield component 

652 

653 if length == -1 and component is eoo.endOfOctets: 

654 break 

655 

656 components.append(component) 

657 componentTypes.add(component.tagSet) 

658 

659 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF 

660 # The heuristics is: 

661 # * 1+ components of different types -> likely SEQUENCE/SET 

662 # * otherwise -> likely SEQUENCE OF/SET OF 

663 if len(componentTypes) > 1: 

664 protoComponent = self.protoRecordComponent 

665 

666 else: 

667 protoComponent = self.protoSequenceComponent 

668 

669 asn1Object = protoComponent.clone( 

670 # construct tagSet from base tag from prototype ASN.1 object 

671 # and additional tags recovered from the substrate 

672 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags) 

673 ) 

674 

675 if LOG: 

676 LOG('guessed %r container type (pass `asn1Spec` to guide the ' 

677 'decoder)' % asn1Object) 

678 

679 for idx, component in enumerate(components): 

680 asn1Object.setComponentByPosition( 

681 idx, component, 

682 verifyConstraints=False, 

683 matchTags=False, matchConstraints=False 

684 ) 

685 

686 yield asn1Object 

687 

688 def valueDecoder(self, substrate, asn1Spec, 

689 tagSet=None, length=None, state=None, 

690 decodeFun=None, substrateFun=None, 

691 **options): 

692 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

693 raise error.PyAsn1Error('Constructed tag format expected') 

694 

695 original_position = substrate.tell() 

696 

697 if substrateFun: 

698 if asn1Spec is not None: 

699 asn1Object = asn1Spec.clone() 

700 

701 elif self.protoComponent is not None: 

702 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

703 

704 else: 

705 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

706 

707 for chunk in substrateFun(asn1Object, substrate, length, options): 

708 yield chunk 

709 

710 return 

711 

712 if asn1Spec is None: 

713 for asn1Object in self._decodeComponentsSchemaless( 

714 substrate, tagSet=tagSet, decodeFun=decodeFun, 

715 length=length, **options): 

716 if isinstance(asn1Object, SubstrateUnderrunError): 

717 yield asn1Object 

718 

719 if substrate.tell() < original_position + length: 

720 if LOG: 

721 for trailing in readFromStream(substrate, context=options): 

722 if isinstance(trailing, SubstrateUnderrunError): 

723 yield trailing 

724 

725 LOG('Unused trailing %d octets encountered: %s' % ( 

726 len(trailing), debug.hexdump(trailing))) 

727 

728 yield asn1Object 

729 

730 return 

731 

732 asn1Object = asn1Spec.clone() 

733 asn1Object.clear() 

734 

735 options = self._passAsn1Object(asn1Object, options) 

736 

737 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

738 

739 namedTypes = asn1Spec.componentType 

740 

741 isSetType = asn1Spec.typeId == univ.Set.typeId 

742 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

743 

744 if LOG: 

745 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

746 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

747 asn1Spec)) 

748 

749 seenIndices = set() 

750 idx = 0 

751 while substrate.tell() - original_position < length: 

752 if not namedTypes: 

753 componentType = None 

754 

755 elif isSetType: 

756 componentType = namedTypes.tagMapUnique 

757 

758 else: 

759 try: 

760 if isDeterministic: 

761 componentType = namedTypes[idx].asn1Object 

762 

763 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

764 componentType = namedTypes.getTagMapNearPosition(idx) 

765 

766 else: 

767 componentType = namedTypes[idx].asn1Object 

768 

769 except IndexError: 

770 raise error.PyAsn1Error( 

771 'Excessive components decoded at %r' % (asn1Spec,) 

772 ) 

773 

774 for component in decodeFun(substrate, componentType, **options): 

775 if isinstance(component, SubstrateUnderrunError): 

776 yield component 

777 

778 if not isDeterministic and namedTypes: 

779 if isSetType: 

780 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

781 

782 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

783 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

784 

785 asn1Object.setComponentByPosition( 

786 idx, component, 

787 verifyConstraints=False, 

788 matchTags=False, matchConstraints=False 

789 ) 

790 

791 seenIndices.add(idx) 

792 idx += 1 

793 

794 if LOG: 

795 LOG('seen component indices %s' % seenIndices) 

796 

797 if namedTypes: 

798 if not namedTypes.requiredComponents.issubset(seenIndices): 

799 raise error.PyAsn1Error( 

800 'ASN.1 object %s has uninitialized ' 

801 'components' % asn1Object.__class__.__name__) 

802 

803 if namedTypes.hasOpenTypes: 

804 

805 openTypes = options.get('openTypes', {}) 

806 

807 if LOG: 

808 LOG('user-specified open types map:') 

809 

810 for k, v in openTypes.items(): 

811 LOG('%s -> %r' % (k, v)) 

812 

813 if openTypes or options.get('decodeOpenTypes', False): 

814 

815 for idx, namedType in enumerate(namedTypes.namedTypes): 

816 if not namedType.openType: 

817 continue 

818 

819 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

820 continue 

821 

822 governingValue = asn1Object.getComponentByName( 

823 namedType.openType.name 

824 ) 

825 

826 try: 

827 openType = openTypes[governingValue] 

828 

829 except KeyError: 

830 

831 if LOG: 

832 LOG('default open types map of component ' 

833 '"%s.%s" governed by component "%s.%s"' 

834 ':' % (asn1Object.__class__.__name__, 

835 namedType.name, 

836 asn1Object.__class__.__name__, 

837 namedType.openType.name)) 

838 

839 for k, v in namedType.openType.items(): 

840 LOG('%s -> %r' % (k, v)) 

841 

842 try: 

843 openType = namedType.openType[governingValue] 

844 

845 except KeyError: 

846 if LOG: 

847 LOG('failed to resolve open type by governing ' 

848 'value %r' % (governingValue,)) 

849 continue 

850 

851 if LOG: 

852 LOG('resolved open type %r by governing ' 

853 'value %r' % (openType, governingValue)) 

854 

855 containerValue = asn1Object.getComponentByPosition(idx) 

856 

857 if containerValue.typeId in ( 

858 univ.SetOf.typeId, univ.SequenceOf.typeId): 

859 

860 for pos, containerElement in enumerate( 

861 containerValue): 

862 

863 stream = asSeekableStream(containerValue[pos].asOctets()) 

864 

865 for component in decodeFun(stream, asn1Spec=openType, **options): 

866 if isinstance(component, SubstrateUnderrunError): 

867 yield component 

868 

869 containerValue[pos] = component 

870 

871 else: 

872 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

873 

874 for component in decodeFun(stream, asn1Spec=openType, **options): 

875 if isinstance(component, SubstrateUnderrunError): 

876 yield component 

877 

878 asn1Object.setComponentByPosition(idx, component) 

879 

880 else: 

881 inconsistency = asn1Object.isInconsistent 

882 if inconsistency: 

883 raise inconsistency 

884 

885 else: 

886 componentType = asn1Spec.componentType 

887 

888 if LOG: 

889 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

890 

891 idx = 0 

892 

893 while substrate.tell() - original_position < length: 

894 for component in decodeFun(substrate, componentType, **options): 

895 if isinstance(component, SubstrateUnderrunError): 

896 yield component 

897 

898 asn1Object.setComponentByPosition( 

899 idx, component, 

900 verifyConstraints=False, 

901 matchTags=False, matchConstraints=False 

902 ) 

903 

904 idx += 1 

905 

906 yield asn1Object 

907 

908 def indefLenValueDecoder(self, substrate, asn1Spec, 

909 tagSet=None, length=None, state=None, 

910 decodeFun=None, substrateFun=None, 

911 **options): 

912 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

913 raise error.PyAsn1Error('Constructed tag format expected') 

914 

915 if substrateFun is not None: 

916 if asn1Spec is not None: 

917 asn1Object = asn1Spec.clone() 

918 

919 elif self.protoComponent is not None: 

920 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

921 

922 else: 

923 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

924 

925 for chunk in substrateFun(asn1Object, substrate, length, options): 

926 yield chunk 

927 

928 return 

929 

930 if asn1Spec is None: 

931 for asn1Object in self._decodeComponentsSchemaless( 

932 substrate, tagSet=tagSet, decodeFun=decodeFun, 

933 length=length, **dict(options, allowEoo=True)): 

934 if isinstance(asn1Object, SubstrateUnderrunError): 

935 yield asn1Object 

936 

937 yield asn1Object 

938 

939 return 

940 

941 asn1Object = asn1Spec.clone() 

942 asn1Object.clear() 

943 

944 options = self._passAsn1Object(asn1Object, options) 

945 

946 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

947 

948 namedTypes = asn1Object.componentType 

949 

950 isSetType = asn1Object.typeId == univ.Set.typeId 

951 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

952 

953 if LOG: 

954 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

955 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

956 asn1Spec)) 

957 

958 seenIndices = set() 

959 

960 idx = 0 

961 

962 while True: # loop over components 

963 if len(namedTypes) <= idx: 

964 asn1Spec = None 

965 

966 elif isSetType: 

967 asn1Spec = namedTypes.tagMapUnique 

968 

969 else: 

970 try: 

971 if isDeterministic: 

972 asn1Spec = namedTypes[idx].asn1Object 

973 

974 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

975 asn1Spec = namedTypes.getTagMapNearPosition(idx) 

976 

977 else: 

978 asn1Spec = namedTypes[idx].asn1Object 

979 

980 except IndexError: 

981 raise error.PyAsn1Error( 

982 'Excessive components decoded at %r' % (asn1Object,) 

983 ) 

984 

985 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options): 

986 

987 if isinstance(component, SubstrateUnderrunError): 

988 yield component 

989 

990 if component is eoo.endOfOctets: 

991 break 

992 

993 if component is eoo.endOfOctets: 

994 break 

995 

996 if not isDeterministic and namedTypes: 

997 if isSetType: 

998 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

999 

1000 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

1001 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

1002 

1003 asn1Object.setComponentByPosition( 

1004 idx, component, 

1005 verifyConstraints=False, 

1006 matchTags=False, matchConstraints=False 

1007 ) 

1008 

1009 seenIndices.add(idx) 

1010 idx += 1 

1011 

1012 if LOG: 

1013 LOG('seen component indices %s' % seenIndices) 

1014 

1015 if namedTypes: 

1016 if not namedTypes.requiredComponents.issubset(seenIndices): 

1017 raise error.PyAsn1Error( 

1018 'ASN.1 object %s has uninitialized ' 

1019 'components' % asn1Object.__class__.__name__) 

1020 

1021 if namedTypes.hasOpenTypes: 

1022 

1023 openTypes = options.get('openTypes', {}) 

1024 

1025 if LOG: 

1026 LOG('user-specified open types map:') 

1027 

1028 for k, v in openTypes.items(): 

1029 LOG('%s -> %r' % (k, v)) 

1030 

1031 if openTypes or options.get('decodeOpenTypes', False): 

1032 

1033 for idx, namedType in enumerate(namedTypes.namedTypes): 

1034 if not namedType.openType: 

1035 continue 

1036 

1037 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

1038 continue 

1039 

1040 governingValue = asn1Object.getComponentByName( 

1041 namedType.openType.name 

1042 ) 

1043 

1044 try: 

1045 openType = openTypes[governingValue] 

1046 

1047 except KeyError: 

1048 

1049 if LOG: 

1050 LOG('default open types map of component ' 

1051 '"%s.%s" governed by component "%s.%s"' 

1052 ':' % (asn1Object.__class__.__name__, 

1053 namedType.name, 

1054 asn1Object.__class__.__name__, 

1055 namedType.openType.name)) 

1056 

1057 for k, v in namedType.openType.items(): 

1058 LOG('%s -> %r' % (k, v)) 

1059 

1060 try: 

1061 openType = namedType.openType[governingValue] 

1062 

1063 except KeyError: 

1064 if LOG: 

1065 LOG('failed to resolve open type by governing ' 

1066 'value %r' % (governingValue,)) 

1067 continue 

1068 

1069 if LOG: 

1070 LOG('resolved open type %r by governing ' 

1071 'value %r' % (openType, governingValue)) 

1072 

1073 containerValue = asn1Object.getComponentByPosition(idx) 

1074 

1075 if containerValue.typeId in ( 

1076 univ.SetOf.typeId, univ.SequenceOf.typeId): 

1077 

1078 for pos, containerElement in enumerate( 

1079 containerValue): 

1080 

1081 stream = asSeekableStream(containerValue[pos].asOctets()) 

1082 

1083 for component in decodeFun(stream, asn1Spec=openType, 

1084 **dict(options, allowEoo=True)): 

1085 if isinstance(component, SubstrateUnderrunError): 

1086 yield component 

1087 

1088 if component is eoo.endOfOctets: 

1089 break 

1090 

1091 containerValue[pos] = component 

1092 

1093 else: 

1094 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

1095 for component in decodeFun(stream, asn1Spec=openType, 

1096 **dict(options, allowEoo=True)): 

1097 if isinstance(component, SubstrateUnderrunError): 

1098 yield component 

1099 

1100 if component is eoo.endOfOctets: 

1101 break 

1102 

1103 asn1Object.setComponentByPosition(idx, component) 

1104 

1105 else: 

1106 inconsistency = asn1Object.isInconsistent 

1107 if inconsistency: 

1108 raise inconsistency 

1109 

1110 else: 

1111 componentType = asn1Spec.componentType 

1112 

1113 if LOG: 

1114 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

1115 

1116 idx = 0 

1117 

1118 while True: 

1119 

1120 for component in decodeFun( 

1121 substrate, componentType, allowEoo=True, **options): 

1122 

1123 if isinstance(component, SubstrateUnderrunError): 

1124 yield component 

1125 

1126 if component is eoo.endOfOctets: 

1127 break 

1128 

1129 if component is eoo.endOfOctets: 

1130 break 

1131 

1132 asn1Object.setComponentByPosition( 

1133 idx, component, 

1134 verifyConstraints=False, 

1135 matchTags=False, matchConstraints=False 

1136 ) 

1137 

1138 idx += 1 

1139 

1140 yield asn1Object 

1141 

1142 

1143class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1144 protoRecordComponent = univ.Sequence() 

1145 protoSequenceComponent = univ.SequenceOf() 

1146 

1147 

1148class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1149 protoComponent = univ.Sequence() 

1150 

1151 

1152class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1153 protoComponent = univ.SequenceOf() 

1154 

1155 

1156class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1157 protoRecordComponent = univ.Set() 

1158 protoSequenceComponent = univ.SetOf() 

1159 

1160 

1161class SetPayloadDecoder(SetOrSetOfPayloadDecoder): 

1162 protoComponent = univ.Set() 

1163 

1164 

1165class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder): 

1166 protoComponent = univ.SetOf() 

1167 

1168 

1169class ChoicePayloadDecoder(ConstructedPayloadDecoderBase): 

1170 protoComponent = univ.Choice() 

1171 

1172 def valueDecoder(self, substrate, asn1Spec, 

1173 tagSet=None, length=None, state=None, 

1174 decodeFun=None, substrateFun=None, 

1175 **options): 

1176 if asn1Spec is None: 

1177 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1178 

1179 else: 

1180 asn1Object = asn1Spec.clone() 

1181 

1182 if substrateFun: 

1183 for chunk in substrateFun(asn1Object, substrate, length, options): 

1184 yield chunk 

1185 

1186 return 

1187 

1188 options = self._passAsn1Object(asn1Object, options) 

1189 

1190 if asn1Object.tagSet == tagSet: 

1191 if LOG: 

1192 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 

1193 

1194 for component in decodeFun( 

1195 substrate, asn1Object.componentTagMap, **options): 

1196 if isinstance(component, SubstrateUnderrunError): 

1197 yield component 

1198 

1199 else: 

1200 if LOG: 

1201 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 

1202 

1203 for component in decodeFun( 

1204 substrate, asn1Object.componentTagMap, tagSet, length, 

1205 state, **options): 

1206 if isinstance(component, SubstrateUnderrunError): 

1207 yield component 

1208 

1209 effectiveTagSet = component.effectiveTagSet 

1210 

1211 if LOG: 

1212 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 

1213 

1214 asn1Object.setComponentByType( 

1215 effectiveTagSet, component, 

1216 verifyConstraints=False, 

1217 matchTags=False, matchConstraints=False, 

1218 innerFlag=False 

1219 ) 

1220 

1221 yield asn1Object 

1222 

1223 def indefLenValueDecoder(self, substrate, asn1Spec, 

1224 tagSet=None, length=None, state=None, 

1225 decodeFun=None, substrateFun=None, 

1226 **options): 

1227 if asn1Spec is None: 

1228 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1229 

1230 else: 

1231 asn1Object = asn1Spec.clone() 

1232 

1233 if substrateFun: 

1234 for chunk in substrateFun(asn1Object, substrate, length, options): 

1235 yield chunk 

1236 

1237 return 

1238 

1239 options = self._passAsn1Object(asn1Object, options) 

1240 

1241 isTagged = asn1Object.tagSet == tagSet 

1242 

1243 if LOG: 

1244 LOG('decoding %s as %stagged CHOICE' % ( 

1245 tagSet, isTagged and 'explicitly ' or 'un')) 

1246 

1247 while True: 

1248 

1249 if isTagged: 

1250 iterator = decodeFun( 

1251 substrate, asn1Object.componentType.tagMapUnique, 

1252 **dict(options, allowEoo=True)) 

1253 

1254 else: 

1255 iterator = decodeFun( 

1256 substrate, asn1Object.componentType.tagMapUnique, 

1257 tagSet, length, state, **dict(options, allowEoo=True)) 

1258 

1259 for component in iterator: 

1260 

1261 if isinstance(component, SubstrateUnderrunError): 

1262 yield component 

1263 

1264 if component is eoo.endOfOctets: 

1265 break 

1266 

1267 effectiveTagSet = component.effectiveTagSet 

1268 

1269 if LOG: 

1270 LOG('decoded component %s, effective tag set ' 

1271 '%s' % (component, effectiveTagSet)) 

1272 

1273 asn1Object.setComponentByType( 

1274 effectiveTagSet, component, 

1275 verifyConstraints=False, 

1276 matchTags=False, matchConstraints=False, 

1277 innerFlag=False 

1278 ) 

1279 

1280 if not isTagged: 

1281 break 

1282 

1283 if not isTagged or component is eoo.endOfOctets: 

1284 break 

1285 

1286 yield asn1Object 

1287 

1288 

1289class AnyPayloadDecoder(AbstractSimplePayloadDecoder): 

1290 protoComponent = univ.Any() 

1291 

1292 def valueDecoder(self, substrate, asn1Spec, 

1293 tagSet=None, length=None, state=None, 

1294 decodeFun=None, substrateFun=None, 

1295 **options): 

1296 if asn1Spec is None: 

1297 isUntagged = True 

1298 

1299 elif asn1Spec.__class__ is tagmap.TagMap: 

1300 isUntagged = tagSet not in asn1Spec.tagMap 

1301 

1302 else: 

1303 isUntagged = tagSet != asn1Spec.tagSet 

1304 

1305 if isUntagged: 

1306 fullPosition = substrate.markedPosition 

1307 currentPosition = substrate.tell() 

1308 

1309 substrate.seek(fullPosition, os.SEEK_SET) 

1310 length += currentPosition - fullPosition 

1311 

1312 if LOG: 

1313 for chunk in peekIntoStream(substrate, length): 

1314 if isinstance(chunk, SubstrateUnderrunError): 

1315 yield chunk 

1316 LOG('decoding as untagged ANY, substrate ' 

1317 '%s' % debug.hexdump(chunk)) 

1318 

1319 if substrateFun: 

1320 for chunk in substrateFun( 

1321 self._createComponent(asn1Spec, tagSet, noValue, **options), 

1322 substrate, length, options): 

1323 yield chunk 

1324 

1325 return 

1326 

1327 for chunk in readFromStream(substrate, length, options): 

1328 if isinstance(chunk, SubstrateUnderrunError): 

1329 yield chunk 

1330 

1331 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1332 

1333 def indefLenValueDecoder(self, substrate, asn1Spec, 

1334 tagSet=None, length=None, state=None, 

1335 decodeFun=None, substrateFun=None, 

1336 **options): 

1337 if asn1Spec is None: 

1338 isTagged = False 

1339 

1340 elif asn1Spec.__class__ is tagmap.TagMap: 

1341 isTagged = tagSet in asn1Spec.tagMap 

1342 

1343 else: 

1344 isTagged = tagSet == asn1Spec.tagSet 

1345 

1346 if isTagged: 

1347 # tagged Any type -- consume header substrate 

1348 chunk = null 

1349 

1350 if LOG: 

1351 LOG('decoding as tagged ANY') 

1352 

1353 else: 

1354 # TODO: Seems not to be tested 

1355 fullPosition = substrate.markedPosition 

1356 currentPosition = substrate.tell() 

1357 

1358 substrate.seek(fullPosition, os.SEEK_SET) 

1359 for chunk in readFromStream(substrate, currentPosition - fullPosition, options): 

1360 if isinstance(chunk, SubstrateUnderrunError): 

1361 yield chunk 

1362 

1363 if LOG: 

1364 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk)) 

1365 

1366 # Any components do not inherit initial tag 

1367 asn1Spec = self.protoComponent 

1368 

1369 if substrateFun and substrateFun is not self.substrateCollector: 

1370 asn1Object = self._createComponent( 

1371 asn1Spec, tagSet, noValue, **options) 

1372 

1373 for chunk in substrateFun( 

1374 asn1Object, chunk + substrate, length + len(chunk), options): 

1375 yield chunk 

1376 

1377 return 

1378 

1379 if LOG: 

1380 LOG('assembling constructed serialization') 

1381 

1382 # All inner fragments are of the same type, treat them as octet string 

1383 substrateFun = self.substrateCollector 

1384 

1385 while True: # loop over fragments 

1386 

1387 for component in decodeFun( 

1388 substrate, asn1Spec, substrateFun=substrateFun, 

1389 allowEoo=True, **options): 

1390 

1391 if isinstance(component, SubstrateUnderrunError): 

1392 yield component 

1393 

1394 if component is eoo.endOfOctets: 

1395 break 

1396 

1397 if component is eoo.endOfOctets: 

1398 break 

1399 

1400 chunk += component 

1401 

1402 if substrateFun: 

1403 yield chunk # TODO: Weird 

1404 

1405 else: 

1406 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1407 

1408 

1409# character string types 

1410class UTF8StringPayloadDecoder(OctetStringPayloadDecoder): 

1411 protoComponent = char.UTF8String() 

1412 

1413 

1414class NumericStringPayloadDecoder(OctetStringPayloadDecoder): 

1415 protoComponent = char.NumericString() 

1416 

1417 

1418class PrintableStringPayloadDecoder(OctetStringPayloadDecoder): 

1419 protoComponent = char.PrintableString() 

1420 

1421 

1422class TeletexStringPayloadDecoder(OctetStringPayloadDecoder): 

1423 protoComponent = char.TeletexString() 

1424 

1425 

1426class VideotexStringPayloadDecoder(OctetStringPayloadDecoder): 

1427 protoComponent = char.VideotexString() 

1428 

1429 

1430class IA5StringPayloadDecoder(OctetStringPayloadDecoder): 

1431 protoComponent = char.IA5String() 

1432 

1433 

1434class GraphicStringPayloadDecoder(OctetStringPayloadDecoder): 

1435 protoComponent = char.GraphicString() 

1436 

1437 

1438class VisibleStringPayloadDecoder(OctetStringPayloadDecoder): 

1439 protoComponent = char.VisibleString() 

1440 

1441 

1442class GeneralStringPayloadDecoder(OctetStringPayloadDecoder): 

1443 protoComponent = char.GeneralString() 

1444 

1445 

1446class UniversalStringPayloadDecoder(OctetStringPayloadDecoder): 

1447 protoComponent = char.UniversalString() 

1448 

1449 

1450class BMPStringPayloadDecoder(OctetStringPayloadDecoder): 

1451 protoComponent = char.BMPString() 

1452 

1453 

1454# "useful" types 

1455class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder): 

1456 protoComponent = useful.ObjectDescriptor() 

1457 

1458 

1459class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder): 

1460 protoComponent = useful.GeneralizedTime() 

1461 

1462 

1463class UTCTimePayloadDecoder(OctetStringPayloadDecoder): 

1464 protoComponent = useful.UTCTime() 

1465 

1466 

1467TAG_MAP = { 

1468 univ.Integer.tagSet: IntegerPayloadDecoder(), 

1469 univ.Boolean.tagSet: BooleanPayloadDecoder(), 

1470 univ.BitString.tagSet: BitStringPayloadDecoder(), 

1471 univ.OctetString.tagSet: OctetStringPayloadDecoder(), 

1472 univ.Null.tagSet: NullPayloadDecoder(), 

1473 univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(), 

1474 univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(), 

1475 univ.Enumerated.tagSet: IntegerPayloadDecoder(), 

1476 univ.Real.tagSet: RealPayloadDecoder(), 

1477 univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf 

1478 univ.Set.tagSet: SetOrSetOfPayloadDecoder(), # conflicts with SetOf 

1479 univ.Choice.tagSet: ChoicePayloadDecoder(), # conflicts with Any 

1480 # character string types 

1481 char.UTF8String.tagSet: UTF8StringPayloadDecoder(), 

1482 char.NumericString.tagSet: NumericStringPayloadDecoder(), 

1483 char.PrintableString.tagSet: PrintableStringPayloadDecoder(), 

1484 char.TeletexString.tagSet: TeletexStringPayloadDecoder(), 

1485 char.VideotexString.tagSet: VideotexStringPayloadDecoder(), 

1486 char.IA5String.tagSet: IA5StringPayloadDecoder(), 

1487 char.GraphicString.tagSet: GraphicStringPayloadDecoder(), 

1488 char.VisibleString.tagSet: VisibleStringPayloadDecoder(), 

1489 char.GeneralString.tagSet: GeneralStringPayloadDecoder(), 

1490 char.UniversalString.tagSet: UniversalStringPayloadDecoder(), 

1491 char.BMPString.tagSet: BMPStringPayloadDecoder(), 

1492 # useful types 

1493 useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(), 

1494 useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(), 

1495 useful.UTCTime.tagSet: UTCTimePayloadDecoder() 

1496} 

1497 

1498# Type-to-codec map for ambiguous ASN.1 types 

1499TYPE_MAP = { 

1500 univ.Set.typeId: SetPayloadDecoder(), 

1501 univ.SetOf.typeId: SetOfPayloadDecoder(), 

1502 univ.Sequence.typeId: SequencePayloadDecoder(), 

1503 univ.SequenceOf.typeId: SequenceOfPayloadDecoder(), 

1504 univ.Choice.typeId: ChoicePayloadDecoder(), 

1505 univ.Any.typeId: AnyPayloadDecoder() 

1506} 

1507 

1508# deprecated aliases, https://github.com/pyasn1/pyasn1/issues/9 

1509tagMap = TAG_MAP 

1510typeMap = TYPE_MAP 

1511 

1512# Put in non-ambiguous types for faster codec lookup 

1513for typeDecoder in TAG_MAP.values(): 

1514 if typeDecoder.protoComponent is not None: 

1515 typeId = typeDecoder.protoComponent.__class__.typeId 

1516 if typeId is not None and typeId not in TYPE_MAP: 

1517 TYPE_MAP[typeId] = typeDecoder 

1518 

1519 

1520(stDecodeTag, 

1521 stDecodeLength, 

1522 stGetValueDecoder, 

1523 stGetValueDecoderByAsn1Spec, 

1524 stGetValueDecoderByTag, 

1525 stTryAsExplicitTag, 

1526 stDecodeValue, 

1527 stDumpRawValue, 

1528 stErrorCondition, 

1529 stStop) = [x for x in range(10)] 

1530 

1531 

1532EOO_SENTINEL = ints2octs((0, 0)) 

1533 

1534 

1535class SingleItemDecoder(object): 

1536 defaultErrorState = stErrorCondition 

1537 #defaultErrorState = stDumpRawValue 

1538 defaultRawDecoder = AnyPayloadDecoder() 

1539 

1540 supportIndefLength = True 

1541 

1542 TAG_MAP = TAG_MAP 

1543 TYPE_MAP = TYPE_MAP 

1544 

1545 def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored): 

1546 self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP 

1547 self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP 

1548 

1549 # Tag & TagSet objects caches 

1550 self._tagCache = {} 

1551 self._tagSetCache = {} 

1552 

1553 def __call__(self, substrate, asn1Spec=None, 

1554 tagSet=None, length=None, state=stDecodeTag, 

1555 decodeFun=None, substrateFun=None, 

1556 **options): 

1557 

1558 allowEoo = options.pop('allowEoo', False) 

1559 

1560 if LOG: 

1561 LOG('decoder called at scope %s with state %d, working with up ' 

1562 'to %s octets of substrate: ' 

1563 '%s' % (debug.scope, state, length, substrate)) 

1564 

1565 # Look for end-of-octets sentinel 

1566 if allowEoo and self.supportIndefLength: 

1567 

1568 for eoo_candidate in readFromStream(substrate, 2, options): 

1569 if isinstance(eoo_candidate, SubstrateUnderrunError): 

1570 yield eoo_candidate 

1571 

1572 if eoo_candidate == EOO_SENTINEL: 

1573 if LOG: 

1574 LOG('end-of-octets sentinel found') 

1575 yield eoo.endOfOctets 

1576 return 

1577 

1578 else: 

1579 substrate.seek(-2, os.SEEK_CUR) 

1580 

1581 tagMap = self._tagMap 

1582 typeMap = self._typeMap 

1583 tagCache = self._tagCache 

1584 tagSetCache = self._tagSetCache 

1585 

1586 value = noValue 

1587 

1588 substrate.markedPosition = substrate.tell() 

1589 

1590 while state is not stStop: 

1591 

1592 if state is stDecodeTag: 

1593 # Decode tag 

1594 isShortTag = True 

1595 

1596 for firstByte in readFromStream(substrate, 1, options): 

1597 if isinstance(firstByte, SubstrateUnderrunError): 

1598 yield firstByte 

1599 

1600 firstOctet = ord(firstByte) 

1601 

1602 try: 

1603 lastTag = tagCache[firstOctet] 

1604 

1605 except KeyError: 

1606 integerTag = firstOctet 

1607 tagClass = integerTag & 0xC0 

1608 tagFormat = integerTag & 0x20 

1609 tagId = integerTag & 0x1F 

1610 

1611 if tagId == 0x1F: 

1612 isShortTag = False 

1613 lengthOctetIdx = 0 

1614 tagId = 0 

1615 

1616 while True: 

1617 for integerByte in readFromStream(substrate, 1, options): 

1618 if isinstance(integerByte, SubstrateUnderrunError): 

1619 yield integerByte 

1620 

1621 if not integerByte: 

1622 raise error.SubstrateUnderrunError( 

1623 'Short octet stream on long tag decoding' 

1624 ) 

1625 

1626 integerTag = ord(integerByte) 

1627 lengthOctetIdx += 1 

1628 tagId <<= 7 

1629 tagId |= (integerTag & 0x7F) 

1630 

1631 if not integerTag & 0x80: 

1632 break 

1633 

1634 lastTag = tag.Tag( 

1635 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId 

1636 ) 

1637 

1638 if isShortTag: 

1639 # cache short tags 

1640 tagCache[firstOctet] = lastTag 

1641 

1642 if tagSet is None: 

1643 if isShortTag: 

1644 try: 

1645 tagSet = tagSetCache[firstOctet] 

1646 

1647 except KeyError: 

1648 # base tag not recovered 

1649 tagSet = tag.TagSet((), lastTag) 

1650 tagSetCache[firstOctet] = tagSet 

1651 else: 

1652 tagSet = tag.TagSet((), lastTag) 

1653 

1654 else: 

1655 tagSet = lastTag + tagSet 

1656 

1657 state = stDecodeLength 

1658 

1659 if LOG: 

1660 LOG('tag decoded into %s, decoding length' % tagSet) 

1661 

1662 if state is stDecodeLength: 

1663 # Decode length 

1664 for firstOctet in readFromStream(substrate, 1, options): 

1665 if isinstance(firstOctet, SubstrateUnderrunError): 

1666 yield firstOctet 

1667 

1668 firstOctet = ord(firstOctet) 

1669 

1670 if firstOctet < 128: 

1671 length = firstOctet 

1672 

1673 elif firstOctet > 128: 

1674 size = firstOctet & 0x7F 

1675 # encoded in size bytes 

1676 for encodedLength in readFromStream(substrate, size, options): 

1677 if isinstance(encodedLength, SubstrateUnderrunError): 

1678 yield encodedLength 

1679 encodedLength = list(encodedLength) 

1680 # missing check on maximum size, which shouldn't be a 

1681 # problem, we can handle more than is possible 

1682 if len(encodedLength) != size: 

1683 raise error.SubstrateUnderrunError( 

1684 '%s<%s at %s' % (size, len(encodedLength), tagSet) 

1685 ) 

1686 

1687 length = 0 

1688 for lengthOctet in encodedLength: 

1689 length <<= 8 

1690 length |= oct2int(lengthOctet) 

1691 size += 1 

1692 

1693 else: # 128 means indefinite 

1694 length = -1 

1695 

1696 if length == -1 and not self.supportIndefLength: 

1697 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec') 

1698 

1699 state = stGetValueDecoder 

1700 

1701 if LOG: 

1702 LOG('value length decoded into %d' % length) 

1703 

1704 if state is stGetValueDecoder: 

1705 if asn1Spec is None: 

1706 state = stGetValueDecoderByTag 

1707 

1708 else: 

1709 state = stGetValueDecoderByAsn1Spec 

1710 # 

1711 # There're two ways of creating subtypes in ASN.1 what influences 

1712 # decoder operation. These methods are: 

1713 # 1) Either base types used in or no IMPLICIT tagging has been 

1714 # applied on subtyping. 

1715 # 2) Subtype syntax drops base type information (by means of 

1716 # IMPLICIT tagging. 

1717 # The first case allows for complete tag recovery from substrate 

1718 # while the second one requires original ASN.1 type spec for 

1719 # decoding. 

1720 # 

1721 # In either case a set of tags (tagSet) is coming from substrate 

1722 # in an incremental, tag-by-tag fashion (this is the case of 

1723 # EXPLICIT tag which is most basic). Outermost tag comes first 

1724 # from the wire. 

1725 # 

1726 if state is stGetValueDecoderByTag: 

1727 try: 

1728 concreteDecoder = tagMap[tagSet] 

1729 

1730 except KeyError: 

1731 concreteDecoder = None 

1732 

1733 if concreteDecoder: 

1734 state = stDecodeValue 

1735 

1736 else: 

1737 try: 

1738 concreteDecoder = tagMap[tagSet[:1]] 

1739 

1740 except KeyError: 

1741 concreteDecoder = None 

1742 

1743 if concreteDecoder: 

1744 state = stDecodeValue 

1745 else: 

1746 state = stTryAsExplicitTag 

1747 

1748 if LOG: 

1749 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1750 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__) 

1751 

1752 if state is stGetValueDecoderByAsn1Spec: 

1753 

1754 if asn1Spec.__class__ is tagmap.TagMap: 

1755 try: 

1756 chosenSpec = asn1Spec[tagSet] 

1757 

1758 except KeyError: 

1759 chosenSpec = None 

1760 

1761 if LOG: 

1762 LOG('candidate ASN.1 spec is a map of:') 

1763 

1764 for firstOctet, v in asn1Spec.presentTypes.items(): 

1765 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1766 

1767 if asn1Spec.skipTypes: 

1768 LOG('but neither of: ') 

1769 for firstOctet, v in asn1Spec.skipTypes.items(): 

1770 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1771 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet)) 

1772 

1773 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap: 

1774 chosenSpec = asn1Spec 

1775 if LOG: 

1776 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) 

1777 

1778 else: 

1779 chosenSpec = None 

1780 

1781 if chosenSpec is not None: 

1782 try: 

1783 # ambiguous type or just faster codec lookup 

1784 concreteDecoder = typeMap[chosenSpec.typeId] 

1785 

1786 if LOG: 

1787 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,)) 

1788 

1789 except KeyError: 

1790 # use base type for codec lookup to recover untagged types 

1791 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag) 

1792 try: 

1793 # base type or tagged subtype 

1794 concreteDecoder = tagMap[baseTagSet] 

1795 

1796 if LOG: 

1797 LOG('value decoder chosen by base %s' % (baseTagSet,)) 

1798 

1799 except KeyError: 

1800 concreteDecoder = None 

1801 

1802 if concreteDecoder: 

1803 asn1Spec = chosenSpec 

1804 state = stDecodeValue 

1805 

1806 else: 

1807 state = stTryAsExplicitTag 

1808 

1809 else: 

1810 concreteDecoder = None 

1811 state = stTryAsExplicitTag 

1812 

1813 if LOG: 

1814 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1815 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__) 

1816 

1817 if state is stDecodeValue: 

1818 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this 

1819 def substrateFun(asn1Object, _substrate, _length, _options): 

1820 """Legacy hack to keep the recursiveFlag=False option supported. 

1821 

1822 The decode(..., substrateFun=userCallback) option was introduced in 0.1.4 as a generalization 

1823 of the old recursiveFlag=False option. Users should pass their callback instead of using 

1824 recursiveFlag. 

1825 """ 

1826 yield asn1Object 

1827 

1828 original_position = substrate.tell() 

1829 

1830 if length == -1: # indef length 

1831 for value in concreteDecoder.indefLenValueDecoder( 

1832 substrate, asn1Spec, 

1833 tagSet, length, stGetValueDecoder, 

1834 self, substrateFun, **options): 

1835 if isinstance(value, SubstrateUnderrunError): 

1836 yield value 

1837 

1838 else: 

1839 for value in concreteDecoder.valueDecoder( 

1840 substrate, asn1Spec, 

1841 tagSet, length, stGetValueDecoder, 

1842 self, substrateFun, **options): 

1843 if isinstance(value, SubstrateUnderrunError): 

1844 yield value 

1845 

1846 bytesRead = substrate.tell() - original_position 

1847 if not substrateFun and bytesRead != length: 

1848 raise PyAsn1Error( 

1849 "Read %s bytes instead of expected %s." % (bytesRead, length)) 

1850 elif substrateFun and bytesRead > length: 

1851 # custom substrateFun may be used for partial decoding, reading less is expected there 

1852 raise PyAsn1Error( 

1853 "Read %s bytes are more than expected %s." % (bytesRead, length)) 

1854 

1855 if LOG: 

1856 LOG('codec %s yields type %s, value:\n%s\n...' % ( 

1857 concreteDecoder.__class__.__name__, value.__class__.__name__, 

1858 isinstance(value, base.Asn1Item) and value.prettyPrint() or value)) 

1859 

1860 state = stStop 

1861 break 

1862 

1863 if state is stTryAsExplicitTag: 

1864 if (tagSet and 

1865 tagSet[0].tagFormat == tag.tagFormatConstructed and 

1866 tagSet[0].tagClass != tag.tagClassUniversal): 

1867 # Assume explicit tagging 

1868 concreteDecoder = rawPayloadDecoder 

1869 state = stDecodeValue 

1870 

1871 else: 

1872 concreteDecoder = None 

1873 state = self.defaultErrorState 

1874 

1875 if LOG: 

1876 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure')) 

1877 

1878 if state is stDumpRawValue: 

1879 concreteDecoder = self.defaultRawDecoder 

1880 

1881 if LOG: 

1882 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) 

1883 

1884 state = stDecodeValue 

1885 

1886 if state is stErrorCondition: 

1887 raise error.PyAsn1Error( 

1888 '%s not in asn1Spec: %r' % (tagSet, asn1Spec) 

1889 ) 

1890 

1891 if LOG: 

1892 debug.scope.pop() 

1893 LOG('decoder left scope %s, call completed' % debug.scope) 

1894 

1895 yield value 

1896 

1897 

1898class StreamingDecoder(object): 

1899 """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects. 

1900 

1901 On each iteration, consume whatever BER/CER/DER serialization is 

1902 available in the `substrate` stream-like object and turns it into 

1903 one or more, possibly nested, ASN.1 objects. 

1904 

1905 Parameters 

1906 ---------- 

1907 substrate: :py:class:`file`, :py:class:`io.BytesIO` 

1908 BER/CER/DER serialization in form of a byte stream 

1909 

1910 Keyword Args 

1911 ------------ 

1912 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

1913 A pyasn1 type object to act as a template guiding the decoder. 

1914 Depending on the ASN.1 structure being decoded, `asn1Spec` may 

1915 or may not be required. One of the reasons why `asn1Spec` may 

1916 me required is that ASN.1 structure is encoded in the *IMPLICIT* 

1917 tagging mode. 

1918 

1919 Yields 

1920 ------ 

1921 : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1922 Decoded ASN.1 object (possibly, nested) or 

1923 :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating 

1924 insufficient BER/CER/DER serialization on input to fully recover ASN.1 

1925 objects from it. 

1926  

1927 In the latter case the caller is advised to ensure some more data in 

1928 the input stream, then call the iterator again. The decoder will resume 

1929 the decoding process using the newly arrived data. 

1930 

1931 The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1932 object might hold a reference to the partially populated ASN.1 object 

1933 being reconstructed. 

1934 

1935 Raises 

1936 ------ 

1937 ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError 

1938 `PyAsn1Error` on deserialization error, `EndOfStreamError` on 

1939 premature stream closure. 

1940 

1941 Examples 

1942 -------- 

1943 Decode BER serialisation without ASN.1 schema 

1944 

1945 .. code-block:: pycon 

1946 

1947 >>> stream = io.BytesIO( 

1948 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1949 >>> 

1950 >>> for asn1Object in StreamingDecoder(stream): 

1951 ... print(asn1Object) 

1952 >>> 

1953 SequenceOf: 

1954 1 2 3 

1955 

1956 Decode BER serialisation with ASN.1 schema 

1957 

1958 .. code-block:: pycon 

1959 

1960 >>> stream = io.BytesIO( 

1961 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1962 >>> 

1963 >>> schema = SequenceOf(componentType=Integer()) 

1964 >>> 

1965 >>> decoder = StreamingDecoder(stream, asn1Spec=schema) 

1966 >>> for asn1Object in decoder: 

1967 ... print(asn1Object) 

1968 >>> 

1969 SequenceOf: 

1970 1 2 3 

1971 """ 

1972 

1973 SINGLE_ITEM_DECODER = SingleItemDecoder 

1974 

1975 def __init__(self, substrate, asn1Spec=None, **options): 

1976 self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options) 

1977 self._substrate = asSeekableStream(substrate) 

1978 self._asn1Spec = asn1Spec 

1979 self._options = options 

1980 

1981 def __iter__(self): 

1982 while True: 

1983 for asn1Object in self._singleItemDecoder( 

1984 self._substrate, self._asn1Spec, **self._options): 

1985 yield asn1Object 

1986 

1987 for chunk in isEndOfStream(self._substrate): 

1988 if isinstance(chunk, SubstrateUnderrunError): 

1989 yield 

1990 

1991 break 

1992 

1993 if chunk: 

1994 break 

1995 

1996 

1997class Decoder(object): 

1998 """Create a BER decoder object. 

1999 

2000 Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object. 

2001 """ 

2002 STREAMING_DECODER = StreamingDecoder 

2003 

2004 @classmethod 

2005 def __call__(cls, substrate, asn1Spec=None, **options): 

2006 """Turns BER/CER/DER octet stream into an ASN.1 object. 

2007 

2008 Takes BER/CER/DER octet-stream in form of :py:class:`bytes` (Python 3) 

2009 or :py:class:`str` (Python 2) and decode it into an ASN.1 object 

2010 (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

2011 may be a scalar or an arbitrary nested structure. 

2012 

2013 Parameters 

2014 ---------- 

2015 substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

2016 BER/CER/DER octet-stream to parse 

2017 

2018 Keyword Args 

2019 ------------ 

2020 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

2021 A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item` 

2022 derivative) to act as a template guiding the decoder. 

2023 Depending on the ASN.1 structure being decoded, `asn1Spec` may or 

2024 may not be required. Most common reason for it to require is that 

2025 ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

2026 

2027 substrateFun: :py:class:`Union[ 

2028 Callable[[pyasn1.type.base.PyAsn1Item, bytes, int], 

2029 Tuple[pyasn1.type.base.PyAsn1Item, bytes]], 

2030 Callable[[pyasn1.type.base.PyAsn1Item, io.BytesIO, int, dict], 

2031 Generator[Union[pyasn1.type.base.PyAsn1Item, 

2032 pyasn1.error.SubstrateUnderrunError], 

2033 None, None]] 

2034 ]` 

2035 User callback meant to generalize special use cases like non-recursive or 

2036 partial decoding. A 3-arg non-streaming variant is supported for backwards 

2037 compatiblilty in addition to the newer 4-arg streaming variant. 

2038 The callback will receive the uninitialized object recovered from substrate 

2039 as 1st argument, the uninterpreted payload as 2nd argument, and the length 

2040 of the uninterpreted payload as 3rd argument. The streaming variant will 

2041 additionally receive the decode(..., **options) kwargs as 4th argument. 

2042 The non-streaming variant shall return an object that will be propagated 

2043 as decode() return value as 1st item, and the remainig payload for further 

2044 decode passes as 2nd item. 

2045 The streaming variant shall yield an object that will be propagated as 

2046 decode() return value, and leave the remaining payload in the stream. 

2047 

2048 Returns 

2049 ------- 

2050 : :py:class:`tuple` 

2051 A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object 

2052 recovered from BER/CER/DER substrate and the unprocessed trailing 

2053 portion of the `substrate` (may be empty) 

2054 

2055 Raises 

2056 ------ 

2057 : :py:class:`~pyasn1.error.PyAsn1Error` 

2058 :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient 

2059 input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error. 

2060 

2061 Examples 

2062 -------- 

2063 Decode BER/CER/DER serialisation without ASN.1 schema 

2064 

2065 .. code-block:: pycon 

2066 

2067 >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2068 >>> str(s) 

2069 SequenceOf: 

2070 1 2 3 

2071 

2072 Decode BER/CER/DER serialisation with ASN.1 schema 

2073 

2074 .. code-block:: pycon 

2075 

2076 >>> seq = SequenceOf(componentType=Integer()) 

2077 >>> s, unprocessed = decode( 

2078 b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2079 >>> str(s) 

2080 SequenceOf: 

2081 1 2 3 

2082 

2083 """ 

2084 substrate = asSeekableStream(substrate) 

2085 

2086 if "substrateFun" in options: 

2087 origSubstrateFun = options["substrateFun"] 

2088 

2089 def substrateFunWrapper(asn1Object, substrate, length, options=None): 

2090 """Support both 0.4 and 0.5 style APIs. 

2091 

2092 substrateFun API has changed in 0.5 for use with streaming decoders. To stay backwards compatible, 

2093 we first try if we received a streaming user callback. If that fails,we assume we've received a 

2094 non-streaming v0.4 user callback and convert it for streaming on the fly 

2095 """ 

2096 try: 

2097 substrate_gen = origSubstrateFun(asn1Object, substrate, length, options) 

2098 except TypeError: 

2099 _type, _value, traceback = sys.exc_info() 

2100 if traceback.tb_next: 

2101 # Traceback depth > 1 means TypeError from inside user provided function 

2102 raise 

2103 # invariant maintained at Decoder.__call__ entry 

2104 assert isinstance(substrate, io.BytesIO) # nosec assert_used 

2105 substrate_gen = Decoder._callSubstrateFunV4asV5(origSubstrateFun, asn1Object, substrate, length) 

2106 for value in substrate_gen: 

2107 yield value 

2108 

2109 options["substrateFun"] = substrateFunWrapper 

2110 

2111 streamingDecoder = cls.STREAMING_DECODER( 

2112 substrate, asn1Spec, **options) 

2113 

2114 for asn1Object in streamingDecoder: 

2115 if isinstance(asn1Object, SubstrateUnderrunError): 

2116 raise error.SubstrateUnderrunError('Short substrate on input') 

2117 

2118 try: 

2119 tail = next(readFromStream(substrate)) 

2120 

2121 except error.EndOfStreamError: 

2122 tail = null 

2123 

2124 return asn1Object, tail 

2125 

2126 @staticmethod 

2127 def _callSubstrateFunV4asV5(substrateFunV4, asn1Object, substrate, length): 

2128 substrate_bytes = substrate.read() 

2129 if length == -1: 

2130 length = len(substrate_bytes) 

2131 value, nextSubstrate = substrateFunV4(asn1Object, substrate_bytes, length) 

2132 nbytes = substrate.write(nextSubstrate) 

2133 substrate.truncate() 

2134 substrate.seek(-nbytes, os.SEEK_CUR) 

2135 yield value 

2136 

2137#: Turns BER octet stream into an ASN.1 object. 

2138#: 

2139#: Takes BER octet-stream and decode it into an ASN.1 object 

2140#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

2141#: may be a scalar or an arbitrary nested structure. 

2142#: 

2143#: Parameters 

2144#: ---------- 

2145#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

2146#: BER octet-stream 

2147#: 

2148#: Keyword Args 

2149#: ------------ 

2150#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 

2151#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure 

2152#: being decoded, *asn1Spec* may or may not be required. Most common reason for 

2153#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

2154#: 

2155#: Returns 

2156#: ------- 

2157#: : :py:class:`tuple` 

2158#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

2159#: and the unprocessed trailing portion of the *substrate* (may be empty) 

2160#: 

2161#: Raises 

2162#: ------ 

2163#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError 

2164#: On decoding errors 

2165#: 

2166#: Notes 

2167#: ----- 

2168#: This function is deprecated. Please use :py:class:`Decoder` or 

2169#: :py:class:`StreamingDecoder` class instance. 

2170#: 

2171#: Examples 

2172#: -------- 

2173#: Decode BER serialisation without ASN.1 schema 

2174#: 

2175#: .. code-block:: pycon 

2176#: 

2177#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2178#: >>> str(s) 

2179#: SequenceOf: 

2180#: 1 2 3 

2181#: 

2182#: Decode BER serialisation with ASN.1 schema 

2183#: 

2184#: .. code-block:: pycon 

2185#: 

2186#: >>> seq = SequenceOf(componentType=Integer()) 

2187#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2188#: >>> str(s) 

2189#: SequenceOf: 

2190#: 1 2 3 

2191#: 

2192decode = Decoder()