Send alerts for all space launch test flights
In addition to the configured FOLLOW_LAUNCHES, also send emails for any launch whose mission.type == "Test Flight" even if its slug is not in FOLLOW_LAUNCHES. Fixes #192.
This commit is contained in:
parent
97c0531a22
commit
09eb44b55d
53
update.py
53
update.py
|
@ -239,6 +239,12 @@ View all launches: https://edwardbetts.com/agenda/launches
|
|||
agenda.mail.send_mail(config, subject, body)
|
||||
|
||||
|
||||
def is_test_flight(launch: StrDict) -> bool:
|
||||
"""Return True if the launch is a test flight."""
|
||||
mission = typing.cast(dict[str, typing.Any] | None, launch.get("mission"))
|
||||
return bool(mission and mission.get("type") == "Test Flight")
|
||||
|
||||
|
||||
def get_launch_by_slug(data: StrDict, slug: str) -> StrDict | None:
|
||||
"""Find last update for space launch."""
|
||||
return {item["slug"]: typing.cast(StrDict, item) for item in data["results"]}.get(
|
||||
|
@ -247,29 +253,58 @@ def get_launch_by_slug(data: StrDict, slug: str) -> StrDict | None:
|
|||
|
||||
|
||||
def update_thespacedevs(config: flask.config.Config) -> None:
|
||||
"""Update cache of space launch API."""
|
||||
"""Update cache of space launch API and send emails on relevant changes.
|
||||
|
||||
In addition to the configured FOLLOW_LAUNCHES, also send emails for any
|
||||
launch whose mission.type == "Test Flight" even if its slug is not in
|
||||
FOLLOW_LAUNCHES.
|
||||
"""
|
||||
rocket_dir = os.path.join(config["DATA_DIR"], "thespacedevs")
|
||||
|
||||
existing_data = agenda.thespacedevs.load_cached_launches(rocket_dir)
|
||||
assert existing_data
|
||||
prev_launches = {
|
||||
slug: get_launch_by_slug(existing_data, slug)
|
||||
for slug in config["FOLLOW_LAUNCHES"]
|
||||
|
||||
# Always follow configured slugs
|
||||
follow_slugs: set[str] = set(config["FOLLOW_LAUNCHES"])
|
||||
|
||||
# Identify test-flight slugs present in the previous cache
|
||||
prev_test_slugs: set[str] = {
|
||||
typing.cast(str, item["slug"])
|
||||
for item in existing_data.get("results", [])
|
||||
if is_test_flight(typing.cast(StrDict, item))
|
||||
}
|
||||
|
||||
t0 = time()
|
||||
data = agenda.thespacedevs.next_launch_api_data(rocket_dir)
|
||||
if not data:
|
||||
return # thespacedevs API call failed
|
||||
cur_launches = {
|
||||
slug: get_launch_by_slug(data, slug) for slug in config["FOLLOW_LAUNCHES"]
|
||||
|
||||
# Identify test-flight slugs present in the current data
|
||||
cur_test_slugs: set[str] = {
|
||||
typing.cast(str, item["slug"])
|
||||
for item in data.get("results", [])
|
||||
if is_test_flight(typing.cast(StrDict, item))
|
||||
}
|
||||
|
||||
for slug in config["FOLLOW_LAUNCHES"]:
|
||||
prev, cur = prev_launches[slug], cur_launches[slug]
|
||||
# Add any test-flight slugs (whether old or new), excluding those we already
|
||||
# explicitly follow.
|
||||
extra_test_slugs = (prev_test_slugs | cur_test_slugs) - follow_slugs
|
||||
|
||||
# Final set of slugs to evaluate for changes
|
||||
slugs_to_check = follow_slugs | extra_test_slugs
|
||||
|
||||
# Build prev/cur lookup dicts for all slugs we're checking
|
||||
prev_launches = {
|
||||
slug: get_launch_by_slug(existing_data, slug) for slug in slugs_to_check
|
||||
}
|
||||
cur_launches = {slug: get_launch_by_slug(data, slug) for slug in slugs_to_check}
|
||||
|
||||
# Emit reports when a launch appears/disappears or changes
|
||||
for slug in slugs_to_check:
|
||||
prev, cur = prev_launches.get(slug), cur_launches.get(slug)
|
||||
if prev is None and cur is None:
|
||||
continue
|
||||
if prev and cur and prev["last_updated"] == cur["last_updated"]:
|
||||
if prev and cur and prev.get("last_updated") == cur.get("last_updated"):
|
||||
continue
|
||||
report_space_launch_change(config, prev, cur)
|
||||
|
||||
|
|
Loading…
Reference in a new issue