Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/platform.py: 40%
40 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# Licensed to the Apache Software Foundation (ASF) under one
2# or more contributor license agreements. See the NOTICE file
3# distributed with this work for additional information
4# regarding copyright ownership. The ASF licenses this file
5# to you under the Apache License, Version 2.0 (the
6# "License"); you may not use this file except in compliance
7# with the License. You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing,
12# software distributed under the License is distributed on an
13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14# KIND, either express or implied. See the License for the
15# specific language governing permissions and limitations
16# under the License.
17"""Platform and system specific function."""
18from __future__ import annotations
20import getpass
21import logging
22import os
23import pkgutil
24import platform
25import sys
27from airflow.compat.functools import cache
# True when platform.system() reports "Windows" for the running interpreter.
IS_WINDOWS = platform.system() == "Windows"

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def is_tty() -> bool:
    """
    Check whether the standard output is connected to a tty(-like) device.

    :return: the result of ``sys.stdout.isatty()``; ``False`` when
        ``sys.stdout`` has no ``isatty`` attribute (for example when it is
        ``None`` or has been replaced by a minimal stream object) or when the
        stream has already been closed.
    """
    isatty = getattr(sys.stdout, "isatty", None)
    if isatty is None:
        return False
    try:
        return isatty()
    except ValueError:
        # File objects raise ValueError ("I/O operation on closed file") when
        # queried after close(); a closed stream is certainly not a terminal.
        return False
def is_terminal_support_colors() -> bool:
    """
    Guess whether the current terminal is able to render colors.

    Colors are reported as unsupported on ``win32`` and whenever the standard
    output is not a tty. Otherwise the presence of ``COLORTERM`` in the
    environment, or a ``TERM`` value that is ``xterm``/``linux`` or contains
    ``color`` (compared case-insensitively), is taken as support.
    """
    # Short-circuiting keeps the original order: the platform check runs
    # first and is_tty() is only consulted on non-Windows platforms.
    if sys.platform == "win32" or not is_tty():
        return False
    if "COLORTERM" in os.environ:
        return True
    terminal = os.environ.get("TERM", "dumb").lower()
    return terminal in ("xterm", "linux") or "color" in terminal
def get_airflow_git_version():
    """
    Return the git commit hash representing the current version of the application.

    The hash is read from the ``git_version`` resource of the ``airflow``
    package. Any failure while loading or decoding it is logged at debug
    level and ``None`` is returned instead.
    """
    try:
        raw_version = pkgutil.get_data("airflow", "git_version")
        return str(raw_version, encoding="UTF-8")
    except Exception as exc:
        # Best effort only: a missing or unreadable resource must not break callers.
        log.debug(exc)
    return None
@cache
def getuser() -> str:
    """
    Get the username associated with the current user.

    Fails with a descriptive error message if there is no current user. The
    function is wrapped in ``cache``, so a successful result is reused by
    later calls.

    We don't want to fall back to os.getuid() because not having a username
    probably means the rest of the user environment is wrong (e.g. no $HOME).
    Explicit failure is better than silently trying to work badly.

    :return: the username reported by ``getpass.getuser()``.
    :raises AirflowConfigException: if ``getpass.getuser()`` cannot determine
        a username.
    """
    try:
        return getpass.getuser()
    except (KeyError, OSError) as err:
        # A missing password-database entry surfaces as KeyError on older
        # Python versions and as OSError from Python 3.13 onwards; both mean
        # "no username" and get the same explanatory error.
        # Inner import to avoid circular import
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            "The user that Airflow is running as has no username; you must run "
            "Airflow as a full user, with a username and home directory, "
            "in order for it to function properly."
        ) from err