#!/usr/bin/python3
"""Flask views for the Wikidata Art Depiction Explorer.

The app shows artworks stored in the local database, lets a logged-in user
add "depicts" (P180) statements to them on Wikidata via OAuth, and offers
browse/facet pages built from the locally stored triples.
"""

import hashlib
import itertools
import json
import os
import re
import socket
import typing
from collections import defaultdict
from datetime import datetime
from typing import Iterable, Mapping

import requests.exceptions
import simplejson.errors
from flask import (
    Flask,
    g,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from requests_oauthlib import OAuth1Session
from sqlalchemy import distinct, func
from sqlalchemy.orm import aliased
from sqlalchemy.sql.expression import desc
from werkzeug.wrappers import Response

from depicts import (
    artwork,
    commons,
    database,
    fixtures,
    human,
    mail,
    mediawiki,
    utils,
    wd_catalog,
    wdqs,
    wikibase,
    wikidata_edit,
    wikidata_oauth,
)
from depicts.error_mail import setup_error_mail
from depicts.model import (
    DepictsItem,
    DepictsItemAltLabel,
    Edit,
    Item,
    Language,
    Triple,
    WikidataQuery,
)
from depicts.pager import Pagination, init_pager
from depicts.type import Entity

user_agent = "Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0"

app = Flask(__name__)
app.config.from_object("config.default")
database.init_db(app.config["DB_URL"])
init_pager(app)
setup_error_mail(app)

# Wikidata properties offered as browse/"find more" criteria, keyed by PID.
find_more_props = {
    "P135": "movement",
    "P136": "genre",
    "P170": "artist",
    "P195": "collection",
    "P276": "location",
    "P495": "country of origin",
    "P127": "owned by",
    "P179": "part of the series",
    "P921": "main subject",
    "P186": "material used",
    "P88": "commissioned by",
    "P1028": "donated by",
    "P1071": "location of final assembly",
    "P138": "named after",
    "P1433": "published in",
    "P144": "based on",
    "P2079": "fabrication method",
    "P2348": "time period",
    "P361": "part of",
    "P608": "exhibition history",
    "P180": "depicts",
    "P31": "instance of",
    # possible future props
    # 'P571': 'inception',
    # 'P166': 'award received', (only 2)
    # 'P1419': 'shape', (only 2)
    # 'P123': 'publisher', (only 1)
}

# "instance of" values passed to the SPARQL templates as isa_list.
isa_list = [
    "Q60520",  # sketchbook
    "Q93184",  # drawing
    "Q3305213",  # painting
    "Q15123870",  # lithograph
    "Q18761202",  # watercolor painting
    "Q79218",  # triptych
    "Q2647254",  # study
    "Q46686",  # reredos
]

re_qid = re.compile(r"^Q(\d+)")
re_pid = re.compile(r"^P(\d+)")


@app.teardown_appcontext
def shutdown_session(exception: Exception | None = None) -> None:
    """Remove the scoped database session when the app context ends."""
    database.session.remove()  # type:ignore


@app.template_global()
def set_url_args(endpoint: str | None = None, **new_args: str) -> str:
    """Build a URL from the current view/query args overridden by new_args.

    Arguments whose value is None are dropped. When endpoint is None the
    endpoint of the current request is used.
    """
    if endpoint is None:
        endpoint = request.endpoint
    assert endpoint
    # view_args is an empty dict for routes without URL variables, so it must
    # not be tested for truthiness.
    args: dict[str, typing.Any] = dict(request.view_args or {})
    args.update(request.args)
    args.update(new_args)
    args = {k: v for k, v in args.items() if v is not None}
    return url_for(endpoint, **args)


@app.template_global()
def current_url() -> str:
    """Get current URL."""
    assert request and request.endpoint
    # An empty view_args dict is valid (route without URL variables).
    args: dict[str, typing.Any] = dict(request.view_args or {})
    args.update(request.args)
    return url_for(request.endpoint, **args)


@app.before_request
def init_profile() -> None:
    """Start each request with an empty profiling list on g."""
    g.profiling = []


@app.before_request
def global_user() -> None:
    """Store the OAuth username (if any) on g for the current request."""
    g.user = wikidata_oauth.get_username()


def check_for_blocks() -> None:
    """Look up local and global blocks for the configured HOSTNAME.

    Results are stored on g.local_blocks / g.global_blocks. Does nothing if
    the lookup already ran for this app context or HOSTNAME is not set.
    """
    if hasattr(g, "server_ip"):  # already done
        return
    hostname = app.config.get("HOSTNAME")
    if not hostname:
        return
    g.server_ip = socket.gethostbyname(hostname)
    try:
        g.local_blocks = mediawiki.get_list("blocks", bkip=g.server_ip)
        g.global_blocks = mediawiki.get_list("globalblocks", bgip=g.server_ip)
    except Exception:
        # Best effort: a failed block lookup must not break the request.
        pass


@app.before_request
def get_blocks() -> None:
    """Run the block check unless SHOW_BLOCK_ALERT is explicitly False."""
    if app.config.get("SHOW_BLOCK_ALERT") is not False:
        check_for_blocks()


@app.route("/find_more_setting")
def flip_find_more() -> str:
    """Toggle the session's no_find_more flag and report the new state."""
    session["no_find_more"] = not session.get("no_find_more")
    display = {True: "on", False: "off"}[not session["no_find_more"]]

    return "flipped. find more is " + display


def existing_edit(item_id: int, depicts_id: int) -> bool:
    """Return True if an Edit for this artwork/depicts pair is recorded."""
    q = Edit.query.filter_by(artwork_id=item_id, depicts_id=depicts_id)  # type: ignore
    return bool(q.count() != 0)


@app.route("/save/Q<int:item_id>", methods=["POST"])
def save(item_id: int) -> str | Response:
    """Add the submitted depicts statements to an artwork on Wikidata.

    Makes sure the artwork and every depicted item exist in the local
    database, creates one P180 claim per item that has no recorded edit yet,
    stores an Edit row for each, then redirects to the "next" page. Renders
    save_error.html if the API reports an error.
    """
    depicts = request.form.getlist("depicts")
    username = wikidata_oauth.get_username()
    assert username

    token = wikidata_oauth.get_token()

    artwork_item = Item.query.get(item_id)  # type: ignore
    if artwork_item is None:
        artwork_entity = mediawiki.get_entity_with_cache(f"Q{item_id}")
        artwork_item = Item(
            item_id=item_id, entity=typing.cast(dict[str, str], artwork_entity)
        )
        database.session.add(artwork_item)
        database.session.commit()

    for depicts_qid in depicts:
        depicts_id = int(depicts_qid[1:])

        depicts_item = DepictsItem.query.get(depicts_id)  # type: ignore
        if depicts_item is None:
            depicts_item = wikidata_edit.create_depicts_item(depicts_id)
            database.session.add(depicts_item)
            database.session.commit()

    for depicts_qid in depicts:
        depicts_id = int(depicts_qid[1:])
        if existing_edit(item_id, depicts_id):
            continue

        r = create_claim(item_id, depicts_id, token)
        try:
            reply = r.json()
        except simplejson.errors.JSONDecodeError:
            mail.send_mail("depicts save error", r.text)
            raise

        save_error = reply.get("error")
        if save_error:
            mail.send_mail("depicts save error", r.text)
            return render_template("save_error.html", error=save_error)

        # The response body was already parsed above; no need to parse again.
        lastrevid = reply["pageinfo"]["lastrevid"]
        assert reply["success"] == 1
        edit = Edit(
            username=username,
            artwork_id=item_id,
            depicts_id=depicts_id,
            lastrevid=lastrevid,
        )
        database.session.add(edit)
        database.session.commit()

    return redirect(url_for("next_page", item_id=item_id))


@app.route("/settings", methods=["GET", "POST"])
def user_settings() -> str:
    """Render the user settings page."""
    return render_template("user_settings.html")


@app.route("/test/lookup")
def test_lookup_page() -> str:
    """Render the lookup test page."""
    return render_template("test_lookup.html")


@app.route("/property/P<int:property_id>")
def property_query_page(property_id: int) -> str:
    """List the values of one property, ordered by number of artworks."""
    pid = f"P{property_id}"
    g.title = find_more_props[pid]
    sort = request.args.get("sort")
    sort_by_name = sort and sort.lower().strip() == "name"

    q = (
        database.session.query(  # type: ignore
            Triple.object_id, func.count(func.distinct(Triple.subject_id)).label("c")
        )
        .filter_by(predicate_id=property_id)
        .join(Item, Item.item_id == Triple.subject_id)
        .filter_by(is_artwork=True)
        .group_by(Triple.object_id)
        .order_by(desc("c"))
    )

    page = utils.get_int_arg("page") or 1
    total = q.count()
    page_size = 100
    pager = Pagination(page, page_size, total)

    page_hits = pager.slice(q)
    labels = get_labels_db({f"Q{object_id}" for object_id, c in page_hits})

    hits = []
    for object_id, count in page_hits:
        qid = f"Q{object_id}"
        hits.append(
            {"qid": qid, "label": labels.get(qid) or "[item missing]", "count": count}
        )

    return render_template(
        "property.html",
        label=g.title,
        order=("name" if sort_by_name else "count"),
        pid=pid,
        page=page,
        pager=pager,
        hits=hits,
    )


@app.route("/")
def start() -> Response:
    """Front page: redirect to a random artwork."""
    return random_artwork()


@app.route("/next")
def random_artwork() -> Response:
    """Redirect to a random artwork that has no depicts (P180) claims yet.

    Random batches of 30 artworks are drawn until one without P180 is found.
    If a batch is empty (no artworks stored) the browse page is shown
    instead of looping forever.
    """
    found = None
    while True:
        q = (
            Item.query.filter_by(is_artwork=True)  # type: ignore
            .order_by(func.random())
            .limit(30)
        )
        batch = list(q)
        if not batch:
            # No artworks at all: retrying can never succeed.
            return redirect(url_for("browse_page"))

        for item in batch:
            has_depicts = "P180" in item.entity["claims"]
            if has_depicts:
                continue
            found = item
            break
        if found:
            break

    # Tells item_page that the cached entity does not need a refresh.
    session[found.qid] = "from redirect"

    return redirect(url_for("item_page", item_id=found.item_id))


@app.route("/oauth/start")
def start_oauth() -> Response:
    """Begin the OAuth 1 flow and redirect the user to Wikidata to authorize."""
    next_page = request.args.get("next")
    if next_page:
        session["after_login"] = next_page

    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]
    base_url = "https://www.wikidata.org/w/index.php"
    request_token_url = base_url + "?title=Special%3aOAuth%2finitiate"

    oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob")
    fetch_response = oauth.fetch_request_token(request_token_url)

    session["owner_key"] = fetch_response.get("oauth_token")
    session["owner_secret"] = fetch_response.get("oauth_token_secret")

    base_authorization_url = "https://www.wikidata.org/wiki/Special:OAuth/authorize"
    authorization_url = oauth.authorization_url(
        base_authorization_url, oauth_consumer_key=client_key
    )
    return redirect(authorization_url)


