diff --git a/agenda/airbnb.py b/agenda/airbnb.py deleted file mode 100644 index dfbf3c2..0000000 --- a/agenda/airbnb.py +++ /dev/null @@ -1,190 +0,0 @@ -"""Library for parsing Airbnb booking HTML files.""" - -import json -import re -import typing -from datetime import datetime -from typing import Any -from zoneinfo import ZoneInfo - -import lxml.html -import pycountry - -StrDict = dict[str, typing.Any] - - -def build_datetime(date_str: str, time_str: str, tz_name: str) -> datetime: - """ - Combine an ISO date string, HH:MM time string, and a timezone name - into a timezone-aware datetime in the specified timezone. - """ - dt_str = f"{date_str}T{time_str}" - naive_dt = datetime.fromisoformat(dt_str) - return naive_dt.replace(tzinfo=ZoneInfo(tz_name)) - - -def list_to_dict(items: list[typing.Any]) -> dict[str, typing.Any]: - """Convert a flat list to a dict, assuming alternating keys and values.""" - return {items[i]: items[i + 1] for i in range(0, len(items), 2)} - - -def extract_country_code(address: str) -> str | None: - """Return ISO 3166-1 alpha-2 country code from a free-text address.""" - address_lower = address.lower() - for country in pycountry.countries: - if country.name.lower() in address_lower: - return str(country.alpha_2.lower()) - if ( - hasattr(country, "official_name") - and country.official_name.lower() in address_lower - ): - return str(country.alpha_2.lower()) - return None - - -def get_json_blob(tree: Any) -> str: - data_id = "data-injector-instances" - js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0] - return str(js_string) - - -def get_ui_state(tree: Any) -> StrDict: - data_id = "data-injector-instances" - js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0] - big_blob = json.loads(str(js_string)) - ui_state = walk_tree(big_blob, "uiState") - return list_to_dict(ui_state[0]) - - -def get_reservation_data(ui_state: StrDict) -> StrDict: - return { - row["id"]: row for row in ui_state["reservation"]["scheduled_event"]["rows"] - } - - -def get_room_url(tree: Any) -> str | None: - for e in tree.xpath('//a[@data-testid="reservation-destination-link"]'): - href = e.get("href") - assert isinstance(href, str) - if not href.startswith("/room"): - continue - return "https://www.airbnb.co.uk" + href - return None - - -def get_price_from_reservation(reservation: StrDict) -> str: - price = reservation["payment_summary"]["subtitle"] - assert isinstance(price, str) - tc = "Total cost: " - if price.startswith(tc): - price = price[len(tc) :] - assert price[0] == "£" - return price[1:] - - -def extract_booking_from_html(html_file: str) -> StrDict: - """Extract booking information from Airbnb HTML file.""" - - with open(html_file, "r", encoding="utf-8") as f: - text_content = f.read() - - confirmation_match = re.search( - r"/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/([A-Z0-9]+)", - text_content, - ) - if confirmation_match is None: - raise ValueError("Could not find confirmation code in HTML") - confirmation_code = confirmation_match.group(1) - - tree = lxml.html.parse(html_file) - root = tree.getroot() - try: - ui_state = get_ui_state(tree) - except Exception: - print(html_file) - raise - - reservation = get_reservation_data(ui_state) - m_guests = re.match(r"^(\d+) guests?$", reservation["guests"]["subtitle"]) - if m_guests is None: - raise ValueError("Could not parse number of guests") - number_of_adults = int(m_guests.group(1)) - - price = get_price_from_reservation(reservation) - metadata = ui_state["reservation"]["metadata"] - country_code = metadata["country"].lower() - - title = reservation["dynamic_marquee_title_image_v3"]["title"] - location = title.rpartition(" in ")[2] - - checkin_checkout = reservation["checkin_checkout_arrival_guide"] - check_in_time = checkin_checkout["leading_subtitle"] - check_out_time = checkin_checkout["trailing_subtitle"] - - check_in = build_datetime( - metadata["check_in_date"], check_in_time, metadata["timezone"] - ) - - check_out = build_datetime( - metadata["check_out_date"], check_out_time, metadata["timezone"] - ) - - address = reservation["map"]["address"] if "map" in reservation else None - - if "header_action.pdp" in reservation: - name = reservation["header_action.pdp"]["subtitle"] - else: - name = root.findtext(".//h1") - - booking = { - "type": "apartment", - "operator": "airbnb", - "name": name, - "location": location, - "booking_reference": confirmation_code, - "booking_url": f"https://www.airbnb.co.uk/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/{confirmation_code}", - "country": country_code, - "latitude": metadata["lat"], - "longitude": metadata["lng"], - "timezone": metadata["timezone"], - "from": check_in, - "to": check_out, - "price": price, - "currency": "GBP", - "number_of_adults": number_of_adults, - } - if address: - booking["address"] = address - - room_url = get_room_url(tree) - if room_url is not None: - booking["url"] = room_url - - return booking - - -def walk_tree(data: Any, want_key: str) -> Any: - """Recursively search for a dict containing 'reservation' and return its value.""" - if isinstance(data, dict): - if want_key in data: - return data[want_key] - for key, value in data.items(): - result = walk_tree(value, want_key) - if result is not None: - return result - elif isinstance(data, list): - for item in data: - result = walk_tree(item, want_key) - if result is not None: - return result - return None - - -def parse_multiple_files(filenames: list[str]) -> list[StrDict]: - """Parse multiple Airbnb HTML files and return a list of booking dictionaries.""" - bookings = [] - for html_file in sorted(filenames): - booking = extract_booking_from_html(html_file) - assert booking - bookings.append(booking) - return bookings diff --git a/agenda/types.py b/agenda/types.py index 67a9545..6ba7f23 100644 --- a/agenda/types.py +++ b/agenda/types.py @@ -2,11 +2,11 @@ import collections import datetime +from datetime import date import functools import typing from collections import defaultdict from dataclasses import dataclass, field -from datetime import date import emoji from pycountry.db import Country @@ -77,9 +77,6 @@ class Trip: event["title"] for event in self.events ] or self.titles_from_travel() - if not titles: - titles = [acc["location"] for acc in self.accommodation] - return format_list_with_ampersand(titles) or "[unnamed trip]" def titles_from_travel(self) -> list[str]: diff --git a/parse_airbnb.py b/parse_airbnb.py index 0f9843f..65b27fc 100755 --- a/parse_airbnb.py +++ b/parse_airbnb.py @@ -1,17 +1,208 @@ #!/usr/bin/python3 +import json +import re import sys +import typing +from datetime import datetime +from typing import Any +from zoneinfo import ZoneInfo +import lxml.html +import pycountry import yaml -from agenda.airbnb import parse_multiple_files +StrDict = dict[str, typing.Any] + + +def build_datetime(date_str: str, time_str: str, tz_name: str) -> datetime: + """ + Combine an ISO date string, HH:MM time string, and a timezone name + into a timezone-aware datetime in the specified timezone. + """ + dt_str = f"{date_str}T{time_str}" + naive_dt = datetime.fromisoformat(dt_str) + return naive_dt.replace(tzinfo=ZoneInfo(tz_name)) + + +def list_to_dict(items: list) -> dict[str, int]: + """Convert a flat list to a dict, assuming alternating keys and values.""" + return {items[i]: items[i + 1] for i in range(0, len(items), 2)} + + +def extract_country_code(address: str) -> str | None: + """Return ISO 3166-1 alpha-2 country code from a free-text address.""" + address_lower = address.lower() + for country in pycountry.countries: + if country.name.lower() in address_lower: + return str(country.alpha_2.lower()) + if ( + hasattr(country, "official_name") + and country.official_name.lower() in address_lower + ): + return str(country.alpha_2.lower()) + return None + + +def get_json_blob(tree) -> str: + data_id = "data-injector-instances" + js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0] + return js_string + + +def get_ui_state(tree) -> StrDict: + data_id = "data-injector-instances" + js_string = tree.xpath(f'//*[@id="{data_id}"]/text()')[0] + # print(js_string) + # sys.exit(0) + big_blob = json.loads(js_string) + ui_state = walk_tree(big_blob, "uiState") + # print(json.dumps(ui_state)) + + return list_to_dict(ui_state[0]) + + +def get_reservation_data(ui_state: StrDict) -> StrDict: + return { + row["id"]: row for row in ui_state["reservation"]["scheduled_event"]["rows"] + } + + +def get_room_url(tree) -> str: + for e in tree.xpath('//a[@data-testid="reservation-destination-link"]'): + href = e.get("href") + assert isinstance(href, str) + if not href.startswith("/room"): + continue + return "https://www.airbnb.co.uk" + href + + +def get_price_from_reservation(reservation: StrDict) -> str: + price_string = reservation["payment_summary"]["subtitle"] + assert isinstance(price_string, str) + tc = "Total cost: " + assert price_string.startswith(tc) + price = price_string[len(tc) :] + assert price[0] == "£" + return price[1:] + + +def extract_booking_from_html(html_file: str) -> StrDict: + """Extract booking information from Airbnb HTML file.""" + + with open(html_file, "r", encoding="utf-8") as f: + text_content = f.read() + + confirmation_code = re.search( + r"/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/([A-Z0-9]+)", + text_content, + ).group(1) + + tree = lxml.html.parse(html_file) + root = tree.getroot() + try: + ui_state = get_ui_state(tree) + except Exception: + print(html_file) + raise + # print(json.dumps(ui_state)) + + reservation = get_reservation_data(ui_state) + m_guests = re.match(r"^(\d+) guests?$", reservation["guests"]["subtitle"]) + number_of_adults = int(m_guests.group(1)) + + price = get_price_from_reservation(reservation) + # print(reservation["payment_summary"]["subtitle"]) + # print(json.dumps(reservation)) + metadata = ui_state["reservation"]["metadata"] + country_code = metadata["country"].lower() + # pprint(metadata) + + # print(json.dumps(x)) + + title = reservation["dynamic_marquee_title_image_v3"]["title"] + location = title.rpartition(" in ")[2] + + # print(json.dumps(reservation)) + checkin_checkout = reservation["checkin_checkout_arrival_guide"] + # pprint(checkin_checkout) + check_in_time = checkin_checkout["leading_subtitle"] + check_out_time = checkin_checkout["trailing_subtitle"] + + check_in = build_datetime( + metadata["check_in_date"], check_in_time, metadata["timezone"] + ) + + check_out = build_datetime( + metadata["check_out_date"], check_out_time, metadata["timezone"] + ) + + # print(check_in, check_out) + + address = reservation["map"]["address"] + + # country_code = extract_country_code(address) + + # if "header_action.pdp" not in reservation: + # pprint(reservation) + + if "header_action.pdp" in reservation: + name = reservation["header_action.pdp"]["subtitle"] + else: + name = root.findtext(".//h1") + + booking = { + "type": "apartment", + "operator": "airbnb", + "name": name, + "location": location, + "booking_reference": confirmation_code, + "booking_url": f"https://www.airbnb.co.uk/trips/v1/reservation-details/ro/RESERVATION2_CHECKIN/{confirmation_code}", + "address": address, + "country": country_code, + "latitude": metadata["lat"], + "longitude": metadata["lng"], + "timezone": metadata["timezone"], + "from": check_in, + "to": check_out, + "price": price, + "currency": "GBP", + "number_of_adults": number_of_adults, + } + + booking["url"] = get_room_url(tree) + + return booking + + +def walk_tree(data: Any, want_key: str) -> Any: + """Recursively search for a dict containing 'reservation' and return its value.""" + if isinstance(data, dict): + if want_key in data: + return data[want_key] + for key, value in data.items(): + result = walk_tree(value, want_key) + if result is not None: + return result + elif isinstance(data, list): + for item in data: + result = walk_tree(item, want_key) + if result is not None: + return result + return None def main() -> None: """Main function.""" - filenames = sys.argv[1:] - bookings = parse_multiple_files(filenames) + filenames = sorted(sys.argv[1:]) + + bookings = [] + for html_file in filenames: + booking = extract_booking_from_html(html_file) + assert booking + bookings.append(booking) + print(yaml.dump(bookings, sort_keys=False)) diff --git a/tests/test_airbnb.py b/tests/test_airbnb.py deleted file mode 100644 index 7036cbf..0000000 --- a/tests/test_airbnb.py +++ /dev/null @@ -1,186 +0,0 @@ -"""Tests for agenda.airbnb module.""" - -import pytest -from datetime import datetime -from zoneinfo import ZoneInfo -from unittest.mock import Mock, patch, mock_open - -from agenda.airbnb import ( - build_datetime, - list_to_dict, - extract_country_code, - walk_tree, - get_ui_state, - get_reservation_data, - get_price_from_reservation, - parse_multiple_files, -) - - -class TestBuildDatetime: - def test_build_datetime_utc(self): - result = build_datetime("2025-07-28", "15:30", "UTC") - expected = datetime(2025, 7, 28, 15, 30, tzinfo=ZoneInfo("UTC")) - assert result == expected - - def test_build_datetime_local_timezone(self): - result = build_datetime("2025-12-25", "09:00", "Europe/London") - expected = datetime(2025, 12, 25, 9, 0, tzinfo=ZoneInfo("Europe/London")) - assert result == expected - - -class TestListToDict: - def test_list_to_dict_even_items(self): - items = ["key1", "value1", "key2", "value2"] - result = list_to_dict(items) - expected = {"key1": "value1", "key2": "value2"} - assert result == expected - - def test_list_to_dict_empty_list(self): - result = list_to_dict([]) - assert result == {} - - def test_list_to_dict_single_pair(self): - items = ["name", "John"] - result = list_to_dict(items) - assert result == {"name": "John"} - - -class TestExtractCountryCode: - def test_extract_country_code_uk(self): - address = "123 Main Street, London, United Kingdom" - result = extract_country_code(address) - assert result == "gb" - - def test_extract_country_code_france(self): - address = "456 Rue de la Paix, Paris, France" - result = extract_country_code(address) - assert result == "fr" - - def test_extract_country_code_usa(self): - address = "789 Broadway, New York, United States" - result = extract_country_code(address) - assert result == "us" - - def test_extract_country_code_not_found(self): - address = "123 Unknown Street, Mystery City" - result = extract_country_code(address) - assert result is None - - def test_extract_country_code_case_insensitive(self): - address = "123 Main Street, UNITED KINGDOM" - result = extract_country_code(address) - assert result == "gb" - - -class TestWalkTree: - def test_walk_tree_dict_found(self): - data = {"level1": {"level2": {"target": "found"}}} - result = walk_tree(data, "target") - assert result == "found" - - def test_walk_tree_dict_not_found(self): - data = {"level1": {"level2": {"other": "value"}}} - result = walk_tree(data, "target") - assert result is None - - def test_walk_tree_list_found(self): - data = [{"other": "value"}, {"target": "found"}] - result = walk_tree(data, "target") - assert result == "found" - - def test_walk_tree_nested_list_dict(self): - data = [{"level1": [{"target": "found"}]}] - result = walk_tree(data, "target") - assert result == "found" - - def test_walk_tree_empty_data(self): - result = walk_tree({}, "target") - assert result is None - - -class TestGetPriceFromReservation: - def test_get_price_from_reservation_valid(self): - reservation = { - "payment_summary": {"subtitle": "Total cost: £150.00"} - } - result = get_price_from_reservation(reservation) - assert result == "150.00" - - def test_get_price_from_reservation_different_amount(self): - reservation = { - "payment_summary": {"subtitle": "Total cost: £89.99"} - } - result = get_price_from_reservation(reservation) - assert result == "89.99" - - -class TestParseMultipleFiles: - @patch('agenda.airbnb.extract_booking_from_html') - def test_parse_multiple_files_single_file(self, mock_extract): - mock_booking = { - "type": "apartment", - "operator": "airbnb", - "name": "Test Apartment", - "booking_reference": "ABC123" - } - mock_extract.return_value = mock_booking - - result = parse_multiple_files(["test1.html"]) - - assert len(result) == 1 - assert result[0] == mock_booking - mock_extract.assert_called_once_with("test1.html") - - @patch('agenda.airbnb.extract_booking_from_html') - def test_parse_multiple_files_multiple_files(self, mock_extract): - mock_booking1 = {"booking_reference": "ABC123"} - mock_booking2 = {"booking_reference": "DEF456"} - mock_extract.side_effect = [mock_booking1, mock_booking2] - - result = parse_multiple_files(["test2.html", "test1.html"]) - - assert len(result) == 2 - assert result[0] == mock_booking1 - assert result[1] == mock_booking2 - - @patch('agenda.airbnb.extract_booking_from_html') - def test_parse_multiple_files_empty_list(self, mock_extract): - result = parse_multiple_files([]) - assert result == [] - mock_extract.assert_not_called() - - -class TestGetUiState: - @patch('lxml.html.etree') - def test_get_ui_state_with_mock_tree(self, mock_etree): - mock_tree = Mock() - mock_tree.xpath.return_value = ['{"test": [["uiState", {"key": "value"}]]}'] - - with patch('agenda.airbnb.walk_tree') as mock_walk: - mock_walk.return_value = [["key", "value"]] - result = get_ui_state(mock_tree) - - assert result == {"key": "value"} - mock_tree.xpath.assert_called_once_with('//*[@id="data-injector-instances"]/text()') - - -class TestGetReservationData: - def test_get_reservation_data(self): - ui_state = { - "reservation": { - "scheduled_event": { - "rows": [ - {"id": "row1", "data": "value1"}, - {"id": "row2", "data": "value2"} - ] - } - } - } - - result = get_reservation_data(ui_state) - expected = { - "row1": {"id": "row1", "data": "value1"}, - "row2": {"id": "row2", "data": "value2"} - } - assert result == expected \ No newline at end of file