agenda/agenda/__init__.py
2023-10-08 19:53:44 +01:00

388 lines
12 KiB
Python

import configparser
import json
import operator
import os
import typing
import warnings
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from time import time as unixtime
from typing import List
import dateutil
import dateutil.parser
import exchange_calendars
import holidays
import lxml
import pandas
import pytz
import requests
import yaml
from dateutil.easter import easter
from agenda import thespacedevs
from . import waste_schedule
from .types import Event
# Silence every FutureWarning for the whole process.
warnings.simplefilter(action="ignore", category=FutureWarning)
# Presumably a wish list of further agenda items; none of them is
# implemented in the code below:
# deadline to file tax return
# credit card expiry dates
# morzine ski lifts
# chalet availability calendar
# sunrise and sunset
# starlink visible
# Local timezone of the machine; stock_markets() uses it for display times.
here = dateutil.tz.tzlocal()
# Hard-coded dates: these need updating by hand once they have passed.
next_us_presidential_election = date(2024, 11, 5)
# NOTE(review): keys presumably mean first/second class post -- confirm.
xmas_last_posting_dates = {"first": date(2023, 12, 20), "second": date(2023, 12, 18)}
# The config file is named "config" and lives one directory above this file.
config_filename = os.path.join(os.path.dirname(__file__), "..", "config")
# Fail at import time if the config file is missing (asserts are skipped
# when Python runs with -O).
assert os.path.exists(config_filename)
config = configparser.ConfigParser()
config.read(config_filename)
# API key sent to the exchange-rate service by get_gbpusd().
access_key = config.get("exchangerate", "access_key")
# Directory holding cached downloads (bank holidays, FX quotes, launches...).
data_dir = config.get("data", "dir")
def next_uk_mothers_day(input_date: date) -> date:
    """Return the next UK Mother's Day on or after ``input_date``.

    UK Mother's Day (Mothering Sunday) is the fourth Sunday of Lent, which
    falls exactly three weeks *before* Easter Sunday.

    Args:
        input_date: The date to search forward from (inclusive).

    Returns:
        This year's Mothering Sunday if it has not yet passed, otherwise
        next year's.
    """
    weeks_before_easter = timedelta(weeks=3)

    mothers_day = easter(input_date.year) - weeks_before_easter
    if input_date > mothers_day:
        # Already passed this year, so use next year's Easter instead.
        mothers_day = easter(input_date.year + 1) - weeks_before_easter
    return mothers_day
def _third_sunday_of_june(year: int) -> date:
    """Return the third Sunday of June in ``year``."""
    june_first = date(year, 6, 1)
    # date.weekday(): Monday == 0 ... Sunday == 6.
    days_to_first_sunday = (6 - june_first.weekday()) % 7
    first_sunday = june_first + timedelta(days=days_to_first_sunday)
    return first_sunday + timedelta(weeks=2)


def next_uk_fathers_day(input_date: date) -> date:
    """Return the next UK Father's Day on or after ``input_date``.

    UK Father's Day is the third Sunday of June.

    Args:
        input_date: The date to search forward from (inclusive).

    Returns:
        This year's Father's Day if it has not yet passed, otherwise next
        year's.
    """
    fathers_day = _third_sunday_of_june(input_date.year)
    if input_date > fathers_day:
        # Already passed this year, so move on to next June.
        fathers_day = _third_sunday_of_june(input_date.year + 1)
    return fathers_day
def get_next_timezone_transition(from_dt: datetime, tz_name: str) -> date:
    """Return the date of the next clock change in ``tz_name``.

    pytz keeps its transition instants as naive datetimes expressed in UTC.
    An aware ``from_dt`` is therefore converted to naive UTC before the
    comparison (comparing aware with naive datetimes raises ``TypeError``).
    A naive ``from_dt`` is compared unchanged.

    Args:
        from_dt: Only transitions strictly later than this are considered.
        tz_name: Timezone name understood by ``pytz.timezone``.

    Returns:
        The (UTC) calendar date of the first transition after ``from_dt``.

    Raises:
        StopIteration: If the zone has no transition later than ``from_dt``.
    """
    if from_dt.utcoffset() is not None:
        from_dt = from_dt.astimezone(timezone.utc).replace(tzinfo=None)

    tz = pytz.timezone(tz_name)
    dt = next(t for t in tz._utc_transition_times if t > from_dt)
    return typing.cast(date, dt.date())
