Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/decoder.py: 15%

991 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:37 +0000

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import os 

8 

9from pyasn1 import debug 

10from pyasn1 import error 

11from pyasn1.codec.ber import eoo 

12from pyasn1.codec.streaming import asSeekableStream 

13from pyasn1.codec.streaming import isEndOfStream 

14from pyasn1.codec.streaming import peekIntoStream 

15from pyasn1.codec.streaming import readFromStream 

16from pyasn1.compat import _MISSING 

17from pyasn1.compat.integer import from_bytes 

18from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null 

19from pyasn1.error import PyAsn1Error 

20from pyasn1.type import base 

21from pyasn1.type import char 

22from pyasn1.type import tag 

23from pyasn1.type import tagmap 

24from pyasn1.type import univ 

25from pyasn1.type import useful 

26 

27__all__ = ['StreamingDecoder', 'Decoder', 'decode'] 

28 

29LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER) 

30 

31noValue = base.noValue 

32 

33SubstrateUnderrunError = error.SubstrateUnderrunError 

34 

35 

36class AbstractPayloadDecoder(object): 

37 protoComponent = None 

38 

39 def valueDecoder(self, substrate, asn1Spec, 

40 tagSet=None, length=None, state=None, 

41 decodeFun=None, substrateFun=None, 

42 **options): 

43 """Decode value with fixed byte length. 

44 

45 The decoder is allowed to consume as many bytes as necessary. 

46 """ 

47 raise error.PyAsn1Error('SingleItemDecoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

48 

49 def indefLenValueDecoder(self, substrate, asn1Spec, 

50 tagSet=None, length=None, state=None, 

51 decodeFun=None, substrateFun=None, 

52 **options): 

53 """Decode value with undefined length. 

54 

55 The decoder is allowed to consume as many bytes as necessary. 

56 """ 

57 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

58 

59 @staticmethod 

60 def _passAsn1Object(asn1Object, options): 

61 if 'asn1Object' not in options: 

62 options['asn1Object'] = asn1Object 

63 

64 return options 

65 

66 

67class AbstractSimplePayloadDecoder(AbstractPayloadDecoder): 

68 @staticmethod 

69 def substrateCollector(asn1Object, substrate, length, options): 

70 for chunk in readFromStream(substrate, length, options): 

71 yield chunk 

72 

73 def _createComponent(self, asn1Spec, tagSet, value, **options): 

74 if options.get('native'): 

75 return value 

76 elif asn1Spec is None: 

77 return self.protoComponent.clone(value, tagSet=tagSet) 

78 elif value is noValue: 

79 return asn1Spec 

80 else: 

81 return asn1Spec.clone(value) 

82 

83 

84class RawPayloadDecoder(AbstractSimplePayloadDecoder): 

85 protoComponent = univ.Any('') 

86 

87 def valueDecoder(self, substrate, asn1Spec, 

88 tagSet=None, length=None, state=None, 

89 decodeFun=None, substrateFun=None, 

90 **options): 

91 if substrateFun: 

92 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

93 

94 for chunk in substrateFun(asn1Object, substrate, length, options): 

95 yield chunk 

96 

97 return 

98 

99 for value in decodeFun(substrate, asn1Spec, tagSet, length, **options): 

100 yield value 

101 

102 def indefLenValueDecoder(self, substrate, asn1Spec, 

103 tagSet=None, length=None, state=None, 

104 decodeFun=None, substrateFun=None, 

105 **options): 

106 if substrateFun: 

107 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

108 

109 for chunk in substrateFun(asn1Object, substrate, length, options): 

110 yield chunk 

111 

112 return 

113 

114 while True: 

115 for value in decodeFun( 

116 substrate, asn1Spec, tagSet, length, 

117 allowEoo=True, **options): 

118 

119 if value is eoo.endOfOctets: 

120 return 

121 

122 yield value 

123 

124 

125rawPayloadDecoder = RawPayloadDecoder() 

126 

127 

128class IntegerPayloadDecoder(AbstractSimplePayloadDecoder): 

129 protoComponent = univ.Integer(0) 

130 

131 def valueDecoder(self, substrate, asn1Spec, 

132 tagSet=None, length=None, state=None, 

133 decodeFun=None, substrateFun=None, 

134 **options): 

135 

136 if tagSet[0].tagFormat != tag.tagFormatSimple: 

137 raise error.PyAsn1Error('Simple tag format expected') 

138 

139 for chunk in readFromStream(substrate, length, options): 

140 if isinstance(chunk, SubstrateUnderrunError): 

141 yield chunk 

142 

143 if chunk: 

144 value = from_bytes(chunk, signed=True) 

145 

146 else: 

147 value = 0 

148 

149 yield self._createComponent(asn1Spec, tagSet, value, **options) 

150 

151 

152class BooleanPayloadDecoder(IntegerPayloadDecoder): 

153 protoComponent = univ.Boolean(0) 

154 

155 def _createComponent(self, asn1Spec, tagSet, value, **options): 

156 return IntegerPayloadDecoder._createComponent( 

157 self, asn1Spec, tagSet, value and 1 or 0, **options) 

158 

159 

160class BitStringPayloadDecoder(AbstractSimplePayloadDecoder): 

161 protoComponent = univ.BitString(()) 

162 supportConstructedForm = True 

163 

164 def valueDecoder(self, substrate, asn1Spec, 

165 tagSet=None, length=None, state=None, 

166 decodeFun=None, substrateFun=None, 

167 **options): 

168 

169 if substrateFun: 

170 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

171 

172 for chunk in substrateFun(asn1Object, substrate, length, options): 

173 yield chunk 

174 

175 return 

176 

177 if not length: 

178 raise error.PyAsn1Error('Empty BIT STRING substrate') 

179 

180 for chunk in isEndOfStream(substrate): 

181 if isinstance(chunk, SubstrateUnderrunError): 

182 yield chunk 

183 

184 if chunk: 

185 raise error.PyAsn1Error('Empty BIT STRING substrate') 

186 

187 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

188 

189 for trailingBits in readFromStream(substrate, 1, options): 

190 if isinstance(trailingBits, SubstrateUnderrunError): 

191 yield trailingBits 

192 

193 trailingBits = ord(trailingBits) 

194 if trailingBits > 7: 

195 raise error.PyAsn1Error( 

196 'Trailing bits overflow %s' % trailingBits 

197 ) 

198 

199 for chunk in readFromStream(substrate, length - 1, options): 

200 if isinstance(chunk, SubstrateUnderrunError): 

201 yield chunk 

202 

203 value = self.protoComponent.fromOctetString( 

204 chunk, internalFormat=True, padding=trailingBits) 

205 

206 yield self._createComponent(asn1Spec, tagSet, value, **options) 

207 

208 return 

209 

210 if not self.supportConstructedForm: 

211 raise error.PyAsn1Error('Constructed encoding form prohibited ' 

212 'at %s' % self.__class__.__name__) 

213 

214 if LOG: 

215 LOG('assembling constructed serialization') 

216 

217 # All inner fragments are of the same type, treat them as octet string 

218 substrateFun = self.substrateCollector 

219 

220 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

221 

222 current_position = substrate.tell() 

223 

224 while substrate.tell() - current_position < length: 

225 for component in decodeFun( 

226 substrate, self.protoComponent, substrateFun=substrateFun, 

227 **options): 

228 if isinstance(component, SubstrateUnderrunError): 

229 yield component 

230 

231 trailingBits = oct2int(component[0]) 

232 if trailingBits > 7: 

233 raise error.PyAsn1Error( 

234 'Trailing bits overflow %s' % trailingBits 

235 ) 

236 

237 bitString = self.protoComponent.fromOctetString( 

238 component[1:], internalFormat=True, 

239 prepend=bitString, padding=trailingBits 

240 ) 

241 

242 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

243 

244 def indefLenValueDecoder(self, substrate, asn1Spec, 

245 tagSet=None, length=None, state=None, 

246 decodeFun=None, substrateFun=None, 

247 **options): 

248 

249 if substrateFun: 

250 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

251 

252 for chunk in substrateFun(asn1Object, substrate, length, options): 

253 yield chunk 

254 

255 return 

256 

257 # All inner fragments are of the same type, treat them as octet string 

258 substrateFun = self.substrateCollector 

259 

260 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

261 

262 while True: # loop over fragments 

263 

264 for component in decodeFun( 

265 substrate, self.protoComponent, substrateFun=substrateFun, 

266 allowEoo=True, **options): 

267 

268 if component is eoo.endOfOctets: 

269 break 

270 

271 if isinstance(component, SubstrateUnderrunError): 

272 yield component 

273 

274 if component is eoo.endOfOctets: 

275 break 

276 

277 trailingBits = oct2int(component[0]) 

278 if trailingBits > 7: 

279 raise error.PyAsn1Error( 

280 'Trailing bits overflow %s' % trailingBits 

281 ) 

282 

283 bitString = self.protoComponent.fromOctetString( 

284 component[1:], internalFormat=True, 

285 prepend=bitString, padding=trailingBits 

286 ) 

287 

288 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

289 

290 

291class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder): 

