geocode/lookup.py

331 lines
9.2 KiB
Python
Raw Normal View History

2019-08-18 15:56:53 +01:00
#!/usr/bin/python3
2021-04-15 12:28:02 +01:00
from flask import Flask, render_template, request, jsonify, redirect, url_for
2021-04-16 20:24:23 +01:00
import geocode
2021-04-16 21:24:59 +01:00
import geocode.wikidata
import geocode.overpass
2019-08-18 15:56:53 +01:00
import urllib.parse
import random
2021-04-15 12:28:02 +01:00
import psycopg2
from geopy.distance import distance
# select gid, code, name from scotland where st_contains(geom, ST_Transform(ST_SetSRID(ST_MakePoint(-4.177, 55.7644), 4326), 27700));
2019-08-18 15:56:53 +01:00
2021-04-16 16:21:38 +01:00
# URL prefix of Wikimedia Commons category pages; a category title is
# appended to (or stripped from) this prefix elsewhere in this module.
commons_cat_start = "https://commons.wikimedia.org/wiki/Category:"
2019-08-18 15:56:53 +01:00
2021-04-16 16:21:38 +01:00
# Prefix of Wikidata entity URIs, up to and including the "Q" of the QID.
wd_entity = "http://www.wikidata.org/entity/Q"
# QID that lat_lon_to_wikidata compares against to special-case the
# City of London.
city_of_london_qid = "Q23311"
2019-08-18 15:56:53 +01:00
2021-04-16 16:22:39 +01:00
2019-08-18 15:56:53 +01:00
# The Flask application object; routes below are registered on it.
app = Flask(__name__)
2021-04-16 21:24:59 +01:00
# Load settings from the "config.default" object; get_scotland_code reads
# the "DB_PARAMS" entry from this configuration.
app.config.from_object("config.default")
2019-08-18 15:56:53 +01:00
2021-04-16 16:14:12 +01:00
def get_random_lat_lon():
    """Pick a random point inside a fixed latitude/longitude box.

    The box runs from 50.8520 to 53.7984 degrees latitude and from
    -2.7296 to 0.3536 degrees longitude. Values are drawn on a grid of
    1/10000 of a degree, with the upper bound of each range excluded.

    Returns:
        A ``(lat, lon)`` tuple of floats.
    """
    scale = 10000
    lat_min, lat_max = 50.8520, 53.7984
    lon_min, lon_max = -2.7296, 0.3536

    # Draw integers on the scaled grid, then convert back to degrees.
    lat_steps = random.randrange(int(lat_min * scale), int(lat_max * scale))
    lon_steps = random.randrange(int(lon_min * scale), int(lon_max * scale))
    return lat_steps / scale, lon_steps / scale
2021-04-15 12:28:02 +01:00
def bounding_box_area(element):
    """Return the approximate area, in square kilometres, of a bounding box.

    Args:
        element: mapping with a "bounds" entry that holds "minlat",
            "maxlat", "minlon" and "maxlon".

    Returns:
        Width times height of the box, where the width is measured along
        the northern edge and the height along the western edge.
    """
    bbox = element["bounds"]
    north, south = bbox["maxlat"], bbox["minlat"]
    west, east = bbox["minlon"], bbox["maxlon"]

    width = distance((north, west), (north, east))
    # Measure the height along a single meridian. Measuring between
    # opposite corners would give the diagonal and overstate the area.
    height = distance((south, west), (north, west))

    return width.km * height.km
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def wd_to_qid(wd):
    """Extract the QID from a binding whose "type" is "uri".

    Expects a mapping such as
    ``{"type": "uri", "value": "http://www.wikidata.org/entity/Q30"}``.

    Returns:
        The QID derived from ``wd["value"]``, or ``None`` when the
        binding's type is anything other than "uri".
    """
    if wd["type"] != "uri":
        return None
    return wd_uri_to_qid(wd["value"])
2021-04-15 12:28:02 +01:00
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def wd_uri_to_qid(value):
    """Convert a Wikidata entity URI into its QID.

    Args:
        value: a string starting with the module-level ``wd_entity`` prefix.

    Returns:
        The part of ``value`` from the "Q" onwards.

    Raises:
        ValueError: if ``value`` does not start with ``wd_entity``. An
            explicit check is used instead of ``assert`` so the validation
            still runs when Python is started with ``-O``.
    """
    if not value.startswith(wd_entity):
        raise ValueError(f"not a Wikidata entity URI: {value!r}")
    # wd_entity already ends with "Q"; step back one character to keep it.
    return value[len(wd_entity) - 1 :]
