"""Flask views.""" import json import os import re import typing from datetime import datetime, timedelta import flask import requests from sqlalchemy import func, or_, update from werkzeug.wrappers import Response from confarchive import database, model, query, utils, wikidata app = flask.Flask(__name__) app.debug = True app.config.from_object("config.default") database.init_app(app) user_agent = "conference-archive/0.1 (contact: edward@4angle.com)" @app.route("/person/", methods=["GET", "POST"]) def person(person_id: int) -> str | Response: """Person page.""" item = model.Person.query.get(person_id) if flask.request.method == "POST" and check_admin_mode(): qid = flask.request.form["wikidata_qid"] or None item.name = flask.request.form["name"] if qid and qid != item.wikidata_qid: item.wikidata_qid = qid wd_item = wikidata.get_item(qid) if "P18" in wd_item["claims"]: claim_p18 = wd_item["claims"]["P18"] wikidata_photo = [ utils.drop_start(s["mainsnak"]["datavalue"]["value"], "-") for s in claim_p18 ] for filename in wikidata_photo: print(filename) wikidata.get_photo(filename) item.wikidata_photo = wikidata_photo database.session.commit() assert flask.request.endpoint return flask.redirect( flask.url_for(flask.request.endpoint, person_id=person_id) ) wikidata_hits: list[dict[str, str]] = [] if False and item.wikidata_qid is None: q = item.name + " haswbstatement:P31=Q5" search_hits = wikidata.search(q) print(len(search_hits)) for search_hit in search_hits: qid = search_hit["title"] wd_item = wikidata.get_item(qid) if "en" in wd_item["labels"]: label = wd_item["labels"]["en"]["value"] else: label = "[no english label]" if "en" in wd_item["descriptions"]: description = wd_item["descriptions"]["en"]["value"] else: description = "[no english description]" wikidata_hits.append( { "qid": qid, "label": label, "description": description, } ) return flask.render_template( "person.html", item=item, Event=model.Event, plural=utils.plural, wikidata_hits=wikidata_hits, 
is_admin=check_admin_mode, ) @app.route("/person//split", methods=["GET", "POST"]) def split_person(person_id: int) -> str | Response: """Split person.""" item = model.Person.query.get(person_id) assert not item.wikidata_qid if flask.request.method == "GET": return flask.render_template( "person/split.html", item=item, ) names = flask.request.form["names"].strip().splitlines() print(names) assert len(names) > 1 item.name = names[0] for cp in item.conferences_association: cp.named_as = names[0] for num, name in enumerate(names[1:], start=2): person = model.Person(name=name) database.session.add(person) for cp0 in item.conferences_association: cp = model.ConferencePerson( person=person, named_as=name, conference_id=cp0.conference_id ) database.session.add(cp) for event0 in item.events_association: ep = model.EventPerson( person=person, event_id=event0.event_id, position=num ) database.session.add(ep) database.session.commit() return flask.redirect(flask.url_for("person", person_id=person_id)) @app.route("/event/") def event_page(event_id: int) -> str: item = model.Event.query.get(event_id) return flask.render_template("event.html", item=item) @app.route("/conference/", methods=["GET", "POST"]) def conference_page(short_name: str) -> str | Response: item = model.Conference.query.filter_by(short_name=short_name).one_or_none() if item is None: flask.abort(404) if flask.request.method == "POST" and check_admin_mode(): item.short_name = flask.request.form["short_name"] item.title = flask.request.form["title"] database.session.commit() assert flask.request.endpoint return flask.redirect( flask.url_for(flask.request.endpoint, short_name=item.short_name) ) return flask.render_template( "conference.html", item=item, person_image_filename=person_image_filename, is_admin=check_admin_mode, ) @app.route("/people") def search_people() -> str: search_for = flask.request.args["q"] assert search_for search_for = search_for.strip() q = 
model.Person.query.filter(model.Person.name.ilike(f"%{search_for}%")).order_by( model.Person.name ) return flask.render_template( "search_people.html", q=q, search_for=search_for, is_admin=check_admin_mode ) @app.route("/merge", methods=["GET", "POST"]) def merge() -> str | Response: """Merge speakers.""" assert check_admin_mode() if flask.request.method == "GET": search_for = flask.request.args["q"] assert search_for search_for = search_for.strip() q = query.search_for_people(search_for) return flask.render_template("merge_people.html", q=q, search_for=search_for) assert flask.request.method == "POST" search_for = flask.request.form["q"] item_ids_str = flask.request.form.getlist("person_id") item_ids: list[int] = [int(i) for i in item_ids_str] merge_to_id: int = min(item_ids) other_ids = [i for i in item_ids if i != merge_to_id] name_from_person_id = int(flask.request.form["name"]) print(other_ids, "->", merge_to_id) conference_people = model.ConferencePerson.query.filter( model.ConferencePerson.person_id.in_(other_ids) ) with database.session.begin(): merge_to = model.Person.query.get(merge_to_id) existing_conferences = {conf.id for conf in merge_to.conferences} to_delete = [ cp for cp in conference_people if cp.conference_id in existing_conferences ] for cp in to_delete: database.session.delete(cp) if merge_to_id != name_from_person_id: name_from_person = model.Person.query.get(name_from_person_id) merge_to.name = name_from_person.name print("update ConferencePerson") database.session.execute( update(model.ConferencePerson) .where(model.ConferencePerson.person_id.in_(other_ids)) .values(person_id=merge_to_id) ) print("update EventPerson") database.session.execute( update(model.EventPerson) .where(model.EventPerson.person_id.in_(other_ids)) .values(person_id=merge_to_id) ) print("delete people") for person_id in other_ids: item = model.Person.query.get(person_id) database.session.delete(item) endpoint = flask.request.endpoint assert endpoint return 
flask.redirect(flask.url_for(endpoint, q=search_for)) @app.route("/events") def events_page() -> str: """Events page.""" search_for = flask.request.args.get("q") if search_for: q = query.search_for_events(search_for) return flask.render_template("search_events.html", q=q, search_for=search_for) else: return flask.render_template("top_events.html", top_events=query.top_events()) @app.route("/") def index() -> str: """Start page.""" q = model.Conference.query.order_by(model.Conference.start.desc()) count = { "conference": model.Conference.query.count(), "event": model.Event.query.count(), "person": model.Person.query.count(), "country": model.Country.query.count(), "venue": model.Venue.query.count(), } return flask.render_template("index.html", items=q, count=count) @app.route("/series") def list_series() -> str: """Page showing list of conference series.""" items = model.Series.query return flask.render_template("series/list.html", items=items) @app.route("/speakers") def top_speakers_page() -> str: """Top speakers page.""" top = query.top_speakers().having(func.count() > 4) return flask.render_template( "top_speakers.html", top_speakers=top, speaker_counts=query.speaker_counts(), plural=utils.plural, person_image_filename=person_image_filename, ) @app.route("/country") def country_list() -> str: """Country list.""" return flask.render_template("country_list.html", items=model.Country.query) @app.route("/city//venue/new", methods=["GET", "POST"]) def add_venue(city_id: int) -> str | Response: """Add new venue.""" city = model.City.query.get(city_id) if flask.request.method != "POST": return flask.render_template("add_venue.html", city=city) name = flask.request.form["name"] wikidata_qid = flask.request.form["wikidata_qid"] venue = model.Venue(name=name, city=city, wikidata_qid=wikidata_qid) database.session.add(venue) database.session.commit() endpoint = flask.endpoint return flask.redirect(flask.url_for(endpoint)) @app.route("/wikidata") def link_to_wikidata() -> 
str: items = [] top = ( query.top_speakers() .filter(model.Person.name.like("% %"), model.Person.wikidata_qid.is_(None)) .having(func.count() > 2) ) for person, num in top: search_hits = wikidata.search(person.name + " haswbstatement:P31=Q5") if not search_hits: continue if len(search_hits) > 14: continue hits = [] for search_hit in search_hits: qid = search_hit["title"] item = wikidata.get_item(qid) if "en" in item["labels"]: label = item["labels"]["en"]["value"] else: label = "[no english label]" if "en" in item["descriptions"]: description = item["descriptions"]["en"]["value"] else: description = "[no english description]" hits.append( { "qid": qid, "label": label, "description": description, } ) items.append((person, num, hits)) return flask.render_template("wikidata.html", items=items) @app.route("/search") def search_everything() -> str: search_for = flask.request.args.get("q") if not search_for: return flask.render_template("search_everything.html") search_for = search_for.strip() like = f"%{search_for}%" people = model.Person.query.filter(model.Person.name.ilike(like)).order_by( model.Person.name ) events = model.Event.query.filter( or_(model.Event.abstract.ilike(like), model.Event.description.ilike(like)) ).order_by(model.Event.event_date) return flask.render_template( "search_everything.html", people=people, events=events, search_for=search_for ) @app.route("/person//delete", methods=["POST"]) def delete_person(person_id: int) -> str | Response: assert app.config["ADMIN_MODE"] item = model.Person.query.get(person_id) for cp in item.conferences_association: database.session.delete(cp) for ep in item.events_association: database.session.delete(ep) database.session.delete(item) database.session.commit() return flask.redirect(flask.url_for("index")) def person_image_filename(person_id: int) -> str: """Filename for speaker photo.""" person = model.Person.query.get(person_id) return os.path.join("wikidata_photo", "thumb", person.wikidata_photo[0]) 
# Pretalx speaker codes that are duplicates of another speaker. The key is
# skipped when speakers are imported and replaced by the value on talks.
_SPEAKER_CODE_ALIASES = {"9NX3NE": "3BRWWP"}

