Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urlextract/cachefile.py: 49%

106 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:11 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4cachefile.py - file with classes handling cached TLDs (e.g. downloads, updates) 

5 

6.. Licence MIT 

7.. codeauthor:: Jan Lipovský <janlipovsky@gmail.com>, janlipovsky.cz 

8.. contributors: https://github.com/lipoja/URLExtract/graphs/contributors 

9""" 

10 

11import logging 

12import os 

13import tempfile 

14import urllib.request 

15from typing import Set, Iterable, Tuple, List, Union, NoReturn 

16 

17from datetime import datetime 

18from urllib.error import URLError, HTTPError 

19 

20import idna # type: ignore 

21import filelock 

22from platformdirs import user_cache_dir 

23 

24 

class CacheFileError(Exception):
    """Raised when an error occurs while working with the file of cached TLDs."""

29 

30 

class CacheFile:
    """Class for working with cached TLDs in file."""

    # file name of cached list of TLDs downloaded from IANA
    _CACHE_FILE_NAME = "tlds-alpha-by-domain.txt"
    # name of the directory (next to this module) holding the bundled list
    _DATA_DIR = "data"

    # name used in appdir
    _URLEXTRACT_NAME = "urlextract"

    # seconds to wait on a blocking network operation while downloading
    # the TLD list; without a limit a stalled connection blocks forever
    _DOWNLOAD_TIMEOUT = 30

    def __init__(self, cache_dir=None):
        """
        Resolve the path of the TLD cache file.

        When no cache file exists in the selected cache directory, the
        list of TLDs bundled in the package data directory is used instead.

        :param str cache_dir: base path for TLD cache, defaults to data dir
        :raises: CacheFileError when ``cache_dir`` (or every fallback
            directory) is not writable, or when the bundled default
            cache file does not exist
        """

        self._logger = logging.getLogger(self._URLEXTRACT_NAME)

        self._user_defined_cache_dir = cache_dir
        self._default_cache_file = False

        # full path for cached file with list of TLDs
        self._tld_list_path = self._get_cache_file_path()
        if not os.access(self._tld_list_path, os.F_OK):
            self._logger.info(
                "Cache file not found in '%s'. "
                "Use URLExtract.update() to download newest version.",
                self._tld_list_path,
            )
            self._logger.info(
                "Using default list of TLDs provided in urlextract package."
            )
            self._tld_list_path = self._get_default_cache_file_path()
            self._default_cache_file = True

    def _get_default_cache_dir(self) -> str:
        """
        Returns default cache directory (data directory)

        The path is only computed from the location of this module;
        its existence is not checked here.

        :return: path to default cache directory
        :rtype: str
        """

        return os.path.join(os.path.dirname(__file__), self._DATA_DIR)

    def _get_default_cache_file_path(self) -> str:
        """
        Returns default cache file path

        :raises: CacheFileError when default cache file does not exist
        :return: default cache file path (to data directory)
        :rtype: str
        """

        default_list_path = os.path.join(
            self._get_default_cache_dir(), self._CACHE_FILE_NAME
        )

        if not os.access(default_list_path, os.F_OK):
            raise CacheFileError(
                "Default cache file does not exist '{}'!".format(default_list_path)
            )

        return default_list_path

    def _get_writable_cache_dir(self) -> str:
        """
        Get writable cache directory with fallback to user's cache directory
        and global temp directory

        :raises: CacheFileError when cached directory is not writable for user
        :return: path to cache directory
        :rtype: str
        """
        package_data_dir = self._get_default_cache_dir()
        if os.access(package_data_dir, os.W_OK):
            self._default_cache_file = True
            return package_data_dir

        user_dir = user_cache_dir(self._URLEXTRACT_NAME)
        try:
            os.makedirs(user_dir, exist_ok=True)
        except OSError as err:
            # Best effort only: any failure to create the directory
            # (permission denied, read-only file system, path occupied by
            # a file, ...) must not abort - fall through to the temp dir.
            self._logger.debug(
                "Can not create user cache directory '%s': %s", user_dir, err
            )

        if os.access(user_dir, os.W_OK):
            return user_dir

        temp_dir = tempfile.gettempdir()
        if os.access(temp_dir, os.W_OK):
            return temp_dir

        raise CacheFileError("Cache directories are not writable.")

    def _get_cache_file_path(self) -> str:
        """
        Get path for cache file

        :raises: CacheFileError when cached directory is not writable for user
        :return: Full path to cached file with TLDs
        :rtype: str
        """
        if self._user_defined_cache_dir is None:
            # Tries to get writable cache dir with fallback to users data dir
            # and temp directory
            cache_dir = self._get_writable_cache_dir()
        else:
            cache_dir = self._user_defined_cache_dir
            if not os.access(cache_dir, os.W_OK):
                raise CacheFileError(
                    "Cache directory {} is not writable.".format(cache_dir)
                )

        # get path for cached file
        return os.path.join(cache_dir, self._CACHE_FILE_NAME)

    def _get_cache_lock_file_path(self) -> str:
        """
        Get path for cache file lock

        The lock file always lives in the writable cache directory,
        next to the place where the cache file would be written.

        :raises: CacheFileError when cached directory is not writable for user
        :return: Full path to cached file lock
        :rtype: str
        """
        return self._get_cache_file_path() + ".lock"

    def _download_tlds_list(self) -> bool:
        """
        Function downloads list of TLDs from IANA.
        LINK: https://data.iana.org/TLD/tlds-alpha-by-domain.txt

        :return: True if list was downloaded, False in case of an error
        :rtype: bool
        """
        url_list = "https://data.iana.org/TLD/tlds-alpha-by-domain.txt"

        # The bundled default file is in use but can not be overwritten:
        # switch to a cache file inside a writable cache directory.
        if self._default_cache_file and not os.access(self._tld_list_path, os.W_OK):
            self._logger.info("Default cache file is not writable.")
            self._tld_list_path = self._get_cache_file_path()
            self._logger.info("Changed path of cache file to: %s", self._tld_list_path)

        # An existing target that we are not allowed to overwrite is an error;
        # a missing target is fine, it gets created below.
        target_exists = os.access(self._tld_list_path, os.F_OK)
        if target_exists and not os.access(self._tld_list_path, os.W_OK):
            self._logger.error(
                "ERROR: Cache file is not writable for current user. (%s)",
                self._tld_list_path,
            )
            return False

        req = urllib.request.Request(url_list)
        req.add_header(
            "User-Agent",
            "Mozilla/5.0 (Windows NT 6.0; "
            "WOW64; rv:24.0) Gecko/20100101 "
            "Firefox/24.0",
        )
        try:
            with urllib.request.urlopen(req, timeout=self._DOWNLOAD_TIMEOUT) as f:
                page = f.read().decode("utf-8")
        except HTTPError as e:
            self._logger.error(
                "ERROR: Can not download list of TLDs. (HTTPError: %s)", e.reason
            )
            return False
        except URLError as e:
            self._logger.error(
                "ERROR: Can not download list of TLDs. (URLError: %s)", e.reason
            )
            return False
        except OSError as e:
            # Timeouts and connection resets that happen while the body is
            # being read are raised as plain OSError, not as URLError.
            self._logger.error(
                "ERROR: Can not download list of TLDs. (OSError: %s)", e
            )
            return False

        # The payload was decoded as UTF-8, so store it as UTF-8 as well
        # instead of relying on the platform default encoding.
        with filelock.FileLock(self._get_cache_lock_file_path()):
            with open(self._tld_list_path, "w", encoding="utf-8") as ftld:
                ftld.write(page)

        return True

    def _load_cached_tlds(self) -> Set[str]:
        """
        Loads TLDs from cached file to set.

        Every TLD is stored lower-cased with a leading dot, both in the
        form found in the file and in its IDNA-decoded form.

        :raises: CacheFileError when cached file is not readable for user
        :return: Set of current TLDs
        :rtype: set
        """

        # check if cached file is readable
        if not os.access(self._tld_list_path, os.R_OK):
            self._logger.error(
                "Cached file is not readable for current user. (%s)",
                self._tld_list_path,
            )
            raise CacheFileError("Cached file is not readable for current user.")

        set_of_tlds: Set[str] = set()

        with filelock.FileLock(self._get_cache_lock_file_path()):
            with open(self._tld_list_path, "r", encoding="utf-8") as f_cache_tld:
                for line in f_cache_tld:
                    tld = line.strip().lower()
                    # skip empty lines and comments
                    if not tld or tld.startswith("#"):
                        continue

                    set_of_tlds.add("." + tld)
                    set_of_tlds.add("." + idna.decode(tld))

        return set_of_tlds

    def _get_last_cachefile_modification(self) -> Union[datetime, None]:
        """
        Get last modification of cache file with TLDs.

        :return: Date and time of last modification (naive, local time) or
                 None when the modification time can not be read
                 (e.g. the file does not exist)
        :rtype: datetime|None
        """

        try:
            mtime = os.path.getmtime(self._tld_list_path)
        except OSError:
            return None

        return datetime.fromtimestamp(mtime)