Edward Betts
a20666f9d4
Allow sorting property values by name Use label from different language if English label is missing
553 lines
17 KiB
Python
Executable file
553 lines
17 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
from flask import Flask, render_template, url_for, redirect, request
|
|
from itertools import islice
|
|
from pprint import pprint
|
|
import dateutil.parser
|
|
import urllib.parse
|
|
import lxml.etree
|
|
import requests
|
|
import json
|
|
import os
|
|
import locale
|
|
|
|
# Select a UTF-8 English locale; locale.strxfrm() is used later in this file
# to sort labels.
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')

# Prefix of Wikidata entity URIs; the numeric item id follows it.
url_start = 'http://www.wikidata.org/entity/Q'

# MediaWiki action API endpoints.
wikidata_url = 'https://www.wikidata.org/w/api.php'
# NOTE(review): commons_url holds the same value as wikidata_url, while
# commons_query_url below points at commons.wikimedia.org -- confirm which
# endpoint is intended.
commons_url = 'https://www.wikidata.org/w/api.php'

# SPARQL endpoint used for all queries.
wikidata_query_api_url = 'https://query.wikidata.org/bigdata/namespace/wdq/sparql'

# Prefix of the image URIs returned by the query service.
commons_start = 'http://commons.wikimedia.org/wiki/Special:FilePath/'
commons_api_url = 'https://tools.wmflabs.org/magnus-toolserver/commonsapi.php'
commons_query_url = 'https://commons.wikimedia.org/w/api.php'

# Thumbnail dimensions requested for images.
thumbwidth = 300
thumbheight = 400

app = Flask(__name__)
|
|
|
|
# Wikidata properties offered for browsing, mapped to their display labels.
# Insertion order is the order in which they are iterated elsewhere.
find_more_props = {
    'P135': 'movement',
    'P136': 'genre',
    'P170': 'artist',
    'P195': 'collection',
    'P276': 'location',
    'P495': 'country of origin',
    'P127': 'owned by',
    'P179': 'part of the series',
    'P921': 'main subject',
    'P186': 'material used',
    'P88': 'commissioned by',
    'P1028': 'donated by',
    'P1071': 'location of final assembly',
    'P138': 'named after',
    'P1433': 'published in',
    'P144': 'based on',
    'P2079': 'fabrication method',
    'P2348': 'time period',
    'P361': 'part of',
    'P608': 'exhibition history',

    # Candidates that may be enabled later:
    # 'P571': 'inception',
    # 'P166': 'award received', (only 2)
    # 'P1419': 'shape', (only 2)
    # 'P123': 'publisher', (only 1)
}
|
|
|
|
# SPARQL template selecting items that are instances (P31) of Q3305213, have
# an image (P18) and have no P180 ("depicts") statement.  The PARAMS
# placeholder is replaced with additional triple patterns before the query is
# sent.
find_more_query = '''
select ?item ?itemLabel ?image ?artist ?artistLabel ?title ?time ?timeprecision {
  SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
  ?item wdt:P31 wd:Q3305213 .
  PARAMS
  ?item wdt:P18 ?image .
  OPTIONAL {
    ?item p:P571/psv:P571 ?timenode .
    ?timenode wikibase:timeValue ?time.
    ?timenode wikibase:timePrecision ?timeprecision.
  }
  OPTIONAL { ?item wdt:P1476 ?title }
  OPTIONAL { ?item wdt:P170 ?artist }
  FILTER NOT EXISTS { ?item wdt:P180 ?depicts }
}
'''
|
|
|
|
# SPARQL template counting, per property and value, the matching items.
# PARAMS is replaced with the triple patterns of the current selection and
# PROPERTY_LIST with the space-separated wdt: properties to count.
facet_query = '''
select ?property ?object ?objectLabel (count(*) as ?count) {
  SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
  ?item wdt:P31 wd:Q3305213 .
  ?item wdt:P18 ?image .
  PARAMS
  values ?property { PROPERTY_LIST }
  ?item ?property ?object .
  FILTER NOT EXISTS { ?item wdt:P180 ?depicts }
} group by ?property ?propertyLabel ?object ?objectLabel
'''
|
|
|
|
# SPARQL template listing every value of one property together with its
# English label and description (when present) and the number of matching
# items, most frequent first.  PID is replaced with the property id.
property_query = '''
select ?object ?objectLabel ?objectDescription (count(*) as ?count) {
  ?item wdt:P31 wd:Q3305213 .
  ?item wdt:P18 ?image .
  ?item wdt:PID ?object .
  filter not exists { ?item wdt:P180 ?depicts }
  optional {
    ?object rdfs:label ?objectLabel.
    FILTER(LANG(?objectLabel) = "en").
  }
  optional {
    ?object schema:description ?objectDescription .
    filter(lang(?objectDescription) = "en")
  }

} group by ?object ?objectLabel ?objectDescription
order by desc(?count)
'''
|
|
|
|
def ordinal(n):
    """Return the integer n followed by its English ordinal suffix.

    >>> ordinal(1), ordinal(12), ordinal(23)
    ('1st', '12th', '23rd')
    """
    # Floor division is required here: with true division the tens digit is
    # never exactly 1, so 11, 12 and 13 would get "st", "nd" and "rd".
    if n // 10 % 10 == 1:
        suffix = 'th'
    else:
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
    return "%d%s" % (n, suffix)
