# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-from fba import config
-from fba import federation
-from fba import instances
-from fba import network
+import inspect
+import logging
+
+import bs4
+
+from fba import csrf
+from fba import database
+from fba import utils
+
+from fba.helpers import config
+from fba.helpers import domain as domain_helper
+from fba.helpers import tidyup
+
+from fba.http import federation
+from fba.http import network
+
+from fba.models import blocks
+from fba.models import instances
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
def fetch_peers(domain: str) -> list:
- # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},software='lemmy' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ domain_helper.raise_on(domain)
peers = list()
+
+ # No CSRF by default, you don't have to add network.api_headers by yourself here
+ headers = tuple()
+
try:
- # DEBUG: print(f"DEBUG: domain='{domain}' is Lemmy, fetching JSON ...")
- response = network.fetch_response(
+ logger.debug("Checking CSRF for domain='%s'", domain)
+ headers = csrf.determine(domain, dict())
+ except network.exceptions as exception:
+ logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
+ instances.set_last_error(domain, exception)
+ return peers
+
+ try:
+ logger.debug("Fetching '/api/v3/site' from domain='%s' ...", domain)
+ data = network.get_json_api(
domain,
"/api/v3/site",
- network.api_headers,
+ headers,
(config.get("connection_timeout"), config.get("read_timeout"))
)
- data = network.json_from_response(response)
-
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',data[]='{type(data)}'")
- if not response.ok or response.status_code >= 400:
- print("WARNING: Could not reach any JSON API:", domain)
- instances.update_last_error(domain, response)
- elif response.ok and isinstance(data, list):
- print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
- elif "federated_instances" in data:
- # DEBUG: print(f"DEBUG: Found federated_instances for domain='{domain}'")
- peers = peers + federation.add_peers(data["federated_instances"])
- # DEBUG: print("DEBUG: Added instance(s) to peers")
+ logger.debug("data[]='%s'", type(data))
+ if "error_message" in data:
+ logger.warning("Could not reach any JSON API: domain='%s'", domain)
+ instances.set_last_error(domain, data)
+ elif "federated_instances" in data["json"] and isinstance(data["json"]["federated_instances"], dict):
+ logger.debug("Found federated_instances for domain='%s'", domain)
+ peers = peers + federation.add_peers(data["json"]["federated_instances"])
+ logger.debug("Added instance(s) to peers")
else:
- print("WARNING: JSON response does not contain 'federated_instances':", domain)
- instances.update_last_error(domain, response)
-
- except BaseException as exception:
- print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ logger.warning("JSON response does not contain 'federated_instances', domain='%s'", domain)
+ instances.set_last_error(domain, data)
- # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instances.set_data("total_peers", domain, len(peers))
+ except network.exceptions as exception:
+ logger.warning("Exception during fetching JSON: domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
+ instances.set_last_error(domain, exception)
- # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
- instances.update_last_instance_fetch(domain)
+ logger.debug("Adding %d for domain='%s'", len(peers), domain)
+ instances.set_total_peers(domain, peers)
- # DEBUG: print("DEBUG: Returning peers[]:", type(peers))
+ logger.debug("peers()=%d - EXIT!", len(peers))
return peers
+
+def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
+    logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(nodeinfo_url, str):
+ raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
+ elif nodeinfo_url == "":
+ raise ValueError("Parameter 'nodeinfo_url' is empty")
+
+ translations = [
+ "Blocked Instances",
+ "Instàncies bloquejades",
+ "Blocáilte Ásc",
+ "封锁实例",
+ "Blokované instance",
+ "Geblokkeerde instanties",
+ "Blockerade instanser",
+ "Instàncias blocadas",
+ "Istanze bloccate",
+ "Instances bloquées",
+ "Letiltott példányok",
+ "Instancias bloqueadas",
+ "Blokeatuta dauden instantziak",
+ "차단된 인스턴스",
+ "Peladen Yang Diblokir",
+ "Blokerede servere",
+ "Blokitaj nodoj",
+ "Блокирани Инстанции",
+ "Blockierte Instanzen",
+ "Estetyt instanssit",
+ "Instâncias bloqueadas",
+ "Zablokowane instancje",
+ "Blokované inštancie",
+ "المثلاء المحجوبون",
+ "Užblokuoti serveriai",
+ "ブロックしたインスタンス",
+ "Блокированные Инстансы",
+ "Αποκλεισμένοι διακομιστές",
+ "封鎖站台",
+ "Instâncias bloqueadas",
+ ]
+
+ blocklist = list()
+
+ try:
+        # json endpoint for newer mastodons
+ logger.debug("Fetching /instances from domain='%s'", domain)
+ response = network.fetch_response(
+ domain,
+ "/instances",
+ network.web_headers,
+ (config.get("connection_timeout"), config.get("read_timeout"))
+ )
+
+ logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
+ if response.ok and response.status_code < 300 and response.text != "":
+ logger.debug("Parsing %s Bytes ...", len(response.text))
+
+ doc = bs4.BeautifulSoup(response.text, "html.parser")
+ logger.debug("doc[]='%s'", type(doc))
+
+ headers = doc.findAll("h5")
+ found = None
+ logger.debug("Search in %d header(s) ...", len(headers))
+ for header in headers:
+ logger.debug("header[]='%s'", type(header))
+ content = header.contents[0]
+
+ logger.debug("content[%s]='%s'", type(content), content)
+ if content in translations:
+ logger.debug("Found header with blocked instances - BREAK!")
+ found = header
+ break
+
+ logger.debug("found[]='%s'", type(found))
+ if found is None:
+ logger.debug("domain='%s' is not blocking any instances - EXIT!", domain)
+ return blocklist
+
+ blocking = found.find_next("ul").findAll("a")
+ logger.debug("Found %d blocked instance(s) ...", len(blocking))
+ for tag in blocking:
+ logger.debug("tag[]='%s'", type(tag))
+ blocked = tidyup.domain(tag.contents[0])
+ logger.debug("blocked='%s'", blocked)
+
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
+ continue
+
+ logger.debug("Appending blocker='%s',blocked='%s',block_level='reject'", domain, blocked)
+ blocklist.append({
+ "blocker" : domain,
+ "blocked" : blocked,
+ "reason" : None,
+ "block_level": "reject",
+ })
+
+ except network.exceptions as exception:
+ logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
+ instances.set_last_error(domain, exception)
+
+ logger.debug("blocklist()=%d - EXIT!", len(blocklist))
+ return blocklist