Split currency rate code into own file

This commit is contained in:
Edward Betts 2023-11-05 12:27:40 +00:00
parent 583bfcac2e
commit 7949e16195
2 changed files with 44 additions and 32 deletions

View file

@ -5,7 +5,6 @@ import os
import typing import typing
import warnings import warnings
from datetime import date, datetime, time, timedelta, timezone from datetime import date, datetime, time, timedelta, timezone
from decimal import Decimal
from time import time as unixtime from time import time as unixtime
from typing import List from typing import List
@ -25,7 +24,7 @@ from dateutil.relativedelta import FR, relativedelta
from agenda import thespacedevs from agenda import thespacedevs
from . import gwr, markets, waste_schedule from . import fx, gwr, markets, waste_schedule
from .types import Event from .types import Event
warnings.simplefilter(action="ignore", category=FutureWarning) warnings.simplefilter(action="ignore", category=FutureWarning)
@ -150,35 +149,6 @@ def get_next_bank_holiday(input_date: date) -> list[Event]:
return hols return hols
def get_gbpusd() -> Decimal:
"""Get the current value for GBPUSD, with caching."""
now = datetime.now()
now_str = now.strftime("%Y-%m-%d_%H:%M")
fx_dir = os.path.join(data_dir, "fx")
existing_data = os.listdir(fx_dir)
existing = [f for f in existing_data if f.endswith("_GBPUSD.json")]
if existing:
recent_filename = max(existing)
recent = datetime.strptime(recent_filename, "%Y-%m-%d_%H:%M_GBPUSD.json")
delta = now - recent
if existing and delta < timedelta(hours=6):
full = os.path.join(fx_dir, recent_filename)
data = json.load(open(full), parse_float=Decimal)
if "quotes" in data and "USDGBP" in data["quotes"]:
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
url = "http://api.exchangerate.host/live"
params = {"currencies": "GBP,USD", "access_key": access_key}
filename = f"{fx_dir}/{now_str}_GBPUSD.json"
r = requests.get(url, params=params)
open(filename, "w").write(r.text)
data = json.loads(r.text, parse_float=Decimal)
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
def next_economist(input_date: date) -> date: def next_economist(input_date: date) -> date:
"""Next date that the Economist is published.""" """Next date that the Economist is published."""
# Define the publication day (Thursday) and the day of the week of the input date # Define the publication day (Thursday) and the day of the week of the input date
@ -558,7 +528,7 @@ def get_data(now: datetime) -> typing.Mapping[str, str | object]:
reply = { reply = {
"now": now, "now": now,
"gbpusd": get_gbpusd(), "gbpusd": fx.get_gbpusd(config),
"next_economist": next_economist(today), "next_economist": next_economist(today),
"bank_holiday": get_next_bank_holiday(today), "bank_holiday": get_next_bank_holiday(today),
"us_holiday": get_us_holidays(today), "us_holiday": get_us_holidays(today),

42
agenda/fx.py Normal file
View file

@ -0,0 +1,42 @@
"""Currency exchange rates."""
import configparser
import json
import os
import typing
from datetime import datetime, timedelta
from decimal import Decimal
import requests
def get_gbpusd(config: configparser.ConfigParser) -> Decimal:
"""Get the current value for GBPUSD, with caching."""
access_key = config.get("exchangerate", "access_key")
data_dir = config.get("data", "dir")
now = datetime.now()
now_str = now.strftime("%Y-%m-%d_%H:%M")
fx_dir = os.path.join(data_dir, "fx")
existing_data = os.listdir(fx_dir)
existing = [f for f in existing_data if f.endswith("_GBPUSD.json")]
if existing:
recent_filename = max(existing)
recent = datetime.strptime(recent_filename, "%Y-%m-%d_%H:%M_GBPUSD.json")
delta = now - recent
if existing and delta < timedelta(hours=6):
full = os.path.join(fx_dir, recent_filename)
data = json.load(open(full), parse_float=Decimal)
if "quotes" in data and "USDGBP" in data["quotes"]:
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])
url = "http://api.exchangerate.host/live"
params = {"currencies": "GBP,USD", "access_key": access_key}
filename = f"{fx_dir}/{now_str}_GBPUSD.json"
r = requests.get(url, params=params)
open(filename, "w").write(r.text)
data = json.loads(r.text, parse_float=Decimal)
return typing.cast(Decimal, 1 / data["quotes"]["USDGBP"])