From e975e86af5419207baacb16fca6470782e2b7a09 Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Tue, 10 Oct 2023 10:11:23 +0100 Subject: [PATCH] Add types and docstrings. --- geocode/__init__.py | 3 ++ geocode/database.py | 16 ++++-- geocode/model.py | 37 +++++++++----- geocode/scotland.py | 7 ++- geocode/wikidata.py | 118 ++++++++++++++++++++++++++++---------------- lookup.py | 87 ++++++++++++++++++++++---------- 6 files changed, 181 insertions(+), 87 deletions(-) diff --git a/geocode/__init__.py b/geocode/__init__.py index d1b913c..8e2efce 100644 --- a/geocode/__init__.py +++ b/geocode/__init__.py @@ -1,3 +1,6 @@ +"""Reverse geocode for Wikidata and Wikimedia Commons.""" + + headers = {"User-Agent": "UK gecode/0.1 (edward@4angle.com)"} samples = [ diff --git a/geocode/database.py b/geocode/database.py index dc667bf..aa59e8e 100644 --- a/geocode/database.py +++ b/geocode/database.py @@ -1,25 +1,31 @@ +import flask +import sqlalchemy from sqlalchemy import create_engine, func from sqlalchemy.orm import scoped_session, sessionmaker session = scoped_session(sessionmaker()) -def init_db(db_url): - session.configure(bind=get_engine(db_url)) +def init_db(db_url: str, echo: bool = False) -> None: + """Initialise database.""" + session.configure(bind=get_engine(db_url, echo=echo)) -def get_engine(db_url, echo=False): +def get_engine(db_url: str, echo: bool = False) -> sqlalchemy.engine.base.Engine: + """Create an engine object.""" return create_engine(db_url, pool_recycle=3600, echo=echo) -def init_app(app, echo=False): +def init_app(app: flask.app.Flask, echo: bool = False) -> None: + """Initialise database connection within flask app.""" db_url = app.config["DB_URL"] session.configure(bind=get_engine(db_url, echo=echo)) @app.teardown_appcontext - def shutdown_session(exception=None): + def shutdown_session(exception: Exception | None = None) -> None: session.remove() def now_utc(): + """Time in UTC via SQL.""" return func.timezone("utc", func.now()) diff --git 
a/geocode/model.py b/geocode/model.py index a4b1802..343dfdc 100644 --- a/geocode/model.py +++ b/geocode/model.py @@ -1,12 +1,15 @@ -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.schema import Column -from sqlalchemy.types import Integer, Float, Numeric, String -from sqlalchemy.dialects import postgresql -from sqlalchemy.orm import column_property -from sqlalchemy.ext.hybrid import hybrid_property -from sqlalchemy import func, cast +"""Database model.""" +import sqlalchemy from geoalchemy2 import Geometry +from sqlalchemy import cast, func +from sqlalchemy.dialects import postgresql +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.ext.hybrid import hybrid_property +from sqlalchemy.orm import column_property +from sqlalchemy.schema import Column +from sqlalchemy.types import Float, Integer, Numeric, String + from .database import session Base = declarative_base() @@ -14,6 +17,8 @@ Base.query = session.query_property() class Polygon(Base): + """Polygon.""" + __tablename__ = "planet_osm_polygon" osm_id = Column(Integer, primary_key=True, autoincrement=False) @@ -25,25 +30,33 @@ class Polygon(Base): area = column_property(func.ST_Area(way, False)) @property - def osm_url(self): + def osm_url(self) -> str: + """OSM URL for polygon.""" osm_type = "way" if self.osm_id > 0 else "relation" return f"https://www.openstreetmap.org/{osm_type}/{abs(self.osm_id)}" @hybrid_property - def area_in_sq_km(self): - return self.area / (1000 * 1000) + def area_in_sq_km(self) -> float: + """Area in square kilometers.""" + return float(self.area) / (1000 * 1000) @classmethod - def coords_within(cls, lat, lon): + def coords_within( + cls, lat: str | float, lon: str | float + ) -> sqlalchemy.orm.query.Query["Polygon"]: + """Polygons that contain given coordinates.""" point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), 4326) - return cls.query.filter( + q: sqlalchemy.orm.query.Query["Polygon"] = cls.query.filter( 
cls.admin_level.isnot(None), cls.admin_level.regexp_match("^\d+$"), func.ST_Within(point, cls.way), ).order_by(cls.area, cast(cls.admin_level, Integer).desc()) + return q class Scotland(Base): + """Civil parishes in Scotland.""" + __tablename__ = "scotland" gid = Column(Integer, primary_key=True) diff --git a/geocode/scotland.py b/geocode/scotland.py index d87dd2b..27c50e3 100644 --- a/geocode/scotland.py +++ b/geocode/scotland.py @@ -1,8 +1,11 @@ -from flask import current_app +"""Reverse geocode civil parishes in Scotland.""" + import psycopg2 +from flask import current_app -def get_scotland_code(lat, lon): +def get_scotland_code(lat: str | float, lon: str | float) -> str | None: + """Find civil parish in Scotland for given lat/lon.""" conn = psycopg2.connect(**current_app.config["DB_PARAMS"]) cur = conn.cursor() diff --git a/geocode/wikidata.py b/geocode/wikidata.py index 7e17c0c..4b4f920 100644 --- a/geocode/wikidata.py +++ b/geocode/wikidata.py @@ -1,41 +1,54 @@ -from flask import render_template -import requests -import simplejson -from . import headers +"""Wikidata API functions.""" + +import typing import urllib.parse +import requests +import simplejson.errors +from flask import render_template + +from . 
import headers + wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql" wd_entity = "http://www.wikidata.org/entity/Q" commons_cat_start = "https://commons.wikimedia.org/wiki/Category:" class QueryError(Exception): - def __init__(self, query, r): + """Query error.""" + + def __init__(self, query: str, r: requests.Response): + """Init.""" self.query = query self.r = r -def api_call(params): - return requests.get( - "https://www.wikidata.org/w/api.php", - params={"format": "json", "formatversion": 2, **params}, - headers=headers, - ).json() +def api_call(params: dict[str, str | int]) -> dict[str, typing.Any]: + """Wikidata API call.""" + api_params: dict[str, str | int] = {"format": "json", "formatversion": 2, **params} + r = requests.get( + "https://www.wikidata.org/w/api.php", params=api_params, headers=headers + ) + return typing.cast(dict[str, typing.Any], r.json()) -def get_entity(qid): +def get_entity(qid: str) -> dict[str, typing.Any] | None: + """Get Wikidata entity.""" json_data = api_call({"action": "wbgetentities", "ids": qid}) try: - entity = list(json_data["entities"].values())[0] + entity: dict[str, typing.Any] = list(json_data["entities"].values())[0] except KeyError: - return - if "missing" not in entity: - return entity + return None + return entity if "missing" not in entity else None -def qid_to_commons_category(qid): +def qid_to_commons_category(qid: str) -> str | None: + """Commons category for a given Wikidata item.""" entity = get_entity(qid) + if not entity: + return None + commons_cat: str | None try: commons_cat = entity["claims"]["P373"][0]["mainsnak"]["datavalue"]["value"] except Exception: @@ -44,29 +57,36 @@ def qid_to_commons_category(qid): return commons_cat -def wdqs(query): +Row = dict[str, dict[str, typing.Any]] + + +def wdqs(query: str) -> list[Row]: + """Pass query to the Wikidata Query Service.""" r = requests.post( wikidata_query_api_url, data={"query": query, "format": "json"}, headers=headers ) try: 
- return r.json()["results"]["bindings"] + return typing.cast(list[Row], r.json()["results"]["bindings"]) except simplejson.errors.JSONDecodeError: raise QueryError(query, r) -def wd_to_qid(wd): +def wd_to_qid(wd: dict[str, str]) -> str: + """Convert Wikidata URL from WDQS to QID.""" # expecting {"type": "url", "value": "https://www.wikidata.org/wiki/Q30"} - if wd["type"] == "uri": - return wd_uri_to_qid(wd["value"]) + assert wd["type"] == "uri" + return wd_uri_to_qid(wd["value"]) -def wd_uri_to_qid(value): +def wd_uri_to_qid(value: str) -> str: + """Convert URL like https://www.wikidata.org/wiki/Q30 to QID.""" assert value.startswith(wd_entity) return value[len(wd_entity) - 1 :] -def geosearch_query(lat, lon): +def geosearch_query(lat: str | float, lon: str | float) -> list[Row]: + """Geosearch via WDQS.""" if isinstance(lat, float): lat = f"{lat:f}" if isinstance(lon, float): @@ -76,7 +96,8 @@ def geosearch_query(lat, lon): return wdqs(query) -def geosearch(lat, lon): +def geosearch(lat: str | float, lon: str | float) -> Row | None: + """Geosearch.""" default_max_dist = 1 rows = geosearch_query(lat, lon) max_dist = { @@ -105,30 +126,37 @@ def geosearch(lat, lon): break return row + return None -def lookup_scottish_parish_in_wikidata(code): - query = render_template("sparql/scottish_parish.sparql", code=code) - return wdqs(query) +def lookup_scottish_parish_in_wikidata(code: str) -> list[Row]: + """Lookup scottish parish in Wikidata.""" + return wdqs(render_template("sparql/scottish_parish.sparql", code=code)) -def lookup_gss_in_wikidata(gss): - query = render_template("sparql/lookup_gss.sparql", gss=gss) - return wdqs(query) +def lookup_gss_in_wikidata(gss: str) -> list[Row]: + """Lookup GSS in Wikidata.""" + return wdqs(render_template("sparql/lookup_gss.sparql", gss=gss)) -def lookup_wikidata_by_name(name, lat, lon): +def lookup_wikidata_by_name(name: str, lat: float | str, lon: float | str) -> list[Row]: + """Lookup place in Wikidata by name.""" query = 
render_template( "sparql/lookup_by_name.sparql", name=repr(name), lat=str(lat), lon=str(lon) ) return wdqs(query) -def unescape_title(t): +def unescape_title(t: str) -> str: + """Unescape article title.""" return urllib.parse.unquote(t.replace("_", " ")) -def commons_from_rows(rows): +Hit = dict[str, str | None] + + +def commons_from_rows(rows: list[Row]) -> Hit | None: + """Commons from rows.""" for row in rows: if "commonsCat" in row: qid = wd_to_qid(row["item"]) @@ -138,22 +166,28 @@ def commons_from_rows(rows): qid = wd_to_qid(row["item"]) cat = unescape_title(site_link[len(commons_cat_start) :]) return {"wikidata": qid, "commons_cat": cat} + return None -def get_commons_cat_from_gss(gss): +def get_commons_cat_from_gss(gss: str) -> Hit | None: + """Get commons from GSS via Wikidata.""" return commons_from_rows(lookup_gss_in_wikidata(gss)) -def build_dict(hit, lat, lon): +WikidataDict = dict[str, None | bool | str | int | dict[str, typing.Any]] + + +def build_dict(hit: Hit | None, lat: str | float, lon: str | float) -> WikidataDict: + """Build dict.""" coords = {"lat": lat, "lon": lon} if hit is None: - return dict(commons_cat=None, missing=True, coords=coords) + return {"commons_cat": None, "missing": True, "coords": coords} commons_cat = hit["commons_cat"] - ret = dict( - coords=coords, - admin_level=hit.get("admin_level"), - wikidata=hit["wikidata"], - ) + ret: WikidataDict = { + "coords": coords, + "admin_level": hit.get("admin_level"), + "wikidata": hit["wikidata"], + } if not commons_cat: return ret diff --git a/lookup.py b/lookup.py index 80efb8b..e814377 100755 --- a/lookup.py +++ b/lookup.py @@ -1,9 +1,14 @@ #!/usr/bin/python3 -from flask import Flask, render_template, request, jsonify, redirect, url_for -from geocode import wikidata, scotland, database, model -import geocode import random +import typing + +import sqlalchemy +from flask import Flask, jsonify, redirect, render_template, request, url_for +from werkzeug.wrappers import Response + +import 
geocode +from geocode import database, model, scotland, wikidata city_of_london_qid = "Q23311" app = Flask(__name__) @@ -11,8 +16,8 @@ app.config.from_object("config.default") database.init_app(app) -def get_random_lat_lon(): - """Select random lat/lon within the UK""" +def get_random_lat_lon() -> tuple[float, float]: + """Select random lat/lon within the UK.""" south, east = 50.8520, 0.3536 north, west = 53.7984, -2.7296 @@ -23,7 +28,12 @@ def get_random_lat_lon(): return lat, lon -def do_lookup(elements, lat, lon): +Elements = sqlalchemy.orm.query.Query[model.Polygon] + + +def do_lookup( + elements: Elements, lat: str | float, lon: str | float +) -> wikidata.WikidataDict: try: hit = osm_lookup(elements, lat, lon) except wikidata.QueryError as e: @@ -36,9 +46,10 @@ def do_lookup(elements, lat, lon): return wikidata.build_dict(hit, lat, lon) -def lat_lon_to_wikidata(lat, lon): +def lat_lon_to_wikidata(lat: str | float, lon: str | float) -> dict[str, typing.Any]: scotland_code = scotland.get_scotland_code(lat, lon) + elements: typing.Any if scotland_code: rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code) hit = wikidata.commons_from_rows(rows) @@ -55,6 +66,7 @@ def lat_lon_to_wikidata(lat, lon): return {"elements": elements, "result": result} admin_level = result.get("admin_level") + assert isinstance(admin_level, int) if not admin_level or admin_level >= 7: return {"elements": elements, "result": result} @@ -68,11 +80,17 @@ def lat_lon_to_wikidata(lat, lon): return {"elements": elements, "result": result} -def osm_lookup(elements, lat, lon): +def osm_lookup( + elements: Elements, lat: str | float, lon: str | float +) -> wikidata.Hit | None: + """OSM lookup.""" + ret: wikidata.Hit | None for e in elements: - tags = e.tags + assert isinstance(e, model.Polygon) + assert e.tags + tags: typing.Mapping[str, typing.Any] = e.tags admin_level_tag = tags.get("admin_level") - admin_level = ( + admin_level: int | None = ( int(admin_level_tag) if admin_level_tag 
and admin_level_tag.isdigit() else None @@ -108,24 +126,36 @@ def osm_lookup(elements, lat, lon): ret["admin_level"] = admin_level return ret - has_wikidata_tag = [e.tags for e in elements if e.tags.get("wikidata")] + has_wikidata_tag = [ + e.tags for e in elements if e.tags.get("wikidata") # type: ignore + ] if len(has_wikidata_tag) != 1: - return + return None + assert has_wikidata_tag[0] qid = has_wikidata_tag[0]["wikidata"] - return { - "wikidata": qid, - "commons_cat": wikidata.qid_to_commons_category(qid), - "admin_level": admin_level, - } + return typing.cast( + wikidata.Hit, + { + "wikidata": qid, + "commons_cat": wikidata.qid_to_commons_category(qid), + "admin_level": admin_level, + }, + ) + + +def redirect_to_detail(q: str) -> Response: + """Redirect to detail page.""" + lat, lon = [v.strip() for v in q.split(",", 1)] + return redirect(url_for("detail_page", lat=lat, lon=lon)) @app.route("/") -def index(): +def index() -> str | Response: + """Index page.""" q = request.args.get("q") if q and q.strip(): - lat, lon = [v.strip() for v in q.split(",", 1)] - return redirect(url_for("detail_page", lat=lat, lon=lon)) + return redirect_to_detail(q) lat, lon = request.args.get("lat"), request.args.get("lon") @@ -137,7 +167,8 @@ def index(): @app.route("/random") -def random_location(): +def random_location() -> str: + """Return detail page for random lat/lon.""" lat, lon = get_random_lat_lon() elements = model.Polygon.coords_within(lat, lon) @@ -149,12 +180,14 @@ def random_location(): @app.route("/wikidata_tag") -def wikidata_tag(): - lat = float(request.args.get("lat")) - lon = float(request.args.get("lon")) +def wikidata_tag() -> str: + """Lookup Wikidata tag for lat/lon.""" + lat_str, lon_str = request.args["lat"], request.args["lon"] + lat, lon = float(lat_str), float(lon_str) scotland_code = scotland.get_scotland_code(lat, lon) + elements: typing.Any if scotland_code: rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code) hit = 
wikidata.commons_from_rows(rows) @@ -170,9 +203,11 @@ def wikidata_tag(): @app.route("/detail") -def detail_page(): +def detail_page() -> Response | str: + """Detail page.""" try: - lat, lon = [float(request.args.get(param)) for param in ("lat", "lon")] + lat_str, lon_str = request.args["lat"], request.args["lon"] + lat, lon = float(lat_str), float(lon_str) except TypeError: return redirect(url_for("index")) try: