]> git.mxchange.org Git - fba.git/blob - fba/helpers/domain.py
Continued:
[fba.git] / fba / helpers / domain.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import logging
18
19 from urllib.parse import urlparse
20
21 import validators
22
23 from fba.helpers import blacklist
24 from fba.helpers import config
25
26 from fba.models import instances
27
28 logging.basicConfig(level=logging.INFO)
29 logger = logging.getLogger(__name__)
30
31 # In-function cache
32 _cache = {}
33
34 def raise_on(domain: str):
35     logger.debug("domain='%s' - CALLED!", domain)
36
37     if not isinstance(domain, str):
38         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
39     elif domain == "":
40         raise ValueError("Parameter 'domain' is empty")
41     elif "raise_on" in _cache and domain in _cache["raise_on"]:
42         logger.debug("Returning cached is_found='%s' - EXIT!", _cache["raise_on"][domain])
43         return _cache["raise_on"][domain]
44     elif domain.lower() != domain:
45         raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
46     elif not validators.domain(domain.split("/")[0]):
47         raise ValueError(f"domain='{domain}' is not a valid domain")
48     elif domain.endswith(".onion"):
49         raise ValueError(f"domain='{domain}' is a TOR, please don't crawl them!")
50     elif domain.endswith(".i2p") and config.get("allow_i2p_domain") == "true":
51         raise ValueError(f"domain='{domain}' is an I2P, please don't crawl them!")
52     elif domain.endswith(".arpa"):
53         raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
54     elif domain.endswith(".tld"):
55         raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
56     elif not "raise_on" in _cache:
57         logger.debug("Initializing cache for function 'raise_on' ...")
58         _cache["raise_on"] = {}
59
60     _cache["raise_on"][domain] = True
61     logger.debug("EXIT!")
62
63 def is_in_url(domain: str, url: str) -> bool:
64     logger.debug("domain='%s',url='%s' - CALLED!", domain, url)
65     raise_on(domain)
66
67     if not isinstance(url, str):
68         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
69     elif url == "":
70         raise ValueError("Parameter 'url' is empty")
71     elif "is_in_url" in _cache and domain + url in _cache["is_in_url"]:
72         logger.debug("Returning cached is_found='%s' - EXIT!", _cache["is_in_url"][domain + url])
73         return _cache["is_in_url"][domain + url]
74     elif "is_in_url" not in _cache:
75         logger.debug("Initializing cache for function 'is_in_url' ...")
76         _cache["is_in_url"] = {}
77
78     punycode = domain.encode("idna").decode("utf-8")
79
80     components = urlparse(url)
81     logger.debug("components[]='%s',punycode='%s'", type(components), punycode)
82
83     is_found = (punycode in [components.netloc, components.hostname])
84     _cache["is_in_url"][domain + url] = is_found
85
86     logger.debug("is_found='%s' - EXIT!", is_found)
87     return is_found
88
89 def is_wanted(domain: str) -> bool:
90     logger.debug("domain='%s' - CALLED!", domain)
91
92     if not isinstance(domain, str):
93         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
94     elif domain == "":
95         raise ValueError("Parameter 'domain' is empty")
96     elif "is_wanted" in _cache and domain in _cache["is_wanted"]:
97         logger.debug("Returning cached is_found='%s' - EXIT!", _cache["is_wanted"][domain])
98         return _cache["is_wanted"][domain]
99     elif "is_wanted" not in _cache:
100         logger.debug("Initializing cache for function 'is_wanted' ...")
101         _cache["is_wanted"] = {}
102
103     wanted = True
104     if domain.lower() != domain:
105         logger.debug("domain='%s' is not all-lowercase - setting False ...", domain)
106         wanted = False
107     elif not validators.domain(domain.split("/")[0]):
108         logger.debug("domain='%s' is not a valid domain name - setting False ...", domain)
109         wanted = False
110     elif domain.endswith(".arpa"):
111         logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain)
112         wanted = False
113     elif domain.endswith(".onion"):
114         logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain)
115         wanted = False
116     elif domain.endswith(".i2p") and config.get("allow_i2p_domain") == "true":
117         logger.debug("domain='%s' is an I2P domain - setting False ...", domain)
118         wanted = False
119     elif domain.endswith(".tld"):
120         logger.debug("domain='%s' is a fake domain - setting False ...", domain)
121         wanted = False
122     elif blacklist.is_blacklisted(domain):
123         logger.debug("domain='%s' is blacklisted - setting False ...", domain)
124         wanted = False
125     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
126         logger.debug("domain='%s' is a single user", domain)
127         wanted = False
128     elif domain.find("/tag/") > 0:
129         logger.debug("domain='%s' is a tag", domain)
130         wanted = False
131
132     _cache["is_wanted"][domain] = wanted
133     logger.debug("wanted='%s' - EXIT!", wanted)
134     return wanted