# Copyright The Cloud Custodian Authors.
# SPDX-License-Identifier: Apache-2.0
"""
Python standard logging integration with CloudWatch Logs.

Double buffered, with background thread delivery.

We do an initial buffering on the log handler directly, to avoid
some of the overhead of pushing to the queue (of dubious value, as
std logging acquires a lock around handler emit by default). A single
background thread handles all outbound delivery, using a separate
session.
"""
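
# Example usage (an illustrative sketch, not part of this module): attach
# the handler to a logger with a session factory that can build a 'logs'
# client. The log group name and the boto3.Session factory are assumptions.
#
#   import logging
#   import boto3
#
#   handler = CloudWatchLogHandler(
#       log_group='custodian-example', session_factory=boto3.Session)
#   log = logging.getLogger('custodian.example')
#   log.addHandler(handler)
#   log.warning('hello')   # buffered; a background thread delivers it
#   handler.flush()        # block until the queue is drained
#   handler.close()        # stop the transport thread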

from c7n.exceptions import ClientError

import itertools
import logging
from operator import itemgetter
import threading
import time

try:
    import Queue
except ImportError:  # pragma: no cover
    import queue as Queue

from c7n.utils import get_retry

FLUSH_MARKER = object()
SHUTDOWN_MARKER = object()

EMPTY = Queue.Empty


class Error:

    AlreadyAccepted = "DataAlreadyAcceptedException"
    InvalidToken = "InvalidSequenceTokenException"
    ResourceExists = "ResourceAlreadyExistsException"

    @staticmethod
    def code(e):
        return e.response.get('Error', {}).get('Code')


class CloudWatchLogHandler(logging.Handler):
    """Python log handler that sends records to CloudWatch Logs.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html
    """

    batch_size = 20
    batch_interval = 40  # seconds; also used as the queue poll timeout
    batch_min_buffer = 10

    def __init__(self, log_group=__name__, log_stream=None,
                 session_factory=None):
        super(CloudWatchLogHandler, self).__init__()
        self.log_group = log_group
        self.log_stream = log_stream
        self.session_factory = session_factory
        self.transport = None
        self.queue = Queue.Queue()
        self.threads = []
        # Do some basic buffering before sending to the transport to
        # minimize queue/threading overhead.
        self.buf = []
        self.last_seen = time.time()
        # The logging module internally tracks all handlers for final
        # cleanup atexit; custodian scopes shutdown more explicitly to
        # each policy, so use a sentinel value to avoid deadlocks.
        self.shutdown = True
        retry = get_retry(('ThrottlingException',))
        try:
            client = self.session_factory().client('logs')
            logs = retry(
                client.describe_log_groups,
                logGroupNamePrefix=self.log_group)['logGroups']
            if not [lg for lg in logs if lg['logGroupName'] == self.log_group]:
                retry(client.create_log_group,
                      logGroupName=self.log_group)
        except ClientError as e:
            if Error.code(e) != Error.ResourceExists:
                raise

    # Begin logging.Handler API
    def emit(self, message):
        """Send logs"""
        # We're sending messages asynchronously; bubble up to the caller
        # when we've detected an error on the message. This isn't ideal,
        # but once we've gone async without a deferred/promise there are
        # few better options.
        if self.transport and self.transport.error:
            raise self.transport.error

        # Sanity safety, people do like to recurse by attaching to
        # root log :-(
        if message.name.startswith('boto'):
            return

        msg = self.format_message(message)
        if not self.transport:
            self.shutdown = False
            self.start_transports()
        self.buf.append(msg)
        self.flush_buffers(
            (message.created - self.last_seen >= self.batch_interval))

        self.last_seen = message.created

    def flush(self):
        """Ensure all logging output has been flushed."""
        if self.shutdown:
            return
        self.flush_buffers(force=True)
        self.queue.put(FLUSH_MARKER)
        self.queue.join()

    def close(self):
        if self.shutdown:
            return
        self.shutdown = True
        self.queue.put(SHUTDOWN_MARKER)
        self.queue.join()
        for t in self.threads:
            t.join()
        self.threads = []

    # End logging.Handler API

    def format_message(self, msg):
        """Format a log record into a CloudWatch Logs event dict."""
        return {'timestamp': int(msg.created * 1000),
                'message': self.format(msg),
                'stream': self.log_stream or msg.name,
                'group': self.log_group}
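
    # For illustration (values assumed): a 'hello' record logged to the
    # 'custodian.policy' logger, with no explicit log_stream configured,
    # becomes an event dict like:
    #   {'timestamp': 1702018260000,
    #    'message': 'hello',
    #    'stream': 'custodian.policy',
    #    'group': 'custodian'}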

    def start_transports(self):
        """Start the background transport thread."""
        self.transport = Transport(
            self.queue, self.batch_size, self.batch_interval,
            self.session_factory)
        thread = threading.Thread(target=self.transport.loop)
        self.threads.append(thread)
        thread.daemon = True
        thread.start()

    def flush_buffers(self, force=False):
        if not force and len(self.buf) < self.batch_min_buffer:
            return
        self.queue.put(self.buf)
        self.buf = []


class Transport:

    def __init__(self, queue, batch_size, batch_interval, session_factory):
        self.queue = queue
        self.batch_size = batch_size
        self.batch_interval = batch_interval
        self.client = session_factory().client('logs')
        self.sequences = {}
        self.buffers = {}
        self.error = None

    def create_stream(self, group, stream):
        try:
            self.client.create_log_stream(
                logGroupName=group, logStreamName=stream)
        except ClientError as e:
            if Error.code(e) != Error.ResourceExists:
                self.error = e
                return False
        return True

    def send(self):
        for k, messages in self.buffers.items():
            self.send_group(k, messages)
        self.buffers = {}

    def send_group(self, k, messages):
        group, stream = k.split('=', 1)
        if stream not in self.sequences:
            if not self.create_stream(group, stream):
                return
            self.sequences[stream] = None
        params = dict(
            logGroupName=group, logStreamName=stream,
            logEvents=sorted(
                messages, key=itemgetter('timestamp'), reverse=False))
        if self.sequences[stream]:
            params['sequenceToken'] = self.sequences[stream]
        try:
            response = self.client.put_log_events(**params)
        except ClientError as e:
            if Error.code(e) in (Error.AlreadyAccepted, Error.InvalidToken):
                # The service error message ends with the expected sequence
                # token; pull it off and retry with the corrected token.
                self.sequences[stream] = e.response['Error']['Message'].rsplit(
                    " ", 1)[-1]
                return self.send_group(k, messages)
            self.error = e
            return
        self.sequences[stream] = response['nextSequenceToken']

    def loop(self):
        def keyed(datum):
            return "%s=%s" % (
                datum.pop('group'), datum.pop('stream'))

        while True:
            try:
                datum = self.queue.get(block=True, timeout=self.batch_interval)
            except EMPTY:
                # Queue can be None here during interpreter shutdown,
                # after module globals have been torn down.
                if Queue is None:
                    return
                datum = None
            if datum is None:
                # Timeout reached, flush
                self.send()
                continue
            elif datum == FLUSH_MARKER:
                self.send()
            elif datum == SHUTDOWN_MARKER:
                self.queue.task_done()
                return
            else:
                for k, group in itertools.groupby(datum, keyed):
                    self.buffers.setdefault(k, []).extend(group)
            self.queue.task_done()
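
# For illustration (data assumed): given a buffered batch
#   [{'group': 'custodian', 'stream': 'p1', 'timestamp': 1, 'message': 'a'},
#    {'group': 'custodian', 'stream': 'p1', 'timestamp': 2, 'message': 'b'}]
# keyed() pops 'group' and 'stream' from each event, so loop() accumulates
#   self.buffers == {'custodian=p1': [{'timestamp': 1, 'message': 'a'},
#                                     {'timestamp': 2, 'message': 'b'}]}
# and send_group() later splits the key back apart on '=' for the API call.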