Compare commits
3 commits
cloudscrap
...
main
Author | SHA1 | Date | |
---|---|---|---|
Edward Betts | d845ec805f | ||
Edward Betts | 8c0acb1453 | ||
Edward Betts | a523af16e1 |
184
check.py
184
check.py
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
"""Check if conference websites are live."""
|
"""Check if conference websites are live."""
|
||||||
|
|
||||||
|
import html
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import smtplib
|
import smtplib
|
||||||
|
@ -11,6 +12,7 @@ from email.mime.text import MIMEText
|
||||||
from email.utils import formatdate, make_msgid
|
from email.utils import formatdate, make_msgid
|
||||||
from urllib.parse import urlparse, urlunparse
|
from urllib.parse import urlparse, urlunparse
|
||||||
|
|
||||||
|
import cloudscraper
|
||||||
import requests
|
import requests
|
||||||
import yaml
|
import yaml
|
||||||
from requests.adapters import HTTPAdapter
|
from requests.adapters import HTTPAdapter
|
||||||
|
@ -53,8 +55,14 @@ re_title = re.compile("<title>(.*?)</title>", re.DOTALL)
|
||||||
AGENT = config["browser"]["User-Agent"]
|
AGENT = config["browser"]["User-Agent"]
|
||||||
headers = {"User-Agent": AGENT, "Accept": "text/html"}
|
headers = {"User-Agent": AGENT, "Accept": "text/html"}
|
||||||
|
|
||||||
s = requests.Session()
|
s = cloudscraper.CloudScraper()
|
||||||
s.headers.update(headers)
|
s_no_dot = cloudscraper.CloudScraper()
|
||||||
|
|
||||||
|
# s = requests.Session()
|
||||||
|
# s.headers.update(headers)
|
||||||
|
|
||||||
|
# s_no_dot = requests.Session()
|
||||||
|
# s_no_dot.headers.update(headers)
|
||||||
|
|
||||||
# Create a session and mount the custom adapter for both HTTP and HTTPS requests.
|
# Create a session and mount the custom adapter for both HTTP and HTTPS requests.
|
||||||
adapter = AbsoluteDNSAdapter()
|
adapter = AbsoluteDNSAdapter()
|
||||||
|
@ -82,7 +90,7 @@ not_here_list = [
|
||||||
"Ooops! Could Not Find It",
|
"Ooops! Could Not Find It",
|
||||||
"OpenStreetMap Authentication Proxy",
|
"OpenStreetMap Authentication Proxy",
|
||||||
"Error 404",
|
"Error 404",
|
||||||
"Under Construction",
|
# "Under Construction",
|
||||||
"Page not found",
|
"Page not found",
|
||||||
"Error 404: Page not found",
|
"Error 404: Page not found",
|
||||||
"Barcamptools",
|
"Barcamptools",
|
||||||
|
@ -93,18 +101,23 @@ not_here_list = [
|
||||||
"You are not authorized to access this page",
|
"You are not authorized to access this page",
|
||||||
"Attention Required! | Cloudflare",
|
"Attention Required! | Cloudflare",
|
||||||
"This page doesn't currently exist",
|
"This page doesn't currently exist",
|
||||||
|
"ERROR 503 - Service Unavailable",
|
||||||
|
"ERROR 503",
|
||||||
|
"401 Authorization Required",
|
||||||
|
"Authorization Required",
|
||||||
|
"used Cloudflare to restrict access",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def find_not_here_message(html: str) -> str | None:
|
def find_not_here_message(page_html: str) -> str | None:
|
||||||
"""Find not here message in web page."""
|
"""Find not here message in web page."""
|
||||||
return next((not_here for not_here in not_here_list if not_here in html), None)
|
return next((not_here for not_here in not_here_list if not_here in page_html), None)
|
||||||
|
|
||||||
|
|
||||||
def get_title(html: str) -> str:
|
def get_title(page_html: str) -> str:
|
||||||
"""Title from web page."""
|
"""Title from web page."""
|
||||||
m = re_title.search(html)
|
m = re_title.search(page_html)
|
||||||
return m.group(1).strip() if m and m.group(1) else "no title"
|
return html.unescape(m.group(1).strip()) if m and m.group(1) else "no title"
|
||||||
|
|
||||||
|
|
||||||
def normalize_url(url: str) -> str:
|
def normalize_url(url: str) -> str:
|
||||||
|
@ -122,34 +135,125 @@ def normalize_url(url: str) -> str:
|
||||||
return urlunparse(parsed_url._replace(netloc=normalized_netloc))
|
return urlunparse(parsed_url._replace(netloc=normalized_netloc))
|
||||||
|
|
||||||
|
|
||||||
def check_conference(
|
def url_to_filename(url: str) -> str:
|
||||||
name: str, src_url: str, year: int
|
"""
|
||||||
) -> tuple[bool, str, str | None]:
|
Convert a URL to a valid filename by replacing invalid characters with underscores.
|
||||||
"""Check if conference is live."""
|
|
||||||
url = src_url.format(year=year)
|
|
||||||
past_url = src_url.format(year=year - 1)
|
|
||||||
try:
|
|
||||||
# SotM Baltics has an invalid TLS certificate, but we don't care
|
|
||||||
r = s.get(url, verify=False)
|
|
||||||
except requests.exceptions.ConnectionError:
|
|
||||||
return (False, "connection refused", None)
|
|
||||||
|
|
||||||
not_here = find_not_here_message(r.text)
|
Args:
|
||||||
if (
|
url (str): The URL to be converted.
|
||||||
len(r.text) < 2048
|
|
||||||
and 'http-equiv="refresh"' in r.text
|
|
||||||
and str(year) not in r.text
|
|
||||||
):
|
|
||||||
return (False, "redirect to URL without year", r.url)
|
|
||||||
|
|
||||||
if normalize_url(r.url) == normalize_url(past_url):
|
Returns:
|
||||||
return (False, "redirect to previous year", r.url)
|
str: A valid filename.
|
||||||
|
"""
|
||||||
|
# Replace invalid characters with underscores
|
||||||
|
return re.sub(r'[\/:*?"<>|]', "_", url)
|
||||||
|
|
||||||
return (False, not_here, r.url) if not_here else (True, get_title(r.text), r.url)
|
|
||||||
|
# One whole <meta ...> tag. Quoted attribute values are consumed as units so a
# ">" inside a value does not end the tag early.
_META_TAG_RE = re.compile(r"""<meta\b(?:[^>"']|"[^"]*"|'[^']*')*>""", re.IGNORECASE)

# One name="value" or name='value' attribute inside a tag.
_META_ATTR_RE = re.compile(r"""([\w:-]+)\s*=\s*(?:"([^"]*)"|'([^']*)')""")


def parse_opengraph_tags(html: str) -> dict[str, str]:
    """
    Locate Open Graph meta tags, and return them as a dictionary.

    Keys will match the name following 'og:', e.g. 'title', 'image', 'url', 'site_name'.

    Attributes may appear in any order, use single or double quotes, and sit
    next to other attributes. Values are HTML-unescaped, so '&amp;' comes back
    as '&'. Tags with an empty property name or empty content are skipped.
    When a property occurs more than once, the last occurrence wins.

    Args:
        html (str): The HTML source of a web page.

    Returns:
        dict[str, str]: Open Graph property names (without the 'og:' prefix)
        mapped to their content.
    """
    # Local import: the ``html`` parameter shadows the stdlib module of the
    # same name inside this function.
    from html import unescape

    og_tags: dict[str, str] = {}
    for tag in _META_TAG_RE.findall(html):
        attrs = {
            attr_name.lower(): double_quoted or single_quoted
            for attr_name, double_quoted, single_quoted in _META_ATTR_RE.findall(tag)
        }
        prop = attrs.get("property", "")
        content = attrs.get("content", "")
        if not prop.lower().startswith("og:"):
            continue

        key = prop[3:]  # strip the "og:" prefix, keeping the page's own casing
        if key and content:
            og_tags[key] = unescape(content)

    return og_tags
|
||||||
|
|
||||||
|
|
||||||
|
class Conference:
    """A conference web site checked for a specific year.

    ``src_url`` is a URL template with a ``{year}`` placeholder. The ``url``
    and ``past_url`` properties fill it in with ``year`` and ``year - 1``.
    """

    name: str
    src_url: str
    year: int
    # Response from the most recent successful fetch in check(), else None.
    response: None | requests.models.Response

    # URLs containing one of these hostnames are fetched with ``s_no_dot``
    # instead of ``s``.
    # NOTE(review): presumably these hosts break with the absolute-DNS adapter
    # mounted on ``s`` -- confirm against the session setup.
    _NO_DOT_HOSTNAMES = frozenset({"bsideskbh.dk", "pif.camp"})

    def __init__(self, name: str, src_url: str, year: int):
        """Store the conference name, URL template and year."""
        self.name = name
        self.src_url = src_url
        self.year = year
        self.response = None

    @property
    def url(self) -> str:
        """Conference URL."""
        return self.src_url.format(year=self.year)

    @property
    def past_url(self) -> str:
        """URL for previous year."""
        return self.src_url.format(year=self.year - 1)

    def check(self) -> tuple[bool, str, str | None]:
        """Check if conference is live.

        Returns:
            A ``(live, message, final_url)`` tuple. When ``live`` is true,
            ``message`` is the page title. Otherwise it is the reason the site
            is considered not live. ``final_url`` is ``None`` only when the
            connection failed.
        """
        url = self.url
        use_no_dot = any(hostname in url for hostname in self._NO_DOT_HOSTNAMES)
        session = s_no_dot if use_no_dot else s

        try:
            r = session.get(url)
        except requests.exceptions.ConnectionError:
            return (False, "connection refused", None)

        self.response = r
        page = r.text  # read once; each access to .text decodes the body again

        # A tiny page that meta-refreshes elsewhere and never mentions the
        # year is treated as a redirect to a URL without the year.
        if (
            len(page) < 2048
            and 'http-equiv="refresh"' in page
            and str(self.year) not in page
        ):
            return (False, "redirect to URL without year", r.url)

        if normalize_url(r.url) == normalize_url(self.past_url):
            return (False, "redirect to previous year", r.url)

        not_here = find_not_here_message(page)
        if not_here:
            return (False, not_here, r.url)

        return (True, get_title(page), r.url)

    def og_tags(self) -> dict[str, str]:
        """Open Graph tags from the last fetched page.

        Must be called after a check() that received a response.
        """
        # Compare with None explicitly: a requests Response is falsy for
        # 4xx/5xx status codes, so a bare ``assert self.response`` would fail
        # on a page that check() accepted as live.
        assert self.response is not None
        return parse_opengraph_tags(self.response.text)

    def check_web_site(self) -> bool:
        """Check if an individual web site is live and send a mail if it is.

        Returns:
            True if the site is live (a notification was passed to
            send_mail), False otherwise.
        """
        assert "{year}" in self.src_url
        live, msg, redirect_to_url = self.check()
        if not live:
            return False

        og = "".join(f"\n{key}: {value}" for key, value in self.og_tags().items())
        if og:
            og = "\n\nOpen Graph\n\n" + og

        if redirect_to_url and normalize_url(redirect_to_url) == normalize_url(
            self.url
        ):
            body = f"{self.name}\n{self.url}\n"
        else:
            body = f"{self.name}\n{self.url} redirects to {redirect_to_url}\n"

        # This must be an f-string, otherwise the literal text "{msg}{og}"
        # ends up in the mail.
        body += f"Web page title: {msg}{og}"
        send_mail(f"Conference site live: {self.name} - {self.year}", body)

        return True
|
||||||
|
|
||||||
|
|
||||||
def send_mail(subject: str, body: str) -> None:
|
def send_mail(subject: str, body: str) -> None:
|
||||||
"""Send an e-mail."""
|
"""Send an e-mail."""
|
||||||
|
return
|
||||||
mail_from_address = config["mail"]["from_address"]
|
mail_from_address = config["mail"]["from_address"]
|
||||||
mail_from_name = config["mail"]["from_name"]
|
mail_from_name = config["mail"]["from_name"]
|
||||||
mail_to_address = config["mail"]["to_address"]
|
mail_to_address = config["mail"]["to_address"]
|
||||||
|
@ -171,21 +275,10 @@ def send_mail(subject: str, body: str) -> None:
|
||||||
s.quit()
|
s.quit()
|
||||||
|
|
||||||
|
|
||||||
def check_conference_web_site(name: str, src_url: str, year: int) -> bool:
|
def check(name: str, src_url: str, year: int) -> bool:
|
||||||
"""Check if an individual web site is live."""
|
"""Check to see if conference site is live."""
|
||||||
assert "{year}" in src_url
|
conf = Conference(name, src_url, year)
|
||||||
live, msg, redirect_to_url = check_conference(name, src_url, year)
|
return conf.check_web_site()
|
||||||
url = src_url.format(year=year)
|
|
||||||
if live:
|
|
||||||
if redirect_to_url and normalize_url(redirect_to_url) == normalize_url(url):
|
|
||||||
body = f"{name}\n{url}\nWeb page title: {msg}"
|
|
||||||
else:
|
|
||||||
body = f"""{name}
|
|
||||||
{url} redirects to {redirect_to_url}
|
|
||||||
Web page title: {msg}"""
|
|
||||||
send_mail(f"Conference site live: {name} - {year}", body)
|
|
||||||
|
|
||||||
return live
|
|
||||||
|
|
||||||
|
|
||||||
def find_new_conference_web_sites(
|
def find_new_conference_web_sites(
|
||||||
|
@ -201,8 +294,7 @@ def find_new_conference_web_sites(
|
||||||
new += [
|
new += [
|
||||||
{"conference": name, "year": year, "live": today}
|
{"conference": name, "year": year, "live": today}
|
||||||
for year in (this_year, this_year + 1)
|
for year in (this_year, this_year + 1)
|
||||||
if (name, year) not in live_set
|
if (name, year) not in live_set and check(name, src_url, year)
|
||||||
and check_conference_web_site(name, src_url, year)
|
|
||||||
]
|
]
|
||||||
return new
|
return new
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue