]> git.mxchange.org Git - fba.git/blob - fba/networks/lemmy.py
Continued:
[fba.git] / fba / networks / lemmy.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import inspect
18 import logging
19
20 import bs4
21
22 from fba import csrf
23 from fba import database
24 from fba import utils
25
26 from fba.helpers import config
27 from fba.helpers import domain as domain_helper
28 from fba.helpers import tidyup
29
30 from fba.http import federation
31 from fba.http import network
32
33 from fba.models import blocks
34 from fba.models import instances
35
36 logging.basicConfig(level=logging.INFO)
37 logger = logging.getLogger(__name__)
38
39 def fetch_peers(domain: str) -> list:
40     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
41     domain_helper.raise_on(domain)
42
43     peers = list()
44
45     # No CSRF by default, you don't have to add network.api_headers by yourself here
46     headers = tuple()
47
48     try:
49         logger.debug("Checking CSRF for domain='%s'", domain)
50         headers = csrf.determine(domain, dict())
51     except network.exceptions as exception:
52         logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
53         instances.set_last_error(domain, exception)
54         return list()
55
56     try:
57         logger.debug("Fetching '/api/v3/site' from domain='%s' ...", domain)
58         data = network.get_json_api(
59             domain,
60             "/api/v3/site",
61             headers,
62             (config.get("connection_timeout"), config.get("read_timeout"))
63         )
64
65         logger.debug("data[]='%s'", type(data))
66         if "error_message" in data:
67             logger.warning("Could not reach any JSON API: domain='%s'", domain)
68             instances.set_last_error(domain, data)
69         elif "federated_instances" in data["json"] and isinstance(data["json"]["federated_instances"], dict):
70             logger.debug("Found federated_instances for domain='%s'", domain)
71             peers = peers + federation.add_peers(data["json"]["federated_instances"])
72             logger.debug("Added instance(s) to peers")
73         else:
74             logger.warning("JSON response does not contain 'federated_instances', domain='%s'", domain)
75             instances.set_last_error(domain, data)
76
77     except network.exceptions as exception:
78         logger.warning("Exception during fetching JSON: domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
79         instances.set_last_error(domain, exception)
80
81     logger.debug("Adding %d for domain='%s'", len(peers), domain)
82     instances.set_total_peers(domain, peers)
83
84     logger.debug("peers()=%d - EXIT!", len(peers))
85     return peers
86
87 def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
88     logger.debug("domain='%s,nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
89     domain_helper.raise_on(domain)
90
91     if not isinstance(nodeinfo_url, str):
92         raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
93     elif nodeinfo_url == "":
94         raise ValueError("Parameter 'nodeinfo_url' is empty")
95
96     translations = [
97         "Blocked Instances",
98         "Instàncies bloquejades",
99         "Blocáilte Ásc",
100         "封锁实例",
101         "Blokované instance",
102         "Geblokkeerde instanties",
103         "Blockerade instanser",
104         "Instàncias blocadas",
105         "Istanze bloccate",
106         "Instances bloquées",
107         "Letiltott példányok",
108         "Instancias bloqueadas",
109         "Blokeatuta dauden instantziak",
110         "차단된 인스턴스",
111         "Peladen Yang Diblokir",
112         "Blokerede servere",
113         "Blokitaj nodoj",
114         "Блокирани Инстанции",
115         "Blockierte Instanzen",
116         "Estetyt instanssit",
117         "Instâncias bloqueadas",
118         "Zablokowane instancje",
119         "Blokované inštancie",
120         "المثلاء المحجوبون",
121         "Užblokuoti serveriai",
122         "ブロックしたインスタンス",
123         "Блокированные Инстансы",
124         "Αποκλεισμένοι διακομιστές",
125         "封鎖站台",
126         "Instâncias bloqueadas",
127     ]
128
129     blocklist = list()
130
131     try:
132         # json endpoint for newer mastodongs
133         logger.debug("Fetching /instances from domain='%s'", domain)
134         response = network.fetch_response(
135             domain,
136             "/instances",
137             network.web_headers,
138             (config.get("connection_timeout"), config.get("read_timeout"))
139         )
140
141         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
142         if response.ok and response.status_code < 300 and response.text != "":
143             logger.debug("Parsing %s Bytes ...", len(response.text))
144
145             doc = bs4.BeautifulSoup(response.text, "html.parser")
146             logger.debug("doc[]='%s'", type(doc))
147
148             headers = doc.findAll("h5")
149             found = None
150             logger.debug("Search in %d header(s) ...", len(headers))
151             for header in headers:
152                 logger.debug("header[]='%s'", type(header))
153                 content = header.contents[0]
154
155                 logger.debug("content[%s]='%s'", type(content), content)
156                 if content in translations:
157                     logger.debug("Found header with blocked instances - BREAK!")
158                     found = header
159                     break
160
161             logger.debug("found[]='%s'", type(found))
162             if found is None:
163                 logger.debug("domain='%s' is not blocking any instances - EXIT!", domain)
164                 return blocklist
165
166             blocking = found.find_next("ul").findAll("a")
167             logger.debug("Found %d blocked instance(s) ...", len(blocking))
168             for tag in blocking:
169                 logger.debug("tag[]='%s'", type(tag))
170                 blocked = tidyup.domain(tag.contents[0])
171                 logger.debug("blocked='%s'", blocked)
172
173                 if not utils.is_domain_wanted(blocked):
174                     logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
175                     continue
176
177                 logger.debug("Appending blocker='%s',blocked='%s',block_level='reject'", domain, blocked)
178                 blocklist.append({
179                     "blocker"    : domain,
180                     "blocked"    : blocked,
181                     "reason"     : None,
182                     "block_level": "reject",
183                 })
184
185     except network.exceptions as exception:
186         logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
187         instances.set_last_error(domain, exception)
188
189     logger.debug("blocklist()=%d - EXIT!", len(blocklist))
190     return blocklist