Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/decoder.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

988 statements  

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: http://snmplabs.com/pyasn1/license.html 

6# 

7import os 

8 

9from pyasn1 import debug 

10from pyasn1 import error 

11from pyasn1.codec.ber import eoo 

12from pyasn1.codec.streaming import asSeekableStream 

13from pyasn1.codec.streaming import isEndOfStream 

14from pyasn1.codec.streaming import peekIntoStream 

15from pyasn1.codec.streaming import readFromStream 

16from pyasn1.compat.integer import from_bytes 

17from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null 

18from pyasn1.error import PyAsn1Error 

19from pyasn1.type import base 

20from pyasn1.type import char 

21from pyasn1.type import tag 

22from pyasn1.type import tagmap 

23from pyasn1.type import univ 

24from pyasn1.type import useful 

25 

26__all__ = ['StreamingDecoder', 'Decoder', 'decode'] 

27 

28LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER) 

29 

30noValue = base.noValue 

31 

32SubstrateUnderrunError = error.SubstrateUnderrunError 

33 

34 

35class AbstractPayloadDecoder(object): 

36 protoComponent = None 

37 

38 def valueDecoder(self, substrate, asn1Spec, 

39 tagSet=None, length=None, state=None, 

40 decodeFun=None, substrateFun=None, 

41 **options): 

42 """Decode value with fixed byte length. 

43 

44 The decoder is allowed to consume as many bytes as necessary. 

45 """ 

46 raise error.PyAsn1Error('SingleItemDecoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

47 

48 def indefLenValueDecoder(self, substrate, asn1Spec, 

49 tagSet=None, length=None, state=None, 

50 decodeFun=None, substrateFun=None, 

51 **options): 

52 """Decode value with undefined length. 

53 

54 The decoder is allowed to consume as many bytes as necessary. 

55 """ 

56 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) # TODO: Seems more like an NotImplementedError? 

57 

58 @staticmethod 

59 def _passAsn1Object(asn1Object, options): 

60 if 'asn1Object' not in options: 

61 options['asn1Object'] = asn1Object 

62 

63 return options 

64 

65 

66class AbstractSimplePayloadDecoder(AbstractPayloadDecoder): 

67 @staticmethod 

68 def substrateCollector(asn1Object, substrate, length, options): 

69 for chunk in readFromStream(substrate, length, options): 

70 yield chunk 

71 

72 def _createComponent(self, asn1Spec, tagSet, value, **options): 

73 if options.get('native'): 

74 return value 

75 elif asn1Spec is None: 

76 return self.protoComponent.clone(value, tagSet=tagSet) 

77 elif value is noValue: 

78 return asn1Spec 

79 else: 

80 return asn1Spec.clone(value) 

81 

82 

83class RawPayloadDecoder(AbstractSimplePayloadDecoder): 

84 protoComponent = univ.Any('') 

85 

86 def valueDecoder(self, substrate, asn1Spec, 

87 tagSet=None, length=None, state=None, 

88 decodeFun=None, substrateFun=None, 

89 **options): 

90 if substrateFun: 

91 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

92 

93 for chunk in substrateFun(asn1Object, substrate, length, options): 

94 yield chunk 

95 

96 return 

97 

98 for value in decodeFun(substrate, asn1Spec, tagSet, length, **options): 

99 yield value 

100 

101 def indefLenValueDecoder(self, substrate, asn1Spec, 

102 tagSet=None, length=None, state=None, 

103 decodeFun=None, substrateFun=None, 

104 **options): 

105 if substrateFun: 

106 asn1Object = self._createComponent(asn1Spec, tagSet, '', **options) 

107 

108 for chunk in substrateFun(asn1Object, substrate, length, options): 

109 yield chunk 

110 

111 return 

112 

113 while True: 

114 for value in decodeFun( 

115 substrate, asn1Spec, tagSet, length, 

116 allowEoo=True, **options): 

117 

118 if value is eoo.endOfOctets: 

119 return 

120 

121 yield value 

122 

123 

124rawPayloadDecoder = RawPayloadDecoder() 

125 

126 

127class IntegerPayloadDecoder(AbstractSimplePayloadDecoder): 

128 protoComponent = univ.Integer(0) 

129 

130 def valueDecoder(self, substrate, asn1Spec, 

131 tagSet=None, length=None, state=None, 

132 decodeFun=None, substrateFun=None, 

133 **options): 

134 

135 if tagSet[0].tagFormat != tag.tagFormatSimple: 

136 raise error.PyAsn1Error('Simple tag format expected') 

137 

138 for chunk in readFromStream(substrate, length, options): 

139 if isinstance(chunk, SubstrateUnderrunError): 

140 yield chunk 

141 

142 if chunk: 

143 value = from_bytes(chunk, signed=True) 

144 

145 else: 

146 value = 0 

147 

148 yield self._createComponent(asn1Spec, tagSet, value, **options) 

149 

150 

151class BooleanPayloadDecoder(IntegerPayloadDecoder): 

152 protoComponent = univ.Boolean(0) 

153 

154 def _createComponent(self, asn1Spec, tagSet, value, **options): 

155 return IntegerPayloadDecoder._createComponent( 

156 self, asn1Spec, tagSet, value and 1 or 0, **options) 

157 

158 

159class BitStringPayloadDecoder(AbstractSimplePayloadDecoder): 

160 protoComponent = univ.BitString(()) 

161 supportConstructedForm = True 

162 

163 def valueDecoder(self, substrate, asn1Spec, 

164 tagSet=None, length=None, state=None, 

165 decodeFun=None, substrateFun=None, 

166 **options): 

167 

168 if substrateFun: 

169 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

170 

171 for chunk in substrateFun(asn1Object, substrate, length, options): 

172 yield chunk 

173 

174 return 

175 

176 if not length: 

177 raise error.PyAsn1Error('Empty BIT STRING substrate') 

178 

179 for chunk in isEndOfStream(substrate): 

180 if isinstance(chunk, SubstrateUnderrunError): 

181 yield chunk 

182 

183 if chunk: 

184 raise error.PyAsn1Error('Empty BIT STRING substrate') 

185 

186 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

187 

188 for trailingBits in readFromStream(substrate, 1, options): 

189 if isinstance(trailingBits, SubstrateUnderrunError): 

190 yield trailingBits 

191 

192 trailingBits = ord(trailingBits) 

193 if trailingBits > 7: 

194 raise error.PyAsn1Error( 

195 'Trailing bits overflow %s' % trailingBits 

196 ) 

197 

198 for chunk in readFromStream(substrate, length - 1, options): 

199 if isinstance(chunk, SubstrateUnderrunError): 

200 yield chunk 

201 

202 value = self.protoComponent.fromOctetString( 

203 chunk, internalFormat=True, padding=trailingBits) 

204 

205 yield self._createComponent(asn1Spec, tagSet, value, **options) 

206 

207 return 

208 

209 if not self.supportConstructedForm: 

210 raise error.PyAsn1Error('Constructed encoding form prohibited ' 

211 'at %s' % self.__class__.__name__) 

212 

213 if LOG: 

214 LOG('assembling constructed serialization') 

215 

216 # All inner fragments are of the same type, treat them as octet string 

217 substrateFun = self.substrateCollector 

218 

219 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

220 

221 current_position = substrate.tell() 

222 

223 while substrate.tell() - current_position < length: 

224 for component in decodeFun( 

225 substrate, self.protoComponent, substrateFun=substrateFun, 

226 **options): 

227 if isinstance(component, SubstrateUnderrunError): 

228 yield component 

229 

230 trailingBits = oct2int(component[0]) 

231 if trailingBits > 7: 

232 raise error.PyAsn1Error( 

233 'Trailing bits overflow %s' % trailingBits 

234 ) 

235 

236 bitString = self.protoComponent.fromOctetString( 

237 component[1:], internalFormat=True, 

238 prepend=bitString, padding=trailingBits 

239 ) 

240 

241 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

242 

243 def indefLenValueDecoder(self, substrate, asn1Spec, 

244 tagSet=None, length=None, state=None, 

245 decodeFun=None, substrateFun=None, 

246 **options): 

247 

248 if substrateFun: 

249 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

250 

251 for chunk in substrateFun(asn1Object, substrate, length, options): 

252 yield chunk 

253 

254 return 

255 

256 # All inner fragments are of the same type, treat them as octet string 

257 substrateFun = self.substrateCollector 

258 

259 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

260 

261 while True: # loop over fragments 

262 

263 for component in decodeFun( 

264 substrate, self.protoComponent, substrateFun=substrateFun, 

265 allowEoo=True, **options): 

