#!/usr/bin/python3
"""Flask views for the conference archive.

Provides pages for people, events, conferences, countries and venues, a tool
for merging duplicate people, and a page suggesting Wikidata matches for
frequent speakers.
"""

import hashlib
import json
import os
import time
import typing
from typing import cast

import flask
import requests
import sqlalchemy
from sqlalchemy import func, update
from werkzeug.wrappers import Response

from confarchive import database, model

app = flask.Flask(__name__)
# NOTE(review): debug mode is enabled unconditionally and the development
# server below binds to 0.0.0.0, which exposes the interactive debugger to the
# network. Left unchanged here; confirm this never runs on an untrusted network.
app.debug = True
app.config.from_object("config.default")
database.init_app(app)

wikidata_api = "https://www.wikidata.org/w/api.php"

# Seconds to wait for the Wikidata API before giving up on a request.
WIKIDATA_TIMEOUT = 30


def md5sum(s: str) -> str:
    """Return the hexadecimal MD5 digest of *s* encoded as UTF-8."""
    return hashlib.md5(s.encode("utf-8")).hexdigest()


def _load_json(filename: str) -> typing.Any:
    """Read and parse a JSON file, closing the file handle afterwards."""
    with open(filename, encoding="utf-8") as f:
        return json.load(f)


def wikidata_search(q: str) -> list[dict[str, typing.Any]]:
    """Search Wikidata for humans matching *q* and return the search hits.

    The query is restricted with ``haswbstatement:P31=Q5``. Responses are
    cached on disk under ``cache/``, keyed by the MD5 of the full query.

    Raises:
        requests.RequestException: the HTTP request failed or timed out.
        KeyError: the API response did not contain search results.
    """
    q += " haswbstatement:P31=Q5"
    cache_filename = os.path.join("cache", md5sum(q) + ".json")

    if os.path.exists(cache_filename):
        data = _load_json(cache_filename)
        return cast(list[dict[str, typing.Any]], data["query"]["search"])

    params: dict[str, str | int] = {
        "action": "query",
        "list": "search",
        "format": "json",
        "formatversion": 2,
        "srsearch": q,
        "srlimit": "max",
    }
    r = requests.get(wikidata_api, params=params, timeout=WIKIDATA_TIMEOUT)
    r.raise_for_status()
    # Validate the response before caching it, so an error body is never
    # written to disk and replayed on every later call.
    hits = r.json()["query"]["search"]

    os.makedirs(os.path.dirname(cache_filename), exist_ok=True)
    with open(cache_filename, "w", encoding="utf-8") as f:
        f.write(r.text)

    time.sleep(1)  # rate-limit real API requests
    return cast(list[dict[str, typing.Any]], hits)


def wikidata_get_item(qid: str) -> typing.Any:
    """Return the Wikidata entity *qid*, using the ``items/`` disk cache.

    Raises:
        requests.RequestException: the HTTP request failed or timed out.
        KeyError: the API response did not contain the requested entity.
    """
    cache_filename = os.path.join("items", qid + ".json")

    if os.path.exists(cache_filename):
        return _load_json(cache_filename)

    params: dict[str, str | int] = {
        "action": "wbgetentities",
        "ids": qid,
        "format": "json",
        "formatversion": 2,
    }
    r = requests.get(wikidata_api, params=params, timeout=WIKIDATA_TIMEOUT)
    r.raise_for_status()
    item = r.json()["entities"][qid]

    os.makedirs(os.path.dirname(cache_filename), exist_ok=True)
    with open(cache_filename, "w", encoding="utf-8") as f:
        json.dump(item, f, indent=2)

    time.sleep(0.1)  # rate-limit real API requests
    return item


def top_speakers() -> sqlalchemy.orm.query.Query:
    """Query (person, count) pairs for people with more than 5 conferences."""
    return (
        database.session.query(model.Person, func.count())
        .join(model.ConferencePerson)
        .group_by(model.Person)
        .order_by(func.count().desc())
        .having(func.count() > 5)
    )


