From 733ca3aa8fefaea85caa6aa018926cc5266b50b5 Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Sat, 13 May 2023 21:57:58 +0200 Subject: [PATCH] Update --- matcher/api.py | 466 +++++++++++++++++++++++++++------------------ matcher/commons.py | 13 +- matcher/planet.py | 18 +- matcher/utils.py | 100 ++++++---- 4 files changed, 366 insertions(+), 231 deletions(-) diff --git a/matcher/api.py b/matcher/api.py index 2199311..5fd8a9c 100644 --- a/matcher/api.py +++ b/matcher/api.py @@ -1,19 +1,21 @@ -from sqlalchemy import func, or_, and_, text +import json +import os.path +import re +from collections import Counter, defaultdict + +from flask import current_app, g +from sqlalchemy import and_, func, or_, text +from sqlalchemy.dialects import postgresql from sqlalchemy.orm import selectinload from sqlalchemy.sql import select -from sqlalchemy.sql.expression import literal, union, cast, column +from sqlalchemy.sql.expression import cast, column, literal, union from sqlalchemy.types import Float -from sqlalchemy.dialects import postgresql -from matcher.planet import point, line, polygon -from matcher import model, database, wikidata_api, wikidata -from collections import Counter, defaultdict -from flask import g, current_app -import re -import os.path -import json + +from matcher import database, model, wikidata, wikidata_api +from matcher.planet import line, point, polygon srid = 4326 -re_point = re.compile(r'^POINT\((.+) (.+)\)$') +re_point = re.compile(r"^POINT\((.+) (.+)\)$") entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"} tag_prefixes = { @@ -37,19 +39,19 @@ skip_tags = { "Key:brand", } -def get_country_iso3166_1(lat, lon): - """ - For a given lat/lon return a set of ISO country codes. + +def get_country_iso3166_1(lat: float, lon: float) -> set[str]: + """For a given lat/lon return a set of ISO country codes. Also cache the country code in the global object. Normally there should be only one country. """ - point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), srid) alpha2_codes = set() - q = model.Polygon.query.filter(func.ST_Covers(model.Polygon.way, point), - model.Polygon.admin_level == "2") + q = model.Polygon.query.filter( + func.ST_Covers(model.Polygon.way, point), model.Polygon.admin_level == "2" + ) for country in q: alpha2 = country.tags.get("ISO3166-1") if not alpha2: @@ -60,22 +62,23 @@ def get_country_iso3166_1(lat, lon): return alpha2_codes -def is_street_number_first(lat, lon): +def is_street_number_first(lat: float, lon: float) -> bool: + """Is lat/lon within a country that puts number first in a street address.""" if lat is None or lon is None: return True alpha2 = get_country_iso3166_1(lat, lon) # Incomplete list of countries that put street number first. alpha2_number_first = { - 'GB', # United Kingdom - 'IE', # Ireland - 'US', # United States - 'MX', # Mexico - 'CA', # Canada - 'FR', # France - 'AU', # Australia - 'NZ', # New Zealand - 'ZA', # South Africa + "GB", # United Kingdom + "IE", # Ireland + "US", # United States + "MX", # Mexico + "CA", # Canada + "FR", # France + "AU", # Australia + "NZ", # New Zealand + "ZA", # South Africa } return bool(alpha2_number_first & alpha2) @@ -84,22 +87,26 @@ def is_street_number_first(lat, lon): def make_envelope(bounds): return func.ST_MakeEnvelope(*bounds, srid) + def get_bbox_centroid(bbox): bbox = make_envelope(bbox) centroid = database.session.query(func.ST_AsText(func.ST_Centroid(bbox))).scalar() return reversed(re_point.match(centroid).groups()) + def make_envelope_around_point(lat, lon, distance): conn = database.session.connection() p = func.ST_MakePoint(lon, lat) - s = select([ - func.ST_AsText(func.ST_Project(p, distance, func.radians(0))), - func.ST_AsText(func.ST_Project(p, distance, func.radians(90))), - func.ST_AsText(func.ST_Project(p, distance, func.radians(180))), - func.ST_AsText(func.ST_Project(p, distance, func.radians(270))), - ]) + s = select( + [ + func.ST_AsText(func.ST_Project(p, distance, func.radians(0))), + func.ST_AsText(func.ST_Project(p, distance, func.radians(90))), + func.ST_AsText(func.ST_Project(p, distance, func.radians(180))), + func.ST_AsText(func.ST_Project(p, distance, func.radians(270))), + ] + ) row = conn.execute(s).fetchone() coords = [[float(v) for v in re_point.match(i).groups()] for i in row] @@ -110,49 +117,64 @@ def make_envelope_around_point(lat, lon, distance): return func.ST_MakeEnvelope(west, south, east, north, srid) -def drop_way_area(tags): - """ Remove the way_area field from a tags dict. """ + +def drop_way_area(tags: dict[str, str]) -> dict[str, str]: + """Remove the way_area field from a tags dict.""" if "way_area" in tags: del tags["way_area"] return tags + def get_part_of(table_name, src_id, bbox): - table_map = {'point': point, 'line': line, 'polygon': polygon} + table_map = {"point": point, "line": line, "polygon": polygon} table_alias = table_map[table_name].alias() - s = (select([polygon.c.osm_id, - polygon.c.tags, - func.ST_Area(func.ST_Collect(polygon.c.way))]). - where(and_(func.ST_Intersects(bbox, polygon.c.way), - func.ST_Covers(polygon.c.way, table_alias.c.way), - table_alias.c.osm_id == src_id, - polygon.c.tags.has_key("name"), - or_( - polygon.c.tags.has_key("landuse"), - polygon.c.tags.has_key("amenity"), - ))). - group_by(polygon.c.osm_id, polygon.c.tags)) + s = ( + select( + [ + polygon.c.osm_id, + polygon.c.tags, + func.ST_Area(func.ST_Collect(polygon.c.way)), + ] + ) + .where( + and_( + func.ST_Intersects(bbox, polygon.c.way), + func.ST_Covers(polygon.c.way, table_alias.c.way), + table_alias.c.osm_id == src_id, + polygon.c.tags.has_key("name"), + or_( + polygon.c.tags.has_key("landuse"), + polygon.c.tags.has_key("amenity"), + ), + ) + ) + .group_by(polygon.c.osm_id, polygon.c.tags) + ) conn = database.session.connection() - return [{ - "type": "way" if osm_id > 0 else "relation", - "id": abs(osm_id), - "tags": tags, - "area": area, - } for osm_id, tags, area in conn.execute(s)] + return [ + { + "type": "way" if osm_id > 0 else "relation", + "id": abs(osm_id), + "tags": tags, + "area": area, + } + for osm_id, tags, area in conn.execute(s) + ] -def get_and_save_item(qid): - """ Download an item from Wikidata and cache it in the database. """ +def get_and_save_item(qid: str) -> model.Item | None: + """Download an item from Wikidata and cache it in the database.""" entity = wikidata_api.get_entity(qid) entity_qid = entity["id"] if entity_qid != qid: - print(f'redirect {qid} -> {entity_qid}') + print(f"redirect {qid} -> {entity_qid}") item = model.Item.query.get(entity_qid[1:]) return item if "claims" not in entity: - return + return None coords = wikidata.get_entity_coords(entity["claims"]) item_id = int(qid[1:]) @@ -171,8 +193,9 @@ def get_and_save_item(qid): return item -def get_isa_count(items): - isa_count = Counter() +def get_isa_count(items: list[model.Item]) -> list[tuple[int, int]]: + """List of IsA counts.""" + isa_count: Counter[int] = Counter() for item in items: if not item: continue @@ -199,13 +222,11 @@ def get_items_in_bbox(bbox): def get_osm_with_wikidata_tag(bbox, isa_filter=None): - bbox_str = ','.join(str(v) for v in bbox) + bbox_str = ",".join(str(v) for v in bbox) extra_sql = "" if isa_filter: - q = ( - model.Item.query.join(model.ItemLocation) - .filter(func.ST_Covers(make_envelope(bbox), - model.ItemLocation.location)) + q = model.Item.query.join(model.ItemLocation).filter( + func.ST_Covers(make_envelope(bbox), model.ItemLocation.location) ) q = add_isa_filter(q, isa_filter) qids = [isa.qid for isa in q] @@ -216,7 +237,8 @@ def get_osm_with_wikidata_tag(bbox, isa_filter=None): extra_sql += f" AND tags -> 'wikidata' in ({qid_list})" # easier than building this query with SQLAlchemy - sql = f''' + sql = ( + f""" SELECT tbl, osm_id, tags, ARRAY[ST_Y(centroid), ST_X(centroid)], geojson FROM ( SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson @@ -235,24 +257,29 @@ UNION HAVING st_area(st_collect(way)) < 20 * st_area(ST_MakeEnvelope({bbox_str}, {srid})) ) as anon WHERE tags ? 'wikidata' -''' + extra_sql +""" + + extra_sql + ) conn = database.session.connection() result = conn.execute(text(sql)) print(sql) - point_sql = f''' + point_sql = ( + f""" SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson FROM planet_osm_point WHERE ST_Intersects(ST_MakeEnvelope({bbox_str}, {srid}), way) and tags ? 'wikidata' -''' + extra_sql +""" + + extra_sql + ) print("point") print(point_sql) tagged = [] for tbl, osm_id, tags, centroid, geojson in result: - if tbl == 'point': + if tbl == "point": osm_type = "node" else: osm_type = "way" if osm_id > 0 else "relation" @@ -260,15 +287,17 @@ WHERE tags ? 'wikidata' name = tags.get("name") or tags.get("addr:housename") or "[no label]" - tagged.append({ - "identifier": f"{osm_type}/{osm_id}", - "id": osm_id, - "type": osm_type, - "geojson": json.loads(geojson), - "centroid": centroid, - "name": name, - "wikidata": tags["wikidata"], - }) + tagged.append( + { + "identifier": f"{osm_type}/{osm_id}", + "id": osm_id, + "type": osm_type, + "geojson": json.loads(geojson), + "centroid": centroid, + "name": name, + "wikidata": tags["wikidata"], + } + ) return tagged @@ -310,11 +339,13 @@ def get_item_tags(item): isa, isa_path = isa_items.pop() if not isa: continue - isa_path = isa_path + [{'qid': isa.qid, 'label': isa.label()}] + isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}] osm = [v for v in isa.get_claim("P1282") if v not in skip_tags] - osm += [extra.tag_or_key - for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)] + osm += [ + extra.tag_or_key + for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id) + ] for i in osm: osm_list[i].append(isa_path[:]) @@ -369,14 +400,16 @@ def get_tags_for_isa_item(item): isa, isa_path = isa_items.pop() if not isa: continue - isa_path = isa_path + [{'qid': isa.qid, 'label': isa.label()}] + isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}] if isa.item_id not in items_checked_done: - items_checked.append({'qid': isa.qid, 'label': isa.label()}) + items_checked.append({"qid": isa.qid, "label": isa.label()}) items_checked_done.add(isa.item_id) osm = [v for v in isa.get_claim("P1282") if v not in skip_tags] - osm += [extra.tag_or_key - for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)] + osm += [ + extra.tag_or_key + for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id) + ] for i in osm: osm_list[i].append(isa_path[:]) @@ -403,34 +436,31 @@ def get_tags_for_isa_item(item): seen.update(isa_list) isa_items += [(isa, isa_path) for isa in get_items(isa_list)] return { - 'tags': {key: list(values) for key, values in osm_list.items()}, - 'checked': items_checked, + "tags": {key: list(values) for key, values in osm_list.items()}, + "checked": items_checked, } def add_isa_filter(q, isa_qids): - q_subclass = database.session.query(model.Item.qid).filter( func.jsonb_path_query_array( model.Item.claims, - '$.P279[*].mainsnak.datavalue.value.id', - ).bool_op('?|')(list(isa_qids)) + "$.P279[*].mainsnak.datavalue.value.id", + ).bool_op("?|")(list(isa_qids)) ) subclass_qid = {qid for qid, in q_subclass.all()} isa = func.jsonb_path_query_array( model.Item.claims, - '$.P31[*].mainsnak.datavalue.value.id', - ).bool_op('?|') + "$.P31[*].mainsnak.datavalue.value.id", + ).bool_op("?|") return q.filter(isa(list(isa_qids | subclass_qid))) def wikidata_items_count(bounds, isa_filter=None): - - q = ( - model.Item.query.join(model.ItemLocation) - .filter(func.ST_Covers(make_envelope(bounds), model.ItemLocation.location)) + q = model.Item.query.join(model.ItemLocation).filter( + func.ST_Covers(make_envelope(bounds), model.ItemLocation.location) ) if isa_filter: @@ -440,12 +470,12 @@ def wikidata_items_count(bounds, isa_filter=None): return q.count() + def wikidata_isa_counts(bounds, isa_filter=None): db_bbox = make_envelope(bounds) - q = ( - model.Item.query.join(model.ItemLocation) - .filter(func.ST_Covers(db_bbox, model.ItemLocation.location)) + q = model.Item.query.join(model.ItemLocation).filter( + func.ST_Covers(db_bbox, model.ItemLocation.location) ) if isa_filter: @@ -474,12 +504,13 @@ def wikidata_isa_counts(bounds, isa_filter=None): return isa_count + def get_tag_filter(tags, tag_list): tag_filter = [] for tag_or_key in tag_list: if tag_or_key.startswith("Key:"): key = tag_or_key[4:] - tag_filter.append(and_(tags.has_key(key), tags[key] != 'no')) + tag_filter.append(and_(tags.has_key(key), tags[key] != "no")) for prefix in tag_prefixes: tag_filter.append(tags.has_key(f"{prefix}:{key}")) @@ -495,11 +526,11 @@ def get_tag_filter(tags, tag_list): def get_preset_translations(): app = current_app country_language = { - 'AU': 'en-AU', # Australia - 'GB': 'en-GB', # United Kingdom - 'IE': 'en-GB', # Ireland - 'IN': 'en-IN', # India - 'NZ': 'en-NZ', # New Zealand + "AU": "en-AU", # Australia + "GB": "en-GB", # United Kingdom + "IE": "en-GB", # Ireland + "IN": "en-IN", # India + "NZ": "en-NZ", # New Zealand } ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"] translation_dir = os.path.join(ts_dir, "dist", "translations") @@ -520,13 +551,14 @@ def get_preset_translations(): return {} + def get_presets_from_tags(ending, tags): translations = get_preset_translations() found = [] for k, v in tags.items(): - if k == 'amenity' and v == 'clock' and tags.get('display') == 'sundial': + if k == "amenity" and v == "clock" and tags.get("display") == "sundial": tag_or_key = f"Tag:{k}={v}" found.append({"tag_or_key": tag_or_key, "name": "Sundial"}) continue @@ -604,8 +636,7 @@ def address_node_label(tags): def get_address_nodes_within_building(osm_id, bbox_list): q = model.Point.query.filter( polygon.c.osm_id == osm_id, - or_(*[func.ST_Intersects(bbox, model.Point.way) - for bbox in bbox_list]), + or_(*[func.ST_Intersects(bbox, model.Point.way) for bbox in bbox_list]), func.ST_Covers(polygon.c.way, model.Point.way), model.Point.tags.has_key("addr:street"), model.Point.tags.has_key("addr:housenumber"), @@ -615,8 +646,14 @@ def get_address_nodes_within_building(osm_id, bbox_list): def osm_display_name(tags): - keys = ("bridge:name", "tunnel:name", "lock_name", "name", "addr:housename", - "inscription") + keys = ( + "bridge:name", + "tunnel:name", + "lock_name", + "name", + "addr:housename", + "inscription", + ) for key in keys: if key in tags: return tags[key] @@ -625,6 +662,7 @@ def osm_display_name(tags): def street_address_in_tags(tags): return "addr:housenumber" in tags and "addr:street" in tags + def find_osm_candidates(item, limit=80, max_distance=450, names=None): item_id = item.item_id item_is_linear_feature = item.is_linear_feature() @@ -637,51 +675,94 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None): check_is_street_number_first(item.locations[0].get_lat_lon()) - bbox_list = [make_envelope_around_point(*loc.get_lat_lon(), max_distance) - for loc in item.locations] + bbox_list = [ + make_envelope_around_point(*loc.get_lat_lon(), max_distance) + for loc in item.locations + ] null_area = cast(None, Float) - dist = column('dist') - tags = column('tags', postgresql.HSTORE) + dist = column("dist") + tags = column("tags", postgresql.HSTORE) tag_list = get_item_tags(item) # tag_filters = get_tag_filter(point.c.tags, tag_list) # print(tag_filters) - s_point = (select([literal('point').label('t'), point.c.osm_id, point.c.tags.label('tags'), - func.min(func.ST_DistanceSphere(model.ItemLocation.location, point.c.way)).label('dist'), - func.ST_AsText(point.c.way), - func.ST_AsGeoJSON(point.c.way), - null_area]). - where(and_( - or_(*[func.ST_Intersects(bbox, point.c.way) - for bbox in bbox_list]), - model.ItemLocation.item_id == item_id, - or_(*get_tag_filter(point.c.tags, tag_list)))). - group_by(point.c.osm_id, point.c.tags, point.c.way)) + s_point = ( + select( + [ + literal("point").label("t"), + point.c.osm_id, + point.c.tags.label("tags"), + func.min( + func.ST_DistanceSphere(model.ItemLocation.location, point.c.way) + ).label("dist"), + func.ST_AsText(point.c.way), + func.ST_AsGeoJSON(point.c.way), + null_area, + ] + ) + .where( + and_( + or_(*[func.ST_Intersects(bbox, point.c.way) for bbox in bbox_list]), + model.ItemLocation.item_id == item_id, + or_(*get_tag_filter(point.c.tags, tag_list)), + ) + ) + .group_by(point.c.osm_id, point.c.tags, point.c.way) + ) - s_line = (select([literal('line').label('t'), line.c.osm_id, line.c.tags.label('tags'), - func.min(func.ST_DistanceSphere(model.ItemLocation.location, line.c.way)).label('dist'), - func.ST_AsText(func.ST_Centroid(func.ST_Collect(line.c.way))), - func.ST_AsGeoJSON(func.ST_Collect(line.c.way)), - null_area]). - where(and_( - or_(*[func.ST_Intersects(bbox, line.c.way) for bbox in bbox_list]), - model.ItemLocation.item_id == item_id, - or_(*get_tag_filter(line.c.tags, tag_list)))). - group_by(line.c.osm_id, line.c.tags)) + s_line = ( + select( + [ + literal("line").label("t"), + line.c.osm_id, + line.c.tags.label("tags"), + func.min( + func.ST_DistanceSphere(model.ItemLocation.location, line.c.way) + ).label("dist"), + func.ST_AsText(func.ST_Centroid(func.ST_Collect(line.c.way))), + func.ST_AsGeoJSON(func.ST_Collect(line.c.way)), + null_area, + ] + ) + .where( + and_( + or_(*[func.ST_Intersects(bbox, line.c.way) for bbox in bbox_list]), + model.ItemLocation.item_id == item_id, + or_(*get_tag_filter(line.c.tags, tag_list)), + ) + ) + .group_by(line.c.osm_id, line.c.tags) + ) - s_polygon = (select([literal('polygon').label('t'), polygon.c.osm_id, polygon.c.tags.label('tags'), - func.min(func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way)).label('dist'), - func.ST_AsText(func.ST_Centroid(func.ST_Collect(polygon.c.way))), - func.ST_AsGeoJSON(func.ST_Collect(polygon.c.way)), - func.ST_Area(func.ST_Collect(polygon.c.way))]). - where(and_( - or_(*[func.ST_Intersects(bbox, polygon.c.way) for bbox in bbox_list]), - model.ItemLocation.item_id == item_id, - or_(*get_tag_filter(polygon.c.tags, tag_list)))). - group_by(polygon.c.osm_id, polygon.c.tags). - having(func.ST_Area(func.ST_Collect(polygon.c.way)) < 20 * func.ST_Area(bbox_list[0]))) + s_polygon = ( + select( + [ + literal("polygon").label("t"), + polygon.c.osm_id, + polygon.c.tags.label("tags"), + func.min( + func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way) + ).label("dist"), + func.ST_AsText(func.ST_Centroid(func.ST_Collect(polygon.c.way))), + func.ST_AsGeoJSON(func.ST_Collect(polygon.c.way)), + func.ST_Area(func.ST_Collect(polygon.c.way)), + ] + ) + .where( + and_( + or_(*[func.ST_Intersects(bbox, polygon.c.way) for bbox in bbox_list]), + model.ItemLocation.item_id == item_id, + or_(*get_tag_filter(polygon.c.tags, tag_list)), + ) + ) + .group_by(polygon.c.osm_id, polygon.c.tags) + .having( + func.ST_Area(func.ST_Collect(polygon.c.way)) + < 20 * func.ST_Area(bbox_list[0]) + ) + ) tables = ([] if item_is_linear_feature else [s_point]) + [s_line, s_polygon] s = select([union(*tables).alias()]).where(dist < max_distance).order_by(dist) @@ -695,10 +776,14 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None): s = s.where(tags.has_key("name")) if "Key:amenity" in tag_list: - s = s.where(and_(tags["amenity"] != "bicycle_parking", - tags["amenity"] != "bicycle_repair_station", - tags["amenity"] != "atm", - tags["amenity"] != "recycling")) + s = s.where( + and_( + tags["amenity"] != "bicycle_parking", + tags["amenity"] != "bicycle_repair_station", + tags["amenity"] != "atm", + tags["amenity"] != "recycling", + ) + ) if limit: s = s.limit(limit) @@ -750,8 +835,9 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None): part_of = [] for bbox in bbox_list: - part_of += [i for i in get_part_of(table, src_id, bbox) - if i["tags"]["name"] != name] + part_of += [ + i for i in get_part_of(table, src_id, bbox) if i["tags"]["name"] != name + ] if part_of: cur["part_of"] = part_of @@ -762,8 +848,9 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None): return nearby + def get_item(item_id): - """ Retrieve a Wikidata item, either from the database or from Wikidata. """ + """Retrieve a Wikidata item, either from the database or from Wikidata.""" item = model.Item.query.get(item_id) return item or get_and_save_item(f"Q{item_id}") @@ -776,7 +863,7 @@ def get_item_street_addresses(item): for claim in item.claims["P669"]: qualifiers = claim.get("qualifiers") - if not qualifiers or 'P670' not in qualifiers: + if not qualifiers or "P670" not in qualifiers: continue number = qualifiers["P670"][0]["datavalue"]["value"] @@ -784,24 +871,26 @@ def get_item_street_addresses(item): street = street_item.label() for q in qualifiers["P670"]: number = q["datavalue"]["value"] - address = (f"{number} {street}" - if g.street_number_first - else f"{street} {number}") + address = ( + f"{number} {street}" if g.street_number_first else f"{street} {number}" + ) street_address.append(address) return street_address + def check_is_street_number_first(latlng): g.street_number_first = is_street_number_first(*latlng) + def item_detail(item): unsupported_relation_types = { - 'Q194356', # wind farm - 'Q2175765', # tram stop + "Q194356", # wind farm + "Q2175765", # tram stop } locations = [list(i.get_lat_lon()) for i in item.locations] - if not hasattr(g, 'street_number_first'): + if not hasattr(g, "street_number_first"): g.street_number_first = is_street_number_first(*locations[0]) image_filenames = item.get_claim("P18") @@ -811,20 +900,24 @@ def item_detail(item): heritage_designation = [] for v in item.get_claim("P1435"): if not v: - print('heritage designation missing:', item.qid) + print("heritage designation missing:", item.qid) continue heritage_designation_item = get_item(v["numeric-id"]) - heritage_designation.append({ - "qid": v["id"], - "label": heritage_designation_item.label(), - }) + heritage_designation.append( + { + "qid": v["id"], + "label": heritage_designation_item.label(), + } + ) isa_items = [get_item(isa["numeric-id"]) for isa in item.get_isa()] isa_lookup = {isa.qid: isa for isa in isa_items} - wikipedia_links = [{"lang": site[:-4], "title": link["title"]} - for site, link in sorted(item.sitelinks.items()) - if site.endswith("wiki") and len(site) < 8] + wikipedia_links = [ + {"lang": site[:-4], "title": link["title"]} + for site, link in sorted(item.sitelinks.items()) + if site.endswith("wiki") and len(site) < 8 + ] d = { "qid": item.qid, @@ -833,7 +926,9 @@ def item_detail(item): "markers": locations, "image_list": image_filenames, "street_address": street_address, - "isa_list": [{"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa], + "isa_list": [ + {"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa + ], "closed": item.closed(), "inception": item.time_claim("P571"), "p1619": item.time_claim("P1619"), @@ -851,8 +946,9 @@ def item_detail(item): unsupported = isa_lookup.keys() & unsupported_relation_types if unsupported: - d["unsupported_relation_types"] = [isa for isa in d["isa_list"] - if isa["qid"] in isa_lookup] + d["unsupported_relation_types"] = [ + isa for isa in d["isa_list"] if isa["qid"] in isa_lookup + ] return d @@ -891,7 +987,7 @@ def wikidata_items(bounds, isa_filter=None): } isa_count.append(isa) - return {'items': items, 'isa_count': isa_count} + return {"items": items, "isa_count": isa_count} def missing_wikidata_items(qids, lat, lon): @@ -926,12 +1022,13 @@ def missing_wikidata_items(qids, lat, lon): return dict(items=items, isa_count=isa_count) + def isa_incremental_search(search_terms): en_label = func.jsonb_extract_path_text(model.Item.labels, "en", "value") q = model.Item.query.filter( - model.Item.claims.has_key("P1282"), - en_label.ilike(f"%{search_terms}%"), - func.length(en_label) < 20, + model.Item.claims.has_key("P1282"), + en_label.ilike(f"%{search_terms}%"), + func.length(en_label) < 20, ) print(q.statement.compile(compile_kwargs={"literal_binds": True})) @@ -945,13 +1042,18 @@ def isa_incremental_search(search_terms): ret.append(cur) return ret -def get_place_items(osm_type, osm_id): - src_id = osm_id * {'way': 1, 'relation': -1}[osm_type] - q = (model.Item.query - .join(model.ItemLocation) - .join(model.Polygon, func.ST_Covers(model.Polygon.way, model.ItemLocation.location)) - .filter(model.Polygon.src_id == src_id)) +def get_place_items(osm_type, osm_id): + src_id = osm_id * {"way": 1, "relation": -1}[osm_type] + + q = ( + model.Item.query.join(model.ItemLocation) + .join( + model.Polygon, + func.ST_Covers(model.Polygon.way, model.ItemLocation.location), + ) + .filter(model.Polygon.src_id == src_id) + ) # sql = q.statement.compile(compile_kwargs={"literal_binds": True}) item_count = q.count() diff --git a/matcher/commons.py b/matcher/commons.py index b65658c..469566c 100644 --- a/matcher/commons.py +++ b/matcher/commons.py @@ -1,5 +1,9 @@ -import requests +"""Use mediawiki API to look up images on Wikimedia Commons.""" + import urllib.parse + +import requests + from . import utils commons_start = "http://commons.wikimedia.org/wiki/Special:FilePath/" @@ -7,11 +11,13 @@ commons_url = "https://www.wikidata.org/w/api.php" page_size = 50 -def commons_uri_to_filename(uri): +def commons_uri_to_filename(uri: str) -> str: + """Given the URI for a file on commons return the filename of the file.""" return urllib.parse.unquote(utils.drop_start(uri, commons_start)) -def api_call(params): +def api_call(params: dict[str, str | int]) -> requests.models.Response: + """Make an API call.""" call_params = { "format": "json", "formatversion": 2, @@ -22,6 +28,7 @@ def api_call(params): def image_detail(filenames, thumbheight=None, thumbwidth=None): + """Detail for multiple images.""" params = { "action": "query", "prop": "imageinfo", diff --git a/matcher/planet.py b/matcher/planet.py index 57f7b44..15b8016 100644 --- a/matcher/planet.py +++ b/matcher/planet.py @@ -1,24 +1,32 @@ -from sqlalchemy import Table, Column, Integer, String, Float, MetaData -from sqlalchemy.dialects import postgresql +"""Planet tables.""" + from geoalchemy2 import Geometry +from sqlalchemy import Column, Float, Integer, MetaData, String, Table +from sqlalchemy.dialects import postgresql metadata = MetaData() -point = Table("planet_osm_point", metadata, +point = Table( + "planet_osm_point", + metadata, Column("osm_id", Integer), Column("name", String), Column("tags", postgresql.HSTORE), Column("way", Geometry("GEOMETRY", srid=4326, spatial_index=True), nullable=False), ) -line = Table("planet_osm_line", metadata, +line = Table( + "planet_osm_line", + metadata, Column("osm_id", Integer), Column("name", String), Column("tags", postgresql.HSTORE), Column("way", Geometry("GEOMETRY", srid=4326, spatial_index=True), nullable=False), ) -polygon = Table("planet_osm_polygon", metadata, +polygon = Table( + "planet_osm_polygon", + metadata, Column("osm_id", Integer), Column("name", String), Column("tags", postgresql.HSTORE), diff --git a/matcher/utils.py b/matcher/utils.py index 0cace13..8cbdb56 100644 --- a/matcher/utils.py +++ b/matcher/utils.py @@ -1,97 +1,114 @@ -from flask import current_app, request -from itertools import islice -import os.path import json import math -import user_agents +import os.path import re +import typing from datetime import date +from itertools import islice +from typing import Any, cast + +import flask +import user_agents from num2words import num2words metres_per_mile = 1609.344 feet_per_metre = 3.28084 feet_per_mile = 5280 +T = typing.TypeVar("T") -def chunk(it, size): + +def chunk(it: typing.Iterable[T], size: int) -> typing.Iterator[tuple[T, ...]]: + """Split an iterable into chunks of the given size.""" it = iter(it) return iter(lambda: tuple(islice(it, size)), ()) -def flatten(l): - return [item for sublist in l for item in sublist] +def flatten(top_list: list[list[T]]) -> list[T]: + """Flatten a list.""" + return [item for sub_list in top_list for item in sub_list] -def drop_start(s, start): +def drop_start(s: str, start: str) -> str: + """Remove string prefix, otherwise throw an error.""" assert s.startswith(start) return s[len(start) :] -def remove_start(s, start): +def remove_start(s: str, start: str) -> str: + """Remove a string prefix, if present.""" return s[len(start) :] if s.startswith(start) else s -def normalize_url(url): +def normalize_url(url: str) -> str: + """Standardize URLs to help in comparison.""" for start in "http://", "https://", "www.": url = remove_start(url, start) return url.rstrip("/") -def contains_digit(s): +def contains_digit(s: str) -> bool: + """Check if string contains a digit.""" return any(c.isdigit() for c in s) -def cache_dir(): - return current_app.config["CACHE_DIR"] +def cache_dir() -> str: + """Get cache dir location.""" + d: str = flask.current_app.config["CACHE_DIR"] + return d -def cache_filename(filename): +def cache_filename(filename: str) -> str: + """Get absolute path for cache file.""" return os.path.join(cache_dir(), filename) -def load_from_cache(filename): +def load_from_cache(filename: str) -> Any: + """Load JSON data from cache.""" return json.load(open(cache_filename(filename))) -def get_radius(default=1000): - arg_radius = request.args.get("radius") +def get_radius(default: int = 1000) -> int | None: + """Get radius request argument with default.""" + arg_radius = flask.request.args.get("radius") return int(arg_radius) if arg_radius and arg_radius.isdigit() else default -def get_int_arg(name): - if name in request.args and request.args[name].isdigit(): - return int(request.args[name]) +def get_int_arg(name: str) -> int | None: + """Get an request arg and convert to integer.""" + v = flask.request.args.get(name) + return int(v) if v and v.isdigit() else None -def calc_chunk_size(area_in_sq_km, size=22): +def calc_chunk_size(area_in_sq_km: float, size: int = 22) -> int: + """Work out the size of a chunk.""" side = math.sqrt(area_in_sq_km) return max(1, math.ceil(side / size)) -def file_missing_or_empty(filename): +def file_missing_or_empty(filename: str) -> bool: + """Check if a file is missing or empty.""" return os.path.exists(filename) or os.stat(filename).st_size == 0 -def is_bot(): - """ Is the current request from a web robot? """ - ua = request.headers.get("User-Agent") - return ua and user_agents.parse(ua).is_bot +def is_bot() -> bool: + """Is the current request from a web robot.""" + ua = flask.request.headers.get("User-Agent") + return bool(ua and user_agents.parse(ua).is_bot) -def log_location(): - return current_app.config["LOG_DIR"] +def log_location() -> str: + """Get log location from Flask config.""" + return cast(str, flask.current_app.config["LOG_DIR"]) -def good_location(): - return os.path.join(log_location(), "complete") - - -def capfirst(value): - """ Uppercase first letter of string, leave rest as is. """ +def capfirst(value: str) -> str: + """Uppercase first letter of string, leave rest as is.""" return value[0].upper() + value[1:] if value else value -def any_upper(value): +def any_upper(value: str) -> bool: + """Check if string contains any uppercase characters.""" return any(c.isupper() for c in value) @@ -102,7 +119,8 @@ def find_log_file(place): return f.path -def get_free_space(config): +def get_free_space(config: flask.config.Config) -> int: + """Return the amount of available free space.""" s = os.statvfs(config["FREE_SPACE_PATH"]) return s.f_bsize * s.f_bavail @@ -132,12 +150,12 @@ def display_distance(units, dist): return f"{dist / 1000:,.2f} km" -re_range = re.compile(r"\b(\d+) ?(?:to|-) ?(\d+)\b", re.I) -re_number_list = re.compile(r"\b([\d, ]+) (?:and|&) (\d+)\b", re.I) -re_number = re.compile(r"^(?:No\.?|Number)? ?(\d+)\b") +def is_in_range(address_range: str, address: str) -> bool: + """Check if an address is within a range.""" + re_range = re.compile(r"\b(\d+) ?(?:to|-) ?(\d+)\b", re.I) + re_number_list = re.compile(r"\b([\d, ]+) (?:and|&) (\d+)\b", re.I) + re_number = re.compile(r"^(?:No\.?|Number)? ?(\d+)\b") - -def is_in_range(address_range, address): m_number = re_number.match(address) if not m_number: return False