#!/usr/bin/python3
"""Check if conference websites are live."""

import html
import os
import re
import smtplib
import warnings
from datetime import date
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
from urllib.parse import urlparse, urlunparse

import cloudscraper
import requests
import yaml
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.url import parse_url

from conference import LiveConference, config, load_yaml


class AbsoluteDNSAdapter(HTTPAdapter):
    """A custom adapter for requests to ensure hostnames are treated as absolute."""

    def add_dot_to_hostname(self, url: str) -> str:
        """
        Append a dot to the hostname to treat it as an absolute domain name.

        Args:
            url: The request URL; it must contain a hostname.

        Returns:
            The same URL with a trailing dot on the hostname. A hostname that
            already ends with a dot is left unchanged.
        """
        parsed_url = parse_url(url)

        # Append a dot to the hostname if it's not already there.
        hostname = parsed_url.host
        assert hostname
        if not hostname.endswith("."):
            hostname += "."

        # Reconstruct the URL with the modified hostname.
        new_url: str = parsed_url._replace(host=hostname).url
        return new_url

    def send(self, request, **kwargs):  # type: ignore
        """Override the send method to modify the request URL before sending."""
        # Modify the request URL to ensure the hostname is treated as absolute.
        request.url = self.add_dot_to_hostname(request.url)
        return super().send(request, **kwargs)


# Suppress only the single InsecureRequestWarning from urllib3
warnings.filterwarnings("ignore", category=InsecureRequestWarning)

# Capture the text between the <title> tags. Without the surrounding tags the
# lazy group matches the empty string at position 0 and no title is ever found.
re_title = re.compile(r"<title>(.*?)</title>", re.DOTALL)

AGENT = config["browser"]["User-Agent"]
headers = {"User-Agent": AGENT, "Accept": "text/html"}

# `s` gets the absolute-DNS adapter mounted on it; `s_no_dot` is left as created.
s = cloudscraper.CloudScraper()
s_no_dot = cloudscraper.CloudScraper()

# s = requests.Session()
# s.headers.update(headers)
# s_no_dot = requests.Session()
# s_no_dot.headers.update(headers)

# Mount the custom adapter on `s` for both HTTP and HTTPS requests.
adapter = AbsoluteDNSAdapter()
s.mount("http://", adapter)
s.mount("https://", adapter)

# Substrings whose presence in a fetched page means the site is not live.
not_here_list = [
    "The specified URL was not found.",
    "There is currently no text in this page.",
    "This page does not exist yet",
    "404 Not Found",
    "500 Internal Server Error",
    "500: Internal Server Error",
    "Test Page for the Apache HTTP Server",
    "Site not found · GitHub Pages",
    "504: Gateway time-out",
    "504 Gateway Time-out",
    "502 Bad Gateway",
    "This page doesn't exist (404)",
    "Coming soon",
    "NOT_FOUND",
    "Resource Not Found",
    "Wikimedia Error",
    "The page you requested could not be found",
    "Ooops! Could Not Find It",
    "OpenStreetMap Authentication Proxy",
    "Error 404",
    # "Under Construction",
    "Page not found",
    "Error 404: Page not found",
    "Barcamptools",
    "That page can’t be found.",
    "looks like there's no page here",
    "404 page",
    "Database Error",
    "You are not authorized to access this page",
    "Attention Required! | Cloudflare",
    "This page doesn't currently exist",
    "ERROR 503 - Service Unavailable",
    "ERROR 503",
    "401 Authorization Required",
    "Authorization Required",
    "used Cloudflare to restrict access",
]


def find_not_here_message(page_html: str) -> str | None:
    """
    Find not here message in web page.

    Returns the first entry of ``not_here_list`` that occurs in ``page_html``
    (case-sensitive substring match), or None when no entry occurs.
    """
    return next((not_here for not_here in not_here_list if not_here in page_html), None)


def get_title(page_html: str) -> str:
    """
    Title from web page.

    Returns the stripped, HTML-unescaped first group of ``re_title``, or
    "no title" when the pattern does not match or the group is empty.
    """
    m = re_title.search(page_html)
    return html.unescape(m.group(1).strip()) if m and m.group(1) else "no title"


def normalize_url(url: str) -> str:
    """
    Normalize a URL so that equivalent host spellings compare equal.

    The network location is lowercased and any trailing dots are stripped from
    it; every other component of the URL is left as parsed.
    """
    parsed_url = urlparse(url)
    normalized_netloc = parsed_url.netloc.lower().rstrip(".")
    return urlunparse(parsed_url._replace(netloc=normalized_netloc))


def url_to_filename(url: str) -> str:
    """
    Convert a URL to a valid filename by replacing invalid characters with underscores.

    Args:
        url (str): The URL to be converted.

    Returns:
        str: A valid filename.
    """
    # Replace invalid characters with underscores
    return re.sub(r'[\/:*?"<>|]', "_", url)


def parse_opengraph_tags(html: str) -> dict[str, str]:
    """
    Locate Open Graph meta tags, and return them as a dictionary.

    Keys will match the name following 'og:', e.g. 'title', 'image', 'url', 'site_name'.
    When a property occurs more than once, the last occurrence wins.
    """
    # The pattern needs exactly two groups (property name, content) so that
    # findall yields pairs. An empty pattern yields empty strings, which made
    # the unpacking below raise ValueError.
    pattern = re.compile(
        r'<meta\s+property="og:([^"]+)"\s+content="([^"]+)"\s*/?>',
        flags=re.IGNORECASE,
    )
    return dict(pattern.findall(html))


