# along with this program. If not, see <https://www.gnu.org/licenses/>.
import bs4
-from fba import cache
import hashlib
import re
import reqto
import time
import validators
-with open("config.json") as f:
- config = json.loads(f.read())
+from fba import cache
+from fba import config
+from fba import instances
# Don't check these, known trolls/flooders/testing/developing
blacklist = [
# HTTP headers for non-API requests
headers = {
- "User-Agent": config["useragent"],
+ "User-Agent": config.get("useragent"),
}
+
# HTTP headers for API requests
api_headers = {
- "User-Agent": config["useragent"],
+ "User-Agent": config.get("useragent"),
"Content-Type": "application/json",
}
-# Found info from node, such as nodeinfo URL, detection mode that needs to be
-# written to database. Both arrays must be filled at the same time or else
-# update_instance_data() will fail
-instance_data = {
- # Detection mode: 'AUTO_DISCOVERY', 'STATIC_CHECKS' or 'GENERATOR'
- # NULL means all detection methods have failed (maybe still reachable instance)
- "detection_mode" : {},
- # Found nodeinfo URL
- "nodeinfo_url" : {},
- # Found total peers
- "total_peers" : {},
- # Last fetched instances
- "last_instance_fetch": {},
- # Last updated
- "last_updated" : {},
- # Last blocked
- "last_blocked" : {},
- # Last nodeinfo (fetched)
- "last_nodeinfo" : {},
- # Last status code
- "last_status_code" : {},
- # Last error details
- "last_error_details" : {},
-}
-
language_mapping = {
# English -> English
"Silenced instances" : "Silenced servers",
if (peerlist is None):
print("ERROR: Cannot fetch peers:", domain)
return
- elif has_pending_instance_data(domain):
+ elif instances.has_pending_instance_data(domain):
# DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo data, flushing ...")
- update_instance_data(domain)
+ instances.update_instance_data(domain)
print(f"INFO: Checking {len(peerlist)} instances from {domain} ...")
for instance in peerlist:
continue
# DEBUG: print(f"DEBUG: instance[{type(instance}]={instance} - BEFORE")
- instance = tidyup(instance)
+ instance = tidyup_domain(instance)
# DEBUG: print(f"DEBUG: instance[{type(instance}]={instance} - AFTER")
if instance == "":
- print("WARNING: Empty instance after tidyup(), domain:", domain)
+ print("WARNING: Empty instance after tidyup_domain(), domain:", domain)
continue
elif not validators.domain(instance.split("/")[0]):
print(f"WARNING: Bad instance='{instance}' from domain='{domain}',origin='{origin}',software='{software}'")
# DEBUG: print("DEBUG: EXIT!")
-def set_instance_data(key: str, domain: str, value: any):
- # NOISY-DEBUG: print(f"DEBUG: key='{key}',domain='{domain}',value[]='{type(value)}' - CALLED!")
- if type(key) != str:
- raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
- elif key == "":
- raise ValueError(f"Parameter 'key' cannot be empty")
- elif type(domain) != str:
- raise ValueError("Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError(f"Parameter 'domain' cannot be empty")
- elif not key in instance_data:
- raise ValueError(f"key='{key}' not found in instance_data")
- elif not is_primitive(value):
- raise ValueError(f"value[]='{type(value)}' is not a primitive type")
-
- # Set it
- instance_data[key][domain] = value
-
- # DEBUG: print("DEBUG: EXIT!")
-
def add_peers(rows: dict) -> list:
# DEBUG: print(f"DEBUG: rows()={len(rows)} - CALLED!")
peers = list()
# DEBUG: print(f"DEBUG: Adding {len(rows[element])} peer(s) to peers list ...")
for peer in rows[element]:
# DEBUG: print(f"DEBUG: peer='{peer}' - BEFORE!")
- peer = tidyup(peer)
+ peer = tidyup_domain(peer)
# DEBUG: print(f"DEBUG: peer='{peer}' - AFTER!")
if is_blacklisted(peer):
raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
- set_instance_data("last_blocked", domain, time.time())
+ instances.set_instance_data("last_blocked", domain, time.time())
# Running pending updated
- # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
- update_instance_data(domain)
-
- # DEBUG: print("DEBUG: EXIT!")
-
-def has_pending_instance_data(domain: str) -> bool:
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if type(domain) != str:
- raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
- elif domain == "":
- raise ValueError(f"Parameter 'domain' cannot be empty")
-
- has_pending = False
- for key in instance_data:
- # DEBUG: print(f"DEBUG: key='{key}',domain='{domain}',instance_data[key]()='{len(instance_data[key])}'")
- if domain in instance_data[key]:
- has_pending = True
- break
-
- # DEBUG: print(f"DEBUG: has_pending='{has_pending}' - EXIT!")
- return has_pending
-
-def update_instance_data(domain: str):
- if type(domain) != str:
- raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
- elif domain == "":
- raise ValueError(f"Parameter 'domain' cannot be empty")
-
- # DEBUG: print(f"DEBUG: Updating nodeinfo for domain='{domain}' ...")
- sql_string = ''
- fields = list()
- for key in instance_data:
- # DEBUG: print("DEBUG: key:", key)
- if domain in instance_data[key]:
- # DEBUG: print(f"DEBUG: Adding '{instance_data[key][domain]}' for key='{key}' ...")
- fields.append(instance_data[key][domain])
- sql_string += f" {key} = ?,"
-
- fields.append(domain)
-
- if sql_string == '':
- raise ValueError(f"No fields have been set, but method invoked, domain='{domain}'")
-
- # DEBUG: print(f"DEBUG: sql_string='{sql_string}',fields()={len(fields)}")
- sql_string = "UPDATE instances SET" + sql_string + " last_updated = TIME() WHERE domain = ? LIMIT 1"
- # DEBUG: print("DEBUG: sql_string:", sql_string)
-
- try:
- # DEBUG: print("DEBUG: Executing SQL:", sql_string)
- cursor.execute(sql_string, fields)
-
- # DEBUG: print(f"DEBUG: Success! (rowcount={cursor.rowcount })")
- if cursor.rowcount == 0:
- print(f"WARNING: Did not update any rows: domain='{domain}',fields()={len(fields)} - EXIT!")
- return
-
- connection.commit()
-
- # DEBUG: print("DEBUG: Deleting instance_data for domain:", domain)
- for key in instance_data:
- try:
- # DEBUG: print("DEBUG: Deleting key:", key)
- del instance_data[key][domain]
- except:
- pass
-
- except BaseException as e:
- print(f"ERROR: failed SQL query: domain='{domain}',sql_string='{sql_string}',exception[{type(e)}]:'{str(e)}'")
- sys.exit(255)
+ # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
+ instances.update_instance_data(domain)
# DEBUG: print("DEBUG: EXIT!")
])
# Cleanup old entries
- # DEBUG: print(f"DEBUG: Purging old records (distance: {config['error_log_cleanup']})")
- cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config["error_log_cleanup"]])
+ # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
+ cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
except BaseException as e:
print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
# DEBUG: print("DEBUG: AFTER res[]:", type(res))
if type(res) is str:
# DEBUG: print(f"DEBUG: Setting last_error_details='{res}'");
- set_instance_data("last_status_code" , domain, 999)
- set_instance_data("last_error_details", domain, res)
+ instances.set_instance_data("last_status_code" , domain, 999)
+ instances.set_instance_data("last_error_details", domain, res)
else:
# DEBUG: print(f"DEBUG: Setting last_error_details='{res.reason}'");
- set_instance_data("last_status_code" , domain, res.status_code)
- set_instance_data("last_error_details", domain, res.reason)
+ instances.set_instance_data("last_status_code" , domain, res.status_code)
+ instances.set_instance_data("last_error_details", domain, res.reason)
# Running pending updated
- # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
- update_instance_data(domain)
+ # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
+ instances.update_instance_data(domain)
log_error(domain, res)
# DEBUG: print("DEBUG: EXIT!")
def update_last_instance_fetch(domain: str):
+ # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Updating last_instance_fetch for domain:", domain)
- set_instance_data("last_instance_fetch", domain, time.time())
+ instances.set_instance_data("last_instance_fetch", domain, time.time())
# Running pending updated
- # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
- update_instance_data(domain)
+ # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
+ instances.update_instance_data(domain)
# DEBUG: print("DEBUG: EXIT!")
def update_last_nodeinfo(domain: str):
+ # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
- set_instance_data("last_nodeinfo", domain, time.time())
- set_instance_data("last_updated" , domain, time.time())
+ instances.set_instance_data("last_nodeinfo", domain, time.time())
+ instances.set_instance_data("last_updated" , domain, time.time())
# Running pending updated
- # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
- update_instance_data(domain)
+ # DEBUG: print(f"DEBUG: Invoking instances.update_instance_data({domain}) ...")
+ instances.update_instance_data(domain)
# DEBUG: print("DEBUG: EXIT!")
def get_peers(domain: str, software: str) -> list:
+ # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},software={software} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
if software == "misskey":
# DEBUG: print(f"DEBUG: domain='{domain}' is misskey, sending API POST request ...")
offset = 0
- step = config["misskey_offset"]
+ step = config.get("misskey_offset")
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config['misskey_offset']}'")
- offset = offset + (config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.get("misskey_offset"):
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_offset')}'")
+ offset = offset + (config.get("misskey_offset") - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
peers.append(row["host"])
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- set_instance_data("total_peers", domain, len(peers))
+ instances.set_instance_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
elif software == "lemmy":
# DEBUG: print(f"DEBUG: domain='{domain}' is Lemmy, fetching JSON ...")
try:
- res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code='{res.status_code}',data[]='{type(data)}'")
print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- set_instance_data("total_peers", domain, len(peers))
+ instances.set_instance_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
# DEBUG: print(f"DEBUG: domain='{domain}',mode='{mode}'")
while True:
try:
- res = reqto.get(f"https://{domain}/api/v1/server/{mode}?start={start}&count=100", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v1/server/{mode}?start={start}&count=100", headers=headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code='{res.status_code}',data[]='{type(data)}'")
print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- set_instance_data("total_peers", domain, len(peers))
+ instances.set_instance_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
# DEBUG: print(f"DEBUG: Fetching get_peers_url='{get_peers_url}' from '{domain}' ...")
try:
- res = reqto.get(f"https://{domain}{get_peers_url}", headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}{get_peers_url}", headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
if not res.ok or res.status_code >= 400:
# DEBUG: print(f"DEBUG: Was not able to fetch '{get_peers_url}', trying alternative ...")
- res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
update_last_error(domain, e)
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- set_instance_data("total_peers", domain, len(peers))
+ instances.set_instance_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
# DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
data = {}
try:
- res = reqto.post(f"https://{domain}{path}", data=parameter, headers={**api_headers, **extra_headers}, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.post(f"https://{domain}{path}", data=parameter, headers={**api_headers, **extra_headers}, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
return data
def fetch_nodeinfo(domain: str, path: str = None) -> list:
+ # DEBUG: print(f"DEBUG: domain='{domain}',path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
+ elif type(path) != str and path != None:
+ raise ValueError(f"Parameter path[]={type(path)} is not 'str'")
# DEBUG: print("DEBUG: Fetching nodeinfo from domain,path:", domain, path)
-
nodeinfo = fetch_wellknown_nodeinfo(domain)
- # DEBUG: print("DEBUG: nodeinfo:", nodeinfo)
+ # DEBUG: print(f"DEBUG: nodeinfo({len(nodeinfo)})={nodeinfo}")
if len(nodeinfo) > 0:
- # DEBUG: print("DEBUG: Returning auto-discovered nodeinfo:", len(nodeinfo))
+ # DEBUG: print("DEBUG: nodeinfo()={len(nodeinfo))} - EXIT!")
return nodeinfo
requests = [
try:
# DEBUG: print("DEBUG: Fetching request:", request)
- res = reqto.get(request, headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(request, headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
if res.ok and isinstance(data, dict):
# DEBUG: print("DEBUG: Success:", request)
- set_instance_data("detection_mode", domain, "STATIC_CHECK")
- set_instance_data("nodeinfo_url" , domain, request)
+ instances.set_instance_data("detection_mode", domain, "STATIC_CHECK")
+ instances.set_instance_data("nodeinfo_url" , domain, request)
break
elif res.ok and isinstance(data, list):
# DEBUG: print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
update_last_error(domain, e)
pass
- # DEBUG: print("DEBUG: Returning data[]:", type(data))
+ # DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!")
return data
def fetch_wellknown_nodeinfo(domain: str) -> list:
+ # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
data = {}
try:
- res = reqto.get(f"https://{domain}/.well-known/nodeinfo", headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/.well-known/nodeinfo", headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print("DEBUG: domain,res.ok,data[]:", domain, res.ok, type(data))
# DEBUG: print("DEBUG: href,res.ok,res.status_code:", link["href"], res.ok, res.status_code)
if res.ok and isinstance(data, dict):
# DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data))
- set_instance_data("detection_mode", domain, "AUTO_DISCOVERY")
- set_instance_data("nodeinfo_url" , domain, link["href"])
+ instances.set_instance_data("detection_mode", domain, "AUTO_DISCOVERY")
+ instances.set_instance_data("nodeinfo_url" , domain, link["href"])
break
else:
print("WARNING: Unknown 'rel' value:", domain, link["rel"])
return data
def fetch_generator_from_path(domain: str, path: str = "/") -> str:
+ # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
try:
# DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
- res = reqto.get(f"https://{domain}{path}", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}{path}", headers=headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
# DEBUG: print("DEBUG: domain,res.ok,res.status_code,res.text[]:", domain, res.ok, res.status_code, type(res.text))
if res.ok and res.status_code < 300 and len(res.text) > 0:
# DEBUG: print(f"DEBUG: generator='{generator}',site_name='{site_name}'")
if isinstance(generator, bs4.element.Tag):
# DEBUG: print("DEBUG: Found generator meta tag:", domain)
- software = tidyup(generator.get("content"))
+ software = tidyup_domain(generator.get("content"))
print(f"INFO: domain='{domain}' is generated by '{software}'")
- set_instance_data("detection_mode", domain, "GENERATOR")
+ instances.set_instance_data("detection_mode", domain, "GENERATOR")
remove_pending_error(domain)
elif isinstance(site_name, bs4.element.Tag):
# DEBUG: print("DEBUG: Found property=og:site_name:", domain)
- sofware = tidyup(site_name.get("content"))
+ sofware = tidyup_domain(site_name.get("content"))
print(f"INFO: domain='{domain}' has og:site_name='{software}'")
- set_instance_data("detection_mode", domain, "SITE_NAME")
+ instances.set_instance_data("detection_mode", domain, "SITE_NAME")
remove_pending_error(domain)
except BaseException as e:
return software
def determine_software(domain: str, path: str = None) -> str:
+ # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"Parameter 'domain' cannot be empty")
+ elif type(path) != str and path != None:
+ raise ValueError(f"Parameter path[]={type(path)} is not 'str'")
# DEBUG: print("DEBUG: Determining software for domain,path:", domain, path)
software = None
# DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!")
return software
- software = tidyup(data["software"]["name"])
+ software = tidyup_domain(data["software"]["name"])
- # DEBUG: print("DEBUG: sofware after tidyup():", software)
+ # DEBUG: print("DEBUG: sofware after tidyup_domain():", software)
if software in ["akkoma", "rebased"]:
# DEBUG: print("DEBUG: Setting pleroma:", domain, software)
software = "pleroma"
software = software.split("/")[-1];
elif software.find("|") > 0:
print("WARNING: Spliting of pipe:", software)
- software = tidyup(software.split("|")[0]);
+ software = tidyup_domain(software.split("|")[0]);
elif "powered by" in software:
# DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it")
software = strip_powered_by(software)
# DEBUG: print(f"DEBUG: software[]={type(software)}")
if software == "":
- print("WARNING: tidyup() left no software name behind:", domain)
+ print("WARNING: tidyup_domain() left no software name behind:", domain)
software = None
# DEBUG: print(f"DEBUG: software[]={type(software)}")
return software
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
+ # DEBUG: print(f"DEBUG: reason='{reason}',blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
if type(reason) != str and reason != None:
raise ValueError(f"Parameter reason[]='{type(reason)}' is not 'str'")
elif type(blocker) != str:
# DEBUG: print("DEBUG: EXIT!")
+def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool:
+ # DEBUG: print(f"DEBUG: blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
+ if type(blocker) != str:
+ raise ValueError(f"Parameter blocker[]={type(blocker)} is not of type 'str'")
+ elif blocker == "":
+ raise ValueError("Parameter 'blocker' cannot be empty")
+ elif type(blocked) != str:
+ raise ValueError(f"Parameter blocked[]={type(blocked)} is not of type 'str'")
+ elif blocked == "":
+ raise ValueError("Parameter 'blocked' cannot be empty")
+ elif type(block_level) != str:
+ raise ValueError(f"Parameter block_level[]={type(block_level)} is not of type 'str'")
+ elif block_level == "":
+ raise ValueError("Parameter 'block_level' cannot be empty")
+
+ cursor.execute(
+ "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
+ (
+ blocker,
+ blocked,
+ block_level
+ ),
+ )
+
+ is_blocked = cursor.fetchone() != None
+
+ # DEBUG: print(f"DEBUG: is_blocked='{is_blocked}' - EXIT!")
+ return is_blocked
+
def block_instance(blocker: str, blocked: str, reason: str, block_level: str):
# DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
if type(blocker) != str:
# DEBUG: print("DEBUG: EXIT!")
def is_instance_registered(domain: str) -> bool:
+ # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
return registered
def add_instance(domain: str, origin: str, originator: str, path: str = None):
+ # DEBUG: print(f"DEBUG: domain={domain},origin={origin},originator={originator},path={path} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
),
)
- set_cache_key("is_registered", domain, True)
+ cache.set_cache_key("is_registered", domain, True)
- if has_pending_instance_data(domain):
+ if instances.has_pending_instance_data(domain):
# DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo being updated ...")
- set_instance_data("last_status_code" , domain, None)
- set_instance_data("last_error_details", domain, None)
- update_instance_data(domain)
+ instances.set_instance_data("last_status_code" , domain, None)
+ instances.set_instance_data("last_error_details", domain, None)
+ instances.update_instance_data(domain)
remove_pending_error(domain)
if domain in pending_errors:
# DEBUG: print("DEBUG: EXIT!")
def send_bot_post(instance: str, blocks: dict):
+ # DEBUG: print(f"DEBUG: instance={instance},blocks()={len(blocks)} - CALLED!")
message = instance + " has blocked the following instances:\n\n"
truncated = False
if truncated:
message = message + "(the list has been truncated to the first 20 entries)"
- botheaders = {**api_headers, **{"Authorization": "Bearer " + config["bot_token"]}}
+ botheaders = {**api_headers, **{"Authorization": "Bearer " + config.get("bot_token")}}
req = reqto.post(
- f"{config['bot_instance']}/api/v1/statuses",
+ f"{config.get('bot_instance')}/api/v1/statuses",
data={
"status" : message,
- "visibility" : config['bot_visibility'],
+ "visibility" : config.get('bot_visibility'),
"content_type": "text/plain"
},
headers=botheaders,
return True
def get_mastodon_blocks(domain: str) -> dict:
+ # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
try:
doc = bs4.BeautifulSoup(
- reqto.get(f"https://{domain}/about", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
+ reqto.get(f"https://{domain}/about", headers=headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
except BaseException as e:
return {}
for header in doc.find_all("h3"):
- header_text = tidyup(header.text)
+ header_text = tidyup_domain(header.text)
if header_text in language_mapping:
# DEBUG: print(f"DEBUG: header_text='{header_text}'")
for line in header.find_all_next("table")[0].find_all("tr")[1:]:
blocks[header_text].append(
{
- "domain": tidyup(line.find("span").text),
- "hash" : tidyup(line.find("span")["title"][9:]),
- "reason": tidyup(line.find_all("td")[1].text),
+ "domain": tidyup_domain(line.find("span").text),
+ "hash" : tidyup_domain(line.find("span")["title"][9:]),
+ "reason": tidyup_domain(line.find_all("td")[1].text),
}
)
}
def get_friendica_blocks(domain: str) -> dict:
+ # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
try:
doc = bs4.BeautifulSoup(
- reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
+ reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
except BaseException as e:
for line in blocklist.find("table").find_all("tr")[1:]:
# DEBUG: print(f"DEBUG: line='{line}'")
blocks.append({
- "domain": tidyup(line.find_all("td")[0].text),
- "reason": tidyup(line.find_all("td")[1].text)
+ "domain": tidyup_domain(line.find_all("td")[0].text),
+ "reason": tidyup_domain(line.find_all("td")[1].text)
})
# DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
}
def get_misskey_blocks(domain: str) -> dict:
+ # DEBUG: print(f"DEBUG: domain={domain} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
}
offset = 0
- step = config["misskey_offset"]
+ step = config.get("misskey_offset")
while True:
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config['misskey_offset']}'")
- offset = offset + (config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.get("misskey_offset"):
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_offset')}'")
+ offset = offset + (config.get("misskey_offset") - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
if instance["isSuspended"]:
blocks["suspended"].append(
{
- "domain": tidyup(instance["host"]),
+ "domain": tidyup_domain(instance["host"]),
# no reason field, nothing
"reason": None
}
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config['misskey_offset']}'")
- offset = offset + (config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.get("misskey_offset"):
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_offset')}'")
+ offset = offset + (config.get("misskey_offset") - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
for instance in fetched:
if instance["isBlocked"]:
blocks["blocked"].append({
- "domain": tidyup(instance["host"]),
+ "domain": tidyup_domain(instance["host"]),
"reason": None
})
"followers_only": blocks["suspended"]
}
-def tidyup(string: str) -> str:
- if type(string) != str:
- raise ValueError(f"Parameter string[]={type(string)} is not expected")
+def tidyup_domain(domain: str) -> str:
+ # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
+ if type(domain) != str:
+ raise ValueError(f"Parameter domain[]={type(domain)} is not expected")
- # some retards put their blocks in variable case
- string = string.lower().strip()
+ # All lower-case and strip spaces out
+ domain = domain.lower().strip()
- # other retards put the port
- string = re.sub("\:\d+$", "", string)
+ # No port number
+ domain = re.sub("\:\d+$", "", domain)
- # bigger retards put the schema in their blocklist, sometimes even without slashes
- string = re.sub("^https?\:(\/*)", "", string)
+ # No protocol, sometimes without the slashes
+ domain = re.sub("^https?\:(\/*)", "", domain)
- # and trailing slash
- string = re.sub("\/$", "", string)
+ # No trailing slash
+ domain = re.sub("\/$", "", domain)
- # and the @
- string = re.sub("^\@", "", string)
+ # No @ sign
+ domain = re.sub("^\@", "", domain)
- # the biggest retards of them all try to block individual users
- string = re.sub("(.+)\@", "", string)
+ # No individual users in block lists
+ domain = re.sub("(.+)\@", "", domain)
- return string
+ # DEBUG: print(f"DEBUG: domain='{domain}' - EXIT!")
+ return domain