|
|
|
|
def chunk(it, size):
    """Return an iterator of tuples holding up to `size` items each.

    Items are taken from `it` in order; the last tuple may be shorter, and
    iteration ends when the next tuple would be empty.
    """
    source = iter(it)

    def next_batch():
        return tuple(islice(source, size))

    # Two-argument iter(): keep calling next_batch() until it returns ().
    return iter(next_batch, ())
|
|
|
|
def run_wikidata_query(query):
    """POST a SPARQL query to the query service and return the response.

    JSON output is requested.  The response object is returned as-is after
    asserting that the HTTP status code is 200.
    """
    r = requests.post(
        wikidata_query_api_url,
        data={'query': query, 'format': 'json'},
        stream=True,
    )
    assert r.status_code == 200
    return r
|
|
|
|
def drop_start(s, start):
    """Return `s` with the leading `start` removed.

    Asserts that `s` really begins with `start`.
    """
    assert s.startswith(start)
    prefix_length = len(start)
    return s[prefix_length:]
|
|
|
|
def row_id(row):
    """Return the numeric item id taken from the row's `item` entity URI."""
    item_uri = row['item']['value']
    numeric_part = drop_start(item_uri, url_start)
    return int(numeric_part)
|
|
|
|
def api_call(params, api_url=wikidata_url):
    """Send a GET request to a MediaWiki action API and return the response.

    `params` are merged over the defaults `format=json` and
    `formatversion=2`, so callers may override either.  The request goes to
    `api_url`, which defaults to the Wikidata API.
    """
    call_params = {
        'format': 'json',
        'formatversion': 2,
        **params,
    }

    # Use the endpoint the caller asked for; previously the api_url argument
    # was ignored and wikidata_url was always used.
    return requests.get(api_url, params=call_params)
|
|
|
|
def get_entity(qid):
    """Fetch one entity through the wbgetentities API.

    Returns the entity dict, or None when the reply has no `entities` key or
    the entity is flagged as missing.
    """
    reply = api_call({'action': 'wbgetentities', 'ids': qid}).json()

    try:
        entities = reply['entities']
    except KeyError:
        return None

    entity = list(entities.values())[0]
    if 'missing' in entity:
        return None
    return entity
|
|
|
|
def get_entities(ids, **params):
    """Fetch several entities in a single wbgetentities call.

    `ids` is joined with `|`; extra keyword arguments are passed through as
    API parameters.  Returns a list of entity dicts, or an empty list when
    `ids` is empty (no request is made in that case).
    """
    if not ids:
        return []

    request_params = {'action': 'wbgetentities', 'ids': '|'.join(ids)}
    request_params.update(params)

    entities = api_call(request_params).json()['entities']
    return list(entities.values())
|
|
|
|
@app.route("/")
def index():
    """Render the front page, passing the browsable properties as `props`."""
    context = {'props': find_more_props}
    return render_template('index.html', **context)
