Merge branch 'main' of github.com:EdwardBetts/owl-map

This commit is contained in:
Edward Betts 2023-05-14 10:30:15 +00:00
commit 6ce8b30fcc
14 changed files with 847 additions and 452 deletions

View file

@ -1,19 +1,21 @@
from sqlalchemy import func, or_, and_, text
import json
import os.path
import re
from collections import Counter, defaultdict
from flask import current_app, g
from sqlalchemy import and_, func, or_, text
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import selectinload
from sqlalchemy.sql import select
from sqlalchemy.sql.expression import literal, union, cast, column
from sqlalchemy.sql.expression import cast, column, literal, union
from sqlalchemy.types import Float
from sqlalchemy.dialects import postgresql
from matcher.planet import point, line, polygon
from matcher import model, database, wikidata_api, wikidata
from collections import Counter, defaultdict
from flask import g, current_app
import re
import os.path
import json
from matcher import database, model, wikidata, wikidata_api
from matcher.planet import line, point, polygon
srid = 4326
re_point = re.compile(r'^POINT\((.+) (.+)\)$')
re_point = re.compile(r"^POINT\((.+) (.+)\)$")
entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
tag_prefixes = {
@ -37,9 +39,9 @@ skip_tags = {
"Key:brand",
}
def get_country_iso3166_1(lat, lon):
"""
For a given lat/lon return a set of ISO country codes.
"""For a given lat/lon return a set of ISO country codes.
Also cache the country code in the global object.
@ -48,8 +50,9 @@ def get_country_iso3166_1(lat, lon):
point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), srid)
alpha2_codes = set()
q = model.Polygon.query.filter(func.ST_Covers(model.Polygon.way, point),
model.Polygon.admin_level == "2")
q = model.Polygon.query.filter(
func.ST_Covers(model.Polygon.way, point), model.Polygon.admin_level == "2"
)
for country in q:
alpha2 = country.tags.get("ISO3166-1")
if not alpha2:
@ -60,22 +63,23 @@ def get_country_iso3166_1(lat, lon):
return alpha2_codes
def is_street_number_first(lat, lon):
def is_street_number_first(lat: float, lon: float) -> bool:
"""Is lat/lon within a country that puts number first in a street address."""
if lat is None or lon is None:
return True
alpha2 = get_country_iso3166_1(lat, lon)
# Incomplete list of countries that put street number first.
alpha2_number_first = {
'GB', # United Kingdom
'IE', # Ireland
'US', # United States
'MX', # Mexico
'CA', # Canada
'FR', # France
'AU', # Australia
'NZ', # New Zealand
'ZA', # South Africa
"GB", # United Kingdom
"IE", # Ireland
"US", # United States
"MX", # Mexico
"CA", # Canada
"FR", # France
"AU", # Australia
"NZ", # New Zealand
"ZA", # South Africa
}
return bool(alpha2_number_first & alpha2)
@ -84,22 +88,26 @@ def is_street_number_first(lat, lon):
def make_envelope(bounds):
return func.ST_MakeEnvelope(*bounds, srid)
def get_bbox_centroid(bbox):
bbox = make_envelope(bbox)
centroid = database.session.query(func.ST_AsText(func.ST_Centroid(bbox))).scalar()
return reversed(re_point.match(centroid).groups())
def make_envelope_around_point(lat, lon, distance):
conn = database.session.connection()
p = func.ST_MakePoint(lon, lat)
s = select([
func.ST_AsText(func.ST_Project(p, distance, func.radians(0))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(90))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(180))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(270))),
])
s = select(
[
func.ST_AsText(func.ST_Project(p, distance, func.radians(0))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(90))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(180))),
func.ST_AsText(func.ST_Project(p, distance, func.radians(270))),
]
)
row = conn.execute(s).fetchone()
coords = [[float(v) for v in re_point.match(i).groups()] for i in row]
@ -110,49 +118,64 @@ def make_envelope_around_point(lat, lon, distance):
return func.ST_MakeEnvelope(west, south, east, north, srid)
def drop_way_area(tags):
""" Remove the way_area field from a tags dict. """
def drop_way_area(tags: dict[str, str]) -> dict[str, str]:
"""Remove the way_area field from a tags dict."""
if "way_area" in tags:
del tags["way_area"]
return tags
def get_part_of(table_name, src_id, bbox):
table_map = {'point': point, 'line': line, 'polygon': polygon}
table_map = {"point": point, "line": line, "polygon": polygon}
table_alias = table_map[table_name].alias()
s = (select([polygon.c.osm_id,
polygon.c.tags,
func.ST_Area(func.ST_Collect(polygon.c.way))]).
where(and_(func.ST_Intersects(bbox, polygon.c.way),
func.ST_Covers(polygon.c.way, table_alias.c.way),
table_alias.c.osm_id == src_id,
polygon.c.tags.has_key("name"),
or_(
polygon.c.tags.has_key("landuse"),
polygon.c.tags.has_key("amenity"),
))).
group_by(polygon.c.osm_id, polygon.c.tags))
s = (
select(
[
polygon.c.osm_id,
polygon.c.tags,
func.ST_Area(func.ST_Collect(polygon.c.way)),
]
)
.where(
and_(
func.ST_Intersects(bbox, polygon.c.way),
func.ST_Covers(polygon.c.way, table_alias.c.way),
table_alias.c.osm_id == src_id,
polygon.c.tags.has_key("name"),
or_(
polygon.c.tags.has_key("landuse"),
polygon.c.tags.has_key("amenity"),
),
)
)
.group_by(polygon.c.osm_id, polygon.c.tags)
)
conn = database.session.connection()
return [{
"type": "way" if osm_id > 0 else "relation",
"id": abs(osm_id),
"tags": tags,
"area": area,
} for osm_id, tags, area in conn.execute(s)]
return [
{
"type": "way" if osm_id > 0 else "relation",
"id": abs(osm_id),
"tags": tags,
"area": area,
}
for osm_id, tags, area in conn.execute(s)
]
def get_and_save_item(qid):
""" Download an item from Wikidata and cache it in the database. """
def get_and_save_item(qid: str) -> model.Item | None:
"""Download an item from Wikidata and cache it in the database."""
entity = wikidata_api.get_entity(qid)
entity_qid = entity["id"]
if entity_qid != qid:
print(f'redirect {qid} -> {entity_qid}')
print(f"redirect {qid} -> {entity_qid}")
item = model.Item.query.get(entity_qid[1:])
return item
if "claims" not in entity:
return
return None
coords = wikidata.get_entity_coords(entity["claims"])
item_id = int(qid[1:])
@ -171,8 +194,9 @@ def get_and_save_item(qid):
return item
def get_isa_count(items):
isa_count = Counter()
def get_isa_count(items: list[model.Item]) -> list[tuple[int, int]]:
"""List of IsA counts."""
isa_count: Counter[int] = Counter()
for item in items:
if not item:
continue
@ -199,13 +223,11 @@ def get_items_in_bbox(bbox):
def get_osm_with_wikidata_tag(bbox, isa_filter=None):
bbox_str = ','.join(str(v) for v in bbox)
bbox_str = ",".join(str(v) for v in bbox)
extra_sql = ""
if isa_filter:
q = (
model.Item.query.join(model.ItemLocation)
.filter(func.ST_Covers(make_envelope(bbox),
model.ItemLocation.location))
q = model.Item.query.join(model.ItemLocation).filter(
func.ST_Covers(make_envelope(bbox), model.ItemLocation.location)
)
q = add_isa_filter(q, isa_filter)
qids = [isa.qid for isa in q]
@ -216,7 +238,8 @@ def get_osm_with_wikidata_tag(bbox, isa_filter=None):
extra_sql += f" AND tags -> 'wikidata' in ({qid_list})"
# easier than building this query with SQLAlchemy
sql = f'''
sql = (
f"""
SELECT tbl, osm_id, tags, ARRAY[ST_Y(centroid), ST_X(centroid)], geojson
FROM (
SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson
@ -235,24 +258,29 @@ UNION
HAVING st_area(st_collect(way)) < 20 * st_area(ST_MakeEnvelope({bbox_str}, {srid}))
) as anon
WHERE tags ? 'wikidata'
''' + extra_sql
"""
+ extra_sql
)
conn = database.session.connection()
result = conn.execute(text(sql))
print(sql)
point_sql = f'''
point_sql = (
f"""
SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson
FROM planet_osm_point
WHERE ST_Intersects(ST_MakeEnvelope({bbox_str}, {srid}), way) and tags ? 'wikidata'
''' + extra_sql
"""
+ extra_sql
)
print("point")
print(point_sql)
tagged = []
for tbl, osm_id, tags, centroid, geojson in result:
if tbl == 'point':
if tbl == "point":
osm_type = "node"
else:
osm_type = "way" if osm_id > 0 else "relation"
@ -260,15 +288,17 @@ WHERE tags ? 'wikidata'
name = tags.get("name") or tags.get("addr:housename") or "[no label]"
tagged.append({
"identifier": f"{osm_type}/{osm_id}",
"id": osm_id,
"type": osm_type,
"geojson": json.loads(geojson),
"centroid": centroid,
"name": name,
"wikidata": tags["wikidata"],
})
tagged.append(
{
"identifier": f"{osm_type}/{osm_id}",
"id": osm_id,
"type": osm_type,
"geojson": json.loads(geojson),
"centroid": centroid,
"name": name,
"wikidata": tags["wikidata"],
}
)
return tagged
@ -310,11 +340,13 @@ def get_item_tags(item):
isa, isa_path = isa_items.pop()
if not isa:
continue
isa_path = isa_path + [{'qid': isa.qid, 'label': isa.label()}]
isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}]
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
osm += [extra.tag_or_key
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)]
osm += [
extra.tag_or_key
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)
]
for i in osm:
osm_list[i].append(isa_path[:])
@ -369,14 +401,16 @@ def get_tags_for_isa_item(item):
isa, isa_path = isa_items.pop()
if not isa:
continue
isa_path = isa_path + [{'qid': isa.qid, 'label': isa.label()}]
isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}]
if isa.item_id not in items_checked_done:
items_checked.append({'qid': isa.qid, 'label': isa.label()})
items_checked.append({"qid": isa.qid, "label": isa.label()})
items_checked_done.add(isa.item_id)
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
osm += [extra.tag_or_key
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)]
osm += [
extra.tag_or_key
for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)
]
for i in osm:
osm_list[i].append(isa_path[:])
@ -403,34 +437,31 @@ def get_tags_for_isa_item(item):
seen.update(isa_list)
isa_items += [(isa, isa_path) for isa in get_items(isa_list)]
return {
'tags': {key: list(values) for key, values in osm_list.items()},
'checked': items_checked,
"tags": {key: list(values) for key, values in osm_list.items()},
"checked": items_checked,
}
def add_isa_filter(q, isa_qids):
q_subclass = database.session.query(model.Item.qid).filter(
func.jsonb_path_query_array(
model.Item.claims,
'$.P279[*].mainsnak.datavalue.value.id',
).bool_op('?|')(list(isa_qids))
"$.P279[*].mainsnak.datavalue.value.id",
).bool_op("?|")(list(isa_qids))
)
subclass_qid = {qid for qid, in q_subclass.all()}
isa = func.jsonb_path_query_array(
model.Item.claims,
'$.P31[*].mainsnak.datavalue.value.id',
).bool_op('?|')
"$.P31[*].mainsnak.datavalue.value.id",
).bool_op("?|")
return q.filter(isa(list(isa_qids | subclass_qid)))
def wikidata_items_count(bounds, isa_filter=None):
q = (
model.Item.query.join(model.ItemLocation)
.filter(func.ST_Covers(make_envelope(bounds), model.ItemLocation.location))
q = model.Item.query.join(model.ItemLocation).filter(
func.ST_Covers(make_envelope(bounds), model.ItemLocation.location)
)
if isa_filter:
@ -440,12 +471,12 @@ def wikidata_items_count(bounds, isa_filter=None):
return q.count()
def wikidata_isa_counts(bounds, isa_filter=None):
db_bbox = make_envelope(bounds)
q = (
model.Item.query.join(model.ItemLocation)
.filter(func.ST_Covers(db_bbox, model.ItemLocation.location))
q = model.Item.query.join(model.ItemLocation).filter(
func.ST_Covers(db_bbox, model.ItemLocation.location)
)
if isa_filter:
@ -474,12 +505,13 @@ def wikidata_isa_counts(bounds, isa_filter=None):
return isa_count
def get_tag_filter(tags, tag_list):
tag_filter = []
for tag_or_key in tag_list:
if tag_or_key.startswith("Key:"):
key = tag_or_key[4:]
tag_filter.append(and_(tags.has_key(key), tags[key] != 'no'))
tag_filter.append(and_(tags.has_key(key), tags[key] != "no"))
for prefix in tag_prefixes:
tag_filter.append(tags.has_key(f"{prefix}:{key}"))
@ -495,11 +527,11 @@ def get_tag_filter(tags, tag_list):
def get_preset_translations():
app = current_app
country_language = {
'AU': 'en-AU', # Australia
'GB': 'en-GB', # United Kingdom
'IE': 'en-GB', # Ireland
'IN': 'en-IN', # India
'NZ': 'en-NZ', # New Zealand
"AU": "en-AU", # Australia
"GB": "en-GB", # United Kingdom
"IE": "en-GB", # Ireland
"IN": "en-IN", # India
"NZ": "en-NZ", # New Zealand
}
ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"]
translation_dir = os.path.join(ts_dir, "dist", "translations")
@ -520,13 +552,14 @@ def get_preset_translations():
return {}
def get_presets_from_tags(ending, tags):
translations = get_preset_translations()
found = []
for k, v in tags.items():
if k == 'amenity' and v == 'clock' and tags.get('display') == 'sundial':
if k == "amenity" and v == "clock" and tags.get("display") == "sundial":
tag_or_key = f"Tag:{k}={v}"
found.append({"tag_or_key": tag_or_key, "name": "Sundial"})
continue
@ -604,8 +637,7 @@ def address_node_label(tags):
def get_address_nodes_within_building(osm_id, bbox_list):
q = model.Point.query.filter(
polygon.c.osm_id == osm_id,
or_(*[func.ST_Intersects(bbox, model.Point.way)
for bbox in bbox_list]),
or_(*[func.ST_Intersects(bbox, model.Point.way) for bbox in bbox_list]),
func.ST_Covers(polygon.c.way, model.Point.way),
model.Point.tags.has_key("addr:street"),
model.Point.tags.has_key("addr:housenumber"),
@ -615,8 +647,14 @@ def get_address_nodes_within_building(osm_id, bbox_list):
def osm_display_name(tags):
keys = ("bridge:name", "tunnel:name", "lock_name", "name", "addr:housename",
"inscription")
keys = (
"bridge:name",
"tunnel:name",
"lock_name",
"name",
"addr:housename",
"inscription",
)
for key in keys:
if key in tags:
return tags[key]
@ -625,6 +663,7 @@ def osm_display_name(tags):
def street_address_in_tags(tags):
return "addr:housenumber" in tags and "addr:street" in tags
def find_osm_candidates(item, limit=80, max_distance=450, names=None):
item_id = item.item_id
item_is_linear_feature = item.is_linear_feature()
@ -637,51 +676,94 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
check_is_street_number_first(item.locations[0].get_lat_lon())
bbox_list = [make_envelope_around_point(*loc.get_lat_lon(), max_distance)
for loc in item.locations]
bbox_list = [
make_envelope_around_point(*loc.get_lat_lon(), max_distance)
for loc in item.locations
]
null_area = cast(None, Float)
dist = column('dist')
tags = column('tags', postgresql.HSTORE)
dist = column("dist")
tags = column("tags", postgresql.HSTORE)
tag_list = get_item_tags(item)
# tag_filters = get_tag_filter(point.c.tags, tag_list)
# print(tag_filters)
s_point = (select([literal('point').label('t'), point.c.osm_id, point.c.tags.label('tags'),
func.min(func.ST_DistanceSphere(model.ItemLocation.location, point.c.way)).label('dist'),
func.ST_AsText(point.c.way),
func.ST_AsGeoJSON(point.c.way),
null_area]).
where(and_(
or_(*[func.ST_Intersects(bbox, point.c.way)
for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(point.c.tags, tag_list)))).
group_by(point.c.osm_id, point.c.tags, point.c.way))
s_point = (
select(
[
literal("point").label("t"),
point.c.osm_id,
point.c.tags.label("tags"),
func.min(
func.ST_DistanceSphere(model.ItemLocation.location, point.c.way)
).label("dist"),
func.ST_AsText(point.c.way),
func.ST_AsGeoJSON(point.c.way),
null_area,
]
)
.where(
and_(
or_(*[func.ST_Intersects(bbox, point.c.way) for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(point.c.tags, tag_list)),
)
)
.group_by(point.c.osm_id, point.c.tags, point.c.way)
)
s_line = (select([literal('line').label('t'), line.c.osm_id, line.c.tags.label('tags'),
func.min(func.ST_DistanceSphere(model.ItemLocation.location, line.c.way)).label('dist'),
func.ST_AsText(func.ST_Centroid(func.ST_Collect(line.c.way))),
func.ST_AsGeoJSON(func.ST_Collect(line.c.way)),
null_area]).
where(and_(
or_(*[func.ST_Intersects(bbox, line.c.way) for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(line.c.tags, tag_list)))).
group_by(line.c.osm_id, line.c.tags))
s_line = (
select(
[
literal("line").label("t"),
line.c.osm_id,
line.c.tags.label("tags"),
func.min(
func.ST_DistanceSphere(model.ItemLocation.location, line.c.way)
).label("dist"),
func.ST_AsText(func.ST_Centroid(func.ST_Collect(line.c.way))),
func.ST_AsGeoJSON(func.ST_Collect(line.c.way)),
null_area,
]
)
.where(
and_(
or_(*[func.ST_Intersects(bbox, line.c.way) for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(line.c.tags, tag_list)),
)
)
.group_by(line.c.osm_id, line.c.tags)
)
s_polygon = (select([literal('polygon').label('t'), polygon.c.osm_id, polygon.c.tags.label('tags'),
func.min(func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way)).label('dist'),
func.ST_AsText(func.ST_Centroid(func.ST_Collect(polygon.c.way))),
func.ST_AsGeoJSON(func.ST_Collect(polygon.c.way)),
func.ST_Area(func.ST_Collect(polygon.c.way))]).
where(and_(
or_(*[func.ST_Intersects(bbox, polygon.c.way) for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(polygon.c.tags, tag_list)))).
group_by(polygon.c.osm_id, polygon.c.tags).
having(func.ST_Area(func.ST_Collect(polygon.c.way)) < 20 * func.ST_Area(bbox_list[0])))
s_polygon = (
select(
[
literal("polygon").label("t"),
polygon.c.osm_id,
polygon.c.tags.label("tags"),
func.min(
func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way)
).label("dist"),
func.ST_AsText(func.ST_Centroid(func.ST_Collect(polygon.c.way))),
func.ST_AsGeoJSON(func.ST_Collect(polygon.c.way)),
func.ST_Area(func.ST_Collect(polygon.c.way)),
]
)
.where(
and_(
or_(*[func.ST_Intersects(bbox, polygon.c.way) for bbox in bbox_list]),
model.ItemLocation.item_id == item_id,
or_(*get_tag_filter(polygon.c.tags, tag_list)),
)
)
.group_by(polygon.c.osm_id, polygon.c.tags)
.having(
func.ST_Area(func.ST_Collect(polygon.c.way))
< 20 * func.ST_Area(bbox_list[0])
)
)
tables = ([] if item_is_linear_feature else [s_point]) + [s_line, s_polygon]
s = select([union(*tables).alias()]).where(dist < max_distance).order_by(dist)
@ -695,10 +777,14 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
s = s.where(tags.has_key("name"))
if "Key:amenity" in tag_list:
s = s.where(and_(tags["amenity"] != "bicycle_parking",
tags["amenity"] != "bicycle_repair_station",
tags["amenity"] != "atm",
tags["amenity"] != "recycling"))
s = s.where(
and_(
tags["amenity"] != "bicycle_parking",
tags["amenity"] != "bicycle_repair_station",
tags["amenity"] != "atm",
tags["amenity"] != "recycling",
)
)
if limit:
s = s.limit(limit)
@ -730,6 +816,8 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
shape = "area" if table == "polygon" else table
item_identifier_tags = item.get_identifiers_tags()
cur = {
"identifier": f"{osm_type}/{osm_id}",
"type": osm_type,
@ -748,8 +836,9 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
part_of = []
for bbox in bbox_list:
part_of += [i for i in get_part_of(table, src_id, bbox)
if i["tags"]["name"] != name]
part_of += [
i for i in get_part_of(table, src_id, bbox) if i["tags"]["name"] != name
]
if part_of:
cur["part_of"] = part_of
@ -760,9 +849,9 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
return nearby
def get_item(item_id):
""" Retrieve a Wikidata item, either from the database or from Wikidata. """
def get_item(item_id):
"""Retrieve a Wikidata item, either from the database or from Wikidata."""
item = model.Item.query.get(item_id)
return item or get_and_save_item(f"Q{item_id}")
@ -774,7 +863,7 @@ def get_item_street_addresses(item):
for claim in item.claims["P669"]:
qualifiers = claim.get("qualifiers")
if not qualifiers or 'P670' not in qualifiers:
if not qualifiers or "P670" not in qualifiers:
continue
number = qualifiers["P670"][0]["datavalue"]["value"]
@ -782,24 +871,26 @@ def get_item_street_addresses(item):
street = street_item.label()
for q in qualifiers["P670"]:
number = q["datavalue"]["value"]
address = (f"{number} {street}"
if g.street_number_first
else f"{street} {number}")
address = (
f"{number} {street}" if g.street_number_first else f"{street} {number}"
)
street_address.append(address)
return street_address
def check_is_street_number_first(latlng):
g.street_number_first = is_street_number_first(*latlng)
def item_detail(item):
unsupported_relation_types = {
'Q194356', # wind farm
'Q2175765', # tram stop
"Q194356", # wind farm
"Q2175765", # tram stop
}
locations = [list(i.get_lat_lon()) for i in item.locations]
if not hasattr(g, 'street_number_first'):
if not hasattr(g, "street_number_first"):
g.street_number_first = is_street_number_first(*locations[0])
image_filenames = item.get_claim("P18")
@ -809,20 +900,24 @@ def item_detail(item):
heritage_designation = []
for v in item.get_claim("P1435"):
if not v:
print('heritage designation missing:', item.qid)
print("heritage designation missing:", item.qid)
continue
heritage_designation_item = get_item(v["numeric-id"])
heritage_designation.append({
"qid": v["id"],
"label": heritage_designation_item.label(),
})
heritage_designation.append(
{
"qid": v["id"],
"label": heritage_designation_item.label(),
}
)
isa_items = [get_item(isa["numeric-id"]) for isa in item.get_isa()]
isa_lookup = {isa.qid: isa for isa in isa_items}
wikipedia_links = [{"lang": site[:-4], "title": link["title"]}
for site, link in sorted(item.sitelinks.items())
if site.endswith("wiki") and len(site) < 8]
wikipedia_links = [
{"lang": site[:-4], "title": link["title"]}
for site, link in sorted(item.sitelinks.items())
if site.endswith("wiki") and len(site) < 8
]
d = {
"qid": item.qid,
@ -831,7 +926,9 @@ def item_detail(item):
"markers": locations,
"image_list": image_filenames,
"street_address": street_address,
"isa_list": [{"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa],
"isa_list": [
{"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa
],
"closed": item.closed(),
"inception": item.time_claim("P571"),
"p1619": item.time_claim("P1619"),
@ -849,8 +946,9 @@ def item_detail(item):
unsupported = isa_lookup.keys() & unsupported_relation_types
if unsupported:
d["unsupported_relation_types"] = [isa for isa in d["isa_list"]
if isa["qid"] in isa_lookup]
d["unsupported_relation_types"] = [
isa for isa in d["isa_list"] if isa["qid"] in isa_lookup
]
return d
@ -889,7 +987,7 @@ def wikidata_items(bounds, isa_filter=None):
}
isa_count.append(isa)
return {'items': items, 'isa_count': isa_count}
return {"items": items, "isa_count": isa_count}
def missing_wikidata_items(qids, lat, lon):
@ -924,12 +1022,13 @@ def missing_wikidata_items(qids, lat, lon):
return dict(items=items, isa_count=isa_count)
def isa_incremental_search(search_terms):
en_label = func.jsonb_extract_path_text(model.Item.labels, "en", "value")
q = model.Item.query.filter(
model.Item.claims.has_key("P1282"),
en_label.ilike(f"%{search_terms}%"),
func.length(en_label) < 20,
model.Item.claims.has_key("P1282"),
en_label.ilike(f"%{search_terms}%"),
func.length(en_label) < 20,
)
print(q.statement.compile(compile_kwargs={"literal_binds": True}))
@ -943,13 +1042,18 @@ def isa_incremental_search(search_terms):
ret.append(cur)
return ret
def get_place_items(osm_type, osm_id):
src_id = osm_id * {'way': 1, 'relation': -1}[osm_type]
q = (model.Item.query
.join(model.ItemLocation)
.join(model.Polygon, func.ST_Covers(model.Polygon.way, model.ItemLocation.location))
.filter(model.Polygon.src_id == src_id))
def get_place_items(osm_type, osm_id):
src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
q = (
model.Item.query.join(model.ItemLocation)
.join(
model.Polygon,
func.ST_Covers(model.Polygon.way, model.ItemLocation.location),
)
.filter(model.Polygon.src_id == src_id)
)
# sql = q.statement.compile(compile_kwargs={"literal_binds": True})
item_count = q.count()