#!/usr/bin/python3
"""Flask app for browsing Wikidata items that have an image but no P180 value.

Every SPARQL query below selects items that are ``wdt:P31 wd:Q3305213``,
have a ``wdt:P18`` image and no ``wdt:P180`` (``?depicts``) statement.
Query results, entities, labels and image details are cached as JSON files
under the relative ``cache/`` directory, which must already exist.
"""

import json
import os
import re
import urllib.parse
from itertools import islice
from pprint import pprint

import dateutil.parser
import lxml.etree
import requests
from flask import Flask, render_template, url_for, redirect, request

url_start = 'http://www.wikidata.org/entity/Q'
wikidata_url = 'https://www.wikidata.org/w/api.php'
# NOTE(review): this is the Wikidata API endpoint, not commons_query_url
# below -- looks unintentional, confirm before changing.
commons_url = 'https://www.wikidata.org/w/api.php'
wikidata_query_api_url = 'https://query.wikidata.org/bigdata/namespace/wdq/sparql'
commons_start = 'http://commons.wikimedia.org/wiki/Special:FilePath/'
commons_api_url = 'https://tools.wmflabs.org/magnus-toolserver/commonsapi.php'
commons_query_url = 'https://commons.wikimedia.org/w/api.php'
thumbwidth = 300
thumbheight = 400

app = Flask(__name__)

# Only well-formed entity IDs may be interpolated into SPARQL and file names.
re_pid = re.compile(r'P\d+')
re_qid = re.compile(r'Q\d+')

# Properties offered as browse facets, mapped to their display labels.
find_more_props = {
    'P135': 'movement',
    'P136': 'genre',
    'P170': 'artist',
    'P195': 'collection',
    'P276': 'location',
    'P495': 'country of origin',
    'P127': 'owned by',
    'P179': 'part of the series',
    'P921': 'main subject',
    'P186': 'material used',
    'P88': 'commissioned by',
    'P1028': 'donated by',
    'P1071': 'location of final assembly',
    'P138': 'named after',
    'P1433': 'published in',
    'P144': 'based on',
    'P2079': 'fabrication method',
    'P2348': 'time period',
    'P361': 'part of',
    'P608': 'exhibition history',

    # possible future props
    # 'P571': 'inception',
    # 'P166': 'award received', (only 2)
    # 'P1419': 'shape', (only 2)
    # 'P123': 'publisher', (only 1)
}

# PARAMS is replaced with one "?item wdt:Pxx wd:Qyy ." line per active filter.
find_more_query = '''
select ?item ?itemLabel ?image ?artist ?artistLabel ?title ?time ?timeprecision {
  SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
  ?item wdt:P31 wd:Q3305213 .
  PARAMS
  ?item wdt:P18 ?image .
  OPTIONAL {
    ?item p:P571/psv:P571 ?timenode .
    ?timenode wikibase:timeValue ?time.
    ?timenode wikibase:timePrecision ?timeprecision.
  }
  OPTIONAL { ?item wdt:P1476 ?title }
  OPTIONAL { ?item wdt:P170 ?artist }
  FILTER NOT EXISTS { ?item wdt:P180 ?depicts }
}
'''

# PARAMS as above; PROPERTY_LIST is replaced with the wdt: properties to count.
facet_query = '''
select ?property ?object ?objectLabel (count(*) as ?count) {
  SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
  ?item wdt:P31 wd:Q3305213 .
  ?item wdt:P18 ?image .
  PARAMS
  values ?property { PROPERTY_LIST }
  ?item ?property ?object .
  FILTER NOT EXISTS { ?item wdt:P180 ?depicts }
} group by ?property ?propertyLabel ?object ?objectLabel
'''

