Refactor Airbnb parsing into reusable library

- Move all parsing logic from parse_airbnb.py to agenda/airbnb.py
- Update parse_airbnb.py to use the new library module
- Add comprehensive tests in tests/test_airbnb.py covering all functions
- Maintain backward compatibility for the command-line interface

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Edward Betts 2025-07-18 10:35:26 +02:00
parent 6b3e8e31eb
commit 567f3b0208
3 changed files with 378 additions and 194 deletions

189
agenda/airbnb.py Normal file
View file

@ -0,0 +1,189 @@
"""Library for parsing Airbnb booking HTML files."""
import json
import re
import typing
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo
import lxml.html
import pycountry
StrDict = dict[str, typing.Any]
def build_datetime(date_str: str, time_str: str, tz_name: str) -> datetime:
"""
Combine an ISO date string, HH:MM time string, and a timezone name
into a timezone-aware datetime in the specified timezone.
"""
dt_str = f"{date_str}T{time_str}"
naive_dt = datetime.fromisoformat(dt_str)
return naive_dt.replace(tzinfo=ZoneInfo(tz_name))
def list_to_dict(items: list[typing.Any]) -> dict[str, typing.Any]:
"""Convert a flat list to a dict, assuming alternating keys and values."""
return {items[i]: items[i + 1] for i in range(0, len(items), 2)}
def extract_country_code(address: str) -> str | None:
"""Return ISO 3166-1 alpha-2 country code from a free-text address."""
address_lower = address.lower()
for country in pycountry.countries:
if country.name.lower() in address_lower:
return str(country.alpha_2.lower())
if (
hasattr(country, "official_name")
and country.official_name.lower() in address_lower
):
return str(country.alpha_2.lower())
return None
def get_json_blob(tree: Any) -> str:
data_id = "data-injector-instances"
js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0]
return str(js_string)
def get_ui_state(tree: Any) -> StrDict:
data_id = "data-injector-instances"
js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0]
big_blob = json.loads(str(js_string))
ui_state = walk_tree(big_blob, "uiState")
return list_to_dict(ui_state[0])
def get_reservation_data(ui_state: StrDict) -> StrDict:
return {
row["id"]: row for row in ui_state["reservation"]["scheduled_event"]["rows"]
}
def get_room_url(tree: Any) -> str | None:
for e in tree.xpath('//a[@data-testid="reservation-destination-link"]'):
href = e.get("href")
assert isinstance(href, str)
if not href.startswith("/room"):
continue
return "https://www.airbnb.co.uk" + href
return None
def get_price_from_reservation(reservation: StrDict) -> str:
price_string = reservation["payment_summary"]["subtitle"]
assert isinstance(price_string, str)
tc = "Total cost: "
assert price_string.startswith(tc)
price = price_string[len(tc) :]
assert price[0] == "£"
return price[1:]
def extract_booking_from_html(html_file: str) -> StrDict:
"""Extract booking information from Airbnb HTML file."""
with open(html_file, "r", encoding="utf-8") as f:
text_content = f.read()
confirmation_match = re.search(
r"/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/([A-Z0-9]+)",
text_content,
)
if confirmation_match is None:
raise ValueError("Could not find confirmation code in HTML")
confirmation_code = confirmation_match.group(1)
tree = lxml.html.parse(html_file)
root = tree.getroot()
try:
ui_state = get_ui_state(tree)
except Exception:
print(html_file)
raise
reservation = get_reservation_data(ui_state)
m_guests = re.match(r"^(\d+) guests?$", reservation["guests"]["subtitle"])
if m_guests is None:
raise ValueError("Could not parse number of guests")
number_of_adults = int(m_guests.group(1))
price = get_price_from_reservation(reservation)
metadata = ui_state["reservation"]["metadata"]
country_code = metadata["country"].lower()
title = reservation["dynamic_marquee_title_image_v3"]["title"]
location = title.rpartition(" in ")[2]
checkin_checkout = reservation["checkin_checkout_arrival_guide"]
check_in_time = checkin_checkout["leading_subtitle"]
check_out_time = checkin_checkout["trailing_subtitle"]
check_in = build_datetime(
metadata["check_in_date"], check_in_time, metadata["timezone"]
)
check_out = build_datetime(
metadata["check_out_date"], check_out_time, metadata["timezone"]
)
address = reservation["map"]["address"]
if "header_action.pdp" in reservation:
name = reservation["header_action.pdp"]["subtitle"]
else:
name = root.findtext(".//h1")
booking = {
"type": "apartment",
"operator": "airbnb",
"name": name,
"location": location,
"booking_reference": confirmation_code,
"booking_url": f"https://www.airbnb.co.uk/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/{confirmation_code}",
"address": address,
"country": country_code,
"latitude": metadata["lat"],
"longitude": metadata["lng"],
"timezone": metadata["timezone"],
"from": check_in,
"to": check_out,
"price": price,
"currency": "GBP",
"number_of_adults": number_of_adults,
}
room_url = get_room_url(tree)
if room_url is not None:
booking["url"] = room_url
return booking
def walk_tree(data: Any, want_key: str) -> Any:
"""Recursively search for a dict containing 'reservation' and return its value."""
if isinstance(data, dict):
if want_key in data:
return data[want_key]
for key, value in data.items():
result = walk_tree(value, want_key)
if result is not None:
return result
elif isinstance(data, list):
for item in data:
result = walk_tree(item, want_key)
if result is not None:
return result
return None
def parse_multiple_files(filenames: list[str]) -> list[StrDict]:
"""Parse multiple Airbnb HTML files and return a list of booking dictionaries."""
bookings = []
for html_file in sorted(filenames):
booking = extract_booking_from_html(html_file)
assert booking
bookings.append(booking)
return bookings

