1281 lines
37 KiB
Python
Executable file
1281 lines
37 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import hashlib
|
|
import itertools
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import typing
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from typing import Iterable, Mapping
|
|
|
|
import requests.exceptions
|
|
import simplejson.errors
|
|
from flask import (
|
|
Flask,
|
|
g,
|
|
jsonify,
|
|
redirect,
|
|
render_template,
|
|
request,
|
|
session,
|
|
url_for,
|
|
)
|
|
from requests_oauthlib import OAuth1Session # type: ignore
|
|
from sqlalchemy import distinct, func
|
|
from sqlalchemy.orm import aliased
|
|
from sqlalchemy.sql.expression import desc
|
|
from werkzeug.wrappers import Response
|
|
|
|
from depicts import (
|
|
artwork,
|
|
commons,
|
|
database,
|
|
fixtures,
|
|
human,
|
|
mail,
|
|
mediawiki,
|
|
utils,
|
|
wd_catalog,
|
|
wdqs,
|
|
wikibase,
|
|
wikidata_edit,
|
|
wikidata_oauth,
|
|
)
|
|
from depicts.error_mail import setup_error_mail
|
|
from depicts.model import (
|
|
DepictsItem,
|
|
DepictsItemAltLabel,
|
|
Edit,
|
|
Item,
|
|
Language,
|
|
Triple,
|
|
WikidataQuery,
|
|
)
|
|
from depicts.pager import Pagination, init_pager
|
|
from depicts.type import Entity
|
|
|
|
# Browser-like User-Agent string. NOTE(review): not referenced in the visible
# part of this file -- confirm it is still used before removing.
user_agent = "Mozilla/5.0 (X11; Linux i586; rv:32.0) Gecko/20160101 Firefox/32.0"

# Create the Flask application, load its settings from ``config.default`` and
# then initialise the database, the pager and the error mail handler.
app = Flask(__name__)
app.config.from_object("config.default")
database.init_db(app.config["DB_URL"])
init_pager(app)
setup_error_mail(app)
|
|
|
|
# Wikidata property IDs mapped to the English label shown for them.
# The insertion order is the order in which the views in this file iterate
# over the properties (e.g. next_page and get_facets).
find_more_props = {
    "P135": "movement",
    "P136": "genre",
    "P170": "artist",
    "P195": "collection",
    "P276": "location",
    "P495": "country of origin",
    "P127": "owned by",
    "P179": "part of the series",
    "P921": "main subject",
    "P186": "material used",
    "P88": "commissioned by",
    "P1028": "donated by",
    "P1071": "location of final assembly",
    "P138": "named after",
    "P1433": "published in",
    "P144": "based on",
    "P2079": "fabrication method",
    "P2348": "time period",
    "P361": "part of",
    "P608": "exhibition history",
    "P180": "depicts",
    "P31": "instance of",
    # possible future props
    # 'P571': 'inception',
    # 'P166': 'award received', (only 2)
    # 'P1419': 'shape', (only 2)
    # 'P123': 'publisher', (only 1)
}
|
|
|
|
# Wikidata QIDs of artwork types; passed as ``isa_list`` to the SPARQL
# templates used by get_facets and filter_artwork.
isa_list = [
    "Q60520",  # sketchbook
    "Q93184",  # drawing
    "Q3305213",  # painting
    "Q15123870",  # lithograph
    "Q18761202",  # watercolor painting
    "Q79218",  # triptych
    "Q2647254",  # study
    "Q46686",  # reredos
]
|
|
|
|
# Patterns for Wikidata item (Q123) and property (P123) IDs. Both are anchored
# at the start only, so ``match`` accepts strings with trailing characters;
# group(1) is the numeric part.
re_qid = re.compile(r"^Q(\d+)")
re_pid = re.compile(r"^P(\d+)")
|
|
|
|
|
|
@app.teardown_appcontext
def shutdown_session(exception: Exception | None = None) -> None:
    """Remove the database session when the application context is torn down.

    ``exception`` is accepted because Flask passes it to teardown handlers;
    it is not used.
    """
    database.session.remove()  # type:ignore
|
|
|
|
|
|
@app.template_global()
def set_url_args(endpoint: str | None = None, **new_args: str) -> str:
    """Build a URL that reuses the current request's arguments.

    The view arguments and query arguments of the current request are merged,
    ``new_args`` is applied on top, and every argument whose value is ``None``
    is dropped. When ``endpoint`` is omitted the current endpoint is used.
    """
    if endpoint is None:
        endpoint = request.endpoint
    assert endpoint and request.view_args is not None

    merged = request.view_args.copy()
    for extra in (request.args, new_args):
        merged.update(extra)

    kept = {name: value for name, value in merged.items() if value is not None}
    return url_for(endpoint, **kept)
|
|
|
|
|
|
@app.template_global()
def current_url() -> str:
    """Return the URL of the current request, including its query arguments."""
    # view_args is an empty dict for routes without URL variables, so it has
    # to be compared with None; a truthiness test would reject those routes.
    assert request and request.endpoint and request.view_args is not None
    args = request.view_args.copy()
    args.update(request.args)
    return url_for(request.endpoint, **args)
|
|
|
|
|
|
@app.before_request
def init_profile() -> None:
    """Start every request with an empty ``g.profiling`` list."""
    g.profiling = []
|
|
|
|
|
|
@app.before_request
def global_user() -> None:
    """Store the result of ``wikidata_oauth.get_username()`` on ``g.user``."""
    g.user = wikidata_oauth.get_username()
|
|
|
|
|
|
def check_for_blocks() -> None:
    """Look up blocks for this server's IP address and store them on ``g``.

    Does nothing when ``g.server_ip`` is already set or when no ``HOSTNAME``
    is configured. Failures of the block lookups are ignored.
    """
    if hasattr(g, "server_ip") or not (hostname := app.config.get("HOSTNAME")):
        return

    g.server_ip = socket.gethostbyname(hostname)
    try:
        g.local_blocks = mediawiki.get_list("blocks", bkip=g.server_ip)
        g.global_blocks = mediawiki.get_list("globalblocks", bgip=g.server_ip)
    except Exception:
        # Best effort: a failed lookup must not break the request.
        pass
|
|
|
|
|
|
@app.before_request
def get_blocks() -> None:
    """Run the block check unless ``SHOW_BLOCK_ALERT`` is explicitly ``False``."""
    if app.config.get("SHOW_BLOCK_ALERT") is False:
        return
    check_for_blocks()
