conference-archive/main.py
2023-09-21 04:59:17 +01:00

388 lines
11 KiB
Python
Executable file

#!/usr/bin/python3
import hashlib
import json
import os
import time
import typing
from typing import cast
import flask
import requests
import sqlalchemy
from sqlalchemy import func, or_, update
from werkzeug.wrappers import Response
from confarchive import database, model
# Flask application object; every route below is registered on it.
app = flask.Flask(__name__)
# NOTE(review): debug mode is switched on unconditionally here — confirm this
# module is never used as a production entry point.
app.debug = True
# Load settings from the "config.default" import path.
app.config.from_object("config.default")
# Hand the app to the project's database module for initialisation.
database.init_app(app)
# Endpoint used for the Wikidata API requests made in this module.
wikidata_api = "https://www.wikidata.org/w/api.php"
def md5sum(s: str) -> str:
    """Return the hexadecimal MD5 digest of *s* encoded as UTF-8.

    The digest is used as a cache filename, not as a security measure, so
    the hash is created with ``usedforsecurity=False``. That keeps the call
    working on Python builds where MD5 is disabled for security purposes.
    """
    return hashlib.md5(s.encode("utf-8"), usedforsecurity=False).hexdigest()
def wikidata_search(q: str) -> list[dict[str, typing.Any]]:
    """Search Wikidata for humans matching *q*, caching the response on disk.

    ``haswbstatement:P31=Q5`` is appended to the query so that only items
    that are an instance of (P31) human (Q5) are returned. Responses are
    cached under ``cache/<md5 of the full query>.json``; a cached response is
    returned without touching the network.

    Returns the ``query.search`` list from the API response.

    Raises ``requests.HTTPError`` when the API answers with an error status.
    """
    q += " haswbstatement:P31=Q5"
    cache_filename = os.path.join("cache", md5sum(q) + ".json")

    if os.path.exists(cache_filename):
        with open(cache_filename, encoding="utf-8") as f:
            data = json.load(f)
    else:
        params: dict[str, str | int] = {
            "action": "query",
            "list": "search",
            "format": "json",
            "formatversion": 2,
            "srsearch": q,
            "srlimit": "max",
        }
        r = requests.get(wikidata_api, params=params, timeout=30)
        r.raise_for_status()
        # Parse before caching so an unparseable body is never written to
        # the cache and replayed on every later call.
        data = r.json()
        os.makedirs(os.path.dirname(cache_filename), exist_ok=True)
        with open(cache_filename, "w", encoding="utf-8") as f:
            f.write(r.text)
        # Be polite to the API: pause after every real network request.
        time.sleep(1)

    return cast(list[dict[str, typing.Any]], data["query"]["search"])
def wikidata_get_item(qid: str) -> typing.Any:
    """Return the Wikidata entity for *qid*, caching it on disk.

    Entities are cached as ``items/<qid>.json``. On a cache miss the QID is
    printed, the entity is fetched with ``wbgetentities``, written to the
    cache, and the function pauses briefly before returning.

    Raises ``requests.HTTPError`` when the API answers with an error status.
    """
    cache_filename = os.path.join("items", qid + ".json")

    if os.path.exists(cache_filename):
        with open(cache_filename, encoding="utf-8") as f:
            return json.load(f)

    print(qid)
    params: dict[str, str | int] = {
        "action": "wbgetentities",
        "ids": qid,
        "format": "json",
        "formatversion": 2,
    }
    r = requests.get(wikidata_api, params=params, timeout=30)
    r.raise_for_status()
    item = r.json()["entities"][qid]

    os.makedirs(os.path.dirname(cache_filename), exist_ok=True)
    with open(cache_filename, "w", encoding="utf-8") as f:
        json.dump(item, f, indent=2)
    # Short pause after each real network request.
    time.sleep(0.1)

    return item
def top_speakers() -> sqlalchemy.orm.query.Query:
    """Build the query for people with more than three conference links.

    Each result row is a ``(Person, count)`` pair, where the count is the
    number of joined ``ConferencePerson`` rows. Rows are ordered by that
    count, largest first.
    """
    people_with_counts = database.session.query(model.Person, func.count())
    grouped = people_with_counts.join(model.ConferencePerson).group_by(model.Person)
    return grouped.order_by(func.count().desc()).having(func.count() > 3)
