#!/usr/bin/python3
"""Track "new upstream" todo items from the Debian UDD dmd feed.

Fetches the per-maintainer dashboard JSON from udd.debian.org, filters it
down to ``newupstream_*`` entries (optionally excluding prereleases), and
annotates each source package with the salsa.debian.org team taken from the
``Vcs-Git`` field of the local apt Sources lists.  The Vcs-Git lookup is
cached in ``.vcs_git_cache.json`` keyed on the Sources files' mtimes.
"""

import glob
import json
import os
import re
from pathlib import Path
from typing import Any, Optional, cast
from urllib.request import urlopen

import click
from debian import deb822

TODO_URL = "https://udd.debian.org/dmd/?email1=edward%404angle.com&format=json"
TODO_PATH = Path("todo.json")

TodoItem = dict[str, Any]
TodoList = list[TodoItem]

# Heuristic match for prerelease-looking version fragments (alpha/beta/rc/a/b,
# optionally followed by digits), anchored after start-of-string, a digit, a
# dot, or a hyphen so plain words containing "a"/"b" don't trigger it.
PRERELEASE_RE = re.compile(r"(?i)(?:^|[0-9.\-])(?:alpha|beta|rc|a|b)\d*")
# Matches "<new> (currently in unstable: <current>)".
# FIX: the group names had been lost ("(?P.+?)" is a regex syntax error that
# made re.compile raise at import time); restored to match the
# group("new")/group("current") calls in parse_details().
CURRENTLY_RE = re.compile(
    r"^(?P<new>.+?)\s*\(currently in unstable:\s*(?P<current>.+?)\)\s*$"
)
CACHE_PATH = Path(".vcs_git_cache.json")


def parse_details(details: str) -> tuple[str, Optional[str]]:
    """Split a dmd details string into (new_version, current_version).

    ``current_version`` is None when the details string does not carry a
    "(currently in unstable: ...)" suffix.
    """
    match = CURRENTLY_RE.match(details)
    if match:
        return match.group("new").strip(), match.group("current").strip()
    return details.strip(), None


def is_prerelease_version(details: str) -> bool:
    """Return True if the new upstream version looks like a prerelease."""
    new_version, _ = parse_details(details)
    return bool(PRERELEASE_RE.search(new_version))


def vcs_git_to_team(vcs_git: Optional[str]) -> Optional[str]:
    """Extract the salsa team name from a Vcs-Git URL.

    Returns None for empty input, the team (first path component) for
    salsa.debian.org URLs, and the URL itself for any other host so the
    information isn't silently dropped.
    """
    if not vcs_git:
        return None
    match = re.search(r"salsa\.debian\.org/([^/]+)/", vcs_git)
    if not match:
        return vcs_git
    return match.group(1)


def load_cache(source_paths: list[str]) -> Optional[dict[str, str]]:
    """Load the Vcs-Git cache if it is still valid for *source_paths*.

    Returns the cached source->team mapping, or None when the cache is
    missing, malformed, or stale (any Sources file mtime differs from the
    recorded one).
    """
    try:
        with CACHE_PATH.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        return None
    if not isinstance(data, dict):
        return None
    cached_mtimes = data.get("sources_mtimes", {})
    if not isinstance(cached_mtimes, dict):
        return None
    for path in source_paths:
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            return None
        # Compare as strings: JSON round-tripping may change float formatting.
        if str(mtime) != str(cached_mtimes.get(path)):
            return None
    vcs_by_source = data.get("vcs_by_source")
    if not isinstance(vcs_by_source, dict):
        return None
    if not all(
        isinstance(key, str) and isinstance(value, str)
        for key, value in vcs_by_source.items()
    ):
        return None
    return cast(dict[str, str], vcs_by_source)


def save_cache(source_paths: list[str], vcs_by_source: dict[str, str]) -> None:
    """Persist the Vcs-Git mapping with the current Sources mtimes.

    Best-effort: silently gives up if any mtime cannot be read or the cache
    file cannot be written, since the cache is purely an optimization.
    """
    sources_mtimes: dict[str, float] = {}
    for path in source_paths:
        try:
            sources_mtimes[path] = os.path.getmtime(path)
        except OSError:
            return
    data = {
        "sources_mtimes": sources_mtimes,
        "vcs_by_source": vcs_by_source,
    }
    try:
        with CACHE_PATH.open("w", encoding="utf-8") as handle:
            json.dump(data, handle, sort_keys=True)
    except OSError:
        return


