Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urlextract/cachefile.py: 49%
106 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:11 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:11 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4cachefile.py - file with classes handling cached TLDs (e.g. downloads, updates)
6.. Licence MIT
7.. codeauthor:: Jan Lipovský <janlipovsky@gmail.com>, janlipovsky.cz
8.. contributors: https://github.com/lipoja/URLExtract/graphs/contributors
9"""
11import logging
12import os
13import tempfile
14import urllib.request
15from typing import Set, Iterable, Tuple, List, Union, NoReturn
17from datetime import datetime
18from urllib.error import URLError, HTTPError
20import idna # type: ignore
21import filelock
22from platformdirs import user_cache_dir
class CacheFileError(Exception):
    """
    Error signalling a problem with the file that holds the cached TLDs.

    Raised by :class:`CacheFile` when the cache file or its directory is
    missing, unreadable or not writable for the current user.
    """
class CacheFile:
    """
    Class for working with cached TLDs in file.

    The list of TLDs is looked up in a writable cache directory (user
    defined, the package data directory, the user's cache directory or the
    global temp directory, in that order of preference). When no cached
    list is found there, the default list from the package data directory
    is used instead.
    """

    # file name of cached list of TLDs downloaded from IANA
    _CACHE_FILE_NAME = "tlds-alpha-by-domain.txt"
    _DATA_DIR = "data"

    # name used in appdir
    _URLEXTRACT_NAME = "urlextract"

    def __init__(self, cache_dir=None):
        """
        :param str cache_dir: base path for TLD cache, defaults to data dir
        :raises: CacheFileError when the user defined cache directory is not
            writable, when no writable cache directory can be found, or when
            neither a cached nor the default list of TLDs exists
        """

        self._logger = logging.getLogger(self._URLEXTRACT_NAME)

        self._user_defined_cache_dir = cache_dir
        self._default_cache_file = False

        # full path for cached file with list of TLDs
        self._tld_list_path = self._get_cache_file_path()
        if not os.access(self._tld_list_path, os.F_OK):
            self._logger.info(
                "Cache file not found in '%s'. "
                "Use URLExtract.update() to download newest version.",
                self._tld_list_path,
            )
            self._logger.info(
                "Using default list of TLDs provided in urlextract package."
            )
            # fall back to the list in the package data directory
            self._tld_list_path = self._get_default_cache_file_path()
            self._default_cache_file = True

    def _get_default_cache_dir(self) -> str:
        """
        Returns default cache directory (data directory)

        The path is only computed from the location of this module; it is
        not checked for existence here.

        :return: path to default cache directory
        :rtype: str
        """

        return os.path.join(os.path.dirname(__file__), self._DATA_DIR)

    def _get_default_cache_file_path(self) -> str:
        """
        Returns default cache file path

        :raises: CacheFileError when default cache file does not exist
        :return: default cache file path (to data directory)
        :rtype: str
        """

        default_list_path = os.path.join(
            self._get_default_cache_dir(), self._CACHE_FILE_NAME
        )

        if not os.access(default_list_path, os.F_OK):
            raise CacheFileError(
                "Default cache file does not exist '{}'!".format(default_list_path)
            )

        return default_list_path

    def _get_writable_cache_dir(self) -> str:
        """
        Get writable cache directory with fallback to user's cache directory
        and global temp directory

        Side effect: marks the cache file as the default one
        (``_default_cache_file = True``) when the package data directory is
        the directory that gets selected.

        :raises: CacheFileError when cached directory is not writable for user
        :return: path to cache directory
        :rtype: str
        """
        dir_path_data = self._get_default_cache_dir()

        if os.access(dir_path_data, os.W_OK):
            self._default_cache_file = True
            return dir_path_data

        dir_path_user = user_cache_dir(self._URLEXTRACT_NAME)
        try:
            os.makedirs(dir_path_user, exist_ok=True)
        except OSError:
            # Creating the directory is best effort: on any failure
            # (missing permission, read-only file system, a file in the
            # way, ...) we continue and let the writability check below
            # decide whether to use the last fallback dir.
            pass

        if os.access(dir_path_user, os.W_OK):
            return dir_path_user

        dir_path_temp = tempfile.gettempdir()
        if os.access(dir_path_temp, os.W_OK):
            return dir_path_temp

        raise CacheFileError("Cache directories are not writable.")

    def _get_cache_file_path(self) -> str:
        """
        Get path for cache file

        :raises: CacheFileError when cached directory is not writable for user
        :return: Full path to cached file with TLDs
        :rtype: str
        """
        if self._user_defined_cache_dir is None:
            # Tries to get writable cache dir with fallback to users data dir
            # and temp directory
            cache_dir = self._get_writable_cache_dir()
        else:
            cache_dir = self._user_defined_cache_dir
            if not os.access(cache_dir, os.W_OK):
                raise CacheFileError(
                    "Cache directory {} is not writable.".format(cache_dir)
                )

        # get path for cached file
        return os.path.join(cache_dir, self._CACHE_FILE_NAME)

    def _get_cache_lock_file_path(self) -> str:
        """
        Get path for cache file lock

        The lock file is placed next to the path returned by
        ``_get_cache_file_path()``, i.e. in a writable cache directory.

        :raises: CacheFileError when cached directory is not writable for user
        :return: Full path to cached file lock
        :rtype: str
        """
        return self._get_cache_file_path() + ".lock"

    def _download_tlds_list(self) -> bool:
        """
        Function downloads list of TLDs from IANA.
        LINK: https://data.iana.org/TLD/tlds-alpha-by-domain.txt

        The downloaded list is written to the cache file as UTF-8 while
        holding the cache file lock.

        :raises: CacheFileError when no writable cache directory is found
        :return: True if list was downloaded, False in case of an error
        :rtype: bool
        """
        url_list = "https://data.iana.org/TLD/tlds-alpha-by-domain.txt"

        # Default cache file exist (set by _default_cache_file)
        # and we want to write permission
        if self._default_cache_file and not os.access(self._tld_list_path, os.W_OK):
            self._logger.info("Default cache file is not writable.")
            self._tld_list_path = self._get_cache_file_path()
            self._logger.info("Changed path of cache file to: %s", self._tld_list_path)

        # an existing cache file that we can not overwrite is an error;
        # a missing one is fine, it will be created below
        if os.path.exists(self._tld_list_path) and not os.access(
            self._tld_list_path, os.W_OK
        ):
            self._logger.error(
                "ERROR: Cache file is not writable for current user. (%s)",
                self._tld_list_path,
            )
            return False

        req = urllib.request.Request(url_list)
        req.add_header(
            "User-Agent",
            "Mozilla/5.0 (Windows NT 6.0; "
            "WOW64; rv:24.0) Gecko/20100101 "
            "Firefox/24.0",
        )
        try:
            with urllib.request.urlopen(req) as f:
                page = f.read().decode("utf-8")
        except HTTPError as e:
            # HTTPError is a subclass of URLError, so it has to come first
            self._logger.error(
                "ERROR: Can not download list of TLDs. (HTTPError: %s)", e.reason
            )
            return False
        except URLError as e:
            self._logger.error(
                "ERROR: Can not download list of TLDs. (URLError: %s)", e.reason
            )
            return False

        with filelock.FileLock(self._get_cache_lock_file_path()):
            # the page was decoded as UTF-8, so store it as UTF-8 as well
            # instead of relying on the locale's default encoding
            with open(self._tld_list_path, "w", encoding="utf-8") as ftld:
                ftld.write(page)

        return True

    def _load_cached_tlds(self) -> Set[str]:
        """
        Loads TLDs from cached file to set.

        Every TLD is stored lower-cased with a leading dot, both as written
        in the file and in its IDNA-decoded form. Empty lines and lines
        starting with ``#`` are skipped.

        :raises: CacheFileError when cached file is not readable for user
        :return: Set of current TLDs
        :rtype: set
        """

        # check if cached file is readable
        if not os.access(self._tld_list_path, os.R_OK):
            self._logger.error(
                "Cached file is not readable for current user. (%s)",
                self._tld_list_path,
            )
            raise CacheFileError("Cached file is not readable for current user.")

        set_of_tlds: Set[str] = set()

        with filelock.FileLock(self._get_cache_lock_file_path()):
            # read with the same encoding the downloaded list is written in
            with open(self._tld_list_path, "r", encoding="utf-8") as f_cache_tld:
                for line in f_cache_tld:
                    tld = line.strip().lower()
                    # skip empty lines and comments
                    if not tld or tld[0] == "#":
                        continue

                    set_of_tlds.add("." + tld)
                    set_of_tlds.add("." + idna.decode(tld))

        return set_of_tlds

    def _get_last_cachefile_modification(self) -> Union[datetime, None]:
        """
        Get last modification of cache file with TLDs.

        :return: Date and time of last modification or
                 None when file does not exist
        :rtype: datetime|None
        """

        try:
            mtime = os.path.getmtime(self._tld_list_path)
        except OSError:
            return None

        return datetime.fromtimestamp(mtime)