192 lines
7.1 KiB
Python
192 lines
7.1 KiB
Python
"""
|
|
Circle Line timetable between Paddington (H&C Line) and King's Cross St Pancras.
|
|
|
|
Parses the TransXChange XML file on first use and caches the result in memory.
|
|
"""
|
|
import os
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, timedelta
|
|
|
|
# Stop references compared against StopPointRef values in the timetable XML.
_PAD_STOP = '9400ZZLUPAH1'  # Paddington (H&C Line)
_KXP_STOP = '9400ZZLUKSX3'  # King's Cross St Pancras

# Source handed to ET.parse(); overridden by app config after import.
from config.default import CIRCLE_LINE_XML as _TXC_XML

# Prefix map passed to every ElementTree find()/findall() call in this module.
_NS = {'t': 'http://www.transxchange.org.uk/'}

# Lazily-built cache, None until _get_timetable() first runs. Maps
# direction -> day-type -> sorted list of
# (origin_depart_seconds, destination_arrive_seconds) measured from midnight.
_timetable: dict[str, dict[str, list[tuple[int, int]]]] | None = None
|
|
|
|
|
|
def _parse_duration(s: str | None) -> int:
|
|
if not s:
|
|
return 0
|
|
m = re.match(r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?', s)
|
|
return int(m.group(1) or 0) * 3600 + int(m.group(2) or 0) * 60 + int(m.group(3) or 0)
|
|
|
|
|
|
def _load_timetable() -> dict[str, dict[str, list[tuple[int, int]]]]:
    """Parse the TransXChange document at ``_TXC_XML`` into sorted timetables.

    The work is done in three passes over the document:

    1. every journey pattern section is flattened to a list of timing links;
    2. every journey pattern that visits both ``_PAD_STOP`` and ``_KXP_STOP``
       gets the offsets (seconds from journey start) at which it departs the
       first of the two stops and arrives at the second;
    3. every vehicle journey using such a pattern is filed under each day
       type named by its own ``OperatingProfile``.

    Returns:
        ``{direction: {day_type: [(depart_secs, arrive_secs), ...]}}`` where
        *direction* is ``'pad_to_kx'`` or ``'kx_to_pad'``, *day_type* is
        ``'MondayToFriday'``, ``'Saturday'`` or ``'Sunday'``, and both numbers
        are seconds counted from midnight of the journey's start day.  Each
        list is sorted ascending.

    Raises:
        ValueError: If a ``DepartureTime`` is not three ``:``-separated
            integers, or a run/wait time is rejected by ``_parse_duration``.
        Whatever ``ET.parse`` raises for an unreadable or malformed file.
    """
    root = ET.parse(_TXC_XML).getroot()

    def _children(tag):
        """Children of the top-level *tag* element; empty if it is absent."""
        container = root.find(tag, _NS)
        return [] if container is None else list(container)

    def _text(element):
        """Text of *element*, or None when the element was not found."""
        return element.text if element is not None else None

    # Section id -> [(from_stop, to_stop, runtime_secs, wait_secs)] in link order.
    jps_map: dict[str | None, list[tuple]] = {}
    for jps_el in _children('t:JourneyPatternSections'):
        jps_map[jps_el.get('id')] = [
            (
                _text(link.find('t:From/t:StopPointRef', _NS)),
                _text(link.find('t:To/t:StopPointRef', _NS)),
                _parse_duration(_text(link.find('t:RunTime', _NS))),
                _parse_duration(_text(link.find('t:From/t:WaitTime', _NS))),
            )
            for link in jps_el.findall('t:JourneyPatternTimingLink', _NS)
        ]

    def _seconds_to_depart(links, stop) -> int | None:
        """Seconds from journey start until departure from *stop*."""
        elapsed = 0
        for fr, _to, rt, wait in links:
            elapsed += wait
            if fr == stop:
                return elapsed
            elapsed += rt
        return None

    def _seconds_to_arrive(links, stop) -> int | None:
        """Seconds from journey start until arrival at *stop*."""
        elapsed = 0
        for _fr, to, rt, wait in links:
            elapsed += wait + rt
            if to == stop:
                return elapsed
        return None

    # (direction, boarding stop, alighting stop) for the two supported trips.
    trips = (
        ('pad_to_kx', _PAD_STOP, _KXP_STOP),
        ('kx_to_pad', _KXP_STOP, _PAD_STOP),
    )

    # Map JP id -> [(direction, origin_depart_offset_secs, destination_arrive_offset_secs)].
    jp_offsets: dict[str | None, list[tuple[str, int, int]]] = {}
    for svc in _children('t:Services'):
        for jp in svc.findall('.//t:JourneyPattern', _NS):
            # A pattern may reference several sections; the journey runs
            # through them in document order, so chain their links together.
            links = [
                link
                for ref in jp.findall('t:JourneyPatternSectionRefs', _NS)
                for link in jps_map.get(ref.text, [])
            ]
            if not links:
                continue
            # Every link's origin, plus the final link's destination.
            stops = [link[0] for link in links] + [links[-1][1]]
            offsets = []
            for direction, origin, dest in trips:
                if origin not in stops or dest not in stops:
                    continue
                if stops.index(origin) >= stops.index(dest):
                    continue  # pattern runs the other way round
                depart_off = _seconds_to_depart(links, origin)
                arrive_off = _seconds_to_arrive(links, dest)
                if depart_off is not None and arrive_off is not None:
                    offsets.append((direction, depart_off, arrive_off))
            if offsets:
                jp_offsets[jp.get('id')] = offsets

    day_types = ('MondayToFriday', 'Saturday', 'Sunday')
    # DaysOfWeek child element -> result buckets it covers.  Single weekdays
    # (Monday, Tuesday, ...) cannot be expressed with these buckets and are
    # ignored, exactly as they were before.
    day_buckets = {
        'MondayToFriday': ('MondayToFriday',),
        'Saturday': ('Saturday',),
        'Sunday': ('Sunday',),
        'MondayToSaturday': ('MondayToFriday', 'Saturday'),
        'MondayToSunday': day_types,
        'Weekend': ('Saturday', 'Sunday'),
    }

    result: dict[str, dict[str, list[tuple[int, int]]]] = {
        direction: {day: [] for day in day_types}
        for direction, _origin, _dest in trips
    }

    for vj in _children('t:VehicleJourneys'):
        jp_ref = vj.find('t:JourneyPatternRef', _NS)
        dep_time = vj.find('t:DepartureTime', _NS)
        op = vj.find('t:OperatingProfile', _NS)
        if jp_ref is None or dep_time is None or jp_ref.text not in jp_offsets:
            continue
        h, m, s = map(int, dep_time.text.split(':'))
        dep_secs = h * 3600 + m * 60 + s
        rdt = op.find('.//t:DaysOfWeek', _NS) if op is not None else None
        if rdt is None:
            continue
        buckets = set()
        for day_el in rdt:
            # Drop the '{namespace}' prefix ElementTree puts on tag names.
            buckets.update(day_buckets.get(day_el.tag.split('}')[-1], ()))
        for direction, origin_off, dest_off in jp_offsets[jp_ref.text]:
            for bucket in buckets:
                result[direction][bucket].append((
                    dep_secs + origin_off,
                    dep_secs + dest_off,
                ))

    for by_day in result.values():
        for services in by_day.values():
            services.sort()
    return result
|
|
|
|
|
|
def _get_timetable() -> dict[str, dict[str, list[tuple[int, int]]]]:
    """Return the parsed timetable, building it on the first call only.

    The result of ``_load_timetable()`` is stored in the module-level
    ``_timetable`` and handed back unchanged on every later call.
    """
    global _timetable
    if _timetable is not None:
        return _timetable
    _timetable = _load_timetable()
    return _timetable
|
|
|
|
|
|
def _day_type(weekday: int) -> str:
|
|
if weekday < 5:
|
|
return 'MondayToFriday'
|
|
return 'Saturday' if weekday == 5 else 'Sunday'
|
|
|
|
|
|
def next_service(
    earliest_board: datetime, direction: str = 'pad_to_kx'
) -> tuple[datetime, datetime] | None:
    """
    Find the first Circle line service for *direction* that the passenger
    can still catch.

    *earliest_board* is the earliest moment the passenger can board at the
    origin stop of *direction* (Paddington H&C Line for the default
    ``'pad_to_kx'``).  The caller is responsible for adding any walk time
    to the platform before passing it in.

    Returns (depart_origin, arrive_destination) as datetimes, or None when
    the timetable for that day holds no later service.
    """
    for service in upcoming_services(earliest_board, count=1, direction=direction):
        return service
    return None
|
|
|
|
|
|
def upcoming_services(
    earliest_board: datetime, count: int = 2, direction: str = 'pad_to_kx'
) -> list[tuple[datetime, datetime]]:
    """
    Return up to *count* Circle line services for *direction*, starting from
    *earliest_board*.

    Only the timetable for the day type of *earliest_board* is consulted;
    services are returned in departure order.

    Args:
        earliest_board: Earliest moment the passenger can board at the
            origin stop.  A service departing exactly then is included.
        count: Maximum number of services wanted.  Zero or a negative
            number yields an empty list.
        direction: ``'pad_to_kx'`` or ``'kx_to_pad'``.  Any other value
            yields an empty list.

    Returns:
        A list of (depart_origin, arrive_destination) datetimes, built by
        adding the timetable offsets to midnight of *earliest_board*'s date
        (so they carry the same tzinfo, if any).
    """
    if count <= 0:
        # The loop below stops on reaching *count*, which would never
        # trigger for a non-positive count.
        return []

    by_day = _get_timetable().get(direction, {})
    timetable = by_day.get(_day_type(earliest_board.weekday()), [])

    midnight = earliest_board.replace(hour=0, minute=0, second=0, microsecond=0)
    results = []
    for depart_secs, arrive_secs in timetable:
        depart = midnight + timedelta(seconds=depart_secs)
        # Comparing datetimes (not whole seconds) keeps sub-second precision,
        # so a train that left a fraction of a second ago is not offered.
        if depart < earliest_board:
            continue
        results.append((depart, midnight + timedelta(seconds=arrive_secs)))
        if len(results) >= count:
            break
    return results
|