trip: improve unbooked flight routing and airport pins

This commit is contained in:
Edward Betts 2026-03-02 13:09:06 +00:00
parent f0698acd59
commit 11ab5f6d28

View file

@ -4,6 +4,7 @@ import decimal
import hashlib import hashlib
import os import os
import typing import typing
import unicodedata
from datetime import date, datetime, timedelta, timezone from datetime import date, datetime, timedelta, timezone
import flask import flask
@ -23,6 +24,95 @@ class Airline(typing.TypedDict, total=False):
name: str name: str
def load_unbooked_flight_origin_rules(
data_dir: str,
) -> list[tuple[str, set[str]]]:
"""Load unbooked flight origin rules from personal data.
YAML schema:
- from: BRS
destinations: [AGP, ALC]
"""
filename = os.path.join(data_dir, "unbooked_flight_origins.yaml")
if not os.path.exists(filename):
return []
raw = yaml.safe_load(open(filename))
if not isinstance(raw, list):
return []
rules: list[tuple[str, set[str]]] = []
for item in raw:
if not isinstance(item, dict):
continue
from_iata = item.get("from")
destinations = item.get("destinations")
if not isinstance(from_iata, str) or not isinstance(destinations, list):
continue
destination_set = {d.upper() for d in destinations if isinstance(d, str)}
if destination_set:
rules.append((from_iata.upper(), destination_set))
return rules
def normalize_place_name(value: str) -> str:
"""Normalize place names for case/diacritics-insensitive matching."""
normalized = unicodedata.normalize("NFKD", value)
no_marks = "".join(ch for ch in normalized if not unicodedata.combining(ch))
return " ".join(no_marks.casefold().split())
def preferred_airport_iata(stop: StrDict, airports: dict[str, StrDict]) -> str | None:
"""Return preferred airport IATA for a stop from airport data."""
location = stop.get("location")
country = stop.get("country")
if not isinstance(location, str) or not isinstance(country, str):
return None
location_normalized = normalize_place_name(location)
country_lower = country.casefold()
match_scores: list[tuple[int, str]] = []
for iata, airport in airports.items():
airport_country = airport.get("country")
if (
not isinstance(airport_country, str)
or airport_country.casefold() != country_lower
):
continue
score = 0
city = airport.get("city")
if isinstance(city, str) and normalize_place_name(city) == location_normalized:
score = max(score, 3)
alt_name = airport.get("alt_name")
if (
isinstance(alt_name, str)
and normalize_place_name(alt_name) == location_normalized
):
score = max(score, 2)
name = airport.get("name")
if isinstance(name, str) and normalize_place_name(name) == location_normalized:
score = max(score, 1)
if score:
match_scores.append((score, iata))
if not match_scores:
return None
match_scores.sort(key=lambda item: (-item[0], item[1]))
return match_scores[0][1]
def get_unbooked_flight_origin_iata(
destination_iata: str | None, origin_rules: list[tuple[str, set[str]]]
) -> str:
"""Choose origin airport for an unbooked flight."""
if destination_iata:
for from_iata, destinations in origin_rules:
if destination_iata in destinations:
return from_iata
return "LHR"
def load_travel(travel_type: str, plural: str, data_dir: str) -> list[StrDict]: def load_travel(travel_type: str, plural: str, data_dir: str) -> list[StrDict]:
"""Read flight and train journeys.""" """Read flight and train journeys."""
items: list[StrDict] = travel.parse_yaml(plural, data_dir) items: list[StrDict] = travel.parse_yaml(plural, data_dir)
@ -266,15 +356,27 @@ def add_coordinates_for_unbooked_flights(
data_dir = flask.current_app.config["PERSONAL_DATA"] data_dir = flask.current_app.config["PERSONAL_DATA"]
airports = typing.cast(dict[str, StrDict], travel.parse_yaml("airports", data_dir)) airports = typing.cast(dict[str, StrDict], travel.parse_yaml("airports", data_dir))
lhr = airports["LHR"] iata_codes: set[str] = set()
coordinates.append( for route in routes:
{ if route["type"] != "unbooked_flight":
"name": lhr["name"], continue
"type": "airport", iata_codes.add(typing.cast(str, route.get("from_iata", "LHR")))
"latitude": lhr["latitude"], to_iata = route.get("to_iata")
"longitude": lhr["longitude"], if isinstance(to_iata, str):
} iata_codes.add(to_iata)
)
for iata in sorted(iata_codes):
airport = airports.get(iata)
if not airport:
continue
coordinates.append(
{
"name": airport["name"],
"type": "airport",
"latitude": airport["latitude"],
"longitude": airport["longitude"],
}
)
def stations_from_travel(t: StrDict) -> list[StrDict]: def stations_from_travel(t: StrDict) -> list[StrDict]:
@ -366,17 +468,18 @@ def collect_trip_coordinates(trip: Trip) -> list[StrDict]:
return coords return coords
def latlon_tuple_prefer_airport(stop: StrDict, data_dir: str) -> tuple[float, float]: def destination_latlon(
airport_lookup = { stop: StrDict, airports: dict[str, StrDict]
("Berlin", "de"): "BER", ) -> tuple[float, float] | None:
("Hamburg", "de"): "HAM", """Resolve destination coordinates from airport lookup or explicit lat/lon."""
} iata = preferred_airport_iata(stop, airports)
iata = airport_lookup.get((stop["location"], stop["country"])) if iata:
if not iata: airport = airports.get(iata)
if airport:
return latlon_tuple(airport)
if "latitude" in stop and "longitude" in stop:
return latlon_tuple(stop) return latlon_tuple(stop)
return None
airports = typing.cast(dict[str, StrDict], travel.parse_yaml("airports", data_dir))
return latlon_tuple(airports[iata])
def latlon_tuple(stop: StrDict) -> tuple[float, float]: def latlon_tuple(stop: StrDict) -> tuple[float, float]:
@ -476,20 +579,35 @@ def get_trip_routes(trip: Trip, data_dir: str) -> list[StrDict]:
if routes: if routes:
return routes return routes
lhr = (51.4775, -0.461389) airports = typing.cast(dict[str, StrDict], travel.parse_yaml("airports", data_dir))
origin_rules = load_unbooked_flight_origin_rules(data_dir)
return [ unbooked_routes = []
{ for item in trip.conferences:
"type": "unbooked_flight", if item["country"] in {"gb", "be"}: # not flying to Belgium
"key": f'LHR_{item["location"]}_{item["country"]}', continue
"from": lhr, destination_iata = preferred_airport_iata(item, airports)
"to": latlon_tuple_prefer_airport(item, data_dir), destination = destination_latlon(item, airports)
} if not destination:
for item in trip.conferences continue
if "latitude" in item from_iata = get_unbooked_flight_origin_iata(destination_iata, origin_rules)
and "longitude" in item from_airport = airports.get(from_iata)
and item["country"] not in {"gb", "be", "fr"} # not flying to Belgium or France if not from_airport:
] from_airport = airports["LHR"]
from_iata = "LHR"
unbooked_routes.append(
{
"type": "unbooked_flight",
"key": f'{from_iata}_{item["location"]}_{item["country"]}',
"from_iata": from_iata,
"to_iata": destination_iata,
"from": latlon_tuple(from_airport),
"to": destination,
}
)
return unbooked_routes
def get_coordinates_and_routes( def get_coordinates_and_routes(