Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/decoder.py: 16%

817 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:25 +0000

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> 

5# License: http://snmplabs.com/pyasn1/license.html 

6# 

7from pyasn1 import debug 

8from pyasn1 import error 

9from pyasn1.codec.ber import eoo 

10from pyasn1.compat.integer import from_bytes 

11from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null 

12from pyasn1.type import base 

13from pyasn1.type import char 

14from pyasn1.type import tag 

15from pyasn1.type import tagmap 

16from pyasn1.type import univ 

17from pyasn1.type import useful 

18 

19__all__ = ['decode'] 

20 

21LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER) 

22 

23noValue = base.noValue 

24 

25 

26class AbstractDecoder(object): 

27 protoComponent = None 

28 

29 def valueDecoder(self, substrate, asn1Spec, 

30 tagSet=None, length=None, state=None, 

31 decodeFun=None, substrateFun=None, 

32 **options): 

33 raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,)) 

34 

35 def indefLenValueDecoder(self, substrate, asn1Spec, 

36 tagSet=None, length=None, state=None, 

37 decodeFun=None, substrateFun=None, 

38 **options): 

39 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) 

40 

41 

42class AbstractSimpleDecoder(AbstractDecoder): 

43 @staticmethod 

44 def substrateCollector(asn1Object, substrate, length): 

45 return substrate[:length], substrate[length:] 

46 

47 def _createComponent(self, asn1Spec, tagSet, value, **options): 

48 if options.get('native'): 

49 return value 

50 elif asn1Spec is None: 

51 return self.protoComponent.clone(value, tagSet=tagSet) 

52 elif value is noValue: 

53 return asn1Spec 

54 else: 

55 return asn1Spec.clone(value) 

56 

57 

58class ExplicitTagDecoder(AbstractSimpleDecoder): 

59 protoComponent = univ.Any('') 

60 

61 def valueDecoder(self, substrate, asn1Spec, 

62 tagSet=None, length=None, state=None, 

63 decodeFun=None, substrateFun=None, 

64 **options): 

65 if substrateFun: 

66 return substrateFun( 

67 self._createComponent(asn1Spec, tagSet, '', **options), 

68 substrate, length 

69 ) 

70 

71 head, tail = substrate[:length], substrate[length:] 

72 

73 value, _ = decodeFun(head, asn1Spec, tagSet, length, **options) 

74 

75 if LOG: 

76 LOG('explicit tag container carries %d octets of trailing payload ' 

77 '(will be lost!): %s' % (len(_), debug.hexdump(_))) 

78 

79 return value, tail 

80 

81 def indefLenValueDecoder(self, substrate, asn1Spec, 

82 tagSet=None, length=None, state=None, 

83 decodeFun=None, substrateFun=None, 

84 **options): 

85 if substrateFun: 

86 return substrateFun( 

87 self._createComponent(asn1Spec, tagSet, '', **options), 

88 substrate, length 

89 ) 

90 

91 value, substrate = decodeFun(substrate, asn1Spec, tagSet, length, **options) 

92 

93 eooMarker, substrate = decodeFun(substrate, allowEoo=True, **options) 

94 

95 if eooMarker is eoo.endOfOctets: 

96 return value, substrate 

97 else: 

98 raise error.PyAsn1Error('Missing end-of-octets terminator') 

99 

100 

101explicitTagDecoder = ExplicitTagDecoder() 

102 

103 

104class IntegerDecoder(AbstractSimpleDecoder): 

105 protoComponent = univ.Integer(0) 

106 

107 def valueDecoder(self, substrate, asn1Spec, 

108 tagSet=None, length=None, state=None, 

109 decodeFun=None, substrateFun=None, 

110 **options): 

111 

112 if tagSet[0].tagFormat != tag.tagFormatSimple: 

113 raise error.PyAsn1Error('Simple tag format expected') 

114 

115 head, tail = substrate[:length], substrate[length:] 

116 

117 if not head: 

118 return self._createComponent(asn1Spec, tagSet, 0, **options), tail 

119 

120 value = from_bytes(head, signed=True) 

121 

122 return self._createComponent(asn1Spec, tagSet, value, **options), tail 

123 

124 

125class BooleanDecoder(IntegerDecoder): 

126 protoComponent = univ.Boolean(0) 

127 

128 def _createComponent(self, asn1Spec, tagSet, value, **options): 

129 return IntegerDecoder._createComponent( 

130 self, asn1Spec, tagSet, value and 1 or 0, **options) 

131 

132 

133class BitStringDecoder(AbstractSimpleDecoder): 

134 protoComponent = univ.BitString(()) 

135 supportConstructedForm = True 

136 

137 def valueDecoder(self, substrate, asn1Spec, 

138 tagSet=None, length=None, state=None, 

139 decodeFun=None, substrateFun=None, 

140 **options): 

141 head, tail = substrate[:length], substrate[length:] 

142 

143 if substrateFun: 

144 return substrateFun(self._createComponent( 

145 asn1Spec, tagSet, noValue, **options), substrate, length) 

146 

147 if not head: 

148 raise error.PyAsn1Error('Empty BIT STRING substrate') 

149 

150 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

151 

152 trailingBits = oct2int(head[0]) 

153 if trailingBits > 7: 

154 raise error.PyAsn1Error( 

155 'Trailing bits overflow %s' % trailingBits 

156 ) 

157 

158 value = self.protoComponent.fromOctetString( 

159 head[1:], internalFormat=True, padding=trailingBits) 

160 

161 return self._createComponent(asn1Spec, tagSet, value, **options), tail 

162 

163 if not self.supportConstructedForm: 

164 raise error.PyAsn1Error('Constructed encoding form prohibited ' 

165 'at %s' % self.__class__.__name__) 

166 

167 if LOG: 

168 LOG('assembling constructed serialization') 

169 

170 # All inner fragments are of the same type, treat them as octet string 

171 substrateFun = self.substrateCollector 

172 

173 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

174 

175 while head: 

176 component, head = decodeFun(head, self.protoComponent, 

177 substrateFun=substrateFun, **options) 

178 

179 trailingBits = oct2int(component[0]) 

180 if trailingBits > 7: 

181 raise error.PyAsn1Error( 

182 'Trailing bits overflow %s' % trailingBits 

183 ) 

184 

185 bitString = self.protoComponent.fromOctetString( 

186 component[1:], internalFormat=True, 

187 prepend=bitString, padding=trailingBits 

188 ) 

189 

190 return self._createComponent(asn1Spec, tagSet, bitString, **options), tail 

191 

192 def indefLenValueDecoder(self, substrate, asn1Spec, 

193 tagSet=None, length=None, state=None, 

194 decodeFun=None, substrateFun=None, 

195 **options): 

196 

197 if substrateFun: 

198 return substrateFun(self._createComponent(asn1Spec, tagSet, noValue, **options), substrate, length) 

199 

