"""Read events from YAML.""" import os import typing from datetime import date, datetime import dateutil.rrule import isodate # type: ignore import yaml from . import uk_tz from .event import Event def midnight(d: date) -> datetime: """Convert from date to midnight on that day.""" return datetime.combine(d, datetime.min.time()) def dates_from_rrule( rrule: str, start: date, end: date ) -> typing.Sequence[datetime | date]: """Generate events from an RRULE between start_date and end_date.""" all_day = not any(param in rrule for param in ["BYHOUR", "BYMINUTE", "BYSECOND"]) return [ i.date() if all_day else uk_tz.localize(i) for i in dateutil.rrule.rrulestr(rrule, dtstart=midnight(start)).between( midnight(start), midnight(end) ) ] def get_yaml_event_date_field(item: dict[str, str]) -> str: """Event date field name.""" return ( "end_date" if item["name"] == "travel_insurance" else ("start_date" if "start_date" in item else "date") ) def get_yaml_event_end_date_field(item: dict[str, str]) -> str: """Event date field name.""" return ( "end_date" if item["name"] == "travel_insurance" else ("start_date" if "start_date" in item else "date") ) def read( data_dir: str, start: date, end: date, skip_trips: bool = False ) -> list[Event]: """Read eventes from YAML file.""" events: list[Event] = [] for item in yaml.safe_load(open(os.path.join(data_dir, "events.yaml"))): if "trip" in item and skip_trips: continue duration = ( isodate.parse_duration(item["duration"]) if "duration" in item else None ) dates = ( dates_from_rrule(item["rrule"], start, end) if "rrule" in item else [item[get_yaml_event_date_field(item)]] ) for dt in dates: e = Event( name=item["name"], date=dt, end_date=( dt + duration if duration else ( item.get("end_date") if item["name"] != "travel_insurance" else None ) ), title=item.get("title"), url=item.get("url"), ) events.append(e) return events