geocode/geocode/wikidata.py
Edward Betts 1442620ece Handle Wikidata API errors with proper HTTP responses (429, 503)
Also retry on RequestException and raise QueryError on non-200 responses.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 15:19:02 +00:00

274 lines
8.2 KiB
Python

"""Wikidata API functions."""
import typing
import urllib.parse
import backoff
import backoff.types
import requests
from flask import render_template
from requests.exceptions import JSONDecodeError, RequestException
from . import headers, mail
wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql"
wd_entity = "http://www.wikidata.org/entity/Q"
commons_cat_start = "https://commons.wikimedia.org/wiki/Category:"
def giveup(details: backoff.types.Details) -> None:
"""Display API call fail debug info."""
last_exception = details["exception"] # type: ignore
if last_exception and isinstance(last_exception, APIResponseError):
body = f"Error making Wikidata API call\n\n{last_exception.response.text}"
try:
mail.send_to_admin("Geocode error", body)
except Exception:
pass
class QueryError(Exception):
"""Query error."""
def __init__(self, query: str, r: requests.Response):
"""Init."""
self.query = query
self.r = r
class APIResponseError(Exception):
"""Custom exception for API errors with response text."""
def __init__(self, message: str, response: requests.Response):
"""Init."""
super().__init__(message)
self.response = response
@backoff.on_exception(
backoff.expo,
(RequestException, APIResponseError),
max_tries=5,
on_giveup=giveup,
)
def api_call(params: dict[str, str | int]) -> dict[str, typing.Any]:
"""Wikidata API call."""
api_params: dict[str, str | int] = {"format": "json", "formatversion": 2, **params}
try:
r = requests.get(
"https://www.wikidata.org/w/api.php", params=api_params, headers=headers
)
return typing.cast(dict[str, typing.Any], r.json())
except JSONDecodeError:
raise APIResponseError("Failed to decode JSON", r)
def get_entity(qid: str) -> dict[str, typing.Any] | None:
"""Get Wikidata entity."""
json_data = api_call({"action": "wbgetentities", "ids": qid})
try:
entity: dict[str, typing.Any] = list(json_data["entities"].values())[0]
except KeyError:
return None
return entity if "missing" not in entity else None
def qid_to_commons_category(qid: str, check_p910: bool = True) -> str | None:
"""Commons category for a given Wikidata item."""
entity = get_entity(qid)
cat_start = "Category:"
if not entity:
return None
try:
cat: str = entity["claims"]["P373"][0]["mainsnak"]["datavalue"]["value"]
return cat
except Exception:
pass
try:
sitelink = entity["sitelinks"]["commonswiki"]["title"]
except KeyError:
sitelink = None
if sitelink:
return sitelink[len(cat_start) :] if sitelink.startswith(cat_start) else None
if not check_p910:
return None
try:
cat_qid = entity["claims"]["P910"][0]["mainsnak"]["datavalue"]["value"]["id"]
except Exception:
return None
return qid_to_commons_category(cat_qid, check_p910=False)
Row = dict[str, dict[str, typing.Any]]
Hit = dict[str, str | int | None]
@backoff.on_exception(backoff.expo, (QueryError, RequestException), max_tries=5)
def wdqs(query: str) -> list[Row]:
"""Pass query to the Wikidata Query Service."""
r = requests.post(
wikidata_query_api_url, data={"query": query, "format": "json"}, headers=headers
)
if not r.ok:
raise QueryError(query, r)
try:
return typing.cast(list[Row], r.json()["results"]["bindings"])
except requests.exceptions.JSONDecodeError:
raise QueryError(query, r)
def wd_to_qid(wd: dict[str, str]) -> str:
"""Convert Wikidata URL from WDQS to QID."""
# expecting {"type": "url", "value": "https://www.wikidata.org/wiki/Q30"}
assert wd["type"] == "uri"
return wd_uri_to_qid(wd["value"])
def wd_uri_to_qid(value: str) -> str:
"""Convert URL like https://www.wikidata.org/wiki/Q30 to QID."""
assert value.startswith(wd_entity)
return value[len(wd_entity) - 1 :]
def geosearch_query(lat: float, lon: float) -> str:
"""Geosearch via WDQS."""
lat_str, lon_str = f"{lat:f}", f"{lon:f}"
query = render_template("sparql/geosearch.sparql", lat=lat_str, lon=lon_str)
return query
def filter_geosearch_row(rows: list[Row]) -> Row | None:
"""Filter geosearch rows for the best match that has a Commons category."""
default_max_dist = 1
max_dist = {
"Q188509": 1, # suburb
"Q3957": 2, # town
"Q532": 1, # village
"Q5084": 1, # hamlet
"Q515": 2, # city
"Q1549591": 3, # big city
"Q589282": 2, # ward or electoral division of the United Kingdom
}
for row in rows:
isa = wd_uri_to_qid(row["isa"]["value"])
if (
"commonsCat" not in row
and "commonsSiteLink" not in row
and isa not in max_dist
):
continue
distance = float(row["distance"]["value"])
if distance > max_dist.get(isa, default_max_dist):
continue
if "commonsCat" not in row and "commonsSiteLink" not in row:
break
return row
return None
def geosearch(lat: float, lon: float) -> Row | None:
"""Geosearch."""
rows = wdqs(geosearch_query(lat, lon))
return filter_geosearch_row(rows)
def hit_from_row(row: Row) -> Hit:
"""Build a Hit from a geosearch row (commons cat is optional)."""
qid = wd_to_qid(row["item"])
commons_cat: str | None = None
if "commonsCat" in row:
commons_cat = row["commonsCat"]["value"]
elif "commonsSiteLink" in row:
site_link = row["commonsSiteLink"]["value"]
commons_cat = unescape_title(site_link[len(commons_cat_start):])
return {"wikidata": qid, "commons_cat": commons_cat}
def lookup_wikidata_by_osm_relation_ids(relation_ids: list[int]) -> list[Row]:
"""Look up Wikidata items that reference the given OSM relation IDs via P402."""
query = render_template(
"sparql/lookup_by_osm_relation.sparql",
relation_ids=[str(r) for r in relation_ids],
)
return wdqs(query)
def lookup_scottish_parish_in_wikidata(code: str) -> list[Row]:
"""Lookup scottish parish in Wikidata."""
return wdqs(render_template("sparql/scottish_parish.sparql", code=code))
def lookup_gss_in_wikidata(gss: str) -> list[Row]:
"""Lookup GSS in Wikidata."""
return wdqs(render_template("sparql/lookup_gss.sparql", gss=gss))
def lookup_wikidata_by_name(name: str, lat: float, lon: float) -> list[Row]:
"""Lookup place in Wikidata by name."""
query = render_template(
"sparql/lookup_by_name.sparql", name=repr(name), lat=str(lat), lon=str(lon)
)
return wdqs(query)
def unescape_title(t: str) -> str:
"""Unescape article title."""
return urllib.parse.unquote(t.replace("_", " "))
def commons_from_rows(rows: list[Row]) -> Hit | None:
"""Commons from rows."""
for row in rows:
if "commonsCat" in row:
qid = wd_to_qid(row["item"])
return {"wikidata": qid, "commons_cat": row["commonsCat"]["value"]}
if "commonsSiteLink" in row:
site_link = row["commonsSiteLink"]["value"]
qid = wd_to_qid(row["item"])
cat = unescape_title(site_link[len(commons_cat_start) :])
return {"wikidata": qid, "commons_cat": cat}
return None
def get_commons_cat_from_gss(gss: str) -> Hit | None:
"""Get commons from GSS via Wikidata."""
return commons_from_rows(lookup_gss_in_wikidata(gss))
WikidataDict = dict[str, None | bool | str | int | dict[str, typing.Any]]
def build_dict(hit: Hit | None, lat: float, lon: float) -> WikidataDict:
"""Build dict."""
coords = {"lat": lat, "lon": lon}
if hit is None:
return {"commons_cat": None, "missing": True, "coords": coords}
commons_cat = hit["commons_cat"]
ret: WikidataDict = {
"coords": coords,
"admin_level": hit.get("admin_level"),
"wikidata": hit["wikidata"],
"element": hit.get("element"),
"geojson": hit.get("geojson"),
}
if not commons_cat:
return ret
assert isinstance(commons_cat, str)
url = commons_cat_start + urllib.parse.quote(commons_cat.replace(" ", "_"))
ret["commons_cat"] = {"title": commons_cat, "url": url}
return ret