diff --git a/agenda/schengen.py b/agenda/schengen.py new file mode 100644 index 0000000..81a9fad --- /dev/null +++ b/agenda/schengen.py @@ -0,0 +1,334 @@ +"""Schengen area rolling time calculator for travel tracking.""" + +from datetime import date, datetime, timedelta + +from .trip import depart_datetime +from .types import SchengenCalculation, SchengenStay, StrDict + +# Schengen Area countries as of 2025 +SCHENGEN_COUNTRIES = { + # EU countries in Schengen + "at", # Austria + "be", # Belgium + "bg", # Bulgaria (joined January 2025) + "hr", # Croatia + "cz", # Czech Republic + "dk", # Denmark + "ee", # Estonia + "fi", # Finland + "fr", # France + "de", # Germany + "gr", # Greece + "hu", # Hungary + "it", # Italy + "lv", # Latvia + "lt", # Lithuania + "lu", # Luxembourg + "mt", # Malta + "nl", # Netherlands + "pl", # Poland + "pt", # Portugal + "ro", # Romania (joined January 2025) + "sk", # Slovakia + "si", # Slovenia + "es", # Spain + "se", # Sweden + # Non-EU countries in Schengen + "is", # Iceland + "li", # Liechtenstein + "no", # Norway + "ch", # Switzerland +} + +# Country code to emoji flag mapping +COUNTRY_FLAG_MAP = { + "at": "🇦🇹", # Austria + "be": "🇧🇪", # Belgium + "bg": "🇧🇬", # Bulgaria + "hr": "🇭🇷", # Croatia + "cz": "🇨🇿", # Czech Republic + "dk": "🇩🇰", # Denmark + "ee": "🇪🇪", # Estonia + "fi": "🇫🇮", # Finland + "fr": "🇫🇷", # France + "de": "🇩🇪", # Germany + "gr": "🇬🇷", # Greece + "hu": "🇭🇺", # Hungary + "it": "🇮🇹", # Italy + "lv": "🇱🇻", # Latvia + "lt": "🇱🇹", # Lithuania + "lu": "🇱🇺", # Luxembourg + "mt": "🇲🇹", # Malta + "nl": "🇳🇱", # Netherlands + "pl": "🇵🇱", # Poland + "pt": "🇵🇹", # Portugal + "ro": "🇷🇴", # Romania + "sk": "🇸🇰", # Slovakia + "si": "🇸🇮", # Slovenia + "es": "🇪🇸", # Spain + "se": "🇸🇪", # Sweden + "is": "🇮🇸", # Iceland + "li": "🇱🇮", # Liechtenstein + "no": "🇳🇴", # Norway + "ch": "🇨🇭", # Switzerland +} + + +def get_country_flag(country_code: str) -> str: + """Get emoji flag for a country code.""" + if not country_code or not isinstance(country_code, str): + return "" + return COUNTRY_FLAG_MAP.get(country_code.lower(), "") + + +def is_schengen_country(country_code: str) -> bool: + """Check if a country is in the Schengen area.""" + if not country_code or not isinstance(country_code, str): + return False + return country_code.lower() in SCHENGEN_COUNTRIES + + +def extract_schengen_stays_from_travel( + travel_items: list[StrDict], +) -> list[SchengenStay]: + """Extract Schengen stays from travel items.""" + stays: list[SchengenStay] = [] + current_location = None + entry_date = None + + # Handle empty travel items + if not travel_items: + return stays + + # Sort travel items by departure date, filtering out items without depart date + sorted_items = sorted( + [item for item in travel_items if item.get("depart")], + key=lambda x: depart_datetime(x), + ) + + for item in sorted_items: + from_country = None + to_country = None + travel_date = item.get("depart") + + if not travel_date: + continue + + # Extract travel date + if isinstance(travel_date, datetime): + travel_date = travel_date.date() + elif not isinstance(travel_date, date): + # Skip items with invalid travel dates + continue + + # Determine origin and destination countries + if item.get("type") == "flight": + from_airport = item.get("from_airport", {}) + to_airport = item.get("to_airport", {}) + from_country = from_airport.get("country") + to_country = to_airport.get("country") + elif item.get("type") == "train": + from_station = item.get("from_station", {}) + to_station = item.get("to_station", {}) + from_country = from_station.get("country") + to_country = to_station.get("country") + elif item.get("type") == "ferry": + from_terminal = item.get("from_terminal", {}) + to_terminal = item.get("to_terminal", {}) + from_country = from_terminal.get("country") + to_country = to_terminal.get("country") + + # Handle entering/exiting Schengen + if current_location and is_schengen_country(current_location): + # Currently in Schengen + if to_country and not is_schengen_country(to_country): + # Exiting Schengen + if entry_date: + stays.append( + SchengenStay( + entry_date=entry_date, + exit_date=travel_date, + country=current_location, + days=0, # Will be calculated in __post_init__ + ) + ) + entry_date = None + else: + # Currently outside Schengen + if to_country and is_schengen_country(to_country): + # Entering Schengen + entry_date = travel_date + + current_location = to_country + + # Handle case where still in Schengen + if current_location and is_schengen_country(current_location) and entry_date: + stays.append( + SchengenStay( + entry_date=entry_date, + exit_date=None, # Still in Schengen + country=current_location, + days=0, # Will be calculated in __post_init__ + ) + ) + + return stays + + +def calculate_schengen_time( + travel_items: list[StrDict], calculation_date: date | None = None +) -> SchengenCalculation: + """ + Calculate Schengen rolling time compliance. + + Args: + travel_items: List of travel items from the trip system + calculation_date: Date to calculate from (defaults to today) + + Returns: + SchengenCalculation with compliance status and details + """ + if calculation_date is None: + calculation_date = date.today() + + # Extract Schengen stays from travel data + stays = extract_schengen_stays_from_travel(travel_items) + + # Calculate 180-day window (ending on calculation_date) + window_start = calculation_date - timedelta(days=179) + window_end = calculation_date + + # Find stays that overlap with the 180-day window + relevant_stays = [] + total_days = 0 + + for stay in stays: + # Check if stay overlaps with our 180-day window + stay_start = stay.entry_date + stay_end = stay.exit_date or calculation_date + + if stay_end >= window_start and stay_start <= window_end: + # Calculate overlap with window + overlap_start = max(stay_start, window_start) + overlap_end = min(stay_end, window_end) + overlap_days = (overlap_end - overlap_start).days + 1 + + if overlap_days > 0: + # Create a new stay object for the overlapping period + overlapping_stay = SchengenStay( + entry_date=overlap_start, + exit_date=overlap_end if overlap_end != calculation_date else None, + country=stay.country, + days=overlap_days, + ) + relevant_stays.append(overlapping_stay) + total_days += overlap_days + + # Calculate compliance + is_compliant = total_days <= 90 + days_remaining = max(0, 90 - total_days) + + # Calculate next reset date (when earliest stay in window expires) + next_reset_date = None + if relevant_stays: + earliest_stay = min(relevant_stays, key=lambda s: s.entry_date) + next_reset_date = earliest_stay.entry_date + timedelta(days=180) + + return SchengenCalculation( + total_days_used=total_days, + days_remaining=days_remaining, + is_compliant=is_compliant, + current_180_day_period=(window_start, window_end), + stays_in_period=relevant_stays, + next_reset_date=next_reset_date, + ) + + +def format_schengen_report(calculation: SchengenCalculation) -> str: + """Format a human-readable Schengen compliance report.""" + report = [] + + # Header + report.append("=== SCHENGEN AREA COMPLIANCE REPORT ===") + report.append( + f"Calculation period: {calculation.current_180_day_period[0]} to {calculation.current_180_day_period[1]}" + ) + report.append("") + + # Summary + status = "✅ COMPLIANT" if calculation.is_compliant else "❌ NON-COMPLIANT" + report.append(f"Status: {status}") + report.append(f"Days used: {calculation.total_days_used}/90") + + if calculation.is_compliant: + report.append(f"Days remaining: {calculation.days_remaining}") + else: + report.append(f"Days over limit: {calculation.days_over_limit}") + + if calculation.next_reset_date: + report.append(f"Next reset date: {calculation.next_reset_date}") + + report.append("") + + # Detailed stays + if calculation.stays_in_period: + report.append("Stays in current 180-day period:") + for stay in calculation.stays_in_period: + exit_str = ( + stay.exit_date.strftime("%Y-%m-%d") if stay.exit_date else "ongoing" + ) + report.append( + f" • {stay.entry_date.strftime('%Y-%m-%d')} to {exit_str} " + f"({stay.country.upper()}): {stay.days} days" + ) + else: + report.append("No Schengen stays in current 180-day period.") + + return "\n".join(report) + + +def get_schengen_countries_list() -> list[str]: + """Get a list of all Schengen area country codes.""" + return sorted(list(SCHENGEN_COUNTRIES)) + + +def predict_future_compliance( + travel_items: list[StrDict], + future_travel: list[tuple[date, date, str]], # (entry_date, exit_date, country) +) -> list[SchengenCalculation]: + """ + Predict future Schengen compliance with planned travel. + + Args: + travel_items: Existing travel history + future_travel: List of planned trips as (entry_date, exit_date, country) + + Returns: + List of SchengenCalculation objects for each planned trip + """ + predictions = [] + + # Create mock travel items for future trips + extended_travel = travel_items.copy() + + for entry_date, exit_date, country in future_travel: + # Add entry + extended_travel.append( + {"type": "flight", "depart": entry_date, "to_airport": {"country": country}} + ) + + # Add exit + extended_travel.append( + { + "type": "flight", + "depart": exit_date, + "from_airport": {"country": country}, + "to_airport": {"country": "gb"}, # Assuming return to UK + } + ) + + # Calculate compliance at the exit date + calculation = calculate_schengen_time(extended_travel, exit_date) + predictions.append(calculation) + + return predictions diff --git a/agenda/trip_schengen.py b/agenda/trip_schengen.py new file mode 100644 index 0000000..6a1e2d8 --- /dev/null +++ b/agenda/trip_schengen.py @@ -0,0 +1,229 @@ +"""Integration of Schengen calculator with the existing trip system.""" + +import typing +from datetime import date, timedelta + +import flask + +from . import trip +from .schengen import ( + SchengenCalculation, + calculate_schengen_time, + extract_schengen_stays_from_travel, +) +from .types import StrDict, Trip + + +def add_schengen_compliance_to_trip(trip_obj: Trip) -> Trip: + """Add Schengen compliance information to a trip object.""" + try: + # Calculate Schengen compliance for the trip + calculation = calculate_schengen_time(trip_obj.travel) + + # Add the calculation to the trip object + trip_obj.schengen_compliance = calculation + except Exception as e: + # Log the error but don't fail the trip loading + import logging + logging.warning(f"Failed to calculate Schengen compliance for trip {trip_obj.start}: {e}") + trip_obj.schengen_compliance = None + + return trip_obj + + +def get_schengen_compliance_for_all_trips( + trip_list: list[Trip], +) -> dict[date, SchengenCalculation]: + """Calculate Schengen compliance for all trips.""" + compliance_by_date = {} + + # Collect all travel items across all trips + all_travel_items = [] + for trip_obj in trip_list: + all_travel_items.extend(trip_obj.travel) + + # Calculate compliance for each trip's start date + for trip_obj in trip_list: + calculation = calculate_schengen_time(all_travel_items, trip_obj.start) + compliance_by_date[trip_obj.start] = calculation + + return compliance_by_date + + +def generate_schengen_warnings(trip_list: list[Trip]) -> list[dict[str, typing.Any]]: + """Generate warnings for potential Schengen violations.""" + warnings = [] + + # Get compliance for all trips + compliance_by_date = get_schengen_compliance_for_all_trips(trip_list) + + for trip_date, calculation in compliance_by_date.items(): + if not calculation.is_compliant: + warnings.append( + { + "type": "schengen_violation", + "date": trip_date, + "message": f"Schengen violation on {trip_date}: {calculation.days_over_limit} days over limit", + "severity": "error", + "calculation": calculation, + } + ) + elif calculation.days_remaining < 10: + warnings.append( + { + "type": "schengen_warning", + "date": trip_date, + "message": f"Low Schengen allowance on {trip_date}: only {calculation.days_remaining} days remaining", + "severity": "warning", + "calculation": calculation, + } + ) + + return warnings + + +def schengen_dashboard_data(data_dir: str | None = None) -> dict[str, typing.Any]: + """Generate dashboard data for Schengen compliance.""" + if data_dir is None: + data_dir = flask.current_app.config["PERSONAL_DATA"] + + # Load all trips + trip_list = trip.build_trip_list(data_dir) + + # Calculate current compliance + all_travel_items = [] + for trip_obj in trip_list: + all_travel_items.extend(trip_obj.travel) + + current_calculation = calculate_schengen_time(all_travel_items) + + # Generate warnings + warnings = generate_schengen_warnings(trip_list) + + # Get compliance history for the last year + compliance_history = [] + current_date = date.today() + for i in range(365, 0, -7): # Weekly snapshots for the last year + snapshot_date = current_date - timedelta(days=i) + calculation = calculate_schengen_time(all_travel_items, snapshot_date) + compliance_history.append( + { + "date": snapshot_date, + "days_used": calculation.total_days_used, + "is_compliant": calculation.is_compliant, + } + ) + + return { + "current_compliance": current_calculation, + "warnings": warnings, + "compliance_history": compliance_history, + "trips_with_compliance": get_schengen_compliance_for_all_trips(trip_list), + } + + +def flask_route_schengen_report(): + """Flask route for Schengen compliance report.""" + data_dir = flask.current_app.config["PERSONAL_DATA"] + dashboard_data = schengen_dashboard_data(data_dir) + return flask.render_template("schengen_report.html", **dashboard_data) + + +def export_schengen_data_for_external_calculator( + travel_items: list[StrDict], +) -> list[dict[str, typing.Any]]: + """Export travel data in format suitable for external Schengen calculators.""" + + stays = extract_schengen_stays_from_travel(travel_items) + + export_data = [] + for stay in stays: + export_data.append( + { + "entry_date": stay.entry_date.strftime("%Y-%m-%d"), + "exit_date": ( + stay.exit_date.strftime("%Y-%m-%d") if stay.exit_date else None + ), + "country": stay.country.upper(), + "days": stay.days, + } + ) + + return export_data + + +def add_schengen_info_to_events(events: list, data_dir: str | None = None) -> list: + """Add Schengen compliance info to event list.""" + if data_dir is None: + data_dir = flask.current_app.config["PERSONAL_DATA"] + + # Load all trips to get travel context + trip_list = trip.build_trip_list(data_dir) + all_travel_items = [] + for trip_obj in trip_list: + all_travel_items.extend(trip_obj.travel) + + # Add Schengen status to each event + for event in events: + if hasattr(event, "date"): + calculation = calculate_schengen_time(all_travel_items, event.date) + event.schengen_days_used = calculation.total_days_used + event.schengen_compliant = calculation.is_compliant + event.schengen_days_remaining = calculation.days_remaining + + return events + + +# Integration with existing trip.py functions +def enhanced_build_trip_list( + data_dir: str | None = None, + route_distances: typing.Any = None, + include_schengen: bool = True, +) -> list[Trip]: + """Enhanced version of build_trip_list that includes Schengen compliance.""" + # Use the original function + trip_list = trip.build_trip_list(data_dir, route_distances) + + if include_schengen: + # Add Schengen compliance to each trip + for trip_obj in trip_list: + add_schengen_compliance_to_trip(trip_obj) + + return trip_list + + +def check_schengen_compliance_for_new_trip( + existing_trips: list[Trip], + new_trip_dates: tuple[date, date], + destination_country: str, +) -> SchengenCalculation: + """Check if a new trip would violate Schengen rules.""" + # Collect all existing travel + all_travel_items = [] + for trip_obj in existing_trips: + all_travel_items.extend(trip_obj.travel) + + # Add the new trip as mock travel items + entry_date, exit_date = new_trip_dates + + # Mock entry to Schengen + all_travel_items.append( + { + "type": "flight", + "depart": entry_date, + "to_airport": {"country": destination_country}, + } + ) + + # Mock exit from Schengen + all_travel_items.append( + { + "type": "flight", + "depart": exit_date, + "from_airport": {"country": destination_country}, + "to_airport": {"country": "gb"}, # Assuming return to UK + } + ) + + # Calculate compliance at the exit date + return calculate_schengen_time(all_travel_items, exit_date) diff --git a/agenda/types.py b/agenda/types.py index 4f9d273..789c37c 100644 --- a/agenda/types.py +++ b/agenda/types.py @@ -2,6 +2,7 @@ import collections import datetime +from datetime import date import functools import typing from collections import defaultdict @@ -65,6 +66,7 @@ class Trip: flight_bookings: list[StrDict] = field(default_factory=list) name: str | None = None private: bool = False + schengen_compliance: typing.Optional["SchengenCalculation"] = None @property def title(self) -> str: @@ -365,3 +367,37 @@ class Holiday: if self.local_name and self.local_name != self.name else self.name ) + + +@dataclass +class SchengenStay: + """Represents a stay in the Schengen area.""" + + entry_date: date + exit_date: typing.Optional[date] # None if currently in Schengen + country: str + days: int + + def __post_init__(self) -> None: + if self.exit_date is None: + # Currently in Schengen, calculate days up to today + self.days = (date.today() - self.entry_date).days + 1 + else: + self.days = (self.exit_date - self.entry_date).days + 1 + + +@dataclass +class SchengenCalculation: + """Result of Schengen time calculation.""" + + total_days_used: int + days_remaining: int + is_compliant: bool + current_180_day_period: tuple[date, date] # (start, end) + stays_in_period: list["SchengenStay"] + next_reset_date: typing.Optional[date] # When the 180-day window resets + + @property + def days_over_limit(self) -> int: + """Days over the 90-day limit.""" + return max(0, self.total_days_used - 90) diff --git a/templates/macros.html b/templates/macros.html index 9a7b068..d5592a6 100644 --- a/templates/macros.html +++ b/templates/macros.html @@ -351,6 +351,18 @@
Total CO₂: {{ "{:,.1f}".format(total_co2_kg) }} kg
{% endif %} + {% if trip.schengen_compliance %} +
+ Schengen: + {% if trip.schengen_compliance.is_compliant %} + ✅ Compliant + {% else %} + ❌ Non-compliant + {% endif %} + ({{ trip.schengen_compliance.total_days_used }}/90 days used) +
+ {% endif %} + {{ conference_list(trip) }} {% for day, elements in trip.elements_grouped_by_day() %} diff --git a/templates/navbar.html b/templates/navbar.html index 52f2a3d..eed9c1f 100644 --- a/templates/navbar.html +++ b/templates/navbar.html @@ -14,6 +14,7 @@ {"endpoint": "weekends", "label": "Weekends" }, {"endpoint": "launch_list", "label": "Space launches" }, {"endpoint": "holiday_list", "label": "Holidays" }, + {"endpoint": "schengen_report", "label": "Schengen" }, ] + ([{"endpoint": "birthday_list", "label": "Birthdays" }] if g.user.is_authenticated else []) %} diff --git a/templates/schengen_report.html b/templates/schengen_report.html new file mode 100644 index 0000000..4311a0b --- /dev/null +++ b/templates/schengen_report.html @@ -0,0 +1,211 @@ +{% extends "base.html" %} + +{% set heading = "Schengen Area Compliance Report" %} + +{% block title %}{{ heading }} - Edward Betts{% endblock %} + +{% block content %} +
+