266 

267 if component is eoo.endOfOctets: 

268 break 

269 

270 if isinstance(component, SubstrateUnderrunError): 

271 yield component 

272 

273 if component is eoo.endOfOctets: 

274 break 

275 

276 trailingBits = oct2int(component[0]) 

277 if trailingBits > 7: 

278 raise error.PyAsn1Error( 

279 'Trailing bits overflow %s' % trailingBits 

280 ) 

281 

282 bitString = self.protoComponent.fromOctetString( 

283 component[1:], internalFormat=True, 

284 prepend=bitString, padding=trailingBits 

285 ) 

286 

287 yield self._createComponent(asn1Spec, tagSet, bitString, **options) 

288 

289 

290class OctetStringPayloadDecoder(AbstractSimplePayloadDecoder): 

291 protoComponent = univ.OctetString('') 

292 supportConstructedForm = True 

293 

294 def valueDecoder(self, substrate, asn1Spec, 

295 tagSet=None, length=None, state=None, 

296 decodeFun=None, substrateFun=None, 

297 **options): 

298 if substrateFun: 

299 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

300 

301 for chunk in substrateFun(asn1Object, substrate, length, options): 

302 yield chunk 

303 

304 return 

305 

306 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

307 for chunk in readFromStream(substrate, length, options): 

308 if isinstance(chunk, SubstrateUnderrunError): 

309 yield chunk 

310 

311 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

312 

313 return 

314 

315 if not self.supportConstructedForm: 

316 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__) 

317 

318 if LOG: 

319 LOG('assembling constructed serialization') 

320 

321 # All inner fragments are of the same type, treat them as octet string 

322 substrateFun = self.substrateCollector 

323 

324 header = null 

325 

326 original_position = substrate.tell() 

327 # head = popSubstream(substrate, length) 

328 while substrate.tell() - original_position < length: 

329 for component in decodeFun( 

330 substrate, self.protoComponent, substrateFun=substrateFun, 

331 **options): 

332 if isinstance(component, SubstrateUnderrunError): 

333 yield component 

334 

335 header += component 

336 

337 yield self._createComponent(asn1Spec, tagSet, header, **options) 

338 

339 def indefLenValueDecoder(self, substrate, asn1Spec, 

340 tagSet=None, length=None, state=None, 

341 decodeFun=None, substrateFun=None, 

342 **options): 

343 if substrateFun and substrateFun is not self.substrateCollector: 

344 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

345 

346 for chunk in substrateFun(asn1Object, substrate, length, options): 

347 yield chunk 

348 

349 return 

350 

351 # All inner fragments are of the same type, treat them as octet string 

352 substrateFun = self.substrateCollector 

353 

354 header = null 

355 

356 while True: # loop over fragments 

357 

358 for component in decodeFun( 

359 substrate, self.protoComponent, substrateFun=substrateFun, 

360 allowEoo=True, **options): 

361 

362 if isinstance(component, SubstrateUnderrunError): 

363 yield component 

364 

365 if component is eoo.endOfOctets: 

366 break 

367 

368 if component is eoo.endOfOctets: 

369 break 

370 

371 header += component 

372 

373 yield self._createComponent(asn1Spec, tagSet, header, **options) 

374 

375 

376class NullPayloadDecoder(AbstractSimplePayloadDecoder): 

377 protoComponent = univ.Null('') 

378 

379 def valueDecoder(self, substrate, asn1Spec, 

380 tagSet=None, length=None, state=None, 

381 decodeFun=None, substrateFun=None, 

382 **options): 

383 

384 if tagSet[0].tagFormat != tag.tagFormatSimple: 

385 raise error.PyAsn1Error('Simple tag format expected') 

386 

387 for chunk in readFromStream(substrate, length, options): 

388 if isinstance(chunk, SubstrateUnderrunError): 

389 yield chunk 

390 

391 component = self._createComponent(asn1Spec, tagSet, '', **options) 

392 

393 if chunk: 

394 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) 

395 

396 yield component 

397 

398 

399class ObjectIdentifierPayloadDecoder(AbstractSimplePayloadDecoder): 

400 protoComponent = univ.ObjectIdentifier(()) 

401 

402 def valueDecoder(self, substrate, asn1Spec, 

403 tagSet=None, length=None, state=None, 

404 decodeFun=None, substrateFun=None, 

405 **options): 

406 if tagSet[0].tagFormat != tag.tagFormatSimple: 

407 raise error.PyAsn1Error('Simple tag format expected') 

408 

409 for chunk in readFromStream(substrate, length, options): 

410 if isinstance(chunk, SubstrateUnderrunError): 

411 yield chunk 

412 

413 if not chunk: 

414 raise error.PyAsn1Error('Empty substrate') 

415 

416 chunk = octs2ints(chunk) 

417 

418 oid = () 

419 index = 0 

420 substrateLen = len(chunk) 

421 while index < substrateLen: 

422 subId = chunk[index] 

423 index += 1 

424 if subId < 128: 

425 oid += (subId,) 

426 elif subId > 128: 

427 # Construct subid from a number of octets 

428 nextSubId = subId 

429 subId = 0 

430 while nextSubId >= 128: 

431 subId = (subId << 7) + (nextSubId & 0x7F) 

432 if index >= substrateLen: 

433 raise error.SubstrateUnderrunError( 

434 'Short substrate for sub-OID past %s' % (oid,) 

435 ) 

436 nextSubId = chunk[index] 

437 index += 1 

438 oid += ((subId << 7) + nextSubId,) 

439 elif subId == 128: 

440 # ASN.1 spec forbids leading zeros (0x80) in OID 

441 # encoding, tolerating it opens a vulnerability. See 

442 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 

443 # page 7 

444 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding') 

445 

446 # Decode two leading arcs 

447 if 0 <= oid[0] <= 39: 

448 oid = (0,) + oid 

449 elif 40 <= oid[0] <= 79: 

450 oid = (1, oid[0] - 40) + oid[1:] 

451 elif oid[0] >= 80: 

452 oid = (2, oid[0] - 80) + oid[1:] 

453 else: 

454 raise error.PyAsn1Error('Malformed first OID octet: %s' % chunk[0]) 

455 

456 yield self._createComponent(asn1Spec, tagSet, oid, **options) 

457 

458 

459class RealPayloadDecoder(AbstractSimplePayloadDecoder): 

460 protoComponent = univ.Real() 

461 

462 def valueDecoder(self, substrate, asn1Spec, 

463 tagSet=None, length=None, state=None, 

464 decodeFun=None, substrateFun=None, 

465 **options): 

466 if tagSet[0].tagFormat != tag.tagFormatSimple: 

467 raise error.PyAsn1Error('Simple tag format expected') 

468 

469 for chunk in readFromStream(substrate, length, options): 

470 if isinstance(chunk, SubstrateUnderrunError): 

471 yield chunk 

472 

473 if not chunk: 

474 yield self._createComponent(asn1Spec, tagSet, 0.0, **options) 

475 return 

476 

477 fo = oct2int(chunk[0]) 

478 chunk = chunk[1:] 

479 if fo & 0x80: # binary encoding 

480 if not chunk: 

481 raise error.PyAsn1Error("Incomplete floating-point value") 

482 

483 if LOG: 

484 LOG('decoding binary encoded REAL') 

485 

486 n = (fo & 0x03) + 1 

487 

488 if n == 4: 

489 n = oct2int(chunk[0]) 

490 chunk = chunk[1:] 

491 

492 eo, chunk = chunk[:n], chunk[n:] 

493 

494 if not eo or not chunk: 

495 raise error.PyAsn1Error('Real exponent screwed') 

496 

497 e = oct2int(eo[0]) & 0x80 and -1 or 0 

498 

499 while eo: # exponent 

500 e <<= 8 

501 e |= oct2int(eo[0]) 

502 eo = eo[1:] 

503 

504 b = fo >> 4 & 0x03 # base bits 

505 

506 if b > 2: 

507 raise error.PyAsn1Error('Illegal Real base') 

508 

509 if b == 1: # encbase = 8 

510 e *= 3 

511 

512 elif b == 2: # encbase = 16 

513 e *= 4 

514 p = 0 

515 

516 while chunk: # value 

517 p <<= 8 

518 p |= oct2int(chunk[0]) 

519 chunk = chunk[1:] 

520 

521 if fo & 0x40: # sign bit 

522 p = -p 

523 

524 sf = fo >> 2 & 0x03 # scale bits 

525 p *= 2 ** sf 

526 value = (p, 2, e) 

527 

528 elif fo & 0x40: # infinite value 

