]> git.mxchange.org Git - fba.git/blob - fba/helpers/domain.py
Continued:
[fba.git] / fba / helpers / domain.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import logging
18
19 from urllib.parse import urlparse
20
21 import validators
22
23 from fba.helpers import blacklist
24 from fba.helpers import config
25
26 from fba.models import instances
27
28 logging.basicConfig(level=logging.INFO)
29 logger = logging.getLogger(__name__)
30
31 # In-function cache
32 _cache = {
33     # Cache for function is_in_url()
34     "is_in_url": {},
35
36     # Cache for function is_wanted()
37     "is_wanted": {},
38
39     # Cache for function raise_on()
40     "raise_on": {},
41 }
42
43 def raise_on(domain: str):
44     logger.debug("domain='%s' - CALLED!", domain)
45
46     if not isinstance(domain, str):
47         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
48     elif domain == "":
49         raise ValueError("Parameter 'domain' is empty")
50     elif domain in _cache["raise_on"]:
51         logger.debug("Returning cached raised_on='%s' - EXIT!", _cache["raise_on"][domain])
52         return _cache["raise_on"][domain]
53     elif domain.lower() != domain:
54         raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
55     elif not validators.domain(domain.split("/")[0]):
56         raise ValueError(f"domain='{domain}' is not a valid domain")
57     elif domain.endswith(".onion"):
58         raise ValueError(f"domain='{domain}' is a TOR, please don't crawl them!")
59     elif domain.endswith(".i2p") and config.get("allow_i2p_domain") == "true":
60         raise ValueError(f"domain='{domain}' is an I2P, please don't crawl them!")
61     elif domain.endswith(".arpa"):
62         raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
63     elif domain.endswith(".tld"):
64         raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
65
66     _cache["raise_on"][domain] = True
67     logger.debug("EXIT!")
68
69 def is_in_url(domain: str, url: str) -> bool:
70     logger.debug("domain='%s',url='%s' - CALLED!", domain, url)
71     raise_on(domain)
72
73     if not isinstance(url, str):
74         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
75     elif url == "":
76         raise ValueError("Parameter 'url' is empty")
77     elif domain + url in _cache["is_in_url"]:
78         logger.debug("Returning cached is_in_url='%s' - EXIT!", _cache["is_in_url"][domain + url])
79         return _cache["is_in_url"][domain + url]
80
81     punycode = domain.encode("idna").decode("utf-8")
82
83     components = urlparse(url)
84     logger.debug("components[]='%s',punycode='%s'", type(components), punycode)
85
86     is_found = (punycode in [components.netloc, components.hostname])
87
88     # Set cache
89     _cache["is_in_url"][domain + url] = is_found
90
91     logger.debug("is_found='%s' - EXIT!", is_found)
92     return is_found
93
94 def is_wanted(domain: str) -> bool:
95     logger.debug("domain='%s' - CALLED!", domain)
96
97     if not isinstance(domain, str):
98         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
99     elif domain == "":
100         raise ValueError("Parameter 'domain' is empty")
101     elif domain in _cache["is_wanted"]:
102         logger.debug("Returning cached is_found='%s' - EXIT!", _cache["is_wanted"][domain])
103         return _cache["is_wanted"][domain]
104
105     wanted = True
106     if domain.lower() != domain:
107         logger.debug("domain='%s' is not all-lowercase - setting False ...", domain)
108         wanted = False
109     elif not validators.domain(domain.split("/")[0]):
110         logger.debug("domain='%s' is not a valid domain name - setting False ...", domain)
111         wanted = False
112     elif domain.endswith(".arpa"):
113         logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain)
114         wanted = False
115     elif domain.endswith(".onion"):
116         logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain)
117         wanted = False
118     elif domain.endswith(".i2p") and config.get("allow_i2p_domain") == "true":
119         logger.debug("domain='%s' is an I2P domain - setting False ...", domain)
120         wanted = False
121     elif domain.endswith(".tld"):
122         logger.debug("domain='%s' is a fake domain - setting False ...", domain)
123         wanted = False
124     elif blacklist.is_blacklisted(domain):
125         logger.debug("domain='%s' is blacklisted - setting False ...", domain)
126         wanted = False
127     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
128         logger.debug("domain='%s' is a single user", domain)
129         wanted = False
130     elif domain.find("/tag/") > 0:
131         logger.debug("domain='%s' is a tag", domain)
132         wanted = False
133
134     # Set cache
135     _cache["is_wanted"][domain] = wanted
136
137     logger.debug("wanted='%s' - EXIT!", wanted)
138     return wanted