Better email alerts for space launch updates

Fixes #200
This commit is contained in:
Edward Betts 2025-08-12 11:36:15 +01:00
parent fd7488bd04
commit 808f5c1d22

View file

@ -185,57 +185,147 @@ def get_launches(
return [summarize_launch(launch) for launch in data["results"]] return [summarize_launch(launch) for launch in data["results"]]
def format_launch_changes(differences: StrDict) -> str: def format_date(dt: datetime) -> str:
"""Convert deepdiff output to human-readable format.""" """Human readable date."""
changes = [] return dt.strftime("%d %b %Y at %H:%M UTC")
# Handle value changes
if "values_changed" in differences:
for path, change in differences["values_changed"].items(): def format_datetime_change(field_name: str, old_val: str, new_val: str) -> str:
field = path.replace("root['", "").replace("']", "").replace("root.", "") """Format a datetime field change with proper error handling."""
old_val = change["old_value"] try:
new_val = change["new_value"] old_dt = datetime.fromisoformat(old_val.replace("Z", "+00:00"))
# Format specific fields nicely new_dt = datetime.fromisoformat(new_val.replace("Z", "+00:00"))
if field == "net": return (
try: f"{field_name} changed from {format_date(old_dt)} to {format_date(new_dt)}"
old_dt = datetime.fromisoformat(old_val.replace("Z", "+00:00")) )
new_dt = datetime.fromisoformat(new_val.replace("Z", "+00:00")) except (ValueError, AttributeError):
changes.append( return f"{field_name} changed from {old_val} to {new_val}"
"Launch time changed from "
+ old_dt.strftime("%d %b %Y at %H:%M UTC")
+ " to " def format_datetime_update(field_name: str, new_val: str) -> str:
+ new_dt.strftime("%d %b %Y at %H:%M UTC") """Format a datetime field update (showing only the new value)."""
) try:
except: new_dt = datetime.fromisoformat(new_val.replace("Z", "+00:00"))
changes.append(f"Launch time changed from {old_val} to {new_val}") return f"{field_name}: {format_date(new_dt)}"
elif field == "name": except (ValueError, AttributeError):
changes.append(f"Mission name changed from '{old_val}' to '{new_val}'") return f"{field_name}: {new_val}"
elif field == "probability":
if old_val is None:
changes.append(f"Launch probability set to {new_val}%") def format_probability_change(old_val: int, new_val: int) -> str:
elif new_val is None: """Format probability field changes."""
changes.append("Launch probability removed") if old_val is None:
else: return f"Launch probability set to {new_val}%"
changes.append( elif new_val is None:
f"Launch probability changed from {old_val}% to {new_val}%" return "Launch probability removed"
) else:
elif "status" in field: return f"Launch probability changed from {old_val}% to {new_val}%"
def handle_value_changes(differences: StrDict) -> tuple[list[str], set[str]]:
"""Handle value changes in launch data."""
changes: list[str] = []
processed_paths: set[str] = set()
# Skip fields that aren't useful to users
SKIP_FIELDS = {
"agency_launch_attempt_count",
"agency_launch_attempt_count_year",
"location_launch_attempt_count",
"location_launch_attempt_count_year",
"pad_launch_attempt_count",
"pad_launch_attempt_count_year",
"orbital_launch_attempt_count",
"orbital_launch_attempt_count_year",
}
if "values_changed" not in differences:
return changes, processed_paths
for path, change in differences["values_changed"].items():
if any(path.startswith(processed) for processed in processed_paths):
continue
field = path.replace("root['", "").replace("']", "").replace("root.", "")
old_val = change["old_value"]
new_val = change["new_value"]
match field:
case "status['name']":
changes.append(f"Status changed from '{old_val}' to '{new_val}'") changes.append(f"Status changed from '{old_val}' to '{new_val}'")
else: processed_paths.update(
changes.append( [
f"{field.replace('_', ' ').title()} changed " "root['status']['id']",
+ f"from '{old_val}' to '{new_val}'" "root['status']['name']",
"root['status']['abbrev']",
"root['status']['description']",
]
) )
# Handle additions
case x if x.startswith("status["):
continue
case "net_precision['name']":
changes.append(
f"Launch precision changed from '{old_val}' to '{new_val}'"
)
processed_paths.update(
[
"root['net_precision']['id']",
"root['net_precision']['name']",
"root['net_precision']['abbrev']",
"root['net_precision']['description']",
]
)
case x if x.startswith("net_precision["):
continue
case "net":
changes.append(format_datetime_change("Launch time", old_val, new_val))
case "window_start":
changes.append(
format_datetime_change("Launch window start", old_val, new_val)
)
case "window_end":
changes.append(
format_datetime_change("Launch window end", old_val, new_val)
)
case "last_updated":
changes.append(format_datetime_update("Last updated", new_val))
case "name":
changes.append(f"Mission name changed from '{old_val}' to '{new_val}'")
case "probability":
changes.append(format_probability_change(old_val, new_val))
case x if x in SKIP_FIELDS:
continue
case _:
changes.append(f"{field} changed from '{old_val}' to '{new_val}'")
return changes, processed_paths
def format_launch_changes(differences: StrDict) -> str:
"""Convert deepdiff output to human-readable format using match/case."""
changes, processed_paths = handle_value_changes(differences)
# Handle other difference types...
if "dictionary_item_added" in differences: if "dictionary_item_added" in differences:
for path in differences["dictionary_item_added"]: for path in differences["dictionary_item_added"]:
field = path.replace("root['", "").replace("']", "").replace("root.", "") field = path.replace("root['", "").replace("']", "").replace("root.", "")
changes.append(f"New field added: {field.replace('_', ' ').title()}") changes.append(f"New field added: {field.replace('_', ' ').title()}")
# Handle removals
if "dictionary_item_removed" in differences: if "dictionary_item_removed" in differences:
for path in differences["dictionary_item_removed"]: for path in differences["dictionary_item_removed"]:
field = path.replace("root['", "").replace("']", "").replace("root.", "") field = path.replace("root['", "").replace("']", "").replace("root.", "")
changes.append(f"Field removed: {field.replace('_', ' ').title()}") changes.append(f"Field removed: {field.replace('_', ' ').title()}")
# Handle type changes
if "type_changes" in differences: if "type_changes" in differences:
for path, change in differences["type_changes"].items(): for path, change in differences["type_changes"].items():
field = path.replace("root['", "").replace("']", "").replace("root.", "") field = path.replace("root['", "").replace("']", "").replace("root.", "")
@ -243,6 +333,7 @@ def format_launch_changes(differences: StrDict) -> str:
f"{field.replace('_', ' ').title()} type changed " f"{field.replace('_', ' ').title()} type changed "
+ f"from {change['old_type'].__name__} to {change['new_type'].__name__}" + f"from {change['old_type'].__name__} to {change['new_type'].__name__}"
) )
return ( return (
"\n".join(f"{change}" for change in changes) "\n".join(f"{change}" for change in changes)
if changes if changes