# PID is replaced with the property ID whose values are being counted.
property_query = '''
select ?object ?objectLabel ?objectDescription (count(*) as ?count) {
  ?item wdt:P31 wd:Q3305213 .
  ?item wdt:P18 ?image .
  ?item wdt:PID ?object .
  filter not exists { ?item wdt:P180 ?depicts }
  optional {
    ?object rdfs:label ?objectLabel.
    FILTER(LANG(?objectLabel) = "en").
  }
  optional {
    ?object schema:description ?objectDescription .
    filter(lang(?objectDescription) = "en")
  }
} group by ?object ?objectLabel ?objectDescription
order by desc(?count)
'''


def ordinal(n):
    """Return *n* followed by its English ordinal suffix (1st, 2nd, 11th...)."""
    # Integer division is required: with "/" the tens digit of 11-13 becomes
    # a float such as 1.1, the teens test fails and the result is "11st".
    if n // 10 % 10 == 1:
        suffix = 'th'
    else:
        suffix = {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th')
    return f'{n}{suffix}'


def chunk(it, size):
    """Yield successive tuples of at most *size* items taken from *it*."""
    it = iter(it)
    # The two-argument form of iter() stops at the first empty tuple.
    return iter(lambda: tuple(islice(it, size)), ())


def _load_json(filename):
    """Return the parsed JSON content of *filename*."""
    with open(filename) as f:
        return json.load(f)


def _save_json(data, filename):
    """Write *data* to *filename* as indented JSON."""
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)


def _save_text(text, filename):
    """Write *text* to *filename*, replacing any existing content."""
    with open(filename, 'w') as f:
        f.write(text)


def run_wikidata_query(query):
    """POST *query* to the Wikidata SPARQL endpoint and return the response.

    Raises requests.HTTPError when the endpoint answers with an error status.
    """
    params = {'query': query, 'format': 'json'}
    r = requests.post(wikidata_query_api_url, data=params, stream=True)
    # Explicit check instead of assert, which is stripped under "python -O".
    r.raise_for_status()
    return r


def drop_start(s, start):
    """Return *s* without its leading *start*.

    Raises ValueError when *s* does not begin with *start*.
    """
    if not s.startswith(start):
        raise ValueError(f'{s!r} does not start with {start!r}')
    return s[len(start):]


def row_id(row):
    """Return the numeric part of the item ID in a query result row."""
    return int(drop_start(row['item']['value'], url_start))


def api_call(params, api_url=wikidata_url):
    """GET *api_url* with *params* plus JSON format defaults.

    Returns the requests response object without checking its status.
    """
    call_params = {
        'format': 'json',
        'formatversion': 2,
        **params,
    }
    # Use the caller's endpoint; this used to be hard-wired to wikidata_url.
    return requests.get(api_url, params=call_params)


def get_entity(qid):
    """Fetch one entity via wbgetentities.

    Returns None when the reply has no 'entities' key or the entity is
    flagged 'missing'.
    """
    json_data = api_call({'action': 'wbgetentities', 'ids': qid}).json()

    try:
        entity = list(json_data['entities'].values())[0]
    except KeyError:
        return None
    if 'missing' in entity:
        return None
    return entity


def get_entities(ids, **params):
    """Fetch several entities in a single wbgetentities call.

    Extra keyword arguments are passed through as API parameters.
    Returns a list of entity dicts, or [] when *ids* is empty.
    """
    if not ids:
        return []
    params = {
        'action': 'wbgetentities',
        'ids': '|'.join(ids),
        **params,
    }
    json_data = api_call(params).json()
    return list(json_data['entities'].values())


@app.route("/")
def index():
    """Render the front page with the list of browsable properties."""
    return render_template('index.html', props=find_more_props)


def run_query_with_cache(q, name):
    """Return the result bindings for SPARQL query *q*.

    Results are read from cache/<name>.json when that file exists; otherwise
    the query is run and its bindings are written there.
    """
    filename = f'cache/{name}.json'
    if os.path.exists(filename):
        return _load_json(filename)

    r = run_wikidata_query(q)
    bindings = r.json()['results']['bindings']
    _save_json(bindings, filename)
    return bindings


