X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Fnetworks%2Fpleroma.py;h=4126999e8f08ea466980b4cfa3b5bd2cb1dc6f9a;hb=c41510ea7e38271d5aea26ec8c8a18983242a5a4;hp=42ec809bcca4c8ffc2d51e4a947932104e13abdd;hpb=e3e0987fbc57999a7009dd6b3c4b4aaa78f517c4;p=fba.git diff --git a/fba/networks/pleroma.py b/fba/networks/pleroma.py index 42ec809..4126999 100644 --- a/fba/networks/pleroma.py +++ b/fba/networks/pleroma.py @@ -17,17 +17,20 @@ import logging import bs4 +import validators from fba import database from fba import utils +from fba.helpers import blacklist from fba.helpers import config from fba.helpers import domain as domain_helper from fba.helpers import tidyup -from fba.http import federation from fba.http import network +from fba.http import nodeinfo +from fba.models import blocks from fba.models import instances logging.basicConfig(level=logging.INFO) @@ -41,46 +44,64 @@ language_mapping = { "followers-only" : "followers_only", "media removal" : "media_removal", "media_removal" : "media_removal", - "media force-set as sensitive": "nsfw", - "nsfw" : "nsfw", + "media force-set as sensitive": "media_nsfw", + "nsfw" : "media_nsfw", "reject" : "reject", "suspended servers": "reject", "silenced servers" : "silenced", - "removal from \"the whole known network\" timeline": " federated_timeline_removal", + "removal from \"the whole known network\" timeline": "federated_timeline_removal", } -def fetch_blocks(domain: str, nodeinfo_url: str) -> list: - logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url) +def fetch_blocks(domain: str) -> list: + logger.debug("domain='%s' - CALLED!", domain) domain_helper.raise_on(domain) - if not isinstance(nodeinfo_url, str): - raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'") - elif nodeinfo_url == "": - raise ValueError("Parameter 'nodeinfo_url' is empty") + if blacklist.is_blacklisted(domain): + raise Exception(f"domain='{domain}' is blacklisted but function is invoked.") + elif not instances.is_registered(domain): + raise Exception(f"domain='{domain}' is not registered but function is invoked.") + # Init variables blockdict = list() rows = None + try: - logger.debug("Fetching nodeinfo: domain='%s',nodeinfo_url='%s'", domain, nodeinfo_url) - rows = federation.fetch_nodeinfo(domain, nodeinfo_url) + logger.debug("Fetching nodeinfo: domain='%s'", domain) + rows = nodeinfo.fetch(domain, update_mode=False) + + if "error_message" in rows: + logger.warning("Error message '%s' during fetching nodeinfo for domain='%s'", rows["error_message"], domain) + instances.set_last_error(domain, rows) + instances.update(domain) + + logger.debug("Returning empty list ... - EXIT!") + return list() + elif "exception" in rows: + logger.warning("Exception '%s' during fetching nodeinfo for domain='%s' - EXIT!", type(rows["exception"]), domain) + return list() + elif "json" in rows: + logger.debug("rows[json] found for domain='%s'", domain) + rows = rows["json"] + except network.exceptions as exception: logger.warning("Exception '%s' during fetching nodeinfo from domain='%s'", type(exception), domain) instances.set_last_error(domain, exception) + logger.debug("rows[]='%s'", type(rows)) if rows is None: - logger.warning("Could not fetch nodeinfo from domain='%s'", domain) + logger.warning("Could not fetch nodeinfo from domain='%s' - EXIT!", domain) return list() elif "metadata" not in rows: - logger.warning("rows()=%d does not have key 'metadata', domain='%s'", len(rows), domain) + logger.warning("rows()=%d does not have key 'metadata', domain='%s' - EXIT!", len(rows), domain) return list() elif "federation" not in rows["metadata"]: - logger.warning("rows()=%d does not have key 'federation', domain='%s'", len(rows["metadata"]), domain) + logger.warning("rows()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain) return list() - data = rows["metadata"]["federation"] found = False - + data = rows["metadata"]["federation"] logger.debug("data[]='%s'", type(data)) + if "mrf_simple" in data: logger.debug("Found mrf_simple in API response from domain='%s'", domain) found = True @@ -93,7 +114,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: } ).items(): logger.debug("block_level='%s', blocklist()=%d", block_level, len(blocklist)) - block_level = tidyup.domain(block_level) + block_level = tidyup.domain(block_level) if block_level != "" else None logger.debug("block_level='%s' - AFTER!", block_level) if block_level == "": @@ -103,60 +124,55 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: logger.debug("domain='%s' skipping block_level='accept'", domain) continue - block_level = utils.alias_block_level(block_level) + block_level = blocks.alias_block_level(block_level) logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(blocklist), domain, block_level) - if len(blocklist) > 0: - for blocked in blocklist: - logger.debug("blocked='%s' - BEFORE!", blocked) - blocked = tidyup.domain(blocked) - logger.debug("blocked='%s' - AFTER!", blocked) - - if blocked == "": - logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s' - SKIPPED!", domain, block_level) - continue - elif not utils.is_domain_wanted(blocked): - logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) - continue - - logger.debug("Invoking utils.deobfuscate_domain(%s, %s) ...", blocked, domain) - blocked = utils.deobfuscate_domain(blocked, domain) - - logger.debug("blocked='%s' - DEOBFUSCATED!", blocked) - if not utils.is_domain_wanted(blocked): - logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) - continue - - logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level) - blockdict.append({ - "blocker" : domain, - "blocked" : blocked, - "reason" : None, - "block_level": block_level, - }) + for blocked in blocklist: + logger.debug("blocked='%s' - BEFORE!", blocked) + blocked = tidyup.domain(blocked) if blocked != "" else None + logger.debug("blocked='%s' - AFTER!", blocked) + + if blocked in [None, ""]: + logger.warning("blocked='%s' is empty after tidyup.domain(): domain='%s',block_level='%s' - SKIPPED!", blocked, domain, block_level) + continue + elif validators.domain(blocked) and blacklist.is_blacklisted(blocked): + logger.debug("blocked='%s' is blacklisted - SKIPPED!") + continue + + logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain) + blocked = utils.deobfuscate(blocked, domain) + logger.debug("blocked[%s]='%s' - DEOBFUSCATED!", type(blocked), blocked) + + if blocked in [None, ""]: + logger.warning("instance[host]='%s' is None or empty after tidyup.domain() - SKIPPED!", instance["host"]) + continue + elif not domain_helper.is_wanted(blocked): + logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) + continue + + logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level) + blockdict.append({ + "blocker" : domain, + "blocked" : blocked, + "reason" : None, + "block_level": block_level, + }) elif "quarantined_instances" in data: logger.debug("Found 'quarantined_instances' in JSON response: domain='%s'", domain) found = True block_level = "quarantined" + logger.debug("Checking %d quarantined instance(s) ...", len(data["quarantined_instances"])) for blocked in data["quarantined_instances"]: logger.debug("blocked='%s' - BEFORE!", blocked) - blocked = tidyup.domain(blocked) + blocked = tidyup.domain(blocked) if blocked != "" else None logger.debug("blocked='%s' - AFTER!", blocked) - if blocked == "": + if blocked in [None, ""]: logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level) continue - elif not utils.is_domain_wanted(blocked): - logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) - continue - - logger.debug("Invoking utils.deobfuscate_domain(%s, %s) ...", blocked, domain) - blocked = utils.deobfuscate_domain(blocked, domain) - - logger.debug("blocked='%s' - DEOBFUSCATED!", blocked) - if not utils.is_domain_wanted(blocked): + elif not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -185,22 +201,22 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: } ).items(): logger.debug("block_level='%s', info.items()=%d", block_level, len(info.items())) - block_level = tidyup.domain(block_level) + block_level = tidyup.domain(block_level) if block_level != "" else None logger.debug("block_level='%s' - AFTER!", block_level) - if block_level == "": - logger.warning("block_level is now empty!") + if block_level in [None, ""]: + logger.warning("block_level='%s' is now empty!", block_level) continue elif block_level == "accept": logger.debug("domain='%s': Skipping block_level='%s' ...", domain, block_level) continue - block_level = utils.alias_block_level(block_level) + block_level = blocks.alias_block_level(block_level) logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(info.items()), domain, block_level) for blocked, reason in info.items(): logger.debug("blocked='%s',reason[%s]='%s' - BEFORE!", blocked, type(reason), reason) - blocked = tidyup.domain(blocked) + blocked = tidyup.domain(blocked) if blocked != "" else None logger.debug("blocked='%s' - AFTER!", blocked) if isinstance(reason, str): @@ -217,13 +233,6 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: if blocked == "": logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level) continue - elif not utils.is_domain_wanted(blocked): - logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) - continue - - logger.debug("Invoking utils.deobfuscate_domain(%s, %s) ...", blocked, domain) - blocked = utils.deobfuscate_domain(blocked, domain) - logger.debug("blocked='%s' - DEOBFUSCATED!", blocked) logger.debug("Checking %d blockdict records ...", len(blockdict)) for block in blockdict: @@ -241,8 +250,8 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: rows = data["quarantined_instances_info"]["quarantined_instances"] for blocked in rows: logger.debug("blocked='%s' - BEFORE!", blocked) - reason = tidyup.reason(rows[blocked]["reason"]) - blocked = tidyup.domain(blocked) + reason = tidyup.reason(rows[blocked]["reason"]) if rows[blocked]["reason"] != "" else None + blocked = tidyup.domain(blocked) if blocked != "" else None logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason) if blocked not in rows or "reason" not in rows[blocked]: @@ -251,19 +260,8 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: elif blocked == "": logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level) continue - elif not utils.is_domain_wanted(blocked): - logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) - continue - - logger.debug("Invoking utils.deobfuscate_domain(%s, %s) ...", blocked, domain) - blocked = utils.deobfuscate_domain(blocked, domain) - - logger.debug("blocked='%s' - DEOBFUSCATED!", blocked) - if not utils.is_domain_wanted(blocked): - logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) - continue - logger.debug("Checking %d blockdict records ...", len(blockdict)) + logger.debug("Checking %d blockdict record(s) ...", len(blockdict)) for block in blockdict: logger.debug("block[blocked]='%s',blocked='%s'", block["blocked"], blocked) if block["blocked"] == blocked: @@ -272,27 +270,17 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: else: logger.warning("Cannot find 'mrf_simple_info' or 'quarantined_instances_info' in JSON reply: domain='%s'", domain) + logger.debug("found='%s'", found) if not found: logger.debug("Did not find any useable JSON elements, domain='%s', continuing with /about page ...", domain) blocklist = fetch_blocks_from_about(domain) logger.debug("blocklist()=%d", len(blocklist)) if len(blocklist) > 0: - logger.info("Checking %d different blocklists ...", len(blocklist)) + logger.info("Checking %d different blocklist(s) ...", len(blocklist)) for block_level in blocklist: - logger.debug("block_level='%s'", block_level) - rows = blocklist[block_level] - - logger.debug("rows[%s]()=%d'", type(rows), len(rows)) - for block in rows: - logger.debug("Invoking utils.deobfuscate_domain(%s, %s) ...", block["blocked"], domain) - block["blocked"] = utils.deobfuscate_domain(block["blocked"], domain) - - logger.debug("block[blocked]='%s' - DEOBFUSCATED!", block["blocked"]) - if not utils.is_domain_wanted(block["blocked"]): - logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"]) - continue - + logger.debug("Checking blocklist[%s]()=%d entries ...", block_level, blocklist[block_level]) + for block in blocklist[block_level]: logger.debug("Appending blocker='%s',block[blocked]='%s',block[reason]='%s',block_level='%s' ...",domain, block["blocked"], block["reason"], block_level) blockdict.append({ "blocker" : domain, @@ -308,8 +296,15 @@ def fetch_blocks_from_about(domain: str) -> dict: logger.debug("domain='%s' - CALLED!", domain) domain_helper.raise_on(domain) - logger.debug("Fetching mastodon blocks from domain='%s'", domain) + if blacklist.is_blacklisted(domain): + raise Exception(f"domain='{domain}' is blacklisted but function is invoked.") + elif not instances.is_registered(domain): + raise Exception(f"domain='{domain}' is not registered but function is invoked.") + + # Init variables doc = None + + logger.debug("Fetching mastodon blocks from domain='%s'", domain) for path in ["/instance/about/index.html"]: try: # Resetting doc type @@ -349,8 +344,9 @@ def fetch_blocks_from_about(domain: str) -> dict: "filtered_media": [], "followers_only": [], "silenced" : [], - "nsfw" : [], + "media_nsfw" : [], "media_removal" : [], + "federated_timeline_removal": [], } logger.debug("doc[]='%s'", type(doc)) @@ -383,8 +379,20 @@ def fetch_blocks_from_about(domain: str) -> dict: logger.debug("Found block_level='%s', importing domain blocks ...", block_level) for line in header.find_next("table").find_all("tr")[1:]: logger.debug("line[]='%s'", type(line)) - blocked = tidyup.domain(line.find_all("td")[0].text) - reason = tidyup.reason(line.find_all("td")[1].text) + blocked = line.find_all("td")[0].text + reason = line.find_all("td")[1].text + + logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason) + blocked = tidyup.domain(blocked) if blocked != "" else None + reason = tidyup.reason(reason) if reason != "" else None + logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason) + + if blocked in [None, ""]: + logger.debug("domain='%s',block_level='%s': blocked='%s' is empty - SKIPPED!", domain, block_level, blocked) + continue + elif not domain_helper.is_wanted(blocked): + logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) + continue logger.debug("Appending block_level='%s',blocked='%s',reason='%s' ...", block_level, blocked, reason) blocklist[block_level].append({