Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/zmq/log/handlers.py: 37%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1"""pyzmq logging handlers. 

2 

3This mainly defines the PUBHandler object for publishing logging messages over 

4a zmq.PUB socket. 

5 

6The PUBHandler can be used with the regular logging module, as in:: 

7 

8 >>> import logging 

9 >>> handler = PUBHandler('tcp://127.0.0.1:12345') 

10 >>> handler.root_topic = 'foo' 

11 >>> logger = logging.getLogger('foobar') 

12 >>> logger.setLevel(logging.DEBUG) 

13 >>> logger.addHandler(handler) 

14 

15Or using ``dictConfig``, as in:: 

16 

17 >>> from logging.config import dictConfig 

18 >>> socket = Context.instance().socket(PUB) 

19 >>> socket.connect('tcp://127.0.0.1:12345') 

20 >>> dictConfig({ 

21 >>> 'version': 1, 

22 >>> 'handlers': { 

23 >>> 'zmq': { 

24 >>> 'class': 'zmq.log.handlers.PUBHandler', 

25 >>> 'level': logging.DEBUG, 

26 >>> 'root_topic': 'foo', 

27 >>> 'interface_or_socket': socket 

28 >>> } 

29 >>> }, 

30 >>> 'root': { 

31 >>> 'level': 'DEBUG', 

32 >>> 'handlers': ['zmq'], 

33 >>> } 

34 >>> }) 

35 

36 

37After this point, all messages logged by ``logger`` will be published on the 

38PUB socket. 

39 

40Code adapted from StarCluster: 

41 

42 https://github.com/jtriley/StarCluster/blob/StarCluster-0.91/starcluster/logger.py 

43""" 

44 

45from __future__ import annotations 

46 

47import logging 

48from copy import copy 

49 

50import zmq 

51 

52# Copyright (C) PyZMQ Developers 

53# Distributed under the terms of the Modified BSD License. 

54 

55 

# Separates an optional topic prefix from the message text in a logged
# message ("topic.sub::message"); PUBHandler.emit splits on its first occurrence.
TOPIC_DELIM = "::"  # delimiter for splitting topics on the receiving end.

57 

58 

