#!/usr/bin/python3
"""Load conference schedule XML files into the confarchive database.

Each file found in ``schedules_loc`` is parsed with lxml, turned into
``model.Conference`` / ``model.Person`` / ``model.Event`` rows and added to
the database session.
"""

import datetime
import os
import sys

import lxml.etree

from confarchive import database, model

DB_URL = "postgresql:///confarchive"

schedules_loc = "/home/edward/src/2022/conference-gender-mix/schedules"

database.init_db(DB_URL)

Element = lxml.etree._Element

meals = {"lunch", "dinner", "breakfast"}

non_talk_titles = {"afternoon break", "cheese and wine party", "debcamp", "job fair"}


def not_a_talk(title: str) -> bool:
    """Return True if an event with this title is not a talk.

    The comparison is case-insensitive and covers both meals and the
    entries in ``non_talk_titles``.
    """
    return is_meal(title) or title.lower() in non_talk_titles


def is_meal(title: str) -> bool:
    """Return True if the event title (case-insensitively) names a meal."""
    return title.lower() in meals


def read_field(root: Element, field: str) -> str | None:
    """Return the text of the first descendant element named *field*.

    Returns None when no such element exists below *root*.
    """
    value = root.findtext(".//" + field)
    if value is None:
        return None
    assert isinstance(value, str)
    return value


def read_date_field(root: Element, field: str) -> datetime.date | None:
    """Read *field* and parse it as an ISO date.

    Returns None when the field is missing or empty.
    """
    value = read_field(root, field)
    return parse_isodate(value) if value else None


def read_required_field(root: Element, field: str) -> str:
    """Read a field that must be present and non-empty.

    Raises AssertionError when the field is missing or empty.
    """
    value = read_field(root, field)
    assert value
    return value


def parse_isodate(iso_date: str) -> datetime.date:
    """Parse an ISO format date or datetime string and return its date."""
    return datetime.datetime.fromisoformat(iso_date).date()


def conference_obj(root: Element) -> model.Conference:
    """Build a conference object from the ``<conference>`` element.

    The start date is read from ``start`` or ``start_date``. The end date is
    derived from ``days`` when that is present and non-zero, otherwise read
    from ``end`` or ``end_date``.

    If no start date is found the conference element is printed and the
    process exits with status 1. Raises AssertionError when the conference
    element is missing, no end date can be determined, or the end date is
    before the start date.
    """
    e = root.find(".//conference")
    assert e is not None

    start = read_date_field(e, "start") or read_date_field(e, "start_date")
    if not start:
        # Dump the element so the offending schedule can be inspected. This
        # has to happen before anything asserts on ``start``, otherwise the
        # diagnostic is never reached.
        print(lxml.etree.tostring(e, encoding=str))
        sys.exit(1)

    days_str = read_field(e, "days")
    days = int(days_str) if days_str else None

    end: datetime.date | None
    if days:
        # ``days`` counts the first day, so a one day event ends on ``start``.
        end = start + datetime.timedelta(days=days - 1)
    else:
        end = read_date_field(e, "end") or read_date_field(e, "end_date")

    assert end is not None and end >= start

    return model.Conference(
        title=read_required_field(e, "title"),
        start=start,
        end=end,
        timezone=read_field(e, "time_zone_name"),
    )


def build_event_object(
    e: Element, person_lookup: dict[str, model.Person]
) -> model.Event | None:
    """Build an event object from an ``<event>`` element.

    *person_lookup* maps normalized person names to ``model.Person`` objects.
    Returns None when the event has no title, lists no persons, or none of
    the listed persons has a usable name.
    """
    title = read_field(e, "title")
    guid = e.get("guid")
    room = read_field(e, "room")
    slug = read_field(e, "slug")
    description = read_field(e, "description")
    event_type = read_field(e, "type")
    url = read_field(e, "url")

    if title is None:
        print("no title")
        assert description is None and event_type is None
        return None

    # Prefer the children of a <persons> wrapper; fall back to any <person>
    # descendants when the wrapper is missing or empty.
    persons = e.find(".//persons")
    if persons is None or len(persons) == 0:
        persons = e.findall(".//person")
    if persons is None or len(persons) == 0:
        return None

    people = []
    seen_person: set[str] = set()
    print("persons:", len(persons))
    for p in persons:
        # person_lookup is keyed by normalized names, so normalize before the
        # lookup; whitespace-only text is treated the same as a missing name.
        name = normalize_name(p.text or "") or None
        print("peron:", name)
        if name is None:
            print("no name")
            continue
        if name in seen_person:
            print("seen already:", name)
            continue
        seen_person.add(name)
        people.append(model.EventPerson(person=person_lookup[name]))

    if not people:
        print("no people")
        return None

    return model.Event(
        guid=guid,
        title=title,
        room=room,
        slug=slug,
        description=description,
        event_type=event_type,
        url=url,
        people_detail=people,
    )


def schedule_has_person_ids(root: Element) -> bool:
    """Return True if the first ``<person>`` in the schedule has an id.

    Raises AssertionError when the schedule contains no ``<person>``.
    """
    person = root.find(".//person")
    assert person is not None
    person_id = person.get("id")
    return person_id is not None


def get_all_people(root: Element) -> list[tuple[int, str]]:
    """Return ``(id, name)`` pairs for every person, sorted by id.

    Raises AssertionError when a person has no text or no id, or when the
    same id appears with two different names.
    """
    people: dict[int, str] = {}
    for person in root.findall(".//person"):
        assert person.text
        person_id_str = person.get("id")
        if not person_id_str:
            # Show the offending element before the assertion fires.
            print(lxml.etree.tostring(person, encoding=str))
        assert person_id_str
        person_id = int(person_id_str)
        existing = people.get(person_id)
        if existing:
            assert person.text == existing
            continue
        people[person_id] = person.text

    return sorted(people.items())


def get_people_names(root: Element) -> set[str]:
    """Return the normalized, non-empty names of all persons in the schedule."""
    names = (
        normalize_name(person.text)
        for person in root.findall(".//person")
        if person.text
    )
    # Whitespace-only text normalizes to "", which is not a usable name.
    return {name for name in names if name}


def normalize_name(n: str) -> str:
    """Normalize name: trim and collapse runs of whitespace to one space."""
    return " ".join(n.split())


def find_existing_person(name: str) -> model.Person | None:
    """Find a person already in the database by name.

    Looks for a case-insensitive match on ``Person.name`` first, then on
    ``ConferencePerson.named_as``. Returns None when neither matches.
    """
    person = model.Person.query.filter(model.Person.name.ilike(name)).one_or_none()
    assert person is None or isinstance(person, model.Person)
    if person:
        return person

    # NOTE(review): one_or_none() raises if several ConferencePerson rows share
    # this named_as value - confirm that cannot happen for the data loaded.
    conf_person = model.ConferencePerson.query.filter(
        model.ConferencePerson.named_as.ilike(name)
    ).one_or_none()
    if conf_person is None:
        return None

    # The match is an association row; callers want the person it points to.
    person = conf_person.person
    assert isinstance(person, model.Person)
    return person


def load(filename: str, short_name: str) -> None:
    """Load a conference schedule from *filename* into the database session.

    Files that start like an iCalendar or JSON document are ignored. An
    existing conference with *short_name* is reused (it must have no events
    yet); otherwise a new conference is built from the schedule. The session
    is committed only when more than one event was added.
    """
    # Sniff the first bytes so non-XML schedules can be skipped; reading bytes
    # avoids decode errors and the context manager closes the file.
    with open(filename, "rb") as fh:
        head = fh.read(15)
    if head == b"BEGIN:VCALENDAR" or head.startswith(b"{"):
        return None

    root = lxml.etree.parse(filename).getroot()

    conf = model.Conference.query.filter_by(short_name=short_name).one_or_none()
    if conf:
        assert conf.events.count() == 0
    else:
        # NOTE(review): the new conference is created without short_name being
        # set - confirm whether it should be assigned here.
        conf = conference_obj(root)
        assert model.Conference.query.filter_by(title=conf.title).count() == 0
        database.session.add(conf)

    print((conf.short_name, conf.title))

    event_count = 0
    people_names = get_people_names(root)
    person_lookup: dict[str, model.Person] = {}

    for name in people_names:
        cp = model.ConferencePerson.query.filter_by(
            conference=conf, named_as=name
        ).one_or_none()
        if cp and cp.person.events_association.count() == 0:
            # A person linked to this conference but to no events is removed,
            # together with all of their conference links, before matching.
            person = cp.person
            for cp2 in person.conferences_association:
                database.session.delete(cp2)
            database.session.delete(cp.person)
            database.session.commit()

        person = find_existing_person(name)
        if not person:
            person = model.Person(name=name)
            database.session.add(person)
        person_lookup[name] = person

    for name, person in person_lookup.items():
        if model.ConferencePerson.query.filter_by(
            conference=conf, person=person
        ).one_or_none():
            continue
        conf_person = model.ConferencePerson(
            conference=conf, person=person, named_as=name
        )
        database.session.add(conf_person)

    for day in root.findall(".//day"):
        # The day index is only reported; it is not stored on the events.
        day_index_str = day.get("index")
        print("day", day_index_str)

        for event_element in day.findall(".//event"):
            title = read_field(event_element, "title")
            event = build_event_object(event_element, person_lookup)
            if not event:
                print(f"skip event: {title}")
                continue
            event.conference = conf
            database.session.add(event)
            event_count += 1

    # NOTE(review): with zero or one event nothing is committed here, but the
    # pending objects stay in the session - confirm that this is intended.
    if event_count > 1:
        database.session.commit()


with os.scandir(schedules_loc) as entries:
    for f in entries:
        if f.is_dir():
            continue
        if f.name in {"datenspuren_2019"}:
            continue
        if not f.name.startswith("capitole_du_libre"):
            continue
        print(f.name)
        load(f.path, f.name)