View file

@ -1,208 +1,17 @@
#!/usr/bin/python3
import json
import re
import sys
import typing
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo
import lxml.html
import pycountry
import yaml
StrDict = dict[str, typing.Any]
def build_datetime(date_str: str, time_str: str, tz_name: str) -> datetime:
"""
Combine an ISO date string, HH:MM time string, and a timezone name
into a timezone-aware datetime in the specified timezone.
"""
dt_str = f"{date_str}T{time_str}"
naive_dt = datetime.fromisoformat(dt_str)
return naive_dt.replace(tzinfo=ZoneInfo(tz_name))
def list_to_dict(items: list) -> dict[str, int]:
"""Convert a flat list to a dict, assuming alternating keys and values."""
return {items[i]: items[i + 1] for i in range(0, len(items), 2)}
def extract_country_code(address: str) -> str | None:
"""Return ISO 3166-1 alpha-2 country code from a free-text address."""
address_lower = address.lower()
for country in pycountry.countries:
if country.name.lower() in address_lower:
return str(country.alpha_2.lower())
if (
hasattr(country, "official_name")
and country.official_name.lower() in address_lower
):
return str(country.alpha_2.lower())
return None
def get_json_blob(tree) -> str:
data_id = "data-injector-instances"
js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0]
return js_string
def get_ui_state(tree) -> StrDict:
data_id = "data-injector-instances"
js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0]
# print(js_string)
# sys.exit(0)
big_blob = json.loads(js_string)
ui_state = walk_tree(big_blob, "uiState")
# print(json.dumps(ui_state))
return list_to_dict(ui_state[0])
def get_reservation_data(ui_state: StrDict) -> StrDict:
return {
row["id"]: row for row in ui_state["reservation"]["scheduled_event"]["rows"]
}
def get_room_url(tree) -> str:
for e in tree.xpath('//a[@data-testid="reservation-destination-link"]'):
href = e.get("href")
assert isinstance(href, str)
if not href.startswith("/room"):
continue
return "https://www.airbnb.co.uk" + href
def get_price_from_reservation(reservation: StrDict) -> str:
price_string = reservation["payment_summary"]["subtitle"]
assert isinstance(price_string, str)
tc = "Total cost: "
assert price_string.startswith(tc)
price = price_string[len(tc) :]
assert price[0] == "£"
return price[1:]
def extract_booking_from_html(html_file: str) -> StrDict:
"""Extract booking information from Airbnb HTML file."""
with open(html_file, "r", encoding="utf-8") as f:
text_content = f.read()
confirmation_code = re.search(
r"/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/([A-Z0-9]+)",
text_content,
).group(1)
tree = lxml.html.parse(html_file)
root = tree.getroot()
try:
ui_state = get_ui_state(tree)
except Exception:
print(html_file)
raise
# print(json.dumps(ui_state))
reservation = get_reservation_data(ui_state)
m_guests = re.match(r"^(\d+) guests?$", reservation["guests"]["subtitle"])
number_of_adults = int(m_guests.group(1))
price = get_price_from_reservation(reservation)
# print(reservation["payment_summary"]["subtitle"])
# print(json.dumps(reservation))
metadata = ui_state["reservation"]["metadata"]
country_code = metadata["country"].lower()
# pprint(metadata)
# print(json.dumps(x))
title = reservation["dynamic_marquee_title_image_v3"]["title"]
location = title.rpartition(" in ")[2]
# print(json.dumps(reservation))
checkin_checkout = reservation["checkin_checkout_arrival_guide"]
# pprint(checkin_checkout)
check_in_time = checkin_checkout["leading_subtitle"]
check_out_time = checkin_checkout["trailing_subtitle"]
check_in = build_datetime(
metadata["check_in_date"], check_in_time, metadata["timezone"]
)
check_out = build_datetime(
metadata["check_out_date"], check_out_time, metadata["timezone"]
)
# print(check_in, check_out)
address = reservation["map"]["address"]
# country_code = extract_country_code(address)
# if "header_action.pdp" not in reservation:
# pprint(reservation)
if "header_action.pdp" in reservation:
name = reservation["header_action.pdp"]["subtitle"]
else:
name = root.findtext(".//h1")
booking = {
"type": "apartment",
"operator": "airbnb",
"name": name,
"location": location,
"booking_reference": confirmation_code,
"booking_url": f"https://www.airbnb.co.uk/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/{confirmation_code}",
"address": address,
"country": country_code,
"latitude": metadata["lat"],
"longitude": metadata["lng"],
"timezone": metadata["timezone"],
"from": check_in,
"to": check_out,
"price": price,
"currency": "GBP",
"number_of_adults": number_of_adults,
}
booking["url"] = get_room_url(tree)
return booking
def walk_tree(data: Any, want_key: str) -> Any:
"""Recursively search for a dict containing 'reservation' and return its value."""
if isinstance(data, dict):
if want_key in data:
return data[want_key]
for key, value in data.items():
result = walk_tree(value, want_key)
if result is not None:
return result
elif isinstance(data, list):
for item in data:
result = walk_tree(item, want_key)
if result is not None:
return result
return None
from agenda.airbnb import parse_multiple_files
def main() -> None:
"""Main function."""
filenames = sorted(sys.argv[1:])
bookings = []
for html_file in filenames:
booking = extract_booking_from_html(html_file)
assert booking
bookings.append(booking)
filenames = sys.argv[1:]
bookings = parse_multiple_files(filenames)
print(yaml.dump(bookings, sort_keys=False))

