Implement Schengen Area Compliance Report

How many close am I to 90 days in the last 180.

Fixes #193.
This commit is contained in:
Edward Betts 2025-07-15 14:40:42 +02:00
parent 084e5f44e3
commit d2c4fa69ee
8 changed files with 866 additions and 1 deletions

334
agenda/schengen.py Normal file
View file

@ -0,0 +1,334 @@
"""Schengen area rolling time calculator for travel tracking."""
from datetime import date, datetime, timedelta
from .trip import depart_datetime
from .types import SchengenCalculation, SchengenStay, StrDict
# Schengen Area countries as of 2025
SCHENGEN_COUNTRIES = {
# EU countries in Schengen
"at", # Austria
"be", # Belgium
"bg", # Bulgaria (joined January 2025)
"hr", # Croatia
"cz", # Czech Republic
"dk", # Denmark
"ee", # Estonia
"fi", # Finland
"fr", # France
"de", # Germany
"gr", # Greece
"hu", # Hungary
"it", # Italy
"lv", # Latvia
"lt", # Lithuania
"lu", # Luxembourg
"mt", # Malta
"nl", # Netherlands
"pl", # Poland
"pt", # Portugal
"ro", # Romania (joined January 2025)
"sk", # Slovakia
"si", # Slovenia
"es", # Spain
"se", # Sweden
# Non-EU countries in Schengen
"is", # Iceland
"li", # Liechtenstein
"no", # Norway
"ch", # Switzerland
}
# Country code to emoji flag mapping
COUNTRY_FLAG_MAP = {
"at": "🇦🇹", # Austria
"be": "🇧🇪", # Belgium
"bg": "🇧🇬", # Bulgaria
"hr": "🇭🇷", # Croatia
"cz": "🇨🇿", # Czech Republic
"dk": "🇩🇰", # Denmark
"ee": "🇪🇪", # Estonia
"fi": "🇫🇮", # Finland
"fr": "🇫🇷", # France
"de": "🇩🇪", # Germany
"gr": "🇬🇷", # Greece
"hu": "🇭🇺", # Hungary
"it": "🇮🇹", # Italy
"lv": "🇱🇻", # Latvia
"lt": "🇱🇹", # Lithuania
"lu": "🇱🇺", # Luxembourg
"mt": "🇲🇹", # Malta
"nl": "🇳🇱", # Netherlands
"pl": "🇵🇱", # Poland
"pt": "🇵🇹", # Portugal
"ro": "🇷🇴", # Romania
"sk": "🇸🇰", # Slovakia
"si": "🇸🇮", # Slovenia
"es": "🇪🇸", # Spain
"se": "🇸🇪", # Sweden
"is": "🇮🇸", # Iceland
"li": "🇱🇮", # Liechtenstein
"no": "🇳🇴", # Norway
"ch": "🇨🇭", # Switzerland
}
def get_country_flag(country_code: str) -> str:
"""Get emoji flag for a country code."""
if not country_code or not isinstance(country_code, str):
return ""
return COUNTRY_FLAG_MAP.get(country_code.lower(), "")
def is_schengen_country(country_code: str) -> bool:
"""Check if a country is in the Schengen area."""
if not country_code or not isinstance(country_code, str):
return False
return country_code.lower() in SCHENGEN_COUNTRIES
def extract_schengen_stays_from_travel(
travel_items: list[StrDict],
) -> list[SchengenStay]:
"""Extract Schengen stays from travel items."""
stays: list[SchengenStay] = []
current_location = None
entry_date = None
# Handle empty travel items
if not travel_items:
return stays
# Sort travel items by departure date, filtering out items without depart date
sorted_items = sorted(
[item for item in travel_items if item.get("depart")],
key=lambda x: depart_datetime(x),
)
for item in sorted_items:
from_country = None
to_country = None
travel_date = item.get("depart")
if not travel_date:
continue
# Extract travel date
if isinstance(travel_date, datetime):
travel_date = travel_date.date()
elif not isinstance(travel_date, date):
# Skip items with invalid travel dates
continue
# Determine origin and destination countries
if item.get("type") == "flight":
from_airport = item.get("from_airport", {})
to_airport = item.get("to_airport", {})
from_country = from_airport.get("country")
to_country = to_airport.get("country")
elif item.get("type") == "train":
from_station = item.get("from_station", {})
to_station = item.get("to_station", {})
from_country = from_station.get("country")
to_country = to_station.get("country")
elif item.get("type") == "ferry":
from_terminal = item.get("from_terminal", {})
to_terminal = item.get("to_terminal", {})
from_country = from_terminal.get("country")
to_country = to_terminal.get("country")
# Handle entering/exiting Schengen
if current_location and is_schengen_country(current_location):
# Currently in Schengen
if to_country and not is_schengen_country(to_country):
# Exiting Schengen
if entry_date:
stays.append(
SchengenStay(
entry_date=entry_date,
exit_date=travel_date,
country=current_location,
days=0, # Will be calculated in __post_init__
)
)
entry_date = None
else:
# Currently outside Schengen
if to_country and is_schengen_country(to_country):
# Entering Schengen
entry_date = travel_date
current_location = to_country
# Handle case where still in Schengen
if current_location and is_schengen_country(current_location) and entry_date:
stays.append(
SchengenStay(
entry_date=entry_date,
exit_date=None, # Still in Schengen
country=current_location,
days=0, # Will be calculated in __post_init__
)
)
return stays
def calculate_schengen_time(
travel_items: list[StrDict], calculation_date: date | None = None
) -> SchengenCalculation:
"""
Calculate Schengen rolling time compliance.
Args:
travel_items: List of travel items from the trip system
calculation_date: Date to calculate from (defaults to today)
Returns:
SchengenCalculation with compliance status and details
"""
if calculation_date is None:
calculation_date = date.today()
# Extract Schengen stays from travel data
stays = extract_schengen_stays_from_travel(travel_items)
# Calculate 180-day window (ending on calculation_date)
window_start = calculation_date - timedelta(days=179)
window_end = calculation_date
# Find stays that overlap with the 180-day window
relevant_stays = []
total_days = 0
for stay in stays:
# Check if stay overlaps with our 180-day window
stay_start = stay.entry_date
stay_end = stay.exit_date or calculation_date
if stay_end >= window_start and stay_start <= window_end:
# Calculate overlap with window
overlap_start = max(stay_start, window_start)
overlap_end = min(stay_end, window_end)
overlap_days = (overlap_end - overlap_start).days + 1
if overlap_days > 0:
# Create a new stay object for the overlapping period
overlapping_stay = SchengenStay(
entry_date=overlap_start,
exit_date=overlap_end if overlap_end != calculation_date else None,
country=stay.country,
days=overlap_days,
)
relevant_stays.append(overlapping_stay)
total_days += overlap_days
# Calculate compliance
is_compliant = total_days <= 90
days_remaining = max(0, 90 - total_days)
# Calculate next reset date (when earliest stay in window expires)
next_reset_date = None
if relevant_stays:
earliest_stay = min(relevant_stays, key=lambda s: s.entry_date)
next_reset_date = earliest_stay.entry_date + timedelta(days=180)
return SchengenCalculation(
total_days_used=total_days,
days_remaining=days_remaining,
is_compliant=is_compliant,
current_180_day_period=(window_start, window_end),
stays_in_period=relevant_stays,
next_reset_date=next_reset_date,
)
def format_schengen_report(calculation: SchengenCalculation) -> str:
"""Format a human-readable Schengen compliance report."""
report = []
# Header
report.append("=== SCHENGEN AREA COMPLIANCE REPORT ===")
report.append(
f"Calculation period: {calculation.current_180_day_period[0]} to {calculation.current_180_day_period[1]}"
)
report.append("")
# Summary
status = "✅ COMPLIANT" if calculation.is_compliant else "❌ NON-COMPLIANT"
report.append(f"Status: {status}")
report.append(f"Days used: {calculation.total_days_used}/90")
if calculation.is_compliant:
report.append(f"Days remaining: {calculation.days_remaining}")
else:
report.append(f"Days over limit: {calculation.days_over_limit}")
if calculation.next_reset_date:
report.append(f"Next reset date: {calculation.next_reset_date}")
report.append("")
# Detailed stays
if calculation.stays_in_period:
report.append("Stays in current 180-day period:")
for stay in calculation.stays_in_period:
exit_str = (
stay.exit_date.strftime("%Y-%m-%d") if stay.exit_date else "ongoing"
)
report.append(
f"{stay.entry_date.strftime('%Y-%m-%d')} to {exit_str} "
f"({stay.country.upper()}): {stay.days} days"
)
else:
report.append("No Schengen stays in current 180-day period.")
return "\n".join(report)
def get_schengen_countries_list() -> list[str]:
"""Get a list of all Schengen area country codes."""
return sorted(list(SCHENGEN_COUNTRIES))
def predict_future_compliance(
travel_items: list[StrDict],
future_travel: list[tuple[date, date, str]], # (entry_date, exit_date, country)
) -> list[SchengenCalculation]:
"""
Predict future Schengen compliance with planned travel.
Args:
travel_items: Existing travel history
future_travel: List of planned trips as (entry_date, exit_date, country)
Returns:
List of SchengenCalculation objects for each planned trip
"""
predictions = []
# Create mock travel items for future trips
extended_travel = travel_items.copy()
for entry_date, exit_date, country in future_travel:
# Add entry
extended_travel.append(
{"type": "flight", "depart": entry_date, "to_airport": {"country": country}}
)
# Add exit
extended_travel.append(
{
"type": "flight",
"depart": exit_date,
"from_airport": {"country": country},
"to_airport": {"country": "gb"}, # Assuming return to UK
}
)
# Calculate compliance at the exit date
calculation = calculate_schengen_time(extended_travel, exit_date)
predictions.append(calculation)
return predictions

