#!/usr/bin/python3 """Reverse geocode: convert lat/lon to Wikidata item & Wikimedia Commons category.""" import inspect import random import socket import sys import traceback import typing from time import time import sqlalchemy import sqlalchemy.exc import werkzeug.debug.tbtools from flask import Flask, jsonify, redirect, render_template, request, url_for from sqlalchemy.orm.query import Query from werkzeug.wrappers import Response import geocode from geocode import database, model, scotland, wikidata from geocode.error_mail import setup_error_mail city_of_london_qid = "Q23311" app = Flask(__name__) app.config.from_object("config.default") database.init_app(app) setup_error_mail(app) Tags = typing.Mapping[str, str] logging_enabled = True @app.errorhandler(werkzeug.exceptions.InternalServerError) def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]: """Handle exception.""" exec_type, exc_value, current_traceback = sys.exc_info() assert exc_value tb = werkzeug.debug.tbtools.DebugTraceback(exc_value) summary = tb.render_traceback_html(include_title=False) exc_lines = "".join(tb._te.format_exception_only()) last_frame = list(traceback.walk_tb(current_traceback))[-1][0] last_frame_args = inspect.getargs(last_frame.f_code) return ( render_template( "show_error.html", plaintext=tb.render_traceback_text(), exception=exc_lines, exception_type=tb._te.exc_type.__name__, summary=summary, last_frame=last_frame, last_frame_args=last_frame_args, ), 500, ) def get_random_lat_lon() -> tuple[float, float]: """Select random lat/lon within the UK.""" south, east = 50.8520, 0.3536 north, west = 53.7984, -2.7296 mul = 10000 lat = random.randrange(int(south * mul), int(north * mul)) / mul lon = random.randrange(int(west * mul), int(east * mul)) / mul return lat, lon def do_lookup( elements: "Query[model.Polygon]", lat: str | float, lon: str | float ) -> wikidata.WikidataDict: """Do lookup.""" try: hit = osm_lookup(elements, lat, lon) except wikidata.QueryError as e: return { "query": e.query, "error": e.r.text, "query_url": "https://query.wikidata.org/#" + e.query, } return wikidata.build_dict(hit, lat, lon) def lat_lon_to_wikidata(lat: float, lon: float) -> dict[str, typing.Any]: """Lookup lat/lon and find most appropriate Wikidata item.""" scotland_code = scotland.get_scotland_code(lat, lon) elements: typing.Any if scotland_code: rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code) hit = wikidata.commons_from_rows(rows) elements = [] result = wikidata.build_dict(hit, lat, lon) return {"elements": elements, "result": result} elements = model.Polygon.coords_within(lat, lon) result = do_lookup(elements, lat, lon) # special case because the City of London is admin_level=6 in OSM if result.get("wikidata") == city_of_london_qid: return {"elements": elements, "result": result} admin_level = result.get("admin_level") if not admin_level: return {"elements": elements, "result": result} assert isinstance(admin_level, int) if admin_level >= 7: return {"elements": elements, "result": result} row = wikidata.geosearch(lat, lon) if row: hit = wikidata.commons_from_rows([row]) elements = [] result = wikidata.build_dict(hit, lat, lon) return {"elements": elements, "result": result} def get_admin_level(tags: Tags) -> int | None: """Read admin_level from tags.""" admin_level_tag = tags.get("admin_level") return ( int(admin_level_tag) if admin_level_tag and admin_level_tag.isdigit() else None ) def hit_from_wikidata_tag(tags: Tags) -> wikidata.Hit | None: """Check element for a wikidata tag.""" return ( { "wikidata": qid, "commons_cat": commons, } if "wikidata" in tags and (commons := wikidata.qid_to_commons_category(qid := tags["wikidata"])) else None ) def hit_from_ref_gss_tag(tags: Tags) -> wikidata.Hit | None: """Check element for rss:gss tag.""" gss = tags.get("ref:gss") return wikidata.get_commons_cat_from_gss(gss) if gss else None def hit_from_name( tags: Tags, lat: str | float, lon: str | float ) -> wikidata.Hit | None: """Use name to look for hit.""" if not (name := tags.get("name")): return None if name.endswith(" CP"): # civil parish name = name[:-3] rows = wikidata.lookup_wikidata_by_name(name, lat, lon) return wikidata.commons_from_rows(rows) if len(rows) == 1 else None def osm_lookup( elements: "Query[model.Polygon]", lat: str | float, lon: str | float ) -> wikidata.Hit | None: """OSM lookup.""" ret: wikidata.Hit | None for e in elements: assert isinstance(e, model.Polygon) assert e.tags tags: typing.Mapping[str, str] = e.tags admin_level: int | None = get_admin_level(tags) if not admin_level and tags.get("boundary") not in ("political", "place"): continue if not ( (hit := hit_from_wikidata_tag(tags)) or (hit := hit_from_ref_gss_tag(tags)) or (hit := hit_from_name(tags, lat, lon)) ): continue hit["admin_level"] = admin_level hit["element"] = e.osm_id hit["geojson"] = typing.cast(str, e.geojson_str) return hit has_wikidata_tag = [e for e in elements if e.tags.get("wikidata")] if len(has_wikidata_tag) != 1: return None e = has_wikidata_tag[0] assert e.tags qid = e.tags["wikidata"] return { "wikidata": qid, "element": e.osm_id, "geojson": typing.cast(str, e.geojson_str), "commons_cat": wikidata.qid_to_commons_category(qid), "admin_level": admin_level, } def redirect_to_detail(q: str) -> Response: """Redirect to detail page.""" lat, lon = [v.strip() for v in q.split(",", 1)] return redirect(url_for("detail_page", lat=lat, lon=lon)) @app.errorhandler(sqlalchemy.exc.OperationalError) def handle_database_error(error: Exception) -> tuple[str, int]: """Show error screen on database error.""" return render_template("database_error.html"), 500 @app.route("/") def index() -> str | Response: """Index page.""" t0 = time() database.session.execute("SELECT 1") q = request.args.get("q") if q and q.strip(): return redirect_to_detail(q) lat_str, lon_str = request.args.get("lat"), request.args.get("lon") if lat_str is None or lon_str is None: samples = sorted(geocode.samples, key=lambda row: row[2]) return render_template("index.html", samples=samples) lat, lon = float(lat_str), float(lon_str) if lat < -90 or lat > 90 or lon < -180 or lon > 180: return jsonify( coords={"lat": lat, "lon": lon}, error="lat must be between -90 and 90, " + "and lon must be between -180 and 180", ) result = lat_lon_to_wikidata(lat, lon)["result"] result.pop("element", None) result.pop("geojson", None) if logging_enabled: remote_addr = request.headers.get("X-Forwarded-For", request.remote_addr) log = model.LookupLog( lat=lat, lon=lon, remote_addr=remote_addr, fqdn=socket.getfqdn(remote_addr) if remote_addr else None, result=result, response_time_ms=int((time() - t0) * 1000), ) database.session.add(log) database.session.commit() return jsonify(result) @app.route("/random") def random_location() -> str | Response: """Return detail page for random lat/lon.""" lat, lon = get_random_lat_lon() return build_detail_page(lat, lon) @app.route("/wikidata_tag") def wikidata_tag() -> str: """Lookup Wikidata tag for lat/lon.""" lat_str, lon_str = request.args["lat"], request.args["lon"] lat, lon = float(lat_str), float(lon_str) scotland_code = scotland.get_scotland_code(lat, lon) elements: typing.Any if scotland_code: rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code) hit = wikidata.commons_from_rows(rows) elements = [] result = wikidata.build_dict(hit, lat, lon) else: elements = model.Polygon.coords_within(lat, lon) result = do_lookup(elements, lat, lon) return render_template( "wikidata_tag.html", lat=lat, lon=lon, result=result, elements=elements ) def build_detail_page(lat: float, lon: float) -> str: """Run lookup and build detail page.""" if lat < -90 or lat > 90 or lon < -180 or lon > 180: error = ( "latitude must be between -90 and 90, " + "and longitude must be between -180 and 180" ) return render_template("query_error.html", lat=lat, lon=lon, error=error) try: reply = lat_lon_to_wikidata(lat, lon) except wikidata.QueryError as e: query, r = e.args return render_template("query_error.html", lat=lat, lon=lon, query=query, r=r) element = reply["result"].pop("element", None) geojson = reply["result"].pop("geojson", None) return render_template( "detail.html", lat=lat, lon=lon, str=str, element_id=element, geojson=geojson, **reply, ) @app.route("/detail") def detail_page() -> Response | str: """Detail page.""" database.session.execute("SELECT 1") try: lat_str, lon_str = request.args["lat"], request.args["lon"] lat, lon = float(lat_str), float(lon_str) except TypeError: return redirect(url_for("index")) return build_detail_page(lat, lon) @app.route("/reports") def reports() -> str: """Reports page with various statistics.""" log_count = model.LookupLog.query.count() log_start_time = database.session.query( sqlalchemy.func.min(model.LookupLog.dt) ).scalar() return render_template( "reports.html", log_count=log_count, log_start_time=log_start_time ) if __name__ == "__main__": app.run(host="0.0.0.0")