from fba import fba
def check_instance(args: argparse.Namespace) -> int:
- # DEBUG: print(f"DEBUG: args.domain='{args.domain}' - CALLED!")
+ print(f"DEBUG: args.domain='{args.domain}' - CALLED!")
status = 0
if not validators.domain(args.domain):
print(f"WARNING: args.domain='{args.domain}' is not valid")
else:
print(f"INFO: args.domain='{args.domain}' is not known")
- # DEBUG: print(f"DEBUG: status={status} - EXIT!")
+ print(f"DEBUG: status={status} - EXIT!")
return status
def fetch_bkali(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ print(f"DEBUG: args[]={type(args)} - CALLED!")
domains = list()
try:
fetched = fba.post_json_api("gql.api.bka.li", "/v1/graphql", json.dumps({
"query": "query domainlist {nodeinfo(order_by: {domain: asc}) {domain}}"
}))
- # DEBUG: print(f"DEBUG: fetched({len(fetched)})[]='{type(fetched)}'")
+ print(f"DEBUG: fetched({len(fetched)})[]='{type(fetched)}'")
if len(fetched) == 0:
raise Exception("WARNING: Returned no records")
elif not "data" in fetched:
raise Exception(f"WARNING: fetched()={len(fetched['data'])} does not contain element 'nodeinfo'")
for entry in fetched["data"]["nodeinfo"]:
- # DEBUG: print(f"DEBUG: entry['{type(entry)}']='{entry}'")
+ print(f"DEBUG: entry['{type(entry)}']='{entry}'")
if not "domain" in entry:
print(f"WARNING: entry does not contain 'domain' - SKIPPED!")
continue
print(f"WARNING: domain='{entry['domain']}' is not a valid domain - SKIPPED!")
continue
elif fba.is_blacklisted(entry["domain"]):
- # DEBUG: print(f"DEBUG: domain='{entry['domain']}' is blacklisted - SKIPPED!")
+ print(f"DEBUG: domain='{entry['domain']}' is blacklisted - SKIPPED!")
continue
elif fba.is_instance_registered(entry["domain"]):
- # DEBUG: print(f"DEBUG: domain='{entry['domain']}' is already registered - SKIPPED!")
+ print(f"DEBUG: domain='{entry['domain']}' is already registered - SKIPPED!")
continue
- # DEBUG: print(f"DEBUG: Adding domain='{entry['domain']}' ...")
+ print(f"DEBUG: Adding domain='{entry['domain']}' ...")
domains.append(entry["domain"])
except BaseException as e:
print(f"ERROR: Cannot fetch graphql,exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
- # DEBUG: print(f"DEBUG: domains()={len(domains)}")
+ print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
boot.acquire_lock()
print(f"INFO: Fetching instances from domain='{domain}' ...")
fba.fetch_instances(domain, None, None, sys.argv[0])
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def fetch_blocks(args: argparse.Namespace):
print(f"DEBUG: args[]={type(args)} - CALLED!")
rows = fba.cursor.fetchall()
print(f"INFO: Checking {len(rows)} entries ...")
for blocker, software, origin, nodeinfo_url in rows:
- # DEBUG: print("DEBUG: BEFORE blocker,software,origin,nodeinfo_url:", blocker, software, origin, nodeinfo_url)
+ print("DEBUG: BEFORE blocker,software,origin,nodeinfo_url:", blocker, software, origin, nodeinfo_url)
blockdict = []
blocker = fba.tidyup_domain(blocker)
- # DEBUG: print("DEBUG: AFTER blocker,software:", blocker, software)
+ print("DEBUG: AFTER blocker,software:", blocker, software)
if blocker == "":
print("WARNING: blocker is now empty!")
print(f"WARNING: blocker='{blocker}' is blacklisted now!")
continue
- # DEBUG: print(f"DEBUG: blocker='{blocker}'")
+ print(f"DEBUG: blocker='{blocker}'")
fba.update_last_blocked(blocker)
if software == "pleroma":
print(f"WARNING: json()={len(json['metadata'])} does not have key 'federation', blocker='{blocker}'")
continue
- # DEBUG: print("DEBUG: Updating nodeinfo:", blocker)
+ print("DEBUG: Updating nodeinfo:", blocker)
fba.update_last_nodeinfo(blocker)
federation = json["metadata"]["federation"]
if "enabled" in federation:
- # DEBUG: print("DEBUG: Instance has no block list to analyze:", blocker)
+ print("DEBUG: Instance has no block list to analyze:", blocker)
continue
if "mrf_simple" in federation:
{**federation["mrf_simple"],
**{"quarantined_instances": federation["quarantined_instances"]}}
).items():
- # DEBUG: print("DEBUG: block_level, blocks():", block_level, len(blocks))
+ print("DEBUG: block_level, blocks():", block_level, len(blocks))
block_level = fba.tidyup_domain(block_level)
- # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
+ print("DEBUG: BEFORE block_level:", block_level)
if block_level == "":
print("WARNING: block_level is now empty!")
continue
- # DEBUG: print(f"DEBUG: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
+ print(f"DEBUG: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
for blocked in blocks:
- # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
+ print("DEBUG: BEFORE blocked:", blocked)
blocked = fba.tidyup_domain(blocked)
- # DEBUG: print("DEBUG: AFTER blocked:", blocked)
+ print("DEBUG: AFTER blocked:", blocked)
if blocked == "":
print("WARNING: blocked is empty after fba.tidyup_domain():", blocker, block_level)
continue
elif fba.is_blacklisted(blocked):
- # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
+ print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 1:
# -ACK!-oma also started obscuring domains without hash
"SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
)
searchres = fba.cursor.fetchone()
- # DEBUG: print("DEBUG: searchres[]:", type(searchres))
+ print("DEBUG: searchres[]:", type(searchres))
if searchres == None:
print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
blocked = searchres[0]
nodeinfo_url = searchres[1]
- # DEBUG: print("DEBUG: Looked up domain:", blocked)
+ print("DEBUG: Looked up domain:", blocked)
elif not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ print("DEBUG: Looking up instance by domain:", blocked)
if not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
elif not fba.is_instance_registered(blocked):
- # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
if not fba.is_instance_blocked(blocker, blocked, block_level):
- # DEBUG: print("DEBUG: Blocking:", blocker, blocked, block_level)
+ print("DEBUG: Blocking:", blocker, blocked, block_level)
fba.block_instance(blocker, blocked, "unknown", block_level)
if block_level == "reject":
- # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
+ print("DEBUG: Adding to blockdict:", blocked)
blockdict.append(
{
"blocked": blocked,
"reason" : None
})
else:
- # DEBUG: print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
+ print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
fba.update_last_seen(blocker, blocked, block_level)
- # DEBUG: print("DEBUG: Committing changes ...")
+ print("DEBUG: Committing changes ...")
fba.connection.commit()
# Reasons
if "mrf_simple_info" in federation:
- # DEBUG: print("DEBUG: Found mrf_simple_info:", blocker)
+ print("DEBUG: Found mrf_simple_info:", blocker)
for block_level, info in (
{**federation["mrf_simple_info"],
**(federation["quarantined_instances_info"]
if "quarantined_instances_info" in federation
else {})}
).items():
- # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
+ print("DEBUG: block_level, info.items():", block_level, len(info.items()))
block_level = fba.tidyup_domain(block_level)
- # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
+ print("DEBUG: BEFORE block_level:", block_level)
if block_level == "":
print("WARNING: block_level is now empty!")
continue
- # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
+ print(f"DEBUG: Checking {len(info.items())} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
for blocked, reason in info.items():
- # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
+ print("DEBUG: BEFORE blocked:", blocked)
blocked = fba.tidyup_domain(blocked)
- # DEBUG: print("DEBUG: AFTER blocked:", blocked)
+ print("DEBUG: AFTER blocked:", blocked)
if blocked == "":
print("WARNING: blocked is empty after fba.tidyup_domain():", blocker, block_level)
continue
elif fba.is_blacklisted(blocked):
- # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
+ print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 1:
# same domain guess as above, but for reasons field
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ print("DEBUG: Looking up instance by domain:", blocked)
if not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
elif not fba.is_instance_registered(blocked):
- # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
- # DEBUG: print("DEBUG: Updating block reason:", blocker, blocked, reason["reason"])
+ print("DEBUG: Updating block reason:", blocker, blocked, reason["reason"])
fba.update_block_reason(reason["reason"], blocker, blocked, block_level)
for entry in blockdict:
if entry["blocked"] == blocked:
- # DEBUG: print("DEBUG: Updating entry reason:", blocked)
+ print("DEBUG: Updating entry reason:", blocked)
entry["reason"] = reason["reason"]
fba.connection.commit()
}
# handling CSRF, I've saw at least one server requiring it to access the endpoint
- # DEBUG: print("DEBUG: Fetching meta:", blocker)
+ print("DEBUG: Fetching meta:", blocker)
meta = bs4.BeautifulSoup(
fba.get_response(blocker, "/", fba.headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
try:
csrf = meta.find("meta", attrs={"name": "csrf-token"})["content"]
- # DEBUG: print("DEBUG: Adding CSRF token:", blocker, csrf)
+ print("DEBUG: Adding CSRF token:", blocker, csrf)
reqheaders = {**fba.api_headers, **{"X-CSRF-Token": csrf}}
except BaseException as e:
- # DEBUG: print("DEBUG: No CSRF token found, using normal headers:", blocker, e)
+ print("DEBUG: No CSRF token found, using normal headers:", blocker, e)
reqheaders = fba.api_headers
- # DEBUG: print("DEBUG: Querying API domain_blocks:", blocker)
+ print("DEBUG: Querying API domain_blocks:", blocker)
blocks = fba.get_response(blocker, "/api/v1/instance/domain_blocks", reqheaders, (config.get("connection_timeout"), config.get("read_timeout"))).json()
print(f"INFO: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}' ...")
'reason': block['comment']
}
- # DEBUG: print("DEBUG: severity,domain,hash,comment:", block['severity'], block['domain'], block['digest'], block['comment'])
+ print("DEBUG: severity,domain,hash,comment:", block['severity'], block['domain'], block['digest'], block['comment'])
if block['severity'] == 'suspend':
- # DEBUG: print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
+ print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
json['reject'].append(entry)
elif block['severity'] == 'silence':
- # DEBUG: print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
+ print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
json['followers_only'].append(entry)
elif block['severity'] == 'reject_media':
- # DEBUG: print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
+ print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
json['media_removal'].append(entry)
elif block['severity'] == 'reject_reports':
- # DEBUG: print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
+ print(f"DEBUG: Adding entry='{entry}' with severity='{block['severity']}' ...")
json['report_removal'].append(entry)
else:
print("WARNING: Unknown severity:", block['severity'], block['domain'])
except BaseException as e:
- # DEBUG: print(f"DEBUG: Failed, trying mastodon-specific fetches: blocker='{blocker}',exception[{type(e)}]={str(e)}")
+ print(f"DEBUG: Failed, trying mastodon-specific fetches: blocker='{blocker}',exception[{type(e)}]={str(e)}")
json = fba.get_mastodon_blocks(blocker)
print(f"INFO: Checking {len(json.items())} entries from blocker='{blocker}',software='{software}' ...")
for block_level, blocks in json.items():
- # DEBUG: print("DEBUG: blocker,block_level,blocks():", blocker, block_level, len(blocks))
+ print("DEBUG: blocker,block_level,blocks():", blocker, block_level, len(blocks))
block_level = fba.tidyup_domain(block_level)
- # DEBUG: print("DEBUG: AFTER-block_level:", block_level)
+ print("DEBUG: AFTER-block_level:", block_level)
if block_level == "":
print("WARNING: block_level is empty, blocker:", blocker)
continue
- # DEBUG: print(f"DEBUG: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
+ print(f"DEBUG: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
for block in blocks:
blocked, blocked_hash, reason = block.values()
- # DEBUG: print("DEBUG: blocked,hash,reason:", blocked, blocked_hash, reason)
+ print("DEBUG: blocked,hash,reason:", blocked, blocked_hash, reason)
blocked = fba.tidyup_domain(blocked)
- # DEBUG: print("DEBUG: AFTER-blocked:", blocked)
+ print("DEBUG: AFTER-blocked:", blocked)
if blocked == "":
print("WARNING: blocked is empty:", blocker)
continue
elif fba.is_blacklisted(blocked):
- # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
+ print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 0:
# Doing the hash search for instance names as well to tidy up DB
print(f"WARNING: Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
continue
- # DEBUG: print("DEBUG: Updating domain: ", searchres[0])
+ print("DEBUG: Updating domain: ", searchres[0])
blocked = searchres[0]
origin = searchres[1]
nodeinfo_url = searchres[2]
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ print("DEBUG: Looking up instance by domain:", blocked)
if not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
elif not fba.is_instance_registered(blocked):
- # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
elif not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ print("DEBUG: Looking up instance by domain:", blocked)
if not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
elif not fba.is_instance_registered(blocked):
- # DEBUG: print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
+ print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
blocking = blocked if blocked.count("*") <= 1 else blocked_hash
- # DEBUG: print(f"DEBUG: blocking='{blocking}',blocked='{blocked}',blocked_hash='{blocked_hash}'")
+ print(f"DEBUG: blocking='{blocking}',blocked='{blocked}',blocked_hash='{blocked_hash}'")
if not fba.is_instance_blocked(blocker, blocked, block_level):
- # DEBUG: print("DEBUG: Blocking:", blocker, blocked, block_level)
+ print("DEBUG: Blocking:", blocker, blocked, block_level)
fba.block_instance(blocker, blocking, reason, block_level)
if block_level == "reject":
"reason" : reason
})
else:
- # DEBUG: print(f"DEBUG: Updating block last seen and reason for blocker='{blocker}',blocking='{blocking}' ...")
+ print(f"DEBUG: Updating block last seen and reason for blocker='{blocker}',blocking='{blocking}' ...")
fba.update_last_seen(blocker, blocking, block_level)
fba.update_block_reason(reason, blocker, blocking, block_level)
- # DEBUG: print("DEBUG: Committing changes ...")
+ print("DEBUG: Committing changes ...")
fba.connection.commit()
except Exception as e:
print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
print(f"INFO: Checking {len(json.items())} entries from blocker='{blocker}',software='{software}' ...")
for block_level, blocks in json.items():
- # DEBUG: print("DEBUG: blocker,block_level,blocks():", blocker, block_level, len(blocks))
+ print("DEBUG: blocker,block_level,blocks():", blocker, block_level, len(blocks))
block_level = fba.tidyup_domain(block_level)
- # DEBUG: print("DEBUG: AFTER-block_level:", block_level)
+ print("DEBUG: AFTER-block_level:", block_level)
if block_level == "":
print("WARNING: block_level is empty, blocker:", blocker)
continue
- # DEBUG: print(f"DEBUG: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
+ print(f"DEBUG: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
for block in blocks:
blocked, reason = block.values()
- # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
+ print("DEBUG: BEFORE blocked:", blocked)
blocked = fba.tidyup_domain(blocked)
- # DEBUG: print("DEBUG: AFTER blocked:", blocked)
+ print("DEBUG: AFTER blocked:", blocked)
if blocked == "":
print("WARNING: blocked is empty:", blocker)
continue
elif fba.is_blacklisted(blocked):
- # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
+ print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 0:
# Some friendica servers also obscure domains without hash
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ print("DEBUG: Looking up instance by domain:", blocked)
if not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
elif not fba.is_instance_registered(blocked):
- # DEBUG: print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
+ print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
if not fba.is_instance_blocked(blocker, blocked, block_level):
"reason" : reason
})
else:
- # DEBUG: print(f"DEBUG: Updating block last seen and reason for blocker='{blocker}',blocked='{blocked}' ...")
+ print(f"DEBUG: Updating block last seen and reason for blocker='{blocker}',blocked='{blocked}' ...")
fba.update_last_seen(blocker, blocked, block_level)
fba.update_block_reason(reason, blocker, blocked, block_level)
- # DEBUG: print("DEBUG: Committing changes ...")
+ print("DEBUG: Committing changes ...")
fba.connection.commit()
except Exception as e:
print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
print(f"INFO: Checking {len(federation)} entries from blocker='{blocker}',software='{software}' ...")
for peer in federation:
blocked = peer["domain"].lower()
- # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
+ print("DEBUG: BEFORE blocked:", blocked)
blocked = fba.tidyup_domain(blocked)
- # DEBUG: print("DEBUG: AFTER blocked:", blocked)
+ print("DEBUG: AFTER blocked:", blocked)
if blocked == "":
print("WARNING: blocked is empty:", blocker)
continue
elif fba.is_blacklisted(blocked):
- # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
+ print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
elif blocked.count("*") > 0:
# GTS does not have hashes for obscured domains, so we have to guess it
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ print("DEBUG: Looking up instance by domain:", blocked)
if not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
elif not fba.is_instance_registered(blocked):
- # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
if not fba.is_instance_blocked(blocker, blocked, "reject"):
- # DEBUG: print(f"DEBUG: blocker='{blocker}' is blocking '{blocked}' for unknown reason at this point")
+ print(f"DEBUG: blocker='{blocker}' is blocking '{blocked}' for unknown reason at this point")
fba.block_instance(blocker, blocked, "unknown", "reject")
blockdict.append({
"reason" : None
})
else:
- # DEBUG: print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
+ print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
fba.update_last_seen(blocker, blocked, "reject")
if "public_comment" in peer:
- # DEBUG: print("DEBUG: Updating block reason:", blocker, blocked, peer["public_comment"])
+ print("DEBUG: Updating block reason:", blocker, blocked, peer["public_comment"])
fba.update_block_reason(peer["public_comment"], blocker, blocked, "reject")
for entry in blockdict:
if entry["blocked"] == blocked:
- # DEBUG: print(f"DEBUG: Setting block reason for blocked='{blocked}':'{peer['public_comment']}'")
+ print(f"DEBUG: Setting block reason for blocked='{blocked}':'{peer['public_comment']}'")
entry["reason"] = peer["public_comment"]
- # DEBUG: print("DEBUG: Committing changes ...")
+ print("DEBUG: Committing changes ...")
fba.connection.commit()
except Exception as e:
print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
blockdict = []
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def fetch_cs(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ print(f"DEBUG: args[]={type(args)} - CALLED!")
domains = {
"silenced": list(),
"blocked": list(),
try:
doc = bs4.BeautifulSoup(
- reqto.get("https://meta.chaos.social/federation", headers=fba.headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).text,
+ fba.get_response("meta.chaos.social", "/federation", fba.headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
- # DEBUG: print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")
+ print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")
silenced = doc.find("h2", {"id": "silenced-instances"}).findNext("table")
- # DEBUG: print(f"DEBUG: silenced[]={type(silenced)}")
+ print(f"DEBUG: silenced[]={type(silenced)}")
domains["silenced"] = domains["silenced"] + find_domains(silenced)
blocked = doc.find("h2", {"id": "blocked-instances"}).findNext("table")
- # DEBUG: print(f"DEBUG: blocked[]={type(blocked)}")
+ print(f"DEBUG: blocked[]={type(blocked)}")
domains["blocked"] = domains["blocked"] + find_domains(blocked)
except BaseException as e:
print(f"ERROR: Cannot fetch from meta.chaos.social,exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
- # DEBUG: print(f"DEBUG: domains()={len(domains)}")
+ print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
boot.acquire_lock()
print(f"INFO: Adding {len(domains)} new instances ...")
for block_level in domains:
- # DEBUG: print(f"DEBUG: block_level='{block_level}'")
+ print(f"DEBUG: block_level='{block_level}'")
for row in domains[block_level]:
- # DEBUG: print(f"DEBUG: row='{row}'")
+ print(f"DEBUG: row='{row}'")
if not fba.is_instance_registered(row["domain"]):
print(f"INFO: Fetching instances from domain='{row['domain']}' ...")
fba.fetch_instances(row["domain"], None, None, sys.argv[0])
if not fba.is_instance_blocked('chaos.social', row["domain"], block_level):
- # DEBUG: print(f"DEBUG: domain='{row['domain']}',block_level='{block_level}' blocked by chaos.social, adding ...")
+ print(f"DEBUG: domain='{row['domain']}',block_level='{block_level}' blocked by chaos.social, adding ...")
fba.block_instance('chaos.social', row["domain"], row["reason"], block_level)
- # DEBUG: print("DEBUG: Committing changes ...")
+ print("DEBUG: Committing changes ...")
fba.connection.commit()
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def fetch_fba_rss(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ print(f"DEBUG: args[]={type(args)} - CALLED!")
domains = list()
try:
print(f"INFO: Fetch FBA-specific RSS args.feed='{args.feed}' ...")
- response = reqto.get(args.feed, headers=fba.headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
+ response = fba.get_url(args.feed, fba.headers, config.get("connection_timeout"), config.get("read_timeout"))
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
+ print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
if response.ok and response.status_code < 300 and len(response.text) > 0:
- # DEBUG: print(f"DEBUG: Parsing RSS feed ...")
+ print(f"DEBUG: Parsing RSS feed ...")
rss = atoma.parse_rss_bytes(response.content)
- # DEBUG: print(f"DEBUG: rss[]={type(rss)}")
+ print(f"DEBUG: rss[]={type(rss)}")
for item in rss.items:
- # DEBUG: print(f"DEBUG: item={item}")
+ print(f"DEBUG: item={item}")
domain = item.link.split("=")[1]
if fba.is_blacklisted(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
+ print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
continue
elif domain in domains:
- # DEBUG: print(f"DEBUG: domain='{domain}' is already added - SKIPPED!")
+ print(f"DEBUG: domain='{domain}' is already added - SKIPPED!")
continue
elif fba.is_instance_registered(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is already registered - SKIPPED!")
+ print(f"DEBUG: domain='{domain}' is already registered - SKIPPED!")
continue
- # DEBUG: print(f"DEBUG: domain='{domain}'")
+ print(f"DEBUG: domain='{domain}'")
domains.append(domain)
except BaseException as e:
print(f"ERROR: Cannot fetch feed='{feed}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
- # DEBUG: print(f"DEBUG: domains()={len(domains)}")
+ print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
boot.acquire_lock()
print(f"INFO: Fetching instances from domain='{domain}' ...")
fba.fetch_instances(domain, None, None, sys.argv[0])
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def fetch_fbabot_atom(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ print(f"DEBUG: args[]={type(args)} - CALLED!")
feed = "https://ryona.agency/users/fba/feed.atom"
domains = list()
try:
print(f"INFO: Fetching ATOM feed='{feed}' from FBA bot account ...")
- response = reqto.get(feed, headers=fba.headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
+ response = fba.get_url(feed, fba.headers, (config.get("connection_timeout"), config.get("read_timeout")))
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
+ print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
if response.ok and response.status_code < 300 and len(response.text) > 0:
- # DEBUG: print(f"DEBUG: Parsing ATOM feed ...")
+ print(f"DEBUG: Parsing ATOM feed ...")
atom = atoma.parse_atom_bytes(response.content)
- # DEBUG: print(f"DEBUG: atom[]={type(atom)}")
+ print(f"DEBUG: atom[]={type(atom)}")
for entry in atom.entries:
- # DEBUG: print(f"DEBUG: entry[]={type(entry)}")
+ print(f"DEBUG: entry[]={type(entry)}")
doc = bs4.BeautifulSoup(entry.content.value, "html.parser")
- # DEBUG: print(f"DEBUG: doc[]={type(doc)}")
+ print(f"DEBUG: doc[]={type(doc)}")
for element in doc.findAll("a"):
- # DEBUG: print(f"DEBUG: element[{type(element)}]={element}")
+ print(f"DEBUG: element[{type(element)}]={element}")
domain = fba.tidyup_domain(element["href"])
- # DEBUG: print(f"DEBUG: domain='{domain}'")
+ print(f"DEBUG: domain='{domain}'")
if fba.is_blacklisted(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
+ print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
continue
elif domain in domains:
- # DEBUG: print(f"DEBUG: domain='{domain}' is already added - SKIPPED!")
+ print(f"DEBUG: domain='{domain}' is already added - SKIPPED!")
continue
elif fba.is_instance_registered(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is already registered - SKIPPED!")
+ print(f"DEBUG: domain='{domain}' is already registered - SKIPPED!")
continue
- # DEBUG: print(f"DEBUG: domain='{domain}'")
+ print(f"DEBUG: domain='{domain}'")
domains.append(domain)
except BaseException as e:
print(f"ERROR: Cannot fetch feed='{feed}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
- # DEBUG: print(f"DEBUG: domains()={len(domains)}")
+ print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
boot.acquire_lock()
print(f"INFO: Fetching instances from domain='{domain}' ...")
fba.fetch_instances(domain, None, None, sys.argv[0])
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def fetch_instances(args: argparse.Namespace):
- # DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
+ print(f"DEBUG: args[]={type(args)} - CALLED!")
boot.acquire_lock()
# Initial fetch
rows = fba.cursor.fetchall()
print(f"INFO: Checking {len(rows)} entries ...")
for row in rows:
- # DEBUG: print("DEBUG: domain:", row[0])
+ print("DEBUG: domain:", row[0])
if fba.is_blacklisted(row[0]):
print("WARNING: domain is blacklisted:", row[0])
continue
print(f"INFO: Fetching instances for instance '{row[0]}' ('{row[2]}') of origin='{row[1]}',nodeinfo_url='{row[3]}'")
fba.fetch_instances(row[0], row[1], row[2], sys.argv[0], row[3])
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
import sqlite3
import sys
import time
+import urllib
import validators
from fba import cache
return type(var) in {int, str, float, bool} or var == None
def fetch_instances(domain: str, origin: str, software: str, script: str, path: str = None):
- # DEBUG: print(f"DEBUG: domain={domain},origin={origin},software={software},path={path} - CALLED!")
+ print(f"DEBUG: domain={domain},origin={origin},software={software},path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
if not is_instance_registered(domain):
- # DEBUG: print("DEBUG: Adding new domain:", domain, origin)
+ print("DEBUG: Adding new domain:", domain, origin)
add_instance(domain, origin, script, path)
- # DEBUG: print("DEBUG: Fetching instances for domain:", domain, software)
+ print("DEBUG: Fetching instances for domain:", domain, software)
peerlist = get_peers(domain, software)
if (peerlist is None):
print("ERROR: Cannot fetch peers:", domain)
return
elif instances.has_pending_instance_data(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo data, flushing ...")
+ print(f"DEBUG: domain='{domain}' has pending nodeinfo data, flushing ...")
instances.update_instance_data(domain)
print(f"INFO: Checking {len(peerlist)} instances from {domain} ...")
# Skip "None" types as tidup() cannot parse them
continue
- # DEBUG: print(f"DEBUG: instance='{instance}' - BEFORE")
+ print(f"DEBUG: instance='{instance}' - BEFORE")
instance = tidyup_domain(instance)
- # DEBUG: print(f"DEBUG: instance='{instance}' - AFTER")
+ print(f"DEBUG: instance='{instance}' - AFTER")
if instance == "":
print("WARNING: Empty instance after tidyup_domain(), domain:", domain)
print(f"WARNING: Bad instance='{instance}' from domain='{domain}',origin='{origin}',software='{software}'")
continue
elif is_blacklisted(instance):
- # DEBUG: print("DEBUG: instance is blacklisted:", instance)
+ print("DEBUG: instance is blacklisted:", instance)
continue
- # DEBUG: print("DEBUG: Handling instance:", instance)
+ print("DEBUG: Handling instance:", instance)
try:
if not is_instance_registered(instance):
- # DEBUG: print("DEBUG: Adding new instance:", instance, domain)
+ print("DEBUG: Adding new instance:", instance, domain)
add_instance(instance, domain, sys.argv[0])
except BaseException as e:
print(f"ERROR: instance='{instance}',exception[{type(e)}]:'{str(e)}'")
continue
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def add_peers(rows: dict) -> list:
- # DEBUG: print(f"DEBUG: rows()={len(rows)} - CALLED!")
+ print(f"DEBUG: rows()={len(rows)} - CALLED!")
peers = list()
for element in ["linked", "allowed", "blocked"]:
- # DEBUG: print(f"DEBUG: Checking element='{element}'")
+ print(f"DEBUG: Checking element='{element}'")
if element in rows and rows[element] != None:
- # DEBUG: print(f"DEBUG: Adding {len(rows[element])} peer(s) to peers list ...")
+ print(f"DEBUG: Adding {len(rows[element])} peer(s) to peers list ...")
for peer in rows[element]:
- # DEBUG: print(f"DEBUG: peer='{peer}' - BEFORE!")
+ print(f"DEBUG: peer='{peer}' - BEFORE!")
peer = tidyup_domain(peer)
- # DEBUG: print(f"DEBUG: peer='{peer}' - AFTER!")
+ print(f"DEBUG: peer='{peer}' - AFTER!")
if is_blacklisted(peer):
- # DEBUG: print(f"DEBUG: peer='{peer}' is blacklisted, skipped!")
+ print(f"DEBUG: peer='{peer}' is blacklisted, skipped!")
continue
- # DEBUG: print(f"DEBUG: Adding peer='{peer}' ...")
+ print(f"DEBUG: Adding peer='{peer}' ...")
peers.append(peer)
- # DEBUG: print(f"DEBUG: peers()={len(peers)} - EXIT!")
+ print(f"DEBUG: peers()={len(peers)} - EXIT!")
return peers
def remove_version(software: str) -> str:
- # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
+ print(f"DEBUG: software='{software}' - CALLED!")
if not "." in software and " " not in software:
print(f"WARNING: software='{software}' does not contain a version number.")
return software
elif " - " in software:
temp = software.split(" - ")[0]
- # DEBUG: print(f"DEBUG: software='{software}'")
+ print(f"DEBUG: software='{software}'")
version = None
if " " in software:
version = temp.split(" ")[-1]
elif "-" in software:
version = temp.split("-")[-1]
else:
- # DEBUG: print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'")
+ print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'")
return software
matches = None
match = None
- # DEBUG: print(f"DEBUG: Checking {len(patterns)} patterns ...")
+ print(f"DEBUG: Checking {len(patterns)} patterns ...")
for pattern in patterns:
# Run match()
match = pattern.match(version)
- # DEBUG: print(f"DEBUG: match[]={type(match)}")
+ print(f"DEBUG: match[]={type(match)}")
if type(match) is re.Match:
break
- # DEBUG: print(f"DEBUG: version[{type(version)}]='{version}',match='{match}'")
+ print(f"DEBUG: version[{type(version)}]='{version}',match='{match}'")
if type(match) is not re.Match:
print(f"WARNING: version='{version}' does not match regex, leaving software='{software}' untouched.")
return software
- # DEBUG: print(f"DEBUG: Found valid version number: '{version}', removing it ...")
+ print(f"DEBUG: Found valid version number: '{version}', removing it ...")
end = len(temp) - len(version) - 1
- # DEBUG: print(f"DEBUG: end[{type(end)}]={end}")
+ print(f"DEBUG: end[{type(end)}]={end}")
software = temp[0:end].strip()
if " version" in software:
- # DEBUG: print(f"DEBUG: software='{software}' contains word ' version'")
+ print(f"DEBUG: software='{software}' contains word ' version'")
software = strip_until(software, " version")
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
+ print(f"DEBUG: software='{software}' - EXIT!")
return software
def strip_powered_by(software: str) -> str:
- # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
+ print(f"DEBUG: software='{software}' - CALLED!")
if software == "":
print(f"ERROR: Bad method call, 'software' is empty")
raise Exception("Parameter 'software' is empty")
return software
start = software.find("powered by ")
- # DEBUG: print(f"DEBUG: start[{type(start)}]='{start}'")
+ print(f"DEBUG: start[{type(start)}]='{start}'")
software = software[start + 11:].strip()
- # DEBUG: print(f"DEBUG: software='{software}'")
+ print(f"DEBUG: software='{software}'")
software = strip_until(software, " - ")
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
+ print(f"DEBUG: software='{software}' - EXIT!")
return software
def strip_hosted_on(software: str) -> str:
- # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
+ print(f"DEBUG: software='{software}' - CALLED!")
if software == "":
print(f"ERROR: Bad method call, 'software' is empty")
raise Exception("Parameter 'software' is empty")
return software
end = software.find("hosted on ")
- # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
+ print(f"DEBUG: end[{type(end)}]='{end}'")
software = software[0, start].strip()
- # DEBUG: print(f"DEBUG: software='{software}'")
+ print(f"DEBUG: software='{software}'")
software = strip_until(software, " - ")
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
+ print(f"DEBUG: software='{software}' - EXIT!")
return software
def strip_until(software: str, until: str) -> str:
- # DEBUG: print(f"DEBUG: software='{software}',until='{until}' - CALLED!")
+ print(f"DEBUG: software='{software}',until='{until}' - CALLED!")
if software == "":
print(f"ERROR: Bad method call, 'software' is empty")
raise Exception("Parameter 'software' is empty")
# Next, strip until part
end = software.find(until)
- # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
+ print(f"DEBUG: end[{type(end)}]='{end}'")
if end > 0:
software = software[0:end].strip()
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
+ print(f"DEBUG: software='{software}' - EXIT!")
return software
def is_blacklisted(domain: str) -> bool:
except:
pass
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def get_hash(domain: str) -> str:
if type(domain) != str:
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
+ print("DEBUG: Updating last_blocked for domain", domain)
instances.set("last_blocked", domain, time.time())
# Running pending updated
- # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
+ print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
instances.update_instance_data(domain)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def log_error(domain: str, response: requests.models.Response):
- # DEBUG: print("DEBUG: domain,response[]:", domain, type(response))
+ print("DEBUG: domain,response[]:", domain, type(response))
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
try:
- # DEBUG: print("DEBUG: BEFORE response[]:", type(response))
+ print("DEBUG: BEFORE response[]:", type(response))
if isinstance(response, BaseException) or isinstance(response, json.decoder.JSONDecodeError):
response = str(response)
- # DEBUG: print("DEBUG: AFTER response[]:", type(response))
+ print("DEBUG: AFTER response[]:", type(response))
if type(response) is str:
cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
domain,
])
# Cleanup old entries
- # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
+ print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
except BaseException as e:
print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def update_last_error(domain: str, response: requests.models.Response):
- # DEBUG: print("DEBUG: domain,response[]:", domain, type(response))
+ print("DEBUG: domain,response[]:", domain, type(response))
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print("DEBUG: BEFORE response[]:", type(response))
+ print("DEBUG: BEFORE response[]:", type(response))
if isinstance(response, BaseException) or isinstance(response, json.decoder.JSONDecodeError):
response = f"{type}:str(response)"
- # DEBUG: print("DEBUG: AFTER response[]:", type(response))
+ print("DEBUG: AFTER response[]:", type(response))
if type(response) is str:
- # DEBUG: print(f"DEBUG: Setting last_error_details='{response}'");
+ print(f"DEBUG: Setting last_error_details='{response}'");
instances.set("last_status_code" , domain, 999)
instances.set("last_error_details", domain, response)
else:
- # DEBUG: print(f"DEBUG: Setting last_error_details='{response.reason}'");
+ print(f"DEBUG: Setting last_error_details='{response.reason}'");
instances.set("last_status_code" , domain, response.status_code)
instances.set("last_error_details", domain, response.reason)
# Running pending updated
- # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
+ print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
instances.update_instance_data(domain)
log_error(domain, response)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def update_last_instance_fetch(domain: str):
- # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
+ print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print("DEBUG: Updating last_instance_fetch for domain:", domain)
+ print("DEBUG: Updating last_instance_fetch for domain:", domain)
instances.set("last_instance_fetch", domain, time.time())
# Running pending updated
- # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
+ print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
instances.update_instance_data(domain)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def update_last_nodeinfo(domain: str):
- # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
+ print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
+ print("DEBUG: Updating last_nodeinfo for domain:", domain)
instances.set("last_nodeinfo", domain, time.time())
instances.set("last_updated" , domain, time.time())
# Running pending updated
- # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
+ print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
instances.update_instance_data(domain)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def get_peers(domain: str, software: str) -> list:
- # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},software={software} - CALLED!")
+ print(f"DEBUG: domain({len(domain)})={domain},software={software} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
peers = list()
if software == "misskey":
- # DEBUG: print(f"DEBUG: domain='{domain}' is misskey, sending API POST request ...")
+ print(f"DEBUG: domain='{domain}' is misskey, sending API POST request ...")
offset = 0
step = config.get("misskey_limit")
# instances page-by-page, since that troonware doesn't support
# sending them all at once
while True:
- # DEBUG: print(f"DEBUG: Fetching offset='{offset}' from '{domain}' ...")
+ print(f"DEBUG: Fetching offset='{offset}' from '{domain}' ...")
if offset == 0:
fetched = post_json_api(domain, "/api/federation/instances", json.dumps({
"sort" : "+pubAt",
"Origin": domain
})
- # DEBUG: print(f"DEBUG: fetched()={len(fetched)}")
+ print(f"DEBUG: fetched()={len(fetched)}")
if len(fetched) == 0:
- # DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
+ print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
elif len(fetched) != config.get("misskey_limit"):
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_limit')}'")
+ print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_limit')}'")
offset = offset + (config.get("misskey_limit") - len(fetched))
else:
- # DEBUG: print("DEBUG: Raising offset by step:", step)
+ print("DEBUG: Raising offset by step:", step)
offset = offset + step
# Check records
- # DEBUG: print(f"DEBUG: fetched({len(fetched)})[]={type(fetched)}")
+ print(f"DEBUG: fetched({len(fetched)})[]={type(fetched)}")
if isinstance(fetched, dict) and "error" in fetched and "message" in fetched["error"]:
print(f"WARNING: post_json_api() returned error: {fetched['error']['message']}")
update_last_error(domain, fetched["error"]["message"])
already = 0
for row in fetched:
- # DEBUG: print(f"DEBUG: row():{len(row)}")
+ print(f"DEBUG: row():{len(row)}")
if not "host" in row:
print(f"WARNING: row()={len(row)} does not contain element 'host': {row},domain='{domain}'")
continue
print(f"WARNING: row[host][]={type(row['host'])} is not 'str'")
continue
elif is_blacklisted(row["host"]):
- # DEBUG: print(f"DEBUG: row[host]='{row['host']}' is blacklisted. domain='{domain}'")
+ print(f"DEBUG: row[host]='{row['host']}' is blacklisted. domain='{domain}'")
continue
elif row["host"] in peers:
- # DEBUG: print(f"DEBUG: Not adding row[host]='{row['host']}', already found.")
+ print(f"DEBUG: Not adding row[host]='{row['host']}', already found.")
already = already + 1
continue
- # DEBUG: print(f"DEBUG: Adding peer: '{row['host']}'")
+ print(f"DEBUG: Adding peer: '{row['host']}'")
peers.append(row["host"])
if already == len(fetched):
print(f"WARNING: Host returned same set of '{already}' instances, aborting loop!")
break
- # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
+ print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
instances.set("total_peers", domain, len(peers))
- # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
+ print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
- # DEBUG: print("DEBUG: Returning peers[]:", type(peers))
+ print("DEBUG: Returning peers[]:", type(peers))
return peers
elif software == "lemmy":
- # DEBUG: print(f"DEBUG: domain='{domain}' is Lemmy, fetching JSON ...")
+ print(f"DEBUG: domain='{domain}' is Lemmy, fetching JSON ...")
try:
response = get_response(domain, "/api/v3/site", api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',data[]='{type(data)}'")
+ print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',data[]='{type(data)}'")
if not response.ok or response.status_code >= 400:
print("WARNING: Could not reach any JSON API:", domain)
update_last_error(domain, response)
elif response.ok and isinstance(data, list):
- # DEBUG: print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
+ print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
sys.exit(255)
elif "federated_instances" in data:
- # DEBUG: print(f"DEBUG: Found federated_instances for domain='{domain}'")
+ print(f"DEBUG: Found federated_instances for domain='{domain}'")
peers = peers + add_peers(data["federated_instances"])
- # DEBUG: print("DEBUG: Added instance(s) to peers")
+ print("DEBUG: Added instance(s) to peers")
else:
print("WARNING: JSON response does not contain 'federated_instances':", domain)
update_last_error(domain, response)
except BaseException as e:
print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
- # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
+ print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
instances.set("total_peers", domain, len(peers))
- # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
+ print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
- # DEBUG: print("DEBUG: Returning peers[]:", type(peers))
+ print("DEBUG: Returning peers[]:", type(peers))
return peers
elif software == "peertube":
- # DEBUG: print(f"DEBUG: domain='{domain}' is a PeerTube, fetching JSON ...")
+ print(f"DEBUG: domain='{domain}' is a PeerTube, fetching JSON ...")
start = 0
for mode in ["followers", "following"]:
- # DEBUG: print(f"DEBUG: domain='{domain}',mode='{mode}'")
+ print(f"DEBUG: domain='{domain}',mode='{mode}'")
while True:
try:
response = get_response(domain, "/api/v1/server/{mode}?start={start}&count=100", headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',data[]='{type(data)}'")
+ print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',data[]='{type(data)}'")
if response.ok and isinstance(data, dict):
- # DEBUG: print("DEBUG: Success, data:", len(data))
+ print("DEBUG: Success, data:", len(data))
if "data" in data:
- # DEBUG: print(f"DEBUG: Found {len(data['data'])} record(s).")
+ print(f"DEBUG: Found {len(data['data'])} record(s).")
for record in data["data"]:
- # DEBUG: print(f"DEBUG: record()={len(record)}")
+ print(f"DEBUG: record()={len(record)}")
if mode in record and "host" in record[mode]:
- # DEBUG: print(f"DEBUG: Found host={record[mode]['host']}, adding ...")
+ print(f"DEBUG: Found host={record[mode]['host']}, adding ...")
peers.append(record[mode]["host"])
else:
print(f"WARNING: record from '{domain}' has no '{mode}' or 'host' record: {record}")
if len(data["data"]) < 100:
- # DEBUG: print("DEBUG: Reached end of JSON response:", domain)
+ print("DEBUG: Reached end of JSON response:", domain)
break
# Continue with next row
except BaseException as e:
print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
- # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
+ print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
instances.set("total_peers", domain, len(peers))
- # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
+ print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
- # DEBUG: print("DEBUG: Returning peers[]:", type(peers))
+ print("DEBUG: Returning peers[]:", type(peers))
return peers
- # DEBUG: print(f"DEBUG: Fetching get_peers_url='{get_peers_url}' from '{domain}' ...")
+ print(f"DEBUG: Fetching get_peers_url='{get_peers_url}' from '{domain}' ...")
try:
response = get_response(domain, get_peers_url, api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
if not response.ok or response.status_code >= 400:
- # DEBUG: print(f"DEBUG: Was not able to fetch '{get_peers_url}', trying alternative ...")
+ print(f"DEBUG: Was not able to fetch '{get_peers_url}', trying alternative ...")
response = get_response(domain, "/api/v3/site", api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
if not response.ok or response.status_code >= 400:
print("WARNING: Could not reach any JSON API:", domain)
update_last_error(domain, response)
elif response.ok and isinstance(data, list):
- # DEBUG: print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
+ print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
sys.exit(255)
elif "federated_instances" in data:
- # DEBUG: print(f"DEBUG: Found federated_instances for domain='{domain}'")
+ print(f"DEBUG: Found federated_instances for domain='{domain}'")
peers = peers + add_peers(data["federated_instances"])
- # DEBUG: print("DEBUG: Added instance(s) to peers")
+ print("DEBUG: Added instance(s) to peers")
else:
print("WARNING: JSON response does not contain 'federated_instances':", domain)
update_last_error(domain, response)
else:
- # DEBUG: print("DEBUG: Querying API was successful:", domain, len(data))
+ print("DEBUG: Querying API was successful:", domain, len(data))
peers = data
except BaseException as e:
print("WARNING: Some error during get():", domain, e)
update_last_error(domain, e)
- # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
+ print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
instances.set("total_peers", domain, len(peers))
- # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
+ print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
- # DEBUG: print("DEBUG: Returning peers[]:", type(peers))
+ print("DEBUG: Returning peers[]:", type(peers))
return peers
def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict:
elif type(parameter) != str:
raise ValueError(f"parameter[]={type(parameter)} is not 'str'")
- # DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
+ print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
data = {}
try:
- response = reqto.post(f"https://{domain}{path}", data=parameter, headers={**api_headers, **extra_headers}, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
+ response = reqto.post(
+ f"https://{domain}{path}",
+ data=parameter,
+ headers={**api_headers, **extra_headers},
+ timeout=(config.get("connection_timeout"), config.get("read_timeout"))
+ )
data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
if not response.ok or response.status_code >= 400:
print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
update_last_error(domain, response)
except BaseException as e:
print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exception[{type(e)}]:'{str(e)}'")
- # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
+ print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
return data
def fetch_nodeinfo(domain: str, path: str = None) -> list:
- # DEBUG: print(f"DEBUG: domain='{domain}',path={path} - CALLED!")
+ print(f"DEBUG: domain='{domain}',path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
elif type(path) != str and path != None:
raise ValueError(f"Parameter path[]={type(path)} is not 'str'")
- # DEBUG: print("DEBUG: Fetching nodeinfo from domain,path:", domain, path)
+ print(f"DEBUG: Fetching nodeinfo from domain='{domain}' ...")
nodeinfo = fetch_wellknown_nodeinfo(domain)
- # DEBUG: print(f"DEBUG: nodeinfo({len(nodeinfo)})={nodeinfo}")
+ print(f"DEBUG: nodeinfo({len(nodeinfo)})={nodeinfo}")
if len(nodeinfo) > 0:
- # DEBUG: print("DEBUG: nodeinfo()={len(nodeinfo))} - EXIT!")
+ print("DEBUG: nodeinfo()={len(nodeinfo))} - EXIT!")
return nodeinfo
- requests = [
- f"https://{domain}/nodeinfo/2.1.json",
- f"https://{domain}/nodeinfo/2.1",
- f"https://{domain}/nodeinfo/2.0.json",
- f"https://{domain}/nodeinfo/2.0",
- f"https://{domain}/nodeinfo/1.0",
- f"https://{domain}/api/v1/instance"
+ request_paths = [
+ "/nodeinfo/2.1.json",
+ "/nodeinfo/2.1",
+ "/nodeinfo/2.0.json",
+ "/nodeinfo/2.0",
+ "/nodeinfo/1.0",
+ "/api/v1/instance"
]
data = {}
- for request in requests:
- if path != None and path != "" and request != path:
- # DEBUG: print(f"DEBUG: path='{path}' does not match request='{request}' - SKIPPED!")
+ for request in request_paths:
+ if path != None and path != "" and path != f"https://{domain}{path}":
+ print(f"DEBUG: path='{path}' does not match request='{request}' - SKIPPED!")
continue
try:
- # DEBUG: print("DEBUG: Fetching request:", request)
- response = reqto.get(request, headers=api_headers, timeout=(config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
+ print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...")
+ response = get_response(domain, request, api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
if response.ok and isinstance(data, dict):
- # DEBUG: print("DEBUG: Success:", request)
+ print("DEBUG: Success:", request)
instances.set("detection_mode", domain, "STATIC_CHECK")
instances.set("nodeinfo_url" , domain, request)
break
elif response.ok and isinstance(data, list):
- # DEBUG: print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
+ print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
sys.exit(255)
elif not response.ok or response.status_code >= 400:
print("WARNING: Failed fetching nodeinfo from domain:", domain)
continue
except BaseException as e:
- # DEBUG: print("DEBUG: Cannot fetch API request:", request)
+ print("DEBUG: Cannot fetch API request:", request)
update_last_error(domain, e)
pass
- # DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!")
+ print(f"DEBUG: data()={len(data)} - EXIT!")
return data
def fetch_wellknown_nodeinfo(domain: str) -> list:
- # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
+ print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain)
+ print("DEBUG: Fetching .well-known info for domain:", domain)
data = {}
try:
response = get_response(domain, "/.well-known/nodeinfo", api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
data = json_from_response(response)
- # DEBUG: print("DEBUG: domain,response.ok,data[]:", domain, response.ok, type(data))
+ print("DEBUG: domain,response.ok,data[]:", domain, response.ok, type(data))
if response.ok and isinstance(data, dict):
nodeinfo = data
- # DEBUG: print("DEBUG: Found entries:", len(nodeinfo), domain)
+ print("DEBUG: Found entries:", len(nodeinfo), domain)
if "links" in nodeinfo:
- # DEBUG: print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"]))
+ print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"]))
for link in nodeinfo["links"]:
- # DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"])
+ print("DEBUG: rel,href:", link["rel"], link["href"])
if link["rel"] in nodeinfo_identifier:
- # DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"])
- response = reqto.get(link["href"])
+ print("DEBUG: Fetching nodeinfo from:", link["href"])
+ response = get_url(link["href"])
data = json_from_response(response)
- # DEBUG: print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code)
+ print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code)
if response.ok and isinstance(data, dict):
- # DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data))
+ print("DEBUG: Found JSON nodeinfo():", len(data))
instances.set("detection_mode", domain, "AUTO_DISCOVERY")
instances.set("nodeinfo_url" , domain, link["href"])
break
update_last_error(domain, e)
pass
- # DEBUG: print("DEBUG: Returning data[]:", type(data))
+ print("DEBUG: Returning data[]:", type(data))
return data
def fetch_generator_from_path(domain: str, path: str = "/") -> str:
- # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!")
+ print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
elif path == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!")
+ print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!")
software = None
try:
- # DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
+ print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
response = get_response(domain, path, headers, (config.get("connection_timeout"), config.get("read_timeout")))
- # DEBUG: print("DEBUG: domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text))
+ print("DEBUG: domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text))
if response.ok and response.status_code < 300 and len(response.text) > 0:
- # DEBUG: print("DEBUG: Search for <meta name='generator'>:", domain)
+ print("DEBUG: Search for <meta name='generator'>:", domain)
doc = bs4.BeautifulSoup(response.text, "html.parser")
- # DEBUG: print("DEBUG: doc[]:", type(doc))
+ print("DEBUG: doc[]:", type(doc))
generator = doc.find("meta", {"name": "generator"})
site_name = doc.find("meta", {"property": "og:site_name"})
- # DEBUG: print(f"DEBUG: generator='{generator}',site_name='{site_name}'")
+ print(f"DEBUG: generator='{generator}',site_name='{site_name}'")
if isinstance(generator, bs4.element.Tag):
- # DEBUG: print("DEBUG: Found generator meta tag:", domain)
+ print("DEBUG: Found generator meta tag:", domain)
software = tidyup_domain(generator.get("content"))
print(f"INFO: domain='{domain}' is generated by '{software}'")
instances.set("detection_mode", domain, "GENERATOR")
remove_pending_error(domain)
elif isinstance(site_name, bs4.element.Tag):
- # DEBUG: print("DEBUG: Found property=og:site_name:", domain)
+ print("DEBUG: Found property=og:site_name:", domain)
sofware = tidyup_domain(site_name.get("content"))
print(f"INFO: domain='{domain}' has og:site_name='{software}'")
instances.set("detection_mode", domain, "SITE_NAME")
remove_pending_error(domain)
except BaseException as e:
- # DEBUG: print(f"DEBUG: Cannot fetch / from '{domain}':", e)
+ print(f"DEBUG: Cannot fetch / from '{domain}':", e)
update_last_error(domain, e)
pass
- # DEBUG: print(f"DEBUG: software[]={type(software)}")
+ print(f"DEBUG: software[]={type(software)}")
if type(software) is str and software == "":
- # DEBUG: print(f"DEBUG: Corrected empty string to None for software of domain='{domain}'")
+ print(f"DEBUG: Corrected empty string to None for software of domain='{domain}'")
software = None
elif type(software) is str and ("." in software or " " in software):
- # DEBUG: print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...")
+ print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...")
software = remove_version(software)
- # DEBUG: print(f"DEBUG: software[]={type(software)}")
+ print(f"DEBUG: software[]={type(software)}")
if type(software) is str and " powered by " in software:
- # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it")
+ print(f"DEBUG: software='{software}' has 'powered by' in it")
software = remove_version(strip_powered_by(software))
elif type(software) is str and " hosted on " in software:
- # DEBUG: print(f"DEBUG: software='{software}' has 'hosted on' in it")
+ print(f"DEBUG: software='{software}' has 'hosted on' in it")
software = remove_version(strip_hosted_on(software))
elif type(software) is str and " by " in software:
- # DEBUG: print(f"DEBUG: software='{software}' has ' by ' in it")
+ print(f"DEBUG: software='{software}' has ' by ' in it")
software = strip_until(software, " by ")
elif type(software) is str and " see " in software:
- # DEBUG: print(f"DEBUG: software='{software}' has ' see ' in it")
+ print(f"DEBUG: software='{software}' has ' see ' in it")
software = strip_until(software, " see ")
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
+ print(f"DEBUG: software='{software}' - EXIT!")
return software
def determine_software(domain: str, path: str = None) -> str:
- # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!")
+ print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
elif type(path) != str and path != None:
raise ValueError(f"Parameter path[]={type(path)} is not 'str'")
- # DEBUG: print("DEBUG: Determining software for domain,path:", domain, path)
+ print("DEBUG: Determining software for domain,path:", domain, path)
software = None
- # DEBUG: print(f"DEBUG: Fetching nodeinfo from '{domain}' ...")
+ print(f"DEBUG: Fetching nodeinfo from '{domain}' ...")
data = fetch_nodeinfo(domain, path)
- # DEBUG: print("DEBUG: data[]:", type(data))
+ print("DEBUG: data[]:", type(data))
if not isinstance(data, dict) or len(data) == 0:
- # DEBUG: print("DEBUG: Could not determine software type:", domain)
+ print("DEBUG: Could not determine software type:", domain)
return fetch_generator_from_path(domain)
- # DEBUG: print("DEBUG: data():", len(data), data)
+ print("DEBUG: data():", len(data), data)
if "status" in data and data["status"] == "error" and "message" in data:
print("WARNING: JSON response is an error:", data["message"])
update_last_error(domain, data["message"])
update_last_error(domain, data["message"])
return fetch_generator_from_path(domain)
elif "software" not in data or "name" not in data["software"]:
- # DEBUG: print(f"DEBUG: JSON response from domain='{domain}' does not include [software][name], fetching / ...")
+ print(f"DEBUG: JSON response from domain='{domain}' does not include [software][name], fetching / ...")
software = fetch_generator_from_path(domain)
- # DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!")
+ print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!")
return software
software = tidyup_domain(data["software"]["name"])
- # DEBUG: print("DEBUG: sofware after tidyup_domain():", software)
+ print("DEBUG: sofware after tidyup_domain():", software)
if software in ["akkoma", "rebased"]:
- # DEBUG: print("DEBUG: Setting pleroma:", domain, software)
+ print("DEBUG: Setting pleroma:", domain, software)
software = "pleroma"
elif software in ["hometown", "ecko"]:
- # DEBUG: print("DEBUG: Setting mastodon:", domain, software)
+ print("DEBUG: Setting mastodon:", domain, software)
software = "mastodon"
elif software in ["calckey", "groundpolis", "foundkey", "cherrypick", "meisskey"]:
- # DEBUG: print("DEBUG: Setting misskey:", domain, software)
+ print("DEBUG: Setting misskey:", domain, software)
software = "misskey"
elif software.find("/") > 0:
print("WARNING: Spliting of slash:", software)
print("WARNING: Spliting of pipe:", software)
software = tidyup_domain(software.split("|")[0]);
elif "powered by" in software:
- # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it")
+ print(f"DEBUG: software='{software}' has 'powered by' in it")
software = strip_powered_by(software)
elif type(software) is str and " by " in software:
- # DEBUG: print(f"DEBUG: software='{software}' has ' by ' in it")
+ print(f"DEBUG: software='{software}' has ' by ' in it")
software = strip_until(software, " by ")
elif type(software) is str and " see " in software:
- # DEBUG: print(f"DEBUG: software='{software}' has ' see ' in it")
+ print(f"DEBUG: software='{software}' has ' see ' in it")
software = strip_until(software, " see ")
- # DEBUG: print(f"DEBUG: software[]={type(software)}")
+ print(f"DEBUG: software[]={type(software)}")
if software == "":
print("WARNING: tidyup_domain() left no software name behind:", domain)
software = None
- # DEBUG: print(f"DEBUG: software[]={type(software)}")
+ print(f"DEBUG: software[]={type(software)}")
if str(software) == "":
- # DEBUG: print(f"DEBUG: software for '{domain}' was not detected, trying generator ...")
+ print(f"DEBUG: software for '{domain}' was not detected, trying generator ...")
software = fetch_generator_from_path(domain)
elif len(str(software)) > 0 and ("." in software or " " in software):
- # DEBUG: print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...")
+ print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...")
software = remove_version(software)
- # DEBUG: print(f"DEBUG: software[]={type(software)}")
+ print(f"DEBUG: software[]={type(software)}")
if type(software) is str and "powered by" in software:
- # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it")
+ print(f"DEBUG: software='{software}' has 'powered by' in it")
software = remove_version(strip_powered_by(software))
- # DEBUG: print("DEBUG: Returning domain,software:", domain, software)
+ print("DEBUG: Returning domain,software:", domain, software)
return software
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
- # DEBUG: print(f"DEBUG: reason='{reason}',blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
+ print(f"DEBUG: reason='{reason}',blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
if type(reason) != str and reason != None:
raise ValueError(f"Parameter reason[]='{type(reason)}' is not 'str'")
elif type(blocker) != str:
elif type(block_level) != str:
raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
- # DEBUG: print("DEBUG: Updating block reason:", reason, blocker, blocked, block_level)
+ print("DEBUG: Updating block reason:", reason, blocker, blocked, block_level)
try:
cursor.execute(
"UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason IN ('','unknown') LIMIT 1",
),
)
- # DEBUG: print(f"DEBUG: cursor.rowcount={cursor.rowcount}")
+ print(f"DEBUG: cursor.rowcount={cursor.rowcount}")
if cursor.rowcount == 0:
- # DEBUG: print(f"DEBUG: Did not update any rows: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',reason='{reason}' - EXIT!")
+ print(f"DEBUG: Did not update any rows: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',reason='{reason}' - EXIT!")
return
except BaseException as e:
print(f"ERROR: failed SQL query: reason='{reason}',blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def update_last_seen(blocker: str, blocked: str, block_level: str):
- # DEBUG: print("DEBUG: Updating last_seen for:", blocker, blocked, block_level)
+ print("DEBUG: Updating last_seen for:", blocker, blocked, block_level)
try:
cursor.execute(
"UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
)
)
- # DEBUG: print(f"DEBUG: cursor.rowcount={cursor.rowcount}")
+ print(f"DEBUG: cursor.rowcount={cursor.rowcount}")
if cursor.rowcount == 0:
- # DEBUG: print(f"DEBUG: Did not update any rows: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}' - EXIT!")
+ print(f"DEBUG: Did not update any rows: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}' - EXIT!")
return
except BaseException as e:
print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool:
- # DEBUG: print(f"DEBUG: blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
+ print(f"DEBUG: blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
if type(blocker) != str:
raise ValueError(f"Parameter blocker[]={type(blocker)} is not of type 'str'")
elif blocker == "":
is_blocked = cursor.fetchone() != None
- # DEBUG: print(f"DEBUG: is_blocked='{is_blocked}' - EXIT!")
+ print(f"DEBUG: is_blocked='{is_blocked}' - EXIT!")
return is_blocked
def block_instance(blocker: str, blocked: str, reason: str, block_level: str):
- # DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
+ print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
if type(blocker) != str:
raise ValueError(f"Parameter blocker[]={type(blocker)} is not 'str'")
elif blocker == "":
print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',reason='{reason}',block_level='{block_level}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def is_instance_registered(domain: str) -> bool:
- # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
+ print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
+ print(f"DEBUG: domain='{domain}' - CALLED!")
if not cache.key_exists("is_registered"):
- # DEBUG: print(f"DEBUG: Cache for 'is_registered' not initialized, fetching all rows ...")
+ print(f"DEBUG: Cache for 'is_registered' not initialized, fetching all rows ...")
try:
cursor.execute("SELECT domain FROM instances")
# Is cache found?
registered = cache.sub_key_exists("is_registered", domain)
- # DEBUG: print(f"DEBUG: registered='{registered}' - EXIT!")
+ print(f"DEBUG: registered='{registered}' - EXIT!")
return registered
def add_instance(domain: str, origin: str, originator: str, path: str = None):
- # DEBUG: print(f"DEBUG: domain={domain},origin={origin},originator={originator},path={path} - CALLED!")
+ print(f"DEBUG: domain={domain},origin={origin},originator={originator},path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
elif is_blacklisted(domain):
raise Exception(f"domain='{domain}' is blacklisted, but method invoked")
- # DEBUG: print("DEBUG: domain,origin,originator,path:", domain, origin, originator, path)
+ print("DEBUG: domain,origin,originator,path:", domain, origin, originator, path)
software = determine_software(domain, path)
- # DEBUG: print("DEBUG: Determined software:", software)
+ print("DEBUG: Determined software:", software)
print(f"INFO: Adding instance domain='{domain}' (origin='{origin}',software='{software}')")
try:
cache.set_sub_key("is_registered", domain, True)
if instances.has_pending_instance_data(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo being updated ...")
+ print(f"DEBUG: domain='{domain}' has pending nodeinfo being updated ...")
instances.set("last_status_code" , domain, None)
instances.set("last_error_details", domain, None)
instances.update_instance_data(domain)
remove_pending_error(domain)
if domain in pending_errors:
- # DEBUG: print("DEBUG: domain has pending error being updated:", domain)
+ print("DEBUG: domain has pending error being updated:", domain)
update_last_error(domain, pending_errors[domain])
remove_pending_error(domain)
print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
else:
- # DEBUG: print("DEBUG: Updating nodeinfo for domain:", domain)
+ print("DEBUG: Updating nodeinfo for domain:", domain)
update_last_nodeinfo(domain)
- # DEBUG: print("DEBUG: EXIT!")
+ print("DEBUG: EXIT!")
def send_bot_post(instance: str, blocks: dict):
- # DEBUG: print(f"DEBUG: instance={instance},blocks()={len(blocks)} - CALLED!")
+ print(f"DEBUG: instance={instance},blocks()={len(blocks)} - CALLED!")
message = instance + " has blocked the following instances:\n\n"
truncated = False
return True
def get_mastodon_blocks(domain: str) -> dict:
- # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
+ print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
+ print("DEBUG: Fetching mastodon blocks from domain:", domain)
blocks = {
"Suspended servers": [],
"Filtered media" : [],
header_text = tidyup_domain(header.text)
if header_text in language_mapping:
- # DEBUG: print(f"DEBUG: header_text='{header_text}'")
+ print(f"DEBUG: header_text='{header_text}'")
header_text = language_mapping[header_text]
if header_text in blocks or header_text.lower() in blocks:
}
)
- # DEBUG: print("DEBUG: Returning blocks for domain:", domain)
+ print("DEBUG: Returning blocks for domain:", domain)
return {
"reject" : blocks["Suspended servers"],
"media_removal" : blocks["Filtered media"],
}
def get_friendica_blocks(domain: str) -> dict:
- # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
+ print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
+ print("DEBUG: Fetching friendica blocks from domain:", domain)
blocks = []
try:
# Prevents exceptions:
if blocklist is None:
- # DEBUG: print("DEBUG: Instance has no block list:", domain)
+ print("DEBUG: Instance has no block list:", domain)
return {}
for line in blocklist.find("table").find_all("tr")[1:]:
- # DEBUG: print(f"DEBUG: line='{line}'")
+ print(f"DEBUG: line='{line}'")
blocks.append({
"domain": tidyup_domain(line.find_all("td")[0].text),
"reason": tidyup_domain(line.find_all("td")[1].text)
})
- # DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
+ print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
return {
"reject": blocks
}
def get_misskey_blocks(domain: str) -> dict:
- # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
+ print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
- # DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
+ print("DEBUG: Fetching misskey blocks from domain:", domain)
blocks = {
"suspended": [],
"blocked" : []
# instances page-by-page, since that troonware doesn't support
# sending them all at once
try:
- # DEBUG: print(f"DEBUG: Fetching offset='{offset}' from '{domain}' ...")
+ print(f"DEBUG: Fetching offset='{offset}' from '{domain}' ...")
if offset == 0:
- # DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
+ print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
fetched = post_json_api(domain, "/api/federation/instances", json.dumps({
"sort" : "+pubAt",
"host" : None,
"Origin": domain
})
else:
- # DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
+ print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
fetched = post_json_api(domain, "/api/federation/instances", json.dumps({
"sort" : "+pubAt",
"host" : None,
"Origin": domain
})
- # DEBUG: print("DEBUG: fetched():", len(fetched))
+ print("DEBUG: fetched():", len(fetched))
if len(fetched) == 0:
- # DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
+ print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
elif len(fetched) != config.get("misskey_limit"):
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_limit')}'")
+ print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_limit')}'")
offset = offset + (config.get("misskey_limit") - len(fetched))
else:
- # DEBUG: print("DEBUG: Raising offset by step:", step)
+ print("DEBUG: Raising offset by step:", step)
offset = offset + step
for instance in fetched:
# same shit, different asshole ("blocked" aka full suspend)
try:
if offset == 0:
- # DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
+ print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
fetched = post_json_api(domain,"/api/federation/instances", json.dumps({
"sort" : "+pubAt",
"host" : None,
"Origin": domain
})
else:
- # DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
+ print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
fetched = post_json_api(domain,"/api/federation/instances", json.dumps({
"sort" : "+pubAt",
"host" : None,
"Origin": domain
})
- # DEBUG: print("DEBUG: fetched():", len(fetched))
+ print("DEBUG: fetched():", len(fetched))
if len(fetched) == 0:
- # DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
+ print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
elif len(fetched) != config.get("misskey_limit"):
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_limit')}'")
+ print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_limit')}'")
offset = offset + (config.get("misskey_limit") - len(fetched))
else:
- # DEBUG: print("DEBUG: Raising offset by step:", step)
+ print("DEBUG: Raising offset by step:", step)
offset = offset + step
for instance in fetched:
offset = 0
break
- # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
+ print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
- # DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
+ print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
return {
"reject" : blocks["blocked"],
"followers_only": blocks["suspended"]
}
def tidyup_reason(reason: str) -> str:
- # DEBUG: print(f"DEBUG: reason='{reason}' - CALLED!")
+ print(f"DEBUG: reason='{reason}' - CALLED!")
if type(reason) != str:
raise ValueError(f"Parameter reason[]={type(reason)} is not expected")
return reason
def tidyup_domain(domain: str) -> str:
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
+ print(f"DEBUG: domain='{domain}' - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not expected")
# No individual users in block lists
domain = re.sub("(.+)\@", "", domain)
- # DEBUG: print(f"DEBUG: domain='{domain}' - EXIT!")
+ print(f"DEBUG: domain='{domain}' - EXIT!")
return domain
def json_from_response(response: requests.models.Response) -> list:
- # DEBUG: print(f"DEBUG: response[]={type(response)} - CALLED!")
+ print(f"DEBUG: response[]={type(response)} - CALLED!")
if not isinstance(response, requests.models.Response):
raise ValueError(f"Parameter response[]='{type(response)}' is not type of 'Response'")
data = list()
if response.text.strip() != "":
- # DEBUG: print(f"DEBUG: response.text()={len(response.text)} is not empty, invoking response.json() ...")
+ print(f"DEBUG: response.text()={len(response.text)} is not empty, invoking response.json() ...")
try:
data = response.json()
except json.decoder.JSONDecodeError:
pass
- # DEBUG: print(f"DEBUG: data[]={type(data)} - EXIT!")
+ print(f"DEBUG: data[]={type(data)} - EXIT!")
return data
def get_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response:
- # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
+ print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
raise ValueError("Parameter 'path' cannot be empty")
try:
- # DEBUG: print(f"DEBUG: Sending request to '{domain}{path}' ...")
- response = reqto.get(f"https://{domain}{path}", headers=headers, timeout=timeout);
+ print(f"DEBUG: Sending request to '{domain}{path}' ...")
+ response = reqto.get(
+ f"https://{domain}{path}",
+ headers=headers,
+ timeout=timeout
+ );
except requests.exceptions.ConnectionError as e:
- # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(e)}]='{str(e)}'")
+ print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(e)}]='{str(e)}'")
update_last_error(domain, e)
raise e
- # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
+ print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
return response
def has_element(elements: list, key: str, value: any) -> bool:
- # DEBUG: print(f"DEBUG: element()={len(element)},key='{key}',value[]='{type(value)}' - CALLED!")
+ print(f"DEBUG: element()={len(element)},key='{key}',value[]='{type(value)}' - CALLED!")
if type(key) != str:
raise ValueError(f"Parameter key[]='{type(key)}' is not 'str'")
elif key == "":
raise ValueError("Parameter 'key' cannot be empty")
has = False
- # DEBUG: print(f"DEBUG: Checking elements()={len(elements)} ...")
+ print(f"DEBUG: Checking elements()={len(elements)} ...")
for element in elements:
- # DEBUG: print(f"DEBUG: element[]='{type(element)}'")
+ print(f"DEBUG: element[]='{type(element)}'")
if type(element) != dict:
raise ValueError(f"element[]='{type(element)}' is not 'dict'")
elif not key in element:
has = True
break
- # DEBUG: print(f"DEBUG: has={has} - EXIT!")
+ print(f"DEBUG: has={has} - EXIT!")
return has
def find_domains(tag: bs4.element.Tag) -> list:
- # DEBUG: print(f"DEBUG: tag[]={type(tag)} - CALLED!")
+ print(f"DEBUG: tag[]={type(tag)} - CALLED!")
if not isinstance(tag, bs4.element.Tag):
raise ValueError(f"Parameter tag[]={type(tag)} is not type of bs4.element.Tag")
elif not isinstance(tag, bs4.element.Tag):
domains = list()
for element in tag.select("tr"):
- # DEBUG: print(f"DEBUG: element[]={type(element)}")
+ print(f"DEBUG: element[]={type(element)}")
if not element.find("td"):
- # DEBUG: print("DEBUG: Skipping element, no <td> found")
+ print("DEBUG: Skipping element, no <td> found")
continue
domain = tidyup_domain(element.find("td").text)
reason = tidyup_reason(element.findAll("td")[1].text)
- # DEBUG: print(f"DEBUG: domain='{domain}',reason='{reason}'")
+ print(f"DEBUG: domain='{domain}',reason='{reason}'")
if is_blacklisted(domain):
print(f"WARNING: domain='{domain}' is blacklisted - skipped!")
print(f"WARNING: domain='{domain}' is not a valid domain - skipped!")
continue
- # DEBUG: print(f"DEBUG: Adding domain='{domain}' ...")
+ print(f"DEBUG: Adding domain='{domain}' ...")
domains.append({
"domain": domain,
"reason": reason,
})
- # DEBUG: print(f"DEBUG: domains()={len(domains)} - EXIT!")
+ print(f"DEBUG: domains()={len(domains)} - EXIT!")
return domains
+
+def get_url(url: str) -> requests.models.Response:
+ print(f"DEBUG: url='{url}' - CALLED!")
+ if type(url) != str:
+ raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
+ elif url == "":
+ raise ValueError("Parameter 'url' cannot be empty")
+
+ print(f"DEBUG: Parsing url='{url}'")
+ components = urllib.parse(url)
+
+ # Invoke other function, avoid trailing ?
+ if components.query != "":
+ response = get_response(components.hostname, f"{components.path}?{components.query}")
+ else:
+ response = get_response(components.hostname, f"{components.path}")
+
+ print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
+ return response