Playwright is heavyweight and currently broken. The login now uses the OAuth2 token endpoint directly with the requests library instead of browser automation. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
217 lines
5.9 KiB
Python
Executable file
217 lines
5.9 KiB
Python
Executable file
#!/usr/bin/python3
|
|
"""Retrieve Wheelie Fresh Bins cleaning schedule and save HTML to file."""
|
|
|
|
import configparser
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import typing
|
|
from datetime import date, datetime, timezone
|
|
|
|
import ics # type: ignore
|
|
import jinja2
|
|
import requests
|
|
|
|
# Templates, the config file and the cached login cookie all live next to
# this script.
base_dir = os.path.dirname(__file__)

# Jinja2 template that main() renders the schedule HTML into.
templates_dir = os.path.join(base_dir, "templates")
env = jinja2.Environment(loader=jinja2.FileSystemLoader(templates_dir))

template = env.get_template("schedule.html")

config_location = os.path.join(base_dir, "config")
auth_json_path = os.path.join(base_dir, "auth.json")

# Only the config file is mandatory.  auth.json is created by perform_login()
# and get_schedule_html() logs in when it is missing, so requiring it here
# would make a first run impossible.  An explicit raise is used instead of
# ``assert`` because assertions are stripped under ``python -O``.
if not os.path.exists(config_location):
    raise FileNotFoundError(f"config file not found: {config_location}")

config = configparser.ConfigParser()
config.read(config_location)

# Portal credentials.
username = config["login"]["username"]
password = config["login"]["password"]

# Directory that receives a timestamped copy of every rendered page.
data_dir = config["location"]["data"]

# Exact response body that get_schedule_html() treats as "not logged in".
no_permission = "You do not have permission to view this directory or page."

# Sent as "bookingId" when the schedule is requested.
booking_id = config["booking"]["booking_id"]

login_url = "https://portal.wheeliefreshbins.com/Account/Login"
summary_url = "https://portal.wheeliefreshbins.com/Home/Summary"

# Output paths for the rendered web page and the iCalendar file.
dest = config["location"]["dest"]
ics_file = config["location"]["ics_file"]
|
|
|
|
|
|
def perform_login() -> None:
    """Log in through the portal's token endpoint and save the auth cookie.

    The configured username and password are posted as an OAuth2 password
    grant.  The ``.AspNet.Cookies`` cookie set by the response is written to
    ``auth_json_path`` as a ``{"cookies": [...]}`` document, the same layout
    Playwright used to produce, which is what ``get_cookie_value`` reads.

    Raises:
        Exception: if the endpoint does not answer with status 200, or if no
            ``.AspNet.Cookies`` cookie was set.
        requests.exceptions.RequestException: if the request itself fails,
            including when the timeout expires.
    """
    token_url = "https://portal.wheeliefreshbins.com/Token"

    # OAuth2 password grant format
    token_data = {
        "grant_type": "password",
        "username": username,
        "password": password,
    }

    # The session exists only to collect the cookie, so release its
    # connections as soon as the exchange is over.  The timeout stops an
    # unresponsive server from hanging the script indefinitely.
    with requests.Session() as session:
        response = session.post(token_url, data=token_data, timeout=30)

        if response.status_code != 200:
            raise Exception(f"Login failed with status {response.status_code}: {response.text}")

        # The token endpoint should set the authentication cookie
        cookie_value = session.cookies.get(".AspNet.Cookies")

    if not cookie_value:
        raise Exception("Authentication cookie not found after login")

    # Save cookie in the same format as Playwright did
    auth_data = {
        "cookies": [
            {
                "name": ".AspNet.Cookies",
                "value": cookie_value,
                "domain": "portal.wheeliefreshbins.com",
                "path": "/",
            }
        ]
    }

    with open(auth_json_path, "w", encoding="utf-8") as f:
        json.dump(auth_data, f, indent=2)
|
|
|
|
|
|
def get_cookie_value() -> str:
    """Return the ``.AspNet.Cookies`` value stored in auth.json.

    Reads the JSON document at ``auth_json_path`` and returns the ``value``
    of the first entry in its ``cookies`` list whose ``name`` is
    ``.AspNet.Cookies``.

    Raises:
        LookupError: if no cookie with that name is present.
    """
    # Use a context manager so the file handle is closed deterministically.
    with open(auth_json_path, encoding="utf-8") as f:
        auth = json.load(f)

    for cookie in auth["cookies"]:
        if cookie["name"] == ".AspNet.Cookies":
            value: str = cookie["value"]
            return value

    # A bare next() would leak StopIteration here, which is easy to misread
    # as normal iterator exhaustion further up the stack.
    raise LookupError(f"no .AspNet.Cookies cookie found in {auth_json_path}")
|
|
|
|
|
|
def retrieve_schedule(timeout: float = 30.0) -> requests.models.Response:
    """Request the bin cleaning schedule from the portal.

    Posts ``booking_id`` as JSON to the schedule endpoint, authenticating
    with the cookie returned by ``get_cookie_value``.

    Args:
        timeout: Seconds to wait for the server before giving up.  Without a
            timeout an unresponsive server would block the script forever.

    Returns:
        The raw response; callers decide whether it is a permission error.
    """
    return requests.post(
        "https://portal.wheeliefreshbins.com/home/schedule",
        json={"bookingId": booking_id},
        cookies={".AspNet.Cookies": get_cookie_value()},
        timeout=timeout,
    )