@app.route("/oauth/callback", methods=["GET"])
def oauth_callback() -> Response:
    """Finish the OAuth flow: swap the request token for an access token.

    The access token replaces the request token in the session. Afterwards
    the user goes to the stored after_login URL or to a random artwork.
    """
    base_url = "https://www.wikidata.org/w/index.php"
    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]

    oauth = OAuth1Session(
        client_key,
        client_secret=client_secret,
        resource_owner_key=session["owner_key"],
        resource_owner_secret=session["owner_secret"],
    )

    oauth_response = oauth.parse_authorization_response(request.url)
    verifier = oauth_response.get("oauth_verifier")
    access_token_url = base_url + "?title=Special%3aOAuth%2ftoken"
    oauth = OAuth1Session(
        client_key,
        client_secret=client_secret,
        resource_owner_key=session["owner_key"],
        resource_owner_secret=session["owner_secret"],
        verifier=verifier,
    )

    oauth_tokens = oauth.fetch_access_token(access_token_url)
    session["owner_key"] = oauth_tokens.get("oauth_token")
    session["owner_secret"] = oauth_tokens.get("oauth_token_secret")

    # NOTE(review): after_login comes from the "next" query argument of
    # /oauth/start and is not validated here - confirm this cannot be used as
    # an open redirect.
    next_page = session.get("after_login")
    return redirect(next_page) if next_page else random_artwork()


