Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/platform.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Platform and system specific function."""
19from __future__ import annotations
21import getpass
22import logging
23import os
24import pkgutil
25import platform
26import sys
28from airflow.compat.functools import cache
# True when platform.system() reports "Windows"; evaluated once at import time.
IS_WINDOWS = platform.system() == "Windows"

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def is_tty():
    """
    Tell whether ``sys.stdout`` is attached to a tty(-like) device.

    Returns ``False`` when the current ``sys.stdout`` object has no ``isatty``
    attribute at all; otherwise returns whatever ``sys.stdout.isatty()`` reports.
    """
    stream = sys.stdout
    if hasattr(stream, "isatty"):
        return stream.isatty()
    # Replacement stdout objects are not guaranteed to implement isatty().
    return False
def is_terminal_support_colors() -> bool:
    """
    Make a best-effort guess at whether the current terminal can render colors.

    The answer is ``False`` on ``win32`` and whenever stdout is not a tty.
    Otherwise it is ``True`` if the ``COLORTERM`` environment variable is set,
    or if ``TERM`` (case-insensitive, defaulting to ``"dumb"``) is ``xterm``,
    ``linux`` or contains the substring ``color``.
    """
    # Platform check comes first so is_tty() is never consulted on win32.
    if sys.platform == "win32" or not is_tty():
        return False
    if "COLORTERM" in os.environ:
        return True
    terminal = os.environ.get("TERM", "dumb").lower()
    return terminal in ("xterm", "linux") or "color" in terminal
def get_airflow_git_version():
    """
    Return the git commit hash recorded for the current version of the application.

    The value is read from the ``git_version`` resource of the ``airflow``
    package and decoded as UTF-8. Any failure while loading or decoding it is
    logged at debug level and ``None`` is returned instead.
    """
    try:
        raw_version = pkgutil.get_data("airflow", "git_version")
        return str(raw_version, encoding="UTF-8")
    except Exception as err:
        # Best effort only: a missing or unreadable resource must not break callers.
        log.debug(err)
    return None
@cache
def getuser() -> str:
    """
    Get the username of the current user, or fail with a clear error message if there is none.

    We don't want to fall back to os.getuid() because not having a username
    probably means the rest of the user environment is wrong (e.g. no $HOME).
    Explicit failure is better than silently trying to work badly.

    The function is wrapped in ``cache``, so a successful lookup is memoized.

    :return: the login name reported by ``getpass.getuser()``.
    :raises AirflowConfigException: if ``getpass.getuser()`` cannot resolve a username.
    """
    try:
        return getpass.getuser()
    except (KeyError, OSError) as err:
        # KeyError is what the password-database lookup raises on older Pythons;
        # since Python 3.13 getpass.getuser() reports the same condition as OSError.
        # Inner import to avoid circular import
        from airflow.exceptions import AirflowConfigException

        # Note the trailing space after "run": the literals are concatenated, and
        # without it the message read "you must runAirflow".
        raise AirflowConfigException(
            "The user that Airflow is running as has no username; you must run "
            "Airflow as a full user, with a username and home directory, "
            "in order for it to function properly."
        ) from err