#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
urlextract_core.py - file with definition of URLExtract class and urlextract cli

.. Created on 2016-07-29
.. Licence MIT
.. codeauthor:: Jan Lipovský <janlipovsky@gmail.com>, janlipovsky.cz
.. contributors: https://github.com/lipoja/URLExtract/graphs/contributors
"""
from argparse import Namespace
import functools
import ipaddress
import logging
import re
import socket
from typing import Set, Iterable, Tuple, List, Union, Generator
import string
import sys
from collections import OrderedDict
from datetime import datetime, timedelta

import uritools  # type: ignore

from urlextract.cachefile import CacheFile, CacheFileError

# version of URLExtract (do not forget to change it in setup.py as well)
__version__ = "1.9.0"

# default value for maximum count of processed URLs by find_url
DEFAULT_LIMIT = 10000


class URLExtract(CacheFile):
    """
    Class for finding and extracting URLs from given string.

    **Examples:**

    .. code-block:: python

        from urlextract import URLExtract

        extractor = URLExtract()
        urls = extractor.find_urls("Let's have URL example.com example.")
        print(urls)  # prints: ['example.com']

        # Another way is to get a generator over found URLs in text:
        for url in extractor.gen_urls(example_text):
            print(url)  # prints: example.com

        # Or if you want to just check if there is at least one URL in text:
        if extractor.has_urls(example_text):
            print("Given text contains some URL")
    """

    # compiled regexp for naive validation of host name
    _hostname_re = re.compile(r"^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])$")

    # enclosure pairs of a URL that should be removed
    _enclosure = {
        ("(", ")"),
        ("{", "}"),
        ("[", "]"),
        ('"', '"'),
        ("\\", "\\"),
        ("'", "'"),
        ("`", "`"),
    }

    _ipv4_tld = [".{}".format(ip) for ip in reversed(range(256))]
    _ignore_list: Set[str] = set()
    _permit_list: Set[str] = set()

    _limit = DEFAULT_LIMIT

    def __init__(
        self,
        extract_email=False,
        cache_dns=True,
        extract_localhost=True,
        limit=DEFAULT_LIMIT,
        allow_mixed_case_hostname=True,
        **kwargs,  # noqa E999
    ):
        """
        Initialize function for URLExtract class.
        Tries to get cached TLDs; if the cached file does not exist, it will
        try to download a new list from IANA and save it to the cache file.

        :param bool extract_email: True if we want to extract emails from text.
            Disabled by default
        :param bool cache_dns: True replaces socket DNS lookup with caching
            equivalent provided by dnspython.
            Enabled by default
        :param bool extract_localhost: True if we want to extract 'localhost'
            as URL from text.
            Enabled by default
        :param int limit: maximum count of processed URLs by find_url function,
            default value defined as global variable DEFAULT_LIMIT
        :param bool allow_mixed_case_hostname: True if hostname can contain
            mixed case letters (upper-case and lower-case).
            Enabled by default
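
        **Example:**

        .. code-block:: python

            # a minimal sketch: extract emails too and allow more URLs
            # before the limit error is raised
            extractor = URLExtract(extract_email=True, limit=50000)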
        """
        super(URLExtract, self).__init__(**kwargs)

        self._tlds_re = None
        self._extract_localhost = extract_localhost
        self._extract_email = extract_email
        self._cache_dns = cache_dns
        self._limit = limit
        self._allow_mixed_case_hostname = allow_mixed_case_hostname
        self._reload_tlds_from_file()

        # general stop characters
        general_stop_chars = {'"', "<", ">", ";"}
        # defining default stop chars left
        self._stop_chars_left = set(string.whitespace)
        self._stop_chars_left |= general_stop_chars | {"|", "=", "]", ")", "}"}

        # default stop characters on left side from schema
        self._stop_chars_left_from_schema = self._stop_chars_left.copy() | {":"}

        # defining default stop chars right
        self._stop_chars_right = set(string.whitespace)
        self._stop_chars_right |= general_stop_chars

        # characters that are allowed to be right after TLD
        self._after_tld_chars = self._get_after_tld_chars()

    def _get_after_tld_chars(self) -> Set[str]:
        """Initialize after tld characters"""
        after_tld_chars = set(string.whitespace)
        after_tld_chars |= {"/", '"', "'", "<", ">", "?", ":", ".", ","}
        # get right enclosure characters
        _, right_enclosure = zip(*self._enclosure)
        # add right enclosure characters to be valid after TLD
        # for correct parsing of URL e.g. (example.com)
        after_tld_chars |= set(right_enclosure)

        return after_tld_chars

    def _reload_tlds_from_file(self):
        """
        Reloads TLDs from file and compiles regexp.
        :raises: CacheFileError when cached file is not readable for user
        """

        tlds = sorted(self._load_cached_tlds(), key=len, reverse=True)
        tlds += self._ipv4_tld
        if self._extract_localhost:
            tlds.append("localhost")
        re_escaped = [re.escape(str(tld)) for tld in tlds]
        self._tlds_re = re.compile("|".join(re_escaped), flags=re.IGNORECASE)

    @property
    def extract_email(self) -> bool:
        """
        If set to True, emails will be extracted from text

        :rtype: bool
        """
        return self._extract_email

    @extract_email.setter
    def extract_email(self, extract: bool):
        """
        Set if emails will be extracted from text

        :param bool extract: True if emails should be extracted, False otherwise
        """
        self._extract_email = extract

    @property
    def allow_mixed_case_hostname(self) -> bool:
        """
        If set to True, the hostname may contain mixed case letters
        (upper-case and lower-case)

        :rtype: bool
        """
        return self._allow_mixed_case_hostname

    @allow_mixed_case_hostname.setter
    def allow_mixed_case_hostname(self, allow_mixed_case: bool):
        """
        Set if mixed case hostnames are allowed

        :param bool allow_mixed_case: True if we should allow mixed case
            hostnames, False otherwise
        """
        self._allow_mixed_case_hostname = allow_mixed_case

    @property
    def extract_localhost(self) -> bool:
        """
        If set to True, 'localhost' will be extracted as URL from text

        :rtype: bool
        """
        return self._extract_localhost

    @extract_localhost.setter
    def extract_localhost(self, enable: bool):
        """
        Set if 'localhost' will be extracted as URL from text

        :param bool enable: True if 'localhost' should be extracted,
            False otherwise
        """
        self._extract_localhost = enable

    @property
    def ignore_list(self) -> Set[str]:
        """
        Set of URLs to be ignored (not returned) while extracting from text

        :return: Returns set of ignored URLs
        :rtype: set(str)
        """
        return self._ignore_list

    @ignore_list.setter
    def ignore_list(self, ignore_list: Set[str]):
        """
        Set of URLs to be ignored (not returned) while extracting from text

        :param set(str) ignore_list: set of URLs
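
        Example - ignored hosts are dropped from the results:

        >>> extractor = URLExtract()
        >>> extractor.ignore_list = {"example.com"}
        >>> extractor.find_urls("Let's have URL example.com example.")
        []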
        """
        self._ignore_list = ignore_list

    def load_ignore_list(self, file_name):
        """
        Load URLs from file into ignore list

        :param str file_name: path to file containing URLs
        """
        with open(file_name) as f:
            for line in f:
                url = line.strip()
                if not url:
                    continue
                self._ignore_list.add(url)

    @property
    def permit_list(self):
        """
        Set of URLs that can be processed

        :return: Returns set of URLs that can be processed
        :rtype: set(str)
        """
        return self._permit_list

    @permit_list.setter
    def permit_list(self, permit_list):
        """
        Set of URLs that can be processed

        :param set(str) permit_list: set of URLs
        """
        self._permit_list = permit_list

    def load_permit_list(self, file_name):
        """
        Load URLs from file into permit list

        :param str file_name: path to file containing URLs
        """
        with open(file_name) as f:
            for line in f:
                url = line.strip()
                if not url:
                    continue
                self._permit_list.add(url)

    def update(self):
        """
        Update TLD list cache file.

        :return: True if update was successful, False otherwise
        :rtype: bool
        """
        if not self._download_tlds_list():
            return False

        self._reload_tlds_from_file()

        return True

    def update_when_older(self, days: int) -> bool:
        """
        Update TLD list cache file if the list is older than
        number of days given in parameter `days` or if it does not exist.

        :param int days: number of days from last change
        :return: True if update was successful, False otherwise
        :rtype: bool
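
        **Example:**

        .. code-block:: python

            # a sketch: refresh the TLD cache only when it is a week old
            extractor.update_when_older(7)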
        """

        last_cache = self._get_last_cachefile_modification()
        if last_cache is None:
            return self.update()

        time_to_update = last_cache + timedelta(days=days)

        if datetime.now() >= time_to_update:
            return self.update()

        return True

    @staticmethod
    def get_version() -> str:
        """
        Returns version number.

        :return: version number
        :rtype: str
        """

        return __version__

    def get_after_tld_chars(self) -> List[str]:
        """
        Returns list of chars that are allowed after TLD

        :return: list of chars that are allowed after TLD
        :rtype: list
        """

        return list(self._after_tld_chars)

    def set_after_tld_chars(self, after_tld_chars: Iterable[str]):
        """
        Set chars that are allowed after TLD.

        :param list after_tld_chars: list of characters
        """

        self._after_tld_chars = set(after_tld_chars)

    def get_stop_chars_left(self) -> Set[str]:
        """
        Returns set of stop chars for text on left from TLD.

        :return: set of stop chars
        :rtype: set
        """
        return self._stop_chars_left

    def set_stop_chars_left(self, stop_chars: Set[str]):
        """
        Set stop characters for text on left from TLD.
        Stop characters are used when determining end of URL.

        :param set stop_chars: set of characters
        :raises: TypeError
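
        **Example:**

        .. code-block:: python

            # a sketch: extend the defaults to also stop at a colon
            extractor.set_stop_chars_left(extractor.get_stop_chars_left() | {":"})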
        """
        if not isinstance(stop_chars, set):
            raise TypeError(
                "stop_chars should be type set "
                "but {} was given".format(type(stop_chars))
            )

        self._stop_chars_left = stop_chars

    def get_stop_chars_left_from_scheme(self) -> Set[str]:
        """
        Returns set of stop chars for text on left from scheme.

        :return: set of stop chars
        :rtype: set
        """
        return self._stop_chars_left_from_schema

    def set_stop_chars_left_from_scheme(self, stop_chars: Set[str]):
        """
        Set stop characters for text on left from scheme.
        Stop characters are used when determining end of URL.

        :param set stop_chars: set of characters
        :raises: TypeError
        """
        if not isinstance(stop_chars, set):
            raise TypeError(
                "stop_chars should be type set "
                "but {} was given".format(type(stop_chars))
            )

        self._stop_chars_left_from_schema = stop_chars

    def get_stop_chars_right(self) -> Set[str]:
        """
        Returns set of stop chars for text on right from TLD.

        :return: set of stop chars
        :rtype: set
        """
        return self._stop_chars_right

    def set_stop_chars_right(self, stop_chars: Set[str]):
        """
        Set stop characters for text on right from TLD.
        Stop characters are used when determining end of URL.

        :param set stop_chars: set of characters
        :raises: TypeError
        """
        if not isinstance(stop_chars, set):
            raise TypeError(
                "stop_chars should be type set "
                "but {} was given".format(type(stop_chars))
            )

        self._stop_chars_right = stop_chars

    def get_enclosures(self) -> Set[Tuple[str, str]]:
        """
        Returns set of enclosure pairs that might be used to enclose URL.
        For example brackets (example.com), [example.com], {example.com}

        :return: set of tuple of enclosure characters
        :rtype: set(tuple(str,str))
        """
        return self._enclosure

    def add_enclosure(self, left_char: str, right_char: str):
        """
        Add a new enclosure pair of characters that should be removed
        when their presence is detected at beginning and end of found URL

        :param str left_char: left character of enclosure pair - e.g. "("
        :param str right_char: right character of enclosure pair - e.g. ")"
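
        **Example:**

        .. code-block:: python

            # a sketch: also strip Markdown emphasis, e.g. *example.com*
            extractor.add_enclosure("*", "*")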
        """
        assert len(left_char) == 1, "Parameter left_char must be character not string"
        assert len(right_char) == 1, "Parameter right_char must be character not string"
        self._enclosure.add((left_char, right_char))

        self._after_tld_chars = self._get_after_tld_chars()

    def remove_enclosure(self, left_char: str, right_char: str):
        """
        Remove enclosure pair from set of enclosures.

        :param str left_char: left character of enclosure pair - e.g. "("
        :param str right_char: right character of enclosure pair - e.g. ")"
        """
        assert len(left_char) == 1, "Parameter left_char must be character not string"
        assert len(right_char) == 1, "Parameter right_char must be character not string"
        rm_enclosure = (left_char, right_char)
        if rm_enclosure in self._enclosure:
            self._enclosure.remove(rm_enclosure)

        self._after_tld_chars = self._get_after_tld_chars()

    def _complete_url(
        self, text: str, tld_pos: int, tld: str, check_dns=False, with_schema_only=False
    ) -> str:
        """
        Expand the match to both sides to get the whole URL.

        :param str text: text where we want to find URL
        :param int tld_pos: position of TLD
        :param str tld: matched TLD which should be in text
        :param bool check_dns: filter results to valid domains
        :param bool with_schema_only: get domains with schema only
            (e.g. https://janlipovsky.cz but not example.com)
        :return: returns URL
        :rtype: str
        """

        left_ok = True
        right_ok = True

        # hack to fix Markdown link match
        possible_markdown = False
        right_enclosure_pos = None

        max_len = len(text) - 1
        end_pos = tld_pos
        start_pos = tld_pos
        in_scheme = False
        while left_ok or right_ok:
            if left_ok:
                if start_pos <= 0:
                    left_ok = False
                else:
                    # for a Markdown link it is typical to have "](" brackets
                    # next to each other without white space
                    if text[start_pos] == "(" and text[start_pos - 1] == "]":
                        possible_markdown = True
                    if (
                        in_scheme
                        and text[start_pos - 1] in self._stop_chars_left_from_schema
                    ):
                        left_ok = False
                    if (
                        left_ok
                        and text[start_pos - 1] not in self._stop_chars_left
                        # allow only ASCII characters in authority and scheme
                        and ord(text[start_pos - 1]) <= 127
                    ):
                        start_pos -= 1
                    else:
                        left_ok = False
            if right_ok:
                if end_pos >= max_len:
                    right_ok = False
                else:
                    if text[end_pos + 1] not in self._stop_chars_right:
                        # correcting Markdown matches
                        if right_enclosure_pos is None and text[end_pos + 1] == ")":
                            right_enclosure_pos = end_pos + 1
                        end_pos += 1
                    else:
                        right_ok = False

            if text[start_pos : start_pos + 3] == "://":
                in_scheme = True

        # correcting Markdown matches
        if possible_markdown and right_enclosure_pos is not None:
            end_pos = right_enclosure_pos

        complete_url = text[start_pos : end_pos + 1].lstrip("/")
        # remove the last character from the URL when it is a character
        # allowed right after a TLD (e.g. dot, comma)
        temp_tlds = {tld + c for c in self._after_tld_chars}
        # get only dot+tld+one_char and compare
        extended_tld = complete_url[len(complete_url) - len(tld) - 1 :]
        if extended_tld in temp_tlds:
            # we do not want to change found URL
            if not extended_tld.endswith("/"):
                complete_url = complete_url[:-1]

        complete_url = self._split_markdown(complete_url, tld_pos - start_pos)
        complete_url = self._remove_enclosure_from_url(
            complete_url, tld_pos - start_pos, tld
        )

        # search for enclosures before URL ignoring space character " "
        # when URL contains right enclosure character (issue #77)
        enclosure_map = {
            left_char: right_char for left_char, right_char in self._enclosure
        }
        if any(
            enclosure in complete_url[tld_pos - start_pos :]
            for enclosure in enclosure_map.values()
        ):
            enclosure_space_char = True
            enclosure_found = False
            tmp_start_pos = start_pos
            while enclosure_space_char:
                if tmp_start_pos <= 0:
                    break
                if text[tmp_start_pos - 1] == " ":
                    tmp_start_pos -= 1
                elif text[tmp_start_pos - 1] in enclosure_map.keys():
                    tmp_start_pos -= 1
                    enclosure_found = True
                else:
                    enclosure_space_char = False

            if enclosure_found:
                pre_url = text[tmp_start_pos:start_pos]
                extended_complete_url = pre_url + complete_url
                complete_url = self._remove_enclosure_from_url(
                    extended_complete_url, tld_pos - tmp_start_pos, tld
                )
        # URL should not start/end with whitespace
        complete_url = complete_url.strip()
        # URL should not start with two slashes
        if complete_url.startswith("//"):
            complete_url = complete_url[2:]
        # URL should not start with unreserved characters
        if complete_url.startswith(("-", ".", "~", "_")):
            complete_url = complete_url[1:]
        if not self._is_domain_valid(
            complete_url, tld, check_dns=check_dns, with_schema_only=with_schema_only
        ):
            return ""

        return complete_url

    def _validate_tld_match(self, text: str, matched_tld: str, tld_pos: int) -> bool:
        """
        Validate TLD match - tells whether the match at the found position
        really is a TLD.

        :param str text: text where we want to find URLs
        :param str matched_tld: matched TLD
        :param int tld_pos: position of matched TLD
        :return: True if match is valid, False otherwise
        :rtype: bool
        """
        if tld_pos > len(text):
            return False

        right_tld_pos = tld_pos + len(matched_tld)
        # the character right after the TLD (if any) must be allowed there
        if (
            len(text) > right_tld_pos
            and text[right_tld_pos] not in self._after_tld_chars
        ):
            return False

        # the character left from the TLD must not be a stop character
        return tld_pos > 0 and text[tld_pos - 1] not in self._stop_chars_left

    def _is_domain_valid(
        self, url: str, tld: str, check_dns=False, with_schema_only=False
    ):
        """
        Checks if given URL has valid domain name (ignores subdomains)

        :param str url: complete URL that we want to check
        :param str tld: TLD that should be found at the end of URL (hostname)
        :param bool check_dns: filter results to valid domains
        :param bool with_schema_only: URL must contain schema (protocol)
            to be considered valid
        :return: True if URL is valid, False otherwise
        :rtype: bool

        >>> extractor = URLExtract()
        >>> extractor._is_domain_valid("janlipovsky.cz", ".cz")
        True

        >>> extractor._is_domain_valid("https://janlipovsky.cz", ".cz")
        True

        >>> extractor._is_domain_valid("invalid.cz.", ".cz")
        False

        >>> extractor._is_domain_valid("invalid.cz,", ".cz")
        False

        >>> extractor._is_domain_valid("in.v_alid.cz", ".cz")
        False

        >>> extractor._is_domain_valid("-is.valid.cz", ".cz")
        True

        >>> extractor._is_domain_valid("not.valid-.cz", ".cz")
        False

        >>> extractor._is_domain_valid("http://blog/media/path.io.jpg", ".cz")
        False
        """

        if not url:
            return False

        scheme_pos = url.find("://")
        if scheme_pos == -1:
            if with_schema_only:
                return False
            url = "http://" + url
            added_schema = True
        else:
            added_schema = False

        url_parts = uritools.urisplit(url)
        # <scheme>://<authority>/<path>?<query>#<fragment>

        # authority can't start with @
        if url_parts.authority and url_parts.authority.startswith("@"):
            return False

        # if URI contains user info and schema was automatically added
        # the url is probably an email
        if url_parts.getuserinfo() and added_schema:
            # do not collect emails
            if not self._extract_email:
                return False
            else:
                # if we want to extract email we have to be sure that it
                # really is email -> given URL does not have other parts
                if (
                    url_parts.getport()
                    or url_parts.getpath()
                    or url_parts.getquery()
                    or url_parts.getfragment()
                ):
                    return False

        try:
            host = url_parts.gethost()
        except ValueError:
            self._logger.info(
                "Invalid host '%s'. If the host is valid report a bug.", url
            )
            return False

        if not host:
            return False

        if not self.allow_mixed_case_hostname:
            # we have to take url_parts.host instead of the host variable
            # because url_parts.host is not normalized
            if not (
                all(s.islower() for s in url_parts.host if s.isalpha())
                or all(s.isupper() for s in url_parts.host if s.isalpha())
            ):
                return False

        if self._permit_list and host not in self._permit_list:
            return False

        if host in self._ignore_list:
            return False

        # IP addresses are valid hosts
        is_ipv4 = isinstance(host, ipaddress.IPv4Address)
        if is_ipv4:
            return True

        # when the TLD is a number the host must be an IP address
        if tld in self._ipv4_tld and not is_ipv4:
            return False

        host_parts = host.split(".")

        if self._extract_localhost and host_parts == ["localhost"]:
            return True

        if len(host_parts) <= 1:
            return False

        host_tld = "." + host_parts[-1]
        if host_tld.lower() != tld.lower():
            return False

        top = host_parts[-2]

        if self._hostname_re.match(top) is None:
            return False

        if check_dns:
            if self._cache_dns is True:
                dns_cache_install()
                self._cache_dns = False

            try:
                socket.gethostbyname(host)
            except socket.herror as err:
                if err.errno == 0:
                    self._logger.info(
                        "Unable to resolve address {}: {}".format(host, err)
                    )
                else:
                    self._logger.info(err)
                return False
            except Exception as err:
                self._logger.info(
                    "Unknown exception during gethostbyname({}) {!r}".format(host, err)
                )
                return False

        return True

    def _remove_enclosure_from_url(self, text_url: str, tld_pos: int, tld: str) -> str:
        """
        Removes enclosure characters from URL given in text_url.
        For example: (example.com) -> example.com

        :param str text_url: text with URL that we want to extract from
            enclosure of two characters
        :param int tld_pos: position of TLD in text_url
        :param str tld: matched TLD which should be in text
        :return: URL that has removed enclosure
        :rtype: str
        """

        enclosure_map = {
            left_char: right_char for left_char, right_char in self._enclosure
        }
        # get position of the rightmost left_char of enclosure pairs
        left_pos = max(
            [
                text_url.rfind(left_char, 0, tld_pos)
                for left_char in enclosure_map.keys()
            ]
        )
        left_char = text_url[left_pos] if left_pos >= 0 else ""
        right_char = enclosure_map.get(left_char, "")
        # get the count of left and right enclosure characters
        left_char_count = text_url[: left_pos + 1].count(left_char)
        right_char_count = text_url[left_pos:].count(right_char)
        # we want to find only pairs and ignore the rest (more occurrences)
        min_count = min(left_char_count, right_char_count)

        right_pos = len(text_url) + 1
        # find position of Nth occurrence of right enclosure character
        for i in range(max(min_count, 1)):
            right_pos = text_url[:right_pos].rfind(right_char)

        if right_pos < 0 or right_pos < tld_pos:
            right_pos = len(text_url)

        new_url = text_url[left_pos + 1 : right_pos]
        tld_pos -= left_pos + 1

        # get a valid domain when we have input like: example.com)/path
        # we assume that if there is an enclosure character after the TLD
        # it is the end of the URL itself, therefore we remove the rest
        after_tld_pos = tld_pos + len(tld)
        if after_tld_pos < len(new_url):
            if new_url[after_tld_pos] in enclosure_map.values():
                new_url_tmp = new_url[:after_tld_pos]
                return self._remove_enclosure_from_url(new_url_tmp, tld_pos, tld)

        return new_url

    @staticmethod
    def _split_markdown(text_url: str, tld_pos: int) -> str:
        """
        Split a Markdown URL. There is an issue when a Markdown URL is found:
        parsing of the URL does not stop at the right place, so the wrongly
        matched URL has to be split.

        :param str text_url: URL that we want to extract from enclosure
        :param int tld_pos: position of TLD
        :return: URL split out of the Markdown link
        :rtype: str
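
        >>> URLExtract._split_markdown(
        ...     "[http://example.com/](http://example.com/status/210)", 15)
        'http://example.com/'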
        """
        # Markdown URL can look like:
        # [http://example.com/](http://example.com/status/210)

        left_bracket_pos = text_url.find("[")
        # subtract 3 because URL is never shorter than 3 characters
        if left_bracket_pos > tld_pos - 3:
            return text_url

        right_bracket_pos = text_url.find(")")
        if right_bracket_pos < tld_pos:
            return text_url

        middle_pos = text_url.rfind("](")
        if middle_pos > tld_pos:
            return text_url[left_bracket_pos + 1 : middle_pos]
        return text_url

    @staticmethod
    def _get_tld_pos(url: str, tld: str) -> int:
        """
        Return position of TLD in the host part of the given URL.

        :param str url: URL in which TLD should be located
        :param str tld: TLD we want to find
        :return: position of the TLD in the given URL
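
        >>> URLExtract._get_tld_pos("janlipovsky.cz", ".cz")
        11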
        """
        tmp_url = "http://" + url if url.find("://") == -1 else url

        url_parts = uritools.urisplit(tmp_url)
        host = str(url_parts.gethost())
        # `host` is always returned in lowercase,
        # so `url` & `tld` must also be lowercased,
        # otherwise the `find()` may fail.
        offset = url.lower().find(host)
        return host.rfind(tld.lower()) + offset

    # TODO: move type assertion to be Generator based
    # found https://stackoverflow.com/a/38423388/14669675
    def gen_urls(
        self, text: str, check_dns=False, get_indices=False, with_schema_only=False
    ) -> Generator[Union[str, Tuple[str, Tuple[int, int]]], None, None]:
        """
        Creates generator over found URLs in given text.

        :param str text: text where we want to find URLs
        :param bool check_dns: filter results to valid domains
        :param bool get_indices: whether to return beginning and
            ending indices as (<url>, (idx_begin, idx_end))
        :param bool with_schema_only: get domains with schema only
        :yields: URL found in text, or (<url>, (idx_begin, idx_end))
            when get_indices is True
        :rtype: str|tuple(str, tuple(int, int))
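
        >>> extractor = URLExtract()
        >>> list(extractor.gen_urls("Let's have URL example.com example."))
        ['example.com']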
        """
        tld_pos = 0
        matched_tlds = self._tlds_re.findall(text)

        while matched_tlds:
            tld = matched_tlds.pop(0)
            tmp_text = text[tld_pos:]
            offset = tld_pos
            tld_pos = tmp_text.find(tld)
            validated = tld_pos != -1 and self._validate_tld_match(
                text, tld, offset + tld_pos
            )
            if validated:
                tmp_url = self._complete_url(
                    text,
                    offset + tld_pos,
                    tld,
                    check_dns=check_dns,
                    with_schema_only=with_schema_only,
                )

                if tmp_url:
                    # do not search for TLD in already extracted URL
                    tld_pos_url = self._get_tld_pos(tmp_url, tld)
                    # move cursor right after found TLD
                    tld_pos += len(tld) + offset
                    # move cursor after end of found URL
                    rest_url = tmp_url[tld_pos_url + len(tld) :]
                    tld_pos += len(rest_url)

                    # remove all matched TLDs that were found in currently
                    # extracted URL (tmp_url resp. rest_url)
                    while matched_tlds:
                        new_tld = matched_tlds[0]
                        tmp_tld_pos_url = rest_url.find(new_tld)
                        if tmp_tld_pos_url < 0:
                            break
                        rest_url = rest_url[tmp_tld_pos_url + len(new_tld) :]
                        matched_tlds.pop(0)

                    if get_indices:
                        yield tmp_url, (tld_pos - len(tmp_url), tld_pos)
                    else:
                        yield tmp_url

                    continue

            # move cursor right after found TLD
            tld_pos += len(tld) + offset

    def find_urls(
        self,
        text: str,
        only_unique=False,
        check_dns=False,
        get_indices=False,
        with_schema_only=False,
    ) -> List[Union[str, Tuple[str, Tuple[int, int]]]]:
        """
        Find all URLs in given text.

        :param str text: text where we want to find URLs
        :param bool only_unique: return only unique URLs
        :param bool check_dns: filter results to valid domains
        :param bool get_indices: whether to return beginning and
            ending indices as (<url>, (idx_begin, idx_end))
        :param bool with_schema_only: get domains with schema only
            (e.g. https://janlipovsky.cz but not example.com)
        :return: list of URLs found in text
        :rtype: list

        :raises URLExtractError: Raised when count of found URLs reaches
            given limit. Processed URLs are returned in `data` argument.
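
        >>> extractor = URLExtract()
        >>> extractor.find_urls("Let's have URL example.com example.")
        ['example.com']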
        """
        urls = self.gen_urls(
            text,
            check_dns=check_dns,
            get_indices=get_indices,
            with_schema_only=with_schema_only,
        )
        if self._limit is None:
            if only_unique:
                return list(OrderedDict.fromkeys(urls))
            return list(urls)

        result_urls: List[Union[str, Tuple[str, Tuple[int, int]]]] = []
        url = next(urls, "")
        url_count = 1
        while url:
            if url_count > self._limit:
                err = "Limit for extracting URLs was reached. [{} URLs]".format(
                    self._limit
                )
                self._logger.error(err)

                raise URLExtractError(err, data=result_urls)

            result_urls.append(url)
            url = next(urls, "")
            url_count += 1

        if only_unique:
            return list(OrderedDict.fromkeys(result_urls))
        return result_urls

    def has_urls(self, text: str, check_dns=False, with_schema_only=False) -> bool:
        """
        Checks if text contains any valid URL.
        Returns True if text contains at least one URL.

        :param text: text where we want to find URLs
        :param bool check_dns: filter results to valid domains
        :param bool with_schema_only: consider domains with schema only
        :return: True if at least one URL was found, False otherwise
        :rtype: bool
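
        >>> extractor = URLExtract()
        >>> extractor.has_urls("Let's have URL example.com example.")
        True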
        """

        return any(
            self.gen_urls(text, check_dns=check_dns, with_schema_only=with_schema_only)
        )


class URLExtractError(Exception):
    """
    Raised when an error occurs while processing URLs.

    Attributes:
        message -- explanation of the error
        data -- input expression in which the error occurred
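
    **Example:**

    .. code-block:: python

        # a sketch: `extractor` and `text` are placeholders
        try:
            urls = extractor.find_urls(text)
        except URLExtractError as err:
            urls = err.data  # URLs collected before the limit was reached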
    """

    def __init__(self, message, data):
        self.data = data
        self.message = message


def report_issue(func):
    """Friendly message with link to GitHub for easier reporting"""

    @functools.wraps(func)
    def wrapper_urlextract_cli(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            print(
                "Error: An unexpected error occurred. "
                "If you can't resolve this issue please report it to: "
                "https://github.com/lipoja/URLExtract/issues "
                "and help us improve urlextract!",
                file=sys.stderr,
            )
            raise

    return wrapper_urlextract_cli


@report_issue
def _urlextract_cli():
    """
    urlextract - command line program that will print all URLs to stdout
    Usage: urlextract [-h] [-v] [-u] [-dl] [-c] [-i <ignore_file>]
                      [-p <permit_file>] [-l LIMIT] [<input_file>]

    input_file - text file to extract URLs from (defaults to stdin)
    """
    import argparse

    # TODO: add type checking here
    def get_args() -> Namespace:
        """Parse program arguments"""
        parser = argparse.ArgumentParser(
            description="urlextract - prints out all URLs that were "
            "found in input file or stdin based on locating "
            "their TLDs"
        )

        ver = URLExtract.get_version()
        parser.add_argument(
            "-v",
            "--version",
            action="version",
            version="%(prog)s - version {}".format(ver),
        )

        parser.add_argument(
            "-u",
            "--unique",
            dest="unique",
            action="store_true",
            help="print out only unique URLs found in file",
        )

        parser.add_argument(
            "-dl",
            "--disable-localhost",
            dest="disable_localhost",
            action="store_true",
            help='disable extracting "localhost" as URL',
        )

        parser.add_argument(
            "-c",
            "--check-dns",
            dest="check_dns",
            action="store_true",
            help="print out only URLs for existing domain names",
        )

        parser.add_argument(
            "-i",
            "--ignore-file",
            metavar="<ignore_file>",
            type=str,
            default=None,
            help="input text file with URLs to exclude from extraction",
        )

        parser.add_argument(
            "-p",
            "--permit-file",
            metavar="<permit_file>",
            type=str,
            default=None,
            help="input text file with URLs that can be processed",
        )

        parser.add_argument(
            "-l",
            "--limit",
            dest="limit",
            type=int,
            default=DEFAULT_LIMIT,
            help="Maximum count of URLs that can be processed. "
            "Set 0 to disable the limit. "
            "Default: {}".format(DEFAULT_LIMIT),
        )

        parser.add_argument(
            "input_file",
            nargs="?",
            metavar="<input_file>",
            type=argparse.FileType(),
            default=sys.stdin,
            help="input text file with URLs to extract",
        )

        parsed_args = parser.parse_args()
        return parsed_args

    args = get_args()
    logging.basicConfig(
        level=logging.WARNING,
        stream=sys.stderr,
        format="%(asctime)s - %(levelname)s (%(name)s): %(message)s",
    )
    logger = logging.getLogger("urlextract")

    try:
        limit = None if args.limit <= 0 else args.limit
        urlextract = URLExtract(limit=limit)
        if args.disable_localhost:
            urlextract.extract_localhost = False
        if args.ignore_file:
            urlextract.load_ignore_list(args.ignore_file)
        if args.permit_file:
            urlextract.load_permit_list(args.permit_file)
        urlextract.update_when_older(30)
        content = args.input_file.read()
        try:
            for url in urlextract.find_urls(content, args.unique, args.check_dns):
                print(url)
        except URLExtractError as e:
            logger.error(
                "You can set the limit using the --limit parameter. "
                "See --help for more details."
            )
            for url in e.data:
                print(url)

    except CacheFileError as e:
        logger.error(str(e))
        sys.exit(-1)
    finally:
        args.input_file.close()


def dns_cache_install() -> None:
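    """
    Install a caching DNS resolver (best effort).

    Prefers dns_cache's ExceptionCachingResolver when available; otherwise
    overrides the system resolver with a dnspython Resolver backed by an
    LRUCache. Does nothing if dnspython is not installed.
    """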
    try:
        from dns import resolver as dnspython_resolver_module  # type: ignore
        from dns_cache.resolver import ExceptionCachingResolver  # type: ignore

        if not dnspython_resolver_module.default_resolver:
            dnspython_resolver_module.default_resolver = ExceptionCachingResolver()
        del dnspython_resolver_module
    except ImportError:
        pass

    try:
        from dns.resolver import (  # type: ignore
            LRUCache,
            Resolver,
            _resolver,
            default_resolver,
            override_system_resolver,
        )
    except ImportError:
        return

    if default_resolver:
        if not default_resolver.cache:
            default_resolver.cache = LRUCache()
        resolver = default_resolver
    elif _resolver and _resolver.cache:
        resolver = _resolver
    else:
        resolver = Resolver()
        resolver.cache = LRUCache()
    override_system_resolver(resolver)


if __name__ == "__main__":
    _urlextract_cli()