import logging
import numpy
import time
-
-from urllib.parse import urlparse
+import urllib.parse
import argparse
import atoma
logger.debug("Invoking locking.acquire() ...")
locking.acquire()
- components = urlparse(args.feed)
- domain = components.netloc.lower().split(":")[0]
+ components = urllib.parse.urlparse(args.feed)
+ hostname = components.netloc.lower().split(":")[0]
- logger.debug("domain='%s'", domain)
- if sources.is_recent(domain):
- logger.info("API from domain='%s' has recently being accessed - EXIT!", domain)
+ logger.debug("hostname='%s'", hostname)
+ if sources.is_recent(hostname):
+            logger.info("API from hostname='%s' has recently been accessed - EXIT!", hostname)
return 0
else:
- logger.debug("domain='%s' has not been recently used, marking ...", domain)
- sources.update(domain)
+ logger.debug("hostname='%s' has not been recently used, marking ...", hostname)
+ sources.update(hostname)
logger.info("Fetch FBA-specific RSS args.feed='%s' ...", args.feed)
response = network.fetch_url(
for item in rss.items:
logger.debug("item[%s]='%s'", type(item), item)
domain = item.link.split("=")[1]
+
+ logger.debug("domain='%s',tidyup - BEFORE!", domain)
domain = tidyup.domain(domain) if domain not in[None, ""] else None
- logger.debug("domain='%s' - AFTER!", domain)
+ logger.debug("domain='%s',tidyup - AFTER!", domain)
if domain in [None, ""]:
logger.debug("domain[%s]='%s' is empty after tidyup.domain() - SKIPPED!", type(domain), domain)
continue
+ elif not domain_helper.is_wanted(domain):
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
+ continue
- logger.debug("domain='%s' - BEFORE!", domain)
+ logger.debug("domain='%s',idna - BEFORE!", domain)
domain = domain_helper.encode_idna(domain)
- logger.debug("domain='%s' - AFTER!", domain)
+ logger.debug("domain='%s',idna - AFTER!", domain)
if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
if args.feed is not None and validators.url(args.feed):
logger.debug("Setting feed='%s' ...", args.feed)
feed = str(args.feed)
- source_domain = urlparse(args.feed).netloc
+ source_domain = urllib.parse.urlparse(args.feed).netloc
if sources.is_recent(source_domain):
logger.info("API from source_domain='%s' has recently being accessed - EXIT!", source_domain)
continue
logger.debug("row[url]='%s' - BEFORE!", row["url"])
- domain = urlparse(row["url"]).netloc.lower().split(":")[0]
+ domain = urllib.parse.urlparse(row["url"]).netloc.lower().split(":")[0]
logger.debug("domain='%s' - AFTER!", domain)
if domain in [None, ""]:
logger.debug("Invoking locking.acquire() ...")
locking.acquire()
+ # Init domain list
+ domains = []
+
if args.domain not in [None, ""]:
logger.debug("Fetching instances record for args.domain='%s' ...", args.domain)
database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE software IN ('activityrelay', 'aoderelay', 'selective-relay', 'pub-relay') AND domain = ? LIMIT 1", [args.domain])
logger.debug("Fetch all relay instances ...")
database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE software IN ('activityrelay', 'aoderelay', 'selective-relay', 'pub-relay') AND nodeinfo_url IS NOT NULL ORDER BY last_updated DESC")
- domains = []
rows = database.cursor.fetchall()
logger.info("Checking %d relays ...", len(rows))
logger.debug("row[domain]='%s' has recently been fetched - SKIPPED!", row["domain"])
continue
+ # Init variables
peers = []
+ doc = None
+
try:
logger.debug("row[domain]='%s',row[software]='%s' - checking ....", row["domain"], row["software"])
if row["software"] == "pub-relay":
logger.warning("tag[%s]='%s' is not type of 'bs4.element.Tag' - SKIPPED!", type(tag), tag)
continue
- components = urlparse(link.get("href"))
+ components = urllib.parse.urlparse(link.get("href"))
logger.debug("components(%d)='%s'", len(components), components)
domain = components.netloc.lower().split(":")[0]
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
+import urllib.parse
from functools import lru_cache
-from urllib.parse import urlparse
import validators
punycode = encode_idna(domain)
logger.debug("punycode='%s'", punycode)
- components = urlparse(url)
+ components = urllib.parse.urlparse(url)
logger.debug("components[]='%s',punycode='%s'", type(components), punycode)
is_found = (punycode in [components.netloc, components.hostname])
logger.debug("punycode='%s' - AFTER!", punycode)
if "/" in punycode:
- components = urlparse("https://" + punycode)
+ components = urllib.parse.urlparse("https://" + punycode)
logger.debug("components[%s](%d)='%s'", type(components), len(components), components)
punycode = components.netloc.encode("idna").decode("utf-8") + components.path
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
-
-from urllib.parse import urlparse
+import urllib.parse
import bs4
import requests
logger.warning("instance[url]='%s' is not a valid URL - SKIPPED!", instance["url"])
continue
- components = urlparse(instance["url"])
+ components = urllib.parse.urlparse(instance["url"])
logger.debug("components[%s]()=%d", type(components), len(components))
instance = components.netloc.lower().split(":")[0]
elif not domain_helper.is_in_url(domain, response_url):
logger.warning("domain='%s' doesn't match response.url='%s', maybe redirect to other domain?", domain, response.url)
- components = urlparse(response.url)
+ components = urllib.parse.urlparse(response.url)
domain2 = components.netloc.lower().split(":")[0]
logger.debug("domain2='%s'", domain2)
import csv
import logging
import time
-
-from urllib.parse import urlparse
+import urllib.parse
import eventlet
import reqto
raise TypeError(f"Parameter allow_redirects[]='{type(allow_redirects)}' has not expected type 'bool'")
logger.debug("Parsing url='%s' ...", url)
- components = urlparse(url)
+ components = urllib.parse.urlparse(url)
# Invoke other function, avoid trailing ?
logger.debug("components[%s]='%s'", type(components), components)
logger.debug("Fetching path='%s' from netloc='%s' ...", components.path, components.netloc)
response = _fetch_response(
components.netloc.split(":")[0],
- components.path if isinstance(components.path, str) and components.path != '' else '/',
+ components.path if isinstance(components.path, str) and components.path != "" else "/",
headers=headers,
timeout=timeout,
allow_redirects=allow_redirects
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
+import urllib.parse
import validators
-from urllib.parse import urlparse
-
from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import domain as domain_helper
url = link["href"].lower()
logger.debug("Parsing url='%s' ...", url)
- components = urlparse(url)
+ components = urllib.parse.urlparse(url)
logger.debug("components[%s]='%s'", type(components), components)
if components.scheme == "" and components.netloc == "":
logger.warning("link[href]='%s' has no scheme and host name in it, prepending from domain='%s'", link['href'], domain)
url = f"https://{domain}{url}"
- components = urlparse(url)
+ components = urllib.parse.urlparse(url)
elif components.netloc == "":
logger.warning("link[href]='%s' has no netloc set, setting domain='%s'", link["href"], domain)
url = f"{components.scheme}://{domain}{components.path}"
- components = urlparse(url)
+ components = urllib.parse.urlparse(url)
domain2 = components.netloc.lower().split(":")[0]
logger.debug("domain2='%s'", domain2)