Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pyasn1/codec/ber/encoder.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

487 statements  

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2020, Ilya Etingof <etingof@gmail.com> 

5# License: https://pyasn1.readthedocs.io/en/latest/license.html 

6# 

7import sys 

8import warnings 

9 

10from pyasn1 import debug 

11from pyasn1 import error 

12from pyasn1.codec.ber import eoo 

13from pyasn1.compat import _MISSING 

14from pyasn1.compat.integer import to_bytes 

15from pyasn1.type import char 

16from pyasn1.type import tag 

17from pyasn1.type import univ 

18from pyasn1.type import useful 

19 

# Public names exported by this module
__all__ = ['Encoder', 'encode']

# Debug logger for this module, registered under the encoder debug flag
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_ENCODER)

23 

24 

class AbstractItemEncoder(object):
    """Common machinery shared by the per-type BER encoders.

    Subclasses provide :meth:`encodeValue`, which returns a
    ``(substrate, isConstructed, isOctets)`` triple. :meth:`encode` then
    prepends tag and length octets for every tag of the item's tag set.
    """

    # When False, `encodeLength` never emits the indefinite length marker
    supportIndefLenMode = True

    # An outcome of otherwise legit call `encodeFun(eoo.endOfOctets)`
    eooIntegerSubstrate = (0, 0)
    eooOctetsSubstrate = bytes(eooIntegerSubstrate)

    # noinspection PyMethodMayBeStatic
    def encodeTag(self, singleTag, isConstructed):
        """Return the identifier octets of `singleTag` as a tuple of ints."""
        tagClass, tagFormat, tagId = singleTag

        leadOctet = tagClass | tagFormat
        if isConstructed:
            leadOctet |= tag.tagFormatConstructed

        # tag numbers below 31 fit into the leading octet
        if tagId < 31:
            return (leadOctet | tagId,)

        # otherwise emit 7-bit groups, most significant first; every group
        # but the last one carries the 0x80 continuation bit
        groups = [tagId & 0x7f]
        tagId >>= 7

        while tagId:
            groups.append(0x80 | (tagId & 0x7f))
            tagId >>= 7

        groups.reverse()

        return (leadOctet | 0x1F,) + tuple(groups)

    def encodeLength(self, length, defMode):
        """Return the length octets for `length` as a tuple of ints.

        Raises `error.PyAsn1Error` if more than 126 octets would be needed
        to hold the length.
        """
        if not defMode and self.supportIndefLenMode:
            return (0x80,)

        # short form
        if length < 0x80:
            return (length,)

        # long form: big-endian length octets prefixed by their count
        octets = []
        while length:
            octets.append(length & 0xff)
            length >>= 8

        octets.reverse()

        octetCount = len(octets)

        if octetCount > 126:
            raise error.PyAsn1Error('Length octets overflow (%d)' % octetCount)

        return (0x80 | octetCount,) + tuple(octets)

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode the value payload; concrete encoders must override this."""
        raise error.PyAsn1Error('Not implemented')

    def encode(self, value, asn1Spec=None, encodeFun=None, **options):
        """Encode `value` into octets, wrapping it into each tag of its tag set.

        The tag set is taken from `asn1Spec` when given, otherwise from
        `value`. Returns the encoded substrate.
        """
        tagSet = value.tagSet if asn1Spec is None else asn1Spec.tagSet

        # untagged item?
        if not tagSet:
            payload, _isConstructed, _isOctets = self.encodeValue(
                value, asn1Spec, encodeFun, **options
            )
            return payload

        defMode = options.get('defMode', True)

        substrate = b''

        for position, singleTag in enumerate(tagSet.superTags):

            useDefMode = defMode

            # base tag?
            if position == 0:
                try:
                    substrate, isConstructed, isOctets = self.encodeValue(
                        value, asn1Spec, encodeFun, **options
                    )

                except error.PyAsn1Error as exc:
                    raise error.PyAsn1Error(
                        'Error encoding %r: %s' % (value, exc))

                if LOG:
                    LOG('encoded %svalue %s into %s' % (
                        'constructed ' if isConstructed else '', value, substrate
                    ))

                if (isConstructed and not substrate and
                        options.get('ifNotEmpty', False)):
                    return substrate

                if not isConstructed:
                    useDefMode = True

                    if LOG:
                        LOG('overridden encoding mode into definitive for primitive type')

            header = self.encodeTag(singleTag, isConstructed)

            if LOG:
                LOG('encoded %stag %s into %s' % (
                    'constructed ' if isConstructed else '',
                    singleTag, debug.hexdump(bytes(header))))

            header = header + self.encodeLength(len(substrate), useDefMode)

            if LOG:
                LOG('encoded %s octets (tag + payload) into %s' % (
                    len(substrate), debug.hexdump(bytes(header))))

            # the substrate is either bytes or a tuple of ints; the header
            # and the end-of-octets trailer must match that representation
            if isOctets:
                prefix = bytes(header)
                trailer = self.eooOctetsSubstrate
            else:
                prefix = header
                trailer = self.eooIntegerSubstrate

            substrate = prefix + substrate

            if not useDefMode:
                substrate += trailer

        if not isOctets:
            substrate = bytes(substrate)

        return substrate

152 

153 

class EndOfOctetsEncoder(AbstractItemEncoder):
    """Encoder for the end-of-octets marker, which has an empty payload."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        # (substrate, isConstructed, isOctets)
        emptyPayload = b''
        return emptyPayload, False, True

157 

158 

class BooleanEncoder(AbstractItemEncoder):
    """Encoder for BOOLEAN values."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        # a single content octet: 1 for a truthy value, 0 otherwise
        payload = (1,) if value else (0,)
        return payload, False, False

164 

165 

class IntegerEncoder(AbstractItemEncoder):
    """Encoder for INTEGER (and ENUMERATED) values."""

    supportIndefLenMode = False
    # When True, zero is encoded with an empty payload
    supportCompactZero = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if value != 0:
            return to_bytes(int(value), signed=True), False, True

        if LOG:
            LOG('encoding %spayload for zero INTEGER' % (
                'no ' if self.supportCompactZero else ''
            ))

        # de-facto way to encode zero
        zeroPayload = () if self.supportCompactZero else (0,)

        return zeroPayload, False, False

184 

185 

class BitStringEncoder(AbstractItemEncoder):
    """Encoder for BIT STRING values, optionally split into chunks."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            # TODO: try to avoid ASN.1 schema instantiation
            value = asn1Spec.clone(value)

        bitCount = len(value)

        # left-shift so that the bit string ends on an octet boundary
        leftover = bitCount % 8
        alignedValue = value << (8 - leftover) if leftover else value

        maxChunkSize = options.get('maxChunkSize', 0)

        fitsOneChunk = not maxChunkSize or len(alignedValue) <= maxChunkSize * 8
        if fitsOneChunk:
            octets = alignedValue.asOctets()
            # leading octet holds the number of padding bits
            padding = bytes((len(octets) * 8 - bitCount,))
            return padding + octets, False, True

        if LOG:
            LOG('encoding into up to %s-octet chunks' % maxChunkSize)

        baseTag = value.tagSet.baseTag

        # strip off explicit tags
        chunkTagSet = tag.TagSet(baseTag, baseTag) if baseTag else tag.TagSet()

        alignedValue = alignedValue.clone(tagSet=chunkTagSet)

        pieces = []
        upper = 0
        while upper < bitCount:
            lower = upper
            upper = min(lower + maxChunkSize * 8, bitCount)
            pieces.append(encodeFun(alignedValue[lower:upper], asn1Spec, **options))

        return b''.join(pieces), True, True

225 

226 