def top_speakers2() -> sqlalchemy.orm.query.Query:
    """Query (person, count) pairs for multi-word names with count above 2."""
    return (
        database.session.query(model.Person, func.count())
        .join(model.ConferencePerson)
        # Only names containing a space.
        .filter(model.Person.name.like("% %"))
        .group_by(model.Person)
        .order_by(func.count().desc())
        .having(func.count() > 2)
    )


def top_events() -> sqlalchemy.orm.query.Query:
    """Query (title, count) pairs for event titles used more than 3 times."""
    return (
        database.session.query(model.Event.title, func.count())
        .group_by(model.Event.title)
        .order_by(func.count().desc())
        .having(func.count() > 3)
    )


def _people_matching(search_for: str) -> typing.Any:
    """Query people whose name contains *search_for*, ordered by name."""
    return model.Person.query.filter(
        model.Person.name.ilike(f"%{search_for}%")
    ).order_by(model.Person.name)


def _required_search_term() -> str:
    """Return the stripped ``q`` query-string argument, or abort with 400."""
    search_for = flask.request.args["q"]
    if not search_for:
        # An empty query previously tripped an assert (HTTP 500).
        flask.abort(400)
    return search_for.strip()


@app.route("/person/<int:person_id>", methods=["GET", "POST"])
def person(person_id: int) -> str | Response:
    """Show a person; on POST update their name and Wikidata QID."""
    item = model.Person.query.get(person_id)
    if item is None:
        flask.abort(404)

    if flask.request.method == "POST":
        # Store an empty QID as NULL rather than an empty string.
        item.wikidata_qid = flask.request.form["wikidata_qid"] or None
        item.name = flask.request.form["name"]
        database.session.commit()
        assert flask.request.endpoint
        return flask.redirect(
            flask.url_for(flask.request.endpoint, person_id=person_id)
        )

    return flask.render_template("person.html", item=item, Event=model.Event)


@app.route("/event/<int:event_id>")
def event_page(event_id: int) -> str:
    """Show a single event."""
    item = model.Event.query.get(event_id)
    if item is None:
        flask.abort(404)
    return flask.render_template("event.html", item=item)


@app.route("/conference/<short_name>")
def conference_page(short_name: str) -> str:
    """Show the conference identified by *short_name*."""
    item = model.Conference.query.filter_by(short_name=short_name).one_or_none()
    if item is None:
        flask.abort(404)
    return flask.render_template("conference.html", item=item)


@app.route("/people")
def search_people() -> str:
    """Search people by name using the required ``q`` argument."""
    search_for = _required_search_term()
    return flask.render_template(
        "search_people.html",
        q=_people_matching(search_for),
        search_for=search_for,
    )


@app.route("/merge", methods=["GET", "POST"])
def merge() -> str | Response:
    """Merge duplicate people.

    GET lists the people matching ``q``. POST merges the selected
    ``person_id`` values into the lowest ID, repointing conference and event
    links and deleting the other person rows.
    """
    if flask.request.method != "POST":
        search_for = _required_search_term()
        return flask.render_template(
            "merge_people.html",
            q=_people_matching(search_for),
            search_for=search_for,
        )

    search_for = flask.request.form["q"]
    try:
        item_ids = [int(i) for i in flask.request.form.getlist("person_id")]
    except ValueError:
        flask.abort(400)
    if not item_ids:
        # Nothing selected: min() below would raise ValueError.
        flask.abort(400)

    merge_to_id = min(item_ids)
    other_ids = [i for i in item_ids if i != merge_to_id]
    app.logger.info("merge %s -> %s", other_ids, merge_to_id)

    if other_ids:
        # NOTE(review): if a person being merged and the target are linked to
        # the same conference or event, these updates may produce duplicate
        # link rows -- verify against the model's constraints.
        with database.session.begin():
            app.logger.info("update ConferencePerson")
            database.session.execute(
                update(model.ConferencePerson)
                .where(model.ConferencePerson.person_id.in_(other_ids))
                .values(person_id=merge_to_id)
            )

            app.logger.info("update EventPerson")
            database.session.execute(
                update(model.EventPerson)
                .where(model.EventPerson.person_id.in_(other_ids))
                .values(person_id=merge_to_id)
            )

            app.logger.info("delete people")
            for person_id in other_ids:
                item = model.Person.query.get(person_id)
                if item is not None:  # skip IDs that no longer exist
                    database.session.delete(item)

    endpoint = flask.request.endpoint
    assert endpoint
    return flask.redirect(flask.url_for(endpoint, q=search_for))