|
|
|
|
def run_query_with_cache(q, name):
    """Return the result bindings for SPARQL query `q`, cached on disk.

    The cache lives in `cache/<name>.json` and stores the query text next to
    its bindings.  A cached result is only reused when the stored query is
    identical to `q`; otherwise the query is run and the file is rewritten.
    """
    filename = f'cache/{name}.json'
    if os.path.exists(filename):
        # `with` guarantees the file handle is closed after reading.
        with open(filename) as cache_file:
            from_cache = json.load(cache_file)
        if isinstance(from_cache, dict) and from_cache.get('query') == q:
            return from_cache['bindings']

    r = run_wikidata_query(q)
    bindings = r.json()['results']['bindings']
    # Closing the file explicitly ensures the cache is flushed to disk.
    with open(filename, 'w') as cache_file:
        json.dump({'query': q, 'bindings': bindings}, cache_file, indent=2)

    return bindings
|
|
|
|
def get_row_value(row, field):
    """Return `row[field]['value']`, or None when the row lacks `field`."""
    if field not in row:
        return None
    return row[field]['value']
|
|
|
|
@app.route("/property/P<int:property_id>")
def property_query_page(property_id):
    """Render the list of values used with one property.

    Rows come from `property_query` (cached per property).  Rows that have
    no English label get a label from `get_labels` when one is available.
    With `?sort=name` the rows are ordered by label using the current
    locale, and rows still lacking a label go last; otherwise the order of
    the query result is kept.

    Responds with 404 when the property is not in `find_more_props`.
    """
    # Local import: abort is not among the flask names imported at the top of
    # this file.
    from flask import abort

    pid = f'P{property_id}'
    # Check the property before doing any work; previously an unknown
    # property ran a query and wrote a cache file before raising KeyError.
    if pid not in find_more_props:
        abort(404)
    label = find_more_props[pid]

    sort = request.args.get('sort')
    sort_by_name = bool(sort) and sort.lower().strip() == 'name'

    q = property_query.replace('PID', pid)
    rows = run_query_with_cache(q, name=pid)

    def unlabelled_qid(row):
        """Return the id at the end of the row's object URI if the row has
        no label and the value contains a slash, else None."""
        item = row['object']['value']
        if 'objectLabel' in row or '/' not in item:
            return None
        return item.rpartition('/')[2]

    no_label_qid = [qid for qid in map(unlabelled_qid, rows)
                    if qid is not None]

    if no_label_qid:
        extra_label = get_labels(no_label_qid, name=f'{pid}_extra_labels')
        for row in rows:
            qid = unlabelled_qid(row)
            if qid is not None and extra_label.get(qid):
                row['objectLabel'] = {'value': extra_label[qid]}

    if sort_by_name:
        # put rows with no label at the end
        no_label = [row for row in rows if 'objectLabel' not in row]
        has_label = sorted((row for row in rows if 'objectLabel' in row),
                           key=lambda row: locale.strxfrm(row['objectLabel']['value']))
        rows = has_label + no_label

    return render_template('property.html',
                           label=label,
                           order=('name' if sort_by_name else 'count'),
                           pid=pid,
                           rows=rows)
|
|
|
|
@app.route("/item/Q<int:item_id>")
def item_page(item_id):
    """Render the item template for the item with the given numeric id."""
    return render_template('item.html', qid=f'Q{item_id}')
|
|
|
|
def get_entity_label(entity):
    """Pick a display label for an entity dict.

    The English label wins when present.  Otherwise, if every available
    label has the same text, that text is returned.  Returns None when
    there are no labels or the labels differ.
    """
    labels = entity['labels']
    if 'en' not in labels:
        distinct = {entry['value'] for entry in labels.values()}
        if len(distinct) != 1:
            return None
        return distinct.pop()
    return labels['en']['value']
|
|
|
|
def get_labels(keys, name=None):
    """Return a dict mapping entity id to display label for `keys`.

    `keys` may be any iterable of ids; they are sorted by the number after
    the first character.  Entities are fetched 50 at a time and cached in
    `cache/<name>_labels.json`, where `name` defaults to the sorted ids
    joined with underscores.  The cache is only reused when it was written
    for exactly the same ids.  Each label is chosen by `get_entity_label`,
    so a value may be None.
    """
    keys = sorted(keys, key=lambda i: int(i[1:]))
    if name is None:
        name = '_'.join(keys)
    filename = f'cache/{name}_labels.json'

    labels = []
    if os.path.exists(filename):
        # `with` guarantees the file handle is closed after reading.
        with open(filename) as cache_file:
            from_cache = json.load(cache_file)
        if isinstance(from_cache, dict) and from_cache.get('keys') == keys:
            labels = from_cache['labels']

    if not labels:
        for cur in chunk(keys, 50):
            labels += get_entities(cur, props='labels')

        # Closing the file explicitly ensures the cache is flushed to disk.
        with open(filename, 'w') as cache_file:
            json.dump({'keys': keys, 'labels': labels}, cache_file, indent=2)

    try:
        return {entity['id']: get_entity_label(entity) for entity in labels}
    except TypeError:
        # Dump the offending data to aid debugging, then re-raise.
        pprint(labels)
        raise