|
|
|
|
|
|
@app.route("/find_more_setting")
def flip_find_more() -> str:
    """Toggle the ``no_find_more`` session flag and report the new state."""
    disabled = not session.get("no_find_more")
    session["no_find_more"] = disabled

    display = "off" if disabled else "on"
    return "flipped. find more is " + display
|
|
|
|
|
|
def existing_edit(item_id: int, depicts_id: int) -> bool:
    """Return True if an Edit row exists for this artwork/depicts pair."""
    matching = Edit.query.filter_by(  # type: ignore
        artwork_id=item_id, depicts_id=depicts_id
    ).count()
    return bool(matching != 0)
|
|
|
|
|
|
@app.route("/save/Q<int:item_id>", methods=["POST"])
def save(item_id: int) -> str | Response:
    """Save the submitted ``depicts`` values as P180 claims on an artwork.

    Steps: make sure the artwork and every depicted item have a local database
    row, then create one claim per depicted item that has no Edit row yet and
    record each successful claim as an Edit.

    Returns the rendered ``save_error.html`` template when the API reports an
    error, otherwise a redirect to ``next_page`` for the artwork.

    Raises ValueError if a submitted value is not ``Q`` followed by digits;
    this is checked before anything is written. A response that is not valid
    JSON is mailed and the decode error re-raised.
    """
    depicts = request.form.getlist("depicts")
    # Parse every submitted QID up front so a malformed value fails before any
    # database write or API call instead of part-way through.
    depicts_ids = [int(depicts_qid[1:]) for depicts_qid in depicts]

    username = wikidata_oauth.get_username()
    assert username

    token = wikidata_oauth.get_token()

    artwork_item = Item.query.get(item_id)  # type: ignore
    if artwork_item is None:
        artwork_entity = mediawiki.get_entity_with_cache(f"Q{item_id}")
        artwork_item = Item(
            item_id=item_id, entity=typing.cast(dict[str, str], artwork_entity)
        )
        database.session.add(artwork_item)
        database.session.commit()

    # First pass: create local rows for depicted items that are not known yet.
    for depicts_id in depicts_ids:
        depicts_item = DepictsItem.query.get(depicts_id)  # type: ignore
        if depicts_item is None:
            depicts_item = wikidata_edit.create_depicts_item(depicts_id)
            database.session.add(depicts_item)
            database.session.commit()

    # Second pass: create the claims, skipping pairs that were already saved.
    for depicts_id in depicts_ids:
        if existing_edit(item_id, depicts_id):
            continue

        r = create_claim(item_id, depicts_id, token)
        try:
            reply = r.json()
        except simplejson.errors.JSONDecodeError:
            mail.send_mail("depicts save error", r.text)
            raise

        save_error = reply.get("error")
        if save_error:
            mail.send_mail("depicts save error", r.text)
            return render_template("save_error.html", error=save_error)

        # The parsed reply is reused; the body does not need decoding twice.
        lastrevid = reply["pageinfo"]["lastrevid"]
        assert reply["success"] == 1
        edit = Edit(
            username=username,
            artwork_id=item_id,
            depicts_id=depicts_id,
            lastrevid=lastrevid,
        )
        database.session.add(edit)
        database.session.commit()

    return redirect(url_for("next_page", item_id=item_id))
|
|
|
|
|
|
@app.route("/settings", methods=["GET", "POST"])
def user_settings() -> str:
    """Render the user settings template.

    The route accepts POST as well as GET, but no form data is read here.
    """
    return render_template("user_settings.html")
|
|
|
|
|
|
@app.route("/test/lookup")
def test_lookup_page() -> str:
    """Render the ``test_lookup.html`` template."""
    return render_template("test_lookup.html")
|
|
|
|
|
|
@app.route("/property/P<int:property_id>")
def property_query_page(property_id: int) -> str:
    """List the values of one property with the number of artworks using each.

    Values are ordered by descending artwork count and shown 100 per page.
    The ``sort`` query argument only changes the ``order`` value handed to the
    template; the query order itself is not affected by it.
    Raises KeyError when the property is not in ``find_more_props``.
    """
    pid = f"P{property_id}"
    g.title = find_more_props[pid]
    sort = request.args.get("sort")
    order = "name" if (sort and sort.lower().strip() == "name") else "count"

    artwork_count = func.count(func.distinct(Triple.subject_id)).label("c")
    q = (
        database.session.query(Triple.object_id, artwork_count)  # type: ignore
        .filter_by(predicate_id=property_id)
        .join(Item, Item.item_id == Triple.subject_id)
        .filter_by(is_artwork=True)
        .group_by(Triple.object_id)
        .order_by(desc("c"))
    )

    page = utils.get_int_arg("page") or 1
    total = q.count()
    page_size = 100
    pager = Pagination(page, page_size, total)

    page_hits = pager.slice(q)

    labels = get_labels_db({f"Q{object_id}" for object_id, _ in page_hits})

    hits = [
        {
            "qid": f"Q{object_id}",
            "label": labels.get(f"Q{object_id}") or "[item missing]",
            "count": count,
        }
        for object_id, count in page_hits
    ]

    return render_template(
        "property.html",
        label=g.title,
        order=order,
        pid=pid,
        page=page,
        pager=pager,
        hits=hits,
    )
|
|
|
|
|
|
@app.route("/")
def start() -> Response:
    """Front page: same response as ``random_artwork``."""
    return random_artwork()
|
|
|
|
|
|
@app.route("/next")
def random_artwork() -> Response:
    """Redirect to a randomly chosen artwork that has no P180 claim yet.

    Random batches of 30 artworks are fetched until one without a ``P180``
    claim is found. NOTE(review): this never terminates if no such artwork
    exists in the database.
    """
    found = None
    while not found:
        candidates = (
            Item.query.filter_by(is_artwork=True)  # type: ignore
            .order_by(func.random())
            .limit(30)
        )
        found = next(
            (item for item in candidates if "P180" not in item.entity["claims"]),
            found,
        )

    # Mark the item so item_page can tell it was reached through this redirect.
    session[found.qid] = "from redirect"
    return redirect(url_for("item_page", item_id=found.item_id))
