debian-todo/debian_todo/__init__.py
2026-02-01 14:02:53 +00:00

424 lines
14 KiB
Python

"""CLI tool for tracking Debian packages with new upstream versions.
Fetches TODO items from the Debian UDD (Ultimate Debian Database) and displays
packages where a new upstream version is available. Filters out pre-release
versions and shows team/uploader metadata.
"""
import json
import re
import glob
import os
from pathlib import Path
from typing import Any, Optional, cast
from urllib.request import urlopen
import click
from debian import deb822
from rich.console import Console
from rich.table import Table
# UDD "Debian Maintainer Dashboard" endpoint returning TODO items as JSON.
TODO_URL = "https://udd.debian.org/dmd/?email1=edward%404angle.com&format=json"
# Local cache of the fetched TODO list.
TODO_PATH = Path("todo.json")
# Optional file with one "<source-package> <note>" entry per line.
NOTES_PATH = Path("notes")
# One TODO entry as decoded from the UDD JSON payload.
TodoItem = dict[str, Any]
TodoList = list[TodoItem]
# Matches alpha/beta/rc (or bare a/b) markers preceded by a digit, dot,
# dash, or start-of-string; used to detect pre-release upstream versions.
PRERELEASE_RE = re.compile(r"(?i)(?:^|[0-9.\-])(?:alpha|beta|rc|a|b)\d*")
# Parses details of the form "NEW (currently in unstable: CURRENT)".
CURRENTLY_RE = re.compile(
    r"^(?P<new>.+?)\s*\(currently in unstable:\s*(?P<current>.+?)\)\s*$"
)
# On-disk cache of Vcs-Git/uploader data; bump CACHE_VERSION to invalidate.
CACHE_PATH = Path(".vcs_git_cache.json")
CACHE_VERSION = 4
# Per-source metadata: {"vcs_git": team-or-url, "uploaders": text}.
SourceInfo = dict[str, str]
# Uploader entry removed from the displayed uploaders list.
HIDE_UPLOADER = "Edward Betts <edward@4angle.com>"
def parse_details(details: str) -> tuple[str, Optional[str]]:
    """Split a details string into its new and current version parts.

    Args:
        details: Text such as "1.2.3 (currently in unstable: 1.2.2-1)".

    Returns:
        ``(new_version, current_version)``; ``current_version`` is ``None``
        when the "(currently in unstable: ...)" suffix is absent.
    """
    if (found := CURRENTLY_RE.match(details)) is None:
        return details.strip(), None
    return found["new"].strip(), found["current"].strip()
def is_prerelease_version(details: str) -> bool:
    """Return True when the new version in *details* looks like a pre-release.

    The new version is extracted via parse_details() and tested against
    PRERELEASE_RE (alpha/beta/rc or bare a/b markers).
    """
    candidate = parse_details(details)[0]
    return PRERELEASE_RE.search(candidate) is not None
def vcs_git_to_team(vcs_git: Optional[str]) -> Optional[str]:
    """Derive a team name from a Vcs-Git URL.

    Salsa URLs yield the group/team path component; any other repository
    URL is returned unchanged. Empty or ``None`` input yields ``None``.
    """
    if not vcs_git:
        return None
    found = re.search(r"salsa\.debian\.org/([^/]+)/", vcs_git)
    return found.group(1) if found else vcs_git
def normalize_uploaders(uploaders: str) -> str:
    """Tidy a comma-separated uploaders field.

    Strips whitespace and stray commas from each entry, drops empty
    entries and the configured HIDE_UPLOADER, and joins the remainder
    with newlines.
    """
    entries = (chunk.strip().strip(",") for chunk in uploaders.split(","))
    kept = [entry for entry in entries if entry and entry != HIDE_UPLOADER]
    return "\n".join(kept)