229
agenda/trip_schengen.py Normal file
View file

@ -0,0 +1,229 @@
"""Integration of Schengen calculator with the existing trip system."""
import typing
from datetime import date, timedelta
import flask
from . import trip
from .schengen import (
SchengenCalculation,
calculate_schengen_time,
extract_schengen_stays_from_travel,
)
from .types import StrDict, Trip
def add_schengen_compliance_to_trip(trip_obj: Trip) -> Trip:
"""Add Schengen compliance information to a trip object."""
try:
# Calculate Schengen compliance for the trip
calculation = calculate_schengen_time(trip_obj.travel)
# Add the calculation to the trip object
trip_obj.schengen_compliance = calculation
except Exception as e:
# Log the error but don't fail the trip loading
import logging
logging.warning(f"Failed to calculate Schengen compliance for trip {trip_obj.start}: {e}")
trip_obj.schengen_compliance = None
return trip_obj
def get_schengen_compliance_for_all_trips(
trip_list: list[Trip],
) -> dict[date, SchengenCalculation]:
"""Calculate Schengen compliance for all trips."""
compliance_by_date = {}
# Collect all travel items across all trips
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
# Calculate compliance for each trip's start date
for trip_obj in trip_list:
calculation = calculate_schengen_time(all_travel_items, trip_obj.start)
compliance_by_date[trip_obj.start] = calculation
return compliance_by_date
def generate_schengen_warnings(trip_list: list[Trip]) -> list[dict[str, typing.Any]]:
"""Generate warnings for potential Schengen violations."""
warnings = []
# Get compliance for all trips
compliance_by_date = get_schengen_compliance_for_all_trips(trip_list)
for trip_date, calculation in compliance_by_date.items():
if not calculation.is_compliant:
warnings.append(
{
"type": "schengen_violation",
"date": trip_date,
"message": f"Schengen violation on {trip_date}: {calculation.days_over_limit} days over limit",
"severity": "error",
"calculation": calculation,
}
)
elif calculation.days_remaining < 10:
warnings.append(
{
"type": "schengen_warning",
"date": trip_date,
"message": f"Low Schengen allowance on {trip_date}: only {calculation.days_remaining} days remaining",
"severity": "warning",
"calculation": calculation,
}
)
return warnings
def schengen_dashboard_data(data_dir: str | None = None) -> dict[str, typing.Any]:
"""Generate dashboard data for Schengen compliance."""
if data_dir is None:
data_dir = flask.current_app.config["PERSONAL_DATA"]
# Load all trips
trip_list = trip.build_trip_list(data_dir)
# Calculate current compliance
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
current_calculation = calculate_schengen_time(all_travel_items)
# Generate warnings
warnings = generate_schengen_warnings(trip_list)
# Get compliance history for the last year
compliance_history = []
current_date = date.today()
for i in range(365, 0, -7): # Weekly snapshots for the last year
snapshot_date = current_date - timedelta(days=i)
calculation = calculate_schengen_time(all_travel_items, snapshot_date)
compliance_history.append(
{
"date": snapshot_date,
"days_used": calculation.total_days_used,
"is_compliant": calculation.is_compliant,
}
)
return {
"current_compliance": current_calculation,
"warnings": warnings,
"compliance_history": compliance_history,
"trips_with_compliance": get_schengen_compliance_for_all_trips(trip_list),
}
def flask_route_schengen_report():
"""Flask route for Schengen compliance report."""
data_dir = flask.current_app.config["PERSONAL_DATA"]
dashboard_data = schengen_dashboard_data(data_dir)
return flask.render_template("schengen_report.html", **dashboard_data)
def export_schengen_data_for_external_calculator(
travel_items: list[StrDict],
) -> list[dict[str, typing.Any]]:
"""Export travel data in format suitable for external Schengen calculators."""
stays = extract_schengen_stays_from_travel(travel_items)
export_data = []
for stay in stays:
export_data.append(
{
"entry_date": stay.entry_date.strftime("%Y-%m-%d"),
"exit_date": (
stay.exit_date.strftime("%Y-%m-%d") if stay.exit_date else None
),
"country": stay.country.upper(),
"days": stay.days,
}
)
return export_data
def add_schengen_info_to_events(events: list, data_dir: str | None = None) -> list:
"""Add Schengen compliance info to event list."""
if data_dir is None:
data_dir = flask.current_app.config["PERSONAL_DATA"]
# Load all trips to get travel context
trip_list = trip.build_trip_list(data_dir)
all_travel_items = []
for trip_obj in trip_list:
all_travel_items.extend(trip_obj.travel)
# Add Schengen status to each event
for event in events:
if hasattr(event, "date"):
calculation = calculate_schengen_time(all_travel_items, event.date)
event.schengen_days_used = calculation.total_days_used
event.schengen_compliant = calculation.is_compliant
event.schengen_days_remaining = calculation.days_remaining
return events
# Integration with existing trip.py functions
def enhanced_build_trip_list(
data_dir: str | None = None,
route_distances: typing.Any = None,
include_schengen: bool = True,
) -> list[Trip]:
"""Enhanced version of build_trip_list that includes Schengen compliance."""
# Use the original function
trip_list = trip.build_trip_list(data_dir, route_distances)
if include_schengen:
# Add Schengen compliance to each trip
for trip_obj in trip_list:
add_schengen_compliance_to_trip(trip_obj)
return trip_list
def check_schengen_compliance_for_new_trip(
existing_trips: list[Trip],
new_trip_dates: tuple[date, date],
destination_country: str,
) -> SchengenCalculation:
"""Check if a new trip would violate Schengen rules."""
# Collect all existing travel
all_travel_items = []
for trip_obj in existing_trips:
all_travel_items.extend(trip_obj.travel)
# Add the new trip as mock travel items
entry_date, exit_date = new_trip_dates
# Mock entry to Schengen
all_travel_items.append(
{
"type": "flight",
"depart": entry_date,
"to_airport": {"country": destination_country},
}
)
# Mock exit from Schengen
all_travel_items.append(
{
"type": "flight",
"depart": exit_date,
"from_airport": {"country": destination_country},
"to_airport": {"country": "gb"}, # Assuming return to UK
}
)
# Calculate compliance at the exit date
return calculate_schengen_time(all_travel_items, exit_date)

