210 lines
6.7 KiB
Python
210 lines
6.7 KiB
Python
"""Waste collection schedules."""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import typing
|
|
from collections import defaultdict
|
|
from datetime import date, datetime, time, timedelta
|
|
|
|
import httpx
|
|
import lxml.html
|
|
|
|
from . import uk_time
|
|
from .types import Event
|
|
|
|
# Maximum age, in hours, of a cached download before it is fetched again.
ttl_hours = 12
|
|
|
|
|
|
def make_waste_dir(data_dir: str) -> None:
    """Ensure the ``waste`` directory exists inside *data_dir*.

    The directory (and any missing parent) is created in a single call, so
    there is no window between checking for it and creating it, and calling
    this repeatedly is harmless.

    Args:
        data_dir: directory that holds the ``waste`` cache directory.

    Raises:
        FileExistsError: if ``<data_dir>/waste`` exists but is not a directory.
    """
    os.makedirs(os.path.join(data_dir, "waste"), exist_ok=True)
|
|
|
|
|
|
async def get_html(data_dir: str, postcode: str, uprn: str) -> str:
    """Return the North Somerset collection schedule page as HTML.

    The newest ``*.html`` file in ``<data_dir>/waste`` is returned when its
    timestamped name is less than ``ttl_hours`` old.  Otherwise the council
    form is submitted, the resulting page is saved under a new timestamped
    name and its text is returned.

    Args:
        data_dir: directory that holds the ``waste`` cache directory.
        postcode: value sent as the form's ``Postcode`` field.
        uprn: value sent as the form's ``SelectedUprn`` field.

    Returns:
        The HTML of the schedule page.
    """
    now = datetime.now()
    waste_dir = os.path.join(data_dir, "waste")

    make_waste_dir(data_dir)

    existing = [f for f in os.listdir(waste_dir) if f.endswith(".html")]
    if existing:
        # File names start with a sortable timestamp, so max() is the newest.
        recent_filename = max(existing)
        recent = datetime.strptime(recent_filename, "%Y-%m-%d_%H:%M.html")
        if now - recent < timedelta(hours=ttl_hours):
            with open(os.path.join(waste_dir, recent_filename)) as cached:
                return cached.read()

    now_str = now.strftime("%Y-%m-%d_%H:%M")
    filename = f"{waste_dir}/{now_str}.html"

    forms_base_url = "https://forms.n-somerset.gov.uk"
    url = f"{forms_base_url}/Waste/CollectionSchedule"
    async with httpx.AsyncClient() as client:
        r = await client.post(
            url,
            data={
                "PreviousHouse": "",
                "PreviousPostcode": "-",
                "Postcode": postcode,
                "SelectedUprn": uprn,
            },
        )
        # When the reply body is an "Object moved" page, follow its link by
        # hand; otherwise the reply to the POST itself is used.
        pattern = r'<h2>Object moved to <a href="([^"]*)">here<\/a>\.<\/h2>'
        m = re.search(pattern, r.text)
        if m:
            r = await client.get(forms_base_url + m.group(1))
        html = r.text

    with open(filename, "w") as out:
        out.write(html)
    return html
|
|
|
|
|
|
def parse_waste_schedule_date(
    day_and_month: str, today: typing.Optional[date] = None
) -> date:
    """Parse a schedule date that has no year, e.g. ``"Monday 05 January"``.

    The current year is tried first.  If that date is before *today*, or does
    not exist in the current year (29 February outside a leap year), the
    following year is used instead.

    Args:
        day_and_month: weekday name, day of month and month name, in the
            ``"%A %d %B"`` layout.
        today: reference date for choosing the year; defaults to
            ``date.today()``.

    Returns:
        The parsed date with the year filled in.

    Raises:
        ValueError: if the text cannot be parsed for the following year
            either.
    """
    if today is None:
        today = date.today()
    date_format = "%A %d %B %Y"

    def with_year(year: int) -> date:
        return datetime.strptime(f"{day_and_month} {year}", date_format).date()

    try:
        d: typing.Optional[date] = with_year(today.year)
    except ValueError:
        # Possibly 29 February in a non-leap year; let next year decide.
        d = None

    if d is None or d < today:
        d = with_year(today.year + 1)
    return d
|
|
|
|
|
|
def parse(root: lxml.html.HtmlElement) -> list[Event]:
    """Build waste collection events from the schedule table in *root*.

    Every row of the first ``table/tbody`` found must have exactly three
    cells: the service name, its next collection date and the one following.
    Services are grouped by date and one event is produced per date.
    """
    table_body = root.find(".//table/tbody")
    assert table_body is not None

    services_by_date: defaultdict[date, list[str]] = defaultdict(list)
    for row in table_body:
        service_cell, next_cell, following_cell = row
        assert service_cell.text and next_cell.text and following_cell.text

        # Parse both dates before recording either, next collection first.
        collection_dates = [
            parse_waste_schedule_date(cell.text)
            for cell in (next_cell, following_cell)
        ]
        for collection_date in collection_dates:
            services_by_date[collection_date].append(service_cell.text)

    events = []
    for collection_date, names in services_by_date.items():
        events.append(
            Event(
                name="waste_schedule",
                date=uk_time(collection_date, time(6, 30)),
                title="Backwell: " + ", ".join(names),
            )
        )
    return events
