Move code around.

This commit is contained in:
Edward Betts 2023-09-25 15:14:05 +01:00
parent 73e694d373
commit 7760ed0b58
2 changed files with 123 additions and 121 deletions

59
confarchive/query.py Normal file
View file

@ -0,0 +1,59 @@
"""Database queries."""
import typing
from sqlalchemy.orm.query import Query
from sqlalchemy.engine.cursor import CursorResult
from sqlalchemy import func
from . import database, model
def top_speakers() -> Query:
"""Find people who spoke at the most conferences."""
q: Query = (
database.session.query(model.Person, func.count())
.join(model.ConferencePerson)
.filter(model.Person.id != 1046) # FOSDEM Staff
.group_by(model.Person)
.order_by(func.count().desc(), model.Person.name)
)
return q
def top_events() -> Query:
"""Most common titles of events."""
q: Query = (
database.session.query(model.Event.title, func.count())
.group_by(model.Event.title)
.order_by(func.count().desc())
.having(func.count() > 3)
)
return q
def search_for_events(search_for: str) -> Query:
"""Search for events with by title."""
q: Query = model.Event.query.filter(
model.Event.title.ilike(f"%{search_for}%")
).order_by(model.Event.title)
return q
def search_for_people(search_for: str) -> Query:
"""Search for people by name."""
q: Query = model.Person.query.filter(
model.Person.name.ilike(f"%{search_for}%")
).order_by(model.Person.name)
return q
def speaker_counts() -> CursorResult:
"""Speaker/conference frequency distribution."""
sql = """
select num, count(*)
from (select person_id, count(*) as num from conference_person group by person_id) a
group by num
order by num
"""
return typing.cast(CursorResult, database.session.execute(sql))

185
main.py
View file

