diff --git a/lookup.py b/lookup.py index 5d741f7..96d3f38 100755 --- a/lookup.py +++ b/lookup.py @@ -61,6 +61,7 @@ samples = [ (51.4520, -2.6210, "Bristol"), ] + class QueryError(Exception): def __init__(self, query, r): self.query = query @@ -72,7 +73,7 @@ app.debug = True def get_random_lat_lon(): - ''' Select random lat/lon within the UK ''' + """ Select random lat/lon within the UK """ south, east = 50.8520, 0.3536 north, west = 53.7984, -2.7296 @@ -90,7 +91,9 @@ def random_location(): elements = get_osm_elements(lat, lon) result = do_lookup(elements, lat, lon) - return render_template("random.html", lat=lat, lon=lon, result=result, elements=elements) + return render_template( + "random.html", lat=lat, lon=lon, result=result, elements=elements + ) @app.route("/wikidata_tag") @@ -109,7 +112,9 @@ def wikidata_tag(): elements = get_osm_elements(lat, lon) result = do_lookup(elements, lat, lon) - return render_template("wikidata_tag.html", lat=lat, lon=lon, result=result, elements=elements) + return render_template( + "wikidata_tag.html", lat=lat, lon=lon, result=result, elements=elements + ) @app.route("/detail") @@ -139,7 +144,7 @@ def wd_to_qid(wd): def wd_uri_to_qid(value): assert value.startswith(wd_entity) - return value[len(wd_entity) - 1:] + return value[len(wd_entity) - 1 :] def build_dict(hit, lat, lon): @@ -148,10 +153,12 @@ def build_dict(hit, lat, lon): return dict(commons_cat=None, missing=True, coords=coords) commons_cat = hit["commons_cat"] url = commons_cat_start + urllib.parse.quote(commons_cat.replace(" ", "_")) - return dict(commons_cat={"title": commons_cat, "url": url}, - coords=coords, - admin_level=hit.get("admin_level"), - wikidata=hit["wikidata"]) + return dict( + commons_cat={"title": commons_cat, "url": url}, + coords=coords, + admin_level=hit.get("admin_level"), + wikidata=hit["wikidata"], + ) def do_lookup(elements, lat, lon): @@ -168,7 +175,9 @@ def do_lookup(elements, lat, lon): def get_scotland_code(lat, lon): - conn = psycopg2.connect(dbname="geocode", user="geocode", password="ooK3ohgh", host="localhost") + conn = psycopg2.connect( + dbname="geocode", user="geocode", password="ooK3ohgh", host="localhost" + ) cur = conn.cursor() point = f"ST_Transform(ST_SetSRID(ST_MakePoint({lon}, {lat}), 4326), 27700)" @@ -177,7 +186,9 @@ def get_scotland_code(lat, lon): # expand search, disabled for now 2020-04-20 if not row: - cur.execute(f"select code, name from scotland where ST_DWithin(geom, {point}, 100);") + cur.execute( + f"select code, name from scotland where ST_DWithin(geom, {point}, 100);" + ) row = cur.fetchone() conn.close() @@ -187,11 +198,11 @@ def get_scotland_code(lat, lon): def wdqs_geosearch_query(lat, lon): if isinstance(lat, float): - lat = f'{lat:f}' + lat = f"{lat:f}" if isinstance(lon, float): - lon = f'{lon:f}' + lon = f"{lon:f}" - query_template = ''' + query_template = """ SELECT DISTINCT ?item ?distance ?itemLabel ?isa ?isaLabel ?commonsCat ?commonsSiteLink WHERE { { @@ -211,35 +222,39 @@ SELECT DISTINCT ?item ?distance ?itemLabel ?isa ?isaLabel ?commonsCat ?commonsSi OPTIONAL { ?commonsSiteLink schema:about ?item; schema:isPartOf . } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } -} ORDER BY (?distance)''' +} ORDER BY (?distance)""" - query = query_template.replace('LAT', lat).replace('LON', lon) + query = query_template.replace("LAT", lat).replace("LON", lon) reply = wdqs(query) - return reply['results']['bindings'] + return reply["results"]["bindings"] def wdqs_geosearch(lat, lon): default_max_dist = 1 rows = wdqs_geosearch_query(lat, lon) max_dist = { - 'Q188509': 1, # suburb - 'Q3957': 2, # town - 'Q532': 1, # village - 'Q5084': 1, # hamlet - 'Q515': 2, # city - 'Q1549591': 3, # big city + "Q188509": 1, # suburb + "Q3957": 2, # town + "Q532": 1, # village + "Q5084": 1, # hamlet + "Q515": 2, # city + "Q1549591": 3, # big city } for row in rows: - isa = wd_uri_to_qid(row['isa']['value']) + isa = wd_uri_to_qid(row["isa"]["value"]) - if ('commonsCat' not in row and 'commonsSiteLink' not in row and isa not in max_dist): + if ( + "commonsCat" not in row + and "commonsSiteLink" not in row + and isa not in max_dist + ): continue - distance = float(row['distance']['value']) + distance = float(row["distance"]["value"]) if distance > max_dist.get(isa, default_max_dist): continue - if 'commonsCat' not in row and 'commonsSiteLink' not in row: + if "commonsCat" not in row and "commonsSiteLink" not in row: break return row @@ -254,19 +269,19 @@ def lat_lon_to_wikidata(lat, lon): elements = [] result = build_dict(hit, lat, lon) - return {'elements': elements, 'result': result} + return {"elements": elements, "result": result} elements = get_osm_elements(lat, lon) result = do_lookup(elements, lat, lon) # special case because the City of London is admin_level=6 in OSM - if result['wikidata'] == city_of_london_qid: - return {'elements': elements, 'result': result} + if result["wikidata"] == city_of_london_qid: + return {"elements": elements, "result": result} - admin_level = result['admin_level'] + admin_level = result["admin_level"] if not admin_level or admin_level >= 7: - return {'elements': elements, 'result': result} + return {"elements": elements, "result": result} row = wdqs_geosearch(lat, lon) if row: @@ -274,7 +289,7 @@ def lat_lon_to_wikidata(lat, lon): elements = [] result = build_dict(hit, lat, lon) - return {'elements': elements, 'result': result} + return {"elements": elements, "result": result} @app.route("/") @@ -297,7 +312,7 @@ def wikidata_api_call(params): return requests.get( "https://www.wikidata.org/w/api.php", params={"format": "json", "formatversion": 2, **params}, - headers=headers + headers=headers, ).json() @@ -324,9 +339,7 @@ def qid_to_commons_category(qid): def wdqs(query): r = requests.post( - wikidata_query_api_url, - data={"query": query, "format": "json"}, - headers=headers + wikidata_query_api_url, data={"query": query, "format": "json"}, headers=headers ) try: @@ -336,27 +349,27 @@ def wdqs(query): def run_query(oql, error_on_rate_limit=True): - return requests.post(OVERPASS_URL + '/api/interpreter', - data=oql.encode('utf-8'), - headers=headers) + return requests.post( + OVERPASS_URL + "/api/interpreter", data=oql.encode("utf-8"), headers=headers + ) def get_elements(oql): - return run_query(oql).json()['elements'] + return run_query(oql).json()["elements"] def is_in_lat_lon(lat, lon): - oql = f''' + oql = f""" [out:json][timeout:25]; is_in({lat},{lon})->.a; (way(pivot.a); rel(pivot.a);); -out bb tags qt;''' +out bb tags qt;""" return run_query(oql) def lookup_scottish_parish_in_wikidata(code): - query = ''' + query = """ SELECT ?item ?itemLabel ?commonsSiteLink ?commonsCat WHERE { ?item wdt:P528 "CODE" . ?item wdt:P31 wd:Q5124673 . @@ -365,13 +378,15 @@ SELECT ?item ?itemLabel ?commonsSiteLink ?commonsCat WHERE { OPTIONAL { ?item wdt:P373 ?commonsCat } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } -'''.replace('CODE', code) +""".replace( + "CODE", code + ) reply = wdqs(query) - return reply['results']['bindings'] + return reply["results"]["bindings"] def lookup_gss_in_wikidata(gss): - query = ''' + query = """ SELECT ?item ?itemLabel ?commonsSiteLink ?commonsCat WHERE { ?item wdt:P836 GSS . OPTIONAL { ?commonsSiteLink schema:about ?item ; @@ -379,13 +394,16 @@ SELECT ?item ?itemLabel ?commonsSiteLink ?commonsCat WHERE { OPTIONAL { ?item wdt:P373 ?commonsCat } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } -'''.replace('GSS', repr(gss)) +""".replace( + "GSS", repr(gss) + ) reply = wdqs(query) - return reply['results']['bindings'] + return reply["results"]["bindings"] def lookup_wikidata_by_name(name, lat, lon): - query = ''' + query = ( + """ SELECT DISTINCT ?item ?itemLabel ?commonsSiteLink ?commonsCat WHERE { ?item rdfs:label LABEL@en . FILTER NOT EXISTS { ?item wdt:P31 wd:Q17362920 } .# ignore Wikimedia duplicated page @@ -399,10 +417,15 @@ SELECT DISTINCT ?item ?itemLabel ?commonsSiteLink ?commonsCat WHERE { SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } -'''.replace('LABEL', repr(name)).replace('LAT', str(lat)).replace('LON', str(lon)) +""".replace( + "LABEL", repr(name) + ) + .replace("LAT", str(lat)) + .replace("LON", str(lon)) + ) reply = wdqs(query) - return reply['results']['bindings'] + return reply["results"]["bindings"] def unescape_title(t): @@ -413,12 +436,11 @@ def commons_from_rows(rows): for row in rows: if "commonsCat" in row: qid = wd_to_qid(row["item"]) - return {"wikidata": qid, - "commons_cat": row["commonsCat"]["value"]} + return {"wikidata": qid, "commons_cat": row["commonsCat"]["value"]} if "commonsSiteLink" in row: site_link = row["commonsSiteLink"]["value"] qid = wd_to_qid(row["item"]) - cat = unescape_title(site_link[len(commons_cat_start):]) + cat = unescape_title(site_link[len(commons_cat_start) :]) return {"wikidata": qid, "commons_cat": cat} @@ -448,7 +470,11 @@ def osm_lookup(elements, lat, lon): continue tags = e["tags"] admin_level_tag = tags.get("admin_level") - admin_level = int(admin_level_tag) if admin_level_tag and admin_level_tag.isdigit() else None + admin_level = ( + int(admin_level_tag) + if admin_level_tag and admin_level_tag.isdigit() + else None + ) if not admin_level and tags.get("boundary") != "political": continue if "wikidata" in tags: