X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;ds=sidebyside;f=fba%2Fnetworks%2Fmastodon.py;h=be22e94f9eba52f0d413ad2503733fc3e3623310;hb=7ce34f60c519ff20cc76b2030a91a0cefde3062b;hp=fcfbc6e4f659a839214dc079b982a8ee492f2f9f;hpb=e1be335657c92533caffa48a18f4d458150007df;p=fba.git diff --git a/fba/networks/mastodon.py b/fba/networks/mastodon.py index fcfbc6e..be22e94 100644 --- a/fba/networks/mastodon.py +++ b/fba/networks/mastodon.py @@ -15,17 +15,18 @@ # along with this program. If not, see . import logging +import validators import bs4 -from fba import csrf - +from fba.helpers import blacklist from fba.helpers import config from fba.helpers import domain as domain_helper from fba.helpers import tidyup from fba.http import network +from fba.models import blocks from fba.models import instances logging.basicConfig(level=logging.INFO) @@ -62,7 +63,12 @@ def fetch_blocks_from_about(domain: str) -> dict: logger.debug("domain='%s' - CALLED!", domain) domain_helper.raise_on(domain) - logger.debug("Fetching mastodon blocks from domain='%s'", domain) + if blacklist.is_blacklisted(domain): + raise Exception(f"domain='{domain}' is blacklisted but function is invoked.") + elif not instances.is_registered(domain): + raise Exception(f"domain='{domain}' is not registered but function is invoked.") + + logger.info("Fetching mastodon blocks from domain='%s'", domain) doc = None for path in ["/about/more", "/about"]: try: @@ -111,10 +117,24 @@ def fetch_blocks_from_about(domain: str) -> dict: if header_text in blocklist or header_text.lower() in blocklist: # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu for line in header.find_all_next("table")[0].find_all("tr")[1:]: + domain = line.find("span").text + digest = line.find("span")["title"][9:] + reason = line.find_all("td")[1].text + + logger.debug("domain='%s',reason='%s' - BEFORE!", domain, reason) + domain = tidyup.domain(domain) if domain != "" else None + reason = tidyup.reason(reason) if reason != "" else None + + logger.debug("domain='%s',reason='%s' - AFTER!", domain, reason) + if domain is None or domain == "": + logger.warning("domain='%s' is empty,line='%s' - SKIPPED!", domain, line) + continue + + logger.debug("Appending domain='%s',digest='%s',reason='%s' to blocklist header_text='%s' ...", domain, digest, reason, blocklist) blocklist[header_text].append({ - "domain": tidyup.domain(line.find("span").text), - "hash" : tidyup.domain(line.find("span")["title"][9:]), - "reason": tidyup.reason(line.find_all("td")[1].text), + "domain": domain, + "digest": digest, + "reason": reason, }) else: logger.warning("header_text='%s' not found in blocklist()=%d", header_text, len(blocklist)) @@ -126,84 +146,58 @@ def fetch_blocks_from_about(domain: str) -> dict: "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"], } -def fetch_blocks(domain: str, nodeinfo_url: str) -> list: - logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url) +def fetch_blocks(domain: str) -> list: + logger.debug("domain='%s' - CALLED!", domain) domain_helper.raise_on(domain) - if not isinstance(nodeinfo_url, str): - raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'") - elif nodeinfo_url == "": - raise ValueError("Parameter 'nodeinfo_url' is empty") + if blacklist.is_blacklisted(domain): + raise Exception(f"domain='{domain}' is blacklisted but function is invoked.") + elif not instances.is_registered(domain): + raise Exception(f"domain='{domain}' is not registered but function is invoked.") - # Init block list blocklist = list() - # No CSRF by default, you don't have to add network.api_headers by yourself here - headers = tuple() - - try: - logger.debug("Checking CSRF for domain='%s'", domain) - headers = csrf.determine(domain, dict()) - except network.exceptions as exception: - logger.warning("Exception '%s' during checking CSRF (fetch_blocks,%s) - EXIT!", type(exception), __name__) - instances.set_last_error(domain, exception) - return blocklist - - try: - # json endpoint for newer mastodongs - logger.debug("Querying API domain_blocks: domain='%s'", domain) - data = network.get_json_api( - domain, - "/api/v1/instance/domain_blocks", - headers, - (config.get("connection_timeout"), config.get("read_timeout")) - ) - - logger.debug("data[]='%s'", type(data)) - if "error_message" in data: - logger.debug("Was not able to fetch domain_blocks from domain='%s': status_code=%d,error_message='%s'", domain, data['status_code'], data['error_message']) - instances.set_last_error(domain, data) - return blocklist - elif "json" in data and "error" in data["json"]: - logger.warning("JSON API returned error message: '%s'", data['json']['error']) - instances.set_last_error(domain, data) - return blocklist - else: - # Getting blocklist - rows = data["json"] - - logger.debug("Marking domain='%s' as successfully handled ...") - instances.set_success(domain) - - if len(rows) == 0: - logger.debug("domain='%s' has returned zero rows, trying /about/more page ...", domain) - rows = fetch_blocks_from_about(domain) - - if len(rows) > 0: - logger.debug("Checking %d entries from domain='%s' ...", len(rows), domain) - for block in rows: - # Check type - logger.debug("block[]='%s'", type(block)) - if not isinstance(block, dict): - logger.debug("block[]='%s' is of type 'dict' - SKIPPED!", type(block)) - continue - - reason = tidyup.reason(block["comment"]) if "comment" in block and block["comment"] is not None and block["comment"] != "" else None - - logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s'", domain, block["domain"], reason, block["severity"]) - blocklist.append({ - "blocker" : domain, - "blocked" : block["domain"], - "hash" : block["digest"], - "reason" : reason, - "block_level": block["severity"] - }) - else: - logger.debug("domain='%s' has no block list") - - except network.exceptions as exception: - logger.warning("domain='%s',exception[%s]='%s'", domain, type(exception), str(exception)) - instances.set_last_error(domain, exception) + logger.debug("Invoking fetch_blocks_from_about(%s) ...", domain) + rows = fetch_blocks_from_about(domain) + + logger.debug("rows[%s]()=%d", type(rows), len(rows)) + if len(rows) > 0: + logger.debug("Checking %d entries from domain='%s' ...", len(rows), domain) + for block in rows: + # Check type + logger.debug("block[]='%s'", type(block)) + if not isinstance(block, dict): + logger.debug("block[]='%s' is of type 'dict' - SKIPPED!", type(block)) + continue + elif "domain" not in block: + logger.debug("block='%s'", block) + logger.warning("block()=%d does not contain element 'domain' - SKIPPED!", len(block)) + continue + elif not domain_helper.is_wanted(block["domain"]): + logger.debug("block[domain]='%s' is not wanted - SKIPPED!", block["domain"]) + continue + elif "severity" not in block: + logger.warning("block()=%d does not contain element 'severity' - SKIPPED!", len(block)) + continue + elif block["severity"] in ["accept", "accepted"]: + logger.debug("block[domain]='%s' has unwanted severity level '%s' - SKIPPED!", block["domain"], block["severity"]) + continue + elif "digest" in block and not validators.hashes.sha256(block["digest"]): + logger.warning("block[domain]='%s' has invalid block[digest]='%s' - SKIPPED!", block["domain"], block["digest"]) + continue + + reason = tidyup.reason(block["comment"]) if "comment" in block and block["comment"] is not None and block["comment"] != "" else None + + logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s'", domain, block["domain"], reason, block["severity"]) + blocklist.append({ + "blocker" : domain, + "blocked" : block["domain"], + "digest" : block["digest"] if "digest" in block else None, + "reason" : reason, + "block_level": blocks.alias_block_level(block["severity"]), + }) + else: + logger.debug("domain='%s' has no block list", domain) logger.debug("blocklist()=%d - EXIT!", len(blocklist)) return blocklist