debian-todo/debian_todo/__init__.py
Edward Betts 3b7a07e1d9 Refactor update_package with comprehensive error handling and integrate update scripts.
Add custom exception hierarchy (PackageUpdateError, PackageNotFoundError,
RepositoryStateError, ExternalCommandError, MissingToolError) for specific
error handling throughout package update operations.

Extract helper functions from 90-line update_package function:
- validate_package_info: Package validation
- resolve_package_directories: Path resolution
- ensure_package_checkout: Salsa checkout with error handling
- validate_repository_state: Check for uncommitted changes
- run_gbp_pq_workflow: Run gbp pq import/switch with error checks
- import_upstream_version: Import upstream using gbp
- run_command: Centralized subprocess execution with consistent errors
- check_required_tools: Validate required tools upfront

Incorporate update scripts as Python functions (replaces external shell scripts):
- update_debian_control: Update Standards-Version, remove obsolete fields
- update_debian_copyright_year: Update copyright years to current
- update_debian_watch: Upgrade watch files from version 4 to 5
- add_salsa_ci: Add debian/salsa-ci.yml if missing
- run_package_updates: Orchestrator for all update operations

Enhance network error handling in fetch_todo_list:
- Handle HTTP errors, network errors, timeouts, invalid JSON
- Add timeout parameter (default 30s)
- Graceful error messages in list_todos and update_todos

Add comprehensive test coverage:
- 67 tests for update-pkg functionality
- Tests for all helper functions and update operations
- Network error handling tests
- 125 total tests, all passing

Benefits:
- No external script dependencies
- Consistent error handling throughout
- Better error messages with actionable guidance
- All update logic in one maintainable codebase

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-01 16:55:50 +00:00

1184 lines
35 KiB
Python

"""CLI tool for tracking Debian packages with new upstream versions.
Fetches TODO items from the Debian UDD (Ultimate Debian Database) and displays
packages where a new upstream version is available. Filters out pre-release
versions and shows team/uploader metadata.
"""
import datetime
import json
import re
import glob
import os
import shutil
import subprocess
import sys
import urllib.error
from pathlib import Path
from typing import Any, Optional, cast
from urllib.request import urlopen
import click
from debian import deb822
from rich.console import Console
from rich.table import Table
# UDD "Debian Maintainer Dashboard" feed for this maintainer's packages.
TODO_URL = "https://udd.debian.org/dmd/?email1=edward%404angle.com&format=json"
# Local snapshot of the feed, reused until explicitly refreshed.
TODO_PATH = Path("todo.json")
# Optional per-package notes file: one "<source> <note text>" per line.
NOTES_PATH = Path("notes")
# A TODO item as returned by UDD (keys like ":source", ":details", ":shortname").
TodoItem = dict[str, Any]
TodoList = list[TodoItem]
# Matches alpha/beta/rc/a/b pre-release markers within a version string.
PRERELEASE_RE = re.compile(r"(?i)(?:^|[0-9.\-])(?:alpha|beta|rc|a|b)\d*")
# Splits "NEW (currently in unstable: CURRENT)" into named groups.
CURRENTLY_RE = re.compile(
    r"^(?P<new>.+?)\s*\(currently in unstable:\s*(?P<current>.+?)\)\s*$"
)
# On-disk cache of parsed APT Sources data; bump CACHE_VERSION to invalidate.
CACHE_PATH = Path(".vcs_git_cache.json")
CACHE_VERSION = 4
# Per-source metadata: {"vcs_git": team-or-url, "uploaders": newline-joined text}.
SourceInfo = dict[str, str]
# This uploader entry is omitted from displayed uploader lists.
HIDE_UPLOADER = "Edward Betts <edward@4angle.com>"
# Package checkouts live under ~/src/debian/<team>/<package>/<package>.
DEBIAN_SRC_BASE = Path.home() / "src" / "debian"
class PackageUpdateError(Exception):
    """Base exception for package update operations."""
class PackageNotFoundError(PackageUpdateError):
    """Package not found in source info."""
class RepositoryStateError(PackageUpdateError):
    """Repository in invalid state (uncommitted changes, etc)."""
class ExternalCommandError(PackageUpdateError):
    """External command failed (salsa, gbp, dch)."""

    def __init__(self, command: str, returncode: int, message: str):
        """Record the failing command and its exit status with the message."""
        super().__init__(message)
        self.command = command
        self.returncode = returncode
class MissingToolError(PackageUpdateError):
    """Required external tool not available."""
