X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Fnetworks%2Flemmy.py;h=bfe1e805f0eed78685d3fa7669de0bb966aca2ac;hb=9584067c49bcf277009aaf41f52c19f6425fd9ec;hp=c0b6afec27b2e685b8e526757241eed95acc9ae3;hpb=6561b3911cb701d653a39b474d9551297facc176;p=fba.git diff --git a/fba/networks/lemmy.py b/fba/networks/lemmy.py index c0b6afe..bfe1e80 100644 --- a/fba/networks/lemmy.py +++ b/fba/networks/lemmy.py @@ -19,6 +19,7 @@ import logging import bs4 +from fba.helpers import blacklist from fba.helpers import config from fba.helpers import domain as domain_helper from fba.helpers import tidyup @@ -33,10 +34,47 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) #logger.setLevel(logging.DEBUG) +# Lemmy translations +translations = [ + "Blocked Instances".lower(), + "Instàncies bloquejades".lower(), + "Blocáilte Ásc".lower(), + "封锁实例".lower(), + "Blokované instance".lower(), + "Geblokkeerde instanties".lower(), + "Blockerade instanser".lower(), + "Instàncias blocadas".lower(), + "Istanze bloccate".lower(), + "Instances bloquées".lower(), + "Letiltott példányok".lower(), + "Instancias bloqueadas".lower(), + "Blokeatuta dauden instantziak".lower(), + "차단된 인스턴스".lower(), + "Peladen Yang Diblokir".lower(), + "Blokerede servere".lower(), + "Blokitaj nodoj".lower(), + "Блокирани Инстанции".lower(), + "Blockierte Instanzen".lower(), + "Estetyt instanssit".lower(), + "Instâncias bloqueadas".lower(), + "Zablokowane instancje".lower(), + "Blokované inštancie".lower(), + "المثلاء المحجوبون".lower(), + "Užblokuoti serveriai".lower(), + "ブロックしたインスタンス".lower(), + "Блокированные Инстансы".lower(), + "Αποκλεισμένοι διακομιστές".lower(), + "封鎖站台".lower(), + "Instâncias bloqueadas".lower(), +] + def fetch_peers(domain: str, origin: str) -> list: logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin) domain_helper.raise_on(domain) + if blacklist.is_blacklisted(domain): + raise Exception(f"domain='{domain}' is blacklisted but function is invoked.") + peers = list() # No CSRF by default, you don't have to add network.api_headers by yourself here @@ -87,38 +125,10 @@ def fetch_blocks(domain: str) -> list: logger.debug("domain='%s - CALLED!", domain) domain_helper.raise_on(domain) - translations = [ - "Blocked Instances".lower(), - "Instàncies bloquejades".lower(), - "Blocáilte Ásc".lower(), - "封锁实例".lower(), - "Blokované instance".lower(), - "Geblokkeerde instanties".lower(), - "Blockerade instanser".lower(), - "Instàncias blocadas".lower(), - "Istanze bloccate".lower(), - "Instances bloquées".lower(), - "Letiltott példányok".lower(), - "Instancias bloqueadas".lower(), - "Blokeatuta dauden instantziak".lower(), - "차단된 인스턴스".lower(), - "Peladen Yang Diblokir".lower(), - "Blokerede servere".lower(), - "Blokitaj nodoj".lower(), - "Блокирани Инстанции".lower(), - "Blockierte Instanzen".lower(), - "Estetyt instanssit".lower(), - "Instâncias bloqueadas".lower(), - "Zablokowane instancje".lower(), - "Blokované inštancie".lower(), - "المثلاء المحجوبون".lower(), - "Užblokuoti serveriai".lower(), - "ブロックしたインスタンス".lower(), - "Блокированные Инстансы".lower(), - "Αποκλεισμένοι διακομιστές".lower(), - "封鎖站台".lower(), - "Instâncias bloqueadas".lower(), - ] + if blacklist.is_blacklisted(domain): + raise Exception(f"domain='{domain}' is blacklisted but function is invoked.") + elif not instances.is_registered(domain): + raise Exception(f"domain='{domain}' is not registered but function is invoked.") blocklist = list() @@ -154,7 +164,7 @@ def fetch_blocks(domain: str) -> list: content = str(header.contents[0]) logger.debug("content[%s]='%s' - AFTER!", type(content), content) - if content is None: + if content is None or content == "": logger.debug("domain='%s' has returned empty header='%s' - SKIPPED!", domain, header) continue elif not isinstance(content, str): @@ -197,12 +207,15 @@ def fetch_blocks(domain: str) -> list: logger.debug("Found %d blocked instance(s) ...", len(blocking)) for tag in blocking: logger.debug("tag[]='%s'", type(tag)) - blocked = tidyup.domain(tag.contents[0]) + blocked = tidyup.domain(tag.contents[0]) if tag.contents[0] != "" else None logger.debug("blocked='%s'", blocked) - if blocked == "": + if blocked is None or blocked == "": logger.warning("blocked='%s' is empty after tidyup.domain() - SKIPPED!", tag.contents[0]) continue + elif not domain_helper.is_wanted(blocked): + logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) + continue logger.debug("Appending blocker='%s',blocked='%s',block_level='reject' ...", domain, blocked) blocklist.append({ @@ -226,6 +239,9 @@ def fetch_instances(domain: str, origin: str) -> list: logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin) domain_helper.raise_on(domain) + if blacklist.is_blacklisted(domain): + raise Exception(f"domain='{domain}' is blacklisted but function is invoked.") + peers = list() try: @@ -258,11 +274,16 @@ def fetch_instances(domain: str, origin: str) -> list: for tag in rows: logger.debug("tag[]='%s'", type(tag)) text = tag.contents[0] if isinstance(tag.contents[0], str) else tag.contents[0].text - peer = tidyup.domain(text) - logger.debug("peer='%s'", peer) + logger.debug("text='%s' - BEFORE!", text) - if peer == "": - logger.debug("peer is empty - SKIPPED!") + peer = tidyup.domain(text) if text != "" else None + logger.debug("peer='%s' - AFTER", peer) + + if peer is None or peer == "": + logger.warning("peer='%s' is empty, text='%s' - SKIPPED!", peer, text) + continue + elif not domain_helper.is_wanted(peer): + logger.debug("peer='%s' is not wanted - SKIPPED!", peer) continue elif peer in peers: logger.debug("peer='%s' already added - SKIPPED!", peer) @@ -291,6 +312,7 @@ def fetch_instances(domain: str, origin: str) -> list: def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list: logger.debug("doc[]='%s',only='%s' - CALLED!") + if not isinstance(doc, bs4.BeautifulSoup): raise ValueError(f"Parameter doc[]='{type(only)}' is not of type 'bs4.BeautifulSoup'") elif not isinstance(only, str) and only is not None: @@ -357,9 +379,11 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list: peer = tidyup.domain(row["domain"]) logger.debug("peer='%s' - AFTER!", peer) - if peer == "": - logger.debug("peer is empty - SKIPPED!") + if peer is None or peer == "": + logger.warning("peer='%s' is empty, row[domain]='%s' - SKIPPED!", peer, row["domain"]) continue + elif not domain_helper.is_wanted(peer): + logger.debug("peer='%s' is not wanted - SKIPPED!", peer) elif peer in peers: logger.debug("peer='%s' already added - SKIPPED!", peer) continue