@app.route("/events")
def events_page() -> str:
    """Search events by title, or list the most common titles without ``q``."""
    search_for = flask.request.args.get("q")
    if not search_for:
        return flask.render_template("top_events.html", top_events=top_events())

    q = model.Event.query.filter(
        model.Event.title.ilike(f"%{search_for}%")
    ).order_by(model.Event.title)
    return flask.render_template("search_events.html", q=q, search_for=search_for)


@app.route("/")
def index() -> str:
    """Start page."""
    q = model.Conference.query.order_by(model.Conference.start.desc())

    count = {
        "conference": model.Conference.query.count(),
        "event": model.Event.query.count(),
        "person": model.Person.query.count(),
    }

    return flask.render_template("index.html", items=q, count=count)


@app.route("/speakers")
def top_speakers_page() -> str:
    """Top speakers page."""
    return flask.render_template("top_speakers.html", top_speakers=top_speakers())


@app.route("/country")
def country_list() -> str:
    """Country list."""
    return flask.render_template("country_list.html", items=model.Country.query)


@app.route("/city/<int:city_id>/venue/new", methods=["GET", "POST"])
def add_venue(city_id: int) -> str | Response:
    """Add new venue."""
    city = model.City.query.get(city_id)
    if city is None:
        flask.abort(404)

    if flask.request.method != "POST":
        return flask.render_template("add_venue.html", city=city)

    name = flask.request.form["name"]
    # Store an empty QID as NULL, matching the person view.
    wikidata_qid = flask.request.form["wikidata_qid"] or None

    venue = model.Venue(name=name, city=city, wikidata_qid=wikidata_qid)
    database.session.add(venue)
    database.session.commit()

    # The original read the non-existent ``flask.endpoint`` and omitted the
    # required ``city_id``. NOTE(review): redirecting back to this form
    # mirrors the person view; confirm the intended destination.
    endpoint = flask.request.endpoint
    assert endpoint
    return flask.redirect(flask.url_for(endpoint, city_id=city_id))


def _english_value(terms: dict[str, typing.Any], fallback: str) -> str:
    """Return the English value from a Wikidata term map, or *fallback*."""
    if "en" in terms:
        return cast(str, terms["en"]["value"])
    return fallback


@app.route("/wikidata")
def link_to_wikidata() -> str:
    """Suggest Wikidata items for frequent speakers that have no QID yet."""
    items = []
    for person, num in top_speakers2():
        if person.wikidata_qid:
            continue

        search_hits = wikidata_search(f'"{person.name}"')
        # Skip names with no candidates, or with more than 10.
        if not search_hits or len(search_hits) > 10:
            continue

        hits = []
        for search_hit in search_hits:
            qid = search_hit["title"]
            item = wikidata_get_item(qid)
            hits.append(
                {
                    "qid": qid,
                    "label": _english_value(
                        item["labels"], "[no english label]"
                    ),
                    "description": _english_value(
                        item["descriptions"], "[no english description]"
                    ),
                }
            )

        items.append((person, num, hits))

    return flask.render_template("wikidata.html", items=items)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5002)