Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/gunicorn/util.py: 39%
367 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:32 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:32 +0000
1# -*- coding: utf-8 -
2#
3# This file is part of gunicorn released under the MIT license.
4# See the NOTICE for more information.
5import ast
6import email.utils
7import errno
8import fcntl
9import html
10import importlib
11import inspect
12import io
13import logging
14import os
15import pwd
16import random
17import re
18import socket
19import sys
20import textwrap
21import time
22import traceback
23import warnings
25try:
26 import importlib.metadata as importlib_metadata
27except (ModuleNotFoundError, ImportError):
28 import importlib_metadata
30from gunicorn.errors import AppImportError
31from gunicorn.workers import SUPPORTED_WORKERS
32import urllib.parse
# Device path used when stdio is redirected away from a terminal; falls
# back to the POSIX location when ``os.devnull`` is not defined.
REDIRECT_TO = getattr(os, 'devnull', '/dev/null')

# Server and Date aren't technically hop-by-hop headers, but they are in
# the purview of the origin server, which the WSGI spec says we should act
# like. So we drop them and add our own.
#
# In the future, concatenating Server header values might be better, but
# nothing else does it and dropping them is easier.
hop_headers = {
    "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
    "te", "trailers", "transfer-encoding", "upgrade",
    "server", "date",
}
50try:
51 from setproctitle import setproctitle
53 def _setproctitle(title):
54 setproctitle("gunicorn: %s" % title)
55except ImportError:
56 def _setproctitle(title):
57 pass
def load_entry_point(distribution, group, name):
    """Load the entry point ``name`` of ``group`` from ``distribution``.

    The first entry point of the distribution whose group and name both
    match is loaded and returned.

    :raises ImportError: if the distribution declares no such entry point.
    """
    dist_obj = importlib_metadata.distribution(distribution)
    for entry_point in dist_obj.entry_points:
        if entry_point.group == group and entry_point.name == name:
            return entry_point.load()
    raise ImportError("Entry point %r not found" % ((group, name),))
def load_class(uri, default="gunicorn.workers.sync.SyncWorker",
               section="gunicorn.workers"):
    """Resolve ``uri`` to the object (normally a class) it designates.

    Accepted forms of ``uri``:

    * a class object -- returned unchanged;
    * ``egg:<dist>#<name>`` -- entry point ``<name>`` of group ``section``
      in distribution ``<dist>``; without ``#<name>``, ``default`` is used
      as the entry point name;
    * a bare name without dots (one leading ``#`` is dropped) -- looked up
      in ``SUPPORTED_WORKERS``, otherwise loaded as an entry point of the
      ``gunicorn`` distribution;
    * a dotted path ``package.module.Name``.

    :raises RuntimeError: if an entry point cannot be loaded or the module
        cannot be imported; the message embeds the formatted traceback.
        A missing attribute on an imported module is not wrapped and
        surfaces as ``AttributeError``.
    """
    if inspect.isclass(uri):
        return uri

    def not_found():
        # Only call this while an exception is being handled, so that
        # format_exc() picks up the failure that triggered it.
        msg = "class uri %r invalid or not found: \n\n[%s]"
        return RuntimeError(msg % (uri, traceback.format_exc()))

    if uri.startswith("egg:"):
        # uses entry points
        entry_str = uri.split("egg:")[1]
        dist, separator, name = entry_str.rpartition("#")
        if not separator:
            dist, name = entry_str, default

        try:
            return load_entry_point(dist, section, name)
        except Exception:
            raise not_found()

    components = uri.split('.')
    if len(components) == 1:
        if uri.startswith("#"):
            uri = uri[1:]

        if uri not in SUPPORTED_WORKERS:
            try:
                return load_entry_point("gunicorn", section, uri)
            except Exception:
                raise not_found()

        # Known alias: expand it to its dotted path and import it below.
        components = SUPPORTED_WORKERS[uri].split(".")

    klass = components.pop(-1)

    try:
        mod = importlib.import_module('.'.join(components))
    except Exception:
        raise not_found()
    return getattr(mod, klass)
# Parameter kinds that a positional argument can bind to.
positionals = (
    inspect.Parameter.POSITIONAL_ONLY,
    inspect.Parameter.POSITIONAL_OR_KEYWORD,
)


def get_arity(f):
    """Return the number of positional parameters in the signature of ``f``.

    Positional-only and positional-or-keyword parameters are counted,
    whether or not they have defaults; ``*args``, keyword-only parameters
    and ``**kwargs`` are not.
    """
    parameters = inspect.signature(f).parameters.values()
    return sum(1 for parameter in parameters if parameter.kind in positionals)
