#!/usr/bin/python3
"""List and refresh Debian "new upstream version" todo entries.

The todo list is downloaded as JSON from ``TODO_URL`` and stored in
``TODO_PATH``.  Entries are enriched with team/uploader information parsed
from the local apt ``Sources`` indexes (cached in ``CACHE_PATH``) and with
free-form notes read from ``NOTES_PATH``.
"""

import glob
import json
import os
import re
from pathlib import Path
from typing import Any, Optional, cast
from urllib.request import urlopen

import click
from debian import deb822
from rich.console import Console
from rich.table import Table

TODO_URL = "https://udd.debian.org/dmd/?email1=edward%404angle.com&format=json"
TODO_PATH = Path("todo.json")
NOTES_PATH = Path("notes")

TodoItem = dict[str, Any]
TodoList = list[TodoItem]

PRERELEASE_RE = re.compile(r"(?i)(?:^|[0-9.\-])(?:alpha|beta|rc|a|b)\d*")
# The group names must match the ``match.group("new")`` and
# ``match.group("current")`` lookups in parse_details().
CURRENTLY_RE = re.compile(
    r"^(?P<new>.+?)\s*\(currently in unstable:\s*(?P<current>.+?)\)\s*$"
)

CACHE_PATH = Path(".vcs_git_cache.json")
# Bumped from 4: the uploader filtering in normalize_uploaders() changed, so
# caches written by the previous logic must be rebuilt.
CACHE_VERSION = 5
SourceInfo = dict[str, str]

# NOTE(review): this value looks like it lost an "<email>" suffix.  Matching
# is done on the display name only (see normalize_uploaders), so it works
# with or without an address here.
HIDE_UPLOADER = "Edward Betts "


def parse_details(details: str) -> tuple[str, Optional[str]]:
    """Split a todo ``:details`` string into (new version, current version).

    When ``details`` does not have the
    ``"<new> (currently in unstable: <current>)"`` shape, the stripped input
    is returned as the new version and the current version is ``None``.
    """
    match = CURRENTLY_RE.match(details)
    if match:
        return match.group("new").strip(), match.group("current").strip()
    return details.strip(), None


def is_prerelease_version(details: str) -> bool:
    """Return True when the new version in ``details`` matches PRERELEASE_RE."""
    new_version, _ = parse_details(details)
    return bool(PRERELEASE_RE.search(new_version))


def vcs_git_to_team(vcs_git: Optional[str]) -> Optional[str]:
    """Return the first salsa.debian.org path component of a Vcs-Git URL.

    Returns ``None`` for an empty/missing URL, and the URL itself unchanged
    when it does not point at salsa.debian.org.
    """
    if not vcs_git:
        return None
    match = re.search(r"salsa\.debian\.org/([^/]+)/", vcs_git)
    if not match:
        return vcs_git
    return match.group(1)


def _uploader_name(entry: str) -> str:
    """Return the display name of a ``Name <email>`` entry (text before ``<``)."""
    return entry.split("<", 1)[0].strip()


def normalize_uploaders(uploaders: str) -> str:
    """Turn a comma-separated Uploaders value into one uploader per line.

    Empty entries are dropped, and so is any entry whose display name equals
    the display name of ``HIDE_UPLOADER``.  Comparing names rather than whole
    entries is required because entries are stripped before the comparison,
    so a constant with surrounding whitespace could never match verbatim.
    """
    hidden_name = _uploader_name(HIDE_UPLOADER)
    kept: list[str] = []
    for raw_part in uploaders.split(","):
        part = raw_part.strip()
        if not part:
            continue
        if hidden_name and _uploader_name(part) == hidden_name:
            continue
        kept.append(part)
    return "\n".join(kept)