# Cached API responses younger than this are reused instead of re-fetched.
_PRETALX_CACHE_MAX_AGE = timedelta(hours=1)

# Seconds to wait for the pretalx API before giving up.
_PRETALX_TIMEOUT = 30

# Timestamp format used in cache filenames; must match set_now().
_CACHE_TIMESTAMP_FORMAT = "%Y-%m-%d_%H:%M"


@app.route("/login", methods=["GET", "POST"])
def login() -> str | Response:
    """Login page.

    A correct password is stored in the session and the user is sent to the
    start page; a wrong password redirects back to the login form.
    """
    if flask.request.method == "GET":
        return flask.render_template("login.html")

    password = flask.request.form["password"]
    if password != app.config["ADMIN_PASSWORD"]:
        return flask.redirect(flask.url_for("login"))

    flask.session["admin_password"] = password
    return flask.redirect(flask.url_for("index"))


def check_admin_mode() -> bool:
    """User is an admin: the session holds the configured admin password."""
    stored = flask.session.get("admin_password")
    # A missing session value must never match, even if the config is None.
    return stored is not None and bool(stored == app.config["ADMIN_PASSWORD"])


@app.route("/github_wikidata")
def github_wikidata() -> str:
    """Look for speakers on Wikidata based on the GitHub property.

    Reads the file ``found_wikidata_github``, one Python tuple literal per
    line, and lists the people that do not have a Wikidata QID yet, shortest
    name first.
    """
    import ast

    items = []
    with open("found_wikidata_github") as fh:
        for line in fh:
            # literal_eval only accepts literals, so a line in the data file
            # cannot execute code the way eval() would allow.
            person_id, _person_name, qid, wd_name, _github, desc = ast.literal_eval(
                line
            )
            person = model.Person.query.get(person_id)
            if person is None or person.wikidata_qid:
                continue
            items.append((person, qid, wd_name, desc))

    items.sort(key=lambda i: len(i[0].name))
    return flask.render_template("github.html", items=items)


@app.route("/import")
def import_start() -> str | Response:
    """Begin import: ask for a URL, then redirect to that host's import page."""
    if not check_admin_mode():
        flask.abort(403)

    url = flask.request.args.get("url")
    if not url:
        return flask.render_template("import/start.html")

    # Only the hostname is needed; a trailing slash or path is optional.
    m = re.match(r"https?://([^/]+)", url)
    if not m:
        flask.abort(400)
    return flask.redirect(flask.url_for("import_hostname", hostname=m.group(1)))


def pretalx_api(hostname: str, path: str) -> typing.Any:
    """Call pretalx API and cache results.

    Responses are stored as JSON files in ``<DATA_DIR>/import``, named after
    the hostname, the API path and the request time. A cached response younger
    than ``_PRETALX_CACHE_MAX_AGE`` is returned without contacting the server.

    ``set_now()`` must have been called earlier in the request, because the
    cache age and the new filename are derived from ``flask.g.now``.

    Raises ``requests.HTTPError`` when the API answers with an error status.
    """
    import_dir = os.path.join(app.config["DATA_DIR"], "import")
    api_url = f"https://{hostname}/api/{path}"
    cache_start = hostname + "_" + path.strip("/").replace("/", "_")

    # Match the timestamp exactly so that a cache for a longer path sharing
    # this prefix is not picked up, and so that any year is accepted.
    cache_re = re.compile(
        re.escape(cache_start) + r"_(\d{4}-\d{2}-\d{2}_\d{2}:\d{2})\.json"
    )
    cached: list[tuple[datetime, str]] = []
    for name in os.listdir(import_dir):
        m = cache_re.fullmatch(name)
        if not m:
            continue
        try:
            when = datetime.strptime(m.group(1), _CACHE_TIMESTAMP_FORMAT)
        except ValueError:
            continue  # digits in the right places but not a real date
        cached.append((when, name))

    if cached:
        recent, recent_filename = max(cached)
        if flask.g.now - recent <= _PRETALX_CACHE_MAX_AGE:
            with open(os.path.join(import_dir, recent_filename)) as fh:
                return json.load(fh)

    r = requests.get(api_url, params={"limit": 500}, timeout=_PRETALX_TIMEOUT)
    r.raise_for_status()
    # Parse before writing so an unparsable response is never cached.
    data = r.json()

    filename = os.path.join(import_dir, f"{cache_start}_{flask.g.now_str}.json")
    with open(filename, "w") as out:
        out.write(r.text)

    return data


