281 lines
8 KiB
Python
Executable file
281 lines
8 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import time
|
|
import typing
|
|
from typing import cast
|
|
|
|
import flask
|
|
import requests
|
|
import sqlalchemy
|
|
from sqlalchemy import func, update
|
|
from werkzeug.wrappers import Response
|
|
|
|
from confarchive import database, model
|
|
|
|
# Flask application object shared by every route in this module.
app = flask.Flask(__name__)
# NOTE(review): debug mode is hard-coded on here; it is set before the config
# object is loaded, so a DEBUG value in config.default (if any) would override it.
app.debug = True

# Load settings from the config.default object, then bind the database layer
# to the application.
app.config.from_object("config.default")
database.init_app(app)

# MediaWiki API endpoint used by the Wikidata helper functions in this file.
wikidata_api = "https://www.wikidata.org/w/api.php"
|
|
|
|
|
|
def md5sum(s: str) -> str:
    """Return the hexadecimal MD5 digest of *s* encoded as UTF-8.

    In this file the digest is only used to build cache file names, so the
    hash is requested with ``usedforsecurity=False``; that keeps the function
    usable on Python builds where MD5 is blocked for security purposes.
    """
    return hashlib.md5(s.encode("utf-8"), usedforsecurity=False).hexdigest()
|
|
|
|
|
|
def wikidata_search(q: str) -> list[dict[str, typing.Any]]:
    """Run a Wikidata full-text search for *q* and return the search hits.

    The query is extended with ``haswbstatement:P31=Q5`` (Wikidata's
    "instance of: human") before it is sent. Responses are cached on disk
    under ``cache/<md5 of the query>.json``; a cached response is returned
    without touching the network.

    Raises:
        requests.RequestException: the HTTP request failed, timed out or
            returned an error status.
        KeyError: the response JSON has no ``query.search`` entry.
    """
    q += " haswbstatement:P31=Q5"
    cache_filename = os.path.join("cache", md5sum(q) + ".json")

    if os.path.exists(cache_filename):
        with open(cache_filename, encoding="utf-8") as f:
            data = json.load(f)
        return cast(list[dict[str, typing.Any]], data["query"]["search"])

    params: dict[str, str | int] = {
        "action": "query",
        "list": "search",
        "format": "json",
        "formatversion": 2,
        "srsearch": q,
        "srlimit": "max",
    }
    r = requests.get(wikidata_api, params=params, timeout=30)
    r.raise_for_status()
    data = r.json()
    # Extract the hits before caching so that an error payload (no "query"
    # key) raises here and is never written to the cache.
    hits = cast(list[dict[str, typing.Any]], data["query"]["search"])

    os.makedirs("cache", exist_ok=True)
    with open(cache_filename, "w", encoding="utf-8") as f:
        f.write(r.text)

    # Pause after every real API call to stay polite to the Wikidata servers.
    time.sleep(1)
    return hits
|
|
|
|
|
|
def wikidata_get_item(qid: str) -> typing.Any:
    """Return the Wikidata entity for *qid*, using an on-disk cache.

    Entities are cached as ``items/<qid>.json``. On a cache miss the entity
    is fetched with the ``wbgetentities`` API action, written to the cache
    and returned.

    Raises:
        requests.RequestException: the HTTP request failed, timed out or
            returned an error status.
        KeyError: the response does not contain an entity for *qid*.
    """
    cache_filename = os.path.join("items", qid + ".json")
    if os.path.exists(cache_filename):
        with open(cache_filename, encoding="utf-8") as f:
            return json.load(f)

    params: dict[str, str | int] = {
        "action": "wbgetentities",
        "ids": qid,
        "format": "json",
        "formatversion": 2,
    }
    r = requests.get(wikidata_api, params=params, timeout=30)
    r.raise_for_status()
    item = r.json()["entities"][qid]

    os.makedirs("items", exist_ok=True)
    with open(cache_filename, "w", encoding="utf-8") as f:
        json.dump(item, f, indent=2)

    # Short pause after every real API call to avoid hammering the server.
    time.sleep(0.1)
    return item
|
|
|
|
|
|
def top_speakers() -> sqlalchemy.orm.query.Query:
    """Build the query of people with more than five conference links.

    Each row is a ``(Person, count)`` pair, where the count is the number of
    joined ConferencePerson rows; rows are ordered by that count, highest
    first.
    """
    return (
        database.session.query(model.Person, func.count())
        .join(model.ConferencePerson)
        .group_by(model.Person)
        .having(func.count() > 5)
        .order_by(func.count().desc())
    )
