1"""pyzmq logging handlers.
2
3This mainly defines the PUBHandler object for publishing logging messages over
4a zmq.PUB socket.
5
6The PUBHandler can be used with the regular logging module, as in::
7
8 >>> import logging
9 >>> handler = PUBHandler('tcp://127.0.0.1:12345')
10 >>> handler.root_topic = 'foo'
11 >>> logger = logging.getLogger('foobar')
12 >>> logger.setLevel(logging.DEBUG)
13 >>> logger.addHandler(handler)
14
15Or using ``dictConfig``, as in::
16
17 >>> from logging.config import dictConfig
18 >>> socket = Context.instance().socket(PUB)
19 >>> socket.connect('tcp://127.0.0.1:12345')
    >>> dictConfig({
    ...     'version': 1,
    ...     'handlers': {
    ...         'zmq': {
    ...             'class': 'zmq.log.handlers.PUBHandler',
    ...             'level': logging.DEBUG,
    ...             'root_topic': 'foo',
    ...             'interface_or_socket': socket
    ...         }
    ...     },
    ...     'root': {
    ...         'level': 'DEBUG',
    ...         'handlers': ['zmq'],
    ...     }
    ... })
35
36
37After this point, all messages logged by ``logger`` will be published on the
38PUB socket.
39
40Code adapted from StarCluster:
41
42 https://github.com/jtriley/StarCluster/blob/StarCluster-0.91/starcluster/logger.py
43"""
44
45from __future__ import annotations
46
47import logging
48from copy import copy
49
50import zmq
51
52# Copyright (C) PyZMQ Developers
53# Distributed under the terms of the Modified BSD License.
54
55
# Delimiter between the topic prefix and the message text of a log message,
# e.g. "subtopic.subsub::the real message". PUBHandler.emit splits on the
# first occurrence; TopicLogger.log inserts it between topic and message.
TOPIC_DELIM = "::"
57
58
class PUBHandler(logging.Handler):
    """A basic logging handler that emits log messages through a PUB socket.

    Takes a PUB socket already bound to interfaces or an interface to bind to.

    Example::

        sock = context.socket(zmq.PUB)
        sock.bind('inproc://log')
        handler = PUBHandler(sock)

    Or::

        handler = PUBHandler('inproc://log')

    These are equivalent.

    Log messages handled by this handler are broadcast as two-part messages
    ``[topic, message]``. The topic is built from ``self.root_topic`` (when
    it is non-empty), followed by the log level name (DEBUG, INFO, etc.),
    followed by any additional subtopics specified in the message by:
    log.debug("subtopic.subsub::the real message"). The parts are joined
    with ``.``.

    Each level has its own formatter (see ``self.formatters``). A record
    whose level has no dedicated formatter, such as a custom level, is
    formatted with the formatter of the nearest configured level at or below
    it, or the lowest configured level if the record is below all of them.
    """

    ctx: zmq.Context
    socket: zmq.Socket

    def __init__(
        self,
        interface_or_socket: str | zmq.Socket,
        context: zmq.Context | None = None,
        root_topic: str = '',
    ) -> None:
        """Create the handler.

        Parameters
        ----------
        interface_or_socket:
            Either an existing socket, which is used as-is, or an address
            string; in the latter case a new PUB socket is created and bound
            to that address.
        context:
            Context used to create the socket when an address is given.
            A new ``zmq.Context()`` is created if omitted. Ignored when a
            socket is passed (the socket's own context is used).
        root_topic:
            Prefix prepended to the topic of every published message.
        """
        logging.Handler.__init__(self)
        self.root_topic = root_topic
        # One formatter per standard level; see setFormatter() to override.
        self.formatters = {
            logging.DEBUG: logging.Formatter(
                "%(levelname)s %(filename)s:%(lineno)d - %(message)s\n"
            ),
            logging.INFO: logging.Formatter("%(message)s\n"),
            logging.WARN: logging.Formatter(
                "%(levelname)s %(filename)s:%(lineno)d - %(message)s\n"
            ),
            logging.ERROR: logging.Formatter(
                "%(levelname)s %(filename)s:%(lineno)d - %(message)s - %(exc_info)s\n"
            ),
            logging.CRITICAL: logging.Formatter(
                "%(levelname)s %(filename)s:%(lineno)d - %(message)s\n"
            ),
        }
        if isinstance(interface_or_socket, zmq.Socket):
            self.socket = interface_or_socket
            self.ctx = self.socket.context
        else:
            self.ctx = context or zmq.Context()
            self.socket = self.ctx.socket(zmq.PUB)
            self.socket.bind(interface_or_socket)

    @property
    def root_topic(self) -> str:
        """The topic prefix prepended to every published message."""
        return self._root_topic

    @root_topic.setter
    def root_topic(self, value: str) -> None:
        self.setRootTopic(value)

    def setRootTopic(self, root_topic: str) -> None:
        """Set the root topic for this handler.

        This value is prepended to all messages published by this handler, and it
        defaults to the empty string ''. When you subscribe to this socket, you must
        set your subscription to an empty string, or to at least the first letter of
        the binary representation of this string to ensure you receive any messages
        from this handler.

        If you use the default empty string root topic, messages will begin with
        the binary representation of the log level string (INFO, WARN, etc.).
        Note that ZMQ SUB sockets can have multiple subscriptions.

        A ``bytes`` value is accepted and decoded as UTF-8.
        """
        if isinstance(root_topic, bytes):
            root_topic = root_topic.decode("utf8")
        self._root_topic = root_topic

    def setFormatter(self, fmt, level=logging.NOTSET):
        """Set the Formatter for this handler.

        If no level is provided, the same format is used for all levels. This
        will overwrite all selective formatters set in the object constructor.
        Otherwise only the formatter for ``level`` is set (or added, if that
        level had no formatter yet).
        """
        if level == logging.NOTSET:
            # Only existing keys are reassigned, so the dict size is stable
            # while iterating.
            for fmt_level in self.formatters:
                self.formatters[fmt_level] = fmt
        else:
            self.formatters[level] = fmt

    def _select_formatter(self, levelno: int) -> logging.Formatter | None:
        """Return the formatter for ``levelno``, or None if none are configured.

        An exact match wins. Otherwise the formatter of the highest
        configured level not above ``levelno`` is used, and if ``levelno`` is
        below every configured level, the lowest configured one.
        """
        formatters = self.formatters
        if levelno in formatters:
            return formatters[levelno]
        if not formatters:
            return None
        at_or_below = [lvl for lvl in formatters if lvl <= levelno]
        nearest = max(at_or_below) if at_or_below else min(formatters)
        return formatters[nearest]

    def format(self, record):
        """Format a record with the formatter matching its level.

        Records with a level that has no dedicated formatter (e.g. custom
        levels) are formatted with the nearest configured level's formatter
        instead of raising ``KeyError``.
        """
        formatter = self._select_formatter(record.levelno)
        if formatter is None:
            # No per-level formatters left at all: use the stock Handler
            # formatting rather than failing.
            return logging.Handler.format(self, record)
        return formatter.format(record)

    def emit(self, record):
        """Emit a log message on my socket.

        The record is sent as a two-frame message ``[topic, message]``, both
        UTF-8 encoded. If the message text contains ``TOPIC_DELIM``, the part
        before the first delimiter becomes a subtopic and only the remainder
        is formatted. If formatting or encoding the message fails, the error
        is passed to ``handleError`` and nothing is sent.
        """

        # LogRecord.getMessage explicitly allows msg to be anything _castable_ to a str
        try:
            topic, msg = str(record.msg).split(TOPIC_DELIM, 1)
        except ValueError:
            # no delimiter in the message: no subtopic
            topic = ""
        else:
            # copy to avoid mutating LogRecord in-place
            record = copy(record)
            record.msg = msg

        try:
            bmsg = self.format(record).encode("utf8")
        except Exception:
            self.handleError(record)
            return

        topic_list = []

        if self.root_topic:
            topic_list.append(self.root_topic)

        topic_list.append(record.levelname)

        if topic:
            topic_list.append(topic)

        btopic = '.'.join(topic_list).encode("utf8", "replace")

        self.socket.send_multipart([btopic, bmsg])