@app.route("/oauth/disconnect")
def oauth_disconnect() -> Response:
    """Forget the OAuth credentials in the session and go to the browse page."""
    for key in "owner_key", "owner_secret", "username", "after_login":
        if key in session:
            del session[key]
    return redirect(url_for("browse_page"))


def create_claim(artwork_id: int, depicts_id: int, token: str) -> requests.Response:
    """Post a wbcreateclaim request adding depicts (P180) to an artwork."""
    artwork_qid = f"Q{artwork_id}"
    value = json.dumps({"entity-type": "item", "numeric-id": depicts_id})
    params: dict[str, str | int] = {
        "action": "wbcreateclaim",
        "entity": artwork_qid,
        "property": "P180",
        "snaktype": "value",
        "value": value,
        "token": token,
        "format": "json",
        "formatversion": 2,
    }
    r: requests.Response = wikidata_oauth.api_post_request(params)
    return r


def image_with_cache(qid: str, image_filename: str, width: int) -> dict[str, str]:
    """Return Commons image detail for an item, using a JSON file cache.

    The cache file is keyed by qid and width. It is refreshed when it is
    missing, empty, or does not contain image_filename.
    """
    filename = f"cache/{qid}_{width}_image.json"
    detail = {}
    if os.path.exists(filename):
        with open(filename) as cache_file:
            detail = json.load(cache_file)

    image_filename = image_filename.replace("_", " ")

    # The image associated with an item can change.
    # If that happens the detail in the cache will be for the wrong file.
    if not detail or image_filename not in detail:
        detail = commons.image_detail([image_filename], thumbwidth=width)
        with open(filename, "w") as cache_file:
            json.dump(detail, cache_file, indent=2)

    image: dict[str, str] = detail.get(image_filename)
    return image


def existing_depicts_from_entity(entity: Entity) -> list[dict[str, typing.Any]]:
    """Describe the depicts (P180) claims already present on an entity.

    Depicted items missing from the local database are created and
    committed. Claims without a datavalue are skipped.
    """
    if "P180" not in entity["claims"]:
        return []
    existing = []
    new_depicts = False
    for claim in entity["claims"]["P180"]:
        if "datavalue" not in claim["mainsnak"]:
            continue

        item_id = claim["mainsnak"]["datavalue"]["value"]["numeric-id"]

        item = DepictsItem.query.get(item_id)  # type: ignore
        if not item:
            item = wikidata_edit.create_depicts_item(item_id)
            database.session.add(item)
            new_depicts = True
        d = {
            "label": item.label,
            "description": item.description,
            "qid": f"Q{item.item_id}",
            "count": item.count,
            "existing": True,
        }
        existing.append(d)
    if new_depicts:
        database.session.commit()
    return existing


def get_institution(entity: Entity, other: Mapping[str, str | None]) -> str | None:
    """Return the label of the entity's location (P276) or collection (P195).

    Labels are looked up in other, a mapping from QID to label. Location is
    preferred over collection. Returns None if neither claim is present.
    """
    if "P276" in entity["claims"]:
        location = wikibase.first_datavalue(entity, "P276")
        assert isinstance(location, dict)
        if location:
            return typing.cast(str, other.get(location["id"]))
    if "P195" in entity["claims"]:
        collection = wikibase.first_datavalue(entity, "P195")
        assert isinstance(collection, dict)
        if collection:
            return typing.cast(str, other.get(collection["id"]))
    return None


@app.route("/item/Q<int:item_id>")
def item_page(item_id: int) -> str | Response:
    """Show one artwork with its image, catalog details and depicts claims.

    Redirected entities are followed. Items that are not in the local
    database and are not artificial physical objects get the
    not_artwork.html page instead.
    """
    qid = f"Q{item_id}"
    g.qid = qid
    item = artwork.Artwork(qid)
    # Skip the refresh when random_artwork has just picked this item.
    from_redirect = qid in session and session.pop(qid) == "from redirect"
    entity = mediawiki.get_entity_with_cache(qid, refresh=not from_redirect)
    assert entity

    if "redirects" in entity:
        redirect_to_item_id = int(entity["redirects"]["to"][1:])
        assert request.endpoint
        return redirect(url_for(request.endpoint, item_id=redirect_to_item_id))

    existing_depicts = existing_depicts_from_entity(entity)

    width = 800
    image_filename = item.image_filename
    if image_filename:
        image = image_with_cache(qid, image_filename, width)
    else:
        image = None

    # hits = item.run_query()
    label_and_language = get_entity_label_and_language(entity)
    if label_and_language:
        label = label_and_language["label"]
    else:
        label = None
    g.label = label
    other = get_other(item.entity)

    people = human.from_name(label) if label else None

    label_languages = label_and_language["languages"] if label_and_language else []
    show_translation_links = all(lang.code != "en" for lang in label_languages)

    artwork_item = Item.query.get(item_id)  # type: ignore
    if artwork_item is None:
        if not wdqs.is_artificial_physical_object(qid):
            return render_template(
                "not_artwork.html",
                qid=qid,
                item_id=item_id,
                item=item,
                labels=find_more_props,
                entity=item.entity,
                username=g.user,
                label=label,
                label_languages=label_languages,
                show_translation_links=show_translation_links,
                image=image,
                other=other,
                title=item.display_title,
            )

        modified = datetime.strptime(entity["modified"], "%Y-%m-%dT%H:%M:%SZ")

        artwork_item = Item(
            item_id=item_id,
            entity=typing.cast(dict[str, str], entity),
            lastrevid=entity["lastrevid"],
            modified=modified,
        )
        database.session.add(artwork_item)

    catalog = wd_catalog.get_catalog_from_artwork(entity)
    if not catalog.get("institution"):
        institution = get_institution(entity, other)
        assert institution
        catalog["institution"] = institution

    return render_template(
        "item.html",
        qid=qid,
        item_id=item_id,
        item=item,
        catalog=catalog,
        labels=find_more_props,
        entity=item.entity,
        username=g.user,
        label=label,
        label_languages=label_languages,
        show_translation_links=show_translation_links,
        existing_depicts=existing_depicts,
        image=image,
        people=people,
        other=other,
        # hits=hits,
        title=item.display_title,
    )


def get_languages(codes: typing.Iterable[str]) -> typing.Any:
    """Return a query for the Language rows matching the given codes."""
    return Language.query.filter(  # type: ignore
        Language.wikimedia_language_code.in_(codes)
    )


def get_entity_label_and_language(entity: Entity) -> dict[str, typing.Any] | None:
    """
    Look for a useful label and return it with a list of languages that have that label.

    If the entity has a label in English return it.

    Otherwise check if all languages have the same label, if so then return it.
    """
    group_by_label = defaultdict(set)
    for language, label_data in entity["labels"].items():
        group_by_label[label_data["value"]].add(language)

    if "en" in entity["labels"]:
        label = entity["labels"]["en"]["value"]
        return {"label": label, "languages": get_languages(group_by_label[label])}

    if len(group_by_label) == 1:
        label, languages = list(group_by_label.items())[0]
        return {"label": label, "languages": get_languages(languages)}

    return None


def get_labels(keys: typing.Iterable[str], name: str | None = None) -> dict[str, str]:
    """Return a QID -> label mapping, fetched from Wikidata with a file cache.

    The cache file is named after name, or after an MD5 of the sorted keys
    when name is None. It is only used if it was written for the same keys.
    Entities without a label map to "no English label".
    """
    keys = sorted(keys, key=lambda i: int(i[1:]))
    if name is None:
        name = hashlib.md5("_".join(keys).encode("utf-8")).hexdigest()
    filename = f"cache/{name}_labels.json"
    labels = []
    if os.path.exists(filename):
        with open(filename) as cache_file:
            from_cache = json.load(cache_file)
        if isinstance(from_cache, dict) and from_cache.get("keys") == keys:
            labels = from_cache["labels"]
    if not labels:
        for cur in utils.chunk(keys, 50):
            labels += mediawiki.get_entities(cur, props="labels")

        with open(filename, "w") as cache_file:
            json.dump({"keys": keys, "labels": labels}, cache_file, indent=2)

    return {
        entity["id"]: wikibase.get_entity_label(entity) or "no English label"
        for entity in labels
    }


def get_labels_db(keys: Iterable[str]) -> dict[str, typing.Any]:
    """Return a QID -> label mapping using the local Item table.

    QIDs missing from the database are fetched from Wikidata in chunks of
    50 and stored as non-artwork items; redirected entities are skipped. A
    read timeout ends the fetch early and returns the labels found so far.
    """
    keys = set(keys)
    labels = {}
    missing = set()
    for qid in keys:
        m = re_qid.match(qid)
        if m:
            item_id = int(m.group(1))
            item = Item.query.get(item_id)  # type: ignore
            if item:
                labels[qid] = item.label
                continue
        missing.add(qid)

    page_size = 50
    try:
        for cur in utils.chunk(missing, page_size):
            for entity in mediawiki.get_entities(cur):
                if "redirects" in entity:
                    continue

                qid = entity["id"]

                modified = datetime.strptime(entity["modified"], "%Y-%m-%dT%H:%M:%SZ")
                # FIXME: check if the item is an artwork and set is_artwork correctly
                item = Item(  # type: ignore
                    item_id=int(qid[1:]),
                    entity=entity,
                    lastrevid=entity["lastrevid"],
                    modified=modified,
                    is_artwork=False,
                )
                database.session.add(item)
                labels[qid] = item.label
            database.session.commit()
    except requests.exceptions.ReadTimeout:
        pass

    return labels