|
|
|
|
|
|
def top_speakers2() -> sqlalchemy.orm.query.Query:
    """Build the query of multi-word-named people with more than two links.

    Like ``top_speakers`` but restricted to people whose name contains a
    space, and with the threshold lowered to more than two joined
    ConferencePerson rows. Rows are ``(Person, count)`` pairs ordered by
    count, highest first.
    """
    # An alternative ordering by name length was previously sketched here
    # but is not in use.
    return (
        database.session.query(model.Person, func.count())
        .join(model.ConferencePerson)
        .filter(model.Person.name.like("% %"))
        .group_by(model.Person)
        .having(func.count() > 2)
        .order_by(func.count().desc())
    )
|
|
|
|
|
|
def top_events() -> sqlalchemy.orm.query.Query:
    """Build the query of event titles that occur more than five times.

    Each row is a ``(title, count)`` pair; rows are ordered by count,
    highest first.
    """
    return (
        database.session.query(model.Event.title, func.count())
        .group_by(model.Event.title)
        .having(func.count() > 5)
        .order_by(func.count().desc())
    )
|
|
|
|
|
|
@app.route("/person/<int:person_id>", methods=["GET", "POST"])
def person(person_id: int) -> str | Response:
    """Show a person, or update their name and Wikidata QID on POST.

    Responds with 404 when no person has the given id. A successful POST
    commits the change and redirects back to this page.
    """
    item = model.Person.query.get(person_id)
    if item is None:
        # Same handling as conference_page: unknown records are a 404,
        # not an AttributeError further down.
        flask.abort(404)

    if flask.request.method == "POST":
        # Read both fields before touching the record so a missing form
        # field cannot leave it half-updated in the session.
        wikidata_qid = flask.request.form["wikidata_qid"] or None
        name = flask.request.form["name"]
        item.wikidata_qid = wikidata_qid
        item.name = name
        database.session.commit()

        assert flask.request.endpoint
        return flask.redirect(
            flask.url_for(flask.request.endpoint, person_id=person_id)
        )

    return flask.render_template("person.html", item=item, Event=model.Event)
|
|
|
|
|
|
@app.route("/event/<int:event_id>")
def event_page(event_id: int) -> str:
    """Show the page for one event; responds with 404 for an unknown id."""
    item = model.Event.query.get(event_id)
    if item is None:
        # Same handling as conference_page: do not render the template
        # with a missing record.
        flask.abort(404)
    return flask.render_template("event.html", item=item)
|
|
|
|
|
|
@app.route("/conference/<short_name>")
def conference_page(short_name: str) -> str:
    """Show the conference with the given short name, or respond with 404."""
    conference = model.Conference.query.filter_by(
        short_name=short_name
    ).one_or_none()
    if conference is not None:
        return flask.render_template("conference.html", item=conference)
    flask.abort(404)
|
|
|
|
|
|
@app.route("/people")
def search_people() -> str:
    """List people whose name contains the ``q`` query parameter.

    The match is case-insensitive and results are ordered by name. An empty
    ``q`` is rejected with 400.
    """
    search_for = flask.request.args["q"]
    if not search_for:
        # Explicit client error instead of an assert, which would surface
        # as a 500 and is skipped entirely when Python runs with -O.
        flask.abort(400)

    search_for = search_for.strip()
    q = model.Person.query.filter(
        model.Person.name.ilike(f"%{search_for}%")
    ).order_by(model.Person.name)
    return flask.render_template("search_people.html", q=q, search_for=search_for)