200 # All inner fragments are of the same type, treat them as octet string 

201 substrateFun = self.substrateCollector 

202 

203 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 

204 

205 while substrate: 

206 component, substrate = decodeFun(substrate, self.protoComponent, 

207 substrateFun=substrateFun, 

208 allowEoo=True, **options) 

209 if component is eoo.endOfOctets: 

210 break 

211 

212 trailingBits = oct2int(component[0]) 

213 if trailingBits > 7: 

214 raise error.PyAsn1Error( 

215 'Trailing bits overflow %s' % trailingBits 

216 ) 

217 

218 bitString = self.protoComponent.fromOctetString( 

219 component[1:], internalFormat=True, 

220 prepend=bitString, padding=trailingBits 

221 ) 

222 

223 else: 

224 raise error.SubstrateUnderrunError('No EOO seen before substrate ends') 

225 

226 return self._createComponent(asn1Spec, tagSet, bitString, **options), substrate 

227 

228 

229class OctetStringDecoder(AbstractSimpleDecoder): 

230 protoComponent = univ.OctetString('') 

231 supportConstructedForm = True 

232 

233 def valueDecoder(self, substrate, asn1Spec, 

234 tagSet=None, length=None, state=None, 

235 decodeFun=None, substrateFun=None, 

236 **options): 

237 head, tail = substrate[:length], substrate[length:] 

238 

239 if substrateFun: 

240 return substrateFun(self._createComponent(asn1Spec, tagSet, noValue, **options), 

241 substrate, length) 

242 

243 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 

244 return self._createComponent(asn1Spec, tagSet, head, **options), tail 

245 

246 if not self.supportConstructedForm: 

247 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__) 

248 

249 if LOG: 

250 LOG('assembling constructed serialization') 

251 

252 # All inner fragments are of the same type, treat them as octet string 

253 substrateFun = self.substrateCollector 

254 

255 header = null 

256 

257 while head: 

258 component, head = decodeFun(head, self.protoComponent, 

259 substrateFun=substrateFun, 

260 **options) 

261 header += component 

262 

263 return self._createComponent(asn1Spec, tagSet, header, **options), tail 

264 

265 def indefLenValueDecoder(self, substrate, asn1Spec, 

266 tagSet=None, length=None, state=None, 

267 decodeFun=None, substrateFun=None, 

268 **options): 

269 if substrateFun and substrateFun is not self.substrateCollector: 

270 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

271 return substrateFun(asn1Object, substrate, length) 

272 

273 # All inner fragments are of the same type, treat them as octet string 

274 substrateFun = self.substrateCollector 

275 

276 header = null 

277 

278 while substrate: 

279 component, substrate = decodeFun(substrate, 

280 self.protoComponent, 

281 substrateFun=substrateFun, 

282 allowEoo=True, **options) 

283 if component is eoo.endOfOctets: 

284 break 

285 

286 header += component 

287 

288 else: 

289 raise error.SubstrateUnderrunError( 

290 'No EOO seen before substrate ends' 

291 ) 

292 

293 return self._createComponent(asn1Spec, tagSet, header, **options), substrate 

294 

295 

296class NullDecoder(AbstractSimpleDecoder): 

297 protoComponent = univ.Null('') 

298 

299 def valueDecoder(self, substrate, asn1Spec, 

300 tagSet=None, length=None, state=None, 

301 decodeFun=None, substrateFun=None, 

302 **options): 

303 

304 if tagSet[0].tagFormat != tag.tagFormatSimple: 

305 raise error.PyAsn1Error('Simple tag format expected') 

306 

307 head, tail = substrate[:length], substrate[length:] 

308 

309 component = self._createComponent(asn1Spec, tagSet, '', **options) 

310 

311 if head: 

312 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) 

313 

314 return component, tail 

315 

316 

317class ObjectIdentifierDecoder(AbstractSimpleDecoder): 

318 protoComponent = univ.ObjectIdentifier(()) 

319 

320 def valueDecoder(self, substrate, asn1Spec, 

321 tagSet=None, length=None, state=None, 

322 decodeFun=None, substrateFun=None, 

323 **options): 

324 if tagSet[0].tagFormat != tag.tagFormatSimple: 

325 raise error.PyAsn1Error('Simple tag format expected') 

326 

327 head, tail = substrate[:length], substrate[length:] 

328 if not head: 

329 raise error.PyAsn1Error('Empty substrate') 

330 

331 head = octs2ints(head) 

332 

333 oid = () 

334 index = 0 

335 substrateLen = len(head) 

336 while index < substrateLen: 

337 subId = head[index] 

338 index += 1 

339 if subId < 128: 

340 oid += (subId,) 

341 elif subId > 128: 

342 # Construct subid from a number of octets 

343 nextSubId = subId 

344 subId = 0 

345 while nextSubId >= 128: 

346 subId = (subId << 7) + (nextSubId & 0x7F) 

347 if index >= substrateLen: 

348 raise error.SubstrateUnderrunError( 

349 'Short substrate for sub-OID past %s' % (oid,) 

350 ) 

351 nextSubId = head[index] 

352 index += 1 

353 oid += ((subId << 7) + nextSubId,) 

354 elif subId == 128: 

355 # ASN.1 spec forbids leading zeros (0x80) in OID 

356 # encoding, tolerating it opens a vulnerability. See 

357 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 

358 # page 7 

359 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding') 

360 

361 # Decode two leading arcs 

362 if 0 <= oid[0] <= 39: 

363 oid = (0,) + oid 

364 elif 40 <= oid[0] <= 79: 

365 oid = (1, oid[0] - 40) + oid[1:] 

366 elif oid[0] >= 80: 

367 oid = (2, oid[0] - 80) + oid[1:] 

368 else: 

369 raise error.PyAsn1Error('Malformed first OID octet: %s' % head[0]) 

370 

371 return self._createComponent(asn1Spec, tagSet, oid, **options), tail 

372 

373 

374class RealDecoder(AbstractSimpleDecoder): 

375 protoComponent = univ.Real() 

376 

377 def valueDecoder(self, substrate, asn1Spec, 

378 tagSet=None, length=None, state=None, 

379 decodeFun=None, substrateFun=None, 

380 **options): 

381 if tagSet[0].tagFormat != tag.tagFormatSimple: 

382 raise error.PyAsn1Error('Simple tag format expected') 

383 

384 head, tail = substrate[:length], substrate[length:] 

385 

386 if not head: 

387 return self._createComponent(asn1Spec, tagSet, 0.0, **options), tail 

388 

389 fo = oct2int(head[0]) 

390 head = head[1:] 

391 if fo & 0x80: # binary encoding 

392 if not head: 

393 raise error.PyAsn1Error("Incomplete floating-point value") 

394 

395 if LOG: 

396 LOG('decoding binary encoded REAL') 

397 

398 n = (fo & 0x03) + 1 

399 

400 if n == 4: 

