diff --git a/app.py b/app.py index f68611b..24f4d44 100755 --- a/app.py +++ b/app.py @@ -1,65 +1,93 @@ #!/usr/bin/python3 -from flask import Flask, render_template, url_for, redirect, request, g, jsonify, session -from depicts import (utils, wdqs, commons, mediawiki, artwork, database, - wd_catalog, human, wikibase, wikidata_oauth, wikidata_edit, mail, - fixtures) -from depicts.pager import Pagination, init_pager -from depicts.model import (DepictsItem, DepictsItemAltLabel, Edit, Item, - Language, WikidataQuery, Triple) -from depicts.error_mail import setup_error_mail -from requests_oauthlib import OAuth1Session -from werkzeug.exceptions import InternalServerError -from werkzeug.debug.tbtools import get_current_traceback -from sqlalchemy import func, distinct -from sqlalchemy.orm import aliased -from sqlalchemy.sql.expression import desc -from collections import defaultdict -from datetime import datetime -import simplejson.errors -import requests.exceptions +import hashlib import inspect import itertools -import hashlib import json -import os import locale -import socket +import os import re +import socket +from collections import defaultdict +from datetime import datetime -locale.setlocale(locale.LC_ALL, 'en_US.UTF-8') -user_agent = 'Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0' +import requests.exceptions +import simplejson.errors +import werkzeug +from flask import ( + Flask, + g, + jsonify, + redirect, + render_template, + request, + session, + url_for, +) +from requests_oauthlib import OAuth1Session +from sqlalchemy import distinct, func +from sqlalchemy.orm import aliased +from sqlalchemy.sql.expression import desc + +from depicts import ( + artwork, + commons, + database, + fixtures, + human, + mail, + mediawiki, + utils, + wd_catalog, + wdqs, + wikibase, + wikidata_edit, + wikidata_oauth, +) +from depicts.error_mail import setup_error_mail +from depicts.model import ( + DepictsItem, + DepictsItemAltLabel, + Edit, + Item, + Language, + Triple, + WikidataQuery, +) +from depicts.pager import Pagination, init_pager + +locale.setlocale(locale.LC_ALL, "en_US.UTF-8") +user_agent = "Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0" app = Flask(__name__) -app.config.from_object('config.default') -database.init_db(app.config['DB_URL']) +app.config.from_object("config.default") +database.init_db(app.config["DB_URL"]) init_pager(app) setup_error_mail(app) find_more_props = { - 'P135': 'movement', - 'P136': 'genre', - 'P170': 'artist', - 'P195': 'collection', - 'P276': 'location', - 'P495': 'country of origin', - 'P127': 'owned by', - 'P179': 'part of the series', - 'P921': 'main subject', - 'P186': 'material used', - 'P88': 'commissioned by', - 'P1028': 'donated by', - 'P1071': 'location of final assembly', - 'P138': 'named after', - 'P1433': 'published in', - 'P144': 'based on', - 'P2079': 'fabrication method', - 'P2348': 'time period', - 'P361': 'part of', - 'P608': 'exhibition history', - 'P180': 'depicts', - 'P31': 'instance of', - + "P135": "movement", + "P136": "genre", + "P170": "artist", + "P195": "collection", + "P276": "location", + "P495": "country of origin", + "P127": "owned by", + "P179": "part of the series", + "P921": "main subject", + "P186": "material used", + "P88": "commissioned by", + "P1028": "donated by", + "P1071": "location of final assembly", + "P138": "named after", + "P1433": "published in", + "P144": "based on", + "P2079": "fabrication method", + "P2348": "time period", + "P361": "part of", + "P608": "exhibition history", + "P180": "depicts", + "P31": "instance of", # possible future props # 'P571': 'inception', # 'P166': 'award received', (only 2) @@ -68,32 +96,40 @@ find_more_props = { } isa_list = [ - 'Q60520', # sketchbook - 'Q93184', # drawing - 'Q3305213', # painting - 'Q15123870', # lithograph - 'Q18761202', # watercolor painting - 'Q79218', # triptych - 'Q2647254', # study - 'Q46686' # reredos + "Q60520", # sketchbook + "Q93184", # drawing + "Q3305213", # painting + "Q15123870", # lithograph + "Q18761202", # watercolor painting + "Q79218", # triptych + "Q2647254", # study + "Q46686", # reredos ] -re_qid = re.compile(r'^Q(\d+)') -re_pid = re.compile(r'^P(\d+)') +re_qid = re.compile(r"^Q(\d+)") +re_pid = re.compile(r"^P(\d+)") + @app.teardown_appcontext def shutdown_session(exception=None): database.session.remove() -@app.errorhandler(InternalServerError) + +@app.errorhandler(werkzeug.exceptions.InternalServerError) def exception_handler(e): - tb = get_current_traceback() + tb = werkzeug.debug.tbtools.get_current_traceback() last_frame = next(frame for frame in reversed(tb.frames) if not frame.is_library) last_frame_args = inspect.getargs(last_frame.code) - return render_template('show_error.html', - tb=tb, - last_frame=last_frame, - last_frame_args=last_frame_args), 500 + return ( + render_template( + "show_error.html", + tb=tb, + last_frame=last_frame, + last_frame_args=last_frame_args, + ), + 500, + ) + @app.template_global() def set_url_args(endpoint=None, **new_args): @@ -105,54 +141,60 @@ def set_url_args(endpoint=None, **new_args): args = {k: v for k, v in args.items() if v is not None} return url_for(endpoint, **args) + @app.template_global() def current_url(): args = request.view_args.copy() args.update(request.args) return url_for(request.endpoint, **args) + @app.before_request def init_profile(): g.profiling = [] + @app.before_request def global_user(): g.user = wikidata_oauth.get_username() + def check_for_blocks(): - if hasattr(g, 'server_ip'): # already done + if hasattr(g, "server_ip"): # already done return - hostname = app.config.get('HOSTNAME') + hostname = app.config.get("HOSTNAME") if not hostname: return g.server_ip = socket.gethostbyname(hostname) try: - g.local_blocks = mediawiki.get_list('blocks', - bkip=g.server_ip) - g.global_blocks = mediawiki.get_list('globalblocks', - bgip=g.server_ip) + g.local_blocks = mediawiki.get_list("blocks", bkip=g.server_ip) + g.global_blocks = mediawiki.get_list("globalblocks", bgip=g.server_ip) except Exception: pass + @app.before_request def get_blocks(): - if app.config.get('SHOW_BLOCK_ALERT') is not False: + if app.config.get("SHOW_BLOCK_ALERT") is not False: check_for_blocks() -@app.route('/find_more_setting') -def flip_find_more(): - session['no_find_more'] = not session.get('no_find_more') - display = {True: 'on', False: 'off'}[not session['no_find_more']] - return 'flipped. find more is ' + display +@app.route("/find_more_setting") +def flip_find_more(): + session["no_find_more"] = not session.get("no_find_more") + display = {True: "on", False: "off"}[not session["no_find_more"]] + + return "flipped. find more is " + display + def existing_edit(item_id, depicts_id): q = Edit.query.filter_by(artwork_id=item_id, depicts_id=depicts_id) return q.count() != 0 -@app.route('/save/Q', methods=['POST']) + +@app.route("/save/Q", methods=["POST"]) def save(item_id): - depicts = request.form.getlist('depicts') + depicts = request.form.getlist("depicts") username = wikidata_oauth.get_username() assert username @@ -160,7 +202,7 @@ def save(item_id): artwork_item = Item.query.get(item_id) if artwork_item is None: - artwork_entity = mediawiki.get_entity_with_cache(f'Q{item_id}') + artwork_entity = mediawiki.get_entity_with_cache(f"Q{item_id}") artwork_item = Item(item_id=item_id, entity=artwork_entity) database.session.add(artwork_item) database.session.commit() @@ -183,84 +225,96 @@ def save(item_id): try: reply = r.json() except simplejson.errors.JSONDecodeError: - mail.send_mail('depicts save error', r.text) + mail.send_mail("depicts save error", r.text) raise - save_error = reply.get('error') + save_error = reply.get("error") if save_error: - mail.send_mail('depicts save error', r.text) - return render_template('save_error.html', error=save_error) + mail.send_mail("depicts save error", r.text) + return render_template("save_error.html", error=save_error) saved = r.json() - lastrevid = saved['pageinfo']['lastrevid'] - assert saved['success'] == 1 - edit = Edit(username=username, - artwork_id=item_id, - depicts_id=depicts_id, - lastrevid=lastrevid) + lastrevid = saved["pageinfo"]["lastrevid"] + assert saved["success"] == 1 + edit = Edit( + username=username, + artwork_id=item_id, + depicts_id=depicts_id, + lastrevid=lastrevid, + ) database.session.add(edit) database.session.commit() - return redirect(url_for('next_page', item_id=item_id)) + return redirect(url_for("next_page", item_id=item_id)) -@app.route('/settings', methods=['GET', 'POST']) + +@app.route("/settings", methods=["GET", "POST"]) def user_settings(): - return render_template('user_settings.html') + return render_template("user_settings.html") -@app.route('/test/lookup') + +@app.route("/test/lookup") def test_lookup_page(): - return render_template('test_lookup.html') + return render_template("test_lookup.html") + @app.route("/property/P") def property_query_page(property_id): - pid = f'P{property_id}' + pid = f"P{property_id}" g.title = find_more_props[pid] - sort = request.args.get('sort') - sort_by_name = sort and sort.lower().strip() == 'name' + sort = request.args.get("sort") + sort_by_name = sort and sort.lower().strip() == "name" - q = (database.session.query(Triple.object_id, - func.count(func.distinct(Triple.subject_id)).label('c')) - .filter_by(predicate_id=property_id) - .join(Item, Item.item_id == Triple.subject_id) - .filter_by(is_artwork=True) - .group_by(Triple.object_id) - .order_by(desc('c'))) + q = ( + database.session.query( + Triple.object_id, func.count(func.distinct(Triple.subject_id)).label("c") + ) + .filter_by(predicate_id=property_id) + .join(Item, Item.item_id == Triple.subject_id) + .filter_by(is_artwork=True) + .group_by(Triple.object_id) + .order_by(desc("c")) + ) - page = utils.get_int_arg('page') or 1 + page = utils.get_int_arg("page") or 1 total = q.count() page_size = 100 pager = Pagination(page, page_size, total) page_hits = pager.slice(q) - labels = get_labels_db({f'Q{object_id}' for object_id, c in page_hits}) + labels = get_labels_db({f"Q{object_id}" for object_id, c in page_hits}) hits = [] for object_id, count in page_hits: - qid = f'Q{object_id}' - hits.append({'qid': qid, - 'label': labels.get(qid) or '[item missing]', - 'count': count}) + qid = f"Q{object_id}" + hits.append( + {"qid": qid, "label": labels.get(qid) or "[item missing]", "count": count} + ) - return render_template('property.html', - label=g.title, - order=('name' if sort_by_name else 'count'), - pid=pid, - page=page, - pager=pager, - hits=hits) + return render_template( + "property.html", + label=g.title, + order=("name" if sort_by_name else "count"), + pid=pid, + page=page, + pager=pager, + hits=hits, + ) -@app.route('/') + +@app.route("/") def start(): return random_artwork() -@app.route('/next') + +@app.route("/next") def random_artwork(): found = None while True: q = Item.query.filter_by(is_artwork=True).order_by(func.random()).limit(30) for item in q: - has_depicts = 'P180' in item.entity['claims'] + has_depicts = "P180" in item.entity["claims"] if has_depicts: continue found = item @@ -268,107 +322,115 @@ def random_artwork(): if found: break - session[found.qid] = 'from redirect' - return redirect(url_for('item_page', item_id=found.item_id)) + session[found.qid] = "from redirect" + return redirect(url_for("item_page", item_id=found.item_id)) -@app.route('/oauth/start') + +@app.route("/oauth/start") def start_oauth(): - next_page = request.args.get('next') + next_page = request.args.get("next") if next_page: - session['after_login'] = next_page + session["after_login"] = next_page - client_key = app.config['CLIENT_KEY'] - client_secret = app.config['CLIENT_SECRET'] - base_url = 'https://www.wikidata.org/w/index.php' - request_token_url = base_url + '?title=Special%3aOAuth%2finitiate' + client_key = app.config["CLIENT_KEY"] + client_secret = app.config["CLIENT_SECRET"] + base_url = "https://www.wikidata.org/w/index.php" + request_token_url = base_url + "?title=Special%3aOAuth%2finitiate" - oauth = OAuth1Session(client_key, - client_secret=client_secret, - callback_uri='oob') + oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob") fetch_response = oauth.fetch_request_token(request_token_url) - session['owner_key'] = fetch_response.get('oauth_token') - session['owner_secret'] = fetch_response.get('oauth_token_secret') + session["owner_key"] = fetch_response.get("oauth_token") + session["owner_secret"] = fetch_response.get("oauth_token_secret") - base_authorization_url = 'https://www.wikidata.org/wiki/Special:OAuth/authorize' - authorization_url = oauth.authorization_url(base_authorization_url, - oauth_consumer_key=client_key) + base_authorization_url = "https://www.wikidata.org/wiki/Special:OAuth/authorize" + authorization_url = oauth.authorization_url( + base_authorization_url, oauth_consumer_key=client_key + ) return redirect(authorization_url) + @app.route("/oauth/callback", methods=["GET"]) def oauth_callback(): - base_url = 'https://www.wikidata.org/w/index.php' - client_key = app.config['CLIENT_KEY'] - client_secret = app.config['CLIENT_SECRET'] + base_url = "https://www.wikidata.org/w/index.php" + client_key = app.config["CLIENT_KEY"] + client_secret = app.config["CLIENT_SECRET"] - oauth = OAuth1Session(client_key, - client_secret=client_secret, - resource_owner_key=session['owner_key'], - resource_owner_secret=session['owner_secret']) + oauth = OAuth1Session( + client_key, + client_secret=client_secret, + resource_owner_key=session["owner_key"], + resource_owner_secret=session["owner_secret"], + ) oauth_response = oauth.parse_authorization_response(request.url) - verifier = oauth_response.get('oauth_verifier') - access_token_url = base_url + '?title=Special%3aOAuth%2ftoken' - oauth = OAuth1Session(client_key, - client_secret=client_secret, - resource_owner_key=session['owner_key'], - resource_owner_secret=session['owner_secret'], - verifier=verifier) + verifier = oauth_response.get("oauth_verifier") + access_token_url = base_url + "?title=Special%3aOAuth%2ftoken" + oauth = OAuth1Session( + client_key, + client_secret=client_secret, + resource_owner_key=session["owner_key"], + resource_owner_secret=session["owner_secret"], + verifier=verifier, + ) oauth_tokens = oauth.fetch_access_token(access_token_url) - session['owner_key'] = oauth_tokens.get('oauth_token') - session['owner_secret'] = oauth_tokens.get('oauth_token_secret') + session["owner_key"] = oauth_tokens.get("oauth_token") + session["owner_secret"] = oauth_tokens.get("oauth_token_secret") - next_page = session.get('after_login') + next_page = session.get("after_login") return redirect(next_page) if next_page else random_artwork() -@app.route('/oauth/disconnect') + +@app.route("/oauth/disconnect") def oauth_disconnect(): - for key in 'owner_key', 'owner_secret', 'username', 'after_login': + for key in "owner_key", "owner_secret", "username", "after_login": if key in session: del session[key] - return redirect(url_for('browse_page')) + return redirect(url_for("browse_page")) + def create_claim(artwork_id, depicts_id, token): - artwork_qid = f'Q{artwork_id}' - value = json.dumps({'entity-type': 'item', - 'numeric-id': depicts_id}) + artwork_qid = f"Q{artwork_id}" + value = json.dumps({"entity-type": "item", "numeric-id": depicts_id}) params = { - 'action': 'wbcreateclaim', - 'entity': artwork_qid, - 'property': 'P180', - 'snaktype': 'value', - 'value': value, - 'token': token, - 'format': 'json', - 'formatversion': 2, + "action": "wbcreateclaim", + "entity": artwork_qid, + "property": "P180", + "snaktype": "value", + "value": value, + "token": token, + "format": "json", + "formatversion": 2, } return wikidata_oauth.api_post_request(params) + def image_with_cache(qid, image_filename, width): - filename = f'cache/{qid}_{width}_image.json' + filename = f"cache/{qid}_{width}_image.json" detail = json.load(open(filename)) if os.path.exists(filename) else {} - image_filename = image_filename.replace('_', ' ') + image_filename = image_filename.replace("_", " ") # The image associated with an item can change. # If that happens the detail in the cache will be for the wrong file. if not detail or image_filename not in detail: detail = commons.image_detail([image_filename], thumbwidth=width) - json.dump(detail, open(filename, 'w'), indent=2) + json.dump(detail, open(filename, "w"), indent=2) return detail.get(image_filename) + def existing_depicts_from_entity(entity): - if 'P180' not in entity['claims']: + if "P180" not in entity["claims"]: return [] existing = [] new_depicts = False - for claim in entity['claims']['P180']: - if 'datavalue' not in claim['mainsnak']: + for claim in entity["claims"]["P180"]: + if "datavalue" not in claim["mainsnak"]: continue - item_id = claim['mainsnak']['datavalue']['value']['numeric-id'] + item_id = claim["mainsnak"]["datavalue"]["value"]["numeric-id"] item = DepictsItem.query.get(item_id) if not item: @@ -376,37 +438,39 @@ def existing_depicts_from_entity(entity): database.session.add(item) new_depicts = True d = { - 'label': item.label, - 'description': item.description, - 'qid': f'Q{item.item_id}', - 'count': item.count, - 'existing': True, + "label": item.label, + "description": item.description, + "qid": f"Q{item.item_id}", + "count": item.count, + "existing": True, } existing.append(d) if new_depicts: database.session.commit() return existing + def get_institution(entity, other): - if 'P276' in entity['claims']: - location = wikibase.first_datavalue(entity, 'P276') + if "P276" in entity["claims"]: + location = wikibase.first_datavalue(entity, "P276") if location: - return other.get(location['id']) - if 'P195' in entity['claims']: - collection = wikibase.first_datavalue(entity, 'P195') + return other.get(location["id"]) + if "P195" in entity["claims"]: + collection = wikibase.first_datavalue(entity, "P195") if collection: - return other.get(collection['id']) + return other.get(collection["id"]) + @app.route("/item/Q") def item_page(item_id): - qid = f'Q{item_id}' + qid = f"Q{item_id}" g.qid = qid item = artwork.Artwork(qid) - from_redirect = qid in session and session.pop(qid) == 'from redirect' + from_redirect = qid in session and session.pop(qid) == "from redirect" entity = mediawiki.get_entity_with_cache(qid, refresh=not from_redirect) - if 'redirects' in entity: - redirect_to_item_id = int(entity['redirects']['to'][1:]) + if "redirects" in entity: + redirect_to_item_id = int(entity["redirects"]["to"][1:]) return redirect(url_for(request.endpoint, item_id=redirect_to_item_id)) existing_depicts = existing_depicts_from_entity(entity) @@ -421,7 +485,7 @@ def item_page(item_id): # hits = item.run_query() label_and_language = get_entity_label_and_language(entity) if label_and_language: - label = label_and_language['label'] + label = label_and_language["label"] else: label = None g.label = label @@ -429,101 +493,107 @@ def item_page(item_id): people = human.from_name(label) if label else None - label_languages = label_and_language['languages'] if label_and_language else [] - show_translation_links = all(lang.code != 'en' for lang in label_languages) + label_languages = label_and_language["languages"] if label_and_language else [] + show_translation_links = all(lang.code != "en" for lang in label_languages) artwork_item = Item.query.get(item_id) if artwork_item is None: - if not wdqs.is_artificial_physical_object(qid): - return render_template('not_artwork.html', - qid=qid, - item_id=item_id, - item=item, - labels=find_more_props, - entity=item.entity, - username=g.user, - label=label, - label_languages=label_languages, - show_translation_links=show_translation_links, - image=image, - other=other, - title=item.display_title) + return render_template( + "not_artwork.html", + qid=qid, + item_id=item_id, + item=item, + labels=find_more_props, + entity=item.entity, + username=g.user, + label=label, + label_languages=label_languages, + show_translation_links=show_translation_links, + image=image, + other=other, + title=item.display_title, + ) - modified = datetime.strptime(entity['modified'], "%Y-%m-%dT%H:%M:%SZ") + modified = datetime.strptime(entity["modified"], "%Y-%m-%dT%H:%M:%SZ") - artwork_item = Item(item_id=item_id, - entity=entity, - lastrevid=entity['lastrevid'], - modified=modified) + artwork_item = Item( + item_id=item_id, + entity=entity, + lastrevid=entity["lastrevid"], + modified=modified, + ) database.session.add(artwork_item) catalog = wd_catalog.get_catalog_from_artwork(entity) - if not catalog.get('institution'): - catalog['institution'] = get_institution(entity, other) + if not catalog.get("institution"): + catalog["institution"] = get_institution(entity, other) + + return render_template( + "item.html", + qid=qid, + item_id=item_id, + item=item, + catalog=catalog, + labels=find_more_props, + entity=item.entity, + username=g.user, + label=label, + label_languages=label_languages, + show_translation_links=show_translation_links, + existing_depicts=existing_depicts, + image=image, + people=people, + other=other, + # hits=hits, + title=item.display_title, + ) - return render_template('item.html', - qid=qid, - item_id=item_id, - item=item, - catalog=catalog, - labels=find_more_props, - entity=item.entity, - username=g.user, - label=label, - label_languages=label_languages, - show_translation_links=show_translation_links, - existing_depicts=existing_depicts, - image=image, - people=people, - other=other, - # hits=hits, - title=item.display_title) def get_languages(codes): return Language.query.filter(Language.wikimedia_language_code.in_(codes)) + def get_entity_label_and_language(entity): - ''' + """ Look for a useful label and return it with a list of languages that have that label. If the entity has a label in English return it. Otherwise check if all languages have the same label, if so then return it. - ''' + """ group_by_label = defaultdict(set) - for language, l in entity['labels'].items(): - group_by_label[l['value']].add(language) + for language, l in entity["labels"].items(): + group_by_label[l["value"]].add(language) - if 'en' in entity['labels']: - label = entity['labels']['en']['value'] - return {'label': label, - 'languages': get_languages(group_by_label[label])} + if "en" in entity["labels"]: + label = entity["labels"]["en"]["value"] + return {"label": label, "languages": get_languages(group_by_label[label])} if len(group_by_label) == 1: label, languages = list(group_by_label.items())[0] - return {'label': label, - 'languages': get_languages(languages)} + return {"label": label, "languages": get_languages(languages)} + def get_labels(keys, name=None): keys = sorted(keys, key=lambda i: int(i[1:])) if name is None: - name = hashlib.md5('_'.join(keys).encode('utf-8')).hexdigest() - filename = f'cache/{name}_labels.json' + name = hashlib.md5("_".join(keys).encode("utf-8")).hexdigest() + filename = f"cache/{name}_labels.json" labels = [] if os.path.exists(filename): from_cache = json.load(open(filename)) - if isinstance(from_cache, dict) and from_cache.get('keys') == keys: - labels = from_cache['labels'] + if isinstance(from_cache, dict) and from_cache.get("keys") == keys: + labels = from_cache["labels"] if not labels: for cur in utils.chunk(keys, 50): - labels += mediawiki.get_entities(cur, props='labels') + labels += mediawiki.get_entities(cur, props="labels") - json.dump({'keys': keys, 'labels': labels}, - open(filename, 'w'), indent=2) + json.dump({"keys": keys, "labels": labels}, open(filename, "w"), indent=2) + + return {entity["id"]: wikibase.get_entity_label(entity) for entity in labels} - return {entity['id']: wikibase.get_entity_label(entity) for entity in labels} def get_labels_db(keys): keys = set(keys) @@ -544,18 +614,20 @@ def get_labels_db(keys): try: for cur in utils.chunk(missing, page_size): for entity in mediawiki.get_entities(cur): - if 'redirects' in entity: + if "redirects" in entity: continue - qid = entity['id'] + qid = entity["id"] - modified = datetime.strptime(entity['modified'], "%Y-%m-%dT%H:%M:%SZ") + modified = datetime.strptime(entity["modified"], "%Y-%m-%dT%H:%M:%SZ") # FIXME: check if the item is an artwork and set is_artwork correctly - item = Item(item_id=qid[1:], - entity=entity, - lastrevid=entity['lastrevid'], - modified=modified, - is_artwork=False) + item = Item( + item_id=qid[1:], + entity=entity, + lastrevid=entity["lastrevid"], + modified=modified, + is_artwork=False, + ) database.session.add(item) labels[qid] = item.label database.session.commit() @@ -564,63 +636,68 @@ def get_labels_db(keys): return labels + def build_other_set(entity): other_items = set() for key in find_more_props.keys(): - for claim in entity['claims'].get(key, []): - if 'datavalue' not in claim['mainsnak']: + for claim in entity["claims"].get(key, []): + if "datavalue" not in claim["mainsnak"]: continue - other_items.add(claim['mainsnak']['datavalue']['value']['id']) + other_items.add(claim["mainsnak"]["datavalue"]["value"]["id"]) return other_items + def get_other(entity): other_items = build_other_set(entity) return get_labels(other_items) + @app.route("/edits") def list_edits(): q = Edit.query.order_by(Edit.timestamp.desc()) - page = utils.get_int_arg('page') or 1 + page = utils.get_int_arg("page") or 1 pager = Pagination(page, 100, q.count()) - item_count = (database.session - .query(func.count(distinct(Edit.artwork_id))) - .scalar()) + item_count = database.session.query(func.count(distinct(Edit.artwork_id))).scalar() - user_count = (database.session - .query(func.count(distinct(Edit.username))) - .scalar()) + user_count = database.session.query(func.count(distinct(Edit.username))).scalar() + + return render_template( + "list_edits.html", + pager=pager, + edit_list=pager.slice(q), + item_count=item_count, + user_count=user_count, + ) - return render_template('list_edits.html', - pager=pager, - edit_list=pager.slice(q), - item_count=item_count, - user_count=user_count) @app.route("/user/") def user_page(username): - edit_list = (Edit.query.filter_by(username=username) - .order_by(Edit.timestamp.desc())) + edit_list = Edit.query.filter_by(username=username).order_by(Edit.timestamp.desc()) - item_count = (database.session - .query(func.count(distinct(Edit.artwork_id))) - .filter_by(username=username) - .scalar()) + item_count = ( + database.session.query(func.count(distinct(Edit.artwork_id))) + .filter_by(username=username) + .scalar() + ) + + return render_template( + "user_page.html", + username=username, + edits=Edit.query, + edit_list=edit_list, + item_count=item_count, + ) - return render_template('user_page.html', - username=username, - edits=Edit.query, - edit_list=edit_list, - item_count=item_count) @app.route("/next/Q") def next_page(item_id): - qid = f'Q{item_id}' + qid = f"Q{item_id}" entity = mediawiki.get_entity_with_cache(qid) width = 800 - image_filename = wikibase.first_datavalue(entity, 'P18') + image_filename = wikibase.first_datavalue(entity, "P18") image = image_with_cache(qid, image_filename, width) label = wikibase.get_entity_label(entity) @@ -628,95 +705,103 @@ def next_page(item_id): other_list = [] for key, prop_label in find_more_props.items(): - if key == 'P186': # skip material used - continue # too generic - claims = entity['claims'].get(key) + if key == "P186": # skip material used + continue # too generic + claims = entity["claims"].get(key) if not claims: continue values = [] for claim in claims: - if 'datavalue' not in claim['mainsnak']: + if "datavalue" not in claim["mainsnak"]: continue - value = claim['mainsnak']['datavalue']['value'] - claim_qid = value['id'] - if claim_qid == 'Q4233718': + value = claim["mainsnak"]["datavalue"]["value"] + claim_qid = value["id"] + if claim_qid == "Q4233718": continue # anonymous artist - numeric_id = value['numeric-id'] - href = url_for('find_more_page', property_id=key[1:], item_id=numeric_id) - values.append({ - 'href': href, - 'qid': claim_qid, - 'label': other.get(claim_qid), - }) + numeric_id = value["numeric-id"] + href = url_for("find_more_page", property_id=key[1:], item_id=numeric_id) + values.append( + { + "href": href, + "qid": claim_qid, + "label": other.get(claim_qid), + } + ) if not values: continue - qid_list = [v['qid'] for v in values] + qid_list = [v["qid"] for v in values] - other_list.append({ - 'label': prop_label, - 'image_lookup': url_for('find_more_json', pid=key, qid=qid_list), - 'pid': key, - 'values': values, - 'images': [], - }) + other_list.append( + { + "label": prop_label, + "image_lookup": url_for("find_more_json", pid=key, qid=qid_list), + "pid": key, + "values": values, + "images": [], + } + ) - return render_template('next.html', - qid=qid, - label=label, - image=image, - labels=find_more_props, - other=other, - entity=entity, - other_props=other_list) + return render_template( + "next.html", + qid=qid, + label=label, + image=image, + labels=find_more_props, + other=other, + entity=entity, + other_props=other_list, + ) -@app.route('/P/Q') + +@app.route("/P/Q") def find_more_page(property_id, item_id): - pid, qid = f'P{property_id}', f'Q{item_id}' - return redirect(url_for('browse_page', **{pid: qid})) + pid, qid = f"P{property_id}", f"Q{item_id}" + return redirect(url_for("browse_page", **{pid: qid})) -@app.route('/toolinfo.json') + +@app.route("/toolinfo.json") def tool_info(): info = { - 'name': 'wade', - 'title': 'Wikidata Art Depiction Explorer', - 'description': 'Add depicts statements to works of art.', - 'url': 'https://art.wikidata.link/', - 'keywords': 'art, depicts, paintings, depiction', - 'author': 'Edward Betts', - 'repository': 'https://github.com/edwardbetts/depicts.git', + "name": "wade", + "title": "Wikidata Art Depiction Explorer", + "description": "Add depicts statements to works of art.", + "url": "https://art.wikidata.link/", + "keywords": "art, depicts, paintings, depiction", + "author": "Edward Betts", + "repository": "https://github.com/edwardbetts/depicts.git", } return jsonify(info) -def get_facets(params): - properties = [pid for pid in find_more_props.keys() - if pid not in request.args] - bindings = wdqs.run_from_template_with_cache('query/facet.sparql', - params=params, - isa_list=isa_list, - properties=properties) +def get_facets(params): + properties = [pid for pid in find_more_props.keys() if pid not in request.args] + + bindings = wdqs.run_from_template_with_cache( + "query/facet.sparql", params=params, isa_list=isa_list, properties=properties + ) facets = {key: [] for key in find_more_props.keys()} for row in bindings: - pid = row['property']['value'].rpartition('/')[2] - qid = row['object']['value'].rpartition('/')[2] - label = row['objectLabel']['value'] - count = int(row['count']['value']) + pid = row["property"]["value"].rpartition("/")[2] + qid = row["object"]["value"].rpartition("/")[2] + label = row["objectLabel"]["value"] + count = int(row["count"]["value"]) if pid not in find_more_props: continue - facets[pid].append({'qid': qid, 'label': label, 'count': count}) + facets[pid].append({"qid": qid, "label": label, "count": count}) return { - key: sorted(values, key=lambda i: i['count'], reverse=True)[:15] + key: sorted(values, key=lambda i: i["count"], reverse=True)[:15] for key, values in facets.items() if values } + def get_artwork_params(): params = [] for pid, qid in request.args.items(): @@ -733,16 +818,18 @@ def get_artwork_params(): params.append((pid, qid)) return params -def filter_artwork(params): - return wdqs.run_from_template_with_cache('query/find_more.sparql', - params=params, - isa_list=isa_list) -@app.route('/catalog') +def filter_artwork(params): + return wdqs.run_from_template_with_cache( + "query/find_more.sparql", params=params, isa_list=isa_list + ) + + +@app.route("/catalog") def catalog_page(): params = get_artwork_params() bindings = filter_artwork(params) - page = utils.get_int_arg('page') or 1 + page = utils.get_int_arg("page") or 1 page_size = 45 item_ids = set() @@ -750,7 +837,7 @@ def catalog_page(): item_id = wdqs.row_id(row) item_ids.add(item_id) - qids = [f'Q{item_id}' for item_id in sorted(item_ids)] + qids = [f"Q{item_id}" for item_id in sorted(item_ids)] items = [Item.query.get(item_id) for item_id in item_ids] @@ -763,43 +850,43 @@ def catalog_page(): continue item = { - 'label': wikibase.get_entity_label(entity), - 'qid': entity['id'], - 'item_id': int(entity['id'][1:]), - 'image_filename': wikibase.first_datavalue(entity, 'P18'), - 'entity': entity, + "label": wikibase.get_entity_label(entity), + "qid": entity["id"], + "item_id": int(entity["id"][1:]), + "image_filename": wikibase.first_datavalue(entity, "P18"), + "entity": entity, } items.append(item) other = get_labels(other_items) - flat = '_'.join(f'{pid}={qid}' for pid, qid in params) + flat = "_".join(f"{pid}={qid}" for pid, qid in params) thumbwidth = 400 # FIXME cache_name can be too long for filesystem - cache_name = f'{flat}_{page}_{page_size}_{thumbwidth}' + cache_name = f"{flat}_{page}_{page_size}_{thumbwidth}" detail = get_image_detail_with_cache(items, cache_name, thumbwidth=thumbwidth) for item in items: - item['url'] = url_for('item_page', item_id=item['item_id']) - item['image'] = detail[item['image_filename']] + item["url"] = url_for("item_page", item_id=item["item_id"]) + item["image"] = detail[item["image_filename"]] item_labels = get_labels(qid for pid, qid in params) - title = ' / '.join(find_more_props[pid] + ': ' + item_labels[qid] - for pid, qid in params) + title = " / ".join( + find_more_props[pid] + ": " + item_labels[qid] for pid, qid in params + ) + + return render_template( + "catalog.html", labels=find_more_props, items=items, other=other, title=title + ) - return render_template('catalog.html', - labels=find_more_props, - items=items, - other=other, - title=title) def get_image_detail_with_cache(items, cache_name, thumbwidth=None, refresh=False): filenames = [cur.image_filename() for cur in items] if thumbwidth is None: - thumbwidth = app.config['THUMBWIDTH'] + thumbwidth = app.config["THUMBWIDTH"] - filename = f'cache/{cache_name}_images.json' + filename = f"cache/{cache_name}_images.json" cache_exists = os.path.exists(filename) detail = None if not refresh and cache_exists: @@ -810,66 +897,72 @@ def get_image_detail_with_cache(items, cache_name, thumbwidth=None, refresh=Fals if not detail: try: detail = commons.image_detail(filenames, thumbwidth=thumbwidth) - json.dump(detail, open(filename, 'w'), indent=2) + json.dump(detail, open(filename, "w"), indent=2) except requests.exceptions.ReadTimeout: detail = json.load(open(filename)) if cache_exists else {} return detail + def browse_index(): - q = (database.session.query(Triple.predicate_id, - func.count(func.distinct(Triple.object_id))) - .join(Item, Triple.subject_id == Item.item_id) - .filter_by(is_artwork=True) - .group_by(Triple.predicate_id)) + q = ( + database.session.query( + Triple.predicate_id, func.count(func.distinct(Triple.object_id)) + ) + .join(Item, Triple.subject_id == Item.item_id) + .filter_by(is_artwork=True) + .group_by(Triple.predicate_id) + ) - counts = {f'P{predicate_id}': count for predicate_id, count in q} + counts = {f"P{predicate_id}": count for predicate_id, count in q} - return render_template('browse_index.html', - props=find_more_props, - counts=counts) + return render_template("browse_index.html", props=find_more_props, counts=counts) -@app.route('/debug/show_user') + +@app.route("/debug/show_user") def debug_show_user(): userinfo = wikidata_oauth.userinfo_call() - return '
' + json.dumps(userinfo, indent=2) + '
' + return "
" + json.dumps(userinfo, indent=2) + "
" -@app.route('/browse/facets.json') + +@app.route("/browse/facets.json") def browse_facets(): params = get_artwork_params() if not params: - return jsonify(notice='facet criteria missing') + return jsonify(notice="facet criteria missing") facets = get_facets(params) for key, values in facets.items(): for v in values: - v['href'] = set_url_args(endpoint='browse_page', **{key: v['qid']}) + v["href"] = set_url_args(endpoint="browse_page", **{key: v["qid"]}) + + return jsonify(params=params, facets=facets, prop_labels=find_more_props) - return jsonify(params=params, - facets=facets, - prop_labels=find_more_props) def get_db_items(params): - ''' Get items for browse page based on criteria. ''' + """Get items for browse page based on criteria.""" q = Item.query.filter_by(is_artwork=True) for pid, qid in params: - q = (q.join(Triple, Item.item_id == Triple.subject_id, aliased=True) - .filter(Triple.predicate_id == pid[1:], Triple.object_id == qid[1:])) + q = q.join(Triple, Item.item_id == Triple.subject_id, aliased=True).filter( + Triple.predicate_id == pid[1:], Triple.object_id == qid[1:] + ) return q + def get_db_facets(params): t = aliased(Triple) - q = database.session.query(t.predicate_id, func.count().label('count'), t.object_id) + q = database.session.query(t.predicate_id, func.count().label("count"), t.object_id) facet_limit = 18 for pid, qid in params: - q = (q.join(Triple, t.subject_id == Triple.subject_id, aliased=True) - .filter(Triple.predicate_id == pid[1:], - Triple.object_id == qid[1:], - t.predicate_id != pid[1:], - t.object_id != qid[1:])) + q = q.join(Triple, t.subject_id == Triple.subject_id, aliased=True).filter( + Triple.predicate_id == pid[1:], + Triple.object_id == qid[1:], + t.predicate_id != pid[1:], + t.object_id != qid[1:], + ) q = q.group_by(t.predicate_id, t.object_id) @@ -879,20 +972,23 @@ def get_db_facets(params): subject_qids = set() for predicate_id, x in itertools.groupby(results, lambda row: row[0]): hits = sorted(list(x), key=lambda row: row[1], reverse=True) - values = [{'count': count, 'qid': f'Q{value}'} - for _, count, value in hits[:facet_limit]] - facet_list[f'P{predicate_id}'] = values - subject_qids.update(i['qid'] for i in values) + values = [ + {"count": count, "qid": f"Q{value}"} + for _, count, value in hits[:facet_limit] + ] + facet_list[f"P{predicate_id}"] = values + subject_qids.update(i["qid"] for i in values) labels = get_labels_db(subject_qids) for values in facet_list.values(): for v in values: - v['label'] = labels.get(v['qid']) + v["label"] = labels.get(v["qid"]) return facet_list -@app.route('/browse') + +@app.route("/browse") def browse_page(): page_size = 45 params = get_artwork_params() @@ -900,23 +996,25 @@ def browse_page(): if not params: return browse_index() - flat = '_'.join(f'{pid}={qid}' for pid, qid in params) + flat = "_".join(f"{pid}={qid}" for pid, qid in params) item_labels = get_labels_db(qid for pid, qid in params) - g.title = ' / '.join(find_more_props[pid] + ': ' + (item_labels.get(qid) or qid) - for pid, qid in params) + g.title = " / ".join( + find_more_props[pid] + ": " + (item_labels.get(qid) or qid) + for pid, qid in params + ) q_items = get_db_items(params) facets = get_db_facets(params) all_items = q_items.all() - page = utils.get_int_arg('page') or 1 + page = utils.get_int_arg("page") or 1 total = q_items.count() pager = Pagination(page, page_size, total) items = [item for item in pager.slice(all_items) if item.image_filename()] - cache_name = f'{flat}_{page}_{page_size}' + cache_name = f"{flat}_{page}_{page_size}" detail = get_image_detail_with_cache(items, cache_name) cache_refreshed = False @@ -925,7 +1023,7 @@ def browse_page(): artist_qid = item.artist if artist_qid: linked_qids.add(artist_qid) - for prop in 'P31', 'P180': + for prop in "P31", "P180": linked_qids.update(item.linked_qids(prop)) linked_labels = get_labels_db(linked_qids) @@ -937,173 +1035,183 @@ def browse_page(): cache_refreshed = True item.image = detail.get(image_filename) - return render_template('find_more.html', - page=page, - label=g.title, - pager=pager, - prop_labels=find_more_props, - labels=find_more_props, - linked_labels=linked_labels, - items=items, - total=total, - params=params, - facets=facets) + return render_template( + "find_more.html", + page=page, + label=g.title, + pager=pager, + prop_labels=find_more_props, + labels=find_more_props, + linked_labels=linked_labels, + items=items, + total=total, + params=params, + facets=facets, + ) - return jsonify(params=params, - items=items.count(), - facets=facets) + return jsonify(params=params, items=items.count(), facets=facets) -@app.route('/find_more.json') + +@app.route("/find_more.json") def find_more_json(): - pid = request.args.get('pid') - qid_list = request.args.getlist('qid') + pid = request.args.get("pid") + qid_list = request.args.getlist("qid") limit = 6 filenames = [] cache_name = f'{pid}={",".join(qid_list)}_{limit}' - bindings = wdqs.run_from_template_with_cache('query/find_more_basic.sparql', - cache_name=cache_name, - qid_list=qid_list, - pid=pid, - limit=limit) + bindings = wdqs.run_from_template_with_cache( + "query/find_more_basic.sparql", + cache_name=cache_name, + qid_list=qid_list, + pid=pid, + limit=limit, + ) items = [] for row in bindings: item_id = wdqs.row_id(row) - row_qid = f'Q{item_id}' - image_filename = wdqs.commons_uri_to_filename(row['image']['value']) + row_qid = f"Q{item_id}" + image_filename = wdqs.commons_uri_to_filename(row["image"]["value"]) filenames.append(image_filename) - items.append({'qid': row_qid, - 'item_id': item_id, - 'href': url_for('item_page', item_id=item_id), - 'filename': image_filename}) + items.append( + { + "qid": row_qid, + "item_id": item_id, + "href": url_for("item_page", item_id=item_id), + "filename": image_filename, + } + ) thumbheight = 120 detail = commons.image_detail(filenames, thumbheight=thumbheight) for item in items: - item['image'] = detail[item['filename']] + item["image"] = detail[item["filename"]] return jsonify(items=items) + def wikibase_search(terms): hits = [] - r = mediawiki.api_call({ - 'action': 'wbsearchentities', - 'search': terms, - 'limit': 'max', - 'language': 'en' - }) - for result in r.json()['search']: - hit = { - 'label': result['label'], - 'description': result.get('description') or None, - 'qid': result['id'], - 'count': 0, + r = mediawiki.api_call( + { + "action": "wbsearchentities", + "search": terms, + "limit": "max", + "language": "en", } - if result['match']['type'] == 'alias': - hit['alt_label'] = result['match']['text'] + ) + for result in r.json()["search"]: + hit = { + "label": result["label"], + "description": result.get("description") or None, + "qid": result["id"], + "count": 0, + } + if result["match"]["type"] == "alias": + hit["alt_label"] = result["match"]["text"] hits.append(hit) return hits + def add_images_to_depicts_lookup(hits): - qid_to_item = {hit['qid']: hit for hit in hits} - all_qids = [hit['qid'] for hit in hits] + qid_to_item = {hit["qid"]: hit for hit in hits} + all_qids = [hit["qid"] for hit in hits] entities = mediawiki.get_entities_with_cache(all_qids) for entity in entities: - qid = entity['id'] + qid = entity["id"] item = qid_to_item[qid] item.entity = entity database.session.commit() for hit in hits: - item = qid_to_item[hit['qid']] + item = qid_to_item[hit["qid"]] if item.entity: - image_filename = wikibase.first_datavalue(item.entity, 'P18') - hit['image_filename'] = image_filename + image_filename = wikibase.first_datavalue(item.entity, "P18") + hit["image_filename"] = image_filename - filenames = [hit['image_filename'] - for hit in hits - if hit.get('image_filename')] + filenames = [hit["image_filename"] for hit in hits if hit.get("image_filename")] filenames = filenames[:50] thumbwidth = 200 detail = commons.image_detail(filenames, thumbwidth=thumbwidth) for hit in hits: - filename = hit.get('image_filename') + filename = hit.get("image_filename") if not filename or filename not in detail: continue - hit['image'] = detail[filename] + hit["image"] = detail[filename] -@app.route('/lookup') + +@app.route("/lookup") def depicts_lookup(): - terms = request.args.get('terms') + terms = request.args.get("terms") if not terms: - return jsonify(error='terms parameter is required') + return jsonify(error="terms parameter is required") terms = terms.strip() if len(terms) < 3: return jsonify( count=0, hits=[], - notice='terms too short for lookup', + notice="terms too short for lookup", ) item_ids = [] hits = [] - q1 = DepictsItem.query.filter(DepictsItem.label.ilike(terms + '%')) + q1 = DepictsItem.query.filter(DepictsItem.label.ilike(terms + "%")) seen = set() for item in q1: hit = { - 'label': item.label, - 'description': item.description, - 'qid': item.qid, - 'count': item.count, + "label": item.label, + "description": item.description, + "qid": item.qid, + "count": item.count, } item_ids.append(item.item_id) hits.append(hit) seen.add(item.qid) cls = DepictsItemAltLabel - q2 = cls.query.filter(cls.alt_label.ilike(terms + '%'), - ~cls.item_id.in_(item_ids)) + q2 = cls.query.filter(cls.alt_label.ilike(terms + "%"), ~cls.item_id.in_(item_ids)) for alt in q2: item = alt.item if item.count is None: continue hit = { - 'label': item.label, - 'description': item.description, - 'qid': item.qid, - 'count': item.count, - 'alt_label': alt.alt_label, + "label": item.label, + "description": item.description, + "qid": item.qid, + "count": item.count, + "alt_label": alt.alt_label, } hits.append(hit) seen.add(item.qid) - hits.sort(key=lambda hit: hit['count'], reverse=True) + hits.sort(key=lambda hit: hit["count"], reverse=True) - if app.config.get('LOOKUP_INCLUDES_IMAGES'): + if app.config.get("LOOKUP_INCLUDES_IMAGES"): add_images_to_depicts_lookup(hits) - if app.config.get('SEARCH_WIKIDATA'): + if app.config.get("SEARCH_WIKIDATA"): search_hits = wikibase_search(terms) - hits += [hit for hit in search_hits if hit['qid'] not in seen] + hits += [hit for hit in search_hits if hit["qid"] not in seen] ret = { - 'count': q1.count() + q2.count(), - 'hits': hits, - 'terms': terms, + "count": q1.count() + q2.count(), + "hits": hits, + "terms": terms, } return jsonify(ret) -@app.route('/report/missing_image') + +@app.route("/report/missing_image") def missing_image_report(): - limit = utils.get_int_arg('limit') or 1000 + limit = utils.get_int_arg("limit") or 1000 q = DepictsItem.query.order_by(DepictsItem.count.desc()).limit(limit) qids = [item.qid for item in q] @@ -1113,30 +1221,33 @@ def missing_image_report(): for depicts in q: entity = entities[depicts.qid] - if any(wikibase.first_datavalue(entity, prop) for prop in ('P18', 'P2716')): + if any(wikibase.first_datavalue(entity, prop) for prop in ("P18", "P2716")): continue item_list.append(depicts) # TODO: call wikidata search to find images that depict item - return render_template('missing_image.html', item_list=item_list) + return render_template("missing_image.html", item_list=item_list) -@app.route('/report/wdqs') + +@app.route("/report/wdqs") def wikidata_query_list(): q = WikidataQuery.query.order_by(WikidataQuery.start_time.desc()) - return render_template('query_list.html', q=q) + return render_template("query_list.html", q=q) -@app.route('/report/blocks') + +@app.route("/report/blocks") def server_block_report(): check_for_blocks() - return render_template('block_report.html') + return render_template("block_report.html") -@app.route('/fixture/save_error') + +@app.route("/fixture/save_error") def save_error_fixture(): - error = fixtures.save_error()['error'] - return render_template('save_error.html', error=error) + error = fixtures.save_error()["error"] + return render_template("save_error.html", error=error) if __name__ == "__main__": app.debug = True - app.run(host='0.0.0.0', debug=True) + app.run(host="0.0.0.0", debug=True)