Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/build/lib/airflow/utils/operator_resources.py: 52%
65 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
20from airflow.configuration import conf
21from airflow.exceptions import AirflowException
# Size constants for resource quantities. One megabyte is the base unit, and
# each larger unit is 1024 times the previous one.
MB = 1
GB = MB * 1024
TB = GB * 1024
PB = TB * 1024
EB = PB * 1024
class Resource:
    """
    A single resource requirement of an operator in its execution environment.

    :param name: Name of the resource
    :param units_str: Display string for the units of the resource (e.g. MB for a
        RAM resource)
    :param qty: How many units of the resource the operator needs in order to
        execute; must not be negative.
    :raises AirflowException: if ``qty`` is below zero
    """

    def __init__(self, name, units_str, qty):
        if qty < 0:
            message = (
                f"Received resource quantity {qty} for resource {name}, "
                f"but resource quantity must be non-negative."
            )
            raise AirflowException(message)

        # Assignment order is kept stable: __repr__ and __eq__ both work off
        # the instance __dict__, whose ordering follows insertion.
        self._name = name
        self._units_str = units_str
        self._qty = qty

    def __eq__(self, other):
        # Only instances of this class (or its subclasses) are comparable;
        # anything else is deferred to the other operand.
        if isinstance(other, self.__class__):
            return vars(self) == vars(other)
        return NotImplemented

    def __repr__(self):
        return repr(vars(self))

    @property
    def name(self):
        """Name of the resource."""
        return self._name

    @property
    def units_str(self):
        """Display string for the units of the resource."""
        return self._units_str

    @property
    def qty(self):
        """
        Number of units of the resource that the operator requires in order
        to execute.
        """
        return self._qty

    def to_dict(self):
        """Return the resource as a plain dict with name, qty and units_str."""
        return dict(name=self.name, qty=self.qty, units_str=self.units_str)
class CpuResource(Resource):
    """A CPU requirement of an operator, measured in cores."""

    def __init__(self, qty):
        super().__init__(name="CPU", units_str="core(s)", qty=qty)
class RamResource(Resource):
    """A RAM requirement of an operator, displayed in MB."""

    def __init__(self, qty):
        super().__init__(name="RAM", units_str="MB", qty=qty)
class DiskResource(Resource):
    """A disk space requirement of an operator, displayed in MB."""

    def __init__(self, qty):
        super().__init__(name="Disk", units_str="MB", qty=qty)
class GpuResource(Resource):
    """A GPU requirement of an operator, measured in GPU units."""

    def __init__(self, qty):
        super().__init__(name="GPU", units_str="gpu(s)", qty=qty)
class Resources:
    """
    The full set of resources an operator requires. Any resource left
    unspecified takes its default from the ``operators`` section of the
    airflow config.

    :param cpus: The number of cpu cores that are required
    :param ram: The amount of RAM required
    :param disk: The amount of disk space required
    :param gpus: The number of gpu units that are required
    """

    # NOTE: the defaults below are evaluated once, when this class body is
    # executed, not on every call.
    def __init__(
        self,
        cpus=conf.getint("operators", "default_cpus"),
        ram=conf.getint("operators", "default_ram"),
        disk=conf.getint("operators", "default_disk"),
        gpus=conf.getint("operators", "default_gpus"),
    ):
        # Each quantity is wrapped in its typed resource, which rejects
        # negative values.
        self.cpus = CpuResource(cpus)
        self.ram = RamResource(ram)
        self.disk = DiskResource(disk)
        self.gpus = GpuResource(gpus)

    def __eq__(self, other):
        # Defer to the other operand unless it is the same kind of object.
        if isinstance(other, self.__class__):
            return vars(self) == vars(other)
        return NotImplemented

    def __repr__(self):
        return repr(vars(self))

    def to_dict(self):
        """Return a dict mapping each resource kind to its own dict form."""
        kinds = ("cpus", "ram", "disk", "gpus")
        return {kind: getattr(self, kind).to_dict() for kind in kinds}

    @classmethod
    def from_dict(cls, resources_dict: dict):
        """Create resources from resources dict."""
        kinds = ("cpus", "ram", "disk", "gpus")
        quantities = {kind: resources_dict[kind]["qty"] for kind in kinds}
        return cls(**quantities)