529 if LOG: 

530 LOG('decoding infinite REAL') 

531 

532 value = fo & 0x01 and '-inf' or 'inf' 

533 

534 elif fo & 0xc0 == 0: # character encoding 

535 if not chunk: 

536 raise error.PyAsn1Error("Incomplete floating-point value") 

537 

538 if LOG: 

539 LOG('decoding character encoded REAL') 

540 

541 try: 

542 if fo & 0x3 == 0x1: # NR1 

543 value = (int(chunk), 10, 0) 

544 

545 elif fo & 0x3 == 0x2: # NR2 

546 value = float(chunk) 

547 

548 elif fo & 0x3 == 0x3: # NR3 

549 value = float(chunk) 

550 

551 else: 

552 raise error.SubstrateUnderrunError( 

553 'Unknown NR (tag %s)' % fo 

554 ) 

555 

556 except ValueError: 

557 raise error.SubstrateUnderrunError( 

558 'Bad character Real syntax' 

559 ) 

560 

561 else: 

562 raise error.SubstrateUnderrunError( 

563 'Unknown encoding (tag %s)' % fo 

564 ) 

565 

566 yield self._createComponent(asn1Spec, tagSet, value, **options) 

567 

568 

569class AbstractConstructedPayloadDecoder(AbstractPayloadDecoder): 

570 protoComponent = None 

571 

572 

573class ConstructedPayloadDecoderBase(AbstractConstructedPayloadDecoder): 

574 protoRecordComponent = None 

575 protoSequenceComponent = None 

576 

577 def _getComponentTagMap(self, asn1Object, idx): 

578 raise NotImplementedError() 

579 

580 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 

581 raise NotImplementedError() 

582 

583 def _decodeComponentsSchemaless( 

584 self, substrate, tagSet=None, decodeFun=None, 

585 length=None, **options): 

586 

587 asn1Object = None 

588 

589 components = [] 

590 componentTypes = set() 

591 

592 original_position = substrate.tell() 

593 

594 while length == -1 or substrate.tell() < original_position + length: 

595 for component in decodeFun(substrate, **options): 

596 if isinstance(component, SubstrateUnderrunError): 

597 yield component 

598 

599 if length == -1 and component is eoo.endOfOctets: 

600 break 

601 

602 components.append(component) 

603 componentTypes.add(component.tagSet) 

604 

605 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF 

606 # The heuristics is: 

607 # * 1+ components of different types -> likely SEQUENCE/SET 

608 # * otherwise -> likely SEQUENCE OF/SET OF 

609 if len(componentTypes) > 1: 

610 protoComponent = self.protoRecordComponent 

611 

612 else: 

613 protoComponent = self.protoSequenceComponent 

614 

615 asn1Object = protoComponent.clone( 

616 # construct tagSet from base tag from prototype ASN.1 object 

617 # and additional tags recovered from the substrate 

618 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags) 

619 ) 

620 

621 if LOG: 

622 LOG('guessed %r container type (pass `asn1Spec` to guide the ' 

623 'decoder)' % asn1Object) 

624 

625 for idx, component in enumerate(components): 

626 asn1Object.setComponentByPosition( 

627 idx, component, 

628 verifyConstraints=False, 

629 matchTags=False, matchConstraints=False 

630 ) 

631 

632 yield asn1Object 

633 

634 def valueDecoder(self, substrate, asn1Spec, 

635 tagSet=None, length=None, state=None, 

636 decodeFun=None, substrateFun=None, 

637 **options): 

638 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

639 raise error.PyAsn1Error('Constructed tag format expected') 

640 

641 original_position = substrate.tell() 

642 

643 if substrateFun: 

644 if asn1Spec is not None: 

645 asn1Object = asn1Spec.clone() 

646 

647 elif self.protoComponent is not None: 

648 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

649 

650 else: 

651 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

652 

653 for chunk in substrateFun(asn1Object, substrate, length, options): 

654 yield chunk 

655 

656 return 

657 

658 if asn1Spec is None: 

659 for asn1Object in self._decodeComponentsSchemaless( 

660 substrate, tagSet=tagSet, decodeFun=decodeFun, 

661 length=length, **options): 

662 if isinstance(asn1Object, SubstrateUnderrunError): 

663 yield asn1Object 

664 

665 if substrate.tell() < original_position + length: 

666 if LOG: 

667 for trailing in readFromStream(substrate, context=options): 

668 if isinstance(trailing, SubstrateUnderrunError): 

669 yield trailing 

670 

671 LOG('Unused trailing %d octets encountered: %s' % ( 

672 len(trailing), debug.hexdump(trailing))) 

673 

674 yield asn1Object 

675 

676 return 

677 

678 asn1Object = asn1Spec.clone() 

679 asn1Object.clear() 

680 

681 options = self._passAsn1Object(asn1Object, options) 

682 

683 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

684 

685 namedTypes = asn1Spec.componentType 

686 

687 isSetType = asn1Spec.typeId == univ.Set.typeId 

688 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

689 

690 if LOG: 

691 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

692 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

693 asn1Spec)) 

694 

695 seenIndices = set() 

696 idx = 0 

697 while substrate.tell() - original_position < length: 

698 if not namedTypes: 

699 componentType = None 

700 

701 elif isSetType: 

702 componentType = namedTypes.tagMapUnique 

703 

704 else: 

705 try: 

706 if isDeterministic: 

707 componentType = namedTypes[idx].asn1Object 

708 

709 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

710 componentType = namedTypes.getTagMapNearPosition(idx) 

711 

712 else: 

713 componentType = namedTypes[idx].asn1Object 

714 

715 except IndexError: 

716 raise error.PyAsn1Error( 

717 'Excessive components decoded at %r' % (asn1Spec,) 

718 ) 

719 

720 for component in decodeFun(substrate, componentType, **options): 

721 if isinstance(component, SubstrateUnderrunError): 

722 yield component 

723 

724 if not isDeterministic and namedTypes: 

725 if isSetType: 

726 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

727 

728 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

729 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

730 

731 asn1Object.setComponentByPosition( 

732 idx, component, 

733 verifyConstraints=False, 

734 matchTags=False, matchConstraints=False 

735 ) 

736 

737 seenIndices.add(idx) 

738 idx += 1 

739 

740 if LOG: 

741 LOG('seen component indices %s' % seenIndices) 

742 

743 if namedTypes: 

744 if not namedTypes.requiredComponents.issubset(seenIndices): 

745 raise error.PyAsn1Error( 

746 'ASN.1 object %s has uninitialized ' 

747 'components' % asn1Object.__class__.__name__) 

748 

749 if namedTypes.hasOpenTypes: 

750 

751 openTypes = options.get('openTypes', {}) 

752 

753 if LOG: 

754 LOG('user-specified open types map:') 

755 

756 for k, v in openTypes.items(): 

757 LOG('%s -> %r' % (k, v)) 

758 

759 if openTypes or options.get('decodeOpenTypes', False): 

760 

761 for idx, namedType in enumerate(namedTypes.namedTypes): 

762 if not namedType.openType: 

763 continue 

764 

765 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

766 continue 

767 

768 governingValue = asn1Object.getComponentByName( 

769 namedType.openType.name 

770 ) 

771 

772 try: 

773 openType = openTypes[governingValue] 

774 

775 except KeyError: 

776 

777 if LOG: 

778 LOG('default open types map of component ' 

779 '"%s.%s" governed by component "%s.%s"' 

780 ':' % (asn1Object.__class__.__name__, 

781 namedType.name, 

782 asn1Object.__class__.__name__, 

783 namedType.openType.name)) 

784 

785 for k, v in namedType.openType.items(): 

786 LOG('%s -> %r' % (k, v)) 

787 

788 try: 

789 openType = namedType.openType[governingValue] 

790 

791 except KeyError: 

792 if LOG: 

793 LOG('failed to resolve open type by governing ' 

794 'value %r' % (governingValue,)) 

795 continue 

796 

797 if LOG: 

798 LOG('resolved open type %r by governing ' 

799 'value %r' % (openType, governingValue)) 

800 

801 containerValue = asn1Object.getComponentByPosition(idx) 

802 

803 if containerValue.typeId in ( 

804 univ.SetOf.typeId, univ.SequenceOf.typeId): 

805 

806 for pos, containerElement in enumerate( 

807 containerValue): 

808 

809 stream = asSeekableStream(containerValue[pos].asOctets()) 

810 

811 for component in decodeFun(stream, asn1Spec=openType, **options): 

812 if isinstance(component, SubstrateUnderrunError): 

813 yield component 

814 

