agenda/agenda/thespacedevs.py

334 lines
12 KiB
Python

"""Get details of upcoming space launches."""
import json
import os
import typing
from datetime import datetime
import requests
from .types import StrDict
from .utils import filename_timestamp, get_most_recent_file
Launch = dict[str, typing.Any]
Summary = dict[str, typing.Any]
ttl = 60 * 60 * 2 # two hours
LIMIT = 500
def next_launch_api_data(rocket_dir: str, limit: int = LIMIT) -> StrDict | None:
"""Get the next upcoming launches from the API."""
now = datetime.now()
filename = os.path.join(rocket_dir, now.strftime("%Y-%m-%d_%H:%M:%S.json"))
url = "https://ll.thespacedevs.com/2.2.0/launch/upcoming/"
params: dict[str, str | int] = {"limit": limit}
r = requests.get(url, params=params)
try:
data: StrDict = r.json()
except requests.exceptions.JSONDecodeError:
return None
open(filename, "w").write(r.text)
return data
def next_launch_api(rocket_dir: str, limit: int = LIMIT) -> list[Summary] | None:
"""Get the next upcoming launches from the API."""
data = next_launch_api_data(rocket_dir, limit)
if not data:
return None
return [summarize_launch(launch) for launch in data["results"]]
def format_time(time_str: str, net_precision: str) -> tuple[str, str | None]:
"""Format time based on precision."""
dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ")
include_time = False
# Format the date based on precision
time_format: str | None
field2 = None
match net_precision:
case "Year":
time_format = "%Y"
case "Month":
time_format = "%b %Y"
case "Week":
time_format = "%d %b %Y"
field2 = "(Week of)"
case "Day" | "Hour" | "Minute":
time_format = "%d %b %Y"
include_time = net_precision in {"Hour", "Minute"}
case "Second":
time_format = "%d %b %Y"
include_time = True
case _ if net_precision and net_precision.startswith("Quarter "):
time_format = f"Q{net_precision[-1]} %Y"
case _:
time_format = None
if not time_format:
return (repr(time_str), repr(net_precision))
assert time_format
formatted = dt.strftime(time_format)
if include_time:
return (formatted, dt.strftime("%H:%M"))
else:
return (formatted, field2)
launch_providers = {
"Indian Space Research Organization": "ISRO",
"United Launch Alliance": "ULA",
"Payload Aerospace S.L.": "Payload Aerospace",
"Russian Federal Space Agency (ROSCOSMOS)": "ROSCOSMOS",
"China Aerospace Science and Technology Corporation": "CASC",
}
def get_nested(data: dict[str, typing.Any], keys: list[str]) -> typing.Any | None:
"""
Safely get a nested value from a dictionary.
Args:
data (Dict[str, Any]): The dictionary to search.
keys (List[str]): A list of keys for the nested lookup.
Returns:
Optional[Any]: The retrieved value, or None if any key is missing.
"""
for key in keys:
if data is None or key not in data:
return None
data = data[key]
return data
def summarize_launch(launch: Launch) -> Summary:
"""Summarize rocket launch."""
try:
launch_provider = launch["launch_service_provider"]["name"]
launch_provider_abbrev = launch_providers.get(launch_provider)
except (TypeError, IndexError):
launch_provider = None
launch_provider_abbrev = None
net_precision = typing.cast(str, get_nested(launch, ["net_precision", "name"]))
t0_date, t0_time = format_time(launch["net"], net_precision)
return {
"name": launch.get("name"),
"slug": launch["slug"],
"status": launch.get("status"),
"net": launch.get("net"),
"net_precision": net_precision,
"t0_date": t0_date,
"t0_time": t0_time,
"window_start": launch.get("window_start"),
"window_end": launch.get("window_end"),
"launch_provider": launch_provider,
"launch_provider_abbrev": launch_provider_abbrev,
"launch_provider_type": get_nested(launch, ["launch_service_provider", "type"]),
"rocket": launch["rocket"]["configuration"],
"mission": launch.get("mission"),
"mission_name": get_nested(launch, ["mission", "name"]),
"pad_name": launch["pad"]["name"],
"pad_wikipedia_url": launch["pad"]["wiki_url"],
"location": launch["pad"]["location"]["name"],
"country_code": launch["pad"]["country_code"],
"orbit": get_nested(launch, ["mission", "orbit"]),
"probability": launch["probability"],
"weather_concerns": launch["weather_concerns"],
"image": launch.get("image"),
}
def load_cached_launches(rocket_dir: str) -> StrDict | None:
"""Read the most recent cache of launches."""
filename = get_most_recent_file(rocket_dir, "json")
return typing.cast(StrDict, json.load(open(filename))) if filename else None
def read_cached_launches(rocket_dir: str) -> list[Summary]:
"""Read cached launches."""
data = load_cached_launches(rocket_dir)
return [summarize_launch(launch) for launch in data["results"]] if data else []
def get_launches(
rocket_dir: str, limit: int = LIMIT, refresh: bool = False
) -> list[Summary] | None:
"""Get rocket launches with caching."""
now = datetime.now()
existing = [
x for x in (filename_timestamp(f, "json") for f in os.listdir(rocket_dir)) if x
]
existing.sort(reverse=True)
if refresh or not existing or (now - existing[0][0]).seconds > ttl:
try:
return next_launch_api(rocket_dir, limit=limit)
except Exception:
pass # fallback to cached version
f = existing[0][1]
filename = os.path.join(rocket_dir, f)
data = json.load(open(filename))
return [summarize_launch(launch) for launch in data["results"]]
def format_date(dt: datetime) -> str:
"""Human readable date."""
return dt.strftime("%d %b %Y at %H:%M UTC")
def format_datetime_change(field_name: str, old_val: str, new_val: str) -> str:
"""Format a datetime field change with proper error handling."""
try:
old_dt = datetime.fromisoformat(old_val.replace("Z", "+00:00"))
new_dt = datetime.fromisoformat(new_val.replace("Z", "+00:00"))
return (
f"{field_name} changed from {format_date(old_dt)} to {format_date(new_dt)}"
)
except (ValueError, AttributeError):
return f"{field_name} changed from {old_val} to {new_val}"
def format_datetime_update(field_name: str, new_val: str) -> str:
"""Format a datetime field update (showing only the new value)."""
try:
new_dt = datetime.fromisoformat(new_val.replace("Z", "+00:00"))
return f"{field_name}: {format_date(new_dt)}"
except (ValueError, AttributeError):
return f"{field_name}: {new_val}"
def format_probability_change(old_val: int, new_val: int) -> str:
"""Format probability field changes."""
if old_val is None:
return f"Launch probability set to {new_val}%"
elif new_val is None:
return "Launch probability removed"
else:
return f"Launch probability changed from {old_val}% to {new_val}%"
def format_launch_changes(differences: StrDict) -> str:
"""Convert deepdiff output to human-readable format."""
changes: list[str] = []
processed_paths: set[str] = set()
SKIP_FIELDS = {
"agency_launch_attempt_count",
"agency_launch_attempt_count_year",
"location_launch_attempt_count",
"location_launch_attempt_count_year",
"pad_launch_attempt_count",
"pad_launch_attempt_count_year",
"orbital_launch_attempt_count",
"orbital_launch_attempt_count_year",
}
# --- 1. Handle Special Group Value Changes ---
# Process high-level, user-friendly summaries first.
values = differences.get("values_changed", {})
if "root['status']['name']" in values:
old_val = values["root['status']['name']"]["old_value"]
new_val = values["root['status']['name']"]["new_value"]
changes.append(f"Status changed from '{old_val}' to '{new_val}'")
processed_paths.add("root['status']")
if "root['net_precision']['name']" in values:
old_val = values["root['net_precision']['name']"]["old_value"]
new_val = values["root['net_precision']['name']"]["new_value"]
changes.append(f"Launch precision changed from '{old_val}' to '{new_val}'")
processed_paths.add("root['net_precision']")
# --- 2. Handle Type Changes ---
# This is often more significant than a value change (e.g., probability becoming None).
if "type_changes" in differences:
for path, change in differences["type_changes"].items():
if any(path.startswith(p) for p in processed_paths):
continue
field = path.replace("root['", "").replace("']", "").replace("root.", "")
if field == "probability":
# Use custom formatter only for meaningful None transitions.
if change["old_type"] is type(None) or change["new_type"] is type(None):
changes.append(
format_probability_change(
change["old_value"], change["new_value"]
)
)
else: # For other type changes (e.g., int to str), use the generic message.
changes.append(
f"{field.replace('_', ' ').title()} type changed "
+ f"from {change['old_type'].__name__} to {change['new_type'].__name__}"
)
else:
changes.append(
f"{field.replace('_', ' ').title()} type changed "
+ f"from {change['old_type'].__name__} to {change['new_type'].__name__}"
)
processed_paths.add(path)
# --- 3. Handle Remaining Value Changes ---
for path, change in values.items():
if any(path.startswith(p) for p in processed_paths):
continue
field = path.replace("root['", "").replace("']", "").replace("root.", "")
if field in SKIP_FIELDS:
continue
old_val = change["old_value"]
new_val = change["new_value"]
match field:
case "net":
changes.append(format_datetime_change("Launch time", old_val, new_val))
case "window_start":
changes.append(
format_datetime_change("Launch window start", old_val, new_val)
)
case "window_end":
changes.append(
format_datetime_change("Launch window end", old_val, new_val)
)
case "last_updated":
changes.append(format_datetime_update("Last updated", new_val))
case "name":
changes.append(f"Mission name changed from '{old_val}' to '{new_val}'")
case "probability":
changes.append(format_probability_change(old_val, new_val))
case _:
changes.append(f"{field} changed from '{old_val}' to '{new_val}'")
processed_paths.add(path)
# --- 4. Handle Added/Removed Fields ---
if "dictionary_item_added" in differences:
for path in differences["dictionary_item_added"]:
field = path.replace("root['", "").replace("']", "").replace("root.", "")
changes.append(f"New field added: {field.replace('_', ' ').title()}")
if "dictionary_item_removed" in differences:
for path in differences["dictionary_item_removed"]:
field = path.replace("root['", "").replace("']", "").replace("root.", "")
changes.append(f"Field removed: {field.replace('_', ' ').title()}")
# Sort changes for deterministic output in tests
return (
"\n".join(f"{change}" for change in sorted(changes))
if changes
else "No specific changes detected"
)