|
|
|
|
|
|
@app.route("/oauth/start")
def start_oauth() -> Response:
    """Begin the OAuth 1 flow and redirect the user to the authorize page.

    The optional ``next`` query argument is remembered in the session as
    ``after_login``. NOTE(review): the value is stored unvalidated and is later
    used as a redirect target by oauth_callback -- confirm this is acceptable.
    """
    after_login = request.args.get("next")
    if after_login:
        session["after_login"] = after_login

    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]
    base_url = "https://www.wikidata.org/w/index.php"
    request_token_url = base_url + "?title=Special%3aOAuth%2finitiate"

    oauth = OAuth1Session(client_key, client_secret=client_secret, callback_uri="oob")
    request_token = oauth.fetch_request_token(request_token_url)

    # Keep the request token; oauth_callback reads these two session keys.
    for session_key, token_key in (
        ("owner_key", "oauth_token"),
        ("owner_secret", "oauth_token_secret"),
    ):
        session[session_key] = request_token.get(token_key)

    base_authorization_url = "https://www.wikidata.org/wiki/Special:OAuth/authorize"
    return redirect(
        oauth.authorization_url(base_authorization_url, oauth_consumer_key=client_key)
    )
|
|
|
|
|
|
@app.route("/oauth/callback", methods=["GET"])
def oauth_callback() -> Response:
    """Finish the OAuth 1 flow and store the access token in the session.

    Redirects to the ``after_login`` URL from the session when there is one,
    otherwise responds like ``random_artwork``.
    Raises KeyError if the session holds no request token.
    """
    base_url = "https://www.wikidata.org/w/index.php"
    client_key = app.config["CLIENT_KEY"]
    client_secret = app.config["CLIENT_SECRET"]

    # Credentials shared by both sessions created below.
    credentials = {
        "client_secret": client_secret,
        "resource_owner_key": session["owner_key"],
        "resource_owner_secret": session["owner_secret"],
    }

    # A first session is only used to extract the verifier from the callback URL.
    oauth = OAuth1Session(client_key, **credentials)
    oauth_response = oauth.parse_authorization_response(request.url)
    verifier = oauth_response.get("oauth_verifier")

    access_token_url = base_url + "?title=Special%3aOAuth%2ftoken"
    oauth = OAuth1Session(client_key, verifier=verifier, **credentials)

    oauth_tokens = oauth.fetch_access_token(access_token_url)
    session["owner_key"] = oauth_tokens.get("oauth_token")
    session["owner_secret"] = oauth_tokens.get("oauth_token_secret")

    next_page = session.get("after_login")
    if next_page:
        return redirect(next_page)
    return random_artwork()
|
|
|
|
|
|
@app.route("/oauth/disconnect")
def oauth_disconnect() -> Response:
    """Drop the OAuth related keys from the session and go to the browse page."""
    oauth_keys = ("owner_key", "owner_secret", "username", "after_login")
    for key in [name for name in oauth_keys if name in session]:
        del session[key]
    return redirect(url_for("browse_page"))
|
|
|
|
|
|
def create_claim(artwork_id: int, depicts_id: int, token: str) -> requests.Response:
    """Post a ``wbcreateclaim`` request adding a P180 claim to an artwork.

    ``artwork_id`` and ``depicts_id`` are numeric item IDs; ``token`` is sent
    as the ``token`` parameter. The response of the API call is returned
    without being inspected.
    """
    params: dict[str, str | int] = {
        "action": "wbcreateclaim",
        "entity": f"Q{artwork_id}",
        "property": "P180",
        "snaktype": "value",
        "value": json.dumps({"entity-type": "item", "numeric-id": depicts_id}),
        "token": token,
        "format": "json",
        "formatversion": 2,
    }
    response: requests.Response = wikidata_oauth.api_post_request(params)
    return response
|
|
|
|
|
|
def image_with_cache(qid: str, image_filename: str, width: int) -> dict[str, str]:
    """Return image detail for ``image_filename``, using a JSON file cache.

    The cache file is ``cache/{qid}_{width}_image.json``. Underscores in
    ``image_filename`` are replaced by spaces before the lookup. When the cache
    is missing, empty or does not contain the file name, the detail is fetched
    with ``commons.image_detail`` and the cache file is rewritten.

    Returns whatever the detail mapping holds for the file name; this is
    ``None`` when the mapping has no entry for it.
    """
    filename = f"cache/{qid}_{width}_image.json"
    detail = {}
    if os.path.exists(filename):
        # Context managers make sure the cache file handles are closed.
        with open(filename) as cache_file:
            detail = json.load(cache_file)

    image_filename = image_filename.replace("_", " ")

    # The image associated with an item can change.
    # If that happens the detail in the cache will be for the wrong file.
    if not detail or image_filename not in detail:
        detail = commons.image_detail([image_filename], thumbwidth=width)
        with open(filename, "w") as cache_file:
            json.dump(detail, cache_file, indent=2)

    image: dict[str, str] = detail.get(image_filename)
    return image
|
|
|
|
|
|
def existing_depicts_from_entity(entity: Entity) -> list[dict[str, typing.Any]]:
    """Describe the items an entity already depicts (its P180 claims).

    Claims without a ``datavalue`` are skipped. Depicted items that have no
    DepictsItem row yet are created, and the session is committed once at the
    end if any were created. Returns one dict per claim with the keys
    ``label``, ``description``, ``qid``, ``count`` and ``existing``.
    """
    claims = entity["claims"]
    if "P180" not in claims:
        return []

    existing = []
    created_any = False
    for claim in claims["P180"]:
        mainsnak = claim["mainsnak"]
        if "datavalue" not in mainsnak:
            continue

        item_id = mainsnak["datavalue"]["value"]["numeric-id"]

        item = DepictsItem.query.get(item_id)  # type: ignore
        if not item:
            item = wikidata_edit.create_depicts_item(item_id)
            database.session.add(item)
            created_any = True

        existing.append(
            {
                "label": item.label,
                "description": item.description,
                "qid": f"Q{item.item_id}",
                "count": item.count,
                "existing": True,
            }
        )

    if created_any:
        database.session.commit()
    return existing
|
|
|
|
|
|
def get_institution(entity: Entity, other: Mapping[str, str | None]) -> str | None:
    """Return the label of the entity's location (P276) or collection (P195).

    The properties are tried in that order. The first one that is present and
    has a non-empty first data value decides the result, which is looked up in
    ``other`` by item ID and may therefore be ``None``.
    """
    claims = entity["claims"]
    for prop in ("P276", "P195"):
        if prop not in claims:
            continue
        value = wikibase.first_datavalue(entity, prop)
        assert isinstance(value, dict)
        if value:
            return typing.cast(str, other.get(value["id"]))
    return None
