249 lines
7.5 KiB
Python
249 lines
7.5 KiB
Python
"""UK school holidays (Bristol) via iCalendar."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import json
|
|
import os
|
|
|
|
import httpx
|
|
|
|
from .event import Event
|
|
|
|
# Council web page listing term and holiday dates. It is attached to every
# parsed event as its link and sent as the Referer when fetching the feed.
school_holiday_page_url = (
    "https://www.bristol.gov.uk"
    "/residents/schools-learning-and-early-years"
    "/school-term-and-holiday-dates"
)

# Location the calendar (ICS) document is downloaded from.
school_holiday_ics_url = (
    "https://www.bristol.gov.uk"
    "/files/documents"
    "/4641-bristol-school-term-and-holiday-dates"
    "-2021-2022-and-2022-2023-and-2023-2024-calendar"
)
|
|
|
|
|
|
def ics_filename(data_dir: str) -> str:
    """Return the path of the cached raw ICS download inside *data_dir*.

    The directory is expected to exist already; this is checked with an
    ``assert`` and the path is not created here.
    """
    assert os.path.exists(data_dir)
    cache_name = "bristol-school-holidays.ics"
    return os.path.join(data_dir, cache_name)
|
|
|
|
|
|
def json_filename(data_dir: str) -> str:
    """Return the path of the cached parsed-holiday JSON inside *data_dir*.

    The directory is expected to exist already; this is checked with an
    ``assert`` and the path is not created here.
    """
    assert os.path.exists(data_dir)
    cache_name = "bristol-school-holidays.json"
    return os.path.join(data_dir, cache_name)
|
|
|
|
|
|
def _unescape_ics_text(value: str) -> str:
    """Decode backslash escapes in an ICS text value for single-line display.

    The value is scanned once from left to right so that every backslash is
    paired with exactly the character that follows it:

    * ``\\n`` / ``\\N`` (escaped newline) become a single space,
    * ``\\,`` ``\\;`` and ``\\\\`` become ``,`` ``;`` and ``\\``,
    * any other escape, and a trailing lone backslash, are kept verbatim.

    Leading and trailing whitespace is stripped from the result.
    """
    decoded: list[str] = []
    chars = iter(value)
    for char in chars:
        if char != "\\":
            decoded.append(char)
            continue

        # Consume the escaped character together with its backslash.  Chained
        # str.replace() calls cannot do this: they would read the second
        # backslash of an escaped backslash as the start of a new escape.
        escaped = next(chars, None)
        if escaped is None:
            decoded.append("\\")
        elif escaped in "nN":
            decoded.append(" ")
        elif escaped in ",;\\":
            decoded.append(escaped)
        else:
            decoded.append("\\" + escaped)
    return "".join(decoded).strip()
|
|
|
|
|
|
def unfold_ics_lines(ics_text: str) -> list[str]:
    """Unfold folded ICS content lines (RFC 5545, section 3.1).

    A physical line that begins with a space or tab continues the previous
    logical line; that single leading character is dropped and the rest is
    appended.  Empty physical lines are ignored.  A continuation line with
    no preceding line is kept as a line of its own.

    Returns the logical lines without line terminators.
    """
    # Split on CRLF, lone CR and lone LF only.  str.splitlines() additionally
    # breaks on characters such as U+2028, form feed and vertical tab, which
    # would cut a property value in two if the feed contained one.
    normalised = ics_text.replace("\r\n", "\n").replace("\r", "\n")

    unfolded: list[str] = []
    for line in normalised.split("\n"):
        if not line:
            continue
        if unfolded and line[0] in " \t":
            unfolded[-1] += line[1:]
        else:
            unfolded.append(line)
    return unfolded
|
|
|
|
|
|
def _parse_ics_date(value: str) -> datetime.date:
    """Return the calendar date of an ICS date or date-time value.

    Surrounding whitespace is ignored.  For a date-time only the text before
    the first ``T`` is used; the time part is discarded without any
    conversion.  Raises ``ValueError`` if that text is not ``YYYYMMDD``.
    """
    date_digits, _, _ = value.strip().partition("T")
    return datetime.datetime.strptime(date_digits, "%Y%m%d").date()
|
|
|
|
|
|
def _is_school_holiday_summary(summary: str) -> bool:
    """Tell whether *summary* names a school holiday.

    The check is case-insensitive: the text must mention "holiday" and must
    not mention "bank holiday".
    """
    text = summary.lower()
    return "holiday" in text and "bank holiday" not in text
|
|
|
|
|
|
def _clean_summary(summary: str) -> str:
    """Return a short display title for a raw ICS summary.

    The text is unescaped first, then everything from the first `` (``
    onwards is dropped and surrounding whitespace is stripped.
    """
    decoded = _unescape_ics_text(summary)
    # The feed embeds long policy notes in parentheses after the name;
    # partition() leaves the text untouched when there is no " (".
    name_part, _, _ = decoded.partition(" (")
    return name_part.strip()
|
|
|
|
|
|
def parse_school_holidays_from_ics(ics_text: str) -> list[Event]:
    """Extract school-holiday date ranges from ICS text.

    Only the SUMMARY, DTSTART and DTEND properties of each VEVENT are read.
    An event is kept when all three are present, its cleaned summary looks
    like a school holiday, and its date range is not empty.  Duplicates with
    the same title and date range are collapsed.

    Returns the events sorted by start date, then end date.
    """
    wanted_keys = ("SUMMARY", "DTSTART", "DTEND")

    def build_event(fields: dict[str, str]) -> Event | None:
        """Turn one VEVENT's collected properties into an Event, or None."""
        raw_summary = fields.get("SUMMARY")
        raw_start = fields.get("DTSTART")
        raw_end = fields.get("DTEND")
        if not (raw_summary and raw_start and raw_end):
            return None

        title = _clean_summary(raw_summary)
        if not _is_school_holiday_summary(title):
            return None

        first_day = _parse_ics_date(raw_start)
        # DTEND is treated as exclusive: the last day is the day before it.
        last_day = _parse_ics_date(raw_end) - datetime.timedelta(days=1)
        if last_day < first_day:
            return None

        return Event(
            name="uk_school_holiday",
            date=first_day,
            end_date=last_day,
            title=title,
            url=school_holiday_page_url,
        )

    found: list[Event] = []
    fields: dict[str, str] = {}
    for line in unfold_ics_lines(ics_text):
        if line == "BEGIN:VEVENT":
            fields = {}
        elif line == "END:VEVENT":
            event = build_event(fields)
            if event is not None:
                found.append(event)
            fields = {}
        elif ":" in line:
            prop, _, value = line.partition(":")
            # Drop any ";PARAM=..." suffix from the property name.
            key = prop.partition(";")[0].upper()
            if key in wanted_keys:
                fields[key] = value.strip()

    # De-duplicate by title/date-range; a later duplicate replaces an earlier one.
    by_range: dict[tuple[str, datetime.date, datetime.date], Event] = {}
    for event in found:
        range_key = (event.title or event.name, event.as_date, event.end_as_date)
        by_range[range_key] = event

    ordered = list(by_range.values())
    ordered.sort(key=lambda ev: (ev.as_date, ev.end_as_date))
    return ordered
|
|
|
|
|
|
def write_school_holidays_json(events: list[Event], data_dir: str) -> None:
    """Serialise *events* to the JSON cache file inside *data_dir*.

    Each event becomes an object with the string keys ``name``, ``title``,
    ``start``, ``end`` and ``url``.  Dates are written in ISO format, a
    missing title falls back to the event name, and a missing URL is
    written as an empty string.
    """
    target = json_filename(data_dir)

    def as_record(event: Event) -> dict[str, str]:
        """Flatten one event into its JSON-ready mapping."""
        return {
            "name": event.name,
            "title": event.title or event.name,
            "start": event.as_date.isoformat(),
            "end": event.end_as_date.isoformat(),
            "url": event.url or "",
        }

    records = [as_record(event) for event in events]
    with open(target, "w", encoding="utf-8") as out:
        json.dump(records, out, indent=2)
|
|
|
|
|
|
def read_school_holidays_json(data_dir: str) -> list[Event]:
    """Read parsed school-holiday events from the JSON cache.

    The cache is treated as best-effort.  A missing file, a file that cannot
    be decoded as JSON (empty, truncated or corrupt) and a top-level value
    that is not a list all yield an empty list.  Entries that are not
    objects, lack string ``title``/``start``/``end`` values, or carry dates
    that are not ISO formatted are skipped.

    Returns the remaining events sorted by start date, then end date.
    """
    filename = json_filename(data_dir)

    # Open directly rather than checking for existence first, so a file that
    # disappears between the check and the open cannot raise.
    try:
        with open(filename, encoding="utf-8") as in_file:
            loaded = json.load(in_file)
    except FileNotFoundError:
        return []
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; an undecodable cache is handled like a missing one,
        # in line with the tolerant handling of malformed entries below.
        return []

    if not isinstance(loaded, list):
        return []

    parsed_events: list[Event] = []
    for raw_item in loaded:
        if not isinstance(raw_item, dict):
            continue

        title = raw_item.get("title")
        start_value = raw_item.get("start")
        end_value = raw_item.get("end")
        if not (
            isinstance(title, str)
            and isinstance(start_value, str)
            and isinstance(end_value, str)
        ):
            continue

        try:
            start_date = datetime.date.fromisoformat(start_value)
            end_date = datetime.date.fromisoformat(end_value)
        except ValueError:
            continue

        event_url = raw_item.get("url")
        parsed_events.append(
            Event(
                name="uk_school_holiday",
                date=start_date,
                end_date=end_date,
                title=title,
                # An absent, empty or non-string URL is stored as None.
                url=event_url if isinstance(event_url, str) and event_url else None,
            )
        )

    return sorted(parsed_events, key=lambda item: (item.as_date, item.end_as_date))
|
|
|
|
|
|
def school_holiday_list(
    start_date: datetime.date,
    end_date: datetime.date,
    data_dir: str,
) -> list[Event]:
    """Return cached school holidays that overlap the given date range.

    Both bounds are inclusive: a holiday is included when it starts on or
    before *end_date* and ends on or after *start_date*.  Events are read
    from the JSON cache in *data_dir*; nothing is downloaded here.
    """

    def overlaps(event: Event) -> bool:
        """True unless the event lies wholly after or wholly before the range."""
        return not (event.as_date > end_date or event.end_as_date < start_date)

    cached = read_school_holidays_json(data_dir)
    return [event for event in cached if overlaps(event)]
|
|
|
|
|
|
async def get_holiday_list(data_dir: str) -> list[Event]:
    """Download the ICS feed, cache it, and return the parsed holidays.

    The raw ICS text and the parsed events are both written to cache files
    inside *data_dir*.

    Raises whatever ``response.raise_for_status()`` raises for an error
    status, and ``ValueError`` when the response neither declares a
    ``text/calendar`` content type nor contains ``BEGIN:VCALENDAR``.
    """
    # NOTE(review): browser-like headers, presumably because the site
    # rejects default HTTP-client requests; confirm before simplifying.
    user_agent = (
        "Mozilla/5.0 (X11; Linux x86_64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Safari/537.36"
    )
    request_headers = {
        "User-Agent": user_agent,
        "Accept": "text/calendar,*/*;q=0.9",
        "Referer": school_holiday_page_url,
    }

    async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
        response = await client.get(school_holiday_ics_url, headers=request_headers)
        response.raise_for_status()

    content_type = response.headers.get("content-type", "")
    ics_text = response.text
    # Either signal is enough; reject only when both are absent.
    looks_like_calendar = (
        "text/calendar" in content_type or "BEGIN:VCALENDAR" in ics_text
    )
    if not looks_like_calendar:
        raise ValueError("School holiday ICS download did not return calendar content")

    # Keep the raw download next to the parsed JSON cache.
    raw_cache_path = ics_filename(data_dir)
    with open(raw_cache_path, "w", encoding="utf-8") as out:
        out.write(ics_text)

    events = parse_school_holidays_from_ics(ics_text)
    write_school_holidays_json(events, data_dir)
    return events
|