#!/usr/bin/python3
"""Reverse geocode: convert lat/lon to Wikidata item & Wikimedia Commons category."""
import inspect
import random
import socket
import sys
import traceback
import typing
from time import time

import sqlalchemy.exc
import werkzeug.debug.tbtools
import werkzeug.exceptions
from flask import Flask, jsonify, redirect, render_template, request, url_for
from sqlalchemy import func
from sqlalchemy.orm.query import Query
from werkzeug.wrappers import Response

import geocode
from geocode import database, model, scotland, wikidata
from geocode.error_mail import setup_error_mail
2021-04-15 12:28:02 +01:00
2021-04-16 16:21:38 +01:00
city_of_london_qid = "Q23311"
2019-08-18 15:56:53 +01:00
app = Flask(__name__)
2021-04-16 21:24:59 +01:00
app.config.from_object("config.default")
2021-04-17 17:02:53 +01:00
database.init_app(app)
2023-11-25 11:41:53 +00:00
setup_error_mail(app)
2019-08-18 15:56:53 +01:00
Tags = typing.Mapping[str, str]
StrDict = dict[str, typing.Any]
2023-11-24 15:35:54 +00:00
logging_enabled = True
2024-01-09 11:37:02 +00:00
fallback_qid_to_commons_cat = {"Q68816332": "Orphir", "Q68815208": "Crail"}
2019-08-18 15:56:53 +01:00
2023-11-25 14:27:47 +00:00
@app.errorhandler(werkzeug.exceptions.InternalServerError)
def exception_handler(e: werkzeug.exceptions.InternalServerError) -> tuple[str, int]:
"""Handle exception."""
exec_type, exc_value, current_traceback = sys.exc_info()
assert exc_value
tb = werkzeug.debug.tbtools.DebugTraceback(exc_value)
summary = tb.render_traceback_html(include_title=False)
exc_lines = "".join(tb._te.format_exception_only())
last_frame = list(traceback.walk_tb(current_traceback))[-1][0]
last_frame_args = inspect.getargs(last_frame.f_code)
return (
render_template(
"show_error.html",
plaintext=tb.render_traceback_text(),
exception=exc_lines,
exception_type=tb._te.exc_type.__name__,
summary=summary,
last_frame=last_frame,
last_frame_args=last_frame_args,
),
500,
)
2023-10-10 10:11:23 +01:00
def get_random_lat_lon() -> tuple[float, float]:
"""Select random lat/lon within the UK."""
2021-04-16 16:14:12 +01:00
south, east = 50.8520, 0.3536
north, west = 53.7984, -2.7296
mul = 10000
2019-08-18 15:56:53 +01:00
lat = random.randrange(int(south * mul), int(north * mul)) / mul
lon = random.randrange(int(west * mul), int(east * mul)) / mul
2021-04-16 16:14:12 +01:00
return lat, lon
2023-10-10 10:11:23 +01:00
def do_lookup(
2023-12-12 09:43:57 +00:00
elements: "Query[model.Polygon]", lat: float, lon: float
2023-10-10 10:11:23 +01:00
) -> wikidata.WikidataDict:
"""Do lookup."""
2021-04-15 12:28:02 +01:00
try:
hit = osm_lookup(elements, lat, lon)
2021-04-17 17:02:53 +01:00
except wikidata.QueryError as e:
2021-04-15 12:28:02 +01:00
return {
2021-04-16 16:21:38 +01:00
"query": e.query,
"error": e.r.text,
"query_url": "https://query.wikidata.org/#" + e.query,
2021-04-15 12:28:02 +01:00
}
2021-04-17 18:29:09 +01:00
return wikidata.build_dict(hit, lat, lon)
2021-04-15 12:28:02 +01:00
2021-04-16 16:14:32 +01:00
def add_missing_commons_cat(rows: list[StrDict]) -> None:
"""Add missing details for Commons Categories to Wikidata query results."""
for row in rows:
if "commonsSiteLink" in row or "commonsCat" in row:
continue
qid = row["item"]["value"].rpartition("/")[2]
if qid not in fallback_qid_to_commons_cat:
continue
commons_cat = fallback_qid_to_commons_cat[qid]
row["commonsCat"] = {"type": "literal", "value": commons_cat}
def lat_lon_to_wikidata(lat: float, lon: float) -> dict[str, typing.Any]:
"""Lookup lat/lon and find most appropriate Wikidata item."""
2021-04-17 17:02:53 +01:00
scotland_code = scotland.get_scotland_code(lat, lon)
2021-04-15 12:28:02 +01:00
2023-10-10 10:11:23 +01:00
elements: typing.Any
2021-04-15 12:28:02 +01:00
if scotland_code:
2021-04-17 18:29:09 +01:00
rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code)
add_missing_commons_cat(rows)
2021-04-17 18:29:09 +01:00
hit = wikidata.commons_from_rows(rows)
2021-04-15 12:28:02 +01:00
elements = []
2021-04-17 18:29:09 +01:00
result = wikidata.build_dict(hit, lat, lon)
2021-04-15 12:28:02 +01:00
if not result.get("missing"):
return {"elements": elements, "result": result}
2021-04-15 12:28:02 +01:00
2021-04-17 17:31:58 +01:00
elements = model.Polygon.coords_within(lat, lon)
2021-04-15 12:28:02 +01:00
result = do_lookup(elements, lat, lon)
# special case because the City of London is admin_level=6 in OSM
2022-05-18 14:12:34 +01:00
if result.get("wikidata") == city_of_london_qid:
2021-04-16 16:22:39 +01:00
return {"elements": elements, "result": result}
2021-04-15 12:28:02 +01:00
2022-05-18 14:12:34 +01:00
admin_level = result.get("admin_level")
if not admin_level:
return {"elements": elements, "result": result}
2021-04-15 12:28:02 +01:00
assert isinstance(admin_level, int)
if admin_level >= 7:
2021-04-16 16:22:39 +01:00
return {"elements": elements, "result": result}
2021-04-15 12:28:02 +01:00
2021-04-17 18:29:09 +01:00
row = wikidata.geosearch(lat, lon)
2021-04-15 12:28:02 +01:00
if row:
2021-04-17 18:29:09 +01:00
hit = wikidata.commons_from_rows([row])
2021-04-15 12:28:02 +01:00
elements = []
2021-04-17 18:29:09 +01:00
result = wikidata.build_dict(hit, lat, lon)
2021-04-15 12:28:02 +01:00
2021-04-16 16:22:39 +01:00
return {"elements": elements, "result": result}
2019-08-18 15:56:53 +01:00
2021-04-16 16:14:32 +01:00
def get_admin_level(tags: Tags) -> int | None:
"""Read admin_level from tags."""
admin_level_tag = tags.get("admin_level")
return (
int(admin_level_tag) if admin_level_tag and admin_level_tag.isdigit() else None
)
def hit_from_wikidata_tag(tags: Tags) -> wikidata.Hit | None:
"""Check element for a wikidata tag."""
return (
{
"wikidata": qid,
"commons_cat": commons,
}
if "wikidata" in tags
and (commons := wikidata.qid_to_commons_category(qid := tags["wikidata"]))
else None
)
def hit_from_ref_gss_tag(tags: Tags) -> wikidata.Hit | None:
"""Check element for rss:gss tag."""
gss = tags.get("ref:gss")
return wikidata.get_commons_cat_from_gss(gss) if gss else None
2023-12-12 09:43:57 +00:00
def hit_from_name(tags: Tags, lat: float, lon: float) -> wikidata.Hit | None:
"""Use name to look for hit."""
if not (name := tags.get("name")):
return None
if name.endswith(" CP"): # civil parish
name = name[:-3]
rows = wikidata.lookup_wikidata_by_name(name, lat, lon)
return wikidata.commons_from_rows(rows) if len(rows) == 1 else None
2023-10-10 10:11:23 +01:00
def osm_lookup(
2023-12-12 09:43:57 +00:00
elements: "Query[model.Polygon]", lat: float, lon: float
2023-10-10 10:11:23 +01:00
) -> wikidata.Hit | None:
"""OSM lookup."""
2021-04-15 12:28:02 +01:00
for e in elements:
2023-10-10 10:11:23 +01:00
assert isinstance(e, model.Polygon)
assert e.tags
tags: typing.Mapping[str, str] = e.tags
admin_level: int | None = get_admin_level(tags)
if not admin_level and tags.get("boundary") not in ("political", "place"):
2021-04-15 12:28:02 +01:00
continue
if not (
(hit := hit_from_wikidata_tag(tags))
or (hit := hit_from_ref_gss_tag(tags))
or (hit := hit_from_name(tags, lat, lon))
):
2019-08-18 15:56:53 +01:00
continue
hit["admin_level"] = admin_level
hit["element"] = e.osm_id
hit["geojson"] = typing.cast(str, e.geojson_str)
return hit
has_wikidata_tag = [e for e in elements if e.tags.get("wikidata")]
2021-04-15 12:28:02 +01:00
if len(has_wikidata_tag) != 1:
2023-10-10 10:11:23 +01:00
return None
2021-04-15 12:28:02 +01:00
e = has_wikidata_tag[0]
assert e.tags
qid = e.tags["wikidata"]
return {
"wikidata": qid,
"element": e.osm_id,
"geojson": typing.cast(str, e.geojson_str),
"commons_cat": wikidata.qid_to_commons_category(qid),
"admin_level": admin_level,
}
2023-10-10 10:11:23 +01:00
def redirect_to_detail(q: str) -> Response:
"""Redirect to detail page."""
lat, lon = [v.strip() for v in q.split(",", 1)]
return redirect(url_for("detail_page", lat=lat, lon=lon))
2019-08-18 15:56:53 +01:00
@app.errorhandler(sqlalchemy.exc.OperationalError)
def handle_database_error(error: Exception) -> tuple[str, int]:
"""Show error screen on database error."""
return render_template("database_error.html"), 500
2021-04-16 21:30:25 +01:00
@app.route("/")
2023-10-10 10:11:23 +01:00
def index() -> str | Response:
"""Index page."""
2023-12-10 10:41:36 +00:00
t0 = time()
database.session.execute("SELECT 1")
2021-04-16 21:30:25 +01:00
q = request.args.get("q")
if q and q.strip():
2023-10-10 10:11:23 +01:00
return redirect_to_detail(q)
2021-04-16 21:30:25 +01:00
lat_str, lon_str = request.args.get("lat"), request.args.get("lon")
2021-04-16 21:30:25 +01:00
if lat_str is None or lon_str is None:
2023-11-24 15:13:00 +00:00
samples = sorted(geocode.samples, key=lambda row: row[2])
return render_template("index.html", samples=samples)
lat, lon = float(lat_str), float(lon_str)
if lat < -90 or lat > 90 or lon < -180 or lon > 180:
return jsonify(
coords={"lat": lat, "lon": lon},
error="lat must be between -90 and 90, "
+ "and lon must be between -180 and 180",
)
2023-11-24 15:13:00 +00:00
result = lat_lon_to_wikidata(lat, lon)["result"]
result.pop("element", None)
result.pop("geojson", None)
2023-11-24 15:35:54 +00:00
if logging_enabled:
remote_addr = request.headers.get("X-Forwarded-For", request.remote_addr)
log = model.LookupLog(
lat=lat,
lon=lon,
remote_addr=remote_addr,
2023-11-25 12:51:45 +00:00
fqdn=socket.getfqdn(remote_addr) if remote_addr else None,
2023-11-24 15:35:54 +00:00
result=result,
2023-12-10 10:41:36 +00:00
response_time_ms=int((time() - t0) * 1000),
2023-11-24 15:35:54 +00:00
)
database.session.add(log)
database.session.commit()
2023-11-24 15:13:00 +00:00
return jsonify(result)
2021-04-16 21:30:25 +01:00
@app.route("/random")
2023-11-04 21:36:05 +00:00
def random_location() -> str | Response:
2023-10-10 10:11:23 +01:00
"""Return detail page for random lat/lon."""
2021-04-16 21:30:25 +01:00
lat, lon = get_random_lat_lon()
2023-11-04 21:36:05 +00:00
return build_detail_page(lat, lon)
2021-04-16 21:30:25 +01:00
@app.route("/wikidata_tag")
2023-10-10 10:11:23 +01:00
def wikidata_tag() -> str:
"""Lookup Wikidata tag for lat/lon."""
lat_str, lon_str = request.args["lat"], request.args["lon"]
lat, lon = float(lat_str), float(lon_str)
2021-04-16 21:30:25 +01:00
2021-04-17 17:02:53 +01:00
scotland_code = scotland.get_scotland_code(lat, lon)
2021-04-16 21:30:25 +01:00
2023-10-10 10:11:23 +01:00
elements: typing.Any
2021-04-16 21:30:25 +01:00
if scotland_code:
2021-04-17 18:29:09 +01:00
rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code)
hit = wikidata.commons_from_rows(rows)
2021-04-16 21:30:25 +01:00
elements = []
2021-04-17 18:29:09 +01:00
result = wikidata.build_dict(hit, lat, lon)
2021-04-16 21:30:25 +01:00
else:
2021-04-17 17:31:58 +01:00
elements = model.Polygon.coords_within(lat, lon)
2021-04-16 21:30:25 +01:00
result = do_lookup(elements, lat, lon)
return render_template(
"wikidata_tag.html", lat=lat, lon=lon, result=result, elements=elements
)
2023-11-04 21:36:05 +00:00
def build_detail_page(lat: float, lon: float) -> str:
"""Run lookup and build detail page."""
if lat < -90 or lat > 90 or lon < -180 or lon > 180:
error = (
"latitude must be between -90 and 90, "
+ "and longitude must be between -180 and 180"
)
return render_template("query_error.html", lat=lat, lon=lon, error=error)
2022-05-18 14:12:34 +01:00
try:
reply = lat_lon_to_wikidata(lat, lon)
except wikidata.QueryError as e:
query, r = e.args
2023-10-10 10:08:59 +01:00
return render_template("query_error.html", lat=lat, lon=lon, query=query, r=r)
2022-05-18 14:12:34 +01:00
2023-10-13 16:50:27 +01:00
element = reply["result"].pop("element", None)
geojson = reply["result"].pop("geojson", None)
2023-10-13 16:50:27 +01:00
return render_template(
"detail.html",
lat=lat,
lon=lon,
str=str,
element_id=element,
geojson=geojson,
**reply,
2023-10-13 16:50:27 +01:00
)
2021-04-16 21:30:25 +01:00
2023-11-04 21:36:05 +00:00
@app.route("/detail")
def detail_page() -> Response | str:
"""Detail page."""
database.session.execute("SELECT 1")
2023-11-04 21:36:05 +00:00
try:
lat_str, lon_str = request.args["lat"], request.args["lon"]
lat, lon = float(lat_str), float(lon_str)
except TypeError:
return redirect(url_for("index"))
return build_detail_page(lat, lon)
2023-12-10 10:32:15 +00:00
@app.route("/reports")
def reports() -> str:
"""Return reports page with various statistics."""
2023-12-10 10:32:15 +00:00
log_count = model.LookupLog.query.count()
log_start_time, average_response_time = database.session.query(
func.min(model.LookupLog.dt), func.avg(model.LookupLog.response_time_ms)
).one()
2023-12-10 10:32:15 +00:00
# Construct the query
by_day = (
database.session.query(
func.date(model.LookupLog.dt).label("log_date"),
func.count(model.LookupLog.id).label("count"),
)
.group_by("log_date")
.order_by(func.date(model.LookupLog.dt).desc())
)
top_places = (
database.session.query(
model.LookupLog.result["commons_cat"]["title"].label("place"),
func.count().label("num"),
)
.group_by("place")
.order_by(func.count().desc())
.limit(50)
)
missing_places = (
database.session.query(model.LookupLog)
.filter(
model.LookupLog.result.has_key("missing") # type: ignore
) # Filtering for entries where result contains 'missing'
.order_by(model.LookupLog.dt.desc()) # Ordering by dt in descending order
.limit(50) # Limiting to the top 50 results
)
2023-12-10 10:32:15 +00:00
return render_template(
"reports.html",
log_count=log_count,
log_start_time=log_start_time,
average_response_time=average_response_time,
by_day=by_day,
top_places=top_places,
missing_places=missing_places,
2023-12-10 10:32:15 +00:00
)
2021-04-16 16:21:38 +01:00
if __name__ == "__main__":
app.run(host="0.0.0.0")