View file

@ -2,6 +2,7 @@
import collections
import datetime
from datetime import date
import functools
import typing
from collections import defaultdict
@ -65,6 +66,7 @@ class Trip:
flight_bookings: list[StrDict] = field(default_factory=list)
name: str | None = None
private: bool = False
schengen_compliance: typing.Optional["SchengenCalculation"] = None
@property
def title(self) -> str:
@ -365,3 +367,37 @@ class Holiday:
if self.local_name and self.local_name != self.name
else self.name
)
@dataclass
class SchengenStay:
"""Represents a stay in the Schengen area."""
entry_date: date
exit_date: typing.Optional[date] # None if currently in Schengen
country: str
days: int
def __post_init__(self) -> None:
if self.exit_date is None:
# Currently in Schengen, calculate days up to today
self.days = (date.today() - self.entry_date).days + 1
else:
self.days = (self.exit_date - self.entry_date).days + 1
@dataclass
class SchengenCalculation:
"""Result of Schengen time calculation."""
total_days_used: int
days_remaining: int
is_compliant: bool
current_180_day_period: tuple[date, date] # (start, end)
stays_in_period: list["SchengenStay"]
next_reset_date: typing.Optional[date] # When the 180-day window resets
@property
def days_over_limit(self) -> int:
"""Days over the 90-day limit."""
return max(0, self.total_days_used - 90)