401 n = oct2int(head[0]) 

402 head = head[1:] 

403 

404 eo, head = head[:n], head[n:] 

405 

406 if not eo or not head: 

407 raise error.PyAsn1Error('Real exponent screwed') 

408 

409 e = oct2int(eo[0]) & 0x80 and -1 or 0 

410 

411 while eo: # exponent 

412 e <<= 8 

413 e |= oct2int(eo[0]) 

414 eo = eo[1:] 

415 

416 b = fo >> 4 & 0x03 # base bits 

417 

418 if b > 2: 

419 raise error.PyAsn1Error('Illegal Real base') 

420 

421 if b == 1: # encbase = 8 

422 e *= 3 

423 

424 elif b == 2: # encbase = 16 

425 e *= 4 

426 p = 0 

427 

428 while head: # value 

429 p <<= 8 

430 p |= oct2int(head[0]) 

431 head = head[1:] 

432 

433 if fo & 0x40: # sign bit 

434 p = -p 

435 

436 sf = fo >> 2 & 0x03 # scale bits 

437 p *= 2 ** sf 

438 value = (p, 2, e) 

439 

440 elif fo & 0x40: # infinite value 

441 if LOG: 

442 LOG('decoding infinite REAL') 

443 

444 value = fo & 0x01 and '-inf' or 'inf' 

445 

446 elif fo & 0xc0 == 0: # character encoding 

447 if not head: 

448 raise error.PyAsn1Error("Incomplete floating-point value") 

449 

450 if LOG: 

451 LOG('decoding character encoded REAL') 

452 

453 try: 

454 if fo & 0x3 == 0x1: # NR1 

455 value = (int(head), 10, 0) 

456 

457 elif fo & 0x3 == 0x2: # NR2 

458 value = float(head) 

459 

460 elif fo & 0x3 == 0x3: # NR3 

461 value = float(head) 

462 

463 else: 

464 raise error.SubstrateUnderrunError( 

465 'Unknown NR (tag %s)' % fo 

466 ) 

467 

468 except ValueError: 

469 raise error.SubstrateUnderrunError( 

470 'Bad character Real syntax' 

471 ) 

472 

473 else: 

474 raise error.SubstrateUnderrunError( 

475 'Unknown encoding (tag %s)' % fo 

476 ) 

477 

478 return self._createComponent(asn1Spec, tagSet, value, **options), tail 

479 

480 

481class AbstractConstructedDecoder(AbstractDecoder): 

482 protoComponent = None 

483 

484 

485class UniversalConstructedTypeDecoder(AbstractConstructedDecoder): 

486 protoRecordComponent = None 

487 protoSequenceComponent = None 

488 

489 def _getComponentTagMap(self, asn1Object, idx): 

490 raise NotImplementedError() 

491 

492 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 

493 raise NotImplementedError() 

494 

495 def _decodeComponents(self, substrate, tagSet=None, decodeFun=None, **options): 

496 components = [] 

497 componentTypes = set() 

498 

499 while substrate: 

500 component, substrate = decodeFun(substrate, **options) 

501 if component is eoo.endOfOctets: 

502 break 

503 

504 components.append(component) 

505 componentTypes.add(component.tagSet) 

506 

507 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF 

508 # The heuristics is: 

509 # * 1+ components of different types -> likely SEQUENCE/SET 

510 # * otherwise -> likely SEQUENCE OF/SET OF 

511 if len(componentTypes) > 1: 

512 protoComponent = self.protoRecordComponent 

513 

514 else: 

515 protoComponent = self.protoSequenceComponent 

516 

517 asn1Object = protoComponent.clone( 

518 # construct tagSet from base tag from prototype ASN.1 object 

519 # and additional tags recovered from the substrate 

520 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags) 

521 ) 

522 

523 if LOG: 

524 LOG('guessed %r container type (pass `asn1Spec` to guide the ' 

525 'decoder)' % asn1Object) 

526 

527 for idx, component in enumerate(components): 

528 asn1Object.setComponentByPosition( 

529 idx, component, 

530 verifyConstraints=False, 

531 matchTags=False, matchConstraints=False 

532 ) 

533 

534 return asn1Object, substrate 

535 

536 def valueDecoder(self, substrate, asn1Spec, 

537 tagSet=None, length=None, state=None, 

538 decodeFun=None, substrateFun=None, 

539 **options): 

540 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

541 raise error.PyAsn1Error('Constructed tag format expected') 

542 

543 head, tail = substrate[:length], substrate[length:] 

544 

545 if substrateFun is not None: 

546 if asn1Spec is not None: 

547 asn1Object = asn1Spec.clone() 

548 

549 elif self.protoComponent is not None: 

550 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

551 

552 else: 

553 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

554 

555 return substrateFun(asn1Object, substrate, length) 

556 

557 if asn1Spec is None: 

558 asn1Object, trailing = self._decodeComponents( 

559 head, tagSet=tagSet, decodeFun=decodeFun, **options 

560 ) 

561 

562 if trailing: 

563 if LOG: 

564 LOG('Unused trailing %d octets encountered: %s' % ( 

565 len(trailing), debug.hexdump(trailing))) 

566 

567 return asn1Object, tail 

568 

569 asn1Object = asn1Spec.clone() 

570 asn1Object.clear() 

571 

572 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

573 

574 namedTypes = asn1Spec.componentType 

575 

576 isSetType = asn1Spec.typeId == univ.Set.typeId 

577 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

578 

579 if LOG: 

580 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

581 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

582 asn1Spec)) 

583 

584 seenIndices = set() 

585 idx = 0 

586 while head: 

587 if not namedTypes: 

588 componentType = None 

589 

590 elif isSetType: 

591 componentType = namedTypes.tagMapUnique 

592 

593 else: 

594 try: 

595 if isDeterministic: 

596 componentType = namedTypes[idx].asn1Object 

597 

598 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

599 componentType = namedTypes.getTagMapNearPosition(idx) 

600 

601 else: 

602 componentType = namedTypes[idx].asn1Object 

603 

604 except IndexError: 

605 raise error.PyAsn1Error( 

606 'Excessive components decoded at %r' % (asn1Spec,) 

607 ) 

608 

609 component, head = decodeFun(head, componentType, **options) 

610 

611 if not isDeterministic and namedTypes: 

612 if isSetType: 

613 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

614 

615 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

616 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

617 

618 asn1Object.setComponentByPosition( 

619 idx, component, 

620 verifyConstraints=False, 

621 matchTags=False, matchConstraints=False 

622 ) 

623 

624 seenIndices.add(idx) 

625 idx += 1 

626 

627 if LOG: 

628 LOG('seen component indices %s' % seenIndices) 

629 

630 if namedTypes: 

631 if not namedTypes.requiredComponents.issubset(seenIndices): 

632 raise error.PyAsn1Error( 

633 'ASN.1 object %s has uninitialized ' 

634 'components' % asn1Object.__class__.__name__) 