def top_speakers2() -> sqlalchemy.orm.query.Query:
    """Build the query for multi-word-named people with over two conference links.

    Works like ``top_speakers`` but only considers people whose name
    contains a space, and uses a lower threshold (more than two joined
    ``ConferencePerson`` rows). Each row is a ``(Person, count)`` pair ordered
    by count, largest first.
    """
    people_with_counts = database.session.query(model.Person, func.count())
    candidates = people_with_counts.join(model.ConferencePerson).filter(
        model.Person.name.like("% %")
    )
    grouped = candidates.group_by(model.Person)
    # An alternative ordering by name length was left disabled by the author.
    return grouped.order_by(func.count().desc()).having(func.count() > 2)
def top_events() -> sqlalchemy.orm.query.Query:
    """Build the query for event titles that occur more than three times.

    Each result row is a ``(title, count)`` pair ordered by count, largest
    first.
    """
    titles_with_counts = database.session.query(model.Event.title, func.count())
    grouped = titles_with_counts.group_by(model.Event.title)
    return grouped.order_by(func.count().desc()).having(func.count() > 3)
@app.route("/person/<int:person_id>", methods=["GET", "POST"])
def person(person_id: int) -> str | Response:
    """Show a person page, or update the person from a POSTed form.

    On POST the ``wikidata_qid`` (an empty value is stored as ``None``) and
    ``name`` form fields are saved and the client is redirected back to this
    page. On GET the ``person.html`` template is rendered.

    Responds with 404 when no person has the given id.
    """
    item = model.Person.query.get(person_id)
    if item is None:
        # Without this guard an unknown id fails with AttributeError on POST.
        flask.abort(404)

    if flask.request.method == "POST":
        item.wikidata_qid = flask.request.form["wikidata_qid"] or None
        item.name = flask.request.form["name"]
        database.session.commit()
        assert flask.request.endpoint
        return flask.redirect(
            flask.url_for(flask.request.endpoint, person_id=person_id)
        )

    return flask.render_template(
        "person.html", item=item, Event=model.Event, plural=plural
    )
@app.route("/event/<int:event_id>")
def event_page(event_id: int) -> str:
    """Render the page for a single event.

    Responds with 404 when no event has the given id.
    """
    item = model.Event.query.get(event_id)
    if item is None:
        flask.abort(404)
    return flask.render_template("event.html", item=item)
@app.route("/conference/<short_name>")
def conference_page(short_name: str) -> str:
    """Render the page for the conference with the given short name.

    Responds with 404 when no conference matches.
    """
    lookup = model.Conference.query.filter_by(short_name=short_name)
    conference = lookup.one_or_none()
    if conference is None:
        flask.abort(404)

    # The template receives the image-path helper so it can call it itself.
    return flask.render_template(
        "conference.html",
        item=conference,
        person_image_filename=person_image_filename,
    )
@app.route("/people")
def search_people() -> str:
    """Search people by a case-insensitive substring of their name.

    Reads the required ``q`` query parameter, strips surrounding whitespace
    and renders ``search_people.html`` with the matching people ordered by
    name.

    Responds with 400 when ``q`` is missing or empty.
    """
    search_for = flask.request.args["q"]
    if not search_for:
        # An explicit 400 replaces the assert, which produced a 500 and is
        # skipped entirely when Python runs with -O.
        flask.abort(400)

    search_for = search_for.strip()
    q = model.Person.query.filter(model.Person.name.ilike(f"%{search_for}%")).order_by(
        model.Person.name
    )
    return flask.render_template("search_people.html", q=q, search_for=search_for)