def get_next_bank_holiday(input_date: date) -> list[Event]:
    """Return the UK bank holidays in the year following ``input_date``.

    The gov.uk bank-holiday JSON is cached in ``data_dir`` and downloaded
    again when the cached copy is more than one hour old or does not exist.

    Args:
        input_date: Earliest date to include (inclusive).

    Returns:
        One ``Event`` named ``"bank_holiday"`` per England-and-Wales holiday
        between ``input_date`` and 365 days later.
    """
    url = "https://www.gov.uk/bank-holidays.json"
    filename = os.path.join(data_dir, "bank-holidays.json")

    try:
        cache_is_stale = (unixtime() - os.path.getmtime(filename)) > 3600  # one hour
    except FileNotFoundError:
        # First run: there is no cached copy yet, so fetch one.
        cache_is_stale = True

    if cache_is_stale:
        r = requests.get(url)
        with open(filename, "w") as out:
            out.write(r.text)

    with open(filename) as f:
        events = json.load(f)["england-and-wales"]["events"]

    year_later = input_date + timedelta(days=365)
    hols: list[Event] = []
    for event in events:
        event_date = datetime.strptime(event["date"], "%Y-%m-%d").date()
        if event_date < input_date:
            continue
        if event_date > year_later:
            # Assumes the feed lists events in ascending date order.
            break
        hols.append(Event(name="bank_holiday", date=event_date, title=event["title"]))
    return hols
def get_gbpusd() -> Decimal:
    """Get the current value for GBPUSD, with caching.

    Responses from the exchange-rate API are stored in ``<data_dir>/fx`` as
    ``YYYY-MM-DD_HH:MM_GBPUSD.json``. The newest file is reused when it is
    less than six hours old and contains a ``USDGBP`` quote; otherwise a
    fresh quote is downloaded and saved.

    Returns:
        The GBPUSD rate, computed as ``1 / USDGBP``.

    Raises:
        KeyError: If a freshly downloaded response has no ``USDGBP`` quote.
    """
    now = datetime.now()
    now_str = now.strftime("%Y-%m-%d_%H:%M")
    fx_dir = os.path.join(data_dir, "fx")

    existing = [f for f in os.listdir(fx_dir) if f.endswith("_GBPUSD.json")]
    if existing:
        # The timestamp prefix sorts lexicographically, so max() is newest.
        recent_filename = max(existing)
        recent = datetime.strptime(recent_filename, "%Y-%m-%d_%H:%M_GBPUSD.json")
        if now - recent < timedelta(hours=6):
            with open(os.path.join(fx_dir, recent_filename)) as f:
                data = json.load(f, parse_float=Decimal)
            if "quotes" in data and "USDGBP" in data["quotes"]:
                return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
            # Cached response is unusable (e.g. an API error): fetch again.

    url = "http://api.exchangerate.host/live"
    params = {"currencies": "GBP,USD", "access_key": access_key}
    filename = os.path.join(fx_dir, f"{now_str}_GBPUSD.json")
    r = requests.get(url, params=params)
    with open(filename, "w") as out:
        out.write(r.text)
    data = json.loads(r.text, parse_float=Decimal)
    return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
def next_economist(input_date: date) -> date:
    """Return the next Economist publication date on or after ``input_date``.

    Issues come out on Thursdays, except in the ISO weeks listed in
    ``skipped_weeks``; a Thursday in such a week is pushed on by whole weeks
    until it lands in a publishing week.
    """
    thursday = 3  # date.weekday(): Monday == 0 ... Sunday == 6
    # NOTE(review): ISO week numbers only run 1-53, so 56 can never match;
    # possibly a typo -- confirm the intended week.
    skipped_weeks = (26, 56)

    # Zero when input_date is itself a Thursday.
    days_ahead = (thursday - input_date.weekday()) % 7
    candidate = input_date + timedelta(days=days_ahead)

    while candidate.isocalendar().week in skipped_weeks:
        candidate += timedelta(weeks=1)
    return candidate
def uk_financial_year_end(input_date: date) -> date:
    """Return the next UK financial year end (5 April) on or after ``input_date``.

    On 5 April itself the same day is returned; from 6 April onwards the
    result is 5 April of the following year.
    """
    this_years_end = date(input_date.year, 4, 5)
    if input_date <= this_years_end:
        return this_years_end
    return date(input_date.year + 1, 4, 5)
