Split stock market open/close code into own file

This commit is contained in:
Edward Betts 2023-11-06 09:24:17 +00:00
parent 2ea79c12d9
commit 74e135e9c3
2 changed files with 77 additions and 60 deletions

View file

@ -5,17 +5,15 @@ import operator
import os import os
import typing import typing
import warnings import warnings
from datetime import date, datetime, time, timedelta, timezone from datetime import date, datetime, time, timedelta
from time import time as unixtime from time import time as unixtime
import dateutil import dateutil
import dateutil.parser import dateutil.parser
import dateutil.tz import dateutil.tz
import exchange_calendars
import holidays import holidays
import httpx import httpx
import lxml import lxml
import pandas
import pytz import pytz
import yaml import yaml
from dateutil.easter import easter from dateutil.easter import easter
@ -29,6 +27,8 @@ from . import (
fx, fx,
gwr, gwr,
markets, markets,
stock_market,
subscription,
sun, sun,
thespacedevs, thespacedevs,
travel, travel,
@ -158,60 +158,6 @@ def uk_financial_year_end(input_date: date) -> date:
return uk_financial_year_end return uk_financial_year_end
def timedelta_display(delta: timedelta) -> str:
"""Format timedelta as a human readable string."""
total_seconds = int(delta.total_seconds())
days, remainder = divmod(total_seconds, 24 * 60 * 60)
hours, remainder = divmod(remainder, 60 * 60)
mins, secs = divmod(remainder, 60)
return " ".join(
f"{v:>3} {label}"
for v, label in ((days, "days"), (hours, "hrs"), (mins, "mins"))
if v
)
def stock_markets() -> list[str]:
"""Stock markets open and close times."""
# The trading calendars code is slow, maybe there is a faster way to do this
# Or we could cache the result
now = pandas.Timestamp.now(timezone.utc)
now_local = pandas.Timestamp.now(here)
markets = [
("XLON", "London"),
("XNYS", "US"),
]
reply = []
for code, label in markets:
cal = exchange_calendars.get_calendar(code)
if cal.is_open_on_minute(now_local):
next_close = cal.next_close(now).tz_convert(here)
next_close = next_close.replace(minute=round(next_close.minute, -1))
delta_close = timedelta_display(next_close - now_local)
prev_open = cal.previous_open(now).tz_convert(here)
prev_open = prev_open.replace(minute=round(prev_open.minute, -1))
delta_open = timedelta_display(now_local - prev_open)
msg = (
f"{label:>6} market opened {delta_open} ago, "
+ f"closes in {delta_close} ({next_close:%H:%M})"
)
else:
ts = cal.next_open(now)
ts = ts.replace(minute=round(ts.minute, -1))
ts = ts.tz_convert(here)
delta = timedelta_display(ts - now_local)
msg = f"{label:>6} market opens in {delta}" + (
f" ({ts:%H:%M})" if (ts - now_local) < timedelta(days=1) else ""
)
reply.append(msg)
return reply
def get_us_holidays(start_date: date, end_date: date) -> list[Event]: def get_us_holidays(start_date: date, end_date: date) -> list[Event]:
"""Date and name of next US holiday.""" """Date and name of next US holiday."""
found: list[Event] = [] found: list[Event] = []
@ -272,7 +218,7 @@ def as_date(d: date | datetime) -> date:
def get_accommodation(filepath: str) -> list[Event]: def get_accommodation(filepath: str) -> list[Event]:
"""Get birthdays from config.""" """Get accomodation from config."""
with open(filepath) as f: with open(filepath) as f:
return [ return [
Event( Event(
@ -343,7 +289,7 @@ async def get_data(now: datetime) -> typing.Mapping[str, str | object]:
"bank_holiday": bank_holiday, "bank_holiday": bank_holiday,
"us_holiday": get_us_holidays(last_year, next_year), "us_holiday": get_us_holidays(last_year, next_year),
"next_us_presidential_election": next_us_presidential_election, "next_us_presidential_election": next_us_presidential_election,
"stock_markets": stock_markets(), "stock_markets": stock_market.open_and_close(),
"uk_clock_change": timezone_transition( "uk_clock_change": timezone_transition(
minus_365, plus_365, "uk_clock_change", "Europe/London" minus_365, plus_365, "uk_clock_change", "Europe/London"
), ),
@ -381,7 +327,13 @@ async def get_data(now: datetime) -> typing.Mapping[str, str | object]:
reply["sunset"] = sun.sunset(observer) reply["sunset"] = sun.sunset(observer)
for key, value in xmas_last_posting_dates.items(): for key, value in xmas_last_posting_dates.items():
events.append(Event(name=f"xmas_last_{key}", date=value)) events.append(
Event(
name=f"xmas_last_{key}",
date=value,
url="https://www.postoffice.co.uk/last-posting-dates",
)
)
my_data = config["data"]["personal-data"] my_data = config["data"]["personal-data"]
events += birthday.get_birthdays(last_year, os.path.join(my_data, "entities.yaml")) events += birthday.get_birthdays(last_year, os.path.join(my_data, "entities.yaml"))
@ -390,6 +342,8 @@ async def get_data(now: datetime) -> typing.Mapping[str, str | object]:
events += conference.get_list(os.path.join(my_data, "conferences.yaml")) events += conference.get_list(os.path.join(my_data, "conferences.yaml"))
events += backwell_bins + bristol_bins events += backwell_bins + bristol_bins
events += subscription.get_events(os.path.join(my_data, "subscriptions.yaml"))
next_up_series = Event( next_up_series = Event(
date=date(2026, 6, 1), date=date(2026, 6, 1),
title="70 Up", title="70 Up",

63
agenda/stock_market.py Normal file
View file

@ -0,0 +1,63 @@
"""Stock market open and close times."""
from datetime import timedelta, timezone
import dateutil.tz
import exchange_calendars
import pandas
here = dateutil.tz.tzlocal()
def timedelta_display(delta: timedelta) -> str:
"""Format timedelta as a human readable string."""
total_seconds = int(delta.total_seconds())
days, remainder = divmod(total_seconds, 24 * 60 * 60)
hours, remainder = divmod(remainder, 60 * 60)
mins, secs = divmod(remainder, 60)
return " ".join(
f"{v:>3} {label}"
for v, label in ((days, "days"), (hours, "hrs"), (mins, "mins"))
if v
)
def open_and_close() -> list[str]:
"""Stock markets open and close times."""
# The trading calendars code is slow, maybe there is a faster way to do this
# Or we could cache the result
now = pandas.Timestamp.now(timezone.utc)
now_local = pandas.Timestamp.now(here)
markets = [
("XLON", "London"),
("XNYS", "US"),
]
reply = []
for code, label in markets:
cal = exchange_calendars.get_calendar(code)
if cal.is_open_on_minute(now_local):
next_close = cal.next_close(now).tz_convert(here)
next_close = next_close.replace(minute=round(next_close.minute, -1))
delta_close = timedelta_display(next_close - now_local)
prev_open = cal.previous_open(now).tz_convert(here)
prev_open = prev_open.replace(minute=round(prev_open.minute, -1))
delta_open = timedelta_display(now_local - prev_open)
msg = (
f"{label:>6} market opened {delta_open} ago, "
+ f"closes in {delta_close} ({next_close:%H:%M})"
)
else:
ts = cal.next_open(now)
ts = ts.replace(minute=round(ts.minute, -1))
ts = ts.tz_convert(here)
delta = timedelta_display(ts - now_local)
msg = f"{label:>6} market opens in {delta}" + (
f" ({ts:%H:%M})" if (ts - now_local) < timedelta(days=1) else ""
)
reply.append(msg)
return reply