Initial commit

This commit is contained in:
Edward Betts 2023-10-02 20:35:30 +01:00
commit 46b2dbf4f3
9 changed files with 660 additions and 0 deletions

298
agenda/__init__.py Normal file
View file

@ -0,0 +1,298 @@
#!/usr/bin/python3
import configparser
import json
import os
import sys
import typing
import warnings
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from time import time as unixtime
import dateutil
import dateutil.parser
import exchange_calendars
import holidays
import pandas
import pytz
import requests
from agenda import spacexdata
# import trading_calendars
warnings.simplefilter(action="ignore", category=FutureWarning)
# bin and recycling information
# birthdays
# conferences
# end of financial year: 5 April
# deadline to file tax return
# Christmas day
# last Christmas posting day
# wedding anniversary
# DONE: markets open/close
# DONE: next bank holiday
# DONE: economist
# could know about the Christmas double issue
# economist technology quarterly
# credit card expiry dates
# morzine ski lifts
# chalet availablity calendar
# sunrise and sunset
# starlink visible
# next installment of the Up TV series: 70 Up in 2026
# end of Amazon Prime
# mother's day - fourth Sunday of Lent
# father's day - third Sunday of June
here = dateutil.tz.tzlocal()
now = datetime.now()
today = now.date()
now_str = now.strftime("%Y-%m-%d_%H:%M")
now_utc = datetime.now(timezone.utc)
data_dir = "/home/edward/lib/data"
do_refresh = len(sys.argv) > 1 and sys.argv[1] == "--refresh"
next_us_presidential_election = date(2024, 11, 5)
next_uk_general_election = date(2024, 5, 2)
uk_lockdown_start = datetime(2020, 3, 23, 20, 30)
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), "config"))
access_key = config.get("exchangerate", "access_key")
def get_next_timezone_transition(tz_name: str) -> datetime:
"""Datetime of the next time the clocks change."""
tz = pytz.timezone(tz_name)
dt = next(t for t in tz._utc_transition_times if t > now)
return typing.cast(datetime, dt)
def get_next_bank_holiday() -> dict[str, date | str]:
"""Date and name of the next UK bank holiday."""
url = "https://www.gov.uk/bank-holidays.json"
filename = os.path.join(data_dir, "bank-holidays.json")
mtime = os.path.getmtime(filename)
if (unixtime() - mtime) > 3600: # one hour
r = requests.get(url)
open(filename, "w").write(r.text)
events = json.load(open(filename))["england-and-wales"]["events"]
next_holiday = None
for event in events:
event_date = datetime.strptime(event["date"], "%Y-%m-%d").date()
if event_date < today:
continue
next_holiday = {"date": event_date, "title": event["title"]}
break
assert next_holiday
return next_holiday
def get_gbpusd() -> Decimal:
"""Get the current value for GBPUSD, with caching."""
fx_dir = os.path.join(data_dir, "fx")
existing_data = os.listdir(fx_dir)
existing = [f for f in existing_data if f.endswith("_GBPUSD.json")]
if existing:
recent_filename = max(existing)
recent = datetime.strptime(recent_filename, "%Y-%m-%d_%H:%M_GBPUSD.json")
delta = now - recent
if not do_refresh and existing and delta < timedelta(hours=6):
full = os.path.join(fx_dir, recent_filename)
data = json.load(open(full), parse_float=Decimal)
if "quotes" in data and "USDGBP" in data["quotes"]:
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
# url = 'https://api.exchangeratesapi.io/latest?base=GBP&symbols=USD'
# url = "https://api.exchangerate.host/latest?base=GBP&symbols=USD"
url = "http://api.exchangerate.host/live"
params = {"currencies": "GBP,USD", "access_key": access_key}
filename = f"{fx_dir}/{now_str}_GBPUSD.json"
r = requests.get(url, params=params)
open(filename, "w").write(r.text)
data = json.loads(r.text, parse_float=Decimal)
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
def next_economist() -> date:
"""Next date that the Economist is published."""
# TODO: handle the Christmas double issue correctly
return today + timedelta((3 - today.weekday()) % 7)
def timedelta_display(delta: timedelta) -> str:
"""Format timedelta as a human readable string."""
total_seconds = int(delta.total_seconds())
days, remainder = divmod(total_seconds, 24 * 60 * 60)
hours, remainder = divmod(remainder, 60 * 60)
mins, secs = divmod(remainder, 60)
return " ".join(
f"{v:>3} {label}"
for v, label in ((days, "days"), (hours, "hrs"), (mins, "mins"))
if v
)
def stock_markets() -> list[str]:
"""Stock markets open and close times."""
# The trading calendars code is slow, maybe there is a faster way to do this
# Or we could cache the result
now = pandas.Timestamp.now(timezone.utc)
now_local = pandas.Timestamp.now(here)
markets = [
("XLON", "London"),
("XNYS", "US"),
]
reply = []
for code, label in markets:
cal = exchange_calendars.get_calendar(code)
if cal.is_open_on_minute(now_local):
next_close = cal.next_close(now).tz_convert(here)
next_close = next_close.replace(minute=round(next_close.minute, -1))
delta_close = timedelta_display(next_close - now_local)
prev_open = cal.previous_open(now).tz_convert(here)
prev_open = prev_open.replace(minute=round(prev_open.minute, -1))
delta_open = timedelta_display(now_local - prev_open)
msg = (
f"{label:>6} market opened {delta_open} ago, "
+ f"closes in {delta_close} ({next_close:%H:%M})"
)
else:
ts = cal.next_open(now)
ts = ts.replace(minute=round(ts.minute, -1))
ts = ts.tz_convert(here)
delta = timedelta_display(ts - now_local)
msg = f"{label:>6} market opens in {delta}" + (
f" ({ts:%H:%M})" if (ts - now_local) < timedelta(days=1) else ""
)
reply.append(msg)
return reply
def get_us_holiday() -> dict[str, date | str]:
"""Date and name of next US holiday."""
hols = holidays.UnitedStates(years=[today.year, today.year + 1])
next_hol = next(x for x in sorted(hols.items()) if x[0] >= today)
return {"date": next_hol[0], "title": next_hol[1]}
def days_since_lockdown_start() -> float:
"""Days since the start of the first UK Covid lockdown in 2020."""
return (now - uk_lockdown_start).total_seconds() / (3600 * 24)
def get_data() -> dict[str, str | object]:
"""Get data to display on agenda dashboard."""
reply = {
"now": now,
"gbpusd": get_gbpusd(),
# "lockdown_days": days_since_lockdown_start(),
"next_economist": next_economist(),
"bank_holiday": get_next_bank_holiday(),
"us_holiday": get_us_holiday(),
"next_uk_general_election": next_uk_general_election,
"next_us_presidential_election": next_us_presidential_election,
# "spacex": spacexdata.get_next_spacex_launch(limit=20),
"stock_markets": stock_markets(),
"uk_clock_change": get_next_timezone_transition("Europe/London"),
"us_clock_change": get_next_timezone_transition("America/New_York"),
}
return reply
def main():
ocado = datetime(2021, 1, 21, 9, 30)
ocado_last_edit = datetime(2021, 1, 20, 22, 10)
# stamp_duty_holiday = date(2021, 3, 31)
gbpusd = get_gbpusd()
spacex = spacexdata.get_next_spacex_launch(limit=20)
bank_holiday = get_next_bank_holiday()
us_holiday = get_us_holiday()
lockdown_days = (now - uk_lockdown_start).total_seconds() / (3600 * 24)
def days_hours(when):
delta = when - (now if when.tzinfo is None else now_utc)
return f"{delta.days:>5,d} days {delta.seconds // 3600:>2.0f} hours"
def days(when):
return " today" if when == today else f"{(when - today).days:>5,d} days"
print()
print(f"Today is {now:%A, %-d %b %Y} GBPUSD: {gbpusd:,.3f}")
print()
print(
f" lockdown: {lockdown_days:.1f} days ({lockdown_days / 7:.2f} weeks) so far"
)
if ocado_last_edit > now:
print(
f" Ocado last edit: {days_hours(ocado_last_edit):19s} {ocado_last_edit:%a, %d %b %H:%M}"
)
if ocado_last_edit > now:
print(f" Ocado delivery: {days_hours(ocado):9s} {ocado:%a, %d %b %H:%M}")
# print(f' sitar: {days_hours(sitar)} {sitar:%a, %d %b %H:%M}')
print(f" The Economist: {days(next_economist()):20s} (Thursday)")
print(
f' UK bank holiday: {days(bank_holiday["date"]):20s} {bank_holiday["date"]:%a, %d %b} {bank_holiday["title"]}'
)
print(
f' US holiday: {days(us_holiday["date"]):20s} {us_holiday["date"]:%a, %d %b} {us_holiday["title"]}'
)
# print(f'stamp duty holiday: {days(stamp_duty_holiday):20s} {stamp_duty_holiday:%a, %d %b}')
print(
f" general election: {days(next_uk_general_election):20s} {next_uk_general_election:%a, %d %b %Y}"
)
print(
f" US pres. election: {days(next_us_presidential_election):20s} {next_us_presidential_election:%a, %d %b %Y}"
)
# print(f' Amazon prime: {days(amazon_prime):18s} {amazon_prime:%a, %d %b}')
# print(f'extend transition: {days_hours(extend_transition_deadline)} {extend_transition_deadline:%a, %d %b %H:%M} deadline to extend the transition period ')
print()
print(" SpaceX launchs")
for launch in spacex:
if launch["when"] > now_utc + timedelta(days=120):
continue
if launch["date_precision"] == "day":
print(
f' {days(launch["when"].date()):>8s} {launch["when"]:%a, %d %b} {launch["name"]} ({launch["rocket"]}, {" + ".join(launch["payloads"])})',
launch["date_precision"],
)
else:
print(
f' {days_hours(launch["when"])} {launch["when"]:%a, %d %b %H:%M} {launch["name"]} ({launch["rocket"]}, {" + ".join(launch["payloads"])})',
launch["date_precision"] or "",
)
print()
for line in stock_markets():
print(line)

