Compare commits

..

No commits in common. "4b8c29a8d0f32644085183300fda1acb89d305b9" and "6b3e8e31eb99898620978ce5a10851811a6bf964" have entirely different histories.

4 changed files with 195 additions and 383 deletions

View file

@ -1,190 +0,0 @@
"""Library for parsing Airbnb booking HTML files."""
import json
import re
import typing
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo
import lxml.html
import pycountry
StrDict = dict[str, typing.Any]
def build_datetime(date_str: str, time_str: str, tz_name: str) -> datetime:
    """Return a timezone-aware datetime for a local date and time.

    ``date_str`` (ISO date) and ``time_str`` (HH:MM) are joined with a ``T``
    and parsed with ``datetime.fromisoformat``; the zone named by ``tz_name``
    is then attached to the parsed wall-clock value without shifting it.
    """
    wall_clock = datetime.fromisoformat("{}T{}".format(date_str, time_str))
    zone = ZoneInfo(tz_name)
    return wall_clock.replace(tzinfo=zone)
def list_to_dict(items: list[typing.Any]) -> dict[str, typing.Any]:
    """Convert a flat ``[key, value, key, value, ...]`` list into a dict.

    When a key occurs more than once, the last value wins.

    Raises:
        ValueError: if ``items`` has an odd number of elements, i.e. the last
            key has no value to pair with.
    """
    if len(items) % 2:
        raise ValueError(
            f"expected alternating keys and values, got {len(items)} items"
        )
    # Even positions are keys, odd positions are their values.
    return dict(zip(items[::2], items[1::2]))
def extract_country_code(address: str) -> str | None:
    """Return the lower-case ISO 3166-1 alpha-2 code of the country named in an address.

    Each pycountry entry's ``name`` (and ``official_name`` when it has one) is
    searched for case-insensitively as a whole word or phrase. When several
    countries match, the longest name wins, so an address in "Nigeria" is not
    reported as Niger and one in "Romania" is not reported as Oman.

    Returns None when no country name appears in the address.
    """
    address_lower = address.lower()
    best_code: str | None = None
    best_length = 0
    for country in pycountry.countries:
        for label in (country.name, getattr(country, "official_name", None)):
            # Only a strictly longer name can replace the current best match,
            # so ties keep the first country found.
            if not label or len(label) <= best_length:
                continue
            # Look-arounds instead of plain substring search: the name must
            # not be embedded inside a longer word.
            pattern = rf"(?<!\w){re.escape(label.lower())}(?!\w)"
            if re.search(pattern, address_lower):
                best_code = str(country.alpha_2.lower())
                best_length = len(label)
    return best_code
def get_json_blob(tree: Any) -> str:
    """Return the text of the ``data-injector-instances`` element as a plain str.

    The first text node matched by the XPath query is used; indexing raises
    IndexError when the query matches nothing.
    """
    text_nodes = tree.xpath('//*[@id="data-injector-instances"]/text()')
    return str(text_nodes[0])
def get_ui_state(tree: Any) -> StrDict:
    """Return the page's ``uiState`` as a dict.

    The JSON text of the ``data-injector-instances`` element is decoded, the
    first ``uiState`` value found by ``walk_tree`` is taken, and its first
    element (a flat key/value list) is turned into a dict by ``list_to_dict``.
    """
    text_nodes = tree.xpath('//*[@id="data-injector-instances"]/text()')
    payload = json.loads(str(text_nodes[0]))
    flat_pairs = walk_tree(payload, "uiState")[0]
    return list_to_dict(flat_pairs)
def get_reservation_data(ui_state: StrDict) -> StrDict:
    """Index the reservation's scheduled-event rows by their ``id``.

    Rows are read from ``ui_state["reservation"]["scheduled_event"]["rows"]``;
    if two rows share an id, the later one replaces the earlier.
    """
    rows_by_id = {}
    for row in ui_state["reservation"]["scheduled_event"]["rows"]:
        rows_by_id[row["id"]] = row
    return rows_by_id
