#!/usr/bin/python3
"""Flask views for browsing conferences, events and people, plus Wikidata matching pages."""

import hashlib
import json
import os
import time
import typing
from typing import cast

import flask
import requests
import sqlalchemy
from sqlalchemy import func, or_, update
from werkzeug.wrappers import Response

from confarchive import database, model, wikidata

app = flask.Flask(__name__)
# NOTE(review): debug mode is enabled unconditionally and the __main__ block
# binds to 0.0.0.0; the Werkzeug debugger allows code execution, so confirm
# this is never run on a reachable host.
app.debug = True
app.config.from_object("config.default")
database.init_app(app)

wikidata_api = "https://www.wikidata.org/w/api.php"


def md5sum(s: str) -> str:
    """Return the hexadecimal MD5 digest of *s* encoded as UTF-8."""
    return hashlib.md5(s.encode("utf-8")).hexdigest()


def wikidata_search(q: str) -> list[dict[str, typing.Any]]:
    """Search Wikidata for humans matching *q*, caching responses on disk.

    The response body is stored under ``cache/<md5 of query>.json`` and reused
    on later calls. A one second pause follows every real HTTP request.

    Raises:
        requests.HTTPError: if the API answers with an error status. Failed
            responses are not written to the cache.
    """
    # Restrict results to items that are an instance of (P31) human (Q5).
    q += " haswbstatement:P31=Q5"
    cache_filename = os.path.join("cache", md5sum(q) + ".json")

    if os.path.exists(cache_filename):
        with open(cache_filename, encoding="utf-8") as cache_file:
            data = json.load(cache_file)
    else:
        params: dict[str, str | int] = {
            "action": "query",
            "list": "search",
            "format": "json",
            "formatversion": 2,
            "srsearch": q,
            "srlimit": "max",
        }
        r = requests.get(wikidata_api, params=params)
        # Validate and parse before caching so a bad reply is never persisted.
        r.raise_for_status()
        data = r.json()
        with open(cache_filename, "w", encoding="utf-8") as cache_file:
            cache_file.write(r.text)
        time.sleep(1)

    return cast(list[dict[str, typing.Any]], data["query"]["search"])


def _wikidata_hit(qid: str) -> dict[str, str]:
    """Fetch Wikidata item *qid* and return its QID, English label and description."""
    wd_item = wikidata.get_item(qid)
    labels = wd_item["labels"]
    descriptions = wd_item["descriptions"]
    return {
        "qid": qid,
        "label": labels["en"]["value"] if "en" in labels else "[no english label]",
        "description": (
            descriptions["en"]["value"]
            if "en" in descriptions
            else "[no english description]"
        ),
    }


def top_speakers() -> sqlalchemy.orm.query.Query:
    """Return (Person, count) rows for people with more than four conference links.

    Rows are ordered by count descending, then by name.
    """
    q = (
        database.session.query(model.Person, func.count())
        .join(model.ConferencePerson)
        .filter(model.Person.id != 1046)  # FOSDEM Staff
        .group_by(model.Person)
        .order_by(func.count().desc(), model.Person.name)
        .having(func.count() > 4)
    )
    return q


def top_speakers2() -> sqlalchemy.orm.query.Query:
    """Return (Person, count) rows for people with more than two conference links.

    Only names containing a space are included; rows are ordered by count
    descending.
    """
    q = (
        database.session.query(model.Person, func.count())
        .join(model.ConferencePerson)
        .filter(model.Person.name.like("% %"))
        .group_by(model.Person)
        .order_by(func.count().desc())
        .having(func.count() > 2)
    )
    # .order_by(func.length(model.Person.name).desc())
    return q


def top_events() -> sqlalchemy.orm.query.Query:
    """Return (title, count) rows for event titles that occur more than three times."""
    q = (
        database.session.query(model.Event.title, func.count())
        .group_by(model.Event.title)
        .order_by(func.count().desc())
        .having(func.count() > 3)
    )
    return q


def drop_start(s: str, start: str) -> str:
    """Return *s* without the leading *start*, or *s* unchanged if it is absent."""
    return s.removeprefix(start)