def get_username(uid):
    """Return the login name recorded in the password database for ``uid``.

    Any error raised by ``pwd.getpwuid`` (``KeyError`` for an unknown uid)
    propagates to the caller.
    """
    entry = pwd.getpwuid(uid)
    return entry.pw_name
def set_owner_process(uid, gid, initgroups=False):
    """Set the user and group of the current (worker) process.

    :param uid: numeric user id to switch to; a falsy value leaves the
        user unchanged.
    :param gid: numeric group id to switch to; a falsy value leaves the
        group unchanged.
    :param initgroups: when true, and the login name for ``uid`` can be
        resolved, call ``os.initgroups(username, gid)`` instead of
        ``os.setgid(gid)``.

    The group is changed before the user. ``os.setgid`` and ``os.setuid``
    are only called when the target id differs from the current one.
    """
    if gid:
        # os.initgroups() needs a login name. It stays None when no uid
        # was given or the uid has no password database entry; in both
        # cases we fall back to a plain setgid() instead of failing on an
        # unbound name.
        username = None
        if uid:
            try:
                username = get_username(uid)
            except KeyError:
                pass

        # versions of python < 2.6.2 don't manage unsigned int for
        # groups like on osx or fedora
        gid = abs(gid) & 0x7FFFFFFF

        if initgroups and username is not None:
            os.initgroups(username, gid)
        elif gid != os.getgid():
            os.setgid(gid)

    if uid and uid != os.getuid():
        os.setuid(uid)
def chown(path, uid, gid):
    """Change the owner and group of ``path``; thin wrapper over ``os.chown``."""
    os.chown(path, uid, gid)
168if sys.platform.startswith("win"):
169 def _waitfor(func, pathname, waitall=False):
170 # Perform the operation
171 func(pathname)
172 # Now setup the wait loop
173 if waitall:
174 dirname = pathname
175 else:
176 dirname, name = os.path.split(pathname)
177 dirname = dirname or '.'
178 # Check for `pathname` to be removed from the filesystem.
179 # The exponential backoff of the timeout amounts to a total
180 # of ~1 second after which the deletion is probably an error
181 # anyway.
182 # Testing on a i7@4.3GHz shows that usually only 1 iteration is
183 # required when contention occurs.
184 timeout = 0.001
185 while timeout < 1.0:
186 # Note we are only testing for the existence of the file(s) in
187 # the contents of the directory regardless of any security or
188 # access rights. If we have made it this far, we have sufficient
189 # permissions to do that much using Python's equivalent of the
190 # Windows API FindFirstFile.
191 # Other Windows APIs can fail or give incorrect results when
192 # dealing with files that are pending deletion.
193 L = os.listdir(dirname)
194 if not L if waitall else name in L:
195 return
196 # Increase the timeout and try again
197 time.sleep(timeout)
198 timeout *= 2
199 warnings.warn('tests may fail, delete still pending for ' + pathname,
200 RuntimeWarning, stacklevel=4)
202 def _unlink(filename):
203 _waitfor(os.unlink, filename)
204else:
205 _unlink = os.unlink
def unlink(filename):
    """Remove ``filename``, tolerating a path that is already absent.

    An ``OSError`` whose errno is ``ENOENT`` or ``ENOTDIR`` is swallowed;
    any other ``OSError`` propagates.
    """
    ignorable = (errno.ENOENT, errno.ENOTDIR)
    try:
        _unlink(filename)
    except OSError as exc:
        # The filename need not exist.
        if exc.errno in ignorable:
            return
        raise
def is_ipv6(addr):
    """Return ``True`` if ``addr`` parses as an IPv6 address.

    ``False`` is returned both when ``socket.inet_pton`` rejects the
    address (``socket.error``) and when it raises ``ValueError``, which
    the original comments attribute to a platform without IPv6 support.
    """
    try:
        socket.inet_pton(socket.AF_INET6, addr)
    except (socket.error, ValueError):
        return False
    return True
