Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/utils/operator_resources.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

65 statements  

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from airflow.configuration import conf 

21from airflow.exceptions import AirflowException 

22 

# Size constants for resources, expressed in megabytes (the base unit);
# each unit is 1024 times the one before it.
MB = 1
GB = MB * 1024
TB = GB * 1024
PB = TB * 1024
EB = PB * 1024

29 

30 

class Resource:
    """
    Represents a resource requirement in an execution environment for an operator.

    Instances compare equal when they are of compatible classes and hold the same
    name, units string and quantity; equal instances also hash equally, so they
    can be used in sets and as dictionary keys.

    :param name: Name of the resource
    :param units_str: The string representing the units of a resource (e.g. MB for a RAM
        resource) to be used for display purposes
    :param qty: The number of units of the specified resource that are required for
        execution of the operator.
    :raises AirflowException: If ``qty`` is negative.
    """

    def __init__(self, name, units_str, qty):
        if qty < 0:
            raise AirflowException(
                f"Received resource quantity {qty} for resource {name}, "
                f"but resource quantity must be non-negative."
            )

        self._name = name
        self._units_str = units_str
        self._qty = qty

    def __eq__(self, other):
        # Defer to the other operand for unrelated types instead of returning False.
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __hash__(self):
        # Defining __eq__ alone would set __hash__ to None and make instances
        # unhashable. Hash the same fields that equality compares so that equal
        # resources always share a hash.
        return hash((self._name, self._units_str, self._qty))

    def __repr__(self):
        return str(self.__dict__)

    @property
    def name(self):
        """Name of the resource."""
        return self._name

    @property
    def units_str(self):
        """The string representing the units of a resource."""
        return self._units_str

    @property
    def qty(self):
        """The number of units of the specified resource that are required for execution of the operator."""
        return self._qty

    def to_dict(self):
        """Return the resource as a dict with ``name``, ``qty`` and ``units_str`` keys."""
        return {
            "name": self.name,
            "qty": self.qty,
            "units_str": self.units_str,
        }

82 

83 

class CpuResource(Resource):
    """A CPU requirement for an operator; the quantity is displayed in ``core(s)``."""

    def __init__(self, qty):
        super().__init__(name="CPU", units_str="core(s)", qty=qty)

89 

90 

class RamResource(Resource):
    """A RAM requirement for an operator; the quantity is displayed in ``MB``."""

    def __init__(self, qty):
        super().__init__(name="RAM", units_str="MB", qty=qty)

96 

97 

class DiskResource(Resource):
    """A disk requirement for an operator; the quantity is displayed in ``MB``."""

    def __init__(self, qty):
        super().__init__(name="Disk", units_str="MB", qty=qty)

103 

104 

class GpuResource(Resource):
    """A GPU requirement for an operator; the quantity is displayed in ``gpu(s)``."""

    def __init__(self, qty):
        super().__init__(name="GPU", units_str="gpu(s)", qty=qty)

110 

111 

class Resources:
    """
    Bundle of the CPU, RAM, disk and GPU requirements of an operator.

    Any quantity that is not passed falls back to the matching ``default_*`` option
    of the ``operators`` section in the airflow config. Those defaults are read
    once, when this class definition is executed.

    :param cpus: The number of cpu cores that are required
    :param ram: The amount of RAM required
    :param disk: The amount of disk space required
    :param gpus: The number of gpu units that are required
    """

    def __init__(
        self,
        cpus=conf.getint("operators", "default_cpus"),
        ram=conf.getint("operators", "default_ram"),
        disk=conf.getint("operators", "default_disk"),
        gpus=conf.getint("operators", "default_gpus"),
    ):
        # Wrap each raw quantity in its typed resource; each wrapper rejects
        # negative quantities.
        self.cpus = CpuResource(cpus)
        self.ram = RamResource(ram)
        self.disk = DiskResource(disk)
        self.gpus = GpuResource(gpus)

    def __eq__(self, other):
        # Let Python try the reflected comparison for unrelated types.
        if isinstance(other, self.__class__):
            return vars(self) == vars(other)
        return NotImplemented

    def __repr__(self):
        return str(vars(self))

    def to_dict(self):
        """Return a dict mapping each resource kind to that resource's own dict form."""
        return {kind: getattr(self, kind).to_dict() for kind in ("cpus", "ram", "disk", "gpus")}

    @classmethod
    def from_dict(cls, resources_dict: dict):
        """Create resources from resources dict."""
        # Only the "qty" entry of each nested dict is used to rebuild the object.
        quantities = {kind: resources_dict[kind]["qty"] for kind in ("cpus", "ram", "disk", "gpus")}
        return cls(**quantities)