diff --git a/web_view.py b/web_view.py index 6ac6dac..0b25724 100755 --- a/web_view.py +++ b/web_view.py @@ -4,15 +4,17 @@ import json import re +import typing from time import sleep, time import flask -import flask_login -import GeoIP +import flask_login # type: ignore +import GeoIP # type: ignore import lxml import maxminddb import requests import sqlalchemy +import werkzeug from requests_oauthlib import OAuth1Session from sqlalchemy import func from sqlalchemy.sql.expression import update @@ -34,6 +36,8 @@ from matcher.data import property_map # from werkzeug.debug.tbtools import get_current_traceback +StrDict = dict[str, typing.Any] + srid = 4326 re_point = re.compile(r"^POINT\((.+) (.+)\)$") @@ -56,7 +60,8 @@ re_qid = re.compile(r"^Q\d+$") @app.teardown_appcontext -def shutdown_session(exception=None): +def shutdown_session(exception=None) -> None: + """Shutdown session.""" database.session.remove() @@ -70,14 +75,15 @@ def dict_repr_values(d): return {key: repr(value) for key, value in d.items()} -def cors_jsonify(*args, **kwargs): +def cors_jsonify(*args, **kwargs) -> flask.Response: """Add CORS header to JSON.""" response = flask.jsonify(*args, **kwargs) response.headers["Access-Control-Allow-Origin"] = "*" return response -def check_for_tagged_qids(qids): +def check_for_tagged_qids(qids: list[str]) -> set[str]: + """Check OSM for existing wikidata tags for given QIDs.""" tagged = set() for qid in qids: for cls in model.Point, model.Polygon, model.Line: @@ -103,23 +109,27 @@ def check_for_tagged_qid(qid): def geoip_user_record(): gi = GeoIP.open(app.config["GEOIP_DATA"], GeoIP.GEOIP_STANDARD) - remote_ip = flask.request.get("ip", flask.request.remote_addr) + remote_ip = flask.request.args.get("ip", flask.request.remote_addr) return gi.record_by_addr(remote_ip) -def get_user_location(): +def get_user_location() -> StrDict | None: + """Get user location.""" remote_ip = flask.request.args.get("ip", flask.request.remote_addr) + assert remote_ip maxmind = maxminddb_reader.get(remote_ip) - return maxmind.get("location") if maxmind else None + return typing.cast(StrDict, maxmind.get("location")) if maxmind else None @app.route("/") -def redirect_from_root(): +def redirect_from_root() -> werkzeug.wrappers.Response: + """Redirect from root to map start page.""" return flask.redirect(flask.url_for("map_start_page")) @app.route("/index") -def index_page(): +def index_page() -> str: + """Index page.""" return flask.render_template("index.html") @@ -130,7 +140,8 @@ def get_username() -> str | None: @app.route("/isa/Q", methods=["GET", "POST"]) -def isa_page(item_id): +def isa_page(item_id: int) -> werkzeug.wrappers.Response | str: + """Return IsA page.""" item = api.get_item(item_id) if flask.request.method == "POST": @@ -140,19 +151,26 @@ def isa_page(item_id): database.session.commit() flask.flash("extra OSM tag/key added") - return flask.redirect(flask.url_for(flask.request.endpoint, item_id=item_id)) + endpoint = flask.request.endpoint + assert endpoint + return flask.redirect(flask.url_for(endpoint, item_id=item_id)) q = model.ItemExtraKeys.query.filter_by(item=item) extra = [e.tag_or_key for e in q] subclass_property = "P279" subclass_list = [] + assert item for s in item.get_claim(subclass_property): - subclass = api.get_item(s["numeric-id"]) + assert isinstance(s, dict) + subclass_item_id = s["numeric-id"] + assert subclass_item_id and isinstance(subclass_item_id, int) + subclass = api.get_item(subclass_item_id) + assert subclass subclass_list.append( { "qid": s["id"], - "item_id": s["numeric-id"], + "item_id": subclass_item_id, "label": subclass.label(), "description": subclass.description(), "isa_page_url": flask.url_for("isa_page", item_id=s["numeric-id"]), @@ -241,7 +259,8 @@ def identifier_page(pid): @app.route("/map") -def map_start_page(): +def map_start_page() -> werkzeug.wrappers.Response: + """Map start page.""" loc = get_user_location() if loc: @@ -278,6 +297,7 @@ def documentation_page() -> str: def search_page() -> str: """Search.""" loc = get_user_location() + assert loc q = flask.request.args.get("q") user = flask_login.current_user @@ -298,15 +318,16 @@ def search_page() -> str: @app.route("/map///") def map_location(zoom: int, lat: float, lon: float) -> str: + """Map location.""" qid = flask.request.args.get("item") isa_param = flask.request.args.get("isa") if qid: - api.get_item(qid[1:]) + api.get_item(int(qid[1:])) isa_list = [] if isa_param: for isa_qid in isa_param.split(";"): - isa = api.get_item(isa_qid[1:]) + isa = api.get_item(int(isa_qid[1:])) if not isa: continue cur = { @@ -330,7 +351,8 @@ def map_location(zoom: int, lat: float, lon: float) -> str: @app.route("/item/Q") -def lookup_item(item_id): +def lookup_item(item_id: int): + """Lookup item.""" item = api.get_item(item_id) if not item: # TODO: show nicer page for Wikidata item not found @@ -360,7 +382,8 @@ def lookup_item(item_id): @app.route("/search/map") -def search_map_page(): +def search_map_page() -> str: + """Search map page.""" user_lat, user_lon = get_user_location() or (None, None) q = flask.request.args.get("q") @@ -394,12 +417,13 @@ def read_isa_filter_param(): @app.route("/api/1/location") -def show_user_location(): +def show_user_location() -> werkzeug.wrappers.Response: + """User location.""" return cors_jsonify(get_user_location()) @app.route("/api/1/count") -def api_wikidata_items_count(): +def api_wikidata_items_count() -> werkzeug.wrappers.Response: t0 = time() isa_filter = read_isa_filter_param() count = api.wikidata_items_count(read_bounds_param(), isa_filter=isa_filter) @@ -610,7 +634,7 @@ def api_polygon(osm_type, osm_id): @app.route("/refresh/Q") def refresh_item(item_id: int) -> str: """Refresh the local mirror of a Wikidata item.""" - assert not model.Item.query.get(item_id) + existing = model.Item.query.get(item_id) qid = f"Q{item_id}" entity = wikidata_api.get_entity(qid) @@ -618,13 +642,17 @@ def refresh_item(item_id: int) -> str: assert qid == entity_qid coords = wikidata.get_entity_coords(entity["claims"]) - assert coords obj = {k: v for k, v in entity.items() if k in entity_keys} - item = model.Item(item_id=item_id, **obj) - print(item) - item.locations = model.location_objects(coords) - database.session.add(item) + if existing: + for k, v in obj.items(): + setattr(model, k, v) + else: + item = model.Item(item_id=item_id, **obj) + database.session.add(item) + + if coords: + item.locations = model.location_objects(coords) database.session.commit() return "done"