def parse_address(netloc, default_port='8000'):
    """Parse a bind address string.

    :param netloc: one of ``unix:PATH`` / ``unix://PATH``, ``fd://N``,
        ``tcp://HOST[:PORT]``, ``HOST[:PORT]``, ``[IPV6]`` / ``[IPV6]:PORT``
        or the empty string.
    :param default_port: port used when ``netloc`` does not specify one.
    :return: the socket path (``str``) for ``unix:`` addresses, the file
        descriptor (``int``) for ``fd://`` addresses, otherwise a
        ``(host, port)`` tuple with a lower-cased host and integer port.
        An empty ``netloc`` yields ``("0.0.0.0", default_port)``.
    :raises RuntimeError: if the file descriptor or port is not an integer.
    """
    unix_prefix = re.match(r'unix:(//)?', netloc)
    if unix_prefix:
        # Strip only the leading scheme: splitting on the pattern and
        # taking the last piece truncated paths containing "unix:" again.
        return netloc[unix_prefix.end():]

    if netloc.startswith("fd://"):
        fd = netloc[5:]
        try:
            return int(fd)
        except ValueError:
            raise RuntimeError("%r is not a valid file descriptor." % fd) from None

    if netloc.startswith("tcp://"):
        # Same reasoning as above: drop the prefix, keep everything else.
        netloc = netloc[len("tcp://"):]
    host, port = netloc, default_port

    if '[' in netloc and ']' in netloc:
        # Bracketed IPv6 literal, optionally followed by ":port".
        host = netloc.split(']')[0][1:]
        port = (netloc.split(']:') + [default_port])[1]
    elif ':' in netloc:
        host, port = (netloc.split(':') + [default_port])[:2]
    elif netloc == "":
        host, port = "0.0.0.0", default_port

    try:
        port = int(port)
    except ValueError:
        raise RuntimeError("%r is not a valid port number." % port) from None

    return host.lower(), port
def close_on_exec(fd):
    """Set the ``FD_CLOEXEC`` flag on ``fd``, keeping its other fd flags."""
    current = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, current | fcntl.FD_CLOEXEC)
def set_non_blocking(fd):
    """Add ``O_NONBLOCK`` to the file status flags of ``fd``."""
    status = fcntl.fcntl(fd, fcntl.F_GETFL)
    status |= os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, status)
def close(sock):
    """Close ``sock``, ignoring any socket error raised while doing so."""
    try:
        sock.close()
    except OSError:
        # ``socket.error`` is an alias of OSError; closing is best-effort.
        pass
try:
    from os import closerange
except ImportError:
    def closerange(fd_low, fd_high):
        """Close every descriptor in ``[fd_low, fd_high)``, ignoring errors."""
        fd = fd_low
        while fd < fd_high:
            try:
                os.close(fd)
            except OSError:
                # The descriptor was not open to begin with; ignore it.
                pass
            fd += 1
def write_chunk(sock, data):
    """Send ``data`` on ``sock`` framed as one HTTP chunk.

    ``str`` data is encoded as UTF-8 first. The frame is the payload
    length in upper-case hexadecimal, CRLF, the payload, CRLF, sent with
    a single ``sendall`` call.
    """
    payload = data.encode('utf-8') if isinstance(data, str) else data
    size_line = ("%X\r\n" % len(payload)).encode('utf-8')
    sock.sendall(b"".join((size_line, payload, b"\r\n")))
def write(sock, data, chunked=False):
    """Send ``data`` on ``sock``, as an HTTP chunk when ``chunked`` is true.

    Returns whatever ``write_chunk`` returns in chunked mode, ``None``
    otherwise.
    """
    if not chunked:
        sock.sendall(data)
        return None
    return write_chunk(sock, data)
def write_nonblock(sock, data, chunked=False):
    """Write ``data`` with ``sock`` temporarily in non-blocking mode.

    :param sock: socket-like object providing ``gettimeout``,
        ``setblocking``, ``settimeout`` and ``sendall``.
    :param data: payload handed to ``write``.
    :param chunked: forwarded to ``write``.
    :return: whatever ``write`` returns.

    If the socket is already non-blocking (timeout ``0.0``) it is used as
    is. Otherwise it is switched to non-blocking for the write and its
    previous timeout is restored afterwards, even if the write raises.
    """
    timeout = sock.gettimeout()
    if timeout == 0.0:
        return write(sock, data, chunked)

    try:
        sock.setblocking(0)
        return write(sock, data, chunked)
    finally:
        # Restore the caller's mode. settimeout(None) is the same as
        # setblocking(1), while a finite timeout is put back unchanged
        # instead of being silently turned into "block forever".
        sock.settimeout(timeout)