292 protoComponent = univ.OctetString('') 

293 supportConstructedForm = True 

294 

295 def valueDecoder(self, substrate, asn1Spec, 

296 tagSet=None, length=None, state=None, 

297 decodeFun=None, substrateFun=None, 

298 **options): 

299 if substrateFun: 

300 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

301 

302 for chunk in substrateFun(asn1Object, substrate, length, options): 

303 yield chunk 

304 

305 return 

306 

307 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

308 for chunk in readFromStream(substrate, length, options): 

309 if isinstance(chunk, SubstrateUnderrunError): 

310 yield chunk 

311 

312 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

313 

314 return 

315 

316 if not self.supportConstructedForm: 

317 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__) 

318 

319 if LOG: 

320 LOG('assembling constructed serialization') 

321 

322 # All inner fragments are of the same type, treat them as octet string 

323 substrateFun = self.substrateCollector 

324 

325 header = null 

326 

327 original_position = substrate.tell() 

328 # head = popSubstream(substrate, length) 

329 while substrate.tell() - original_position < length: 

330 for component in decodeFun( 

331 substrate, self.protoComponent, substrateFun=substrateFun, 

332 **options): 

333 if isinstance(component, SubstrateUnderrunError): 

334 yield component 

335 

336 header += component 

337 

338 yield self._createComponent(asn1Spec, tagSet, header, **options) 

339 

340 def indefLenValueDecoder(self, substrate, asn1Spec, 

341 tagSet=None, length=None, state=None, 

342 decodeFun=None, substrateFun=None, 

343 **options): 

344 if substrateFun and substrateFun is not self.substrateCollector: 

345 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

346 

347 for chunk in substrateFun(asn1Object, substrate, length, options): 

348 yield chunk 

349 

350 return 

351 

352 # All inner fragments are of the same type, treat them as octet string 

353 substrateFun = self.substrateCollector 

354 

355 header = null 

356 

357 while True: # loop over fragments 

358 

359 for component in decodeFun( 

360 substrate, self.protoComponent, substrateFun=substrateFun, 

361 allowEoo=True, **options): 

362 

363 if isinstance(component, SubstrateUnderrunError): 

364 yield component 

365 

366 if component is eoo.endOfOctets: 

367 break 

368 

369 if component is eoo.endOfOctets: 

370 break 

371 

372 header += component 

373 

374 yield self._createComponent(asn1Spec, tagSet, header, **options) 

375 

376 

377class NullPayloadDecoder(AbstractSimplePayloadDecoder): 

378 protoComponent = univ.Null('') 

379 

380 def valueDecoder(self, substrate, asn1Spec, 

381 tagSet=None, length=None, state=None, 

382 decodeFun=None, substrateFun=None, 

383 **options): 

384 

385 if tagSet[0].tagFormat != tag.tagFormatSimple: 

386 raise error.PyAsn1Error('Simple tag format expected') 

387 

388 for chunk in readFromStream(substrate, length, options): 

389 if isinstance(chunk, SubstrateUnderrunError): 

390 yield chunk 

391 

392 component = self._createComponent(asn1Spec, tagSet, '', **options) 

393 

394 if chunk: 

395 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) 

396 

397 yield component 

398 

399 

400class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder): 

401 protoComponent = univ.ObjectIdentifier(()) 

402 

403 def valueDecoder(self, substrate, asn1Spec, 

404 tagSet=None, length=None, state=None, 

405 decodeFun=None, substrateFun=None, 

406 **options): 

407 if tagSet[0].tagFormat != tag.tagFormatSimple: 

408 raise error.PyAsn1Error('Simple tag format expected') 

409 

410 for chunk in readFromStream(substrate, length, options): 

411 if isinstance(chunk, SubstrateUnderrunError): 

412 yield chunk 

413 

414 if not chunk: 

415 raise error.PyAsn1Error('Empty substrate') 

416 

417 chunk = octs2ints(chunk) 

418 

419 oid = () 

420 index = 0 

421 substrateLen = len(chunk) 

422 while index < substrateLen: 

423 subId = chunk[index] 

424 index += 1 

425 if subId < 128: 

426 oid += (subId,) 

427 elif subId > 128: 

428 # Construct subid from a number of octets 

429 nextSubId = subId 

430 subId = 0 

431 while nextSubId >= 128: 

432 subId = (subId << 7) + (nextSubId & 0x7F) 

433 if index >= substrateLen: 

434 raise error.SubstrateUnderrunError( 

435 'Short substrate for sub-OID past %s' % (oid,) 

436 ) 

437 nextSubId = chunk[index] 

438 index += 1 

439 oid += ((subId << 7) + nextSubId,) 

440 elif subId == 128: 

441 # ASN.1 spec forbids leading zeros (0x80) in OID 

442 # encoding, tolerating it opens a vulnerability. See 

443 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 

444 # page 7 

445 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding') 

446 

447 # Decode two leading arcs 

448 if 0 <= oid[0] <= 39: 

449 oid = (0,) + oid 

450 elif 40 <= oid[0] <= 79: 

451 oid = (1, oid[0] - 40) + oid[1:] 

452 elif oid[0] >= 80: 

453 oid = (2, oid[0] - 80) + oid[1:] 

454 else: 

455 raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0]) 

456 

457 yield self._createComponent(asn1Spec, tagSet, oid, **options) 

458 

459 

460class RealPayloadDecoder(AbstractSimplePayloadDecoder): 

461 protoComponent = univ.Real() 

462 

463 def valueDecoder(self, substrate, asn1Spec, 

464 tagSet=None, length=None, state=None, 

465 decodeFun=None, substrateFun=None, 

466 **options): 

467 if tagSet[0].tagFormat != tag.tagFormatSimple: 

468 raise error.PyAsn1Error('Simple tag format expected') 

469 

470 for chunk in readFromStream(substrate, length, options): 

471 if isinstance(chunk, SubstrateUnderrunError): 

472 yield chunk 

473 

474 if not chunk: 

475 yield self._createComponent(asn1Spec, tagSet, 0.0, **options) 

476 return 

477 

478 fo = oct2int(chunk[0]) 

479 chunk = chunk[1:] 

480 if fo & 0x80: # binary encoding 

481 if not chunk: 

482 raise error.PyAsn1Error("Incomplete floating-point value") 

483 

484 if LOG: 

485 LOG('decoding binary encoded REAL') 

486 

487 n = (fo & 0x03) + 1 

488 

489 if n == 4: 

490 n = oct2int(chunk[0]) 

491 chunk = chunk[1:] 

492 

493 eo, chunk = chunk[:n], chunk[n:] 

494 

495 if not eo or not chunk: 

496 raise error.PyAsn1Error('Real exponent screwed') 

497 

498 e = oct2int(eo[0]) & 0x80 and -1 or 0 

499 

500 while eo: # exponent 

501 e <<= 8 

502 e |= oct2int(eo[0]) 

503 eo = eo[1:] 

504 

505 b = fo >> 4 & 0x03 # base bits 

506 

507 if b > 2: 

508 raise error.PyAsn1Error('Illegal Real base') 

509 

510 if b == 1: # encbase = 8 

511 e *= 3 

512 

513 elif b == 2: # encbase = 16 

514 e *= 4 

515 p = 0 

516 

517 while chunk: # value 

518 p <<= 8 

519 p |= oct2int(chunk[0]) 

520 chunk = chunk[1:] 

521 

522 if fo & 0x40: # sign bit 

523 p = -p 

524 

525 sf = fo >> 2 & 0x03 # scale bits 

526 p *= 2 ** sf 

527 value = (p, 2, e) 

528 

529 elif fo & 0x40: # infinite value 

530 if LOG: 

531 LOG('decoding infinite REAL') 

532 

533 value = fo & 0x01 and '-inf' or 'inf' 

534 

535 elif fo & 0xc0 == 0: # character encoding 

536 if not chunk: 

537 raise error.PyAsn1Error("Incomplete floating-point value") 

538 

539 if LOG: 

540 LOG('decoding character encoded REAL') 

541 

542 try: 

543 if fo & 0x3 == 0x1: # NR1 

544 value = (int(chunk), 10, 0) 

545 

546 elif fo & 0x3 == 0x2: # NR2 

547 value = float(chunk) 

548 

549 elif fo & 0x3 == 0x3: # NR3 

550 value = float(chunk) 

551 

552 else: 

553 raise error.SubstrateUnderrunError( 

554 'Unknown NR (tag %s)' % fo 

555 ) 

556 

557 except ValueError: 

558 raise error.SubstrateUnderrunError( 

559 'Bad character Real syntax' 

560 ) 

561 

562 else: 

563 raise error.SubstrateUnderrunError( 

564 'Unknown encoding (tag %s)' % fo 

565 ) 

566 

567 yield self._createComponent(asn1Spec, tagSet, value, **options) 

568 

569 

570class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder): 

571 protoComponent = None 

572 

573 

574class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder): 

575 protoRecordComponent = None 

576 protoSequenceComponent = None 

577 

578 def _getComponentTagMap(self, asn1Object, idx): 

579 raise NotImplementedError() 

580 

581 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 

582 raise NotImplementedError() 

583 

584 def _decodeComponentsSchemaless( 

585 self, substrate, tagSet=None, decodeFun=None, 

586 length=None, **options): 

587 

588 asn1Object = None 

589 

590 components = [] 

591 componentTypes = set() 

592 

593 original_position = substrate.tell() 

594 

595 while length == -1 or substrate.tell() < original_position + length: 

596 for component in decodeFun(substrate, **options): 

597 if isinstance(component, SubstrateUnderrunError): 

598 yield component 

599 

600 if length == -1 and component is eoo.endOfOctets: 

601 break 

602 

603 components.append(component) 

604 componentTypes.add(component.tagSet) 

605 

606 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF 

607 # The heuristics is: 

608 # * 1+ components of different types -> likely SEQUENCE/SET 

609 # * otherwise -> likely SEQUENCE OF/SET OF 

610 if len(componentTypes) > 1: 

611 protoComponent = self.protoRecordComponent 

612 

613 else: 

614 protoComponent = self.protoSequenceComponent 

615 

616 asn1Object = protoComponent.clone( 

617 # construct tagSet from base tag from prototype ASN.1 object 

618 # and additional tags recovered from the substrate 

619 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags) 

620 ) 

621 

622 if LOG: 

623 LOG('guessed %r container type (pass `asn1Spec` to guide the ' 

624 'decoder)' % asn1Object) 

625 

626 for idx, component in enumerate(components): 

627 asn1Object.setComponentByPosition( 

628 idx, component, 

629 verifyConstraints=False, 

630 matchTags=False, matchConstraints=False 

631 ) 

632 

633 yield asn1Object 

634 

635 def valueDecoder(self, substrate, asn1Spec, 

636 tagSet=None, length=None, state=None, 

637 decodeFun=None, substrateFun=None, 

638 **options): 

639 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

640 raise error.PyAsn1Error('Constructed tag format expected') 

641 

642 original_position = substrate.tell() 

643 

644 if substrateFun: 

645 if asn1Spec is not None: 

646 asn1Object = asn1Spec.clone() 

647 

648 elif self.protoComponent is not None: 

649 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

650 

651 else: 

652 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

653 

654 for chunk in substrateFun(asn1Object, substrate, length, options): 

655 yield chunk 

656 

657 return 

658 

659 if asn1Spec is None: 

660 for asn1Object in self._decodeComponentsSchemaless( 

661 substrate, tagSet=tagSet, decodeFun=decodeFun, 

662 length=length, **options): 

663 if isinstance(asn1Object, SubstrateUnderrunError): 

664 yield asn1Object 

665 

666 if substrate.tell() < original_position + length: 

667 if LOG: 

668 for trailing in readFromStream(substrate, context=options): 

669 if isinstance(trailing, SubstrateUnderrunError): 

670 yield trailing 

671 

672 LOG('Unused trailing %d octets encountered: %s' % ( 

673 len(trailing), debug.hexdump(trailing))) 

674 

675 yield asn1Object 

676 

677 return 

678 

679 asn1Object = asn1Spec.clone() 

680 asn1Object.clear() 

681 

682 options = self._passAsn1Object(asn1Object, options) 

683 

684 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

685 

686 namedTypes = asn1Spec.componentType 

687 

688 isSetType = asn1Spec.typeId == univ.Set.typeId 

689 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

690 

691 if LOG: 

692 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

693 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

694 asn1Spec)) 

695 

696 seenIndices = set() 

697 idx = 0 

698 while substrate.tell() - original_position < length: 

699 if not namedTypes: 

700 componentType = None 

701 

702 elif isSetType: 

703 componentType = namedTypes.tagMapUnique 

704 

705 else: 

706 try: 

707 if isDeterministic: 

708 componentType = namedTypes[idx].asn1Object 

709 

710 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

711 componentType = namedTypes.getTagMapNearPosition(idx) 

712 

713 else: 

714 componentType = namedTypes[idx].asn1Object 

715 

716 except IndexError: 

717 raise error.PyAsn1Error( 

718 'Excessive components decoded at %r' % (asn1Spec,) 

719 ) 

720 

721 for component in decodeFun(substrate, componentType, **options): 

722 if isinstance(component, SubstrateUnderrunError): 

723 yield component 

724 

725 if not isDeterministic and namedTypes: 

726 if isSetType: 

727 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

728 

729 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

730 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

731 

732 asn1Object.setComponentByPosition( 

733 idx, component, 

734 verifyConstraints=False, 

735 matchTags=False, matchConstraints=False 

736 ) 

737 

738 seenIndices.add(idx) 

739 idx += 1 

740 

741 if LOG: 

742 LOG('seen component indices %s' % seenIndices) 

743 

744 if namedTypes: 

745 if not namedTypes.requiredComponents.issubset(seenIndices): 

746 raise error.PyAsn1Error( 

747 'ASN.1 object %s has uninitialized ' 

748 'components' % asn1Object.__class__.__name__) 

749 

750 if namedTypes.hasOpenTypes: 

751 

752 openTypes = options.get('openTypes', {}) 

753 

754 if LOG: 

755 LOG('user-specified open types map:') 

756 

757 for k, v in openTypes.items(): 

758 LOG('%s -> %r' % (k, v)) 

759 

760 if openTypes or options.get('decodeOpenTypes', False): 

761 

762 for idx, namedType in enumerate(namedTypes.namedTypes): 

763 if not namedType.openType: 

764 continue 

765 

766 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

767 continue 

768 

769 governingValue = asn1Object.getComponentByName( 

770 namedType.openType.name 

771 ) 

772 

773 try: 

774 openType = openTypes[governingValue] 

775 

776 except KeyError: 

777 

778 if LOG: 

779 LOG('default open types map of component ' 

780 '"%s.%s" governed by component "%s.%s"' 

781 ':' % (asn1Object.__class__.__name__, 

782 namedType.name, 

783 asn1Object.__class__.__name__, 

784 namedType.openType.name)) 

785 

786 for k, v in namedType.openType.items(): 

787 LOG('%s -> %r' % (k, v)) 

788 

789 try: 

790 openType = namedType.openType[governingValue] 

791 

792 except KeyError: 

793 if LOG: 

794 LOG('failed to resolve open type by governing ' 

795 'value %r' % (governingValue,)) 

796 continue 

797 

798 if LOG: 

799 LOG('resolved open type %r by governing ' 

800 'value %r' % (openType, governingValue)) 

801 

802 containerValue = asn1Object.getComponentByPosition(idx) 

803 

804 if containerValue.typeId in ( 

805 univ.SetOf.typeId, univ.SequenceOf.typeId): 

806 

807 for pos, containerElement in enumerate( 

808 containerValue): 

