import bs4
import validators
-from fba import blacklist
-from fba import blocks
-from fba import config
from fba import csrf
from fba import fba
-from fba import instances
-from fba import network
+from fba.helpers import blacklist
+from fba.helpers import config
from fba.helpers import tidyup
+from fba.http import network
+
+from fba.models import blocks
+from fba.models import instances
+
language_mapping = {
# English -> English
"Silenced instances" : "Silenced servers",
raise ValueError("Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
+ doc = None
+ for path in ["/about/more", "/about"]:
+ try:
+ # DEBUG: print(f"DEBUG: Fetching path='{path}' from domain='{domain}' ...")
+ doc = bs4.BeautifulSoup(
+ network.fetch_response(
+ domain,
+ path,
+ network.web_headers,
+ (config.get("connection_timeout"), config.get("read_timeout"))
+ ).text,
+ "html.parser",
+ )
+
+ if len(doc.find_all("h3")) > 0:
+ # DEBUG: print(f"DEBUG: path='{path}' had some headlines - BREAK!")
+ break
+
+ except network.exceptions as exception:
+ print(f"ERROR: Cannot fetch from domain='{domain}',exception='{type(exception)}'")
+ instances.set_last_error(domain, exception)
+ break
+
blocklist = {
"Suspended servers": [],
"Filtered media" : [],
"Silenced servers" : [],
}
- try:
- doc = bs4.BeautifulSoup(
- network.fetch_response(
- domain,
- "/about/more",
- network.web_headers,
- (config.get("connection_timeout"), config.get("read_timeout"))
- ).text,
- "html.parser",
- )
- except BaseException as exception:
- print("ERROR: Cannot fetch from domain:", domain, exception)
- instances.update_last_error(domain, exception)
- return {}
+ # DEBUG: print(f"DEBUG: doc[]='{type(doc)}'")
+ if doc is None:
+ print(f"WARNING: Cannot fetch any /about pages for domain='{domain}' - EXIT!")
+ return blocklist
for header in doc.find_all("h3"):
header_text = tidyup.reason(header.text)
if header_text in blocklist or header_text.lower() in blocklist:
# replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
for line in header.find_all_next("table")[0].find_all("tr")[1:]:
- blocklist[header_text].append(
- {
- "domain": tidyup.domain(line.find("span").text),
- "hash" : tidyup.domain(line.find("span")["title"][9:]),
- "reason": tidyup.reason(line.find_all("td")[1].text),
- }
- )
+ blocklist[header_text].append({
+ "domain": tidyup.domain(line.find("span").text),
+ "hash" : tidyup.domain(line.find("span")["title"][9:]),
+ "reason": tidyup.reason(line.find_all("td")[1].text),
+ })
else:
print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
headers = csrf.determine(domain, dict())
except network.exceptions as exception:
print(f"WARNING: Exception '{type(exception)}' during checking CSRF (fetch_blocks,{__name__}) - EXIT!")
+ instances.set_last_error(domain, exception)
return
try:
(config.get("connection_timeout"), config.get("read_timeout"))
)
+ # DEBUG: print(f"DEBUG: data[]='{type(data)}'")
if "error_message" in data:
- print(f"WARNING: Was not able to fetch domain_blocks from domain='{domain}': status_code='{data['status_code']}',error_message='{data['error_message']}'")
- instances.update_last_error(domain, data)
+ # DEBUG: print(f"DEBUG: Was not able to fetch domain_blocks from domain='{domain}': status_code='{data['status_code']}',error_message='{data['error_message']}'")
+ instances.set_last_error(domain, data)
return
elif "json" in data and "error" in data["json"]:
print(f"WARNING: JSON API returned error message: '{data['json']['error']}'")
- instances.update_last_error(domain, data)
+ instances.set_last_error(domain, data)
return
else:
# Getting blocklist
if len(blocklist) > 0:
print(f"INFO: Checking {len(blocklist)} entries from domain='{domain}',software='mastodon' ...")
for block in blocklist:
+ # Check type
+ # DEBUG: print(f"DEBUG: block[]='{type(block)}'")
+ if not isinstance(block, dict):
+ # DEBUG: print(f"DEBUG: block[]='{type(block)}' is not of type 'dict' - SKIPPED!")
+ continue
+
# Map block -> entry
# DEBUG: print(f"DEBUG: block[{type(block)}]='{block}'")
entry = {
"domain": block["domain"],
"hash" : block["digest"],
- "reason": block["comment"]
+ "reason": block["comment"] if "comment" in block else None
}
# DEBUG: print("DEBUG: severity,domain,hash,comment:", block['severity'], block['domain'], block['digest'], block['comment'])
# DEBUG: print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
rows['report_removal'].append(entry)
else:
- print("WARNING: Unknown severity:", block['severity'], block['domain'])
+ print(f"WARNING: Unknown severity='{block['severity']}', domain='{block['domain']}'")
else:
# DEBUG: print(f"DEBUG: domain='{domain}' has returned zero rows, trying /about/more page ...")
rows = fetch_blocks_from_about(domain)
if block_level == "":
print("WARNING: block_level is empty, domain:", domain)
continue
+ elif block_level == "accept":
+ # DEBUG: print(f"DEBUG: domain='{domain}' skipping block_level='accept'")
+ continue
# DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',software='mastodon',block_level='{block_level}' ...")
for block in blocklist:
continue
elif blocked.count("*") > 0:
# Doing the hash search for instance names as well to tidy up DB
- fba.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? LIMIT 1", [blocked_hash]
- )
- searchres = fba.cursor.fetchone()
+ row = instances.deobscure("*", blocked, blocked_hash)
- # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
- if searchres is None:
+ # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+ if row is None:
print(f"WARNING: Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
continue
- # DEBUG: print("DEBUG: Updating domain: ", searchres[0])
- blocked = searchres[0]
- origin = searchres[1]
- nodeinfo_url = searchres[2]
+ # DEBUG: print("DEBUG: Updating domain: ", row[0])
+ blocked = row[0]
+ origin = row[1]
+ nodeinfo_url = row[2]
+ elif blocked.count("?") > 0:
+ # Doing the hash search for instance names as well to tidy up DB
+ row = instances.deobscure("?", blocked, blocked_hash)
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
- if not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='mastodon' is not a valid domain name - skipped!")
- continue
- elif blocked.split(".")[-1] == "arpa":
- print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
+ # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+ if row is None:
+ print(f"WARNING: Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
continue
- elif not instances.is_registered(blocked):
- # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
- elif not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='mastodon' is not a valid domain name - skipped!")
+
+ # DEBUG: print("DEBUG: Updating domain: ", row[0])
+ blocked = row[0]
+ origin = row[1]
+ nodeinfo_url = row[2]
+
+ # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ if not validators.domain(blocked):
+ print(f"WARNING: blocked='{blocked}',software='mastodon' is not a valid domain name - SKIPPED!")
+ continue
+ elif blocked.endswith(".arpa"):
+ print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
+ continue
+ elif blocked.endswith(".tld"):
+ print(f"WARNING: blocked='{blocked}' is a fake domain, please don't crawl them!")
+ continue
+ elif blacklist.is_blacklisted(blocked):
+ # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - SKIPPED!")
continue
+ elif not instances.is_registered(blocked):
+ # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
# DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
if not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='mastodon' is not a valid domain name - skipped!")
+ print(f"WARNING: blocked='{blocked}',software='mastodon' is not a valid domain name - SKIPPED!")
continue
- elif blocked.split(".")[-1] == "arpa":
+ elif blocked.endswith(".arpa"):
print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
continue
+ elif blocked.endswith(".tld"):
+ print(f"WARNING: blocked='{blocked}' is a fake domain, please don't crawl them!")
+ continue
+ elif blacklist.is_blacklisted(blocked):
+ # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - SKIPPED!")
+ continue
elif not instances.is_registered(blocked):
# DEBUG: print("DEBUG: Hash wasn't found, adding:", blocked, domain)
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
- blocking = blocked if blocked.count("*") <= 1 else blocked_hash
- # DEBUG: print(f"DEBUG: blocking='{blocking}',blocked='{blocked}',blocked_hash='{blocked_hash}'")
-
if not blocks.is_instance_blocked(domain, blocked, block_level):
# DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
- blocks.add_instance(domain, blocking, reason, block_level)
+ blocks.add_instance(domain, blocked, reason, block_level)
if block_level == "reject":
found_blocks.append({
"reason" : reason
})
else:
- # DEBUG: print(f"DEBUG: Updating block last seen and reason for domain='{domain}',blocking='{blocking}' ...")
- blocks.update_last_seen(domain, blocking, block_level)
- blocks.update_reason(reason, domain, blocking, block_level)
+ # DEBUG: print(f"DEBUG: Updating block last seen and reason for domain='{domain}',blocked='{blocked}' ...")
+ blocks.update_last_seen(domain, blocked, block_level)
+ blocks.update_reason(reason, domain, blocked, block_level)
# DEBUG: print("DEBUG: Committing changes ...")
fba.connection.commit()
except network.exceptions as exception:
print(f"ERROR: domain='{domain}',software='mastodon',exception[{type(exception)}]:'{str(exception)}'")
+ instances.set_last_error(domain, exception)
# DEBUG: print("DEBUG: EXIT!")