Directory rename

This commit is contained in:
Edward Betts 2025-11-15 19:00:09 +00:00
parent 74ec79ad9e
commit 9fbe130841
8 changed files with 1 additions and 1 deletions

View file

@ -0,0 +1,59 @@
"""Package initialization for the alt text generator app."""
from pathlib import Path
from flask import Flask
from .cache import AltTextCache
from .config import load_settings
from .immich import ImmichClient
from .openai_client import AltTextGenerator
from .mastodon import MastodonClient
def create_app() -> Flask:
"""Create and configure the Flask application."""
package_path = Path(__file__).resolve().parent
project_root = package_path.parent
app = Flask(
__name__,
template_folder=str(project_root / "templates"),
static_folder=str(project_root / "static"),
)
settings = load_settings()
app.config.update(settings)
secret_key = app.config.get("SECRET_KEY") or "dev-secret-key"
app.config["SECRET_KEY"] = secret_key
db_path = app.config.get("ALT_TEXT_DB")
if not db_path:
db_path = str(Path(app.instance_path) / "alt_text_cache.db")
app.config["ALT_TEXT_DB"] = db_path
Path(app.instance_path).mkdir(parents=True, exist_ok=True)
app.immich_client = ImmichClient(
base_url=app.config["IMMICH_API_URL"],
api_key=app.config["IMMICH_API_KEY"],
)
app.alt_text_cache = AltTextCache(db_path)
app.alt_text_generator = AltTextGenerator(
api_key=app.config["OPENAI_API_KEY"],
model=app.config.get("OPENAI_MODEL", "gpt-4o-mini"),
)
mastodon_token = app.config.get("MASTODON_ACCESS_TOKEN")
if mastodon_token:
app.mastodon_client = MastodonClient(
base_url=app.config.get("MASTODON_BASE_URL", "http://localhost:3000"),
access_token=mastodon_token,
)
else:
app.mastodon_client = None
from . import routes # pragma: no cover
app.register_blueprint(routes.bp)
return app

View file