def run_command(
    cmd: list[str],
    cwd: Path,
    description: str,
    capture_output: bool = False,
    check: bool = True,
) -> subprocess.CompletedProcess:
    """Run command with consistent error handling.

    Args:
        cmd: Command and arguments to run
        cwd: Working directory for the command
        description: Human-readable description of what the command does
        capture_output: Whether to capture stdout/stderr
        check: Whether to raise exception on non-zero exit

    Returns:
        CompletedProcess instance

    Raises:
        ExternalCommandError: If command fails and check=True
        MissingToolError: If command not found
    """
    # Only subprocess.run sits inside the try block so the FileNotFoundError
    # handler cannot accidentally swallow anything else.
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=capture_output,
            text=True,
        )
    except FileNotFoundError as e:
        raise MissingToolError(
            f"Command not found: {cmd[0]}. Please install it to continue."
        ) from e
    if check and result.returncode != 0:
        cmd_str = " ".join(cmd)
        message = f"{description} failed: {cmd_str} (exit code {result.returncode})"
        # Bug fix: captured stderr used to be discarded; include it so the
        # failure is diagnosable from the exception message alone.
        if capture_output and result.stderr and result.stderr.strip():
            message += f"\nstderr: {result.stderr.strip()}"
        raise ExternalCommandError(
            command=cmd_str,
            returncode=result.returncode,
            message=message,
        )
    return result
def check_required_tools() -> None:
    """Verify that all required external tools are on PATH.

    Raises:
        MissingToolError: If a required tool is not found
    """
    required = ("salsa", "gbp", "dch")
    missing = [tool for tool in required if shutil.which(tool) is None]
    if missing:
        tools_str = ", ".join(missing)
        raise MissingToolError(
            f"Required tools not found: {tools_str}. Please install them to continue."
        )
def parse_details(details: str) -> tuple[str, Optional[str]]:
    """Split a details string into its new and current versions.

    Args:
        details: String like "1.2.3 (currently in unstable: 1.2.2-1)"

    Returns:
        Tuple of (new_version, current_version); current_version is None
        when the "(currently in unstable: ...)" suffix is absent.
    """
    match = CURRENTLY_RE.match(details)
    if match is None:
        return details.strip(), None
    return match.group("new").strip(), match.group("current").strip()
def is_prerelease_version(details: str) -> bool:
    """Return True when the new version in *details* looks like a pre-release.

    Detects versions containing alpha, beta, rc, a, or b suffixes.
    """
    new_version = parse_details(details)[0]
    return PRERELEASE_RE.search(new_version) is not None
def vcs_git_to_team(vcs_git: Optional[str]) -> Optional[str]:
    """Derive a team name from a Vcs-Git URL.

    salsa.debian.org URLs yield their group/team path segment; any other
    URL is returned unchanged, and empty/None input yields None.
    """
    if not vcs_git:
        return None
    match = re.search(r"salsa\.debian\.org/([^/]+)/", vcs_git)
    return match.group(1) if match else vcs_git
def normalize_uploaders(uploaders: str, hide: Optional[str] = None) -> str:
    """Clean and format an Uploaders field value.

    Splits the comma-separated list, strips whitespace and stray commas,
    drops empty entries and the hidden uploader, and joins the remainder
    with newlines.

    Args:
        uploaders: Raw comma-separated uploaders string.
        hide: Uploader entry to omit; defaults to HIDE_UPLOADER.
              (Generalized from the previous hard-coded constant;
              existing one-argument callers behave exactly as before.)

    Returns:
        Newline-joined cleaned uploader entries.
    """
    if hide is None:
        hide = HIDE_UPLOADER
    parts = [part.strip().strip(",") for part in uploaders.split(",")]
    cleaned = [part for part in parts if part and part != hide]
    return "\n".join(cleaned)
def load_cache(source_paths: list[str]) -> Optional[dict[str, SourceInfo]]:
    """Load cached Vcs-Git and uploader info if still valid.

    Returns None if cache is missing, corrupted, or stale (based on
    Sources file mtimes or cache version mismatch).
    """
    try:
        with CACHE_PATH.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError, OSError):
        # A missing or unreadable cache is treated the same as no cache.
        return None
    if not isinstance(data, dict):
        return None
    if data.get("cache_version") != CACHE_VERSION:
        # Cache schema changed; force a rebuild.
        return None
    cached_mtimes = data.get("sources_mtimes", {})
    if not isinstance(cached_mtimes, dict):
        return None
    for path in source_paths:
        try:
            mtime = os.path.getmtime(path)
        except OSError:
            return None
        # Compared as strings — presumably to tolerate float formatting
        # differences after the JSON round-trip; verify if changing.
        if str(mtime) != str(cached_mtimes.get(path)):
            return None
    vcs_by_source = data.get("vcs_by_source")
    if not isinstance(vcs_by_source, dict):
        return None
    # Validate and normalize every entry; any malformed entry invalidates
    # the whole cache rather than being silently dropped.
    normalized: dict[str, SourceInfo] = {}
    for key, value in vcs_by_source.items():
        if not isinstance(key, str):
            return None
        if isinstance(value, str):
            # Older cache format stored just the vcs_git string.
            normalized[key] = {"vcs_git": value, "uploaders": ""}
            continue
        if not isinstance(value, dict):
            return None
        vcs_git = value.get("vcs_git")
        uploaders = value.get("uploaders")
        if not isinstance(vcs_git, str) or not isinstance(uploaders, str):
            return None
        normalized[key] = {"vcs_git": vcs_git, "uploaders": uploaders}
    return normalized
