diff --git a/config/default.py b/config/default.py
new file mode 100644
index 0000000..28f5052
--- /dev/null
+++ b/config/default.py
@@ -0,0 +1,25 @@
+"""Sample config."""
+
+
+ID_TAGGING_SCHEMA_DIR = "/var/lib/data/id-tagging-schema"
+ID_PRESET_DIR = "/var/lib/data/id-tagging-schema/data/presets/"
+GEOIP_DATA = "/var/lib/data/GeoIP/GeoIPCity.dat"
+GEOLITE2 = "/var/lib/data/GeoLite2/GeoLite2-City.mmdb"
+
+CLIENT_KEY = ""
+CLIENT_SECRET = ""
+
+SECRET_KEY = ""
+
+DEFAULT_COMMENT = "+wikidata"
+
+ADMIN_NAME = ""
+ADMIN_EMAIL = ""
+ADMINS = [ADMIN_EMAIL]
+
+SMTP_HOST = "localhost"
+MAIL_FROM = "osm-wikidata@localhost"
+
+ERROR_MAIL = True
+
+PROPAGATE_EXCEPTIONS = False
diff --git a/matcher/__init__.py b/matcher/__init__.py
new file mode 100644
index 0000000..5e9706c
--- /dev/null
+++ b/matcher/__init__.py
@@ -0,0 +1,13 @@
+"""Match OSM and Wikidata items."""
+
+
+CallParams = dict[str, str | int]
+
+user_agent = (
+ "osm-wikidata/0.1 (https://github.com/EdwardBetts/osm-wikidata; edward@4angle.com)"
+)
+
+
+def user_agent_headers() -> dict[str, str]:
+ """User-Agent headers."""
+ return {"User-Agent": user_agent}
diff --git a/matcher/api.py b/matcher/api.py
index e2beb7c..2f8adda 100644
--- a/matcher/api.py
+++ b/matcher/api.py
@@ -1,19 +1,21 @@
-from sqlalchemy import func, or_, and_, text
+import json
+import os.path
+import re
+from collections import Counter, defaultdict
+
+from flask import current_app, g
+from sqlalchemy import and_, func, or_, text
+from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import selectinload
from sqlalchemy.sql import select
-from sqlalchemy.sql.expression import literal, union, cast, column
+from sqlalchemy.sql.expression import cast, column, literal, union
from sqlalchemy.types import Float
-from sqlalchemy.dialects import postgresql
-from matcher.planet import point, line, polygon
-from matcher import model, database, wikidata_api, wikidata
-from collections import Counter, defaultdict
-from flask import g, current_app
-import re
-import os.path
-import json
+
+from matcher import database, model, wikidata, wikidata_api
+from matcher.planet import line, point, polygon
srid = 4326
-re_point = re.compile(r'^POINT\((.+) (.+)\)$')
+re_point = re.compile(r"^POINT\((.+) (.+)\)$")
entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
tag_prefixes = {
@@ -37,9 +39,9 @@ skip_tags = {
"Key:brand",
}
+
def get_country_iso3166_1(lat, lon):
- """
- For a given lat/lon return a set of ISO country codes.
+ """For a given lat/lon return a set of ISO country codes.
Also cache the country code in the global object.
@@ -48,8 +50,9 @@ def get_country_iso3166_1(lat, lon):
point = func.ST_SetSRID(func.ST_MakePoint(lon, lat), srid)
alpha2_codes = set()
- q = model.Polygon.query.filter(func.ST_Covers(model.Polygon.way, point),
- model.Polygon.admin_level == "2")
+ q = model.Polygon.query.filter(
+ func.ST_Covers(model.Polygon.way, point), model.Polygon.admin_level == "2"
+ )
for country in q:
alpha2 = country.tags.get("ISO3166-1")
if not alpha2:
@@ -60,22 +63,23 @@ def get_country_iso3166_1(lat, lon):
return alpha2_codes
-def is_street_number_first(lat, lon):
+def is_street_number_first(lat: float, lon: float) -> bool:
+ """Is lat/lon within a country that puts number first in a street address."""
if lat is None or lon is None:
return True
alpha2 = get_country_iso3166_1(lat, lon)
# Incomplete list of countries that put street number first.
alpha2_number_first = {
- 'GB', # United Kingdom
- 'IE', # Ireland
- 'US', # United States
- 'MX', # Mexico
- 'CA', # Canada
- 'FR', # France
- 'AU', # Australia
- 'NZ', # New Zealand
- 'ZA', # South Africa
+ "GB", # United Kingdom
+ "IE", # Ireland
+ "US", # United States
+ "MX", # Mexico
+ "CA", # Canada
+ "FR", # France
+ "AU", # Australia
+ "NZ", # New Zealand
+ "ZA", # South Africa
}
return bool(alpha2_number_first & alpha2)
@@ -84,22 +88,26 @@ def is_street_number_first(lat, lon):
def make_envelope(bounds):
return func.ST_MakeEnvelope(*bounds, srid)
+
def get_bbox_centroid(bbox):
bbox = make_envelope(bbox)
centroid = database.session.query(func.ST_AsText(func.ST_Centroid(bbox))).scalar()
return reversed(re_point.match(centroid).groups())
+
def make_envelope_around_point(lat, lon, distance):
conn = database.session.connection()
p = func.ST_MakePoint(lon, lat)
- s = select([
- func.ST_AsText(func.ST_Project(p, distance, func.radians(0))),
- func.ST_AsText(func.ST_Project(p, distance, func.radians(90))),
- func.ST_AsText(func.ST_Project(p, distance, func.radians(180))),
- func.ST_AsText(func.ST_Project(p, distance, func.radians(270))),
- ])
+ s = select(
+ [
+ func.ST_AsText(func.ST_Project(p, distance, func.radians(0))),
+ func.ST_AsText(func.ST_Project(p, distance, func.radians(90))),
+ func.ST_AsText(func.ST_Project(p, distance, func.radians(180))),
+ func.ST_AsText(func.ST_Project(p, distance, func.radians(270))),
+ ]
+ )
row = conn.execute(s).fetchone()
coords = [[float(v) for v in re_point.match(i).groups()] for i in row]
@@ -110,49 +118,64 @@ def make_envelope_around_point(lat, lon, distance):
return func.ST_MakeEnvelope(west, south, east, north, srid)
-def drop_way_area(tags):
- """ Remove the way_area field from a tags dict. """
+
+def drop_way_area(tags: dict[str, str]) -> dict[str, str]:
+ """Remove the way_area field from a tags dict."""
if "way_area" in tags:
del tags["way_area"]
return tags
+
def get_part_of(table_name, src_id, bbox):
- table_map = {'point': point, 'line': line, 'polygon': polygon}
+ table_map = {"point": point, "line": line, "polygon": polygon}
table_alias = table_map[table_name].alias()
- s = (select([polygon.c.osm_id,
- polygon.c.tags,
- func.ST_Area(func.ST_Collect(polygon.c.way))]).
- where(and_(func.ST_Intersects(bbox, polygon.c.way),
- func.ST_Covers(polygon.c.way, table_alias.c.way),
- table_alias.c.osm_id == src_id,
- polygon.c.tags.has_key("name"),
- or_(
- polygon.c.tags.has_key("landuse"),
- polygon.c.tags.has_key("amenity"),
- ))).
- group_by(polygon.c.osm_id, polygon.c.tags))
+ s = (
+ select(
+ [
+ polygon.c.osm_id,
+ polygon.c.tags,
+ func.ST_Area(func.ST_Collect(polygon.c.way)),
+ ]
+ )
+ .where(
+ and_(
+ func.ST_Intersects(bbox, polygon.c.way),
+ func.ST_Covers(polygon.c.way, table_alias.c.way),
+ table_alias.c.osm_id == src_id,
+ polygon.c.tags.has_key("name"),
+ or_(
+ polygon.c.tags.has_key("landuse"),
+ polygon.c.tags.has_key("amenity"),
+ ),
+ )
+ )
+ .group_by(polygon.c.osm_id, polygon.c.tags)
+ )
conn = database.session.connection()
- return [{
- "type": "way" if osm_id > 0 else "relation",
- "id": abs(osm_id),
- "tags": tags,
- "area": area,
- } for osm_id, tags, area in conn.execute(s)]
+ return [
+ {
+ "type": "way" if osm_id > 0 else "relation",
+ "id": abs(osm_id),
+ "tags": tags,
+ "area": area,
+ }
+ for osm_id, tags, area in conn.execute(s)
+ ]
-def get_and_save_item(qid):
- """ Download an item from Wikidata and cache it in the database. """
+def get_and_save_item(qid: str) -> model.Item | None:
+ """Download an item from Wikidata and cache it in the database."""
entity = wikidata_api.get_entity(qid)
entity_qid = entity["id"]
if entity_qid != qid:
- print(f'redirect {qid} -> {entity_qid}')
+ print(f"redirect {qid} -> {entity_qid}")
item = model.Item.query.get(entity_qid[1:])
return item
if "claims" not in entity:
- return
+ return None
coords = wikidata.get_entity_coords(entity["claims"])
item_id = int(qid[1:])
@@ -171,8 +194,9 @@ def get_and_save_item(qid):
return item
-def get_isa_count(items):
- isa_count = Counter()
+def get_isa_count(items: list[model.Item]) -> list[tuple[int, int]]:
+ """List of IsA counts."""
+ isa_count: Counter[int] = Counter()
for item in items:
if not item:
continue
@@ -199,13 +223,11 @@ def get_items_in_bbox(bbox):
def get_osm_with_wikidata_tag(bbox, isa_filter=None):
- bbox_str = ','.join(str(v) for v in bbox)
+ bbox_str = ",".join(str(v) for v in bbox)
extra_sql = ""
if isa_filter:
- q = (
- model.Item.query.join(model.ItemLocation)
- .filter(func.ST_Covers(make_envelope(bbox),
- model.ItemLocation.location))
+ q = model.Item.query.join(model.ItemLocation).filter(
+ func.ST_Covers(make_envelope(bbox), model.ItemLocation.location)
)
q = add_isa_filter(q, isa_filter)
qids = [isa.qid for isa in q]
@@ -216,7 +238,8 @@ def get_osm_with_wikidata_tag(bbox, isa_filter=None):
extra_sql += f" AND tags -> 'wikidata' in ({qid_list})"
# easier than building this query with SQLAlchemy
- sql = f'''
+ sql = (
+ f"""
SELECT tbl, osm_id, tags, ARRAY[ST_Y(centroid), ST_X(centroid)], geojson
FROM (
SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson
@@ -235,24 +258,29 @@ UNION
HAVING st_area(st_collect(way)) < 20 * st_area(ST_MakeEnvelope({bbox_str}, {srid}))
) as anon
WHERE tags ? 'wikidata'
-''' + extra_sql
+"""
+ + extra_sql
+ )
conn = database.session.connection()
result = conn.execute(text(sql))
print(sql)
- point_sql = f'''
+ point_sql = (
+ f"""
SELECT 'point' as tbl, osm_id, tags, ST_AsText(ST_Centroid(way)) as centroid, ST_AsGeoJSON(way) as geojson
FROM planet_osm_point
WHERE ST_Intersects(ST_MakeEnvelope({bbox_str}, {srid}), way) and tags ? 'wikidata'
-''' + extra_sql
+"""
+ + extra_sql
+ )
print("point")
print(point_sql)
tagged = []
for tbl, osm_id, tags, centroid, geojson in result:
- if tbl == 'point':
+ if tbl == "point":
osm_type = "node"
else:
osm_type = "way" if osm_id > 0 else "relation"
@@ -260,15 +288,17 @@ WHERE tags ? 'wikidata'
name = tags.get("name") or tags.get("addr:housename") or "[no label]"
- tagged.append({
- "identifier": f"{osm_type}/{osm_id}",
- "id": osm_id,
- "type": osm_type,
- "geojson": json.loads(geojson),
- "centroid": centroid,
- "name": name,
- "wikidata": tags["wikidata"],
- })
+ tagged.append(
+ {
+ "identifier": f"{osm_type}/{osm_id}",
+ "id": osm_id,
+ "type": osm_type,
+ "geojson": json.loads(geojson),
+ "centroid": centroid,
+ "name": name,
+ "wikidata": tags["wikidata"],
+ }
+ )
return tagged
@@ -310,11 +340,13 @@ def get_item_tags(item):
isa, isa_path = isa_items.pop()
if not isa:
continue
- isa_path = isa_path + [{'qid': isa.qid, 'label': isa.label()}]
+ isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}]
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
- osm += [extra.tag_or_key
- for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)]
+ osm += [
+ extra.tag_or_key
+ for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)
+ ]
for i in osm:
osm_list[i].append(isa_path[:])
@@ -369,14 +401,16 @@ def get_tags_for_isa_item(item):
isa, isa_path = isa_items.pop()
if not isa:
continue
- isa_path = isa_path + [{'qid': isa.qid, 'label': isa.label()}]
+ isa_path = isa_path + [{"qid": isa.qid, "label": isa.label()}]
if isa.item_id not in items_checked_done:
- items_checked.append({'qid': isa.qid, 'label': isa.label()})
+ items_checked.append({"qid": isa.qid, "label": isa.label()})
items_checked_done.add(isa.item_id)
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
- osm += [extra.tag_or_key
- for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)]
+ osm += [
+ extra.tag_or_key
+ for extra in model.ItemExtraKeys.query.filter_by(item_id=isa.item_id)
+ ]
for i in osm:
osm_list[i].append(isa_path[:])
@@ -403,34 +437,31 @@ def get_tags_for_isa_item(item):
seen.update(isa_list)
isa_items += [(isa, isa_path) for isa in get_items(isa_list)]
return {
- 'tags': {key: list(values) for key, values in osm_list.items()},
- 'checked': items_checked,
+ "tags": {key: list(values) for key, values in osm_list.items()},
+ "checked": items_checked,
}
def add_isa_filter(q, isa_qids):
-
q_subclass = database.session.query(model.Item.qid).filter(
func.jsonb_path_query_array(
model.Item.claims,
- '$.P279[*].mainsnak.datavalue.value.id',
- ).bool_op('?|')(list(isa_qids))
+ "$.P279[*].mainsnak.datavalue.value.id",
+ ).bool_op("?|")(list(isa_qids))
)
subclass_qid = {qid for qid, in q_subclass.all()}
isa = func.jsonb_path_query_array(
model.Item.claims,
- '$.P31[*].mainsnak.datavalue.value.id',
- ).bool_op('?|')
+ "$.P31[*].mainsnak.datavalue.value.id",
+ ).bool_op("?|")
return q.filter(isa(list(isa_qids | subclass_qid)))
def wikidata_items_count(bounds, isa_filter=None):
-
- q = (
- model.Item.query.join(model.ItemLocation)
- .filter(func.ST_Covers(make_envelope(bounds), model.ItemLocation.location))
+ q = model.Item.query.join(model.ItemLocation).filter(
+ func.ST_Covers(make_envelope(bounds), model.ItemLocation.location)
)
if isa_filter:
@@ -440,12 +471,12 @@ def wikidata_items_count(bounds, isa_filter=None):
return q.count()
+
def wikidata_isa_counts(bounds, isa_filter=None):
db_bbox = make_envelope(bounds)
- q = (
- model.Item.query.join(model.ItemLocation)
- .filter(func.ST_Covers(db_bbox, model.ItemLocation.location))
+ q = model.Item.query.join(model.ItemLocation).filter(
+ func.ST_Covers(db_bbox, model.ItemLocation.location)
)
if isa_filter:
@@ -474,12 +505,13 @@ def wikidata_isa_counts(bounds, isa_filter=None):
return isa_count
+
def get_tag_filter(tags, tag_list):
tag_filter = []
for tag_or_key in tag_list:
if tag_or_key.startswith("Key:"):
key = tag_or_key[4:]
- tag_filter.append(and_(tags.has_key(key), tags[key] != 'no'))
+ tag_filter.append(and_(tags.has_key(key), tags[key] != "no"))
for prefix in tag_prefixes:
tag_filter.append(tags.has_key(f"{prefix}:{key}"))
@@ -495,11 +527,11 @@ def get_tag_filter(tags, tag_list):
def get_preset_translations():
app = current_app
country_language = {
- 'AU': 'en-AU', # Australia
- 'GB': 'en-GB', # United Kingdom
- 'IE': 'en-GB', # Ireland
- 'IN': 'en-IN', # India
- 'NZ': 'en-NZ', # New Zealand
+ "AU": "en-AU", # Australia
+ "GB": "en-GB", # United Kingdom
+ "IE": "en-GB", # Ireland
+ "IN": "en-IN", # India
+ "NZ": "en-NZ", # New Zealand
}
ts_dir = app.config["ID_TAGGING_SCHEMA_DIR"]
translation_dir = os.path.join(ts_dir, "dist", "translations")
@@ -520,13 +552,14 @@ def get_preset_translations():
return {}
+
def get_presets_from_tags(ending, tags):
translations = get_preset_translations()
found = []
for k, v in tags.items():
- if k == 'amenity' and v == 'clock' and tags.get('display') == 'sundial':
+ if k == "amenity" and v == "clock" and tags.get("display") == "sundial":
tag_or_key = f"Tag:{k}={v}"
found.append({"tag_or_key": tag_or_key, "name": "Sundial"})
continue
@@ -604,8 +637,7 @@ def address_node_label(tags):
def get_address_nodes_within_building(osm_id, bbox_list):
q = model.Point.query.filter(
polygon.c.osm_id == osm_id,
- or_(*[func.ST_Intersects(bbox, model.Point.way)
- for bbox in bbox_list]),
+ or_(*[func.ST_Intersects(bbox, model.Point.way) for bbox in bbox_list]),
func.ST_Covers(polygon.c.way, model.Point.way),
model.Point.tags.has_key("addr:street"),
model.Point.tags.has_key("addr:housenumber"),
@@ -615,8 +647,14 @@ def get_address_nodes_within_building(osm_id, bbox_list):
def osm_display_name(tags):
- keys = ("bridge:name", "tunnel:name", "lock_name", "name", "addr:housename",
- "inscription")
+ keys = (
+ "bridge:name",
+ "tunnel:name",
+ "lock_name",
+ "name",
+ "addr:housename",
+ "inscription",
+ )
for key in keys:
if key in tags:
return tags[key]
@@ -625,6 +663,7 @@ def osm_display_name(tags):
def street_address_in_tags(tags):
return "addr:housenumber" in tags and "addr:street" in tags
+
def find_osm_candidates(item, limit=80, max_distance=450, names=None):
item_id = item.item_id
item_is_linear_feature = item.is_linear_feature()
@@ -637,51 +676,94 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
check_is_street_number_first(item.locations[0].get_lat_lon())
- bbox_list = [make_envelope_around_point(*loc.get_lat_lon(), max_distance)
- for loc in item.locations]
+ bbox_list = [
+ make_envelope_around_point(*loc.get_lat_lon(), max_distance)
+ for loc in item.locations
+ ]
null_area = cast(None, Float)
- dist = column('dist')
- tags = column('tags', postgresql.HSTORE)
+ dist = column("dist")
+ tags = column("tags", postgresql.HSTORE)
tag_list = get_item_tags(item)
# tag_filters = get_tag_filter(point.c.tags, tag_list)
# print(tag_filters)
- s_point = (select([literal('point').label('t'), point.c.osm_id, point.c.tags.label('tags'),
- func.min(func.ST_DistanceSphere(model.ItemLocation.location, point.c.way)).label('dist'),
- func.ST_AsText(point.c.way),
- func.ST_AsGeoJSON(point.c.way),
- null_area]).
- where(and_(
- or_(*[func.ST_Intersects(bbox, point.c.way)
- for bbox in bbox_list]),
- model.ItemLocation.item_id == item_id,
- or_(*get_tag_filter(point.c.tags, tag_list)))).
- group_by(point.c.osm_id, point.c.tags, point.c.way))
+ s_point = (
+ select(
+ [
+ literal("point").label("t"),
+ point.c.osm_id,
+ point.c.tags.label("tags"),
+ func.min(
+ func.ST_DistanceSphere(model.ItemLocation.location, point.c.way)
+ ).label("dist"),
+ func.ST_AsText(point.c.way),
+ func.ST_AsGeoJSON(point.c.way),
+ null_area,
+ ]
+ )
+ .where(
+ and_(
+ or_(*[func.ST_Intersects(bbox, point.c.way) for bbox in bbox_list]),
+ model.ItemLocation.item_id == item_id,
+ or_(*get_tag_filter(point.c.tags, tag_list)),
+ )
+ )
+ .group_by(point.c.osm_id, point.c.tags, point.c.way)
+ )
- s_line = (select([literal('line').label('t'), line.c.osm_id, line.c.tags.label('tags'),
- func.min(func.ST_DistanceSphere(model.ItemLocation.location, line.c.way)).label('dist'),
- func.ST_AsText(func.ST_Centroid(func.ST_Collect(line.c.way))),
- func.ST_AsGeoJSON(func.ST_Collect(line.c.way)),
- null_area]).
- where(and_(
- or_(*[func.ST_Intersects(bbox, line.c.way) for bbox in bbox_list]),
- model.ItemLocation.item_id == item_id,
- or_(*get_tag_filter(line.c.tags, tag_list)))).
- group_by(line.c.osm_id, line.c.tags))
+ s_line = (
+ select(
+ [
+ literal("line").label("t"),
+ line.c.osm_id,
+ line.c.tags.label("tags"),
+ func.min(
+ func.ST_DistanceSphere(model.ItemLocation.location, line.c.way)
+ ).label("dist"),
+ func.ST_AsText(func.ST_Centroid(func.ST_Collect(line.c.way))),
+ func.ST_AsGeoJSON(func.ST_Collect(line.c.way)),
+ null_area,
+ ]
+ )
+ .where(
+ and_(
+ or_(*[func.ST_Intersects(bbox, line.c.way) for bbox in bbox_list]),
+ model.ItemLocation.item_id == item_id,
+ or_(*get_tag_filter(line.c.tags, tag_list)),
+ )
+ )
+ .group_by(line.c.osm_id, line.c.tags)
+ )
- s_polygon = (select([literal('polygon').label('t'), polygon.c.osm_id, polygon.c.tags.label('tags'),
- func.min(func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way)).label('dist'),
- func.ST_AsText(func.ST_Centroid(func.ST_Collect(polygon.c.way))),
- func.ST_AsGeoJSON(func.ST_Collect(polygon.c.way)),
- func.ST_Area(func.ST_Collect(polygon.c.way))]).
- where(and_(
- or_(*[func.ST_Intersects(bbox, polygon.c.way) for bbox in bbox_list]),
- model.ItemLocation.item_id == item_id,
- or_(*get_tag_filter(polygon.c.tags, tag_list)))).
- group_by(polygon.c.osm_id, polygon.c.tags).
- having(func.ST_Area(func.ST_Collect(polygon.c.way)) < 20 * func.ST_Area(bbox_list[0])))
+ s_polygon = (
+ select(
+ [
+ literal("polygon").label("t"),
+ polygon.c.osm_id,
+ polygon.c.tags.label("tags"),
+ func.min(
+ func.ST_DistanceSphere(model.ItemLocation.location, polygon.c.way)
+ ).label("dist"),
+ func.ST_AsText(func.ST_Centroid(func.ST_Collect(polygon.c.way))),
+ func.ST_AsGeoJSON(func.ST_Collect(polygon.c.way)),
+ func.ST_Area(func.ST_Collect(polygon.c.way)),
+ ]
+ )
+ .where(
+ and_(
+ or_(*[func.ST_Intersects(bbox, polygon.c.way) for bbox in bbox_list]),
+ model.ItemLocation.item_id == item_id,
+ or_(*get_tag_filter(polygon.c.tags, tag_list)),
+ )
+ )
+ .group_by(polygon.c.osm_id, polygon.c.tags)
+ .having(
+ func.ST_Area(func.ST_Collect(polygon.c.way))
+ < 20 * func.ST_Area(bbox_list[0])
+ )
+ )
tables = ([] if item_is_linear_feature else [s_point]) + [s_line, s_polygon]
s = select([union(*tables).alias()]).where(dist < max_distance).order_by(dist)
@@ -695,10 +777,14 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
s = s.where(tags.has_key("name"))
if "Key:amenity" in tag_list:
- s = s.where(and_(tags["amenity"] != "bicycle_parking",
- tags["amenity"] != "bicycle_repair_station",
- tags["amenity"] != "atm",
- tags["amenity"] != "recycling"))
+ s = s.where(
+ and_(
+ tags["amenity"] != "bicycle_parking",
+ tags["amenity"] != "bicycle_repair_station",
+ tags["amenity"] != "atm",
+ tags["amenity"] != "recycling",
+ )
+ )
if limit:
s = s.limit(limit)
@@ -730,6 +816,8 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
shape = "area" if table == "polygon" else table
+ item_identifier_tags = item.get_identifiers_tags()
+
cur = {
"identifier": f"{osm_type}/{osm_id}",
"type": osm_type,
@@ -748,8 +836,9 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
part_of = []
for bbox in bbox_list:
- part_of += [i for i in get_part_of(table, src_id, bbox)
- if i["tags"]["name"] != name]
+ part_of += [
+ i for i in get_part_of(table, src_id, bbox) if i["tags"]["name"] != name
+ ]
if part_of:
cur["part_of"] = part_of
@@ -760,9 +849,9 @@ def find_osm_candidates(item, limit=80, max_distance=450, names=None):
return nearby
-def get_item(item_id):
- """ Retrieve a Wikidata item, either from the database or from Wikidata. """
+def get_item(item_id):
+ """Retrieve a Wikidata item, either from the database or from Wikidata."""
item = model.Item.query.get(item_id)
return item or get_and_save_item(f"Q{item_id}")
@@ -774,7 +863,7 @@ def get_item_street_addresses(item):
for claim in item.claims["P669"]:
qualifiers = claim.get("qualifiers")
- if not qualifiers or 'P670' not in qualifiers:
+ if not qualifiers or "P670" not in qualifiers:
continue
number = qualifiers["P670"][0]["datavalue"]["value"]
@@ -782,24 +871,26 @@ def get_item_street_addresses(item):
street = street_item.label()
for q in qualifiers["P670"]:
number = q["datavalue"]["value"]
- address = (f"{number} {street}"
- if g.street_number_first
- else f"{street} {number}")
+ address = (
+ f"{number} {street}" if g.street_number_first else f"{street} {number}"
+ )
street_address.append(address)
return street_address
+
def check_is_street_number_first(latlng):
g.street_number_first = is_street_number_first(*latlng)
+
def item_detail(item):
unsupported_relation_types = {
- 'Q194356', # wind farm
- 'Q2175765', # tram stop
+ "Q194356", # wind farm
+ "Q2175765", # tram stop
}
locations = [list(i.get_lat_lon()) for i in item.locations]
- if not hasattr(g, 'street_number_first'):
+ if not hasattr(g, "street_number_first"):
g.street_number_first = is_street_number_first(*locations[0])
image_filenames = item.get_claim("P18")
@@ -809,20 +900,24 @@ def item_detail(item):
heritage_designation = []
for v in item.get_claim("P1435"):
if not v:
- print('heritage designation missing:', item.qid)
+ print("heritage designation missing:", item.qid)
continue
heritage_designation_item = get_item(v["numeric-id"])
- heritage_designation.append({
- "qid": v["id"],
- "label": heritage_designation_item.label(),
- })
+ heritage_designation.append(
+ {
+ "qid": v["id"],
+ "label": heritage_designation_item.label(),
+ }
+ )
isa_items = [get_item(isa["numeric-id"]) for isa in item.get_isa()]
isa_lookup = {isa.qid: isa for isa in isa_items}
- wikipedia_links = [{"lang": site[:-4], "title": link["title"]}
- for site, link in sorted(item.sitelinks.items())
- if site.endswith("wiki") and len(site) < 8]
+ wikipedia_links = [
+ {"lang": site[:-4], "title": link["title"]}
+ for site, link in sorted(item.sitelinks.items())
+ if site.endswith("wiki") and len(site) < 8
+ ]
d = {
"qid": item.qid,
@@ -831,7 +926,9 @@ def item_detail(item):
"markers": locations,
"image_list": image_filenames,
"street_address": street_address,
- "isa_list": [{"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa],
+ "isa_list": [
+ {"qid": isa.qid, "label": isa.label()} for isa in isa_items if isa
+ ],
"closed": item.closed(),
"inception": item.time_claim("P571"),
"p1619": item.time_claim("P1619"),
@@ -849,8 +946,9 @@ def item_detail(item):
unsupported = isa_lookup.keys() & unsupported_relation_types
if unsupported:
- d["unsupported_relation_types"] = [isa for isa in d["isa_list"]
- if isa["qid"] in isa_lookup]
+ d["unsupported_relation_types"] = [
+ isa for isa in d["isa_list"] if isa["qid"] in isa_lookup
+ ]
return d
@@ -889,7 +987,7 @@ def wikidata_items(bounds, isa_filter=None):
}
isa_count.append(isa)
- return {'items': items, 'isa_count': isa_count}
+ return {"items": items, "isa_count": isa_count}
def missing_wikidata_items(qids, lat, lon):
@@ -924,12 +1022,13 @@ def missing_wikidata_items(qids, lat, lon):
return dict(items=items, isa_count=isa_count)
+
def isa_incremental_search(search_terms):
en_label = func.jsonb_extract_path_text(model.Item.labels, "en", "value")
q = model.Item.query.filter(
- model.Item.claims.has_key("P1282"),
- en_label.ilike(f"%{search_terms}%"),
- func.length(en_label) < 20,
+ model.Item.claims.has_key("P1282"),
+ en_label.ilike(f"%{search_terms}%"),
+ func.length(en_label) < 20,
)
print(q.statement.compile(compile_kwargs={"literal_binds": True}))
@@ -943,13 +1042,18 @@ def isa_incremental_search(search_terms):
ret.append(cur)
return ret
-def get_place_items(osm_type, osm_id):
- src_id = osm_id * {'way': 1, 'relation': -1}[osm_type]
- q = (model.Item.query
- .join(model.ItemLocation)
- .join(model.Polygon, func.ST_Covers(model.Polygon.way, model.ItemLocation.location))
- .filter(model.Polygon.src_id == src_id))
+def get_place_items(osm_type, osm_id):
+ src_id = osm_id * {"way": 1, "relation": -1}[osm_type]
+
+ q = (
+ model.Item.query.join(model.ItemLocation)
+ .join(
+ model.Polygon,
+ func.ST_Covers(model.Polygon.way, model.ItemLocation.location),
+ )
+ .filter(model.Polygon.src_id == src_id)
+ )
# sql = q.statement.compile(compile_kwargs={"literal_binds": True})
item_count = q.count()
diff --git a/matcher/commons.py b/matcher/commons.py
index b65658c..8c537e9 100644
--- a/matcher/commons.py
+++ b/matcher/commons.py
@@ -1,18 +1,25 @@
-import requests
+"""Use mediawiki API to look up images on Wikimedia Commons."""
+
import urllib.parse
-from . import utils
+from typing import Any
+
+import requests
+
+from . import CallParams, utils
commons_start = "http://commons.wikimedia.org/wiki/Special:FilePath/"
commons_url = "https://www.wikidata.org/w/api.php"
page_size = 50
-def commons_uri_to_filename(uri):
+def commons_uri_to_filename(uri: str) -> str:
+ """Given the URI for a file on commons return the filename of the file."""
return urllib.parse.unquote(utils.drop_start(uri, commons_start))
-def api_call(params):
- call_params = {
+def api_call(params: CallParams) -> requests.Response:
+ """Call the Commons API."""
+ call_params: CallParams = {
"format": "json",
"formatversion": 2,
**params,
@@ -21,8 +28,11 @@ def api_call(params):
return requests.get(commons_url, params=call_params, timeout=5)
-def image_detail(filenames, thumbheight=None, thumbwidth=None):
- params = {
+def image_detail(
+ filenames: list[str], thumbheight: int | None = None, thumbwidth: int | None = None
+) -> dict[str, Any]:
+ """Detail for multiple images."""
+ params: CallParams = {
"action": "query",
"prop": "imageinfo",
"iiprop": "url",
@@ -32,7 +42,7 @@ def image_detail(filenames, thumbheight=None, thumbwidth=None):
if thumbwidth is not None:
params["iiurlwidth"] = thumbwidth
- images = {}
+ images: dict[str, Any] = {}
for cur in utils.chunk(filenames, page_size):
call_params = params.copy()
diff --git a/matcher/database.py b/matcher/database.py
new file mode 100644
index 0000000..e2e7398
--- /dev/null
+++ b/matcher/database.py
@@ -0,0 +1,39 @@
+"""Database functions."""
+
+import flask
+import sqlalchemy
+from sqlalchemy import create_engine, func
+from sqlalchemy.engine import reflection
+from sqlalchemy.orm import scoped_session, sessionmaker
+
+session: sqlalchemy.orm.scoping.scoped_session = scoped_session(sessionmaker())
+
+
+def init_db(db_url: str, echo: bool = False) -> None:
+ """Initialise database."""
+ session.configure(bind=get_engine(db_url, echo=echo))
+
+
+def get_engine(db_url: str, echo: bool = False) -> sqlalchemy.engine.base.Engine:
+    """Create an engine object."""
+ return create_engine(db_url, pool_recycle=3600, echo=echo)
+
+
+def get_tables() -> list[str]:
+ """Get a list of table names."""
+ tables: list[str] = reflection.Inspector.from_engine(session.bind).get_table_names()
+ return tables
+
+
+def init_app(app: flask.app.Flask, echo: bool = False) -> None:
+ """Initialise database connection within flask app."""
+ db_url = app.config["DB_URL"]
+ session.configure(bind=get_engine(db_url, echo=echo))
+
+ @app.teardown_appcontext
+ def shutdown_session(exception: Exception | None = None) -> None:
+ session.remove()
+
+
+def now_utc():
+ return func.timezone("utc", func.now())
diff --git a/matcher/edit.py b/matcher/edit.py
new file mode 100644
index 0000000..664b408
--- /dev/null
+++ b/matcher/edit.py
@@ -0,0 +1,71 @@
+from flask import g
+from . import user_agent_headers, database, osm_oauth, mail
+from .model import Changeset
+import requests
+import html
+
+really_save = True
+osm_api_base = "https://api.openstreetmap.org/api/0.6"
+
+
+def new_changeset(comment):
+ return f"""
+