def build_other_set(entity: Entity) -> set[str]:
    """Collect the QIDs an entity points to through any find_more_props claim."""
    other_items = set()
    for key in find_more_props.keys():
        for claim in entity["claims"].get(key, []):
            if "datavalue" not in claim["mainsnak"]:
                continue
            other_items.add(claim["mainsnak"]["datavalue"]["value"]["id"])
    return other_items


def get_other(entity: Entity) -> Mapping[str, str | None]:
    """Return labels for every item the entity links to via find_more_props."""
    other_items = build_other_set(entity)
    return get_labels(other_items)


@app.route("/edits")
def list_edits() -> str:
    """Show a paginated list of edits, newest first, with summary counts."""
    q = Edit.query.order_by(Edit.timestamp.desc())  # type: ignore
    page = utils.get_int_arg("page") or 1
    pager = Pagination(page, 100, q.count())

    item_count = database.session.query(
        func.count(distinct(Edit.artwork_id))
    ).scalar()  # type: ignore

    user_count = database.session.query(
        func.count(distinct(Edit.username))
    ).scalar()  # type: ignore

    return render_template(
        "list_edits.html",
        pager=pager,
        edit_list=pager.slice(q),
        item_count=item_count,
        user_count=user_count,
    )


@app.route("/user/<username>")
def user_page(username: str) -> str:
    """Show the edits made by one user, newest first."""
    edit_list = Edit.query.filter_by(username=username).order_by(  # type: ignore
        Edit.timestamp.desc()
    )

    item_count = (
        database.session.query(func.count(distinct(Edit.artwork_id)))  # type: ignore
        .filter_by(username=username)
        .scalar()
    )

    return render_template(
        "user_page.html",
        username=username,
        edits=Edit.query,  # type: ignore
        edit_list=edit_list,
        item_count=item_count,
    )


@app.route("/next/Q<int:item_id>")
def next_page(item_id: int) -> str:
    """Page shown after saving: suggest related artworks for each property."""
    qid = f"Q{item_id}"

    entity = mediawiki.get_entity_with_cache(qid)
    assert entity

    width = 800
    image_filename = wikibase.first_datavalue(entity, "P18")
    assert isinstance(image_filename, str)
    image = image_with_cache(qid, image_filename, width)

    label = wikibase.get_entity_label(entity)
    other = get_other(entity)

    other_list = []
    for key, prop_label in find_more_props.items():
        if key == "P186":  # skip material used
            continue  # too generic
        claims = entity["claims"].get(key)
        if not claims:
            continue

        values = []

        for claim in claims:
            if "datavalue" not in claim["mainsnak"]:
                continue
            value = claim["mainsnak"]["datavalue"]["value"]
            claim_qid = value["id"]
            if claim_qid == "Q4233718":
                continue  # anonymous artist
            numeric_id = value["numeric-id"]
            href = url_for("find_more_page", property_id=key[1:], item_id=numeric_id)
            values.append(
                {
                    "href": href,
                    "qid": claim_qid,
                    "label": other.get(claim_qid),
                }
            )

        if not values:
            continue

        qid_list = [v["qid"] for v in values]

        other_list.append(
            {
                "label": prop_label,
                "image_lookup": url_for("find_more_json", pid=key, qid=qid_list),
                "pid": key,
                "values": values,
                "images": [],
            }
        )

    return render_template(
        "next.html",
        qid=qid,
        label=label,
        image=image,
        labels=find_more_props,
        other=other,
        entity=entity,
        other_props=other_list,
    )


@app.route("/P<int:property_id>/Q<int:item_id>")
def find_more_page(property_id: int, item_id: int) -> Response:
    """Redirect /P<n>/Q<m> to the browse page filtered by that property value."""
    pid, qid = f"P{property_id}", f"Q{item_id}"
    return redirect(url_for("browse_page", **{pid: qid}))  # type: ignore


@app.route("/toolinfo.json")
def tool_info() -> Response:
    """Return static metadata describing this tool as JSON."""
    info = {
        "name": "wade",
        "title": "Wikidata Art Depiction Explorer",
        "description": "Add depicts statements to works of art.",
        "url": "https://art.wikidata.link/",
        "keywords": "art, depicts, paintings, depiction",
        "author": "Edward Betts",
        "repository": "https://github.com/edwardbetts/depicts.git",
    }
    return jsonify(info)


def get_facets(params) -> dict[str, typing.Any]:
    """Run the facet SPARQL query and return the top 15 values per property.

    Properties already present in the request's query args are excluded from
    the query. Properties without any values are left out of the result.
    """
    properties = [pid for pid in find_more_props.keys() if pid not in request.args]

    bindings = wdqs.run_from_template_with_cache(
        "query/facet.sparql", params=params, isa_list=isa_list, properties=properties
    )

    facets: dict[str, list[dict[str, str | int]]] = {
        key: [] for key in find_more_props.keys()
    }
    for row in bindings:
        pid = row["property"]["value"].rpartition("/")[2]
        qid = row["object"]["value"].rpartition("/")[2]
        label = row["objectLabel"]["value"]
        count = int(row["count"]["value"])

        if pid not in find_more_props:
            continue

        facets[pid].append({"qid": qid, "label": label, "count": count})

    return {
        key: sorted(values, key=lambda i: i["count"], reverse=True)[:15]
        for key, values in facets.items()
        if values
    }