|
|
|
|
|
|
def get_entity_with_cache(qid):
    """Return the entity for `qid`, cached in `cache/<qid>.json`.

    On a cache miss the entity is fetched with `get_entity` and whatever it
    returns (including None) is written to the cache file.
    """
    filename = f'cache/{qid}.json'
    if os.path.exists(filename):
        # `with` guarantees the file handle is closed after reading.
        with open(filename) as cache_file:
            return json.load(cache_file)

    entity = get_entity(qid)
    # Closing the file explicitly ensures the cache is flushed to disk.
    with open(filename, 'w') as cache_file:
        json.dump(entity, cache_file, indent=2)

    return entity
|
|
|
|
def commons_uri_to_filename(uri):
    """Strip the `commons_start` prefix from `uri` and URL-decode the rest."""
    quoted_name = drop_start(uri, commons_start)
    return urllib.parse.unquote(quoted_name)
|
|
|
|
def image_detail(filenames, thumbheight=None, thumbwidth=None):
    """Look up image information for one or more files.

    `filenames` may be a single name or a list.  Sends an `imageinfo` query
    (with `iiurlheight` / `iiurlwidth` when the thumbnail arguments are
    given) and returns a dict mapping each returned page title, minus its
    `File:` prefix, to the first `imageinfo` entry.  An empty list yields an
    empty dict without any request.
    """
    if not isinstance(filenames, list):
        filenames = [filenames]
    if not filenames:
        return {}

    params = {
        'action': 'query',
        'titles': '|'.join(f'File:{f}' for f in filenames),
        'prop': 'imageinfo',
        'iiprop': 'url',
    }
    # Thumbnail limits are only sent when the caller supplied them.
    optional = (('iiurlheight', thumbheight), ('iiurlwidth', thumbwidth))
    params.update((key, value) for key, value in optional
                  if value is not None)

    r = api_call(params, api_url=commons_url)

    images = {}
    for page in r.json()['query']['pages']:
        images[drop_start(page['title'], 'File:')] = page['imageinfo'][0]
    return images
|
|
|
|
def image_detail_old(filenames, thumbwidth=None):
    """Look up image details through the XML API at `commons_api_url`.

    `filenames` may be a single name or a list.  Returns a list of dicts
    with the keys `name`, `image`, `height` and `width`, plus `thumbnail`
    when the reply contains a thumbnail URL.
    """
    if not isinstance(filenames, list):
        filenames = [filenames]
    params = {'image': '|'.join(filenames)}
    if thumbwidth is not None:
        params['thumbwidth'] = thumbwidth
    r = requests.get(commons_api_url, params=params)
    xml = r.text

    # workaround a bug in the commons API
    # the API doesn't encode " in filenames
    for f in filenames:
        if '"' not in f:
            continue
        # A raw double quote inside name="..." makes the XML unparseable, so
        # substitute the entity.  The replacement used to be '"' itself,
        # which left the document unchanged.
        esc = f.replace('"', '&quot;')
        xml = xml.replace(f'name="{f}"', f'name="{esc}"')

    root = lxml.etree.fromstring(xml.encode('utf-8'))

    images = []
    for element in root:
        # The file data is either nested in an <image> element or the
        # element itself is a <file>; anything else is skipped.
        if element.tag == 'image':
            file_element = element.find('./file')
        elif element.tag == 'file':
            file_element = element
        else:
            continue
        thumb_element = file_element.find('./urls/thumbnail')

        detail = {
            'name': element.get('name'),
            'image': file_element.find('./urls/file').text,
            'height': int(file_element.find('./height').text),
            'width': int(file_element.find('./width').text),
        }

        if thumb_element is not None:
            detail['thumbnail'] = thumb_element.text

        images.append(detail)

    return images
