"""Agenda data.""" import asyncio import itertools import os import typing from datetime import date, datetime, timedelta from time import time import dateutil.rrule import dateutil.tz import flask import isodate # type: ignore import lxml import pytz import yaml from . import ( accommodation, birthday, calendar, conference, domains, economist, fx, gwr, hn, holidays, meetup, stock_market, subscription, sun, thespacedevs, travel, uk_holiday, uk_tz, waste_schedule, ) from .types import Event, StrDict here = dateutil.tz.tzlocal() # deadline to file tax return # credit card expiry dates # morzine ski lifts # chalet availablity calendar # starlink visible def timezone_transition( start: datetime, end: datetime, key: str, tz_name: str ) -> list[Event]: """Clocks changes.""" tz = pytz.timezone(tz_name) return [ Event(name=key, date=pytz.utc.localize(t).astimezone(tz)) for t in tz._utc_transition_times # type: ignore if start <= t <= end ] def midnight(d: date) -> datetime: """Convert from date to midnight on that day.""" return datetime.combine(d, datetime.min.time()) def dates_from_rrule( rrule: str, start: date, end: date ) -> typing.Sequence[datetime | date]: """Generate events from an RRULE between start_date and end_date.""" all_day = not any(param in rrule for param in ["BYHOUR", "BYMINUTE", "BYSECOND"]) return [ i.date() if all_day else uk_tz.localize(i) for i in dateutil.rrule.rrulestr(rrule, dtstart=midnight(start)).between( midnight(start), midnight(end) ) ] async def waste_collection_events(data_dir: str) -> list[Event]: """Waste colllection events.""" postcode = "BS48 3HG" uprn = "24071046" html = await waste_schedule.get_html(data_dir, postcode, uprn) root = lxml.html.fromstring(html) events = waste_schedule.parse(root) return events async def bristol_waste_collection_events( data_dir: str, start_date: date ) -> list[Event]: """Waste colllection events.""" uprn = "358335" return await waste_schedule.get_bristol_gov_uk(start_date, data_dir, uprn) def get_yaml_event_date_field(item: dict[str, str]) -> str: """Event date field name.""" return ( "end_date" if item["name"] == "travel_insurance" else ("start_date" if "start_date" in item else "date") ) def get_yaml_event_end_date_field(item: dict[str, str]) -> str: """Event date field name.""" return ( "end_date" if item["name"] == "travel_insurance" else ("start_date" if "start_date" in item else "date") ) def read_events_yaml(data_dir: str, start: date, end: date) -> list[Event]: """Read eventes from YAML file.""" events: list[Event] = [] for item in yaml.safe_load(open(os.path.join(data_dir, "events.yaml"))): duration = ( isodate.parse_duration(item["duration"]) if "duration" in item else None ) dates = ( dates_from_rrule(item["rrule"], start, end) if "rrule" in item else [item[get_yaml_event_date_field(item)]] ) for dt in dates: e = Event( name=item["name"], date=dt, end_date=( dt + duration if duration else ( item.get("end_date") if item["name"] != "travel_insurance" else None ) ), title=item.get("title"), url=item.get("url"), ) events.append(e) return events def find_events_during_stay( accommodation_events: list[Event], markets: list[Event] ) -> list[Event]: """Market events that happen during accommodation stays.""" overlapping_markets = [] for market in markets: market_date = market.as_date assert isinstance(market_date, date) for e in accommodation_events: start, end = e.as_date, e.end_as_date assert start and end and all(isinstance(i, date) for i in (start, end)) # Check if the market date is within the accommodation dates. if start <= market_date <= end: overlapping_markets.append(market) break # Breaks the inner loop if overlap is found. return overlapping_markets def find_gaps(events: list[Event], min_gap_days: int = 3) -> list[StrDict]: """Gaps of at least `min_gap_days` between events in a list of events.""" # Sort events by start date gaps: list[tuple[date, date]] = [] previous_event_end = None by_start_date = { d: list(on_day) for d, on_day in itertools.groupby(events, key=lambda e: e.as_date) } by_end_date = { d: list(on_day) for d, on_day in itertools.groupby(events, key=lambda e: e.end_as_date) } for event in events: # Use start date for current event start_date = event.as_date # If previous event exists, calculate the gap if previous_event_end: gap_days = (start_date - previous_event_end).days if gap_days >= (min_gap_days + 2): start_end = ( previous_event_end + timedelta(days=1), start_date - timedelta(days=1), ) gaps.append(start_end) # Update previous event end date end = event.end_as_date if not previous_event_end or end > previous_event_end: previous_event_end = end return [ { "start": gap_start, "end": gap_end, "after": by_start_date[gap_end + timedelta(days=1)], "before": by_end_date[gap_start - timedelta(days=1)], } for gap_start, gap_end in gaps ] def busy_event(e: Event) -> bool: """Busy.""" if e.name not in { "event", "accommodation", "conference", "dodainville", "transport", "meetup", "party", }: return False if e.title in ("IA UK board meeting", "Mill Road Winter Fair"): return False if e.name == "conference" and not e.going: return False if not e.title: return True if e.title == "LHG Run Club" or "Third Thursday Social" in e.title: return False lc_title = e.title.lower() return "rebels" not in lc_title and "south west data social" not in lc_title async def time_function( name: str, func: typing.Callable[..., typing.Coroutine[typing.Any, typing.Any, typing.Any]], *args, **kwargs, ) -> tuple[str, typing.Any, float]: """Time the execution of an asynchronous function.""" start_time = time() result = await func(*args, **kwargs) end_time = time() return name, result, end_time - start_time async def get_data( now: datetime, config: flask.config.Config ) -> typing.Mapping[str, str | object]: """Get data to display on agenda dashboard.""" data_dir = config["DATA_DIR"] rocket_dir = os.path.join(data_dir, "thespacedevs") today = now.date() two_weeks_ago = today - timedelta(weeks=2) last_week = today - timedelta(weeks=1) last_year = today - timedelta(days=365) next_year = today + timedelta(days=2 * 365) minus_365 = now - timedelta(days=365) plus_365 = now + timedelta(days=365) t0 = time() result_list = await asyncio.gather( time_function("gbpusd", fx.get_gbpusd, config), time_function("gwr_advance_tickets", gwr.advance_ticket_date, data_dir), time_function("rockets", thespacedevs.get_launches, rocket_dir, limit=40), time_function("backwell_bins", waste_collection_events, data_dir), time_function("bristol_bins", bristol_waste_collection_events, data_dir, today), ) results = {call[0]: call[1] for call in result_list} gwr_advance_tickets = results["gwr_advance_tickets"] data_gather_seconds = time() - t0 t0 = time() stock_market_times = stock_market.open_and_close() stock_market_times_seconds = time() - t0 reply: dict[str, typing.Any] = { "now": now, "gbpusd": results["gbpusd"], "stock_markets": stock_market_times, "rockets": results["rockets"], "gwr_advance_tickets": gwr_advance_tickets, "data_gather_seconds": data_gather_seconds, "stock_market_times_seconds": stock_market_times_seconds, "timings": [(call[0], call[2]) for call in result_list], } my_data = config["PERSONAL_DATA"] events = ( [ Event(name="mothers_day", date=uk_holiday.get_mothers_day(today)), ] + timezone_transition(minus_365, plus_365, "uk_clock_change", "Europe/London") + timezone_transition( minus_365, plus_365, "us_clock_change", "America/New_York" ) ) if gwr_advance_tickets: events.append(Event(name="gwr_advance_tickets", date=gwr_advance_tickets)) us_hols = holidays.us_holidays(last_year, next_year) events += holidays.get_nyse_holidays(last_year, next_year, us_hols) accommodation_events = accommodation.get_events( os.path.join(my_data, "accommodation.yaml") ) holiday_list = holidays.get_all(last_year, next_year, data_dir) events += holidays.combine_holidays(holiday_list) events += birthday.get_birthdays(last_year, os.path.join(my_data, "entities.yaml")) events += accommodation_events events += travel.all_events(my_data) events += conference.get_list(os.path.join(my_data, "conferences.yaml")) events += results["backwell_bins"] + results["bristol_bins"] events += read_events_yaml(my_data, last_year, next_year) events += subscription.get_events(os.path.join(my_data, "subscriptions.yaml")) events += economist.publication_dates(last_week, next_year) events += meetup.get_events(my_data) events += hn.whoishiring(last_year, next_year) events += domains.renewal_dates(my_data) # hide markets that happen while away optional = [ e for e in events if e.name == "market" or (e.title and "LHG Run Club" in e.title) ] going = [e for e in events if e.going] overlapping_markets = find_events_during_stay( accommodation_events + going, optional ) for market in overlapping_markets: events.remove(market) for launch in results["rockets"]: dt = None if launch["net_precision"] == "Day": dt = datetime.strptime(launch["net"], "%Y-%m-%dT%H:%M:%SZ").date() elif launch["t0_time"]: dt = pytz.utc.localize( datetime.strptime(launch["net"], "%Y-%m-%dT%H:%M:%SZ") ) if not dt: continue rocket_name = f'{launch["rocket"]}: {launch["mission_name"] or "[no mission]"}' e = Event(name="rocket", date=dt, title=rocket_name) events.append(e) events += [Event(name="today", date=today)] busy_events = [ e for e in sorted(events, key=lambda e: e.as_date) if e.as_date > today and e.as_date < next_year and busy_event(e) ] gaps = find_gaps(busy_events) events += [ Event(name="gap", date=gap["start"], end_date=gap["end"]) for gap in gaps ] # Sort events by their datetime; the "today" event is prioritised # at the top of the list for today. This is achieved by sorting first by # the datetime attribute, and then ensuring that events with the name # "today" are ordered before others on the same date. events.sort(key=lambda e: (e.as_datetime, e.name != "today")) reply["gaps"] = gaps observer = sun.bristol() reply["sunrise"] = sun.sunrise(observer) reply["sunset"] = sun.sunset(observer) reply["events"] = events reply["last_week"] = last_week reply["two_weeks_ago"] = two_weeks_ago reply["fullcalendar_events"] = calendar.build_events(events) return reply