179 lines
4.8 KiB
Python
Executable file
179 lines
4.8 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
"""Download Wikidata recent changes and update items in local database."""
|
|
|
|
import json
|
|
import typing
|
|
from time import sleep
|
|
|
|
import requests.exceptions
|
|
|
|
from matcher import model, wikidata, wikidata_api
|
|
from matcher.database import init_db, session
|
|
|
|
DB_URL = "postgresql:///matcher"
init_db(DB_URL)

# Top-level keys of a Wikidata entity that are copied onto model.Item rows
# (passed as constructor kwargs in handle_new, set as attributes in handle_edit).
entity_keys = {
    "aliases",
    "claims",
    "descriptions",
    "labels",
    "lastrevid",
    "sitelinks",
}
|
|
|
|
|
|
class Change(typing.TypedDict):
|
|
"""Dict representing an edit in recent changes."""
|
|
|
|
title: str
|
|
timestamp: str
|
|
redirect: dict[str, typing.Any] | None
|
|
revid: int
|
|
|
|
|
|
def handle_new(change: Change) -> None:
    """Handle a new Wikidata item from the recent changes feed.

    The change is skipped (with a log line) when the page has since become a
    redirect, when the API resolves the QID to a different entity, when the
    entity has no claims, or when its claims yield no coordinates. If the item
    is already present locally, the change is delegated to handle_edit.
    Otherwise a new model.Item with its locations is added to the session;
    committing is left to the caller.
    """
    qid = change["title"]
    ts = change["timestamp"]
    if change["redirect"]:
        print(f"{ts}: new item {qid}, since replaced with redirect")
        return

    # Strip the leading letter of the QID to get the integer primary key, and
    # use the same value for the lookup and for the new row.
    item_id = int(qid[1:])
    if model.Item.query.get(item_id):  # item already loaded: treat as an edit
        handle_edit(change)
        return

    entity = wikidata_api.get_entity(qid)
    if entity["id"] != qid:
        print(f'redirect {qid} -> {entity["id"]}')
        return

    if "claims" not in entity:
        # Without claims there can be no coordinates. Previously this case
        # fell through to entity["claims"] and crashed with KeyError.
        print(f"{ts}: new item {qid} without claims")
        return

    coords = wikidata.get_entity_coords(entity["claims"])
    if not coords:
        print(f"{ts}: new item {qid} without coordinates")
        return

    print(f"{ts}: new item {qid} with coordinates")

    obj = {k: v for k, v in entity.items() if k in entity_keys}
    try:
        item = model.Item(item_id=item_id, **obj)
    except TypeError:
        # model.Item rejected the constructor arguments (presumably an
        # unexpected entity shape); print diagnostics and re-raise.
        print(qid)
        print(f'{entity["pageid"]=} {entity["ns"]=} {entity["type"]=}')
        print(entity.keys())
        raise
    item.locations = model.location_objects(coords)
    session.add(item)
|
|
|
|
|
|
def coords_equal(a: dict[str, typing.Any], b: dict[str, typing.Any]) -> bool:
    """Return True when *a* and *b* serialise to the same canonical JSON.

    Both values are dumped with sorted keys, so key order at any nesting level
    is ignored while values must match exactly as JSON (e.g. 1 and 1.0 differ).
    """
    left, right = (json.dumps(value, sort_keys=True) for value in (a, b))
    return left == right