815 containerValue[pos] = component 

816 

817 else: 

818 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

819 

820 for component in decodeFun(stream, asn1Spec=openType, **options): 

821 if isinstance(component, SubstrateUnderrunError): 

822 yield component 

823 

824 asn1Object.setComponentByPosition(idx, component) 

825 

826 else: 

827 inconsistency = asn1Object.isInconsistent 

828 if inconsistency: 

829 raise inconsistency 

830 

831 else: 

832 componentType = asn1Spec.componentType 

833 

834 if LOG: 

835 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

836 

837 idx = 0 

838 

839 while substrate.tell() - original_position < length: 

840 for component in decodeFun(substrate, componentType, **options): 

841 if isinstance(component, SubstrateUnderrunError): 

842 yield component 

843 

844 asn1Object.setComponentByPosition( 

845 idx, component, 

846 verifyConstraints=False, 

847 matchTags=False, matchConstraints=False 

848 ) 

849 

850 idx += 1 

851 

852 yield asn1Object 

853 

854 def indefLenValueDecoder(self, substrate, asn1Spec, 

855 tagSet=None, length=None, state=None, 

856 decodeFun=None, substrateFun=None, 

857 **options): 

858 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

859 raise error.PyAsn1Error('Constructed tag format expected') 

860 

861 if substrateFun is not None: 

862 if asn1Spec is not None: 

863 asn1Object = asn1Spec.clone() 

864 

865 elif self.protoComponent is not None: 

866 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

867 

868 else: 

869 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

870 

871 for chunk in substrateFun(asn1Object, substrate, length, options): 

872 yield chunk 

873 

874 return 

875 

876 if asn1Spec is None: 

877 for asn1Object in self._decodeComponentsSchemaless( 

878 substrate, tagSet=tagSet, decodeFun=decodeFun, 

879 length=length, **dict(options, allowEoo=True)): 

880 if isinstance(asn1Object, SubstrateUnderrunError): 

881 yield asn1Object 

882 

883 yield asn1Object 

884 

885 return 

886 

887 asn1Object = asn1Spec.clone() 

888 asn1Object.clear() 

889 

890 options = self._passAsn1Object(asn1Object, options) 

891 

892 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

893 

894 namedTypes = asn1Object.componentType 

895 

896 isSetType = asn1Object.typeId == univ.Set.typeId 

897 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

898 

899 if LOG: 

900 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

901 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

902 asn1Spec)) 

903 

904 seenIndices = set() 

905 

906 idx = 0 

907 

908 while True: # loop over components 

909 if len(namedTypes) <= idx: 

910 asn1Spec = None 

911 

912 elif isSetType: 

913 asn1Spec = namedTypes.tagMapUnique 

914 

915 else: 

916 try: 

917 if isDeterministic: 

918 asn1Spec = namedTypes[idx].asn1Object 

919 

920 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

921 asn1Spec = namedTypes.getTagMapNearPosition(idx) 

922 

923 else: 

924 asn1Spec = namedTypes[idx].asn1Object 

925 

926 except IndexError: 

927 raise error.PyAsn1Error( 

928 'Excessive components decoded at %r' % (asn1Object,) 

929 ) 

930 

931 for component in decodeFun(substrate, asn1Spec, allowEoo=True, **options): 

932 

933 if isinstance(component, SubstrateUnderrunError): 

934 yield component 

935 

936 if component is eoo.endOfOctets: 

937 break 

938 

939 if component is eoo.endOfOctets: 

940 break 

941 

942 if not isDeterministic and namedTypes: 

943 if isSetType: 

944 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

945 

946 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

947 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

948 

949 asn1Object.setComponentByPosition( 

950 idx, component, 

951 verifyConstraints=False, 

952 matchTags=False, matchConstraints=False 

953 ) 

954 

955 seenIndices.add(idx) 

956 idx += 1 

957 

958 if LOG: 

959 LOG('seen component indices %s' % seenIndices) 

960 

961 if namedTypes: 

962 if not namedTypes.requiredComponents.issubset(seenIndices): 

963 raise error.PyAsn1Error( 

964 'ASN.1 object %s has uninitialized ' 

965 'components' % asn1Object.__class__.__name__) 

966 

967 if namedTypes.hasOpenTypes: 

968 

969 openTypes = options.get('openTypes', {}) 

970 

971 if LOG: 

972 LOG('user-specified open types map:') 

973 

974 for k, v in openTypes.items(): 

975 LOG('%s -> %r' % (k, v)) 

976 

977 if openTypes or options.get('decodeOpenTypes', False): 

978 

979 for idx, namedType in enumerate(namedTypes.namedTypes): 

980 if not namedType.openType: 

981 continue 

982 

983 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

984 continue 

985 

986 governingValue = asn1Object.getComponentByName( 

987 namedType.openType.name 

988 ) 

989 

990 try: 

991 openType = openTypes[governingValue] 

992 

993 except KeyError: 

994 

995 if LOG: 

996 LOG('default open types map of component ' 

997 '"%s.%s" governed by component "%s.%s"' 

998 ':' % (asn1Object.__class__.__name__, 

999 namedType.name, 

1000 asn1Object.__class__.__name__, 

1001 namedType.openType.name)) 

1002 

1003 for k, v in namedType.openType.items(): 

1004 LOG('%s -> %r' % (k, v)) 

1005 

1006 try: 

1007 openType = namedType.openType[governingValue] 

1008 

1009 except KeyError: 

1010 if LOG: 

1011 LOG('failed to resolve open type by governing ' 

1012 'value %r' % (governingValue,)) 

1013 continue 

1014 

1015 if LOG: 

1016 LOG('resolved open type %r by governing ' 

1017 'value %r' % (openType, governingValue)) 

1018 

1019 containerValue = asn1Object.getComponentByPosition(idx) 

1020 

1021 if containerValue.typeId in ( 

1022 univ.SetOf.typeId, univ.SequenceOf.typeId): 

1023 

1024 for pos, containerElement in enumerate( 

1025 containerValue): 

1026 

1027 stream = asSeekableStream(containerValue[pos].asOctets()) 

1028 

1029 for component in decodeFun(stream, asn1Spec=openType, 

1030 **dict(options, allowEoo=True)): 

1031 if isinstance(component, SubstrateUnderrunError): 

1032 yield component 

1033 

1034 if component is eoo.endOfOctets: 

1035 break 

1036 

1037 containerValue[pos] = component 

1038 

1039 else: 

1040 stream = asSeekableStream(asn1Object.getComponentByPosition(idx).asOctets()) 

1041 for component in decodeFun(stream, asn1Spec=openType, 

1042 **dict(options, allowEoo=True)): 

1043 if isinstance(component, SubstrateUnderrunError): 

1044 yield component 

1045 

1046 if component is eoo.endOfOctets: 

1047 break 

1048 

1049 asn1Object.setComponentByPosition(idx, component) 

1050 

1051 else: 

1052 inconsistency = asn1Object.isInconsistent 

1053 if inconsistency: 

1054 raise inconsistency 

1055 

1056 else: 

1057 componentType = asn1Spec.componentType 

1058 

1059 if LOG: 

1060 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

1061 

1062 idx = 0 

1063 

1064 while True: 

1065 

1066 for component in decodeFun( 

1067 substrate, componentType, allowEoo=True, **options): 

1068 

1069 if isinstance(component, SubstrateUnderrunError): 

1070 yield component 

1071 

1072 if component is eoo.endOfOctets: 

1073 break 

1074 

1075 if component is eoo.endOfOctets: 

1076 break 

1077 

1078 asn1Object.setComponentByPosition( 

1079 idx, component, 

1080 verifyConstraints=False, 

1081 matchTags=False, matchConstraints=False 

1082 ) 

1083 

1084 idx += 1 

1085 

1086 yield asn1Object 

1087 

1088 

1089class SequenceOrSequenceOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1090 protoRecordComponent = univ.Sequence() 

1091 protoSequenceComponent = univ.SequenceOf() 

1092 

1093 

1094class SequencePayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1095 protoComponent = univ.Sequence() 

1096 

1097 

1098class SequenceOfPayloadDecoder(SequenceOrSequenceOfPayloadDecoder): 

1099 protoComponent = univ.SequenceOf() 

1100 

1101 

1102class SetOrSetOfPayloadDecoder(ConstructedPayloadDecoderBase): 

1103 protoRecordComponent = univ.Set() 

1104 protoSequenceComponent = univ.SetOf() 

1105 

1106 

1107class SetPayloadDecoder(SetOrSetOfPayloadDecoder): 

1108 protoComponent = univ.Set() 

