""" Combine GWR Bristol→Paddington trains with Eurostar St Pancras→destination trains. """ from flask import ( Flask, render_template, redirect, url_for, request, abort, jsonify, Response, stream_with_context, ) from flask.typing import ResponseReturnValue from datetime import date, timedelta from pathlib import Path from typing import Any, Callable, Generator import json import os from cache import get_cached, set_cached import scraper.eurostar as eurostar_scraper import scraper.gwr_fares as gwr_fares_scraper import scraper.realtime_trains as rtt_scraper from trip_planner import ( INBOUND_MAX_CONNECTION_MINUTES, INBOUND_MIN_CONNECTION_MINUTES, combine_inbound_trips, combine_trips, find_unreachable_inbound_eurostars, find_unreachable_morning_eurostars, ) RTT_PADDINGTON_URL = ( "https://www.realtimetrains.co.uk/search/detailed/" "gb-nr:PAD/from/gb-nr:{crs}/{date}/0000-2359" "?stp=WVS&show=pax-calls&order=wtt" ) RTT_STATION_URL = ( "https://www.realtimetrains.co.uk/search/detailed/" "gb-nr:{crs}/to/gb-nr:PAD/{date}/0000-2359" "?stp=WVS&show=pax-calls&order=wtt" ) app = Flask(__name__, instance_relative_config=False) app.config.from_object("config.default") _local = os.path.join(os.path.dirname(__file__), "config", "local.py") if os.path.exists(_local): app.config.from_pyfile(_local) import cache import circle_line cache.CACHE_DIR = app.config["CACHE_DIR"] # type: ignore[attr-defined] circle_line._TXC_XML = app.config["CIRCLE_LINE_XML"] # type: ignore[attr-defined] def _load_stations() -> list[tuple[str, str]]: tsv = Path(__file__).parent / "data" / "direct_to_paddington.tsv" stations = [] for line in tsv.read_text().splitlines(): line = line.strip() if "\t" in line: name, crs = line.split("\t", 1) stations.append((name, crs)) return sorted(stations, key=lambda x: x[0]) STATIONS = _load_stations() STATION_BY_CRS = {crs: name for name, crs in STATIONS} DESTINATIONS = { "paris": "Paris Gare du Nord", "brussels": "Brussels Midi", "lille": "Lille Europe", "amsterdam": "Amsterdam Centraal", "rotterdam": "Rotterdam Centraal", "cologne": "Cologne Hbf", } @app.route("/") def index() -> ResponseReturnValue: today = date.today().isoformat() default_min, default_max = _get_defaults() return render_template( "index.html", destinations=DESTINATIONS, today=today, stations=STATIONS, default_min_connection=default_min, default_max_connection=default_max, valid_min_connections=sorted(VALID_MIN_CONNECTIONS), valid_max_connections=sorted(VALID_MAX_CONNECTIONS), default_return_date=(date.today() + timedelta(days=7)).isoformat(), ) VALID_MIN_CONNECTIONS = {45, 50, 60, 70, 80, 90, 100, 110, 120} VALID_MAX_CONNECTIONS = {60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180} VALID_INBOUND_MIN_CONNECTIONS = {20, 30, 40, 45, 50, 60, 70, 80, 90, 100, 110, 120} VALID_INBOUND_RETURN_MIN_CONNECTIONS = {30, 40, 50, 60} VALID_INBOUND_MAX_CONNECTIONS = { 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160, 170, 180, } VALID_JOURNEY_TYPES = {"outbound", "inbound", "return"} VALID_NR_CLASSES = {"walkon", "advance_std", "advance_1st"} VALID_ES_CLASSES = {"standard", "plus"} DEFAULT_NR_CLASS = "walkon" DEFAULT_ES_CLASS = "standard" NR_TIMETABLE_PERIODS = [ (date(2026, 5, 17), date(2026, 12, 12), "2026-05-17_2026-12-12"), ] def _weekday_for(section_date: str) -> str: return date.fromisoformat(section_date).strftime("%a").lower() def _month_for(section_date: str) -> str: return date.fromisoformat(section_date).strftime("%Y-%m") def _nr_timetable_period_key(section_date: str) -> str: dt = date.fromisoformat(section_date) for start, end, key in NR_TIMETABLE_PERIODS: if start <= dt <= end: return key return dt.strftime("%Y-%m") def _nr_exact_cache_key(direction: str, station_crs: str, section_date: str) -> str: return f"rtt_{direction}_{station_crs}_{section_date}" def _nr_weekday_cache_key(direction: str, station_crs: str, section_date: str) -> str: return ( f"weekday_rtt_{direction}_{station_crs}_" f"{_nr_timetable_period_key(section_date)}_{_weekday_for(section_date)}" ) def _walkon_weekday_cache_key( direction: str, station_crs: str, section_date: str ) -> str: return ( f"weekday_gwr_fares_{direction}_{station_crs}_" f"{_nr_timetable_period_key(section_date)}_{_weekday_for(section_date)}" ) def _eurostar_exact_cache_key( direction: str, section_date: str, destination: str ) -> str: return f"eurostar_{direction}_{section_date}_{destination}" def _eurostar_weekday_cache_key( direction: str, section_date: str, destination: str ) -> str: return ( f"weekday_eurostar_{direction}_{destination}_" f"{_month_for(section_date)}_{_weekday_for(section_date)}" ) def _eurostar_return_exact_cache_key( travel_date: str, return_date: str, destination: str ) -> str: return f"eurostar_return_{travel_date}_{return_date}_{destination}" def _eurostar_return_weekday_cache_key( travel_date: str, return_date: str, destination: str ) -> str: return ( f"weekday_eurostar_return_{destination}_" f"{_month_for(travel_date)}_{_weekday_for(travel_date)}_" f"{_month_for(return_date)}_{_weekday_for(return_date)}" ) def _strip_nr_timetable(trains: list[dict[str, Any]]) -> list[dict[str, Any]]: keys = { "depart_bristol", "arrive_paddington", "depart_paddington", "arrive_destination", "arrive_platform", "headcode", } return [{k: train[k] for k in keys if k in train} for train in trains] def _strip_eurostar_timetable(services: list[dict[str, Any]]) -> list[dict[str, Any]]: keys = { "depart_st_pancras", "arrive_destination", "depart_destination", "arrive_st_pancras", "destination", "train_number", } return [{k: service[k] for k in keys if k in service} for service in services] def _strip_eurostar_return_timetable(es_return: Any) -> dict[str, list[dict[str, Any]]]: if not isinstance(es_return, dict): return {"outbound": [], "inbound": []} return { "outbound": _strip_eurostar_timetable(es_return.get("outbound", [])), "inbound": _strip_eurostar_timetable(es_return.get("inbound", [])), } def _timetable_signature(data: Any) -> str: return json.dumps(data, sort_keys=True, separators=(",", ":")) def _eurostar_prices_by_row( section_id: str, direction: str, services: list[dict[str, Any]] ) -> dict[str, dict[str, Any]]: prices = {} for service in services: key = ( service.get("depart_st_pancras") if direction == "outbound" else service.get("depart_destination") ) if not key: continue prices[f"{section_id}:{key}"] = { "es_standard": ( {"price": service.get("price"), "seats": service.get("seats")} if service.get("price") is not None else None ), "es_plus": ( {"price": service.get("plus_price"), "seats": service.get("plus_seats")} if service.get("plus_price") is not None else None ), } return prices def _get_defaults() -> tuple[int, int]: return ( app.config["DEFAULT_MIN_CONNECTION"], app.config["DEFAULT_MAX_CONNECTION"], ) def _parse_connection(raw: str | None, default: int, valid_set: set[int]) -> int: try: val = int(raw or "") except (TypeError, ValueError): return default return val if val in valid_set else default def _section_trip_fares(section: dict[str, Any]) -> dict[str, Any]: trip_fares = {} for row in section["rows"]: circle_svcs = row.get("circle_services") or [] circle_fare = circle_svcs[0]["fare"] if circle_svcs else 0 walkon = ( {"price": row["ticket_price"], "ticket": row.get("ticket_name", "")} if row.get("ticket_price") is not None else None ) es_std = ( {"price": row["eurostar_price"], "seats": row.get("eurostar_seats")} if row.get("eurostar_price") is not None else None ) es_plus = ( { "price": row["eurostar_plus_price"], "seats": row.get("eurostar_plus_seats"), } if row.get("eurostar_plus_price") is not None else None ) trip_fares[row["row_key"]] = { "section": section["id"], "eurostar_key": row.get("eurostar_key"), "advance_key": row.get("depart_bristol") or row.get("depart_paddington"), "walkon": walkon, "es_standard": es_std, "es_plus": es_plus, "circle_fare": circle_fare, } return trip_fares def _build_summary_html( sections: list[dict[str, Any]], journey_type: str, from_cache_parts: list[str], provisional_timetable: bool, ) -> str: def pl(n: int, word: str) -> str: return f"{n} {word}{'s' if n != 1 else ''}" if journey_type == "return": parts = [] for s in sections: label = "Outbound" if s["direction"] == "outbound" else "Return" parts.append( f"{label}: {pl(s['gwr_count'], 'National Rail service')}, {pl(s['eurostar_count'], 'Eurostar service')}" ) html = "  ·  ".join(parts) else: s = sections[0] html = f"{pl(s['gwr_count'], 'National Rail service')}  ·  {pl(s['eurostar_count'], 'Eurostar service')}" if from_cache_parts: html += '  ·  (cached)' if provisional_timetable: html += '  ·  checking exact timetable' return html def _results_url( station_crs: str, slug: str, travel_date: str, journey_type: str = "outbound", return_date: str | None = None, **params: Any, ) -> str: params = {k: v for k, v in params.items() if v is not None} if journey_type == "return": return url_for( "return_results", station_crs=station_crs, slug=slug, travel_date=travel_date, return_date=return_date, **params, ) if journey_type == "inbound": params["journey_type"] = "inbound" return url_for( "results", station_crs=station_crs, slug=slug, travel_date=travel_date, **params, ) @app.route("/search") def search() -> ResponseReturnValue: slug = request.args.get("destination", "") travel_date = request.args.get("travel_date", "") return_date = request.args.get("return_date", "") journey_type = request.args.get("journey_type", "outbound") if journey_type not in VALID_JOURNEY_TYPES: journey_type = "outbound" station_crs = request.args.get("station_crs", "BRI") if station_crs not in STATION_BY_CRS: station_crs = "BRI" if journey_type == "inbound": default_min, default_max = ( INBOUND_MIN_CONNECTION_MINUTES, INBOUND_MAX_CONNECTION_MINUTES, ) valid_min, valid_max = ( VALID_INBOUND_MIN_CONNECTIONS, VALID_INBOUND_MAX_CONNECTIONS, ) else: default_min, default_max = _get_defaults() valid_min, valid_max = VALID_MIN_CONNECTIONS, VALID_MAX_CONNECTIONS min_conn = _parse_connection( request.args.get("min_connection"), default_min, valid_min ) max_conn = _parse_connection( request.args.get("max_connection"), default_max, valid_max ) nr_class = request.args.get("nr_class", DEFAULT_NR_CLASS) if nr_class not in VALID_NR_CLASSES: nr_class = DEFAULT_NR_CLASS es_class = request.args.get("es_class", DEFAULT_ES_CLASS) if es_class not in VALID_ES_CLASSES: es_class = DEFAULT_ES_CLASS if journey_type == "return": try: if return_date and date.fromisoformat(return_date) < date.fromisoformat( travel_date ): return_date = "" except ValueError: return_date = "" if ( slug in DESTINATIONS and travel_date and (journey_type != "return" or return_date) ): return redirect( _results_url( station_crs=station_crs, slug=slug, travel_date=travel_date, journey_type=journey_type, return_date=return_date if journey_type == "return" else None, min_connection=None if min_conn == default_min else min_conn, max_connection=None if max_conn == default_max else max_conn, nr_class=None if nr_class == DEFAULT_NR_CLASS else nr_class, es_class=None if es_class == DEFAULT_ES_CLASS else es_class, ) ) return redirect(url_for("index")) @app.route("/results///") def results(station_crs: str, slug: str, travel_date: str) -> ResponseReturnValue: return _results( station_crs, slug, travel_date, request.args.get("journey_type", "outbound"), request.args.get("return_date"), ) @app.route("/results////return/") def return_results( station_crs: str, slug: str, travel_date: str, return_date: str ) -> ResponseReturnValue: return _results(station_crs, slug, travel_date, "return", return_date) def _results( station_crs: str, slug: str, travel_date: str, journey_type: str, return_date: str | None, ) -> ResponseReturnValue: departure_station_name = STATION_BY_CRS.get(station_crs) if departure_station_name is None: abort(404) destination = DESTINATIONS.get(slug) if not destination or not travel_date: return redirect(url_for("index")) if journey_type not in VALID_JOURNEY_TYPES: journey_type = "outbound" if journey_type == "return": try: if not return_date or date.fromisoformat(return_date) < date.fromisoformat( travel_date ): return redirect(url_for("index")) except ValueError: return redirect(url_for("index")) if journey_type == "inbound": default_min, default_max = ( INBOUND_MIN_CONNECTION_MINUTES, INBOUND_MAX_CONNECTION_MINUTES, ) valid_min, valid_max = ( VALID_INBOUND_MIN_CONNECTIONS, VALID_INBOUND_MAX_CONNECTIONS, ) else: default_min, default_max = _get_defaults() valid_min, valid_max = VALID_MIN_CONNECTIONS, VALID_MAX_CONNECTIONS min_connection = _parse_connection( request.args.get("min_connection"), default_min, valid_min ) max_connection = _parse_connection( request.args.get("max_connection"), default_max, valid_max ) nr_class = request.args.get("nr_class", DEFAULT_NR_CLASS) if nr_class not in VALID_NR_CLASSES: nr_class = DEFAULT_NR_CLASS es_class = request.args.get("es_class", DEFAULT_ES_CLASS) if es_class not in VALID_ES_CLASSES: es_class = DEFAULT_ES_CLASS inbound_min_connection = INBOUND_MIN_CONNECTION_MINUTES if journey_type == "return": def _p(raw: str | None, default: str, valid: set[str]) -> str: return raw if raw in valid else default nr_class_out = _p( request.args.get("nr_class_out"), DEFAULT_NR_CLASS, VALID_NR_CLASSES ) nr_class_in = _p( request.args.get("nr_class_in"), DEFAULT_NR_CLASS, VALID_NR_CLASSES ) es_class_out = _p( request.args.get("es_class_out"), DEFAULT_ES_CLASS, VALID_ES_CLASSES ) es_class_in = _p( request.args.get("es_class_in"), DEFAULT_ES_CLASS, VALID_ES_CLASSES ) inbound_min_connection = _parse_connection( request.args.get("min_connection_in"), INBOUND_MIN_CONNECTION_MINUTES, VALID_INBOUND_RETURN_MIN_CONNECTIONS, ) else: nr_class_out = nr_class_in = nr_class es_class_out = es_class_in = es_class render = request.args.get("render") if render not in ("full", "stream") and not ( app.config.get("TESTING") and request.args.get("progressive") != "1" ): dt = date.fromisoformat(travel_date) travel_date_display = dt.strftime("%A %-d %B %Y") return_date_display = ( date.fromisoformat(return_date).strftime("%A %-d %B %Y") if return_date else None ) base_args = dict(request.args) base_args.pop("progressive", None) base_args.pop("journey_type", None) base_args.pop("return_date", None) stream_args = {**base_args, "render": "stream"} full_args = {**base_args, "render": "full"} return render_template( "results_loading.html", destination=destination, departure_station_name=departure_station_name, journey_type=journey_type, travel_date_display=travel_date_display, return_date=return_date, return_date_display=return_date_display, stream_url=_results_url( station_crs=station_crs, slug=slug, travel_date=travel_date, journey_type=journey_type, return_date=return_date, **stream_args, ), full_results_url=_results_url( station_crs=station_crs, slug=slug, travel_date=travel_date, journey_type=journey_type, return_date=return_date, **full_args, ), index_url=url_for("index"), ) user_agent = request.headers.get("User-Agent", rtt_scraper.DEFAULT_UA) error_messages = [] from_cache_parts = [] provisional_timetable = False def cached_fetch(key: str, ttl: int, fetcher: Callable[[], Any], label: str) -> Any: cached = get_cached(key, ttl=ttl) if cached is not None: from_cache_parts.append(key) return cached try: data = fetcher() set_cached(key, data) return data except Exception as e: error_messages.append(f"Could not fetch {label}: {e}") return [] if label != "GWR fares" else {} def cached_timetable_fetch( exact_key: str, weekday_key: str, fetcher: Callable[[], Any], label: str, stripper: Callable[[Any], Any], ttl: int | None = None, ) -> tuple[Any, bool]: nonlocal provisional_timetable cached = get_cached(exact_key, ttl=ttl) if cached is not None: from_cache_parts.append(exact_key) return cached, False weekday_cached = get_cached(weekday_key) if weekday_cached is not None: from_cache_parts.append(weekday_key) provisional_timetable = True return weekday_cached, True try: data = fetcher() set_cached(exact_key, data) set_cached(weekday_key, stripper(data)) return data, False except Exception as e: error_messages.append(f"Could not fetch {label}: {e}") if label == "Eurostar return times": return {"outbound": [], "inbound": []}, False return [], False es_return: dict[str, Any] = {"outbound": [], "inbound": []} es_return_provisional = False def _fetch_es_return() -> None: nonlocal es_return, es_return_provisional assert return_date is not None es_return, es_return_provisional = cached_timetable_fetch( _eurostar_return_exact_cache_key(travel_date, return_date, destination), _eurostar_return_weekday_cache_key(travel_date, return_date, destination), lambda: eurostar_scraper.fetch_return( destination, travel_date, return_date ), "Eurostar return times", _strip_eurostar_return_timetable, 24 * 3600, ) if not isinstance(es_return, dict): es_return = {"outbound": [], "inbound": []} def build_section( section_id: str, direction: str, section_date: str, eurostar_services: list[dict[str, Any]] | None = None, ) -> dict[str, Any]: section_min_connection = min_connection section_max_connection = max_connection if journey_type == "return" and direction == "inbound": section_min_connection = inbound_min_connection section_max_connection = INBOUND_MAX_CONNECTION_MINUTES rtt_direction = ( "to_paddington" if direction == "outbound" else "from_paddington" ) rtt_cache_key = _nr_exact_cache_key(rtt_direction, station_crs, section_date) rtt_weekday_cache_key = _nr_weekday_cache_key( rtt_direction, station_crs, section_date ) gwr_cache_key = f"gwr_fares_{rtt_direction}_{station_crs}_{section_date}" advance_cache_key = f"gwr_advance_{rtt_direction}_{station_crs}_{section_date}" if direction == "outbound": trains, nr_provisional = cached_timetable_fetch( rtt_cache_key, rtt_weekday_cache_key, lambda: rtt_scraper.fetch(section_date, user_agent, station_crs), "GWR trains", _strip_nr_timetable, ) else: trains, nr_provisional = cached_timetable_fetch( rtt_cache_key, rtt_weekday_cache_key, lambda: rtt_scraper.fetch_from_paddington( section_date, user_agent, station_crs ), "GWR trains", _strip_nr_timetable, ) es_provisional = es_return_provisional if journey_type == "return" else False if eurostar_services is None: es_cache_key = _eurostar_exact_cache_key( direction, section_date, destination ) es_weekday_cache_key = _eurostar_weekday_cache_key( direction, section_date, destination ) es_fetcher = ( (lambda: eurostar_scraper.fetch(destination, section_date)) if direction == "outbound" else ( lambda: eurostar_scraper.fetch( destination, section_date, direction=direction ) ) ) eurostar_services, es_provisional = cached_timetable_fetch( es_cache_key, es_weekday_cache_key, es_fetcher, "Eurostar times", _strip_eurostar_timetable, 24 * 3600, ) fare_direction = ( "to_paddington" if direction == "outbound" else "from_paddington" ) gwr_fares: dict[str, Any] = {} cached_advance = get_cached(advance_cache_key, ttl=24 * 3600) walkon_weekday_key = _walkon_weekday_cache_key( rtt_direction, station_crs, section_date ) exact_walkon = get_cached(gwr_cache_key, ttl=30 * 24 * 3600) cached_walkon = ( exact_walkon if exact_walkon is not None else get_cached(walkon_weekday_key) ) if direction == "outbound": trips = combine_trips( trains, eurostar_services, section_date, section_min_connection, section_max_connection, gwr_fares, ) unreachable = find_unreachable_morning_eurostars( trains, eurostar_services, section_date, section_min_connection, section_max_connection, ) if trips: first_es_depart = min(t["depart_st_pancras"] for t in trips) unreachable = [ s for s in unreachable if s["depart_st_pancras"] < first_es_depart ] rows = sorted( [{"row_type": "trip", "direction": direction, **trip} for trip in trips] + [ {"row_type": "unreachable", "direction": direction, **svc} for svc in unreachable ], key=lambda row: row["depart_st_pancras"], ) else: trips = combine_inbound_trips( eurostar_services, trains, section_date, section_min_connection, section_max_connection, gwr_fares, ) unreachable = find_unreachable_inbound_eurostars( eurostar_services, trains, section_date, section_min_connection, section_max_connection, ) if trips: first_es_depart = min(t["depart_destination"] for t in trips) unreachable = [ s for s in unreachable if s["depart_destination"] < first_es_depart ] rows = sorted( [{"row_type": "trip", "direction": direction, **trip} for trip in trips] + [ {"row_type": "unreachable", "direction": direction, **svc} for svc in unreachable ], key=lambda row: row["depart_destination"], ) es_by_key = { ( svc.get("depart_st_pancras") if direction == "outbound" else svc.get("depart_destination") ): svc for svc in eurostar_services } for row in rows: key = ( row.get("depart_st_pancras") if direction == "outbound" else row.get("depart_destination") ) es = es_by_key.get(key, {}) row["eurostar_price"] = es.get("price") row["eurostar_seats"] = es.get("seats") row["eurostar_plus_price"] = es.get("plus_price") row["eurostar_plus_seats"] = es.get("plus_seats") row["eurostar_key"] = f"{section_id}:{key}" if row.get("row_type") == "trip": nr_key = row.get("depart_bristol") or row.get("depart_paddington") row["row_key"] = f"{section_id}:{nr_key}:{key}" else: row["row_key"] = f"{section_id}:unreachable:{key}" dt = date.fromisoformat(section_date) return { "id": section_id, "direction": direction, "date": section_date, "date_display": dt.strftime("%A %-d %B %Y"), "rows": rows, "trips": trips, "gwr_count": len(trains), "eurostar_count": len(eurostar_services), "min_connection": section_min_connection, "max_connection": section_max_connection, "provisional_timetable": nr_provisional or es_provisional, "advance_fares": cached_advance, "cached_walkon_fares": cached_walkon, "walkon_api_url": url_for( "api_walkon_fares", station_crs=station_crs, travel_date=section_date, direction=fare_direction, ), "advance_api_url": url_for( "api_advance_fares", station_crs=station_crs, travel_date=section_date, direction=fare_direction, ), "advance_stream_url": url_for( "api_advance_fares_stream", station_crs=station_crs, travel_date=section_date, direction=fare_direction, ), } if render == "stream": def generate() -> Generator[str, None, None]: dt = date.fromisoformat(travel_date) prev_date = (dt - timedelta(days=1)).isoformat() next_date = (dt + timedelta(days=1)).isoformat() travel_date_display = dt.strftime("%A %-d %B %Y") return_date_display = None prev_return_date = return_date next_return_date = return_date if return_date: return_dt = date.fromisoformat(return_date) return_date_display = return_dt.strftime("%A %-d %B %Y") prev_return_date = (return_dt - timedelta(days=1)).isoformat() next_return_date = (return_dt + timedelta(days=1)).isoformat() eurostar_url = eurostar_scraper.search_url( destination, travel_date, direction=journey_type, return_date=return_date, ) rtt_url = RTT_PADDINGTON_URL.format(crs=station_crs, date=travel_date) rtt_station_url = RTT_STATION_URL.format(crs=station_crs, date=travel_date) url_min = None if min_connection == default_min else min_connection url_max = None if max_connection == default_max else max_connection url_nr = None if nr_class == DEFAULT_NR_CLASS else nr_class url_es = None if es_class == DEFAULT_ES_CLASS else es_class if journey_type == "return": common_url_args: dict[str, Any] = { "journey_type": journey_type, "return_date": return_date, "min_connection": url_min, "max_connection": url_max, "min_connection_in": ( None if inbound_min_connection == INBOUND_MIN_CONNECTION_MINUTES else inbound_min_connection ), "nr_class_out": ( None if nr_class_out == DEFAULT_NR_CLASS else nr_class_out ), "nr_class_in": ( None if nr_class_in == DEFAULT_NR_CLASS else nr_class_in ), "es_class_out": ( None if es_class_out == DEFAULT_ES_CLASS else es_class_out ), "es_class_in": ( None if es_class_in == DEFAULT_ES_CLASS else es_class_in ), } else: common_url_args = { "journey_type": journey_type, "return_date": return_date, "min_connection": url_min, "max_connection": url_max, "nr_class": url_nr, "es_class": url_es, } prev_results_url = _results_url( station_crs, slug, prev_date, **{**common_url_args, "return_date": prev_return_date}, ) next_results_url = _results_url( station_crs, slug, next_date, **{**common_url_args, "return_date": next_return_date}, ) prev_outbound_url = _results_url( station_crs, slug, prev_date, **common_url_args ) next_outbound_url = _results_url( station_crs, slug, next_date, **common_url_args ) prev_return_url = ( _results_url( station_crs, slug, travel_date, **{**common_url_args, "return_date": prev_return_date}, ) if return_date else None ) next_return_url = ( _results_url( station_crs, slug, travel_date, **{**common_url_args, "return_date": next_return_date}, ) if return_date else None ) destination_links = [ ( destination_slug, destination_name, _results_url( station_crs, destination_slug, travel_date, **common_url_args ), ) for destination_slug, destination_name in DESTINATIONS.items() ] results_base_url = _results_url( station_crs, slug, travel_date, journey_type=journey_type, return_date=return_date, ) if journey_type == "return": shell_sections = [ { "id": "outbound", "direction": "outbound", "min_connection": min_connection, "max_connection": max_connection, }, { "id": "inbound", "direction": "inbound", "min_connection": inbound_min_connection, "max_connection": INBOUND_MAX_CONNECTION_MINUTES, }, ] shell_nr_classes = {"outbound": nr_class_out, "inbound": nr_class_in} shell_es_classes = {"outbound": es_class_out, "inbound": es_class_in} shell_section_directions = { "outbound": "outbound", "inbound": "inbound", } else: shell_sections = [ { "id": "main", "direction": journey_type, "min_connection": min_connection, "max_connection": max_connection, }, ] shell_nr_classes = {"main": nr_class} shell_es_classes = {"main": es_class} shell_section_directions = {"main": journey_type} shell_html = render_template( "results_shell.html", journey_type=journey_type, destination=destination, departure_station_name=departure_station_name, travel_date=travel_date, return_date=return_date, travel_date_display=travel_date_display, return_date_display=return_date_display, slug=slug, sections=shell_sections, nr_classes=shell_nr_classes, es_classes=shell_es_classes, nr_classes_json=json.dumps(shell_nr_classes), es_classes_json=json.dumps(shell_es_classes), section_directions_json=json.dumps(shell_section_directions), results_base_url=results_base_url, prev_results_url=prev_results_url, next_results_url=next_results_url, prev_outbound_url=prev_outbound_url, next_outbound_url=next_outbound_url, prev_return_url=prev_return_url, next_return_url=next_return_url, destination_links=destination_links, eurostar_url=eurostar_url, rtt_url=rtt_url, rtt_station_url=rtt_station_url, min_connection=min_connection, max_connection=max_connection, default_min_connection=default_min, default_max_connection=default_max, default_inbound_min_connection=INBOUND_MIN_CONNECTION_MINUTES, valid_min_connections=sorted(valid_min), valid_max_connections=sorted(valid_max), inbound_min_connection=inbound_min_connection, valid_inbound_return_min_connections=sorted( VALID_INBOUND_RETURN_MIN_CONNECTIONS ), ) yield f"data: {json.dumps({'type': 'shell', 'html': shell_html})}\n\n" if journey_type == "return": assert return_date is not None _fetch_es_return() sections_spec: list[ tuple[str, str, str, list[dict[str, Any]] | None] ] = [ ( "outbound", "outbound", travel_date, es_return.get("outbound", []), ), ("inbound", "inbound", return_date, es_return.get("inbound", [])), ] else: sections_spec = [("main", journey_type, travel_date, None)] built_sections: list[dict[str, Any]] = [] for section_id, direction, section_date, eurostar_services in sections_spec: section = build_section( section_id, direction, section_date, eurostar_services ) built_sections.append(section) section_html = render_template( "results_section.html", section=section, destination=destination, departure_station_name=departure_station_name, ) yield f"data: {json.dumps({'type': 'section', 'id': section_id, 'html': section_html, 'trip_fares': _section_trip_fares(section), 'advance_fares': section['advance_fares'], 'walkon_cached_fares': section.get('cached_walkon_fares'), 'walkon_api_url': section['walkon_api_url'], 'advance_api_url': section['advance_api_url'], 'advance_stream_url': section['advance_stream_url']})}\n\n" if journey_type == "return": timetable_refresh_url = url_for( "api_return_results_refresh", station_crs=station_crs, slug=slug, travel_date=travel_date, return_date=return_date, ) else: timetable_refresh_url = url_for( "api_results_refresh", station_crs=station_crs, slug=slug, travel_date=travel_date, journey_type=journey_type if journey_type == "inbound" else None, ) summary_html = _build_summary_html( built_sections, journey_type, from_cache_parts, provisional_timetable ) yield f"data: {json.dumps({'type': 'done', 'timetable_refresh_url': timetable_refresh_url, 'provisional_timetable': provisional_timetable, 'summary_html': summary_html})}\n\n" return Response(stream_with_context(generate()), mimetype="text/event-stream") if journey_type == "return": assert return_date is not None _fetch_es_return() sections = [ build_section( "outbound", "outbound", travel_date, es_return.get("outbound", []) ), build_section( "inbound", "inbound", return_date, es_return.get("inbound", []) ), ] else: sections = [build_section("main", journey_type, travel_date)] nr_classes = {} es_classes = {} section_directions = {} for section in sections: direction = section["direction"] section_directions[section["id"]] = direction nr_classes[section["id"]] = ( nr_class_out if direction == "outbound" else nr_class_in ) es_classes[section["id"]] = ( es_class_out if direction == "outbound" else es_class_in ) no_prices_note = None all_es_prices = [ row.get("eurostar_price") for section in sections for row in section["rows"] if row.get("row_type") == "trip" ] if ( not provisional_timetable and all_es_prices and all(price is None for price in all_es_prices) ): no_prices_note = ( "Eurostar prices not yet available — tickets may not be on sale yet." ) dt = date.fromisoformat(travel_date) prev_date = (dt - timedelta(days=1)).isoformat() next_date = (dt + timedelta(days=1)).isoformat() travel_date_display = dt.strftime("%A %-d %B %Y") return_date_display = None prev_return_date = return_date next_return_date = return_date if return_date: return_dt = date.fromisoformat(return_date) return_date_display = return_dt.strftime("%A %-d %B %Y") prev_return_date = (return_dt - timedelta(days=1)).isoformat() next_return_date = (return_dt + timedelta(days=1)).isoformat() eurostar_url = eurostar_scraper.search_url( destination, travel_date, direction=journey_type, return_date=return_date ) rtt_url = RTT_PADDINGTON_URL.format(crs=station_crs, date=travel_date) rtt_station_url = RTT_STATION_URL.format(crs=station_crs, date=travel_date) url_min = None if min_connection == default_min else min_connection url_max = None if max_connection == default_max else max_connection url_nr = None if nr_class == DEFAULT_NR_CLASS else nr_class url_es = None if es_class == DEFAULT_ES_CLASS else es_class common_url_args: dict[str, Any] if journey_type == "return": common_url_args = { "journey_type": journey_type, "return_date": return_date, "min_connection": url_min, "max_connection": url_max, "min_connection_in": ( None if inbound_min_connection == INBOUND_MIN_CONNECTION_MINUTES else inbound_min_connection ), "nr_class_out": None if nr_class_out == DEFAULT_NR_CLASS else nr_class_out, "nr_class_in": None if nr_class_in == DEFAULT_NR_CLASS else nr_class_in, "es_class_out": None if es_class_out == DEFAULT_ES_CLASS else es_class_out, "es_class_in": None if es_class_in == DEFAULT_ES_CLASS else es_class_in, } else: common_url_args = { "journey_type": journey_type, "return_date": return_date, "min_connection": url_min, "max_connection": url_max, "nr_class": url_nr, "es_class": url_es, } prev_results_url = _results_url( station_crs, slug, prev_date, **{**common_url_args, "return_date": prev_return_date}, ) next_results_url = _results_url( station_crs, slug, next_date, **{**common_url_args, "return_date": next_return_date}, ) prev_outbound_url = _results_url(station_crs, slug, prev_date, **common_url_args) next_outbound_url = _results_url(station_crs, slug, next_date, **common_url_args) prev_return_url = ( _results_url( station_crs, slug, travel_date, **{**common_url_args, "return_date": prev_return_date}, ) if return_date else None ) next_return_url = ( _results_url( station_crs, slug, travel_date, **{**common_url_args, "return_date": next_return_date}, ) if return_date else None ) destination_links = [ ( destination_slug, destination_name, _results_url( station_crs, destination_slug, travel_date, **common_url_args, ), ) for destination_slug, destination_name in DESTINATIONS.items() ] results_base_url = _results_url( station_crs, slug, travel_date, journey_type=journey_type, return_date=return_date, ) trip_fares = {} advance_fares = {} walkon_cached_fares = {} walkon_api_urls = {} advance_api_urls = {} advance_stream_urls = {} for section in sections: advance_fares[section["id"]] = section["advance_fares"] walkon_cached_fares[section["id"]] = section.get("cached_walkon_fares") walkon_api_urls[section["id"]] = section["walkon_api_url"] advance_api_urls[section["id"]] = section["advance_api_url"] advance_stream_urls[section["id"]] = section["advance_stream_url"] for row in section["rows"]: circle_svcs = row.get("circle_services") or [] circle_fare = circle_svcs[0]["fare"] if circle_svcs else 0 walkon = ( {"price": row["ticket_price"], "ticket": row.get("ticket_name", "")} if row.get("ticket_price") is not None else None ) es_std = ( {"price": row["eurostar_price"], "seats": row.get("eurostar_seats")} if row.get("eurostar_price") is not None else None ) es_plus = ( { "price": row["eurostar_plus_price"], "seats": row.get("eurostar_plus_seats"), } if row.get("eurostar_plus_price") is not None else None ) trip_fares[row["row_key"]] = { "section": section["id"], "eurostar_key": row.get("eurostar_key"), "advance_key": row.get("depart_bristol") or row.get("depart_paddington"), "walkon": walkon, "es_standard": es_std, "es_plus": es_plus, "circle_fare": circle_fare, } if journey_type == "return": timetable_refresh_url = url_for( "api_return_results_refresh", station_crs=station_crs, slug=slug, travel_date=travel_date, return_date=return_date, ) else: timetable_refresh_url = url_for( "api_results_refresh", station_crs=station_crs, slug=slug, travel_date=travel_date, journey_type=journey_type if journey_type == "inbound" else None, ) return render_template( "results.html", sections=sections, trips=sections[0]["trips"] if sections else [], result_rows=sections[0]["rows"] if sections else [], unreachable_morning_services=[], destinations=DESTINATIONS, destination=destination, travel_date=travel_date, return_date=return_date, journey_type=journey_type, slug=slug, station_crs=station_crs, departure_station_name=departure_station_name, prev_date=prev_date, next_date=next_date, prev_results_url=prev_results_url, next_results_url=next_results_url, prev_outbound_url=prev_outbound_url, next_outbound_url=next_outbound_url, prev_return_url=prev_return_url, next_return_url=next_return_url, destination_links=destination_links, results_base_url=results_base_url, travel_date_display=travel_date_display, return_date_display=return_date_display, gwr_count=sum(section["gwr_count"] for section in sections), eurostar_count=sum(section["eurostar_count"] for section in sections), from_cache=bool(from_cache_parts), provisional_timetable=provisional_timetable, error="; ".join(error_messages) if error_messages else None, no_prices_note=no_prices_note, eurostar_url=eurostar_url, rtt_url=rtt_url, rtt_station_url=rtt_station_url, min_connection=min_connection, max_connection=max_connection, default_min_connection=default_min, default_max_connection=default_max, url_min_connection=url_min, url_max_connection=url_max, nr_class=nr_class, es_class=es_class, url_nr_class=url_nr, url_es_class=url_es, nr_classes=nr_classes, es_classes=es_classes, nr_classes_json=json.dumps(nr_classes), es_classes_json=json.dumps(es_classes), section_directions_json=json.dumps(section_directions), trip_fares_json=json.dumps(trip_fares), advance_fares_json=json.dumps(advance_fares), walkon_cached_fares_json=json.dumps(walkon_cached_fares), walkon_api_urls_json=json.dumps(walkon_api_urls), advance_api_urls_json=json.dumps(advance_api_urls), advance_stream_urls_json=json.dumps(advance_stream_urls), timetable_refresh_url=timetable_refresh_url, advance_fares_api_url=url_for( "api_advance_fares", station_crs=station_crs, travel_date=travel_date ), advance_fares_stream_url=url_for( "api_advance_fares_stream", station_crs=station_crs, travel_date=travel_date ), valid_min_connections=sorted(valid_min), valid_max_connections=sorted(valid_max), inbound_min_connection=inbound_min_connection, default_inbound_min_connection=INBOUND_MIN_CONNECTION_MINUTES, valid_inbound_return_min_connections=sorted( VALID_INBOUND_RETURN_MIN_CONNECTIONS ), ) def _fetch_exact_nr_timetable( station_crs: str, section_date: str, direction: str, user_agent: str ) -> list[dict[str, Any]]: rtt_direction = "to_paddington" if direction == "outbound" else "from_paddington" exact_key = _nr_exact_cache_key(rtt_direction, station_crs, section_date) weekday_key = _nr_weekday_cache_key(rtt_direction, station_crs, section_date) trains = ( rtt_scraper.fetch(section_date, user_agent, station_crs) if direction == "outbound" else rtt_scraper.fetch_from_paddington(section_date, user_agent, station_crs) ) set_cached(exact_key, trains) set_cached(weekday_key, _strip_nr_timetable(trains)) return trains def _fetch_exact_eurostar_single( destination: str, section_date: str, direction: str ) -> list[dict[str, Any]]: exact_key = _eurostar_exact_cache_key(direction, section_date, destination) weekday_key = _eurostar_weekday_cache_key(direction, section_date, destination) services = ( eurostar_scraper.fetch(destination, section_date) if direction == "outbound" else eurostar_scraper.fetch(destination, section_date, direction=direction) ) set_cached(exact_key, services) set_cached(weekday_key, _strip_eurostar_timetable(services)) return services def _fetch_exact_eurostar_return( destination: str, travel_date: str, return_date: str ) -> dict[str, list[dict[str, Any]]]: exact_key = _eurostar_return_exact_cache_key(travel_date, return_date, destination) weekday_key = _eurostar_return_weekday_cache_key( travel_date, return_date, destination ) services = eurostar_scraper.fetch_return(destination, travel_date, return_date) set_cached(exact_key, services) set_cached(weekday_key, _strip_eurostar_return_timetable(services)) return services @app.route("/api/walkon_fares//") def api_walkon_fares( station_crs: str, travel_date: str ) -> Response | tuple[Response, int]: if station_crs not in STATION_BY_CRS: abort(404) direction = request.args.get("direction", "to_paddington") if direction not in {"to_paddington", "from_paddington"}: direction = "to_paddington" cache_key = f"gwr_fares_{direction}_{station_crs}_{travel_date}" weekday_key = _walkon_weekday_cache_key(direction, station_crs, travel_date) cached = get_cached(cache_key, ttl=30 * 24 * 3600) if cached is not None: if get_cached(weekday_key) is None: set_cached(weekday_key, cached) return jsonify(cached) try: fares = ( gwr_fares_scraper.fetch(station_crs, travel_date) if direction == "to_paddington" else gwr_fares_scraper.fetch(station_crs, travel_date, direction=direction) ) set_cached(cache_key, fares) set_cached(weekday_key, fares) return jsonify(fares) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/results_refresh///") def api_results_refresh(station_crs: str, slug: str, travel_date: str) -> Response: return _api_results_refresh( station_crs, slug, travel_date, request.args.get("return_date") ) @app.route( "/api/results_refresh////return/" ) def api_return_results_refresh( station_crs: str, slug: str, travel_date: str, return_date: str ) -> Response: return _api_results_refresh(station_crs, slug, travel_date, return_date, "return") def _api_results_refresh( station_crs: str, slug: str, travel_date: str, return_date: str | None = None, path_journey_type: str | None = None, ) -> Response: if station_crs not in STATION_BY_CRS: abort(404) destination = DESTINATIONS.get(slug) if not destination: abort(404) journey_type = path_journey_type or request.args.get("journey_type", "outbound") if journey_type not in VALID_JOURNEY_TYPES: journey_type = "outbound" if return_date is None: return_date = request.args.get("return_date") if journey_type == "return" and not return_date: abort(400) user_agent = request.headers.get("User-Agent", rtt_scraper.DEFAULT_UA) def generate() -> Generator[str, None, None]: try: old_es_weekdays: dict[str, Any] = {} if journey_type == "return": assert return_date is not None old_es_weekdays["return"] = get_cached( _eurostar_return_weekday_cache_key( travel_date, return_date, destination ) ) es_return = _fetch_exact_eurostar_return( destination, travel_date, return_date ) sections = [ ( "outbound", "outbound", travel_date, es_return.get("outbound", []), ), ("inbound", "inbound", return_date, es_return.get("inbound", [])), ] else: direction = journey_type section_date = travel_date old_es_weekdays["main"] = get_cached( _eurostar_weekday_cache_key(direction, section_date, destination) ) es_services = _fetch_exact_eurostar_single( destination, section_date, direction ) sections = [("main", direction, section_date, es_services)] reload_needed = False eurostar_prices = {} for section_id, direction, section_date, es_services in sections: nr_weekday_key = _nr_weekday_cache_key( "to_paddington" if direction == "outbound" else "from_paddington", station_crs, section_date, ) old_nr_weekday = get_cached(nr_weekday_key) exact_nr = _fetch_exact_nr_timetable( station_crs, section_date, direction, user_agent ) if old_nr_weekday is not None and _timetable_signature( old_nr_weekday ) != _timetable_signature(_strip_nr_timetable(exact_nr)): reload_needed = True old_es_weekday = ( old_es_weekdays["return"] if journey_type == "return" else old_es_weekdays[section_id] ) exact_es_timetable = ( _strip_eurostar_return_timetable(es_return) if journey_type == "return" else _strip_eurostar_timetable(es_services) ) if old_es_weekday is not None and _timetable_signature( old_es_weekday ) != _timetable_signature(exact_es_timetable): reload_needed = True eurostar_prices.update( _eurostar_prices_by_row(section_id, direction, es_services) ) if reload_needed: yield f"data: {json.dumps({'type': 'reload'})}\n\n" yield f"data: {json.dumps({'type': 'done'})}\n\n" return if eurostar_prices: yield f"data: {json.dumps({'type': 'eurostar_prices', 'prices': eurostar_prices})}\n\n" for section_id, direction, section_date, _es_services in sections: fare_direction = ( "to_paddington" if direction == "outbound" else "from_paddington" ) cache_key = f"gwr_fares_{fare_direction}_{station_crs}_{section_date}" cached = get_cached(cache_key, ttl=30 * 24 * 3600) if cached is None: cached = ( gwr_fares_scraper.fetch(station_crs, section_date) if fare_direction == "to_paddington" else gwr_fares_scraper.fetch( station_crs, section_date, direction=fare_direction ) ) set_cached(cache_key, cached) yield f"data: {json.dumps({'type': 'walkon_fares', 'section': section_id, 'fares': cached})}\n\n" except Exception as e: yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" return yield f"data: {json.dumps({'type': 'done'})}\n\n" return Response(stream_with_context(generate()), mimetype="text/event-stream") @app.route("/api/advance_fares//") def api_advance_fares( station_crs: str, travel_date: str ) -> Response | tuple[Response, int]: if station_crs not in STATION_BY_CRS: abort(404) direction = request.args.get("direction", "to_paddington") if direction not in {"to_paddington", "from_paddington"}: direction = "to_paddington" cache_key = f"gwr_advance_{direction}_{station_crs}_{travel_date}" cached = get_cached(cache_key, ttl=24 * 3600) if cached is not None: return jsonify(cached) try: fares = ( gwr_fares_scraper.fetch_advance(station_crs, travel_date) if direction == "to_paddington" else gwr_fares_scraper.fetch_advance( station_crs, travel_date, direction=direction ) ) set_cached(cache_key, fares) return jsonify(fares) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/advance_fares_stream//") def api_advance_fares_stream(station_crs: str, travel_date: str) -> Response: if station_crs not in STATION_BY_CRS: abort(404) direction = request.args.get("direction", "to_paddington") if direction not in {"to_paddington", "from_paddington"}: direction = "to_paddington" cache_key = f"gwr_advance_{direction}_{station_crs}_{travel_date}" def generate() -> Generator[str, None, None]: cached = get_cached(cache_key, ttl=24 * 3600) if cached is not None: yield f"data: {json.dumps({'type': 'fares', 'fares': cached})}\n\n" yield f"data: {json.dumps({'type': 'done'})}\n\n" return accumulated: dict[str, Any] = {} try: stream = ( gwr_fares_scraper.fetch_advance_streaming(station_crs, travel_date) if direction == "to_paddington" else gwr_fares_scraper.fetch_advance_streaming( station_crs, travel_date, direction=direction ) ) for page_fares in stream: for dep_time, fare_data in page_fares.items(): if dep_time not in accumulated: accumulated[dep_time] = { "advance_std": None, "advance_1st": None, } if fare_data.get("advance_std"): accumulated[dep_time]["advance_std"] = fare_data["advance_std"] if fare_data.get("advance_1st"): accumulated[dep_time]["advance_1st"] = fare_data["advance_1st"] yield f"data: {json.dumps({'type': 'fares', 'fares': page_fares})}\n\n" except Exception as e: yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" return set_cached(cache_key, accumulated) yield f"data: {json.dumps({'type': 'done'})}\n\n" return Response( stream_with_context(generate()), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) if __name__ == "__main__": app.run(debug=True, host="0.0.0.0")