@ -0,0 +1,59 @@
"""SQLite-backed cache for generated alt text."""
from __future__ import annotations
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
class AltTextCache:
"""Minimal cache with a SQLite backend."""
def __init__(self, db_path: str) -> None:
self.db_path = db_path
self._ensure_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def _ensure_db(self) -> None:
path = Path(self.db_path)
path.parent.mkdir(parents=True, exist_ok=True)
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS alt_text (
asset_id TEXT PRIMARY KEY,
alt_text TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
conn.commit()
def get(self, asset_id: str) -> Optional[str]:
with self._connect() as conn:
row = conn.execute(
"SELECT alt_text FROM alt_text WHERE asset_id = ?",
(asset_id,),
).fetchone()
return row[0] if row else None
def set(self, asset_id: str, alt_text: str) -> None:
timestamp = datetime.now(timezone.utc).isoformat()
with self._connect() as conn:
conn.execute(
"""
INSERT INTO alt_text(asset_id, alt_text, updated_at)
VALUES(?, ?, ?)
ON CONFLICT(asset_id)
DO UPDATE SET alt_text = excluded.alt_text,
updated_at = excluded.updated_at
""",
(asset_id, alt_text, timestamp),
)
conn.commit()

View file

@ -0,0 +1,46 @@
"""Application configuration helpers."""
from __future__ import annotations
import os
from typing import Dict
from dotenv import load_dotenv
DEFAULT_IMMICH_URL = "https://photos.4angle.com/"
class ConfigError(RuntimeError):
"""Raised when critical configuration is missing."""
def load_settings() -> Dict[str, str]:
"""Load configuration from environment variables and ``api_keys``."""
load_dotenv()
settings: Dict[str, str] = {
"IMMICH_API_URL": os.getenv("IMMICH_API_URL", DEFAULT_IMMICH_URL).rstrip("/"),
"IMMICH_API_KEY": os.getenv("IMMICH_API_KEY", ""),
"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""),
"RECENT_DAYS": int(os.getenv("RECENT_DAYS", "3")),
"OPENAI_MODEL": os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
"ALT_TEXT_DB": os.getenv("ALT_TEXT_DB", ""),
"SECRET_KEY": os.getenv("SECRET_KEY", ""),
"MASTODON_BASE_URL": os.getenv(
"MASTODON_BASE_URL", "http://localhost:3000"
).rstrip("/"),
"MASTODON_ACCESS_TOKEN": os.getenv("MASTODON_ACCESS_TOKEN", ""),
"MASTODON_CLIENT_KEY": os.getenv("MASTODON_CLIENT_KEY", ""),
"MASTODON_CLIENT_SECRET": os.getenv("MASTODON_CLIENT_SECRET", ""),
}
missing = [
key for key in ("IMMICH_API_KEY", "OPENAI_API_KEY") if not settings.get(key)
]
if missing:
raise ConfigError(
"Missing required configuration values: " + ", ".join(missing)
)
return settings

214
station_announcer/immich.py Normal file
View file

@ -0,0 +1,214 @@
"""Client helpers for talking to Immich."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
import requests
class ImmichError(RuntimeError):
"""Raised when the Immich API is unavailable or returns an error."""
@dataclass
class ImmichAsset:
"""Subset of metadata needed by the UI."""
id: str
file_name: str
captured_at: Optional[str]
thumbnail_url: str
preview_url: str
original_url: str
web_url: str
latitude: Optional[float]
longitude: Optional[float]
location_label: Optional[str]
class ImmichClient:
"""Lightweight wrapper around the Immich REST API."""
def __init__(self, base_url: str, api_key: str) -> None:
if not base_url:
raise ValueError("base_url is required")
if not api_key:
raise ValueError("api_key is required")
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update(
{
"x-api-key": api_key,
"accept": "application/json",
}
)
def _request(self, method: str, path: str, **kwargs: Any) -> Any:
url = f"{self.base_url}{path}"
timeout = kwargs.pop("timeout", 15)
try:
response = self.session.request(method, url, timeout=timeout, **kwargs)
response.raise_for_status()
if response.content:
return response.json()
return {}
except (
requests.RequestException
) as exc: # pragma: no cover - network failure path
raise ImmichError(str(exc)) from exc
def _parse_coordinate(self, value: Any) -> Optional[float]:
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
try:
return float(value)
except (TypeError, ValueError):
return None
def _build_location_label(self, *sources: Dict[str, Any]) -> Optional[str]:
parts: List[str] = []
for source in sources:
if not source:
continue
for key in ("city", "state", "country"):
value = source.get(key)
if value and value not in parts:
parts.append(value)
return ", ".join(parts) if parts else None
def _build_asset(self, asset_data: Dict[str, Any]) -> ImmichAsset:
asset_id = asset_data.get("id")
if not asset_id:
raise ImmichError("Asset payload missing id")
file_name = (
asset_data.get("originalFileName") or asset_data.get("fileName") or "Photo"
)
captured_at = (
asset_data.get("fileCreatedAt")
or asset_data.get("createdAt")
or asset_data.get("exifInfo", {}).get("dateTimeOriginal")
)
exif_info = asset_data.get("exifInfo") or {}
position = asset_data.get("position") or asset_data.get("geolocation") or {}
latitude = self._parse_coordinate(
exif_info.get("latitude")
or exif_info.get("gpsLatitude")
or position.get("latitude")
)
longitude = self._parse_coordinate(
exif_info.get("longitude")
or exif_info.get("gpsLongitude")
or position.get("longitude")
)
location_label = self._build_location_label(exif_info, position)
thumbnail_url = (
f"{self.base_url}/api/assets/{asset_id}/thumbnail?size=thumbnail"
)
preview_url = f"{self.base_url}/api/assets/{asset_id}/thumbnail?size=preview"
original_url = f"{self.base_url}/api/assets/{asset_id}/original"
web_url = f"{self.base_url}/photos/{asset_id}"
return ImmichAsset(
id=asset_id,
file_name=file_name,
captured_at=captured_at,
thumbnail_url=thumbnail_url,
preview_url=preview_url,
original_url=original_url,
web_url=web_url,
latitude=latitude,
longitude=longitude,
location_label=location_label,
)
def get_recent_assets(self, days: int = 3, limit: int = 200) -> List[ImmichAsset]:
"""Fetch assets created in the last ``days`` days."""
since = datetime.now(timezone.utc) - timedelta(days=days)
since_iso = since.isoformat().replace("+00:00", "Z")
payload = {
"size": limit,
"page": 1,
"orderBy": "takenAt",
"orderDirection": "DESC",
"metadata": {
"types": ["IMAGE"],
"takenAfter": since_iso,
},
}
data = self._request("POST", "/api/search/metadata", json=payload)
def _normalize_items(value: Any) -> List[Any]:
if isinstance(value, list):
return value
if isinstance(value, dict):
for key in ("items", "results", "data", "assets"):
nested = value.get(key)
if isinstance(nested, list):
return nested
return []
items: List[Any] = []
for key in ("items", "assets", "results", "data"):
candidate = _normalize_items(data.get(key))
if candidate:
items = candidate
break
assets: List[ImmichAsset] = []
for item in items:
asset_payload: Optional[Any] = None
if isinstance(item, dict):
asset_payload = item.get("asset", item)
if isinstance(asset_payload, str):
asset_payload = {"id": asset_payload}
elif isinstance(item, str):
asset_payload = {"id": item}
if isinstance(asset_payload, dict):
# Some responses only send the id; fetch missing metadata.
if set(asset_payload.keys()) == {"id"}:
assets.append(self.get_asset(asset_payload["id"]))
else:
assets.append(self._build_asset(asset_payload))
return assets
def get_asset(self, asset_id: str) -> ImmichAsset:
"""Fetch a single asset."""
data = self._request("GET", f"/api/assets/{asset_id}")
return self._build_asset(data)
def fetch_asset_content(self, asset_id: str, variant: str) -> Tuple[bytes, str]:
"""Download binary image content for the requested variant."""
variant = variant.lower()
if variant == "thumbnail":
path = f"/api/assets/{asset_id}/thumbnail"
params = {"size": "thumbnail"}
elif variant == "preview":
path = f"/api/assets/{asset_id}/thumbnail"
params = {"size": "preview"}
elif variant == "original":
path = f"/api/assets/{asset_id}/original"
params = None
else:
raise ImmichError(f"Unsupported asset variant: {variant}")
url = f"{self.base_url}{path}"
try:
response = self.session.get(url, params=params, timeout=30)
response.raise_for_status()
except requests.RequestException as exc: # pragma: no cover
raise ImmichError(str(exc)) from exc
mime_type = response.headers.get("Content-Type", "application/octet-stream")
return response.content, mime_type

View file

@ -0,0 +1,117 @@
"""Client helpers for posting to Mastodon."""
from __future__ import annotations
from typing import Any, Dict, Optional, Sequence
import requests
class MastodonError(RuntimeError):
"""Raised when the Mastodon API indicates a failure."""
class MastodonClient:
"""Minimal Mastodon API wrapper for uploading media and posting statuses."""
def __init__(self, base_url: str, access_token: str) -> None:
if not base_url:
raise ValueError("base_url is required")
if not access_token:
raise ValueError("access_token is required")
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update(
{
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"User-Agent": "ImmichAltTextHelper/0.1 (+https://photos.4angle.com)",
}
)
self._account_id: Optional[str] = None
def _raise_for_error(self, response: requests.Response) -> None:
try:
payload = response.json()
except Exception: # pragma: no cover - fallback path
payload = response.text
message = f"{response.status_code} {response.reason}: {payload}"
raise MastodonError(message)
def _request(self, method: str, path: str, **kwargs: Any) -> Dict[str, Any]:
url = f"{self.base_url}{path}"
try:
response = self.session.request(method, url, timeout=30, **kwargs)
except requests.RequestException as exc: # pragma: no cover - network failure
raise MastodonError(str(exc)) from exc
if not response.ok:
self._raise_for_error(response)
if response.content:
return response.json()
return {}
def upload_media(
self, filename: str, data: bytes, mime_type: str, alt_text: str
) -> str:
files = {
"file": (
filename or "photo.jpg",
data,
mime_type or "application/octet-stream",
)
}
form = {}
if alt_text:
form["description"] = alt_text
payload = self._request("POST", "/api/v2/media", files=files, data=form)
media_id = payload.get("id")
if not media_id:
raise MastodonError("Mastodon response missing media id")
return media_id
def create_status(
self, text: str, media_ids: Sequence[str], in_reply_to_id: Optional[str] = None
) -> Dict[str, str]:
if not text.strip():
raise MastodonError("Post text cannot be empty")
payload = {
"status": text,
"language": "en",
"media_ids": list(media_ids),
}
if in_reply_to_id:
payload["in_reply_to_id"] = in_reply_to_id
data = self._request("POST", "/api/v1/statuses", json=payload)
status_id = data.get("id")
status_url = data.get("url")
if not status_id:
raise MastodonError("Mastodon response missing status id")
return {"id": status_id, "url": status_url}
def _get_account_id(self) -> str:
if not self._account_id:
data = self._request("GET", "/api/v1/accounts/verify_credentials")
account_id = data.get("id")
if not account_id:
raise MastodonError("Unable to determine account id")
self._account_id = str(account_id)
return self._account_id
def get_latest_status(self) -> Optional[Dict[str, Any]]:
account_id = self._get_account_id()
params = {"limit": 1, "exclude_reblogs": True}
items = self._request(
"GET", f"/api/v1/accounts/{account_id}/statuses", params=params
)
if isinstance(items, list) and items:
status = items[0]
return {
"id": status.get("id"),
"content": status.get("content"),
"created_at": status.get("created_at"),
"url": status.get("url"),
}
return None

View file

@ -0,0 +1,142 @@
"""Thin wrapper around the OpenAI API for generating alt text."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
import requests
class OpenAIClientError(RuntimeError):
"""Raised when the OpenAI API cannot fulfill a request."""
class AltTextGenerationError(OpenAIClientError):
"""Raised when the OpenAI API cannot generate alt text."""
class TextImprovementError(OpenAIClientError):
"""Raised when the OpenAI API cannot improve post text."""
class AltTextGenerator:
"""Request alt text from a GPT-4o compatible OpenAI endpoint."""
def __init__(self, api_key: str, model: str = "gpt-4o-mini") -> None:
if not api_key:
raise ValueError("OPENAI_API_KEY is required")
self.model = model
self.session = requests.Session()
self.session.headers.update(
{
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
)
self.endpoint = "https://api.openai.com/v1/chat/completions"
def generate_alt_text(
self,
image_source: str,
notes: Optional[str] = None,
captured_at: Optional[str] = None,
location: Optional[str] = None,
coordinates: Optional[str] = None,
) -> str:
if not image_source:
raise AltTextGenerationError("Image URL required for alt text generation")
prompt_lines = [
"You write vivid but concise Mastodon alt text.",
"Keep it under 400 characters and mention key visual details, colours, "
"actions, and text. No need to mention the mood unless you think it is "
"super relevant.",
"Avoid speculation beyond what is visible. Use UK English spelling.",
]
if notes:
prompt_lines.append(f"Creator notes: {notes.strip()}")
if captured_at:
prompt_lines.append(f"Captured: {captured_at}")
if location:
prompt_lines.append(f"Location: {location}")
if coordinates:
prompt_lines.append(f"Coordinates: {coordinates}")
text_prompt = "\n".join(prompt_lines)
content: List[Dict[str, Any]] = [
{"type": "text", "text": text_prompt},
{"type": "image_url", "image_url": {"url": image_source}},
]
payload = {
"model": self.model,
"temperature": 0.2,
"max_tokens": 300,
"messages": [
{
"role": "system",
"content": "You help write accessible alt text for social media posts.",
},
{
"role": "user",
"content": content,
},
],
}
try:
response = self.session.post(self.endpoint, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as exc: # pragma: no cover
raise AltTextGenerationError(str(exc)) from exc
choices = data.get("choices") or []
if not choices:
raise AltTextGenerationError("OpenAI response did not include choices")
message = choices[0].get("message", {})
content_text = message.get("content")
if not content_text:
raise AltTextGenerationError("OpenAI response missing content")
return content_text.strip()
def improve_post_text(
self, draft_text: str, instructions: Optional[str] = None
) -> str:
if not draft_text or not draft_text.strip():
raise TextImprovementError("Post text cannot be empty")
prompt_parts = [
"You review Mastodon drafts and rewrite them in UK English.",
"Keep the tone warm, accessible, and descriptive without exaggeration.",
"Ensure clarity, fix spelling or grammar, and keep content suitable for social media.",
]
if instructions:
prompt_parts.append(f"Additional instructions: {instructions.strip()}")
prompt_parts.append("Return only the improved post text.")
user_content = f"Draft post:\\n{draft_text.strip()}"
payload = {
"model": self.model,
"temperature": 0.4,
"max_tokens": 400,
"messages": [
{"role": "system", "content": "\n".join(prompt_parts)},
{"role": "user", "content": user_content},
],
}
try:
response = self.session.post(self.endpoint, json=payload, timeout=30)
response.raise_for_status()
data = response.json()
except requests.RequestException as exc: # pragma: no cover
raise TextImprovementError(str(exc)) from exc
choices = data.get("choices") or []
if not choices:
raise TextImprovementError("OpenAI response did not include choices")
content_text = choices[0].get("message", {}).get("content")
if not content_text:
raise TextImprovementError("OpenAI response missing content")
return content_text.strip()

410
station_announcer/routes.py Normal file
View file

@ -0,0 +1,410 @@
"""Flask routes for the alt text generator."""
from __future__ import annotations
import base64
from datetime import datetime
from flask import (
Blueprint,
Response,
abort,
current_app,
flash,
redirect,
render_template,
request,
url_for,
)
from .immich import ImmichAsset, ImmichError
from .mastodon import MastodonError
from .openai_client import AltTextGenerationError, TextImprovementError
bp = Blueprint("main", __name__)
def _parse_timestamp(raw: str | None) -> datetime | None:
if not raw:
return None
try:
normalized = raw.replace("Z", "+00:00")
return datetime.fromisoformat(normalized)
except ValueError:
return None
def _humanize_timestamp(raw: str | None) -> str | None:
dt = _parse_timestamp(raw)
if not dt:
return raw
return dt.strftime("%d %b %Y %H:%M")
def _timestamp_with_timezone(raw: str | None) -> str | None:
dt = _parse_timestamp(raw)
if not dt:
return raw
label = dt.strftime("%d %b %Y %H:%M")
tzname = dt.tzname()
if tzname:
return f"{label} {tzname}"
offset = dt.utcoffset()
if offset is not None:
total_minutes = int(offset.total_seconds() // 60)
hours, minutes = divmod(abs(total_minutes), 60)
sign = "+" if total_minutes >= 0 else "-"
return f"{label} UTC{sign}{hours:02d}:{minutes:02d}"
return f"{label} (timezone unknown)"
def _coordinates_text(asset: ImmichAsset) -> str | None:
if asset.latitude is None or asset.longitude is None:
return None
return f"{asset.latitude:.5f}, {asset.longitude:.5f}"
def _generate_alt_text_for_asset(asset: ImmichAsset, notes: str | None = None) -> str:
immich_client = current_app.immich_client
generator = current_app.alt_text_generator
content, mime_type = immich_client.fetch_asset_content(asset.id, "preview")
data_url = "data:{};base64,{}".format(
mime_type, base64.b64encode(content).decode("ascii")
)
return generator.generate_alt_text(
data_url,
notes,
captured_at=_timestamp_with_timezone(asset.captured_at),
location=asset.location_label,
coordinates=_coordinates_text(asset),
)
def _ensure_alt_text(asset: ImmichAsset) -> str:
cache = current_app.alt_text_cache
cached = cache.get(asset.id)
if cached:
return cached
generated = _generate_alt_text_for_asset(asset)
cache.set(asset.id, generated)
return generated
def _unique_asset_ids(values: list[str]) -> list[str]:
seen: set[str] = set()
ordered: list[str] = []
for value in values:
value = (value or "").strip()
if not value or value in seen:
continue
seen.add(value)
ordered.append(value)
return ordered
MAX_MEDIA_ATTACHMENTS = 4
@bp.route("/")
def index():
return redirect(url_for("main.compose_select"))
@bp.route("/alt-text-helper")
def alt_helper():
immich_client = current_app.immich_client
alt_cache = current_app.alt_text_cache
days = current_app.config.get("RECENT_DAYS", 3)
assets = []
error_message = None
try:
for asset in immich_client.get_recent_assets(days=days):
assets.append(
{
"id": asset.id,
"file_name": asset.file_name,
"captured_at": asset.captured_at,
"captured_display": _humanize_timestamp(asset.captured_at),
"thumbnail_url": asset.thumbnail_url,
"alt_text": alt_cache.get(asset.id),
}
)
except ImmichError as exc:
error_message = str(exc)
return render_template("index.html", assets=assets, error_message=error_message)
@bp.route("/assets/<asset_id>", methods=["GET", "POST"])
def asset_detail(asset_id: str):
immich_client = current_app.immich_client
alt_cache = current_app.alt_text_cache
error_message = None
notes = ""
try:
asset = immich_client.get_asset(asset_id)
except ImmichError as exc:
return render_template(
"detail.html",
asset=None,
alt_text=None,
error_message=str(exc),
notes=notes,
)
alt_text = alt_cache.get(asset_id)
if request.method == "POST":
notes = request.form.get("notes", "")
try:
generated = _generate_alt_text_for_asset(asset, notes)
alt_cache.set(asset_id, generated)
flash("Alt text generated.")
return redirect(url_for("main.asset_detail", asset_id=asset_id))
except ImmichError as exc:
error_message = f"Failed to fetch image: {exc}"
except AltTextGenerationError as exc:
error_message = str(exc)
formatted_asset = {
"id": asset.id,
"file_name": asset.file_name,
"captured_at": asset.captured_at,
"captured_display": _humanize_timestamp(asset.captured_at),
"thumbnail_url": asset.thumbnail_url,
"preview_url": asset.preview_url,
"original_url": asset.original_url,
"web_url": asset.web_url,
"latitude": asset.latitude,
"longitude": asset.longitude,
"location": asset.location_label,
}
return render_template(
"detail.html",
asset=formatted_asset,
alt_text=alt_cache.get(asset_id),
error_message=error_message,
notes=notes,
)
@bp.route("/compose/select", methods=["GET", "POST"])
def compose_select():
immich_client = current_app.immich_client
alt_cache = current_app.alt_text_cache
days = current_app.config.get("RECENT_DAYS", 3)
asset_objs: list[ImmichAsset] = []
error_message = None
try:
asset_objs = immich_client.get_recent_assets(days=days, limit=200)
except ImmichError as exc:
error_message = str(exc)
assets = []
for asset in asset_objs:
assets.append(
{
"id": asset.id,
"file_name": asset.file_name,
"captured_display": _humanize_timestamp(asset.captured_at),
"alt_text": alt_cache.get(asset.id),
}
)
selected_set: set[str] = set()
if request.method == "POST":
selected = _unique_asset_ids(request.form.getlist("asset_ids"))
selected_set = set(selected)
if not selected:
error_message = "Select at least one photo."
elif len(selected) > MAX_MEDIA_ATTACHMENTS:
error_message = f"Select up to {MAX_MEDIA_ATTACHMENTS} photos."
else:
return redirect(url_for("main.compose_draft", ids=",".join(selected)))
return render_template(
"compose_select.html",
assets=assets,
error_message=error_message,
max_photos=MAX_MEDIA_ATTACHMENTS,
selected_ids=selected_set,
)
@bp.route("/compose/draft", methods=["GET", "POST"])
def compose_draft():
mastodon_client = getattr(current_app, "mastodon_client", None)
immich_client = current_app.immich_client
alt_cache = current_app.alt_text_cache
generator = current_app.alt_text_generator
error_message = None
post_text = ""
instructions = ""
reply_to_latest = False
if request.method == "POST":
asset_ids = _unique_asset_ids(request.form.getlist("asset_ids"))
post_text = request.form.get("post_text", "")
instructions = request.form.get("post_instructions", "")
reply_values = request.form.getlist("reply_to_latest")
reply_to_latest = any(value in {"1", "on", "true"} for value in reply_values)
latest_status_id_form = request.form.get("latest_status_id")
else:
ids_param = request.args.get("ids", "")
asset_ids = _unique_asset_ids(ids_param.split(",") if ids_param else [])
latest_status_id_form = None
if not asset_ids:
flash("Choose photos before composing a post.")
return redirect(url_for("main.compose_select"))
if len(asset_ids) > MAX_MEDIA_ATTACHMENTS:
flash(f"Select at most {MAX_MEDIA_ATTACHMENTS} photos.")
return redirect(url_for("main.compose_select"))
latest_status_raw = None
latest_status_error = None
if mastodon_client:
try:
latest_status_raw = mastodon_client.get_latest_status()
except MastodonError as exc:
latest_status_error = str(exc)
assets: list[ImmichAsset] = []
try:
for asset_id in asset_ids:
assets.append(immich_client.get_asset(asset_id))
except ImmichError as exc:
return render_template(
"compose_draft.html",
assets=[],
error_message=str(exc),
post_text=post_text,
instructions=instructions,
mastodon_ready=bool(mastodon_client),
)
latest_status = None
if latest_status_raw:
latest_status = {
"id": latest_status_raw.get("id"),
"content": latest_status_raw.get("content", ""),
"created_display": _humanize_timestamp(latest_status_raw.get("created_at")),
"url": latest_status_raw.get("url"),
}
elif latest_status_id_form:
latest_status = {
"id": latest_status_id_form,
"content": "",
"created_display": None,
"url": None,
}
asset_entries = []
for asset in assets:
if request.method == "POST":
alt_value = request.form.get(f"alt_text_{asset.id}", "")
else:
try:
alt_value = _ensure_alt_text(asset)
except (ImmichError, AltTextGenerationError) as exc:
alt_value = ""
if not error_message:
error_message = str(exc)
asset_entries.append(
{
"id": asset.id,
"file_name": asset.file_name,
"captured_display": _humanize_timestamp(asset.captured_at),
"preview_url": url_for(
"main.asset_proxy", asset_id=asset.id, variant="preview"
),
"alt_text": alt_value,
}
)
if request.method == "POST":
action = request.form.get("action")
if action == "refine":
try:
post_text = generator.improve_post_text(post_text, instructions)
flash("Post refined with ChatGPT.")
except TextImprovementError as exc:
error_message = str(exc)
elif action == "post":
if not mastodon_client:
error_message = "Configure Mastodon access before posting."
elif not post_text.strip():
error_message = "Enter post text before posting."
else:
try:
media_ids: list[str] = []
for entry, asset in zip(asset_entries, assets):
alt_value = entry["alt_text"].strip()
if not alt_value:
raise ValueError(
f"Alt text missing for {entry['file_name']}"
)
entry["alt_text"] = alt_value
alt_cache.set(asset.id, alt_value)
content, mime_type = immich_client.fetch_asset_content(
asset.id, "original"
)
media_id = mastodon_client.upload_media(
asset.file_name, content, mime_type, alt_value
)
media_ids.append(media_id)
reply_target_id = latest_status_id_form or (
latest_status["id"] if latest_status else None
)
reply_id = reply_target_id if reply_to_latest else None
status = mastodon_client.create_status(
post_text.strip(), media_ids, in_reply_to_id=reply_id
)
status_url = status.get("url")
if status_url:
flash(
f'Post sent to Mastodon: <a href="{status_url}" target="_blank" rel="noopener">{status_url}</a>',
"success",
)
else:
flash("Post sent to Mastodon.", "success")
return redirect(url_for("main.index"))
except ValueError as exc:
error_message = str(exc)
except ImmichError as exc:
error_message = f"Failed to fetch media: {exc}"
except MastodonError as exc:
error_message = str(exc)
elif action:
error_message = "Unknown action."
return render_template(
"compose_draft.html",
assets=asset_entries,
error_message=error_message,
post_text=post_text,
instructions=instructions,
mastodon_ready=bool(mastodon_client),
max_photos=MAX_MEDIA_ATTACHMENTS,
latest_status=latest_status,
latest_status_error=latest_status_error,
reply_to_latest=reply_to_latest,
)
@bp.route("/proxy/assets/<asset_id>/<variant>")
def asset_proxy(asset_id: str, variant: str):
immich_client = current_app.immich_client
try:
content, mimetype = immich_client.fetch_asset_content(asset_id, variant)
except ImmichError as exc:
abort(404, description=str(exc))
return Response(content, mimetype=mimetype)