Watch for new upcoming geomob events and announce

This commit is contained in:
Edward Betts 2024-07-08 11:48:56 +01:00
parent ff51bb9ff9
commit 07cf7dee3c
4 changed files with 135 additions and 10 deletions

120
agenda/geomob.py Normal file
View file

@ -0,0 +1,120 @@
"""Geomob events."""
import os
from dataclasses import dataclass
from datetime import date, datetime
from typing import List
import dateutil.parser
import flask
import lxml.html
import requests
import agenda.mail
import agenda.utils
@dataclass(frozen=True)
class GeomobEvent:
"""Geomob event."""
date: date
href: str
hashtag: str
def extract_geomob_events(
tree: lxml.html.HtmlElement,
) -> List[GeomobEvent]:
"""Extract upcoming events from the HTML content."""
events = []
for event in tree.xpath('//ol[@class="event-list"]/li/a'):
date_str, _, hashtag = event.text_content().strip().rpartition(" ")
events.append(
GeomobEvent(
date=dateutil.parser.parse(date_str).date(),
href=event.get("href"),
hashtag=hashtag,
)
)
return events
def find_new_events(
prev: list[GeomobEvent], cur: list[GeomobEvent]
) -> list[GeomobEvent]:
"""Find new events that appear in cur but not in prev."""
return list(set(cur) - set(prev))
def geomob_email(new_events: list[GeomobEvent], base_url: str) -> tuple[str, str]:
"""Generate email subject and body for new events.
Args:
new_events (List[Event]): List of new events.
base_url (str): The base URL of the website.
Returns:
tuple[str, str]: Email subject and body.
"""
assert new_events
subject = f"{len(new_events)} New Geomob Event(s) Announced"
body_lines = ["Hello,\n", "Here are the new Geomob events:\n"]
for event in new_events:
event_details = (
f"Date: {event.date}\n"
f"URL: {base_url + event.href}\n"
f"Hashtag: {event.hashtag}\n"
)
body_lines.append(event_details)
body_lines.append("-" * 40)
body = "\n".join(body_lines)
return (subject, body)
def get_cached_upcoming_events_list(geomob_dir: str) -> list[GeomobEvent]:
"""Get known geomob events."""
existing = [
x
for x in (
agenda.utils.filename_timestamp(f, "html") for f in os.listdir(geomob_dir)
)
if x
]
if not existing:
return []
existing.sort(reverse=True)
f = existing[0][1]
filename = os.path.join(geomob_dir, f)
return extract_geomob_events(lxml.html.parse(filename).getroot())
def update(config: flask.config.Config) -> None:
"""Get upcoming Geomob events and report new ones."""
geomob_dir = os.path.join(config["DATA_DIR"], "geomob")
prev_events = get_cached_upcoming_events_list(geomob_dir)
r = requests.get("https://thegeomob.com/")
cur_events = extract_geomob_events(lxml.html.fromstring(r.content))
if cur_events == prev_events:
return # no change
now = datetime.now()
new_filename = os.path.join(geomob_dir, now.strftime("%Y-%m-%d_%H:%M:%S.html"))
open(new_filename, "w").write(r.text)
new_events = list(set(cur_events) - set(prev_events))
if not new_events:
return
base_url = "https://thegeomob.com/"
subject, body = geomob_email(new_events, base_url)
agenda.mail.send_mail(config, subject, body)

View file

@ -8,6 +8,7 @@ from datetime import datetime
import requests
from .types import StrDict
from .utils import filename_timestamp
Launch = dict[str, typing.Any]
Summary = dict[str, typing.Any]
@ -34,15 +35,6 @@ def next_launch_api(rocket_dir: str, limit: int = 200) -> list[Launch]:
return [summarize_launch(launch) for launch in data["results"]]
def filename_timestamp(filename: str) -> tuple[datetime, str] | None:
"""Get datetime from filename."""
try:
ts = datetime.strptime(filename, "%Y-%m-%d_%H:%M:%S.json")
except ValueError:
return None
return (ts, filename)
def format_time(time_str: str, net_precision: str) -> tuple[str, str | None]:
"""Format time based on precision."""
dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ")
@ -149,7 +141,9 @@ def summarize_launch(launch: Launch) -> Summary:
def load_cached_launches(rocket_dir: str) -> StrDict:
"""Read the most recent cache of launches."""
existing = [x for x in (filename_timestamp(f) for f in os.listdir(rocket_dir)) if x]
existing = [
x for x in (filename_timestamp(f, "json") for f in os.listdir(rocket_dir)) if x
]
existing.sort(reverse=True)
f = existing[0][1]

View file

@ -56,3 +56,12 @@ def human_readable_delta(future_date: date) -> str | None:
parts.append(f"{days} day{'s' if days > 1 else ''}")
return " ".join(parts) if parts else None
def filename_timestamp(filename: str, ext: str) -> tuple[datetime, str] | None:
"""Get datetime from filename."""
try:
ts = datetime.strptime(filename, f"%Y-%m-%d_%H:%M:%S.{ext}")
except ValueError:
return None
return (ts, filename)

View file

@ -13,6 +13,7 @@ import flask
import requests
import agenda.fx
import agenda.geomob
import agenda.gwr
import agenda.mail
import agenda.thespacedevs
@ -165,6 +166,7 @@ def main() -> None:
asyncio.run(update_bristol_bins(app.config))
update_gwr_advance_ticket_date(app.config)
update_gandi(app.config)
agenda.geomob.update(app.config)
if hour % 12 == 0:
agenda.fx.get_rates(app.config)