@app.route("/person/<int:person_id>", methods=["GET", "POST"])
def person(person_id: int) -> str | Response:
    """Show a person; on POST update their name and Wikidata QID.

    When the posted QID differs from the stored one, the item's P18 (image)
    claims are read, each photo is fetched via ``wikidata.get_photo`` and the
    filenames are stored on the person. Responds with 404 for an unknown id.
    """
    item = model.Person.query.get(person_id)
    if item is None:
        flask.abort(404)

    if flask.request.method == "POST":
        qid = flask.request.form["wikidata_qid"] or None
        item.name = flask.request.form["name"]
        if qid and qid != item.wikidata_qid:
            item.wikidata_qid = qid
            wd_item = wikidata.get_item(qid)
            if "P18" in wd_item["claims"]:
                claim_p18 = wd_item["claims"]["P18"]
                wikidata_photo = [
                    drop_start(s["mainsnak"]["datavalue"]["value"], "-")
                    for s in claim_p18
                ]
                for filename in wikidata_photo:
                    print(filename)
                    wikidata.get_photo(filename)
                item.wikidata_photo = wikidata_photo
        database.session.commit()
        # request.endpoint is typed as optional; narrow it for url_for().
        assert flask.request.endpoint
        return flask.redirect(
            flask.url_for(flask.request.endpoint, person_id=person_id)
        )

    wikidata_hits: list[dict[str, str]] = []
    # Live Wikidata suggestions are switched off by the constant False below.
    if False and item.wikidata_qid is None:
        search_hits = wikidata_search(item.name)
        print(len(search_hits))
        for search_hit in search_hits:
            wikidata_hits.append(_wikidata_hit(search_hit["title"]))

    return flask.render_template(
        "person.html",
        item=item,
        Event=model.Event,
        plural=plural,
        wikidata_hits=wikidata_hits,
    )


@app.route("/event/<int:event_id>")
def event_page(event_id: int) -> str:
    """Show a single event, or 404 if the id is unknown."""
    item = model.Event.query.get(event_id)
    if item is None:
        flask.abort(404)
    return flask.render_template("event.html", item=item)


@app.route("/conference/<short_name>")
def conference_page(short_name: str) -> str:
    """Show the conference with the given short name, or 404 if there is none."""
    item = model.Conference.query.filter_by(short_name=short_name).one_or_none()
    if item is None:
        flask.abort(404)
    return flask.render_template(
        "conference.html", item=item, person_image_filename=person_image_filename
    )


@app.route("/people")
def search_people() -> str:
    """List people whose name contains the ``q`` query parameter (case-insensitive)."""
    search_for = flask.request.args["q"]
    if not search_for:
        # An empty term would match every person; treat it as a bad request.
        flask.abort(400)
    search_for = search_for.strip()
    q = model.Person.query.filter(model.Person.name.ilike(f"%{search_for}%")).order_by(
        model.Person.name
    )
    return flask.render_template("search_people.html", q=q, search_for=search_for)


@app.route("/merge", methods=["GET", "POST"])
def merge() -> str | Response:
    """Admin page for merging duplicate people.

    GET lists people matching ``q``. POST merges the selected ``person_id``
    values into the lowest id: ConferencePerson and EventPerson rows are
    re-pointed, the surviving person takes the name of the person chosen in
    the ``name`` field, and the other people are deleted.

    Responds with 403 unless ``ADMIN_MODE`` is set in the app config.
    """
    # Explicit check instead of assert, which is removed under ``python -O``.
    if not app.config["ADMIN_MODE"]:
        flask.abort(403)

    if flask.request.method == "POST":
        search_for = flask.request.form["q"]

        item_ids_str = flask.request.form.getlist("person_id")
        item_ids: list[int] = [int(i) for i in item_ids_str]
        if not item_ids:
            flask.abort(400)

        merge_to_id = min(item_ids)
        other_ids = [i for i in item_ids if i != merge_to_id]

        # Form values are strings; convert so the comparison below is int vs int.
        name_from_person_id = int(flask.request.form["name"])

        print(other_ids, "->", merge_to_id)

        with database.session.begin():
            if merge_to_id != name_from_person_id:
                merge_to = model.Person.query.get(merge_to_id)
                name_from_person = model.Person.query.get(name_from_person_id)
                merge_to.name = name_from_person.name

            print("update ConferencePerson")
            database.session.execute(
                update(model.ConferencePerson)
                .where(model.ConferencePerson.person_id.in_(other_ids))
                .values(person_id=merge_to_id)
            )

            print("update EventPerson")
            database.session.execute(
                update(model.EventPerson)
                .where(model.EventPerson.person_id.in_(other_ids))
                .values(person_id=merge_to_id)
            )

            print("delete people")
            for person_id in other_ids:
                item = model.Person.query.get(person_id)
                database.session.delete(item)

        endpoint = flask.request.endpoint
        assert endpoint  # narrows the optional type for url_for()
        return flask.redirect(flask.url_for(endpoint, q=search_for))

    search_for = flask.request.args["q"]
    if not search_for:
        flask.abort(400)
    search_for = search_for.strip()
    q = model.Person.query.filter(model.Person.name.ilike(f"%{search_for}%")).order_by(
        model.Person.name
    )
    return flask.render_template("merge_people.html", q=q, search_for=search_for)


