Watch for new upcoming geomob events and announce
This commit is contained in:
parent
ff51bb9ff9
commit
2d40a0c351
119
agenda/geomob.py
Normal file
119
agenda/geomob.py
Normal file
|
@ -0,0 +1,119 @@
|
||||||
|
"""Geomob events."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import date, datetime
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
import dateutil.parser
|
||||||
|
import flask
|
||||||
|
import lxml.html
|
||||||
|
import requests
|
||||||
|
|
||||||
|
import agenda.mail
|
||||||
|
import agenda.utils
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class GeomobEvent:
|
||||||
|
"""Geomob event."""
|
||||||
|
|
||||||
|
date: date
|
||||||
|
href: str
|
||||||
|
hashtag: str
|
||||||
|
|
||||||
|
|
||||||
|
def extract_geomob_events(
|
||||||
|
tree: lxml.html.HtmlElement,
|
||||||
|
) -> List[GeomobEvent]:
|
||||||
|
"""Extract upcoming events from the HTML content."""
|
||||||
|
events = []
|
||||||
|
|
||||||
|
for event in tree.xpath('//ol[@class="event-list"]/li/a'):
|
||||||
|
date_str, _, hashtag = event.text_content().strip().rpartition(" ")
|
||||||
|
events.append(
|
||||||
|
GeomobEvent(
|
||||||
|
date=dateutil.parser.parse(date_str).date(),
|
||||||
|
href=event.get("href"),
|
||||||
|
hashtag=hashtag,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return events
|
||||||
|
|
||||||
|
|
||||||
|
def find_new_events(
|
||||||
|
prev: list[GeomobEvent], cur: list[GeomobEvent]
|
||||||
|
) -> list[GeomobEvent]:
|
||||||
|
"""Find new events that appear in cur but not in prev."""
|
||||||
|
return list(set(cur) - set(prev))
|
||||||
|
|
||||||
|
|
||||||
|
def geomob_email(new_events: list[GeomobEvent], base_url: str) -> tuple[str, str]:
|
||||||
|
"""Generate email subject and body for new events.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
new_events (List[Event]): List of new events.
|
||||||
|
base_url (str): The base URL of the website.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[str, str]: Email subject and body.
|
||||||
|
"""
|
||||||
|
assert new_events
|
||||||
|
|
||||||
|
subject = f"{len(new_events)} New Geomob Event(s) Announced"
|
||||||
|
|
||||||
|
body_lines = ["Hello,\n", "Here are the new Geomob events:\n"]
|
||||||
|
for event in new_events:
|
||||||
|
event_details = (
|
||||||
|
f"Date: {event.date}\n"
|
||||||
|
f"URL: {base_url + event.href}\n"
|
||||||
|
f"Hashtag: {event.hashtag}\n"
|
||||||
|
)
|
||||||
|
body_lines.append(event_details)
|
||||||
|
body_lines.append("-" * 40)
|
||||||
|
|
||||||
|
body = "\n".join(body_lines)
|
||||||
|
return (subject, body)
|
||||||
|
|
||||||
|
|
||||||
|
def get_cached_upcoming_events_list(geomob_dir: str) -> list[GeomobEvent]:
|
||||||
|
existing = [
|
||||||
|
x
|
||||||
|
for x in (
|
||||||
|
agenda.utils.filename_timestamp(f, "html") for f in os.listdir(geomob_dir)
|
||||||
|
)
|
||||||
|
if x
|
||||||
|
]
|
||||||
|
|
||||||
|
if not existing:
|
||||||
|
return []
|
||||||
|
existing.sort(reverse=True)
|
||||||
|
f = existing[0][1]
|
||||||
|
|
||||||
|
filename = os.path.join(geomob_dir, f)
|
||||||
|
return extract_geomob_events(lxml.html.parse(filename).getroot())
|
||||||
|
|
||||||
|
|
||||||
|
def update(config: flask.config.Config) -> None:
|
||||||
|
"""Get upcoming Geomob events and report new ones."""
|
||||||
|
geomob_dir = os.path.join(config["DATA_DIR"], "geomob")
|
||||||
|
|
||||||
|
prev_events = get_cached_upcoming_events_list(geomob_dir)
|
||||||
|
r = requests.get("https://thegeomob.com/")
|
||||||
|
cur_events = extract_geomob_events(lxml.html.fromstring(r.content))
|
||||||
|
|
||||||
|
if cur_events == prev_events:
|
||||||
|
return # no change
|
||||||
|
|
||||||
|
now = datetime.now()
|
||||||
|
new_filename = os.path.join(geomob_dir, now.strftime("%Y-%m-%d_%H:%M:%S.html"))
|
||||||
|
open(new_filename, "w").write(r.text)
|
||||||
|
|
||||||
|
new_events = list(set(cur_events) - set(prev_events))
|
||||||
|
if not new_events:
|
||||||
|
return
|
||||||
|
|
||||||
|
base_url = "https://thegeomob.com/"
|
||||||
|
subject, body = geomob_email(new_events, base_url)
|
||||||
|
agenda.mail.send_mail(config, subject, body)
|
|
@ -8,6 +8,7 @@ from datetime import datetime
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from .types import StrDict
|
from .types import StrDict
|
||||||
|
from .utils import filename_timestamp
|
||||||
|
|
||||||
Launch = dict[str, typing.Any]
|
Launch = dict[str, typing.Any]
|
||||||
Summary = dict[str, typing.Any]
|
Summary = dict[str, typing.Any]
|
||||||
|
@ -34,15 +35,6 @@ def next_launch_api(rocket_dir: str, limit: int = 200) -> list[Launch]:
|
||||||
return [summarize_launch(launch) for launch in data["results"]]
|
return [summarize_launch(launch) for launch in data["results"]]
|
||||||
|
|
||||||
|
|
||||||
def filename_timestamp(filename: str) -> tuple[datetime, str] | None:
|
|
||||||
"""Get datetime from filename."""
|
|
||||||
try:
|
|
||||||
ts = datetime.strptime(filename, "%Y-%m-%d_%H:%M:%S.json")
|
|
||||||
except ValueError:
|
|
||||||
return None
|
|
||||||
return (ts, filename)
|
|
||||||
|
|
||||||
|
|
||||||
def format_time(time_str: str, net_precision: str) -> tuple[str, str | None]:
|
def format_time(time_str: str, net_precision: str) -> tuple[str, str | None]:
|
||||||
"""Format time based on precision."""
|
"""Format time based on precision."""
|
||||||
dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ")
|
dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
@ -149,7 +141,9 @@ def summarize_launch(launch: Launch) -> Summary:
|
||||||
|
|
||||||
def load_cached_launches(rocket_dir: str) -> StrDict:
|
def load_cached_launches(rocket_dir: str) -> StrDict:
|
||||||
"""Read the most recent cache of launches."""
|
"""Read the most recent cache of launches."""
|
||||||
existing = [x for x in (filename_timestamp(f) for f in os.listdir(rocket_dir)) if x]
|
existing = [
|
||||||
|
x for x in (filename_timestamp(f, "json") for f in os.listdir(rocket_dir)) if x
|
||||||
|
]
|
||||||
|
|
||||||
existing.sort(reverse=True)
|
existing.sort(reverse=True)
|
||||||
f = existing[0][1]
|
f = existing[0][1]
|
||||||
|
|
|
@ -56,3 +56,12 @@ def human_readable_delta(future_date: date) -> str | None:
|
||||||
parts.append(f"{days} day{'s' if days > 1 else ''}")
|
parts.append(f"{days} day{'s' if days > 1 else ''}")
|
||||||
|
|
||||||
return " ".join(parts) if parts else None
|
return " ".join(parts) if parts else None
|
||||||
|
|
||||||
|
|
||||||
|
def filename_timestamp(filename: str, ext: str) -> tuple[datetime, str] | None:
|
||||||
|
"""Get datetime from filename."""
|
||||||
|
try:
|
||||||
|
ts = datetime.strptime(filename, f"%Y-%m-%d_%H:%M:%S.{ext}")
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
return (ts, filename)
|
||||||
|
|
|
@ -13,6 +13,7 @@ import flask
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
import agenda.fx
|
import agenda.fx
|
||||||
|
import agenda.geomob
|
||||||
import agenda.gwr
|
import agenda.gwr
|
||||||
import agenda.mail
|
import agenda.mail
|
||||||
import agenda.thespacedevs
|
import agenda.thespacedevs
|
||||||
|
@ -165,6 +166,7 @@ def main() -> None:
|
||||||
asyncio.run(update_bristol_bins(app.config))
|
asyncio.run(update_bristol_bins(app.config))
|
||||||
update_gwr_advance_ticket_date(app.config)
|
update_gwr_advance_ticket_date(app.config)
|
||||||
update_gandi(app.config)
|
update_gandi(app.config)
|
||||||
|
agenda.geomob.update(app.config)
|
||||||
|
|
||||||
if hour % 12 == 0:
|
if hour % 12 == 0:
|
||||||
agenda.fx.get_rates(app.config)
|
agenda.fx.get_rates(app.config)
|
||||||
|
|
Loading…
Reference in a new issue