diff --git a/debian_todo/__init__.py b/debian_todo/__init__.py index cc141fc..ce2768b 100644 --- a/debian_todo/__init__.py +++ b/debian_todo/__init__.py @@ -5,12 +5,15 @@ packages where a new upstream version is available. Filters out pre-release versions and shows team/uploader metadata. """ +import datetime import json import re import glob import os +import shutil import subprocess import sys +import urllib.error from pathlib import Path from typing import Any, Optional, cast from urllib.request import urlopen @@ -38,6 +41,97 @@ HIDE_UPLOADER = "Edward Betts " DEBIAN_SRC_BASE = Path.home() / "src" / "debian" +class PackageUpdateError(Exception): + """Base exception for package update operations.""" + pass + + +class PackageNotFoundError(PackageUpdateError): + """Package not found in source info.""" + pass + + +class RepositoryStateError(PackageUpdateError): + """Repository in invalid state (uncommitted changes, etc).""" + pass + + +class ExternalCommandError(PackageUpdateError): + """External command failed (salsa, gbp, dch).""" + def __init__(self, command: str, returncode: int, message: str): + self.command = command + self.returncode = returncode + super().__init__(message) + + +class MissingToolError(PackageUpdateError): + """Required external tool not available.""" + pass + + +def run_command( + cmd: list[str], + cwd: Path, + description: str, + capture_output: bool = False, + check: bool = True, +) -> subprocess.CompletedProcess: + """Run command with consistent error handling. + + Args: + cmd: Command and arguments to run + cwd: Working directory for the command + description: Human-readable description of what the command does + capture_output: Whether to capture stdout/stderr + check: Whether to raise exception on non-zero exit + + Returns: + CompletedProcess instance + + Raises: + ExternalCommandError: If command fails and check=True + MissingToolError: If command not found + """ + try: + result = subprocess.run( + cmd, + cwd=cwd, + capture_output=capture_output, + text=True, + ) + if check and result.returncode != 0: + cmd_str = " ".join(cmd) + raise ExternalCommandError( + command=cmd_str, + returncode=result.returncode, + message=f"{description} failed: {cmd_str} (exit code {result.returncode})", + ) + return result + except FileNotFoundError as e: + raise MissingToolError( + f"Command not found: {cmd[0]}. Please install it to continue." + ) from e + + +def check_required_tools() -> None: + """Check that required external tools are available. + + Raises: + MissingToolError: If a required tool is not found + """ + required_tools = ["salsa", "gbp", "dch"] + missing_tools = [] + for tool in required_tools: + if not shutil.which(tool): + missing_tools.append(tool) + + if missing_tools: + tools_str = ", ".join(missing_tools) + raise MissingToolError( + f"Required tools not found: {tools_str}. Please install them to continue." + ) + + def parse_details(details: str) -> tuple[str, Optional[str]]: """Parse version details string into new and current versions. @@ -195,11 +289,40 @@ def load_source_info_map() -> dict[str, SourceInfo]: return vcs_by_source -def fetch_todo_list() -> TodoList: - """Fetch the TODO list from UDD as JSON.""" - with urlopen(TODO_URL) as response: - payload = response.read().decode("utf-8") - return cast(TodoList, json.loads(payload)) +def fetch_todo_list(timeout: int = 30) -> TodoList: + """Fetch the TODO list from UDD as JSON. + + Args: + timeout: Request timeout in seconds (default: 30) + + Returns: + List of TODO items + + Raises: + PackageUpdateError: If network request fails or JSON is invalid + """ + try: + with urlopen(TODO_URL, timeout=timeout) as response: + payload = response.read().decode("utf-8") + except urllib.error.HTTPError as e: + raise PackageUpdateError( + f"HTTP error {e.code} while fetching TODO list from {TODO_URL}: {e.reason}" + ) from e + except urllib.error.URLError as e: + raise PackageUpdateError( + f"Network error while fetching TODO list from {TODO_URL}: {e.reason}" + ) from e + except TimeoutError as e: + raise PackageUpdateError( + f"Timeout after {timeout}s while fetching TODO list from {TODO_URL}" + ) from e + + try: + return cast(TodoList, json.loads(payload)) + except json.JSONDecodeError as e: + raise PackageUpdateError( + f"Invalid JSON in TODO list response: {e}" + ) from e def save_todo_list(todo_list: TodoList) -> None: @@ -328,8 +451,13 @@ def list_todos(include_prerelease: bool) -> None: """ if not TODO_PATH.exists(): print("Downloading todo.json...") - todo_list = fetch_todo_list() - save_todo_list(todo_list) + try: + todo_list = fetch_todo_list() + save_todo_list(todo_list) + except PackageUpdateError as e: + print(f"Error: {e}", file=sys.stderr) + print("Please try again later or check your network connection.", file=sys.stderr) + sys.exit(1) else: with TODO_PATH.open("r", encoding="utf-8") as handle: todo_list = cast(TodoList, json.load(handle)) @@ -394,7 +522,14 @@ def update_todos() -> None: if TODO_PATH.exists(): with TODO_PATH.open("r", encoding="utf-8") as handle: old_list = cast(TodoList, json.load(handle)) - todo_list = fetch_todo_list() + + try: + todo_list = fetch_todo_list() + except PackageUpdateError as e: + print(f"Error: {e}", file=sys.stderr) + print("Failed to fetch updated TODO list.", file=sys.stderr) + sys.exit(1) + save_todo_list(todo_list) print_changes(filter_todo_list(old_list), filter_todo_list(todo_list)) @@ -464,21 +599,39 @@ def add_changelog_entry(pkg_dir: Path) -> bool: return result.returncode == 0 -def update_package(package: str) -> None: - """Update a Debian package to a new upstream version. +def validate_package_info(package: str) -> SourceInfo: + """Validate package exists in source info map. - Checks out the package if not already present, imports the new upstream - version using gbp, and creates a new changelog entry. + Args: + package: Package name to validate + + Returns: + Source info dictionary containing vcs_git and uploaders + + Raises: + PackageNotFoundError: If package not found or has no vcs_git """ source_info_map = load_source_info_map() source_info = source_info_map.get(package) if not source_info or not source_info.get("vcs_git"): - print( - f"Error: Could not find team info for package '{package}'", file=sys.stderr + raise PackageNotFoundError( + f"Could not find team info for package '{package}'" ) - sys.exit(1) + return source_info + + +def resolve_package_directories(package: str, source_info: SourceInfo) -> tuple[Path, Path, Path]: + """Resolve team_dir, pkg_dir, and repo_dir paths. + + Args: + package: Package name + source_info: Source info dictionary with vcs_git + + Returns: + Tuple of (team_dir, pkg_dir, repo_dir) + """ team_slug = source_info["vcs_git"] display_team = team_slug_to_display_name(team_slug).lower() @@ -486,46 +639,508 @@ def update_package(package: str) -> None: pkg_dir = team_dir / package repo_dir = pkg_dir / package - # Create package directory if it doesn't exist - pkg_dir.mkdir(parents=True, exist_ok=True) + return team_dir, pkg_dir, repo_dir - if not repo_dir.exists(): - # Package not checked out, do salsa checkout - print(f"Checking out {package} into {pkg_dir}...") - salsa_path = f"{team_slug}/deps/{package}" - result = subprocess.run( - ["salsa", "checkout", salsa_path], - cwd=pkg_dir, + +def ensure_package_checkout(package: str, source_info: SourceInfo, pkg_dir: Path) -> None: + """Checkout package from salsa. + + Args: + package: Package name + source_info: Source info dictionary with vcs_git + pkg_dir: Directory to check out into + + Raises: + ExternalCommandError: If salsa checkout fails + RepositoryStateError: If directory creation fails + """ + try: + pkg_dir.mkdir(parents=True, exist_ok=True) + except OSError as e: + raise RepositoryStateError( + f"Failed to create directory {pkg_dir}: {e}" + ) from e + + print(f"Checking out {package} into {pkg_dir}...") + team_slug = source_info["vcs_git"] + salsa_path = f"{team_slug}/deps/{package}" + run_command( + ["salsa", "checkout", salsa_path], + pkg_dir, + f"Checkout {salsa_path}", + ) + + +def validate_repository_state(repo_dir: Path) -> None: + """Validate repository has no uncommitted changes. + + Args: + repo_dir: Repository directory to check + + Raises: + RepositoryStateError: If there are uncommitted changes + """ + if has_uncommitted_changes(repo_dir): + raise RepositoryStateError( + f"{repo_dir} has uncommitted changes. " + "Please commit or stash them first." ) - if result.returncode != 0: - print(f"Error: salsa checkout failed for {salsa_path}", file=sys.stderr) - sys.exit(1) - else: - # Package already checked out, check for uncommitted changes - if has_uncommitted_changes(repo_dir): - print( - f"Error: {repo_dir} has uncommitted changes. " - "Please commit or stash them first.", - file=sys.stderr, + + +def run_gbp_pq_workflow(repo_dir: Path) -> None: + """Run gbp pq import and switch. + + Args: + repo_dir: Repository directory + + Raises: + ExternalCommandError: If gbp pq commands fail + """ + run_command( + ["gbp", "pq", "import"], + repo_dir, + "Import patch queue", + ) + + run_command( + ["gbp", "pq", "switch"], + repo_dir, + "Switch to patch branch", + ) + + +def import_upstream_version(repo_dir: Path) -> None: + """Import new upstream version using gbp. + + Args: + repo_dir: Repository directory + + Raises: + ExternalCommandError: If gbp import-orig fails + """ + print("Importing new upstream version...") + run_command( + ["gbp", "import-orig", "--uscan", "--pristine-tar", "--no-interactive"], + repo_dir, + "Import upstream version", + ) + + +def update_debian_control(repo_dir: Path) -> None: + """Update debian/control file. + + - Removes obsolete 'Priority: optional' (now default) + - Removes obsolete 'Rules-Requires-Root: no' (now default) + - Updates Standards-Version to current (4.7.3) + + Args: + repo_dir: Repository directory + + Raises: + ExternalCommandError: If dch command fails + """ + control_path = repo_dir / "debian" / "control" + if not control_path.exists(): + return + + lines = control_path.read_text().splitlines() + current_standards_version = "4.7.3" + new_lines = [] + + for line in lines: + if line == "Priority: optional": + run_command( + ["dch", "-a", "Remove 'Priority: optional', now the default."], + repo_dir, + "Add changelog entry for Priority removal", ) - sys.exit(1) + continue + if line == "Rules-Requires-Root: no": + run_command( + ["dch", "-a", "Remove 'Rules-Requires-Root: no', now the default."], + repo_dir, + "Add changelog entry for Rules-Requires-Root removal", + ) + continue + if line.startswith("Standards-Version: "): + standards_version = line[len("Standards-Version: "):] + if standards_version != current_standards_version: + line = "Standards-Version: " + current_standards_version + run_command( + ["dch", "-a", "Update Standards-Version."], + repo_dir, + "Add changelog entry for Standards-Version update", + ) + + new_lines.append(line) + + control_path.write_text("\n".join(new_lines) + "\n") + + +def update_debian_copyright_year(repo_dir: Path) -> None: + """Update Edward Betts' copyright years in debian/copyright. + + Transforms: + - "Copyright: 2025 Edward Betts <...>" -> "Copyright: 2025-2026 Edward Betts <...>" + - "Copyright: 2022-2024 Edward Betts <...>" -> "Copyright: 2022-2026 Edward Betts <...>" + + Args: + repo_dir: Repository directory + + Raises: + ExternalCommandError: If dch command fails + """ + copyright_path = repo_dir / "debian" / "copyright" + if not copyright_path.exists(): + return + + full_name_email = "Edward Betts " + year = datetime.date.today().year + original = copyright_path.read_text(encoding="utf-8") + + holder_re = re.escape(full_name_email) + copyright_line_re = re.compile( + rf"^(?P\s*)" + rf"(?:(?P