|
|
|
|
|
|
# Value of the "data" key in the Bristol JSON payload: a list of dicts.
BristolSchedule = list[dict[str, typing.Any]]
|
|
|
|
|
|
async def get_bristol_data(data_dir: str, uprn: str) -> BristolSchedule:
    """Get the Bristol Waste schedule for *uprn*, using a file cache.

    Responses are stored in ``<data_dir>/waste`` as
    ``<timestamp>_<uprn>.json``.  The newest one is reused while it is less
    than ``ttl_hours`` old.  If a fresh download hits a read timeout, the
    newest cached copy is returned whatever its age.

    Args:
        data_dir: directory that holds the ``waste`` cache directory.
        uprn: property reference used in both the request and the file name.

    Returns:
        The ``"data"`` value of the JSON payload.

    Raises:
        httpx.ReadTimeout: if the download times out and nothing is cached.
    """
    now = datetime.now()
    waste_dir = os.path.join(data_dir, "waste")

    make_waste_dir(data_dir)

    existing = [f for f in os.listdir(waste_dir) if f.endswith(f"_{uprn}.json")]
    # File names start with a sortable timestamp, so max() is the newest.
    recent_filename = max(existing) if existing else None

    def get_from_cache(filename: str) -> BristolSchedule:
        with open(os.path.join(waste_dir, filename)) as cached:
            json_data = json.load(cached)
        return typing.cast(BristolSchedule, json_data["data"])

    if recent_filename is not None:
        recent = datetime.strptime(recent_filename, f"%Y-%m-%d_%H:%M_{uprn}.json")
        if now - recent < timedelta(hours=ttl_hours):
            return get_from_cache(recent_filename)

    try:
        r = await get_bristol_gov_uk_data(uprn)
    except httpx.ReadTimeout:
        if recent_filename is None:
            # Nothing to fall back on; let the caller see the timeout.
            raise
        return get_from_cache(recent_filename)

    # Parse before writing so an unusable response never enters the cache.
    data = typing.cast(BristolSchedule, r.json()["data"])

    with open(f'{waste_dir}/{now.strftime("%Y-%m-%d_%H:%M")}_{uprn}.json', "wb") as out:
        out.write(r.content)

    return data
|
|
|
|
|
|
async def get_bristol_gov_uk_data(uprn: str) -> httpx.Response:
    """Request the collection dates for *uprn* from Bristol City Council.

    Three requests are sent in order on one client with a 20 second timeout:
    a GET to the council's form portal, a POST of the UPRN to the
    ``DetailedLLPG`` endpoint and a POST to ``NextCollectionDates``.  Only the
    response to the last request is returned.
    """
    request_headers = {
        "Accept": "*/*",
        "Accept-Language": "en-GB,en;q=0.9",
        "Connection": "keep-alive",
        "Ocp-Apim-Subscription-Key": "47ffd667d69c4a858f92fc38dc24b150",
        "Ocp-Apim-Trace": "true",
        "Origin": "https://bristolcouncil.powerappsportals.com",
        "Referer": "https://bristolcouncil.powerappsportals.com/",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "cross-site",
        "Sec-GPC": "1",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    }

    # The endpoints are given the UPRN left-padded with zeros to 12 characters.
    padded_uprn = str(uprn).zfill(12)
    api_base = "https://bcprdapidyna002.azure-api.net"

    async with httpx.AsyncClient(timeout=20) as client:
        # Initialise the form.
        await client.get(
            "https://bristolcouncil.powerappsportals.com/completedynamicformunauth/",
            headers=request_headers,
            params={"servicetypeid": "7dce896c-b3ba-ea11-a812-000d3a7f1cdc"},
        )

        # Set the search criteria.
        await client.post(
            f"{api_base}/bcprdfundyna001-llpg/DetailedLLPG",
            headers=request_headers,
            json={"Uprn": "UPRN" + padded_uprn},
        )

        # Retrieve the schedule.
        schedule_response = await client.post(
            f"{api_base}/bcprdfundyna001-alloy/NextCollectionDates",
            headers=request_headers,
            json={"uprn": padded_uprn},
        )

    return schedule_response
|
|
|
|
|
|
async def get_bristol_gov_uk(start_date: date, data_dir: str, uprn: str) -> list[Event]:
    """Build waste collection events for *uprn* from Bristol City Council data.

    Both the next and the last collection date of every collection entry are
    considered.  Dates before *start_date* are dropped, and each service is
    listed at most once per date.
    """
    schedule = await get_bristol_data(data_dir, uprn)

    services_by_date: defaultdict[date, list[str]] = defaultdict(list)
    date_keys = ("nextCollectionDate", "lastCollectionDate")

    for container in schedule:
        container_name = container["containerName"]
        if "Recycling" in container_name:
            label = "Recycling"
        else:
            # Keep whatever follows the first space of the container name.
            label = container_name.partition(" ")[2]

        for entry in container["collection"]:
            for key in date_keys:
                # Only the leading YYYY-MM-DD part of the value is used.
                when = date.fromisoformat(entry[key][:10])
                if when >= start_date:
                    listed = services_by_date[when]
                    if label not in listed:
                        listed.append(label)

    events = []
    for when, labels in services_by_date.items():
        events.append(
            Event(name="waste_schedule", date=when, title="Bristol: " + ", ".join(labels))
        )
    return events
|