class OctetStringEncoder(AbstractItemEncoder):
    """Encoder for OCTET STRING-like values, optionally split into chunks."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):

        if asn1Spec is None:
            octets = value.asOctets()

        elif isinstance(value, bytes):
            octets = value

        else:
            octets = asn1Spec.clone(value).asOctets()

        maxChunkSize = options.get('maxChunkSize', 0)

        if not maxChunkSize or len(octets) <= maxChunkSize:
            return octets, False, True

        if LOG:
            LOG('encoding into up to %s-octet chunks' % maxChunkSize)

        # strip off explicit tags for inner chunks

        # pick the object whose tag set the inner chunks are derived from;
        # a bytes value with a schema keeps the schema as it is
        if asn1Spec is None:
            prototype = value
        elif not isinstance(value, bytes):
            prototype = asn1Spec
        else:
            prototype = None

        if prototype is not None:
            baseTag = prototype.tagSet.baseTag

            # strip off explicit tags
            if baseTag:
                chunkTagSet = tag.TagSet(baseTag, baseTag)
            else:
                chunkTagSet = tag.TagSet()

            asn1Spec = prototype.clone(tagSet=chunkTagSet)

        offset = 0
        pieces = []

        while chunk := value[offset:offset + maxChunkSize]:
            pieces.append(encodeFun(chunk, asn1Spec, **options))
            offset += maxChunkSize

        return b''.join(pieces), True, True

286 

287 

class NullEncoder(AbstractItemEncoder):
    """Encoder for NULL values, which carry no payload."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        # (substrate, isConstructed, isOctets)
        emptyPayload = b''
        return emptyPayload, False, True

293 

294 

