print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
elif software == "gotosocial":
print(f"INFO: blocker='{blocker}',software='{software}'")
- try:
- # Blocks
- federation = fba.get_response(blocker, f"{fba.get_peers_url}?filter=suspended", fba.api_headers, (config.get("connection_timeout"), config.get("read_timeout"))).json()
-
- if (federation == None):
- print("WARNING: No valid response:", blocker);
- elif "error" in federation:
- print("WARNING: API returned error:", federation["error"])
- else:
- print(f"INFO: Checking {len(federation)} entries from blocker='{blocker}',software='{software}' ...")
- for peer in federation:
- blocked = peer["domain"].lower()
- # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
- blocked = fba.tidyup_domain(blocked)
- # DEBUG: print("DEBUG: AFTER blocked:", blocked)
-
- if blocked == "":
- print("WARNING: blocked is empty:", blocker)
- continue
- elif blacklist.is_blacklisted(blocked):
- # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
- continue
- elif blocked.count("*") > 0:
- # GTS does not have hashes for obscured domains, so we have to guess it
- fba.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
- )
- searchres = fba.cursor.fetchone()
-
- if searchres == None:
- print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
- continue
-
- blocked = searchres[0]
- origin = searchres[1]
- nodeinfo_url = searchres[2]
- elif not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
- continue
-
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
- if not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
- continue
- elif not fba.is_instance_registered(blocked):
- # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
- fba.add_instance(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
- if not blocks.is_instance_blocked(blocker, blocked, "reject"):
- # DEBUG: print(f"DEBUG: blocker='{blocker}' is blocking '{blocked}' for unknown reason at this point")
- blocks.add_instance(blocker, blocked, "unknown", "reject")
-
- blockdict.append({
- "blocked": blocked,
- "reason" : None
- })
- else:
- # DEBUG: print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
- blocks.update_last_seen(blocker, blocked, "reject")
-
- if "public_comment" in peer:
- # DEBUG: print("DEBUG: Updating block reason:", blocker, blocked, peer["public_comment"])
- blocks.update_reason(peer["public_comment"], blocker, blocked, "reject")
-
- for entry in blockdict:
- if entry["blocked"] == blocked:
- # DEBUG: print(f"DEBUG: Setting block reason for blocked='{blocked}':'{peer['public_comment']}'")
- entry["reason"] = peer["public_comment"]
-
- # DEBUG: print("DEBUG: Committing changes ...")
- fba.connection.commit()
- except Exception as e:
- print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
+ gotosocial.fetch_blocks(blocker, origin, nodeinfo_url)
else:
print("WARNING: Unknown software:", blocker, software)
raise ValueError(f"Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
- blocklist = []
+ blocked = list()
try:
doc = bs4.BeautifulSoup(
# DEBUG: print("DEBUG: Instance has no block list:", domain)
return {}
- for line in blocklist.find("table").find_all("tr")[1:]:
+ table = blocklist.find("table")
+
+ # DEBUG: print(f"DEBUG: table[]='{type(table)}'")
+ if table.find("tbody"):
+ rows = table.find("tbody").find_all("tr")
+ else:
+ rows = table.find_all("tr")
+
+ # DEBUG: print(f"DEBUG: Found rows()={len(rows)}")
+ for line in rows:
# DEBUG: print(f"DEBUG: line='{line}'")
- blocklist.append({
+ blocked.append({
"domain": tidyup_domain(line.find_all("td")[0].text),
- "reason": tidyup_domain(line.find_all("td")[1].text)
+ "reason": tidyup_reason(line.find_all("td")[1].text)
})
+ # DEBUG: print("DEBUG: Next!")
# DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocklist))
return {
- "reject": blocklist
+ "reject": blocked
}
def fetch_misskey_blocks(domain: str) -> dict:
# Replace â with "
reason = re.sub("â", "\"", reason)
- ## DEBUG: print(f"DEBUG: reason='{reason}' - EXIT!")
+ # DEBUG: print(f"DEBUG: reason='{reason}' - EXIT!")
return reason
def tidyup_domain(domain: str) -> str:
__all__ = [
+ 'gotosocial',
'lemmy',
'mastodon',
'misskey',
--- /dev/null
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import inspect
+import validators
+
+from fba import blacklist
+from fba import blocks
+from fba import config
+from fba import fba
+
+def fetch_blocks(blocker: str, origin: str, nodeinfo_url: str):
+ print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
+ if type(domain) != str:
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
+ elif domain == "":
+ raise ValueError(f"Parameter 'domain' is empty")
+ elif type(origin) != str and origin != None:
+ raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'")
+ elif origin == "":
+ raise ValueError(f"Parameter 'origin' is empty")
+ elif type(nodeinfo_url) != str:
+ raise ValueError(f"Parameter nodeinfo_url[]={type(nodeinfo_url)} is not 'str'")
+ elif nodeinfo_url == "":
+ raise ValueError(f"Parameter 'nodeinfo_url' is empty")
+
+ try:
+ # Blocks
+ federation = fba.get_response(blocker, f"{fba.get_peers_url}?filter=suspended", fba.api_headers, (config.get("connection_timeout"), config.get("read_timeout"))).json()
+
+ if (federation == None):
+ print("WARNING: No valid response:", blocker);
+ elif "error" in federation:
+ print("WARNING: API returned error:", federation["error"])
+ else:
+ print(f"INFO: Checking {len(federation)} entries from blocker='{blocker}',software='{software}' ...")
+ for peer in federation:
+ blocked = peer["domain"].lower()
+ # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
+ blocked = fba.tidyup_domain(blocked)
+ # DEBUG: print("DEBUG: AFTER blocked:", blocked)
+
+ if blocked == "":
+ print("WARNING: blocked is empty:", blocker)
+ continue
+ elif blacklist.is_blacklisted(blocked):
+ # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
+ continue
+ elif blocked.count("*") > 0:
+ # GTS does not have hashes for obscured domains, so we have to guess it
+ fba.cursor.execute(
+ "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
+ )
+ searchres = fba.cursor.fetchone()
+
+ if searchres == None:
+ print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
+ continue
+
+ blocked = searchres[0]
+ origin = searchres[1]
+ nodeinfo_url = searchres[2]
+ elif not validators.domain(blocked):
+ print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+ continue
+
+ # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ if not validators.domain(blocked):
+ print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+ continue
+ elif not fba.is_instance_registered(blocked):
+ # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ fba.add_instance(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
+
+ if not blocks.is_instance_blocked(blocker, blocked, "reject"):
+ # DEBUG: print(f"DEBUG: blocker='{blocker}' is blocking '{blocked}' for unknown reason at this point")
+ blocks.add_instance(blocker, blocked, "unknown", "reject")
+
+ blockdict.append({
+ "blocked": blocked,
+ "reason" : None
+ })
+ else:
+ # DEBUG: print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
+ blocks.update_last_seen(blocker, blocked, "reject")
+
+ if "public_comment" in peer:
+ # DEBUG: print("DEBUG: Updating block reason:", blocker, blocked, peer["public_comment"])
+ blocks.update_reason(peer["public_comment"], blocker, blocked, "reject")
+
+ for entry in blockdict:
+ if entry["blocked"] == blocked:
+ # DEBUG: print(f"DEBUG: Setting block reason for blocked='{blocked}':'{peer['public_comment']}'")
+ entry["reason"] = peer["public_comment"]
+
+ # DEBUG: print("DEBUG: Committing changes ...")
+ fba.connection.commit()
+ except Exception as e:
+ print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
+
+ # DEBUG: print("DEBUG: EXIT!")