agenda/agenda/trip_schengen.py
Edward Betts 8363a581c7 Add trip linking and names to Schengen compliance report
- Add trip_date and trip_name fields to SchengenStay dataclass
- Implement extract_schengen_stays_with_trip_info() to associate stays with trips
- Update schengen_report.html to show trip names with clickable links
- Add Trip column to stays table and trip name column to compliance history
- Links navigate to individual trip pages using existing URL structure

Closes #197

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-15 15:06:09 +02:00

286 lines
9.3 KiB
Python

"""Integration of Schengen calculator with the existing trip system."""
import logging
import typing
from datetime import date, timedelta
import flask
from . import trip
from .schengen import calculate_schengen_time, extract_schengen_stays_from_travel
from .types import SchengenCalculation, SchengenStay, StrDict, Trip
def add_schengen_compliance_to_trip(trip_obj: Trip) -> Trip:
"""Add Schengen compliance information to a trip object."""
try:
# Calculate Schengen compliance for the trip
calculation = calculate_schengen_time(trip_obj.travel)
# Add the calculation to the trip object
trip_obj.schengen_compliance = calculation
except Exception as e:
# Log the error but don't fail the trip loading
logging.warning(
f"Failed to calculate Schengen compliance for trip {trip_obj.start}: {e}"
)
trip_obj.schengen_compliance = None
return trip_obj
def get_schengen_compliance_for_all_trips(
trip_list: list[Trip],
) -> dict[date, SchengenCalculation]:
"""Calculate Schengen compliance for all trips."""
compliance_by_date = {}
# Collect all travel items across all trips
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
# Calculate compliance for each trip's start date
for trip_obj in trip_list:
calculation = calculate_schengen_time(all_travel_items, trip_obj.start)
compliance_by_date[trip_obj.start] = calculation
return compliance_by_date
def generate_schengen_warnings(trip_list: list[Trip]) -> list[dict[str, typing.Any]]:
"""Generate warnings for potential Schengen violations."""
warnings = []
# Get compliance for all trips
compliance_by_date = get_schengen_compliance_for_all_trips(trip_list)
for trip_date, calculation in compliance_by_date.items():
if not calculation.is_compliant:
warnings.append(
{
"type": "schengen_violation",
"date": trip_date,
"message": f"Schengen violation on {trip_date}: {calculation.days_over_limit} days over limit",
"severity": "error",
"calculation": calculation,
}
)
elif calculation.days_remaining < 10:
warnings.append(
{
"type": "schengen_warning",
"date": trip_date,
"message": f"Low Schengen allowance on {trip_date}: only {calculation.days_remaining} days remaining",
"severity": "warning",
"calculation": calculation,
}
)
return warnings
def extract_schengen_stays_with_trip_info(
trip_list: list[Trip],
) -> list[SchengenStay]:
"""Extract Schengen stays from travel items with trip information."""
# Create a mapping of travel items to their trips
travel_to_trip_map = {}
for trip_obj in trip_list:
for travel_item in trip_obj.travel:
travel_to_trip_map[id(travel_item)] = trip_obj
# Collect all travel items
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
# Get stays with trip information
stays = extract_schengen_stays_from_travel(all_travel_items)
# Try to associate stays with trips based on entry dates
for stay in stays:
# Find the trip that contains this stay's entry date
for trip_obj in trip_list:
if trip_obj.start <= stay.entry_date <= (trip_obj.end or stay.entry_date):
stay.trip_date = trip_obj.start
stay.trip_name = trip_obj.title
break
# If no exact match, find the closest trip by start date
if stay.trip_date is None:
closest_trip = min(
trip_list,
key=lambda t: abs((t.start - stay.entry_date).days),
default=None,
)
if closest_trip:
stay.trip_date = closest_trip.start
stay.trip_name = closest_trip.title
return stays
def schengen_dashboard_data(data_dir: str | None = None) -> dict[str, typing.Any]:
"""Generate dashboard data for Schengen compliance."""
if data_dir is None:
data_dir = flask.current_app.config["PERSONAL_DATA"]
# Load all trips
trip_list = trip.build_trip_list(data_dir)
# Calculate current compliance with trip information
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
current_calculation = calculate_schengen_time(all_travel_items)
# Get stays with trip information
stays_with_trip_info = extract_schengen_stays_with_trip_info(trip_list)
# Update current calculation with trip information
current_calculation.stays_in_period = [
stay
for stay in stays_with_trip_info
if stay.entry_date >= current_calculation.current_180_day_period[0]
and stay.entry_date <= current_calculation.current_180_day_period[1]
]
# Generate warnings
warnings = generate_schengen_warnings(trip_list)
# Get compliance history for the last year
compliance_history = []
current_date = date.today()
for i in range(365, 0, -7): # Weekly snapshots for the last year
snapshot_date = current_date - timedelta(days=i)
calculation = calculate_schengen_time(all_travel_items, snapshot_date)
compliance_history.append(
{
"date": snapshot_date,
"days_used": calculation.total_days_used,
"is_compliant": calculation.is_compliant,
}
)
return {
"current_compliance": current_calculation,
"warnings": warnings,
"compliance_history": compliance_history,
"trips_with_compliance": get_schengen_compliance_for_all_trips(trip_list),
}
def flask_route_schengen_report() -> str:
"""Flask route for Schengen compliance report."""
data_dir = flask.current_app.config["PERSONAL_DATA"]
dashboard_data = schengen_dashboard_data(data_dir)
# Load trips for the template
trip_list = trip.build_trip_list(data_dir)
return flask.render_template(
"schengen_report.html", trip_list=trip_list, **dashboard_data
)
def export_schengen_data_for_external_calculator(
travel_items: list[StrDict],
) -> list[dict[str, typing.Any]]:
"""Export travel data in format suitable for external Schengen calculators."""
stays = extract_schengen_stays_from_travel(travel_items)
export_data = []
for stay in stays:
export_data.append(
{
"entry_date": stay.entry_date.strftime("%Y-%m-%d"),
"exit_date": (
stay.exit_date.strftime("%Y-%m-%d") if stay.exit_date else None
),
"country": stay.country.upper(),
"days": stay.days,
}
)
return export_data
def add_schengen_info_to_events(events: list, data_dir: str | None = None) -> list:
"""Add Schengen compliance info to event list."""
if data_dir is None:
data_dir = flask.current_app.config["PERSONAL_DATA"]
# Load all trips to get travel context
trip_list = trip.build_trip_list(data_dir)
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
# Add Schengen status to each event
for event in events:
if hasattr(event, "date"):
calculation = calculate_schengen_time(all_travel_items, event.date)
event.schengen_days_used = calculation.total_days_used
event.schengen_compliant = calculation.is_compliant
event.schengen_days_remaining = calculation.days_remaining
return events
# Integration with existing trip.py functions
def enhanced_build_trip_list(
data_dir: str | None = None,
route_distances: typing.Any = None,
include_schengen: bool = True,
) -> list[Trip]:
"""Enhanced version of build_trip_list that includes Schengen compliance."""
# Use the original function
trip_list = trip.build_trip_list(data_dir, route_distances)
if include_schengen:
# Add Schengen compliance to each trip
for trip_obj in trip_list:
add_schengen_compliance_to_trip(trip_obj)
return trip_list
def check_schengen_compliance_for_new_trip(
existing_trips: list[Trip],
new_trip_dates: tuple[date, date],
destination_country: str,
) -> SchengenCalculation:
"""Check if a new trip would violate Schengen rules."""
# Collect all existing travel
all_travel_items = []
for trip_obj in existing_trips:
all_travel_items.extend(trip_obj.travel)
# Add the new trip as mock travel items
entry_date, exit_date = new_trip_dates
# Mock entry to Schengen
all_travel_items.append(
{
"type": "flight",
"depart": entry_date,
"to_airport": {"country": destination_country},
}
)
# Mock exit from Schengen
all_travel_items.append(
{
"type": "flight",
"depart": exit_date,
"from_airport": {"country": destination_country},
"to_airport": {"country": "gb"}, # Assuming return to UK
}
)
# Calculate compliance at the exit date
return calculate_schengen_time(all_travel_items, exit_date)