Move code around

This commit is contained in:
Edward Betts 2021-04-17 19:29:09 +02:00
parent 44241751b2
commit c5a66abb25
2 changed files with 120 additions and 125 deletions

View file

@ -1,8 +1,12 @@
from flask import render_template
import requests import requests
import simplejson import simplejson
from . import headers from . import headers
import urllib.parse
wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql" wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql"
wd_entity = "http://www.wikidata.org/entity/Q"
commons_cat_start = "https://commons.wikimedia.org/wiki/Category:"
class QueryError(Exception): class QueryError(Exception):
@ -49,3 +53,105 @@ def wdqs(query):
return r.json()["results"]["bindings"] return r.json()["results"]["bindings"]
except simplejson.errors.JSONDecodeError: except simplejson.errors.JSONDecodeError:
raise QueryError(query, r) raise QueryError(query, r)
def wd_to_qid(wd):
# expecting {"type": "url", "value": "https://www.wikidata.org/wiki/Q30"}
if wd["type"] == "uri":
return wd_uri_to_qid(wd["value"])
def wd_uri_to_qid(value):
assert value.startswith(wd_entity)
return value[len(wd_entity) - 1 :]
def geosearch_query(lat, lon):
if isinstance(lat, float):
lat = f"{lat:f}"
if isinstance(lon, float):
lon = f"{lon:f}"
query = render_template("sparql/geosearch.sparql", lat=lat, lon=lon)
return wdqs(query)
def geosearch(lat, lon):
default_max_dist = 1
rows = geosearch_query(lat, lon)
max_dist = {
"Q188509": 1, # suburb
"Q3957": 2, # town
"Q532": 1, # village
"Q5084": 1, # hamlet
"Q515": 2, # city
"Q1549591": 3, # big city
}
for row in rows:
isa = wd_uri_to_qid(row["isa"]["value"])
if (
"commonsCat" not in row
and "commonsSiteLink" not in row
and isa not in max_dist
):
continue
distance = float(row["distance"]["value"])
if distance > max_dist.get(isa, default_max_dist):
continue
if "commonsCat" not in row and "commonsSiteLink" not in row:
break
return row
def lookup_scottish_parish_in_wikidata(code):
query = render_template("sparql/scottish_parish.sparql", code=code)
return wdqs(query)
def lookup_gss_in_wikidata(gss):
query = render_template("sparql/lookup_gss.sparql", gss=gss)
return wdqs(query)
def lookup_wikidata_by_name(name, lat, lon):
query = render_template(
"sparql/lookup_by_name.sparql", name=repr(name), lat=str(lat), lon=str(lon)
)
return wdqs(query)
def unescape_title(t):
return urllib.parse.unquote(t.replace("_", " "))
def commons_from_rows(rows):
for row in rows:
if "commonsCat" in row:
qid = wd_to_qid(row["item"])
return {"wikidata": qid, "commons_cat": row["commonsCat"]["value"]}
if "commonsSiteLink" in row:
site_link = row["commonsSiteLink"]["value"]
qid = wd_to_qid(row["item"])
cat = unescape_title(site_link[len(commons_cat_start) :])
return {"wikidata": qid, "commons_cat": cat}
def get_commons_cat_from_gss(gss):
return commons_from_rows(lookup_gss_in_wikidata(gss))
def build_dict(hit, lat, lon):
coords = {"lat": lat, "lon": lon}
if hit is None:
return dict(commons_cat=None, missing=True, coords=coords)
commons_cat = hit["commons_cat"]
url = commons_cat_start + urllib.parse.quote(commons_cat.replace(" ", "_"))
return dict(
commons_cat={"title": commons_cat, "url": url},
coords=coords,
admin_level=hit.get("admin_level"),
wikidata=hit["wikidata"],
)

139
lookup.py
View file

