From ba37fae51ad4778fe1640cbde3ea4de6c9820051 Mon Sep 17 00:00:00 2001
From: Edward Betts
Date: Sun, 14 May 2023 10:44:41 +0000
Subject: [PATCH] Add the Wikidata recent changes updater

Download Wikidata recent changes and update items in local database.
---
 update.py | 249 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 249 insertions(+)
 create mode 100755 update.py

diff --git a/update.py b/update.py
new file mode 100755
index 0000000..038b736
--- /dev/null
+++ b/update.py
@@ -0,0 +1,249 @@
+#!/usr/bin/python3
+
+"""Download Wikidata recent changes and update items in local database."""
+
+import json
+import os
+import sys
+from time import sleep
+
+from matcher import database, model, utils, wikidata, wikidata_api
+
+DB_URL = "postgresql:///matcher"
+database.init_db(DB_URL)
+
+previous_max_lastrevid = 1388804050  # Q106152661
+
+entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
+
+
+def read_changes():
+    """Read saved recent-changes batches and download the entities they mention."""
+    qids = set()
+    max_lastrevid = 0
+    for f in sorted(os.listdir("changes"), key=lambda f: int(f.partition(".")[0])):
+        with open("changes/" + f) as changes_file:
+            reply = json.load(changes_file)
+        print(f, len(qids))
+        for change in reply["query"]["recentchanges"]:
+            # rctype = change["type"]
+            title = change["title"]
+            revid = change["revid"]
+            if revid and revid > max_lastrevid:
+                max_lastrevid = revid
+            assert title.startswith("Q")
+            qids.add(title)
+    print(len(qids))
+    print(max_lastrevid)
+
+    # Fetch the changed entities in batches of 50 and save each one to disk.
+    for cur in utils.chunk(qids, 50):
+        print(cur)
+        for qid, entity in wikidata_api.get_entities(cur):
+            with open(f"items/{qid}.json", "w") as out:
+                json.dump(entity, out)
+
+
+def get_changes():
+    """Download batches of recent changes from the Wikidata API and save to disk."""
+    start = "2021-03-24T11:56:11"
+    rccontinue = None
+    i = 0
+    while True:
+        i += 1
+        r = wikidata_api.query_wd_api(rcstart=start, rccontinue=rccontinue)
+        with open(f"changes/{i:06d}.json", "w") as out:
+            out.write(r.text)
+
+        reply = r.json()
+        try:
+            print(reply["query"]["recentchanges"][0]["timestamp"])
+        except KeyError:
+            print("KeyError")
+
+        if False:  # debugging: dump the first change that passes the filters, then stop
+            for change in reply["query"]["recentchanges"]:
+                # rctype = change["type"]
+                # if change["revid"] == 0 and change["old_revid"] == 0:
+                #     continue
+
+                if change["logtype"] == "delete" and change["logaction"] in {
+                    "revision",
+                    "delete",
+                    "restore",
+                }:
+                    continue
+
+                if change["logtype"] == "protect" and change["logaction"] in {
+                    "unprotect",
+                    "protect",
+                }:
+                    continue
+
+                print(json.dumps(change, indent=2))
+                sys.exit(0)
+
+                continue  # unreachable: the code below is kept for reference
+
+                if not change["title"].startswith("Q"):
+                    continue  # not an item
+
+                qid = change["title"]
+                assert qid[1:].isdigit()
+                item_id = int(qid[1:])
+                revid = change["revid"]
+
+                item = model.Item.query.get(item_id)
+                if change["type"] == "edit" and not item:
+                    continue
+
+                if change["type"] == "new" and not item:
+                    print(("new", qid))
+                    continue
+
+                if not item:
+                    print(qid)
+                    print(json.dumps(change, indent=2))
+                print((change["type"], qid, item.lastrevid, revid))
+
+            # print(json.dumps(reply, indent=2))
+
+        if "continue" not in reply:
+            break
+
+        rccontinue = reply["continue"]["rccontinue"]
+        print(rccontinue)
+        sleep(1)
+
+
+def get_timestamp():
+    """Print the timestamp of the revision previous_max_lastrevid."""
+    ts = wikidata_api.get_revision_timestamp(previous_max_lastrevid)
+    print(ts)
+
+
+def handle_new(change):
+    """Handle a new page in the recent changes feed."""
+    qid = change["title"]
+    ts = change["timestamp"]
+    if change["redirect"]:
+        print(f"{ts}: new item {qid}, since replaced with redirect")
+        return
+    item = model.Item.query.get(qid[1:])  # check if item is already loaded
+    if item:
+        return handle_edit(change)
+
+    entity = wikidata_api.get_entity(qid)
+    if entity["id"] != qid:
+        print(f'redirect {qid} -> {entity["id"]}')
+        return
+
+    if "claims" not in entity:
+        print(qid)
+        print(entity)
+        return  # nothing to do without claims
+    coords = wikidata.get_entity_coords(entity["claims"])
+    if not coords:
+        print(f"{ts}: new item {qid} without coordinates")
+        return
+
+    print(f"{ts}: new item {qid} with coordinates")
+
+    item_id = int(qid[1:])
+    obj = {k: v for k, v in entity.items() if k in entity_keys}
+    try:
+        item = model.Item(item_id=item_id, **obj)
+    except TypeError:
+        print(qid)
+        print(f'{entity["pageid"]=} {entity["ns"]=} {entity["type"]=}')
+        print(entity.keys())
+        raise
+    item.locations = model.location_objects(coords)
+    database.session.add(item)
+
+
+def coords_equal(a, b):
+    """Deep equality comparison of nested dicts."""
+    return json.dumps(a, sort_keys=True) == json.dumps(b, sort_keys=True)
+
+
+def handle_edit(change):
+    """Handle an edit to an existing page in the recent changes feed."""
+    qid = change["title"]
+    item = model.Item.query.get(qid[1:])
+    if not item:
+        return  # item isn't in our database so it probably has no coordinates
+
+    ts = change["timestamp"]
+
+    if item.lastrevid >= change["revid"]:
+        print(f"{ts}: no need to update {qid}")
+        return
+
+    entity = wikidata_api.get_entity(qid)
+    entity_qid = entity.pop("id")
+    if entity_qid != qid:
+        print(f"{ts}: item {qid} replaced with redirect")
+        database.session.delete(item)
+        database.session.commit()
+        return
+
+    assert entity_qid == qid
+    existing_coords = wikidata.get_entity_coords(item.claims)
+    if "claims" not in entity:
+        return
+    coords = wikidata.get_entity_coords(entity["claims"])
+
+    if not coords_equal(existing_coords, coords):
+        print(f"{ts}: update item {qid}, including coordinates")
+        item.locations = model.location_objects(coords)
+    else:
+        print(f"{ts}: update item {qid}, no change to coordinates")
+
+    for key in entity_keys:
+        setattr(item, key, entity[key])
+
+
+def update_timestamp(timestamp):
+    """Record the timestamp of the most recently processed change."""
+    with open("rc_timestamp", "w") as out:
+        print(timestamp, file=out)
+
+
+def update_database():
+    """Process recent changes, starting from the timestamp saved in rc_timestamp."""
+    with open("rc_timestamp") as f:
+        start = f.read().strip()
+
+    rccontinue = None
+    seen = set()
+    while True:
+        r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue)
+
+        reply = r.json()
+        for change in reply["query"]["recentchanges"]:
+            rctype = change["type"]
+            timestamp = change["timestamp"]
+            qid = change["title"]
+            if qid in seen:
+                continue
+
+            if rctype == "new":
+                handle_new(change)
+                seen.add(qid)
+            if rctype == "edit":
+                handle_edit(change)
+                seen.add(qid)
+
+            update_timestamp(timestamp)
+            print("commit")
+            database.session.commit()
+
+        if "continue" not in reply:
+            break
+
+        rccontinue = reply["continue"]["rccontinue"]
+    database.session.commit()
+    print("finished")
+
+
+# read_changes()
+# get_timestamp()
+# get_changes()
+
+# Poll the recent changes feed once a minute.
+while True:
+    update_database()
+    sleep(60)
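
Note for reviewers: the patch assumes matcher.wikidata_api wraps the standard MediaWiki recentchanges API. As a point of reference, below is a minimal standalone sketch of the kind of request get_recent_changes() is expected to make; the endpoint, parameter choices, and use of requests are illustrative assumptions, not code from this patch.

"""Illustrative sketch only: roughly what wikidata_api.get_recent_changes() might do."""

import requests  # assumption: plain requests, matching the r.json()/r.text usage above

API_URL = "https://www.wikidata.org/w/api.php"


def get_recent_changes(rcstart, rccontinue=None):
    """Fetch one batch of recent changes from Wikidata, oldest first."""
    params = {
        "action": "query",
        "list": "recentchanges",
        "format": "json",
        "rcdir": "newer",  # walk forward in time from rcstart
        "rcstart": rcstart,  # e.g. the timestamp saved in rc_timestamp
        "rcnamespace": "0",  # main namespace, i.e. items (Q...)
        "rclimit": "max",
        "rcprop": "title|ids|timestamp|loginfo|redirect",
    }
    if rccontinue:
        params["rccontinue"] = rccontinue
    r = requests.get(API_URL, params=params, timeout=30)
    r.raise_for_status()
    return r  # return the response object, since update_database() calls r.json() itself


if __name__ == "__main__":
    reply = get_recent_changes("2021-03-24T11:56:11").json()
    for change in reply["query"]["recentchanges"][:5]:
        print(change["timestamp"], change["type"], change["title"])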