Edward Betts 2023-05-13 21:57:58 +02:00
parent 9f8ccf95f1
commit 733ca3aa8f
4 changed files with 366 additions and 231 deletions


@@ -1,19 +1,21 @@
from sqlalchemy import func, or_, and_, text
import json
import os.path
import re
from collections import Counter, defaultdict
from flask import current_app, g
from sqlalchemy import and_, func, or_, text
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import selectinload
from sqlalchemy.sql import select
from sqlalchemy.sql.expression import literal, union, cast, column
from sqlalchemy.sql.expression import cast, column, literal, union
from sqlalchemy.types import Float
from sqlalchemy.dialects import postgresql
from matcher.planet import point, line, polygon
from matcher import model, database, wikidata_api, wikidata
from collections import Counter, defaultdict
from flask import g, current_app
import re
import os.path
import json
from matcher import database, model, wikidata, wikidata_api
from matcher.planet import line, point, polygon
srid = 4326
re_point = re.compile(r'^POINT\((.+) (.+)\)$')
re_point = re.compile(r"^POINT\((.+) (.+)\)$")
entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
tag_prefixes = {
@@ -37,19 +39,19 @@ skip_tags = {
"Key:brand",
}
def get_country_iso3166_1(lat, lon):
"""
For a given lat/lon return a set of ISO country codes.
def get_country_iso3166_1(lat: float, lon: float) -> set[str]:
"""For a given lat/lon return a set of ISO country codes.
Also cache the country code in the global object.
Normally there should be only one country.
"""
point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), srid)
alpha2_codes = set()
q = model.Polygon.query.filter(func.ST_Covers(model.Polygon.way, point),
model.Polygon.admin_level == "2")
q = model.Polygon.query.filter(
func.ST_Covers(model.Polygon.way, point), model.Polygon.admin_level == "2"
)
for country in q:
alpha2 = country.tags.get("ISO3166-1")
if not alpha2:
@@ -60,22 +62,23 @@ def get_country_iso3166_1(lat, lon):
return alpha2_codes
def is_street_number_first(lat, lon):
def is_street_number_first(lat: float, lon: float) -> bool:
"""Is lat/lon within a country that puts number first in a street address."""
if lat is None or lon is None:
return True
alpha2 = get_country_iso3166_1(lat, lon)
# Incomplete list of countries that put street number first.
alpha2_number_first = {
'GB', # United Kingdom
'IE', # Ireland
'US', # United States
'MX', # Mexico
'CA', # Canada
'FR', # France
'AU', # Australia
'NZ', # New Zealand
'ZA', # South Africa
"GB", # United Kingdom
"IE", # Ireland
"US", # United States
"MX", # Mexico
"CA", # Canada
"FR", # France
"AU", # Australia
"NZ", # New Zealand
"ZA", # South Africa
}
return bool(alpha2_number_first & alpha2)
@@ -84,22 +87,26 @@ def is_street_number_first(lat, lon):
def make_envelope(bounds):
return func.ST_MakeEnvelope(*bounds, srid)
def get_bbox_centroid(bbox):
bbox = make_envelope(bbox)
centroid = database.session.query(func.ST_AsText(func.ST_Centroid(bbox))).scalar()
return reversed(re_point.match(centroid).groups())
def make_envelope_around_point(lat, lon, distance):
conn = database.session.connection()
p = func.ST_MakePoint(lon, lat)
s = select([
s = select(
[
func.ST_AsText(func.ST_Project(p, distance, func.radians(0))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(90))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(180))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(270))),
])
]
)
row = conn.execute(s).fetchone()
coords = [[float(v) for v in re_point.match(i).groups()] for i in row]
@@ -110,49 +117,64 @@ def make_envelope_around_point(lat, lon, distance):
return func.ST_MakeEnvelope(west, south, east, north, srid)
def drop_way_area(tags):
def drop_way_area(tags: dict[str, str]) -> dict[str, str]:
"""Remove the way_area field from a tags dict."""
if "way_area" in tags:
del tags["way_area"]
return tags
def get_part_of(table_name, src_id, bbox):
table_map = {'point': point, 'line': line, 'polygon': polygon}
table_map = {"point": point, "line": line, "polygon": polygon}
table_alias = table_map[table_name].alias()
s = (select([polygon.c.osm_id,
s = (
select(
[
polygon.c.osm_id,
polygon.c.tags,
func.ST_Area(func.ST_Collect(polygon.c.way))]).
where(and_(func.ST_Intersects(bbox, polygon.c.way),
func.ST_Area(func.ST_Collect(polygon.c.way)),
]
)
.where(
and_(
func.ST_Intersects(bbox, polygon.c.way),
func.ST_Covers(polygon.c.way, table_alias.c.way),
table_alias.c.osm_id == src_id,
polygon.c.tags.has_key("name"),
or_(
polygon.c.tags.has_key("landuse"),
polygon.c.tags.has_key("amenity"),
))).
group_by(polygon.c.osm_id, polygon.c.tags))
),
)
)
.group_by(polygon.c.osm_id, polygon.c.tags)
)
conn = database.session.connection()
return [{
return [
{
"type": "way" if osm_id > 0 else "relation",
"id": abs(osm_id),
"tags": tags,
"area": area,
} for osm_id, tags, area in conn.execute(s)]
}
for osm_id, tags, area in conn.execute(s)
]
def get_and_save_item(qid):
def get_and_save_item(qid: str) -> model.Item | None:
"""Download an item from Wikidata and cache it in the database."""
entity = wikidata_api.get_entity(qid)
entity_qid = entity["id"]
if entity_qid != qid:
print(f'redirect {qid} -> {entity_qid}')
print(f"redirect {qid} -> {entity_qid}")
item = model.Item.query.get(entity_qid[1:])
return item
if "claims" not in entity:
return
return None
coords = wikidata.get_entity_coords(entity["claims"])
item_id = int(qid[1:])
@@ -171,8 +193,9 @@ def get_and_save_item(qid):
return item
def get_isa_count(items):
isa_count = Counter()
def get_isa_count(items: list[model.Item]) -> list[tuple[int, int]]:
"""List of IsA counts."""
isa_count: Counter[int] = Counter()
for item in items:
if not item:
continue
@@ -199,13 +222,11 @@ def get_items_in_bbox(bbox):
def get_osm_with_wikidata_tag(bbox, isa_filter=None):
bbox_str = ','.join(str(v) for v in bbox)
bbox_str = ",".join(str(v) for v in bbox)
extra_sql = ""
if isa_filter:
q = (
model.Item.query.join(model.ItemLocation)
.filter(func.ST_Covers(make_envelope(bbox),
model.ItemLocation.location))
q = model.Item.query.join(model.ItemLocation).filter(
func.ST_Covers(make_envelope(bbox), model.ItemLocation.location)
)
q = add_isa_filter(q, isa_filter)
qids = [isa.qid for isa in q]
@@ -216,7 +237,8 @@ def get_osm_with_wikidata_tag(bbox, isa_filter=None):
extra_sql += f" AND tags -> 'wikidata' in ({qid_list})"
# easier than building this query with SQLAlchemy
sql = f'''
sql = (
f"""
SELECT tbl, osm_id, tags, ARRAY[ST_Y(centroid), ST_X(centroid)], geojson
FROM (
SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson
@@ -235,24 +257,29 @@ UNION
HAVING st_area(st_collect(way)) < 20 * st_area(ST_MakeEnvelope({bbox_str}, {srid}))
) as anon
WHERE tags ? 'wikidata'
''' + extra_sql
"""
+ extra_sql
)
conn = database.session.connection()
result = conn.execute(text(sql))
print(sql)
point_sql = f'''
point_sql = (
f"""
SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson
FROM planet_osm_point
WHERE ST_Intersects(ST_MakeEnvelope({bbox_str}, {srid}), way) and tags ? 'wikidata'
''' + extra_sql
"""
+ extra_sql
)
print("point")
print(point_sql)
tagged = []
for tbl, osm_id, tags, centroid, geojson in result:
if tbl == 'point':
if tbl == "point":
osm_type = "node"
else:
osm_type = "way" if osm_id > 0 else "relation"
@@ -260,7 +287,8 @@ WHERE tags ? 'wikidata'
name = tags.get("name") or tags.get("addr:housename") or "[no label]"
tagged.append({
tagged.append(
{
"identifier": f"{osm_type}/{osm_id}",
"id": osm_id,
"type": osm_type,
@@ -268,7 +296,8 @@ WHERE tags ? 'wikidata'
"centroid": centroid,
"name": name,
"wikidata": tags["wikidata"],
})
}
)
return tagged
@@ -310,11 +339,13 @@ def get_item_tags(item):
isa, isa_path = isa_items.pop()
if not isa:
continue
isa_path = isa_path + [{'qid': isa.qid, 'label': isa.label()}]
isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}]
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
osm += [extra.tag_or_key
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)]
osm += [
extra.tag_or_key
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)
]
for i in osm:
osm_list[i].append(isa_path[:])
@@ -369,14 +400,16 @@ def get_tags_for_isa_item(item):
isa, isa_path = isa_items.pop()
if not isa:
continue
isa_path = isa_path + [{'qid': isa.qid, 'label': isa.label()}]
isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}]
if isa.item_id not in items_checked_done:
items_checked.append({'qid': isa.qid, 'label': isa.label()})
items_checked.append({"qid": isa.qid, "label": isa.label()})
items_checked_done.add(isa.item_id)
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
osm += [extra.tag_or_key
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)]
osm += [
extra.tag_or_key
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)
]
for i in osm:
osm_list[i].append(isa_path[:])
@@ -403,34 +436,31 @@ def get_tags_for_isa_item(item):
seen.update(isa_list)
isa_items += [(isa, isa_path) for isa in get_items(isa_list)]
return {
'tags': {key: list(values) for key, values in osm_list.items()},
'checked': items_checked,
"tags": {key: list(values) for key, values in osm_list.items()},
"checked": items_checked,
}
def add_isa_filter(q, isa_qids):
q_subclass = database.session.query(model.Item.qid).filter(
func.jsonb_path_query_array(
model.Item.claims,
'$.P279[*].mainsnak.datavalue.value.id',
).bool_op('?|')(list(isa_qids))
"$.P279[*].mainsnak.datavalue.value.id",
).bool_op("?|")(list(isa_qids))
)
subclass_qid = {qid for qid, in q_subclass.all()}
isa = func.jsonb_path_query_array(
model.Item.claims,
'$.P31[*].mainsnak.datavalue.value.id',
).bool_op('?|')
"$.P31[*].mainsnak.datavalue.value.id",
).bool_op("?|")
return q.filter(isa(list(isa_qids | subclass_qid)))
def wikidata_items_count(bounds, isa_filter=None):
q = (
model.Item.query.join(model.ItemLocation)
.filter(func.ST_Covers(make_envelope(bounds), model.ItemLocation.location))
q = model.Item.query.join(model.ItemLocation).filter(
func.ST_Covers(make_envelope(bounds), model.ItemLocation.location)
)
if isa_filter:
@@ -440,12 +470,12 @@ def wikidata_items_count(bounds, isa_filter=None):
return q.count()
def wikidata_isa_counts(bounds, isa_filter=None):
db_bbox = make_envelope(bounds)
q = (
model.Item.query.join(model.ItemLocation)
.filter(func.ST_Covers(db_bbox, model.ItemLocation.location))
q = model.Item.query.join(model.ItemLocation).filter(
func.ST_Covers(db_bbox, model.ItemLocation.location)
)
if isa_filter:
@@ -474,12 +504,13 @@ def wikidata_isa_counts(bounds, isa_filter=None):
return isa_count
def get_tag_filter(tags, tag_list):
tag_filter = []
for tag_or_key in tag_list:
if tag_or_key.startswith("Key:"):
key = tag_or_key[4:]
tag_filter.append(and_(tags.has_key(key), tags[key] != 'no'))
tag_filter.append(and_(tags.has_key(key), tags[key] != "no"))
for prefix in tag_prefixes:
tag_filter.append(tags.has_key(f"{prefix}:{key}"))
@@ -495,11 +526,11 @@ def get_tag_filter(tags, tag_list):
def get_preset_translations():
app = current_app
country_language = {
'AU': 'en-AU', # Australia
'GB': 'en-GB', # United Kingdom
'IE': 'en-GB', # Ireland
'IN': 'en-IN', # India
'NZ': 'en-NZ', # New Zealand
"AU": "en-AU", # Australia
"GB": "en-GB", # United Kingdom
"IE": "en-GB", # Ireland
"IN": "en-IN", # India
"NZ": "en-NZ", # New Zealand
}
ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"]
translation_dir = os.path.join(ts_dir, "dist", "translations")
@@ -520,13 +551,14 @@ def get_preset_translations():
return {}
def get_presets_from_tags(ending, tags):
translations = get_preset_translations()
found = []
for k, v in tags.items():
if k == 'amenity' and v == 'clock' and tags.get('display') == 'sundial':
if k == "amenity" and v == "clock" and tags.get("display") == "sundial":
tag_or_key = f"Tag:{k}={v}"
found.append({"tag_or_key": tag_or_key, "name": "Sundial"})
continue
@@ -604,8 +636,7 @@ def address_node_label(tags):
def get_address_nodes_within_building(osm_id, bbox_list):
q = model.Point.query.filter(
polygon.c.osm_id == osm_id,
or_(*[func.ST_Intersects(bbox, model.Point.way)
for bbox in bbox_list]),
or_(*[func.ST_Intersects(bbox, model.Point.way) for bbox in bbox_list]),
func.ST_Covers(polygon.c.way, model.Point.way),
model.Point.tags.has_key("addr:street"),
model.Point.tags.has_key("addr:housenumber"),
@@ -615,8 +646,14 @@ def get_address_nodes_within_building(osm_id, bbox_list):
def osm_display_name(tags):
keys = ("bridge:name", "tunnel:name", "lock_name", "name", "addr:housename",
"inscription")
keys = (
"bridge:name",
"tunnel:name",
"lock_name",
"name",
"addr:housename",
"inscription",
)
for key in keys:
if key in tags:
return tags[key]
@@ -625,6 +662,7 @@ def osm_display_name(tags):
def street_address_in_tags(tags):
return "addr:housenumber" in tags and "addr:street" in tags
def find_osm_candidates(item, limit=80, max_distance=450, names=None):
item_id = item.item_id
item_is_linear_feature = item.is_linear_feature()
@@ -637,51 +675,94 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
check_is_street_number_first(item.locations[0].get_lat_lon())
bbox_list = [make_envelope_around_point(*loc.get_lat_lon(), max_distance)
for loc in item.locations]
bbox_list = [
make_envelope_around_point(*loc.get_lat_lon(), max_distance)
for loc in item.locations
]
null_area = cast(None, Float)
dist = column('dist')
tags = column('tags', postgresql.HSTORE)
dist = column("dist")
tags = column("tags", postgresql.HSTORE)
tag_list = get_item_tags(item)
# tag_filters = get_tag_filter(point.c.tags, tag_list)
# print(tag_filters)
s_point = (select([literal('point').label('t'), point.c.osm_id, point.c.tags.label('tags'),
func.min(func.ST_DistanceSphere(model.ItemLocation.location, point.c.way)).label('dist'),
s_point = (
select(
[
literal("point").label("t"),
point.c.osm_id,
point.c.tags.label("tags"),
func.min(
func.ST_DistanceSphere(model.ItemLocation.location, point.c.way)
).label("dist"),
func.ST_AsText(point.c.way),
func.ST_AsGeoJSON(point.c.way),
null_area]).
where(and_(
or_(*[func.ST_Intersects(bbox, point.c.way)
for bbox in bbox_list]),
null_area,
]
)
.where(
and_(
or_(*[func.ST_Intersects(bbox, point.c.way) for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(point.c.tags, tag_list)))).
group_by(point.c.osm_id, point.c.tags, point.c.way))
or_(*get_tag_filter(point.c.tags, tag_list)),
)
)
.group_by(point.c.osm_id, point.c.tags, point.c.way)
)
s_line = (select([literal('line').label('t'), line.c.osm_id, line.c.tags.label('tags'),
func.min(func.ST_DistanceSphere(model.ItemLocation.location, line.c.way)).label('dist'),
s_line = (
select(
[
literal("line").label("t"),
line.c.osm_id,
line.c.tags.label("tags"),
func.min(
func.ST_DistanceSphere(model.ItemLocation.location, line.c.way)
).label("dist"),
func.ST_AsText(func.ST_Centroid(func.ST_Collect(line.c.way))),
func.ST_AsGeoJSON(func.ST_Collect(line.c.way)),
null_area]).
where(and_(
null_area,
]
)
.where(
and_(
or_(*[func.ST_Intersects(bbox, line.c.way) for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(line.c.tags, tag_list)))).
group_by(line.c.osm_id, line.c.tags))
or_(*get_tag_filter(line.c.tags, tag_list)),
)
)
.group_by(line.c.osm_id, line.c.tags)
)
s_polygon = (select([literal('polygon').label('t'), polygon.c.osm_id, polygon.c.tags.label('tags'),
func.min(func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way)).label('dist'),
s_polygon = (
select(
[
literal("polygon").label("t"),
polygon.c.osm_id,
polygon.c.tags.label("tags"),
func.min(
func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way)
).label("dist"),
func.ST_AsText(func.ST_Centroid(func.ST_Collect(polygon.c.way))),
func.ST_AsGeoJSON(func.ST_Collect(polygon.c.way)),
func.ST_Area(func.ST_Collect(polygon.c.way))]).
where(and_(
func.ST_Area(func.ST_Collect(polygon.c.way)),
]
)
.where(
and_(
or_(*[func.ST_Intersects(bbox, polygon.c.way) for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(polygon.c.tags, tag_list)))).
group_by(polygon.c.osm_id, polygon.c.tags).
having(func.ST_Area(func.ST_Collect(polygon.c.way)) < 20 * func.ST_Area(bbox_list[0])))
or_(*get_tag_filter(polygon.c.tags, tag_list)),
)
)
.group_by(polygon.c.osm_id, polygon.c.tags)
.having(
func.ST_Area(func.ST_Collect(polygon.c.way))
< 20 * func.ST_Area(bbox_list[0])
)
)
tables = ([] if item_is_linear_feature else [s_point]) + [s_line, s_polygon]
s = select([union(*tables).alias()]).where(dist < max_distance).order_by(dist)
@@ -695,10 +776,14 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
s = s.where(tags.has_key("name"))
if "Key:amenity" in tag_list:
s = s.where(and_(tags["amenity"] != "bicycle_parking",
s = s.where(
and_(
tags["amenity"] != "bicycle_parking",
tags["amenity"] != "bicycle_repair_station",
tags["amenity"] != "atm",
tags["amenity"] != "recycling"))
tags["amenity"] != "recycling",
)
)
if limit:
s = s.limit(limit)
@@ -750,8 +835,9 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
part_of = []
for bbox in bbox_list:
part_of += [i for i in get_part_of(table, src_id, bbox)
if i["tags"]["name"] != name]
part_of += [
i for i in get_part_of(table, src_id, bbox) if i["tags"]["name"] != name
]
if part_of:
cur["part_of"] = part_of
@@ -762,6 +848,7 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
return nearby
def get_item(item_id):
"""Retrieve a Wikidata item, either from the database or from Wikidata."""
@@ -776,7 +863,7 @@ def get_item_street_addresses(item):
for claim in item.claims["P669"]:
qualifiers = claim.get("qualifiers")
if not qualifiers or 'P670' not in qualifiers:
if not qualifiers or "P670" not in qualifiers:
continue
number = qualifiers["P670"][0]["datavalue"]["value"]
@@ -784,24 +871,26 @@ def get_item_street_addresses(item):
street = street_item.label()
for q in qualifiers["P670"]:
number = q["datavalue"]["value"]
address = (f"{number} {street}"
if g.street_number_first
else f"{street} {number}")
address = (
f"{number} {street}" if g.street_number_first else f"{street} {number}"
)
street_address.append(address)
return street_address
def check_is_street_number_first(latlng):
g.street_number_first = is_street_number_first(*latlng)
def item_detail(item):
unsupported_relation_types = {
'Q194356', # wind farm
'Q2175765', # tram stop
"Q194356", # wind farm
"Q2175765", # tram stop
}
locations = [list(i.get_lat_lon()) for i in item.locations]
if not hasattr(g, 'street_number_first'):
if not hasattr(g, "street_number_first"):
g.street_number_first = is_street_number_first(*locations[0])
image_filenames = item.get_claim("P18")
@@ -811,20 +900,24 @@ def item_detail(item):
heritage_designation = []
for v in item.get_claim("P1435"):
if not v:
print('heritage designation missing:', item.qid)
print("heritage designation missing:", item.qid)
continue
heritage_designation_item = get_item(v["numeric-id"])
heritage_designation.append({
heritage_designation.append(
{
"qid": v["id"],
"label": heritage_designation_item.label(),
})
}
)
isa_items = [get_item(isa["numeric-id"]) for isa in item.get_isa()]
isa_lookup = {isa.qid: isa for isa in isa_items}
wikipedia_links = [{"lang": site[:-4], "title": link["title"]}
wikipedia_links = [
{"lang": site[:-4], "title": link["title"]}
for site, link in sorted(item.sitelinks.items())
if site.endswith("wiki") and len(site) < 8]
if site.endswith("wiki") and len(site) < 8
]
d = {
"qid": item.qid,
@@ -833,7 +926,9 @@ def item_detail(item):
"markers": locations,
"image_list": image_filenames,
"street_address": street_address,
"isa_list": [{"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa],
"isa_list": [
{"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa
],
"closed": item.closed(),
"inception": item.time_claim("P571"),
"p1619": item.time_claim("P1619"),
@@ -851,8 +946,9 @@ def item_detail(item):
unsupported = isa_lookup.keys() & unsupported_relation_types
if unsupported:
d["unsupported_relation_types"] = [isa for isa in d["isa_list"]
if isa["qid"] in isa_lookup]
d["unsupported_relation_types"] = [
isa for isa in d["isa_list"] if isa["qid"] in isa_lookup
]
return d
@@ -891,7 +987,7 @@ def wikidata_items(bounds, isa_filter=None):
}
isa_count.append(isa)
return {'items': items, 'isa_count': isa_count}
return {"items": items, "isa_count": isa_count}
def missing_wikidata_items(qids, lat, lon):
@@ -926,6 +1022,7 @@ def missing_wikidata_items(qids, lat, lon):
return dict(items=items, isa_count=isa_count)
def isa_incremental_search(search_terms):
en_label = func.jsonb_extract_path_text(model.Item.labels, "en", "value")
q = model.Item.query.filter(
@@ -945,13 +1042,18 @@ def isa_incremental_search(search_terms):
ret.append(cur)
return ret
def get_place_items(osm_type, osm_id):
src_id = osm_id * {'way': 1, 'relation': -1}[osm_type]
q = (model.Item.query
.join(model.ItemLocation)
.join(model.Polygon, func.ST_Covers(model.Polygon.way, model.ItemLocation.location))
.filter(model.Polygon.src_id == src_id))
def get_place_items(osm_type, osm_id):
src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
q = (
model.Item.query.join(model.ItemLocation)
.join(
model.Polygon,
func.ST_Covers(model.Polygon.way, model.ItemLocation.location),
)
.filter(model.Polygon.src_id == src_id)
)
# sql = q.statement.compile(compile_kwargs={"literal_binds": True})
item_count = q.count()


@@ -1,5 +1,9 @@
import requests
"""Use mediawiki API to look up images on Wikimedia Commons."""
import urllib.parse
import requests
from . import utils
commons_start = "http://commons.wikimedia.org/wiki/Special:FilePath/"
@@ -7,11 +11,13 @@ commons_url = "https://www.wikidata.org/w/api.php"
page_size = 50
def commons_uri_to_filename(uri):
def commons_uri_to_filename(uri: str) -> str:
"""Given the URI for a file on commons return the filename of the file."""
return urllib.parse.unquote(utils.drop_start(uri, commons_start))
def api_call(params):
def api_call(params: dict[str, str | int]) -> requests.models.Response:
"""Make an API call."""
call_params = {
"format": "json",
"formatversion": 2,
@@ -22,6 +28,7 @@ def api_call(params):
def image_detail(filenames, thumbheight=None, thumbwidth=None):
"""Detail for multiple images."""
params = {
"action": "query",
"prop": "imageinfo",


@@ -1,24 +1,32 @@
from sqlalchemy import Table, Column, Integer, String, Float, MetaData
from sqlalchemy.dialects import postgresql
"""Planet tables."""
from geoalchemy2 import Geometry
from sqlalchemy import Column, Float, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
metadata = MetaData()
point = Table("planet_osm_point", metadata,
point = Table(
"planet_osm_point",
metadata,
Column("osm_id", Integer),
Column("name", String),
Column("tags", postgresql.HSTORE),
Column("way", Geometry("GEOMETRY", srid=4326, spatial_index=True), nullable=False),
)
line = Table("planet_osm_line", metadata,
line = Table(
"planet_osm_line",
metadata,
Column("osm_id", Integer),
Column("name", String),
Column("tags", postgresql.HSTORE),
Column("way", Geometry("GEOMETRY", srid=4326, spatial_index=True), nullable=False),
)
polygon = Table("planet_osm_polygon", metadata,
polygon = Table(
"planet_osm_polygon",
metadata,
Column("osm_id", Integer),
Column("name", String),
Column("tags", postgresql.HSTORE),


@@ -1,97 +1,114 @@
from flask import current_app, request
from itertools import islice
import os.path
import json
import math
import user_agents
import os.path
import re
import typing
from datetime import date
from itertools import islice
from typing import Any, cast
import flask
import user_agents
from num2words import num2words
metres_per_mile = 1609.344
feet_per_metre = 3.28084
feet_per_mile = 5280
T = typing.TypeVar("T")
def chunk(it, size):
def chunk(it: typing.Iterable[T], size: int) -> typing.Iterator[tuple[T, ...]]:
"""Split an iterable into chunks of the given size."""
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
def flatten(l):
return [item for sublist in l for item in sublist]
def flatten(top_list: list[list[T]]) -> list[T]:
"""Flatten a list."""
return [item for sub_list in top_list for item in sub_list]
def drop_start(s, start):
def drop_start(s: str, start: str) -> str:
"""Remove string prefix, otherwise throw an error."""
assert s.startswith(start)
return s[len(start) :]
def remove_start(s, start):
def remove_start(s: str, start: str) -> str:
"""Remove a string prefix, if present."""
return s[len(start) :] if s.startswith(start) else s
def normalize_url(url):
def normalize_url(url: str) -> str:
"""Standardize URLs to help in comparison."""
for start in "http://", "https://", "www.":
url = remove_start(url, start)
return url.rstrip("/")
def contains_digit(s):
def contains_digit(s: str) -> bool:
"""Check if string contains a digit."""
return any(c.isdigit() for c in s)
def cache_dir():
return current_app.config["CACHE_DIR"]
def cache_dir() -> str:
"""Get cache dir location."""
d: str = flask.current_app.config["CACHE_DIR"]
return d
def cache_filename(filename):
def cache_filename(filename: str) -> str:
"""Get absolute path for cache file."""
return os.path.join(cache_dir(), filename)
def load_from_cache(filename):
def load_from_cache(filename: str) -> Any:
"""Load JSON data from cache."""
return json.load(open(cache_filename(filename)))
def get_radius(default=1000):
arg_radius = request.args.get("radius")
def get_radius(default: int = 1000) -> int | None:
"""Get radius request argument with default."""
arg_radius = flask.request.args.get("radius")
return int(arg_radius) if arg_radius and arg_radius.isdigit() else default
def get_int_arg(name):
if name in request.args and request.args[name].isdigit():
return int(request.args[name])
def get_int_arg(name: str) -> int | None:
"""Get an request arg and convert to integer."""
v = flask.request.args.get(name)
return int(v) if v and v.isdigit() else None
def calc_chunk_size(area_in_sq_km, size=22):
def calc_chunk_size(area_in_sq_km: float, size: int = 22) -> int:
"""Work out the size of a chunk."""
side = math.sqrt(area_in_sq_km)
return max(1, math.ceil(side / size))
def file_missing_or_empty(filename):
def file_missing_or_empty(filename: str) -> bool:
"""Check if a file is missing or empty."""
return not os.path.exists(filename) or os.stat(filename).st_size == 0
def is_bot():
""" Is the current request from a web robot? """
ua = request.headers.get("User-Agent")
return ua and user_agents.parse(ua).is_bot
def is_bot() -> bool:
"""Is the current request from a web robot."""
ua = flask.request.headers.get("User-Agent")
return bool(ua and user_agents.parse(ua).is_bot)
def log_location():
return current_app.config["LOG_DIR"]
def log_location() -> str:
"""Get log location from Flask config."""
return cast(str, flask.current_app.config["LOG_DIR"])
def good_location():
return os.path.join(log_location(), "complete")
def capfirst(value):
def capfirst(value: str) -> str:
"""Uppercase first letter of string, leave rest as is."""
return value[0].upper() + value[1:] if value else value
def any_upper(value):
def any_upper(value: str) -> bool:
"""Check if string contains any uppercase characters."""
return any(c.isupper() for c in value)
@@ -102,7 +119,8 @@ def find_log_file(place):
return f.path
def get_free_space(config):
def get_free_space(config: flask.config.Config) -> int:
"""Return the amount of available free space."""
s = os.statvfs(config["FREE_SPACE_PATH"])
return s.f_bsize * s.f_bavail
@@ -132,12 +150,12 @@ def display_distance(units, dist):
return f"{dist / 1000:,.2f} km"
def is_in_range(address_range: str, address: str) -> bool:
"""Check if an address is within a range."""
re_range = re.compile(r"\b(\d+) ?(?:to|-) ?(\d+)\b", re.I)
re_number_list = re.compile(r"\b([\d, ]+) (?:and|&) (\d+)\b", re.I)
re_number = re.compile(r"^(?:No\.?|Number)? ?(\d+)\b")
def is_in_range(address_range, address):
m_number = re_number.match(address)
if not m_number:
return False