Send alert emails for particular rocket launches

Closes: #156
This commit is contained in:
Edward Betts 2024-06-15 21:35:18 +01:00
parent ce07ae65b1
commit e400360697
2 changed files with 63 additions and 5 deletions

View file

@ -7,13 +7,15 @@ from datetime import datetime
import requests
from .types import StrDict
Launch = dict[str, typing.Any]
Summary = dict[str, typing.Any]
ttl = 60 * 60 * 2 # two hours
def next_launch_api(rocket_dir: str, limit: int = 200) -> list[Launch]:
def next_launch_api_data(rocket_dir: str, limit: int = 200) -> StrDict:
"""Get the next upcoming launches from the API."""
now = datetime.now()
filename = os.path.join(rocket_dir, now.strftime("%Y-%m-%d_%H:%M:%S.json"))
@ -21,8 +23,14 @@ def next_launch_api(rocket_dir: str, limit: int = 200) -> list[Launch]:
params: dict[str, str | int] = {"limit": limit}
r = requests.get(url, params=params)
data = r.json()
data: StrDict = r.json()
open(filename, "w").write(r.text)
return data
def next_launch_api(rocket_dir: str, limit: int = 200) -> list[Launch]:
"""Get the next upcoming launches from the API."""
data = next_launch_api_data(rocket_dir, limit)
return [summarize_launch(launch) for launch in data["results"]]
@ -138,7 +146,7 @@ def summarize_launch(launch: Launch) -> Summary:
}
def read_cached_launches(rocket_dir: str) -> list[Summary]:
def load_cached_launches(rocket_dir: str) -> StrDict:
"""Read the most recent cache of launches."""
existing = [x for x in (filename_timestamp(f) for f in os.listdir(rocket_dir)) if x]
@ -146,7 +154,12 @@ def read_cached_launches(rocket_dir: str) -> list[Summary]:
f = existing[0][1]
filename = os.path.join(rocket_dir, f)
data = json.load(open(filename))
return typing.cast(StrDict, json.load(open(filename)))
def read_cached_launches(rocket_dir: str) -> list[Summary]:
"""Read cached launches."""
data = load_cached_launches(rocket_dir)
return [summarize_launch(launch) for launch in data["results"]]