From 2302809364bd1a6b58f88a667d0efa7b4aec7d9c Mon Sep 17 00:00:00 2001
From: Edward Betts <edward@4angle.com>
Date: Sat, 4 May 2024 06:48:22 +0000
Subject: [PATCH] Add types and docstrings

---
 web_view.py | 82 +++++++++++++++++++++++++++++++++++------------------
 1 file changed, 55 insertions(+), 27 deletions(-)

diff --git a/web_view.py b/web_view.py
index 6ac6dac..0b25724 100755
--- a/web_view.py
+++ b/web_view.py
@@ -4,15 +4,17 @@
 
 import json
 import re
+import typing
 from time import sleep, time
 
 import flask
-import flask_login
-import GeoIP
+import flask_login  # type: ignore
+import GeoIP  # type: ignore
 import lxml
 import maxminddb
 import requests
 import sqlalchemy
+import werkzeug
 from requests_oauthlib import OAuth1Session
 from sqlalchemy import func
 from sqlalchemy.sql.expression import update
@@ -34,6 +36,8 @@ from matcher.data import property_map
 
 # from werkzeug.debug.tbtools import get_current_traceback
 
+StrDict = dict[str, typing.Any]
+
 srid = 4326
 re_point = re.compile(r"^POINT\((.+) (.+)\)$")
 
@@ -56,7 +60,8 @@ re_qid = re.compile(r"^Q\d+$")
 
 
 @app.teardown_appcontext
-def shutdown_session(exception=None):
+def shutdown_session(exception=None) -> None:
+    """Shutdown session."""
     database.session.remove()
 
 
@@ -70,14 +75,15 @@ def dict_repr_values(d):
     return {key: repr(value) for key, value in d.items()}
 
 
-def cors_jsonify(*args, **kwargs):
+def cors_jsonify(*args, **kwargs) -> flask.Response:
     """Add CORS header to JSON."""
     response = flask.jsonify(*args, **kwargs)
     response.headers["Access-Control-Allow-Origin"] = "*"
     return response
 
 
-def check_for_tagged_qids(qids):
+def check_for_tagged_qids(qids: list[str]) -> set[str]:
+    """Check OSM for existing wikidata tags for given QIDs."""
     tagged = set()
     for qid in qids:
         for cls in model.Point, model.Polygon, model.Line:
@@ -103,23 +109,27 @@ def check_for_tagged_qid(qid):
 def geoip_user_record():
     gi = GeoIP.open(app.config["GEOIP_DATA"], GeoIP.GEOIP_STANDARD)
 
-    remote_ip = flask.request.get("ip", flask.request.remote_addr)
+    remote_ip = flask.request.args.get("ip", flask.request.remote_addr)
     return gi.record_by_addr(remote_ip)
 
 
-def get_user_location():
+def get_user_location() -> StrDict | None:
+    """Get user location."""
     remote_ip = flask.request.args.get("ip", flask.request.remote_addr)
+    assert remote_ip
     maxmind = maxminddb_reader.get(remote_ip)
-    return maxmind.get("location") if maxmind else None
+    return typing.cast(StrDict, maxmind.get("location")) if maxmind else None
 
 
 @app.route("/")
-def redirect_from_root():
+def redirect_from_root() -> werkzeug.wrappers.Response:
+    """Redirect from root to map start page."""
     return flask.redirect(flask.url_for("map_start_page"))
 
 
 @app.route("/index")
-def index_page():
+def index_page() -> str:
+    """Index page."""
     return flask.render_template("index.html")
 
 
@@ -130,7 +140,8 @@ def get_username() -> str | None:
 
 
 @app.route("/isa/Q<int:item_id>", methods=["GET", "POST"])
-def isa_page(item_id):
+def isa_page(item_id: int) -> werkzeug.wrappers.Response | str:
+    """Return IsA page."""
     item = api.get_item(item_id)
 
     if flask.request.method == "POST":
@@ -140,19 +151,26 @@ def isa_page(item_id):
         database.session.commit()
         flask.flash("extra OSM tag/key added")
 
-        return flask.redirect(flask.url_for(flask.request.endpoint, item_id=item_id))
+        endpoint = flask.request.endpoint
+        assert endpoint
+        return flask.redirect(flask.url_for(endpoint, item_id=item_id))
 
     q = model.ItemExtraKeys.query.filter_by(item=item)
     extra = [e.tag_or_key for e in q]
     subclass_property = "P279"
 
     subclass_list = []
