# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
+import json
import logging
import bs4
-from fba import csrf
-from fba import utils
-
+from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import domain as domain_helper
from fba.helpers import tidyup
+from fba.http import csrf
from fba.http import federation
from fba.http import network
logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)
+# Lemmy translations - localized variants of the "Blocked Instances" heading on
+# Lemmy's /instances page; found headers are matched against these in lower case.
+translations = [
+ "Blocked Instances".lower(),
+ "Instàncies bloquejades".lower(),
+ "Blocáilte Ásc".lower(),
+ "封锁实例".lower(),
+ "Blokované instance".lower(),
+ "Geblokkeerde instanties".lower(),
+ "Blockerade instanser".lower(),
+ "Instàncias blocadas".lower(),
+ "Istanze bloccate".lower(),
+ "Instances bloquées".lower(),
+ "Letiltott példányok".lower(),
+ "Instancias bloqueadas".lower(),
+ "Blokeatuta dauden instantziak".lower(),
+ "차단된 인스턴스".lower(),
+ "Peladen Yang Diblokir".lower(),
+ "Blokerede servere".lower(),
+ "Blokitaj nodoj".lower(),
+ "Блокирани Инстанции".lower(),
+ "Blockierte Instanzen".lower(),
+ "Estetyt instanssit".lower(),
+ "Instâncias bloqueadas".lower(),
+ "Zablokowane instancje".lower(),
+ "Blokované inštancie".lower(),
+ "المثلاء المحجوبون".lower(),
+ "Užblokuoti serveriai".lower(),
+ "ブロックしたインスタンス".lower(),
+ "Блокированные Инстансы".lower(),
+ "Αποκλεισμένοι διακομιστές".lower(),
+ "封鎖站台".lower(),
+ "Instâncias bloqueadas".lower(),
+]
+
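+# Fetches the list of federated peers from the given Lemmy instance.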
def fetch_peers(domain: str, origin: str) -> list:
logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin)
domain_helper.raise_on(domain)
+ if blacklist.is_blacklisted(domain):
+        raise Exception(f"domain='{domain}' is blacklisted but function was invoked.")
+
peers = list()
# No CSRF by default, you don't have to add network.api_headers by yourself here
logger.debug("Checking CSRF for domain='%s'", domain)
headers = csrf.determine(domain, dict())
except network.exceptions as exception:
- logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
+ logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s)", type(exception), __name__)
instances.set_last_error(domain, exception)
+
+ logger.debug("Returning empty list ... - EXIT!")
return list()
try:
logger.debug("Found federated_instances for domain='%s'", domain)
peers = peers + federation.add_peers(data["json"]["federated_instances"])
- logger.debug("Marking domain='%s' as successfully handled ...")
+ logger.debug("Marking domain='%s' as successfully handled ...", domain)
instances.set_success(domain)
if len(peers) == 0:
logger.debug("peers()=%d - EXIT!", len(peers))
return peers
-def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
- logger.debug("domain='%s,nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
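+# Fetches the blocklist from the given Lemmy instance's /instances page, either
+# from the rendered HTML or from the embedded window.isoData script data.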
+def fetch_blocks(domain: str) -> list:
+    logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
- if not isinstance(nodeinfo_url, str):
- raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
- elif nodeinfo_url == "":
- raise ValueError("Parameter 'nodeinfo_url' is empty")
-
- translations = [
- "Blocked Instances".lower(),
- "Instàncies bloquejades".lower(),
- "Blocáilte Ásc".lower(),
- "封锁实例".lower(),
- "Blokované instance".lower(),
- "Geblokkeerde instanties".lower(),
- "Blockerade instanser".lower(),
- "Instàncias blocadas".lower(),
- "Istanze bloccate".lower(),
- "Instances bloquées".lower(),
- "Letiltott példányok".lower(),
- "Instancias bloqueadas".lower(),
- "Blokeatuta dauden instantziak".lower(),
- "차단된 인스턴스".lower(),
- "Peladen Yang Diblokir".lower(),
- "Blokerede servere".lower(),
- "Blokitaj nodoj".lower(),
- "Блокирани Инстанции".lower(),
- "Blockierte Instanzen".lower(),
- "Estetyt instanssit".lower(),
- "Instâncias bloqueadas".lower(),
- "Zablokowane instancje".lower(),
- "Blokované inštancie".lower(),
- "المثلاء المحجوبون".lower(),
- "Užblokuoti serveriai".lower(),
- "ブロックしたインスタンス".lower(),
- "Блокированные Инстансы".lower(),
- "Αποκλεισμένοι διακομιστές".lower(),
- "封鎖站台".lower(),
- "Instâncias bloqueadas".lower(),
- ]
+ if blacklist.is_blacklisted(domain):
+        raise Exception(f"domain='{domain}' is blacklisted but function was invoked.")
+ elif not instances.is_registered(domain):
+        raise Exception(f"domain='{domain}' is not registered but function was invoked.")
blocklist = list()
)
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
- if response.ok and response.status_code < 300 and response.text != "":
+ if response.ok and response.status_code == 200 and response.text != "":
logger.debug("Parsing %s Bytes ...", len(response.text))
doc = bs4.BeautifulSoup(response.text, "html.parser")
logger.debug("doc[]='%s'", type(doc))
- headers = doc.findAll("h5")
found = None
- logger.debug("Search in %d header(s) ...", len(headers))
- for header in headers:
- logger.debug("header[]='%s'", type(header))
- content = header.contents[0]
-
- logger.debug("content[%s]='%s'", type(content), content)
- if content is None:
- logger.debug("domain='%s' has returned empty header='%s' - SKIPPED!", domain, header)
- continue
- elif not isinstance(content, str):
- logger.debug("content[]='%s' is not supported/wanted type 'str' - SKIPPED!", type(content))
- continue
- elif content.lower() in translations:
- logger.debug("Found header with blocked instances - BREAK!")
- found = header
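+    # The blocklist heading sits in different wrapper <div>s depending on the
+    # Lemmy-UI version, so check both known classes for a matching header.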
+ for criteria in [{"class": "home-instances container-lg"}, {"class": "container"}]:
+ logger.debug("criteria='%s'", criteria)
+ containers = doc.findAll("div", criteria)
+
+ logger.debug("Checking %d containers ...", len(containers))
+ for container in containers:
+ logger.debug("container[]='%s'", type(container))
+ for header in container.find_all(["h2", "h3", "h4", "h5"]):
+                logger.debug("header[%s]='%s' - BEFORE!", type(header), header)
+                content = str(header.contents[0]) if len(header.contents) > 0 else None
+ logger.debug("content[%s]='%s' - AFTER!", type(content), content)
+
+ if content is None or content == "":
+ logger.debug("domain='%s' has returned empty header='%s' - SKIPPED!", domain, header)
+ continue
+ elif not isinstance(content, str):
+ logger.debug("content[]='%s' is not supported/wanted type 'str' - SKIPPED!", type(content))
+ continue
+ elif content.lower() in translations:
+ logger.debug("Found header='%s' with blocked instances - BREAK(3) !", header)
+ found = header
+ break
+
+ logger.debug("found[]='%s'", type(found))
+ if found is not None:
+ logger.debug("Found header with blocked instances - BREAK(2) !")
+ break
+
+ logger.debug("found[]='%s'", type(found))
+ if found is not None:
+ logger.debug("Found header with blocked instances - BREAK(1) !")
break
logger.debug("found[]='%s'", type(found))
if found is None:
- logger.debug("domain='%s' is not blocking any instances - EXIT!", domain)
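+        # No localized "Blocked Instances" header found - fall back to the JSON
+        # blocklist embedded in Lemmy-UI's window.isoData <script> tag.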
+ logger.info("domain='%s' has no HTML blocklist, checking scripts ...", domain)
+ peers = parse_script(doc, "blocked")
+
+ logger.debug("domain='%s' has %d peer(s).", domain, len(peers))
+ for blocked in peers:
+ logger.debug("Appending blocker='%s',blocked='%s',block_level='reject' ...", domain, blocked)
+ blocklist.append({
+ "blocker" : domain,
+ "blocked" : blocked,
+ "reason" : None,
+ "block_level": "reject",
+ })
+
+ logger.debug("blocklist()=%d - EXIT!", len(blocklist))
return blocklist
- blocking = found.find_next(["ul","table"]).findAll("a")
+ blocking = found.find_next(["ul", "table"]).findAll("a")
logger.debug("Found %d blocked instance(s) ...", len(blocking))
for tag in blocking:
logger.debug("tag[]='%s'", type(tag))
- blocked = tidyup.domain(tag.contents[0])
+ blocked = tidyup.domain(tag.contents[0]) if tag.contents[0] != "" else None
logger.debug("blocked='%s'", blocked)
- if not utils.is_domain_wanted(blocked):
+ if blocked is None or blocked == "":
+ logger.warning("blocked='%s' is empty after tidyup.domain() - SKIPPED!", tag.contents[0])
+ continue
+ elif not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
- logger.debug("Appending blocker='%s',blocked='%s',block_level='reject'", domain, blocked)
+ logger.debug("Appending blocker='%s',blocked='%s',block_level='reject' ...", domain, blocked)
blocklist.append({
"blocker" : domain,
"blocked" : blocked,
"reason" : None,
"block_level": "reject",
})
+ else:
+ logger.warning("Cannot fetch /instances due to error: response.ok='%s',response.status_code=%d,response.details='%s'", response.ok, response.status_code, response.reason)
+ instances.set_last_error(domain, response)
except network.exceptions as exception:
logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
logger.debug("domain='%s',origin='%s' - CALLED!", domain, origin)
domain_helper.raise_on(domain)
+ if blacklist.is_blacklisted(domain):
+        raise Exception(f"domain='{domain}' is blacklisted but function was invoked.")
+
peers = list()
try:
)
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
- if response.ok and response.status_code < 300 and response.text != "":
+ if response.ok and response.status_code == 200 and response.text != "":
logger.debug("Parsing %s Bytes ...", len(response.text))
doc = bs4.BeautifulSoup(response.text, "html.parser")
logger.debug("doc[]='%s'", type(doc))
- headers = doc.findAll("h5")
- logger.debug("Checking %d headers ...", len(headers))
- for header in headers:
- logger.debug("header[%s]='%s'", type(header), header)
-
- rows = header.find_next(["ul","table"]).findAll("a")
- logger.debug("Found %d blocked instance(s) ...", len(rows))
- for tag in rows:
- logger.debug("tag[]='%s'", type(tag))
- text = tag.contents[0] if isinstance(tag.contents[0], str) else tag.contents[0].text
- peer = tidyup.domain(text)
- logger.debug("peer='%s'", peer)
-
- if peer == "":
- logger.debug("peer is empty - SKIPPED!")
- continue
- elif not utils.is_domain_wanted(peer):
- logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
- continue
- elif peer in peers:
- logger.debug("peer='%s' already added - SKIPPED!", peer)
- continue
-
- logger.debug("Appending peer='%s' ...", peer)
- peers.append(peer)
-
- logger.debug("Marking domain='%s' as successfully handled ...")
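+        # Same two wrapper <div> classes as in fetch_blocks(); here every instance
+        # linked below them is collected as a peer.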
+ for criteria in [{"class": "home-instances container-lg"}, {"class": "container"}]:
+ logger.debug("criteria='%s'", criteria)
+ containers = doc.findAll("div", criteria)
+
+ logger.debug("Checking %d containers ...", len(containers))
+        for container in containers:
+            logger.debug("container[%s]='%s'", type(container), container)
+
+            rows = container.find_next(["ul", "table"]).findAll("a")
+ logger.debug("Found %d instance(s) ...", len(rows))
+ for tag in rows:
+ logger.debug("tag[]='%s'", type(tag))
+ text = tag.contents[0] if isinstance(tag.contents[0], str) else tag.contents[0].text
+ logger.debug("text='%s' - BEFORE!", text)
+
+ peer = tidyup.domain(text) if text != "" else None
+                logger.debug("peer='%s' - AFTER!", peer)
+
+ if peer is None or peer == "":
+ logger.warning("peer='%s' is empty, text='%s' - SKIPPED!", peer, text)
+ continue
+ elif not domain_helper.is_wanted(peer):
+ logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
+ continue
+ elif peer in peers:
+ logger.debug("peer='%s' already added - SKIPPED!", peer)
+ continue
+
+ logger.debug("Appending peer='%s' ...", peer)
+ peers.append(peer)
+
+ logger.debug("peers()=%d", len(peers))
+ if len(peers) == 0:
+ logger.debug("Found no peers for domain='%s', trying script tag ...", domain)
+ peers = parse_script(doc)
+ else:
+ logger.warning("Cannot fetch /instances due to error: response.ok='%s',response.status_code=%d,response.details='%s'", response.ok, response.status_code, response.reason)
+ instances.set_last_error(domain, response)
+
+ logger.debug("Marking domain='%s' as successfully handled, peers()=%d ...", domain, len(peers))
instances.set_success(domain)
except network.exceptions as exception:
logger.debug("peers()=%d - EXIT!", len(peers))
return peers
+
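+# Parses Lemmy-UI's embedded "window.isoData" JSON from the page's <script> tags
+# and returns the domains listed under federated_instances. If 'only' is given
+# (e.g. "blocked"), only that element is evaluated.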
+def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
+    logger.debug("doc[]='%s',only='%s' - CALLED!", type(doc), only)
+
+ if not isinstance(doc, bs4.BeautifulSoup):
+        raise ValueError(f"Parameter doc[]='{type(doc)}' is not of type 'bs4.BeautifulSoup'")
+ elif not isinstance(only, str) and only is not None:
+ raise ValueError(f"Parameter only[]='{type(only)}' is not of type 'str'")
+ elif isinstance(only, str) and only == "":
+ raise ValueError("Parameter 'only' is empty")
+
+ scripts = doc.find_all("script")
+ peers = list()
+
+ logger.debug("scripts()=%d", len(scripts))
+ for script in scripts:
+ logger.debug("script[%s].contents()=%d", type(script), len(script.contents))
+ if len(script.contents) == 0:
+ logger.debug("script has no contents - SKIPPED!")
+ continue
+ elif not script.contents[0].startswith("window.isoData"):
+ logger.debug("script.contents[0]='%s' does not start with window.isoData - SKIPPED!", script.contents[0])
+ continue
+
+ logger.debug("script.contents[0][]='%s'", type(script.contents[0]))
+
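+        # Take the right-hand side of the "window.isoData = ..." assignment and
+        # quote bare "undefined" values so the payload parses as JSON.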
+ iso_data = script.contents[0].split("=")[1].strip().replace(":undefined", ":\"undefined\"")
+ logger.debug("iso_data[%s]='%s'", type(iso_data), iso_data)
+
+ parsed = None
+ try:
+ parsed = json.loads(iso_data)
+ except json.decoder.JSONDecodeError as exception:
+ logger.warning("Exception '%s' during parsing %d Bytes: '%s' - EXIT!", type(exception), len(iso_data), str(exception))
+ return list()
+
+ logger.debug("parsed[%s]()=%d", type(parsed), len(parsed))
+
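+        # Descend routeData -> federatedInstancesResponse -> data -> federated_instances,
+        # skipping scripts that lack the expected structure.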
+ if "routeData" not in parsed:
+ logger.warning("parsed[%s]()=%d does not contain element 'routeData'", type(parsed), len(parsed))
+ continue
+ elif "federatedInstancesResponse" not in parsed["routeData"]:
+ logger.warning("parsed[routeData][%s]()=%d does not contain element 'federatedInstancesResponse'", type(parsed["routeData"]), len(parsed["routeData"]))
+ continue
+ elif "data" not in parsed["routeData"]["federatedInstancesResponse"]:
+ logger.warning("parsed[routeData][federatedInstancesResponse][%s]()=%d does not contain element 'data'", type(parsed["routeData"]["federatedInstancesResponse"]), len(parsed["routeData"]["federatedInstancesResponse"]))
+ continue
+ elif "federated_instances" not in parsed["routeData"]["federatedInstancesResponse"]["data"]:
+            logger.warning("parsed[routeData][federatedInstancesResponse][data][%s]()=%d does not contain element 'federated_instances'", type(parsed["routeData"]["federatedInstancesResponse"]["data"]), len(parsed["routeData"]["federatedInstancesResponse"]["data"]))
+ continue
+
+ data = parsed["routeData"]["federatedInstancesResponse"]["data"]["federated_instances"]
+ logger.debug("Checking %d data elements ...", len(data))
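+        # federated_instances typically holds "linked", "allowed" and "blocked"
+        # lists; 'only' restricts which of these elements are collected.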
+ for element in data:
+ logger.debug("element='%s'", element)
+ if isinstance(only, str) and only != element:
+ logger.debug("Skipping unwanted element='%s',only='%s'", element, only)
+ continue
+
+ logger.debug("Checking data[%s]()=%d row(s) ...", element, len(data[element]))
+ for row in data[element]:
+ logger.debug("row[]='%s'", type(row))
+ if "domain" not in row:
+ logger.warning("row()=%d has no element 'domain' - SKIPPED!", len(row))
+ continue
+
+ logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
+ peer = tidyup.domain(row["domain"])
+ logger.debug("peer='%s' - AFTER!", peer)
+
+ if peer is None or peer == "":
+ logger.warning("peer='%s' is empty, row[domain]='%s' - SKIPPED!", peer, row["domain"])
+ continue
+            elif not domain_helper.is_wanted(peer):
+                logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
+                continue
+ elif peer in peers:
+ logger.debug("peer='%s' already added - SKIPPED!", peer)
+ continue
+
+ logger.debug("Appending peer='%s' ...", peer)
+ peers.append(peer)
+
+ logger.debug("peers()=%d - EXIT!", len(peers))
+ return peers