|
|
|
|
|
|
def read_html_from_json(r: requests.models.Response) -> str:
    """Decode the response body as JSON and return its ``html`` member."""
    payload = r.json()
    html_fragment = payload["html"]
    # The cast only informs the type checker; nothing is converted at runtime.
    return typing.cast(str, html_fragment)
|
|
|
|
|
|
def login() -> None:
    """Authenticate with Wheelie Fresh Bins.

    Thin wrapper that delegates all of the work to ``perform_login``.
    """
    perform_login()
|
|
|
|
|
|
def get_schedule_html() -> str | typing.NoReturn:
|
|
"""Grab the schedule and return the HTML part of the response."""
|
|
if not os.path.exists(auth_json_path):
|
|
login()
|
|
r = retrieve_schedule()
|
|
if r.text != no_permission:
|
|
return read_html_from_json(r)
|
|
|
|
login()
|
|
|
|
r = retrieve_schedule()
|
|
if r.text != no_permission:
|
|
return read_html_from_json(r)
|
|
|
|
print("login failed")
|
|
sys.exit(1)
|
|
|
|
|
|
# Shortest span from an opening <div ...> tag to the next </div>.
re_div = re.compile(r"<div[^>]*?>.*?</div>")

# A "bincell" div: group 1 is the bin colour taken from the class name,
# group 2 is the text inside the div.
re_bin = re.compile(
    r'<div class="col-xs-3 bincell.*(black|blue|green)bin">(.*?)</div>'
)

# A div whose text is a date such as "06 Jan 2025"; group 1 is that date.
re_date = re.compile(
    r'<div class="[^"].*?">(\d{2} [A-Za-z]{3} \d{4})<\/div>'
)
|
|
|
|
|
|
def parse_bin_date(bin_date: str) -> date:
    """Convert text such as ``"Monday, 06 Jan 2025"`` into a ``date``."""
    parsed = datetime.strptime(bin_date, "%A, %d %b %Y")
    return parsed.date()
|
|
|
|
|
|
def find_date(d1: date, target: str) -> date:
    """Resolve a year-less date string to its first occurrence on or after d1.

    ``target`` is parsed with ``d1``'s year; if that falls before ``d1`` it
    is parsed again with the following year.
    """
    for year in (d1.year, d1.year + 1):
        candidate = parse_bin_date(f"{target} {year}")
        if candidate >= d1:
            break

    assert d1 <= candidate

    return candidate
|
|
|
|
|
|
def get_date_from_line(line: str) -> date:
    """Return the date found in the div at the start of ``line``.

    ``re_date`` must match at the very beginning of ``line``; its captured
    text (for example ``06 Jan 2025``) is parsed as day, abbreviated month
    and year.

    Raises:
        ValueError: if ``line`` does not start with a date div, or the
            captured text is not a valid date.
    """
    m_date = re_date.match(line)
    # The line comes from a remote HTML page, so validate with a real
    # exception: an ``assert`` would disappear under ``python -O``.
    if m_date is None:
        raise ValueError(f"no date found at start of line: {line!r}")
    return datetime.strptime(m_date.group(1), "%d %b %Y").date()
|
|
|
|
|
|
def parse_part(d: date, part: str) -> date | None:
|
|
"""Parse part."""
|
|
if "bincell" not in part:
|
|
return None
|
|
m = re_bin.match(part)
|
|
if not m:
|
|
print(part)
|
|
assert m
|
|
bin_colour, date_str = m.groups()
|
|
if date_str.endswith("Christmas Closure"):
|
|
return None
|
|
return find_date(d, date_str)
|
|
|
|
|
|
def html_to_ics(html: str) -> ics.Calendar:
    """Build a calendar with one all-day event per cleaning date in ``html``.

    Only lines containing "weekcell" are examined.  Each such line supplies
    a row date via ``get_date_from_line``, and every div after the first is
    passed to ``parse_part``.  Dates are collected in a set, so a date that
    appears more than once yields a single event.
    """
    cleaning_days: set[date] = set()

    for raw_line in html.splitlines():
        if "weekcell" not in raw_line:
            continue

        row = raw_line.strip()
        row_date = get_date_from_line(row)

        # Skip the first div on the row; the remaining ones go to parse_part.
        for cell in re_div.findall(row)[1:]:
            cleaning_day = parse_part(row_date, cell)
            if cleaning_day:
                cleaning_days.add(cleaning_day)

    cal = ics.Calendar()

    for cleaning_day in cleaning_days:
        event = ics.Event()
        event.name = "Wheelie Fresh Bins"
        event.begin = cleaning_day
        event.make_all_day()
        cal.events.add(event)

    return cal
|
|
|
|
|
|
def main() -> None:
    """Fetch the schedule and write the web page, calendar and archive copy.

    Three files are written, in this order: the rendered page to ``dest``
    (with the "schedbody" text substituted), the iCalendar data to
    ``ics_file``, and the unmodified rendered page to a UTC-timestamped
    ``.html`` file inside ``data_dir``.
    """
    html = get_schedule_html()
    page = template.render(html=html)

    # Drop the schedbody class because it sets max height to 400px and adds a scrollbar
    # NOTE(review): the replacement text is a double quote, not an empty
    # string -- confirm this matches the markup the template produces.
    published_page = page.replace("schedbody ", '"')
    with open(dest, "w") as page_file:
        page_file.write(published_page)

    cal = html_to_ics(html)
    with open(ics_file, "w") as calendar_file:
        calendar_file.write(cal.serialize())

    # Keep a copy of this run's page, named after the current UTC time.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H:%M")
    archive_path = os.path.join(data_dir, timestamp + ".html")
    with open(archive_path, "w") as archive_file:
        archive_file.write(page)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|