+    assert item
     for s in item.get_claim(subclass_property):
-        subclass = api.get_item(s["numeric-id"])
+        assert isinstance(s, dict)
+        subclass_item_id = s["numeric-id"]
+        assert subclass_item_id and isinstance(subclass_item_id, int)
+        subclass = api.get_item(subclass_item_id)
+        assert subclass
         subclass_list.append(
             {
                 "qid": s["id"],
-                "item_id": s["numeric-id"],
+                "item_id": subclass_item_id,
                 "label": subclass.label(),
                 "description": subclass.description(),
                 "isa_page_url": flask.url_for("isa_page", item_id=s["numeric-id"]),
@@ -241,7 +259,8 @@ def identifier_page(pid):
 
 
 @app.route("/map")
-def map_start_page():
+def map_start_page() -> werkzeug.wrappers.Response:
+    """Map start page."""
     loc = get_user_location()
 
     if loc:
@@ -278,6 +297,7 @@ def documentation_page() -> str:
 def search_page() -> str:
     """Search."""
     loc = get_user_location()
+    assert loc
     q = flask.request.args.get("q")
 
     user = flask_login.current_user
@@ -298,15 +318,16 @@ def search_page() -> str:
 
 @app.route("/map/<int:zoom>/<float(signed=True):lat>/<float(signed=True):lon>")
 def map_location(zoom: int, lat: float, lon: float) -> str:
+    """Map location."""
     qid = flask.request.args.get("item")
     isa_param = flask.request.args.get("isa")
     if qid:
-        api.get_item(qid[1:])
+        api.get_item(int(qid[1:]))
 
     isa_list = []
     if isa_param:
         for isa_qid in isa_param.split(";"):
-            isa = api.get_item(isa_qid[1:])
+            isa = api.get_item(int(isa_qid[1:]))
             if not isa:
                 continue
             cur = {
@@ -330,7 +351,8 @@ def map_location(zoom: int, lat: float, lon: float) -> str:
 
 
 @app.route("/item/Q<int:item_id>")
-def lookup_item(item_id):
+def lookup_item(item_id: int):
+    """Lookup item."""
     item = api.get_item(item_id)
     if not item:
         # TODO: show nicer page for Wikidata item not found
@@ -360,7 +382,8 @@ def lookup_item(item_id):
 
 
 @app.route("/search/map")
-def search_map_page():
+def search_map_page() -> str:
+    """Search map page."""
     user_lat, user_lon = get_user_location() or (None, None)
 
     q = flask.request.args.get("q")
@@ -394,12 +417,13 @@ def read_isa_filter_param():
 
 
 @app.route("/api/1/location")
-def show_user_location():
+def show_user_location() -> werkzeug.wrappers.Response:
+    """User location."""
     return cors_jsonify(get_user_location())
 
 
 @app.route("/api/1/count")
-def api_wikidata_items_count():
+def api_wikidata_items_count() -> werkzeug.wrappers.Response:
     t0 = time()
     isa_filter = read_isa_filter_param()
     count = api.wikidata_items_count(read_bounds_param(), isa_filter=isa_filter)
@@ -610,7 +634,7 @@ def api_polygon(osm_type, osm_id):
 @app.route("/refresh/Q<int:item_id>")
 def refresh_item(item_id: int) -> str:
     """Refresh the local mirror of a Wikidata item."""
-    assert not model.Item.query.get(item_id)
+    existing = model.Item.query.get(item_id)
 
     qid = f"Q{item_id}"
     entity = wikidata_api.get_entity(qid)
@@ -618,13 +642,17 @@ def refresh_item(item_id: int) -> str:
     assert qid == entity_qid
 
     coords = wikidata.get_entity_coords(entity["claims"])
-    assert coords
 
     obj = {k: v for k, v in entity.items() if k in entity_keys}
-    item = model.Item(item_id=item_id, **obj)
-    print(item)
-    item.locations = model.location_objects(coords)
-    database.session.add(item)
+    if existing:
+        for k, v in obj.items():
+            setattr(model, k, v)
+    else:
+        item = model.Item(item_id=item_id, **obj)
+        database.session.add(item)
+
+    if coords:
+        item.locations = model.location_objects(coords)
     database.session.commit()
 
     return "done"