809 

810 stream = asSeekableStream(containerValue[pos].asOctets()) 

811 

812 for component in decodeFun(stream, asn1Spec=openType, **options): 

813 if isinstance(component, SubstrateUnderrunError): 

814 yield component 

815 

816 containerValue[pos] = component 

817 

818 else: 

819 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

820 

821 for component in decodeFun(stream, asn1Spec=openType, **options): 

822 if isinstance(component, SubstrateUnderrunError): 

823 yield component 

824 

825 asn1Object.setComponentByPosition(idx, component) 

826 

827 else: 

828 inconsistency = asn1Object.isInconsistent 

829 if inconsistency: 

830 raise inconsistency 

831 

832 else: 

833 componentType = asn1Spec.componentType 

834 

835 if LOG: 

836 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

837 

838 idx = 0 

839 

840 while substrate.tell() - original_position < length: 

841 for component in decodeFun(substrate, componentType, **options): 

842 if isinstance(component, SubstrateUnderrunError): 

843 yield component 

844 

845 asn1Object.setComponentByPosition( 

846 idx, component, 

847 verifyConstraints=False, 

848 matchTags=False, matchConstraints=False 

849 ) 

850 

851 idx += 1 

852 

853 yield asn1Object 

854 

855 def indefLenValueDecoder(self, substrate, asn1Spec, 

856 tagSet=None, length=None, state=None, 

857 decodeFun=None, substrateFun=None, 

858 **options): 

859 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

860 raise error.PyAsn1Error('Constructed tag format expected') 

861 

862 if substrateFun is not None: 

863 if asn1Spec is not None: 

864 asn1Object = asn1Spec.clone() 

865 

866 elif self.protoComponent is not None: 

867 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

868 

869 else: 

870 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

871 

872 for chunk in substrateFun(asn1Object, substrate, length, options): 

873 yield chunk 

874 

875 return 

876 

877 if asn1Spec is None: 

878 for asn1Object in self._decodeComponentsSchemaless( 

879 substrate, tagSet=tagSet, decodeFun=decodeFun, 

880 length=length, **dict(options, allowEoo=True)): 

881 if isinstance(asn1Object, SubstrateUnderrunError): 

882 yield asn1Object 

883 

884 yield asn1Object 

885 

886 return 

887 

888 asn1Object = asn1Spec.clone() 

889 asn1Object.clear() 

890 

891 options = self._passAsn1Object(asn1Object, options) 

892 

893 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

894 

895 namedTypes = asn1Object.componentType 

896 

897 isSetType = asn1Object.typeId == univ.Set.typeId 

898 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

899 

900 if LOG: 

901 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

902 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

903 asn1Spec)) 

904 

905 seenIndices = set() 

906 

907 idx = 0 

908 

909 while True: # loop over components 

910 if len(namedTypes) <= idx: 

911 asn1Spec = None 

912 

913 elif isSetType: 

914 asn1Spec = namedTypes.tagMapUnique 

915 

916 else: 

917 try: 

918 if isDeterministic: 

919 asn1Spec = namedTypes[idx].asn1Object 

920 

921 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

922 asn1Spec = namedTypes.getTagMapNearPosition(idx) 

923 

924 else: 

925 asn1Spec = namedTypes[idx].asn1Object 

926 

927 except IndexError: 

928 raise error.PyAsn1Error( 

929 'Excessive components decoded at %r' % (asn1Object,) 

930 ) 

931 

932 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options): 

933 

934 if isinstance(component, SubstrateUnderrunError): 

935 yield component 

936 

937 if component is eoo.endOfOctets: 

938 break 

939 

940 if component is eoo.endOfOctets: 

941 break 

942 

943 if not isDeterministic and namedTypes: 

944 if isSetType: 

945 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

946 

947 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

948 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

949 

950 asn1Object.setComponentByPosition( 

951 idx, component, 

952 verifyConstraints=False, 

953 matchTags=False, matchConstraints=False 

954 ) 

955 

956 seenIndices.add(idx) 

957 idx += 1 

958 

959 if LOG: 

960 LOG('seen component indices %s' % seenIndices) 

961 

962 if namedTypes: 

963 if not namedTypes.requiredComponents.issubset(seenIndices): 

964 raise error.PyAsn1Error( 

965 'ASN.1 object %s has uninitialized ' 

966 'components' % asn1Object.__class__.__name__) 

967 

968 if namedTypes.hasOpenTypes: 

969 

970 openTypes = options.get('openTypes', {}) 

971 

972 if LOG: 

973 LOG('user-specified open types map:') 

974 

975 for k, v in openTypes.items(): 

976 LOG('%s -> %r' % (k, v)) 

977 

978 if openTypes or options.get('decodeOpenTypes', False): 

979 

980 for idx, namedType in enumerate(namedTypes.namedTypes): 

981 if not namedType.openType: 

982 continue 

983 

984 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

985 continue 

986 

987 governingValue = asn1Object.getComponentByName( 

988 namedType.openType.name 

989 ) 

990 

991 try: 

992 openType = openTypes[governingValue] 

993 

994 except KeyError: 

995 

996 if LOG: 

997 LOG('default open types map of component ' 

998 '"%s.%s" governed by component "%s.%s"' 

999 ':' % (asn1Object.__class__.__name__, 

1000 namedType.name, 

1001 asn1Object.__class__.__name__, 

1002 namedType.openType.name)) 

1003 

1004 for k, v in namedType.openType.items(): 

1005 LOG('%s -> %r' % (k, v)) 

1006 

1007 try: 

1008 openType = namedType.openType[governingValue] 

1009 

1010 except KeyError: 

1011 if LOG: 

1012 LOG('failed to resolve open type by governing ' 

1013 'value %r' % (governingValue,)) 

1014 continue 

1015 

1016 if LOG: 

1017 LOG('resolved open type %r by governing ' 

1018 'value %r' % (openType, governingValue)) 

1019 

1020 containerValue = asn1Object.getComponentByPosition(idx) 

1021 

1022 if containerValue.typeId in ( 

1023 univ.SetOf.typeId, univ.SequenceOf.typeId): 

1024 

1025 for pos, containerElement in enumerate( 

1026 containerValue): 

1027 

1028 stream = asSeekableStream(containerValue[pos].asOctets()) 

1029 

1030 for component in decodeFun(stream, asn1Spec=openType, 

1031 **dict(options, allowEoo=True)): 

1032 if isinstance(component, SubstrateUnderrunError): 

1033 yield component 

1034 

1035 if component is eoo.endOfOctets: 

1036 break 

1037 

1038 containerValue[pos] = component 

1039 

1040 else: 

1041 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

1042 for component in decodeFun(stream, asn1Spec=openType, 

1043 **dict(options, allowEoo=True)): 

1044 if isinstance(component, SubstrateUnderrunError): 

1045 yield component 

1046 

1047 if component is eoo.endOfOctets: 

1048 break 

1049 

1050 asn1Object.setComponentByPosition(idx, component) 

1051 

1052 else: 

1053 inconsistency = asn1Object.isInconsistent 

1054 if inconsistency: 

1055 raise inconsistency 

1056 

1057 else: 

1058 componentType = asn1Spec.componentType 

1059 

1060 if LOG: 

1061 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

1062 

1063 idx = 0 

1064 

1065 while True: 

1066 

1067 for component in decodeFun( 

1068 substrate, componentType, allowEoo=True, **options): 

1069 

1070 if isinstance(component, SubstrateUnderrunError): 

1071 yield component 

1072 

1073 if component is eoo.endOfOctets: 

1074 break 

1075 

1076 if component is eoo.endOfOctets: 

1077 break 

1078 

1079 asn1Object.setComponentByPosition( 

1080 idx, component, 

1081 verifyConstraints=False, 

1082 matchTags=False, matchConstraints=False 

1083 ) 

1084 

1085 idx += 1 

1086 

1087 yield asn1Object 

1088 

1089 

1090class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1091 protoRecordComponent = univ.Sequence() 

1092 protoSequenceComponent = univ.SequenceOf() 

1093 

1094 

1095class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1096 protoComponent = univ.Sequence() 

1097 

1098 

1099class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1100 protoComponent = univ.SequenceOf() 

1101 

1102 

1103class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1104 protoRecordComponent = univ.Set() 