def timedelta_display(delta: timedelta) -> str:
    """Render ``delta`` as right-aligned days / hrs / mins, skipping zero parts.

    Seconds are dropped, so a delta shorter than one minute yields an empty
    string.
    """
    units = (("days", 24 * 60 * 60), ("hrs", 60 * 60), ("mins", 60))

    seconds_left = int(delta.total_seconds())
    parts: list[str] = []
    for label, unit_seconds in units:
        count, seconds_left = divmod(seconds_left, unit_seconds)
        if count:
            parts.append(f"{count:>3} {label}")
    return " ".join(parts)
def _round_minute_to_ten(ts: pandas.Timestamp) -> pandas.Timestamp:
    """Round the minute field of ``ts`` to the nearest ten, carrying if needed."""
    # round(minute, -1) is 60 for minutes 55-59 and replace(minute=60) raises
    # ValueError, so add the rounded minutes to the top of the hour instead.
    return ts.replace(minute=0) + timedelta(minutes=round(ts.minute, -1))


def stock_markets() -> list[str]:
    """Describe the open/closed state of the London and US stock markets.

    Returns:
        One message per market. An open market reports how long ago it
        opened and when it closes; a closed market reports the time until it
        next opens, with the clock time appended when that is under a day
        away. Times are shown in the local timezone ``here``.
    """
    # The trading calendars code is slow, maybe there is a faster way to do this
    # Or we could cache the result
    now = pandas.Timestamp.now(timezone.utc)
    now_local = pandas.Timestamp.now(here)
    markets = [
        ("XLON", "London"),
        ("XNYS", "US"),
    ]
    reply = []
    for code, label in markets:
        cal = exchange_calendars.get_calendar(code)
        if cal.is_open_on_minute(now_local):
            next_close = _round_minute_to_ten(cal.next_close(now).tz_convert(here))
            delta_close = timedelta_display(next_close - now_local)

            prev_open = _round_minute_to_ten(cal.previous_open(now).tz_convert(here))
            delta_open = timedelta_display(now_local - prev_open)

            msg = (
                f"{label:>6} market opened {delta_open} ago, "
                + f"closes in {delta_close} ({next_close:%H:%M})"
            )
        else:
            # Round before converting, as the original code did.
            ts = _round_minute_to_ten(cal.next_open(now)).tz_convert(here)
            delta = timedelta_display(ts - now_local)
            msg = f"{label:>6} market opens in {delta}" + (
                f" ({ts:%H:%M})" if (ts - now_local) < timedelta(days=1) else ""
            )
        reply.append(msg)
    return reply
def get_us_holidays(input_date: date) -> list[Event]:
    """Return US holidays on or after ``input_date``, in date order.

    Holidays are looked up for the year of ``input_date`` and the year after.
    """
    this_year = input_date.year
    us_calendar = holidays.UnitedStates(years=[this_year, this_year + 1])

    upcoming = []
    for day, label in sorted(us_calendar.items()):
        if day < input_date:
            continue
        upcoming.append(Event(name="us_holiday", date=day, title=label))
    return upcoming
def get_conferences(input_date: date, filepath: str) -> list[Event]:
    """Read conferences from a YAML file and return them as ``Event`` objects.

    Args:
        input_date: Conferences starting before this date are skipped.
        filepath: Path to a YAML file with a top-level ``conferences`` list
            whose entries have ``start``, ``name`` and ``location`` keys.

    Returns:
        One ``Event`` named ``"conference"`` per remaining entry, in file
        order. An empty file or a missing ``conferences`` key gives ``[]``.
    """
    with open(filepath, "r") as f:
        # safe_load returns None for an empty document; treat that as no data.
        data = yaml.safe_load(f) or {}

    events = []
    for conf in data.get("conferences", []):
        # NOTE(review): assumes YAML parsed ``start`` as a date -- confirm.
        event_date = conf["start"]
        # Skip the conference if it is before the input date.
        if event_date < input_date:
            continue
        events.append(
            Event(
                name="conference",
                date=event_date,
                title=f'{conf["name"]} ({conf["location"]})',
            )
        )
    return events
def _birthday_in_year(birth_date: date, year: int) -> date:
    """Return the anniversary of ``birth_date`` that falls in ``year``."""
    try:
        return birth_date.replace(year=year)
    except ValueError:
        # Only 29 February can fail, when ``year`` is not a leap year; the
        # convention chosen here is to observe it on 28 February.
        return birth_date.replace(year=year, day=28)


