agenda/agenda/trip_schengen.py
2025-11-03 19:31:14 +00:00

279 lines
8.8 KiB
Python

"""Integration of Schengen calculator with the existing trip system."""
import logging
import typing
from datetime import date, timedelta
import flask
from . import get_country, trip
from .schengen import (
SCHENGEN_COUNTRIES,
calculate_schengen_time,
extract_schengen_stays_from_travel,
)
from .types import SchengenCalculation, SchengenStay, StrDict, Trip
def trip_includes_schengen(trip: Trip) -> bool:
return bool({c.alpha_2.lower() for c in trip.countries} & SCHENGEN_COUNTRIES)
def add_schengen_compliance_to_trip(trip: Trip) -> Trip:
"""Add Schengen compliance information to a trip object."""
if not trip_includes_schengen(trip):
return trip
try:
# Calculate Schengen compliance for the trip
calculation = calculate_schengen_time(trip.travel)
# Add the calculation to the trip object
trip.schengen_compliance = calculation
except Exception as e:
# Log the error but don't fail the trip loading
logging.warning(
f"Failed to calculate Schengen compliance for trip {trip.start}: {e}"
)
trip.schengen_compliance = None
return trip
def get_schengen_compliance_for_all_trips(
trip_list: list[Trip],
) -> dict[date, SchengenCalculation]:
"""Calculate Schengen compliance for all trips."""
compliance_by_date = {}
# Collect all travel items across all trips
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
# Calculate compliance for each trip's start date
for trip_obj in trip_list:
calculation = calculate_schengen_time(all_travel_items, trip_obj.start)
compliance_by_date[trip_obj.start] = calculation
return compliance_by_date
def generate_schengen_warnings(trip_list: list[Trip]) -> list[dict[str, typing.Any]]:
"""Generate warnings for potential Schengen violations."""
warnings = []
# Get compliance for all trips
compliance_by_date = get_schengen_compliance_for_all_trips(trip_list)
for trip_date, calculation in compliance_by_date.items():
if not calculation.is_compliant:
warnings.append(
{
"type": "schengen_violation",
"date": trip_date,
"message": f"Schengen violation on {trip_date}: {calculation.days_over_limit} days over limit",
"severity": "error",
"calculation": calculation,
}
)
elif calculation.days_remaining < 10:
warnings.append(
{
"type": "schengen_warning",
"date": trip_date,
"message": f"Low Schengen allowance on {trip_date}: only {calculation.days_remaining} days remaining",
"severity": "warning",
"calculation": calculation,
}
)
return warnings
def extract_schengen_stays_with_trip_info(
trip_list: list[Trip],
) -> list[SchengenStay]:
"""Extract Schengen stays from travel items with trip information."""
# Create a mapping of travel items to their trips
travel_to_trip_map = {}
for trip_obj in trip_list:
for travel_item in trip_obj.travel:
travel_to_trip_map[id(travel_item)] = trip_obj
# Collect all travel items
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
# Get stays with trip information
stays = extract_schengen_stays_from_travel(all_travel_items)
# Try to associate stays with trips based on entry dates
for stay in stays:
# Find the trip that contains this stay's entry date
for trip_obj in trip_list:
if trip_obj.start <= stay.entry_date <= (trip_obj.end or stay.entry_date):
stay.trip_date = trip_obj.start
stay.trip_name = trip_obj.title
break
# If no exact match, find the closest trip by start date
if stay.trip_date is None:
closest_trip = min(
trip_list,
key=lambda t: abs((t.start - stay.entry_date).days),
default=None,
)
if closest_trip:
stay.trip_date = closest_trip.start
stay.trip_name = closest_trip.title
return stays
def schengen_dashboard_data(data_dir: str | None = None) -> dict[str, typing.Any]:
"""Generate dashboard data for Schengen compliance."""
if data_dir is None:
data_dir = flask.current_app.config["PERSONAL_DATA"]
# Load all trips
trip_list = [
trip for trip in trip.build_trip_list(data_dir) if trip_includes_schengen(trip)
]
# Calculate current compliance with trip information
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
current_calculation = calculate_schengen_time(all_travel_items)
# Get stays with trip information
stays_with_trip_info = extract_schengen_stays_with_trip_info(trip_list)
# Update current calculation with trip information
current_calculation.stays_in_period = [
stay
for stay in stays_with_trip_info
if stay.entry_date >= current_calculation.current_180_day_period[0]
and stay.entry_date <= current_calculation.current_180_day_period[1]
]
# Generate warnings
warnings = generate_schengen_warnings(trip_list)
# Get compliance history for the last year
compliance_history = []
current_date = date.today()
for i in range(365, 0, -7): # Weekly snapshots for the last year
snapshot_date = current_date - timedelta(days=i)
calculation = calculate_schengen_time(all_travel_items, snapshot_date)
compliance_history.append(
{
"date": snapshot_date,
"days_used": calculation.total_days_used,
"is_compliant": calculation.is_compliant,
}
)
return {
"current_compliance": current_calculation,
"warnings": warnings,
"compliance_history": compliance_history,
"trips_with_compliance": get_schengen_compliance_for_all_trips(trip_list),
}
def flask_route_schengen_report() -> str:
"""Flask route for Schengen compliance report."""
data_dir = flask.current_app.config["PERSONAL_DATA"]
dashboard_data = schengen_dashboard_data(data_dir)
# Load trips for the template
trip_list = trip.build_trip_list(data_dir)
return flask.render_template(
"schengen_report.html",
trip_list=trip_list,
get_country=get_country,
**dashboard_data,
)
def export_schengen_data_for_external_calculator(
travel_items: list[StrDict],
) -> list[dict[str, typing.Any]]:
"""Export travel data in format suitable for external Schengen calculators."""
stays = extract_schengen_stays_from_travel(travel_items)
export_data = []
for stay in stays:
export_data.append(
{
"entry_date": stay.entry_date.strftime("%Y-%m-%d"),
"exit_date": (
stay.exit_date.strftime("%Y-%m-%d") if stay.exit_date else None
),
"country": stay.country.upper(),
"days": stay.days,
}
)
return export_data
# Integration with existing trip.py functions
def enhanced_build_trip_list(
data_dir: str | None = None,
route_distances: typing.Any = None,
include_schengen: bool = True,
) -> list[Trip]:
"""Enhanced version of build_trip_list that includes Schengen compliance."""
# Use the original function
trip_list = trip.build_trip_list(data_dir, route_distances)
if include_schengen:
# Add Schengen compliance to each trip
for trip_obj in trip_list:
add_schengen_compliance_to_trip(trip_obj)
return trip_list
def check_schengen_compliance_for_new_trip(
existing_trips: list[Trip],
new_trip_dates: tuple[date, date],
destination_country: str,
) -> SchengenCalculation:
"""Check if a new trip would violate Schengen rules."""
# Collect all existing travel
all_travel_items = []
for trip_obj in existing_trips:
all_travel_items.extend(trip_obj.travel)
# Add the new trip as mock travel items
entry_date, exit_date = new_trip_dates
# Mock entry to Schengen
all_travel_items.append(
{
"type": "flight",
"depart": entry_date,
"to_airport": {"country": destination_country},
}
)
# Mock exit from Schengen
all_travel_items.append(
{
"type": "flight",
"depart": exit_date,
"from_airport": {"country": destination_country},
"to_airport": {"country": "gb"}, # Assuming return to UK
}
)
# Calculate compliance at the exit date
return calculate_schengen_time(all_travel_items, exit_date)