1105 protoSequenceComponent = univ.SetOf() 

1106 

1107 

1108class SetPayloadDecoder(SetOrSetOfPayloadDecoder): 

1109 protoComponent = univ.Set() 

1110 

1111 

1112class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder): 

1113 protoComponent = univ.SetOf() 

1114 

1115 

1116class ChoicePayloadDecoder(ConstructedPayloadDecoderBase): 

1117 protoComponent = univ.Choice() 

1118 

1119 def valueDecoder(self, substrate, asn1Spec, 

1120 tagSet=None, length=None, state=None, 

1121 decodeFun=None, substrateFun=None, 

1122 **options): 

1123 if asn1Spec is None: 

1124 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1125 

1126 else: 

1127 asn1Object = asn1Spec.clone() 

1128 

1129 if substrateFun: 

1130 for chunk in substrateFun(asn1Object, substrate, length, options): 

1131 yield chunk 

1132 

1133 return 

1134 

1135 options = self._passAsn1Object(asn1Object, options) 

1136 

1137 if asn1Object.tagSet == tagSet: 

1138 if LOG: 

1139 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 

1140 

1141 for component in decodeFun( 

1142 substrate, asn1Object.componentTagMap, **options): 

1143 if isinstance(component, SubstrateUnderrunError): 

1144 yield component 

1145 

1146 else: 

1147 if LOG: 

1148 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 

1149 

1150 for component in decodeFun( 

1151 substrate, asn1Object.componentTagMap, tagSet, length, 

1152 state, **options): 

1153 if isinstance(component, SubstrateUnderrunError): 

1154 yield component 

1155 

1156 effectiveTagSet = component.effectiveTagSet 

1157 

1158 if LOG: 

1159 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 

1160 

1161 asn1Object.setComponentByType( 

1162 effectiveTagSet, component, 

1163 verifyConstraints=False, 

1164 matchTags=False, matchConstraints=False, 

1165 innerFlag=False 

1166 ) 

1167 

1168 yield asn1Object 

1169 

1170 def indefLenValueDecoder(self, substrate, asn1Spec, 

1171 tagSet=None, length=None, state=None, 

1172 decodeFun=None, substrateFun=None, 

1173 **options): 

1174 if asn1Spec is None: 

1175 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1176 

1177 else: 

1178 asn1Object = asn1Spec.clone() 

1179 

1180 if substrateFun: 

1181 for chunk in substrateFun(asn1Object, substrate, length, options): 

1182 yield chunk 

1183 

1184 return 

1185 

1186 options = self._passAsn1Object(asn1Object, options) 

1187 

1188 isTagged = asn1Object.tagSet == tagSet 

1189 

1190 if LOG: 

1191 LOG('decoding %s as %stagged CHOICE' % ( 

1192 tagSet, isTagged and 'explicitly ' or 'un')) 

1193 

1194 while True: 

1195 

1196 if isTagged: 

1197 iterator = decodeFun( 

1198 substrate, asn1Object.componentType.tagMapUnique, 

1199 **dict(options, allowEoo=True)) 

1200 

1201 else: 

1202 iterator = decodeFun( 

1203 substrate, asn1Object.componentType.tagMapUnique, 

1204 tagSet, length, state, **dict(options, allowEoo=True)) 

1205 

1206 for component in iterator: 

1207 

1208 if isinstance(component, SubstrateUnderrunError): 

1209 yield component 

1210 

1211 if component is eoo.endOfOctets: 

1212 break 

1213 

1214 effectiveTagSet = component.effectiveTagSet 

1215 

1216 if LOG: 

1217 LOG('decoded component %s, effective tag set ' 

1218 '%s' % (component, effectiveTagSet)) 

1219 

1220 asn1Object.setComponentByType( 

1221 effectiveTagSet, component, 

1222 verifyConstraints=False, 

1223 matchTags=False, matchConstraints=False, 

1224 innerFlag=False 

1225 ) 

1226 

1227 if not isTagged: 

1228 break 

1229 

1230 if not isTagged or component is eoo.endOfOctets: 

1231 break 

1232 

1233 yield asn1Object 

1234 

1235 

1236class AnyPayloadDecoder(AbstractSimplePayloadDecoder): 

1237 protoComponent = univ.Any() 

1238 

1239 def valueDecoder(self, substrate, asn1Spec, 

1240 tagSet=None, length=None, state=None, 

1241 decodeFun=None, substrateFun=None, 

1242 **options): 

1243 if asn1Spec is None: 

1244 isUntagged = True 

1245 

1246 elif asn1Spec.__class__ is tagmap.TagMap: 

1247 isUntagged = tagSet not in asn1Spec.tagMap 

1248 

1249 else: 

1250 isUntagged = tagSet != asn1Spec.tagSet 

1251 

1252 if isUntagged: 

1253 fullPosition = substrate.markedPosition 

1254 currentPosition = substrate.tell() 

1255 

1256 substrate.seek(fullPosition, os.SEEK_SET) 

1257 length += currentPosition - fullPosition 

1258 

1259 if LOG: 

1260 for chunk in peekIntoStream(substrate, length): 

1261 if isinstance(chunk, SubstrateUnderrunError): 

1262 yield chunk 

1263 LOG('decoding as untagged ANY, substrate ' 

1264 '%s' % debug.hexdump(chunk)) 

1265 

1266 if substrateFun: 

1267 for chunk in substrateFun( 

1268 self._createComponent(asn1Spec, tagSet, noValue, **options), 

1269 substrate, length, options): 

1270 yield chunk 

1271 

1272 return 

1273 

1274 for chunk in readFromStream(substrate, length, options): 

1275 if isinstance(chunk, SubstrateUnderrunError): 

1276 yield chunk 

1277 

1278 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1279 

1280 def indefLenValueDecoder(self, substrate, asn1Spec, 

1281 tagSet=None, length=None, state=None, 

1282 decodeFun=None, substrateFun=None, 

1283 **options): 

1284 if asn1Spec is None: 

1285 isTagged = False 

1286 

1287 elif asn1Spec.__class__ is tagmap.TagMap: 

1288 isTagged = tagSet in asn1Spec.tagMap 

1289 

1290 else: 

1291 isTagged = tagSet == asn1Spec.tagSet 

1292 

1293 if isTagged: 

1294 # tagged Any type -- consume header substrate 

1295 chunk = null 

1296 

1297 if LOG: 

1298 LOG('decoding as tagged ANY') 

1299 

1300 else: 

1301 # TODO: Seems not to be tested 

1302 fullPosition = substrate.markedPosition 

1303 currentPosition = substrate.tell() 

1304 

1305 substrate.seek(fullPosition, os.SEEK_SET) 

1306 for chunk in readFromStream(substrate, currentPosition - fullPosition, options): 

1307 if isinstance(chunk, SubstrateUnderrunError): 

1308 yield chunk 

1309 

1310 if LOG: 

1311 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk)) 

1312 

1313 # Any components do not inherit initial tag 

1314 asn1Spec = self.protoComponent 

1315 

1316 if substrateFun and substrateFun is not self.substrateCollector: 

1317 asn1Object = self._createComponent( 

1318 asn1Spec, tagSet, noValue, **options) 

1319 

1320 for chunk in substrateFun( 

1321 asn1Object, chunk + substrate, length + len(chunk), options): 

1322 yield chunk 

1323 

1324 return 

1325 

1326 if LOG: 

1327 LOG('assembling constructed serialization') 

1328 

1329 # All inner fragments are of the same type, treat them as octet string 

1330 substrateFun = self.substrateCollector 

1331 

1332 while True: # loop over fragments 

1333 

1334 for component in decodeFun( 

1335 substrate, asn1Spec, substrateFun=substrateFun, 

1336 allowEoo=True, **options): 

1337 

1338 if isinstance(component, SubstrateUnderrunError): 

1339 yield component 

1340 

1341 if component is eoo.endOfOctets: 

1342 break 

1343 

1344 if component is eoo.endOfOctets: 

1345 break 

1346 

1347 chunk += component 

1348 

1349 if substrateFun: 

1350 yield chunk # TODO: Weird 

1351 

1352 else: 

1353 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1354 

1355 

1356# character string types 

1357class UTF8StringPayloadDecoder(OctetStringPayloadDecoder): 

1358 protoComponent = char.UTF8String() 

1359 

1360 

