#!/usr/bin/python3
"""Combined update script for various data sources."""

import asyncio
import json
import os
import smtplib
import sys
import typing
from datetime import date, datetime
from email.message import EmailMessage
from email.utils import formatdate, make_msgid
from time import time

import flask
import requests

import agenda.fx
import agenda.thespacedevs
import agenda.types
import agenda.uk_holiday
import agenda.waste_schedule
from agenda import gwr
from agenda.types import StrDict
from web_view import app

# Timeout (seconds) passed to every requests call so that an unresponsive
# server cannot hang an unattended run indefinitely.
_HTTP_TIMEOUT = 30


async def update_bank_holidays(config: flask.config.Config) -> None:
    """Update cached copy of UK Bank holidays.

    Calls agenda.uk_holiday.get_holiday_list with config["DATA_DIR"]. A short
    summary is printed only when stdin is a terminal.
    """
    t0 = time()
    events = await agenda.uk_holiday.get_holiday_list(config["DATA_DIR"])
    time_taken = time() - t0
    if not sys.stdin.isatty():
        return
    print(len(events), "bank holidays in list")
    print(f"took {time_taken:.1f} seconds")


async def update_bristol_bins(config: flask.config.Config) -> None:
    """Update waste schedule from Bristol City Council.

    Requests a refresh for today's date using config["DATA_DIR"] and
    config["BRISTOL_UPRN"]. The events are printed only when stdin is a
    terminal.
    """
    t0 = time()
    events = await agenda.waste_schedule.get_bristol_gov_uk(
        date.today(), config["DATA_DIR"], config["BRISTOL_UPRN"], refresh=True
    )
    time_taken = time() - t0
    if not sys.stdin.isatty():
        return
    for event in events:
        print(event)
    print(f"took {time_taken:.1f} seconds")


def send_mail(config: flask.config.Config, subject: str, body: str) -> None:
    """Send a plain-text e-mail via the SMTP server named in the config.

    Reads NAME, MAIL_TO, MAIL_FROM, MAIL_HEADERS and SMTP_HOST from config.
    MAIL_HEADERS is iterated as (header, value) pairs.
    """
    msg = EmailMessage()

    msg["Subject"] = subject
    msg["To"] = f"{config['NAME']} <{config['MAIL_TO']}>"
    msg["From"] = f"{config['NAME']} <{config['MAIL_FROM']}>"
    msg["Date"] = formatdate()
    msg["Message-ID"] = make_msgid()

    # Add extra mail headers
    for header, value in config["MAIL_HEADERS"]:
        msg[header] = value

    msg.set_content(body)

    # The context manager issues QUIT and closes the connection even when
    # sendmail() raises, so a failed send does not leak the socket.
    # NOTE(review): the envelope sender is MAIL_TO rather than MAIL_FROM;
    # kept as found - confirm this is intentional.
    with smtplib.SMTP(config["SMTP_HOST"]) as s:
        s.sendmail(config["MAIL_TO"], [config["MAIL_TO"]], msg.as_string())


def update_gwr_advance_ticket_date(config: flask.config.Config) -> None:
    """Update GWR advance ticket date cache.

    Compares the date extracted from the cached page with the date extracted
    from a freshly downloaded copy, overwrites the cache with the new page and
    sends a mail when the two dates differ.

    Raises:
        requests.RequestException: the download timed out or returned an HTTP
            error status; the cached page is left untouched in that case.
    """
    filename = os.path.join(config["DATA_DIR"], "advance-tickets.html")

    with open(filename) as cached:
        existing_html = cached.read()
    existing_date = gwr.extract_weekday_date(existing_html)

    response = requests.get(gwr.url, timeout=_HTTP_TIMEOUT)
    # Fail before touching the cache so an error page never replaces it.
    response.raise_for_status()
    new_html = response.text

    with open(filename, "w") as out:
        out.write(new_html)

    new_date = gwr.extract_weekday_date(new_html)

    if existing_date == new_date:
        if sys.stdin.isatty():
            print("date hasn't changed:", existing_date)
        return

    subject = f"New GWR advance ticket booking date: {new_date}"
    body = f"""Old date: {existing_date}
New date: {new_date}

{gwr.url}

Agenda: https://edwardbetts.com/agenda/
"""
    send_mail(config, subject, body)


