"""Library for parsing Airbnb booking HTML files."""

import json
import re
import typing
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo

import lxml.html
import pycountry

StrDict = dict[str, typing.Any]


def build_datetime(date_str: str, time_str: str, tz_name: str) -> datetime:
    """Combine a date, a time and a timezone name into an aware datetime.

    Args:
        date_str: ISO date string (e.g. ``"2025-07-28"``).
        time_str: Time string (e.g. ``"15:30"``) that, joined to the date
            with ``"T"``, is accepted by ``datetime.fromisoformat``.
        tz_name: Timezone name understood by ``ZoneInfo``.

    Returns:
        The wall-clock date and time tagged with the requested timezone.
        The clock value is not shifted; the zone is simply attached.
    """
    naive_dt = datetime.fromisoformat(f"{date_str}T{time_str}")
    return naive_dt.replace(tzinfo=ZoneInfo(tz_name))


def list_to_dict(items: list[typing.Any]) -> dict[str, typing.Any]:
    """Convert a flat list of alternating keys and values to a dict.

    Args:
        items: ``[key1, value1, key2, value2, ...]``.

    Returns:
        ``{key1: value1, key2: value2, ...}``; a repeated key keeps its
        last value.

    Raises:
        ValueError: If ``items`` has an odd number of elements, i.e. the
            final key has no value.
    """
    if len(items) % 2:
        raise ValueError(
            f"Expected alternating keys and values, got {len(items)} items"
        )
    return dict(zip(items[::2], items[1::2]))


def extract_country_code(address: str) -> str | None:
    """Return ISO 3166-1 alpha-2 country code from a free-text address.

    Country names (and official names, where pycountry provides one) are
    matched case-insensitively as whole words, and the longest matching
    name wins.  This avoids false positives such as "Oman" inside
    "Romania" or "Niger" inside "Nigeria".

    Args:
        address: Free-text postal address.

    Returns:
        The lower-case two-letter country code, or ``None`` if no country
        name appears in the address.
    """
    address_lower = address.lower()
    best_code: str | None = None
    best_length = 0
    for country in pycountry.countries:
        candidates = [country.name]
        if hasattr(country, "official_name"):
            candidates.append(country.official_name)
        for candidate in candidates:
            lowered = candidate.lower()
            # Only a strictly longer name may replace the current best, so
            # ties keep the first country in pycountry's iteration order.
            if len(lowered) <= best_length or lowered not in address_lower:
                continue
            # The cheap substring test above filters most names; the regex
            # then confirms the hit is not part of a longer word.
            if re.search(rf"(?<!\w){re.escape(lowered)}(?!\w)", address_lower):
                best_code = str(country.alpha_2.lower())
                best_length = len(lowered)
    return best_code


def get_json_blob(tree: Any) -> str:
    """Return the text of the ``data-injector-instances`` element.

    Args:
        tree: Parsed HTML document (anything offering ``xpath``).

    Raises:
        ValueError: If the document has no such element text.
    """
    data_id = "data-injector-instances"
    matches = tree.xpath(f'//*[@id="{data_id}"]/text()')
    if not matches:
        raise ValueError(f"Could not find element with id {data_id!r} in HTML")
    return str(matches[0])


def get_ui_state(tree: Any) -> StrDict:
    """Extract the ``uiState`` data embedded in the page's JSON blob.

    The first value found under a ``"uiState"`` key is expected to be a
    sequence whose first element is a flat list of alternating keys and
    values; that list is returned as a dict.

    Args:
        tree: Parsed HTML document (anything offering ``xpath``).

    Raises:
        ValueError: If the JSON element or a non-empty ``uiState`` entry
            cannot be found.
    """
    big_blob = json.loads(get_json_blob(tree))
    ui_state = walk_tree(big_blob, "uiState")
    if not ui_state:
        raise ValueError('Could not find "uiState" in embedded JSON')
    return list_to_dict(ui_state[0])


def get_reservation_data(ui_state: StrDict) -> StrDict:
    """Index the reservation's scheduled-event rows by their ``"id"``.

    Args:
        ui_state: Dict as returned by ``get_ui_state``; rows are read from
            ``ui_state["reservation"]["scheduled_event"]["rows"]``.

    Returns:
        Mapping of row id to the complete row dict.
    """
    rows = ui_state["reservation"]["scheduled_event"]["rows"]
    return {row["id"]: row for row in rows}