1361class NumericStringPayloadDecoder(OctetStringPayloadDecoder): 

1362 protoComponent = char.NumericString() 

1363 

1364 

1365class PrintableStringPayloadDecoder(OctetStringPayloadDecoder): 

1366 protoComponent = char.PrintableString() 

1367 

1368 

1369class TeletexStringPayloadDecoder(OctetStringPayloadDecoder): 

1370 protoComponent = char.TeletexString() 

1371 

1372 

1373class VideotexStringPayloadDecoder(OctetStringPayloadDecoder): 

1374 protoComponent = char.VideotexString() 

1375 

1376 

1377class IA5StringPayloadDecoder(OctetStringPayloadDecoder): 

1378 protoComponent = char.IA5String() 

1379 

1380 

1381class GraphicStringPayloadDecoder(OctetStringPayloadDecoder): 

1382 protoComponent = char.GraphicString() 

1383 

1384 

1385class VisibleStringPayloadDecoder(OctetStringPayloadDecoder): 

1386 protoComponent = char.VisibleString() 

1387 

1388 

1389class GeneralStringPayloadDecoder(OctetStringPayloadDecoder): 

1390 protoComponent = char.GeneralString() 

1391 

1392 

1393class UniversalStringPayloadDecoder(OctetStringPayloadDecoder): 

1394 protoComponent = char.UniversalString() 

1395 

1396 

1397class BMPStringPayloadDecoder(OctetStringPayloadDecoder): 

1398 protoComponent = char.BMPString() 

1399 

1400 

1401# "useful" types 

1402class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder): 

1403 protoComponent = useful.ObjectDescriptor() 

1404 

1405 

1406class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder): 

1407 protoComponent = useful.GeneralizedTime() 

1408 

1409 

1410class UTCTimePayloadDecoder(OctetStringPayloadDecoder): 

1411 protoComponent = useful.UTCTime() 

1412 

1413 

1414TAG_MAP = { 

1415 univ.Integer.tagSet: IntegerPayloadDecoder(), 

1416 univ.Boolean.tagSet: BooleanPayloadDecoder(), 

1417 univ.BitString.tagSet: BitStringPayloadDecoder(), 

1418 univ.OctetString.tagSet: OctetStringPayloadDecoder(), 

1419 univ.Null.tagSet: NullPayloadDecoder(), 

1420 univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(), 

1421 univ.Enumerated.tagSet: IntegerPayloadDecoder(), 

1422 univ.Real.tagSet: RealPayloadDecoder(), 

1423 univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf 

1424 univ.Set.tagSet: SetOrSetOfPayloadDecoder(), # conflicts with SetOf 

1425 univ.Choice.tagSet: ChoicePayloadDecoder(), # conflicts with Any 

1426 # character string types 

1427 char.UTF8String.tagSet: UTF8StringPayloadDecoder(), 

1428 char.NumericString.tagSet: NumericStringPayloadDecoder(), 

1429 char.PrintableString.tagSet: PrintableStringPayloadDecoder(), 

1430 char.TeletexString.tagSet: TeletexStringPayloadDecoder(), 

1431 char.VideotexString.tagSet: VideotexStringPayloadDecoder(), 

1432 char.IA5String.tagSet: IA5StringPayloadDecoder(), 

1433 char.GraphicString.tagSet: GraphicStringPayloadDecoder(), 

1434 char.VisibleString.tagSet: VisibleStringPayloadDecoder(), 

1435 char.GeneralString.tagSet: GeneralStringPayloadDecoder(), 

1436 char.UniversalString.tagSet: UniversalStringPayloadDecoder(), 

1437 char.BMPString.tagSet: BMPStringPayloadDecoder(), 

1438 # useful types 

1439 useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(), 

1440 useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(), 

1441 useful.UTCTime.tagSet: UTCTimePayloadDecoder() 

1442} 

1443 

1444# Type-to-codec map for ambiguous ASN.1 types 

1445TYPE_MAP = { 

1446 univ.Set.typeId: SetPayloadDecoder(), 

1447 univ.SetOf.typeId: SetOfPayloadDecoder(), 

1448 univ.Sequence.typeId: SequencePayloadDecoder(), 

1449 univ.SequenceOf.typeId: SequenceOfPayloadDecoder(), 

1450 univ.Choice.typeId: ChoicePayloadDecoder(), 

1451 univ.Any.typeId: AnyPayloadDecoder() 

1452} 

1453 

1454# deprecated aliases, https://github.com/pyasn1/pyasn1/issues/9 

1455tagMap = TAG_MAP 

1456typeMap = TYPE_MAP 

1457 

1458# Put in non-ambiguous types for faster codec lookup 

1459for typeDecoder in TAG_MAP.values(): 

1460 if typeDecoder.protoComponent is not None: 

1461 typeId = typeDecoder.protoComponent.__class__.typeId 

1462 if typeId is not None and typeId not in TYPE_MAP: 

1463 TYPE_MAP[typeId] = typeDecoder 

1464 

1465 

1466(stDecodeTag, 

1467 stDecodeLength, 

1468 stGetValueDecoder, 

1469 stGetValueDecoderByAsn1Spec, 

1470 stGetValueDecoderByTag, 

1471 stTryAsExplicitTag, 

1472 stDecodeValue, 

1473 stDumpRawValue, 

1474 stErrorCondition, 

1475 stStop) = [x for x in range(10)] 

1476 

1477 

1478EOO_SENTINEL = ints2octs((0, 0)) 

1479 

1480 

1481class SingleItemDecoder(object): 

1482 defaultErrorState = stErrorCondition 

1483 #defaultErrorState = stDumpRawValue 

1484 defaultRawDecoder = AnyPayloadDecoder() 

1485 

1486 supportIndefLength = True 

1487 

1488 TAG_MAP = TAG_MAP 

1489 TYPE_MAP = TYPE_MAP 

1490 

1491 def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored): 

1492 self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP 

1493 self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP 

1494 

1495 # Tag & TagSet objects caches 

1496 self._tagCache = {} 

1497 self._tagSetCache = {} 

1498 

1499 def __call__(self, substrate, asn1Spec=None, 

1500 tagSet=None, length=None, state=stDecodeTag, 

1501 decodeFun=None, substrateFun=None, 

1502 **options): 

1503 

1504 allowEoo = options.pop('allowEoo', False) 

1505 

1506 if LOG: 

1507 LOG('decoder called at scope %s with state %d, working with up ' 

1508 'to %s octets of substrate: ' 

1509 '%s' % (debug.scope, state, length, substrate)) 

1510 

1511 # Look for end-of-octets sentinel 

1512 if allowEoo and self.supportIndefLength: 

1513 

1514 for eoo_candidate in readFromStream(substrate, 2, options): 

1515 if isinstance(eoo_candidate, SubstrateUnderrunError): 

1516 yield eoo_candidate 

1517 

1518 if eoo_candidate == EOO_SENTINEL: 

1519 if LOG: 

1520 LOG('end-of-octets sentinel found') 

1521 yield eoo.endOfOctets 

1522 return 

1523 

1524 else: 

1525 substrate.seek(-2, os.SEEK_CUR) 

1526 

1527 tagMap = self._tagMap 

1528 typeMap = self._typeMap 

1529 tagCache = self._tagCache 

1530 tagSetCache = self._tagSetCache 

1531 

1532 value = noValue 

1533 

1534 substrate.markedPosition = substrate.tell() 

1535 

1536 while state is not stStop: 

1537 

1538 if state is stDecodeTag: 

1539 # Decode tag 

1540 isShortTag = True 

1541 

1542 for firstByte in readFromStream(substrate, 1, options): 

1543 if isinstance(firstByte, SubstrateUnderrunError): 

1544 yield firstByte 

1545 

1546 firstOctet = ord(firstByte) 

1547 

1548 try: 

1549 lastTag = tagCache[firstOctet] 

1550 

1551 except KeyError: 

1552 integerTag = firstOctet 

1553 tagClass = integerTag & 0xC0 

1554 tagFormat = integerTag & 0x20 

1555 tagId = integerTag & 0x1F 

1556 

1557 if tagId == 0x1F: 

1558 isShortTag = False 

1559 lengthOctetIdx = 0 

1560 tagId = 0 

1561 

1562 while True: 

1563 for integerByte in readFromStream(substrate, 1, options): 