@ -1,19 +1,11 @@
#!/usr/bin/python3 #!/usr/bin/python3
from flask import Flask, render_template, request, jsonify, redirect, url_for from flask import Flask, render_template, request, jsonify, redirect, url_for
import geocode
from geocode import wikidata, scotland, database, model from geocode import wikidata, scotland, database, model
import urllib.parse import geocode
import random import random
# select gid, code, name from scotland where st_contains(geom, ST_Transform(ST_SetSRID(ST_MakePoint(-4.177, 55.7644), 4326), 27700));
commons_cat_start = "https://commons.wikimedia.org/wiki/Category:"
wd_entity = "http://www.wikidata.org/entity/Q"
city_of_london_qid = "Q23311" city_of_london_qid = "Q23311"
app = Flask(__name__) app = Flask(__name__)
app.config.from_object("config.default") app.config.from_object("config.default")
database.init_app(app) database.init_app(app)
@ -31,31 +23,6 @@ def get_random_lat_lon():
return lat, lon return lat, lon
def wd_to_qid(wd):
# expecting {"type": "url", "value": "https://www.wikidata.org/wiki/Q30"}
if wd["type"] == "uri":
return wd_uri_to_qid(wd["value"])
def wd_uri_to_qid(value):
assert value.startswith(wd_entity)
return value[len(wd_entity) - 1 :]
def build_dict(hit, lat, lon):
coords = {"lat": lat, "lon": lon}
if hit is None:
return dict(commons_cat=None, missing=True, coords=coords)
commons_cat = hit["commons_cat"]
url = commons_cat_start + urllib.parse.quote(commons_cat.replace(" ", "_"))
return dict(
commons_cat={"title": commons_cat, "url": url},
coords=coords,
admin_level=hit.get("admin_level"),
wikidata=hit["wikidata"],
)
def do_lookup(elements, lat, lon): def do_lookup(elements, lat, lon):
try: try:
hit = osm_lookup(elements, lat, lon) hit = osm_lookup(elements, lat, lon)
@ -66,58 +33,17 @@ def do_lookup(elements, lat, lon):
"query_url": "https://query.wikidata.org/#" + e.query, "query_url": "https://query.wikidata.org/#" + e.query,
} }
return build_dict(hit, lat, lon) return wikidata.build_dict(hit, lat, lon)
def wdqs_geosearch_query(lat, lon):
if isinstance(lat, float):
lat = f"{lat:f}"
if isinstance(lon, float):
lon = f"{lon:f}"
query = render_template("sparql/geosearch.sparql", lat=lat, lon=lon)
return wikidata.wdqs(query)
def wdqs_geosearch(lat, lon):
default_max_dist = 1
rows = wdqs_geosearch_query(lat, lon)
max_dist = {
"Q188509": 1, # suburb
"Q3957": 2, # town
"Q532": 1, # village
"Q5084": 1, # hamlet
"Q515": 2, # city
"Q1549591": 3, # big city
}
for row in rows:
isa = wd_uri_to_qid(row["isa"]["value"])
if (
"commonsCat" not in row
and "commonsSiteLink" not in row
and isa not in max_dist
):
continue
distance = float(row["distance"]["value"])
if distance > max_dist.get(isa, default_max_dist):
continue
if "commonsCat" not in row and "commonsSiteLink" not in row:
break
return row
def lat_lon_to_wikidata(lat, lon): def lat_lon_to_wikidata(lat, lon):
scotland_code = scotland.get_scotland_code(lat, lon) scotland_code = scotland.get_scotland_code(lat, lon)
if scotland_code: if scotland_code:
rows = lookup_scottish_parish_in_wikidata(scotland_code) rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code)
hit = commons_from_rows(rows) hit = wikidata.commons_from_rows(rows)
elements = [] elements = []
result = build_dict(hit, lat, lon) result = wikidata.build_dict(hit, lat, lon)
return {"elements": elements, "result": result} return {"elements": elements, "result": result}
@ -133,52 +59,15 @@ def lat_lon_to_wikidata(lat, lon):
if not admin_level or admin_level >= 7: if not admin_level or admin_level >= 7:
return {"elements": elements, "result": result} return {"elements": elements, "result": result}
row = wdqs_geosearch(lat, lon) row = wikidata.geosearch(lat, lon)
if row: if row:
hit = commons_from_rows([row]) hit = wikidata.commons_from_rows([row])
elements = [] elements = []
result = build_dict(hit, lat, lon) result = wikidata.build_dict(hit, lat, lon)
return {"elements": elements, "result": result} return {"elements": elements, "result": result}
def lookup_scottish_parish_in_wikidata(code):
query = render_template("sparql/scottish_parish.sparql", code=code)
return wikidata.wdqs(query)
def lookup_gss_in_wikidata(gss):
query = render_template("sparql/lookup_gss.sparql", gss=gss)
return wikidata.wdqs(query)
def lookup_wikidata_by_name(name, lat, lon):
query = render_template(
"sparql/lookup_by_name.sparql", name=repr(name), lat=str(lat), lon=str(lon)
)
return wikidata.wdqs(query)
def unescape_title(t):
return urllib.parse.unquote(t.replace("_", " "))
def commons_from_rows(rows):
for row in rows:
if "commonsCat" in row:
qid = wd_to_qid(row["item"])
return {"wikidata": qid, "commons_cat": row["commonsCat"]["value"]}
if "commonsSiteLink" in row:
site_link = row["commonsSiteLink"]["value"]
qid = wd_to_qid(row["item"])
cat = unescape_title(site_link[len(commons_cat_start) :])
return {"wikidata": qid, "commons_cat": cat}
def get_commons_cat_from_gss(gss):
return commons_from_rows(lookup_gss_in_wikidata(gss))
def osm_lookup(elements, lat, lon): def osm_lookup(elements, lat, lon):
elements = sorted(elements, key=lambda e: e.area) elements = sorted(elements, key=lambda e: e.area)
@ -203,7 +92,7 @@ def osm_lookup(elements, lat, lon):
} }
gss = tags.get("ref:gss") gss = tags.get("ref:gss")
if gss: if gss:
ret = get_commons_cat_from_gss(gss) ret = wikidata.get_commons_cat_from_gss(gss)
if ret: if ret:
ret["admin_level"] = admin_level ret["admin_level"] = admin_level
return ret return ret
@ -213,10 +102,10 @@ def osm_lookup(elements, lat, lon):
continue continue
if name.endswith(" CP"): if name.endswith(" CP"):
name = name[:-3] name = name[:-3]
rows = lookup_wikidata_by_name(name, lat, lon) rows = wikidata.lookup_wikidata_by_name(name, lat, lon)
if len(rows) == 1: if len(rows) == 1:
ret = commons_from_rows(rows) ret = wikidata.commons_from_rows(rows)
if ret: if ret:
ret["admin_level"] = admin_level ret["admin_level"] = admin_level
return ret return ret
@ -269,10 +158,10 @@ def wikidata_tag():
scotland_code = scotland.get_scotland_code(lat, lon) scotland_code = scotland.get_scotland_code(lat, lon)
if scotland_code: if scotland_code:
rows = lookup_scottish_parish_in_wikidata(scotland_code) rows = wikidata.lookup_scottish_parish_in_wikidata(scotland_code)
hit = commons_from_rows(rows) hit = wikidata.commons_from_rows(rows)
elements = [] elements = []
result = build_dict(hit, lat, lon) result = wikidata.build_dict(hit, lat, lon)
else: else:
elements = model.Polygon.coords_within(lat, lon) elements = model.Polygon.coords_within(lat, lon)
result = do_lookup(elements, lat, lon) result = do_lookup(elements, lat, lon)