Schengen Area Compliance Report

+ +
+
+
+
+
Current Status
+
+
+
+
+
Compliance Status
+ {% if current_compliance.is_compliant %} + ✅ COMPLIANT + {% else %} + ❌ NON-COMPLIANT + {% endif %} +
+
+
Days Used
+
{{ current_compliance.total_days_used }}/90
+
+
+ +
+
+ {% if current_compliance.is_compliant %} +
Days Remaining
+
{{ current_compliance.days_remaining }}
+ {% else %} +
Days Over Limit
+
{{ current_compliance.days_over_limit }}
+ {% endif %} +
+
+ {% if current_compliance.next_reset_date %} +
Next Reset Date
+
{{ current_compliance.next_reset_date.strftime('%Y-%m-%d') }}
+ {% endif %} +
+
+ +
+
Current 180-day Period
+
+ {{ current_compliance.current_180_day_period[0].strftime('%Y-%m-%d') }} to + {{ current_compliance.current_180_day_period[1].strftime('%Y-%m-%d') }} +
+
+
+
+ + {% if current_compliance.stays_in_period %} +
+
+
Stays in Current 180-day Period
+
+
+
+ + + + + + + + + + + {% for stay in current_compliance.stays_in_period %} + + + + + + + {% endfor %} + +
Entry DateExit DateCountryDays
{{ stay.entry_date.strftime('%Y-%m-%d') }} + {% if stay.exit_date %} + {{ stay.exit_date.strftime('%Y-%m-%d') }} + {% else %} + ongoing + {% endif %} + {{ stay.country | country_flag }} {{ stay.country.upper() }}{{ stay.days }}
+
+
+
+ {% endif %} +
+ +
+ {% if warnings %} +
+
+
Warnings
+
+
+ {% for warning in warnings %} + + {% endfor %} +
+
+ {% endif %} + +
+
+
Compliance History
+
+
+
+ +
+
+
+
+
+ + {% if trips_with_compliance %} +
+
+
Trip Compliance History
+
+
+
+ + + + + + + + + + + {% for trip_date, calculation in trips_with_compliance.items() %} + + + + + + + {% endfor %} + +
Trip DateDays UsedDays RemainingStatus
{{ trip_date.strftime('%Y-%m-%d') }}{{ calculation.total_days_used }}/90{{ calculation.days_remaining }} + {% if calculation.is_compliant %} + Compliant + {% else %} + Non-compliant + {% endif %} +
+
+
+
+ {% endif %} +
+{% endblock %} + +{% block scripts %} + + +{% endblock %} \ No newline at end of file diff --git a/templates/trip_page.html b/templates/trip_page.html index 3f853ca..23d48c3 100644 --- a/templates/trip_page.html +++ b/templates/trip_page.html @@ -117,6 +117,25 @@ {% if delta %}
How long until trip: {{ delta }}
{% endif %} + + {% if trip.schengen_compliance %} +
+ Schengen Compliance: + {% if trip.schengen_compliance.is_compliant %} + ✅ Compliant + {% else %} + ❌ Non-compliant + {% endif %} +
+ {{ trip.schengen_compliance.total_days_used }}/90 days used + {% if trip.schengen_compliance.is_compliant %} + ({{ trip.schengen_compliance.days_remaining }} remaining) + {% else %} + ({{ trip.schengen_compliance.days_over_limit }} over limit) + {% endif %} +
+
+ {% endif %} {% for item in trip.conferences %} diff --git a/web_view.py b/web_view.py index 62222be..2042432 100755 --- a/web_view.py +++ b/web_view.py @@ -25,6 +25,7 @@ import agenda.holidays import agenda.stats import agenda.thespacedevs import agenda.trip +import agenda.trip_schengen import agenda.utils from agenda import calendar, format_list_with_ampersand, travel, uk_tz from agenda.types import StrDict, Trip @@ -36,6 +37,13 @@ app.config.from_object("config.default") agenda.error_mail.setup_error_mail(app) +@app.template_filter("country_flag") +def country_flag_filter(country_code: str) -> str: + """Convert country code to emoji flag.""" + from agenda.schengen import get_country_flag + return get_country_flag(country_code) + + @app.before_request def handle_auth() -> None: """Handle authentication and set global user.""" @@ -387,11 +395,17 @@ def get_trip_list( route_distances: agenda.travel.RouteDistances | None = None, ) -> list[Trip]: """Get list of trips respecting current authentication status.""" - return [ + trips = [ trip for trip in agenda.trip.build_trip_list(route_distances=route_distances) if flask.g.user.is_authenticated or not trip.private ] + + # Add Schengen compliance information to each trip + for trip in trips: + agenda.trip_schengen.add_schengen_compliance_to_trip(trip) + + return trips @app.route("/trip") @@ -527,6 +541,9 @@ def trip_page(start: str) -> str: prev_trip, trip, next_trip = get_prev_current_and_next_trip(start, trip_list) if not trip: flask.abort(404) + + # Add Schengen compliance information + trip = agenda.trip_schengen.add_schengen_compliance_to_trip(trip) coordinates = agenda.trip.collect_trip_coordinates(trip) routes = agenda.trip.get_trip_routes(trip, app.config["PERSONAL_DATA"]) @@ -603,6 +620,12 @@ def trip_stats() -> str: ) +@app.route("/schengen") +def schengen_report() -> str: + """Schengen compliance report.""" + return agenda.trip_schengen.flask_route_schengen_report() + + @app.route("/callback") def auth_callback() -> tuple[str, int] | werkzeug.Response: """Process the authentication callback."""