def load_cache(source_paths: list[str]) -> Optional[dict[str, SourceInfo]]:
    """Read the on-disk Vcs-Git/uploader cache, if it is still fresh.

    Returns ``None`` (meaning "rebuild") when the cache file is missing
    or unreadable, its version differs from CACHE_VERSION, any Sources
    file mtime no longer matches the recorded value, or the payload does
    not have the expected shape.
    """
    try:
        raw = json.loads(CACHE_PATH.read_text(encoding="utf-8"))
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        return None
    if not isinstance(raw, dict) or raw.get("cache_version") != CACHE_VERSION:
        return None
    recorded = raw.get("sources_mtimes", {})
    if not isinstance(recorded, dict):
        return None
    # Any changed (or unreadable) Sources file invalidates the whole cache.
    for path in source_paths:
        try:
            current = os.path.getmtime(path)
        except OSError:
            return None
        if str(current) != str(recorded.get(path)):
            return None
    entries = raw.get("vcs_by_source")
    if not isinstance(entries, dict):
        return None
    result: dict[str, SourceInfo] = {}
    for name, info in entries.items():
        if not isinstance(name, str):
            return None
        if isinstance(info, str):
            # Older cache layout stored just the team string.
            result[name] = {"vcs_git": info, "uploaders": ""}
        elif (
            isinstance(info, dict)
            and isinstance(info.get("vcs_git"), str)
            and isinstance(info.get("uploaders"), str)
        ):
            result[name] = {
                "vcs_git": info["vcs_git"],
                "uploaders": info["uploaders"],
            }
        else:
            return None
    return result
def save_cache(source_paths: list[str], vcs_by_source: dict[str, SourceInfo]) -> None:
    """Write Vcs-Git/uploader info plus Sources mtimes to the cache file.

    The recorded mtimes let load_cache() detect stale data. Caching is
    best-effort: any mtime or write failure silently aborts the save.
    """
    mtimes: dict[str, float] = {}
    for path in source_paths:
        try:
            mtimes[path] = os.path.getmtime(path)
        except OSError:
            return
    payload = {
        "cache_version": CACHE_VERSION,
        "sources_mtimes": mtimes,
        "vcs_by_source": vcs_by_source,
    }
    try:
        CACHE_PATH.write_text(json.dumps(payload, sort_keys=True), encoding="utf-8")
    except OSError:
        return
def load_source_info_map() -> dict[str, SourceInfo]:
    """Collect Vcs-Git team and uploader info for every source package.

    Parses the APT Sources indexes under /var/lib/apt/lists/, keeping
    the first paragraph seen per source package that has a team or
    uploaders. Results are cached on disk via save_cache()/load_cache().
    """
    source_paths = sorted(glob.glob("/var/lib/apt/lists/*Sources"))
    cached = load_cache(source_paths)
    if cached is not None:
        return cached
    info_by_source: dict[str, SourceInfo] = {}
    for path in source_paths:
        with Path(path).open("r", encoding="utf-8", errors="replace") as handle:
            for paragraph in deb822.Deb822.iter_paragraphs(handle):
                name = paragraph.get("Source") or paragraph.get("Package")
                if not name or name in info_by_source:
                    continue
                raw_vcs = paragraph.get("Vcs-Git")
                team = vcs_git_to_team(raw_vcs.strip()) if raw_vcs else None
                raw_uploaders = paragraph.get("Uploaders")
                uploaders_text = ""
                if raw_uploaders:
                    # Uploaders fields may span folded lines; collapse whitespace.
                    collapsed = re.sub(r"\s+", " ", raw_uploaders).strip()
                    uploaders_text = normalize_uploaders(collapsed)
                if team or uploaders_text:
                    info_by_source[name] = {
                        "vcs_git": team or "",
                        "uploaders": uploaders_text,
                    }
    save_cache(source_paths, info_by_source)
    return info_by_source
def fetch_todo_list() -> TodoList:
    """Fetch the TODO list from UDD as JSON.

    Returns:
        The decoded JSON payload as a list of TODO item dicts.

    Raises:
        urllib.error.URLError: on network failure or timeout.
        json.JSONDecodeError: if the response is not valid JSON.
    """
    # Timeout keeps the CLI from hanging indefinitely when UDD is unresponsive.
    with urlopen(TODO_URL, timeout=60) as response:
        payload = response.read().decode("utf-8")
    return cast(TodoList, json.loads(payload))
