]> git.mxchange.org Git - fba.git/blob - fba/networks/lemmy.py
47afc1b5833d67abdbc1ecc414aaca4de4d10d16
[fba.git] / fba / networks / lemmy.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import logging
18
19 import bs4
20
21 from fba import csrf
22 from fba import utils
23
24 from fba.helpers import config
25 from fba.helpers import domain as domain_helper
26 from fba.helpers import tidyup
27
28 from fba.http import federation
29 from fba.http import network
30
31 from fba.models import instances
32
33 logging.basicConfig(level=logging.INFO)
34 logger = logging.getLogger(__name__)
35 #logger.setLevel(logging.DEBUG)
36
37 def fetch_peers(domain: str, origin: str) -> list:
38     logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin)
39     domain_helper.raise_on(domain)
40
41     peers = list()
42
43     # No CSRF by default, you don't have to add network.api_headers by yourself here
44     headers = tuple()
45
46     try:
47         logger.debug("Checking CSRF for domain='%s'", domain)
48         headers = csrf.determine(domain, dict())
49     except network.exceptions as exception:
50         logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
51         instances.set_last_error(domain, exception)
52         return list()
53
54     try:
55         logger.debug("Fetching '/api/v3/site' from domain='%s' ...", domain)
56         data = network.get_json_api(
57             domain,
58             "/api/v3/site",
59             headers,
60             (config.get("connection_timeout"), config.get("read_timeout"))
61         )
62
63         logger.debug("data[]='%s'", type(data))
64         if "error_message" in data:
65             logger.warning("Could not reach any JSON API: domain='%s'", domain)
66             instances.set_last_error(domain, data)
67         elif "federated_instances" in data["json"] and isinstance(data["json"]["federated_instances"], dict):
68             logger.debug("Found federated_instances for domain='%s'", domain)
69             peers = peers + federation.add_peers(data["json"]["federated_instances"])
70
71             logger.debug("Marking domain='%s' as successfully handled ...")
72             instances.set_success(domain)
73
74         if len(peers) == 0:
75             logger.warning("Fetching instances for domain='%s' from /instances ...", domain)
76             peers = fetch_instances(domain, origin)
77
78     except network.exceptions as exception:
79         logger.warning("Exception during fetching JSON: domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
80         instances.set_last_error(domain, exception)
81
82     logger.debug("peers()=%d - EXIT!", len(peers))
83     return peers
84
85 def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
86     logger.debug("domain='%s,nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
87     domain_helper.raise_on(domain)
88
89     if not isinstance(nodeinfo_url, str):
90         raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
91     elif nodeinfo_url == "":
92         raise ValueError("Parameter 'nodeinfo_url' is empty")
93
94     translations = [
95         "Blocked Instances".lower(),
96         "Instàncies bloquejades".lower(),
97         "Blocáilte Ásc".lower(),
98         "封锁实例".lower(),
99         "Blokované instance".lower(),
100         "Geblokkeerde instanties".lower(),
101         "Blockerade instanser".lower(),
102         "Instàncias blocadas".lower(),
103         "Istanze bloccate".lower(),
104         "Instances bloquées".lower(),
105         "Letiltott példányok".lower(),
106         "Instancias bloqueadas".lower(),
107         "Blokeatuta dauden instantziak".lower(),
108         "차단된 인스턴스".lower(),
109         "Peladen Yang Diblokir".lower(),
110         "Blokerede servere".lower(),
111         "Blokitaj nodoj".lower(),
112         "Блокирани Инстанции".lower(),
113         "Blockierte Instanzen".lower(),
114         "Estetyt instanssit".lower(),
115         "Instâncias bloqueadas".lower(),
116         "Zablokowane instancje".lower(),
117         "Blokované inštancie".lower(),
118         "المثلاء المحجوبون".lower(),
119         "Užblokuoti serveriai".lower(),
120         "ブロックしたインスタンス".lower(),
121         "Блокированные Инстансы".lower(),
122         "Αποκλεισμένοι διακομιστές".lower(),
123         "封鎖站台".lower(),
124         "Instâncias bloqueadas".lower(),
125     ]
126
127     blocklist = list()
128
129     try:
130         # json endpoint for newer mastodongs
131         logger.debug("Fetching /instances from domain='%s'", domain)
132         response = network.fetch_response(
133             domain,
134             "/instances",
135             network.web_headers,
136             (config.get("connection_timeout"), config.get("read_timeout"))
137         )
138
139         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
140         if response.ok and response.status_code < 300 and response.text != "":
141             logger.debug("Parsing %s Bytes ...", len(response.text))
142
143             doc = bs4.BeautifulSoup(response.text, "html.parser")
144             logger.debug("doc[]='%s'", type(doc))
145
146             headers = doc.findAll("h5")
147             found = None
148             logger.debug("Search in %d header(s) ...", len(headers))
149             for header in headers:
150                 logger.debug("header[]='%s'", type(header))
151                 content = header.contents[0]
152
153                 logger.debug("content[%s]='%s'", type(content), content)
154                 if content is None:
155                     logger.debug("domain='%s' has returned empty header='%s' - SKIPPED!", domain, header)
156                     continue
157                 elif not isinstance(content, str):
158                     logger.debug("content[]='%s' is not supported/wanted type 'str' - SKIPPED!", type(content))
159                     continue
160                 elif content.lower() in translations:
161                     logger.debug("Found header with blocked instances - BREAK!")
162                     found = header
163                     break
164
165             logger.debug("found[]='%s'", type(found))
166             if found is None:
167                 logger.debug("domain='%s' is not blocking any instances - EXIT!", domain)
168                 return blocklist
169
170             blocking = found.find_next(["ul","table"]).findAll("a")
171             logger.debug("Found %d blocked instance(s) ...", len(blocking))
172             for tag in blocking:
173                 logger.debug("tag[]='%s'", type(tag))
174                 blocked = tidyup.domain(tag.contents[0])
175                 logger.debug("blocked='%s'", blocked)
176
177                 if not utils.is_domain_wanted(blocked):
178                     logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
179                     continue
180
181                 logger.debug("Appending blocker='%s',blocked='%s',block_level='reject'", domain, blocked)
182                 blocklist.append({
183                     "blocker"    : domain,
184                     "blocked"    : blocked,
185                     "reason"     : None,
186                     "block_level": "reject",
187                 })
188
189     except network.exceptions as exception:
190         logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
191         instances.set_last_error(domain, exception)
192
193     logger.debug("blocklist()=%d - EXIT!", len(blocklist))
194     return blocklist
195
196 def fetch_instances(domain: str, origin: str) -> list:
197     logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin)
198     domain_helper.raise_on(domain)
199
200     peers = list()
201
202     try:
203         # json endpoint for newer mastodongs
204         logger.debug("Fetching /instances from domain='%s'", domain)
205         response = network.fetch_response(
206             domain,
207             "/instances",
208             network.web_headers,
209             (config.get("connection_timeout"), config.get("read_timeout"))
210         )
211
212         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
213         if response.ok and response.status_code < 300 and response.text != "":
214             logger.debug("Parsing %s Bytes ...", len(response.text))
215
216             doc = bs4.BeautifulSoup(response.text, "html.parser")
217             logger.debug("doc[]='%s'", type(doc))
218
219             headers = doc.findAll("h5")
220             logger.debug("Checking %d headers ...", len(headers))
221             for header in headers:
222                 logger.debug("header[%s]='%s'", type(header), header)
223
224                 rows = header.find_next(["ul","table"]).findAll("a")
225                 logger.debug("Found %d blocked instance(s) ...", len(rows))
226                 for tag in rows:
227                     logger.debug("tag[]='%s'", type(tag))
228                     text = tag.contents[0] if isinstance(tag.contents[0], str) else tag.contents[0].text
229                     peer = tidyup.domain(text)
230                     logger.debug("peer='%s'", peer)
231
232                     if peer == "":
233                         logger.debug("peer is empty - SKIPPED!")
234                         continue
235                     elif not utils.is_domain_wanted(peer):
236                         logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
237                         continue
238                     elif peer in peers:
239                         logger.debug("peer='%s' already added - SKIPPED!", peer)
240                         continue
241
242                     logger.debug("Appending peer='%s' ...", peer)
243                     peers.append(peer)
244
245         logger.debug("Marking domain='%s' as successfully handled ...")
246         instances.set_success(domain)
247
248     except network.exceptions as exception:
249         logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
250         instances.set_last_error(domain, exception)
251
252     logger.debug("peers()=%d - EXIT!", len(peers))
253     return peers