#!/usr/bin/python3

"""Check if conference websites are live."""

import html
import os
import re
import smtplib
import warnings
from datetime import date
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
from urllib.parse import urlparse, urlunparse

import cloudscraper
import requests
import yaml
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.url import parse_url

from conference import LiveConference, config, load_yaml


class AbsoluteDNSAdapter(HTTPAdapter):
    """Transport adapter that rewrites request hostnames into absolute form."""

    def add_dot_to_hostname(self, url: str) -> str:
        """Return the URL with a trailing dot on its hostname.

        A hostname that already ends in a dot is left as it is. The URL must
        contain a hostname.
        """
        parts = parse_url(url)

        host = parts.host
        assert host
        # Only add the dot when it is missing.
        absolute_host = host if host.endswith(".") else f"{host}."

        # Put the URL back together around the adjusted hostname.
        rebuilt: str = parts._replace(host=absolute_host).url
        return rebuilt

    def send(self, request, **kwargs):  # type: ignore
        """Rewrite the request URL to use an absolute hostname, then send it."""
        request.url = self.add_dot_to_hostname(request.url)
        return super().send(request, **kwargs)


# Suppress only the single InsecureRequestWarning from urllib3
warnings.filterwarnings("ignore", category=InsecureRequestWarning)


# Non-greedy match of the text between <title> and </title>; DOTALL lets the
# title span several lines. The match is case-sensitive.
re_title = re.compile("<title>(.*?)</title>", re.DOTALL)

# User-Agent string taken from the project config.
AGENT = config["browser"]["User-Agent"]
# NOTE(review): the only visible use of `headers` is in the commented-out
# requests.Session setup below - confirm whether it is still needed.
headers = {"User-Agent": AGENT, "Accept": "text/html"}

# Two cloudscraper sessions: `s` gets AbsoluteDNSAdapter mounted below, while
# `s_no_dot` does not and is picked by Conference.check for the hosts in its
# `no_dot` set.
s = cloudscraper.CloudScraper()
s_no_dot = cloudscraper.CloudScraper()

# Commented-out alternative that uses plain requests sessions.
# s = requests.Session()
# s.headers.update(headers)

# s_no_dot = requests.Session()
# s_no_dot.headers.update(headers)

# Mount the custom adapter on `s` for both HTTP and HTTPS requests.
adapter = AbsoluteDNSAdapter()
s.mount("http://", adapter)
s.mount("https://", adapter)


# Substrings that mark a fetched page as an error or placeholder page rather
# than a live conference site. find_not_here_message() returns the first entry,
# in list order, that occurs in the page HTML. Matching is a case-sensitive
# substring test, so an entry that contains an earlier entry (for example
# "Error 404: Page not found", which contains "Error 404") is never the one
# reported.
not_here_list = [
    "The specified URL was not found.",
    "There is currently no text in this page.",
    "This page does not exist yet",
    "404 Not Found",
    "500 Internal Server Error",
    "Test Page for the Apache HTTP Server",
    "Site not found &middot; GitHub Pages",
    "504: Gateway time-out",
    "504 Gateway Time-out",
    "502 Bad Gateway",
    "This page doesn't exist (404)",
    "Coming soon",
    "NOT_FOUND",
    "Resource Not Found",
    "Wikimedia Error",
    "The page you requested could not be found",
    "Ooops! Could Not Find It",
    "OpenStreetMap Authentication Proxy",
    "Error 404",
    # "Under Construction",
    "Page not found",
    "Error 404: Page not found",
    "Barcamptools",
    "That page can’t be found.",
    "looks like there's no page here",
    "404 page",
    "Database Error",
    "You are not authorized to access this page",
    "Attention Required! | Cloudflare",
    "This page doesn't currently exist",
    "ERROR 503 - Service Unavailable",
    "ERROR 503",
    "401 Authorization Required",
    "Authorization Required",
    "used Cloudflare to restrict access",
]


def find_not_here_message(page_html: str) -> str | None:
    """Return the first entry of not_here_list found in the page, or None."""
    for marker in not_here_list:
        if marker in page_html:
            return marker
    return None