635 

636 if namedTypes.hasOpenTypes: 

637 

638 openTypes = options.get('openTypes', {}) 

639 

640 if LOG: 

641 LOG('user-specified open types map:') 

642 

643 for k, v in openTypes.items(): 

644 LOG('%s -> %r' % (k, v)) 

645 

646 if openTypes or options.get('decodeOpenTypes', False): 

647 

648 for idx, namedType in enumerate(namedTypes.namedTypes): 

649 if not namedType.openType: 

650 continue 

651 

652 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

653 continue 

654 

655 governingValue = asn1Object.getComponentByName( 

656 namedType.openType.name 

657 ) 

658 

659 try: 

660 openType = openTypes[governingValue] 

661 

662 except KeyError: 

663 

664 if LOG: 

665 LOG('default open types map of component ' 

666 '"%s.%s" governed by component "%s.%s"' 

667 ':' % (asn1Object.__class__.__name__, 

668 namedType.name, 

669 asn1Object.__class__.__name__, 

670 namedType.openType.name)) 

671 

672 for k, v in namedType.openType.items(): 

673 LOG('%s -> %r' % (k, v)) 

674 

675 try: 

676 openType = namedType.openType[governingValue] 

677 

678 except KeyError: 

679 if LOG: 

680 LOG('failed to resolve open type by governing ' 

681 'value %r' % (governingValue,)) 

682 continue 

683 

684 if LOG: 

685 LOG('resolved open type %r by governing ' 

686 'value %r' % (openType, governingValue)) 

687 

688 containerValue = asn1Object.getComponentByPosition(idx) 

689 

690 if containerValue.typeId in ( 

691 univ.SetOf.typeId, univ.SequenceOf.typeId): 

692 

693 for pos, containerElement in enumerate( 

694 containerValue): 

695 

696 component, rest = decodeFun( 

697 containerValue[pos].asOctets(), 

698 asn1Spec=openType, **options 

699 ) 

700 

701 containerValue[pos] = component 

702 

703 else: 

704 component, rest = decodeFun( 

705 asn1Object.getComponentByPosition(idx).asOctets(), 

706 asn1Spec=openType, **options 

707 ) 

708 

709 asn1Object.setComponentByPosition(idx, component) 

710 

711 else: 

712 inconsistency = asn1Object.isInconsistent 

713 if inconsistency: 

714 raise inconsistency 

715 

716 else: 

717 asn1Object = asn1Spec.clone() 

718 asn1Object.clear() 

719 

720 componentType = asn1Spec.componentType 

721 

722 if LOG: 

723 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

724 

725 idx = 0 

726 

727 while head: 

728 component, head = decodeFun(head, componentType, **options) 

729 asn1Object.setComponentByPosition( 

730 idx, component, 

731 verifyConstraints=False, 

732 matchTags=False, matchConstraints=False 

733 ) 

734 

735 idx += 1 

736 

737 return asn1Object, tail 

738 

739 def indefLenValueDecoder(self, substrate, asn1Spec, 

740 tagSet=None, length=None, state=None, 

741 decodeFun=None, substrateFun=None, 

742 **options): 

743 if tagSet[0].tagFormat != tag.tagFormatConstructed: 

744 raise error.PyAsn1Error('Constructed tag format expected') 

745 

746 if substrateFun is not None: 

747 if asn1Spec is not None: 

748 asn1Object = asn1Spec.clone() 

749 

750 elif self.protoComponent is not None: 

751 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

752 

753 else: 

754 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 

755 

756 return substrateFun(asn1Object, substrate, length) 

757 

758 if asn1Spec is None: 

759 return self._decodeComponents( 

760 substrate, tagSet=tagSet, decodeFun=decodeFun, 

761 **dict(options, allowEoo=True) 

762 ) 

763 

764 asn1Object = asn1Spec.clone() 

765 asn1Object.clear() 

766 

767 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 

768 

769 namedTypes = asn1Object.componentType 

770 

771 isSetType = asn1Object.typeId == univ.Set.typeId 

772 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 

773 

774 if LOG: 

775 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 

776 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 

777 asn1Spec)) 

778 

779 seenIndices = set() 

780 idx = 0 

781 while substrate: 

782 if len(namedTypes) <= idx: 

783 asn1Spec = None 

784 

785 elif isSetType: 

786 asn1Spec = namedTypes.tagMapUnique 

787 

788 else: 

789 try: 

790 if isDeterministic: 

791 asn1Spec = namedTypes[idx].asn1Object 

792 

793 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

794 asn1Spec = namedTypes.getTagMapNearPosition(idx) 

795 

796 else: 

797 asn1Spec = namedTypes[idx].asn1Object 

798 

799 except IndexError: 

800 raise error.PyAsn1Error( 

801 'Excessive components decoded at %r' % (asn1Object,) 

802 ) 

803 

804 component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True, **options) 

805 if component is eoo.endOfOctets: 

806 break 

807 

808 if not isDeterministic and namedTypes: 

809 if isSetType: 

810 idx = namedTypes.getPositionByType(component.effectiveTagSet) 

811 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 

812 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 

813 

814 asn1Object.setComponentByPosition( 

815 idx, component, 

816 verifyConstraints=False, 

817 matchTags=False, matchConstraints=False 

818 ) 

819 

820 seenIndices.add(idx) 

821 idx += 1 

822 

823 else: 

824 raise error.SubstrateUnderrunError( 

825 'No EOO seen before substrate ends' 

826 ) 

827 

828 if LOG: 

829 LOG('seen component indices %s' % seenIndices) 

830 

831 if namedTypes: 

832 if not namedTypes.requiredComponents.issubset(seenIndices): 

833 raise error.PyAsn1Error('ASN.1 object %s has uninitialized components' % asn1Object.__class__.__name__) 

834 

835 if namedTypes.hasOpenTypes: 

836 

837 openTypes = options.get('openTypes', {}) 

838 

839 if LOG: 

840 LOG('user-specified open types map:') 

841 

842 for k, v in openTypes.items(): 

843 LOG('%s -> %r' % (k, v)) 

844 

845 if openTypes or options.get('decodeOpenTypes', False): 

846 

847 for idx, namedType in enumerate(namedTypes.namedTypes): 

848 if not namedType.openType: 

849 continue 

850 

851 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 

852 continue 

853 

854 governingValue = asn1Object.getComponentByName( 

855 namedType.openType.name 

856 ) 

857 

858 try: 

859 openType = openTypes[governingValue] 

860 

861 except KeyError: 

862 

863 if LOG: 

864 LOG('default open types map of component ' 

865 '"%s.%s" governed by component "%s.%s"' 

866 ':' % (asn1Object.__class__.__name__, 

867 namedType.name, 

868 asn1Object.__class__.__name__, 

869 namedType.openType.name)) 

870 

871 for k, v in namedType.openType.items(): 