@app.route("/events")
def events_page() -> str:
    """Search events by title, or show the most repeated titles when ``q`` is empty."""
    search_for = flask.request.args.get("q")
    if not search_for:
        return flask.render_template("top_events.html", top_events=top_events())

    q = model.Event.query.filter(model.Event.title.ilike(f"%{search_for}%")).order_by(
        model.Event.title
    )
    return flask.render_template("search_events.html", q=q, search_for=search_for)


@app.route("/")
def index() -> str:
    """Start page."""
    # Disabled variant of the listing query that also adds per-conference counts.
    if False:
        q = (
            model.Conference.query.order_by(model.Conference.start.desc())
            .add_columns(
                func.count(model.Event.id), func.count(model.ConferencePerson.person_id)
            )
            .group_by(model.Conference)
        )

    q = model.Conference.query.order_by(model.Conference.start.desc())

    count = {
        "conference": model.Conference.query.count(),
        "event": model.Event.query.count(),
        "person": model.Person.query.count(),
        "country": model.Country.query.count(),
        "venue": model.Venue.query.count(),
    }

    return flask.render_template("index.html", items=q, count=count)


def plural(num: int, label: str) -> str:
    """Format *num* with thousands separators followed by *label*, adding "s" unless num is 1."""
    return f'{num:,d} {label}{"s" if num != 1 else ""}'


def speaker_counts():
    """Return rows of (conference count, number of people with that count)."""
    sql = """
select num, count(*)
from (select person_id, count(*) as num from conference_person group by person_id) a
group by num
order by num
"""
    # Session.execute() needs a text() construct for raw SQL in SQLAlchemy 2.0.
    return database.session.execute(sqlalchemy.text(sql))


@app.route("/speakers")
def top_speakers_page() -> str:
    """Top speakers page."""
    top = top_speakers()

    photos = []
    for person, _count in top:
        photo = person.photo_filename()
        if photo:
            photos.append((person, photo))

    # Alternate the people who have photos between two groups for the template.
    left_photos = photos[::2]
    right_photos = photos[1::2]

    photo_person_ids = [person.id for person, photo in photos]
    left = photo_person_ids[::2]
    right = photo_person_ids[1::2]

    return flask.render_template(
        "top_speakers.html",
        top_speakers=top,
        speaker_counts=speaker_counts(),
        plural=plural,
        person_image_filename=person_image_filename,
        # photo_person_ids=photo_person_ids,
        left=left,
        right=right,
        left_photos=left_photos,
        right_photos=right_photos,
    )


@app.route("/country")
def country_list() -> str:
    """Country list."""
    return flask.render_template("country_list.html", items=model.Country.query)


@app.route("/city/<int:city_id>/venue/new", methods=["GET", "POST"])
def add_venue(city_id: int) -> str | Response:
    """Add new venue.

    GET renders the form for the given city; POST creates the venue from the
    ``name`` and ``wikidata_qid`` form fields and redirects back to this page.
    Responds with 404 for an unknown city.
    """
    city = model.City.query.get(city_id)
    if city is None:
        flask.abort(404)
    if flask.request.method != "POST":
        return flask.render_template("add_venue.html", city=city)

    name = flask.request.form["name"]
    wikidata_qid = flask.request.form["wikidata_qid"]

    venue = model.Venue(name=name, city=city, wikidata_qid=wikidata_qid)
    database.session.add(venue)
    database.session.commit()

    endpoint = flask.request.endpoint
    assert endpoint  # narrows the optional type for url_for()
    # The route has a city_id variable, so url_for() needs it to build the URL.
    return flask.redirect(flask.url_for(endpoint, city_id=city_id))


@app.route("/wikidata")
def link_to_wikidata() -> str:
    """List people without a Wikidata QID next to candidate Wikidata items.

    People come from ``top_speakers2()``. Those with no search hits, or with
    more than 14 hits, are skipped.
    """
    items = []
    for person, num in top_speakers2():
        if person.wikidata_qid:
            continue
        search_hits = wikidata_search(person.name)
        if not search_hits:
            continue

        if len(search_hits) > 14:
            continue

        hits = [_wikidata_hit(search_hit["title"]) for search_hit in search_hits]
        items.append((person, num, hits))

    return flask.render_template("wikidata.html", items=items)


@app.route("/search")
def search_everything() -> str:
    """Search people by name and events by abstract or description."""
    search_for = flask.request.args.get("q")
    if not search_for:
        return flask.render_template("search_everything.html")

    search_for = search_for.strip()
    like = f"%{search_for}%"

    people = model.Person.query.filter(model.Person.name.ilike(like)).order_by(
        model.Person.name
    )

    events = model.Event.query.filter(
        or_(model.Event.abstract.ilike(like), model.Event.description.ilike(like))
    ).order_by(model.Event.event_date)

    return flask.render_template(
        "search_everything.html", people=people, events=events, search_for=search_for
    )