def get_title(page_html: str) -> str:
    """Return the title of a web page, or "no title" when it has none.

    The text matched by re_title is stripped of surrounding whitespace and its
    HTML entities are decoded. A missing, empty or whitespace-only title
    element gives the placeholder "no title".
    """
    m = re_title.search(page_html)
    # Strip before testing for emptiness, so that a title made only of
    # whitespace is reported as missing rather than as an empty string.
    title = m.group(1).strip() if m else ""
    return html.unescape(title) if title else "no title"


def normalize_url(url: str) -> str:
    """
    Normalize the host part of a URL so that equivalent URLs compare equal.

    The network location is lowercased and any trailing dot on the hostname is
    removed, including when the hostname is followed by a port
    (``example.com.:8080`` becomes ``example.com:8080``). Scheme, path, query
    and fragment are returned unchanged.
    """
    parsed_url = urlparse(url)
    netloc = parsed_url.netloc.lower()

    # Split off a numeric port, if any, so the dot in front of it can be
    # stripped. A bare IPv6 literal such as "[::1]" has a non-numeric tail
    # after its last colon and is handled by the else branch.
    host, colon, port = netloc.rpartition(":")
    if colon and port.isdigit():
        netloc = f"{host.rstrip('.')}:{port}"
    else:
        netloc = netloc.rstrip(".")

    return urlunparse(parsed_url._replace(netloc=netloc))


def url_to_filename(url: str) -> str:
    """
    Convert a URL to a filename by swapping certain characters for underscores.

    Each of the characters ``/ : * ? " < > |`` becomes an underscore. Every
    other character, including a backslash, is kept as it is.

    Args:
        url (str): The URL to be converted.

    Returns:
        str: The URL with the characters above replaced by underscores.
    """
    replaced = '/:*?"<>|'
    # One-to-one translation table: every listed character maps to "_".
    table = str.maketrans(replaced, "_" * len(replaced))
    return url.translate(table)


def parse_opengraph_tags(html: str) -> dict[str, str]:
    """
    Collect the Open Graph meta tags of a page into a dictionary.

    Each key is the part of the property name after 'og:', e.g. 'title',
    'image', 'url', 'site_name'. Only tags written with double-quoted
    attributes and with ``property`` before ``content`` are recognised. When a
    property occurs more than once, the last occurrence wins.
    """
    og_meta = re.compile(
        r'<meta\s+property="og:([^"]+)"\s+content="([^"]+)"\s*/?>', flags=re.IGNORECASE
    )
    # findall yields (property, content) pairs, which dict() consumes in order.
    return dict(og_meta.findall(html))


class Conference:
    """A conference web site to be checked for one particular year.

    ``src_url`` is a URL template with a ``{year}`` placeholder. It is filled
    in with ``year`` (or ``year - 1``) to build the URLs that are checked.
    """

    # Conference name, used in the notification mail.
    name: str
    # URL template containing the "{year}" placeholder.
    src_url: str
    # Year being checked.
    year: int
    # Response of the most recent successful fetch in check(), otherwise None.
    response: None | requests.models.Response

    def __init__(self, name: str, src_url: str, year: int):
        """Store the conference details; no request is made yet."""
        self.name = name
        self.src_url = src_url
        self.year = year
        self.response = None

    @property
    def url(self) -> str:
        """Conference URL."""
        return self.src_url.format(year=self.year)

    @property
    def past_url(self) -> str:
        """URL for previous year."""
        return self.src_url.format(year=self.year - 1)

    def check(self) -> tuple[bool, str, str | None]:
        """Fetch the conference URL and decide whether the site is live.

        Returns:
            A ``(live, message, final_url)`` tuple. ``message`` is the page
            title when the site is live and the reason otherwise.
            ``final_url`` is the URL of the response, or None when the
            connection failed.
        """
        # Sites whose URL contains one of these hosts are fetched with the
        # session that does not have AbsoluteDNSAdapter mounted.
        no_dot = {"bsideskbh.dk", "pif.camp"}
        session = s_no_dot if any(hostname in self.url for hostname in no_dot) else s
        try:
            r = session.get(self.url)
        except requests.exceptions.ConnectionError:
            return (False, "connection refused", None)

        self.response = r

        # A short page with a meta refresh that never mentions the year is
        # treated as a redirect to a page that is not specific to this year.
        if (
            len(r.text) < 2048
            and 'http-equiv="refresh"' in r.text
            and str(self.year) not in r.text
        ):
            return (False, "redirect to URL without year", r.url)

        if normalize_url(r.url) == normalize_url(self.past_url):
            return (False, "redirect to previous year", r.url)

        not_here = find_not_here_message(r.text)
        if not_here:
            return (False, not_here, r.url)

        return (True, get_title(r.text), r.url)

    def og_tags(self) -> dict[str, str]:
        """Open Graph tags of the last fetched page; check() must have succeeded."""
        assert self.response
        return parse_opengraph_tags(self.response.text)

    def check_web_site(self) -> bool:
        """Check if the web site is live and send a notification mail if it is.

        Returns:
            True when the site is live, False otherwise.
        """
        assert "{year}" in self.src_url
        live, msg, redirect_to_url = self.check()
        if not live:
            return False

        og = "".join(f"\n{key}: {value}" for key, value in self.og_tags().items())

        if og:
            og = "\n\nOpen Graph\n\n" + og

        if redirect_to_url and normalize_url(redirect_to_url) == normalize_url(
            self.url
        ):
            body = f"{self.name}\n{self.url}\n"
        else:
            body = f"{self.name}\n{self.url} redirects to {redirect_to_url}\n"

        # This has to be an f-string: without the prefix the mail contained
        # the literal text "{msg}{og}" instead of the title and the tags.
        body += f"Web page title: {msg}{og}"
        send_mail(f"Conference site live: {self.name} - {self.year}", body)

        return True