|
|
|
|
|
|
@app.route("/merge", methods=["GET", "POST"])
def merge() -> str | Response:
    """Search for people (GET) or merge the selected people (POST).

    On POST the submitted ``person_id`` values are merged into the lowest
    id: ConferencePerson and EventPerson rows of the other people are
    re-pointed at it and the other Person rows are deleted, after which the
    client is redirected back to the search for ``q``.

    Responds with 400 when ``q`` is empty on GET, or when the POST contains
    no ``person_id`` or one that is not an integer.
    """
    if flask.request.method == "POST":
        search_for = flask.request.form["q"]

        try:
            item_ids = [int(i) for i in flask.request.form.getlist("person_id")]
        except ValueError:
            flask.abort(400)
        if not item_ids:
            # min() below would raise on an empty selection.
            flask.abort(400)

        merge_to_id = min(item_ids)
        other_ids = [i for i in item_ids if i != merge_to_id]

        print(other_ids, "->", merge_to_id)

        # Nothing to do when only one distinct person was selected.
        if other_ids:
            with database.session.begin():
                print("update ConferencePerson")
                database.session.execute(
                    update(model.ConferencePerson)
                    .where(model.ConferencePerson.person_id.in_(other_ids))
                    .values(person_id=merge_to_id)
                )

                print("update EventPerson")
                database.session.execute(
                    update(model.EventPerson)
                    .where(model.EventPerson.person_id.in_(other_ids))
                    .values(person_id=merge_to_id)
                )

                print("delete people")
                for person_id in other_ids:
                    item = model.Person.query.get(person_id)
                    # Skip ids that no longer exist; delete(None) would raise.
                    if item is not None:
                        database.session.delete(item)

        endpoint = flask.request.endpoint
        assert endpoint
        return flask.redirect(flask.url_for(endpoint, q=search_for))

    search_for = flask.request.args["q"]
    if not search_for:
        # Explicit client error instead of an assert (500, and skipped
        # under -O).
        flask.abort(400)

    search_for = search_for.strip()
    q = model.Person.query.filter(
        model.Person.name.ilike(f"%{search_for}%")
    ).order_by(model.Person.name)
    return flask.render_template("merge_people.html", q=q, search_for=search_for)
|
|
|
|
|
|
@app.route("/events")
def events_page() -> str:
    """Show event search results, or the top events when ``q`` is absent.

    With a non-empty ``q`` the page lists events whose title contains it
    (case-insensitive), ordered by title.
    """
    search_for = flask.request.args.get("q")
    if search_for:
        pattern = f"%{search_for}%"
        matches = model.Event.query.filter(
            model.Event.title.ilike(pattern)
        ).order_by(model.Event.title)
        return flask.render_template(
            "search_events.html", q=matches, search_for=search_for
        )

    return flask.render_template("top_events.html", top_events=top_events())
|
|
|
|
|
|
@app.route("/")
def index() -> str:
    """Start page.

    Lists conferences, most recent start first, together with the total
    number of conferences, events and people.
    """
    q = model.Conference.query.order_by(model.Conference.start.desc())

    count = {
        "conference": model.Conference.query.count(),
        "event": model.Event.query.count(),
        "person": model.Person.query.count(),
    }

    return flask.render_template("index.html", items=q, count=count)
|
|
|
|
|
|
@app.route("/speakers")
def top_speakers_page() -> str:
    """Render the top speakers page from the ``top_speakers`` query."""
    speakers = top_speakers()
    return flask.render_template("top_speakers.html", top_speakers=speakers)
|
|
|
|
|
|
@app.route("/wikidata")
def link_to_wikidata() -> str:
    """Show candidate Wikidata items for speakers that have no QID yet.

    For every person from ``top_speakers2`` without a ``wikidata_qid``, the
    quoted name is searched on Wikidata. People with no hits, or with more
    than ten, are left out; for the rest each hit's English label and
    description are collected for the template.
    """
    candidates = []
    for person, num in top_speakers2():
        if person.wikidata_qid:
            # Already linked.
            continue

        search_hits = wikidata_search(f'"{person.name}"')
        if not search_hits or len(search_hits) > 10:
            continue

        hits = []
        for search_hit in search_hits:
            qid = search_hit["title"]
            entity = wikidata_get_item(qid)

            labels = entity["labels"]
            label = labels["en"]["value"] if "en" in labels else "[no english label]"

            descriptions = entity["descriptions"]
            description = (
                descriptions["en"]["value"]
                if "en" in descriptions
                else "[no english description]"
            )

            hits.append(dict(qid=qid, label=label, description=description))

        candidates.append((person, num, hits))

    return flask.render_template("wikidata.html", items=candidates)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the built-in server on port 5002, listening on all interfaces.
    # NOTE(review): app.debug is set to True earlier in this file; with debug
    # on, binding to 0.0.0.0 presumably exposes the interactive debugger to
    # other hosts — confirm this is only run on a trusted network.
    app.run(port=5002, host="0.0.0.0")
|