Various improvements
This commit is contained in:
parent
2090268754
commit
876eb6a759
5 changed files with 98 additions and 72 deletions
|
|
@ -13,7 +13,6 @@ Data path: props.pageProps.pageData.liveDepartures[]
|
|||
.destination.model.scheduledArrivalDateTime → destination arrival
|
||||
(already filtered to the requested stop, not the final stop)
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
import httpx
|
||||
|
|
@ -62,29 +61,26 @@ def _parse(html: str, destination: str) -> list[dict]:
|
|||
dep_time = _hhmm(dep['origin']['model']['scheduledDepartureDateTime'])
|
||||
arr_time = _hhmm(dep['destination']['model']['scheduledArrivalDateTime'])
|
||||
if dep_time and arr_time:
|
||||
carrier = dep.get('model', {}).get('carrier', 'ES')
|
||||
number = dep.get('model', {}).get('trainNumber', '')
|
||||
services.append({
|
||||
'depart_st_pancras': dep_time,
|
||||
'arrive_destination': arr_time,
|
||||
'destination': destination,
|
||||
'train_number': f"{carrier} {number}" if number else '',
|
||||
})
|
||||
return sorted(services, key=lambda s: s['depart_st_pancras'])
|
||||
|
||||
|
||||
async def fetch(destination: str, travel_date: str,
|
||||
user_agent: str = DEFAULT_UA) -> list[dict]:
|
||||
def fetch(destination: str, travel_date: str,
|
||||
user_agent: str = DEFAULT_UA) -> list[dict]:
|
||||
url = ROUTE_URLS[destination]
|
||||
headers = {
|
||||
'User-Agent': user_agent,
|
||||
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
|
||||
'Accept-Language': 'en-GB,en;q=0.9',
|
||||
}
|
||||
async with httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=20) as client:
|
||||
r = await client.get(url, params={'date': travel_date})
|
||||
with httpx.Client(headers=headers, follow_redirects=True, timeout=20) as client:
|
||||
r = client.get(url, params={'date': travel_date})
|
||||
r.raise_for_status()
|
||||
return _parse(r.text, destination)
|
||||
|
||||
|
||||
def get_eurostar_times(destination: str, travel_date: str,
|
||||
user_agent: str = DEFAULT_UA) -> list[dict]:
|
||||
"""Synchronous wrapper for CLI/testing."""
|
||||
return asyncio.run(fetch(destination, travel_date, user_agent))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue