Compare commits

..

No commits in common. "acd5f94bd3163d61c2f1a171e1b1278884d21ce0" and "3869ef32c0174c819330889dc80fcba23e8c50a8" have entirely different histories.

7 changed files with 45 additions and 100 deletions

View file

@ -941,11 +941,7 @@ def get_item(item_id: int) -> model.Item | None:
def get_item_street_addresses(item: model.Item) -> list[str]: def get_item_street_addresses(item: model.Item) -> list[str]:
"""Hunt for street addresses for the given item.""" """Hunt for street addresses for the given item."""
p6375 = item.get_claim("P6375") street_address = [addr["text"] for addr in item.get_claim("P6375") if addr]
assert isinstance(p6375, list)
street_address: list[str] = [
typing.cast(str, addr["text"]) for addr in p6375 if addr
]
if street_address or "P669" not in item.claims: if street_address or "P669" not in item.claims:
return street_address return street_address
@ -955,9 +951,6 @@ def get_item_street_addresses(item: model.Item) -> list[str]:
qualifiers = claim.get("qualifiers") qualifiers = claim.get("qualifiers")
if not qualifiers or "P670" not in qualifiers: if not qualifiers or "P670" not in qualifiers:
continue continue
if "datavalue" not in qualifiers["P670"][0]: # 'no value' for P670
assert qualifiers["P670"][0]["snaktype"] == "novalue"
continue
number = qualifiers["P670"][0]["datavalue"]["value"] number = qualifiers["P670"][0]["datavalue"]["value"]
street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"]) street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"])

View file

@ -107,7 +107,7 @@ class Item(Base):
aliases = Column(postgresql.JSONB) aliases = Column(postgresql.JSONB)
sitelinks = Column(postgresql.JSONB) sitelinks = Column(postgresql.JSONB)
claims = Column(postgresql.JSONB, nullable=False) claims = Column(postgresql.JSONB, nullable=False)
lastrevid = Column(BigInteger, nullable=False, unique=True) lastrevid = Column(Integer, nullable=False, unique=True)
locations: Mapped[list["ItemLocation"]] = relationship( locations: Mapped[list["ItemLocation"]] = relationship(
"ItemLocation", cascade="all, delete-orphan", backref="item" "ItemLocation", cascade="all, delete-orphan", backref="item"
) )

View file