|
|
|
|
|
|
@app.route("/item/Q<int:item_id>")
def item_page(item_id: int) -> str | Response:
    """Show the page for one item.

    Redirects when the entity is a redirect, renders ``not_artwork.html`` when
    the item has no local row and ``wdqs.is_artificial_physical_object`` is
    false for it, and renders ``item.html`` otherwise.
    """
    qid = f"Q{item_id}"
    g.qid = qid
    item = artwork.Artwork(qid)
    # random_artwork leaves a "from redirect" marker in the session; in that
    # case the cached entity is used instead of requesting a refresh.
    from_redirect = qid in session and session.pop(qid) == "from redirect"
    entity = mediawiki.get_entity_with_cache(qid, refresh=not from_redirect)
    assert entity

    if "redirects" in entity:
        # Follow the redirect to the target item on the same endpoint.
        redirect_to_item_id = int(entity["redirects"]["to"][1:])
        assert request.endpoint
        return redirect(url_for(request.endpoint, item_id=redirect_to_item_id))

    existing_depicts = existing_depicts_from_entity(entity)

    width = 1200
    image_filename = item.image_filename
    if image_filename:
        image = image_with_cache(qid, image_filename, width)
    else:
        image = None

    # hits = item.run_query()
    label_and_language = get_entity_label_and_language(entity)
    if label_and_language:
        label = label_and_language["label"]
    else:
        label = None
    g.label = label
    other = get_other(item.entity)

    people = human.from_name(label) if label else None

    label_languages = label_and_language["languages"] if label_and_language else []
    # True when none of the languages carrying the label is English
    # (also true when there are no languages at all).
    show_translation_links = all(lang.code != "en" for lang in label_languages)

    artwork_item = Item.query.get(item_id)
    if artwork_item is None:
        if not wdqs.is_artificial_physical_object(qid):
            return render_template(
                "not_artwork.html",
                qid=qid,
                item_id=item_id,
                item=item,
                labels=find_more_props,
                entity=item.entity,
                username=g.user,
                label=label,
                label_languages=label_languages,
                show_translation_links=show_translation_links,
                image=image,
                other=other,
                title=item.display_title,
            )

        modified = datetime.strptime(entity["modified"], "%Y-%m-%dT%H:%M:%SZ")

        artwork_item = Item(
            item_id=item_id,
            entity=typing.cast(dict[str, str], entity),
            lastrevid=entity["lastrevid"],
            modified=modified,
        )
        # NOTE(review): the new row is added but not committed in this
        # function -- confirm it is persisted elsewhere.
        database.session.add(artwork_item)

    catalog = wd_catalog.get_catalog_from_artwork(entity)
    if not catalog.get("institution"):
        # Fall back to the label of the location/collection claim.
        institution = get_institution(entity, other)
        catalog["institution"] = institution

    return render_template(
        "item.html",
        qid=qid,
        item_id=item_id,
        item=item,
        catalog=catalog,
        labels=find_more_props,
        entity=item.entity,
        username=g.user,
        label=label,
        label_languages=label_languages,
        show_translation_links=show_translation_links,
        existing_depicts=existing_depicts,
        image=image,
        people=people,
        other=other,
        # hits=hits,
        title=item.display_title,
    )
|
|
|
|
|
|
def get_languages(codes: typing.Iterable[str]) -> typing.Any:
    """Return a Language query filtered to the given Wikimedia language codes.

    The query object itself is returned; it is not executed here.
    """
    return Language.query.filter(  # type: ignore
        Language.wikimedia_language_code.in_(codes)
    )
|
|
|
|
|
|
def get_entity_label_and_language(entity: Entity) -> dict[str, typing.Any] | None:
    """Pick a usable label and the languages that share it.

    The English label wins when there is one. Without an English label, a
    label is only returned if every language uses the same text. The result
    has the keys ``label`` and ``languages`` (a Language query); ``None`` is
    returned when no label qualifies.
    """
    entity_labels = entity["labels"]

    languages_by_label = defaultdict(set)
    for code, entry in entity_labels.items():
        languages_by_label[entry["value"]].add(code)

    if "en" in entity_labels:
        english = entity_labels["en"]["value"]
        return {
            "label": english,
            "languages": get_languages(languages_by_label[english]),
        }

    if len(languages_by_label) != 1:
        return None

    ((label, languages),) = languages_by_label.items()
    return {"label": label, "languages": get_languages(languages)}
|
|
|
|
|
|
def get_labels(keys: typing.Iterable[str], name: str | None = None) -> dict[str, str]:
    """Return labels for the given entity IDs, using a JSON file cache.

    ``keys`` are sorted by their numeric part. The cache file is
    ``cache/{name}_labels.json``; ``name`` defaults to the MD5 hex digest of
    the sorted keys joined with ``_``. A cache file is only used when the keys
    stored in it equal the requested ones; otherwise the entities are fetched
    in chunks of 50 and the cache file is rewritten.

    Entities for which ``wikibase.get_entity_label`` returns nothing get the
    label ``"no English label"``.
    """
    keys = sorted(keys, key=lambda i: int(i[1:]))
    if name is None:
        name = hashlib.md5("_".join(keys).encode("utf-8")).hexdigest()
    filename = f"cache/{name}_labels.json"
    labels = []
    if os.path.exists(filename):
        # Context managers make sure the cache file handles are closed.
        with open(filename) as cache_file:
            from_cache = json.load(cache_file)
        if isinstance(from_cache, dict) and from_cache.get("keys") == keys:
            labels = from_cache["labels"]
    if not labels:
        for cur in utils.chunk(keys, 50):
            labels += mediawiki.get_entities(cur, props="labels")

        with open(filename, "w") as cache_file:
            json.dump({"keys": keys, "labels": labels}, cache_file, indent=2)

    return {
        entity["id"]: wikibase.get_entity_label(entity) or "no English label"
        for entity in labels
    }
|
|
|
|
|
|
def get_labels_db(keys: Iterable[str]) -> dict[str, typing.Any]:
    """Return labels for the given QIDs, preferring the local Item table.

    QIDs with no Item row (and keys that do not look like QIDs) are requested
    from ``mediawiki.get_entities`` in chunks of 50; each entity returned is
    stored as a new Item with ``is_artwork=False``. Redirect entities are
    skipped. A read timeout stops the fetching quietly, so the result may lack
    some of the requested keys.
    """
    keys = set(keys)
    labels = {}
    missing = set()
    for qid in keys:
        m = re_qid.match(qid)
        if m:
            item_id = int(m.group(1))
            item = Item.query.get(item_id)  # type: ignore
            if item:
                labels[qid] = item.label
                continue

        # Not in the database (or not a QID): fetch it below.
        missing.add(qid)

    page_size = 50
    try:
        for cur in utils.chunk(missing, page_size):
            for entity in mediawiki.get_entities(cur):
                if "redirects" in entity:
                    continue

                qid = entity["id"]

                modified = datetime.strptime(entity["modified"], "%Y-%m-%dT%H:%M:%SZ")
                # FIXME: check if the item is an artwork and set is_artwork correctly
                item = Item(  # type: ignore
                    item_id=int(qid[1:]),
                    entity=entity,
                    lastrevid=entity["lastrevid"],
                    modified=modified,
                    is_artwork=False,
                )
                database.session.add(item)
                labels[qid] = item.label
            # Commit once per chunk of fetched entities.
            database.session.commit()
    except requests.exceptions.ReadTimeout:
        # Best effort: return the labels collected so far.
        pass

    return labels
