Add the Wikidata recent changes updater

Download Wikidata recent changes and update items in local database.
This commit is contained in:
Edward Betts 2023-05-14 10:44:41 +00:00
parent 503280cfc1
commit ba37fae51a

249
update.py Executable file
View file

@ -0,0 +1,249 @@
#!/usr/bin/python3
"""Download Wikidata recent changes and update items in local database."""
import json
import os
import sys
from time import sleep
from matcher import database, model, utils, wikidata, wikidata_api
DB_URL = "postgresql:///matcher"
database.init_db(DB_URL)
previous_max_lastrevid = 1388804050 # Q106152661
entity_keys = {"labels", "sitelinks", "aliases", "claims", "descriptions", "lastrevid"}
def read_changes():
qids = set()
max_lastrevid = 0
for f in sorted(os.listdir("changes"), key=lambda f: int(f.partition(".")[0])):
reply = json.load(open("changes/" + f))
print(f, len(qids))
for change in reply["query"]["recentchanges"]:
# rctype = change["type"]
title = change["title"]
revid = change["revid"]
if revid and revid > max_lastrevid:
max_lastrevid = revid
assert title.startswith("Q")
qids.add(title)
print(len(qids))
print(max_lastrevid)
return
for cur in utils.chunk(qids, 50):
print(cur)
for qid, entity in wikidata_api.get_entities(cur):
with open(f"items/{qid}.json", "w") as out:
json.dump(entity, out)
def get_changes():
start = "2021-03-24T11:56:11"
rccontinue = None
i = 0
while True:
i += 1
r = wikidata_api.query_wd_api(rcstart=start, rccontinue=rccontinue)
with open(f"changes/{i:06d}.json", "w") as out:
out.write(r.text)
reply = r.json()
try:
print(reply["query"]["recentchanges"][0]["timestamp"])
except KeyError:
print("KeyError")
if False:
for change in reply["query"]["recentchanges"]:
# rctype = change["type"]
# if change["revid"] == 0 and change["old_revid"] == 0:
# continue
if change["logtype"] == "delete" and change["logaction"] in {
"revision",
"delete",
"restore",
}:
continue
if change["logtype"] == "protect" and change["logaction"] in {
"unprotect",
"protect",
}:
continue
print(json.dumps(change, indent=2))
sys.exit(0)
continue
if not change["title"].startswith("Q"):
continue # not an item
qid = change["title"]
assert qid[1:].isdigit()
item_id = int(qid[1:])
revid = change["revid"]
item = model.Item.query.get(item_id)
if change["type"] == "edit" and not item:
continue
if change["type"] == "new" and not item:
print(("new", qid))
continue
if not item:
print(qid)
print(json.dumps(change, indent=2))
print((change["type"], qid, item.lastrevid, revid))
# print(json.dumps(reply, indent=2))
if "continue" not in reply:
break
rccontinue = reply["continue"]["rccontinue"]
print(rccontinue)
sleep(1)
def get_timestamp():
ts = wikidata_api.get_revision_timestamp(previous_max_lastrevid)
print(ts)
def handle_new(change):
qid = change["title"]
ts = change["timestamp"]
if change["redirect"]:
print(f"{ts}: new item {qid}, since replaced with redirect")
return
item = model.Item.query.get(qid[1:]) # check if item is already loaded
if item:
return handle_edit(change)
entity = wikidata_api.get_entity(qid)
if entity["id"] != qid:
print(f'redirect {qid} -> {entity["id"]}')
return
if "claims" not in entity:
print(qid)
print(entity)
coords = wikidata.get_entity_coords(entity["claims"])
if not coords:
print(f"{ts}: new item {qid} without coordinates")
return
print(f"{ts}: new item {qid} with coordinates")
item_id = int(qid[1:])
obj = {k: v for k, v in entity.items() if k in entity_keys}
try:
item = model.Item(item_id=item_id, **obj)
except TypeError:
print(qid)
print(f'{entity["pageid"]=} {entity["ns"]=} {entity["type"]=}')
print(entity.keys())
raise
item.locations = model.location_objects(coords)
database.session.add(item)
def coords_equal(a, b):
"""Deep equality comparison of nested dicts."""
return json.dumps(a, sort_keys=True) == json.dumps(b, sort_keys=True)
def handle_edit(change):
qid = change["title"]
item = model.Item.query.get(qid[1:])
if not item:
return # item isn't in our database so it probably has no coordinates
ts = change["timestamp"]
if item.lastrevid >= change["revid"]:
print(f"{ts}: no need to update {qid}")
return
entity = wikidata_api.get_entity(qid)
entity_qid = entity.pop("id")
if entity_qid != qid:
print(f"{ts}: item {qid} replaced with redirect")
database.session.delete(item)
database.session.commit()
return
assert entity_qid == qid
existing_coords = wikidata.get_entity_coords(item.claims)
if "claims" not in entity:
return
coords = wikidata.get_entity_coords(entity["claims"])
if not coords_equal(existing_coords, coords):
print(f"{ts}: update item {qid}, including coordinates")
item.locations = model.location_objects(coords)
else:
print(f"{ts}: update item {qid}, no change to coordinates")
for key in entity_keys:
setattr(item, key, entity[key])
def update_timestamp(timestamp):
out = open("rc_timestamp", "w")
print(timestamp, file=out)
out.close()
def update_database():
with open("rc_timestamp") as f:
start = f.read().strip()
rccontinue = None
seen = set()
while True:
r = wikidata_api.get_recent_changes(rcstart=start, rccontinue=rccontinue)
reply = r.json()
for change in reply["query"]["recentchanges"]:
rctype = change["type"]
timestamp = change["timestamp"]
qid = change["title"]
if qid in seen:
continue
if rctype == "new":
handle_new(change)
seen.add(qid)
if rctype == "edit":
handle_edit(change)
seen.add(qid)
update_timestamp(timestamp)
print("commit")
database.session.commit()
if "continue" not in reply:
break
rccontinue = reply["continue"]["rccontinue"]
database.session.commit()
print("finished")
# read_changes()
# get_timestamp()
# get_changes()
while True:
update_database()
sleep(60)