@app.route("/person/<int:person_id>/delete", methods=["POST"])
def delete_person(person_id: int) -> str | Response:
    """Delete a person together with their conference and event association rows.

    Responds with 403 unless ``ADMIN_MODE`` is set, and 404 for an unknown id.
    """
    # Explicit check instead of assert, which is removed under ``python -O``.
    if not app.config["ADMIN_MODE"]:
        flask.abort(403)

    item = model.Person.query.get(person_id)
    if item is None:
        flask.abort(404)

    for cp in item.conferences_association:
        database.session.delete(cp)

    for ep in item.events_association:
        database.session.delete(ep)

    database.session.delete(item)
    database.session.commit()

    return flask.redirect(flask.url_for("index"))


def person_image_filename(person_id: int) -> str:
    """Return the thumbnail path of the person's first Wikidata photo.

    NOTE(review): the person must exist and have at least one entry in
    ``wikidata_photo``; nothing here guards against either being missing.
    """
    person = model.Person.query.get(person_id)
    # NOTE(review): this early return makes the face-crop lookup below
    # unreachable. It is left in place pending confirmation that the crop
    # lookup is meant to stay disabled.
    return os.path.join("wikidata_photo", "thumb", person.wikidata_photo[0])

    for filename in person.wikidata_photo:
        face_crop = "face_1_" + filename
        full = os.path.join("static", "wikidata_photo", "face_cropped", face_crop)
        if os.path.exists(full):
            return os.path.join("wikidata_photo", "face_cropped", face_crop)

    return os.path.join("wikidata_photo", "thumb", person.wikidata_photo[0])


@app.route("/github_wikidata")
def github_wikidata() -> str:
    """List matches from the ``found_wikidata_github`` file for people lacking a QID."""
    items = []
    with open("found_wikidata_github") as matches_file:
        for line in matches_file:
            # SECURITY NOTE(review): eval() executes whatever the file contains.
            # If every line is a plain tuple literal, ast.literal_eval() would
            # be a safe replacement; confirm the file format before switching.
            person_id, _person_name, qid, wd_name, _github, photo = eval(line)
            person = model.Person.query.get(person_id)
            if person.wikidata_qid:
                continue
            items.append((person, qid, wd_name, photo))

    return flask.render_template("github.html", items=items)


@app.route("/reports")
def reports_page() -> str:
    """Show data-quality counts for events and speaker biographies.

    Also lists people whose most recent bio is less than half the length of
    one of their earlier bios.
    """
    event_count = model.Event.query.count()

    missing_event_date_count = model.Event.query.filter(
        model.Event.event_date.is_(None)
    ).count()

    speaker_count = model.Person.query.count()
    no_bio_count = (
        model.Person.query.join(model.ConferencePerson)
        .filter(model.ConferencePerson.bio.is_(None))
        .group_by(model.Person)
        .count()
    )
    one_bio_count = (
        model.Person.query.join(model.ConferencePerson)
        .group_by(model.Person)
        .filter(model.ConferencePerson.bio.isnot(None))
        .having(func.count() == 1)
        .count()
    )

    multiple_bio = (
        model.Person.query.join(model.ConferencePerson)
        .group_by(model.Person)
        .filter(model.ConferencePerson.bio.isnot(None))
        .having(func.count() > 1)
    )

    shorter_recent_bio = []
    for person in multiple_bio:
        # Newest conference first; entries with an empty bio are dropped.
        bio_with_date = sorted(
            [
                (cp.conference.start, cp.bio)
                for cp in person.conferences_association
                if cp.bio
            ],
            reverse=True,
        )

        if len(bio_with_date) < 2:
            continue

        most_recent_bio = bio_with_date[0][1]
        len_recent_bio = len(most_recent_bio)
        longest = max(len(bio) for _start, bio in bio_with_date[1:])

        if longest > len_recent_bio * 2:
            shorter_recent_bio.append((person, len_recent_bio, longest))

    return flask.render_template(
        "reports.html",
        event_count=event_count,
        speaker_count=speaker_count,
        no_bio_count=no_bio_count,
        one_bio_count=one_bio_count,
        multiple_bio_count=multiple_bio.count(),
        shorter_recent_bio=shorter_recent_bio,
        missing_event_date_count=missing_event_date_count,
        missing_event_date=model.Event.query.filter(
            model.Event.event_date.is_(None)
        ).order_by(model.Event.title),
    )


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5002)