276 lines
7.6 KiB
Python
Executable file
276 lines
7.6 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import datetime
|
|
import os
|
|
import sys
|
|
|
|
import lxml.etree
|
|
|
|
from confarchive import database, model
|
|
|
|
# Database URL handed to database.init_db() below.
DB_URL = "postgresql:///confarchive"

# Directory scanned for schedule files by the loop at the bottom of this script.
schedules_loc = "/home/edward/src/2022/conference-gender-mix/schedules"

# Initialise the project's database layer with that URL (runs at import time).
database.init_db(DB_URL)

# Short alias for lxml's element class, used in the type annotations below.
Element = lxml.etree._Element

# Lower-case titles of schedule entries that are not talks.
non_talk_titles = {
    "afternoon break",
    "cheese and wine party",
    "debcamp",
    "job fair",
}

# Lower-case titles of meal breaks.
meals = {"breakfast", "lunch", "dinner"}
|
|
|
|
|
|
def not_a_talk(title: str) -> bool:
    """Report whether an event title denotes something other than a talk.

    Meals (see ``is_meal``) count as non-talks, as does any title whose
    lower-cased form appears in ``non_talk_titles``.
    """
    if is_meal(title):
        return True
    return title.lower() in non_talk_titles
|
|
|
|
|
|
def is_meal(title: str) -> bool:
    """Report whether an event title, compared case-insensitively, names a meal."""
    lowered = title.lower()
    return lowered in meals
|
|
|
|
|
|
def read_field(root: Element, field: str) -> str | None:
|
|
"""Get conference field."""
|
|
value = root.findtext(".//" + field)
|
|
if value is None:
|
|
return None
|
|
assert isinstance(value, str)
|
|
return value
|
|
|
|
|
|
def read_date_field(root: Element, field: str) -> datetime.date | None:
    """Read *field* beneath *root* and parse it as an ISO date.

    Returns:
        The parsed date, or ``None`` when the field is missing or empty.
    """
    raw = read_field(root, field)
    if not raw:
        return None
    return parse_isodate(raw)
|
|
|
|
|
|
def read_required_field(root: Element, field: str) -> str:
    """Read *field* beneath *root*, insisting that it is present and non-empty.

    Raises:
        AssertionError: If the field is missing or its text is empty.
    """
    text = read_field(root, field)
    assert text
    return text
|
|
|
|
|
|
def parse_isodate(iso_date: str) -> datetime.date:
    """Parse an ISO 8601 date or datetime string and return its date part."""
    moment = datetime.datetime.fromisoformat(iso_date)
    return moment.date()
|
|
|
|
|
|
def conference_obj(root: Element) -> model.Conference:
    """Build a ``model.Conference`` from a schedule's ``<conference>`` element.

    The start date is read from ``start``, falling back to ``start_date``.
    The end date is derived from ``days`` when that field holds a non-zero
    number; otherwise it is read from ``end``, falling back to ``end_date``.

    Args:
        root: Root element of the parsed schedule XML.

    Returns:
        A new conference object carrying title, start, end and timezone.
        It is not added to the database session here.

    Raises:
        SystemExit: If no start date is present; the ``<conference>``
            element is printed first to help diagnose the file.
        AssertionError: If there is no ``<conference>`` element, no end date
            can be determined, the end precedes the start, or the title is
            missing.
    """
    e = root.find(".//conference")
    assert e is not None

    start = read_date_field(e, "start") or read_date_field(e, "start_date")
    if not start:
        # This diagnostic must run before anything relies on `start`;
        # otherwise it can never be reached.
        print(lxml.etree.tostring(e, encoding=str))
        sys.exit(1)

    days_str = read_field(e, "days")
    days = int(days_str) if days_str else None

    end: datetime.date | None
    if days:
        # A conference lasting N days ends N - 1 days after it starts.
        end = start + datetime.timedelta(days=days - 1)
    else:
        end = read_date_field(e, "end") or read_date_field(e, "end_date")

    assert end is not None
    assert end >= start

    return model.Conference(
        title=read_required_field(e, "title"),
        start=start,
        end=end,
        timezone=read_field(e, "time_zone_name"),
    )
