parent
4ade643de6
commit
a37af733cd
124
update.py
Executable file
124
update.py
Executable file
|
@ -0,0 +1,124 @@
|
||||||
|
#!/usr/bin/python3
|
||||||
|
"""Combined update script for various data sources."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import smtplib
|
||||||
|
import sys
|
||||||
|
from datetime import date, datetime
|
||||||
|
from email.message import EmailMessage
|
||||||
|
from email.utils import formatdate, make_msgid
|
||||||
|
from time import time
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
import agenda.thespacedevs
|
||||||
|
import agenda.types
|
||||||
|
import agenda.uk_holiday
|
||||||
|
import agenda.waste_schedule
|
||||||
|
from agenda import gwr
|
||||||
|
|
||||||
|
config = __import__("config.default", fromlist=[""])
|
||||||
|
|
||||||
|
|
||||||
|
async def update_bank_holidays() -> None:
|
||||||
|
"""Update cached copy of UK Bank holidays."""
|
||||||
|
t0 = time()
|
||||||
|
events = await agenda.uk_holiday.get_holiday_list(config.DATA_DIR)
|
||||||
|
time_taken = time() - t0
|
||||||
|
if not sys.stdin.isatty():
|
||||||
|
return
|
||||||
|
print(len(events), "bank holidays in list")
|
||||||
|
print(f"took {time_taken:.1f} seconds")
|
||||||
|
|
||||||
|
|
||||||
|
async def update_bristol_bins() -> None:
|
||||||
|
"""Update waste schedule from Bristol City Council."""
|
||||||
|
t0 = time()
|
||||||
|
events = await agenda.waste_schedule.get_bristol_gov_uk(
|
||||||
|
date.today(), config.DATA_DIR, config.BRISTOL_UPRN, refresh=True
|
||||||
|
)
|
||||||
|
time_taken = time() - t0
|
||||||
|
if not sys.stdin.isatty():
|
||||||
|
return
|
||||||
|
for event in events:
|
||||||
|
print(event)
|
||||||
|
print(f"took {time_taken:.1f} seconds")
|
||||||
|
|
||||||
|
|
||||||
|
def send_mail(subject: str, body: str) -> None:
|
||||||
|
"""Send an e-mail."""
|
||||||
|
msg = EmailMessage()
|
||||||
|
|
||||||
|
msg["Subject"] = subject
|
||||||
|
msg["To"] = f"{config.NAME} <{config.MAIL_TO}>"
|
||||||
|
msg["From"] = f"{config.NAME} <{config.MAIL_FROM}>"
|
||||||
|
msg["Date"] = formatdate()
|
||||||
|
msg["Message-ID"] = make_msgid()
|
||||||
|
|
||||||
|
# Add extra mail headers
|
||||||
|
for header, value in config.getattr("MAIL_HEADERS", []):
|
||||||
|
msg[header] = value
|
||||||
|
|
||||||
|
msg.set_content(body)
|
||||||
|
|
||||||
|
s = smtplib.SMTP(config.SMTP_HOST)
|
||||||
|
s.sendmail(config.MAIL_TO, [config.MAIL_TO], msg.as_string())
|
||||||
|
s.quit()
|
||||||
|
|
||||||
|
|
||||||
|
def update_gwr_advance_ticket_date() -> None:
|
||||||
|
"""Update GWR advance ticket date cache."""
|
||||||
|
filename = os.path.join(config.DATA_DIR, "advance-tickets.html")
|
||||||
|
existing_html = open(filename).read()
|
||||||
|
existing_date = gwr.extract_weekday_date(existing_html)
|
||||||
|
|
||||||
|
new_html = requests.get(gwr.url).text
|
||||||
|
open(filename, "w").write(new_html)
|
||||||
|
|
||||||
|
new_date = gwr.extract_weekday_date(new_html)
|
||||||
|
|
||||||
|
if existing_date == new_date:
|
||||||
|
if sys.stdin.isatty():
|
||||||
|
print("date has't changed:", existing_date)
|
||||||
|
return
|
||||||
|
|
||||||
|
subject = f"New GWR advance ticket booking date: {new_date}"
|
||||||
|
body = f"""Old date: {existing_date}
|
||||||
|
New date: {new_date}
|
||||||
|
|
||||||
|
{gwr.url}
|
||||||
|
|
||||||
|
Agenda: https://edwardbetts.com/agenda/
|
||||||
|
"""
|
||||||
|
send_mail(subject, body)
|
||||||
|
|
||||||
|
|
||||||
|
def update_thespacedevs() -> None:
|
||||||
|
"""Update cache of space launch API."""
|
||||||
|
rocket_dir = os.path.join(config.DATA_DIR, "thespacedevs")
|
||||||
|
|
||||||
|
t0 = time()
|
||||||
|
rockets = agenda.thespacedevs.next_launch_api(rocket_dir)
|
||||||
|
time_taken = time() - t0
|
||||||
|
if not sys.stdin.isatty():
|
||||||
|
return
|
||||||
|
print(len(rockets), "launches")
|
||||||
|
print(f"took {time_taken:.1f} seconds")
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""Update caches."""
|
||||||
|
now = datetime.now()
|
||||||
|
hour = now.hour
|
||||||
|
|
||||||
|
if hour % 3 == 0:
|
||||||
|
asyncio.run(update_bank_holidays())
|
||||||
|
asyncio.run(update_bristol_bins())
|
||||||
|
update_gwr_advance_ticket_date()
|
||||||
|
|
||||||
|
update_thespacedevs()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
|
@ -1,26 +0,0 @@
|
||||||
#!/usr/bin/python3
|
|
||||||
"""Update cached copy of UK Bank holidays."""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import sys
|
|
||||||
from time import time
|
|
||||||
|
|
||||||
import agenda.types
|
|
||||||
import agenda.uk_holiday
|
|
||||||
from agenda.types import StrDict
|
|
||||||
|
|
||||||
config = __import__("config.default", fromlist=[""])
|
|
||||||
|
|
||||||
|
|
||||||
async def get_bank_holidays() -> list[StrDict]:
|
|
||||||
"""Call UK Government bank holidays API and cache results."""
|
|
||||||
return await agenda.uk_holiday.get_holiday_list(config.DATA_DIR)
|
|
||||||
|
|
||||||
|
|
||||||
t0 = time()
|
|
||||||
events: list[StrDict] = asyncio.run(get_bank_holidays())
|
|
||||||
time_taken = time() - t0
|
|
||||||
if not sys.stdin.isatty():
|
|
||||||
sys.exit(0)
|
|
||||||
print(len(events), "bank holidays in list")
|
|
||||||
print(f"took {time_taken:.1f} seconds")
|
|
|
@ -1,29 +0,0 @@
|
||||||
#!/usr/bin/python3
|
|
||||||
"""Update waste schedule from Bristol City Council."""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import sys
|
|
||||||
from datetime import date
|
|
||||||
from time import time
|
|
||||||
|
|
||||||
import agenda.types
|
|
||||||
import agenda.waste_schedule
|
|
||||||
|
|
||||||
config = __import__("config.default", fromlist=[""])
|
|
||||||
|
|
||||||
|
|
||||||
async def bristol_waste_collection_events() -> list[agenda.types.Event]:
|
|
||||||
"""Waste colllection events."""
|
|
||||||
return await agenda.waste_schedule.get_bristol_gov_uk(
|
|
||||||
date.today(), config.DATA_DIR, config.BRISTOL_UPRN, refresh=True
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
today = date.today()
|
|
||||||
t0 = time()
|
|
||||||
events = asyncio.run(bristol_waste_collection_events())
|
|
||||||
time_taken = time() - t0
|
|
||||||
if sys.stdin.isatty():
|
|
||||||
for event in events:
|
|
||||||
print(event)
|
|
||||||
print(f"took {time_taken:.1f} seconds")
|
|
|
@ -1,66 +0,0 @@
|
||||||
#!/usr/bin/python3
|
|
||||||
"""Update GWR advance ticket date cache."""
|
|
||||||
|
|
||||||
import os.path
|
|
||||||
import smtplib
|
|
||||||
import sys
|
|
||||||
from email.message import EmailMessage
|
|
||||||
from email.utils import formatdate, make_msgid
|
|
||||||
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from agenda import gwr
|
|
||||||
|
|
||||||
config = __import__("config.default", fromlist=[""])
|
|
||||||
|
|
||||||
|
|
||||||
def send_mail(subject: str, body: str) -> None:
|
|
||||||
"""Send an e-mail."""
|
|
||||||
msg = EmailMessage()
|
|
||||||
|
|
||||||
msg["Subject"] = subject
|
|
||||||
msg["To"] = f"{config.NAME} <{config.MAIL_TO}>"
|
|
||||||
msg["From"] = f"{config.NAME} <{config.MAIL_FROM}>"
|
|
||||||
msg["Date"] = formatdate()
|
|
||||||
msg["Message-ID"] = make_msgid()
|
|
||||||
|
|
||||||
# Add extra mail headers
|
|
||||||
for header, value in config.getattr("MAIL_HEADERS", []):
|
|
||||||
msg[header] = value
|
|
||||||
|
|
||||||
msg.set_content(body)
|
|
||||||
|
|
||||||
s = smtplib.SMTP(config.SMTP_HOST)
|
|
||||||
s.sendmail(config.MAIL_TO, [config.MAIL_TO], msg.as_string())
|
|
||||||
s.quit()
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
"""Get date from web page and compare with existing."""
|
|
||||||
filename = os.path.join(config.DATA_DIR, "advance-tickets.html")
|
|
||||||
existing_html = open(filename).read()
|
|
||||||
existing_date = gwr.extract_weekday_date(existing_html)
|
|
||||||
|
|
||||||
new_html = requests.get(gwr.url).text
|
|
||||||
open(filename, "w").write(new_html)
|
|
||||||
|
|
||||||
new_date = gwr.extract_weekday_date(new_html)
|
|
||||||
|
|
||||||
if existing_date == new_date:
|
|
||||||
if sys.stdin.isatty():
|
|
||||||
print("date has't changed:", existing_date)
|
|
||||||
return
|
|
||||||
|
|
||||||
subject = f"New GWR advance ticket booking date: {new_date}"
|
|
||||||
body = f"""Old date: {existing_date}
|
|
||||||
New date: {new_date}
|
|
||||||
|
|
||||||
{gwr.url}
|
|
||||||
|
|
||||||
Agenda: https://edwardbetts.com/agenda/
|
|
||||||
"""
|
|
||||||
send_mail(subject, body)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
|
@ -1,20 +0,0 @@
|
||||||
#!/usr/bin/python3
|
|
||||||
"""Update cache of space launch API."""
|
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
from time import time
|
|
||||||
|
|
||||||
import agenda.thespacedevs
|
|
||||||
|
|
||||||
config = __import__("config.default", fromlist=[""])
|
|
||||||
rocket_dir = os.path.join(config.DATA_DIR, "thespacedevs")
|
|
||||||
|
|
||||||
t0 = time()
|
|
||||||
|
|
||||||
rockets = agenda.thespacedevs.next_launch_api(rocket_dir)
|
|
||||||
time_taken = time() - t0
|
|
||||||
if not sys.stdin.isatty():
|
|
||||||
sys.exit(0)
|
|
||||||
print(len(rockets), "launches")
|
|
||||||
print(f"took {time_taken:.1f} seconds")
|
|
Loading…
Reference in a new issue