Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/platform.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

40 statements  

1# Licensed to the Apache Software Foundation (ASF) under one 

2# or more contributor license agreements. See the NOTICE file 

3# distributed with this work for additional information 

4# regarding copyright ownership. The ASF licenses this file 

5# to you under the Apache License, Version 2.0 (the 

6# "License"); you may not use this file except in compliance 

7# with the License. You may obtain a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, 

12# software distributed under the License is distributed on an 

13# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

14# KIND, either express or implied. See the License for the 

15# specific language governing permissions and limitations 

16# under the License. 

17"""Platform and system specific function.""" 

18 

19from __future__ import annotations 

20 

21import getpass 

22import logging 

23import os 

24import pkgutil 

25import platform 

26import sys 

27 

28from airflow.compat.functools import cache 

29 

# True when platform.system() reports "Windows"; evaluated once at import time.
IS_WINDOWS = platform.system() == "Windows"

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

33 

34 

def is_tty():
    """
    Tell whether ``sys.stdout`` is attached to a tty(-like) device.

    :return: ``False`` when ``sys.stdout`` has no ``isatty`` attribute,
        otherwise whatever ``sys.stdout.isatty()`` returns.
    """
    stream = sys.stdout
    # Short-circuit: isatty() is only called when the attribute exists.
    return hasattr(stream, "isatty") and stream.isatty()

40 

41 

def is_terminal_support_colors() -> bool:
    """
    Make a best-effort guess at whether the current terminal can display colors.

    :return: ``False`` on ``win32`` or when stdout is not a tty; ``True`` when
        ``COLORTERM`` is set, or when ``TERM`` (lower-cased, defaulting to
        ``"dumb"``) is ``xterm``/``linux`` or contains ``color``; ``False`` otherwise.
    """
    # Colors are never reported on win32 or when output is not a terminal.
    if sys.platform == "win32" or not is_tty():
        return False

    # The mere presence of COLORTERM is enough, whatever its value.
    if "COLORTERM" in os.environ:
        return True

    terminal = os.environ.get("TERM", "dumb").lower()
    return "color" in terminal or terminal in ("xterm", "linux")

54 

55 

def get_airflow_git_version():
    """
    Read the ``git_version`` data resource of the ``airflow`` package as UTF-8 text.

    This is a best-effort lookup: any failure is logged at debug level and
    reported to the caller as ``None``.

    :return: the decoded contents of the resource, or ``None`` if it could not be read.
    """
    try:
        raw = pkgutil.get_data("airflow", "git_version")
        # str() raises if the resource is missing (raw is None); handled below.
        return str(raw, encoding="UTF-8")
    except Exception as err:
        log.debug(err)
        return None

65 

66 

@cache
def getuser() -> str:
    """
    Get the username of the current user, or error with a nice error message if there's no current user.

    We don't want to fall back to os.getuid() because not having a username
    probably means the rest of the user environment is wrong (e.g. no $HOME).
    Explicit failure is better than silently trying to work badly.

    The function is wrapped in ``cache``, so a successful result is presumably
    reused by later calls instead of being looked up again.

    :return: the username reported by ``getpass.getuser()``.
    :raises AirflowConfigException: if ``getpass.getuser()`` cannot determine a username.
    """
    try:
        return getpass.getuser()
    except (KeyError, OSError) as err:
        # KeyError is what the password-database lookup raises on Python < 3.13;
        # from Python 3.13 getpass.getuser() raises OSError instead.
        # Inner import to avoid circular import
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            "The user that Airflow is running as has no username; you must run "
            "Airflow as a full user, with a username and home directory, "
            "in order for it to function properly."
        ) from err