#!/usr/bin/python3

"""Check if conference websites are live."""

import html
import os
import re
import smtplib
import warnings
from datetime import date
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
from urllib.parse import urlparse, urlunparse

import cloudscraper
import requests
import yaml
from requests.adapters import HTTPAdapter
from urllib3.exceptions import InsecureRequestWarning
from urllib3.util.url import parse_url

from conference import LiveConference, config, load_yaml


class AbsoluteDNSAdapter(HTTPAdapter):
    """A custom adapter for requests to ensure hostnames are treated as absolute."""

    def add_dot_to_hostname(self, url: str) -> str:
        """Append a dot to the hostname to treat it as an absolute domain name."""
        parsed_url = parse_url(url)

        # Append a dot to the hostname if it's not already there.
        hostname = parsed_url.host
        assert hostname
        if not hostname.endswith("."):
            hostname += "."

        # Reconstruct the URL with the modified hostname.
        new_url: str = parsed_url._replace(host=hostname).url
        return new_url

    def send(self, request, **kwargs):  # type: ignore
        """Override the send method to modify the request URL before sending."""
        # Modify the request URL to ensure the hostname is treated as absolute.
        request.url = self.add_dot_to_hostname(request.url)
        return super().send(request, **kwargs)


# Suppress only the single InsecureRequestWarning from urllib3
warnings.filterwarnings("ignore", category=InsecureRequestWarning)


re_title = re.compile("<title>(.*?)</title>", re.DOTALL)

AGENT = config["browser"]["User-Agent"]
headers = {"User-Agent": AGENT, "Accept": "text/html"}

s = cloudscraper.CloudScraper()
s_no_dot = cloudscraper.CloudScraper()

# s = requests.Session()
# s.headers.update(headers)

# s_no_dot = requests.Session()
# s_no_dot.headers.update(headers)

# Create a session and mount the custom adapter for both HTTP and HTTPS requests.
adapter = AbsoluteDNSAdapter()
s.mount("http://", adapter)
s.mount("https://", adapter)


not_here_list = [
    "The specified URL was not found.",
    "There is currently no text in this page.",
    "This page does not exist yet",
    "404 Not Found",
    "500 Internal Server Error",
    "500: Internal Server Error",
    "Test Page for the Apache HTTP Server",
    "Site not found &middot; GitHub Pages",
    "504: Gateway time-out",
    "504 Gateway Time-out",
    "502 Bad Gateway",
    "This page doesn't exist (404)",
    "Coming soon",
    "NOT_FOUND",
    "Resource Not Found",
    "Wikimedia Error",
    "The page you requested could not be found",
    "Ooops! Could Not Find It",
    "OpenStreetMap Authentication Proxy",
    "Error 404",
    # "Under Construction",
    "Page not found",
    "Error 404: Page not found",
    "Barcamptools",
    "That page can’t be found.",
    "looks like there's no page here",
    "404 page",
    "Database Error",
    "You are not authorized to access this page",
    "Attention Required! | Cloudflare",
    "This page doesn't currently exist",
    "ERROR 503 - Service Unavailable",
    "ERROR 503",
    "401 Authorization Required",
    "Authorization Required",
    "used Cloudflare to restrict access",
]


def find_not_here_message(page_html: str) -> str | None:
    """Find not here message in web page."""
    return next((not_here for not_here in not_here_list if not_here in page_html), None)


def get_title(page_html: str) -> str:
    """Title from web page."""
    m = re_title.search(page_html)
    return html.unescape(m.group(1).strip()) if m and m.group(1) else "no title"


def normalize_url(url: str) -> str:
    """
    Normalize the URL by parsing and reconstructing to ensure uniformity.

    This handles cases like differing schemes, casing in the domain
    and trailing slashes.
    """
    # Parse the URL into components
    parsed_url = urlparse(url)
    # Normalize the domain to lowercase and remove any trailing dot
    normalized_netloc = parsed_url.netloc.lower().rstrip(".")
    # Reconstruct the URL with normalized components
    return urlunparse(parsed_url._replace(netloc=normalized_netloc))


def url_to_filename(url: str) -> str:
    """
    Convert a URL to a valid filename by replacing invalid characters with underscores.

    Args:
        url (str): The URL to be converted.

    Returns:
        str: A valid filename.
    """
    # Replace invalid characters with underscores
    return re.sub(r'[\/:*?"<>|]', "_", url)


