"followers_only": blocks["suspended"]
}
+def tidyup_reason(reason: str) -> str:
+ # DEBUG: print(f"DEBUG: reason='{reason}' - CALLED!")
+ if type(reason) != str:
+ raise ValueError(f"Parameter reason[]={type(reason)} is not expected")
+
+ # Strip string
+ reason = reason.strip()
+
+ # Replace â with "
+ reason = re.sub("â", "\"", reason)
+
+ #print(f"DEBUG: reason='{reason}' - EXIT!")
+ return reason
+
def tidyup_domain(domain: str) -> str:
# DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
if type(domain) != str:
import validators
from fba import *
-boot.acquire_lock()
-
domains = list()
try:
fetched = fba.post_json_api("gql.api.bka.li", "/v1/graphql", json.dumps({
print(f"ERROR: Cannot fetch graphql,exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
-# Show domains
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
+ boot.acquire_lock()
+
print(f"INFO: Adding {len(domains)} new instances ...")
for domain in domains:
print(f"INFO: Fetching instances from domain='{domain}' ...")
# DEBUG: print(f"DEBUG: domains()={len(domains)} - EXIT!")
return domains
-boot.acquire_lock()
-
domains = {
"silenced": list(),
"blocked": list(),
print(f"ERROR: Cannot fetch from meta.chaos.social,exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
-# Show domains
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
+ boot.acquire_lock()
+
print(f"INFO: Adding {len(domains)} new instances ...")
for block_level in domains:
# DEBUG: print(f"DEBUG: block_level='{block_level}'")
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import reqto
-import rss_parser
+import atoma
import sys
from fba import *
-boot.acquire_lock()
-
feed = sys.argv[1]
domains = list()
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
if response.ok and response.status_code < 300 and len(response.text) > 0:
# DEBUG: print(f"DEBUG: Parsing RSS feed ...")
- rss = rss_parser.Parser.parse(response.text)
- for item in rss.channel.items:
- # DEBUG: print(f"DEBUG: item.link={item.link}")
+ rss = atoma.parse_rss_bytes(response.content)
+
+ # DEBUG: print(f"DEBUG: rss[]={type(rss)}")
+ for item in rss.items:
+ # DEBUG: print(f"DEBUG: item={item}")
domain = item.link.split("=")[1]
if fba.is_blacklisted(domain):
except BaseException as e:
print(f"ERROR: Cannot fetch feed='{feed}',exception[{type(e)}]:'{str(e)}'")
-# Show domains
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
+ boot.acquire_lock()
+
print(f"INFO: Adding {len(domains)} new instances ...")
for domain in domains:
print(f"INFO: Fetching instances from domain='{domain}' ...")
+atoma
beautifulsoup4
fastapi
uvicorn
jinja2
eventlet
reqto
-rss-parser
validators
zc.lockfile