872 LOG('%s -> %r' % (k, v)) 

873 

874 try: 

875 openType = namedType.openType[governingValue] 

876 

877 except KeyError: 

878 if LOG: 

879 LOG('failed to resolve open type by governing ' 

880 'value %r' % (governingValue,)) 

881 continue 

882 

883 if LOG: 

884 LOG('resolved open type %r by governing ' 

885 'value %r' % (openType, governingValue)) 

886 

887 containerValue = asn1Object.getComponentByPosition(idx) 

888 

889 if containerValue.typeId in ( 

890 univ.SetOf.typeId, univ.SequenceOf.typeId): 

891 

892 for pos, containerElement in enumerate( 

893 containerValue): 

894 

895 component, rest = decodeFun( 

896 containerValue[pos].asOctets(), 

897 asn1Spec=openType, **dict(options, allowEoo=True) 

898 ) 

899 

900 containerValue[pos] = component 

901 

902 else: 

903 component, rest = decodeFun( 

904 asn1Object.getComponentByPosition(idx).asOctets(), 

905 asn1Spec=openType, **dict(options, allowEoo=True) 

906 ) 

907 

908 if component is not eoo.endOfOctets: 

909 asn1Object.setComponentByPosition(idx, component) 

910 

911 else: 

912 inconsistency = asn1Object.isInconsistent 

913 if inconsistency: 

914 raise inconsistency 

915 

916 else: 

917 asn1Object = asn1Spec.clone() 

918 asn1Object.clear() 

919 

920 componentType = asn1Spec.componentType 

921 

922 if LOG: 

923 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 

924 

925 idx = 0 

926 

927 while substrate: 

928 component, substrate = decodeFun(substrate, componentType, allowEoo=True, **options) 

929 

930 if component is eoo.endOfOctets: 

931 break 

932 

933 asn1Object.setComponentByPosition( 

934 idx, component, 

935 verifyConstraints=False, 

936 matchTags=False, matchConstraints=False 

937 ) 

938 

939 idx += 1 

940 

941 else: 

942 raise error.SubstrateUnderrunError( 

943 'No EOO seen before substrate ends' 

944 ) 

945 

946 return asn1Object, substrate 

947 

948 

949class SequenceOrSequenceOfDecoder(UniversalConstructedTypeDecoder): 

950 protoRecordComponent = univ.Sequence() 

951 protoSequenceComponent = univ.SequenceOf() 

952 

953 

954class SequenceDecoder(SequenceOrSequenceOfDecoder): 

955 protoComponent = univ.Sequence() 

956 

957 

958class SequenceOfDecoder(SequenceOrSequenceOfDecoder): 

959 protoComponent = univ.SequenceOf() 

960 

961 

962class SetOrSetOfDecoder(UniversalConstructedTypeDecoder): 

963 protoRecordComponent = univ.Set() 

964 protoSequenceComponent = univ.SetOf() 

965 

966 

967class SetDecoder(SetOrSetOfDecoder): 

968 protoComponent = univ.Set() 

969 

970 

971 

972class SetOfDecoder(SetOrSetOfDecoder): 

973 protoComponent = univ.SetOf() 

974 

975 

976class ChoiceDecoder(AbstractConstructedDecoder): 

977 protoComponent = univ.Choice() 

978 

979 def valueDecoder(self, substrate, asn1Spec, 

980 tagSet=None, length=None, state=None, 

981 decodeFun=None, substrateFun=None, 

982 **options): 

983 head, tail = substrate[:length], substrate[length:] 

984 

985 if asn1Spec is None: 

986 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

987 

988 else: 

989 asn1Object = asn1Spec.clone() 

990 

991 if substrateFun: 

992 return substrateFun(asn1Object, substrate, length) 

993 

994 if asn1Object.tagSet == tagSet: 

995 if LOG: 

996 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 

997 

998 component, head = decodeFun( 

999 head, asn1Object.componentTagMap, **options 

1000 ) 

1001 

1002 else: 

1003 if LOG: 

1004 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 

1005 

1006 component, head = decodeFun( 

1007 head, asn1Object.componentTagMap, 

1008 tagSet, length, state, **options 

1009 ) 

1010 

1011 effectiveTagSet = component.effectiveTagSet 

1012 

1013 if LOG: 

1014 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 

1015 

1016 asn1Object.setComponentByType( 

1017 effectiveTagSet, component, 

1018 verifyConstraints=False, 

1019 matchTags=False, matchConstraints=False, 

1020 innerFlag=False 

1021 ) 

1022 

1023 return asn1Object, tail 

1024 

1025 def indefLenValueDecoder(self, substrate, asn1Spec, 

1026 tagSet=None, length=None, state=None, 

1027 decodeFun=None, substrateFun=None, 

1028 **options): 

1029 if asn1Spec is None: 

1030 asn1Object = self.protoComponent.clone(tagSet=tagSet) 

1031 else: 

1032 asn1Object = asn1Spec.clone() 

1033 

1034 if substrateFun: 

1035 return substrateFun(asn1Object, substrate, length) 

1036 

1037 if asn1Object.tagSet == tagSet: 

1038 if LOG: 

1039 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 

1040 

1041 component, substrate = decodeFun( 

1042 substrate, asn1Object.componentType.tagMapUnique, **options 

1043 ) 

1044 

1045 # eat up EOO marker 

1046 eooMarker, substrate = decodeFun( 

1047 substrate, allowEoo=True, **options 

1048 ) 

1049 

1050 if eooMarker is not eoo.endOfOctets: 

1051 raise error.PyAsn1Error('No EOO seen before substrate ends') 

1052 

1053 else: 

1054 if LOG: 

1055 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 

1056 

1057 component, substrate = decodeFun( 

1058 substrate, asn1Object.componentType.tagMapUnique, 

1059 tagSet, length, state, **options 

1060 ) 

1061 

1062 effectiveTagSet = component.effectiveTagSet 

1063 

1064 if LOG: 

1065 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 

1066 

1067 asn1Object.setComponentByType( 

1068 effectiveTagSet, component, 

1069 verifyConstraints=False, 

1070 matchTags=False, matchConstraints=False, 

1071 innerFlag=False 

1072 ) 

1073 

1074 return asn1Object, substrate 

1075 

1076 

1077class AnyDecoder(AbstractSimpleDecoder): 

1078 protoComponent = univ.Any() 

1079 

1080 def valueDecoder(self, substrate, asn1Spec, 

1081 tagSet=None, length=None, state=None, 

1082 decodeFun=None, substrateFun=None, 

1083 **options): 

1084 if asn1Spec is None: 

1085 isUntagged = True 

1086 

1087 elif asn1Spec.__class__ is tagmap.TagMap: 

1088 isUntagged = tagSet not in asn1Spec.tagMap 

1089 

1090 else: 

1091 isUntagged = tagSet != asn1Spec.tagSet 

1092 

1093 if isUntagged: 