def get_room_url(tree: Any) -> str | None:
for e in tree.xpath('//a[@data-testid="reservation-destination-link"]'):
href = e.get("href")
assert isinstance(href, str)
if not href.startswith("/room"):
continue
return "https://www.airbnb.co.uk" + href
return None
def get_price_from_reservation(reservation: StrDict) -> str:
    """Return the reservation's price as a string without the pound sign.

    Reads ``reservation["payment_summary"]["subtitle"]``, drops a leading
    ``"Total cost: "`` if present, and strips the leading ``£``.

    Raises:
        TypeError: if the subtitle is not a string.
        ValueError: if the amount does not start with ``£``.
    """
    subtitle = reservation["payment_summary"]["subtitle"]
    # Explicit exceptions instead of asserts: asserts disappear under -O and
    # an empty subtitle used to surface as an unrelated IndexError.
    if not isinstance(subtitle, str):
        raise TypeError(f"payment summary subtitle is not a string: {subtitle!r}")
    amount = subtitle.removeprefix("Total cost: ")
    if not amount.startswith("£"):
        raise ValueError(f"price is not in pounds sterling: {subtitle!r}")
    return amount[1:]
def extract_booking_from_html(html_file: str) -> StrDict:
    """Build a booking dict from a saved Airbnb reservation page.

    The confirmation code is taken from the reservation-details URL found in
    the raw page text; everything else comes from the page's embedded
    ``uiState`` data. ``address`` and ``url`` are only included when the page
    provides them.

    Raises:
        ValueError: if the confirmation code or the guest count cannot be
            found.
    """
    with open(html_file, "r", encoding="utf-8") as handle:
        page_source = handle.read()

    code_match = re.search(
        r"/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/([A-Z0-9]+)",
        page_source,
    )
    if code_match is None:
        raise ValueError("Could not find confirmation code in HTML")
    confirmation_code = code_match.group(1)

    tree = lxml.html.parse(html_file)
    root = tree.getroot()

    try:
        ui_state = get_ui_state(tree)
    except Exception:
        # Show which file failed before letting the error propagate.
        print(html_file)
        raise

    reservation = get_reservation_data(ui_state)

    guests_match = re.match(r"^(\d+) guests?$", reservation["guests"]["subtitle"])
    if guests_match is None:
        raise ValueError("Could not parse number of guests")
    number_of_adults = int(guests_match.group(1))

    price = get_price_from_reservation(reservation)

    metadata = ui_state["reservation"]["metadata"]
    country_code = metadata["country"].lower()

    # The location is whatever follows the last " in " of the marquee title.
    _, _, location = reservation["dynamic_marquee_title_image_v3"][
        "title"
    ].rpartition(" in ")

    arrival_guide = reservation["checkin_checkout_arrival_guide"]
    arrival_time = arrival_guide["leading_subtitle"]
    departure_time = arrival_guide["trailing_subtitle"]
    check_in = build_datetime(
        metadata["check_in_date"], arrival_time, metadata["timezone"]
    )
    check_out = build_datetime(
        metadata["check_out_date"], departure_time, metadata["timezone"]
    )

    address = None
    if "map" in reservation:
        address = reservation["map"]["address"]

    pdp_key = "header_action.pdp"
    if pdp_key in reservation:
        name = reservation[pdp_key]["subtitle"]
    else:
        # Fall back to the first <h1> of the page.
        name = root.findtext(".//h1")

    booking = {
        "type": "apartment",
        "operator": "airbnb",
        "name": name,
        "location": location,
        "booking_reference": confirmation_code,
        "booking_url": f"https://www.airbnb.co.uk/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/{confirmation_code}",
        "country": country_code,
        "latitude": metadata["lat"],
        "longitude": metadata["lng"],
        "timezone": metadata["timezone"],
        "from": check_in,
        "to": check_out,
        "price": price,
        "currency": "GBP",
        "number_of_adults": number_of_adults,
    }

    if address:
        booking["address"] = address

    room_url = get_room_url(tree)
    if room_url is not None:
        booking["url"] = room_url

    return booking
def walk_tree(data: Any, want_key: str) -> Any:
    """Depth-first search of nested dicts and lists for ``want_key``.

    Returns the value stored under the first ``want_key`` encountered. A dict
    that contains the key answers immediately; otherwise its values (or the
    items of a list) are searched in order and the first non-None result is
    returned. Returns None when nothing is found or ``data`` is neither a
    dict nor a list.
    """
    if isinstance(data, dict):
        if want_key in data:
            return data[want_key]
        children = data.values()
    elif isinstance(data, list):
        children = data
    else:
        return None

    for child in children:
        found = walk_tree(child, want_key)
        if found is not None:
            return found
    return None
