conference-archive/load_conference.py
2023-09-15 23:34:41 +05:30

276 lines
7.6 KiB
Python
Executable file

#!/usr/bin/python3
import datetime
import os
import sys
import lxml.etree
from confarchive import database, model
# Database URL handed to database.init_db() just below.
DB_URL = "postgresql:///confarchive"
# Directory scanned by the loop at the bottom of this file; every
# non-directory entry in it is a candidate schedule file for load().
schedules_loc = "/home/edward/src/2022/conference-gender-mix/schedules"
# Runs at import time, before any of the functions below are called.
database.init_db(DB_URL)
# Alias for lxml's element class, used in the type annotations below.
Element = lxml.etree._Element
# Lowercase titles that is_meal() treats as meals.
meals = {"lunch", "dinner", "breakfast"}
# Lowercase titles that not_a_talk() treats as non-talk events, in addition
# to the meals above.
non_talk_titles = {"afternoon break", "cheese and wine party", "debcamp", "job fair"}
def not_a_talk(title: str) -> bool:
    """Return True when the title names something other than a talk.

    Meals count as non-talks, as does any title that, lowercased, appears in
    ``non_talk_titles``.
    """
    if is_meal(title):
        return True
    lowered = title.lower()
    return lowered in non_talk_titles
def is_meal(title: str) -> bool:
    """Tell whether the title, ignoring case, is one of the known meals."""
    lowered = title.lower()
    return lowered in meals
def read_field(root: Element, field: str) -> str | None:
"""Get conference field."""
value = root.findtext(".//" + field)
if value is None:
return None
assert isinstance(value, str)
return value
def read_date_field(root: Element, field: str) -> datetime.date | None:
    """Read ``field`` and parse it as an ISO date.

    Returns ``None`` when the field is missing or its text is empty.
    """
    raw = read_field(root, field)
    if not raw:
        return None
    return parse_isodate(raw)
def read_required_field(root: Element, field: str) -> str:
    """Return the text of ``field``, asserting it is present and non-empty."""
    text = read_field(root, field)
    assert text
    return text
def parse_isodate(iso_date: str) -> datetime.date:
    """Parse an ISO-format date or datetime string and return its date part."""
    parsed = datetime.datetime.fromisoformat(iso_date)
    return parsed.date()
def conference_obj(root: Element) -> model.Conference:
    """Build a ``model.Conference`` from a schedule's ``<conference>`` element.

    The start date comes from ``start`` or, failing that, ``start_date``.
    The end date is derived from ``days`` when that field holds a non-zero
    number, and is otherwise read from ``end`` or ``end_date``.

    If no start date is found, the ``<conference>`` element is printed and the
    process exits with status 1.

    Raises:
        AssertionError: if there is no ``<conference>`` element, no end date
            can be determined, the end precedes the start, or the title is
            missing.
    """
    e = root.find(".//conference")
    assert e is not None

    start = read_date_field(e, "start") or read_date_field(e, "start_date")
    if not start:
        # Dump the offending element before giving up. This check must come
        # before any assertion on ``start``; otherwise the assertion fires
        # first and the dump is never printed.
        print(lxml.etree.tostring(e, encoding=str))
        sys.exit(1)

    days_str = read_field(e, "days")
    days = int(days_str) if days_str else None

    end: datetime.date | None
    if days:
        # ``days`` counts the first day, hence the ``- 1``.
        end = start + datetime.timedelta(days=days - 1)
    else:
        end = read_date_field(e, "end") or read_date_field(e, "end_date")
    assert end and end >= start

    return model.Conference(
        title=read_required_field(e, "title"),
        start=start,
        end=end,
        timezone=read_field(e, "time_zone_name"),
    )
def build_event_object(
    e: Element, person_lookup: dict[str, model.Person]
) -> model.Event | None:
    """Build a ``model.Event`` from an ``<event>`` element.

    Args:
        e: The event element.
        person_lookup: Maps normalized speaker names to ``model.Person``
            objects.

    Returns:
        The new event, or ``None`` when the element has no title, lists no
        people, or lists no people with a usable name.

    Raises:
        KeyError: if a speaker's normalized name is not in ``person_lookup``.
    """
    title = read_field(e, "title")
    guid = e.get("guid")
    room = read_field(e, "room")
    slug = read_field(e, "slug")
    description = read_field(e, "description")
    event_type = read_field(e, "type")
    url = read_field(e, "url")

    if title is None:
        print("no title")
        assert description is None and event_type is None
        return None

    # Prefer the children of a <persons> wrapper; fall back to any <person>
    # descendants when the wrapper is missing or empty.
    persons = e.find(".//persons")
    if persons is None or len(persons) == 0:
        persons = e.findall(".//person")
    if len(persons) == 0:
        return None

    people = []
    seen_person = set()
    print("persons:", len(persons))
    for p in persons:
        raw_name = p.text
        print("peron:", raw_name)
        # person_lookup is keyed by normalized names, so normalize before
        # both the duplicate check and the lookup.
        name = normalize_name(raw_name) if raw_name else None
        if not name:
            print("no name")
            continue
        if name in seen_person:
            print("seen already:", name)
            continue
        seen_person.add(name)
        people.append(model.EventPerson(person=person_lookup[name]))

    if not people:
        print("no people")
        return None

    return model.Event(
        guid=guid,
        title=title,
        room=room,
        slug=slug,
        description=description,
        event_type=event_type,
        url=url,
        people_detail=people,
    )
def schedule_has_person_ids(root: Element) -> bool:
    """Report whether the first ``<person>`` under ``root`` has an ``id``.

    Asserts that the schedule contains at least one ``<person>`` element.
    """
    first_person = root.find(".//person")
    assert first_person is not None
    return first_person.get("id") is not None