1564 if isinstance(integerByte, SubstrateUnderrunError): 

1565 yield integerByte 

1566 

1567 if not integerByte: 

1568 raise error.SubstrateUnderrunError( 

1569 'Short octet stream on long tag decoding' 

1570 ) 

1571 

1572 integerTag = ord(integerByte) 

1573 lengthOctetIdx += 1 

1574 tagId <<= 7 

1575 tagId |= (integerTag & 0x7F) 

1576 

1577 if not integerTag & 0x80: 

1578 break 

1579 

1580 lastTag = tag.Tag( 

1581 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId 

1582 ) 

1583 

1584 if isShortTag: 

1585 # cache short tags 

1586 tagCache[firstOctet] = lastTag 

1587 

1588 if tagSet is None: 

1589 if isShortTag: 

1590 try: 

1591 tagSet = tagSetCache[firstOctet] 

1592 

1593 except KeyError: 

1594 # base tag not recovered 

1595 tagSet = tag.TagSet((), lastTag) 

1596 tagSetCache[firstOctet] = tagSet 

1597 else: 

1598 tagSet = tag.TagSet((), lastTag) 

1599 

1600 else: 

1601 tagSet = lastTag + tagSet 

1602 

1603 state = stDecodeLength 

1604 

1605 if LOG: 

1606 LOG('tag decoded into %s, decoding length' % tagSet) 

1607 

1608 if state is stDecodeLength: 

1609 # Decode length 

1610 for firstOctet in readFromStream(substrate, 1, options): 

1611 if isinstance(firstOctet, SubstrateUnderrunError): 

1612 yield firstOctet 

1613 

1614 firstOctet = ord(firstOctet) 

1615 

1616 if firstOctet < 128: 

1617 length = firstOctet 

1618 

1619 elif firstOctet > 128: 

1620 size = firstOctet & 0x7F 

1621 # encoded in size bytes 

1622 for encodedLength in readFromStream(substrate, size, options): 

1623 if isinstance(encodedLength, SubstrateUnderrunError): 

1624 yield encodedLength 

1625 encodedLength = list(encodedLength) 

1626 # missing check on maximum size, which shouldn't be a 

1627 # problem, we can handle more than is possible 

1628 if len(encodedLength) != size: 

1629 raise error.SubstrateUnderrunError( 

1630 '%s<%s at %s' % (size, len(encodedLength), tagSet) 

1631 ) 

1632 

1633 length = 0 

1634 for lengthOctet in encodedLength: 

1635 length <<= 8 

1636 length |= oct2int(lengthOctet) 

1637 size += 1 

1638 

1639 else: # 128 means indefinite 

1640 length = -1 

1641 

1642 if length == -1 and not self.supportIndefLength: 

1643 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec') 

1644 

1645 state = stGetValueDecoder 

1646 

1647 if LOG: 

1648 LOG('value length decoded into %d' % length) 

1649 

1650 if state is stGetValueDecoder: 

1651 if asn1Spec is None: 

1652 state = stGetValueDecoderByTag 

1653 

1654 else: 

1655 state = stGetValueDecoderByAsn1Spec 

1656 # 

1657 # There're two ways of creating subtypes in ASN.1 what influences 

1658 # decoder operation. These methods are: 

1659 # 1) Either base types used in or no IMPLICIT tagging has been 

1660 # applied on subtyping. 

1661 # 2) Subtype syntax drops base type information (by means of 

1662 # IMPLICIT tagging. 

1663 # The first case allows for complete tag recovery from substrate 

1664 # while the second one requires original ASN.1 type spec for 

1665 # decoding. 

1666 # 

1667 # In either case a set of tags (tagSet) is coming from substrate 

1668 # in an incremental, tag-by-tag fashion (this is the case of 

1669 # EXPLICIT tag which is most basic). Outermost tag comes first 

1670 # from the wire. 

1671 # 

1672 if state is stGetValueDecoderByTag: 

1673 try: 

1674 concreteDecoder = tagMap[tagSet] 

1675 

1676 except KeyError: 

1677 concreteDecoder = None 

1678 

1679 if concreteDecoder: 

1680 state = stDecodeValue 

1681 

1682 else: 

1683 try: 

1684 concreteDecoder = tagMap[tagSet[:1]] 

1685 

1686 except KeyError: 

1687 concreteDecoder = None 

1688 

1689 if concreteDecoder: 

1690 state = stDecodeValue 

1691 else: 

1692 state = stTryAsExplicitTag 

1693 

1694 if LOG: 

1695 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1696 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__) 

1697 

1698 if state is stGetValueDecoderByAsn1Spec: 

1699 

1700 if asn1Spec.__class__ is tagmap.TagMap: 

1701 try: 

1702 chosenSpec = asn1Spec[tagSet] 

1703 

1704 except KeyError: 

1705 chosenSpec = None 

1706 

1707 if LOG: 

1708 LOG('candidate ASN.1 spec is a map of:') 

1709 

1710 for firstOctet, v in asn1Spec.presentTypes.items(): 

1711 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1712 

1713 if asn1Spec.skipTypes: 

1714 LOG('but neither of: ') 

1715 for firstOctet, v in asn1Spec.skipTypes.items(): 

1716 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1717 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet)) 

1718 

1719 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap: 

1720 chosenSpec = asn1Spec 

1721 if LOG: 

1722 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) 

1723 

1724 else: 

1725 chosenSpec = None 

1726 

1727 if chosenSpec is not None: 

1728 try: 

1729 # ambiguous type or just faster codec lookup 

1730 concreteDecoder = typeMap[chosenSpec.typeId] 

1731 

1732 if LOG: 

1733 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,)) 

1734 

1735 except KeyError: 

1736 # use base type for codec lookup to recover untagged types 

1737 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag) 

1738 try: 

1739 # base type or tagged subtype 

1740 concreteDecoder = tagMap[baseTagSet] 

1741 

1742 if LOG: 

1743 LOG('value decoder chosen by base %s' % (baseTagSet,)) 

1744 

1745 except KeyError: 

1746 concreteDecoder = None 

1747 

1748 if concreteDecoder: 

1749 asn1Spec = chosenSpec 

1750 state = stDecodeValue 

1751 

1752 else: 

1753 state = stTryAsExplicitTag 

1754 

1755 else: 

1756 concreteDecoder = None 

1757 state = stTryAsExplicitTag 

1758 

1759 if LOG: 

1760 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1761 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__) 

1762 

1763 if state is stDecodeValue: 

1764 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this 

1765 substrateFun = lambda a, b, c: (a, b[:c]) 

1766 

1767 original_position = substrate.tell() 

1768 

1769 if length == -1: # indef length 

1770 for value in concreteDecoder.indefLenValueDecoder( 

1771 substrate, asn1Spec, 

1772 tagSet, length, stGetValueDecoder, 

1773 self, substrateFun, **options): 

1774 if isinstance(value, SubstrateUnderrunError): 

1775 yield value 

1776 

1777 else: 

1778 for value in concreteDecoder.valueDecoder( 

1779 substrate, asn1Spec, 

1780 tagSet, length, stGetValueDecoder, 

1781 self, substrateFun, **options): 

1782 if isinstance(value, SubstrateUnderrunError): 

1783 yield value 

1784 

1785 bytesRead = substrate.tell() - original_position 

1786 if bytesRead != length: 

1787 raise PyAsn1Error( 

1788 "Read %s bytes instead of expected %s." % (bytesRead, length)) 

1789 

1790 if LOG: 

1791 LOG('codec %s yields type %s, value:\n%s\n...' % ( 

1792 concreteDecoder.__class__.__name__, value.__class__.__name__, 

1793 isinstance(value, base.Asn1Item) and value.prettyPrint() or value)) 

1794 

1795 state = stStop 

1796 break 

1797 

1798 if state is stTryAsExplicitTag: 

1799 if (tagSet and 

1800 tagSet[0].tagFormat == tag.tagFormatConstructed and 

1801 tagSet[0].tagClass != tag.tagClassUniversal): 

1802 # Assume explicit tagging 

1803 concreteDecoder = rawPayloadDecoder 

1804 state = stDecodeValue 

1805 

1806 else: 

1807 concreteDecoder = None 

1808 state = self.defaultErrorState 

1809 

1810 if LOG: 

1811 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure')) 