def parse_multiple_files(filenames: list[str]) -> list[StrDict]:
    """Return one booking dict per Airbnb HTML file, in sorted filename order."""

    def parse_one(path: str) -> StrDict:
        booking = extract_booking_from_html(path)
        assert booking
        return booking

    return [parse_one(path) for path in sorted(filenames)]

View file

@ -2,11 +2,11 @@
import collections
import datetime
from datetime import date
import functools
import typing
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import date
import emoji
from pycountry.db import Country
@ -77,9 +77,6 @@ class Trip:
event["title"] for event in self.events
] or self.titles_from_travel()
if not titles:
titles = [acc["location"] for acc in self.accommodation]
return format_list_with_ampersand(titles) or "[unnamed trip]"
def titles_from_travel(self) -> list[str]:

View file

@ -1,17 +1,208 @@
#!/usr/bin/python3
import json
import re
import sys
import typing
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo
import lxml.html
import pycountry
import yaml
from agenda.airbnb import parse_multiple_files
StrDict = dict[str, typing.Any]
def build_datetime(date_str: str, time_str: str, tz_name: str) -> datetime:
    """Return a timezone-aware datetime for a local date and time.

    ``date_str`` (ISO date) and ``time_str`` (HH:MM) are joined with a ``T``
    and parsed with ``datetime.fromisoformat``; the zone named by ``tz_name``
    is then attached to the parsed wall-clock value without shifting it.
    """
    wall_clock = datetime.fromisoformat("{}T{}".format(date_str, time_str))
    zone = ZoneInfo(tz_name)
    return wall_clock.replace(tzinfo=zone)
def list_to_dict(items: list[typing.Any]) -> dict[str, typing.Any]:
    """Convert a flat ``[key, value, key, value, ...]`` list into a dict.

    Values are passed through untouched, whatever their type. When a key
    occurs more than once, the last value wins.

    Raises:
        ValueError: if ``items`` has an odd number of elements, i.e. the last
            key has no value to pair with.
    """
    if len(items) % 2:
        raise ValueError(
            f"expected alternating keys and values, got {len(items)} items"
        )
    # Even positions are keys, odd positions are their values.
    return dict(zip(items[::2], items[1::2]))
def extract_country_code(address: str) -> str | None:
    """Return the lower-case ISO 3166-1 alpha-2 code of the country named in an address.

    Each pycountry entry's ``name`` (and ``official_name`` when it has one) is
    searched for case-insensitively as a whole word or phrase. When several
    countries match, the longest name wins, so an address in "Nigeria" is not
    reported as Niger and one in "Romania" is not reported as Oman.

    Returns None when no country name appears in the address.
    """
    address_lower = address.lower()
    best_code: str | None = None
    best_length = 0
    for country in pycountry.countries:
        for label in (country.name, getattr(country, "official_name", None)):
            # Only a strictly longer name can replace the current best match,
            # so ties keep the first country found.
            if not label or len(label) <= best_length:
                continue
            # Look-arounds instead of plain substring search: the name must
            # not be embedded inside a longer word.
            pattern = rf"(?<!\w){re.escape(label.lower())}(?!\w)"
            if re.search(pattern, address_lower):
                best_code = str(country.alpha_2.lower())
                best_length = len(label)
    return best_code
def get_json_blob(tree) -> str:
    """Return the text of the ``data-injector-instances`` element.

    The first text node matched by the XPath query is returned as-is;
    indexing raises IndexError when the query matches nothing.
    """
    text_nodes = tree.xpath('//*[@id="data-injector-instances"]/text()')
    return text_nodes[0]
def get_ui_state(tree) -> StrDict:
    """Return the page's ``uiState`` as a dict.

    The JSON text of the ``data-injector-instances`` element is decoded, the
    first ``uiState`` value found by ``walk_tree`` is taken, and its first
    element (a flat key/value list) is turned into a dict by ``list_to_dict``.
    """
    text_nodes = tree.xpath('//*[@id="data-injector-instances"]/text()')
    payload = json.loads(text_nodes[0])
    flat_pairs = walk_tree(payload, "uiState")[0]
    return list_to_dict(flat_pairs)
