conference-check/check.py

#!/usr/bin/python3
"""Check if conference websites are live."""
import os
import re
import smtplib
import warnings
from datetime import date
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
from urllib.parse import urlparse, urlunparse
import requests
import yaml
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.url import parse_url
from conference import LiveConference, config, load_yaml
class AbsoluteDNSAdapter(HTTPAdapter):
"""A custom adapter for requests to ensure hostnames are treated as absolute."""
def add_dot_to_hostname(self, url: str) -> str:
"""Append a dot to the hostname to treat it as an absolute domain name."""
parsed_url = parse_url(url)
# Append a dot to the hostname if it's not already there.
hostname = parsed_url.host
assert hostname
if not hostname.endswith("."):
hostname += "."
# Reconstruct the URL with the modified hostname.
new_url: str = parsed_url._replace(host=hostname).url
return new_url
def send(self, request, **kwargs): # type: ignore
"""Override the send method to modify the request URL before sending."""
# Modify the request URL to ensure the hostname is treated as absolute.
request.url = self.add_dot_to_hostname(request.url)
return super().send(request, **kwargs)
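    # Rationale: the trailing dot makes the hostname a fully qualified domain name,
    # so the system resolver will not append local search-domain suffixes when resolving it.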
# Suppress only the single InsecureRequestWarning from urllib3
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
re_title = re.compile("<title>(.*?)</title>", re.DOTALL)
AGENT = config["browser"]["User-Agent"]
headers = {"User-Agent": AGENT, "Accept": "text/html"}
s = requests.Session()
s.headers.update(headers)
# Create a session and mount the custom adapter for both HTTP and HTTPS requests.
adapter = AbsoluteDNSAdapter()
s.mount("http://", adapter)
s.mount("https://", adapter)
not_here_list = [
"The specified URL was not found.",
"There is currently no text in this page.",
"This page does not exist yet",
"404 Not Found",
"500 Internal Server Error",
"Test Page for the Apache HTTP Server",
"Site not found &middot; GitHub Pages",
"504: Gateway time-out",
"504 Gateway Time-out",
"502 Bad Gateway",
"This page doesn't exist (404)",
"Coming soon",
"NOT_FOUND",
"Resource Not Found",
"Wikimedia Error",
"The page you requested could not be found",
"Ooops! Could Not Find It",
"OpenStreetMap Authentication Proxy",
"Error 404",
"Under Construction",
"Page not found",
"Error 404: Page not found",
"Barcamptools",
"That page cant be found.",
"looks like there's no page here",
"404 page",
"Database Error",
"You are not authorized to access this page",
"Attention Required! | Cloudflare",
"This page doesn't currently exist",
"ERROR 503 - Service Unavailable",
"ERROR 503",
]
def find_not_here_message(html: str) -> str | None:
"""Find not here message in web page."""
return next((not_here for not_here in not_here_list if not_here in html), None)
def get_title(html: str) -> str:
"""Title from web page."""
m = re_title.search(html)
return m.group(1).strip() if m and m.group(1) else "no title"
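# For example, get_title("<html><title>SotM 2025</title></html>") returns "SotM 2025";
# a page without a <title> tag falls back to "no title".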
def normalize_url(url: str) -> str:
"""
Normalize the URL by parsing and reconstructing to ensure uniformity.
This handles cases like differing schemes, casing in the domain
and trailing slashes.
"""
# Parse the URL into components
parsed_url = urlparse(url)
# Normalize the domain to lowercase and remove any trailing dot
normalized_netloc = parsed_url.netloc.lower().rstrip(".")
# Reconstruct the URL with normalized components
return urlunparse(parsed_url._replace(netloc=normalized_netloc))
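# For example, normalize_url("https://Example.COM./2025") and
# normalize_url("https://example.com/2025") both yield "https://example.com/2025",
# so a redirect target compares equal to past_url regardless of host casing or the
# trailing dot added by AbsoluteDNSAdapter.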
def url_to_filename(url: str) -> str:
"""
Convert a URL to a valid filename by replacing invalid characters with underscores.
Args:
url (str): The URL to be converted.
Returns:
str: A valid filename.
"""
# Replace invalid characters with underscores
return re.sub(r'[\/:*?"<>|]', "_", url)
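# For example, url_to_filename("https://example.com/2025/") gives
# "https___example.com_2025_", which is safe to use as a file name under sites/.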
def parse_opengraph_tags(html: str) -> dict[str, str]:
"""
Locate Open Graph meta tags, and return them as a dictionary.
Keys will match the name following 'og:', e.g. 'title', 'image', 'url', 'site_name'.
"""
pattern = re.compile(
r'<meta\s+property="og:([^"]+)"\s+content="([^"]+)"\s*/?>', flags=re.IGNORECASE
)
matches = pattern.findall(html)
og_tags = {}
for prop, content in matches:
og_tags[prop] = content
return og_tags
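# For example, '<meta property="og:title" content="State of the Map 2025"/>' is
# returned as {"title": "State of the Map 2025"}. Note the regex only matches tags
# whose property attribute appears before the content attribute.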
class Conference:
"""Conference."""
name: str
src_url: str
year: int
response: None | requests.models.Response
def __init__(self, name: str, src_url: str, year: int):
"""Init."""
self.name = name
self.src_url = src_url
self.year = year
self.response = None
@property
def url(self) -> str:
"""Conference URL."""
return self.src_url.format(year=self.year)
@property
def past_url(self) -> str:
"""URL for previous year."""
return self.src_url.format(year=self.year - 1)
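    # Illustrative src_url template: with src_url = "https://{year}.example.org/" (hypothetical)
    # and year = 2025, url is "https://2025.example.org/" and past_url is "https://2024.example.org/".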
def check(self) -> tuple[bool, str, str | None]:
"""Check if conference is live."""
try:
# SotM Baltics has an invalid TLS certificate, but we don't care
r = s.get(self.url, verify=False)
except requests.exceptions.ConnectionError:
return (False, "connection refused", None)
self.response = r
filename = url_to_filename(self.url)
with open(os.path.join("sites", filename + ".html"), "w") as out:
out.write(r.text)
not_here = find_not_here_message(r.text)
if (
len(r.text) < 2048
and 'http-equiv="refresh"' in r.text
and str(self.year) not in r.text
):
return (False, "redirect to URL without year", r.url)
if normalize_url(r.url) == normalize_url(self.past_url):
return (False, "redirect to previous year", r.url)
if not_here:
return (False, not_here, r.url)
return (True, get_title(r.text), r.url)
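    # check() returns (live, message, final_url): message is the page <title> when the
    # site looks live, otherwise the reason it was judged not live (a matched "not here"
    # phrase, "redirect to previous year", or "connection refused").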
def og_tags(self) -> dict[str, str]:
"""Open Graph tags."""
assert self.response
return parse_opengraph_tags(self.response.text)
def check_web_site(self) -> bool:
"""Check if an individual web site is live."""
assert "{year}" in self.src_url
live, msg, redirect_to_url = self.check()
if not live:
return False
og = "".join(f"\n{key}: {value}" for key, value in self.og_tags().items())
if og:
og = "\n\nOpen Graph\n\n" + og
if redirect_to_url and normalize_url(redirect_to_url) == normalize_url(
self.url
):
body = f"{self.name}\n{self.url}\n"
else:
body = f"{self.name}\n{self.url} redirects to {redirect_to_url}\n"
body += "Web page title: {msg}{og}" ""
send_mail(f"Conference site live: {self.name} - {self.year}", body)
return True
def send_mail(subject: str, body: str) -> None:
"""Send an e-mail."""
    return  # NOTE: mail delivery is disabled by this early return; remove it to actually send
mail_from_address = config["mail"]["from_address"]
mail_from_name = config["mail"]["from_name"]
mail_to_address = config["mail"]["to_address"]
mail_to_name = config["mail"]["to_name"]
msg = MIMEText(body, "plain", "UTF-8")
msg["Subject"] = subject
msg["To"] = f"{mail_to_name} <{mail_to_address}>"
msg["From"] = f"{mail_from_name} <{mail_from_address}>"
msg["Date"] = formatdate()
msg["Message-ID"] = make_msgid()
# extra mail headers from config
for header_name, value in config["mail_headers"].items():
msg[header_name] = value
s = smtplib.SMTP(config["mail"]["smtp_host"])
s.sendmail(mail_from_address, [mail_to_address], msg.as_string())
s.quit()
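# send_mail reads these config keys: mail.from_address, mail.from_name, mail.to_address,
# mail.to_name and mail.smtp_host, plus a mail_headers mapping of extra headers
# (which may be empty) that is added to the message verbatim.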
def check(name: str, src_url: str, year: int) -> bool:
"""Check to see if conference site is live."""
conf = Conference(name, src_url, year)
return conf.check_web_site()
def find_new_conference_web_sites(
today: date, live: list[LiveConference]
) -> list[LiveConference]:
"""Check for new conference web sites going live."""
this_year = today.year
new: list[LiveConference] = []
live_set = {(c["conference"], c["year"]) for c in live}
for name, src_url in load_yaml("conferences").items():
new += [
{"conference": name, "year": year, "live": today}
for year in (this_year, this_year + 1)
if (name, year) not in live_set and check(name, src_url, year)
]
return new
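# Illustrative data shapes, inferred from the code above (the concrete entries are assumptions):
#   conferences YAML: a mapping of conference name to a URL template containing "{year}",
#     e.g. "SotM Example": "https://{year}.example.org/"   # hypothetical entry
#   live YAML: a list of {"conference": <name>, "year": <int>, "live": <date>} entries.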
def main(show_not_live: bool = False) -> None:
"""Check fow new conference web sites."""
live: list[LiveConference] = load_yaml("live")
if not (new := find_new_conference_web_sites(date.today(), live)):
return
live_filename = os.path.expanduser(config["data"]["live"])
with open(live_filename, "w") as out:
yaml.dump(live + new, stream=out, sort_keys=False)
if __name__ == "__main__":
main()