139
agenda/spacexdata.py Normal file
View file

@ -0,0 +1,139 @@
import json
import os
import sys
from datetime import datetime, timezone
from typing import Any
import dateutil
import dateutil.parser
import requests
data_dir = "/home/edward/lib/data"
spacex_dir = os.path.join(data_dir, "spacex")
now = datetime.now()
here = dateutil.tz.tzlocal()
do_refresh = len(sys.argv) > 1 and sys.argv[1] == "--refresh"
Launch = dict[str, str | datetime | list[Any]]
def filename_timestamp(filename: str) -> tuple[datetime, str] | None:
"""Get datetime from filename."""
try:
ts = datetime.strptime(filename, "%Y-%m-%d_%H:%M:%S.json")
except ValueError:
return None
return (ts, filename)
def next_spacex_launch_api(limit: int) -> list[Launch]:
"""Get the next upcoming launches from the API."""
filename = os.path.join(spacex_dir, now.strftime("%Y-%m-%d_%H:%M:%S.json"))
url = "https://api.spacexdata.com/v4/launches/upcoming"
params: dict[str, str | int] = dict(tbd="false", limit=limit)
r = requests.get(url, params=params)
open(filename, "w").write(r.text)
launch_list = r.json()
if "error" in launch_list:
print(r.url)
print(launch_list)
raise ValueError
return [
parse_get_next_spacex_launch(launch)
for launch in launch_list
if launch["date_precision"] not in ("month", "quarter", "half")
]
def get_spacex_payloads() -> dict[str, str]:
"""Load SpaceX payloads or refresh if missing."""
filename = os.path.join(spacex_dir, "payloads.json")
if not os.path.exists(filename):
refresh_payloads()
data = json.load(open(filename))
# print(json.dumps(data, indent=2))
for p in data:
if "type" in p:
continue
print(json.dumps(p, indent=2))
return {p["id"]: p["type"] for p in data}
def get_spacex_rockets() -> dict[str, str]:
"""Load mapping of rocket ID to rocket name."""
filename = os.path.join(spacex_dir, "rockets.json")
return {r["id"]: r["name"] for r in json.load(open(filename))}
def refresh_payloads() -> None:
"""Download list of payloads and save."""
url = "https://api.spacexdata.com/v4/payloads"
filename = os.path.join(spacex_dir, "payloads.json")
r = requests.get(url)
open(filename, "w").write(r.text)
rocket_map = get_spacex_rockets()
payloads = get_spacex_payloads()
def parse_get_next_spacex_launch(launch: dict[str, Any]) -> Launch:
global payloads
date_utc = dateutil.parser.parse(launch["date_utc"])
date_utc.replace(tzinfo=timezone.utc)
# TODO: payload
need_refresh = any(p not in payloads for p in launch["payloads"])
if need_refresh:
refresh_payloads()
payloads = get_spacex_payloads()
assert all(p in payloads for p in launch["payloads"])
return {
"rocket": rocket_map[launch["rocket"]],
"name": launch["name"],
"when": date_utc.astimezone(here),
"date_precision": launch["date_precision"],
"payloads": [payloads.get(p, "[unknown payload]") for p in launch["payloads"]],
}
def get_most_recent_existing(existing):
for _, f in existing:
filename = os.path.join(spacex_dir, f)
launch_list = json.load(open(filename))
if "error" in launch_list:
print(launch_list)
continue
return launch_list
def get_next_spacex_launch(limit: int) -> list[Launch]:
filename = os.path.join(data_dir, "next_spacex_launch.json")
spacex_dir = os.path.join(data_dir, "spacex")
existing = [x for x in (filename_timestamp(f) for f in os.listdir(spacex_dir)) if x]
existing.sort(reverse=True)
if do_refresh or not existing or (now - existing[0][0]).seconds > 3600: # one hour
try:
return next_spacex_launch_api(limit=limit)
except ValueError:
print("*** SpaceX next launch error ***")
for _, f in existing:
filename = os.path.join(spacex_dir, f)
launch_list = json.load(open(filename))
if "error" in launch_list:
print(launch_list)
continue
return [
parse_get_next_spacex_launch(launch)
for launch in launch_list
if launch["date_precision"] not in ("month", "quarter", "half")
]