Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/airflow/utils/operator_resources.py: 52%

65 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# 

2# Licensed to the Apache Software Foundation (ASF) under one 

3# or more contributor license agreements. See the NOTICE file 

4# distributed with this work for additional information 

5# regarding copyright ownership. The ASF licenses this file 

6# to you under the Apache License, Version 2.0 (the 

7# "License"); you may not use this file except in compliance 

8# with the License. You may obtain a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, 

13# software distributed under the License is distributed on an 

14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 

15# KIND, either express or implied. See the License for the 

16# specific language governing permissions and limitations 

17# under the License. 

18from __future__ import annotations 

19 

20from airflow.configuration import conf 

21from airflow.exceptions import AirflowException 

22 

23# Constants for resources (megabytes are the base unit) 

24MB = 1 

25GB = 1024 * MB 

26TB = 1024 * GB 

27PB = 1024 * TB 

28EB = 1024 * PB 

29 

30 

31class Resource: 

32 """ 

33 Represents a resource requirement in an execution environment for an operator. 

34 

35 :param name: Name of the resource 

36 :param units_str: The string representing the units of a resource (e.g. MB for a CPU 

37 resource) to be used for display purposes 

38 :param qty: The number of units of the specified resource that are required for 

39 execution of the operator. 

40 """ 

41 

42 def __init__(self, name, units_str, qty): 

43 if qty < 0: 

44 raise AirflowException( 

45 f"Received resource quantity {qty} for resource {name}, " 

46 f"but resource quantity must be non-negative." 

47 ) 

48 

49 self._name = name 

50 self._units_str = units_str 

51 self._qty = qty 

52 

53 def __eq__(self, other): 

54 if not isinstance(other, self.__class__): 

55 return NotImplemented 

56 return self.__dict__ == other.__dict__ 

57 

58 def __repr__(self): 

59 return str(self.__dict__) 

60 

61 @property 

62 def name(self): 

63 """Name of the resource.""" 

64 return self._name 

65 

66 @property 

67 def units_str(self): 

68 """The string representing the units of a resource.""" 

69 return self._units_str 

70 

71 @property 

72 def qty(self): 

73 """ 

74 The number of units of the specified resource that are required for 

75 execution of the operator. 

76 """ 

77 return self._qty 

78 

79 def to_dict(self): 

80 return { 

81 "name": self.name, 

82 "qty": self.qty, 

83 "units_str": self.units_str, 

84 } 

85 

86 

87class CpuResource(Resource): 

88 """Represents a CPU requirement in an execution environment for an operator.""" 

89 

90 def __init__(self, qty): 

91 super().__init__("CPU", "core(s)", qty) 

92 

93 

94class RamResource(Resource): 

95 """Represents a RAM requirement in an execution environment for an operator.""" 

96 

97 def __init__(self, qty): 

98 super().__init__("RAM", "MB", qty) 

99 

100 

101class DiskResource(Resource): 

102 """Represents a disk requirement in an execution environment for an operator.""" 

103 

104 def __init__(self, qty): 

105 super().__init__("Disk", "MB", qty) 

106 

107 

108class GpuResource(Resource): 

109 """Represents a GPU requirement in an execution environment for an operator.""" 

110 

111 def __init__(self, qty): 

112 super().__init__("GPU", "gpu(s)", qty) 

113 

114 

115class Resources: 

116 """ 

117 The resources required by an operator. Resources that are not specified will use the 

118 default values from the airflow config. 

119 

120 :param cpus: The number of cpu cores that are required 

121 :param ram: The amount of RAM required 

122 :param disk: The amount of disk space required 

123 :param gpus: The number of gpu units that are required 

124 """ 

125 

126 def __init__( 

127 self, 

128 cpus=conf.getint("operators", "default_cpus"), 

129 ram=conf.getint("operators", "default_ram"), 

130 disk=conf.getint("operators", "default_disk"), 

131 gpus=conf.getint("operators", "default_gpus"), 

132 ): 

133 self.cpus = CpuResource(cpus) 

134 self.ram = RamResource(ram) 

135 self.disk = DiskResource(disk) 

136 self.gpus = GpuResource(gpus) 

137 

138 def __eq__(self, other): 

139 if not isinstance(other, self.__class__): 

140 return NotImplemented 

141 return self.__dict__ == other.__dict__ 

142 

143 def __repr__(self): 

144 return str(self.__dict__) 

145 

146 def to_dict(self): 

147 return { 

148 "cpus": self.cpus.to_dict(), 

149 "ram": self.ram.to_dict(), 

150 "disk": self.disk.to_dict(), 

151 "gpus": self.gpus.to_dict(), 

152 } 

153 

154 @classmethod 

155 def from_dict(cls, resources_dict: dict): 

156 """Create resources from resources dict""" 

157 cpus = resources_dict["cpus"]["qty"] 

158 ram = resources_dict["ram"]["qty"] 

159 disk = resources_dict["disk"]["qty"] 

160 gpus = resources_dict["gpus"]["qty"] 

161 

162 return cls(cpus=cpus, ram=ram, disk=disk, gpus=gpus)