def get_artwork_params() -> list[tuple[str, str]]:
    """Extract (PID, QID) pairs from the request's query arguments.

    Arguments whose name does not start with P<digits> or whose value does
    not start with Q<digits> are ignored.
    """
    params = []
    for pid, qid in request.args.items():
        m = re_pid.match(pid)
        if not m:
            continue
        pid = m.group(0)
        m = re_qid.match(qid)
        if not m:
            continue
        qid = m.group(0)
        params.append((pid, qid))

    return params


def filter_artwork(params: list[tuple[str, str]]) -> list[wdqs.Row]:
    """Run the find_more SPARQL query for the given (PID, QID) criteria."""
    return wdqs.run_from_template_with_cache(
        "query/find_more.sparql", params=params, isa_list=isa_list
    )


@app.route("/catalog")
def catalog_page() -> str:
    """Show the artworks matching the (PID, QID) criteria in the query args."""
    params = get_artwork_params()
    bindings = filter_artwork(params)
    page = utils.get_int_arg("page") or 1
    page_size = 45

    item_ids = {wdqs.row_id(row) for row in bindings}
    qids = [f"Q{item_id}" for item_id in sorted(item_ids)]

    entities = mediawiki.get_entities_with_cache(qids)

    items = []
    other_items = set()
    for entity in entities:
        other_items.update(build_other_set(entity))
        # A stray "continue" used to sit here, which made the item list below
        # unreachable and left the catalog permanently empty.
        item = {
            "label": wikibase.get_entity_label(entity),
            "qid": entity["id"],
            "item_id": int(entity["id"][1:]),
            "image_filename": wikibase.first_datavalue(entity, "P18"),
            "entity": entity,
        }
        items.append(item)

    other = get_labels(other_items)

    flat = "_".join(f"{pid}={qid}" for pid, qid in params)
    thumbwidth = 400
    # FIXME cache_name can be too long for filesystem
    cache_name = f"{flat}_{page}_{page_size}_{thumbwidth}"
    detail = get_image_detail_with_cache(items, cache_name, thumbwidth=thumbwidth)

    for item in items:
        item["url"] = url_for("item_page", item_id=item["item_id"])
        # Artworks without an image (or without fetched detail) get None.
        item["image"] = detail.get(item["image_filename"])

    item_labels = get_labels(qid for pid, qid in params)
    title = " / ".join(
        # Fall back to the raw ids so an unknown property cannot raise.
        find_more_props.get(pid, pid) + ": " + item_labels.get(qid, qid)
        for pid, qid in params
    )

    return render_template(
        "catalog.html", labels=find_more_props, items=items, other=other, title=title
    )


def _image_filename_of(item: typing.Any) -> str | None:
    """Return the image filename of a catalog dict or an Item-like object."""
    if isinstance(item, dict):
        return item.get("image_filename")
    return typing.cast("str | None", item.image_filename())


def _read_json_cache(filename: str) -> typing.Any:
    """Load a JSON cache file, returning None if it is missing or corrupt."""
    try:
        with open(filename) as cache_file:
            return json.load(cache_file)
    except (FileNotFoundError, json.decoder.JSONDecodeError):
        return None


def get_image_detail_with_cache(
    items, cache_name: str, thumbwidth: int | None = None, refresh: bool = False
):
    """Return Commons image detail for items, cached in a JSON file.

    items may be objects with an image_filename() method or dicts with an
    "image_filename" key; entries without a filename are ignored.
    thumbwidth defaults to the THUMBWIDTH config value. With refresh=True
    the cache is not read. If Commons times out, the cached detail (or an
    empty dict) is returned instead.
    """
    filenames = [name for name in map(_image_filename_of, items) if name]

    if thumbwidth is None:
        thumbwidth = app.config["THUMBWIDTH"]

    filename = f"cache/{cache_name}_images.json"
    detail = None if refresh else _read_json_cache(filename)

    if not detail:
        try:
            detail = commons.image_detail(filenames, thumbwidth=thumbwidth)
        except requests.exceptions.ReadTimeout:
            # Stale detail is better than none; a corrupt cache counts as empty.
            detail = _read_json_cache(filename) or {}
        else:
            with open(filename, "w") as cache_file:
                json.dump(detail, cache_file, indent=2)

    return detail


def browse_index() -> str:
    """Render the browse index with the number of distinct values per property."""
    q = (
        database.session.query(  # type: ignore
            Triple.predicate_id, func.count(func.distinct(Triple.object_id))
        )
        .join(Item, Triple.subject_id == Item.item_id)
        .filter_by(is_artwork=True)
        .group_by(Triple.predicate_id)
    )

    counts = {f"P{predicate_id}": count for predicate_id, count in q}

    return render_template("browse_index.html", props=find_more_props, counts=counts)


@app.route("/debug/show_user")
def debug_show_user() -> str:
    """Show the OAuth userinfo response as pretty-printed JSON."""
    userinfo = wikidata_oauth.userinfo_call()
    return "<pre>" + json.dumps(userinfo, indent=2) + "</pre>"