|
|
|
|
# def commons_filename(row):
|
|
# image = row['image']['value']
|
|
# assert image.startswith(commons_start)
|
|
# return urllib.parse.unquote(image[len(commons_start):])
|
|
#
|
|
# def commons_api(row):
|
|
# params = {
|
|
# 'image': commons_filename(row),
|
|
# 'thumbwidth': thumbwidth,
|
|
# }
|
|
# r = requests.get(commons_api_url, params=params)
|
|
# return r
|
|
#
|
|
# def get_commons(row):
|
|
# r = commons_api(row)
|
|
# root = lxml.etree.fromstring(r.content)
|
|
#
|
|
# return root.find('./file/urls/thumbnail').text
|
|
|
|
@app.route("/next/Q<int:item_id>")
def next_page(item_id):
    """Render the page for one item with its image and related entities.

    Loads the entity (cached), looks up an 800 pixel wide thumbnail for its
    first P18 image (cached), and collects the labels of every entity
    referenced by the item's claims for the properties in
    `find_more_props`.

    Responds with 404 when the entity does not exist or has no P18 claim.
    """
    # Local import: abort is not among the flask names imported at the top of
    # this file.
    from flask import abort

    qid = f'Q{item_id}'

    entity = get_entity_with_cache(qid)
    # A missing entity or an item without an image used to raise
    # TypeError / KeyError below; answer with a 404 instead.
    if entity is None or 'P18' not in entity.get('claims', {}):
        abort(404)

    width = 800
    image_filename = entity['claims']['P18'][0]['mainsnak']['datavalue']['value']
    filename = f'cache/{qid}_{width}_image.json'
    if os.path.exists(filename):
        with open(filename) as cache_file:
            detail = json.load(cache_file)
    else:
        detail = image_detail([image_filename], thumbwidth=width)
        with open(filename, 'w') as cache_file:
            json.dump(detail, cache_file, indent=2)

    other_items = set()
    for key in find_more_props:
        for claim in entity['claims'].get(key, []):
            # Snaks of type "somevalue" / "novalue" carry no datavalue.
            datavalue = claim['mainsnak'].get('datavalue')
            if datavalue is None:
                continue
            other_items.add(datavalue['value']['id'])

    item_labels = get_labels(other_items)

    # Same label choice as everywhere else in this file, with a placeholder
    # when no usable label exists.
    label = get_entity_label(entity) or 'title missing'

    return render_template('next.html',
                           qid=qid,
                           label=label,
                           image=detail[image_filename],
                           labels=find_more_props,
                           other=item_labels,
                           entity=entity)
|
|
|
|
@app.route('/P<int:property_id>/Q<int:item_id>')
def find_more_page(property_id, item_id):
    """Redirect `/P<n>/Q<m>` to the browse page with `?P<n>=Q<m>`."""
    pid = f'P{property_id}'
    qid = f'Q{item_id}'
    target = url_for('browse_page') + f'?{pid}={qid}'
    return redirect(target)
|
|
|
|
def get_facets(sparql_params, params):
    """Return the most frequent values of each property not yet selected.

    Runs `facet_query` (cached) for the properties in `find_more_props`
    that are not present in the request arguments.  The result maps a
    property id to at most 15 dicts with `qid`, `label` and `count`, sorted
    by descending count; properties without any value are left out.
    """
    flat = '_'.join(f'{pid}={qid}' for pid, qid in params)

    remaining = [pid for pid in find_more_props if pid not in request.args]
    property_list = ' '.join(f'wdt:{pid}' for pid in remaining)

    q = facet_query.replace('PARAMS', sparql_params)
    q = q.replace('PROPERTY_LIST', property_list)

    # open(f'cache/{flat}_facets_query.sparql', 'w').write(q)

    bindings = run_query_with_cache(q, flat + '_facets')

    facets = {key: [] for key in find_more_props}
    for row in bindings:
        # Property and object arrive as URIs; keep only the trailing id.
        pid = row['property']['value'].rpartition('/')[2]
        entry = {
            'qid': row['object']['value'].rpartition('/')[2],
            'label': row['objectLabel']['value'],
            'count': int(row['count']['value']),
        }
        facets[pid].append(entry)

    top = {}
    for key, values in facets.items():
        if not values:
            continue
        values.sort(key=lambda i: i['count'], reverse=True)
        top[key] = values[:15]
    return top
|
|
|
|
def _is_entity_id(value, prefix):
    """Return True when `value` is `prefix` followed only by ASCII digits."""
    digits = value[len(prefix):]
    return (value.startswith(prefix) and digits != ''
            and all(char in '0123456789' for char in digits))


