agenda/agenda/schengen.py
Edward Betts 338d6ea067 Fix Schengen time tracking to use arrival date for long-haul flights
Use arrival date instead of departure date when entering Schengen countries
to correctly handle overnight flights that cross date boundaries.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-15 15:31:05 +02:00

342 lines
11 KiB
Python

"""Schengen area rolling time calculator for travel tracking."""
from datetime import date, datetime, timedelta
from .trip import depart_datetime
from .types import SchengenCalculation, SchengenStay, StrDict
# Member states of the Schengen Area as of 2025, stored as two-letter
# lowercase country codes and listed alphabetically by code. Members that
# are not part of the EU are marked "(non-EU)".
SCHENGEN_COUNTRIES = {
    "at",  # Austria
    "be",  # Belgium
    "bg",  # Bulgaria (joined January 2025)
    "ch",  # Switzerland (non-EU)
    "cz",  # Czech Republic
    "de",  # Germany
    "dk",  # Denmark
    "ee",  # Estonia
    "es",  # Spain
    "fi",  # Finland
    "fr",  # France
    "gr",  # Greece
    "hr",  # Croatia
    "hu",  # Hungary
    "is",  # Iceland (non-EU)
    "it",  # Italy
    "li",  # Liechtenstein (non-EU)
    "lt",  # Lithuania
    "lu",  # Luxembourg
    "lv",  # Latvia
    "mt",  # Malta
    "nl",  # Netherlands
    "no",  # Norway (non-EU)
    "pl",  # Poland
    "pt",  # Portugal
    "ro",  # Romania (joined January 2025)
    "se",  # Sweden
    "si",  # Slovenia
    "sk",  # Slovakia
}
def _flag_from_code(code: str) -> str:
    """Return the emoji flag for a two-letter lowercase country code."""
    # A flag emoji is the pair of Unicode regional indicator symbols that
    # correspond to the code's letters; U+1F1E6 is the indicator for "A".
    return "".join(chr(0x1F1E6 + ord(letter) - ord("a")) for letter in code)


# Country code to emoji flag mapping.
# Insertion order: EU members by English name, then the non-EU members.
COUNTRY_FLAG_MAP = {
    code: _flag_from_code(code)
    for code in (
        "at",  # Austria
        "be",  # Belgium
        "bg",  # Bulgaria
        "hr",  # Croatia
        "cz",  # Czech Republic
        "dk",  # Denmark
        "ee",  # Estonia
        "fi",  # Finland
        "fr",  # France
        "de",  # Germany
        "gr",  # Greece
        "hu",  # Hungary
        "it",  # Italy
        "lv",  # Latvia
        "lt",  # Lithuania
        "lu",  # Luxembourg
        "mt",  # Malta
        "nl",  # Netherlands
        "pl",  # Poland
        "pt",  # Portugal
        "ro",  # Romania
        "sk",  # Slovakia
        "si",  # Slovenia
        "es",  # Spain
        "se",  # Sweden
        "is",  # Iceland
        "li",  # Liechtenstein
        "no",  # Norway
        "ch",  # Switzerland
    )
}
def get_country_flag(country_code: str) -> str:
    """Return the emoji flag for a country code, or an empty string.

    The lookup is case-insensitive. Empty values, non-string values and
    codes missing from COUNTRY_FLAG_MAP all yield an empty string.
    """
    if country_code and isinstance(country_code, str):
        return COUNTRY_FLAG_MAP.get(country_code.lower(), "")
    return ""
def is_schengen_country(country_code: str) -> bool:
    """Tell whether a country code belongs to the Schengen area.

    The comparison is case-insensitive. Empty or non-string values are
    never considered Schengen members.
    """
    return bool(
        country_code
        and isinstance(country_code, str)
        and country_code.lower() in SCHENGEN_COUNTRIES
    )
def _as_plain_date(value: object) -> date | None:
    """Return value as a date (dropping any time part), or None if it is neither."""
    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    return None


def _destination_country(item: StrDict) -> str | None:
    """Return the destination country code of a flight, train or ferry item."""
    endpoint_key = {
        "flight": "to_airport",
        "train": "to_station",
        "ferry": "to_terminal",
    }.get(item.get("type"))
    if endpoint_key is None:
        return None
    # `or {}` also covers an endpoint that is explicitly stored as None.
    endpoint = item.get(endpoint_key) or {}
    return endpoint.get("country")


