diff --git a/matcher/api.py b/matcher/api.py index 97e47f1..9c1d431 100644 --- a/matcher/api.py +++ b/matcher/api.py @@ -941,11 +941,7 @@ def get_item(item_id: int) -> model.Item | None: def get_item_street_addresses(item: model.Item) -> list[str]: """Hunt for street addresses for the given item.""" - p6375 = item.get_claim("P6375") - assert isinstance(p6375, list) - street_address: list[str] = [ - typing.cast(str, addr["text"]) for addr in p6375 if addr - ] + street_address = [addr["text"] for addr in item.get_claim("P6375") if addr] if street_address or "P669" not in item.claims: return street_address @@ -955,9 +951,6 @@ def get_item_street_addresses(item: model.Item) -> list[str]: qualifiers = claim.get("qualifiers") if not qualifiers or "P670" not in qualifiers: continue - if "datavalue" not in qualifiers["P670"][0]: # 'no value' for P670 - assert qualifiers["P670"][0]["snaktype"] == "novalue" - continue number = qualifiers["P670"][0]["datavalue"]["value"] street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"]) diff --git a/matcher/model.py b/matcher/model.py index 4f61118..40d56f5 100644 --- a/matcher/model.py +++ b/matcher/model.py @@ -107,7 +107,7 @@ class Item(Base): aliases = Column(postgresql.JSONB) sitelinks = Column(postgresql.JSONB) claims = Column(postgresql.JSONB, nullable=False) - lastrevid = Column(BigInteger, nullable=False, unique=True) + lastrevid = Column(Integer, nullable=False, unique=True) locations: Mapped[list["ItemLocation"]] = relationship( "ItemLocation", cascade="all, delete-orphan", backref="item" ) diff --git a/matcher/osm_oauth.py b/matcher/osm_oauth.py index 09eccae..98c1f4d 100644 --- a/matcher/osm_oauth.py +++ b/matcher/osm_oauth.py @@ -1,82 +1,67 @@ """OSM Authentication.""" -import json import typing from datetime import datetime from urllib.parse import urlencode -import flask import lxml.etree -import requests -from requests_oauthlib import OAuth2Session +from flask import current_app, g, session +from requests_oauthlib import OAuth1Session from . import user_agent_headers from .model import User osm_api_base = "https://api.openstreetmap.org/api/0.6" -scope = ["read_prefs", "write_api"] -def get_session() -> OAuth2Session: - """Get session.""" - token = flask.session.get("oauth_token") - if not token: - user = flask.g.user - assert user.is_authenticated - token = json.loads(user.osm_oauth_token) - flask.session["oauth_token"] = token - - callback = flask.url_for("oauth_callback", _external=True) - return OAuth2Session( - flask.current_app.config["CLIENT_KEY"], - redirect_uri=callback, - scope=scope, - token=token, +def api_put_request(path, **kwargs): + user = g.user + assert user.is_authenticated + oauth = OAuth1Session( + current_app.config["CLIENT_KEY"], + client_secret=current_app.config["CLIENT_SECRET"], + resource_owner_key=user.osm_oauth_token, + resource_owner_secret=user.osm_oauth_token_secret, ) - - -def api_put_request(path: str, **kwargs: typing.Any) -> requests.Response: - """Send OSM API PUT request.""" - oauth = get_session() - return oauth.request( "PUT", osm_api_base + path, headers=user_agent_headers(), **kwargs ) -def api_request(path: str, **params: typing.Any) -> requests.Response: - """Send OSM API request.""" +def api_request(path, **params): + user = g.user + assert user.is_authenticated + app = current_app url = osm_api_base + path if params: url += "?" + urlencode(params) - - oauth = get_session() + client_key = app.config["CLIENT_KEY"] + client_secret = app.config["CLIENT_SECRET"] + oauth = OAuth1Session( + client_key, + client_secret=client_secret, + resource_owner_key=user.osm_oauth_token, + resource_owner_secret=user.osm_oauth_token_secret, + ) return oauth.get(url, timeout=4) -def parse_iso_date(value: str) -> datetime: - """Parse ISO date.""" +def parse_iso_date(value): return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ") -def parse_userinfo_call(xml: bytes) -> dict[str, typing.Any]: - """Parse userinfo call.""" +def parse_userinfo_call(xml): root = lxml.etree.fromstring(xml) user = root[0] img = user.find(".//img") - account_created_date = user.get("account_created") - assert account_created_date - account_created = parse_iso_date(account_created_date) + account_created = parse_iso_date(user.get("account_created")) assert user.tag == "user" - id_str = user.get("id") - assert id_str and isinstance(id_str, str) - return { "account_created": account_created, - "id": int(id_str), + "id": int(user.get("id")), "username": user.get("display_name"), "description": user.findtext(".//description"), "img": (img.get("href") if img is not None else None), @@ -85,10 +70,10 @@ def parse_userinfo_call(xml: bytes) -> dict[str, typing.Any]: def get_username() -> str | None: """Get username of current user.""" - if "user_id" not in flask.session: + if "user_id" not in session: return None # not authorized - user_id = flask.session["user_id"] + user_id = session["user_id"] user = User.query.get(user_id) return typing.cast(str, user.username) diff --git a/matcher/utils.py b/matcher/utils.py index 55f3bd2..dbe850f 100644 --- a/matcher/utils.py +++ b/matcher/utils.py @@ -204,7 +204,7 @@ def format_wikibase_time(v: WikibaseTime) -> str | None: case 7: # century century = ((int(t[:5]) - 1) // 100) + 1 ordinal_num = num2words(abs(century), to="ordinal_num") - return f"{ordinal_num} century{' BC' if century < 0 else ''}" + return f"{ordinal_num} {century}{' BC' if century < 0 else ''}" case 6: # millennium millennium = ((int(t[:5]) - 1) // 1000) + 1 ordinal_num = num2words(abs(millennium), to="ordinal_num") diff --git a/tests/test_utils.py b/tests/test_utils.py index c9f6c7f..2064264 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,30 +1,17 @@ -"""Test matcher utils.""" - from matcher import utils -def test_format_wikibase_time_year() -> None: - """Test passing a year to format_wikibase_time.""" +def test_format_wikibase_time_year(): v = {"time": "+1950-00-00T00:00:00Z", "precision": 9} assert utils.format_wikibase_time(v) == "1950" -def test_format_wikibase_time_century() -> None: - """Test passing centuries to format_wikibase_time.""" +def test_format_wikibase_time_century(): v = {"time": "+0800-00-00T00:00:00Z", "precision": 7} assert utils.format_wikibase_time(v) == "8th century" v = {"time": "+1950-00-00T00:00:00Z", "precision": 7} assert utils.format_wikibase_time(v) == "20th century" - -def test_format_wikibase_time_decade() -> None: - """Test passing a full date to format_wikibase_time.""" - v = {"time": "+1910-00-00T00:00:00Z", "precision": 8} - assert utils.format_wikibase_time(v) == "1910s" - - -def test_format_wikibase_time_day() -> None: - """Test passing a full date to format_wikibase_time.""" v = {"time": "+1868-01-09T00:00:00Z", "precision": 11} assert utils.format_wikibase_time(v) == "9 January 1868" diff --git a/update.py b/update.py index ee26a3b..f3626a7 100755 --- a/update.py +++ b/update.py @@ -6,8 +6,6 @@ import json import typing from time import sleep -import requests.exceptions - from matcher import model, wikidata, wikidata_api from matcher.database import init_db, session @@ -83,14 +81,7 @@ def handle_edit(change: Change) -> None: print(f"{ts}: no need to update {qid}") return - for attempt in range(100): - try: - entity = wikidata_api.get_entity(qid) - except requests.exceptions.ConnectionError: - print("connection error, retrying.") - sleep(10) - else: - break + entity = wikidata_api.get_entity(qid) entity_qid = entity.pop("id") if entity_qid != qid: print(f"{ts}: item {qid} replaced with redirect") @@ -132,15 +123,6 @@ def update_database() -> None: r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue) reply = r.json() - if ( - "error" in reply - and reply["error"]["code"] == "internal_api_error_DBQueryTimeoutError" - ): - print(reply) - sleep(10) - continue - if "query" not in reply: - print(reply) for change in reply["query"]["recentchanges"]: rctype = change["type"] timestamp = change["timestamp"] diff --git a/web_view.py b/web_view.py index 16049b3..03ab0f3 100755 --- a/web_view.py +++ b/web_view.py @@ -244,12 +244,9 @@ def identifier_index(): @app.route("/commons/") def get_commons_image(filename): - if filename == "null": - flask.abort(404) detail = commons.image_detail([filename], thumbheight=1200, thumbwidth=1200) - if filename not in detail: - flask.abort(404) - return flask.redirect(detail[filename]["thumburl"]) + image = detail[filename] + return flask.redirect(image["thumburl"]) @app.route("/identifier/") @@ -555,8 +552,7 @@ def api_get_item_tags(item_id): ) -def expand_street_name(from_names: typing.Collection[str]) -> set[str]: - """Expand street name.""" +def expand_street_name(from_names): ret = set(from_names) for name in from_names: if any(name.startswith(st) for st in ("St ", "St. ")): @@ -564,10 +560,12 @@ def expand_street_name(from_names: typing.Collection[str]) -> set[str]: ret.add("Saint" + name[first_space:]) if ", " in name: - comma = name.find(", ") - ret.add(name[:comma]) + for n in set(ret): + comma = n.find(", ") + ret.add(name[:comma]) elif "/" in name: - ret.update(part.strip() for part in name.split("/")) + for n in set(ret): + ret.extend(part.strip() for part in n.split("/")) ret.update({"The " + name for name in ret if not name.startswith("The ")}) return ret @@ -678,7 +676,7 @@ def api_polygon(osm_type, osm_id): @app.route("/refresh/Q") def refresh_item(item_id: int) -> str: """Refresh the local mirror of a Wikidata item.""" - item = model.Item.query.get(item_id) + existing = model.Item.query.get(item_id) qid = f"Q{item_id}" entity = wikidata_api.get_entity(qid) @@ -688,9 +686,9 @@ def refresh_item(item_id: int) -> str: coords = wikidata.get_entity_coords(entity["claims"]) obj = {k: v for k, v in entity.items() if k in entity_keys} - if item: + if existing: for k, v in obj.items(): - setattr(item, k, v) + setattr(model, k, v) else: item = model.Item(item_id=item_id, **obj) database.session.add(item)