class ObjectIdentifierEncoder(AbstractItemEncoder):
    """Encoder for OBJECT IDENTIFIER values."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        arcs = value.asTuple()

        # Build the first pair
        try:
            first = arcs[0]
            second = arcs[1]

        except IndexError:
            raise error.PyAsn1Error('Short OID %s' % (value,))

        # The two leading arcs are folded into one sub-identifier. Under
        # root arc 2 the second arc is not range-limited; under root arcs
        # 0 and 1 it must lie within 0..39.
        secondInRange = 0 <= second <= 39

        if first == 2:
            leading = second + 80

        elif secondInRange and first == 1:
            leading = second + 40

        elif secondInRange and first == 0:
            leading = second

        else:
            raise error.PyAsn1Error('Impossible first/second arcs at %s' % (value,))

        encoded = []

        # Cycle through subIds
        for subId in (leading,) + arcs[2:]:
            if 0 <= subId <= 127:
                # Optimize for the common case
                encoded.append(subId)
                continue

            if not subId > 127:
                raise error.PyAsn1Error('Negative OID arc %s at %s' % (subId, value))

            # Pack large Sub-Object IDs: 7-bit groups collected from the
            # least significant end, continuation bit on all but the first
            # collected (i.e. last emitted) group
            groups = [subId & 0x7f]
            subId >>= 7

            while subId:
                groups.append(0x80 | (subId & 0x7f))
                subId >>= 7

            # Add packed Sub-Object ID to resulted Object ID
            encoded.extend(reversed(groups))

        return tuple(encoded), False, False

352 

353 

class RelativeOIDEncoder(AbstractItemEncoder):
    """Encoder for RELATIVE-OID values."""

    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        encoded = []

        # Cycle through subIds
        for subId in value.asTuple():
            if 0 <= subId <= 127:
                # Optimize for the common case
                encoded.append(subId)
                continue

            if not subId > 127:
                raise error.PyAsn1Error('Negative RELATIVE-OID arc %s at %s' % (subId, value))

            # Pack large Sub-Object IDs: 7-bit groups collected from the
            # least significant end, continuation bit on all but the first
            # collected (i.e. last emitted) group
            groups = [subId & 0x7f]
            subId >>= 7

            while subId:
                groups.append(0x80 | (subId & 0x7f))
                subId >>= 7

            # Add packed Sub-Object ID to resulted RELATIVE-OID
            encoded.extend(reversed(groups))

        return tuple(encoded), False, False

385 

386 

class RealEncoder(AbstractItemEncoder):
    """Encoder for REAL values unpacked as (mantissa, base, exponent)."""

    supportIndefLenMode = False
    binEncBase = 2  # set to None to choose encoding base automatically

    @staticmethod
    def _dropFloatingPoint(m, encbase, e):
        """Rescale mantissa/exponent so that the mantissa becomes integral.

        Returns ``(mantissa sign, integer mantissa, encbase, exponent)``.
        """
        mantissaSign = -1 if m < 0 else 1
        exponentSign = -1 if e < 0 else 1

        # work on the mantissa magnitude; the sign is returned separately
        m *= mantissaSign

        # bits per exponent unit for the bases whose exponent gets rescaled
        if encbase == 8:
            bitsPerUnit = 3
        elif encbase == 16:
            bitsPerUnit = 4
        else:
            bitsPerUnit = 0

        if bitsPerUnit:
            magnitude = abs(e)
            m *= 2 ** (magnitude % bitsPerUnit * exponentSign)
            e = magnitude // bitsPerUnit * exponentSign

        # keep scaling until no fractional part is left
        while int(m) != m:
            m *= encbase
            e -= 1

        return mantissaSign, int(m), encbase, e

    def _chooseEncBase(self, value):
        """Pick the binary encoding base and return the rescaled components.

        A base of 2, 8 or 16 set on the value wins, then the one set on the
        encoder; otherwise all three bases are tried.
        """
        m, b, e = value
        candidateBases = (2, 8, 16)

        if value.binEncBase in candidateBases:
            return self._dropFloatingPoint(m, value.binEncBase, e)

        if self.binEncBase in candidateBases:
            return self._dropFloatingPoint(m, self.binEncBase, e)

        # auto choosing base 2/8/16
        sign = 1
        bestBase = 2
        bestMantissa = m
        bestExponent = float('inf')

        for candidate in candidateBases:
            sign, mantissa, usedBase, exponent = self._dropFloatingPoint(
                m, candidate, e)

            smallerExponent = abs(exponent) < abs(bestExponent)
            tieOnExponent = abs(exponent) == abs(bestExponent)

            if smallerExponent or (tieOnExponent and mantissa < bestMantissa):
                bestExponent = exponent
                bestMantissa = int(mantissa)
                bestBase = usedBase

        if LOG:
            LOG('automatically chosen REAL encoding base %s, sign %s, mantissa %s, '
                'exponent %s' % (bestBase, sign, bestMantissa, bestExponent))

        return sign, bestMantissa, bestBase, bestExponent

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        if value.isPlusInf:
            return (0x40,), False, False

        if value.isMinusInf:
            return (0x41,), False, False

        m, b, e = value

        if not m:
            return b'', False, True

        if b == 10:
            if LOG:
                LOG('encoding REAL into character form')

            exponentPrefix = b'+' if e == 0 else b''

            return b'\x03%dE%s%d' % (m, exponentPrefix, e), False, True

        if b != 2:
            raise error.PyAsn1Error('Prohibited Real base %s' % b)

        fo = 0x80  # binary encoding
        ms, m, encbase, e = self._chooseEncBase(value)

        if ms < 0:  # mantissa sign
            fo |= 0x40  # sign bit

        # exponent & mantissa normalization
        if encbase == 2:
            digitMask, digitBits, baseFlag = 0x1, 1, 0
        elif encbase == 8:
            digitMask, digitBits, baseFlag = 0x7, 3, 0x10
        else:  # encbase = 16
            digitMask, digitBits, baseFlag = 0xf, 4, 0x20

        while m & digitMask == 0:
            m >>= digitBits
            e += 1

        fo |= baseFlag

        sf = 0  # scale factor

        while m & 0x1 == 0:
            m >>= 1
            sf += 1

        if sf > 3:
            raise error.PyAsn1Error('Scale factor overflow')  # bug if raised

        fo |= sf << 2

        # exponent octets, most significant first
        if e in (0, -1):
            eo = bytes((e & 0xff,))

        else:
            exponentOctets = []

            while e not in (0, -1):
                exponentOctets.insert(0, e & 0xff)
                e >>= 8

            # add an extra octet when the top bit of the leading octet
            # disagrees with the sign left in `e`
            if e == 0 and exponentOctets and exponentOctets[0] & 0x80:
                exponentOctets.insert(0, 0)

            if e == -1 and exponentOctets and not (exponentOctets[0] & 0x80):
                exponentOctets.insert(0, 0xff)

            eo = bytes(exponentOctets)

        n = len(eo)
        if n > 0xff:
            raise error.PyAsn1Error('Real exponent overflow')

        if 1 <= n <= 3:
            # exponent length of 1, 2 or 3 octets is flagged as 0, 1 or 2
            fo |= n - 1

        else:
            # any other length is flagged as 3 and spelled out in an extra octet
            fo |= 3
            eo = bytes((n & 0xff,)) + eo

        # mantissa octets, most significant first
        mantissaOctets = []

        while m:
            mantissaOctets.insert(0, m & 0xff)
            m >>= 8

        substrate = bytes((fo,)) + eo + bytes(mantissaOctets)

        return substrate, False, True

552 

553 

class SequenceEncoder(AbstractItemEncoder):
    """Encoder for SEQUENCE and SET values.

    Accepts either a pyasn1 value object (`asn1Spec` is None) or a bare
    Python mapping guided by an ASN.1 schema passed as `asn1Spec`.
    """

    # Default for the 'omitEmptyOptionals' option
    omitEmptyOptionals = False

    # TODO: handling three flavors of input is too much -- split over codecs

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode all components of `value` and concatenate the results.

        Returns a ``(substrate, isConstructed, isOctets)`` triple with both
        flags set to True.

        Raises `error.PyAsn1Error` when a pyasn1 value object reports itself
        inconsistent, or when a non-OPTIONAL component named by the schema
        cannot be looked up in a bare Python `value`.
        """

        substrate = b''

        omitEmptyOptionals = options.get(
            'omitEmptyOptionals', self.omitEmptyOptionals)

        if LOG:
            LOG('%sencoding empty OPTIONAL components' % (
                'not ' if omitEmptyOptionals else ''))

        if asn1Spec is None:
            # instance of ASN.1 schema
            inconsistency = value.isInconsistent
            if inconsistency:
                raise error.PyAsn1Error(
                    f"ASN.1 object {value.__class__.__name__} is inconsistent")

            namedTypes = value.componentType

            for idx, component in enumerate(value.values()):
                if namedTypes:
                    namedType = namedTypes[idx]

                    if namedType.isOptional and not component.isValue:
                        if LOG:
                            LOG('not encoding OPTIONAL component %r' % (namedType,))
                        continue

                    if namedType.isDefaulted and component == namedType.asn1Object:
                        if LOG:
                            LOG('not encoding DEFAULT component %r' % (namedType,))
                        continue

                    if omitEmptyOptionals:
                        options.update(ifNotEmpty=namedType.isOptional)

                # wrap open type blob if needed
                if namedTypes and namedType.openType:

                    wrapType = namedType.asn1Object

                    if wrapType.typeId in (
                            univ.SetOf.typeId, univ.SequenceOf.typeId):

                        substrate += encodeFun(
                            component, asn1Spec,
                            **dict(options, wrapType=wrapType.componentType))

                    else:
                        chunk = encodeFun(component, asn1Spec, **options)

                        if wrapType.isSameTypeWith(component):
                            substrate += chunk

                        else:
                            substrate += encodeFun(chunk, wrapType, **options)

                            if LOG:
                                LOG('wrapped with wrap type %r' % (wrapType,))

                else:
                    substrate += encodeFun(component, asn1Spec, **options)

        else:
            # bare Python value + ASN.1 schema
            for namedType in asn1Spec.componentType.namedTypes:

                try:
                    component = value[namedType.name]

                except KeyError:
                    # An absent OPTIONAL component is simply left out of the
                    # encoding. Previously the lookup failure was raised
                    # before the OPTIONAL check below could run, so that
                    # check was unreachable for ordinary mappings.
                    if namedType.isOptional:
                        if LOG:
                            LOG('not encoding OPTIONAL component %r' % (namedType,))
                        continue

                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (
                        namedType.name, value))

                # Kept for containers whose item lookup succeeds even though
                # the membership test reports the name as absent.
                if namedType.isOptional and namedType.name not in value:
                    if LOG:
                        LOG('not encoding OPTIONAL component %r' % (namedType,))
                    continue

                if namedType.isDefaulted and component == namedType.asn1Object:
                    if LOG:
                        LOG('not encoding DEFAULT component %r' % (namedType,))
                    continue

                if omitEmptyOptionals:
                    options.update(ifNotEmpty=namedType.isOptional)

                componentSpec = namedType.asn1Object

                # wrap open type blob if needed
                if namedType.openType:

                    if componentSpec.typeId in (
                            univ.SetOf.typeId, univ.SequenceOf.typeId):

                        substrate += encodeFun(
                            component, componentSpec,
                            **dict(options, wrapType=componentSpec.componentType))

                    else:
                        chunk = encodeFun(component, componentSpec, **options)

                        if componentSpec.isSameTypeWith(component):
                            substrate += chunk

                        else:
                            substrate += encodeFun(chunk, componentSpec, **options)

                            if LOG:
                                LOG('wrapped with wrap type %r' % (componentSpec,))

                else:
                    substrate += encodeFun(component, componentSpec, **options)

        return substrate, True, True