def extract_schengen_stays_from_travel(
    travel_items: list[StrDict],
) -> list[SchengenStay]:
    """Extract Schengen stays from travel items.

    Items are processed in order of departure. A stay opens when a journey
    ends in a Schengen country while the traveller is outside the area, and
    closes when a later journey ends in a non-Schengen country. Journeys
    between two Schengen countries only update the country recorded for the
    stay.

    Args:
        travel_items: Travel items of type "flight", "train" or "ferry".
            Each is expected to carry a "depart" date/datetime, optionally an
            "arrive" date/datetime, and a destination dict ("to_airport",
            "to_station" or "to_terminal") with a "country" code. Items
            without a usable departure date or a known destination country
            are ignored.

    Returns:
        The stays in chronological order. A stay that is still open after
        the last item has exit_date=None. Every stay is created with days=0
        (presumably recomputed by SchengenStay.__post_init__ - confirm in
        the types module).
    """
    stays: list[SchengenStay] = []
    if not travel_items:
        return stays

    current_location: str | None = None
    entry_date: date | None = None

    # Process journeys chronologically; items without a departure are dropped.
    dated_items = sorted(
        (item for item in travel_items if item.get("depart")),
        key=depart_datetime,
    )

    for item in dated_items:
        depart_date = _as_plain_date(item.get("depart"))
        if depart_date is None:
            # Departure is neither a date nor a datetime.
            continue

        to_country = _destination_country(item)
        if not to_country:
            # Unknown destination: keep the current state rather than
            # forgetting where the traveller is, which would silently drop
            # an open stay.
            continue

        in_schengen = current_location is not None and is_schengen_country(
            current_location
        )
        if in_schengen:
            if not is_schengen_country(to_country):
                # Exiting Schengen - the departure day is the last day inside.
                if entry_date:
                    stays.append(
                        SchengenStay(
                            entry_date=entry_date,
                            exit_date=depart_date,
                            country=current_location,
                            days=0,
                        )
                    )
                entry_date = None
        elif is_schengen_country(to_country):
            # Entering Schengen - prefer the arrival date so overnight and
            # long-haul journeys count from the day of actual entry. Both
            # date and datetime arrivals are honoured.
            arrive_date = _as_plain_date(item.get("arrive"))
            entry_date = arrive_date if arrive_date is not None else depart_date

        current_location = to_country

    # A stay that was never closed is reported as still ongoing.
    if current_location and is_schengen_country(current_location) and entry_date:
        stays.append(
            SchengenStay(
                entry_date=entry_date,
                exit_date=None,
                country=current_location,
                days=0,
            )
        )

    return stays
def calculate_schengen_time(
    travel_items: list[StrDict], calculation_date: date | None = None
) -> SchengenCalculation:
    """
    Work out how much of the Schengen 90/180-day allowance has been used.

    The window is the 180 days ending on (and including) the calculation
    date. Each stay is clipped to that window and both its first and last
    day are counted.

    Args:
        travel_items: List of travel items from the trip system
        calculation_date: Date to calculate from (defaults to today)

    Returns:
        SchengenCalculation with the days used and remaining, the compliance
        flag, the window, the clipped stays and the next reset date
        (earliest clipped entry date plus 180 days, or None without stays)
    """
    as_of = date.today() if calculation_date is None else calculation_date

    all_stays = extract_schengen_stays_from_travel(travel_items)

    # 180 days inclusive of both ends, hence 179 days back from `as_of`.
    first_day = as_of - timedelta(days=179)
    last_day = as_of

    stays_in_window = []
    days_used = 0
    for stay in all_stays:
        # An open stay is treated as lasting until the calculation date.
        stay_last_day = stay.exit_date or as_of
        if stay_last_day < first_day or stay.entry_date > last_day:
            continue  # entirely outside the window

        clipped_start = max(stay.entry_date, first_day)
        clipped_end = min(stay_last_day, last_day)
        clipped_days = (clipped_end - clipped_start).days + 1
        if clipped_days <= 0:
            continue

        stays_in_window.append(
            SchengenStay(
                entry_date=clipped_start,
                # A stay reaching the calculation date is shown as ongoing.
                exit_date=None if clipped_end == as_of else clipped_end,
                country=stay.country,
                days=clipped_days,
            )
        )
        days_used += clipped_days

    # The earliest counted day leaves the rolling window 180 days later.
    if stays_in_window:
        first_entry = min(s.entry_date for s in stays_in_window)
        reset_on = first_entry + timedelta(days=180)
    else:
        reset_on = None

    return SchengenCalculation(
        total_days_used=days_used,
        days_remaining=max(0, 90 - days_used),
        is_compliant=days_used <= 90,
        current_180_day_period=(first_day, last_day),
        stays_in_period=stays_in_window,
        next_reset_date=reset_on,
    )