@ -3,11 +3,10 @@
import os
import flask
import sqlalchemy
from sqlalchemy import func, or_, update
from werkzeug.wrappers import Response
from confarchive import database, model, wikidata
from confarchive import database, model, wikidata, query
app = flask.Flask(__name__)
app.debug = True
@ -16,41 +15,6 @@ app.config.from_object("config.default")
database.init_app(app)
def top_speakers() -> sqlalchemy.orm.query.Query:
q = (
database.session.query(model.Person, func.count())
.join(model.ConferencePerson)
.filter(model.Person.id != 1046) # FOSDEM Staff
.group_by(model.Person)
.order_by(func.count().desc(), model.Person.name)
.having(func.count() > 4)
)
return q
def top_speakers2() -> sqlalchemy.orm.query.Query:
q = (
database.session.query(model.Person, func.count())
.join(model.ConferencePerson)
.filter(model.Person.name.like("% %"))
.group_by(model.Person)
.order_by(func.count().desc())
.having(func.count() > 2)
)
# .order_by(func.length(model.Person.name).desc())
return q
def top_events() -> sqlalchemy.orm.query.Query:
q = (
database.session.query(model.Event.title, func.count())
.group_by(model.Event.title)
.order_by(func.count().desc())
.having(func.count() > 3)
)
return q
def drop_start(s: str, start: str) -> str:
"""Remove text from the start of a string."""
return s[len(start) :] if s.startswith(start) else s
@ -162,85 +126,73 @@ def search_people() -> str:
@app.route("/merge", methods=["GET", "POST"])
def merge() -> str | Response:
assert app.config["ADMIN_MODE"]
"""Merge speakers."""
assert check_admin_mode()
if flask.request.method == "POST":
search_for = flask.request.form["q"]
item_ids_str = flask.request.form.getlist("person_id")
item_ids: list[int] = [int(i) for i in item_ids_str]
merge_to_id = min(item_ids)
other_ids = [i for i in item_ids if i != merge_to_id]
name_from_person_id = flask.request.form["name"]
print(other_ids, "->", merge_to_id)
with database.session.begin():
if merge_to_id != name_from_person_id:
merge_to = model.Person.query.get(merge_to_id)
name_from_person = model.Person.query.get(name_from_person_id)
merge_to.name = name_from_person.name
print("update ConferencePerson")
database.session.execute(
update(model.ConferencePerson)
.where(model.ConferencePerson.person_id.in_(other_ids))
.values(person_id=merge_to_id)
)
print("update EventPerson")
database.session.execute(
update(model.EventPerson)
.where(model.EventPerson.person_id.in_(other_ids))
.values(person_id=merge_to_id)
)
print("delete people")
for person_id in other_ids:
item = model.Person.query.get(person_id)
database.session.delete(item)
endpoint = flask.request.endpoint
assert endpoint
return flask.redirect(flask.url_for(endpoint, q=search_for))
else:
if flask.request.method == "GET":
search_for = flask.request.args["q"]
assert search_for
search_for = search_for.strip()
q = query.search_for_people(search_for)
return flask.render_template("merge_people.html", q=q, search_for=search_for)
assert search_for
search_for = search_for.strip()
q = model.Person.query.filter(model.Person.name.ilike(f"%{search_for}%")).order_by(
model.Person.name
)
return flask.render_template("merge_people.html", q=q, search_for=search_for)
assert flask.request.method == "POST"
search_for = flask.request.form["q"]
item_ids_str = flask.request.form.getlist("person_id")
item_ids: list[int] = [int(i) for i in item_ids_str]
merge_to_id: int = min(item_ids)
other_ids = [i for i in item_ids if i != merge_to_id]
name_from_person_id = int(flask.request.form["name"])
print(other_ids, "->", merge_to_id)
with database.session.begin():
if merge_to_id != name_from_person_id:
merge_to = model.Person.query.get(merge_to_id)
name_from_person = model.Person.query.get(name_from_person_id)
merge_to.name = name_from_person.name
print("update ConferencePerson")
database.session.execute(
update(model.ConferencePerson)
.where(model.ConferencePerson.person_id.in_(other_ids))
.values(person_id=merge_to_id)
)
print("update EventPerson")
database.session.execute(
update(model.EventPerson)
.where(model.EventPerson.person_id.in_(other_ids))
.values(person_id=merge_to_id)
)
print("delete people")
for person_id in other_ids:
item = model.Person.query.get(person_id)
database.session.delete(item)
endpoint = flask.request.endpoint
assert endpoint
return flask.redirect(flask.url_for(endpoint, q=search_for))
@app.route("/events")
def events_page() -> str:
"""Events page."""
search_for = flask.request.args.get("q")
if not search_for:
return flask.render_template("top_events.html", top_events=top_events())
q = model.Event.query.filter(model.Event.title.ilike(f"%{search_for}%")).order_by(
model.Event.title
)
return flask.render_template("search_events.html", q=q, search_for=search_for)
if search_for:
q = query.search_for_events(search_for)
return flask.render_template("search_events.html", q=q, search_for=search_for)
else:
return flask.render_template("top_events.html", top_events=query.top_events())
@app.route("/")
def index() -> str:
"""Start page."""
if False:
q = (
model.Conference.query.order_by(model.Conference.start.desc())
.add_columns(
func.count(model.Event.id), func.count(model.ConferencePerson.person_id)
)
.group_by(model.Conference)
)
q = model.Conference.query.order_by(model.Conference.start.desc())
count = {
@ -258,17 +210,6 @@ def plural(num: int, label: str) -> str:
return f'{num:,d} {label}{"s" if num != 1 else ""}'
def speaker_counts():
sql = """
select num, count(*)
from (select person_id, count(*) as num from conference_person group by person_id) a
group by num
order by num
"""
return database.session.execute(sql)
@app.route("/series")
def list_series() -> str:
"""Page showing list of conference series."""
@ -279,7 +220,7 @@ def list_series() -> str:
@app.route("/speakers")
def top_speakers_page() -> str:
top = top_speakers()
top = query.top_speakers().having(func.count() > 4)
"""Top speakers page."""
photos = []
@ -298,7 +239,7 @@ def top_speakers_page() -> str:
return flask.render_template(
"top_speakers.html",
top_speakers=top,
speaker_counts=speaker_counts(),
speaker_counts=query.speaker_counts(),
plural=plural,
person_image_filename=person_image_filename,
# photo_person_ids=photo_person_ids,
@ -335,11 +276,13 @@ def add_venue(city_id: int) -> str | Response:
@app.route("/wikidata")
def link_to_wikidata() -> str:
items = []
for person, num in top_speakers2():
if person.wikidata_qid:
continue
q = person.name + " haswbstatement:P31=Q5"
search_hits = wikidata.search(q)
top = (
query.top_speakers()
.filter(model.Person.name.like("% %"), model.Person.wikidata_qid.is_(None))
.having(func.count() > 2)
)
for person, num in top:
search_hits = wikidata.search(person.name + " haswbstatement:P31=Q5")
if not search_hits:
continue