675 

676 

class SequenceOfEncoder(AbstractItemEncoder):
    """Encoder for SEQUENCE OF and SET OF values."""

    def _encodeComponents(self, value, asn1Spec, encodeFun, **options):
        """Encode every component of `value`; return the list of chunks."""

        if asn1Spec is not None:
            # components are encoded against the schema's component type
            asn1Spec = asn1Spec.componentType

        else:
            inconsistency = value.isInconsistent
            if inconsistency:
                raise error.PyAsn1Error(
                    f"ASN.1 object {value.__class__.__name__} is inconsistent")

        # 'wrapType' is consumed here and not passed on to nested encoders
        wrapType = options.pop('wrapType', None)

        encodedItems = []

        for component in value:
            encoded = encodeFun(component, asn1Spec, **options)

            needsWrapping = (wrapType is not None and
                             not wrapType.isSameTypeWith(component))

            if needsWrapping:
                # wrap encoded value with wrapper container (e.g. ANY)
                encoded = encodeFun(encoded, wrapType, **options)

                if LOG:
                    LOG('wrapped with wrap type %r' % (wrapType,))

            encodedItems.append(encoded)

        return encodedItems

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        encodedItems = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        payload = b''.join(encodedItems)

        return payload, True, True

713 

714 

class ChoiceEncoder(AbstractItemEncoder):
    """Encoder for CHOICE values."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is None:
            chosen = value.getComponent()
            chosenSpec = asn1Spec

        else:
            # exactly one of the schema's alternatives must be present
            present = [
                alternative.name
                for alternative in asn1Spec.componentType.namedTypes
                if alternative.name in value
            ]

            if len(present) != 1:
                raise error.PyAsn1Error('%s components for Choice at %r' % (
                    'Multiple ' if present else 'None ', value))

            name = present[0]

            chosen = value[name]
            chosenSpec = asn1Spec[name]

        return encodeFun(chosen, chosenSpec, **options), True, True

731 

732 

class AnyEncoder(OctetStringEncoder):
    """Encoder for ANY values, which are passed through as octets."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is None:
            octets = value.asOctets()

        elif isinstance(value, bytes):
            octets = value

        else:
            octets = asn1Spec.clone(value).asOctets()

        # reported as constructed only when indefinite length mode is requested
        isConstructed = not options.get('defMode', True)

        return octets, isConstructed, True

