diff --git a/matcher/api.py b/matcher/api.py index 38ca8fa..3395035 100644 --- a/matcher/api.py +++ b/matcher/api.py @@ -1,8 +1,8 @@ +import collections import json import os.path import re import typing -from collections import Counter, defaultdict import flask from sqlalchemy import and_, func, or_, text @@ -15,6 +15,8 @@ from sqlalchemy.types import Float from matcher import database, model, wikidata, wikidata_api from matcher.planet import line, point, polygon +TagsType = dict[str, str] + srid = 4326 re_point = re.compile(r"^POINT\((.+) (.+)\)$") entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"} @@ -121,7 +123,7 @@ def make_envelope_around_point(lat: float, lon: float, distance: float): return func.ST_MakeEnvelope(west, south, east, north, srid) -def drop_way_area(tags: dict[str, str]) -> dict[str, str]: +def drop_way_area(tags: TagsType) -> TagsType: """Remove the way_area field from a tags dict.""" if "way_area" in tags: del tags["way_area"] @@ -198,7 +200,7 @@ def get_and_save_item(qid: str) -> model.Item | None: def get_isa_count(items: list[model.Item]) -> list[tuple[str, int]]: """List of IsA counts.""" - isa_count: Counter[str] = Counter() + isa_count: collections.Counter[str] = collections.Counter() for item in items: if not item: continue @@ -332,7 +334,7 @@ def get_item_tags(item: model.Item) -> dict[str, list[str]]: (isa, []) for isa in get_items(isa_list) ] - osm_list = defaultdict(list) + osm_list = collections.defaultdict(list) skip_isa = {row[0] for row in database.session.query(model.SkipIsA.item_id)} @@ -392,7 +394,7 @@ def get_tags_for_isa_item(item): isa_list = [item.item_id] isa_items = [(item, [])] - osm_list = defaultdict(list) + osm_list = collections.defaultdict(list) skip_isa = {row[0] for row in database.session.query(model.SkipIsA.item_id)} @@ -566,10 +568,10 @@ def get_preset_translations(): return {} -def get_presets_from_tags(ending, tags): +def get_presets_from_tags(ending: str, tags: TagsType) -> list[dict[str, typing.Any]]: translations = get_preset_translations() - found = [] + found: list[dict[str, typing.Any]] = [] for k, v in tags.items(): if k == "amenity" and v == "clock" and tags.get("display") == "sundial": @@ -594,7 +596,8 @@ def get_presets_from_tags(ending, tags): return found -def find_preset_file(k, v, ending): +def find_preset_file(k: str, v: str, ending: str) -> dict[str, str] | None: + """Find preset file.""" app = flask.current_app ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"] preset_dir = os.path.join(ts_dir, "data", "presets") @@ -631,18 +634,22 @@ def find_preset_file(k, v, ending): "filename": filename, } + return None -def address_from_tags(tags): + +def address_from_tags(tags: TagsType) -> str | None: + """Build list of addresses based on OSM tags.""" keys = ["street", "housenumber"] if not all("addr:" + k in tags for k in keys): - return + return None if flask.g.street_number_first: keys.reverse() return " ".join(tags["addr:" + k] for k in keys) -def address_node_label(tags): +def address_node_label(tags: TagsType) -> str | None: + """Label for an OSM node, based on tags.""" address = address_from_tags(tags) return f"{tags['name']} ({address})" if "name" in tags else address @@ -659,7 +666,8 @@ def get_address_nodes_within_building(osm_id, bbox_list): return [node.tags for node in q] -def osm_display_name(tags): +def osm_display_name(tags: dict[str, str]) -> str | None: + """Get name to display from OSM tags.""" keys = ( "bridge:name", "tunnel:name", @@ -668,9 +676,7 @@ def osm_display_name(tags): "addr:housename", "inscription", ) - for key in keys: - if key in tags: - return tags[key] + return next((tags[key] for key in keys if key in tags), None) def street_address_in_tags(tags): @@ -855,7 +861,7 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None): if part_of: cur["part_of"] = part_of - if address := address_from_tags(tags): + if address := address_from_tags(typing.cast(TagsType, tags)): cur["address"] = address nearby.append(cur) @@ -863,25 +869,28 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None): return nearby -def get_item(item_id): +def get_item(item_id: int) -> model.Item | None: """Retrieve a Wikidata item, either from the database or from Wikidata.""" - item = model.Item.query.get(item_id) return item or get_and_save_item(f"Q{item_id}") -def get_item_street_addresses(item): +def get_item_street_addresses(item: model.Item) -> list[str]: + """Hunt for street addresses for the given item.""" street_address = [addr["text"] for addr in item.get_claim("P6375") if addr] if street_address or "P669" not in item.claims: return street_address - for claim in item.claims["P669"]: + assert isinstance(item.claims, dict) + claims: wikidata.Claims = item.claims + for claim in claims["P669"]: qualifiers = claim.get("qualifiers") if not qualifiers or "P670" not in qualifiers: continue number = qualifiers["P670"][0]["datavalue"]["value"] street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"]) + assert street_item street = street_item.label() for q in qualifiers["P670"]: number = q["datavalue"]["value"] @@ -1059,7 +1068,15 @@ def isa_incremental_search(search_terms): return ret -def get_place_items(osm_type, osm_id): +class PlaceItems(typing.TypedDict): + """Place items.""" + + count: int + items: list[model.Item] + + +def get_place_items(osm_type: int, osm_id: int) -> PlaceItems: + """Return place items for given osm_type and osm_id.""" src_id = osm_id * {"way": 1, "relation": -1}[osm_type] q = (