def set_now() -> None:
    """Record current datetime in flask globals.

    Sets ``flask.g.now`` (naive UTC) and ``flask.g.now_str``, the same moment
    formatted for use in cache filenames.
    """
    flask.g.now = datetime.utcnow()
    flask.g.now_str = flask.g.now.strftime(_CACHE_TIMESTAMP_FORMAT)


@app.route("/import/<hostname>")
def import_hostname(hostname: str) -> str:
    """Import from hostname.

    Lists the events offered by the pretalx instance. An event whose slug or
    English title matches a conference already in the database gets that
    conference attached under the ``existing`` key.
    """
    if not check_admin_mode():
        flask.abort(403)
    set_now()

    events = pretalx_api(hostname, "events/")

    slug_lookup = {event["slug"]: event for event in events}
    name_lookup = {prefer_en_label(event["name"]): event for event in events}
    slugs = [event["slug"] for event in events]
    titles = [prefer_en_label(event["name"]) for event in events]

    app.logger.debug("pretalx slugs: %s", slugs)

    q = model.Conference.query.filter(
        or_(model.Conference.short_name.in_(slugs), model.Conference.title.in_(titles))
    )
    app.logger.debug("%d existing conferences match", q.count())

    for conf in q:
        if conf.short_name in slug_lookup:
            slug_lookup[conf.short_name]["existing"] = conf
        if conf.title in name_lookup:
            name_lookup[conf.title]["existing"] = conf

    return flask.render_template(
        "import/start.html",
        events=events,
        hostname=hostname,
        prefer_en_label=prefer_en_label,
    )


