diff --git a/geocode/wikidata.py b/geocode/wikidata.py index 33ea0d2..226ecab 100644 --- a/geocode/wikidata.py +++ b/geocode/wikidata.py @@ -1,8 +1,12 @@ +from flask import render_template import requests import simplejson from . import headers +import urllib.parse wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql" +wd_entity = "http://www.wikidata.org/entity/Q" +commons_cat_start = "https://commons.wikimedia.org/wiki/Category:" class QueryError(Exception): @@ -49,3 +53,105 @@ def wdqs(query): return r.json()["results"]["bindings"] except simplejson.errors.JSONDecodeError: raise QueryError(query, r) + + +def wd_to_qid(wd): + # expecting {"type": "url", "value": "https://www.wikidata.org/wiki/Q30"} + if wd["type"] == "uri": + return wd_uri_to_qid(wd["value"]) + + +def wd_uri_to_qid(value): + assert value.startswith(wd_entity) + return value[len(wd_entity) - 1 :] + +def geosearch_query(lat, lon): + if isinstance(lat, float): + lat = f"{lat:f}" + if isinstance(lon, float): + lon = f"{lon:f}" + + query = render_template("sparql/geosearch.sparql", lat=lat, lon=lon) + return wdqs(query) + + +def geosearch(lat, lon): + default_max_dist = 1 + rows = geosearch_query(lat, lon) + max_dist = { + "Q188509": 1, # suburb + "Q3957": 2, # town + "Q532": 1, # village + "Q5084": 1, # hamlet + "Q515": 2, # city + "Q1549591": 3, # big city + } + for row in rows: + isa = wd_uri_to_qid(row["isa"]["value"]) + + if ( + "commonsCat" not in row + and "commonsSiteLink" not in row + and isa not in max_dist + ): + continue + + distance = float(row["distance"]["value"]) + if distance > max_dist.get(isa, default_max_dist): + continue + + if "commonsCat" not in row and "commonsSiteLink" not in row: + break + + return row + + +def lookup_scottish_parish_in_wikidata(code): + query = render_template("sparql/scottish_parish.sparql", code=code) + return wdqs(query) + + +def lookup_gss_in_wikidata(gss): + query = render_template("sparql/lookup_gss.sparql", gss=gss) + return wdqs(query) + + +def lookup_wikidata_by_name(name, lat, lon): + query = render_template( + "sparql/lookup_by_name.sparql", name=repr(name), lat=str(lat), lon=str(lon) + ) + return wdqs(query) + + +def unescape_title(t): + return urllib.parse.unquote(t.replace("_", " ")) + + +def commons_from_rows(rows): + for row in rows: + if "commonsCat" in row: + qid = wd_to_qid(row["item"]) + return {"wikidata": qid, "commons_cat": row["commonsCat"]["value"]} + if "commonsSiteLink" in row: + site_link = row["commonsSiteLink"]["value"] + qid = wd_to_qid(row["item"]) + cat = unescape_title(site_link[len(commons_cat_start) :]) + return {"wikidata": qid, "commons_cat": cat} + + +def get_commons_cat_from_gss(gss): + return commons_from_rows(lookup_gss_in_wikidata(gss)) + + +def build_dict(hit, lat, lon): + coords = {"lat": lat, "lon": lon} + if hit is None: + return dict(commons_cat=None, missing=True, coords=coords) + commons_cat = hit["commons_cat"] + url = commons_cat_start + urllib.parse.quote(commons_cat.replace(" ", "_")) + return dict( + commons_cat={"title": commons_cat, "url": url}, + coords=coords, + admin_level=hit.get("admin_level"), + wikidata=hit["wikidata"], + ) diff --git a/lookup.py b/lookup.py index e33fd0a..b70ab7b 100755 --- a/lookup.py +++ b/lookup.py @@ -1,19 +1,11 @@ #!/usr/bin/python3 from flask import Flask, render_template, request, jsonify, redirect, url_for -import geocode from geocode import wikidata, scotland, database, model -import urllib.parse +import geocode import random -# select gid, code, name from scotland where st_contains(geom, ST_Transform(ST_SetSRID(ST_MakePoint(-4.177, 55.7644), 4326), 27700)); - -commons_cat_start = "https://commons.wikimedia.org/wiki/Category:" - -wd_entity = "http://www.wikidata.org/entity/Q" city_of_london_qid = "Q23311" - - app = Flask(__name__) app.config.from_object("config.default") database.init_app(app) @@ -31,31 +23,6 @@ def get_random_lat_lon(): return lat, lon -def wd_to_qid(wd): - # expecting {"type": "url", "value": "https://www.wikidata.org/wiki/Q30"} - if wd["type"] == "uri": - return wd_uri_to_qid(wd["value"]) - - -def wd_uri_to_qid(value): - assert value.startswith(wd_entity) - return value[len(wd_entity) - 1 :] - - -def build_dict(hit, lat, lon): - coords = {"lat": lat, "lon": lon} - if hit is None: - return dict(commons_cat=None, missing=True, coords=coords) - commons_cat = hit["commons_cat"] - url = commons_cat_start + urllib.parse.quote(commons_cat.replace(" ", "_")) - return dict( - commons_cat={"title": commons_cat, "url": url}, - coords=coords, - admin_level=hit.get("admin_level"), - wikidata=hit["wikidata"], - ) - - def do_lookup(elements, lat, lon): try: hit = osm_lookup(elements, lat, lon) @@ -66,58 +33,17 @@ def do_lookup(elements, lat, lon): "query_url": "https://query.wikidata.org/#" + e.query, } - return build_dict(hit, lat, lon) - - -def wdqs_geosearch_query(lat, lon): - if isinstance(lat, float): - lat = f"{lat:f}" - if isinstance(lon, float): - lon = f"{lon:f}" - - query = render_template("sparql/geosearch.sparql", lat=lat, lon=lon) - return wikidata.wdqs(query) - - -def wdqs_geosearch(lat, lon): - default_max_dist = 1 - rows = wdqs_geosearch_query(lat, lon) - max_dist = { - "Q188509": 1, # suburb - "Q3957": 2, # town - "Q532": 1, # village - "Q5084": 1, # hamlet - "Q515": 2, # city - "Q1549591": 3, # big city - } - for row in rows: - isa = wd_uri_to_qid(row["isa"]["value"]) - - if ( - "commonsCat" not in row - and "commonsSiteLink" not in row - and isa not in max_dist - ): - continue - - distance = float(row["distance"]["value"]) - if distance > max_dist.get(isa, default_max_dist): - continue - - if "commonsCat" not in row and "commonsSiteLink" not in row: - break - - return row + return wikidata.build_dict(hit, lat, lon) def lat_lon_to_wikidata(lat, lon): scotland_code = scotland.get_scotland_code(lat, lon) if scotland_code: - rows = lookup_scottish_parish_in_wikidata(scotland_code) - hit = commons_from_rows(rows) + rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code) + hit = wikidata.commons_from_rows(rows) elements = [] - result = build_dict(hit, lat, lon) + result = wikidata.build_dict(hit, lat, lon) return {"elements": elements, "result": result} @@ -133,52 +59,15 @@ def lat_lon_to_wikidata(lat, lon): if not admin_level or admin_level >= 7: return {"elements": elements, "result": result} - row = wdqs_geosearch(lat, lon) + row = wikidata.geosearch(lat, lon) if row: - hit = commons_from_rows([row]) + hit = wikidata.commons_from_rows([row]) elements = [] - result = build_dict(hit, lat, lon) + result = wikidata.build_dict(hit, lat, lon) return {"elements": elements, "result": result} -def lookup_scottish_parish_in_wikidata(code): - query = render_template("sparql/scottish_parish.sparql", code=code) - return wikidata.wdqs(query) - - -def lookup_gss_in_wikidata(gss): - query = render_template("sparql/lookup_gss.sparql", gss=gss) - return wikidata.wdqs(query) - - -def lookup_wikidata_by_name(name, lat, lon): - query = render_template( - "sparql/lookup_by_name.sparql", name=repr(name), lat=str(lat), lon=str(lon) - ) - return wikidata.wdqs(query) - - -def unescape_title(t): - return urllib.parse.unquote(t.replace("_", " ")) - - -def commons_from_rows(rows): - for row in rows: - if "commonsCat" in row: - qid = wd_to_qid(row["item"]) - return {"wikidata": qid, "commons_cat": row["commonsCat"]["value"]} - if "commonsSiteLink" in row: - site_link = row["commonsSiteLink"]["value"] - qid = wd_to_qid(row["item"]) - cat = unescape_title(site_link[len(commons_cat_start) :]) - return {"wikidata": qid, "commons_cat": cat} - - -def get_commons_cat_from_gss(gss): - return commons_from_rows(lookup_gss_in_wikidata(gss)) - - def osm_lookup(elements, lat, lon): elements = sorted(elements, key=lambda e: e.area) @@ -203,7 +92,7 @@ def osm_lookup(elements, lat, lon): } gss = tags.get("ref:gss") if gss: - ret = get_commons_cat_from_gss(gss) + ret = wikidata.get_commons_cat_from_gss(gss) if ret: ret["admin_level"] = admin_level return ret @@ -213,10 +102,10 @@ def osm_lookup(elements, lat, lon): continue if name.endswith(" CP"): name = name[:-3] - rows = lookup_wikidata_by_name(name, lat, lon) + rows = wikidata.lookup_wikidata_by_name(name, lat, lon) if len(rows) == 1: - ret = commons_from_rows(rows) + ret = wikidata.commons_from_rows(rows) if ret: ret["admin_level"] = admin_level return ret @@ -269,10 +158,10 @@ def wikidata_tag(): scotland_code = scotland.get_scotland_code(lat, lon) if scotland_code: - rows = lookup_scottish_parish_in_wikidata(scotland_code) - hit = commons_from_rows(rows) + rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code) + hit = wikidata.commons_from_rows(rows) elements = [] - result = build_dict(hit, lat, lon) + result = wikidata.build_dict(hit, lat, lon) else: elements = model.Polygon.coords_within(lat, lon) result = do_lookup(elements, lat, lon)