Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pyasn1/codec/ber/encoder.py: 15%

453 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:16 +0000

1# 

2# This file is part of pyasn1 software. 

3# 

4# Copyright (c) 2005-2019, Ilya Etingof <etingof@gmail.com> 

5# License: http://snmplabs.com/pyasn1/license.html 

6# 

7import sys 

8 

9from pyasn1 import debug 

10from pyasn1 import error 

11from pyasn1.codec.ber import eoo 

12from pyasn1.compat.integer import to_bytes 

13from pyasn1.compat.octets import (int2oct, oct2int, ints2octs, null, 

14 str2octs, isOctetsType) 

15from pyasn1.type import char 

16from pyasn1.type import tag 

17from pyasn1.type import univ 

18from pyasn1.type import useful 

19 

# Only the `encode` callable is exported by `from ... import *`.
__all__ = ['encode']

# Module-level debug hook registered for the encoder flag; every call site
# in this module tests `if LOG:` before formatting a message.
LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_ENCODER)

23 

24 

class AbstractItemEncoder(object):
    """Shared tag/length/value assembly for the concrete encoders.

    Subclasses provide `encodeValue()`, which returns a 3-tuple of
    `(substrate, isConstructed, isOctets)`; `encode()` then prepends the
    tag and length octets for every tag in the item's tag set.
    """
    supportIndefLenMode = True

    # An outcome of otherwise legit call `encodeFun(eoo.endOfOctets)`
    eooIntegerSubstrate = (0, 0)
    eooOctetsSubstrate = ints2octs(eooIntegerSubstrate)

    # noinspection PyMethodMayBeStatic
    def encodeTag(self, singleTag, isConstructed):
        """Return the identifier octets for `singleTag` as a tuple of ints.

        Tag IDs below 31 fit into the leading octet; larger IDs set the
        low five bits of the leading octet and continue in 7-bit groups,
        all but the last flagged with 0x80.
        """
        tagClass, tagFormat, tagId = singleTag

        leadOctet = tagClass | tagFormat
        if isConstructed:
            leadOctet |= tag.tagFormatConstructed

        if tagId < 31:
            return (leadOctet | tagId,)

        # least significant 7-bit group goes last and carries no 0x80 flag
        septets = [tagId & 0x7f]
        tagId >>= 7

        while tagId:
            septets.insert(0, 0x80 | (tagId & 0x7f))
            tagId >>= 7

        return (leadOctet | 0x1F,) + tuple(septets)

    def encodeLength(self, length, defMode):
        """Return the length octets for `length` as a tuple of ints.

        Yields the single octet 0x80 when indefinite mode is requested
        (`defMode` false) and this encoder supports it.

        Raises:
            error.PyAsn1Error: if more than 126 octets would be needed
                to hold `length`.
        """
        if not defMode and self.supportIndefLenMode:
            return (0x80,)

        if length < 0x80:
            return (length,)

        lengthOctets = []
        while length:
            lengthOctets.insert(0, length & 0xff)
            length >>= 8

        octetCount = len(lengthOctets)

        if octetCount > 126:
            raise error.PyAsn1Error('Length octets overflow (%d)' % octetCount)

        return (0x80 | octetCount,) + tuple(lengthOctets)

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Produce the payload; concrete encoders must override this."""
        raise error.PyAsn1Error('Not implemented')

    def encode(self, value, asn1Spec=None, encodeFun=None, **options):
        """Encode `value` and wrap it with a header for each of its tags.

        The tag set is read from `asn1Spec` when given, otherwise from
        `value`.  An item with an empty tag set is returned as the bare
        payload.  Recognized options: `defMode` (default True) and
        `ifNotEmpty` (default False).

        Raises:
            error.PyAsn1Error: if `encodeValue()` raises one; the message
                is prefixed with the offending value.
        """
        if asn1Spec is None:
            tagSet = value.tagSet
        else:
            tagSet = asn1Spec.tagSet

        # untagged item?
        if not tagSet:
            payload, isConstructed, isOctets = self.encodeValue(
                value, asn1Spec, encodeFun, **options
            )
            return payload

        requestedDefMode = options.get('defMode', True)

        substrate = null

        for tagIndex, singleTag in enumerate(tagSet.superTags):

            useDefMode = requestedDefMode

            # base tag?
            if tagIndex == 0:
                try:
                    substrate, isConstructed, isOctets = self.encodeValue(
                        value, asn1Spec, encodeFun, **options
                    )

                except error.PyAsn1Error:
                    excInfo = sys.exc_info()
                    raise error.PyAsn1Error(
                        'Error encoding %r: %s' % (value, excInfo[1]))

                if LOG:
                    LOG('encoded %svalue %s into %s' % (
                        isConstructed and 'constructed ' or '', value, substrate
                    ))

                # an empty constructed payload is dropped entirely on request
                if not substrate and isConstructed and options.get('ifNotEmpty', False):
                    return substrate

                if not isConstructed:
                    useDefMode = True

                    if LOG:
                        LOG('overridden encoding mode into definitive for primitive type')

            tlvHeader = self.encodeTag(singleTag, isConstructed)

            if LOG:
                LOG('encoded %stag %s into %s' % (
                    isConstructed and 'constructed ' or '',
                    singleTag, debug.hexdump(ints2octs(tlvHeader))))

            tlvHeader += self.encodeLength(len(substrate), useDefMode)

            if LOG:
                LOG('encoded %s octets (tag + payload) into %s' % (
                    len(substrate), debug.hexdump(ints2octs(tlvHeader))))

            # the payload is either octets or a tuple of ints; header and
            # end-of-octets marker must be appended in the matching form
            if isOctets:
                substrate = ints2octs(tlvHeader) + substrate

                if not useDefMode:
                    substrate += self.eooOctetsSubstrate

            else:
                substrate = tlvHeader + substrate

                if not useDefMode:
                    substrate += self.eooIntegerSubstrate

        if not isOctets:
            substrate = ints2octs(substrate)

        return substrate

153 

154 

class EndOfOctetsEncoder(AbstractItemEncoder):
    """Encoder for the end-of-octets marker, which has an empty payload."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        # (substrate, isConstructed, isOctets)
        emptyPayload = null
        return emptyPayload, False, True

158 

159 

class BooleanEncoder(AbstractItemEncoder):
    """Encoder producing a single content octet: 1 for true, 0 for false."""
    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if value:
            payload = (1,)
        else:
            payload = (0,)

        # integer-tuple payload, primitive
        return payload, False, False

165 

166 

class IntegerEncoder(AbstractItemEncoder):
    """Encoder for integer-valued items.

    Zero is special-cased: it is emitted as one zero octet, or as an
    empty payload when `supportCompactZero` is set.
    """
    supportIndefLenMode = False
    supportCompactZero = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if value == 0:
            if LOG:
                LOG('encoding %spayload for zero INTEGER' % (
                    self.supportCompactZero and 'no ' or ''
                ))

            # de-facto way to encode zero
            if self.supportCompactZero:
                zeroPayload = ()
            else:
                zeroPayload = (0,)

            return zeroPayload, False, False

        payload = to_bytes(int(value), signed=True)
        return payload, False, True

185 

186 

class BitStringEncoder(AbstractItemEncoder):
    """Encoder for bit strings, with optional chunked (constructed) output.

    The value is left-shifted to a whole number of octets and prefixed
    with an octet holding the count of padding bits.  When `maxChunkSize`
    (in octets) is set and exceeded, the value is encoded as a series of
    inner chunks via `encodeFun`.
    """

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            # TODO: try to avoid ASN.1 schema instantiation
            value = asn1Spec.clone(value)

        bitCount = len(value)
        trailingBits = bitCount % 8
        if trailingBits:
            alignedValue = value << (8 - trailingBits)
        else:
            alignedValue = value

        maxChunkSize = options.get('maxChunkSize', 0)
        if not maxChunkSize or len(alignedValue) <= maxChunkSize * 8:
            octets = alignedValue.asOctets()
            padBits = len(octets) * 8 - bitCount
            return int2oct(padBits) + octets, False, True

        if LOG:
            LOG('encoding into up to %s-octet chunks' % maxChunkSize)

        baseTag = value.tagSet.baseTag

        # strip off explicit tags
        if baseTag:
            innerTagSet = tag.TagSet(baseTag, baseTag)

        else:
            innerTagSet = tag.TagSet()

        alignedValue = alignedValue.clone(tagSet=innerTagSet)

        chunkBits = maxChunkSize * 8
        encodedChunks = []
        stop = 0
        while stop < bitCount:
            start = stop
            stop = min(start + chunkBits, bitCount)
            encodedChunks.append(
                encodeFun(alignedValue[start:stop], asn1Spec, **options))

        return null.join(encodedChunks), True, True