def load_cache(source_paths: list[str]) -> Optional[dict[str, SourceInfo]]:
    """Load the cached source-info map if it is still valid.

    The cache is valid only when it can be read, carries the current
    ``CACHE_VERSION``, covers exactly ``source_paths`` and records the same
    mtime for each of them.  Returns ``None`` whenever the cache is missing,
    malformed or stale.
    """
    try:
        with CACHE_PATH.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        return None

    if not isinstance(data, dict):
        return None
    if data.get("cache_version") != CACHE_VERSION:
        return None

    cached_mtimes = data.get("sources_mtimes", {})
    if not isinstance(cached_mtimes, dict):
        return None
    # A Sources file that has been removed since the cache was written would
    # otherwise leave its stale entries in the cached map.
    if set(cached_mtimes) != set(source_paths):
        return None
    for path in source_paths:
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            return None
        if str(mtime) != str(cached_mtimes.get(path)):
            return None

    vcs_by_source = data.get("vcs_by_source")
    if not isinstance(vcs_by_source, dict):
        return None

    normalized: dict[str, SourceInfo] = {}
    for key, value in vcs_by_source.items():
        if not isinstance(key, str):
            return None
        if isinstance(value, str):
            # A bare string value is accepted and treated as the vcs_git
            # field with no uploaders.
            normalized[key] = {"vcs_git": value, "uploaders": ""}
            continue
        if not isinstance(value, dict):
            return None
        vcs_git = value.get("vcs_git")
        uploaders = value.get("uploaders")
        if not isinstance(vcs_git, str) or not isinstance(uploaders, str):
            return None
        normalized[key] = {"vcs_git": vcs_git, "uploaders": uploaders}
    return normalized


def save_cache(source_paths: list[str], vcs_by_source: dict[str, SourceInfo]) -> None:
    """Write the source-info map and the Sources mtimes to ``CACHE_PATH``.

    Caching is best effort: if an mtime cannot be read or the cache file
    cannot be written, the function returns silently.
    """
    sources_mtimes: dict[str, float] = {}
    for path in source_paths:
        try:
            sources_mtimes[path] = os.path.getmtime(path)
        except OSError:
            return

    data = {
        "cache_version": CACHE_VERSION,
        "sources_mtimes": sources_mtimes,
        "vcs_by_source": vcs_by_source,
    }
    try:
        with CACHE_PATH.open("w", encoding="utf-8") as handle:
            json.dump(data, handle, sort_keys=True)
    except OSError:
        return


def load_source_info_map() -> dict[str, SourceInfo]:
    """Return team/uploader info per source package from apt Sources files.

    A valid cache is used when available; otherwise every
    ``/var/lib/apt/lists/*Sources`` file is parsed and the cache rewritten.
    For each source the first paragraph seen wins.  The ``"vcs_git"`` field
    holds the result of vcs_git_to_team(), not the raw URL.
    """
    source_paths = sorted(glob.glob("/var/lib/apt/lists/*Sources"))
    cached = load_cache(source_paths)
    if cached is not None:
        return cached

    vcs_by_source: dict[str, SourceInfo] = {}
    for path in source_paths:
        with Path(path).open("r", encoding="utf-8", errors="replace") as handle:
            for entry in deb822.Deb822.iter_paragraphs(handle):
                source = entry.get("Source") or entry.get("Package")
                if not source or source in vcs_by_source:
                    continue

                vcs_git = entry.get("Vcs-Git")
                uploaders = entry.get("Uploaders")

                team = None
                if vcs_git:
                    team = vcs_git_to_team(vcs_git.strip())

                uploaders_text = ""
                if uploaders:
                    # Collapse all whitespace runs (including newlines) to
                    # single spaces before splitting into uploaders.
                    uploaders_text = normalize_uploaders(
                        re.sub(r"\s+", " ", uploaders).strip()
                    )

                if team or uploaders_text:
                    vcs_by_source[source] = {
                        "vcs_git": team or "",
                        "uploaders": uploaders_text,
                    }

    save_cache(source_paths, vcs_by_source)
    return vcs_by_source


def fetch_todo_list() -> TodoList:
    """Download ``TODO_URL`` and return the decoded JSON payload."""
    with urlopen(TODO_URL) as response:
        payload = response.read().decode("utf-8")
    return cast(TodoList, json.loads(payload))


def save_todo_list(todo_list: TodoList) -> None:
    """Write ``todo_list`` to ``TODO_PATH`` as indented JSON plus a newline."""
    with TODO_PATH.open("w", encoding="utf-8") as handle:
        json.dump(todo_list, handle, indent=2, ensure_ascii=True)
        handle.write("\n")


def summarize_sources(todo_list: TodoList) -> set[str]:
    """Return the set of string ``:source`` values found in ``todo_list``."""
    sources: set[str] = set()
    for item in todo_list:
        source = item.get(":source")
        if isinstance(source, str):
            sources.add(source)
    return sources


def load_notes() -> dict[str, str]:
    """Read ``NOTES_PATH`` and return the notes keyed by source name.

    Each non-blank line is ``<source> [note text]``.  Several non-empty notes
    for the same source are joined with ``"; "``.  Returns an empty dict when
    the notes file does not exist.
    """
    if not NOTES_PATH.exists():
        return {}

    notes_by_source: dict[str, list[str]] = {}
    with NOTES_PATH.open("r", encoding="utf-8") as handle:
        for line in handle:
            stripped = line.strip()
            if not stripped:
                continue
            parts = stripped.split(None, 1)
            if not parts:
                continue
            source = parts[0]
            note = parts[1] if len(parts) > 1 else ""
            notes_by_source.setdefault(source, []).append(note)

    return {
        source: "; ".join(note for note in notes if note)
        for source, notes in notes_by_source.items()
    }


def filter_todo_list(todo_list: TodoList, include_prerelease: bool = False) -> TodoList:
    """Return only the relevant ``newupstream_*`` entries of ``todo_list``.

    Entries are dropped when ``:shortname`` or ``:details`` is not a string,
    when the shortname does not start with ``newupstream_``, when the new
    version is a prerelease (unless ``include_prerelease`` is set), or when
    the new and current versions share the same upstream part.
    """
    filtered: TodoList = []
    for item in todo_list:
        shortname = item.get(":shortname")
        details = item.get(":details")
        if not isinstance(shortname, str) or not isinstance(details, str):
            continue
        if not shortname.startswith("newupstream_"):
            continue
        if not include_prerelease and is_prerelease_version(details):
            continue

        new_version, current_version = parse_details(details)
        if current_version:
            normalized_new = normalize_upstream_version(new_version)
            normalized_current = normalize_upstream_version(current_version)
            if normalized_new == normalized_current:
                continue

        filtered.append(item)
    return filtered


def normalize_upstream_version(version: str) -> str:
    """Strip the ``epoch:`` prefix and the final ``-revision`` suffix."""
    if ":" in version:
        version = version.split(":", 1)[1]
    if "-" in version:
        version = version.rsplit("-", 1)[0]
    return version.strip()


def print_changes(old_list: TodoList, new_list: TodoList) -> None:
    """Print the sources added to and removed from the todo list."""

    def format_details(details: str) -> str:
        new_version, current_version = parse_details(details)
        display_new = new_version
        if is_prerelease_version(details):
            display_new = f"{new_version} (pre)"
        return f"New: {display_new} | Current: {current_version or '-'}"

    def build_details(todo_list: TodoList) -> dict[str, str]:
        details_by_source: dict[str, str] = {}
        for item in todo_list:
            source = item.get(":source")
            details = item.get(":details")
            if isinstance(source, str) and isinstance(details, str):
                details_by_source[source] = format_details(details)
        return details_by_source

    old_details = build_details(old_list)
    new_details = build_details(new_list)
    old_sources = set(old_details)
    new_sources = set(new_details)

    added = sorted(new_sources - old_sources)
    removed = sorted(old_sources - new_sources)
    if not added and not removed:
        print("No changes in todo.json.")
        return

    if added:
        print("New packages:")
        for source in added:
            print(f" {source} - {new_details.get(source, '-')}")
    if removed:
        print("Removed packages:")
        for source in removed:
            print(f" {source} - {old_details.get(source, '-')}")