@app.route("/browse/facets.json")
def browse_facets() -> Response:
    """Return facets for the current browse criteria as JSON."""
    params = get_artwork_params()
    if not params:
        return jsonify(notice="facet criteria missing")

    facets = get_facets(params)

    for key, values in facets.items():
        for v in values:
            v["href"] = set_url_args(endpoint="browse_page", **{key: v["qid"]})

    return jsonify(params=params, facets=facets, prop_labels=find_more_props)


def get_db_items(params):
    """Get items for browse page based on criteria."""
    q = Item.query.filter_by(is_artwork=True)  # type: ignore
    for pid, qid in params:
        q = q.join(Triple, Item.item_id == Triple.subject_id, aliased=True).filter(
            Triple.predicate_id == pid[1:], Triple.object_id == qid[1:]
        )

    return q


def get_db_facets(params):
    """Compute browse facets from the local triples for the given criteria.

    Returns a dict mapping "P<id>" to at most 18 {"count", "qid", "label"}
    dicts, ordered by descending count.
    """
    t = aliased(Triple)
    q = database.session.query(t.predicate_id, func.count().label("count"), t.object_id)
    facet_limit = 18

    for pid, qid in params:
        q = q.join(  # type: ignore
            Triple, t.subject_id == Triple.subject_id, aliased=True
        ).filter(
            Triple.predicate_id == pid[1:],
            Triple.object_id == qid[1:],
            t.predicate_id != pid[1:],
            t.object_id != qid[1:],
        )

    q = q.group_by(t.predicate_id, t.object_id)

    # groupby needs its input sorted by the grouping key (predicate_id).
    results = sorted(tuple(row) for row in q.all())

    facet_list = {}
    subject_qids: set[str] = set()
    for predicate_id, x in itertools.groupby(results, lambda row: row[0]):
        hits = sorted(x, key=lambda row: row[1], reverse=True)
        values = [
            {"count": count, "qid": f"Q{value}"}
            for _, count, value in hits[:facet_limit]
        ]
        facet_list[f"P{predicate_id}"] = values
        subject_qids.update(i["qid"] for i in values)

    labels = get_labels_db(subject_qids)

    for values in facet_list.values():
        for v in values:
            v["label"] = labels.get(v["qid"])

    return facet_list


@app.route("/browse")
def browse_page() -> str:
    """Browse artworks matching the (PID, QID) criteria in the query args.

    Without criteria the browse index is shown. Otherwise one page of
    matching artworks that have an image is rendered with facets.
    """
    page_size = 45
    params = get_artwork_params()

    if not params:
        return browse_index()

    flat = "_".join(f"{pid}={qid}" for pid, qid in params)

    item_labels = get_labels_db(qid for pid, qid in params)

    g.title = " / ".join(
        # Fall back to the raw PID so an unknown property cannot raise.
        find_more_props.get(pid, pid) + ": " + (item_labels.get(qid) or qid)
        for pid, qid in params
    )

    q_items = get_db_items(params)
    facets = get_db_facets(params)

    all_items = q_items.all()

    page = utils.get_int_arg("page") or 1
    total = q_items.count()

    pager = Pagination(page, page_size, total)

    items = [item for item in pager.slice(all_items) if item.image_filename()]

    cache_name = f"{flat}_{page}_{page_size}"
    detail = get_image_detail_with_cache(items, cache_name)
    cache_refreshed = False

    linked_qids = {qid for pid, qid in params}
    for item in items:
        artist_qid = item.artist
        if artist_qid:
            linked_qids.add(artist_qid)
        for prop in "P31", "P180":
            linked_qids.update(item.linked_qids(prop))

    linked_labels = get_labels_db(linked_qids)

    for item in items:
        image_filename = item.image_filename()
        # Refresh the cached detail at most once if an image is missing from it.
        if not cache_refreshed and image_filename not in detail:
            detail = get_image_detail_with_cache(items, cache_name, refresh=True)
            cache_refreshed = True
        item.image = detail.get(image_filename)

    return render_template(
        "find_more.html",
        page=page,
        label=g.title,
        pager=pager,
        prop_labels=find_more_props,
        labels=find_more_props,
        linked_labels=linked_labels,
        items=items,
        total=total,
        params=params,
        facets=facets,
    )


@app.route("/find_more.json")
def find_more_json() -> Response:
    """Return up to 6 artworks with thumbnails for a property and QID list."""
    pid = request.args.get("pid")
    qid_list = request.args.getlist("qid")
    limit = 6

    filenames = []
    cache_name = f'{pid}={",".join(qid_list)}_{limit}'
    bindings = wdqs.run_from_template_with_cache(
        "query/find_more_basic.sparql",
        cache_name=cache_name,
        qid_list=qid_list,
        pid=pid,
        limit=limit,
    )

    items = []
    for row in bindings:
        item_id = wdqs.row_id(row)
        row_qid = f"Q{item_id}"
        image_filename = wdqs.commons_uri_to_filename(row["image"]["value"])
        filenames.append(image_filename)
        items.append(
            {
                "qid": row_qid,
                "item_id": item_id,
                "href": url_for("item_page", item_id=item_id),
                "filename": image_filename,
            }
        )

    thumbheight = 120
    detail = commons.image_detail(filenames, thumbheight=thumbheight)

    for item in items:
        item["image"] = detail[item["filename"]]

    return jsonify(items=items)


Hit = dict[str, str | int | None]