741 

742 

# Maps base tag sets onto encoder instances
TAG_MAP = {
    eoo.endOfOctets.tagSet: EndOfOctetsEncoder(),
    univ.Boolean.tagSet: BooleanEncoder(),
    univ.Integer.tagSet: IntegerEncoder(),
    univ.BitString.tagSet: BitStringEncoder(),
    univ.OctetString.tagSet: OctetStringEncoder(),
    univ.Null.tagSet: NullEncoder(),
    univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
    univ.RelativeOID.tagSet: RelativeOIDEncoder(),
    univ.Enumerated.tagSet: IntegerEncoder(),
    univ.Real.tagSet: RealEncoder(),
    # Sequence & Set have same tags as SequenceOf & SetOf
    univ.SequenceOf.tagSet: SequenceOfEncoder(),
    univ.SetOf.tagSet: SequenceOfEncoder(),
    univ.Choice.tagSet: ChoiceEncoder(),
    # character string types and useful types all go through the octet
    # string encoder; each entry gets an encoder instance of its own
    **{
        octetLikeType.tagSet: OctetStringEncoder()
        for octetLikeType in (
            # character string types
            char.UTF8String,
            char.NumericString,
            char.PrintableString,
            char.TeletexString,
            char.VideotexString,
            char.IA5String,
            char.GraphicString,
            char.VisibleString,
            char.GeneralString,
            char.UniversalString,
            char.BMPString,
            # useful types
            useful.ObjectDescriptor,
            useful.GeneralizedTime,
            useful.UTCTime,
        )
    },
}