def get_reservation_data(ui_state: StrDict) -> StrDict:
    """Index the reservation's scheduled-event rows by their ``id``.

    Rows are read from ``ui_state["reservation"]["scheduled_event"]["rows"]``;
    if two rows share an id, the later one replaces the earlier.
    """
    rows_by_id = {}
    for row in ui_state["reservation"]["scheduled_event"]["rows"]:
        rows_by_id[row["id"]] = row
    return rows_by_id
def get_room_url(tree) -> str:
for e in tree.xpath('//a[@data-testid="reservation-destination-link"]'):
href = e.get("href")
assert isinstance(href, str)
if not href.startswith("/room"):
continue
return "https://www.airbnb.co.uk" + href
def get_price_from_reservation(reservation: StrDict) -> str:
    """Return the reservation's total cost as a string without the pound sign.

    Reads ``reservation["payment_summary"]["subtitle"]``, which must have the
    form ``"Total cost: £<amount>"``, and returns ``<amount>``.

    Raises:
        TypeError: if the subtitle is not a string.
        ValueError: if the subtitle lacks the ``"Total cost: "`` prefix or the
            amount does not start with ``£``.
    """
    price_string = reservation["payment_summary"]["subtitle"]
    # Explicit exceptions instead of asserts: asserts disappear under -O and
    # an empty amount used to surface as an unrelated IndexError.
    if not isinstance(price_string, str):
        raise TypeError(
            f"payment summary subtitle is not a string: {price_string!r}"
        )
    prefix = "Total cost: "
    if not price_string.startswith(prefix):
        raise ValueError(f"unexpected payment summary: {price_string!r}")
    amount = price_string[len(prefix):]
    if not amount.startswith("£"):
        raise ValueError(f"price is not in pounds sterling: {price_string!r}")
    return amount[1:]
def extract_booking_from_html(html_file: str) -> StrDict:
    """Extract booking information from a saved Airbnb reservation page.

    The confirmation code is taken from the reservation-details URL found in
    the raw page text; everything else comes from the page's embedded
    ``uiState`` data. The ``address`` key is only present when the page has a
    map row, and ``url`` only when a room link is found.

    Raises:
        ValueError: if the confirmation code or the guest count cannot be
            found.
    """
    with open(html_file, "r", encoding="utf-8") as f:
        text_content = f.read()

    confirmation_match = re.search(
        r"/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/([A-Z0-9]+)",
        text_content,
    )
    # re.search returns None on a miss; fail with a clear message instead of
    # an AttributeError on ``.group``.
    if confirmation_match is None:
        raise ValueError("Could not find confirmation code in HTML")
    confirmation_code = confirmation_match.group(1)

    tree = lxml.html.parse(html_file)
    root = tree.getroot()

    try:
        ui_state = get_ui_state(tree)
    except Exception:
        # Show which file failed before letting the error propagate.
        print(html_file)
        raise

    reservation = get_reservation_data(ui_state)

    m_guests = re.match(r"^(\d+) guests?$", reservation["guests"]["subtitle"])
    if m_guests is None:
        raise ValueError("Could not parse number of guests")
    number_of_adults = int(m_guests.group(1))

    price = get_price_from_reservation(reservation)

    metadata = ui_state["reservation"]["metadata"]
    country_code = metadata["country"].lower()

    # The location is whatever follows the last " in " of the marquee title.
    title = reservation["dynamic_marquee_title_image_v3"]["title"]
    location = title.rpartition(" in ")[2]

    checkin_checkout = reservation["checkin_checkout_arrival_guide"]
    check_in_time = checkin_checkout["leading_subtitle"]
    check_out_time = checkin_checkout["trailing_subtitle"]
    check_in = build_datetime(
        metadata["check_in_date"], check_in_time, metadata["timezone"]
    )
    check_out = build_datetime(
        metadata["check_out_date"], check_out_time, metadata["timezone"]
    )

    # A reservation without a map row simply has no address; it used to
    # raise KeyError here.
    map_row = reservation.get("map")
    address = map_row["address"] if map_row is not None else None

    if "header_action.pdp" in reservation:
        name = reservation["header_action.pdp"]["subtitle"]
    else:
        # Fall back to the first <h1> of the page.
        name = root.findtext(".//h1")

    booking: StrDict = {
        "type": "apartment",
        "operator": "airbnb",
        "name": name,
        "location": location,
        "booking_reference": confirmation_code,
        "booking_url": f"https://www.airbnb.co.uk/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/{confirmation_code}",
    }
    # Inserted here so the key keeps its position in the dumped output.
    if map_row is not None:
        booking["address"] = address
    booking.update(
        {
            "country": country_code,
            "latitude": metadata["lat"],
            "longitude": metadata["lng"],
            "timezone": metadata["timezone"],
            "from": check_in,
            "to": check_out,
            "price": price,
            "currency": "GBP",
            "number_of_adults": number_of_adults,
        }
    )

    # get_room_url may find no link; leave the key out rather than store None.
    room_url = get_room_url(tree)
    if room_url is not None:
        booking["url"] = room_url

    return booking