1094 fullSubstrate = options['fullSubstrate'] 

1095 

1096 # untagged Any container, recover inner header substrate 

1097 length += len(fullSubstrate) - len(substrate) 

1098 substrate = fullSubstrate 

1099 

1100 if LOG: 

1101 LOG('decoding as untagged ANY, substrate %s' % debug.hexdump(substrate)) 

1102 

1103 if substrateFun: 

1104 return substrateFun(self._createComponent(asn1Spec, tagSet, noValue, **options), 

1105 substrate, length) 

1106 

1107 head, tail = substrate[:length], substrate[length:] 

1108 

1109 return self._createComponent(asn1Spec, tagSet, head, **options), tail 

1110 

1111 def indefLenValueDecoder(self, substrate, asn1Spec, 

1112 tagSet=None, length=None, state=None, 

1113 decodeFun=None, substrateFun=None, 

1114 **options): 

1115 if asn1Spec is None: 

1116 isTagged = False 

1117 

1118 elif asn1Spec.__class__ is tagmap.TagMap: 

1119 isTagged = tagSet in asn1Spec.tagMap 

1120 

1121 else: 

1122 isTagged = tagSet == asn1Spec.tagSet 

1123 

1124 if isTagged: 

1125 # tagged Any type -- consume header substrate 

1126 header = null 

1127 

1128 if LOG: 

1129 LOG('decoding as tagged ANY') 

1130 

1131 else: 

1132 fullSubstrate = options['fullSubstrate'] 

1133 

1134 # untagged Any, recover header substrate 

1135 header = fullSubstrate[:-len(substrate)] 

1136 

1137 if LOG: 

1138 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(header)) 

1139 

1140 # Any components do not inherit initial tag 

1141 asn1Spec = self.protoComponent 

1142 

1143 if substrateFun and substrateFun is not self.substrateCollector: 

1144 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 

1145 return substrateFun(asn1Object, header + substrate, length + len(header)) 

1146 

1147 if LOG: 

1148 LOG('assembling constructed serialization') 

1149 

1150 # All inner fragments are of the same type, treat them as octet string 

1151 substrateFun = self.substrateCollector 

1152 

1153 while substrate: 

1154 component, substrate = decodeFun(substrate, asn1Spec, 

1155 substrateFun=substrateFun, 

1156 allowEoo=True, **options) 

1157 if component is eoo.endOfOctets: 

1158 break 

1159 

1160 header += component 

1161 

1162 else: 

1163 raise error.SubstrateUnderrunError( 

1164 'No EOO seen before substrate ends' 

1165 ) 

1166 

1167 if substrateFun: 

1168 return header, substrate 

1169 

1170 else: 

1171 return self._createComponent(asn1Spec, tagSet, header, **options), substrate 

1172 

1173 

1174# character string types 

1175class UTF8StringDecoder(OctetStringDecoder): 

1176 protoComponent = char.UTF8String() 

1177 

1178 

1179class NumericStringDecoder(OctetStringDecoder): 

1180 protoComponent = char.NumericString() 

1181 

1182 

1183class PrintableStringDecoder(OctetStringDecoder): 

1184 protoComponent = char.PrintableString() 

1185 

1186 

1187class TeletexStringDecoder(OctetStringDecoder): 

1188 protoComponent = char.TeletexString() 

1189 

1190 

1191class VideotexStringDecoder(OctetStringDecoder): 

1192 protoComponent = char.VideotexString() 

1193 

1194 

1195class IA5StringDecoder(OctetStringDecoder): 

1196 protoComponent = char.IA5String() 

1197 

1198 

1199class GraphicStringDecoder(OctetStringDecoder): 

1200 protoComponent = char.GraphicString() 

1201 

1202 

1203class VisibleStringDecoder(OctetStringDecoder): 

1204 protoComponent = char.VisibleString() 

1205 

1206 

1207class GeneralStringDecoder(OctetStringDecoder): 

1208 protoComponent = char.GeneralString() 

1209 

1210 

1211class UniversalStringDecoder(OctetStringDecoder): 

1212 protoComponent = char.UniversalString() 

1213 

1214 

1215class BMPStringDecoder(OctetStringDecoder): 

1216 protoComponent = char.BMPString() 

1217 

1218 

1219# "useful" types 

1220class ObjectDescriptorDecoder(OctetStringDecoder): 

1221 protoComponent = useful.ObjectDescriptor() 

1222 

1223 

1224class GeneralizedTimeDecoder(OctetStringDecoder): 

1225 protoComponent = useful.GeneralizedTime() 

1226 

1227 

1228class UTCTimeDecoder(OctetStringDecoder): 

1229 protoComponent = useful.UTCTime() 

1230 

1231 

1232tagMap = { 

1233 univ.Integer.tagSet: IntegerDecoder(), 

1234 univ.Boolean.tagSet: BooleanDecoder(), 

1235 univ.BitString.tagSet: BitStringDecoder(), 

1236 univ.OctetString.tagSet: OctetStringDecoder(), 

1237 univ.Null.tagSet: NullDecoder(), 

1238 univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(), 

1239 univ.Enumerated.tagSet: IntegerDecoder(), 

1240 univ.Real.tagSet: RealDecoder(), 

1241 univ.Sequence.tagSet: SequenceOrSequenceOfDecoder(), # conflicts with SequenceOf 

1242 univ.Set.tagSet: SetOrSetOfDecoder(), # conflicts with SetOf 

1243 univ.Choice.tagSet: ChoiceDecoder(), # conflicts with Any 

1244 # character string types 

1245 char.UTF8String.tagSet: UTF8StringDecoder(), 

1246 char.NumericString.tagSet: NumericStringDecoder(), 

1247 char.PrintableString.tagSet: PrintableStringDecoder(), 

1248 char.TeletexString.tagSet: TeletexStringDecoder(), 

1249 char.VideotexString.tagSet: VideotexStringDecoder(), 

1250 char.IA5String.tagSet: IA5StringDecoder(), 

1251 char.GraphicString.tagSet: GraphicStringDecoder(), 

1252 char.VisibleString.tagSet: VisibleStringDecoder(), 

1253 char.GeneralString.tagSet: GeneralStringDecoder(), 

1254 char.UniversalString.tagSet: UniversalStringDecoder(), 

1255 char.BMPString.tagSet: BMPStringDecoder(), 

1256 # useful types 

1257 useful.ObjectDescriptor.tagSet: ObjectDescriptorDecoder(), 

1258 useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(), 

1259 useful.UTCTime.tagSet: UTCTimeDecoder() 

1260} 

1261 

1262# Type-to-codec map for ambiguous ASN.1 types 

1263typeMap = { 

1264 univ.Set.typeId: SetDecoder(), 

1265 univ.SetOf.typeId: SetOfDecoder(), 

1266 univ.Sequence.typeId: SequenceDecoder(), 

1267 univ.SequenceOf.typeId: SequenceOfDecoder(), 

1268 univ.Choice.typeId: ChoiceDecoder(), 

1269 univ.Any.typeId: AnyDecoder() 

1270} 

