Move timedelta_display() function to agenda.utils

This commit is contained in:
Edward Betts 2024-08-04 09:23:13 +02:00
parent 15c5053e44
commit b9adb3d15e
2 changed files with 22 additions and 20 deletions

View file

@ -3,26 +3,14 @@
from datetime import timedelta, timezone from datetime import timedelta, timezone
import dateutil.tz import dateutil.tz
import exchange_calendars import exchange_calendars # type: ignore
import pandas import pandas # type: ignore
from . import utils
here = dateutil.tz.tzlocal() here = dateutil.tz.tzlocal()
def timedelta_display(delta: timedelta) -> str:
"""Format timedelta as a human readable string."""
total_seconds = int(delta.total_seconds())
days, remainder = divmod(total_seconds, 24 * 60 * 60)
hours, remainder = divmod(remainder, 60 * 60)
mins, secs = divmod(remainder, 60)
return " ".join(
f"{v:>3} {label}"
for v, label in ((days, "days"), (hours, "hrs"), (mins, "mins"))
if v
)
def open_and_close() -> list[str]: def open_and_close() -> list[str]:
"""Stock markets open and close times.""" """Stock markets open and close times."""
# The trading calendars code is slow, maybe there is a faster way to do this # The trading calendars code is slow, maybe there is a faster way to do this
@ -40,11 +28,11 @@ def open_and_close() -> list[str]:
if cal.is_open_on_minute(now_local): if cal.is_open_on_minute(now_local):
next_close = cal.next_close(now).tz_convert(here) next_close = cal.next_close(now).tz_convert(here)
next_close = next_close.replace(minute=round(next_close.minute, -1)) next_close = next_close.replace(minute=round(next_close.minute, -1))
delta_close = timedelta_display(next_close - now_local) delta_close = utils.timedelta_display(next_close - now_local)
prev_open = cal.previous_open(now).tz_convert(here) prev_open = cal.previous_open(now).tz_convert(here)
prev_open = prev_open.replace(minute=round(prev_open.minute, -1)) prev_open = prev_open.replace(minute=round(prev_open.minute, -1))
delta_open = timedelta_display(now_local - prev_open) delta_open = utils.timedelta_display(now_local - prev_open)
msg = ( msg = (
f"{label:>6} market opened {delta_open} ago, " f"{label:>6} market opened {delta_open} ago, "
@ -54,7 +42,7 @@ def open_and_close() -> list[str]:
ts = cal.next_open(now) ts = cal.next_open(now)
ts = ts.replace(minute=round(ts.minute, -1)) ts = ts.replace(minute=round(ts.minute, -1))
ts = ts.tz_convert(here) ts = ts.tz_convert(here)
delta = timedelta_display(ts - now_local) delta = utils.timedelta_display(ts - now_local)
msg = f"{label:>6} market opens in {delta}" + ( msg = f"{label:>6} market opens in {delta}" + (
f" ({ts:%H:%M})" if (ts - now_local) < timedelta(days=1) else "" f" ({ts:%H:%M})" if (ts - now_local) < timedelta(days=1) else ""
) )

View file

@ -2,7 +2,7 @@
import os import os
import typing import typing
from datetime import date, datetime, timezone from datetime import date, datetime, timedelta, timezone
from time import time from time import time
@ -28,6 +28,20 @@ def as_datetime(d: datetime | date) -> datetime:
raise TypeError(f"Unsupported type: {type(d)}") raise TypeError(f"Unsupported type: {type(d)}")
def timedelta_display(delta: timedelta) -> str:
"""Format timedelta as a human readable string."""
total_seconds = int(delta.total_seconds())
days, remainder = divmod(total_seconds, 24 * 60 * 60)
hours, remainder = divmod(remainder, 60 * 60)
mins, secs = divmod(remainder, 60)
return " ".join(
f"{v} {label}"
for v, label in ((days, "days"), (hours, "hrs"), (mins, "mins"))
if v
)
def human_readable_delta(future_date: date) -> str | None: def human_readable_delta(future_date: date) -> str | None:
""" """
Calculate the human-readable time delta for a given future date. Calculate the human-readable time delta for a given future date.