def save_cache(source_paths: list[str], vcs_by_source: dict[str, SourceInfo]) -> None:
    """Persist Vcs-Git and uploader info to the cache file.

    Current mtimes of the Sources files are stored alongside the data so
    a later load can detect staleness.  All I/O failures are swallowed:
    the cache is a best-effort optimisation.
    """
    try:
        sources_mtimes = {path: os.path.getmtime(path) for path in source_paths}
    except OSError:
        return
    payload = {
        "cache_version": CACHE_VERSION,
        "sources_mtimes": sources_mtimes,
        "vcs_by_source": vcs_by_source,
    }
    try:
        with CACHE_PATH.open("w", encoding="utf-8") as handle:
            json.dump(payload, handle, sort_keys=True)
    except OSError:
        return
def load_source_info_map() -> dict[str, SourceInfo]:
    """Load Vcs-Git and uploader info for all source packages.

    Parses APT Sources files from /var/lib/apt/lists/ and extracts
    Vcs-Git URLs and Uploaders fields. Results are cached to disk.
    """
    source_paths = sorted(glob.glob("/var/lib/apt/lists/*Sources"))
    cached = load_cache(source_paths)
    if cached is not None:
        return cached
    vcs_by_source: dict[str, SourceInfo] = {}
    for path in source_paths:
        # Sources files may contain non-UTF-8 bytes; replace rather than fail.
        with Path(path).open("r", encoding="utf-8", errors="replace") as handle:
            for entry in deb822.Deb822.iter_paragraphs(handle):
                source = entry.get("Source") or entry.get("Package")
                if not source or source in vcs_by_source:
                    # First paragraph seen for a source package wins.
                    continue
                vcs_git = entry.get("Vcs-Git")
                uploaders = entry.get("Uploaders")
                team = None
                if vcs_git:
                    team = vcs_git_to_team(vcs_git.strip())
                uploaders_text = ""
                if uploaders:
                    # Collapse folded/multi-line field values onto one line
                    # before normalizing.
                    uploaders_text = normalize_uploaders(
                        re.sub(r"\s+", " ", uploaders).strip()
                    )
                # Only record packages with at least one useful field.
                if team or uploaders_text:
                    vcs_by_source[source] = {
                        "vcs_git": team or "",
                        "uploaders": uploaders_text,
                    }
    save_cache(source_paths, vcs_by_source)
    return vcs_by_source
def fetch_todo_list(timeout: int = 30) -> TodoList:
    """Fetch the TODO list from UDD as JSON.

    Args:
        timeout: Request timeout in seconds (default: 30)

    Returns:
        List of TODO items

    Raises:
        PackageUpdateError: If network request fails or JSON is invalid
    """
    try:
        with urlopen(TODO_URL, timeout=timeout) as response:
            payload = response.read().decode("utf-8")
    # HTTPError is a URLError subclass, so it must be caught first.
    except urllib.error.HTTPError as e:
        raise PackageUpdateError(
            f"HTTP error {e.code} while fetching TODO list from {TODO_URL}: {e.reason}"
        ) from e
    except urllib.error.URLError as e:
        raise PackageUpdateError(
            f"Network error while fetching TODO list from {TODO_URL}: {e.reason}"
        ) from e
    # Timeouts during the read can surface as TimeoutError directly rather
    # than being wrapped in URLError.
    except TimeoutError as e:
        raise PackageUpdateError(
            f"Timeout after {timeout}s while fetching TODO list from {TODO_URL}"
        ) from e
    try:
        return cast(TodoList, json.loads(payload))
    except json.JSONDecodeError as e:
        raise PackageUpdateError(
            f"Invalid JSON in TODO list response: {e}"
        ) from e
def save_todo_list(todo_list: TodoList) -> None:
    """Write the TODO list to the local JSON file with a trailing newline."""
    serialized = json.dumps(todo_list, indent=2, ensure_ascii=True)
    TODO_PATH.write_text(serialized + "\n", encoding="utf-8")
def summarize_sources(todo_list: TodoList) -> set[str]:
    """Return the set of source package names present in the TODO list."""
    return {
        item[":source"]
        for item in todo_list
        if isinstance(item.get(":source"), str)
    }
def load_notes() -> dict[str, str]:
    """Load per-package notes from the notes file.

    Each line should be: <source-package> <note text>
    Multiple notes for the same package are joined with semicolons.
    """
    if not NOTES_PATH.exists():
        return {}
    notes_by_source: dict[str, list[str]] = {}
    with NOTES_PATH.open("r", encoding="utf-8") as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if not stripped:
                continue
            # A non-empty stripped line always yields at least one field.
            fields = stripped.split(None, 1)
            note = fields[1] if len(fields) > 1 else ""
            notes_by_source.setdefault(fields[0], []).append(note)
    return {
        src: "; ".join(n for n in notes if n)
        for src, notes in notes_by_source.items()
    }