def wikibase_search(terms: str) -> list[Hit]:
    """Search Wikidata with wbsearchentities and return the results as hits.

    Every hit has a count of 0. An "alt_label" is added when the match was
    on an alias.
    """
    hits = []
    r = mediawiki.api_call(
        {
            "action": "wbsearchentities",
            "search": terms,
            "limit": "max",
            "language": "en",
        }
    )
    for result in r.json()["search"]:
        hit = {
            "label": result["label"],
            "description": result.get("description") or None,
            "qid": result["id"],
            "count": 0,
        }
        if result["match"]["type"] == "alias":
            hit["alt_label"] = result["match"]["text"]
        hits.append(hit)

    return hits


def add_images_to_depicts_lookup(hits: list[dict[str, str]]) -> None:
    """Add "image_filename" and "image" entries to lookup hits, in place.

    The image filename is the P18 value of each hit's entity. Thumbnail
    detail is requested for at most the first 50 filenames.
    """
    if not hits:
        return

    all_qids = [hit["qid"] for hit in hits]
    entities: list[Entity] = mediawiki.get_entities_with_cache(all_qids)
    # Hits are plain dicts, so entities are kept in a separate mapping rather
    # than being set as attributes on the hits.
    qid_to_entity = {entity["id"]: entity for entity in entities}

    for hit in hits:
        entity = qid_to_entity.get(hit["qid"])
        if entity:
            image_filename = wikibase.first_datavalue(entity, "P18")
            hit["image_filename"] = image_filename

    filenames = [hit["image_filename"] for hit in hits if hit.get("image_filename")]
    filenames = filenames[:50]
    if not filenames:
        return

    thumbwidth = 200
    detail = commons.image_detail(filenames, thumbwidth=thumbwidth)

    for hit in hits:
        filename = hit.get("image_filename")
        if not filename or filename not in detail:
            continue
        hit["image"] = detail[filename]


@app.route("/lookup")
def depicts_lookup() -> Response:
    """Look up depicts items whose label or alternative label starts with terms.

    Returns JSON with the hits sorted by descending count. Depending on the
    configuration, images are added and Wikidata search results appended.
    """
    terms = request.args.get("terms")
    if not terms:
        return jsonify(error="terms parameter is required")

    terms = terms.strip()
    if len(terms) < 3:
        return jsonify(
            count=0,
            hits=[],
            notice="terms too short for lookup",
        )

    item_ids = []
    hits = []
    q1 = DepictsItem.query.filter(DepictsItem.label.ilike(terms + "%"))  # type: ignore
    seen = set()
    for item in q1:
        hit = {
            "label": item.label,
            "description": item.description,
            "qid": item.qid,
            "count": item.count,
        }
        item_ids.append(item.item_id)
        hits.append(hit)
        seen.add(item.qid)

    cls = DepictsItemAltLabel
    q2 = cls.query.filter(  # type: ignore
        cls.alt_label.ilike(terms + "%"), ~cls.item_id.in_(item_ids)
    )

    for alt in q2:
        item = alt.item
        if item.count is None:
            continue
        hit = {
            "label": item.label,
            "description": item.description,
            "qid": item.qid,
            "count": item.count,
            "alt_label": alt.alt_label,
        }
        hits.append(hit)
        seen.add(item.qid)

    # Label hits may carry a count of None; sort those as 0 instead of
    # failing on a None/int comparison.
    hits.sort(key=lambda hit: hit["count"] or 0, reverse=True)

    if app.config.get("LOOKUP_INCLUDES_IMAGES"):
        add_images_to_depicts_lookup(hits)

    if app.config.get("SEARCH_WIKIDATA"):
        search_hits = wikibase_search(terms)
        hits += [hit for hit in search_hits if hit["qid"] not in seen]

    ret = {
        "count": q1.count() + q2.count(),
        "hits": hits,
        "terms": terms,
    }

    return jsonify(ret)


@app.route("/report/missing_image")
def missing_image_report() -> str:
    """List the most used depicts items that have neither P18 nor P2716."""
    limit = utils.get_int_arg("limit") or 1000
    q = DepictsItem.query.order_by(DepictsItem.count.desc()).limit(limit)  # type:ignore

    # Run the query once; the rows are needed twice below.
    depicts_items = q.all()

    qids = [item.qid for item in depicts_items]
    entities: dict[str, Entity] = mediawiki.get_entities_dict_with_cache(qids)

    item_list = []

    for depicts in depicts_items:
        entity = entities[depicts.qid]
        if any(wikibase.first_datavalue(entity, prop) for prop in ("P18", "P2716")):
            continue
        item_list.append(depicts)

        # TODO: call wikidata search to find images that depict item

    return render_template("missing_image.html", item_list=item_list)


@app.route("/report/wdqs")
def wikidata_query_list() -> str:
    """List recorded Wikidata queries, most recent first."""
    q = WikidataQuery.query.order_by(WikidataQuery.start_time.desc())  # type: ignore
    return render_template("query_list.html", q=q)


@app.route("/report/blocks")
def server_block_report() -> str:
    """Run the block check and render the block report."""
    check_for_blocks()
    return render_template("block_report.html")


@app.route("/fixture/save_error")
def save_error_fixture() -> str:
    """Render the save error page with a fixture error, for testing."""
    error = fixtures.save_error()["error"]
    return render_template("save_error.html", error=error)


if __name__ == "__main__":
    # NOTE(review): debug mode bound to 0.0.0.0 exposes the interactive
    # debugger to the network; only run this entry point on a trusted host.
    app.debug = True
    app.run(host="0.0.0.0", debug=True)