1812 

1813 if state is stDumpRawValue: 

1814 concreteDecoder = self.defaultRawDecoder 

1815 

1816 if LOG: 

1817 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) 

1818 

1819 state = stDecodeValue 

1820 

1821 if state is stErrorCondition: 

1822 raise error.PyAsn1Error( 

1823 '%s not in asn1Spec: %r' % (tagSet, asn1Spec) 

1824 ) 

1825 

1826 if LOG: 

1827 debug.scope.pop() 

1828 LOG('decoder left scope %s, call completed' % debug.scope) 

1829 

1830 yield value 

1831 

1832 

1833class StreamingDecoder(object): 

1834 """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects. 

1835 

1836 On each iteration, consume whatever BER/CER/DER serialization is 

1837 available in the `substrate` stream-like object and turns it into 

1838 one or more, possibly nested, ASN.1 objects. 

1839 

1840 Parameters 

1841 ---------- 

1842 substrate: :py:class:`file`, :py:class:`io.BytesIO` 

1843 BER/CER/DER serialization in form of a byte stream 

1844 

1845 Keyword Args 

1846 ------------ 

1847 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

1848 A pyasn1 type object to act as a template guiding the decoder. 

1849 Depending on the ASN.1 structure being decoded, `asn1Spec` may 

1850 or may not be required. One of the reasons why `asn1Spec` may 

1851 me required is that ASN.1 structure is encoded in the *IMPLICIT* 

1852 tagging mode. 

1853 

1854 Yields 

1855 ------ 

1856 : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1857 Decoded ASN.1 object (possibly, nested) or 

1858 :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating 

1859 insufficient BER/CER/DER serialization on input to fully recover ASN.1 

1860 objects from it. 

1861  

1862 In the latter case the caller is advised to ensure some more data in 

1863 the input stream, then call the iterator again. The decoder will resume 

1864 the decoding process using the newly arrived data. 

1865 

1866 The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1867 object might hold a reference to the partially populated ASN.1 object 

1868 being reconstructed. 

1869 

1870 Raises 

1871 ------ 

1872 ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError 

1873 `PyAsn1Error` on deserialization error, `EndOfStreamError` on 

1874 premature stream closure. 

1875 

1876 Examples 

1877 -------- 

1878 Decode BER serialisation without ASN.1 schema 

1879 

1880 .. code-block:: pycon 

1881 

1882 >>> stream = io.BytesIO( 

1883 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1884 >>> 

1885 >>> for asn1Object in StreamingDecoder(stream): 

1886 ... print(asn1Object) 

1887 >>> 

1888 SequenceOf: 

1889 1 2 3 

1890 

1891 Decode BER serialisation with ASN.1 schema 

1892 

1893 .. code-block:: pycon 

1894 

1895 >>> stream = io.BytesIO( 

1896 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1897 >>> 

1898 >>> schema = SequenceOf(componentType=Integer()) 

1899 >>> 

1900 >>> decoder = StreamingDecoder(stream, asn1Spec=schema) 

1901 >>> for asn1Object in decoder: 

1902 ... print(asn1Object) 

1903 >>> 

1904 SequenceOf: 

1905 1 2 3 

1906 """ 

1907 

1908 SINGLE_ITEM_DECODER = SingleItemDecoder 

1909 

1910 def __init__(self, substrate, asn1Spec=None, **options): 

1911 self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options) 

1912 self._substrate = asSeekableStream(substrate) 

1913 self._asn1Spec = asn1Spec 

1914 self._options = options 

1915 

1916 def __iter__(self): 

1917 while True: 

1918 for asn1Object in self._singleItemDecoder( 

1919 self._substrate, self._asn1Spec, **self._options): 

1920 yield asn1Object 

1921 

1922 for chunk in isEndOfStream(self._substrate): 

1923 if isinstance(chunk, SubstrateUnderrunError): 

1924 yield 

1925 

1926 break 

1927 

1928 if chunk: 

1929 break 

1930 

1931 

1932class Decoder(object): 

1933 """Create a BER decoder object. 

1934 

1935 Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object. 

1936 """ 

1937 STREAMING_DECODER = StreamingDecoder 

1938 

1939 @classmethod 

1940 def __call__(cls, substrate, asn1Spec=None, **options): 

1941 """Turns BER/CER/DER octet stream into an ASN.1 object. 

1942 

1943 Takes BER/CER/DER octet-stream in form of :py:class:`bytes` (Python 3) 

1944 or :py:class:`str` (Python 2) and decode it into an ASN.1 object 

1945 (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

1946 may be a scalar or an arbitrary nested structure. 

1947 

1948 Parameters 

1949 ---------- 

1950 substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

1951 BER/CER/DER octet-stream to parse 

1952 

1953 Keyword Args 

1954 ------------ 

1955 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

1956 A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item` 

1957 derivative) to act as a template guiding the decoder. 

1958 Depending on the ASN.1 structure being decoded, `asn1Spec` may or 

1959 may not be required. Most common reason for it to require is that 

1960 ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

1961 

1962 Returns 

1963 ------- 

1964 : :py:class:`tuple` 

1965 A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object 

1966 recovered from BER/CER/DER substrate and the unprocessed trailing 

1967 portion of the `substrate` (may be empty) 

1968 

1969 Raises 

1970 ------ 

1971 : :py:class:`~pyasn1.error.PyAsn1Error` 

1972 :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient 

1973 input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error. 

1974 

1975 Examples 

1976 -------- 

1977 Decode BER/CER/DER serialisation without ASN.1 schema 

1978 

1979 .. code-block:: pycon 

1980 

1981 >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1982 >>> str(s) 

1983 SequenceOf: 

1984 1 2 3 

1985 

1986 Decode BER/CER/DER serialisation with ASN.1 schema 

1987 

1988 .. code-block:: pycon 

1989 

1990 >>> seq = SequenceOf(componentType=Integer()) 

1991 >>> s, unprocessed = decode( 

1992 b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

1993 >>> str(s) 

1994 SequenceOf: 

1995 1 2 3 

1996 

1997 """ 

1998 substrate = asSeekableStream(substrate) 

1999 

2000 streamingDecoder = cls.STREAMING_DECODER( 

2001 substrate, asn1Spec, **options) 

2002 

2003 for asn1Object in streamingDecoder: 

2004 if isinstance(asn1Object, SubstrateUnderrunError): 

2005 raise error.SubstrateUnderrunError('Short substrate on input') 

2006 

2007 try: 

2008 tail = next(readFromStream(substrate)) 

2009 

2010 except error.EndOfStreamError: 

2011 tail = null 

2012 

2013 return asn1Object, tail 

2014 

2015 

2016#: Turns BER octet stream into an ASN.1 object. 

2017#: 

2018#: Takes BER octet-stream and decode it into an ASN.1 object 

2019#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

2020#: may be a scalar or an arbitrary nested structure. 

2021#: 

2022#: Parameters 

2023#: ---------- 

2024#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

2025#: BER octet-stream 

2026#: 

2027#: Keyword Args 

2028#: ------------ 

2029#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 

2030#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure 

2031#: being decoded, *asn1Spec* may or may not be required. Most common reason for 

2032#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

2033#: 

2034#: Returns 

2035#: ------- 

2036#: : :py:class:`tuple` 

2037#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

2038#: and the unprocessed trailing portion of the *substrate* (may be empty) 

2039#: 

2040#: Raises 

2041#: ------ 

2042#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError 

2043#: On decoding errors 

2044#: 

2045#: Notes 

2046#: ----- 

2047#: This function is deprecated. Please use :py:class:`Decoder` or 

2048#: :py:class:`StreamingDecoder` class instance. 

2049#: 

2050#: Examples 

2051#: -------- 

2052#: Decode BER serialisation without ASN.1 schema 

2053#: 

2054#: .. code-block:: pycon 

2055#: 

2056#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2057#: >>> str(s) 

2058#: SequenceOf: 

2059#: 1 2 3 

2060#: 

2061#: Decode BER serialisation with ASN.1 schema 

2062#: 

2063#: .. code-block:: pycon 

2064#: 

2065#: >>> seq = SequenceOf(componentType=Integer()) 

2066#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2067#: >>> str(s) 

2068#: SequenceOf: 

2069#: 1 2 3 

2070#: 

2071decode = Decoder()