Upgrade to OAuth 2

This commit is contained in:
Edward Betts 2024-06-06 14:43:44 +01:00
parent fc34c2bdcf
commit 4d32f62dfb

View file

@ -1,67 +1,82 @@
"""OSM Authentication.""" """OSM Authentication."""
import json
import typing import typing
from datetime import datetime from datetime import datetime
from urllib.parse import urlencode from urllib.parse import urlencode
import flask
import lxml.etree import lxml.etree
from flask import current_app, g, session import requests
from requests_oauthlib import OAuth1Session from requests_oauthlib import OAuth2Session
from . import user_agent_headers from . import user_agent_headers
from .model import User from .model import User
osm_api_base = "https://api.openstreetmap.org/api/0.6" osm_api_base = "https://api.openstreetmap.org/api/0.6"
scope = ["read_prefs", "write_api"]
def api_put_request(path, **kwargs): def get_session() -> OAuth2Session:
user = g.user """Get session."""
assert user.is_authenticated token = flask.session.get("oauth_token")
oauth = OAuth1Session( if not token:
current_app.config["CLIENT_KEY"], user = flask.g.user
client_secret=current_app.config["CLIENT_SECRET"], assert user.is_authenticated
resource_owner_key=user.osm_oauth_token, token = json.loads(user.osm_oauth_token)
resource_owner_secret=user.osm_oauth_token_secret, flask.session["oauth_token"] = token
callback = flask.url_for("oauth_callback", _external=True)
return OAuth2Session(
flask.current_app.config["CLIENT_KEY"],
redirect_uri=callback,
scope=scope,
token=token,
) )
def api_put_request(path: str, **kwargs: typing.Any) -> requests.Response:
"""Send OSM API PUT request."""
oauth = get_session()
return oauth.request( return oauth.request(
"PUT", osm_api_base + path, headers=user_agent_headers(), **kwargs "PUT", osm_api_base + path, headers=user_agent_headers(), **kwargs
) )
def api_request(path, **params): def api_request(path: str, **params: typing.Any) -> requests.Response:
user = g.user """Send OSM API request."""
assert user.is_authenticated
app = current_app
url = osm_api_base + path url = osm_api_base + path
if params: if params:
url += "?" + urlencode(params) url += "?" + urlencode(params)
client_key = app.config["CLIENT_KEY"]
client_secret = app.config["CLIENT_SECRET"] oauth = get_session()
oauth = OAuth1Session(
client_key,
client_secret=client_secret,
resource_owner_key=user.osm_oauth_token,
resource_owner_secret=user.osm_oauth_token_secret,
)
return oauth.get(url, timeout=4) return oauth.get(url, timeout=4)
def parse_iso_date(value): def parse_iso_date(value: str) -> datetime:
"""Parse ISO date."""
return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ") return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
def parse_userinfo_call(xml): def parse_userinfo_call(xml: bytes) -> dict[str, typing.Any]:
"""Parse userinfo call."""
root = lxml.etree.fromstring(xml) root = lxml.etree.fromstring(xml)
user = root[0] user = root[0]
img = user.find(".//img") img = user.find(".//img")
account_created = parse_iso_date(user.get("account_created")) account_created_date = user.get("account_created")
assert account_created_date
account_created = parse_iso_date(account_created_date)
assert user.tag == "user" assert user.tag == "user"
id_str = user.get("id")
assert id_str and isinstance(id_str, str)
return { return {
"account_created": account_created, "account_created": account_created,
"id": int(user.get("id")), "id": int(id_str),
"username": user.get("display_name"), "username": user.get("display_name"),
"description": user.findtext(".//description"), "description": user.findtext(".//description"),
"img": (img.get("href") if img is not None else None), "img": (img.get("href") if img is not None else None),
@ -70,10 +85,10 @@ def parse_userinfo_call(xml):
def get_username() -> str | None: def get_username() -> str | None:
"""Get username of current user.""" """Get username of current user."""
if "user_id" not in session: if "user_id" not in flask.session:
return None # not authorized return None # not authorized
user_id = session["user_id"] user_id = flask.session["user_id"]
user = User.query.get(user_id) user = User.query.get(user_id)
return typing.cast(str, user.username) return typing.cast(str, user.username)