1271 

1272# Put in non-ambiguous types for faster codec lookup 

1273for typeDecoder in tagMap.values(): 

1274 if typeDecoder.protoComponent is not None: 

1275 typeId = typeDecoder.protoComponent.__class__.typeId 

1276 if typeId is not None and typeId not in typeMap: 

1277 typeMap[typeId] = typeDecoder 

1278 

1279 

1280(stDecodeTag, 

1281 stDecodeLength, 

1282 stGetValueDecoder, 

1283 stGetValueDecoderByAsn1Spec, 

1284 stGetValueDecoderByTag, 

1285 stTryAsExplicitTag, 

1286 stDecodeValue, 

1287 stDumpRawValue, 

1288 stErrorCondition, 

1289 stStop) = [x for x in range(10)] 

1290 

1291 

1292class Decoder(object): 

1293 defaultErrorState = stErrorCondition 

1294 #defaultErrorState = stDumpRawValue 

1295 defaultRawDecoder = AnyDecoder() 

1296 supportIndefLength = True 

1297 

1298 # noinspection PyDefaultArgument 

1299 def __init__(self, tagMap, typeMap={}): 

1300 self.__tagMap = tagMap 

1301 self.__typeMap = typeMap 

1302 # Tag & TagSet objects caches 

1303 self.__tagCache = {} 

1304 self.__tagSetCache = {} 

1305 self.__eooSentinel = ints2octs((0, 0)) 

1306 

1307 def __call__(self, substrate, asn1Spec=None, 

1308 tagSet=None, length=None, state=stDecodeTag, 

1309 decodeFun=None, substrateFun=None, 

1310 **options): 

1311 

1312 if LOG: 

1313 LOG('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate))) 

1314 

1315 allowEoo = options.pop('allowEoo', False) 

1316 

1317 # Look for end-of-octets sentinel 

1318 if allowEoo and self.supportIndefLength: 

1319 if substrate[:2] == self.__eooSentinel: 

1320 if LOG: 

1321 LOG('end-of-octets sentinel found') 

1322 return eoo.endOfOctets, substrate[2:] 

1323 

1324 value = noValue 

1325 

1326 tagMap = self.__tagMap 

1327 typeMap = self.__typeMap 

1328 tagCache = self.__tagCache 

1329 tagSetCache = self.__tagSetCache 

1330 

1331 fullSubstrate = substrate 

1332 

1333 while state is not stStop: 

1334 

1335 if state is stDecodeTag: 

1336 if not substrate: 

1337 raise error.SubstrateUnderrunError( 

1338 'Short octet stream on tag decoding' 

1339 ) 

1340 

1341 # Decode tag 

1342 isShortTag = True 

1343 firstOctet = substrate[0] 

1344 substrate = substrate[1:] 

1345 

1346 try: 

1347 lastTag = tagCache[firstOctet] 

1348 

1349 except KeyError: 

1350 integerTag = oct2int(firstOctet) 

1351 tagClass = integerTag & 0xC0 

1352 tagFormat = integerTag & 0x20 

1353 tagId = integerTag & 0x1F 

1354 

1355 if tagId == 0x1F: 

1356 isShortTag = False 

1357 lengthOctetIdx = 0 

1358 tagId = 0 

1359 

1360 try: 

1361 while True: 

1362 integerTag = oct2int(substrate[lengthOctetIdx]) 

1363 lengthOctetIdx += 1 

1364 tagId <<= 7 

1365 tagId |= (integerTag & 0x7F) 

1366 if not integerTag & 0x80: 

1367 break 

1368 

1369 substrate = substrate[lengthOctetIdx:] 

1370 

1371 except IndexError: 

1372 raise error.SubstrateUnderrunError( 

1373 'Short octet stream on long tag decoding' 

1374 ) 

1375 

1376 lastTag = tag.Tag( 

1377 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId 

1378 ) 

1379 

1380 if isShortTag: 

1381 # cache short tags 

1382 tagCache[firstOctet] = lastTag 

1383 

1384 if tagSet is None: 

1385 if isShortTag: 

1386 try: 

1387 tagSet = tagSetCache[firstOctet] 

1388 

1389 except KeyError: 

1390 # base tag not recovered 

1391 tagSet = tag.TagSet((), lastTag) 

1392 tagSetCache[firstOctet] = tagSet 

1393 else: 

1394 tagSet = tag.TagSet((), lastTag) 

1395 

1396 else: 

1397 tagSet = lastTag + tagSet 

1398 

1399 state = stDecodeLength 

1400 

1401 if LOG: 

1402 LOG('tag decoded into %s, decoding length' % tagSet) 

1403 

1404 if state is stDecodeLength: 

1405 # Decode length 

1406 if not substrate: 

1407 raise error.SubstrateUnderrunError( 

1408 'Short octet stream on length decoding' 

1409 ) 

1410 

1411 firstOctet = oct2int(substrate[0]) 

1412 

1413 if firstOctet < 128: 

1414 size = 1 

1415 length = firstOctet 

1416 

1417 elif firstOctet > 128: 

1418 size = firstOctet & 0x7F 

1419 # encoded in size bytes 

1420 encodedLength = octs2ints(substrate[1:size + 1]) 

1421 # missing check on maximum size, which shouldn't be a 

1422 # problem, we can handle more than is possible 

1423 if len(encodedLength) != size: 

1424 raise error.SubstrateUnderrunError( 

1425 '%s<%s at %s' % (size, len(encodedLength), tagSet) 

1426 ) 

1427 

1428 length = 0 

1429 for lengthOctet in encodedLength: 

1430 length <<= 8 

1431 length |= lengthOctet 

1432 size += 1 

1433 

1434 else: 

1435 size = 1 

1436 length = -1 

1437 

1438 substrate = substrate[size:] 

1439 

1440 if length == -1: 

1441 if not self.supportIndefLength: 

1442 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec') 

1443 

1444 else: 

1445 if len(substrate) < length: 

1446 raise error.SubstrateUnderrunError('%d-octet short' % (length - len(substrate))) 

1447 

1448 state = stGetValueDecoder 

1449 

1450 if LOG: 

1451 LOG('value length decoded into %d, payload substrate is: %s' % (length, debug.hexdump(length == -1 and substrate or substrate[:length]))) 

1452 

1453 if state is stGetValueDecoder: 

1454 if asn1Spec is None: 

1455 state = stGetValueDecoderByTag 

1456 

1457 else: 

1458 state = stGetValueDecoderByAsn1Spec 

1459 # 

1460 # There're two ways of creating subtypes in ASN.1 what influences 

1461 # decoder operation. These methods are: 

1462 # 1) Either base types used in or no IMPLICIT tagging has been 

1463 # applied on subtyping. 

1464 # 2) Subtype syntax drops base type information (by means of 

1465 # IMPLICIT tagging. 

1466 # The first case allows for complete tag recovery from substrate 

1467 # while the second one requires original ASN.1 type spec for 

1468 # decoding. 

1469 # 

1470 # In either case a set of tags (tagSet) is coming from substrate 

1471 # in an incremental, tag-by-tag fashion (this is the case of 

1472 # EXPLICIT tag which is most basic). Outermost tag comes first 

1473 # from the wire. 

1474 # 

1475 if state is stGetValueDecoderByTag: 

1476 try: 

1477 concreteDecoder = tagMap[tagSet] 

1478 

1479 except KeyError: 

1480 concreteDecoder = None 

1481 

1482 if concreteDecoder: 

1483 state = stDecodeValue 

1484 

1485 else: 

1486 try: 

1487 concreteDecoder = tagMap[tagSet[:1]] 

1488 

1489 except KeyError: 

1490 concreteDecoder = None 

1491 

1492 if concreteDecoder: 

1493 state = stDecodeValue 

1494 else: 

1495 state = stTryAsExplicitTag 

1496 

1497 if LOG: 

1498 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1499 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__) 