def _download_conference_photos(conf: model.Conference) -> None:
    """Save the speaker photos of a conference and record their content type."""
    q = model.ConferencePerson.query.filter(
        model.ConferencePerson.conference == conf,
        model.ConferencePerson.photo_url.isnot(None),
    )
    photo_dir = "confarchive/static/conference_photo"

    web_session = requests.Session()
    web_session.headers.update({"User-Agent": user_agent})

    for cp in q:
        if not cp.photo_url:
            continue  # the query excludes NULL but not the empty string

        r = web_session.get(cp.photo_url)
        content_type = r.headers.get("Content-Type")
        assert content_type in model.content_type_to_extension
        image_ext = model.content_type_to_extension[content_type]
        photo_filename = f"{conf.id}_{cp.person_id}.{image_ext}"
        app.logger.debug(
            "photo %s: %s",
            cp.photo_url,
            (cp.person_id, cp.named_as, content_type, image_ext, photo_filename),
        )

        with open(os.path.join(photo_dir, photo_filename), "wb") as out:
            out.write(r.content)

        cp.photo_url_content_type = content_type

    database.session.commit()


def run_import(
    event: dict[str, typing.Any],
    speakers: list[dict[str, typing.Any]],
    talks: list[dict[str, typing.Any]],
    speaker_lookup: dict[str, typing.Any],
) -> model.Conference:
    """Create a conference with its speakers and talks from pretalx data.

    ``speaker_lookup`` maps a speaker code to a query of existing people: when
    the query has rows its single result is reused, otherwise a new person is
    created. Talks without speakers are skipped. After the data is committed
    the speaker photos are downloaded.

    Returns the new conference.
    """
    conf = model.Conference(
        short_name=event["slug"],
        title=prefer_en_label(event["name"]),
        start=event["date_from"],
        end=event["date_to"],
    )
    database.session.add(conf)

    code_to_speaker = {}
    for speaker in speakers:
        code, name, photo_url = speaker["code"], speaker["name"], speaker["avatar"]
        if code in _SPEAKER_CODE_ALIASES:
            continue  # duplicate account; talks are re-pointed below

        candidates = speaker_lookup[code]
        if candidates.count():
            person = candidates.one()
        else:
            person = model.Person(name=name)
            database.session.add(person)
        code_to_speaker[code] = person

        database.session.add(
            model.ConferencePerson(
                person=person,
                conference=conf,
                named_as=name,
                photo_url=photo_url or None,
            )
        )

    for talk in talks:
        if not talk["speakers"]:
            continue

        speaker_codes = [
            _SPEAKER_CODE_ALIASES.get(s["code"], s["code"]) for s in talk["speakers"]
        ]

        # The slot start is an ISO timestamp; characters 11-15 are HH:MM.
        start_time = talk["slot"]["start"][11:16]
        assert re.match(r"\d\d:\d\d", start_time)

        database.session.add(
            model.Event(
                title=talk["title"],
                conference=conf,
                event_date=talk["slot"]["start"],
                duration=talk["duration"],
                room=prefer_en_label(talk["slot"]["room"]),
                track=prefer_en_label(talk["track"]),
                abstract=talk["abstract"],
                description=talk["description"],
                start=start_time,
                people=[code_to_speaker[code] for code in speaker_codes],
            )
        )

    database.session.commit()

    _download_conference_photos(conf)

    return conf


def prefer_en_label(labels: dict[str, str] | None) -> str | None:
    """Pick one label from a language-to-label mapping.

    Returns None for None, the English label when there is one, the only
    label when there is exactly one, and otherwise every label joined as
    ``"lang: label / lang: label"``.
    """
    if labels is None:
        return None
    if "en" in labels:
        return labels["en"]
    if len(labels) == 1:
        return next(iter(labels.values()))
    return " / ".join(f"{lang}: {label}" for lang, label in labels.items())


def find_matching_name(name: str) -> typing.Any:
    """People with a matching name.

    Returns a query: people with exactly this name when any exist, otherwise
    people whose name contains every word of ``name`` in order,
    case-insensitively. A single-word name that matches several people that
    way is treated as too ambiguous, and the empty exact-match query is
    returned instead.
    """
    name_parts = name.split()
    exact = model.Person.query.filter(model.Person.name == name)
    if exact.count():
        return exact

    name_pattern = "%" + "%".join(name_parts) + "%"
    loose = model.Person.query.filter(model.Person.name.ilike(name_pattern))

    if len(name_parts) == 1 and loose.count() > 1:
        return exact

    return loose


