agenda/agenda/utils.py

119 lines
3.3 KiB
Python

"""Utility functions."""
import os
import typing
from datetime import date, datetime, timedelta, timezone
from time import time
def as_date(d: datetime | date) -> date:
"""Convert datetime to date."""
match d:
case datetime():
return d.date()
case date():
return d
case _:
raise TypeError(f"Unsupported type: {type(d)}")
def as_datetime(d: datetime | date) -> datetime:
"""Date/time of event."""
match d:
case datetime():
return d
case date():
return datetime.combine(d, datetime.min.time()).replace(tzinfo=timezone.utc)
case _:
raise TypeError(f"Unsupported type: {type(d)}")
def timedelta_display(delta: timedelta) -> str:
    """Render *delta* as space-separated day / hour / minute counts.

    The delta is truncated to whole seconds first. Units whose count is zero
    are left out and leftover seconds are never shown, so a delta shorter
    than one minute produces an empty string. Labels are always plural
    ("1 days").
    """
    remaining = int(delta.total_seconds())
    pieces = []
    # Peel off the largest unit first; the remainder feeds the next unit.
    for unit_seconds, label in ((24 * 60 * 60, "days"), (60 * 60, "hrs"), (60, "mins")):
        count, remaining = divmod(remaining, unit_seconds)
        if count:
            pieces.append(f"{count} {label}")
    return " ".join(pieces)
def human_readable_delta(future_date: date) -> str | None:
"""
Calculate the human-readable time delta for a given future date.
Args:
future_date (date): The future date as a datetime.date object.
Returns:
str: Human-readable time delta.
"""
# Ensure the input is a future date
if future_date <= date.today():
return None
# Calculate the delta
delta = future_date - date.today()
# Convert delta to a more human-readable format
months, days = divmod(delta.days, 30)
weeks, days = divmod(days, 7)
# Formatting the output
parts = []
if months > 0:
parts.append(f"{months} month{'s' if months > 1 else ''}")
if weeks > 0:
parts.append(f"{weeks} week{'s' if weeks > 1 else ''}")
if days > 0:
parts.append(f"{days} day{'s' if days > 1 else ''}")
return " ".join(parts) if parts else None
def filename_timestamp(filename: str, ext: str) -> tuple[datetime, str] | None:
"""Get datetime from filename."""
try:
ts = datetime.strptime(filename, f"%Y-%m-%d_%H:%M:%S.{ext}")
except ValueError:
return None
return (ts, filename)
def get_most_recent_file(directory: str, ext: str) -> str | None:
    """Return the path of the newest timestamp-named file in *directory*.

    Only entries whose names are accepted by ``filename_timestamp`` for the
    given *ext* are considered. Returns None when no entry qualifies.
    """
    stamped = []
    for entry in os.listdir(directory):
        parsed = filename_timestamp(entry, ext)
        if parsed:
            stamped.append(parsed)

    if stamped:
        # Tuples order by timestamp first, then by filename.
        _, newest = max(stamped)
        return os.path.join(directory, newest)
    return None
def make_waste_dir(data_dir: str) -> None:
    """Make waste dir if missing.

    Creates ``<data_dir>/waste``. Does nothing when that path already exists.

    Args:
        data_dir: Existing parent directory in which to create ``waste``.

    Raises:
        FileNotFoundError: If *data_dir* itself does not exist.
    """
    waste_dir = os.path.join(data_dir, "waste")
    # Attempt the mkdir and tolerate "already there" instead of checking
    # first: a check-then-create sequence can fail if another process
    # creates the directory between the two steps.
    try:
        os.mkdir(waste_dir)
    except FileExistsError:
        pass
async def time_function(
name: str,
func: typing.Callable[..., typing.Coroutine[typing.Any, typing.Any, typing.Any]],
*args: typing.Any,
**kwargs: typing.Any,
) -> tuple[str, typing.Any, float, Exception | None]:
"""Time the execution of an asynchronous function."""
start_time, result, exception = time(), None, None
try:
result = await func(*args, **kwargs)
except Exception as e:
exception = e
end_time = time()
return name, result, end_time - start_time, exception