1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4cachefile.py - file with classes handling cached TLDs (e.g. downloads, updates)
5
6.. Licence MIT
7.. codeauthor:: Jan Lipovský <janlipovsky@gmail.com>, janlipovsky.cz
8.. contributors: https://github.com/lipoja/URLExtract/graphs/contributors
9"""
10
11import logging
12import os
13import tempfile
14import urllib.request
15from typing import Set, Iterable, Tuple, List, Union, NoReturn
16
17from datetime import datetime
18from urllib.error import URLError, HTTPError
19
20import idna # type: ignore
21import filelock
22from platformdirs import user_cache_dir
23
24
class CacheFileError(Exception):
    """Signals a problem with the file that stores the cached TLD list.

    Raised by :class:`CacheFile` when a cache file or cache directory is
    missing, not readable or not writable.
    """
29
30
class CacheFile:
    """Class for working with cached TLDs in file.

    The list of TLDs is kept in a plain text file (one TLD per line, lines
    starting with ``#`` are comments). The file is looked up in a writable
    cache directory; when it is not there, the list bundled in the package
    ``data`` directory is used instead.
    """

    # file name of cached list of TLDs downloaded from IANA
    _CACHE_FILE_NAME = "tlds-alpha-by-domain.txt"
    # name of the directory (next to this module) with the bundled TLD list
    _DATA_DIR = "data"

    # name used in appdir
    _URLEXTRACT_NAME = "urlextract"

    def __init__(self, cache_dir=None):
        """
        :param str cache_dir: base path for TLD cache, defaults to data dir
        :raises: CacheFileError when the cache directory is not writable
            for user, or when no cache file is found and the default
            (bundled) cache file does not exist either
        """

        self._logger = logging.getLogger(self._URLEXTRACT_NAME)

        self._user_defined_cache_dir = cache_dir
        # NOTE: _get_writable_cache_dir() may flip this flag to True when
        # the package data directory itself is writable.
        self._default_cache_file = False

        # full path for cached file with list of TLDs
        self._tld_list_path = self._get_cache_file_path()
        if not os.access(self._tld_list_path, os.F_OK):
            self._logger.info(
                "Cache file not found in '%s'. "
                "Use URLExtract.update() to download newest version.",
                self._tld_list_path,
            )
            self._logger.info(
                "Using default list of TLDs provided in urlextract package."
            )
            self._tld_list_path = self._get_default_cache_file_path()
            self._default_cache_file = True

    def _get_default_cache_dir(self) -> str:
        """
        Returns default cache directory (data directory)

        The path is only computed here; it is not checked for existence.

        :return: path to default cache directory
        :rtype: str
        """

        return os.path.join(os.path.dirname(__file__), self._DATA_DIR)

    def _get_default_cache_file_path(self) -> str:
        """
        Returns default cache file path

        :raises: CacheFileError when default cache file does not exist
        :return: default cache file path (to data directory)
        :rtype: str
        """

        default_list_path = os.path.join(
            self._get_default_cache_dir(), self._CACHE_FILE_NAME
        )

        if not os.access(default_list_path, os.F_OK):
            raise CacheFileError(
                "Default cache file does not exist '{}'!".format(default_list_path)
            )

        return default_list_path

    def _get_writable_cache_dir(self) -> str:
        """
        Get writable cache directory with fallback to user's cache directory
        and global temp directory

        Side effect: sets ``self._default_cache_file`` to True when the
        package data directory is the one that gets returned.

        :raises: CacheFileError when cached directory is not writable for user
        :return: path to cache directory
        :rtype: str
        """
        dir_path_data = self._get_default_cache_dir()

        if os.access(dir_path_data, os.W_OK):
            self._default_cache_file = True
            return dir_path_data

        dir_path_user = user_cache_dir(self._URLEXTRACT_NAME)
        try:
            # exist_ok=True makes a separate (racy) existence check needless
            os.makedirs(dir_path_user, exist_ok=True)
        except OSError:
            # Any failure to create the directory (missing permission,
            # read-only file system, a file in the way, ...) must not abort
            # the lookup: continue and try the last fallback dir.
            pass

        if os.access(dir_path_user, os.W_OK):
            return dir_path_user

        dir_path_temp = tempfile.gettempdir()
        if os.access(dir_path_temp, os.W_OK):
            return dir_path_temp

        raise CacheFileError("Cache directories are not writable.")

    def _get_cache_file_path(self) -> str:
        """
        Get path for cache file

        :raises: CacheFileError when cached directory is not writable for user
        :return: Full path to cached file with TLDs
        :rtype: str
        """
        if self._user_defined_cache_dir is None:
            # Tries to get writable cache dir with fallback to users data dir
            # and temp directory
            cache_dir = self._get_writable_cache_dir()
        else:
            cache_dir = self._user_defined_cache_dir
            if not os.access(cache_dir, os.W_OK):
                raise CacheFileError(
                    "Cache directory {} is not writable.".format(cache_dir)
                )

        # get path for cached file
        return os.path.join(cache_dir, self._CACHE_FILE_NAME)

    def _get_cache_lock_file_path(self) -> str:
        """
        Get path for cache file lock

        The lock file lives next to the path returned by
        ``_get_cache_file_path()`` and has the ``.lock`` suffix.

        :raises: CacheFileError when cached directory is not writable for user
        :return: Full path to cached file lock
        :rtype: str
        """
        return self._get_cache_file_path() + ".lock"

    def _download_tlds_list(self) -> bool:
        """
        Function downloads list of TLDs from IANA.
        LINK: https://data.iana.org/TLD/tlds-alpha-by-domain.txt

        When the default cache file is in use and is not writable,
        ``self._tld_list_path`` is redirected to the writable cache location
        before downloading.

        :raises: CacheFileError when no writable cache directory is available
        :return: True if list was downloaded, False in case of an error
        :rtype: bool
        """
        url_list = "https://data.iana.org/TLD/tlds-alpha-by-domain.txt"

        # Default cache file exist (set by _default_cache_file)
        # and we want to write permission
        if self._default_cache_file and not os.access(self._tld_list_path, os.W_OK):
            self._logger.info("Default cache file is not writable.")
            self._tld_list_path = self._get_cache_file_path()
            self._logger.info("Changed path of cache file to: %s", self._tld_list_path)

        # An existing file we cannot overwrite is a hard stop; a missing file
        # is fine because it will be created below.
        if os.path.exists(self._tld_list_path) and not os.access(
            self._tld_list_path, os.W_OK
        ):
            self._logger.error(
                "ERROR: Cache file is not writable for current user. (%s)",
                self._tld_list_path,
            )
            return False

        req = urllib.request.Request(url_list)
        req.add_header(
            "User-Agent",
            "Mozilla/5.0 (Windows NT 6.0; "
            "WOW64; rv:24.0) Gecko/20100101 "
            "Firefox/24.0",
        )
        try:
            with urllib.request.urlopen(req) as f:
                page = f.read().decode("utf-8")
        except HTTPError as e:
            # HTTPError is a subclass of URLError, so it has to come first
            self._logger.error(
                "ERROR: Can not download list of TLDs. (HTTPError: %s)", e.reason
            )
            return False
        except URLError as e:
            self._logger.error(
                "ERROR: Can not download list of TLDs. (URLError: %s)", e.reason
            )
            return False

        with filelock.FileLock(self._get_cache_lock_file_path()):
            # Write with the same encoding the payload was decoded with, so
            # the result does not depend on the locale of the machine.
            with open(self._tld_list_path, "w", encoding="utf-8") as ftld:
                ftld.write(page)

        return True

    def _load_cached_tlds(self) -> Set[str]:
        """
        Loads TLDs from cached file to set.

        Every TLD is stored lower-cased with a leading dot, both as written
        in the file and in the form returned by ``idna.decode``.

        :raises: CacheFileError when cached file is not readable for user
        :return: Set of current TLDs
        :rtype: set
        """

        # check if cached file is readable
        if not os.access(self._tld_list_path, os.R_OK):
            self._logger.error(
                "Cached file is not readable for current user. (%s)",
                self._tld_list_path,
            )
            raise CacheFileError("Cached file is not readable for current user.")

        set_of_tlds: Set[str] = set()

        with filelock.FileLock(self._get_cache_lock_file_path()):
            # explicit encoding: must match the one used when writing
            with open(self._tld_list_path, "r", encoding="utf-8") as f_cache_tld:
                for line in f_cache_tld:
                    tld = line.strip().lower()
                    # skip empty lines and comments
                    if not tld or tld.startswith("#"):
                        continue

                    set_of_tlds.add("." + tld)
                    set_of_tlds.add("." + idna.decode(tld))

        return set_of_tlds

    def _get_last_cachefile_modification(self) -> Union[datetime, None]:
        """
        Get last modification of cache file with TLDs.

        :return: Date and time of last modification (naive, local time) or
                 None when the modification time can not be read
                 (e.g. the file does not exist)
        :rtype: datetime|None
        """

        try:
            mtime = os.path.getmtime(self._tld_list_path)
        except OSError:
            return None

        return datetime.fromtimestamp(mtime)