def walk_tree(data: Any, want_key: str) -> Any:
    """Depth-first search of nested dicts and lists for ``want_key``.

    Returns the value stored under the first ``want_key`` encountered. A dict
    that contains the key answers immediately; otherwise its values (or the
    items of a list) are searched in order and the first non-None result is
    returned. Returns None when nothing is found or ``data`` is neither a
    dict nor a list.
    """
    if isinstance(data, dict):
        if want_key in data:
            return data[want_key]
        children = data.values()
    elif isinstance(data, list):
        children = data
    else:
        return None

    for child in children:
        found = walk_tree(child, want_key)
        if found is not None:
            return found
    return None
def main() -> None:
    """Parse the Airbnb HTML files named on the command line and print them as YAML.

    Bookings are dumped as one YAML list with each booking's keys kept in
    insertion order.
    """
    # Parse each file exactly once, in sorted filename order so the output
    # does not depend on the order of the command-line arguments.
    bookings = [
        extract_booking_from_html(html_file) for html_file in sorted(sys.argv[1:])
    ]
    print(yaml.dump(bookings, sort_keys=False))

View file

@ -1,186 +0,0 @@
"""Tests for agenda.airbnb module."""
import pytest
from datetime import datetime
from zoneinfo import ZoneInfo
from unittest.mock import Mock, patch, mock_open
from agenda.airbnb import (
build_datetime,
list_to_dict,
extract_country_code,
walk_tree,
get_ui_state,
get_reservation_data,
get_price_from_reservation,
parse_multiple_files,
)
class TestBuildDatetime:
    """build_datetime attaches the named zone to the parsed date and time."""

    def test_build_datetime_utc(self):
        utc = ZoneInfo("UTC")
        built = build_datetime("2025-07-28", "15:30", "UTC")
        assert built == datetime(2025, 7, 28, 15, 30, tzinfo=utc)

    def test_build_datetime_local_timezone(self):
        london = ZoneInfo("Europe/London")
        built = build_datetime("2025-12-25", "09:00", "Europe/London")
        assert built == datetime(2025, 12, 25, 9, 0, tzinfo=london)
class TestListToDict:
    """list_to_dict pairs up alternating keys and values."""

    def test_list_to_dict_even_items(self):
        flat = ["key1", "value1", "key2", "value2"]
        assert list_to_dict(flat) == {"key1": "value1", "key2": "value2"}

    def test_list_to_dict_empty_list(self):
        assert list_to_dict([]) == {}

    def test_list_to_dict_single_pair(self):
        flat = ["name", "John"]
        assert list_to_dict(flat) == {"name": "John"}
class TestExtractCountryCode:
    """extract_country_code finds a country name in a free-text address."""

    def test_extract_country_code_uk(self):
        code = extract_country_code("123 Main Street, London, United Kingdom")
        assert code == "gb"

    def test_extract_country_code_france(self):
        code = extract_country_code("456 Rue de la Paix, Paris, France")
        assert code == "fr"

    def test_extract_country_code_usa(self):
        code = extract_country_code("789 Broadway, New York, United States")
        assert code == "us"

    def test_extract_country_code_not_found(self):
        code = extract_country_code("123 Unknown Street, Mystery City")
        assert code is None

    def test_extract_country_code_case_insensitive(self):
        code = extract_country_code("123 Main Street, UNITED KINGDOM")
        assert code == "gb"
