Type hints and docstrings.

This commit is contained in:
Edward Betts 2023-05-14 21:04:26 +00:00
parent 49510843a7
commit b49ac353f5

View file

@ -1,8 +1,8 @@
import collections
import json import json
import os.path import os.path
import re import re
import typing import typing
from collections import Counter, defaultdict
import flask import flask
from sqlalchemy import and_, func, or_, text from sqlalchemy import and_, func, or_, text
@ -15,6 +15,8 @@ from sqlalchemy.types import Float
from matcher import database, model, wikidata, wikidata_api from matcher import database, model, wikidata, wikidata_api
from matcher.planet import line, point, polygon from matcher.planet import line, point, polygon
TagsType = dict[str, str]
srid = 4326 srid = 4326
re_point = re.compile(r"^POINT\((.+) (.+)\)$") re_point = re.compile(r"^POINT\((.+) (.+)\)$")
entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"} entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
@ -121,7 +123,7 @@ def make_envelope_around_point(lat: float, lon: float, distance: float):
return func.ST_MakeEnvelope(west, south, east, north, srid) return func.ST_MakeEnvelope(west, south, east, north, srid)
def drop_way_area(tags: dict[str, str]) -> dict[str, str]: def drop_way_area(tags: TagsType) -> TagsType:
"""Remove the way_area field from a tags dict.""" """Remove the way_area field from a tags dict."""
if "way_area" in tags: if "way_area" in tags:
del tags["way_area"] del tags["way_area"]
@ -198,7 +200,7 @@ def get_and_save_item(qid: str) -> model.Item | None:
def get_isa_count(items: list[model.Item]) -> list[tuple[str, int]]: def get_isa_count(items: list[model.Item]) -> list[tuple[str, int]]:
"""List of IsA counts.""" """List of IsA counts."""
isa_count: Counter[str] = Counter() isa_count: collections.Counter[str] = collections.Counter()
for item in items: for item in items:
if not item: if not item:
continue continue
@ -332,7 +334,7 @@ def get_item_tags(item: model.Item) -> dict[str, list[str]]:
(isa, []) for isa in get_items(isa_list) (isa, []) for isa in get_items(isa_list)
] ]
osm_list = defaultdict(list) osm_list = collections.defaultdict(list)
skip_isa = {row[0] for row in database.session.query(model.SkipIsA.item_id)} skip_isa = {row[0] for row in database.session.query(model.SkipIsA.item_id)}
@ -392,7 +394,7 @@ def get_tags_for_isa_item(item):
isa_list = [item.item_id] isa_list = [item.item_id]
isa_items = [(item, [])] isa_items = [(item, [])]
osm_list = defaultdict(list) osm_list = collections.defaultdict(list)
skip_isa = {row[0] for row in database.session.query(model.SkipIsA.item_id)} skip_isa = {row[0] for row in database.session.query(model.SkipIsA.item_id)}
@ -566,10 +568,10 @@ def get_preset_translations():
return {} return {}
def get_presets_from_tags(ending, tags): def get_presets_from_tags(ending: str, tags: TagsType) -> list[dict[str, typing.Any]]:
translations = get_preset_translations() translations = get_preset_translations()
found = [] found: list[dict[str, typing.Any]] = []
for k, v in tags.items(): for k, v in tags.items():
if k == "amenity" and v == "clock" and tags.get("display") == "sundial": if k == "amenity" and v == "clock" and tags.get("display") == "sundial":
@ -594,7 +596,8 @@ def get_presets_from_tags(ending, tags):
return found return found
def find_preset_file(k, v, ending): def find_preset_file(k: str, v: str, ending: str) -> dict[str, str] | None:
"""Find preset file."""
app = flask.current_app app = flask.current_app
ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"] ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"]
preset_dir = os.path.join(ts_dir, "data", "presets") preset_dir = os.path.join(ts_dir, "data", "presets")
@ -631,18 +634,22 @@ def find_preset_file(k, v, ending):
"filename": filename, "filename": filename,
} }
return None
def address_from_tags(tags):
def address_from_tags(tags: TagsType) -> str | None:
"""Build list of addresses based on OSM tags."""
keys = ["street", "housenumber"] keys = ["street", "housenumber"]
if not all("addr:" + k in tags for k in keys): if not all("addr:" + k in tags for k in keys):
return return None
if flask.g.street_number_first: if flask.g.street_number_first:
keys.reverse() keys.reverse()
return " ".join(tags["addr:" + k] for k in keys) return " ".join(tags["addr:" + k] for k in keys)
def address_node_label(tags): def address_node_label(tags: TagsType) -> str | None:
"""Label for an OSM node, based on tags."""
address = address_from_tags(tags) address = address_from_tags(tags)
return f"{tags['name']} ({address})" if "name" in tags else address return f"{tags['name']} ({address})" if "name" in tags else address
@ -659,7 +666,8 @@ def get_address_nodes_within_building(osm_id, bbox_list):
return [node.tags for node in q] return [node.tags for node in q]
def osm_display_name(tags): def osm_display_name(tags: dict[str, str]) -> str | None:
"""Get name to display from OSM tags."""
keys = ( keys = (
"bridge:name", "bridge:name",
"tunnel:name", "tunnel:name",
@ -668,9 +676,7 @@ def osm_display_name(tags):
"addr:housename", "addr:housename",
"inscription", "inscription",
) )
for key in keys: return next((tags[key] for key in keys if key in tags), None)
if key in tags:
return tags[key]
def street_address_in_tags(tags): def street_address_in_tags(tags):
@ -855,7 +861,7 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
if part_of: if part_of:
cur["part_of"] = part_of cur["part_of"] = part_of
if address := address_from_tags(tags): if address := address_from_tags(typing.cast(TagsType, tags)):
cur["address"] = address cur["address"] = address
nearby.append(cur) nearby.append(cur)
@ -863,25 +869,28 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
return nearby return nearby
def get_item(item_id): def get_item(item_id: int) -> model.Item | None:
"""Retrieve a Wikidata item, either from the database or from Wikidata.""" """Retrieve a Wikidata item, either from the database or from Wikidata."""
item = model.Item.query.get(item_id) item = model.Item.query.get(item_id)
return item or get_and_save_item(f"Q{item_id}") return item or get_and_save_item(f"Q{item_id}")
def get_item_street_addresses(item): def get_item_street_addresses(item: model.Item) -> list[str]:
"""Hunt for street addresses for the given item."""
street_address = [addr["text"] for addr in item.get_claim("P6375") if addr] street_address = [addr["text"] for addr in item.get_claim("P6375") if addr]
if street_address or "P669" not in item.claims: if street_address or "P669" not in item.claims:
return street_address return street_address
for claim in item.claims["P669"]: assert isinstance(item.claims, dict)
claims: wikidata.Claims = item.claims
for claim in claims["P669"]:
qualifiers = claim.get("qualifiers") qualifiers = claim.get("qualifiers")
if not qualifiers or "P670" not in qualifiers: if not qualifiers or "P670" not in qualifiers:
continue continue
number = qualifiers["P670"][0]["datavalue"]["value"] number = qualifiers["P670"][0]["datavalue"]["value"]
street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"]) street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"])
assert street_item
street = street_item.label() street = street_item.label()
for q in qualifiers["P670"]: for q in qualifiers["P670"]:
number = q["datavalue"]["value"] number = q["datavalue"]["value"]
@ -1059,7 +1068,15 @@ def isa_incremental_search(search_terms):
return ret return ret
def get_place_items(osm_type, osm_id): class PlaceItems(typing.TypedDict):
"""Place items."""
count: int
items: list[model.Item]
def get_place_items(osm_type: int, osm_id: int) -> PlaceItems:
"""Return place items for given osm_type and osm_id."""
src_id = osm_id * {"way": 1, "relation": -1}[osm_type] src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
q = ( q = (