2021-04-15 12:28:02 +01:00
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def build_dict(hit, lat, lon):
    """Build the result dictionary for a lookup.

    Args:
        hit: ``None`` when nothing was found, otherwise a mapping with
            "wikidata" and "commons_cat" entries and an optional
            "admin_level".
        lat: latitude, stored unchanged under ``coords``.
        lon: longitude, stored unchanged under ``coords``.

    Returns:
        A dict with ``coords`` and ``commons_cat``. When there is no hit,
        or the hit has no Commons category, ``commons_cat`` is ``None`` and
        ``missing`` is ``True``. Otherwise ``commons_cat`` holds the
        category title and its URL.
    """
    coords = {"lat": lat, "lon": lon}
    if hit is None:
        return dict(commons_cat=None, missing=True, coords=coords)

    commons_cat = hit.get("commons_cat")
    if not commons_cat:
        # A hit can carry a QID but no category (osm_lookup's fallback
        # stores the category lookup result unchecked). Report it as
        # missing instead of failing on ``None.replace``.
        return dict(
            commons_cat=None,
            missing=True,
            coords=coords,
            admin_level=hit.get("admin_level"),
            wikidata=hit.get("wikidata"),
        )

    url = commons_cat_start + urllib.parse.quote(commons_cat.replace(" ", "_"))
    return dict(
        commons_cat={"title": commons_cat, "url": url},
        coords=coords,
        admin_level=hit.get("admin_level"),
        wikidata=hit["wikidata"],
    )
2021-04-15 12:28:02 +01:00
def do_lookup(elements, lat, lon):
    """Run the OSM-based lookup and turn the outcome into a result dict.

    Args:
        elements: list of OSM elements, passed on to ``osm_lookup``.
        lat: latitude of the point being looked up.
        lon: longitude of the point being looked up.

    Returns:
        The dict produced by ``build_dict`` for the hit, or, when the
        Wikidata query fails, a dict with the failing ``query``, the
        ``error`` text of the response and a ``query_url`` for the query.
    """
    try:
        hit = osm_lookup(elements, lat, lon)
    except geocode.wikidata.QueryError as e:
        return {
            "query": e.query,
            "error": e.r.text,
            # Percent-encode the query so spaces, newlines and "#" survive
            # inside the URL fragment.
            "query_url": "https://query.wikidata.org/#"
            + urllib.parse.quote(e.query),
        }

    return build_dict(hit, lat, lon)
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def get_scotland_code(lat, lon):
    """Return the ``code`` of the ``scotland`` row that contains a point.

    The point is built from ``lon``/``lat`` in SRID 4326 and transformed to
    SRID 27700 before being compared with the ``geom`` column. If no row
    contains the point, the search is widened to rows within a distance of
    100 (in the units of SRID 27700, presumably metres).

    Args:
        lat: latitude, as a number or a numeric string.
        lon: longitude, as a number or a numeric string.

    Returns:
        The ``code`` column of the first matching row, or ``None``.

    Raises:
        ValueError: if ``lat`` or ``lon`` cannot be converted to ``float``.
    """
    # Coordinates are sent as bound parameters, never interpolated into the
    # SQL text, because callers may pass values taken from the request.
    params = (float(lon), float(lat))
    contains_sql = (
        "select code, name from scotland where st_contains(geom, "
        "ST_Transform(ST_SetSRID(ST_MakePoint(%s, %s), 4326), 27700));"
    )
    nearby_sql = (
        "select code, name from scotland where ST_DWithin(geom, "
        "ST_Transform(ST_SetSRID(ST_MakePoint(%s, %s), 4326), 27700), 100);"
    )

    conn = psycopg2.connect(**app.config["DB_PARAMS"])
    try:
        with conn.cursor() as cur:
            cur.execute(contains_sql, params)
            row = cur.fetchone()
            if not row:
                # Nothing contains the point: widen the search.
                cur.execute(nearby_sql, params)
                row = cur.fetchone()
    finally:
        # Close the connection even if a query fails.
        conn.close()

    return row[0] if row else None
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def wdqs_geosearch_query(lat, lon):
    """Render the geosearch SPARQL template and run it.

    Float coordinates are written in fixed-point notation (``:f``) before
    being handed to the template; values of any other type are passed
    through unchanged.

    Returns:
        Whatever ``geocode.wikidata.wdqs`` returns for the rendered query.
    """

    def as_text(value):
        # Fixed-point formatting keeps floats out of exponent notation.
        return f"{value:f}" if isinstance(value, float) else value

    sparql = render_template(
        "sparql/geosearch.sparql", lat=as_text(lat), lon=as_text(lon)
    )
    return geocode.wikidata.wdqs(sparql)
