agenda/update.py

125 lines
3.2 KiB
Python
Executable file

#!/usr/bin/python3
"""Combined update script for various data sources."""
import asyncio
import os
import smtplib
import sys
from datetime import date, datetime
from email.message import EmailMessage
from email.utils import formatdate, make_msgid
from time import time
import requests
import agenda.thespacedevs
import agenda.types
import agenda.uk_holiday
import agenda.waste_schedule
from agenda import gwr
config = __import__("config.default", fromlist=[""])
async def update_bank_holidays() -> None:
"""Update cached copy of UK Bank holidays."""
t0 = time()
events = await agenda.uk_holiday.get_holiday_list(config.DATA_DIR)
time_taken = time() - t0
if not sys.stdin.isatty():
return
print(len(events), "bank holidays in list")
print(f"took {time_taken:.1f} seconds")
async def update_bristol_bins() -> None:
"""Update waste schedule from Bristol City Council."""
t0 = time()
events = await agenda.waste_schedule.get_bristol_gov_uk(
date.today(), config.DATA_DIR, config.BRISTOL_UPRN, refresh=True
)
time_taken = time() - t0
if not sys.stdin.isatty():
return
for event in events:
print(event)
print(f"took {time_taken:.1f} seconds")
def send_mail(subject: str, body: str) -> None:
"""Send an e-mail."""
msg = EmailMessage()
msg["Subject"] = subject
msg["To"] = f"{config.NAME} <{config.MAIL_TO}>"
msg["From"] = f"{config.NAME} <{config.MAIL_FROM}>"
msg["Date"] = formatdate()
msg["Message-ID"] = make_msgid()
# Add extra mail headers
for header, value in config.getattr("MAIL_HEADERS", []):
msg[header] = value
msg.set_content(body)
s = smtplib.SMTP(config.SMTP_HOST)
s.sendmail(config.MAIL_TO, [config.MAIL_TO], msg.as_string())
s.quit()
def update_gwr_advance_ticket_date() -> None:
"""Update GWR advance ticket date cache."""
filename = os.path.join(config.DATA_DIR, "advance-tickets.html")
existing_html = open(filename).read()
existing_date = gwr.extract_weekday_date(existing_html)
new_html = requests.get(gwr.url).text
open(filename, "w").write(new_html)
new_date = gwr.extract_weekday_date(new_html)
if existing_date == new_date:
if sys.stdin.isatty():
print("date has't changed:", existing_date)
return
subject = f"New GWR advance ticket booking date: {new_date}"
body = f"""Old date: {existing_date}
New date: {new_date}
{gwr.url}
Agenda: https://edwardbetts.com/agenda/
"""
send_mail(subject, body)
def update_thespacedevs() -> None:
"""Update cache of space launch API."""
rocket_dir = os.path.join(config.DATA_DIR, "thespacedevs")
t0 = time()
rockets = agenda.thespacedevs.next_launch_api(rocket_dir)
time_taken = time() - t0
if not sys.stdin.isatty():
return
print(len(rockets), "launches")
print(f"took {time_taken:.1f} seconds")
def main() -> None:
"""Update caches."""
now = datetime.now()
hour = now.hour
if hour % 3 == 0:
asyncio.run(update_bank_holidays())
asyncio.run(update_bristol_bins())
update_gwr_advance_ticket_date()
update_thespacedevs()
if __name__ == "__main__":
main()