|
|
|
|
|
|
def build_other_set(entity: Entity) -> set[str]:
    """Collect the item IDs referenced by the entity's ``find_more_props`` claims.

    Claims whose main snak has no ``datavalue`` are ignored.
    """
    return {
        claim["mainsnak"]["datavalue"]["value"]["id"]
        for key in find_more_props.keys()
        for claim in entity["claims"].get(key, [])
        if "datavalue" in claim["mainsnak"]
    }
|
|
|
|
|
|
def get_other(entity: Entity) -> Mapping[str, str | None]:
    """Return labels for every item referenced by the entity's claims.

    Only claims for properties in ``find_more_props`` are considered.
    """
    return get_labels(build_other_set(entity))
|
|
|
|
|
|
@app.route("/edits")
def list_edits() -> str:
    """List saved edits, newest first, 100 per page, with summary counts.

    The template also receives the number of distinct artworks and the number
    of distinct usernames over all edits.
    """
    edits = Edit.query.order_by(Edit.timestamp.desc())  # type: ignore
    page = utils.get_int_arg("page") or 1
    pager = Pagination(page, 100, edits.count())

    def count_distinct(column: typing.Any) -> typing.Any:
        """Count the distinct values of one Edit column."""
        return database.session.query(
            func.count(distinct(column))
        ).scalar()  # type: ignore

    item_count = count_distinct(Edit.artwork_id)
    user_count = count_distinct(Edit.username)

    return render_template(
        "list_edits.html",
        pager=pager,
        edit_list=pager.slice(edits),
        item_count=item_count,
        user_count=user_count,
    )
|
|
|
|
|
|
@app.route("/user/<username>")
def user_page(username: str) -> str:
    """Show the edits made by one user, newest first.

    The template receives the user's edits, the unfiltered ``Edit.query`` as
    ``edits`` and the number of distinct artworks the user has edited.
    """
    newest_first = Edit.timestamp.desc()
    edit_list = Edit.query.filter_by(username=username).order_by(  # type: ignore
        newest_first
    )

    distinct_artworks = func.count(distinct(Edit.artwork_id))
    item_count = (
        database.session.query(distinct_artworks)  # type: ignore
        .filter_by(username=username)
        .scalar()
    )

    return render_template(
        "user_page.html",
        username=username,
        edits=Edit.query,  # type: ignore
        edit_list=edit_list,
        item_count=item_count,
    )
|
|
|
|
|
|
@app.route("/next/Q<int:item_id>")
def next_page(item_id: int) -> str:
    """Show the page offered after saving, with links to related artworks.

    For every property in ``find_more_props`` (except P186) that the entity
    has claims for, the template gets the claim values with browse links and
    a ``find_more_json`` URL for looking up images.
    """
    qid = f"Q{item_id}"

    entity = mediawiki.get_entity_with_cache(qid)
    assert entity

    width = 800
    image_filename = wikibase.first_datavalue(entity, "P18")
    # NOTE(review): this assertion fails when the first P18 value is not a
    # string, e.g. presumably for entities without an image -- confirm.
    assert isinstance(image_filename, str)
    image = image_with_cache(qid, image_filename, width)

    label = wikibase.get_entity_label(entity)
    other = get_other(entity)

    other_list = []
    for key, prop_label in find_more_props.items():
        if key == "P186":  # skip material used
            continue  # too generic
        claims = entity["claims"].get(key)
        if not claims:
            continue

        values = []

        for claim in claims:
            if "datavalue" not in claim["mainsnak"]:
                continue
            value = claim["mainsnak"]["datavalue"]["value"]
            claim_qid = value["id"]
            if claim_qid == "Q4233718":
                continue  # anonymous artist
            numeric_id = value["numeric-id"]
            href = url_for("find_more_page", property_id=key[1:], item_id=numeric_id)
            values.append(
                {
                    "href": href,
                    "qid": claim_qid,
                    "label": other.get(claim_qid),
                }
            )

        # Every claim of this property was skipped: nothing to show for it.
        if not values:
            continue

        qid_list = [v["qid"] for v in values]

        other_list.append(
            {
                "label": prop_label,
                "image_lookup": url_for("find_more_json", pid=key, qid=qid_list),
                "pid": key,
                "values": values,
                "images": [],
            }
        )

    return render_template(
        "next.html",
        qid=qid,
        label=label,
        image=image,
        labels=find_more_props,
        other=other,
        entity=entity,
        other_props=other_list,
    )
|
|
|
|
|
|
@app.route("/P<int:property_id>/Q<int:item_id>")
def find_more_page(property_id: int, item_id: int) -> Response:
    """Redirect ``/P<n>/Q<m>`` to the browse page with ``P<n>=Q<m>`` as argument."""
    criteria = {f"P{property_id}": f"Q{item_id}"}
    return redirect(url_for("browse_page", **criteria))  # type: ignore
|
|
|
|
|
|
@app.route("/toolinfo.json")
def tool_info() -> Response:
    """Serve a fixed JSON description of this tool."""
    description = {
        "name": "wade",
        "title": "Wikidata Art Depiction Explorer",
        "description": "Add depicts statements to works of art.",
        "url": "https://art.wikidata.link/",
        "keywords": "art, depicts, paintings, depiction",
        "author": "Edward Betts",
        "repository": "https://github.com/edwardbetts/depicts.git",
    }
    return jsonify(description)
|
|
|
|
|
|
def get_facets(params) -> dict[str, typing.Any]:
    """Run the facet SPARQL query and group its rows by property.

    Properties that already appear in the request arguments are left out of
    the query. Rows for properties outside ``find_more_props`` are ignored.
    For each property with results, the 15 values with the highest count are
    returned, highest first.
    """
    properties = [pid for pid in find_more_props.keys() if pid not in request.args]

    bindings = wdqs.run_from_template_with_cache(
        "query/facet.sparql", params=params, isa_list=isa_list, properties=properties
    )

    # Pre-populated in find_more_props order so the result keeps that order.
    facets: dict[str, list[dict[str, str | int]]] = {
        key: [] for key in find_more_props.keys()
    }
    for row in bindings:
        # The IDs are the last path segment of the entity URIs.
        pid = row["property"]["value"].rpartition("/")[2]
        entry = {
            "qid": row["object"]["value"].rpartition("/")[2],
            "label": row["objectLabel"]["value"],
            "count": int(row["count"]["value"]),
        }
        if pid in find_more_props:
            facets[pid].append(entry)

    top_facets = {}
    for key, values in facets.items():
        if not values:
            continue
        ranked = sorted(values, key=lambda i: i["count"], reverse=True)
        top_facets[key] = ranked[:15]
    return top_facets