2021-04-15 12:28:02 +01:00
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def wdqs_geosearch(lat, lon):
    """Return the first suitable row from the geosearch query, if any.

    Rows are examined in the order the query returns them:

    * a row with neither "commonsCat" nor "commonsSiteLink" whose type is
      not one of the listed place types is skipped;
    * a row whose distance exceeds the limit for its type (default 1) is
      skipped;
    * if the first remaining row has no Commons link, the search stops and
      ``None`` is returned; otherwise that row is returned.

    The distance limits use the unit of the query's "distance" value
    (presumably kilometres; confirm against the SPARQL template).
    """
    fallback_limit = 1
    candidate_rows = wdqs_geosearch_query(lat, lon)
    limit_by_type = {
        "Q188509": 1,  # suburb
        "Q3957": 2,  # town
        "Q532": 1,  # village
        "Q5084": 1,  # hamlet
        "Q515": 2,  # city
        "Q1549591": 3,  # big city
    }

    for row in candidate_rows:
        isa = wd_uri_to_qid(row["isa"]["value"])
        has_commons = "commonsCat" in row or "commonsSiteLink" in row

        if not has_commons and isa not in limit_by_type:
            continue

        how_far = float(row["distance"]["value"])
        if how_far > limit_by_type.get(isa, fallback_limit):
            continue

        if not has_commons:
            # A nearby place of a listed type without a Commons link ends
            # the search with no result.
            break

        return row

    return None
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def lat_lon_to_wikidata(lat, lon):
    """Look up a point and return the OSM elements and the result dict.

    The steps are:

    1. If ``get_scotland_code`` finds a code for the point, the result
       comes from the Scottish parish query and ``elements`` is empty.
    2. Otherwise the OSM elements for the point are fetched and run
       through ``do_lookup``.
    3. That result is returned as is when it is the City of London, when
       it has no admin level, or when its admin level is 7 or higher.
    4. Otherwise a geosearch is tried; if it yields a row, the result is
       rebuilt from that row and ``elements`` is emptied.

    Returns:
        ``{"elements": [...], "result": {...}}``.
    """
    scotland_code = get_scotland_code(lat, lon)
    if scotland_code:
        rows = lookup_scottish_parish_in_wikidata(scotland_code)
        hit = commons_from_rows(rows)
        return {"elements": [], "result": build_dict(hit, lat, lon)}

    elements = geocode.overpass.get_osm_elements(lat, lon)
    result = do_lookup(elements, lat, lon)

    # do_lookup can return a "missing" or error dict that lacks these keys,
    # so read them with .get() rather than indexing.
    # special case because the City of London is admin_level=6 in OSM
    if result.get("wikidata") == city_of_london_qid:
        return {"elements": elements, "result": result}

    admin_level = result.get("admin_level")
    if not admin_level or admin_level >= 7:
        return {"elements": elements, "result": result}

    row = wdqs_geosearch(lat, lon)
    if row:
        hit = commons_from_rows([row])
        elements = []
        result = build_dict(hit, lat, lon)

    return {"elements": elements, "result": result}
2019-08-18 15:56:53 +01:00
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def lookup_scottish_parish_in_wikidata(code):
    """Run the Scottish parish SPARQL template for ``code``.

    Returns whatever ``geocode.wikidata.wdqs`` returns for the query.
    """
    return geocode.wikidata.wdqs(
        render_template("sparql/scottish_parish.sparql", code=code)
    )
2021-04-15 12:28:02 +01:00
2021-04-16 16:14:32 +01:00
2019-08-18 15:56:53 +01:00
def lookup_gss_in_wikidata(gss):
    """Run the GSS lookup SPARQL template for ``gss``.

    Returns whatever ``geocode.wikidata.wdqs`` returns for the query.
    """
    return geocode.wikidata.wdqs(
        render_template("sparql/lookup_gss.sparql", gss=gss)
    )
2021-04-15 12:28:02 +01:00
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def lookup_wikidata_by_name(name, lat, lon):
    """Run the lookup-by-name SPARQL template for a name near a point.

    The template receives ``repr(name)`` (the name as a quoted Python
    string literal) and the coordinates converted with ``str``.

    Returns whatever ``geocode.wikidata.wdqs`` returns for the query.
    """
    template_args = {"name": repr(name), "lat": str(lat), "lon": str(lon)}
    sparql = render_template("sparql/lookup_by_name.sparql", **template_args)
    return geocode.wikidata.wdqs(sparql)
2019-08-18 15:56:53 +01:00
2021-04-16 16:14:32 +01:00
2019-08-18 15:56:53 +01:00
def unescape_title(t):
    """Turn an underscore-separated, percent-encoded title into plain text.

    Underscores are replaced with spaces first, then percent-escapes are
    decoded, so an encoded underscore (``%5F``) stays an underscore.
    """
    with_spaces = t.replace("_", " ")
    return urllib.parse.unquote(with_spaces)
2019-08-18 15:56:53 +01:00
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def commons_from_rows(rows):
    """Return QID and Commons category from the first row that has one.

    For each row, a "commonsCat" value is preferred. Failing that, the
    category title is taken from the "commonsSiteLink" URL by dropping the
    Commons category prefix and unescaping the rest.

    Returns:
        ``{"wikidata": qid, "commons_cat": title}`` for the first row with
        either field, or ``None`` if no row has one.
    """
    for row in rows:
        if "commonsCat" in row:
            qid = wd_to_qid(row["item"])
            category = row["commonsCat"]["value"]
        elif "commonsSiteLink" in row:
            link = row["commonsSiteLink"]["value"]
            qid = wd_to_qid(row["item"])
            category = unescape_title(link[len(commons_cat_start) :])
        else:
            continue
        return {"wikidata": qid, "commons_cat": category}
    return None
2021-04-15 12:28:02 +01:00
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def get_commons_cat_from_gss(gss):
    """Look up ``gss`` in Wikidata and return the first Commons match.

    Returns the dict produced by ``commons_from_rows`` or ``None``.
    """
    rows = lookup_gss_in_wikidata(gss)
    return commons_from_rows(rows)
2019-08-18 15:56:53 +01:00
2021-04-16 16:14:32 +01:00
2021-04-15 12:28:02 +01:00
def osm_lookup(elements, lat, lon):
elements.sort(key=lambda e: bounding_box_area(e))
for e in elements:
2021-04-16 16:21:38 +01:00
if "tags" not in e:
2021-04-15 12:28:02 +01:00
continue
2021-04-16 16:21:38 +01:00
tags = e["tags"]
admin_level_tag = tags.get("admin_level")
2021-04-16 16:22:39 +01:00
admin_level = (
int(admin_level_tag)
if admin_level_tag and admin_level_tag.isdigit()
else None
)
2021-04-16 16:21:38 +01:00
if not admin_level and tags.get("boundary") != "political":
2021-04-15 12:28:02 +01:00
continue
2021-04-16 16:21:38 +01:00
if "wikidata" in tags:
qid = tags["wikidata"]
2021-04-16 21:24:59 +01:00
commons = geocode.wikidata.qid_to_commons_category(qid)
2021-04-15 12:28:02 +01:00
if commons:
return {
2021-04-16 16:21:38 +01:00
"wikidata": qid,
"commons_cat": commons,
"admin_level": admin_level,
2021-04-15 12:28:02 +01:00
}
2021-04-16 16:21:38 +01:00
gss = tags.get("ref:gss")
2021-04-15 12:28:02 +01:00
if gss:
ret = get_commons_cat_from_gss(gss)
if ret:
2021-04-16 16:21:38 +01:00
ret["admin_level"] = admin_level
2021-04-15 12:28:02 +01:00
return ret
2021-04-16 16:21:38 +01:00
name = tags.get("name")
2021-04-15 12:28:02 +01:00
if not name:
2019-08-18 15:56:53 +01:00
continue
2021-04-16 16:21:38 +01:00
if name.endswith(" CP"):
2021-04-15 12:28:02 +01:00
name = name[:-3]
rows = lookup_wikidata_by_name(name, lat, lon)
if len(rows) == 1:
ret = commons_from_rows(rows)
if ret:
2021-04-16 16:21:38 +01:00
ret["admin_level"] = admin_level
2021-04-15 12:28:02 +01:00
return ret
2021-04-16 16:21:38 +01:00
has_wikidata_tag = [e["tags"] for e in elements if "wikidata" in e["tags"]]
2021-04-15 12:28:02 +01:00
if len(has_wikidata_tag) != 1:
return
2021-04-16 16:21:38 +01:00
qid = has_wikidata_tag[0]["wikidata"]
2021-04-15 12:28:02 +01:00
return {
2021-04-16 16:21:38 +01:00
"wikidata": qid,
2021-04-16 20:24:23 +01:00
"commons_cat": geocode.qid_to_commons_category(qid),
2021-04-16 16:21:38 +01:00
"admin_level": admin_level,
2021-04-15 12:28:02 +01:00
}
2019-08-18 15:56:53 +01:00
2021-04-16 21:30:25 +01:00
@app.route("/")
def index():
    """Serve the front page, the JSON lookup, or a redirect to the detail page.

    * ``?q=<lat>,<lon>`` redirects to the detail page for that point. A
      ``q`` without a comma is ignored.
    * ``?lat=..&lon=..`` returns the lookup result as JSON, or a 400 JSON
      error if either value is not a number.
    * Otherwise the index page is rendered with the sample locations.
    """
    q = request.args.get("q")
    if q and q.strip():
        parts = [v.strip() for v in q.split(",", 1)]
        # Only redirect when both halves are present; unpacking a single
        # value would raise.
        if len(parts) == 2:
            lat, lon = parts
            return redirect(url_for("detail_page", lat=lat, lon=lon))

    lat, lon = request.args.get("lat"), request.args.get("lon")
    if lat is not None and lon is not None:
        # Validate before the values reach the SQL/SPARQL query builders.
        try:
            float(lat)
            float(lon)
        except ValueError:
            return jsonify(error="lat and lon must be numbers"), 400
        # The original strings are passed on so that the "coords" in the
        # JSON reply keep their previous form.
        return jsonify(lat_lon_to_wikidata(lat, lon)["result"])

    samples = sorted(geocode.samples, key=lambda row: row[2])
    return render_template("index.html", samples=samples)