1500 

1501 if state is stGetValueDecoderByAsn1Spec: 

1502 

1503 if asn1Spec.__class__ is tagmap.TagMap: 

1504 try: 

1505 chosenSpec = asn1Spec[tagSet] 

1506 

1507 except KeyError: 

1508 chosenSpec = None 

1509 

1510 if LOG: 

1511 LOG('candidate ASN.1 spec is a map of:') 

1512 

1513 for firstOctet, v in asn1Spec.presentTypes.items(): 

1514 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1515 

1516 if asn1Spec.skipTypes: 

1517 LOG('but neither of: ') 

1518 for firstOctet, v in asn1Spec.skipTypes.items(): 

1519 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 

1520 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet)) 

1521 

1522 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap: 

1523 chosenSpec = asn1Spec 

1524 if LOG: 

1525 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) 

1526 

1527 else: 

1528 chosenSpec = None 

1529 

1530 if chosenSpec is not None: 

1531 try: 

1532 # ambiguous type or just faster codec lookup 

1533 concreteDecoder = typeMap[chosenSpec.typeId] 

1534 

1535 if LOG: 

1536 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,)) 

1537 

1538 except KeyError: 

1539 # use base type for codec lookup to recover untagged types 

1540 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag) 

1541 try: 

1542 # base type or tagged subtype 

1543 concreteDecoder = tagMap[baseTagSet] 

1544 

1545 if LOG: 

1546 LOG('value decoder chosen by base %s' % (baseTagSet,)) 

1547 

1548 except KeyError: 

1549 concreteDecoder = None 

1550 

1551 if concreteDecoder: 

1552 asn1Spec = chosenSpec 

1553 state = stDecodeValue 

1554 

1555 else: 

1556 state = stTryAsExplicitTag 

1557 

1558 else: 

1559 concreteDecoder = None 

1560 state = stTryAsExplicitTag 

1561 

1562 if LOG: 

1563 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 

1564 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__) 

1565 

1566 if state is stDecodeValue: 

1567 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this 

1568 substrateFun = lambda a, b, c: (a, b[:c]) 

1569 

1570 options.update(fullSubstrate=fullSubstrate) 

1571 

1572 if length == -1: # indef length 

1573 value, substrate = concreteDecoder.indefLenValueDecoder( 

1574 substrate, asn1Spec, 

1575 tagSet, length, stGetValueDecoder, 

1576 self, substrateFun, 

1577 **options 

1578 ) 

1579 

1580 else: 

1581 value, substrate = concreteDecoder.valueDecoder( 

1582 substrate, asn1Spec, 

1583 tagSet, length, stGetValueDecoder, 

1584 self, substrateFun, 

1585 **options 

1586 ) 

1587 

1588 if LOG: 

1589 LOG('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, isinstance(value, base.Asn1Item) and value.prettyPrint() or value, substrate and debug.hexdump(substrate) or '<none>')) 

1590 

1591 state = stStop 

1592 break 

1593 

1594 if state is stTryAsExplicitTag: 

1595 if (tagSet and 

1596 tagSet[0].tagFormat == tag.tagFormatConstructed and 

1597 tagSet[0].tagClass != tag.tagClassUniversal): 

1598 # Assume explicit tagging 

1599 concreteDecoder = explicitTagDecoder 

1600 state = stDecodeValue 

1601 

1602 else: 

1603 concreteDecoder = None 

1604 state = self.defaultErrorState 

1605 

1606 if LOG: 

1607 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure')) 

1608 

1609 if state is stDumpRawValue: 

1610 concreteDecoder = self.defaultRawDecoder 

1611 

1612 if LOG: 

1613 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) 

1614 

1615 state = stDecodeValue 

1616 

1617 if state is stErrorCondition: 

1618 raise error.PyAsn1Error( 

1619 '%s not in asn1Spec: %r' % (tagSet, asn1Spec) 

1620 ) 

1621 

1622 if LOG: 

1623 debug.scope.pop() 

1624 LOG('decoder left scope %s, call completed' % debug.scope) 

1625 

1626 return value, substrate 

1627 

1628 

1629#: Turns BER octet stream into an ASN.1 object. 

1630#: 

1631#: Takes BER octet-stream and decode it into an ASN.1 object 

1632#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 

1633#: may be a scalar or an arbitrary nested structure. 

1634#: 

1635#: Parameters 

1636#: ---------- 

1637#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 

1638#: BER octet-stream 

1639#: 

1640#: Keyword Args 

1641#: ------------ 

1642#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 

1643#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure 

1644#: being decoded, *asn1Spec* may or may not be required. Most common reason for 

1645#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode. 

1646#: 

1647#: Returns 

1648#: ------- 

1649#: : :py:class:`tuple` 

1650#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 

1651#: and the unprocessed trailing portion of the *substrate* (may be empty) 

1652#: 

1653#: Raises 

1654#: ------ 

1655#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError 

1656#: On decoding errors 

1657#: 

1658#: Examples 

1659#: -------- 

1660#: Decode BER serialisation without ASN.1 schema 

1661#: 

1662#: .. code-block:: pycon 

1663#: 

1664#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 

1665#: >>> str(s) 

1666#: SequenceOf: 

1667#: 1 2 3 

1668#: 

1669#: Decode BER serialisation with ASN.1 schema 

1670#: 

1671#: .. code-block:: pycon 

1672#: 

1673#: >>> seq = SequenceOf(componentType=Integer()) 

1674#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 

1675#: >>> str(s) 

1676#: SequenceOf: 

1677#: 1 2 3 

1678#: 

1679decode = Decoder(tagMap, typeMap) 

1680 

1681# XXX 

1682# non-recursive decoding; return position rather than substrate