diff --git a/.eslintrc.js b/.eslintrc.js deleted file mode 100644 index 92b3fea..0000000 --- a/.eslintrc.js +++ /dev/null @@ -1,17 +0,0 @@ -module.exports = { - "env": { - "browser": true, - "es6": true - }, - "extends": "eslint:recommended", - "globals": { - "Atomics": "readonly", - "SharedArrayBuffer": "readonly" - }, - "parserOptions": { - "ecmaVersion": 14, - "sourceType": "module" - }, - "rules": { - } -}; diff --git a/.gitignore b/.gitignore index 87339fa..cf41dea 100644 --- a/.gitignore +++ b/.gitignore @@ -3,8 +3,4 @@ __pycache__/ __pycache__ .mypy_cache config -.hypothesis -personal-data -static/bootstrap5 -static/leaflet* -static/es-module-shims +.hypothesis/ diff --git a/AGENTS.md b/AGENTS.md deleted file mode 100644 index f74c53c..0000000 --- a/AGENTS.md +++ /dev/null @@ -1,32 +0,0 @@ -# Development Guidelines - -## Project Overview -This is a personal agenda web application built with Flask that tracks various events and important dates: -- Events: birthdays, holidays, travel itineraries, conferences, waste collection schedules -- Space launches, meteor showers, astronomical events -- Financial information (FX rates, stock market) -- UK-specific features (holidays, waste collection, railway schedules) -- Authentication via UniAuth -- Frontend uses Bootstrap 5, Leaflet for maps, FullCalendar for calendar views - -## Python Environment -- Always use `python3` directly, never `python` -- All Python code should include type annotations -- Use `typing.Any` instead of `Any` in type hints (import from typing module) -- Run `mypy --strict` (fix any type errors in the file) and `black` on modified code after creating or modifying Python files -- Avoid running `black .` -- Main entry point: `python3 web_view.py` (Flask app on port 5000) -- Tests: Use `pytest` (tests in `/tests/` directory) - -## Project Structure -- `agenda/` - Main Python package with modules for different event types -- `web_view.py` - Flask web application entry point -- `templates/` - Jinja2 HTML templates -- `static/` - CSS, JS, and frontend assets -- `config/` - Configuration files -- `personal-data/` - User's personal data (not in git) - -## Git Workflow -- Avoid committing unrelated untracked files (e.g., `node_modules/`, build artifacts) -- Only commit relevant project files -- Personal data directory (`personal-data/`) is excluded from git diff --git a/agenda/__init__.py b/agenda/__init__.py index 0428f0c..baf8813 100644 --- a/agenda/__init__.py +++ b/agenda/__init__.py @@ -2,7 +2,6 @@ from datetime import date, datetime, time -import pycountry import pytz uk_tz = pytz.timezone("Europe/London") @@ -11,34 +10,3 @@ uk_tz = pytz.timezone("Europe/London") def uk_time(d: date, t: time) -> datetime: """Combine time and date for UK timezone.""" return uk_tz.localize(datetime.combine(d, t)) - - -def format_list_with_ampersand(items: list[str]) -> str: - """Join a list of strings with commas and an ampersand.""" - if len(items) > 1: - return ", ".join(items[:-1]) + " & " + items[-1] - elif items: - return items[0] - return "" - - -def get_country(alpha_2: str | None) -> pycountry.db.Country | None: - """Lookup country by alpha-2 country code.""" - if not alpha_2: - return None - if alpha_2.count(",") > 3: # ESA - return pycountry.db.Country(flag="πͺπΊ", name="ESA") - if not alpha_2: - return None - if alpha_2 == "xk": - return pycountry.db.Country( - flag="\U0001f1fd\U0001f1f0", name="Kosovo", alpha_2="xk" - ) - - country: pycountry.db.Country | None = None - if len(alpha_2) == 2: - country = pycountry.countries.get(alpha_2=alpha_2.upper()) - elif len(alpha_2) == 3: - country = pycountry.countries.get(alpha_3=alpha_2.upper()) - - return country diff --git a/agenda/accommodation.py b/agenda/accommodation.py index 59e9d64..3ce4b9e 100644 --- a/agenda/accommodation.py +++ b/agenda/accommodation.py @@ -1,19 +1,20 @@ -"""Accommodation.""" +"""Accomodation""" import yaml -from .event import Event +from .types import Event def get_events(filepath: str) -> list[Event]: - """Get accommodation from YAML.""" + """Get accomodation from YAML.""" with open(filepath) as f: return [ Event( date=item["from"], end_date=item["to"], name="accommodation", - title=( + title="π§³" + + ( f'{item["location"]} Airbnb' if item.get("operator") == "airbnb" else item["name"] diff --git a/agenda/airbnb.py b/agenda/airbnb.py deleted file mode 100644 index dfbf3c2..0000000 --- a/agenda/airbnb.py +++ /dev/null @@ -1,190 +0,0 @@ -"""Library for parsing Airbnb booking HTML files.""" - -import json -import re -import typing -from datetime import datetime -from typing import Any -from zoneinfo import ZoneInfo - -import lxml.html -import pycountry - -StrDict = dict[str, typing.Any] - - -def build_datetime(date_str: str, time_str: str, tz_name: str) -> datetime: - """ - Combine an ISO date string, HH:MM time string, and a timezone name - into a timezone-aware datetime in the specified timezone. - """ - dt_str = f"{date_str}T{time_str}" - naive_dt = datetime.fromisoformat(dt_str) - return naive_dt.replace(tzinfo=ZoneInfo(tz_name)) - - -def list_to_dict(items: list[typing.Any]) -> dict[str, typing.Any]: - """Convert a flat list to a dict, assuming alternating keys and values.""" - return {items[i]: items[i + 1] for i in range(0, len(items), 2)} - - -def extract_country_code(address: str) -> str | None: - """Return ISO 3166-1 alpha-2 country code from a free-text address.""" - address_lower = address.lower() - for country in pycountry.countries: - if country.name.lower() in address_lower: - return str(country.alpha_2.lower()) - if ( - hasattr(country, "official_name") - and country.official_name.lower() in address_lower - ): - return str(country.alpha_2.lower()) - return None - - -def get_json_blob(tree: Any) -> str: - data_id = "data-injector-instances" - js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0] - return str(js_string) - - -def get_ui_state(tree: Any) -> StrDict: - data_id = "data-injector-instances" - js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0] - big_blob = json.loads(str(js_string)) - ui_state = walk_tree(big_blob, "uiState") - return list_to_dict(ui_state[0]) - - -def get_reservation_data(ui_state: StrDict) -> StrDict: - return { - row["id"]: row for row in ui_state["reservation"]["scheduled_event"]["rows"] - } - - -def get_room_url(tree: Any) -> str | None: - for e in tree.xpath('//a[@data-testid="reservation-destination-link"]'): - href = e.get("href") - assert isinstance(href, str) - if not href.startswith("/room"): - continue - return "https://www.airbnb.co.uk" + href - return None - - -def get_price_from_reservation(reservation: StrDict) -> str: - price = reservation["payment_summary"]["subtitle"] - assert isinstance(price, str) - tc = "Total cost: " - if price.startswith(tc): - price = price[len(tc) :] - assert price[0] == "Β£" - return price[1:] - - -def extract_booking_from_html(html_file: str) -> StrDict: - """Extract booking information from Airbnb HTML file.""" - - with open(html_file, "r", encoding="utf-8") as f: - text_content = f.read() - - confirmation_match = re.search( - r"/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/([A-Z0-9]+)", - text_content, - ) - if confirmation_match is None: - raise ValueError("Could not find confirmation code in HTML") - confirmation_code = confirmation_match.group(1) - - tree = lxml.html.parse(html_file) - root = tree.getroot() - try: - ui_state = get_ui_state(tree) - except Exception: - print(html_file) - raise - - reservation = get_reservation_data(ui_state) - m_guests = re.match(r"^(\d+) guests?$", reservation["guests"]["subtitle"]) - if m_guests is None: - raise ValueError("Could not parse number of guests") - number_of_adults = int(m_guests.group(1)) - - price = get_price_from_reservation(reservation) - metadata = ui_state["reservation"]["metadata"] - country_code = metadata["country"].lower() - - title = reservation["dynamic_marquee_title_image_v3"]["title"] - location = title.rpartition(" in ")[2] - - checkin_checkout = reservation["checkin_checkout_arrival_guide"] - check_in_time = checkin_checkout["leading_subtitle"] - check_out_time = checkin_checkout["trailing_subtitle"] - - check_in = build_datetime( - metadata["check_in_date"], check_in_time, metadata["timezone"] - ) - - check_out = build_datetime( - metadata["check_out_date"], check_out_time, metadata["timezone"] - ) - - address = reservation["map"]["address"] if "map" in reservation else None - - if "header_action.pdp" in reservation: - name = reservation["header_action.pdp"]["subtitle"] - else: - name = root.findtext(".//h1") - - booking = { - "type": "apartment", - "operator": "airbnb", - "name": name, - "location": location, - "booking_reference": confirmation_code, - "booking_url": f"https://www.airbnb.co.uk/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/{confirmation_code}", - "country": country_code, - "latitude": metadata["lat"], - "longitude": metadata["lng"], - "timezone": metadata["timezone"], - "from": check_in, - "to": check_out, - "price": price, - "currency": "GBP", - "number_of_adults": number_of_adults, - } - if address: - booking["address"] = address - - room_url = get_room_url(tree) - if room_url is not None: - booking["url"] = room_url - - return booking - - -def walk_tree(data: Any, want_key: str) -> Any: - """Recursively search for a dict containing 'reservation' and return its value.""" - if isinstance(data, dict): - if want_key in data: - return data[want_key] - for key, value in data.items(): - result = walk_tree(value, want_key) - if result is not None: - return result - elif isinstance(data, list): - for item in data: - result = walk_tree(item, want_key) - if result is not None: - return result - return None - - -def parse_multiple_files(filenames: list[str]) -> list[StrDict]: - """Parse multiple Airbnb HTML files and return a list of booking dictionaries.""" - bookings = [] - for html_file in sorted(filenames): - booking = extract_booking_from_html(html_file) - assert booking - bookings.append(booking) - return bookings diff --git a/agenda/birthday.py b/agenda/birthday.py index 6fdfd21..fdebd5d 100644 --- a/agenda/birthday.py +++ b/agenda/birthday.py @@ -4,7 +4,7 @@ from datetime import date import yaml -from .event import Event +from .types import Event YEAR_NOT_KNOWN = 1900 @@ -42,7 +42,7 @@ def get_birthdays(from_date: date, filepath: str) -> list[Event]: Event( date=bday.replace(year=bday.year + offset), name="birthday", - title=f'{entity["label"]} ({display_age})', + title=f'π {entity["label"]} ({display_age})', ) ) diff --git a/agenda/bristol_waste.py b/agenda/bristol_waste.py deleted file mode 100644 index db8f80f..0000000 --- a/agenda/bristol_waste.py +++ /dev/null @@ -1,131 +0,0 @@ -"""Waste collection schedules.""" - -import json -import os -import typing -from collections import defaultdict -from datetime import date, datetime, timedelta - -import httpx - -from .event import Event -from .utils import make_waste_dir - -ttl_hours = 12 - - -BristolSchedule = list[dict[str, typing.Any]] - - -async def get(start_date: date, data_dir: str, uprn: str, cache: str) -> list[Event]: - """Get waste collection schedule from Bristol City Council.""" - by_date: defaultdict[date, list[str]] = defaultdict(list) - for item in await get_data(data_dir, uprn, cache): - service = get_service(item) - for d in collections(item): - if d < start_date and service not in by_date[d]: - by_date[d].append(service) - - return [ - Event(name="waste_schedule", date=d, title="Bristol: " + ", ".join(services)) - for d, services in by_date.items() - ] - - -async def get_data(data_dir: str, uprn: str, cache: str) -> BristolSchedule: - """Get Bristol Waste schedule, with cache.""" - now = datetime.now() - waste_dir = os.path.join(data_dir, "waste") - - make_waste_dir(data_dir) - - existing_data = os.listdir(waste_dir) - existing = [f for f in existing_data if f.endswith(f"_{uprn}.json")] - if existing: - recent_filename = max(existing) - recent = datetime.strptime(recent_filename, f"%Y-%m-%d_%H:%M_{uprn}.json") - delta = now - recent - - def get_from_recent() -> BristolSchedule: - json_data = json.load(open(os.path.join(waste_dir, recent_filename))) - return typing.cast(BristolSchedule, json_data["data"]) - - if ( - cache != "refresh" - and existing - and (cache == "force" or delta < timedelta(hours=ttl_hours)) - ): - return get_from_recent() - - try: - r = await get_web_data(uprn) - except httpx.ReadTimeout: - return get_from_recent() - - with open(f'{waste_dir}/{now.strftime("%Y-%m-%d_%H:%M")}_{uprn}.json', "wb") as out: - out.write(r.content) - - return typing.cast(BristolSchedule, r.json()["data"]) - - -async def get_web_data(uprn: str) -> httpx.Response: - """Get JSON from Bristol City Council.""" - UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" - HEADERS = { - "Accept": "*/*", - "Accept-Language": "en-GB,en;q=0.9", - "Connection": "keep-alive", - "Ocp-Apim-Subscription-Key": "47ffd667d69c4a858f92fc38dc24b150", - "Ocp-Apim-Trace": "true", - "Origin": "https://bristolcouncil.powerappsportals.com", - "Referer": "https://bristolcouncil.powerappsportals.com/", - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "cross-site", - "Sec-GPC": "1", - "User-Agent": UA, - } - - _uprn = str(uprn).zfill(12) - - async with httpx.AsyncClient(timeout=20) as client: - # Initialise form - payload = {"servicetypeid": "7dce896c-b3ba-ea11-a812-000d3a7f1cdc"} - response = await client.get( - "https://bristolcouncil.powerappsportals.com/completedynamicformunauth/", - headers=HEADERS, - params=payload, - ) - - host = "bcprdapidyna002.azure-api.net" - - # Set the search criteria - payload = {"Uprn": "UPRN" + _uprn} - response = await client.post( - f"https://{host}/bcprdfundyna001-llpg/DetailedLLPG", - headers=HEADERS, - json=payload, - ) - - # Retrieve the schedule - payload = {"uprn": _uprn} - response = await client.post( - f"https://{host}/bcprdfundyna001-alloy/NextCollectionDates", - headers=HEADERS, - json=payload, - ) - - return response - - -def get_service(item: dict[str, typing.Any]) -> str: - """Bristol waste service name.""" - service: str = item["containerName"] - return "Recycling" if "Recycling" in service else service.partition(" ")[2] - - -def collections(item: dict[str, typing.Any]) -> typing.Iterable[date]: - """Bristol dates from collections.""" - for collection in item["collection"]: - for collection_date_key in ["nextCollectionDate", "lastCollectionDate"]: - yield date.fromisoformat(collection[collection_date_key][:10]) diff --git a/agenda/busy.py b/agenda/busy.py deleted file mode 100644 index 8452903..0000000 --- a/agenda/busy.py +++ /dev/null @@ -1,655 +0,0 @@ -"""Identify busy events and gaps when nothing is scheduled.""" - -import itertools -import typing -from datetime import date, datetime, timedelta - -import flask -import pycountry - -from . import events_yaml, get_country, travel -from .event import Event -from .types import StrDict, Trip - - -def busy_event(e: Event) -> bool: - """Busy.""" - if e.name not in { - "event", - "accommodation", - "conference", - "transport", - "meetup", - "party", - "trip", - "hackathon", - }: - return False - - if e.title in ("IA UK board meeting", "Mill Road Winter Fair"): - return False - - if e.name == "conference" and not e.going: - return False - if not e.title: - return True - if e.title == "LHG Run Club" or "Third Thursday Social" in e.title: - return False - - lc_title = e.title.lower() - return ( - "rebels" not in lc_title - and "south west data social" not in lc_title - and "dorkbot" not in lc_title - ) - - -def get_busy_events( - start: date, config: flask.config.Config, trips: list[Trip] -) -> list[Event]: - """Find busy events from a year ago to two years in the future.""" - last_year = start - timedelta(days=365) - next_year = start + timedelta(days=2 * 365) - - my_data = config["PERSONAL_DATA"] - events = events_yaml.read(my_data, last_year, next_year, skip_trips=True) - - for trip in trips: - event_type = "trip" - if trip.events and not trip.conferences: - event_type = trip.events[0]["name"] - elif len(trip.conferences) == 1 and trip.conferences[0].get("hackathon"): - event_type = "hackathon" - events.append( - Event( - name=event_type, - title=trip.title + " " + trip.country_flags, - date=trip.start, - end_date=trip.end, - url=flask.url_for("trip_page", start=trip.start.isoformat()), - ) - ) - - busy_events = [ - e - for e in sorted(events, key=lambda e: e.as_date) - if (e.as_date >= start or (e.end_date and e.end_as_date >= start)) - and e.as_date < next_year - and busy_event(e) - ] - - return busy_events - - -def _parse_datetime_field(datetime_obj: datetime | date) -> tuple[datetime, date]: - """Parse a datetime field that could be datetime object or string.""" - if hasattr(datetime_obj, "date"): - return datetime_obj, datetime_obj.date() - elif isinstance(datetime_obj, str): - dt = datetime.fromisoformat(datetime_obj.replace("Z", "+00:00")) - return dt, dt.date() - else: - raise ValueError(f"Invalid datetime format: {datetime_obj}") - - -def _get_accommodation_location( - acc: StrDict, on_trip: bool = False -) -> tuple[str | None, pycountry.db.Country]: - """Get location from accommodation data.""" - c = get_country(acc["country"]) - assert c - assert isinstance(acc["location"], str) - return (acc["location"] if on_trip else None, c) - - -def _find_most_recent_travel_within_trip( - trip: Trip, - target_date: date, -) -> tuple[str | None, pycountry.db.Country] | None: - """Find the most recent travel location within a trip.""" - uk_airports = {"LHR", "LGW", "STN", "LTN", "BRS", "BHX", "MAN", "EDI", "GLA"} - - trip_most_recent_date = None - trip_most_recent_location = None - trip_most_recent_datetime = None - - # Check flights within trip period - for travel_item in trip.travel: - if travel_item["type"] == "flight" and "arrive" in travel_item: - arrive_datetime, arrive_date = _parse_datetime_field(travel_item["arrive"]) - - # Only consider flights within this trip and before target date - if not (trip.start <= arrive_date <= target_date): - continue - # Compare both date and time to handle same-day flights correctly - if ( - trip_most_recent_date is None - or arrive_date > trip_most_recent_date - or ( - arrive_date == trip_most_recent_date - and ( - trip_most_recent_datetime is None - or arrive_datetime > trip_most_recent_datetime - ) - ) - ): - trip_most_recent_date = arrive_date - trip_most_recent_datetime = arrive_datetime - destination_airport = travel_item["to"] - assert "to_airport" in travel_item - airport_info = travel_item["to_airport"] - airport_country = airport_info["country"] - if airport_country == "gb": - if destination_airport in uk_airports: - # UK airport while on trip - show actual location - location_name = airport_info.get( - "city", airport_info.get("name", "London") - ) - trip_most_recent_location = ( - location_name, - get_country("gb"), - ) - else: - trip_most_recent_location = (None, get_country("gb")) - else: - location_name = airport_info.get( - "city", airport_info.get("name", destination_airport) - ) - trip_most_recent_location = ( - location_name, - get_country(airport_country), - ) - - # Check accommodations within trip period - for acc in trip.accommodation: - if "from" in acc: - try: - _, acc_date = _parse_datetime_field(acc["from"]) - except ValueError: - continue - - # Only consider accommodations within this trip and before/on target date - if trip.start <= acc_date <= target_date: - # Accommodation takes precedence over flights on the same date - # or if it's genuinely more recent - if ( - trip_most_recent_date is None - or acc_date > trip_most_recent_date - or acc_date == trip_most_recent_date - ): - trip_most_recent_date = acc_date - trip_most_recent_location = _get_accommodation_location( - acc, on_trip=True - ) - - # Check trains within trip period - for travel_item in trip.travel: - if travel_item["type"] == "train": - for leg in travel_item.get("legs", []): - if "arrive" in leg: - try: - arrive_datetime, arrive_date = _parse_datetime_field( - leg["arrive"] - ) - except ValueError: - continue - - # Only consider trains within this trip and before target date - if trip.start <= arrive_date <= target_date: - # Compare both date and time to handle same-day arrivals correctly - if ( - trip_most_recent_date is None - or arrive_date > trip_most_recent_date - or ( - arrive_date == trip_most_recent_date - and ( - trip_most_recent_datetime is None - or arrive_datetime > trip_most_recent_datetime - ) - ) - ): - trip_most_recent_date = arrive_date - trip_most_recent_datetime = arrive_datetime - # For trains, we can get station info from to_station if available - destination = leg.get("to") - assert "to_station" in leg - station_info = leg["to_station"] - station_country = station_info["country"] - if station_country == "gb": - trip_most_recent_location = ( - destination, - get_country("gb"), - ) - else: - trip_most_recent_location = ( - destination, - get_country(station_country), - ) - - # Check ferries within trip period - for travel_item in trip.travel: - if travel_item["type"] == "ferry" and "arrive" in travel_item: - try: - arrive_datetime, arrive_date = _parse_datetime_field( - travel_item["arrive"] - ) - except ValueError: - continue - - # Only consider ferries within this trip and before target date - if trip.start <= arrive_date <= target_date: - # Compare both date and time to handle same-day arrivals correctly - if ( - trip_most_recent_date is None - or arrive_date > trip_most_recent_date - or ( - arrive_date == trip_most_recent_date - and ( - trip_most_recent_datetime is None - or arrive_datetime > trip_most_recent_datetime - ) - ) - ): - trip_most_recent_date = arrive_date - trip_most_recent_datetime = arrive_datetime - # For ferries, we can get terminal info from to_terminal if available - destination = travel_item.get("to") - assert "to_terminal" in travel_item - terminal_info = travel_item["to_terminal"] - terminal_country = terminal_info.get("country", "gb") - terminal_city = terminal_info.get("city", destination) - if terminal_country == "gb": - trip_most_recent_location = ( - terminal_city, - get_country("gb"), - ) - else: - trip_most_recent_location = ( - terminal_city, - get_country(terminal_country), - ) - - return trip_most_recent_location - - -def _get_trip_location_by_progression( - trip: Trip, target_date: date -) -> tuple[str | None, pycountry.db.Country | None] | None: - """Determine location based on trip progression and date.""" - locations = trip.locations() - if not locations: - return None - - # If only one location, use it (when on a trip, always show the location) - if len(locations) == 1: - city, country = locations[0] - return (city, country) - - # Multiple locations: use progression through the trip - trip_duration = (trip.end - trip.start).days + 1 - days_into_trip = (target_date - trip.start).days - - # Simple progression: first half at first location, second half at last location - if days_into_trip <= trip_duration // 2: - city, country = locations[0] - else: - city, country = locations[-1] - - return (city, country) - - -def _find_most_recent_travel_before_date( - target_date: date, - trips: list[Trip], -) -> tuple[str | None, pycountry.db.Country | None] | None: - """Find the most recent travel location before a given date.""" - uk_airports = {"LHR", "LGW", "STN", "LTN", "BRS", "BHX", "MAN", "EDI", "GLA"} - - most_recent_location = None - most_recent_date = None - most_recent_datetime = None - - # Check all travel across all trips - for trip in trips: - # Check flights - for travel_item in trip.travel: - if travel_item["type"] == "flight" and "arrive" in travel_item: - try: - arrive_datetime, arrive_date = _parse_datetime_field( - travel_item["arrive"] - ) - except ValueError: - continue - - if arrive_date <= target_date: - # Compare both date and time to handle same-day flights correctly - if ( - most_recent_date is None - or arrive_date > most_recent_date - or ( - arrive_date == most_recent_date - and ( - most_recent_datetime is None - or arrive_datetime > most_recent_datetime - ) - ) - ): - most_recent_date = arrive_date - most_recent_datetime = arrive_datetime - destination_airport = travel_item["to"] - # For flights, determine if we're "on trip" based on whether this is within any trip period - on_trip = any( - t.start <= arrive_date <= (t.end or t.start) for t in trips - ) - - if "to_airport" in travel_item: - airport_info = travel_item["to_airport"] - airport_country = airport_info.get("country", "gb") - if airport_country == "gb": - if not on_trip: - # When not on a trip, UK airports mean home - most_recent_location = (None, get_country("gb")) - else: - # When on a trip, show the actual location even for UK airports - location_name = airport_info.get( - "city", airport_info.get("name", "London") - ) - most_recent_location = ( - location_name, - get_country("gb"), - ) - else: - location_name = airport_info.get( - "city", - airport_info.get("name", destination_airport), - ) - most_recent_location = ( - location_name, - get_country(airport_country), - ) - else: - most_recent_location = ( - destination_airport, - get_country("gb"), - ) - - # Check trains - elif travel_item["type"] == "train": - for leg in travel_item.get("legs", []): - if "arrive" in leg: - try: - arrive_datetime, arrive_date = _parse_datetime_field( - leg["arrive"] - ) - except ValueError: - continue - - if arrive_date <= target_date: - # Compare both date and time to handle same-day arrivals correctly - if ( - most_recent_date is None - or arrive_date > most_recent_date - or ( - arrive_date == most_recent_date - and ( - most_recent_datetime is None - or arrive_datetime > most_recent_datetime - ) - ) - ): - most_recent_date = arrive_date - most_recent_datetime = arrive_datetime - destination = leg.get("to") - on_trip = any( - t.start <= arrive_date <= (t.end or t.start) - for t in trips - ) - - if "to_station" in leg: - station_info = leg["to_station"] - station_country = station_info.get("country", "gb") - if station_country == "gb": - if not on_trip: - most_recent_location = ( - None, - get_country("gb"), - ) - else: - most_recent_location = ( - destination, - get_country("gb"), - ) - else: - most_recent_location = ( - destination, - get_country(station_country), - ) - else: - most_recent_location = ( - destination, - get_country("gb"), - ) - - # Check ferries - elif travel_item["type"] == "ferry" and "arrive" in travel_item: - try: - arrive_datetime, arrive_date = _parse_datetime_field( - travel_item["arrive"] - ) - except ValueError: - continue - - if arrive_date <= target_date: - # Compare both date and time to handle same-day arrivals correctly - if ( - most_recent_date is None - or arrive_date > most_recent_date - or ( - arrive_date == most_recent_date - and ( - most_recent_datetime is None - or arrive_datetime > most_recent_datetime - ) - ) - ): - most_recent_date = arrive_date - most_recent_datetime = arrive_datetime - destination = travel_item.get("to") - on_trip = any( - t.start <= arrive_date <= (t.end or t.start) for t in trips - ) - - if "to_terminal" in travel_item: - terminal_info = travel_item["to_terminal"] - terminal_country = terminal_info.get("country", "gb") - terminal_city = terminal_info.get("city", destination) - if terminal_country == "gb": - if not on_trip: - most_recent_location = (None, get_country("gb")) - else: - most_recent_location = ( - terminal_city, - get_country("gb"), - ) - else: - most_recent_location = ( - terminal_city, - get_country(terminal_country), - ) - else: - most_recent_location = (destination, get_country("gb")) - - # Check accommodation - only override if accommodation is more recent - for acc in trip.accommodation: - if "from" in acc: - try: - _, acc_date = _parse_datetime_field(acc["from"]) - except ValueError: - continue - - if acc_date <= target_date: - # Only update if this accommodation is more recent than existing result - if most_recent_date is None or acc_date > most_recent_date: - most_recent_date = acc_date - on_trip = any( - t.start <= acc_date <= (t.end or t.start) for t in trips - ) - most_recent_location = _get_accommodation_location( - acc, on_trip=on_trip - ) - - return most_recent_location - - -def _check_return_home_heuristic( - target_date: date, trips: list[Trip] -) -> tuple[str | None, pycountry.db.Country | None] | None: - """Check if should return home based on recent trips that have ended.""" - for trip in trips: - if trip.end and trip.end < target_date: - locations = trip.locations() - if locations: - final_city, final_country = locations[-1] - final_alpha_2 = final_country.alpha_2 - days_since_trip = (target_date - trip.end).days - - # If trip ended in UK, you should be home now - if hasattr(final_country, "alpha_2") and final_country.alpha_2 == "GB": - return (None, get_country("gb")) - - return None - - -def get_location_for_date( - target_date: date, - trips: list[Trip], -) -> tuple[str | None, pycountry.db.Country | None]: - """Get location (city, country) for a specific date using travel history.""" - # First check if currently on a trip - for trip in trips: - if not (trip.start <= target_date <= (trip.end or trip.start)): - continue - # For trips, find the most recent travel within the trip period - trip_location = _find_most_recent_travel_within_trip( - trip, - target_date, - ) - if trip_location: - return trip_location - - # Fallback: determine location based on trip progression and date - progression_location = _get_trip_location_by_progression(trip, target_date) - if progression_location: - return progression_location - - # Find most recent travel before this date - recent_travel = _find_most_recent_travel_before_date(target_date, trips) - - # Check for recent trips that have ended - prioritize this over individual travel data - # This handles cases where you're traveling home after a trip (e.g. stopovers, connections) - return_home = _check_return_home_heuristic(target_date, trips) - if return_home: - return return_home - - # Return most recent location or default to home - if recent_travel: - return recent_travel - - return (None, get_country("gb")) - - -def weekends( - start: date, busy_events: list[Event], trips: list[Trip], data_dir: str -) -> typing.Sequence[StrDict]: - """Next ten weekends.""" - weekday = start.weekday() - - # Calculate the difference to the next or previous Saturday - if weekday == 6: # Sunday - start_date = start - timedelta(days=1) - else: - start_date = start + timedelta(days=(5 - weekday)) - - weekends_info = [] - for i in range(52): - saturday = start_date + timedelta(weeks=i) - sunday = saturday + timedelta(days=1) - - saturday_events = [ - event - for event in busy_events - if event.end_date and event.as_date <= saturday <= event.end_as_date - ] - sunday_events = [ - event - for event in busy_events - if event.end_date and event.as_date <= sunday <= event.end_as_date - ] - - saturday_location = get_location_for_date( - saturday, - trips, - ) - sunday_location = get_location_for_date( - sunday, - trips, - ) - - weekends_info.append( - { - "date": saturday, - "saturday": saturday_events, - "sunday": sunday_events, - "saturday_location": saturday_location, - "sunday_location": sunday_location, - } - ) - - return weekends_info - - -def find_gaps(events: list[Event], min_gap_days: int = 3) -> list[StrDict]: - """Gaps of at least `min_gap_days` between events in a list of events.""" - # Sort events by start date - - gaps: list[tuple[date, date]] = [] - previous_event_end = None - - by_start_date = { - d: list(on_day) - for d, on_day in itertools.groupby(events, key=lambda e: e.as_date) - } - - by_end_date = { - d: list(on_day) - for d, on_day in itertools.groupby(events, key=lambda e: e.end_as_date) - } - - for event in events: - # Use start date for current event - start_date = event.as_date - - # If previous event exists, calculate the gap - if previous_event_end: - gap_days = (start_date - previous_event_end).days - if gap_days >= (min_gap_days + 2): - start_end = ( - previous_event_end + timedelta(days=1), - start_date - timedelta(days=1), - ) - gaps.append(start_end) - - # Update previous event end date - end = event.end_as_date - if not previous_event_end or end > previous_event_end: - previous_event_end = end - - return [ - { - "start": gap_start, - "end": gap_end, - "after": by_start_date[gap_end + timedelta(days=1)], - "before": by_end_date[gap_start - timedelta(days=1)], - } - for gap_start, gap_end in gaps - ] diff --git a/agenda/calendar.py b/agenda/calendar.py index f037a3c..4d2a8d8 100644 --- a/agenda/calendar.py +++ b/agenda/calendar.py @@ -1,115 +1,79 @@ """Calendar.""" import typing -import uuid from datetime import timedelta -from .event import Event +from .types import Event -# A map to associate event types with a specific calendar ID -event_type_calendar_map = { - "bank_holiday": "uk_holidays", - "conference": "conferences", - "us_holiday": "us_holidays", - "birthday": "birthdays", - "waste_schedule": "home", - "accommodation": "travel", - "market": "markets", +event_type_color_map = { + "bank_holiday": "success-subtle", + "conference": "primary-subtle", + "us_holiday": "secondary-subtle", + "birthday": "info-subtle", + "waste_schedule": "danger-subtle", } -# Define the calendars (categories) for TOAST UI -# These will be passed to the frontend to configure colors and names -toastui_calendars = [ - {"id": "default", "name": "General", "backgroundColor": "#00a9ff"}, - {"id": "uk_holidays", "name": "UK Bank Holiday", "backgroundColor": "#28a745"}, - {"id": "us_holidays", "name": "US Holiday", "backgroundColor": "#6c757d"}, - {"id": "conferences", "name": "Conference", "backgroundColor": "#007bff"}, - {"id": "birthdays", "name": "Birthday", "backgroundColor": "#17a2b8"}, - {"id": "home", "name": "Home", "backgroundColor": "#dc3545"}, - {"id": "travel", "name": "Travel", "backgroundColor": "#ffc107", "color": "#000"}, - {"id": "markets", "name": "Markets", "backgroundColor": "#e2e3e5", "color": "#000"}, -] +colors = { + "primary-subtle": "#cfe2ff", + "secondary-subtle": "#e2e3e5", + "success-subtle": "#d1e7dd", + "info-subtle": "#cff4fc", + "warning-subtle": "#fff3cd", + "danger-subtle": "#f8d7da", +} -def build_toastui_events(events: list[Event]) -> list[dict[str, typing.Any]]: - """Build a list of event objects for TOAST UI Calendar.""" +def build_events(events: list[Event]) -> list[dict[str, typing.Any]]: + """Build list of events for FullCalendar.""" items: list[dict[str, typing.Any]] = [] + one_day = timedelta(days=1) for e in events: if e.name == "today": continue - - # Determine the calendar ID for the event, defaulting if not mapped - calendar_id = event_type_calendar_map.get(e.name, "default") - - # Handle special case for 'accommodation' if e.name == "accommodation": assert e.title and e.end_date - # All-day event for the duration of the stay - items.append( - { - "id": str(uuid.uuid4()), - "calendarId": calendar_id, - "title": e.title_with_emoji, - "start": e.as_date.isoformat(), - "end": (e.end_as_date + one_day).isoformat(), - "isAllday": True, - "raw": {"url": e.url}, - } - ) - # Timed event for check-in - items.append( - { - "id": str(uuid.uuid4()), - "calendarId": calendar_id, - "title": f"Check-in: {e.title}", - "start": e.date.isoformat(), - "end": (e.date + timedelta(hours=1)).isoformat(), - "isAllday": False, - "raw": {"url": e.url}, - } - ) - # Timed event for check-out - items.append( - { - "id": str(uuid.uuid4()), - "calendarId": calendar_id, - "title": f"Checkout: {e.title}", - "start": e.end_date.isoformat(), - "end": (e.end_date + timedelta(hours=1)).isoformat(), - "isAllday": False, - "raw": {"url": e.url}, - } - ) + item = { + "allDay": True, + "title": e.display_title, + "start": e.as_date.isoformat(), + "end": (e.end_as_date + one_day).isoformat(), + "url": e.url, + } + items.append(item) + + item = { + "allDay": False, + "title": "checkin: " + e.title, + "start": e.date.isoformat(), + "url": e.url, + } + items.append(item) + item = { + "allDay": False, + "title": "checkout: " + e.title, + "start": e.end_date.isoformat(), + "url": e.url, + } + items.append(item) + continue - # Handle all other events - start_iso = e.date.isoformat() if e.has_time: - end_iso = (e.end_date or e.date + timedelta(minutes=30)).isoformat() + end = e.end_date or e.date + timedelta(hours=1) else: - # For all-day events, the end date is exclusive. - # For a single-day event, the end date should be the same as the start date. - # For a multi-day event, a day is added to make the range inclusive. - if e.end_date: - # This is a multi-day event, so add one day to the end. - end_date = e.end_as_date + one_day - else: - # This is a single-day event. The end date is the same as the start date. - end_date = e.as_date - end_iso = end_date.isoformat() - + end = (e.end_as_date if e.end_date else e.as_date) + one_day item = { - "id": str(uuid.uuid4()), - "calendarId": calendar_id, - "title": e.title_with_emoji, - "start": start_iso, - "end": end_iso, - "isAllday": not e.has_time, + "allDay": not e.has_time, + "title": e.display_title, + "start": e.date.isoformat(), + "end": end.isoformat(), } + if e.name in event_type_color_map: + item["color"] = colors[event_type_color_map[e.name]] + item["textColor"] = "black" if e.url: - item["raw"] = {"url": e.url} + item["url"] = e.url items.append(item) - return items diff --git a/agenda/carnival.py b/agenda/carnival.py deleted file mode 100644 index 7f6f2f4..0000000 --- a/agenda/carnival.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Calculate the date for carnival.""" - -from datetime import date, timedelta - -from dateutil.easter import easter - -from .event import Event - - -def rio_carnival_events(start_date: date, end_date: date) -> list[Event]: - """List of events for Rio Carnival for each year between start_date and end_date.""" - events = [] - for year in range(start_date.year, end_date.year + 1): - easter_date = easter(year) - carnival_start = easter_date - timedelta(days=51) - carnival_end = easter_date - timedelta(days=46) - - # Only include the carnival if it falls within the specified date range - if ( - start_date <= carnival_start <= end_date - or start_date <= carnival_end <= end_date - ): - events.append( - Event( - name="carnival", - title="Rio Carnival", - date=carnival_start, - end_date=carnival_end, - url="https://en.wikipedia.org/wiki/Rio_Carnival", - ) - ) - - return events diff --git a/agenda/conference.py b/agenda/conference.py index 32c1256..da697fe 100644 --- a/agenda/conference.py +++ b/agenda/conference.py @@ -6,10 +6,7 @@ from datetime import date, datetime import yaml -from . import utils -from .event import Event - -MAX_CONF_DAYS = 20 +from .types import Event @dataclasses.dataclass @@ -21,8 +18,6 @@ class Conference: location: str start: date | datetime end: date | datetime - trip: date | None = None - country: str | None = None venue: str | None = None address: str | None = None url: str | None = None @@ -34,16 +29,6 @@ class Conference: online: bool = False price: decimal.Decimal | None = None currency: str | None = None - latitude: float | None = None - longitude: float | None = None - cfp_end: date | None = None - cfp_url: str | None = None - free: bool | None = None - hackathon: bool | None = None - ticket_type: str | None = None - attendees: int | None = None - hashtag: str | None = None - description: str | None = None @property def display_name(self) -> str: @@ -57,28 +42,17 @@ class Conference: def get_list(filepath: str) -> list[Event]: """Read conferences from a YAML file and return a list of Event objects.""" - events: list[Event] = [] - for conf in (Conference(**conf) for conf in yaml.safe_load(open(filepath, "r"))): - assert conf.start <= conf.end - duration = (utils.as_date(conf.end) - utils.as_date(conf.start)).days - assert duration < MAX_CONF_DAYS - event = Event( + return [ + Event( name="conference", date=conf.start, end_date=conf.end, - title=conf.display_name, + title=f"π€ {conf.display_name}", url=conf.url, going=conf.going, ) - events.append(event) - if not conf.cfp_end: - continue - cfp_end_event = Event( - name="cfp_end", - date=conf.cfp_end, - title="CFP end: " + conf.display_name, - url=conf.cfp_url or conf.url, + for conf in ( + Conference(**conf) + for conf in yaml.safe_load(open(filepath, "r"))["conferences"] ) - events.append(cfp_end_event) - - return events + ] diff --git a/agenda/data.py b/agenda/data.py index 7054a72..a508c39 100644 --- a/agenda/data.py +++ b/agenda/data.py @@ -1,50 +1,51 @@ """Agenda data.""" import asyncio +import collections +import itertools import os import typing from datetime import date, datetime, timedelta -from time import time import dateutil.rrule import dateutil.tz import flask +import holidays +import isodate # type: ignore import lxml import pytz +import yaml from . import ( accommodation, birthday, - bristol_waste, - busy, - carnival, + calendar, conference, domains, economist, - events_yaml, - gandi, + fx, gwr, hn, - holidays, meetup, - n_somerset_waste, stock_market, subscription, sun, thespacedevs, travel, uk_holiday, + uk_tz, + waste_schedule, ) -from .event import Event -from .types import StrDict -from .utils import time_function +from .types import Event, Holiday + +StrDict = dict[str, typing.Any] here = dateutil.tz.tzlocal() # deadline to file tax return # credit card expiry dates # morzine ski lifts -# chalet availability calendar +# chalet availablity calendar # starlink visible @@ -61,114 +62,298 @@ def timezone_transition( ] -async def n_somerset_waste_collection_events( - data_dir: str, postcode: str, uprn: str, force_cache: bool = False +def us_holidays(start_date: date, end_date: date) -> list[Holiday]: + """Get US holidays.""" + found: list[Holiday] = [] + for year in range(start_date.year, end_date.year + 1): + hols = holidays.country_holidays("US", years=year, language="en") + found += [ + Holiday(date=hol_date, name=title, country="us") + for hol_date, title in hols.items() + if start_date < hol_date < end_date + ] + + extra = [] + for h in found: + if h.name != "Thanksgiving": + continue + extra += [ + Holiday(date=h.date + timedelta(days=1), name="Black Friday", country="us"), + Holiday(date=h.date + timedelta(days=4), name="Cyber Monday", country="us"), + ] + + return found + extra + + +def get_nyse_holidays( + start_date: date, end_date: date, us_hols: list[Holiday] ) -> list[Event]: + """NYSE holidays.""" + known_us_hols = {(h.date, h.name) for h in us_hols} + found: list[Event] = [] + rename = {"Thanksgiving Day": "Thanksgiving"} + for year in range(start_date.year, end_date.year + 1): + hols = holidays.financial_holidays("NYSE", years=year) + found += [ + Event( + name="holiday", + date=hol_date, + title=rename.get(title, title), + ) + for hol_date, title in hols.items() + if start_date < hol_date < end_date + ] + found = [hol for hol in found if (hol.date, hol.title) not in known_us_hols] + for hol in found: + assert hol.title + hol.title += " (NYSE)" + return found + + +def get_holidays(country: str, start_date: date, end_date: date) -> list[Holiday]: + """Get holidays.""" + found: list[Holiday] = [] + for year in range(start_date.year, end_date.year + 1): + hols = holidays.country_holidays(country.upper(), years=year, language="en_US") + found += [ + Holiday( + date=hol_date, + name=title, + country=country.lower(), + ) + for hol_date, title in hols.items() + if start_date < hol_date < end_date + ] + + return found + + +def midnight(d: date) -> datetime: + """Convert from date to midnight on that day.""" + return datetime.combine(d, datetime.min.time()) + + +def dates_from_rrule( + rrule: str, start: date, end: date +) -> typing.Sequence[datetime | date]: + """Generate events from an RRULE between start_date and end_date.""" + all_day = not any(param in rrule for param in ["BYHOUR", "BYMINUTE", "BYSECOND"]) + + return [ + i.date() if all_day else uk_tz.localize(i) + for i in dateutil.rrule.rrulestr(rrule, dtstart=midnight(start)).between( + midnight(start), midnight(end) + ) + ] + + +async def waste_collection_events(data_dir: str) -> list[Event]: """Waste colllection events.""" - html = await n_somerset_waste.get_html(data_dir, postcode, uprn, force_cache) + postcode = "BS48 3HG" + uprn = "24071046" + + html = await waste_schedule.get_html(data_dir, postcode, uprn) root = lxml.html.fromstring(html) - events = n_somerset_waste.parse(root) + events = waste_schedule.parse(root) return events async def bristol_waste_collection_events( - data_dir: str, start_date: date, uprn: str, force_cache: bool = False + data_dir: str, start_date: date ) -> list[Event]: """Waste colllection events.""" - cache = "force" if force_cache else "recent" - return await bristol_waste.get(start_date, data_dir, uprn, cache) + uprn = "358335" + + return await waste_schedule.get_bristol_gov_uk(start_date, data_dir, uprn) -def find_events_during_stay( - accommodation_events: list[Event], markets: list[Event] -) -> list[Event]: - """Market events that happen during accommodation stays.""" - overlapping_markets = [] - for market in markets: - market_date = market.as_date - assert isinstance(market_date, date) - for e in accommodation_events: - start, end = e.as_date, e.end_as_date - assert start and end and all(isinstance(i, date) for i in (start, end)) - # Check if the market date is within the accommodation dates. - if start <= market_date <= end: - overlapping_markets.append(market) - break # Breaks the inner loop if overlap is found. - return overlapping_markets +def combine_holidays(holidays: list[Holiday]) -> list[Event]: + """Combine UK and US holidays with the same date and title.""" + all_countries = {h.country for h in holidays} -def hide_markets_while_away( - events: list[Event], accommodation_events: list[Event] -) -> None: - """Hide markets that happen while away.""" - optional = [ - e - for e in events - if e.name == "market" or (e.title and "LHG Run Club" in e.title) - ] - going = [e for e in events if e.going] + standard_name = { + (1, 1): "New Year's Day", + (1, 6): "Epiphany", + (5, 1): "Labour Day", + (8, 15): "Assumption Day", + (12, 8): "Immaculate conception", + (12, 25): "Christmas Day", + (12, 26): "Boxing Day", + } - overlapping_markets = find_events_during_stay( - accommodation_events + going, optional - ) - for market in overlapping_markets: - events.remove(market) + combined: collections.defaultdict[ + tuple[date, str], set[str] + ] = collections.defaultdict(set) + for h in holidays: + assert isinstance(h.name, str) and isinstance(h.date, date) -class AgendaData(typing.TypedDict, total=False): - """Agenda Data.""" + event_key = (h.date, standard_name.get((h.date.month, h.date.day), h.name)) + combined[event_key].add(h.country) - now: datetime - stock_markets: list[str] - rockets: list[thespacedevs.Summary] - gwr_advance_tickets: date | None - data_gather_seconds: float - stock_market_times_seconds: float - timings: list[tuple[str, float]] - events: list[Event] - accommodation_events: list[Event] - gaps: list[StrDict] - sunrise: datetime - sunset: datetime - last_week: date - two_weeks_ago: date - errors: list[tuple[str, Exception]] - - -def rocket_launch_events(rockets: list[thespacedevs.Summary]) -> list[Event]: - """Rocket launch events.""" events: list[Event] = [] - for launch in rockets: - dt = None - - net_precision = launch["net_precision"] - skip = {"Year", "Month", "Quarter", "Fiscal Year"} - if net_precision == "Day": - dt = datetime.strptime(launch["net"], "%Y-%m-%dT%H:%M:%SZ").date() - elif ( - net_precision - and net_precision not in skip - and "Year" not in net_precision - and launch["t0_time"] - ): - dt = pytz.utc.localize( - datetime.strptime(launch["net"], "%Y-%m-%dT%H:%M:%SZ") + for (d, name), countries in combined.items(): + if len(countries) == len(all_countries): + country_list = "" + elif len(countries) < len(all_countries) / 2: + country_list = ", ".join(sorted(country.upper() for country in countries)) + else: + country_list = "not " + ", ".join( + sorted(country.upper() for country in all_countries - set(countries)) ) - if not dt: - continue - - rocket_name = ( - f'{launch["rocket"]["full_name"]}: ' - + f'{launch["mission_name"] or "[no mission]"}' + e = Event( + name="holiday", + date=d, + title=f"{name} ({country_list})" if country_list else name, ) - e = Event(name="rocket", date=dt, title=rocket_name) events.append(e) return events -async def get_data(now: datetime, config: flask.config.Config) -> AgendaData: +def get_yaml_event_date_field(item: dict[str, str]) -> str: + """Event date field name.""" + return ( + "end_date" + if item["name"] == "travel_insurance" + else ("start_date" if "start_date" in item else "date") + ) + + +def get_yaml_event_end_date_field(item: dict[str, str]) -> str: + """Event date field name.""" + return ( + "end_date" + if item["name"] == "travel_insurance" + else ("start_date" if "start_date" in item else "date") + ) + + +def read_events_yaml(data_dir: str, start: date, end: date) -> list[Event]: + """Read eventes from YAML file.""" + events: list[Event] = [] + for item in yaml.safe_load(open(os.path.join(data_dir, "events.yaml"))): + duration = ( + isodate.parse_duration(item["duration"]) if "duration" in item else None + ) + dates = ( + dates_from_rrule(item["rrule"], start, end) + if "rrule" in item + else [item[get_yaml_event_date_field(item)]] + ) + for dt in dates: + e = Event( + name=item["name"], + date=dt, + end_date=( + dt + duration + if duration + else ( + item.get("end_date") + if item["name"] != "travel_insurance" + else None + ) + ), + title=item.get("title"), + url=item.get("url"), + ) + events.append(e) + return events + + +def find_markets_during_stay( + accommodation_events: list[Event], markets: list[Event] +) -> list[Event]: + """Market events that happen during accommodation stays.""" + overlapping_markets = [] + for market in markets: + for e in accommodation_events: + # Check if the market date is within the accommodation dates. + if e.as_date <= market.as_date <= e.end_as_date: + overlapping_markets.append(market) + break # Breaks the inner loop if overlap is found. + return overlapping_markets + + +def find_gaps(events: list[Event], min_gap_days: int = 3) -> list[StrDict]: + """Gaps of at least `min_gap_days` between events in a list of events.""" + # Sort events by start date + + gaps: list[tuple[date, date]] = [] + previous_event_end = None + + by_start_date = { + d: list(on_day) + for d, on_day in itertools.groupby(events, key=lambda e: e.as_date) + } + + by_end_date = { + d: list(on_day) + for d, on_day in itertools.groupby(events, key=lambda e: e.end_as_date) + } + + for event in events: + # Use start date for current event + start_date = event.as_date + + # If previous event exists, calculate the gap + if previous_event_end: + gap_days = (start_date - previous_event_end).days + if gap_days >= (min_gap_days + 2): + start_end = ( + previous_event_end + timedelta(days=1), + start_date - timedelta(days=1), + ) + gaps.append(start_end) + + # Update previous event end date + end = event.end_as_date + if not previous_event_end or end > previous_event_end: + previous_event_end = end + + return [ + { + "start": gap_start, + "end": gap_end, + "after": by_start_date[gap_end + timedelta(days=1)], + "before": by_end_date[gap_start - timedelta(days=1)], + } + for gap_start, gap_end in gaps + ] + + +def busy_event(e: Event) -> bool: + """Busy.""" + if e.name not in { + "event", + "accommodation", + "conference", + "dodainville", + "transport", + "meetup", + "party", + }: + return False + + if e.title in ("IA UK board meeting", "Mill Road Winter Fair"): + return False + + if e.name == "conference" and not e.going: + return False + if not e.title: + return True + if e.title == "LHG Run Club" or "Third Thursday Social" in e.title: + return False + + lc_title = e.title.lower() + return "rebels" not in lc_title and "south west data social" not in lc_title + + +async def get_data( + now: datetime, config: flask.config.Config +) -> typing.Mapping[str, str | object]: """Get data to display on agenda dashboard.""" data_dir = config["DATA_DIR"] @@ -182,51 +367,28 @@ async def get_data(now: datetime, config: flask.config.Config) -> AgendaData: minus_365 = now - timedelta(days=365) plus_365 = now + timedelta(days=365) - t0 = time() - offline_mode = bool(config.get("OFFLINE_MODE")) - result_list = await asyncio.gather( - time_function( - "gwr_advance_tickets", gwr.advance_ticket_date, data_dir, offline_mode - ), - time_function( - "backwell_bins", - n_somerset_waste_collection_events, - data_dir, - config["BACKWELL_POSTCODE"], - config["BACKWELL_UPRN"], - offline_mode, - ), - time_function( - "bristol_bins", - bristol_waste_collection_events, - data_dir, - today, - config["BRISTOL_UPRN"], - offline_mode, - ), + ( + gbpusd, + gwr_advance_tickets, + bank_holiday, + rockets, + backwell_bins, + bristol_bins, + ) = await asyncio.gather( + fx.get_gbpusd(config), + gwr.advance_ticket_date(data_dir), + uk_holiday.bank_holiday_list(last_year, next_year, data_dir), + thespacedevs.get_launches(rocket_dir, limit=40), + waste_collection_events(data_dir), + bristol_waste_collection_events(data_dir, today), ) - rockets = thespacedevs.read_cached_launches(rocket_dir) - results = {call[0]: call[1] for call in result_list} - - errors = [(call[0], call[3]) for call in result_list if call[3]] - - gwr_advance_tickets = results["gwr_advance_tickets"] - - data_gather_seconds = time() - t0 - t0 = time() - - stock_market_times = stock_market.open_and_close() - stock_market_times_seconds = time() - t0 - - reply: AgendaData = { + reply: dict[str, typing.Any] = { "now": now, - "stock_markets": stock_market_times, + "gbpusd": gbpusd, + "stock_markets": stock_market.open_and_close(), "rockets": rockets, "gwr_advance_tickets": gwr_advance_tickets, - "data_gather_seconds": data_gather_seconds, - "stock_market_times_seconds": stock_market_times_seconds, - "timings": [(call[0], call[2]) for call in result_list], } my_data = config["PERSONAL_DATA"] @@ -243,43 +405,85 @@ async def get_data(now: datetime, config: flask.config.Config) -> AgendaData: if gwr_advance_tickets: events.append(Event(name="gwr_advance_tickets", date=gwr_advance_tickets)) - us_hols = holidays.us_holidays(last_year, next_year) - events += holidays.get_nyse_holidays(last_year, next_year, us_hols) + us_hols = us_holidays(last_year, next_year) + + holidays: list[Holiday] = bank_holiday + us_hols + for country in ( + "at", + "be", + "br", + "ch", + "cz", + "de", + "dk", + "ee", + "es", + "fi", + "fr", + "gr", + "it", + "ke", + "nl", + "pl", + ): + holidays += get_holidays(country, last_year, next_year) + + events += get_nyse_holidays(last_year, next_year, us_hols) accommodation_events = accommodation.get_events( os.path.join(my_data, "accommodation.yaml") ) - holiday_list = holidays.get_all(last_year, next_year, data_dir) - events += holidays.combine_holidays(holiday_list) - if flask.g.user.is_authenticated: - events += birthday.get_birthdays( - last_year, os.path.join(my_data, "entities.yaml") - ) - events += domains.renewal_dates(my_data) + events += combine_holidays(holidays) + events += birthday.get_birthdays(last_year, os.path.join(my_data, "entities.yaml")) events += accommodation_events events += travel.all_events(my_data) events += conference.get_list(os.path.join(my_data, "conferences.yaml")) - for key in "backwell_bins", "bristol_bins": - if results[key]: - events += results[key] - events += events_yaml.read(my_data, last_year, next_year) + events += backwell_bins + bristol_bins + events += read_events_yaml(my_data, last_year, next_year) events += subscription.get_events(os.path.join(my_data, "subscriptions.yaml")) - events += gandi.get_events(data_dir) events += economist.publication_dates(last_week, next_year) events += meetup.get_events(my_data) events += hn.whoishiring(last_year, next_year) - events += carnival.rio_carnival_events(last_year, next_year) - events += rocket_launch_events(rockets) + + events += domains.renewal_dates(my_data) + + # hide markets that happen while away + markets = [e for e in events if e.name == "market"] + going = [e for e in events if e.going] + + overlapping_markets = find_markets_during_stay( + accommodation_events + going, markets + ) + for market in overlapping_markets: + events.remove(market) + + for launch in rockets: + dt = None + + if launch["net_precision"] == "Day": + dt = datetime.strptime(launch["net"], "%Y-%m-%dT00:00:00Z").date() + elif launch["t0_time"]: + dt = pytz.utc.localize( + datetime.strptime(launch["net"], "%Y-%m-%dT%H:%M:%SZ") + ) + + if not dt: + continue + + rocket_name = f'π{launch["rocket"]}: {launch["mission_name"] or "[no mission]"}' + e = Event(name="rocket", date=dt, title=rocket_name) + events.append(e) + events += [Event(name="today", date=today)] busy_events = [ e for e in sorted(events, key=lambda e: e.as_date) - if e.as_date > today and e.as_date < next_year and busy.busy_event(e) + if e.as_date > today and e.as_date < next_year and busy_event(e) ] - gaps = busy.find_gaps(busy_events) + gaps = find_gaps(busy_events) events += [ Event(name="gap", date=gap["start"], end_date=gap["end"]) for gap in gaps @@ -297,10 +501,9 @@ async def get_data(now: datetime, config: flask.config.Config) -> AgendaData: reply["sunrise"] = sun.sunrise(observer) reply["sunset"] = sun.sunset(observer) reply["events"] = events - reply["accommodation_events"] = accommodation_events reply["last_week"] = last_week reply["two_weeks_ago"] = two_weeks_ago - reply["errors"] = errors + reply["fullcalendar_events"] = calendar.build_events(events) return reply diff --git a/agenda/domains.py b/agenda/domains.py index 5b03bff..99c614e 100644 --- a/agenda/domains.py +++ b/agenda/domains.py @@ -1,10 +1,10 @@ -"""Domain renewal dates.""" +"""Accomodation.""" import csv import os from datetime import datetime -from .event import Event +from .types import Event url = "https://admin.gandi.net/domain/01578ef0-a84b-11e7-bdf3-00163e6dc886/" diff --git a/agenda/economist.py b/agenda/economist.py index 21a8199..4438b98 100644 --- a/agenda/economist.py +++ b/agenda/economist.py @@ -5,7 +5,7 @@ from datetime import date, time, timedelta from dateutil.relativedelta import TH, relativedelta from . import uk_time -from .event import Event +from .types import Event def publication_dates(start_date: date, end_date: date) -> list[Event]: diff --git a/agenda/event.py b/agenda/event.py deleted file mode 100644 index b975625..0000000 --- a/agenda/event.py +++ /dev/null @@ -1,149 +0,0 @@ -"""Types.""" - -import datetime -from dataclasses import dataclass - -from . import utils -from .types import DateOrDateTime, StrDict - -emojis = { - "market": "π§Ί", - "us_presidential_election": "π³οΈπΊπΈ", - "bus_route_closure": "πβ", - "meetup": "π₯", - "dinner": "π·", - "party": "π·", - "ba_voucher": "βοΈ", - "accommodation": "π¨", # alternative: π§³ - "flight": "βοΈ", - "conference": "π€", - "rocket": "π", - "birthday": "π", - "waste_schedule": "ποΈ", - "economist": "π°", - "running": "π", - "critical_mass": "π΄", - "trip": "π§³", - "hackathon": "π»", -} - - -@dataclass -class Event: - """Event.""" - - name: str - date: DateOrDateTime - end_date: DateOrDateTime | None = None - title: str | None = None - url: str | None = None - going: bool | None = None - - @property - def as_datetime(self) -> datetime.datetime: - """Date/time of event.""" - return utils.as_datetime(self.date) - - @property - def has_time(self) -> bool: - """Event has a time associated with it.""" - return isinstance(self.date, datetime.datetime) - - @property - def as_date(self) -> datetime.date: - """Date of event.""" - return ( - self.date.date() if isinstance(self.date, datetime.datetime) else self.date - ) - - @property - def end_as_date(self) -> datetime.date: - """Date of event.""" - return ( - ( - self.end_date.date() - if isinstance(self.end_date, datetime.datetime) - else self.end_date - ) - if self.end_date - else self.as_date - ) - - @property - def display_time(self) -> str | None: - """Time for display on web page.""" - return ( - self.date.strftime("%H:%M") - if isinstance(self.date, datetime.datetime) - else None - ) - - @property - def display_timezone(self) -> str | None: - """Timezone for display on web page.""" - return ( - self.date.strftime("%z") - if isinstance(self.date, datetime.datetime) - else None - ) - - def display_duration(self) -> str | None: - """Duration for display.""" - if self.end_as_date != self.as_date or not self.has_time: - return None - - assert isinstance(self.date, datetime.datetime) - assert isinstance(self.end_date, datetime.datetime) - - secs: int = int((self.end_date - self.date).total_seconds()) - - hours: int = secs // 3600 - mins: int = (secs % 3600) // 60 - - if mins == 0: - return f"{hours:d}h" - if hours == 0: - return f"{mins:d} mins" - - return f"{hours:d}h {mins:02d} mins" - - def delta_days(self, today: datetime.date) -> str: - """Return number of days from today as a string.""" - delta = (self.as_date - today).days - - match delta: - case 0: - return "today" - case 1: - return "1 day" - case _: - return f"{delta:,d} days" - - @property - def display_date(self) -> str: - """Date for display on web page.""" - if isinstance(self.date, datetime.datetime): - return self.date.strftime("%a, %d, %b %Y %H:%M %z") - else: - return self.date.strftime("%a, %d, %b %Y") - - @property - def display_title(self) -> str: - """Name for display.""" - return self.title or self.name - - @property - def emoji(self) -> str | None: - """Emoji.""" - if self.title == "LHG Run Club": - return "ππ»" - return emojis.get(self.name) - - @property - def title_with_emoji(self) -> str | None: - """Title with optional emoji at the start.""" - title = self.title or self.name - if title is None: - return None - emoji = self.emoji - return f"{emoji} {title}" if emoji else title diff --git a/agenda/events_yaml.py b/agenda/events_yaml.py deleted file mode 100644 index 8678d64..0000000 --- a/agenda/events_yaml.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Read events from YAML.""" - -import os -import typing -from datetime import date, datetime - -import dateutil.rrule -import isodate # type: ignore -import yaml - -from . import uk_tz -from .event import Event - - -def midnight(d: date) -> datetime: - """Convert from date to midnight on that day.""" - return datetime.combine(d, datetime.min.time()) - - -def dates_from_rrule( - rrule: str, start: date, end: date -) -> typing.Sequence[datetime | date]: - """Generate events from an RRULE between start_date and end_date.""" - all_day = not any(param in rrule for param in ["BYHOUR", "BYMINUTE", "BYSECOND"]) - - return [ - i.date() if all_day else uk_tz.localize(i) - for i in dateutil.rrule.rrulestr(rrule, dtstart=midnight(start)).between( - midnight(start), midnight(end) - ) - ] - - -def get_yaml_event_date_field(item: dict[str, str]) -> str: - """Event date field name.""" - return ( - "end_date" - if item["name"] == "travel_insurance" - else ("start_date" if "start_date" in item else "date") - ) - - -def get_yaml_event_end_date_field(item: dict[str, str]) -> str: - """Event date field name.""" - return ( - "end_date" - if item["name"] == "travel_insurance" - else ("start_date" if "start_date" in item else "date") - ) - - -def read( - data_dir: str, start: date, end: date, skip_trips: bool = False -) -> list[Event]: - """Read eventes from YAML file.""" - events: list[Event] = [] - for item in yaml.safe_load(open(os.path.join(data_dir, "events.yaml"))): - if "trip" in item and skip_trips: - continue - duration = ( - isodate.parse_duration(item["duration"]) if "duration" in item else None - ) - dates = ( - dates_from_rrule(item["rrule"], start, end) - if "rrule" in item - else [item[get_yaml_event_date_field(item)]] - ) - for dt in dates: - e = Event( - name=item["name"], - date=dt, - end_date=( - dt + duration - if duration - else ( - item.get("end_date") - if item["name"] != "travel_insurance" - else None - ) - ), - title=item.get("title"), - url=item.get("url"), - ) - events.append(e) - return events diff --git a/agenda/fx.py b/agenda/fx.py index 61c4bb7..1d48e49 100644 --- a/agenda/fx.py +++ b/agenda/fx.py @@ -42,74 +42,3 @@ async def get_gbpusd(config: flask.config.Config) -> Decimal: data = json.loads(r.text, parse_float=Decimal) return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"]) - - -def read_cached_rates( - filename: str | None, currencies: list[str] -) -> dict[str, Decimal]: - """Read FX rates from cache.""" - if filename is None: - return {} - - with open(filename) as file: - data = json.load(file, parse_float=Decimal) - - return { - cur: Decimal(data["quotes"][f"GBP{cur}"]) - for cur in currencies - if f"GBP{cur}" in data["quotes"] - } - - -def get_rates(config: flask.config.Config) -> dict[str, Decimal]: - """Get current values of exchange rates for a list of currencies against GBP.""" - currencies = config["CURRENCIES"] - access_key = config["EXCHANGERATE_ACCESS_KEY"] - data_dir = config["DATA_DIR"] - - now = datetime.now() - now_str = now.strftime("%Y-%m-%d_%H:%M") - fx_dir = os.path.join(data_dir, "fx") - os.makedirs(fx_dir, exist_ok=True) # Ensure the directory exists - - currency_string = ",".join(sorted(currencies)) - file_suffix = f"{currency_string}_to_GBP.json" - existing_data = os.listdir(fx_dir) - existing_files = [f for f in existing_data if f.endswith(".json")] - - full_path: str | None = None - - if existing_files: - recent_filename = max(existing_files) - recent = datetime.strptime(recent_filename[:16], "%Y-%m-%d_%H:%M") - delta = now - recent - - full_path = os.path.join(fx_dir, recent_filename) - if recent_filename.endswith(file_suffix) and ( - delta < timedelta(hours=12) or config["OFFLINE_MODE"] - ): - return read_cached_rates(full_path, currencies) - - url = "http://api.exchangerate.host/live" - params = {"currencies": currency_string, "source": "GBP", "access_key": access_key} - - filename = f"{now_str}_{file_suffix}" - try: - with httpx.Client() as client: - response = client.get(url, params=params, timeout=10) - except (httpx.ConnectError, httpx.ReadTimeout): - return read_cached_rates(full_path, currencies) - - try: - data = json.loads(response.text, parse_float=Decimal) - except json.decoder.JSONDecodeError: - return read_cached_rates(full_path, currencies) - - with open(os.path.join(fx_dir, filename), "w") as file: - file.write(response.text) - - return { - cur: Decimal(data["quotes"][f"GBP{cur}"]) - for cur in currencies - if f"GBP{cur}" in data["quotes"] - } diff --git a/agenda/gandi.py b/agenda/gandi.py deleted file mode 100644 index b5f3278..0000000 --- a/agenda/gandi.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Gandi domain renewal dates.""" - -import os -from .event import Event -from datetime import datetime - -import json - - -def get_events(data_dir: str) -> list[Event]: - """Get subscription renewal dates.""" - filename = os.path.join(data_dir, "gandi_domains.json") - - with open(filename) as f: - items = json.load(f) - - assert isinstance(items, list) - assert all(item["fqdn"] and item["dates"]["registry_ends_at"] for item in items) - - return [ - Event( - date=datetime.fromisoformat(item["dates"]["registry_ends_at"]).date(), - name="domain", - title=item["fqdn"] + " renewal", - ) - for item in items - ] diff --git a/agenda/geomob.py b/agenda/geomob.py deleted file mode 100644 index f7383b3..0000000 --- a/agenda/geomob.py +++ /dev/null @@ -1,110 +0,0 @@ -"""Geomob events.""" - -import os -from dataclasses import dataclass -from datetime import date, datetime -from typing import List - -import dateutil.parser -import flask -import lxml.html -import requests - -import agenda.mail -import agenda.utils - - -@dataclass(frozen=True) -class GeomobEvent: - """Geomob event.""" - - date: date - href: str - hashtag: str - - -def extract_events( - tree: lxml.html.HtmlElement, -) -> List[GeomobEvent]: - """Extract upcoming events from the HTML content.""" - events = [] - - for event in tree.xpath('//ol[@class="event-list"]/li/a'): - date_str, _, hashtag = event.text_content().strip().rpartition(" ") - events.append( - GeomobEvent( - date=dateutil.parser.parse(date_str).date(), - href=event.get("href"), - hashtag=hashtag, - ) - ) - - return events - - -def find_new_events( - prev: list[GeomobEvent], cur: list[GeomobEvent] -) -> list[GeomobEvent]: - """Find new events that appear in cur but not in prev.""" - return list(set(cur) - set(prev)) - - -def geomob_email(new_events: list[GeomobEvent], base_url: str) -> tuple[str, str]: - """Generate email subject and body for new events. - - Args: - new_events (List[Event]): List of new events. - base_url (str): The base URL of the website. - - Returns: - tuple[str, str]: Email subject and body. - """ - assert new_events - - subject = f"{len(new_events)} New Geomob Event(s) Announced" - - body_lines = ["Hello,\n", "Here are the new Geomob events:\n"] - for event in new_events: - url = base_url + event.href - # Check for double slashes in the path part only (after protocol) - if "://" in url: - protocol, rest = url.split("://", 1) - assert "//" not in rest, f"Double slash found in URL path: {url}" - else: - assert "//" not in url, f"Double slash found in URL: {url}" - event_details = f"Date: {event.date}\nURL: {url}\nHashtag: {event.hashtag}\n" - body_lines.append(event_details) - body_lines.append("-" * 40) - - body = "\n".join(body_lines) - return (subject, body) - - -def get_cached_upcoming_events_list(geomob_dir: str) -> list[GeomobEvent]: - """Get known geomob events.""" - filename = agenda.utils.get_most_recent_file(geomob_dir, "html") - return extract_events(lxml.html.parse(filename).getroot()) if filename else [] - - -def update(config: flask.config.Config) -> None: - """Get upcoming Geomob events and report new ones.""" - geomob_dir = os.path.join(config["DATA_DIR"], "geomob") - - prev_events = get_cached_upcoming_events_list(geomob_dir) - r = requests.get("https://thegeomob.com/") - cur_events = extract_events(lxml.html.fromstring(r.content)) - - if cur_events == prev_events: - return # no change - - now = datetime.now() - new_filename = os.path.join(geomob_dir, now.strftime("%Y-%m-%d_%H:%M:%S.html")) - open(new_filename, "w").write(r.text) - - new_events = list(set(cur_events) - set(prev_events)) - if not new_events: - return - - base_url = "https://thegeomob.com" - subject, body = geomob_email(new_events, base_url) - agenda.mail.send_mail(config, subject, body) diff --git a/agenda/gwr.py b/agenda/gwr.py index cc7e968..5806939 100644 --- a/agenda/gwr.py +++ b/agenda/gwr.py @@ -10,30 +10,6 @@ import httpx url = "https://www.gwr.com/your-tickets/choosing-your-ticket/advance-tickets" -def parse_date_string(date_str: str) -> date: - """Parse date string from HTML.""" - if not date_str[-1].isdigit(): # If the year is missing, use the current year - date_str += f" {date.today().year}" - - return datetime.strptime(date_str, "%A %d %B %Y").date() - - -def extract_dates(html: str) -> None | dict[str, date]: - """Extract dates from HTML.""" - pattern = re.compile( - r"