Compare commits

..

No commits in common. "7949e16195bc954efc050f4f7b379cfca2703adc" and "dd5ec4d2336558313e4f1518084a2016a7bf519e" have entirely different histories.

3 changed files with 154 additions and 168 deletions

View file

@ -5,6 +5,7 @@ import os
import typing import typing
import warnings import warnings
from datetime import date, datetime, time, timedelta, timezone from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal
from time import time as unixtime from time import time as unixtime
from typing import List from typing import List
@ -20,11 +21,11 @@ import pytz
import requests import requests
import yaml import yaml
from dateutil.easter import easter from dateutil.easter import easter
from dateutil.relativedelta import FR, relativedelta from dateutil.relativedelta import FR, SA, relativedelta
from agenda import thespacedevs from agenda import thespacedevs
from . import fx, gwr, markets, waste_schedule from . import gwr, waste_schedule
from .types import Event from .types import Event
warnings.simplefilter(action="ignore", category=FutureWarning) warnings.simplefilter(action="ignore", category=FutureWarning)
@ -149,6 +150,35 @@ def get_next_bank_holiday(input_date: date) -> list[Event]:
return hols return hols
def get_gbpusd() -> Decimal:
"""Get the current value for GBPUSD, with caching."""
now = datetime.now()
now_str = now.strftime("%Y-%m-%d_%H:%M")
fx_dir = os.path.join(data_dir, "fx")
existing_data = os.listdir(fx_dir)
existing = [f for f in existing_data if f.endswith("_GBPUSD.json")]
if existing:
recent_filename = max(existing)
recent = datetime.strptime(recent_filename, "%Y-%m-%d_%H:%M_GBPUSD.json")
delta = now - recent
if existing and delta < timedelta(hours=6):
full = os.path.join(fx_dir, recent_filename)
data = json.load(open(full), parse_float=Decimal)
if "quotes" in data and "USDGBP" in data["quotes"]:
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
url = "http://api.exchangerate.host/live"
params = {"currencies": "GBP,USD", "access_key": access_key}
filename = f"{fx_dir}/{now_str}_GBPUSD.json"
r = requests.get(url, params=params)
open(filename, "w").write(r.text)
data = json.loads(r.text, parse_float=Decimal)
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
def next_economist(input_date: date) -> date: def next_economist(input_date: date) -> date:
"""Next date that the Economist is published.""" """Next date that the Economist is published."""
# Define the publication day (Thursday) and the day of the week of the input date # Define the publication day (Thursday) and the day of the week of the input date
@ -285,6 +315,124 @@ def critical_mass(start_date: date, limit: int = 12) -> list[Event]:
return events return events
def windmill_hill_market_days(start_date: date) -> list[Event]:
"""Windmill Hill Market days for the next 12 months from a given date."""
events: list[Event] = []
current_date = start_date
url = (
"https://www.windmillhillcityfarm.org.uk"
+ "/visit-us/shops-more/windmill-hill-market-bristol-market/"
)
# To keep count of how many market days have been calculated
count = 0
tz = pytz.timezone("Europe/London")
start = time(10, 0)
end = time(15, 0)
while count < 12:
# Skip months outside of April to December
if current_date.month < 4 or current_date.month > 12:
current_date += relativedelta(months=1)
current_date = date(current_date.year, current_date.month, 1)
continue
# Calculate the first Saturday of the current month
first_saturday = current_date + relativedelta(day=1, weekday=SA(+1))
# Include it in the list only if it's on or after the start_date
if first_saturday >= start_date:
events.append(
Event(
name="market",
title="Windmill Hill Market",
date=tz.localize(datetime.combine(first_saturday, start)),
end_date=tz.localize(datetime.combine(first_saturday, end)),
url=url,
)
)
count += 1
# Move to the next month
current_date += relativedelta(months=1)
current_date = date(current_date.year, current_date.month, 1)
return events
def tobacco_factory_market_days(start_date: date) -> list[Event]:
"""Tobacco Factory Market days for the next 12 months from a given date."""
events: list[Event] = []
current_date = start_date
count = 0
url = "https://tobaccofactory.com/whats-on/sunday-market/"
tz = pytz.timezone("Europe/London")
start = time(10, 0)
end = time(14, 30)
while count < 52: # 52 weeks in a year
# Calculate the next Sunday from the current date
next_sunday = current_date + relativedelta(weekday=6) # Sunday is 6
# Include it in the list only if it's on or after the start_date
if next_sunday >= start_date:
events.append(
Event(
name="market",
title="Tobacco Factory Sunday Market",
date=tz.localize(datetime.combine(next_sunday, start)),
end_date=tz.localize(datetime.combine(next_sunday, end)),
url=url,
)
)
count += 1
# Move to the next week
current_date += timedelta(weeks=1)
return events
def nailsea_farmers_market_days(start_date: date) -> list[Event]:
"""Nailsea Farmers Market days for the next 12 months from a given date."""
events: list[Event] = []
current_date = start_date
count = 0
tz = pytz.timezone("Europe/London")
t = time(9, 0) # The market starts at 9am
while count < 12:
# Calculate the 3rd Saturday of the current month
third_saturday = current_date + relativedelta(day=1, weekday=SA(+3))
# Include it in the list only if it's on or after the start_date
if third_saturday >= start_date:
events.append(
Event(
name="market",
title="Nailsea Farmers Market",
date=tz.localize(datetime.combine(third_saturday, t)),
)
)
count += 1
# Move to the next month
current_date += relativedelta(months=1)
current_date = date(current_date.year, current_date.month, 1)
return events
# Test the function
if __name__ == "__main__":
start_date = date(2023, 10, 29)
print(windmill_hill_market_days(start_date))
def as_date(d: date | datetime) -> date: def as_date(d: date | datetime) -> date:
"""Return date for given date or datetime.""" """Return date for given date or datetime."""
return d.date() if isinstance(d, datetime) else d return d.date() if isinstance(d, datetime) else d
@ -528,7 +676,7 @@ def get_data(now: datetime) -> typing.Mapping[str, str | object]:
reply = { reply = {
"now": now, "now": now,
"gbpusd": fx.get_gbpusd(config), "gbpusd": get_gbpusd(),
"next_economist": next_economist(today), "next_economist": next_economist(today),
"bank_holiday": get_next_bank_holiday(today), "bank_holiday": get_next_bank_holiday(today),
"us_holiday": get_us_holidays(today), "us_holiday": get_us_holidays(today),
@ -543,9 +691,9 @@ def get_data(now: datetime) -> typing.Mapping[str, str | object]:
"gwr_advance_tickets": gwr.advance_ticket_date(data_dir), "gwr_advance_tickets": gwr.advance_ticket_date(data_dir),
"critical_mass": critical_mass(today), "critical_mass": critical_mass(today),
"market": ( "market": (
markets.windmill_hill(today) windmill_hill_market_days(today)
+ markets.tobacco_factory(today) + tobacco_factory_market_days(today)
+ markets.nailsea_farmers(today) + nailsea_farmers_market_days(today)
), ),
"rockets": thespacedevs.get_launches(rocket_dir, limit=40), "rockets": thespacedevs.get_launches(rocket_dir, limit=40),
} }

View file

@ -1,42 +0,0 @@
"""Currency exchange rates."""
import configparser
import json
import os
import typing
from datetime import datetime, timedelta
from decimal import Decimal
import requests
def get_gbpusd(config: configparser.ConfigParser) -> Decimal:
"""Get the current value for GBPUSD, with caching."""
access_key = config.get("exchangerate", "access_key")
data_dir = config.get("data", "dir")
now = datetime.now()
now_str = now.strftime("%Y-%m-%d_%H:%M")
fx_dir = os.path.join(data_dir, "fx")
existing_data = os.listdir(fx_dir)
existing = [f for f in existing_data if f.endswith("_GBPUSD.json")]
if existing:
recent_filename = max(existing)
recent = datetime.strptime(recent_filename, "%Y-%m-%d_%H:%M_GBPUSD.json")
delta = now - recent
if existing and delta < timedelta(hours=6):
full = os.path.join(fx_dir, recent_filename)
data = json.load(open(full), parse_float=Decimal)
if "quotes" in data and "USDGBP" in data["quotes"]:
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
url = "http://api.exchangerate.host/live"
params = {"currencies": "GBP,USD", "access_key": access_key}
filename = f"{fx_dir}/{now_str}_GBPUSD.json"
r = requests.get(url, params=params)
open(filename, "w").write(r.text)
data = json.loads(r.text, parse_float=Decimal)
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])

