129 lines
4.1 KiB
Python
129 lines
4.1 KiB
Python
import json
|
|
import os
|
|
import typing
|
|
from datetime import datetime
|
|
|
|
import httpx
|
|
|
|
Launch = dict[str, typing.Any]
|
|
Summary = dict[str, typing.Any]
|
|
|
|
|
|
async def next_launch_api(rocket_dir: str, limit: int = 200) -> list[Launch]:
    """Fetch upcoming launches from the API and cache the raw response.

    The response body is saved in ``rocket_dir`` under a name built from the
    current local time (``%Y-%m-%d_%H:%M:%S.json``). It is written only after
    the body has been decoded and summarised successfully, so a malformed or
    unexpected reply never becomes the newest cache file.

    Args:
        rocket_dir: Directory in which the response body is cached.
        limit: Value of the ``limit`` query parameter sent to the API.

    Returns:
        One summary (see ``summarize_launch``) per entry of the response's
        ``results`` list.

    Raises:
        ValueError: If the response body is not valid JSON.
        KeyError: If the decoded response lacks an expected key.
    """
    now = datetime.now()
    filename = os.path.join(rocket_dir, now.strftime("%Y-%m-%d_%H:%M:%S.json"))
    url = "https://ll.thespacedevs.com/2.2.0/launch/upcoming/"

    params: dict[str, str | int] = {"limit": limit}
    async with httpx.AsyncClient() as client:
        r = await client.get(url, params=params)

    # Decode and summarise before touching the disk: if either step raises,
    # no file is left behind to be picked up later as the freshest cache.
    data = r.json()
    launches = [summarize_launch(launch) for launch in data["results"]]

    # Context manager guarantees the handle is flushed and closed.
    with open(filename, "w") as cache_file:
        cache_file.write(r.text)

    return launches
|
|
|
|
|
|
def filename_timestamp(filename: str) -> tuple[datetime, str] | None:
|
|
"""Get datetime from filename."""
|
|
try:
|
|
ts = datetime.strptime(filename, "%Y-%m-%d_%H:%M:%S.json")
|
|
except ValueError:
|
|
return None
|
|
return (ts, filename)
|
|
|
|
|
|
def format_time(time_str: str, net_precision: str) -> tuple[str, str | None]:
|
|
"""Format time based on precision."""
|
|
dt = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ")
|
|
|
|
include_time = False
|
|
# Format the date based on precision
|
|
time_format: str | None = None
|
|
field2 = None
|
|
if net_precision == "Month":
|
|
time_format = "%b %Y"
|
|
if net_precision == "Week":
|
|
time_format = "%d %b %Y"
|
|
field2 = "(Week of)"
|
|
if net_precision == "Day":
|
|
time_format = "%d %b %Y"
|
|
if net_precision == "Hour":
|
|
time_format = "%d %b %Y"
|
|
include_time = True
|
|
if net_precision == "Minute":
|
|
time_format = "%d %b %Y"
|
|
include_time = True
|
|
elif net_precision == "Second":
|
|
time_format = "%d %b %Y"
|
|
include_time = True
|
|
elif net_precision.startswith("Quarter "):
|
|
time_format = f"Q{net_precision[-1]} %Y"
|
|
|
|
if not time_format:
|
|
return (repr(time_str), repr(net_precision))
|
|
|
|
assert time_format
|
|
|
|
formatted = dt.strftime(time_format)
|
|
|
|
if include_time:
|
|
return (formatted, dt.strftime("%H:%M"))
|
|
else:
|
|
return (formatted, field2)
|
|
|
|
|
|
# Short display names for launch service providers, keyed by the provider's
# full name. Providers not listed here simply have no abbreviation.
launch_providers: dict[str, str] = {
    "Indian Space Research Organization": "ISRO",
    "United Launch Alliance": "ULA",
    "Payload Aerospace S.L.": "Payload Aerospace",
    "Russian Federal Space Agency (ROSCOSMOS)": "ROSCOSMOS",
    "China Aerospace Science and Technology Corporation": "CASC",
}
|
|
|
|
|
|
def summarize_launch(launch: Launch) -> Summary:
    """Flatten one launch record into the fields used for display.

    Args:
        launch: A launch record as found in the API response's ``results``
            list.

    Returns:
        A dict with the launch name, status, formatted T-0 date and time
        (see ``format_time``), launch window, provider details, rocket name,
        mission, pad details and orbit. ``launch_provider_abbrev`` is ``None``
        when the provider has no entry in ``launch_providers``. ``orbit`` is
        ``None`` when the record's ``mission`` is ``None``.

    Raises:
        KeyError: If the record lacks one of the expected keys.
    """
    provider = launch["launch_service_provider"]
    pad = launch["pad"]
    mission = launch["mission"]

    t0_date, t0_time = format_time(launch["net"], launch["net_precision"]["name"])

    return {
        "name": launch["name"],
        "status": launch["status"],
        "t0_date": t0_date,
        "t0_time": t0_time,
        "window_start": launch["window_start"],
        "window_end": launch["window_end"],
        "launch_provider": provider["name"],
        "launch_provider_abbrev": launch_providers.get(provider["name"]),
        "launch_provider_type": provider["type"],
        "rocket": launch["rocket"]["configuration"]["full_name"],
        "mission": mission,
        "pad_name": pad["name"],
        "pad_wikipedia_url": pad["wiki_url"],
        "location": pad["location"]["name"],
        "country_code": pad["country_code"],
        # A record without a mission would otherwise fail with TypeError when
        # subscripting None; report "no orbit" instead.
        "orbit": mission["orbit"] if mission is not None else None,
    }
|
|
|
|
|
|
async def get_launches(rocket_dir: str, limit: int = 200) -> list[Summary]:
    """Return upcoming launch summaries, using an on-disk cache.

    Cache files in ``rocket_dir`` are recognised by their timestamped names
    (see ``filename_timestamp``). The API is queried when there is no cache
    file or the newest one is more than an hour old; otherwise the newest
    cache file is read.

    Args:
        rocket_dir: Directory holding the cached API responses.
        limit: Passed through to ``next_launch_api``.

    Returns:
        One summary per launch in the API response or cache file.

    Raises:
        ValueError: If the API call fails with ``ValueError`` and there is no
            cache file to fall back on.
    """
    now = datetime.now()
    existing = [x for x in (filename_timestamp(f) for f in os.listdir(rocket_dir)) if x]

    if not existing:
        # Nothing cached to fall back on, so an API failure must propagate
        # rather than surface as an IndexError on the empty list.
        return await next_launch_api(rocket_dir, limit=limit)

    cached_at, cached_name = max(existing)

    # total_seconds() covers the whole interval; the .seconds attribute drops
    # whole days, which made a cache older than 24 hours look fresh.
    if (now - cached_at).total_seconds() > 3600:  # one hour
        try:
            return await next_launch_api(rocket_dir, limit=limit)
        except ValueError:
            # Best effort: report the failure and serve the stale cache.
            print("*** SpaceX next launch error ***")

    filename = os.path.join(rocket_dir, cached_name)
    with open(filename) as cache_file:
        data = json.load(cache_file)
    return [summarize_launch(launch) for launch in data["results"]]
|