|
|
|
|
|
|
def get_artwork_params() -> list[tuple[str, str]]:
    """Extract ``(property ID, item ID)`` pairs from the request arguments.

    An argument is kept only when its name starts with a property ID and its
    value starts with an item ID; anything after the ID is cut off.
    """
    params = []
    for raw_pid, raw_qid in request.args.items():
        pid_match = re_pid.match(raw_pid)
        if not pid_match:
            continue

        qid_match = re_qid.match(raw_qid)
        if qid_match:
            params.append((pid_match.group(0), qid_match.group(0)))
    return params
|
|
|
|
|
|
def filter_artwork(params: list[tuple[str, str]]) -> list[wdqs.Row]:
    """Run the ``find_more`` SPARQL template for the given criteria.

    ``params`` holds ``(property ID, item ID)`` pairs; the module-level
    ``isa_list`` is passed to the template as well.
    """
    return wdqs.run_from_template_with_cache(
        "query/find_more.sparql", params=params, isa_list=isa_list
    )
|
|
|
|
|
|
@app.route("/catalog")
def catalog_page() -> str:
    """Render the catalog view for the criteria in the request arguments.

    NOTE(review): because of the ``continue`` in the entity loop below, the
    ``items`` list handed to the template is always empty.
    """
    params = get_artwork_params()
    bindings = filter_artwork(params)
    page = utils.get_int_arg("page") or 1
    page_size = 45

    item_ids = set()
    for row in bindings:
        item_id = wdqs.row_id(row)
        item_ids.add(item_id)

    qids = [f"Q{item_id}" for item_id in sorted(item_ids)]

    # NOTE(review): this list is replaced by ``items = []`` below without
    # being read.
    items = [Item.query.get(item_id) for item_id in item_ids]  # type: ignore

    entities = mediawiki.get_entities_with_cache(qids)

    items = []
    other_items = set()
    for entity in entities:
        other_items.update(build_other_set(entity))
        continue

        # Unreachable: the ``continue`` above skips the rest of the loop body.
        item = {
            "label": wikibase.get_entity_label(entity),
            "qid": entity["id"],
            "item_id": int(entity["id"][1:]),
            "image_filename": wikibase.first_datavalue(entity, "P18"),
            "entity": entity,
        }
        items.append(item)

    other = get_labels(other_items)

    flat = "_".join(f"{pid}={qid}" for pid, qid in params)
    thumbwidth = 400
    # FIXME cache_name can be too long for filesystem
    cache_name = f"{flat}_{page}_{page_size}_{thumbwidth}"
    detail = get_image_detail_with_cache(items, cache_name, thumbwidth=thumbwidth)

    for item in items:
        item["url"] = url_for("item_page", item_id=item["item_id"])
        item["image"] = detail[item["image_filename"]]

    item_labels = get_labels(qid for pid, qid in params)
    # Page title such as "artist: <label> / genre: <label>".
    title = " / ".join(
        find_more_props[pid] + ": " + item_labels[qid] for pid, qid in params
    )

    return render_template(
        "catalog.html", labels=find_more_props, items=items, other=other, title=title
    )
|
|
|
|
|
|
def get_image_detail_with_cache(
|
|
items, cache_name: str, thumbwidth: int | None = None, refresh: bool = False
|
|
):
|
|
filenames = [cur.image_filename() for cur in items]
|
|
|
|
if thumbwidth is None:
|
|
thumbwidth = app.config["THUMBWIDTH"]
|
|
|
|
filename = f"cache/{cache_name}_images.json"
|
|
cache_exists = os.path.exists(filename)
|
|
detail = None
|
|
if not refresh and cache_exists:
|
|
try:
|
|
detail = json.load(open(filename))
|
|
except json.decoder.JSONDecodeError:
|
|
pass
|
|
if not detail:
|
|
try:
|
|
detail = commons.image_detail(filenames, thumbwidth=thumbwidth)
|
|
json.dump(detail, open(filename, "w"), indent=2)
|
|
except requests.exceptions.ReadTimeout:
|
|
detail = json.load(open(filename)) if cache_exists else {}
|
|
|
|
return detail
|
|
|
|
|
|
def browse_index() -> str:
    """Render the browse index with a value count for each property.

    The count is the number of distinct objects among the triples whose
    subject is an artwork, grouped by predicate.
    """
    distinct_objects = func.count(func.distinct(Triple.object_id))
    rows = (
        database.session.query(Triple.predicate_id, distinct_objects)  # type: ignore
        .join(Item, Triple.subject_id == Item.item_id)
        .filter_by(is_artwork=True)
        .group_by(Triple.predicate_id)
    )

    counts = {}
    for predicate_id, count in rows:
        counts[f"P{predicate_id}"] = count

    return render_template("browse_index.html", props=find_more_props, counts=counts)
|
|
|
|
|
|
@app.route("/debug/show_user")
def debug_show_user() -> str:
    """Show the result of the OAuth userinfo call as indented JSON in ``<pre>``."""
    body = json.dumps(wikidata_oauth.userinfo_call(), indent=2)
    return "".join(("<pre>", body, "</pre>"))
|
|
|
|
|
|
@app.route("/browse/facets.json")
def browse_facets() -> Response:
    """Return the facets for the current browse criteria as JSON.

    Every facet value gets an ``href`` that adds the value to the current
    criteria on the browse page. Without any criteria only a notice is sent.
    """
    params = get_artwork_params()
    if not params:
        return jsonify(notice="facet criteria missing")

    facets = get_facets(params)

    for pid, values in facets.items():
        for value in values:
            extra = {pid: value["qid"]}
            value["href"] = set_url_args(endpoint="browse_page", **extra)

    return jsonify(params=params, facets=facets, prop_labels=find_more_props)