class TestWalkTree:
    """walk_tree returns the value of the first matching key in nested data."""

    def test_walk_tree_dict_found(self):
        nested = {"level1": {"level2": {"target": "found"}}}
        assert walk_tree(nested, "target") == "found"

    def test_walk_tree_dict_not_found(self):
        nested = {"level1": {"level2": {"other": "value"}}}
        assert walk_tree(nested, "target") is None

    def test_walk_tree_list_found(self):
        entries = [{"other": "value"}, {"target": "found"}]
        assert walk_tree(entries, "target") == "found"

    def test_walk_tree_nested_list_dict(self):
        entries = [{"level1": [{"target": "found"}]}]
        assert walk_tree(entries, "target") == "found"

    def test_walk_tree_empty_data(self):
        assert walk_tree({}, "target") is None
class TestGetPriceFromReservation:
    """get_price_from_reservation strips the label and the pound sign."""

    def test_get_price_from_reservation_valid(self):
        payment = {"subtitle": "Total cost: £150.00"}
        price = get_price_from_reservation({"payment_summary": payment})
        assert price == "150.00"

    def test_get_price_from_reservation_different_amount(self):
        payment = {"subtitle": "Total cost: £89.99"}
        price = get_price_from_reservation({"payment_summary": payment})
        assert price == "89.99"
class TestParseMultipleFiles:
    """parse_multiple_files parses every file once, in sorted filename order."""

    @patch('agenda.airbnb.extract_booking_from_html')
    def test_parse_multiple_files_single_file(self, mock_extract):
        mock_booking = {
            "type": "apartment",
            "operator": "airbnb",
            "name": "Test Apartment",
            "booking_reference": "ABC123",
        }
        mock_extract.return_value = mock_booking

        result = parse_multiple_files(["test1.html"])

        assert len(result) == 1
        assert result[0] == mock_booking
        mock_extract.assert_called_once_with("test1.html")

    @patch('agenda.airbnb.extract_booking_from_html')
    def test_parse_multiple_files_multiple_files(self, mock_extract):
        mock_booking1 = {"booking_reference": "ABC123"}
        mock_booking2 = {"booking_reference": "DEF456"}
        mock_extract.side_effect = [mock_booking1, mock_booking2]

        result = parse_multiple_files(["test2.html", "test1.html"])

        assert len(result) == 2
        assert result[0] == mock_booking1
        assert result[1] == mock_booking2
        # Filenames are sorted before parsing, so test1.html must be parsed
        # first even though it was passed second.
        parsed_in_order = [c.args for c in mock_extract.call_args_list]
        assert parsed_in_order == [("test1.html",), ("test2.html",)]

    @patch('agenda.airbnb.extract_booking_from_html')
    def test_parse_multiple_files_empty_list(self, mock_extract):
        result = parse_multiple_files([])

        assert result == []
        mock_extract.assert_not_called()
class TestGetUiState:
    """get_ui_state decodes the injected JSON and returns uiState as a dict."""

    def test_get_ui_state_with_mock_tree(self):
        # A realistic blob: uiState is a dict key whose value is a list whose
        # first element is the flat key/value list. No patching of walk_tree,
        # so the real lookup and the list-to-dict conversion are exercised.
        mock_tree = Mock()
        mock_tree.xpath.return_value = [
            '{"test": {"uiState": [["key", "value"]]}}'
        ]

        result = get_ui_state(mock_tree)

        assert result == {"key": "value"}
        mock_tree.xpath.assert_called_once_with(
            '//*[@id="data-injector-instances"]/text()'
        )
class TestGetReservationData:
    """get_reservation_data indexes scheduled-event rows by their id."""

    def test_get_reservation_data(self):
        first_row = {"id": "row1", "data": "value1"}
        second_row = {"id": "row2", "data": "value2"}
        ui_state = {
            "reservation": {"scheduled_event": {"rows": [first_row, second_row]}}
        }

        indexed = get_reservation_data(ui_state)

        assert indexed == {
            "row1": {"id": "row1", "data": "value1"},
            "row2": {"id": "row2", "data": "value2"},
        }