1109 

1110 

1111class SetOfPayloadDecoder(SetOrSetOfPayloadDecoder): 

1112 protoComponent = univ.SetOf() 

1113 

1114 

1115class ChoicePayloadDecoder(ConstructedPayloadDecoderBase): 

1116 protoComponent = univ.Choice() 

1117 

1118 def valueDecoder(self, substrate, asn1Spec, 

1119 tagSet=None, length=None, state=None, 

1120 decodeFun=None, substrateFun=None, 

1121 **options): 

1122 if asn1Spec is None: 

1123 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1124 

1125 else: 

1126 asn1Object = asn1Spec.clone() 

1127 

1128 if substrateFun: 

1129 for chunk in substrateFun(asn1Object, substrate, length, options): 

1130 yield chunk 

1131 

1132 return 

1133 

1134 options = self._passAsn1Object(asn1Object, options) 

1135 

1136 if asn1Object.tagSet == tagSet: 

1137 if LOG: 

1138 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 

1139 

1140 for component in decodeFun( 

1141 substrate, asn1Object.componentTagMap, **options): 

1142 if isinstance(component, SubstrateUnderrunError): 

1143 yield component 

1144 

1145 else: 

1146 if LOG: 

1147 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 

1148 

1149 for component in decodeFun( 

1150 substrate, asn1Object.componentTagMap, tagSet, length, 

1151 state, **options): 

1152 if isinstance(component, SubstrateUnderrunError): 

1153 yield component 

1154 

1155 effectiveTagSet = component.effectiveTagSet 

1156 

1157 if LOG: 

1158 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 

1159 

1160 asn1Object.setComponentByType( 

1161 effectiveTagSet, component, 

1162 verifyConstraints=False, 

1163 matchTags=False, matchConstraints=False, 

1164 innerFlag=False 

1165 ) 

1166 

1167 yield asn1Object 

1168 

1169 def indefLenValueDecoder(self, substrate, asn1Spec, 

1170 tagSet=None, length=None, state=None, 

1171 decodeFun=None, substrateFun=None, 

1172 **options): 

1173 if asn1Spec is None: 

1174 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1175 

1176 else: 

1177 asn1Object = asn1Spec.clone() 

1178 

1179 if substrateFun: 

1180 for chunk in substrateFun(asn1Object, substrate, length, options): 

1181 yield chunk 

1182 

1183 return 

1184 

1185 options = self._passAsn1Object(asn1Object, options) 

1186 

1187 isTagged = asn1Object.tagSet == tagSet 

1188 

1189 if LOG: 

1190 LOG('decoding %s as %stagged CHOICE' % ( 

1191 tagSet, isTagged and 'explicitly ' or 'un')) 

1192 

1193 while True: 

1194 

1195 if isTagged: 

1196 iterator = decodeFun( 

1197 substrate, asn1Object.componentType.tagMapUnique, 

1198 **dict(options, allowEoo=True)) 

1199 

1200 else: 

1201 iterator = decodeFun( 

1202 substrate, asn1Object.componentType.tagMapUnique, 

1203 tagSet, length, state, **dict(options, allowEoo=True)) 

1204 

1205 for component in iterator: 

1206 

1207 if isinstance(component, SubstrateUnderrunError): 

1208 yield component 

1209 

1210 if component is eoo.endOfOctets: 

1211 break 

1212 

1213 effectiveTagSet = component.effectiveTagSet 

1214 

1215 if LOG: 

1216 LOG('decoded component %s, effective tag set ' 

1217 '%s' % (component, effectiveTagSet)) 

1218 

1219 asn1Object.setComponentByType( 

1220 effectiveTagSet, component, 

1221 verifyConstraints=False, 

1222 matchTags=False, matchConstraints=False, 

1223 innerFlag=False 

1224 ) 

1225 

1226 if not isTagged: 

1227 break 

1228 

1229 if not isTagged or component is eoo.endOfOctets: 

1230 break 

1231 

1232 yield asn1Object 

1233 

1234 

1235class AnyPayloadDecoder(AbstractSimplePayloadDecoder): 

1236 protoComponent = univ.Any() 

1237 

1238 def valueDecoder(self, substrate, asn1Spec, 

1239 tagSet=None, length=None, state=None, 

1240 decodeFun=None, substrateFun=None, 

1241 **options): 

1242 if asn1Spec is None: 

1243 isUntagged = True 

1244 

1245 elif asn1Spec.__class__ is tagmap.TagMap: 

1246 isUntagged = tagSet not in asn1Spec.tagMap 

1247 

1248 else: 

1249 isUntagged = tagSet != asn1Spec.tagSet 

1250 

1251 if isUntagged: 

1252 fullPosition = substrate.markedPosition 

1253 currentPosition = substrate.tell() 

1254 

1255 substrate.seek(fullPosition, os.SEEK_SET) 

1256 length += currentPosition - fullPosition 

1257 

1258 if LOG: 

1259 for chunk in peekIntoStream(substrate, length): 

1260 if isinstance(chunk, SubstrateUnderrunError): 

1261 yield chunk 

1262 LOG('decoding as untagged ANY, substrate ' 

1263 '%s' % debug.hexdump(chunk)) 

1264 

1265 if substrateFun: 

1266 for chunk in substrateFun( 

1267 self._createComponent(asn1Spec, tagSet, noValue, **options), 

1268 substrate, length, options): 

1269 yield chunk 

1270 

1271 return 

1272 

1273 for chunk in readFromStream(substrate, length, options): 

1274 if isinstance(chunk, SubstrateUnderrunError): 

1275 yield chunk 

1276 

1277 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1278 

1279 def indefLenValueDecoder(self, substrate, asn1Spec, 

1280 tagSet=None, length=None, state=None, 

1281 decodeFun=None, substrateFun=None, 

1282 **options): 

1283 if asn1Spec is None: 

1284 isTagged = False 

1285 

1286 elif asn1Spec.__class__ is tagmap.TagMap: 

1287 isTagged = tagSet in asn1Spec.tagMap 

1288 

1289 else: 

1290 isTagged = tagSet == asn1Spec.tagSet 

1291 

1292 if isTagged: 

1293 # tagged Any type -- consume header substrate 

1294 chunk = null 

1295 

1296 if LOG: 

1297 LOG('decoding as tagged ANY') 

1298 

1299 else: 

1300 # TODO: Seems not to be tested 

1301 fullPosition = substrate.markedPosition 

1302 currentPosition = substrate.tell() 

1303 

1304 substrate.seek(fullPosition, os.SEEK_SET) 

1305 for chunk in readFromStream(substrate, currentPosition - fullPosition, options): 

1306 if isinstance(chunk, SubstrateUnderrunError): 

1307 yield chunk 

1308 

1309 if LOG: 

1310 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(chunk)) 

1311 

1312 # Any components do not inherit initial tag 

1313 asn1Spec = self.protoComponent 

1314 

1315 if substrateFun and substrateFun is not self.substrateCollector: 

1316 asn1Object = self._createComponent( 

1317 asn1Spec, tagSet, noValue, **options) 

1318 

1319 for chunk in substrateFun( 

1320 asn1Object, chunk + substrate, length + len(chunk), options): 

1321 yield chunk 

1322 

1323 return 

1324 

1325 if LOG: 

1326 LOG('assembling constructed serialization') 

1327 

1328 # All inner fragments are of the same type, treat them as octet string 

1329 substrateFun = self.substrateCollector 

1330 

1331 while True: # loop over fragments 

1332 

1333 for component in decodeFun( 

1334 substrate, asn1Spec, substrateFun=substrateFun, 

1335 allowEoo=True, **options): 

1336 

1337 if isinstance(component, SubstrateUnderrunError): 

1338 yield component 

1339 

1340 if component is eoo.endOfOctets: 

1341 break 

1342 

1343 if component is eoo.endOfOctets: 

1344 break 

1345 

1346 chunk += component 

1347 

1348 if substrateFun: 

1349 yield chunk # TODO: Weird 

1350 

1351 else: 

1352 yield self._createComponent(asn1Spec, tagSet, chunk, **options) 

1353 

1354 

1355# character string types 

1356class UTF8StringPayloadDecoder(OctetStringPayloadDecoder): 

1357 protoComponent = char.UTF8String() 

1358 

1359 

1360class NumericStringPayloadDecoder(OctetStringPayloadDecoder): 

1361 protoComponent = char.NumericString() 

1362 

1363 

1364class PrintableStringPayloadDecoder(OctetStringPayloadDecoder): 

1365 protoComponent = char.PrintableString() 

1366 

1367 

1368class TeletexStringPayloadDecoder(OctetStringPayloadDecoder): 

1369 protoComponent = char.TeletexString() 

1370 

1371 

1372class VideotexStringPayloadDecoder(OctetStringPayloadDecoder): 

1373 protoComponent = char.VideotexString() 

1374 

1375 

1376class IA5StringPayloadDecoder(OctetStringPayloadDecoder): 

1377 protoComponent = char.IA5String() 

1378 

1379 

1380class GraphicStringPayloadDecoder(OctetStringPayloadDecoder): 

1381 protoComponent = char.GraphicString() 

1382 

1383 

1384class VisibleStringPayloadDecoder(OctetStringPayloadDecoder): 

1385 protoComponent = char.VisibleString() 

1386 

1387 

1388class GeneralStringPayloadDecoder(OctetStringPayloadDecoder): 

1389 protoComponent = char.GeneralString() 

1390 

1391 

1392class UniversalStringPayloadDecoder(OctetStringPayloadDecoder): 

1393 protoComponent = char.UniversalString() 

1394 

1395 

1396class BMPStringPayloadDecoder(OctetStringPayloadDecoder): 

1397 protoComponent = char.BMPString() 

1398 

1399 

1400# "useful" types 

1401class ObjectDescriptorPayloadDecoder(OctetStringPayloadDecoder): 

1402 protoComponent = useful.ObjectDescriptor() 

1403 

1404 

1405class GeneralizedTimePayloadDecoder(OctetStringPayloadDecoder): 

1406 protoComponent = useful.GeneralizedTime() 

1407 

1408 

1409class UTCTimePayloadDecoder(OctetStringPayloadDecoder): 

1410 protoComponent = useful.UTCTime() 

1411 

1412 

1413TAG_MAP = { 

1414 univ.Integer.tagSet: IntegerPayloadDecoder(), 

1415 univ.Boolean.tagSet: BooleanPayloadDecoder(), 

1416 univ.BitString.tagSet: BitStringPayloadDecoder(), 

1417 univ.OctetString.tagSet: OctetStringPayloadDecoder(), 

1418 univ.Null.tagSet: NullPayloadDecoder(), 

1419 univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(), 

1420 univ.Enumerated.tagSet: IntegerPayloadDecoder(), 

1421 univ.Real.tagSet: RealPayloadDecoder(), 

1422 univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf 

1423 univ.Set.tagSet: SetOrSetOfPayloadDecoder(), # conflicts with SetOf 

1424 univ.Choice.tagSet: ChoicePayloadDecoder(), # conflicts with Any 

1425 # character string types 

1426 char.UTF8String.tagSet: UTF8StringPayloadDecoder(), 

1427 char.NumericString.tagSet: NumericStringPayloadDecoder(), 

1428 char.PrintableString.tagSet: PrintableStringPayloadDecoder(), 

1429 char.TeletexString.tagSet: TeletexStringPayloadDecoder(), 

1430 char.VideotexString.tagSet: VideotexStringPayloadDecoder(), 

1431 char.IA5String.tagSet: IA5StringPayloadDecoder(), 

1432 char.GraphicString.tagSet: GraphicStringPayloadDecoder(), 

1433 char.VisibleString.tagSet: VisibleStringPayloadDecoder(), 

1434 char.GeneralString.tagSet: GeneralStringPayloadDecoder(), 

1435 char.UniversalString.tagSet: UniversalStringPayloadDecoder(), 

1436 char.BMPString.tagSet: BMPStringPayloadDecoder(), 

1437 # useful types 

1438 useful.ObjectDescriptor.tagSet: ObjectDescriptorPayloadDecoder(), 

1439 useful.GeneralizedTime.tagSet: GeneralizedTimePayloadDecoder(), 

1440 useful.UTCTime.tagSet: UTCTimePayloadDecoder() 

1441} 

1442 

1443# Type-to-codec map for ambiguous ASN.1 types 

1444TYPE_MAP = { 

1445 univ.Set.typeId: SetPayloadDecoder(), 

1446 univ.SetOf.typeId: SetOfPayloadDecoder(), 

1447 univ.Sequence.typeId: SequencePayloadDecoder(), 

1448 univ.SequenceOf.typeId: SequenceOfPayloadDecoder(), 

1449 univ.Choice.typeId: ChoicePayloadDecoder(), 

1450 univ.Any.typeId: AnyPayloadDecoder() 

1451} 

1452 

1453# Put in non-ambiguous types for faster codec lookup 

1454for typeDecoder in TAG_MAP.values(): 

1455 if typeDecoder.protoComponent is not None: 

1456 typeId = typeDecoder.protoComponent.__class__.typeId 

1457 if typeId is not None and typeId not in TYPE_MAP: 

1458 TYPE_MAP[typeId] = typeDecoder 

1459 

1460 

1461(stDecodeTag, 

1462 stDecodeLength, 

1463 stGetValueDecoder, 

1464 stGetValueDecoderByAsn1Spec, 

1465 stGetValueDecoderByTag, 

1466 stTryAsExplicitTag, 

1467 stDecodeValue, 

1468 stDumpRawValue, 

1469 stErrorCondition, 

1470 stStop) = [x for x in range(10)] 

1471 

1472 

1473EOO_SENTINEL = ints2octs((0, 0)) 

1474 

1475 

1476class SingleItemDecoder(object): 

1477 defaultErrorState = stErrorCondition 

1478 #defaultErrorState = stDumpRawValue 

1479 defaultRawDecoder = AnyPayloadDecoder() 

1480 

1481 supportIndefLength = True 

1482 

1483 TAG_MAP = TAG_MAP 

1484 TYPE_MAP = TYPE_MAP 

1485 

1486 def __init__(self, **options): 

1487 self._tagMap = options.get('tagMap', self.TAG_MAP) 

1488 self._typeMap = options.get('typeMap', self.TYPE_MAP) 

1489 

1490 # Tag & TagSet objects caches 

1491 self._tagCache = {} 

1492 self._tagSetCache = {} 

1493 

1494 def __call__(self, substrate, asn1Spec=None, 

1495 tagSet=None, length=None, state=stDecodeTag, 

1496 decodeFun=None, substrateFun=None, 

1497 **options): 

1498 

1499 allowEoo = options.pop('allowEoo', False) 

1500 

1501 if LOG: 

1502 LOG('decoder called at scope %s with state %d, working with up ' 

1503 'to %s octets of substrate: ' 

1504 '%s' % (debug.scope, state, length, substrate)) 

1505 

1506 # Look for end-of-octets sentinel 

1507 if allowEoo and self.supportIndefLength: 

1508 

1509 for eoo_candidate in readFromStream(substrate, 2, options): 

1510 if isinstance(eoo_candidate, SubstrateUnderrunError): 

1511 yield eoo_candidate 

1512 

1513 if eoo_candidate == EOO_SENTINEL: 

1514 if LOG: 

1515 LOG('end-of-octets sentinel found') 

1516 yield eoo.endOfOctets 

1517 return 

1518 

1519 else: 

1520 substrate.seek(-2, os.SEEK_CUR) 

1521 

1522 tagMap = self._tagMap 

1523 typeMap = self._typeMap 

1524 tagCache = self._tagCache 

1525 tagSetCache = self._tagSetCache 

1526 

1527 value = noValue 

1528 

1529 substrate.markedPosition = substrate.tell() 

1530 

1531 while state is not stStop: 

1532 

1533 if state is stDecodeTag: 

1534 # Decode tag 

1535 isShortTag = True 

1536 

1537 for firstByte in readFromStream(substrate, 1, options): 

1538 if isinstance(firstByte, SubstrateUnderrunError): 

1539 yield firstByte 

1540 

1541 firstOctet = ord(firstByte) 

1542 

1543 try: 

1544 lastTag = tagCache[firstOctet] 

1545 

1546 except KeyError: 

1547 integerTag = firstOctet 

1548 tagClass = integerTag & 0xC0 

1549 tagFormat = integerTag & 0x20 

1550 tagId = integerTag & 0x1F 

1551 

1552 if tagId == 0x1F: 

1553 isShortTag = False 

1554 lengthOctetIdx = 0 

1555 tagId = 0 

1556 

1557 while True: 

1558 for integerByte in readFromStream(substrate, 1, options): 

1559 if isinstance(integerByte, SubstrateUnderrunError): 

1560 yield integerByte 

1561 

1562 if not integerByte: 

1563 raise error.SubstrateUnderrunError( 

1564 'Short octet stream on long tag decoding' 

1565 ) 

1566 

1567 integerTag = ord(integerByte) 

