]> git.mxchange.org Git - fba.git/blob - fba/networks/lemmy.py
Continued:
[fba.git] / fba / networks / lemmy.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import logging
18
19 import bs4
20
21 from fba import csrf
22 from fba import utils
23
24 from fba.helpers import config
25 from fba.helpers import domain as domain_helper
26 from fba.helpers import tidyup
27
28 from fba.http import federation
29 from fba.http import network
30
31 from fba.models import instances
32
33 logging.basicConfig(level=logging.INFO)
34 logger = logging.getLogger(__name__)
35 #logger.setLevel(logging.DEBUG)
36
37 def fetch_peers(domain: str, origin: str) -> list:
38     logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin)
39     domain_helper.raise_on(domain)
40
41     peers = list()
42
43     # No CSRF by default, you don't have to add network.api_headers by yourself here
44     headers = tuple()
45
46     try:
47         logger.debug("Checking CSRF for domain='%s'", domain)
48         headers = csrf.determine(domain, dict())
49     except network.exceptions as exception:
50         logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
51         instances.set_last_error(domain, exception)
52         return list()
53
54     try:
55         logger.debug("Fetching '/api/v3/site' from domain='%s' ...", domain)
56         data = network.get_json_api(
57             domain,
58             "/api/v3/site",
59             headers,
60             (config.get("connection_timeout"), config.get("read_timeout"))
61         )
62
63         logger.debug("data[]='%s'", type(data))
64         if "error_message" in data:
65             logger.warning("Could not reach any JSON API: domain='%s'", domain)
66             instances.set_last_error(domain, data)
67         elif "federated_instances" in data["json"] and isinstance(data["json"]["federated_instances"], dict):
68             logger.debug("Found federated_instances for domain='%s'", domain)
69             peers = peers + federation.add_peers(data["json"]["federated_instances"])
70
71             logger.debug("Marking domain='%s' as successfully handled ...", domain)
72             instances.set_success(domain)
73
74         if len(peers) == 0:
75             logger.warning("Fetching instances for domain='%s' from /instances ...", domain)
76             peers = fetch_instances(domain, origin)
77
78     except network.exceptions as exception:
79         logger.warning("Exception during fetching JSON: domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
80         instances.set_last_error(domain, exception)
81
82     logger.debug("peers()=%d - EXIT!", len(peers))
83     return peers
84
85 def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
86     logger.debug("domain='%s,nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
87     domain_helper.raise_on(domain)
88
89     if not isinstance(nodeinfo_url, str):
90         raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
91     elif nodeinfo_url == "":
92         raise ValueError("Parameter 'nodeinfo_url' is empty")
93
94     translations = [
95         "Blocked Instances".lower(),
96         "Instàncies bloquejades".lower(),
97         "Blocáilte Ásc".lower(),
98         "封锁实例".lower(),
99         "Blokované instance".lower(),
100         "Geblokkeerde instanties".lower(),
101         "Blockerade instanser".lower(),
102         "Instàncias blocadas".lower(),
103         "Istanze bloccate".lower(),
104         "Instances bloquées".lower(),
105         "Letiltott példányok".lower(),
106         "Instancias bloqueadas".lower(),
107         "Blokeatuta dauden instantziak".lower(),
108         "차단된 인스턴스".lower(),
109         "Peladen Yang Diblokir".lower(),
110         "Blokerede servere".lower(),
111         "Blokitaj nodoj".lower(),
112         "Блокирани Инстанции".lower(),
113         "Blockierte Instanzen".lower(),
114         "Estetyt instanssit".lower(),
115         "Instâncias bloqueadas".lower(),
116         "Zablokowane instancje".lower(),
117         "Blokované inštancie".lower(),
118         "المثلاء المحجوبون".lower(),
119         "Užblokuoti serveriai".lower(),
120         "ブロックしたインスタンス".lower(),
121         "Блокированные Инстансы".lower(),
122         "Αποκλεισμένοι διακομιστές".lower(),
123         "封鎖站台".lower(),
124         "Instâncias bloqueadas".lower(),
125     ]
126
127     blocklist = list()
128
129     try:
130         # json endpoint for newer mastodongs
131         logger.debug("Fetching /instances from domain='%s'", domain)
132         response = network.fetch_response(
133             domain,
134             "/instances",
135             network.web_headers,
136             (config.get("connection_timeout"), config.get("read_timeout"))
137         )
138
139         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
140         if response.ok and response.status_code < 300 and response.text != "":
141             logger.debug("Parsing %s Bytes ...", len(response.text))
142
143             doc = bs4.BeautifulSoup(response.text, "html.parser")
144             logger.debug("doc[]='%s'", type(doc))
145
146             headers = doc.findAll("h5")
147             found = None
148             logger.debug("Search in %d header(s) ...", len(headers))
149             for header in headers:
150                 logger.debug("header[]='%s'", type(header))
151                 content = header.contents[0]
152
153                 logger.debug("content[%s]='%s'", type(content), content)
154                 if content is None:
155                     logger.debug("domain='%s' has returned empty header='%s' - SKIPPED!", domain, header)
156                     continue
157                 elif not isinstance(content, str):
158                     logger.debug("content[]='%s' is not supported/wanted type 'str' - SKIPPED!", type(content))
159                     continue
160                 elif content.lower() in translations:
161                     logger.debug("Found header with blocked instances - BREAK!")
162                     found = header
163                     break
164
165             logger.debug("found[]='%s'", type(found))
166             if found is None:
167                 logger.debug("domain='%s' is not blocking any instances - EXIT!", domain)
168                 return blocklist
169
170             blocking = found.find_next(["ul","table"]).findAll("a")
171             logger.debug("Found %d blocked instance(s) ...", len(blocking))
172             for tag in blocking:
173                 logger.debug("tag[]='%s'", type(tag))
174                 blocked = tidyup.domain(tag.contents[0])
175                 logger.debug("blocked='%s'", blocked)
176
177                 if blocked == "":
178                     logger.warning("blocked='%s' is empty after tidyup.domain() - SKIPPED!", tag.contents[0])
179                     continue
180                 elif not utils.is_domain_wanted(blocked):
181                     logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
182                     continue
183
184                 logger.debug("Appending blocker='%s',blocked='%s',block_level='reject' ...", domain, blocked)
185                 blocklist.append({
186                     "blocker"    : domain,
187                     "blocked"    : blocked,
188                     "reason"     : None,
189                     "block_level": "reject",
190                 })
191
192     except network.exceptions as exception:
193         logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
194         instances.set_last_error(domain, exception)
195
196     logger.debug("blocklist()=%d - EXIT!", len(blocklist))
197     return blocklist
198
199 def fetch_instances(domain: str, origin: str) -> list:
200     logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin)
201     domain_helper.raise_on(domain)
202
203     peers = list()
204
205     try:
206         # json endpoint for newer mastodongs
207         logger.debug("Fetching /instances from domain='%s'", domain)
208         response = network.fetch_response(
209             domain,
210             "/instances",
211             network.web_headers,
212             (config.get("connection_timeout"), config.get("read_timeout"))
213         )
214
215         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
216         if response.ok and response.status_code < 300 and response.text != "":
217             logger.debug("Parsing %s Bytes ...", len(response.text))
218
219             doc = bs4.BeautifulSoup(response.text, "html.parser")
220             logger.debug("doc[]='%s'", type(doc))
221
222             headers = doc.findAll("h5")
223             logger.debug("Checking %d headers ...", len(headers))
224             for header in headers:
225                 logger.debug("header[%s]='%s'", type(header), header)
226
227                 rows = header.find_next(["ul","table"]).findAll("a")
228                 logger.debug("Found %d blocked instance(s) ...", len(rows))
229                 for tag in rows:
230                     logger.debug("tag[]='%s'", type(tag))
231                     text = tag.contents[0] if isinstance(tag.contents[0], str) else tag.contents[0].text
232                     peer = tidyup.domain(text)
233                     logger.debug("peer='%s'", peer)
234
235                     if peer == "":
236                         logger.debug("peer is empty - SKIPPED!")
237                         continue
238                     elif not utils.is_domain_wanted(peer):
239                         logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
240                         continue
241                     elif peer in peers:
242                         logger.debug("peer='%s' already added - SKIPPED!", peer)
243                         continue
244
245                     logger.debug("Appending peer='%s' ...", peer)
246                     peers.append(peer)
247
248         logger.debug("Marking domain='%s' as successfully handled ...", domain)
249         instances.set_success(domain)
250
251     except network.exceptions as exception:
252         logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
253         instances.set_last_error(domain, exception)
254
255     logger.debug("peers()=%d - EXIT!", len(peers))
256     return peers