226 

227 

class OctetStringEncoder(AbstractItemEncoder):
    """Encoder for octet-string-like items, with optional chunking.

    Accepts an ASN.1 value object (no `asn1Spec`), or a Python value plus
    `asn1Spec`.  When `maxChunkSize` is set and the payload is longer,
    the value is sliced and each slice is encoded through `encodeFun`.
    """

    def encodeValue(self, value, asn1Spec, encodeFun, **options):

        if asn1Spec is None:
            octets = value.asOctets()

        elif not isOctetsType(value):
            octets = asn1Spec.clone(value).asOctets()

        else:
            octets = value

        maxChunkSize = options.get('maxChunkSize', 0)

        if not maxChunkSize or len(octets) <= maxChunkSize:
            return octets, False, True

        if LOG:
            LOG('encoding into up to %s-octet chunks' % maxChunkSize)

        # strip off explicit tags for inner chunks: pick the object whose
        # tag set describes the chunks; raw octets given together with a
        # spec keep that spec untouched
        if asn1Spec is None:
            taggedObject = value

        elif not isOctetsType(value):
            taggedObject = asn1Spec

        else:
            taggedObject = None

        if taggedObject is not None:
            baseTag = taggedObject.tagSet.baseTag

            # strip off explicit tags
            if baseTag:
                innerTagSet = tag.TagSet(baseTag, baseTag)

            else:
                innerTagSet = tag.TagSet()

            asn1Spec = taggedObject.clone(tagSet=innerTagSet)

        encodedChunks = []
        offset = 0

        while True:
            chunk = value[offset:offset + maxChunkSize]
            if not chunk:
                break

            encodedChunks.append(encodeFun(chunk, asn1Spec, **options))
            offset += maxChunkSize

        return null.join(encodedChunks), True, True

287 

288 

class NullEncoder(AbstractItemEncoder):
    """Encoder for NULL: always an empty, primitive octets payload."""
    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        emptyPayload = null
        return emptyPayload, False, True

294 

295 

class ObjectIdentifierEncoder(AbstractItemEncoder):
    """Encoder for object identifiers.

    The first two arcs are folded into one sub-identifier; every
    sub-identifier is then written in 7-bit groups, all but the last
    flagged with 0x80.
    """
    supportIndefLenMode = False

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        oid = value.asTuple()

        # Build the first pair
        try:
            first = oid[0]
            second = oid[1]

        except IndexError:
            raise error.PyAsn1Error('Short OID %s' % (value,))

        secondInRange = 0 <= second <= 39

        if first == 2:
            # under arc 2 the second arc is not limited to 0..39
            oid = (second + 80,) + oid[2:]

        elif secondInRange and first == 1:
            oid = (second + 40,) + oid[2:]

        elif secondInRange and first == 0:
            oid = (second,) + oid[2:]

        else:
            raise error.PyAsn1Error('Impossible first/second arcs at %s' % (value,))

        encoded = []

        # Cycle through subIds
        for subOid in oid:
            if 0 <= subOid <= 127:
                # Optimize for the common case
                encoded.append(subOid)

            elif subOid > 127:
                # Pack large Sub-Object IDs
                septets = [subOid & 0x7f]
                subOid >>= 7

                while subOid:
                    septets.insert(0, 0x80 | (subOid & 0x7f))
                    subOid >>= 7

                # Add packed Sub-Object ID to resulted Object ID
                encoded.extend(septets)

            else:
                raise error.PyAsn1Error('Negative OID arc %s at %s' % (subOid, value))

        return tuple(encoded), False, False