def next_birthday(from_date: date, birth_date: date) -> tuple[date, int]:
    """Calculate the next birthday on or after ``from_date``.

    Args:
        from_date: The date to search forward from (inclusive).
        birth_date: The person's date of birth.

    Returns:
        A ``(date, age)`` tuple: the next birthday and the age reached on it.
        A 29 February birthday is observed on 28 February in non-leap years.
    """
    next_birthday_date = _birthday_in_year(birth_date, from_date.year)
    if from_date > next_birthday_date:
        next_birthday_date = _birthday_in_year(birth_date, from_date.year + 1)

    age_at_next_birthday = next_birthday_date.year - birth_date.year
    return next_birthday_date, age_at_next_birthday
def get_birthdays(from_date: date, config: configparser.ConfigParser) -> list[Event]:
    """Build one birthday ``Event`` per entry in the config's ``birthdays`` section.

    Each option in the section maps a name to a ``YYYY-MM-DD`` birth date.
    Returns an empty list when the section is absent.
    """
    if "birthdays" not in config:
        return []

    birthdays = []
    for person, born_str in config["birthdays"].items():
        born = datetime.strptime(born_str, "%Y-%m-%d").date()
        when, age = next_birthday(from_date, born)
        birthday_event = Event(
            date=when,
            name="birthday",
            title=f"{person.title()} (aged {age})",
        )
        birthdays.append(birthday_event)
    return birthdays
def waste_collection_events() -> list[Event]:
    """Waste collection events.

    Fetches the schedule HTML through ``waste_schedule.get_html`` for the
    hard-coded postcode and UPRN, then returns whatever
    ``waste_schedule.parse`` extracts from the parsed document.
    """
    # A bare ``import lxml`` does not load the ``lxml.html`` submodule, so
    # import it explicitly rather than relying on another module doing so.
    import lxml.html

    postcode = "BS48 3HG"
    uprn = "24071046"

    html = waste_schedule.get_html(data_dir, postcode, uprn)
    root = lxml.html.fromstring(html)
    events = waste_schedule.parse(root)
    return events
def get_data(now: datetime) -> dict[str, str | object]:
    """Collect everything shown on the agenda dashboard.

    Returns a dict of individual values (exchange rate, market status,
    rocket launches, key dates...) plus an ``"events"`` entry: a list of
    ``Event`` objects built from those dates, birthdays, conferences, waste
    collections and a few fixed entries, sorted by date.
    """
    today = now.date()
    rocket_dir = os.path.join(data_dir, "thespacedevs")

    reply = {
        "now": now,
        "gbpusd": get_gbpusd(),
        "next_economist": next_economist(today),
        "bank_holiday": get_next_bank_holiday(today),
        "us_holiday": get_us_holidays(today),
        "next_us_presidential_election": next_us_presidential_election,
        "stock_markets": stock_markets(),
        "uk_clock_change": get_next_timezone_transition(now, "Europe/London"),
        "us_clock_change": get_next_timezone_transition(now, "America/New_York"),
        "mothers_day": next_uk_mothers_day(today),
        "fathers_day": next_uk_fathers_day(today),
        "uk_financial_year_end": uk_financial_year_end(today),
        "xmas_last_posting_dates": xmas_last_posting_dates,
        "rockets": thespacedevs.get_launches(rocket_dir, limit=40),
    }

    # Entries that are not dated events (or are handled separately below).
    not_events = {"now", "gbpusd", "rockets", "stock_markets", "xmas_last_posting_dates"}

    events: list[Event] = []
    for key, value in reply.items():
        if key in not_events:
            continue
        if "holiday" not in key:
            # A single date: wrap it in an Event named after its key.
            assert isinstance(value, date)
            events.append(Event(name=key, date=value))
            continue
        # The holiday entries are already lists of Event objects.
        assert isinstance(value, list)
        events.extend(value)

    events.extend(
        Event(name=f"xmas_last_{post_class}", date=last_day)
        for post_class, last_day in xmas_last_posting_dates.items()
    )
    events.extend(get_birthdays(today, config))
    events.extend(get_conferences(today, "conferences.yaml"))
    events.extend(waste_collection_events())
    events.append(
        Event(
            date=date(2026, 6, 1),
            title="70 Up",
            name="next_up_series",
        )
    )

    # Stable sort: events sharing a date keep their insertion order.
    reply["events"] = sorted(events, key=operator.attrgetter("date"))
    return reply