def filter_todo_list(todo_list: TodoList, include_prerelease: bool = False) -> TodoList:
    """Keep only actionable new-upstream TODO items.

    Drops items that are not "newupstream_" entries, pre-releases (unless
    include_prerelease is True), and items whose normalized new and
    current versions already match.
    """
    kept: TodoList = []
    for item in todo_list:
        shortname = item.get(":shortname")
        details = item.get(":details")
        if not (isinstance(shortname, str) and isinstance(details, str)):
            continue
        if not shortname.startswith("newupstream_"):
            continue
        if is_prerelease_version(details) and not include_prerelease:
            continue
        new_version, current_version = parse_details(details)
        if current_version and normalize_upstream_version(
            new_version
        ) == normalize_upstream_version(current_version):
            # Already up to date once epoch/revision are ignored.
            continue
        kept.append(item)
    return kept
def normalize_upstream_version(version: str) -> str:
    """Strip any epoch prefix and Debian revision suffix from a version.

    "1:2.3.4-5" -> "2.3.4"
    """
    _epoch, sep, rest = version.partition(":")
    if sep:
        version = rest
    head, sep, _revision = version.rpartition("-")
    if sep:
        version = head
    return version.strip()
def print_changes(old_list: TodoList, new_list: TodoList) -> None:
    """Print added and removed packages between two TODO lists."""
    def format_details(details: str) -> str:
        # One-line "New: X | Current: Y" summary, flagging pre-releases.
        new_version, current_version = parse_details(details)
        display_new = new_version
        if is_prerelease_version(details):
            display_new = f"{new_version} (pre)"
        return f"New: {display_new} | Current: {current_version or '-'}"
    def build_details(todo_list: TodoList) -> dict[str, str]:
        # Map source package name -> formatted details string.
        details_by_source: dict[str, str] = {}
        for item in todo_list:
            source = item.get(":source")
            details = item.get(":details")
            if isinstance(source, str) and isinstance(details, str):
                details_by_source[source] = format_details(details)
        return details_by_source
    old_details = build_details(old_list)
    new_details = build_details(new_list)
    # Compare by source package name only; details are shown for context.
    old_sources = set(old_details)
    new_sources = set(new_details)
    added = sorted(new_sources - old_sources)
    removed = sorted(old_sources - new_sources)
    if not added and not removed:
        print("No changes in todo.json.")
        return
    if added:
        print("New packages:")
        for source in added:
            print(f" {source} - {new_details.get(source, '-')}")
    if removed:
        print("Removed packages:")
        for source in removed:
            print(f" {source} - {old_details.get(source, '-')}")
def list_todos(include_prerelease: bool) -> None:
    """Display filtered TODO items in a table.

    Downloads todo.json from UDD if not present locally.  Uses a compact
    line-per-package layout when the terminal is narrower than 100 columns.
    """
    if not TODO_PATH.exists():
        print("Downloading todo.json...")
        try:
            todo_list = fetch_todo_list()
            save_todo_list(todo_list)
        except PackageUpdateError as e:
            print(f"Error: {e}", file=sys.stderr)
            print("Please try again later or check your network connection.", file=sys.stderr)
            sys.exit(1)
    else:
        with TODO_PATH.open("r", encoding="utf-8") as handle:
            todo_list = cast(TodoList, json.load(handle))
    source_info_map = load_source_info_map()
    notes_by_source = load_notes()
    console = Console()
    filtered = filter_todo_list(todo_list, include_prerelease=include_prerelease)
    # Below 100 columns the table would wrap badly; use the compact layout.
    is_narrow = console.width < 100
    if is_narrow:
        console.print("Debian New Upstream TODOs")
    else:
        table = Table(title="Debian New Upstream TODOs")
        table.add_column("Source", style="bold")
        table.add_column("New", style="green", justify="right")
        table.add_column("Current", style="dim", justify="right")
        table.add_column("Team", justify="right")
        table.add_column("Note/Uploaders", overflow="fold")
    for todo in filtered:
        new_version, current_version = parse_details(todo[":details"])
        source_info = source_info_map.get(todo[":source"], {})
        vcs_git = source_info.get("vcs_git")
        uploaders = source_info.get("uploaders", "")
        source = todo[":source"]
        note = notes_by_source.get(source, "")
        display_new = new_version
        if is_prerelease_version(todo[":details"]):
            display_new = f"[yellow]{new_version} (pre)[/yellow]"
        # Shorten team names: homeassistant-team -> HA, "*-team" -> "*".
        display_team = vcs_git or "-"
        if display_team == "homeassistant-team":
            display_team = "HA"
        elif display_team.endswith("-team"):
            display_team = display_team[:-5]
        # A manual note takes precedence over the uploaders list.
        display_note = note or uploaders or "-"
        if is_narrow:
            parts = [f"[bold]{source}[/bold]"]
            if display_team != "-":
                parts.append(f"[dim]{display_team}[/dim]")
            parts.append(f"N: {display_new}")
            parts.append(f"C: {current_version or '-'}")
            console.print(" ".join(parts))
            if display_note != "-":
                console.print(" " + display_note)
        else:
            table.add_row(
                source,
                display_new,
                current_version or "-",
                display_team,
                display_note,
            )
    if not is_narrow:
        console.print(table)
    console.print(f"Packages: {len(filtered)}")