def get_room_url(tree: Any) -> str | None:
    """Return the absolute URL of the booked listing, if the page links to it.

    Looks at anchors tagged ``data-testid="reservation-destination-link"``
    and uses the first one whose ``href`` starts with ``/room``.  Anchors
    without an ``href`` are skipped rather than aborting the search.

    Args:
        tree: Parsed HTML document (anything offering ``xpath``).

    Returns:
        The listing URL on ``www.airbnb.co.uk``, or ``None`` if no suitable
        link is present.
    """
    for anchor in tree.xpath('//a[@data-testid="reservation-destination-link"]'):
        href = anchor.get("href")
        if isinstance(href, str) and href.startswith("/room"):
            return "https://www.airbnb.co.uk" + href
    return None


def get_price_from_reservation(reservation: StrDict) -> str:
    """Return the total cost of the booking without its currency symbol.

    Args:
        reservation: Rows keyed by id; the price is read from
            ``reservation["payment_summary"]["subtitle"]``, which must look
            like ``"Total cost: £150.00"``.

    Returns:
        The amount text that follows the pound sign, e.g. ``"150.00"``.

    Raises:
        ValueError: If the subtitle is not a string of the expected form.
            Explicit validation is used instead of ``assert`` so the check
            survives ``python -O``.
    """
    price_string = reservation["payment_summary"]["subtitle"]
    prefix = "Total cost: £"
    if not isinstance(price_string, str) or not price_string.startswith(prefix):
        raise ValueError(f"Unexpected price format: {price_string!r}")
    return price_string[len(prefix) :]


def extract_booking_from_html(html_file: str) -> StrDict:
    """Extract booking information from Airbnb HTML file.

    Args:
        html_file: Path to a saved Airbnb reservation page.

    Returns:
        A booking dict with the accommodation details, timezone-aware
        ``from``/``to`` datetimes, the price in GBP and, when the page links
        to the listing, a ``url`` entry.

    Raises:
        ValueError: If the confirmation code or the guest count cannot be
            found in the page.
        KeyError: If an expected section is missing from the embedded data.
    """
    # Local import: only the error path below needs it.
    import sys

    with open(html_file, "r", encoding="utf-8") as f:
        text_content = f.read()

    confirmation_match = re.search(
        r"/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/([A-Z0-9]+)",
        text_content,
    )
    if confirmation_match is None:
        raise ValueError("Could not find confirmation code in HTML")
    confirmation_code = confirmation_match.group(1)

    tree = lxml.html.parse(html_file)
    root = tree.getroot()
    try:
        ui_state = get_ui_state(tree)
    except Exception:
        # Name the offending file before re-raising.  Use stderr so that
        # callers writing structured output to stdout are not polluted.
        print(html_file, file=sys.stderr)
        raise

    reservation = get_reservation_data(ui_state)
    m_guests = re.match(r"^(\d+) guests?$", reservation["guests"]["subtitle"])
    if m_guests is None:
        raise ValueError("Could not parse number of guests")
    number_of_adults = int(m_guests.group(1))

    price = get_price_from_reservation(reservation)
    metadata = ui_state["reservation"]["metadata"]
    country_code = metadata["country"].lower()

    # The marquee title is split on its last " in "; the text after it is
    # used as the location.
    title = reservation["dynamic_marquee_title_image_v3"]["title"]
    location = title.rpartition(" in ")[2]

    checkin_checkout = reservation["checkin_checkout_arrival_guide"]
    check_in_time = checkin_checkout["leading_subtitle"]
    check_out_time = checkin_checkout["trailing_subtitle"]

    check_in = build_datetime(
        metadata["check_in_date"], check_in_time, metadata["timezone"]
    )
    check_out = build_datetime(
        metadata["check_out_date"], check_out_time, metadata["timezone"]
    )

    address = reservation["map"]["address"]

    # Prefer the listing name from the reservation data; fall back to the
    # page's first <h1> when that row is absent.
    if "header_action.pdp" in reservation:
        name = reservation["header_action.pdp"]["subtitle"]
    else:
        name = root.findtext(".//h1")

    booking: StrDict = {
        "type": "apartment",
        "operator": "airbnb",
        "name": name,
        "location": location,
        "booking_reference": confirmation_code,
        "booking_url": f"https://www.airbnb.co.uk/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/{confirmation_code}",
        "address": address,
        "country": country_code,
        "latitude": metadata["lat"],
        "longitude": metadata["lng"],
        "timezone": metadata["timezone"],
        "from": check_in,
        "to": check_out,
        "price": price,
        "currency": "GBP",
        "number_of_adults": number_of_adults,
    }

    room_url = get_room_url(tree)
    if room_url is not None:
        booking["url"] = room_url

    return booking


