import bs4
+from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import domain as domain_helper
from fba.helpers import tidyup
logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)
+# Lemmy translations
+translations = [
+ "Blocked Instances".lower(),
+ "Instàncies bloquejades".lower(),
+ "Blocáilte Ásc".lower(),
+ "封锁实例".lower(),
+ "Blokované instance".lower(),
+ "Geblokkeerde instanties".lower(),
+ "Blockerade instanser".lower(),
+ "Instàncias blocadas".lower(),
+ "Istanze bloccate".lower(),
+ "Instances bloquées".lower(),
+ "Letiltott példányok".lower(),
+ "Instancias bloqueadas".lower(),
+ "Blokeatuta dauden instantziak".lower(),
+ "차단된 인스턴스".lower(),
+ "Peladen Yang Diblokir".lower(),
+ "Blokerede servere".lower(),
+ "Blokitaj nodoj".lower(),
+ "Блокирани Инстанции".lower(),
+ "Blockierte Instanzen".lower(),
+ "Estetyt instanssit".lower(),
+ "Instâncias bloqueadas".lower(),
+ "Zablokowane instancje".lower(),
+ "Blokované inštancie".lower(),
+ "المثلاء المحجوبون".lower(),
+ "Užblokuoti serveriai".lower(),
+ "ブロックしたインスタンス".lower(),
+ "Блокированные Инстансы".lower(),
+ "Αποκλεισμένοι διακομιστές".lower(),
+ "封鎖站台".lower(),
+ "Instâncias bloqueadas".lower(),
+]
+
def fetch_peers(domain: str, origin: str) -> list:
logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin)
domain_helper.raise_on(domain)
+ if blacklist.is_blacklisted(domain):
+ raise Exception(f"domain='{domain}' is blacklisted but function is invoked.")
+
peers = list()
# No CSRF by default, you don't have to add network.api_headers by yourself here
logger.debug("domain='%s - CALLED!", domain)
domain_helper.raise_on(domain)
- translations = [
- "Blocked Instances".lower(),
- "Instàncies bloquejades".lower(),
- "Blocáilte Ásc".lower(),
- "封锁实例".lower(),
- "Blokované instance".lower(),
- "Geblokkeerde instanties".lower(),
- "Blockerade instanser".lower(),
- "Instàncias blocadas".lower(),
- "Istanze bloccate".lower(),
- "Instances bloquées".lower(),
- "Letiltott példányok".lower(),
- "Instancias bloqueadas".lower(),
- "Blokeatuta dauden instantziak".lower(),
- "차단된 인스턴스".lower(),
- "Peladen Yang Diblokir".lower(),
- "Blokerede servere".lower(),
- "Blokitaj nodoj".lower(),
- "Блокирани Инстанции".lower(),
- "Blockierte Instanzen".lower(),
- "Estetyt instanssit".lower(),
- "Instâncias bloqueadas".lower(),
- "Zablokowane instancje".lower(),
- "Blokované inštancie".lower(),
- "المثلاء المحجوبون".lower(),
- "Užblokuoti serveriai".lower(),
- "ブロックしたインスタンス".lower(),
- "Блокированные Инстансы".lower(),
- "Αποκλεισμένοι διακομιστές".lower(),
- "封鎖站台".lower(),
- "Instâncias bloqueadas".lower(),
- ]
+ if blacklist.is_blacklisted(domain):
+ raise Exception(f"domain='{domain}' is blacklisted but function is invoked.")
+ elif not instances.is_registered(domain):
+ raise Exception(f"domain='{domain}' is not registered but function is invoked.")
blocklist = list()
content = str(header.contents[0])
logger.debug("content[%s]='%s' - AFTER!", type(content), content)
- if content is None:
+ if content is None or content == "":
logger.debug("domain='%s' has returned empty header='%s' - SKIPPED!", domain, header)
continue
elif not isinstance(content, str):
logger.debug("Found %d blocked instance(s) ...", len(blocking))
for tag in blocking:
logger.debug("tag[]='%s'", type(tag))
- blocked = tidyup.domain(tag.contents[0])
+ blocked = tidyup.domain(tag.contents[0]) if tag.contents[0] != "" else None
logger.debug("blocked='%s'", blocked)
- if blocked == "":
+ if blocked is None or blocked == "":
logger.warning("blocked='%s' is empty after tidyup.domain() - SKIPPED!", tag.contents[0])
continue
+ elif not domain_helper.is_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
+ continue
logger.debug("Appending blocker='%s',blocked='%s',block_level='reject' ...", domain, blocked)
blocklist.append({
logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin)
domain_helper.raise_on(domain)
+ if blacklist.is_blacklisted(domain):
+ raise Exception(f"domain='{domain}' is blacklisted but function is invoked.")
+
peers = list()
try:
for tag in rows:
logger.debug("tag[]='%s'", type(tag))
text = tag.contents[0] if isinstance(tag.contents[0], str) else tag.contents[0].text
- peer = tidyup.domain(text)
- logger.debug("peer='%s'", peer)
+ logger.debug("text='%s' - BEFORE!", text)
- if peer == "":
- logger.debug("peer is empty - SKIPPED!")
+ peer = tidyup.domain(text) if text != "" else None
+ logger.debug("peer='%s' - AFTER", peer)
+
+ if peer is None or peer == "":
+ logger.warning("peer='%s' is empty, text='%s' - SKIPPED!", peer, text)
+ continue
+ elif not domain_helper.is_wanted(peer):
+ logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
continue
elif peer in peers:
logger.debug("peer='%s' already added - SKIPPED!", peer)
def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
logger.debug("doc[]='%s',only='%s' - CALLED!")
+
if not isinstance(doc, bs4.BeautifulSoup):
raise ValueError(f"Parameter doc[]='{type(only)}' is not of type 'bs4.BeautifulSoup'")
elif not isinstance(only, str) and only is not None:
peer = tidyup.domain(row["domain"])
logger.debug("peer='%s' - AFTER!", peer)
- if peer == "":
- logger.debug("peer is empty - SKIPPED!")
+ if peer is None or peer == "":
+ logger.warning("peer='%s' is empty, row[domain]='%s' - SKIPPED!", peer, row["domain"])
continue
+ elif not domain_helper.is_wanted(peer):
+ logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
elif peer in peers:
logger.debug("peer='%s' already added - SKIPPED!", peer)
continue