def update_todos() -> None:
    """Fetch the latest TODO list from UDD and print what changed."""
    previous: TodoList = []
    if TODO_PATH.exists():
        with TODO_PATH.open("r", encoding="utf-8") as handle:
            previous = cast(TodoList, json.load(handle))
    try:
        latest = fetch_todo_list()
    except PackageUpdateError as e:
        print(f"Error: {e}", file=sys.stderr)
        print("Failed to fetch updated TODO list.", file=sys.stderr)
        sys.exit(1)
    save_todo_list(latest)
    # Diff the filtered views so noise outside the actionable set is ignored.
    print_changes(filter_todo_list(previous), filter_todo_list(latest))
def team_slug_to_display_name(team_slug: str) -> str:
    """Convert a team slug to a short display name.

    homeassistant-team -> HA
    python-team -> python
    openstack-team -> openstack
    """
    if team_slug == "homeassistant-team":
        return "HA"
    return team_slug.removesuffix("-team")
def has_uncommitted_changes(pkg_dir: Path) -> bool:
    """Return True when `git status --porcelain` reports any output."""
    status = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=pkg_dir,
        capture_output=True,
        text=True,
    )
    return status.stdout.strip() != ""
def extract_upstream_version_from_git_log(pkg_dir: Path) -> Optional[str]:
    """Pull the upstream version out of the most recent git commit.

    Looks for gbp's "Update upstream source from tag 'upstream/X'" line
    and returns X, or None when git fails or no such line is present.
    """
    log = subprocess.run(
        ["git", "log", "-1"],
        cwd=pkg_dir,
        capture_output=True,
        text=True,
    )
    if log.returncode != 0:
        return None
    match = re.search(
        r"Update upstream source from tag 'upstream/(.+?)'", log.stdout
    )
    return match.group(1) if match else None
def add_changelog_entry(pkg_dir: Path) -> bool:
    """Add a debian/changelog entry for the imported upstream version.

    Returns True on success, False when the version cannot be determined
    from the git log or dch exits non-zero.
    """
    version = extract_upstream_version_from_git_log(pkg_dir)
    if version is None:
        print("Could not find upstream version in git log.", file=sys.stderr)
        return False
    dch = subprocess.run(
        [
            "dch",
            "--release-heuristic",
            "log",
            "--newversion",
            f"{version}-1",
            "New upstream release.",
        ],
        cwd=pkg_dir,
    )
    return dch.returncode == 0
def validate_package_info(package: str) -> SourceInfo:
    """Look up and validate source info for a package.

    Returns:
        Source info dictionary containing vcs_git and uploaders

    Raises:
        PackageNotFoundError: If package not found or has no vcs_git
    """
    info = load_source_info_map().get(package)
    if not info or not info.get("vcs_git"):
        raise PackageNotFoundError(
            f"Could not find team info for package '{package}'"
        )
    return info
def resolve_package_directories(package: str, source_info: SourceInfo) -> tuple[Path, Path, Path]:
    """Compute (team_dir, pkg_dir, repo_dir) for a package checkout.

    Layout: DEBIAN_SRC_BASE/<team display name>/<package>/<package>
    """
    display_team = team_slug_to_display_name(source_info["vcs_git"]).lower()
    team_dir = DEBIAN_SRC_BASE / display_team
    pkg_dir = team_dir / package
    return team_dir, pkg_dir, pkg_dir / package
def ensure_package_checkout(package: str, source_info: SourceInfo, pkg_dir: Path) -> None:
    """Check out a package from salsa into pkg_dir.

    Raises:
        RepositoryStateError: If pkg_dir cannot be created
        ExternalCommandError: If the salsa checkout fails
    """
    try:
        pkg_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        raise RepositoryStateError(
            f"Failed to create directory {pkg_dir}: {e}"
        ) from e
    print(f"Checking out {package} into {pkg_dir}...")
    salsa_path = f"{source_info['vcs_git']}/deps/{package}"
    run_command(
        ["salsa", "checkout", salsa_path],
        pkg_dir,
        f"Checkout {salsa_path}",
    )
def validate_repository_state(repo_dir: Path) -> None:
    """Ensure the repository has no uncommitted changes.

    Raises:
        RepositoryStateError: If git reports pending changes
    """
    if not has_uncommitted_changes(repo_dir):
        return
    raise RepositoryStateError(
        f"{repo_dir} has uncommitted changes. "
        "Please commit or stash them first."
    )
