From fd281532e6f849576923565d8697197da241f267 Mon Sep 17 00:00:00 2001 From: Edward Betts Date: Wed, 25 Oct 2023 07:54:05 +0100 Subject: [PATCH] Add type hints and docstrings --- app.py | 252 +++++++++-------- depicts/artwork.py | 152 +++++++---- depicts/barnesfoundation.py | 61 +++-- depicts/category.py | 132 ++++++--- depicts/commons.py | 37 ++- depicts/database.py | 36 ++- depicts/dia.py | 58 ++-- depicts/error_mail.py | 60 ++-- depicts/fixtures.py | 41 ++- depicts/human.py | 57 ++-- depicts/mail.py | 29 +- depicts/mediawiki.py | 218 +++++++++------ depicts/model.py | 198 ++++++++------ depicts/museodelprado.py | 45 ++- depicts/npg.py | 45 ++- depicts/pager.py | 67 +++-- depicts/relaxed_ssl.py | 23 +- depicts/rijksmuseum.py | 45 ++- depicts/saam.py | 56 ++-- depicts/type.py | 50 ++++ depicts/utils.py | 118 +++++--- depicts/wd_catalog.py | 528 +++++++++++++++++++++++++----------- depicts/wdqs.py | 233 +++++++++------- depicts/wikibase.py | 62 +++-- depicts/wikidata_edit.py | 27 +- depicts/wikidata_oauth.py | 112 +++++--- 26 files changed, 1774 insertions(+), 968 deletions(-) create mode 100644 depicts/type.py diff --git a/app.py b/app.py index a8fda6a..25245a6 100755 --- a/app.py +++ b/app.py @@ -1,18 +1,18 @@ #!/usr/bin/python3 import hashlib -import inspect import itertools import json import os import re import socket +import typing from collections import defaultdict from datetime import datetime +from typing import Iterable, Mapping import requests.exceptions import simplejson.errors -import werkzeug from flask import ( Flask, g, @@ -27,6 +27,7 @@ from requests_oauthlib import OAuth1Session from sqlalchemy import distinct, func from sqlalchemy.orm import aliased from sqlalchemy.sql.expression import desc +from werkzeug.wrappers import Response from depicts import ( artwork, @@ -54,6 +55,7 @@ from depicts.model import ( WikidataQuery, ) from depicts.pager import Pagination, init_pager +from depicts.type import Entity user_agent = "Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0" @@ -109,30 +111,15 @@ re_pid = re.compile(r"^P(\d+)") @app.teardown_appcontext -def shutdown_session(exception=None): - database.session.remove() - - -@app.errorhandler(werkzeug.exceptions.InternalServerError) -def exception_handler(e): - tb = werkzeug.debug.tbtools.get_current_traceback() - last_frame = next(frame for frame in reversed(tb.frames) if not frame.is_library) - last_frame_args = inspect.getargs(last_frame.code) - return ( - render_template( - "show_error.html", - tb=tb, - last_frame=last_frame, - last_frame_args=last_frame_args, - ), - 500, - ) +def shutdown_session(exception: Exception | None = None) -> None: + database.session.remove() # type:ignore @app.template_global() -def set_url_args(endpoint=None, **new_args): +def set_url_args(endpoint: str | None = None, **new_args: str) -> str: if endpoint is None: endpoint = request.endpoint + assert endpoint and request.view_args args = request.view_args.copy() args.update(request.args) args.update(new_args) @@ -141,23 +128,25 @@ def set_url_args(endpoint=None, **new_args): @app.template_global() -def current_url(): +def current_url() -> str: + """Get current URL.""" + assert request and request.view_args and request.endpoint args = request.view_args.copy() args.update(request.args) return url_for(request.endpoint, **args) @app.before_request -def init_profile(): +def init_profile() -> None: g.profiling = [] @app.before_request -def global_user(): +def global_user() -> None: g.user = wikidata_oauth.get_username() -def 
check_for_blocks(): +def check_for_blocks() -> None: if hasattr(g, "server_ip"): # already done return hostname = app.config.get("HOSTNAME") @@ -172,43 +161,45 @@ def check_for_blocks(): @app.before_request -def get_blocks(): +def get_blocks() -> None: if app.config.get("SHOW_BLOCK_ALERT") is not False: check_for_blocks() @app.route("/find_more_setting") -def flip_find_more(): +def flip_find_more() -> str: session["no_find_more"] = not session.get("no_find_more") display = {True: "on", False: "off"}[not session["no_find_more"]] return "flipped. find more is " + display -def existing_edit(item_id, depicts_id): - q = Edit.query.filter_by(artwork_id=item_id, depicts_id=depicts_id) - return q.count() != 0 +def existing_edit(item_id: int, depicts_id: int) -> bool: + q = Edit.query.filter_by(artwork_id=item_id, depicts_id=depicts_id) # type: ignore + return bool(q.count() != 0) @app.route("/save/Q", methods=["POST"]) -def save(item_id): +def save(item_id: int) -> str | Response: depicts = request.form.getlist("depicts") username = wikidata_oauth.get_username() assert username token = wikidata_oauth.get_token() - artwork_item = Item.query.get(item_id) + artwork_item = Item.query.get(item_id) # type: ignore if artwork_item is None: artwork_entity = mediawiki.get_entity_with_cache(f"Q{item_id}") - artwork_item = Item(item_id=item_id, entity=artwork_entity) + artwork_item = Item( + item_id=item_id, entity=typing.cast(dict[str, str], artwork_entity) + ) database.session.add(artwork_item) database.session.commit() for depicts_qid in depicts: depicts_id = int(depicts_qid[1:]) - depicts_item = DepictsItem.query.get(depicts_id) + depicts_item = DepictsItem.query.get(depicts_id) # type: ignore if depicts_item is None: depicts_item = wikidata_edit.create_depicts_item(depicts_id) database.session.add(depicts_item) @@ -247,24 +238,24 @@ def save(item_id): @app.route("/settings", methods=["GET", "POST"]) -def user_settings(): +def user_settings() -> str: return render_template("user_settings.html") @app.route("/test/lookup") -def test_lookup_page(): +def test_lookup_page() -> str: return render_template("test_lookup.html") @app.route("/property/P") -def property_query_page(property_id): +def property_query_page(property_id: int) -> str: pid = f"P{property_id}" g.title = find_more_props[pid] sort = request.args.get("sort") sort_by_name = sort and sort.lower().strip() == "name" q = ( - database.session.query( + database.session.query( # type: ignore Triple.object_id, func.count(func.distinct(Triple.subject_id)).label("c") ) .filter_by(predicate_id=property_id) @@ -302,15 +293,19 @@ def property_query_page(property_id): @app.route("/") -def start(): +def start() -> Response: return random_artwork() @app.route("/next") -def random_artwork(): +def random_artwork() -> Response: found = None while True: - q = Item.query.filter_by(is_artwork=True).order_by(func.random()).limit(30) + q = ( + Item.query.filter_by(is_artwork=True) # type: ignore + .order_by(func.random()) + .limit(30) + ) for item in q: has_depicts = "P180" in item.entity["claims"] if has_depicts: @@ -325,7 +320,7 @@ def random_artwork(): @app.route("/oauth/start") -def start_oauth(): +def start_oauth() -> Response: next_page = request.args.get("next") if next_page: session["after_login"] = next_page @@ -349,7 +344,7 @@ def start_oauth(): @app.route("/oauth/callback", methods=["GET"]) -def oauth_callback(): +def oauth_callback() -> Response: base_url = "https://www.wikidata.org/w/index.php" client_key = app.config["CLIENT_KEY"] client_secret = 
app.config["CLIENT_SECRET"] @@ -381,17 +376,17 @@ @app.route("/oauth/disconnect") -def oauth_disconnect(): +def oauth_disconnect() -> Response: for key in "owner_key", "owner_secret", "username", "after_login": if key in session: del session[key] return redirect(url_for("browse_page")) -def create_claim(artwork_id, depicts_id, token): +def create_claim(artwork_id: int, depicts_id: int, token: str) -> requests.Response: artwork_qid = f"Q{artwork_id}" value = json.dumps({"entity-type": "item", "numeric-id": depicts_id}) - params = { + params: dict[str, str | int] = { "action": "wbcreateclaim", "entity": artwork_qid, "property": "P180", @@ -401,10 +396,11 @@ def create_claim(artwork_id, depicts_id, token): "format": "json", "formatversion": 2, } - return wikidata_oauth.api_post_request(params) + r: requests.Response = wikidata_oauth.api_post_request(params) + return r -def image_with_cache(qid, image_filename, width): +def image_with_cache(qid: str, image_filename: str, width: int) -> dict[str, str]: filename = f"cache/{qid}_{width}_image.json" detail = json.load(open(filename)) if os.path.exists(filename) else {} @@ -416,10 +412,11 @@ def image_with_cache(qid, image_filename, width): detail = commons.image_detail([image_filename], thumbwidth=width) json.dump(detail, open(filename, "w"), indent=2) - return detail.get(image_filename) + image: dict[str, str] = detail.get(image_filename) + return image -def existing_depicts_from_entity(entity): +def existing_depicts_from_entity(entity: Entity) -> list[dict[str, typing.Any]]: if "P180" not in entity["claims"]: return [] existing = [] @@ -430,7 +427,7 @@ def existing_depicts_from_entity(entity): item_id = claim["mainsnak"]["datavalue"]["value"]["numeric-id"] - item = DepictsItem.query.get(item_id) + item = DepictsItem.query.get(item_id) # type: ignore if not item: item = wikidata_edit.create_depicts_item(item_id) database.session.add(item) @@ -448,27 +445,32 @@ def existing_depicts_from_entity(entity): return existing -def get_institution(entity, other): +def get_institution(entity: Entity, other: Mapping[str, str | None]) -> str | None: if "P276" in entity["claims"]: location = wikibase.first_datavalue(entity, "P276") + assert isinstance(location, dict) if location: - return other.get(location["id"]) + return typing.cast(str, other.get(location["id"])) if "P195" in entity["claims"]: collection = wikibase.first_datavalue(entity, "P195") + assert isinstance(collection, dict) if collection: - return other.get(collection["id"]) + return typing.cast(str, other.get(collection["id"])) + return None @app.route("/item/Q<int:item_id>") -def item_page(item_id): +def item_page(item_id: int) -> str | Response: qid = f"Q{item_id}" g.qid = qid item = artwork.Artwork(qid) from_redirect = qid in session and session.pop(qid) == "from redirect" entity = mediawiki.get_entity_with_cache(qid, refresh=not from_redirect) + assert entity if "redirects" in entity: redirect_to_item_id = int(entity["redirects"]["to"][1:]) + assert request.endpoint return redirect(url_for(request.endpoint, item_id=redirect_to_item_id)) existing_depicts = existing_depicts_from_entity(entity) @@ -494,7 +496,7 @@ def item_page(item_id): label_languages = label_and_language["languages"] if label_and_language else [] show_translation_links = all(lang.code != "en" for lang in label_languages) - artwork_item = Item.query.get(item_id) + artwork_item = Item.query.get(item_id) # type: ignore if artwork_item is None: if not wdqs.is_artificial_physical_object(qid): return render_template( @@ 
-517,7 +519,7 @@ def item_page(item_id): artwork_item = Item( item_id=item_id, - entity=entity, + entity=typing.cast(dict[str, str], entity), lastrevid=entity["lastrevid"], modified=modified, ) @@ -525,7 +527,9 @@ def item_page(item_id): catalog = wd_catalog.get_catalog_from_artwork(entity) if not catalog.get("institution"): - catalog["institution"] = get_institution(entity, other) + institution = get_institution(entity, other) + assert institution + catalog["institution"] = institution return render_template( "item.html", @@ -548,11 +552,13 @@ def item_page(item_id): ) -def get_languages(codes): - return Language.query.filter(Language.wikimedia_language_code.in_(codes)) +def get_languages(codes: typing.Iterable[str]) -> typing.Any: + return Language.query.filter( # type: ignore + Language.wikimedia_language_code.in_(codes) + ) -def get_entity_label_and_language(entity): +def get_entity_label_and_language(entity: Entity) -> dict[str, typing.Any] | None: """ Look for a useful label and return it with a list of languages that have that label. @@ -573,8 +579,10 @@ def get_entity_label_and_language(entity): label, languages = list(group_by_label.items())[0] return {"label": label, "languages": get_languages(languages)} + return None -def get_labels(keys, name=None): + +def get_labels(keys: typing.Iterable[str], name: str | None = None) -> dict[str, str]: keys = sorted(keys, key=lambda i: int(i[1:])) if name is None: name = hashlib.md5("_".join(keys).encode("utf-8")).hexdigest() @@ -590,10 +598,13 @@ def get_labels(keys, name=None): json.dump({"keys": keys, "labels": labels}, open(filename, "w"), indent=2) - return {entity["id"]: wikibase.get_entity_label(entity) for entity in labels} + return { + entity["id"]: wikibase.get_entity_label(entity) or "no English label" + for entity in labels + } -def get_labels_db(keys): +def get_labels_db(keys: Iterable[str]): keys = set(keys) labels = {} missing = set() @@ -601,7 +612,7 @@ def get_labels_db(keys): m = re_qid.match(qid) if m: item_id = int(m.group(1)) - item = Item.query.get(item_id) + item = Item.query.get(item_id) # type: ignore if item: labels[qid] = item.label continue @@ -619,8 +630,8 @@ def get_labels_db(keys): modified = datetime.strptime(entity["modified"], "%Y-%m-%dT%H:%M:%SZ") # FIXME: check if the item is an artwork and set is_artwork correctly - item = Item( - item_id=qid[1:], + item = Item( # type: ignore + item_id=int(qid[1:]), entity=entity, lastrevid=entity["lastrevid"], modified=modified, @@ -635,7 +646,7 @@ def get_labels_db(keys): return labels -def build_other_set(entity): +def build_other_set(entity: Entity) -> set[str]: other_items = set() for key in find_more_props.keys(): for claim in entity["claims"].get(key, []): @@ -645,20 +656,24 @@ def build_other_set(entity): return other_items -def get_other(entity): +def get_other(entity: Entity) -> Mapping[str, str | None]: other_items = build_other_set(entity) return get_labels(other_items) @app.route("/edits") -def list_edits(): - q = Edit.query.order_by(Edit.timestamp.desc()) +def list_edits() -> str: + q = Edit.query.order_by(Edit.timestamp.desc()) # type: ignore page = utils.get_int_arg("page") or 1 pager = Pagination(page, 100, q.count()) - item_count = database.session.query(func.count(distinct(Edit.artwork_id))).scalar() + item_count = database.session.query( + func.count(distinct(Edit.artwork_id)) + ).scalar() # type: ignore - user_count = database.session.query(func.count(distinct(Edit.username))).scalar() + user_count = database.session.query( + 
func.count(distinct(Edit.username)) + ).scalar() # type: ignore return render_template( "list_edits.html", @@ -670,11 +685,13 @@ def list_edits(): @app.route("/user/<username>") -def user_page(username): - edit_list = Edit.query.filter_by(username=username).order_by(Edit.timestamp.desc()) +def user_page(username: str) -> str: + edit_list = Edit.query.filter_by(username=username).order_by( # type: ignore + Edit.timestamp.desc() + ) item_count = ( - database.session.query(func.count(distinct(Edit.artwork_id))) + database.session.query(func.count(distinct(Edit.artwork_id))) # type: ignore .filter_by(username=username) .scalar() ) @@ -682,20 +699,22 @@ def user_page(username): return render_template( "user_page.html", username=username, - edits=Edit.query, + edits=Edit.query, # type: ignore edit_list=edit_list, item_count=item_count, ) @app.route("/next/Q<int:item_id>") -def next_page(item_id): +def next_page(item_id: int) -> str: qid = f"Q{item_id}" entity = mediawiki.get_entity_with_cache(qid) + assert entity width = 800 image_filename = wikibase.first_datavalue(entity, "P18") + assert isinstance(image_filename, str) image = image_with_cache(qid, image_filename, width) label = wikibase.get_entity_label(entity) @@ -756,13 +775,13 @@ def next_page(item_id): @app.route("/P<int:property_id>/Q<int:item_id>") -def find_more_page(property_id, item_id): +def find_more_page(property_id: int, item_id: int) -> Response: pid, qid = f"P{property_id}", f"Q{item_id}" - return redirect(url_for("browse_page", **{pid: qid})) + return redirect(url_for("browse_page", **{pid: qid})) # type: ignore @app.route("/toolinfo.json") -def tool_info(): +def tool_info() -> Response: info = { "name": "wade", "title": "Wikidata Art Depiction Explorer", @@ -775,14 +794,16 @@ def tool_info(): return jsonify(info) -def get_facets(params): +def get_facets(params) -> dict[str, typing.Any]: properties = [pid for pid in find_more_props.keys() if pid not in request.args] bindings = wdqs.run_from_template_with_cache( "query/facet.sparql", params=params, isa_list=isa_list, properties=properties ) - facets = {key: [] for key in find_more_props.keys()} + facets: dict[str, list[dict[str, str | int]]] = { + key: [] for key in find_more_props.keys() + } for row in bindings: pid = row["property"]["value"].rpartition("/")[2] qid = row["object"]["value"].rpartition("/")[2] @@ -800,7 +821,7 @@ def get_facets(params): } -def get_artwork_params(): +def get_artwork_params() -> list[tuple[str, str]]: params = [] for pid, qid in request.args.items(): m = re_pid.match(pid) @@ -817,14 +838,14 @@ def get_artwork_params(): return params -def filter_artwork(params): +def filter_artwork(params: list[tuple[str, str]]) -> list[wdqs.Row]: return wdqs.run_from_template_with_cache( "query/find_more.sparql", params=params, isa_list=isa_list ) @app.route("/catalog") -def catalog_page(): +def catalog_page() -> str: params = get_artwork_params() bindings = filter_artwork(params) page = utils.get_int_arg("page") or 1 @@ -837,7 +858,7 @@ def catalog_page(): qids = [f"Q{item_id}" for item_id in sorted(item_ids)] - items = [Item.query.get(item_id) for item_id in item_ids] + items = [Item.query.get(item_id) for item_id in item_ids] # type: ignore entities = mediawiki.get_entities_with_cache(qids) @@ -878,7 +899,9 @@ def catalog_page(): ) -def get_image_detail_with_cache(items, cache_name, thumbwidth=None, refresh=False): +def get_image_detail_with_cache( + items, cache_name: str, thumbwidth: int | None = None, refresh: bool = False +): filenames = [cur.image_filename() for cur in items] if thumbwidth is None: @@ -902,9 
+925,9 @@ def get_image_detail_with_cache(items, cache_name, thumbwidth=None, refresh=Fals return detail -def browse_index(): +def browse_index() -> str: q = ( - database.session.query( + database.session.query( # type: ignore Triple.predicate_id, func.count(func.distinct(Triple.object_id)) ) .join(Item, Triple.subject_id == Item.item_id) @@ -918,13 +941,13 @@ def browse_index(): @app.route("/debug/show_user") -def debug_show_user(): +def debug_show_user() -> str: userinfo = wikidata_oauth.userinfo_call() return "
" + json.dumps(userinfo, indent=2) + "
" @app.route("/browse/facets.json") -def browse_facets(): +def browse_facets() -> Response: params = get_artwork_params() if not params: return jsonify(notice="facet criteria missing") @@ -940,7 +963,7 @@ def browse_facets(): def get_db_items(params): """Get items for browse page based on criteria.""" - q = Item.query.filter_by(is_artwork=True) + q = Item.query.filter_by(is_artwork=True) # type: ignore for pid, qid in params: q = q.join(Triple, Item.item_id == Triple.subject_id, aliased=True).filter( Triple.predicate_id == pid[1:], Triple.object_id == qid[1:] @@ -955,7 +978,9 @@ def get_db_facets(params): facet_limit = 18 for pid, qid in params: - q = q.join(Triple, t.subject_id == Triple.subject_id, aliased=True).filter( + q = q.join( # type: ignore + Triple, t.subject_id == Triple.subject_id, aliased=True + ).filter( Triple.predicate_id == pid[1:], Triple.object_id == qid[1:], t.predicate_id != pid[1:], @@ -967,9 +992,9 @@ def get_db_facets(params): results = sorted(tuple(row) for row in q.all()) facet_list = {} - subject_qids = set() + subject_qids: set[str] = set() for predicate_id, x in itertools.groupby(results, lambda row: row[0]): - hits = sorted(list(x), key=lambda row: row[1], reverse=True) + hits = sorted(x, key=lambda row: row[1], reverse=True) values = [ {"count": count, "qid": f"Q{value}"} for _, count, value in hits[:facet_limit] @@ -987,7 +1012,7 @@ def get_db_facets(params): @app.route("/browse") -def browse_page(): +def browse_page() -> str: page_size = 45 params = get_artwork_params() @@ -1051,7 +1076,7 @@ def browse_page(): @app.route("/find_more.json") -def find_more_json(): +def find_more_json() -> Response: pid = request.args.get("pid") qid_list = request.args.getlist("qid") limit = 6 @@ -1090,7 +1115,10 @@ def find_more_json(): return jsonify(items=items) -def wikibase_search(terms): +Hit = dict[str, str | int | None] + + +def wikibase_search(terms: str) -> list[Hit]: hits = [] r = mediawiki.api_call( { @@ -1114,10 +1142,10 @@ def wikibase_search(terms): return hits -def add_images_to_depicts_lookup(hits): +def add_images_to_depicts_lookup(hits: list[dict[str, str]]) -> None: qid_to_item = {hit["qid"]: hit for hit in hits} all_qids = [hit["qid"] for hit in hits] - entities = mediawiki.get_entities_with_cache(all_qids) + entities: list[Entity] = mediawiki.get_entities_with_cache(all_qids) for entity in entities: qid = entity["id"] @@ -1144,7 +1172,7 @@ def add_images_to_depicts_lookup(hits): @app.route("/lookup") -def depicts_lookup(): +def depicts_lookup() -> Response: terms = request.args.get("terms") if not terms: return jsonify(error="terms parameter is required") @@ -1159,7 +1187,7 @@ def depicts_lookup(): item_ids = [] hits = [] - q1 = DepictsItem.query.filter(DepictsItem.label.ilike(terms + "%")) + q1 = DepictsItem.query.filter(DepictsItem.label.ilike(terms + "%")) # type: ignore seen = set() for item in q1: hit = { @@ -1173,7 +1201,9 @@ def depicts_lookup(): seen.add(item.qid) cls = DepictsItemAltLabel - q2 = cls.query.filter(cls.alt_label.ilike(terms + "%"), ~cls.item_id.in_(item_ids)) + q2 = cls.query.filter( # type: ignore + cls.alt_label.ilike(terms + "%"), ~cls.item_id.in_(item_ids) + ) for alt in q2: item = alt.item @@ -1208,12 +1238,12 @@ def depicts_lookup(): @app.route("/report/missing_image") -def missing_image_report(): +def missing_image_report() -> str: limit = utils.get_int_arg("limit") or 1000 - q = DepictsItem.query.order_by(DepictsItem.count.desc()).limit(limit) + q = DepictsItem.query.order_by(DepictsItem.count.desc()).limit(limit) # 
type:ignore qids = [item.qid for item in q] - entities = mediawiki.get_entities_dict_with_cache(qids) + entities: dict[str, Entity] = mediawiki.get_entities_dict_with_cache(qids) item_list = [] @@ -1229,19 +1259,19 @@ def missing_image_report(): @app.route("/report/wdqs") -def wikidata_query_list(): - q = WikidataQuery.query.order_by(WikidataQuery.start_time.desc()) +def wikidata_query_list() -> str: + q = WikidataQuery.query.order_by(WikidataQuery.start_time.desc()) # type: ignore return render_template("query_list.html", q=q) @app.route("/report/blocks") -def server_block_report(): +def server_block_report() -> str: check_for_blocks() return render_template("block_report.html") @app.route("/fixture/save_error") -def save_error_fixture(): +def save_error_fixture() -> str: error = fixtures.save_error()["error"] return render_template("save_error.html", error=error) diff --git a/depicts/artwork.py b/depicts/artwork.py index aa13fd8..88ee450 100644 --- a/depicts/artwork.py +++ b/depicts/artwork.py @@ -1,106 +1,150 @@ +"""Class to represent artwork.""" + from . import mediawiki +from .type import Claims, Entity, Sitelinks + class Artwork: - def __init__(self, qid): - self.entity = mediawiki.get_entity_with_cache(qid) + """Artwork.""" + + entity: Entity + artist_entities: list[Entity] + + def __init__(self, qid: str) -> None: + """Init.""" + entity = mediawiki.get_entity_with_cache(qid) + assert entity + self.entity = entity self.item_id = int(qid[1:]) - sites = ['commons', 'enwiki'] + sites = ["commons", "enwiki"] self.parent_categories = {site: {} for site in sites} @property - def image_filename(self): - if 'P18' in self.entity['claims']: - return self.entity['claims']['P18'][0]['mainsnak']['datavalue']['value'] + def image_filename(self) -> str | None: + """Image filename.""" + if "P18" in self.entity["claims"]: + f: str = self.entity["claims"]["P18"][0]["mainsnak"]["datavalue"]["value"] + return f + else: + return None @property - def display_title(self): - if 'en' not in self.entity['labels']: - return self.qid - return f'{self.en_title} ({self.qid})' + def display_title(self) -> str: + """Display title.""" + return ( + f"{self.en_title} ({self.qid})" + if "en" in self.entity["labels"] + else self.qid + ) @property - def url(self): - return 'https://www.wikidata.org/wiki/' + self.qid + def url(self) -> str: + """Wikidata item URL.""" + return "https://www.wikidata.org/wiki/" + self.qid - def get_artist_entities(self): + def get_artist_entities(self) -> None: + """Get artist entities.""" self.artist_entities = [] for artist in self.artists_claim: - artist_qid = artist['id'] - self.artist_entities.append(mediawiki.get_entity(artist_qid)) + artist_entity = mediawiki.get_entity(artist["id"]) + assert artist_entity + self.artist_entities.append(artist_entity) - def artist_labels(self): - if not hasattr(self, 'artist_entities'): + def artist_labels(self) -> list[str]: + """Get artist Wikidata item labels.""" + if not hasattr(self, "artist_entities"): self.get_artist_entities() - return [artist['labels']['en']['value'] for artist in self.artist_entities] + return [artist["labels"]["en"]["value"] for artist in self.artist_entities] @property - def commons_cats(self): - return [i['mainsnak']['datavalue']['value'] - for i in self.entity['claims'].get('P373', [])] + def commons_cats(self) -> list[str]: + return [ + i["mainsnak"]["datavalue"]["value"] + for i in self.entity["claims"].get("P373", []) + ] @property - def commons_sitelink(self): - return self.sitelinks['commons']['value'] if 'commons' 
in self.sitelinks else None + def commons_sitelink(self) -> str | None: + """Wikimedia sitelink.""" + return ( + self.sitelinks["commons"]["value"] if "commons" in self.sitelinks else None + ) @property - def en_title(self): - if 'en' in self.entity['labels']: - return self.entity['labels']['en']['value'] + def en_title(self) -> str: + if "en" in self.entity["labels"]: + title: str = self.entity["labels"]["en"]["value"] + return title else: return self.qid @property - def artists_claim(self): - return [image['mainsnak']['datavalue']['value'] - for image in self.entity['claims'].get('P170', [])] + def artists_claim(self) -> list[dict[str, str]]: + return [ + image["mainsnak"]["datavalue"]["value"] + for image in self.entity["claims"].get("P170", []) + ] @property - def artists(self): - if not hasattr(self, 'artist_entities'): + def artists(self) -> list[dict[str, str]]: + if not hasattr(self, "artist_entities"): self.get_artist_entities() - items = [image['mainsnak']['datavalue']['value'] - for image in self.entity['claims'].get('P170', [])] + items = [ + image["mainsnak"]["datavalue"]["value"] + for image in self.entity["claims"].get("P170", []) + ] - lookup = {artist['id']: artist['labels'] for artist in self.artist_entities} + lookup = {artist["id"]: artist["labels"] for artist in self.artist_entities} for item in items: - item['labels'] = lookup[item['id']] + item["labels"] = lookup[item["id"]] return items @property - def qid(self): - return f'Q{self.item_id}' + def qid(self) -> str: + """Wikidata item QID.""" + return f"Q{self.item_id}" @property - def commons_filenames(self): - return [image['mainsnak']['datavalue']['value'] - for image in self.entity['claims'].get('P18', [])] + def commons_filenames(self) -> list[str]: + """Filenames of images on Wikimedia Commons.""" + return [ + image["mainsnak"]["datavalue"]["value"] + for image in self.entity["claims"].get("P18", []) + ] - def commons_cat_from_sitelink(self): - ns = 'Category:' + def commons_cat_from_sitelink(self) -> str | None: + ns = "Category:" if not self.commons_sitelink or not self.commons_sitelink.startswith(ns): - return - return self.commons_sitelink[len(ns):] + return None + return self.commons_sitelink[len(ns) :] @property - def enwiki_url(self): + def enwiki_url(self) -> str | None: + """URL for English Wikipedia article about artwork.""" enwiki = self.enwiki - if not enwiki: - return - return 'https://en.wikipedia.org/wiki/' + enwiki.replace(' ', '_') + return ( + "https://en.wikipedia.org/wiki/" + enwiki.replace(" ", "_") + if enwiki + else None + ) @property - def sitelinks(self): - return self.entity['sitelinks'] + def sitelinks(self) -> Sitelinks: + """Item sitelinks.""" + return self.entity["sitelinks"] @property - def claims(self): - return self.entity['claims'] + def claims(self) -> Claims: + """Item claims.""" + return self.entity["claims"] @property - def enwiki(self): - return self.sitelinks['enwiki']['title'] if 'enwiki' in self.sitelinks else None + def enwiki(self) -> str | None: + """Article title on English Wikipedia.""" + return self.sitelinks["enwiki"]["title"] if "enwiki" in self.sitelinks else None diff --git a/depicts/barnesfoundation.py index 76987f9..ee724fb 100644 --- a/depicts/barnesfoundation.py +++ b/depicts/barnesfoundation.py @@ -1,32 +1,51 @@ -import requests -import os +"""Barnes Foundation (Q808462) - art museum in Philadelphia, Pennsylvania.""" + import json +import os +import typing -def get_json(catalog_id): - filename = 
f'cache/barnesfoundation_{catalog_id}.html' +import requests - url = 'https://collection.barnesfoundation.org/api/search' +from .type import CatalogDict - body = {"query": {"bool": {"filter": {"exists": {"field": "imageSecret"}}, - "must": {"match": {"_id": int(catalog_id)}}}}} +JsonData = dict[str, dict[str, typing.Any]] - if os.path.exists(filename): - return json.load(open(filename)) - else: - r = requests.get(url, params={'body': json.dumps(body)}) - print(r.url) - open(filename, 'w').write(r.text) - return r.json() -def parse_catalog(data): - hit = data['hits']['hits'][0]['_source'] +def get_json(catalog_id: str | int) -> JsonData: + """Get JSON from website and cache.""" + filename = f"cache/barnesfoundation_{catalog_id}.html" - return { - 'institution': 'Barnes Foundation', - 'description': hit['shortDescription'], - 'keywords': [tag['tag'] for tag in hit['tags']], + url = "https://collection.barnesfoundation.org/api/search" + + body = { + "query": { + "bool": { + "filter": {"exists": {"field": "imageSecret"}}, + "must": {"match": {"_id": int(catalog_id)}}, + } + } } -def get_catalog(catalog_id): + if os.path.exists(filename): + return typing.cast(JsonData, json.load(open(filename))) + r = requests.get(url, params={"body": json.dumps(body)}) + print(r.url) + open(filename, "w").write(r.text) + return typing.cast(JsonData, r.json()) + + +def parse_catalog(data: JsonData) -> CatalogDict: + """Parse catalog JSON.""" + hit = data["hits"]["hits"][0]["_source"] + + return { + "institution": "Barnes Foundation", + "description": hit["shortDescription"], + "keywords": [tag["tag"] for tag in hit["tags"]], + } + + +def get_catalog(catalog_id: str | int) -> CatalogDict: + """Lookup artwork using catalog ID and return keywords.""" data = get_json(catalog_id) return parse_catalog(data) diff --git a/depicts/category.py b/depicts/category.py index 2d350e0..88e15bd 100644 --- a/depicts/category.py +++ b/depicts/category.py @@ -1,49 +1,71 @@ -from . import utils -import re import calendar +import re -month_pattern = '|'.join(m for m in calendar.month_name if m) -re_date_based = re.compile(r'^(\d{4}-\d{2}-\d{2}|(' + month_pattern + r') \d{4}|\d{4}s?|\d{1,2}(st|nd|rd|th)-century) ') +from . 
import artwork, utils + +month_pattern = "|".join(m for m in calendar.month_name if m) +re_date_based = re.compile( + r"^(\d{4}-\d{2}-\d{2}|(" + + month_pattern + + r") \d{4}|\d{4}s?|\d{1,2}(st|nd|rd|th)-century) " +) + +ns_cat = "Category:" -ns_cat = 'Category:' class Category: - def __init__(self, title, site): + """Category.""" + + title: str + site: str + item: artwork.Artwork | None + + def __init__(self, title: str, site: str): + """Init.""" if title.startswith(ns_cat): - title = title[len(ns_cat):] + title = title[len(ns_cat) :] self.title = title self.site = site self.item = None - def __repr__(self): - return f'{self.__class__.__name__}({self.title!r}, {self.site!r})' + def __repr__(self) -> str: + """Repr.""" + return f"{self.__class__.__name__}({self.title!r}, {self.site!r})" - def set_item(self, item): + def set_item(self, item: artwork.Artwork | None) -> None: self.item = item @property - def url(self): - return utils.wiki_url(self.title, self.site, ns='Category') + def url(self) -> str | None: + """Category URL.""" + assert self.title and self.site + return utils.wiki_url(self.title, self.site, ns="Category") - def date_based(self): + def date_based(self) -> bool: + """Category title is date based.""" return bool(re_date_based.match(self.title)) - def contains_artist_name(self): + def contains_artist_name(self) -> bool: + """Category title contains artist's name.""" if not self.item: - return - return any(artist.lower() in self.title.lower() - for artist in self.item.artist_labels()) + return False + return any( + artist.lower() in self.title.lower() for artist in self.item.artist_labels() + ) - def parents(self): + def parents(self) -> list["Category"]: + """Parent categories.""" if not self.item: return [] return self.item.parent_categories[self.site].get(self.title, []) - def is_exhibition(self): - return any(parent.title.startswith('Art exhibitions ') - for parent in self.parents()) + def is_exhibition(self) -> bool: + """Category represents art exhibition.""" + return any( + parent.title.startswith("Art exhibitions ") for parent in self.parents() + ) - def names_for_wikidata(self): + def names_for_wikidata(self) -> list[str]: highlight = self.check() interesting = len(highlight) > 1 @@ -58,7 +80,7 @@ class Category: continue title = text.strip() title = title[0].upper() + title[1:] - for sep in ' with ', ' at ', ' wearing ': + for sep in " with ", " at ", " wearing ": if sep in title: before, _, after = title.partition(sep) names = [] @@ -66,44 +88,76 @@ class Category: names += utils.also_singular(x) return names return utils.also_singular(title) + return [] - def urls_for_wikidata(self): - return [utils.wiki_url(name, self.site, ns='Category') - for name in self.names_for_wikidata()] + def urls_for_wikidata(self) -> list[str]: + return [ + utils.wiki_url(name, self.site, ns="Category") + for name in self.names_for_wikidata() + ] - def check(self): + def check(self) -> list[tuple[bool, str]]: cat = self.title lc_cat = cat.lower() - by_endings = ['title', 'technique', 'period', 'century', 'country', 'movement', - 'medium', 'year', 'painter'] + by_endings = [ + "title", + "technique", + "period", + "century", + "country", + "movement", + "medium", + "year", + "painter", + ] if self.item: by_endings += self.item.artist_labels() - for after in ('in art', 'in portrait paintings', 'in landscape paintings', 'in culture', 'in popular culture', 'in painting', 'in 1', 'in 2', 'looking at viewer'): + for after in ( + "in art", + "in portrait paintings", + "in landscape paintings", 
+ "in culture", + "in popular culture", + "in painting", + "in 1", + "in 2", + "looking at viewer", + ): pos = lc_cat.find(after) # don't highlight "1512 in art" - if pos == -1 or cat[:pos - 1].isdigit(): + if pos == -1 or cat[: pos - 1].isdigit(): continue return [(True, cat[:pos]), (False, cat[pos:])] - for before in ('paintings of', 'portraits of', 'landscapes of', - 'portraits with', 'paintings with', 'paintings depicting', - 'portraits depicting', 'landscapes depicting', 'works about'): + for before in ( + "paintings of", + "portraits of", + "landscapes of", + "portraits with", + "paintings with", + "paintings depicting", + "portraits depicting", + "landscapes depicting", + "works about", + ): pos = lc_cat.find(before) if pos == -1: continue pos += len(before) for by_ending in by_endings: - ending = ' by ' + by_ending + ending = " by " + by_ending if lc_cat.endswith(ending): - return [(False, cat[:pos]), - (True, cat[pos:-len(ending)]), - (False, cat[-len(ending):])] + return [ + (False, cat[:pos]), + (True, cat[pos : -len(ending)]), + (False, cat[-len(ending) :]), + ] return [(False, cat[:pos]), (True, cat[pos:])] - pos = lc_cat.find('of ') + pos = lc_cat.find("of ") if pos != -1: return [(True, cat[:pos]), (False, cat[pos:])] diff --git a/depicts/commons.py b/depicts/commons.py index bd695a9..3c9e288 100644 --- a/depicts/commons.py +++ b/depicts/commons.py @@ -1,36 +1,43 @@ -from . import mediawiki, utils +"""Wikimedia Commons API call.""" -commons_url = 'https://commons.wikimedia.org/w/api.php' +from . import mediawiki, utils +from .type import CallParams + +commons_url = "https://commons.wikimedia.org/w/api.php" page_size = 50 -def image_detail(filenames, thumbheight=None, thumbwidth=None): + +def image_detail( + filenames: list[str] | str, + thumbheight: int | None = None, + thumbwidth: int | None = None, +) -> dict[str, dict[str, str]]: + """Get image detail from Wikimedia Commons.""" if not isinstance(filenames, list): filenames = [filenames] if not filenames: return {} - params = { - 'action': 'query', - 'prop': 'imageinfo', - 'iiprop': 'url', + params: CallParams = { + "action": "query", + "prop": "imageinfo", + "iiprop": "url", } if thumbheight is not None: - params['iiurlheight'] = thumbheight + params["iiurlheight"] = thumbheight if thumbwidth is not None: - params['iiurlwidth'] = thumbwidth + params["iiurlwidth"] = thumbwidth images = {} for cur in utils.chunk(filenames, page_size): call_params = params.copy() - call_params['titles'] = '|'.join(f'File:{f}' for f in cur) + call_params["titles"] = "|".join(f"File:{f}" for f in cur) r = mediawiki.api_post(call_params, api_url=commons_url) - for image in r.json()['query']['pages']: - filename = utils.drop_start(image['title'], 'File:') - images[filename] = image['imageinfo'][0] if 'imageinfo' in image else None + for image in r.json()["query"]["pages"]: + filename = utils.drop_start(image["title"], "File:") + images[filename] = image["imageinfo"][0] if "imageinfo" in image else None return images - - diff --git a/depicts/database.py b/depicts/database.py index 7cd43fe..2d8c99c 100644 --- a/depicts/database.py +++ b/depicts/database.py @@ -1,21 +1,35 @@ +"""Interact with SQL database.""" + +import typing + +import flask +import sqlalchemy from sqlalchemy import create_engine, func from sqlalchemy.orm import scoped_session, sessionmaker session = scoped_session(sessionmaker()) -def init_db(db_url): - session.configure(bind=get_engine(db_url)) -def get_engine(db_url): +def get_engine(db_url: str) -> 
sqlalchemy.engine.base.Engine: + """Create an engine object.""" return create_engine(db_url, pool_recycle=3600, pool_size=20, max_overflow=40) -def init_app(app, echo=False): - db_url = app.config['DB_URL'] - session.configure(bind=get_engine(db_url, echo=echo)) - @app.teardown_appcontext - def shutdown_session(exception=None): - session.remove() +def init_db(db_url: str) -> None: + """Initialise database.""" + session.configure(bind=get_engine(db_url)) # type:ignore -def now_utc(): - return func.timezone('utc', func.now()) + +def init_app(app: flask.app.Flask) -> None: + """Initialise database connection within flask app.""" + db_url = app.config["DB_URL"] + session.configure(bind=get_engine(db_url)) # type: ignore + + @app.teardown_appcontext # type: ignore + def shutdown_session(exception: Exception | None = None) -> None: + session.remove() # type: ignore + + +def now_utc() -> typing.Any: + """Get current time in UTC.""" + return func.timezone("utc", func.now()) diff --git a/depicts/dia.py b/depicts/dia.py index a9722d8..4335f1a 100644 --- a/depicts/dia.py +++ b/depicts/dia.py @@ -1,55 +1,55 @@ -import requests -import lxml.html +"""Detroit Institute of Arts (Q1201549) - art museum in Detroit, Michigan.""" + import os import re -re_url = re.compile(r'https?://www.dia.org/art/collection/object/(.+)$') +import lxml.html +import requests -def get_html(url): +from .type import CatalogDict + +re_url = re.compile(r"https?://www.dia.org/art/collection/object/(.+)$") + + +def get_html(url: str) -> str | None: + """Get HTML from web catalog.""" m = re_url.search(url) if not m: - return - catalog_id = m.group(1).replace('/', '_') + return None + catalog_id = m.group(1).replace("/", "_") - filename = f'cache/dia_{catalog_id}.html' + filename = f"cache/dia_{catalog_id}.html" if os.path.exists(filename): html = open(filename).read() else: r = requests.get(url) html = r.text - open(filename, 'w').write(html) + open(filename, "w").write(html) return html -def parse_html(html): + +def parse_html(html: str) -> CatalogDict: + """Parse HTML and extract keywords.""" root = lxml.html.fromstring(html) keywords = [] - for a in root.findall('.//a[@href]'): - href = a.get('href') - if not href.startswith('/art/collection?keys='): + for a in root.findall(".//a[@href]"): + href = a.get("href") + assert href is not None + if not href.startswith("/art/collection?keys="): continue + assert a.text keywords.append(a.text) - if False: - sidebar = root.find('.//aside[@id="sidebar"]') - h2_list = sidebar.findall('.//h2') - h2_keyword = next((h2 for h2 in h2_list if h2.text == 'Keywords'), None) - if not h2_keyword: - return {} - keyword_div = h2_keyword.getparent() - for a in keyword_div: - if a.tag != 'a': - continue - keywords.append(a.text) - return { - 'institution': 'Detroit Institute of Arts', - 'keywords': keywords, + "institution": "Detroit Institute of Arts", + "keywords": keywords, } -def get_catalog(url): + +def get_catalog(url: str) -> CatalogDict | None: + """Get catalog web page and extract keywords.""" html = get_html(url) - if html: - return parse_html(html) + return parse_html(html) if html else None diff --git a/depicts/error_mail.py b/depicts/error_mail.py index d45b1e3..dbcf2c4 100644 --- a/depicts/error_mail.py +++ b/depicts/error_mail.py @@ -1,32 +1,48 @@ +"""Send mail to admin when an error happens.""" + import logging -from logging.handlers import SMTPHandler from logging import Formatter -from flask import request, g +from logging.handlers import SMTPHandler -PROJECT = 'depicts' +import flask 
+from flask import g, request -class MatcherSMTPHandler(SMTPHandler): - def getSubject(self, record): # noqa: N802 - subject = (f'{PROJECT} error: {record.exc_info[0].__name__}' - if (record.exc_info and record.exc_info[0]) - else f'{PROJECT} error: {record.pathname}:{record.lineno:d}') +PROJECT = "depicts" - if qid := getattr(g, 'qid', None): - subject += f' {qid}' - if label := getattr(g, 'label', None): - subject += f': {label}' +class MySMTPHandler(SMTPHandler): + """Custom SMTP handler to change mail subject.""" + + def getSubject(self, record: logging.LogRecord) -> str: + """Specify subject line for error mails.""" + subject = ( + f"{PROJECT} error: {record.exc_info[0].__name__}" + if (record.exc_info and record.exc_info[0]) + else f"{PROJECT} error: {record.pathname}:{record.lineno:d}" + ) + + if qid := getattr(g, "qid", None): + subject += f" {qid}" + + if label := getattr(g, "label", None): + subject += f": {label}" return subject + class RequestFormatter(Formatter): + """Custom logging formatter to include request.""" + def format(self, record): + """Record includes request.""" record.request = request return super().format(record) -def setup_error_mail(app): - formatter = RequestFormatter(''' +def setup_error_mail(app: flask.Flask) -> None: + """Send mail to admins when an error happens.""" + formatter = RequestFormatter( + """ Message type: {levelname} Location: {pathname:s}:{lineno:d} Module: {module:s} @@ -38,13 +54,17 @@ def setup_error_mail(app): Message: {message:s} - ''', style='{') + """, + style="{", + ) - mail_handler = MatcherSMTPHandler(app.config['SMTP_HOST'], - app.config['MAIL_FROM'], - app.config['ADMINS'], - app.name + ' error', - timeout=30) + mail_handler = MySMTPHandler( + app.config["SMTP_HOST"], + app.config["MAIL_FROM"], + app.config["ADMINS"], + app.name + " error", + timeout=30, + ) mail_handler.setFormatter(formatter) mail_handler.setLevel(logging.ERROR) diff --git a/depicts/fixtures.py b/depicts/fixtures.py index 338ef52..0968b55 100644 --- a/depicts/fixtures.py +++ b/depicts/fixtures.py @@ -1,3 +1,40 @@ -def save_error(): - return {"error":{"code":"failed-save","info":"The save has failed.","messages":[{"name":"wikibase-api-failed-save","parameters":[],"html":"The save has failed."},{"name":"wikimedia-globalblocking-ipblocked-range","parameters":["[//meta.wikimedia.org/wiki/User:Jon_Kolbert Jon Kolbert]","meta.wikimedia.org","[[m:NOP|Open Proxy]]: Colocation webhost, Contact [[m:Special:Contact/stewards|stewards]] if you are affected","04:21, 8 April 2020","04:21, 8 April 2023","78.129.222.14","78.129.128.0/17"],"html":"

Your IP address is in a range that has been blocked on all Wikimedia Foundation wikis.\n

The block was made by Jon Kolbert (meta.wikimedia.org).\nThe reason given is Open Proxy: Colocation webhost, Contact stewards if you are affected.\n

\n
  • Start of block: 04:21, 8 April 2020
  • \n
  • Expiry of block: 04:21, 8 April 2023
\n

Your current IP address is 78.129.222.14 and the blocked range is 78.129.128.0/17.\nPlease include all above details in any queries you make.\n

If you believe you were blocked by mistake, you can find additional information and instructions in the No open proxies global policy.\nOtherwise, to discuss the block please post a request for review on Meta-Wiki or send an email to the stewards OTRS queue at stewards@wikimedia.org including all above details.\n

"},{"name":"permissionserrors","parameters":[],"html":"Permission error"}],"docref":"See https://www.wikidata.org/w/api.php for API usage. Subscribe to the mediawiki-api-announce mailing list at <https://lists.wikimedia.org/mailman/listinfo/mediawiki-api-announce> for notice of API deprecations and breaking changes."},"servedby":"mw1315"} +"""Fixtures.""" +import typing + + +def save_error() -> dict[str, str | dict[str, typing.Any]]: + """Save error reply.""" + return { + "error": { + "code": "failed-save", + "info": "The save has failed.", + "messages": [ + { + "name": "wikibase-api-failed-save", + "parameters": [], + "html": "The save has failed.", + }, + { + "name": "wikimedia-globalblocking-ipblocked-range", + "parameters": [ + "[//meta.wikimedia.org/wiki/User:Jon_Kolbert Jon Kolbert]", + "meta.wikimedia.org", + "[[m:NOP|Open Proxy]]: Colocation webhost, Contact [[m:Special:Contact/stewards|stewards]] if you are affected", + "04:21, 8 April 2020", + "04:21, 8 April 2023", + "78.129.222.14", + "78.129.128.0/17", + ], + "html": '

Your IP address is in a range that has been blocked on all Wikimedia Foundation wikis.\n

The block was made by Jon Kolbert (meta.wikimedia.org).\nThe reason given is Open Proxy: Colocation webhost, Contact stewards if you are affected.\n

\n
  • Start of block: 04:21, 8 April 2020
  • \n
  • Expiry of block: 04:21, 8 April 2023
\n

Your current IP address is 78.129.222.14 and the blocked range is 78.129.128.0/17.\nPlease include all above details in any queries you make.\n

If you believe you were blocked by mistake, you can find additional information and instructions in the No open proxies global policy.\nOtherwise, to discuss the block please post a request for review on Meta-Wiki or send an email to the stewards OTRS queue at stewards@wikimedia.org including all above details.\n

', + }, + { + "name": "permissionserrors", + "parameters": [], + "html": "Permission error", + }, + ], + "docref": "See https://www.wikidata.org/w/api.php for API usage. Subscribe to the mediawiki-api-announce mailing list at <https://lists.wikimedia.org/mailman/listinfo/mediawiki-api-announce> for notice of API deprecations and breaking changes.", + }, + "servedby": "mw1315", + } diff --git a/depicts/human.py b/depicts/human.py index 4547c74..c492e22 100644 --- a/depicts/human.py +++ b/depicts/human.py @@ -1,19 +1,30 @@ -from .model import HumanItem -from . import mediawiki, wikibase +"""Human.""" + import re -re_four_digits = re.compile(r'\b\d{4}\b') +from . import mediawiki, wikibase +from .model import HumanItem +from .type import HumanDict -re_iso_date = re.compile(r'\b\d{4}-\d{2}-\d{2}\b') -re_four_and_two = re.compile(r'\b(\d{2})(\d{2})[-–](\d{2})\b') -re_catalog_number = re.compile(r'\b\d{4}[^\d]+\d+[^\d]+\d{4}\b') +re_four_digits = re.compile(r"\b\d{4}\b") -def query(yob, yod): +re_iso_date = re.compile(r"\b\d{4}-\d{2}-\d{2}\b") +re_four_and_two = re.compile(r"\b(\d{2})(\d{2})[-–](\d{2})\b") +re_catalog_number = re.compile(r"\b\d{4}[^\d]+\d+[^\d]+\d{4}\b") + + +def query(yob: int, yod: int) -> list[HumanItem]: + """Search for people with given birth and death years.""" if yod < yob: return [] - return HumanItem.query.filter_by(yob=yob, yod=yod).all() + humans: list[HumanItem] = HumanItem.query.filter_by( # type: ignore + yob=yob, yod=yod + ).all() + return humans -def get_items_from_name(name): + +def get_items_from_name(name: str) -> list[HumanItem]: + """Get people with name.""" found = [] m = re_four_and_two.search(name) @@ -32,27 +43,29 @@ def get_items_from_name(name): return found -def from_name(name): + +def from_name(name: str) -> list[HumanDict]: + """Find candidate items from name.""" candidates = get_items_from_name(name) - lookup = {item.qid: item for item in candidates} + lookup = {str(item.qid): item for item in candidates} qids = list(lookup.keys()) found = [] - for entity in mediawiki.get_entities_with_cache(qids, props='labels|descriptions'): - if 'redirects' in entity or 'missing' in entity: + for entity in mediawiki.get_entities_with_cache(qids, props="labels|descriptions"): + if "redirects" in entity or "missing" in entity: continue - qid = entity['id'] + qid = entity["id"] item = lookup[qid] - i = { - 'qid': entity['id'], - 'year_of_birth': item.year_of_birth, - 'year_of_death': item.year_of_death, + i: HumanDict = { + "qid": entity["id"], + "year_of_birth": item.year_of_birth, + "year_of_death": item.year_of_death, } label = wikibase.get_entity_label(entity) if label: - i['label'] = label - if 'en' in entity.get('descriptions', {}): - i['description'] = entity['descriptions']['en']['value'] + i["label"] = label + if "en" in entity.get("descriptions", {}): + i["description"] = entity["descriptions"]["en"]["value"] found.append(i) - found.sort(key=lambda i: i.get('label', '')) + found.sort(key=lambda i: i.get("label", "")) return found diff --git a/depicts/mail.py b/depicts/mail.py index 1f2771f..9d88b8a 100644 --- a/depicts/mail.py +++ b/depicts/mail.py @@ -1,20 +1,25 @@ -from flask import current_app +"""Send email.""" + +import smtplib from email.mime.text import MIMEText from email.utils import formatdate, make_msgid -import smtplib -def send_mail(subject, body): +from flask import current_app + + +def send_mail(subject: str, body: str) -> None: + """Send email to site admin.""" app = current_app - mail_to = app.config['ADMIN_EMAIL'] - mail_from = 
app.config['MAIL_FROM'] - msg = MIMEText(body, 'plain', 'UTF-8') + mail_to = app.config["ADMIN_EMAIL"] + mail_from = app.config["MAIL_FROM"] + msg = MIMEText(body, "plain", "UTF-8") - msg['Subject'] = subject - msg['To'] = mail_to - msg['From'] = mail_from - msg['Date'] = formatdate() - msg['Message-ID'] = make_msgid() + msg["Subject"] = subject + msg["To"] = mail_to + msg["From"] = mail_from + msg["Date"] = formatdate() + msg["Message-ID"] = make_msgid() - s = smtplib.SMTP(app.config['SMTP_HOST']) + s = smtplib.SMTP(app.config["SMTP_HOST"]) s.sendmail(mail_from, [mail_to], msg.as_string()) s.quit() diff --git a/depicts/mediawiki.py b/depicts/mediawiki.py index 39b414d..d6fc761 100644 --- a/depicts/mediawiki.py +++ b/depicts/mediawiki.py @@ -1,114 +1,149 @@ -import requests -import os -import json -import hashlib -from .category import Category -from . import utils +"""Access MediaWiki API.""" -wikidata_url = 'https://www.wikidata.org/w/api.php' +import hashlib +import json +import os +import typing + +import requests + +from . import utils +from .category import Category +from .type import CallParams, Entity + +wikidata_url = "https://www.wikidata.org/w/api.php" page_size = 50 hosts = { - 'commons': 'commons.wikimedia.org', - 'enwiki': 'en.wikipedia.org', - 'wikidata': 'www.wikidata.org', + "commons": "commons.wikimedia.org", + "enwiki": "en.wikipedia.org", + "wikidata": "www.wikidata.org", } -def api_call(params, api_url=wikidata_url): - call_params = { - 'format': 'json', - 'formatversion': 2, + +def api_call(params: CallParams, api_url: str = wikidata_url) -> requests.Response: + """Mediawiki API call.""" + call_params: CallParams = { + "format": "json", + "formatversion": 2, **params, } r = requests.get(api_url, params=call_params, timeout=5) return r -def api_post(params, api_url=wikidata_url): - call_params = { - 'format': 'json', - 'formatversion': 2, + +def api_post(params: CallParams, api_url: str = wikidata_url) -> requests.Response: + call_params: CallParams = { + "format": "json", + "formatversion": 2, **params, } r = requests.post(api_url, data=call_params, timeout=5) return r -def get_list(list_name, **params): - r = api_call({'action': 'query', 'list': list_name, **params}) - return r.json()['query'][list_name] -def get_entity(qid, redirects=False): - json_data = api_call({'action': 'wbgetentities', - 'ids': qid, - 'redirects': {True: 'yes', False: 'no'}[redirects]}).json() +def get_list(list_name: str, **params: str | int) -> list[dict[str, typing.Any]]: + r = api_call({"action": "query", "list": list_name, **params}) + list_contents: list[dict[str, typing.Any]] = r.json()["query"][list_name] + return list_contents + + +def get_entity(qid: str, redirects: bool = False) -> Entity | None: + """Get entity from wikibase.""" + json_data = api_call( + { + "action": "wbgetentities", + "ids": qid, + "redirects": {True: "yes", False: "no"}[redirects], + } + ).json() try: - entity = list(json_data['entities'].values())[0] + entity = list(json_data["entities"].values())[0] except KeyError: - return - if 'missing' not in entity: - return entity + return None + if "missing" not in entity: + return typing.cast(Entity, entity) + return None -def wbgetentities(ids, **params): + +def wbgetentities(ids: typing.Iterable[str], **params: str | int) -> dict[str, Entity]: + """Get entities from wikibase.""" if not ids: - return [] + return {} params = { - 'action': 'wbgetentities', - 'ids': '|'.join(ids), + "action": "wbgetentities", + "ids": "|".join(ids), **params, } - return 
api_call(params).json()['entities'] + ret: dict[str, Entity] = api_call(params).json()["entities"] + return ret -def get_entities(ids, **params): - entity_list = [] + +def get_entities(ids: typing.Iterable[str], **params: str | int) -> list[Entity]: + entity_list: list[Entity] = [] for cur in utils.chunk(ids, page_size): entity_list += wbgetentities(cur, **params).values() return entity_list -def get_entities_dict(ids, **params): + +def get_entities_dict(ids: typing.Iterable[str], **params: str | int) -> dict[str, Entity]: entities = {} for cur in utils.chunk(ids, page_size): entities.update(wbgetentities(cur, **params)) return entities -def get_entity_with_cache(qid, refresh=False): - filename = f'cache/{qid}.json' + +def get_entity_with_cache(qid: str, refresh: bool = False) -> Entity | None: + filename = f"cache/{qid}.json" + entity: Entity | None if not refresh and os.path.exists(filename): entity = json.load(open(filename)) else: entity = get_entity(qid, redirects=True) - json.dump(entity, open(filename, 'w'), indent=2) + json.dump(entity, open(filename, "w"), indent=2) return entity -def get_entities_with_cache(ids, **params): - md5 = hashlib.md5(' '.join(ids).encode('utf-8')).hexdigest() - filename = f'cache/entities_{md5}.json' +def get_entities_with_cache(ids: list[str], **params: typing.Any) -> list[Entity]: + md5 = hashlib.md5(" ".join(ids).encode("utf-8")).hexdigest() + + entity_list: list[Entity] + filename = f"cache/entities_{md5}.json" if os.path.exists(filename): entity_list = json.load(open(filename)) else: entity_list = get_entities(ids, **params) - json.dump(entity_list, open(filename, 'w'), indent=2) + json.dump(entity_list, open(filename, "w"), indent=2) return entity_list -def get_entities_dict_with_cache(all_ids, **params): + +def get_entities_dict_with_cache( + all_ids: list[str], **params: typing.Any +) -> dict[str, Entity]: entities = {} for ids in utils.chunk(all_ids, page_size): - md5 = hashlib.md5(' '.join(ids).encode('utf-8')).hexdigest() + md5 = hashlib.md5(" ".join(ids).encode("utf-8")).hexdigest() - filename = f'cache/entities_dict_{md5}.json' + filename = f"cache/entities_dict_{md5}.json" if os.path.exists(filename): entities.update(json.load(open(filename))) continue cur = wbgetentities(ids, **params) - json.dump(cur, open(filename, 'w'), indent=2) + json.dump(cur, open(filename, "w"), indent=2) entities.update(cur) return entities -def mediawiki_query(titles, params, site): + +Page = dict[str, typing.Any] + + +def mediawiki_query(titles: list[str], params: CallParams, site: str) -> list[Page]: + """Mediawiki query.""" if not titles: return [] @@ -116,74 +151,85 @@ def mediawiki_query(titles, params, site): # FIXME: switch to utils.chunk if len(titles) > page_size: titles = titles[:page_size] - base = { - 'format': 'json', - 'formatversion': 2, - 'action': 'query', - 'continue': '', - 'titles': '|'.join(titles), + base: CallParams = { + "format": "json", + "formatversion": 2, + "action": "query", + "continue": "", + "titles": "|".join(titles), } p = base.copy() p.update(params) - query_url = f'https://{hosts[site]}/w/api.php' + query_url = f"https://{hosts[site]}/w/api.php" r = requests.get(query_url, params=p) - expect = 'application/json; charset=utf-8' + expect = "application/json; charset=utf-8" success = True if r.status_code != 200: - print('status code: {r.status_code}'.format(r=r)) + print("status code: {r.status_code}".format(r=r)) success = False - if r.headers['content-type'] != expect: - print('content-type: {r.headers[content-type]}'.format(r=r)) + if 
r.headers["content-type"] != expect: + print(f'content-type: {r.headers["content-type"]}') success = False assert success json_reply = r.json() - if 'query' not in json_reply: + if "query" not in json_reply: print(r.url) print(r.text) - return json_reply['query']['pages'] + pages: list[Page] = json_reply["query"]["pages"] + return pages -def get_content_and_categories(title, site): - params = { - 'prop': 'revisions|categories', - 'clshow': '!hidden', - 'cllimit': 'max', - 'rvprop': 'content', + +def get_content_and_categories(title: str, site: str) -> tuple[str, list[str]]: + """Get article contents and categories.""" + params: CallParams = { + "prop": "revisions|categories", + "clshow": "!hidden", + "cllimit": "max", + "rvprop": "content", } pages = mediawiki_query([title], params, site) assert len(pages) == 1 page = pages[0] - return (page['revisions'][0]['content'], page.get('categories', [])) + return (page["revisions"][0]["content"], page.get("categories", [])) -def host_from_site(site): + +def host_from_site(site: str) -> str: + """Host from site.""" return hosts[site] -def process_cats(cats, site): - return [Category(cat['title'], site) for cat in cats] -def get_categories(titles, site): - params = { - 'prop': 'categories', - 'clshow': '!hidden', - 'cllimit': 'max', +def process_cats(cats: list[dict[str, str]], site: str) -> list[Category]: + """Process categories.""" + return [Category(cat["title"], site) for cat in cats] + + +def get_categories(titles: list[str], site: str) -> list[tuple[str, list[Category]]]: + """Get categories for pages with given titles.""" + params: CallParams = { + "prop": "categories", + "clshow": "!hidden", + "cllimit": "max", } from_wiki = mediawiki_query(titles, params, site) title_and_cats = [] for i in from_wiki: - if 'categories' not in i: + if "categories" not in i: continue - cats = process_cats(i['categories'], site) + cats = process_cats(i["categories"], site) if not cats: continue - title_and_cats.append((i['title'], cats)) + title_and_cats.append((i["title"], cats)) return title_and_cats -def get_history(title, site): - params = { - 'prop': 'revisions', - 'rvlimit': 'max', - 'rvprop': 'timestamp|user|comment|ids|content', - 'rvslots': 'main', + +def get_history(title: str, site: str) -> list[Page]: + """Get history of a page.""" + params: CallParams = { + "prop": "revisions", + "rvlimit": "max", + "rvprop": "timestamp|user|comment|ids|content", + "rvslots": "main", } return mediawiki_query([title], params, site) diff --git a/depicts/model.py b/depicts/model.py index e4c4e37..01912b6 100644 --- a/depicts/model.py +++ b/depicts/model.py @@ -1,163 +1,181 @@ -from sqlalchemy.ext.declarative import declarative_base -from .database import session, now_utc -from . 
import wikibase, utils -from sqlalchemy.schema import Column, ForeignKey -from sqlalchemy.types import Integer, String, DateTime, Boolean -from sqlalchemy.orm import column_property, relationship, synonym -from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.sql.expression import cast -from sqlalchemy.dialects import postgresql +import typing +from datetime import timedelta from urllib.parse import quote +from sqlalchemy.dialects import postgresql +from sqlalchemy.ext.associationproxy import association_proxy +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import column_property, relationship, synonym +from sqlalchemy.schema import Column, ForeignKey +from sqlalchemy.sql.expression import cast +from sqlalchemy.types import Boolean, DateTime, Integer, String + +from . import utils, wikibase +from .database import now_utc, session +from .type import Claims, Entity + Base = declarative_base() -Base.query = session.query_property() +Base.query = session.query_property() # type: ignore + class User(Base): - __tablename__ = 'user' + __tablename__ = "user" id = Column(Integer, primary_key=True, autoincrement=False) username = Column(String, unique=True) options = Column(postgresql.JSON) first_seen = Column(DateTime, default=now_utc()) is_admin = Column(Boolean, default=False) + class DepictsItem(Base): - __tablename__ = 'depicts' + __tablename__ = "depicts" item_id = Column(Integer, primary_key=True, autoincrement=False) label = Column(String) description = Column(String) count = Column(Integer) - qid = column_property('Q' + cast(item_id, String)) - db_alt_labels = relationship('DepictsItemAltLabel', - collection_class=set, - cascade='save-update, merge, delete, delete-orphan', - backref='item') - alt_labels = association_proxy('db_alt_labels', 'alt_label') + qid = column_property("Q" + cast(item_id, String)) + db_alt_labels = relationship( + "DepictsItemAltLabel", + collection_class=set, + cascade="save-update, merge, delete, delete-orphan", + backref="item", + ) + alt_labels = association_proxy("db_alt_labels", "alt_label") # type: ignore + class DepictsItemAltLabel(Base): - __tablename__ = 'depicts_alt_label' - item_id = Column(Integer, - ForeignKey('depicts.item_id'), - primary_key=True, - autoincrement=False) + __tablename__ = "depicts_alt_label" + item_id = Column( + Integer, ForeignKey("depicts.item_id"), primary_key=True, autoincrement=False + ) alt_label = Column(String, primary_key=True) - def __init__(self, alt_label): + def __init__(self, alt_label: str) -> None: self.alt_label = alt_label + class Item(Base): - __tablename__ = 'item' + __tablename__ = "item" item_id = Column(Integer, primary_key=True, autoincrement=False) # label = Column(String) # column removed 2019-12-18 entity = Column(postgresql.JSON) lastrevid = Column(Integer, nullable=True, unique=True) modified = Column(DateTime, nullable=True) is_artwork = Column(Boolean, nullable=False, default=False) - qid = column_property('Q' + cast(item_id, String)) + qid = column_property("Q" + cast(item_id, String)) - def image_count(self): - p18 = self.entity['claims'].get('P18') + @property + def claims(self) -> Claims: + return typing.cast(Entity, self.entity)["claims"] + + def image_count(self) -> int: + p18 = self.claims.get("P18") return len(p18) if p18 else 0 - def image_filename(self): - p18 = self.entity['claims'].get('P18') + def image_filename(self) -> str | None: + p18 = self.claims.get("P18") if not p18: - return + return None try: - return 
p18[0]['mainsnak']['datavalue']['value']
+            filename: str = p18[0]["mainsnak"]["datavalue"]["value"]
+            return filename
         except KeyError:
-            return
+            return None

     @property
-    def label(self):
-        return wikibase.get_entity_label(self.entity)
+    def label(self) -> str | None:
+        return wikibase.get_entity_label(typing.cast(Entity, self.entity))

     @property
-    def artist(self):
-        v = wikibase.first_datavalue(self.entity, 'P170')
-        if not v:
-            return
-        return v['id']
+    def artist(self) -> str | None:
+        v = wikibase.first_datavalue(typing.cast(Entity, self.entity), "P170")
+        # v is None when the item has no P170 claim; only assert the type
+        assert v is None or isinstance(v, dict)
+        return v["id"] if v else None

     @property
-    def depicts(self):
-        return self.linked_qids('P180')
+    def depicts(self) -> list[str]:
+        return self.linked_qids("P180")

     @property
-    def instance_of(self):
-        return self.linked_qids('P31')
+    def instance_of(self) -> list[str]:
+        return self.linked_qids("P31")

-    def linked_qids(self, prop):
-        values = self.entity['claims'].get(prop) or []
-        return [v['mainsnak']['datavalue']['value']['id']
-                for v in values
-                if 'datavalue' in v['mainsnak']]
+    def linked_qids(self, prop: str) -> list[str]:
+        values = typing.cast(Entity, self.entity)["claims"].get(prop) or []
+        return [
+            v["mainsnak"]["datavalue"]["value"]["id"]
+            for v in values
+            if "datavalue" in v["mainsnak"]
+        ]

     @property
-    def date(self):
-        v = wikibase.first_datavalue(self.entity, 'P571')
-        if v:
-            return utils.format_time(v['time'], v['precision'])
+    def date(self) -> str | None:
+        v = wikibase.first_datavalue(typing.cast(Entity, self.entity), "P571")
+        # v is None when the item has no P571 claim; only assert the type
+        assert v is None or isinstance(v, dict)
+        return utils.format_time(v["time"], v["precision"]) if v else None
+

 class Triple(Base):
-    __tablename__ = 'triple'
-    subject_id = Column(Integer,
-                        ForeignKey('item.item_id'),
-                        primary_key=True)
+    __tablename__ = "triple"
+    subject_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
     predicate_id = Column(Integer, primary_key=True, index=True)
     object_id = Column(Integer, primary_key=True, index=True)

-    subject = relationship('Item', backref='triples')
+    subject = relationship("Item", backref="triples")
+

 class HumanItem(Base):
-    __tablename__ = 'human'
+    __tablename__ = "human"
     item_id = Column(Integer, primary_key=True, autoincrement=False)
     year_of_birth = Column(Integer, nullable=False)
     year_of_death = Column(Integer, nullable=False)
     age_at_death = column_property(year_of_death - year_of_birth)
-    qid = column_property('Q' + cast(item_id, String))
+    qid = column_property("Q" + cast(item_id, String))
+
+    yob = synonym("year_of_birth")
+    yod = synonym("year_of_death")

-    yob = synonym('year_of_birth')
-    yod = synonym('year_of_death')

 class Language(Base):
-    __tablename__ = 'language'
+    __tablename__ = "language"
     item_id = Column(Integer, primary_key=True, autoincrement=False)
     wikimedia_language_code = Column(String, index=True, unique=True)
     en_label = Column(String, nullable=False)

-    code = synonym('wikimedia_language_code')
-    label = synonym('en_label')
+    code = synonym("wikimedia_language_code")
+    label = synonym("en_label")

     @classmethod
-    def get_by_code(cls, code):
-        return cls.query.filter_by(wikimedia_language_code=code).one()
+    def get_by_code(cls, code: str) -> "Language":
+        return cls.query.filter_by(wikimedia_language_code=code).one()  # type: ignore


 class Edit(Base):
-    __tablename__ = 'edit'
+    __tablename__ = "edit"
     username = Column(String, primary_key=True)
-    artwork_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
-    depicts_id = Column(Integer, ForeignKey('depicts.item_id'), 
primary_key=True) + artwork_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True) + depicts_id = Column(Integer, ForeignKey("depicts.item_id"), primary_key=True) timestamp = Column(DateTime, default=now_utc()) lastrevid = Column(Integer, nullable=True) - artwork_qid = column_property('Q' + cast(artwork_id, String)) - depicts_qid = column_property('Q' + cast(depicts_id, String)) + artwork_qid = column_property("Q" + cast(artwork_id, String)) + depicts_qid = column_property("Q" + cast(depicts_id, String)) - artwork = relationship('Item') - depicts = relationship('DepictsItem') + artwork = relationship("Item") + depicts = relationship("DepictsItem") @property - def url_norm_username(self): - return quote(self.username.replace(' ', '_')) + def url_norm_username(self) -> str: + return quote(self.username.replace(" ", "_")) @property - def user_wikidata_url(self): - return 'https://www.wikidata.org/wiki/User:' + self.url_norm_username + def user_wikidata_url(self) -> str: + return "https://www.wikidata.org/wiki/User:" + self.url_norm_username + class WikidataQuery(Base): - __tablename__ = 'wikidata_query' + __tablename__ = "wikidata_query" id = Column(Integer, primary_key=True) start_time = Column(DateTime) end_time = Column(DateTime) @@ -171,27 +189,27 @@ class WikidataQuery(Base): endpoint = Column(String) @property - def duration(self): - if self.end_time: - return self.end_time - self.start_time + def duration(self) -> timedelta: + assert self.start_time and self.end_time + return self.end_time - self.start_time @property - def display_seconds(self): - return f'{self.duration.total_seconds():.1f}' + def display_seconds(self) -> str: + return f"{self.duration.total_seconds():.1f}" @property - def template(self): + def template(self) -> str | None: if not self.query_template: - return + return None t = self.query_template - if t.startswith('query/'): + if t.startswith("query/"): t = t[6:] - if t.endswith('.sparql'): + if t.endswith(".sparql"): t = t[:-7] return t @property - def bad(self): - return self.status_code and self.status_code != 200 + def bad(self) -> bool: + return bool(self.status_code and self.status_code != 200) diff --git a/depicts/museodelprado.py b/depicts/museodelprado.py index 129127a..be8a86d 100644 --- a/depicts/museodelprado.py +++ b/depicts/museodelprado.py @@ -1,42 +1,57 @@ -import requests -import lxml.html +"""Museo del Prado (Q160112) - Spanish national art museum in Madrid, Spain.""" + import os import re +import typing -re_url = re.compile(r'www.museodelprado.es/(.+)$') +import lxml.html +import requests -def get_html(url): - catalog_id = re_url.search(url).group(1).replace('/', '_') +from .type import CatalogDict, EmptyDict - filename = f'cache/museodelprado_{catalog_id}.html' +re_url = re.compile(r"www.museodelprado.es/(.+)$") + + +def get_html(url: str) -> str: + """Get HTML from web catalog.""" + assert (m := re_url.search(url)) + catalog_id = m.group(1).replace("/", "_") + + filename = f"cache/museodelprado_{catalog_id}.html" if os.path.exists(filename): html = open(filename).read() else: r = requests.get(url) html = r.text - open(filename, 'w').write(html) + open(filename, "w").write(html) return html -def parse_html(html): + +def parse_html(html: str) -> CatalogDict | EmptyDict: + """Parse HTML and extract keywords.""" root = lxml.html.fromstring(html) keywords = [] - for h2 in root.findall('.//h2'): - if not h2.text or h2.text.strip() != 'Displayed objects': + for h2 in root.findall(".//h2"): + if not h2.text or h2.text.strip() != "Displayed 
objects": continue div = h2.getparent() - for keyword_span in div.findall('.//span[@property]'): + assert div is not None + for keyword_span in div.findall(".//span[@property]"): + assert isinstance(keyword_span.text, str) keywords.append(keyword_span.text) if not keywords: - return {} + return typing.cast(EmptyDict, {}) return { - 'institution': 'Museo del Prado', - 'keywords': keywords, + "institution": "Museo del Prado", + "keywords": keywords, } -def get_catalog(url): + +def get_catalog(url: str) -> CatalogDict | EmptyDict: + """Get catalog web page and extract keywords.""" return parse_html(get_html(url)) diff --git a/depicts/npg.py b/depicts/npg.py index d88cdd5..1fde880 100644 --- a/depicts/npg.py +++ b/depicts/npg.py @@ -1,37 +1,52 @@ -import requests -import lxml.html +"""National Portrait Gallery (Q238587) - art museum in London, England.""" + import os import re -re_url = re.compile(r'www.npg.org.uk/collections/search/(.+)$') +import lxml.html +import requests -def get_html(url): - catalog_id = re_url.search(url).group(1).replace('/', '_') +from .type import CatalogDict - filename = f'cache/npg_{catalog_id}.html' +re_url = re.compile(r"www.npg.org.uk/collections/search/(.+)$") + + +def get_html(url: str) -> str: + """Get HTML from web catalog.""" + assert (m := re_url.search(url)) + catalog_id = m.group(1).replace("/", "_") + + filename = f"cache/npg_{catalog_id}.html" if os.path.exists(filename): html = open(filename).read() else: r = requests.get(url) html = r.text - open(filename, 'w').write(html) + open(filename, "w").write(html) return html -def parse_html(html): + +def parse_html(html: str) -> CatalogDict: + """Parse HTML and extract keywords.""" root = lxml.html.fromstring(html) - keywords = [a.text for a in root.findall('.//a[@href]') - if 'subj=' in a.get('href')] + keywords = [ + a.text + for a in root.findall(".//a[@href]") + if "subj=" in a.get("href") # type: ignore + ] - skip = {'oil', 'painting'} - keywords = [k for k in keywords if k.lower() not in skip] + skip = {"oil", "painting"} + keywords = [k for k in keywords if k.lower() not in skip] # type: ignore return { - 'institution': 'National Portrait Gallery', - 'keywords': keywords, + "institution": "National Portrait Gallery", + "keywords": keywords, # type: ignore } -def get_catalog(url): + +def get_catalog(url: str) -> CatalogDict: + """Get catalog web page and extract keywords.""" return parse_html(get_html(url)) diff --git a/depicts/pager.py b/depicts/pager.py index 16846ed..d848094 100644 --- a/depicts/pager.py +++ b/depicts/pager.py @@ -1,47 +1,80 @@ +"""Pagination.""" + +import typing from math import ceil -from flask import request, url_for + +from flask import Flask, request, url_for + +T = typing.TypeVar("T") + class Pagination(object): - def __init__(self, page, per_page, total_count): + """Pagination.""" + + page: int + per_page: int + total_count: int + + def __init__(self, page: int, per_page: int, total_count: int) -> None: + """Init.""" self.page = page self.per_page = per_page self.total_count = total_count @property - def pages(self): + def pages(self) -> int: + """Page count.""" return int(ceil(self.total_count / float(self.per_page))) @property - def has_prev(self): + def has_prev(self) -> bool: + """Has previous page.""" return self.page > 1 @property - def has_next(self): + def has_next(self) -> bool: + """Has next page.""" return self.page < self.pages - def slice(self, items): - first = ((self.page - 1) * self.per_page) + def slice(self, items: list[T]) -> list[T]: + """Slice of items 
for the current page."""
+        first = (self.page - 1) * self.per_page
         last = self.page * self.per_page
         return items[first:last]

-    def iter_pages(self, left_edge=2, left_current=6,
-                   right_current=6, right_edge=2):
+    def iter_pages(
+        self,
+        left_edge: int = 2,
+        left_current: int = 6,
+        right_current: int = 6,
+        right_edge: int = 2,
+    ) -> typing.Iterator[int | None]:
+        """Iterate page numbers."""
         last = 0
         for num in range(1, self.pages + 1):
-            if num <= left_edge or \
-               (num > self.page - left_current - 1 and \
-                num < self.page + right_current) or \
-               num > self.pages - right_edge:
+            if (
+                num <= left_edge
+                or (
+                    num > self.page - left_current - 1
+                    and num < self.page + right_current
+                )
+                or num > self.pages - right_edge
+            ):
                 if last + 1 != num:
                     yield None
                 yield num
                 last = num

-def url_for_other_page(page):
+
+def url_for_other_page(page: int) -> str:
+    """Make URL for other page."""
+    assert request.view_args is not None and request.endpoint
     args = request.view_args.copy()
     args.update(request.args)
-    args['page'] = page
+    args["page"] = page
     return url_for(request.endpoint, **args)

-def init_pager(app):
-    app.jinja_env.globals['url_for_other_page'] = url_for_other_page
+
+def init_pager(app: Flask) -> None:
+    """Initialise pager."""
+    app.jinja_env.globals["url_for_other_page"] = url_for_other_page
diff --git a/depicts/relaxed_ssl.py b/depicts/relaxed_ssl.py
index 1c70a3a..7678681 100644
--- a/depicts/relaxed_ssl.py
+++ b/depicts/relaxed_ssl.py
@@ -1,16 +1,25 @@
+"""Enable Python requests that ignore bad HTTPS certificates."""
+
+import typing
+
 import requests
 from requests.adapters import HTTPAdapter
 from requests.packages.urllib3.util.ssl_ import create_urllib3_context

-CIPHERS = 'DEFAULT@SECLEVEL=1'
+CIPHERS = "DEFAULT@SECLEVEL=1"
+

 class HTTPSAdapter(HTTPAdapter):
-    def init_poolmanager(self, *args, **kwargs):
-        context = create_urllib3_context(ciphers=CIPHERS)
-        kwargs['ssl_context'] = context
-        return super().init_poolmanager(*args, **kwargs)
+    """HTTPS Adapter subclass."""

-def get(*args, **kwargs):
+    def init_poolmanager(self, *args: typing.Any, **kwargs: typing.Any) -> None:
+        """Init pool manager."""
+        context = create_urllib3_context(ciphers=CIPHERS)
+        kwargs["ssl_context"] = context
+        return super().init_poolmanager(*args, **kwargs)  # type: ignore
+
+
+def get(*args: typing.Any, **kwargs: typing.Any) -> requests.Response:
     s = requests.Session()
-    s.mount('https://', HTTPSAdapter())
+    s.mount("https://", HTTPSAdapter())
     return s.get(*args, **kwargs, verify=False)
diff --git a/depicts/rijksmuseum.py b/depicts/rijksmuseum.py
index 86284a3..59fc7cc 100644
--- a/depicts/rijksmuseum.py
+++ b/depicts/rijksmuseum.py
@@ -1,34 +1,51 @@
-import requests
-import lxml.html
+"""Rijksmuseum (Q190804) - museum in Amsterdam, Netherlands."""
+
 import os
 import re

-re_url = re.compile(r'^https://www.rijksmuseum.nl/(?:nl/collectie|en/collection)/([^/]+)$')
+import lxml.html
+import requests

-def get_html(catalog_id):
-    filename = f'cache/rijksmuseum_{catalog_id}.html'
-    en_url = 'https://www.rijksmuseum.nl/en/collection/' + catalog_id
+from .type import CatalogDict
+
+re_url = re.compile(
+    r"^https://www.rijksmuseum.nl/(?:nl/collectie|en/collection)/([^/]+)$"
+)
+
+
+def get_html(catalog_id: str) -> str:
+    """Get HTML from web catalog."""
+    filename = f"cache/rijksmuseum_{catalog_id}.html"
+    en_url = "https://www.rijksmuseum.nl/en/collection/" + catalog_id
     if os.path.exists(filename):
         html = open(filename).read()
     else:
         r = requests.get(en_url)
         html = r.text
-        open(filename, 
'w').write(html) + open(filename, "w").write(html) return html -def parse_html(html): + +def parse_html(html: str) -> CatalogDict: + """Parse HTML and extract keywords.""" root = lxml.html.fromstring(html) - keywords = [a.text for a in root.findall('.//a[@href]') - if 'f.classification.iconClassDescription.sort' in a.get('href')] + keywords = [ + a.text + for a in root.findall(".//a[@href]") + if "f.classification.iconClassDescription.sort" in a.get("href") # type: ignore + ] return { - 'institution': 'Rijksmuseum', - 'keywords': keywords, + "institution": "Rijksmuseum", + "keywords": keywords, # type: ignore } -def get_catalog(url): - catalog_id = re_url.search(url).group(1) + +def get_catalog(url: str) -> CatalogDict: + """Get catalog web page and extract keywords.""" + assert (m := re_url.search(url)) + catalog_id = m.group(1) return parse_html(get_html(catalog_id)) diff --git a/depicts/saam.py b/depicts/saam.py index 1d58b7c..c9199d6 100644 --- a/depicts/saam.py +++ b/depicts/saam.py @@ -1,45 +1,59 @@ -import requests -import lxml.html +"""Smithsonian American Art Museum (Q1192305) - fine arts museum in Washington, D.C.""" + import json import os +import typing -def get_html(saam_id): - filename = f'cache/saam_{saam_id}.html' - url = 'http://americanart.si.edu/collections/search/artwork/' +import lxml.html +import requests + +from .type import CatalogDict, EmptyDict + + +def get_html(saam_id: str | int) -> str: + """Get HTML from web catalog.""" + filename = f"cache/saam_{saam_id}.html" + url = "http://americanart.si.edu/collections/search/artwork/" if os.path.exists(filename): html = open(filename).read() else: - r = requests.get(url, params={'id': saam_id}) + r = requests.get(url, params={"id": saam_id}) html = r.text - open(filename, 'w').write(html) + open(filename, "w").write(html) return html -def parse_html(html): + +def parse_html(html: str) -> dict[str, typing.Any] | None: + """Parse HTML and extract keywords.""" root = lxml.html.fromstring(html) ld_json = root.findtext('.//script[@type="application/ld+json"]') if ld_json is None: - return {'ld': {}, 'keywords': []} + return {"ld": {}, "keywords": []} ld = json.loads(ld_json) ul = root.find('.//ul[@class="ontology-list"]') if ul is None: - return - assert ul.tag == 'ul' + return None + assert ul.tag == "ul" keywords = [li.text for li in ul] - return {'ld': ld, 'keywords': keywords} + return {"ld": ld, "keywords": keywords} -def get_catalog(saam_id): + +def get_catalog(saam_id: int | str) -> CatalogDict | EmptyDict: + """Get catalog web page and extract keywords.""" data = parse_html(get_html(saam_id)) + empty: EmptyDict = {} if not data: - return {} - ret = { - 'institution': 'Smithsonian American Art Museum', + return empty + ret: CatalogDict = { + "institution": "Smithsonian American Art Museum", + "keywords": [], } - if data['keywords']: - ret['keywords'] = data['keywords'] - if 'description' in data['ld']: - ret['description'] = data['ld']['description'] + if data["keywords"]: + ret["keywords"] = data["keywords"] + if "description" in data["ld"]: + ret["description"] = data["ld"]["description"] - return ret if 'description' in ret or 'keywords' in ret else {} + return ret if "description" in ret or "keywords" in ret else empty diff --git a/depicts/type.py b/depicts/type.py new file mode 100644 index 0000000..3418f47 --- /dev/null +++ b/depicts/type.py @@ -0,0 +1,50 @@ +import typing +from typing import Required, TypedDict + +Claims = dict[str, list[dict[str, typing.Any]]] +Sitelinks = dict[str, dict[str, typing.Any]] + + 
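+# Shape sketch (added for illustration, not part of the original patch): a
+# Claims value maps a property ID to its list of statements, e.g.
+#   {"P18": [{"mainsnak": {"datavalue": {"value": "Example.jpg"}}}]}
+# Real snaks carry more fields (snaktype, datatype) than shown here.
+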
+class EmptyDict(TypedDict):
+    """Empty dict."""
+
+    pass
+
+
+class Entity(TypedDict, total=False):
+    """Wikidata Entity."""
+
+    id: str
+    labels: dict[str, typing.Any]
+    descriptions: dict[str, typing.Any]
+    claims: Claims
+    lastrevid: int
+    sitelinks: Sitelinks
+    modified: str
+    redirects: dict[str, typing.Any]
+    aliases: dict[str, list[dict[str, typing.Any]]]
+
+
+class CatalogDict(TypedDict, total=False):
+    """Catalog record from institution web site."""
+
+    institution: str
+    url: str
+    ids: set[str]
+    detail: list[dict[str, str]]
+    description: str
+    keywords: list[str]
+
+
+CallParams = dict[str, str | int]
+
+
+class HumanDict(TypedDict, total=False):
+    """Human."""
+
+    qid: Required[str]
+    year_of_birth: Required[int]
+    year_of_death: Required[int]
+
+    label: str
+    description: str
diff --git a/depicts/utils.py b/depicts/utils.py
index 9d4c93d..81dd8c1 100644
--- a/depicts/utils.py
+++ b/depicts/utils.py
@@ -1,91 +1,121 @@
-from flask import request
-from itertools import islice
-from datetime import datetime
+"""Various utility functions."""
+
+import typing
 import urllib.parse
+from datetime import datetime
+from itertools import islice
+
 import inflect
+from flask import request

 hosts = {
-    'commons': 'commons.wikimedia.org',
-    'enwiki': 'en.wikipedia.org',
-    'wikidata': 'www.wikidata.org',
+    "commons": "commons.wikimedia.org",
+    "enwiki": "en.wikipedia.org",
+    "wikidata": "www.wikidata.org",
 }

 engine = inflect.engine()

-skip_names = {
-    'National Gallery'
-}
+skip_names = {"National Gallery"}

-def ordinal(n):
-    return "%d%s" % (n, 'tsnrhtdd'[(n / 10 % 10 != 1) * (n % 10 < 4) * n % 10::4])

+T = typing.TypeVar("T")

-def chunk(it, size):
+
+def ordinal(n: int) -> str:
+    """Convert number to ordinal."""
+    # integer division: with float "/" the teens (11-13) wrongly get "st"/"nd"/"rd"
+    return "%d%s" % (n, "tsnrhtdd"[(n // 10 % 10 != 1) * (n % 10 < 4) * n % 10 :: 4])
+
+
+def chunk(it: typing.Iterable[T], size: int) -> typing.Iterator[tuple[T, ...]]:
+    """Split an iterable into chunks of the given size."""
     it = iter(it)
     return iter(lambda: tuple(islice(it, size)), ())

-def drop_start(s, start):
+
+def drop_start(s: str, start: str) -> str:
+    """Remove string prefix, otherwise throw an error."""
     assert s.startswith(start)
-    return s[len(start):]
+    return s[len(start) :]

-def drop_category_ns(s):
-    return drop_start(s, 'Category:')

-def parse_sitelink(s, start):
-    return urllib.parse.unquote(drop_start(s, start)).replace('_', ' ')
+def drop_category_ns(s: str) -> str:
+    """Remove 'Category:' from start of string."""
+    return drop_start(s, "Category:")

-def word_contains_letter(word):
+
+def parse_sitelink(s: str, start: str) -> str:
+    """Extract title from sitelink."""
+    return urllib.parse.unquote(drop_start(s, start)).replace("_", " ")
+
+
+def word_contains_letter(word: str) -> bool:
+    """Word contains letter."""
     return any(c.isalpha() for c in word)

-def also_singular(name):
+
+def also_singular(name: str) -> list[str]:
     names = also_singular_main(name)
     extra = []
     for n in names:
         words = set(n.lower().split())
-        for word in 'girl', 'boy':
+        for word in "girl", "boy":
             if word in words:
                 extra.append(word)
-        if {'female', 'females', 'women'} & words:
-            extra.append('woman')
-        if {'male', 'males', 'men'} & words:
-            extra.append('man')
+        if {"female", "females", "women"} & words:
+            extra.append("woman")
+        if {"male", "males", "men"} & words:
+            extra.append("man")
     return [n for n in names + extra if n not in skip_names]
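
+# Illustrative example (added, not part of the original patch):
+#   also_singular("angels") -> ["angels", "angel"]
+# phrases mentioning "women"/"men" also gain "woman"/"man".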

-def also_singular_main(name):
-    '''
+
+def also_singular_main(name: str) -> list[str]:
+    """
     given a singular name return a list of both the plural and singular versions

     just return the name if it isn't singular
-    '''
-    singular = engine.singular_noun(name.strip('|'))
+    """
+    singular = engine.singular_noun(name.strip("|"))
     if not singular:
         return [name]
     n, s = name.lower(), singular.lower()
-    if (n == s or
-            n.replace('paintings', '') == s.replace('painting', '') or
-            n == 'venus' and s == 'venu'):
+    if (
+        n == s
+        or n.replace("paintings", "") == s.replace("painting", "")
+        or n == "venus"
+        and s == "venu"
+    ):
         return [name]
     return [name, singular]

-def wiki_url(title, site, ns=None):
+
+def wiki_url(title: str, site: str, ns: str | None = None) -> str:
+    """Build mediawiki URL for given title."""
     host = hosts[site]
-    url_ns = ns + ':' if ns else ''
-    if not title:
-        return
+    url_ns = ns + ":" if ns else ""
+    assert title
     if title[0].islower():
         title = title[0].upper() + title[1:]

-    return f'https://{host}/wiki/' + url_ns + urllib.parse.quote(title.replace(' ', '_'))
+    return (
+        f"https://{host}/wiki/" + url_ns + urllib.parse.quote(title.replace(" ", "_"))
+    )

-def get_int_arg(name):
+
+def get_int_arg(name: str) -> int | None:
+    """Get a request arg and convert to integer."""
     if name in request.args and request.args[name].isdigit():
         return int(request.args[name])
+    else:
+        return None

-def format_time(time_value, precision):
+
+def format_time(time_value: str, precision: int) -> str:
+    """Format time with given precision."""
     # FIXME handle dates like '1965-04-00T00:00:00Z'
     # FIXME handle BC dates properly, "120 B.C." instead of "-120"
     year = None
-    if '-00' in time_value:
+    if "-00" in time_value:
         # can't be represented as python datetime
-        year = int(time_value[:time_value.find('-', 1)])
+        year = int(time_value[: time_value.find("-", 1)])
     else:
         try:
             t = datetime.strptime(time_value[1:], "%Y-%m-%dT%H:%M:%SZ")
@@ -96,10 +126,10 @@ def format_time(time_value, precision):
     if precision == 9:
         return str(year)
     if precision == 8:
-        return f'{year}s'
+        return f"{year}s"
     if precision == 7:
-        return f'{ordinal((year // 100) + 1)} century'
+        return f"{ordinal((year // 100) + 1)} century"
     if precision == 6:
-        return f'{ordinal((year // 1000) + 1)} millennium'
+        return f"{ordinal((year // 1000) + 1)} millennium"

     return time_value
diff --git a/depicts/wd_catalog.py b/depicts/wd_catalog.py
index 366b4d7..22bfa8a 100644
--- a/depicts/wd_catalog.py
+++ b/depicts/wd_catalog.py
@@ -1,147 +1,345 @@
-from depicts import (wikibase, relaxed_ssl, saam, dia, rijksmuseum, npg,
-                     museodelprado, barnesfoundation)
+import hashlib
+import os.path
+
+import lxml.html
 import requests
 import requests.exceptions
-import lxml.html
-import os.path
-import hashlib

-user_agent = 'Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0'
+from depicts import (
+    barnesfoundation,
+    dia,
+    museodelprado,
+    npg,
+    relaxed_ssl,
+    rijksmuseum,
+    saam,
+    wikibase,
+)
+
+from .type import CatalogDict, Entity
+
+user_agent = "Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0"

 table = {
-    'P347': ('Joconde ID', 'https://www.pop.culture.gouv.fr/notice/joconde/$1'),
-    'P350': ('RKDimages ID', 'https://rkd.nl/explore/images/$1'),
-    'P1212': ('Atlas ID', 'http://cartelen.louvre.fr/cartelen/visite?srv=car_not_frame&idNotice=$1'),
-    'P1428': ('Lost Art ID', 'http://www.lostart.de/EN/Verlust/$1'),
-    'P1679': ('Art UK artwork ID', 'https://artuk.org/discover/artworks/$1'),
-    'P1726': ('Florentine musea Inventario 1890 ID', 'http://www.polomuseale.firenze.it/inv1890/scheda.asp?position=1&ninv=$1'),
-    'P2014': ('Museum of Modern Art work ID', 
'http://www.moma.org/collection/works/$1'), - 'P2092': ('Bildindex der Kunst und Architektur ID', 'https://www.bildindex.de/document/obj$1'), - 'P2108': ('Kunstindeks Danmark artwork ID', 'https://www.kulturarv.dk/kid/VisVaerk.do?vaerkId=$1'), - 'P2242': ('Florentine musea catalogue ID', 'http://www.polomuseale.firenze.it/catalogo/scheda.asp?nctn=$1&value=1'), - 'P2282': ('Groeningemuseum work PID', 'http://groeningemuseum.be/collection/work/id/$1'), - 'P2344': ('AGORHA work ID', 'http://www.purl.org/inha/agorha/003/$1'), - 'P2511': ('MSK Gent work PID', 'http://mskgent.be/collection/work/id/$1'), - 'P2539': ('Nationalmuseum Sweden artwork ID', 'http://collection.nationalmuseum.se/eMuseumPlus?service=ExternalInterface&module=collection&objectId=$1&viewType=detailView'), - 'P2582': ('J. Paul Getty Museum object ID', 'http://www.getty.edu/art/collection/objects/$1'), - 'P3272': ('Zeri image ID', 'http://catalogo.fondazionezeri.unibo.it/scheda/opera/$1/'), - 'P3293': ('BALaT object ID', 'http://balat.kikirpa.be/object/$1'), - 'P3386': ('French Sculpture Census work ID', 'https://frenchsculpture.org/en/sculpture/$1'), - 'P3467': ('Inventario Sculture - Polo Museale Fiorentino', 'http://www.polomuseale.firenze.it/invSculture/scheda.asp?position=1&ninv=$1'), - 'P3504': ('Florentine Inventario Palatina art ID', 'http://www.polomuseale.firenze.it/invpalatina/scheda.asp?position=1&ninv=$1'), - 'P3634': ('The Met object ID', 'http://www.metmuseum.org/art/collection/search/$1'), - 'P3711': ('Vanderkrogt.net Statues ID', 'http://vanderkrogt.net/statues/object.php?record=$1'), - 'P3855': ('LombardiaBeniCulturali artwork ID', 'http://www.lombardiabeniculturali.it/opere-arte/schede/$1/'), - 'P3929': ('V&A item ID', 'http://collections.vam.ac.uk/item/$1'), - 'P4144': ('Athenaeum artwork ID', 'http://www.the-athenaeum.org/art/detail.php?id=$1'), - 'P4257': ('National Museums of Japan e-museum ID', 'http://www.emuseum.jp/detail/$1'), - 'P4373': ('National Trust Collections ID', 'http://www.nationaltrustcollections.org.uk/object/$1'), - 'P4380': ('Sandrart.net artwork ID', 'http://ta.sandrart.net/-artwork-$1'), - 'P4399': ('Enciclopédia Itaú Cultural ID', 'http://enciclopedia.itaucultural.org.br/$1'), - 'P4525': ('MuIS object ID', 'http://opendata.muis.ee/object/$1'), - 'P4564': ('Art Museum of Estonia artwork ID', 'https://digikogu.ekm.ee/oid-$1'), - 'P4582': ('Kulturelles Erbe Köln object ID', 'https://www.kulturelles-erbe-koeln.de/documents/obj/$1'), - 'P4610': ('ARTIC artwork ID', 'https://www.artic.edu/artworks/$1'), - 'P4611': ('LACMA ID', 'https://collections.lacma.org/node/$1'), - 'P4625': ('Museum of Fine Arts, Boston object ID', 'https://www.mfa.org/collections/object/$1'), - 'P4643': ('Philadelphia Museum of Art ID', 'http://www.philamuseum.org/collections/permanent/$1.html'), - 'P4659': ("Musée d'Orsay artwork ID", 'http://www.musee-orsay.fr/en/collections/index-of-works/notice.html?nnumid=$1'), - 'P4673': ('Museum of Fine Arts, Houston object ID', 'https://www.mfah.org/art/detail/$1'), - 'P4674': ('Indianapolis Museum of Art artwork ID', 'http://collection.imamuseum.org/artwork/$1/'), - 'P4683': ('National Gallery of Art artwork ID', 'https://www.nga.gov/content/ngaweb/Collection/art-object-page.$1.html'), - 'P4684': ('National Gallery of Victoria artwork ID', 'https://www.ngv.vic.gov.au/explore/collection/work/$1/'), - 'P4686': ('Carnegie Museum of Art ID', 'https://collection.cmoa.org/objects/$1'), - 'P4692': ('American Art Collaborative object ID', 
'http://browse.americanartcollaborative.org/object/$1.html'), - 'P4701': ('Google Arts & Culture asset ID', 'https://artsandculture.google.com/asset/wd/$1'), - 'P4704': ('Smithsonian American Art Museum ID', 'https://americanart.si.edu/collections/search/artwork/?id=$1'), - 'P4709': ('Barnes Foundation ID', 'https://collection.barnesfoundation.org/objects/$1/details'), - 'P4712': ('Minneapolis Institute of Art artwork ID', 'https://collections.artsmia.org/art/$1'), - 'P4713': ('Walters Art Museum ID', 'http://art.thewalters.org/detail/$1'), - 'P4721': ('MuBE Virtual ID', 'http://mubevirtual.com.br/pt_br?Dados&area=ver&id=$1'), - 'P4737': ('Solomon R. Guggenheim Foundation artwork ID', 'https://www.guggenheim.org/artwork/$1'), - 'P4738': ('Yale Center for British Art artwork ID', 'http://collections.britishart.yale.edu/vufind/Record/$1'), - 'P4739': ('Musée des Augustins artwork ID', 'https://www.augustins.org/fr/oeuvre/-/oeuvre/$1'), - 'P4740': ('Brooklyn Museum artwork ID', 'https://www.brooklynmuseum.org/opencollection/objects/$1'), - 'P4761': ("Images d'Art artwork ID", 'http://art.rmngp.fr/en/library/artworks/$1'), - 'P4764': ('Arcade artwork ID', 'http://www.culture.gouv.fr/public/mistral/arcade_fr?ACTION=CHERCHER&FIELD_1=REF&VALUE_1=$1'), - 'P4814': ('Inventories of American Painting and Sculpture control number', 'https://siris-artinventories.si.edu/ipac20/ipac.jsp?&menu=search&index=.NW&term=$1'), - 'P4905': ('KMSKA work PID', 'http://kmska.be/collection/work/id/$1'), - 'P5210': ('National Gallery of Armenia work ID', 'http://www.gallery.am/en/database/item/$1/'), - 'P5223': ('Information Center for Israeli Art artwork ID', 'http://museum.imj.org.il/artcenter/includes/item.asp?id=$1'), - 'P5265': ('Dordrechts Museum artwork ID', 'https://www.dordrechtsmuseum.nl/objecten/id/$1'), - 'P5268': ('MNAV work ID', 'http://acervo.mnav.gub.uy/obras.php?q=ni:$1'), - 'P5269': ('Web umenia work ID', 'https://www.webumenia.sk/dielo/$1'), - 'P5407': ('MHK object ID', 'http://datenbank.museum-kassel.de/$1'), - 'P5499': ('Boijmans work ID', 'https://www.boijmans.nl/en/collection/artworks/$1'), - 'P5783': ('Cranach Digital Archive artwork ID', 'http://lucascranach.org/$1'), - 'P5823': ('Belvedere object ID', 'https://digital.belvedere.at/objects/$1/'), - 'P5891': ('Bpk-ID', 'http://www.bpk-images.de/id/$1'), - 'P6004': ('Brasiliana Iconográfica ID', 'https://www.brasilianaiconografica.art.br/obras/$1/wd'), - 'P6007': ('Salons ID', 'http://salons.musee-orsay.fr/index/notice/$1'), - 'P6020': ("d'Art d'Art ! 
ID", 'https://www.france.tv/france-2/d-art-d-art/$1.html'), - 'P6141': ('À nos grands hommes ID', 'https://anosgrandshommes.musee-orsay.fr/index.php/Detail/objects/$1'), - 'P6152': ('National Portrait Gallery (United States) object ID', 'http://npg.si.edu/object/npg_$1'), - 'P6238': ('Monument aux morts ID', 'https://monumentsmorts.univ-lille.fr/monument/$1/wd/'), - 'P6239': ('IEC commemorative monument of Catalonia ID', 'https://monuments.iec.cat/fitxa.asp?id=$1'), - 'P6246': ('Paris Musées work ID', 'http://parismuseescollections.paris.fr/en/node/$1'), - 'P6310': ('Muséosphère work ID', 'http://museosphere.paris.fr/oeuvres/$1'), - 'P6332': ("Panorama de l'art ID", 'https://www.panoramadelart.com/$1'), - 'P6355': ('MNAM artwork ID', 'https://collection.centrepompidou.fr/#/artwork/$1'), - 'P6356': ('IHOI work ID', 'http://www.ihoi.org/app/photopro.sk/ihoi_icono/detail?docid=$1&lang=eng'), - 'P6358': ('Musée Picasso artwork ID', 'https://www.navigart.fr/picassoparis/#/artwork/$1'), - 'P6372': ('Interpol WOA artwork ID (OBSOLETE)', 'https://www.interpol.int/notice/search/woa/$1'), - 'P6374': ('MAMVP artwork ID', 'http://www.mam.paris.fr/en/online-collections#/artwork/$1'), - 'P6489': ('Joan Miró Online Image Bank ID', 'https://www.successiomiro.com/catalogue/object/$1'), - 'P6506': ('Eliseu Visconti Project ID', 'https://eliseuvisconti.com.br/obra/$1'), - 'P6565': ('Musenor artwork ID', 'https://webmuseo.com/ws/musenor/app/collection/record/$1'), - 'P6576': ('Art Fund artwork ID', 'https://www.artfund.org/supporting-museums/art-weve-helped-buy/artwork/$1/wd'), - 'P6595': ('Paintings by Salvador Dalí ID', 'https://www.salvador-dali.org/en/artwork/catalogue-raisonne/obra/$1/'), - 'P6610': ('Ashmolean museum ID', 'http://collections.ashmolean.org/object/$1'), - 'P6625': ('Salvador Dali Museum ID', 'http://archive.thedali.org/mwebcgi/mweb.exe?request=record;id=$1;type=101'), - 'P6629': ('Artcurial lot ID', 'https://www.artcurial.com/en/$1'), - 'P6631': ('Tainacan MHN ID', 'http://mhn.acervos.museus.gov.br/reserva-tecnica/$1'), - 'P6633': ('Cini Foundation ID', 'http://arte.cini.it/Opere/$1'), - 'P6643': ('TV Spielfilm series ID', 'https://www.tvspielfilm.de/serien/$1'), - 'P6738': ('Whitney Museum of American Art artwork ID', 'https://whitney.org/collection/works/$1'), - 'P7229': ('Fundación Goya en Aragón ID', 'https://fundaciongoyaenaragon.es/obra/wd/$1'), + "P347": ("Joconde ID", "https://www.pop.culture.gouv.fr/notice/joconde/$1"), + "P350": ("RKDimages ID", "https://rkd.nl/explore/images/$1"), + "P1212": ( + "Atlas ID", + "http://cartelen.louvre.fr/cartelen/visite?srv=car_not_frame&idNotice=$1", + ), + "P1428": ("Lost Art ID", "http://www.lostart.de/EN/Verlust/$1"), + "P1679": ("Art UK artwork ID", "https://artuk.org/discover/artworks/$1"), + "P1726": ( + "Florentine musea Inventario 1890 ID", + "http://www.polomuseale.firenze.it/inv1890/scheda.asp?position=1&ninv=$1", + ), + "P2014": ( + "Museum of Modern Art work ID", + "http://www.moma.org/collection/works/$1", + ), + "P2092": ( + "Bildindex der Kunst und Architektur ID", + "https://www.bildindex.de/document/obj$1", + ), + "P2108": ( + "Kunstindeks Danmark artwork ID", + "https://www.kulturarv.dk/kid/VisVaerk.do?vaerkId=$1", + ), + "P2242": ( + "Florentine musea catalogue ID", + "http://www.polomuseale.firenze.it/catalogo/scheda.asp?nctn=$1&value=1", + ), + "P2282": ( + "Groeningemuseum work PID", + "http://groeningemuseum.be/collection/work/id/$1", + ), + "P2344": ("AGORHA work ID", "http://www.purl.org/inha/agorha/003/$1"), + "P2511": 
("MSK Gent work PID", "http://mskgent.be/collection/work/id/$1"), + "P2539": ( + "Nationalmuseum Sweden artwork ID", + "http://collection.nationalmuseum.se/eMuseumPlus?service=ExternalInterface&module=collection&objectId=$1&viewType=detailView", + ), + "P2582": ( + "J. Paul Getty Museum object ID", + "http://www.getty.edu/art/collection/objects/$1", + ), + "P3272": ( + "Zeri image ID", + "http://catalogo.fondazionezeri.unibo.it/scheda/opera/$1/", + ), + "P3293": ("BALaT object ID", "http://balat.kikirpa.be/object/$1"), + "P3386": ( + "French Sculpture Census work ID", + "https://frenchsculpture.org/en/sculpture/$1", + ), + "P3467": ( + "Inventario Sculture - Polo Museale Fiorentino", + "http://www.polomuseale.firenze.it/invSculture/scheda.asp?position=1&ninv=$1", + ), + "P3504": ( + "Florentine Inventario Palatina art ID", + "http://www.polomuseale.firenze.it/invpalatina/scheda.asp?position=1&ninv=$1", + ), + "P3634": ("The Met object ID", "http://www.metmuseum.org/art/collection/search/$1"), + "P3711": ( + "Vanderkrogt.net Statues ID", + "http://vanderkrogt.net/statues/object.php?record=$1", + ), + "P3855": ( + "LombardiaBeniCulturali artwork ID", + "http://www.lombardiabeniculturali.it/opere-arte/schede/$1/", + ), + "P3929": ("V&A item ID", "http://collections.vam.ac.uk/item/$1"), + "P4144": ( + "Athenaeum artwork ID", + "http://www.the-athenaeum.org/art/detail.php?id=$1", + ), + "P4257": ( + "National Museums of Japan e-museum ID", + "http://www.emuseum.jp/detail/$1", + ), + "P4373": ( + "National Trust Collections ID", + "http://www.nationaltrustcollections.org.uk/object/$1", + ), + "P4380": ("Sandrart.net artwork ID", "http://ta.sandrart.net/-artwork-$1"), + "P4399": ( + "Enciclopédia Itaú Cultural ID", + "http://enciclopedia.itaucultural.org.br/$1", + ), + "P4525": ("MuIS object ID", "http://opendata.muis.ee/object/$1"), + "P4564": ("Art Museum of Estonia artwork ID", "https://digikogu.ekm.ee/oid-$1"), + "P4582": ( + "Kulturelles Erbe Köln object ID", + "https://www.kulturelles-erbe-koeln.de/documents/obj/$1", + ), + "P4610": ("ARTIC artwork ID", "https://www.artic.edu/artworks/$1"), + "P4611": ("LACMA ID", "https://collections.lacma.org/node/$1"), + "P4625": ( + "Museum of Fine Arts, Boston object ID", + "https://www.mfa.org/collections/object/$1", + ), + "P4643": ( + "Philadelphia Museum of Art ID", + "http://www.philamuseum.org/collections/permanent/$1.html", + ), + "P4659": ( + "Musée d'Orsay artwork ID", + "http://www.musee-orsay.fr/en/collections/index-of-works/notice.html?nnumid=$1", + ), + "P4673": ( + "Museum of Fine Arts, Houston object ID", + "https://www.mfah.org/art/detail/$1", + ), + "P4674": ( + "Indianapolis Museum of Art artwork ID", + "http://collection.imamuseum.org/artwork/$1/", + ), + "P4683": ( + "National Gallery of Art artwork ID", + "https://www.nga.gov/content/ngaweb/Collection/art-object-page.$1.html", + ), + "P4684": ( + "National Gallery of Victoria artwork ID", + "https://www.ngv.vic.gov.au/explore/collection/work/$1/", + ), + "P4686": ("Carnegie Museum of Art ID", "https://collection.cmoa.org/objects/$1"), + "P4692": ( + "American Art Collaborative object ID", + "http://browse.americanartcollaborative.org/object/$1.html", + ), + "P4701": ( + "Google Arts & Culture asset ID", + "https://artsandculture.google.com/asset/wd/$1", + ), + "P4704": ( + "Smithsonian American Art Museum ID", + "https://americanart.si.edu/collections/search/artwork/?id=$1", + ), + "P4709": ( + "Barnes Foundation ID", + "https://collection.barnesfoundation.org/objects/$1/details", 
+ ), + "P4712": ( + "Minneapolis Institute of Art artwork ID", + "https://collections.artsmia.org/art/$1", + ), + "P4713": ("Walters Art Museum ID", "http://art.thewalters.org/detail/$1"), + "P4721": ( + "MuBE Virtual ID", + "http://mubevirtual.com.br/pt_br?Dados&area=ver&id=$1", + ), + "P4737": ( + "Solomon R. Guggenheim Foundation artwork ID", + "https://www.guggenheim.org/artwork/$1", + ), + "P4738": ( + "Yale Center for British Art artwork ID", + "http://collections.britishart.yale.edu/vufind/Record/$1", + ), + "P4739": ( + "Musée des Augustins artwork ID", + "https://www.augustins.org/fr/oeuvre/-/oeuvre/$1", + ), + "P4740": ( + "Brooklyn Museum artwork ID", + "https://www.brooklynmuseum.org/opencollection/objects/$1", + ), + "P4761": ("Images d'Art artwork ID", "http://art.rmngp.fr/en/library/artworks/$1"), + "P4764": ( + "Arcade artwork ID", + "http://www.culture.gouv.fr/public/mistral/arcade_fr?ACTION=CHERCHER&FIELD_1=REF&VALUE_1=$1", + ), + "P4814": ( + "Inventories of American Painting and Sculpture control number", + "https://siris-artinventories.si.edu/ipac20/ipac.jsp?&menu=search&index=.NW&term=$1", + ), + "P4905": ("KMSKA work PID", "http://kmska.be/collection/work/id/$1"), + "P5210": ( + "National Gallery of Armenia work ID", + "http://www.gallery.am/en/database/item/$1/", + ), + "P5223": ( + "Information Center for Israeli Art artwork ID", + "http://museum.imj.org.il/artcenter/includes/item.asp?id=$1", + ), + "P5265": ( + "Dordrechts Museum artwork ID", + "https://www.dordrechtsmuseum.nl/objecten/id/$1", + ), + "P5268": ("MNAV work ID", "http://acervo.mnav.gub.uy/obras.php?q=ni:$1"), + "P5269": ("Web umenia work ID", "https://www.webumenia.sk/dielo/$1"), + "P5407": ("MHK object ID", "http://datenbank.museum-kassel.de/$1"), + "P5499": ("Boijmans work ID", "https://www.boijmans.nl/en/collection/artworks/$1"), + "P5783": ("Cranach Digital Archive artwork ID", "http://lucascranach.org/$1"), + "P5823": ("Belvedere object ID", "https://digital.belvedere.at/objects/$1/"), + "P5891": ("Bpk-ID", "http://www.bpk-images.de/id/$1"), + "P6004": ( + "Brasiliana Iconográfica ID", + "https://www.brasilianaiconografica.art.br/obras/$1/wd", + ), + "P6007": ("Salons ID", "http://salons.musee-orsay.fr/index/notice/$1"), + "P6020": ("d'Art d'Art ! 
ID", "https://www.france.tv/france-2/d-art-d-art/$1.html"), + "P6141": ( + "À nos grands hommes ID", + "https://anosgrandshommes.musee-orsay.fr/index.php/Detail/objects/$1", + ), + "P6152": ( + "National Portrait Gallery (United States) object ID", + "http://npg.si.edu/object/npg_$1", + ), + "P6238": ( + "Monument aux morts ID", + "https://monumentsmorts.univ-lille.fr/monument/$1/wd/", + ), + "P6239": ( + "IEC commemorative monument of Catalonia ID", + "https://monuments.iec.cat/fitxa.asp?id=$1", + ), + "P6246": ( + "Paris Musées work ID", + "http://parismuseescollections.paris.fr/en/node/$1", + ), + "P6310": ("Muséosphère work ID", "http://museosphere.paris.fr/oeuvres/$1"), + "P6332": ("Panorama de l'art ID", "https://www.panoramadelart.com/$1"), + "P6355": ("MNAM artwork ID", "https://collection.centrepompidou.fr/#/artwork/$1"), + "P6356": ( + "IHOI work ID", + "http://www.ihoi.org/app/photopro.sk/ihoi_icono/detail?docid=$1&lang=eng", + ), + "P6358": ( + "Musée Picasso artwork ID", + "https://www.navigart.fr/picassoparis/#/artwork/$1", + ), + "P6372": ( + "Interpol WOA artwork ID (OBSOLETE)", + "https://www.interpol.int/notice/search/woa/$1", + ), + "P6374": ( + "MAMVP artwork ID", + "http://www.mam.paris.fr/en/online-collections#/artwork/$1", + ), + "P6489": ( + "Joan Miró Online Image Bank ID", + "https://www.successiomiro.com/catalogue/object/$1", + ), + "P6506": ("Eliseu Visconti Project ID", "https://eliseuvisconti.com.br/obra/$1"), + "P6565": ( + "Musenor artwork ID", + "https://webmuseo.com/ws/musenor/app/collection/record/$1", + ), + "P6576": ( + "Art Fund artwork ID", + "https://www.artfund.org/supporting-museums/art-weve-helped-buy/artwork/$1/wd", + ), + "P6595": ( + "Paintings by Salvador Dalí ID", + "https://www.salvador-dali.org/en/artwork/catalogue-raisonne/obra/$1/", + ), + "P6610": ("Ashmolean museum ID", "http://collections.ashmolean.org/object/$1"), + "P6625": ( + "Salvador Dali Museum ID", + "http://archive.thedali.org/mwebcgi/mweb.exe?request=record;id=$1;type=101", + ), + "P6629": ("Artcurial lot ID", "https://www.artcurial.com/en/$1"), + "P6631": ("Tainacan MHN ID", "http://mhn.acervos.museus.gov.br/reserva-tecnica/$1"), + "P6633": ("Cini Foundation ID", "http://arte.cini.it/Opere/$1"), + "P6643": ("TV Spielfilm series ID", "https://www.tvspielfilm.de/serien/$1"), + "P6738": ( + "Whitney Museum of American Art artwork ID", + "https://whitney.org/collection/works/$1", + ), + "P7229": ( + "Fundación Goya en Aragón ID", + "https://fundaciongoyaenaragon.es/obra/wd/$1", + ), } -def lookup(property_id, value): + +def lookup(property_id: str, value: str) -> dict[str, str]: + """Lookup property ID.""" label, formatter = table[property_id] - url = formatter.replace('$1', value) + url = formatter.replace("$1", value) return { - 'label': label, - 'url': url, - 'value': value, + "label": label, + "url": url, + "value": value, } -def find_catalog_id(entity): - return table.keys() & entity['claims'].keys() -def check_catalog(entity, catalog): - catalog_url = catalog['url'] - catalog_ids = catalog['ids'] +def find_catalog_id(entity: Entity) -> set[str]: + """Find catalog property IDs that appear in entity claims.""" + keys: set[str] = table.keys() & entity["claims"].keys() + return keys - if 'P4704' in entity['claims']: - saam_id = wikibase.first_datavalue(entity, 'P4704') + +def check_catalog(entity: Entity, catalog: CatalogDict) -> None: + """Check catalog.""" + catalog_url = catalog["url"] + catalog_ids = catalog["ids"] + + if "P4704" in entity["claims"]: + saam_id = 
wikibase.first_datavalue(entity, "P4704") + assert saam_id and isinstance(saam_id, (str, int)) cat = saam.get_catalog(saam_id) if cat: catalog.update(cat) - return + return None - if 'P4709' in entity['claims']: - catalog_id = wikibase.first_datavalue(entity, 'P4709') + if "P4709" in entity["claims"]: + catalog_id = wikibase.first_datavalue(entity, "P4709") + assert catalog_id and isinstance(catalog_id, (str, int)) cat = barnesfoundation.get_catalog(catalog_id) if cat: catalog.update(cat) - return + return None institutions = [ - ('www.dia.org', dia), - ('www.rijksmuseum.nl', rijksmuseum), - ('www.npg.org.uk', npg), - ('www.museodelprado.es', museodelprado), + ("www.dia.org", dia), + ("www.rijksmuseum.nl", rijksmuseum), + ("www.npg.org.uk", npg), + ("www.museodelprado.es", museodelprado), ] if catalog_url: @@ -151,25 +349,26 @@ def check_catalog(entity, catalog): if not cat: continue catalog.update(cat) - return + return None try: html = get_catalog_url(catalog_url) if html: description = get_description_from_page(html) if description: - catalog['description'] = description - return + catalog["description"] = description + return None except UnicodeDecodeError: - return + return None for property_id in sorted(catalog_ids): - if property_id == 'P350': + if property_id == "P350": continue # RKDimages ID value = wikibase.first_datavalue(entity, property_id) # identifier can be 'no value', example: Q26754456 if value is None: continue + assert isinstance(value, str) detail = lookup(property_id, value) try: html = get_catalog_page(property_id, value) @@ -181,11 +380,13 @@ def check_catalog(entity, catalog): if not description: continue catalog = { - 'institution': detail['label'], - 'description': description, + "institution": detail["label"], + "description": description, } -def get_catalog_from_artwork(entity): + +def get_catalog_from_artwork(entity: Entity) -> CatalogDict: + """Get catalog from artwork.""" catalog_ids = find_catalog_id(entity) catalog_detail = [] for property_id in sorted(catalog_ids): @@ -193,47 +394,54 @@ def get_catalog_from_artwork(entity): # identifier can be 'no value', example: Q26754456 if value is None: continue + assert isinstance(value, str) detail = lookup(property_id, value) catalog_detail.append(detail) - catalog = { - 'url': wikibase.first_datavalue(entity, 'P973'), - 'detail': catalog_detail, - 'ids': catalog_ids, + url = wikibase.first_datavalue(entity, "P973") + assert isinstance(url, str) + catalog: CatalogDict = { + "url": url, + "detail": catalog_detail, + "ids": catalog_ids, } try: check_catalog(entity, catalog) - except (requests.exceptions.ReadTimeout, - requests.exceptions.ConnectTimeout, - requests.exceptions.ConnectionError, - requests.exceptions.TooManyRedirects): + except ( + requests.exceptions.ReadTimeout, + requests.exceptions.ConnectTimeout, + requests.exceptions.ConnectionError, + requests.exceptions.TooManyRedirects, + ): pass return catalog -def get_description_from_page(html): + +def get_description_from_page(html: bytes) -> str | None: + """Check HTML for description of artwork.""" if not html: - return + return None root = lxml.html.fromstring(html) div = root.find('.//div[@itemprop="description"]') if div is not None: return div.text - div_list = root.find_class('item-description') + div_list = root.find_class("item-description") if len(div_list): return div_list[0].text_content() meta_twitter_description = root.find('.//meta[@name="twitter:description"]') if meta_twitter_description is None: - return - twitter_description = 
meta_twitter_description.get('content') + return None + twitter_description = meta_twitter_description.get("content") if not twitter_description: - return + return None twitter_description = twitter_description.strip() if not twitter_description: - return + return None for element in root.getiterator(): if not element.text: @@ -246,33 +454,35 @@ def get_description_from_page(html): return twitter_description -def get_catalog_page(property_id, value): + +def get_catalog_page(property_id: str, value: str) -> bytes: + """Get catalog page.""" detail = lookup(property_id, value) - url = detail['url'] - catalog_id = value.replace('/', '_') + url = detail["url"] + catalog_id = value.replace("/", "_") - filename = f'cache/{property_id}_{catalog_id}.html' + filename = f"cache/{property_id}_{catalog_id}.html" if os.path.exists(filename): - html = open(filename, 'rb').read() + html = open(filename, "rb").read() else: - r = requests.get(url, headers={'User-Agent': user_agent}, timeout=2) + r = requests.get(url, headers={"User-Agent": user_agent}, timeout=2) html = r.content - open(filename, 'wb').write(html) + open(filename, "wb").write(html) return html -def get_catalog_url(url): - md5_filename = hashlib.md5(url.encode('utf-8')).hexdigest() + '.html' - filename = 'cache/' + md5_filename + +def get_catalog_url(url: str) -> bytes: + """Get catalog URL and cache.""" + md5_filename = hashlib.md5(url.encode("utf-8")).hexdigest() + ".html" + filename = "cache/" + md5_filename if os.path.exists(filename): - html = open(filename, 'rb').read() + html = open(filename, "rb").read() else: - r = relaxed_ssl.get(url, - headers={'User-Agent': user_agent}, - timeout=2) + r = relaxed_ssl.get(url, headers={"User-Agent": user_agent}, timeout=2) html = r.content - open(filename, 'wb').write(html) + open(filename, "wb").write(html) return html diff --git a/depicts/wdqs.py b/depicts/wdqs.py index 9da2b1a..2e12adf 100644 --- a/depicts/wdqs.py +++ b/depicts/wdqs.py @@ -1,59 +1,93 @@ -import requests -import json -import urllib.parse -import os -import dateutil.parser +"""Wikidata query service.""" + import hashlib -from flask import request, render_template, g +import json +import os +import typing +import urllib.parse from collections import defaultdict from datetime import datetime -from .model import WikidataQuery -from . import utils, database -query_url = 'https://query.wikidata.org/bigdata/namespace/wdq/sparql' -url_start = 'http://www.wikidata.org/entity/Q' -commons_start = 'http://commons.wikimedia.org/wiki/Special:FilePath/' +import dateutil.parser +import requests +from flask import g, render_template, request + +from . 
import database, utils +from .model import WikidataQuery + +query_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql" +url_start = "http://www.wikidata.org/entity/Q" +commons_start = "http://commons.wikimedia.org/wiki/Special:FilePath/" + +Row = dict[str, dict[str, typing.Any]] + class QueryError(Exception): - def __init__(self, query, r): + """Query error.""" + + query: str + r: requests.Response + + def __init__(self, query: str, r: requests.Response) -> None: + """Init.""" self.query = query self.r = r + class QueryTimeout(QueryError): - def __init__(self, query, r): + """Query timeout.""" + + def __init__(self, query: str, r: requests.Response) -> None: + """Init.""" self.query = query self.r = r -def row_id(row, field='item'): - return int(utils.drop_start(row[field]['value'], url_start)) -def get_row_value(row, field): - return row[field]['value'] if field in row else None +def row_id(row: Row, field: str = "item") -> int: + """Get item_id for row.""" + return int(utils.drop_start(row[field]["value"], url_start)) -def get_row_text(row, field): - if field in row and 'xml:lang' in row[field]: - return row[field]['value'] -def commons_uri_to_filename(uri): +def get_row_value(row: Row, field: str) -> str | None: + """Get value from row field.""" + return typing.cast(str, row[field]["value"]) if field in row else None + + +def get_row_text(row: Row, field: str) -> str | None: + """Get text from row field.""" + return row[field]["value"] if field in row and "xml:lang" in row[field] else None + + +def commons_uri_to_filename(uri: str) -> str: + """Commons URI to filename.""" return urllib.parse.unquote(utils.drop_start(uri, commons_start)) -def run_from_template(template_name, **context): + +def run_from_template(template_name: str, **context: typing.Any) -> requests.Response: query = render_template(template_name, **context) return run_query(query, query_template=template_name) -def run_from_template_with_cache(template_name, cache_name=None, **context): + +def run_from_template_with_cache( + template_name: str, cache_name: str | None = None, **context: typing.Any +) -> list[Row]: query = render_template(template_name, **context) return run_query_with_cache(query, name=cache_name, query_template=template_name) -def run_query(query, **kwargs): + +def run_query(query: str, **kwargs: typing.Any) -> requests.Response: + """Run WDQS query.""" r, db_query = record_query(query, **kwargs) return r -def record_query(query, query_template=None): - params = {'query': query, 'format': 'json'} + +def record_query( + query: str, query_template: str | None = None +) -> tuple[requests.Response, WikidataQuery]: + params = {"query": query, "format": "json"} start = datetime.utcnow() - path = request.full_path.rstrip('?') if request else None + path = request.full_path.rstrip("?") if request else None endpoint = request.endpoint if request else None db_query = WikidataQuery( @@ -61,8 +95,9 @@ def record_query(query, query_template=None): sparql_query=query, path=path, query_template=query_template, - page_title=getattr(g, 'title', None), - endpoint=endpoint) + page_title=getattr(g, "title", None), + endpoint=endpoint, + ) database.session.add(db_query) database.session.commit() @@ -74,78 +109,84 @@ def record_query(query, query_template=None): db_query.error_text = r.text database.session.commit() - if 'java.util.concurrent.TimeoutException' in r.text: - raise QueryTimeout(params, r) + if "java.util.concurrent.TimeoutException" in r.text: + raise QueryTimeout(query, r) else: - raise 
 
     database.session.commit()
 
     return r, db_query
 
-def md5_query(query):
-    ''' generate the md5 hexdigest of a SPARQL query '''
-    return hashlib.md5(query.encode('utf-8')).hexdigest()
 
-def run_query_with_cache(q, name=None, query_template=None):
+def md5_query(query: str) -> str:
+    """Generate the MD5 hexdigest of a SPARQL query."""
+    return hashlib.md5(query.encode("utf-8")).hexdigest()
+
+
+def run_query_with_cache(
+    q: str, name: str | None = None, query_template: str | None = None
+) -> list[Row]:
+    """Run a SPARQL query, reusing cached bindings when available."""
     if name is None:
         name = md5_query(q)
-    filename = f'cache/{name}.json'
+    filename = f"cache/{name}.json"
     if os.path.exists(filename):
         from_cache = json.load(open(filename))
-        if isinstance(from_cache, dict) and from_cache.get('query') == q:
-            return from_cache['bindings']
+        if isinstance(from_cache, dict) and from_cache.get("query") == q:
+            return typing.cast(list[Row], from_cache["bindings"])
 
     r, db_query = record_query(q, query_template=query_template)
-    bindings = r.json()['results']['bindings']
-    json.dump({'query': q, 'bindings': bindings},
-              open(filename, 'w'), indent=2)
+    bindings: list[Row] = r.json()["results"]["bindings"]
+    json.dump({"query": q, "bindings": bindings}, open(filename, "w"), indent=2)
 
     db_query.row_count = len(bindings)
     database.session.commit()
 
     return bindings
 
-def format_time(row_time, row_timeprecision):
-    t = dateutil.parser.parse(row_time['value'])
-    precision = int(row_timeprecision['value'])
+
+def format_time(row_time: dict[str, str], row_timeprecision: dict[str, str]) -> str:
+    """Format time with given precision."""
+    t = dateutil.parser.parse(row_time["value"])
+    precision = int(row_timeprecision["value"])
 
     if precision == 9:
-        return t.year
+        return str(t.year)
     if precision == 8:
-        return f'{t.year}s'
+        return f"{t.year}s"
     if precision == 7:
-        return f'{utils.ordinal((t.year // 100) + 1)} century'
+        return f"{utils.ordinal((t.year // 100) + 1)} century"
     if precision == 6:
-        return f'{utils.ordinal((t.year // 1000) + 1)} millennium'
+        return f"{utils.ordinal((t.year // 1000) + 1)} millennium"
 
-    return row_time['value']
+    return row_time["value"]
 
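
format_time maps Wikidata's numeric date precisions (9 = year, 8 = decade,
7 = century, 6 = millennium) onto readable strings. A quick sketch with
WDQS-style fields (the values are invented for illustration):

    from depicts import wdqs

    # Precision 9: precise to the year.
    wdqs.format_time({"value": "1503-01-01T00:00:00Z"}, {"value": "9"})
    # -> "1503"

    # Precision 7: rounded to the century.
    wdqs.format_time({"value": "1503-01-01T00:00:00Z"}, {"value": "7"})
    # -> "16th century"
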
-def build_browse_item_map(bindings):
+
+def build_browse_item_map(bindings: list[Row]) -> dict[int, dict[str, typing.Any]]:
+    """Group browse query rows into one summary per item."""
     row_map = defaultdict(list)
 
     for row in bindings:
         item_id = row_id(row)
-        label = row['itemLabel']['value']
-        image_filename = commons_uri_to_filename(row['image']['value'])
+        label = row["itemLabel"]["value"]
+        image_filename = commons_uri_to_filename(row["image"]["value"])
 
-        artist_name = get_row_value(row, 'artistLabel')
+        artist_name = get_row_value(row, "artistLabel")
 
-        d = format_time(row['time'], row['timeprecision']) if 'time' in row else None
-        row_qid = f'Q{item_id}'
+        d = format_time(row["time"], row["timeprecision"]) if "time" in row else None
+        row_qid = f"Q{item_id}"
         item = {
-            'image_filename': image_filename,
-            'date': d,
-            'depicts': row['depictsList']['value'].split('|'),
+            "image_filename": image_filename,
+            "date": d,
+            "depicts": row["depictsList"]["value"].split("|"),
         }
         if artist_name:
-            item['artist_name'] = artist_name
+            item["artist_name"] = artist_name
         if label != row_qid:
-            item['label'] = label
+            item["label"] = label
 
-        title = get_row_value(row, 'title')
+        title = get_row_value(row, "title")
         if title:
-            lang = get_row_value(row, 'titleLang')
-            item['title'] = (lang, title)
+            lang = get_row_value(row, "titleLang")
+            item["title"] = (lang, title)
 
         row_map[item_id].append(item)
 
@@ -158,53 +199,59 @@ def build_browse_item_map(bindings):
         when = None
         depicts = []
         for item in items:
-            if 'title' in item:
-                lang, title = item['title']
+            if "title" in item:
+                lang, title = item["title"]
                 titles[lang] = title
-            filenames.add(item['image_filename'])
-            artist_name = item.get('artist_name')
+            filenames.add(item["image_filename"])
+            artist_name = item.get("artist_name")
             if artist_name and artist_name not in artist_names:
                 artist_names.append(artist_name)
-            if 'label' in item:
-                labels.add(item['label'])
-            if when is None and item.get('date'):
-                when = item['date']
-            for d in item['depicts']:
+            if "label" in item:
+                labels.add(item["label"])
+            if when is None and item.get("date"):
+                when = item["date"]
+            for d in item["depicts"]:
                 if d not in depicts:
                     depicts.append(d)
 
         item = {
-            'qid': f'Q{item_id}',
-            'item_id': item_id,
-            'image_filename': list(filenames),
-            'artist_name': ', '.join(artist_names),
-            'date': when,
-            'depicts': depicts,
+            "qid": f"Q{item_id}",
+            "item_id": item_id,
+            "image_filename": list(filenames),
+            "artist_name": ", ".join(artist_names),
+            "date": when,
+            "depicts": depicts,
         }
         if artist_names:
-            item['artist_name'] = ', '.join(artist_names)
+            item["artist_name"] = ", ".join(artist_names)
 
         if labels:
             assert len(labels) == 1
-            item['label'] = list(labels)[0]
-        elif 'en' in titles:
-            item['label'] = titles['en']
+            item["label"] = list(labels)[0]
+        elif "en" in titles:
+            item["label"] = titles["en"]
         else:
-            item['label'] = '[ label missing ]'
+            item["label"] = "[ label missing ]"
 
         item_map[item_id] = item
 
     return item_map
 
-def quote_list(l):
-    no_dups = list(dict.fromkeys(l))  # remove duplicates
-    return ' '.join('("' + s.replace('"', '\\"') + '")' for s in no_dups)
 
-def url_list(l):
-    no_dups = list(dict.fromkeys(l))  # remove duplicates
-    return ' '.join(f'(<{s}>)' for s in no_dups)
+def quote_list(list_of_strings: list[str]) -> str:
+    """Quote strings and combine them into a list for a SPARQL query."""
+    no_dups = list(dict.fromkeys(list_of_strings))  # remove duplicates
+    return " ".join('("' + s.replace('"', '\\"') + '")' for s in no_dups)
 
-def is_artificial_physical_object(qid):
-    bindings = run_from_template_with_cache('query/item_type.sparql', qid=qid)
-    types = {row_id(row, field='item') for row in bindings}
+
+def url_list(urls: list[str]) -> str:
+    """Combine URLs into a list for a SPARQL query."""
+    no_dups = list(dict.fromkeys(urls))  # remove duplicates
+    return " ".join(f"(<{s}>)" for s in no_dups)
+
+
+def is_artificial_physical_object(qid: str) -> bool:
+    """Check whether the item is an artificial physical object (Q8205328)."""
+    bindings = run_from_template_with_cache("query/item_type.sparql", qid=qid)
+    types = {row_id(row, field="item") for row in bindings}
     # Q8205328 == artificial physical object
     return 8205328 in types
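
quote_list and url_list build the parenthesised lists that a SPARQL VALUES
clause expects, dropping duplicates along the way; presumably the query
templates interpolate their output. A small sketch:

    from depicts import wdqs

    names = ["Mona Lisa", "The Night Watch", "Mona Lisa"]  # duplicate dropped
    wdqs.quote_list(names)
    # -> '("Mona Lisa") ("The Night Watch")'
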
diff --git a/depicts/wikibase.py b/depicts/wikibase.py
index 0c2a4d0..c7c1a52 100644
--- a/depicts/wikibase.py
+++ b/depicts/wikibase.py
@@ -1,25 +1,51 @@
-def first_datavalue(entity, pid):
-    if pid in entity['claims']:
-        mainsnak = entity['claims'][pid][0]['mainsnak']
-        if 'datavalue' in mainsnak:
-            return mainsnak['datavalue']['value']
+"""Wikibase functions."""
 
-def get_entity_label(entity):
-    if 'labels' not in entity:
+import typing
+
+from .type import Entity
+
+
+def first_datavalue(
+    entity: Entity, pid: str
+) -> str | int | None | dict[str, typing.Any]:
+    """Get first datavalue from claim."""
+    if pid in entity["claims"]:
+        mainsnak = entity["claims"][pid][0]["mainsnak"]
+        if "datavalue" in mainsnak:
+            v = mainsnak["datavalue"]["value"]
+            assert isinstance(v, str | int)
+            return v
+    return None
+
+
+def get_entity_label(entity: Entity) -> str | None:
+    """Get entity label, preferring English."""
+    if "labels" not in entity:
         return None
-    if 'en' in entity['labels']:
-        return entity['labels']['en']['value']
+    label: str
+    if "en" in entity["labels"]:
+        label = entity["labels"]["en"]["value"]
+        assert isinstance(label, str)
+        return label
 
-    label_values = {l['value'] for l in entity['labels'].values()}
+    label_values = {lang["value"] for lang in entity["labels"].values()}
     if len(label_values) == 1:
-        return list(label_values)[0]
+        label = list(label_values)[0]
+        assert isinstance(label, str)
+        return label
+
     return None
 
-def get_en_value(entity, key):
-    if 'en' in entity[key]:
-        return entity[key]['en']['value']
 
-def get_en_label(entity):
-    return get_en_value(entity, 'labels')
+def get_en_value(entity: Entity, key: str) -> str | None:
+    """Get English value from label or description."""
+    return entity[key]["en"]["value"] if "en" in entity[key] else None  # type: ignore
 
-def get_en_description(entity):
-    return get_en_value(entity, 'descriptions')
+
+def get_en_label(entity: Entity) -> str | None:
+    """Get English label."""
+    return get_en_value(entity, "labels")
+
+
+def get_en_description(entity: Entity) -> str | None:
+    """Get English description of entity."""
+    return get_en_value(entity, "descriptions")
diff --git a/depicts/wikidata_edit.py b/depicts/wikidata_edit.py
index b9e1738..dff4209 100644
--- a/depicts/wikidata_edit.py
+++ b/depicts/wikidata_edit.py
@@ -1,17 +1,24 @@
+"""Make an edit to Wikidata."""
+
 from depicts import mediawiki, wikibase
 from depicts.model import DepictsItem
 
-def create_depicts_item(item_id):
-    qid = f'Q{item_id}'
-    entity = mediawiki.get_entity(qid)
 
-    if 'en' in entity['aliases']:
-        alt_labels = {alt['value'] for alt in entity['aliases']['en']}
+def create_depicts_item(item_id: int) -> DepictsItem:
+    """Create a DepictsItem for the given Wikidata item."""
+    qid = f"Q{item_id}"
+    entity = mediawiki.get_entity(qid)
+    assert entity
+
+    if "en" in entity["aliases"]:
+        alt_labels = {alt["value"] for alt in entity["aliases"]["en"]}
     else:
         alt_labels = set()
 
-    return DepictsItem(item_id=item_id,
-                       label=wikibase.get_en_label(entity),
-                       description=wikibase.get_en_description(entity),
-                       alt_labels=alt_labels,
-                       count=0)
+    return DepictsItem(
+        item_id=item_id,
+        label=wikibase.get_en_label(entity),
+        description=wikibase.get_en_description(entity),
+        alt_labels=alt_labels,
+        count=0,
+    )
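
get_entity_label prefers the English label and otherwise falls back to a
label that every language agrees on. A toy entity dict (not a real API
response) shows the fallback:

    from depicts import wikibase

    entity = {"labels": {"fr": {"value": "La Joconde"}}}
    wikibase.get_en_label(entity)      # -> None (no English label)
    wikibase.get_entity_label(entity)  # -> "La Joconde" (single shared value)
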
diff --git a/depicts/wikidata_oauth.py b/depicts/wikidata_oauth.py
index e441678..b10450b 100644
--- a/depicts/wikidata_oauth.py
+++ b/depicts/wikidata_oauth.py
@@ -1,67 +1,93 @@
-from flask import current_app, session
-from requests_oauthlib import OAuth1Session
+import typing
 from urllib.parse import urlencode
 
-def get_edit_proxy():
-    edit_proxy = current_app.config.get('EDIT_PROXY')
+import requests
+from flask import current_app, session
+from requests_oauthlib import OAuth1Session
+
+from .type import CallParams
+
+
+def get_edit_proxy() -> dict[str, str]:
+    """Get proxy configuration for edit requests."""
+    edit_proxy = current_app.config.get("EDIT_PROXY")
     if edit_proxy:
-        return {'http': edit_proxy, 'https': edit_proxy}
+        return {"http": edit_proxy, "https": edit_proxy}
     else:
         return {}
 
-def api_post_request(params):
+
+def api_post_request(params: dict[str, str | int]) -> requests.Response:
+    """Send an OAuth-signed POST request to the Wikidata API."""
     app = current_app
-    url = 'https://www.wikidata.org/w/api.php'
-    client_key = app.config['CLIENT_KEY']
-    client_secret = app.config['CLIENT_SECRET']
-    oauth = OAuth1Session(client_key,
-                          client_secret=client_secret,
-                          resource_owner_key=session['owner_key'],
-                          resource_owner_secret=session['owner_secret'])
+    url = "https://www.wikidata.org/w/api.php"
+    client_key = app.config["CLIENT_KEY"]
+    client_secret = app.config["CLIENT_SECRET"]
+    oauth = OAuth1Session(
+        client_key,
+        client_secret=client_secret,
+        resource_owner_key=session["owner_key"],
+        resource_owner_secret=session["owner_secret"],
+    )
     proxies = get_edit_proxy()
-    return oauth.post(url, data=params, timeout=4, proxies=proxies)
+    r: requests.Response = oauth.post(url, data=params, timeout=4, proxies=proxies)
+    return r
 
-def raw_request(params):
+
+def raw_request(params: dict[str, str | int]) -> requests.Response:
+    """Send an OAuth-signed GET request to the Wikidata API."""
     app = current_app
-    url = 'https://www.wikidata.org/w/api.php?' + urlencode(params)
-    client_key = app.config['CLIENT_KEY']
-    client_secret = app.config['CLIENT_SECRET']
-    oauth = OAuth1Session(client_key,
-                          client_secret=client_secret,
-                          resource_owner_key=session['owner_key'],
-                          resource_owner_secret=session['owner_secret'])
+    url = "https://www.wikidata.org/w/api.php?" + urlencode(params)
+    client_key = app.config["CLIENT_KEY"]
+    client_secret = app.config["CLIENT_SECRET"]
+    oauth = OAuth1Session(
+        client_key,
+        client_secret=client_secret,
+        resource_owner_key=session["owner_key"],
+        resource_owner_secret=session["owner_secret"],
+    )
     proxies = get_edit_proxy()
-    return oauth.get(url, timeout=4, proxies=proxies)
+    r: requests.Response = oauth.get(url, timeout=4, proxies=proxies)
+    return r
 
-def api_request(params):
-    return raw_request(params).json()
 
-def get_token():
-    params = {
-        'action': 'query',
-        'meta': 'tokens',
-        'format': 'json',
-        'formatversion': 2,
+def api_request(params: dict[str, str | int]) -> dict[str, typing.Any]:
+    """Call the Wikidata API and return the decoded JSON reply."""
+    data: dict[str, typing.Any] = raw_request(params).json()
+    return data
+
+
+def get_token() -> str:
+    """Get a CSRF token from the Wikidata API."""
+    params: CallParams = {
+        "action": "query",
+        "meta": "tokens",
+        "format": "json",
+        "formatversion": 2,
     }
     reply = api_request(params)
-    token = reply['query']['tokens']['csrftoken']
+    token: str = reply["query"]["tokens"]["csrftoken"]
 
     return token
 
-def userinfo_call():
-    params = {'action': 'query', 'meta': 'userinfo', 'format': 'json'}
+
+def userinfo_call() -> dict[str, typing.Any]:
+    """Request user information from the Wikidata API."""
+    params: dict[str, str | int] = {
+        "action": "query",
+        "meta": "userinfo",
+        "format": "json",
+    }
     return api_request(params)
 
-def get_username():
-    if 'owner_key' not in session:
-        return  # not authorized
-    if 'username' in session:
-        return session['username']
+
+def get_username() -> str | None:
+    """Get the username for the logged-in user, or None."""
+    if "owner_key" not in session:
+        return None  # not authorized
+
+    username: str
+    if "username" in session:
+        username = session["username"]
+        return username
 
     reply = userinfo_call()
-    if 'query' not in reply:
-        return
-    session['username'] = reply['query']['userinfo']['name']
+    if "query" not in reply:
+        return None
+    username = reply["query"]["userinfo"]["name"]
+    session["username"] = username
 
-    return session['username']
+    return username
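
The OAuth helpers above assume a Flask session that already holds the
resource-owner credentials from the OAuth handshake (owner_key and
owner_secret). A sketch of typical use inside a request handler:

    from depicts import wikidata_oauth

    username = wikidata_oauth.get_username()  # None until the user authorises
    if username:
        csrf_token = wikidata_oauth.get_token()
        # The token accompanies edits sent via api_post_request().
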