@app.route("/random")
def random_location():
    """Render the detail page for a randomly chosen point."""
    lat, lon = get_random_lat_lon()
    elements = geocode.overpass.get_osm_elements(lat, lon)
    # do_lookup runs before rendering, so the template sees the element
    # list as the lookup left it.
    result = do_lookup(elements, lat, lon)

    context = {"lat": lat, "lon": lon, "result": result, "elements": elements}
    return render_template("detail.html", **context)
@app.route("/wikidata_tag")
def wikidata_tag():
    """Render the wikidata_tag page for the point given by ``lat``/``lon``.

    If a Scotland code is found for the point, the result comes from the
    Scottish parish query and no OSM elements are shown. Otherwise the OSM
    elements are fetched and looked up.

    Missing or non-numeric ``lat``/``lon`` redirect to the index page, as
    ``detail_page`` does for missing parameters.
    """
    try:
        lat = float(request.args.get("lat"))
        lon = float(request.args.get("lon"))
    except (TypeError, ValueError):
        # TypeError: parameter missing (None); ValueError: not a number.
        return redirect(url_for("index"))

    scotland_code = get_scotland_code(lat, lon)
    if scotland_code:
        rows = lookup_scottish_parish_in_wikidata(scotland_code)
        hit = commons_from_rows(rows)
        elements = []
        result = build_dict(hit, lat, lon)
    else:
        elements = geocode.overpass.get_osm_elements(lat, lon)
        result = do_lookup(elements, lat, lon)

    return render_template(
        "wikidata_tag.html", lat=lat, lon=lon, result=result, elements=elements
    )
@app.route("/detail")
def detail_page():
    """Render the detail page for the point given by ``lat``/``lon``.

    Missing or non-numeric parameters redirect to the index page.
    """
    try:
        lat, lon = [float(request.args.get(param)) for param in ("lat", "lon")]
    except (TypeError, ValueError):
        # TypeError: parameter missing (None); ValueError: not a number.
        return redirect(url_for("index"))

    reply = lat_lon_to_wikidata(lat, lon)
    return render_template("detail.html", lat=lat, lon=lon, **reply)
2021-04-16 16:21:38 +01:00
if __name__ == "__main__":
    # When run as a script, start the server bound to host "0.0.0.0".
    app.run(host="0.0.0.0")