def load_vcs_git_map() -> dict[str, str]:
    """Build (or load from cache) the source-package -> salsa-team mapping.

    Scans every apt Sources index under /var/lib/apt/lists, keeping the
    first Vcs-Git seen per source package.
    """
    source_paths = sorted(glob.glob("/var/lib/apt/lists/*Sources"))
    cached = load_cache(source_paths)
    if cached is not None:
        return cached
    vcs_by_source: dict[str, str] = {}
    for path in source_paths:
        # errors="replace": Sources files occasionally carry non-UTF-8 bytes.
        with Path(path).open("r", encoding="utf-8", errors="replace") as handle:
            for entry in deb822.Deb822.iter_paragraphs(handle):
                source = entry.get("Source") or entry.get("Package")
                if not source or source in vcs_by_source:
                    continue
                vcs_git = entry.get("Vcs-Git")
                if vcs_git:
                    team = vcs_git_to_team(vcs_git.strip())
                    if team:
                        vcs_by_source[source] = team
    save_cache(source_paths, vcs_by_source)
    return vcs_by_source


def fetch_todo_list() -> TodoList:
    """Download and decode the dmd todo list from udd.debian.org."""
    with urlopen(TODO_URL) as response:
        payload = response.read().decode("utf-8")
    return cast(TodoList, json.loads(payload))


def save_todo_list(todo_list: TodoList) -> None:
    """Write *todo_list* to todo.json with a trailing newline."""
    with TODO_PATH.open("w", encoding="utf-8") as handle:
        json.dump(todo_list, handle, indent=2, ensure_ascii=True)
        handle.write("\n")


def summarize_sources(todo_list: TodoList) -> set[str]:
    """Return the set of source package names present in *todo_list*."""
    sources: set[str] = set()
    for item in todo_list:
        source = item.get(":source")
        if isinstance(source, str):
            sources.add(source)
    return sources


def filter_todo_list(todo_list: TodoList, include_prerelease: bool = False) -> TodoList:
    """Keep only well-formed ``newupstream_*`` items.

    Items missing string :shortname/:details are dropped, as are prerelease
    versions unless *include_prerelease* is set.
    """
    filtered: TodoList = []
    for item in todo_list:
        shortname = item.get(":shortname")
        details = item.get(":details")
        if not isinstance(shortname, str) or not isinstance(details, str):
            continue
        if not shortname.startswith("newupstream_"):
            continue
        if not include_prerelease and is_prerelease_version(details):
            continue
        filtered.append(item)
    return filtered


def print_changes(old_list: TodoList, new_list: TodoList) -> None:
    """Print sources added to / removed from the todo list."""
    old_sources = summarize_sources(old_list)
    new_sources = summarize_sources(new_list)
    added = sorted(new_sources - old_sources)
    removed = sorted(old_sources - new_sources)
    if not added and not removed:
        print("No changes in todo.json.")
        return
    if added:
        print("New packages:")
        for source in added:
            print(f"  {source}")
    if removed:
        print("Removed packages:")
        for source in removed:
            print(f"  {source}")


def list_todos(include_prerelease: bool) -> None:
    """Print (source, new_version, current_version, team) for each todo."""
    with TODO_PATH.open("r", encoding="utf-8") as handle:
        todo_list = cast(TodoList, json.load(handle))
    vcs_git_map = load_vcs_git_map()
    for todo in filter_todo_list(todo_list, include_prerelease=include_prerelease):
        new_version, current_version = parse_details(todo[":details"])
        vcs_git = vcs_git_map.get(todo[":source"])
        print((todo[":source"], new_version, current_version, vcs_git))


def update_todos() -> None:
    """Fetch the latest todo list, save it, and report filtered changes."""
    old_list: TodoList = []
    if TODO_PATH.exists():
        with TODO_PATH.open("r", encoding="utf-8") as handle:
            old_list = cast(TodoList, json.load(handle))
    todo_list = fetch_todo_list()
    save_todo_list(todo_list)
    print_changes(filter_todo_list(old_list), filter_todo_list(todo_list))


@click.group(invoke_without_command=True)
@click.pass_context
def cli(context: click.Context) -> None:
    """Entry point; with no subcommand, behave like ``list``."""
    if context.invoked_subcommand is None:
        list_todos(include_prerelease=False)


@cli.command("list", help="List filtered new upstream todo entries.")
@click.option(
    "--show-prerelease",
    is_flag=True,
    help="Include prerelease versions in the list output.",
)
def list_command(show_prerelease: bool) -> None:
    list_todos(include_prerelease=show_prerelease)


@cli.command("update", help="Fetch the latest todo list and show changes.")
def update_command() -> None:
    update_todos()


if __name__ == "__main__":
    cli()