blocks.db*
blocklists/*/
__pycache__/
-venv/
config.json
*.cover
*.pyc
+report.txt
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
+#logger.setLevel(logging.DEBUG)
def check_instance(args: argparse.Namespace) -> int:
logger.debug("args.domain='%s' - CALLED!", args.domain)
if args.domain is not None and args.domain != "":
# Re-check single domain
- logger.debug(f"Querying database for single args.domain='{args.domain}' ...")
+ logger.debug("Querying database for single args.domain='%s' ...", args.domain)
database.cursor.execute(
"SELECT domain, software, origin, nodeinfo_url FROM instances WHERE domain = ?", [args.domain]
)
elif args.software is not None and args.software != "":
# Re-check single software
- logger.debug(f"Querying database for args.software='{args.software}' ...")
+ logger.debug("Querying database for args.software='%s' ...", args.software)
database.cursor.execute(
"SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software = ?", [args.software]
)
rows = database.cursor.fetchall()
logger.info("Checking %d entries ...", len(rows))
for blocker, software, origin, nodeinfo_url in rows:
- logger.debug("BEFORE blocker='%s',software='%s',origin='%s',nodeinfo_url='%s'", blocker, software, origin, nodeinfo_url)
+ logger.debug("blocker='%s',software='%s',origin='%s',nodeinfo_url='%s'", blocker, software, origin, nodeinfo_url)
blockdict = list()
blocker = tidyup.domain(blocker)
- logger.debug("AFTER blocker='%s',software='%s'", blocker, software)
+ logger.debug("blocker='%s' - AFTER!", blocker)
if blocker == "":
logger.warning("blocker is now empty!")
continue
elif nodeinfo_url is None or nodeinfo_url == "":
- logger.debug(f"blocker='{blocker}',software='{software}' has empty nodeinfo_url")
+ logger.debug("blocker='%s',software='%s' has empty nodeinfo_url", blocker, software)
continue
elif not utils.is_domain_wanted(blocker):
logger.warning("blocker='%s' is not wanted - SKIPPED!", blocker)
continue
- logger.debug(f"blocker='{blocker}'")
+ logger.debug("blocker='%s'", blocker)
instances.set_last_blocked(blocker)
instances.set_has_obfucation(blocker, False)
+ blocking = list()
if software == "pleroma":
logger.info("blocker='%s',software='%s'", blocker, software)
- pleroma.fetch_blocks(blocker, origin, nodeinfo_url)
+ blocking = pleroma.fetch_blocks(blocker, nodeinfo_url)
elif software == "mastodon":
logger.info("blocker='%s',software='%s'", blocker, software)
- mastodon.fetch_blocks(blocker, origin, nodeinfo_url)
+ blocking = mastodon.fetch_blocks(blocker, nodeinfo_url)
elif software == "lemmy":
logger.info("blocker='%s',software='%s'", blocker, software)
- lemmy.fetch_blocks(blocker, origin, nodeinfo_url)
- elif software == "friendica" or software == "misskey":
+ blocking = lemmy.fetch_blocks(blocker, nodeinfo_url)
+ elif software == "friendica":
logger.info("blocker='%s',software='%s'", blocker, software)
+ blocking = friendica.fetch_blocks(blocker)
+ elif software == "misskey":
+ logger.info("blocker='%s',software='%s'", blocker, software)
+ blocking = misskey.fetch_blocks(blocker)
+ else:
+ logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software)
+
+ logger.info("Checking %s entries from blocker='%s',software='%s' ...", len(blocking), blocker, software)
+ for block in blocking:
+ logger.debug("blocked='%s',block_level='%s',reason='%s'", block['blocked'], block['block_level'], block['reason'])
+
+ if block['block_level'] == "":
+ logger.warning("block_level is empty, blocker='%s',blocked='%s'", block['blocker'], block['blocked'])
+ continue
+
+ logger.debug("blocked='%s',reason='%s' - BEFORE!", block['blocked'], block['reason'])
+ block['blocked'] = tidyup.domain(block['blocked'])
+ block['reason'] = tidyup.reason(block['reason']) if block['reason'] is not None and block['reason'] != "" else None
+ logger.debug("blocked='%s',reason='%s' - AFTER!", block['blocked'], block['reason'])
- blocking = list()
- if software == "friendica":
- blocking = friendica.fetch_blocks(blocker)
- elif software == "misskey":
- blocking = misskey.fetch_blocks(blocker)
-
- logger.info("Checking %s entries from blocker='%s',software='%s' ...", len(blocking.items()), blocker, software)
- for block_level, blocklist in blocking.items():
- logger.debug("blocker='%s',block_level='%s',blocklist()=%d", blocker, block_level, len(blocklist))
- block_level = tidyup.domain(block_level)
- logger.debug("AFTER-block_level='%s'", block_level)
- if block_level == "":
- logger.warning("block_level is empty, blocker='%s'", blocker)
+ if block['blocked'] == "":
+ logger.warning("blocked is empty, blocker='%s'", blocker)
+ continue
+ elif block['blocked'].count("*") > 0:
+ logger.debug("blocker='%s' uses obfucated domains, marking ...", blocker)
+ instances.set_has_obfucation(blocker, True)
+
+ # Some friendica servers also obscure domains without hash
+ row = instances.deobfucate("*", block['blocked'])
+
+ logger.debug("row[]='%s'", type(row))
+ if row is None:
+ logger.warning("Cannot deobfucate blocked='%s',blocker='%s',software='%s' - SKIPPED!", block['blocked'], blocker, software)
continue
- logger.debug(f"Checking {len(blocklist)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
- for block in blocklist:
- blocked, reason = block.values()
- logger.debug(f"blocked='{blocked}',reason='{reason}' - BEFORE!")
- blocked = tidyup.domain(blocked)
- reason = tidyup.reason(reason) if reason is not None and reason != "" else None
- logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
+ block['blocked'] = row[0]
+ origin = row[1]
+ nodeinfo_url = row[2]
+ elif block['blocked'].count("?") > 0:
+ logger.debug("blocker='%s' uses obfucated domains, marking ...", blocker)
+ instances.set_has_obfucation(blocker, True)
- if blocked == "":
- logger.warning("blocked is empty, blocker='%s'", blocker)
- continue
- elif blocked.count("*") > 0:
- logger.debug("blocker='%s' uses obfucated domains, marking ...", blocker)
- instances.set_has_obfucation(blocker, True)
-
- # Some friendica servers also obscure domains without hash
- row = instances.deobfucate("*", blocked)
-
- logger.debug("row[]='%s'", type(row))
- if row is None:
- logger.warning("Cannot deobfucate blocked='%s',blocker='%s',software='%s' - SKIPPED!", blocked, blocker, software)
- continue
-
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
- elif blocked.count("?") > 0:
- logger.debug("blocker='%s' uses obfucated domains, marking ...", blocker)
- instances.set_has_obfucation(blocker, True)
-
- # Some obscure them with question marks, not sure if that's dependent on version or not
- row = instances.deobfucate("?", blocked)
-
- logger.debug("row[]='%s'", type(row))
- if row is None:
- logger.warning("Cannot deobfucate blocked='%s',blocker='%s',software='%s' - SKIPPED!", blocked, blocker, software)
- continue
-
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
-
- logger.debug("Looking up instance by domainm, blocked='%s'", blocked)
- if not utils.is_domain_wanted(blocked):
- logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
- continue
- elif not instances.is_registered(blocked):
- logger.debug("Hash wasn't found, adding: blocked='%s',blocker='%s'", blocked, blocker)
- try:
- instances.add(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
- except network.exceptions as exception:
- print(f"Exception during adding blocked='{blocked}',blocker='{blocker}': '{type(exception)}'")
- continue
-
- if not blocks.is_instance_blocked(blocker, blocked, block_level):
- blocks.add_instance(blocker, blocked, reason, block_level)
-
- if block_level == "reject":
- blockdict.append({
- "blocked": blocked,
- "reason" : reason
- })
- else:
- logger.debug(f"Updating block last seen and reason for blocker='{blocker}',blocked='{blocked}' ...")
- blocks.update_last_seen(blocker, blocked, block_level)
- blocks.update_reason(reason, blocker, blocked, block_level)
-
- logger.debug(f"Invoking cookies.clear({blocked}) ...")
- cookies.clear(blocked)
+ # Some obscure them with question marks, not sure if that's dependent on version or not
+ row = instances.deobfucate("?", block['blocked'])
- logger.debug("Invoking commit() ...")
- database.connection.commit()
- else:
- logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software)
+ logger.debug("row[]='%s'", type(row))
+ if row is None:
+ logger.warning("Cannot deobfucate blocked='%s',blocker='%s',software='%s' - SKIPPED!", block['blocked'], blocker, software)
+ continue
+
+ block['blocked'] = row[0]
+ origin = row[1]
+ nodeinfo_url = row[2]
+
+ logger.debug("Looking up instance by domainm, blocked='%s'", block['blocked'])
+ if not utils.is_domain_wanted(block['blocked']):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", block['blocked'])
+ continue
+ elif not instances.is_registered(block['blocked']):
+ logger.debug("Hash wasn't found, adding: blocked='%s',blocker='%s'", block['blocked'], blocker)
+ try:
+ instances.add(block['blocked'], blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
+ except network.exceptions as exception:
+ logger.warning("Exception during adding blocked='%s',blocker='%s': '%s'", block['blocked'], blocker, type(exception))
+ continue
+
+ if not blocks.is_instance_blocked(blocker, block['blocked'], block['block_level']):
+ blocks.add_instance(blocker, block['blocked'], block['reason'], block['block_level'])
+
+ if block['block_level'] == "reject":
+ blockdict.append({
+ "blocked": block['blocked'],
+ "reason" : block['reason'],
+ })
+ else:
+ logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, block['blocked'])
+ blocks.update_last_seen(blocker, block['blocked'], block['block_level'])
+ blocks.update_reason(block['reason'], blocker, block['blocked'], block['block_level'])
+
+ logger.debug("Invoking cookies.clear(%s) ...", block['blocked'])
+ cookies.clear(block['blocked'])
if instances.has_pending(blocker):
logger.debug("Invoking instances.update_data(%s) ...", blocker)
instances.update_data(blocker)
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
+
if config.get("bot_enabled") and len(blockdict) > 0:
network.send_bot_post(blocker, blockdict)
domain_helper.raise_on(domain)
if not isinstance(path, str):
- raise ValueError("path[]='%s' is not 'str'", type(path))
+ raise ValueError(f"path[]='{type(path)}' is not 'str'")
elif path == "":
raise ValueError("Parameter 'path' is empty")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
+#logger.setLevel(logging.DEBUG)
-def fetch_blocks(domain: str) -> dict:
+def fetch_blocks(domain: str) -> list:
logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
domain_helper.raise_on(domain)
except network.exceptions as exception:
logger.warning("Exception '%s' during fetching instances from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
- return dict()
+ return blocklist
# Prevents exceptions:
if block_tag is None:
logger.debug("Instance has no block list: domain='%s'", domain)
- return dict()
+ return blocklist
table = block_tag.find("table")
- logger.debug(f"table[]='{type(table)}'")
+ logger.debug("table[]='%s'", type(table))
if table.find("tbody"):
rows = table.find("tbody").find_all("tr")
else:
rows = table.find_all("tr")
- logger.debug(f"Found rows()={len(rows)}")
+ logger.debug("Found rows()=%d", len(rows))
for line in rows:
- logger.debug(f"line='{line}'")
+ logger.debug("line='%s'", line)
blocked = tidyup.domain(line.find_all("td")[0].text)
reason = tidyup.reason(line.find_all("td")[1].text)
- logger.debug(f"blocked='{blocked}',reason='{reason}'")
+ logger.debug("blocked='%s',reason='%s'", blocked, reason)
+ if blocked == "":
+ logger.debug("line[]='%s' returned empty blocked domain - SKIPPED!")
+ continue
+ elif blocked.count("*") > 0:
+ logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
+ instances.set_has_obfucation(domain, True)
+
+ # Obscured domain name with no hash
+ row = instances.deobfucate("*", blocked)
+
+ logger.debug("row[]='%s'", type(row))
+ if row is None:
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
+ continue
+
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
+ blocked = row[0]
+ elif blocked.count("?") > 0:
+ logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
+ instances.set_has_obfucation(domain, True)
+
+ # Obscured domain name with no hash
+ row = instances.deobfucate("?", blocked)
+
+ logger.debug("row[]='%s'", type(row))
+ if row is None:
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
+ continue
+
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
+ blocked = row[0]
+
+ logger.debug("blocked[%s]='%s'", type(blocked), blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
logger.debug(f"Appending blocked='{blocked}',reason='{reason}'")
blocklist.append({
- "domain": tidyup.domain(blocked),
- "reason": tidyup.reason(reason)
+ "blocker" : domain,
+ "blocked" : tidyup.domain(blocked),
+ "reason" : tidyup.reason(reason),
+ "block_level": "reject",
})
- logger.debug("Next!")
- logger.debug("Returning blocklist()=%d for domain='%s' - EXIT!", len(blocklist), domain)
- return {
- "reject": blocklist
- }
+ logger.debug("blocklist()=%d - EXIT!", len(blocklist))
+ return blocklist
logger.debug("peers()=%d - EXIT!", len(peers))
return peers
-def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
- logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
+def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
+ logger.debug("domain='%s,nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
domain_helper.raise_on(domain)
- if not isinstance(origin, str) and origin is not None:
- raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
- elif origin == "":
- raise ValueError("Parameter 'origin' is empty")
- elif not isinstance(nodeinfo_url, str):
+
+ if not isinstance(nodeinfo_url, str):
raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
elif nodeinfo_url == "":
raise ValueError("Parameter 'nodeinfo_url' is empty")
"Instâncias bloqueadas",
]
+ blocklist = list()
+
try:
# json endpoint for newer mastodongs
- found_blocks = list()
-
- logger.debug(f"Fetching /instances from domain='{domain}'")
+ logger.debug("Fetching /instances from domain='%s'", domain)
response = network.fetch_response(
domain,
"/instances",
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text != "":
- logger.debug(f"Parsing {len(response.text)} Bytes ...")
+ logger.debug("Parsing %s Bytes ...", len(response.text))
doc = bs4.BeautifulSoup(response.text, "html.parser")
- logger.debug(f"doc[]={type(doc)}")
+ logger.debug("doc[]='%s'", type(doc))
headers = doc.findAll("h5")
found = None
- logger.debug(f"Search in {len(headers)} header(s) ...")
+ logger.debug("Search in %d header(s) ...", len(headers))
for header in headers:
- logger.debug(f"header[]={type(header)}")
+ logger.debug("header[]='%s'", type(header))
content = header.contents[0]
- logger.debug(f"content='{content}'")
+ logger.debug("content[%s]='%s'", type(content), content)
if content in translations:
logger.debug("Found header with blocked instances - BREAK!")
found = header
break
- logger.debug(f"found[]='{type(found)}'")
+ logger.debug("found[]='%s'", type(found))
if found is None:
logger.debug("domain='%s' is not blocking any instances - EXIT!", domain)
- return
+ return blocklist
blocking = found.find_next("ul").findAll("a")
- logger.debug(f"Found {len(blocking)} blocked instance(s) ...")
+ logger.debug("Found %d blocked instance(s) ...", len(blocking))
for tag in blocking:
- logger.debug(f"tag[]='{type(tag)}'")
+ logger.debug("tag[]='%s'", type(tag))
blocked = tidyup.domain(tag.contents[0])
+ logger.debug("blocked='%s'", blocked)
- logger.debug(f"blocked='{blocked}'")
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
- elif not instances.is_registered(blocked):
- logger.debug("Hash wasn't found, adding: blocked='%s',domain='%s'", blocked, domain)
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
- if not blocks.is_instance_blocked(domain, blocked, "reject"):
- logger.debug("Blocking: domain='%s',blocked='%s' (reject)", domain, blocked)
- blocks.add_instance(domain, blocked, None, "reject")
-
- found_blocks.append({
- "blocked": blocked,
- "reason" : None
- })
- else:
- logger.debug(f"Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
- blocks.update_last_seen(domain, blocked, "reject")
-
- logger.debug("Invoking commit() ...")
- database.connection.commit()
+
+ logger.debug("Appending blocker='%s',blocked='%s',block_level='reject'", domain, blocked)
+ blocklist.append({
+ "blocker" : domain,
+ "blocked" : blocked,
+ "reason" : None,
+ "block_level": "reject",
+ })
+
except network.exceptions as exception:
logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
instances.set_last_error(domain, exception)
- logger.debug("EXIT!")
+ logger.debug("blocklist()=%d - EXIT!", len(blocklist))
+ return blocklist
"followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
}
-def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
- logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
+def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
+ logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
domain_helper.raise_on(domain)
- if not isinstance(origin, str) and origin is not None:
- raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
- elif origin == "":
- raise ValueError("Parameter 'origin' is empty")
- elif not isinstance(nodeinfo_url, str):
+
+ if not isinstance(nodeinfo_url, str):
raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
elif nodeinfo_url == "":
raise ValueError("Parameter 'nodeinfo_url' is empty")
+ # Init block list
+ blocklist = list()
+
# No CSRF by default, you don't have to add network.api_headers by yourself here
headers = tuple()
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_blocks,%s) - EXIT!", type(exception), __name__)
instances.set_last_error(domain, exception)
- return
+ return blocklist
try:
# json endpoint for newer mastodongs
- found_blocks = list()
- blocklist = list()
-
- rows = {
- "reject" : [],
- "media_removal" : [],
- "followers_only": [],
- "report_removal": [],
- }
-
logger.debug("Querying API domain_blocks: domain='%s'", domain)
data = network.get_json_api(
domain,
logger.debug("data[]='%s'", type(data))
if "error_message" in data:
- logger.debug(f"Was not able to fetch domain_blocks from domain='{domain}': status_code='{data['status_code']}',error_message='{data['error_message']}'")
+ logger.debug("Was not able to fetch domain_blocks from domain='%s': status_code='%d',error_message='%s'", domain, data['status_code'], data['error_message'])
instances.set_last_error(domain, data)
- return
+ return blocklist
elif "json" in data and "error" in data["json"]:
logger.warning("JSON API returned error message: '%s'", data['json']['error'])
instances.set_last_error(domain, data)
- return
+ return blocklist
else:
# Getting blocklist
- blocklist = data["json"]
+ rows = data["json"]
- if len(blocklist) > 0:
- logger.info("Checking %d entries from domain='%s' ...", len(blocklist), domain)
- for block in blocklist:
+ if len(rows) == 0:
+ logger.debug("domain='%s' has returned zero rows, trying /about/more page ...", domain)
+ rows = fetch_blocks_from_about(domain)
+
+ if len(rows) > 0:
+ logger.info("Checking %d entries from domain='%s' ...", len(rows), domain)
+ for block in rows:
# Check type
logger.debug("block[]='%s'", type(block))
if not isinstance(block, dict):
logger.debug(f"block[]='{type(block)}' is of type 'dict' - SKIPPED!")
continue
- # Map block -> entry
- logger.debug(f"block[{type(block)}]='{block}'")
- entry = {
- "domain": block["domain"],
- "hash" : block["digest"],
- "reason": block["comment"] if "comment" in block else None
- }
-
- logger.debug("severity='%s',domain='%s',hash='%s',comment='%s'", block['severity'], block['domain'], block['digest'], block['comment'])
- if block['severity'] == 'suspend':
- logger.debug("Adding entry='%s' with severity='%s' ...", entry, block['severity'])
- rows['reject'].append(entry)
- elif block['severity'] == 'silence':
- logger.debug("Adding entry='%s' with severity='%s' ...", entry, block['severity'])
- rows['followers_only'].append(entry)
- elif block['severity'] == 'reject_media':
- logger.debug("Adding entry='%s' with severity='%s' ...", entry, block['severity'])
- rows['media_removal'].append(entry)
- elif block['severity'] == 'reject_reports':
- logger.debug("Adding entry='%s' with severity='%s' ...", entry, block['severity'])
- rows['report_removal'].append(entry)
- else:
- logger.warning("Unknown severity='%s', domain='%s'", block['severity'], block['domain'])
- else:
- logger.debug("domain='%s' has returned zero rows, trying /about/more page ...", domain)
- rows = fetch_blocks_from_about(domain)
-
- logger.info("Checking %d entries from domain='%s' ...", len(rows.items()), domain)
- for block_level, blocklist in rows.items():
- logger.debug("domain='%s',block_level='%s',blocklist()=%d", domain, block_level, len(blocklist))
- block_level = tidyup.domain(block_level)
-
- logger.debug("block_level='%s' - AFTER!", block_level)
- if block_level == "":
- logger.warning("block_level is empty, domain='%s'", domain)
- continue
- elif block_level == "accept":
- logger.debug("domain='%s' skipping block_level='accept'", domain)
- continue
-
- logger.debug("Checking %s entries from domain='{domain}',block_level='{block_level}' ...", len(blocklist))
- for block in blocklist:
- logger.debug("block[]='%s'", type(block))
- blocked, blocked_hash, reason = block.values()
+ reason = tidyup.reason(block["comment"]) if "comment" in block else None
- logger.debug("blocked='%s',blocked_hash='%s',reason='%s'", blocked, blocked_hash, reason)
- blocked = tidyup.domain(blocked)
- reason = tidyup.reason(reason) if reason is not None and reason != "" else None
- logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
+ logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s'", domain, block['domain'], reason, block['severity'])
+ blocklist.append({
+ "blocker" : domain,
+ "blocked" : block["domain"],
+ "hash" : block["digest"],
+ "reason" : reason,
+ "block_level": block["severity"]
+ })
+ else:
+ logger.debug("domain='%s' has no block list")
- if blocked == "":
- logger.warning("blocked is empty, domain='%s'", domain)
- continue
- elif blocked.count("*") > 0:
- logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
- instances.set_has_obfucation(domain, True)
-
- # Doing the hash search for instance names as well to tidy up DB
- row = instances.deobfucate("*", blocked, blocked_hash)
-
- logger.debug("row[]='%s'", type(row))
- if row is None:
- logger.warning("Cannot deobfucate blocked='%s',blocked_hash='%s' - SKIPPED!", blocked, blocked_hash)
- continue
-
- logger.debug("Updating domain: row[0]='%s'", row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
- elif blocked.count("?") > 0:
- logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
- instances.set_has_obfucation(domain, True)
-
- # Doing the hash search for instance names as well to tidy up DB
- row = instances.deobfucate("?", blocked, blocked_hash)
-
- logger.debug("row[]='%s'", type(row))
- if row is None:
- logger.warning("Cannot deobfucate blocked='%s',blocked_hash='%s' - SKIPPED!", blocked, blocked_hash)
- continue
-
- logger.debug("Updating domain: row[0]='%s'", row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
-
- logger.debug("Looking up instance by domain: blocked='%s'", blocked)
- if not utils.is_domain_wanted(blocked):
- logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
- continue
- elif not instances.is_registered(blocked):
- logger.debug(f"Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
+ logger.info("Checking %d entries from domain='%s' ...", len(blocklist), domain)
+ for block in blocklist:
+ logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s'", block['blocker'], block['blocked'], block['reason'], block['block_level'])
+ block['blocked'] = tidyup.domain(block['blocked'])
+ block['reason'] = tidyup.reason(block['reason'])
+ logger.debug("blocked='%s',reason='%s' - AFTER!", block['blocked'], block['reason'])
- logger.debug("Looking up instance by domain: blocked='%s'", blocked)
- if not utils.is_domain_wanted(blocked):
- logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
- continue
- elif not instances.is_registered(blocked):
- logger.debug("Hash wasn't found, adding: blocked='%s',domain='%s'", blocked, domain)
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
- if not blocks.is_instance_blocked(domain, blocked, block_level):
- logger.debug("Blocking domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
- blocks.add_instance(domain, blocked, reason, block_level)
-
- if block_level == "reject":
- found_blocks.append({
- "blocked": blocked,
- "reason" : reason
- })
- else:
- logger.debug("Updating block last seen and reason for domain='%s',blocked='%s' ...", domain, blocked)
- blocks.update_last_seen(domain, blocked, block_level)
- blocks.update_reason(reason, domain, blocked, block_level)
-
- logger.debug("Invoking commit() ...")
- database.connection.commit()
except network.exceptions as exception:
logger.warning("domain='%s',exception[%s]='%s'", domain, type(exception), str(exception))
instances.set_last_error(domain, exception)
- logger.debug("EXIT!")
+ logger.debug("blocklist()=%d - EXIT!", len(blocklist))
+ return blocklist
logger.debug("peers()=%d - EXIT!", len(peers))
return peers
-def fetch_blocks(domain: str) -> dict:
+def fetch_blocks(domain: str) -> list:
logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
domain_helper.raise_on(domain)
logger.debug("Fetching misskey blocks from domain='%s'", domain)
- blocklist = {
- "suspended": [],
- "blocked" : []
- }
+ blocklist = list()
offset = 0
step = config.get("misskey_limit")
for instance in rows:
# Is it there?
logger.debug("instance[%s]='%s'", type(instance), instance)
- if "isSuspended" in instance and instance["isSuspended"] and not dicts.has_key(blocklist["suspended"], "domain", instance["host"]):
+ if "isSuspended" in instance and instance["isSuspended"]:
count = count + 1
- blocklist["suspended"].append({
- "domain": tidyup.domain(instance["host"]),
- # no reason field, nothing
- "reason": None
+ blocklist.append({
+ "blocker" : domain,
+ "blocked" : tidyup.domain(instance["host"]),
+ "reason" : None,
+ "block_level": "suspended",
})
logger.debug("count=%d", count)
for instance in rows:
# Is it there?
logger.debug("instance[%s]='%s'", type(instance), instance)
- if "isBlocked" in instance and instance["isBlocked"] and not dicts.has_key(blocklist["blocked"], "domain", instance["host"]):
+ if "isBlocked" in instance and instance["isBlocked"]:
count = count + 1
- blocklist["blocked"].append({
- "domain": tidyup.domain(instance["host"]),
- "reason": None
+ blocked = tidyup.domain(instance["host"])
+ logger.debug("Appending blocker='%s',blocked='%s',block_level='reject'", domain, blocked)
+ blocklist.append({
+ "blocker" : domain,
+ "blocked" : blocked,
+ "reason" : None,
+ "block_level": "reject",
})
logger.debug("count=%d", count)
offset = 0
break
- logger.debug("blocked()=%d,suspended()=%d - EXIT!", len(blocklist['blocked']), len(blocklist['suspended']))
- return {
- "reject" : blocklist["blocked"],
- "followers_only": blocklist["suspended"]
- }
+ logger.debug("blocklist()=%d - EXIT!", len(blocklist))
+ return blocklist
"Reject": "Suspended servers",
}
-def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
- logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
+def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
+ logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
domain_helper.raise_on(domain)
- if not isinstance(origin, str) and origin is not None:
- raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
- elif origin == "":
- raise ValueError("Parameter 'origin' is empty")
- elif not isinstance(nodeinfo_url, str):
+
+ if not isinstance(nodeinfo_url, str):
raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
elif nodeinfo_url == "":
raise ValueError("Parameter 'nodeinfo_url' is empty")
- # @TODO Unused blockdict
blockdict = list()
rows = None
try:
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
elif blocked.count("?") > 0:
logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
instances.set_has_obfucation(domain, True)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
logger.debug("blocked='%s'", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
- elif not instances.is_registered(blocked):
- # Commit changes
- logger.debug("Invoking commit() ...")
- database.connection.commit()
-
- logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
- if not blocks.is_instance_blocked(domain, blocked, block_level):
- logger.debug("Blocking domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
- blocks.add_instance(domain, blocked, None, block_level)
-
- if block_level == "reject":
- logger.debug("Appending blocked='%s' ...", blocked)
- blockdict.append({
- "blocked": blocked,
- "reason" : None
- })
- else:
- logger.debug("Updating block last seen for domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
- blocks.update_last_seen(domain, blocked, block_level)
+
+ logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
+ blockdict.append({
+ "blocker" : domain,
+ "blocked" : blocked,
+ "reason" : None,
+ "block_level": block_level,
+ })
+
elif "quarantined_instances" in data:
logger.debug("Found 'quarantined_instances' in JSON response: domain='%s'", domain)
found = True
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
elif blocked.count("?") > 0:
logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
instances.set_has_obfucation(domain, True)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
logger.debug("blocked='%s' - DEOBFUCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
- elif not instances.is_registered(blocked):
- # Commit changes
- logger.debug("Invoking commit() ...")
- database.connection.commit()
-
- logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='{nodeinfo_url}'", blocked, domain, origin)
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
- if not blocks.is_instance_blocked(domain, blocked, block_level):
- logger.debug("Blocking domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
- blocks.add_instance(domain, blocked, None, block_level)
+ logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
+ blockdict.append({
+ "blocker" : domain,
+ "blocked" : blocked,
+ "reason" : None,
+ "block_level": block_level,
+ })
- if block_level == "reject":
- logger.debug("Appending blocked='%s' ...", blocked)
- blockdict.append({
- "blocked": blocked,
- "reason" : None
- })
- else:
- logger.debug("Updating block last seen for domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
- blocks.update_last_seen(domain, blocked, block_level)
else:
logger.warning("Cannot find 'mrf_simple' or 'quarantined_instances' in JSON reply: domain='%s'", domain)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
elif blocked.count("?") > 0:
logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
instances.set_has_obfucation(domain, True)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
logger.debug("blocked='%s' - DEOBFUCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
- elif not instances.is_registered(blocked):
- logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
- logger.debug("Updating block reason: reason='%s',domain='%s',blocked='%s',block_level='%s'", reason, domain, blocked, block_level)
- blocks.update_reason(reason, domain, blocked, block_level)
logger.debug("Checking %d blockdict records ...", len(blockdict))
- for entry in blockdict:
- if entry["blocked"] == blocked:
- logger.debug("Updating entry reason: blocked='%s',reason='%s'", blocked, reason)
- entry["reason"] = reason
+ for block in blockdict:
+ logger.debug("block[blocker]='%s',blocker='%s'", block['blocker'], blocker)
+ if block['blocked'] == blocked:
+ logger.debug("Updating reason='%s' for blocker='%s'", reason, block['blocked'])
+ block['reason'] = reason
elif "quarantined_instances_info" in data and "quarantined_instances" in data["quarantined_instances_info"]:
logger.debug("Found 'quarantined_instances_info' in JSON response: domain='%s'", domain)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
elif blocked.count("?") > 0:
logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
instances.set_has_obfucation(domain, True)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
logger.debug("blocked='%s'", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
- elif not instances.is_registered(blocked):
- logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
- logger.debug("Updating block reason: reason='%s',domain='%s',blocked='%s',block_level='%s'", reason, domain, blocked, block_level)
- blocks.update_reason(reason, domain, blocked, block_level)
logger.debug("Checking %d blockdict records ...", len(blockdict))
- for entry in blockdict:
- if entry["blocked"] == blocked:
- logger.debug("Updating entry reason: blocked='%s',reason='%s'", blocked, reason)
- entry["reason"] = reason
+ for block in blockdict:
+ logger.debug("block[blocker]='%s',blocker='%s'", block['blocker'], blocker)
+ if block['blocked'] == blocked:
+ logger.debug("Updating reason='%s' for blocker='%s'", reason, block['blocked'])
+ block['reason'] = reason
else:
logger.warning("Cannot find 'mrf_simple_info' or 'quarantined_instances_info' in JSON reply: domain='%s'", domain)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
elif blocked.count("?") > 0:
logger.debug("domain='%s' uses obfucated domains, marking ...", domain)
instances.set_has_obfucation(domain, True)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
+ logger.warning("Cannot deobfucate blocked='%s',domain='%s' - SKIPPED!", blocked, domain)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
- blocked = row[0]
- origin = row[1]
- nodeinfo_url = row[2]
+ blocked = row[0]
logger.debug("blocked='%s' - DEOBFUCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.warning("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
- elif not instances.is_registered(blocked):
- logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
- if not blocks.is_instance_blocked(domain, blocked, block_level):
- logger.debug("Blocking domain='%s',blocked='%s', block_level='%s' ...", domain, blocked, block_level)
- blocks.add_instance(domain, blocked, reason, block_level)
-
- if block_level == "reject":
- logger.debug("Appending blocked='%s' ...", blocked)
- blockdict.append({
- "blocked": blocked,
- "reason" : reason
- })
- else:
- logger.debug("Updating block last seen for domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
- blocks.update_reason(reason, domain, blocked, block_level)
- logger.debug("Invoking commit() ...")
- database.connection.commit()
+ logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s' ...",domain, blocked, reason, block_level)
+ blockdict.append({
+ "blocker" : domain,
+ "blocked" : blocked,
+ "reason" : reason,
+ "block_level": block_level,
+ })
- logger.debug("EXIT!")
+ logger.debug("blockdict()=%d - EXIT!", len(blockdict))
+ return blockdict
def fetch_blocks_from_about(domain: str) -> dict:
logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
--- /dev/null
+#!/bin/sh
+
+pylint --verbose --rcfile=pylint.rc fba *.py > report.txt