def write_error(sock, status_int, reason, mesg):
    """Send a minimal HTML error response on ``sock``.

    :param sock: socket the response is written to via ``write_nonblock``.
    :param status_int: numeric HTTP status code for the status line.
    :param reason: reason phrase; used in the status line, the page title
        and the heading (inserted as given, not escaped).
    :param mesg: detail message; HTML-escaped before being embedded.

    The response is encoded as latin1 and carries ``Connection: close``.
    """
    body_template = textwrap.dedent("""\
    <html>
    <head>
    <title>%(reason)s</title>
    </head>
    <body>
    <h1><p>%(reason)s</p></h1>
    %(mesg)s
    </body>
    </html>
    """)
    html_error = body_template % {"reason": reason, "mesg": html.escape(mesg)}

    # Head and body are joined with CRLF; the empty element produces the
    # blank line that terminates the header section.
    response_parts = (
        "HTTP/1.1 %s %s" % (str(status_int), reason),
        "Connection: close",
        "Content-Type: text/html",
        "Content-Length: %d" % len(html_error),
        "",
        html_error,
    )
    http = "\r\n".join(response_parts)
    write_nonblock(sock, http.encode('latin1'))
337def _called_with_wrong_args(f):
338 """Check whether calling a function raised a ``TypeError`` because
339 the call failed or because something in the function raised the
340 error.
342 :param f: The function that was called.
343 :return: ``True`` if the call failed.
344 """
345 tb = sys.exc_info()[2]
347 try:
348 while tb is not None:
349 if tb.tb_frame.f_code is f.__code__:
350 # In the function, it was called successfully.
351 return False
353 tb = tb.tb_next
355 # Didn't reach the function.
356 return True
357 finally:
358 # Delete tb to break a circular reference in Python 2.
359 # https://docs.python.org/2/library/sys.html#sys.exc_info
360 del tb
def import_app(module):
    """Import and return the application designated by ``module``.

    ``module`` has the form ``"package.module"`` or
    ``"package.module:expr"``. ``expr`` defaults to ``application`` and
    must be either a bare attribute name or a call of a bare name whose
    arguments are Python literals (a factory call).

    :return: the attribute itself, or the result of calling it with the
        parsed arguments when ``expr`` is a call.
    :raises ImportError: if the module cannot be imported; with a hint
        when ``module`` names an existing ``.py`` file.
    :raises AppImportError: if ``expr`` cannot be parsed, the attribute
        is missing, the factory is called with wrong arguments, or the
        resulting object is ``None`` or not callable.
    """
    module, separator, obj = module.partition(":")
    if not separator:
        obj = "application"

    try:
        mod = importlib.import_module(module)
    except ImportError:
        given_a_file = module.endswith(".py") and os.path.exists(module)
        if not given_a_file:
            raise
        msg = "Failed to find application, did you mean '%s:%s'?"
        raise ImportError(msg % (module.rsplit(".", 1)[0], obj))

    # Parse obj as a single expression to determine if it's a valid
    # attribute name or function call.
    try:
        expression = ast.parse(obj, mode="eval").body
    except SyntaxError:
        raise AppImportError(
            "Failed to parse %r as an attribute name or function call." % obj
        )

    if isinstance(expression, ast.Call):
        # Ensure the function name is an attribute name only.
        target = expression.func
        if not isinstance(target, ast.Name):
            raise AppImportError("Function reference must be a simple name: %r" % obj)

        name = target.id

        # Parse the positional and keyword arguments as literals.
        try:
            args = [ast.literal_eval(node) for node in expression.args]
            kwargs = {
                keyword.arg: ast.literal_eval(keyword.value)
                for keyword in expression.keywords
            }
        except ValueError:
            # literal_eval gives cryptic error messages, show a generic
            # message with the full expression instead.
            raise AppImportError(
                "Failed to parse arguments as literal values: %r" % obj
            )
    elif isinstance(expression, ast.Name):
        name = expression.id
        # ``args is None`` marks "plain attribute, do not call it".
        args = kwargs = None
    else:
        raise AppImportError(
            "Failed to parse %r as an attribute name or function call." % obj
        )

    is_debug = logging.root.level == logging.DEBUG
    try:
        app = getattr(mod, name)
    except AttributeError:
        if is_debug:
            traceback.print_exception(*sys.exc_info())
        raise AppImportError("Failed to find attribute %r in %r." % (name, module))

    # If the expression was a function call, call the retrieved object
    # to get the real application.
    if args is not None:
        factory = app
        try:
            app = factory(*args, **kwargs)
        except TypeError as e:
            # A TypeError raised from within the factory keeps its full
            # traceback; one caused by bad arguments to the factory is
            # reported with Python's own message, without a traceback.
            if not _called_with_wrong_args(factory):
                raise
            raise AppImportError(
                "".join(traceback.format_exception_only(TypeError, e)).strip()
            )

    if app is None:
        raise AppImportError("Failed to find application object: %r" % obj)

    if not callable(app):
        raise AppImportError("Application object must be callable.")
    return app