|
|
|
|
|
|
def build_event_object(
    e: Element, person_lookup: dict[str, model.Person]
) -> model.Event | None:
    """Build a ``model.Event`` from an ``<event>`` element.

    Args:
        e: The event element to convert.
        person_lookup: Person objects keyed by normalized name (the form
            produced by ``normalize_name``).

    Returns:
        The new event with its people attached, or ``None`` when the element
        has no title or no usable people.

    Raises:
        AssertionError: If the event has no title but does have a
            description or type.
        KeyError: If a person's normalized name is absent from
            *person_lookup*.
    """
    title = read_field(e, "title")
    guid = e.get("guid")
    room = read_field(e, "room")
    slug = read_field(e, "slug")
    description = read_field(e, "description")
    event_type = read_field(e, "type")
    url = read_field(e, "url")

    if title is None:
        print("no title")
        assert description is None and event_type is None
        return None

    # Prefer the children of a <persons> wrapper; fall back to any <person>
    # descendant when the wrapper is absent or empty.  len() is used rather
    # than truthiness because `persons` may be an element, not a list.
    persons = e.find(".//persons")
    if persons is None or len(persons) == 0:
        persons = e.findall(".//person")
    if persons is None or len(persons) == 0:
        return None

    people = []
    seen_person: set[str] = set()
    print("persons:", len(persons))
    for p in persons:
        raw_name = p.text
        print("person:", raw_name)
        # Normalize before the lookup: person_lookup is keyed by normalized
        # names, so the raw text would miss whenever its whitespace differs.
        name = normalize_name(raw_name) if raw_name is not None else ""
        if not name:
            print("no name")
            continue
        if name in seen_person:
            print("seen already:", name)
            continue
        seen_person.add(name)
        people.append(model.EventPerson(person=person_lookup[name]))

    if not people:
        print("no people")
        return None

    return model.Event(
        guid=guid,
        title=title,
        room=room,
        slug=slug,
        description=description,
        event_type=event_type,
        url=url,
        people_detail=people,
    )
|
|
|
|
|
|
def schedule_has_person_ids(root: Element) -> bool:
    """Tell whether the first ``<person>`` in the schedule has an ``id`` attribute.

    Raises:
        AssertionError: If the schedule contains no ``<person>`` element.
    """
    first_person = root.find(".//person")
    assert first_person is not None
    return first_person.get("id") is not None
|
|
|
|
|
|
def get_all_people(root: Element) -> list[tuple[int, str]]:
    """List every person in the schedule as ``(id, name)``, sorted by id.

    Each ``<person>`` element must have text and an ``id`` attribute; an id
    that occurs more than once must always carry the same name.

    Raises:
        AssertionError: If a person has no text or no id, or if one id is
            used with two different names.
    """
    names_by_id: dict[int, str] = {}
    for element in root.findall(".//person"):
        name = element.text
        assert name

        raw_id = element.get("id")
        if not raw_id:
            # Dump the offending element before the assertion below fails.
            print(lxml.etree.tostring(element, encoding=str))
        assert raw_id

        key = int(raw_id)
        known = names_by_id.get(key)
        if known:
            assert name == known
            continue
        names_by_id[key] = name

    return sorted(names_by_id.items())
|
|
|
|
|
|
def get_people_names(root: Element) -> set[str]:
    """Collect the normalized names of all ``<person>`` elements with text."""
    names: set[str] = set()
    for person in root.findall(".//person"):
        text = person.text
        if text:
            names.add(normalize_name(text))
    return names
|
|
|
|
|
|
def normalize_name(n: str) -> str:
    """Collapse every run of whitespace to one space and trim both ends."""
    # split() with no argument already discards leading/trailing whitespace,
    # so the joined result needs no further stripping.
    words = n.split()
    return " ".join(words)