186
tests/test_airbnb.py Normal file
View file

@ -0,0 +1,186 @@
"""Tests for agenda.airbnb module."""
import pytest
from datetime import datetime
from zoneinfo import ZoneInfo
from unittest.mock import Mock, patch, mock_open
from agenda.airbnb import (
build_datetime,
list_to_dict,
extract_country_code,
walk_tree,
get_ui_state,
get_reservation_data,
get_price_from_reservation,
parse_multiple_files,
)
class TestBuildDatetime:
def test_build_datetime_utc(self):
result = build_datetime("2025-07-28", "15:30", "UTC")
expected = datetime(2025, 7, 28, 15, 30, tzinfo=ZoneInfo("UTC"))
assert result == expected
def test_build_datetime_local_timezone(self):
result = build_datetime("2025-12-25", "09:00", "Europe/London")
expected = datetime(2025, 12, 25, 9, 0, tzinfo=ZoneInfo("Europe/London"))
assert result == expected
class TestListToDict:
def test_list_to_dict_even_items(self):
items = ["key1", "value1", "key2", "value2"]
result = list_to_dict(items)
expected = {"key1": "value1", "key2": "value2"}
assert result == expected
def test_list_to_dict_empty_list(self):
result = list_to_dict([])
assert result == {}
def test_list_to_dict_single_pair(self):
items = ["name", "John"]
result = list_to_dict(items)
assert result == {"name": "John"}
class TestExtractCountryCode:
def test_extract_country_code_uk(self):
address = "123 Main Street, London, United Kingdom"
result = extract_country_code(address)
assert result == "gb"
def test_extract_country_code_france(self):
address = "456 Rue de la Paix, Paris, France"
result = extract_country_code(address)
assert result == "fr"
def test_extract_country_code_usa(self):
address = "789 Broadway, New York, United States"
result = extract_country_code(address)
assert result == "us"
def test_extract_country_code_not_found(self):
address = "123 Unknown Street, Mystery City"
result = extract_country_code(address)
assert result is None
def test_extract_country_code_case_insensitive(self):
address = "123 Main Street, UNITED KINGDOM"
result = extract_country_code(address)
assert result == "gb"
class TestWalkTree:
def test_walk_tree_dict_found(self):
data = {"level1": {"level2": {"target": "found"}}}
result = walk_tree(data, "target")
assert result == "found"
def test_walk_tree_dict_not_found(self):
data = {"level1": {"level2": {"other": "value"}}}
result = walk_tree(data, "target")
assert result is None
def test_walk_tree_list_found(self):
data = [{"other": "value"}, {"target": "found"}]
result = walk_tree(data, "target")
assert result == "found"
def test_walk_tree_nested_list_dict(self):
data = [{"level1": [{"target": "found"}]}]
result = walk_tree(data, "target")
assert result == "found"
def test_walk_tree_empty_data(self):
result = walk_tree({}, "target")
assert result is None
class TestGetPriceFromReservation:
def test_get_price_from_reservation_valid(self):
reservation = {
"payment_summary": {"subtitle": "Total cost: £150.00"}
}
result = get_price_from_reservation(reservation)
assert result == "150.00"
def test_get_price_from_reservation_different_amount(self):
reservation = {
"payment_summary": {"subtitle": "Total cost: £89.99"}
}
result = get_price_from_reservation(reservation)
assert result == "89.99"
class TestParseMultipleFiles:
@patch('agenda.airbnb.extract_booking_from_html')
def test_parse_multiple_files_single_file(self, mock_extract):
mock_booking = {
"type": "apartment",
"operator": "airbnb",
"name": "Test Apartment",
"booking_reference": "ABC123"
}
mock_extract.return_value = mock_booking
result = parse_multiple_files(["test1.html"])
assert len(result) == 1
assert result[0] == mock_booking
mock_extract.assert_called_once_with("test1.html")
@patch('agenda.airbnb.extract_booking_from_html')
def test_parse_multiple_files_multiple_files(self, mock_extract):
mock_booking1 = {"booking_reference": "ABC123"}
mock_booking2 = {"booking_reference": "DEF456"}
mock_extract.side_effect = [mock_booking1, mock_booking2]
result = parse_multiple_files(["test2.html", "test1.html"])
assert len(result) == 2
assert result[0] == mock_booking1
assert result[1] == mock_booking2
@patch('agenda.airbnb.extract_booking_from_html')
def test_parse_multiple_files_empty_list(self, mock_extract):
result = parse_multiple_files([])
assert result == []
mock_extract.assert_not_called()
class TestGetUiState:
@patch('lxml.html.etree')
def test_get_ui_state_with_mock_tree(self, mock_etree):
mock_tree = Mock()
mock_tree.xpath.return_value = ['{"test": [["uiState", {"key": "value"}]]}']
with patch('agenda.airbnb.walk_tree') as mock_walk:
mock_walk.return_value = [["key", "value"]]
result = get_ui_state(mock_tree)
assert result == {"key": "value"}
mock_tree.xpath.assert_called_once_with('//*[@id="data-injector-instances"]/text()')
class TestGetReservationData:
def test_get_reservation_data(self):
ui_state = {
"reservation": {
"scheduled_event": {
"rows": [
{"id": "row1", "data": "value1"},
{"id": "row2", "data": "value2"}
]
}
}
}
result = get_reservation_data(ui_state)
expected = {
"row1": {"id": "row1", "data": "value1"},
"row2": {"id": "row2", "data": "value2"}
}
assert result == expected