def save_todo_list(todo_list: TodoList) -> None:
    """Write the TODO list to todo.json, pretty-printed and newline-terminated."""
    text = json.dumps(todo_list, indent=2, ensure_ascii=True) + "\n"
    with TODO_PATH.open("w", encoding="utf-8") as handle:
        handle.write(text)
def summarize_sources(todo_list: TodoList) -> set[str]:
    """Return the set of source package names present in *todo_list*.

    Items whose ":source" field is missing or not a string are ignored.
    """
    candidates = (item.get(":source") for item in todo_list)
    return {name for name in candidates if isinstance(name, str)}
def load_notes() -> dict[str, str]:
    """Load per-package notes from the notes file.

    Each non-blank line has the form ``<source-package> [note text]``.
    Multiple notes for the same package are joined with "; ".

    Returns:
        Mapping of source package name to its combined note text; empty
        dict when the notes file does not exist.
    """
    if not NOTES_PATH.exists():
        return {}
    notes_by_source: dict[str, list[str]] = {}
    with NOTES_PATH.open("r", encoding="utf-8") as handle:
        for line in handle:
            stripped = line.strip()
            if not stripped:
                continue
            # A non-empty stripped line always splits into at least one part,
            # so the old "if not parts" guard was unreachable and is gone.
            source, *rest = stripped.split(None, 1)
            note = rest[0] if rest else ""
            notes_by_source.setdefault(source, []).append(note)
    return {
        source: "; ".join(note for note in notes if note)
        for source, notes in notes_by_source.items()
    }
def filter_todo_list(todo_list: TodoList, include_prerelease: bool = False) -> TodoList:
    """Keep only TODO items that describe a genuinely new upstream version.

    Drops items that are not "newupstream_*" entries, pre-releases
    (unless *include_prerelease* is true), and items whose new and
    current versions match once epoch and Debian revision are stripped.
    """
    kept: TodoList = []
    for item in todo_list:
        shortname = item.get(":shortname")
        details = item.get(":details")
        if not (isinstance(shortname, str) and isinstance(details, str)):
            continue
        if not shortname.startswith("newupstream_"):
            continue
        if is_prerelease_version(details) and not include_prerelease:
            continue
        new_version, current_version = parse_details(details)
        if current_version and normalize_upstream_version(
            new_version
        ) == normalize_upstream_version(current_version):
            continue
        kept.append(item)
    return kept
def normalize_upstream_version(version: str) -> str:
    """Return *version* with any epoch prefix and Debian revision removed.

    Example: "1:2.3.4-5" -> "2.3.4".
    """
    # split/rsplit return the whole string when the separator is absent,
    # so both steps are safe no-ops for plain upstream versions.
    upstream = version.split(":", 1)[-1]
    upstream = upstream.rsplit("-", 1)[0]
    return upstream.strip()
def print_changes(old_list: TodoList, new_list: TodoList) -> None:
    """Report packages added to or removed from the TODO list."""

    def describe(details: str) -> str:
        # One-line summary of new/current versions, flagging pre-releases.
        new_version, current_version = parse_details(details)
        shown = (
            f"{new_version} (pre)" if is_prerelease_version(details) else new_version
        )
        return f"New: {shown} | Current: {current_version or '-'}"

    def index(todo_list: TodoList) -> dict[str, str]:
        # Map source package name -> formatted details line.
        out: dict[str, str] = {}
        for item in todo_list:
            source = item.get(":source")
            details = item.get(":details")
            if isinstance(source, str) and isinstance(details, str):
                out[source] = describe(details)
        return out

    before = index(old_list)
    after = index(new_list)
    added = sorted(set(after) - set(before))
    removed = sorted(set(before) - set(after))
    if not (added or removed):
        print("No changes in todo.json.")
        return
    if added:
        print("New packages:")
        for source in added:
            print(f" {source} - {after.get(source, '-')}")
    if removed:
        print("Removed packages:")
        for source in removed:
            print(f" {source} - {before.get(source, '-')}")
def list_todos(include_prerelease: bool) -> None:
    """Display filtered TODO items in a table.

    Downloads todo.json from UDD if not present locally. On terminals
    narrower than 100 columns a compact line-per-package layout is used
    instead of a rich table.
    """
    if not TODO_PATH.exists():
        print("Downloading todo.json...")
        todo_list = fetch_todo_list()
        save_todo_list(todo_list)
    else:
        with TODO_PATH.open("r", encoding="utf-8") as handle:
            todo_list = cast(TodoList, json.load(handle))
    source_info_map = load_source_info_map()
    notes_by_source = load_notes()
    console = Console()
    filtered = filter_todo_list(todo_list, include_prerelease=include_prerelease)
    # Below 100 columns the table would wrap badly; use compact lines instead.
    is_narrow = console.width < 100
    if is_narrow:
        console.print("Debian New Upstream TODOs")
    else:
        table = Table(title="Debian New Upstream TODOs")
        table.add_column("Source", style="bold")
        table.add_column("New", style="green", justify="right")
        table.add_column("Current", style="dim", justify="right")
        table.add_column("Team", justify="right")
        table.add_column("Note/Uploaders", overflow="fold")
    for todo in filtered:
        new_version, current_version = parse_details(todo[":details"])
        source_info = source_info_map.get(todo[":source"], {})
        vcs_git = source_info.get("vcs_git")
        uploaders = source_info.get("uploaders", "")
        source = todo[":source"]
        note = notes_by_source.get(source, "")
        display_new = new_version
        if is_prerelease_version(todo[":details"]):
            display_new = f"[yellow]{new_version} (pre)[/yellow]"
        # Abbreviate team names for display width.
        display_team = vcs_git or "-"
        if display_team == "homeassistant-team":
            display_team = "HA"
        elif display_team.endswith("-team"):
            display_team = display_team[:-5]
        # A manual note takes precedence over the uploaders list.
        display_note = note or uploaders or "-"
        if is_narrow:
            parts = [f"[bold]{source}[/bold]"]
            if display_team != "-":
                parts.append(f"[dim]{display_team}[/dim]")
            parts.append(f"N: {display_new}")
            parts.append(f"C: {current_version or '-'}")
            console.print(" ".join(parts))
            if display_note != "-":
                console.print(" " + display_note)
        else:
            table.add_row(
                source,
                display_new,
                current_version or "-",
                display_team,
                display_note,
            )
    if not is_narrow:
        console.print(table)
    console.print(f"Packages: {len(filtered)}")
def update_todos() -> None:
    """Download the latest TODO list, save it, and print the differences."""
    previous: TodoList = []
    if TODO_PATH.exists():
        with TODO_PATH.open("r", encoding="utf-8") as handle:
            previous = cast(TodoList, json.load(handle))
    latest = fetch_todo_list()
    save_todo_list(latest)
    # Compare the filtered views so pre-releases don't show up as changes.
    print_changes(filter_todo_list(previous), filter_todo_list(latest))
@click.group(invoke_without_command=True)
@click.pass_context
def cli(context: click.Context) -> None:
    """Track Debian packages with new upstream versions."""
    # With no subcommand, behave like the "list" command without pre-releases.
    if context.invoked_subcommand is not None:
        return
    list_todos(include_prerelease=False)
@cli.command("list", help="List filtered new upstream todo entries.")
@click.option(
    "--show-prerelease",
    is_flag=True,
    help="Include prerelease versions in the list output.",
)
def list_command(show_prerelease: bool) -> None:
    # Thin wrapper: the --show-prerelease flag maps onto list_todos().
    list_todos(include_prerelease=show_prerelease)
@cli.command("update", help="Fetch the latest todo list and show changes.")
def update_command() -> None:
    # Thin wrapper delegating to update_todos().
    update_todos()
def main() -> None:
    """Console-script entry point: dispatch to the click command group."""
    cli()