291 lines
9.6 KiB
Python
291 lines
9.6 KiB
Python
import configparser
|
|
import json
|
|
import os
|
|
import typing
|
|
import warnings
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from decimal import Decimal
|
|
from time import time as unixtime
|
|
|
|
import dateutil
|
|
import dateutil.parser
|
|
import exchange_calendars
|
|
import holidays
|
|
import pandas
|
|
import pytz
|
|
import requests
|
|
from dateutil.easter import easter
|
|
|
|
# from agenda import spacexdata
|
|
|
|
warnings.simplefilter(action="ignore", category=FutureWarning)
|
|
|
|
|
|
# deadline to file tax return
|
|
# credit card expiry dates
|
|
# morzine ski lifts
|
|
# chalet availablity calendar
|
|
|
|
# sunrise and sunset
|
|
# starlink visible
|
|
|
|
here = dateutil.tz.tzlocal()
|
|
now = datetime.now()
|
|
today = now.date()
|
|
now_str = now.strftime("%Y-%m-%d_%H:%M")
|
|
now_utc = datetime.now(timezone.utc)
|
|
|
|
next_us_presidential_election = date(2024, 11, 5)
|
|
next_uk_general_election = date(2024, 5, 2)
|
|
|
|
xmas_last_posting_dates = {"first": date(2023, 12, 20), "second": date(2023, 12, 18)}
|
|
|
|
config_filename = os.path.join(os.path.dirname(__file__), "..", "config")
|
|
|
|
assert os.path.exists(config_filename)
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read(config_filename)
|
|
|
|
access_key = config.get("exchangerate", "access_key")
|
|
data_dir = config.get("data", "dir")
|
|
|
|
|
|
def next_uk_mothers_day(input_date: date) -> date:
|
|
"""Calculate the date of the next UK Mother's Day from the current date."""
|
|
current_year = input_date.year
|
|
|
|
easter_date = easter(current_year)
|
|
|
|
# Calculate the date of Mother's Day, which is the fourth Sunday of Lent
|
|
mothers_day = easter_date + timedelta(weeks=3)
|
|
|
|
# Check if Mother's Day has already passed this year
|
|
if input_date > mothers_day:
|
|
# If it has passed, calculate for the next year
|
|
easter_date = easter(current_year + 1)
|
|
mothers_day = easter_date + timedelta(weeks=3)
|
|
|
|
return mothers_day
|
|
|
|
|
|
def next_uk_fathers_day(input_date: date) -> date:
|
|
"""Calculate the date of the next UK Father's Day from the current date."""
|
|
# Get the current date
|
|
# Calculate the day of the week for the current date (0 = Monday, 6 = Sunday)
|
|
current_day_of_week = input_date.weekday()
|
|
|
|
# Calculate the number of days until the next Sunday
|
|
days_until_sunday = (6 - current_day_of_week) % 7
|
|
|
|
# Calculate the date of the next Sunday
|
|
next_sunday = today + timedelta(days=days_until_sunday)
|
|
|
|
# Calculate the date of Father's Day, which is the third Sunday of June
|
|
fathers_day = date(next_sunday.year, 6, 1) + timedelta(
|
|
weeks=2, days=next_sunday.weekday()
|
|
)
|
|
|
|
# Check if Father's Day has already passed this year
|
|
if today > fathers_day:
|
|
# If it has passed, calculate for the next year
|
|
fathers_day = date(fathers_day.year + 1, 6, 1) + timedelta(
|
|
weeks=2, days=next_sunday.weekday()
|
|
)
|
|
|
|
return fathers_day
|
|
|
|
|
|
def get_next_timezone_transition(tz_name: str) -> datetime:
|
|
"""Datetime of the next time the clocks change."""
|
|
tz = pytz.timezone(tz_name)
|
|
dt = next(t for t in tz._utc_transition_times if t > now)
|
|
|
|
return typing.cast(datetime, dt)
|
|
|
|
|
|
def get_next_bank_holiday() -> dict[str, date | str]:
|
|
"""Date and name of the next UK bank holiday."""
|
|
url = "https://www.gov.uk/bank-holidays.json"
|
|
filename = os.path.join(data_dir, "bank-holidays.json")
|
|
mtime = os.path.getmtime(filename)
|
|
if (unixtime() - mtime) > 3600: # one hour
|
|
r = requests.get(url)
|
|
open(filename, "w").write(r.text)
|
|
|
|
events = json.load(open(filename))["england-and-wales"]["events"]
|
|
next_holiday = None
|
|
for event in events:
|
|
event_date = datetime.strptime(event["date"], "%Y-%m-%d").date()
|
|
if event_date < today:
|
|
continue
|
|
next_holiday = {"date": event_date, "title": event["title"]}
|
|
break
|
|
|
|
assert next_holiday
|
|
|
|
return next_holiday
|
|
|
|
|
|
def get_gbpusd() -> Decimal:
|
|
"""Get the current value for GBPUSD, with caching."""
|
|
fx_dir = os.path.join(data_dir, "fx")
|
|
existing_data = os.listdir(fx_dir)
|
|
existing = [f for f in existing_data if f.endswith("_GBPUSD.json")]
|
|
if existing:
|
|
recent_filename = max(existing)
|
|
recent = datetime.strptime(recent_filename, "%Y-%m-%d_%H:%M_GBPUSD.json")
|
|
delta = now - recent
|
|
|
|
if existing and delta < timedelta(hours=6):
|
|
full = os.path.join(fx_dir, recent_filename)
|
|
data = json.load(open(full), parse_float=Decimal)
|
|
if "quotes" in data and "USDGBP" in data["quotes"]:
|
|
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
|
|
|
|
url = "http://api.exchangerate.host/live"
|
|
params = {"currencies": "GBP,USD", "access_key": access_key}
|
|
|
|
filename = f"{fx_dir}/{now_str}_GBPUSD.json"
|
|
r = requests.get(url, params=params)
|
|
open(filename, "w").write(r.text)
|
|
data = json.loads(r.text, parse_float=Decimal)
|
|
|
|
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
|
|
|
|
|
|
def next_economist(input_date: date) -> date:
|
|
"""Next date that the Economist is published."""
|
|
# Define the publication day (Thursday) and the day of the week of the input date
|
|
publication_day = 3 # Thursday (0 - Monday, 1 - Tuesday, ..., 6 - Sunday)
|
|
current_day_of_week = input_date.weekday()
|
|
current_week_number = today.isocalendar().week
|
|
|
|
# Define the list of weeks when The Economist is not published
|
|
non_publication_weeks = [26, 56]
|
|
|
|
# Check if the input date is a publication day (Thursday)
|
|
if (
|
|
current_day_of_week == publication_day
|
|
and current_week_number not in non_publication_weeks
|
|
):
|
|
return input_date
|
|
|
|
# Calculate the date for the next Thursday after the input date
|
|
days_until_next_thursday = (publication_day - current_day_of_week + 7) % 7
|
|
next_thursday_date = input_date + timedelta(days=days_until_next_thursday)
|
|
|
|
# Check if the next Thursday falls in a non-publication week
|
|
while next_thursday_date.isocalendar().week in non_publication_weeks:
|
|
# If it does, add 7 days to find the next Thursday
|
|
next_thursday_date += timedelta(days=7)
|
|
|
|
return next_thursday_date
|
|
|
|
|
|
def xmas_day(input_date: date) -> date:
|
|
"""Christmas day."""
|
|
return date(input_date.year, 12, 25)
|
|
|
|
|
|
def uk_financial_year_end(input_date: date) -> date:
|
|
"""Next date of the end of the UK financial year, April 5th."""
|
|
# Determine the year of the input date
|
|
year = input_date.year
|
|
|
|
# Calculate the UK financial year end date (April 5th)
|
|
uk_financial_year_end = date(year, 4, 5)
|
|
|
|
# If the input date is on or after the UK financial year end date,
|
|
# move to the next year
|
|
if input_date > uk_financial_year_end:
|
|
uk_financial_year_end = date(year + 1, 4, 5)
|
|
|
|
return uk_financial_year_end
|
|
|
|
|
|
def timedelta_display(delta: timedelta) -> str:
|
|
"""Format timedelta as a human readable string."""
|
|
total_seconds = int(delta.total_seconds())
|
|
days, remainder = divmod(total_seconds, 24 * 60 * 60)
|
|
hours, remainder = divmod(remainder, 60 * 60)
|
|
mins, secs = divmod(remainder, 60)
|
|
|
|
return " ".join(
|
|
f"{v:>3} {label}"
|
|
for v, label in ((days, "days"), (hours, "hrs"), (mins, "mins"))
|
|
if v
|
|
)
|
|
|
|
|
|
def stock_markets() -> list[str]:
|
|
"""Stock markets open and close times."""
|
|
# The trading calendars code is slow, maybe there is a faster way to do this
|
|
# Or we could cache the result
|
|
now = pandas.Timestamp.now(timezone.utc)
|
|
now_local = pandas.Timestamp.now(here)
|
|
markets = [
|
|
("XLON", "London"),
|
|
("XNYS", "US"),
|
|
]
|
|
reply = []
|
|
for code, label in markets:
|
|
cal = exchange_calendars.get_calendar(code)
|
|
|
|
if cal.is_open_on_minute(now_local):
|
|
next_close = cal.next_close(now).tz_convert(here)
|
|
next_close = next_close.replace(minute=round(next_close.minute, -1))
|
|
delta_close = timedelta_display(next_close - now_local)
|
|
|
|
prev_open = cal.previous_open(now).tz_convert(here)
|
|
prev_open = prev_open.replace(minute=round(prev_open.minute, -1))
|
|
delta_open = timedelta_display(now_local - prev_open)
|
|
|
|
msg = (
|
|
f"{label:>6} market opened {delta_open} ago, "
|
|
+ f"closes in {delta_close} ({next_close:%H:%M})"
|
|
)
|
|
else:
|
|
ts = cal.next_open(now)
|
|
ts = ts.replace(minute=round(ts.minute, -1))
|
|
ts = ts.tz_convert(here)
|
|
delta = timedelta_display(ts - now_local)
|
|
msg = f"{label:>6} market opens in {delta}" + (
|
|
f" ({ts:%H:%M})" if (ts - now_local) < timedelta(days=1) else ""
|
|
)
|
|
|
|
reply.append(msg)
|
|
return reply
|
|
|
|
|
|
def get_us_holiday() -> dict[str, date | str]:
|
|
"""Date and name of next US holiday."""
|
|
hols = holidays.UnitedStates(years=[today.year, today.year + 1])
|
|
next_hol = next(x for x in sorted(hols.items()) if x[0] >= today)
|
|
|
|
return {"date": next_hol[0], "title": next_hol[1]}
|
|
|
|
|
|
def get_data() -> dict[str, str | object]:
|
|
"""Get data to display on agenda dashboard."""
|
|
reply = {
|
|
"now": now,
|
|
"gbpusd": get_gbpusd(),
|
|
"next_economist": next_economist(today),
|
|
"bank_holiday": get_next_bank_holiday(),
|
|
"us_holiday": get_us_holiday(),
|
|
"next_uk_general_election": next_uk_general_election,
|
|
"next_us_presidential_election": next_us_presidential_election,
|
|
# "spacex": spacexdata.get_next_spacex_launch(limit=20),
|
|
"stock_markets": stock_markets(),
|
|
"uk_clock_change": get_next_timezone_transition("Europe/London"),
|
|
"us_clock_change": get_next_timezone_transition("America/New_York"),
|
|
"mothers_day": next_uk_mothers_day(today),
|
|
"fathers_day": next_uk_fathers_day(today),
|
|
"uk_financial_year_end": uk_financial_year_end(today),
|
|
"xmas_last_posting_dates": xmas_last_posting_dates,
|
|
"xmas_day": xmas_day(today),
|
|
}
|
|
|
|
return reply
|