def report_space_launch_change(
    config: flask.config.Config, prev_launch: StrDict, cur_launch: StrDict
) -> None:
    """Send mail to announce change to space launch data.

    The mail contains both launch records dumped as indented JSON; the subject
    uses cur_launch["name"].
    """
    subject = f'Change to {cur_launch["name"]}'

    body = f"""
A space launch of interest was updated.

Get ready for two big piles of JSON.

Old launch data

{json.dumps(prev_launch, indent=2)}

New launch data

{json.dumps(cur_launch, indent=2)}
"""

    send_mail(config, subject, body)


def _find_launch(data: StrDict, slug: str) -> typing.Optional[StrDict]:
    """Return the launch in data["results"] with this slug, or None if absent.

    When several results share a slug the last one listed is returned.
    """
    launches = {item["slug"]: typing.cast(StrDict, item) for item in data["results"]}
    return launches.get(slug)


def get_launch_by_slug(data: StrDict, slug: str) -> StrDict:
    """Find last update for space launch.

    Raises:
        KeyError: no entry in data["results"] has the given slug.
    """
    launch = _find_launch(data, slug)
    if launch is None:
        raise KeyError(slug)
    return launch


def update_thespacedevs(config: flask.config.Config) -> None:
    """Update cache of space launch API.

    For every slug in config["FOLLOW_LAUNCHES"] the cached record is compared
    with the freshly retrieved one and a mail is sent when "last_updated"
    differs. A slug that is missing from either data set is skipped instead of
    aborting the run: a launch absent from the cache would otherwise raise
    before the new data is even requested.
    """
    rocket_dir = os.path.join(config["DATA_DIR"], "thespacedevs")
    followed = config["FOLLOW_LAUNCHES"]

    existing_data = agenda.thespacedevs.load_cached_launches(rocket_dir)
    prev_launches = {slug: _find_launch(existing_data, slug) for slug in followed}

    t0 = time()
    data = agenda.thespacedevs.next_launch_api_data(rocket_dir)
    cur_launches = {slug: _find_launch(data, slug) for slug in followed}

    for slug in followed:
        prev, cur = prev_launches[slug], cur_launches[slug]
        if prev is None or cur is None:
            # Nothing to compare against; do not treat this as a change.
            if sys.stdin.isatty():
                print("launch missing from data, skipping:", slug)
            continue
        if prev["last_updated"] != cur["last_updated"]:
            report_space_launch_change(config, prev, cur)

    time_taken = time() - t0
    if not sys.stdin.isatty():
        return
    rockets = [agenda.thespacedevs.summarize_launch(item) for item in data["results"]]
    print(len(rockets), "launches")
    print(f"took {time_taken:.1f} seconds")


def update_gandi(config: flask.config.Config) -> None:
    """Retrieve list of domains from gandi.net.

    The response is validated before it is written to gandi_domains.json in
    config["DATA_DIR"], so a bad response never replaces the cached copy.

    Raises:
        requests.RequestException: the request timed out or returned an HTTP
            error status.
        ValueError: the response is not a list, or an item has an empty
            "fqdn" or "registry_ends_at" value.
    """
    url = "https://api.gandi.net/v5/domain/domains"
    headers = {"authorization": "Bearer " + config["GANDI_TOKEN"]}
    filename = os.path.join(config["DATA_DIR"], "gandi_domains.json")

    r = requests.get(url, headers=headers, timeout=_HTTP_TIMEOUT)
    r.raise_for_status()
    items = r.json()

    # Explicit checks instead of assert: assertions are removed under -O,
    # which would let an invalid payload reach the cache file.
    if not isinstance(items, list):
        raise ValueError("unexpected response from gandi.net: not a list")
    if not all(item["fqdn"] and item["dates"]["registry_ends_at"] for item in items):
        raise ValueError("unexpected response from gandi.net: incomplete domain")

    with open(filename, "w") as out:
        out.write(r.text)


def main() -> None:
    """Update caches.

    Which updates run depends on the current local hour.
    """
    now = datetime.now()
    hour = now.hour

    # NOTE(review): grouping reconstructed from a file that had lost its
    # indentation - confirm that the GWR and Gandi updates belong to the
    # three-hourly group and that the space launch update runs every time.
    with app.app_context():
        if hour % 3 == 0:
            asyncio.run(update_bank_holidays(app.config))
            asyncio.run(update_bristol_bins(app.config))
            update_gwr_advance_ticket_date(app.config)
            update_gandi(app.config)

        if hour % 12 == 0:
            agenda.fx.get_rates(app.config)

        update_thespacedevs(app.config)


if __name__ == "__main__":
    main()