|
|
|
|
|
|
def get_db_items(params: list[tuple[str, str]]):
    """Get items for browse page based on criteria.

    Starts from all artworks and, for every ``(property ID, item ID)`` pair,
    joins Triple once more and filters on that predicate and object. The
    query is returned without being executed.
    """
    q = Item.query.filter_by(is_artwork=True)  # type: ignore
    for pid, qid in params:
        # pid[1:] / qid[1:] drop the leading "P" / "Q" letter.
        # NOTE(review): relies on the ``aliased=True`` join flag -- confirm it
        # is supported by the SQLAlchemy version in use.
        q = q.join(Triple, Item.item_id == Triple.subject_id, aliased=True).filter(
            Triple.predicate_id == pid[1:], Triple.object_id == qid[1:]
        )

    return q
|
|
|
|
|
|
def get_db_facets(params: list[tuple[str, str]]):
    """Build the facets for the browse page from the Triple table.

    Returns a dict that maps property IDs to a list of at most 18 dicts with
    the keys ``count``, ``qid`` and ``label``, highest count first.
    """
    t = aliased(Triple)
    q = database.session.query(t.predicate_id, func.count().label("count"), t.object_id)
    facet_limit = 18

    for pid, qid in params:
        # One extra join per criterion; the criterion's own predicate and
        # object are excluded from the facet rows.
        q = q.join(  # type: ignore
            Triple, t.subject_id == Triple.subject_id, aliased=True
        ).filter(
            Triple.predicate_id == pid[1:],
            Triple.object_id == qid[1:],
            t.predicate_id != pid[1:],
            t.object_id != qid[1:],
        )

    q = q.group_by(t.predicate_id, t.object_id)

    # groupby below needs the rows ordered by predicate_id, the first column.
    results = sorted(tuple(row) for row in q.all())

    facet_list = {}
    subject_qids: set[str] = set()
    for predicate_id, x in itertools.groupby(results, lambda row: row[0]):
        hits = sorted(x, key=lambda row: row[1], reverse=True)
        values = [
            {"count": count, "qid": f"Q{value}"}
            for _, count, value in hits[:facet_limit]
        ]
        facet_list[f"P{predicate_id}"] = values
        subject_qids.update(i["qid"] for i in values)

    labels = get_labels_db(subject_qids)

    for values in facet_list.values():
        for v in values:
            v["label"] = labels.get(v["qid"])

    return facet_list
|
|
|
|
|
|
@app.route("/browse")
def browse_page() -> str:
    """Browse artworks that match the ``P…=Q…`` criteria in the request.

    Without criteria the browse index is shown. Otherwise the matching
    artworks are paged 45 at a time; items of the current page that have no
    image file name are left out. Image detail comes from the file cache and
    is refreshed at most once when an item's image is missing from it.
    """
    page_size = 45
    params = get_artwork_params()

    if not params:
        return browse_index()

    flat = "_".join(f"{pid}={qid}" for pid, qid in params)
    item_labels = get_labels_db(qid for pid, qid in params)
    # Title such as "artist: <label> / genre: <label>"; the QID is used when
    # no label is known.
    g.title = " / ".join(
        find_more_props[pid] + ": " + (item_labels.get(qid) or qid)
        for pid, qid in params
    )

    q_items = get_db_items(params)
    facets = get_db_facets(params)

    all_items = q_items.all()

    page = utils.get_int_arg("page") or 1
    total = q_items.count()
    pager = Pagination(page, page_size, total)

    items = [item for item in pager.slice(all_items) if item.image_filename()]

    cache_name = f"{flat}_{page}_{page_size}"
    detail = get_image_detail_with_cache(items, cache_name)
    cache_refreshed = False

    # Collect every QID the template needs a label for.
    linked_qids = {qid for pid, qid in params}
    for item in items:
        artist_qid = item.artist
        if artist_qid:
            linked_qids.add(artist_qid)
        for prop in "P31", "P180":
            linked_qids.update(item.linked_qids(prop))

    linked_labels = get_labels_db(linked_qids)

    for item in items:
        image_filename = item.image_filename()
        if not cache_refreshed and image_filename not in detail:
            # The cached detail is stale for this page: refetch it once.
            detail = get_image_detail_with_cache(items, cache_name, refresh=True)
            cache_refreshed = True
        item.image = detail.get(image_filename)

    # The original had a second, unreachable ``return jsonify(...)`` after
    # this return; it has been removed.
    return render_template(
        "find_more.html",
        page=page,
        label=g.title,
        pager=pager,
        prop_labels=find_more_props,
        labels=find_more_props,
        linked_labels=linked_labels,
        items=items,
        total=total,
        params=params,
        facets=facets,
    )
|
|
|
|
|
|
@app.route("/find_more.json")
def find_more_json() -> Response:
    """Return up to 6 artworks with images for a property and a list of values.

    Reads ``pid`` and the repeated ``qid`` argument from the request, runs the
    ``find_more_basic`` SPARQL template and adds image detail with a thumb
    height of 120 to every result.
    """
    pid = request.args.get("pid")
    qid_list = request.args.getlist("qid")
    limit = 6

    cache_name = f'{pid}={",".join(qid_list)}_{limit}'
    bindings = wdqs.run_from_template_with_cache(
        "query/find_more_basic.sparql",
        cache_name=cache_name,
        qid_list=qid_list,
        pid=pid,
        limit=limit,
    )

    items = []
    for row in bindings:
        item_id = wdqs.row_id(row)
        image_filename = wdqs.commons_uri_to_filename(row["image"]["value"])
        items.append(
            {
                "qid": f"Q{item_id}",
                "item_id": item_id,
                "href": url_for("item_page", item_id=item_id),
                "filename": image_filename,
            }
        )

    filenames = [item["filename"] for item in items]
    detail = commons.image_detail(filenames, thumbheight=120)

    for item in items:
        item["image"] = detail[item["filename"]]

    return jsonify(items=items)
|
|
|
|
|
|
# One search hit as returned by wikibase_search: string keys with str, int
# or None values.
Hit = dict[str, str | int | None]
|
|
|
|
|
|
def wikibase_search(terms: str) -> list[Hit]:
    """Search for entities matching *terms* with the ``wbsearchentities`` API.

    Args:
        terms: the search text, sent as the ``search`` parameter
            (``language`` is fixed to ``"en"`` and ``limit`` to ``"max"``).

    Returns:
        One hit per search result with ``label``, ``description``, ``qid``
        and ``count`` (always 0 here). When the result matched on an alias
        the matched text is added as ``alt_label``.
    """
    r = mediawiki.api_call(
        {
            "action": "wbsearchentities",
            "search": terms,
            "limit": "max",
            "language": "en",
        }
    )

    hits: list[Hit] = []
    for result in r.json()["search"]:
        hit: Hit = {
            # Like "description", "label" may be missing from a search result
            # (entity without a label in the requested language), so read it
            # with .get() instead of raising KeyError.
            "label": result.get("label"),
            "description": result.get("description") or None,
            "qid": result["id"],
            "count": 0,
        }
        match = result["match"]
        if match["type"] == "alias":
            hit["alt_label"] = match["text"]
        hits.append(hit)

    return hits