def run_gbp_pq_workflow(repo_dir: Path) -> None:
    """Run `gbp pq import` followed by `gbp pq switch` in repo_dir.

    Raises:
        ExternalCommandError: If either gbp pq command fails
    """
    steps = (
        (["gbp", "pq", "import"], "Import patch queue"),
        (["gbp", "pq", "switch"], "Switch to patch branch"),
    )
    for args, description in steps:
        run_command(args, repo_dir, description)
def import_upstream_version(repo_dir: Path) -> None:
    """Import the new upstream release via `gbp import-orig --uscan`.

    Raises:
        ExternalCommandError: If gbp import-orig fails
    """
    print("Importing new upstream version...")
    cmd = ["gbp", "import-orig", "--uscan", "--pristine-tar", "--no-interactive"]
    run_command(cmd, repo_dir, "Import upstream version")
def update_debian_control(repo_dir: Path) -> None:
    """Update debian/control file.

    - Removes obsolete 'Priority: optional' (now default)
    - Removes obsolete 'Rules-Requires-Root: no' (now default)
    - Updates Standards-Version to current (4.7.3)

    Each change is recorded with a `dch -a` changelog entry.  The control
    file is only rewritten when something actually changed, so an already
    up-to-date file keeps its mtime and exact bytes.

    Args:
        repo_dir: Repository directory

    Raises:
        ExternalCommandError: If dch command fails
    """
    control_path = repo_dir / "debian" / "control"
    if not control_path.exists():
        return
    lines = control_path.read_text().splitlines()
    current_standards_version = "4.7.3"
    new_lines: list[str] = []
    changed = False  # only write the file back when something was modified
    for line in lines:
        if line == "Priority: optional":
            run_command(
                ["dch", "-a", "Remove 'Priority: optional', now the default."],
                repo_dir,
                "Add changelog entry for Priority removal",
            )
            changed = True
            continue
        if line == "Rules-Requires-Root: no":
            run_command(
                ["dch", "-a", "Remove 'Rules-Requires-Root: no', now the default."],
                repo_dir,
                "Add changelog entry for Rules-Requires-Root removal",
            )
            changed = True
            continue
        if line.startswith("Standards-Version: "):
            standards_version = line[len("Standards-Version: "):]
            if standards_version != current_standards_version:
                line = "Standards-Version: " + current_standards_version
                changed = True
                run_command(
                    ["dch", "-a", "Update Standards-Version."],
                    repo_dir,
                    "Add changelog entry for Standards-Version update",
                )
        new_lines.append(line)
    # Bug fix: previously the file was rewritten unconditionally, touching
    # the mtime and normalising trailing newlines even when nothing changed.
    if changed:
        control_path.write_text("\n".join(new_lines) + "\n")
def update_debian_copyright_year(repo_dir: Path) -> None:
    """Update Edward Betts' copyright years in debian/copyright.

    Transforms:
    - "Copyright: 2025 Edward Betts <...>" -> "Copyright: 2025-2026 Edward Betts <...>"
    - "Copyright: 2022-2024 Edward Betts <...>" -> "Copyright: 2022-2026 Edward Betts <...>"

    Args:
        repo_dir: Repository directory

    Raises:
        ExternalCommandError: If dch command fails
    """
    copyright_path = repo_dir / "debian" / "copyright"
    if not copyright_path.exists():
        return
    full_name_email = "Edward Betts <edward@4angle.com>"
    year = datetime.date.today().year
    original = copyright_path.read_text(encoding="utf-8")
    holder_re = re.escape(full_name_email)
    # Matches "2024" or "2019-2024" followed by the holder, with an optional
    # "Copyright:" label (the label is absent on continuation lines).
    copyright_line_re = re.compile(
        rf"^(?P<indent>\s*)"
        rf"(?:(?P<label>Copyright:\s+))?"
        rf"(?P<years>\d{{4}}(?:-\d{{4}})?)"
        rf"(?P<suffix>\s+{holder_re}\s*)$"
    )
    copyright_field_start_re = re.compile(r"^\s*Copyright:\s+")
    field_start_re = re.compile(r"^(?:\S[^:]*):\s*")
    continuation_re = re.compile(r"^\s+")
    def update_years_token(years: str) -> Optional[str]:
        """Return an updated years token, or None if no change is needed."""
        if "-" in years:
            start_s, end_s = years.split("-", 1)
            start = int(start_s)
            end = int(end_s)
        else:
            start = end = int(years)
        if end >= year:
            # Already current (or later): leave untouched.
            return None
        return f"{start}-{year}" if start != year else str(year)
    lines = original.splitlines(keepends=True)
    out: list[str] = []
    # Track whether we are inside a (possibly multi-line) Copyright field so
    # unlabeled continuation lines can also be updated.
    in_copyright_field = False
    for line in lines:
        stripped = line.rstrip("\n")
        if field_start_re.match(stripped) and not copyright_field_start_re.match(stripped):
            # A different field starts: we have left the Copyright field.
            in_copyright_field = False
        if copyright_field_start_re.match(stripped):
            in_copyright_field = True
        m = copyright_line_re.match(stripped)
        should_consider = False
        if m and m.group("label"):
            # Labeled "Copyright: YEARS holder" line.
            should_consider = True
        elif m and in_copyright_field and continuation_re.match(stripped):
            # Indented continuation line within a Copyright field.
            should_consider = True
        if should_consider and m:
            new_years = update_years_token(m.group("years"))
            if new_years is not None:
                indent = m.group("indent")
                label = m.group("label") or ""
                suffix = m.group("suffix")
                line = f"{indent}{label}{new_years}{suffix}\n"
        out.append(line)
    updated = "".join(out)
    # Only touch the file (and the changelog) when a year actually changed.
    if updated != original:
        copyright_path.write_text(updated, encoding="utf-8")
        run_command(
            ["dch", "-a", "Update copyright year."],
            repo_dir,
            "Add changelog entry for copyright year update",
        )