# The <int:...> converter supplies the view argument; without it Flask calls
# the view with no arguments and the request fails.
@app.route("/property/P<int:property_id>")
def property_query_page(property_id):
    """Show the values used for one property together with their counts."""
    pid = f'P{property_id}'

    q = property_query.replace('PID', pid)
    _save_text(q, f'cache/{pid}_query.sparql')
    rows = run_query_with_cache(q, name=pid)

    label = find_more_props[pid]

    return render_template('property.html',
                           label=label,
                           pid=pid,
                           rows=rows)


@app.route("/item/Q<int:item_id>")
def item_page(item_id):
    """Render the item template for the given numeric item ID."""
    qid = f'Q{item_id}'
    return render_template('item.html', qid=qid)


def get_en_label(entity):
    """Return the English label of *entity*, or None when it has none."""
    if 'en' in entity['labels']:
        return entity['labels']['en']['value']
    return None


def get_labels(keys, name=None):
    """Return a dict mapping each entity ID in *keys* to its English label.

    Entities are fetched in batches of 50 and cached in
    cache/<name>_labels.json; *name* defaults to the sorted IDs joined by '_'.
    Entities without an English label map to '[ label missing ]'.
    """
    keys = sorted(keys, key=lambda i: int(i[1:]))
    if name is None:
        name = '_'.join(keys)
    filename = f'cache/{name}_labels.json'
    if os.path.exists(filename):
        labels = _load_json(filename)
    else:
        labels = []
        for cur in chunk(keys, 50):
            labels += get_entities(cur, props='labels')
        _save_json(labels, filename)

    try:
        return {prop['id']: get_en_label(prop) or '[ label missing ]'
                for prop in labels}
    except TypeError:
        # Dump the unexpected payload to aid debugging, then re-raise.
        pprint(labels)
        raise


def get_entity_with_cache(qid):
    """Return the entity for *qid*, using cache/<qid>.json when present."""
    filename = f'cache/{qid}.json'
    if os.path.exists(filename):
        return _load_json(filename)

    entity = get_entity(qid)
    _save_json(entity, filename)
    return entity


def commons_uri_to_filename(uri):
    """Return the URL-decoded file name from a Special:FilePath URI."""
    return urllib.parse.unquote(drop_start(uri, commons_start))


def image_detail(filenames, thumbheight=None, thumbwidth=None):
    """Return imageinfo for one or more files, keyed by file name.

    *filenames* may be a single name or a list. The optional thumbnail
    dimensions are sent as iiurlheight / iiurlwidth.
    """
    if not isinstance(filenames, list):
        filenames = [filenames]
    if not filenames:
        return {}

    params = {
        'action': 'query',
        'titles': '|'.join(f'File:{f}' for f in filenames),
        'prop': 'imageinfo',
        'iiprop': 'url',
    }
    if thumbheight is not None:
        params['iiurlheight'] = thumbheight
    if thumbwidth is not None:
        params['iiurlwidth'] = thumbwidth
    r = api_call(params, api_url=commons_url)

    images = {}
    for image in r.json()['query']['pages']:
        filename = drop_start(image['title'], 'File:')
        images[filename] = image['imageinfo'][0]

    return images


def image_detail_old(filenames, thumbwidth=None):
    """Return image details parsed from the XML reply of commons_api_url.

    Each entry of the returned list has 'name', 'image', 'height' and
    'width', plus 'thumbnail' when the reply contains a thumbnail URL.
    """
    if not isinstance(filenames, list):
        filenames = [filenames]

    params = {'image': '|'.join(filenames)}
    if thumbwidth is not None:
        params['thumbwidth'] = thumbwidth
    r = requests.get(commons_api_url, params=params)
    xml = r.text

    # workaround a bug in the commons API
    # the API doesn't encode " in filenames
    for f in filenames:
        if '"' not in f:
            continue
        # Replace the raw quote with its entity so the attribute is valid XML.
        esc = f.replace('"', '&quot;')
        xml = xml.replace(f'name="{f}"', f'name="{esc}"')

    root = lxml.etree.fromstring(xml.encode('utf-8'))

    images = []
    for element in root:
        if element.tag == 'image':
            file_element = element.find('./file')
        elif element.tag == 'file':
            file_element = element
        else:
            continue
        thumb_element = file_element.find('./urls/thumbnail')

        detail = {
            'name': element.get('name'),
            'image': file_element.find('./urls/file').text,
            'height': int(file_element.find('./height').text),
            'width': int(file_element.find('./width').text),
        }

        if thumb_element is not None:
            detail['thumbnail'] = thumb_element.text

        images.append(detail)

    return images

