import csv
import inspect
import json
-import sys
import time
import argparse
import bs4
import markdown
import reqto
-import requests
import validators
from fba import blacklist
-from fba import blocks
from fba import config
from fba import federation
from fba import fba
-from fba import instances
-from fba import locking
from fba import network
+from fba.helpers import locking
from fba.helpers import tidyup
+from fba.models import blocks
+from fba.models import instances
+
from fba.networks import friendica
from fba.networks import mastodon
from fba.networks import misskey
# DEBUG: print(f"DEBUG: status={status} - EXIT!")
return status
-def fetch_bkali(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+def fetch_bkali(args: argparse.Namespace) -> int:
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
domains = list()
try:
fetched = network.post_json_api("gql.api.bka.li", "/v1/graphql", json.dumps({
"query": "query domainlist {nodeinfo(order_by: {domain: asc}) {domain}}"
}))
- # DEBUG: print(f"DEBUG: fetched({len(fetched)})[]='{type(fetched)}'")
- if len(fetched) == 0:
+ # DEBUG: print(f"DEBUG: fetched[]='{type(fetched)}'")
+ if "error_message" in fetched:
+ print(f"WARNING: post_json_api() for 'gql.api.bka.li' returned error message: {fetched['error_message']}")
+ return 100
+ elif isinstance(fetched["json"], dict) and "error" in fetched["json"] and "message" in fetched["json"]["error"]:
+ # The error object was just located inside the decoded body (fetched["json"]), so read the message from there as well
+ print(f"WARNING: post_json_api() returned error: {fetched['json']['error']['message']}")
+ return 101
+
+ rows = fetched["json"]
+
+ # DEBUG: print(f"DEBUG: rows({len(rows)})[]='{type(rows)}'")
+ if len(rows) == 0:
raise Exception("WARNING: Returned no records")
- elif "data" not in fetched:
- raise Exception(f"WARNING: fetched()={len(fetched)} does not contain key 'data'")
- elif "nodeinfo" not in fetched["data"]:
- raise Exception(f"WARNING: fetched()={len(fetched['data'])} does not contain key 'nodeinfo'")
+ elif "data" not in rows:
+ raise Exception(f"WARNING: rows()={len(rows)} does not contain key 'data'")
+ elif "nodeinfo" not in rows["data"]:
+ raise Exception(f"WARNING: rows()={len(rows['data'])} does not contain key 'nodeinfo'")
- for entry in fetched["data"]["nodeinfo"]:
+ for entry in rows["data"]["nodeinfo"]:
# DEBUG: print(f"DEBUG: entry['{type(entry)}']='{entry}'")
if not "domain" in entry:
print(f"WARNING: entry()={len(entry)} does not contain 'domain' - SKIPPED!")
elif instances.is_registered(entry["domain"]):
# DEBUG: print(f"DEBUG: domain='{entry['domain']}' is already registered - SKIPPED!")
continue
+ elif instances.is_recent(entry["domain"]):
+ # DEBUG: print(f"DEBUG: domain='{entry['domain']}' has been recently fetched - SKIPPED!")
+ continue
# DEBUG: print(f"DEBUG: Adding domain='{entry['domain']}' ...")
domains.append(entry["domain"])
- except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, UnicodeEncodeError) as exception:
- print(f"ERROR: Cannot fetch graphql,exception[{type(exception)}]:'{str(exception)}'")
- sys.exit(255)
+ except network.exceptions as exception:
+ print(f"ERROR: Cannot fetch graphql,exception[{type(exception)}]:'{str(exception)}' - EXIT!")
+ return 102
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
try:
print(f"INFO: Fetching instances from domain='{domain}' ...")
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
- except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, UnicodeEncodeError) as exception:
- print(f"WARNING: Exception '{type(exception)}' during fetching instances from domain='{domain}'")
- instances.update_last_error(domain, exception)
+ except network.exceptions as exception:
+ print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_bkali) from domain='{domain}'")
+ instances.set_last_error(domain, exception)
- # DEBUG: print("DEBUG: EXIT!")
+ # DEBUG: print("DEBUG: Success - EXIT!")
+ return 0
def fetch_blocks(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
if args.domain is not None and args.domain != "":
# DEBUG: print(f"DEBUG: args.domain='{args.domain}' - checking ...")
if not validators.domain(args.domain):
if args.domain is not None and args.domain != "":
# Re-check single domain
+ # DEBUG: print(f"DEBUG: Querying database for single args.domain='{args.domain}' ...")
fba.cursor.execute(
- "SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'bookwyrm', 'takahe') AND domain = ?", [args.domain]
+ "SELECT domain, software, origin, nodeinfo_url FROM instances WHERE domain = ?", [args.domain]
)
else:
# Re-check after "timeout" (aka. minimum interval)
continue
# DEBUG: print(f"DEBUG: blocker='{blocker}'")
- instances.update_last_blocked(blocker)
+ instances.set_last_blocked(blocker)
if software == "pleroma":
print(f"INFO: blocker='{blocker}',software='{software}'")
continue
elif blocked.count("*") > 0:
# Some friendica servers also obscure domains without hash
- fba.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
- )
+ row = instances.deobscure("*", blocked)
- searchres = fba.cursor.fetchone()
-
- if searchres is None:
- print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
+ # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+ if row is None:
+ print(f"WARNING: Cannot deobsfucate blocked='{blocked}',blocker='{blocker}',software='{software}' - SKIPPED!")
continue
- blocked = searchres[0]
- origin = searchres[1]
- nodeinfo_url = searchres[2]
+ blocked = row[0]
+ origin = row[1]
+ nodeinfo_url = row[2]
elif blocked.count("?") > 0:
# Some obscure them with question marks, not sure if that's dependent on version or not
- fba.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("?", "_")]
- )
-
- searchres = fba.cursor.fetchone()
+ row = instances.deobscure("?", blocked)
- if searchres is None:
- print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
+ # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+ if row is None:
+ print(f"WARNING: Cannot deobsfucate blocked='{blocked}',blocker='{blocker}',software='{software}' - SKIPPED!")
continue
- blocked = searchres[0]
- origin = searchres[1]
- nodeinfo_url = searchres[2]
- elif not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
- continue
+ blocked = row[0]
+ origin = row[1]
+ nodeinfo_url = row[2]
# DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
if not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+ print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - SKIPPED!")
+ continue
+ elif blocked.endswith(".arpa"):
+ # DEBUG: print(f"DEBUG: blocked='{blocked}' is ending with '.arpa' - SKIPPED!")
continue
elif not instances.is_registered(blocked):
# DEBUG: print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
- instances.add(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
+ try:
+ instances.add(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
+ except network.exceptions as exception:
+ print(f"Exception during adding blocked='{blocked}',blocker='{blocker}': '{type(exception)}'")
+ continue
if not blocks.is_instance_blocked(blocker, blocked, block_level):
blocks.add_instance(blocker, blocked, reason, block_level)
else:
print("WARNING: Unknown software:", blocker, software)
+ if instances.has_pending(blocker):
+ # DEBUG: print(f"DEBUG: Invoking instances.update_data({blocker}) ...")
+ instances.update_data(blocker)
+
if config.get("bot_enabled") and len(blockdict) > 0:
network.send_bot_post(blocker, blockdict)
# DEBUG: print("DEBUG: EXIT!")
def fetch_cs(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
extensions = [
'extra',
'abbr',
}
raw = fba.fetch_url("https://raw.githubusercontent.com/chaossocial/meta/master/federation.md", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
- # DEBUG: print(f"DEBUG: raw()={len(raw)}[]={type(raw)}")
+ # DEBUG: print(f"DEBUG: raw()={len(raw)}[]='{type(raw)}'")
doc = bs4.BeautifulSoup(markdown.markdown(raw, extensions=extensions), features='html.parser')
- # DEBUG: print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")
+ # DEBUG: print(f"DEBUG: doc()={len(doc)}[]='{type(doc)}'")
silenced = doc.find("h2", {"id": "silenced-instances"}).findNext("table").find("tbody")
- # DEBUG: print(f"DEBUG: silenced[]={type(silenced)}")
+ # DEBUG: print(f"DEBUG: silenced[]='{type(silenced)}'")
domains["silenced"] = domains["silenced"] + federation.find_domains(silenced)
blocked = doc.find("h2", {"id": "blocked-instances"}).findNext("table").find("tbody")
- # DEBUG: print(f"DEBUG: blocked[]={type(blocked)}")
+ # DEBUG: print(f"DEBUG: blocked[]='{type(blocked)}'")
domains["reject"] = domains["reject"] + federation.find_domains(blocked)
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
try:
print(f"INFO: Fetching instances from domain='{row['domain']}' ...")
federation.fetch_instances(row["domain"], 'chaos.social', None, inspect.currentframe().f_code.co_name)
- except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, UnicodeEncodeError) as exception:
- print(f"WARNING: Exception '{type(exception)}' during fetching instances from domain='{row['domain']}'")
- instances.update_last_error(row["domain"], exception)
+ except network.exceptions as exception:
+ print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_cs) from domain='{row['domain']}'")
+ instances.set_last_error(row["domain"], exception)
# DEBUG: print("DEBUG: Committing changes ...")
fba.connection.commit()
# DEBUG: print("DEBUG: EXIT!")
def fetch_fba_rss(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
domains = list()
print(f"INFO: Fetch FBA-specific RSS args.feed='{args.feed}' ...")
# DEBUG: print(f"DEBUG: Parsing RSS feed ({len(response.text)} Bytes) ...")
rss = atoma.parse_rss_bytes(response.content)
- # DEBUG: print(f"DEBUG: rss[]={type(rss)}")
+ # DEBUG: print(f"DEBUG: rss[]='{type(rss)}'")
for item in rss.items:
# DEBUG: print(f"DEBUG: item={item}")
domain = item.link.split("=")[1]
try:
print(f"INFO: Fetching instances from domain='{domain}' ...")
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
- except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, UnicodeEncodeError) as exception:
- print(f"WARNING: Exception '{type(exception)}' during fetching instances from domain='{domain}'")
- instances.update_last_error(domain, exception)
+ except network.exceptions as exception:
+ print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_fba_rss) from domain='{domain}'")
+ instances.set_last_error(domain, exception)
# DEBUG: print("DEBUG: EXIT!")
def fetch_fbabot_atom(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
feed = "https://ryona.agency/users/fba/feed.atom"
domains = list()
# DEBUG: print(f"DEBUG: Parsing ATOM feed ({len(response.text)} Bytes) ...")
atom = atoma.parse_atom_bytes(response.content)
- # DEBUG: print(f"DEBUG: atom[]={type(atom)}")
+ # DEBUG: print(f"DEBUG: atom[]='{type(atom)}'")
for entry in atom.entries:
- # DEBUG: print(f"DEBUG: entry[]={type(entry)}")
+ # DEBUG: print(f"DEBUG: entry[]='{type(entry)}'")
doc = bs4.BeautifulSoup(entry.content.value, "html.parser")
- # DEBUG: print(f"DEBUG: doc[]={type(doc)}")
+ # DEBUG: print(f"DEBUG: doc[]='{type(doc)}'")
for element in doc.findAll("a"):
for href in element["href"].split(","):
# DEBUG: print(f"DEBUG: href[{type(href)}]={href}")
try:
print(f"INFO: Fetching instances from domain='{domain}' ...")
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
- except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, UnicodeEncodeError) as exception:
- print(f"WARNING: Exception '{type(exception)}' during fetching instances from domain='{domain}'")
- instances.update_last_error(domain, exception)
+ except network.exceptions as exception:
+ print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_fbabot_atom) from domain='{domain}'")
+ instances.set_last_error(domain, exception)
# DEBUG: print("DEBUG: EXIT!")
-def fetch_instances(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+def fetch_instances(args: argparse.Namespace) -> int:
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
locking.acquire()
# Initial fetch
try:
print(f"INFO: Fetching instances from args.domain='{args.domain}' ...")
federation.fetch_instances(args.domain, None, None, inspect.currentframe().f_code.co_name)
- except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, UnicodeEncodeError) as exception:
- print(f"WARNING: Exception '{type(exception)}' during fetching instances from args.domain='{args.domain}'")
- instances.update_last_error(args.domain, exception)
- return
+ except network.exceptions as exception:
+ print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_instances) from args.domain='{args.domain}'")
+ instances.set_last_error(args.domain, exception)
+
+ return 100
if args.single:
# DEBUG: print("DEBUG: Not fetching more instances - EXIT!")
- return
+ return 0
# Loop through some instances
fba.cursor.execute(
try:
print(f"INFO: Fetching instances for instance '{row[0]}' ('{row[2]}') of origin='{row[1]}',nodeinfo_url='{row[3]}'")
federation.fetch_instances(row[0], row[1], row[2], inspect.currentframe().f_code.co_name, row[3])
- except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, UnicodeEncodeError) as exception:
- print(f"WARNING: Exception '{type(exception)}' during fetching instances from domain='{row[0]}'")
- instances.update_last_error(row[0], exception)
+ except network.exceptions as exception:
+ print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_instances) from domain='{row[0]}'")
+ instances.set_last_error(row[0], exception)
+
+ # DEBUG: print("DEBUG: Success - EXIT!")
+ return 0
+
+def fetch_oliphant(args: argparse.Namespace):
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
+ locking.acquire()
+
+ # Base URL
+ base_url = "https://codeberg.org/oliphant/blocklists/raw/branch/main/blocklists"
+
+ # URLs to fetch
+ blocklists = (
+ {
+ "blocker": "artisan.chat",
+ "csv_url": "mastodon/artisan.chat.csv",
+ },{
+ "blocker": "mastodon.art",
+ "csv_url": "mastodon/mastodon.art.csv",
+ },{
+ "blocker": "pleroma.envs.net",
+ "csv_url": "mastodon/pleroma.envs.net.csv",
+ },{
+ "blocker": "oliphant.social",
+ "csv_url": "mastodon/_unified_tier3_blocklist.csv",
+ },{
+ "blocker": "mastodon.online",
+ "csv_url": "mastodon/mastodon.online.csv",
+ },{
+ "blocker": "mastodon.social",
+ "csv_url": "mastodon/mastodon.social.csv",
+ },{
+ "blocker": "mastodon.social",
+ "csv_url": "other/missing-tier0-mastodon.social.csv",
+ },{
+ "blocker": "rage.love",
+ "csv_url": "mastodon/rage.love.csv",
+ },{
+ "blocker": "sunny.garden",
+ "csv_url": "mastodon/sunny.garden.csv",
+ },{
+ "blocker": "solarpunk.moe",
+ "csv_url": "mastodon/solarpunk.moe.csv",
+ },{
+ "blocker": "toot.wales",
+ "csv_url": "mastodon/toot.wales.csv",
+ },{
+ "blocker": "union.place",
+ "csv_url": "mastodon/union.place.csv",
+ }
+ )
+
+ domains = list()
+ for block in blocklists:
+ # Is domain given and not equal blocker?
+ if isinstance(args.domain, str) and args.domain != block["blocker"]:
+ # DEBUG: print(f"DEBUG: Skipping blocker='{block['blocker']}', not matching args.domain='{args.domain}'")
+ continue
+ # NOTE(review): 'domains' is filled further down with the domain column of each CSV row, not with blocker names - confirm this membership test skips what is intended
+ elif args.domain in domains:
+ # DEBUG: print(f"DEBUG: args.domain='{args.domain}' already handled - SKIPPED!")
+ continue
+
+ # Fetch this URL
+ print(f"INFO: Fetching csv_url='{block['csv_url']}' for blocker='{block['blocker']}' ...")
+ response = fba.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+
+ # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
+ # response.content is bytes (it is decoded below), so the emptiness check must compare against b"" - a str "" never equals it
+ if response.ok and response.content != b"":
+ # DEBUG: print(f"DEBUG: Fetched {len(response.content)} Bytes, parsing CSV ...")
+ reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect="unix")
+
+ # DEBUG: print(f"DEBUG: reader[]='{type(reader)}'")
+ for row in reader:
+ domain = None
+ if "#domain" in row:
+ domain = row["#domain"]
+ elif "domain" in row:
+ domain = row["domain"]
+ else:
+ # DEBUG: print(f"DEBUG: row='{row}' does not contain domain column")
+ continue
+
+ # DEBUG: print(f"DEBUG: Marking domain='{domain}' as handled")
+ domains.append(domain)
+
+ # DEBUG: print(f"DEBUG: Processing domain='{domain}' ...")
+ processed = fba.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
+
+ # DEBUG: print(f"DEBUG: processed='{processed}'")
# DEBUG: print("DEBUG: EXIT!")
-def fetch_federater(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+def fetch_txt(args: argparse.Namespace):
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
locking.acquire()
- # Fetch this URL
- response = fba.fetch_url("https://github.com/federater/blocks_recommended/raw/main/federater.csv", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
- if response.ok and response.content != "":
- # DEBUG: print(f"DEBUG: Fetched {len(response.content)} Bytes, parsing CSV ...")
- ## DEBUG: print(f"DEBUG: response.content={response.content}")
- reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect='unix')
- #, fieldnames='domain,severity,reject_media,reject_reports,public_comment,obfuscate'
- # DEBUG: print(f"DEBUG: reader[]={type(reader)}")
- for row in reader:
- if not validators.domain(row["#domain"]):
- print(f"WARNING: domain='{row['#domain']}' is not a valid domain - skipped!")
- continue
- elif blacklist.is_blacklisted(row["#domain"]):
- print(f"WARNING: domain='{row['#domain']}' is blacklisted - skipped!")
- continue
- elif instances.is_registered(row["#domain"]):
- # DEBUG: print(f"DEBUG: domain='{row['#domain']}' is already registered - skipped!")
- continue
+ # Static URLs
+ urls = (
+ "https://seirdy.one/pb/bsl.txt",
+ )
- try:
- print(f"INFO: Fetching instances for instane='{row['#domain']}' ...")
- federation.fetch_instances(row["#domain"], None, None, inspect.currentframe().f_code.co_name)
- except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, UnicodeEncodeError) as exception:
- print(f"WARNING: Exception '{type(exception)}' during fetching instances from domain='{row['#domain']}'")
- instances.update_last_error(row["#domain"], exception)
+ print(f"INFO: Checking {len(urls)} text file(s) ...")
+ for url in urls:
+ # DEBUG: print(f"DEBUG: Fetching url='{url}' ...")
+ response = fba.fetch_url(url, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+
+ # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
+ if response.ok and response.text != "":
+ # DEBUG: print(f"DEBUG: Returned {len(response.text.strip())} Bytes for processing")
+ domains = response.text.split("\n")
+
+ print(f"INFO: Processing {len(domains)} domains ...")
+ for domain in domains:
+ if domain == "":
+ continue
+
+ # DEBUG: print(f"DEBUG: domain='{domain}'")
+ processed = fba.process_domain(domain, 'seirdy.one', inspect.currentframe().f_code.co_name)
+
+ # DEBUG: print(f"DEBUG: processed='{processed}'")
+ if not processed:
+ # DEBUG: print(f"DEBUG: domain='{domain}' was not generically processed - SKIPPED!")
+ continue
# DEBUG: print("DEBUG: EXIT!")