__all__ = [
+ 'blacklist',
'blocks',
'boot',
'cache',
--- /dev/null
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+# Don't check these, known trolls/flooders/testing/developing
+blacklist = [
+ # Floods network with fake nodes as "research" project
+ "activitypub-troll.cf",
+ # Similar troll
+ "gab.best",
+ # Similar troll
+ "4chan.icu",
+ # Flooder (?)
+ "social.shrimpcam.pw",
+ # Flooder (?)
+ "mastotroll.netz.org",
+ # Testing/developing installations
+ "ngrok.io",
+ "ngrok-free.app",
+ "misskeytest.chn.moe",
+]
+
+def is_blacklisted(domain: str) -> bool:
+ # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
+ if type(domain) != str:
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
+ elif domain == "":
+ raise ValueError(f"Parameter 'domain' is empty")
+
+ blacklisted = False
+ for peer in blacklist:
+ # DEBUG: print(f"DEBUG: Checking peer='{peer}' ...")
+ if peer in domain:
+ # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted.")
+ blacklisted = True
+
+ # DEBUG: print(f"DEBUG: blacklisted='{blacklisted}' - EXIT!")
+ return blacklisted
import time
import validators
+from fba import blacklist
from fba import fba
def update_reason(reason: str, blocker: str, blocked: str, block_level: str):
raise ValueError(f"Parameter 'blocked' is empty")
elif not validators.domain(blocked.split("/")[0]):
raise ValueError(f"Bad blocked='{blocked}'")
- elif fba.is_blacklisted(blocker):
+ elif blacklist.is_blacklisted(blocker):
raise Exception(f"blocker='{blocker}' is blacklisted but function invoked")
- elif fba.is_blacklisted(blocked):
+ elif blacklist.is_blacklisted(blocked):
raise Exception(f"blocked='{blocked}' is blacklisted but function invoked")
if reason != None:
import time
import validators
+from fba import blacklist
from fba import blocks
from fba import boot
from fba import config
if not validators.domain(args.domain):
print(f"WARNING: args.domain='{args.domain}' is not valid")
status = 100
- elif fba.is_blacklisted(args.domain):
+ elif blacklist.is_blacklisted(args.domain):
print(f"WARNING: args.domain='{args.domain}' is blacklisted")
status = 101
elif fba.is_instance_registered(args.domain):
elif not validators.domain(entry["domain"]):
print(f"WARNING: domain='{entry['domain']}' is not a valid domain - SKIPPED!")
continue
- elif fba.is_blacklisted(entry["domain"]):
+ elif blacklist.is_blacklisted(entry["domain"]):
# DEBUG: print(f"DEBUG: domain='{entry['domain']}' is blacklisted - SKIPPED!")
continue
elif fba.is_instance_registered(entry["domain"]):
if not validators.domain(args.domain):
print(f"WARNING: domain='{args.domain}' is not valid.")
return
- elif fba.is_blacklisted(args.domain):
+ elif blacklist.is_blacklisted(args.domain):
print(f"WARNING: domain='{args.domain}' is blacklisted, won't check it!")
return
elif not fba.is_instance_registered(args.domain):
if blocker == "":
print("WARNING: blocker is now empty!")
continue
- elif fba.is_blacklisted(blocker):
+ elif blacklist.is_blacklisted(blocker):
print(f"WARNING: blocker='{blocker}' is blacklisted now!")
continue
# DEBUG: print(f"DEBUG: blocker='{blocker}'")
- fba.update_last_blocked(blocker)
+ instances.update_last_blocked(blocker)
if software == "pleroma":
print(f"INFO: blocker='{blocker}',software='{software}'")
if blocked == "":
print("WARNING: blocked is empty:", blocker)
continue
- elif fba.is_blacklisted(blocked):
+ elif blacklist.is_blacklisted(blocked):
# DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 0:
if blocked == "":
print("WARNING: blocked is empty:", blocker)
continue
- elif fba.is_blacklisted(blocked):
+ elif blacklist.is_blacklisted(blocked):
# DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 0:
# DEBUG: print(f"DEBUG: item={item}")
domain = item.link.split("=")[1]
- if fba.is_blacklisted(domain):
+ if blacklist.is_blacklisted(domain):
# DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
continue
elif domain in domains:
domain = fba.tidyup_domain(href)
# DEBUG: print(f"DEBUG: domain='{domain}'")
- if fba.is_blacklisted(domain):
+ if blacklist.is_blacklisted(domain):
# DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
continue
elif domain in domains:
print(f"INFO: Checking {len(rows)} entries ...")
for row in rows:
# DEBUG: print("DEBUG: domain:", row[0])
- if fba.is_blacklisted(row[0]):
+ if blacklist.is_blacklisted(row[0]):
print("WARNING: domain is blacklisted:", row[0])
continue
from urllib.parse import urlparse
+from fba import blacklist
from fba import cache
from fba import config
from fba import instances
from fba.network import lemmy
from fba.network import misskey
-# Don't check these, known trolls/flooders/testing/developing
-blacklist = [
- # Floods network with fake nodes as "research" project
- "activitypub-troll.cf",
- # Similar troll
- "gab.best",
- # Similar troll
- "4chan.icu",
- # Flooder (?)
- "social.shrimpcam.pw",
- # Flooder (?)
- "mastotroll.netz.org",
- # Testing/developing installations
- "ngrok.io",
- "ngrok-free.app",
- "misskeytest.chn.moe",
-]
-
# Array with pending errors needed to be written to database
pending_errors = {
}
elif not validators.domain(instance.split("/")[0]):
print(f"WARNING: Bad instance='{instance}' from domain='{domain}',origin='{origin}',software='{software}'")
continue
- elif is_blacklisted(instance):
+ elif blacklist.is_blacklisted(instance):
# DEBUG: print("DEBUG: instance is blacklisted:", instance)
continue
peer = tidyup_domain(peer)
# DEBUG: print(f"DEBUG: peer='{peer}' - AFTER!")
- if is_blacklisted(peer):
+ if blacklist.is_blacklisted(peer):
# DEBUG: print(f"DEBUG: peer='{peer}' is blacklisted, skipped!")
continue
# DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
return software
-def is_blacklisted(domain: str) -> bool:
+def blacklist.is_blacklisted(domain: str) -> bool:
+ # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
blacklisted = False
for peer in blacklist:
+ # DEBUG: print(f"DEBUG: Checking peer='{peer}' ...")
if peer in domain:
+ # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted.")
blacklisted = True
+ # DEBUG: print(f"DEBUG: blacklisted='{blacklisted}' - EXIT!")
return blacklisted
def remove_pending_error(domain: str):
return hashlib.sha256(domain.encode("utf-8")).hexdigest()
-def update_last_blocked(domain: str):
- if type(domain) != str:
- raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
- elif domain == "":
- raise ValueError(f"Parameter 'domain' is empty")
-
- # DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
- instances.set("last_blocked", domain, time.time())
-
- # Running pending updated
- # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
- instances.update_instance_data(domain)
-
- # DEBUG: print("DEBUG: EXIT!")
-
def log_error(domain: str, response: requests.models.Response):
# DEBUG: print("DEBUG: domain,response[]:", domain, type(response))
if type(domain) != str:
# DEBUG: print("DEBUG: EXIT!")
-def update_last_instance_fetch(domain: str):
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if type(domain) != str:
- raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
- elif domain == "":
- raise ValueError(f"Parameter 'domain' is empty")
-
- # DEBUG: print("DEBUG: Updating last_instance_fetch for domain:", domain)
- instances.set("last_instance_fetch", domain, time.time())
-
- # Running pending updated
- # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
- instances.update_instance_data(domain)
-
- # DEBUG: print("DEBUG: EXIT!")
-
def update_last_nodeinfo(domain: str):
# DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
if type(domain) != str:
instances.set("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
- update_last_instance_fetch(domain)
+ instances.update_last_instance_fetch(domain)
# DEBUG: print("DEBUG: Returning peers[]:", type(peers))
return peers
raise ValueError(f"Bad domain name='{domain}'")
elif origin is not None and not validators.domain(origin.split("/")[0]):
raise ValueError(f"Bad origin name='{origin}'")
- elif is_blacklisted(domain):
+ elif blacklist.is_blacklisted(domain):
raise Exception(f"domain='{domain}' is blacklisted, but method invoked")
# DEBUG: print("DEBUG: domain,origin,originator,path:", domain, origin, originator, path)
break
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
- update_last_instance_fetch(domain)
+ instances.update_last_instance_fetch(domain)
# DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
return {
# DEBUG: print(f"DEBUG: domain='{domain}',reason='{reason}'")
- if is_blacklisted(domain):
+ if blacklist.is_blacklisted(domain):
print(f"WARNING: domain='{domain}' is blacklisted - skipped!")
continue
elif domain == "gab.com/.ai, develop.gab.com":
sys.exit(255)
# DEBUG: print("DEBUG: EXIT!")
+
+def update_last_instance_fetch(domain: str):
+ # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
+ if type(domain) != str:
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
+ elif domain == "":
+ raise ValueError(f"Parameter 'domain' is empty")
+
+ # DEBUG: print("DEBUG: Updating last_instance_fetch for domain:", domain)
+ set("last_instance_fetch", domain, time.time())
+
+ # Running pending updated
+ # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
+ update_instance_data(domain)
+
+ # DEBUG: print("DEBUG: EXIT!")
+
+def update_last_blocked(domain: str):
+ if type(domain) != str:
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
+ elif domain == "":
+ raise ValueError(f"Parameter 'domain' is empty")
+
+ # DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
+ set("last_blocked", domain, time.time())
+
+ # Running pending updated
+ # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
+ update_instance_data(domain)
+
+ # DEBUG: print("DEBUG: EXIT!")
instances.set("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
- fba.update_last_instance_fetch(domain)
+ instances.update_last_instance_fetch(domain)
# DEBUG: print("DEBUG: Returning peers[]:", type(peers))
return peers
import bs4
import validators
+from fba import blacklist
from fba import blocks
from fba import config
from fba import fba
if blocked == "":
print("WARNING: blocked is empty:", domain)
continue
- elif fba.is_blacklisted(blocked):
+ elif blacklist.is_blacklisted(blocked):
# DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 0:
import json
+from fba import blacklist
from fba import config
from fba import fba
from fba import instances
elif type(row["host"]) != str:
print(f"WARNING: row[host][]={type(row['host'])} is not 'str'")
continue
- elif fba.is_blacklisted(row["host"]):
+ elif blacklist.is_blacklisted(row["host"]):
# DEBUG: print(f"DEBUG: row[host]='{row['host']}' is blacklisted. domain='{domain}'")
continue
elif row["host"] in peers:
instances.set("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
- fba.update_last_instance_fetch(domain)
+ instances.update_last_instance_fetch(domain)
# DEBUG: print("DEBUG: Returning peers[]:", type(peers))
return peers
instances.set("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
- update_last_instance_fetch(domain)
+ instances.update_last_instance_fetch(domain)
# DEBUG: print("DEBUG: Returning peers[]:", type(peers))
return peers
import inspect
import validators
+from fba import blacklist
from fba import blocks
from fba import fba
if blocked == "":
print("WARNING: blocked is empty after fba.tidyup_domain():", domain, block_level)
continue
- elif fba.is_blacklisted(blocked):
+ elif blacklist.is_blacklisted(blocked):
# DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 1:
if blocked == "":
print("WARNING: blocked is empty after fba.tidyup_domain():", domain, block_level)
continue
- elif fba.is_blacklisted(blocked):
+ elif blacklist.is_blacklisted(blocked):
# DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 1: