debian-todo/todo

313 lines
10 KiB
Python
Executable file

#!/usr/bin/python3
import json
import re
import glob
import os
from pathlib import Path
from typing import Any, Optional, cast
from urllib.request import urlopen
import click
from debian import deb822
from rich.console import Console
from rich.table import Table
TODO_URL = "https://udd.debian.org/dmd/?email1=edward%404angle.com&format=json"
TODO_PATH = Path("todo.json")
NOTES_PATH = Path("notes")
TodoItem = dict[str, Any]
TodoList = list[TodoItem]
PRERELEASE_RE = re.compile(r"(?i)(?:^|[0-9.\-])(?:alpha|beta|rc|a|b)\d*")
CURRENTLY_RE = re.compile(
r"^(?P<new>.+?)\s*\(currently in unstable:\s*(?P<current>.+?)\)\s*$"
)
CACHE_PATH = Path(".vcs_git_cache.json")
CACHE_VERSION = 4
SourceInfo = dict[str, str]
HIDE_UPLOADER = "Edward Betts <edward@4angle.com>"
def parse_details(details: str) -> tuple[str, Optional[str]]:
match = CURRENTLY_RE.match(details)
if match:
return match.group("new").strip(), match.group("current").strip()
return details.strip(), None
def is_prerelease_version(details: str) -> bool:
new_version, _ = parse_details(details)
return bool(PRERELEASE_RE.search(new_version))
def vcs_git_to_team(vcs_git: Optional[str]) -> Optional[str]:
if not vcs_git:
return None
match = re.search(r"salsa\.debian\.org/([^/]+)/", vcs_git)
if not match:
return vcs_git
return match.group(1)
def normalize_uploaders(uploaders: str) -> str:
parts = [part.strip().strip(",") for part in uploaders.split(",")]
cleaned = [part for part in parts if part and part != HIDE_UPLOADER]
return "\n".join(cleaned)
def load_cache(source_paths: list[str]) -> Optional[dict[str, SourceInfo]]:
try:
with CACHE_PATH.open("r", encoding="utf-8") as handle:
data = json.load(handle)
except (FileNotFoundError, json.JSONDecodeError, OSError):
return None
if not isinstance(data, dict):
return None
if data.get("cache_version") != CACHE_VERSION:
return None
cached_mtimes = data.get("sources_mtimes", {})
if not isinstance(cached_mtimes, dict):
return None
for path in source_paths:
try:
mtime = os.path.getmtime(path)
except OSError:
return None
if str(mtime) != str(cached_mtimes.get(path)):
return None
vcs_by_source = data.get("vcs_by_source")
if not isinstance(vcs_by_source, dict):
return None
normalized: dict[str, SourceInfo] = {}
for key, value in vcs_by_source.items():
if not isinstance(key, str):
return None
if isinstance(value, str):
normalized[key] = {"vcs_git": value, "uploaders": ""}
continue
if not isinstance(value, dict):
return None
vcs_git = value.get("vcs_git")
uploaders = value.get("uploaders")
if not isinstance(vcs_git, str) or not isinstance(uploaders, str):
return None
normalized[key] = {"vcs_git": vcs_git, "uploaders": uploaders}
return normalized
def save_cache(source_paths: list[str], vcs_by_source: dict[str, SourceInfo]) -> None:
sources_mtimes: dict[str, float] = {}
for path in source_paths:
try:
sources_mtimes[path] = os.path.getmtime(path)
except OSError:
return
data = {
"cache_version": CACHE_VERSION,
"sources_mtimes": sources_mtimes,
"vcs_by_source": vcs_by_source,
}
try:
with CACHE_PATH.open("w", encoding="utf-8") as handle:
json.dump(data, handle, sort_keys=True)
except OSError:
return
def load_source_info_map() -> dict[str, SourceInfo]:
source_paths = sorted(glob.glob("/var/lib/apt/lists/*Sources"))
cached = load_cache(source_paths)
if cached is not None:
return cached
vcs_by_source: dict[str, SourceInfo] = {}
for path in source_paths:
with Path(path).open("r", encoding="utf-8", errors="replace") as handle:
for entry in deb822.Deb822.iter_paragraphs(handle):
source = entry.get("Source") or entry.get("Package")
if not source or source in vcs_by_source:
continue
vcs_git = entry.get("Vcs-Git")
uploaders = entry.get("Uploaders")
team = None
if vcs_git:
team = vcs_git_to_team(vcs_git.strip())
uploaders_text = ""
if uploaders:
uploaders_text = normalize_uploaders(
re.sub(r"\s+", " ", uploaders).strip()
)
if team or uploaders_text:
vcs_by_source[source] = {
"vcs_git": team or "",
"uploaders": uploaders_text,
}
save_cache(source_paths, vcs_by_source)
return vcs_by_source
def fetch_todo_list() -> TodoList:
with urlopen(TODO_URL) as response:
payload = response.read().decode("utf-8")
return cast(TodoList, json.loads(payload))
def save_todo_list(todo_list: TodoList) -> None:
with TODO_PATH.open("w", encoding="utf-8") as handle:
json.dump(todo_list, handle, indent=2, ensure_ascii=True)
handle.write("\n")
def summarize_sources(todo_list: TodoList) -> set[str]:
sources: set[str] = set()
for item in todo_list:
source = item.get(":source")
if isinstance(source, str):
sources.add(source)
return sources
def load_notes() -> dict[str, str]:
if not NOTES_PATH.exists():
return {}
notes_by_source: dict[str, list[str]] = {}
with NOTES_PATH.open("r", encoding="utf-8") as handle:
for line in handle:
stripped = line.strip()
if not stripped:
continue
parts = stripped.split(None, 1)
if not parts:
continue
source = parts[0]
note = parts[1] if len(parts) > 1 else ""
notes_by_source.setdefault(source, []).append(note)
return {
source: "; ".join(note for note in notes if note)
for source, notes in notes_by_source.items()
}
def filter_todo_list(todo_list: TodoList, include_prerelease: bool = False) -> TodoList:
filtered: TodoList = []
for item in todo_list:
shortname = item.get(":shortname")
details = item.get(":details")
if not isinstance(shortname, str) or not isinstance(details, str):
continue
if not shortname.startswith("newupstream_"):
continue
if not include_prerelease and is_prerelease_version(details):
continue
new_version, current_version = parse_details(details)
if current_version:
normalized_new = normalize_upstream_version(new_version)
normalized_current = normalize_upstream_version(current_version)
if normalized_new == normalized_current:
continue
filtered.append(item)
return filtered
def normalize_upstream_version(version: str) -> str:
if ":" in version:
version = version.split(":", 1)[1]
if "-" in version:
version = version.rsplit("-", 1)[0]
return version.strip()
def print_changes(old_list: TodoList, new_list: TodoList) -> None:
old_sources = summarize_sources(old_list)
new_sources = summarize_sources(new_list)
added = sorted(new_sources - old_sources)
removed = sorted(old_sources - new_sources)
if not added and not removed:
print("No changes in todo.json.")
return
if added:
print("New packages:")
for source in added:
print(f" {source}")
if removed:
print("Removed packages:")
for source in removed:
print(f" {source}")
def list_todos(include_prerelease: bool) -> None:
with TODO_PATH.open("r", encoding="utf-8") as handle:
todo_list = cast(TodoList, json.load(handle))
source_info_map = load_source_info_map()
notes_by_source = load_notes()
console = Console()
table = Table(title="Debian New Upstream TODOs")
table.add_column("Source", style="bold")
table.add_column("New", style="green", justify="right")
table.add_column("Current", style="dim", justify="right")
table.add_column("Team", justify="right")
table.add_column("Note/Uploaders", overflow="fold")
filtered = filter_todo_list(todo_list, include_prerelease=include_prerelease)
for todo in filtered:
new_version, current_version = parse_details(todo[":details"])
source_info = source_info_map.get(todo[":source"], {})
vcs_git = source_info.get("vcs_git")
uploaders = source_info.get("uploaders", "")
source = todo[":source"]
note = notes_by_source.get(source, "")
display_new = new_version
if is_prerelease_version(todo[":details"]):
display_new = f"[yellow]{new_version} (pre)[/yellow]"
display_team = vcs_git or "-"
if display_team == "homeassistant-team":
display_team = "HA"
elif display_team.endswith("-team"):
display_team = display_team[:-5]
display_note = note or uploaders or "-"
table.add_row(
source,
display_new,
current_version or "-",
display_team,
display_note,
)
console.print(table)
console.print(f"Packages: {len(filtered)}")
def update_todos() -> None:
old_list: TodoList = []
if TODO_PATH.exists():
with TODO_PATH.open("r", encoding="utf-8") as handle:
old_list = cast(TodoList, json.load(handle))
todo_list = fetch_todo_list()
save_todo_list(todo_list)
print_changes(filter_todo_list(old_list), filter_todo_list(todo_list))
@click.group(invoke_without_command=True)
@click.pass_context
def cli(context: click.Context) -> None:
if context.invoked_subcommand is None:
list_todos(include_prerelease=False)
@cli.command("list", help="List filtered new upstream todo entries.")
@click.option(
"--show-prerelease",
is_flag=True,
help="Include prerelease versions in the list output.",
)
def list_command(show_prerelease: bool) -> None:
list_todos(include_prerelease=show_prerelease)
@cli.command("update", help="Fetch the latest todo list and show changes.")
def update_command() -> None:
update_todos()
if __name__ == "__main__":
cli()