Coverage for /pythoncovmergedfiles/medio/medio/src/paramiko/paramiko/ber.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

87 statements  

1# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> 

2# 

3# This file is part of paramiko. 

4# 

5# Paramiko is free software; you can redistribute it and/or modify it under the 

6# terms of the GNU Lesser General Public License as published by the Free 

7# Software Foundation; either version 2.1 of the License, or (at your option) 

8# any later version. 

9# 

10# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 

11# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 

12# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

13# details. 

14# 

15# You should have received a copy of the GNU Lesser General Public License 

16# along with Paramiko; if not, write to the Free Software Foundation, Inc., 

17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

18from paramiko.common import max_byte, zero_byte, byte_ord, byte_chr 

19 

20import paramiko.util as util 

21from paramiko.util import b 

22from paramiko.sftp import int64 

23 

24 

class BERException(Exception):
    """Raised when a value cannot be BER-encoded or a record decoded."""

27 

28 

class BER:
    """
    Robey's tiny little attempt at a BER decoder.

    An instance wraps a byte buffer (``content``) and a read cursor
    (``idx``). Decoding consumes one tag-length-value record at a time
    starting at the cursor; encoding appends records to the buffer.
    """

    def __init__(self, content=bytes()):
        """
        Create a codec over ``content`` with the read cursor at offset 0.

        :param content:
            initial buffer; it is passed through ``b()`` before being stored.
        """
        self.content = b(content)
        self.idx = 0

    def asbytes(self):
        """Return the buffer held by this object."""
        return self.content

    def __bytes__(self):
        """Return the buffer, so ``bytes(ber)`` works."""
        return self.asbytes()

    def __str__(self):
        # ``__str__`` is required to return ``str``; returning the raw buffer
        # made ``str(ber)`` raise TypeError. The content is binary and has no
        # canonical text form, so fall back to the repr. Use ``asbytes()`` or
        # ``bytes(ber)`` to get the buffer itself.
        return repr(self)

    def __repr__(self):
        return "BER('" + repr(self.content) + "')"

    def decode(self):
        """Decode and return the next record; see `decode_next`."""
        return self.decode_next()

    def decode_next(self):
        """
        Decode the record at the read cursor and advance past it.

        :returns:
            a list for a sequence (identifier ``0x30``), an integer for
            identifier ``2``, or ``None`` when the buffer is exhausted or the
            remaining bytes are too short to hold a complete record. The two
            ``None`` cases are not distinguishable by the caller.
        :raises BERException:
            for any other identifier. The cursor has already been moved past
            the record when this is raised.
        """
        if self.idx >= len(self.content):
            return None
        ident = byte_ord(self.content[self.idx])
        self.idx += 1
        if (ident & 31) == 31:
            # All five low bits set: the identifier is > 30 and continues in
            # the following bytes, 7 bits per byte, high bit set on every
            # byte except the last. The bits of the first byte are dropped.
            ident = 0
            while self.idx < len(self.content):
                t = byte_ord(self.content[self.idx])
                self.idx += 1
                ident = (ident << 7) | (t & 0x7F)
                if not (t & 0x80):
                    break
        if self.idx >= len(self.content):
            return None
        # now fetch length
        size = byte_ord(self.content[self.idx])
        self.idx += 1
        if size & 0x80:
            # Long form: the low 7 bits give the number of bytes that hold
            # the actual length.
            # FIXME: theoretically should handle indefinite-length (0x80)
            t = size & 0x7F
            if self.idx + t > len(self.content):
                return None
            size = util.inflate_long(
                self.content[self.idx : self.idx + t], True
            )
            self.idx += t
        if self.idx + size > len(self.content):
            # can't fit
            return None
        data = self.content[self.idx : self.idx + size]
        self.idx += size
        # now switch on id
        if ident == 0x30:
            # sequence
            return self.decode_sequence(data)
        elif ident == 2:
            # int
            return util.inflate_long(data)
        else:
            # 1: boolean (00 false, otherwise true)
            msg = "Unknown ber encoding type {:d} (robey is lazy)"
            raise BERException(msg.format(ident))

    @staticmethod
    def decode_sequence(data):
        """
        Decode every record found in ``data``.

        :param data: the payload of a sequence record.
        :returns:
            a list of decoded values, in order. Decoding stops at the first
            point where `decode_next` returns ``None``.
        """
        out = []
        ber = BER(data)
        while True:
            x = ber.decode_next()
            if x is None:
                break
            out.append(x)
        return out

    def encode_tlv(self, ident, val):
        """
        Append one tag-length-value record to the buffer.

        :param ident: identifier, written as a single byte.
        :param val: payload; must be concatenable with the buffer.
        """
        # no need to support ident > 31 here
        self.content += byte_chr(ident)
        if len(val) > 0x7F:
            # Long form: a byte of 0x80 + N, then N bytes holding the length.
            lenstr = util.deflate_long(len(val))
            self.content += byte_chr(0x80 + len(lenstr)) + lenstr
        else:
            self.content += byte_chr(len(val))
        self.content += val

    def encode(self, x):
        """
        Append the encoding of ``x`` to the buffer.

        Supported values: ``bool`` (tag 1), integers (tag 2), ``str`` and
        ``bytes`` (tag 4; converted with ``b()`` first), and ``list`` /
        ``tuple`` (tag ``0x30``, each element encoded recursively).

        :raises BERException: if ``x`` is of any other type.
        """
        # bool has to be tested before int because it is a subclass of int.
        if isinstance(x, bool):
            self.encode_tlv(1, max_byte if x else zero_byte)
        elif isinstance(x, (int, int64)):
            self.encode_tlv(2, util.deflate_long(x))
        elif isinstance(x, (str, bytes)):
            # The buffer is bytes-like, so text must be converted before it
            # is appended; appending a ``str`` directly raised TypeError.
            self.encode_tlv(4, b(x))
        elif isinstance(x, (list, tuple)):
            self.encode_tlv(0x30, self.encode_sequence(x))
        else:
            raise BERException(
                "Unknown type for encoding: {!r}".format(type(x))
            )

    @staticmethod
    def encode_sequence(data):
        """
        Encode each item of ``data`` in order and return the combined buffer.

        :param data: iterable of values accepted by `encode`.
        :returns: the encoded records, without an enclosing sequence header.
        """
        ber = BER()
        for item in data:
            ber.encode(item)
        return ber.asbytes()