class PUBHandler(logging.Handler):
    """A basic logging handler that emits log messages through a PUB socket.

    Takes a PUB socket already bound to interfaces or an interface to bind to.

    Example::

        sock = context.socket(zmq.PUB)
        sock.bind('inproc://log')
        handler = PUBHandler(sock)

    Or::

        handler = PUBHandler('inproc://log')

    These are equivalent.

    Every record is sent as a two-frame message ``[topic, text]``. The topic
    is the ``.``-joined sequence of ``root_topic`` (when non-empty), the
    level name (DEBUG, INFO, etc.) and any subtopic given in the message
    itself ahead of ``::``, as in
    ``log.debug("subtopic.subsub::the real message")``.
    """

    ctx: zmq.Context
    socket: zmq.Socket

    def __init__(
        self,
        interface_or_socket: str | zmq.Socket,
        context: zmq.Context | None = None,
        root_topic: str = '',
    ) -> None:
        """Create the handler.

        Parameters
        ----------
        interface_or_socket:
            Either an existing socket, which is used as-is, or an address
            string that a newly created PUB socket is bound to.
        context:
            Context used to create the socket when an address is given;
            a new ``zmq.Context()`` is created if omitted. Ignored when a
            socket is passed (the socket's own context is stored instead).
        root_topic:
            Prefix placed in front of every published topic.
        """
        logging.Handler.__init__(self)
        self.root_topic = root_topic

        located = "%(levelname)s %(filename)s:%(lineno)d - %(message)s\n"
        # One Formatter instance per level so each can be replaced
        # independently through setFormatter(fmt, level).
        self.formatters = {
            logging.DEBUG: logging.Formatter(located),
            logging.INFO: logging.Formatter("%(message)s\n"),
            logging.WARN: logging.Formatter(located),
            logging.ERROR: logging.Formatter(
                "%(levelname)s %(filename)s:%(lineno)d - %(message)s - %(exc_info)s\n"
            ),
            logging.CRITICAL: logging.Formatter(located),
        }

        if isinstance(interface_or_socket, zmq.Socket):
            self.socket = interface_or_socket
            self.ctx = self.socket.context
        else:
            self.ctx = context or zmq.Context()
            self.socket = self.ctx.socket(zmq.PUB)
            self.socket.bind(interface_or_socket)

    @property
    def root_topic(self) -> str:
        """The prefix placed in front of every published topic."""
        return self._root_topic

    @root_topic.setter
    def root_topic(self, value: str):
        self.setRootTopic(value)

    def setRootTopic(self, root_topic: str):
        """Set the root topic for this handler.

        This value is prepended to all messages published by this handler, and it
        defaults to the empty string ''. When you subscribe to this socket, you must
        set your subscription to an empty string, or to at least the first letter of
        the binary representation of this string to ensure you receive any messages
        from this handler.

        If you use the default empty string root topic, messages will begin with
        the binary representation of the log level string (INFO, WARN, etc.).
        Note that ZMQ SUB sockets can have multiple subscriptions.
        """
        # bytes are accepted too and stored as text
        if isinstance(root_topic, bytes):
            root_topic = root_topic.decode("utf8")
        self._root_topic = root_topic

    def setFormatter(self, fmt, level=logging.NOTSET):
        """Set the Formatter for this handler.

        If no level is provided, the same format is used for all levels. This
        will overwrite all selective formatters set in the object constructor.
        """
        if level == logging.NOTSET:
            # only existing keys are reassigned, so iterating the dict is safe
            for fmt_level in self.formatters:
                self.formatters[fmt_level] = fmt
        else:
            self.formatters[level] = fmt

    def format(self, record):
        """Format a record with the formatter registered for its level.

        Records whose level has no formatter of its own (for example a
        custom level such as 25) use the formatter of the closest
        configured level at or below theirs, or the lowest configured one
        when the record's level is below all of them. Previously such
        records raised ``KeyError``, which made ``emit`` drop them.
        """
        try:
            formatter = self.formatters[record.levelno]
        except KeyError:
            not_above = [lvl for lvl in self.formatters if lvl <= record.levelno]
            if not_above:
                formatter = self.formatters[max(not_above)]
            elif self.formatters:
                formatter = self.formatters[min(self.formatters)]
            else:
                # no per-level formatters at all: use the stock Handler behaviour
                return logging.Handler.format(self, record)
        return formatter.format(record)

    def emit(self, record):
        """Emit a log message on my socket."""

        # LogRecord.getMessage explicitly allows msg to be anything _castable_ to a str
        topic, delim, remainder = str(record.msg).partition(TOPIC_DELIM)
        if delim:
            # copy to avoid mutating LogRecord in-place
            record = copy(record)
            record.msg = remainder
        else:
            # no delimiter in the message: no subtopic, record left untouched
            topic = ""

        try:
            bmsg = self.format(record).encode("utf8")
        except Exception:
            self.handleError(record)
            return

        # topic frame: [root_topic.]LEVELNAME[.subtopic]
        topic_parts = [record.levelname]
        if self.root_topic:
            topic_parts.insert(0, self.root_topic)
        if topic:
            topic_parts.append(topic)

        btopic = '.'.join(topic_parts).encode("utf8", "replace")

        self.socket.send_multipart([btopic, bmsg])

189 

190 

class TopicLogger(logging.Logger):
    """Logger whose ``log`` call takes a topic in front of the message.

    Where a plain logger is called with a single message argument, this one
    is called with two: the topic, then the message.

    So a call that would otherwise read::

        logger.debug('msg')

    is meant to be written as::

        logger.debug('topic.sub', 'msg')
    """

    def log(self, level, topic, msg, *args, **kwargs):
        """Log 'msg % args' at ``level``, tagged with ``topic``.

        The topic and the message are combined into one string separated
        by ``TOPIC_DELIM`` and handed to the regular ``Logger.log``.

        Exception information is passed the usual way, through a true
        ``exc_info`` keyword argument::

            logger.log(level, "zmq.fun", "We have a %s",
                       "mysterious problem", exc_info=1)
        """
        tagged = f'{topic}{TOPIC_DELIM}{msg}'
        super().log(level, tagged, *args, **kwargs)

216 

217 

# Generate the level-named methods of TopicLogger, since they are just adding
# a topic prefix to a message before delegating to the plain Logger method.
def _make_topic_method(meth):
    """Build a ``(self, topic, msg, *args, **kwargs)`` wrapper around *meth*.

    A factory function is used instead of a lambda written inside the loop
    so that each wrapper keeps its own ``meth``; a closure created directly
    in the loop body would see only the last value the loop assigned.
    The wrapper takes no ``level`` argument: the level-named Logger methods
    (``debug``, ``error``, ...) already imply their level.
    """

    def topic_method(self, topic, msg, *args, **kwargs):
        return meth(self, f'{topic}{TOPIC_DELIM}{msg}', *args, **kwargs)

    topic_method.__name__ = meth.__name__
    return topic_method


# NOTE: ``info`` is not in this list, so TopicLogger.info keeps the
# signature inherited from logging.Logger.
for name in "debug warn warning error critical fatal".split():
    try:
        meth = getattr(logging.Logger, name)
    except AttributeError:
        # some methods are missing, e.g. Logger.warn was removed from Python 3.13
        continue
    setattr(TopicLogger, name, _make_topic_method(meth))