Add a needs_commons parameter (default true) to both the API endpoint and the detail page. When needs_commons=false, look up Wikidata items by OSM relation ID (P402) via WDQS to return the most specific matching item even if it has no Wikimedia Commons category. Only activate this path when the matched item has no Commons category, so that locations with a Commons cat always get the same result regardless of the parameter. Remove the nearest-polygon fallback that was returning incorrect results for inland points in broad admin areas (e.g. returning Falmer for a point in Brighton). That fallback found the nearest polygon by boundary distance without requiring containment, so the pin would appear outside the polygon. The geosearch handles these cases correctly. Redesign the detail page: place name as heading, result card, collapsible API response and SPARQL query, improved OSM element cards with left-border highlight on the matched element, and a toggle button between modes. Redesign the index page: two-column layout with numbered steps and API documentation including the needs_commons parameter, Bootstrap form, and examples as a table. Closes #28 (Add support for returning Wikidata item instead of commons category) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
566 lines
18 KiB
Python
Executable file
566 lines
18 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""Reverse geocode: convert lat/lon to Wikidata item & Wikimedia Commons category."""
|
|
|
|
import inspect
|
|
import random
|
|
import re
|
|
import socket
|
|
import sys
|
|
import traceback
|
|
import typing
|
|
from time import time
|
|
|
|
import sqlalchemy.exc
|
|
import werkzeug.debug.tbtools
|
|
from flask import Flask, jsonify, redirect, render_template, request, url_for
|
|
from pygments import highlight
|
|
from pygments.formatters import HtmlFormatter
|
|
from pygments.lexers import SparqlLexer
|
|
from sqlalchemy import func, text
|
|
from sqlalchemy.orm.query import Query
|
|
from werkzeug.wrappers import Response
|
|
|
|
import geocode
|
|
from geocode import database, model, scotland, wikidata
|
|
from geocode.error_mail import setup_error_mail
|
|
|
|
# Wikidata QID that lat_lon_to_wikidata() special-cases (City of London).
city_of_london_qid = "Q23311"
# Flask application; configuration is loaded from the "config.default" object.
app = Flask(__name__)
app.config.from_object("config.default")
database.init_app(app)
setup_error_mail(app)

# Type aliases used in annotations throughout this module.
Tags = typing.Mapping[str, str]
StrDict = dict[str, typing.Any]
# When true, index() stores a model.LookupLog row for every API lookup.
logging_enabled = True

# Commons category names, keyed by Wikidata QID, that add_missing_commons_cat()
# fills in for query rows that carry no Commons category of their own.
fallback_qid_to_commons_cat = {"Q68816332": "Orphir", "Q68815208": "Crail"}
|
|
|
|
|
|
@app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
    """Render the "show_error.html" page for an internal server error.

    The exception currently being handled is read from sys.exc_info() and
    passed to the template as HTML and plain-text tracebacks, together with
    the innermost frame and its argument names. The response status is 500.
    """
    _, exc_value, current_traceback = sys.exc_info()
    assert exc_value

    tb = werkzeug.debug.tbtools.DebugTraceback(exc_value)
    summary = tb.render_traceback_html(include_title=False)
    # NOTE(review): _te is a private werkzeug attribute; it may change
    # between werkzeug releases.
    exc_lines = "".join(tb._te.format_exception_only())

    # walk_tb yields (frame, lineno) pairs; the last one is the innermost.
    frames = [frame for frame, _ in traceback.walk_tb(current_traceback)]
    last_frame = frames[-1]
    last_frame_args = inspect.getargs(last_frame.f_code)

    page = render_template(
        "show_error.html",
        plaintext=tb.render_traceback_text(),
        exception=exc_lines,
        exception_type=tb._te.exc_type.__name__,
        summary=summary,
        last_frame=last_frame,
        last_frame_args=last_frame_args,
    )
    return page, 500
|
|
|
|
|
|
def get_random_lat_lon() -> tuple[float, float]:
    """Pick a random point inside a fixed bounding box within the UK.

    Values are drawn on a grid of 1/10000 of a degree. Latitude is drawn
    first, then longitude.
    """
    scale = 10000
    lat_bounds = (50.8520, 53.7984)  # south, north
    lon_bounds = (-2.7296, 0.3536)  # west, east

    def pick(low: float, high: float) -> float:
        """Draw one grid-aligned value from the half-open range [low, high)."""
        return random.randrange(int(low * scale), int(high * scale)) / scale

    # Tuple elements are evaluated left to right, so latitude is drawn first.
    return pick(*lat_bounds), pick(*lon_bounds)
|
|
|
|
|
|
def do_lookup(
    elements: "Query[model.Polygon]", lat: float, lon: float
) -> wikidata.WikidataDict:
    """Run the OSM-based lookup and return a result dict.

    A wikidata.QueryError raised during the lookup is not propagated. It is
    reported in the returned dict as the failed query, the response text and
    a link that opens the query in the Wikidata query service.
    """
    try:
        hit = osm_lookup(elements, lat, lon)
    except wikidata.QueryError as err:
        failed_query = err.query
        return {
            "query": failed_query,
            "error": err.r.text,
            "query_url": "https://query.wikidata.org/#" + failed_query,
        }
    else:
        return wikidata.build_dict(hit, lat, lon)
|
|
|
|
|
|
def add_missing_commons_cat(rows: list[StrDict]) -> None:
    """Fill in a Commons category on query rows that lack one.

    Rows are modified in place. A row is left alone when it already has a
    "commonsSiteLink" or "commonsCat" key, or when its item QID has no entry
    in fallback_qid_to_commons_cat.
    """
    for row in rows:
        already_has_commons = "commonsSiteLink" in row or "commonsCat" in row
        if already_has_commons:
            continue

        # The item value is an entity URI; the QID is its last path segment.
        qid = row["item"]["value"].rsplit("/", 1)[-1]
        fallback_cat = fallback_qid_to_commons_cat.get(qid)
        if fallback_cat is None:
            continue

        row["commonsCat"] = {"type": "literal", "value": fallback_cat}
|
|
|
|
|
|
def _lookup_by_osm_relation(
    elements: typing.Any, lat: float, lon: float
) -> typing.Optional[wikidata.WikidataDict]:
    """Find a containing OSM relation whose Wikidata item has no Commons cat.

    Wikidata items are matched to relations through P402 (OSM relation ID).
    Returns the result dict for the first suitable relation in the order
    given by *elements*, or None when no relation qualifies.
    """
    # Only elements with a negative osm_id are treated as relations here;
    # abs(osm_id) is used as the relation ID. Materialise them once so the
    # underlying query is not executed a second time.
    relations = [e for e in elements if e.osm_id < 0]
    if not relations:
        return None

    lookup_rows = wikidata.lookup_wikidata_by_osm_relation_ids(
        [abs(e.osm_id) for e in relations]
    )

    # Keep only the first row returned for each relation.
    rel_to_hit: dict[int, wikidata.Hit] = {}
    for row in lookup_rows:
        rel_id = int(row["osmRelation"]["value"])
        if rel_id not in rel_to_hit:
            rel_to_hit[rel_id] = wikidata.hit_from_row(row)

    # Iterate in specificity order (smallest area first, from coords_within).
    # Only use hits without a Commons category: if there is a Commons cat,
    # the geosearch path will find it (or something more specific).
    for e in relations:
        hit = rel_to_hit.get(abs(e.osm_id))
        if hit is None or hit.get("commons_cat"):
            continue
        if e.tags:
            hit["admin_level"] = get_admin_level(e.tags)
        hit["element"] = e.osm_id
        hit["geojson"] = typing.cast(str, e.geojson_str)
        return wikidata.build_dict(hit, lat, lon)

    return None


def lat_lon_to_wikidata(
    lat: float, lon: float, needs_commons: bool = True
) -> dict[str, typing.Any]:
    """Look up lat/lon and find the most appropriate Wikidata item.

    The lookup tries, in order:

    1. the Scottish parish containing the point;
    2. the OSM polygons containing the point (returned directly for the
       City of London, when no admin level is known, or when the admin
       level is 7 or higher);
    3. the nearest Scottish parish, when step 2 only matched Q22;
    4. when *needs_commons* is false, a direct P402 lookup for a containing
       relation whose Wikidata item has no Commons category;
    5. a Wikidata geosearch around the point.

    Returns a dict with "elements" and "result" keys; the geosearch path
    also includes the SPARQL "query" that was run.

    Raises:
        wikidata.QueryError: presumably raised by the direct Wikidata
            queries made here (callers in this file catch it) - confirm
            against the wikidata module.
    """
    scotland_code = scotland.get_scotland_code(lat, lon)

    elements: typing.Any
    if scotland_code:
        rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code)
        add_missing_commons_cat(rows)
        hit = wikidata.commons_from_rows(rows)
        elements = []
        result = wikidata.build_dict(hit, lat, lon)

        if not result.get("missing"):
            return {"elements": elements, "result": result}

    elements = model.Polygon.coords_within(lat, lon)
    result = do_lookup(elements, lat, lon)

    # special case because the City of London is admin_level=6 in OSM
    if result.get("wikidata") == city_of_london_qid:
        return {"elements": elements, "result": result}

    admin_level = result.get("admin_level")
    if not admin_level:
        return {"elements": elements, "result": result}

    assert isinstance(admin_level, int)
    if admin_level >= 7:
        return {"elements": elements, "result": result}

    # Point is in Scotland but not in a specific parish - try nearest parish
    if result.get("wikidata") == "Q22":
        nearby_code = scotland.get_nearest_scotland_code(lat, lon)
        if nearby_code:
            rows = wikidata.lookup_scottish_parish_in_wikidata(nearby_code)
            add_missing_commons_cat(rows)
            hit = wikidata.commons_from_rows(rows)
            if hit:
                nearby_result = wikidata.build_dict(hit, lat, lon)
                if not nearby_result.get("missing"):
                    return {"elements": elements, "result": nearby_result}

    if not needs_commons:
        # Direct lookup: find Wikidata items whose P402 (OSM relation ID)
        # matches one of the OSM polygons that contain this point.
        relation_result = _lookup_by_osm_relation(elements, lat, lon)
        if relation_result is not None:
            return {"elements": elements, "result": relation_result}

    query = wikidata.geosearch_query(lat, lon)
    geo_rows = wikidata.wdqs(query)
    row = wikidata.filter_geosearch_row(geo_rows)
    if row:
        # A geosearch match replaces the OSM result, and the returned
        # element list is emptied along with it.
        hit = wikidata.commons_from_rows([row])
        elements = []
        result = wikidata.build_dict(hit, lat, lon)

    return {"elements": elements, "result": result, "query": query}
|
|
|
|
|
|
def get_admin_level(tags: Tags) -> int | None:
    """Return the "admin_level" tag as an int.

    Returns None when the tag is absent, empty, or not made up of digits.
    """
    raw_level = tags.get("admin_level")
    if not raw_level or not raw_level.isdigit():
        return None
    return int(raw_level)
|
|
|
|
|
|
def hit_from_wikidata_tag(tags: Tags) -> wikidata.Hit | None:
    """Build a hit from the element's "wikidata" tag.

    Returns None when the tag is absent or when no Commons category is
    found for the tagged item.
    """
    if "wikidata" not in tags:
        return None

    qid = tags["wikidata"]
    commons = wikidata.qid_to_commons_category(qid)
    if not commons:
        return None

    return {"wikidata": qid, "commons_cat": commons}
|
|
|
|
|
|
def hit_from_ref_gss_tag(tags: Tags) -> wikidata.Hit | None:
    """Build a hit from the element's "ref:gss" tag.

    Returns None when the tag is absent or empty; otherwise returns whatever
    wikidata.get_commons_cat_from_gss() gives for the code.
    """
    gss = tags.get("ref:gss")
    if not gss:
        return None
    return wikidata.get_commons_cat_from_gss(gss)
|
|
|
|
|
|
def hit_from_name(tags: Tags, lat: float, lon: float) -> wikidata.Hit | None:
    """Build a hit by looking the element's name up in Wikidata.

    A trailing " CP" (civil parish) is dropped from the name first. A hit is
    returned only when the name lookup yields exactly one row.
    """
    name = tags.get("name")
    if not name:
        return None

    name = name.removesuffix(" CP")  # civil parish
    rows = wikidata.lookup_wikidata_by_name(name, lat, lon)
    if len(rows) != 1:
        return None
    return wikidata.commons_from_rows(rows)
|
|
|
|
|
|
def osm_lookup(
    elements: "Query[model.Polygon]", lat: float, lon: float
) -> wikidata.Hit | None:
    """Find a Wikidata hit among the OSM polygons containing a point.

    Elements are tried in the order given. An element is considered only if
    it has an admin_level or is a "political"/"place" boundary. The first one
    that yields a hit via its wikidata tag, its ref:gss tag or its name is
    returned, annotated with its admin level, OSM id and GeoJSON.

    If nothing matches and exactly one element carries a wikidata tag, that
    element is returned as the hit, with whatever Commons category lookup
    gives for it. Otherwise None is returned.
    """
    for e in elements:
        assert isinstance(e, model.Polygon)
        assert e.tags
        tags: typing.Mapping[str, str] = e.tags
        admin_level: int | None = get_admin_level(tags)
        if not admin_level and tags.get("boundary") not in ("political", "place"):
            continue

        # Try progressively weaker sources; the first truthy hit wins.
        hit = (
            hit_from_wikidata_tag(tags)
            or hit_from_ref_gss_tag(tags)
            or hit_from_name(tags, lat, lon)
        )
        if not hit:
            continue

        hit["admin_level"] = admin_level
        hit["element"] = e.osm_id
        hit["geojson"] = typing.cast(str, e.geojson_str)
        return hit

    tagged = [e for e in elements if e.tags.get("wikidata")]
    if len(tagged) != 1:
        return None

    only = tagged[0]
    assert only.tags
    qid = only.tags["wikidata"]
    return {
        "wikidata": qid,
        "element": only.osm_id,
        "geojson": typing.cast(str, only.geojson_str),
        "commons_cat": wikidata.qid_to_commons_category(qid),
        # Read the admin level from the selected element itself. Reusing the
        # loop variable would report the level of whichever element happened
        # to be iterated last.
        "admin_level": get_admin_level(only.tags),
    }
|
|
|
|
|
|
def redirect_to_detail(q: str) -> Response:
    """Redirect a free-text coordinate query to the detail page.

    The query is split into latitude and longitude at the first comma. When
    there is no comma, whitespace-separated "lat lon" is accepted as well. A
    query that cannot be split into two parts redirects back to the index
    page instead of failing with a server error.
    """
    parts = [v.strip() for v in q.split(",", 1)]
    if len(parts) != 2:
        parts = [v.strip() for v in q.split(maxsplit=1)]
    if len(parts) != 2:
        return redirect(url_for("index"))

    lat, lon = parts
    return redirect(url_for("detail_page", lat=lat, lon=lon))
|
|
|
|
|
|
@app.errorhandler(sqlalchemy.exc.OperationalError)
def handle_database_error(error: Exception) -> tuple[str, int]:
    """Render the database error page with a 500 status.

    Registered for sqlalchemy.exc.OperationalError; the exception itself is
    not passed to the template.
    """
    page = render_template("database_error.html")
    return page, 500
|
|
|
|
|
|
# Degrees/minutes/seconds with an optional hemisphere letter, for example
# 56°5'58.56"N. Whitespace between the parts and the Unicode prime marks
# (′ and ″) are accepted.
_DMS_RE = re.compile(
    r"""
    (?P<degrees>\d+)\s*°\s*
    (?P<minutes>\d+)\s*['′]\s*
    (?P<seconds>\d+(?:\.\d*)?|\.\d+)\s*
    (?:"|″|'')?\s*
    (?P<direction>[NSEWnsew])?
    """,
    re.VERBOSE,
)


def parse_coordinate(coord_str: str) -> float:
    """Parse a coordinate string into decimal degrees.

    Two formats are accepted, after surrounding whitespace is stripped:

    * anything float() accepts, e.g. "56.0996" or "-3.376";
    * degrees/minutes/seconds with an optional hemisphere letter, e.g.
      56°5'58.56"N. A hemisphere of S or W makes the result negative.

    The DMS form must match the whole string. Text after the seconds used to
    be ignored, which dropped the hemisphere letter whenever something stood
    between it and the seconds and so returned S/W values with the wrong
    sign.

    Raises:
        ValueError: if the string matches neither format.
    """
    coord_str = coord_str.strip()

    # Try decimal degrees first.
    try:
        return float(coord_str)
    except ValueError:
        pass

    match = _DMS_RE.fullmatch(coord_str)
    if match is None:
        raise ValueError(f"Could not parse coordinate: {coord_str}")

    degrees = int(match.group("degrees"))
    minutes = int(match.group("minutes"))
    seconds = float(match.group("seconds"))
    decimal = degrees + minutes / 60 + seconds / 3600

    direction = match.group("direction")
    if direction and direction.upper() in ("S", "W"):
        decimal = -decimal

    return decimal
|
|
|
|
|
|
def validate_coordinates(lat: float, lon: float) -> str | None:
|
|
"""Validate latitude and longitude ranges. Returns error message if invalid."""
|
|
if lat < -90 or lat > 90:
|
|
return "Latitude must be between -90 and 90 degrees"
|
|
if lon < -180 or lon > 180:
|
|
return "Longitude must be between -180 and 180 degrees"
|
|
return None
|
|
|
|
|
|
@app.route("/")
def index() -> str | Response:
    """Serve the index page and the JSON lookup API.

    * With a non-blank "q" parameter, redirect to the detail page.
    * Without both "lat" and "lon", render the index page with samples.
    * Otherwise parse and validate the coordinates, run the lookup and
      return the result as JSON. "needs_commons=false" (case-insensitive)
      turns off the Commons-category requirement.

    Coordinates that cannot be parsed or are out of range produce a JSON body
    with an "error" key. When logging_enabled is true, each completed lookup
    is stored as a model.LookupLog row.
    """
    t0 = time()
    # NOTE(review): presumably a connectivity probe so that a database outage
    # reaches the OperationalError handler before any other work - confirm.
    database.session.execute(text("SELECT 1"))

    q = request.args.get("q")
    if q and q.strip():
        return redirect_to_detail(q)

    lat_str, lon_str = request.args.get("lat"), request.args.get("lon")

    if lat_str is None or lon_str is None:
        samples = sorted(geocode.samples, key=lambda row: row[2])
        return render_template("index.html", samples=samples)

    # Parse only through parse_coordinate(). A bare float() call here would
    # raise on DMS input before the handler below could report it.
    try:
        lat = parse_coordinate(lat_str)
        lon = parse_coordinate(lon_str)
    except ValueError:
        return jsonify(
            coords={"lat": lat_str, "lon": lon_str},
            error="Invalid coordinate format. "
            + "Please use decimal degrees (e.g., 56.099600) "
            + "or DMS format (e.g., 56°5'58.56\"N)",
        )

    if error_msg := validate_coordinates(lat, lon):
        return jsonify(coords={"lat": lat, "lon": lon}, error=error_msg)

    needs_commons = request.args.get("needs_commons", "true").lower() != "false"
    result = lat_lon_to_wikidata(lat, lon, needs_commons=needs_commons)["result"]
    # These two keys are removed from the API response (and from the log).
    result.pop("element", None)
    result.pop("geojson", None)

    if logging_enabled:
        remote_addr = request.headers.get("X-Forwarded-For", request.remote_addr)
        log = model.LookupLog(
            lat=lat,
            lon=lon,
            remote_addr=remote_addr,
            fqdn=socket.getfqdn(remote_addr) if remote_addr else None,
            result=result,
            response_time_ms=int((time() - t0) * 1000),
        )
        database.session.add(log)
        database.session.commit()

    return jsonify(result)
|
|
|
|
|
|
@app.route("/random")
def random_location() -> str | Response:
    """Show the detail page for a randomly chosen lat/lon."""
    random_lat, random_lon = get_random_lat_lon()
    return build_detail_page(random_lat, random_lon)
|
|
|
|
|
|
@app.route("/wikidata_tag")
def wikidata_tag() -> str:
    """Render the Wikidata tag lookup page for the requested lat/lon.

    Points that fall in a Scottish parish are resolved through the parish
    lookup with an empty element list. All other points go through the OSM
    polygon lookup.
    """
    args = request.args
    lat_str, lon_str = args["lat"], args["lon"]
    lat, lon = float(lat_str), float(lon_str)

    elements: typing.Any
    scotland_code = scotland.get_scotland_code(lat, lon)
    if not scotland_code:
        elements = model.Polygon.coords_within(lat, lon)
        result = do_lookup(elements, lat, lon)
    else:
        parish_rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code)
        hit = wikidata.commons_from_rows(parish_rows)
        elements = []
        result = wikidata.build_dict(hit, lat, lon)

    return render_template(
        "wikidata_tag.html", lat=lat, lon=lon, result=result, elements=elements
    )
|
|
|
|
|
|
def highlight_sparql(query: str) -> str:
    """Return *query* as syntax-highlighted HTML, using Pygments' SPARQL lexer."""
    return highlight(query, SparqlLexer(), HtmlFormatter())
|
|
|
|
|
|
# Make highlight_sparql available in templates as the "highlight_sparql" filter.
app.jinja_env.filters["highlight_sparql"] = highlight_sparql
|
|
|
|
|
|
def build_detail_page(lat: float, lon: float, needs_commons: bool = True) -> str:
    """Run the lookup for a point and render the detail page.

    Out-of-range coordinates and failed Wikidata queries render
    "query_error.html" instead. The matched OSM element id and its GeoJSON
    are removed from the result and passed to the template separately.
    """
    # Written as "not (in range)" so that NaN, for which every comparison is
    # false, is rejected as well.
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        error = (
            "latitude must be between -90 and 90, "
            + "and longitude must be between -180 and 180"
        )
        return render_template("query_error.html", lat=lat, lon=lon, error=error)

    try:
        reply = lat_lon_to_wikidata(lat, lon, needs_commons=needs_commons)
    except wikidata.QueryError as e:
        query, r = e.args
        return render_template("query_error.html", lat=lat, lon=lon, query=query, r=r)

    element = reply["result"].pop("element", None)
    geojson = reply["result"].pop("geojson", None)

    # Pygments stylesheet for the highlighted SPARQL query on the page.
    css = HtmlFormatter().get_style_defs(".highlight")

    return render_template(
        "detail.html",
        lat=lat,
        lon=lon,
        str=str,
        element_id=element,
        geojson=geojson,
        css=css,
        needs_commons=needs_commons,
        **reply,
    )
|
|
|
|
|
|
@app.route("/detail")
def detail_page() -> Response | str:
    """Show the detail page for the "lat" and "lon" request parameters.

    Redirects to the index page when either parameter is missing or empty,
    and renders "coordinate_error.html" when one cannot be parsed.
    "needs_commons=false" (case-insensitive) turns off the Commons-category
    requirement.
    """
    database.session.execute(text("SELECT 1"))

    args = request.args
    lat_str = args.get("lat")
    lon_str = args.get("lon")
    if not (lat_str and lon_str):
        return redirect(url_for("index"))

    try:
        lat = parse_coordinate(lat_str)
        lon = parse_coordinate(lon_str)
    except ValueError as exc:
        return render_template(
            "coordinate_error.html",
            lat_str=lat_str,
            lon_str=lon_str,
            error=f"Invalid coordinate format: {exc}",
        )

    commons_optional = args.get("needs_commons", "true").lower() == "false"
    return build_detail_page(lat, lon, needs_commons=not commons_optional)
|
|
|
|
|
|
@app.route("/reports")
def reports() -> str:
    """Render the reports page with statistics drawn from the lookup log.

    The page receives the total log count, the earliest log time, the mean
    response time, lookups per day (newest first), the 50 most frequently
    returned Commons category titles, and the 50 most recent lookups whose
    result contains a "missing" key.
    """
    log_model = model.LookupLog
    session = database.session

    log_count = log_model.query.count()

    totals = session.query(
        func.min(log_model.dt), func.avg(log_model.response_time_ms)
    )
    log_start_time, average_response_time = totals.one()

    # Lookups per calendar day, newest day first.
    day_column = func.date(log_model.dt).label("log_date")
    count_column = func.count(log_model.id).label("count")
    by_day = (
        session.query(day_column, count_column)
        .group_by("log_date")
        .order_by(func.date(log_model.dt).desc())
    )

    # Most frequently returned Commons category titles.
    place_column = log_model.result["commons_cat"]["title"].label("place")
    top_places = (
        session.query(place_column, func.count().label("num"))
        .group_by("place")
        .order_by(func.count().desc())
        .limit(50)
    )

    # Most recent lookups whose result contains a "missing" key.
    missing_places = (
        session.query(log_model)
        .filter(log_model.result.has_key("missing"))  # type: ignore
        .order_by(log_model.dt.desc())
        .limit(50)
    )

    return render_template(
        "reports.html",
        log_count=log_count,
        log_start_time=log_start_time,
        average_response_time=average_response_time,
        by_day=by_day,
        top_places=top_places,
        missing_places=missing_places,
    )
|
|
|
|
|
|
@app.route("/pin/<lat>/<lon>")
def pin_detail(lat: str, lon: str) -> Response:
    """Return the details for a map pin as JSON.

    The lookup runs with the default needs_commons setting. The rendered
    "pin_detail.html" fragment is returned under the "html" key.
    """
    reply = lat_lon_to_wikidata(float(lat), float(lon))

    lookup_result = reply["result"]
    element = lookup_result.pop("element", None)
    geojson = lookup_result.pop("geojson", None)

    # The template receives lat and lon as the original path strings.
    fragment = render_template(
        "pin_detail.html",
        lat=lat,
        lon=lon,
        str=str,
        element_id=element,
        geojson=geojson,
        css=HtmlFormatter().get_style_defs(".highlight"),
        **reply,
    )
    return jsonify(html=fragment)
|
|
|
|
|
|
@app.route("/map")
def map_page() -> str:
    """Render the map page, passing it the Pygments highlight stylesheet."""
    highlight_css = HtmlFormatter().get_style_defs(".highlight")
    return render_template("map.html", css=highlight_css)
|
|
|
|
|
|
# Run Flask's built-in server when this file is executed as a script.
if __name__ == "__main__":
    # host="0.0.0.0" makes the server listen on all network interfaces.
    app.run(host="0.0.0.0")
|