diff --git a/lookup.py b/lookup.py index d727738..5d741f7 100755 --- a/lookup.py +++ b/lookup.py @@ -12,54 +12,53 @@ from geopy.distance import distance # select gid, code, name from scotland where st_contains(geom, ST_Transform(ST_SetSRID(ST_MakePoint(-4.177, 55.7644), 4326), 27700)); -commons_cat_start = 'https://commons.wikimedia.org/wiki/Category:' +commons_cat_start = "https://commons.wikimedia.org/wiki/Category:" use_cache = False headers = { - 'User-Agent': 'UK gecode/0.1 (edward@4angle.com)', + "User-Agent": "UK gecode/0.1 (edward@4angle.com)", } -OVERPASS_URL = 'https://lz4.overpass-api.de' -wikidata_query_api_url = 'https://query.wikidata.org/bigdata/namespace/wdq/sparql' -wikidata_url = 'https://www.wikidata.org/w/api.php' -wd_entity = 'http://www.wikidata.org/entity/Q' -city_of_london_qid = 'Q23311' +OVERPASS_URL = "https://lz4.overpass-api.de" +wikidata_query_api_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql" +wd_entity = "http://www.wikidata.org/entity/Q" +city_of_london_qid = "Q23311" samples = [ - (50.8326, -0.2689, 'Adur'), - (52.4914, -0.69645, 'Corby'), - (50.893, -4.265, 'Newton St Petrock'), - (51.779, 0.128, 'Harlow'), - (52.387, 0.294, 'Ely'), - (50.9, -1.6, 'Minstead'), - (52.43, -1.11, 'South Kilworth'), - (53.117, -0.202, 'Tattershall Thorpe'), - (53.351, -2.701, 'Halton'), - (52.421, -0.651, 'Warkton'), - (51.51, -1.547, 'Lambourn'), - (52.62, -1.884, 'Shenstone'), - (53.309, -1.539, 'Sheffield'), - (53.322, 0.032, 'Haugham'), - (51.05, -2.598, 'Babcary'), - (51.158, -1.906, 'Berwick St James'), - (51.867, -1.204, 'Weston-on-the-Green'), - (51.034, -2.005, 'Ebbesbourne Wake'), - (51.07, -0.718, 'Fernhurst'), - (53.059, -0.144, 'Wildmore'), - (51.473, 0.221, 'Dartford'), - (51.059, 0.05, 'Danehill'), - (52.253, -0.122, 'Papworth Everard'), - (53.498, -0.415, 'West Lindsey'), - (53.392, -0.022, 'Brackenborough with Little Grimsby'), - (53.463, -0.027, 'Fulstow'), - (52.766, 0.31, 'Terrington St Clement'), - (53.1540, -1.8034, 'Hartington Town Quarter'), - (51.8532, -0.8829, 'Fleet Marston'), - (51.4785, -0.354, 'London Borough of Hounslow'), - (51.9687, -0.0327, 'Buckland, Hertfordshire'), - (51.0804, -2.3263, 'Zeals'), - (55.7644, -4.1770, 'East Kilbride'), - (51.4520, -2.6210, 'Bristol'), + (50.8326, -0.2689, "Adur"), + (52.4914, -0.69645, "Corby"), + (50.893, -4.265, "Newton St Petrock"), + (51.779, 0.128, "Harlow"), + (52.387, 0.294, "Ely"), + (50.9, -1.6, "Minstead"), + (52.43, -1.11, "South Kilworth"), + (53.117, -0.202, "Tattershall Thorpe"), + (53.351, -2.701, "Halton"), + (52.421, -0.651, "Warkton"), + (51.51, -1.547, "Lambourn"), + (52.62, -1.884, "Shenstone"), + (53.309, -1.539, "Sheffield"), + (53.322, 0.032, "Haugham"), + (51.05, -2.598, "Babcary"), + (51.158, -1.906, "Berwick St James"), + (51.867, -1.204, "Weston-on-the-Green"), + (51.034, -2.005, "Ebbesbourne Wake"), + (51.07, -0.718, "Fernhurst"), + (53.059, -0.144, "Wildmore"), + (51.473, 0.221, "Dartford"), + (51.059, 0.05, "Danehill"), + (52.253, -0.122, "Papworth Everard"), + (53.498, -0.415, "West Lindsey"), + (53.392, -0.022, "Brackenborough with Little Grimsby"), + (53.463, -0.027, "Fulstow"), + (52.766, 0.31, "Terrington St Clement"), + (53.1540, -1.8034, "Hartington Town Quarter"), + (51.8532, -0.8829, "Fleet Marston"), + (51.4785, -0.354, "London Borough of Hounslow"), + (51.9687, -0.0327, "Buckland, Hertfordshire"), + (51.0804, -2.3263, "Zeals"), + (55.7644, -4.1770, "East Kilbride"), + (51.4520, -2.6210, "Bristol"), ] class QueryError(Exception): @@ -91,13 +90,13 @@ def random_location(): elements = get_osm_elements(lat, lon) result = do_lookup(elements, lat, lon) - return render_template('random.html', lat=lat, lon=lon, result=result, elements=elements) + return render_template("random.html", lat=lat, lon=lon, result=result, elements=elements) @app.route("/wikidata_tag") def wikidata_tag(): - lat = float(request.args.get('lat')) - lon = float(request.args.get('lon')) + lat = float(request.args.get("lat")) + lon = float(request.args.get("lon")) scotland_code = get_scotland_code(lat, lon) @@ -110,32 +109,32 @@ def wikidata_tag(): elements = get_osm_elements(lat, lon) result = do_lookup(elements, lat, lon) - return render_template('wikidata_tag.html', lat=lat, lon=lon, result=result, elements=elements) + return render_template("wikidata_tag.html", lat=lat, lon=lon, result=result, elements=elements) @app.route("/detail") def detail_page(): try: - lat, lon = [float(request.args.get(param)) for param in ('lat', 'lon')] + lat, lon = [float(request.args.get(param)) for param in ("lat", "lon")] except TypeError: - return redirect(url_for('index')) + return redirect(url_for("index")) reply = lat_lon_to_wikidata(lat, lon) - return render_template('random.html', lat=lat, lon=lon, **reply) + return render_template("random.html", lat=lat, lon=lon, **reply) def bounding_box_area(element): - bbox = element['bounds'] + bbox = element["bounds"] - x = distance((bbox['maxlat'], bbox['minlon']), (bbox['maxlat'], bbox['maxlon'])) - y = distance((bbox['minlat'], bbox['maxlon']), (bbox['maxlat'], bbox['minlon'])) + x = distance((bbox["maxlat"], bbox["minlon"]), (bbox["maxlat"], bbox["maxlon"])) + y = distance((bbox["minlat"], bbox["maxlon"]), (bbox["maxlat"], bbox["minlon"])) return x.km * y.km def wd_to_qid(wd): - # expecting {'type': 'url', 'value': 'https://www.wikidata.org/wiki/Q30'} - if wd['type'] == 'uri': - return wd_uri_to_qid(wd['value']) + # expecting {"type": "url", "value": "https://www.wikidata.org/wiki/Q30"} + if wd["type"] == "uri": + return wd_uri_to_qid(wd["value"]) def wd_uri_to_qid(value): @@ -144,15 +143,15 @@ def wd_uri_to_qid(value): def build_dict(hit, lat, lon): - coords = {'lat': lat, 'lon': lon} + coords = {"lat": lat, "lon": lon} if hit is None: return dict(commons_cat=None, missing=True, coords=coords) - commons_cat = hit['commons_cat'] - url = commons_cat_start + urllib.parse.quote(commons_cat.replace(' ', '_')) - return dict(commons_cat={'title': commons_cat, 'url': url}, + commons_cat = hit["commons_cat"] + url = commons_cat_start + urllib.parse.quote(commons_cat.replace(" ", "_")) + return dict(commons_cat={"title": commons_cat, "url": url}, coords=coords, - admin_level=hit.get('admin_level'), - wikidata=hit['wikidata']) + admin_level=hit.get("admin_level"), + wikidata=hit["wikidata"]) def do_lookup(elements, lat, lon): @@ -160,25 +159,25 @@ def do_lookup(elements, lat, lon): hit = osm_lookup(elements, lat, lon) except QueryError as e: return { - 'query': e.query, - 'error': e.r.text, - 'query_url': 'https://query.wikidata.org/#' + e.query, + "query": e.query, + "error": e.r.text, + "query_url": "https://query.wikidata.org/#" + e.query, } return build_dict(hit, lat, lon) def get_scotland_code(lat, lon): - conn = psycopg2.connect(dbname='geocode', user='geocode', password='ooK3ohgh', host='localhost') + conn = psycopg2.connect(dbname="geocode", user="geocode", password="ooK3ohgh", host="localhost") cur = conn.cursor() - point = f'ST_Transform(ST_SetSRID(ST_MakePoint({lon}, {lat}), 4326), 27700)' - cur.execute(f'select code, name from scotland where st_contains(geom, {point});') + point = f"ST_Transform(ST_SetSRID(ST_MakePoint({lon}, {lat}), 4326), 27700)" + cur.execute(f"select code, name from scotland where st_contains(geom, {point});") row = cur.fetchone() # expand search, disabled for now 2020-04-20 if not row: - cur.execute(f'select code, name from scotland where ST_DWithin(geom, {point}, 100);') + cur.execute(f"select code, name from scotland where ST_DWithin(geom, {point}, 100);") row = cur.fetchone() conn.close() @@ -280,45 +279,43 @@ def lat_lon_to_wikidata(lat, lon): @app.route("/") def index(): - q = request.args.get('q') + q = request.args.get("q") if q and q.strip(): - lat, lon = [v.strip() for v in q.split(',', 1)] - return redirect(url_for('detail_page', lat=lat, lon=lon)) + lat, lon = [v.strip() for v in q.split(",", 1)] + return redirect(url_for("detail_page", lat=lat, lon=lon)) - lat = request.args.get('lat') - lon = request.args.get('lon') + lat = request.args.get("lat") + lon = request.args.get("lon") if lat is None or lon is None: samples.sort(key=lambda row: row[2]) - return render_template('index.html', samples=samples) + return render_template("index.html", samples=samples) - return jsonify(lat_lon_to_wikidata(lat, lon)['result']) + return jsonify(lat_lon_to_wikidata(lat, lon)["result"]) def wikidata_api_call(params): - call_params = { - 'format': 'json', - 'formatversion': 2, - **params, - } + return requests.get( + "https://www.wikidata.org/w/api.php", + params={"format": "json", "formatversion": 2, **params}, + headers=headers + ).json() - r = requests.get(wikidata_url, params=call_params, headers=headers) - return r def get_entity(qid): - json_data = wikidata_api_call({'action': 'wbgetentities', 'ids': qid}).json() + json_data = wikidata_api_call({"action": "wbgetentities", "ids": qid}) try: - entity = list(json_data['entities'].values())[0] + entity = list(json_data["entities"].values())[0] except KeyError: return - if 'missing' not in entity: + if "missing" not in entity: return entity def qid_to_commons_category(qid): entity = get_entity(qid) try: - commons_cat = entity['claims']['P373'][0]['mainsnak']['datavalue']['value'] + commons_cat = entity["claims"]["P373"][0]["mainsnak"]["datavalue"]["value"] except Exception: commons_cat = None @@ -326,9 +323,11 @@ def qid_to_commons_category(qid): def wdqs(query): - r = requests.post(wikidata_query_api_url, - data={'query': query, 'format': 'json'}, - headers=headers) + r = requests.post( + wikidata_query_api_url, + data={"query": query, "format": "json"}, + headers=headers + ) try: return r.json() @@ -336,12 +335,8 @@ def wdqs(query): raise QueryError(query, r) -def endpoint(): - return OVERPASS_URL + '/api/interpreter' - - def run_query(oql, error_on_rate_limit=True): - return requests.post(endpoint(), + return requests.post(OVERPASS_URL + '/api/interpreter', data=oql.encode('utf-8'), headers=headers) @@ -411,111 +406,91 @@ SELECT DISTINCT ?item ?itemLabel ?commonsSiteLink ?commonsCat WHERE { def unescape_title(t): - return urllib.parse.unquote(t.replace('_', ' ')) + return urllib.parse.unquote(t.replace("_", " ")) def commons_from_rows(rows): for row in rows: - if 'commonsCat' in row: - qid = wd_to_qid(row['item']) - return {'wikidata': qid, - 'commons_cat': row['commonsCat']['value']} - if 'commonsSiteLink' in row: - site_link = row['commonsSiteLink']['value'] - qid = wd_to_qid(row['item']) + if "commonsCat" in row: + qid = wd_to_qid(row["item"]) + return {"wikidata": qid, + "commons_cat": row["commonsCat"]["value"]} + if "commonsSiteLink" in row: + site_link = row["commonsSiteLink"]["value"] + qid = wd_to_qid(row["item"]) cat = unescape_title(site_link[len(commons_cat_start):]) - return {'wikidata': qid, 'commons_cat': cat} + return {"wikidata": qid, "commons_cat": cat} def get_commons_cat_from_gss(gss): - print('GSS:', gss) - rows = lookup_gss_in_wikidata(gss) - return commons_from_rows(rows) + return commons_from_rows(lookup_gss_in_wikidata(gss)) def get_osm_elements(lat, lon): - filename = f'cache/{lat}_{lon}.json' + filename = f"cache/{lat}_{lon}.json" if use_cache and os.path.exists(filename): - elements = json.load(open(filename))['elements'] + elements = json.load(open(filename))["elements"] else: r = is_in_lat_lon(lat, lon) if use_cache: - open(filename, 'wb').write(r.content) - elements = r.json()['elements'] + open(filename, "wb").write(r.content) + elements = r.json()["elements"] return elements def osm_lookup(elements, lat, lon): - is_in = [] elements.sort(key=lambda e: bounding_box_area(e)) - if False: - for e in sorted(elements, key=lambda e: e['area']): - try: - admin_level = int(e['tags']['admin_level']) - except (ValueError, KeyError): - admin_level = None - - if admin_level is None: - if e['id'] == 6038068: # Great Britain - continue - if 'place' in e['tags'] or e['tags'].get('type') == 'boundary': - is_in.append((99, e['tags'])) - continue - - is_in.append((admin_level, e['tags'])) - - # for _, tags in sorted(is_in, key=lambda i: i[0], reverse=True): for e in elements: - if 'tags' not in e: + if "tags" not in e: continue - tags = e['tags'] - admin_level_tag = tags.get('admin_level') + tags = e["tags"] + admin_level_tag = tags.get("admin_level") admin_level = int(admin_level_tag) if admin_level_tag and admin_level_tag.isdigit() else None - if not admin_level and tags.get('boundary') != 'political': + if not admin_level and tags.get("boundary") != "political": continue - if 'wikidata' in tags: - qid = tags['wikidata'] + if "wikidata" in tags: + qid = tags["wikidata"] commons = qid_to_commons_category(qid) if commons: return { - 'wikidata': qid, - 'commons_cat': commons, - 'admin_level': admin_level, + "wikidata": qid, + "commons_cat": commons, + "admin_level": admin_level, } - gss = tags.get('ref:gss') + gss = tags.get("ref:gss") if gss: ret = get_commons_cat_from_gss(gss) if ret: - ret['admin_level'] = admin_level + ret["admin_level"] = admin_level return ret - name = tags.get('name') + name = tags.get("name") if not name: continue - if name.endswith(' CP'): + if name.endswith(" CP"): name = name[:-3] rows = lookup_wikidata_by_name(name, lat, lon) if len(rows) == 1: ret = commons_from_rows(rows) if ret: - ret['admin_level'] = admin_level + ret["admin_level"] = admin_level return ret - has_wikidata_tag = [e['tags'] for e in elements if 'wikidata' in e['tags']] + has_wikidata_tag = [e["tags"] for e in elements if "wikidata" in e["tags"]] if len(has_wikidata_tag) != 1: return - qid = has_wikidata_tag[0]['wikidata'] + qid = has_wikidata_tag[0]["wikidata"] return { - 'wikidata': qid, - 'commons_cat': qid_to_commons_category(qid), - 'admin_level': admin_level, + "wikidata": qid, + "commons_cat": qid_to_commons_category(qid), + "admin_level": admin_level, } -if __name__ == '__main__': - app.run(host='0.0.0.0') +if __name__ == "__main__": + app.run(host="0.0.0.0")