Initial commit.

This commit is contained in:
Edward Betts 2023-09-13 16:19:08 +05:30
commit 4e5ee195dd
10 changed files with 372 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
__pycache__

28
confarchive/__init__.py Normal file
View file

@ -0,0 +1,28 @@
from dateutil.relativedelta import relativedelta
durations = [
"5 seconds",
"25 seconds",
"2 minutes",
"10 minutes",
"1 hour",
"5 hours",
"1 day",
"5 days",
"25 days",
"4 months",
]
def rd(label):
num, _, unit = label.partition(" ")
if not unit.endswith("s"):
unit += "s"
return relativedelta(**{unit: int(num)})
bins = (
[{"label": None, "delta": None}]
+ [{"label": label, "delta": rd(label)} for label in durations]
+ [{"label": "Never", "delta": None}]
)

30
confarchive/database.py Normal file
View file

@ -0,0 +1,30 @@
import flask
import sqlalchemy
from sqlalchemy import create_engine, func
from sqlalchemy.orm import scoped_session, sessionmaker
session = scoped_session(sessionmaker())
def init_db(db_url: str, echo: bool = False) -> None:
"""Initialise databsae."""
session.configure(bind=get_engine(db_url, echo=echo))
def get_engine(db_url: str, echo: bool = False) -> sqlalchemy.engine.base.Engine:
"""Create an engine object."""
return create_engine(db_url, pool_recycle=3600, echo=echo)
def init_app(app: flask.app.Flask, echo: bool = False) -> None:
"""Initialise database connection within flask app."""
db_url = app.config["DB_URL"]
session.configure(bind=get_engine(db_url, echo=echo))
@app.teardown_appcontext
def shutdown_session(exception: Exception | None = None) -> None:
session.remove()
def now_utc():
return func.timezone("utc", func.now())

101
confarchive/model.py Normal file
View file

@ -0,0 +1,101 @@
"""Database models."""
import sqlalchemy
import sqlalchemy.orm.decl_api
from sqlalchemy import func
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.orderinglist import ordering_list
from sqlalchemy.orm import relationship
from sqlalchemy.schema import Column, ForeignKey
from sqlalchemy.types import Date, DateTime, Integer, String
from .database import session
Base: sqlalchemy.orm.decl_api.DeclarativeMeta = declarative_base()
Base.query = session.query_property()
class TimeStampedModel(Base):
"""Time stamped model."""
__abstract__ = True
created = Column(DateTime, default=func.now())
modified = Column(DateTime, default=func.now(), onupdate=func.now())
class Conference(TimeStampedModel):
"""Conference."""
__tablename__ = "conference"
id = Column(Integer, primary_key=True)
title = Column(String, nullable=False)
start = Column(Date)
end = Column(Date)
days = Column(Integer)
timezone = Column(String)
location = Column(String)
country = Column(String)
acronym = Column(String)
url = Column(String)
schedule_xml_url = Column(String)
class Event(TimeStampedModel):
"""Event."""
__tablename__ = "event"
id = Column(Integer, primary_key=True)
conference_id = Column(Integer, ForeignKey("conference.id"), nullable=False)
event_date = Column(DateTime)
day = Column(Integer)
guid = Column(String)
start = Column(String)
duration = Column(String)
room = Column(String)
track = Column(String)
slug = Column(String)
title = Column(String, nullable=False)
description = Column(String)
event_type = Column(String)
url = Column(String)
conference = relationship("Conference", backref="events")
people_association = relationship(
"EventPerson",
order_by="EventPerson.position",
back_populates="event",
collection_class=ordering_list("position"),
)
people = association_proxy(
"people_association",
"person",
creator=lambda person: EventPerson(person=person),
)
class Person(TimeStampedModel):
"""Person."""
__tablename__ = "person"
id = Column(Integer, primary_key=True)
name = Column(String)
wikidata_qid = Column(String)
gender = Column(String)
events_association = relationship("EventPerson", back_populates="person")
events = association_proxy("event_association", "event")
class EventPerson(Base):
"""Event person."""
__tablename__ = "event_person"
event_id = Column(Integer, ForeignKey("event.id"), primary_key=True)
person_id = Column(Integer, ForeignKey("person.id"), primary_key=True)
position = Column(Integer, nullable=False)
named_as = Column(String)
person = relationship("Person", back_populates="events_association")
event = relationship("Event", back_populates="people_association")

7
config/default.py Normal file
View file

@ -0,0 +1,7 @@
DB_URL = "postgresql:///confarchive"
SECRET_KEY = '67e49de8fa5d1fe0d767076f382f098176ae17721c90836f'
ADMINS = ['edward@4angle.com']
SMTP_HOST = 'localhost'
MAIL_FROM = 'edward@4angle.com'
ADMIN_EMAIL = 'edward@4angle.com'