def list_todos(include_prerelease: bool) -> None:
    """Print the filtered todo entries stored in ``TODO_PATH``.

    A rich table is used when the console is at least 100 columns wide;
    narrower consoles get a compact line-per-package layout instead.
    """
    with TODO_PATH.open("r", encoding="utf-8") as handle:
        todo_list = cast(TodoList, json.load(handle))

    source_info_map = load_source_info_map()
    notes_by_source = load_notes()
    console = Console()
    filtered = filter_todo_list(todo_list, include_prerelease=include_prerelease)

    is_narrow = console.width < 100
    if is_narrow:
        console.print("Debian New Upstream TODOs")
    else:
        table = Table(title="Debian New Upstream TODOs")
        table.add_column("Source", style="bold")
        table.add_column("New", style="green", justify="right")
        table.add_column("Current", style="dim", justify="right")
        table.add_column("Team", justify="right")
        table.add_column("Note/Uploaders", overflow="fold")

    for todo in filtered:
        source = todo[":source"]
        new_version, current_version = parse_details(todo[":details"])
        source_info = source_info_map.get(source, {})
        vcs_git = source_info.get("vcs_git")
        uploaders = source_info.get("uploaders", "")
        note = notes_by_source.get(source, "")

        display_new = new_version
        if is_prerelease_version(todo[":details"]):
            display_new = f"[yellow]{new_version} (pre)[/yellow]"

        # Shorten team names for display.
        display_team = vcs_git or "-"
        if display_team == "homeassistant-team":
            display_team = "HA"
        elif display_team.endswith("-team"):
            display_team = display_team[:-5]

        # A note takes precedence over the uploaders list.
        display_note = note or uploaders or "-"

        if is_narrow:
            parts = [f"[bold]{source}[/bold]"]
            if display_team != "-":
                parts.append(f"[dim]{display_team}[/dim]")
            parts.append(f"N: {display_new}")
            parts.append(f"C: {current_version or '-'}")
            console.print(" ".join(parts))
            if display_note != "-":
                console.print(" " + display_note)
        else:
            table.add_row(
                source,
                display_new,
                current_version or "-",
                display_team,
                display_note,
            )

    if not is_narrow:
        console.print(table)
    console.print(f"Packages: {len(filtered)}")


def update_todos() -> None:
    """Fetch the latest todo list, save it and print what changed.

    The comparison is made between the filtered old and new lists with
    prereleases excluded.
    """
    old_list: TodoList = []
    if TODO_PATH.exists():
        with TODO_PATH.open("r", encoding="utf-8") as handle:
            old_list = cast(TodoList, json.load(handle))

    todo_list = fetch_todo_list()
    save_todo_list(todo_list)
    print_changes(filter_todo_list(old_list), filter_todo_list(todo_list))


# Command group; running it without a subcommand lists the todos with
# prereleases hidden.  (Documented with comments rather than docstrings so
# the click help output stays unchanged.)
@click.group(invoke_without_command=True)
@click.pass_context
def cli(context: click.Context) -> None:
    if context.invoked_subcommand is None:
        list_todos(include_prerelease=False)


# "list" subcommand: same listing, optionally including prereleases.
@cli.command("list", help="List filtered new upstream todo entries.")
@click.option(
    "--show-prerelease",
    is_flag=True,
    help="Include prerelease versions in the list output.",
)
def list_command(show_prerelease: bool) -> None:
    list_todos(include_prerelease=show_prerelease)


# "update" subcommand: refresh todo.json and report added/removed packages.
@cli.command("update", help="Fetch the latest todo list and show changes.")
def update_command() -> None:
    update_todos()


if __name__ == "__main__":
    cli()