189
190
class TopicLogger(logging.Logger):
    """A Logger whose log methods take a topic in front of the message.

    The regular methods are still there, but where a plain Logger takes a
    single msg argument, this one takes two: topic, msg.

    So instead of::

        logger.debug('msg')

    you write::

        logger.debug('topic.sub', 'msg')
    """

    def log(self, level, topic, msg, *args, **kwargs):
        """Log 'msg % args' at the given level under the given topic.

        The topic and the message are joined with TOPIC_DELIM and handed to
        the regular ``Logger.log``.

        Exception information can be attached by passing the keyword
        argument exc_info with a true value::

            logger.log(level, "zmq.fun", "We have a %s",
                       "mysterious problem", exc_info=1)
        """
        prefixed = "{}{}{}".format(topic, TOPIC_DELIM, msg)
        logging.Logger.log(self, level, prefixed, *args, **kwargs)
216
217
# Generate the methods of TopicLogger, since they are just adding a
# topic prefix to a message.
def _make_topic_method(meth, name):
    """Build a ``(self, topic, msg, *args, **kwargs)`` wrapper around ``meth``.

    ``meth`` is passed in as an argument so each wrapper keeps its own
    target; a lambda created directly in the loop would look up the loop
    variable at call time and every generated method would end up calling
    the last one bound.
    """

    def topic_method(self, topic, msg, *args, **kwargs):
        # Same joining rule as TopicLogger.log.
        return meth(self, f'{topic}{TOPIC_DELIM}{msg}', *args, **kwargs)

    topic_method.__name__ = name
    topic_method.__qualname__ = f'TopicLogger.{name}'
    topic_method.__doc__ = f"Log 'msg % args' under ``topic`` via Logger.{meth.__name__}."
    return topic_method


# NOTE(review): "info" (and "exception") are not in this list, so they keep
# the plain Logger signature without a topic -- confirm whether that is
# intended.
for name in "debug warn warning error critical fatal".split():
    try:
        meth = getattr(logging.Logger, name)
    except AttributeError:
        # some methods are missing, e.g. Logger.warn was removed from Python 3.13
        continue
    # Logger.warn and Logger.fatal can be thin wrappers that call
    # self.warning / self.critical. On a TopicLogger those are the
    # topic-aware overrides, which would receive the already-joined message
    # as their topic. Wrap the canonical method instead.
    canonical = {"warn": "warning", "fatal": "critical"}.get(name, name)
    if canonical != name:
        meth = getattr(logging.Logger, canonical)
    setattr(TopicLogger, name, _make_topic_method(meth, name))