def update_debian_watch(repo_dir: Path) -> None:
    """Upgrade debian/watch from version 4 to version 5.

    Supports GitHub and PyPI watch entries; any other watch file is left
    unchanged.

    Args:
        repo_dir: Repository directory

    Raises:
        ExternalCommandError: If dch command fails
    """
    watch_path = repo_dir / "debian" / "watch"
    if not watch_path.exists():
        return
    try:
        original = watch_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # Race: file removed between exists() and read_text().
        return
    # Already version 5
    if re.search(r"(?im)^\s*Version:\s*5\s*$", original):
        return
    # GitHub URL patterns
    github_url_re = re.compile(
        r"""(?xi)
        https?://github\.com/
        (?P<owner>[^/\s]+)/(?P<repo>[^/\s#?]+)
        (?:/(?:releases|tags))?
        (?:[^\s]*)?
        """
    )
    # PyPI URL patterns
    pypi_debian_net_re = re.compile(
        r"""(?xi)
        https?://pypi\.debian\.net/
        (?P<name>[a-z0-9][a-z0-9._-]*)
        /
        """
    )
    pypi_python_org_source_re = re.compile(
        r"""(?xi)
        https?://pypi(?:\.python)?\.org/
        packages/source/
        (?P<letter>[a-z0-9])/
        (?P<name>[a-z0-9][a-z0-9._-]*)
        /
        """
    )
    pythonhosted_re = re.compile(
        r"""(?xi)
        https?://files\.pythonhosted\.org/
        packages/source/
        (?P<letter>[a-z0-9])/
        (?P<name>[a-z0-9][a-z0-9._-]*)
        /
        """
    )
    # Join wrapped lines (trailing backslash means continuation).
    joined_lines = []
    buf = ""
    for line in original.splitlines():
        if line.rstrip().endswith("\\"):
            buf += line.rstrip()[:-1] + " "
        else:
            joined_lines.append(buf + line)
            buf = ""
    if buf:
        # File ended with a dangling continuation; keep what we have.
        joined_lines.append(buf)
    joined = "\n".join(joined_lines)
    # Try GitHub first
    m = github_url_re.search(joined)
    if m:
        owner, repo = m.group("owner"), m.group("repo")
        if repo.endswith(".git"):
            repo = repo[:-4]
        new_body = (
            "Version: 5\n"
            "\n"
            "Template: GitHub\n"
            f"Owner: {owner}\n"
            f"Project: {repo}\n"
        )
    else:
        # Try PyPI
        pypi_name = None
        for pattern in [pypi_debian_net_re, pypi_python_org_source_re, pythonhosted_re]:
            m = pattern.search(joined)
            if m:
                pypi_name = m.group("name")
                break
        if not pypi_name:
            # Can't upgrade this watch file
            return
        new_body = f"Version: 5\n\nTemplate: Pypi\nDist: {pypi_name}\n"
    # Preserve leading comments
    leading = []
    for line in original.splitlines(keepends=True):
        if line.lstrip().startswith("#") or not line.strip():
            leading.append(line)
            continue
        break
    header = "".join(leading).rstrip()
    if header:
        header += "\n\n"
    new_contents = header + new_body
    # Only rewrite (and log a changelog entry) when content changed.
    if new_contents != original:
        watch_path.write_text(new_contents, encoding="utf-8")
        run_command(
            ["dch", "-a", "Update debian/watch to format version 5."],
            repo_dir,
            "Add changelog entry for debian/watch update",
        )
