From 50c222d66a05b10ae29112aa4c4d919cb63550df Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Sat, 4 Nov 2023 20:49:36 +0000 Subject: [PATCH] Split up code into functions to ease comprehension --- lookup.py | 129 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 73 insertions(+), 56 deletions(-) diff --git a/lookup.py b/lookup.py index b776652..575c339 100755 --- a/lookup.py +++ b/lookup.py @@ -15,6 +15,9 @@ app = Flask(__name__) app.config.from_object("config.default") database.init_app(app) +Tags = typing.Mapping[str, str] +Elements = sqlalchemy.orm.query.Query[model.Polygon] + def get_random_lat_lon() -> tuple[float, float]: """Select random lat/lon within the UK.""" @@ -28,12 +31,10 @@ def get_random_lat_lon() -> tuple[float, float]: return lat, lon -Elements = sqlalchemy.orm.query.Query - - def do_lookup( elements: Elements, lat: str | float, lon: str | float ) -> wikidata.WikidataDict: + """Do lookup.""" try: hit = osm_lookup(elements, lat, lon) except wikidata.QueryError as e: @@ -47,6 +48,7 @@ def do_lookup( def lat_lon_to_wikidata(lat: str | float, lon: str | float) -> dict[str, typing.Any]: + """Lookup lat/lon and find most appropriate Wikidata item.""" scotland_code = scotland.get_scotland_code(lat, lon) elements: typing.Any @@ -66,8 +68,11 @@ def lat_lon_to_wikidata(lat: str | float, lon: str | float) -> dict[str, typing. return {"elements": elements, "result": result} admin_level = result.get("admin_level") + if not admin_level: + return {"elements": elements, "result": result} - if not admin_level or admin_level >= 7: + assert isinstance(admin_level, int) + if admin_level >= 7: return {"elements": elements, "result": result} row = wikidata.geosearch(lat, lon) @@ -79,71 +84,83 @@ def lat_lon_to_wikidata(lat: str | float, lon: str | float) -> dict[str, typing. return {"elements": elements, "result": result} +def get_admin_level(tags: Tags) -> int | None: + """Read admin_level from tags.""" + admin_level_tag = tags.get("admin_level") + return ( + int(admin_level_tag) if admin_level_tag and admin_level_tag.isdigit() else None + ) + + +def hit_from_wikidata_tag(tags: Tags) -> wikidata.Hit | None: + """Check element for a wikidata tag.""" + return ( + { + "wikidata": qid, + "commons_cat": commons, + } + if "wikidata" in tags + and (commons := wikidata.qid_to_commons_category(qid := tags["wikidata"])) + else None + ) + + +def hit_from_ref_gss_tag(tags: Tags) -> wikidata.Hit | None: + """Check element for rss:gss tag.""" + gss = tags.get("ref:gss") + return wikidata.get_commons_cat_from_gss(gss) if gss else None + + +def hit_from_name( + tags: Tags, lat: str | float, lon: str | float +) -> wikidata.Hit | None: + """Use name to look for hit.""" + if not (name := tags.get("name")): + return None + if name.endswith(" CP"): # civil parish + name = name[:-3] + + rows = wikidata.lookup_wikidata_by_name(name, lat, lon) + return wikidata.commons_from_rows(rows) if len(rows) == 1 else None + + def osm_lookup( - elements: Elements, lat: str | float, lon: str | float # type:ignore + elements: Elements, lat: str | float, lon: str | float ) -> wikidata.Hit | None: """OSM lookup.""" ret: wikidata.Hit | None for e in elements: assert isinstance(e, model.Polygon) assert e.tags - tags: typing.Mapping[str, typing.Any] = e.tags - admin_level_tag = tags.get("admin_level") - admin_level: int | None = ( - int(admin_level_tag) - if admin_level_tag and admin_level_tag.isdigit() - else None - ) + tags: typing.Mapping[str, str] = e.tags + admin_level: int | None = get_admin_level(tags) if not admin_level and tags.get("boundary") != "political": continue - if "wikidata" in tags: - qid = tags["wikidata"] - commons = wikidata.qid_to_commons_category(qid) - if commons: - return { - "wikidata": qid, - "commons_cat": commons, - "admin_level": admin_level, - "element": e.osm_id, - "geojson": typing.cast(str, e.geojson_str), - } - gss = tags.get("ref:gss") - if gss: - ret = wikidata.get_commons_cat_from_gss(gss) - if ret: - ret["admin_level"] = admin_level - ret["element"] = e.osm_id - ret["geojson"] = typing.cast(str, e.geojson_str) - return ret - - name = tags.get("name") - if not name: + if not ( + (hit := hit_from_wikidata_tag(tags)) + or (hit := hit_from_ref_gss_tag(tags)) + or (hit := hit_from_name(tags, lat, lon)) + ): continue - if name.endswith(" CP"): - name = name[:-3] - rows = wikidata.lookup_wikidata_by_name(name, lat, lon) + hit["admin_level"] = admin_level + hit["element"] = e.osm_id + hit["geojson"] = typing.cast(str, e.geojson_str) + return hit - if len(rows) == 1: - ret = wikidata.commons_from_rows(rows) - if ret: - ret["admin_level"] = admin_level - ret["element"] = e.osm_id - return ret - - has_wikidata_tag = [e.tags for e in elements if e.tags.get("wikidata")] + has_wikidata_tag = [e for e in elements if e.tags.get("wikidata")] if len(has_wikidata_tag) != 1: return None - assert has_wikidata_tag[0] - qid = has_wikidata_tag[0]["wikidata"] - return typing.cast( - wikidata.Hit, - { - "wikidata": qid, - "commons_cat": wikidata.qid_to_commons_category(qid), - "admin_level": admin_level, - }, - ) + e = has_wikidata_tag[0] + assert e.tags + qid = e.tags["wikidata"] + return { + "wikidata": qid, + "element": e.osm_id, + "geojson": typing.cast(str, e.geojson_str), + "commons_cat": wikidata.qid_to_commons_category(qid), + "admin_level": admin_level, + } def redirect_to_detail(q: str) -> Response: @@ -231,7 +248,7 @@ def detail_page() -> Response | str: str=str, element_id=element, geojson=geojson, - **reply + **reply, )