def getcwd():
    """Return the current working directory, preferring ``$PWD``.

    ``$PWD`` is returned when it refers to the same file (same inode and
    device) as ``os.getcwd()``. If it differs, is unset, or any lookup
    fails, ``os.getcwd()`` is returned instead.
    """
    try:
        env_pwd = os.environ['PWD']
        if os.path.samestat(os.stat(env_pwd), os.stat(os.getcwd())):
            return env_pwd
    except Exception:
        pass
    return os.getcwd()
def http_date(timestamp=None):
    """Format ``timestamp`` as a date string for a message header.

    :param timestamp: seconds since the epoch; the current time is used
        when it is ``None``.
    :return: the date formatted by ``email.utils.formatdate`` with
        ``usegmt=True``, e.g. ``"Thu, 01 Jan 1970 00:00:00 GMT"``.
    """
    when = time.time() if timestamp is None else timestamp
    return email.utils.formatdate(when, localtime=False, usegmt=True)
def is_hoppish(header):
    """Return ``True`` if ``header`` is listed in ``hop_headers``.

    The name is lower-cased and stripped of surrounding whitespace before
    the lookup.
    """
    normalized = header.lower().strip()
    return normalized in hop_headers
def daemonize(enable_stdio_inheritance=False):
    """Standard daemonization of a process.

    http://www.faqs.org/faqs/unix-faq/programmer/faq/ section 1.7

    :param enable_stdio_inheritance: when false, stdin, stdout and stderr
        are all remapped to ``REDIRECT_TO``. When true, only stdin is
        always remapped; stdout and stderr are remapped only if they still
        sit on descriptors 1 and 2 and are attached to a terminal, so
        shell redirection such as ``gunicorn ... > output.log 2>&1`` keeps
        working.
    """
    # NOTE(review): the nesting below is reconstructed -- everything is
    # assumed to be skipped when GUNICORN_FD is set; confirm against the
    # canonical source, as the indentation of this copy was lost.
    if 'GUNICORN_FD' in os.environ:
        return

    # Fork, start a new session, then fork again; each parent exits.
    if os.fork():
        os._exit(0)
    os.setsid()

    if os.fork():
        os._exit(0)

    os.umask(0o22)

    # In both of the following branches any file descriptors above stdin,
    # stdout and stderr are left untouched. The inheritance option simply
    # allows one to have output go to a file specified by way of shell
    # redirection when not wanting to use the --error-log option.

    if enable_stdio_inheritance:
        devnull_fd = os.open(REDIRECT_TO, os.O_RDWR)

        # Always redirect stdin to /dev/null as we would never expect to
        # need to read interactive input.
        if devnull_fd != 0:
            os.close(0)
            os.dup2(devnull_fd, 0)

        # If stdout and stderr are still connected to their original file
        # descriptors, we check whether they are associated with terminal
        # devices. When they are, we map them to /dev/null so that they
        # are still properly detached from any controlling terminal. If
        # not, we preserve them as they are.
        #
        # If stdout and stderr were not hooked up to the original file
        # descriptors, then all bets are off and all we can really do is
        # leave them as they were.
        #
        # Note that if using the --error-log option, the log file
        # specified through shell redirection will only be used up until
        # the log file specified by the option takes over. As it replaces
        # stdout and stderr at the file descriptor level, anything using
        # stdout or stderr, including having cached a reference to them,
        # will still work.

        def redirect(stream, fd_expect):
            try:
                fd = stream.fileno()
                if fd == fd_expect and stream.isatty():
                    os.close(fd)
                    os.dup2(devnull_fd, fd)
            except AttributeError:
                # The stream object has no fileno()/isatty(); leave it be.
                pass

        redirect(sys.stdout, 1)
        redirect(sys.stderr, 2)
    else:
        # Remap all of stdin, stdout and stderr on to /dev/null. The
        # expectation is that users have specified the --error-log option.
        closerange(0, 3)

        devnull_fd = os.open(REDIRECT_TO, os.O_RDWR)
        # PEP 446, make fd for /dev/null inheritable
        os.set_inheritable(devnull_fd, True)

        # expect devnull_fd to be always 0 here, but in-case not ...
        if devnull_fd != 0:
            os.dup2(devnull_fd, 0)

        os.dup2(devnull_fd, 1)
        os.dup2(devnull_fd, 2)