def format_schengen_report(calculation: SchengenCalculation) -> str:
    """Render a Schengen calculation as a plain-text report.

    The report has a header with the 180-day period, a summary (status, days
    used, days remaining or days over the limit, optional next reset date)
    and one line per stay in the period. Lines are joined with newlines.
    """
    period = calculation.current_180_day_period

    if calculation.is_compliant:
        status = "✅ COMPLIANT"
        balance = f"Days remaining: {calculation.days_remaining}"
    else:
        status = "❌ NON-COMPLIANT"
        balance = f"Days over limit: {calculation.days_over_limit}"

    # Header and summary
    lines = [
        "=== SCHENGEN AREA COMPLIANCE REPORT ===",
        f"Calculation period: {period[0]} to {period[1]}",
        "",
        f"Status: {status}",
        f"Days used: {calculation.total_days_used}/90",
        balance,
    ]
    if calculation.next_reset_date:
        lines.append(f"Next reset date: {calculation.next_reset_date}")
    lines.append("")

    # Detailed stays
    if not calculation.stays_in_period:
        lines.append("No Schengen stays in current 180-day period.")
        return "\n".join(lines)

    lines.append("Stays in current 180-day period:")
    for stay in calculation.stays_in_period:
        # A stay without an exit date is still in progress.
        if stay.exit_date:
            until = stay.exit_date.strftime("%Y-%m-%d")
        else:
            until = "ongoing"
        since = stay.entry_date.strftime("%Y-%m-%d")
        lines.append(
            f"{since} to {until} ({stay.country.upper()}): {stay.days} days"
        )

    return "\n".join(lines)
def get_schengen_countries_list() -> list[str]:
    """Get an alphabetically sorted list of all Schengen area country codes.

    A new list is returned on every call, so callers may modify it freely.
    """
    # sorted() accepts any iterable and already returns a new list, so no
    # intermediate list() copy is needed.
    return sorted(SCHENGEN_COUNTRIES)
def predict_future_compliance(
    travel_items: list[StrDict],
    future_travel: list[tuple[date, date, str]],  # (entry_date, exit_date, country)
    return_country: str = "gb",
) -> list[SchengenCalculation]:
    """
    Predict future Schengen compliance with planned travel.

    Each planned trip is modelled as two mock flights: one into the trip's
    country on the entry date and one to return_country on the exit date.
    Trips accumulate, so the calculation for a trip also includes every trip
    listed before it.

    Args:
        travel_items: Existing travel history (not modified)
        future_travel: List of planned trips as (entry_date, exit_date, country)
        return_country: Country code the traveller goes to after each trip.
            It must be outside the Schengen area, otherwise the mock exit
            flight would not end the stay. Defaults to "gb" (UK).

    Returns:
        List of SchengenCalculation objects, one per planned trip, each
        calculated as of that trip's exit date

    Raises:
        ValueError: If return_country is a Schengen country.
    """
    if is_schengen_country(return_country):
        raise ValueError(
            f"return_country must be outside the Schengen area: {return_country!r}"
        )

    predictions = []

    # Work on a copy so the caller's travel history is left untouched.
    extended_travel = list(travel_items)

    for entry_date, exit_date, country in future_travel:
        # Mock flight entering the Schengen country.
        extended_travel.append(
            {"type": "flight", "depart": entry_date, "to_airport": {"country": country}}
        )

        # Mock flight leaving for the return country.
        extended_travel.append(
            {
                "type": "flight",
                "depart": exit_date,
                "from_airport": {"country": country},
                "to_airport": {"country": return_country},
            }
        )

        # Calculate compliance at the exit date.
        predictions.append(calculate_schengen_time(extended_travel, exit_date))

    return predictions