@ -1,82 +1,67 @@
"""OSM Authentication.""" """OSM Authentication."""
import json
import typing import typing
from datetime import datetime from datetime import datetime
from urllib.parse import urlencode from urllib.parse import urlencode
import flask
import lxml.etree import lxml.etree
import requests from flask import current_app, g, session
from requests_oauthlib import OAuth2Session from requests_oauthlib import OAuth1Session
from . import user_agent_headers from . import user_agent_headers
from .model import User from .model import User
osm_api_base = "https://api.openstreetmap.org/api/0.6" osm_api_base = "https://api.openstreetmap.org/api/0.6"
scope = ["read_prefs", "write_api"]
def get_session() -> OAuth2Session: def api_put_request(path, **kwargs):
"""Get session.""" user = g.user
token = flask.session.get("oauth_token")
if not token:
user = flask.g.user
assert user.is_authenticated assert user.is_authenticated
token = json.loads(user.osm_oauth_token) oauth = OAuth1Session(
flask.session["oauth_token"] = token current_app.config["CLIENT_KEY"],
client_secret=current_app.config["CLIENT_SECRET"],
callback = flask.url_for("oauth_callback", _external=True) resource_owner_key=user.osm_oauth_token,
return OAuth2Session( resource_owner_secret=user.osm_oauth_token_secret,
flask.current_app.config["CLIENT_KEY"],
redirect_uri=callback,
scope=scope,
token=token,
) )
def api_put_request(path: str, **kwargs: typing.Any) -> requests.Response:
"""Send OSM API PUT request."""
oauth = get_session()
return oauth.request( return oauth.request(
"PUT", osm_api_base + path, headers=user_agent_headers(), **kwargs "PUT", osm_api_base + path, headers=user_agent_headers(), **kwargs
) )
def api_request(path: str, **params: typing.Any) -> requests.Response: def api_request(path, **params):
"""Send OSM API request.""" user = g.user
assert user.is_authenticated
app = current_app
url = osm_api_base + path url = osm_api_base + path
if params: if params:
url += "?" + urlencode(params) url += "?" + urlencode(params)
client_key = app.config["CLIENT_KEY"]
oauth = get_session() client_secret = app.config["CLIENT_SECRET"]
oauth = OAuth1Session(
client_key,
client_secret=client_secret,
resource_owner_key=user.osm_oauth_token,
resource_owner_secret=user.osm_oauth_token_secret,
)
return oauth.get(url, timeout=4) return oauth.get(url, timeout=4)
def parse_iso_date(value: str) -> datetime: def parse_iso_date(value):
"""Parse ISO date."""
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ") return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
def parse_userinfo_call(xml: bytes) -> dict[str, typing.Any]: def parse_userinfo_call(xml):
"""Parse userinfo call."""
root = lxml.etree.fromstring(xml) root = lxml.etree.fromstring(xml)
user = root[0] user = root[0]
img = user.find(".//img") img = user.find(".//img")
account_created_date = user.get("account_created") account_created = parse_iso_date(user.get("account_created"))
assert account_created_date
account_created = parse_iso_date(account_created_date)
assert user.tag == "user" assert user.tag == "user"
id_str = user.get("id")
assert id_str and isinstance(id_str, str)
return { return {
"account_created": account_created, "account_created": account_created,
"id": int(id_str), "id": int(user.get("id")),
"username": user.get("display_name"), "username": user.get("display_name"),
"description": user.findtext(".//description"), "description": user.findtext(".//description"),
"img": (img.get("href") if img is not None else None), "img": (img.get("href") if img is not None else None),
@ -85,10 +70,10 @@ def parse_userinfo_call(xml: bytes) -> dict[str, typing.Any]:
def get_username() -> str | None: def get_username() -> str | None:
"""Get username of current user.""" """Get username of current user."""
if "user_id" not in flask.session: if "user_id" not in session:
return None # not authorized return None # not authorized
user_id = flask.session["user_id"] user_id = session["user_id"]
user = User.query.get(user_id) user = User.query.get(user_id)
return typing.cast(str, user.username) return typing.cast(str, user.username)

View file

@ -204,7 +204,7 @@ def format_wikibase_time(v: WikibaseTime) -> str | None:
case 7: # century case 7: # century
century = ((int(t[:5]) - 1) // 100) + 1 century = ((int(t[:5]) - 1) // 100) + 1
ordinal_num = num2words(abs(century), to="ordinal_num") ordinal_num = num2words(abs(century), to="ordinal_num")
return f"{ordinal_num} century{' BC' if century < 0 else ''}" return f"{ordinal_num} {century}{' BC' if century < 0 else ''}"
case 6: # millennium case 6: # millennium
millennium = ((int(t[:5]) - 1) // 1000) + 1 millennium = ((int(t[:5]) - 1) // 1000) + 1
ordinal_num = num2words(abs(millennium), to="ordinal_num") ordinal_num = num2words(abs(millennium), to="ordinal_num")

View file

@ -1,30 +1,17 @@
"""Test matcher utils."""
from matcher import utils from matcher import utils
def test_format_wikibase_time_year() -> None: def test_format_wikibase_time_year():
"""Test passing a year to format_wikibase_time."""
v = {"time": "+1950-00-00T00:00:00Z", "precision": 9} v = {"time": "+1950-00-00T00:00:00Z", "precision": 9}
assert utils.format_wikibase_time(v) == "1950" assert utils.format_wikibase_time(v) == "1950"
def test_format_wikibase_time_century() -> None: def test_format_wikibase_time_century():
"""Test passing centuries to format_wikibase_time."""
v = {"time": "+0800-00-00T00:00:00Z", "precision": 7} v = {"time": "+0800-00-00T00:00:00Z", "precision": 7}
assert utils.format_wikibase_time(v) == "8th century" assert utils.format_wikibase_time(v) == "8th century"
v = {"time": "+1950-00-00T00:00:00Z", "precision": 7} v = {"time": "+1950-00-00T00:00:00Z", "precision": 7}
assert utils.format_wikibase_time(v) == "20th century" assert utils.format_wikibase_time(v) == "20th century"
def test_format_wikibase_time_decade() -> None:
"""Test passing a full date to format_wikibase_time."""
v = {"time": "+1910-00-00T00:00:00Z", "precision": 8}
assert utils.format_wikibase_time(v) == "1910s"
def test_format_wikibase_time_day() -> None:
"""Test passing a full date to format_wikibase_time."""
v = {"time": "+1868-01-09T00:00:00Z", "precision": 11} v = {"time": "+1868-01-09T00:00:00Z", "precision": 11}
assert utils.format_wikibase_time(v) == "9 January 1868" assert utils.format_wikibase_time(v) == "9 January 1868"

View file

@ -6,8 +6,6 @@ import json
import typing import typing
from time import sleep from time import sleep
import requests.exceptions
from matcher import model, wikidata, wikidata_api from matcher import model, wikidata, wikidata_api
from matcher.database import init_db, session from matcher.database import init_db, session
@ -83,14 +81,7 @@ def handle_edit(change: Change) -> None:
print(f"{ts}: no need to update {qid}") print(f"{ts}: no need to update {qid}")
return return
for attempt in range(100):
try:
entity = wikidata_api.get_entity(qid) entity = wikidata_api.get_entity(qid)
except requests.exceptions.ConnectionError:
print("connection error, retrying.")
sleep(10)
else:
break
entity_qid = entity.pop("id") entity_qid = entity.pop("id")
if entity_qid != qid: if entity_qid != qid:
print(f"{ts}: item {qid} replaced with redirect") print(f"{ts}: item {qid} replaced with redirect")
@ -132,15 +123,6 @@ def update_database() -> None:
r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue) r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue)
reply = r.json() reply = r.json()
if (
"error" in reply
and reply["error"]["code"] == "internal_api_error_DBQueryTimeoutError"
):
print(reply)
sleep(10)
continue
if "query" not in reply:
print(reply)
for change in reply["query"]["recentchanges"]: for change in reply["query"]["recentchanges"]:
rctype = change["type"] rctype = change["type"]
timestamp = change["timestamp"] timestamp = change["timestamp"]

View file

@ -244,12 +244,9 @@ def identifier_index():
@app.route("/commons/<filename>") @app.route("/commons/<filename>")
def get_commons_image(filename): def get_commons_image(filename):
if filename == "null":
flask.abort(404)
detail = commons.image_detail([filename], thumbheight=1200, thumbwidth=1200) detail = commons.image_detail([filename], thumbheight=1200, thumbwidth=1200)
if filename not in detail: image = detail[filename]
flask.abort(404) return flask.redirect(image["thumburl"])
return flask.redirect(detail[filename]["thumburl"])
@app.route("/identifier/<pid>") @app.route("/identifier/<pid>")
@ -555,8 +552,7 @@ def api_get_item_tags(item_id):
) )
def expand_street_name(from_names: typing.Collection[str]) -> set[str]: def expand_street_name(from_names):
"""Expand street name."""
ret = set(from_names) ret = set(from_names)
for name in from_names: for name in from_names:
if any(name.startswith(st) for st in ("St ", "St. ")): if any(name.startswith(st) for st in ("St ", "St. ")):
@ -564,10 +560,12 @@ def expand_street_name(from_names: typing.Collection[str]) -> set[str]:
ret.add("Saint" + name[first_space:]) ret.add("Saint" + name[first_space:])
if ", " in name: if ", " in name:
comma = name.find(", ") for n in set(ret):
comma = n.find(", ")
ret.add(name[:comma]) ret.add(name[:comma])
elif "/" in name: elif "/" in name:
ret.update(part.strip() for part in name.split("/")) for n in set(ret):
ret.extend(part.strip() for part in n.split("/"))
ret.update({"The " + name for name in ret if not name.startswith("The ")}) ret.update({"The " + name for name in ret if not name.startswith("The ")})
return ret return ret
@ -678,7 +676,7 @@ def api_polygon(osm_type, osm_id):
@app.route("/refresh/Q<int:item_id>") @app.route("/refresh/Q<int:item_id>")
def refresh_item(item_id: int) -> str: def refresh_item(item_id: int) -> str:
"""Refresh the local mirror of a Wikidata item.""" """Refresh the local mirror of a Wikidata item."""
item = model.Item.query.get(item_id) existing = model.Item.query.get(item_id)
qid = f"Q{item_id}" qid = f"Q{item_id}"
entity = wikidata_api.get_entity(qid) entity = wikidata_api.get_entity(qid)
@ -688,9 +686,9 @@ def refresh_item(item_id: int) -> str:
coords = wikidata.get_entity_coords(entity["claims"]) coords = wikidata.get_entity_coords(entity["claims"])
obj = {k: v for k, v in entity.items() if k in entity_keys} obj = {k: v for k, v in entity.items() if k in entity_keys}
if item: if existing:
for k, v in obj.items(): for k, v in obj.items():
setattr(item, k, v) setattr(model, k, v)
else: else:
item = model.Item(item_id=item_id, **obj) item = model.Item(item_id=item_id, **obj)
database.session.add(item) database.session.add(item)