owl-map/update.py

179 lines
4.8 KiB
Python
Raw Normal View History

#!/usr/bin/python3
"""Download Wikidata recent changes and update items in local database."""
import json
2023-05-14 14:48:26 +01:00
import typing
from time import sleep
import requests.exceptions
from matcher import model, wikidata, wikidata_api
from matcher.database import init_db, session
DB_URL = "postgresql:///matcher"
init_db(DB_URL)
entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
2023-05-14 15:00:45 +01:00
class Change(typing.TypedDict):
"""Dict representing an edit in recent changes."""
title: str
timestamp: str
redirect: dict[str, typing.Any] | None
revid: int
def handle_new(change: Change) -> None:
"""Handle a new Wikidata item from the recent changes feed."""
qid = change["title"]
ts = change["timestamp"]
if change["redirect"]:
print(f"{ts}: new item {qid}, since replaced with redirect")
return
item = model.Item.query.get(qid[1:]) # check if item is already loaded
if item:
return handle_edit(change)
entity = wikidata_api.get_entity(qid)
if entity["id"] != qid:
print(f'redirect {qid} -> {entity["id"]}')
return
if "claims" not in entity:
print(qid)
print(entity)
coords = wikidata.get_entity_coords(entity["claims"])
if not coords:
print(f"{ts}: new item {qid} without coordinates")
return
print(f"{ts}: new item {qid} with coordinates")
item_id = int(qid[1:])
obj = {k: v for k, v in entity.items() if k in entity_keys}
try:
item = model.Item(item_id=item_id, **obj)
except TypeError:
print(qid)
print(f'{entity["pageid"]=} {entity["ns"]=} {entity["type"]=}')
print(entity.keys())
raise
item.locations = model.location_objects(coords)
session.add(item)
2023-05-14 14:48:26 +01:00
def coords_equal(a: dict[str, typing.Any], b: dict[str, typing.Any]) -> bool:
"""Deep equality comparison of nested dicts."""
return json.dumps(a, sort_keys=True) == json.dumps(b, sort_keys=True)
2023-05-14 15:00:45 +01:00
def handle_edit(change: Change) -> None:
"""Process an edit from recent changes."""
qid = change["title"]
item = model.Item.query.get(qid[1:])
if not item:
return # item isn't in our database so it probably has no coordinates
ts = change["timestamp"]
if item.lastrevid >= change["revid"]:
print(f"{ts}: no need to update {qid}")
return
for attempt in range(100):
try:
entity = wikidata_api.get_entity(qid)
except requests.exceptions.ConnectionError:
print("connection error, retrying.")
sleep(10)
else:
break
entity_qid = entity.pop("id")
if entity_qid != qid:
print(f"{ts}: item {qid} replaced with redirect")
session.delete(item)
session.commit()
return
assert entity_qid == qid
existing_coords = wikidata.get_entity_coords(item.claims)
if "claims" not in entity:
return
coords = wikidata.get_entity_coords(entity["claims"])
if not coords_equal(existing_coords, coords):
print(f"{ts}: update item {qid}, including coordinates")
item.locations = model.location_objects(coords)
else:
print(f"{ts}: update item {qid}, no change to coordinates")
for key in entity_keys:
setattr(item, key, entity[key]) # type: ignore
2023-05-14 14:48:26 +01:00
def update_timestamp(timestamp: str) -> None:
"""Save timestamp to rc_timestamp."""
out = open("rc_timestamp", "w")
print(timestamp, file=out)
out.close()
2023-05-14 14:48:26 +01:00
def update_database() -> None:
2023-05-14 15:00:45 +01:00
"""Check recent changes and apply updates to local mirror of Wikidata."""
with open("rc_timestamp") as f:
start = f.read().strip()
rccontinue = None
seen = set()
while True:
r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue)
reply = r.json()
if (
"error" in reply
and reply["error"]["code"] == "internal_api_error_DBQueryTimeoutError"
):
print(reply)
sleep(10)
continue
if "query" not in reply:
print(reply)
for change in reply["query"]["recentchanges"]:
rctype = change["type"]
timestamp = change["timestamp"]
qid = change["title"]
if qid in seen:
continue
if rctype == "new":
handle_new(change)
seen.add(qid)
if rctype == "edit":
handle_edit(change)
seen.add(qid)
update_timestamp(timestamp)
print("commit")
session.commit()
if "continue" not in reply:
break
rccontinue = reply["continue"]["rccontinue"]
session.commit()
print("finished")
2023-05-14 14:50:11 +01:00
def main() -> None:
"""Infinite loop."""
while True:
update_database()
sleep(60)
2023-05-14 14:50:11 +01:00
if __name__ == "__main__":
main()