|
|
|
|
|
|
def _get_entity_with_retry(
    qid: str, attempts: int = 100, delay: float = 10
) -> dict[str, typing.Any]:
    """Fetch *qid* from the Wikidata API, retrying on connection errors.

    Waits *delay* seconds between attempts and re-raises the last
    requests.exceptions.ConnectionError once *attempts* tries have all failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            return wikidata_api.get_entity(qid)
        except requests.exceptions.ConnectionError:
            if attempt == attempts:
                raise
            print("connection error, retrying.")
            sleep(delay)
    raise ValueError(f"attempts must be positive, got {attempts}")


def handle_edit(change: Change) -> None:
    """Process an edit from recent changes.

    Only items already stored locally are considered. The item is refreshed
    from the API unless its stored lastrevid is already at or past the edit's
    revid. If the QID now resolves to another entity, the local item is
    deleted and committed immediately. Otherwise locations are rebuilt when the
    coordinates changed, and every key in entity_keys is copied onto the item
    (committing is left to the caller).
    """
    qid = change["title"]
    item = model.Item.query.get(int(qid[1:]))
    if not item:
        return  # item isn't in our database so it probably has no coordinates

    ts = change["timestamp"]

    if item.lastrevid >= change["revid"]:
        print(f"{ts}: no need to update {qid}")
        return

    entity = _get_entity_with_retry(qid)
    entity_qid = entity.pop("id")
    if entity_qid != qid:
        print(f"{ts}: item {qid} replaced with redirect")
        session.delete(item)
        session.commit()
        return

    if "claims" not in entity:
        return

    existing_coords = wikidata.get_entity_coords(item.claims)
    coords = wikidata.get_entity_coords(entity["claims"])

    if not coords_equal(existing_coords, coords):
        print(f"{ts}: update item {qid}, including coordinates")
        item.locations = model.location_objects(coords)
    else:
        print(f"{ts}: update item {qid}, no change to coordinates")

    for key in entity_keys:
        setattr(item, key, entity[key])  # type: ignore
|
|
|
|
|
|
def update_timestamp(timestamp: str) -> None:
    """Save timestamp to rc_timestamp.

    rc_timestamp is the resume point read by update_database. The value is
    first written to a temporary file and then moved into place with
    os.replace, so an interrupted write cannot leave a partially written
    rc_timestamp behind.
    """
    import os

    tmp_path = "rc_timestamp.tmp"
    with open(tmp_path, "w") as out:
        print(timestamp, file=out)
    os.replace(tmp_path, "rc_timestamp")
|
|
|
|
|
|
def update_database() -> None:
    """Check recent changes and apply updates to local mirror of Wikidata.

    Reads the resume point from rc_timestamp and pages through
    wikidata_api.get_recent_changes starting there. Each title is handled at
    most once per call: "new" changes go to handle_new and "edit" changes to
    handle_edit. After every page the session is committed, and only then is
    the timestamp of the last change on that page saved as the new resume
    point.

    Raises:
        RuntimeError: the API reply has neither a retryable DB timeout error
            nor a "query" section.
    """
    with open("rc_timestamp") as f:
        start = f.read().strip()

    rccontinue = None
    seen: set[str] = set()
    # Default to the starting point so an empty first page does not leave
    # ``timestamp`` unbound when the checkpoint is written.
    timestamp = start
    while True:
        r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue)

        reply = r.json()
        if (
            "error" in reply
            and reply["error"]["code"] == "internal_api_error_DBQueryTimeoutError"
        ):
            # Transient server-side timeout: wait, then repeat the same request.
            print(reply)
            sleep(10)
            continue
        if "query" not in reply:
            print(reply)
            raise RuntimeError("unexpected reply from recent changes API")

        for change in reply["query"]["recentchanges"]:
            rctype = change["type"]
            timestamp = change["timestamp"]
            qid = change["title"]
            if qid in seen:
                continue

            if rctype == "new":
                handle_new(change)
                seen.add(qid)
            elif rctype == "edit":
                handle_edit(change)
                seen.add(qid)

        print("commit")
        session.commit()
        # Advance the checkpoint only after the changes are committed, so a
        # failed commit leaves the page to be reprocessed on the next run.
        update_timestamp(timestamp)

        if "continue" not in reply:
            break

        rccontinue = reply["continue"]["rccontinue"]
    print("finished")
|
|
|
|
|
|
def main() -> None:
    """Run update_database forever, pausing a minute between passes."""
    pause_seconds = 60
    while True:
        update_database()
        sleep(pause_seconds)


if __name__ == "__main__":
    main()
|