Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urlextract/urlextract_core.py: 66%
447 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:11 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:11 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4urlextract_core.py - file with definition of URLExtract class and urlextract cli
6.. Created on 2016-07-29
7.. Licence MIT
8.. codeauthor:: Jan Lipovský <janlipovsky@gmail.com>, janlipovsky.cz
9.. contributors: https://github.com/lipoja/URLExtract/graphs/contributors
10"""
11from argparse import Namespace
12import functools
13import ipaddress
14import logging
15import re
16import socket
17from typing import Set, Iterable, Tuple, List, Union, NoReturn, Generator
18import string
19import sys
20from collections import OrderedDict
21from datetime import datetime, timedelta
23import uritools # type: ignore
25from urlextract.cachefile import CacheFile, CacheFileError
27# version of URLExtract (do not forget to change it in setup.py as well)
28__version__ = "1.8.0"
30# default value for maximum count of processed URLs by find_url
31DEFAULT_LIMIT = 10000
34class URLExtract(CacheFile):
35 """
36 Class for finding and extracting URLs from given string.
38 **Examples:**
40 .. code-block:: python
42 from urlextract import URLExtract
44 extractor = URLExtract()
45 urls = extractor.find_urls("Let's have URL example.com example.")
46 print(urls) # prints: ['example.com']
48 # Another way is to get a generator over found URLs in text:
49 for url in extractor.gen_urls(example_text):
50 print(url) # prints: ['example.com']
52 # Or if you want to just check if there is at least one URL in text:
53 if extractor.has_urls(example_text):
54 print("Given text contains some URL")
55 """
57 # compiled regexp for naive validation of host name
58 _hostname_re = re.compile(r"^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])$")
60 # list of enclosure of URL that should be removed
61 _enclosure = {
62 ("(", ")"),
63 ("{", "}"),
64 ("[", "]"),
65 ('"', '"'),
66 ("\\", "\\"),
67 ("'", "'"),
68 ("`", "`"),
69 }
71 _ipv4_tld = [".{}".format(ip) for ip in reversed(range(256))]
72 _ignore_list: Set[str] = set()
73 _permit_list: Set[str] = set()
75 _limit = DEFAULT_LIMIT
77 def __init__(
78 self,
79 extract_email=False,
80 cache_dns=True,
81 extract_localhost=True,
82 limit=DEFAULT_LIMIT,
83 allow_mixed_case_hostname=True,
84 **kwargs, # noqa E999
85 ):
86 """
87 Initialize function for URLExtract class.
88 Tries to get cached TLDs, if cached file does not exist it will try
89 to download new list from IANA and save it to cache file.
91 :param bool extract_email: True if we want to extract email from text.
92 Disabled by default
93 :param bool cache_dns: True replaces socket DNS lookup with caching
94 equivalent provided by dnspython.
95 Enabled by default
96 :param bool extract_localhost: True if we want to extract 'localhost'
97 as URL from text.
98 Enabled by default
99 :param int limit: maximum count of processed URLs by find_url function
100 default value defined as global variable DEFAULT_LIMIT
101 :param bool allow_mixed_case_hostname: True if hostname can contain mixed case letters
102 (upper-case and lower-case).
103 Disabled by default
104 """
105 super(URLExtract, self).__init__(**kwargs)
107 self._tlds_re = None
108 self._extract_localhost = extract_localhost
109 self._extract_email = extract_email
110 self._cache_dns = cache_dns
111 self._limit = limit
112 self._allow_mixed_case_hostname = allow_mixed_case_hostname
113 self._reload_tlds_from_file()
115 # general stop characters
116 general_stop_chars = {'"', "<", ">", ";"}
117 # defining default stop chars left
118 self._stop_chars_left = set(string.whitespace)
119 self._stop_chars_left |= general_stop_chars | {"|", "=", "]", ")", "}"}
121 # default stop characters on left side from schema
122 self._stop_chars_left_from_schema = self._stop_chars_left.copy() | {":"}
124 # defining default stop chars left
125 self._stop_chars_right = set(string.whitespace)
126 self._stop_chars_right |= general_stop_chars
128 # characters that are allowed to be right after TLD
129 self._after_tld_chars = self._get_after_tld_chars()
131 def _get_after_tld_chars(self) -> Set[str]:
132 """Initialize after tld characters"""
133 after_tld_chars = set(string.whitespace)
134 after_tld_chars |= {"/", '"', "'", "<", ">", "?", ":", ".", ","}
135 # get left enclosure characters
136 _, right_enclosure = zip(*self._enclosure)
137 # add right enclosure characters to be valid after TLD
138 # for correct parsing of URL e.g. (example.com)
139 after_tld_chars |= set(right_enclosure)
141 return after_tld_chars
143 def _reload_tlds_from_file(self):
144 """
145 Reloads TLDs from file and compile regexp.
146 :raises: CacheFileError when cached file is not readable for user
147 """
149 tlds = sorted(self._load_cached_tlds(), key=len, reverse=True)
150 tlds += self._ipv4_tld
151 if self._extract_localhost:
152 tlds.append("localhost")
153 re_escaped = [re.escape(str(tld)) for tld in tlds]
154 self._tlds_re = re.compile("|".join(re_escaped), flags=re.IGNORECASE)
156 @property
157 def extract_email(self) -> bool:
158 """
159 If set to True email will be extracted from text
161 :rtype: bool
162 """
163 return self._extract_email
165 @extract_email.setter
166 def extract_email(self, extract: bool):
167 """
168 Set if emails will be extracted from text
170 :param bool extract: True if emails should be extracted False otherwise
171 """
172 self._extract_email = extract
174 @property
175 def allow_mixed_case_hostname(self) -> bool:
176 """
177 If set to True host should contain mixed case letters (upper-case and lower-case)
179 :rtype: bool
180 """
181 return self._allow_mixed_case_hostname
183 @allow_mixed_case_hostname.setter
184 def allow_mixed_case_hostname(self, allow_mixed_case: bool):
185 """
186 Set if mixed case hostnames are allowed
188 :param bool allow_mixed_case: True if we should allow mixed case hostnames False otherwise
189 """
190 self._allow_mixed_case_hostname = allow_mixed_case
192 @property
193 def extract_localhost(self) -> bool:
194 """
195 If set to True 'localhost' will be extracted as URL from text
197 :rtype: bool
198 """
199 return self._extract_localhost
201 @extract_localhost.setter
202 def extract_localhost(self, enable: bool):
203 """
204 Set if 'localhost' will be extracted as URL from text
206 :param bool enable: True if 'localhost' should be extracted
207 False otherwise
208 """
209 self._extract_localhost = enable
211 @property
212 def ignore_list(self) -> Set[str]:
213 """
214 Set of URLs to be ignored (not returned) while extracting from text
216 :return: Returns set of ignored URLs
217 :rtype: set(str)
218 """
219 return self._ignore_list
221 @ignore_list.setter
222 def ignore_list(self, ignore_list: Set[str]):
223 """
224 Set of URLs to be ignored (not returned) while extracting from text
226 :param set(str) ignore_list: set of URLs
227 """
228 self._ignore_list = ignore_list
230 def load_ignore_list(self, file_name):
231 """
232 Load URLs from file into ignore list
234 :param str file_name: path to file containing URLs
235 """
236 with open(file_name) as f:
237 for line in f:
238 url = line.strip()
239 if not url:
240 continue
241 self._ignore_list.add(url)
243 @property
244 def permit_list(self):
245 """
246 Set of URLs that can be processed
248 :return: Returns set of URLs that can be processed
249 :rtype: set(str)
250 """
251 return self._permit_list
253 @permit_list.setter
254 def permit_list(self, permit_list):
255 """
256 Set of URLs that can be processed
258 :param set(str) permit_list: set of URLs
259 """
260 self._permit_list = permit_list
262 def load_permit_list(self, file_name):
263 """
264 Load URLs from file into permit list
266 :param str file_name: path to file containing URLs
267 """
268 with open(file_name) as f:
269 for line in f:
270 url = line.strip()
271 if not url:
272 continue
273 self._permit_list.add(url)
275 def update(self):
276 """
277 Update TLD list cache file.
279 :return: True if update was successful False otherwise
280 :rtype: bool
281 """
282 if not self._download_tlds_list():
283 return False
285 self._reload_tlds_from_file()
287 return True
289 def update_when_older(self, days: int) -> bool:
290 """
291 Update TLD list cache file if the list is older than
292 number of days given in parameter `days` or if it does not exist.
294 :param int days: number of days from last change
295 :return: True if update was successful, False otherwise
296 :rtype: bool
297 """
299 last_cache = self._get_last_cachefile_modification()
300 if last_cache is None:
301 return self.update()
303 time_to_update = last_cache + timedelta(days=days)
305 if datetime.now() >= time_to_update:
306 return self.update()
308 return True
310 @staticmethod
311 def get_version() -> str:
312 """
313 Returns version number.
315 :return: version number
316 :rtype: str
317 """
319 return __version__
321 def get_after_tld_chars(self) -> List[str]:
322 """
323 Returns list of chars that are allowed after TLD
325 :return: list of chars that are allowed after TLD
326 :rtype: list
327 """
329 return list(self._after_tld_chars)
331 def set_after_tld_chars(self, after_tld_chars: Iterable[str]):
332 """
333 Set chars that are allowed after TLD.
335 :param list after_tld_chars: list of characters
336 """
338 self._after_tld_chars = set(after_tld_chars)
340 def get_stop_chars_left(self) -> Set[str]:
341 """
342 Returns set of stop chars for text on left from TLD.
344 :return: set of stop chars
345 :rtype: set
346 """
347 return self._stop_chars_left
349 def set_stop_chars_left(self, stop_chars: Set[str]):
350 """
351 Set stop characters for text on left from TLD.
352 Stop characters are used when determining end of URL.
354 :param set stop_chars: set of characters
355 :raises: TypeError
356 """
357 if not isinstance(stop_chars, set):
358 raise TypeError(
359 "stop_chars should be type set "
360 "but {} was given".format(type(stop_chars))
361 )
363 self._stop_chars_left = stop_chars
365 def get_stop_chars_left_from_scheme(self) -> Set[str]:
366 """
367 Returns set of stop chars for text on left from scheme.
369 :return: set of stop chars
370 :rtype: set
371 """
372 return self._stop_chars_left_from_schema
374 def set_stop_chars_left_from_scheme(self, stop_chars: Set[str]):
375 """
376 Set stop characters for text on left from scheme.
377 Stop characters are used when determining end of URL.
379 :param set stop_chars: set of characters
380 :raises: TypeError
381 """
382 if not isinstance(stop_chars, set):
383 raise TypeError(
384 "stop_chars should be type set "
385 "but {} was given".format(type(stop_chars))
386 )
388 self._stop_chars_left_from_schema = stop_chars
390 def get_stop_chars_right(self) -> Set[str]:
391 """
392 Returns set of stop chars for text on right from TLD.
394 :return: set of stop chars
395 :rtype: set
396 """
397 return self._stop_chars_right
399 def set_stop_chars_right(self, stop_chars: Set[str]):
400 """
401 Set stop characters for text on right from TLD.
402 Stop characters are used when determining end of URL.
404 :param set stop_chars: set of characters
405 :raises: TypeError
406 """
407 if not isinstance(stop_chars, set):
408 raise TypeError(
409 "stop_chars should be type set "
410 "but {} was given".format(type(stop_chars))
411 )
413 self._stop_chars_right = stop_chars
415 def get_enclosures(self) -> Set[Tuple[str, str]]:
416 """
417 Returns set of enclosure pairs that might be used to enclosure URL.
418 For example brackets (example.com), [example.com], {example.com}
420 :return: set of tuple of enclosure characters
421 :rtype: set(tuple(str,str))
422 """
423 return self._enclosure
425 def add_enclosure(self, left_char: str, right_char: str):
426 """
427 Add new enclosure pair of characters. That and should be removed
428 when their presence is detected at beginning and end of found URL
430 :param str left_char: left character of enclosure pair - e.g. "("
431 :param str right_char: right character of enclosure pair - e.g. ")"
432 """
433 assert len(left_char) == 1, "Parameter left_char must be character not string"
434 assert len(right_char) == 1, "Parameter right_char must be character not string"
435 self._enclosure.add((left_char, right_char))
437 self._after_tld_chars = self._get_after_tld_chars()
439 def remove_enclosure(self, left_char: str, right_char: str):
440 """
441 Remove enclosure pair from set of enclosures.
443 :param str left_char: left character of enclosure pair - e.g. "("
444 :param str right_char: right character of enclosure pair - e.g. ")"
445 """
446 assert len(left_char) == 1, "Parameter left_char must be character not string"
447 assert len(right_char) == 1, "Parameter right_char must be character not string"
448 rm_enclosure = (left_char, right_char)
449 if rm_enclosure in self._enclosure:
450 self._enclosure.remove(rm_enclosure)
452 self._after_tld_chars = self._get_after_tld_chars()
454 def _complete_url(
455 self, text: str, tld_pos: int, tld: str, check_dns=False, with_schema_only=False
456 ) -> str:
457 """
458 Expand string in both sides to match whole URL.
460 :param str text: text where we want to find URL
461 :param int tld_pos: position of TLD
462 :param str tld: matched TLD which should be in text
463 :param bool check_dns: filter results to valid domains
464 :param bool with_schema_only: get domains with schema only
465 (e.g. https://janlipovsky.cz but not example.com)
466 :return: returns URL
467 :rtype: str
468 """
470 left_ok = True
471 right_ok = True
473 max_len = len(text) - 1
474 end_pos = tld_pos
475 start_pos = tld_pos
476 in_scheme = False
477 while left_ok or right_ok:
478 if left_ok:
479 if start_pos <= 0:
480 left_ok = False
481 else:
482 if (
483 in_scheme
484 and text[start_pos - 1] in self._stop_chars_left_from_schema
485 ):
486 left_ok = False
487 if left_ok and text[start_pos - 1] not in self._stop_chars_left:
488 start_pos -= 1
489 else:
490 left_ok = False
491 if right_ok:
492 if end_pos >= max_len:
493 right_ok = False
494 else:
495 if text[end_pos + 1] not in self._stop_chars_right:
496 end_pos += 1
497 else:
498 right_ok = False
500 if text[start_pos : start_pos + 3] == "://":
501 in_scheme = True
503 complete_url = text[start_pos : end_pos + 1].lstrip("/")
504 # remove last character from url
505 # when it is allowed character right after TLD (e.g. dot, comma)
506 temp_tlds = {tld + c for c in self._after_tld_chars}
507 # get only dot+tld+one_char and compare
508 extended_tld = complete_url[len(complete_url) - len(tld) - 1 :]
509 if extended_tld in temp_tlds:
510 # We do not want to change found URL
511 if not extended_tld.endswith("/"):
512 complete_url = complete_url[:-1]
514 complete_url = self._split_markdown(complete_url, tld_pos - start_pos)
515 complete_url = self._remove_enclosure_from_url(
516 complete_url, tld_pos - start_pos, tld
517 )
519 # search for enclosures before URL ignoring space character " "
520 # when URL contains right enclosure character (issue #77)
521 enclosure_map = {
522 left_char: right_char for left_char, right_char in self._enclosure
523 }
524 if any(
525 enclosure in complete_url[tld_pos - start_pos :]
526 for enclosure in enclosure_map.values()
527 ):
528 enclosure_space_char = True
529 enclosure_found = False
530 tmp_start_pos = start_pos
531 while enclosure_space_char:
532 if tmp_start_pos <= 0:
533 break
534 if text[tmp_start_pos - 1] == " ":
535 tmp_start_pos -= 1
536 elif text[tmp_start_pos - 1] in enclosure_map.keys():
537 tmp_start_pos -= 1
538 enclosure_found = True
539 else:
540 enclosure_space_char = False
542 if enclosure_found:
543 pre_url = text[tmp_start_pos:start_pos]
544 extended_complete_url = pre_url + complete_url
545 complete_url = self._remove_enclosure_from_url(
546 extended_complete_url, tld_pos - tmp_start_pos, tld
547 )
548 # URL should not start/end with whitespace
549 complete_url = complete_url.strip()
550 # URL should not start with two backslashes
551 if complete_url.startswith("//"):
552 complete_url = complete_url[2:]
553 # URL should not start with unreserved characters
554 if complete_url.startswith(("-", ".", "~", "_")):
555 complete_url = complete_url[1:]
556 if not self._is_domain_valid(
557 complete_url, tld, check_dns=check_dns, with_schema_only=with_schema_only
558 ):
559 return ""
561 return complete_url
563 def _validate_tld_match(self, text: str, matched_tld: str, tld_pos: int) -> bool:
564 """
565 Validate TLD match - tells if at found position is really TLD.
567 :param str text: text where we want to find URLs
568 :param str matched_tld: matched TLD
569 :param int tld_pos: position of matched TLD
570 :return: True if match is valid, False otherwise
571 :rtype: bool
572 """
573 if tld_pos > len(text):
574 return False
576 right_tld_pos = tld_pos + len(matched_tld)
577 if len(text) > right_tld_pos:
578 if text[right_tld_pos] in self._after_tld_chars:
579 if tld_pos > 0 and text[tld_pos - 1] not in self._stop_chars_left:
580 return True
581 else:
582 if tld_pos > 0 and text[tld_pos - 1] not in self._stop_chars_left:
583 return True
585 return False
587 def _is_domain_valid(
588 self, url: str, tld: str, check_dns=False, with_schema_only=False
589 ):
590 """
591 Checks if given URL has valid domain name (ignores subdomains)
593 :param str url: complete URL that we want to check
594 :param str tld: TLD that should be found at the end of URL (hostname)
595 :param bool check_dns: filter results to valid domains
596 :param bool with_schema_only: URL must contain schema (protocol)
597 to be considered valid
598 :return: True if URL is valid, False otherwise
599 :rtype: bool
601 >>> extractor = URLExtract()
602 >>> extractor._is_domain_valid("janlipovsky.cz", ".cz")
603 True
605 >>> extractor._is_domain_valid("https://janlipovsky.cz", ".cz")
606 True
608 >>> extractor._is_domain_valid("invalid.cz.", ".cz")
609 False
611 >>> extractor._is_domain_valid("invalid.cz,", ".cz")
612 False
614 >>> extractor._is_domain_valid("in.v_alid.cz", ".cz")
615 False
617 >>> extractor._is_domain_valid("-is.valid.cz", ".cz")
618 True
620 >>> extractor._is_domain_valid("not.valid-.cz", ".cz")
621 False
623 >>> extractor._is_domain_valid("http://blog/media/path.io.jpg", ".cz")
624 False
625 """
627 if not url:
628 return False
630 scheme_pos = url.find("://")
631 if scheme_pos == -1:
632 if with_schema_only:
633 return False
634 url = "http://" + url
635 added_schema = True
636 else:
637 added_schema = False
639 url_parts = uritools.urisplit(url)
640 # <scheme>://<authority>/<path>?<query>#<fragment>
642 # authority can't start with @
643 if url_parts.authority and url_parts.authority.startswith("@"):
644 return False
646 # if URI contains user info and schema was automatically added
647 # the url is probably an email
648 if url_parts.getuserinfo() and added_schema:
649 # do not collect emails
650 if not self._extract_email:
651 return False
652 else:
653 # if we want to extract email we have to be sure that it
654 # really is email -> given URL does not have other parts
655 if (
656 url_parts.getport()
657 or url_parts.getpath()
658 or url_parts.getquery()
659 or url_parts.getfragment()
660 ):
661 return False
663 try:
664 host = url_parts.gethost()
665 except ValueError:
666 self._logger.info(
667 "Invalid host '%s'. " "If the host is valid report a bug.", url
668 )
669 return False
671 if not host:
672 return False
674 if not self.allow_mixed_case_hostname:
675 # we have to take url_parts.host instead of host variable because url_parts.host is not normalized
676 return all(s.islower() for s in url_parts.host if s.isalpha()) or all(
677 s.isupper() for s in url_parts.host if s.isalpha()
678 )
680 if self._permit_list and host not in self._permit_list:
681 return False
683 if host in self._ignore_list:
684 return False
686 # IP address are valid hosts
687 is_ipv4 = isinstance(host, ipaddress.IPv4Address)
688 if is_ipv4:
689 return True
691 # when TLD is a number the host must be IP
692 if tld in self._ipv4_tld and not is_ipv4:
693 return False
695 host_parts = host.split(".")
697 if self._extract_localhost and host_parts == ["localhost"]:
698 return True
700 if len(host_parts) <= 1:
701 return False
703 host_tld = "." + host_parts[-1]
704 if host_tld.lower() != tld.lower():
705 return False
707 top = host_parts[-2]
709 if self._hostname_re.match(top) is None:
710 return False
712 if check_dns:
713 if self._cache_dns is True:
714 dns_cache_install()
715 self._cache_dns = False
717 try:
718 socket.gethostbyname(host)
719 except socket.herror as err:
720 if err.errno == 0:
721 self._logger.info(
722 "Unable to resolve address {}: {}".format(host, err)
723 )
724 else:
725 self._logger.info(err)
726 return False
727 except Exception as err:
728 self._logger.info(
729 "Unknown exception during gethostbyname({}) {!r}".format(host, err)
730 )
731 return False
733 return True
735 def _remove_enclosure_from_url(self, text_url: str, tld_pos: int, tld: str) -> str:
736 """
737 Removes enclosure characters from URL given in text_url.
738 For example: (example.com) -> example.com
740 :param str text_url: text with URL that we want to extract from
741 enclosure of two characters
742 :param int tld_pos: position of TLD in text_url
743 :param str tld: matched TLD which should be in text
744 :return: URL that has removed enclosure
745 :rtype: str
746 """
748 enclosure_map = {
749 left_char: right_char for left_char, right_char in self._enclosure
750 }
751 # get position of most right left_char of enclosure pairs
752 left_pos = max(
753 [
754 text_url.rfind(left_char, 0, tld_pos)
755 for left_char in enclosure_map.keys()
756 ]
757 )
758 left_char = text_url[left_pos] if left_pos >= 0 else ""
759 right_char = enclosure_map.get(left_char, "")
760 # get count of left and right enclosure characters and
761 left_char_count = text_url[: left_pos + 1].count(left_char)
762 right_char_count = text_url[left_pos:].count(right_char)
763 # we want to find only pairs and ignore rest (more occurrences)
764 min_count = min(left_char_count, right_char_count)
766 right_pos = len(text_url) + 1
767 # find position of Nth occurrence of right enclosure character
768 for i in range(max(min_count, 1)):
769 right_pos = text_url[:right_pos].rfind(right_char)
771 if right_pos < 0 or right_pos < tld_pos:
772 right_pos = len(text_url)
774 new_url = text_url[left_pos + 1 : right_pos]
775 tld_pos -= left_pos + 1
777 # Get valid domain when we have input as: example.com)/path
778 # we assume that if there is enclosure character after TLD it is
779 # the end URL itself therefore we remove the rest
780 after_tld_pos = tld_pos + len(tld)
781 if after_tld_pos < len(new_url):
782 if new_url[after_tld_pos] in enclosure_map.values():
783 new_url_tmp = new_url[:after_tld_pos]
784 return self._remove_enclosure_from_url(new_url_tmp, tld_pos, tld)
786 return new_url
788 @staticmethod
789 def _split_markdown(text_url: str, tld_pos: int) -> str:
790 """
791 Split markdown URL. There is an issue wen Markdown URL is found.
792 Parsing of the URL does not stop on right place so wrongly found URL
793 has to be split.
795 :param str text_url: URL that we want to extract from enclosure
796 :param int tld_pos: position of TLD
797 :return: URL that has removed enclosure
798 :rtype: str
799 """
800 # Markdown url can looks like:
801 # [http://example.com/](http://example.com/status/210)
803 left_bracket_pos = text_url.find("[")
804 # subtract 3 because URL is never shorter than 3 characters
805 if left_bracket_pos > tld_pos - 3:
806 return text_url
808 right_bracket_pos = text_url.find(")")
809 if right_bracket_pos < tld_pos:
810 return text_url
812 middle_pos = text_url.rfind("](")
813 if middle_pos > tld_pos:
814 return text_url[left_bracket_pos + 1 : middle_pos]
815 return text_url
817 @staticmethod
818 # TODO: fix DOC to accomodate to return value
819 def _get_tld_pos(url: str, tld: str) -> int:
820 """
821 Return position of TLD in hostname.
823 :param str url: URL in which TLD should be located
824 :param str tld: TLD we want ot find
825 :return:
826 """
827 tpm_url = "http://" + url if url.find("://") == -1 else url
829 url_parts = uritools.urisplit(tpm_url)
830 host = str(url_parts.gethost())
831 # `host` is always returned in lowercase,
832 # so make sure `url` & `tld` must also be lowercase,
833 # otherwise the `find()` may fail.
834 offset = url.lower().find(host)
835 return host.rfind(tld.lower()) + offset
837 # TODO: move type assertion to be Generator based
838 # found https://stackoverflow.com/a/38423388/14669675
839 def gen_urls(
840 self, text: str, check_dns=False, get_indices=False, with_schema_only=False
841 ) -> Generator[Union[str, Tuple[str, Tuple[int, int]]], None, None]:
842 """
843 Creates generator over found URLs in given text.
845 :param str text: text where we want to find URLs
846 :param bool check_dns: filter results to valid domains
847 :param bool get_indices: whether to return beginning and
848 ending indices as (<url>, (idx_begin, idx_end))
849 :param bool with_schema_only: get domains with schema only
850 :yields: URL or URL with indices found in text or empty string if nothing was found
851 :rtype: str|tuple(str, tuple(int, int))
852 """
853 tld_pos = 0
854 matched_tlds = self._tlds_re.findall(text)
856 while matched_tlds:
857 tld = matched_tlds.pop(0)
858 tmp_text = text[tld_pos:]
859 offset = tld_pos
860 tld_pos = tmp_text.find(tld)
861 validated = self._validate_tld_match(text, tld, offset + tld_pos)
862 if tld_pos != -1 and validated:
863 tmp_url = self._complete_url(
864 text,
865 offset + tld_pos,
866 tld,
867 check_dns=check_dns,
868 with_schema_only=with_schema_only,
869 )
871 if tmp_url:
872 # do not search for TLD in already extracted URL
873 tld_pos_url = self._get_tld_pos(tmp_url, tld)
874 # move cursor right after found TLD
875 tld_pos += len(tld) + offset
876 # move cursor after end of found URL
877 rest_url = tmp_url[tld_pos_url + len(tld) :]
878 tld_pos += len(rest_url)
880 # remove all matched TLDs that were found in currently
881 # extracted URL (tmp_url resp. rest_url)
882 while matched_tlds:
883 new_tld = matched_tlds[0]
884 tmp_tld_pos_url = rest_url.find(new_tld)
885 if tmp_tld_pos_url < 0:
886 break
887 rest_url = rest_url[tmp_tld_pos_url + len(new_tld) :]
888 matched_tlds.pop(0)
890 if get_indices:
891 yield tmp_url, (tld_pos - len(tmp_url), tld_pos)
892 else:
893 yield tmp_url
895 continue
897 # move cursor right after found TLD
898 tld_pos += len(tld) + offset
900 def find_urls(
901 self,
902 text: str,
903 only_unique=False,
904 check_dns=False,
905 get_indices=False,
906 with_schema_only=False,
907 ) -> List[Union[str, Tuple[str, Tuple[int, int]]]]:
908 """
909 Find all URLs in given text.
911 :param str text: text where we want to find URLs
912 :param bool only_unique: return only unique URLs
913 :param bool check_dns: filter results to valid domains
914 :return: list of URLs found in text
915 :param bool get_indices: whether to return beginning and
916 ending indices as (<url>, (idx_begin, idx_end))
917 :param bool with_schema_only: get domains with schema only
918 (e.g. https://janlipovsky.cz but not example.com)
919 :rtype: list
921 :raises URLExtractError: Raised when count of found URLs reaches
922 given limit. Processed URLs are returned in `data` argument.
923 """
924 urls = self.gen_urls(
925 text,
926 check_dns=check_dns,
927 get_indices=get_indices,
928 with_schema_only=with_schema_only,
929 )
930 if self._limit is None:
931 if only_unique:
932 return list(OrderedDict.fromkeys(urls))
933 return list(urls)
935 result_urls: List[Union[str, Tuple[str, Tuple[int, int]]]] = []
936 url = next(urls, "")
937 url_count = 1
938 while url:
939 if url_count > self._limit:
940 err = "Limit for extracting URLs was reached. [{} URLs]".format(
941 self._limit
942 )
943 self._logger.error(err)
945 raise URLExtractError(err, data=result_urls)
947 result_urls.append(url)
948 url = next(urls, "")
949 url_count += 1
951 if only_unique:
952 return list(OrderedDict.fromkeys(result_urls))
953 return result_urls
955 def has_urls(self, text: str, check_dns=False, with_schema_only=False) -> bool:
956 """
957 Checks if text contains any valid URL.
958 Returns True if text contains at least one URL.
960 :param text: text where we want to find URLs
961 :param bool check_dns: filter results to valid domains
962 :param bool with_schema_only: consider domains with schema only
963 :return: True if et least one URL was found, False otherwise
964 :rtype: bool
965 """
967 return any(
968 self.gen_urls(text, check_dns=check_dns, with_schema_only=with_schema_only)
969 )
972class URLExtractError(Exception):
973 """
974 Raised when some error occurred during processing URLs.
976 Attributes:
977 message -- explanation of the error
978 data -- input expression in which the error occurred
979 """
981 def __init__(self, message, data):
982 self.data = data
983 self.message = message
986def report_issue(func):
987 """Friendly message with link to GitHub for easier reporting"""
989 @functools.wraps(func)
990 def wrapper_urlextract_cli(*args, **kwargs):
991 try:
992 return func(*args, **kwargs)
993 except Exception:
994 print(
995 "Error: An unexpected error occurred. "
996 "If you can't resolve this issue please report it to: "
997 "https://github.com/lipoja/URLExtract/issues "
998 "and help us improve urlextract!",
999 file=sys.stderr,
1000 )
1001 raise
1003 return wrapper_urlextract_cli
1006@report_issue
1007def _urlextract_cli():
1008 """
1009 urlextract - command line program that will print all URLs to stdout
1010 Usage: urlextract [input_file] [-u] [-v]
1012 input_file - text file with URLs to extract
1013 """
1014 import argparse
1016 # TODO: add type checking here
1017 def get_args() -> Namespace:
1018 """Parse programs arguments"""
1019 parser = argparse.ArgumentParser(
1020 description="urlextract - prints out all URLs that were "
1021 "found in input file or stdin based on locating "
1022 "their TLDs"
1023 )
1025 ver = URLExtract.get_version()
1026 parser.add_argument(
1027 "-v",
1028 "--version",
1029 action="version",
1030 version="%(prog)s - version {}".format(ver),
1031 )
1033 parser.add_argument(
1034 "-u",
1035 "--unique",
1036 dest="unique",
1037 action="store_true",
1038 help="print out only unique URLs found in file",
1039 )
1041 parser.add_argument(
1042 "-dl",
1043 "--disable-localhost",
1044 dest="disable_localhost",
1045 action="store_true",
1046 help='disable extracting "localhost" as URL',
1047 )
1049 parser.add_argument(
1050 "-c",
1051 "--check-dns",
1052 dest="check_dns",
1053 action="store_true",
1054 help="print out only URLs for existing domain names",
1055 )
1057 parser.add_argument(
1058 "-i",
1059 "--ignore-file",
1060 metavar="<ignore_file>",
1061 type=str,
1062 default=None,
1063 help="input text file with URLs to exclude from extraction",
1064 )
1066 parser.add_argument(
1067 "-p",
1068 "--permit-file",
1069 metavar="<permit_file>",
1070 type=str,
1071 default=None,
1072 help="input text file with URLs that can be processed",
1073 )
1075 parser.add_argument(
1076 "-l",
1077 "--limit",
1078 dest="limit",
1079 type=int,
1080 default=DEFAULT_LIMIT,
1081 help="Maximum count of URLs that can be processed. "
1082 "Set 0 to disable the limit. "
1083 "Default: {}".format(DEFAULT_LIMIT),
1084 )
1086 parser.add_argument(
1087 "input_file",
1088 nargs="?",
1089 metavar="<input_file>",
1090 type=argparse.FileType(),
1091 default=sys.stdin,
1092 help="input text file with URLs to extract",
1093 )
1095 parsed_args = parser.parse_args()
1096 return parsed_args
1098 args = get_args()
1099 logging.basicConfig(
1100 level=logging.WARNING,
1101 stream=sys.stderr,
1102 format="%(asctime)s - %(levelname)s (%(name)s): %(message)s",
1103 )
1104 logger = logging.getLogger("urlextract")
1106 try:
1107 limit = None if args.limit <= 0 else args.limit
1108 urlextract = URLExtract(limit=limit)
1109 if args.disable_localhost:
1110 urlextract.extract_localhost = False
1111 if args.ignore_file:
1112 urlextract.load_ignore_list(args.ignore_file)
1113 if args.permit_file:
1114 urlextract.load_permit_list(args.permit_file)
1115 urlextract.update_when_older(30)
1116 content = args.input_file.read()
1117 try:
1118 for url in urlextract.find_urls(content, args.unique, args.check_dns):
1119 print(url)
1120 except URLExtractError as e:
1121 logger.error(
1122 "You can set limit using --limit parameter. "
1123 "See --help for more details."
1124 )
1125 for url in e.data:
1126 print(url)
1128 except CacheFileError as e:
1129 logger.error(str(e))
1130 sys.exit(-1)
1131 finally:
1132 args.input_file.close()
1135def dns_cache_install() -> None:
1136 try:
1137 from dns import resolver as dnspython_resolver_module # type: ignore
1138 from dns_cache.resolver import ExceptionCachingResolver # type: ignore
1140 if not dnspython_resolver_module.default_resolver:
1141 dnspython_resolver_module.default_resolver = ExceptionCachingResolver()
1142 del dnspython_resolver_module
1143 except ImportError:
1144 pass
1146 try:
1147 from dns.resolver import ( # type: ignore
1148 LRUCache,
1149 Resolver,
1150 _resolver,
1151 default_resolver,
1152 override_system_resolver,
1153 )
1154 except ImportError:
1155 return
1157 if default_resolver:
1158 if not default_resolver.cache:
1159 default_resolver.cache = LRUCache()
1160 resolver = default_resolver
1161 elif _resolver and _resolver.cache:
1162 resolver = _resolver
1163 else:
1164 resolver = Resolver()
1165 resolver.cache = LRUCache()
1166 override_system_resolver(resolver)
1169if __name__ == "__main__":
1170 _urlextract_cli()