def get_all_people(root: Element) -> list[tuple[int, str]]:
    """Return ``(id, name)`` pairs for every ``<person>``, sorted by id.

    Each person must have non-empty text and an ``id`` attribute; a person
    without an id is printed before the assertion fails. An id that occurs
    more than once must carry the same name every time.
    """
    names_by_id: dict[int, str] = {}
    for element in root.findall(".//person"):
        name = element.text
        assert name
        raw_id = element.get("id")
        if not raw_id:
            print(lxml.etree.tostring(element, encoding=str))
        assert raw_id
        key = int(raw_id)
        known = names_by_id.get(key)
        if known:
            assert name == known
        else:
            names_by_id[key] = name
    return sorted(names_by_id.items())
def get_people_names(root: Element) -> set[str]:
    """Collect the normalized names of every ``<person>`` under ``root``.

    Elements with no text, or whose text is only whitespace, are left out so
    that an empty string never appears as a name.
    """
    return {
        name
        for person in root.findall(".//person")
        if person.text and (name := normalize_name(person.text))
    }
def normalize_name(n: str) -> str:
    """Collapse each run of whitespace to one space and trim both ends."""
    words = n.split()
    return " ".join(words)
def find_existing_person(name: str) -> model.Person | None:
    """Find a person already in the database by name.

    A ``model.Person`` whose ``name`` matches is preferred. Failing that, a
    ``model.ConferencePerson`` whose ``named_as`` matches is looked up and the
    person it points to is returned. Both matches use ``ilike`` with ``name``
    passed as the pattern.

    Returns:
        The matching person, or ``None`` when neither query finds a row.
    """
    # NOTE(review): ``name`` is passed to ilike() unescaped, and
    # one_or_none() raises if more than one row matches.
    person = model.Person.query.filter(model.Person.name.ilike(name)).one_or_none()
    assert person is None or isinstance(person, model.Person)
    if person:
        return person

    conf_person = model.ConferencePerson.query.filter(
        model.ConferencePerson.named_as.ilike(name)
    ).one_or_none()
    if conf_person is None:
        return None

    # This query yields the association row, not a Person; hand back the
    # person it refers to.
    person = conf_person.person
    assert isinstance(person, model.Person)
    return person
def load(filename: str, short_name: str) -> None:
    """Load one conference schedule file into the database.

    Files that begin with ``BEGIN:VCALENDAR`` or ``{`` are skipped; anything
    else is parsed as XML. The conference is looked up by ``short_name`` and
    created from the schedule when it does not exist yet. Every person named
    in the schedule is matched to an existing ``model.Person`` or created,
    linked to the conference, and then the events are added.

    Args:
        filename: Path of the schedule file.
        short_name: Short name identifying the conference.

    Raises:
        AssertionError: if the conference already has events, or a new
            conference's title is already in use.
    """
    # Sniff the first characters to rule out formats this loader cannot read.
    # The ``with`` closes the file promptly rather than leaving the handle to
    # the garbage collector.
    with open(filename) as fh:
        start = fh.read(15)
    if start == "BEGIN:VCALENDAR" or start.startswith("{"):
        return

    root = lxml.etree.parse(filename).getroot()

    conf = model.Conference.query.filter_by(short_name=short_name).one_or_none()
    if conf:
        assert conf.events.count() == 0
    else:
        conf = conference_obj(root)
        # Store the short name used for the lookup above, so that a later
        # run finds this conference again.
        conf.short_name = short_name
        assert model.Conference.query.filter_by(title=conf.title).count() == 0
        database.session.add(conf)

    print((conf.short_name, conf.title))

    event_count = 0
    people_names = get_people_names(root)
    person_lookup: dict[str, model.Person] = {}

    for name in people_names:
        cp = model.ConferencePerson.query.filter_by(
            conference=conf, named_as=name
        ).one_or_none()
        if cp and cp.person.events_association.count() == 0:
            # This conference's person has no events: delete the person and
            # all of their conference links before matching the name afresh.
            person = cp.person
            for cp2 in person.conferences_association:
                database.session.delete(cp2)
            database.session.delete(cp.person)
            database.session.commit()

        person = find_existing_person(name)
        if not person:
            person = model.Person(name=name)
            database.session.add(person)
        person_lookup[name] = person

    # Link each person to the conference unless a link already exists.
    for name, person in person_lookup.items():
        if model.ConferencePerson.query.filter_by(
            conference=conf, person=person
        ).one_or_none():
            continue
        conf_person = model.ConferencePerson(
            conference=conf, person=person, named_as=name
        )
        database.session.add(conf_person)

    for day in root.findall(".//day"):
        day_index_str = day.get("index")
        print("day", day_index_str)

        for event_element in day.findall(".//event"):
            title = read_field(event_element, "title")
            event = build_event_object(event_element, person_lookup)
            if not event:
                print(f"skip event: {title}")
                continue
            event.conference = conf
            database.session.add(event)
            event_count += 1

    # NOTE(review): a schedule that yields exactly one event is not committed
    # here; confirm that ``> 1`` is intended rather than ``> 0``.
    if event_count > 1:
        database.session.commit()
# Load every schedule file in the schedules directory whose name starts with
# "capitole_du_libre", skipping sub-directories and the excluded names.
for f in os.scandir(schedules_loc):
    if (
        not f.is_dir()
        and f.name not in {"datenspuren_2019"}
        and f.name.startswith("capitole_du_libre")
    ):
        print(f.name)
        load(f.path, f.name)