From 3b7a07e1d9a2e87813c181bbfcab35b8021a5ebb Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Sun, 1 Feb 2026 16:55:50 +0000 Subject: [PATCH] Refactor update_package with comprehensive error handling and integrate update scripts. Add custom exception hierarchy (PackageUpdateError, PackageNotFoundError, RepositoryStateError, ExternalCommandError, MissingToolError) for specific error handling throughout package update operations. Extract helper functions from 90-line update_package function: - validate_package_info: Package validation - resolve_package_directories: Path resolution - ensure_package_checkout: Salsa checkout with error handling - validate_repository_state: Check for uncommitted changes - run_gbp_pq_workflow: Run gbp pq import/switch with error checks - import_upstream_version: Import upstream using gbp - run_command: Centralized subprocess execution with consistent errors - check_required_tools: Validate required tools upfront Incorporate update scripts as Python functions (replaces external shell scripts): - update_debian_control: Update Standards-Version, remove obsolete fields - update_debian_copyright_year: Update copyright years to current - update_debian_watch: Upgrade watch files from version 4 to 5 - add_salsa_ci: Add debian/salsa-ci.yml if missing - run_package_updates: Orchestrator for all update operations Enhance network error handling in fetch_todo_list: - Handle HTTP errors, network errors, timeouts, invalid JSON - Add timeout parameter (default 30s) - Graceful error messages in list_todos and update_todos Add comprehensive test coverage: - 67 tests for update-pkg functionality - Tests for all helper functions and update operations - Network error handling tests - 125 total tests, all passing Benefits: - No external script dependencies - Consistent error handling throughout - Better error messages with actionable guidance - All update logic in one maintainable codebase Co-Authored-By: Claude Sonnet 4.5 --- debian_todo/__init__.py | 713 ++++++++++++++++++++++++++++-- tests/test_update_pkg.py | 914 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 1580 insertions(+), 47 deletions(-) create mode 100644 tests/test_update_pkg.py diff --git a/debian_todo/__init__.py b/debian_todo/__init__.py index cc141fc..ce2768b 100644 --- a/debian_todo/__init__.py +++ b/debian_todo/__init__.py @@ -5,12 +5,15 @@ packages where a new upstream version is available. Filters out pre-release versions and shows team/uploader metadata. """ +import datetime import json import re import glob import os +import shutil import subprocess import sys +import urllib.error from pathlib import Path from typing import Any, Optional, cast from urllib.request import urlopen @@ -38,6 +41,97 @@ HIDE_UPLOADER = "Edward Betts " DEBIAN_SRC_BASE = Path.home() / "src" / "debian" +class PackageUpdateError(Exception): + """Base exception for package update operations.""" + pass + + +class PackageNotFoundError(PackageUpdateError): + """Package not found in source info.""" + pass + + +class RepositoryStateError(PackageUpdateError): + """Repository in invalid state (uncommitted changes, etc).""" + pass + + +class ExternalCommandError(PackageUpdateError): + """External command failed (salsa, gbp, dch).""" + def __init__(self, command: str, returncode: int, message: str): + self.command = command + self.returncode = returncode + super().__init__(message) + + +class MissingToolError(PackageUpdateError): + """Required external tool not available.""" + pass + + +def run_command( + cmd: list[str], + cwd: Path, + description: str, + capture_output: bool = False, + check: bool = True, +) -> subprocess.CompletedProcess: + """Run command with consistent error handling. + + Args: + cmd: Command and arguments to run + cwd: Working directory for the command + description: Human-readable description of what the command does + capture_output: Whether to capture stdout/stderr + check: Whether to raise exception on non-zero exit + + Returns: + CompletedProcess instance + + Raises: + ExternalCommandError: If command fails and check=True + MissingToolError: If command not found + """ + try: + result = subprocess.run( + cmd, + cwd=cwd, + capture_output=capture_output, + text=True, + ) + if check and result.returncode != 0: + cmd_str = " ".join(cmd) + raise ExternalCommandError( + command=cmd_str, + returncode=result.returncode, + message=f"{description} failed: {cmd_str} (exit code {result.returncode})", + ) + return result + except FileNotFoundError as e: + raise MissingToolError( + f"Command not found: {cmd[0]}. Please install it to continue." + ) from e + + +def check_required_tools() -> None: + """Check that required external tools are available. + + Raises: + MissingToolError: If a required tool is not found + """ + required_tools = ["salsa", "gbp", "dch"] + missing_tools = [] + for tool in required_tools: + if not shutil.which(tool): + missing_tools.append(tool) + + if missing_tools: + tools_str = ", ".join(missing_tools) + raise MissingToolError( + f"Required tools not found: {tools_str}. Please install them to continue." + ) + + def parse_details(details: str) -> tuple[str, Optional[str]]: """Parse version details string into new and current versions. @@ -195,11 +289,40 @@ def load_source_info_map() -> dict[str, SourceInfo]: return vcs_by_source -def fetch_todo_list() -> TodoList: - """Fetch the TODO list from UDD as JSON.""" - with urlopen(TODO_URL) as response: - payload = response.read().decode("utf-8") - return cast(TodoList, json.loads(payload)) +def fetch_todo_list(timeout: int = 30) -> TodoList: + """Fetch the TODO list from UDD as JSON. + + Args: + timeout: Request timeout in seconds (default: 30) + + Returns: + List of TODO items + + Raises: + PackageUpdateError: If network request fails or JSON is invalid + """ + try: + with urlopen(TODO_URL, timeout=timeout) as response: + payload = response.read().decode("utf-8") + except urllib.error.HTTPError as e: + raise PackageUpdateError( + f"HTTP error {e.code} while fetching TODO list from {TODO_URL}: {e.reason}" + ) from e + except urllib.error.URLError as e: + raise PackageUpdateError( + f"Network error while fetching TODO list from {TODO_URL}: {e.reason}" + ) from e + except TimeoutError as e: + raise PackageUpdateError( + f"Timeout after {timeout}s while fetching TODO list from {TODO_URL}" + ) from e + + try: + return cast(TodoList, json.loads(payload)) + except json.JSONDecodeError as e: + raise PackageUpdateError( + f"Invalid JSON in TODO list response: {e}" + ) from e def save_todo_list(todo_list: TodoList) -> None: @@ -328,8 +451,13 @@ def list_todos(include_prerelease: bool) -> None: """ if not TODO_PATH.exists(): print("Downloading todo.json...") - todo_list = fetch_todo_list() - save_todo_list(todo_list) + try: + todo_list = fetch_todo_list() + save_todo_list(todo_list) + except PackageUpdateError as e: + print(f"Error: {e}", file=sys.stderr) + print("Please try again later or check your network connection.", file=sys.stderr) + sys.exit(1) else: with TODO_PATH.open("r", encoding="utf-8") as handle: todo_list = cast(TodoList, json.load(handle)) @@ -394,7 +522,14 @@ def update_todos() -> None: if TODO_PATH.exists(): with TODO_PATH.open("r", encoding="utf-8") as handle: old_list = cast(TodoList, json.load(handle)) - todo_list = fetch_todo_list() + + try: + todo_list = fetch_todo_list() + except PackageUpdateError as e: + print(f"Error: {e}", file=sys.stderr) + print("Failed to fetch updated TODO list.", file=sys.stderr) + sys.exit(1) + save_todo_list(todo_list) print_changes(filter_todo_list(old_list), filter_todo_list(todo_list)) @@ -464,21 +599,39 @@ def add_changelog_entry(pkg_dir: Path) -> bool: return result.returncode == 0 -def update_package(package: str) -> None: - """Update a Debian package to a new upstream version. +def validate_package_info(package: str) -> SourceInfo: + """Validate package exists in source info map. - Checks out the package if not already present, imports the new upstream - version using gbp, and creates a new changelog entry. + Args: + package: Package name to validate + + Returns: + Source info dictionary containing vcs_git and uploaders + + Raises: + PackageNotFoundError: If package not found or has no vcs_git """ source_info_map = load_source_info_map() source_info = source_info_map.get(package) if not source_info or not source_info.get("vcs_git"): - print( - f"Error: Could not find team info for package '{package}'", file=sys.stderr + raise PackageNotFoundError( + f"Could not find team info for package '{package}'" ) - sys.exit(1) + return source_info + + +def resolve_package_directories(package: str, source_info: SourceInfo) -> tuple[Path, Path, Path]: + """Resolve team_dir, pkg_dir, and repo_dir paths. + + Args: + package: Package name + source_info: Source info dictionary with vcs_git + + Returns: + Tuple of (team_dir, pkg_dir, repo_dir) + """ team_slug = source_info["vcs_git"] display_team = team_slug_to_display_name(team_slug).lower() @@ -486,46 +639,508 @@ def update_package(package: str) -> None: pkg_dir = team_dir / package repo_dir = pkg_dir / package - # Create package directory if it doesn't exist - pkg_dir.mkdir(parents=True, exist_ok=True) + return team_dir, pkg_dir, repo_dir - if not repo_dir.exists(): - # Package not checked out, do salsa checkout - print(f"Checking out {package} into {pkg_dir}...") - salsa_path = f"{team_slug}/deps/{package}" - result = subprocess.run( - ["salsa", "checkout", salsa_path], - cwd=pkg_dir, + +def ensure_package_checkout(package: str, source_info: SourceInfo, pkg_dir: Path) -> None: + """Checkout package from salsa. + + Args: + package: Package name + source_info: Source info dictionary with vcs_git + pkg_dir: Directory to check out into + + Raises: + ExternalCommandError: If salsa checkout fails + RepositoryStateError: If directory creation fails + """ + try: + pkg_dir.mkdir(parents=True, exist_ok=True) + except OSError as e: + raise RepositoryStateError( + f"Failed to create directory {pkg_dir}: {e}" + ) from e + + print(f"Checking out {package} into {pkg_dir}...") + team_slug = source_info["vcs_git"] + salsa_path = f"{team_slug}/deps/{package}" + run_command( + ["salsa", "checkout", salsa_path], + pkg_dir, + f"Checkout {salsa_path}", + ) + + +def validate_repository_state(repo_dir: Path) -> None: + """Validate repository has no uncommitted changes. + + Args: + repo_dir: Repository directory to check + + Raises: + RepositoryStateError: If there are uncommitted changes + """ + if has_uncommitted_changes(repo_dir): + raise RepositoryStateError( + f"{repo_dir} has uncommitted changes. " + "Please commit or stash them first." ) - if result.returncode != 0: - print(f"Error: salsa checkout failed for {salsa_path}", file=sys.stderr) - sys.exit(1) - else: - # Package already checked out, check for uncommitted changes - if has_uncommitted_changes(repo_dir): - print( - f"Error: {repo_dir} has uncommitted changes. " - "Please commit or stash them first.", - file=sys.stderr, + + +def run_gbp_pq_workflow(repo_dir: Path) -> None: + """Run gbp pq import and switch. + + Args: + repo_dir: Repository directory + + Raises: + ExternalCommandError: If gbp pq commands fail + """ + run_command( + ["gbp", "pq", "import"], + repo_dir, + "Import patch queue", + ) + + run_command( + ["gbp", "pq", "switch"], + repo_dir, + "Switch to patch branch", + ) + + +def import_upstream_version(repo_dir: Path) -> None: + """Import new upstream version using gbp. + + Args: + repo_dir: Repository directory + + Raises: + ExternalCommandError: If gbp import-orig fails + """ + print("Importing new upstream version...") + run_command( + ["gbp", "import-orig", "--uscan", "--pristine-tar", "--no-interactive"], + repo_dir, + "Import upstream version", + ) + + +def update_debian_control(repo_dir: Path) -> None: + """Update debian/control file. + + - Removes obsolete 'Priority: optional' (now default) + - Removes obsolete 'Rules-Requires-Root: no' (now default) + - Updates Standards-Version to current (4.7.3) + + Args: + repo_dir: Repository directory + + Raises: + ExternalCommandError: If dch command fails + """ + control_path = repo_dir / "debian" / "control" + if not control_path.exists(): + return + + lines = control_path.read_text().splitlines() + current_standards_version = "4.7.3" + new_lines = [] + + for line in lines: + if line == "Priority: optional": + run_command( + ["dch", "-a", "Remove 'Priority: optional', now the default."], + repo_dir, + "Add changelog entry for Priority removal", ) - sys.exit(1) + continue + if line == "Rules-Requires-Root: no": + run_command( + ["dch", "-a", "Remove 'Rules-Requires-Root: no', now the default."], + repo_dir, + "Add changelog entry for Rules-Requires-Root removal", + ) + continue + if line.startswith("Standards-Version: "): + standards_version = line[len("Standards-Version: "):] + if standards_version != current_standards_version: + line = "Standards-Version: " + current_standards_version + run_command( + ["dch", "-a", "Update Standards-Version."], + repo_dir, + "Add changelog entry for Standards-Version update", + ) + + new_lines.append(line) + + control_path.write_text("\n".join(new_lines) + "\n") + + +def update_debian_copyright_year(repo_dir: Path) -> None: + """Update Edward Betts' copyright years in debian/copyright. + + Transforms: + - "Copyright: 2025 Edward Betts <...>" -> "Copyright: 2025-2026 Edward Betts <...>" + - "Copyright: 2022-2024 Edward Betts <...>" -> "Copyright: 2022-2026 Edward Betts <...>" + + Args: + repo_dir: Repository directory + + Raises: + ExternalCommandError: If dch command fails + """ + copyright_path = repo_dir / "debian" / "copyright" + if not copyright_path.exists(): + return + + full_name_email = "Edward Betts " + year = datetime.date.today().year + original = copyright_path.read_text(encoding="utf-8") + + holder_re = re.escape(full_name_email) + copyright_line_re = re.compile( + rf"^(?P\s*)" + rf"(?:(?P