agenda/agenda/utils.py
2025-11-03 19:39:28 +00:00

135 lines
3.9 KiB
Python

"""Utility functions."""
import os
import typing
from datetime import date, datetime, time, timedelta, timezone
from time import time as unixtime
from zoneinfo import ZoneInfo
from .types import StrDict
def as_date(d: datetime | date) -> date:
"""Convert datetime to date."""
match d:
case datetime():
return d.date()
case date():
return d
case _:
raise TypeError(f"Unsupported type: {type(d)}")
def as_datetime(d: datetime | date) -> datetime:
"""Date/time of event."""
match d:
case datetime():
return d
case date():
return datetime.combine(d, datetime.min.time()).replace(tzinfo=timezone.utc)
case _:
raise TypeError(f"Unsupported type: {type(d)}")
def timedelta_display(delta: timedelta) -> str:
"""Format timedelta as a human readable string."""
total_seconds = int(delta.total_seconds())
days, remainder = divmod(total_seconds, 24 * 60 * 60)
hours, remainder = divmod(remainder, 60 * 60)
mins, secs = divmod(remainder, 60)
return " ".join(
f"{v} {label}"
for v, label in ((days, "days"), (hours, "hrs"), (mins, "mins"))
if v
)
def plural(value: int, unit: str) -> str:
"""Value + unit with unit written as singular or plural as appropriate."""
return f"{value} {unit}{'s' if value > 1 else ''}"
def human_readable_delta(future_date: date) -> str | None:
"""
Calculate the human-readable time delta for a given future date.
Args:
future_date (date): The future date as a datetime.date object.
Returns:
str: Human-readable time delta.
"""
# Ensure the input is a future date
if future_date <= date.today():
return None
# Calculate the delta
delta = future_date - date.today()
# Convert delta to a more human-readable format
months, days = divmod(delta.days, 30)
weeks, days = divmod(days, 7)
# Formatting the output
parts = [
plural(value, unit)
for value, unit in ((months, "month"), (weeks, "week"), (days, "day"))
if value > 0
]
return " ".join(parts) if parts else None
def filename_timestamp(filename: str, ext: str) -> tuple[datetime, str] | None:
"""Get datetime from filename."""
try:
ts = datetime.strptime(filename, f"%Y-%m-%d_%H:%M:%S.{ext}")
except ValueError:
return None
return (ts, filename)
def get_most_recent_file(directory: str, ext: str) -> str | None:
"""Get most recent file from directory."""
existing = [
x for x in (filename_timestamp(f, ext) for f in os.listdir(directory)) if x
]
if not existing:
return None
existing.sort(reverse=True)
return os.path.join(directory, existing[0][1])
def make_waste_dir(data_dir: str) -> None:
"""Make waste dir if missing."""
waste_dir = os.path.join(data_dir, "waste")
if not os.path.exists(waste_dir):
os.mkdir(waste_dir)
async def time_function(
name: str,
func: typing.Callable[..., typing.Coroutine[typing.Any, typing.Any, typing.Any]],
*args: typing.Any,
**kwargs: typing.Any,
) -> tuple[str, typing.Any, float, Exception | None]:
"""Time the execution of an asynchronous function."""
start_time, result, exception = unixtime(), None, None
try:
result = await func(*args, **kwargs)
except Exception as e:
exception = e
end_time = unixtime()
return name, result, end_time - start_time, exception
def depart_datetime(item: StrDict) -> datetime:
"""Return a datetime for this travel item.
If the travel item already has a datetime return that, otherwise if the
departure time is just a date return midnight UTC for that date.
"""
depart = item["depart"]
if isinstance(depart, datetime):
return depart
return datetime.combine(depart, time.min).replace(tzinfo=ZoneInfo("UTC"))