353 

354 

class RealEncoder(AbstractItemEncoder):
    """Encoder for REAL values given as (mantissa, base, exponent).

    Base-10 values are written in character form; base-2 values are
    written in binary form using encoding base 2, 8 or 16.
    """
    supportIndefLenMode = 0
    binEncBase = 2  # set to None to choose encoding base automatically

    @staticmethod
    def _dropFloatingPoint(m, encbase, e):
        """Rescale (m, e) to `encbase` until the mantissa is integral.

        Returns `(mantissaSign, abs_mantissa_as_int, encbase, exponent)`.
        """
        mantissaSign = 1
        exponentSign = 1

        if m < 0:
            mantissaSign = -1

        if e < 0:
            exponentSign = -1

        m *= mantissaSign

        # convert the power-of-two exponent into a power of the target
        # base, moving the remainder into the mantissa
        if encbase == 8:
            m *= 2 ** (abs(e) % 3 * exponentSign)
            e = abs(e) // 3 * exponentSign

        elif encbase == 16:
            m *= 2 ** (abs(e) % 4 * exponentSign)
            e = abs(e) // 4 * exponentSign

        while int(m) != m:
            m *= encbase
            e -= 1

        return mantissaSign, int(m), encbase, e

    def _chooseEncBase(self, value):
        """Pick the binary encoding base and return the rescaled parts.

        `value.binEncBase` wins if it is 2, 8 or 16, then the class-level
        `binEncBase`; otherwise all three bases are tried and the one with
        the smallest absolute exponent (ties: smaller mantissa) is kept.
        """
        m, b, e = value
        encBase = [2, 8, 16]

        if value.binEncBase in encBase:
            return self._dropFloatingPoint(m, value.binEncBase, e)

        elif self.binEncBase in encBase:
            return self._dropFloatingPoint(m, self.binEncBase, e)

        # auto choosing base 2/8/16
        sourceMantissa, sourceExponent = m, e
        sign = 1
        encbase = 2
        e = float('inf')

        for candidateBase in encBase:
            (sign,
             candidateMantissa,
             candidateBase,
             candidateExponent) = self._dropFloatingPoint(
                sourceMantissa, candidateBase, sourceExponent)

            smallerExponent = abs(candidateExponent) < abs(e)
            if smallerExponent or (abs(candidateExponent) == abs(e) and candidateMantissa < m):
                e = candidateExponent
                m = int(candidateMantissa)
                encbase = candidateBase

        if LOG:
            LOG('automatically chosen REAL encoding base %s, sign %s, mantissa %s, '
                'exponent %s' % (encbase, sign, m, e))

        return sign, m, encbase, e

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the REAL payload as `(substrate, False, isOctets)`.

        Raises:
            error.PyAsn1Error: for a base other than 2 or 10, or when the
                exponent needs more than 255 octets.
        """
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        if value.isPlusInf:
            return (0x40,), False, False

        if value.isMinusInf:
            return (0x41,), False, False

        m, b, e = value

        # zero mantissa is encoded as an empty payload
        if not m:
            return null, False, True

        if b == 10:
            if LOG:
                LOG('encoding REAL into character form')

            return str2octs('\x03%dE%s%d' % (m, e == 0 and '+' or '', e)), False, True

        if b != 2:
            raise error.PyAsn1Error('Prohibited Real base %s' % b)

        firstOctet = 0x80  # binary encoding
        mantissaSign, m, encbase, e = self._chooseEncBase(value)

        if mantissaSign < 0:
            firstOctet |= 0x40  # sign bit

        # exponent & mantissa normalization: strip whole zero digits of
        # the encoding base from the mantissa into the exponent
        if encbase == 2:
            digitMask, digitBits, baseFlag = 0x1, 1, 0

        elif encbase == 8:
            digitMask, digitBits, baseFlag = 0x7, 3, 0x10

        else:  # encbase = 16
            digitMask, digitBits, baseFlag = 0xf, 4, 0x20

        while m & digitMask == 0:
            m >>= digitBits
            e += 1

        firstOctet |= baseFlag

        scaleFactor = 0

        while m & 0x1 == 0:
            m >>= 1
            scaleFactor += 1

        if scaleFactor > 3:
            raise error.PyAsn1Error('Scale factor overflow')  # bug if raised

        firstOctet |= scaleFactor << 2

        # two's complement exponent, most significant octet first
        exponentOctets = null
        if e in (0, -1):
            exponentOctets = int2oct(e & 0xff)

        else:
            while e not in (0, -1):
                exponentOctets = int2oct(e & 0xff) + exponentOctets
                e >>= 8

            # add a sign octet when the top bit would misstate the sign
            if e == 0 and exponentOctets and oct2int(exponentOctets[0]) & 0x80:
                exponentOctets = int2oct(0) + exponentOctets

            if e == -1 and exponentOctets and not (oct2int(exponentOctets[0]) & 0x80):
                exponentOctets = int2oct(0xff) + exponentOctets

        exponentLength = len(exponentOctets)
        if exponentLength > 0xff:
            raise error.PyAsn1Error('Real exponent overflow')

        if exponentLength in (1, 2, 3):
            # low two bits of the first octet hold (length - 1)
            firstOctet |= exponentLength - 1

        else:
            # long form: explicit exponent length octet follows
            firstOctet |= 3
            exponentOctets = int2oct(exponentLength & 0xff) + exponentOctets

        mantissaOctets = []

        while m:
            mantissaOctets.insert(0, int2oct(m & 0xff))
            m >>= 8

        substrate = int2oct(firstOctet) + exponentOctets + null.join(mantissaOctets)

        return substrate, False, True

520 

521 

class SequenceEncoder(AbstractItemEncoder):
    """Encoder for record-like items (the `typeMap` maps Sequence and Set here).

    Two input flavors are handled: an ASN.1 value object (no `asn1Spec`)
    or a mapping-like Python value described by `asn1Spec`.  Absent
    OPTIONAL components and components equal to their DEFAULT are not
    encoded.
    """
    omitEmptyOptionals = False

    # TODO: handling three flavors of input is too much -- split over codecs

    def _encodeOpenType(self, component, asn1Spec, wrapType, encodeFun, options):
        """Encode an open type component, wrapping it in `wrapType` if needed.

        For SetOf/SequenceOf wrappers the element wrapper is handed down
        through the `wrapType` option; otherwise the encoded component is
        wrapped here unless it already is of the wrapper type.
        """
        if wrapType.typeId in (
                univ.SetOf.typeId, univ.SequenceOf.typeId):

            return encodeFun(
                component, asn1Spec,
                **dict(options, wrapType=wrapType.componentType))

        chunk = encodeFun(component, asn1Spec, **options)

        if wrapType.isSameTypeWith(component):
            return chunk

        wrapped = encodeFun(chunk, wrapType, **options)

        if LOG:
            LOG('wrapped with wrap type %r' % (wrapType,))

        return wrapped

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the concatenated component encodings as `(substrate, True, True)`.

        Raises:
            error.PyAsn1Error: if, with `asn1Spec` given, a non-OPTIONAL
                component name is missing from `value`.
            Exception: whatever `value.isInconsistent` holds when it is
                truthy (schema-less flavor).
        """

        substrate = null

        omitEmptyOptionals = options.get(
            'omitEmptyOptionals', self.omitEmptyOptionals)

        if LOG:
            LOG('%sencoding empty OPTIONAL components' % (
                omitEmptyOptionals and 'not ' or ''))

        if asn1Spec is None:
            # instance of ASN.1 schema
            inconsistency = value.isInconsistent
            if inconsistency:
                raise inconsistency

            namedTypes = value.componentType

            for idx, component in enumerate(value.values()):
                if namedTypes:
                    namedType = namedTypes[idx]

                    if namedType.isOptional and not component.isValue:
                        if LOG:
                            LOG('not encoding OPTIONAL component %r' % (namedType,))
                        continue

                    if namedType.isDefaulted and component == namedType.asn1Object:
                        if LOG:
                            LOG('not encoding DEFAULT component %r' % (namedType,))
                        continue

                    if omitEmptyOptionals:
                        options.update(ifNotEmpty=namedType.isOptional)

                # wrap open type blob if needed
                if namedTypes and namedType.openType:
                    substrate += self._encodeOpenType(
                        component, asn1Spec, namedType.asn1Object,
                        encodeFun, options)

                else:
                    substrate += encodeFun(component, asn1Spec, **options)

        else:
            # bare Python value + ASN.1 schema
            for namedType in asn1Spec.componentType.namedTypes:

                try:
                    component = value[namedType.name]

                except KeyError:
                    # An absent OPTIONAL component is legitimate and is
                    # simply left out; only a mandatory one is an error.
                    if namedType.isOptional:
                        if LOG:
                            LOG('not encoding OPTIONAL component %r' % (namedType,))
                        continue

                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (
                        namedType.name, value))

                # covers containers whose item lookup succeeds for names
                # they do not report as contained
                if namedType.isOptional and namedType.name not in value:
                    if LOG:
                        LOG('not encoding OPTIONAL component %r' % (namedType,))
                    continue

                if namedType.isDefaulted and component == namedType.asn1Object:
                    if LOG:
                        LOG('not encoding DEFAULT component %r' % (namedType,))
                    continue

                if omitEmptyOptionals:
                    options.update(ifNotEmpty=namedType.isOptional)

                componentSpec = namedType.asn1Object

                # wrap open type blob if needed
                if namedType.openType:
                    substrate += self._encodeOpenType(
                        component, componentSpec, componentSpec,
                        encodeFun, options)

                else:
                    substrate += encodeFun(component, componentSpec, **options)

        return substrate, True, True