|
|
|
|
|
|
def add_images_to_depicts_lookup(hits: list[dict[str, str]]) -> None:
    """Add image information to lookup hits, modifying the dicts in place.

    For every hit whose entity is returned by the entity cache, the first
    ``P18`` value is stored under ``image_filename``. Image detail is then
    fetched from Commons for the first 50 filenames, and stored under
    ``image`` for each hit whose filename appears in the response.

    Args:
        hits: lookup hits; each must have a ``qid`` key.
    """
    all_qids = [hit["qid"] for hit in hits]
    entities: list[Entity] = mediawiki.get_entities_with_cache(all_qids)

    # Hits are plain dicts, so entities cannot be attached to them as
    # attributes (``hit.entity = ...`` raises AttributeError). Keep them in a
    # side table keyed by QID instead.
    qid_to_entity = {entity["id"]: entity for entity in entities}

    # NOTE(review): kept from the previous implementation; nothing in this
    # function modifies ORM objects any more - confirm whether it is needed.
    database.session.commit()

    for hit in hits:
        entity = qid_to_entity.get(hit["qid"])
        if entity:
            hit["image_filename"] = wikibase.first_datavalue(entity, "P18")

    filenames = [hit["image_filename"] for hit in hits if hit.get("image_filename")]
    filenames = filenames[:50]
    thumbwidth = 200
    detail = commons.image_detail(filenames, thumbwidth=thumbwidth)

    for hit in hits:
        filename = hit.get("image_filename")
        # Skip hits without an image, or beyond the 50-file cut-off above.
        if not filename or filename not in detail:
            continue
        hit["image"] = detail[filename]
|
|
|
|
|
|
@app.route("/lookup")
def depicts_lookup() -> Response:
    """Look up depicts items by label prefix and return them as JSON.

    Reads the ``terms`` query-string argument. Without it an ``error`` payload
    is returned; with fewer than three characters (after stripping) an empty
    result with a ``notice`` is returned.

    Otherwise items whose label starts with the terms (case-insensitive) are
    collected, followed by items matched through an alternative label. Hits
    are sorted by ``count``, highest first. Depending on the
    ``LOOKUP_INCLUDES_IMAGES`` and ``SEARCH_WIKIDATA`` config flags, image
    detail is added and Wikidata search results not already seen are appended.
    """
    terms = request.args.get("terms")
    if not terms:
        return jsonify(error="terms parameter is required")

    terms = terms.strip()
    if len(terms) < 3:
        return jsonify(
            count=0,
            hits=[],
            notice="terms too short for lookup",
        )

    item_ids = []
    hits = []
    seen = set()

    q1 = DepictsItem.query.filter(DepictsItem.label.ilike(terms + "%"))  # type: ignore
    for item in q1:
        hit = {
            "label": item.label,
            "description": item.description,
            "qid": item.qid,
            "count": item.count,
        }
        item_ids.append(item.item_id)
        hits.append(hit)
        seen.add(item.qid)

    cls = DepictsItemAltLabel
    # Exclude items already found through their main label.
    q2 = cls.query.filter(  # type: ignore
        cls.alt_label.ilike(terms + "%"), ~cls.item_id.in_(item_ids)
    )

    for alt in q2:
        item = alt.item
        if item.count is None:
            continue
        hit = {
            "label": item.label,
            "description": item.description,
            "qid": item.qid,
            "count": item.count,
            "alt_label": alt.alt_label,
        }
        hits.append(hit)
        seen.add(item.qid)

    # Label matches are not filtered on count, so it can be None here;
    # comparing None with int raises TypeError, hence the "or 0".
    hits.sort(key=lambda hit: hit["count"] or 0, reverse=True)

    if app.config.get("LOOKUP_INCLUDES_IMAGES"):
        add_images_to_depicts_lookup(hits)

    if app.config.get("SEARCH_WIKIDATA"):
        search_hits = wikibase_search(terms)
        hits += [hit for hit in search_hits if hit["qid"] not in seen]

    ret = {
        "count": q1.count() + q2.count(),
        "hits": hits,
        "terms": terms,
    }

    return jsonify(ret)
|
|
|
|
|
|
@app.route("/report/missing_image")
def missing_image_report() -> str:
    """Render a report of depicts items that have no image.

    Takes the ``limit`` request argument (default 1000), loads that many
    depicts items ordered by ``count`` descending, and lists those whose
    entity has neither a ``P18`` nor a ``P2716`` value.
    """
    limit = utils.get_int_arg("limit") or 1000
    q = DepictsItem.query.order_by(DepictsItem.count.desc()).limit(limit)  # type:ignore

    # Execute the query once. Iterating the Query object twice would run the
    # SQL twice, and a row present only in the second run would have no entry
    # in ``entities`` below.
    depicts_items = list(q)

    qids = [item.qid for item in depicts_items]
    entities: dict[str, Entity] = mediawiki.get_entities_dict_with_cache(qids)

    image_props = ("P18", "P2716")
    item_list = []
    for depicts in depicts_items:
        entity = entities[depicts.qid]
        if any(wikibase.first_datavalue(entity, prop) for prop in image_props):
            continue
        item_list.append(depicts)

    # TODO: call wikidata search to find images that depict item

    return render_template("missing_image.html", item_list=item_list)
|
|
|
|
|
|
@app.route("/report/wdqs")
def wikidata_query_list() -> str:
    """Render the list of WikidataQuery rows, latest start time first."""
    newest_first = WikidataQuery.start_time.desc()
    queries = WikidataQuery.query.order_by(newest_first)  # type: ignore
    return render_template("query_list.html", q=queries)
|
|
|
|
|
|
@app.route("/report/blocks")
def server_block_report() -> str:
    """Run the block check, then render the block report page."""
    check_for_blocks()
    page = render_template("block_report.html")
    return page
|
|
|
|
|
|
@app.route("/fixture/save_error")
def save_error_fixture() -> str:
    """Render the save-error page with the error taken from the fixture."""
    fixture = fixtures.save_error()
    return render_template("save_error.html", error=fixture["error"])
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point. run(debug=True) already turns app.debug on,
    # so no separate assignment is needed.
    # NOTE(security): debug mode bound to 0.0.0.0 exposes the interactive
    # debugger on every network interface - only run this on a trusted network.
    app.run(host="0.0.0.0", debug=True)
|