1568 lengthOctetIdx += 1 

1569 tagId <<= 7 

1570 tagId |= (integerTag & 0x7F) 

1571 

1572 if not integerTag & 0x80: 

1573 break 

1574 

1575 lastTag = tag.Tag( 

1576 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId 

1577 ) 

1578 

1579 if isShortTag: 

1580 # cache short tags 

1581 tagCache[firstOctet] = lastTag 

1582 

1583 if tagSet is None: 

1584 if isShortTag: 

1585 try: 

1586 tagSet = tagSetCache[firstOctet] 

1587 

1588 except KeyError: 

1589 # base tag not recovered 

1590 tagSet = tag.TagSet((), lastTag) 

1591 tagSetCache[firstOctet] = tagSet 

1592 else: 

1593 tagSet = tag.TagSet((), lastTag) 

1594 

1595 else: 

1596 tagSet = lastTag + tagSet 

1597 

1598 state = stDecodeLength 

1599 

1600 if LOG: 

1601 LOG('tag decoded into %s, decoding length' % tagSet) 

1602 

1603 if state is stDecodeLength: 

1604 # Decode length 

1605 for firstOctet in readFromStream(substrate, 1, options): 

1606 if isinstance(firstOctet, SubstrateUnderrunError): 

1607 yield firstOctet 

1608 

1609 firstOctet = ord(firstOctet) 

1610 

1611 if firstOctet < 128: 

1612 length = firstOctet 

1613 

1614 elif firstOctet > 128: 

1615 size = firstOctet & 0x7F 

1616 # encoded in size bytes 

1617 for encodedLength in readFromStream(substrate, size, options): 

1618 if isinstance(encodedLength, SubstrateUnderrunError): 

1619 yield encodedLength 

1620 encodedLength = list(encodedLength) 

1621 # missing check on maximum size, which shouldn't be a 

1622 # problem, we can handle more than is possible 

1623 if len(encodedLength) != size: 

1624 raise error.SubstrateUnderrunError( 

1625 '%s<%s at %s' % (size, len(encodedLength), tagSet) 

1626 ) 

1627 

1628 length = 0 

1629 for lengthOctet in encodedLength: 

1630 length <<= 8 

1631 length |= oct2int(lengthOctet) 

1632 size += 1 

1633 

1634 else: # 128 means indefinite 

1635 length = -1 

1636 

1637 if length == -1 and not self.supportIndefLength: 

1638 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec') 

1639 

1640 state = stGetValueDecoder 

1641 

1642 if LOG: 

1643 LOG('value length decoded into %d' % length) 

1644 

1645 if state is stGetValueDecoder: 

1646 if asn1Spec is None: 

1647 state = stGetValueDecoderByTag 

1648 

1649 else: 

1650 state = stGetValueDecoderByAsn1Spec 

1651 # 

1652 # There're two ways of creating subtypes in ASN.1 what influences 

1653 # decoder operation. These methods are: 

1654 # 1) Either base types used in or no IMPLICIT tagging has been 

1655 # applied on subtyping. 

1656 # 2) Subtype syntax drops base type information (by means of 

1657 # IMPLICIT tagging. 

1658 # The first case allows for complete tag recovery from substrate 

1659 # while the second one requires original ASN.1 type spec for 

1660 # decoding. 

1661 # 

1662 # In either case a set of tags (tagSet) is coming from substrate 

1663 # in an incremental, tag-by-tag fashion (this is the case of 

1664 # EXPLICIT tag which is most basic). Outermost tag comes first 

1665 # from the wire. 

1666 # 

1667 if state is stGetValueDecoderByTag: 

1668 try: 

1669 concreteDecoder = tagMap[tagSet] 

1670 

1671 except KeyError: 

1672 concreteDecoder = None 

1673 

1674 if concreteDecoder: 

1675 state = stDecodeValue 

1676 

1677 else: 

1678 try: 

1679 concreteDecoder = tagMap[tagSet[:1]] 

1680 

1681 except KeyError: 

1682 concreteDecoder = None 

1683 

1684 if concreteDecoder: 

1685 state = stDecodeValue 

1686 else: 

1687 state = stTryAsExplicitTag 

1688 

1689 if LOG: 

1690 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1691 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__) 

1692 

1693 if state is stGetValueDecoderByAsn1Spec: 

1694 

1695 if asn1Spec.__class__ is tagmap.TagMap: 

1696 try: 

1697 chosenSpec = asn1Spec[tagSet] 

1698 

1699 except KeyError: 

1700 chosenSpec = None 

1701 

1702 if LOG: 

1703 LOG('candidate ASN.1 spec is a map of:') 

1704 

1705 for firstOctet, v in asn1Spec.presentTypes.items(): 

1706 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1707 

1708 if asn1Spec.skipTypes: 

1709 LOG('but neither of: ') 

1710 for firstOctet, v in asn1Spec.skipTypes.items(): 

1711 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1712 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet)) 

1713 

1714 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap: 

1715 chosenSpec = asn1Spec 

1716 if LOG: 

1717 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) 

1718 

1719 else: 

1720 chosenSpec = None 

1721 

1722 if chosenSpec is not None: 

1723 try: 

1724 # ambiguous type or just faster codec lookup 

1725 concreteDecoder = typeMap[chosenSpec.typeId] 

1726 

1727 if LOG: 

1728 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,)) 

1729 

1730 except KeyError: 

1731 # use base type for codec lookup to recover untagged types 

1732 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag) 

1733 try: 

1734 # base type or tagged subtype 

1735 concreteDecoder = tagMap[baseTagSet] 

1736 

1737 if LOG: 

1738 LOG('value decoder chosen by base %s' % (baseTagSet,)) 

1739 

1740 except KeyError: 

1741 concreteDecoder = None 

1742 

1743 if concreteDecoder: 

1744 asn1Spec = chosenSpec 

1745 state = stDecodeValue 

1746 

1747 else: 

1748 state = stTryAsExplicitTag 

1749 

1750 else: 

1751 concreteDecoder = None 

1752 state = stTryAsExplicitTag 

1753 

1754 if LOG: 

1755 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1756 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__) 

1757 

1758 if state is stDecodeValue: 

1759 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this 

1760 substrateFun = lambda a, b, c: (a, b[:c]) 

1761 

1762 original_position = substrate.tell() 

1763 

1764 if length == -1: # indef length 

1765 for value in concreteDecoder.indefLenValueDecoder( 

1766 substrate, asn1Spec, 

1767 tagSet, length, stGetValueDecoder, 

1768 self, substrateFun, **options): 

1769 if isinstance(value, SubstrateUnderrunError): 

1770 yield value 

1771 

1772 else: 

1773 for value in concreteDecoder.valueDecoder( 

1774 substrate, asn1Spec, 

1775 tagSet, length, stGetValueDecoder, 

1776 self, substrateFun, **options): 

1777 if isinstance(value, SubstrateUnderrunError): 

1778 yield value 

1779 

1780 bytesRead = substrate.tell() - original_position 

1781 if bytesRead != length: 

1782 raise PyAsn1Error( 

1783 "Read %s bytes instead of expected %s." % (bytesRead, length)) 

1784 

1785 if LOG: 

1786 LOG('codec %s yields type %s, value:\n%s\n...' % ( 

1787 concreteDecoder.__class__.__name__, value.__class__.__name__, 

1788 isinstance(value, base.Asn1Item) and value.prettyPrint() or value)) 

1789 

1790 state = stStop 

1791 break 

1792 

1793 if state is stTryAsExplicitTag: 

1794 if (tagSet and 

1795 tagSet[0].tagFormat == tag.tagFormatConstructed and 

1796 tagSet[0].tagClass != tag.tagClassUniversal): 

1797 # Assume explicit tagging 

1798 concreteDecoder = rawPayloadDecoder 

1799 state = stDecodeValue 

1800 

1801 else: 

1802 concreteDecoder = None 

1803 state = self.defaultErrorState 

1804 

1805 if LOG: 

1806 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure')) 

1807 

1808 if state is stDumpRawValue: 

1809 concreteDecoder = self.defaultRawDecoder 

1810 

1811 if LOG: 

1812 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) 

1813 

1814 state = stDecodeValue 

1815 

1816 if state is stErrorCondition: 

1817 raise error.PyAsn1Error( 

1818 '%s not in asn1Spec: %r' % (tagSet, asn1Spec) 

1819 ) 

1820 

1821 if LOG: 

1822 debug.scope.pop() 

1823 LOG('decoder left scope %s, call completed' % debug.scope) 

1824 

1825 yield value 

