Add type hints and docstrings

This commit is contained in:
Edward Betts 2023-10-25 07:54:05 +01:00
parent 63d0d198eb
commit fd281532e6
26 changed files with 1774 additions and 968 deletions

252
app.py
View file

@ -1,18 +1,18 @@
#!/usr/bin/python3
import hashlib
import inspect
import itertools
import json
import os
import re
import socket
import typing
from collections import defaultdict
from datetime import datetime
from typing import Iterable, Mapping
import requests.exceptions
import simplejson.errors
import werkzeug
from flask import (
Flask,
g,
@ -27,6 +27,7 @@ from requests_oauthlib import OAuth1Session
from sqlalchemy import distinct, func
from sqlalchemy.orm import aliased
from sqlalchemy.sql.expression import desc
from werkzeug.wrappers import Response
from depicts import (
artwork,
@ -54,6 +55,7 @@ from depicts.model import (
WikidataQuery,
)
from depicts.pager import Pagination, init_pager
from depicts.type import Entity
user_agent = "Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0"
@ -109,30 +111,15 @@ re_pid = re.compile(r"^P(\d+)")
@app.teardown_appcontext
def shutdown_session(exception: Exception | None = None) -> None:
    """Remove the scoped database session when the application context ends.

    Registered through ``app.teardown_appcontext``. ``exception`` is accepted
    because Flask passes it to teardown callbacks; it is not inspected here.
    """
    # The InternalServerError handler that used to follow this function was
    # deleted by this change and is intentionally not reproduced.
    # NOTE(review): it relied on werkzeug.debug.tbtools.get_current_traceback,
    # which newer Werkzeug releases no longer provide - confirm before restoring.
    database.session.remove()  # type:ignore
@app.template_global()
def set_url_args(endpoint=None, **new_args):
def set_url_args(endpoint: str | None = None, **new_args: str) -> str:
if endpoint is None:
endpoint = request.endpoint
assert endpoint and request.view_args
args = request.view_args.copy()
args.update(request.args)
args.update(new_args)
@ -141,23 +128,25 @@ def set_url_args(endpoint=None, **new_args):
@app.template_global()
def current_url() -> str:
    """Get current URL.

    Rebuilds the URL of the active request from its endpoint, the view
    arguments matched by the router and the query-string arguments (query
    arguments win when a name appears in both).
    """
    assert request and request.endpoint
    # view_args is an empty dict for routes without URL variables and None when
    # routing failed, so asserting it truthy would break every such page.
    args = dict(request.view_args or {})
    args.update(request.args)
    return url_for(request.endpoint, **args)
@app.before_request
def init_profile() -> None:
    """Give every request a fresh, empty ``g.profiling`` list."""
    g.profiling = []
@app.before_request
def global_user() -> None:
    """Store the result of ``wikidata_oauth.get_username()`` on ``g.user``."""
    g.user = wikidata_oauth.get_username()
def check_for_blocks():
def check_for_blocks() -> None:
if hasattr(g, "server_ip"): # already done
return
hostname = app.config.get("HOSTNAME")
@ -172,43 +161,45 @@ def check_for_blocks():
@app.before_request
def get_blocks() -> None:
    """Run ``check_for_blocks`` before each request.

    The check is skipped only when the ``SHOW_BLOCK_ALERT`` config value is
    exactly ``False``; a missing setting leaves it enabled.
    """
    if app.config.get("SHOW_BLOCK_ALERT") is not False:
        check_for_blocks()
@app.route("/find_more_setting")
def flip_find_more() -> str:
    """Toggle the session's ``no_find_more`` flag and report the new state."""
    session["no_find_more"] = not session.get("no_find_more")
    # The flag is stored negated: "find more" is on when no_find_more is falsy.
    display = "off" if session["no_find_more"] else "on"
    return "flipped. find more is " + display
def existing_edit(item_id: int, depicts_id: int) -> bool:
    """Return True when an ``Edit`` row exists for this artwork/depicts pair."""
    q = Edit.query.filter_by(artwork_id=item_id, depicts_id=depicts_id)  # type: ignore
    return bool(q.count() != 0)
@app.route("/save/Q<int:item_id>", methods=["POST"])
def save(item_id):
def save(item_id: int) -> str | Response:
depicts = request.form.getlist("depicts")
username = wikidata_oauth.get_username()
assert username
token = wikidata_oauth.get_token()
artwork_item = Item.query.get(item_id)
artwork_item = Item.query.get(item_id) # type: ignore
if artwork_item is None:
artwork_entity = mediawiki.get_entity_with_cache(f"Q{item_id}")
artwork_item = Item(item_id=item_id, entity=artwork_entity)
artwork_item = Item(
item_id=item_id, entity=typing.cast(dict[str, str], artwork_entity)
)
database.session.add(artwork_item)
database.session.commit()
for depicts_qid in depicts:
depicts_id = int(depicts_qid[1:])
depicts_item = DepictsItem.query.get(depicts_id)
depicts_item = DepictsItem.query.get(depicts_id) # type: ignore
if depicts_item is None:
depicts_item = wikidata_edit.create_depicts_item(depicts_id)
database.session.add(depicts_item)
@ -247,24 +238,24 @@ def save(item_id):
@app.route("/settings", methods=["GET", "POST"])
def user_settings() -> str:
    """Render the user settings page."""
    return render_template("user_settings.html")
@app.route("/test/lookup")
def test_lookup_page() -> str:
    """Render the lookup test page."""
    return render_template("test_lookup.html")
@app.route("/property/P<int:property_id>")
def property_query_page(property_id):
def property_query_page(property_id: int) -> str:
pid = f"P{property_id}"
g.title = find_more_props[pid]
sort = request.args.get("sort")
sort_by_name = sort and sort.lower().strip() == "name"
q = (
database.session.query(
database.session.query( # type: ignore
Triple.object_id, func.count(func.distinct(Triple.subject_id)).label("c")
)
.filter_by(predicate_id=property_id)
@ -302,15 +293,19 @@ def property_query_page(property_id):
@app.route("/")
def start() -> Response:
    """Serve the site root by delegating to ``random_artwork``."""
    return random_artwork()
@app.route("/next")
def random_artwork():
def random_artwork() -> Response:
found = None
while True:
q = Item.query.filter_by(is_artwork=True).order_by(func.random()).limit(30)
q = (
Item.query.filter_by(is_artwork=True) # type: ignore
.order_by(func.random())
.limit(30)
)
for item in q:
has_depicts = "P180" in item.entity["claims"]
if has_depicts:
@ -325,7 +320,7 @@ def random_artwork():
@app.route("/oauth/start")
def start_oauth():
def start_oauth() -> Response:
next_page = request.args.get("next")
if next_page:
session["after_login"] = next_page
@ -349,7 +344,7 @@ def start_oauth():
@app.route("/oauth/callback", methods=["GET"])
def oauth_callback():
def oauth_callback() -> Response:
base_url = "https://www.wikidata.org/w/index.php"
client_key = app.config["CLIENT_KEY"]
client_secret = app.config["CLIENT_SECRET"]
@ -381,17 +376,17 @@ def oauth_callback():
@app.route("/oauth/disconnect")
def oauth_disconnect() -> Response:
    """Drop the OAuth-related session keys and redirect to the browse page."""
    for key in "owner_key", "owner_secret", "username", "after_login":
        if key in session:
            del session[key]
    return redirect(url_for("browse_page"))
def create_claim(artwork_id, depicts_id, token):
def create_claim(artwork_id: int, depicts_id: int, token: str) -> requests.Response:
artwork_qid = f"Q{artwork_id}"
value = json.dumps({"entity-type": "item", "numeric-id": depicts_id})
params = {
params: dict[str, str | int] = {
"action": "wbcreateclaim",
"entity": artwork_qid,
"property": "P180",
@ -401,10 +396,11 @@ def create_claim(artwork_id, depicts_id, token):
"format": "json",
"formatversion": 2,
}
return wikidata_oauth.api_post_request(params)
r: requests.Response = wikidata_oauth.api_post_request(params)
return r
def image_with_cache(qid, image_filename, width):
def image_with_cache(qid: str, image_filename: str, width: int) -> dict[str, str]:
filename = f"cache/{qid}_{width}_image.json"
detail = json.load(open(filename)) if os.path.exists(filename) else {}
@ -416,10 +412,11 @@ def image_with_cache(qid, image_filename, width):
detail = commons.image_detail([image_filename], thumbwidth=width)
json.dump(detail, open(filename, "w"), indent=2)
return detail.get(image_filename)
image: dict[str, str] = detail.get(image_filename)
return image
def existing_depicts_from_entity(entity):
def existing_depicts_from_entity(entity: Entity) -> list[dict[str, typing.Any]]:
if "P180" not in entity["claims"]:
return []
existing = []
@ -430,7 +427,7 @@ def existing_depicts_from_entity(entity):
item_id = claim["mainsnak"]["datavalue"]["value"]["numeric-id"]
item = DepictsItem.query.get(item_id)
item = DepictsItem.query.get(item_id) # type: ignore
if not item:
item = wikidata_edit.create_depicts_item(item_id)
database.session.add(item)
@ -448,27 +445,32 @@ def existing_depicts_from_entity(entity):
return existing
def get_institution(entity: Entity, other: Mapping[str, str | None]) -> str | None:
    """Look up the label of the artwork's location (P276) or collection (P195).

    The properties are tried in that order. For the first one that is present
    in ``entity["claims"]`` and whose first datavalue is a non-empty dict, the
    datavalue's ``id`` is looked up in ``other`` and that result is returned,
    even when it is None.

    Returns None when neither property yields a usable datavalue.
    """
    for pid in ("P276", "P195"):
        if pid not in entity["claims"]:
            continue
        value = wikibase.first_datavalue(entity, pid)
        # Skip a missing or non-dict datavalue instead of asserting on it, so a
        # claim without a value does not abort the caller (and behaviour does
        # not change under ``python -O``).
        if isinstance(value, dict) and value:
            return other.get(value["id"])
    return None
@app.route("/item/Q<int:item_id>")
def item_page(item_id):
def item_page(item_id: int) -> str | Response:
qid = f"Q{item_id}"
g.qid = qid
item = artwork.Artwork(qid)
from_redirect = qid in session and session.pop(qid) == "from redirect"
entity = mediawiki.get_entity_with_cache(qid, refresh=not from_redirect)
assert entity
if "redirects" in entity:
redirect_to_item_id = int(entity["redirects"]["to"][1:])
assert request.endpoint
return redirect(url_for(request.endpoint, item_id=redirect_to_item_id))
existing_depicts = existing_depicts_from_entity(entity)
@ -494,7 +496,7 @@ def item_page(item_id):
label_languages = label_and_language["languages"] if label_and_language else []
show_translation_links = all(lang.code != "en" for lang in label_languages)
artwork_item = Item.query.get(item_id)
artwork_item = Item.query.get(item_id) # type: ignore
if artwork_item is None:
if not wdqs.is_artificial_physical_object(qid):
return render_template(
@ -517,7 +519,7 @@ def item_page(item_id):
artwork_item = Item(
item_id=item_id,
entity=entity,
entity=typing.cast(dict[str, str], entity),
lastrevid=entity["lastrevid"],
modified=modified,
)
@ -525,7 +527,9 @@ def item_page(item_id):
catalog = wd_catalog.get_catalog_from_artwork(entity)
if not catalog.get("institution"):
catalog["institution"] = get_institution(entity, other)
institution = get_institution(entity, other)
assert institution
catalog["institution"] = institution
return render_template(
"item.html",
@ -548,11 +552,13 @@ def item_page(item_id):
)
def get_languages(codes: typing.Iterable[str]) -> typing.Any:
    """Build a ``Language`` query limited to the given Wikimedia language codes."""
    return Language.query.filter(  # type: ignore
        Language.wikimedia_language_code.in_(codes)
    )
def get_entity_label_and_language(entity):
def get_entity_label_and_language(entity: Entity) -> dict[str, typing.Any] | None:
"""
Look for a useful label and return it with a list of languages that have that label.
@ -573,8 +579,10 @@ def get_entity_label_and_language(entity):
label, languages = list(group_by_label.items())[0]
return {"label": label, "languages": get_languages(languages)}
return None
def get_labels(keys, name=None):
def get_labels(keys: typing.Iterable[str], name: str | None = None) -> dict[str, str]:
keys = sorted(keys, key=lambda i: int(i[1:]))
if name is None:
name = hashlib.md5("_".join(keys).encode("utf-8")).hexdigest()
@ -590,10 +598,13 @@ def get_labels(keys, name=None):
json.dump({"keys": keys, "labels": labels}, open(filename, "w"), indent=2)
return {entity["id"]: wikibase.get_entity_label(entity) for entity in labels}
return {
entity["id"]: wikibase.get_entity_label(entity) or "no English label"
for entity in labels
}
def get_labels_db(keys):
def get_labels_db(keys: Iterable[str]):
keys = set(keys)
labels = {}
missing = set()
@ -601,7 +612,7 @@ def get_labels_db(keys):
m = re_qid.match(qid)
if m:
item_id = int(m.group(1))
item = Item.query.get(item_id)
item = Item.query.get(item_id) # type: ignore
if item:
labels[qid] = item.label
continue
@ -619,8 +630,8 @@ def get_labels_db(keys):
modified = datetime.strptime(entity["modified"], "%Y-%m-%dT%H:%M:%SZ")
# FIXME: check if the item is an artwork and set is_artwork correctly
item = Item(
item_id=qid[1:],
item = Item( # type: ignore
item_id=int(qid[1:]),
entity=entity,
lastrevid=entity["lastrevid"],
modified=modified,
@ -635,7 +646,7 @@ def get_labels_db(keys):
return labels
def build_other_set(entity):
def build_other_set(entity: Entity) -> set[str]:
other_items = set()
for key in find_more_props.keys():
for claim in entity["claims"].get(key, []):
@ -645,20 +656,24 @@ def build_other_set(entity):
return other_items
def get_other(entity: Entity) -> Mapping[str, str | None]:
    """Return labels for the items that ``build_other_set`` collects from entity."""
    other_items = build_other_set(entity)
    return get_labels(other_items)
@app.route("/edits")
def list_edits():
q = Edit.query.order_by(Edit.timestamp.desc())
def list_edits() -> str:
q = Edit.query.order_by(Edit.timestamp.desc()) # type: ignore
page = utils.get_int_arg("page") or 1
pager = Pagination(page, 100, q.count())
item_count = database.session.query(func.count(distinct(Edit.artwork_id))).scalar()
item_count = database.session.query(
func.count(distinct(Edit.artwork_id))
).scalar() # type: ignore
user_count = database.session.query(func.count(distinct(Edit.username))).scalar()
user_count = database.session.query(
func.count(distinct(Edit.username))
).scalar() # type: ignore
return render_template(
"list_edits.html",
@ -670,11 +685,13 @@ def list_edits():
@app.route("/user/<username>")
def user_page(username):
edit_list = Edit.query.filter_by(username=username).order_by(Edit.timestamp.desc())
def user_page(username: str) -> str:
edit_list = Edit.query.filter_by(username=username).order_by( # type: ignore
Edit.timestamp.desc()
)
item_count = (
database.session.query(func.count(distinct(Edit.artwork_id)))
database.session.query(func.count(distinct(Edit.artwork_id))) # type: ignore
.filter_by(username=username)
.scalar()
)
@ -682,20 +699,22 @@ def user_page(username):
return render_template(
"user_page.html",
username=username,
edits=Edit.query,
edits=Edit.query, # type: ignore
edit_list=edit_list,
item_count=item_count,
)
@app.route("/next/Q<int:item_id>")
def next_page(item_id):
def next_page(item_id: int) -> str:
qid = f"Q{item_id}"
entity = mediawiki.get_entity_with_cache(qid)
assert entity
width = 800
image_filename = wikibase.first_datavalue(entity, "P18")
assert isinstance(image_filename, str)
image = image_with_cache(qid, image_filename, width)
label = wikibase.get_entity_label(entity)
@ -756,13 +775,13 @@ def next_page(item_id):
@app.route("/P<int:property_id>/Q<int:item_id>")
def find_more_page(property_id: int, item_id: int) -> Response:
    """Redirect to ``browse_page``, passing the pair as a ``P<id>=Q<id>`` argument."""
    pid, qid = f"P{property_id}", f"Q{item_id}"
    return redirect(url_for("browse_page", **{pid: qid}))  # type: ignore
@app.route("/toolinfo.json")
def tool_info():
def tool_info() -> Response:
info = {
"name": "wade",
"title": "Wikidata Art Depiction Explorer",
@ -775,14 +794,16 @@ def tool_info():
return jsonify(info)
def get_facets(params):
def get_facets(params) -> dict[str, typing.Any]:
properties = [pid for pid in find_more_props.keys() if pid not in request.args]
bindings = wdqs.run_from_template_with_cache(
"query/facet.sparql", params=params, isa_list=isa_list, properties=properties
)
facets = {key: [] for key in find_more_props.keys()}
facets: dict[str, list[dict[str, str | int]]] = {
key: [] for key in find_more_props.keys()
}
for row in bindings:
pid = row["property"]["value"].rpartition("/")[2]
qid = row["object"]["value"].rpartition("/")[2]
@ -800,7 +821,7 @@ def get_facets(params):
}
def get_artwork_params():
def get_artwork_params() -> list[tuple[str, str]]:
params = []
for pid, qid in request.args.items():
m = re_pid.match(pid)
@ -817,14 +838,14 @@ def get_artwork_params():
return params
def filter_artwork(params: list[tuple[str, str]]) -> list[wdqs.Row]:
    """Run the cached ``query/find_more.sparql`` template for the given params."""
    return wdqs.run_from_template_with_cache(
        "query/find_more.sparql", params=params, isa_list=isa_list
    )
@app.route("/catalog")
def catalog_page():
def catalog_page() -> str:
params = get_artwork_params()
bindings = filter_artwork(params)
page = utils.get_int_arg("page") or 1
@ -837,7 +858,7 @@ def catalog_page():
qids = [f"Q{item_id}" for item_id in sorted(item_ids)]
items = [Item.query.get(item_id) for item_id in item_ids]
items = [Item.query.get(item_id) for item_id in item_ids] # type: ignore
entities = mediawiki.get_entities_with_cache(qids)
@ -878,7 +899,9 @@ def catalog_page():
)
def get_image_detail_with_cache(items, cache_name, thumbwidth=None, refresh=False):
def get_image_detail_with_cache(
items, cache_name: str, thumbwidth: int | None = None, refresh: bool = False
):
filenames = [cur.image_filename() for cur in items]
if thumbwidth is None:
@ -902,9 +925,9 @@ def get_image_detail_with_cache(items, cache_name, thumbwidth=None, refresh=Fals
return detail
def browse_index():
def browse_index() -> str:
q = (
database.session.query(
database.session.query( # type: ignore
Triple.predicate_id, func.count(func.distinct(Triple.object_id))
)
.join(Item, Triple.subject_id == Item.item_id)
@ -918,13 +941,13 @@ def browse_index():
@app.route("/debug/show_user")
def debug_show_user() -> str:
    """Show the result of ``wikidata_oauth.userinfo_call()`` as indented JSON."""
    from html import escape

    userinfo = wikidata_oauth.userinfo_call()
    # The JSON comes from a remote API and is returned as HTML, so markup
    # characters are escaped; quotes are left alone to keep the source readable.
    return "<pre>" + escape(json.dumps(userinfo, indent=2), quote=False) + "</pre>"
@app.route("/browse/facets.json")
def browse_facets():
def browse_facets() -> Response:
params = get_artwork_params()
if not params:
return jsonify(notice="facet criteria missing")
@ -940,7 +963,7 @@ def browse_facets():
def get_db_items(params):
"""Get items for browse page based on criteria."""
q = Item.query.filter_by(is_artwork=True)
q = Item.query.filter_by(is_artwork=True) # type: ignore
for pid, qid in params:
q = q.join(Triple, Item.item_id == Triple.subject_id, aliased=True).filter(
Triple.predicate_id == pid[1:], Triple.object_id == qid[1:]
@ -955,7 +978,9 @@ def get_db_facets(params):
facet_limit = 18
for pid, qid in params:
q = q.join(Triple, t.subject_id == Triple.subject_id, aliased=True).filter(
q = q.join( # type: ignore
Triple, t.subject_id == Triple.subject_id, aliased=True
).filter(
Triple.predicate_id == pid[1:],
Triple.object_id == qid[1:],
t.predicate_id != pid[1:],
@ -967,9 +992,9 @@ def get_db_facets(params):
results = sorted(tuple(row) for row in q.all())
facet_list = {}
subject_qids = set()
subject_qids: set[str] = set()
for predicate_id, x in itertools.groupby(results, lambda row: row[0]):
hits = sorted(list(x), key=lambda row: row[1], reverse=True)
hits = sorted(x, key=lambda row: row[1], reverse=True)
values = [
{"count": count, "qid": f"Q{value}"}
for _, count, value in hits[:facet_limit]
@ -987,7 +1012,7 @@ def get_db_facets(params):
@app.route("/browse")
def browse_page():
def browse_page() -> str:
page_size = 45
params = get_artwork_params()
@ -1051,7 +1076,7 @@ def browse_page():
@app.route("/find_more.json")
def find_more_json():
def find_more_json() -> Response:
pid = request.args.get("pid")
qid_list = request.args.getlist("qid")
limit = 6
@ -1090,7 +1115,10 @@ def find_more_json():
return jsonify(items=items)
def wikibase_search(terms):
Hit = dict[str, str | int | None]
def wikibase_search(terms: str) -> list[Hit]:
hits = []
r = mediawiki.api_call(
{
@ -1114,10 +1142,10 @@ def wikibase_search(terms):
return hits
def add_images_to_depicts_lookup(hits):
def add_images_to_depicts_lookup(hits: list[dict[str, str]]) -> None:
qid_to_item = {hit["qid"]: hit for hit in hits}
all_qids = [hit["qid"] for hit in hits]
entities = mediawiki.get_entities_with_cache(all_qids)
entities: list[Entity] = mediawiki.get_entities_with_cache(all_qids)
for entity in entities:
qid = entity["id"]
@ -1144,7 +1172,7 @@ def add_images_to_depicts_lookup(hits):
@app.route("/lookup")
def depicts_lookup():
def depicts_lookup() -> Response:
terms = request.args.get("terms")
if not terms:
return jsonify(error="terms parameter is required")
@ -1159,7 +1187,7 @@ def depicts_lookup():
item_ids = []
hits = []
q1 = DepictsItem.query.filter(DepictsItem.label.ilike(terms + "%"))
q1 = DepictsItem.query.filter(DepictsItem.label.ilike(terms + "%")) # type: ignore
seen = set()
for item in q1:
hit = {
@ -1173,7 +1201,9 @@ def depicts_lookup():
seen.add(item.qid)
cls = DepictsItemAltLabel
q2 = cls.query.filter(cls.alt_label.ilike(terms + "%"), ~cls.item_id.in_(item_ids))
q2 = cls.query.filter( # type: ignore
cls.alt_label.ilike(terms + "%"), ~cls.item_id.in_(item_ids)
)
for alt in q2:
item = alt.item
@ -1208,12 +1238,12 @@ def depicts_lookup():
@app.route("/report/missing_image")
def missing_image_report():
def missing_image_report() -> str:
limit = utils.get_int_arg("limit") or 1000
q = DepictsItem.query.order_by(DepictsItem.count.desc()).limit(limit)
q = DepictsItem.query.order_by(DepictsItem.count.desc()).limit(limit) # type:ignore
qids = [item.qid for item in q]
entities = mediawiki.get_entities_dict_with_cache(qids)
entities: dict[str, Entity] = mediawiki.get_entities_dict_with_cache(qids)
item_list = []
@ -1229,19 +1259,19 @@ def missing_image_report():
@app.route("/report/wdqs")
def wikidata_query_list() -> str:
    """Render the list of ``WikidataQuery`` rows, most recent start time first."""
    q = WikidataQuery.query.order_by(WikidataQuery.start_time.desc())  # type: ignore
    return render_template("query_list.html", q=q)
@app.route("/report/blocks")
def server_block_report() -> str:
    """Run ``check_for_blocks`` and render the block report template."""
    check_for_blocks()
    return render_template("block_report.html")
@app.route("/fixture/save_error")
def save_error_fixture() -> str:
    """Render the save-error template with the error from ``fixtures.save_error()``."""
    error = fixtures.save_error()["error"]
    return render_template("save_error.html", error=error)

View file

@ -1,106 +1,150 @@
"""Class to represent artwork."""
from . import mediawiki
from .type import Claims, Entity, Sitelinks
class Artwork:
    """Artwork.

    Wraps the Wikidata entity of one artwork item and exposes commonly used
    parts of it (labels, claims, sitelinks) as properties.
    """

    entity: Entity
    artist_entities: list[Entity]

    def __init__(self, qid: str) -> None:
        """Load the entity for ``qid`` through the mediawiki cache."""
        entity = mediawiki.get_entity_with_cache(qid)
        assert entity
        self.entity = entity
        self.item_id = int(qid[1:])

        sites = ["commons", "enwiki"]
        # Per-site mapping that is filled in elsewhere; starts out empty.
        self.parent_categories = {site: {} for site in sites}

    @property
    def image_filename(self) -> str | None:
        """Image filename from the first P18 claim, or None without P18."""
        if "P18" in self.entity["claims"]:
            f: str = self.entity["claims"]["P18"][0]["mainsnak"]["datavalue"]["value"]
            return f
        else:
            return None

    @property
    def display_title(self) -> str:
        """Display title: ``"<English label> (<QID>)"``, or just the QID."""
        return (
            f"{self.en_title} ({self.qid})"
            if "en" in self.entity["labels"]
            else self.qid
        )

    @property
    def url(self) -> str:
        """Wikidata item URL."""
        return "https://www.wikidata.org/wiki/" + self.qid

    def get_artist_entities(self) -> None:
        """Get artist entities and store them on ``self.artist_entities``."""
        self.artist_entities = []

        for artist in self.artists_claim:
            artist_entity = mediawiki.get_entity(artist["id"])
            assert artist_entity
            self.artist_entities.append(artist_entity)

    def artist_labels(self) -> list[str]:
        """Get artist Wikidata item labels (English), loading artists on first use."""
        if not hasattr(self, "artist_entities"):
            self.get_artist_entities()
        return [artist["labels"]["en"]["value"] for artist in self.artist_entities]

    @property
    def commons_cats(self) -> list[str]:
        """Values of the item's P373 claims."""
        return [
            i["mainsnak"]["datavalue"]["value"]
            for i in self.entity["claims"].get("P373", [])
        ]

    @property
    def commons_sitelink(self) -> str | None:
        """Wikimedia sitelink."""
        # NOTE(review): this reads sitelinks["commons"]["value"] while `enwiki`
        # reads sitelinks["enwiki"]["title"]; confirm the key names against the
        # entity JSON actually returned by mediawiki.get_entity_with_cache.
        return (
            self.sitelinks["commons"]["value"] if "commons" in self.sitelinks else None
        )

    @property
    def en_title(self) -> str:
        """English label of the item, falling back to its QID."""
        if "en" in self.entity["labels"]:
            title: str = self.entity["labels"]["en"]["value"]
            return title
        else:
            return self.qid

    @property
    def artists_claim(self) -> list[dict[str, str]]:
        """Datavalues of the item's P170 claims."""
        return [
            image["mainsnak"]["datavalue"]["value"]
            for image in self.entity["claims"].get("P170", [])
        ]

    @property
    def artists(self) -> list[dict[str, str]]:
        """P170 datavalues, each extended with the artist entity's labels.

        The ``labels`` key is written into the datavalue dicts held by
        ``self.entity``, so the entity itself is modified.
        """
        if not hasattr(self, "artist_entities"):
            self.get_artist_entities()

        items = self.artists_claim

        lookup = {artist["id"]: artist["labels"] for artist in self.artist_entities}

        for item in items:
            item["labels"] = lookup[item["id"]]

        return items

    @property
    def qid(self) -> str:
        """Wikidata item QID."""
        return f"Q{self.item_id}"

    @property
    def commons_filenames(self) -> list[str]:
        """Filenames of images on Wikimedia Commons."""
        return [
            image["mainsnak"]["datavalue"]["value"]
            for image in self.entity["claims"].get("P18", [])
        ]

    def commons_cat_from_sitelink(self) -> str | None:
        """Category name from the commons sitelink, if it is a category."""
        ns = "Category:"
        sitelink = self.commons_sitelink
        if not sitelink or not sitelink.startswith(ns):
            return None
        return sitelink[len(ns) :]

    @property
    def enwiki_url(self) -> str | None:
        """URL for English Wikipedia article about artwork, or None without one."""
        enwiki = self.enwiki
        # No enwiki sitelink is a normal case, not an error.
        if not enwiki:
            return None
        return "https://en.wikipedia.org/wiki/" + enwiki.replace(" ", "_")

    @property
    def sitelinks(self) -> Sitelinks:
        """Item sitelinks."""
        return self.entity["sitelinks"]

    @property
    def claims(self) -> Claims:
        """Item claims."""
        return self.entity["claims"]

    @property
    def enwiki(self) -> str | None:
        """Article title on English Wikipedia."""
        return self.sitelinks["enwiki"]["title"] if "enwiki" in self.sitelinks else None

View file

@ -1,32 +1,51 @@
import requests
import os
"""Barnes Foundation (Q808462) - art museum in Philadelphia, Pennsylvania."""
import json
import os
import typing
import requests

from .type import CatalogDict

JsonData = dict[str, dict[str, typing.Any]]


def get_json(catalog_id: str | int) -> JsonData:
    """Get JSON from website and cache.

    The response text is stored in ``cache/barnesfoundation_<catalog_id>.html``;
    when that file already exists it is parsed and returned without any request.
    """
    filename = f"cache/barnesfoundation_{catalog_id}.html"

    url = "https://collection.barnesfoundation.org/api/search"

    body = {
        "query": {
            "bool": {
                "filter": {"exists": {"field": "imageSecret"}},
                "must": {"match": {"_id": int(catalog_id)}},
            }
        }
    }

    if os.path.exists(filename):
        # Context managers make sure the cache file handle is closed.
        with open(filename) as cached:
            return typing.cast(JsonData, json.load(cached))

    r = requests.get(url, params={"body": json.dumps(body)})
    print(r.url)
    with open(filename, "w") as out:
        out.write(r.text)
    return typing.cast(JsonData, r.json())
def parse_catalog(data: JsonData) -> CatalogDict:
    """Parse catalog JSON.

    Reads the ``_source`` of the first search hit and returns its short
    description and tag names together with the fixed institution name.
    Raises IndexError/KeyError when the response has no hit or lacks a field.
    """
    hit = data["hits"]["hits"][0]["_source"]

    return {
        "institution": "Barnes Foundation",
        "description": hit["shortDescription"],
        "keywords": [tag["tag"] for tag in hit["tags"]],
    }
def get_catalog(catalog_id: str | int) -> CatalogDict:
    """Lookup artwork using catalog ID and return keywords.

    Fetches (or loads from cache) the search JSON via ``get_json`` and passes
    it to ``parse_catalog``.
    """
    data = get_json(catalog_id)
    return parse_catalog(data)

View file

@ -1,49 +1,71 @@
from . import utils
import re
import calendar
import re
month_pattern = '|'.join(m for m in calendar.month_name if m)
re_date_based = re.compile(r'^(\d{4}-\d{2}-\d{2}|(' + month_pattern + r') \d{4}|\d{4}s?|\d{1,2}(st|nd|rd|th)-century) ')
from . import artwork, utils
month_pattern = "|".join(m for m in calendar.month_name if m)
re_date_based = re.compile(
r"^(\d{4}-\d{2}-\d{2}|("
+ month_pattern
+ r") \d{4}|\d{4}s?|\d{1,2}(st|nd|rd|th)-century) "
)
ns_cat = "Category:"
ns_cat = 'Category:'
class Category:
def __init__(self, title, site):
"""Category."""
title: str
site: str
item: artwork.Artwork | None
def __init__(self, title: str, site: str):
"""Init."""
if title.startswith(ns_cat):
title = title[len(ns_cat):]
title = title[len(ns_cat) :]
self.title = title
self.site = site
self.item = None
def __repr__(self):
return f'{self.__class__.__name__}({self.title!r}, {self.site!r})'
def __repr__(self) -> str:
"""Repr."""
return f"{self.__class__.__name__}({self.title!r}, {self.site!r})"
def set_item(self, item):
def set_item(self, item: artwork.Artwork | None) -> None:
self.item = item
@property
def url(self):
return utils.wiki_url(self.title, self.site, ns='Category')
def url(self) -> str | None:
"""Category URL."""
assert self.title and self.site
return utils.wiki_url(self.title, self.site, ns="Category")
def date_based(self):
def date_based(self) -> bool:
"""Category title is date based."""
return bool(re_date_based.match(self.title))
def contains_artist_name(self):
def contains_artist_name(self) -> bool:
"""Category title contains artists name."""
if not self.item:
return
return any(artist.lower() in self.title.lower()
for artist in self.item.artist_labels())
return False
return any(
artist.lower() in self.title.lower() for artist in self.item.artist_labels()
)
def parents(self):
def parents(self) -> list["Category"]:
"""Parent categories."""
if not self.item:
return []
return self.item.parent_categories[self.site].get(self.title, [])
def is_exhibition(self):
return any(parent.title.startswith('Art exhibitions ')
for parent in self.parents())
def is_exhibition(self) -> bool:
"""Category represents art exhibition."""
return any(
parent.title.startswith("Art exhibitions ") for parent in self.parents()
)
def names_for_wikidata(self):
def names_for_wikidata(self) -> list[str]:
highlight = self.check()
interesting = len(highlight) > 1
@ -58,7 +80,7 @@ class Category:
continue
title = text.strip()
title = title[0].upper() + title[1:]
for sep in ' with ', ' at ', ' wearing ':
for sep in " with ", " at ", " wearing ":
if sep in title:
before, _, after = title.partition(sep)
names = []
@ -66,44 +88,76 @@ class Category:
names += utils.also_singular(x)
return names
return utils.also_singular(title)
return []
def urls_for_wikidata(self):
return [utils.wiki_url(name, self.site, ns='Category')
for name in self.names_for_wikidata()]
def urls_for_wikidata(self) -> list[str]:
return [
utils.wiki_url(name, self.site, ns="Category")
for name in self.names_for_wikidata()
]
def check(self):
def check(self) -> list[tuple[bool, str]]:
cat = self.title
lc_cat = cat.lower()
by_endings = ['title', 'technique', 'period', 'century', 'country', 'movement',
'medium', 'year', 'painter']
by_endings = [
"title",
"technique",
"period",
"century",
"country",
"movement",
"medium",
"year",
"painter",
]
if self.item:
by_endings += self.item.artist_labels()
for after in ('in art', 'in portrait paintings', 'in landscape paintings', 'in culture', 'in popular culture', 'in painting', 'in 1', 'in 2', 'looking at viewer'):
for after in (
"in art",
"in portrait paintings",
"in landscape paintings",
"in culture",
"in popular culture",
"in painting",
"in 1",
"in 2",
"looking at viewer",
):
pos = lc_cat.find(after)
# don't highlight "1512 in art"
if pos == -1 or cat[:pos - 1].isdigit():
if pos == -1 or cat[: pos - 1].isdigit():
continue
return [(True, cat[:pos]), (False, cat[pos:])]
for before in ('paintings of', 'portraits of', 'landscapes of',
'portraits with', 'paintings with', 'paintings depicting',
'portraits depicting', 'landscapes depicting', 'works about'):
for before in (
"paintings of",
"portraits of",
"landscapes of",
"portraits with",
"paintings with",
"paintings depicting",
"portraits depicting",
"landscapes depicting",
"works about",
):
pos = lc_cat.find(before)
if pos == -1:
continue
pos += len(before)
for by_ending in by_endings:
ending = ' by ' + by_ending
ending = " by " + by_ending
if lc_cat.endswith(ending):
return [(False, cat[:pos]),
(True, cat[pos:-len(ending)]),
(False, cat[-len(ending):])]
return [
(False, cat[:pos]),
(True, cat[pos : -len(ending)]),
(False, cat[-len(ending) :]),
]
return [(False, cat[:pos]), (True, cat[pos:])]
pos = lc_cat.find('of ')
pos = lc_cat.find("of ")
if pos != -1:
return [(True, cat[:pos]), (False, cat[pos:])]

View file

@ -1,36 +1,43 @@
from . import mediawiki, utils
"""Wikimedia Commons API call."""
commons_url = 'https://commons.wikimedia.org/w/api.php'
from . import mediawiki, utils
from .type import CallParams
commons_url = "https://commons.wikimedia.org/w/api.php"
page_size = 50
def image_detail(filenames, thumbheight=None, thumbwidth=None):
def image_detail(
filenames: list[str] | str,
thumbheight: int | None = None,
thumbwidth: int | None = None,
) -> dict[str, dict[str, str]]:
"""Get image detail from Wikimedia Commons."""
if not isinstance(filenames, list):
filenames = [filenames]
if not filenames:
return {}
params = {
'action': 'query',
'prop': 'imageinfo',
'iiprop': 'url',
params: CallParams = {
"action": "query",
"prop": "imageinfo",
"iiprop": "url",
}
if thumbheight is not None:
params['iiurlheight'] = thumbheight
params["iiurlheight"] = thumbheight
if thumbwidth is not None:
params['iiurlwidth'] = thumbwidth
params["iiurlwidth"] = thumbwidth
images = {}
for cur in utils.chunk(filenames, page_size):
call_params = params.copy()
call_params['titles'] = '|'.join(f'File:{f}' for f in cur)
call_params["titles"] = "|".join(f"File:{f}" for f in cur)
r = mediawiki.api_post(call_params, api_url=commons_url)
for image in r.json()['query']['pages']:
filename = utils.drop_start(image['title'], 'File:')
images[filename] = image['imageinfo'][0] if 'imageinfo' in image else None
for image in r.json()["query"]["pages"]:
filename = utils.drop_start(image["title"], "File:")
images[filename] = image["imageinfo"][0] if "imageinfo" in image else None
return images

View file

@ -1,21 +1,35 @@
"""Interact with SQL database."""
import typing
import flask
import sqlalchemy
from sqlalchemy import create_engine, func
from sqlalchemy.orm import scoped_session, sessionmaker
session = scoped_session(sessionmaker())
def init_db(db_url):
session.configure(bind=get_engine(db_url))
def get_engine(db_url: str) -> sqlalchemy.engine.base.Engine:
    """Create an SQLAlchemy engine for ``db_url`` with fixed pool settings."""
    engine = create_engine(
        db_url,
        pool_recycle=3600,
        pool_size=20,
        max_overflow=40,
    )
    return engine
def init_app(app, echo=False):
db_url = app.config['DB_URL']
session.configure(bind=get_engine(db_url, echo=echo))
@app.teardown_appcontext
def shutdown_session(exception=None):
session.remove()
def init_db(db_url: str) -> None:
    """Bind the module-level session to an engine for ``db_url``."""
    engine = get_engine(db_url)
    session.configure(bind=engine)  # type:ignore
def now_utc():
return func.timezone('utc', func.now())
def init_app(app: flask.app.Flask) -> None:
    """Bind the session to the app's ``DB_URL`` and register session cleanup.

    A teardown handler is registered on the app that removes the scoped
    session when the application context is torn down.
    """
    engine = get_engine(app.config["DB_URL"])
    session.configure(bind=engine)  # type: ignore

    @app.teardown_appcontext  # type: ignore
    def shutdown_session(exception: Exception | None = None) -> None:
        session.remove()  # type: ignore
def now_utc() -> typing.Any:
    """Return an SQL expression for the current time in the 'utc' zone."""
    current_time = func.now()
    return func.timezone("utc", current_time)

View file

@ -1,55 +1,55 @@
import requests
import lxml.html
"""Detroit Institute of Arts (Q1201549) - art museum in Detroit, Michigan."""
import os
import re
re_url = re.compile(r'https?://www.dia.org/art/collection/object/(.+)$')
import lxml.html
import requests
def get_html(url):
from .type import CatalogDict
re_url = re.compile(r"https?://www.dia.org/art/collection/object/(.+)$")
def get_html(url: str) -> str | None:
    """Get HTML from web catalog, using an on-disk cache.

    Returns None when ``url`` does not match ``re_url``. Otherwise the page
    is read from ``cache/dia_<id>.html`` if that file exists; if not, it is
    downloaded and written to that file before being returned.
    """
    m = re_url.search(url)
    if not m:
        return None
    # Slashes in the object path would otherwise create sub-directories.
    catalog_id = m.group(1).replace("/", "_")
    filename = f"cache/dia_{catalog_id}.html"

    if os.path.exists(filename):
        # Context managers make sure the cache file handle is closed.
        with open(filename) as cache_file:
            return cache_file.read()

    html: str = requests.get(url).text
    with open(filename, "w") as cache_file:
        cache_file.write(html)
    return html
def parse_html(html: str) -> CatalogDict:
    """Parse HTML and extract keywords.

    Keywords are the text of every link whose ``href`` starts with
    ``/art/collection?keys=``. Links without text are skipped.
    """
    root = lxml.html.fromstring(html)
    keywords: list[str] = []

    for a in root.findall(".//a[@href]"):
        href = a.get("href")
        # Explicit guards rather than assert: asserts are removed under -O,
        # which would let None reach startswith() or the keyword list.
        if href is None or not href.startswith("/art/collection?keys="):
            continue
        if not a.text:
            continue
        keywords.append(a.text)

    return {
        "institution": "Detroit Institute of Arts",
        "keywords": keywords,
    }
def get_catalog(url: str) -> CatalogDict | None:
    """Fetch the catalog page for ``url`` and parse it.

    Returns None when no HTML could be obtained.
    """
    html = get_html(url)
    if not html:
        return None
    return parse_html(html)

View file

@ -1,32 +1,48 @@
"""Send mail to admin when an error happens."""
import logging
from logging.handlers import SMTPHandler
from logging import Formatter
from flask import request, g
from logging.handlers import SMTPHandler
PROJECT = 'depicts'
import flask
from flask import g, request
class MatcherSMTPHandler(SMTPHandler):
def getSubject(self, record): # noqa: N802
subject = (f'{PROJECT} error: {record.exc_info[0].__name__}'
if (record.exc_info and record.exc_info[0])
else f'{PROJECT} error: {record.pathname}:{record.lineno:d}')
PROJECT = "depicts"
if qid := getattr(g, 'qid', None):
subject += f' {qid}'
if label := getattr(g, 'label', None):
subject += f': {label}'
class MySMTPHandler(SMTPHandler):
    """Custom SMTP handler to change mail subject."""

    def getSubject(self, record: logging.LogRecord) -> str:  # noqa: N802
        """Specify subject line for error mails.

        The subject names the exception type when the record carries one,
        otherwise the source location. Inside a Flask application context
        the ``qid`` and ``label`` stored on ``flask.g`` are appended if set.
        """
        if record.exc_info and record.exc_info[0]:
            subject = f"{PROJECT} error: {record.exc_info[0].__name__}"
        else:
            subject = f"{PROJECT} error: {record.pathname}:{record.lineno:d}"

        # Touching flask.g outside an application context raises
        # RuntimeError, which getattr's default does not swallow. Skip the
        # extras so the mail still goes out.
        if flask.has_app_context():
            if qid := getattr(g, "qid", None):
                subject += f" {qid}"

            if label := getattr(g, "label", None):
                subject += f": {label}"

        return subject
class RequestFormatter(Formatter):
    """Custom logging formatter to include request."""

    def format(self, record: logging.LogRecord) -> str:
        """Attach the Flask ``request`` object to the record, then format it.

        The attribute is set so the format string can refer to fields of
        ``request``.
        """
        record.request = request
        return super().format(record)
def setup_error_mail(app):
formatter = RequestFormatter('''
def setup_error_mail(app: flask.Flask) -> None:
"""Send mail to admins when an error happens."""
formatter = RequestFormatter(
"""
Message type: {levelname}
Location: {pathname:s}:{lineno:d}
Module: {module:s}
@ -38,13 +54,17 @@ def setup_error_mail(app):
Message:
{message:s}
''', style='{')
""",
style="{",
)
mail_handler = MatcherSMTPHandler(app.config['SMTP_HOST'],
app.config['MAIL_FROM'],
app.config['ADMINS'],
app.name + ' error',
timeout=30)
mail_handler = MySMTPHandler(
app.config["SMTP_HOST"],
app.config["MAIL_FROM"],
app.config["ADMINS"],
app.name + " error",
timeout=30,
)
mail_handler.setFormatter(formatter)
mail_handler.setLevel(logging.ERROR)

View file

@ -1,3 +1,40 @@
def save_error():
return {"error":{"code":"failed-save","info":"The save has failed.","messages":[{"name":"wikibase-api-failed-save","parameters":[],"html":"The save has failed."},{"name":"wikimedia-globalblocking-ipblocked-range","parameters":["[//meta.wikimedia.org/wiki/User:Jon_Kolbert Jon Kolbert]","meta.wikimedia.org","[[m:NOP|Open Proxy]]: Colocation webhost, Contact [[m:Special:Contact/stewards|stewards]] if you are affected","04:21, 8 April 2020","04:21, 8 April 2023","78.129.222.14","78.129.128.0/17"],"html":"<p><b>Your IP address is in a range that has been <a href=\"https://meta.wikimedia.org/wiki/Special:MyLanguage/Global_blocks\" class=\"extiw\" title=\"m:Special:MyLanguage/Global blocks\">blocked on all Wikimedia Foundation wikis</a>.</b>\n</p><p>The block was made by <a class=\"external text\" href=\"https://meta.wikimedia.org/wiki/User:Jon_Kolbert\">Jon Kolbert</a> (meta.wikimedia.org).\nThe reason given is <i><a href=\"https://meta.wikimedia.org/wiki/NOP\" class=\"extiw\" title=\"m:NOP\">Open Proxy</a>: Colocation webhost, Contact <a href=\"https://meta.wikimedia.org/wiki/Special:Contact/stewards\" class=\"extiw\" title=\"m:Special:Contact/stewards\">stewards</a> if you are affected</i>.\n</p>\n<ul><li>Start of block: 04:21, 8 April 2020</li>\n<li>Expiry of block: 04:21, 8 April 2023</li></ul>\n<p>Your current IP address is 78.129.222.14 and the blocked range is 78.129.128.0/17.\nPlease include all above details in any queries you make.\n</p><p>If you believe you were blocked by mistake, you can find additional information and instructions in the <a href=\"https://meta.wikimedia.org/wiki/Special:MyLanguage/No_open_proxies\" class=\"extiw\" title=\"m:Special:MyLanguage/No open proxies\">No open proxies</a> global policy.\nOtherwise, to discuss the block please <a href=\"https://meta.wikimedia.org/wiki/Steward_requests/Global\" class=\"extiw\" title=\"m:Steward requests/Global\">post a request for review on Meta-Wiki</a> or send an email to the <a 
href=\"https://meta.wikimedia.org/wiki/Special:MyLanguage/Stewards\" class=\"extiw\" title=\"m:Special:MyLanguage/Stewards\">stewards</a> <a href=\"https://meta.wikimedia.org/wiki/Special:MyLanguage/OTRS\" class=\"extiw\" title=\"m:Special:MyLanguage/OTRS\">OTRS</a> queue at <kbd>stewards@wikimedia.org</kbd> including all above details.\n</p>"},{"name":"permissionserrors","parameters":[],"html":"Permission error"}],"docref":"See https://www.wikidata.org/w/api.php for API usage. Subscribe to the mediawiki-api-announce mailing list at &lt;https://lists.wikimedia.org/mailman/listinfo/mediawiki-api-announce&gt; for notice of API deprecations and breaking changes."},"servedby":"mw1315"}
"""Fixtures."""
import typing
def save_error() -> dict[str, str | dict[str, typing.Any]]:
"""Save error reply."""
return {
"error": {
"code": "failed-save",
"info": "The save has failed.",
"messages": [
{
"name": "wikibase-api-failed-save",
"parameters": [],
"html": "The save has failed.",
},
{
"name": "wikimedia-globalblocking-ipblocked-range",
"parameters": [
"[//meta.wikimedia.org/wiki/User:Jon_Kolbert Jon Kolbert]",
"meta.wikimedia.org",
"[[m:NOP|Open Proxy]]: Colocation webhost, Contact [[m:Special:Contact/stewards|stewards]] if you are affected",
"04:21, 8 April 2020",
"04:21, 8 April 2023",
"78.129.222.14",
"78.129.128.0/17",
],
"html": '<p><b>Your IP address is in a range that has been <a href="https://meta.wikimedia.org/wiki/Special:MyLanguage/Global_blocks" class="extiw" title="m:Special:MyLanguage/Global blocks">blocked on all Wikimedia Foundation wikis</a>.</b>\n</p><p>The block was made by <a class="external text" href="https://meta.wikimedia.org/wiki/User:Jon_Kolbert">Jon Kolbert</a> (meta.wikimedia.org).\nThe reason given is <i><a href="https://meta.wikimedia.org/wiki/NOP" class="extiw" title="m:NOP">Open Proxy</a>: Colocation webhost, Contact <a href="https://meta.wikimedia.org/wiki/Special:Contact/stewards" class="extiw" title="m:Special:Contact/stewards">stewards</a> if you are affected</i>.\n</p>\n<ul><li>Start of block: 04:21, 8 April 2020</li>\n<li>Expiry of block: 04:21, 8 April 2023</li></ul>\n<p>Your current IP address is 78.129.222.14 and the blocked range is 78.129.128.0/17.\nPlease include all above details in any queries you make.\n</p><p>If you believe you were blocked by mistake, you can find additional information and instructions in the <a href="https://meta.wikimedia.org/wiki/Special:MyLanguage/No_open_proxies" class="extiw" title="m:Special:MyLanguage/No open proxies">No open proxies</a> global policy.\nOtherwise, to discuss the block please <a href="https://meta.wikimedia.org/wiki/Steward_requests/Global" class="extiw" title="m:Steward requests/Global">post a request for review on Meta-Wiki</a> or send an email to the <a href="https://meta.wikimedia.org/wiki/Special:MyLanguage/Stewards" class="extiw" title="m:Special:MyLanguage/Stewards">stewards</a> <a href="https://meta.wikimedia.org/wiki/Special:MyLanguage/OTRS" class="extiw" title="m:Special:MyLanguage/OTRS">OTRS</a> queue at <kbd>stewards@wikimedia.org</kbd> including all above details.\n</p>',
},
{
"name": "permissionserrors",
"parameters": [],
"html": "Permission error",
},
],
"docref": "See https://www.wikidata.org/w/api.php for API usage. Subscribe to the mediawiki-api-announce mailing list at &lt;https://lists.wikimedia.org/mailman/listinfo/mediawiki-api-announce&gt; for notice of API deprecations and breaking changes.",
},
"servedby": "mw1315",
}

View file

@ -1,19 +1,30 @@
from .model import HumanItem
from . import mediawiki, wikibase
"""Human."""
import re
re_four_digits = re.compile(r'\b\d{4}\b')
from . import mediawiki, wikibase
from .model import HumanItem
from .type import HumanDict
re_iso_date = re.compile(r'\b\d{4}-\d{2}-\d{2}\b')
re_four_and_two = re.compile(r'\b(\d{2})(\d{2})[-](\d{2})\b')
re_catalog_number = re.compile(r'\b\d{4}[^\d]+\d+[^\d]+\d{4}\b')
re_four_digits = re.compile(r"\b\d{4}\b")
def query(yob, yod):
re_iso_date = re.compile(r"\b\d{4}-\d{2}-\d{2}\b")
re_four_and_two = re.compile(r"\b(\d{2})(\d{2})[-](\d{2})\b")
re_catalog_number = re.compile(r"\b\d{4}[^\d]+\d+[^\d]+\d{4}\b")
def query(yob: int, yod: int) -> list[HumanItem]:
    """Return the people born in ``yob`` who died in ``yod``.

    An empty list is returned without querying when the death year precedes
    the birth year.
    """
    if yob > yod:
        return []
    matches: list[HumanItem] = (
        HumanItem.query.filter_by(yob=yob, yod=yod).all()  # type: ignore
    )
    return matches
def get_items_from_name(name):
def get_items_from_name(name: str) -> list[HumanItem]:
"""Get people with name."""
found = []
m = re_four_and_two.search(name)
@ -32,27 +43,29 @@ def get_items_from_name(name):
return found
def from_name(name: str) -> list[HumanDict]:
    """Build candidate records for ``name``, sorted by label.

    Candidates come from ``get_items_from_name``; their entities are fetched
    with labels and descriptions. Entities marked as redirects or missing
    are dropped. Records without a label sort as the empty string.
    """
    by_qid = {str(human.qid): human for human in get_items_from_name(name)}
    wanted = list(by_qid.keys())

    matches = []
    entities = mediawiki.get_entities_with_cache(wanted, props="labels|descriptions")
    for entity in entities:
        if "redirects" in entity or "missing" in entity:
            continue
        human = by_qid[entity["id"]]
        candidate: HumanDict = {
            "qid": entity["id"],
            "year_of_birth": human.year_of_birth,
            "year_of_death": human.year_of_death,
        }

        label = wikibase.get_entity_label(entity)
        if label:
            candidate["label"] = label
        if "en" in entity.get("descriptions", {}):
            candidate["description"] = entity["descriptions"]["en"]["value"]
        matches.append(candidate)

    matches.sort(key=lambda record: record.get("label", ""))
    return matches

View file

@ -1,20 +1,25 @@
from flask import current_app
"""Send email."""
import smtplib
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
import smtplib
def send_mail(subject, body):
from flask import current_app
def send_mail(subject: str, body: str) -> None:
    """Send email to site admin.

    Sender, recipient and SMTP host are read from the current Flask app's
    config (``MAIL_FROM``, ``ADMIN_EMAIL``, ``SMTP_HOST``).
    """
    app = current_app
    mail_to = app.config["ADMIN_EMAIL"]
    mail_from = app.config["MAIL_FROM"]

    msg = MIMEText(body, "plain", "UTF-8")
    msg["Subject"] = subject
    msg["To"] = mail_to
    msg["From"] = mail_from
    msg["Date"] = formatdate()
    msg["Message-ID"] = make_msgid()

    # The context manager closes the connection even when sendmail() raises.
    with smtplib.SMTP(app.config["SMTP_HOST"]) as s:
        s.sendmail(mail_from, [mail_to], msg.as_string())

View file

@ -1,114 +1,149 @@
import requests
import os
import json
import hashlib
from .category import Category
from . import utils
"""Access MediaWiki API."""
wikidata_url = 'https://www.wikidata.org/w/api.php'
import hashlib
import json
import os
import typing
import requests
from . import utils
from .category import Category
from .type import CallParams, Entity
wikidata_url = "https://www.wikidata.org/w/api.php"
page_size = 50
hosts = {
'commons': 'commons.wikimedia.org',
'enwiki': 'en.wikipedia.org',
'wikidata': 'www.wikidata.org',
"commons": "commons.wikimedia.org",
"enwiki": "en.wikipedia.org",
"wikidata": "www.wikidata.org",
}
def api_call(params: CallParams, api_url: str = wikidata_url) -> requests.Response:
    """Send a GET request to a MediaWiki API.

    ``format=json`` and ``formatversion=2`` are supplied by default; entries
    in ``params`` override them. The request uses a 5 second timeout.
    """
    call_params: CallParams = {"format": "json", "formatversion": 2}
    call_params.update(params)
    return requests.get(api_url, params=call_params, timeout=5)
def api_post(params: CallParams, api_url: str = wikidata_url) -> requests.Response:
    """Send a POST request to a MediaWiki API.

    ``format=json`` and ``formatversion=2`` are supplied by default; entries
    in ``params`` override them. The request uses a 5 second timeout.
    """
    call_params: CallParams = {"format": "json", "formatversion": 2}
    call_params.update(params)
    return requests.post(api_url, data=call_params, timeout=5)
def get_list(list_name, **params):
r = api_call({'action': 'query', 'list': list_name, **params})
return r.json()['query'][list_name]
def get_entity(qid, redirects=False):
json_data = api_call({'action': 'wbgetentities',
'ids': qid,
'redirects': {True: 'yes', False: 'no'}[redirects]}).json()
def get_list(list_name: str, **params: str | int) -> list[dict[str, typing.Any]]:
    """Run an ``action=query`` list request and return its entries.

    The entries are read from ``query.<list_name>`` in the JSON reply.
    """
    query_params = {"action": "query", "list": list_name, **params}
    reply = api_call(query_params).json()
    list_contents: list[dict[str, typing.Any]] = reply["query"][list_name]
    return list_contents
def get_entity(qid: str, redirects: bool = False) -> Entity | None:
    """Get entity from wikibase.

    Returns None when the reply has no ``entities`` key, when that mapping
    is empty, or when the entity is flagged as missing.
    """
    json_data = api_call(
        {
            "action": "wbgetentities",
            "ids": qid,
            "redirects": "yes" if redirects else "no",
        }
    ).json()

    try:
        entities = json_data["entities"]
    except KeyError:
        return None

    # An empty mapping yields None here instead of raising IndexError.
    entity = next(iter(entities.values()), None)
    if entity is None or "missing" in entity:
        return None
    return typing.cast(Entity, entity)
def wbgetentities(ids: typing.Iterable[str], **params: str | int) -> dict[str, Entity]:
    """Fetch entities with one ``wbgetentities`` call.

    Returns an empty dict without calling the API when ``ids`` is falsy.
    Otherwise returns the ``entities`` mapping from the JSON reply.
    """
    if not ids:
        return {}

    call_params = {
        "action": "wbgetentities",
        "ids": "|".join(ids),
        **params,
    }
    reply = api_call(call_params).json()
    entities: dict[str, Entity] = reply["entities"]
    return entities
def get_entities(ids: typing.Iterable[str], **params: str | int) -> list[Entity]:
    """Fetch entities in batches of ``page_size`` and return them as a list."""
    collected: list[Entity] = []
    for batch in utils.chunk(ids, page_size):
        collected.extend(wbgetentities(batch, **params).values())
    return collected
def get_entities_dict(
    ids: typing.Iterable[str], **params: str | int
) -> dict[str, Entity]:
    """Fetch entities in batches of ``page_size`` and merge the reply mappings.

    ``ids`` is annotated as an iterable of IDs, matching ``get_entities``:
    it is chunked and each chunk is passed to ``wbgetentities``.
    """
    entities: dict[str, Entity] = {}
    for cur in utils.chunk(ids, page_size):
        entities.update(wbgetentities(cur, **params))
    return entities
def get_entity_with_cache(qid: str, refresh: bool = False) -> Entity | None:
    """Get an entity, using ``cache/<qid>.json`` as an on-disk cache.

    The cached file is used when it exists and ``refresh`` is false.
    Otherwise the entity is fetched (following redirects) and the result,
    including None, is written to the cache file.
    """
    filename = f"cache/{qid}.json"
    entity: Entity | None
    if not refresh and os.path.exists(filename):
        # Context managers make sure the cache file handle is closed.
        with open(filename) as cache_file:
            entity = json.load(cache_file)
    else:
        entity = get_entity(qid, redirects=True)
        with open(filename, "w") as cache_file:
            json.dump(entity, cache_file, indent=2)

    return entity
def get_entities_with_cache(ids: list[str], **params: typing.Any) -> list[Entity]:
    """Get entities, using an on-disk cache keyed by the list of IDs.

    The cache file name is derived from an MD5 digest of the space-joined
    IDs. On a cache miss the entities are fetched and written to that file.
    """
    # NOTE(review): ``params`` is not part of the cache key, so calls with the
    # same IDs but different params share a cache file. Confirm that is
    # intended.
    md5 = hashlib.md5(" ".join(ids).encode("utf-8")).hexdigest()

    entity_list: list[Entity]
    filename = f"cache/entities_{md5}.json"
    if os.path.exists(filename):
        # Context managers make sure the cache file handle is closed.
        with open(filename) as cache_file:
            entity_list = json.load(cache_file)
    else:
        entity_list = get_entities(ids, **params)
        with open(filename, "w") as cache_file:
            json.dump(entity_list, cache_file, indent=2)

    return entity_list
def get_entities_dict_with_cache(
    all_ids: list[str], **params: typing.Any
) -> dict[str, Entity]:
    """Get a mapping of entities, caching each batch of IDs on disk.

    IDs are processed in batches of ``page_size``. Each batch has its own
    cache file, named from an MD5 digest of the space-joined IDs; a batch is
    fetched and written only when its file does not exist.
    """
    entities: dict[str, Entity] = {}
    for ids in utils.chunk(all_ids, page_size):
        md5 = hashlib.md5(" ".join(ids).encode("utf-8")).hexdigest()

        filename = f"cache/entities_dict_{md5}.json"
        if os.path.exists(filename):
            # Context managers make sure the cache file handle is closed.
            with open(filename) as cache_file:
                entities.update(json.load(cache_file))
            continue
        cur = wbgetentities(ids, **params)
        with open(filename, "w") as cache_file:
            json.dump(cur, cache_file, indent=2)
        entities.update(cur)
    return entities
def mediawiki_query(titles, params, site):
Page = dict[str, typing.Any]
def mediawiki_query(titles: list[str], params: CallParams, site: str) -> list[Page]:
"""Mediawiki query."""
if not titles:
return []
@ -116,74 +151,85 @@ def mediawiki_query(titles, params, site):
# FIXME: switch to utils.chunk
if len(titles) > page_size:
titles = titles[:page_size]
base = {
'format': 'json',
'formatversion': 2,
'action': 'query',
'continue': '',
'titles': '|'.join(titles),
base: CallParams = {
"format": "json",
"formatversion": 2,
"action": "query",
"continue": "",
"titles": "|".join(titles),
}
p = base.copy()
p.update(params)
query_url = f'https://{hosts[site]}/w/api.php'
query_url = f"https://{hosts[site]}/w/api.php"
r = requests.get(query_url, params=p)
expect = 'application/json; charset=utf-8'
expect = "application/json; charset=utf-8"
success = True
if r.status_code != 200:
print('status code: {r.status_code}'.format(r=r))
print("status code: {r.status_code}".format(r=r))
success = False
if r.headers['content-type'] != expect:
print('content-type: {r.headers[content-type]}'.format(r=r))
if r.headers["content-type"] != expect:
print(f'content-type: {r.headers["content-type"]}')
success = False
assert success
json_reply = r.json()
if 'query' not in json_reply:
if "query" not in json_reply:
print(r.url)
print(r.text)
return json_reply['query']['pages']
pages: list[Page] = json_reply["query"]["pages"]
return pages
def get_content_and_categories(title: str, site: str) -> tuple[str, list[str]]:
    """Return the content of a page's first returned revision and its categories.

    Hidden categories are excluded by the query. Exactly one page is
    expected in the reply; a page without categories yields an empty list.
    """
    params: CallParams = {
        "prop": "revisions|categories",
        "clshow": "!hidden",
        "cllimit": "max",
        "rvprop": "content",
    }

    pages = mediawiki_query([title], params, site)
    assert len(pages) == 1
    page = pages[0]
    content = page["revisions"][0]["content"]
    categories = page.get("categories", [])
    return (content, categories)
def host_from_site(site: str) -> str:
    """Look up the hostname for a site key in ``hosts``."""
    host = hosts[site]
    return host
def process_cats(cats, site):
return [Category(cat['title'], site) for cat in cats]
def get_categories(titles, site):
params = {
'prop': 'categories',
'clshow': '!hidden',
'cllimit': 'max',
def process_cats(cats: list[dict[str, str]], site: str) -> list[Category]:
    """Build a ``Category`` for the title of each entry in ``cats``."""
    categories = []
    for cat in cats:
        categories.append(Category(cat["title"], site))
    return categories
def get_categories(titles: list[str], site: str) -> list[tuple[str, list[Category]]]:
    """Return ``(title, categories)`` pairs for the given page titles.

    Hidden categories are excluded by the query. Pages with no categories
    are left out of the result.
    """
    params: CallParams = {
        "prop": "categories",
        "clshow": "!hidden",
        "cllimit": "max",
    }

    title_and_cats = []
    for page in mediawiki_query(titles, params, site):
        if "categories" not in page:
            continue
        cats = process_cats(page["categories"], site)
        if cats:
            title_and_cats.append((page["title"], cats))
    return title_and_cats
def get_history(title: str, site: str) -> list[Page]:
    """Query the revisions of a page.

    Requests timestamp, user, comment, ids and content of the ``main`` slot
    for as many revisions as the API allows in one reply.
    """
    params: CallParams = {
        "prop": "revisions",
        "rvlimit": "max",
        "rvprop": "timestamp|user|comment|ids|content",
        "rvslots": "main",
    }
    pages = mediawiki_query([title], params, site)
    return pages

View file

@ -1,163 +1,181 @@
from sqlalchemy.ext.declarative import declarative_base
from .database import session, now_utc
from . import wikibase, utils
from sqlalchemy.schema import Column, ForeignKey
from sqlalchemy.types import Integer, String, DateTime, Boolean
from sqlalchemy.orm import column_property, relationship, synonym
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.sql.expression import cast
from sqlalchemy.dialects import postgresql
import typing
from datetime import timedelta
from urllib.parse import quote
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import column_property, relationship, synonym
from sqlalchemy.schema import Column, ForeignKey
from sqlalchemy.sql.expression import cast
from sqlalchemy.types import Boolean, DateTime, Integer, String
from . import utils, wikibase
from .database import now_utc, session
from .type import Claims, Entity
Base = declarative_base()
Base.query = session.query_property()
Base.query = session.query_property() # type: ignore
class User(Base):
    """Row of the ``user`` table."""

    __tablename__ = "user"
    # Primary key is supplied by the caller (autoincrement is off).
    id = Column(Integer, primary_key=True, autoincrement=False)
    username = Column(String, unique=True)
    options = Column(postgresql.JSON)
    # Default is the expression returned by now_utc().
    first_seen = Column(DateTime, default=now_utc())
    is_admin = Column(Boolean, default=False)
class DepictsItem(Base):
    """Row of the ``depicts`` table: an item with label, description and count."""

    __tablename__ = "depicts"
    item_id = Column(Integer, primary_key=True, autoincrement=False)
    label = Column(String)
    description = Column(String)
    count = Column(Integer)
    # "Q" followed by item_id cast to a string.
    qid = column_property("Q" + cast(item_id, String))
    # Set of DepictsItemAltLabel rows; each of them gets an ``item`` backref.
    db_alt_labels = relationship(
        "DepictsItemAltLabel",
        collection_class=set,
        cascade="save-update, merge, delete, delete-orphan",
        backref="item",
    )
    # Proxies the ``alt_label`` attribute of the rows in db_alt_labels.
    alt_labels = association_proxy("db_alt_labels", "alt_label")  # type: ignore
class DepictsItemAltLabel(Base):
    """Row of the ``depicts_alt_label`` table: one alternative label of an item.

    The primary key is the pair (item_id, alt_label).
    """

    __tablename__ = "depicts_alt_label"
    # References depicts.item_id.
    item_id = Column(
        Integer, ForeignKey("depicts.item_id"), primary_key=True, autoincrement=False
    )
    alt_label = Column(String, primary_key=True)

    def __init__(self, alt_label: str) -> None:
        """Create a row from the label text alone."""
        self.alt_label = alt_label
class Item(Base):
    """Row of the ``item`` table: an item stored with its entity JSON."""

    __tablename__ = "item"
    item_id = Column(Integer, primary_key=True, autoincrement=False)
    # label = Column(String)  # column removed 2019-12-18
    entity = Column(postgresql.JSON)
    lastrevid = Column(Integer, nullable=True, unique=True)
    modified = Column(DateTime, nullable=True)
    is_artwork = Column(Boolean, nullable=False, default=False)
    # "Q" followed by item_id cast to a string.
    qid = column_property("Q" + cast(item_id, String))

    @property
    def claims(self) -> Claims:
        """Return the ``claims`` mapping of the stored entity."""
        return typing.cast(Entity, self.entity)["claims"]

    def image_count(self) -> int:
        """Return the number of P18 claims (0 when there are none)."""
        p18 = self.claims.get("P18")
        return len(p18) if p18 else 0

    def image_filename(self) -> str | None:
        """Return the value of the first P18 claim, or None if unavailable."""
        p18 = self.claims.get("P18")
        if not p18:
            return None
        try:
            filename: str = p18[0]["mainsnak"]["datavalue"]["value"]
        except KeyError:
            return None
        return filename

    @property
    def label(self) -> str | None:
        """Return the entity label as chosen by wikibase.get_entity_label."""
        return wikibase.get_entity_label(typing.cast(Entity, self.entity))

    @property
    def artist(self) -> str | None:
        """Return the ``id`` of the first P170 value, or None when there is none."""
        v = wikibase.first_datavalue(typing.cast(Entity, self.entity), "P170")
        # Check for a missing value first: asserting the type before this
        # check raises AssertionError on a falsy non-dict (e.g. None).
        if not v:
            return None
        assert isinstance(v, dict)
        artist_qid: str = v["id"]
        return artist_qid

    @property
    def depicts(self) -> list[str]:
        """Return the QIDs linked through P180."""
        return self.linked_qids("P180")

    @property
    def instance_of(self) -> list[str]:
        """Return the QIDs linked through P31."""
        return self.linked_qids("P31")

    def linked_qids(self, prop: str) -> list[str]:
        """Return the ``id`` of every claim value for ``prop``.

        Claims whose main snak has no ``datavalue`` are skipped.
        """
        values = self.claims.get(prop) or []
        return [
            v["mainsnak"]["datavalue"]["value"]["id"]
            for v in values
            if "datavalue" in v["mainsnak"]
        ]

    @property
    def date(self) -> str | None:
        """Return the formatted first P571 value, or None when there is none."""
        v = wikibase.first_datavalue(typing.cast(Entity, self.entity), "P571")
        # Same ordering as in ``artist``: a missing value must return None.
        if not v:
            return None
        assert isinstance(v, dict)
        return utils.format_time(v["time"], v["precision"])
class Triple(Base):
    """Row of the ``triple`` table: (subject_id, predicate_id, object_id).

    All three columns together form the primary key.
    """

    __tablename__ = "triple"
    # References item.item_id.
    subject_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
    predicate_id = Column(Integer, primary_key=True, index=True)
    object_id = Column(Integer, primary_key=True, index=True)

    # The Item for subject_id; Item gets a ``triples`` backref.
    subject = relationship("Item", backref="triples")
class HumanItem(Base):
    """Row of the ``human`` table: an item with years of birth and death."""

    __tablename__ = "human"
    item_id = Column(Integer, primary_key=True, autoincrement=False)
    year_of_birth = Column(Integer, nullable=False)
    year_of_death = Column(Integer, nullable=False)
    # Difference between the two year columns.
    age_at_death = column_property(year_of_death - year_of_birth)
    # "Q" followed by item_id cast to a string.
    qid = column_property("Q" + cast(item_id, String))

    # Short aliases for the year columns.
    yob = synonym("year_of_birth")
    yod = synonym("year_of_death")
class Language(Base):
__tablename__ = 'language'
__tablename__ = "language"
item_id = Column(Integer, primary_key=True, autoincrement=False)
wikimedia_language_code = Column(String, index=True, unique=True)
en_label = Column(String, nullable=False)
code = synonym('wikimedia_language_code')
label = synonym('en_label')
code = synonym("wikimedia_language_code")
label = synonym("en_label")
@classmethod
def get_by_code(cls, code):
return cls.query.filter_by(wikimedia_language_code=code).one()
def get_by_code(cls, code: str) -> "Language":
return cls.query.filter_by(wikimedia_language_code=code).one() # type: ignore
class Edit(Base):
__tablename__ = 'edit'
__tablename__ = "edit"
username = Column(String, primary_key=True)
artwork_id = Column(Integer, ForeignKey('item.item_id'), primary_key=True)
depicts_id = Column(Integer, ForeignKey('depicts.item_id'), primary_key=True)
artwork_id = Column(Integer, ForeignKey("item.item_id"), primary_key=True)
depicts_id = Column(Integer, ForeignKey("depicts.item_id"), primary_key=True)
timestamp = Column(DateTime, default=now_utc())
lastrevid = Column(Integer, nullable=True)
artwork_qid = column_property('Q' + cast(artwork_id, String))
depicts_qid = column_property('Q' + cast(depicts_id, String))
artwork_qid = column_property("Q" + cast(artwork_id, String))
depicts_qid = column_property("Q" + cast(depicts_id, String))
artwork = relationship('Item')
depicts = relationship('DepictsItem')
artwork = relationship("Item")
depicts = relationship("DepictsItem")
@property
def url_norm_username(self):
return quote(self.username.replace(' ', '_'))
def url_norm_username(self) -> str:
return quote(self.username.replace(" ", "_"))
@property
def user_wikidata_url(self):
return 'https://www.wikidata.org/wiki/User:' + self.url_norm_username
def user_wikidata_url(self) -> str:
return "https://www.wikidata.org/wiki/User:" + self.url_norm_username
class WikidataQuery(Base):
__tablename__ = 'wikidata_query'
__tablename__ = "wikidata_query"
id = Column(Integer, primary_key=True)
start_time = Column(DateTime)
end_time = Column(DateTime)
@ -171,27 +189,27 @@ class WikidataQuery(Base):
endpoint = Column(String)
@property
def duration(self):
if self.end_time:
return self.end_time - self.start_time
def duration(self) -> timedelta:
assert self.start_time and self.end_time
return self.end_time - self.start_time
@property
def display_seconds(self):
return f'{self.duration.total_seconds():.1f}'
def display_seconds(self) -> str:
return f"{self.duration.total_seconds():.1f}"
@property
def template(self):
def template(self) -> str | None:
if not self.query_template:
return
return None
t = self.query_template
if t.startswith('query/'):
if t.startswith("query/"):
t = t[6:]
if t.endswith('.sparql'):
if t.endswith(".sparql"):
t = t[:-7]
return t
@property
def bad(self):
return self.status_code and self.status_code != 200
def bad(self) -> bool:
return bool(self.status_code and self.status_code != 200)

View file

@ -1,42 +1,57 @@
import requests
import lxml.html
"""Museo del Prado (Q160112) - Spanish national art museum in Madrid, Spain."""
import os
import re
import typing
re_url = re.compile(r'www.museodelprado.es/(.+)$')
import lxml.html
import requests
def get_html(url):
catalog_id = re_url.search(url).group(1).replace('/', '_')
from .type import CatalogDict, EmptyDict
filename = f'cache/museodelprado_{catalog_id}.html'
re_url = re.compile(r"www.museodelprado.es/(.+)$")
def get_html(url: str) -> str:
"""Get HTML from web catalog."""
assert (m := re_url.search(url))
catalog_id = m.group(1).replace("/", "_")
filename = f"cache/museodelprado_{catalog_id}.html"
if os.path.exists(filename):
html = open(filename).read()
else:
r = requests.get(url)
html = r.text
open(filename, 'w').write(html)
open(filename, "w").write(html)
return html
def parse_html(html):
def parse_html(html: str) -> CatalogDict | EmptyDict:
"""Parse HTML and extract keywords."""
root = lxml.html.fromstring(html)
keywords = []
for h2 in root.findall('.//h2'):
if not h2.text or h2.text.strip() != 'Displayed objects':
for h2 in root.findall(".//h2"):
if not h2.text or h2.text.strip() != "Displayed objects":
continue
div = h2.getparent()
for keyword_span in div.findall('.//span[@property]'):
assert div is not None
for keyword_span in div.findall(".//span[@property]"):
assert isinstance(keyword_span.text, str)
keywords.append(keyword_span.text)
if not keywords:
return {}
return typing.cast(EmptyDict, {})
return {
'institution': 'Museo del Prado',
'keywords': keywords,
"institution": "Museo del Prado",
"keywords": keywords,
}
def get_catalog(url):
def get_catalog(url: str) -> CatalogDict | EmptyDict:
"""Get catalog web page and extract keywords."""
return parse_html(get_html(url))

View file

@ -1,37 +1,52 @@
import requests
import lxml.html
"""National Portrait Gallery (Q238587) - art museum in London, England."""
import os
import re
re_url = re.compile(r'www.npg.org.uk/collections/search/(.+)$')
import lxml.html
import requests
def get_html(url):
catalog_id = re_url.search(url).group(1).replace('/', '_')
from .type import CatalogDict
filename = f'cache/npg_{catalog_id}.html'
re_url = re.compile(r"www.npg.org.uk/collections/search/(.+)$")
def get_html(url: str) -> str:
"""Get HTML from web catalog."""
assert (m := re_url.search(url))
catalog_id = m.group(1).replace("/", "_")
filename = f"cache/npg_{catalog_id}.html"
if os.path.exists(filename):
html = open(filename).read()
else:
r = requests.get(url)
html = r.text
open(filename, 'w').write(html)
open(filename, "w").write(html)
return html
def parse_html(html):
def parse_html(html: str) -> CatalogDict:
"""Parse HTML and extract keywords."""
root = lxml.html.fromstring(html)
keywords = [a.text for a in root.findall('.//a[@href]')
if 'subj=' in a.get('href')]
keywords = [
a.text
for a in root.findall(".//a[@href]")
if "subj=" in a.get("href") # type: ignore
]
skip = {'oil', 'painting'}
keywords = [k for k in keywords if k.lower() not in skip]
skip = {"oil", "painting"}
keywords = [k for k in keywords if k.lower() not in skip] # type: ignore
return {
'institution': 'National Portrait Gallery',
'keywords': keywords,
"institution": "National Portrait Gallery",
"keywords": keywords, # type: ignore
}
def get_catalog(url):
def get_catalog(url: str) -> CatalogDict:
"""Get catalog web page and extract keywords."""
return parse_html(get_html(url))

View file

@ -1,47 +1,80 @@
"""Pagination."""
import typing
from math import ceil
from flask import request, url_for
from flask import Flask, request, url_for
T = typing.TypeVar("T")
class Pagination(object):
def __init__(self, page, per_page, total_count):
"""Pagination."""
page: int
per_page: int
total_count: int
def __init__(self, page: int, per_page: int, total_count: int) -> None:
"""Init."""
self.page = page
self.per_page = per_page
self.total_count = total_count
@property
def pages(self):
def pages(self) -> int:
"""Page count."""
return int(ceil(self.total_count / float(self.per_page)))
@property
def has_prev(self):
def has_prev(self) -> bool:
"""Has previous page."""
return self.page > 1
@property
def has_next(self):
def has_next(self) -> bool:
"""Has next page."""
return self.page < self.pages
def slice(self, items):
first = ((self.page - 1) * self.per_page)
def slice(self, items: list[T]) -> list[T]:
"""Slice of items for the current page."""
first = (self.page - 1) * self.per_page
last = self.page * self.per_page
return items[first:last]
def iter_pages(self, left_edge=2, left_current=6,
right_current=6, right_edge=2):
def iter_pages(
self,
left_edge: int = 2,
left_current: int = 6,
right_current: int = 6,
right_edge: int = 2,
) -> typing.Iterator[int | None]:
"""Iterate page numbers."""
last = 0
for num in range(1, self.pages + 1):
if num <= left_edge or \
(num > self.page - left_current - 1 and \
num < self.page + right_current) or \
num > self.pages - right_edge:
if (
num <= left_edge
or (
num > self.page - left_current - 1
and num < self.page + right_current
)
or num > self.pages - right_edge
):
if last + 1 != num:
yield None
yield num
last = num
def url_for_other_page(page):
def url_for_other_page(page: int) -> str:
"""Make URL for other page."""
assert request.view_args is not None and request.endpoint
args = request.view_args.copy()
args.update(request.args)
args['page'] = page
args["page"] = page
return url_for(request.endpoint, **args)
def init_pager(app):
app.jinja_env.globals['url_for_other_page'] = url_for_other_page
def init_pager(app: Flask) -> None:
"""Initialise pager."""
app.jinja_env.globals["url_for_other_page"] = url_for_other_page

View file

@ -1,16 +1,25 @@
"""Enable Python requests that ignores bad HTTPS certificates."""
import typing
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
CIPHERS = 'DEFAULT@SECLEVEL=1'
CIPHERS = "DEFAULT@SECLEVEL=1"
class HTTPSAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
context = create_urllib3_context(ciphers=CIPHERS)
kwargs['ssl_context'] = context
return super().init_poolmanager(*args, **kwargs)
"""HTTPS Adapter subclass."""
def get(*args, **kwargs):
def init_poolmanager(self, *args: typing.Any, **kwargs: typing.Any) -> None:
"""Init pool manager."""
context = create_urllib3_context(ciphers=CIPHERS)
kwargs["ssl_context"] = context
return super().init_poolmanager(*args, **kwargs) # type: ignore
def get(*args: typing.Any, **kwargs: typing.Any) -> requests.Response:
s = requests.Session()
s.mount('https://', HTTPSAdapter())
s.mount("https://", HTTPSAdapter())
return s.get(*args, **kwargs, verify=False)

View file

@ -1,34 +1,51 @@
import requests
import lxml.html
"""Rijksmuseuma (Q190804) - museum in Amsterdam, Netherlands."""
import os
import re
re_url = re.compile(r'^https://www.rijksmuseum.nl/(?:nl/collectie|en/collection)/([^/]+)$')
import lxml.html
import requests
def get_html(catalog_id):
filename = f'cache/rijksmuseum_{catalog_id}.html'
en_url = 'https://www.rijksmuseum.nl/en/collection/' + catalog_id
from .type import CatalogDict
re_url = re.compile(
r"^https://www.rijksmuseum.nl/(?:nl/collectie|en/collection)/([^/]+)$"
)
def get_html(catalog_id: str) -> str:
"""Get HTML from web catalog."""
filename = f"cache/rijksmuseum_{catalog_id}.html"
en_url = "https://www.rijksmuseum.nl/en/collection/" + catalog_id
if os.path.exists(filename):
html = open(filename).read()
else:
r = requests.get(en_url)
html = r.text
open(filename, 'w').write(html)
open(filename, "w").write(html)
return html
def parse_html(html):
def parse_html(html: str) -> CatalogDict:
"""Parse HTML and extract keywords."""
root = lxml.html.fromstring(html)
keywords = [a.text for a in root.findall('.//a[@href]')
if 'f.classification.iconClassDescription.sort' in a.get('href')]
keywords = [
a.text
for a in root.findall(".//a[@href]")
if "f.classification.iconClassDescription.sort" in a.get("href") # type: ignore
]
return {
'institution': 'Rijksmuseum',
'keywords': keywords,
"institution": "Rijksmuseum",
"keywords": keywords, # type: ignore
}
def get_catalog(url):
catalog_id = re_url.search(url).group(1)
def get_catalog(url: str) -> CatalogDict:
"""Get catalog web page and extract keywords."""
assert (m := re_url.search(url))
catalog_id = m.group(1)
return parse_html(get_html(catalog_id))

View file

@ -1,45 +1,59 @@
import requests
import lxml.html
"""Smithsonian American Art Museum (Q1192305) - fine arts museum in Washington, D.C."""
import json
import os
import typing
def get_html(saam_id):
filename = f'cache/saam_{saam_id}.html'
url = 'http://americanart.si.edu/collections/search/artwork/'
import lxml.html
import requests
from .type import CatalogDict, EmptyDict
def get_html(saam_id: str | int) -> str:
"""Get HTML from web catalog."""
filename = f"cache/saam_{saam_id}.html"
url = "http://americanart.si.edu/collections/search/artwork/"
if os.path.exists(filename):
html = open(filename).read()
else:
r = requests.get(url, params={'id': saam_id})
r = requests.get(url, params={"id": saam_id})
html = r.text
open(filename, 'w').write(html)
open(filename, "w").write(html)
return html
def parse_html(html):
def parse_html(html: str) -> dict[str, typing.Any] | None:
"""Parse HTML and extract keywords."""
root = lxml.html.fromstring(html)
ld_json = root.findtext('.//script[@type="application/ld+json"]')
if ld_json is None:
return {'ld': {}, 'keywords': []}
return {"ld": {}, "keywords": []}
ld = json.loads(ld_json)
ul = root.find('.//ul[@class="ontology-list"]')
if ul is None:
return
assert ul.tag == 'ul'
return None
assert ul.tag == "ul"
keywords = [li.text for li in ul]
return {'ld': ld, 'keywords': keywords}
return {"ld": ld, "keywords": keywords}
def get_catalog(saam_id):
def get_catalog(saam_id: int | str) -> CatalogDict | EmptyDict:
"""Get catalog web page and extract keywords."""
data = parse_html(get_html(saam_id))
empty: EmptyDict = {}
if not data:
return {}
ret = {
'institution': 'Smithsonian American Art Museum',
return empty
ret: CatalogDict = {
"institution": "Smithsonian American Art Museum",
"keywords": [],
}
if data['keywords']:
ret['keywords'] = data['keywords']
if 'description' in data['ld']:
ret['description'] = data['ld']['description']
if data["keywords"]:
ret["keywords"] = data["keywords"]
if "description" in data["ld"]:
ret["description"] = data["ld"]["description"]
return ret if 'description' in ret or 'keywords' in ret else {}
return ret if "description" in ret or "keywords" in ret else empty

50
depicts/type.py Normal file
View file

@ -0,0 +1,50 @@
import typing
from typing import Required, TypedDict
Claims = dict[str, list[dict[str, typing.Any]]]
Sitelinks = dict[str, dict[str, typing.Any]]
class EmptyDict(TypedDict):
"""Empty dict."""
pass
class Entity(TypedDict, total=False):
"""Wikidata Entity."""
id: str
labels: dict[str, typing.Any]
descriptions: dict[str, typing.Any]
claims: Claims
lastrevid: int
sitelinks: Sitelinks
modified: str
redirects: dict[str, typing.Any]
aliases: dict[str, list[dict[str, typing.Any]]]
class CatalogDict(TypedDict, total=False):
"""Catalog record from institution web site."""
institution: str
url: str
ids: set[str]
detail: list[dict[str, str]]
description: str
keywords: list[str]
CallParams = dict[str, str | int]
class HumanDict(TypedDict, total=False):
"""Human."""
qid: Required[str]
year_of_birth: Required[int]
year_of_death: Required[int]
label: str
description: str

View file

@ -1,91 +1,121 @@
from flask import request
from itertools import islice
from datetime import datetime
"""Various utility functions."""
import typing
import urllib.parse
from datetime import datetime
from itertools import islice
import inflect
from flask import request
hosts = {
'commons': 'commons.wikimedia.org',
'enwiki': 'en.wikipedia.org',
'wikidata': 'www.wikidata.org',
"commons": "commons.wikimedia.org",
"enwiki": "en.wikipedia.org",
"wikidata": "www.wikidata.org",
}
engine = inflect.engine()
skip_names = {
'National Gallery'
}
skip_names = {"National Gallery"}
def ordinal(n):
return "%d%s" % (n, 'tsnrhtdd'[(n / 10 % 10 != 1) * (n % 10 < 4) * n % 10::4])
T = typing.TypeVar("T")
def chunk(it, size):
def ordinal(n: int) -> str:
    """Convert an integer to its English ordinal string, e.g. 1 -> "1st".

    Args:
        n: The number to convert.

    Returns:
        The number followed by "st", "nd", "rd" or "th".
    """
    # Numbers whose tens digit is 1 (11, 12, 13, 111, ...) always take "th".
    # Floor division is required here: with true division 11 / 10 % 10 is
    # 1.1, the teens test never matches and 11 becomes "11st".
    if n // 10 % 10 == 1:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")
    return f"{n}{suffix}"
def chunk(it: typing.Iterable[T], size: int) -> typing.Iterator[tuple[T, ...]]:
"""Split an iterable into chunks of the given size."""
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
def drop_start(s, start):
def drop_start(s: str, start: str) -> str:
"""Remove string prefix, otherwise throw an error."""
assert s.startswith(start)
return s[len(start):]
return s[len(start) :]
def drop_category_ns(s):
return drop_start(s, 'Category:')
def parse_sitelink(s, start):
return urllib.parse.unquote(drop_start(s, start)).replace('_', ' ')
def drop_category_ns(s: str) -> str:
"""Remove 'Category:' from start of string."""
return drop_start(s, "Category:")
def word_contains_letter(word):
def parse_sitelink(s: str, start: str) -> str:
"""Extract title from sitelink."""
return urllib.parse.unquote(drop_start(s, start)).replace("_", " ")
def word_contains_letter(word: str) -> bool:
"""Word contains letter."""
return any(c.isalpha() for c in word)
def also_singular(name):
def also_singular(name: str) -> list[str]:
names = also_singular_main(name)
extra = []
for n in names:
words = set(n.lower().split())
for word in 'girl', 'boy':
for word in "girl", "boy":
if word in words:
extra.append(word)
if {'female', 'females', 'women'} & words:
extra.append('woman')
if {'male', 'males', 'men'} & words:
extra.append('man')
if {"female", "females", "women"} & words:
extra.append("woman")
if {"male", "males", "men"} & words:
extra.append("man")
return [n for n in names + extra if n not in skip_names]
def also_singular_main(name):
'''
def also_singular_main(name: str) -> list[str]:
"""
given a plural name return a list of both the plural and singular versions
just return the name if it isn't plural
'''
singular = engine.singular_noun(name.strip('|'))
"""
singular = engine.singular_noun(name.strip("|"))
if not singular:
return [name]
n, s = name.lower(), singular.lower()
if (n == s or
n.replace('paintings', '') == s.replace('painting', '') or
n == 'venus' and s == 'venu'):
if (
n == s
or n.replace("paintings", "") == s.replace("painting", "")
or n == "venus"
and s == "venu"
):
return [name]
return [name, singular]
def wiki_url(title, site, ns=None):
def wiki_url(title: str, site: str, ns: str | None = None) -> str:
"""Build mediawiki URL for given title."""
host = hosts[site]
url_ns = ns + ':' if ns else ''
if not title:
return
url_ns = ns + ":" if ns else ""
assert title
if title[0].islower():
title = title[0].upper() + title[1:]
return f'https://{host}/wiki/' + url_ns + urllib.parse.quote(title.replace(' ', '_'))
return (
f"https://{host}/wiki/" + url_ns + urllib.parse.quote(title.replace(" ", "_"))
)
def get_int_arg(name):
def get_int_arg(name: str) -> int | None:
"""Get an request arg and convert to integer."""
if name in request.args and request.args[name].isdigit():
return int(request.args[name])
else:
return None
def format_time(time_value, precision):
def format_time(time_value: str, precision: int) -> str:
"""Format time with given precision."""
# FIXME handle dates like '1965-04-00T00:00:00Z'
# FIXME handle BC dates properly, "120 B.C." instead of "-120"
year = None
if '-00' in time_value:
if "-00" in time_value:
# can't be represented as python datetime
year = int(time_value[:time_value.find('-', 1)])
year = int(time_value[: time_value.find("-", 1)])
else:
try:
t = datetime.strptime(time_value[1:], "%Y-%m-%dT%H:%M:%SZ")
@ -96,10 +126,10 @@ def format_time(time_value, precision):
if precision == 9:
return str(year)
if precision == 8:
return f'{year}s'
return f"{year}s"
if precision == 7:
return f'{ordinal((year // 100) + 1)} century'
return f"{ordinal((year // 100) + 1)} century"
if precision == 6:
return f'{ordinal((year // 1000) + 1)} millennium'
return f"{ordinal((year // 1000) + 1)} millennium"
return time_value

View file

@ -1,147 +1,345 @@
from depicts import (wikibase, relaxed_ssl, saam, dia, rijksmuseum, npg,
museodelprado, barnesfoundation)
import hashlib
import os.path
import lxml.html
import requests
import requests.exceptions
import lxml.html
import os.path
import hashlib
user_agent = 'Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0'
from depicts import (
barnesfoundation,
dia,
museodelprado,
npg,
relaxed_ssl,
rijksmuseum,
saam,
wikibase,
)
from .type import CatalogDict, Entity
user_agent = "Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0"
table = {
'P347': ('Joconde ID', 'https://www.pop.culture.gouv.fr/notice/joconde/$1'),
'P350': ('RKDimages ID', 'https://rkd.nl/explore/images/$1'),
'P1212': ('Atlas ID', 'http://cartelen.louvre.fr/cartelen/visite?srv=car_not_frame&idNotice=$1'),
'P1428': ('Lost Art ID', 'http://www.lostart.de/EN/Verlust/$1'),
'P1679': ('Art UK artwork ID', 'https://artuk.org/discover/artworks/$1'),
'P1726': ('Florentine musea Inventario 1890 ID', 'http://www.polomuseale.firenze.it/inv1890/scheda.asp?position=1&ninv=$1'),
'P2014': ('Museum of Modern Art work ID', 'http://www.moma.org/collection/works/$1'),
'P2092': ('Bildindex der Kunst und Architektur ID', 'https://www.bildindex.de/document/obj$1'),
'P2108': ('Kunstindeks Danmark artwork ID', 'https://www.kulturarv.dk/kid/VisVaerk.do?vaerkId=$1'),
'P2242': ('Florentine musea catalogue ID', 'http://www.polomuseale.firenze.it/catalogo/scheda.asp?nctn=$1&value=1'),
'P2282': ('Groeningemuseum work PID', 'http://groeningemuseum.be/collection/work/id/$1'),
'P2344': ('AGORHA work ID', 'http://www.purl.org/inha/agorha/003/$1'),
'P2511': ('MSK Gent work PID', 'http://mskgent.be/collection/work/id/$1'),
'P2539': ('Nationalmuseum Sweden artwork ID', 'http://collection.nationalmuseum.se/eMuseumPlus?service=ExternalInterface&module=collection&objectId=$1&viewType=detailView'),
'P2582': ('J. Paul Getty Museum object ID', 'http://www.getty.edu/art/collection/objects/$1'),
'P3272': ('Zeri image ID', 'http://catalogo.fondazionezeri.unibo.it/scheda/opera/$1/'),
'P3293': ('BALaT object ID', 'http://balat.kikirpa.be/object/$1'),
'P3386': ('French Sculpture Census work ID', 'https://frenchsculpture.org/en/sculpture/$1'),
'P3467': ('Inventario Sculture - Polo Museale Fiorentino', 'http://www.polomuseale.firenze.it/invSculture/scheda.asp?position=1&ninv=$1'),
'P3504': ('Florentine Inventario Palatina art ID', 'http://www.polomuseale.firenze.it/invpalatina/scheda.asp?position=1&ninv=$1'),
'P3634': ('The Met object ID', 'http://www.metmuseum.org/art/collection/search/$1'),
'P3711': ('Vanderkrogt.net Statues ID', 'http://vanderkrogt.net/statues/object.php?record=$1'),
'P3855': ('LombardiaBeniCulturali artwork ID', 'http://www.lombardiabeniculturali.it/opere-arte/schede/$1/'),
'P3929': ('V&A item ID', 'http://collections.vam.ac.uk/item/$1'),
'P4144': ('Athenaeum artwork ID', 'http://www.the-athenaeum.org/art/detail.php?id=$1'),
'P4257': ('National Museums of Japan e-museum ID', 'http://www.emuseum.jp/detail/$1'),
'P4373': ('National Trust Collections ID', 'http://www.nationaltrustcollections.org.uk/object/$1'),
'P4380': ('Sandrart.net artwork ID', 'http://ta.sandrart.net/-artwork-$1'),
'P4399': ('Enciclopédia Itaú Cultural ID', 'http://enciclopedia.itaucultural.org.br/$1'),
'P4525': ('MuIS object ID', 'http://opendata.muis.ee/object/$1'),
'P4564': ('Art Museum of Estonia artwork ID', 'https://digikogu.ekm.ee/oid-$1'),
'P4582': ('Kulturelles Erbe Köln object ID', 'https://www.kulturelles-erbe-koeln.de/documents/obj/$1'),
'P4610': ('ARTIC artwork ID', 'https://www.artic.edu/artworks/$1'),
'P4611': ('LACMA ID', 'https://collections.lacma.org/node/$1'),
'P4625': ('Museum of Fine Arts, Boston object ID', 'https://www.mfa.org/collections/object/$1'),
'P4643': ('Philadelphia Museum of Art ID', 'http://www.philamuseum.org/collections/permanent/$1.html'),
'P4659': ("Musée d'Orsay artwork ID", 'http://www.musee-orsay.fr/en/collections/index-of-works/notice.html?nnumid=$1'),
'P4673': ('Museum of Fine Arts, Houston object ID', 'https://www.mfah.org/art/detail/$1'),
'P4674': ('Indianapolis Museum of Art artwork ID', 'http://collection.imamuseum.org/artwork/$1/'),
'P4683': ('National Gallery of Art artwork ID', 'https://www.nga.gov/content/ngaweb/Collection/art-object-page.$1.html'),
'P4684': ('National Gallery of Victoria artwork ID', 'https://www.ngv.vic.gov.au/explore/collection/work/$1/'),
'P4686': ('Carnegie Museum of Art ID', 'https://collection.cmoa.org/objects/$1'),
'P4692': ('American Art Collaborative object ID', 'http://browse.americanartcollaborative.org/object/$1.html'),
'P4701': ('Google Arts & Culture asset ID', 'https://artsandculture.google.com/asset/wd/$1'),
'P4704': ('Smithsonian American Art Museum ID', 'https://americanart.si.edu/collections/search/artwork/?id=$1'),
'P4709': ('Barnes Foundation ID', 'https://collection.barnesfoundation.org/objects/$1/details'),
'P4712': ('Minneapolis Institute of Art artwork ID', 'https://collections.artsmia.org/art/$1'),
'P4713': ('Walters Art Museum ID', 'http://art.thewalters.org/detail/$1'),
'P4721': ('MuBE Virtual ID', 'http://mubevirtual.com.br/pt_br?Dados&area=ver&id=$1'),
'P4737': ('Solomon R. Guggenheim Foundation artwork ID', 'https://www.guggenheim.org/artwork/$1'),
'P4738': ('Yale Center for British Art artwork ID', 'http://collections.britishart.yale.edu/vufind/Record/$1'),
'P4739': ('Musée des Augustins artwork ID', 'https://www.augustins.org/fr/oeuvre/-/oeuvre/$1'),
'P4740': ('Brooklyn Museum artwork ID', 'https://www.brooklynmuseum.org/opencollection/objects/$1'),
'P4761': ("Images d'Art artwork ID", 'http://art.rmngp.fr/en/library/artworks/$1'),
'P4764': ('Arcade artwork ID', 'http://www.culture.gouv.fr/public/mistral/arcade_fr?ACTION=CHERCHER&FIELD_1=REF&VALUE_1=$1'),
'P4814': ('Inventories of American Painting and Sculpture control number', 'https://siris-artinventories.si.edu/ipac20/ipac.jsp?&menu=search&index=.NW&term=$1'),
'P4905': ('KMSKA work PID', 'http://kmska.be/collection/work/id/$1'),
'P5210': ('National Gallery of Armenia work ID', 'http://www.gallery.am/en/database/item/$1/'),
'P5223': ('Information Center for Israeli Art artwork ID', 'http://museum.imj.org.il/artcenter/includes/item.asp?id=$1'),
'P5265': ('Dordrechts Museum artwork ID', 'https://www.dordrechtsmuseum.nl/objecten/id/$1'),
'P5268': ('MNAV work ID', 'http://acervo.mnav.gub.uy/obras.php?q=ni:$1'),
'P5269': ('Web umenia work ID', 'https://www.webumenia.sk/dielo/$1'),
'P5407': ('MHK object ID', 'http://datenbank.museum-kassel.de/$1'),
'P5499': ('Boijmans work ID', 'https://www.boijmans.nl/en/collection/artworks/$1'),
'P5783': ('Cranach Digital Archive artwork ID', 'http://lucascranach.org/$1'),
'P5823': ('Belvedere object ID', 'https://digital.belvedere.at/objects/$1/'),
'P5891': ('Bpk-ID', 'http://www.bpk-images.de/id/$1'),
'P6004': ('Brasiliana Iconográfica ID', 'https://www.brasilianaiconografica.art.br/obras/$1/wd'),
'P6007': ('Salons ID', 'http://salons.musee-orsay.fr/index/notice/$1'),
'P6020': ("d'Art d'Art ! ID", 'https://www.france.tv/france-2/d-art-d-art/$1.html'),
'P6141': ('À nos grands hommes ID', 'https://anosgrandshommes.musee-orsay.fr/index.php/Detail/objects/$1'),
'P6152': ('National Portrait Gallery (United States) object ID', 'http://npg.si.edu/object/npg_$1'),
'P6238': ('Monument aux morts ID', 'https://monumentsmorts.univ-lille.fr/monument/$1/wd/'),
'P6239': ('IEC commemorative monument of Catalonia ID', 'https://monuments.iec.cat/fitxa.asp?id=$1'),
'P6246': ('Paris Musées work ID', 'http://parismuseescollections.paris.fr/en/node/$1'),
'P6310': ('Muséosphère work ID', 'http://museosphere.paris.fr/oeuvres/$1'),
'P6332': ("Panorama de l'art ID", 'https://www.panoramadelart.com/$1'),
'P6355': ('MNAM artwork ID', 'https://collection.centrepompidou.fr/#/artwork/$1'),
'P6356': ('IHOI work ID', 'http://www.ihoi.org/app/photopro.sk/ihoi_icono/detail?docid=$1&lang=eng'),
'P6358': ('Musée Picasso artwork ID', 'https://www.navigart.fr/picassoparis/#/artwork/$1'),
'P6372': ('Interpol WOA artwork ID (OBSOLETE)', 'https://www.interpol.int/notice/search/woa/$1'),
'P6374': ('MAMVP artwork ID', 'http://www.mam.paris.fr/en/online-collections#/artwork/$1'),
'P6489': ('Joan Miró Online Image Bank ID', 'https://www.successiomiro.com/catalogue/object/$1'),
'P6506': ('Eliseu Visconti Project ID', 'https://eliseuvisconti.com.br/obra/$1'),
'P6565': ('Musenor artwork ID', 'https://webmuseo.com/ws/musenor/app/collection/record/$1'),
'P6576': ('Art Fund artwork ID', 'https://www.artfund.org/supporting-museums/art-weve-helped-buy/artwork/$1/wd'),
'P6595': ('Paintings by Salvador Dalí ID', 'https://www.salvador-dali.org/en/artwork/catalogue-raisonne/obra/$1/'),
'P6610': ('Ashmolean museum ID', 'http://collections.ashmolean.org/object/$1'),
'P6625': ('Salvador Dali Museum ID', 'http://archive.thedali.org/mwebcgi/mweb.exe?request=record;id=$1;type=101'),
'P6629': ('Artcurial lot ID', 'https://www.artcurial.com/en/$1'),
'P6631': ('Tainacan MHN ID', 'http://mhn.acervos.museus.gov.br/reserva-tecnica/$1'),
'P6633': ('Cini Foundation ID', 'http://arte.cini.it/Opere/$1'),
'P6643': ('TV Spielfilm series ID', 'https://www.tvspielfilm.de/serien/$1'),
'P6738': ('Whitney Museum of American Art artwork ID', 'https://whitney.org/collection/works/$1'),
'P7229': ('Fundación Goya en Aragón ID', 'https://fundaciongoyaenaragon.es/obra/wd/$1'),
"P347": ("Joconde ID", "https://www.pop.culture.gouv.fr/notice/joconde/$1"),
"P350": ("RKDimages ID", "https://rkd.nl/explore/images/$1"),
"P1212": (
"Atlas ID",
"http://cartelen.louvre.fr/cartelen/visite?srv=car_not_frame&idNotice=$1",
),
"P1428": ("Lost Art ID", "http://www.lostart.de/EN/Verlust/$1"),
"P1679": ("Art UK artwork ID", "https://artuk.org/discover/artworks/$1"),
"P1726": (
"Florentine musea Inventario 1890 ID",
"http://www.polomuseale.firenze.it/inv1890/scheda.asp?position=1&ninv=$1",
),
"P2014": (
"Museum of Modern Art work ID",
"http://www.moma.org/collection/works/$1",
),
"P2092": (
"Bildindex der Kunst und Architektur ID",
"https://www.bildindex.de/document/obj$1",
),
"P2108": (
"Kunstindeks Danmark artwork ID",
"https://www.kulturarv.dk/kid/VisVaerk.do?vaerkId=$1",
),
"P2242": (
"Florentine musea catalogue ID",
"http://www.polomuseale.firenze.it/catalogo/scheda.asp?nctn=$1&value=1",
),
"P2282": (
"Groeningemuseum work PID",
"http://groeningemuseum.be/collection/work/id/$1",
),
"P2344": ("AGORHA work ID", "http://www.purl.org/inha/agorha/003/$1"),
"P2511": ("MSK Gent work PID", "http://mskgent.be/collection/work/id/$1"),
"P2539": (
"Nationalmuseum Sweden artwork ID",
"http://collection.nationalmuseum.se/eMuseumPlus?service=ExternalInterface&module=collection&objectId=$1&viewType=detailView",
),
"P2582": (
"J. Paul Getty Museum object ID",
"http://www.getty.edu/art/collection/objects/$1",
),
"P3272": (
"Zeri image ID",
"http://catalogo.fondazionezeri.unibo.it/scheda/opera/$1/",
),
"P3293": ("BALaT object ID", "http://balat.kikirpa.be/object/$1"),
"P3386": (
"French Sculpture Census work ID",
"https://frenchsculpture.org/en/sculpture/$1",
),
"P3467": (
"Inventario Sculture - Polo Museale Fiorentino",
"http://www.polomuseale.firenze.it/invSculture/scheda.asp?position=1&ninv=$1",
),
"P3504": (
"Florentine Inventario Palatina art ID",
"http://www.polomuseale.firenze.it/invpalatina/scheda.asp?position=1&ninv=$1",
),
"P3634": ("The Met object ID", "http://www.metmuseum.org/art/collection/search/$1"),
"P3711": (
"Vanderkrogt.net Statues ID",
"http://vanderkrogt.net/statues/object.php?record=$1",
),
"P3855": (
"LombardiaBeniCulturali artwork ID",
"http://www.lombardiabeniculturali.it/opere-arte/schede/$1/",
),
"P3929": ("V&A item ID", "http://collections.vam.ac.uk/item/$1"),
"P4144": (
"Athenaeum artwork ID",
"http://www.the-athenaeum.org/art/detail.php?id=$1",
),
"P4257": (
"National Museums of Japan e-museum ID",
"http://www.emuseum.jp/detail/$1",
),
"P4373": (
"National Trust Collections ID",
"http://www.nationaltrustcollections.org.uk/object/$1",
),
"P4380": ("Sandrart.net artwork ID", "http://ta.sandrart.net/-artwork-$1"),
"P4399": (
"Enciclopédia Itaú Cultural ID",
"http://enciclopedia.itaucultural.org.br/$1",
),
"P4525": ("MuIS object ID", "http://opendata.muis.ee/object/$1"),
"P4564": ("Art Museum of Estonia artwork ID", "https://digikogu.ekm.ee/oid-$1"),
"P4582": (
"Kulturelles Erbe Köln object ID",
"https://www.kulturelles-erbe-koeln.de/documents/obj/$1",
),
"P4610": ("ARTIC artwork ID", "https://www.artic.edu/artworks/$1"),
"P4611": ("LACMA ID", "https://collections.lacma.org/node/$1"),
"P4625": (
"Museum of Fine Arts, Boston object ID",
"https://www.mfa.org/collections/object/$1",
),
"P4643": (
"Philadelphia Museum of Art ID",
"http://www.philamuseum.org/collections/permanent/$1.html",
),
"P4659": (
"Musée d'Orsay artwork ID",
"http://www.musee-orsay.fr/en/collections/index-of-works/notice.html?nnumid=$1",
),
"P4673": (
"Museum of Fine Arts, Houston object ID",
"https://www.mfah.org/art/detail/$1",
),
"P4674": (
"Indianapolis Museum of Art artwork ID",
"http://collection.imamuseum.org/artwork/$1/",
),
"P4683": (
"National Gallery of Art artwork ID",
"https://www.nga.gov/content/ngaweb/Collection/art-object-page.$1.html",
),
"P4684": (
"National Gallery of Victoria artwork ID",
"https://www.ngv.vic.gov.au/explore/collection/work/$1/",
),
"P4686": ("Carnegie Museum of Art ID", "https://collection.cmoa.org/objects/$1"),
"P4692": (
"American Art Collaborative object ID",
"http://browse.americanartcollaborative.org/object/$1.html",
),
"P4701": (
"Google Arts & Culture asset ID",
"https://artsandculture.google.com/asset/wd/$1",
),
"P4704": (
"Smithsonian American Art Museum ID",
"https://americanart.si.edu/collections/search/artwork/?id=$1",
),
"P4709": (
"Barnes Foundation ID",
"https://collection.barnesfoundation.org/objects/$1/details",
),
"P4712": (
"Minneapolis Institute of Art artwork ID",
"https://collections.artsmia.org/art/$1",
),
"P4713": ("Walters Art Museum ID", "http://art.thewalters.org/detail/$1"),
"P4721": (
"MuBE Virtual ID",
"http://mubevirtual.com.br/pt_br?Dados&area=ver&id=$1",
),
"P4737": (
"Solomon R. Guggenheim Foundation artwork ID",
"https://www.guggenheim.org/artwork/$1",
),
"P4738": (
"Yale Center for British Art artwork ID",
"http://collections.britishart.yale.edu/vufind/Record/$1",
),
"P4739": (
"Musée des Augustins artwork ID",
"https://www.augustins.org/fr/oeuvre/-/oeuvre/$1",
),
"P4740": (
"Brooklyn Museum artwork ID",
"https://www.brooklynmuseum.org/opencollection/objects/$1",
),
"P4761": ("Images d'Art artwork ID", "http://art.rmngp.fr/en/library/artworks/$1"),
"P4764": (
"Arcade artwork ID",
"http://www.culture.gouv.fr/public/mistral/arcade_fr?ACTION=CHERCHER&FIELD_1=REF&VALUE_1=$1",
),
"P4814": (
"Inventories of American Painting and Sculpture control number",
"https://siris-artinventories.si.edu/ipac20/ipac.jsp?&menu=search&index=.NW&term=$1",
),
"P4905": ("KMSKA work PID", "http://kmska.be/collection/work/id/$1"),
"P5210": (
"National Gallery of Armenia work ID",
"http://www.gallery.am/en/database/item/$1/",
),
"P5223": (
"Information Center for Israeli Art artwork ID",
"http://museum.imj.org.il/artcenter/includes/item.asp?id=$1",
),
"P5265": (
"Dordrechts Museum artwork ID",
"https://www.dordrechtsmuseum.nl/objecten/id/$1",
),
"P5268": ("MNAV work ID", "http://acervo.mnav.gub.uy/obras.php?q=ni:$1"),
"P5269": ("Web umenia work ID", "https://www.webumenia.sk/dielo/$1"),
"P5407": ("MHK object ID", "http://datenbank.museum-kassel.de/$1"),
"P5499": ("Boijmans work ID", "https://www.boijmans.nl/en/collection/artworks/$1"),
"P5783": ("Cranach Digital Archive artwork ID", "http://lucascranach.org/$1"),
"P5823": ("Belvedere object ID", "https://digital.belvedere.at/objects/$1/"),
"P5891": ("Bpk-ID", "http://www.bpk-images.de/id/$1"),
"P6004": (
"Brasiliana Iconográfica ID",
"https://www.brasilianaiconografica.art.br/obras/$1/wd",
),
"P6007": ("Salons ID", "http://salons.musee-orsay.fr/index/notice/$1"),
"P6020": ("d'Art d'Art ! ID", "https://www.france.tv/france-2/d-art-d-art/$1.html"),
"P6141": (
"À nos grands hommes ID",
"https://anosgrandshommes.musee-orsay.fr/index.php/Detail/objects/$1",
),
"P6152": (
"National Portrait Gallery (United States) object ID",
"http://npg.si.edu/object/npg_$1",
),
"P6238": (
"Monument aux morts ID",
"https://monumentsmorts.univ-lille.fr/monument/$1/wd/",
),
"P6239": (
"IEC commemorative monument of Catalonia ID",
"https://monuments.iec.cat/fitxa.asp?id=$1",
),
"P6246": (
"Paris Musées work ID",
"http://parismuseescollections.paris.fr/en/node/$1",
),
"P6310": ("Muséosphère work ID", "http://museosphere.paris.fr/oeuvres/$1"),
"P6332": ("Panorama de l'art ID", "https://www.panoramadelart.com/$1"),
"P6355": ("MNAM artwork ID", "https://collection.centrepompidou.fr/#/artwork/$1"),
"P6356": (
"IHOI work ID",
"http://www.ihoi.org/app/photopro.sk/ihoi_icono/detail?docid=$1&lang=eng",
),
"P6358": (
"Musée Picasso artwork ID",
"https://www.navigart.fr/picassoparis/#/artwork/$1",
),
"P6372": (
"Interpol WOA artwork ID (OBSOLETE)",
"https://www.interpol.int/notice/search/woa/$1",
),
"P6374": (
"MAMVP artwork ID",
"http://www.mam.paris.fr/en/online-collections#/artwork/$1",
),
"P6489": (
"Joan Miró Online Image Bank ID",
"https://www.successiomiro.com/catalogue/object/$1",
),
"P6506": ("Eliseu Visconti Project ID", "https://eliseuvisconti.com.br/obra/$1"),
"P6565": (
"Musenor artwork ID",
"https://webmuseo.com/ws/musenor/app/collection/record/$1",
),
"P6576": (
"Art Fund artwork ID",
"https://www.artfund.org/supporting-museums/art-weve-helped-buy/artwork/$1/wd",
),
"P6595": (
"Paintings by Salvador Dalí ID",
"https://www.salvador-dali.org/en/artwork/catalogue-raisonne/obra/$1/",
),
"P6610": ("Ashmolean museum ID", "http://collections.ashmolean.org/object/$1"),
"P6625": (
"Salvador Dali Museum ID",
"http://archive.thedali.org/mwebcgi/mweb.exe?request=record;id=$1;type=101",
),
"P6629": ("Artcurial lot ID", "https://www.artcurial.com/en/$1"),
"P6631": ("Tainacan MHN ID", "http://mhn.acervos.museus.gov.br/reserva-tecnica/$1"),
"P6633": ("Cini Foundation ID", "http://arte.cini.it/Opere/$1"),
"P6643": ("TV Spielfilm series ID", "https://www.tvspielfilm.de/serien/$1"),
"P6738": (
"Whitney Museum of American Art artwork ID",
"https://whitney.org/collection/works/$1",
),
"P7229": (
"Fundación Goya en Aragón ID",
"https://fundaciongoyaenaragon.es/obra/wd/$1",
),
}
def lookup(property_id: str, value: str) -> dict[str, str]:
    """Build the label/URL/value record for one catalog identifier.

    Looks up ``property_id`` in the module-level ``table`` and substitutes
    ``value`` for the ``$1`` placeholder of the formatter URL.

    Raises ``KeyError`` if ``property_id`` is not present in ``table``.
    """
    label, formatter = table[property_id]
    return {
        "label": label,
        "url": formatter.replace("$1", value),
        "value": value,
    }
def find_catalog_id(entity: Entity) -> set[str]:
    """Return the catalog property IDs from ``table`` that the entity has claims for."""
    # Intersection of two dict key views yields a plain set.
    keys: set[str] = table.keys() & entity["claims"].keys()
    return keys
if 'P4704' in entity['claims']:
saam_id = wikibase.first_datavalue(entity, 'P4704')
def check_catalog(entity: Entity, catalog: CatalogDict) -> None:
"""Check catalog."""
catalog_url = catalog["url"]
catalog_ids = catalog["ids"]
if "P4704" in entity["claims"]:
saam_id = wikibase.first_datavalue(entity, "P4704")
assert saam_id and isinstance(saam_id, (str, int))
cat = saam.get_catalog(saam_id)
if cat:
catalog.update(cat)
return
return None
if 'P4709' in entity['claims']:
catalog_id = wikibase.first_datavalue(entity, 'P4709')
if "P4709" in entity["claims"]:
catalog_id = wikibase.first_datavalue(entity, "P4709")
assert catalog_id and isinstance(catalog_id, (str, int))
cat = barnesfoundation.get_catalog(catalog_id)
if cat:
catalog.update(cat)
return
return None
institutions = [
('www.dia.org', dia),
('www.rijksmuseum.nl', rijksmuseum),
('www.npg.org.uk', npg),
('www.museodelprado.es', museodelprado),
("www.dia.org", dia),
("www.rijksmuseum.nl", rijksmuseum),
("www.npg.org.uk", npg),
("www.museodelprado.es", museodelprado),
]
if catalog_url:
@ -151,25 +349,26 @@ def check_catalog(entity, catalog):
if not cat:
continue
catalog.update(cat)
return
return None
try:
html = get_catalog_url(catalog_url)
if html:
description = get_description_from_page(html)
if description:
catalog['description'] = description
return
catalog["description"] = description
return None
except UnicodeDecodeError:
return
return None
for property_id in sorted(catalog_ids):
if property_id == 'P350':
if property_id == "P350":
continue # RKDimages ID
value = wikibase.first_datavalue(entity, property_id)
# identifier can be 'no value', example: Q26754456
if value is None:
continue
assert isinstance(value, str)
detail = lookup(property_id, value)
try:
html = get_catalog_page(property_id, value)
@ -181,11 +380,13 @@ def check_catalog(entity, catalog):
if not description:
continue
catalog = {
'institution': detail['label'],
'description': description,
"institution": detail["label"],
"description": description,
}
def get_catalog_from_artwork(entity):
def get_catalog_from_artwork(entity: Entity) -> CatalogDict:
"""Get catalog from artwork."""
catalog_ids = find_catalog_id(entity)
catalog_detail = []
for property_id in sorted(catalog_ids):
@ -193,47 +394,54 @@ def get_catalog_from_artwork(entity):
# identifier can be 'no value', example: Q26754456
if value is None:
continue
assert isinstance(value, str)
detail = lookup(property_id, value)
catalog_detail.append(detail)
catalog = {
'url': wikibase.first_datavalue(entity, 'P973'),
'detail': catalog_detail,
'ids': catalog_ids,
url = wikibase.first_datavalue(entity, "P973")
assert isinstance(url, str)
catalog: CatalogDict = {
"url": url,
"detail": catalog_detail,
"ids": catalog_ids,
}
try:
check_catalog(entity, catalog)
except (requests.exceptions.ReadTimeout,
requests.exceptions.ConnectTimeout,
requests.exceptions.ConnectionError,
requests.exceptions.TooManyRedirects):
except (
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectTimeout,
requests.exceptions.ConnectionError,
requests.exceptions.TooManyRedirects,
):
pass
return catalog
def get_description_from_page(html):
def get_description_from_page(html: bytes) -> str | None:
"""Check HTML for description of artwork."""
if not html:
return
return None
root = lxml.html.fromstring(html)
div = root.find('.//div[@itemprop="description"]')
if div is not None:
return div.text
div_list = root.find_class('item-description')
div_list = root.find_class("item-description")
if len(div_list):
return div_list[0].text_content()
meta_twitter_description = root.find('.//meta[@name="twitter:description"]')
if meta_twitter_description is None:
return
twitter_description = meta_twitter_description.get('content')
return None
twitter_description = meta_twitter_description.get("content")
if not twitter_description:
return
return None
twitter_description = twitter_description.strip()
if not twitter_description:
return
return None
for element in root.getiterator():
if not element.text:
@ -246,33 +454,35 @@ def get_description_from_page(html):
return twitter_description
def get_catalog_page(property_id: str, value: str) -> bytes:
    """Return the raw catalog page for an identifier, using an on-disk cache.

    The page URL comes from ``lookup(property_id, value)``. The response body
    is stored as ``cache/{property_id}_{catalog_id}.html`` and served from
    there on later calls.
    """
    detail = lookup(property_id, value)
    url = detail["url"]
    # "/" would create sub-directories inside the cache directory.
    catalog_id = value.replace("/", "_")

    filename = f"cache/{property_id}_{catalog_id}.html"

    if os.path.exists(filename):
        with open(filename, "rb") as cache_file:
            return cache_file.read()

    r = requests.get(url, headers={"User-Agent": user_agent}, timeout=2)
    html: bytes = r.content
    with open(filename, "wb") as cache_file:
        cache_file.write(html)

    return html
def get_catalog_url(url: str) -> bytes:
    """Return the raw content of ``url``, using an on-disk cache.

    The cache file is ``cache/<md5 hex digest of the URL>.html``. On a cache
    miss the page is fetched through ``relaxed_ssl`` and written to the cache.
    """
    md5_filename = hashlib.md5(url.encode("utf-8")).hexdigest() + ".html"
    filename = "cache/" + md5_filename

    if os.path.exists(filename):
        with open(filename, "rb") as cache_file:
            return cache_file.read()

    r = relaxed_ssl.get(url, headers={"User-Agent": user_agent}, timeout=2)
    html: bytes = r.content
    with open(filename, "wb") as cache_file:
        cache_file.write(html)

    return html

View file

@ -1,59 +1,93 @@
import requests
import json
import urllib.parse
import os
import dateutil.parser
"""Wikidata query service."""
import hashlib
from flask import request, render_template, g
import json
import os
import typing
import urllib.parse
from collections import defaultdict
from datetime import datetime
from .model import WikidataQuery
from . import utils, database
query_url = 'https://query.wikidata.org/bigdata/namespace/wdq/sparql'
url_start = 'http://www.wikidata.org/entity/Q'
commons_start = 'http://commons.wikimedia.org/wiki/Special:FilePath/'
import dateutil.parser
import requests
from flask import g, render_template, request
from . import database, utils
from .model import WikidataQuery
query_url = "https://query.wikidata.org/bigdata/namespace/wdq/sparql"
url_start = "http://www.wikidata.org/entity/Q"
commons_start = "http://commons.wikimedia.org/wiki/Special:FilePath/"
Row = dict[str, dict[str, typing.Any]]
class QueryError(Exception):
    """Query error.

    Carries the SPARQL query text and the HTTP response that triggered
    the error.
    """

    query: str
    r: requests.Response

    def __init__(self, query: str, r: requests.Response) -> None:
        """Store the failing query and the response it produced."""
        self.query = query
        self.r = r
class QueryTimeout(QueryError):
    """Query timeout.

    Construction is inherited unchanged from ``QueryError``: the instance
    exposes the same ``query`` and ``r`` attributes.
    """
def row_id(row: Row, field: str = "item") -> int:
    """Get the numeric item ID from the entity URI stored in ``row[field]``.

    The value is passed through ``utils.drop_start`` with ``url_start`` and the
    result is converted with ``int``.
    """
    # presumably drop_start strips the url_start prefix -- confirm in utils
    return int(utils.drop_start(row[field]["value"], url_start))
def get_row_value(row: Row, field: str) -> str | None:
    """Get the ``value`` of ``field`` from a result row, or None if the field is absent."""
    if field not in row:
        return None
    return typing.cast(str, row[field]["value"])
def get_row_text(row: Row, field: str) -> str | None:
    """Get the text of ``field`` from a result row.

    Returns the ``value`` only when the field is present and carries an
    ``xml:lang`` key; otherwise returns None.
    """
    if field in row and "xml:lang" in row[field]:
        return typing.cast(str, row[field]["value"])
    return None
def commons_uri_to_filename(uri: str) -> str:
    """Convert a Commons ``Special:FilePath`` URI to a percent-decoded filename."""
    # presumably drop_start strips the commons_start prefix -- confirm in utils
    return urllib.parse.unquote(utils.drop_start(uri, commons_start))
def run_from_template(template_name: str, **context: typing.Any) -> requests.Response:
    """Render a query template with ``context`` and run the resulting query.

    The template name is passed on as ``query_template`` to ``run_query``.
    """
    query = render_template(template_name, **context)
    return run_query(query, query_template=template_name)
def run_from_template_with_cache(
    template_name: str, cache_name: str | None = None, **context: typing.Any
) -> list[Row]:
    """Render a query template and run it via ``run_query_with_cache``.

    ``cache_name`` is forwarded as the cache entry name; ``template_name`` is
    forwarded as ``query_template``.
    """
    query = render_template(template_name, **context)
    return run_query_with_cache(query, name=cache_name, query_template=template_name)
def run_query(query: str, **kwargs: typing.Any) -> requests.Response:
    """Run WDQS query and return the HTTP response.

    Keyword arguments are forwarded to ``record_query``; the database record
    it returns is not needed here.
    """
    r, _db_query = record_query(query, **kwargs)
    return r
def record_query(query, query_template=None):
params = {'query': query, 'format': 'json'}
def record_query(
query: str, query_template: str | None = None
) -> tuple[requests.Response, WikidataQuery]:
params = {"query": query, "format": "json"}
start = datetime.utcnow()
path = request.full_path.rstrip('?') if request else None
path = request.full_path.rstrip("?") if request else None
endpoint = request.endpoint if request else None
db_query = WikidataQuery(
@ -61,8 +95,9 @@ def record_query(query, query_template=None):
sparql_query=query,
path=path,
query_template=query_template,
page_title=getattr(g, 'title', None),
endpoint=endpoint)
page_title=getattr(g, "title", None),
endpoint=endpoint,
)
database.session.add(db_query)
database.session.commit()
@ -74,78 +109,84 @@ def record_query(query, query_template=None):
db_query.error_text = r.text
database.session.commit()
if 'java.util.concurrent.TimeoutException' in r.text:
raise QueryTimeout(params, r)
if "java.util.concurrent.TimeoutException" in r.text:
raise QueryTimeout(query, r)
else:
raise QueryError(params, r)
raise QueryError(query, r)
database.session.commit()
return r, db_query
def md5_query(query: str) -> str:
    """Generate the md5 hexdigest of a SPARQL query (UTF-8 encoded)."""
    encoded = query.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()
def run_query_with_cache(
    q: str, name: str | None = None, query_template: str | None = None
) -> list[Row]:
    """Run a SPARQL query, reusing bindings cached in ``cache/{name}.json``.

    ``name`` defaults to the md5 hexdigest of the query. A cache file is only
    trusted when it is a JSON object whose stored ``query`` equals ``q``;
    otherwise the query is run via ``record_query``, the cache file is
    (re)written and the row count is saved on the query record.
    """
    if name is None:
        name = md5_query(q)
    filename = f"cache/{name}.json"
    if os.path.exists(filename):
        with open(filename) as cache_file:
            from_cache = json.load(cache_file)
        # A named cache entry may hold the results of an older query text.
        if isinstance(from_cache, dict) and from_cache.get("query") == q:
            return typing.cast(list[Row], from_cache["bindings"])

    r, db_query = record_query(q, query_template=query_template)
    bindings: list[Row] = r.json()["results"]["bindings"]
    with open(filename, "w") as cache_file:
        json.dump({"query": q, "bindings": bindings}, cache_file, indent=2)

    db_query.row_count = len(bindings)
    database.session.commit()
    return bindings
def format_time(row_time: dict[str, str], row_timeprecision: dict[str, int]) -> str:
    """Format time with given precision.

    Precision 9 gives the year, 8 the year followed by "s", 7 an ordinal
    century and 6 an ordinal millennium. Any other precision returns the raw
    time string unchanged.
    """
    t = dateutil.parser.parse(row_time["value"])
    precision = int(row_timeprecision["value"])

    if precision == 9:
        return str(t.year)
    if precision == 8:
        return f"{t.year}s"
    if precision == 7:
        return f"{utils.ordinal((t.year // 100) + 1)} century"
    if precision == 6:
        return f"{utils.ordinal((t.year // 1000) + 1)} millennium"

    return row_time["value"]
def build_browse_item_map(bindings):
def build_browse_item_map(bindings: list[Row]) -> dict[int, dict[str, typing.Any]]:
row_map = defaultdict(list)
for row in bindings:
item_id = row_id(row)
label = row['itemLabel']['value']
image_filename = commons_uri_to_filename(row['image']['value'])
label = row["itemLabel"]["value"]
image_filename = commons_uri_to_filename(row["image"]["value"])
artist_name = get_row_value(row, 'artistLabel')
artist_name = get_row_value(row, "artistLabel")
d = format_time(row['time'], row['timeprecision']) if 'time' in row else None
row_qid = f'Q{item_id}'
d = format_time(row["time"], row["timeprecision"]) if "time" in row else None
row_qid = f"Q{item_id}"
item = {
'image_filename': image_filename,
'date': d,
'depicts': row['depictsList']['value'].split('|'),
"image_filename": image_filename,
"date": d,
"depicts": row["depictsList"]["value"].split("|"),
}
if artist_name:
item['artist_name'] = artist_name
item["artist_name"] = artist_name
if label != row_qid:
item['label'] = label
item["label"] = label
title = get_row_value(row, 'title')
title = get_row_value(row, "title")
if title:
lang = get_row_value(row, 'titleLang')
item['title'] = (lang, title)
lang = get_row_value(row, "titleLang")
item["title"] = (lang, title)
row_map[item_id].append(item)
@ -158,53 +199,59 @@ def build_browse_item_map(bindings):
when = None
depicts = []
for item in items:
if 'title' in item:
lang, title = item['title']
if "title" in item:
lang, title = item["title"]
titles[lang] = title
filenames.add(item['image_filename'])
artist_name = item.get('artist_name')
filenames.add(item["image_filename"])
artist_name = item.get("artist_name")
if artist_name and artist_name not in artist_names:
artist_names.append(artist_name)
if 'label' in item:
labels.add(item['label'])
if when is None and item.get('date'):
when = item['date']
for d in item['depicts']:
if "label" in item:
labels.add(item["label"])
if when is None and item.get("date"):
when = item["date"]
for d in item["depicts"]:
if d not in depicts:
depicts.append(d)
item = {
'qid': f'Q{item_id}',
'item_id': item_id,
'image_filename': list(filenames),
'artist_name': ', '.join(artist_names),
'date': when,
'depicts': depicts,
"qid": f"Q{item_id}",
"item_id": item_id,
"image_filename": list(filenames),
"artist_name": ", ".join(artist_names),
"date": when,
"depicts": depicts,
}
if artist_names:
item['artist_name'] = ', '.join(artist_names)
item["artist_name"] = ", ".join(artist_names)
if labels:
assert len(labels) == 1
item['label'] = list(labels)[0]
elif 'en' in titles:
item['label'] = titles['en']
item["label"] = list(labels)[0]
elif "en" in titles:
item["label"] = titles["en"]
else:
item['label'] = '[ label missing ]'
item["label"] = "[ label missing ]"
item_map[item_id] = item
return item_map
def quote_list(list_of_strings: list[str]) -> str:
    """Quote strings and combine into list for SPARQL query.

    Duplicates are removed, keeping first-seen order. Each string becomes
    ``("...")`` and the items are joined with single spaces.
    """

    def escape(s: str) -> str:
        # Backslash must be escaped first, otherwise the backslashes added
        # for the quotes would be doubled as well.
        return s.replace("\\", "\\\\").replace('"', '\\"')

    no_dups = list(dict.fromkeys(list_of_strings))  # remove duplicates
    return " ".join('("' + escape(s) + '")' for s in no_dups)
def url_list(urls: list[str]) -> str:
    """Combine URLs into list for SPARQL query.

    Duplicates are removed, keeping first-seen order. Each URL becomes
    ``(<url>)`` and the items are joined with single spaces.
    """
    no_dups = list(dict.fromkeys(urls))  # remove duplicates
    return " ".join(f"(<{s}>)" for s in no_dups)
def is_artificial_physical_object(qid: str) -> bool:
    """Item is artificial physical object.

    Runs the ``query/item_type.sparql`` template for ``qid`` (with caching) and
    checks whether Q8205328 is among the returned item IDs.
    """
    bindings = run_from_template_with_cache("query/item_type.sparql", qid=qid)
    types = {row_id(row, field="item") for row in bindings}
    # Q8205328 == artificial physical object
    return 8205328 in types

View file

@ -1,25 +1,51 @@
def first_datavalue(entity, pid):
if pid in entity['claims']:
mainsnak = entity['claims'][pid][0]['mainsnak']
if 'datavalue' in mainsnak:
return mainsnak['datavalue']['value']
"""Wikibase functions."""
def get_entity_label(entity):
if 'labels' not in entity:
import typing
from .type import Entity
def first_datavalue(
    entity: Entity, pid: str
) -> str | int | None | dict[str, typing.Any]:
    """Get first datavalue from claim.

    Returns None when the entity has no claim for ``pid`` or when the first
    claim's mainsnak has no ``datavalue``.
    """
    if pid not in entity["claims"]:
        return None

    mainsnak = entity["claims"][pid][0]["mainsnak"]
    if "datavalue" not in mainsnak:
        return None

    # The declared return type allows dict values, so the value is returned
    # as-is rather than asserted to be str/int.
    v: str | int | dict[str, typing.Any] = mainsnak["datavalue"]["value"]
    return v
def get_entity_label(entity: Entity) -> str | None:
    """Get entity label.

    Prefers the English label. Without one, a label is returned only if all
    available languages share the same single label value. Returns None when
    the entity has no ``labels`` key or no unambiguous label.
    """
    if "labels" not in entity:
        return None

    label: str
    if "en" in entity["labels"]:
        label = entity["labels"]["en"]["value"]
        assert isinstance(label, str)
        return label

    label_values = {lang["value"] for lang in entity["labels"].values()}
    if len(label_values) == 1:
        label = next(iter(label_values))
        assert isinstance(label, str)
        return label

    return None
def get_en_value(entity: Entity, key: str) -> str | None:
    """Get English value from label or description.

    ``key`` names the entity section to read; returns None when that section
    has no ``en`` entry.
    """
    return entity[key]["en"]["value"] if "en" in entity[key] else None  # type: ignore
def get_en_label(entity: Entity) -> str | None:
    """Get English label, or None if the entity has no English label."""
    return get_en_value(entity, "labels")
def get_en_description(entity: Entity) -> str | None:
    """Get English description of entity, or None if there is none."""
    return get_en_value(entity, "descriptions")

View file

@ -1,17 +1,24 @@
"""Make an edit to Wikidata."""
from depicts import mediawiki, wikibase
from depicts.model import DepictsItem
def create_depicts_item(item_id: int) -> DepictsItem:
    """Create depicts item.

    Fetches the entity ``Q{item_id}`` through ``mediawiki.get_entity`` and
    builds a ``DepictsItem`` from its English label, description and aliases,
    with ``count`` set to 0.
    """
    qid = f"Q{item_id}"
    entity = mediawiki.get_entity(qid)
    assert entity

    if "en" in entity["aliases"]:
        alt_labels = {alt["value"] for alt in entity["aliases"]["en"]}
    else:
        alt_labels = set()

    return DepictsItem(
        item_id=item_id,
        label=wikibase.get_en_label(entity),
        description=wikibase.get_en_description(entity),
        alt_labels=alt_labels,
        count=0,
    )

View file

@ -1,67 +1,93 @@
from flask import current_app, session
from requests_oauthlib import OAuth1Session
import typing
from urllib.parse import urlencode
def get_edit_proxy():
edit_proxy = current_app.config.get('EDIT_PROXY')
import requests
from flask import current_app, session
from requests_oauthlib import OAuth1Session
from .type import CallParams
def get_edit_proxy() -> dict[str, str]:
    """Return the proxies mapping built from the ``EDIT_PROXY`` app setting.

    The same proxy is used for both ``http`` and ``https``. An empty dict is
    returned when ``EDIT_PROXY`` is unset or empty.
    """
    edit_proxy = current_app.config.get("EDIT_PROXY")
    if not edit_proxy:
        return {}
    return {"http": edit_proxy, "https": edit_proxy}
def api_post_request(params: dict[str, str | int]) -> requests.Response:
    """POST ``params`` to the Wikidata API using the session's OAuth credentials.

    The client key/secret come from the app config and the resource owner
    key/secret from the Flask session. The request has a 4 second timeout and
    goes through the proxies from ``get_edit_proxy``.
    """
    app = current_app
    url = "https://www.wikidata.org/w/api.php"
    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]
    oauth = OAuth1Session(
        client_key,
        client_secret=client_secret,
        resource_owner_key=session["owner_key"],
        resource_owner_secret=session["owner_secret"],
    )
    proxies = get_edit_proxy()
    r: requests.Response = oauth.post(url, data=params, timeout=4, proxies=proxies)
    return r
def raw_request(params: dict[str, str | int]) -> requests.Response:
    """GET the Wikidata API with ``params`` in the query string, signed with OAuth.

    The client key/secret come from the app config and the resource owner
    key/secret from the Flask session. The request has a 4 second timeout and
    goes through the proxies from ``get_edit_proxy``.
    """
    app = current_app
    url = "https://www.wikidata.org/w/api.php?" + urlencode(params)
    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]
    oauth = OAuth1Session(
        client_key,
        client_secret=client_secret,
        resource_owner_key=session["owner_key"],
        resource_owner_secret=session["owner_secret"],
    )
    proxies = get_edit_proxy()
    r: requests.Response = oauth.get(url, timeout=4, proxies=proxies)
    return r
def api_request(params: dict[str, str | int]) -> dict[str, typing.Any]:
    """Call the Wikidata API via ``raw_request`` and return the decoded JSON reply."""
    data: dict[str, typing.Any] = raw_request(params).json()
    return data
def get_token() -> str:
    """Fetch the CSRF token from the Wikidata API (``meta=tokens`` query)."""
    params: CallParams = {
        "action": "query",
        "meta": "tokens",
        "format": "json",
        "formatversion": 2,
    }
    reply = api_request(params)
    token: str = reply["query"]["tokens"]["csrftoken"]

    return token
def userinfo_call() -> dict[str, typing.Any]:
    """Call the Wikidata API ``meta=userinfo`` query and return the JSON reply."""
    params: dict[str, str | int] = {
        "action": "query",
        "meta": "userinfo",
        "format": "json",
    }
    return api_request(params)
def get_username() -> str | None:
    """Return the username of the logged-in user, or None.

    None is returned when the session holds no ``owner_key`` or when the
    userinfo reply has no ``query`` key. The name is cached in the session
    under ``username`` after the first successful lookup.
    """
    if "owner_key" not in session:
        return None  # not authorized

    username: str
    if "username" in session:
        username = session["username"]
        return username

    reply = userinfo_call()
    if "query" not in reply:
        return None
    username = reply["query"]["userinfo"]["name"]
    session["username"] = username

    return username