# def commons_filename(row):
#     image = row['image']['value']
#     assert image.startswith(commons_start)
#     return urllib.parse.unquote(image[len(commons_start):])
#
# def commons_api(row):
#     params = {
#         'image': commons_filename(row),
#         'thumbwidth': thumbwidth,
#     }
#     r = requests.get(commons_api_url, params=params)
#     return r
#
# def get_commons(row):
#     r = commons_api(row)
#     root = lxml.etree.fromstring(r.content)
#
#     return root.find('./file/urls/thumbnail').text


@app.route("/next/Q<int:item_id>")
def next_page(item_id):
    """Show one item with its image and links to related property values."""
    qid = f'Q{item_id}'

    entity = get_entity_with_cache(qid)

    width = 800
    image_filename = entity['claims']['P18'][0]['mainsnak']['datavalue']['value']
    filename = f'cache/{qid}_{width}_image.json'
    if os.path.exists(filename):
        detail = _load_json(filename)
    else:
        detail = image_detail([image_filename], thumbwidth=width)
        _save_json(detail, filename)

    other_items = set()
    for key in find_more_props:
        for claim in entity['claims'].get(key, []):
            mainsnak = claim['mainsnak']
            # Snaks of type somevalue / novalue (e.g. an unknown artist)
            # carry no 'datavalue', so there is nothing to link to.
            if 'datavalue' not in mainsnak:
                continue
            other_items.add(mainsnak['datavalue']['value']['id'])

    item_labels = get_labels(other_items)

    if 'en' in entity['labels']:
        label = entity['labels']['en']['value']
    elif len(entity['labels']) == 1:
        label = list(entity['labels'].values())[0]['value']
    else:
        label = 'title missing'

    return render_template('next.html',
                           qid=qid,
                           label=label,
                           image=detail[image_filename],
                           labels=find_more_props,
                           other=item_labels,
                           entity=entity)


@app.route('/P<int:property_id>/Q<int:item_id>')
def find_more_page(property_id, item_id):
    """Redirect /P<n>/Q<n> to the equivalent /browse query string."""
    pid, qid = f'P{property_id}', f'Q{item_id}'
    return redirect(url_for('browse_page') + f'?{pid}={qid}')


def get_facets(sparql_params, params):
    """Return the 15 most frequent values for each facet property.

    *sparql_params* is the SPARQL fragment for the active filters and
    *params* the matching (pid, qid) pairs, used to name the cache files.
    Properties already present in the request arguments are not counted,
    and properties with no rows are left out of the result.
    """
    flat = '_'.join(f'{pid}={qid}' for pid, qid in params)

    property_list = ' '.join(f'wdt:{pid}' for pid in find_more_props
                             if pid not in request.args)

    q = (facet_query.replace('PARAMS', sparql_params)
                    .replace('PROPERTY_LIST', property_list))

    _save_text(q, f'cache/{flat}_facets_query.sparql')

    bindings = run_query_with_cache(q, flat + '_facets')

    facets = {key: [] for key in find_more_props}
    for row in bindings:
        pid = row['property']['value'].rpartition('/')[2]
        qid = row['object']['value'].rpartition('/')[2]
        label = row['objectLabel']['value']
        count = int(row['count']['value'])

        facets[pid].append({'qid': qid, 'label': label, 'count': count})

    return {
        key: sorted(values, key=lambda i: i['count'], reverse=True)[:15]
        for key, values in facets.items()
        if values
    }