def seed():
    """Re-seed the ``random`` module.

    Uses 64 bytes from ``os.urandom``; if that raises
    ``NotImplementedError``, falls back to a string built from the
    current time and the process id.
    """
    try:
        entropy = os.urandom(64)
    except NotImplementedError:
        entropy = '%s.%s' % (time.time(), os.getpid())
    random.seed(entropy)
def check_is_writable(path):
    """Verify that ``path`` can be opened for appending.

    The file is opened in ``'a'`` mode and closed again, so it is created
    if it does not exist yet.

    :raises RuntimeError: if opening the file fails with an ``IOError``.
    """
    try:
        with open(path, 'a'):
            pass
    except IOError as e:
        raise RuntimeError("Error: '%s' isn't writable [%r]" % (path, e))
def to_bytestring(value, encoding="utf8"):
    """Convert a string argument to a byte string.

    ``bytes`` are returned unchanged and ``str`` is encoded with
    ``encoding``.

    :raises TypeError: if ``value`` is neither ``str`` nor ``bytes``.
    """
    if isinstance(value, str):
        return value.encode(encoding)
    if isinstance(value, bytes):
        return value
    raise TypeError('%r is not a string' % value)
def has_fileno(obj):
    """Return ``True`` if ``obj.fileno()`` exists and can actually be called.

    Objects such as ``io.BytesIO`` expose ``fileno`` but raise when it is
    invoked; those count as having no file descriptor.
    """
    if not hasattr(obj, "fileno"):
        return False

    # check BytesIO case and maybe others
    try:
        obj.fileno()
    except (AttributeError, IOError, io.UnsupportedOperation):
        return False
    else:
        return True
def warn(msg):
    """Print ``msg`` to stderr as a ``!!!``-framed warning banner.

    Each line of ``msg`` is prefixed with ``!!! ``; the first line also
    gets a ``WARNING: `` label. stderr is flushed afterwards.
    """
    print("!!!", file=sys.stderr)

    for index, text in enumerate(msg.splitlines()):
        labelled = "WARNING: %s" % text if index == 0 else text
        print("!!! %s" % labelled, file=sys.stderr)

    print("!!!\n", file=sys.stderr)
    sys.stderr.flush()
def make_fail_app(msg):
    """Build a WSGI application that always answers with a 500 error.

    :param msg: response body, converted with ``to_bytestring``.
    :return: a WSGI callable replying ``500 Internal Server Error`` with
        ``msg`` as a ``text/plain`` body.
    """
    msg = to_bytestring(msg)
    content_length = str(len(msg))

    def app(environ, start_response):
        headers = [
            ("Content-Type", "text/plain"),
            ("Content-Length", content_length),
        ]
        start_response("500 Internal Server Error", headers)
        return [msg]

    return app
def split_request_uri(uri):
    """Split a request URI with ``urllib.parse.urlsplit``.

    A URI starting with ``//`` is kept as a path instead of being parsed
    as a network location.
    """
    if not uri.startswith("//"):
        return urllib.parse.urlsplit(uri)

    # When the path starts with //, urlsplit considers it as a
    # relative uri while the RFC says we should consider it as abs_path
    # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.1.2
    # A temporary dot prefix works around this; it is removed again
    # from the parsed path.
    dotted = urllib.parse.urlsplit("." + uri)
    return dotted._replace(path=dotted.path[1:])
# From six.reraise
def reraise(tp, value, tb=None):
    """Raise ``value`` (or a fresh ``tp()`` if it is ``None``) with ``tb``.

    The traceback is only attached when it differs from the one the
    exception already carries.
    """
    try:
        if value is None:
            value = tp()
        if value.__traceback__ is tb:
            raise value
        raise value.with_traceback(tb)
    finally:
        # Drop the local references to the exception and traceback.
        value = None
        tb = None
def bytes_to_str(b):
    """Return ``b`` as ``str``, decoding as latin1 unless it already is one."""
    return b if isinstance(b, str) else str(b, 'latin1')
def unquote_to_wsgi_str(string):
    """Percent-decode ``string`` to bytes and return them decoded as latin-1."""
    raw = urllib.parse.unquote_to_bytes(string)
    return raw.decode('latin-1')