1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18from __future__ import annotations
19
20import functools
21import logging
22import pkgutil
23import sys
24from collections import defaultdict
25from collections.abc import Callable, Iterator
26from importlib import import_module
27from typing import TYPE_CHECKING
28
29from .file_discovery import (
30 find_path_from_directory as find_path_from_directory,
31)
32
33if sys.version_info >= (3, 12):
34 from importlib import metadata
35else:
36 import importlib_metadata as metadata
37
38log = logging.getLogger(__name__)
39
40EPnD = tuple[metadata.EntryPoint, metadata.Distribution]
41
42if TYPE_CHECKING:
43 from types import ModuleType
44
45
46def import_string(dotted_path: str):
47 """
48 Import a dotted module path and return the attribute/class designated by the last name in the path.
49
50 Raise ImportError if the import failed.
51 """
52 # TODO: Add support for nested classes. Currently, it only works for top-level classes.
53 try:
54 module_path, class_name = dotted_path.rsplit(".", 1)
55 except ValueError:
56 raise ImportError(f"{dotted_path} doesn't look like a module path")
57
58 module = import_module(module_path)
59
60 try:
61 return getattr(module, class_name)
62 except AttributeError:
63 raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class')
64
65
66def qualname(o: object | Callable, use_qualname: bool = False, exclude_module: bool = False) -> str:
67 """
68 Convert an attribute/class/callable to a string.
69
70 By default, returns a string importable by ``import_string`` (includes module path).
71 With exclude_module=True, returns only the qualified name without module prefix,
72 useful for stable identification across deployments where module paths may vary.
73 """
74 if callable(o) and hasattr(o, "__module__"):
75 if exclude_module:
76 if hasattr(o, "__qualname__"):
77 return o.__qualname__
78 if hasattr(o, "__name__"):
79 return o.__name__
80 # Handle functools.partial objects specifically (not just any object with 'func' attr)
81 if isinstance(o, functools.partial):
82 return qualname(o.func, exclude_module=True)
83 return type(o).__qualname__
84 if use_qualname and hasattr(o, "__qualname__"):
85 return f"{o.__module__}.{o.__qualname__}"
86 if hasattr(o, "__name__"):
87 return f"{o.__module__}.{o.__name__}"
88
89 cls = o
90
91 if not isinstance(cls, type): # instance or class
92 cls = type(cls)
93
94 name = cls.__qualname__
95 module = cls.__module__
96
97 if exclude_module:
98 return name
99
100 if module and module != "__builtin__":
101 return f"{module}.{name}"
102
103 return name
104
105
106def iter_namespace(ns: ModuleType):
107 return pkgutil.iter_modules(ns.__path__, ns.__name__ + ".")
108
109
110def is_valid_dotpath(path: str) -> bool:
111 """
112 Check if a string follows valid dotpath format (ie: 'package.subpackage.module').
113
114 :param path: String to check
115 """
116 import re
117
118 if not isinstance(path, str):
119 return False
120
121 # Pattern explanation:
122 # ^ - Start of string
123 # [a-zA-Z_] - Must start with letter or underscore
124 # [a-zA-Z0-9_] - Following chars can be letters, numbers, or underscores
125 # (\.[a-zA-Z_][a-zA-Z0-9_]*)* - Can be followed by dots and valid identifiers
126 # $ - End of string
127 pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)*$"
128
129 return bool(re.match(pattern, path))
130
131
132@functools.cache
133def _get_grouped_entry_points() -> dict[str, list[EPnD]]:
134 mapping: dict[str, list[EPnD]] = defaultdict(list)
135 for dist in metadata.distributions():
136 try:
137 for e in dist.entry_points:
138 mapping[e.group].append((e, dist))
139 except Exception as e:
140 log.warning("Error when retrieving package metadata (skipping it): %s, %s", dist, e)
141 return mapping
142
143
144def entry_points_with_dist(group: str) -> Iterator[EPnD]:
145 """
146 Retrieve entry points of the given group.
147
148 This is like the ``entry_points()`` function from ``importlib.metadata``,
149 except it also returns the distribution the entry point was loaded from.
150
151 Note that this may return multiple distributions to the same package if they
152 are loaded from different ``sys.path`` entries. The caller site should
153 implement appropriate deduplication logic if needed.
154
155 :param group: Filter results to only this entrypoint group
156 :return: Generator of (EntryPoint, Distribution) objects for the specified groups
157 """
158 return iter(_get_grouped_entry_points()[group])