def _load_or_build_json(filename, build):
    """Return the JSON stored in `filename`; on a miss call `build()`, store
    its result in the file and return it."""
    if os.path.exists(filename):
        with open(filename) as cache_file:
            return json.load(cache_file)
    data = build()
    with open(filename, 'w') as cache_file:
        json.dump(data, cache_file, indent=2)
    return data


def _format_row_date(row):
    """Return the display date for a result row, or None if it has no time.

    Precision 9 gives the year, 8 the year with an "s" suffix, 7 a century
    and 6 a millennium; any other precision returns the raw time value.
    """
    if 'time' not in row:
        return None

    t = dateutil.parser.parse(row['time']['value'])
    precision = int(row['timeprecision']['value'])

    if precision == 9:
        return t.year
    if precision == 8:
        return f'{t.year}s'
    if precision == 7:
        return f'{ordinal((t.year // 100) + 1)} century'
    if precision == 6:
        return f'{ordinal((t.year // 1000) + 1)} millennium'
    return row['time']['value']


@app.route('/browse')
def browse_page():
    """Render the items matching the property/value pairs in the query string.

    Every `P<digits>=Q<digits>` request argument becomes a filter; other
    arguments are ignored.  Query results and image details are cached on
    disk under names derived from the filters.  At most 45 items are shown,
    and items with more than one image are skipped.
    """
    # The ids end up inside a SPARQL query and inside cache file names, so
    # accept only well-formed ids rather than anything starting with P / Q.
    params = [(pid, qid) for pid, qid in request.args.items()
              if _is_entity_id(pid, 'P') and _is_entity_id(qid, 'Q')]

    flat = '_'.join(f'{pid}={qid}' for pid, qid in params)

    item_labels = get_labels(qid for pid, qid in params)

    sparql_params = ''.join(
        f'?item wdt:{pid} wd:{qid} .\n' for pid, qid in params)

    query = find_more_query.replace('PARAMS', sparql_params)

    bindings = _load_or_build_json(
        f'cache/{flat}.json',
        lambda: run_wikidata_query(query).json()['results']['bindings'])

    facets = get_facets(sparql_params, params)

    page_size = 45

    item_map = {}
    for row in bindings:
        item_id = row_id(row)
        row_qid = f'Q{item_id}'
        label = row['itemLabel']['value']
        image_filename = commons_uri_to_filename(row['image']['value'])
        if item_id in item_map:
            # Further rows for a known item only contribute another image.
            item_map[item_id]['image_filename'].append(image_filename)
            continue

        if label == row_qid:
            # A label equal to the QID is treated as missing.
            label = row['title']['value'] if 'title' in row else 'name missing'

        if 'artistLabel' in row:
            artist_name = row['artistLabel']['value']
        else:
            artist_name = '[artist unknown]'

        item_map[item_id] = {
            'url': url_for('next_page', item_id=item_id),
            'image_filename': [image_filename],
            'item_id': item_id,
            'qid': row_qid,
            'label': label,
            'date': _format_row_date(row),
            'artist_name': artist_name,
        }

    items = []
    for item in item_map.values():
        if len(item['image_filename']) != 1:
            continue
        item['image_filename'] = item['image_filename'][0]
        items.append(item)
        if len(items) >= page_size:
            break

    filenames = [cur['image_filename'] for cur in items]

    detail = _load_or_build_json(
        f'cache/{flat}_{page_size}_images.json',
        lambda: image_detail(filenames, thumbwidth=thumbwidth))

    for item in items:
        item['image'] = detail[item['image_filename']]

    total = len(bindings)

    # Fall back to the id when an entity has no usable label, so the join
    # cannot fail on None or on a missing key.
    title = ' / '.join(item_labels.get(qid) or qid for pid, qid in params)

    return render_template('find_more.html',
                           facets=facets,
                           prop_labels=find_more_props,
                           label=title,
                           labels=find_more_props,
                           bindings=bindings,
                           items=items,
                           total=total)
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point.
    # NOTE(review): this enables debug mode while listening on every network
    # interface -- confirm this is never used outside a trusted network.
    app.debug = True
    app.run(host='0.0.0.0', debug=True)

    # Disabled alternative runner, kept for reference:
    # server = Server(app.wsgi_app)
    # server.watch('template/*')
    # server.serve()
|