775 

# Put in ambiguous & non-ambiguous types for faster codec lookup
TYPE_MAP = {
    univ.Boolean.typeId: BooleanEncoder(),
    univ.Integer.typeId: IntegerEncoder(),
    univ.BitString.typeId: BitStringEncoder(),
    univ.OctetString.typeId: OctetStringEncoder(),
    univ.Null.typeId: NullEncoder(),
    univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(),
    univ.RelativeOID.typeId: RelativeOIDEncoder(),
    univ.Enumerated.typeId: IntegerEncoder(),
    univ.Real.typeId: RealEncoder(),
    # Sequence & Set have same tags as SequenceOf & SetOf
    univ.Set.typeId: SequenceEncoder(),
    univ.SetOf.typeId: SequenceOfEncoder(),
    univ.Sequence.typeId: SequenceEncoder(),
    univ.SequenceOf.typeId: SequenceOfEncoder(),
    univ.Choice.typeId: ChoiceEncoder(),
    univ.Any.typeId: AnyEncoder(),
    # character string types and useful types all go through the octet
    # string encoder; each entry gets an encoder instance of its own
    **{
        octetLikeType.typeId: OctetStringEncoder()
        for octetLikeType in (
            # character string types
            char.UTF8String,
            char.NumericString,
            char.PrintableString,
            char.TeletexString,
            char.VideotexString,
            char.IA5String,
            char.GraphicString,
            char.VisibleString,
            char.GeneralString,
            char.UniversalString,
            char.BMPString,
            # useful types
            useful.ObjectDescriptor,
            useful.GeneralizedTime,
            useful.UTCTime,
        )
    },
}

811 

812 