def _format_date(row):
    """Return a display date for a result row, or None without a 'time'.

    Precision 9 gives the year, 8 the year followed by 's', 7 an ordinal
    century and 6 an ordinal millennium; any other precision returns the
    raw time string unchanged.
    """
    if 'time' not in row:
        return None

    t = dateutil.parser.parse(row['time']['value'])
    precision = int(row['timeprecision']['value'])

    if precision == 9:
        return t.year
    if precision == 8:
        return f'{t.year}s'
    if precision == 7:
        return f'{ordinal((t.year // 100) + 1)} century'
    if precision == 6:
        return f'{ordinal((t.year // 1000) + 1)} millennium'
    return row['time']['value']


@app.route('/browse')
def browse_page():
    """List items matching every Pxx=Qyy filter given in the query string."""
    # Request keys and values end up in SPARQL text and cache file names, so
    # accept nothing but plain P<digits> / Q<digits> identifiers.
    params = [(pid, qid) for pid, qid in request.args.items()
              if re_pid.fullmatch(pid) and re_qid.fullmatch(qid)]

    flat = '_'.join(f'{pid}={qid}' for pid, qid in params)

    # item_entity = get_entity_with_cache(qid)

    item_labels = get_labels(qid for pid, qid in params)

    # property_keys = item_entity['claims'].keys()
    # property_labels = get_labels(property_keys, name=f'{flat}_property_labels')

    sparql_params = ''.join(
        f'?item wdt:{pid} wd:{qid} .\n' for pid, qid in params)

    query = find_more_query.replace('PARAMS', sparql_params)

    bindings = run_query_with_cache(query, flat)

    facets = get_facets(sparql_params, params)

    page_size = 45

    item_map = {}
    for row in bindings:
        item_id = row_id(row)
        row_qid = f'Q{item_id}'
        label = row['itemLabel']['value']
        image_filename = commons_uri_to_filename(row['image']['value'])
        if item_id in item_map:
            # A repeated row for a known item only adds another image.
            item_map[item_id]['image_filename'].append(image_filename)
            continue

        # The label service echoes the QID when the item has no label.
        if label == row_qid:
            label = row['title']['value'] if 'title' in row else 'name missing'

        if 'artistLabel' in row:
            artist_name = row['artistLabel']['value']
        else:
            artist_name = '[artist unknown]'

        item_map[item_id] = {
            'url': url_for('next_page', item_id=item_id),
            'image_filename': [image_filename],
            'item_id': item_id,
            'qid': row_qid,
            'label': label,
            'date': _format_date(row),
            'artist_name': artist_name,
        }

    items = []
    for item in item_map.values():
        # Only items with exactly one image are shown.
        if len(item['image_filename']) != 1:
            continue
        item['image_filename'] = item['image_filename'][0]
        items.append(item)
        if len(items) >= page_size:
            break

    filenames = [cur['image_filename'] for cur in items]

    filename = f'cache/{flat}_{page_size}_images.json'
    if os.path.exists(filename):
        detail = _load_json(filename)
    else:
        detail = image_detail(filenames, thumbwidth=thumbwidth)
        _save_json(detail, filename)

    for item in items:
        item['image'] = detail[item['image_filename']]

    total = len(bindings)

    title = ' / '.join(item_labels[qid] for pid, qid in params)

    return render_template('find_more.html',
                           # qid=qid,
                           # pid=pid,
                           # item_entity=item_entity,
                           # property_labels=property_labels,
                           facets=facets,
                           prop_labels=find_more_props,
                           label=title,
                           labels=find_more_props,
                           bindings=bindings,
                           items=items,
                           total=total)


if __name__ == "__main__":
    # NOTE(review): debug mode bound to 0.0.0.0 exposes the interactive
    # debugger to the network -- development use only.
    app.debug = True
    app.run(host='0.0.0.0', debug=True)

    # server = Server(app.wsgi_app)
    # server.watch('template/*')
    # server.serve()