642 

643 

class SequenceOfEncoder(AbstractItemEncoder):
    """Encoder for homogeneous collections (SequenceOf / SetOf)."""

    def _encodeComponents(self, value, asn1Spec, encodeFun, **options):
        """Encode every element of `value` and return the list of chunks.

        An optional `wrapType` option is consumed here: elements that are
        not already of that type get their encoding wrapped in it.
        """
        if asn1Spec is None:
            inconsistency = value.isInconsistent
            if inconsistency:
                raise inconsistency

        else:
            # elements are described by the collection's component type
            asn1Spec = asn1Spec.componentType

        # removed from options so it is not forwarded to element encoders
        wrapType = options.pop('wrapType', None)

        encodedItems = []

        for component in value:
            encoded = encodeFun(component, asn1Spec, **options)

            needsWrapping = (wrapType is not None and
                             not wrapType.isSameTypeWith(component))

            if needsWrapping:
                # wrap encoded value with wrapper container (e.g. ANY)
                encoded = encodeFun(encoded, wrapType, **options)

                if LOG:
                    LOG('wrapped with wrap type %r' % (wrapType,))

            encodedItems.append(encoded)

        return encodedItems

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        encodedItems = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        return null.join(encodedItems), True, True

679 

680 

class ChoiceEncoder(AbstractItemEncoder):
    """Encoder for CHOICE: encodes the single alternative that is present."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is None:
            component = value.getComponent()

        else:
            # with a schema, exactly one alternative name must be in `value`
            presentNames = []
            for namedType in asn1Spec.componentType.namedTypes:
                if namedType.name in value:
                    presentNames.append(namedType.name)

            if len(presentNames) != 1:
                raise error.PyAsn1Error('%s components for Choice at %r' % (len(presentNames) and 'Multiple ' or 'None ', value))

            chosenName = presentNames[0]

            component = value[chosenName]
            asn1Spec = asn1Spec[chosenName]

        encoded = encodeFun(component, asn1Spec, **options)
        return encoded, True, True

697 

698 

class AnyEncoder(OctetStringEncoder):
    """Encoder for ANY: passes the contained octets through unchanged.

    The payload is reported as constructed exactly when indefinite
    length mode (`defMode` false) was requested.
    """

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        if asn1Spec is None:
            octets = value.asOctets()

        elif not isOctetsType(value):
            octets = asn1Spec.clone(value).asOctets()

        else:
            octets = value

        isConstructed = not options.get('defMode', True)
        return octets, isConstructed, True

707 

708 

# Encoder lookup by tag set.  Built from (ASN.1 item, encoder class)
# pairs; every entry gets its own freshly created encoder instance.
tagMap = dict(
    (asn1Item.tagSet, encoderClass())
    for asn1Item, encoderClass in (
        (eoo.endOfOctets, EndOfOctetsEncoder),
        (univ.Boolean, BooleanEncoder),
        (univ.Integer, IntegerEncoder),
        (univ.BitString, BitStringEncoder),
        (univ.OctetString, OctetStringEncoder),
        (univ.Null, NullEncoder),
        (univ.ObjectIdentifier, ObjectIdentifierEncoder),
        (univ.Enumerated, IntegerEncoder),
        (univ.Real, RealEncoder),
        # Sequence & Set have same tags as SequenceOf & SetOf
        (univ.SequenceOf, SequenceOfEncoder),
        (univ.SetOf, SequenceOfEncoder),
        (univ.Choice, ChoiceEncoder),
        # character string types
        (char.UTF8String, OctetStringEncoder),
        (char.NumericString, OctetStringEncoder),
        (char.PrintableString, OctetStringEncoder),
        (char.TeletexString, OctetStringEncoder),
        (char.VideotexString, OctetStringEncoder),
        (char.IA5String, OctetStringEncoder),
        (char.GraphicString, OctetStringEncoder),
        (char.VisibleString, OctetStringEncoder),
        (char.GeneralString, OctetStringEncoder),
        (char.UniversalString, OctetStringEncoder),
        (char.BMPString, OctetStringEncoder),
        # useful types
        (useful.ObjectDescriptor, OctetStringEncoder),
        (useful.GeneralizedTime, OctetStringEncoder),
        (useful.UTCTime, OctetStringEncoder),
    )
)

740 

# Put in ambiguous & non-ambiguous types for faster codec lookup.
# Built from (ASN.1 type, encoder class) pairs; every entry gets its own
# freshly created encoder instance.
typeMap = dict(
    (asn1Type.typeId, encoderClass())
    for asn1Type, encoderClass in (
        (univ.Boolean, BooleanEncoder),
        (univ.Integer, IntegerEncoder),
        (univ.BitString, BitStringEncoder),
        (univ.OctetString, OctetStringEncoder),
        (univ.Null, NullEncoder),
        (univ.ObjectIdentifier, ObjectIdentifierEncoder),
        (univ.Enumerated, IntegerEncoder),
        (univ.Real, RealEncoder),
        # Sequence & Set have same tags as SequenceOf & SetOf
        (univ.Set, SequenceEncoder),
        (univ.SetOf, SequenceOfEncoder),
        (univ.Sequence, SequenceEncoder),
        (univ.SequenceOf, SequenceOfEncoder),
        (univ.Choice, ChoiceEncoder),
        (univ.Any, AnyEncoder),
        # character string types
        (char.UTF8String, OctetStringEncoder),
        (char.NumericString, OctetStringEncoder),
        (char.PrintableString, OctetStringEncoder),
        (char.TeletexString, OctetStringEncoder),
        (char.VideotexString, OctetStringEncoder),
        (char.IA5String, OctetStringEncoder),
        (char.GraphicString, OctetStringEncoder),
        (char.VisibleString, OctetStringEncoder),
        (char.GeneralString, OctetStringEncoder),
        (char.UniversalString, OctetStringEncoder),
        (char.BMPString, OctetStringEncoder),
        # useful types
        (useful.ObjectDescriptor, OctetStringEncoder),
        (useful.GeneralizedTime, OctetStringEncoder),
        (useful.UTCTime, OctetStringEncoder),
    )
)

775 

776 

class Encoder(object):
    """Callable front end that picks a concrete encoder and runs it.

    The encoder is looked up by type ID in `typeMap` first and, failing
    that, by base tag in `tagMap`.  Subclasses may pin the length mode or
    chunk size through `fixedDefLengthMode` / `fixedChunkSize`; when not
    None these override the caller's `defMode` / `maxChunkSize` options.
    """
    fixedDefLengthMode = None
    fixedChunkSize = None

    def __init__(self, tagMap, typeMap=None):
        """Store the lookup tables.

        `typeMap` defaults to a fresh empty dict per instance rather than
        one dict object shared by every instance created without it.
        """
        if typeMap is None:
            typeMap = {}

        self.__tagMap = tagMap
        self.__typeMap = typeMap

    def __call__(self, value, asn1Spec=None, **options):
        """Encode `value` (optionally guided by `asn1Spec`) into octets.

        Raises:
            error.PyAsn1Error: if `value` has no `typeId` and no
                `asn1Spec` is given, or if no encoder is registered for
                the item's type ID or base tag.
        """
        try:
            if asn1Spec is None:
                typeId = value.typeId
            else:
                typeId = asn1Spec.typeId

        except AttributeError:
            raise error.PyAsn1Error('Value %r is not ASN.1 type instance '
                                    'and "asn1Spec" not given' % (value,))

        if LOG:
            # explicit branch: an `and/or` chain would fall through to
            # `asn1Spec.prettyPrintType()` on an empty description, with
            # `asn1Spec` being None at that point
            if asn1Spec is None:
                prettyType = value.prettyPrintType()
            else:
                prettyType = asn1Spec.prettyPrintType()

            LOG('encoder called in %sdef mode, chunk size %s for '
                'type %s, value:\n%s' % (not options.get('defMode', True) and 'in' or '', options.get('maxChunkSize', 0), prettyType, value))

        if self.fixedDefLengthMode is not None:
            options.update(defMode=self.fixedDefLengthMode)

        if self.fixedChunkSize is not None:
            options.update(maxChunkSize=self.fixedChunkSize)

        try:
            concreteEncoder = self.__typeMap[typeId]

            if LOG:
                LOG('using value codec %s chosen by type ID %s' % (concreteEncoder.__class__.__name__, typeId))

        except KeyError:
            if asn1Spec is None:
                tagSet = value.tagSet
            else:
                tagSet = asn1Spec.tagSet

            # use base type for codec lookup to recover untagged types
            baseTagSet = tag.TagSet(tagSet.baseTag, tagSet.baseTag)

            try:
                concreteEncoder = self.__tagMap[baseTagSet]

            except KeyError:
                raise error.PyAsn1Error('No encoder for %r (%s)' % (value, tagSet))

            if LOG:
                LOG('using value codec %s chosen by tagSet %s' % (concreteEncoder.__class__.__name__, tagSet))

        # the Encoder itself is passed down as `encodeFun` so that
        # constructed types can encode their components recursively
        substrate = concreteEncoder.encode(value, asn1Spec, self, **options)

        if LOG:
            LOG('codec %s built %s octets of substrate: %s\nencoder completed' % (concreteEncoder, len(substrate), debug.hexdump(substrate)))

        return substrate

838 

#: Turns ASN.1 object into BER octet stream.
#:
#: Takes any ASN.1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative),
#: walks all its components recursively and produces a BER octet stream.
#:
#: Parameters
#: ----------
#: value: either a Python or pyasn1 object (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative)
#:     A Python or pyasn1 object to encode. If Python object is given, `asn1Spec`
#:     parameter is required to guide the encoding process.
#:
#: Keyword Args
#: ------------
#: asn1Spec:
#:     Optional ASN.1 schema or value object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative
#:
#: defMode: :py:class:`bool`
#:     If :obj:`False`, produces indefinite length encoding
#:
#: maxChunkSize: :py:class:`int`
#:     Maximum chunk size in chunked encoding mode (0 denotes unlimited chunk size)
#:
#: Returns
#: -------
#: : :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2)
#:     Given ASN.1 object encoded into BER octetstream
#:
#: Raises
#: ------
#: ~pyasn1.error.PyAsn1Error
#:     On encoding errors
#:
#: Examples
#: --------
#: Encode Python value into BER with ASN.1 schema
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> encode([1, 2, 3], asn1Spec=seq)
#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
#:
#: Encode ASN.1 value object into BER
#:
#: .. code-block:: pycon
#:
#:    >>> seq = SequenceOf(componentType=Integer())
#:    >>> seq.extend([1, 2, 3])
#:    >>> encode(seq)
#:    b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03'
#:
encode = Encoder(tagMap, typeMap)