class SingleItemEncoder(object):
    """Dispatches a single item to the concrete encoder for its type.

    The encoder is looked up by type ID first and, failing that, by the
    base tag of the item's tag set.
    """

    # When not None, these override the 'defMode' / 'maxChunkSize' options
    fixedDefLengthMode = None
    fixedChunkSize = None

    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **ignored):
        """Use the given maps, falling back to the class-level defaults."""
        self._tagMap = tagMap if tagMap is not _MISSING else self.TAG_MAP
        self._typeMap = typeMap if typeMap is not _MISSING else self.TYPE_MAP

    def __call__(self, value, asn1Spec=None, **options):
        """Encode `value` (optionally guided by `asn1Spec`) into substrate.

        Raises `error.PyAsn1Error` if neither `asn1Spec` nor `value` has a
        type ID, or if no encoder is registered for the item.
        """
        try:
            if asn1Spec is None:
                typeId = value.typeId
            else:
                typeId = asn1Spec.typeId

        except AttributeError:
            raise error.PyAsn1Error('Value %r is not ASN.1 type instance '
                                    'and "asn1Spec" not given' % (value,))

        if LOG:
            # A conditional expression rather than `cond and a or b`: the
            # latter falls through to `asn1Spec.prettyPrintType()` on a None
            # `asn1Spec` whenever the value's own description is falsy.
            if asn1Spec is None:
                prettyType = value.prettyPrintType()
            else:
                prettyType = asn1Spec.prettyPrintType()

            LOG('encoder called in %sdef mode, chunk size %s for type %s, '
                'value:\n%s' % ('' if options.get('defMode', True) else 'in',
                                options.get('maxChunkSize', 0),
                                prettyType, value))

        if self.fixedDefLengthMode is not None:
            options.update(defMode=self.fixedDefLengthMode)

        if self.fixedChunkSize is not None:
            options.update(maxChunkSize=self.fixedChunkSize)

        try:
            concreteEncoder = self._typeMap[typeId]

            if LOG:
                LOG('using value codec %s chosen by type ID '
                    '%s' % (concreteEncoder.__class__.__name__, typeId))

        except KeyError:
            if asn1Spec is None:
                tagSet = value.tagSet
            else:
                tagSet = asn1Spec.tagSet

            # use base type for codec lookup to recover untagged types
            baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag)

            try:
                concreteEncoder = self._tagMap[baseTagSet]

            except KeyError:
                raise error.PyAsn1Error('No encoder for %r (%s)' % (value, tagSet))

            if LOG:
                LOG('using value codec %s chosen by tagSet '
                    '%s' % (concreteEncoder.__class__.__name__, tagSet))

        # this object is handed down as the `encodeFun` for nested components
        substrate = concreteEncoder.encode(value, asn1Spec, self, **options)

        if LOG:
            LOG('codec %s built %s octets of substrate: %s\nencoder '
                'completed' % (concreteEncoder, len(substrate),
                               debug.hexdump(substrate)))

        return substrate

882 

883 

class Encoder(object):
    """Callable front end delegating to a single-item encoder instance."""

    # Class instantiated by `__init__` to do the actual encoding
    SINGLE_ITEM_ENCODER = SingleItemEncoder

    def __init__(self, tagMap=_MISSING, typeMap=_MISSING, **options):
        itemEncoderClass = self.SINGLE_ITEM_ENCODER
        self._singleItemEncoder = itemEncoderClass(
            tagMap=tagMap, typeMap=typeMap, **options
        )

    def __call__(self, pyObject, asn1Spec=None, **options):
        itemEncoder = self._singleItemEncoder
        return itemEncoder(pyObject, asn1Spec=asn1Spec, **options)

895 

896 

#: Turns ASN.1 object into BER octet stream.
#:
#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative),
#: walks all its components recursively and produces a BER octet stream.
#:
#: Parameters
#: ----------
#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     A Python or pyasn1 object to encode. If a Python object is given, the
#:     `asn1Spec` parameter is required to guide the encoding process.
#:
#: Keyword Args
#: ------------
#: asn1Spec:
#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:
#: defMode: :py:class:`bool`
#:     If :obj:`False`, produces indefinite length encoding
#:
#: maxChunkSize: :py:class:`int`
#:     Maximum chunk size in chunked encoding mode (0 denotes unlimited chunk size)
#:
#: Returns
#: -------
#: : :py:class:`bytes`
#:     Given ASN.1 object encoded into BER octet stream
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error
#:     On encoding errors
#:
#: Examples
#: --------
#: Encode Python value into BER with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> encode([1, 2, 3], asn1Spec=seq)
#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
#:
#: Encode ASN.1 value object into BER
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> seq.extend([1, 2, 3])
#:    >>> encode(seq)
#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
#:
encode = Encoder()

949 

def __getattr__(attr: str):
    """Resolve deprecated module attribute names to their replacements.

    Emits a `DeprecationWarning` for a known old name and returns the
    module global it was renamed to; any other name raises
    `AttributeError`.
    """
    renamed = {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}
    newAttr = renamed.get(attr)

    if not newAttr:
        raise AttributeError(attr)

    warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning)
    return globals()[newAttr]