Initial commit.
This commit is contained in:
commit
2a2a42fe5d
5 changed files with 5628 additions and 0 deletions
274
osm-pt-geojson
Executable file
274
osm-pt-geojson
Executable file
|
|
@ -0,0 +1,274 @@
|
|||
#!/usr/bin/python3
|
||||
"""Fetch an OSM public transport route relation and export it as GeoJSON."""
|
||||
import json
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
import requests
|
||||
|
||||
OSM_API = "https://www.openstreetmap.org/api/0.6"
|
||||
|
||||
# Type aliases
|
||||
Coord = list[float] # [lon, lat]
|
||||
OsmTags = dict[str, str]
|
||||
OsmElement = dict[str, Any]
|
||||
GeoJson = dict[str, Any]
|
||||
|
||||
|
||||
def fetch_relation_full(relation_id: int) -> dict[str, Any]:
|
||||
"""Fetch the full OSM API response for a relation, including all member ways and nodes."""
|
||||
url = f"{OSM_API}/relation/{relation_id}/full.json"
|
||||
try:
|
||||
resp = requests.get(url, headers={"User-Agent": "osm-pt-geojson/1.0"}, timeout=30)
|
||||
except requests.RequestException as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
if resp.status_code != 200:
|
||||
click.echo(f"Error: HTTP {resp.status_code} fetching relation {relation_id}", err=True)
|
||||
sys.exit(1)
|
||||
result: dict[str, Any] = resp.json()
|
||||
return result
|
||||
|
||||
|
||||
def parse_elements(
|
||||
data: dict[str, Any],
|
||||
relation_id: int,
|
||||
) -> tuple[dict[int, OsmElement], dict[int, OsmElement], list[int], list[int], OsmTags]:
|
||||
"""Index nodes and ways; extract ordered stops and way IDs from the target relation."""
|
||||
nodes: dict[int, OsmElement] = {}
|
||||
ways: dict[int, OsmElement] = {}
|
||||
relation: OsmElement | None = None
|
||||
|
||||
for elem in data["elements"]:
|
||||
t = elem["type"]
|
||||
if t == "node":
|
||||
nodes[elem["id"]] = elem
|
||||
elif t == "way":
|
||||
ways[elem["id"]] = elem
|
||||
elif t == "relation" and elem["id"] == relation_id:
|
||||
relation = elem
|
||||
|
||||
if relation is None:
|
||||
click.echo(f"Error: relation {relation_id} not found in API response", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
stop_ids: list[int] = []
|
||||
way_ids: list[int] = []
|
||||
|
||||
for member in relation["members"]:
|
||||
if member["role"] in ("stop", "stop_entry_only", "stop_exit_only") and member["type"] == "node":
|
||||
stop_ids.append(member["ref"])
|
||||
elif member["role"] == "" and member["type"] == "way":
|
||||
way_ids.append(member["ref"])
|
||||
|
||||
tags: OsmTags = relation.get("tags", {})
|
||||
return nodes, ways, stop_ids, way_ids, tags
|
||||
|
||||
|
||||
def build_route_coords(
|
||||
way_ids: list[int],
|
||||
ways: dict[int, OsmElement],
|
||||
nodes: dict[int, OsmElement],
|
||||
) -> list[Coord]:
|
||||
"""Chain ways into a single ordered list of [lon, lat] coordinates."""
|
||||
if not way_ids:
|
||||
return []
|
||||
|
||||
def way_node_ids(way_id: int) -> list[int]:
|
||||
return ways[way_id]["nodes"] if way_id in ways else []
|
||||
|
||||
chain: list[int] = list(way_node_ids(way_ids[0]))
|
||||
|
||||
for way_id in way_ids[1:]:
|
||||
wn = way_node_ids(way_id)
|
||||
if not wn:
|
||||
continue
|
||||
if chain[-1] == wn[0]:
|
||||
chain.extend(wn[1:])
|
||||
elif chain[-1] == wn[-1]:
|
||||
chain.extend(reversed(wn[:-1]))
|
||||
elif chain[0] == wn[-1]:
|
||||
chain = list(wn) + chain[1:]
|
||||
elif chain[0] == wn[0]:
|
||||
chain = list(reversed(wn)) + chain[1:]
|
||||
else:
|
||||
click.echo(f"Warning: gap before way {way_id}", err=True)
|
||||
chain.extend(wn)
|
||||
|
||||
return [[nodes[nid]["lon"], nodes[nid]["lat"]] for nid in chain if nid in nodes]
|
||||
|
||||
|
||||
def nearest_coord_index(lon: float, lat: float, route_coords: list[Coord]) -> int:
|
||||
"""Return the index in route_coords nearest to (lon, lat)."""
|
||||
best_i = 0
|
||||
best_d = float("inf")
|
||||
for i, (rlon, rlat) in enumerate(route_coords):
|
||||
d = (rlon - lon) ** 2 + (rlat - lat) ** 2
|
||||
if d < best_d:
|
||||
best_d = d
|
||||
best_i = i
|
||||
return best_i
|
||||
|
||||
|
||||
def node_name(node: OsmElement) -> str:
|
||||
"""Return a human-readable name for a node: name tag, ref tag, or node ID."""
|
||||
tags: OsmTags = node.get("tags", {})
|
||||
return tags.get("name") or tags.get("ref") or str(node["id"])
|
||||
|
||||
|
||||
def make_geojson(
|
||||
route_coords: list[Coord],
|
||||
stop_ids: list[int],
|
||||
nodes: dict[int, OsmElement],
|
||||
route_tags: OsmTags,
|
||||
idx_from: int | None = None,
|
||||
idx_to: int | None = None,
|
||||
no_stops: bool = False,
|
||||
) -> GeoJson:
|
||||
"""Build a GeoJSON FeatureCollection, optionally sliced between two coord indices."""
|
||||
features: list[GeoJson] = []
|
||||
|
||||
# Map stops to their nearest position on the route
|
||||
stop_positions: list[tuple[int, OsmElement]] = []
|
||||
for sid in stop_ids:
|
||||
if sid not in nodes:
|
||||
continue
|
||||
n = nodes[sid]
|
||||
idx = nearest_coord_index(n["lon"], n["lat"], route_coords)
|
||||
stop_positions.append((idx, n))
|
||||
|
||||
# Apply slice
|
||||
if idx_from is not None and idx_to is not None:
|
||||
if idx_from > idx_to:
|
||||
idx_from, idx_to = idx_to, idx_from
|
||||
geom_coords = route_coords[idx_from : idx_to + 1]
|
||||
stops_in_range = [(i, n) for i, n in stop_positions if idx_from <= i <= idx_to]
|
||||
else:
|
||||
geom_coords = route_coords
|
||||
stops_in_range = stop_positions
|
||||
|
||||
features.append(
|
||||
{
|
||||
"type": "Feature",
|
||||
"geometry": {"type": "LineString", "coordinates": geom_coords},
|
||||
"properties": {
|
||||
"name": route_tags.get("name"),
|
||||
"ref": route_tags.get("ref"),
|
||||
"from": route_tags.get("from"),
|
||||
"to": route_tags.get("to"),
|
||||
"route": route_tags.get("route"),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
if not no_stops:
|
||||
for _, node in stops_in_range:
|
||||
features.append(
|
||||
{
|
||||
"type": "Feature",
|
||||
"geometry": {"type": "Point", "coordinates": [node["lon"], node["lat"]]},
|
||||
"properties": {"name": node_name(node)},
|
||||
}
|
||||
)
|
||||
|
||||
return {"type": "FeatureCollection", "features": features}
|
||||
|
||||
|
||||
def output_geojson(geojson: GeoJson, output_path: str | None) -> None:
|
||||
"""Write GeoJSON to a file, or to stdout if output_path is None."""
|
||||
text = json.dumps(geojson, ensure_ascii=False, indent=2)
|
||||
if output_path:
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
f.write(text)
|
||||
click.echo(f"Wrote {output_path}", err=True)
|
||||
else:
|
||||
click.echo(text)
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli() -> None:
|
||||
"""OSM public transport route → GeoJSON tool."""
|
||||
|
||||
|
||||
@cli.command("list-stations")
|
||||
@click.argument("relation_id", type=int)
|
||||
def list_stations(relation_id: int) -> None:
|
||||
"""List all stations in an OSM public transport route relation."""
|
||||
data = fetch_relation_full(relation_id)
|
||||
nodes, ways, stop_ids, way_ids, tags = parse_elements(data, relation_id)
|
||||
click.echo(f"Route: {tags.get('name', relation_id)}")
|
||||
click.echo(f"Stops ({len(stop_ids)}):")
|
||||
for i, sid in enumerate(stop_ids, 1):
|
||||
if sid in nodes:
|
||||
click.echo(f" {i:2}. {node_name(nodes[sid])}")
|
||||
else:
|
||||
click.echo(f" {i:2}. (node {sid} not in response)")
|
||||
|
||||
|
||||
@cli.command("route-between")
|
||||
@click.argument("relation_id", type=int)
|
||||
@click.argument("from_station")
|
||||
@click.argument("to_station")
|
||||
@click.option("--output", "-o", type=click.Path(), default=None, help="Output file (default: stdout)")
|
||||
@click.option("--no-stops", is_flag=True, default=False, help="Omit stop points from output.")
|
||||
def route_between(
|
||||
relation_id: int,
|
||||
from_station: str,
|
||||
to_station: str,
|
||||
output: str | None,
|
||||
no_stops: bool,
|
||||
) -> None:
|
||||
"""Output GeoJSON for the route segment between two named stations."""
|
||||
data = fetch_relation_full(relation_id)
|
||||
nodes, ways, stop_ids, way_ids, tags = parse_elements(data, relation_id)
|
||||
route_coords = build_route_coords(way_ids, ways, nodes)
|
||||
|
||||
def find_stop(name: str) -> int | None:
|
||||
for sid in stop_ids:
|
||||
if sid in nodes and node_name(nodes[sid]).lower() == name.lower():
|
||||
return sid
|
||||
return None
|
||||
|
||||
sid_from = find_stop(from_station)
|
||||
sid_to = find_stop(to_station)
|
||||
|
||||
errors = []
|
||||
if sid_from is None:
|
||||
errors.append(f"Station not found: {from_station!r}")
|
||||
if sid_to is None:
|
||||
errors.append(f"Station not found: {to_station!r}")
|
||||
if errors:
|
||||
for e in errors:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
click.echo("Available stations:", err=True)
|
||||
for sid in stop_ids:
|
||||
if sid in nodes:
|
||||
click.echo(f" {node_name(nodes[sid])}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
assert sid_from is not None and sid_to is not None
|
||||
idx_from = nearest_coord_index(nodes[sid_from]["lon"], nodes[sid_from]["lat"], route_coords)
|
||||
idx_to = nearest_coord_index(nodes[sid_to]["lon"], nodes[sid_to]["lat"], route_coords)
|
||||
|
||||
geojson = make_geojson(
|
||||
route_coords, stop_ids, nodes, tags, idx_from=idx_from, idx_to=idx_to, no_stops=no_stops
|
||||
)
|
||||
output_geojson(geojson, output)
|
||||
|
||||
|
||||
@cli.command("full-route")
|
||||
@click.argument("relation_id", type=int)
|
||||
@click.option("--output", "-o", type=click.Path(), default=None, help="Output file (default: stdout)")
|
||||
@click.option("--no-stops", is_flag=True, default=False, help="Omit stop points from output.")
|
||||
def full_route(relation_id: int, output: str | None, no_stops: bool) -> None:
|
||||
"""Output GeoJSON for the entire route, end to end."""
|
||||
data = fetch_relation_full(relation_id)
|
||||
nodes, ways, stop_ids, way_ids, tags = parse_elements(data, relation_id)
|
||||
route_coords = build_route_coords(way_ids, ways, nodes)
|
||||
geojson = make_geojson(route_coords, stop_ids, nodes, tags, no_stops=no_stops)
|
||||
output_geojson(geojson, output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
Loading…
Add table
Add a link
Reference in a new issue