6
config/sample.py Normal file
View file

@ -0,0 +1,6 @@
DB_URL = "postgresql:///spaced"
SECRET_KEY = "YOUR-SECRET-KEY"
ADMINS = ["admin@example.org"]
SMTP_HOST = "localhost"
MAIL_FROM = "admin@example.org"

15
create_db.py Executable file
View file

@ -0,0 +1,15 @@
#!/usr/bin/python3
from confarchive import database, model
from main import app
def create_db() -> None:
"""Create database."""
app
engine = database.session.get_bind()
model.Base.metadata.create_all(engine)
if __name__ == "__main__":
create_db()

153
load_conference.py Executable file
View file

@ -0,0 +1,153 @@
#!/usr/bin/python3
import datetime
import lxml.etree
from confarchive import database, model
DB_URL = "postgresql:///confarchive"
database.init_db(DB_URL)
Element = lxml.etree._Element
meals = {"lunch", "dinner", "breakfast"}
non_talk_titles = {"afternoon break", "cheese and wine party", "debcamp", "job fair"}
def not_a_talk(title: str) -> bool:
"""Event with this title is not a talk."""
return is_meal(title) or title.lower() in non_talk_titles
def is_meal(title: str) -> bool:
"""Event title represents a meal."""
return title.lower() in meals
def read_field(root: Element, field: str) -> str | None:
"""Get conference field."""
value = root.findtext(".//" + field)
if value is None:
return None
assert isinstance(value, str)
return value
def read_date_field(root: Element, field: str) -> datetime.date | None:
"""Read date from a field."""
value = read_field(root, field)
return parse_isodate(value) if value is not None else None
def read_required_field(root: Element, field: str) -> str:
"""Read a required field."""
value = read_field(root, field)
assert value
return value
def parse_isodate(iso_date: str) -> datetime.date:
"""Read a date in ISO format."""
return datetime.datetime.fromisoformat(iso_date).date()
def conference_obj(root: Element) -> model.Conference:
"""Build conference object."""
e = root.find(".//conference")
assert e is not None
return model.Conference(
title=read_required_field(e, "title"),
start=read_date_field(e, "start"),
end=read_date_field(e, "end"),
timezone=read_field(e, "time_zone_name"),
)
def build_event_object(
e: Element, person_lookup: dict[int, model.Person]
) -> model.Event | None:
"""Build an event object."""
title = read_required_field(e, "title")
if not_a_talk(title):
return None
room = read_field(e, "room")
slug = read_field(e, "slug")
description = read_field(e, "description")
event_type = read_field(e, "type")
url = read_field(e, "url")
persons_element = e.find(".//persons")
if persons_element is None:
return None
people = []
for p in persons_element:
id_str = p.get("id")
assert id_str is not None
people.append(person_lookup[int(id_str)])
print(title, people)
return model.Event(
title=title,
room=room,
slug=slug,
description=description,
event_type=event_type,
url=url,
people=people,
)
def get_all_people(root: Element) -> list[tuple[int, str]]:
people: dict[int, str] = {}
for person in root.findall(".//person"):
assert person.text
person_id_str = person.get("id")
assert person_id_str
person_id = int(person_id_str)
existing = people.get(person_id)
if existing:
assert person.text == existing
continue
people[person_id] = person.text
return sorted(people.items())
def load(filename: str) -> None:
"""Load conference schedule."""
root = lxml.etree.parse(filename).getroot()
conf = conference_obj(root)
database.session.add(conf)
event_count = 0
people = get_all_people(root)
person_lookup = {}
for person_id, name in people:
person = model.Person.query.filter_by(name=name).first()
if not person:
person = model.Person(name=name)
database.session.add(person)
person_lookup[person_id] = person
for day in root.findall(".//day"):
for room in root.findall(".//room"):
for event_element in root.findall(".//event"):
event = build_event_object(event_element, person_lookup)
if not event:
continue
event.conference = conf
print()
database.session.add(event)
event_count += 1
if event_count > 10:
return None
load("/home/edward/src/2022/conference-gender-mix/schedules/debconf22")
database.session.commit()

21
main.py Executable file
View file

@ -0,0 +1,21 @@
#!/usr/bin/python3
import flask
from confarchive import database
app = flask.Flask(__name__)
app.debug = True
app.config.from_object("config.default")
database.init_app(app)
@app.route("/")
def index() -> str:
"""Start page."""
return flask.render_template("index.html")
if __name__ == "__main__":
app.run(host="0.0.0.0")

10
templates/index.html Normal file
View file

@ -0,0 +1,10 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title></title>
</head>
<body>
</body>
</html>