forked from edward/owl-map
1070 lines
31 KiB
Python
1070 lines
31 KiB
Python
import json
|
|
import os.path
|
|
import re
|
|
from collections import Counter, defaultdict
|
|
|
|
from flask import current_app, g
|
|
from sqlalchemy import and_, func, or_, text
|
|
from sqlalchemy.dialects import postgresql
|
|
from sqlalchemy.orm import selectinload
|
|
from sqlalchemy.sql import select
|
|
from sqlalchemy.sql.expression import cast, column, literal, union
|
|
from sqlalchemy.types import Float
|
|
|
|
from matcher import database, model, wikidata, wikidata_api
|
|
from matcher.planet import line, point, polygon
|
|
|
|
srid = 4326
|
|
re_point = re.compile(r"^POINT\((.+) (.+)\)$")
|
|
entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
|
|
|
|
tag_prefixes = {
|
|
"disused",
|
|
"was",
|
|
"abandoned",
|
|
"demolished",
|
|
"destroyed",
|
|
"ruins",
|
|
"historic",
|
|
}
|
|
|
|
# these tags are too generic, so we ignore them
|
|
skip_tags = {
|
|
"Key:addr",
|
|
"Key:addr:street",
|
|
"Key:lit",
|
|
"Key:image",
|
|
"Key:name",
|
|
"Key:symbol",
|
|
"Key:brand",
|
|
}
|
|
|
|
|
|
def get_country_iso3166_1(lat: float, lon: float) -> set[str]:
|
|
"""For a given lat/lon return a set of ISO country codes.
|
|
|
|
Also cache the country code in the global object.
|
|
|
|
Normally there should be only one country.
|
|
"""
|
|
point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), srid)
|
|
alpha2_codes: set[str] = set()
|
|
q = model.Polygon.query.filter(
|
|
func.ST_Covers(model.Polygon.way, point), model.Polygon.admin_level == "2"
|
|
)
|
|
for country in q:
|
|
alpha2: str = country.tags.get("ISO3166-1")
|
|
if not alpha2:
|
|
continue
|
|
alpha2_codes.add(alpha2)
|
|
|
|
g.alpha2_codes = alpha2_codes
|
|
return alpha2_codes
|
|
|
|
|
|
def is_street_number_first(lat: float, lon: float) -> bool:
|
|
"""Is lat/lon within a country that puts number first in a street address."""
|
|
if lat is None or lon is None:
|
|
return True
|
|
|
|
alpha2 = get_country_iso3166_1(lat, lon)
|
|
# Incomplete list of countries that put street number first.
|
|
alpha2_number_first = {
|
|
"GB", # United Kingdom
|
|
"IE", # Ireland
|
|
"US", # United States
|
|
"MX", # Mexico
|
|
"CA", # Canada
|
|
"FR", # France
|
|
"AU", # Australia
|
|
"NZ", # New Zealand
|
|
"ZA", # South Africa
|
|
}
|
|
|
|
return bool(alpha2_number_first & alpha2)
|
|
|
|
|
|
def make_envelope(bounds):
|
|
return func.ST_MakeEnvelope(*bounds, srid)
|
|
|
|
|
|
def get_bbox_centroid(bbox):
|
|
bbox = make_envelope(bbox)
|
|
centroid = database.session.query(func.ST_AsText(func.ST_Centroid(bbox))).scalar()
|
|
m = re_point.match(centroid)
|
|
assert m
|
|
return reversed(m.groups())
|
|
|
|
|
|
def make_envelope_around_point(lat: float, lon: float, distance: float):
|
|
conn = database.session.connection()
|
|
|
|
p = func.ST_MakePoint(lon, lat)
|
|
|
|
s = select(
|
|
[
|
|
func.ST_AsText(func.ST_Project(p, distance, func.radians(0))),
|
|
func.ST_AsText(func.ST_Project(p, distance, func.radians(90))),
|
|
func.ST_AsText(func.ST_Project(p, distance, func.radians(180))),
|
|
func.ST_AsText(func.ST_Project(p, distance, func.radians(270))),
|
|
]
|
|
)
|
|
row = conn.execute(s).fetchone()
|
|
coords = [[float(v) for v in re_point.match(i).groups()] for i in row]
|
|
|
|
north = coords[0][1]
|
|
east = coords[1][0]
|
|
south = coords[2][1]
|
|
west = coords[3][0]
|
|
|
|
return func.ST_MakeEnvelope(west, south, east, north, srid)
|
|
|
|
|
|
def drop_way_area(tags: dict[str, str]) -> dict[str, str]:
|
|
"""Remove the way_area field from a tags dict."""
|
|
if "way_area" in tags:
|
|
del tags["way_area"]
|
|
return tags
|
|
|
|
|
|
def get_part_of(table_name, src_id, bbox):
|
|
table_map = {"point": point, "line": line, "polygon": polygon}
|
|
table_alias = table_map[table_name].alias()
|
|
|
|
s = (
|
|
select(
|
|
[
|
|
polygon.c.osm_id,
|
|
polygon.c.tags,
|
|
func.ST_Area(func.ST_Collect(polygon.c.way)),
|
|
]
|
|
)
|
|
.where(
|
|
and_(
|
|
func.ST_Intersects(bbox, polygon.c.way),
|
|
func.ST_Covers(polygon.c.way, table_alias.c.way),
|
|
table_alias.c.osm_id == src_id,
|
|
polygon.c.tags.has_key("name"),
|
|
or_(
|
|
polygon.c.tags.has_key("landuse"),
|
|
polygon.c.tags.has_key("amenity"),
|
|
),
|
|
)
|
|
)
|
|
.group_by(polygon.c.osm_id, polygon.c.tags)
|
|
)
|
|
|
|
conn = database.session.connection()
|
|
return [
|
|
{
|
|
"type": "way" if osm_id > 0 else "relation",
|
|
"id": abs(osm_id),
|
|
"tags": tags,
|
|
"area": area,
|
|
}
|
|
for osm_id, tags, area in conn.execute(s)
|
|
]
|
|
|
|
|
|
def get_and_save_item(qid: str) -> model.Item | None:
|
|
"""Download an item from Wikidata and cache it in the database."""
|
|
entity = wikidata_api.get_entity(qid)
|
|
entity_qid = entity["id"]
|
|
if entity_qid != qid:
|
|
print(f"redirect {qid} -> {entity_qid}")
|
|
item: model.Item | None = model.Item.query.get(entity_qid[1:])
|
|
return item
|
|
|
|
if "claims" not in entity:
|
|
return None
|
|
coords = wikidata.get_entity_coords(entity["claims"])
|
|
|
|
item_id = int(qid[1:])
|
|
obj = {k: v for k, v in entity.items() if k in entity_keys}
|
|
try:
|
|
item = model.Item(item_id=item_id, **obj)
|
|
except TypeError:
|
|
print(qid)
|
|
print(f'{entity["pageid"]=} {entity["ns"]=} {entity["type"]=}')
|
|
print(entity.keys())
|
|
raise
|
|
item.locations = model.location_objects(coords)
|
|
database.session.add(item)
|
|
database.session.commit()
|
|
|
|
return item
|
|
|
|
|
|
def get_isa_count(items: list[model.Item]) -> list[tuple[int, int]]:
|
|
"""List of IsA counts."""
|
|
isa_count: Counter[int] = Counter()
|
|
for item in items:
|
|
if not item:
|
|
continue
|
|
isa_list = item.get_claim("P31")
|
|
for isa in isa_list:
|
|
if not isa:
|
|
print("missing IsA:", item.qid)
|
|
continue
|
|
isa_count[isa["id"]] += 1
|
|
|
|
return isa_count.most_common()
|
|
|
|
|
|
def get_items_in_bbox(bbox):
|
|
db_bbox = make_envelope(bbox)
|
|
|
|
q = (
|
|
model.Item.query.join(model.ItemLocation)
|
|
.filter(func.ST_Covers(db_bbox, model.ItemLocation.location))
|
|
.options(selectinload(model.Item.locations))
|
|
)
|
|
|
|
return q
|
|
|
|
|
|
def get_osm_with_wikidata_tag(bbox, isa_filter=None):
|
|
bbox_str = ",".join(str(v) for v in bbox)
|
|
extra_sql = ""
|
|
if isa_filter:
|
|
q = model.Item.query.join(model.ItemLocation).filter(
|
|
func.ST_Covers(make_envelope(bbox), model.ItemLocation.location)
|
|
)
|
|
q = add_isa_filter(q, isa_filter)
|
|
qids = [isa.qid for isa in q]
|
|
if not qids:
|
|
return []
|
|
|
|
qid_list = ",".join(f"'{qid}'" for qid in qids)
|
|
extra_sql += f" AND tags -> 'wikidata' in ({qid_list})"
|
|
|
|
# easier than building this query with SQLAlchemy
|
|
sql = (
|
|
f"""
|
|
SELECT tbl, osm_id, tags, ARRAY[ST_Y(centroid), ST_X(centroid)], geojson
|
|
FROM (
|
|
SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson
|
|
FROM planet_osm_point
|
|
WHERE ST_Intersects(ST_MakeEnvelope({bbox_str}, {srid}), way)
|
|
UNION
|
|
SELECT 'line' as tbl, osm_id, tags, ST_AsText(ST_Centroid(ST_Collect(way))) AS centroid, ST_AsGeoJSON(ST_Collect(way)) AS geojson
|
|
FROM planet_osm_line
|
|
WHERE ST_Intersects(ST_MakeEnvelope({bbox_str}, {srid}), way)
|
|
GROUP BY osm_id, tags
|
|
UNION
|
|
SELECT 'polygon' as tbl, osm_id, tags, ST_AsText(ST_Centroid(ST_Collect(way))) AS centroid, ST_AsGeoJSON(ST_Collect(way)) AS geojson
|
|
FROM planet_osm_polygon
|
|
WHERE ST_Intersects(ST_MakeEnvelope({bbox_str}, {srid}), way)
|
|
GROUP BY osm_id, tags
|
|
HAVING st_area(st_collect(way)) < 20 * st_area(ST_MakeEnvelope({bbox_str}, {srid}))
|
|
) as anon
|
|
WHERE tags ? 'wikidata'
|
|
"""
|
|
+ extra_sql
|
|
)
|
|
conn = database.session.connection()
|
|
result = conn.execute(text(sql))
|
|
|
|
# print(sql)
|
|
|
|
point_sql = (
|
|
f"""
|
|
SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson
|
|
FROM planet_osm_point
|
|
WHERE ST_Intersects(ST_MakeEnvelope({bbox_str}, {srid}), way) and tags ? 'wikidata'
|
|
"""
|
|
+ extra_sql
|
|
)
|
|
|
|
print("point")
|
|
print(point_sql)
|
|
|
|
tagged = []
|
|
for tbl, osm_id, tags, centroid, geojson in result:
|
|
if tbl == "point":
|
|
osm_type = "node"
|
|
else:
|
|
osm_type = "way" if osm_id > 0 else "relation"
|
|
osm_id = abs(osm_id)
|
|
|
|
name = tags.get("name") or tags.get("addr:housename") or "[no label]"
|
|
|
|
tagged.append(
|
|
{
|
|
"identifier": f"{osm_type}/{osm_id}",
|
|
"id": osm_id,
|
|
"type": osm_type,
|
|
"geojson": json.loads(geojson),
|
|
"centroid": centroid,
|
|
"name": name,
|
|
"wikidata": tags["wikidata"],
|
|
}
|
|
)
|
|
|
|
return tagged
|
|
|
|
|
|
def get_items(item_ids: list[int]) -> list[model.Item]:
|
|
"""Get a Wikidata items with the given item IDs."""
|
|
items = []
|
|
for item_id in item_ids:
|
|
item = model.Item.query.get(item_id)
|
|
if not item:
|
|
if not get_and_save_item(f"Q{item_id}"):
|
|
continue
|
|
item = model.Item.query.get(item_id)
|
|
items.append(item)
|
|
|
|
return items
|
|
|
|
|
|
def get_item_tags(item: model.Item) -> dict[str, list[str]]:
|
|
isa_list: list[int] = [v["numeric-id"] for v in item.get_isa()]
|
|
isa_items = [(isa, []) for isa in get_items(isa_list)]
|
|
|
|
osm_list = defaultdict(list)
|
|
|
|
skip_isa = {row[0] for row in database.session.query(model.SkipIsA.item_id)}
|
|
|
|
tram_stop_id = 41176
|
|
airport_id = 1248784
|
|
aerodrome_id = 62447
|
|
if {tram_stop_id, airport_id, aerodrome_id} & set(isa_list):
|
|
skip_isa.add(41176) # building (Q41176)
|
|
|
|
seen = set(isa_list) | skip_isa
|
|
stop = {
|
|
"Q11799049": "public institution",
|
|
"Q7075": "library",
|
|
"Q329683": "industrial park",
|
|
}
|
|
while isa_items:
|
|
isa, isa_path = isa_items.pop()
|
|
if not isa:
|
|
continue
|
|
isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}]
|
|
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
|
|
|
|
osm += [
|
|
extra.tag_or_key
|
|
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)
|
|
]
|
|
|
|
for i in osm:
|
|
osm_list[i].append(isa_path[:])
|
|
|
|
if isa.qid in stop:
|
|
# item is specific enough, no need to keep walking the item hierarchy
|
|
continue
|
|
|
|
check = set()
|
|
properties = [
|
|
("P279", "subclass of"),
|
|
("P140", "religion"),
|
|
("P641", "sport"),
|
|
("P366", "use"),
|
|
("P1269", "facet of"),
|
|
# ("P361", "part of"),
|
|
]
|
|
|
|
for pid, label in properties:
|
|
check |= {v["numeric-id"] for v in (isa.get_claim(pid) or []) if v}
|
|
|
|
print(isa.qid, isa.label(), check)
|
|
isa_list = check - seen
|
|
seen.update(isa_list)
|
|
isa_items += [(isa, isa_path) for isa in get_items(isa_list)]
|
|
return {key: list(values) for key, values in osm_list.items()}
|
|
|
|
|
|
def get_tags_for_isa_item(item):
|
|
isa_list = [item.item_id]
|
|
isa_items = [(item, [])]
|
|
|
|
osm_list = defaultdict(list)
|
|
|
|
skip_isa = {row[0] for row in database.session.query(model.SkipIsA.item_id)}
|
|
|
|
tram_stop_id = 41176
|
|
airport_id = 1248784
|
|
aerodrome_id = 62447
|
|
if {tram_stop_id, airport_id, aerodrome_id} & set(isa_list):
|
|
skip_isa.add(41176) # building (Q41176)
|
|
|
|
seen = set(isa_list) | skip_isa
|
|
stop = {
|
|
"Q11799049": "public institution",
|
|
"Q7075": "library",
|
|
"Q329683": "industrial park",
|
|
}
|
|
items_checked = []
|
|
items_checked_done = set()
|
|
while isa_items:
|
|
isa, isa_path = isa_items.pop()
|
|
if not isa:
|
|
continue
|
|
isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}]
|
|
if isa.item_id not in items_checked_done:
|
|
items_checked.append({"qid": isa.qid, "label": isa.label()})
|
|
items_checked_done.add(isa.item_id)
|
|
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
|
|
|
|
osm += [
|
|
extra.tag_or_key
|
|
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)
|
|
]
|
|
|
|
for i in osm:
|
|
osm_list[i].append(isa_path[:])
|
|
|
|
if isa.qid in stop:
|
|
# item is specific enough, no need to keep walking the item hierarchy
|
|
continue
|
|
|
|
check = set()
|
|
properties = [
|
|
("P279", "subclass of"),
|
|
("P140", "religion"),
|
|
("P641", "sport"),
|
|
("P366", "use"),
|
|
("P1269", "facet of"),
|
|
# ("P361", "part of"),
|
|
]
|
|
|
|
for pid, label in properties:
|
|
check |= {v["numeric-id"] for v in (isa.get_claim(pid) or []) if v}
|
|
|
|
print(isa.qid, isa.label(), check)
|
|
isa_list = check - seen
|
|
seen.update(isa_list)
|
|
isa_items += [(isa, isa_path) for isa in get_items(isa_list)]
|
|
return {
|
|
"tags": {key: list(values) for key, values in osm_list.items()},
|
|
"checked": items_checked,
|
|
}
|
|
|
|
|
|
def add_isa_filter(q, isa_qids):
|
|
q_subclass = database.session.query(model.Item.qid).filter(
|
|
func.jsonb_path_query_array(
|
|
model.Item.claims,
|
|
"$.P279[*].mainsnak.datavalue.value.id",
|
|
).bool_op("?|")(list(isa_qids))
|
|
)
|
|
|
|
subclass_qid = {qid for qid, in q_subclass.all()}
|
|
|
|
isa = func.jsonb_path_query_array(
|
|
model.Item.claims,
|
|
"$.P31[*].mainsnak.datavalue.value.id",
|
|
).bool_op("?|")
|
|
return q.filter(isa(list(isa_qids | subclass_qid)))
|
|
|
|
|
|
def wikidata_items_count(bounds, isa_filter=None):
|
|
q = model.Item.query.join(model.ItemLocation).filter(
|
|
func.ST_Covers(make_envelope(bounds), model.ItemLocation.location)
|
|
)
|
|
|
|
if isa_filter:
|
|
q = add_isa_filter(q, isa_filter)
|
|
|
|
# print(q.statement.compile(compile_kwargs={"literal_binds": True}))
|
|
|
|
return q.count()
|
|
|
|
|
|
def wikidata_isa_counts(bounds, isa_filter=None):
|
|
db_bbox = make_envelope(bounds)
|
|
|
|
q = model.Item.query.join(model.ItemLocation).filter(
|
|
func.ST_Covers(db_bbox, model.ItemLocation.location)
|
|
)
|
|
|
|
if isa_filter:
|
|
q = add_isa_filter(q, isa_filter)
|
|
|
|
db_items = q.all()
|
|
|
|
counts = get_isa_count(db_items)
|
|
isa_ids = [qid[1:] for qid, count in counts]
|
|
isa_items = {
|
|
isa.qid: isa for isa in model.Item.query.filter(model.Item.item_id.in_(isa_ids))
|
|
}
|
|
isa_count = []
|
|
for qid, count in counts:
|
|
item = isa_items.get(qid)
|
|
if not item:
|
|
item = get_and_save_item(qid)
|
|
|
|
label = item.label() if item else "[missing]"
|
|
isa = {
|
|
"qid": qid,
|
|
"count": count,
|
|
"label": label,
|
|
}
|
|
isa_count.append(isa)
|
|
|
|
return isa_count
|
|
|
|
|
|
def get_tag_filter(tags, tag_list):
|
|
tag_filter = []
|
|
for tag_or_key in tag_list:
|
|
if tag_or_key.startswith("Key:"):
|
|
key = tag_or_key[4:]
|
|
tag_filter.append(and_(tags.has_key(key), tags[key] != "no"))
|
|
for prefix in tag_prefixes:
|
|
tag_filter.append(tags.has_key(f"{prefix}:{key}"))
|
|
|
|
if tag_or_key.startswith("Tag:"):
|
|
k, _, v = tag_or_key[4:].partition("=")
|
|
tag_filter.append(tags[k] == v)
|
|
for prefix in tag_prefixes:
|
|
tag_filter.append(tags[f"{prefix}:{k}"] == v)
|
|
|
|
return tag_filter
|
|
|
|
|
|
def get_preset_translations():
|
|
app = current_app
|
|
country_language = {
|
|
"AU": "en-AU", # Australia
|
|
"GB": "en-GB", # United Kingdom
|
|
"IE": "en-GB", # Ireland
|
|
"IN": "en-IN", # India
|
|
"NZ": "en-NZ", # New Zealand
|
|
}
|
|
ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"]
|
|
translation_dir = os.path.join(ts_dir, "dist", "translations")
|
|
|
|
for code in g.alpha2_codes:
|
|
lang_code = country_language.get("code")
|
|
if not lang_code:
|
|
continue
|
|
filename = os.path.join(translation_dir, lang_code + ".json")
|
|
json_data = json.load(open(filename))
|
|
if lang_code not in json_data:
|
|
continue
|
|
|
|
try:
|
|
return json_data[lang_code]["presets"]["presets"]
|
|
except KeyError:
|
|
pass
|
|
|
|
return {}
|
|
|
|
|
|
def get_presets_from_tags(ending, tags):
|
|
translations = get_preset_translations()
|
|
|
|
found = []
|
|
|
|
for k, v in tags.items():
|
|
if k == "amenity" and v == "clock" and tags.get("display") == "sundial":
|
|
tag_or_key = f"Tag:{k}={v}"
|
|
found.append({"tag_or_key": tag_or_key, "name": "Sundial"})
|
|
continue
|
|
|
|
match = find_preset_file(k, v, ending)
|
|
if not match:
|
|
continue
|
|
|
|
preset = match["preset"]
|
|
if preset in translations:
|
|
match["name"] = translations[preset]["name"]
|
|
else:
|
|
match["name"] = json.load(open(match["filename"]))["name"]
|
|
|
|
del match["filename"]
|
|
|
|
found.append(match)
|
|
|
|
return found
|
|
|
|
|
|
def find_preset_file(k, v, ending):
|
|
app = current_app
|
|
ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"]
|
|
preset_dir = os.path.join(ts_dir, "data", "presets")
|
|
|
|
filename = os.path.join(preset_dir, k, v + ".json")
|
|
if os.path.exists(filename):
|
|
return {
|
|
"tag_or_key": f"Tag:{k}={v}",
|
|
"preset": f"{k}/{v}",
|
|
"filename": filename,
|
|
}
|
|
|
|
filename = os.path.join(preset_dir, k, f"{v}_{ending}.json")
|
|
if os.path.exists(filename):
|
|
return {
|
|
"tag_or_key": f"Tag:{k}={v}",
|
|
"preset": f"{k}/{v}",
|
|
"filename": filename,
|
|
}
|
|
|
|
filename = os.path.join(preset_dir, k, "_" + v + ".json")
|
|
if os.path.exists(filename):
|
|
return {
|
|
"tag_or_key": f"Tag:{k}={v}",
|
|
"preset": f"{k}/{v}",
|
|
"filename": filename,
|
|
}
|
|
|
|
filename = os.path.join(preset_dir, k + ".json")
|
|
if os.path.exists(filename):
|
|
return {
|
|
"tag_or_key": f"Key:{k}",
|
|
"preset": k,
|
|
"filename": filename,
|
|
}
|
|
|
|
|
|
def address_from_tags(tags):
|
|
keys = ["street", "housenumber"]
|
|
if not all("addr:" + k in tags for k in keys):
|
|
return
|
|
|
|
if g.street_number_first:
|
|
keys.reverse()
|
|
return " ".join(tags["addr:" + k] for k in keys)
|
|
|
|
|
|
def address_node_label(tags):
|
|
address = address_from_tags(tags)
|
|
return f"{tags['name']} ({address})" if "name" in tags else address
|
|
|
|
|
|
def get_address_nodes_within_building(osm_id, bbox_list):
|
|
q = model.Point.query.filter(
|
|
polygon.c.osm_id == osm_id,
|
|
or_(*[func.ST_Intersects(bbox, model.Point.way) for bbox in bbox_list]),
|
|
func.ST_Covers(polygon.c.way, model.Point.way),
|
|
model.Point.tags.has_key("addr:street"),
|
|
model.Point.tags.has_key("addr:housenumber"),
|
|
)
|
|
|
|
return [node.tags for node in q]
|
|
|
|
|
|
def osm_display_name(tags):
|
|
keys = (
|
|
"bridge:name",
|
|
"tunnel:name",
|
|
"lock_name",
|
|
"name",
|
|
"addr:housename",
|
|
"inscription",
|
|
)
|
|
for key in keys:
|
|
if key in tags:
|
|
return tags[key]
|
|
|
|
|
|
def street_address_in_tags(tags):
|
|
return "addr:housenumber" in tags and "addr:street" in tags
|
|
|
|
|
|
def find_osm_candidates(item, limit=80, max_distance=450, names=None):
|
|
item_id = item.item_id
|
|
item_is_linear_feature = item.is_linear_feature()
|
|
item_is_street = item.is_street()
|
|
item_names_dict = item.names()
|
|
if item_names_dict:
|
|
item_names = {n.lower() for n in item_names_dict.keys()}
|
|
else:
|
|
item_names = set()
|
|
|
|
check_is_street_number_first(item.locations[0].get_lat_lon())
|
|
|
|
bbox_list = [
|
|
make_envelope_around_point(*loc.get_lat_lon(), max_distance)
|
|
for loc in item.locations
|
|
]
|
|
|
|
null_area = cast(None, Float)
|
|
dist = column("dist")
|
|
tags = column("tags", postgresql.HSTORE)
|
|
|
|
tag_list = get_item_tags(item)
|
|
# tag_filters = get_tag_filter(point.c.tags, tag_list)
|
|
# print(tag_filters)
|
|
|
|
s_point = (
|
|
select(
|
|
[
|
|
literal("point").label("t"),
|
|
point.c.osm_id,
|
|
point.c.tags.label("tags"),
|
|
func.min(
|
|
func.ST_DistanceSphere(model.ItemLocation.location, point.c.way)
|
|
).label("dist"),
|
|
func.ST_AsText(point.c.way),
|
|
func.ST_AsGeoJSON(point.c.way),
|
|
null_area,
|
|
]
|
|
)
|
|
.where(
|
|
and_(
|
|
or_(*[func.ST_Intersects(bbox, point.c.way) for bbox in bbox_list]),
|
|
model.ItemLocation.item_id == item_id,
|
|
or_(*get_tag_filter(point.c.tags, tag_list)),
|
|
)
|
|
)
|
|
.group_by(point.c.osm_id, point.c.tags, point.c.way)
|
|
)
|
|
|
|
s_line = (
|
|
select(
|
|
[
|
|
literal("line").label("t"),
|
|
line.c.osm_id,
|
|
line.c.tags.label("tags"),
|
|
func.min(
|
|
func.ST_DistanceSphere(model.ItemLocation.location, line.c.way)
|
|
).label("dist"),
|
|
func.ST_AsText(func.ST_Centroid(func.ST_Collect(line.c.way))),
|
|
func.ST_AsGeoJSON(func.ST_Collect(line.c.way)),
|
|
null_area,
|
|
]
|
|
)
|
|
.where(
|
|
and_(
|
|
or_(*[func.ST_Intersects(bbox, line.c.way) for bbox in bbox_list]),
|
|
model.ItemLocation.item_id == item_id,
|
|
or_(*get_tag_filter(line.c.tags, tag_list)),
|
|
)
|
|
)
|
|
.group_by(line.c.osm_id, line.c.tags)
|
|
)
|
|
|
|
s_polygon = (
|
|
select(
|
|
[
|
|
literal("polygon").label("t"),
|
|
polygon.c.osm_id,
|
|
polygon.c.tags.label("tags"),
|
|
func.min(
|
|
func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way)
|
|
).label("dist"),
|
|
func.ST_AsText(func.ST_Centroid(func.ST_Collect(polygon.c.way))),
|
|
func.ST_AsGeoJSON(func.ST_Collect(polygon.c.way)),
|
|
func.ST_Area(func.ST_Collect(polygon.c.way)),
|
|
]
|
|
)
|
|
.where(
|
|
and_(
|
|
or_(*[func.ST_Intersects(bbox, polygon.c.way) for bbox in bbox_list]),
|
|
model.ItemLocation.item_id == item_id,
|
|
or_(*get_tag_filter(polygon.c.tags, tag_list)),
|
|
)
|
|
)
|
|
.group_by(polygon.c.osm_id, polygon.c.tags)
|
|
.having(
|
|
func.ST_Area(func.ST_Collect(polygon.c.way))
|
|
< 20 * func.ST_Area(bbox_list[0])
|
|
)
|
|
)
|
|
|
|
tables = ([] if item_is_linear_feature else [s_point]) + [s_line, s_polygon]
|
|
s = select([union(*tables).alias()]).where(dist < max_distance).order_by(dist)
|
|
|
|
if names:
|
|
s = s.where(or_(tags["name"].in_(names), tags["old_name"].in_(names)))
|
|
|
|
if item_is_street:
|
|
s = s.where(tags["highway"] != "bus_stop")
|
|
if not names:
|
|
s = s.where(tags.has_key("name"))
|
|
|
|
if "Key:amenity" in tag_list:
|
|
s = s.where(
|
|
and_(
|
|
tags["amenity"] != "bicycle_parking",
|
|
tags["amenity"] != "bicycle_repair_station",
|
|
tags["amenity"] != "atm",
|
|
tags["amenity"] != "recycling",
|
|
)
|
|
)
|
|
|
|
if limit:
|
|
s = s.limit(limit)
|
|
|
|
# print(s.compile(compile_kwargs={"literal_binds": True}))
|
|
|
|
conn = database.session.connection()
|
|
nearby = []
|
|
for table, src_id, tags, distance, centroid, geojson, area in conn.execute(s):
|
|
osm_id = src_id
|
|
if table == "point":
|
|
osm_type = "node"
|
|
elif osm_id > 0:
|
|
osm_type = "way"
|
|
else:
|
|
osm_type = "relation"
|
|
osm_id = -osm_id
|
|
|
|
tags.pop("way_area", None)
|
|
name = osm_display_name(tags)
|
|
if not name and street_address_in_tags(tags):
|
|
name = address_from_tags(tags)
|
|
|
|
if table == "polygon" and "building" in tags:
|
|
address_nodes = get_address_nodes_within_building(src_id, bbox_list)
|
|
address_list = [address_node_label(addr) for addr in address_nodes]
|
|
else:
|
|
address_list = []
|
|
|
|
shape = "area" if table == "polygon" else table
|
|
|
|
item_identifier_tags = item.get_identifiers_tags()
|
|
|
|
cur = {
|
|
"identifier": f"{osm_type}/{osm_id}",
|
|
"type": osm_type,
|
|
"id": osm_id,
|
|
"distance": distance,
|
|
"name": name,
|
|
"name_match": (name and name.lower() in item_names),
|
|
"tags": tags,
|
|
"geojson": json.loads(geojson),
|
|
"presets": get_presets_from_tags(shape, tags),
|
|
"address_list": address_list,
|
|
"centroid": list(reversed(re_point.match(centroid).groups())),
|
|
}
|
|
if area is not None:
|
|
cur["area"] = area
|
|
|
|
part_of = []
|
|
for bbox in bbox_list:
|
|
part_of += [
|
|
i for i in get_part_of(table, src_id, bbox) if i["tags"]["name"] != name
|
|
]
|
|
if part_of:
|
|
cur["part_of"] = part_of
|
|
|
|
if address := address_from_tags(tags):
|
|
cur["address"] = address
|
|
|
|
nearby.append(cur)
|
|
|
|
return nearby
|
|
|
|
|
|
def get_item(item_id):
|
|
"""Retrieve a Wikidata item, either from the database or from Wikidata."""
|
|
|
|
item = model.Item.query.get(item_id)
|
|
return item or get_and_save_item(f"Q{item_id}")
|
|
|
|
|
|
def get_item_street_addresses(item):
|
|
street_address = [addr["text"] for addr in item.get_claim("P6375") if addr]
|
|
if street_address or "P669" not in item.claims:
|
|
return street_address
|
|
|
|
for claim in item.claims["P669"]:
|
|
qualifiers = claim.get("qualifiers")
|
|
if not qualifiers or "P670" not in qualifiers:
|
|
continue
|
|
number = qualifiers["P670"][0]["datavalue"]["value"]
|
|
|
|
street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"])
|
|
street = street_item.label()
|
|
for q in qualifiers["P670"]:
|
|
number = q["datavalue"]["value"]
|
|
address = (
|
|
f"{number} {street}" if g.street_number_first else f"{street} {number}"
|
|
)
|
|
street_address.append(address)
|
|
|
|
return street_address
|
|
|
|
|
|
def check_is_street_number_first(latlng):
|
|
g.street_number_first = is_street_number_first(*latlng)
|
|
|
|
|
|
def item_detail(item):
|
|
unsupported_relation_types = {
|
|
"Q194356", # wind farm
|
|
"Q2175765", # tram stop
|
|
}
|
|
|
|
locations = [list(i.get_lat_lon()) for i in item.locations]
|
|
if not hasattr(g, "street_number_first"):
|
|
g.street_number_first = is_street_number_first(*locations[0])
|
|
|
|
image_filenames = item.get_claim("P18")
|
|
|
|
street_address = get_item_street_addresses(item)
|
|
|
|
heritage_designation = []
|
|
for v in item.get_claim("P1435"):
|
|
if not v:
|
|
print("heritage designation missing:", item.qid)
|
|
continue
|
|
heritage_designation_item = get_item(v["numeric-id"])
|
|
heritage_designation.append(
|
|
{
|
|
"qid": v["id"],
|
|
"label": heritage_designation_item.label(),
|
|
}
|
|
)
|
|
|
|
isa_items = [get_item(isa["numeric-id"]) for isa in item.get_isa()]
|
|
isa_lookup = {isa.qid: isa for isa in isa_items}
|
|
|
|
wikipedia_links = [
|
|
{"lang": site[:-4], "title": link["title"]}
|
|
for site, link in sorted(item.sitelinks.items())
|
|
if site.endswith("wiki") and len(site) < 8
|
|
]
|
|
|
|
d = {
|
|
"qid": item.qid,
|
|
"label": item.label(),
|
|
"description": item.description(),
|
|
"markers": locations,
|
|
"image_list": image_filenames,
|
|
"street_address": street_address,
|
|
"isa_list": [
|
|
{"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa
|
|
],
|
|
"closed": item.closed(),
|
|
"inception": item.time_claim("P571"),
|
|
"p1619": item.time_claim("P1619"),
|
|
"p576": item.time_claim("P576"),
|
|
"heritage_designation": heritage_designation,
|
|
"wikipedia": wikipedia_links,
|
|
"identifiers": item.get_identifiers(),
|
|
}
|
|
|
|
if aliases := item.get_aliases():
|
|
d["aliases"] = aliases
|
|
|
|
if "commonswiki" in item.sitelinks:
|
|
d["commons"] = item.sitelinks["commonswiki"]["title"]
|
|
|
|
unsupported = isa_lookup.keys() & unsupported_relation_types
|
|
if unsupported:
|
|
d["unsupported_relation_types"] = [
|
|
isa for isa in d["isa_list"] if isa["qid"] in isa_lookup
|
|
]
|
|
|
|
return d
|
|
|
|
|
|
def get_markers(all_items):
|
|
return [item_detail(item) for item in all_items if item]
|
|
|
|
|
|
def wikidata_items(bounds, isa_filter=None):
|
|
check_is_street_number_first(get_bbox_centroid(bounds))
|
|
q = get_items_in_bbox(bounds)
|
|
|
|
if isa_filter:
|
|
q = add_isa_filter(q, isa_filter)
|
|
|
|
db_items = q.all()
|
|
items = get_markers(db_items)
|
|
|
|
counts = get_isa_count(db_items)
|
|
isa_ids = [qid[1:] for qid, count in counts]
|
|
isa_items = {
|
|
isa.qid: isa for isa in model.Item.query.filter(model.Item.item_id.in_(isa_ids))
|
|
}
|
|
|
|
isa_count = []
|
|
for qid, count in counts:
|
|
item = isa_items.get(qid)
|
|
if not item:
|
|
item = get_and_save_item(qid)
|
|
|
|
label = item.label() if item else "[missing]"
|
|
isa = {
|
|
"qid": qid,
|
|
"count": count,
|
|
"label": label,
|
|
}
|
|
isa_count.append(isa)
|
|
|
|
return {"items": items, "isa_count": isa_count}
|
|
|
|
|
|
def missing_wikidata_items(qids, lat, lon):
|
|
g.street_number_first = is_street_number_first(lat, lon)
|
|
|
|
db_items = []
|
|
for qid in qids:
|
|
item = model.Item.query.get(qid[1:])
|
|
if not item:
|
|
item = get_and_save_item(qid)
|
|
db_items.append(item)
|
|
items = get_markers(db_items)
|
|
counts = get_isa_count(db_items)
|
|
isa_ids = [qid[1:] for qid, count in counts]
|
|
isa_items = {
|
|
isa.qid: isa for isa in model.Item.query.filter(model.Item.item_id.in_(isa_ids))
|
|
}
|
|
|
|
isa_count = []
|
|
for qid, count in counts:
|
|
item = isa_items.get(qid)
|
|
if not item:
|
|
item = get_and_save_item(qid)
|
|
|
|
label = item.label() if item else "[missing]"
|
|
isa = {
|
|
"qid": qid,
|
|
"count": count,
|
|
"label": label,
|
|
}
|
|
isa_count.append(isa)
|
|
|
|
return dict(items=items, isa_count=isa_count)
|
|
|
|
|
|
def isa_incremental_search(search_terms):
|
|
en_label = func.jsonb_extract_path_text(model.Item.labels, "en", "value")
|
|
q = model.Item.query.filter(
|
|
model.Item.claims.has_key("P1282"),
|
|
en_label.ilike(f"%{search_terms}%"),
|
|
func.length(en_label) < 20,
|
|
)
|
|
|
|
# print(q.statement.compile(compile_kwargs={"literal_binds": True}))
|
|
|
|
ret = []
|
|
for item in q:
|
|
cur = {
|
|
"qid": item.qid,
|
|
"label": item.label(),
|
|
}
|
|
ret.append(cur)
|
|
return ret
|
|
|
|
|
|
def get_place_items(osm_type, osm_id):
|
|
src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
|
|
|
|
q = (
|
|
model.Item.query.join(model.ItemLocation)
|
|
.join(
|
|
model.Polygon,
|
|
func.ST_Covers(model.Polygon.way, model.ItemLocation.location),
|
|
)
|
|
.filter(model.Polygon.src_id == src_id)
|
|
)
|
|
# sql = q.statement.compile(compile_kwargs={"literal_binds": True})
|
|
|
|
item_count = q.count()
|
|
items = []
|
|
for item in q:
|
|
keys = ["item_id", "labels", "descriptions", "aliases", "sitelinks", "claims"]
|
|
item_dict = {key: getattr(item, key) for key in keys}
|
|
items.append(item_dict)
|
|
|
|
return {"count": item_count, "items": items}
|