View file

@ -351,6 +351,18 @@
<div>Total CO₂: {{ "{:,.1f}".format(total_co2_kg) }} kg</div>
{% endif %}
{% if trip.schengen_compliance %}
<div>
<strong>Schengen:</strong>
{% if trip.schengen_compliance.is_compliant %}
<span class="badge bg-success">✅ Compliant</span>
{% else %}
<span class="badge bg-danger">❌ Non-compliant</span>
{% endif %}
<span class="text-muted small">({{ trip.schengen_compliance.total_days_used }}/90 days used)</span>
</div>
{% endif %}
{{ conference_list(trip) }}
{% for day, elements in trip.elements_grouped_by_day() %}

View file

@ -14,6 +14,7 @@
{"endpoint": "weekends", "label": "Weekends" },
{"endpoint": "launch_list", "label": "Space launches" },
{"endpoint": "holiday_list", "label": "Holidays" },
{"endpoint": "schengen_report", "label": "Schengen" },
] + ([{"endpoint": "birthday_list", "label": "Birthdays" }]
if g.user.is_authenticated else [])
%}

View file

@ -0,0 +1,211 @@
{% extends "base.html" %}
{% set heading = "Schengen Area Compliance Report" %}
{% block title %}{{ heading }} - Edward Betts{% endblock %}
{% block content %}
<div class="container-fluid">
<h1>Schengen Area Compliance Report</h1>
<div class="row">
<div class="col-md-8">
<div class="card mb-4">
<div class="card-header">
<h5 class="card-title">Current Status</h5>
</div>
<div class="card-body">
<div class="row">
<div class="col-md-6">
<h6>Compliance Status</h6>
{% if current_compliance.is_compliant %}
<span class="badge bg-success fs-6">✅ COMPLIANT</span>
{% else %}
<span class="badge bg-danger fs-6">❌ NON-COMPLIANT</span>
{% endif %}
</div>
<div class="col-md-6">
<h6>Days Used</h6>
<div class="fs-4">{{ current_compliance.total_days_used }}/90</div>
</div>
</div>
<div class="row mt-3">
<div class="col-md-6">
{% if current_compliance.is_compliant %}
<h6>Days Remaining</h6>
<div class="fs-5 text-success">{{ current_compliance.days_remaining }}</div>
{% else %}
<h6>Days Over Limit</h6>
<div class="fs-5 text-danger">{{ current_compliance.days_over_limit }}</div>
{% endif %}
</div>
<div class="col-md-6">
{% if current_compliance.next_reset_date %}
<h6>Next Reset Date</h6>
<div class="fs-6">{{ current_compliance.next_reset_date.strftime('%Y-%m-%d') }}</div>
{% endif %}
</div>
</div>
<div class="mt-3">
<h6>Current 180-day Period</h6>
<div class="text-muted">
{{ current_compliance.current_180_day_period[0].strftime('%Y-%m-%d') }} to
{{ current_compliance.current_180_day_period[1].strftime('%Y-%m-%d') }}
</div>
</div>
</div>
</div>
{% if current_compliance.stays_in_period %}
<div class="card mb-4">
<div class="card-header">
<h5 class="card-title">Stays in Current 180-day Period</h5>
</div>
<div class="card-body">
<div class="table-responsive">
<table class="table table-striped">
<thead>
<tr>
<th>Entry Date</th>
<th>Exit Date</th>
<th>Country</th>
<th>Days</th>
</tr>
</thead>
<tbody>
{% for stay in current_compliance.stays_in_period %}
<tr>
<td>{{ stay.entry_date.strftime('%Y-%m-%d') }}</td>
<td>
{% if stay.exit_date %}
{{ stay.exit_date.strftime('%Y-%m-%d') }}
{% else %}
<span class="text-muted">ongoing</span>
{% endif %}
</td>
<td>{{ stay.country | country_flag }} {{ stay.country.upper() }}</td>
<td>{{ stay.days }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
{% endif %}
</div>
<div class="col-md-4">
{% if warnings %}
<div class="card mb-4">
<div class="card-header">
<h5 class="card-title">Warnings</h5>
</div>
<div class="card-body">
{% for warning in warnings %}
<div class="alert alert-{{ 'danger' if warning.severity == 'error' else 'warning' }} alert-dismissible fade show" role="alert">
<strong>{{ warning.date.strftime('%Y-%m-%d') }}</strong><br>
{{ warning.message }}
<button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close"></button>
</div>
{% endfor %}
</div>
</div>
{% endif %}
<div class="card">
<div class="card-header">
<h5 class="card-title">Compliance History</h5>
</div>
<div class="card-body">
<div class="chart-container" style="height: 300px;">
<canvas id="complianceChart"></canvas>
</div>
</div>
</div>
</div>
</div>
{% if trips_with_compliance %}
<div class="card mt-4">
<div class="card-header">
<h5 class="card-title">Trip Compliance History</h5>
</div>
<div class="card-body">
<div class="table-responsive">
<table class="table table-striped">
<thead>
<tr>
<th>Trip Date</th>
<th>Days Used</th>
<th>Days Remaining</th>
<th>Status</th>
</tr>
</thead>
<tbody>
{% for trip_date, calculation in trips_with_compliance.items() %}
<tr>
<td>{{ trip_date.strftime('%Y-%m-%d') }}</td>
<td>{{ calculation.total_days_used }}/90</td>
<td>{{ calculation.days_remaining }}</td>
<td>
{% if calculation.is_compliant %}
<span class="badge bg-success">Compliant</span>
{% else %}
<span class="badge bg-danger">Non-compliant</span>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
{% endif %}
</div>
{% endblock %}
{% block scripts %}
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<script>
const ctx = document.getElementById('complianceChart').getContext('2d');
const complianceHistory = {{ compliance_history | tojson }};
const chart = new Chart(ctx, {
type: 'line',
data: {
labels: complianceHistory.map(item => item.date),
datasets: [{
label: 'Days Used',
data: complianceHistory.map(item => item.days_used),
borderColor: 'rgb(75, 192, 192)',
backgroundColor: 'rgba(75, 192, 192, 0.2)',
tension: 0.1
}]
},
options: {
responsive: true,
maintainAspectRatio: false,
scales: {
y: {
beginAtZero: true,
max: 90,
ticks: {
callback: function(value) {
return value + '/90';
}
}
}
},
plugins: {
legend: {
display: true
}
}
}
});
</script>
{% endblock %}

View file

@ -117,6 +117,25 @@
{% if delta %}
<div>How long until trip: {{ delta }}</div>
{% endif %}
{% if trip.schengen_compliance %}
<div class="mt-3">
<strong>Schengen Compliance:</strong>
{% if trip.schengen_compliance.is_compliant %}
<span class="badge bg-success">✅ Compliant</span>
{% else %}
<span class="badge bg-danger">❌ Non-compliant</span>
{% endif %}
<div class="text-muted small">
{{ trip.schengen_compliance.total_days_used }}/90 days used
{% if trip.schengen_compliance.is_compliant %}
({{ trip.schengen_compliance.days_remaining }} remaining)
{% else %}
({{ trip.schengen_compliance.days_over_limit }} over limit)
{% endif %}
</div>
</div>
{% endif %}
</div>
{% for item in trip.conferences %}

View file

@ -25,6 +25,7 @@ import agenda.holidays
import agenda.stats
import agenda.thespacedevs
import agenda.trip
import agenda.trip_schengen
import agenda.utils
from agenda import calendar, format_list_with_ampersand, travel, uk_tz
from agenda.types import StrDict, Trip
@ -36,6 +37,13 @@ app.config.from_object("config.default")
agenda.error_mail.setup_error_mail(app)
@app.template_filter("country_flag")
def country_flag_filter(country_code: str) -> str:
"""Convert country code to emoji flag."""
from agenda.schengen import get_country_flag
return get_country_flag(country_code)
@app.before_request
def handle_auth() -> None:
"""Handle authentication and set global user."""
@ -387,11 +395,17 @@ def get_trip_list(
route_distances: agenda.travel.RouteDistances | None = None,
) -> list[Trip]:
"""Get list of trips respecting current authentication status."""
return [
trips = [
trip
for trip in agenda.trip.build_trip_list(route_distances=route_distances)
if flask.g.user.is_authenticated or not trip.private
]
# Add Schengen compliance information to each trip
for trip in trips:
agenda.trip_schengen.add_schengen_compliance_to_trip(trip)
return trips
@app.route("/trip")
@ -527,6 +541,9 @@ def trip_page(start: str) -> str:
prev_trip, trip, next_trip = get_prev_current_and_next_trip(start, trip_list)
if not trip:
flask.abort(404)
# Add Schengen compliance information
trip = agenda.trip_schengen.add_schengen_compliance_to_trip(trip)
coordinates = agenda.trip.collect_trip_coordinates(trip)
routes = agenda.trip.get_trip_routes(trip, app.config["PERSONAL_DATA"])
@ -603,6 +620,12 @@ def trip_stats() -> str:
)
@app.route("/schengen")
def schengen_report() -> str:
"""Schengen compliance report."""
return agenda.trip_schengen.flask_route_schengen_report()
@app.route("/callback")
def auth_callback() -> tuple[str, int] | werkzeug.Response:
"""Process the authentication callback."""