]> git.mxchange.org Git - fba.git/blob - fba/http/csrf.py
b84d5aaf6c7bb5a36a89158917655c88e6ec7570
[fba.git] / fba / http / csrf.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import logging
18
19 import bs4
20 import reqto
21 import requests
22
23 from fba.helpers import blacklist
24 from fba.helpers import config
25 from fba.helpers import cookies
26 from fba.helpers import domain as domain_helper
27
28 from fba.http import network
29
30 from fba.models import instances
31
32 logging.basicConfig(level=logging.INFO)
33 logger = logging.getLogger(__name__)
34
35 def determine(domain: str, headers: dict) -> dict:
36     logger.debug("domain='%s',headers()=%d - CALLED!", domain, len(headers))
37     domain_helper.raise_on(domain)
38
39     if blacklist.is_blacklisted(domain):
40         raise Exception(f"domain='{domain}' is blacklisted but function is invoked.")
41     elif not isinstance(headers, dict):
42         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
43
44     # Default headers with no CSRF
45     reqheaders = headers
46
47     # Fetch / to check for meta tag indicating csrf
48     logger.debug("Fetching / from domain='%s' for CSRF check ...", domain)
49     response = network.fetch_response(
50         domain,
51         "/",
52         headers=network.web_headers,
53         timeout=(config.get("connection_timeout"), config.get("read_timeout"))
54     )
55
56     logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
57     if response.ok and response.status_code == 200 and response.text.strip() != "" and response.text.find("<html") > 0 and domain_helper.is_in_url(domain, response.url):
58         # Save cookies
59         logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
60         cookies.store(domain, response.cookies.get_dict())
61
62         # Parse text
63         meta = bs4.BeautifulSoup(
64             response.text,
65             "html.parser"
66         )
67         logger.debug("meta[]='%s'", type(meta))
68         tag = meta.find("meta", attrs={"name": "csrf-token"})
69
70         logger.debug("tag[%s]='%s'", type(tag), tag)
71         if tag is not None:
72             logger.debug("Adding CSRF token='%s' for domain='%s'", tag["content"], domain)
73             reqheaders["X-CSRF-Token"] = tag["content"]
74     elif not domain_helper.is_in_url(domain, response.url):
75         logger.warning("domain='%s' doesn't match with response.url='%s', maybe redirect to other domain?", domain, response.url)
76
77         message = f"Redirect from domain='{domain}' to response.url='{response.url}'"
78         instances.set_last_error(domain, message)
79         raise requests.exceptions.TooManyRedirects(message)
80
81     logger.debug("reqheaders()=%d - EXIT!", len(reqheaders))
82     return reqheaders