def add_salsa_ci(repo_dir: Path) -> None:
    """Add debian/salsa-ci.yml if missing.

    The salsa project path is read from the Vcs-Git field in debian/control;
    without it the file is not created.

    Args:
        repo_dir: Repository directory

    Raises:
        ExternalCommandError: If git or dch commands fail
    """
    salsa_ci_path = repo_dir / "debian" / "salsa-ci.yml"
    if salsa_ci_path.exists():
        return
    control_path = repo_dir / "debian" / "control"
    if not control_path.exists():
        return
    # Extract salsa repo from Vcs-Git
    vcs_git_re = re.compile(r"^Vcs-Git: https://salsa\.debian\.org/(.*).git$")
    salsa_repo = None
    with control_path.open() as f:
        for line in f:
            if not line.startswith("Vcs-Git"):
                continue
            m = vcs_git_re.match(line.rstrip())
            if m:
                salsa_repo = m.group(1)
                break
    if not salsa_repo:
        return
    # Standard salsa-ci pipeline include.
    content = """---
include:
  - https://salsa.debian.org/salsa-ci-team/pipeline/raw/master/recipes/debian.yml
"""
    salsa_ci_path.write_text(content)
    run_command(
        ["git", "add", str(salsa_ci_path)],
        repo_dir,
        "Add debian/salsa-ci.yml to git",
    )
    run_command(
        ["dch", "-a", "Add debian/salsa-ci.yml."],
        repo_dir,
        "Add changelog entry for salsa-ci.yml",
    )
    # Update salsa project settings for homeassistant-team
    if salsa_repo.startswith("homeassistant-team"):
        run_command(
            [
                "salsa",
                "update_projects",
                salsa_repo,
                "--jobs",
                "yes",
                "--ci-config-path",
                "debian/salsa-ci.yml",
            ],
            repo_dir,
            "Update salsa project CI settings",
            check=False,  # Don't fail if this doesn't work
        )
def run_package_updates(repo_dir: Path) -> None:
    """Apply all routine packaging updates to repo_dir.

    Runs, in order: debian/control modernisation, copyright-year refresh,
    debian/watch upgrade to version 5, and salsa-ci.yml addition.

    Args:
        repo_dir: Repository directory

    Raises:
        ExternalCommandError: If any update command fails
    """
    print("Updating package files...")
    for update in (
        update_debian_control,
        update_debian_copyright_year,
        update_debian_watch,
        add_salsa_ci,
    ):
        update(repo_dir)
def update_package(package: str) -> None:
    """Update a Debian package to a new upstream version.

    Checks out the package if not already present, imports the new upstream
    version using gbp, and creates a new changelog entry.

    Args:
        package: Source package name to update

    Raises:
        PackageNotFoundError: If package not found in source info
        RepositoryStateError: If repository has uncommitted changes
        ExternalCommandError: If any external command fails
        MissingToolError: If required tools are not available
    """
    # Validate package exists and check required tools up front, before
    # touching the filesystem or running anything.
    source_info = validate_package_info(package)
    check_required_tools()
    # Resolve directory paths
    team_dir, pkg_dir, repo_dir = resolve_package_directories(package, source_info)
    # Checkout package if needed, or validate existing checkout
    if not repo_dir.exists():
        ensure_package_checkout(package, source_info, pkg_dir)
    else:
        validate_repository_state(repo_dir)
        print(f"Package {package} already checked out at {pkg_dir}")
    # Run gbp pq workflow
    run_gbp_pq_workflow(repo_dir)
    # Import new upstream version
    import_upstream_version(repo_dir)
    # Add changelog entry; the version comes from gbp's import commit message.
    print("Adding changelog entry...")
    version = extract_upstream_version_from_git_log(repo_dir)
    if not version:
        raise ExternalCommandError(
            "git log",
            0,
            "Could not find upstream version in git log.",
        )
    new_version = f"{version}-1"
    run_command(
        [
            "dch",
            "--release-heuristic",
            "log",
            "--newversion",
            new_version,
            "New upstream release.",
        ],
        repo_dir,
        "Add changelog entry",
    )
    # Run package updates (control, copyright, watch, salsa-ci)
    run_package_updates(repo_dir)
    print(f"Successfully updated {package}")
    print(repo_dir)
@click.group(invoke_without_command=True)
@click.pass_context
def cli(context: click.Context) -> None:
    """Track Debian packages with new upstream versions."""
    # With no subcommand, default to listing todos (prereleases hidden).
    if context.invoked_subcommand is None:
        list_todos(include_prerelease=False)
@cli.command("list", help="List filtered new upstream todo entries.")
@click.option(
    "--show-prerelease",
    is_flag=True,
    help="Include prerelease versions in the list output.",
)
def list_command(show_prerelease: bool) -> None:
    """CLI wrapper around list_todos with a prerelease toggle."""
    list_todos(include_prerelease=show_prerelease)
@cli.command("update", help="Fetch the latest todo list and show changes.")
def update_command() -> None:
    """CLI wrapper around update_todos."""
    update_todos()
@cli.command("update-pkg", help="Update a package to a new upstream version.")
@click.argument("package")
def update_pkg_command(package: str) -> None:
    """CLI wrapper: report domain errors on stderr and exit with status 1."""
    try:
        update_package(package)
    except PackageUpdateError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def main() -> None:
    """Entry point for the CLI (console-script hook)."""
    cli()