@app.route("/import/<hostname>/<slug>", methods=["GET", "POST"])
def import_event(hostname: str, slug: str) -> str | Response:
    """Import event.

    GET previews the event, its rooms, talks and the speakers that appear on
    at least one talk. POST runs the import and redirects to the new
    conference page.
    """
    # The other import views are admin-only, and this one writes to the
    # database, so it needs the same check.
    if not check_admin_mode():
        flask.abort(403)
    set_now()

    event = pretalx_api(hostname, f"events/{slug}")
    rooms = pretalx_api(hostname, f"events/{slug}/rooms")
    speakers = pretalx_api(hostname, f"events/{slug}/speakers")
    talks = pretalx_api(hostname, f"events/{slug}/talks")

    app.logger.debug(
        "speakers: %d of %d (next=%s, previous=%s)",
        len(speakers["results"]),
        speakers["count"],
        speakers["next"],
        speakers["previous"],
    )

    # pretalx_api fetches a single page, so refuse to import partial data.
    for label, page in (("speakers", speakers), ("talks", talks)):
        if len(page["results"]) != page["count"]:
            raise RuntimeError(
                f"pretalx returned {len(page['results'])} of "
                f"{page['count']} {label}"
            )

    all_talk_speakers: set[str] = set()
    for talk in talks["results"]:
        all_talk_speakers.update(speaker["code"] for speaker in talk["speakers"])

    person_candidates = {
        speaker["code"]: find_matching_name(speaker["name"])
        for speaker in speakers["results"]
    }

    if flask.request.method == "GET":
        return flask.render_template(
            "import/event.html",
            hostname=hostname,
            slug=slug,
            event=event,
            rooms=rooms,
            speakers=[s for s in speakers["results"] if s["code"] in all_talk_speakers],
            talks=talks,
            person_candidates=person_candidates,
            plural=utils.plural,
            prefer_en_label=prefer_en_label,
        )

    conf = run_import(event, speakers["results"], talks["results"], person_candidates)

    return flask.redirect(flask.url_for("conference_page", short_name=conf.short_name))


@app.route("/reports")
def reports_page() -> str:
    """Page showing statistics.

    Besides simple counts, this lists people with more than one bio whose
    most recent bio is less than half as long as one of their earlier bios.
    """
    event_count = model.Event.query.count()
    missing_event_date = model.Event.query.filter(model.Event.event_date.is_(None))
    speaker_count = model.Person.query.count()

    no_bio_count = (
        model.Person.query.join(model.ConferencePerson)
        .filter(model.ConferencePerson.bio.is_(None))
        .group_by(model.Person)
        .count()
    )

    with_bio = (
        model.Person.query.join(model.ConferencePerson)
        .group_by(model.Person)
        .filter(model.ConferencePerson.bio.isnot(None))
    )
    one_bio_count = with_bio.having(func.count() == 1).count()
    multiple_bio = with_bio.having(func.count() > 1)

    shorter_recent_bio = []
    for person in multiple_bio:
        # Newest conference first.
        bio_with_date = sorted(
            [
                (cp.conference.start, cp.bio)
                for cp in person.conferences_association
                if cp.bio
            ],
            reverse=True,
        )

        if len(bio_with_date) < 2:
            continue  # only one non-empty bio, nothing to compare against

        len_recent_bio = len(bio_with_date[0][1])
        longest = max(len(bio) for _start, bio in bio_with_date[1:])

        if longest > len_recent_bio * 2:
            shorter_recent_bio.append((person, len_recent_bio, longest))

    return flask.render_template(
        "reports.html",
        event_count=event_count,
        speaker_count=speaker_count,
        no_bio_count=no_bio_count,
        one_bio_count=one_bio_count,
        multiple_bio_count=multiple_bio.count(),
        shorter_recent_bio=shorter_recent_bio,
        missing_event_date_count=missing_event_date.count(),
        missing_event_date=missing_event_date.order_by(model.Event.title),
        no_venue_count=model.Conference.query.filter(
            model.Conference.venue_id.is_(None), model.Conference.online.isnot(True)
        ).count(),
    )


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5002)