def send_mail(subject: str, body: str) -> None:
    """Send a plain-text UTF-8 e-mail using the settings in config.

    Sender, recipient and SMTP host come from ``config["mail"]``. Every entry
    of ``config["mail_headers"]`` is added as an extra header.

    Args:
        subject (str): Subject line of the message.
        body (str): Plain-text body of the message.
    """
    # An unconditional ``return`` used to sit here; it made the rest of the
    # function unreachable, so no mail was ever sent.
    mail_from_address = config["mail"]["from_address"]
    mail_from_name = config["mail"]["from_name"]
    mail_to_address = config["mail"]["to_address"]
    mail_to_name = config["mail"]["to_name"]
    msg = MIMEText(body, "plain", "UTF-8")

    msg["Subject"] = subject
    msg["To"] = f"{mail_to_name} <{mail_to_address}>"
    msg["From"] = f"{mail_from_name} <{mail_from_address}>"
    msg["Date"] = formatdate()
    msg["Message-ID"] = make_msgid()

    # extra mail headers from config
    for header_name, value in config["mail_headers"].items():
        msg[header_name] = value

    # The context manager ends the SMTP session even when sendmail raises.
    # The name ``smtp`` avoids shadowing the module-level session ``s``.
    with smtplib.SMTP(config["mail"]["smtp_host"]) as smtp:
        smtp.sendmail(mail_from_address, [mail_to_address], msg.as_string())


def check(name: str, src_url: str, year: int) -> bool:
    """Return True if the conference site for the given year is live.

    Builds a Conference from the arguments and delegates to its
    check_web_site method.
    """
    return Conference(name, src_url, year).check_web_site()


def find_new_conference_web_sites(
    today: date, live: list[LiveConference]
) -> list[LiveConference]:
    """Return entries for conference web sites that have newly gone live.

    Every conference in the "conferences" YAML data is checked for the current
    year and for the following one. A (conference, year) pair that already
    appears in ``live`` is skipped without being checked.
    """
    already_live = {(entry["conference"], entry["year"]) for entry in live}
    candidate_years = (today.year, today.year + 1)

    found: list[LiveConference] = []
    for name, src_url in load_yaml("conferences").items():
        for year in candidate_years:
            if (name, year) in already_live:
                continue
            if check(name, src_url, year):
                found.append({"conference": name, "year": year, "live": today})
    return found


def main(show_not_live: bool = False) -> None:
    """Check for new conference web sites and record them in the live file.

    The "live" YAML data is loaded and newly live sites are looked up. When
    there are any, the combined list is written to the path in
    ``config["data"]["live"]``. The ``show_not_live`` argument is accepted but
    not used.
    """
    known: list[LiveConference] = load_yaml("live")
    fresh = find_new_conference_web_sites(date.today(), known)
    if not fresh:
        # Nothing new went live, so the file is left untouched.
        return

    target = os.path.expanduser(config["data"]["live"])
    with open(target, "w") as out:
        yaml.dump(known + fresh, stream=out, sort_keys=False)


# Run the check only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()