wikidata_api = "https://www.wikidata.org/w/api.php"


def md5sum(s: str) -> str:
    """Return the hexadecimal MD5 digest of *s* encoded as UTF-8."""
    return hashlib.md5(s.encode("utf-8")).hexdigest()


def wikidata_search(q: str) -> list[dict[str, typing.Any]]:
    """Search Wikidata for humans matching *q*, caching responses on disk.

    The query is restricted to items that are an instance of human by
    appending ``haswbstatement:P31=Q5``. The raw JSON response is stored in
    ``cache/<md5 of the full query>.json`` and reused on later calls, so the
    API is only contacted once per distinct query.

    Args:
        q: Free-text search terms, typically a person's name.

    Returns:
        The list found under ``query.search`` in the API response.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        KeyError: If the response lacks the ``query``/``search`` keys.
    """
    q += " haswbstatement:P31=Q5"
    cache_filename = os.path.join("cache", md5sum(q) + ".json")

    if os.path.exists(cache_filename):
        with open(cache_filename, encoding="utf-8") as cache_file:
            data = json.load(cache_file)
    else:
        params: dict[str, str | int] = {
            "action": "query",
            "list": "search",
            "format": "json",
            "formatversion": 2,
            "srsearch": q,
            "srlimit": "max",
        }
        r = requests.get(wikidata_api, params=params)
        # Validate before caching: an error page or non-JSON body written to
        # the cache would otherwise be served forever for this query.
        r.raise_for_status()
        data = r.json()
        os.makedirs(os.path.dirname(cache_filename), exist_ok=True)
        with open(cache_filename, "w", encoding="utf-8") as cache_file:
            cache_file.write(r.text)
        # Pause after every live request to stay polite to the API.
        time.sleep(1)

    return cast(list[dict[str, typing.Any]], data["query"]["search"])


def drop_start(s: str, start: str) -> str:
    """Remove *start* from the beginning of *s* if it is present."""
    return s.removeprefix(start)


def person_image_filename(person_id: int) -> str:
    """Return the relative thumbnail path of a person's first Wikidata photo."""
    person = model.Person.query.get(person_id)
    return os.path.join("wikidata_photo", "thumb", person.wikidata_photo[0])
wd_name, github, photo = eval(line) @@ -421,7 +453,6 @@ def github_wikidata() -> str: @app.route("/reports") def reports_page() -> str: - """Page showing statistics.""" event_count = model.Event.query.count() missing_event_date_count = model.Event.query.filter(