View file

@ -1,120 +0,0 @@
"""Market days."""
from datetime import date, datetime, time, timedelta
import pytz
from dateutil.relativedelta import SA, relativedelta
from .types import Event
def windmill_hill(start_date: date) -> list[Event]:
"""Windmill Hill Market days for the next 12 months from a given date."""
events: list[Event] = []
current_date = start_date
url = (
"https://www.windmillhillcityfarm.org.uk"
+ "/visit-us/shops-more/windmill-hill-market-bristol-market/"
)
# To keep count of how many market days have been calculated
count = 0
tz = pytz.timezone("Europe/London")
start = time(10, 0)
end = time(15, 0)
while count < 12:
# Skip months outside of April to December
if current_date.month < 4 or current_date.month > 12:
current_date += relativedelta(months=1)
current_date = date(current_date.year, current_date.month, 1)
continue
# Calculate the first Saturday of the current month
first_saturday = current_date + relativedelta(day=1, weekday=SA(+1))
# Include it in the list only if it's on or after the start_date
if first_saturday >= start_date:
events.append(
Event(
name="market",
title="Windmill Hill Market",
date=tz.localize(datetime.combine(first_saturday, start)),
end_date=tz.localize(datetime.combine(first_saturday, end)),
url=url,
)
)
count += 1
# Move to the next month
current_date += relativedelta(months=1)
current_date = date(current_date.year, current_date.month, 1)
return events
def tobacco_factory(start_date: date) -> list[Event]:
"""Tobacco Factory Market days for the next 12 months from a given date."""
events: list[Event] = []
current_date = start_date
count = 0
url = "https://tobaccofactory.com/whats-on/sunday-market/"
tz = pytz.timezone("Europe/London")
start = time(10, 0)
end = time(14, 30)
while count < 52: # 52 weeks in a year
# Calculate the next Sunday from the current date
next_sunday = current_date + relativedelta(weekday=6) # Sunday is 6
# Include it in the list only if it's on or after the start_date
if next_sunday >= start_date:
events.append(
Event(
name="market",
title="Tobacco Factory Sunday Market",
date=tz.localize(datetime.combine(next_sunday, start)),
end_date=tz.localize(datetime.combine(next_sunday, end)),
url=url,
)
)
count += 1
# Move to the next week
current_date += timedelta(weeks=1)
return events
def nailsea_farmers(start_date: date) -> list[Event]:
"""Nailsea Farmers Market days for the next 12 months from a given date."""
events: list[Event] = []
current_date = start_date
count = 0
tz = pytz.timezone("Europe/London")
t = time(9, 0) # The market starts at 9am
while count < 12:
# Calculate the 3rd Saturday of the current month
third_saturday = current_date + relativedelta(day=1, weekday=SA(+3))
# Include it in the list only if it's on or after the start_date
if third_saturday >= start_date:
events.append(
Event(
name="market",
title="Nailsea Farmers Market",
date=tz.localize(datetime.combine(third_saturday, t)),
)
)
count += 1
# Move to the next month
current_date += relativedelta(months=1)
current_date = date(current_date.year, current_date.month, 1)
return events