1826 

1827 

1828class StreamingDecoder(object): 

1829 """Create an iterator that turns BER/CER/DER byte stream into ASN.1 objects. 

1830 

1831 On each iteration, consume whatever BER/CER/DER serialization is 

1832 available in the `substrate` stream-like object and turns it into 

1833 one or more, possibly nested, ASN.1 objects. 

1834 

1835 Parameters 

1836 ---------- 

1837 substrate: :py:class:`file`, :py:class:`io.BytesIO` 

1838 BER/CER/DER serialization in form of a byte stream 

1839 

1840 Keyword Args 

1841 ------------ 

1842 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

1843 A pyasn1 type object to act as a template guiding the decoder. 

1844 Depending on the ASN.1 structure being decoded, `asn1Spec` may 

1845 or may not be required. One of the reasons why `asn1Spec` may 

1846 me required is that ASN.1 structure is encoded in the *IMPLICIT* 

1847 tagging mode. 

1848 

1849 Yields 

1850 ------ 

1851 : :py:class:`~pyasn1.type.base.PyAsn1Item`, :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1852 Decoded ASN.1 object (possibly, nested) or 

1853 :py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating 

1854 insufficient BER/CER/DER serialization on input to fully recover ASN.1 

1855 objects from it. 

1856  

1857 In the latter case the caller is advised to ensure some more data in 

1858 the input stream, then call the iterator again. The decoder will resume 

1859 the decoding process using the newly arrived data. 

1860 

1861 The `context` property of :py:class:`~pyasn1.error.SubstrateUnderrunError` 

1862 object might hold a reference to the partially populated ASN.1 object 

1863 being reconstructed. 

1864 

1865 Raises 

1866 ------ 

1867 ~pyasn1.error.PyAsn1Error, ~pyasn1.error.EndOfStreamError 

1868 `PyAsn1Error` on deserialization error, `EndOfStreamError` on 

1869 premature stream closure. 

1870 

1871 Examples 

1872 -------- 

1873 Decode BER serialisation without ASN.1 schema 

1874 

1875 .. code-block:: pycon 

1876 

1877 >>> stream = io.BytesIO( 

1878 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1879 >>> 

1880 >>> for asn1Object in StreamingDecoder(stream): 

1881 ... print(asn1Object) 

1882 >>> 

1883 SequenceOf: 

1884 1 2 3 

1885 

1886 Decode BER serialisation with ASN.1 schema 

1887 

1888 .. code-block:: pycon 

1889 

1890 >>> stream = io.BytesIO( 

1891 ... b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1892 >>> 

1893 >>> schema = SequenceOf(componentType=Integer()) 

1894 >>> 

1895 >>> decoder = StreamingDecoder(stream, asn1Spec=schema) 

1896 >>> for asn1Object in decoder: 

1897 ... print(asn1Object) 

1898 >>> 

1899 SequenceOf: 

1900 1 2 3 

1901 """ 

1902 

1903 SINGLE_ITEM_DECODER = SingleItemDecoder 

1904 

1905 def __init__(self, substrate, asn1Spec=None, **options): 

1906 self._singleItemDecoder = self.SINGLE_ITEM_DECODER(**options) 

1907 self._substrate = asSeekableStream(substrate) 

1908 self._asn1Spec = asn1Spec 

1909 self._options = options 

1910 

1911 def __iter__(self): 

1912 while True: 

1913 for asn1Object in self._singleItemDecoder( 

1914 self._substrate, self._asn1Spec, **self._options): 

1915 yield asn1Object 

1916 

1917 for chunk in isEndOfStream(self._substrate): 

1918 if isinstance(chunk, SubstrateUnderrunError): 

1919 yield 

1920 

1921 break 

1922 

1923 if chunk: 

1924 break 

1925 

1926 

1927class Decoder(object): 

1928 """Create a BER decoder object. 

1929 

1930 Parse BER/CER/DER octet-stream into one, possibly nested, ASN.1 object. 

1931 """ 

1932 STREAMING_DECODER = StreamingDecoder 

1933 

1934 @classmethod 

1935 def __call__(cls, substrate, asn1Spec=None, **options): 

1936 """Turns BER/CER/DER octet stream into an ASN.1 object. 

1937 

1938 Takes BER/CER/DER octet-stream in form of :py:class:`bytes` (Python 3) 

1939 or :py:class:`str` (Python 2) and decode it into an ASN.1 object 

1940 (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

1941 may be a scalar or an arbitrary nested structure. 

1942 

1943 Parameters 

1944 ---------- 

1945 substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

1946 BER/CER/DER octet-stream to parse 

1947 

1948 Keyword Args 

1949 ------------ 

1950 asn1Spec: :py:class:`~pyasn1.type.base.PyAsn1Item` 

1951 A pyasn1 type object (:py:class:`~pyasn1.type.base.PyAsn1Item` 

1952 derivative) to act as a template guiding the decoder. 

1953 Depending on the ASN.1 structure being decoded, `asn1Spec` may or 

1954 may not be required. Most common reason for it to require is that 

1955 ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

1956 

1957 Returns 

1958 ------- 

1959 : :py:class:`tuple` 

1960 A tuple of :py:class:`~pyasn1.type.base.PyAsn1Item` object 

1961 recovered from BER/CER/DER substrate and the unprocessed trailing 

1962 portion of the `substrate` (may be empty) 

1963 

1964 Raises 

1965 ------ 

1966 : :py:class:`~pyasn1.error.PyAsn1Error` 

1967 :py:class:`~pyasn1.error.SubstrateUnderrunError` on insufficient 

1968 input or :py:class:`~pyasn1.error.PyAsn1Error` on decoding error. 

1969 

1970 Examples 

1971 -------- 

1972 Decode BER/CER/DER serialisation without ASN.1 schema 

1973 

1974 .. code-block:: pycon 

1975 

1976 >>> s, unprocessed = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1977 >>> str(s) 

1978 SequenceOf: 

1979 1 2 3 

1980 

1981 Decode BER/CER/DER serialisation with ASN.1 schema 

1982 

1983 .. code-block:: pycon 

1984 

1985 >>> seq = SequenceOf(componentType=Integer()) 

1986 >>> s, unprocessed = decode( 

1987 b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

1988 >>> str(s) 

1989 SequenceOf: 

1990 1 2 3 

1991 

1992 """ 

1993 substrate = asSeekableStream(substrate) 

1994 

1995 streamingDecoder = cls.STREAMING_DECODER( 

1996 substrate, asn1Spec, **options) 

1997 

1998 for asn1Object in streamingDecoder: 

1999 if isinstance(asn1Object, SubstrateUnderrunError): 

2000 raise error.SubstrateUnderrunError('Short substrate on input') 

2001 

2002 try: 

2003 tail = next(readFromStream(substrate)) 

2004 

2005 except error.EndOfStreamError: 

2006 tail = null 

2007 

2008 return asn1Object, tail 

2009 

2010 

2011#: Turns BER octet stream into an ASN.1 object. 

2012#: 

2013#: Takes BER octet-stream and decode it into an ASN.1 object 

2014#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

2015#: may be a scalar or an arbitrary nested structure. 

2016#: 

2017#: Parameters 

2018#: ---------- 

2019#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

2020#: BER octet-stream 

2021#: 

2022#: Keyword Args 

2023#: ------------ 

2024#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 

2025#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure 

2026#: being decoded, *asn1Spec* may or may not be required. Most common reason for 

2027#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

2028#: 

2029#: Returns 

2030#: ------- 

2031#: : :py:class:`tuple` 

2032#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

2033#: and the unprocessed trailing portion of the *substrate* (may be empty) 

2034#: 

2035#: Raises 

2036#: ------ 

2037#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError 

2038#: On decoding errors 

2039#: 

2040#: Notes 

2041#: ----- 

2042#: This function is deprecated. Please use :py:class:`Decoder` or 

2043#: :py:class:`StreamingDecoder` class instance. 

2044#: 

2045#: Examples 

2046#: -------- 

2047#: Decode BER serialisation without ASN.1 schema 

2048#: 

2049#: .. code-block:: pycon 

2050#: 

2051#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

2052#: >>> str(s) 

2053#: SequenceOf: 

2054#: 1 2 3 

2055#: 

2056#: Decode BER serialisation with ASN.1 schema 

2057#: 

2058#: .. code-block:: pycon 

2059#: 

2060#: >>> seq = SequenceOf(componentType=Integer()) 

2061#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

2062#: >>> str(s) 

2063#: SequenceOf: 

2064#: 1 2 3 

2065#: 

2066decode = Decoder()