|
|
|
|
|
|
def find_existing_person(name: str) -> model.Person | None:
    """Look up a stored person by name, ignoring case.

    ``model.Person.name`` is tried first.  Failing that, the name is matched
    against ``model.ConferencePerson.named_as`` and the person attached to
    the matching row is returned.

    Args:
        name: Name to search for.  It is passed to ``ilike`` unescaped.
            NOTE(review): presumably ``%`` and ``_`` in a name then act as
            SQL wildcards — confirm whether that matters for this data.

    Returns:
        The matching person, or ``None`` if neither lookup finds one.

    NOTE(review): both lookups use ``one_or_none()``; if these are SQLAlchemy
    queries, more than one matching row raises instead of returning — confirm
    that ``named_as`` cannot match several rows.
    """
    person = model.Person.query.filter(model.Person.name.ilike(name)).one_or_none()
    if person is not None:
        assert isinstance(person, model.Person)
        return person

    conf_person = model.ConferencePerson.query.filter(
        model.ConferencePerson.named_as.ilike(name)
    ).one_or_none()
    if conf_person is None:
        return None

    # This query yields a ConferencePerson row, not a Person; callers need
    # the person it refers to.
    person = conf_person.person
    assert isinstance(person, model.Person)
    return person
|
|
|
|
|
|
def load(filename: str, short_name: str) -> None:
    """Load one conference schedule file into the database.

    Files whose first bytes are ``BEGIN:VCALENDAR`` or ``{`` are skipped;
    everything else is parsed as XML.  The conference is looked up by
    *short_name* and created from the schedule when absent.  A person and a
    conference/person link are ensured for every name in the schedule, and
    each ``<event>`` that yields an event object is added to the session.

    Args:
        filename: Path of the schedule file.
        short_name: Short name identifying the conference.

    Raises:
        AssertionError: If the conference already has events, or a new
            conference's title is already taken by another conference.
    """
    # Sniff in binary mode so the check cannot fail on the file's text
    # encoding, and close the handle straight away.
    with open(filename, "rb") as fh:
        head = fh.read(15)
    if head == b"BEGIN:VCALENDAR" or head.startswith(b"{"):
        return None

    root = lxml.etree.parse(filename).getroot()

    conf = model.Conference.query.filter_by(short_name=short_name).one_or_none()
    if conf:
        assert conf.events.count() == 0
    else:
        conf = conference_obj(root)
        assert model.Conference.query.filter_by(title=conf.title).count() == 0
        # conference_obj() does not set the short name, yet it is the key
        # this function looks the conference up by.
        conf.short_name = short_name
        database.session.add(conf)

    print((conf.short_name, conf.title))

    person_lookup: dict[str, model.Person] = {}
    for name in get_people_names(root):
        cp = model.ConferencePerson.query.filter_by(
            conference=conf, named_as=name
        ).one_or_none()

        # A person already linked to this conference under this name but
        # attached to no events is deleted along with all their conference
        # links, then looked up (or created) afresh below.
        if cp and cp.person.events_association.count() == 0:
            stale = cp.person
            for cp2 in stale.conferences_association:
                database.session.delete(cp2)
            database.session.delete(stale)
            database.session.commit()

        person = find_existing_person(name)
        if not person:
            person = model.Person(name=name)
            database.session.add(person)
        person_lookup[name] = person

    # Link each person to the conference unless a link already exists.
    for name, person in person_lookup.items():
        already_linked = model.ConferencePerson.query.filter_by(
            conference=conf, person=person
        ).one_or_none()
        if already_linked:
            continue
        conf_person = model.ConferencePerson(
            conference=conf, person=person, named_as=name
        )
        database.session.add(conf_person)

    event_count = 0
    for day in root.findall(".//day"):
        # The day index is only logged; it is not stored on the event.
        print("day", day.get("index"))
        for event_element in day.findall(".//event"):
            event = build_event_object(event_element, person_lookup)
            if not event:
                title = read_field(event_element, "title")
                print(f"skip event: {title}")
                continue
            event.conference = conf
            database.session.add(event)
            event_count += 1

    # NOTE(review): a schedule yielding exactly one event is not committed
    # here — confirm that `> 1` rather than `> 0` is intended.
    if event_count > 1:
        database.session.commit()
|
|
|
|
|
|
# Script driver: load every matching schedule file found in schedules_loc.
for f in os.scandir(schedules_loc):
    # Only plain entries, not on the skip list, whose name starts with the
    # prefix currently being loaded.
    wanted = (
        not f.is_dir()
        and f.name not in {"datenspuren_2019"}
        and f.name.startswith("capitole_du_libre")
    )
    if not wanted:
        continue
    print(f.name)
    load(f.path, f.name)
|