def walk_tree(data: Any, want_key: str) -> Any:
    """Recursively search nested dicts and lists for ``want_key``.

    The search is depth-first.  A dict that contains ``want_key`` yields
    that value immediately; otherwise its values (or, for a list, its
    items) are searched in order.

    Args:
        data: Arbitrary JSON-like structure.
        want_key: Dict key to look for.

    Returns:
        The value stored under the first ``want_key`` found, or ``None`` if
        the key does not occur.  A key whose value is ``None`` is therefore
        indistinguishable from a missing key.
    """
    children: typing.Iterable[Any]
    if isinstance(data, dict):
        if want_key in data:
            return data[want_key]
        children = data.values()
    elif isinstance(data, list):
        children = data
    else:
        return None

    for child in children:
        result = walk_tree(child, want_key)
        if result is not None:
            return result
    return None


def parse_multiple_files(filenames: list[str]) -> list[StrDict]:
    """Parse multiple Airbnb HTML files and return a list of booking dictionaries.

    Args:
        filenames: Paths of saved reservation pages, in any order.

    Returns:
        One booking dict per file, in sorted filename order.
    """
    return [extract_booking_from_html(html_file) for html_file in sorted(filenames)]
#!/usr/bin/python3

import sys

import yaml

from agenda.airbnb import parse_multiple_files


def main() -> None:
    """Parse the Airbnb HTML files named on the command line and print YAML.

    Every command-line argument is treated as a path to a saved reservation
    page.  The resulting bookings are dumped as one YAML document with the
    key order of each booking preserved.
    """
    html_paths = sys.argv[1:]
    print(yaml.dump(parse_multiple_files(html_paths), sort_keys=False))
"""Tests for agenda.airbnb module."""

import pytest
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from unittest.mock import Mock, call, patch, mock_open

from agenda.airbnb import (
    build_datetime,
    list_to_dict,
    extract_country_code,
    walk_tree,
    get_ui_state,
    get_reservation_data,
    get_price_from_reservation,
    parse_multiple_files,
)


class TestBuildDatetime:
    def test_build_datetime_utc(self):
        result = build_datetime("2025-07-28", "15:30", "UTC")
        expected = datetime(2025, 7, 28, 15, 30, tzinfo=ZoneInfo("UTC"))
        assert result == expected

    def test_build_datetime_local_timezone(self):
        result = build_datetime("2025-12-25", "09:00", "Europe/London")
        expected = datetime(2025, 12, 25, 9, 0, tzinfo=ZoneInfo("Europe/London"))
        assert result == expected
        # Aware datetimes compare by instant, so also pin the attached zone
        # and the wall-clock time.
        assert result.tzinfo == ZoneInfo("Europe/London")
        assert (result.hour, result.minute) == (9, 0)

    def test_build_datetime_keeps_wall_clock_in_summer_time(self):
        result = build_datetime("2025-07-28", "15:30", "Europe/London")
        assert (result.hour, result.minute) == (15, 30)
        assert result.utcoffset() == timedelta(hours=1)


class TestListToDict:
    def test_list_to_dict_even_items(self):
        items = ["key1", "value1", "key2", "value2"]
        result = list_to_dict(items)
        expected = {"key1": "value1", "key2": "value2"}
        assert result == expected

    def test_list_to_dict_empty_list(self):
        result = list_to_dict([])
        assert result == {}

    def test_list_to_dict_single_pair(self):
        items = ["name", "John"]
        result = list_to_dict(items)
        assert result == {"name": "John"}


