Coverage for /pythoncovmergedfiles/medio/medio/src/airflow/airflow/hooks/package_index.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18"""Hook for additional Package Indexes (Python)."""
20from __future__ import annotations
22import subprocess
23from typing import Any
24from urllib.parse import quote, urlparse
26from airflow.hooks.base import BaseHook
29class PackageIndexHook(BaseHook):
30 """Specify package indexes/Python package sources using Airflow connections."""
32 conn_name_attr = "pi_conn_id"
33 default_conn_name = "package_index_default"
34 conn_type = "package_index"
35 hook_name = "Package Index (Python)"
37 def __init__(self, pi_conn_id: str = default_conn_name, **kwargs) -> None:
38 super().__init__(**kwargs)
39 self.pi_conn_id = pi_conn_id
40 self.conn = None
42 @staticmethod
43 def get_ui_field_behaviour() -> dict[str, Any]:
44 """Return custom field behaviour."""
45 return {
46 "hidden_fields": ["schema", "port", "extra"],
47 "relabeling": {"host": "Package Index URL"},
48 "placeholders": {
49 "host": "Example: https://my-package-mirror.net/pypi/repo-name/simple",
50 "login": "Username for package index",
51 "password": "Password for package index (will be masked)",
52 },
53 }
55 @staticmethod
56 def _get_basic_auth_conn_url(index_url: str, user: str | None, password: str | None) -> str:
57 """Return a connection URL with basic auth credentials based on connection config."""
58 url = urlparse(index_url)
59 host = url.netloc.split("@")[-1]
60 if user:
61 if password:
62 host = f"{quote(user)}:{quote(password)}@{host}"
63 else:
64 host = f"{quote(user)}@{host}"
65 return url._replace(netloc=host).geturl()
67 def get_conn(self) -> Any:
68 """Return connection for the hook."""
69 return self.get_connection_url()
71 def get_connection_url(self) -> Any:
72 """Return a connection URL with embedded credentials."""
73 conn = self.get_connection(self.pi_conn_id)
74 index_url = conn.host
75 if not index_url:
76 raise ValueError("Please provide an index URL.")
77 return self._get_basic_auth_conn_url(index_url, conn.login, conn.password)
79 def test_connection(self) -> tuple[bool, str]:
80 """Test connection to package index url."""
81 conn_url = self.get_connection_url()
82 proc = subprocess.run(
83 ["pip", "search", "not-existing-test-package", "--no-input", "--index", conn_url],
84 check=False,
85 capture_output=True,
86 )
87 conn = self.get_connection(self.pi_conn_id)
88 if proc.returncode not in [
89 0, # executed successfully, found package
90 23, # executed successfully, didn't find any packages
91 # (but we do not expect it to find 'not-existing-test-package')
92 ]:
93 return False, f"Connection test to {conn.host} failed. Error: {str(proc.stderr)}"
95 return True, f"Connection to {conn.host} tested successfully!"