owl-map/web_view.py

457 lines
13 KiB
Python
Executable file

#!/usr/bin/python3
from flask import Flask, render_template, request, jsonify, redirect, url_for
from sqlalchemy import func
from sqlalchemy.orm import selectinload
from matcher import nominatim, model, database, commons, wikidata, wikidata_api
from collections import Counter
from time import time
import GeoIP
srid = 4326
app = Flask(__name__)
app.debug = True
DB_URL = "postgresql:///matcher"
database.init_db(DB_URL)
entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
property_map = [
("P238", ["iata"], "IATA airport code"),
("P239", ["icao"], "ICAO airport code"),
("P240", ["faa", "ref"], "FAA airport code"),
# ('P281', ['addr:postcode', 'postal_code'], 'postal code'),
("P296", ["ref", "ref:train", "railway:ref"], "station code"),
("P300", ["ISO3166-2"], "ISO 3166-2 code"),
("P359", ["ref:rce"], "Rijksmonument ID"),
("P590", ["ref:gnis", "GNISID", "gnis:id", "gnis:feature_id"], "USGS GNIS ID"),
("P649", ["ref:nrhp"], "NRHP reference number"),
("P722", ["uic_ref"], "UIC station code"),
("P782", ["ref"], "LAU (local administrative unit)"),
("P836", ["ref:gss"], "UK Government Statistical Service code"),
("P856", ["website", "contact:website", "url"], "website"),
("P882", ["nist:fips_code"], "FIPS 6-4 (US counties)"),
("P901", ["ref:fips"], "FIPS 10-4 (countries and regions)"),
# A UIC id can be a IBNR, but not every IBNR is an UIC id
("P954", ["uic_ref"], "IBNR ID"),
("P981", ["ref:woonplaatscode"], "BAG code for Dutch residencies"),
("P1216", ["HE_ref"], "National Heritage List for England number"),
("P2253", ["ref:edubase"], "EDUBase URN"),
("P2815", ["esr:user", "ref", "ref:train"], "ESR station code"),
("P3425", ["ref", "ref:SIC"], "Natura 2000 site ID"),
("P3562", ["seamark:light:reference"], "Admiralty number"),
(
"P4755",
["ref", "ref:train", "ref:crs", "crs", "nat_ref"],
"UK railway station code",
),
("P4803", ["ref", "ref:train"], "Amtrak station code"),
("P6082", ["nycdoitt:bin"], "NYC Building Identification Number"),
("P5086", ["ref"], "FIPS 5-2 alpha code (US states)"),
("P5087", ["ref:fips"], "FIPS 5-2 numeric code (US states)"),
("P5208", ["ref:bag"], "BAG building ID for Dutch buildings"),
]
@app.teardown_appcontext
def shutdown_session(exception=None):
database.session.remove()
def check_for_tagged_qids(qids):
tagged = set()
for qid in qids:
for cls in model.Point, model.Polygon, model.Line:
q = cls.query.filter(cls.tags["wikidata"] == qid)
print(q)
if q.count():
tagged.add(qid)
break
return tagged
def check_for_tagged_qid(qid):
return any(
database.session.query(
cls.query.filter(
cls.tags.has_key("wikidata"), cls.tags["wikidata"] == qid
).exists()
).scalar()
for cls in (model.Point, model.Polygon, model.Line)
)
def make_envelope(bbox):
west, south, east, north = [float(i) for i in bbox.split(",")]
return func.ST_MakeEnvelope(west, south, east, north, srid)
def get_osm_with_wikidata_tag(bbox):
db_bbox = make_envelope(bbox)
tagged = []
seen = set()
for cls in (model.Point, model.Polygon, model.Line):
q = cls.query.filter(
cls.tags.has_key("wikidata"),
func.ST_Intersects(db_bbox, cls.way),
func.ST_Area(cls.way) < 20 * func.ST_Area(db_bbox),
)
for osm in q:
if osm.identifier in seen:
continue
seen.add(osm.identifier)
name = osm.name
if not name:
if "addr:housename" in osm.tags:
name = osm.tags["addr:housename"]
else:
name = "[no label]"
tagged.append(
{
"identifier": osm.identifier,
"id": osm.id,
"type": osm.type,
"url": osm.osm_url,
"geojson": osm.geojson(),
"centroid": list(osm.get_centroid()),
"name": name,
"wikidata": osm.tags["wikidata"],
}
)
return tagged
def get_items_in_bbox(bbox):
db_bbox = make_envelope(bbox)
q = (
model.Item.query.join(model.ItemLocation)
.filter(func.ST_Covers(db_bbox, model.ItemLocation.location))
.options(selectinload(model.Item.locations))
)
return q
def get_markers(all_items):
items = []
for item in all_items:
if "en" not in item.labels:
continue
locations = [list(i.get_lat_lon()) for i in item.locations]
image_filenames = item.get_claim("P18")
item = {
"qid": item.qid,
"label": item.label(),
"description": item.description(),
"markers": locations,
"image_list": image_filenames,
"street_address": item.get_claim("P6375"),
"isa_list": [v["id"] for v in item.get_claim("P31")],
}
items.append(item)
return items
def get_user_location():
gi = GeoIP.open("/home/edward/lib/data/GeoIPCity.dat", GeoIP.GEOIP_STANDARD)
remote_ip = request.remote_addr
gir = gi.record_by_addr(remote_ip)
if not gir:
return
lat, lon = gir["latitude"], gir["longitude"]
return (lat, lon)
@app.route("/")
def redirect_from_root():
return redirect(url_for("map_start_page"))
@app.route("/index")
def index_page():
return render_template("index.html")
@app.route("/identifier")
def identifier_index():
return render_template("identifier_index.html", property_map=property_map)
@app.route("/commons/<filename>")
def get_commons_image(filename):
detail = commons.image_detail([filename], thumbheight=600, thumbwidth=600)
image = detail[filename]
return redirect(image["thumburl"])
@app.route("/identifier/<pid>")
def identifier_page(pid):
per_page = 10
page = int(request.args.get("page", 1))
property_dict = {pid: (osm_keys, label) for pid, osm_keys, label in property_map}
osm_keys, label = property_dict[pid]
wd = model.Item.query.filter(model.Item.claims.has_key(pid))
total = wd.count()
start = per_page * (page - 1)
items = wd.all()[start : per_page * page]
qids = [item.qid for item in items]
print(qids)
# pred = None
# values = set()
# for item in items:
# values |= set(item.get_claim(pid))
#
# for key in osm_keys:
# if key == 'ref':
# continue
# if pred is None:
# pred = model.Point.tags[key].in_(values)
# else:
# pred |= model.Point.tags[key].in_(values)
#
osm_points = {}
for qid in qids:
osm_points[qid] = model.Point.query.filter(
model.Point.tags["wikidata"] == qid
).all()
osm_total = len(osm_points)
return render_template(
"identifier_page.html",
pid=pid,
osm_keys=osm_keys,
label=label,
items=items,
total=total,
osm_total=osm_total,
osm_points=osm_points,
)
@app.route("/map/<int:zoom>/<float(signed=True):lat>/<float(signed=True):lng>")
def map_location(zoom, lat, lng):
t = int(time())
return render_template("map.html", zoom=zoom, lat=lat, lng=lng, time=t)
@app.route("/map")
def map_start_page():
t = int(time())
location = get_user_location()
if not location:
return render_template("map.html", zoom=16, lat=None, lng=None, time=t)
lat, lng = location
return render_template("map.html", zoom=16, lat=lat, lng=lng, time=t)
@app.route("/search/map")
def search_map_page():
user_lat, user_lon = get_user_location() or (None, None)
q = request.args.get("q")
if not q:
return render_template("map.html", user_lat=user_lat, user_lon=user_lon)
hits = nominatim.lookup(q)
for hit in hits:
if "geotext" in hit:
del hit["geotext"]
bbox = [hit["boundingbox"] for hit in hits]
return render_template(
"search_map.html",
hits=hits,
bbox_list=bbox,
user_lat=user_lat,
user_lon=user_lon,
)
@app.route("/search")
def search_page():
q = request.args.get("q")
if not q:
return render_template("search.html", hits=None, bbox_list=None)
hits = nominatim.lookup(q)
for hit in hits:
if "geotext" in hit:
del hit["geotext"]
bbox = [hit["boundingbox"] for hit in hits]
return render_template("search.html", hits=hits, bbox_list=bbox)
def get_isa_count(items):
isa_count = Counter()
for item in items:
isa_list = item.get_claim("P31")
for isa in isa_list:
isa_count[isa["id"]] += 1
return isa_count.most_common()
def get_and_save_item(qid):
entity = wikidata_api.get_entity(qid)
if entity["id"] != qid:
print(f'redirect {qid} -> {entity["id"]}')
return
coords = wikidata.get_entity_coords(entity["claims"])
item_id = int(qid[1:])
obj = {k: v for k, v in entity.items() if k in entity_keys}
try:
item = model.Item(item_id=item_id, **obj)
except TypeError:
print(qid)
print(f'{entity["pageid"]=} {entity["ns"]=} {entity["type"]=}')
print(entity.keys())
raise
item.locations = model.location_objects(coords)
database.session.add(item)
return item
@app.route("/api/1/items")
def api_wikidata_items():
bounds = request.args.get("bounds")
t0 = time()
q = get_items_in_bbox(bounds)
db_items = q.all()
items = get_markers(db_items)
counts = get_isa_count(db_items)
isa_ids = [qid[1:] for qid, count in counts]
isa_items = {
isa.qid: isa for isa in model.Item.query.filter(model.Item.item_id.in_(isa_ids))
}
isa_count = []
for qid, count in counts:
item = isa_items.get(qid)
if not item:
item = get_and_save_item(qid)
label = item.label() if item else "[missing]"
isa = {
"qid": qid,
"count": count,
"label": label,
}
isa_count.append(isa)
t1 = time() - t0
print(f"wikidata: {t1} seconds")
return jsonify(success=True, items=items, isa_count=isa_count, duration=t1)
@app.route("/api/1/osm")
def api_osm_objects():
bounds = request.args.get("bounds")
t0 = time()
objects = get_osm_with_wikidata_tag(bounds)
t1 = time() - t0
print(f"OSM: {t1} seconds")
return jsonify(success=True, objects=objects, duration=t1)
skip_isa = {13226383, 16686448, 2221906}
skip_tags = {"Key:addr:street"}
def get_item_tags(item):
isa_list = [v["numeric-id"] for v in item.get_claim("P31")]
isa_items = model.Item.query.filter(model.Item.item_id.in_(isa_list)).all()
osm_list = set()
seen = set(isa_list) | skip_isa
while isa_items:
isa = isa_items.pop()
osm = [v for v in isa.get_claim("P1282") if v not in skip_tags]
osm_list.update(osm)
subclass_of = [v["numeric-id"] for v in isa.get_claim("P279")]
isa_list = [isa_id for isa_id in subclass_of if isa_id not in seen]
seen.update(isa_list)
isa_items += model.Item.query.filter(model.Item.item_id.in_(isa_list)).all()
return sorted(osm_list)
@app.route("/api/1/item/Q<int:item_id>/tags")
def api_get_item_tags(item_id):
t0 = time()
item = model.Item.query.get(item_id)
osm_list = get_item_tags(item)
t1 = time() - t0
return jsonify(success=True, qid=item.qid, tag_or_key_list=osm_list, duration=t1)
@app.route("/api/1/missing")
def api_missing_wikidata_items():
qids_arg = request.args.get("qids")
qids = qids_arg.split(",")
if not qids or not qids[0]:
return jsonify(success=True, items=[], isa_count=[])
db_items = []
for qid in qids:
item = model.Item.query.get(qid[1:])
if not item:
item = get_and_save_item(qid)
db_items.append(item)
items = get_markers(db_items)
counts = get_isa_count(db_items)
isa_ids = [qid[1:] for qid, count in counts]
isa_items = {
isa.qid: isa for isa in model.Item.query.filter(model.Item.item_id.in_(isa_ids))
}
isa_count = []
for qid, count in counts:
item = isa_items.get(qid)
if not item:
item = get_and_save_item(qid)
label = item.label() if item else "[missing]"
isa = {
"qid": qid,
"count": count,
"label": label,
}
isa_count.append(isa)
return jsonify(success=True, items=items, isa_count=isa_count)
@app.route("/api/1/search")
def api_search():
q = request.args["q"]
hits = nominatim.lookup(q)
for hit in hits:
if "geotext" in hit:
del hit["geotext"]
hit["name"] = nominatim.get_hit_name(hit)
return jsonify(hits=hits)
if __name__ == "__main__":
app.run(host="0.0.0.0")