121 lines
3.4 KiB
Python
121 lines
3.4 KiB
Python
"""Utility functions."""
|
|
|
|
import os
|
|
import typing
|
|
from datetime import date, datetime, timedelta, timezone
|
|
from time import time
|
|
|
|
|
|
def as_date(d: datetime | date) -> date:
|
|
"""Convert datetime to date."""
|
|
match d:
|
|
case datetime():
|
|
return d.date()
|
|
case date():
|
|
return d
|
|
case _:
|
|
raise TypeError(f"Unsupported type: {type(d)}")
|
|
|
|
|
|
def as_datetime(d: datetime | date) -> datetime:
|
|
"""Date/time of event."""
|
|
match d:
|
|
case datetime():
|
|
return d
|
|
case date():
|
|
return datetime.combine(d, datetime.min.time()).replace(tzinfo=timezone.utc)
|
|
case _:
|
|
raise TypeError(f"Unsupported type: {type(d)}")
|
|
|
|
|
|
def timedelta_display(delta: timedelta) -> str:
|
|
"""Format timedelta as a human readable string."""
|
|
total_seconds = int(delta.total_seconds())
|
|
days, remainder = divmod(total_seconds, 24 * 60 * 60)
|
|
hours, remainder = divmod(remainder, 60 * 60)
|
|
mins, secs = divmod(remainder, 60)
|
|
|
|
return " ".join(
|
|
f"{v} {label}"
|
|
for v, label in ((days, "days"), (hours, "hrs"), (mins, "mins"))
|
|
if v
|
|
)
|
|
|
|
|
|
def plural(value: int, unit: str) -> str:
|
|
"""Value + unit with unit written as singular or plural as appropriate."""
|
|
return f"{value} {unit}{'s' if value > 1 else ''}"
|
|
|
|
|
|
def human_readable_delta(future_date: date) -> str | None:
|
|
"""
|
|
Calculate the human-readable time delta for a given future date.
|
|
|
|
Args:
|
|
future_date (date): The future date as a datetime.date object.
|
|
|
|
Returns:
|
|
str: Human-readable time delta.
|
|
"""
|
|
# Ensure the input is a future date
|
|
if future_date <= date.today():
|
|
return None
|
|
|
|
# Calculate the delta
|
|
delta = future_date - date.today()
|
|
|
|
# Convert delta to a more human-readable format
|
|
months, days = divmod(delta.days, 30)
|
|
weeks, days = divmod(days, 7)
|
|
|
|
# Formatting the output
|
|
parts = [
|
|
plural(value, unit)
|
|
for value, unit in ((months, "month"), (weeks, "week"), (days, "days"))
|
|
if value > 0
|
|
]
|
|
return " ".join(parts) if parts else None
|
|
|
|
|
|
def filename_timestamp(filename: str, ext: str) -> tuple[datetime, str] | None:
|
|
"""Get datetime from filename."""
|
|
try:
|
|
ts = datetime.strptime(filename, f"%Y-%m-%d_%H:%M:%S.{ext}")
|
|
except ValueError:
|
|
return None
|
|
return (ts, filename)
|
|
|
|
|
|
def get_most_recent_file(directory: str, ext: str) -> str | None:
|
|
"""Get most recent file from directory."""
|
|
existing = [
|
|
x for x in (filename_timestamp(f, ext) for f in os.listdir(directory)) if x
|
|
]
|
|
if not existing:
|
|
return None
|
|
existing.sort(reverse=True)
|
|
return os.path.join(directory, existing[0][1])
|
|
|
|
|
|
def make_waste_dir(data_dir: str) -> None:
|
|
"""Make waste dir if missing."""
|
|
waste_dir = os.path.join(data_dir, "waste")
|
|
if not os.path.exists(waste_dir):
|
|
os.mkdir(waste_dir)
|
|
|
|
|
|
async def time_function(
|
|
name: str,
|
|
func: typing.Callable[..., typing.Coroutine[typing.Any, typing.Any, typing.Any]],
|
|
*args: typing.Any,
|
|
**kwargs: typing.Any,
|
|
) -> tuple[str, typing.Any, float, Exception | None]:
|
|
"""Time the execution of an asynchronous function."""
|
|
start_time, result, exception = time(), None, None
|
|
try:
|
|
result = await func(*args, **kwargs)
|
|
except Exception as e:
|
|
exception = e
|
|
end_time = time()
|
|
return name, result, end_time - start_time, exception
|