Add types and docstrings.

This commit is contained in:
Edward Betts 2023-10-10 10:11:23 +01:00
parent 5c01d9aebf
commit e975e86af5
6 changed files with 181 additions and 87 deletions

View file

@ -1,3 +1,6 @@
"""Reverse geocode for Wikidata and Wikimedia Commons."""
headers = {"User-Agent": "UK gecode/0.1 (edward@4angle.com)"}
samples = [

View file

@ -1,25 +1,31 @@
import flask
import sqlalchemy
from sqlalchemy import create_engine, func
from sqlalchemy.orm import scoped_session, sessionmaker
session = scoped_session(sessionmaker())
def init_db(db_url):
session.configure(bind=get_engine(db_url))
def init_db(db_url: str, echo: bool = False) -> None:
"""Initialise databsae."""
session.configure(bind=get_engine(db_url, echo=echo))
def get_engine(db_url, echo=False):
def get_engine(db_url: str, echo: bool = False) -> sqlalchemy.engine.base.Engine:
"""Create an engine object."""
return create_engine(db_url, pool_recycle=3600, echo=echo)
def init_app(app, echo=False):
def init_app(app: flask.app.Flask, echo: bool = False) -> None:
"""Initialise database connection within flask app."""
db_url = app.config["DB_URL"]
session.configure(bind=get_engine(db_url, echo=echo))
@app.teardown_appcontext
def shutdown_session(exception=None):
def shutdown_session(exception: Exception | None = None) -> None:
session.remove()
def now_utc():
"""Time in UTC via SQL."""
return func.timezone("utc", func.now())

View file

@ -1,12 +1,15 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import Column
from sqlalchemy.types import Integer, Float, Numeric, String
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import column_property
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy import func, cast
"""Database model."""
import sqlalchemy
from geoalchemy2 import Geometry
from sqlalchemy import cast, func
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import column_property
from sqlalchemy.schema import Column
from sqlalchemy.types import Float, Integer, Numeric, String
from .database import session
Base = declarative_base()
@ -14,6 +17,8 @@ Base.query = session.query_property()
class Polygon(Base):
"""Polygon."""
__tablename__ = "planet_osm_polygon"
osm_id = Column(Integer, primary_key=True, autoincrement=False)
@ -25,25 +30,33 @@ class Polygon(Base):
area = column_property(func.ST_Area(way, False))
@property
def osm_url(self):
def osm_url(self) -> str:
"""OSM URL for polygon."""
osm_type = "way" if self.osm_id > 0 else "relation"
return f"https://www.openstreetmap.org/{osm_type}/{abs(self.osm_id)}"
@hybrid_property
def area_in_sq_km(self):
return self.area / (1000 * 1000)
def area_in_sq_km(self) -> float:
"""Area in square kilometers."""
return float(self.area) / (1000 * 1000)
@classmethod
def coords_within(cls, lat, lon):
def coords_within(
cls, lat: str | float, lon: str | float
) -> sqlalchemy.orm.query.Query["Polygon"]:
"""Polygons that contain given coordinates."""
point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), 4326)
return cls.query.filter(
q: sqlalchemy.orm.query.Query["Polygon"] = cls.query.filter(
cls.admin_level.isnot(None),
cls.admin_level.regexp_match("^\d+$"),
func.ST_Within(point, cls.way),
).order_by(cls.area, cast(cls.admin_level, Integer).desc())
return q
class Scotland(Base):
"""Civil parishes in Scotland."""
__tablename__ = "scotland"
gid = Column(Integer, primary_key=True)

View file

@ -1,8 +1,11 @@
from flask import current_app
"""Reverse geocode civil parishes in Scotland."""
import psycopg2
from flask import current_app
def get_scotland_code(lat, lon):
def get_scotland_code(lat: str | float, lon: str | float) -> str | None:
"""Find civil parish in Scotland for given lat/lon."""
conn = psycopg2.connect(**current_app.config["DB_PARAMS"])
cur = conn.cursor()

View file

@ -1,41 +1,54 @@
from flask import render_template
import requests
import simplejson
from . import headers
"""Wikidata API functions."""
import typing
import urllib.parse
import requests
import simplejson.errors
from flask import render_template
from . import headers
wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql"
wd_entity = "http://www.wikidata.org/entity/Q"
commons_cat_start = "https://commons.wikimedia.org/wiki/Category:"
class QueryError(Exception):
def __init__(self, query, r):
"""Query error."""
def __init__(self, query: str, r: requests.Response):
"""Init."""
self.query = query
self.r = r
def api_call(params):
return requests.get(
"https://www.wikidata.org/w/api.php",
params={"format": "json", "formatversion": 2, **params},
headers=headers,
).json()
def api_call(params: dict[str, str | int]) -> dict[str, typing.Any]:
"""Wikidata API call."""
api_params: dict[str, str | int] = {"format": "json", "formatversion": 2, **params}
r = requests.get(
"https://www.wikidata.org/w/api.php", params=api_params, headers=headers
)
return typing.cast(dict[str, typing.Any], r.json())
def get_entity(qid):
def get_entity(qid: str) -> dict[str, typing.Any] | None:
"""Get Wikidata entity."""
json_data = api_call({"action": "wbgetentities", "ids": qid})
try:
entity = list(json_data["entities"].values())[0]
entity: dict[str, typing.Any] = list(json_data["entities"].values())[0]
except KeyError:
return
if "missing" not in entity:
return entity
return None
return entity if "missing" not in entity else None
def qid_to_commons_category(qid):
def qid_to_commons_category(qid: str) -> str | None:
"""Commons category for a given Wikidata item."""
entity = get_entity(qid)
if not entity:
return None
commons_cat: str | None
try:
commons_cat = entity["claims"]["P373"][0]["mainsnak"]["datavalue"]["value"]
except Exception:
@ -44,29 +57,36 @@ def qid_to_commons_category(qid):
return commons_cat
def wdqs(query):
Row = dict[str, dict[str, typing.Any]]
def wdqs(query: str) -> list[Row]:
"""Pass query to the Wikidata Query Service."""
r = requests.post(
wikidata_query_api_url, data={"query": query, "format": "json"}, headers=headers
)
try:
return r.json()["results"]["bindings"]
return typing.cast(list[Row], r.json()["results"]["bindings"])
except simplejson.errors.JSONDecodeError:
raise QueryError(query, r)
def wd_to_qid(wd):
def wd_to_qid(wd: dict[str, str]) -> str:
"""Convert Wikidata URL from WDQS to QID."""
# expecting {"type": "url", "value": "https://www.wikidata.org/wiki/Q30"}
if wd["type"] == "uri":
return wd_uri_to_qid(wd["value"])
assert wd["type"] == "uri"
return wd_uri_to_qid(wd["value"])
def wd_uri_to_qid(value):
def wd_uri_to_qid(value: str) -> str:
"""Convert URL like https://www.wikidata.org/wiki/Q30 to QID."""
assert value.startswith(wd_entity)
return value[len(wd_entity) - 1 :]
def geosearch_query(lat, lon):
def geosearch_query(lat: str | float, lon: str | float) -> list[Row]:
"""Geosearch via WDQS."""
if isinstance(lat, float):
lat = f"{lat:f}"
if isinstance(lon, float):
@ -76,7 +96,8 @@ def geosearch_query(lat, lon):
return wdqs(query)
def geosearch(lat, lon):
def geosearch(lat: str | float, lon: str | float) -> Row | None:
"""Geosearch."""
default_max_dist = 1
rows = geosearch_query(lat, lon)
max_dist = {
@ -105,30 +126,37 @@ def geosearch(lat, lon):
break
return row
return None
def lookup_scottish_parish_in_wikidata(code):
query = render_template("sparql/scottish_parish.sparql", code=code)
return wdqs(query)
def lookup_scottish_parish_in_wikidata(code: str) -> list[Row]:
"""Lookup scottish parish in Wikidata."""
return wdqs(render_template("sparql/scottish_parish.sparql", code=code))
def lookup_gss_in_wikidata(gss):
query = render_template("sparql/lookup_gss.sparql", gss=gss)
return wdqs(query)
def lookup_gss_in_wikidata(gss: str) -> list[Row]:
"""Lookup GSS in Wikidata."""
return wdqs(render_template("sparql/lookup_gss.sparql", gss=gss))
def lookup_wikidata_by_name(name, lat, lon):
def lookup_wikidata_by_name(name: str, lat: float | str, lon: float | str) -> list[Row]:
"""Lookup place in Wikidata by name."""
query = render_template(
"sparql/lookup_by_name.sparql", name=repr(name), lat=str(lat), lon=str(lon)
)
return wdqs(query)
def unescape_title(t):
def unescape_title(t: str) -> str:
"""Unescape article title."""
return urllib.parse.unquote(t.replace("_", " "))
def commons_from_rows(rows):
Hit = dict[str, str | None]
def commons_from_rows(rows: list[Row]) -> Hit | None:
"""Commons from rows."""
for row in rows:
if "commonsCat" in row:
qid = wd_to_qid(row["item"])
@ -138,22 +166,28 @@ def commons_from_rows(rows):
qid = wd_to_qid(row["item"])
cat = unescape_title(site_link[len(commons_cat_start) :])
return {"wikidata": qid, "commons_cat": cat}
return None
def get_commons_cat_from_gss(gss):
def get_commons_cat_from_gss(gss: str) -> Hit | None:
"""Get commons from GSS via Wikidata."""
return commons_from_rows(lookup_gss_in_wikidata(gss))
def build_dict(hit, lat, lon):
WikidataDict = dict[str, None | bool | str | int | dict[str, typing.Any]]
def build_dict(hit: Hit | None, lat: str | float, lon: str | float) -> WikidataDict:
"""Build dict."""
coords = {"lat": lat, "lon": lon}
if hit is None:
return dict(commons_cat=None, missing=True, coords=coords)
return {"commons_cat": None, "missing": True, "coords": coords}
commons_cat = hit["commons_cat"]
ret = dict(
coords=coords,
admin_level=hit.get("admin_level"),
wikidata=hit["wikidata"],
)
ret: WikidataDict = {
"coords": coords,
"admin_level": hit.get("admin_level"),
"wikidata": hit["wikidata"],
}
if not commons_cat:
return ret