@app.route("/merge", methods=["GET", "POST"])
def merge() -> str | Response:
    """Search for people to merge (GET) or merge the selected people (POST).

    POST form fields:
      * ``q`` - the search string, used for the redirect afterwards.
      * ``person_id`` - repeated; ids of the people to merge.
      * ``name`` - id of the person whose name the merged record keeps.

    The person with the lowest id survives. Its name is replaced when the
    ``name`` field points at a different person. ``ConferencePerson`` and
    ``EventPerson`` rows of the other people are re-pointed at the survivor,
    and the other people are deleted, all inside one transaction.

    GET renders ``merge_people.html`` with people whose name contains ``q``.

    Responds with 400 for a missing/empty ``q`` on GET, for non-numeric ids
    or when no ``person_id`` was submitted, and with 404 when a person needed
    for the rename does not exist.
    """
    if flask.request.method == "POST":
        search_for = flask.request.form["q"]

        try:
            item_ids: list[int] = [
                int(i) for i in flask.request.form.getlist("person_id")
            ]
            # Converted to int so the comparison with merge_to_id below is
            # meaningful; comparing the raw form string was always unequal.
            name_from_person_id = int(flask.request.form["name"])
        except ValueError:
            flask.abort(400)
        if not item_ids:
            # min() on an empty list would raise ValueError.
            flask.abort(400)

        merge_to_id = min(item_ids)
        other_ids = [i for i in item_ids if i != merge_to_id]

        print(other_ids, "->", merge_to_id)

        with database.session.begin():
            if merge_to_id != name_from_person_id:
                merge_to = model.Person.query.get(merge_to_id)
                name_from_person = model.Person.query.get(name_from_person_id)
                if merge_to is None or name_from_person is None:
                    # Raising inside the block rolls the transaction back.
                    flask.abort(404)
                merge_to.name = name_from_person.name

            print("update ConferencePerson")
            database.session.execute(
                update(model.ConferencePerson)
                .where(model.ConferencePerson.person_id.in_(other_ids))
                .values(person_id=merge_to_id)
            )

            print("update EventPerson")
            database.session.execute(
                update(model.EventPerson)
                .where(model.EventPerson.person_id.in_(other_ids))
                .values(person_id=merge_to_id)
            )

            print("delete people")
            for person_id in other_ids:
                item = model.Person.query.get(person_id)
                if item is not None:
                    database.session.delete(item)

        endpoint = flask.request.endpoint
        assert endpoint
        return flask.redirect(flask.url_for(endpoint, q=search_for))

    search_for = flask.request.args["q"]
    if not search_for:
        flask.abort(400)

    search_for = search_for.strip()
    q = model.Person.query.filter(model.Person.name.ilike(f"%{search_for}%")).order_by(
        model.Person.name
    )
    return flask.render_template("merge_people.html", q=q, search_for=search_for)
@app.route("/events")
def events_page() -> str:
    """Show events whose title matches ``q``, or the most common titles.

    With a non-empty ``q`` query parameter, ``search_events.html`` is
    rendered with events whose title contains it (case-insensitive), ordered
    by title. Otherwise ``top_events.html`` is rendered.
    """
    search_for = flask.request.args.get("q")
    if search_for:
        title_pattern = f"%{search_for}%"
        matches = model.Event.query.filter(
            model.Event.title.ilike(title_pattern)
        ).order_by(model.Event.title)
        return flask.render_template(
            "search_events.html", q=matches, search_for=search_for
        )

    return flask.render_template("top_events.html", top_events=top_events())
@app.route("/")
def index() -> str:
    """Start page.

    Renders ``index.html`` with every conference ordered by start date
    (newest first) and the row counts for conferences, events, people,
    countries and venues.
    """
    q = model.Conference.query.order_by(model.Conference.start.desc())

    count = {
        "conference": model.Conference.query.count(),
        "event": model.Event.query.count(),
        "person": model.Person.query.count(),
        "country": model.Country.query.count(),
        "venue": model.Venue.query.count(),
    }

    return flask.render_template("index.html", items=q, count=count)
def plural(num: int, label: str) -> str:
    """Format *num* with thousands separators followed by *label*.

    An ``s`` is appended to the label unless *num* is exactly 1.
    """
    suffix = "" if num == 1 else "s"
    return f"{num:,d} {label}{suffix}"
def speaker_counts():
    """Return how many people appear at each number of conferences.

    Executes a raw SQL query over ``conference_person``. Each result row is
    ``(num, count)``: ``count`` people have exactly ``num`` rows in that
    table. Rows are ordered by ``num``.
    """
    sql = """
        select num, count(*)
        from (select person_id, count(*) as num from conference_person group by person_id) a
        group by num
        order by num
    """
    # Textual SQL must be wrapped in text(); SQLAlchemy 2.0 rejects a plain
    # string passed to Session.execute().
    return database.session.execute(sqlalchemy.text(sql))
@app.route("/speakers")
def top_speakers_page() -> str:
    """Top speakers page."""
    # Template context: the ranked speakers, the distribution of conference
    # counts, and the pluralising helper for use inside the template.
    context = {
        "top_speakers": top_speakers(),
        "speaker_counts": speaker_counts(),
        "plural": plural,
    }
    return flask.render_template("top_speakers.html", **context)
@app.route("/country")
def country_list() -> str:
    """Country list."""
    # The unevaluated query is handed to the template as ``items``.
    countries = model.Country.query
    return flask.render_template("country_list.html", items=countries)
@app.route("/city/<int:city_id>/venue/new", methods=["GET", "POST"])
def add_venue(city_id: int) -> str | Response:
    """Add new venue.

    GET renders the ``add_venue.html`` form for the city. POST creates a
    venue in that city from the ``name`` and ``wikidata_qid`` form fields,
    commits it and redirects back to this page.

    Responds with 404 when no city has the given id.
    """
    city = model.City.query.get(city_id)
    if city is None:
        flask.abort(404)

    if flask.request.method != "POST":
        return flask.render_template("add_venue.html", city=city)

    name = flask.request.form["name"]
    wikidata_qid = flask.request.form["wikidata_qid"]

    venue = model.Venue(name=name, city=city, wikidata_qid=wikidata_qid)
    database.session.add(venue)
    database.session.commit()

    # The endpoint lives on the request object (``flask.endpoint`` does not
    # exist), and this route needs ``city_id`` to build its URL.
    endpoint = flask.request.endpoint
    assert endpoint
    return flask.redirect(flask.url_for(endpoint, city_id=city_id))
@app.route("/wikidata")
def link_to_wikidata() -> str:
    """List Wikidata candidates for frequent speakers that lack a QID.

    For every person from ``top_speakers2`` without a ``wikidata_qid``, a
    Wikidata search is run on the person's name. People with no hits or more
    than 14 hits are skipped. For the rest, each hit's English label and
    description are looked up (with placeholders when missing) and the
    ``(person, num, hits)`` tuples are passed to ``wikidata.html``.
    """
    items = []
    for person, num in top_speakers2():
        if person.wikidata_qid:
            continue

        search_hits = wikidata_search(person.name)
        # Nothing to offer, or too many candidates to be useful.
        if not search_hits or len(search_hits) > 14:
            continue

        hits = []
        for search_hit in search_hits:
            qid = search_hit["title"]
            entity = wikidata_get_item(qid)

            labels = entity["labels"]
            label = labels["en"]["value"] if "en" in labels else "[no english label]"

            descriptions = entity["descriptions"]
            description = (
                descriptions["en"]["value"]
                if "en" in descriptions
                else "[no english description]"
            )

            hits.append({"qid": qid, "label": label, "description": description})

        items.append((person, num, hits))

    return flask.render_template("wikidata.html", items=items)
@app.route("/search")
def search_everything() -> str:
    """Search people by name and events by abstract or description.

    Reads the optional ``q`` query parameter. When it is missing or blank
    the bare ``search_everything.html`` form is rendered. Otherwise the
    template receives people whose name contains the term (ordered by name)
    and events whose abstract or description contains it (ordered by event
    date); both matches are case-insensitive.
    """
    # .get() so that a request without ``q`` reaches the empty-form branch
    # instead of failing with 400; strip first so blank input counts as empty.
    search_for = flask.request.args.get("q", "").strip()
    if not search_for:
        return flask.render_template("search_everything.html")

    like = f"%{search_for}%"

    people = model.Person.query.filter(model.Person.name.ilike(like)).order_by(
        model.Person.name
    )

    events = model.Event.query.filter(
        or_(model.Event.abstract.ilike(like), model.Event.description.ilike(like))
    ).order_by(model.Event.event_date)

    return flask.render_template(
        "search_everything.html", people=people, events=events, search_for=search_for
    )
@app.route("/person/<int:person_id>/delete", methods=["POST"])
def delete_person(person_id: int) -> str | Response:
    """Delete a person together with their conference and event links.

    Every object in the person's ``conferences_association`` and
    ``events_association`` is deleted first, then the person, and the change
    is committed. The client is redirected to the start page.

    Responds with 404 when no person has the given id.
    """
    item = model.Person.query.get(person_id)
    if item is None:
        flask.abort(404)

    for cp in item.conferences_association:
        database.session.delete(cp)

    for ep in item.events_association:
        database.session.delete(ep)

    database.session.delete(item)
    database.session.commit()

    return flask.redirect(flask.url_for("index"))
def person_image_filename(person_id):
    """Return the thumbnail path for a person's first Wikidata photo.

    The path is ``wikidata_photo/thumb/<first entry of wikidata_photo>``.

    NOTE(review): this function used to continue, after an unconditional
    return, with a lookup that preferred a ``face_1_``-prefixed file under
    ``static/wikidata_photo/face_cropped``. That code could never run and has
    been removed; behaviour is unchanged. Reinstate it deliberately if
    face-cropped images should be preferred.
    """
    person = model.Person.query.get(person_id)
    return os.path.join("wikidata_photo", "thumb", person.wikidata_photo[0])
# Start Flask's built-in server when this file is executed as a script.
# NOTE(review): this listens on every interface (0.0.0.0) while the module
# also sets app.debug = True — confirm it is never reachable from untrusted
# networks.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5002)