def parse_opengraph_tags(html: str) -> dict[str, str]:
    """
    Locate Open Graph meta tags, and return them as a dictionary.

    Keys will match the name following 'og:', e.g. 'title', 'image', 'url', 'site_name'.
    """
    pattern = re.compile(
        r'<meta\s+property="og:([^"]+)"\s+content="([^"]+)"\s*/?>', flags=re.IGNORECASE
    )
    matches = pattern.findall(html)

    og_tags = {}
    for prop, content in matches:
        og_tags[prop] = content

    return og_tags


class Conference:
    """Conference."""

    name: str
    src_url: str
    year: int
    response: None | requests.models.Response

    def __init__(self, name: str, src_url: str, year: int):
        """Init."""
        self.name = name
        self.src_url = src_url
        self.year = year
        self.response = None

    @property
    def url(self) -> str:
        """Conference URL."""
        return self.build_url(year=self.year)

    def build_url(self, year: int) -> str:
        """Build conference URL."""
        return self.src_url.format(year=year, two_digit_year=year % 2000)

    @property
    def past_url(self) -> str:
        """URL for previous year."""
        return self.build_url(year=self.year - 1)

    def check(self) -> tuple[bool, str, str | None]:
        """Check if conference is live."""
        no_dot = {"bsideskbh.dk", "pif.camp"}
        url = self.url
        session = s if all(hostname not in url for hostname in no_dot) else s_no_dot
        try:
            r = session.get(url)
        except requests.exceptions.ConnectionError:
            return (False, "connection refused", None)

        self.response = r

        not_here = find_not_here_message(r.text)
        if (
            len(r.text) < 2048
            and 'http-equiv="refresh"' in r.text
            and str(self.year) not in r.text
        ):
            return (False, "redirect to URL without year", r.url)

        if normalize_url(r.url) == normalize_url(self.past_url):
            return (False, "redirect to previous year", r.url)

        if not_here:
            return (False, not_here, r.url)

        return (True, get_title(r.text), r.url)

    def og_tags(self) -> dict[str, str]:
        """Open Graph tags."""
        return parse_opengraph_tags(self.response.text) if self.response else {}

    def check_web_site(self) -> bool:
        """Check if an individual web site is live."""
        assert "{year}" in self.src_url or "{two_digit_year}" in self.src_url
        live, msg, redirect_to_url = self.check()
        if not live:
            return False

        og = "".join(f"\n{key}: {value}" for key, value in self.og_tags().items())

        if og:
            og = "\n\nOpen Graph\n\n" + og

        if redirect_to_url and normalize_url(redirect_to_url) == normalize_url(
            self.url
        ):
            body = f"{self.name}\n{self.url}\n"
        else:
            body = f"{self.name}\n{self.url} redirects to {redirect_to_url}\n"

        body += "Web page title: {msg}{og}" ""
        send_mail(f"Conference site live: {self.name} - {self.year}", body)

        return True


def send_mail(subject: str, body: str) -> None:
    """Send an e-mail."""
    return
    mail_from_address = config["mail"]["from_address"]
    mail_from_name = config["mail"]["from_name"]
    mail_to_address = config["mail"]["to_address"]
    mail_to_name = config["mail"]["to_name"]
    msg = MIMEText(body, "plain", "UTF-8")

    msg["Subject"] = subject
    msg["To"] = f"{mail_to_name} <{mail_to_address}>"
    msg["From"] = f"{mail_from_name} <{mail_from_address}>"
    msg["Date"] = formatdate()
    msg["Message-ID"] = make_msgid()

    # extra mail headers from config
    for header_name, value in config["mail_headers"].items():
        msg[header_name] = value

    s = smtplib.SMTP(config["mail"]["smtp_host"])
    s.sendmail(mail_from_address, [mail_to_address], msg.as_string())
    s.quit()


def check(name: str, src_url: str, year: int) -> bool:
    """Check to see if conference site is live."""
    conf = Conference(name, src_url, year)
    return conf.check_web_site()


def find_new_conference_web_sites(
    today: date, live: list[LiveConference]
) -> list[LiveConference]:
    """Check for new conference web sites going live."""
    this_year = today.year

    new: list[LiveConference] = []

    live_set = {(c["conference"], c["year"]) for c in live}
    for name, src_url in load_yaml("conferences").items():
        new += [
            {"conference": name, "year": year, "live": today}
            for year in (this_year, this_year + 1)
            if (name, year) not in live_set and check(name, src_url, year)
        ]
    return new


def main(show_not_live: bool = False) -> None:
    """Check fow new conference web sites."""
    live: list[LiveConference] = load_yaml("live")
    if not (new := find_new_conference_web_sites(date.today(), live)):
        return

    live_filename = os.path.expanduser(config["data"]["live"])
    with open(live_filename, "w") as out:
        yaml.dump(live + new, stream=out, sort_keys=False)


if __name__ == "__main__":
    main()