forked from edward/owl-map
Compare commits
14 commits
3869ef32c0
...
acd5f94bd3
Author | SHA1 | Date | |
---|---|---|---|
acd5f94bd3 | |||
78296ce189 | |||
fa02c59ae7 | |||
Edward Betts | 72063f2e2b | ||
Edward Betts | 4d64ac212d | ||
Edward Betts | 3f04c82ba9 | ||
Edward Betts | 468d1e49c7 | ||
Edward Betts | ba22e8e9df | ||
Edward Betts | 938f79ba19 | ||
Edward Betts | 4863eb59d5 | ||
Edward Betts | a04106ce1f | ||
Edward Betts | 13ecf4526d | ||
Edward Betts | 269c6bce54 | ||
Edward Betts | 4d32f62dfb |
|
@ -941,7 +941,11 @@ def get_item(item_id: int) -> model.Item | None:
|
||||||
|
|
||||||
def get_item_street_addresses(item: model.Item) -> list[str]:
|
def get_item_street_addresses(item: model.Item) -> list[str]:
|
||||||
"""Hunt for street addresses for the given item."""
|
"""Hunt for street addresses for the given item."""
|
||||||
street_address = [addr["text"] for addr in item.get_claim("P6375") if addr]
|
p6375 = item.get_claim("P6375")
|
||||||
|
assert isinstance(p6375, list)
|
||||||
|
street_address: list[str] = [
|
||||||
|
typing.cast(str, addr["text"]) for addr in p6375 if addr
|
||||||
|
]
|
||||||
if street_address or "P669" not in item.claims:
|
if street_address or "P669" not in item.claims:
|
||||||
return street_address
|
return street_address
|
||||||
|
|
||||||
|
@ -951,6 +955,9 @@ def get_item_street_addresses(item: model.Item) -> list[str]:
|
||||||
qualifiers = claim.get("qualifiers")
|
qualifiers = claim.get("qualifiers")
|
||||||
if not qualifiers or "P670" not in qualifiers:
|
if not qualifiers or "P670" not in qualifiers:
|
||||||
continue
|
continue
|
||||||
|
if "datavalue" not in qualifiers["P670"][0]: # 'no value' for P670
|
||||||
|
assert qualifiers["P670"][0]["snaktype"] == "novalue"
|
||||||
|
continue
|
||||||
number = qualifiers["P670"][0]["datavalue"]["value"]
|
number = qualifiers["P670"][0]["datavalue"]["value"]
|
||||||
|
|
||||||
street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"])
|
street_item = get_item(claim["mainsnak"]["datavalue"]["value"]["numeric-id"])
|
||||||
|
|
|
@ -107,7 +107,7 @@ class Item(Base):
|
||||||
aliases = Column(postgresql.JSONB)
|
aliases = Column(postgresql.JSONB)
|
||||||
sitelinks = Column(postgresql.JSONB)
|
sitelinks = Column(postgresql.JSONB)
|
||||||
claims = Column(postgresql.JSONB, nullable=False)
|
claims = Column(postgresql.JSONB, nullable=False)
|
||||||
lastrevid = Column(Integer, nullable=False, unique=True)
|
lastrevid = Column(BigInteger, nullable=False, unique=True)
|
||||||
locations: Mapped[list["ItemLocation"]] = relationship(
|
locations: Mapped[list["ItemLocation"]] = relationship(
|
||||||
"ItemLocation", cascade="all, delete-orphan", backref="item"
|
"ItemLocation", cascade="all, delete-orphan", backref="item"
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,67 +1,82 @@
|
||||||
"""OSM Authentication."""
|
"""OSM Authentication."""
|
||||||
|
|
||||||
|
import json
|
||||||
import typing
|
import typing
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
import flask
|
||||||
import lxml.etree
|
import lxml.etree
|
||||||
from flask import current_app, g, session
|
import requests
|
||||||
from requests_oauthlib import OAuth1Session
|
from requests_oauthlib import OAuth2Session
|
||||||
|
|
||||||
from . import user_agent_headers
|
from . import user_agent_headers
|
||||||
from .model import User
|
from .model import User
|
||||||
|
|
||||||
osm_api_base = "https://api.openstreetmap.org/api/0.6"
|
osm_api_base = "https://api.openstreetmap.org/api/0.6"
|
||||||
|
scope = ["read_prefs", "write_api"]
|
||||||
|
|
||||||
|
|
||||||
def api_put_request(path, **kwargs):
|
def get_session() -> OAuth2Session:
|
||||||
user = g.user
|
"""Get session."""
|
||||||
|
token = flask.session.get("oauth_token")
|
||||||
|
if not token:
|
||||||
|
user = flask.g.user
|
||||||
assert user.is_authenticated
|
assert user.is_authenticated
|
||||||
oauth = OAuth1Session(
|
token = json.loads(user.osm_oauth_token)
|
||||||
current_app.config["CLIENT_KEY"],
|
flask.session["oauth_token"] = token
|
||||||
client_secret=current_app.config["CLIENT_SECRET"],
|
|
||||||
resource_owner_key=user.osm_oauth_token,
|
callback = flask.url_for("oauth_callback", _external=True)
|
||||||
resource_owner_secret=user.osm_oauth_token_secret,
|
return OAuth2Session(
|
||||||
|
flask.current_app.config["CLIENT_KEY"],
|
||||||
|
redirect_uri=callback,
|
||||||
|
scope=scope,
|
||||||
|
token=token,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def api_put_request(path: str, **kwargs: typing.Any) -> requests.Response:
|
||||||
|
"""Send OSM API PUT request."""
|
||||||
|
oauth = get_session()
|
||||||
|
|
||||||
return oauth.request(
|
return oauth.request(
|
||||||
"PUT", osm_api_base + path, headers=user_agent_headers(), **kwargs
|
"PUT", osm_api_base + path, headers=user_agent_headers(), **kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def api_request(path, **params):
|
def api_request(path: str, **params: typing.Any) -> requests.Response:
|
||||||
user = g.user
|
"""Send OSM API request."""
|
||||||
assert user.is_authenticated
|
|
||||||
app = current_app
|
|
||||||
url = osm_api_base + path
|
url = osm_api_base + path
|
||||||
if params:
|
if params:
|
||||||
url += "?" + urlencode(params)
|
url += "?" + urlencode(params)
|
||||||
client_key = app.config["CLIENT_KEY"]
|
|
||||||
client_secret = app.config["CLIENT_SECRET"]
|
oauth = get_session()
|
||||||
oauth = OAuth1Session(
|
|
||||||
client_key,
|
|
||||||
client_secret=client_secret,
|
|
||||||
resource_owner_key=user.osm_oauth_token,
|
|
||||||
resource_owner_secret=user.osm_oauth_token_secret,
|
|
||||||
)
|
|
||||||
return oauth.get(url, timeout=4)
|
return oauth.get(url, timeout=4)
|
||||||
|
|
||||||
|
|
||||||
def parse_iso_date(value):
|
def parse_iso_date(value: str) -> datetime:
|
||||||
|
"""Parse ISO date."""
|
||||||
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
|
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
|
||||||
|
|
||||||
def parse_userinfo_call(xml):
|
def parse_userinfo_call(xml: bytes) -> dict[str, typing.Any]:
|
||||||
|
"""Parse userinfo call."""
|
||||||
root = lxml.etree.fromstring(xml)
|
root = lxml.etree.fromstring(xml)
|
||||||
user = root[0]
|
user = root[0]
|
||||||
img = user.find(".//img")
|
img = user.find(".//img")
|
||||||
|
|
||||||
account_created = parse_iso_date(user.get("account_created"))
|
account_created_date = user.get("account_created")
|
||||||
|
assert account_created_date
|
||||||
|
account_created = parse_iso_date(account_created_date)
|
||||||
|
|
||||||
assert user.tag == "user"
|
assert user.tag == "user"
|
||||||
|
|
||||||
|
id_str = user.get("id")
|
||||||
|
assert id_str and isinstance(id_str, str)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"account_created": account_created,
|
"account_created": account_created,
|
||||||
"id": int(user.get("id")),
|
"id": int(id_str),
|
||||||
"username": user.get("display_name"),
|
"username": user.get("display_name"),
|
||||||
"description": user.findtext(".//description"),
|
"description": user.findtext(".//description"),
|
||||||
"img": (img.get("href") if img is not None else None),
|
"img": (img.get("href") if img is not None else None),
|
||||||
|
@ -70,10 +85,10 @@ def parse_userinfo_call(xml):
|
||||||
|
|
||||||
def get_username() -> str | None:
|
def get_username() -> str | None:
|
||||||
"""Get username of current user."""
|
"""Get username of current user."""
|
||||||
if "user_id" not in session:
|
if "user_id" not in flask.session:
|
||||||
return None # not authorized
|
return None # not authorized
|
||||||
|
|
||||||
user_id = session["user_id"]
|
user_id = flask.session["user_id"]
|
||||||
|
|
||||||
user = User.query.get(user_id)
|
user = User.query.get(user_id)
|
||||||
return typing.cast(str, user.username)
|
return typing.cast(str, user.username)
|
||||||
|
|
|
@ -204,7 +204,7 @@ def format_wikibase_time(v: WikibaseTime) -> str | None:
|
||||||
case 7: # century
|
case 7: # century
|
||||||
century = ((int(t[:5]) - 1) // 100) + 1
|
century = ((int(t[:5]) - 1) // 100) + 1
|
||||||
ordinal_num = num2words(abs(century), to="ordinal_num")
|
ordinal_num = num2words(abs(century), to="ordinal_num")
|
||||||
return f"{ordinal_num} {century}{' BC' if century < 0 else ''}"
|
return f"{ordinal_num} century{' BC' if century < 0 else ''}"
|
||||||
case 6: # millennium
|
case 6: # millennium
|
||||||
millennium = ((int(t[:5]) - 1) // 1000) + 1
|
millennium = ((int(t[:5]) - 1) // 1000) + 1
|
||||||
ordinal_num = num2words(abs(millennium), to="ordinal_num")
|
ordinal_num = num2words(abs(millennium), to="ordinal_num")
|
||||||
|
|
|
@ -1,17 +1,30 @@
|
||||||
|
"""Test matcher utils."""
|
||||||
|
|
||||||
from matcher import utils
|
from matcher import utils
|
||||||
|
|
||||||
|
|
||||||
def test_format_wikibase_time_year():
|
def test_format_wikibase_time_year() -> None:
|
||||||
|
"""Test passing a year to format_wikibase_time."""
|
||||||
v = {"time": "+1950-00-00T00:00:00Z", "precision": 9}
|
v = {"time": "+1950-00-00T00:00:00Z", "precision": 9}
|
||||||
assert utils.format_wikibase_time(v) == "1950"
|
assert utils.format_wikibase_time(v) == "1950"
|
||||||
|
|
||||||
|
|
||||||
def test_format_wikibase_time_century():
|
def test_format_wikibase_time_century() -> None:
|
||||||
|
"""Test passing centuries to format_wikibase_time."""
|
||||||
v = {"time": "+0800-00-00T00:00:00Z", "precision": 7}
|
v = {"time": "+0800-00-00T00:00:00Z", "precision": 7}
|
||||||
assert utils.format_wikibase_time(v) == "8th century"
|
assert utils.format_wikibase_time(v) == "8th century"
|
||||||
|
|
||||||
v = {"time": "+1950-00-00T00:00:00Z", "precision": 7}
|
v = {"time": "+1950-00-00T00:00:00Z", "precision": 7}
|
||||||
assert utils.format_wikibase_time(v) == "20th century"
|
assert utils.format_wikibase_time(v) == "20th century"
|
||||||
|
|
||||||
|
|
||||||
|
def test_format_wikibase_time_decade() -> None:
|
||||||
|
"""Test passing a full date to format_wikibase_time."""
|
||||||
|
v = {"time": "+1910-00-00T00:00:00Z", "precision": 8}
|
||||||
|
assert utils.format_wikibase_time(v) == "1910s"
|
||||||
|
|
||||||
|
|
||||||
|
def test_format_wikibase_time_day() -> None:
|
||||||
|
"""Test passing a full date to format_wikibase_time."""
|
||||||
v = {"time": "+1868-01-09T00:00:00Z", "precision": 11}
|
v = {"time": "+1868-01-09T00:00:00Z", "precision": 11}
|
||||||
assert utils.format_wikibase_time(v) == "9 January 1868"
|
assert utils.format_wikibase_time(v) == "9 January 1868"
|
||||||
|
|
18
update.py
18
update.py
|
@ -6,6 +6,8 @@ import json
|
||||||
import typing
|
import typing
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
|
import requests.exceptions
|
||||||
|
|
||||||
from matcher import model, wikidata, wikidata_api
|
from matcher import model, wikidata, wikidata_api
|
||||||
from matcher.database import init_db, session
|
from matcher.database import init_db, session
|
||||||
|
|
||||||
|
@ -81,7 +83,14 @@ def handle_edit(change: Change) -> None:
|
||||||
print(f"{ts}: no need to update {qid}")
|
print(f"{ts}: no need to update {qid}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
for attempt in range(100):
|
||||||
|
try:
|
||||||
entity = wikidata_api.get_entity(qid)
|
entity = wikidata_api.get_entity(qid)
|
||||||
|
except requests.exceptions.ConnectionError:
|
||||||
|
print("connection error, retrying.")
|
||||||
|
sleep(10)
|
||||||
|
else:
|
||||||
|
break
|
||||||
entity_qid = entity.pop("id")
|
entity_qid = entity.pop("id")
|
||||||
if entity_qid != qid:
|
if entity_qid != qid:
|
||||||
print(f"{ts}: item {qid} replaced with redirect")
|
print(f"{ts}: item {qid} replaced with redirect")
|
||||||
|
@ -123,6 +132,15 @@ def update_database() -> None:
|
||||||
r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue)
|
r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue)
|
||||||
|
|
||||||
reply = r.json()
|
reply = r.json()
|
||||||
|
if (
|
||||||
|
"error" in reply
|
||||||
|
and reply["error"]["code"] == "internal_api_error_DBQueryTimeoutError"
|
||||||
|
):
|
||||||
|
print(reply)
|
||||||
|
sleep(10)
|
||||||
|
continue
|
||||||
|
if "query" not in reply:
|
||||||
|
print(reply)
|
||||||
for change in reply["query"]["recentchanges"]:
|
for change in reply["query"]["recentchanges"]:
|
||||||
rctype = change["type"]
|
rctype = change["type"]
|
||||||
timestamp = change["timestamp"]
|
timestamp = change["timestamp"]
|
||||||
|
|
22
web_view.py
22
web_view.py
|
@ -244,9 +244,12 @@ def identifier_index():
|
||||||
|
|
||||||
@app.route("/commons/<filename>")
|
@app.route("/commons/<filename>")
|
||||||
def get_commons_image(filename):
|
def get_commons_image(filename):
|
||||||
|
if filename == "null":
|
||||||
|
flask.abort(404)
|
||||||
detail = commons.image_detail([filename], thumbheight=1200, thumbwidth=1200)
|
detail = commons.image_detail([filename], thumbheight=1200, thumbwidth=1200)
|
||||||
image = detail[filename]
|
if filename not in detail:
|
||||||
return flask.redirect(image["thumburl"])
|
flask.abort(404)
|
||||||
|
return flask.redirect(detail[filename]["thumburl"])
|
||||||
|
|
||||||
|
|
||||||
@app.route("/identifier/<pid>")
|
@app.route("/identifier/<pid>")
|
||||||
|
@ -552,7 +555,8 @@ def api_get_item_tags(item_id):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def expand_street_name(from_names):
|
def expand_street_name(from_names: typing.Collection[str]) -> set[str]:
|
||||||
|
"""Expand street name."""
|
||||||
ret = set(from_names)
|
ret = set(from_names)
|
||||||
for name in from_names:
|
for name in from_names:
|
||||||
if any(name.startswith(st) for st in ("St ", "St. ")):
|
if any(name.startswith(st) for st in ("St ", "St. ")):
|
||||||
|
@ -560,12 +564,10 @@ def expand_street_name(from_names):
|
||||||
ret.add("Saint" + name[first_space:])
|
ret.add("Saint" + name[first_space:])
|
||||||
|
|
||||||
if ", " in name:
|
if ", " in name:
|
||||||
for n in set(ret):
|
comma = name.find(", ")
|
||||||
comma = n.find(", ")
|
|
||||||
ret.add(name[:comma])
|
ret.add(name[:comma])
|
||||||
elif "/" in name:
|
elif "/" in name:
|
||||||
for n in set(ret):
|
ret.update(part.strip() for part in name.split("/"))
|
||||||
ret.extend(part.strip() for part in n.split("/"))
|
|
||||||
|
|
||||||
ret.update({"The " + name for name in ret if not name.startswith("The ")})
|
ret.update({"The " + name for name in ret if not name.startswith("The ")})
|
||||||
return ret
|
return ret
|
||||||
|
@ -676,7 +678,7 @@ def api_polygon(osm_type, osm_id):
|
||||||
@app.route("/refresh/Q<int:item_id>")
|
@app.route("/refresh/Q<int:item_id>")
|
||||||
def refresh_item(item_id: int) -> str:
|
def refresh_item(item_id: int) -> str:
|
||||||
"""Refresh the local mirror of a Wikidata item."""
|
"""Refresh the local mirror of a Wikidata item."""
|
||||||
existing = model.Item.query.get(item_id)
|
item = model.Item.query.get(item_id)
|
||||||
|
|
||||||
qid = f"Q{item_id}"
|
qid = f"Q{item_id}"
|
||||||
entity = wikidata_api.get_entity(qid)
|
entity = wikidata_api.get_entity(qid)
|
||||||
|
@ -686,9 +688,9 @@ def refresh_item(item_id: int) -> str:
|
||||||
coords = wikidata.get_entity_coords(entity["claims"])
|
coords = wikidata.get_entity_coords(entity["claims"])
|
||||||
|
|
||||||
obj = {k: v for k, v in entity.items() if k in entity_keys}
|
obj = {k: v for k, v in entity.items() if k in entity_keys}
|
||||||
if existing:
|
if item:
|
||||||
for k, v in obj.items():
|
for k, v in obj.items():
|
||||||
setattr(model, k, v)
|
setattr(item, k, v)
|
||||||
else:
|
else:
|
||||||
item = model.Item(item_id=item_id, **obj)
|
item = model.Item(item_id=item_id, **obj)
|
||||||
database.session.add(item)
|
database.session.add(item)
|
||||||
|
|
Loading…
Reference in a new issue