class TestExtractCountryCode:
    def test_extract_country_code_uk(self):
        address = "123 Main Street, London, United Kingdom"
        result = extract_country_code(address)
        assert result == "gb"

    def test_extract_country_code_france(self):
        address = "456 Rue de la Paix, Paris, France"
        result = extract_country_code(address)
        assert result == "fr"

    def test_extract_country_code_usa(self):
        address = "789 Broadway, New York, United States"
        result = extract_country_code(address)
        assert result == "us"

    def test_extract_country_code_not_found(self):
        address = "123 Unknown Street, Mystery City"
        result = extract_country_code(address)
        assert result is None

    def test_extract_country_code_case_insensitive(self):
        address = "123 Main Street, UNITED KINGDOM"
        result = extract_country_code(address)
        assert result == "gb"


class TestWalkTree:
    def test_walk_tree_dict_found(self):
        data = {"level1": {"level2": {"target": "found"}}}
        result = walk_tree(data, "target")
        assert result == "found"

    def test_walk_tree_dict_not_found(self):
        data = {"level1": {"level2": {"other": "value"}}}
        result = walk_tree(data, "target")
        assert result is None

    def test_walk_tree_list_found(self):
        data = [{"other": "value"}, {"target": "found"}]
        result = walk_tree(data, "target")
        assert result == "found"

    def test_walk_tree_nested_list_dict(self):
        data = [{"level1": [{"target": "found"}]}]
        result = walk_tree(data, "target")
        assert result == "found"

    def test_walk_tree_empty_data(self):
        result = walk_tree({}, "target")
        assert result is None


class TestGetPriceFromReservation:
    def test_get_price_from_reservation_valid(self):
        reservation = {
            "payment_summary": {"subtitle": "Total cost: £150.00"}
        }
        result = get_price_from_reservation(reservation)
        assert result == "150.00"

    def test_get_price_from_reservation_different_amount(self):
        reservation = {
            "payment_summary": {"subtitle": "Total cost: £89.99"}
        }
        result = get_price_from_reservation(reservation)
        assert result == "89.99"


class TestParseMultipleFiles:
    @patch('agenda.airbnb.extract_booking_from_html')
    def test_parse_multiple_files_single_file(self, mock_extract):
        mock_booking = {
            "type": "apartment",
            "operator": "airbnb",
            "name": "Test Apartment",
            "booking_reference": "ABC123"
        }
        mock_extract.return_value = mock_booking

        result = parse_multiple_files(["test1.html"])

        assert len(result) == 1
        assert result[0] == mock_booking
        mock_extract.assert_called_once_with("test1.html")

    @patch('agenda.airbnb.extract_booking_from_html')
    def test_parse_multiple_files_multiple_files(self, mock_extract):
        mock_booking1 = {"booking_reference": "ABC123"}
        mock_booking2 = {"booking_reference": "DEF456"}
        mock_extract.side_effect = [mock_booking1, mock_booking2]

        result = parse_multiple_files(["test2.html", "test1.html"])

        assert len(result) == 2
        assert result[0] == mock_booking1
        assert result[1] == mock_booking2
        # Files must be processed in sorted order, not in argument order.
        assert mock_extract.call_args_list == [
            call("test1.html"),
            call("test2.html"),
        ]

    @patch('agenda.airbnb.extract_booking_from_html')
    def test_parse_multiple_files_empty_list(self, mock_extract):
        result = parse_multiple_files([])
        assert result == []
        mock_extract.assert_not_called()


class TestGetUiState:
    def test_get_ui_state_with_mock_tree(self):
        # Use a blob the real walk_tree can search, so the JSON decoding and
        # key lookup are exercised together instead of being patched out.
        mock_tree = Mock()
        mock_tree.xpath.return_value = [
            '{"test": [{"uiState": [["key", "value"]]}]}'
        ]

        result = get_ui_state(mock_tree)

        assert result == {"key": "value"}
        mock_tree.xpath.assert_called_once_with('//*[@id="data-injector-instances"]/text()')


class TestGetReservationData:
    def test_get_reservation_data(self):
        ui_state = {
            "reservation": {
                "scheduled_event": {
                    "rows": [
                        {"id": "row1", "data": "value1"},
                        {"id": "row2", "data": "value2"}
                    ]
                }
            }
        }

        result = get_reservation_data(ui_state)
        expected = {
            "row1": {"id": "row1", "data": "value1"},
            "row2": {"id": "row2", "data": "value2"}
        }
        assert result == expected