class Conference:
    """Conference."""

    name: str
    src_url: str  # URL template with a {year} and/or {two_digit_year} placeholder
    year: int
    response: None | requests.models.Response  # set by check() after a fetch

    def __init__(self, name: str, src_url: str, year: int):
        """Init."""
        self.name = name
        self.src_url = src_url
        self.year = year
        self.response = None

    @property
    def url(self) -> str:
        """Conference URL."""
        return self.build_url(year=self.year)

    def build_url(self, year: int) -> str:
        """Build conference URL by filling the template with the given year."""
        return self.src_url.format(year=year, two_digit_year=year % 2000)

    @property
    def past_url(self) -> str:
        """URL for previous year."""
        return self.build_url(year=self.year - 1)

    def check(self) -> tuple[bool, str, str | None]:
        """
        Check if conference is live.

        Returns:
            A ``(live, message, final_url)`` tuple. ``message`` is the page
            title when live, otherwise the reason the site is not considered
            live. ``final_url`` is the URL of the response, or None when the
            connection failed.
        """
        # Hosts that are fetched with the session lacking the absolute-DNS adapter.
        no_dot = {"bsideskbh.dk", "pif.camp"}

        url = self.url
        session = s if all(hostname not in url for hostname in no_dot) else s_no_dot
        try:
            r = session.get(url)
        except requests.exceptions.ConnectionError:
            return (False, "connection refused", None)

        self.response = r

        not_here = find_not_here_message(r.text)
        # A short page with a meta refresh that never mentions the year.
        if (
            len(r.text) < 2048
            and 'http-equiv="refresh"' in r.text
            and str(self.year) not in r.text
        ):
            return (False, "redirect to URL without year", r.url)

        if normalize_url(r.url) == normalize_url(self.past_url):
            return (False, "redirect to previous year", r.url)

        if not_here:
            return (False, not_here, r.url)

        return (True, get_title(r.text), r.url)

    def og_tags(self) -> dict[str, str]:
        """Open Graph tags of the response stored by check()."""
        assert self.response
        return parse_opengraph_tags(self.response.text)

    def check_web_site(self) -> bool:
        """
        Check if an individual web site is live.

        When the site is live a notification is passed to ``send_mail``.

        Returns:
            True when the site is live, False otherwise.
        """
        assert "{year}" in self.src_url or "{two_digit_year}" in self.src_url
        live, msg, redirect_to_url = self.check()
        if not live:
            return False

        og = "".join(f"\n{key}: {value}" for key, value in self.og_tags().items())
        if og:
            og = "\n\nOpen Graph\n\n" + og

        if redirect_to_url and normalize_url(redirect_to_url) == normalize_url(
            self.url
        ):
            body = f"{self.name}\n{self.url}\n"
        else:
            body = f"{self.name}\n{self.url} redirects to {redirect_to_url}\n"

        # This must be an f-string; a plain string sends the literal text
        # "{msg}{og}" instead of the title and Open Graph summary.
        body += f"Web page title: {msg}{og}"

        send_mail(f"Conference site live: {self.name} - {self.year}", body)

        return True


def send_mail(subject: str, body: str) -> None:
    """
    Send an e-mail.

    Args:
        subject: Subject line of the message.
        body: Plain-text body of the message.
    """
    # NOTE(review): this unconditional return disables mail delivery, so nothing
    # below it runs. It is kept because it may be deliberate; remove it to
    # re-enable notifications.
    return

    mail_from_address = config["mail"]["from_address"]
    mail_from_name = config["mail"]["from_name"]
    mail_to_address = config["mail"]["to_address"]
    mail_to_name = config["mail"]["to_name"]
    msg = MIMEText(body, "plain", "UTF-8")

    msg["Subject"] = subject
    msg["To"] = f"{mail_to_name} <{mail_to_address}>"
    msg["From"] = f"{mail_from_name} <{mail_from_address}>"
    msg["Date"] = formatdate()
    msg["Message-ID"] = make_msgid()

    # extra mail headers from config
    for header_name, value in config["mail_headers"].items():
        msg[header_name] = value

    # Local name differs from the module-level session `s` to avoid shadowing it.
    with smtplib.SMTP(config["mail"]["smtp_host"]) as smtp:
        smtp.sendmail(mail_from_address, [mail_to_address], msg.as_string())


def check(name: str, src_url: str, year: int) -> bool:
    """Check to see if conference site is live."""
    conf = Conference(name, src_url, year)
    return conf.check_web_site()


def find_new_conference_web_sites(
    today: date, live: list[LiveConference]
) -> list[LiveConference]:
    """
    Check for new conference web sites going live.

    Every conference from the "conferences" YAML data is checked for the year
    of ``today`` and the following year, skipping (conference, year) pairs
    already present in ``live``.

    Returns:
        Entries for the sites found live, each dated ``today``.
    """
    this_year = today.year

    new: list[LiveConference] = []

    live_set = {(c["conference"], c["year"]) for c in live}
    for name, src_url in load_yaml("conferences").items():
        new += [
            {"conference": name, "year": year, "live": today}
            for year in (this_year, this_year + 1)
            if (name, year) not in live_set and check(name, src_url, year)
        ]
    return new


def main(show_not_live: bool = False) -> None:
    """
    Check for new conference web sites.

    Rewrites the live-conference file with the new entries appended; the file
    is left untouched when nothing new is found. ``show_not_live`` is accepted
    but not used by this function.
    """
    live: list[LiveConference] = load_yaml("live")
    if not (new := find_new_conference_web_sites(date.today(), live)):
        return

    live_filename = os.path.expanduser(config["data"]["live"])
    with open(live_filename, "w") as out:
        yaml.dump(live + new, stream=out, sort_keys=False)


if __name__ == "__main__":
    main()